Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/llm/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,8 @@ ovms_cc_library(

ovms_cc_library(
name = "genai_servables",
hdrs = ["servable.hpp",
hdrs = ["servable.hpp",
"ovms_text_streamer.hpp",
"servable_initializer.hpp",
"language_model/continuous_batching/servable.hpp",
"language_model/continuous_batching/llm_executor.hpp",
Expand All @@ -310,6 +311,7 @@ ovms_cc_library(
"text_utils.hpp"],
srcs = ["servable.cpp",
"servable_initializer.cpp",
"ovms_text_streamer.cpp",
"language_model/continuous_batching/servable.cpp",
"language_model/continuous_batching/servable_initializer.cpp",
"visual_language_model/continuous_batching/servable.cpp",
Expand Down
2 changes: 1 addition & 1 deletion src/llm/apis/openai_api_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -520,7 +520,7 @@ bool OpenAIApiHandler::isStream() const { return request.stream; }
// Returns the value of the handler's `endpoint` member.
Endpoint OpenAIApiHandler::getEndpoint() const {
    return endpoint;
}
// Returns a copy of the model name stored in the request (`request.model`).
std::string OpenAIApiHandler::getModel() const {
    return request.model;
}
// Returns a copy of the tool choice stored in the request (`request.toolChoice`).
std::string OpenAIApiHandler::getToolChoice() const {
    return request.toolChoice;
}
const std::unique_ptr<OutputParser>& OpenAIApiHandler::getOutputParser() const { return outputParser; }
const std::shared_ptr<OutputParser>& OpenAIApiHandler::getOutputParser() const { return outputParser; }

void OpenAIApiHandler::setPromptTokensUsage(size_t promptTokens) {
usage.promptTokens = promptTokens;
Expand Down
8 changes: 4 additions & 4 deletions src/llm/apis/openai_api_handler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ class OpenAIApiHandler {
ov::genai::Tokenizer tokenizer;

// Output parser is used to parse chat completions response to extract specific fields like tool calls and reasoning.
std::unique_ptr<OutputParser> outputParser = nullptr;
std::shared_ptr<OutputParser> outputParser = nullptr;

// Shared parsing helpers
absl::Status parseCommonPart(std::optional<uint32_t> maxTokensLimit, uint32_t bestOfLimit, std::optional<uint32_t> maxModelLength);
Expand All @@ -118,7 +118,7 @@ class OpenAIApiHandler {
// TODO: delay creating the output parser until the request's toolNameSchemaMap has been parsed.
// The map is passed in now, but it must be populated before the parser is first used.
if (!toolParserName.empty() || !reasoningParserName.empty()) {
outputParser = std::make_unique<OutputParser>(tokenizer, toolParserName, reasoningParserName, this->request.toolNameSchemaMap);
outputParser = std::make_shared<OutputParser>(tokenizer, toolParserName, reasoningParserName, this->request.toolNameSchemaMap);
}
}

Expand Down Expand Up @@ -154,7 +154,7 @@ class OpenAIApiHandler {
Endpoint getEndpoint() const;
std::string getModel() const;
std::string getToolChoice() const;
const std::unique_ptr<OutputParser>& getOutputParser() const;
const std::shared_ptr<OutputParser>& getOutputParser() const;

// Usage tracking
void setPromptTokensUsage(size_t promptTokens);
Expand All @@ -165,7 +165,7 @@ class OpenAIApiHandler {
virtual std::string serializeUnaryResponse(const std::vector<ov::genai::GenerationOutput>& generationOutputs) = 0;
virtual std::string serializeUnaryResponse(ov::genai::EncodedResults& results) = 0;
virtual std::string serializeUnaryResponse(ov::genai::VLMDecodedResults& results, const std::string& textResponse) = 0;
virtual std::string serializeStreamingChunk(const std::string& chunkResponse, ov::genai::GenerationFinishReason finishReason) = 0;
virtual std::string serializeStreamingChunk(rapidjson::Document parsedDelta, ov::genai::GenerationFinishReason finishReason) = 0;
virtual std::string serializeStreamingUsageChunk() = 0;
virtual std::string serializeStreamingHandshakeChunk() = 0;

Expand Down
49 changes: 22 additions & 27 deletions src/llm/apis/openai_completions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,7 @@ std::string OpenAIChatCompletionsHandler::serializeUnaryResponse(ov::genai::VLMD

// --- Streaming serialization ---

std::string OpenAIChatCompletionsHandler::serializeStreamingChunk(const std::string& chunkResponse, ov::genai::GenerationFinishReason finishReason) {
std::string OpenAIChatCompletionsHandler::serializeStreamingChunk(rapidjson::Document parsedDelta, ov::genai::GenerationFinishReason finishReason) {
OVMS_PROFILE_FUNCTION();

Document doc;
Expand All @@ -561,33 +561,25 @@ std::string OpenAIChatCompletionsHandler::serializeStreamingChunk(const std::str
// TODO: logprobs: object/null; Log probability information for the choice.
choice.AddMember("logprobs", Value(), allocator);
if (endpoint == Endpoint::CHAT_COMPLETIONS) {
if (outputParser != nullptr) {
std::optional<Document> delta = outputParser->parseChunk(chunkResponse, areToolsAvailable(), finishReason);
if (!delta.has_value()) {
// If the generation is still ongoing, there is nothing to emit yet
if (finishReason == ov::genai::GenerationFinishReason::NONE) {
return "";
}
// Generation finished but parser returned no delta (e.g. empty chunk after tool call).
// We still need to emit a chunk with the appropriate finish_reason.
}
if (delta.has_value() && delta->HasMember("delta")) {
// Deep copy the "delta" member value into the choice object
choice.AddMember("delta", Value((*delta)["delta"], allocator), allocator);
hasToolCalls = hasToolCallsInStreamingDelta(*delta);
if (hasToolCalls) {
toolCallsDetectedInStream = true;
}
// parsedDelta is a pre-parsed Document produced by OVMSTextStreamer::flush_chunk.
// Shape: {"delta":{...}} for content/reasoning/tool_calls, or an empty Document{}
// for finish-only chunks (generation ended on a swallowed token).
if (parsedDelta.HasMember("delta")) {
choice.AddMember("delta", Value(parsedDelta["delta"], allocator), allocator);
hasToolCalls = hasToolCallsInStreamingDelta(parsedDelta);
if (hasToolCalls) {
toolCallsDetectedInStream = true;
}

} else {
Value delta(kObjectType);
delta.SetObject();
delta.AddMember("content", Value(chunkResponse.c_str(), allocator), allocator);
choice.AddMember("delta", delta, allocator);
}
// If no "delta" member, choice has no delta — valid for the final finish_reason chunk.
} else if (endpoint == Endpoint::COMPLETIONS) {
choice.AddMember("text", Value(chunkResponse.c_str(), allocator), allocator);
// For /v1/completions, extract the plain text from the content delta.
if (parsedDelta.HasMember("delta") && parsedDelta["delta"].IsObject() &&
parsedDelta["delta"].HasMember("content") && parsedDelta["delta"]["content"].IsString()) {
choice.AddMember("text", Value(parsedDelta["delta"]["content"].GetString(), allocator), allocator);
} else {
choice.AddMember("text", Value("", allocator), allocator);
}
}

auto serializedFinishReason = mapFinishReason(finishReason, hasToolCalls || toolCallsDetectedInStream);
Expand Down Expand Up @@ -672,6 +664,9 @@ std::string OpenAIChatCompletionsHandler::serializeStreamingUsageChunk() {

std::string OpenAIChatCompletionsHandler::serializeStreamingHandshakeChunk() {
OVMS_PROFILE_FUNCTION();
// The handshake chunk signals that prefill is complete and generation has started.
// Emitted on every endpoint so clients can distinguish prefill latency from
// time-to-first-token.
Document doc;
doc.SetObject();
Document::AllocatorType& allocator = doc.GetAllocator();
Expand All @@ -691,7 +686,8 @@ std::string OpenAIChatCompletionsHandler::serializeStreamingHandshakeChunk() {
delta.AddMember("content", Value(rapidjson::kNullType), allocator);
choice.AddMember("delta", delta, allocator);
} else if (endpoint == Endpoint::COMPLETIONS) {
choice.AddMember("text", Value(rapidjson::kNullType), allocator);
// Empty string (not null) so the field is present and typed as string.
choice.AddMember("text", Value("", allocator), allocator);
}

choice.AddMember("finish_reason", Value(rapidjson::kNullType), allocator);
Expand All @@ -705,7 +701,6 @@ std::string OpenAIChatCompletionsHandler::serializeStreamingHandshakeChunk() {
// model: string; copied from the request
doc.AddMember("model", Value(request.model.c_str(), allocator), allocator);

// object: string; identifies the payload as a streamed chunk rather than a complete response
if (endpoint == Endpoint::CHAT_COMPLETIONS) {
doc.AddMember("object", Value("chat.completion.chunk", allocator), allocator);
} else if (endpoint == Endpoint::COMPLETIONS) {
Expand Down
2 changes: 1 addition & 1 deletion src/llm/apis/openai_completions.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class OpenAIChatCompletionsHandler : public OpenAIApiHandler {
std::string serializeUnaryResponse(const std::vector<ov::genai::GenerationOutput>& generationOutputs) override;
std::string serializeUnaryResponse(ov::genai::EncodedResults& results) override;
std::string serializeUnaryResponse(ov::genai::VLMDecodedResults& results, const std::string& textResponse) override;
std::string serializeStreamingChunk(const std::string& chunkResponse, ov::genai::GenerationFinishReason finishReason) override;
std::string serializeStreamingChunk(rapidjson::Document parsedDelta, ov::genai::GenerationFinishReason finishReason) override;
std::string serializeStreamingUsageChunk() override;
std::string serializeStreamingHandshakeChunk() override;
void incrementProcessedTokens(size_t numTokens = 1) override;
Expand Down
Loading