Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions langfuse/openai.py
Original file line number Diff line number Diff line change
Expand Up @@ -640,7 +640,9 @@ def _extract_streamed_openai_response(resource: Any, chunks: Any) -> Any:
chunk = chunk.__dict__

model = model or chunk.get("model", None) or None
usage = chunk.get("usage", None)
chunk_usage = chunk.get("usage", None)
if chunk_usage is not None:
usage = chunk_usage

choices = chunk.get("choices", [])

Expand All @@ -649,11 +651,16 @@ def _extract_streamed_openai_response(resource: Any, chunks: Any) -> Any:
choice = choice.__dict__
if resource.type == "chat":
delta = choice.get("delta", None)
finish_reason = choice.get("finish_reason", None)
choice_finish_reason = choice.get("finish_reason", None)
if choice_finish_reason is not None:
finish_reason = choice_finish_reason

if _is_openai_v1():
if _is_openai_v1() and delta is not None:
delta = delta.__dict__

if delta is None:
delta = {}

if delta.get("role", None) is not None:
completion["role"] = delta["role"]

Expand Down
3 changes: 2 additions & 1 deletion tests/e2e/test_core_sdk.py
Original file line number Diff line number Diff line change
Expand Up @@ -2069,8 +2069,9 @@ def test_create_trace_sampling_zero():
}


def test_mask_function():
def test_mask_function(request):
LangfuseResourceManager.reset()
request.addfinalizer(LangfuseResourceManager.reset)

def mask_func(data):
if isinstance(data, dict):
Expand Down
93 changes: 93 additions & 0 deletions tests/unit/test_openai.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,64 @@ def _make_chat_stream_chunks():
]


def _make_chat_stream_chunks_with_trailing_content_filter_chunk():
usage = SimpleNamespace(prompt_tokens=3, completion_tokens=1, total_tokens=4)

return [
SimpleNamespace(
model="gpt-4o-mini",
choices=[
SimpleNamespace(
delta=SimpleNamespace(
role="assistant",
content="2",
function_call=None,
tool_calls=None,
),
finish_reason=None,
)
],
usage=None,
),
SimpleNamespace(
model="gpt-4o-mini",
choices=[
SimpleNamespace(
delta=SimpleNamespace(
role=None,
content=None,
function_call=None,
tool_calls=None,
),
finish_reason="stop",
)
],
usage=usage,
),
SimpleNamespace(
model="",
choices=[
SimpleNamespace(
delta=None,
finish_reason=None,
content_filter_offsets={
"check_offset": 44,
"start_offset": 44,
"end_offset": 121,
},
content_filter_results={
"hate": {"filtered": False, "severity": "safe"},
"self_harm": {"filtered": False, "severity": "safe"},
"sexual": {"filtered": False, "severity": "safe"},
"violence": {"filtered": False, "severity": "safe"},
},
)
],
usage=None,
),
]


def _make_single_chunk_stream():
return SimpleNamespace(
model="gpt-4o-mini",
Expand Down Expand Up @@ -315,6 +373,41 @@ def test_openai_stream_preserves_original_stream_contract(
}


def test_openai_stream_handles_trailing_azure_content_filter_chunk(
    langfuse_memory_client, get_span, json_attr
):
    """The instrumented stream tolerates Azure's trailing content-filter chunk.

    That last chunk arrives with ``delta=None`` and an empty ``model``; the
    wrapper must still forward all three chunks to the caller, and the
    recorded span must keep the output, finish reason, and usage aggregated
    from the earlier chunks.
    """
    client = lf_openai.OpenAI(api_key="test")
    fake_raw_stream = DummyOpenAIStream(
        _make_chat_stream_chunks_with_trailing_content_filter_chunk(),
        DummySyncResponse(),
    )

    with patch.object(
        client.chat.completions, "_post", return_value=fake_raw_stream
    ):
        wrapped_stream = client.chat.completions.create(
            name="unit-openai-native-stream-azure-filter",
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": "1 + 1 = ?"}],
            temperature=0,
            stream=True,
        )

    received = [chunk for chunk in wrapped_stream]
    wrapped_stream.close()

    # All chunks — including the delta-less filter chunk — reach the caller.
    assert len(received) == 3

    langfuse_memory_client.flush()
    span = get_span("unit-openai-native-stream-azure-filter")

    # Aggregation must come from the first two chunks, not the trailing one.
    assert span.attributes[LangfuseOtelSpanAttributes.OBSERVATION_OUTPUT] == "2"
    assert (
        span.attributes["langfuse.observation.metadata.finish_reason"] == "stop"
    )
    assert json_attr(span, LangfuseOtelSpanAttributes.OBSERVATION_USAGE_DETAILS) == {
        "prompt_tokens": 3,
        "completion_tokens": 1,
        "total_tokens": 4,
    }


def test_openai_stream_break_still_finalizes_generation(
langfuse_memory_client, get_span
):
Expand Down
Loading