Source code for discodo.utils.eventDispatcher

import asyncio
import collections
import functools
import traceback
from typing import Callable


[docs]class EventDispatcher: r"""Represents an event dispatcher similar to EventEmitter_ .. _EventEmitter: https://nodejs.org/api/events.html#events_class_eventemitter :var Optional[asyncio.AbstractEventLoop] loop: The event loop that the dispatcher uses for operation, defaults to :py:meth:`asyncio.get_event_loop`""" def __init__(self, loop: asyncio.AbstractEventLoop = None) -> None: self._Events = collections.defaultdict(list) self._listeners = collections.defaultdict(list) self._Any = [] self.loop = loop or asyncio.get_event_loop()
[docs] def on(self, event: str, func: Callable): """Adds the ``func`` function to the end of the listeners list of the ``event`` :param str event: The event name to listen to. :param Callable func: The function to call when event dispatching. :rtype: discodo.EventDispatcher""" self._Events[event].append(func) return self
[docs] def off(self, event: str, func: Callable): """Remove the ``func`` function from the listeners list of the ``event`` :param str event: The event name to remove from. :param Callable func: The function to remove from the list. :rtype: discodo.EventDispatcher""" self._Events[event].remove(func) return self
[docs] def onAny(self, func: Callable): """Adds the ``func`` function to the end of the listeners list :param Callable func: The function to call when event dispatching. :rtype: discodo.EventDispatcher""" self._Any.append(func) return self
[docs] def offAny(self, func: Callable): """Remove the ``func`` function from the listeners list :param Callable func: The function to remove from the list. :rtype: discodo.EventDispatcher""" self._Any.remove(func) return self
[docs] def dispatch(self, event_: str, *args, **kwargs) -> None: r"""Call the listeners which is matched with event name. :param str event_: The event name to dispatch. :param \*args: An argument list of data to dispatch with. :param \*kwargs: A keyword argument list of data to dispatch with.""" if self._Any: for func in self._Any: try: if asyncio.iscoroutinefunction(func): self.loop.create_task(func(event_, *args, **kwargs)) else: self.loop.call_soon_threadsafe( functools.partial(func, event_, *args, **kwargs) ) except: traceback.print_exc() listeners = self._listeners.get(event_) if listeners: for (Future, condition) in listeners: if not (Future.done() or Future.cancelled()): try: result = condition(*args, **kwargs) except Exception as exception: Future.set_exception(exception) result = True else: if result: Future.set_result( args if len(args) > 1 else (args[0] if args else None) ) if result: listeners.remove((Future, condition)) if not listeners: self._listeners.pop(event_) if not event_ in self._Events: return for func in self._Events[event_]: try: if asyncio.iscoroutinefunction(func): asyncio.run_coroutine_threadsafe(func(*args, **kwargs), self.loop) else: self.loop.call_soon_threadsafe( functools.partial(func, *args, **kwargs) ) except: traceback.print_exc()
[docs] def event(self, event: str): """A decorator that registers an event to listen to. :param str event: The event name to listen to.""" def wrapper(func): self.on(event, func) return wrapper
[docs] async def wait_for( self, event: str, condition: Callable = None, timeout: float = None ): """Waits for an event that is matching with ``condition`` to be dispatched for ``timeout`` :param str event: The event name to wait for :param Optional[Callable] condition: A predicate to check what to wait for. this function must return ``bool`` :param Optional[float] timeout: The number of seconds to wait. :raises asyncio.TimeoutError: The timeout is provided and it was reached. :rtype: Any""" Future = self.loop.create_future() if not condition: condition = lambda *_: True self._listeners[event].append((Future, condition)) return await asyncio.wait_for(Future, timeout)