Source code for cordy.runner
"""High level runners for cordy only apps
"""
from __future__ import annotations
import asyncio
import signal
import sys
from asyncio.runners import _cancel_all_tasks # type: ignore[attr-defined]
from functools import partial
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from collections.abc import Iterable
from .client import Client
__all__ = (
"run", "run_all", "launch", "launch_all"
)
def _set_fut(fut: asyncio.Future):
if not fut.done():
fut.set_result(True)
[docs]async def run(client: Client):
"""Start a client and await till it is stopped
Parameters
----------
client : :class:`cordy.client.Client`
The Client to be ran.
"""
loop = asyncio.get_running_loop()
fut = loop.create_future()
client._closed_cb = partial(_set_fut, fut)
await client.connect()
try:
await fut
finally:
await client.close()
[docs]async def run_all(clients: Iterable[Client]):
"""Start multiple clients and await till all have stopped.
Parameters
----------
clients : Iterable[:class:`cordy.client.Client`]
The iterable of clients to be ran.
"""
loop = asyncio.get_running_loop()
await asyncio.gather(*(loop.create_task(run(client)) for client in set(clients)))
[docs]def launch(client: Client):
"""Run the client until it is closed.
Cleanup due to unforseen cancellation is handled.
Parameters
----------
client : :class:`~cordy.client.Client`
The client to run.
"""
return run_loop(run(client))
[docs]def launch_all(clients: Iterable[Client]):
"""launch all the given clients util all clients are closed.
Cleanup for each client due to unforseen cancellation is handled.
Parameters
----------
clients : Iterable[:class:`~cordy.client.Client`]
All the clients to launch.
"""
return run_loop(run_all(clients))
# Proactor Transport require open event loop.
DEFAULT_CLOSE = not sys.platform.startswith("win")
def run_loop(coro, *, close: bool = DEFAULT_CLOSE, debug: bool | None = None):
try:
loop = asyncio.get_running_loop()
except RuntimeError:
pass
else:
# just schedule coro if running
asyncio.ensure_future(coro)
if loop.is_running():
return
policy = asyncio.get_event_loop_policy()
try:
loop = policy.get_event_loop()
except:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
if debug is not None:
loop.set_debug(debug)
def stop_loop(loop):
loop.stop()
try:
loop.add_signal_handler(signal.SIGINT, stop_loop, loop)
loop.add_signal_handler(signal.SIGTERM, stop_loop, loop)
except NotImplementedError:
pass
try:
loop.run_until_complete(coro)
except KeyboardInterrupt:
pass
finally:
try:
_cancel_all_tasks(loop)
loop.run_until_complete(loop.shutdown_asyncgens())
loop.run_until_complete(loop.shutdown_default_executor())
finally:
if close:
loop.close()