55 "context"
66 "errors"
77 "fmt"
8+ "sort"
89 "sync"
910
1011 "github.com/rs/zerolog"
@@ -22,7 +23,7 @@ import (
2223// DARetriever defines the interface for retrieving events from the DA layer
2324type DARetriever interface {
2425 RetrieveFromDA (ctx context.Context , daHeight uint64 ) ([]common.DAHeightEvent , error )
25- Subscribe (ctx context.Context ) ( <- chan common.DAHeightEvent , error )
26+ Subscribe (ctx context.Context , ch chan common.DAHeightEvent ) error
2627}
2728
2829// daRetriever handles DA retrieval operations for syncing
@@ -78,65 +79,68 @@ func (r *daRetriever) RetrieveFromDA(ctx context.Context, daHeight uint64) ([]co
7879 return r .processBlobs (ctx , blobsResp .Data , daHeight ), nil
7980}
8081
81- // Subscribe subscribes to specific DA namespace and returns a channel of height events
82- func (r * daRetriever ) Subscribe (ctx context.Context ) (<- chan common.DAHeightEvent , error ) {
83- // Subscribe to header namespace
84- // Note: We currently only subscribe to header namespace.
85- // If header and data namespaces are different, we might need to subscribe to both or fetch data on demand.
86- // For now, assuming header subscription is sufficient to trigger catchup or process full blocks if namespaces are same.
87- // Actually, if we follow the "Subscribe" API from recent changes, we likely want to subscribe to *headers*.
88- // However, processBlobs expects blobs.
89- // Let's subscribe to the header namespace.
90-
91- // Use combined channel for output
92- outCh := make (chan common.DAHeightEvent , 100 )
93-
94- subCh , err := r .client .Subscribe (ctx , r .client .GetHeaderNamespace ())
82+ // Subscribe subscribes to specific DA namespace
83+ func (r * daRetriever ) Subscribe (ctx context.Context , outCh chan common.DAHeightEvent ) error {
84+ subChHeader , err := r .client .Subscribe (ctx , r .client .GetHeaderNamespace ())
9585 if err != nil {
96- return nil , fmt .Errorf ("failed to subscribe to headers: %w" , err )
86+ return fmt .Errorf ("subscribe to headers: %w" , err )
87+ }
88+
89+ var subChData <- chan datypes.ResultRetrieve
90+ if ! bytes .Equal (r .client .GetHeaderNamespace (), r .client .GetDataNamespace ()) {
91+ var err error
92+ subChData , err = r .client .Subscribe (ctx , r .client .GetDataNamespace ())
93+ if err != nil {
94+ return fmt .Errorf ("subscribe to data: %w" , err )
95+ }
9796 }
9897
9998 go func () {
10099 defer close (outCh )
101100 for {
101+ var blobs [][]byte
102+ var height uint64
103+ var errCode datypes.StatusCode
104+
102105 select {
103106 case <- ctx .Done ():
104107 return
105- case res , ok := <- subCh :
108+ case res , ok := <- subChHeader :
106109 if ! ok {
107110 return
108111 }
109- if res .Code != datypes .StatusSuccess {
110- r .logger .Error ().Uint64 ("code" , uint64 (res .Code )).Msg ("subscription error" )
112+ blobs = res .Data
113+ height = res .Height
114+ errCode = res .Code
115+ case res , ok := <- subChData :
116+ if subChData == nil {
111117 continue
112118 }
119+ if ! ok {
120+ return
121+ }
122+ blobs = res .Data
123+ height = res .Height
124+ errCode = res .Code
125+ }
113126
114- // We received some blobs (headers) via subscription.
115- // We process them similar to RetrieveFromDA, but we might need to fetch data if namespaces differ.
116- // If namespaces differ, data might not be in the subscription result.
117- // For the "Follow" case, we want to emit an event.
118- // If we have just headers, we might need to fetch data corresponding to these headers.
119-
120- // However, `processBlobs` handles pending headers/data.
121- // If we only get headers, they will be stored in pendingHeaders.
122- // If we need data, we might need to fetch it.
123-
124- // IMPORTANT: processBlobs takes "blobs" [][]byte. `res.Data` is []byte (aliases to Blob).
125- // So we can pass res.Data directly.
126-
127- events := r .processBlobs (ctx , res .Data , res .Height )
128- for _ , ev := range events {
129- select {
130- case <- ctx .Done ():
131- return
132- case outCh <- ev :
133- }
127+ if errCode != datypes .StatusSuccess {
128+ r .logger .Error ().Uint64 ("code" , uint64 (errCode )).Msg ("subscription error" )
129+ continue
130+ }
131+
132+ events := r .processBlobs (ctx , blobs , height )
133+ for _ , ev := range events {
134+ select {
135+ case <- ctx .Done ():
136+ return
137+ case outCh <- ev :
134138 }
135139 }
136140 }
137141 }()
138142
139- return outCh , nil
143+ return nil
140144}
141145
142146// fetchBlobs retrieves blobs from both header and data namespaces
@@ -279,18 +283,17 @@ func (r *daRetriever) processBlobs(ctx context.Context, blobs [][]byte, daHeight
279283 events = append (events , event )
280284 }
281285
286+ // Sort events by height to match execution order
287+ sort .Slice (events , func (i , j int ) bool {
288+ if events [i ].DaHeight != events [j ].DaHeight {
289+ return events [i ].DaHeight < events [j ].DaHeight
290+ }
291+ return events [i ].Header .Height () < events [j ].Header .Height ()
292+ })
293+
282294 if len (events ) > 0 {
283295 startHeight := events [0 ].Header .Height ()
284- endHeight := events [0 ].Header .Height ()
285- for _ , event := range events {
286- h := event .Header .Height ()
287- if h < startHeight {
288- startHeight = h
289- }
290- if h > endHeight {
291- endHeight = h
292- }
293- }
296+ endHeight := events [len (events )- 1 ].Header .Height ()
294297 r .logger .Info ().Uint64 ("da_height" , daHeight ).Uint64 ("start_height" , startHeight ).Uint64 ("end_height" , endHeight ).Msg ("processed blocks from DA" )
295298 }
296299
0 commit comments