@@ -36,6 +36,11 @@ type DARetriever struct {
3636 // calculate namespaces bytes once and reuse them
3737 namespaceBz []byte
3838 namespaceDataBz []byte
39+
40+ // todo: ensure that these indexes are filled on restart
41+ pendingHeaders map [uint64 ]* types.SignedHeader
42+ pendingData map [uint64 ]* types.Data
43+ headerDAHeights map [uint64 ]uint64
3944}
4045
4146// NewDARetriever creates a new DA retriever
@@ -55,6 +60,9 @@ func NewDARetriever(
5560 logger : logger .With ().Str ("component" , "da_retriever" ).Logger (),
5661 namespaceBz : coreda .NamespaceFromString (config .DA .GetNamespace ()).Bytes (),
5762 namespaceDataBz : coreda .NamespaceFromString (config .DA .GetDataNamespace ()).Bytes (),
63+ pendingHeaders : make (map [uint64 ]* types.SignedHeader ),
64+ pendingData : make (map [uint64 ]* types.Data ),
65+ headerDAHeights : make (map [uint64 ]uint64 ), // Track DA height for each header
5866 }
5967}
6068
@@ -168,49 +176,53 @@ func (r *DARetriever) validateBlobResponse(res coreda.ResultRetrieve, daHeight u
168176
169177// processBlobs processes retrieved blobs to extract headers and data and returns height events
170178func (r * DARetriever ) processBlobs (ctx context.Context , blobs [][]byte , daHeight uint64 ) []common.DAHeightEvent {
171- headers := make (map [uint64 ]* types.SignedHeader )
172- dataMap := make (map [uint64 ]* types.Data )
173- headerDAHeights := make (map [uint64 ]uint64 ) // Track DA height for each header
174-
175179 // Decode all blobs
176180 for _ , bz := range blobs {
177181 if len (bz ) == 0 {
178182 continue
179183 }
180184
181185 if header := r .tryDecodeHeader (bz , daHeight ); header != nil {
182- headers [header .Height ()] = header
183- headerDAHeights [header .Height ()] = daHeight
186+ r . pendingHeaders [header .Height ()] = header
187+ r . headerDAHeights [header .Height ()] = daHeight
184188 continue
185189 }
186190
187191 if data := r .tryDecodeData (bz , daHeight ); data != nil {
188- dataMap [data .Height ()] = data
192+ r . pendingData [data .Height ()] = data
189193 }
190194 }
191195
192196 var events []common.DAHeightEvent
193197
194198 // Match headers with data and create events
195- for height , header := range headers {
196- data := dataMap [height ]
199+ for height , header := range r .pendingHeaders {
200+ data := r .pendingData [height ]
201+ includedHeight := r .headerDAHeights [height ]
197202
198203 // Handle empty data case
199204 if data == nil {
200205 if r .isEmptyDataExpected (header ) {
201206 data = r .createEmptyDataForHeader (ctx , header )
207+ delete (r .pendingHeaders , height )
208+ delete (r .headerDAHeights , height )
202209 } else {
210+ // keep header in pending headers until data lands
203211 r .logger .Debug ().Uint64 ("height" , height ).Msg ("header found but no matching data" )
204212 continue
205213 }
214+ } else {
215+ delete (r .pendingHeaders , height )
216+ delete (r .pendingData , height )
217+ delete (r .headerDAHeights , height )
206218 }
207219
208220 // Create height event
209221 event := common.DAHeightEvent {
210222 Header : header ,
211223 Data : data ,
212224 DaHeight : daHeight ,
213- HeaderDaIncludedHeight : headerDAHeights [ height ] ,
225+ HeaderDaIncludedHeight : includedHeight ,
214226 }
215227
216228 events = append (events , event )
0 commit comments