Description
Copy-paste of @kristapratico's GitHub issue:
Streaming
This gist provides a draft of what the streaming implementation could look like for JSONL and Server Sent Events (SSE) implemented in Python.
Stream response examples: Examples
TypeSpec Reference: TypeSpec Streams Sync
Stream interface
Provide a generic, stream-agnostic Stream iterator in azure-core. It iterates over http_response.iter_bytes(), parsing and decoding chunks using the appropriate decoder class. When an event is ready to be dispatched, it calls deserialization_callback to deserialize the JSON to the model returned to the user.
In azure.core:
Note: the code below is just a prototype I was playing with to try out the design concept with SSE streaming. It does not cover all scenarios.
from types import TracebackType
from typing import Any, Callable, Iterator, Optional, Type, TypeVar

from typing_extensions import Self

from azure.core.pipeline import PipelineResponse

ReturnType = TypeVar("ReturnType")


class Stream(Iterator[ReturnType]):
    """Stream class.

    :keyword response: The response object.
    :paramtype response: ~azure.core.pipeline.PipelineResponse
    :keyword deserialization_callback: A callback that takes HttpResponse and returns a deserialized object
    :paramtype deserialization_callback: Callable
    :keyword terminal_event: A terminal event that indicates the end of the SSE stream.
    :paramtype terminal_event: Optional[str]
    """

    def __init__(
        self,
        *,
        response: PipelineResponse,
        deserialization_callback: Callable[[Any, Any], ReturnType],
        terminal_event: Optional[str] = None,
    ) -> None:
        self._response = response.http_response
        # Match on the media type only, so "text/event-stream; charset=utf-8" also selects SSE.
        content_type = self._response.headers.get("Content-Type", "")
        self._decoder = SSEDecoder() if content_type.startswith("text/event-stream") else JSONLDecoder()
        self._deserialization_callback = deserialization_callback
        self._terminal_event = terminal_event
        self._iterator = self._iter_events()

    def __next__(self) -> ReturnType:
        return next(self._iterator)

    def __iter__(self) -> Iterator[ReturnType]:
        return self

    def _iter_events(self) -> Iterator[ReturnType]:
        for line in self._parse_chunk(self._response.iter_bytes()):
            for data in line.splitlines():
                if data:
                    self._decoder.decode(data)
                else:
                    # A blank line marks the end of an event: dispatch it.
                    event = self._decoder.event()
                    if self._terminal_event and event.data == self._terminal_event:
                        return
                    yield self._deserialization_callback(self._response, event.json())

    def _parse_chunk(self, iter_bytes: Iterator[bytes]) -> Iterator[str]:
        # Buffer bytes across chunk boundaries until a complete event (blank line) is seen.
        data = b""
        for chunk in iter_bytes:
            for line in chunk.splitlines(keepends=True):
                data += line
                if data.endswith((b"\r\r", b"\n\n", b"\r\n\r\n")):
                    yield data.decode("utf-8")
                    data = b""

    def __enter__(self) -> Self:
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        self.close()

    def close(self) -> None:
        self._response.close()
Decoders
Provide separate decoder classes for JSONL and SSE. These collect and decode the data for each stream chunk. Each decoder has a decode method, which takes a str and decodes it per the spec, and an event method, which creates and returns an SSE or JSONL event.
SSEDecoder
class SSEDecoder:
    def __init__(self) -> None:
        self.data: list[str] = []
        self.last_event_id: Optional[str] = None
        # Named event_type (not event) so the attribute does not shadow the event() method.
        self.event_type: Optional[str] = None
        self.retry: Optional[int] = None

    def decode(self, line: str) -> None:
        if line.startswith(":"):
            # comment, ignore the line
            return
        if ":" in line:
            field, _, value = line.partition(":")
            if value.startswith(" "):
                # data:test and data: test are equivalent
                value = value[1:]
        else:
            field = line
            value = ""
        if field == "data":
            self.data.append(value)
        elif field == "event":
            self.event_type = value
        elif field == "id":
            # ids containing a NULL character are ignored
            if "\0" not in value:
                self.last_event_id = value
        elif field == "retry":
            try:
                self.retry = int(value)
            except (TypeError, ValueError):
                pass
        # else: ignore the field

    def event(self) -> ServerSentEvent:
        sse = ServerSentEvent(
            event=self.event_type,
            data="\n".join(self.data),
            id=self.last_event_id,
            retry=self.retry,
        )
        # Reset per-event state; the last event id persists across events.
        self.data = []
        self.event_type = None
        self.retry = None
        return sse
JSONLDecoder
class JSONLDecoder:
    def __init__(self) -> None:
        self.data: list[str] = []

    def decode(self, line: str) -> None:
        self.data.append(line)

    def event(self) -> JSONLEvent:
        jsonl = JSONLEvent(data="\n".join(self.data))
        self.data = []
        return jsonl
Event types
We have 2 types which represent a ServerSentEvent and a JSONLEvent.
ServerSentEvent
import json
from typing import Any, Optional


class ServerSentEvent:
    def __init__(
        self,
        *,
        data: Optional[str] = None,
        event: Optional[str] = None,
        id: Optional[str] = None,
        retry: Optional[int] = None,
    ) -> None:
        self.data = data
        self.event = event
        self.id = id
        self.retry = retry

    def json(self) -> Any:
        return json.loads(self.data)
JSONLEvent
class JSONLEvent:
    def __init__(
        self,
        *,
        data: Optional[str] = None,
    ) -> None:
        self.data = data

    def json(self) -> Any:
        return json.loads(self.data)
Note: the only type exposed to users is the Stream class. The rest of the above classes are internal implementation details.
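To make the moving parts above easier to follow, here is a minimal standalone sketch of the same flow (buffer chunks, split on blank lines, parse SSE `data` fields, stop at a terminal event). It deliberately does not use azure-core; the function names `iter_event_blocks`, `parse_sse_block`, and `iter_json_events` and the sample payloads are made up for illustration, and only the `data` field is handled.

```python
import json
from typing import Any, Iterable, Iterator, List, Optional

# Byte sequences that end an event (a blank line), as in the prototype above.
EVENT_BOUNDARIES = (b"\r\r", b"\n\n", b"\r\n\r\n")


def iter_event_blocks(chunks: Iterable[bytes]) -> Iterator[str]:
    """Buffer raw chunks and yield one decoded block per blank-line boundary."""
    buffer = b""
    for chunk in chunks:
        for line in chunk.splitlines(keepends=True):
            buffer += line
            if buffer.endswith(EVENT_BOUNDARIES):
                yield buffer.decode("utf-8")
                buffer = b""


def parse_sse_block(block: str) -> Optional[str]:
    """Join the data fields of one block; comments and other fields are skipped."""
    data: List[str] = []
    for line in block.splitlines():
        if not line or line.startswith(":"):
            continue  # blank line or comment
        field, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]
        if field == "data":
            data.append(value)
    return "\n".join(data) if data else None


def iter_json_events(
    chunks: Iterable[bytes], terminal_event: Optional[str] = None
) -> Iterator[Any]:
    """Yield the JSON payload of each event until the terminal event is seen."""
    for block in iter_event_blocks(chunks):
        data = parse_sse_block(block)
        if data is None:
            continue  # e.g. a comment-only keep-alive block
        if terminal_event is not None and data == terminal_event:
            break
        yield json.loads(data)


# Hypothetical wire data: the first event is split across three chunks.
fake_chunks = [
    b'data: {"id": 1, "te',
    b'xt": "hello"}\n',
    b'\ndata: {"id": 2, "text": "bye"}\n\n',
    b": keep-alive comment\n\ndata: [DONE]\n\n",
    b'data: {"id": 3}\n\n',
]
events = list(iter_json_events(fake_chunks, terminal_event="[DONE]"))
print(events)
```

Because bytes are only decoded once a full event has been buffered, a chunk boundary in the middle of a line (or of a multi-byte character) does not affect the result. Events after the terminal event are never read.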
Library-specific code
Generated for homogeneous events
This example shows JSONL streaming, but a similar approach could work for SSE.
Homogeneous events return the same type for every event.
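As a rough standalone illustration of what "homogeneous" means in practice, the sketch below turns each JSONL line into the same model type. It uses a plain dataclass in place of the generated model, and the helper name `iter_notifications`, the sample payloads, and the ISO 8601 timestamp format are assumptions made for this example only.

```python
import datetime
import json
from dataclasses import dataclass
from typing import Iterable, Iterator


@dataclass
class Notification:
    id: str
    content: str
    timestamp: datetime.datetime


def iter_notifications(lines: Iterable[str]) -> Iterator[Notification]:
    """Deserialize every non-empty JSONL line into a Notification."""
    for line in lines:
        if not line.strip():
            continue
        raw = json.loads(line)
        yield Notification(
            id=raw["id"],
            content=raw["content"],
            # Assumes ISO 8601 timestamps; a real service may use another format.
            timestamp=datetime.datetime.fromisoformat(raw["timestamp"]),
        )


# Hypothetical JSONL body: one JSON document per line.
body = (
    '{"id": "1", "content": "build started", "timestamp": "2024-01-01T10:00:00"}\n'
    '{"id": "2", "content": "build finished", "timestamp": "2024-01-01T10:05:00"}\n'
)
notifications = list(iter_notifications(body.splitlines()))
for item in notifications:
    print(item.id, item.content)
```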
Models:
import datetime


class Notification:
    id: str
    content: str
    timestamp: datetime.datetime
Operations:
class OperationMixin:
    def subscribe(self, **kwargs: Any) -> Stream[Notification]:
        # Optional custom-deserializer hook, popped from kwargs as in other generated operations.
        cls = kwargs.pop("cls", None)
        # `request` is built by the generated code (elided in this draft).
        pipeline_response: PipelineResponse = self._client._pipeline.run(
            request, stream=True, **kwargs
        )

        def callback(pipeline_response, model_json):
            deserialized = _deserialize(Notification, model_json)
            if cls:
                return cls(pipeline_response, deserialized, {})
            return deserialized

        return Stream[Notification](
            response=pipeline_response,
            deserialization_callback=callback,
        )
User code:
response = client.subscribe(..., stream=True)
for item in response:
    print(item)  # Notification
Generated for heterogeneous events
Heterogeneous events can have more than one event type.
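One way to picture heterogeneous deserialization is a lookup from the discriminator value to a concrete type. The sketch below is a simplified stand-in that uses dataclasses and a hand-written mapping, not the generated `_model_base.Model` machinery; the helper name `deserialize_channel_event` and the sample payload are hypothetical.

```python
from dataclasses import dataclass
from typing import Any, Dict, Type, Union


@dataclass
class UserConnect:
    kind: str
    username: str
    time: str


@dataclass
class UserMessage:
    kind: str
    username: str
    time: str
    text: str


@dataclass
class UserDisconnect:
    kind: str
    username: str
    time: str


ChannelEvents = Union[UserConnect, UserMessage, UserDisconnect]

# Maps each discriminator value to the type that models it.
_EVENT_TYPES: Dict[str, Type[Any]] = {
    "userconnect": UserConnect,
    "usermessage": UserMessage,
    "userdisconnect": UserDisconnect,
}


def deserialize_channel_event(payload: Dict[str, Any]) -> ChannelEvents:
    """Pick the concrete event type from the `kind` field and build it."""
    try:
        event_type = _EVENT_TYPES[payload["kind"]]
    except KeyError:
        raise ValueError(f"Unknown or missing event kind: {payload.get('kind')!r}") from None
    return event_type(**payload)


event = deserialize_channel_event(
    {"kind": "usermessage", "username": "ana", "time": "10:00", "text": "hi"}
)
print(type(event).__name__, event.text)
```

Raising on an unknown `kind` is just one possible policy; a generated client might instead fall back to the base type.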
Models:
from typing import Literal, Union

from typing_extensions import TypeAlias


class ChannelEvent(_model_base.Model):
    kind: str


class UserConnect(ChannelEvent, discriminator="userconnect"):
    kind: Literal["userconnect"]
    username: str
    time: str


class UserMessage(ChannelEvent, discriminator="usermessage"):
    kind: Literal["usermessage"]
    username: str
    time: str
    text: str


class UserDisconnect(ChannelEvent, discriminator="userdisconnect"):
    kind: Literal["userdisconnect"]
    username: str
    time: str


ChannelEvents: TypeAlias = Union[UserConnect, UserMessage, UserDisconnect]
Operations:
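class OperationMixin:
    def subscribe(self, **kwargs: Any) -> Stream[ChannelEvents]:
        # Optional custom-deserializer hook, popped from kwargs as in other generated operations.
        cls = kwargs.pop("cls", None)
        # `request` is built by the generated code (elided in this draft).
        pipeline_response: PipelineResponse = self._client._pipeline.run(
            request, stream=True, **kwargs
        )

        def callback(pipeline_response, model_json):
            # Deserializing against the base type resolves the subtype via the discriminator.
            deserialized = _deserialize(ChannelEvent, model_json)
            if cls:
                return cls(pipeline_response, deserialized, {})
            return deserialized

        return Stream[ChannelEvents](
            response=pipeline_response,
            deserialization_callback=callback,
            terminal_event="[DONE]",
        )
User code: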
events: Stream[ChannelEvents] = client.subscribe(..., stream=True)
for event in events:
    if event.kind == "userconnect":
        print("User is connecting...")
    elif event.kind == "usermessage":
        print("hello")
    elif event.kind == "userdisconnect":
        print("goodbye")
Handling Events
NOTE: For the initial implementation we will not be adding any convenience for handling events like outlined below. We will wait for feedback and assess later.
1) Allow the user to register callbacks for event types
Provide a method on the Stream class which lets you register a callback for a specific event type. When there is an event of that type, the provided callback is called with the ChannelEvents variant.
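class Stream:
    # ...insert implementation from above section...
    # Assumes __init__ also sets: self._callbacks: Dict[str, Callable[[ReturnType], None]] = {}

    def add_callback(self, event_type: str, on_event: Callable[[ReturnType], None]) -> None:
        self._callbacks[event_type] = on_event

    def iter_events(self) -> None:
        """Convenience method: consume the stream and invoke any registered callbacks."""
        for event in self._iterator:
            # Assumption: the event type is carried by the model's `kind` discriminator.
            on_event = self._callbacks.get(getattr(event, "kind", None))
            if on_event is not None:
                on_event(event)
User code: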
def say_goodbye(user_disconnect):
    print("Goodbye!")


response = client.subscribe(..., stream=True)
response.add_callback(event_type="userdisconnect", on_event=say_goodbye)
# Provide convenience method that iterates over events so users don't have to iterate themselves
response.iter_events()
2) Provide an EventHandler interface which a user can subclass and access specific events when they are dispatched
For services that expose multiple events, we can provide additional convenience by generating a tailored EventHandler class. The method names come from the event types and may need language-specific conversion to snake_case, camelCase, etc.
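The draft does not spell out how events would reach the handler methods, so the following is only one possible sketch: look up an `on_<kind>` method by name and call it if present. The `dispatch_events` helper, the `RecordingHandler` subclass, and the `SimpleNamespace` stand-ins for deserialized events are all hypothetical.

```python
from types import SimpleNamespace
from typing import Any, Iterable, List


class ChannelEventHandler:
    """Base handler: every hook is a no-op unless a subclass overrides it."""

    def on_userconnect(self, userconnect: Any) -> None:
        pass

    def on_usermessage(self, usermessage: Any) -> None:
        pass

    def on_userdisconnect(self, userdisconnect: Any) -> None:
        pass


def dispatch_events(events: Iterable[Any], handler: ChannelEventHandler) -> None:
    """Route each event to the handler method named after its kind."""
    for event in events:
        method = getattr(handler, f"on_{event.kind}", None)
        if callable(method):
            method(event)
        # Events with no matching hook are silently skipped in this sketch.


class RecordingHandler(ChannelEventHandler):
    def __init__(self) -> None:
        self.messages: List[str] = []

    def on_usermessage(self, usermessage: Any) -> None:
        self.messages.append(usermessage.text)


# SimpleNamespace objects stand in for deserialized event models.
handler = RecordingHandler()
dispatch_events(
    [
        SimpleNamespace(kind="userconnect", username="ana"),
        SimpleNamespace(kind="usermessage", username="ana", text="hi"),
        SimpleNamespace(kind="typing", username="ana"),  # no hook defined
    ],
    handler,
)
print(handler.messages)
```

Name-based lookup keeps the dispatcher independent of the set of event types, at the cost of relying on the naming convention between event kinds and method names.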
Models:
class ChannelEventHandler:
    def on_userconnect(self, userconnect: UserConnect) -> None:
        pass

    def on_usermessage(self, usermessage: UserMessage) -> None:
        pass

    def on_userdisconnect(self, userdisconnect: UserDisconnect) -> None:
        pass
Operations:
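class OperationsMixin:
    def subscribe(
        self, *, event_handler: Optional[ChannelEventHandler] = None, **kwargs: Any
    ) -> Stream[ChannelEvents]:
        # Optional custom-deserializer hook, popped from kwargs as in other generated operations.
        cls = kwargs.pop("cls", None)
        # `request` is built by the generated code (elided in this draft).
        pipeline_response: PipelineResponse = self._client._pipeline.run(
            request, stream=True, **kwargs
        )

        def callback(pipeline_response, model_json):
            deserialized = _deserialize(ChannelEvent, model_json)
            if cls:
                return cls(pipeline_response, deserialized, {})
            return deserialized

        # Stream would need to accept an event_handler keyword for this option.
        return Stream[ChannelEvents](
            response=pipeline_response,
            deserialization_callback=callback,
            event_handler=event_handler or ChannelEventHandler(),
        )
User code: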
A user will define their own event handler subclass and implement the methods for the events they want to act on.
class CustomEventHandler(ChannelEventHandler):
    def on_usermessage(self, usermessage: UserMessage) -> None:
        # my custom logic for the event
        ...


stream = client.subscribe(..., event_handler=CustomEventHandler())
# events handled in CustomEventHandler
stream.iter_events()