Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ type Config struct {
WsURL string // Websocket Api url

WsHA bool // Use concurrent connections to multiple Streams servers
WsAllowOutOfOrder bool // Allow out-of-order reports through while still deduplicating HA duplicates
WsMaxReconnect int // Maximum number of reconnection attempts for Stream underlying connections
LogDebug bool // Log debug information
InsecureSkipVerify bool // Skip server certificate chain and host name verification
Expand Down Expand Up @@ -292,6 +293,7 @@ func (r *ReportResponse) UnmarshalJSON(b []byte) (err error)
type Stats struct {
Accepted uint64 // Total number of accepted reports
Deduplicated uint64 // Total number of deduplicated reports when in HA
OutOfOrder uint64 // Total number of out-of-order reports seen
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice touch!

TotalReceived uint64 // Total number of received reports
PartialReconnects uint64 // Total number of partial reconnects when in HA
FullReconnects uint64 // Total number of full reconnects
Expand Down
1 change: 1 addition & 0 deletions go/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type Config struct {
WsURL string // Websocket Api url
wsURL *url.URL // Websocket Api url
WsHA bool // Use concurrent connections to multiple Streams servers
WsAllowOutOfOrder bool // Allow out-of-order reports through while still deduplicating HA duplicates
WsMaxReconnect int // Maximum number of reconnection attempts for Stream underlying connections
LogDebug bool // Log debug information
InsecureSkipVerify bool // Skip server certificate chain and host name verification
Expand Down
59 changes: 59 additions & 0 deletions go/dedup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package streams

// seenBufferSize is the capacity of each feed's ring of recently recorded
// timestamps. Once a feed holds this many entries, recording another
// timestamp evicts the one that was inserted earliest.
const seenBufferSize = 32

// Verdict is the classification FeedDeduplicator.Check returns for a single
// (feed ID, timestamp) pair.
type Verdict int

// Verdict values returned by FeedDeduplicator.Check.
const (
// Accept means the timestamp is not in the feed's recent-seen buffer and is
// not behind the feed's watermark.
Accept Verdict = iota
// Duplicate means the timestamp is still present in the feed's recent-seen
// buffer.
Duplicate
// OutOfOrder means the timestamp is not in the recent-seen buffer but is
// older than the feed's watermark.
OutOfOrder
)

// feedState is the per-feed bookkeeping that FeedDeduplicator.Check reads and
// updates.
type feedState struct {
// watermark is the timestamp Check compares incoming reports against to
// detect out-of-order delivery.
watermark int64
// ring stores recorded timestamps in insertion order; slots are reused
// circularly once all seenBufferSize of them are occupied.
ring [seenBufferSize]int64
// set holds the timestamps currently stored in ring, for membership lookups.
set map[int64]struct{}
// cursor is the ring index the next recorded timestamp is written to.
cursor int
// count is the number of occupied ring slots; it stops growing at
// seenBufferSize.
count int
}

// FeedDeduplicator classifies report timestamps per feed as new, duplicate,
// or out of order. Check does no locking of its own, so callers that share
// one instance across goroutines need to serialize their calls.
type FeedDeduplicator struct {
// feeds maps a feed ID to that feed's state; Check adds an entry the first
// time it sees a feed ID.
feeds map[string]*feedState
}

// NewFeedDeduplicator returns a deduplicator that is not yet tracking any
// feed.
func NewFeedDeduplicator() *FeedDeduplicator {
	dedup := new(FeedDeduplicator)
	dedup.feeds = map[string]*feedState{}
	return dedup
}

// Check records ts for feedID and classifies it.
//
// It returns Duplicate when ts is still among the feed's most recently
// recorded timestamps (at most seenBufferSize of them); such a call changes no
// state. Otherwise ts is recorded, evicting the oldest-inserted timestamp once
// the buffer is full, and the result is OutOfOrder when ts is older than the
// feed's watermark and Accept when it is not. The watermark is the newest
// timestamp recorded for the feed so far.
//
// The very first report of a feed is always Accept and seeds the watermark,
// whatever its value, so feeds whose timestamps are zero or negative are
// ordered the same way as any other feed.
//
// The zero FeedDeduplicator is usable: the feed map is allocated on first use.
// Check does no locking; callers sharing a deduplicator across goroutines must
// serialize calls.
func (d *FeedDeduplicator) Check(feedID string, ts int64) Verdict {
	// Writing to a nil map panics, so make the zero value behave like the
	// result of NewFeedDeduplicator.
	if d.feeds == nil {
		d.feeds = make(map[string]*feedState)
	}

	fs := d.feeds[feedID]
	if fs == nil {
		fs = &feedState{set: make(map[int64]struct{}, seenBufferSize)}
		d.feeds[feedID] = fs
	}

	if _, dup := fs.set[ts]; dup {
		return Duplicate
	}

	// An empty ring means nothing has been recorded for this feed yet. This is
	// captured before the insert below bumps count, and replaces the old
	// "watermark > 0" test, which mistook a zero watermark for "unset".
	first := fs.count == 0

	if fs.count == seenBufferSize {
		// Ring is full: the slot under the cursor holds the oldest insertion.
		delete(fs.set, fs.ring[fs.cursor])
	} else {
		fs.count++
	}
	fs.ring[fs.cursor] = ts
	fs.set[ts] = struct{}{}
	fs.cursor = (fs.cursor + 1) % seenBufferSize

	switch {
	case first, ts > fs.watermark:
		fs.watermark = ts
	case ts < fs.watermark:
		return OutOfOrder
	}
	// ts == watermark can reach here only after that timestamp was evicted
	// from the ring; it is accepted, as before.
	return Accept
}
109 changes: 109 additions & 0 deletions go/dedup_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package streams

import "testing"

// TestFeedDeduplicator_Accept checks that the first report seen for a feed is
// classified as Accept.
func TestFeedDeduplicator_Accept(t *testing.T) {
	dedup := NewFeedDeduplicator()

	got := dedup.Check("feed-1", 100)
	if got == Accept {
		return
	}
	t.Fatalf("expected Accept, got %d", got)
}

// TestFeedDeduplicator_Duplicate checks that repeating a timestamp on the same
// feed is classified as Duplicate.
func TestFeedDeduplicator_Duplicate(t *testing.T) {
	const (
		feed = "feed-1"
		ts   = int64(100)
	)
	dedup := NewFeedDeduplicator()
	dedup.Check(feed, ts)

	got := dedup.Check(feed, ts)
	if got == Duplicate {
		return
	}
	t.Fatalf("expected Duplicate, got %d", got)
}

// TestFeedDeduplicator_OutOfOrder checks that a previously unseen timestamp
// older than one already recorded is classified as OutOfOrder.
func TestFeedDeduplicator_OutOfOrder(t *testing.T) {
	const feed = "feed-1"
	dedup := NewFeedDeduplicator()
	dedup.Check(feed, 200)

	got := dedup.Check(feed, 100)
	if got == OutOfOrder {
		return
	}
	t.Fatalf("expected OutOfOrder, got %d", got)
}

// TestFeedDeduplicator_OutOfOrderNotDuplicate checks that a late timestamp is
// OutOfOrder the first time it is delivered and Duplicate the second time.
func TestFeedDeduplicator_OutOfOrderNotDuplicate(t *testing.T) {
	const feed = "feed-1"
	dedup := NewFeedDeduplicator()
	dedup.Check(feed, 200)

	first := dedup.Check(feed, 100)
	if first != OutOfOrder {
		t.Fatalf("expected OutOfOrder for first OOO delivery, got %d", first)
	}

	second := dedup.Check(feed, 100)
	if second != Duplicate {
		t.Fatalf("expected Duplicate for second OOO delivery, got %d", second)
	}
}

// TestFeedDeduplicator_FIFOEviction checks that overflowing a full seen-buffer
// by one entry evicts only the first-inserted timestamp.
//
// The overflow timestamp is derived from seenBufferSize rather than
// hard-coded, so the test stays valid if the buffer size changes (it needs
// seenBufferSize >= 2).
func TestFeedDeduplicator_FIFOEviction(t *testing.T) {
	d := NewFeedDeduplicator()
	// Fill the buffer exactly with ts = 1..seenBufferSize.
	for ts := int64(1); ts <= seenBufferSize; ts++ {
		d.Check("feed-1", ts)
	}
	// One more distinct timestamp overflows the buffer.
	d.Check("feed-1", seenBufferSize+1)
	// ts=2 (second inserted) should still be in the buffer
	if v := d.Check("feed-1", 2); v != Duplicate {
		t.Fatalf("expected ts=2 still in buffer, got %d", v)
	}
	// ts=1 (first inserted) was evicted by the overflow entry
	if v := d.Check("feed-1", 1); v == Duplicate {
		t.Fatal("expected ts=1 to be evicted (FIFO), but got Duplicate")
	}
}

// TestFeedDeduplicator_FIFOEvictsOldestInsertedNotSmallest checks that
// eviction follows insertion order, not timestamp order.
func TestFeedDeduplicator_FIFOEvictsOldestInsertedNotSmallest(t *testing.T) {
	const feed = "feed-1"
	dedup := NewFeedDeduplicator()

	// Fill the buffer with 100 first, then 1, 2, ..., seenBufferSize-1.
	dedup.Check(feed, 100)
	for ts := int64(1); ts < seenBufferSize; ts++ {
		dedup.Check(feed, ts)
	}

	// The buffer is now full and 100 is its oldest insertion, so one more
	// entry must push out 100 rather than 1 (the smallest value).
	dedup.Check(feed, 999)

	// The smallest value must have survived the eviction.
	smallest := dedup.Check(feed, 1)
	if smallest != Duplicate {
		t.Fatalf("expected ts=1 (smallest value, but not oldest inserted) to remain, got %d", smallest)
	}

	// The oldest insertion must be gone.
	oldest := dedup.Check(feed, 100)
	if oldest == Duplicate {
		t.Fatal("expected ts=100 (oldest inserted) to be evicted, but got Duplicate")
	}
}

// TestFeedDeduplicator_IndependentFeeds checks that seen-timestamps are
// tracked per feed ID and never shared between feeds.
func TestFeedDeduplicator_IndependentFeeds(t *testing.T) {
	const ts = int64(100)
	dedup := NewFeedDeduplicator()
	dedup.Check("feed-a", ts)
	dedup.Check("feed-b", ts)

	gotA := dedup.Check("feed-a", ts)
	if gotA != Duplicate {
		t.Fatalf("expected Duplicate for feed-a, got %d", gotA)
	}

	gotB := dedup.Check("feed-b", ts)
	if gotB != Duplicate {
		t.Fatalf("expected Duplicate for feed-b, got %d", gotB)
	}

	// A feed that has never been seen accepts the same timestamp.
	gotC := dedup.Check("feed-c", ts)
	if gotC != Accept {
		t.Fatalf("expected Accept for new feed-c, got %d", gotC)
	}
}

// TestFeedDeduplicator_WatermarkZeroNotOutOfOrder checks that a feed's first
// report is Accept, not OutOfOrder, even when its timestamp is 0.
func TestFeedDeduplicator_WatermarkZeroNotOutOfOrder(t *testing.T) {
	dedup := NewFeedDeduplicator()

	got := dedup.Check("feed-1", 0)
	if got == Accept {
		return
	}
	t.Fatalf("expected Accept for first report at ts=0, got %d", got)
}

// TestFeedDeduplicator_HADuplicateAfterWatermarkAdvance checks that a repeated
// timestamp is still Duplicate (not OutOfOrder) after a newer timestamp has
// moved the watermark past it.
func TestFeedDeduplicator_HADuplicateAfterWatermarkAdvance(t *testing.T) {
	const feed = "feed-1"
	dedup := NewFeedDeduplicator()

	// Two in-order reports; the second moves the watermark to 200.
	for _, ts := range []int64{100, 200} {
		dedup.Check(feed, ts)
	}

	// The same ts=100 report arrives again, as it would from a second HA
	// connection.
	got := dedup.Check(feed, 100)
	if got == Duplicate {
		return
	}
	t.Fatalf("expected Duplicate for HA retransmit, got %d", got)
}
32 changes: 21 additions & 11 deletions go/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type Stream interface {
type Stats struct {
Accepted uint64 // Total number of accepted reports
Deduplicated uint64 // Total number of deduplicated reports when in HA
OutOfOrder uint64 // Total number of out-of-order reports seen
TotalReceived uint64 // Total number of received reports
PartialReconnects uint64 // Total number of partial reconnects when in HA
FullReconnects uint64 // Total number of full reconnects
Expand All @@ -63,8 +64,8 @@ type Stats struct {

func (s Stats) String() (st string) {
return fmt.Sprintf(
"accepted: %d, deduplicated: %d, total_received %d, partial_reconnects: %d, full_reconnects: %d, configured_connections: %d, active_connections %d",
s.Accepted, s.Deduplicated,
"accepted: %d, deduplicated: %d, out_of_order: %d, total_received %d, partial_reconnects: %d, full_reconnects: %d, configured_connections: %d, active_connections %d",
s.Accepted, s.Deduplicated, s.OutOfOrder,
s.TotalReceived, s.PartialReconnects,
s.FullReconnects, s.ConfiguredConnections, s.ActiveConnections,
)
Expand All @@ -82,12 +83,13 @@ type stream struct {
closeError atomic.Value
connStatusCallback func(isConneccted bool, host string, origin string)

waterMarkMu sync.Mutex
waterMark map[string]time.Time
feedsMu sync.Mutex
dedup *FeedDeduplicator

stats struct {
accepted atomic.Uint64
skipped atomic.Uint64
outOfOrder atomic.Uint64
partialReconnects atomic.Uint64
fullReconnects atomic.Uint64
activeConnections atomic.Uint64
Expand All @@ -107,7 +109,7 @@ func (c *client) newStream(ctx context.Context, httpClient *http.Client, feedIDs
config: c.config,
output: make(chan *ReportResponse, 1),
feedIDs: feedIDs,
waterMark: make(map[string]time.Time),
dedup: NewFeedDeduplicator(),
streamCtx: streamCtx,
streamCtxCancel: streamCtxCancel,
}
Expand Down Expand Up @@ -310,6 +312,7 @@ func (s *stream) newWSconnWithRetry(origin string) (conn *wsConn, err error) {
func (s *stream) Stats() (st Stats) {
st.Accepted = s.stats.accepted.Load()
st.Deduplicated = s.stats.skipped.Load()
st.OutOfOrder = s.stats.outOfOrder.Load()
st.TotalReceived = st.Accepted + st.Deduplicated
st.PartialReconnects = s.stats.partialReconnects.Load()
st.FullReconnects = s.stats.fullReconnects.Load()
Expand Down Expand Up @@ -359,18 +362,25 @@ func (s *stream) Close() (err error) {

func (s *stream) accept(ctx context.Context, m *message) (err error) {
id := m.Report.FeedID.String()
ts := m.Report.ObservationsTimestamp.UnixMilli()

s.waterMarkMu.Lock()
// Skip older reports and reports with the same timestamp
if !m.Report.ObservationsTimestamp.After(s.waterMark[id]) {
s.feedsMu.Lock()
verdict := s.dedup.Check(id, ts)
s.feedsMu.Unlock()

switch verdict {
case Duplicate:
s.stats.skipped.Add(1)
s.waterMarkMu.Unlock()
return nil
case OutOfOrder:
s.stats.outOfOrder.Add(1)
if !s.config.WsAllowOutOfOrder {
s.stats.skipped.Add(1)
return nil
}
}

s.stats.accepted.Add(1)
s.waterMark[id] = m.Report.ObservationsTimestamp
s.waterMarkMu.Unlock()

select {
case <-ctx.Done():
Expand Down
Loading
Loading