Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 32 additions & 1 deletion pkg/backfill/db/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package db

import (
"context"
"encoding/hex"
"encoding/json"
"fmt"
"os"
"path/filepath"
Expand Down Expand Up @@ -184,12 +186,17 @@ func (s *Source) FetchHeight(_ context.Context, height uint64, namespaces []type
return nil, nil, fmt.Errorf("height mismatch in block: got %d want %d", block.Height, height)
}

rawHeader, err := buildMinimalRawHeader(height, block.Time, block.DataHash, meta.BlockHash)
if err != nil {
return nil, nil, fmt.Errorf("build minimal raw header at height %d: %w", height, err)
}

hdr := &types.Header{
Height: height,
Hash: meta.BlockHash,
DataHash: block.DataHash,
Time: block.Time,
RawHeader: rawBlock,
RawHeader: rawHeader,
}

if len(namespaces) == 0 {
Expand Down Expand Up @@ -710,6 +717,30 @@ func splitIntoParts(raw []byte, partSize int) [][]byte {
return out
}

// buildMinimalRawHeader synthesizes a small JSON object from the decoded block
// fields. The backfill source reads raw protobuf, so we build the JSON that
// consumers (ev-node) expect rather than storing the full protobuf blob.
func buildMinimalRawHeader(height uint64, t time.Time, dataHash, blockHash []byte) ([]byte, error) {
obj := map[string]any{
"header": map[string]any{
"height": fmt.Sprintf("%d", height),
"time": t.Format(time.RFC3339Nano),
"data_hash": hex.EncodeToString(dataHash),
},
"commit": map[string]any{
"height": fmt.Sprintf("%d", height),
"block_id": map[string]any{
"hash": hex.EncodeToString(blockHash),
},
},
}
raw, err := json.Marshal(obj)
if err != nil {
return nil, fmt.Errorf("marshal minimal raw header: %w", err)
}
return raw, nil
}

func writeTestBlock(path, backend, layout string, height uint64, hash, dataHash []byte, t time.Time, txs [][]byte, partSize int) error {
w, err := openWritable(path, backend)
if err != nil {
Expand Down
15 changes: 13 additions & 2 deletions pkg/fetch/celestia_app.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,9 +213,20 @@ func mapBlockResponse(blockID *cometpb.BlockID, block *cometpb.Block) (*types.He
hdr := block.Header
t := hdr.Time.AsTime()

raw, err := json.Marshal(block)
// Wrap in envelope matching the canonical shape used by celestia_node and
// backfill: {"header": ..., "commit": ...}. The gRPC response does not
// include a separate commit object, so we synthesize a minimal one from
// the block_id.
envelope := map[string]any{
"header": hdr,
"commit": map[string]any{
"height": fmt.Sprintf("%d", hdr.Height),
"block_id": blockID,
},
}
raw, err := json.Marshal(envelope)
if err != nil {
return nil, fmt.Errorf("marshal raw header: %w", err)
return nil, fmt.Errorf("marshal raw header envelope: %w", err)
}

return &types.Header{
Expand Down
41 changes: 40 additions & 1 deletion pkg/fetch/celestia_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,10 +400,49 @@ func mapHeader(raw json.RawMessage) (*types.Header, error) {
Hash: []byte(h.Commit.BlockID.Hash),
DataHash: []byte(h.Header.DataHash),
Time: t,
RawHeader: []byte(raw),
RawHeader: TrimRawHeader([]byte(raw)),
}, nil
}

// heavyHeaderKeys are top-level ExtendedHeader fields that are large and unused
// by any consumer. Removing them reduces per-header storage from ~87KB to ~2KB.
//
// - dah: Data Availability Header with row/column roots (~64KB, 73%)
// - validator_set: full validator public keys and voting power (~14KB, 16%)
// - commit.signatures: individual validator signatures (~10KB, 12%)
var heavyHeaderKeys = []string{"dah", "validator_set"}

// TrimRawHeader removes large, unused fields from a Celestia ExtendedHeader JSON
// to reduce storage footprint. The remaining ~1KB "header" object plus "commit"
// metadata are sufficient for all known consumers (ev-node only reads the timestamp).
func TrimRawHeader(raw []byte) []byte {
var obj map[string]json.RawMessage
if err := json.Unmarshal(raw, &obj); err != nil {
return raw
}

for _, key := range heavyHeaderKeys {
delete(obj, key)
}

// Keep commit.block_id and commit.height but strip commit.signatures.
if commitRaw, ok := obj["commit"]; ok {
var commit map[string]json.RawMessage
if err := json.Unmarshal(commitRaw, &commit); err == nil {
delete(commit, "signatures")
if trimmed, err := json.Marshal(commit); err == nil {
obj["commit"] = trimmed
}
}
}

trimmed, err := json.Marshal(obj)
if err != nil {
return raw
}
return trimmed
}

func mapBlobs(raw json.RawMessage, height uint64) ([]types.Blob, error) {
// Celestia returns null/empty for no blobs.
if len(raw) == 0 || string(raw) == "null" {
Expand Down
77 changes: 77 additions & 0 deletions pkg/fetch/celestia_node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,83 @@ func TestJsonInt64(t *testing.T) {
}
}

func TestTrimRawHeader(t *testing.T) {
tests := []struct {
name string
input []byte
expectedPresent []string // top-level keys that must exist
expectedAbsent []string // top-level keys that must not exist
checkCommit bool // if true, verify commit internals
expectUnchanged bool // if true, output must equal input
expectSmaller bool // if true, output must be smaller than input
}{
{
name: "valid header trims heavy fields",
input: []byte(`{
"header": {"height":"100","time":"2025-01-01T00:00:00Z","chain_id":"test"},
"dah": {"row_roots":["AAAA","BBBB"],"column_roots":["CCCC","DDDD"]},
"validator_set": {"validators":[{"address":"abc","pub_key":{"type":"ed25519","value":"xxx"}}]},
"commit": {"height":"100","block_id":{"hash":"1234"},"signatures":[{"validator_address":"abc","signature":"sig"}]}
}`),
expectedPresent: []string{"header", "commit"},
expectedAbsent: []string{"dah", "validator_set"},
checkCommit: true,
expectSmaller: true,
},
{
name: "invalid JSON returned as-is",
input: []byte(`not json`),
expectUnchanged: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
trimmed := TrimRawHeader(tt.input)

if tt.expectUnchanged {
if string(trimmed) != string(tt.input) {
t.Error("expected output to equal input")
}
return
}

var obj map[string]json.RawMessage
if err := json.Unmarshal(trimmed, &obj); err != nil {
t.Fatalf("unmarshal trimmed: %v", err)
}

for _, key := range tt.expectedPresent {
if _, ok := obj[key]; !ok {
t.Errorf("%s should be preserved", key)
}
}
for _, key := range tt.expectedAbsent {
if _, ok := obj[key]; ok {
t.Errorf("%s should be removed", key)
}
}

if tt.checkCommit {
var commit map[string]json.RawMessage
if err := json.Unmarshal(obj["commit"], &commit); err != nil {
t.Fatalf("unmarshal commit: %v", err)
}
if _, ok := commit["block_id"]; !ok {
t.Error("commit.block_id should be preserved")
}
if _, ok := commit["signatures"]; ok {
t.Error("commit.signatures should be removed")
}
}

if tt.expectSmaller && len(trimmed) >= len(tt.input) {
t.Errorf("trimmed (%d bytes) should be smaller than original (%d bytes)", len(trimmed), len(tt.input))
}
})
}
}

func TestHexBytes(t *testing.T) {
tests := []struct {
input string
Expand Down