|
56 | 56 | # With SDK timeout of 15s and max_retries=2, this allows 3 × 15s = 45s + 15s buffer = 60s |
57 | 57 | EMBEDDING_TIMEOUT = 60 # Timeout in seconds for each embedding API call (including retries) |
58 | 58 | FILE_PROCESSING_TIMEOUT = 300 # Timeout in seconds for processing a single file (5 minutes) |
59 | | -_THREADPOOL_WORKERS = max(16, EMBEDDING_CONCURRENCY + 8) |
60 | | -_EXECUTOR = concurrent.futures.ThreadPoolExecutor(max_workers=_THREADPOOL_WORKERS) |
| 59 | + |
| 60 | +# Separate executors to avoid deadlock: |
| 61 | +# - File executor: runs _process_file_sync tasks (one per file) |
| 62 | +# - Embedding executor: runs _get_embedding_with_semaphore tasks (multiple per file) |
| 63 | +# Without separation, all file threads can block waiting for embedding results that can't run |
| 64 | +# because all threads are occupied by file tasks. |
| 65 | +_FILE_EXECUTOR_WORKERS = 16 |
| 66 | +_EMBEDDING_EXECUTOR_WORKERS = max(16, EMBEDDING_CONCURRENCY + 8) |
| 67 | +_FILE_EXECUTOR = concurrent.futures.ThreadPoolExecutor(max_workers=_FILE_EXECUTOR_WORKERS) |
| 68 | +_EMBEDDING_EXECUTOR = concurrent.futures.ThreadPoolExecutor(max_workers=_EMBEDDING_EXECUTOR_WORKERS) |
61 | 69 |
|
62 | 70 | logger = get_logger(__name__) |
63 | 71 |
|
@@ -219,7 +227,7 @@ def _process_file_sync( |
219 | 227 | for idx, chunk_doc in batch: |
220 | 228 | # Submit task to executor; semaphore will be acquired inside the worker |
221 | 229 | embedding_start_time = time.time() |
222 | | - future = _EXECUTOR.submit(_get_embedding_with_semaphore, semaphore, chunk_doc.text, rel_path, idx, embedding_model) |
| 230 | + future = _EMBEDDING_EXECUTOR.submit(_get_embedding_with_semaphore, semaphore, chunk_doc.text, rel_path, idx, embedding_model) |
223 | 231 | embedding_futures.append((idx, chunk_doc, future, embedding_start_time)) |
224 | 232 |
|
225 | 233 | # Wait for batch to complete and store results |
@@ -399,7 +407,7 @@ def analyze_local_path_sync( |
399 | 407 | counters[0] += 1 |
400 | 408 | file_num = counters[0] |
401 | 409 |
|
402 | | - fut = _EXECUTOR.submit( |
| 410 | + fut = _FILE_EXECUTOR.submit( |
403 | 411 | _process_file_sync, |
404 | 412 | semaphore, |
405 | 413 | database_path, |
|
0 commit comments