add support for SSE #44906

@iscai-msft

Copy-paste of @kristapratico's GitHub issue:

Streaming

This gist provides a draft of what the streaming implementation could look like for JSONL and Server-Sent Events (SSE) in Python.

Stream response examples: Examples

TypeSpec Reference: TypeSpec Streams Sync

Stream interface

Provide a generic, stream-agnostic Stream iterator in azure-core. It iterates over http_response.iter_bytes(), parsing and decoding chunks using the appropriate decoder class. When an event is ready to be dispatched, it calls deserialization_callback to deserialize the JSON to the model returned to the user.

In azure.core:

Note: the code below is just a prototype I was playing with to try out the design concept with SSE streaming. It does not cover all scenarios.

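import json
from types import TracebackType
from typing import Any, Callable, Iterator, Optional, TypeVar

from azure.core.pipeline import PipelineResponse
from typing_extensions import Self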

ReturnType = TypeVar("ReturnType")


class Stream(Iterator[ReturnType]):
    """Stream class.

    :keyword response: The response object.
    :paramtype response: ~azure.core.pipeline.PipelineResponse
    :keyword deserialization_callback: A callback that takes the HTTP response and the parsed JSON of an event, and returns a deserialized object
    :paramtype deserialization_callback: Callable
    :keyword terminal_event: A terminal event that indicates the end of the SSE stream.
    :paramtype terminal_event: Optional[str]
    """
    def __init__(
        self,
        *,
        response: PipelineResponse,
        deserialization_callback: Callable[[Any, Any], ReturnType],
        terminal_event: Optional[str] = None,
    ) -> None:
        self._response = response.http_response
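        content_type = self._response.headers.get("Content-Type", "")
        # Match on the media type only, so "text/event-stream; charset=utf-8" is also recognized.
        self._decoder = SSEDecoder() if content_type.startswith("text/event-stream") else JSONLDecoder()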
        self._deserialization_callback = deserialization_callback
        self._terminal_event = terminal_event
        self._iterator = self._iter_events()

    def __next__(self) -> ReturnType:
        return self._iterator.__next__()

    def __iter__(self) -> Iterator[ReturnType]:
        # The stream is its own iterator; __next__ advances the shared generator.
        return self

    def _iter_events(self) -> Iterator[ReturnType]:
        for line in self._parse_chunk(self._response.iter_bytes()):
            for data in line.splitlines():
                if data:
                    self._decoder.decode(data)
                else:
                    event = self._decoder.event()
                    if self._terminal_event and event.data == self._terminal_event:
                        # End the whole stream; a bare break would only exit the inner loop.
                        return

                    event_model = self._deserialization_callback(self._response, event.json())

                    yield event_model

    def _parse_chunk(self, iter_bytes: Iterator[bytes]) -> Iterator[str]:
        data = b''
        for chunk in iter_bytes:
            for line in chunk.splitlines(keepends=True):
                data += line
                if data.endswith((b'\r\r', b'\n\n', b'\r\n\r\n')):
                    yield data.decode("utf-8")
                    data = b''

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        self.close()

    def __enter__(self) -> Self:
        return self

    def close(self) -> None:
        self._response.close()
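
To illustrate the buffering idea behind `_parse_chunk`, here is a minimal standalone sketch. It assumes an event can be split across arbitrary network chunks and is only complete once a blank line arrives. The `split_events` name and the sample bytes are made up for illustration; this is not the azure-core implementation.

```python
from typing import Iterable, Iterator


def split_events(chunks: Iterable[bytes]) -> Iterator[str]:
    """Yield one decoded block per blank-line-terminated event.

    Hypothetical helper: buffers bytes until a blank line is seen,
    mirroring the idea behind the prototype's _parse_chunk.
    """
    buffer = b""
    for chunk in chunks:
        for line in chunk.splitlines(keepends=True):
            buffer += line
            if buffer.endswith((b"\r\r", b"\n\n", b"\r\n\r\n")):
                yield buffer.decode("utf-8")
                buffer = b""


# An event split across two network chunks is still emitted as one block.
chunks = [b"data: hel", b"lo\n\ndata: world\n\n"]
events = list(split_events(chunks))
```

Note that this shape only emits on a blank line, so a JSONL body with one object per line and no blank separators would need a different termination rule.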

Decoders

Provide separate decoder classes for JSONL and SSE. These collect and decode the data for each stream chunk. Each decoder has a decode method, which takes a str and decodes it per the spec, and an event method, which creates and returns an SSE or JSONL event.

SSEDecoder

class SSEDecoder:
    def __init__(self) -> None:
        self.data: list[str] = []
        self.last_event_id = None
        # Named event_type so the attribute does not shadow the event() method.
        self.event_type = None
        self.retry = None

    def decode(self, line: str) -> None:
        if line.startswith(":"):
            # comment, ignore the line
            return

        if ":" in line:
            field, _, value = line.partition(":")
            if value.startswith(" "):
                # data:test and data: test are equivalent
                value = value[1:]
        else:
            field = line
            value = ""

        if field == "data":
            self.data.append(value)
        elif field == "event":
            self.event_type = value
        elif field == "id":
            # An id containing a NULL character is ignored.
            if "\0" not in value:
                self.last_event_id = value
        elif field == "retry":
            try:
                self.retry = int(value)
            except (TypeError, ValueError):
                pass

        # else: ignore the field

    def event(self) -> ServerSentEvent:
        sse = ServerSentEvent(
            event=self.event_type,
            data="\n".join(self.data),
            id=self.last_event_id,
            retry=self.retry,
        )

        # Reset per-event state; last_event_id is kept across events.
        self.data = []
        self.event_type = None
        self.retry = None
        return sse
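
As a rough illustration of the field rules the decoder applies, the sketch below parses one complete SSE block in a single function. `parse_sse_block` is a hypothetical helper, not part of the proposal; it handles only `data`, `event`, and `id`, and the sample payload is invented.

```python
from typing import Dict, List, Optional


def parse_sse_block(block: str) -> Dict[str, Optional[str]]:
    """Parse one blank-line-terminated SSE block into its fields.

    Hypothetical helper for illustration only; handles just `data`,
    `event`, and `id`, and skips comment lines.
    """
    data: List[str] = []
    fields: Dict[str, Optional[str]] = {"event": None, "id": None}
    for line in block.splitlines():
        if not line or line.startswith(":"):
            continue  # blank separator or comment line
        field, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # "data:x" and "data: x" are equivalent
        if field == "data":
            data.append(value)
        elif field in ("event", "id"):
            fields[field] = value
    # Multiple data lines are joined with a newline.
    fields["data"] = "\n".join(data)
    return fields


parsed = parse_sse_block(
    ': keep-alive\nevent: usermessage\ndata: {"text":\ndata: "hi"}\n\n'
)
```

Because the joined data lines only add a newline between tokens, the result is still valid JSON and can be handed to `json.loads`.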

JSONLDecoder

class JSONLDecoder:
    def __init__(self) -> None:
        self.data: list[str] = []

    def decode(self, line: str) -> None:
        self.data.append(line)

    def event(self) -> JSONLEvent:
        jsonl = JSONLEvent(data="\n".join(self.data))
        self.data = []
        return jsonl
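
For comparison, JSONL framing is simpler than SSE: each non-empty line is a complete JSON value. The sketch below assumes that framing and uses a hypothetical `iter_jsonl` helper with invented sample records; it is not the proposed decoder.

```python
import json
from typing import Any, Iterator


def iter_jsonl(text: str) -> Iterator[Any]:
    """Yield one parsed JSON value per non-empty line (hypothetical helper)."""
    for line in text.splitlines():
        if line.strip():
            yield json.loads(line)


records = list(
    iter_jsonl('{"id": "1", "content": "a"}\n{"id": "2", "content": "b"}\n')
)
```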

Event types

Two types represent the events: ServerSentEvent and JSONLEvent.

ServerSentEvent

class ServerSentEvent:
    def __init__(
        self,
        *,
        data: Optional[str] = None,
        event: Optional[str] = None,
        id: Optional[str] = None,
        retry: Optional[int] = None,
    ) -> None:
        self.data = data
        self.event = event
        self.id = id
        self.retry = retry

    def json(self) -> Any:
        return json.loads(self.data)

JSONLEvent

class JSONLEvent:
    def __init__(
        self,
        *,
        data: Optional[str] = None,
    ) -> None:
        self.data = data

    def json(self) -> Any:
        return json.loads(self.data)

Note: the only type exposed to users is the Stream class. The rest of the above classes are internal implementation details.

Library-specific code

Generated for homogeneous events

This example shows JSONL streaming, but a similar approach could work for SSE.
Homogeneous events return the same type for every event.

Models:

class Notification:
    id: str
    content: str
    timestamp: datetime.datetime

Operations:

class OperationMixin:
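    def subscribe(self, **kwargs: Any) -> Stream[Notification]:
        # `request` is the HttpRequest built for this operation (construction omitted).
        cls = kwargs.pop("cls", None)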
        pipeline_response: PipelineResponse = self._client._pipeline.run(
            request, stream=True, **kwargs
        )

        def callback(pipeline_response, model_json):
            deserialized = _deserialize(Notification, model_json)
            if cls:
                return cls(pipeline_response, deserialized, {})
            return deserialized

        return Stream[Notification](
            response=pipeline_response,
            deserialization_callback=callback
        )

User code:

response = client.subscribe(..., stream=True)
for item in response:
    print(item)  # Notification
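
Since the proposed Stream defines `__enter__`, `__exit__`, and `close`, it could also be used in a `with` block so the response is closed even when iteration stops early. The sketch below shows that pattern with `FakeStream`, a hypothetical stand-in that iterates over a list instead of an HTTP response.

```python
from typing import Iterator, List


class FakeStream:
    """Stand-in for the proposed Stream type (hypothetical, for illustration)."""

    def __init__(self, items: List[str]) -> None:
        self._iterator: Iterator[str] = iter(items)
        self.closed = False

    def __iter__(self) -> "FakeStream":
        return self

    def __next__(self) -> str:
        return next(self._iterator)

    def __enter__(self) -> "FakeStream":
        return self

    def __exit__(self, *exc_info: object) -> None:
        self.close()

    def close(self) -> None:
        self.closed = True


stream = FakeStream(["first", "second", "third"])
seen = []
with stream as response:
    for item in response:
        seen.append(item)
        if item == "second":
            break  # leaving the block early still closes the stream
```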

Generated for heterogeneous events

Heterogeneous events can have more than one event type.

Models:

class ChannelEvent(_model_base.Model):
  kind: str

class UserConnect(ChannelEvent, discriminator="userconnect"):
  kind: Literal["userconnect"]
  username: str
  time: str

class UserMessage(ChannelEvent, discriminator="usermessage"):
  kind: Literal["usermessage"]
  username: str
  time: str
  text: str

class UserDisconnect(ChannelEvent, discriminator="userdisconnect"):
  kind: Literal["userdisconnect"]
  username: str
  time: str

ChannelEvents: TypeAlias = Union[UserConnect, UserMessage, UserDisconnect]

Operations:

class OperationMixin:
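    def subscribe(self, **kwargs: Any) -> Stream[ChannelEvents]:
        # `request` is the HttpRequest built for this operation (construction omitted).
        cls = kwargs.pop("cls", None)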
        pipeline_response: PipelineResponse = self._client._pipeline.run(
            request, stream=True, **kwargs
        )

        def callback(pipeline_response, model_json):
            deserialized = _deserialize(ChannelEvent, model_json)
            if cls:
                return cls(pipeline_response, deserialized, {})
            return deserialized

        return Stream[ChannelEvents](
            response=pipeline_response,
            deserialization_callback=callback,
            terminal_event="[DONE]",
        )

User code:

events: Stream[ChannelEvents] = client.subscribe(..., stream=True)
for event in events:
    if event.kind == "userconnect":
        print("User is connecting...")
    elif event.kind == "usermessage":
        print("hello")
    elif event.kind == "userdisconnect":
        print("goodbye")
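
The heterogeneous case relies on the `kind` discriminator to pick the concrete model. A minimal sketch of that selection step follows, using plain dataclasses and a hypothetical lookup table rather than `_model_base` and `_deserialize`, whose internals are not shown in this issue.

```python
from dataclasses import dataclass
from typing import Any, Dict, Type, Union


@dataclass
class UserConnect:
    kind: str
    username: str
    time: str


@dataclass
class UserMessage:
    kind: str
    username: str
    time: str
    text: str


# Hypothetical lookup table from discriminator value to model class.
_KIND_TO_MODEL: Dict[str, Type[Any]] = {
    "userconnect": UserConnect,
    "usermessage": UserMessage,
}


def deserialize_event(payload: Dict[str, Any]) -> Union[UserConnect, UserMessage]:
    """Pick the model class from the `kind` discriminator and build it."""
    model = _KIND_TO_MODEL[payload["kind"]]
    return model(**payload)


event = deserialize_event(
    {"kind": "usermessage", "username": "ana", "time": "12:00", "text": "hi"}
)
```

An unknown `kind` raises `KeyError` here; a real implementation would need to decide whether to skip, raise, or fall back to the base type.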

Handling Events

NOTE: For the initial implementation we will not be adding any convenience for handling events like outlined below. We will wait for feedback and assess later.

1) Allow the user to register callbacks for event types

Provide a method on the Stream class which lets you register a callback for a specific event type. When there is an event of that type, the provided callback is called with the ChannelEvents variant.

class Stream:
    # ...insert implementation from above section...

    def add_callback(self, event_type: str, on_event: Callable[[ReturnType], None]) -> None:
        # Assumes self._callbacks is a dict created in __init__.
        self._callbacks[event_type] = on_event

    def iter_events(self) -> Iterator[ReturnType]:
        """Convenience method"""
        for event in self._iterator:
            yield event

User code:

def say_goodbye(user_disconnect):
    print("Goodbye!")

response = client.subscribe(..., stream=True)
response.add_callback(event_type="userdisconnect", on_event=say_goodbye)

# Provide convenience method that iterates over events so users don't have to iterate themselves
response.iter_events()
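
One way the callback option could dispatch events is sketched below. `CallbackDispatcher` and its `run` method are hypothetical, and events are plain dicts keyed by `kind` instead of generated models; the point is only the registry lookup per event.

```python
from typing import Any, Callable, Dict, Iterable, List

Event = Dict[str, Any]


class CallbackDispatcher:
    """Hypothetical sketch of per-event-type callback dispatch."""

    def __init__(self, events: Iterable[Event]) -> None:
        self._events = events
        self._callbacks: Dict[str, Callable[[Event], None]] = {}

    def add_callback(self, event_type: str, on_event: Callable[[Event], None]) -> None:
        self._callbacks[event_type] = on_event

    def run(self) -> None:
        """Consume every event, invoking the callback registered for its kind."""
        for event in self._events:
            callback = self._callbacks.get(event["kind"])
            if callback is not None:
                callback(event)


goodbyes: List[str] = []
dispatcher = CallbackDispatcher(
    [
        {"kind": "userconnect", "username": "ana"},
        {"kind": "userdisconnect", "username": "ana"},
    ]
)
dispatcher.add_callback("userdisconnect", lambda e: goodbyes.append(e["username"]))
dispatcher.run()
```

Events with no registered callback are silently skipped in this sketch.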

2) Provide an EventHandler interface which a user can subclass and access specific events when they are dispatched

For services that expose multiple events, we can provide additional convenience by generating a tailored EventHandler class. The method names come from the event types and may need language-specific conversion (snake_case, camelCase, etc.).

Models:

class ChannelEventHandler:

    def on_userconnect(self, userconnect: UserConnect) -> None:
        pass
        
    def on_usermessage(self, usermessage: UserMessage) -> None:
        pass

    def on_userdisconnect(self, userdisconnect: UserDisconnect) -> None:
        pass

Operations:

class OperationsMixin:

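    def subscribe(self, **kwargs: Any) -> Stream[ChannelEvents]:
        # `request` is the HttpRequest built for this operation (construction omitted).
        cls = kwargs.pop("cls", None)
        event_handler = kwargs.pop("event_handler", None)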
        pipeline_response: PipelineResponse = self._client._pipeline.run(
            request, stream=True, **kwargs
        )

        def callback(pipeline_response, model_json):
            deserialized = _deserialize(ChannelEvent, model_json)
            if cls:
                return cls(pipeline_response, deserialized, {})
            return deserialized

        return Stream[ChannelEvents](
            response=pipeline_response,
            deserialization_callback=callback,
            event_handler=event_handler or ChannelEventHandler(),
        )

User code:

A user defines their own event handler subclass and implements the methods for the events they want to act on.

class CustomEventHandler(ChannelEventHandler):

    def on_usermessage(self, usermessage: UserMessage) -> None:
        # my custom logic for the event
        ...
 
stream = client.subscribe(..., event_handler=CustomEventHandler())
# events handled in CustomEventHandler
stream.iter_events()
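
How the stream would route events to handler methods is not specified above. One possible approach, sketched here under the assumption that method names follow an `on_<kind>` pattern, is a `getattr` lookup. The `dispatch` function, `RecordingHandler`, and the dict-shaped events are all hypothetical.

```python
from typing import Any, Dict, Iterable, List

Event = Dict[str, Any]


class ChannelEventHandler:
    """Base handler with no-op methods (mirrors the proposal's shape)."""

    def on_userconnect(self, event: Event) -> None:
        pass

    def on_usermessage(self, event: Event) -> None:
        pass


class RecordingHandler(ChannelEventHandler):
    def __init__(self) -> None:
        self.messages: List[str] = []

    def on_usermessage(self, event: Event) -> None:
        self.messages.append(event["text"])


def dispatch(events: Iterable[Event], handler: ChannelEventHandler) -> None:
    """Route each event to `on_<kind>` if the handler defines it (hypothetical)."""
    for event in events:
        method = getattr(handler, f"on_{event['kind']}", None)
        if method is not None:
            method(event)


handler = RecordingHandler()
dispatch(
    [{"kind": "userconnect"}, {"kind": "usermessage", "text": "hi"}, {"kind": "unknown"}],
    handler,
)
```

Unoverridden methods fall through to the base class no-ops, and event kinds without a matching method are ignored.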
