# Source code for discodo.client.voice_client

import asyncio
import copy

from ..enums import PlayerState
from ..errors import NodeException
from ..utils import EventDispatcher
from .http import HTTPClient
from .models import Queue


class VoiceClient:
    """Represents a voice connection of the guild.

    :var discodo.Node Node: The node which the connection is connected with.
    :var discodo.DPYClient client: The client which the connection is bound to.
    :var asyncio.AbstractEventLoop loop: The event loop that the client uses for operation.
    :var str id: The id of the voice client, which is used on restful api.
    :var int guild_id: The guild id which is connected to.
    :var Optional[int] channel_id: The channel id which is connected to.
    :var EventDispatcher dispatcher: The event dispatcher that the client dispatches events.
    :var list Queue: The queue of the guild, it is synced to node and readonly."""

    def __init__(self, Node, id, guild_id):
        self.Node = Node
        self.client = Node.client
        self.loop = Node.loop

        self.id = id
        self.guild_id = guild_id
        self.channel_id = None

        self.http = HTTPClient(self)

        self.dispatcher = EventDispatcher()
        # Keep ``channel_id`` in step with channel change events.
        self.dispatcher.on(
            "VC_CHANNEL_EDITED",
            lambda data: setattr(self, "channel_id", data["channel_id"]),
        )

        # Locally cached player state; overwritten by ``handleGetState``.
        self._state = None
        self._volume = 1.0
        self._crossfade = 10.0
        self._autoplay = True
        self._filter = {}
        self._context = {}
        self._current = None

        self.dispatcher.on("getState", self.handleGetState)
        # Background task that requests the state every five seconds.
        self.syncTask = self.loop.create_task(self.syncWithNode())

        self.Queue = Queue(self)
        self.dispatcher.on("getQueue", self.Queue.handleGetQueue)
        self.dispatcher.on("QUEUE_EVENT", self.Queue.handleQueueEvent)

        # Request the initial queue contents.
        self.loop.create_task(self.send("getQueue", {}))

    def __repr__(self) -> str:
        return (
            f"<VoiceClient id={self.id} guild_id={self.guild_id} "
            f"channel_id={self.channel_id} Node={self.Node}>"
        )

    def __del__(self):
        # ``getattr`` guards: ``__del__`` also runs on instances whose
        # ``__init__`` did not finish, where these attributes are missing.
        sync_task = getattr(self, "syncTask", None)
        if sync_task and not sync_task.done():
            sync_task.cancel()

        node = getattr(self, "Node", None)
        if node is None:
            return

        # Only unregister the entry if it still points at this instance.
        guild_id = getattr(self, "guild_id", None)
        if node.voiceClients.get(guild_id) is self:
            node.voiceClients.pop(guild_id, None)

    async def syncWithNode(self):
        """Request the player state from the node every five seconds, forever."""
        while True:
            await self.send("getState", {})
            await asyncio.sleep(5.0)

    @property
    def volume(self):
        """Represents the volume of this guild. The maximum value is ``2.0``

        .. note::
            This value is a percentage with decimal points.
            For example, ``100%`` is ``1.0``, and ``5%`` is ``0.05``.

        :rtype: float"""
        return self._volume

    @property
    def crossfade(self):
        """Represents the crossfade duration of this guild.

        :rtype: float"""
        return self._crossfade

    @property
    def autoplay(self):
        """Represents the autoplay state of this guild.

        :rtype: bool"""
        return self._autoplay

    @property
    def filter(self):
        """Represents the filter object of this guild.

        A shallow copy is returned, so rebinding keys on the result does not
        change the cached value.

        .. note::
            For more information, check the documentation of the `FFmpeg Filter`_

        .. _`FFmpeg Filter`: https://ffmpeg.org/ffmpeg-filters.html

        :rtype: dict"""
        return copy.copy(self._filter)

    @property
    def current(self):
        """The cached current source, or ``None`` when there is none."""
        return self._current

    @property
    def duration(self):
        """``duration`` of the current source, or ``None`` when there is none."""
        return self._current.duration if self._current else None

    @property
    def position(self):
        """``position`` of the current source, or ``None`` when there is none."""
        return self._current.position if self._current else None

    @property
    def remain(self):
        """``duration - position`` of the current source rounded to two
        decimals, or ``None`` when there is no current source."""
        if not self._current:
            # Previously ``round(None, 2)`` was evaluated here and raised
            # ``TypeError``; mirror ``duration``/``position`` instead.
            return None

        return round(self._current.duration - self._current.position, 2)

    @property
    def context(self):
        """The cached context object."""
        return self._context

    @property
    def state(self):
        """The cached :class:`PlayerState`, or ``None`` before the first sync."""
        return self._state

    def handleGetState(self, data):
        """Update every cached field from a ``getState`` event payload.

        :param dict data: The payload; ``options``, ``current``, ``context``,
            ``state`` and ``channel_id`` keys are read."""
        options = data["options"]

        self._volume = options["volume"]
        self._crossfade = options["crossfade"]
        self._autoplay = options["autoplay"]
        self._filter = options["filter"]

        self._current = data["current"]
        self._context = data["context"]
        self._state = PlayerState(data["state"])

        self.channel_id = data["channel_id"]

    async def send(self, op, data):
        """Send websocket payload to the node with guild id

        A copy of ``data`` is sent, so the caller's dict is left untouched.

        :param str op: Operation name of the payload
        :param Optional[dict] data: Operation data to send with"""
        payload = dict(data) if data else {}
        payload["guild_id"] = str(self.guild_id)

        return await self.Node.send(op, payload)

    async def query(self, op, data=None, event=None, timeout=10.0):
        """Send websocket payload to the node with guild id and await response.

        :param str op: Operation name of the payload
        :param Optional[dict] data: Operation data to send with
        :param Optional[str] event: Event name to receive response, defaults to ``op``
        :param Optional[float] timeout: Seconds to wait for response

        :raise asyncio.TimeoutError: The query is timed out.
        :raise discodo.NodeException: The node returned some exceptions.

        :rtype: Any"""
        if not event:
            event = op
        if not data:
            data = {}

        # The waiter is scheduled before the payload is sent.
        waiter = self.loop.create_task(
            self.dispatcher.wait_for(
                event,
                condition=lambda d: d["guild_id"] == str(self.guild_id),
                timeout=timeout,
            )
        )

        try:
            await self.send(op, data)
        except BaseException:
            # Nothing will await the waiter any more; do not leave it pending.
            waiter.cancel()
            raise

        response = await waiter

        traceback = response.get("traceback")
        if traceback:
            # The first ``(key, value)`` pair becomes the exception arguments.
            raise NodeException(*next(iter(traceback.items())))

        return response

    async def fetchContext(self):
        """Fetch the context from the node.

        :rtype: dict"""
        self._context = await self.http.getVCContext()
        return self.context

    async def setContext(self, data):
        """Set the context to the node.

        :param dict data: The context to set.

        :rtype: dict"""
        self._context = await self.http.setVCContext(data)
        return self.context

    async def getSource(self, query):
        """Search the query and get source from extractor

        :param str query: The query to search.

        :rtype: AudioData"""
        data = await self.http.getSource(query)
        return data["source"]

    async def searchSources(self, query):
        """Search the query and get sources from extractor

        :param str query: The query to search.

        :rtype: list[AudioData]"""
        data = await self.http.searchSources(query)
        return data["sources"]

    async def putSource(self, source):
        """Put the source (or a list of sources) on the queue

        :param source: The source to put on the queue.
        :type source: AudioData or AudioSource or list

        :rtype: AudioData or AudioSource or list"""
        if isinstance(source, list):
            body = [item.toDict() for item in source]
        else:
            body = source.toDict()

        data = await self.http.putSource(body)
        return data["source"]

    async def loadSource(self, query):
        """Search the query and put source to the queue

        :param str query: The query to search.

        :rtype: AudioData or list"""
        data = await self.http.loadSource(query)
        return data["source"]

    async def skip(self, offset=1):
        """Skip the source

        :param int offset: how many to skip the sources"""
        return await self.http.skip(offset)

    async def seek(self, offset):
        """Seek the player, then refresh the cached state.

        :param float offset: The position to seek"""
        await self.http.seek(offset)
        await self.fetchState()

    async def getOptions(self):
        """Get options of the player

        :rtype: dict"""
        return await self.http.getOptions()

    async def setOptions(self, **options):
        """Set options of the player

        The cached values are updated before the request is sent.

        :param Optional[float] volume: The volume of the player to change.
        :param Optional[float] crossfade: The crossfade of the player to change.
        :param Optional[bool] autoplay: The autoplay state of the player to change.
        :param Optional[dict] filter: The filter object of the player to change.

        :rtype: dict"""
        for name in ("volume", "crossfade", "autoplay", "filter"):
            if name in options:
                setattr(self, f"_{name}", options[name])

        return await self.http.setOptions(options)

    async def setVolume(self, volume):
        """Set volume of the player

        :param float volume: The volume of the player to change.

        :rtype: dict"""
        return await self.setOptions(volume=volume)

    async def setCrossfade(self, crossfade):
        """Set crossfade of the player

        :param float crossfade: The crossfade of the player to change.

        :rtype: dict"""
        return await self.setOptions(crossfade=crossfade)

    async def setAutoplay(self, autoplay):
        """Set autoplay state of the player

        :param bool autoplay: The autoplay state of the player to change.

        :rtype: dict"""
        return await self.setOptions(autoplay=autoplay)

    async def setFilter(self, filter):
        """Set filter of the player

        :param dict filter: The filter object of the player to change.

        :rtype: dict"""
        return await self.setOptions(filter=filter)

    async def pause(self):
        """Pause the player, then refresh the cached state."""
        await self.http.pause()
        await self.fetchState()

    async def resume(self):
        """Resume the player, then refresh the cached state."""
        await self.http.resume()
        await self.fetchState()

    async def shuffle(self):
        """Shuffle the queue

        :rtype: list[AudioData or AudioSource]"""
        data = await self.http.shuffle()
        self.Queue.handleGetQueue(data)
        return self.Queue

    async def getCurrent(self):
        """Fetch current playing source

        :rtype: AudioSource"""
        self._current = await self.http.getCurrent()
        return self._current

    async def fetchState(self):
        """Fetch current player state.

        :rtype: dict"""
        return await self.query("getState")

    async def fetchQueue(self, ws=True):
        """Fetch queue to force refresh the internal queue.

        :param Optional[bool] ws: Whether to request queue on websocket or not.

        :rtype: list[AudioData or AudioSource]"""
        if ws:
            await self.query("getQueue")
        else:
            self.Queue.handleGetQueue(await self.http.queue())

        return self.Queue

    async def requestSubtitle(self, lang=None, url=None):
        """Request to send synced subtitle to discodo node.

        One of the parameters is required. When both are given, only ``url``
        is sent.

        :param Optional[str] lang: The language to get subtitle.
        :param Optional[str] url: The subtitle url to fetch.

        :raise ValueError: Neither ``lang`` nor ``url`` is given.

        :rtype: dict"""
        if not (lang or url):
            raise ValueError("Either `lang` or `url` is needed.")

        payload = {"url": url} if url else {"lang": lang}

        return await self.query("requestSubtitle", payload)

    async def getSubtitle(self, *args, callback, **kwargs):
        """Request to send synced subtitle to discodo node and handle event to callback function.

        ``lang`` or ``url`` is required.

        :param callable callback: The callback function on subtitle event, must be coroutine function.
        :param Optional[str] lang: The language to get subtitle.
        :param Optional[str] url: The subtitle url to fetch.

        :raise ValueError: ``callback`` is not a coroutine function, or the
            response carries no ``identify`` token.

        :rtype: dict"""
        if not asyncio.iscoroutinefunction(callback):
            raise ValueError("Callback function must be coroutine function.")

        Data = await self.requestSubtitle(*args, **kwargs)

        identify_token = Data.get("identify")
        if not identify_token:
            raise ValueError("Subtitle not found.")

        _lyricsLock = asyncio.Lock()

        async def lyricsRecieve(lyrics):
            # Events for other requests, and events that arrive while the
            # callback is still running, are dropped rather than queued.
            if lyrics["identify"] != identify_token or _lyricsLock.locked():
                return

            async with _lyricsLock:
                await callback(lyrics)

        async def lyricsDone(Data):
            if Data["identify"] != identify_token:
                return

            # This request is finished: detach both handlers.
            self.dispatcher.off("Subtitle", lyricsRecieve)
            self.dispatcher.off("subtitleDone", lyricsDone)

        self.dispatcher.on("Subtitle", lyricsRecieve)
        self.dispatcher.on("subtitleDone", lyricsDone)

        return Data

    async def moveTo(self, node):
        """Move the player's current Node.

        Connects to ``node`` and copies options, context, current source and
        queue onto the new voice client.

        :param discodo.Node node: The node to move to.

        :raise ValueError: Already connected to ``node``, or this voice client
            is not connected to a channel.

        :rtype: VoiceClient"""
        if node == self.Node:
            raise ValueError("Already connected to this node.")

        # ``int(None)`` would raise TypeError; report it as "not connected".
        if self.channel_id is None:
            raise ValueError("this voice client is not connected to the channel.")

        channel = self.client.client.get_channel(int(self.channel_id))
        if not channel:
            raise ValueError("this voice client is not connected to the channel.")

        # Refresh the cache so the values copied below are current.
        await self.fetchState()

        VC = await self.client.connect(channel, node)

        await VC.setOptions(
            volume=self.volume,
            crossfade=self.crossfade,
            autoplay=self.autoplay,
            filter=self.filter,
        )

        if self.context:
            await VC.setContext(self.context)

        if self.current:
            await VC.putSource(self.current)

        if self.Queue:
            await VC.putSource(self.Queue)

        return VC

    async def destroy(self):
        """Destroy the client"""
        return await self.query("VC_DESTROY", event="VC_DESTROYED")