Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 25 additions & 4 deletions fastdeploy/output/token_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,8 @@ def _process_per_token(self, task, batch_id: int, token_ids: np.ndarray, result:
llm_logger.info(f"recovery stop signal found at task {task_id}")
self.tokens_counter[task_id] += 1
if token_id != RECOVERY_STOP_SIGNAL:
result.outputs.token_ids.append(token_id)
if not (envs.FD_ENABLE_INTERNAL_ADAPTER and token_id in task.eos_token_ids):
result.outputs.token_ids.append(token_id)
task.output_token_ids.append(token_id)

if token_id in task.eos_token_ids or is_prefill or recovery_stop:
Expand All @@ -252,20 +253,27 @@ def _process_per_token(self, task, batch_id: int, token_ids: np.ndarray, result:

# Print combined log with all required information
ttft = task.metrics.first_token_time if task.metrics.first_token_time else 0
ttft_s = ttft + task.metrics.time_in_queue
llm_logger.info(
f"Request={task_id}, InputToken={task.prompt_token_ids_len}, "
f"CachedDetail={cached_detail}, OutputToken={self.tokens_counter[task_id]}, "
f"TokenRatio={token_ratio:.2f}, TTFT={ttft:.2f}, "
f"TokenRatio={token_ratio:.2f}, TTFT={ttft:.2f}, TTFT_S={ttft_s:.2f}, "
f"E2E={e2e_time:.2f}, IsPrefill={is_prefill}, RecoveryStop={recovery_stop}, "
f"PreemptedCount={getattr(task.metrics, 'preempted_count', 0)}"
)

main_process_metrics.request_token_ratio.observe(token_ratio)
llm_logger.info(f"{self.resource_manager.info()}")
if self.cfg.speculative_config.method:
self._compute_speculative_status()
self._compute_speculative_status(result)
if not is_prefill:
self._record_completion_metrics(task, current_time)
if (
envs.ENABLE_V1_KVCACHE_SCHEDULER
and self.cfg.cache_config.enable_prefix_caching
and self.cfg.cache_config.enable_output_caching
):
self.resource_manager.cache_output_tokens(task)
self._recycle_resources(task_id, batch_id, task, result, is_prefill)
break
return result
Expand Down Expand Up @@ -306,6 +314,15 @@ def _process_batch_output_use_zmq(self, receive_datas):
if task_id in self.resource_manager.to_be_rescheduled_request_id_set:
continue

if self.scheduler_metrics_logger and self._is_decode_stage(task):
self.scheduler_metrics_logger.on_decode_tokens(len(token_ids))

if task.get("prefill_chunk_info", None) is not None:
prefill_chunk_num = task.get("prefill_chunk_num", 0)
task.prefill_chunk_num = prefill_chunk_num + 1
if task.prefill_chunk_num < len(task.prefill_chunk_info):
continue

current_time = time.time()
if self.tokens_counter[task_id] == 0:
task.metrics.record_recv_first_token()
Expand Down Expand Up @@ -334,6 +351,7 @@ def _process_batch_output_use_zmq(self, receive_datas):
else:
result = RequestOutput(
request_id=task_id,
output_type=3,
outputs=CompletionOutput(
index=i,
send_idx=self.tokens_counter[task_id],
Expand All @@ -343,6 +361,7 @@ def _process_batch_output_use_zmq(self, receive_datas):
finished=False,
metrics=metrics,
ic_req_data=task.ic_req_data,
prompt_token_ids_len=task.prompt_token_ids_len,
)
if self.use_logprobs:
if getattr(stream_data, "logprobs", None) is not None:
Expand All @@ -360,12 +379,14 @@ def _process_batch_output_use_zmq(self, receive_datas):
if self.tokens_counter[task_id] == 0:
if task.messages is not None:
result.prompt = task.messages
result.num_cached_tokens = task.num_cached_tokens
if task.get("multimodal_inputs", None):
result.num_input_image_tokens = task.multimodal_inputs.get("num_input_image_tokens", 0)
result.num_input_video_tokens = task.multimodal_inputs.get("num_input_video_tokens", 0)
result.num_cached_tokens = task.num_cached_tokens

is_prefill = task.disaggregate_info is not None and task.disaggregate_info["role"] == "prefill"
if is_prefill and len(token_ids) > 1:
result.outputs.draft_token_ids = token_ids.tolist()
result = self._process_per_token(task, i, token_ids, result, is_prefill)
if not is_prefill or self.cfg.scheduler_config.name == "splitwise":
batch_result.append(result)
Expand Down