Skip to content

Commit c8b5ebc

Browse files
tac0turtleclaude
andauthored
feat: trim heavy fields from raw_header before storage (#37)
* feat: trim heavy fields from raw_header before storage Strip dah (73%), validator_set (16%), and commit.signatures (12%) from Celestia ExtendedHeader JSON before persisting to SQLite. No consumer reads these fields — ev-node only extracts the timestamp. Reduces per-header storage from ~87KB to ~2KB (~98% reduction). Applied to all three ingestion paths: celestia-node JSON-RPC, celestia-app gRPC, and direct blockstore DB backfill. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * fix: address PR review comments - Move gorilla/websocket to indirect dep (not imported by committed code) - Propagate marshal error from buildMinimalRawHeader instead of returning nil - Standardize celestia_app RawHeader to canonical envelope shape matching celestia_node and backfill sources - Consolidate TrimRawHeader tests into table-driven structure Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 87217cd commit c8b5ebc

4 files changed

Lines changed: 162 additions & 4 deletions

File tree

pkg/backfill/db/source.go

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package db
22

33
import (
44
"context"
5+
"encoding/hex"
6+
"encoding/json"
57
"fmt"
68
"os"
79
"path/filepath"
@@ -184,12 +186,17 @@ func (s *Source) FetchHeight(_ context.Context, height uint64, namespaces []type
184186
return nil, nil, fmt.Errorf("height mismatch in block: got %d want %d", block.Height, height)
185187
}
186188

189+
rawHeader, err := buildMinimalRawHeader(height, block.Time, block.DataHash, meta.BlockHash)
190+
if err != nil {
191+
return nil, nil, fmt.Errorf("build minimal raw header at height %d: %w", height, err)
192+
}
193+
187194
hdr := &types.Header{
188195
Height: height,
189196
Hash: meta.BlockHash,
190197
DataHash: block.DataHash,
191198
Time: block.Time,
192-
RawHeader: rawBlock,
199+
RawHeader: rawHeader,
193200
}
194201

195202
if len(namespaces) == 0 {
@@ -710,6 +717,30 @@ func splitIntoParts(raw []byte, partSize int) [][]byte {
710717
return out
711718
}
712719

720+
// buildMinimalRawHeader synthesizes a small JSON object from the decoded block
721+
// fields. The backfill source reads raw protobuf, so we build the JSON that
722+
// consumers (ev-node) expect rather than storing the full protobuf blob.
723+
func buildMinimalRawHeader(height uint64, t time.Time, dataHash, blockHash []byte) ([]byte, error) {
724+
obj := map[string]any{
725+
"header": map[string]any{
726+
"height": fmt.Sprintf("%d", height),
727+
"time": t.Format(time.RFC3339Nano),
728+
"data_hash": hex.EncodeToString(dataHash),
729+
},
730+
"commit": map[string]any{
731+
"height": fmt.Sprintf("%d", height),
732+
"block_id": map[string]any{
733+
"hash": hex.EncodeToString(blockHash),
734+
},
735+
},
736+
}
737+
raw, err := json.Marshal(obj)
738+
if err != nil {
739+
return nil, fmt.Errorf("marshal minimal raw header: %w", err)
740+
}
741+
return raw, nil
742+
}
743+
713744
func writeTestBlock(path, backend, layout string, height uint64, hash, dataHash []byte, t time.Time, txs [][]byte, partSize int) error {
714745
w, err := openWritable(path, backend)
715746
if err != nil {

pkg/fetch/celestia_app.go

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -213,9 +213,20 @@ func mapBlockResponse(blockID *cometpb.BlockID, block *cometpb.Block) (*types.He
213213
hdr := block.Header
214214
t := hdr.Time.AsTime()
215215

216-
raw, err := json.Marshal(block)
216+
// Wrap in envelope matching the canonical shape used by celestia_node and
217+
// backfill: {"header": ..., "commit": ...}. The gRPC response does not
218+
// include a separate commit object, so we synthesize a minimal one from
219+
// the block_id.
220+
envelope := map[string]any{
221+
"header": hdr,
222+
"commit": map[string]any{
223+
"height": fmt.Sprintf("%d", hdr.Height),
224+
"block_id": blockID,
225+
},
226+
}
227+
raw, err := json.Marshal(envelope)
217228
if err != nil {
218-
return nil, fmt.Errorf("marshal raw header: %w", err)
229+
return nil, fmt.Errorf("marshal raw header envelope: %w", err)
219230
}
220231

221232
return &types.Header{

pkg/fetch/celestia_node.go

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -400,10 +400,49 @@ func mapHeader(raw json.RawMessage) (*types.Header, error) {
400400
Hash: []byte(h.Commit.BlockID.Hash),
401401
DataHash: []byte(h.Header.DataHash),
402402
Time: t,
403-
RawHeader: []byte(raw),
403+
RawHeader: TrimRawHeader([]byte(raw)),
404404
}, nil
405405
}
406406

407+
// heavyHeaderKeys are top-level ExtendedHeader fields that are large and unused
408+
// by any consumer. Removing them reduces per-header storage from ~87KB to ~2KB.
409+
//
410+
// - dah: Data Availability Header with row/column roots (~64KB, 73%)
411+
// - validator_set: full validator public keys and voting power (~14KB, 16%)
412+
// - commit.signatures: individual validator signatures (~10KB, 12%)
413+
var heavyHeaderKeys = []string{"dah", "validator_set"}
414+
415+
// TrimRawHeader removes large, unused fields from a Celestia ExtendedHeader JSON
416+
// to reduce storage footprint. The remaining ~1KB "header" object plus "commit"
417+
// metadata are sufficient for all known consumers (ev-node only reads the timestamp).
418+
func TrimRawHeader(raw []byte) []byte {
419+
var obj map[string]json.RawMessage
420+
if err := json.Unmarshal(raw, &obj); err != nil {
421+
return raw
422+
}
423+
424+
for _, key := range heavyHeaderKeys {
425+
delete(obj, key)
426+
}
427+
428+
// Keep commit.block_id and commit.height but strip commit.signatures.
429+
if commitRaw, ok := obj["commit"]; ok {
430+
var commit map[string]json.RawMessage
431+
if err := json.Unmarshal(commitRaw, &commit); err == nil {
432+
delete(commit, "signatures")
433+
if trimmed, err := json.Marshal(commit); err == nil {
434+
obj["commit"] = trimmed
435+
}
436+
}
437+
}
438+
439+
trimmed, err := json.Marshal(obj)
440+
if err != nil {
441+
return raw
442+
}
443+
return trimmed
444+
}
445+
407446
func mapBlobs(raw json.RawMessage, height uint64) ([]types.Blob, error) {
408447
// Celestia returns null/empty for no blobs.
409448
if len(raw) == 0 || string(raw) == "null" {

pkg/fetch/celestia_node_test.go

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,83 @@ func TestJsonInt64(t *testing.T) {
231231
}
232232
}
233233

234+
func TestTrimRawHeader(t *testing.T) {
235+
tests := []struct {
236+
name string
237+
input []byte
238+
expectedPresent []string // top-level keys that must exist
239+
expectedAbsent []string // top-level keys that must not exist
240+
checkCommit bool // if true, verify commit internals
241+
expectUnchanged bool // if true, output must equal input
242+
expectSmaller bool // if true, output must be smaller than input
243+
}{
244+
{
245+
name: "valid header trims heavy fields",
246+
input: []byte(`{
247+
"header": {"height":"100","time":"2025-01-01T00:00:00Z","chain_id":"test"},
248+
"dah": {"row_roots":["AAAA","BBBB"],"column_roots":["CCCC","DDDD"]},
249+
"validator_set": {"validators":[{"address":"abc","pub_key":{"type":"ed25519","value":"xxx"}}]},
250+
"commit": {"height":"100","block_id":{"hash":"1234"},"signatures":[{"validator_address":"abc","signature":"sig"}]}
251+
}`),
252+
expectedPresent: []string{"header", "commit"},
253+
expectedAbsent: []string{"dah", "validator_set"},
254+
checkCommit: true,
255+
expectSmaller: true,
256+
},
257+
{
258+
name: "invalid JSON returned as-is",
259+
input: []byte(`not json`),
260+
expectUnchanged: true,
261+
},
262+
}
263+
264+
for _, tt := range tests {
265+
t.Run(tt.name, func(t *testing.T) {
266+
trimmed := TrimRawHeader(tt.input)
267+
268+
if tt.expectUnchanged {
269+
if string(trimmed) != string(tt.input) {
270+
t.Error("expected output to equal input")
271+
}
272+
return
273+
}
274+
275+
var obj map[string]json.RawMessage
276+
if err := json.Unmarshal(trimmed, &obj); err != nil {
277+
t.Fatalf("unmarshal trimmed: %v", err)
278+
}
279+
280+
for _, key := range tt.expectedPresent {
281+
if _, ok := obj[key]; !ok {
282+
t.Errorf("%s should be preserved", key)
283+
}
284+
}
285+
for _, key := range tt.expectedAbsent {
286+
if _, ok := obj[key]; ok {
287+
t.Errorf("%s should be removed", key)
288+
}
289+
}
290+
291+
if tt.checkCommit {
292+
var commit map[string]json.RawMessage
293+
if err := json.Unmarshal(obj["commit"], &commit); err != nil {
294+
t.Fatalf("unmarshal commit: %v", err)
295+
}
296+
if _, ok := commit["block_id"]; !ok {
297+
t.Error("commit.block_id should be preserved")
298+
}
299+
if _, ok := commit["signatures"]; ok {
300+
t.Error("commit.signatures should be removed")
301+
}
302+
}
303+
304+
if tt.expectSmaller && len(trimmed) >= len(tt.input) {
305+
t.Errorf("trimmed (%d bytes) should be smaller than original (%d bytes)", len(trimmed), len(tt.input))
306+
}
307+
})
308+
}
309+
}
310+
234311
func TestHexBytes(t *testing.T) {
235312
tests := []struct {
236313
input string

0 commit comments

Comments
 (0)