Files
Discord-Music-Bot/Music-Bot.py
2023-02-08 19:01:07 +01:00

304 lines
12 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import re
from datetime import timedelta
from urllib.parse import quote_plus
import aiohttp
import yt_dlp
import discord
import logging
import argparse
import os
from discord.ext import commands
from discord import Embed
# ASCII-art banner printed in the usage hint and on startup.
# Raw string: the art contains backslashes that are not valid escape sequences
# ("\ ", "\/", "\_"); a normal literal keeps them only with a SyntaxWarning on
# recent Python versions. The resulting text is unchanged.
banner = r"""
\ | _) __ ) |
|\/ | | | __| | __| __ \ _ \ __|
| | | | \__ \ | ( _____| | | ( | |
_| _| \__,_| ____/ _| \___| ____/ \___/ \__|
"""

# Version information printed on startup.
version = "0.1"
branch = "dev"

# Markdown prefixes prepended to the bot's chat replies.
success = "**Success ✅**\n"
warning = "**Warning **\n"
error = "**Error ❗️**\n"

# Directory used in the yt-dlp output template and emptied by the shutdown command.
cache_dir = "cache"

logging.basicConfig(level=logging.INFO)
# Command-line interface. argparse supplies the defaults and converts --cores,
# so a non-numeric value produces a usage error instead of a ValueError traceback.
parser = argparse.ArgumentParser()
parser.add_argument("-t", "--token", help="Bot TOKEN")
parser.add_argument(
    "-c", "--cores", type=int, default=2,
    help="Number of Cores for ffmpeg (Standard 2)",
)
parser.add_argument(
    "-p", "--prefix", default="<",
    help="Provides the Prefix for the bot (Standard <)",
)
args = parser.parse_args()

# Thread count passed to ffmpeg through "-threads".
threads = args.cores
# Prefix for the bot's text commands.
b_prefix = args.prefix
# Without a token only a usage hint is printed.
if args.token is None:
print(f"{banner}\n\nPLEASE PROVIDE BOT A TOKEN BY RUNNING LIKE THE FOLLOWING:")
print("\n")
print(">>> python3 Music-Bot.py -t TOKEN <<<")
else:
# NOTE(review): confirm how much of the setup below is meant to run only when a token was supplied.
# Suppress noise about console usage from errors
yt_dlp.utils.bug_reports_message = lambda: ""
# Options for the shared yt-dlp instance created at the end of this section.
ytdl_format_options = {
"format": "bestaudio/mp3",
# Files are written into cache_dir with a fixed ".mp3" suffix.
"outtmpl": f"{cache_dir}/%(extractor)s-%(id)s-%(title)s.mp3",
"restrictfilenames": True,
"noplaylist": True,
"nocheckcertificate": True,
"ignoreerrors": False,
"logtostderr": False,
"quiet": True,
"no_warnings": True,
"default_search": "auto",
"source_address": "0.0.0.0", # Bind to ipv4 since ipv6 addresses cause issues at certain times
# Post-processing step that extracts the audio track as mp3.
"postprocessors": [{
'key': 'FFmpegExtractAudio',
'preferredcodec': 'mp3',
# NOTE(review): '1080' looks like a video resolution; for audio extraction this is presumably a quality/bitrate value - confirm the intended setting.
'preferredquality': '1080',
}],
}
# Output options for ffmpeg ("-vn": no video), read when an audio source is created.
ffmpeg_options = {"options": "-vn"}
# Single yt-dlp instance used for every extraction.
ytdl = yt_dlp.YoutubeDL(ytdl_format_options)
class YTDLSource(discord.PCMVolumeTransformer):
    """Volume-adjustable audio source built from a yt-dlp extraction result.

    ``data`` is the info dict returned by yt-dlp; ``title`` and ``url`` are
    copied from it for convenient access.
    """

    def __init__(self, source: discord.AudioSource, *, data: dict, volume: float = 0.5):
        super().__init__(source, volume)
        self.data = data
        self.title = data.get("title")
        self.url = data.get("url")

    @classmethod
    async def from_url(cls, url, *, loop=None, stream=False):
        """Resolve ``url`` with yt-dlp and wrap the result in an ffmpeg source.

        Args:
            url: URL (or search term) handed to ``ytdl.extract_info``.
            loop: Event loop whose default executor runs the blocking
                extraction; defaults to the currently running loop.
            stream: When true nothing is downloaded and ffmpeg reads the
                media URL reported by yt-dlp; otherwise the download happens
                first and ffmpeg reads the prepared local file name.

        Returns:
            A new ``YTDLSource`` for the (first) extracted entry.
        """
        # from_url is a coroutine, so a running loop always exists here.
        loop = loop or asyncio.get_running_loop()
        data = await loop.run_in_executor(
            None, lambda: ytdl.extract_info(url, download=not stream)
        )
        if "entries" in data:
            # Takes the first item from a playlist
            data = data["entries"][0]
        filename = data["url"] if stream else ytdl.prepare_filename(data)
        audio = discord.FFmpegPCMAudio(
            source=filename,
            executable="ffmpeg",
            pipe=False,
            # NOTE(review): the documented type for stderr is a file object or None - confirm False is intended.
            stderr=False,
            before_options=f"-threads {threads}",
            # ``options`` takes the option *string*; passing the whole
            # ffmpeg_options dict meant "-vn" never reached ffmpeg.
            options=ffmpeg_options["options"],
        )
        return cls(audio, data=data)
async def youtube_search(search: str):
    """Return the watch URL of the first video link on YouTube's result page.

    Args:
        search: Free-text search query.

    Returns:
        A ``https://www.youtube.com/watch?v=<id>`` URL.

    Raises:
        commands.CommandError: If the result page contains no video link.
    """
    params = {"search_query": search}
    # Spoof a user agent header or the request will immediately fail
    headers = {"User-Agent": "Mozilla/5.0"}
    async with aiohttp.ClientSession() as client:
        async with client.get(
            "https://www.youtube.com/results", params=params, headers=headers
        ) as resp:
            dom = await resp.text()
    # Only accept the 11-character ID alphabet so trailing markup cannot be
    # captured, and stop at the first hit instead of collecting all of them.
    match = re.search(r"watch\?v=([A-Za-z0-9_-]{11})", dom)
    if match is None:
        # Previously this surfaced as a bare IndexError without any context.
        raise commands.CommandError(f"No YouTube results found for `{search}`")
    return f"https://www.youtube.com/watch?v={match.group(1)}"
# Default intents with the message-content intent switched on.
intents = discord.Intents.default()
intents.message_content = True

# The bot reacts to mentions and to the configured prefix; the built-in help
# command is disabled.
bot = commands.Bot(
    intents=intents,
    help_command=None,
    description="Relatively simple music bot example",
    command_prefix=commands.when_mentioned_or(b_prefix),
)
@bot.event
async def on_ready():
    """Print the banner, version, identity, settings and invite link to stdout."""
    print(banner)
    print(f"Bot Version: {version} ({branch})")
    me = bot.user
    client_id = me.id
    print(f"Bot: {me} (ID: {client_id})")
    print(f"Threads for ffmpeg: {threads}")
    print(f"Prefix: {b_prefix}")
    # Invite URL assembled from the bot's own ID and a fixed permission set.
    invite_url = (
        "https://discord.com/api/oauth2/authorize"
        f"?client_id={bot.user.id}"
        "&permissions=968552344896&scope=bot%20applications.commands"
    )
    print(f"Invitation LINK: {invite_url}")
    print("------")
@bot.command(name="shutdown")
@commands.is_owner()
async def _shutdown(ctx):
    """Owner-only command: empty the download cache, report it and close the bot."""
    removed = 0
    async with ctx.channel.typing():
        # The cache directory may not exist (e.g. nothing was downloaded yet);
        # listing it unconditionally would abort the shutdown with FileNotFoundError.
        if os.path.isdir(cache_dir):
            for entry in os.listdir(cache_dir):
                path = os.path.join(cache_dir, entry)
                # os.remove cannot delete directories, so only plain files are touched.
                if os.path.isfile(path):
                    os.remove(path)
                    removed += 1
    await ctx.send(f"{warning}Cleaned up {removed} Files...\n\n{success}Shutting down...")
    await bot.close()
@bot.slash_command(name="join", description="Summon the bot into your channel")
async def join(ctx: commands.Context):
    """Connect to the invoking user's voice channel, or move there if already connected."""
    voice_state = ctx.author.voice
    # ``voice`` is None while the user is in no voice channel; the old code
    # dereferenced it first and never answered the interaction in that case.
    if voice_state is None or voice_state.channel is None:
        return await ctx.respond(f"{error}You are not in a voice-channel! Could not join...")
    channel = voice_state.channel
    # Connecting can take longer than the interaction's initial response window.
    await ctx.defer()
    try:
        if ctx.voice_client is not None:
            await ctx.voice_client.move_to(channel)
        else:
            await channel.connect()
    except Exception as err:
        # Keep the best-effort behaviour, but log the traceback and tell the user.
        logging.exception("Could not join voice channel %s", channel)
        return await ctx.respond(f"{error}Could not join `{channel.name}`: {err}")
    # Report success only after the connection was actually established.
    await ctx.respond(f"{success}Connected to `{channel.name}`")
# The keyword was misspelled "descriprion", so the description was never passed under its proper name.
@bot.slash_command(name="nowplaying", description="Show details what the bot is currently playing")
async def nowplaying(ctx: commands.Context):
    """Reply with an embed describing the current track, or an error embed if there is none."""
    async with ctx.channel.typing():
        voice_client = ctx.voice_client
        # A connected client has no source while nothing is playing.
        source = voice_client.source if voice_client is not None else None
        if source is None:
            embed = Embed(title=f"{error}Currently is no music playing!", color=discord.Color.red())
        else:
            video_id = source.data['id']
            embed = Embed(title=f"⏯ Now playing", color=discord.Color.blue())
            embed.add_field(name="🎵 Title", value=source.title, inline=False)
            embed.add_field(name="🔗 Link",
                            value=f"https://youtube.com/watch?v={video_id}", inline=False)
            embed.add_field(name="🔈 Volume",
                            value=f"Current setting `{int(source.volume * 100)}%`", inline=False)
            # display_avatar also works for bots without a custom avatar,
            # where ``avatar`` is None.
            embed.set_author(name=bot.user.name, icon_url=bot.user.display_avatar.url)
            embed.set_image(url=f"https://i3.ytimg.com/vi/{video_id}/maxresdefault.jpg")
            # bot.latency is measured in seconds; convert before labelling it "ms".
            embed.set_footer(text=f"{round(bot.latency * 1000)}ms Latency 🚀")
    await ctx.respond(embed=embed)
@bot.slash_command(name="play", description="Plays a song from YouTube [With preloading]")
async def play(ctx: commands.Context, url: str):
    """Download ``url``, play it in the voice channel and disconnect afterwards.

    Raises:
        commands.CommandError: If Discord rejects a request while starting playback.
    """
    async with ctx.typing():
        await ctx.respond("🤖 Your song is queued for download... please wait ", ephemeral=True)
        try:
            player = await YTDLSource.from_url(url, loop=bot.loop)
            ctx.voice_client.play(
                player, after=lambda e: print(f"Player error: {e}") if e else None
            )
        except discord.HTTPException as err:
            raise commands.CommandError(err) from err
        # Announce only after playback started. This used to sit in a
        # ``finally`` block, where a failed download left ``player`` unbound
        # and replaced the real error with an UnboundLocalError.
        await ctx.send(f"{success}Now playing: `{player.title}`\nRequested by {ctx.author.mention}")
    # Wait for the track to finish. A paused player reports is_playing() as
    # False, so paused playback must keep the bot connected as well.
    voice_client = ctx.voice_client
    while voice_client is not None and (voice_client.is_playing() or voice_client.is_paused()):
        await asyncio.sleep(3)
        # Re-read: the client disappears when another command disconnects the bot.
        voice_client = ctx.voice_client
    if voice_client is not None:
        await voice_client.disconnect()
@bot.slash_command(name="stream", description="Streams a song from YouTube [Without preloading]")
async def stream(ctx: commands.Context, url: str):
    """Stream ``url`` without downloading it first and disconnect afterwards."""
    async with ctx.typing():
        player = await YTDLSource.from_url(url, loop=bot.loop, stream=True)
        ctx.voice_client.play(
            player, after=lambda e: print(f"Player error: {e}") if e else None
        )
    await ctx.respond(f"{success}Now playing: `{player.title}`\nRequested by {ctx.author.mention}")
    # Wait for the track to finish. A paused player reports is_playing() as
    # False, so paused playback must keep the bot connected as well.
    voice_client = ctx.voice_client
    while voice_client is not None and (voice_client.is_playing() or voice_client.is_paused()):
        await asyncio.sleep(3)
        # Re-read: the client disappears when another command disconnects the bot.
        voice_client = ctx.voice_client
    if voice_client is not None:
        await voice_client.disconnect()
@bot.slash_command(name="pause", description="Pauses the playback")
async def _pause(ctx: commands.Context):
    """Pause the current playback and announce who paused it."""
    # Same guard as the volume command: without a voice connection there is
    # nothing to pause and ``ctx.voice_client`` is None.
    if ctx.voice_client is None:
        return await ctx.respond(f"{error}Not connected to a voice channel.")
    ctx.voice_client.pause()
    await ctx.respond(f"{success}⏸️ {ctx.author} has paused the playback")
@bot.slash_command(name="resume", description="Resumes the playback")
async def _resume(ctx: commands.Context):
    """Resume paused playback and announce who resumed it."""
    # Same guard as the volume command: without a voice connection there is
    # nothing to resume and ``ctx.voice_client`` is None.
    if ctx.voice_client is None:
        return await ctx.respond(f"{error}Not connected to a voice channel.")
    ctx.voice_client.resume()
    await ctx.respond(f"{success}▶️ {ctx.author} has resumed the playback")
@bot.slash_command(name="volume", description="Sets the bot volume")
async def volume(ctx: commands.Context, volume: int):
    """Set the playback volume of the current source.

    Args:
        volume: New volume in percent (100 corresponds to a factor of 1.0).
    """
    voice_client = ctx.voice_client
    if voice_client is None:
        # Answer through the interaction; a plain channel message leaves the
        # slash command itself without a response.
        return await ctx.respond(f"{error}Not connected to a voice channel.")
    if voice_client.source is None:
        # Connected but idle: there is no source whose volume could be changed.
        return await ctx.respond(f"{error}Currently is no music playing!")
    voice_client.source.volume = volume / 100
    await ctx.respond(f"{success}Changed volume to `{volume}%`")
@bot.slash_command(name="stop", description="Stopps the playback and disconnects the Bot")
async def stop(ctx: commands.Context):
    """Announce the stop and force-disconnect from the voice channel."""
    voice_client = ctx.voice_client
    # Without a voice connection the attribute access below would raise.
    if voice_client is None:
        return await ctx.respond(f"{error}Not connected to a voice channel.")
    await ctx.respond(f"{success}Stopping playback in `{voice_client.channel.name}`")
    await voice_client.disconnect(force=True)
@bot.slash_command(name="youtube", description="Searches YouTube for a given string and plays the first result")
async def youtube(ctx, *, search: str):
    """Search YouTube for ``search``, play the first hit and disconnect afterwards.

    Raises:
        commands.CommandError: If Discord rejects a request while starting playback.
    """
    async with ctx.typing():
        # Acknowledge the interaction before the search request and download start.
        await ctx.respond("🤖 Your song is queued for download... please wait ", ephemeral=True)
        url = await youtube_search(search=search)
        try:
            player = await YTDLSource.from_url(url, loop=bot.loop)
            ctx.voice_client.play(
                player, after=lambda e: print(f"Player error: {e}") if e else None
            )
        except discord.HTTPException as err:
            raise commands.CommandError(err) from err
        # Announce only after playback started. This used to sit in a
        # ``finally`` block, where a failed download left ``player`` unbound
        # and replaced the real error with an UnboundLocalError.
        await ctx.send(f"{success}Now playing: `{player.title}`\nRequested by {ctx.author.mention}")
    # Wait for the track to finish. A paused player reports is_playing() as
    # False, so paused playback must keep the bot connected as well.
    voice_client = ctx.voice_client
    while voice_client is not None and (voice_client.is_playing() or voice_client.is_paused()):
        await asyncio.sleep(3)
        # Re-read: the client disappears when another command disconnects the bot.
        voice_client = ctx.voice_client
    if voice_client is not None:
        await voice_client.disconnect()
@play.before_invoke
@stream.before_invoke
@youtube.before_invoke
async def ensure_voice(ctx: commands.Context):
    """Hook run before play/stream/youtube: ensure a voice connection exists.

    Stops current playback when already connected; otherwise joins the
    author's voice channel.

    Raises:
        commands.CommandError: If the bot is not connected and the author is
            not in a voice channel.
    """
    current = ctx.voice_client
    if current is not None:
        # Already connected: make room for the new track.
        if current.is_playing():
            current.stop()
        return
    if not ctx.author.voice:
        raise commands.CommandError("Author not connected to a voice channel.")
    await ctx.author.voice.channel.connect()
@bot.event
async def on_application_command_error(ctx, error):
    """Log an application-command failure and answer with a red embed.

    Args:
        ctx: Context of the failed command.
        error: The exception that was raised. Inside this handler the name
            shadows the module-level ``error`` prefix string.
    """
    print(f"[on_application_command_error]\n{ctx.author}\n{error}")
    # The cooldown check has to come before the CommandError check:
    # CommandOnCooldown derives from CommandError, so in the old order this
    # branch could never be reached.
    if isinstance(error, commands.CommandOnCooldown):
        td = timedelta(seconds=int(error.retry_after))
        embed = Embed(title=f"{warning}Dieser Command befindet sich im Cool Down!\n Versuche es in `{td}` nochmal!",
                      color=15158332)
    elif isinstance(error, commands.CommandError):
        # Also covers failed owner checks (commands.NotOwner). The former
        # separate branch passed the ``is_owner()`` decorator to isinstance,
        # which raises TypeError, and would have built this same embed.
        embed = Embed(title=f"{warning}{error}",
                      color=15158332)
    else:
        embed = Embed(title=f"{error}", color=15158332)
    await ctx.respond(embed=embed)
@bot.event
async def on_command_error(ctx, error):
    """Handle the on_command_error event: log the failure and send it as an embed."""
    author = ctx.author
    print(f"[on_command_error]\n{author}\n{error}")
    failure_embed = Embed(color=15158332, title=f"{error}")
    await ctx.send(embed=failure_embed)
# Start the bot with the token taken from the command line.
# NOTE(review): presumably this should only run when a token was supplied - confirm against the token check above.
bot.run(token=args.token, reconnect=True)