Skip to content

Commit 369bdc2

Browse files
authored
PTHMINT-119: SSE event stream support and EventManager (#59)
This change introduces Server-Sent Events (SSE) support and real-time event stream processing to the SDK. It implements the HTTPStreamingTransport protocol and a high-level EventManager to easily subscribe to order event streams via tokens or URLs. Additionally, the Order response model is updated with backward-compatible fields, comprehensive usage examples are added to the README, and robust unit and E2E tests are included to ensure streaming stability.
1 parent 01815bd commit 369bdc2

27 files changed

Lines changed: 2007 additions & 75 deletions

File tree

README.md

Lines changed: 50 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -41,11 +41,14 @@ The SDK uses a small transport abstraction so you can choose (and swap) the unde
4141
### How it works
4242

4343
- The SDK expects an object implementing the `HTTPTransport` / `HTTPResponse` protocols defined in `src/multisafepay/transport/http_transport.py`.
44+
- Event stream subscriptions additionally require the transport to implement the `HTTPStreamingTransport` protocol (adds `open_stream(...)` returning an `HTTPStreamResponse` with `readline()`, `close()`, and `raise_for_status()`).
4445
- If you do not provide a transport, the SDK defaults to `RequestsTransport`.
4546
- `requests` is an optional extra:
4647
- To use the default transport, install `multisafepay[requests]`.
4748
- To avoid `requests`, inject your own transport (for example, `httpx` or `urllib3`).
4849

50+
The built-in `RequestsTransport` implements both `HTTPTransport` and `HTTPStreamingTransport`, so the same configured `requests.Session` is reused for regular requests and SSE streams. Custom transports that only implement `HTTPTransport` (`request(...)`) can still be used for regular API calls, but SSE subscriptions fail explicitly until they also implement `HTTPStreamingTransport`. The SDK does not fall back to another HTTP library for event streams.
51+
4952
### Custom transport example
5053

5154
```bash
@@ -87,10 +90,10 @@ from multisafepay.client import ScopedCredentialResolver
8790

8891
credential_resolver = ScopedCredentialResolver(
8992
default_api_key="<default_api_key>",
93+
partner_affiliate_api_key="<partner_api_key>",
9094
terminal_group_api_keys={
91-
"Default": "<terminal_group_api_key>",
95+
"<terminal_group_id>": "<terminal_group_api_key>",
9296
},
93-
partner_affiliate_api_key="<partner_api_key>",
9497
)
9598

9699
sdk = Sdk(
@@ -99,41 +102,48 @@ sdk = Sdk(
99102
)
100103
```
101104

102-
Resolver behavior:
103-
104-
- `default_api_key` is used for regular account-scoped requests.
105-
- `partner_affiliate_api_key` is used for partner-affiliate scoped requests and falls back to `default_api_key` when omitted.
105+
### Event stream subscriptions
106106

107-
### Terminal and terminal-group operations
108-
109-
The SDK exposes dedicated managers for POS terminal listing/creation and for listing terminals inside a specific terminal group.
107+
Use `EventManager` to subscribe to MultiSafepay SSE streams directly, or to subscribe from an order response that already contains event credentials.
110108

111109
```python
112-
from multisafepay.client import ScopedCredentialResolver
113110
from multisafepay import Sdk
111+
from multisafepay.client import ScopedCredentialResolver
114112

115113

116114
credential_resolver = ScopedCredentialResolver(
117115
default_api_key="<default_api_key>",
118-
partner_affiliate_api_key="<partner_api_key>",
116+
terminal_group_api_keys={
117+
"<terminal_group_id>": "<terminal_group_api_key>",
118+
},
119119
)
120120

121121
sdk = Sdk(
122122
is_production=False,
123123
credential_resolver=credential_resolver,
124124
)
125125

126-
terminal_manager = sdk.get_terminal_manager()
127-
terminal_group_manager = sdk.get_terminal_group_manager()
126+
order_manager = sdk.get_order_manager()
127+
event_manager = sdk.get_event_manager()
128+
129+
# Build your OrderRequest here, for example:
130+
# from multisafepay.api.paths.orders.request.order_request import OrderRequest
131+
# order_request = OrderRequest(...)
128132

129-
terminals = terminal_manager.get_terminals(options={"limit": 10, "page": 1})
130-
group_terminals = terminal_group_manager.get_terminals_by_group(
133+
create_response = order_manager.create(
134+
request_order=order_request,
131135
terminal_group_id="<terminal_group_id>",
132-
options={"limit": 10, "page": 1},
133136
)
137+
order = create_response.get_data()
138+
139+
with event_manager.subscribe_order_events(order, timeout=45.0) as stream:
140+
for event in stream:
141+
print(event)
134142
```
135143

136-
See terminal examples in `examples/terminal_manager/` and `examples/terminal_group_manager/`.
144+
Use `subscribe_events(events_token=..., events_stream_url=...)` when the token and stream URL are already available separately.
145+
146+
SSE subscriptions use the same configured SDK transport as regular API calls. With the default transport this reuses the same `requests.Session`; with a custom transport, implement the `HTTPStreamingTransport` protocol (adds `open_stream(...)` on top of `HTTPTransport`) on that transport instead of opening a separate HTTP connection path.
137147

138148
### Development-only custom base URL override
139149

@@ -180,6 +190,29 @@ In any non-dev profile (including default `release`), custom base URLs are block
180190

181191
Go to the folder `examples` to see how to use the SDK.
182192

193+
The event-stream example in `examples/event_manager/subscribe_events.py` requires:
194+
195+
```bash
196+
export API_KEY="<account_api_key>"
197+
export TERMINAL_GROUP_API_KEY_GROUP_DEFAULT="<terminal_group_api_key>"
198+
export CLOUD_POS_TERMINAL_GROUP_ID="<terminal_group_id>"
199+
export CLOUD_POS_TERMINAL_ID="<terminal_id>"
200+
```
201+
202+
The SSE E2E test can also run against a dev-backed base URL and optionally resolve the terminal group automatically:
203+
204+
```bash
205+
export E2E_NO_SANDBOX_BASE_URL="https://dev-api.example.com/v1/"
206+
export MSP_SDK_BUILD_PROFILE=dev
207+
export MSP_SDK_ALLOW_CUSTOM_BASE_URL=1
208+
export MSP_SDK_CUSTOM_BASE_URL="https://dev-api.example.com/v1/"
209+
export E2E_API_KEY="<account_api_key>"
210+
export E2E_TERMINAL_GROUP_API_KEY_GROUP_DEFAULT="<terminal_group_api_key>"
211+
export E2E_CLOUD_POS_TERMINAL_ID="<terminal_id>"
212+
# Optional when CLOUD_POS_TERMINAL_GROUP_ID is not set
213+
export E2E_PARTNER_API_KEY="<partner_api_key>"
214+
```
215+
183216
## Code quality checks
184217

185218
### Linting
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
# Copyright (c) MultiSafepay, Inc. All rights reserved.
2+
3+
# This file is licensed under the Open Software License (OSL) version 3.0.
4+
# For a copy of the license, see the LICENSE.txt file in the project root.
5+
6+
# See the DISCLAIMER.md file for disclaimer details.
7+
8+
"""Create a Cloud POS order and subscribe to its event stream."""
9+
10+
import os
11+
import time
12+
13+
from dotenv import load_dotenv
14+
from multisafepay import Sdk
15+
from multisafepay.api.paths.orders.request import OrderRequest
16+
from multisafepay.client import ScopedCredentialResolver
17+
18+
# Load environment variables from a .env file
19+
load_dotenv()
20+
21+
22+
def _get_first_env(*names: str) -> str:
23+
for name in names:
24+
value = os.getenv(name, "").strip()
25+
if value:
26+
return value
27+
28+
return ""
29+
30+
31+
def _require_first_env(*names: str) -> str:
32+
value = _get_first_env(*names)
33+
if value:
34+
return value
35+
36+
raise RuntimeError(
37+
f"Missing required environment variable. Set one of: {', '.join(names)}",
38+
)
39+
40+
41+
DEFAULT_ACCOUNT_API_KEY = _require_first_env("API_KEY", "E2E_API_KEY")
42+
TERMINAL_GROUP_DEFAULT_API_KEY = _require_first_env(
43+
"TERMINAL_GROUP_API_KEY_GROUP_DEFAULT",
44+
"E2E_TERMINAL_GROUP_API_KEY_GROUP_DEFAULT",
45+
)
46+
CLOUD_POS_TERMINAL_GROUP_ID = _require_first_env(
47+
"CLOUD_POS_TERMINAL_GROUP_ID",
48+
)
49+
TERMINAL_ID = _require_first_env(
50+
"CLOUD_POS_TERMINAL_ID",
51+
"E2E_CLOUD_POS_TERMINAL_ID",
52+
)
53+
54+
if __name__ == "__main__":
55+
# This example executes Cloud POS calls with terminal-group scope.
56+
scoped_terminal_group_id = CLOUD_POS_TERMINAL_GROUP_ID
57+
resolver_kwargs = {
58+
"default_api_key": DEFAULT_ACCOUNT_API_KEY,
59+
}
60+
if scoped_terminal_group_id:
61+
resolver_kwargs["terminal_group_api_keys"] = {
62+
scoped_terminal_group_id: TERMINAL_GROUP_DEFAULT_API_KEY,
63+
}
64+
65+
credential_resolver = ScopedCredentialResolver(**resolver_kwargs)
66+
67+
multisafepay_sdk = Sdk(
68+
is_production=False,
69+
credential_resolver=credential_resolver,
70+
)
71+
order_manager = multisafepay_sdk.get_order_manager()
72+
event_manager = multisafepay_sdk.get_event_manager()
73+
74+
order_id = f"cloud-pos-{int(time.time())}"
75+
76+
order_request = (
77+
OrderRequest()
78+
.add_type("redirect")
79+
.add_order_id(order_id)
80+
.add_description("Cloud POS order")
81+
.add_amount(100)
82+
.add_currency("EUR")
83+
.add_gateway_info(
84+
{
85+
"terminal_id": TERMINAL_ID,
86+
},
87+
)
88+
)
89+
90+
create_response = order_manager.create(
91+
order_request,
92+
terminal_group_id=scoped_terminal_group_id,
93+
)
94+
order = create_response.get_data()
95+
96+
if order is None:
97+
raise RuntimeError("Order creation did not return order data")
98+
99+
print(f"Created Cloud POS order: {order.order_id}")
100+
print("Listening for events. Press Ctrl+C to stop.")
101+
102+
try:
103+
with event_manager.subscribe_order_events(order, timeout=45.0) as stream:
104+
for event in stream:
105+
print(event)
106+
except KeyboardInterrupt:
107+
print("Stream interrupted by user.")

examples/order_manager/cloud_pos_order.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,14 +37,20 @@
3737
)
3838
order_manager = multisafepay_sdk.get_order_manager()
3939

40+
order_id = f"cloud-pos-{int(time.time())}"
41+
4042
order_request = (
4143
OrderRequest()
4244
.add_type("redirect")
43-
.add_order_id(f"cloud-pos-{int(time.time())}")
45+
.add_order_id(order_id)
4446
.add_description("Cloud POS order")
4547
.add_amount(100)
4648
.add_currency("EUR")
47-
.add_gateway_info({"terminal_id": TERMINAL_ID})
49+
.add_gateway_info(
50+
{
51+
"terminal_id": TERMINAL_ID,
52+
},
53+
)
4854
)
4955

5056
create_response = order_manager.create(
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
# Copyright (c) MultiSafepay, Inc. All rights reserved.
2+
3+
# This file is licensed under the Open Software License (OSL) version 3.0.
4+
# For a copy of the license, see the LICENSE.txt file in the project root.
5+
6+
# See the DISCLAIMER.md file for disclaimer details.
7+
8+
"""Events API endpoints."""
9+
10+
from multisafepay.api.paths.events.event_manager import EventManager
11+
12+
__all__ = [
13+
"EventManager",
14+
]
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
# Copyright (c) MultiSafepay, Inc. All rights reserved.
2+
3+
# This file is licensed under the Open Software License (OSL) version 3.0.
4+
# For a copy of the license, see the LICENSE.txt file in the project root.
5+
6+
# See the DISCLAIMER.md file for disclaimer details.
7+
8+
"""Event manager for event stream subscription helpers."""
9+
10+
from __future__ import annotations
11+
12+
from multisafepay.api.base.abstract_manager import AbstractManager
13+
from multisafepay.api.paths.events.stream import EventStream
14+
from multisafepay.api.paths.orders.response.order_response import Order
15+
from multisafepay.client.client import Client
16+
17+
18+
class EventManager(AbstractManager):
19+
"""Manages event stream subscriptions for order events."""
20+
21+
def __init__(self: EventManager, client: Client) -> None:
22+
"""Initialize the EventManager with a client."""
23+
super().__init__(client)
24+
25+
def subscribe_events(
26+
self: EventManager,
27+
events_token: str,
28+
events_stream_url: str,
29+
last_event_id: str | None = None,
30+
timeout: float = 30.0,
31+
) -> EventStream:
32+
"""
33+
Subscribe to order events using the SSE stream endpoint.
34+
35+
Parameters
36+
----------
37+
events_token (str): Token returned by order creation for event auth.
38+
events_stream_url (str): Full SSE stream URL.
39+
last_event_id (str | None): Optional resume cursor.
40+
timeout (float): Socket timeout in seconds.
41+
42+
Returns
43+
-------
44+
EventStream: An iterator over incoming SSE messages.
45+
46+
"""
47+
return EventStream.open(
48+
events_token=events_token,
49+
events_stream_url=events_stream_url,
50+
transport=self.client.transport,
51+
last_event_id=last_event_id,
52+
timeout=timeout,
53+
)
54+
55+
def subscribe_order_events(
56+
self: EventManager,
57+
order: Order,
58+
last_event_id: str | None = None,
59+
timeout: float = 30.0,
60+
) -> EventStream:
61+
"""
62+
Subscribe to events for an existing order response object.
63+
64+
Parameters
65+
----------
66+
order (Order): Order response that contains event credentials.
67+
last_event_id (str | None): Optional resume cursor.
68+
timeout (float): Socket timeout in seconds.
69+
70+
Returns
71+
-------
72+
EventStream: An iterator over incoming SSE messages.
73+
74+
"""
75+
events_token = order.events_token or order.event_token
76+
events_stream_url = order.events_stream_url or order.event_stream_url
77+
78+
if not events_token or not events_stream_url:
79+
raise ValueError(
80+
"Order does not contain events_token/event_token "
81+
"or events_stream_url/event_stream_url.",
82+
)
83+
84+
return self.subscribe_events(
85+
events_token=events_token,
86+
events_stream_url=events_stream_url,
87+
last_event_id=last_event_id,
88+
timeout=timeout,
89+
)

0 commit comments

Comments
 (0)