Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ type Config struct {
WsURL string // Websocket Api url

WsHA bool // Use concurrent connections to multiple Streams servers
WsAllowOutOfOrder bool // Allow out-of-order reports through while still deduplicating HA duplicates
WsMaxReconnect int // Maximum number of reconnection attempts for Stream underlying connections
LogDebug bool // Log debug information
InsecureSkipVerify bool // Skip server certificate chain and host name verification
Expand Down Expand Up @@ -292,6 +293,7 @@ func (r *ReportResponse) UnmarshalJSON(b []byte) (err error)
type Stats struct {
Accepted uint64 // Total number of accepted reports
Deduplicated uint64 // Total number of deduplicated reports when in HA
OutOfOrder uint64 // Total number of out-of-order reports seen
TotalReceived uint64 // Total number of received reports
PartialReconnects uint64 // Total number of partial reconnects when in HA
FullReconnects uint64 // Total number of full reconnects
Expand Down
63 changes: 46 additions & 17 deletions go/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"strings"
"time"

"github.com/smartcontractkit/data-streams-sdk/go/feed"
"github.com/smartcontractkit/data-streams-sdk/go/v2/feed"
)

// Client is the data streams client interface.
Expand Down Expand Up @@ -151,25 +151,41 @@ func (c *client) GetLatestReport(ctx context.Context, id feed.ID) (r *ReportResp
// ReportResponse implements the report envelope that contains the full report payload,
// its FeedID and timestamps. For decoding the Report Payload use report.Decode().
type ReportResponse struct {
FeedID feed.ID `json:"feedID"`
FullReport []byte `json:"fullReport"`
ValidFromTimestamp uint64 `json:"validFromTimestamp"`
ObservationsTimestamp uint64 `json:"observationsTimestamp"`
FeedID feed.ID
FullReport []byte
ValidFromTimestamp time.Time
ObservationsTimestamp time.Time
}

func (r *ReportResponse) UnmarshalJSON(b []byte) (err error) {
type Alias ReportResponse
aux := &struct {
FullReport string `json:"fullReport"`
*Alias
}{
Alias: (*Alias)(r),
}
FeedID feed.ID `json:"feedID"`
FullReport string `json:"fullReport"`
ValidFromTimestamp uint64 `json:"validFromTimestamp"`
ObservationsTimestamp uint64 `json:"observationsTimestamp"`
ValidFromTimestampMs uint64 `json:"validFromTimestampMs"`
ObservationsTimestampMs uint64 `json:"observationsTimestampMs"`
}{}

if err := json.Unmarshal(b, aux); err != nil {
return err
}

r.FeedID = aux.FeedID

// V2 payloads use milliseconds, V1 payloads use seconds
if aux.ValidFromTimestampMs > 0 {
r.ValidFromTimestamp = time.UnixMilli(int64(aux.ValidFromTimestampMs))
} else if aux.ValidFromTimestamp > 0 {
r.ValidFromTimestamp = time.Unix(int64(aux.ValidFromTimestamp), 0)
}

if aux.ObservationsTimestampMs > 0 {
r.ObservationsTimestamp = time.UnixMilli(int64(aux.ObservationsTimestampMs))
} else if aux.ObservationsTimestamp > 0 {
r.ObservationsTimestamp = time.Unix(int64(aux.ObservationsTimestamp), 0)
}

if len(aux.FullReport) < 3 {
return nil
}
Expand All @@ -182,13 +198,26 @@ func (r *ReportResponse) UnmarshalJSON(b []byte) (err error) {
}

func (r *ReportResponse) MarshalJSON() ([]byte, error) {
type Alias ReportResponse
var validFrom, observationsTS uint64

// Wrapper timestamps are always in milliseconds
if !r.ValidFromTimestamp.IsZero() {
validFrom = uint64(r.ValidFromTimestamp.UnixMilli())
}
if !r.ObservationsTimestamp.IsZero() {
observationsTS = uint64(r.ObservationsTimestamp.UnixMilli())
}

return json.Marshal(&struct {
FullReport string `json:"fullReport"`
*Alias
FeedID feed.ID `json:"feedID"`
FullReport string `json:"fullReport"`
ValidFromTimestampMs uint64 `json:"validFromTimestampMs"`
ObservationsTimestampMs uint64 `json:"observationsTimestampMs"`
}{
FullReport: "0x" + hex.EncodeToString(r.FullReport),
Alias: (*Alias)(r),
FeedID: r.FeedID,
FullReport: "0x" + hex.EncodeToString(r.FullReport),
ValidFromTimestampMs: validFrom,
ObservationsTimestampMs: observationsTS,
})
}

Expand Down Expand Up @@ -243,7 +272,7 @@ func (c *client) GetReportPage(ctx context.Context, id feed.ID, pageTS uint64) (
}
r.NextPageTS = 0
if len(r.Reports) > 0 {
r.NextPageTS = r.Reports[len(r.Reports)-1].ObservationsTimestamp + 1
r.NextPageTS = uint64(r.Reports[len(r.Reports)-1].ObservationsTimestamp.Unix()) + 1
}
return r, err
}
Expand Down
69 changes: 56 additions & 13 deletions go/client_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package streams

import (
"bytes"
"context"
"encoding/json"
"fmt"
Expand All @@ -10,11 +11,53 @@ import (
"strconv"
"strings"
"testing"
"time"

"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/smartcontractkit/data-streams-sdk/go/feed"
"github.com/smartcontractkit/data-streams-sdk/go/v2/feed"
)

// reportResponseEqual compares two ReportResponse structs, handling time.Time comparison properly
func reportResponseEqual(a, b *ReportResponse) bool {
if a.FeedID != b.FeedID {
return false
}
if !bytes.Equal(a.FullReport, b.FullReport) {
return false
}
if !a.ObservationsTimestamp.Equal(b.ObservationsTimestamp) {
return false
}
if !a.ValidFromTimestamp.Equal(b.ValidFromTimestamp) {
return false
}
return true
}

// reportResponsesEqual compares slices of ReportResponse
func reportResponsesEqual(a, b []*ReportResponse) bool {
if len(a) != len(b) {
return false
}
for i := range a {
if !reportResponseEqual(a[i], b[i]) {
return false
}
}
return true
}

// reportPageEqual compares two ReportPage structs
func reportPageEqual(a, b *ReportPage) bool {
if !reportResponsesEqual(a.Reports, b.Reports) {
return false
}
if a.NextPageTS != b.NextPageTS {
return false
}
return true
}

func mustFeedIDfromString(s string) (f feed.ID) {
err := f.FromString(s)
if err != nil {
Expand Down Expand Up @@ -68,8 +111,8 @@ func TestClient_GetFeeds(t *testing.T) {

func TestClient_GetReports(t *testing.T) {
expectedReports := []*ReportResponse{
{FeedID: feed1, ObservationsTimestamp: 12344},
{FeedID: feed2, ObservationsTimestamp: 12344},
{FeedID: feed1, ObservationsTimestamp: time.Unix(12344, 0)},
{FeedID: feed2, ObservationsTimestamp: time.Unix(12344, 0)},
}
expectedFeedIdListStr := fmt.Sprintf("%s,%s", feed1.String(), feed2.String())

Expand Down Expand Up @@ -111,7 +154,7 @@ func TestClient_GetReports(t *testing.T) {

fmt.Println(expectedReports[0], reports[0])

if !reflect.DeepEqual(reports, expectedReports) {
if !reportResponsesEqual(reports, expectedReports) {
t.Errorf("GetFeeds() = %v, want %v", reports, expectedReports)
}
}
Expand Down Expand Up @@ -155,7 +198,7 @@ func TestClient_GetLatestReport(t *testing.T) {
t.Fatalf("GetLatestReport() error = %v", err)
}

if !reflect.DeepEqual(report, expectedReport) {
if !reportResponseEqual(report, expectedReport) {
t.Errorf("GetLatestReport() = %v, want %v", report, expectedReport)
}
}
Expand All @@ -165,18 +208,18 @@ func TestClient_GetReportPage(t *testing.T) {

expectedReportPage1 := &ReportPage{
Reports: []*ReportResponse{
{FeedID: feed1, ObservationsTimestamp: 1234567890, FullReport: hexutil.Bytes(`report1 payload`)},
{FeedID: feed1, ObservationsTimestamp: 1234567891, FullReport: hexutil.Bytes(`report2 payload`)},
{FeedID: feed1, FullReport: hexutil.Bytes(`report1 payload`), ObservationsTimestamp: time.Unix(1234567897, 0)},
{FeedID: feed1, FullReport: hexutil.Bytes(`report2 payload`), ObservationsTimestamp: time.Unix(1234567898, 0)},
},
NextPageTS: 1234567892,
NextPageTS: 1234567899, // Last ObservationsTimestamp (1234567898) + 1
}

expectedReportPage2 := &ReportPage{
Reports: []*ReportResponse{
{FeedID: feed1, ObservationsTimestamp: 1234567892, FullReport: hexutil.Bytes(`report3 payload`)},
{FeedID: feed1, ObservationsTimestamp: 1234567893, FullReport: hexutil.Bytes(`report4 payload`)},
{FeedID: feed1, FullReport: hexutil.Bytes(`report3 payload`), ObservationsTimestamp: time.Unix(1234567997, 0)},
{FeedID: feed1, FullReport: hexutil.Bytes(`report4 payload`), ObservationsTimestamp: time.Unix(1234567998, 0)},
},
NextPageTS: 1234567894,
NextPageTS: 1234567999, // Last ObservationsTimestamp (1234567998) + 1
}

ms := newMockServer(func(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -230,7 +273,7 @@ func TestClient_GetReportPage(t *testing.T) {
t.Fatalf("GetReportPage() error = %v", err)
}

if !reflect.DeepEqual(reportPage, expectedReportPage1) {
if !reportPageEqual(reportPage, expectedReportPage1) {
t.Errorf("GetReportPage() = %v, want %v", reportPage, expectedReportPage1)
}

Expand All @@ -239,7 +282,7 @@ func TestClient_GetReportPage(t *testing.T) {
t.Fatalf("GetReportPage() error = %v", err)
}

if !reflect.DeepEqual(reportPage, expectedReportPage2) {
if !reportPageEqual(reportPage, expectedReportPage2) {
t.Errorf("GetReportPage() = %v, want %v", reportPage, expectedReportPage2)
}
}
Expand Down
1 change: 1 addition & 0 deletions go/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type Config struct {
WsURL string // Websocket Api url
wsURL *url.URL // Websocket Api url
WsHA bool // Use concurrent connections to multiple Streams servers
WsAllowOutOfOrder bool // Allow out-of-order reports through while still deduplicating HA duplicates
WsMaxReconnect int // Maximum number of reconnection attempts for Stream underlying connections
LogDebug bool // Log debug information
InsecureSkipVerify bool // Skip server certificate chain and host name verification
Expand Down
66 changes: 66 additions & 0 deletions go/dedup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package streams

import "sync"

const seenBufferSize = 32

type Verdict int

const (
Accept Verdict = iota
Duplicate
OutOfOrder
)

type feedState struct {
watermark int64
ring [seenBufferSize]int64
set map[int64]struct{}
cursor int
count int
}

type FeedDeduplicator struct {
mu sync.Mutex
feeds map[string]*feedState
}

func NewFeedDeduplicator() *FeedDeduplicator {
return &FeedDeduplicator{feeds: make(map[string]*feedState)}
}

func (d *FeedDeduplicator) Check(feedID string, ts int64) Verdict {
d.mu.Lock()
defer d.mu.Unlock()

fs := d.feeds[feedID]
if fs == nil {
fs = &feedState{set: make(map[int64]struct{}, seenBufferSize)}
d.feeds[feedID] = fs
}

if _, dup := fs.set[ts]; dup {
return Duplicate
}

if fs.count == seenBufferSize {
evict := fs.ring[fs.cursor]
delete(fs.set, evict)
} else {
fs.count++
}
fs.ring[fs.cursor] = ts
fs.set[ts] = struct{}{}
fs.cursor = (fs.cursor + 1) % seenBufferSize

isOutOfOrder := fs.watermark > 0 && ts < fs.watermark
if isOutOfOrder {
return OutOfOrder
}

if ts > fs.watermark {
fs.watermark = ts
}

return Accept
}
Loading
Loading