From 9e9a189fcf5fb7855f8c2fc2bec9e529132dcbfd Mon Sep 17 00:00:00 2001 From: Pavel Kirilin Date: Mon, 31 Mar 2025 22:45:36 +0200 Subject: [PATCH 1/4] Added more logs for scheduler. --- taskiq/cli/scheduler/run.py | 21 ++++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/taskiq/cli/scheduler/run.py b/taskiq/cli/scheduler/run.py index 88337b89..25d72ba4 100644 --- a/taskiq/cli/scheduler/run.py +++ b/taskiq/cli/scheduler/run.py @@ -129,9 +129,19 @@ async def delayed_send( :param task: task to send. :param delay: how long to wait. """ + logger.debug( + "Waiting %d seconds before sending task %s with schedule_id %s.", + delay, + task.task_name, + task.schedule_id, + ) if delay > 0: await asyncio.sleep(delay) - logger.info("Sending task %s.", task.task_name) + logger.info( + "Sending task %s with schedule_id %s.", + task.task_name, + task.schedule_id, + ) await scheduler.on_ready(source, task) @@ -150,12 +160,13 @@ async def run_scheduler_loop(scheduler: TaskiqScheduler) -> None: # We use this method to correctly sleep for one minute. scheduled_tasks = await get_all_schedules(scheduler) for source, task_list in scheduled_tasks.items(): + logger.debug("Got %d schedules from source %s.", len(task_list), source) for task in task_list: try: task_delay = get_task_delay(task) except ValueError: logger.warning( - "Cannot parse cron: %s for task: %s, schedule_id: %s", + "Cannot parse cron: %s for task: %s, schedule_id: %s.", task.cron, task.task_name, task.schedule_id, @@ -163,7 +174,7 @@ async def run_scheduler_loop(scheduler: TaskiqScheduler) -> None: continue if task_delay is not None: send_task = loop.create_task( - delayed_send(scheduler, source, task, task_delay), + delayed_send(scheduler, source, task, 1), ) running_schedules.add(send_task) send_task.add_done_callback(running_schedules.discard) @@ -171,6 +182,10 @@ async def run_scheduler_loop(scheduler: TaskiqScheduler) -> None: minutes=1, ) delay = next_minute - datetime.now() + logger.debug( + "Sleeping for %.2f seconds before getting schedules.", + delay.total_seconds(), + ) await asyncio.sleep(delay.total_seconds()) From 88b97b427c464f1460ad926aaa3815578b4ee5c4 Mon Sep 17 00:00:00 2001 From: Pavel Kirilin Date: Mon, 31 Mar 2025 22:59:34 +0200 Subject: [PATCH 2/4] FIxed tests. --- taskiq/cli/scheduler/run.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/taskiq/cli/scheduler/run.py b/taskiq/cli/scheduler/run.py index 25d72ba4..b1487599 100644 --- a/taskiq/cli/scheduler/run.py +++ b/taskiq/cli/scheduler/run.py @@ -174,7 +174,7 @@ async def run_scheduler_loop(scheduler: TaskiqScheduler) -> None: continue if task_delay is not None: send_task = loop.create_task( - delayed_send(scheduler, source, task, 1), + delayed_send(scheduler, source, task, task_delay), ) running_schedules.add(send_task) send_task.add_done_callback(running_schedules.discard) From 54dbbe5a67ea113fa3a8e10275c4edd3e590a44f Mon Sep 17 00:00:00 2001 From: Pavel Kirilin Date: Mon, 31 Mar 2025 23:00:31 +0200 Subject: [PATCH 3/4] Changed version for testing purposes. --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 9a08b759..2beb88fd 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "taskiq" -version = "0.0.0" +version = "0.11.16" description = "Distributed task queue with full async support" authors = ["Pavel Kirilin "] maintainers = ["Pavel Kirilin "] From bb0e107407da0738e15d34abae45c11fb3366419 Mon Sep 17 00:00:00 2001 From: Pavel Kirilin Date: Thu, 3 Apr 2025 23:53:43 +0200 Subject: [PATCH 4/4] Changed version back. --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 2beb88fd..9a08b759 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "taskiq" -version = "0.11.16" +version = "0.0.0" description = "Distributed task queue with full async support" authors = ["Pavel Kirilin "] maintainers = ["Pavel Kirilin "]