diff --git a/.github/workflows/python-pytest.yml b/.github/workflows/python-pytest.yml
index 7cc9631..a6c2c8d 100644
--- a/.github/workflows/python-pytest.yml
+++ b/.github/workflows/python-pytest.yml
@@ -10,7 +10,7 @@ jobs:
     strategy:
       matrix:
-        python-version: ["3.9", "3.10", "3.11", "3.12", "3.13"]
+        python-version: ["3.8", "3.9", "3.10", "3.11", "3.12", "3.13", "3.14"]
         os: [ubuntu-latest, windows-latest]
     runs-on: ${{ matrix.os }}
diff --git a/.gitignore b/.gitignore
index 90aa4fe..248920a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -52,3 +52,4 @@ coverage.xml
 
 # vim
 *.sw?
+/.vscode
diff --git a/yaqd-core/CHANGELOG.md b/yaqd-core/CHANGELOG.md
index 5f8d640..0eb919b 100644
--- a/yaqd-core/CHANGELOG.md
+++ b/yaqd-core/CHANGELOG.md
@@ -6,6 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/).
 ## [Unreleased]
 
 ### Fixed
+- removed asyncio syntax that was removed in python 3.14
 - type hints for IsSensor attributes are appropriate for _n_-dimensional data
 
 ## [2023.11.0]
diff --git a/yaqd-core/yaqd_core/_is_daemon.py b/yaqd-core/yaqd_core/_is_daemon.py
old mode 100755
new mode 100644
index c2940fe..0ba6cd7
--- a/yaqd-core/yaqd_core/_is_daemon.py
+++ b/yaqd-core/yaqd_core/_is_daemon.py
@@ -123,17 +123,7 @@ def _traits(cls) -> List[str]:
 
     @classmethod
     def main(cls):
-        """Run the event loop."""
-        loop = asyncio.get_event_loop()
-        if sys.platform.startswith("win"):
-            signals = ()
-        else:
-            signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT)
-        for s in signals:
-            loop.add_signal_handler(
-                s, lambda s=s: asyncio.create_task(cls.shutdown_all(s, loop))
-            )
-
+        """parse arguments and start the event loop"""
         parser = argparse.ArgumentParser()
         parser.add_argument(
             "--config",
@@ -199,19 +189,26 @@ def main(cls):
         with open(config_filepath, "rb") as f:
             config_file = tomli.load(f)
 
-        loop.create_task(cls._main(config_filepath, config_file, args))
+        # Run the event loop
         try:
-            loop.run_forever()
+            asyncio.run(cls._main(config_filepath, config_file, args))
         except asyncio.CancelledError:
             pass
-        finally:
-            loop.close()
 
     @classmethod
     async def _main(cls, config_filepath, config_file, args=None):
-        """Parse command line arguments, start event loop tasks."""
+        """Parse command line arguments, run event loop."""
         loop = asyncio.get_running_loop()
-        cls.__servers = []
+        if sys.platform.startswith("win"):
+            signals = ()
+        else:
+            signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT)
+        for s in signals:
+            loop.add_signal_handler(
+                s, lambda s=s: asyncio.create_task(cls.shutdown_all(s, loop))
+            )
+
+        cls.__servers = set()
         for section in config_file:
             if section == "shared-settings":
                 continue
@@ -225,7 +222,7 @@ async def _main(cls, config_filepath, config_file, args=None):
 
         while cls.__servers:
             awaiting = cls.__servers
-            cls.__servers = []
+            cls.__servers = set()
             await asyncio.wait(awaiting)
             await asyncio.sleep(1)
         loop.stop()
@@ -252,7 +249,9 @@ def server(daemon):
             server(daemon), config.get("host", ""), config.get("port", None)
         )
         daemon._server = ser
-        cls.__servers.append(asyncio.create_task(ser.serve_forever()))
+        task = asyncio.create_task(ser.serve_forever())
+        cls.__servers.add(task)
+        task.add_done_callback(cls.__servers.discard)
 
     @classmethod
     def _parse_config(cls, config_file, section, args=None):
@@ -297,16 +296,7 @@ async def shutdown_all(cls, sig, loop):
         # This is done after cancelling so that shutdown tasks which require the loop
         # are not themselves cancelled.
         [d.close() for d in cls._daemons]
-        tasks = [
-            t
-            for t in asyncio.all_tasks()
-            if (
-                t is not asyncio.current_task()
-                and "serve_forever" not in t.get_coro().__repr__()
-            )
-        ]
-        for task in tasks:
-            logger.info(task.get_coro())
+        tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
         await asyncio.gather(*tasks, return_exceptions=True)
         [d._save_state() for d in cls._daemons]
         if hasattr(signal, "SIGHUP") and sig == signal.SIGHUP:
diff --git a/yaqd-core/yaqd_core/_protocol.py b/yaqd-core/yaqd_core/_protocol.py
index 6644a08..4609714 100644
--- a/yaqd-core/yaqd_core/_protocol.py
+++ b/yaqd-core/yaqd_core/_protocol.py
@@ -17,18 +17,20 @@ def __init__(self, daemon, *args, **kwargs):
 
     def connection_lost(self, exc):
         peername = self.transport.get_extra_info("peername")
-        self.logger.info(f"Connection lost from {peername} to {self._daemon.name}")
+        self.logger.info(f"Connection lost from {peername}")
         self.task.cancel()
         self._daemon._connection_lost(peername)
 
     def connection_made(self, transport):
         """Process an incomming connection."""
         peername = transport.get_extra_info("peername")
-        self.logger.info(f"Connection made from {peername} to {self._daemon.name}")
+        self.logger.info(f"Connection made from {peername}")
         self.transport = transport
         self.unpacker = avrorpc.Unpacker(self._avro_protocol)
         self._daemon._connection_made(peername)
-        self.task = asyncio.get_event_loop().create_task(self.process_requests())
+        self.task = self._daemon._loop.create_task(self.process_requests())
+        self._daemon._tasks.append(self.task)
+        self.task.add_done_callback(self._daemon._tasks.remove)
 
     def data_received(self, data):
         """Process an incomming request."""
@@ -38,61 +40,72 @@ def data_received(self, data):
         self.unpacker.feed(data)
 
     async def process_requests(self):
-        async for hs, meta, name, params in self.unpacker:
-            if hs is not None:
-                out = bytes(hs)
-                out = struct.pack(">L", len(out)) + out
-                self.transport.write(out)
-                if hs.match == "NONE":
-                    name = ""
+        try:
+            async for hs, meta, name, params in self.unpacker:
+                if hs is not None:
+                    out = bytes(hs)
+                    out = struct.pack(">L", len(out)) + out
+                    self.transport.write(out)
+                    if hs.match == "NONE":
+                        name = ""
 
-            out_meta = io.BytesIO()
-            fastavro.schemaless_writer(
-                out_meta, {"type": "map", "values": "bytes"}, meta
-            )
-            length = out_meta.tell()
-            self.transport.write(struct.pack(">L", length) + out_meta.getvalue())
-            self.logger.debug(f"Wrote meta, {meta}, {out_meta.getvalue()}")
-            try:
-                response_out = io.BytesIO()
-                response = None
-                response_schema = "null"
-                if name:
-                    fun = getattr(self._daemon, name)
-                    if params is None:
-                        params = []
-                    response = fun(*params)
-                    response_schema = fastavro.parse_schema(
-                        self._avro_protocol["messages"][name].get("response", "null"),
-                        expand=True,
-                        named_schemas=self._named_types,
+                out_meta = io.BytesIO()
+                fastavro.schemaless_writer(
+                    out_meta, {"type": "map", "values": "bytes"}, meta
+                )
+                length = out_meta.tell()
+                self.transport.write(struct.pack(">L", length) + out_meta.getvalue())
+                self.logger.debug(f"Wrote meta, {meta}, {out_meta.getvalue()}")
+                try:
+                    response_out = io.BytesIO()
+                    response = None
+                    response_schema = "null"
+                    if name:
+                        fun = getattr(self._daemon, name)
+                        if params is None:
+                            params = []
+                        response = fun(*params)
+                        response_schema = fastavro.parse_schema(
+                            self._avro_protocol["messages"][name].get(
+                                "response", "null"
+                            ),
+                            expand=True,
+                            named_schemas=self._named_types,
+                        )
+                        # Needed twice for nested types... Probably can be fixed upstream
+                        response_schema = fastavro.parse_schema(
+                            response_schema,
+                            expand=True,
+                            named_schemas=self._named_types,
+                        )
+                    fastavro.schemaless_writer(response_out, response_schema, response)
+                except Exception as e:
+                    self.logger.error(f"Caught exception: {type(e)} in message {name}")
+                    self.logger.debug(traceback.format_exc())
+                    self.transport.write(struct.pack(">L", 1) + b"\1")
+                    error_out = io.BytesIO()
+                    fastavro.schemaless_writer(error_out, ["string"], repr(e))
+                    length = error_out.tell()
+                    self.transport.write(
+                        struct.pack(">L", length) + error_out.getvalue()
                     )
-                    # Needed twice for nested types... Probably can be fixed upstream
-                    response_schema = fastavro.parse_schema(
-                        response_schema,
-                        expand=True,
-                        named_schemas=self._named_types,
+                else:
+                    self.transport.write(struct.pack(">L", 1) + b"\0")
+                    self.logger.debug(f"Wrote non-error flag")
+                    length = response_out.tell()
+                    self.transport.write(
+                        struct.pack(">L", length) + response_out.getvalue()
                     )
-                fastavro.schemaless_writer(response_out, response_schema, response)
-            except Exception as e:
-                self.logger.error(f"Caught exception: {type(e)} in message {name}")
-                self.logger.debug(traceback.format_exc())
-                self.transport.write(struct.pack(">L", 1) + b"\1")
-                error_out = io.BytesIO()
-                fastavro.schemaless_writer(error_out, ["string"], repr(e))
-                length = error_out.tell()
-                self.transport.write(struct.pack(">L", length) + error_out.getvalue())
-            else:
-                self.transport.write(struct.pack(">L", 1) + b"\0")
-                self.logger.debug(f"Wrote non-error flag")
-                length = response_out.tell()
-                self.transport.write(
-                    struct.pack(">L", length) + response_out.getvalue()
-                )
-                self.logger.debug(
-                    f"Wrote response {response}, {response_out.getvalue()}"
-                )
-            self.transport.write(struct.pack(">L", 0))
-            if name == "shutdown":
-                self.logger.debug("Closing transport")
-                self.transport.close()
+                    self.logger.debug(
+                        f"Wrote response {response}, {response_out.getvalue()}"
+                    )
+                self.transport.write(struct.pack(">L", 0))
+                if name == "shutdown":
+                    self.logger.debug("Closing transport")
+                    self.transport.close()
+        except asyncio.CancelledError as e:
+            self.logger.debug("task cancellation caught")
+            await self.unpacker.__aexit__(None, None, None)
+            self.transport.close()
+            self.logger.debug(f"file closed? {self.unpacker._file.closed}")
+            raise e
diff --git a/yaqd-core/yaqd_core/avrorpc/unpacker.py b/yaqd-core/yaqd_core/avrorpc/unpacker.py
index 1d507f7..1c2207f 100644
--- a/yaqd-core/yaqd_core/avrorpc/unpacker.py
+++ b/yaqd-core/yaqd_core/avrorpc/unpacker.py
@@ -67,13 +67,20 @@ async def __anext__(self):
             except (ValueError, struct.error):
                 await self.new_data.wait()
 
+    async def __aexit__(self, exc_type, exc_val, exc_tb):
+        logger.info("closing")
+        await asyncio.sleep(0)
+        self._file.close()
+        self.buf.close()
+
     def feed(self, data: bytes):
-        # Must support random access, if it does not, must be fed externally (e.g. TCP)
-        pos = self._file.tell()
-        self._file.seek(0, 2)
-        self._file.write(data)
-        self._file.seek(pos)
-        self.new_data.set()
+        if not self._file.closed:
+            # Must support random access, if it does not, must be fed externally (e.g. TCP)
+            pos = self._file.tell()
+            self._file.seek(0, 2)
+            self._file.write(data)
+            self._file.seek(pos)
+            self.new_data.set()
 
     async def _read_object(self, schema):
         schema = fastavro.parse_schema(
diff --git a/yaqd-fakes/yaqd_fakes/_fake_sensor.py b/yaqd-fakes/yaqd_fakes/_fake_sensor.py
index c56f1b3..980115e 100644
--- a/yaqd-fakes/yaqd_fakes/_fake_sensor.py
+++ b/yaqd-fakes/yaqd_fakes/_fake_sensor.py
@@ -28,7 +28,7 @@ def __init__(self, name, config, config_filepath):
                 self._channel_generators[name] = random_walk(min_, max_)
             else:
                 raise Exception(f"channel kind {kwargs['kind']} not recognized")
-        asyncio.get_event_loop().create_task(self._update_measurements())
+        self._loop.create_task(self._update_measurements())
 
     async def _update_measurements(self):
         while True:
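Reviewer note: the heart of this patch is replacing the `asyncio.get_event_loop()` / `loop.run_forever()` startup with `asyncio.run()`, registering signal handlers from inside the running coroutine, and holding strong references to `serve_forever` tasks in a set that discards them on completion. The sketch below is a minimal standalone illustration of that pattern; it is not yaqd-core code, and the names `serve`, `shutdown`, `main`, and `background_tasks` are illustrative only.

```python
# Minimal sketch of the startup pattern adopted here (illustrative, not yaqd-core):
# asyncio.run() owns the event loop, handlers are registered once the loop is running,
# and tasks live in a set so the loop keeps a strong reference until they finish.
import asyncio
import signal
import sys

background_tasks = set()  # strong references, mirroring cls.__servers


async def serve() -> None:
    # stand-in for ser.serve_forever(); runs until cancelled
    while True:
        await asyncio.sleep(1)


async def shutdown(sig) -> None:
    # cancel every task except the one running this coroutine
    tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
    for t in tasks:
        t.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)


async def main() -> None:
    loop = asyncio.get_running_loop()
    # handlers are installed here because no loop object exists before asyncio.run()
    if not sys.platform.startswith("win"):
        for s in (signal.SIGHUP, signal.SIGTERM, signal.SIGINT):
            loop.add_signal_handler(s, lambda s=s: asyncio.create_task(shutdown(s)))
    task = asyncio.create_task(serve())
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)  # drop the reference once done
    while background_tasks:
        await asyncio.wait(background_tasks)


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except asyncio.CancelledError:
        pass
```

On Python 3.14, `asyncio.get_event_loop()` no longer creates a loop when none is running, which appears to be the failure the CHANGELOG entry refers to; structuring startup as above avoids relying on that removed behavior.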