Skip to content
2 changes: 1 addition & 1 deletion apps/evm/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ var RunCmd = &cobra.Command{
return err
}

blobClient, err := blobrpc.NewClient(context.Background(), nodeConfig.DA.Address, nodeConfig.DA.AuthToken, "")
blobClient, err := blobrpc.NewWSClient(context.Background(), nodeConfig.DA.Address, nodeConfig.DA.AuthToken, "")
if err != nil {
return fmt.Errorf("failed to create blob client: %w", err)
}
Comment on lines +63 to 66
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Use command-scoped context for DA WS dialing.

Line 63 uses context.Background(), which disconnects the WS dial from CLI cancellation/shutdown flow. Pass cmd.Context() (or a derived context) instead.

Suggested fix
-		blobClient, err := blobrpc.NewWSClient(context.Background(), nodeConfig.DA.Address, nodeConfig.DA.AuthToken, "")
+		blobClient, err := blobrpc.NewWSClient(cmd.Context(), nodeConfig.DA.Address, nodeConfig.DA.AuthToken, "")

As per coding guidelines, "Use context.Context for cancellation".

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
blobClient, err := blobrpc.NewWSClient(context.Background(), nodeConfig.DA.Address, nodeConfig.DA.AuthToken, "")
if err != nil {
return fmt.Errorf("failed to create blob client: %w", err)
}
blobClient, err := blobrpc.NewWSClient(cmd.Context(), nodeConfig.DA.Address, nodeConfig.DA.AuthToken, "")
if err != nil {
return fmt.Errorf("failed to create blob client: %w", err)
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/evm/cmd/run.go` around lines 63 - 66, The WS dial is using
context.Background() when calling blobrpc.NewWSClient which prevents CLI
cancellation from propagating; replace the background context with the
command-scoped context (use cmd.Context() or a derived context) when calling
blobrpc.NewWSClient so that the DA WebSocket handshake is canceled on CLI
shutdown and respects timeouts; update the call site where
blobrpc.NewWSClient(context.Background(), nodeConfig.DA.Address,
nodeConfig.DA.AuthToken, "") is invoked to pass cmd.Context() (or ctx :=
cmd.Context() / ctx, cancel := context.WithTimeout(cmd.Context(), ...) if you
need a timeout) instead.

Expand Down
7 changes: 7 additions & 0 deletions apps/evm/server/force_inclusion_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,13 @@ func (m *mockDA) Get(ctx context.Context, ids []da.ID, namespace []byte) ([]da.B
return nil, nil
}

func (m *mockDA) Subscribe(_ context.Context, _ []byte) (<-chan da.SubscriptionEvent, error) {
// Not needed in these tests; return a closed channel.
ch := make(chan da.SubscriptionEvent)
close(ch)
return ch, nil
}

func (m *mockDA) Validate(ctx context.Context, ids []da.ID, proofs []da.Proof, namespace []byte) ([]bool, error) {
return nil, nil
}
Expand Down
2 changes: 1 addition & 1 deletion apps/grpc/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func createSequencer(
genesis genesis.Genesis,
executor execution.Executor,
) (coresequencer.Sequencer, error) {
blobClient, err := blobrpc.NewClient(ctx, nodeConfig.DA.Address, nodeConfig.DA.AuthToken, "")
blobClient, err := blobrpc.NewWSClient(ctx, nodeConfig.DA.Address, nodeConfig.DA.AuthToken, "")
if err != nil {
return nil, fmt.Errorf("failed to create blob client: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion apps/testapp/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func createSequencer(
genesis genesis.Genesis,
executor execution.Executor,
) (coresequencer.Sequencer, error) {
blobClient, err := blobrpc.NewClient(ctx, nodeConfig.DA.Address, nodeConfig.DA.AuthToken, "")
blobClient, err := blobrpc.NewWSClient(ctx, nodeConfig.DA.Address, nodeConfig.DA.AuthToken, "")
if err != nil {
return nil, fmt.Errorf("failed to create blob client: %w", err)
}
Expand Down
65 changes: 65 additions & 0 deletions block/internal/da/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,71 @@ func (c *client) HasForcedInclusionNamespace() bool {
return c.hasForcedNamespace
}

// Subscribe subscribes to blobs in the given namespace via the celestia-node
// Subscribe API. It returns a channel that emits a SubscriptionEvent for every
// DA block containing a matching blob. The channel is closed when ctx is
// cancelled. The caller must drain the channel after cancellation to avoid
// goroutine leaks.
func (c *client) Subscribe(ctx context.Context, namespace []byte) (<-chan datypes.SubscriptionEvent, error) {
ns, err := share.NewNamespaceFromBytes(namespace)
if err != nil {
return nil, fmt.Errorf("invalid namespace: %w", err)
}

rawCh, err := c.blobAPI.Subscribe(ctx, ns)
if err != nil {
return nil, fmt.Errorf("blob subscribe: %w", err)
}

out := make(chan datypes.SubscriptionEvent, 16)
go func() {
defer close(out)
for {
select {
case <-ctx.Done():
return
case resp, ok := <-rawCh:
if !ok {
return
}
if resp == nil {
continue
}
select {
case out <- datypes.SubscriptionEvent{
Height: resp.Height,
Blobs: extractBlobData(resp),
}:
case <-ctx.Done():
return
}
}
}
}()

return out, nil
}

// extractBlobData extracts raw byte slices from a subscription response,
// filtering out nil blobs, empty data, and blobs exceeding DefaultMaxBlobSize.
func extractBlobData(resp *blobrpc.SubscriptionResponse) [][]byte {
if resp == nil || len(resp.Blobs) == 0 {
return nil
}
blobs := make([][]byte, 0, len(resp.Blobs))
for _, blob := range resp.Blobs {
if blob == nil {
continue
}
data := blob.Data()
if len(data) == 0 || len(data) > common.DefaultMaxBlobSize {
continue
}
blobs = append(blobs, data)
}
return blobs
}

// Get fetches blobs by their IDs. Used for visualization and fetching specific blobs.
func (c *client) Get(ctx context.Context, ids []datypes.ID, namespace []byte) ([]datypes.Blob, error) {
if len(ids) == 0 {
Expand Down
5 changes: 5 additions & 0 deletions block/internal/da/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ type Client interface {
// Get retrieves blobs by their IDs. Used for visualization and fetching specific blobs.
Get(ctx context.Context, ids []datypes.ID, namespace []byte) ([]datypes.Blob, error)

// Subscribe returns a channel that emits one SubscriptionEvent per DA block
// that contains a blob in the given namespace. The channel is closed when ctx
// is cancelled. Callers MUST drain the channel after cancellation.
Subscribe(ctx context.Context, namespace []byte) (<-chan datypes.SubscriptionEvent, error)

// GetLatestDAHeight returns the latest height available on the DA layer.
GetLatestDAHeight(ctx context.Context) (uint64, error)

Expand Down
3 changes: 3 additions & 0 deletions block/internal/da/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,9 @@ func (t *tracedClient) GetForcedInclusionNamespace() []byte {
func (t *tracedClient) HasForcedInclusionNamespace() bool {
return t.inner.HasForcedInclusionNamespace()
}
func (t *tracedClient) Subscribe(ctx context.Context, namespace []byte) (<-chan datypes.SubscriptionEvent, error) {
return t.inner.Subscribe(ctx, namespace)
}

type submitError struct{ msg string }

Expand Down
8 changes: 8 additions & 0 deletions block/internal/da/tracing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,14 @@ type mockFullClient struct {
getFn func(ctx context.Context, ids []datypes.ID, namespace []byte) ([]datypes.Blob, error)
getProofsFn func(ctx context.Context, ids []datypes.ID, namespace []byte) ([]datypes.Proof, error)
validateFn func(ctx context.Context, ids []datypes.ID, proofs []datypes.Proof, namespace []byte) ([]bool, error)
subscribeFn func(ctx context.Context, namespace []byte) (<-chan datypes.SubscriptionEvent, error)
}

func (m *mockFullClient) Subscribe(ctx context.Context, namespace []byte) (<-chan datypes.SubscriptionEvent, error) {
if m.subscribeFn == nil {
panic("not expected to be called")
}
return m.subscribeFn(ctx, namespace)
}

func (m *mockFullClient) Submit(ctx context.Context, data [][]byte, gasPrice float64, namespace []byte, options []byte) datypes.ResultSubmit {
Expand Down
Loading
Loading