From a8bb492747236cccc4160fdf6961225189d00791 Mon Sep 17 00:00:00 2001 From: cal Date: Thu, 9 Apr 2026 15:07:38 +1000 Subject: [PATCH 1/3] feat: add WsAllowOutOfOrder config --- go/README.md | 2 + go/config.go | 1 + go/stream.go | 37 ++++- go/stream_test.go | 157 ++++++++++++++++++ rust/crates/sdk/src/config.rs | 13 ++ rust/crates/sdk/src/stream.rs | 6 + .../sdk/src/stream/monitor_connection.rs | 33 +++- typescript/src/stream/deduplication.ts | 60 +++++-- typescript/src/stream/index.ts | 9 +- typescript/src/stream/stats.ts | 10 ++ typescript/src/types/client.ts | 11 ++ typescript/src/types/metrics.ts | 9 + .../tests/unit/stream/stream-stats.test.ts | 2 + 13 files changed, 320 insertions(+), 30 deletions(-) diff --git a/go/README.md b/go/README.md index 692aab9..03484e7 100644 --- a/go/README.md +++ b/go/README.md @@ -175,6 +175,7 @@ type Config struct { WsURL string // Websocket Api url WsHA bool // Use concurrent connections to multiple Streams servers + WsAllowOutOfOrder bool // Allow out-of-order reports through while still deduplicating HA duplicates WsMaxReconnect int // Maximum number of reconnection attempts for Stream underlying connections LogDebug bool // Log debug information InsecureSkipVerify bool // Skip server certificate chain and host name verification @@ -292,6 +293,7 @@ func (r *ReportResponse) UnmarshalJSON(b []byte) (err error) type Stats struct { Accepted uint64 // Total number of accepted reports Deduplicated uint64 // Total number of deduplicated reports when in HA + OutOfOrder uint64 // Total number of out-of-order reports seen TotalReceived uint64 // Total number of received reports PartialReconnects uint64 // Total number of partial reconnects when in HA FullReconnects uint64 // Total number of full reconnects diff --git a/go/config.go b/go/config.go index bea548c..b74f764 100644 --- a/go/config.go +++ b/go/config.go @@ -15,6 +15,7 @@ type Config struct { WsURL string // Websocket Api url wsURL *url.URL // Websocket Api url WsHA bool // Use concurrent connections to multiple Streams servers + WsAllowOutOfOrder bool // Allow out-of-order reports through while still deduplicating HA duplicates WsMaxReconnect int // Maximum number of reconnection attempts for Stream underlying connections LogDebug bool // Log debug information InsecureSkipVerify bool // Skip server certificate chain and host name verification diff --git a/go/stream.go b/go/stream.go index ecbe347..56faa6a 100644 --- a/go/stream.go +++ b/go/stream.go @@ -54,6 +54,7 @@ type Stream interface { type Stats struct { Accepted uint64 // Total number of accepted reports Deduplicated uint64 // Total number of deduplicated reports when in HA + OutOfOrder uint64 // Total number of out-of-order reports seen TotalReceived uint64 // Total number of received reports PartialReconnects uint64 // Total number of partial reconnects when in HA FullReconnects uint64 // Total number of full reconnects @@ -63,8 +64,8 @@ type Stats struct { func (s Stats) String() (st string) { return fmt.Sprintf( - "accepted: %d, deduplicated: %d, total_received %d, partial_reconnects: %d, full_reconnects: %d, configured_connections: %d, active_connections %d", - s.Accepted, s.Deduplicated, + "accepted: %d, deduplicated: %d, out_of_order: %d, total_received %d, partial_reconnects: %d, full_reconnects: %d, configured_connections: %d, active_connections %d", + s.Accepted, s.Deduplicated, s.OutOfOrder, s.TotalReceived, s.PartialReconnects, s.FullReconnects, s.ConfiguredConnections, s.ActiveConnections, ) @@ -88,6 +89,7 @@ type stream struct { stats struct { accepted atomic.Uint64 skipped atomic.Uint64 + outOfOrder atomic.Uint64 partialReconnects atomic.Uint64 fullReconnects atomic.Uint64 activeConnections atomic.Uint64 @@ -310,6 +312,7 @@ func (s *stream) newWSconnWithRetry(origin string) (conn *wsConn, err error) { func (s *stream) Stats() (st Stats) { st.Accepted = s.stats.accepted.Load() st.Deduplicated = s.stats.skipped.Load() + st.OutOfOrder = s.stats.outOfOrder.Load() st.TotalReceived = st.Accepted + st.Deduplicated st.PartialReconnects = s.stats.partialReconnects.Load() st.FullReconnects = s.stats.fullReconnects.Load() @@ -359,17 +362,35 @@ func (s *stream) Close() (err error) { func (s *stream) accept(ctx context.Context, m *message) (err error) { id := m.Report.FeedID.String() + ts := m.Report.ObservationsTimestamp s.waterMarkMu.Lock() - // Skip older reports and reports with the same timestamp - if !m.Report.ObservationsTimestamp.After(s.waterMark[id]) { - s.stats.skipped.Add(1) - s.waterMarkMu.Unlock() - return nil + wm := s.waterMark[id] + + if s.config.WsAllowOutOfOrder { + if ts.Equal(wm) { + s.stats.skipped.Add(1) + s.waterMarkMu.Unlock() + return nil + } + if ts.Before(wm) { + s.stats.outOfOrder.Add(1) + } else { + s.waterMark[id] = ts + } + } else { + if !ts.After(wm) { + s.stats.skipped.Add(1) + if ts.Before(wm) { + s.stats.outOfOrder.Add(1) + } + s.waterMarkMu.Unlock() + return nil + } + s.waterMark[id] = ts } s.stats.accepted.Add(1) - s.waterMark[id] = m.Report.ObservationsTimestamp s.waterMarkMu.Unlock() select { diff --git a/go/stream_test.go b/go/stream_test.go index b214aba..071d389 100644 --- a/go/stream_test.go +++ b/go/stream_test.go @@ -801,6 +801,163 @@ func TestClient_StreamHA_OneOriginDown(t *testing.T) { } +func TestClient_StreamOutOfOrder(t *testing.T) { + reports := []*ReportResponse{ + {FeedID: feed1, ObservationsTimestamp: time.Unix(100, 0)}, + {FeedID: feed1, ObservationsTimestamp: time.Unix(102, 0)}, + {FeedID: feed1, ObservationsTimestamp: time.Unix(101, 0)}, // out-of-order + } + + ms := newMockServer(func(w http.ResponseWriter, r *http.Request) { + if r.Method == http.MethodHead { + return + } + + conn, err := websocket.Accept( + w, r, &websocket.AcceptOptions{CompressionMode: websocket.CompressionContextTakeover}, + ) + if err != nil { + t.Fatalf("error accepting connection: %s", err) + } + defer func() { _ = conn.CloseNow() }() + + for _, rpt := range reports { + b, err := json.Marshal(&message{rpt}) + if err != nil { + t.Errorf("failed to serialize message: %s", err) + } + if err := conn.Write(context.Background(), websocket.MessageBinary, b); err != nil { + t.Errorf("failed to write message: %s", err) + } + } + + for conn.Ping(context.Background()) == nil { + time.Sleep(100 * time.Millisecond) + } + }) + defer ms.Close() + + streamsClient, err := ms.Client() + if err != nil { + t.Fatalf("error creating client %s", err) + } + + cc := streamsClient.(*client) + cc.config.Logger = LogPrintf + cc.config.LogDebug = true + cc.config.WsAllowOutOfOrder = true + + sub, err := streamsClient.Stream(context.Background(), []feed.ID{feed1}) + if err != nil { + t.Fatalf("error subscribing %s", err) + } + defer sub.Close() + + var received []*ReportResponse + for i := 0; i < len(reports); i++ { + rep, err := sub.Read(context.Background()) + if err != nil { + t.Fatalf("error reading report %s", err) + } + received = append(received, rep) + } + + if !reportResponsesEqual(received, reports) { + t.Errorf("Read() = %v, want %v", received, reports) + } + + stats := sub.Stats() + if stats.Accepted != 3 { + t.Errorf("stats.Accepted = %d, want 3", stats.Accepted) + } + if stats.OutOfOrder != 1 { + t.Errorf("stats.OutOfOrder = %d, want 1", stats.OutOfOrder) + } + if stats.Deduplicated != 0 { + t.Errorf("stats.Deduplicated = %d, want 0", stats.Deduplicated) + } +} + +func TestClient_StreamOutOfOrder_DefaultDrop(t *testing.T) { + reports := []*ReportResponse{ + {FeedID: feed1, ObservationsTimestamp: time.Unix(100, 0)}, + {FeedID: feed1, ObservationsTimestamp: time.Unix(102, 0)}, + {FeedID: feed1, ObservationsTimestamp: time.Unix(101, 0)}, // out-of-order, should be dropped + } + + ms := newMockServer(func(w http.ResponseWriter, r *http.Request) { + if r.Method == http.MethodHead { + return + } + + conn, err := websocket.Accept( + w, r, &websocket.AcceptOptions{CompressionMode: websocket.CompressionContextTakeover}, + ) + if err != nil { + t.Fatalf("error accepting connection: %s", err) + } + defer func() { _ = conn.CloseNow() }() + + for _, rpt := range reports { + b, err := json.Marshal(&message{rpt}) + if err != nil { + t.Errorf("failed to serialize message: %s", err) + } + if err := conn.Write(context.Background(), websocket.MessageBinary, b); err != nil { + t.Errorf("failed to write message: %s", err) + } + } + + for conn.Ping(context.Background()) == nil { + time.Sleep(100 * time.Millisecond) + } + }) + defer ms.Close() + + streamsClient, err := ms.Client() + if err != nil { + t.Fatalf("error creating client %s", err) + } + + cc := streamsClient.(*client) + cc.config.Logger = LogPrintf + cc.config.LogDebug = true + + sub, err := streamsClient.Stream(context.Background(), []feed.ID{feed1}) + if err != nil { + t.Fatalf("error subscribing %s", err) + } + defer sub.Close() + + var received []*ReportResponse + for i := 0; i < 2; i++ { + rep, err := sub.Read(context.Background()) + if err != nil { + t.Fatalf("error reading report %s", err) + } + received = append(received, rep) + } + + // Wait for the third message to be processed by accept (it should be dropped) + time.Sleep(50 * time.Millisecond) + + expectedDelivered := reports[:2] + if !reportResponsesEqual(received, expectedDelivered) { + t.Errorf("Read() = %v, want %v", received, expectedDelivered) + } + + stats := sub.Stats() + if stats.Accepted != 2 { + t.Errorf("stats.Accepted = %d, want 2", stats.Accepted) + } + if stats.Deduplicated != 1 { + t.Errorf("stats.Deduplicated = %d, want 1", stats.Deduplicated) + } + if stats.OutOfOrder != 1 { + t.Errorf("stats.OutOfOrder = %d, want 1", stats.OutOfOrder) + } +} + // Tests that when in HA mode both origins are up after a recovery period even if one origin is down on initial connection func TestClient_StreamHA_OneOriginDownRecovery(t *testing.T) { connectAttempts := &atomic.Uint64{} diff --git a/rust/crates/sdk/src/config.rs b/rust/crates/sdk/src/config.rs index e55a551..627bfe9 100644 --- a/rust/crates/sdk/src/config.rs +++ b/rust/crates/sdk/src/config.rs @@ -51,6 +51,9 @@ pub struct Config { /// High Availability Mode: Use concurrent connections to multiple Streams servers pub ws_ha: WebSocketHighAvailability, + /// Allow out-of-order reports through while still deduplicating HA duplicates + pub ws_allow_out_of_order: bool, + /// Maximum number of reconnection attempts for underlying WebSocket connections pub ws_max_reconnect: usize, @@ -65,6 +68,7 @@ pub struct Config { impl Config { const DEFAULT_WS_MAX_RECONNECT: usize = 5; const DEFAULT_WS_HA: WebSocketHighAvailability = WebSocketHighAvailability::Disabled; + const DEFAULT_WS_ALLOW_OUT_OF_ORDER: bool = false; const DEFAULT_INSECURE_SKIP_VERIFY: InsecureSkipVerify = InsecureSkipVerify::Disabled; const DEFAULT_INSPECT_HTTP_RESPONSE: Option = None; @@ -140,6 +144,7 @@ impl Config { rest_url, ws_url, ws_ha: Self::DEFAULT_WS_HA, + ws_allow_out_of_order: Self::DEFAULT_WS_ALLOW_OUT_OF_ORDER, ws_max_reconnect: Self::DEFAULT_WS_MAX_RECONNECT, insecure_skip_verify: Self::DEFAULT_INSECURE_SKIP_VERIFY, inspect_http_response: Self::DEFAULT_INSPECT_HTTP_RESPONSE, @@ -160,6 +165,7 @@ pub struct ConfigBuilder { rest_url: String, ws_url: String, ws_ha: WebSocketHighAvailability, + ws_allow_out_of_order: bool, ws_max_reconnect: usize, insecure_skip_verify: InsecureSkipVerify, inspect_http_response: Option, @@ -172,6 +178,12 @@ impl ConfigBuilder { self } + /// Sets the `ws_allow_out_of_order` parameter. + pub fn with_ws_allow_out_of_order(mut self, ws_allow_out_of_order: bool) -> Self { + self.ws_allow_out_of_order = ws_allow_out_of_order; + self + } + // Sets the `ws_max_reconnect` parameter. pub fn with_ws_max_reconnect(mut self, ws_max_reconnect: usize) -> Self { self.ws_max_reconnect = ws_max_reconnect; @@ -206,6 +218,7 @@ impl ConfigBuilder { rest_url: self.rest_url, ws_url: self.ws_url, ws_ha: self.ws_ha, + ws_allow_out_of_order: self.ws_allow_out_of_order, ws_max_reconnect: self.ws_max_reconnect, insecure_skip_verify: self.insecure_skip_verify, inspect_http_response: self.inspect_http_response, diff --git a/rust/crates/sdk/src/stream.rs b/rust/crates/sdk/src/stream.rs index 4c4006d..7d7a955 100644 --- a/rust/crates/sdk/src/stream.rs +++ b/rust/crates/sdk/src/stream.rs @@ -57,6 +57,8 @@ struct Stats { accepted: AtomicUsize, /// Total number of deduplicated reports when in HA deduplicated: AtomicUsize, + /// Total number of out-of-order reports seen + out_of_order: AtomicUsize, /// Total number of partial reconnects when in HA partial_reconnects: AtomicUsize, /// Total number of full reconnects @@ -135,6 +137,7 @@ impl Stream { let stats = Arc::new(Stats { accepted: AtomicUsize::new(0), deduplicated: AtomicUsize::new(0), + out_of_order: AtomicUsize::new(0), partial_reconnects: AtomicUsize::new(0), full_reconnects: AtomicUsize::new(0), configured_connections: AtomicUsize::new(0), @@ -257,6 +260,7 @@ impl Stream { StatsSnapshot { accepted, deduplicated, + out_of_order: self.stats.out_of_order.load(Ordering::SeqCst), total_received: accepted + deduplicated, partial_reconnects: self.stats.partial_reconnects.load(Ordering::SeqCst), full_reconnects: self.stats.full_reconnects.load(Ordering::SeqCst), @@ -273,6 +277,8 @@ pub struct StatsSnapshot { pub accepted: usize, /// Total number of deduplicated reports when in HA pub deduplicated: usize, + /// Total number of out-of-order reports seen + pub out_of_order: usize, /// Total number of received reports pub total_received: usize, /// Total number of partial reconnects when in HA diff --git a/rust/crates/sdk/src/stream/monitor_connection.rs b/rust/crates/sdk/src/stream/monitor_connection.rs index 1cd497b..a2c486f 100644 --- a/rust/crates/sdk/src/stream/monitor_connection.rs +++ b/rust/crates/sdk/src/stream/monitor_connection.rs @@ -48,16 +48,41 @@ pub(crate) async fn run_stream( let feed_id = report.report.feed_id.to_hex_string(); let observations_timestamp = report.report.observations_timestamp; - if water_mark.lock().await.contains_key(&feed_id) && water_mark.lock().await[&feed_id] >= observations_timestamp { - stats.deduplicated.fetch_add(1, Ordering::SeqCst); - continue; + let mut wm = water_mark.lock().await; + let current_wm = wm.get(&feed_id).copied(); + + if let Some(current) = current_wm { + if config.ws_allow_out_of_order { + if observations_timestamp == current { + stats.deduplicated.fetch_add(1, Ordering::SeqCst); + drop(wm); + continue; + } + if observations_timestamp < current { + stats.out_of_order.fetch_add(1, Ordering::SeqCst); + } else { + wm.insert(feed_id.clone(), observations_timestamp); + } + } else { + if observations_timestamp <= current { + stats.deduplicated.fetch_add(1, Ordering::SeqCst); + if observations_timestamp < current { + stats.out_of_order.fetch_add(1, Ordering::SeqCst); + } + drop(wm); + continue; + } + wm.insert(feed_id.clone(), observations_timestamp); + } + } else { + wm.insert(feed_id.clone(), observations_timestamp); } + drop(wm); report_sender.send(report).await.map_err(|e| { StreamError::ConnectionError(format!("Failed to send report: {}", e)) })?; - water_mark.lock().await.insert(feed_id, observations_timestamp); stats.accepted.fetch_add(1, Ordering::SeqCst); } else { diff --git a/typescript/src/stream/deduplication.ts b/typescript/src/stream/deduplication.ts index bed17ce..660e6ce 100644 --- a/typescript/src/stream/deduplication.ts +++ b/typescript/src/stream/deduplication.ts @@ -12,12 +12,14 @@ export interface ReportMetadata { export interface DeduplicationResult { isAccepted: boolean; isDuplicate: boolean; + isOutOfOrder: boolean; reason?: string; } export interface DeduplicationStats { accepted: number; deduplicated: number; + outOfOrder: number; totalReceived: number; watermarkCount: number; } @@ -29,20 +31,24 @@ export class ReportDeduplicator { private waterMark: Map = new Map(); private acceptedCount = 0; private deduplicatedCount = 0; + private outOfOrderCount = 0; private cleanupInterval: NodeJS.Timeout | null = null; // Configuration private readonly maxWatermarkAge: number; private readonly cleanupIntervalMs: number; + private readonly allowOutOfOrder: boolean; constructor( options: { maxWatermarkAge?: number; // How long to keep watermarks (default: 1 hour) cleanupIntervalMs?: number; // How often to clean old watermarks (default: 5 minutes) + allowOutOfOrder?: boolean; // Allow out-of-order reports through (default: false) } = {} ) { this.maxWatermarkAge = options.maxWatermarkAge ?? 60 * 60 * 1000; // 1 hour this.cleanupIntervalMs = options.cleanupIntervalMs ?? 5 * 60 * 1000; // 5 minutes + this.allowOutOfOrder = options.allowOutOfOrder ?? false; // Start periodic cleanup this.startCleanup(); @@ -55,27 +61,47 @@ export class ReportDeduplicator { const feedId = report.feedID; const observationsTimestamp = report.observationsTimestamp; - // Get current watermark for this feed const currentWatermark = this.waterMark.get(feedId); - // Check if this report is older than or equal to the watermark - if (currentWatermark !== undefined && currentWatermark >= observationsTimestamp) { - this.deduplicatedCount++; - return { - isAccepted: false, - isDuplicate: true, - reason: `Report timestamp ${observationsTimestamp} <= watermark ${currentWatermark} for feed ${feedId}`, - }; + if (currentWatermark !== undefined) { + if (this.allowOutOfOrder) { + if (observationsTimestamp === currentWatermark) { + this.deduplicatedCount++; + return { + isAccepted: false, + isDuplicate: true, + isOutOfOrder: false, + reason: `Duplicate timestamp ${observationsTimestamp} for feed ${feedId}`, + }; + } + if (observationsTimestamp < currentWatermark) { + this.outOfOrderCount++; + this.acceptedCount++; + return { isAccepted: true, isDuplicate: false, isOutOfOrder: true }; + } + this.waterMark.set(feedId, observationsTimestamp); + } else { + if (observationsTimestamp <= currentWatermark) { + this.deduplicatedCount++; + const isOOO = observationsTimestamp < currentWatermark; + if (isOOO) { + this.outOfOrderCount++; + } + return { + isAccepted: false, + isDuplicate: true, + isOutOfOrder: isOOO, + reason: `Report timestamp ${observationsTimestamp} <= watermark ${currentWatermark} for feed ${feedId}`, + }; + } + this.waterMark.set(feedId, observationsTimestamp); + } + } else { + this.waterMark.set(feedId, observationsTimestamp); } - // Accept the report and update watermark - this.waterMark.set(feedId, observationsTimestamp); this.acceptedCount++; - - return { - isAccepted: true, - isDuplicate: false, - }; + return { isAccepted: true, isDuplicate: false, isOutOfOrder: false }; } /** @@ -85,6 +111,7 @@ export class ReportDeduplicator { return { accepted: this.acceptedCount, deduplicated: this.deduplicatedCount, + outOfOrder: this.outOfOrderCount, totalReceived: this.acceptedCount + this.deduplicatedCount, watermarkCount: this.waterMark.size, }; @@ -135,6 +162,7 @@ export class ReportDeduplicator { reset(): void { this.acceptedCount = 0; this.deduplicatedCount = 0; + this.outOfOrderCount = 0; this.waterMark.clear(); } diff --git a/typescript/src/stream/index.ts b/typescript/src/stream/index.ts index 232441c..c4859ff 100644 --- a/typescript/src/stream/index.ts +++ b/typescript/src/stream/index.ts @@ -132,8 +132,9 @@ export class Stream extends EventEmitter { this.connectionManager = new ConnectionManager(config, managerConfig); - // Initialize deduplicator for HA mode (single mode can also benefit from deduplication) - this.deduplicator = new ReportDeduplicator(); + this.deduplicator = new ReportDeduplicator({ + allowOutOfOrder: config.wsAllowOutOfOrder, + }); // Inject StreamStats into ConnectionManager for unified metrics this.connectionManager.setStreamStats(this.stats); @@ -234,6 +235,10 @@ export class Stream extends EventEmitter { const originInfo = origin ? ` from ${new URL(origin).host}` : ""; + if (result.isOutOfOrder) { + this.stats.incrementOutOfOrder(); + } + if (result.isAccepted) { this.stats.incrementAccepted(); this.logger.debug( diff --git a/typescript/src/stream/stats.ts b/typescript/src/stream/stats.ts index 15da3ef..d3f6cc4 100644 --- a/typescript/src/stream/stats.ts +++ b/typescript/src/stream/stats.ts @@ -6,6 +6,7 @@ import { MetricsSnapshot, ConnectionStatus } from "../types/metrics"; export class StreamStats { private _accepted = 0; private _deduplicated = 0; + private _outOfOrder = 0; private _partialReconnects = 0; private _fullReconnects = 0; private _configuredConnections = 0; @@ -33,6 +34,13 @@ export class StreamStats { this._totalReceived++; } + /** + * Increment the number of out-of-order reports seen + */ + incrementOutOfOrder(): void { + this._outOfOrder++; + } + /** * Increment the number of partial reconnects (some connections lost but not all) */ @@ -102,6 +110,7 @@ export class StreamStats { reset(): void { this._accepted = 0; this._deduplicated = 0; + this._outOfOrder = 0; this._partialReconnects = 0; this._fullReconnects = 0; this._totalReceived = 0; @@ -121,6 +130,7 @@ export class StreamStats { return { accepted: this._accepted, deduplicated: this._deduplicated, + outOfOrder: this._outOfOrder, partialReconnects: this._partialReconnects, fullReconnects: this._fullReconnects, configuredConnections: this._configuredConnections, diff --git a/typescript/src/types/client.ts b/typescript/src/types/client.ts index 6904062..6836d14 100644 --- a/typescript/src/types/client.ts +++ b/typescript/src/types/client.ts @@ -128,6 +128,17 @@ export interface Config { */ haMode?: boolean; + /** + * Allow out-of-order reports through while still deduplicating HA duplicates. + * + * When false (default), any report with timestamp <= watermark is dropped. + * When true, only exact-timestamp matches are deduplicated; out-of-order + * reports (older but distinct timestamps) are delivered. + * + * @default false + */ + wsAllowOutOfOrder?: boolean; + /** * Connection timeout for individual WebSocket connections in HA mode (milliseconds). * diff --git a/typescript/src/types/metrics.ts b/typescript/src/types/metrics.ts index a8240e2..3d5aded 100644 --- a/typescript/src/types/metrics.ts +++ b/typescript/src/types/metrics.ts @@ -50,6 +50,15 @@ export interface MetricsSnapshot { */ readonly deduplicated: number; + /** + * Total number of out-of-order reports seen. + * + * Tracks reports received with a timestamp older than the current watermark. + * When wsAllowOutOfOrder is false, these reports are dropped (included in deduplicated count). + * When wsAllowOutOfOrder is true, these reports are delivered (included in accepted count). + */ + readonly outOfOrder: number; + /** * Total number of reports received across all connections. * diff --git a/typescript/tests/unit/stream/stream-stats.test.ts b/typescript/tests/unit/stream/stream-stats.test.ts index c7241d0..8d37ebc 100644 --- a/typescript/tests/unit/stream/stream-stats.test.ts +++ b/typescript/tests/unit/stream/stream-stats.test.ts @@ -31,6 +31,7 @@ describe("StreamStats Tests", () => { expect(initialStats).toEqual({ accepted: 0, deduplicated: 0, + outOfOrder: 0, partialReconnects: 0, fullReconnects: 0, configuredConnections: 1, // Default is 1 @@ -147,6 +148,7 @@ describe("StreamStats Tests", () => { expect(finalStats).toEqual({ accepted: 4, deduplicated: 2, + outOfOrder: 0, partialReconnects: 1, fullReconnects: 1, configuredConnections: 2, From 09cfb0341f1c0f386d0c5518407bb3376bd3fd84 Mon Sep 17 00:00:00 2001 From: cal Date: Fri, 10 Apr 2026 13:25:26 +1000 Subject: [PATCH 2/3] refactor: extract deduplication into FeedDeduplicator --- go/dedup.go | 59 +++ go/dedup_test.go | 109 +++++ go/stream.go | 39 +- rust/crates/sdk/src/stream.rs | 27 +- rust/crates/sdk/src/stream/dedup.rs | 158 +++++++ .../sdk/src/stream/monitor_connection.rs | 50 +-- typescript/src/stream/deduplication.ts | 229 ++++------ .../tests/unit/stream/deduplication.test.ts | 425 +++++++++--------- 8 files changed, 660 insertions(+), 436 deletions(-) create mode 100644 go/dedup.go create mode 100644 go/dedup_test.go create mode 100644 rust/crates/sdk/src/stream/dedup.rs diff --git a/go/dedup.go b/go/dedup.go new file mode 100644 index 0000000..fc9bda8 --- /dev/null +++ b/go/dedup.go @@ -0,0 +1,59 @@ +package streams + +const seenBufferSize = 32 + +type Verdict int + +const ( + Accept Verdict = iota + Duplicate + OutOfOrder +) + +type feedState struct { + watermark int64 + ring [seenBufferSize]int64 + set map[int64]struct{} + cursor int + count int +} + +type FeedDeduplicator struct { + feeds map[string]*feedState +} + +func NewFeedDeduplicator() *FeedDeduplicator { + return &FeedDeduplicator{feeds: make(map[string]*feedState)} +} + +func (d *FeedDeduplicator) Check(feedID string, ts int64) Verdict { + fs := d.feeds[feedID] + if fs == nil { + fs = &feedState{set: make(map[int64]struct{}, seenBufferSize)} + d.feeds[feedID] = fs + } + + if _, dup := fs.set[ts]; dup { + return Duplicate + } + + if fs.count == seenBufferSize { + evict := fs.ring[fs.cursor] + delete(fs.set, evict) + } else { + fs.count++ + } + fs.ring[fs.cursor] = ts + fs.set[ts] = struct{}{} + fs.cursor = (fs.cursor + 1) % seenBufferSize + + isOutOfOrder := fs.watermark > 0 && ts < fs.watermark + if ts > fs.watermark { + fs.watermark = ts + } + + if isOutOfOrder { + return OutOfOrder + } + return Accept +} diff --git a/go/dedup_test.go b/go/dedup_test.go new file mode 100644 index 0000000..2cda469 --- /dev/null +++ b/go/dedup_test.go @@ -0,0 +1,109 @@ +package streams + +import "testing" + +func TestFeedDeduplicator_Accept(t *testing.T) { + d := NewFeedDeduplicator() + if v := d.Check("feed-1", 100); v != Accept { + t.Fatalf("expected Accept, got %d", v) + } +} + +func TestFeedDeduplicator_Duplicate(t *testing.T) { + d := NewFeedDeduplicator() + d.Check("feed-1", 100) + if v := d.Check("feed-1", 100); v != Duplicate { + t.Fatalf("expected Duplicate, got %d", v) + } +} + +func TestFeedDeduplicator_OutOfOrder(t *testing.T) { + d := NewFeedDeduplicator() + d.Check("feed-1", 200) + if v := d.Check("feed-1", 100); v != OutOfOrder { + t.Fatalf("expected OutOfOrder, got %d", v) + } +} + +func TestFeedDeduplicator_OutOfOrderNotDuplicate(t *testing.T) { + d := NewFeedDeduplicator() + d.Check("feed-1", 200) + v := d.Check("feed-1", 100) + if v != OutOfOrder { + t.Fatalf("expected OutOfOrder for first OOO delivery, got %d", v) + } + if v := d.Check("feed-1", 100); v != Duplicate { + t.Fatalf("expected Duplicate for second OOO delivery, got %d", v) + } +} + +func TestFeedDeduplicator_FIFOEviction(t *testing.T) { + d := NewFeedDeduplicator() + for i := int64(1); i <= seenBufferSize; i++ { + d.Check("feed-1", i) + } + d.Check("feed-1", 33) + // ts=2 (second inserted) should still be in the buffer + if v := d.Check("feed-1", 2); v != Duplicate { + t.Fatalf("expected ts=2 still in buffer, got %d", v) + } + // ts=1 (first inserted) was evicted by ts=33 + if v := d.Check("feed-1", 1); v == Duplicate { + t.Fatal("expected ts=1 to be evicted (FIFO), but got Duplicate") + } +} + +func TestFeedDeduplicator_FIFOEvictsOldestInsertedNotSmallest(t *testing.T) { + d := NewFeedDeduplicator() + // Insert out of order: 100, 1, 2, 3, ..., 31 (total 32 entries) + d.Check("feed-1", 100) + for i := int64(1); i <= seenBufferSize-1; i++ { + d.Check("feed-1", i) + } + // Buffer is full. ts=100 was inserted first (oldest by insertion). + // Adding ts=999 should evict ts=100, NOT ts=1 (the smallest value). + d.Check("feed-1", 999) + // ts=1 should still be present (smallest value, but NOT oldest inserted) + if v := d.Check("feed-1", 1); v != Duplicate { + t.Fatalf("expected ts=1 (smallest value, but not oldest inserted) to remain, got %d", v) + } + // ts=100 should have been evicted (oldest inserted) + if v := d.Check("feed-1", 100); v == Duplicate { + t.Fatal("expected ts=100 (oldest inserted) to be evicted, but got Duplicate") + } +} + +func TestFeedDeduplicator_IndependentFeeds(t *testing.T) { + d := NewFeedDeduplicator() + d.Check("feed-a", 100) + d.Check("feed-b", 100) + + if v := d.Check("feed-a", 100); v != Duplicate { + t.Fatalf("expected Duplicate for feed-a, got %d", v) + } + if v := d.Check("feed-b", 100); v != Duplicate { + t.Fatalf("expected Duplicate for feed-b, got %d", v) + } + // Different feed, same ts is not a duplicate + if v := d.Check("feed-c", 100); v != Accept { + t.Fatalf("expected Accept for new feed-c, got %d", v) + } +} + +func TestFeedDeduplicator_WatermarkZeroNotOutOfOrder(t *testing.T) { + d := NewFeedDeduplicator() + // First report at ts=0 should be Accept, not OutOfOrder + if v := d.Check("feed-1", 0); v != Accept { + t.Fatalf("expected Accept for first report at ts=0, got %d", v) + } +} + +func TestFeedDeduplicator_HADuplicateAfterWatermarkAdvance(t *testing.T) { + d := NewFeedDeduplicator() + d.Check("feed-1", 100) // Accept + d.Check("feed-1", 200) // Accept, watermark -> 200 + // HA duplicate of ts=100 arrives from second connection + if v := d.Check("feed-1", 100); v != Duplicate { + t.Fatalf("expected Duplicate for HA retransmit, got %d", v) + } +} diff --git a/go/stream.go b/go/stream.go index 56faa6a..81fe025 100644 --- a/go/stream.go +++ b/go/stream.go @@ -83,8 +83,8 @@ type stream struct { closeError atomic.Value connStatusCallback func(isConneccted bool, host string, origin string) - waterMarkMu sync.Mutex - waterMark map[string]time.Time + feedsMu sync.Mutex + dedup *FeedDeduplicator stats struct { accepted atomic.Uint64 @@ -109,7 +109,7 @@ func (c *client) newStream(ctx context.Context, httpClient *http.Client, feedIDs config: c.config, output: make(chan *ReportResponse, 1), feedIDs: feedIDs, - waterMark: make(map[string]time.Time), + dedup: NewFeedDeduplicator(), streamCtx: streamCtx, streamCtxCancel: streamCtxCancel, } @@ -362,36 +362,25 @@ func (s *stream) Close() (err error) { func (s *stream) accept(ctx context.Context, m *message) (err error) { id := m.Report.FeedID.String() - ts := m.Report.ObservationsTimestamp + ts := m.Report.ObservationsTimestamp.UnixMilli() - s.waterMarkMu.Lock() - wm := s.waterMark[id] + s.feedsMu.Lock() + verdict := s.dedup.Check(id, ts) + s.feedsMu.Unlock() - if s.config.WsAllowOutOfOrder { - if ts.Equal(wm) { - s.stats.skipped.Add(1) - s.waterMarkMu.Unlock() - return nil - } - if ts.Before(wm) { - s.stats.outOfOrder.Add(1) - } else { - s.waterMark[id] = ts - } - } else { - if !ts.After(wm) { + switch verdict { + case Duplicate: + s.stats.skipped.Add(1) + return nil + case OutOfOrder: + s.stats.outOfOrder.Add(1) + if !s.config.WsAllowOutOfOrder { s.stats.skipped.Add(1) - if ts.Before(wm) { - s.stats.outOfOrder.Add(1) - } - s.waterMarkMu.Unlock() return nil } - s.waterMark[id] = ts } s.stats.accepted.Add(1) - s.waterMarkMu.Unlock() select { case <-ctx.Done(): diff --git a/rust/crates/sdk/src/stream.rs b/rust/crates/sdk/src/stream.rs index 7d7a955..4b478ac 100644 --- a/rust/crates/sdk/src/stream.rs +++ b/rust/crates/sdk/src/stream.rs @@ -1,6 +1,8 @@ +mod dedup; mod establish_connection; mod monitor_connection; +use dedup::FeedDeduplicator; use establish_connection::connect; use monitor_connection::run_stream; @@ -10,12 +12,9 @@ use chainlink_data_streams_report::feed_id::ID; use chainlink_data_streams_report::report::Report; use serde::{Deserialize, Serialize}; -use std::{ - collections::HashMap, - sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, - }, +use std::sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, }; use tokio::{ net::TcpStream, @@ -23,7 +22,7 @@ use tokio::{ time::{sleep, Duration}, }; use tokio_tungstenite::{MaybeTlsStream, WebSocketStream as TungsteniteWebSocketStream}; -use tracing::{debug, error, info}; +use tracing::{debug, info}; pub const DEFAULT_WS_CONNECT_TIMEOUT: Duration = Duration::from_secs(5); pub const MIN_WS_RECONNECT_INTERVAL: Duration = Duration::from_millis(1000); @@ -87,7 +86,7 @@ pub struct Stream { report_receiver: mpsc::Receiver, shutdown_sender: broadcast::Sender<()>, stats: Arc, - water_mark: Arc>>, + dedup: Arc>, } impl Stream { @@ -146,7 +145,7 @@ impl Stream { let conn = connect(config, &feed_ids, stats.clone()).await?; - let water_mark = Arc::new(Mutex::new(HashMap::new())); + let dedup = Arc::new(Mutex::new(FeedDeduplicator::new())); Ok(Stream { config: config.clone(), @@ -156,7 +155,7 @@ impl Stream { report_receiver, shutdown_sender, stats, - water_mark, + dedup, }) } @@ -173,7 +172,7 @@ impl Stream { let report_sender = self.report_sender.clone(); let shutdown_receiver = self.shutdown_sender.subscribe(); let stats = self.stats.clone(); - let water_mark = self.water_mark.clone(); + let dedup = self.dedup.clone(); let config = self.config.clone(); let feed_ids = self.feed_ids.clone(); @@ -182,7 +181,7 @@ impl Stream { report_sender, shutdown_receiver, stats, - water_mark, + dedup, config, feed_ids, )); @@ -192,7 +191,7 @@ impl Stream { let report_sender = self.report_sender.clone(); let shutdown_receiver = self.shutdown_sender.subscribe(); let stats = self.stats.clone(); - let water_mark = self.water_mark.clone(); + let dedup = self.dedup.clone(); let config = self.config.clone(); let feed_ids = self.feed_ids.clone(); @@ -201,7 +200,7 @@ impl Stream { report_sender, shutdown_receiver, stats, - water_mark, + dedup, config, feed_ids, )); diff --git a/rust/crates/sdk/src/stream/dedup.rs b/rust/crates/sdk/src/stream/dedup.rs new file mode 100644 index 0000000..2c16a71 --- /dev/null +++ b/rust/crates/sdk/src/stream/dedup.rs @@ -0,0 +1,158 @@ +use std::collections::{HashMap, HashSet}; + +const SEEN_BUFFER_SIZE: usize = 32; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(crate) enum Verdict { + Accept, + Duplicate, + OutOfOrder, +} + +struct FeedState { + watermark: u64, + ring: [u64; SEEN_BUFFER_SIZE], + set: HashSet, + cursor: usize, + count: usize, +} + +impl FeedState { + fn new() -> Self { + Self { + watermark: 0, + ring: [0; SEEN_BUFFER_SIZE], + set: HashSet::with_capacity(SEEN_BUFFER_SIZE), + cursor: 0, + count: 0, + } + } +} + +pub(crate) struct FeedDeduplicator { + feeds: HashMap, +} + +impl FeedDeduplicator { + pub fn new() -> Self { + Self { + feeds: HashMap::new(), + } + } + + pub fn check(&mut self, feed_id: &str, ts: u64) -> Verdict { + let fs = self + .feeds + .entry(feed_id.to_owned()) + .or_insert_with(FeedState::new); + + if fs.set.contains(&ts) { + return Verdict::Duplicate; + } + + if fs.count == SEEN_BUFFER_SIZE { + let evict = fs.ring[fs.cursor]; + fs.set.remove(&evict); + } else { + fs.count += 1; + } + fs.ring[fs.cursor] = ts; + fs.set.insert(ts); + fs.cursor = (fs.cursor + 1) % SEEN_BUFFER_SIZE; + + let is_out_of_order = fs.watermark > 0 && ts < fs.watermark; + if ts > fs.watermark { + fs.watermark = ts; + } + + if is_out_of_order { + Verdict::OutOfOrder + } else { + Verdict::Accept + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn accept() { + let mut d = FeedDeduplicator::new(); + assert_eq!(d.check("feed-1", 100), Verdict::Accept); + } + + #[test] + fn duplicate() { + let mut d = FeedDeduplicator::new(); + d.check("feed-1", 100); + assert_eq!(d.check("feed-1", 100), Verdict::Duplicate); + } + + #[test] + fn out_of_order() { + let mut d = FeedDeduplicator::new(); + d.check("feed-1", 200); + assert_eq!(d.check("feed-1", 100), Verdict::OutOfOrder); + } + + #[test] + fn out_of_order_then_duplicate() { + let mut d = FeedDeduplicator::new(); + d.check("feed-1", 200); + assert_eq!(d.check("feed-1", 100), Verdict::OutOfOrder); + assert_eq!(d.check("feed-1", 100), Verdict::Duplicate); + } + + #[test] + fn fifo_eviction() { + let mut d = FeedDeduplicator::new(); + for i in 1..=SEEN_BUFFER_SIZE as u64 { + d.check("feed-1", i); + } + d.check("feed-1", 33); + // ts=2 still present + assert_eq!(d.check("feed-1", 2), Verdict::Duplicate); + // ts=1 was evicted (FIFO oldest inserted) + assert_ne!(d.check("feed-1", 1), Verdict::Duplicate); + } + + #[test] + fn fifo_evicts_oldest_inserted_not_smallest_value() { + let mut d = FeedDeduplicator::new(); + d.check("feed-1", 100); + for i in 1..SEEN_BUFFER_SIZE as u64 { + d.check("feed-1", i); + } + d.check("feed-1", 999); + // ts=1 (smallest value) should still be present + assert_eq!(d.check("feed-1", 1), Verdict::Duplicate); + // ts=100 (oldest inserted) should be evicted + assert_ne!(d.check("feed-1", 100), Verdict::Duplicate); + } + + #[test] + fn independent_feeds() { + let mut d = FeedDeduplicator::new(); + d.check("feed-a", 100); + d.check("feed-b", 100); + assert_eq!(d.check("feed-a", 100), Verdict::Duplicate); + assert_eq!(d.check("feed-b", 100), Verdict::Duplicate); + assert_eq!(d.check("feed-c", 100), Verdict::Accept); + } + + #[test] + fn watermark_zero_not_out_of_order() { + let mut d = FeedDeduplicator::new(); + assert_eq!(d.check("feed-1", 0), Verdict::Accept); + } + + #[test] + fn ha_duplicate_after_watermark_advance() { + let mut d = FeedDeduplicator::new(); + d.check("feed-1", 100); + d.check("feed-1", 200); + assert_eq!(d.check("feed-1", 100), Verdict::Duplicate); + } +} diff --git a/rust/crates/sdk/src/stream/monitor_connection.rs b/rust/crates/sdk/src/stream/monitor_connection.rs index a2c486f..9ba8421 100644 --- a/rust/crates/sdk/src/stream/monitor_connection.rs +++ b/rust/crates/sdk/src/stream/monitor_connection.rs @@ -1,4 +1,4 @@ -use super::{Stats, StreamError, WebSocketReport}; +use super::{dedup::{FeedDeduplicator, Verdict}, Stats, StreamError, WebSocketReport}; use crate::{config::Config, stream::establish_connection::try_to_reconnect}; @@ -6,12 +6,9 @@ use chainlink_data_streams_report::feed_id::ID; use futures::SinkExt; use futures_util::StreamExt; -use std::{ - collections::HashMap, - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, - }, +use std::sync::{ + atomic::{AtomicBool, Ordering}, + Arc, }; use tokio::{ net::TcpStream, @@ -27,7 +24,7 @@ pub(crate) async fn run_stream( report_sender: mpsc::Sender, mut shutdown_receiver: broadcast::Receiver<()>, stats: Arc, - water_mark: Arc>>, + dedup: Arc>, config: Config, feed_ids: Vec, ) -> Result<(), StreamError> { @@ -46,45 +43,30 @@ pub(crate) async fn run_stream( info!("Received new report from Data Streams Endpoint."); if let Ok(report) = serde_json::from_slice::(&data) { let feed_id = report.report.feed_id.to_hex_string(); - let observations_timestamp = report.report.observations_timestamp; + let ts = report.report.observations_timestamp as u64; - let mut wm = water_mark.lock().await; - let current_wm = wm.get(&feed_id).copied(); + let verdict = dedup.lock().await.check(&feed_id, ts); - if let Some(current) = current_wm { - if config.ws_allow_out_of_order { - if observations_timestamp == current { - stats.deduplicated.fetch_add(1, Ordering::SeqCst); - drop(wm); - continue; - } - if observations_timestamp < current { - stats.out_of_order.fetch_add(1, Ordering::SeqCst); - } else { - wm.insert(feed_id.clone(), observations_timestamp); - } - } else { - if observations_timestamp <= current { + match verdict { + Verdict::Duplicate => { + stats.deduplicated.fetch_add(1, Ordering::SeqCst); + continue; + } + Verdict::OutOfOrder => { + stats.out_of_order.fetch_add(1, Ordering::SeqCst); + if !config.ws_allow_out_of_order { stats.deduplicated.fetch_add(1, Ordering::SeqCst); - if observations_timestamp < current { - stats.out_of_order.fetch_add(1, Ordering::SeqCst); - } - drop(wm); continue; } - wm.insert(feed_id.clone(), observations_timestamp); } - } else { - wm.insert(feed_id.clone(), observations_timestamp); + Verdict::Accept => {} } - drop(wm); report_sender.send(report).await.map_err(|e| { StreamError::ConnectionError(format!("Failed to send report: {}", e)) })?; stats.accepted.fetch_add(1, Ordering::SeqCst); - } else { error!("Failed to parse binary message."); } diff --git a/typescript/src/stream/deduplication.ts b/typescript/src/stream/deduplication.ts index 660e6ce..0060e1a 100644 --- a/typescript/src/stream/deduplication.ts +++ b/typescript/src/stream/deduplication.ts @@ -1,7 +1,12 @@ /** - * Report deduplication using watermark timestamps + * Report deduplication using a bounded set of recently seen timestamps per feed. + * Each feed tracks a watermark (highest timestamp) for ordering decisions and a + * set of recently seen timestamps for deduplication, allowing correct dedup of + * both in-order and out-of-order HA duplicates. */ +const SEEN_BUFFER_SIZE = 32; + export interface ReportMetadata { feedID: string; observationsTimestamp: number; @@ -24,11 +29,20 @@ export interface DeduplicationStats { watermarkCount: number; } -/** - * Manages report deduplication using watermark timestamps - */ +enum Verdict { + Accept, + Duplicate, + OutOfOrder, +} + +interface FeedState { + watermark: number; + seen: Set; +} + +// ReportDeduplicator manages deduplication of reports for a set of feeds. export class ReportDeduplicator { - private waterMark: Map = new Map(); + private feedState: Map = new Map(); private acceptedCount = 0; private deduplicatedCount = 0; private outOfOrderCount = 0; @@ -54,193 +68,116 @@ export class ReportDeduplicator { this.startCleanup(); } - /** - * Process a report and determine if it should be accepted or deduplicated - */ - processReport(report: ReportMetadata): DeduplicationResult { - const feedId = report.feedID; - const observationsTimestamp = report.observationsTimestamp; + private check(feedId: string, ts: number): Verdict { + let state = this.feedState.get(feedId); + if (!state) { + state = { watermark: 0, seen: new Set() }; + this.feedState.set(feedId, state); + } - const currentWatermark = this.waterMark.get(feedId); + if (state.seen.has(ts)) { + return Verdict.Duplicate; + } - if (currentWatermark !== undefined) { - if (this.allowOutOfOrder) { - if (observationsTimestamp === currentWatermark) { - this.deduplicatedCount++; - return { - isAccepted: false, - isDuplicate: true, - isOutOfOrder: false, - reason: `Duplicate timestamp ${observationsTimestamp} for feed ${feedId}`, - }; - } - if (observationsTimestamp < currentWatermark) { - this.outOfOrderCount++; - this.acceptedCount++; - return { isAccepted: true, isDuplicate: false, isOutOfOrder: true }; - } - this.waterMark.set(feedId, observationsTimestamp); - } else { - if (observationsTimestamp <= currentWatermark) { + if (state.seen.size >= SEEN_BUFFER_SIZE) { + const oldest = state.seen.values().next().value!; + state.seen.delete(oldest); + } + state.seen.add(ts); + + const isOutOfOrder = state.watermark > 0 && ts < state.watermark; + if (ts > state.watermark) { + state.watermark = ts; + } + + if (isOutOfOrder) { + return Verdict.OutOfOrder; + } + return Verdict.Accept; + } + + // Process a report and return a verdict on whether it is accepted, duplicated, or out-of-order. + processReport(report: ReportMetadata): DeduplicationResult { + const feedId = report.feedID; + const ts = report.observationsTimestamp; + const verdict = this.check(feedId, ts); + + switch (verdict) { + case Verdict.Duplicate: + this.deduplicatedCount++; + return { + isAccepted: false, + isDuplicate: true, + isOutOfOrder: false, + reason: `Duplicate timestamp ${ts} already seen for feed ${feedId}`, + }; + + case Verdict.OutOfOrder: { + this.outOfOrderCount++; + if (!this.allowOutOfOrder) { this.deduplicatedCount++; - const isOOO = observationsTimestamp < currentWatermark; - if (isOOO) { - this.outOfOrderCount++; - } return { isAccepted: false, - isDuplicate: true, - isOutOfOrder: isOOO, - reason: `Report timestamp ${observationsTimestamp} <= watermark ${currentWatermark} for feed ${feedId}`, + isDuplicate: false, + isOutOfOrder: true, + reason: `Out-of-order timestamp ${ts} < watermark ${this.feedState.get(feedId)!.watermark} for feed ${feedId}`, }; } - this.waterMark.set(feedId, observationsTimestamp); + this.acceptedCount++; + return { isAccepted: true, isDuplicate: false, isOutOfOrder: true }; } - } else { - this.waterMark.set(feedId, observationsTimestamp); - } - this.acceptedCount++; - return { isAccepted: true, isDuplicate: false, isOutOfOrder: false }; + case Verdict.Accept: + this.acceptedCount++; + return { isAccepted: true, isDuplicate: false, isOutOfOrder: false }; + } } - /** - * Get current deduplication statistics - */ + // Get statistics on deduplication performance. getStats(): DeduplicationStats { return { accepted: this.acceptedCount, deduplicated: this.deduplicatedCount, outOfOrder: this.outOfOrderCount, totalReceived: this.acceptedCount + this.deduplicatedCount, - watermarkCount: this.waterMark.size, + watermarkCount: this.feedState.size, }; } - /** - * Get watermark for a specific feed ID - */ + // Get the watermark for a feed. getWatermark(feedId: string): number | undefined { - return this.waterMark.get(feedId); - } - - /** - * Get all current watermarks (for debugging/monitoring) - */ - getAllWatermarks(): Record { - const watermarks: Record = {}; - for (const [feedId, timestamp] of this.waterMark) { - watermarks[feedId] = timestamp; - } - return watermarks; - } - - /** - * Manually set watermark for a feed (useful for initialization) - */ - setWatermark(feedId: string, timestamp: number): void { - this.waterMark.set(feedId, timestamp); - } - - /** - * Clear watermark for a specific feed - */ - clearWatermark(feedId: string): boolean { - return this.waterMark.delete(feedId); - } - - /** - * Clear all watermarks - */ - clearAllWatermarks(): void { - this.waterMark.clear(); + return this.feedState.get(feedId)?.watermark; } - /** - * Reset all counters and watermarks - */ reset(): void { this.acceptedCount = 0; this.deduplicatedCount = 0; this.outOfOrderCount = 0; - this.waterMark.clear(); + this.feedState.clear(); } - /** - * Start periodic cleanup of old watermarks - * This prevents memory leaks for feeds that are no longer active - */ private startCleanup(): void { this.cleanupInterval = setInterval(() => { this.cleanupOldWatermarks(); }, this.cleanupIntervalMs); } - /** - * Clean up watermarks that are too old - * This is a safety mechanism to prevent unbounded memory growth - */ + // Clean up old watermarks to keep memory usage under control. private cleanupOldWatermarks(): void { const now = Date.now(); - const cutoffTime = now - this.maxWatermarkAge; + const cutoffTimestamp = Math.floor((now - this.maxWatermarkAge) / 1000); - // Convert cutoff time to seconds (like the timestamps in reports) - const cutoffTimestamp = Math.floor(cutoffTime / 1000); - - let _removedCount = 0; - for (const [feedId, timestamp] of this.waterMark) { - if (timestamp < cutoffTimestamp) { - this.waterMark.delete(feedId); - _removedCount++; + for (const [feedId, state] of this.feedState) { + if (state.watermark < cutoffTimestamp) { + this.feedState.delete(feedId); } } } - /** - * Stop the deduplicator and clean up resources - */ stop(): void { if (this.cleanupInterval) { clearInterval(this.cleanupInterval); this.cleanupInterval = null; } } - - /** - * Get memory usage information - */ - getMemoryInfo(): { - watermarkCount: number; - estimatedMemoryBytes: number; - } { - const watermarkCount = this.waterMark.size; - - // Rough estimation: each entry has a string key (~64 chars) + number value - // String: ~64 bytes (feed ID) + Number: 8 bytes + Map overhead: ~32 bytes - const estimatedMemoryBytes = watermarkCount * (64 + 8 + 32); - - return { - watermarkCount, - estimatedMemoryBytes, - }; - } - - /** - * Export watermarks for persistence/debugging - */ - exportWatermarks(): Array<{ feedId: string; timestamp: number }> { - return Array.from(this.waterMark.entries()).map(([feedId, timestamp]) => ({ - feedId, - timestamp, - })); - } - - /** - * Import watermarks from external source - */ - importWatermarks(watermarks: Array<{ feedId: string; timestamp: number }>): void { - for (const { feedId, timestamp } of watermarks) { - this.waterMark.set(feedId, timestamp); - } - } } diff --git a/typescript/tests/unit/stream/deduplication.test.ts b/typescript/tests/unit/stream/deduplication.test.ts index 2ca11b5..824c77d 100644 --- a/typescript/tests/unit/stream/deduplication.test.ts +++ b/typescript/tests/unit/stream/deduplication.test.ts @@ -33,16 +33,15 @@ describe("ReportDeduplicator", () => { validFromTimestamp: 900, }; - // First report should be accepted const result1 = deduplicator.processReport(report); expect(result1.isAccepted).toBe(true); expect(result1.isDuplicate).toBe(false); - // Duplicate should be rejected const result2 = deduplicator.processReport(report); expect(result2.isAccepted).toBe(false); expect(result2.isDuplicate).toBe(true); - expect(result2.reason).toContain("watermark"); + expect(result2.isOutOfOrder).toBe(false); + expect(result2.reason).toContain("already seen"); }); it("should reject reports with older timestamps", () => { @@ -60,14 +59,12 @@ describe("ReportDeduplicator", () => { validFromTimestamp: 900, }; - // Accept newer report first const result1 = deduplicator.processReport(newerReport); expect(result1.isAccepted).toBe(true); - // Reject older report const result2 = deduplicator.processReport(olderReport); expect(result2.isAccepted).toBe(false); - expect(result2.isDuplicate).toBe(true); + expect(result2.isOutOfOrder).toBe(true); }); it("should accept reports with newer timestamps", () => { @@ -85,17 +82,71 @@ describe("ReportDeduplicator", () => { validFromTimestamp: 1900, }; - // Accept older report first const result1 = deduplicator.processReport(olderReport); expect(result1.isAccepted).toBe(true); - // Accept newer report const result2 = deduplicator.processReport(newerReport); expect(result2.isAccepted).toBe(true); expect(result2.isDuplicate).toBe(false); }); }); + describe("out-of-order with allowOutOfOrder", () => { + it("should accept out-of-order reports when allowOutOfOrder is true", () => { + const dedup = new ReportDeduplicator({ allowOutOfOrder: true }); + + dedup.processReport({ + feedID: "0x123", + observationsTimestamp: 2000, + fullReport: "newer", + validFromTimestamp: 1900, + }); + + const result = dedup.processReport({ + feedID: "0x123", + observationsTimestamp: 1000, + fullReport: "older", + validFromTimestamp: 900, + }); + + expect(result.isAccepted).toBe(true); + expect(result.isOutOfOrder).toBe(true); + expect(result.isDuplicate).toBe(false); + dedup.stop(); + }); + + it("should distinguish out-of-order from duplicate", () => { + const dedup = new ReportDeduplicator({ allowOutOfOrder: true }); + + dedup.processReport({ + feedID: "0x123", + observationsTimestamp: 200, + fullReport: "r", + validFromTimestamp: 100, + }); + + const ooo = dedup.processReport({ + feedID: "0x123", + observationsTimestamp: 100, + fullReport: "r", + validFromTimestamp: 50, + }); + expect(ooo.isAccepted).toBe(true); + expect(ooo.isOutOfOrder).toBe(true); + + const dup = dedup.processReport({ + feedID: "0x123", + observationsTimestamp: 100, + fullReport: "r", + validFromTimestamp: 50, + }); + expect(dup.isAccepted).toBe(false); + expect(dup.isDuplicate).toBe(true); + expect(dup.isOutOfOrder).toBe(false); + dedup.stop(); + }); + }); + describe("multi-feed handling", () => { it("should handle multiple feeds independently", () => { const report1: ReportMetadata = { @@ -107,19 +158,17 @@ describe("ReportDeduplicator", () => { const report2: ReportMetadata = { feedID: "0x456", - observationsTimestamp: 1000, // Same timestamp, different feed + observationsTimestamp: 1000, fullReport: "report2", validFromTimestamp: 900, }; - // Both should be accepted since they're for different feeds const result1 = deduplicator.processReport(report1); expect(result1.isAccepted).toBe(true); const result2 = deduplicator.processReport(report2); expect(result2.isAccepted).toBe(true); - // Duplicates should be rejected const result3 = deduplicator.processReport(report1); expect(result3.isAccepted).toBe(false); @@ -128,36 +177,27 @@ describe("ReportDeduplicator", () => { }); it("should track watermarks per feed independently", () => { - const feed1Report1: ReportMetadata = { + deduplicator.processReport({ feedID: "0x123", observationsTimestamp: 1000, fullReport: "report1", validFromTimestamp: 900, - }; + }); - const feed2Report1: ReportMetadata = { + deduplicator.processReport({ feedID: "0x456", observationsTimestamp: 2000, fullReport: "report2", validFromTimestamp: 1900, - }; + }); - const feed1Report2: ReportMetadata = { + deduplicator.processReport({ feedID: "0x123", observationsTimestamp: 1500, fullReport: "report3", validFromTimestamp: 1400, - }; - - // Accept initial reports - deduplicator.processReport(feed1Report1); - deduplicator.processReport(feed2Report1); - - // Accept newer report for feed1 - const result = deduplicator.processReport(feed1Report2); - expect(result.isAccepted).toBe(true); + }); - // Verify watermarks are independent expect(deduplicator.getWatermark("0x123")).toBe(1500); expect(deduplicator.getWatermark("0x456")).toBe(2000); }); @@ -169,114 +209,170 @@ describe("ReportDeduplicator", () => { }); it("should update watermarks correctly", () => { - const report: ReportMetadata = { + expect(deduplicator.getWatermark("0x123")).toBeUndefined(); + + deduplicator.processReport({ feedID: "0x123", observationsTimestamp: 1500, fullReport: "report", validFromTimestamp: 1400, - }; - - expect(deduplicator.getWatermark("0x123")).toBeUndefined(); - - deduplicator.processReport(report); + }); expect(deduplicator.getWatermark("0x123")).toBe(1500); }); - it("should not update watermark for rejected reports", () => { - const report1: ReportMetadata = { + it("should not update watermark for out-of-order reports", () => { + deduplicator.processReport({ feedID: "0x123", observationsTimestamp: 2000, fullReport: "report1", validFromTimestamp: 1900, - }; + }); + expect(deduplicator.getWatermark("0x123")).toBe(2000); - const report2: ReportMetadata = { + deduplicator.processReport({ feedID: "0x123", - observationsTimestamp: 1000, // Older + observationsTimestamp: 1000, fullReport: "report2", validFromTimestamp: 900, - }; - - // Accept newer report - deduplicator.processReport(report1); + }); expect(deduplicator.getWatermark("0x123")).toBe(2000); - - // Reject older report - const result = deduplicator.processReport(report2); - expect(result.isAccepted).toBe(false); - expect(deduplicator.getWatermark("0x123")).toBe(2000); // Should remain unchanged }); + }); - it("should allow manual watermark setting", () => { - deduplicator.setWatermark("0x123", 5000); - expect(deduplicator.getWatermark("0x123")).toBe(5000); + describe("FIFO eviction", () => { + it("should evict oldest-inserted entry when buffer is full", () => { + for (let i = 1; i <= 32; i++) { + deduplicator.processReport({ + feedID: "0x123", + observationsTimestamp: i, + fullReport: "r", + validFromTimestamp: i - 1, + }); + } - const report: ReportMetadata = { + // ts=2 (second inserted) should still be in the buffer + const stillPresent = deduplicator.processReport({ feedID: "0x123", - observationsTimestamp: 3000, // Lower than manual watermark - fullReport: "report", - validFromTimestamp: 2900, - }; + observationsTimestamp: 2, + fullReport: "r", + validFromTimestamp: 1, + }); + expect(stillPresent.isDuplicate).toBe(true); - const result = deduplicator.processReport(report); - expect(result.isAccepted).toBe(false); + // Adding ts=33 evicts ts=1 (oldest inserted) + deduplicator.processReport({ + feedID: "0x123", + observationsTimestamp: 33, + fullReport: "r", + validFromTimestamp: 32, + }); + + // ts=1 was evicted, so it's no longer a duplicate + // (calling processReport re-inserts it, which evicts ts=3 as next oldest) + const evicted = deduplicator.processReport({ + feedID: "0x123", + observationsTimestamp: 1, + fullReport: "r", + validFromTimestamp: 0, + }); + expect(evicted.isDuplicate).toBe(false); }); - it("should clear specific watermarks", () => { - deduplicator.setWatermark("0x123", 1000); - deduplicator.setWatermark("0x456", 2000); + it("should evict by insertion order, not by smallest value", () => { + // Insert 100 first, then 1..31 (total 32 entries) + deduplicator.processReport({ + feedID: "0x123", + observationsTimestamp: 100, + fullReport: "r", + validFromTimestamp: 99, + }); + for (let i = 1; i <= 31; i++) { + deduplicator.processReport({ + feedID: "0x123", + observationsTimestamp: i, + fullReport: "r", + validFromTimestamp: i - 1, + }); + } - expect(deduplicator.getWatermark("0x123")).toBe(1000); - expect(deduplicator.getWatermark("0x456")).toBe(2000); + // Add 999 -> should evict 100 (oldest inserted), NOT 1 (smallest value) + deduplicator.processReport({ + feedID: "0x123", + observationsTimestamp: 999, + fullReport: "r", + validFromTimestamp: 998, + }); - const cleared = deduplicator.clearWatermark("0x123"); - expect(cleared).toBe(true); - expect(deduplicator.getWatermark("0x123")).toBeUndefined(); - expect(deduplicator.getWatermark("0x456")).toBe(2000); + // ts=1 should still be present + const smallestStillPresent = deduplicator.processReport({ + feedID: "0x123", + observationsTimestamp: 1, + fullReport: "r", + validFromTimestamp: 0, + }); + expect(smallestStillPresent.isDuplicate).toBe(true); - const alreadyCleared = deduplicator.clearWatermark("0x123"); - expect(alreadyCleared).toBe(false); + // ts=100 should have been evicted + const oldestEvicted = deduplicator.processReport({ + feedID: "0x123", + observationsTimestamp: 100, + fullReport: "r", + validFromTimestamp: 99, + }); + expect(oldestEvicted.isDuplicate).toBe(false); }); + }); - it("should clear all watermarks", () => { - deduplicator.setWatermark("0x123", 1000); - deduplicator.setWatermark("0x456", 2000); + describe("HA duplicate detection", () => { + it("should detect HA duplicate after watermark advance", () => { + deduplicator.processReport({ + feedID: "0x123", + observationsTimestamp: 100, + fullReport: "r", + validFromTimestamp: 99, + }); - deduplicator.clearAllWatermarks(); + deduplicator.processReport({ + feedID: "0x123", + observationsTimestamp: 200, + fullReport: "r", + validFromTimestamp: 199, + }); - expect(deduplicator.getWatermark("0x123")).toBeUndefined(); - expect(deduplicator.getWatermark("0x456")).toBeUndefined(); + // HA duplicate of ts=100 from second connection + const result = deduplicator.processReport({ + feedID: "0x123", + observationsTimestamp: 100, + fullReport: "r", + validFromTimestamp: 99, + }); + expect(result.isDuplicate).toBe(true); + expect(result.isOutOfOrder).toBe(false); + expect(result.isAccepted).toBe(false); }); }); describe("statistics tracking", () => { it("should track statistics correctly", () => { - const report1: ReportMetadata = { + deduplicator.processReport({ feedID: "0x123", observationsTimestamp: 1000, fullReport: "report1", validFromTimestamp: 900, - }; - - const report2: ReportMetadata = { + }); + deduplicator.processReport({ feedID: "0x123", - observationsTimestamp: 1000, // Duplicate + observationsTimestamp: 1000, fullReport: "report2", validFromTimestamp: 900, - }; - - const report3: ReportMetadata = { + }); + deduplicator.processReport({ feedID: "0x456", observationsTimestamp: 2000, fullReport: "report3", validFromTimestamp: 1900, - }; - - // Process reports - deduplicator.processReport(report1); // Accepted - deduplicator.processReport(report2); // Deduplicated - deduplicator.processReport(report3); // Accepted + }); const stats = deduplicator.getStats(); expect(stats.accepted).toBe(2); @@ -286,15 +382,18 @@ describe("ReportDeduplicator", () => { }); it("should reset statistics", () => { - const report: ReportMetadata = { + deduplicator.processReport({ feedID: "0x123", observationsTimestamp: 1000, fullReport: "report", validFromTimestamp: 900, - }; - - deduplicator.processReport(report); - deduplicator.processReport(report); // Duplicate + }); + deduplicator.processReport({ + feedID: "0x123", + observationsTimestamp: 1000, + fullReport: "report", + validFromTimestamp: 900, + }); let stats = deduplicator.getStats(); expect(stats.accepted).toBe(1); @@ -312,28 +411,23 @@ describe("ReportDeduplicator", () => { describe("memory management", () => { it("should handle large numbers of feeds efficiently", () => { - const feedCount = 1000; // Reduced for test performance + const feedCount = 1000; const feeds: string[] = []; - // Generate many unique feed IDs for (let i = 0; i < feedCount; i++) { feeds.push(`0x${i.toString(16).padStart(64, "0")}`); } - // Add reports for all feeds feeds.forEach((feedID, index) => { - const report: ReportMetadata = { + const result = deduplicator.processReport({ feedID, observationsTimestamp: index + 1000, fullReport: `report-${index}`, validFromTimestamp: index + 900, - }; - - const result = deduplicator.processReport(report); + }); expect(result.isAccepted).toBe(true); }); - // Verify all watermarks are set correctly feeds.forEach((feedID, index) => { expect(deduplicator.getWatermark(feedID)).toBe(index + 1000); }); @@ -341,167 +435,64 @@ describe("ReportDeduplicator", () => { const stats = deduplicator.getStats(); expect(stats.watermarkCount).toBe(feedCount); }); - - it("should provide memory usage information", () => { - const report: ReportMetadata = { - feedID: "0x123", - observationsTimestamp: 1000, - fullReport: "report", - validFromTimestamp: 900, - }; - - deduplicator.processReport(report); - - const memoryInfo = deduplicator.getMemoryInfo(); - expect(memoryInfo.watermarkCount).toBe(1); - expect(memoryInfo.estimatedMemoryBytes).toBeGreaterThan(0); - }); }); describe("edge cases", () => { it("should handle zero timestamp", () => { - const report: ReportMetadata = { + const result = deduplicator.processReport({ feedID: "0x123", observationsTimestamp: 0, fullReport: "report", validFromTimestamp: 0, - }; - - const result = deduplicator.processReport(report); + }); expect(result.isAccepted).toBe(true); expect(deduplicator.getWatermark("0x123")).toBe(0); - // Should reject duplicate with same zero timestamp - const result2 = deduplicator.processReport(report); + const result2 = deduplicator.processReport({ + feedID: "0x123", + observationsTimestamp: 0, + fullReport: "report", + validFromTimestamp: 0, + }); expect(result2.isAccepted).toBe(false); }); it("should handle very large timestamps", () => { const largeTimestamp = Number.MAX_SAFE_INTEGER; - const report: ReportMetadata = { + const result = deduplicator.processReport({ feedID: "0x123", observationsTimestamp: largeTimestamp, fullReport: "report", validFromTimestamp: largeTimestamp - 1, - }; - - const result = deduplicator.processReport(report); + }); expect(result.isAccepted).toBe(true); expect(deduplicator.getWatermark("0x123")).toBe(largeTimestamp); }); it("should handle empty feed ID", () => { - const report: ReportMetadata = { + const result = deduplicator.processReport({ feedID: "", observationsTimestamp: 1000, fullReport: "report", validFromTimestamp: 900, - }; - - const result = deduplicator.processReport(report); + }); expect(result.isAccepted).toBe(true); expect(deduplicator.getWatermark("")).toBe(1000); }); it("should handle special characters in feed ID", () => { const specialFeedId = "0x!@#$%^&*()_+-=[]{}|;:,.<>?"; - const report: ReportMetadata = { + const result = deduplicator.processReport({ feedID: specialFeedId, observationsTimestamp: 1000, fullReport: "report", validFromTimestamp: 900, - }; - - const result = deduplicator.processReport(report); + }); expect(result.isAccepted).toBe(true); expect(deduplicator.getWatermark(specialFeedId)).toBe(1000); }); }); - describe("export/import functionality", () => { - it("should export watermarks correctly", () => { - const reports = [ - { - feedID: "0x123", - observationsTimestamp: 1000, - fullReport: "report1", - validFromTimestamp: 900, - }, - { - feedID: "0x456", - observationsTimestamp: 2000, - fullReport: "report2", - validFromTimestamp: 1900, - }, - ]; - - reports.forEach(report => { - deduplicator.processReport(report as ReportMetadata); - }); - - const exported = deduplicator.exportWatermarks(); - expect(exported).toHaveLength(2); - expect(exported).toContainEqual({ feedId: "0x123", timestamp: 1000 }); - expect(exported).toContainEqual({ feedId: "0x456", timestamp: 2000 }); - }); - - it("should import watermarks correctly", () => { - const watermarks = [ - { feedId: "0x123", timestamp: 1500 }, - { feedId: "0x456", timestamp: 2500 }, - { feedId: "0x789", timestamp: 3500 }, - ]; - - deduplicator.importWatermarks(watermarks); - - expect(deduplicator.getWatermark("0x123")).toBe(1500); - expect(deduplicator.getWatermark("0x456")).toBe(2500); - expect(deduplicator.getWatermark("0x789")).toBe(3500); - - const stats = deduplicator.getStats(); - expect(stats.watermarkCount).toBe(3); - }); - - it("should handle empty export", () => { - const exported = deduplicator.exportWatermarks(); - expect(exported).toEqual([]); - }); - - it("should handle empty import", () => { - deduplicator.importWatermarks([]); - const stats = deduplicator.getStats(); - expect(stats.watermarkCount).toBe(0); - }); - - it("should overwrite existing watermarks on import", () => { - // Set initial watermark - deduplicator.setWatermark("0x123", 1000); - expect(deduplicator.getWatermark("0x123")).toBe(1000); - - // Import should overwrite - deduplicator.importWatermarks([{ feedId: "0x123", timestamp: 2000 }]); - expect(deduplicator.getWatermark("0x123")).toBe(2000); - }); - }); - - describe("watermark access", () => { - it("should get all watermarks", () => { - deduplicator.setWatermark("0x123", 1000); - deduplicator.setWatermark("0x456", 2000); - - const allWatermarks = deduplicator.getAllWatermarks(); - expect(allWatermarks).toEqual({ - "0x123": 1000, - "0x456": 2000, - }); - }); - - it("should return empty object when no watermarks exist", () => { - const allWatermarks = deduplicator.getAllWatermarks(); - expect(allWatermarks).toEqual({}); - }); - }); - describe("cleanup functionality", () => { it("should initialize with cleanup enabled", () => { const dedup = new ReportDeduplicator({ From 00916e8590101fedb5492e3896174ab6a33eb66b Mon Sep 17 00:00:00 2001 From: cal Date: Fri, 10 Apr 2026 13:32:34 +1000 Subject: [PATCH 3/3] add todo comment for ms support in s sdk --- typescript/src/stream/deduplication.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/typescript/src/stream/deduplication.ts b/typescript/src/stream/deduplication.ts index 0060e1a..97838e8 100644 --- a/typescript/src/stream/deduplication.ts +++ b/typescript/src/stream/deduplication.ts @@ -165,6 +165,8 @@ export class ReportDeduplicator { // Clean up old watermarks to keep memory usage under control. private cleanupOldWatermarks(): void { const now = Date.now(); + // todo: this assumes that the timestamp is in seconds, introduce millisecond support when sdk is + // updated to handle millisecond timestamps. const cutoffTimestamp = Math.floor((now - this.maxWatermarkAge) / 1000); for (const [feedId, state] of this.feedState) {