Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 69 additions & 0 deletions dbus_objects/integration/jeepney.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,3 +240,72 @@ async def listen(self) -> None:
await self._handle_msg(msg)
except KeyboardInterrupt:
self._logger.info('exiting...')


class AsyncIODBusServer(_JeepneyServerBase):
def __init__(self, bus: str, name: str) -> None:
'''
Async DBus server built on top of Jeepney+asyncio

:param bus: DBus bus (hint: usually SESSION or SYSTEM)
:param name: DBus name
'''
super().__init__(bus, name)
self._logger = logging.getLogger(self.__class__.__name__)
# TODO: support signals
# self.emit_signal_callback = self.emit_signal

# We can't have an async __init__ method, so we use this as an alternative.
@classmethod
async def new(cls, bus: str, name: str) -> AsyncIODBusServer:
inst = cls(bus, name)
await inst._conn_start()
return inst

async def _conn_start(self) -> None:
'''
Start DBus connection
'''
import jeepney.io.asyncio

self._conn = await jeepney.io.asyncio.open_dbus_connection(self._bus)
async with jeepney.io.asyncio.DBusRouter(self._conn) as router:
bus_proxy = jeepney.io.asyncio.Proxy(jeepney.message_bus, router)
await bus_proxy.RequestName(self._name)

async def _handle_msg(self, msg: jeepney.Message) -> None:
'''
Handle message

:param msg: message to handle
'''
return_msg = self._jeepney_handle_msg(msg)
if return_msg:
await self._conn.send(return_msg)

async def emit_signal(self, signal: dbus_objects._DBusSignal, path: str, body: Any) -> None:
self._logger.debug(f'emitting signal: {signal.name} {body}')
await self._conn.send_message(self._get_signal_msg(signal, path, body))

async def close(self) -> None:
'''
Close the DBus connection
'''
await self._conn.close()

async def listen(self) -> None:
'''
Start listening and handling messages
'''
self._log_topology()
try:
while True:
try:
msg = await self._conn.receive()
except ConnectionResetError:
self._logger.debug('connection reset abruptly, restarting...')
await self._conn_start()
else:
await self._handle_msg(msg)
except KeyboardInterrupt:
self._logger.info('exiting...')
2 changes: 2 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,13 @@ jeepney =
jeepney >= 0.5
test =
pytest
pytest-asyncio
pytest-subtests
pytest-cov
pytest-trio
jeepney >= 0.5
trio
async-timeout
xmldiff
docs =
furo
Expand Down
65 changes: 65 additions & 0 deletions tests/test_jeepney_asyncio.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
# SPDX-License-Identifier: MIT

import asyncio
import async_timeout
import contextlib
import jeepney
import jeepney.io.asyncio
import pytest

from dbus_objects.integration.jeepney import AsyncIODBusServer


pytestmark = [
pytest.mark.asyncio,
]


@pytest.fixture()
async def jeepney_asyncio_server(obj, event_loop):
server = await AsyncIODBusServer.new(
bus='SESSION',
name='io.github.ffy00.dbus-objects.jeepney_asyncio_test',
)

server.register_object('/io/github/ffy00/dbus_objects/example', obj)

listen = event_loop.create_task(server.listen())
try:
yield

finally:
listen.cancel()
await server.close()
with contextlib.suppress(asyncio.CancelledError):
await listen


@pytest.fixture()
def jeepney_asyncio_client():
yield jeepney.DBusAddress(
'/io/github/ffy00/dbus_objects/example',
bus_name='io.github.ffy00.dbus-objects.jeepney_asyncio_test',
interface='com.example.object.ExampleObject',
)


@pytest.fixture()
async def jeepney_asyncio_router():
async with jeepney.io.asyncio.open_dbus_router(bus='SESSION') as router:
yield router


async def test_create_error():
with pytest.raises(jeepney.DBusErrorResponse):
await AsyncIODBusServer.new(bus='SESSION', name='org.freedesktop.DBus')


async def test_listen_asyncio(jeepney_asyncio_client, jeepney_asyncio_router, jeepney_asyncio_server):
msg = jeepney.new_method_call(jeepney_asyncio_client, 'Ping', '', tuple())

async with async_timeout.timeout(3):
reply = await jeepney_asyncio_router.send_and_get_reply(msg)

assert reply.header.message_type is jeepney.MessageType.method_return
assert reply.body[0] == 'Pong!'