Skip to content

Commit f61ec67

Browse files
committed
ADD: Add support for slow_reader_behavior
1 parent d8ab7d1 commit f61ec67

File tree

7 files changed

+86
-1
lines changed

7 files changed

+86
-1
lines changed

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
# Changelog
22

3+
## 0.71.0 - TBD
4+
5+
#### Enhancements
6+
- Added `slow_reader_behavior` field to `AuthenticationRequest` message
7+
- Added `SlowReadBehavior` enum
8+
39
## 0.70.0 - 2026-01-27
410

511
#### Enhancements

databento/common/enums.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,3 +257,14 @@ class JobState(StringyMixin, str, Enum):
257257
PROCESSING = "processing"
258258
DONE = "done"
259259
EXPIRED = "expired"
260+
261+
262+
@unique
263+
class SlowReadBehavior(StringyMixin, str, Enum):
264+
"""
265+
Live session parameter which controls gateway behavior when the client
266+
falls behind real time.
267+
"""
268+
269+
SKIP = "skip"
270+
WARN = "warn"

databento/live/client.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
from databento.common.constants import ALL_SYMBOLS
2222
from databento.common.cram import BUCKET_ID_LENGTH
2323
from databento.common.enums import ReconnectPolicy
24+
from databento.common.enums import SlowReadBehavior
2425
from databento.common.error import BentoError
2526
from databento.common.parsing import optional_datetime_to_unix_nanoseconds
2627
from databento.common.publishers import Dataset
@@ -63,6 +64,10 @@ class Live:
6364
The reconnect policy for the live session.
6465
- "none": the client will not reconnect (default)
6566
- "reconnect": the client will reconnect automatically
67+
slow_reader_behavior: SlowReadBehavior | str, optional
68+
The live gateway behavior when the client falls behind real time.
69+
- "skip": skip records to immediately catch up
70+
- "warn": send a slow reader warning `SystemMsg` but continue reading every record
6671
6772
"""
6873

@@ -82,6 +87,7 @@ def __init__(
8287
ts_out: bool = False,
8388
heartbeat_interval_s: int | None = None,
8489
reconnect_policy: ReconnectPolicy | str = ReconnectPolicy.NONE,
90+
slow_reader_behavior: SlowReadBehavior | str | None = None,
8591
) -> None:
8692
if key is None:
8793
key = os.environ.get("DATABENTO_API_KEY")
@@ -112,6 +118,7 @@ def __init__(
112118
user_gateway=self._gateway,
113119
user_port=port,
114120
reconnect_policy=reconnect_policy,
121+
slow_reader_behavior=slow_reader_behavior,
115122
)
116123

117124
self._session._user_callbacks.append(ClientRecordCallback(self._map_symbol))

databento/live/gateway.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
from databento_dbn import Schema
1212
from databento_dbn import SType
1313

14+
from databento.common.enums import SlowReadBehavior
1415
from databento.common.publishers import Dataset
1516
from databento.common.system import USER_AGENT
1617

@@ -118,8 +119,14 @@ class AuthenticationRequest(GatewayControl):
118119
details: str | None = None
119120
ts_out: str = "0"
120121
heartbeat_interval_s: int | None = None
122+
slow_reader_behavior: SlowReadBehavior | str | None = None
121123
client: str = USER_AGENT
122124

125+
def __post_init__(self) -> None:
126+
# Temporary work around for LSG support
127+
if self.slow_reader_behavior in [SlowReadBehavior.SKIP, "skip"]:
128+
self.slow_reader_behavior = "drop"
129+
123130

124131
@dataclasses.dataclass
125132
class SubscriptionRequest(GatewayControl):

databento/live/protocol.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
from databento.common import cram
1818
from databento.common.constants import ALL_SYMBOLS
19+
from databento.common.enums import SlowReadBehavior
1920
from databento.common.error import BentoError
2021
from databento.common.iterator import chunk
2122
from databento.common.parsing import optional_datetime_to_unix_nanoseconds
@@ -73,6 +74,7 @@ def __init__(
7374
dataset: Dataset | str,
7475
ts_out: bool = False,
7576
heartbeat_interval_s: int | None = None,
77+
slow_reader_behavior: SlowReadBehavior | str | None = None,
7678
) -> None:
7779
self.__api_key = api_key
7880
self.__transport: asyncio.Transport | None = None
@@ -81,6 +83,7 @@ def __init__(
8183
self._dataset = validate_semantic_string(dataset, "dataset")
8284
self._ts_out = ts_out
8385
self._heartbeat_interval_s = heartbeat_interval_s
86+
self._slow_reader_behavior: SlowReadBehavior | str | None = slow_reader_behavior
8487

8588
self._dbn_decoder = databento_dbn.DBNDecoder(
8689
upgrade_policy=VersionUpgradePolicy.UPGRADE_TO_V3,
@@ -441,6 +444,7 @@ def _(self, message: ChallengeRequest) -> None:
441444
dataset=self._dataset,
442445
ts_out=str(int(self._ts_out)),
443446
heartbeat_interval_s=self._heartbeat_interval_s,
447+
slow_reader_behavior=self._slow_reader_behavior,
444448
)
445449
logger.debug(
446450
"sending CRAM challenge response auth='%s' dataset=%s encoding=%s ts_out=%s heartbeat_interval_s=%s client='%s'",

databento/live/session.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
from databento.common.constants import ALL_SYMBOLS
2222
from databento.common.enums import ReconnectPolicy
23+
from databento.common.enums import SlowReadBehavior
2324
from databento.common.error import BentoError
2425
from databento.common.publishers import Dataset
2526
from databento.common.types import ClientRecordCallback
@@ -205,8 +206,9 @@ def __init__(
205206
metadata: SessionMetadata,
206207
ts_out: bool = False,
207208
heartbeat_interval_s: int | None = None,
209+
slow_reader_behavior: SlowReadBehavior | str | None = None,
208210
):
209-
super().__init__(api_key, dataset, ts_out, heartbeat_interval_s)
211+
super().__init__(api_key, dataset, ts_out, heartbeat_interval_s, slow_reader_behavior)
210212

211213
self._dbn_queue = dbn_queue
212214
self._loop = loop
@@ -313,6 +315,7 @@ def __init__(
313315
user_gateway: str | None = None,
314316
user_port: int = DEFAULT_REMOTE_PORT,
315317
reconnect_policy: ReconnectPolicy | str = ReconnectPolicy.NONE,
318+
slow_reader_behavior: SlowReadBehavior | str | None = None,
316319
) -> None:
317320
self._dbn_queue = DBNQueue()
318321
self._lock = threading.RLock()
@@ -329,6 +332,7 @@ def __init__(
329332
self._api_key = api_key
330333
self._ts_out = ts_out
331334
self._heartbeat_interval_s = heartbeat_interval_s or 30
335+
self._slow_reader_behavior = slow_reader_behavior
332336

333337
self._protocol: _SessionProtocol | None = None
334338
self._transport: asyncio.Transport | None = None
@@ -579,6 +583,7 @@ def _create_protocol(self, dataset: Dataset | str) -> _SessionProtocol:
579583
metadata=self._metadata,
580584
ts_out=self.ts_out,
581585
heartbeat_interval_s=self.heartbeat_interval_s,
586+
slow_reader_behavior=self._slow_reader_behavior,
582587
)
583588

584589
def _connect(

tests/test_live_client.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
from databento.common.constants import SCHEMA_STRUCT_MAP
2525
from databento.common.cram import BUCKET_ID_LENGTH
2626
from databento.common.dbnstore import DBNStore
27+
from databento.common.enums import SlowReadBehavior
2728
from databento.common.error import BentoError
2829
from databento.common.publishers import Dataset
2930
from databento.live import client
@@ -331,6 +332,50 @@ async def test_live_connect_auth_with_heartbeat_interval(
331332
assert message.heartbeat_interval_s == "10"
332333

333334

335+
@pytest.mark.parametrize(
336+
"slow_reader_behavior",
337+
[b for b in SlowReadBehavior],
338+
)
339+
async def test_live_connect_auth_with_slow_reader_behavior(
340+
mock_live_server: MockLiveServerInterface,
341+
test_live_api_key: str,
342+
slow_reader_behavior: SlowReadBehavior,
343+
) -> None:
344+
"""
345+
Test that setting `slow_reader_behavior` on a Live client sends that field
346+
to the gateway.
347+
"""
348+
# Arrange
349+
live_client = client.Live(
350+
key=test_live_api_key,
351+
gateway=mock_live_server.host,
352+
port=mock_live_server.port,
353+
heartbeat_interval_s=10,
354+
slow_reader_behavior=slow_reader_behavior,
355+
)
356+
357+
live_client.subscribe(
358+
dataset=Dataset.GLBX_MDP3,
359+
schema=Schema.MBO,
360+
)
361+
362+
# Act
363+
message = await mock_live_server.wait_for_message_of_type(
364+
message_type=gateway.AuthenticationRequest,
365+
)
366+
367+
# Assert
368+
assert message.auth.endswith(live_client.key[-BUCKET_ID_LENGTH:])
369+
assert message.dataset == live_client.dataset
370+
assert message.encoding == Encoding.DBN
371+
372+
# Temporary handling of renamed variant
373+
if slow_reader_behavior == SlowReadBehavior.SKIP:
374+
assert message.slow_reader_behavior == "drop"
375+
else:
376+
assert message.slow_reader_behavior == slow_reader_behavior
377+
378+
334379
async def test_live_connect_auth_two_clients(
335380
mock_live_server: MockLiveServerInterface,
336381
test_live_api_key: str,

0 commit comments

Comments
 (0)