Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion fastdeploy/engine/common_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -1097,6 +1097,7 @@ def _fetch_request():
if self.engine_worker_queue.exist_tasks():
time.sleep(0.001)
continue

if self.cfg.scheduler_config.splitwise_role != "mixed":
if not is_fetching:
is_fetching = True
Expand All @@ -1118,7 +1119,10 @@ def _fetch_request():
if hasattr(self.resource_manager, "scheduler_unhandled_request_num"):
self.resource_manager.scheduler_unhandled_request_num = self._get_scheduler_unhandled_request_num()
# 2. Schedule requests
batch_request, error_tasks = self.resource_manager.schedule()
if self.cfg.scheduler_config.splitwise_role == "prefill":
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 建议 prefill_schedule() 仅在 resource_manager_v1 中实现,其他实现调用时会抛 AttributeError

common_engine.py 对所有 splitwise_role == "prefill" 的实例无条件调用 self.resource_manager.prefill_schedule(),但该方法目前只在 resource_manager_v1.py 中新增。若项目存在其他 resource manager 实现(如 v2 或未来新增版本),以 prefill 角色启动时会在此处崩溃。

注意到代码中已有类似的防御写法(hasattr 检查 scheduler_unhandled_request_num)。建议同步到其他 resource manager 实现,或添加防御检查:

if self.cfg.scheduler_config.splitwise_role == "prefill":
    if hasattr(self.resource_manager, "prefill_schedule"):
        batch_request, error_tasks = self.resource_manager.prefill_schedule()
    else:
        batch_request, error_tasks = self.resource_manager.schedule()
else:
    batch_request, error_tasks = self.resource_manager.schedule()

batch_request, error_tasks = self.resource_manager.prefill_schedule()
else:
batch_request, error_tasks = self.resource_manager.schedule()

# 3. Send to engine
if len(batch_request) > 0:
Expand Down
26 changes: 26 additions & 0 deletions fastdeploy/engine/sched/resource_manager_v1.py
Original file line number Diff line number Diff line change
Expand Up @@ -1248,6 +1248,32 @@ def _allocate_decode_and_extend():

return batch_request, error_reqs

def prefill_schedule(self):
with self.lock:
# P instance has no decode — full budget for prefill
batch_request = BatchRequest()
token_budget = self.config.scheduler_config.max_num_batched_tokens

assert len(self.waiting) == 0, "Prefill scheduler should not have waiting requests"
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 建议 assert 用于运行时状态不变量检查

Python 以 -O(优化模式)运行时,assert 语句会被完全跳过。若 self.waiting 非空,该断言失效,后续调度将静默跳过所有 waiting 请求,造成请求饥饿且无任何日志警告。

建议改为显式异常或日志警告:

if len(self.waiting) != 0:
    llm_logger.warning(
        f"prefill_schedule: unexpected {len(self.waiting)} waiting requests, skipping them."
    )

或如果这是不可恢复的状态:

if len(self.waiting) != 0:
    raise RuntimeError(
        f"Prefill scheduler should not have waiting requests, got {len(self.waiting)}"
    )


# Prepare prefill tasks for all running requests
for request in list(self.running):
if self._is_decoding(request):
continue

num_new_tokens = self._get_num_new_tokens(request, token_budget)
if num_new_tokens == 0:
continue

# Add requests into scheduled batch as blocks were preallocated
llm_logger.debug(f"schedule prefill tasks {request.request_id} with {num_new_tokens} tokens")
batch_request.add_request(self._prepare_prefill_task(request, num_new_tokens))
token_budget -= num_new_tokens
request.num_computed_tokens += num_new_tokens

self.update_metrics()
return batch_request, []

def waiting_async_process(self, request: Request) -> None:
"""
Check if async preprocessing is complete for a request.
Expand Down
Loading