Mit dem Commit b212b90 (10. Dezember 2025) wurde die gesamte Thread-basierte Implementierung durch asyncio ersetzt. Dieser Leitfaden hilft bestehenden Nutzern, ihre Integrationen und Skripte an die neue asynchrone API anzupassen.
- Höhere Performance – Asynchrone I/O-Operationen blockieren nicht den gesamten Prozess.
- Einfachere Integration – Moderne Python-Bibliotheken setzen zunehmend auf asyncio.
- Bessere Wartbarkeit – Klare Trennung von Aufgaben durch
async/await. - MQTT-Integration – Die neue MQTT-Bridge nutzt
aiomqtt, das nahtlos in asyncio‑Event‑Loops integriert ist.
Vorher (Thread-basiert):
from signalduino.controller import SignalduinoController
from signalduino.transport import SerialTransport
transport = SerialTransport(port="/dev/ttyUSB0")
controller = SignalduinoController(transport=transport)
controller.start() # Startet Reader- und Parser-Threads
controller.join() # Blockiert, bis Threads beendet sindNachher (asynchron):
import asyncio
from signalduino.controller import SignalduinoController
from signalduino.transport import SerialTransport
async def main():
transport = SerialTransport(port="/dev/ttyUSB0")
controller = SignalduinoController(transport=transport)
async with controller: # Asynchroner Kontextmanager
await controller.run() # Asynchrone Hauptschleife
asyncio.run(main())Alle Transporte (SerialTransport, TCPTransport) sind jetzt asynchrone Kontextmanager und bieten asynchrone Methoden:
await transport.aopen()statttransport.open()await transport.aclose()statttransport.close()await transport.readline()statttransport.readline()(blockierend)await transport.write_line(data)statttransport.write_line(data)
Der MqttPublisher ist jetzt vollständig asynchron und muss mit async with verwendet werden:
from signalduino.mqtt import MqttPublisher
from signalduino.types import DecodedMessage
async def example():
publisher = MqttPublisher()
async with publisher:
msg = DecodedMessage(...)
await publisher.publish(msg)Callback-Funktionen, die an den Controller übergeben werden (z.B. message_callback), müssen asynchron sein:
async def my_callback(message: DecodedMessage):
print(f"Received: {message.protocol_id}")
# Asynchrone Operationen erlaubt, z.B.:
# await database.store(message)
controller = SignalduinoController(
transport=transport,
message_callback=my_callback # ← async Funktion
)Die Ausführung von Befehlen (z.B. version, set) erfolgt asynchron über den Controller:
async with controller:
version = await controller.execute_command("version")
print(f"Firmware: {version}")Stellen Sie sicher, dass Sie die neueste Version des Projekts installiert haben:
cd PySignalduino
git pull
pip install -e . --upgradeDie neuen Abhängigkeiten (aiomqtt, pyserial-asyncio) werden automatisch installiert.
Wenn Sie ein eigenes Skript verwenden, das den Controller direkt instanziiert:
- Event‑Loop – Verwenden Sie
asyncio.run()als Einstiegspunkt. - Kontextmanager – Nutzen Sie
async with controller:stattcontroller.start()/controller.stop(). - Async/Await – Markieren Sie alle Funktionen, die auf den Controller zugreifen, mit
asyncund verwenden Sieawaitfür asynchrone Aufrufe.
Beispiel – Migration eines einfachen Skripts:
# ALT
def main():
transport = SerialTransport(...)
controller = SignalduinoController(transport)
controller.start()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
controller.stop()
# NEU
async def main():
transport = SerialTransport(...)
controller = SignalduinoController(transport)
async with controller:
# Hauptschleife: Controller.run() läuft intern
await controller.run(timeout=None)
if __name__ == "__main__":
asyncio.run(main())Suchen Sie nach Callback‑Definitionen (z.B. message_callback, command_callback) und machen Sie sie asynchron:
# ALT
def on_message(msg):
print(msg)
# NEU
async def on_message(msg):
print(msg)
# Falls Sie asynchrone Bibliotheken verwenden:
# await mqtt_client.publish(...)Falls Sie eigene Tests haben, die unittest oder pytest mit Thread‑Mocks verwenden, müssen Sie auf pytest‑asyncio und AsyncMock umstellen:
# ALT
with patch("signalduino.controller.SerialTransport") as MockTransport:
transport = MockTransport.return_value
transport.readline.return_value = "MS;..."
# NEU
@pytest.mark.asyncio
async def test_controller():
with patch("signalduino.controller.SerialTransport") as MockTransport:
transport = AsyncMock()
transport.readline.return_value = "MS;..."Vermeiden Sie blockierende Funktionen wie time.sleep() oder serial.Serial.read(). Verwenden Sie stattdessen:
await asyncio.sleep(1)statttime.sleep(1)await transport.readline()statttransport.readline()(blockierend)
Vergessene await‑Schlüsselwörter führen zu RuntimeWarning oder hängen das Programm auf. Achten Sie besonders auf:
await controller.run()await publisher.publish()await transport.write_line()
Wenn Sie Threads und asyncio mischen müssen (z.B. für Legacy‑Code), verwenden Sie asyncio.run_coroutine_threadsafe() oder loop.call_soon_threadsafe().
Wenn asyncio.Queue.get() in einer while True-Schleife ständig Elemente zurückgibt (z.B. bei hohem Nachrichtenaufkommen), kann die Co-Routine den Event-Loop dominieren, selbst wenn die schwere Arbeit in einem Thread-Pool ausgelagert wird. Dies führt zu hoher CPU-Auslastung und sporadischer Bearbeitung anderer Async-Tasks.
Lösung: Stellen Sie in schnell laufenden Verarbeitungsschleifen sicher, dass ein expliziter Yield-Punkt vorhanden ist, um anderen Tasks die Kontrolle zu übergeben.
# Falsch (potenzielle Busy-Loop bei vollem Buffer)
# while not self._stop_event.is_set():
# item = await queue.get()
# process_item(item) # Wenn schnell, dominiert diese Task
# Korrekt
while not self._stop_event.is_set():
try:
line = await self._raw_message_queue.get()
# ... Verarbeitung (kann await asyncio.to_thread enthalten) ...
# Sicherstellen, dass andere Tasks Zeit bekommen
await asyncio.sleep(0.01)
except Exception:
breakHier ein komplettes Beispiel, das einen einfachen MQTT‑Bridge‑Service migriert:
# ALT: Thread-basierter Bridge-Service
import time
from signalduino.controller import SignalduinoController
from signalduino.transport import SerialTransport
from signalduino.mqtt import MqttPublisher
def message_callback(msg):
publisher = MqttPublisher()
publisher.connect()
publisher.publish(msg)
publisher.disconnect()
def main():
transport = SerialTransport(port="/dev/ttyUSB0")
controller = SignalduinoController(
transport=transport,
message_callback=message_callback
)
controller.start()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
controller.stop()
# NEU: Asynchrone Version
import asyncio
from signalduino.controller import SignalduinoController
from signalduino.transport import SerialTransport
from signalduino.mqtt import MqttPublisher
async def message_callback(msg):
# Publisher ist jetzt asynchron und muss mit async with verwendet werden
publisher = MqttPublisher()
async with publisher:
await publisher.publish(msg)
async def main():
transport = SerialTransport(port="/dev/ttyUSB0")
controller = SignalduinoController(
transport=transport,
message_callback=message_callback
)
async with controller:
await controller.run()
if __name__ == "__main__":
asyncio.run(main())- Logging aktivieren – Setzen Sie
LOG_LEVEL=DEBUG, um detaillierte Informationen über asynchrone Operationen zu erhalten. - Tests als Referenz – Die Testdateien
tests/test_controller.pyundtests/test_mqtt.pyzeigen korrekte asynchrone Nutzung. - Issue melden – Falls Sie auf Probleme stoßen, öffnen Sie ein Issue im Repository.
Es gibt keine Rückwärtskompatibilität für die Thread‑API. Ältere Skripte, die controller.start() oder controller.stop() aufrufen, müssen angepasst werden.
Nach der Migration können Sie die neuen Features nutzen:
- MQTT‑Integration – Nutzen Sie den integrierten Publisher/Subscriber.
- Kompression – Aktivieren Sie die Payload‑Kompression für effizientere MQTT‑Nachrichten.
- Heartbeat – Überwachen Sie die Verbindung mit dem MQTT‑Heartbeat.
Weitere Informationen finden Sie in der Benutzerdokumentation und der MQTT‑Dokumentation.