Source code for discodo.client.node

import asyncio
import json
import logging
import os
import secrets
import sys

import aiohttp
from websockets import ConnectionClosed

from .. import __dirname
from ..errors import NodeNotConnected, VoiceClientNotFound
from ..utils import EventDispatcher, tcp
from .gateway import NodeConnection
from .models import ensureQueueObjectType
from .voice_client import VoiceClient

log = logging.getLogger("discodo.client.node")

LocalNodeProc = None


def getLocalNode():
    return LocalNodeProc


async def launchLocalNode(**options):
    global LocalNodeProc

    if LocalNodeProc and LocalNodeProc.returncode is not None:
        raise ValueError("LocalNode already launched.")

    options["HOST"] = "127.0.0.1"
    options["PORT"] = tcp.getFreePort()
    options["PASSWORD"] = secrets.token_hex()

    log.info(
        f"trying to spawn local node process on {options['HOST']}:{options['PORT']}"
    )

    os.environ["PYTHONPATH"] = os.path.dirname(__dirname)

    LocalNodeProc = await asyncio.create_subprocess_exec(
        sys.executable, "-m", "discodo", "--config-json", json.dumps(options)
    )

    LocalNodeProc.HOST = options["HOST"]
    LocalNodeProc.PORT = options["PORT"]
    LocalNodeProc.PASSWORD = options["PASSWORD"]

    if LocalNodeProc.returncode:
        log.debug(
            f"while launching local node process, returned {LocalNodeProc.returncode}"
        )

        raise SystemError("Cannot launch discodo subprocess.")

    loop = asyncio.get_event_loop()

    for _ in range(30):
        try:
            transport, _ = await loop.create_connection(
                asyncio.Protocol, host=LocalNodeProc.HOST, port=LocalNodeProc.PORT
            )
            transport.close()
        except (OSError, ConnectionRefusedError):
            await asyncio.sleep(1)
            continue

        return LocalNodeProc

    LocalNodeProc.kill()

    raise SystemError("Testing local node connection timed out.")


[docs]class Node: r"""Represents a discodo node connection. :var discodo.DPYClient client: The client which the node is binded. :var Optional ws: The websocket gateway the client is currently connected to. :var EventDispatcher dispatcher: The event dispatcher that the client dispatches events. :var asyncio.AbstractEventLoop loop: The event loop that the client uses for operation. :var str host: The host of the node to connect to. :var int port: The port of the node to connect to. :var str password: The password of the node to connect to. :var int user_id: This bot's ID :var Optional[int] shard_id: This bot's shard ID, could be ``None`` :var Optional[str] region: Region set when registering a node :var dict voiceClients: A dictionary consisting of pairs of guild IDs and voice clients.""" def __init__( self, client, host, port, user_id, shard_id=None, password="hellodiscodo", region=None, ): self.client = client self.ws = None self.session = aiohttp.ClientSession() self.dispatcher = EventDispatcher() self.dispatcher.onAny(self.onAnyEvent) self.loop = asyncio.get_event_loop() self.host = host self.port = port self.password = password self.user_id = user_id self.shard_id = shard_id self.region = region self._polling = None self.voiceClients = {} self.connected = asyncio.Event() def __del__(self): self.loop.call_soon_threadsafe( lambda: self.loop.create_task(self.session.close()) ) def __repr__(self) -> str: return ( f"<Node connected={self.is_connected} host={self.host} port={self.port}" + f" region={self.region} voiceClients={len(self.voiceClients)}>" ) @property def URL(self) -> str: r"""Represents the restful api url of the node. :rtype: str""" return f"http://{self.host}:{self.port}" @property def WS_URL(self) -> str: r"""Represents the websocket url of the node. :rtype: str""" return f"ws://{self.host}:{self.port}/ws" @property def is_connected(self) -> bool: r"""Represents whether the node is connected. :rtype: bool""" return self.connected.is_set() and self.ws and self.ws.is_connected
[docs] async def connect(self): r"""Connect to the node. :raises ValueError: The node is already connected.""" if self.connected.is_set(): raise ValueError("Node already connected") if self.ws and self.ws.is_connected: await self.ws.close() self.ws = await NodeConnection.connect(self) self.voiceClients = {} self.connected.set() if not self._polling or self._polling.done(): self._polling = self.loop.create_task(self.pollingWS()) await self.send( "IDENTIFY", {"user_id": self.user_id, "shard_id": self.shard_id} )
[docs] async def close(self): """ some action to do after disconnected from node """ ...
[docs] async def destroy(self): r"""Destroy the node and remove from the client.""" if self._polling and not self._polling.done(): self._polling.cancel() if self.ws and not self.ws.closed: await self.ws.close() self.ws = None self.connected.clear() self.voiceClients = {} if self in self.client.Nodes: self.client.Nodes.remove(self)
async def pollingWS(self) -> None: while True: try: Operation, Data = await self.ws.poll() except (asyncio.TimeoutError, ConnectionClosed): self.connected.clear() if self.ws: await self.ws.close() self.ws = None await self.close() self.voiceClients = {} return if Data and isinstance(Data, dict) and "guild_id" in Data: VC = self.getVC(Data["guild_id"], safe=True) if VC: Data = ensureQueueObjectType(VC, Data) VC.dispatcher.dispatch(Operation, Data) self.dispatcher.dispatch(Operation, Data)
[docs] async def send(self, op, data=None): """Send websocket payload to the node :param str op: Operation name of the payload :param Optional[dict] data: Operation data to send with :raises discodo.NodeNotConnected: The node is not connnected.""" if not self.ws: raise NodeNotConnected return await self.ws.sendJson({"op": op, "d": data})
async def onResumed(self, Data): for voice_client in self.voiceClients: voice_client.__del__() for guild_id, vc_data in Data["voice_clients"].items(): self.voiceClients[int(guild_id)] = VoiceClient( self, vc_data["id"], guild_id ) async def onAnyEvent(self, Operation, Data): if Operation == "RESUMED": await self.onResumed(Data) if Operation == "VC_CREATED": guild_id = int(Data["guild_id"]) self.voiceClients[guild_id] = VoiceClient(self, Data["id"], guild_id) if Operation == "VC_DESTROYED": guild_id = int(Data["guild_id"]) if guild_id in self.voiceClients: self.voiceClients[guild_id].__del__()
[docs] def getVC(self, guildID, safe=False): r"""Get a voice client from the guild. :param int guildID: guild ID from which to get the voice client. :param bool safe: Whether to raise an exception when the voice client cannot be found, defaults to ``False``. :raises discodo.VoiceClientNotFound: The voice client was not found and the ``safe`` value is ``False``. :rtype: discodo.VoiceClient""" if not int(guildID) in self.voiceClients and not safe: raise VoiceClientNotFound return self.voiceClients.get(int(guildID))
[docs] async def discordDispatch(self, payload): r"""Dispatch the discord payload to the node. .. note:: If you are using :py:class:`discodo.DPYClient`, you don't have to use this. :param dict payload: The event data from the discord.""" if not payload["t"] in [ "READY", "RESUME", "VOICE_STATE_UPDATE", "VOICE_SERVER_UPDATE", ]: return return await self.send("DISCORD_EVENT", payload)
[docs] async def getStatus(self): r"""Get status like cpu usage from the node with websocket. :rtype: dict""" await self.send("GET_STATUS") return await self.dispatcher.wait_for("STATUS", timeout=10.0)
[docs]class Nodes(list): r"""Represents a discodo node connection list. You can also use it like :py:class:`list`."""
[docs] async def connect(self): """Try to connect to all registered nodes. :rtype: list""" task_list: list = list(map(lambda Node: Node.connect(), self)) if not task_list: return [] Done, _ = await asyncio.wait(task_list, return_when="ALL_COMPLETED") return list(map(lambda task: task.result(), Done))
[docs] async def getStatus(self): """Try to get status from all registered nodes. :rtype: list""" def get_task(Item): if Item.is_connected: return Item.getStatus() task_list: list = list(filter(None, map(get_task, self))) if not task_list: return [] Done, _ = await asyncio.wait(task_list, return_when="ALL_COMPLETED") return list(map(lambda task: task.result(), Done))