Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 22 additions & 19 deletions cli/serve/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,19 @@
import uvicorn
from fastapi import FastAPI, Request
from fastapi.exceptions import RequestValidationError
from fastapi.responses import JSONResponse
from fastapi.responses import JSONResponse, StreamingResponse

from mellea.backends.model_options import ModelOption
from mellea.helpers.openai_compatible_helpers import (
build_completion_usage,
stream_chat_completion_chunks,
)

from .models import (
ChatCompletion,
ChatCompletionMessage,
ChatCompletionRequest,
Choice,
CompletionUsage,
OpenAIError,
OpenAIErrorResponse,
)
Expand Down Expand Up @@ -94,8 +97,8 @@ def _build_model_options(request: ChatCompletionRequest) -> dict:
"n", # Number of completions - not supported in Mellea's model_options
"user", # User tracking ID - metadata, not a generation parameter
"extra", # Pydantic's extra fields dict - unused (see model_config)
"stream_options", # Streaming options - handled separately in streaming response
# Not-yet-implemented OpenAI parameters (silently ignored)
"stream", # Streaming responses - not yet implemented
"stop", # Stop sequences - not yet implemented
"top_p", # Nucleus sampling - not yet implemented
"presence_penalty", # Presence penalty - not yet implemented
Expand All @@ -111,6 +114,7 @@ def _build_model_options(request: ChatCompletionRequest) -> dict:
"temperature": ModelOption.TEMPERATURE,
"max_tokens": ModelOption.MAX_NEW_TOKENS,
"seed": ModelOption.SEED,
"stream": ModelOption.STREAM,
}

filtered_options = {
Expand Down Expand Up @@ -157,26 +161,25 @@ async def endpoint(request: ChatCompletionRequest):
model_options=model_options,
)

# Extract usage information from the ModelOutputThunk if available
usage = None
if hasattr(output, "usage") and output.usage is not None:
prompt_tokens = output.usage.get("prompt_tokens", 0)
completion_tokens = output.usage.get("completion_tokens", 0)
# Calculate total_tokens if not provided
total_tokens = output.usage.get(
"total_tokens", prompt_tokens + completion_tokens
)
usage = CompletionUsage(
prompt_tokens=prompt_tokens,
completion_tokens=completion_tokens,
total_tokens=total_tokens,
)

# system_fingerprint represents backend config hash, not model name
# The model name is already in response.model (line 73)
# Leave as None since we don't track backend config fingerprints yet
system_fingerprint = None

# Handle streaming response
if request.stream:
return StreamingResponse(
stream_chat_completion_chunks(
output=output,
completion_id=completion_id,
model=request.model,
created=created_timestamp,
stream_options=request.stream_options,
system_fingerprint=system_fingerprint,
),
media_type="text/event-stream",
)

return ChatCompletion(
id=completion_id,
model=request.model,
Expand All @@ -192,7 +195,7 @@ async def endpoint(request: ChatCompletionRequest):
],
object="chat.completion", # type: ignore
system_fingerprint=system_fingerprint,
usage=usage,
usage=build_completion_usage(output),
) # type: ignore
except ValueError as e:
# Handle validation errors or invalid input
Expand Down
74 changes: 63 additions & 11 deletions cli/serve/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

from pydantic import BaseModel, Field

from mellea.helpers.openai_compatible_helpers import CompletionUsage


class ChatMessage(BaseModel):
role: Literal["system", "user", "assistant", "tool", "function"]
Expand Down Expand Up @@ -58,6 +60,13 @@ class ChatCompletionRequest(BaseModel):
seed: int | None = None
response_format: ResponseFormat | None = None

# OpenAI-compatible streaming options. Only applies when stream=True.
# Supports `include_usage` (bool) to control whether usage statistics are
# included in the final streaming chunk. Defaults to True (include usage)
# when not specified for backward compatibility. For non-streaming requests
# (stream=False), usage is always included regardless of this parameter.
stream_options: dict[str, Any] | None = None

# For future/undocumented fields
extra: dict[str, Any] = Field(default_factory=dict)

Expand Down Expand Up @@ -88,17 +97,6 @@ class Choice(BaseModel):
"""The reason the model stopped generating tokens."""


class CompletionUsage(BaseModel):
completion_tokens: int
"""Number of tokens in the generated completion."""

prompt_tokens: int
"""Number of tokens in the prompt."""

total_tokens: int
"""Total number of tokens used in the request (prompt + completion)."""


class ChatCompletion(BaseModel):
id: str
"""A unique identifier for the chat completion."""
Expand All @@ -125,6 +123,60 @@ class ChatCompletion(BaseModel):
"""Usage statistics for the completion request."""


class ChatCompletionChunkDelta(BaseModel):
    """Delta content in a streaming chunk.

    Mirrors the ``delta`` object of an OpenAI ``chat.completion.chunk``:
    each field carries only the increment contributed by this chunk.
    """

    content: str | None = None
    """The content fragment in this chunk."""

    role: Literal["assistant"] | None = None
    """The role (only present in first chunk)."""

    refusal: str | None = None
    """The refusal message fragment, if any."""


class ChatCompletionChunkChoice(BaseModel):
    """A single choice within a streaming chunk.

    Matches an entry of the OpenAI streaming ``choices`` list: ``delta``
    holds the incremental payload and ``finish_reason`` stays ``None``
    until the final chunk of the stream.
    """

    index: int
    """The index of the choice in the list of choices."""

    delta: ChatCompletionChunkDelta
    """The delta content for this chunk."""

    finish_reason: (
        Literal["stop", "length", "content_filter", "tool_calls", "function_call"]
        | None
    ) = None
    """The reason the model stopped generating tokens (only in final chunk)."""


class ChatCompletionChunk(BaseModel):
    """A chunk in a streaming chat completion response.

    One Server-Sent Event payload; the ``object`` field is always
    ``chat.completion.chunk`` to match the OpenAI streaming format.
    """

    id: str
    """A unique identifier for the chat completion."""

    choices: list[ChatCompletionChunkChoice]
    """A list of chat completion choices."""

    created: int
    """The Unix timestamp (in seconds) of when the chat completion was created."""

    model: str
    """The model used for the chat completion."""

    object: Literal["chat.completion.chunk"]
    """The object type, which is always `chat.completion.chunk`."""

    system_fingerprint: str | None = None
    """This fingerprint represents the backend configuration that the model runs with."""

    usage: CompletionUsage | None = None
    """Usage statistics for the final streaming chunk when available from the backend."""


class OpenAIError(BaseModel):
"""OpenAI API error object."""

Expand Down
64 changes: 61 additions & 3 deletions docs/examples/m_serve/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,21 @@ A simple example showing how to structure a Mellea program for serving as an API
- Custom validation functions for API constraints
- Handling chat message inputs

### m_serve_example_streaming.py
A dedicated streaming example for `m serve` that supports both modes:
- `stream=False` returns a normal computed response
- `stream=True` returns an uncomputed thunk so the server can emit
incremental Server-Sent Events (SSE) chunks

### pii_serve.py
Example of serving a PII (Personally Identifiable Information) detection service.

### client.py
Client code for testing the served API endpoints.
Client code for testing the served API endpoints with non-streaming requests.

### client_streaming.py
Client code demonstrating streaming responses using Server-Sent Events (SSE)
against `m_serve_example_streaming.py`.

## Concepts Demonstrated

Expand All @@ -26,6 +36,7 @@ Client code for testing the served API endpoints.
- **Output Formatting**: Returning appropriate response types
- **Validation in Production**: Using requirements in deployed services
- **Model Options**: Passing model configuration through API
- **Streaming Responses**: Real-time token streaming via Server-Sent Events (SSE)

## Basic Pattern

Expand Down Expand Up @@ -53,12 +64,59 @@ def serve(input: list[ChatMessage],

## Running the Server

### Sampling

```bash
# Start the server
# Start the sampling example server
m serve docs/examples/m_serve/m_serve_example_simple.py

# In another terminal, test with client
# In another terminal, test with the non-streaming client
python docs/examples/m_serve/client.py
```

### Streaming

```bash
# Start the dedicated streaming example server
m serve docs/examples/m_serve/m_serve_example_streaming.py

# In another terminal, test with the streaming client
python docs/examples/m_serve/client_streaming.py
```

## Streaming Support

The server supports streaming responses via Server-Sent Events (SSE) when the
`stream=True` parameter is set in the request. This allows clients to receive
tokens as they are generated, providing a better user experience for long-running
generations.

For a real streaming demo, serve `m_serve_example_streaming.py`. That example
supports both normal and streaming responses consistently. The sampling example
(`m_serve_example_simple.py`) demonstrates rejection sampling and validation,
not token-by-token streaming.

**Key Features:**
- Real-time token streaming using SSE
- OpenAI-compatible streaming format (`ChatCompletionChunk`)
- Final chunk includes usage statistics when the backend provides usage data
- The dedicated streaming example supports both `stream=False` and `stream=True`
- Works with any backend that supports `ModelOutputThunk.astream()`

**Example:**
```python
import openai

client = openai.OpenAI(api_key="na", base_url="http://0.0.0.0:8080/v1")

# Enable streaming with stream=True
stream = client.chat.completions.create(
messages=[{"role": "user", "content": "Tell me a story"}],
model="granite4:micro-h",
stream=True,
)

for chunk in stream:
if chunk.choices[0].delta.content:
print(chunk.choices[0].delta.content, end="", flush=True)
```

## API Endpoints
Expand Down
49 changes: 49 additions & 0 deletions docs/examples/m_serve/client_streaming.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# pytest: skip_always
"""Example client demonstrating streaming responses from m serve.

This example shows how to use the OpenAI Python client with a Mellea server
started with:

    m serve docs/examples/m_serve/m_serve_example_streaming.py

Set ``streaming`` below to:
- ``True`` for incremental SSE chunks
- ``False`` for a normal non-streaming response
"""

import openai

PORT = 8080

client = openai.OpenAI(api_key="na", base_url=f"http://0.0.0.0:{PORT}/v1")

streaming = True  # streaming enabled toggle

print(f"stream={streaming} response:")
print("-" * 50)

# Both modes send the same prompt; only the `stream` flag differs.
request_messages = [
    {"role": "user", "content": "Count down from 100 using words not digits."}
]

if streaming:
    # SSE mode: chunks arrive incrementally as the model generates tokens.
    chunks = client.chat.completions.create(
        messages=request_messages,
        model="granite4:micro-h",
        stream=True,
    )
    for chunk in chunks:
        fragment = chunk.choices[0].delta.content
        if fragment:
            # If you want to see the chunks more clearly separated, change end
            print(fragment, end="", flush=True)
else:
    # Non-streaming mode: a single fully computed completion is returned.
    completion = client.chat.completions.create(
        messages=request_messages,
        model="granite4:micro-h",
        stream=False,
    )
    print(completion.choices[0].message.content)

print("\n" + "-" * 50)
print("Stream complete!")
41 changes: 41 additions & 0 deletions docs/examples/m_serve/m_serve_example_streaming.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# pytest: ollama, e2e

"""Example to run m serve with true streaming support."""

import mellea
from cli.serve.models import ChatMessage
from mellea.backends.model_options import ModelOption
from mellea.core import ComputedModelOutputThunk, ModelOutputThunk
from mellea.stdlib.context import SimpleContext

session = mellea.start_session(ctx=SimpleContext())


async def serve(
input: list[ChatMessage],
requirements: list[str] | None = None,
model_options: dict | None = None,
) -> ModelOutputThunk | ComputedModelOutputThunk:
"""Support both normal and streaming responses from the same example.

Returns a computed result for non-streaming requests and an uncomputed thunk
for streaming requests.
"""
del requirements
message = input[-1].content or ""
is_streaming = bool((model_options or {}).get(ModelOption.STREAM, False))

if is_streaming:
return await session.ainstruct(
description=message,
strategy=None,
model_options=model_options,
await_result=False,
)

return await session.ainstruct(
description=message,
strategy=None,
model_options=model_options,
await_result=True,
)
Loading
Loading