From aab9ff4d171d5c6b2117802fd3dbeb04428f6e29 Mon Sep 17 00:00:00 2001 From: Aleksandr Mezin Date: Mon, 12 Jul 2021 13:17:16 +0600 Subject: [PATCH] jeepney: add asyncio server --- dbus_objects/integration/jeepney.py | 69 +++++++++++++++++++++++++++++ setup.cfg | 2 + tests/test_jeepney_asyncio.py | 65 +++++++++++++++++++++++++++ 3 files changed, 136 insertions(+) create mode 100644 tests/test_jeepney_asyncio.py diff --git a/dbus_objects/integration/jeepney.py b/dbus_objects/integration/jeepney.py index 508c7a2..3d84acb 100644 --- a/dbus_objects/integration/jeepney.py +++ b/dbus_objects/integration/jeepney.py @@ -240,3 +240,72 @@ async def listen(self) -> None: await self._handle_msg(msg) except KeyboardInterrupt: self._logger.info('exiting...') + + +class AsyncIODBusServer(_JeepneyServerBase): + def __init__(self, bus: str, name: str) -> None: + ''' + Async DBus server built on top of Jeepney+asyncio + + :param bus: DBus bus (hint: usually SESSION or SYSTEM) + :param name: DBus name + ''' + super().__init__(bus, name) + self._logger = logging.getLogger(self.__class__.__name__) + # TODO: support signals + # self.emit_signal_callback = self.emit_signal + + # We can't have an async __init__ method, so we use this as an alternative. + @classmethod + async def new(cls, bus: str, name: str) -> AsyncIODBusServer: + inst = cls(bus, name) + await inst._conn_start() + return inst + + async def _conn_start(self) -> None: + ''' + Start DBus connection + ''' + import jeepney.io.asyncio + + self._conn = await jeepney.io.asyncio.open_dbus_connection(self._bus) + async with jeepney.io.asyncio.DBusRouter(self._conn) as router: + bus_proxy = jeepney.io.asyncio.Proxy(jeepney.message_bus, router) + await bus_proxy.RequestName(self._name) + + async def _handle_msg(self, msg: jeepney.Message) -> None: + ''' + Handle message + + :param msg: message to handle + ''' + return_msg = self._jeepney_handle_msg(msg) + if return_msg: + await self._conn.send(return_msg) + + async def emit_signal(self, signal: dbus_objects._DBusSignal, path: str, body: Any) -> None: + self._logger.debug(f'emitting signal: {signal.name} {body}') + await self._conn.send_message(self._get_signal_msg(signal, path, body)) + + async def close(self) -> None: + ''' + Close the DBus connection + ''' + await self._conn.close() + + async def listen(self) -> None: + ''' + Start listening and handling messages + ''' + self._log_topology() + try: + while True: + try: + msg = await self._conn.receive() + except ConnectionResetError: + self._logger.debug('connection reset abruptly, restarting...') + await self._conn_start() + else: + await self._handle_msg(msg) + except KeyboardInterrupt: + self._logger.info('exiting...') diff --git a/setup.cfg b/setup.cfg index ed74c72..6a8dd6b 100644 --- a/setup.cfg +++ b/setup.cfg @@ -33,11 +33,13 @@ jeepney = jeepney >= 0.5 test = pytest + pytest-asyncio pytest-subtests pytest-cov pytest-trio jeepney >= 0.5 trio + async-timeout xmldiff docs = furo diff --git a/tests/test_jeepney_asyncio.py b/tests/test_jeepney_asyncio.py new file mode 100644 index 0000000..b570a0a --- /dev/null +++ b/tests/test_jeepney_asyncio.py @@ -0,0 +1,65 @@ +# SPDX-License-Identifier: MIT + +import asyncio +import async_timeout +import contextlib +import jeepney +import jeepney.io.asyncio +import pytest + +from dbus_objects.integration.jeepney import AsyncIODBusServer + + +pytestmark = [ + pytest.mark.asyncio, +] + + +@pytest.fixture() +async def jeepney_asyncio_server(obj, event_loop): + server = await AsyncIODBusServer.new( + bus='SESSION', + name='io.github.ffy00.dbus-objects.jeepney_asyncio_test', + ) + + server.register_object('/io/github/ffy00/dbus_objects/example', obj) + + listen = event_loop.create_task(server.listen()) + try: + yield + + finally: + listen.cancel() + await server.close() + with contextlib.suppress(asyncio.CancelledError): + await listen + + +@pytest.fixture() +def jeepney_asyncio_client(): + yield jeepney.DBusAddress( + '/io/github/ffy00/dbus_objects/example', + bus_name='io.github.ffy00.dbus-objects.jeepney_asyncio_test', + interface='com.example.object.ExampleObject', + ) + + +@pytest.fixture() +async def jeepney_asyncio_router(): + async with jeepney.io.asyncio.open_dbus_router(bus='SESSION') as router: + yield router + + +async def test_create_error(): + with pytest.raises(jeepney.DBusErrorResponse): + await AsyncIODBusServer.new(bus='SESSION', name='org.freedesktop.DBus') + + +async def test_listen_asyncio(jeepney_asyncio_client, jeepney_asyncio_router, jeepney_asyncio_server): + msg = jeepney.new_method_call(jeepney_asyncio_client, 'Ping', '', tuple()) + + async with async_timeout.timeout(3): + reply = await jeepney_asyncio_router.send_and_get_reply(msg) + + assert reply.header.message_type is jeepney.MessageType.method_return + assert reply.body[0] == 'Pong!'