diff --git a/src/task_processor/processor.py b/src/task_processor/processor.py index 501c993..daab3ae 100644 --- a/src/task_processor/processor.py +++ b/src/task_processor/processor.py @@ -128,12 +128,17 @@ def _run_task( ) task_run: AnyTaskRun = task.task_runs.model(started_at=timezone.now(), task=task) # type: ignore[attr-defined] result: str + executor = None try: - with ThreadPoolExecutor(max_workers=1) as executor: - future = executor.submit(task.run) - timeout = task.timeout.total_seconds() if task.timeout else None - future.result(timeout=timeout) # Wait for completion or timeout + # Use explicit executor management to avoid blocking on shutdown + # when tasks timeout but continue running in worker threads. + # The default context manager behavior (wait=True) would block + # the TaskRunner thread indefinitely waiting for stuck workers. + executor = ThreadPoolExecutor(max_workers=1) + future = executor.submit(task.run) + timeout = task.timeout.total_seconds() if task.timeout else None + future.result(timeout=timeout) # Wait for completion or timeout task_run.result = result = TaskResult.SUCCESS.value task_run.finished_at = timezone.now() @@ -176,6 +181,13 @@ def _run_task( delay_until, ) + finally: + # Always shutdown the executor without waiting for worker threads. + # This prevents the TaskRunner thread from blocking indefinitely + # when a task times out but continues running in a worker thread. + if executor is not None: + executor.shutdown(wait=False) + labels = { "task_identifier": task_identifier, "task_type": registered_task.task_type.value.lower(), diff --git a/tests/unit/task_processor/test_unit_task_processor_processor.py b/tests/unit/task_processor/test_unit_task_processor_processor.py index 52b05f3..cc86442 100644 --- a/tests/unit/task_processor/test_unit_task_processor_processor.py +++ b/tests/unit/task_processor/test_unit_task_processor_processor.py @@ -972,3 +972,47 @@ def my_task() -> None: # Then recurring_task.refresh_from_db(using=current_database) assert recurring_task.is_locked is False + + +@pytest.mark.multi_database +@pytest.mark.task_processor_mode +def test_run_task_does_not_block_on_timeout( + current_database: str, + sleep_task: TaskHandler[[int]], +) -> None: + """ + Verify that when a task times out, the calling thread (TaskRunner) + does not block indefinitely waiting for the worker thread to finish. + + """ + # Given - a task that will take longer than the timeout + task = Task.create( + sleep_task.task_identifier, + scheduled_for=timezone.now(), + args=(10,), # Task will sleep for 10 seconds + timeout=timedelta(milliseconds=100), # But timeout after 100ms + ) + task.save(using=current_database) + + # When - we run the task + start_time = time.time() + task_runs = run_tasks(current_database) + elapsed_time = time.time() - start_time + + # Then - the function should return quickly (within ~1 second) + # Not block for 10 seconds waiting for the worker thread + assert elapsed_time < 2.0, ( + f"run_tasks blocked for {elapsed_time:.2f} seconds, " + "indicating it's waiting for the worker thread to finish" + ) + + # And the task should be marked as failed due to timeout + assert len(task_runs) == 1 + task_run = task_runs[0] + assert task_run.result == TaskResult.FAILURE.value + assert task_run.error_details is not None + assert "TimeoutError" in task_run.error_details + + task.refresh_from_db(using=current_database) + assert task.completed is False + assert task.num_failures == 1