Source code for cordy.client

from __future__ import annotations

from logging import getLogger
from typing import TYPE_CHECKING, Any, Callable, Sequence
from asyncio import get_running_loop

from .auth import Token
from .events import Emitter, Publisher
from .gateway import Sharder
from .http import HTTPSession
from .models import Intents

if TYPE_CHECKING:
    from .auth import StrOrToken
    from .events import CheckFn, CoroFn
    from .gateway import BaseSharder, Shard

__all__ = (
    "Client",
)

logger = getLogger("cordy.client")

[docs]class Client: """The discord API Gateway client. Attributes ---------- token : :class:`~cordy.auth.Token` The gateway authorization token. intents : :class:`~cordy.models.Intent` The gateway intents used by the client emitter : :class:`~cordy.events.Emitter` The client event emitter. You don't need to interact with this if you do not want custom events publisher : :class:`~cordy.events.Publisher` The event publisher. shard_ids : :class:`set`, (``set[int] | None``) The shard ids to launch. If :data:`None` then sharder automatically shards the client num_shard : :class:`int`, (``int | None``) The total number of shards the client has, this number includes shards which are not under this client. sharder : :class:`~cordy.gateway.BaseSharder`, (``BaseSharder[Shard]``) The gateway sharder for the client. Parameters ---------- token : :class:`str`, :class:`Token`, (``str | Token``) The token for the gateway. Is a string is provided then a bot token is built. This is a required argument. All following arguments are keyword only. intents : :class:`~cordy.models.Intent` This is an optional parameter. The gateway intents used by the client. If None, then value returned by :meth:`~cordy.models.Intent.default` is used. by default None. sharder_cls: type[:class:`~cordy.gateway.BaseSharder`] The sharder type to use. Must subclass the :class:`~cordy.gateway.BaseSharder` protocol. If :data:`None` then :class:`~cordy.gateway.Sharder` is used. By default :data:`None` num_shards : :class:`int` The total number of shards. If :data:`None` :attr:`.Client.sharder` provided value will be used. By default :data:`None` shard_ids : Sequence[:class:`int`] A sequence of shard ids that this client should launch. If not :data:`None` then ``num_shards`` parameter must be provided. If :data:`None` then shards are generated from ``num_shards`` or :attr:`.Client.sharder` provided value will be used. """ num_shards: int | None shard_ids: set[int] | None http: HTTPSession def __init__( self, token: StrOrToken, *, intents: Intents | None = None, sharder_cls: type[BaseSharder] = Sharder, num_shards: int | None = None, shard_ids: Sequence[int] | None = None ): self.intents = intents or Intents.default() self.token = token if isinstance(token, Token) else Token(token, bot=True) self.emitter = Emitter() self.publisher = Publisher(None) self.publisher.add(self.emitter) self.http = HTTPSession(self.token) self.sharder = sharder_cls(self, set(shard_ids) if shard_ids else None, num_shards) self._closed_cb: Callable | None = None self._closed: bool = False @property def shards(self) -> list[Shard]: "list[:class:`~cordy.gateway.Shard`] : All the shards under this client" return self.sharder.shards
[docs] def listen(self, name: str | None = None) -> Callable[[CoroFn], CoroFn]: """This method is used as a decorator. Add the decorated function as a listener for the specified event Parameters ---------- name : :class:`str` The name of the event. If :data:`None` then the name of the function is used, by default :data:`None`. Returns ------- Callable The decorator. """ def deco(func: CoroFn): self.publisher.subscribe(func, name = name or func.__name__.lower()) return func return deco
[docs] def add_listener(self, func: CoroFn, name: str | None = None) -> None: """Add a listener for the given event. Parameters ---------- func : Callable[..., Coroutine] The coroutine function to add as a listener. This is a required argument. name : :class:`str` The name of the event. If :data:`None` then the name of the function is used, by default :data:`None`. """ return self.publisher.subscribe(func, name = name or func.__name__.lower())
[docs] def remove_listener(self, func: CoroFn, name: str | None = None) -> None: """Remove a registered listener. If the listener or event is not found, then does nothing. Parameters ---------- func : Callable[..., Coroutine] The coroutine function which needs to be unsubscribed. This is a required argument. name : :class:`str` The name of the event. If :data:`None` then the name of the function is used, by default :data:`None`. """ return self.publisher.unsubscribe(func, name)
[docs] async def wait_for(self, name: str, timeout: int | None = None, check: CheckFn | None = None) -> tuple[Any, ...]: """Wait for an event to occur. Parameters ---------- name : :class:`str` The name of the event to wait for. timeout : :class:`int` The time to wait for the event to occur, if :data:`None` then wait indefinetly, by default :data:`None` check : Callable[..., :class:`bool`] The check function, a subroutine returning a boolean, the provided arguments are the same as the returned :class:`tuple`. If the check function returns :data:`True` then the same data that was given to the check function will be returned, or else continue waiting for the event. If :data:`None` then return the first event data, by default :data:`None` Returns ------- tuple[Any, ...] The data associated with the event. This is the same as the arguments received by listeners for the event. """ return await self.publisher.wait_for(name, timeout, check)
[docs] async def setup(self) -> None: """Initialise client with the current running event loop. This is implicity called when the gateway is launched, otherwise this needs to be called explicitly with a running loop. """ try: loop = get_running_loop() except RuntimeError as err: raise Exception("Ran Coroutine without a running aysncio event loop") from err else: if loop is not self.http.session._loop: self.http = HTTPSession(self.token)
[docs] async def connect(self) -> None: """Launch all shards and connect to gateway. """ if self._closed: raise ValueError("Can't connect with a closed client.") await self.setup() await self.sharder.launch_shards()
[docs] async def disconnect(self, *, code: int = 4000, message: str = "") -> None: """Disconnect all shards from the gateway Parameters ---------- code : :class:`int` Optional status code to close the gateway connection, by default 4000 message : :class:`str` A message to close the gateway connection, by default an empty string. """ if self._closed: return for shard in self.shards: await shard.disconnect(code=code, message=message) logger.info("Shards %s disconnected from gateway", self.sharder.shard_ids)
[docs] async def reconnect(self) -> None: """Reconnect all shards to the gateway """ if self._closed: raise ValueError("Can't reconnect with a closed client.") for shard in self.shards: await shard.reconnect() logger.info("Shards %s reconnected to gateway", self.sharder.shard_ids)
[docs] async def close(self) -> None: """Close the client permanently. """ if self._closed: return await self.disconnect(code=1000, message="Client Closed") await self.http.close() if self._closed_cb: from asyncio import get_running_loop get_running_loop().call_soon(self._closed_cb) self._closed_cb = None logger.info("Client closed") # add user info here