@@ -129,9 +129,19 @@ async def delayed_send(
129129 :param task: task to send.
130130 :param delay: how long to wait.
131131 """
132+ logger .debug (
133+ "Waiting %d seconds before sending task %s with schedule_id %s." ,
134+ delay ,
135+ task .task_name ,
136+ task .schedule_id ,
137+ )
132138 if delay > 0 :
133139 await asyncio .sleep (delay )
134- logger .info ("Sending task %s." , task .task_name )
140+ logger .info (
141+ "Sending task %s with schedule_id %s." ,
142+ task .task_name ,
143+ task .schedule_id ,
144+ )
135145 await scheduler .on_ready (source , task )
136146
137147
@@ -150,27 +160,32 @@ async def run_scheduler_loop(scheduler: TaskiqScheduler) -> None:
150160 # We use this method to correctly sleep for one minute.
151161 scheduled_tasks = await get_all_schedules (scheduler )
152162 for source , task_list in scheduled_tasks .items ():
163+ logger .debug ("Got %d schedules from source %s." , len (task_list ), source )
153164 for task in task_list :
154165 try :
155166 task_delay = get_task_delay (task )
156167 except ValueError :
157168 logger .warning (
158- "Cannot parse cron: %s for task: %s, schedule_id: %s" ,
169+ "Cannot parse cron: %s for task: %s, schedule_id: %s. " ,
159170 task .cron ,
160171 task .task_name ,
161172 task .schedule_id ,
162173 )
163174 continue
164175 if task_delay is not None :
165176 send_task = loop .create_task (
166- delayed_send (scheduler , source , task , task_delay ),
177+ delayed_send (scheduler , source , task , 1 ),
167178 )
168179 running_schedules .add (send_task )
169180 send_task .add_done_callback (running_schedules .discard )
170181 next_minute = datetime .now ().replace (second = 0 , microsecond = 0 ) + timedelta (
171182 minutes = 1 ,
172183 )
173184 delay = next_minute - datetime .now ()
185+ logger .debug (
186+ "Sleeping for %.2f seconds before getting schedules." ,
187+ delay .total_seconds (),
188+ )
174189 await asyncio .sleep (delay .total_seconds ())
175190
176191
0 commit comments