diff --git a/pkg/backfill/db/source.go b/pkg/backfill/db/source.go index 36f8f6d..c448e22 100644 --- a/pkg/backfill/db/source.go +++ b/pkg/backfill/db/source.go @@ -2,6 +2,8 @@ package db import ( "context" + "encoding/hex" + "encoding/json" "fmt" "os" "path/filepath" @@ -184,12 +186,17 @@ func (s *Source) FetchHeight(_ context.Context, height uint64, namespaces []type return nil, nil, fmt.Errorf("height mismatch in block: got %d want %d", block.Height, height) } + rawHeader, err := buildMinimalRawHeader(height, block.Time, block.DataHash, meta.BlockHash) + if err != nil { + return nil, nil, fmt.Errorf("build minimal raw header at height %d: %w", height, err) + } + hdr := &types.Header{ Height: height, Hash: meta.BlockHash, DataHash: block.DataHash, Time: block.Time, - RawHeader: rawBlock, + RawHeader: rawHeader, } if len(namespaces) == 0 { @@ -710,6 +717,30 @@ func splitIntoParts(raw []byte, partSize int) [][]byte { return out } +// buildMinimalRawHeader synthesizes a small JSON object from the decoded block +// fields. The backfill source reads raw protobuf, so we build the JSON that +// consumers (ev-node) expect rather than storing the full protobuf blob. +func buildMinimalRawHeader(height uint64, t time.Time, dataHash, blockHash []byte) ([]byte, error) { + obj := map[string]any{ + "header": map[string]any{ + "height": fmt.Sprintf("%d", height), + "time": t.Format(time.RFC3339Nano), + "data_hash": hex.EncodeToString(dataHash), + }, + "commit": map[string]any{ + "height": fmt.Sprintf("%d", height), + "block_id": map[string]any{ + "hash": hex.EncodeToString(blockHash), + }, + }, + } + raw, err := json.Marshal(obj) + if err != nil { + return nil, fmt.Errorf("marshal minimal raw header: %w", err) + } + return raw, nil +} + func writeTestBlock(path, backend, layout string, height uint64, hash, dataHash []byte, t time.Time, txs [][]byte, partSize int) error { w, err := openWritable(path, backend) if err != nil { diff --git a/pkg/fetch/celestia_app.go b/pkg/fetch/celestia_app.go index b1b7de5..b2f45d8 100644 --- a/pkg/fetch/celestia_app.go +++ b/pkg/fetch/celestia_app.go @@ -213,9 +213,20 @@ func mapBlockResponse(blockID *cometpb.BlockID, block *cometpb.Block) (*types.He hdr := block.Header t := hdr.Time.AsTime() - raw, err := json.Marshal(block) + // Wrap in envelope matching the canonical shape used by celestia_node and + // backfill: {"header": ..., "commit": ...}. The gRPC response does not + // include a separate commit object, so we synthesize a minimal one from + // the block_id. + envelope := map[string]any{ + "header": hdr, + "commit": map[string]any{ + "height": fmt.Sprintf("%d", hdr.Height), + "block_id": blockID, + }, + } + raw, err := json.Marshal(envelope) if err != nil { - return nil, fmt.Errorf("marshal raw header: %w", err) + return nil, fmt.Errorf("marshal raw header envelope: %w", err) } return &types.Header{ diff --git a/pkg/fetch/celestia_node.go b/pkg/fetch/celestia_node.go index f7035bd..ddbfa78 100644 --- a/pkg/fetch/celestia_node.go +++ b/pkg/fetch/celestia_node.go @@ -400,10 +400,49 @@ func mapHeader(raw json.RawMessage) (*types.Header, error) { Hash: []byte(h.Commit.BlockID.Hash), DataHash: []byte(h.Header.DataHash), Time: t, - RawHeader: []byte(raw), + RawHeader: TrimRawHeader([]byte(raw)), }, nil } +// heavyHeaderKeys are top-level ExtendedHeader fields that are large and unused +// by any consumer. Removing them reduces per-header storage from ~87KB to ~2KB. +// +// - dah: Data Availability Header with row/column roots (~64KB, 73%) +// - validator_set: full validator public keys and voting power (~14KB, 16%) +// - commit.signatures: individual validator signatures (~10KB, 12%) +var heavyHeaderKeys = []string{"dah", "validator_set"} + +// TrimRawHeader removes large, unused fields from a Celestia ExtendedHeader JSON +// to reduce storage footprint. The remaining ~1KB "header" object plus "commit" +// metadata are sufficient for all known consumers (ev-node only reads the timestamp). +func TrimRawHeader(raw []byte) []byte { + var obj map[string]json.RawMessage + if err := json.Unmarshal(raw, &obj); err != nil { + return raw + } + + for _, key := range heavyHeaderKeys { + delete(obj, key) + } + + // Keep commit.block_id and commit.height but strip commit.signatures. + if commitRaw, ok := obj["commit"]; ok { + var commit map[string]json.RawMessage + if err := json.Unmarshal(commitRaw, &commit); err == nil { + delete(commit, "signatures") + if trimmed, err := json.Marshal(commit); err == nil { + obj["commit"] = trimmed + } + } + } + + trimmed, err := json.Marshal(obj) + if err != nil { + return raw + } + return trimmed +} + func mapBlobs(raw json.RawMessage, height uint64) ([]types.Blob, error) { // Celestia returns null/empty for no blobs. if len(raw) == 0 || string(raw) == "null" { diff --git a/pkg/fetch/celestia_node_test.go b/pkg/fetch/celestia_node_test.go index 376fa74..a89bb32 100644 --- a/pkg/fetch/celestia_node_test.go +++ b/pkg/fetch/celestia_node_test.go @@ -231,6 +231,83 @@ func TestJsonInt64(t *testing.T) { } } +func TestTrimRawHeader(t *testing.T) { + tests := []struct { + name string + input []byte + expectedPresent []string // top-level keys that must exist + expectedAbsent []string // top-level keys that must not exist + checkCommit bool // if true, verify commit internals + expectUnchanged bool // if true, output must equal input + expectSmaller bool // if true, output must be smaller than input + }{ + { + name: "valid header trims heavy fields", + input: []byte(`{ + "header": {"height":"100","time":"2025-01-01T00:00:00Z","chain_id":"test"}, + "dah": {"row_roots":["AAAA","BBBB"],"column_roots":["CCCC","DDDD"]}, + "validator_set": {"validators":[{"address":"abc","pub_key":{"type":"ed25519","value":"xxx"}}]}, + "commit": {"height":"100","block_id":{"hash":"1234"},"signatures":[{"validator_address":"abc","signature":"sig"}]} + }`), + expectedPresent: []string{"header", "commit"}, + expectedAbsent: []string{"dah", "validator_set"}, + checkCommit: true, + expectSmaller: true, + }, + { + name: "invalid JSON returned as-is", + input: []byte(`not json`), + expectUnchanged: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + trimmed := TrimRawHeader(tt.input) + + if tt.expectUnchanged { + if string(trimmed) != string(tt.input) { + t.Error("expected output to equal input") + } + return + } + + var obj map[string]json.RawMessage + if err := json.Unmarshal(trimmed, &obj); err != nil { + t.Fatalf("unmarshal trimmed: %v", err) + } + + for _, key := range tt.expectedPresent { + if _, ok := obj[key]; !ok { + t.Errorf("%s should be preserved", key) + } + } + for _, key := range tt.expectedAbsent { + if _, ok := obj[key]; ok { + t.Errorf("%s should be removed", key) + } + } + + if tt.checkCommit { + var commit map[string]json.RawMessage + if err := json.Unmarshal(obj["commit"], &commit); err != nil { + t.Fatalf("unmarshal commit: %v", err) + } + if _, ok := commit["block_id"]; !ok { + t.Error("commit.block_id should be preserved") + } + if _, ok := commit["signatures"]; ok { + t.Error("commit.signatures should be removed") + } + } + + if tt.expectSmaller && len(trimmed) >= len(tt.input) { + t.Errorf("trimmed (%d bytes) should be smaller than original (%d bytes)", len(trimmed), len(tt.input)) + } + }) + } +} + func TestHexBytes(t *testing.T) { tests := []struct { input string