Skip to content

Commit dfe64b4

Browse files
committed
refactor asyncio loop
1 parent 6374e57 commit dfe64b4

File tree

2 files changed

+26
-21
lines changed

2 files changed

+26
-21
lines changed

yaqd-core/yaqd_core/_is_daemon.py

Lines changed: 25 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -123,17 +123,6 @@ def _traits(cls) -> List[str]:
123123

124124
@classmethod
125125
def main(cls):
126-
"""Run the event loop."""
127-
loop = asyncio.get_event_loop()
128-
if sys.platform.startswith("win"):
129-
signals = ()
130-
else:
131-
signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT)
132-
for s in signals:
133-
loop.add_signal_handler(
134-
s, lambda s=s: asyncio.create_task(cls.shutdown_all(s, loop))
135-
)
136-
137126
parser = argparse.ArgumentParser()
138127
parser.add_argument(
139128
"--config",
@@ -199,19 +188,28 @@ def main(cls):
199188
with open(config_filepath, "rb") as f:
200189
config_file = tomli.load(f)
201190

202-
loop.create_task(cls._main(config_filepath, config_file, args))
191+
# Run the event loop
203192
try:
204-
loop.run_forever()
193+
asyncio.run(
194+
cls._main(config_filepath, config_file, args)
195+
)
205196
except asyncio.CancelledError:
206197
pass
207-
finally:
208-
loop.close()
209198

210199
@classmethod
211200
async def _main(cls, config_filepath, config_file, args=None):
212201
"""Parse command line arguments, start event loop tasks."""
213202
loop = asyncio.get_running_loop()
214-
cls.__servers = []
203+
if sys.platform.startswith("win"):
204+
signals = ()
205+
else:
206+
signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT)
207+
for s in signals:
208+
loop.add_signal_handler(
209+
s, lambda s=s: asyncio.create_task(cls.shutdown_all(s, loop))
210+
)
211+
212+
cls.__servers = set()
215213
for section in config_file:
216214
if section == "shared-settings":
217215
continue
@@ -225,7 +223,7 @@ async def _main(cls, config_filepath, config_file, args=None):
225223

226224
while cls.__servers:
227225
awaiting = cls.__servers
228-
cls.__servers = []
226+
cls.__servers = set()
229227
await asyncio.wait(awaiting)
230228
await asyncio.sleep(1)
231229
loop.stop()
@@ -252,7 +250,11 @@ def server(daemon):
252250
server(daemon), config.get("host", ""), config.get("port", None)
253251
)
254252
daemon._server = ser
255-
cls.__servers.append(asyncio.create_task(ser.serve_forever()))
253+
# cls.__servers.add(asyncio.create_task())
254+
task = asyncio.create_task(ser.serve_forever())
255+
cls.__servers.add(task)
256+
# Add a done callback to remove the task from the set when it completes
257+
task.add_done_callback(cls.__servers.discard)
256258

257259
@classmethod
258260
def _parse_config(cls, config_file, section, args=None):
@@ -300,13 +302,16 @@ async def shutdown_all(cls, sig, loop):
300302
tasks = [
301303
t for t in asyncio.all_tasks()
302304
if (
303-
t is not asyncio.current_task()
304-
and "serve_forever" not in t.get_coro().__repr__()
305+
t is not asyncio.current_task()
306+
# and "serve_forever" not in t.get_coro().__repr__()
305307
)
306308
]
309+
logger.info("gathering")
307310
for task in tasks:
308311
logger.info(task.get_coro())
309312
await asyncio.gather(*tasks, return_exceptions=True)
313+
logger.info("gathered")
314+
logger.info([task.get_coro() for task in asyncio.all_tasks()])
310315
[d._save_state() for d in cls._daemons]
311316
if hasattr(signal, "SIGHUP") and sig == signal.SIGHUP:
312317
config_filepath = [d._config_filepath for d in cls._daemons][0]

yaqd-core/yaqd_core/_protocol.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ def connection_made(self, transport):
2828
self.transport = transport
2929
self.unpacker = avrorpc.Unpacker(self._avro_protocol)
3030
self._daemon._connection_made(peername)
31-
self.task = asyncio.get_event_loop().create_task(self.process_requests())
31+
self.task = asyncio.get_running_loop().create_task(self.process_requests())
3232

3333
def data_received(self, data):
3434
"""Process an incomming request."""

0 commit comments

Comments
 (0)