Skip to content
2 changes: 1 addition & 1 deletion .github/workflows/python-pytest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ jobs:

strategy:
matrix:
python-version: ["3.9", "3.10", "3.11", "3.12", "3.13"]
python-version: ["3.8", "3.9", "3.10", "3.11", "3.12", "3.13"]
os: [ubuntu-latest, windows-latest]
runs-on: ${{ matrix.os }}

Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,4 @@ coverage.xml
# vim
*.sw?

/.vscode
48 changes: 19 additions & 29 deletions yaqd-core/yaqd_core/_is_daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,17 +123,7 @@ def _traits(cls) -> List[str]:

@classmethod
def main(cls):
"""Run the event loop."""
loop = asyncio.get_event_loop()
if sys.platform.startswith("win"):
signals = ()
else:
signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT)
for s in signals:
loop.add_signal_handler(
s, lambda s=s: asyncio.create_task(cls.shutdown_all(s, loop))
)

"""parse arguments and start the event loop"""
parser = argparse.ArgumentParser()
parser.add_argument(
"--config",
Expand Down Expand Up @@ -199,19 +189,26 @@ def main(cls):
with open(config_filepath, "rb") as f:
config_file = tomli.load(f)

loop.create_task(cls._main(config_filepath, config_file, args))
# Run the event loop
try:
loop.run_forever()
asyncio.run(cls._main(config_filepath, config_file, args))
except asyncio.CancelledError:
pass
finally:
loop.close()

@classmethod
async def _main(cls, config_filepath, config_file, args=None):
"""Parse command line arguments, start event loop tasks."""
"""Parse command line arguments, run event loop."""
loop = asyncio.get_running_loop()
cls.__servers = []
if sys.platform.startswith("win"):
signals = ()
else:
signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT)
for s in signals:
loop.add_signal_handler(
s, lambda s=s: asyncio.create_task(cls.shutdown_all(s, loop))
)

cls.__servers = set()
for section in config_file:
if section == "shared-settings":
continue
Expand All @@ -225,7 +222,7 @@ async def _main(cls, config_filepath, config_file, args=None):

while cls.__servers:
awaiting = cls.__servers
cls.__servers = []
cls.__servers = set()
await asyncio.wait(awaiting)
await asyncio.sleep(1)
loop.stop()
Expand All @@ -252,7 +249,9 @@ def server(daemon):
server(daemon), config.get("host", ""), config.get("port", None)
)
daemon._server = ser
cls.__servers.append(asyncio.create_task(ser.serve_forever()))
task = asyncio.create_task(ser.serve_forever())
cls.__servers.add(task)
task.add_done_callback(cls.__servers.discard)

@classmethod
def _parse_config(cls, config_file, section, args=None):
Expand Down Expand Up @@ -297,16 +296,7 @@ async def shutdown_all(cls, sig, loop):
# This is done after cancelling so that shutdown tasks which require the loop
# are not themselves cancelled.
[d.close() for d in cls._daemons]
tasks = [
t
for t in asyncio.all_tasks()
if (
t is not asyncio.current_task()
and "serve_forever" not in t.get_coro().__repr__()
)
]
for task in tasks:
logger.info(task.get_coro())
tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
await asyncio.gather(*tasks, return_exceptions=True)
[d._save_state() for d in cls._daemons]
if hasattr(signal, "SIGHUP") and sig == signal.SIGHUP:
Expand Down
2 changes: 1 addition & 1 deletion yaqd-core/yaqd_core/_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def connection_made(self, transport):
self.transport = transport
self.unpacker = avrorpc.Unpacker(self._avro_protocol)
self._daemon._connection_made(peername)
self.task = asyncio.get_event_loop().create_task(self.process_requests())
self.task = asyncio.get_running_loop().create_task(self.process_requests())

def data_received(self, data):
"""Process an incomming request."""
Expand Down
2 changes: 1 addition & 1 deletion yaqd-fakes/yaqd_fakes/_fake_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def __init__(self, name, config, config_filepath):
self._channel_generators[name] = random_walk(min_, max_)
else:
raise Exception(f"channel kind {kwargs['kind']} not recognized")
asyncio.get_event_loop().create_task(self._update_measurements())
self._loop.create_task(self._update_measurements())

async def _update_measurements(self):
while True:
Expand Down