Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions fastdeploy/engine/common_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
)
from fastdeploy.engine.resource_manager import ResourceManager
from fastdeploy.engine.sched.resource_manager_v1 import ResourceManagerV1
from fastdeploy.engine.sched.resource_manager_v2 import ResourceManagerV2
from fastdeploy.engine.sched.scheduler_metrics_logger import SchedulerMetricsLogger
from fastdeploy.eplb.utils import init_eplb_signals
from fastdeploy.input.preprocess import InputPreprocessor
Expand Down Expand Up @@ -171,7 +172,14 @@ def __init__(self, cfg: FDConfig, start_queue=True, use_async_llm=False):
self.scheduler = cfg.scheduler_config.scheduler()
self.enable_decode_cache_task = envs.FD_ENABLE_CACHE_TASK == "1"

if envs.ENABLE_V1_KVCACHE_SCHEDULER:
if envs.ENABLE_V2_KVCACHE_SCHEDULER:
if not envs.ENABLE_V1_KVCACHE_SCHEDULER:
raise ValueError(
"To enable v2 lock-free scheduler, v1 scheduler must be true (export ENABLE_V1_KVCACHE_SCHEDULER=1)."
)
self.llm_logger.info("Use V2 Lock-Free KVCache Scheduler")
self.resource_manager = ResourceManagerV2(cfg)
elif envs.ENABLE_V1_KVCACHE_SCHEDULER:
self.llm_logger.info("Use V1 KVCache Scheduler")
self.resource_manager = ResourceManagerV1(
cfg.scheduler_config.max_num_seqs,
Expand Down Expand Up @@ -1097,6 +1105,7 @@ def _fetch_request():
if self.engine_worker_queue.exist_tasks():
time.sleep(0.001)
continue

if self.cfg.scheduler_config.splitwise_role != "mixed":
if not is_fetching:
is_fetching = True
Expand All @@ -1118,7 +1127,10 @@ def _fetch_request():
if hasattr(self.resource_manager, "scheduler_unhandled_request_num"):
self.resource_manager.scheduler_unhandled_request_num = self._get_scheduler_unhandled_request_num()
# 2. Schedule requests
batch_request, error_tasks = self.resource_manager.schedule()
if self.cfg.scheduler_config.splitwise_role == "prefill":
batch_request, error_tasks = self.resource_manager.prefill_schedule()

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 BugENABLE_V1_KVCACHE_SCHEDULER=0(使用默认 ResourceManager)且 splitwise_role=="prefill" 时,此处调用 self.resource_manager.prefill_schedule() 会触发 AttributeError,因为默认 ResourceManager 没有该方法。

prefill_schedule() 仅在 ResourceManagerV1(及继承它的 V2)中定义,但当前代码无条件地对所有 resource_manager 实例调用。

建议修复方式:

if self.cfg.scheduler_config.splitwise_role == "prefill" and hasattr(self.resource_manager, "prefill_schedule"):
    batch_request, error_tasks = self.resource_manager.prefill_schedule()
else:
    batch_request, error_tasks = self.resource_manager.schedule()

或者在 ResourceManager 基类中定义 prefill_schedule() 并回退到 schedule(),确保接口一致。

else:
batch_request, error_tasks = self.resource_manager.schedule()

# 3. Send to engine
if len(batch_request) > 0:
Expand Down
6 changes: 6 additions & 0 deletions fastdeploy/engine/sched/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""

from fastdeploy.engine.sched.request_manager import RequestManager
from fastdeploy.engine.sched.resource_manager_v1 import ResourceManagerV1
from fastdeploy.engine.sched.resource_manager_v2 import ResourceManagerV2

__all__ = ["RequestManager", "ResourceManagerV1", "ResourceManagerV2"]
59 changes: 59 additions & 0 deletions fastdeploy/engine/sched/request_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
"""
# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""


class RequestManager:
"""
Unified interface for request slot management.

Wraps the existing stop_flags / tasks_list arrays from V1's
ResourceManager — no new data structures are created. All slot
operations should go through this interface rather than directly
manipulating the underlying arrays.

Thread-safety:
- acquire_slot / get_available_position / available_batch: called
only inside schedule() (single-writer), no locking needed.
- release_slot: also safe to call from other threads (e.g.
finish_requests) because single-element list assignment is
GIL-atomic under CPython.
"""

def __init__(self, stop_flags: list[bool], tasks_list: list):
self.stop_flags = stop_flags
self.tasks_list = tasks_list
self.max_num_seqs = len(stop_flags)

def acquire_slot(self, idx: int, task) -> None:
"""Occupy slot idx with task."""
self.stop_flags[idx] = False
self.tasks_list[idx] = task

def release_slot(self, idx: int) -> None:
"""Release a slot. Safe to call from any thread."""
self.stop_flags[idx] = True
self.tasks_list[idx] = None

def get_available_position(self) -> int:
"""Return the first available slot index, or raise RuntimeError."""
for idx in range(self.max_num_seqs):
if self.stop_flags[idx]:
return idx
raise RuntimeError("No available position for new request")

def available_batch(self) -> int:
"""Return the number of free slots."""
return sum(self.stop_flags)
40 changes: 40 additions & 0 deletions fastdeploy/engine/sched/resource_manager_v1.py
Original file line number Diff line number Diff line change
Expand Up @@ -1248,6 +1248,46 @@ def _allocate_decode_and_extend():

return batch_request, error_reqs

def prefill_schedule(self):
"""
P-instance-only fast path that replaces ``schedule()``.

After ``preallocate_resource_in_p`` all blocks are already
allocated and requests are already in ``self.running``
(via ``add_request_in_p``), so this method skips:

* ``can_allocate_gpu_blocks`` / ``_allocate_gpu_blocks`` re-check
* decode scheduling branch (P has no decode)
* waiting queue iteration (P bypasses the waiting queue)
* preemption / ``chunk_prefill_in_running_not_satisfied``
* ``_get_can_schedule_prefill_threshold_block`` computation

It directly iterates ``self.running`` and prepares prefill
tasks under the full ``max_num_batched_tokens`` budget.
"""
with self.lock:
# P instance has no decode — full budget for prefill
batch_request = BatchRequest()
token_budget = self.config.scheduler_config.max_num_batched_tokens

# Prepare prefill tasks for all running requests
for request in list(self.running):
if self._is_decoding(request):
# Request finished prefill, waiting for finish_requests.
continue

num_new_tokens = self._get_num_new_tokens(request, token_budget)
if num_new_tokens == 0:
continue

# Blocks were preallocated — skip block checking
batch_request.add_request(self._prepare_prefill_task(request, num_new_tokens))
token_budget -= num_new_tokens
request.num_computed_tokens += num_new_tokens

self.update_metrics()
return batch_request, []

def waiting_async_process(self, request: Request) -> None:
"""
Check if async preprocessing is complete for a request.
Expand Down
Loading
Loading