278 lines
11 KiB
Python
278 lines
11 KiB
Python
import asyncio
|
||
import re
|
||
from datetime import timedelta
|
||
from urllib.parse import quote_plus
|
||
|
||
import aiohttp
|
||
import yt_dlp
|
||
import discord
|
||
import logging
|
||
import argparse
|
||
import os
|
||
from discord.ext import commands
|
||
from discord import Embed
|
||
|
||
banner = """
|
||
\ | _) __ ) |
|
||
|\/ | | | __| | __| __ \ _ \ __|
|
||
| | | | \__ \ | ( _____| | | ( | |
|
||
_| _| \__,_| ____/ _| \___| ____/ \___/ \__|
|
||
"""
|
||
version = "0.1"
|
||
branch = "dev"
|
||
success = "**Success ✅**\n"
|
||
warning = "**Warning ℹ️**\n"
|
||
error = "**Error ❗️**\n"
|
||
cache_dir = "cache"
|
||
logging.basicConfig(level=logging.INFO)
|
||
parser = argparse.ArgumentParser()
|
||
parser.add_argument("-t", "--token", help="Bot TOKEN")
|
||
args = parser.parse_args()
|
||
|
||
if args.token == None:
|
||
print(f"{banner}\n\nPLEASE PROVIDE BOT A TOKEN BY RUNNING LIKE THE FOLLOWING:")
|
||
print("\n")
|
||
print(">>> python3 Music-Bot.py -t TOKEN <<<")
|
||
else:
|
||
|
||
# Suppress noise about console usage from errors
|
||
yt_dlp.utils.bug_reports_message = lambda: ""
|
||
|
||
ytdl_format_options = {
|
||
"format": "bestaudio/mp3",
|
||
"outtmpl": f"{cache_dir}/%(extractor)s-%(id)s-%(title)s.mp3",
|
||
"restrictfilenames": True,
|
||
"noplaylist": True,
|
||
"nocheckcertificate": True,
|
||
"ignoreerrors": False,
|
||
"logtostderr": False,
|
||
"quiet": True,
|
||
"no_warnings": True,
|
||
"default_search": "auto",
|
||
"source_address": "0.0.0.0", # Bind to ipv4 since ipv6 addresses cause issues at certain times
|
||
"postprocessors": [{
|
||
'key': 'FFmpegExtractAudio',
|
||
'preferredcodec': 'mp3',
|
||
'preferredquality': '1080',
|
||
}],
|
||
}
|
||
|
||
ffmpeg_options = {"options": "-vn"}
|
||
|
||
ytdl = yt_dlp.YoutubeDL(ytdl_format_options)
|
||
|
||
|
||
class YTDLSource(discord.PCMVolumeTransformer):
|
||
def __init__(self, source: discord.AudioSource, *, data: dict, volume: float = 0.5):
|
||
super().__init__(source, volume)
|
||
|
||
self.data = data
|
||
|
||
self.title = data.get("title")
|
||
self.url = data.get("url")
|
||
|
||
@classmethod
|
||
async def from_url(cls, url, *, loop=None, stream=False):
|
||
loop = loop or asyncio.get_event_loop()
|
||
data = await loop.run_in_executor(
|
||
None, lambda: ytdl.extract_info(url, download=not stream)
|
||
)
|
||
|
||
if "entries" in data:
|
||
# Takes the first item from a playlist
|
||
data = data["entries"][0]
|
||
|
||
filename = data["url"] if stream else ytdl.prepare_filename(data)
|
||
return cls(discord.FFmpegPCMAudio(source=filename, executable="ffmpeg", pipe=False, stderr=False,
|
||
before_options="-threads 2", options=ffmpeg_options), data=data)
|
||
|
||
|
||
class Youtube(discord.ui.View):
|
||
def __init__(self, query: str):
|
||
super().__init__()
|
||
# We need to quote the query string to make a valid url. Discord will raise an error if it isn't valid.
|
||
query = quote_plus(query)
|
||
#url = f"https://www.youtube.com/results?search_query={query}"
|
||
|
||
p = {"search_query": query}
|
||
# Spoof a user agent header or the request will immediately fail
|
||
h = {"User-Agent": "Mozilla/5.0"}
|
||
with aiohttp.ClientSession() as client:
|
||
with client.get("https://www.youtube.com/results", params=p, headers=h) as resp:
|
||
dom = resp.text()
|
||
# open("debug.html", "w").write(dom)
|
||
found = re.findall(r'href"\/watch\?v=([a-zA-Z0-9_-]{11})', dom)
|
||
url = f"https://youtu.be/{found[0]}"
|
||
|
||
|
||
# Link buttons cannot be made with the
|
||
# decorator, so we have to manually create one.
|
||
# We add the quoted url to the button, and add the button to the view.
|
||
self.add_item(discord.ui.Button(label="Click Here", url=url))
|
||
|
||
# Initializing the view and adding the button can actually be done in a one-liner at the start if preferred:
|
||
# super().__init__(discord.ui.Button(label="Click Here", url=url))
|
||
|
||
|
||
intents = discord.Intents.default()
|
||
intents.message_content = True
|
||
|
||
bot = commands.Bot(
|
||
command_prefix=commands.when_mentioned_or("!"),
|
||
description="Relatively simple music bot example",
|
||
intents=intents, help_command=None
|
||
)
|
||
|
||
|
||
@bot.event
|
||
async def on_ready():
|
||
print(banner)
|
||
print(f"Bot Version: {version} ({branch})")
|
||
print(f"Bot: {bot.user} (ID: {bot.user.id})")
|
||
print(
|
||
f"Invitation LINK: https://discord.com/api/oauth2/authorize?client_id={bot.user.id}&permissions=968552344896&scope=bot%20applications.commands")
|
||
print("------")
|
||
|
||
|
||
@bot.command(name="shutdown")
|
||
@commands.is_owner()
|
||
async def _shutdown(ctx):
|
||
files = os.listdir(cache_dir)
|
||
async with ctx.channel.typing():
|
||
for file in files:
|
||
os.remove(f"{cache_dir}/{file}")
|
||
await ctx.send(f"{warning}Cleaned up {len(files)} Files...\n\n{success}Shutting down...")
|
||
await bot.close()
|
||
|
||
|
||
@bot.slash_command(name="join", description="Summon the bot into your channel")
|
||
async def join(ctx: commands.Context):
|
||
try:
|
||
channel = ctx.author.voice.channel
|
||
if channel is None:
|
||
await ctx.send(f"{error}You are not in a voice-channel! Could not join...")
|
||
if ctx.voice_client is not None:
|
||
return await ctx.voice_client.move_to(channel)
|
||
|
||
await ctx.respond(f"{success}Connected to `{channel.name}`")
|
||
await channel.connect()
|
||
except Exception as err:
|
||
print(err)
|
||
|
||
|
||
@bot.slash_command(name="nowplaying", descriprion="Show details what the bot is currently playing")
|
||
async def nowplaying(ctx: commands.Context):
|
||
async with ctx.channel.typing():
|
||
if ctx.voice_client is None:
|
||
embed = Embed(title=f"{error}Currently is no music playing!", color=discord.Color.red())
|
||
else:
|
||
embed = Embed(title=f"⏯ Now playing", color=discord.Color.blue())
|
||
embed.add_field(name="🎵 ️ Title", value=ctx.voice_client.source.title, inline=False)
|
||
embed.add_field(name="🔗 Link",
|
||
value=f"https://youtube.com/watch?v={ctx.voice_client.source.data['id']}", inline=False)
|
||
embed.add_field(name="🔈 Volume",
|
||
value=f"Current setting `{int(ctx.voice_client.source.volume * 100)}%`", inline=False)
|
||
embed.set_author(name=bot.user.name, icon_url=bot.user.avatar.url)
|
||
embed.set_image(url=f"https://i3.ytimg.com/vi/{ctx.voice_client.source.data['id']}/maxresdefault.jpg")
|
||
embed.set_footer(text=f"{round(bot.latency, 2)}ms Latency 🚀")
|
||
await ctx.respond(embed=embed)
|
||
|
||
|
||
@bot.slash_command(name="play", description="Plays a song from YouTube [With preloading]")
|
||
async def play(ctx: commands.Context, url: str):
|
||
async with ctx.typing():
|
||
await ctx.respond("🤖 Your song is queued for download... please wait ", ephemeral=True)
|
||
try:
|
||
player = await YTDLSource.from_url(url, loop=bot.loop)
|
||
ctx.voice_client.play(
|
||
player, after=lambda e: print(f"Player error: {e}") if e else None
|
||
)
|
||
except discord.HTTPException as err:
|
||
raise commands.CommandError(err)
|
||
finally:
|
||
await ctx.send(f"{success}Now playing: `{player.title}`\nRequested by {ctx.author.mention}")
|
||
|
||
|
||
@bot.slash_command(name="stream", description="Streams a song from YouTube [Without preloading]")
|
||
async def stream(ctx: commands.Context, url: str):
|
||
async with ctx.typing():
|
||
player = await YTDLSource.from_url(url, loop=bot.loop, stream=True)
|
||
ctx.voice_client.play(
|
||
player, after=lambda e: print(f"Player error: {e}") if e else None
|
||
)
|
||
|
||
await ctx.respond(f"{success}Now playing: `{player.title}`\nRequested by {ctx.author.mention}")
|
||
|
||
|
||
@bot.slash_command(name="pause", description="Pauses the playback")
|
||
async def _pause(ctx: commands.Context):
|
||
ctx.voice_client.pause()
|
||
await ctx.respond(f"{success}⏸️ {ctx.author} has paused the playback")
|
||
|
||
|
||
@bot.slash_command(name="resume", description="Resumes the playback")
|
||
async def _resume(ctx: commands.Context):
|
||
ctx.voice_client.resume()
|
||
await ctx.respond(f"{success}▶️ {ctx.author} has resumed the playback")
|
||
|
||
|
||
@bot.slash_command(name="volume", description="Sets the bot volume")
|
||
async def volume(ctx: commands.Context, volume: int):
|
||
if ctx.voice_client is None:
|
||
return await ctx.send(f"{error}Not connected to a voice channel.")
|
||
ctx.voice_client.source.volume = volume / 100
|
||
await ctx.respond(f"{success}Changed volume to `{volume}%`")
|
||
|
||
|
||
@bot.slash_command(name="stop", description="Stopps the playback and disconnects the Bot")
|
||
async def stop(ctx: commands.Context):
|
||
await ctx.respond(f"{success}Stopping playback in `{ctx.voice_client.channel.name}`")
|
||
await ctx.voice_client.disconnect(force=True)
|
||
|
||
|
||
@bot.slash_command()
|
||
async def youtube(ctx: discord.ApplicationContext, query: str):
|
||
"""Returns a google link for a query."""
|
||
await ctx.respond(f"Google Result for: `{query}`", view=Youtube(query))
|
||
|
||
|
||
@play.before_invoke
|
||
@stream.before_invoke
|
||
async def ensure_voice(ctx: commands.Context):
|
||
if ctx.voice_client is None:
|
||
if ctx.author.voice:
|
||
await ctx.author.voice.channel.connect()
|
||
else:
|
||
raise commands.CommandError("Author not connected to a voice channel.")
|
||
elif ctx.voice_client.is_playing():
|
||
ctx.voice_client.stop()
|
||
|
||
"""
|
||
@bot.event
|
||
async def on_application_command_error(ctx, error):
|
||
print(f"[on_application_command_error]\n{ctx.author}\n{error}")
|
||
if isinstance(error, discord.ext.commands.CommandError):
|
||
embed = Embed(title=f"{warning}{error}",
|
||
color=15158332)
|
||
elif isinstance(error, discord.ext.commands.CommandOnCooldown):
|
||
cool_down_time = int(error.cooldown.get_retry_after())
|
||
td = timedelta(seconds=cool_down_time)
|
||
embed = Embed(title=f"{warning}Dieser Command befindet sich im Cool Down!\n Versuche es in `{td}` nochmal!",
|
||
color=15158332)
|
||
elif isinstance(error, discord.ext.commands.is_owner()):
|
||
embed = Embed(title=f"{warning}{error}",
|
||
color=15158332)
|
||
else:
|
||
embed = Embed(title=f"{error}", color=15158332)
|
||
await ctx.respond(embed=embed)
|
||
|
||
|
||
@bot.event
|
||
async def on_command_error(ctx, error):
|
||
print(f"[on_command_error]\n{ctx.author}\n{error}")
|
||
embed = Embed(title=f"{error}", color=15158332)
|
||
await ctx.send(embed=embed)
|
||
"""
|
||
|
||
bot.run(token=args.token, reconnect=True)
|