Skip to content

Commit 478f2a2

Browse files
committed
ADD: Add slow reader behavior support to clients
1 parent bddfe12 commit 478f2a2

File tree

12 files changed

+142
-40
lines changed

12 files changed

+142
-40
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,11 @@
22

33
## 0.47.1 - TBD
44

5+
### Enhancements
6+
- Added `SlowReadBehavior` enum and `LiveBuilder::SetSlowReadBehavior()` to configure
7+
gateway behavior when client falls behind
8+
- Added `SlowReadBehavior()` getter to `LiveBlocking` and `LiveThreaded`
9+
510
### Bug fixes
611
- Added conversion for missing schemas for function `RTypeFromSchema`
712

include/databento/enums.hpp

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,15 @@ enum class DatasetCondition : std::uint8_t {
4848
Missing,
4949
};
5050

51+
// Live session parameter which controls gateway behavior when the client
52+
// falls behind real time.
53+
enum class SlowReadBehavior : std::uint8_t {
54+
// Send a warning but continue reading.
55+
Warn = 0,
56+
// Skip records to catch up.
57+
Skip = 1,
58+
};
59+
5160
// A record type sentinel.
5261
namespace r_type {
5362
enum RType : std::uint8_t {
@@ -662,6 +671,7 @@ const char* ToString(SplitDuration duration_interval);
662671
const char* ToString(Delivery delivery);
663672
const char* ToString(JobState state);
664673
const char* ToString(DatasetCondition condition);
674+
const char* ToString(SlowReadBehavior slow_read_behavior);
665675
const char* ToString(RType r_type);
666676
const char* ToString(Side side);
667677
const char* ToString(Action action);
@@ -688,6 +698,7 @@ std::ostream& operator<<(std::ostream& out, SplitDuration duration_interval);
688698
std::ostream& operator<<(std::ostream& out, Delivery delivery);
689699
std::ostream& operator<<(std::ostream& out, JobState state);
690700
std::ostream& operator<<(std::ostream& out, DatasetCondition condition);
701+
std::ostream& operator<<(std::ostream& out, SlowReadBehavior slow_read_behavior);
691702
std::ostream& operator<<(std::ostream& out, RType r_type);
692703
std::ostream& operator<<(std::ostream& out, Side side);
693704
std::ostream& operator<<(std::ostream& out, Action action);

include/databento/live.hpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
#include <chrono>
44
#include <cstddef>
5+
#include <cstdint>
6+
#include <optional>
57
#include <string>
68

79
#include "databento/enums.hpp" // VersionUpgradePolicy
@@ -53,6 +55,8 @@ class LiveBuilder {
5355
LiveBuilder& ExtendUserAgent(std::string extension);
5456
// Sets the compression mode for the read stream.
5557
LiveBuilder& SetCompression(Compression compression);
58+
// Sets the behavior of the gateway when the client falls behind real time.
59+
LiveBuilder& SetSlowReadBehavior(SlowReadBehavior slow_read_behavior);
5660

5761
/*
5862
* Build a live client instance
@@ -80,5 +84,6 @@ class LiveBuilder {
8084
std::size_t buffer_size_;
8185
std::string user_agent_ext_;
8286
Compression compression_{Compression::None};
87+
std::optional<SlowReadBehavior> slow_read_behavior_{};
8388
};
8489
} // namespace databento

include/databento/live_blocking.hpp

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@ class LiveBlocking {
4545
return heartbeat_interval_;
4646
}
4747
databento::Compression Compression() const { return compression_; }
48+
std::optional<databento::SlowReadBehavior> SlowReadBehavior() const {
49+
return slow_read_behavior_;
50+
}
4851
const std::vector<LiveSubscription>& Subscriptions() const { return subscriptions_; }
4952
std::vector<LiveSubscription>& Subscriptions() { return subscriptions_; }
5053

@@ -95,13 +98,15 @@ class LiveBlocking {
9598
bool send_ts_out, VersionUpgradePolicy upgrade_policy,
9699
std::optional<std::chrono::seconds> heartbeat_interval,
97100
std::size_t buffer_size, std::string user_agent_ext,
98-
databento::Compression compression);
101+
databento::Compression compression,
102+
std::optional<databento::SlowReadBehavior> slow_read_behavior);
99103
LiveBlocking(ILogReceiver* log_receiver, std::string key, std::string dataset,
100104
std::string gateway, std::uint16_t port, bool send_ts_out,
101105
VersionUpgradePolicy upgrade_policy,
102106
std::optional<std::chrono::seconds> heartbeat_interval,
103107
std::size_t buffer_size, std::string user_agent_ext,
104-
databento::Compression compression);
108+
databento::Compression compression,
109+
std::optional<databento::SlowReadBehavior> slow_read_behavior);
105110

106111
std::string DetermineGateway() const;
107112
std::uint64_t Authenticate();
@@ -128,6 +133,7 @@ class LiveBlocking {
128133
const VersionUpgradePolicy upgrade_policy_;
129134
const std::optional<std::chrono::seconds> heartbeat_interval_;
130135
const databento::Compression compression_;
136+
const std::optional<databento::SlowReadBehavior> slow_read_behavior_;
131137
detail::LiveConnection connection_;
132138
std::uint32_t sub_counter_{};
133139
std::vector<LiveSubscription> subscriptions_;

include/databento/live_threaded.hpp

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ class LiveThreaded {
5555
VersionUpgradePolicy UpgradePolicy() const;
5656
std::optional<std::chrono::seconds> HeartbeatInterval() const;
5757
databento::Compression Compression() const;
58+
std::optional<databento::SlowReadBehavior> SlowReadBehavior() const;
5859
const std::vector<LiveSubscription>& Subscriptions() const;
5960
std::vector<LiveSubscription>& Subscriptions();
6061

@@ -109,13 +110,15 @@ class LiveThreaded {
109110
bool send_ts_out, VersionUpgradePolicy upgrade_policy,
110111
std::optional<std::chrono::seconds> heartbeat_interval,
111112
std::size_t buffer_size, std::string user_agent_ext,
112-
databento::Compression compression);
113+
databento::Compression compression,
114+
std::optional<databento::SlowReadBehavior> slow_read_behavior);
113115
LiveThreaded(ILogReceiver* log_receiver, std::string key, std::string dataset,
114116
std::string gateway, std::uint16_t port, bool send_ts_out,
115117
VersionUpgradePolicy upgrade_policy,
116118
std::optional<std::chrono::seconds> heartbeat_interval,
117119
std::size_t buffer_size, std::string user_agent_ext,
118-
databento::Compression compression);
120+
databento::Compression compression,
121+
std::optional<databento::SlowReadBehavior> slow_read_behavior);
119122

120123
// unique_ptr to be movable
121124
std::unique_ptr<Impl> impl_;

src/enums.cpp

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,20 @@ const char* ToString(DatasetCondition condition) {
108108
}
109109
}
110110

111+
const char* ToString(SlowReadBehavior slow_read_behavior) {
112+
switch (slow_read_behavior) {
113+
case SlowReadBehavior::Warn: {
114+
return "warn";
115+
}
116+
case SlowReadBehavior::Skip: {
117+
return "skip";
118+
}
119+
default: {
120+
return "Unknown";
121+
}
122+
}
123+
}
124+
111125
const char* ToString(RType r_type) {
112126
switch (r_type) {
113127
case RType::Mbp0: {
@@ -847,6 +861,11 @@ std::ostream& operator<<(std::ostream& out, DatasetCondition condition) {
847861
return out;
848862
}
849863

864+
std::ostream& operator<<(std::ostream& out, SlowReadBehavior slow_read_behavior) {
865+
out << ToString(slow_read_behavior);
866+
return out;
867+
}
868+
850869
std::ostream& operator<<(std::ostream& out, RType r_type) {
851870
out << ToString(r_type);
852871
return out;

src/live.cpp

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -84,21 +84,26 @@ LiveBuilder& LiveBuilder::SetCompression(Compression compression) {
8484
return *this;
8585
}
8686

87+
LiveBuilder& LiveBuilder::SetSlowReadBehavior(SlowReadBehavior slow_read_behavior) {
88+
slow_read_behavior_ = slow_read_behavior;
89+
return *this;
90+
}
91+
8792
databento::LiveBlocking LiveBuilder::BuildBlocking() {
8893
Validate();
8994
if (gateway_.empty()) {
9095
return databento::LiveBlocking{log_receiver_, key_,
9196
dataset_, send_ts_out_,
9297
upgrade_policy_, heartbeat_interval_,
9398
buffer_size_, user_agent_ext_,
94-
compression_};
99+
compression_, slow_read_behavior_};
95100
}
96101
return databento::LiveBlocking{log_receiver_, key_,
97102
dataset_, gateway_,
98103
port_, send_ts_out_,
99104
upgrade_policy_, heartbeat_interval_,
100105
buffer_size_, user_agent_ext_,
101-
compression_};
106+
compression_, slow_read_behavior_};
102107
}
103108

104109
databento::LiveThreaded LiveBuilder::BuildThreaded() {
@@ -108,14 +113,14 @@ databento::LiveThreaded LiveBuilder::BuildThreaded() {
108113
dataset_, send_ts_out_,
109114
upgrade_policy_, heartbeat_interval_,
110115
buffer_size_, user_agent_ext_,
111-
compression_};
116+
compression_, slow_read_behavior_};
112117
}
113118
return databento::LiveThreaded{log_receiver_, key_,
114119
dataset_, gateway_,
115120
port_, send_ts_out_,
116121
upgrade_policy_, heartbeat_interval_,
117122
buffer_size_, user_agent_ext_,
118-
compression_};
123+
compression_, slow_read_behavior_};
119124
}
120125

121126
void LiveBuilder::Validate() {

src/live_blocking.cpp

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,12 @@ constexpr std::size_t kBucketIdLength = 5;
2828

2929
databento::LiveBuilder LiveBlocking::Builder() { return databento::LiveBuilder{}; }
3030

31-
LiveBlocking::LiveBlocking(ILogReceiver* log_receiver, std::string key,
32-
std::string dataset, bool send_ts_out,
33-
VersionUpgradePolicy upgrade_policy,
34-
std::optional<std::chrono::seconds> heartbeat_interval,
35-
std::size_t buffer_size, std::string user_agent_ext,
36-
databento::Compression compression)
31+
LiveBlocking::LiveBlocking(
32+
ILogReceiver* log_receiver, std::string key, std::string dataset, bool send_ts_out,
33+
VersionUpgradePolicy upgrade_policy,
34+
std::optional<std::chrono::seconds> heartbeat_interval, std::size_t buffer_size,
35+
std::string user_agent_ext, databento::Compression compression,
36+
std::optional<databento::SlowReadBehavior> slow_read_behavior)
3737

3838
: log_receiver_{log_receiver},
3939
key_{std::move(key)},
@@ -45,16 +45,18 @@ LiveBlocking::LiveBlocking(ILogReceiver* log_receiver, std::string key,
4545
upgrade_policy_{upgrade_policy},
4646
heartbeat_interval_{heartbeat_interval},
4747
compression_{compression},
48+
slow_read_behavior_{slow_read_behavior},
4849
connection_{gateway_, port_},
4950
buffer_{buffer_size},
5051
session_id_{this->Authenticate()} {}
5152

52-
LiveBlocking::LiveBlocking(ILogReceiver* log_receiver, std::string key,
53-
std::string dataset, std::string gateway, std::uint16_t port,
54-
bool send_ts_out, VersionUpgradePolicy upgrade_policy,
55-
std::optional<std::chrono::seconds> heartbeat_interval,
56-
std::size_t buffer_size, std::string user_agent_ext,
57-
databento::Compression compression)
53+
LiveBlocking::LiveBlocking(
54+
ILogReceiver* log_receiver, std::string key, std::string dataset,
55+
std::string gateway, std::uint16_t port, bool send_ts_out,
56+
VersionUpgradePolicy upgrade_policy,
57+
std::optional<std::chrono::seconds> heartbeat_interval, std::size_t buffer_size,
58+
std::string user_agent_ext, databento::Compression compression,
59+
std::optional<databento::SlowReadBehavior> slow_read_behavior)
5860
: log_receiver_{log_receiver},
5961
key_{std::move(key)},
6062
dataset_{std::move(dataset)},
@@ -65,6 +67,7 @@ LiveBlocking::LiveBlocking(ILogReceiver* log_receiver, std::string key,
6567
upgrade_policy_{upgrade_policy},
6668
heartbeat_interval_{heartbeat_interval},
6769
compression_{compression},
70+
slow_read_behavior_{slow_read_behavior},
6871
connection_{gateway_, port_},
6972
buffer_{buffer_size},
7073
session_id_{this->Authenticate()} {}
@@ -340,6 +343,9 @@ std::string LiveBlocking::EncodeAuthReq(std::string_view auth) {
340343
if (heartbeat_interval_.has_value()) {
341344
req_stream << "|heartbeat_interval_s=" << heartbeat_interval_->count();
342345
}
346+
if (slow_read_behavior_.has_value()) {
347+
req_stream << "|slow_read_behavior=" << *slow_read_behavior_;
348+
}
343349
req_stream << '\n';
344350
return req_stream.str();
345351
}

src/live_threaded.cpp

Lines changed: 25 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -57,26 +57,28 @@ LiveThreaded::~LiveThreaded() {
5757
}
5858
}
5959

60-
LiveThreaded::LiveThreaded(ILogReceiver* log_receiver, std::string key,
61-
std::string dataset, bool send_ts_out,
62-
VersionUpgradePolicy upgrade_policy,
63-
std::optional<std::chrono::seconds> heartbeat_interval,
64-
std::size_t buffer_size, std::string user_agent_ext,
65-
databento::Compression compression)
66-
: impl_{std::make_unique<Impl>(
67-
log_receiver, std::move(key), std::move(dataset), send_ts_out, upgrade_policy,
68-
heartbeat_interval, buffer_size, std::move(user_agent_ext), compression)} {}
69-
70-
LiveThreaded::LiveThreaded(ILogReceiver* log_receiver, std::string key,
71-
std::string dataset, std::string gateway, std::uint16_t port,
72-
bool send_ts_out, VersionUpgradePolicy upgrade_policy,
73-
std::optional<std::chrono::seconds> heartbeat_interval,
74-
std::size_t buffer_size, std::string user_agent_ext,
75-
databento::Compression compression)
60+
LiveThreaded::LiveThreaded(
61+
ILogReceiver* log_receiver, std::string key, std::string dataset, bool send_ts_out,
62+
VersionUpgradePolicy upgrade_policy,
63+
std::optional<std::chrono::seconds> heartbeat_interval, std::size_t buffer_size,
64+
std::string user_agent_ext, databento::Compression compression,
65+
std::optional<databento::SlowReadBehavior> slow_read_behavior)
7666
: impl_{std::make_unique<Impl>(log_receiver, std::move(key), std::move(dataset),
77-
std::move(gateway), port, send_ts_out,
78-
upgrade_policy, heartbeat_interval, buffer_size,
79-
std::move(user_agent_ext), compression)} {}
67+
send_ts_out, upgrade_policy, heartbeat_interval,
68+
buffer_size, std::move(user_agent_ext), compression,
69+
slow_read_behavior)} {}
70+
71+
LiveThreaded::LiveThreaded(
72+
ILogReceiver* log_receiver, std::string key, std::string dataset,
73+
std::string gateway, std::uint16_t port, bool send_ts_out,
74+
VersionUpgradePolicy upgrade_policy,
75+
std::optional<std::chrono::seconds> heartbeat_interval, std::size_t buffer_size,
76+
std::string user_agent_ext, databento::Compression compression,
77+
std::optional<databento::SlowReadBehavior> slow_read_behavior)
78+
: impl_{std::make_unique<Impl>(
79+
log_receiver, std::move(key), std::move(dataset), std::move(gateway), port,
80+
send_ts_out, upgrade_policy, heartbeat_interval, buffer_size,
81+
std::move(user_agent_ext), compression, slow_read_behavior)} {}
8082

8183
const std::string& LiveThreaded::Key() const { return impl_->blocking.Key(); }
8284

@@ -100,6 +102,10 @@ databento::Compression LiveThreaded::Compression() const {
100102
return impl_->blocking.Compression();
101103
}
102104

105+
std::optional<databento::SlowReadBehavior> LiveThreaded::SlowReadBehavior() const {
106+
return impl_->blocking.SlowReadBehavior();
107+
}
108+
103109
const std::vector<databento::LiveSubscription>& LiveThreaded::Subscriptions() const {
104110
return impl_->blocking.Subscriptions();
105111
}

tests/include/mock/mock_lsg_server.hpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ class MockLsgServer {
4949
std::function<void(MockLsgServer&)> serve_fn);
5050
MockLsgServer(std::string dataset, bool ts_out, Compression compression,
5151
std::function<void(MockLsgServer&)> serve_fn);
52+
MockLsgServer(std::string dataset, bool ts_out, SlowReadBehavior slow_read_behavior,
53+
std::function<void(MockLsgServer&)> serve_fn);
5254

5355
std::uint16_t Port() const { return port_; }
5456

@@ -119,6 +121,7 @@ class MockLsgServer {
119121
bool ts_out_;
120122
std::chrono::seconds heartbeat_interval_;
121123
Compression compression_{Compression::None};
124+
std::optional<SlowReadBehavior> slow_read_behavior_{};
122125
std::uint16_t port_{};
123126
detail::ScopedFd socket_{};
124127
detail::ScopedFd conn_fd_{};

0 commit comments

Comments
 (0)