diff --git a/src/llm/BUILD b/src/llm/BUILD index 9cb85d657e..272d323538 100644 --- a/src/llm/BUILD +++ b/src/llm/BUILD @@ -295,7 +295,8 @@ ovms_cc_library( ovms_cc_library( name = "genai_servables", - hdrs = ["servable.hpp", + hdrs = ["servable.hpp", + "ovms_text_streamer.hpp", "servable_initializer.hpp", "language_model/continuous_batching/servable.hpp", "language_model/continuous_batching/llm_executor.hpp", @@ -310,6 +311,7 @@ ovms_cc_library( "text_utils.hpp"], srcs = ["servable.cpp", "servable_initializer.cpp", + "ovms_text_streamer.cpp", "language_model/continuous_batching/servable.cpp", "language_model/continuous_batching/servable_initializer.cpp", "visual_language_model/continuous_batching/servable.cpp", diff --git a/src/llm/apis/openai_api_handler.cpp b/src/llm/apis/openai_api_handler.cpp index c52136d67c..0e96e9b335 100644 --- a/src/llm/apis/openai_api_handler.cpp +++ b/src/llm/apis/openai_api_handler.cpp @@ -520,7 +520,7 @@ bool OpenAIApiHandler::isStream() const { return request.stream; } Endpoint OpenAIApiHandler::getEndpoint() const { return endpoint; } std::string OpenAIApiHandler::getModel() const { return request.model; } std::string OpenAIApiHandler::getToolChoice() const { return request.toolChoice; } -const std::unique_ptr& OpenAIApiHandler::getOutputParser() const { return outputParser; } +const std::shared_ptr& OpenAIApiHandler::getOutputParser() const { return outputParser; } void OpenAIApiHandler::setPromptTokensUsage(size_t promptTokens) { usage.promptTokens = promptTokens; diff --git a/src/llm/apis/openai_api_handler.hpp b/src/llm/apis/openai_api_handler.hpp index 7c56bcbf95..bbfb0763af 100644 --- a/src/llm/apis/openai_api_handler.hpp +++ b/src/llm/apis/openai_api_handler.hpp @@ -97,7 +97,7 @@ class OpenAIApiHandler { ov::genai::Tokenizer tokenizer; // Output parser is used to parse chat completions response to extract specific fields like tool calls and reasoning. - std::unique_ptr outputParser = nullptr; + std::shared_ptr outputParser = nullptr; // Shared parsing helpers absl::Status parseCommonPart(std::optional maxTokensLimit, uint32_t bestOfLimit, std::optional maxModelLength); @@ -118,7 +118,7 @@ class OpenAIApiHandler { // TODO we should delay creating output parser until we have request with toolNameSchemaMap parsed // we pass it now, but it has to be populated first before first use if (!toolParserName.empty() || !reasoningParserName.empty()) { - outputParser = std::make_unique(tokenizer, toolParserName, reasoningParserName, this->request.toolNameSchemaMap); + outputParser = std::make_shared(tokenizer, toolParserName, reasoningParserName, this->request.toolNameSchemaMap); } } @@ -154,7 +154,7 @@ class OpenAIApiHandler { Endpoint getEndpoint() const; std::string getModel() const; std::string getToolChoice() const; - const std::unique_ptr& getOutputParser() const; + const std::shared_ptr& getOutputParser() const; // Usage tracking void setPromptTokensUsage(size_t promptTokens); @@ -165,7 +165,7 @@ class OpenAIApiHandler { virtual std::string serializeUnaryResponse(const std::vector& generationOutputs) = 0; virtual std::string serializeUnaryResponse(ov::genai::EncodedResults& results) = 0; virtual std::string serializeUnaryResponse(ov::genai::VLMDecodedResults& results, const std::string& textResponse) = 0; - virtual std::string serializeStreamingChunk(const std::string& chunkResponse, ov::genai::GenerationFinishReason finishReason) = 0; + virtual std::string serializeStreamingChunk(rapidjson::Document parsedDelta, ov::genai::GenerationFinishReason finishReason) = 0; virtual std::string serializeStreamingUsageChunk() = 0; virtual std::string serializeStreamingHandshakeChunk() = 0; diff --git a/src/llm/apis/openai_completions.cpp b/src/llm/apis/openai_completions.cpp index 20ccfe372f..e2352ba2bc 100644 --- a/src/llm/apis/openai_completions.cpp +++ b/src/llm/apis/openai_completions.cpp @@ -535,7 +535,7 @@ std::string OpenAIChatCompletionsHandler::serializeUnaryResponse(ov::genai::VLMD // --- Streaming serialization --- -std::string OpenAIChatCompletionsHandler::serializeStreamingChunk(const std::string& chunkResponse, ov::genai::GenerationFinishReason finishReason) { +std::string OpenAIChatCompletionsHandler::serializeStreamingChunk(rapidjson::Document parsedDelta, ov::genai::GenerationFinishReason finishReason) { OVMS_PROFILE_FUNCTION(); Document doc; @@ -561,33 +561,25 @@ std::string OpenAIChatCompletionsHandler::serializeStreamingChunk(const std::str // TODO: logprobs: object/null; Log probability information for the choice. choice.AddMember("logprobs", Value(), allocator); if (endpoint == Endpoint::CHAT_COMPLETIONS) { - if (outputParser != nullptr) { - std::optional delta = outputParser->parseChunk(chunkResponse, areToolsAvailable(), finishReason); - if (!delta.has_value()) { - // If the generation is still ongoing, there is nothing to emit yet - if (finishReason == ov::genai::GenerationFinishReason::NONE) { - return ""; - } - // Generation finished but parser returned no delta (e.g. empty chunk after tool call). - // We still need to emit a chunk with the appropriate finish_reason. - } - if (delta.has_value() && delta->HasMember("delta")) { - // Deep copy the "delta" member value into the choice object - choice.AddMember("delta", Value((*delta)["delta"], allocator), allocator); - hasToolCalls = hasToolCallsInStreamingDelta(*delta); - if (hasToolCalls) { - toolCallsDetectedInStream = true; - } + // parsedDelta is a pre-parsed Document produced by OVMSTextStreamer::flush_chunk. + // Shape: {"delta":{...}} for content/reasoning/tool_calls, or an empty Document{} + // for finish-only chunks (generation ended on a swallowed token). + if (parsedDelta.HasMember("delta")) { + choice.AddMember("delta", Value(parsedDelta["delta"], allocator), allocator); + hasToolCalls = hasToolCallsInStreamingDelta(parsedDelta); + if (hasToolCalls) { + toolCallsDetectedInStream = true; } - - } else { - Value delta(kObjectType); - delta.SetObject(); - delta.AddMember("content", Value(chunkResponse.c_str(), allocator), allocator); - choice.AddMember("delta", delta, allocator); } + // If no "delta" member, choice has no delta — valid for the final finish_reason chunk. } else if (endpoint == Endpoint::COMPLETIONS) { - choice.AddMember("text", Value(chunkResponse.c_str(), allocator), allocator); + // For /v1/completions, extract the plain text from the content delta. + if (parsedDelta.HasMember("delta") && parsedDelta["delta"].IsObject() && + parsedDelta["delta"].HasMember("content") && parsedDelta["delta"]["content"].IsString()) { + choice.AddMember("text", Value(parsedDelta["delta"]["content"].GetString(), allocator), allocator); + } else { + choice.AddMember("text", Value("", allocator), allocator); + } } auto serializedFinishReason = mapFinishReason(finishReason, hasToolCalls || toolCallsDetectedInStream); @@ -672,6 +664,9 @@ std::string OpenAIChatCompletionsHandler::serializeStreamingUsageChunk() { std::string OpenAIChatCompletionsHandler::serializeStreamingHandshakeChunk() { OVMS_PROFILE_FUNCTION(); + // The handshake chunk signals that prefill is complete and generation has started. + // Emitted on every endpoint so clients can distinguish prefill latency from + // time-to-first-token. Document doc; doc.SetObject(); Document::AllocatorType& allocator = doc.GetAllocator(); @@ -691,7 +686,8 @@ std::string OpenAIChatCompletionsHandler::serializeStreamingHandshakeChunk() { delta.AddMember("content", Value(rapidjson::kNullType), allocator); choice.AddMember("delta", delta, allocator); } else if (endpoint == Endpoint::COMPLETIONS) { - choice.AddMember("text", Value(rapidjson::kNullType), allocator); + // Empty string (not null) so the field is present and typed as string. + choice.AddMember("text", Value("", allocator), allocator); } choice.AddMember("finish_reason", Value(rapidjson::kNullType), allocator); @@ -705,7 +701,6 @@ std::string OpenAIChatCompletionsHandler::serializeStreamingHandshakeChunk() { // model: string; copied from the request doc.AddMember("model", Value(request.model.c_str(), allocator), allocator); - // object: string; defined that the type streamed chunk rather than complete response if (endpoint == Endpoint::CHAT_COMPLETIONS) { doc.AddMember("object", Value("chat.completion.chunk", allocator), allocator); } else if (endpoint == Endpoint::COMPLETIONS) { diff --git a/src/llm/apis/openai_completions.hpp b/src/llm/apis/openai_completions.hpp index cbb8f2645f..7b1059fb6c 100644 --- a/src/llm/apis/openai_completions.hpp +++ b/src/llm/apis/openai_completions.hpp @@ -40,7 +40,7 @@ class OpenAIChatCompletionsHandler : public OpenAIApiHandler { std::string serializeUnaryResponse(const std::vector& generationOutputs) override; std::string serializeUnaryResponse(ov::genai::EncodedResults& results) override; std::string serializeUnaryResponse(ov::genai::VLMDecodedResults& results, const std::string& textResponse) override; - std::string serializeStreamingChunk(const std::string& chunkResponse, ov::genai::GenerationFinishReason finishReason) override; + std::string serializeStreamingChunk(rapidjson::Document parsedDelta, ov::genai::GenerationFinishReason finishReason) override; std::string serializeStreamingUsageChunk() override; std::string serializeStreamingHandshakeChunk() override; void incrementProcessedTokens(size_t numTokens = 1) override; diff --git a/src/llm/apis/openai_responses.cpp b/src/llm/apis/openai_responses.cpp index 4b20b054b9..de2e4f4349 100644 --- a/src/llm/apis/openai_responses.cpp +++ b/src/llm/apis/openai_responses.cpp @@ -1565,7 +1565,7 @@ std::string OpenAIResponsesHandler::serializeStreamingInProgressEvent() { return buffer.GetString(); } -std::string OpenAIResponsesHandler::serializeStreamingChunk(const std::string& chunkResponse, ov::genai::GenerationFinishReason finishReason) { +std::string OpenAIResponsesHandler::serializeStreamingChunk(rapidjson::Document parsedDelta, ov::genai::GenerationFinishReason finishReason) { OVMS_PROFILE_FUNCTION(); const auto createdAt = std::chrono::duration_cast(created.time_since_epoch()).count(); const std::string responseId = "resp-" + std::to_string(createdAt); @@ -1583,33 +1583,24 @@ std::string OpenAIResponsesHandler::serializeStreamingChunk(const std::string& c events.emplace_back(std::move(inProgressEvent)); } - // Lifecycle priming: when the servable invokes serializeStreamingChunk("") - // before the first token is generated (Responses-only behavior), we must - // only emit lifecycle events and skip the parser. Feeding an empty chunk - // into outputParser->parseChunk would advance processingPhase from UNKNOWN - // to CONTENT and cause subsequent reasoning-tag chunks to leak into - // delta.content. - if (chunkResponse.empty() && finishReason == ov::genai::GenerationFinishReason::NONE) { - return joinServerSideEvents(events); - } - - if (outputParser != nullptr) { - // Use output parser to separate reasoning from content - std::optional delta = outputParser->parseChunk(chunkResponse, areToolsAvailable(), finishReason); - - if (delta.has_value() && delta->HasMember("delta") && (*delta)["delta"].IsObject()) { - const auto& deltaObj = (*delta)["delta"]; - if (deltaObj.HasMember("reasoning_content") && deltaObj["reasoning_content"].IsString()) { - // Reasoning chunk - if (!responsesState.reasoningInitialized) { - events.emplace_back(serializeReasoningOutputItemAddedEvent(reasoningItemId)); - events.emplace_back(serializeReasoningSummaryPartAddedEvent(reasoningItemId)); - responsesState.reasoningInitialized = true; - } - const std::string reasoningText = deltaObj["reasoning_content"].GetString(); - responsesState.reasoningText += reasoningText; - events.emplace_back(serializeReasoningSummaryTextDeltaEvent(reasoningItemId, reasoningText)); - } else if (deltaObj.HasMember("content") && deltaObj["content"].IsString()) { + // parsedDelta is a pre-parsed Document produced by OVMSTextStreamer::flushChunk. + // Shape: {"delta":{...}} for content/reasoning/tool_calls, or an empty Document{} + // for finish-only chunks. Inspect the delta directly — no parsing needed here. + if (parsedDelta.HasMember("delta") && parsedDelta["delta"].IsObject()) { + const auto& deltaObj = parsedDelta["delta"]; + if (deltaObj.HasMember("reasoning_content") && deltaObj["reasoning_content"].IsString()) { + // Reasoning chunk + if (!responsesState.reasoningInitialized) { + events.emplace_back(serializeReasoningOutputItemAddedEvent(reasoningItemId)); + events.emplace_back(serializeReasoningSummaryPartAddedEvent(reasoningItemId)); + responsesState.reasoningInitialized = true; + } + const std::string reasoningText = deltaObj["reasoning_content"].GetString(); + responsesState.reasoningText += reasoningText; + events.emplace_back(serializeReasoningSummaryTextDeltaEvent(reasoningItemId, reasoningText)); + } else if (deltaObj.HasMember("content") && deltaObj["content"].IsString()) { + const std::string contentText = deltaObj["content"].GetString(); + if (!contentText.empty()) { // Content chunk - close reasoning if it was active, init message if needed if (responsesState.reasoningInitialized && !responsesState.reasoningCompleted) { events.emplace_back(serializeReasoningSummaryTextDoneEvent(reasoningItemId)); @@ -1623,72 +1614,61 @@ std::string OpenAIResponsesHandler::serializeStreamingChunk(const std::string& c events.emplace_back(serializeContentPartAddedEvent(outputItemId, msgIdx)); responsesState.messageInitialized = true; } - const std::string contentText = deltaObj["content"].GetString(); responsesState.outputText += contentText; events.emplace_back(serializeOutputTextDeltaEvent(outputItemId, contentText, msgIdx)); - } else if (deltaObj.HasMember("tool_calls") && deltaObj["tool_calls"].IsArray()) { - // Tool call chunk - close reasoning if active - if (responsesState.reasoningInitialized && !responsesState.reasoningCompleted) { - events.emplace_back(serializeReasoningSummaryTextDoneEvent(reasoningItemId)); - events.emplace_back(serializeReasoningSummaryPartDoneEvent(reasoningItemId)); - events.emplace_back(serializeReasoningOutputItemDoneEvent(reasoningItemId)); - responsesState.reasoningCompleted = true; - } - const auto& toolCallsArr = deltaObj["tool_calls"]; - for (rapidjson::SizeType i = 0; i < toolCallsArr.Size(); ++i) { - const auto& tc = toolCallsArr[i]; - int tcIndex = tc.HasMember("index") ? tc["index"].GetInt() : 0; - // Determine the output index for this tool call - const uint64_t baseIdx = responsesState.reasoningInitialized ? 1 : 0; - const uint64_t tcOutputIdx = baseIdx + static_cast(tcIndex); - // Determine if this is a new tool call (has function name) - bool isNewToolCall = false; - std::string funcName; - std::string tcId; - std::string argDelta; - if (tc.HasMember("function") && tc["function"].IsObject()) { - const auto& funcObj = tc["function"]; - if (funcObj.HasMember("name") && funcObj["name"].IsString()) { - funcName = funcObj["name"].GetString(); - isNewToolCall = true; - } - if (funcObj.HasMember("arguments") && funcObj["arguments"].IsString()) { - argDelta = funcObj["arguments"].GetString(); - } - } - if (tc.HasMember("id") && tc["id"].IsString()) { - tcId = tc["id"].GetString(); + } + } else if (deltaObj.HasMember("tool_calls") && deltaObj["tool_calls"].IsArray()) { + // Tool call chunk - close reasoning if active + if (responsesState.reasoningInitialized && !responsesState.reasoningCompleted) { + events.emplace_back(serializeReasoningSummaryTextDoneEvent(reasoningItemId)); + events.emplace_back(serializeReasoningSummaryPartDoneEvent(reasoningItemId)); + events.emplace_back(serializeReasoningOutputItemDoneEvent(reasoningItemId)); + responsesState.reasoningCompleted = true; + } + const auto& toolCallsArr = deltaObj["tool_calls"]; + for (rapidjson::SizeType i = 0; i < toolCallsArr.Size(); ++i) { + const auto& tc = toolCallsArr[i]; + int tcIndex = tc.HasMember("index") ? tc["index"].GetInt() : 0; + // Determine the output index for this tool call + const uint64_t baseIdx = responsesState.reasoningInitialized ? 1 : 0; + const uint64_t tcOutputIdx = baseIdx + static_cast(tcIndex); + // Determine if this is a new tool call (has function name) + bool isNewToolCall = false; + std::string funcName; + std::string tcId; + std::string argDelta; + if (tc.HasMember("function") && tc["function"].IsObject()) { + const auto& funcObj = tc["function"]; + if (funcObj.HasMember("name") && funcObj["name"].IsString()) { + funcName = funcObj["name"].GetString(); + isNewToolCall = true; } - if (isNewToolCall) { - // Ensure we have enough entries in our tracking vector - while (static_cast(responsesState.toolCalls.size()) <= tcIndex) { - responsesState.toolCalls.push_back(ToolCall{}); - } - responsesState.toolCalls[tcIndex].id = tcId; - responsesState.toolCalls[tcIndex].name = funcName; - responsesState.toolCalls[tcIndex].arguments = ""; - events.emplace_back(serializeFunctionCallOutputItemAddedEvent(responsesState.toolCalls[tcIndex], tcOutputIdx)); + if (funcObj.HasMember("arguments") && funcObj["arguments"].IsString()) { + argDelta = funcObj["arguments"].GetString(); } - if (!argDelta.empty() && static_cast(responsesState.toolCalls.size()) > tcIndex) { - responsesState.toolCalls[tcIndex].arguments += argDelta; - events.emplace_back(serializeFunctionCallArgumentsDeltaEvent(responsesState.toolCalls[tcIndex].id, argDelta, tcOutputIdx)); + } + if (tc.HasMember("id") && tc["id"].IsString()) { + tcId = tc["id"].GetString(); + } + if (isNewToolCall) { + // Ensure we have enough entries in our tracking vector + while (static_cast(responsesState.toolCalls.size()) <= tcIndex) { + responsesState.toolCalls.push_back(ToolCall{}); } + responsesState.toolCalls[tcIndex].id = tcId; + responsesState.toolCalls[tcIndex].name = funcName; + responsesState.toolCalls[tcIndex].arguments = ""; + events.emplace_back(serializeFunctionCallOutputItemAddedEvent(responsesState.toolCalls[tcIndex], tcOutputIdx)); + } + if (!argDelta.empty() && static_cast(responsesState.toolCalls.size()) > tcIndex) { + responsesState.toolCalls[tcIndex].arguments += argDelta; + events.emplace_back(serializeFunctionCallArgumentsDeltaEvent(responsesState.toolCalls[tcIndex].id, argDelta, tcOutputIdx)); } } } - // If delta is nullopt, the parser is accumulating tag tokens - skip - } else { - // No parser - pass through raw text - if (!chunkResponse.empty()) { - if (!responsesState.messageInitialized) { - events.emplace_back(serializeOutputItemAddedEvent(outputItemId)); - events.emplace_back(serializeContentPartAddedEvent(outputItemId)); - responsesState.messageInitialized = true; - } - responsesState.outputText += chunkResponse; - events.emplace_back(serializeOutputTextDeltaEvent(outputItemId, chunkResponse)); - } + // Empty delta object (no recognized member) — finish-only chunk, no events to emit here. } + // Empty Document (no "delta" member) — finish-only chunk; lifecycle events already emitted above. if (finishReason != ov::genai::GenerationFinishReason::NONE) { // Close any open reasoning that wasn't closed by content transition diff --git a/src/llm/apis/openai_responses.hpp b/src/llm/apis/openai_responses.hpp index 6a10400952..d0a773a7d8 100644 --- a/src/llm/apis/openai_responses.hpp +++ b/src/llm/apis/openai_responses.hpp @@ -98,7 +98,7 @@ class OpenAIResponsesHandler : public OpenAIApiHandler { std::string serializeUnaryResponse(const std::vector& generationOutputs) override; std::string serializeUnaryResponse(ov::genai::EncodedResults& results) override; std::string serializeUnaryResponse(ov::genai::VLMDecodedResults& results, const std::string& textResponse) override; - std::string serializeStreamingChunk(const std::string& chunkResponse, ov::genai::GenerationFinishReason finishReason) override; + std::string serializeStreamingChunk(rapidjson::Document parsedDelta, ov::genai::GenerationFinishReason finishReason) override; std::string serializeStreamingUsageChunk() override; std::string serializeStreamingHandshakeChunk() override; std::string serializeStreamingCreatedEvent() override; diff --git a/src/llm/io_processing/base_output_parser.hpp b/src/llm/io_processing/base_output_parser.hpp index cf44d084bc..f5026ba5bb 100644 --- a/src/llm/io_processing/base_output_parser.hpp +++ b/src/llm/io_processing/base_output_parser.hpp @@ -84,7 +84,8 @@ class BaseOutputParser { // Parse model output chunk in the streaming mode. If in result of processing the chunk we cannot produce meaningful response, we return std::nullopt. // Otherwise we return a JSON object containing the delta that conforms to OpenAI API. - virtual std::optional parseChunk(const std::string& chunkResponse, ov::genai::GenerationFinishReason finishReason) = 0; + // tokens holds the token IDs that produced chunkResponse (may be empty; currently informational for future use). + virtual std::optional parseChunk(const std::string& chunkResponse, const std::vector& tokens, ov::genai::GenerationFinishReason finishReason) = 0; // Get the tags that marks the beginning of the segment that should be processed by the parser. // This method is used in streaming mode to determine if the parser should start processing the content. diff --git a/src/llm/io_processing/devstral/tool_parser.cpp b/src/llm/io_processing/devstral/tool_parser.cpp index 2274d2e2b0..6ceb74a42d 100644 --- a/src/llm/io_processing/devstral/tool_parser.cpp +++ b/src/llm/io_processing/devstral/tool_parser.cpp @@ -134,7 +134,7 @@ rapidjson::Document DevstralToolParser::parseContentChunk() { return doc; } -std::optional DevstralToolParser::parseChunk(const std::string& chunk, ov::genai::GenerationFinishReason finishReason) { +std::optional DevstralToolParser::parseChunk(const std::string& chunk, const std::vector& /*tokens*/, ov::genai::GenerationFinishReason finishReason) { /* Devstral [TOOL_CALL]tool_name[ARGS]arguments[] It does not support parallel tool calls, so tool calls are always in sequence. diff --git a/src/llm/io_processing/devstral/tool_parser.hpp b/src/llm/io_processing/devstral/tool_parser.hpp index c07b38b34e..5a591696ab 100644 --- a/src/llm/io_processing/devstral/tool_parser.hpp +++ b/src/llm/io_processing/devstral/tool_parser.hpp @@ -56,7 +56,7 @@ class DevstralToolParser : public BaseOutputParser { toolSchemas(toolSchemas) {} void parse(ParsedOutput& parsedOutput, const std::vector& generatedTokens) override; - std::optional parseChunk(const std::string& chunk, ov::genai::GenerationFinishReason finishReason) override; + std::optional parseChunk(const std::string& chunk, const std::vector& tokens, ov::genai::GenerationFinishReason finishReason) override; rapidjson::Document parseContentChunk(); rapidjson::Document wrapCombinedDelta(ToolCall& toolCall); const std::vector& getParsingStartTags() const override { diff --git a/src/llm/io_processing/gemma4/gemma4_reasoning_parser.cpp b/src/llm/io_processing/gemma4/gemma4_reasoning_parser.cpp index dae9b0075e..cd1077adb2 100644 --- a/src/llm/io_processing/gemma4/gemma4_reasoning_parser.cpp +++ b/src/llm/io_processing/gemma4/gemma4_reasoning_parser.cpp @@ -55,7 +55,7 @@ void Gemma4ReasoningParser::parse(ParsedOutput& parsedOutput, const std::vector< parsedOutput.content = contentWithoutReasoning; } } -std::optional Gemma4ReasoningParser::parseChunk(const std::string& chunk, ov::genai::GenerationFinishReason finishReason) { +std::optional Gemma4ReasoningParser::parseChunk(const std::string& chunk, const std::vector& /*tokens*/, ov::genai::GenerationFinishReason finishReason) { if (chunk.empty()) { SPDLOG_LOGGER_DEBUG(llm_calculator_logger, "Received empty chunk for Gemma4ReasoningParser"); return std::nullopt; diff --git a/src/llm/io_processing/gemma4/gemma4_reasoning_parser.hpp b/src/llm/io_processing/gemma4/gemma4_reasoning_parser.hpp index a293d41c92..887036a59d 100644 --- a/src/llm/io_processing/gemma4/gemma4_reasoning_parser.hpp +++ b/src/llm/io_processing/gemma4/gemma4_reasoning_parser.hpp @@ -38,7 +38,7 @@ class Gemma4ReasoningParser : public Qwen3ReasoningParser { explicit Gemma4ReasoningParser(ov::genai::Tokenizer& tokenizer) : Qwen3ReasoningParser(tokenizer) {} void parse(ParsedOutput& parsedOutput, const std::vector& generatedTokens) override; - std::optional parseChunk(const std::string& chunk, ov::genai::GenerationFinishReason finishReason) override; + std::optional parseChunk(const std::string& chunk, const std::vector& tokens, ov::genai::GenerationFinishReason finishReason) override; bool requiresStreamingWithSpecialTokens() const override { return true; diff --git a/src/llm/io_processing/gemma4/gemma4_tool_parser.cpp b/src/llm/io_processing/gemma4/gemma4_tool_parser.cpp index 7aa1477304..058f0c1add 100644 --- a/src/llm/io_processing/gemma4/gemma4_tool_parser.cpp +++ b/src/llm/io_processing/gemma4/gemma4_tool_parser.cpp @@ -363,7 +363,7 @@ rapidjson::Document Gemma4ToolParser::wrapDeltaArgs(const std::string& argsStr, return BaseOutputParser::wrapDelta(doc, toolCallIndex); } -std::optional Gemma4ToolParser::parseChunk(const std::string& chunk, ov::genai::GenerationFinishReason finishReason) { +std::optional Gemma4ToolParser::parseChunk(const std::string& chunk, const std::vector& /*tokens*/, ov::genai::GenerationFinishReason finishReason) { if (!chunk.empty()) { this->streamingContent += chunk; } diff --git a/src/llm/io_processing/gemma4/gemma4_tool_parser.hpp b/src/llm/io_processing/gemma4/gemma4_tool_parser.hpp index 1a687bf761..0a7ab12b15 100644 --- a/src/llm/io_processing/gemma4/gemma4_tool_parser.hpp +++ b/src/llm/io_processing/gemma4/gemma4_tool_parser.hpp @@ -52,7 +52,7 @@ class Gemma4ToolParser : public BaseOutputParser { BaseOutputParser(tokenizer) {} void parse(ParsedOutput& parsedOutput, const std::vector& generatedTokens) override; - std::optional parseChunk(const std::string& chunk, ov::genai::GenerationFinishReason finishReason) override; + std::optional parseChunk(const std::string& chunk, const std::vector& tokens, ov::genai::GenerationFinishReason finishReason) override; const std::vector& getParsingStartTags() const override { static const std::vector parsingStartTags = {TOOL_CALL_START_TAG}; return parsingStartTags; diff --git a/src/llm/io_processing/gptoss/reasoning_parser.cpp b/src/llm/io_processing/gptoss/reasoning_parser.cpp index cc3c295092..2d856cbd85 100644 --- a/src/llm/io_processing/gptoss/reasoning_parser.cpp +++ b/src/llm/io_processing/gptoss/reasoning_parser.cpp @@ -40,7 +40,7 @@ void GptOssReasoningParser::parse(ParsedOutput& parsedOutput, const std::vector< SPDLOG_LOGGER_DEBUG(llm_calculator_logger, "Unary | GPT Reasoning | [{}]", parsedOutput.reasoning); } -std::optional GptOssReasoningParser::parseChunk(const std::string& newChunk, ov::genai::GenerationFinishReason finishReason) { +std::optional GptOssReasoningParser::parseChunk(const std::string& newChunk, const std::vector& /*tokens*/, ov::genai::GenerationFinishReason finishReason) { SPDLOG_LOGGER_DEBUG(llm_calculator_logger, "Streaming | GPT Reason | Processing Chunk [{}]", newChunk); if (newChunk.empty()) { diff --git a/src/llm/io_processing/gptoss/reasoning_parser.hpp b/src/llm/io_processing/gptoss/reasoning_parser.hpp index 83f080f083..37af80b4bf 100644 --- a/src/llm/io_processing/gptoss/reasoning_parser.hpp +++ b/src/llm/io_processing/gptoss/reasoning_parser.hpp @@ -50,7 +50,7 @@ class GptOssReasoningParser : public BaseOutputParser { // Unary void parse(ParsedOutput& parsedOutput, const std::vector& generatedTokens) override; // Streaming - std::optional parseChunk(const std::string& chunk, ov::genai::GenerationFinishReason finishReason) override; + std::optional parseChunk(const std::string& chunk, const std::vector& tokens, ov::genai::GenerationFinishReason finishReason) override; const std::vector& getParsingStartTags() const override { // If you add another element you have to update implementation as well diff --git a/src/llm/io_processing/gptoss/tool_parser.cpp b/src/llm/io_processing/gptoss/tool_parser.cpp index 0dc1fa8a89..2ca3a50f7b 100644 --- a/src/llm/io_processing/gptoss/tool_parser.cpp +++ b/src/llm/io_processing/gptoss/tool_parser.cpp @@ -80,7 +80,7 @@ void GptOssToolParser::clearState() { functionNameCache.clear(); } -std::optional GptOssToolParser::parseChunk(const std::string& newChunk, ov::genai::GenerationFinishReason finishReason) { +std::optional GptOssToolParser::parseChunk(const std::string& newChunk, const std::vector& /*tokens*/, ov::genai::GenerationFinishReason finishReason) { SPDLOG_LOGGER_DEBUG(llm_calculator_logger, "Streaming | GPT Tool | Processing Chunk [{}]", newChunk); std::string chunk = newChunk; diff --git a/src/llm/io_processing/gptoss/tool_parser.hpp b/src/llm/io_processing/gptoss/tool_parser.hpp index b3733cf76a..ff6655db37 100644 --- a/src/llm/io_processing/gptoss/tool_parser.hpp +++ b/src/llm/io_processing/gptoss/tool_parser.hpp @@ -55,7 +55,7 @@ class GptOssToolParser : public BaseOutputParser { // Unary void parse(ParsedOutput& parsedOutput, const std::vector& generatedTokens) override; // Streaming - std::optional parseChunk(const std::string& chunk, ov::genai::GenerationFinishReason finishReason) override; + std::optional parseChunk(const std::string& chunk, const std::vector& tokens, ov::genai::GenerationFinishReason finishReason) override; const std::vector& getParsingStartTags() const override { static const std::vector parsingStartTags{ diff --git a/src/llm/io_processing/hermes3/tool_parser.cpp b/src/llm/io_processing/hermes3/tool_parser.cpp index 1f3a4fd2ed..c4b1d55b4a 100644 --- a/src/llm/io_processing/hermes3/tool_parser.cpp +++ b/src/llm/io_processing/hermes3/tool_parser.cpp @@ -178,7 +178,7 @@ void Hermes3ToolParser::parse(ParsedOutput& parsedOutput, const std::vector Hermes3ToolParser::parseChunk(const std::string& chunk, ov::genai::GenerationFinishReason finishReason) { +std::optional Hermes3ToolParser::parseChunk(const std::string& chunk, const std::vector& /*tokens*/, ov::genai::GenerationFinishReason finishReason) { /* We first collect data until we have full function name - that's when we return the first delta. Every next delta contains next parts of the arguments. Hermes3 generates arguments as JSON, but OpenAI API expects them in a string format. diff --git a/src/llm/io_processing/hermes3/tool_parser.hpp b/src/llm/io_processing/hermes3/tool_parser.hpp index bd1c5bb576..dc8f98d634 100644 --- a/src/llm/io_processing/hermes3/tool_parser.hpp +++ b/src/llm/io_processing/hermes3/tool_parser.hpp @@ -74,7 +74,7 @@ class Hermes3ToolParser : public BaseOutputParser { BaseOutputParser(tokenizer) {} void parse(ParsedOutput& parsedOutput, const std::vector& generatedTokens) override; - std::optional parseChunk(const std::string& chunk, ov::genai::GenerationFinishReason finishReason) override; + std::optional parseChunk(const std::string& chunk, const std::vector& tokens, ov::genai::GenerationFinishReason finishReason) override; const std::vector& getParsingStartTags() const override { static const std::vector parsingStartTags = {parsingStartTag}; return parsingStartTags; diff --git a/src/llm/io_processing/lfm2/lfm2_tool_parser.cpp b/src/llm/io_processing/lfm2/lfm2_tool_parser.cpp index fc36f11577..056a5bead9 100644 --- a/src/llm/io_processing/lfm2/lfm2_tool_parser.cpp +++ b/src/llm/io_processing/lfm2/lfm2_tool_parser.cpp @@ -330,7 +330,7 @@ void Lfm2ToolParser::cutEOSFromContent(std::string& content) { } } -std::optional Lfm2ToolParser::parseChunk(const std::string& chunk, ov::genai::GenerationFinishReason finishReason) { +std::optional Lfm2ToolParser::parseChunk(const std::string& chunk, const std::vector& /*tokens*/, ov::genai::GenerationFinishReason finishReason) { if (chunk.empty()) { return std::nullopt; } diff --git a/src/llm/io_processing/lfm2/lfm2_tool_parser.hpp b/src/llm/io_processing/lfm2/lfm2_tool_parser.hpp index ac492919cf..cd56634eb4 100644 --- a/src/llm/io_processing/lfm2/lfm2_tool_parser.hpp +++ b/src/llm/io_processing/lfm2/lfm2_tool_parser.hpp @@ -55,7 +55,7 @@ class Lfm2ToolParser : public BaseOutputParser { BaseOutputParser(tokenizer) {} void parse(ParsedOutput& parsedOutput, const std::vector& generatedTokens) override; - std::optional parseChunk(const std::string& chunk, ov::genai::GenerationFinishReason finishReason) override; + std::optional parseChunk(const std::string& chunk, const std::vector& tokens, ov::genai::GenerationFinishReason finishReason) override; const std::vector& getParsingStartTags() const override { static const std::vector parsingStartTags = {TOOL_CALL_START_TAG}; return parsingStartTags; diff --git a/src/llm/io_processing/llama3/tool_parser.cpp b/src/llm/io_processing/llama3/tool_parser.cpp index 4be468c62b..845a6cca7b 100644 --- a/src/llm/io_processing/llama3/tool_parser.cpp +++ b/src/llm/io_processing/llama3/tool_parser.cpp @@ -121,7 +121,7 @@ static inline void changeParametersToArguments(rapidjson::Document& json) { } } -std::optional Llama3ToolParser::parseChunk(const std::string& chunk, ov::genai::GenerationFinishReason finishReason) { +std::optional Llama3ToolParser::parseChunk(const std::string& chunk, const std::vector& /*tokens*/, ov::genai::GenerationFinishReason finishReason) { if (chunk.empty()) { SPDLOG_LOGGER_DEBUG(llm_calculator_logger, "Received empty chunk for Llama3ToolParser"); return std::nullopt; diff --git a/src/llm/io_processing/llama3/tool_parser.hpp b/src/llm/io_processing/llama3/tool_parser.hpp index 6418e8d04c..40d235d411 100644 --- a/src/llm/io_processing/llama3/tool_parser.hpp +++ b/src/llm/io_processing/llama3/tool_parser.hpp @@ -55,7 +55,7 @@ class Llama3ToolParser : public BaseOutputParser { BaseOutputParser(tokenizer) {} void parse(ParsedOutput& parsedOutput, const std::vector& generatedTokens) override; - std::optional parseChunk(const std::string& chunk, ov::genai::GenerationFinishReason finishReason) override; + std::optional parseChunk(const std::string& chunk, const std::vector& tokens, ov::genai::GenerationFinishReason finishReason) override; const std::vector& getParsingStartTags() const override { static const std::vector parsingStartTags = {parsingStartTag}; return parsingStartTags; diff --git a/src/llm/io_processing/mistral/tool_parser.cpp b/src/llm/io_processing/mistral/tool_parser.cpp index 9d36741c27..11ba39979c 100644 --- a/src/llm/io_processing/mistral/tool_parser.cpp +++ b/src/llm/io_processing/mistral/tool_parser.cpp @@ -162,7 +162,7 @@ void MistralToolParser::clearState() { openBracesCount = 1; // Reset to 1 as we count the tool call opening brace } -std::optional MistralToolParser::parseChunk(const std::string& chunk, ov::genai::GenerationFinishReason finishReason) { +std::optional MistralToolParser::parseChunk(const std::string& chunk, const std::vector& /*tokens*/, ov::genai::GenerationFinishReason finishReason) { /* Mistral with vLLM template produces tool calls in the format (beginning [TOOL_CALL] is skipped by the mode or just not visible during streaming): [{"name": [function name], "arguments": [function arguments as JSON]}, ...] @@ -204,7 +204,7 @@ std::optional MistralToolParser::parseChunk(const std::stri // We found "[{", so we switch to the the state where we are waiting for the opening bracket of the array internalState = AWAITING_TOOL_CALLS_OPENING_BRACKET; // We have more content in the chunk after "[{", so we process the rest of the chunk in the next state - return parseChunk(modifiedChunk, finishReason); + return parseChunk(modifiedChunk, {}, finishReason); } return std::nullopt; } else if (internalState == AWAITING_TOOL_CALLS_OPENING_BRACKET) { @@ -219,7 +219,7 @@ std::optional MistralToolParser::parseChunk(const std::stri return std::nullopt; // Nothing more to process in this chunk } else { // Process the remaining chunk as part of tool call processing - return parseChunk(remainingChunk, finishReason); + return parseChunk(remainingChunk, {}, finishReason); } } else { // Still waiting for the opening bracket, ignore this chunk @@ -237,7 +237,7 @@ std::optional MistralToolParser::parseChunk(const std::stri if (remainingChunk.empty()) { return std::nullopt; // Nothing more to process in this chunk } else { - return parseChunk(remainingChunk, finishReason); + return parseChunk(remainingChunk, {}, finishReason); } } else { // Still waiting for the opening brace, ignore this chunk diff --git a/src/llm/io_processing/mistral/tool_parser.hpp b/src/llm/io_processing/mistral/tool_parser.hpp index f4193f8999..8f1a762f85 100644 --- a/src/llm/io_processing/mistral/tool_parser.hpp +++ b/src/llm/io_processing/mistral/tool_parser.hpp @@ -70,7 +70,7 @@ class MistralToolParser : public BaseOutputParser { BaseOutputParser(tokenizer) {} void parse(ParsedOutput& parsedOutput, const std::vector& generatedTokens) override; - std::optional parseChunk(const std::string& chunk, ov::genai::GenerationFinishReason finishReason) override; + std::optional parseChunk(const std::string& chunk, const std::vector& tokens, ov::genai::GenerationFinishReason finishReason) override; const std::vector& getParsingStartTags() const override { static const std::vector toolCallStartTags{"[TOOL_CALLS]", streamingParsingStartTag}; return toolCallStartTags; diff --git a/src/llm/io_processing/output_parser.cpp b/src/llm/io_processing/output_parser.cpp index 543d86c6c1..6c5e4953c6 100644 --- a/src/llm/io_processing/output_parser.cpp +++ b/src/llm/io_processing/output_parser.cpp @@ -143,13 +143,13 @@ std::optional OutputParser::parseContentChunk(ProcessingPha return doc; } -std::optional OutputParser::parseToolCallChunk(ov::genai::GenerationFinishReason finishReason, ProcessingPhase newPhase) { +std::optional OutputParser::parseToolCallChunk(const std::vector& tokens, ov::genai::GenerationFinishReason finishReason, ProcessingPhase newPhase) { if (!toolParser) { throw std::runtime_error("Tool parser is not available, cannot parse tool call chunk"); } std::optional result; try { - result = toolParser->parseChunk(streamOutputCache.getBuffer(), finishReason); + result = toolParser->parseChunk(streamOutputCache.getBuffer(), tokens, finishReason); } catch (...) { streamOutputCache.clear(); throw; @@ -159,13 +159,13 @@ std::optional OutputParser::parseToolCallChunk(ov::genai::G return result; } -std::optional OutputParser::parseReasoningChunk(ov::genai::GenerationFinishReason finishReason, ProcessingPhase newPhase) { +std::optional OutputParser::parseReasoningChunk(const std::vector& tokens, ov::genai::GenerationFinishReason finishReason, ProcessingPhase newPhase) { if (!reasoningParser) { throw std::runtime_error("Reasoning parser is not available, cannot parse reasoning chunk"); } std::optional result; try { - result = reasoningParser->parseChunk(streamOutputCache.getBuffer(), finishReason); + result = reasoningParser->parseChunk(streamOutputCache.getBuffer(), tokens, finishReason); } catch (...) { streamOutputCache.clear(); throw; @@ -253,7 +253,7 @@ ParsedOutput OutputParser::parse(const std::vector& generatedTokens, co return parsedOutput; } -std::optional OutputParser::parseChunk(const std::string& chunkResponse, const bool toolsAvailable, ov::genai::GenerationFinishReason finishReason) { +std::optional OutputParser::parseChunk(const std::string& chunkResponse, const std::vector& tokens, const bool toolsAvailable, ov::genai::GenerationFinishReason finishReason) { /* Using appropriate parser based on the current processing phase Call to this method should return either result from parserContentChunk, parseToolCallChunk, parseReasoningChunk when we can determine the phase @@ -279,7 +279,7 @@ std::optional OutputParser::parseChunk(const std::string& c reasoningStartTagStatus = streamOutputCache.lookupTags(reasoningParser->getSpecialParsingStartTags()); } if (reasoningStartTagStatus == TagLookupStatus::FOUND_COMPLETE) { - return parseReasoningChunk(finishReason); + return parseReasoningChunk(tokens, finishReason); } // else startTagStatus is FOUND_INCOMPLETE or NOT_FOUND, we continue processing, so potential tool parser start tag is not missed anyStartTagStatus = reasoningStartTagStatus; } @@ -292,7 +292,7 @@ std::optional OutputParser::parseChunk(const std::string& c toolCallStartTagStatus = streamOutputCache.lookupTags(toolParser->getSpecialParsingStartTags()); } if (toolCallStartTagStatus == TagLookupStatus::FOUND_COMPLETE) { - return parseToolCallChunk(finishReason); + return parseToolCallChunk(tokens, finishReason); } // else startTagStatus is FOUND_INCOMPLETE or NOT_FOUND, we continue processing if (toolCallStartTagStatus == TagLookupStatus::FOUND_INCOMPLETE) { anyStartTagStatus = toolCallStartTagStatus; // We have at least one incomplete start tag @@ -310,18 +310,18 @@ std::optional OutputParser::parseChunk(const std::string& c TagLookupStatus endTagStatus = streamOutputCache.lookupTag(reasoningParser->getParsingEndTag()); if (endTagStatus == TagLookupStatus::FOUND_COMPLETE) { // Switch back to UNKNOWN phase (we can have either CONTENT or TOOL_CALLS next) - return parseReasoningChunk(finishReason, UNKNOWN); + return parseReasoningChunk(tokens, finishReason, UNKNOWN); } else if (endTagStatus == TagLookupStatus::FOUND_INCOMPLETE && finishReason == ov::genai::GenerationFinishReason::NONE) { return std::nullopt; // Wait for more chunks to determine if end tag is complete } - return parseReasoningChunk(finishReason); + return parseReasoningChunk(tokens, finishReason); } else if (processingPhase == CONTENT) { // If we are in the CONTENT phase, we check if tool parser start tag is found and if so, switch to TOOL_CALLS phase. // TOOL_CALLS is the only phase that can be processed after CONTENT. if (applyToolParser) { TagLookupStatus toolStartTagStatus = streamOutputCache.lookupTags(toolParser->getParsingStartTags()); if (toolStartTagStatus == TagLookupStatus::FOUND_COMPLETE) { - return parseToolCallChunk(finishReason); + return parseToolCallChunk(tokens, finishReason); } else if (toolStartTagStatus == TagLookupStatus::FOUND_INCOMPLETE && finishReason == ov::genai::GenerationFinishReason::NONE) { return std::nullopt; // Wait for more chunks to determine if end tag is complete } @@ -337,9 +337,9 @@ std::optional OutputParser::parseChunk(const std::string& c if (toolEndTagStatus == TagLookupStatus::FOUND_COMPLETE) { // If tool call has finished, we switch to waiting for next tool call as tool calls in the last phase, // so we either get next tool call or finish processing. - return parseToolCallChunk(finishReason, TOOL_CALLS_WAITING_FOR_TOOL); + return parseToolCallChunk(tokens, finishReason, TOOL_CALLS_WAITING_FOR_TOOL); } - return parseToolCallChunk(finishReason); + return parseToolCallChunk(tokens, finishReason); } else if (processingPhase == TOOL_CALLS_WAITING_FOR_TOOL) { // In this phase we are waiting for next tool call or finish of generation. // If we get next tool call start tag, we switch to TOOL_CALLS phase, otherwise if generation finishes we switch to CONTENT phase to flush any remaining content. @@ -349,9 +349,9 @@ std::optional OutputParser::parseChunk(const std::string& c } if (toolStartTagStatus == TagLookupStatus::FOUND_COMPLETE) { // If tool call has started, we switch back to processing tool phase. - return parseToolCallChunk(finishReason, TOOL_CALLS_PROCESSING_TOOL); + return parseToolCallChunk(tokens, finishReason, TOOL_CALLS_PROCESSING_TOOL); } - return parseToolCallChunk(finishReason); + return parseToolCallChunk(tokens, finishReason); } else { SPDLOG_LOGGER_ERROR(llm_calculator_logger, "Unexpected processing phase: {}", static_cast(processingPhase)); throw std::runtime_error("Unexpected error during stream output parsing"); diff --git a/src/llm/io_processing/output_parser.hpp b/src/llm/io_processing/output_parser.hpp index 312393fd47..2e424f63dd 100644 --- a/src/llm/io_processing/output_parser.hpp +++ b/src/llm/io_processing/output_parser.hpp @@ -68,8 +68,8 @@ class OutputParser { // Regular content parsing method does not require finishReason as content is always parsed std::optional parseContentChunk(ProcessingPhase newPhase = CONTENT); - std::optional parseToolCallChunk(ov::genai::GenerationFinishReason finishReason, ProcessingPhase newPhase = TOOL_CALLS_PROCESSING_TOOL); - std::optional parseReasoningChunk(ov::genai::GenerationFinishReason finishReason, ProcessingPhase newPhase = REASONING); + std::optional parseToolCallChunk(const std::vector& tokens, ov::genai::GenerationFinishReason finishReason, ProcessingPhase newPhase = TOOL_CALLS_PROCESSING_TOOL); + std::optional parseReasoningChunk(const std::vector& tokens, ov::genai::GenerationFinishReason finishReason, ProcessingPhase newPhase = REASONING); public: OutputParser() = delete; @@ -84,7 +84,8 @@ class OutputParser { // Parse model output chunk in the steaming mode. Returns a JSON object containing the delta that conforms to OpenAI API // or nullopt if no response can be produced. - std::optional parseChunk(const std::string& chunkResponse, const bool toolsAvailable, ov::genai::GenerationFinishReason finishReason); + // tokens holds the token IDs that produced chunkResponse (may be empty; currently informational for future use). + std::optional parseChunk(const std::string& chunkResponse, const std::vector& tokens, const bool toolsAvailable, ov::genai::GenerationFinishReason finishReason); bool requiresStreamingWithSpecialTokens() const { if (!reasoningParser) { diff --git a/src/llm/io_processing/phi4/tool_parser.cpp b/src/llm/io_processing/phi4/tool_parser.cpp index f1dc946d97..a9e6366026 100644 --- a/src/llm/io_processing/phi4/tool_parser.cpp +++ b/src/llm/io_processing/phi4/tool_parser.cpp @@ -158,7 +158,7 @@ void Phi4ToolParser::parse(ParsedOutput& parsedOutput, const std::vector Phi4ToolParser::parseChunk(const std::string& chunk, ov::genai::GenerationFinishReason finishReason) { +std::optional Phi4ToolParser::parseChunk(const std::string& chunk, const std::vector& /*tokens*/, ov::genai::GenerationFinishReason finishReason) { /* Phi4 with vLLM template produces tool calls in the format: functools[{"name": [function name], "arguments": [function arguments as JSON]}, ...] @@ -205,7 +205,7 @@ std::optional Phi4ToolParser::parseChunk(const std::string& if (remainingChunk.empty()) { return std::nullopt; // Nothing more to process in this chunk } else { - return parseChunk(remainingChunk, finishReason); + return parseChunk(remainingChunk, {}, finishReason); } } else { // modifiedChunk.length() == parsingStartTag.length() as at this state, chunk cannot be smaller return std::nullopt; // Nothing more to process in this chunk @@ -224,7 +224,7 @@ std::optional Phi4ToolParser::parseChunk(const std::string& return std::nullopt; // Nothing more to process in this chunk } else { // Process the remaining chunk as part of tool call processing - return parseChunk(remainingChunk, finishReason); + return parseChunk(remainingChunk, {}, finishReason); } } else { // Still waiting for the opening bracket, ignore this chunk @@ -242,7 +242,7 @@ std::optional Phi4ToolParser::parseChunk(const std::string& if (remainingChunk.empty()) { return std::nullopt; // Nothing more to process in this chunk } else { - return parseChunk(remainingChunk, finishReason); + return parseChunk(remainingChunk, {}, finishReason); } } else { // Still waiting for the opening brace, ignore this chunk diff --git a/src/llm/io_processing/phi4/tool_parser.hpp b/src/llm/io_processing/phi4/tool_parser.hpp index d358b273ce..7e6734d27f 100644 --- a/src/llm/io_processing/phi4/tool_parser.hpp +++ b/src/llm/io_processing/phi4/tool_parser.hpp @@ -68,7 +68,7 @@ class Phi4ToolParser : public BaseOutputParser { BaseOutputParser(tokenizer) {} void parse(ParsedOutput& parsedOutput, const std::vector& generatedTokens) override; - std::optional parseChunk(const std::string& chunk, ov::genai::GenerationFinishReason finishReason) override; + std::optional parseChunk(const std::string& chunk, const std::vector& tokens, ov::genai::GenerationFinishReason finishReason) override; const std::vector& getParsingStartTags() const override { static const std::vector parsingStartTags = {this->parsingStartTag}; return parsingStartTags; diff --git a/src/llm/io_processing/qwen3/reasoning_parser.cpp b/src/llm/io_processing/qwen3/reasoning_parser.cpp index 87b3614920..df84a2c645 100644 --- a/src/llm/io_processing/qwen3/reasoning_parser.cpp +++ b/src/llm/io_processing/qwen3/reasoning_parser.cpp @@ -41,7 +41,7 @@ void Qwen3ReasoningParser::parse(ParsedOutput& parsedOutput, const std::vector Qwen3ReasoningParser::parseChunk(const std::string& chunk, ov::genai::GenerationFinishReason finishReason) { +std::optional Qwen3ReasoningParser::parseChunk(const std::string& chunk, const std::vector& /*tokens*/, ov::genai::GenerationFinishReason finishReason) { if (chunk.empty()) { SPDLOG_LOGGER_DEBUG(llm_calculator_logger, "Received empty chunk for Qwen3ReasoningParser"); return std::nullopt; diff --git a/src/llm/io_processing/qwen3/reasoning_parser.hpp b/src/llm/io_processing/qwen3/reasoning_parser.hpp index 6254e874e5..9b59f62760 100644 --- a/src/llm/io_processing/qwen3/reasoning_parser.hpp +++ b/src/llm/io_processing/qwen3/reasoning_parser.hpp @@ -37,7 +37,7 @@ class Qwen3ReasoningParser : public BaseOutputParser { BaseOutputParser(tokenizer) {} void parse(ParsedOutput& parsedOutput, const std::vector& generatedTokens) override; - std::optional parseChunk(const std::string& chunk, ov::genai::GenerationFinishReason finishReason) override; + std::optional parseChunk(const std::string& chunk, const std::vector& tokens, ov::genai::GenerationFinishReason finishReason) override; const std::vector& getParsingStartTags() const override { static const std::vector parsingStartTags{this->parsingStartTag}; return parsingStartTags; diff --git a/src/llm/io_processing/qwen3coder/qwen3coder_tool_parser.cpp b/src/llm/io_processing/qwen3coder/qwen3coder_tool_parser.cpp index 0c42d109d0..4f38be2548 100644 --- a/src/llm/io_processing/qwen3coder/qwen3coder_tool_parser.cpp +++ b/src/llm/io_processing/qwen3coder/qwen3coder_tool_parser.cpp @@ -412,7 +412,7 @@ std::optional Qwen3CoderToolParser::sendFirstDeltaIfNeeded( return doc; } -std::optional Qwen3CoderToolParser::parseChunk(const std::string& newChunk, ov::genai::GenerationFinishReason finishReason) { +std::optional Qwen3CoderToolParser::parseChunk(const std::string& newChunk, const std::vector& /*tokens*/, ov::genai::GenerationFinishReason finishReason) { // streamParser will return optional toolCalls when a tool call is completed // if toolCalls is returned, we need to wrap it in the required JSON structure and return it // if toolCalls is not returned, but we are insideFunction state, we need to return the first delta with function name once diff --git a/src/llm/io_processing/qwen3coder/qwen3coder_tool_parser.hpp b/src/llm/io_processing/qwen3coder/qwen3coder_tool_parser.hpp index b9d26aaa73..b5db9c019d 100644 --- a/src/llm/io_processing/qwen3coder/qwen3coder_tool_parser.hpp +++ b/src/llm/io_processing/qwen3coder/qwen3coder_tool_parser.hpp @@ -146,7 +146,7 @@ class Qwen3CoderToolParser : public BaseOutputParser { explicit Qwen3CoderToolParser(ov::genai::Tokenizer& tokenizer, const ToolsSchemas_t& toolSchemas); void parse(ParsedOutput& parsedOutput, const std::vector& generatedTokens) override; - std::optional parseChunk(const std::string& chunk, ov::genai::GenerationFinishReason finishReason) override; + std::optional parseChunk(const std::string& chunk, const std::vector& tokens, ov::genai::GenerationFinishReason finishReason) override; const std::vector& getParsingStartTags() const override { static const std::vector startTags = {TOOL_START_TAG, FUNCTION_NAME_TAG}; return startTags; diff --git a/src/llm/language_model/legacy/legacy_executor.cpp b/src/llm/language_model/legacy/legacy_executor.cpp index 80699d418c..73ac908e98 100644 --- a/src/llm/language_model/legacy/legacy_executor.cpp +++ b/src/llm/language_model/legacy/legacy_executor.cpp @@ -43,7 +43,7 @@ void LegacyExecutor::processRequest() { SPDLOG_LOGGER_TRACE(llm_executor_logger, "Generation ended"); } requestExecutionContext->readySignal.set_value(); - requestExecutionContext->executionInProgress.notify_one(); + requestExecutionContext->deltaChannel.signalComplete(); std::unique_lock lock(queueMutex); requests.pop(); } diff --git a/src/llm/language_model/legacy/servable.cpp b/src/llm/language_model/legacy/servable.cpp index 8e244df219..fdd152f648 100644 --- a/src/llm/language_model/legacy/servable.cpp +++ b/src/llm/language_model/legacy/servable.cpp @@ -17,6 +17,7 @@ #include #include #include +#include #include #include "../../../logging.hpp" @@ -24,6 +25,7 @@ #include "../../../status.hpp" #include "../../apis/openai_completions.hpp" #include "../../apis/openai_responses.hpp" +#include "../../ovms_text_streamer.hpp" #pragma warning(push) #pragma warning(disable : 4005 4309 6001 6385 6386 6326 6011 4005 4456 6246) @@ -92,34 +94,32 @@ absl::Status LegacyServable::parseRequest(std::shared_ptrapiHandler->isStream()) { - legacyExecutionContext->lastStreamerCallbackOutput = ""; // initialize with empty string - } - auto callback = [& executionInProgress = legacyExecutionContext->executionInProgress, - &mutex = legacyExecutionContext->mutex, - &lastStreamerCallbackOutput = legacyExecutionContext->lastStreamerCallbackOutput, - &clientDisconnected = legacyExecutionContext->clientDisconnected, - streamMode = legacyExecutionContext->apiHandler->isStream()](std::string text) { - SPDLOG_LOGGER_TRACE(llm_calculator_logger, "Streamer callback executed with text: [{}]", text); - if (clientDisconnected.load()) { - executionInProgress.notify_one(); - return ov::genai::StreamingStatus::CANCEL; - } - if (streamMode) { - std::lock_guard lock(mutex); - lastStreamerCallbackOutput += text; - executionInProgress.notify_one(); + if ((legacyExecutionContext->apiHandler->getOutputParser() != nullptr && + legacyExecutionContext->apiHandler->getOutputParser()->requiresStreamingWithSpecialTokens()) || + !legacyExecutionContext->apiHandler->getRequest().skipSpecialTokens) { + streamerConfig.insert(ov::genai::skip_special_tokens(false)); } - return ov::genai::StreamingStatus::RUNNING; - }; - ov::AnyMap streamerConfig; - if (legacyExecutionContext->apiHandler->isStream() && - ((legacyExecutionContext->apiHandler->getOutputParser() != nullptr && - legacyExecutionContext->apiHandler->getOutputParser()->requiresStreamingWithSpecialTokens()) || - !legacyExecutionContext->apiHandler->getRequest().skipSpecialTokens)) { - streamerConfig.insert(ov::genai::skip_special_tokens(false)); + auto ovmsCallback = [& ctx = *legacyExecutionContext](rapidjson::Document delta) -> ov::genai::StreamingStatus { + if (ctx.clientDisconnected.load()) { + ctx.deltaChannel.signalComplete(); + return ov::genai::StreamingStatus::CANCEL; + } + ctx.deltaChannel.push(std::move(delta)); + return ov::genai::StreamingStatus::RUNNING; + }; + legacyExecutionContext->textStreamer = std::make_shared( + getProperties()->tokenizer, + legacyExecutionContext->apiHandler->getOutputParser(), + legacyExecutionContext->apiHandler->areToolsAvailable(), + std::move(ovmsCallback), + streamerConfig); + } else { + legacyExecutionContext->textStreamer = std::make_shared( + getProperties()->tokenizer, + [](std::string) { return ov::genai::StreamingStatus::RUNNING; }); } - legacyExecutionContext->textStreamer = std::make_shared(getProperties()->tokenizer, callback, streamerConfig); legacyExecutionContext->generationConfigBuilder = std::make_shared(getProperties()->baseGenerationConfig, getProperties()->toolParserName, getProperties()->enableToolGuidedGeneration, @@ -185,6 +185,7 @@ absl::Status LegacyServable::prepareCompleteResponse(std::shared_ptr& executionContext) { + executionContext->deltaChannel.waitForData(); return absl::OkStatus(); } @@ -193,62 +194,75 @@ absl::Status LegacyServable::preparePartialResponse(std::shared_ptrpayload.client->isDisconnected()) { return absl::CancelledError(); } - std::string lastTextChunk; - auto generationStatus = legacyExecutionContext->finished.wait_for(std::chrono::nanoseconds::zero()); - { - std::unique_lock lock(legacyExecutionContext->mutex); - while (executionContext->lastStreamerCallbackOutput.size() == 0 && generationStatus != std::future_status::ready) { - SPDLOG_LOGGER_TRACE(llm_executor_logger, "Waiting for partial data..."); - auto executionInProgressStatus = legacyExecutionContext->executionInProgress.wait_for(lock, std::chrono::milliseconds(10)); - generationStatus = legacyExecutionContext->finished.wait_for(std::chrono::nanoseconds::zero()); - if (executionInProgressStatus == std::cv_status::timeout && generationStatus == std::future_status::ready) { - SPDLOG_LOGGER_TRACE(llm_executor_logger, "Race condition avoided - notification was missed but recovered with timeout"); - } - } - lastTextChunk = executionContext->lastStreamerCallbackOutput; - executionContext->lastStreamerCallbackOutput = ""; - } - if (generationStatus != std::future_status::ready) { // continue + std::vector deltas = executionContext->deltaChannel.drain(); + const bool isFinishing = executionContext->deltaChannel.complete(); + if (!isFinishing) { // For RESPONSES endpoint, always call serializeStreamingChunk so that // output item initialization events are emitted even before the tokenizer produces text. - if (lastTextChunk.size() > 0 || executionContext->apiHandler->getEndpoint() == Endpoint::RESPONSES) { - std::string serializedChunk = executionContext->apiHandler->serializeStreamingChunk(lastTextChunk, ov::genai::GenerationFinishReason::NONE); - if (!serializedChunk.empty()) { - executionContext->response = wrapTextInServerSideEventMessage(serializedChunk); - SPDLOG_LOGGER_DEBUG(llm_calculator_logger, "Generated subsequent streaming response: {}", executionContext->response); + if (deltas.size() > 0 || executionContext->apiHandler->getEndpoint() == Endpoint::RESPONSES) { + for (auto& delta : deltas) { + std::string serialized = executionContext->apiHandler->serializeStreamingChunk( + std::move(delta), ov::genai::GenerationFinishReason::NONE); + if (!serialized.empty()) { + executionContext->response += wrapTextInServerSideEventMessage(serialized); + SPDLOG_LOGGER_DEBUG(llm_calculator_logger, "Generated subsequent streaming response: {}", serialized); + } + } + if (deltas.empty()) { + // No delta generated yet — emit lifecycle events for RESPONSES endpoint. + if (!executionContext->lifecyclePrimed) { + std::string serialized = executionContext->apiHandler->serializeStreamingChunk( + rapidjson::Document{}, ov::genai::GenerationFinishReason::NONE); + if (!serialized.empty()) { + executionContext->response = wrapTextInServerSideEventMessage(serialized); + executionContext->lifecyclePrimed = true; + } + } } } executionContext->sendLoopbackSignal = true; - } else { // finish generation + } else { if (!legacyExecutionContext->success) { return absl::InvalidArgumentError("Request processing failed, check its correctness."); } OVMS_PROFILE_SCOPE("Generation of last streaming response"); + // Flush held-back tokens from the delay buffer; fires OVMSTextStreamer callback + // which pushes final delta(s) into deltaChannel. executionContext->textStreamer->end(); - // if streamer::put returned a value, streamer::end() result will not contain it, so we add it manually - if (!executionContext->lastStreamerCallbackOutput.empty()) { - lastTextChunk = lastTextChunk + executionContext->lastStreamerCallbackOutput; + // Drain again to collect the end-flush delta(s) and merge with any pre-end ones. + for (auto& d : executionContext->deltaChannel.drain()) { + deltas.push_back(std::move(d)); } if (legacyExecutionContext->results.finish_reasons.empty()) { SPDLOG_LOGGER_DEBUG(llm_calculator_logger, "Missing finish reason in legacy LM streaming generation result, defaulting to STOP"); } - // Legacy generation path always runs with batch=1, so we read the single finish reason at index 0. + // Legacy generation path always runs with deltas=1, so we read the single finish reason at index 0. ov::genai::GenerationFinishReason finishReason = legacyExecutionContext->results.finish_reasons.empty() ? ov::genai::GenerationFinishReason::STOP : legacyExecutionContext->results.finish_reasons[0]; - std::string serializedChunk = executionContext->apiHandler->serializeStreamingChunk(lastTextChunk, finishReason); - if (!serializedChunk.empty()) { - executionContext->response = wrapTextInServerSideEventMessage(serializedChunk); + if (!deltas.empty()) { + for (size_t i = 0; i < deltas.size(); ++i) { + const bool isLast = (i == deltas.size() - 1); + std::string serialized = executionContext->apiHandler->serializeStreamingChunk( + std::move(deltas[i]), + isLast ? finishReason : ov::genai::GenerationFinishReason::NONE); + if (!serialized.empty()) { + executionContext->response += wrapTextInServerSideEventMessage(serialized); + } + } + } else { + // Parser produced no delta (generation ended on a swallowed token). + std::string serialized = executionContext->apiHandler->serializeStreamingChunk( + rapidjson::Document{}, finishReason); + if (!serialized.empty()) { + executionContext->response += wrapTextInServerSideEventMessage(serialized); + } } - executionContext->apiHandler->setPromptTokensUsage(legacyExecutionContext->results.perf_metrics.get_num_input_tokens()); executionContext->apiHandler->setCompletionTokensUsage(legacyExecutionContext->results.perf_metrics.get_num_generated_tokens()); if (executionContext->apiHandler->getStreamOptions().includeUsage) executionContext->response += wrapTextInServerSideEventMessage(executionContext->apiHandler->serializeStreamingUsageChunk()); - executionContext->response += wrapTextInServerSideEventMessage("[DONE]"); - SPDLOG_LOGGER_DEBUG(llm_calculator_logger, "Generated complete streaming response: {}", executionContext->response); executionContext->sendLoopbackSignal = false; - return absl::OkStatus(); } return absl::OkStatus(); } diff --git a/src/llm/language_model/legacy/servable.hpp b/src/llm/language_model/legacy/servable.hpp index eae3c9580e..81e4b798d6 100644 --- a/src/llm/language_model/legacy/servable.hpp +++ b/src/llm/language_model/legacy/servable.hpp @@ -30,8 +30,6 @@ struct LegacyServableExecutionContext : public GenAiServableExecutionContext { ov::genai::EncodedResults results; std::promise readySignal; std::future finished = readySignal.get_future(); - std::mutex mutex; - std::condition_variable executionInProgress; // Workaround needed to pass generation config to the executor that requires it ov::genai::GenerationConfig baseGenerationConfig; bool success{true}; @@ -41,7 +39,7 @@ struct LegacyServableExecutionContext : public GenAiServableExecutionContext { void signalDisconnection() { clientDisconnected = true; - executionInProgress.notify_all(); + deltaChannel.signalComplete(); } }; diff --git a/src/llm/ovms_text_streamer.cpp b/src/llm/ovms_text_streamer.cpp new file mode 100644 index 0000000000..9076bffa8b --- /dev/null +++ b/src/llm/ovms_text_streamer.cpp @@ -0,0 +1,196 @@ +//***************************************************************************** +// Copyright 2026 Intel Corporation +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +//***************************************************************************** +#include "ovms_text_streamer.hpp" + +#include +#include +#include + +#include + +namespace ovms { + +// No-op callback passed to the base TextStreamer constructor. +// OVMSTextStreamer overrides write(int64_t) and end() completely, so the base +// callback is never invoked through the normal TextStreamer code path. +static ov::genai::StreamingStatus noop_string_callback(std::string) { + return ov::genai::StreamingStatus::RUNNING; +} + +OVMSTextStreamer::OVMSTextStreamer( + const ov::genai::Tokenizer& tokenizer, + std::shared_ptr output_parser, + bool tools_available, + Callback callback, + const ov::AnyMap& decode_params) : + ov::genai::TextStreamer(tokenizer, noop_string_callback, decode_params), + m_output_parser(output_parser), + m_tools_available(tools_available), + m_callback(std::move(callback)) {} + +// ----------------------------------------------------------------------------- +// write(int64_t) — owned decode loop (does NOT delegate to TextStreamer::write) +// +// Replicates TextStreamer's flush heuristics: +// 1. Newline flush: emit immediately when text ends with '\n'. +// 2. Incomplete UTF-8 guard: if decoded length did not advance, mark as -1. +// 3. Delay buffer: hold back the last DELAY_N_TOKENS positions before flushing. +// +// Operates directly on the protected members inherited from TextStreamer: +// m_tokens_cache, m_decoded_lengths, m_printed_len, +// m_tokenizer, m_additional_detokenization_params. +// ----------------------------------------------------------------------------- +ov::genai::StreamingStatus OVMSTextStreamer::write(int64_t token) { + m_tokens_cache.push_back(token); + const std::string text = m_tokenizer.decode(m_tokens_cache, m_additional_detokenization_params); + m_decoded_lengths.push_back(static_cast(text.size())); + + // 1. Newline flush: emit everything and reset. + if (!text.empty() && text.back() == '\n' && text.size() > m_printed_len) { + const auto status = flush_chunk(text, text.size(), ov::genai::GenerationFinishReason::NONE); + m_tokens_cache.clear(); + m_decoded_lengths.clear(); + m_printed_len = 0; + return status; + } + + // 2. Incomplete UTF-8: decoded length did not advance — last bytes are a + // partial multibyte sequence. Mark this slot as -1 so the delay check + // skips it (matching TextStreamer's own handling). + const size_t n = m_decoded_lengths.size(); + if (n >= 2 && m_decoded_lengths[n - 1] == m_decoded_lengths[n - 2]) { + m_decoded_lengths.back() = -1; + return ov::genai::StreamingStatus::RUNNING; + } + + // 3. Delay buffer: need at least DELAY_N_TOKENS entries before flushing. + if (n < DELAY_N_TOKENS) { + return ov::genai::StreamingStatus::RUNNING; + } + + // Flush up to the decoded length DELAY_N_TOKENS positions from the end. + const int64_t print_until_len = m_decoded_lengths[n - DELAY_N_TOKENS]; + if (print_until_len <= 0 || static_cast(print_until_len) <= m_printed_len) { + return ov::genai::StreamingStatus::RUNNING; + } + + return flush_chunk(text, static_cast(print_until_len), + ov::genai::GenerationFinishReason::NONE); +} + +ov::genai::StreamingStatus OVMSTextStreamer::write(const std::vector& tokens) { + ov::genai::StreamingStatus status = ov::genai::StreamingStatus::RUNNING; + for (const int64_t token : tokens) { + status = write(token); + if (status != ov::genai::StreamingStatus::RUNNING) { + return status; + } + } + return status; +} + +// ----------------------------------------------------------------------------- +// +// Decodes the remaining token cache (up to DELAY_N_TOKENS - 1 tokens that +// write() deliberately held back) and flushes with GenerationFinishReason::STOP. +// +// Does NOT call TextStreamer::end() — the base would fire its no-op callback +// and attempt to clear the protected state that we have already managed. +// ----------------------------------------------------------------------------- +void OVMSTextStreamer::end() { + if (!m_tokens_cache.empty()) { + const std::string text = m_tokenizer.decode(m_tokens_cache, m_additional_detokenization_params); + if (text.size() > m_printed_len) { + flush_chunk(text, text.size(), ov::genai::GenerationFinishReason::STOP); + } else { + // Nothing new to emit from the token cache, but we still need to + // signal end-of-stream so parseChunk can do its STOP-path cleanup + // (e.g. Hermes3 closes the argument string via finish_reason == STOP). + flush_chunk(text, m_printed_len, ov::genai::GenerationFinishReason::STOP); + } + } + m_tokens_cache.clear(); + m_decoded_lengths.clear(); + m_printed_len = 0; +} + +// ----------------------------------------------------------------------------- +// flush_chunk — compute token slice, call OutputParser::parseChunk, fire callback +// +// Token slice computation mirrors ov::genai::TextParserStreamer::write(string): +// first_idx = upper_bound(m_decoded_lengths, m_printed_len) +// last_idx = upper_bound(m_decoded_lengths, print_until) +// +// The resulting tokens sub-vector is passed to OutputParser::parseChunk alongside +// the decoded text chunk. All existing parsers ignore tokens in Phase 1; the +// parameter is available for future phase-aware parsers. +// +// Callback is always fired when: +// - parseChunk returns a non-nullopt Document, OR +// - finish_reason != NONE (ensures finish_reason chunk is always emitted even +// when the parser produces no final delta, e.g. after a completed tool call). +// When no Document is available for a STOP flush, an empty Document{} is passed. +// ----------------------------------------------------------------------------- +ov::genai::StreamingStatus OVMSTextStreamer::flush_chunk( + const std::string& text, + size_t print_until, + ov::genai::GenerationFinishReason finish_reason) { + const std::string chunk{text.data() + m_printed_len, print_until - m_printed_len}; + + // Token slice: tokens whose decoded positions fall in (m_printed_len, print_until]. + const auto first_it = std::upper_bound( + m_decoded_lengths.begin(), m_decoded_lengths.end(), + static_cast(m_printed_len)); + const auto last_it = std::upper_bound( + m_decoded_lengths.begin(), m_decoded_lengths.end(), + static_cast(print_until)); + const auto first_idx = static_cast(first_it - m_decoded_lengths.begin()); + const auto last_idx = static_cast(last_it - m_decoded_lengths.begin()); + const std::vector tokens( + m_tokens_cache.begin() + static_cast(first_idx), + m_tokens_cache.begin() + static_cast(last_idx)); + + m_printed_len = print_until; + + std::optional delta; + if (m_output_parser != nullptr) { + delta = m_output_parser->parseChunk(chunk, tokens, m_tools_available, finish_reason); + } else { + // No parser: wrap raw text in a trivial {"delta":{"content":"..."}} document. + rapidjson::Document doc; + doc.SetObject(); + rapidjson::Document::AllocatorType& alloc = doc.GetAllocator(); + rapidjson::Value delta_obj(rapidjson::kObjectType); + delta_obj.AddMember("content", + rapidjson::Value(chunk.c_str(), alloc), + alloc); + doc.AddMember("delta", delta_obj, alloc); + delta = std::move(doc); + } + + if (delta.has_value()) { + return m_callback(std::move(*delta)); + } + if (finish_reason != ov::genai::GenerationFinishReason::NONE) { + // Parser produced no delta for the final flush (e.g. generation ended + // on a special token the parser absorbed). Still fire the callback with + // an empty Document so preparePartialResponse can emit the finish_reason. + return m_callback(rapidjson::Document{}); + } + return ov::genai::StreamingStatus::RUNNING; +} + +} // namespace ovms diff --git a/src/llm/ovms_text_streamer.hpp b/src/llm/ovms_text_streamer.hpp new file mode 100644 index 0000000000..808054ae60 --- /dev/null +++ b/src/llm/ovms_text_streamer.hpp @@ -0,0 +1,95 @@ +//***************************************************************************** +// Copyright 2026 Intel Corporation +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +//***************************************************************************** +#pragma once + +#include +#include +#include +#include +#include + +#include +#include + +#include "io_processing/output_parser.hpp" + +namespace ovms { + +// OVMSTextStreamer inherits ov::genai::TextStreamer to reuse its protected +// decode-loop state (m_tokenizer, m_tokens_cache, m_decoded_lengths, +// m_printed_len, m_additional_detokenization_params). It overrides +// write(int64_t) and end() completely — the no-op callback passed at +// construction is never invoked. +// +// On every flush event the streamer: +// 1. Computes the token slice that produced the current text chunk via the +// same upper_bound logic used by ov::genai::TextParserStreamer. +// 2. Calls OutputParser::parseChunk(chunk, tokens, tools_available, finish_reason). +// 3. If the result is non-nullopt (or this is the final flush), fires the +// registered Callback with the Document. +// +// The Callback accumulates Documents in pendingDeltas on the execution context. +// preparePartialResponse drains pendingDeltas after each write()/end() cycle. +// +// When output_parser is nullptr (e.g. /v1/completions endpoint), the streamer +// wraps the raw text in a trivial {"delta":{"content":"..."}} Document and +// fires the callback unconditionally, preserving existing behavior. +class OVMSTextStreamer : public ov::genai::TextStreamer { +public: + // Callback receives a Document and returns the streaming status. + // Document shape is always {"delta":{...}} matching the OpenAI delta format. + // For the finish-only case (nullopt from parseChunk + STOP finishReason), + // an empty Document{} is passed so the caller can emit the finish_reason chunk. + using Callback = std::function; + + // outputParser may be nullptr (e.g. for the unary VLM path). + // TODO(phase3): rework ownership — OVMSTextStreamer should not need to keep + // the parser alive; it will be restructured in the next refactor phase. + // toolsAvailable must be evaluated after parseRequest() has processed the body. + // decodeParams controls skip_special_tokens etc. — static for Phase 1. + OVMSTextStreamer( + const ov::genai::Tokenizer& tokenizer, + std::shared_ptr output_parser, + bool tools_available, + Callback callback, + const ov::AnyMap& decode_params); + + ov::genai::StreamingStatus write(int64_t token) override; + // TextStreamer::write(const vector&) calls ov::genai::TextStreamer::write(token) + // with a qualified (non-virtual) call, bypassing this class's write(int64_t) override. + // Override here to ensure our flush logic fires for every token. + // TODO(phase2): revisit once GenAI provides a cleaner extensibility hook. + ov::genai::StreamingStatus write(const std::vector& tokens) override; + void end() override; + +private: + // TODO(phase3): see constructor comment — ownership will be reworked. + std::shared_ptr m_output_parser; + bool m_tools_available; + Callback m_callback; + + // Must match the file-scope constexpr in openvino/genai text_streamer.cpp. + // Named here so a future GenAI change is a single update point. + static constexpr size_t DELAY_N_TOKENS = 3; + + // Flush text[m_printed_len : print_until] with the corresponding token slice. + ov::genai::StreamingStatus flush_chunk( + const std::string& text, + size_t print_until, + ov::genai::GenerationFinishReason finish_reason); +}; + +} // namespace ovms diff --git a/src/llm/servable.cpp b/src/llm/servable.cpp index 5a0955b4f5..c2b1d40a2f 100644 --- a/src/llm/servable.cpp +++ b/src/llm/servable.cpp @@ -16,6 +16,7 @@ #include #include #include +#include #include #pragma warning(push) @@ -35,6 +36,7 @@ #include "../profiler.hpp" #include "apis/openai_completions.hpp" #include "apis/openai_responses.hpp" +#include "ovms_text_streamer.hpp" #include "servable.hpp" #include "text_utils.hpp" #include "../tokenize/tokenize_parser.hpp" @@ -139,10 +141,8 @@ absl::Status GenAiServable::parseRequest(std::shared_ptrapiHandler->isStream()) { - executionContext->lastStreamerCallbackOutput = ""; // initialize with empty string - auto callback = [& lastStreamerCallbackOutput = executionContext->lastStreamerCallbackOutput](std::string text) { - SPDLOG_LOGGER_TRACE(llm_calculator_logger, "Streamer callback executed with text: [{}]", text); - lastStreamerCallbackOutput = text; + auto ovmsCallback = [& ctx = *executionContext](rapidjson::Document delta) -> ov::genai::StreamingStatus { + ctx.deltaChannel.push(std::move(delta)); return ov::genai::StreamingStatus::RUNNING; }; ov::AnyMap streamerConfig; @@ -151,7 +151,12 @@ absl::Status GenAiServable::parseRequest(std::shared_ptrapiHandler->getRequest().skipSpecialTokens) { streamerConfig.insert(ov::genai::skip_special_tokens(false)); } - executionContext->textStreamer = std::make_shared(getProperties()->tokenizer, callback, streamerConfig); + executionContext->textStreamer = std::make_shared( + getProperties()->tokenizer, + executionContext->apiHandler->getOutputParser(), + executionContext->apiHandler->areToolsAvailable(), + std::move(ovmsCallback), + streamerConfig); } executionContext->generationConfigBuilder = std::make_shared(getProperties()->baseGenerationConfig, getProperties()->toolParserName, @@ -300,46 +305,83 @@ absl::Status GenAiServable::preparePartialResponse(std::shared_ptrgenerationOutputs[0]; executionContext->apiHandler->incrementProcessedTokens(generationOutput.generated_ids.size()); - std::stringstream ss; - executionContext->textStreamer->write(generationOutput.generated_ids); - ss << executionContext->lastStreamerCallbackOutput; - // OpenVINO GenAI TextStreamer dose not trigger callback if text is empty: https://github.com/openvinotoolkit/openvino.genai/blob/434c2a9494fb1ee83ca7a36fe8315cfc2691c232/src/cpp/src/text_streamer.cpp#L102-L108 - // Reset lastStreamerCallbackOutput as "" to avoid repeated sending previous text if lastStreamerCallbackOutput not updated by callback - executionContext->lastStreamerCallbackOutput = ""; - - std::string lastTextChunk = ss.str(); - bool isFirstToken = GenerationPhase::INPUT_TOKEN_PROCESSING == executionContext->generationPhase; if (isFirstToken) { executionContext->generationPhase = GenerationPhase::OUTPUT_TOKEN_PROCESSING; } ov::genai::GenerationFinishReason finishReason = generationOutput.finish_reason; - if (finishReason == ov::genai::GenerationFinishReason::NONE) { // continue - // For RESPONSES endpoint, always call serializeStreamingChunk so that - // output item initialization events (output_item.added, content_part.added) - // are emitted on the first chunk, even before the tokenizer produces text. - if (lastTextChunk.size() > 0 || executionContext->apiHandler->getEndpoint() == Endpoint::RESPONSES) { - std::string serializedChunk = executionContext->apiHandler->serializeStreamingChunk(lastTextChunk, finishReason); - if (!serializedChunk.empty()) { - executionContext->response = wrapTextInServerSideEventMessage(serializedChunk); - SPDLOG_LOGGER_DEBUG(llm_calculator_logger, "Generated subsequent streaming response: {}", executionContext->response); + const bool isFinishing = (finishReason != ov::genai::GenerationFinishReason::NONE); + + // OVMSTextStreamer::write() fires the callback for each flush event, pushing + // Documents into executionContext->deltaChannel. + executionContext->textStreamer->write(generationOutput.generated_ids); + + if (isFinishing) { + OVMS_PROFILE_SCOPE("Generation of last streaming response"); + // end() flushes held-back tokens and calls parseChunk(STOP). Any resulting + // Document is pushed into deltaChannel by the callback. + executionContext->textStreamer->end(); + } + + // Drain all deltas accumulated during this write()/end() cycle. + std::vector deltas = executionContext->deltaChannel.drain(); + const size_t count = deltas.size(); + + if (!isFinishing) { + // For RESPONSES endpoint, always call serializeStreamingChunk so lifecycle + // events (output_item.added, content_part.added) are emitted on the first + // call, even before the tokenizer produces text. + if (count > 0 || executionContext->apiHandler->getEndpoint() == Endpoint::RESPONSES) { + // Emit each delta. All are mid-stream so finishReason is NONE. + for (size_t i = 0; i < count; ++i) { + std::string serialized = executionContext->apiHandler->serializeStreamingChunk( + std::move(deltas[i]), + ov::genai::GenerationFinishReason::NONE); + if (!serialized.empty()) { + executionContext->response += wrapTextInServerSideEventMessage(serialized); + SPDLOG_LOGGER_DEBUG(llm_calculator_logger, "Generated subsequent streaming response: {}", serialized); + } + } + if (count == 0) { + // No delta generated yet — emit lifecycle events (response.created, response.in_progress) + // for the RESPONSES endpoint before any content arrives. + if (!executionContext->lifecyclePrimed) { + std::string serialized = executionContext->apiHandler->serializeStreamingChunk( + rapidjson::Document{}, ov::genai::GenerationFinishReason::NONE); + if (!serialized.empty()) { + executionContext->response += wrapTextInServerSideEventMessage(serialized); + executionContext->lifecyclePrimed = true; + } + } } } else if (isFirstToken) { std::string serializedChunk = executionContext->apiHandler->serializeStreamingHandshakeChunk(); - executionContext->response = wrapTextInServerSideEventMessage(serializedChunk); + if (!serializedChunk.empty()) { + executionContext->response = wrapTextInServerSideEventMessage(serializedChunk); + } } executionContext->sendLoopbackSignal = true; - } else { // finish generation - OVMS_PROFILE_SCOPE("Generation of last streaming response"); - executionContext->textStreamer->end(); - // if streamer::put returned a value, streamer::end() result will not contain it, so we add it manually - if (!executionContext->lastStreamerCallbackOutput.empty()) { - lastTextChunk = lastTextChunk + executionContext->lastStreamerCallbackOutput; - } - std::string serializedChunk = executionContext->apiHandler->serializeStreamingChunk(lastTextChunk, finishReason); - if (!serializedChunk.empty()) { - executionContext->response = wrapTextInServerSideEventMessage(serializedChunk); + } else { + // Finishing: emit all pending deltas; the last one gets the real finishReason. + if (count > 0) { + for (size_t i = 0; i < count; ++i) { + const bool isLast = (i == count - 1); + std::string serialized = executionContext->apiHandler->serializeStreamingChunk( + std::move(deltas[i]), + isLast ? finishReason : ov::genai::GenerationFinishReason::NONE); + if (!serialized.empty()) { + executionContext->response += wrapTextInServerSideEventMessage(serialized); + } + } + } else { + // No delta produced (generation ended on a swallowed token). + // Still emit a chunk carrying the finish_reason with an empty Document. + std::string serialized = executionContext->apiHandler->serializeStreamingChunk( + rapidjson::Document{}, finishReason); + if (!serialized.empty()) { + executionContext->response += wrapTextInServerSideEventMessage(serialized); + } } if (executionContext->apiHandler->getStreamOptions().includeUsage) { std::string usageChunk = executionContext->apiHandler->serializeStreamingUsageChunk(); @@ -347,12 +389,11 @@ absl::Status GenAiServable::preparePartialResponse(std::shared_ptrresponse += wrapTextInServerSideEventMessage(usageChunk); } } - executionContext->response += wrapTextInServerSideEventMessage("[DONE]"); - SPDLOG_LOGGER_DEBUG(llm_calculator_logger, "Generated complete streaming response: {}", executionContext->response); executionContext->sendLoopbackSignal = false; } + return absl::OkStatus(); } diff --git a/src/llm/servable.hpp b/src/llm/servable.hpp index fa253f942f..41d39716d0 100644 --- a/src/llm/servable.hpp +++ b/src/llm/servable.hpp @@ -15,15 +15,19 @@ //***************************************************************************** #pragma once +#include #include +#include #include #include +#include #include #pragma warning(push) -#pragma warning(disable : 4251 4005 4309 6001 6385 6386 6326 6011 4005 4456 6246) +#pragma warning(disable : 4251 4005 4309 6001 6385 6386 6326 6011 4005 4456 6246 6313) #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wdeprecated-declarations" +#include #include "openvino/genai/text_streamer.hpp" #include "mediapipe/framework/calculator_graph.h" #pragma GCC diagnostic pop @@ -65,6 +69,59 @@ enum class GenerationPhase { OUTPUT_TOKEN_PROCESSING, }; +// Thread-safe channel for parsed streaming deltas. +// The producer (OVMSTextStreamer callback, possibly on a background executor thread) +// calls push(); the consumer (preparePartialResponse, always on the calculator thread) +// calls waitForData() then drain(). For CB/stateful paths both sides run on the same +// thread, so the mutex is acquired but uncontested. +struct DeltaChannel { + // Push a delta from any thread (streamer callback). + void push(rapidjson::Document delta) { + { + std::lock_guard lock(m_mutex); + m_deltas.push_back(std::move(delta)); + } + m_cv.notify_one(); + } + + // Signal that no more deltas will be pushed (generation complete or cancelled). + // May be called from any thread. + void signalComplete() { + { + std::lock_guard lock(m_mutex); + m_complete = true; + } + m_cv.notify_one(); + } + + // Block until at least one delta is available or signalComplete() has been called. + // For CB paths this returns immediately since data is already present. + void waitForData() { + std::unique_lock lock(m_mutex); + m_cv.wait(lock, [this] { return !m_deltas.empty() || m_complete; }); + } + + // Move all pending deltas out atomically. Returns an empty vector if none pending. + std::vector drain() { + std::lock_guard lock(m_mutex); + std::vector result; + result.swap(m_deltas); + return result; + } + + // Returns true after signalComplete() has been called. + bool complete() const { + std::lock_guard lock(m_mutex); + return m_complete; + } + +private: + mutable std::mutex m_mutex; + std::condition_variable m_cv; + std::vector m_deltas; + bool m_complete = false; +}; + struct GenAiServableExecutionContext { // Common API related members HttpPayload payload; @@ -79,7 +136,8 @@ struct GenAiServableExecutionContext { std::string response; std::shared_ptr textStreamer; bool sendLoopbackSignal = false; - std::string lastStreamerCallbackOutput; + bool lifecyclePrimed = false; // true once RESPONSES lifecycle events have been primed + DeltaChannel deltaChannel; // thread-safe delta queue used by all streaming paths GenerationPhase generationPhase = GenerationPhase::INPUT_TOKEN_PROCESSING; }; @@ -152,9 +210,9 @@ class GenAiServable { /* parseRequest method implementation MUST fill executionContext apiHandler field and parse request. - For streaming requests, it MUST initialize textStreamer and lastStreamerCallbackOutput fields of executionContext. + For streaming requests, it MUST initialize the textStreamer field of executionContext. Base implementation creates OpenAIChatCompletionsHandler and calls its parseRequest method. - Additionally it initializes textStreamer and lastStreamerCallbackOutput for streaming requests. + Additionally it initializes textStreamer for streaming requests. */ virtual absl::Status parseRequest(std::shared_ptr& executionContext); diff --git a/src/llm/visual_language_model/legacy/legacy_executor.cpp b/src/llm/visual_language_model/legacy/legacy_executor.cpp index a21c799cec..319550c612 100644 --- a/src/llm/visual_language_model/legacy/legacy_executor.cpp +++ b/src/llm/visual_language_model/legacy/legacy_executor.cpp @@ -48,7 +48,7 @@ void VisualLanguageModelLegacyExecutor::processRequest() { SPDLOG_LOGGER_TRACE(llm_executor_logger, "Generation ended"); } requestExecutionContext->readySignal.set_value(); - requestExecutionContext->executionInProgress.notify_one(); + requestExecutionContext->deltaChannel.signalComplete(); std::unique_lock lock(queueMutex); requests.pop(); } diff --git a/src/llm/visual_language_model/legacy/servable.cpp b/src/llm/visual_language_model/legacy/servable.cpp index a40dee296e..b90bd1c2ba 100644 --- a/src/llm/visual_language_model/legacy/servable.cpp +++ b/src/llm/visual_language_model/legacy/servable.cpp @@ -25,6 +25,7 @@ #include "../../../status.hpp" #include "../../apis/openai_completions.hpp" #include "../../apis/openai_responses.hpp" +#include "../../ovms_text_streamer.hpp" #pragma warning(push) #pragma warning(disable : 4005 4309 6001 6385 6386 6326 6011 4005 4456 6246) @@ -105,35 +106,58 @@ absl::Status VisualLanguageModelLegacyServable::parseRequest(std::shared_ptrapiHandler->isStream()) { - legacyExecutionContext->lastStreamerCallbackOutput = ""; // initialize with empty string - } - auto callback = [& executionInProgress = legacyExecutionContext->executionInProgress, - &mutex = legacyExecutionContext->mutex, - &lastStreamerCallbackOutput = legacyExecutionContext->lastStreamerCallbackOutput, - &clientDisconnected = legacyExecutionContext->clientDisconnected](std::string text) { - SPDLOG_LOGGER_TRACE(llm_calculator_logger, "Streamer callback executed with text: [{}]", text); - if (clientDisconnected.load()) { - executionInProgress.notify_one(); - return ov::genai::StreamingStatus::CANCEL; + if ((legacyExecutionContext->apiHandler->getOutputParser() != nullptr && + legacyExecutionContext->apiHandler->getOutputParser()->requiresStreamingWithSpecialTokens()) || + !legacyExecutionContext->apiHandler->getRequest().skipSpecialTokens) { + streamerConfig.insert(ov::genai::skip_special_tokens(false)); } - - // TODO(mzegla): unconditional streaming-like behavior also for unary flow due to GenAI generate limitations. - // This diverges from the general flow - we should have unified systematic approach. - { - std::lock_guard lock(mutex); - lastStreamerCallbackOutput += text; - executionInProgress.notify_one(); + auto ovmsCallback = [& ctx = *legacyExecutionContext](rapidjson::Document delta) -> ov::genai::StreamingStatus { + if (ctx.clientDisconnected.load()) { + ctx.deltaChannel.signalComplete(); + return ov::genai::StreamingStatus::CANCEL; + } + ctx.deltaChannel.push(std::move(delta)); + return ov::genai::StreamingStatus::RUNNING; + }; + legacyExecutionContext->textStreamer = std::make_shared( + getProperties()->tokenizer, + legacyExecutionContext->apiHandler->getOutputParser(), + legacyExecutionContext->apiHandler->areToolsAvailable(), + std::move(ovmsCallback), + streamerConfig); + } else { + // For the unary path we still need OVMSTextStreamer so that the tokenizer + // decode params (e.g. skip_special_tokens) from the request are applied. + // results.texts[0] is decoded by the VLM pipeline with its own hardcoded + // config — using the streamer callback is the only way to respect the user's + // setting here. + // + // Crucially, we pass nullptr as the output parser: serializeUnaryResponse + // feeds accumulatedUnaryText back through encodeTextToTokens() and the + // batch parser (parseOutputIfNeeded), which expects raw decoded text with + // structural tags intact (e.g. , ). Passing a non-null + // parser here would strip those tags via parseChunk before accumulation + // and break the downstream unary parsing of reasoning/tool_calls. + // Will be further reworked in next refactor phases. + if (!legacyExecutionContext->apiHandler->getRequest().skipSpecialTokens) { + streamerConfig.insert(ov::genai::skip_special_tokens(false)); } - return ov::genai::StreamingStatus::RUNNING; - }; - ov::AnyMap streamerConfig; - if ((legacyExecutionContext->apiHandler->getOutputParser() != nullptr && - legacyExecutionContext->apiHandler->getOutputParser()->requiresStreamingWithSpecialTokens()) || - !legacyExecutionContext->apiHandler->getRequest().skipSpecialTokens) { - streamerConfig.insert(ov::genai::skip_special_tokens(false)); + auto unaryCallback = [& ctx = *legacyExecutionContext](rapidjson::Document delta) -> ov::genai::StreamingStatus { + if (delta.HasMember("delta") && delta["delta"].IsObject() && + delta["delta"].HasMember("content") && delta["delta"]["content"].IsString()) { + ctx.accumulatedUnaryText += delta["delta"]["content"].GetString(); + } + return ov::genai::StreamingStatus::RUNNING; + }; + legacyExecutionContext->textStreamer = std::make_shared( + getProperties()->tokenizer, + nullptr, // no parser: accumulate raw decoded text for batch unary parsing + false, + std::move(unaryCallback), + streamerConfig); } - legacyExecutionContext->textStreamer = std::make_shared(getProperties()->tokenizer, callback, streamerConfig); legacyExecutionContext->generationConfigBuilder = std::make_shared(getProperties()->baseGenerationConfig, getProperties()->toolParserName, getProperties()->enableToolGuidedGeneration, @@ -183,24 +207,17 @@ absl::Status VisualLanguageModelLegacyServable::prepareCompleteResponse(std::sha return absl::CancelledError(); } - // TODO(mzegla): Usage of streaming flow here due to GenAI generate limitations. - // This diverges from the general flow - we should have unified systematic approach. - - executionContext->textStreamer->end(); - - std::string completeText; - { - std::lock_guard lock(legacyExecutionContext->mutex); - completeText = std::move(executionContext->lastStreamerCallbackOutput); - executionContext->lastStreamerCallbackOutput.clear(); - } - - executionContext->response = executionContext->apiHandler->serializeUnaryResponse(legacyExecutionContext->results, completeText); + // pipe->generate() called streamer->end() before returning, so accumulatedUnaryText is + // already fully populated by the callbacks fired from OVMSTextStreamer::write()/end(). + const std::string& completeText = legacyExecutionContext->accumulatedUnaryText; + executionContext->response = executionContext->apiHandler->serializeUnaryResponse( + legacyExecutionContext->results, completeText); SPDLOG_LOGGER_DEBUG(llm_calculator_logger, "Complete unary response: {}", executionContext->response); return absl::OkStatus(); } absl::Status VisualLanguageModelLegacyServable::readPartialExecutionResults(std::shared_ptr& executionContext) { + executionContext->deltaChannel.waitForData(); return absl::OkStatus(); } @@ -209,61 +226,75 @@ absl::Status VisualLanguageModelLegacyServable::preparePartialResponse(std::shar if (legacyExecutionContext->payload.client->isDisconnected()) { return absl::CancelledError(); } - std::string lastTextChunk; - auto generationStatus = legacyExecutionContext->finished.wait_for(std::chrono::nanoseconds::zero()); - { - std::unique_lock lock(legacyExecutionContext->mutex); - while (executionContext->lastStreamerCallbackOutput.size() == 0 && generationStatus != std::future_status::ready) { - SPDLOG_LOGGER_TRACE(llm_executor_logger, "Waiting for partial data..."); - auto cvStatus = legacyExecutionContext->executionInProgress.wait_for(lock, std::chrono::milliseconds(10)); - generationStatus = legacyExecutionContext->finished.wait_for(std::chrono::nanoseconds::zero()); - if (cvStatus == std::cv_status::timeout && generationStatus == std::future_status::ready) { - SPDLOG_LOGGER_TRACE(llm_executor_logger, "Race condition avoided - notification was missed but recovered with timeout"); - } - } - lastTextChunk = executionContext->lastStreamerCallbackOutput; - executionContext->lastStreamerCallbackOutput = ""; - } - if (generationStatus != std::future_status::ready) { // continue + std::vector deltas = executionContext->deltaChannel.drain(); + const bool isFinishing = executionContext->deltaChannel.complete(); + if (!isFinishing) { // For RESPONSES endpoint, always call serializeStreamingChunk so that // output item initialization events are emitted even before the tokenizer produces text. - if (lastTextChunk.size() > 0 || executionContext->apiHandler->getEndpoint() == Endpoint::RESPONSES) { - std::string serializedChunk = executionContext->apiHandler->serializeStreamingChunk(lastTextChunk, ov::genai::GenerationFinishReason::NONE); - if (!serializedChunk.empty()) { - executionContext->response = wrapTextInServerSideEventMessage(serializedChunk); - SPDLOG_LOGGER_DEBUG(llm_calculator_logger, "Generated subsequent streaming response: {}", executionContext->response); + if (deltas.size() > 0 || executionContext->apiHandler->getEndpoint() == Endpoint::RESPONSES) { + for (auto& delta : deltas) { + std::string serialized = executionContext->apiHandler->serializeStreamingChunk( + std::move(delta), ov::genai::GenerationFinishReason::NONE); + if (!serialized.empty()) { + executionContext->response += wrapTextInServerSideEventMessage(serialized); + SPDLOG_LOGGER_DEBUG(llm_calculator_logger, "Generated subsequent streaming response: {}", serialized); + } + } + if (deltas.empty()) { + // No delta generated yet — emit lifecycle events for RESPONSES endpoint. + if (!executionContext->lifecyclePrimed) { + std::string serialized = executionContext->apiHandler->serializeStreamingChunk( + rapidjson::Document{}, ov::genai::GenerationFinishReason::NONE); + if (!serialized.empty()) { + executionContext->response = wrapTextInServerSideEventMessage(serialized); + executionContext->lifecyclePrimed = true; + } + } } } executionContext->sendLoopbackSignal = true; - } else { // finish generation + } else { if (!legacyExecutionContext->success) { return absl::InvalidArgumentError("Request processing failed, check its correctness."); } OVMS_PROFILE_SCOPE("Generation of last streaming response"); + // Flush held-back tokens from the delay buffer; fires OVMSTextStreamer callback + // which pushes final delta(s) into deltaChannel. executionContext->textStreamer->end(); - // if streamer::put returned a value, streamer::end() result will not contain it, so we add it manually - if (!executionContext->lastStreamerCallbackOutput.empty()) { - lastTextChunk = lastTextChunk + executionContext->lastStreamerCallbackOutput; + // Drain again to collect the end-flush delta(s) and merge with any pre-end ones. + for (auto& d : executionContext->deltaChannel.drain()) { + deltas.push_back(std::move(d)); } if (legacyExecutionContext->results.finish_reasons.empty()) { SPDLOG_LOGGER_DEBUG(llm_calculator_logger, "Missing finish reason in legacy VLM streaming generation result, defaulting to STOP"); } - // Legacy generation path always runs with batch=1, so we read the single finish reason at index 0. + // Legacy generation path always runs with deltas=1, so we read the single finish reason at index 0. ov::genai::GenerationFinishReason finishReason = legacyExecutionContext->results.finish_reasons.empty() ? ov::genai::GenerationFinishReason::STOP : legacyExecutionContext->results.finish_reasons[0]; - std::string serializedChunk = executionContext->apiHandler->serializeStreamingChunk(lastTextChunk, finishReason); - if (!serializedChunk.empty()) { - executionContext->response = wrapTextInServerSideEventMessage(serializedChunk); + if (!deltas.empty()) { + for (size_t i = 0; i < deltas.size(); ++i) { + const bool isLast = (i == deltas.size() - 1); + std::string serialized = executionContext->apiHandler->serializeStreamingChunk( + std::move(deltas[i]), + isLast ? finishReason : ov::genai::GenerationFinishReason::NONE); + if (!serialized.empty()) { + executionContext->response += wrapTextInServerSideEventMessage(serialized); + } + } + } else { + // Parser produced no delta (generation ended on a swallowed token). + std::string serialized = executionContext->apiHandler->serializeStreamingChunk( + rapidjson::Document{}, finishReason); + if (!serialized.empty()) { + executionContext->response += wrapTextInServerSideEventMessage(serialized); + } } executionContext->apiHandler->setPromptTokensUsage(legacyExecutionContext->results.perf_metrics.get_num_input_tokens()); executionContext->apiHandler->setCompletionTokensUsage(legacyExecutionContext->results.perf_metrics.get_num_generated_tokens()); if (executionContext->apiHandler->getStreamOptions().includeUsage) executionContext->response += wrapTextInServerSideEventMessage(executionContext->apiHandler->serializeStreamingUsageChunk()); - executionContext->response += wrapTextInServerSideEventMessage("[DONE]"); - SPDLOG_LOGGER_DEBUG(llm_calculator_logger, "Generated complete streaming response: {}", executionContext->response); executionContext->sendLoopbackSignal = false; - return absl::OkStatus(); } return absl::OkStatus(); } diff --git a/src/llm/visual_language_model/legacy/servable.hpp b/src/llm/visual_language_model/legacy/servable.hpp index 8c07818bce..6cb5933757 100644 --- a/src/llm/visual_language_model/legacy/servable.hpp +++ b/src/llm/visual_language_model/legacy/servable.hpp @@ -14,6 +14,7 @@ // limitations under the License. //***************************************************************************** #pragma once +#include #include #include #include @@ -30,20 +31,21 @@ struct VisualLanguageModelLegacyServableExecutionContext : public GenAiServableE ov::genai::VLMDecodedResults results; std::promise readySignal; std::future finished = readySignal.get_future(); - std::mutex mutex; std::vector inputImages; - std::condition_variable executionInProgress; std::string inputText; // Workaround needed to pass generation config to the executor that requires it ov::genai::GenerationConfig baseGenerationConfig; bool success{true}; + // Accumulated decoded text for the unary path — populated via OVMSTextStreamer + // callback so that the user's skip_special_tokens / decode params are respected. + std::string accumulatedUnaryText; // Disconnection handling std::atomic clientDisconnected{false}; void signalDisconnection() { clientDisconnected = true; - executionInProgress.notify_all(); + deltaChannel.signalComplete(); } }; diff --git a/src/test/http_openai_handler_test.cpp b/src/test/http_openai_handler_test.cpp index f62f302020..62144456d1 100644 --- a/src/test/http_openai_handler_test.cpp +++ b/src/test/http_openai_handler_test.cpp @@ -872,6 +872,32 @@ static std::vector createHermes3ToolCallTokens(ov::genai::Tokenizer& to return generatedTokens; } +// Test helper: wraps the old serializeStreamingChunk(string, reason) behaviour for migration period. +// Calls outputParser->parseChunk when a parser is present; otherwise builds a trivial content delta. +static std::string serializeStreamingChunkFromText(ovms::OpenAIApiHandler& handler, + const std::string& text, + ov::genai::GenerationFinishReason finishReason) { + const auto& outputParser = handler.getOutputParser(); + rapidjson::Document delta; + if (outputParser != nullptr) { + auto parsed = outputParser->parseChunk(text, {}, handler.areToolsAvailable(), finishReason); + if (!parsed.has_value()) { + if (finishReason == ov::genai::GenerationFinishReason::NONE) + return ""; + delta = rapidjson::Document{}; + } else { + delta = std::move(*parsed); + } + } else { + delta.SetObject(); + rapidjson::Document::AllocatorType& alloc = delta.GetAllocator(); + rapidjson::Value deltaObj(rapidjson::kObjectType); + deltaObj.AddMember("content", rapidjson::Value(text.c_str(), alloc), alloc); + delta.AddMember("delta", deltaObj, alloc); + } + return handler.serializeStreamingChunk(std::move(delta), finishReason); +} + TEST_F(HttpOpenAIHandlerParsingTest, serializeStreamingChunkReturnsIntermediateNullAndFinallyToolCallsFinishReason) { std::string json = R"({ "model": "llama", @@ -912,7 +938,7 @@ TEST_F(HttpOpenAIHandlerParsingTest, serializeStreamingChunkReturnsIntermediateN std::vector serializedChunks; for (const auto& [chunk, finishReason] : stream) { - std::string serialized = apiHandler->serializeStreamingChunk(chunk, finishReason); + std::string serialized = serializeStreamingChunkFromText(*apiHandler, chunk, finishReason); if (!serialized.empty()) { serializedChunks.push_back(serialized); } @@ -971,7 +997,7 @@ TEST_F(HttpOpenAIHandlerParsingTest, serializeStreamingChunkReturnsToolCallsFini std::vector serializedChunks; for (const auto& [chunk, finishReason] : stream) { - std::string serialized = apiHandler->serializeStreamingChunk(chunk, finishReason); + std::string serialized = serializeStreamingChunkFromText(*apiHandler, chunk, finishReason); if (!serialized.empty()) { serializedChunks.push_back(serialized); } @@ -1296,11 +1322,11 @@ TEST_F(HttpOpenAIHandlerParsingTest, serializeStreamingChunkForResponsesContains ASSERT_EQ(inProgressChunk.find("\"type\":\"response.created\""), std::string::npos) << "Only in_progress event: " << inProgressChunk; // Phase 3: Empty chunk should produce no output - std::string secondChunk = apiHandler->serializeStreamingChunk("", ov::genai::GenerationFinishReason::NONE); + std::string secondChunk = serializeStreamingChunkFromText(*apiHandler, "", ov::genai::GenerationFinishReason::NONE); ASSERT_TRUE(secondChunk.empty()) << "Empty text should produce no output: " << secondChunk; // Phase 4: First text delta - should include output_item.added + content_part.added + delta - std::string deltaChunk = apiHandler->serializeStreamingChunk("Hello", ov::genai::GenerationFinishReason::NONE); + std::string deltaChunk = serializeStreamingChunkFromText(*apiHandler, "Hello", ov::genai::GenerationFinishReason::NONE); ASSERT_NE(deltaChunk.find("\"type\":\"response.output_item.added\""), std::string::npos) << deltaChunk; ASSERT_NE(deltaChunk.find("\"type\":\"response.content_part.added\""), std::string::npos) << deltaChunk; ASSERT_NE(deltaChunk.find("\"type\":\"response.output_text.delta\""), std::string::npos) << deltaChunk; @@ -1315,7 +1341,7 @@ TEST_F(HttpOpenAIHandlerParsingTest, serializeStreamingChunkForResponsesContains ASSERT_LT(contentPartAddedPos, firstDeltaPos) << "content_part.added must come before delta"; // Phase 5: Final chunk with finish reason - std::string finalChunk = apiHandler->serializeStreamingChunk(" world", ov::genai::GenerationFinishReason::STOP); + std::string finalChunk = serializeStreamingChunkFromText(*apiHandler, " world", ov::genai::GenerationFinishReason::STOP); ASSERT_NE(finalChunk.find("\"type\":\"response.output_text.delta\""), std::string::npos) << finalChunk; ASSERT_NE(finalChunk.find("\"type\":\"response.output_text.done\""), std::string::npos) << finalChunk; ASSERT_NE(finalChunk.find("\"type\":\"response.content_part.done\""), std::string::npos) << finalChunk; @@ -1361,12 +1387,12 @@ TEST_F(HttpOpenAIHandlerParsingTest, serializeStreamingChunkForResponsesWithReas ASSERT_EQ(inProgressChunk.find("\"type\":\"response.output_item.added\""), std::string::npos) << "output_item.added should be deferred: " << inProgressChunk; // Phase 2: Reasoning chunk with tag - should emit reasoning init + delta - std::string reasoningChunk = apiHandler->serializeStreamingChunk("", ov::genai::GenerationFinishReason::NONE); + std::string reasoningChunk = serializeStreamingChunkFromText(*apiHandler, "", ov::genai::GenerationFinishReason::NONE); // tag itself should be consumed by parser, no events // (parser returns nullopt for tag tokens) // Phase 3: Reasoning content - std::string reasoningContent = apiHandler->serializeStreamingChunk("Let me think", ov::genai::GenerationFinishReason::NONE); + std::string reasoningContent = serializeStreamingChunkFromText(*apiHandler, "Let me think", ov::genai::GenerationFinishReason::NONE); ASSERT_NE(reasoningContent.find("\"type\":\"response.output_item.added\""), std::string::npos) << "Should have reasoning output_item.added: " << reasoningContent; ASSERT_NE(reasoningContent.find("\"type\":\"reasoning\""), std::string::npos) << "Output item should be reasoning type: " << reasoningContent; ASSERT_NE(reasoningContent.find("\"type\":\"response.reasoning_summary_part.added\""), std::string::npos) << reasoningContent; @@ -1374,18 +1400,18 @@ TEST_F(HttpOpenAIHandlerParsingTest, serializeStreamingChunkForResponsesWithReas ASSERT_NE(reasoningContent.find("\"delta\":\"Let me think\""), std::string::npos) << reasoningContent; // Phase 4: More reasoning - std::string moreReasoning = apiHandler->serializeStreamingChunk(" harder", ov::genai::GenerationFinishReason::NONE); + std::string moreReasoning = serializeStreamingChunkFromText(*apiHandler, " harder", ov::genai::GenerationFinishReason::NONE); ASSERT_NE(moreReasoning.find("\"type\":\"response.reasoning_summary_text.delta\""), std::string::npos) << moreReasoning; ASSERT_NE(moreReasoning.find("\"delta\":\" harder\""), std::string::npos) << moreReasoning; // Should NOT have another output_item.added ASSERT_EQ(moreReasoning.find("\"type\":\"response.output_item.added\""), std::string::npos) << "No repeated init: " << moreReasoning; // Phase 5: End of reasoning with - std::string endThink = apiHandler->serializeStreamingChunk("", ov::genai::GenerationFinishReason::NONE); + std::string endThink = serializeStreamingChunkFromText(*apiHandler, "", ov::genai::GenerationFinishReason::NONE); // tag consumed by parser // Phase 6: Content chunk - should close reasoning and open message - std::string contentChunk = apiHandler->serializeStreamingChunk("The answer", ov::genai::GenerationFinishReason::NONE); + std::string contentChunk = serializeStreamingChunkFromText(*apiHandler, "The answer", ov::genai::GenerationFinishReason::NONE); ASSERT_NE(contentChunk.find("\"type\":\"response.reasoning_summary_text.done\""), std::string::npos) << "Should close reasoning: " << contentChunk; ASSERT_NE(contentChunk.find("\"type\":\"response.reasoning_summary_part.done\""), std::string::npos) << contentChunk; // Message item should be at output_index 1 @@ -1394,7 +1420,7 @@ TEST_F(HttpOpenAIHandlerParsingTest, serializeStreamingChunkForResponsesWithReas ASSERT_NE(contentChunk.find("\"type\":\"response.output_text.delta\""), std::string::npos) << contentChunk; // Phase 7: Final chunk - std::string finalChunk = apiHandler->serializeStreamingChunk(" is 42", ov::genai::GenerationFinishReason::STOP); + std::string finalChunk = serializeStreamingChunkFromText(*apiHandler, " is 42", ov::genai::GenerationFinishReason::STOP); ASSERT_NE(finalChunk.find("\"type\":\"response.output_text.delta\""), std::string::npos) << finalChunk; ASSERT_NE(finalChunk.find("\"type\":\"response.output_text.done\""), std::string::npos) << finalChunk; ASSERT_NE(finalChunk.find("\"type\":\"response.content_part.done\""), std::string::npos) << finalChunk; @@ -1427,7 +1453,7 @@ TEST_F(HttpOpenAIHandlerParsingTest, serializeStreamingChunkEmptyPrimingDoesNotP // Empty priming call: should emit only lifecycle events, never output_text.delta, // and must not move the parser past the reasoning start tag. - std::string primingChunk = apiHandler->serializeStreamingChunk("", ov::genai::GenerationFinishReason::NONE); + std::string primingChunk = apiHandler->serializeStreamingChunk(rapidjson::Document{}, ov::genai::GenerationFinishReason::NONE); ASSERT_NE(primingChunk.find("\"type\":\"response.created\""), std::string::npos) << primingChunk; ASSERT_NE(primingChunk.find("\"type\":\"response.in_progress\""), std::string::npos) << primingChunk; ASSERT_EQ(primingChunk.find("\"type\":\"response.output_text.delta\""), std::string::npos) @@ -1437,8 +1463,8 @@ TEST_F(HttpOpenAIHandlerParsingTest, serializeStreamingChunkEmptyPrimingDoesNotP // Now the parser must still recognise the reasoning start tag and route the // following text to reasoning, not content. - apiHandler->serializeStreamingChunk("", ov::genai::GenerationFinishReason::NONE); - std::string reasoningChunk = apiHandler->serializeStreamingChunk("hello", ov::genai::GenerationFinishReason::NONE); + serializeStreamingChunkFromText(*apiHandler, "", ov::genai::GenerationFinishReason::NONE); + std::string reasoningChunk = serializeStreamingChunkFromText(*apiHandler, "hello", ov::genai::GenerationFinishReason::NONE); ASSERT_NE(reasoningChunk.find("\"type\":\"response.reasoning_summary_text.delta\""), std::string::npos) << "Reasoning text must be routed to reasoning_summary_text.delta: " << reasoningChunk; ASSERT_EQ(reasoningChunk.find("\"type\":\"response.output_text.delta\""), std::string::npos) @@ -1469,7 +1495,7 @@ TEST_F(HttpOpenAIHandlerParsingTest, serializeStreamingChunkForResponsesWithoutR ASSERT_EQ(inProgressChunk.find("\"type\":\"response.output_item.added\""), std::string::npos) << "Should be deferred: " << inProgressChunk; // Content without reasoning - should emit message init events on first content - std::string contentChunk = apiHandler->serializeStreamingChunk("Hello", ov::genai::GenerationFinishReason::NONE); + std::string contentChunk = serializeStreamingChunkFromText(*apiHandler, "Hello", ov::genai::GenerationFinishReason::NONE); ASSERT_NE(contentChunk.find("\"type\":\"response.output_item.added\""), std::string::npos) << "Should init message: " << contentChunk; ASSERT_NE(contentChunk.find("\"type\":\"response.content_part.added\""), std::string::npos) << contentChunk; ASSERT_NE(contentChunk.find("\"type\":\"response.output_text.delta\""), std::string::npos) << contentChunk; @@ -1478,7 +1504,7 @@ TEST_F(HttpOpenAIHandlerParsingTest, serializeStreamingChunkForResponsesWithoutR ASSERT_EQ(contentChunk.find("\"type\":\"response.reasoning_summary"), std::string::npos) << "No reasoning: " << contentChunk; // Final chunk - std::string finalChunk = apiHandler->serializeStreamingChunk(" world", ov::genai::GenerationFinishReason::STOP); + std::string finalChunk = serializeStreamingChunkFromText(*apiHandler, " world", ov::genai::GenerationFinishReason::STOP); ASSERT_NE(finalChunk.find("\"type\":\"response.completed\""), std::string::npos) << finalChunk; } @@ -1519,10 +1545,10 @@ TEST_F(HttpOpenAIHandlerParsingTest, serializeStreamingChunkForResponsesEmitsInc apiHandler->serializeStreamingCreatedEvent(); apiHandler->serializeStreamingInProgressEvent(); // Delta - apiHandler->serializeStreamingChunk("Hello", ov::genai::GenerationFinishReason::NONE); + serializeStreamingChunkFromText(*apiHandler, "Hello", ov::genai::GenerationFinishReason::NONE); // Final chunk with LENGTH finish reason - std::string finalChunk = apiHandler->serializeStreamingChunk("", ov::genai::GenerationFinishReason::LENGTH); + std::string finalChunk = serializeStreamingChunkFromText(*apiHandler, "", ov::genai::GenerationFinishReason::LENGTH); // Should emit response.incomplete instead of response.completed ASSERT_NE(finalChunk.find("\"type\":\"response.incomplete\""), std::string::npos) << finalChunk; @@ -1572,7 +1598,7 @@ TEST_F(HttpOpenAIHandlerParsingTest, serializeStreamingChunkForResponsesEmitsCom apiHandler->serializeStreamingCreatedEvent(); apiHandler->serializeStreamingInProgressEvent(); // Delta + finish with STOP - std::string finalChunk = apiHandler->serializeStreamingChunk("Hello", ov::genai::GenerationFinishReason::STOP); + std::string finalChunk = serializeStreamingChunkFromText(*apiHandler, "Hello", ov::genai::GenerationFinishReason::STOP); // Should emit response.completed, NOT response.incomplete ASSERT_NE(finalChunk.find("\"type\":\"response.completed\""), std::string::npos) << finalChunk; @@ -1671,7 +1697,7 @@ TEST_F(HttpOpenAIHandlerParsingTest, serializeFailedEventAfterPartialStreaming) // Emit init events and some deltas first apiHandler->serializeStreamingCreatedEvent(); apiHandler->serializeStreamingInProgressEvent(); - apiHandler->serializeStreamingChunk("Hello", ov::genai::GenerationFinishReason::NONE); + serializeStreamingChunkFromText(*apiHandler, "Hello", ov::genai::GenerationFinishReason::NONE); // Then fail std::string failedEvent = apiHandler->serializeFailedEvent("Generation aborted"); diff --git a/src/test/llm/llmnode_test.cpp b/src/test/llm/llmnode_test.cpp index 0e52c1bd1f..dcaa1ac6e7 100644 --- a/src/test/llm/llmnode_test.cpp +++ b/src/test/llm/llmnode_test.cpp @@ -362,26 +362,40 @@ TEST_P(LLMFlowHttpTestParameterized, streamCompletionsEchoWithCompletion) { )"; std::vector chunks; ON_CALL(*writer, PartialReply).WillByDefault([this, &chunks, ¶ms](std::string response) { - rapidjson::Document d; - std::string dataPrefix = "data:"; + // A single PartialReply may contain multiple SSE events (e.g. all echo + // tokens in the first call). Iterate every event and collect text chunks. + const std::string eventSep = "\n\n"; + const std::string dataPrefix = "data:"; ASSERT_STREQ(response.substr(0, dataPrefix.size()).c_str(), dataPrefix.c_str()); - size_t pos = response.find("\n"); - ASSERT_NE(pos, response.npos); - rapidjson::ParseResult parsingSucceeded = d.Parse(response.substr(dataPrefix.size(), (pos - dataPrefix.size())).c_str()); - ASSERT_EQ(parsingSucceeded.Code(), 0); - ASSERT_TRUE(d["choices"].IsArray()); - ASSERT_EQ(d["choices"].Capacity(), 1); - int i = 0; - for (auto& choice : d["choices"].GetArray()) { - ASSERT_EQ(choice["index"], i++); - if (params.checkLogprobs) { - ASSERT_FALSE(choice["logprobs"].IsObject()); + size_t start = 0; + while (start < response.size()) { + const size_t eventEnd = response.find(eventSep, start); + if (eventEnd == std::string::npos) + break; + const std::string event = response.substr(start, eventEnd - start); + start = eventEnd + eventSep.size(); + if (event.size() < dataPrefix.size()) + continue; + const std::string body = event.substr(dataPrefix.size()); + if (body.find("[DONE]") != std::string::npos) + break; + rapidjson::Document d; + rapidjson::ParseResult pr = d.Parse(body.c_str()); + ASSERT_EQ(pr.Code(), 0); + ASSERT_TRUE(d["choices"].IsArray()); + ASSERT_EQ(d["choices"].Capacity(), 1); + int i = 0; + for (auto& choice : d["choices"].GetArray()) { + ASSERT_EQ(choice["index"], i++); + if (params.checkLogprobs) { + ASSERT_FALSE(choice["logprobs"].IsObject()); + } + ASSERT_TRUE(choice["text"].IsString()); + chunks.push_back(std::string(choice["text"].GetString())); } - ASSERT_TRUE(choice["text"].IsString()); - chunks.push_back(std::string(choice["text"].GetString())); + EXPECT_STREQ(d["model"].GetString(), params.modelName.c_str()); + EXPECT_STREQ(d["object"].GetString(), "text_completion.chunk"); } - EXPECT_STREQ(d["model"].GetString(), params.modelName.c_str()); - EXPECT_STREQ(d["object"].GetString(), "text_completion.chunk"); }); ASSERT_EQ( @@ -478,35 +492,53 @@ TEST_P(LLMFlowHttpTestParameterized, streamCompletionsEchoOnly) { )"; if (params.modelName.find("legacy") == std::string::npos) { - EXPECT_CALL(*writer, PartialReply(::testing::_)).WillOnce([this, ¶ms](std::string response) { - rapidjson::Document d; - std::string dataPrefix = "data:"; + // Echo tokens are streamed one SSE event per token (through the normal + // delay-buffer path), so a single PartialReply may contain multiple events. + // Accumulate all text chunks and verify their concatenation equals the prompt. + std::string echoText; + std::string lastFinishReason; + EXPECT_CALL(*writer, PartialReply(::testing::_)).WillOnce([this, ¶ms, &echoText, &lastFinishReason](std::string response) { + const std::string eventSep = "\n\n"; + const std::string dataPrefix = "data:"; ASSERT_STREQ(response.substr(0, dataPrefix.size()).c_str(), dataPrefix.c_str()); - size_t pos = response.find("\n"); - ASSERT_NE(pos, response.npos); - rapidjson::ParseResult parsingSucceeded = d.Parse(response.substr(dataPrefix.size(), (pos - dataPrefix.size())).c_str()); - ASSERT_EQ(parsingSucceeded.Code(), 0); - ASSERT_TRUE(d["choices"].IsArray()); - ASSERT_EQ(d["choices"].Capacity(), 1); - int i = 0; - for (auto& choice : d["choices"].GetArray()) { - if (params.checkFinishReason) { - ASSERT_TRUE(choice["finish_reason"].IsString()); - EXPECT_STREQ(choice["finish_reason"].GetString(), "length"); - } - ASSERT_EQ(choice["index"], i++); - if (params.checkLogprobs) { - ASSERT_FALSE(choice["logprobs"].IsObject()); + size_t start = 0; + while (start < response.size()) { + const size_t eventEnd = response.find(eventSep, start); + if (eventEnd == std::string::npos) + break; + const std::string event = response.substr(start, eventEnd - start); + start = eventEnd + eventSep.size(); + if (event.size() < dataPrefix.size()) + continue; + const std::string body = event.substr(dataPrefix.size()); + if (body.find("[DONE]") != std::string::npos) + break; + rapidjson::Document d; + rapidjson::ParseResult pr = d.Parse(body.c_str()); + ASSERT_EQ(pr.Code(), 0); + ASSERT_TRUE(d["choices"].IsArray()); + ASSERT_EQ(d["choices"].Capacity(), 1); + for (auto& choice : d["choices"].GetArray()) { + if (params.checkLogprobs) { + ASSERT_FALSE(choice["logprobs"].IsObject()); + } + ASSERT_TRUE(choice["text"].IsString()); + echoText += choice["text"].GetString(); + if (choice.HasMember("finish_reason") && choice["finish_reason"].IsString()) { + lastFinishReason = choice["finish_reason"].GetString(); + } } - ASSERT_TRUE(choice["text"].IsString()); - EXPECT_STREQ(choice["text"].GetString(), "What is OpenVINO?"); + EXPECT_STREQ(d["model"].GetString(), params.modelName.c_str()); + EXPECT_STREQ(d["object"].GetString(), "text_completion.chunk"); } - EXPECT_STREQ(d["model"].GetString(), params.modelName.c_str()); - EXPECT_STREQ(d["object"].GetString(), "text_completion.chunk"); }); ASSERT_EQ( handler->dispatchToProcessor(endpointCompletions, requestBody, &response, comp, responseComponents, writer, multiPartParser), ovms::StatusCode::PARTIAL_END); + if (params.checkFinishReason) { + EXPECT_STREQ(lastFinishReason.c_str(), "length"); + } + EXPECT_EQ(echoText, "What is OpenVINO?"); } else { // In legacy servable streaming with echo, prompt can be sent back in multiple chunks std::vector responses; @@ -1774,8 +1806,10 @@ TEST_P(LLMFlowHttpTestParameterized, inferChatCompletionsStream) { if (params.checkLogprobs) { ASSERT_FALSE(choice["logprobs"].IsObject()); } - ASSERT_TRUE(choice["delta"].IsObject()); - ASSERT_TRUE(choice["delta"]["content"].IsString()); + if (choice.HasMember("delta")) { + ASSERT_TRUE(choice["delta"].IsObject()); + ASSERT_TRUE(choice["delta"]["content"].IsString()); + } } EXPECT_STREQ(d["model"].GetString(), params.modelName.c_str()); EXPECT_STREQ(d["object"].GetString(), "chat.completion.chunk"); @@ -1821,8 +1855,10 @@ TEST_P(LLMFlowHttpTestParameterized, inferChatCompletionsStreamSkipSpecialTokens ASSERT_TRUE(d["choices"].IsArray()); ASSERT_EQ(d["choices"].Capacity(), 1); for (auto& choice : d["choices"].GetArray()) { - ASSERT_TRUE(choice["delta"].IsObject()); - ASSERT_TRUE(choice["delta"]["content"].IsString()); + if (choice.HasMember("delta")) { + ASSERT_TRUE(choice["delta"].IsObject()); + ASSERT_TRUE(choice["delta"]["content"].IsString()); + } } EXPECT_STREQ(d["object"].GetString(), "chat.completion.chunk"); }); @@ -1967,62 +2003,77 @@ TEST_P(LLMFlowHttpTestParameterized, streamChatCompletionsSingleStopString) { // In legacy streaming we don't know if the callback is the last one, so we rely on entire generation call finish. // Because of that, we might get additional response with empty content at the end of the stream. - const size_t numberOfLastResponsesToCheckForStopString = params.modelName.find("legacy") != std::string::npos ? 2 : 1; + const size_t numberOfLastResponsesToCheckForStopString = std::min( + params.modelName.find("legacy") != std::string::npos ? size_t{2} : size_t{1}, + responses.size()); // The stop string (.) does not need to be at the end of the message. // There are cases when the last generation contains dot and a new lines, or generated token is "e.g", // or simply any token (or group of tokens) that has dot in a middle. + const std::string eventSep = "\n\n"; + const std::string dataPrefix = "data:"; + // Check for no existence of a dot: for (size_t i = params.checkHandshakeChunk ? 1 : 0; i < responses.size() - numberOfLastResponsesToCheckForStopString; ++i) { - // Assert there is no dot '.' in the response - - // Cut "data: " prefix - std::string dataPrefix = "data:"; - std::string resp = responses[i].substr(dataPrefix.size()); - - rapidjson::Document d; - rapidjson::ParseResult ok = d.Parse(resp.c_str()); - ASSERT_EQ(ok.Code(), 0) << d.GetParseError() << "\n" - << resp; - - ASSERT_TRUE(d["choices"].IsArray()); - ASSERT_EQ(d["choices"].Size(), 1); - ASSERT_TRUE(d["choices"][0].IsObject()); - ASSERT_TRUE(d["choices"][0]["delta"].IsObject()); - ASSERT_TRUE(d["choices"][0]["delta"]["content"].IsString()); - resp = d["choices"][0]["delta"]["content"].GetString(); - ASSERT_EQ(resp.find('.'), std::string::npos) << "found dot in response: " << responses[i] << " at index: " << i << " out of: " << responses.size(); + size_t start = 0; + while (start < responses[i].size()) { + const size_t eventEnd = responses[i].find(eventSep, start); + if (eventEnd == std::string::npos) + break; + const std::string event = responses[i].substr(start, eventEnd - start); + start = eventEnd + eventSep.size(); + if (event.size() < dataPrefix.size()) + continue; + const std::string body = event.substr(dataPrefix.size()); + if (body.find("[DONE]") != std::string::npos) + break; + rapidjson::Document d; + rapidjson::ParseResult ok = d.Parse(body.c_str()); + ASSERT_EQ(ok.Code(), 0) << d.GetParseError() << "\n" + << body; + ASSERT_TRUE(d["choices"].IsArray()); + ASSERT_EQ(d["choices"].Size(), 1); + ASSERT_TRUE(d["choices"][0].IsObject()); + if (!d["choices"][0].HasMember("delta")) + continue; + ASSERT_TRUE(d["choices"][0]["delta"].IsObject()); + ASSERT_TRUE(d["choices"][0]["delta"]["content"].IsString()); + std::string content = d["choices"][0]["delta"]["content"].GetString(); + ASSERT_EQ(content.find('.'), std::string::npos) << "found dot in response: " << responses[i] << " at index: " << i << " out of: " << responses.size(); + } } bool foundDotInLastResponse = false; // Check for existence of a dot: for (size_t i = responses.size() - numberOfLastResponsesToCheckForStopString; i < responses.size(); ++i) { - // Assert there is a dot '.' in the response - - // Cut "data: " prefix - std::string dataPrefix = "data:"; - std::string resp = responses[i].substr(dataPrefix.size()); - - // remove from resp: "data: [DONE]" (not only in the beginning) - size_t pos = resp.find("data: [DONE]"); - if (pos != std::string::npos) { - resp.erase(pos, std::string("data: [DONE]").length()); - } - - rapidjson::Document d; - rapidjson::ParseResult ok = d.Parse(resp.c_str()); - ASSERT_EQ(ok.Code(), 0) << d.GetParseError() << "\n" - << resp; - - ASSERT_TRUE(d["choices"].IsArray()); - ASSERT_EQ(d["choices"].Size(), 1); - ASSERT_TRUE(d["choices"][0].IsObject()); - ASSERT_TRUE(d["choices"][0]["delta"].IsObject()); - ASSERT_TRUE(d["choices"][0]["delta"]["content"].IsString()); - resp = d["choices"][0]["delta"]["content"].GetString(); - if (resp.find('.') != std::string::npos) { - foundDotInLastResponse = true; + size_t start = 0; + while (start < responses[i].size()) { + const size_t eventEnd = responses[i].find(eventSep, start); + if (eventEnd == std::string::npos) + break; + const std::string event = responses[i].substr(start, eventEnd - start); + start = eventEnd + eventSep.size(); + if (event.size() < dataPrefix.size()) + continue; + const std::string body = event.substr(dataPrefix.size()); + if (body.find("[DONE]") != std::string::npos) + break; + rapidjson::Document d; + rapidjson::ParseResult ok = d.Parse(body.c_str()); + ASSERT_EQ(ok.Code(), 0) << d.GetParseError() << "\n" + << body; + ASSERT_TRUE(d["choices"].IsArray()); + ASSERT_EQ(d["choices"].Size(), 1); + ASSERT_TRUE(d["choices"][0].IsObject()); + if (!d["choices"][0].HasMember("delta")) + continue; + ASSERT_TRUE(d["choices"][0]["delta"].IsObject()); + ASSERT_TRUE(d["choices"][0]["delta"]["content"].IsString()); + std::string content = d["choices"][0]["delta"]["content"].GetString(); + if (content.find('.') != std::string::npos) { + foundDotInLastResponse = true; + } } } ASSERT_TRUE(foundDotInLastResponse) << "cannot find dot last responses"; @@ -2101,7 +2152,13 @@ TEST_P(LLMFlowHttpTestParameterized, streamCompletionsSingleStopString) { if (params.modelName.find("legacy") != std::string::npos) { // In legacy streaming we don't know if the callback is the last one, so we rely on entire generation call finish. // Because of that, we might get additional response with empty content at the end of the stream. - ASSERT_TRUE(std::regex_search(responses[responses.size() - 2], content_regex) || std::regex_search(responses.back(), content_regex)); + // Guard against responses.size() < 2 (can happen when all deltas arrive in a single drain). + if (responses.size() >= 2) { + ASSERT_TRUE(std::regex_search(responses[responses.size() - 2], content_regex) || std::regex_search(responses.back(), content_regex)); + } else { + ASSERT_GE(responses.size(), 1u); + ASSERT_TRUE(std::regex_search(responses.back(), content_regex)); + } } else { ASSERT_TRUE(std::regex_search(responses.back(), content_regex)); } diff --git a/src/test/llm/output_parsers/devstral_output_parser_test.cpp b/src/test/llm/output_parsers/devstral_output_parser_test.cpp index ca61b5c2cc..a694e1e487 100644 --- a/src/test/llm/output_parsers/devstral_output_parser_test.cpp +++ b/src/test/llm/output_parsers/devstral_output_parser_test.cpp @@ -225,7 +225,7 @@ TEST_F(DevstralOutputParserTest, HolisticStreaming) { int64_t chunkIteration = -1; for (const auto& [chunk, finishReason, expectedDelta] : chunkToDeltaVecCopy) { chunkIteration++; - std::optional doc = outputParserWithRegularToolParsing->parseChunk(chunk, true, finishReason); + std::optional doc = outputParserWithRegularToolParsing->parseChunk(chunk, {}, true, finishReason); if (!expectedDelta.has_value() && !doc.has_value()) { continue; // Both are nullopt, OK } @@ -291,7 +291,7 @@ TEST_F(DevstralOutputParserTest, EmptyArgumentsStreaming) { int64_t chunkIteration = 0; for (const auto& [chunk, finishReason, expectedDelta] : chunkToDeltaVec) { chunkIteration++; - std::optional doc = outputParserWithRegularToolParsing->parseChunk(chunk, true, finishReason); + std::optional doc = outputParserWithRegularToolParsing->parseChunk(chunk, {}, true, finishReason); if (!expectedDelta.has_value() && !doc.has_value()) { continue; // Both are nullopt, OK } @@ -354,7 +354,7 @@ TEST_F(DevstralOutputParserTest, ToolCallsWithoutToolsInTheRequestStreaming) { for (const auto& [chunk, expectedDelta] : chunkToDeltaVec) { // Second argument is false as we simulate the case where tools have not been provided in the request - std::optional doc = outputParserWithRegularToolParsing->parseChunk(chunk, false, ov::genai::GenerationFinishReason::NONE); + std::optional doc = outputParserWithRegularToolParsing->parseChunk(chunk, {}, false, ov::genai::GenerationFinishReason::NONE); if (!expectedDelta.has_value() && !doc.has_value()) { continue; // Both are nullopt, OK } diff --git a/src/test/llm/output_parsers/gemma4_output_parser_test.cpp b/src/test/llm/output_parsers/gemma4_output_parser_test.cpp index 64c1a0e90e..95b7df68a6 100644 --- a/src/test/llm/output_parsers/gemma4_output_parser_test.cpp +++ b/src/test/llm/output_parsers/gemma4_output_parser_test.cpp @@ -79,7 +79,7 @@ class Gemma4OutputParserTest : public ::testing::Test { void assertStreamingVec(const std::vector>>& chunkToDeltaVec) { for (const auto& [chunk, finishReason, expectedDelta] : chunkToDeltaVec) { - std::optional doc = outputParserWithRegularToolParsing->parseChunk(chunk, true, finishReason); + std::optional doc = outputParserWithRegularToolParsing->parseChunk(chunk, {}, true, finishReason); if (!expectedDelta.has_value() && !doc.has_value()) { continue; // Both are nullopt, OK } @@ -633,7 +633,7 @@ TEST_F(Gemma4OutputParserTest, ToolCallsWithoutToolsInTheRequestStreaming) { }; for (const auto& [chunk, expectedDelta] : chunkToDeltaVec) { - std::optional doc = outputParserWithRegularToolParsing->parseChunk(chunk, false, ov::genai::GenerationFinishReason::NONE); + std::optional doc = outputParserWithRegularToolParsing->parseChunk(chunk, {}, false, ov::genai::GenerationFinishReason::NONE); assertChunkEqual(doc, expectedDelta, chunk); } } diff --git a/src/test/llm/output_parsers/gptoss_output_parser_test.cpp b/src/test/llm/output_parsers/gptoss_output_parser_test.cpp index 58b5e70009..ea22c3be2e 100644 --- a/src/test/llm/output_parsers/gptoss_output_parser_test.cpp +++ b/src/test/llm/output_parsers/gptoss_output_parser_test.cpp @@ -465,7 +465,7 @@ class GptOssOutputStreamParserTest : public GptOssOutputUnaryParserTest { int64_t chunkIteration = -1; for (const auto& [chunk, finishReason, expectedDelta] : chunkToDeltaVecCopy) { chunkIteration++; - std::optional doc = outputParser->parseChunk(chunk, true, finishReason); + std::optional doc = outputParser->parseChunk(chunk, {}, true, finishReason); if (!expectedDelta.has_value() && !doc.has_value()) { continue; // Both are nullopt, OK } diff --git a/src/test/llm/output_parsers/hermes3_output_parser_test.cpp b/src/test/llm/output_parsers/hermes3_output_parser_test.cpp index ac2c29cad2..820d0fd19e 100644 --- a/src/test/llm/output_parsers/hermes3_output_parser_test.cpp +++ b/src/test/llm/output_parsers/hermes3_output_parser_test.cpp @@ -339,7 +339,7 @@ TEST_F(Hermes3OutputParserTest, HolisticStreaming) { }; for (const auto& [chunk, finishReason, expectedDelta] : chunkToDeltaVec) { - std::optional doc = outputParserWithRegularToolParsing->parseChunk(chunk, true, finishReason); + std::optional doc = outputParserWithRegularToolParsing->parseChunk(chunk, {}, true, finishReason); if (!expectedDelta.has_value() && !doc.has_value()) { continue; // Both are nullopt, OK } @@ -419,7 +419,7 @@ TEST_F(Hermes3OutputParserTest, ToolCallsWithoutToolsInTheRequestStreaming) { for (const auto& [chunk, expectedDelta] : chunkToDeltaVec) { // Second argument is false as we simulate the case where tools have not been provided in the request - std::optional doc = outputParserWithRegularToolParsing->parseChunk(chunk, false, ov::genai::GenerationFinishReason::NONE); + std::optional doc = outputParserWithRegularToolParsing->parseChunk(chunk, {}, false, ov::genai::GenerationFinishReason::NONE); if (!expectedDelta.has_value() && !doc.has_value()) { continue; // Both are nullopt, OK } diff --git a/src/test/llm/output_parsers/lfm2_output_parser_test.cpp b/src/test/llm/output_parsers/lfm2_output_parser_test.cpp index b557dae2ec..e20920aa6b 100644 --- a/src/test/llm/output_parsers/lfm2_output_parser_test.cpp +++ b/src/test/llm/output_parsers/lfm2_output_parser_test.cpp @@ -78,7 +78,7 @@ class LFM2OutputParserTest : public ::testing::Test { } void assertStreamingVec(const std::vector>>& chunkToDeltaVec) { for (const auto& [chunk, finishReason, expectedDelta] : chunkToDeltaVec) { - std::optional doc = outputParserWithRegularToolParsing->parseChunk(chunk, true, finishReason); + std::optional doc = outputParserWithRegularToolParsing->parseChunk(chunk, {}, true, finishReason); if (!expectedDelta.has_value() && !doc.has_value()) { continue; // Both are nullopt, OK } @@ -651,7 +651,7 @@ TEST_F(LFM2OutputParserTest, ToolCallsWithoutToolsInTheRequestStreaming) { for (const auto& [chunk, expectedDelta] : chunkToDeltaVec) { // Second argument is false as we simulate the case where tools have not been provided in the request - std::optional doc = outputParserWithRegularToolParsing->parseChunk(chunk, false, ov::genai::GenerationFinishReason::NONE); + std::optional doc = outputParserWithRegularToolParsing->parseChunk(chunk, {}, false, ov::genai::GenerationFinishReason::NONE); assertChunkEqual(doc, expectedDelta, chunk); } } diff --git a/src/test/llm/output_parsers/llama3_output_parser_test.cpp b/src/test/llm/output_parsers/llama3_output_parser_test.cpp index a26da4703f..0ce5fad9e6 100644 --- a/src/test/llm/output_parsers/llama3_output_parser_test.cpp +++ b/src/test/llm/output_parsers/llama3_output_parser_test.cpp @@ -214,7 +214,7 @@ TEST_F(Llama3OutputParserTest, HolisticStreaming) { int64_t chunkIteration = -1; for (const auto& [chunk, finishReason, expectedDelta] : chunkToDeltaVecCopy) { chunkIteration++; - std::optional doc = outputParserWithRegularToolParsing->parseChunk(chunk, true, finishReason); + std::optional doc = outputParserWithRegularToolParsing->parseChunk(chunk, {}, true, finishReason); if (!expectedDelta.has_value() && !doc.has_value()) { continue; // Both are nullopt, OK } @@ -333,7 +333,7 @@ TEST_F(Llama3OutputParserTest, StreamingToolWithComplexArguments) { auto outputParser = std::make_unique(*llama3Tokenizer, "llama3", "", EMPTY_TOOLS_SCHEMA); for (const auto& [chunk, expectedDelta] : chunkToDeltaVec) { - std::optional doc = outputParser->parseChunk(chunk, true, ov::genai::GenerationFinishReason::NONE); + std::optional doc = outputParser->parseChunk(chunk, {}, true, ov::genai::GenerationFinishReason::NONE); if (!expectedDelta.has_value() && !doc.has_value()) { continue; // Both are nullopt, OK } @@ -412,7 +412,7 @@ TEST_F(Llama3OutputParserTest, ToolCallsWithoutToolsInTheRequestStreaming) { for (const auto& [chunk, expectedDelta] : chunkToDeltaVec) { // Second argument is false as we simulate the case where tools have not been provided in the request - std::optional doc = outputParserWithRegularToolParsing->parseChunk(chunk, false, ov::genai::GenerationFinishReason::NONE); + std::optional doc = outputParserWithRegularToolParsing->parseChunk(chunk, {}, false, ov::genai::GenerationFinishReason::NONE); if (!expectedDelta.has_value() && !doc.has_value()) { continue; // Both are nullopt, OK } diff --git a/src/test/llm/output_parsers/mistral_output_parser_test.cpp b/src/test/llm/output_parsers/mistral_output_parser_test.cpp index bdc0a6f887..1f7c61d231 100644 --- a/src/test/llm/output_parsers/mistral_output_parser_test.cpp +++ b/src/test/llm/output_parsers/mistral_output_parser_test.cpp @@ -255,7 +255,7 @@ TEST_F(MistralOutputParserTest, HolisticStreaming) { int64_t chunkIteration = -1; for (const auto& [chunk, finishReason, expectedDelta] : chunkToDeltaVecCopy) { chunkIteration++; - std::optional doc = outputParserWithRegularToolParsing->parseChunk(chunk, true, finishReason); + std::optional doc = outputParserWithRegularToolParsing->parseChunk(chunk, {}, true, finishReason); if (!expectedDelta.has_value() && !doc.has_value()) { continue; // Both are nullopt, OK } @@ -374,7 +374,7 @@ TEST_F(MistralOutputParserTest, StreamingToolWithComplexArguments) { auto outputParser = std::make_unique(*mistralTokenizer, "mistral", "", EMPTY_TOOLS_SCHEMA); for (const auto& [chunk, expectedDelta] : chunkToDeltaVec) { - std::optional doc = outputParser->parseChunk(chunk, true, ov::genai::GenerationFinishReason::NONE); + std::optional doc = outputParser->parseChunk(chunk, {}, true, ov::genai::GenerationFinishReason::NONE); if (!expectedDelta.has_value() && !doc.has_value()) { continue; // Both are nullopt, OK } @@ -453,7 +453,7 @@ TEST_F(MistralOutputParserTest, ToolCallsWithoutToolsInTheRequestStreaming) { for (const auto& [chunk, expectedDelta] : chunkToDeltaVec) { // Second argument is false as we simulate the case where tools have not been provided in the request - std::optional doc = outputParserWithRegularToolParsing->parseChunk(chunk, false, ov::genai::GenerationFinishReason::NONE); + std::optional doc = outputParserWithRegularToolParsing->parseChunk(chunk, {}, false, ov::genai::GenerationFinishReason::NONE); if (!expectedDelta.has_value() && !doc.has_value()) { continue; // Both are nullopt, OK } diff --git a/src/test/llm/output_parsers/phi4_output_parser_test.cpp b/src/test/llm/output_parsers/phi4_output_parser_test.cpp index d66966a37d..fbd21515ce 100644 --- a/src/test/llm/output_parsers/phi4_output_parser_test.cpp +++ b/src/test/llm/output_parsers/phi4_output_parser_test.cpp @@ -238,7 +238,7 @@ TEST_F(Phi4OutputParserTest, HolisticStreaming) { int64_t chunkIteration = -1; for (const auto& [chunk, finishReason, expectedDelta] : chunkToDeltaVecCopy) { chunkIteration++; - std::optional doc = outputParserWithRegularToolParsing->parseChunk(chunk, true, finishReason); + std::optional doc = outputParserWithRegularToolParsing->parseChunk(chunk, {}, true, finishReason); if (!expectedDelta.has_value() && !doc.has_value()) { continue; // Both are nullopt, OK } @@ -357,7 +357,7 @@ TEST_F(Phi4OutputParserTest, StreamingToolWithComplexArguments) { auto outputParser = std::make_unique(*phi4Tokenizer, "phi4", "", EMPTY_TOOLS_SCHEMA); for (const auto& [chunk, expectedDelta] : chunkToDeltaVec) { - std::optional doc = outputParser->parseChunk(chunk, true, ov::genai::GenerationFinishReason::NONE); + std::optional doc = outputParser->parseChunk(chunk, {}, true, ov::genai::GenerationFinishReason::NONE); if (!expectedDelta.has_value() && !doc.has_value()) { continue; // Both are nullopt, OK } @@ -438,7 +438,7 @@ TEST_F(Phi4OutputParserTest, ToolCallsWithoutToolsInTheRequestStreaming) { for (const auto& [chunk, expectedDelta] : chunkToDeltaVec) { // Second argument is false as we simulate the case where tools have not been provided in the request - std::optional doc = outputParserWithRegularToolParsing->parseChunk(chunk, false, ov::genai::GenerationFinishReason::NONE); + std::optional doc = outputParserWithRegularToolParsing->parseChunk(chunk, {}, false, ov::genai::GenerationFinishReason::NONE); if (!expectedDelta.has_value() && !doc.has_value()) { continue; // Both are nullopt, OK } diff --git a/src/test/llm/output_parsers/qwen3_output_parser_test.cpp b/src/test/llm/output_parsers/qwen3_output_parser_test.cpp index 032fb53935..790c4a3b86 100644 --- a/src/test/llm/output_parsers/qwen3_output_parser_test.cpp +++ b/src/test/llm/output_parsers/qwen3_output_parser_test.cpp @@ -257,7 +257,7 @@ TEST_F(Qwen3OutputParserTest, HolisticStreaming) { }; for (const auto& [chunk, expectedDelta] : chunkToDeltaVec) { - std::optional doc = outputParser->parseChunk(chunk, true, ov::genai::GenerationFinishReason::NONE); + std::optional doc = outputParser->parseChunk(chunk, {}, true, ov::genai::GenerationFinishReason::NONE); if (!expectedDelta.has_value() && !doc.has_value()) { continue; // Both are nullopt, OK } @@ -380,7 +380,7 @@ TEST_F(Qwen3OutputParserTest, StreamingToolWithComplexArguments) { }; for (const auto& [chunk, expectedDelta] : chunkToDeltaVec) { - std::optional doc = outputParser->parseChunk(chunk, true, ov::genai::GenerationFinishReason::NONE); + std::optional doc = outputParser->parseChunk(chunk, {}, true, ov::genai::GenerationFinishReason::NONE); if (!expectedDelta.has_value() && !doc.has_value()) { continue; // Both are nullopt, OK } @@ -465,7 +465,7 @@ TEST_F(Qwen3OutputParserTest, ToolCallsInsideReasoningStreaming) { }; for (const auto& [chunk, expectedDelta] : chunkToDeltaVec) { - std::optional doc = outputParser->parseChunk(chunk, true, ov::genai::GenerationFinishReason::NONE); + std::optional doc = outputParser->parseChunk(chunk, {}, true, ov::genai::GenerationFinishReason::NONE); if (!expectedDelta.has_value() && !doc.has_value()) { continue; // Both are nullopt, OK } @@ -502,10 +502,10 @@ TEST_F(Qwen3OutputParserTest, ToolCallsBrokenJson) { }; for (const auto& [chunk, shouldThrow] : chunkToErrorVec) { if (shouldThrow) { - EXPECT_THROW(outputParser->parseChunk(chunk, true, ov::genai::GenerationFinishReason::NONE), std::runtime_error) << "Expected error for chunk: " << chunk; + EXPECT_THROW(outputParser->parseChunk(chunk, {}, true, ov::genai::GenerationFinishReason::NONE), std::runtime_error) << "Expected error for chunk: " << chunk; } else { EXPECT_NO_THROW({ - auto doc = outputParser->parseChunk(chunk, true, ov::genai::GenerationFinishReason::NONE); + auto doc = outputParser->parseChunk(chunk, {}, true, ov::genai::GenerationFinishReason::NONE); // No further checks, just ensure no exception }) << "Unexpected error for chunk: " << chunk; @@ -532,10 +532,10 @@ TEST_F(Qwen3OutputParserTest, ToolCallsDataAfterToolCall) { {"Buffer is not cleared, JSON is still broken", true}}; for (const auto& [chunk, shouldThrow] : chunkToErrorVec) { if (shouldThrow) { - EXPECT_THROW(outputParser->parseChunk(chunk, true, ov::genai::GenerationFinishReason::NONE), std::runtime_error) << "Expected error for chunk: " << chunk; + EXPECT_THROW(outputParser->parseChunk(chunk, {}, true, ov::genai::GenerationFinishReason::NONE), std::runtime_error) << "Expected error for chunk: " << chunk; } else { EXPECT_NO_THROW({ - auto doc = outputParser->parseChunk(chunk, true, ov::genai::GenerationFinishReason::NONE); + auto doc = outputParser->parseChunk(chunk, {}, true, ov::genai::GenerationFinishReason::NONE); // No further checks, just ensure no exception }) << "Unexpected error for chunk: " << chunk; diff --git a/src/test/llm/output_parsers/qwen3coder_output_parser_test.cpp b/src/test/llm/output_parsers/qwen3coder_output_parser_test.cpp index 347e30ab12..7a025d7bd6 100644 --- a/src/test/llm/output_parsers/qwen3coder_output_parser_test.cpp +++ b/src/test/llm/output_parsers/qwen3coder_output_parser_test.cpp @@ -755,7 +755,7 @@ if __name__ == "__main__": ov::genai::GenerationFinishReason::NONE, R"({"delta":{"tool_calls":[{"index":6,"function":{"arguments":"{\"arg1\":\"if __name__ == \\\"__main__\\\":\\n addresses = {}\\n addresses[\\\"Hodor\\\"] = \\\"\\\"\\\"The door\\\"\\\"\\\"\\n addresses[\\\"Arya\\\"] = \\\"Winterfell\\\"\\n for name, address in addresses.items():\\n print(f'\\\\n\\\\t{name} lives at {address}\\\\n\\\\r')\"}"}}]}})"}}; for (const auto& [chunk, finishReason, expectedDelta] : chunkToDeltaVec) { i++; - std::optional doc = outputParser->parseChunk(chunk, true, ov::genai::GenerationFinishReason::NONE); + std::optional doc = outputParser->parseChunk(chunk, {}, true, ov::genai::GenerationFinishReason::NONE); if (!expectedDelta.has_value() && !doc.has_value()) { continue; // Both are nullopt, OK }