From d7f530cd0cea4f925b8500d2cf1d39b843c06ec0 Mon Sep 17 00:00:00 2001 From: liyonghua0910 Date: Wed, 27 May 2026 04:05:39 +0000 Subject: [PATCH 1/2] [feature] provide lock free scheduler --- fastdeploy/engine/common_engine.py | 10 +- fastdeploy/engine/sched/__init__.py | 6 + fastdeploy/engine/sched/request_manager.py | 59 +++ .../engine/sched/resource_manager_v2.py | 337 ++++++++++++++++++ fastdeploy/envs.py | 2 + fastdeploy/output/token_processor.py | 16 +- 6 files changed, 427 insertions(+), 3 deletions(-) create mode 100644 fastdeploy/engine/sched/request_manager.py create mode 100644 fastdeploy/engine/sched/resource_manager_v2.py diff --git a/fastdeploy/engine/common_engine.py b/fastdeploy/engine/common_engine.py index 64c0058db06..5e0ca41f593 100644 --- a/fastdeploy/engine/common_engine.py +++ b/fastdeploy/engine/common_engine.py @@ -53,6 +53,7 @@ ) from fastdeploy.engine.resource_manager import ResourceManager from fastdeploy.engine.sched.resource_manager_v1 import ResourceManagerV1 +from fastdeploy.engine.sched.resource_manager_v2 import ResourceManagerV2 from fastdeploy.engine.sched.scheduler_metrics_logger import SchedulerMetricsLogger from fastdeploy.eplb.utils import init_eplb_signals from fastdeploy.input.preprocess import InputPreprocessor @@ -171,7 +172,14 @@ def __init__(self, cfg: FDConfig, start_queue=True, use_async_llm=False): self.scheduler = cfg.scheduler_config.scheduler() self.enable_decode_cache_task = envs.FD_ENABLE_CACHE_TASK == "1" - if envs.ENABLE_V1_KVCACHE_SCHEDULER: + if envs.ENABLE_V2_KVCACHE_SCHEDULER: + if not envs.ENABLE_V1_KVCACHE_SCHEDULER: + raise ValueError( + "To enable v2 lock-free scheduler, v1 scheduler must be true (export ENABLE_V1_KVCACHE_SCHEDULER=1)." 
+ ) + self.llm_logger.info("Use V2 Lock-Free KVCache Scheduler") + self.resource_manager = ResourceManagerV2(cfg) + elif envs.ENABLE_V1_KVCACHE_SCHEDULER: self.llm_logger.info("Use V1 KVCache Scheduler") self.resource_manager = ResourceManagerV1( cfg.scheduler_config.max_num_seqs, diff --git a/fastdeploy/engine/sched/__init__.py b/fastdeploy/engine/sched/__init__.py index f4ede90624a..2d019306c5d 100644 --- a/fastdeploy/engine/sched/__init__.py +++ b/fastdeploy/engine/sched/__init__.py @@ -13,3 +13,9 @@ # See the License for the specific language governing permissions and # limitations under the License. """ + +from fastdeploy.engine.sched.request_manager import RequestManager +from fastdeploy.engine.sched.resource_manager_v1 import ResourceManagerV1 +from fastdeploy.engine.sched.resource_manager_v2 import ResourceManagerV2 + +__all__ = ["RequestManager", "ResourceManagerV1", "ResourceManagerV2"] diff --git a/fastdeploy/engine/sched/request_manager.py b/fastdeploy/engine/sched/request_manager.py new file mode 100644 index 00000000000..df5ef9df5c8 --- /dev/null +++ b/fastdeploy/engine/sched/request_manager.py @@ -0,0 +1,59 @@ +""" +# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License" +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" + + +class RequestManager: + """ + Unified interface for request slot management. + + Wraps the existing stop_flags / tasks_list arrays from V1's + ResourceManager — no new data structures are created. 
All slot + operations should go through this interface rather than directly + manipulating the underlying arrays. + + Thread-safety: + - acquire_slot / get_available_position / available_batch: called + only inside schedule() (single-writer), no locking needed. + - release_slot: also safe to call from other threads (e.g. + finish_requests) because single-element list assignment is + GIL-atomic under CPython. + """ + + def __init__(self, stop_flags: list[bool], tasks_list: list): + self.stop_flags = stop_flags + self.tasks_list = tasks_list + self.max_num_seqs = len(stop_flags) + + def acquire_slot(self, idx: int, task) -> None: + """Occupy slot idx with task.""" + self.stop_flags[idx] = False + self.tasks_list[idx] = task + + def release_slot(self, idx: int) -> None: + """Release a slot. Safe to call from any thread.""" + self.stop_flags[idx] = True + self.tasks_list[idx] = None + + def get_available_position(self) -> int: + """Return the first available slot index, or raise RuntimeError.""" + for idx in range(self.max_num_seqs): + if self.stop_flags[idx]: + return idx + raise RuntimeError("No available position for new request") + + def available_batch(self) -> int: + """Return the number of free slots.""" + return sum(self.stop_flags) diff --git a/fastdeploy/engine/sched/resource_manager_v2.py b/fastdeploy/engine/sched/resource_manager_v2.py new file mode 100644 index 00000000000..b23466ca45e --- /dev/null +++ b/fastdeploy/engine/sched/resource_manager_v2.py @@ -0,0 +1,337 @@ +""" +# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License" +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +ResourceManagerV2 — Lock-free scheduling via single-writer message queue +========================================================================= + +Design +------ +V2 inherits V1 and reuses **all** logic from V1's methods. The only +difference is how external threads invoke state-mutating methods: + +- V1: external threads acquire ``self.lock`` and run the method body. +- V2: external threads enqueue a message; ``schedule()`` drains the queue + and calls ``super().method()`` directly. + +Since ``self.lock`` is replaced with a ``_NoOpLock``, every +``with self.lock:`` in V1 becomes a transparent pass-through. V2 contains +almost no duplicated logic — every message processor except +``_process_add_request`` delegates to the parent class via ``super()``. +""" + +import threading +import time +import traceback +from collections import deque +from collections.abc import Iterable +from typing import Union + +from fastdeploy.engine.request import Request, RequestOutput +from fastdeploy.engine.sched.request_manager import RequestManager +from fastdeploy.engine.sched.resource_manager_v1 import ResourceManagerV1 +from fastdeploy.utils import llm_logger + +# --------------------------------------------------------------------------- +# No-op lock — makes every ``with self.lock:`` a transparent pass-through +# --------------------------------------------------------------------------- + + +class _NoOpLock: + """Context manager that does nothing. 
Replaces V1's global lock.""" + + def __enter__(self): + return self + + def __exit__(self, *args): + pass + + def acquire(self, *args, **kwargs): + pass + + def release(self): + pass + + +# --------------------------------------------------------------------------- +# Message types +# --------------------------------------------------------------------------- + + +class _Msg: + ADD_REQUEST = "add_request" + FINISH_REQUESTS = "finish_requests" + RESCHEDULE_PREEMPT = "reschedule_preempt" + RECYCLE_ABORT = "recycle_abort" + ADD_ABORT_IDS = "add_abort_ids" + CACHE_OUTPUT_TOKENS = "cache_output_tokens" + PRE_RECYCLE_RESOURCE = "pre_recycle_resource" + ADD_REQUEST_IN_P = "add_request_in_p" + PREALLOCATE_IN_P = "preallocate_in_p" + PREALLOCATE_IN_D = "preallocate_in_d" + HAS_RESOURCE_FOR_PREFILLED = "has_resource_for_prefilled" + ADD_PREFILLED_REQUEST = "add_prefilled_request" + + +class ResourceManagerV2(ResourceManagerV1): + """ + Lock-free scheduler that reuses V1's logic verbatim via ``super()``. + + External threads never take V1's global lock. Instead they enqueue messages + into ``_msg_queue``. ``schedule()`` drains the queue, then delegates to + ``super().schedule()`` which runs with the no-op lock — effectively + lock-free. 
+ """ + + def __init__(self, config): + super().__init__( + config.scheduler_config.max_num_seqs, + config, + config.parallel_config.tensor_parallel_size, + config.scheduler_config.splitwise_role, + config.parallel_config.local_data_parallel_id, + ) + # ---- Replace the global lock with a no-op ---- + self.lock = _NoOpLock() + + # ---- Unified slot management interface (wraps V1's arrays) ---- + self._req_mgr = RequestManager(self.stop_flags, self.tasks_list) + + # ---- Lock-free message queue ---- + self._msg_queue: deque = deque() + self._msg_lock = threading.Lock() # only protects _msg_queue + + # ---- Futures for synchronous query methods (P/D disaggregation) ---- + self._result_futures: dict[str, threading.Event] = {} + self._result_values: dict[str, object] = {} + + # ================================================================== + # Message queue helpers + # ================================================================== + + def _enqueue_msg(self, msg_type: str, *args) -> None: + with self._msg_lock: + self._msg_queue.append((msg_type, *args)) + + def _drain_messages(self) -> int: + """Drain and process all pending messages. Called inside schedule().""" + with self._msg_lock: + msgs = list(self._msg_queue) + self._msg_queue.clear() + + count = 0 + for msg in msgs: + msg_type = msg[0] + args = msg[1:] + try: + handler = self._MSG_HANDLERS.get(msg_type) + if handler is not None: + handler(self, args) + else: + llm_logger.warning(f"Unknown message type: {msg_type}") + except Exception as e: + llm_logger.error(f"Error processing message {msg_type}: {e}, {traceback.format_exc()}") + count += 1 + return count + + # ================================================================== + # schedule() — drain messages, then delegate to V1 (lock-free) + # ================================================================== + + def schedule(self): + """ + Drain pending messages, then delegate to V1's schedule(). 
+ V1's ``with self.lock:`` becomes a no-op because ``self.lock`` + is a ``_NoOpLock``. + """ + self._drain_messages() + return super().schedule() + + # ================================================================== + # External-thread API — override V1 to use message queue + # ================================================================== + + def add_request(self, request: Request) -> None: + """Enqueue an add-request message. Non-blocking.""" + # apply_async_preprocess is called here (before enqueuing) so that + # the async download starts as early as possible. The processor + # only does waiting.append + requests[...] — it must NOT call + # super().add_request() because that would call apply_async_preprocess + # a second time, submitting a duplicate download task. + self.apply_async_preprocess(request) + self._enqueue_msg(_Msg.ADD_REQUEST, request) + + def finish_requests(self, request_ids: Union[str, Iterable[str]]) -> None: + """ + Immediate slot release + deferred block recycle. + + Slot release (stop_flags[idx] = True) is the most latency-sensitive + operation — it directly controls available_batch() which gates how + many new requests schedule() can accept. A single list element + assignment is GIL-atomic, so we do it right here without any lock. + + Block recycle + cache write + state cleanup are deferred to the + message queue so they run in schedule()'s drain phase. This avoids + concurrent modification of self.running (which schedule iterates) + and keeps I/O off the caller's thread. 
+ """ + if isinstance(request_ids, str): + request_ids = (request_ids,) + else: + request_ids = set(request_ids) + + # Immediate: release slots via RequestManager + for req_id in request_ids: + request = self.requests.get(req_id) + if request is not None: + self._req_mgr.release_slot(request.idx) + + # Deferred: block recycle + cache write + state cleanup + self._enqueue_msg(_Msg.FINISH_REQUESTS, request_ids) + + def reschedule_preempt_task(self, request_id, process_func=None): + """Enqueue a reschedule message. Non-blocking.""" + self._enqueue_msg(_Msg.RESCHEDULE_PREEMPT, request_id, process_func) + + def recycle_abort_task(self, request_id): + """Enqueue a recycle-abort message. Non-blocking.""" + self._enqueue_msg(_Msg.RECYCLE_ABORT, request_id) + + def add_abort_req_ids(self, req_ids): + """Enqueue an add-abort-ids message. Non-blocking.""" + self._enqueue_msg(_Msg.ADD_ABORT_IDS, req_ids) + + def cache_output_tokens(self, request): + """Enqueue a cache-output-tokens message. Non-blocking.""" + self._enqueue_msg(_Msg.CACHE_OUTPUT_TOKENS, request) + + def pre_recycle_resource(self, request_id: str): + """Enqueue a pre-recycle-resource message. Non-blocking.""" + self._enqueue_msg(_Msg.PRE_RECYCLE_RESOURCE, request_id) + + def add_request_in_p(self, requests: list[Request]): + """Enqueue an add-request-in-p message. 
Non-blocking.""" + self._enqueue_msg(_Msg.ADD_REQUEST_IN_P, requests) + + # ------------------------------------------------------------------ + # Synchronous query methods (P/D disaggregation) + # ------------------------------------------------------------------ + + def preallocate_resource_in_p(self, request: Request): + return self._sync_query(_Msg.PREALLOCATE_IN_P, request) + + def preallocate_resource_in_d(self, request: Request): + return self._sync_query(_Msg.PREALLOCATE_IN_D, request) + + def has_resource_for_prefilled_req(self, request_id: str): + return self._sync_query(_Msg.HAS_RESOURCE_FOR_PREFILLED, request_id) + + def add_prefilled_request(self, request_output: RequestOutput): + return self._sync_query(_Msg.ADD_PREFILLED_REQUEST, request_output) + + def _sync_query(self, msg_type: str, payload): + """Submit a synchronous query and wait for schedule() to process it.""" + query_id = f"{msg_type}_{id(payload)}_{time.time()}" + event = threading.Event() + self._result_futures[query_id] = event + self._enqueue_msg(msg_type, payload, query_id) + event.wait() # block until schedule() processes and signals + result = self._result_values.pop(query_id, None) + self._result_futures.pop(query_id, None) + return result + + def _set_query_result(self, query_id: str, result) -> None: + self._result_values[query_id] = result + event = self._result_futures.get(query_id) + if event is not None: + event.set() + + # ================================================================== + # Message processors — all delegate to super(), zero code duplication + # ================================================================== + + def _process_add_request(self, args) -> None: + """ + Only handler that does NOT call super().add_request(), because + apply_async_preprocess was already called before enqueuing. 
+ """ + request = args[0] + self.waiting.append(request) + self.requests[request.request_id] = request + + def _process_finish_requests(self, args) -> None: + super().finish_requests(args[0]) + + def _process_reschedule_preempt(self, args) -> None: + request_id = args[0] + process_func = args[1] if len(args) > 1 else None + super().reschedule_preempt_task(request_id, process_func) + + def _process_recycle_abort(self, args) -> None: + super().recycle_abort_task(args[0]) + + def _process_add_abort_ids(self, args) -> None: + super().add_abort_req_ids(args[0]) + + def _process_cache_output_tokens(self, args) -> None: + super().cache_output_tokens(args[0]) + + def _process_pre_recycle_resource(self, args) -> None: + super().pre_recycle_resource(args[0]) + + def _process_add_request_in_p(self, args) -> None: + super().add_request_in_p(args[0]) + + # ------------------------------------------------------------------ + # Synchronous query processors — call super() + set Future result + # ------------------------------------------------------------------ + + def _process_preallocate_in_p(self, args) -> None: + request, query_id = args[0], args[1] + result = super().preallocate_resource_in_p(request) + self._set_query_result(query_id, result) + + def _process_preallocate_in_d(self, args) -> None: + request, query_id = args[0], args[1] + result = super().preallocate_resource_in_d(request) + self._set_query_result(query_id, result) + + def _process_has_resource_for_prefilled(self, args) -> None: + request_id, query_id = args[0], args[1] + result = super().has_resource_for_prefilled_req(request_id) + self._set_query_result(query_id, result) + + def _process_add_prefilled_request(self, args) -> None: + request_output, query_id = args[0], args[1] + super().add_prefilled_request(request_output) + self._set_query_result(query_id, True) + + # ------------------------------------------------------------------ + # Handler dispatch table + # 
------------------------------------------------------------------ + + _MSG_HANDLERS = { + _Msg.ADD_REQUEST: _process_add_request, + _Msg.FINISH_REQUESTS: _process_finish_requests, + _Msg.RESCHEDULE_PREEMPT: _process_reschedule_preempt, + _Msg.RECYCLE_ABORT: _process_recycle_abort, + _Msg.ADD_ABORT_IDS: _process_add_abort_ids, + _Msg.CACHE_OUTPUT_TOKENS: _process_cache_output_tokens, + _Msg.PRE_RECYCLE_RESOURCE: _process_pre_recycle_resource, + _Msg.ADD_REQUEST_IN_P: _process_add_request_in_p, + _Msg.PREALLOCATE_IN_P: _process_preallocate_in_p, + _Msg.PREALLOCATE_IN_D: _process_preallocate_in_d, + _Msg.HAS_RESOURCE_FOR_PREFILLED: _process_has_resource_for_prefilled, + _Msg.ADD_PREFILLED_REQUEST: _process_add_prefilled_request, + } diff --git a/fastdeploy/envs.py b/fastdeploy/envs.py index 8e0344384b6..c8b9f5496d3 100644 --- a/fastdeploy/envs.py +++ b/fastdeploy/envs.py @@ -104,6 +104,8 @@ def _validate_split_kv_size(value: int) -> int: "EXPORTER_OTLP_HEADERS": lambda: os.getenv("EXPORTER_OTLP_HEADERS"), # enable kv cache block scheduler v1 (no need for kv_cache_ratio) "ENABLE_V1_KVCACHE_SCHEDULER": lambda: int(os.getenv("ENABLE_V1_KVCACHE_SCHEDULER", "1")), + # enable lock-free kv cache block scheduler v2 (message queue based, overrides V1) + "ENABLE_V2_KVCACHE_SCHEDULER": lambda: int(os.getenv("ENABLE_V2_KVCACHE_SCHEDULER", "0")), # set prealloc block num for decoder "FD_ENC_DEC_BLOCK_NUM": lambda: int(os.getenv("FD_ENC_DEC_BLOCK_NUM", "2")), # enbale max prefill of one execute step diff --git a/fastdeploy/output/token_processor.py b/fastdeploy/output/token_processor.py index f0cd22e1309..80f3e4be8ac 100644 --- a/fastdeploy/output/token_processor.py +++ b/fastdeploy/output/token_processor.py @@ -602,7 +602,13 @@ def _recycle_resources(self, task_id, index, task, result=None, is_prefill=False trace_print(LoggingEventName.CHECK_CACHE_TRANSFER_END, task_id, getattr(task, "user", "")) result.metrics.send_request_output_to_decode_time = time.time() 
self.split_connector.send_first_token(task.disaggregate_info, [result]) - if envs.ENABLE_V1_KVCACHE_SCHEDULER: + if envs.ENABLE_V2_KVCACHE_SCHEDULER: + if not envs.ENABLE_V1_KVCACHE_SCHEDULER: + raise ValueError( + "To enable v2 lock-free scheduler, v1 scheduler must be true (export ENABLE_V1_KVCACHE_SCHEDULER=1)." + ) + self.resource_manager.finish_requests(task_id) + elif envs.ENABLE_V1_KVCACHE_SCHEDULER: self.resource_manager.finish_requests_async(task_id) else: self.resource_manager.stop_flags[index] = True @@ -617,7 +623,13 @@ def _recycle_resources(self, task_id, index, task, result=None, is_prefill=False llm_logger.warning(f"wait for sending cache, {task_id}") time.sleep(0.002) else: - if envs.ENABLE_V1_KVCACHE_SCHEDULER: + if envs.ENABLE_V2_KVCACHE_SCHEDULER: + if not envs.ENABLE_V1_KVCACHE_SCHEDULER: + raise ValueError( + "To enable v2 lock-free scheduler, v1 scheduler must be true (export ENABLE_V1_KVCACHE_SCHEDULER=1)." + ) + self.resource_manager.finish_requests(task_id) + elif envs.ENABLE_V1_KVCACHE_SCHEDULER: self.resource_manager.finish_requests_async(task_id) else: self.resource_manager.stop_flags[index] = True From bb9eec458699946bbf1aafce9431939447b7f295 Mon Sep 17 00:00:00 2001 From: liyonghua0910 Date: Wed, 27 May 2026 07:41:26 +0000 Subject: [PATCH 2/2] [feat] prefill schedule --- fastdeploy/engine/common_engine.py | 6 ++- .../engine/sched/resource_manager_v1.py | 40 +++++++++++++++++++ .../engine/sched/resource_manager_v2.py | 8 ++++ 3 files changed, 53 insertions(+), 1 deletion(-) diff --git a/fastdeploy/engine/common_engine.py b/fastdeploy/engine/common_engine.py index 5e0ca41f593..17ac71d2847 100644 --- a/fastdeploy/engine/common_engine.py +++ b/fastdeploy/engine/common_engine.py @@ -1105,6 +1105,7 @@ def _fetch_request(): if self.engine_worker_queue.exist_tasks(): time.sleep(0.001) continue + if self.cfg.scheduler_config.splitwise_role != "mixed": if not is_fetching: is_fetching = True @@ -1126,7 +1127,10 @@ def _fetch_request(): if 
hasattr(self.resource_manager, "scheduler_unhandled_request_num"): self.resource_manager.scheduler_unhandled_request_num = self._get_scheduler_unhandled_request_num() # 2. Schedule requests - batch_request, error_tasks = self.resource_manager.schedule() + if self.cfg.scheduler_config.splitwise_role == "prefill": + batch_request, error_tasks = self.resource_manager.prefill_schedule() + else: + batch_request, error_tasks = self.resource_manager.schedule() # 3. Send to engine if len(batch_request) > 0: diff --git a/fastdeploy/engine/sched/resource_manager_v1.py b/fastdeploy/engine/sched/resource_manager_v1.py index 372fd4e490f..8c34eefbc83 100644 --- a/fastdeploy/engine/sched/resource_manager_v1.py +++ b/fastdeploy/engine/sched/resource_manager_v1.py @@ -1248,6 +1248,46 @@ def _allocate_decode_and_extend(): return batch_request, error_reqs + def prefill_schedule(self): + """ + P-instance-only fast path that replaces ``schedule()``. + + After ``preallocate_resource_in_p`` all blocks are already + allocated and requests are already in ``self.running`` + (via ``add_request_in_p``), so this method skips: + + * ``can_allocate_gpu_blocks`` / ``_allocate_gpu_blocks`` re-check + * decode scheduling branch (P has no decode) + * waiting queue iteration (P bypasses the waiting queue) + * preemption / ``chunk_prefill_in_running_not_satisfied`` + * ``_get_can_schedule_prefill_threshold_block`` computation + + It directly iterates ``self.running`` and prepares prefill + tasks under the full ``max_num_batched_tokens`` budget. + """ + with self.lock: + # P instance has no decode — full budget for prefill + batch_request = BatchRequest() + token_budget = self.config.scheduler_config.max_num_batched_tokens + + # Prepare prefill tasks for all running requests + for request in list(self.running): + if self._is_decoding(request): + # Request finished prefill, waiting for finish_requests. 
+ continue + + num_new_tokens = self._get_num_new_tokens(request, token_budget) + if num_new_tokens == 0: + continue + + # Blocks were preallocated — skip block checking + batch_request.add_request(self._prepare_prefill_task(request, num_new_tokens)) + token_budget -= num_new_tokens + request.num_computed_tokens += num_new_tokens + + self.update_metrics() + return batch_request, [] + def waiting_async_process(self, request: Request) -> None: """ Check if async preprocessing is complete for a request. diff --git a/fastdeploy/engine/sched/resource_manager_v2.py b/fastdeploy/engine/sched/resource_manager_v2.py index b23466ca45e..34ffa83c4ab 100644 --- a/fastdeploy/engine/sched/resource_manager_v2.py +++ b/fastdeploy/engine/sched/resource_manager_v2.py @@ -158,6 +158,14 @@ def schedule(self): self._drain_messages() return super().schedule() + def prefill_schedule(self): + """ + V2 override: drain messages first, then delegate to V1's + prefill_schedule(). V1's ``with self.lock:`` is a no-op. + """ + self._drain_messages() + return super().prefill_schedule() + # ================================================================== # External-thread API — override V1 to use message queue # ==================================================================