add wip slimmed down from gamerbot2

pull/1/head
Matt C 3 years ago
parent ea4864d7b4
commit c807b4ab71

@ -1,3 +1,5 @@
# caveman # caveman
Discord boy (pycord) for monitoring repos and stuff Discord bot (pycord) for monitoring repos and stuff
Based on Matt's Gamerbot2 from https://git.tar.black/matt/Gamerbot2

@ -0,0 +1,68 @@
import discord
from discord.ext import commands
from util_functions import *
# Hopefully we'll never need logging here
class About(commands.Cog):
"""Stuff that the developer couldn't find a better category for"""
def __init__(self, bot):
self.bot = bot
@commands.command()
async def source(self, ctx):
"""Bot source code link"""
await ctx.send(
embed=infmsg(
"Source",
"My source code lives here: https://git.getcryst.al/crystal/caveman",
)
)
@commands.command()
async def report(self, ctx):
"""Report bot issues"""
await ctx.send(
embed=infmsg(
"Issues",
"You can file issues here: https://git.getcryst.al/crystal/caveman/issues",
)
)
@commands.command()
async def suggest(self, ctx):
"""Suggest bot feature(s)"""
await ctx.send(
embed=infmsg(
"Issues",
"You can file issues here: https://git.getcryst.al/crystal/caveman/issues",
)
)
@commands.command()
async def version(self, ctx):
"""Bot version"""
commit_msg = await run_command_shell(
"git --no-pager log --decorate=short --pretty=oneline -n1"
)
msg = ""
msg += "Latest Git commit: \n"
msg += "```" + commit_msg + "```"
await ctx.send(embed=infmsg("Bot Stats", msg))
@commands.command()
async def invite(self, ctx):
"""Add me to another server"""
await ctx.send(
embed=infmsg(
"Invite me :)",
"https://discord.com/api/oauth2/authorize?client_id=900841588996063282&permissions=8&scope=bot%20applications.commands",
)
)
def setup(bot):
bot.add_cog(About(bot))

@ -0,0 +1,139 @@
import discord
from discord.ext import commands
from util_functions import *
from global_config import configboi
from server_config import serverconfig
# Hopefully we'll never need logging here
class Debug(commands.Cog):
"""Stuff that the developer couldn't find a better category for"""
def __init__(self, bot):
self.bot = bot
self.confmgr = configboi("config.txt", False)
self.sconf = serverconfig()
@commands.command()
async def resetgd(self, ctx):
if ctx.message.author.id == ctx.message.guild.owner_id:
self.sconf.rs(str(ctx.message.guild.id))
await ctx.send(":thumbsup:")
@commands.command()
async def checkcog(self, ctx, *, n):
"""check if cog is a thing"""
try:
if ctx.bot.get_cog(n) is not None:
await ctx.send(
embed=infmsg("Debug Tools", "Bot was able to find `" + n + "`")
)
else:
await ctx.send(
embed=errmsg("Debug Tools", "Bot was not able to find `" + n + "`")
)
except Exception as e:
await ctx.send(
embed=errmsg(
"Debug Tools - ERROR",
"Had error `" + str(e) + "` while checking cog `" + n + "`",
)
)
@commands.command()
async def restart(self, ctx):
"""Restart the bot (Mod. only)"""
if ctx.message.author.id in MOD_IDS:
await ctx.send(embed=infmsg("Sad", "Ok, restarting"))
if ctx.voice_client is not None:
await ctx.voice_client.disconnect()
await ctx.bot.logout()
syslog.log(
"Admin-Important",
"Bot is restarting because "
+ ctx.message.author.display_name
+ " requested we do so.",
)
save("restarted.txt", str(ctx.message.channel.id))
await login(os.environ["bottoken"], bot=True)
else:
await ctx.send(embed=errmsg("Oops", wrongperms("restart")))
# TODO: Move to admin file?
@commands.command()
async def update(self, ctx):
"""Update bot from Git, and restart (Mod. only)"""
if ctx.message.author.id in MOD_IDS:
await ctx.send(embed=infmsg("Updater", "Updating..."))
syslog.log(
"Admin-Important",
"Bot is updating & restarting because "
+ ctx.message.author.display_name
+ " requested we do so.",
)
# are these being upset?
pull_out = await run_command_shell("git pull -v")
commit_msg = await run_command_shell(
"git --no-pager log --decorate=short --pretty=oneline -n1"
)
msg = (
"Changes:"
+ "\n```"
+ pull_out
+ "```\nCommit message:\n"
+ "```"
+ commit_msg
+ "```"
)
await ctx.send(embed=infmsg("Updater", msg))
await run_command_shell("pip3 install --upgrade -r requirements.txt")
await ctx.send(embed=infmsg("Updater", "Restarting"))
if ctx.voice_client is not None:
await ctx.voice_client.disconnect()
await ctx.bot.logout()
save("restarted.txt", str(ctx.message.channel.id))
await login(os.environ["bottoken"], bot=True)
else:
await ctx.send(embed=errmsg("Oops", wrongperms("update")))
@commands.command()
async def chbranch(self, ctx, *, branch):
"""Switch bot's upstream to a given branch (Mod. only)"""
if ctx.message.author.id in MOD_IDS:
await ctx.send(embed=infmsg("Updater", "Switching branch..."))
syslog.log(
"Admin-Important",
"Bot is switching branch to "
+ branch
+ " because "
+ ctx.message.author.display_name
+ " requested we do so.",
)
await run_command_shell("git checkout " + branch)
await ctx.send(embed=infmsg("Updater", "Done!"))
else:
await ctx.send(embed=errmsg("Oops", wrongperms("chbranch")))
@commands.command()
async def gitstatus(self, ctx):
"""Show the output of git status"""
commit_msg = await run_command_shell(
"git --no-pager log --decorate=short --pretty=oneline -n1"
)
await ctx.send(embed=infmsg("Git Status", "```" + commit_msg + "```"))
@commands.command()
async def purgesyslog(self, ctx):
"""Delete all existing syslogs (USE WITH CARE) (Owner only)"""
if ctx.message.author.id == OWNER:
purged = await run_command_shell("rm system_log* -v")
await ctx.send(embed=infmsg("Syslog Purger", "We purged:\n```" + purged + "```"))
else:
await ctx.send(embed=errmsg("Oops", wrongperms("purgesyslog")))
def setup(bot):
bot.add_cog(Debug(bot))

@ -0,0 +1,63 @@
import os, json, random
import urllib.parse
import urllib
import discord
from discord.ext import commands
import asyncio
import gmplot
from util_functions import *
from global_config import configboi
from server_config import serverconfig
from better_profanity import profanity
profanity.load_censor_words(
whitelist_words=open("data/whitelist_words.txt").read().split("\n")
)
profanity.add_censor_words(open("data/blacklist_words.txt").read().split("\n"))
# Fun internet things
class Internet(commands.Cog):
"""Useful tools on the interwebs"""
def __init__(self, bot):
self.bot = bot
self.confmgr = configboi("config.txt", False)
self.sconf = serverconfig()
async def getasjson(self, url):
try:
data = await run_command_shell('curl "' + url + '"')
return json.loads(data)
except Exception as e:
return '{"haha":"heeho"}'
@commands.command()
async def kernel(self, ctx):
"""Get Linux kernel info for host and latest"""
try:
await ctx.send(embed=infmsg("Kernel", "Getting kernel info."))
data = await self.getasjson("https://www.kernel.org/releases.json")
new_ver = data["latest_stable"]["version"]
mine = await run_command_shell("uname -r")
msg = (
"I'm running: `"
+ mine
+ "`\nKernel.org reports stable is: `"
+ new_ver
+ "`"
)
await ctx.send(embed=infmsg("Kernel", msg))
except Exception as e:
await ctx.send(
embed=errmsg("Kernel", "Had an issue getting info: `" + str(e) + "`")
)
syslog.log("Internet-Important", "Kernel command had error: " + str(e))
# End fun internet things
def setup(bot):
bot.add_cog(Internet(bot))

@ -0,0 +1,23 @@
import discord
from discord.ext import commands
from util_functions import *
# Hopefully we'll never need logging here
class Random(commands.Cog):
"""Stuff that the developer couldn't find a better category for"""
def __init__(self, bot):
self.bot = bot
@commands.command()
async def ping(self, ctx):
"""pong."""
await ctx.send(
"pong. :upside_down: :gun:", file=discord.File("images/pong.jpg")
)
def setup(bot):
bot.add_cog(Random(bot))

@ -0,0 +1,90 @@
import sys, datetime
import discord
from discord.ext import commands, tasks
from global_config import configboi
from util_functions import *
class Status(commands.Cog):
"""This cog keeps the bot status in sync"""
def __init__(self, bot):
self.bot = bot
self.confmgr = configboi("config.txt", False)
self.status_task.start()
self.uptime_logger.start()
self.upt = 0
def cog_unload(self):
self.status_task.cancel()
async def setDefaultStatus(self):
ac_type = None
if self.confmgr.get("DEFAULT_STATUS_TYPE") == "watching":
ac_type = discord.ActivityType.watching
elif self.confmgr.get("DEFAULT_STATUS_TYPE") == "listening":
ac_type = discord.ActivityType.listening
elif self.confmgr.get("DEFAULT_STATUS_TYPE") == "streaming":
ac_type = discord.ActivityType.streaming
total = 0
if "{number_users}" in self.confmgr.get("DEFAULT_STATUS_TEXT"):
guilds = self.bot.guilds
for guild in guilds:
total += guild.member_count
if ac_type is None:
ac_type = discord.ActivityType.playing
await self.bot.change_presence(
activity=discord.Activity(
type=ac_type,
name=self.confmgr.get("DEFAULT_STATUS_TEXT")
.replace("{guild_count}", str(len(list(self.bot.guilds))))
.replace("{number_users}", str(total)),
)
)
@commands.Cog.listener()
async def on_ready(self):
syslog.log("Bot Status", "Setting default status as per config")
await self.setDefaultStatus()
@tasks.loop(seconds=60.0)
async def status_task(self):
await self.setDefaultStatus()
@status_task.before_loop
async def before_status_task(self):
syslog.log(
"Bot Status", "Waiting for bot to be ready before starting updater task"
)
await self.bot.wait_until_ready()
syslog.log("Bot Status", "Bot is ready. Enabling update task")
@tasks.loop(seconds=1.0)
async def uptime_logger(self):
self.upt += 1
@uptime_logger.before_loop
async def before_logger_task(self):
await self.bot.wait_until_ready()
@commands.command(aliases=["uptime"])
async def getuptime(self, ctx):
await ctx.send(
embed=infmsg(
"Bot Stats",
"Uptime: `" + str(datetime.timedelta(seconds=self.upt)) + "`",
)
)
def setup(bot):
bot.add_cog(Status(bot))

@ -0,0 +1,172 @@
# Token is BOTTOKEN in env-vars
# E.G. "BOTTOKEN=<something> python3 combo.py"
# Standard python imports
import os, string, unicodedata, sys, re, random, time, datetime, subprocess, json, traceback
import urllib.parse
import importlib
from os import listdir
from os.path import isfile, join
# Pycord
import discord
from discord.ext import commands
# Kind've discord related
from pretty_help import DefaultMenu, PrettyHelp
# Other pip packages
from PIL import Image, ImageDraw, ImageFont
from better_profanity import profanity
import asyncio
import requests
# My own classes n such
from global_config import configboi
from util_functions import *
from server_config import serverconfig
if os.path.sep == "\\":
print("This bot is only supported on UNIX-like systems. Aborting.")
sys.exit(1)
sconf = serverconfig()
intents = discord.Intents.default()
intents.members = True
# Start event handling and bot creation
bot = commands.Bot(
command_prefix=commands.when_mentioned_or(";"),
description="Ooga booga hit rocks together",
intents=intents,
)
helpmenu = DefaultMenu("◀️", "▶️", "")
bot.help_command = PrettyHelp(
no_category="Commands", navigation=helpmenu, color=discord.Colour.blurple()
)
# Sane default?
my_homedir = os.getenv("HOME", "/home/caveman")
# No default b/c we're fucked long before this if PATH is none
old_path = os.getenv("PATH")
new_path = old_path + ":" + my_homedir + "/.local/bin/"
os.environ["PATH"] = new_path
print("Our PATH is: " + os.getenv("PATH"))
# Startup event
@bot.event
async def on_ready():
syslog.log("Main-Important", "Bot has restarted at " + getstamp())
syslog.log("Main", f"\n{bot.user} has connected to Discord!\n")
if check("restarted.txt"):
channel = get("restarted.txt")
chan = bot.get_channel(int(channel))
if chan is not None:
await chan.send(
embed=infmsg("System", "Finished restarting at: `" + getstamp() + "`")
)
os.remove("restarted.txt")
ownerman = await bot.fetch_user(OWNER)
notifyowner = confmgr.getasbool("OWNER_DM_RESTART")
cogs_dir = "cogs"
for extension in [
f.replace(".py", "") for f in listdir(cogs_dir) if isfile(join(cogs_dir, f))
]:
try:
bot.load_extension(cogs_dir + "." + extension)
syslog.log("Main", "Loaded " + extension)
# await ownerman.send(embed=infmsg("System","Loaded `" + extension + "`"))
except (Exception) as e:
await ownerman.send(
embed=errmsg(
"System", "Error from cog: " + extension + ": ```" + str(e) + "```"
)
)
syslog.log("Main", f"Failed to load extension {extension}.")
# traceback.print_exc()
if notifyowner:
await ownerman.send(
embed=infmsg("System", "Started/restarted at: `" + getstamp() + "`")
)
@bot.event
async def on_message(message):
if message.author != bot.user:
mc = message.content
if "bot" in mc:
# we're being talked to
if "bad" in mc or "sucks" in mc:
await message.channel.send(":(")
await bot.process_commands(message)
@bot.event
async def on_command_error(ctx, error):
syslog.log("Main", "Error in command: " + str(error))
await ctx.send(embed=errmsg("Error", "```" + str(error) + "```"))
@bot.command()
async def removecog(ctx, name):
"""Un-load a cog that was loaded by default."""
if ctx.message.author.id in MOD_IDS:
await ctx.send(embed=infmsg("Gotcha", "Ok, I'll try to disable `" + name + "`"))
try:
bot.remove_cog(name)
syslog.log("Main", "Disabled cog: " + name)
await ctx.send(embed=warnmsg("Done", "Disabled: `" + name + "`."))
except Exception as e:
await ctx.send(
embed=errmsg("Broke", "Something went wrong: `" + str(e) + "`.")
)
else:
await ctx.send(embed=errmsg("Oops", wrongperms("removecog")))
@bot.command()
async def getsyslog(ctx):
"""Get a copy of the system log"""
if ctx.message.author.id in MOD_IDS:
log = syslog.getlog()
if len(log) > 1994:
text = paste(log)
await ctx.send(embed=infmsg("Output", text))
else:
text = "```" + log + "```"
await ctx.send("Here you go:")
await ctx.send(text)
else:
await ctx.send(embed=errmsg("Oops", wrongperms("getsyslog")))
if UNLOAD_COGS is not None:
# Remove any cogs as per config
for item in UNLOAD_COGS:
if item != "" and item != " ":
syslog.log("Main", "Trying to remove '" + item + "'")
try:
bot.remove_cog(item)
syslog.log("Main", "Removed '" + item + "'")
except:
syslog.log("Main", "Failed to remove '" + item + "'")
if os.environ["bottoken"] != "":
bot.run(os.environ["bottoken"])
else:
print(
"You need to call this file with a variable 'bottoken' set to your bot token."
)
sys.exit(1)

@ -0,0 +1,35 @@
# This is a comment!
# Note: In the code, if the data type is not supported,
# you'll have to deal yourself (by editing global_config.py likely)
# Also, at the moment, these aren't type safe, so if you use this file in other cogs,
# make sure to use the correct getter function for the type you expect it to be. (and cast!)
# General bot stuff
# Types: watching, listening, playing
DEFAULT_STATUS_TYPE:playing
# These are currently the only substitutions
# - {number_users}
# - {guild_count}
DEFAULT_STATUS_TEXT:ooga booga w/ {number_users}
# Any default cogs you don't want to load:
# Example: UNLOAD_COGS:Internet,Speak
# Or maybe more likely: UNLOAD_COGS:Shells
# Blank is fine, too
UNLOAD_COGS:
# Text for messages:
# Note {command} is the only substitution allowed here (we do auto-mention the user)
WRONG_PERMS:You're not a special snowflake, so you can't run `{command}`
# {username} is the only one here
NEW_MEMBER:Welcome, {username}
# and {channel} is the only one here
INTRO_CHANNEL:You should go to {channel}, since it's your first time here.
# Should the owner get a DM when the bot restarts
OWNER_DM_RESTART:True

@ -0,0 +1,99 @@
import os
# Nonstandard to avoid depend loop
from logger import BotLogger
syslog = BotLogger("system_log.txt")
def check(fn):
if os.path.exists(fn):
return True
else:
return False
class configboi:
def __init__(self, fn, logging=True):
self.config = {}
if not check(fn):
if logging:
syslog.log("Config", "No config found!")
else:
if logging:
syslog.log("Config", "----- Loading config values -----")
with open(fn) as f:
config_lines = f.read().split("\n")
for line in config_lines:
if line != "" and line != "\n":
if line[0] != "#":
bits = line.split(":")
key = bits[0]
val = bits[1]
if logging:
syslog.log("Config", "Added " + key + ": " + val)
self.config[key] = val
self.islogging = logging
def reloadconfig(self):
if not check(fn):
if self.logging:
syslog.log("Config", "No config found!")
else:
if self.logging:
syslog.log("Config", "----- Loading config values -----")
with open(fn) as f:
config_lines = f.read().split("\n")
for line in config_lines:
if line != "" and line != "\n":
if line[0] != "#":
bits = line.split(":")
key = bits[0]
val = bits[1]
if self.logging:
syslog.log("Config", "Added " + key + ": " + val)
self.config[key] = val
def get(self, key):
if key in self.config:
return self.config[key].replace("//", "://")
else:
return "Not found"
def getasint(self, key):
if key in self.config:
return int(self.config[key])
else:
return 0
def getasbool(self, key):
if key in self.config:
result = self.config[key]
if result == "true" or result == "True":
return True
else:
return False
else:
return False
def getaslist(self, key):
if key in self.config:
if "," in self.config[key]:
return self.config[key].split(",")
else:
return [self.config[key]]
else:
return None
def getasintlist(self, key):
if key in self.config:
if "," in self.config[key]:
data = self.config[key].split(",")
newdata = []
for item in data:
newdata.append(int(item))
return newdata
else:
return [int(self.config[key])]
else:
return [0]

@ -0,0 +1,31 @@
import os
def getstamp():
os.system("date >> stamp")
with open("stamp") as f:
s = f.read()
os.remove("stamp")
return s
class BotLogger:
def __init__(self, filename):
if os.path.exists(filename):
n = 0
while os.path.exists(filename + "." + str(n)):
n += 1
filename = filename + "." + str(n)
self.fn = filename
def log(self, caller, text):
info = getstamp().strip() + " --> " + caller + ": " + text
with open(self.fn, "a+") as f:
f.write("\n" + info + "\n")
print(info)
def getlog(self):
with open(self.fn) as f:
return f.read()

Binary file not shown.

After

Width:  |  Height:  |  Size: 367 KiB

@ -0,0 +1,3 @@
py-cord[voice]
discord-pretty-help
requests

@ -0,0 +1,125 @@
import os, sys, shutil
# Nonstandard to avoid depend loop
from logger import BotLogger
syslog = BotLogger("system_log.txt")
def check(fn):
if os.path.exists(fn):
return True
else:
return False
class simpledb:
def __init__(self):
self.dbroot = "server-data/"
if not check(self.dbroot):
os.makedirs(self.dbroot)
def get(self, server, key):
serverdb = self.dbroot + server
keyp = serverdb + "/" + key
if not check(serverdb):
return "NOT SET"
else:
if not check(keyp):
return "NOT SET"
else:
with open(keyp) as f:
return str(f.read())
def set(self, server, key, value):
serverdb = self.dbroot + server
keyp = serverdb + "/" + key
if not check(serverdb):
os.makedirs(serverdb)
with open(keyp, "w") as f:
f.write(str(value))
def exists(self, server):
serverdb = self.dbroot + server
if check(serverdb):
return True
else:
return False
def erase(self, server):
serverdb = self.dbroot + server
if self.exists(server):
shutil.rmtree(serverdb)
def listdata(self, server):
serverdb = self.dbroot + server
if self.exists(server):
data = ""
for f in os.listdir(serverdb):
if f != "blank":
data += f + " : " + open(serverdb + "/" + f).read() + "\n"
return data
else:
return "No DB for " + server
# ____________________________________________
class serverconfig:
def __init__(self):
self.db = simpledb()
def showdata(self, server):
server = str(server)
data = self.db.listdata(server)
out = "Data for `" + server + "`:\n```" + data + "```"
return out
def checkinit(self, server):
server = str(server)
if self.db.exists(server):
return True
else:
self.db.set(server, "blank", "blank")
return False
def getstring(self, server, key):
server = str(server)
return self.db.get(server, key)
def getint(self, server, key):
server = str(server)
val = self.db.get(server, key)
if val == "NOT SET":
return -1
else:
return int(self.db.get(server, key))
def getbool(self, server, key):
server = str(server)
val = self.db.get(server, key)
if val == "NOT SET":
return False
else:
if "true" in self.db.get(server, key):
return True
else:
return False
def set(self, server, key, value):
server = str(server)
key = str(key)
value = str(value)
self.db.set(server, key, value)
def rs(self, server):
server = str(server)
self.db.erase(server)
# GUILD ID:
# - mods
# - swears_censored
# - require_privileges
# - announcements

@ -0,0 +1,236 @@
# System
import os, sys, random, string
# Pip
import asyncio, requests, youtube_dl, discord
# Me
from global_config import configboi
from logger import BotLogger
# lol
confmgr = configboi("config.txt", False)
syslog = BotLogger("system_log.txt")
# <-------------- Don't touch pls --------------->
# If you're adding your own stuff, you need to look at
# global_config.py to see the supported data types, and add your
# own if needed.
# .get is string
VER = confmgr.get("VER")
PASTE_BASE = confmgr.get("PASTE_BASE")
PASTE_URL_BASE = confmgr.get("PASTE_URL_BASE")
HELP_LOC = confmgr.get("HELP_LOC")
WRONG_PERMS = confmgr.get("WRONG_PERMS")
NEW_MEMBER = confmgr.get("NEW_MEMBER")
INTRO_CHANNEL = confmgr.get("INTRO_CHANNEL")
# and a list (vv)
IMAGE_RESPONSES = confmgr.getaslist("IMAGE_RESPONSES")
# and a boolean (vv)
DO_IMAGE_RESPONSE = confmgr.getasbool("DO_IMAGE_RESPONSES")
IMAGE_RESPONSE_PROB = confmgr.getasint("IMAGE_RESPONSE_PROB")
# list of integers
MOD_IDS = confmgr.getasintlist("MOD_IDS")
# and an int (vv)
OWNER = confmgr.getasint("OWNER")
DEFAULT_STATUS_TYPE = confmgr.get("DEFAULT_STATUS_TYPE")
DEFAULT_STATUS_TEXT = confmgr.get("DEFAULT_STATUS_TEXT")
UNLOAD_COGS = confmgr.getaslist("UNLOAD_COGS")
# <-------------- End --------------------->
# <--------------Colors Start-------------->
# For embed msgs (you can override these if you want)
# But changing the commands which use embed would be better
purple_dark = 0x6A006A
purple_medium = 0xA958A5
purple_light = 0xC481FB
orange = 0xFFA500
gold = 0xDAA520
red_dark = 8e2430
red_light = 0xF94343
blue_dark = 0x3B5998
cyan = 0x5780CD
blue_light = 0xACE9E7
aqua = 0x33A1EE
pink = 0xFF9DBB
green_dark = 0x2AC075
green_light = 0xA1EE33
white = 0xF9F9F6
cream = 0xFFDAB9
# <--------------Colors End-------------->
WHITELIST = []
def strip_dangerous(text):
remove = [";", "&&", "&", '"']
for thing in remove:
text = text.replace(thing, "")
if "\n" in text:
text = text.split("\n")[0]
return text
def fancymsg(title, text, color, footnote=None):
e = discord.Embed(colour=color)
e.add_field(name=title, value=text, inline=False)
if footnote is not None:
e.set_footer(text=footnote)
return e
def errmsg(title, text, footnote=None):
return fancymsg(title, text, discord.Colour.red(), footnote)
def warnmsg(title, text, footnote=None):
return fancymsg(title, text, discord.Colour.gold(), footnote)
def infmsg(title, text, footnote=None):
return fancymsg(title, text, discord.Colour.blurple(), footnote)
def imgbed(title, type, dat):
# see https://discordpy.readthedocs.io/en/stable/faq.html?highlight=embed#how-do-i-use-a-local-image-file-for-an-embed-image
e = discord.Embed(color=discord.Colour.blurple())
e.add_field(name="foo", value=title, inline=False)
if type == "rem":
e.set_image(url=dat)
else:
e.set_image(url="attachment://" + dat)
return e
# Youtube Stuff
async def getytvid(link, songname):
syslog.log("Util-GetYTvid", "We're starting a download session")
syslog.log("Util-GetYTvid", "Target filename is: " + songname)
await run_command_shell(
"cd bin && python3 download_one.py " + link + " " + songname + " && cd ../"
)
syslog.log("Util-GetYTvid", "All done!")
# Simple file wrappers
def check(fn):
if os.path.exists(fn):
return True
else:
return False
def save(fn, text):
with open(fn, "a+") as f:
f.write(text + "\n")
def get(fn):
if check(fn):
with open(fn) as f:
return f.read()
def ensure(fn):
if not check(fn):
os.makedirs(fn, exist_ok=True)
def getstamp():
if sys.platform != "win32":
os.system("date >> stamp")
with open("stamp") as f:
s = f.read()
os.remove("stamp")
return s
else:
return ""
def iswhitelisted(word):
if word in WHITELIST:
return True
else:
return False
# WTF IS THIS?
def purifylink(content):
sp1 = content.split("http")[1]
sp2 = sp1.split(" ")[0]
return "http" + sp2
def wrongperms(command):
syslog.log("System", "Someone just failed to run: '" + command + "'")
return WRONG_PERMS.replace("{command}", command)
# Maybe add: https://docs.python.org/3/library/shlex.html#shlex.quote ?
async def run_command_shell(command):
"""Run command in subprocess (shell)."""
# Create subprocess
process = await asyncio.create_subprocess_shell(
command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
)
# Status
print("Started:", command, "(pid = " + str(process.pid) + ")", flush=True)
# Wait for the subprocess to finish
stdout, stderr = await process.communicate()
# Progress
if process.returncode == 0:
print("Done:", command, "(pid = " + str(process.pid) + ")", flush=True)
# Result
result = stdout.decode().strip()
else:
print("Failed:", command, "(pid = " + str(process.pid) + ")", flush=True)
# Result
result = stderr.decode().strip()
# Return stdout
return result
def paste(text):
N = 25
fn = (
"".join(
random.choice(
string.ascii_uppercase + string.digits + string.ascii_lowercase
)
for _ in range(N)
)
+ ".txt"
)
with open(PASTE_BASE + fn, "w") as f:
f.write(text)
return PASTE_URL_BASE + fn
def getgeoip(ip):
url = "https://freegeoip.app/json/" + ip
headers = {"accept": "application/json", "content-type": "application/json"}
response = requests.request("GET", url, headers=headers)
dat = response.json()
return dat
Loading…
Cancel
Save