diff --git a/.mockery.yaml b/.mockery.yaml
index b5e092cd9..83c9d2968 100644
--- a/.mockery.yaml
+++ b/.mockery.yaml
@@ -30,6 +30,11 @@ packages:
           dir: ./test/mocks
           pkgname: mocks
           filename: store.go
+      Batch:
+        config:
+          dir: ./test/mocks
+          pkgname: mocks
+          filename: batch.go
   github.com/celestiaorg/go-header:
     interfaces:
       Store:
diff --git a/block/internal/cache/pending_data.go b/block/internal/cache/pending_data.go
index 33cff58d6..9e228d254 100644
--- a/block/internal/cache/pending_data.go
+++ b/block/internal/cache/pending_data.go
@@ -2,6 +2,7 @@ package cache
 
 import (
 	"context"
+	"errors"
 	"fmt"
 
 	"github.com/rs/zerolog"
@@ -29,8 +30,17 @@ type PendingData struct {
 	base *pendingBase[*types.Data]
 }
 
+var errInFlightData = errors.New("inflight data")
+
 func fetchData(ctx context.Context, store store.Store, height uint64) (*types.Data, error) {
 	_, data, err := store.GetBlockData(ctx, height)
+	if err != nil {
+		return nil, err
+	}
+	// in the executor, WIP data is temporarily stored; skip it until the process is completed
+	if data.Height() == 0 {
+		return nil, errInFlightData
+	}
 	return data, err
 }
diff --git a/block/internal/cache/pending_headers.go b/block/internal/cache/pending_headers.go
index 712d18cd6..b28b28ab5 100644
--- a/block/internal/cache/pending_headers.go
+++ b/block/internal/cache/pending_headers.go
@@ -2,6 +2,7 @@ package cache
 
 import (
 	"context"
+	"errors"
 	"fmt"
 
 	"github.com/rs/zerolog"
@@ -26,8 +27,17 @@ type PendingHeaders struct {
 	base *pendingBase[*types.SignedHeader]
 }
 
+var errInFlightHeader = errors.New("inflight header")
+
 func fetchSignedHeader(ctx context.Context, store storepkg.Store, height uint64) (*types.SignedHeader, error) {
 	header, err := store.GetHeader(ctx, height)
+	if err != nil {
+		return nil, err
+	}
+	// in the executor, WIP headers are temporarily stored; skip them until the process is completed
+	if header.Height() == 0 {
+		return nil, errInFlightHeader
+	}
 	return header, err
 }
diff --git a/block/internal/executing/executor.go b/block/internal/executing/executor.go
index 621b507a3..86a3d0eb0 100644
--- a/block/internal/executing/executor.go
+++ b/block/internal/executing/executor.go
@@ -227,6 +227,9 @@ func (e *Executor) initializeState() error {
 		if err := batch.SetHeight(state.LastBlockHeight); err != nil {
 			return fmt.Errorf("failed to set store height: %w", err)
 		}
+		if err := batch.UpdateState(state); err != nil {
+			return fmt.Errorf("failed to update state: %w", err)
+		}
 		if err := batch.Commit(); err != nil {
 			return fmt.Errorf("failed to commit batch: %w", err)
 		}
@@ -236,7 +239,8 @@ func (e *Executor) initializeState() error {
 
 	// Sync execution layer with store on startup
 	execReplayer := common.NewReplayer(e.store, e.exec, e.genesis, e.logger)
-	if err := execReplayer.SyncToHeight(e.ctx, state.LastBlockHeight); err != nil {
+	syncTargetHeight := state.LastBlockHeight
+	if err := execReplayer.SyncToHeight(e.ctx, syncTargetHeight); err != nil {
 		e.sendCriticalError(fmt.Errorf("failed to sync execution layer: %w", err))
 		return fmt.Errorf("failed to sync execution layer: %w", err)
 	}
@@ -281,7 +285,7 @@ func (e *Executor) executionLoop() {
 	}
 
 	txsAvailable := false
-	for {
+	for e.ctx.Err() == nil {
 		select {
 		case <-e.ctx.Done():
 			return
@@ -316,6 +320,10 @@
 // ProduceBlock creates, validates, and stores a new block.
 func (e *Executor) ProduceBlock(ctx context.Context) error {
+	if ctx.Err() != nil {
+		return ctx.Err()
+	}
+
 	start := time.Now()
 	defer func() {
 		if e.metrics.OperationDuration["block_production"] != nil {
@@ -582,7 +590,7 @@ func (e *Executor) CreateBlock(ctx context.Context, height uint64, batchData *Ba
 	}
 
 	for i, tx := range batchData.Transactions {
-		data.Txs[i] = types.Tx(tx)
+		data.Txs[i] = tx
 	}
 
 	// Set data hash
diff --git a/block/internal/submitting/submitter.go b/block/internal/submitting/submitter.go
index 0c70db978..e6a2328e9 100644
--- a/block/internal/submitting/submitter.go
+++ b/block/internal/submitting/submitter.go
@@ -128,6 +128,7 @@ func (s *Submitter) Start(ctx context.Context) error {
 
 	// Start DA submission loop if signer is available (aggregator nodes only)
 	if s.signer != nil {
+		s.logger.Info().Msg("starting DA submission loop")
 		s.wg.Add(1)
 		go func() {
 			defer s.wg.Done()
@@ -150,7 +151,18 @@ func (s *Submitter) Stop() error {
 	if s.cancel != nil {
 		s.cancel()
 	}
-	s.wg.Wait()
+	// Wait for goroutines to finish with a timeout to prevent hanging
+	done := make(chan struct{})
+	go func() {
+		s.wg.Wait()
+		close(done)
+	}()
+	select {
+	case <-done:
+		// All goroutines finished cleanly
+	case <-time.After(5 * time.Second):
+		s.logger.Warn().Msg("submitter shutdown timed out waiting for goroutines, proceeding anyway")
+	}
 	s.logger.Info().Msg("submitter stopped")
 	return nil
 }
@@ -180,8 +192,14 @@ func (s *Submitter) daSubmissionLoop() {
 		// For strategy decision, we need to estimate the size
 		// We'll fetch headers to check, but only submit if strategy approves
 		if s.headerSubmissionMtx.TryLock() {
+			s.logger.Debug().Time("t", time.Now()).Uint64("headers", headersNb).Msg("Header submission in progress")
+			s.wg.Add(1)
 			go func() {
-				defer s.headerSubmissionMtx.Unlock()
+				defer func() {
+					s.headerSubmissionMtx.Unlock()
+					s.logger.Debug().Time("t", time.Now()).Uint64("headers", headersNb).Msg("Header submission completed")
+					s.wg.Done()
+				}()
 
 				// Get headers with marshalled bytes from cache
 				headers, marshalledHeaders, err := s.cache.GetPendingHeaders(s.ctx)
@@ -233,10 +251,16 @@
 	if dataNb > 0 {
 		lastSubmitNanos := s.lastDataSubmit.Load()
 		timeSinceLastSubmit := time.Since(time.Unix(0, lastSubmitNanos))
 		if s.dataSubmissionMtx.TryLock() {
+			s.logger.Debug().Time("t", time.Now()).Uint64("data", dataNb).Msg("Data submission in progress")
+			s.wg.Add(1)
 			go func() {
-				defer s.dataSubmissionMtx.Unlock()
+				defer func() {
+					s.dataSubmissionMtx.Unlock()
+					s.logger.Debug().Time("t", time.Now()).Uint64("data", dataNb).Msg("Data submission completed")
+					s.wg.Done()
+				}()
 
 				// Get data with marshalled bytes from cache
 				signedDataList, marshalledData, err := s.cache.GetPendingData(s.ctx)
diff --git a/block/internal/syncing/assert.go b/block/internal/syncing/assert.go
new file mode 100644
index 000000000..02cf4040a
--- /dev/null
+++ b/block/internal/syncing/assert.go
@@ -0,0 +1,43 @@
+package syncing
+
+import (
+	"errors"
+	"fmt"
+
+	"github.com/evstack/ev-node/pkg/genesis"
+	"github.com/evstack/ev-node/types"
+)
+
+func assertExpectedProposer(genesis genesis.Genesis, proposerAddr []byte) error {
+	if string(proposerAddr) != string(genesis.ProposerAddress) {
+		return fmt.Errorf("unexpected proposer: got %x, expected %x",
+			proposerAddr, genesis.ProposerAddress)
+	}
+	return nil
+}
+
+func assertValidSignedData(signedData *types.SignedData, genesis genesis.Genesis) error {
+	if signedData == nil || signedData.Txs == nil {
+		return errors.New("empty signed data")
+	}
+
+ if err := assertExpectedProposer(genesis, signedData.Signer.Address); err != nil { + return err + } + + dataBytes, err := signedData.Data.MarshalBinary() + if err != nil { + return fmt.Errorf("failed to get signed data payload: %w", err) + } + + valid, err := signedData.Signer.PubKey.Verify(dataBytes, signedData.Signature) + if err != nil { + return fmt.Errorf("failed to verify signature: %w", err) + } + + if !valid { + return fmt.Errorf("invalid signature") + } + + return nil +} diff --git a/block/internal/syncing/da_retriever.go b/block/internal/syncing/da_retriever.go index 1307b3968..506840f5c 100644 --- a/block/internal/syncing/da_retriever.go +++ b/block/internal/syncing/da_retriever.go @@ -346,38 +346,12 @@ func (r *daRetriever) tryDecodeData(bz []byte, daHeight uint64) *types.Data { // assertExpectedProposer validates the proposer address func (r *daRetriever) assertExpectedProposer(proposerAddr []byte) error { - if string(proposerAddr) != string(r.genesis.ProposerAddress) { - return fmt.Errorf("unexpected proposer: got %x, expected %x", - proposerAddr, r.genesis.ProposerAddress) - } - return nil + return assertExpectedProposer(r.genesis, proposerAddr) } // assertValidSignedData validates signed data using the configured signature provider func (r *daRetriever) assertValidSignedData(signedData *types.SignedData) error { - if signedData == nil || signedData.Txs == nil { - return errors.New("empty signed data") - } - - if err := r.assertExpectedProposer(signedData.Signer.Address); err != nil { - return err - } - - dataBytes, err := signedData.Data.MarshalBinary() - if err != nil { - return fmt.Errorf("failed to get signed data payload: %w", err) - } - - valid, err := signedData.Signer.PubKey.Verify(dataBytes, signedData.Signature) - if err != nil { - return fmt.Errorf("failed to verify signature: %w", err) - } - - if !valid { - return fmt.Errorf("invalid signature") - } - - return nil + return assertValidSignedData(signedData, r.genesis) } // isEmptyDataExpected checks if empty data is expected for a header diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go index 0ea9d03dd..a00c2f4fc 100644 --- a/block/internal/syncing/syncer.go +++ b/block/internal/syncing/syncer.go @@ -229,12 +229,41 @@ func (s *Syncer) Start(ctx context.Context) error { // Stop shuts down the syncing component func (s *Syncer) Stop() error { - if s.cancel != nil { - s.cancel() + if s.cancel == nil { + return nil } + + s.cancel() s.cancelP2PWait(0) s.wg.Wait() + + drainCtx, drainCancel := context.WithTimeout(context.Background(), 5*time.Second) + defer drainCancel() + + drained := 0 +drainLoop: + for { + select { + case event, ok := <-s.heightInCh: + if !ok { + break drainLoop + } + s.processHeightEvent(drainCtx, &event) + drained++ + case <-drainCtx.Done(): + s.logger.Warn().Int("remaining", len(s.heightInCh)).Msg("timeout draining height events during shutdown") + break drainLoop + default: + break drainLoop + } + } + if drained > 0 { + s.logger.Info().Int("count", drained).Msg("drained pending height events during shutdown") + } + s.logger.Info().Msg("syncer stopped") + close(s.heightInCh) + s.cancel = nil return nil } @@ -282,6 +311,21 @@ func (s *Syncer) initializeState() error { if state.DAHeight != 0 && state.DAHeight < s.genesis.DAStartHeight { return fmt.Errorf("DA height (%d) is lower than DA start height (%d)", state.DAHeight, s.genesis.DAStartHeight) } + + // Persist the initialized state to the store + batch, err := s.store.NewBatch(s.ctx) + if err != nil { + return 
fmt.Errorf("failed to create batch: %w", err)
+	}
+	if err := batch.SetHeight(state.LastBlockHeight); err != nil {
+		return fmt.Errorf("failed to set store height: %w", err)
+	}
+	if err := batch.UpdateState(state); err != nil {
+		return fmt.Errorf("failed to update state: %w", err)
+	}
+	if err := batch.Commit(); err != nil {
+		return fmt.Errorf("failed to commit batch: %w", err)
+	}
 	s.SetLastState(state)
 
 	// Set DA height to the maximum of the genesis start height, the state's DA height, the cached DA height, and the highest stored included DA height.
@@ -312,8 +356,10 @@ func (s *Syncer) processLoop() {
 		select {
 		case <-s.ctx.Done():
 			return
-		case heightEvent := <-s.heightInCh:
-			s.processHeightEvent(&heightEvent)
+		case heightEvent, ok := <-s.heightInCh:
+			if ok {
+				s.processHeightEvent(s.ctx, &heightEvent)
+			}
 		}
 	}
 }
@@ -386,10 +432,8 @@ func (s *Syncer) fetchDAUntilCaughtUp() error {
 
 	// Process DA events
 	for _, event := range events {
-		select {
-		case s.heightInCh <- event:
-		default:
-			s.cache.SetPendingEvent(event.Header.Height(), &event)
+		if err := s.pipeEvent(s.ctx, event); err != nil {
+			return err
 		}
 	}
 
@@ -484,7 +528,20 @@ func (s *Syncer) waitForGenesis() bool {
 	return true
 }
 
-func (s *Syncer) processHeightEvent(event *common.DAHeightEvent) {
+func (s *Syncer) pipeEvent(ctx context.Context, event common.DAHeightEvent) error {
+	select {
+	case s.heightInCh <- event:
+		return nil
+	case <-ctx.Done():
+		s.cache.SetPendingEvent(event.Header.Height(), &event)
+		return ctx.Err()
+	default:
+		s.cache.SetPendingEvent(event.Header.Height(), &event)
+	}
+	return nil
+}
+
+func (s *Syncer) processHeightEvent(ctx context.Context, event *common.DAHeightEvent) {
 	height := event.Header.Height()
 	headerHash := event.Header.Hash().String()
 
@@ -494,7 +551,7 @@
 		Str("hash", headerHash).
 		Msg("processing height event")
 
-	currentHeight, err := s.store.Height(s.ctx)
+	currentHeight, err := s.store.Height(ctx)
 	if err != nil {
 		s.logger.Error().Err(err).Msg("failed to get current height")
 		return
@@ -517,7 +574,7 @@
 	// Last data must be fetched from the store if the event comes from DA and the data hash is empty.
 	// If the event comes from P2P, the sequencer and then all the full nodes contain the data.
 	if event.Source == common.SourceDA && bytes.Equal(event.Header.DataHash, common.DataHashForEmptyTxs) && currentHeight > 0 {
-		_, lastData, err := s.store.GetBlockData(s.ctx, currentHeight)
+		_, lastData, err := s.store.GetBlockData(ctx, currentHeight)
 		if err != nil {
 			s.logger.Error().Err(err).Msg("failed to get last data")
 			return
@@ -529,9 +586,12 @@
 	s.cancelP2PWait(height)
 
 	// Try to sync the next block
-	if err := s.blockSyncer.TrySyncNextBlock(s.ctx, event); err != nil {
-		s.logger.Error().Err(err).Msg("failed to sync next block")
-		// If the error is not due to an validation error, re-store the event as pending
+	if err := s.blockSyncer.TrySyncNextBlock(ctx, event); err != nil {
+		s.logger.Error().Err(err).
+			Uint64("event-height", event.Header.Height()).
+			Uint64("state-height", s.getLastState().LastBlockHeight).
+ Msg("failed to sync next block") + // If the error is not due to a validation error, re-store the event as pending switch { case errors.Is(err, errInvalidBlock): // do not reschedule @@ -549,7 +609,7 @@ func (s *Syncer) processHeightEvent(event *common.DAHeightEvent) { // only save to p2p stores if the event came from DA if event.Source == common.SourceDA { // TODO(@julienrbrt): To be reverted once DA Hints are merged (https://github.com/evstack/ev-node/pull/2891) - g, ctx := errgroup.WithContext(s.ctx) + g, ctx := errgroup.WithContext(ctx) g.Go(func() error { // broadcast header locally only — prevents spamming the p2p network with old height notifications, // allowing the syncer to update its target and fill missing blocks @@ -626,7 +686,7 @@ func (s *Syncer) TrySyncNextBlock(ctx context.Context, event *common.DAHeightEve newState.DAHeight = event.DaHeight } - batch, err := s.store.NewBatch(s.ctx) + batch, err := s.store.NewBatch(ctx) if err != nil { return fmt.Errorf("failed to create batch: %w", err) } @@ -708,8 +768,8 @@ func (s *Syncer) executeTxsWithRetry(ctx context.Context, rawTxs [][]byte, heade select { case <-time.After(common.MaxRetriesTimeout): continue - case <-s.ctx.Done(): - return nil, fmt.Errorf("context cancelled during retry: %w", s.ctx.Err()) + case <-ctx.Done(): + return nil, fmt.Errorf("context cancelled during retry: %w", ctx.Err()) } } @@ -822,7 +882,7 @@ func (s *Syncer) getEffectiveGracePeriod() uint64 { // Note: Due to block size constraints (MaxBytes), sequencers may defer forced inclusion transactions // to future blocks (smoothing). This is legitimate behavior within an epoch. // However, ALL forced inclusion txs from an epoch MUST be included before the next epoch begins or grace boundary (whichever comes later). -func (s *Syncer) VerifyForcedInclusionTxs(_ context.Context, currentState types.State, data *types.Data) error { +func (s *Syncer) VerifyForcedInclusionTxs(ctx context.Context, currentState types.State, data *types.Data) error { if s.fiRetriever == nil { return nil } @@ -832,7 +892,7 @@ func (s *Syncer) VerifyForcedInclusionTxs(_ context.Context, currentState types. 
s.updateDynamicGracePeriod(blockFullness) // Retrieve forced inclusion transactions from DA for current epoch - forcedIncludedTxsEvent, err := s.fiRetriever.RetrieveForcedIncludedTxs(s.ctx, currentState.DAHeight) + forcedIncludedTxsEvent, err := s.fiRetriever.RetrieveForcedIncludedTxs(ctx, currentState.DAHeight) if err != nil { if errors.Is(err, da.ErrForceInclusionNotConfigured) { s.logger.Debug().Msg("forced inclusion namespace not configured, skipping verification") diff --git a/block/internal/syncing/syncer_test.go b/block/internal/syncing/syncer_test.go index 554e80f63..2ba71a78f 100644 --- a/block/internal/syncing/syncer_test.go +++ b/block/internal/syncing/syncer_test.go @@ -134,19 +134,19 @@ func TestSyncer_validateBlock_DataHashMismatch(t *testing.T) { data := makeData(gen.ChainID, 1, 2) // non-empty _, header := makeSignedHeaderBytes(t, gen.ChainID, 1, addr, pub, signer, nil, data, nil) - err = s.ValidateBlock(context.Background(), s.getLastState(), data, header) + err = s.ValidateBlock(t.Context(), s.getLastState(), data, header) require.NoError(t, err) // Create header and data with mismatched hash data = makeData(gen.ChainID, 1, 2) // non-empty _, header = makeSignedHeaderBytes(t, gen.ChainID, 1, addr, pub, signer, nil, nil, nil) - err = s.ValidateBlock(context.Background(), s.getLastState(), data, header) + err = s.ValidateBlock(t.Context(), s.getLastState(), data, header) require.Error(t, err) // Create header and empty data data = makeData(gen.ChainID, 1, 0) // empty _, header = makeSignedHeaderBytes(t, gen.ChainID, 2, addr, pub, signer, nil, nil, nil) - err = s.ValidateBlock(context.Background(), s.getLastState(), data, header) + err = s.ValidateBlock(t.Context(), s.getLastState(), data, header) require.Error(t, err) } @@ -183,7 +183,7 @@ func TestProcessHeightEvent_SyncsAndUpdatesState(t *testing.T) { require.NoError(t, s.initializeState()) // set a context for internal loops that expect it - s.ctx = context.Background() + s.ctx = t.Context() // Create signed header & data for height 1 lastState := s.getLastState() data := makeData(gen.ChainID, 1, 0) @@ -194,13 +194,13 @@ func TestProcessHeightEvent_SyncsAndUpdatesState(t *testing.T) { Return([]byte("app1"), uint64(1024), nil).Once() evt := common.DAHeightEvent{Header: hdr, Data: data, DaHeight: 1} - s.processHeightEvent(&evt) + s.processHeightEvent(t.Context(), &evt) requireEmptyChan(t, errChan) - h, err := st.Height(context.Background()) + h, err := st.Height(t.Context()) require.NoError(t, err) assert.Equal(t, uint64(1), h) - st1, err := st.GetState(context.Background()) + st1, err := st.GetState(t.Context()) require.NoError(t, err) assert.Equal(t, uint64(1), st1.LastBlockHeight) } @@ -235,7 +235,7 @@ func TestSequentialBlockSync(t *testing.T) { errChan, ) require.NoError(t, s.initializeState()) - s.ctx = context.Background() + s.ctx = t.Context() // Sync two consecutive blocks via processHeightEvent so ExecuteTxs is called and state stored st0 := s.getLastState() @@ -245,16 +245,16 @@ func TestSequentialBlockSync(t *testing.T) { mockExec.EXPECT().ExecuteTxs(mock.Anything, mock.Anything, uint64(1), mock.Anything, st0.AppHash). 
Return([]byte("app1"), uint64(1024), nil).Once() evt1 := common.DAHeightEvent{Header: hdr1, Data: data1, DaHeight: 10} - s.processHeightEvent(&evt1) + s.processHeightEvent(t.Context(), &evt1) - st1, _ := st.GetState(context.Background()) + st1, _ := st.GetState(t.Context()) data2 := makeData(gen.ChainID, 2, 0) // empty data _, hdr2 := makeSignedHeaderBytes(t, gen.ChainID, 2, addr, pub, signer, st1.AppHash, data2, st1.LastHeaderHash) // Expect ExecuteTxs call for height 2 mockExec.EXPECT().ExecuteTxs(mock.Anything, mock.Anything, uint64(2), mock.Anything, st1.AppHash). Return([]byte("app2"), uint64(1024), nil).Once() evt2 := common.DAHeightEvent{Header: hdr2, Data: data2, DaHeight: 11} - s.processHeightEvent(&evt2) + s.processHeightEvent(t.Context(), &evt2) // Mark DA inclusion in cache (as DA retrieval would) cm.SetDataDAIncluded(data1.DACommitment().String(), 10, 1) @@ -263,7 +263,7 @@ func TestSequentialBlockSync(t *testing.T) { cm.SetHeaderDAIncluded(hdr2.Header.Hash().String(), 11, 2) // Verify both blocks were synced correctly - finalState, _ := st.GetState(context.Background()) + finalState, _ := st.GetState(t.Context()) assert.Equal(t, uint64(2), finalState.LastBlockHeight) // Verify DA inclusion markers are set @@ -286,7 +286,7 @@ func TestSyncer_processPendingEvents(t *testing.T) { require.NoError(t, err) // current height 1 - batch, err := st.NewBatch(context.Background()) + batch, err := st.NewBatch(t.Context()) require.NoError(t, err) require.NoError(t, batch.SetHeight(1)) require.NoError(t, batch.Commit()) @@ -294,7 +294,7 @@ func TestSyncer_processPendingEvents(t *testing.T) { s := &Syncer{ store: st, cache: cm, - ctx: context.Background(), + ctx: t.Context(), heightInCh: make(chan common.DAHeightEvent, 2), logger: zerolog.Nop(), } @@ -538,7 +538,7 @@ func TestSyncer_executeTxsWithRetry(t *testing.T) { t.Run(tt.name, func(t *testing.T) { t.Parallel() - ctx := context.Background() + ctx := t.Context() exec := testmocks.NewMockExecutor(t) tt.setupMock(exec) @@ -612,7 +612,13 @@ func TestSyncer_InitializeState_CallsReplayer(t *testing.T) { // Setup execution layer to be in sync mockExec.On("GetLatestHeight", mock.Anything).Return(storeHeight, nil) - // Create syncer with minimal dependencies + // Mock batch operations + mockBatch := new(testmocks.MockBatch) + mockBatch.On("SetHeight", storeHeight).Return(nil) + mockBatch.On("UpdateState", mock.Anything).Return(nil) + mockBatch.On("Commit").Return(nil) + mockStore.EXPECT().NewBatch(mock.Anything).Return(mockBatch, nil) + syncer := &Syncer{ store: mockStore, exec: mockExec, @@ -620,7 +626,7 @@ func TestSyncer_InitializeState_CallsReplayer(t *testing.T) { lastState: &atomic.Pointer[types.State]{}, daRetrieverHeight: &atomic.Uint64{}, logger: zerolog.Nop(), - ctx: context.Background(), + ctx: t.Context(), cache: cm, } @@ -649,7 +655,7 @@ func requireEmptyChan(t *testing.T, errorCh chan error) { func TestSyncer_getHighestStoredDAHeight(t *testing.T) { ds := dssync.MutexWrap(datastore.NewMapDatastore()) st := store.New(ds) - ctx := context.Background() + ctx := t.Context() syncer := &Syncer{ store: st, diff --git a/test/mocks/batch.go b/test/mocks/batch.go new file mode 100644 index 000000000..90025855b --- /dev/null +++ b/test/mocks/batch.go @@ -0,0 +1,355 @@ +// Code generated by mockery; DO NOT EDIT. 
+// github.com/vektra/mockery +// template: testify + +package mocks + +import ( + "github.com/evstack/ev-node/types" + "github.com/ipfs/go-datastore" + mock "github.com/stretchr/testify/mock" +) + +// NewMockBatch creates a new instance of MockBatch. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockBatch(t interface { + mock.TestingT + Cleanup(func()) +}) *MockBatch { + mock := &MockBatch{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} + +// MockBatch is an autogenerated mock type for the Batch type +type MockBatch struct { + mock.Mock +} + +type MockBatch_Expecter struct { + mock *mock.Mock +} + +func (_m *MockBatch) EXPECT() *MockBatch_Expecter { + return &MockBatch_Expecter{mock: &_m.Mock} +} + +// Commit provides a mock function for the type MockBatch +func (_mock *MockBatch) Commit() error { + ret := _mock.Called() + + if len(ret) == 0 { + panic("no return value specified for Commit") + } + + var r0 error + if returnFunc, ok := ret.Get(0).(func() error); ok { + r0 = returnFunc() + } else { + r0 = ret.Error(0) + } + return r0 +} + +// MockBatch_Commit_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Commit' +type MockBatch_Commit_Call struct { + *mock.Call +} + +// Commit is a helper method to define mock.On call +func (_e *MockBatch_Expecter) Commit() *MockBatch_Commit_Call { + return &MockBatch_Commit_Call{Call: _e.mock.On("Commit")} +} + +func (_c *MockBatch_Commit_Call) Run(run func()) *MockBatch_Commit_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockBatch_Commit_Call) Return(err error) *MockBatch_Commit_Call { + _c.Call.Return(err) + return _c +} + +func (_c *MockBatch_Commit_Call) RunAndReturn(run func() error) *MockBatch_Commit_Call { + _c.Call.Return(run) + return _c +} + +// Delete provides a mock function for the type MockBatch +func (_mock *MockBatch) Delete(key datastore.Key) error { + ret := _mock.Called(key) + + if len(ret) == 0 { + panic("no return value specified for Delete") + } + + var r0 error + if returnFunc, ok := ret.Get(0).(func(datastore.Key) error); ok { + r0 = returnFunc(key) + } else { + r0 = ret.Error(0) + } + return r0 +} + +// MockBatch_Delete_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Delete' +type MockBatch_Delete_Call struct { + *mock.Call +} + +// Delete is a helper method to define mock.On call +// - key datastore.Key +func (_e *MockBatch_Expecter) Delete(key interface{}) *MockBatch_Delete_Call { + return &MockBatch_Delete_Call{Call: _e.mock.On("Delete", key)} +} + +func (_c *MockBatch_Delete_Call) Run(run func(key datastore.Key)) *MockBatch_Delete_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 datastore.Key + if args[0] != nil { + arg0 = args[0].(datastore.Key) + } + run( + arg0, + ) + }) + return _c +} + +func (_c *MockBatch_Delete_Call) Return(err error) *MockBatch_Delete_Call { + _c.Call.Return(err) + return _c +} + +func (_c *MockBatch_Delete_Call) RunAndReturn(run func(key datastore.Key) error) *MockBatch_Delete_Call { + _c.Call.Return(run) + return _c +} + +// Put provides a mock function for the type MockBatch +func (_mock *MockBatch) Put(key datastore.Key, value []byte) error { + ret := _mock.Called(key, value) + + if len(ret) == 0 { + panic("no return value specified for Put") + } + + var r0 error + if returnFunc, ok := 
ret.Get(0).(func(datastore.Key, []byte) error); ok { + r0 = returnFunc(key, value) + } else { + r0 = ret.Error(0) + } + return r0 +} + +// MockBatch_Put_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Put' +type MockBatch_Put_Call struct { + *mock.Call +} + +// Put is a helper method to define mock.On call +// - key datastore.Key +// - value []byte +func (_e *MockBatch_Expecter) Put(key interface{}, value interface{}) *MockBatch_Put_Call { + return &MockBatch_Put_Call{Call: _e.mock.On("Put", key, value)} +} + +func (_c *MockBatch_Put_Call) Run(run func(key datastore.Key, value []byte)) *MockBatch_Put_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 datastore.Key + if args[0] != nil { + arg0 = args[0].(datastore.Key) + } + var arg1 []byte + if args[1] != nil { + arg1 = args[1].([]byte) + } + run( + arg0, + arg1, + ) + }) + return _c +} + +func (_c *MockBatch_Put_Call) Return(err error) *MockBatch_Put_Call { + _c.Call.Return(err) + return _c +} + +func (_c *MockBatch_Put_Call) RunAndReturn(run func(key datastore.Key, value []byte) error) *MockBatch_Put_Call { + _c.Call.Return(run) + return _c +} + +// SaveBlockData provides a mock function for the type MockBatch +func (_mock *MockBatch) SaveBlockData(header *types.SignedHeader, data *types.Data, signature *types.Signature) error { + ret := _mock.Called(header, data, signature) + + if len(ret) == 0 { + panic("no return value specified for SaveBlockData") + } + + var r0 error + if returnFunc, ok := ret.Get(0).(func(*types.SignedHeader, *types.Data, *types.Signature) error); ok { + r0 = returnFunc(header, data, signature) + } else { + r0 = ret.Error(0) + } + return r0 +} + +// MockBatch_SaveBlockData_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SaveBlockData' +type MockBatch_SaveBlockData_Call struct { + *mock.Call +} + +// SaveBlockData is a helper method to define mock.On call +// - header *types.SignedHeader +// - data *types.Data +// - signature *types.Signature +func (_e *MockBatch_Expecter) SaveBlockData(header interface{}, data interface{}, signature interface{}) *MockBatch_SaveBlockData_Call { + return &MockBatch_SaveBlockData_Call{Call: _e.mock.On("SaveBlockData", header, data, signature)} +} + +func (_c *MockBatch_SaveBlockData_Call) Run(run func(header *types.SignedHeader, data *types.Data, signature *types.Signature)) *MockBatch_SaveBlockData_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 *types.SignedHeader + if args[0] != nil { + arg0 = args[0].(*types.SignedHeader) + } + var arg1 *types.Data + if args[1] != nil { + arg1 = args[1].(*types.Data) + } + var arg2 *types.Signature + if args[2] != nil { + arg2 = args[2].(*types.Signature) + } + run( + arg0, + arg1, + arg2, + ) + }) + return _c +} + +func (_c *MockBatch_SaveBlockData_Call) Return(err error) *MockBatch_SaveBlockData_Call { + _c.Call.Return(err) + return _c +} + +func (_c *MockBatch_SaveBlockData_Call) RunAndReturn(run func(header *types.SignedHeader, data *types.Data, signature *types.Signature) error) *MockBatch_SaveBlockData_Call { + _c.Call.Return(run) + return _c +} + +// SetHeight provides a mock function for the type MockBatch +func (_mock *MockBatch) SetHeight(height uint64) error { + ret := _mock.Called(height) + + if len(ret) == 0 { + panic("no return value specified for SetHeight") + } + + var r0 error + if returnFunc, ok := ret.Get(0).(func(uint64) error); ok { + r0 = returnFunc(height) + } else { + r0 = ret.Error(0) + } + return r0 +} + +// 
MockBatch_SetHeight_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SetHeight' +type MockBatch_SetHeight_Call struct { + *mock.Call +} + +// SetHeight is a helper method to define mock.On call +// - height uint64 +func (_e *MockBatch_Expecter) SetHeight(height interface{}) *MockBatch_SetHeight_Call { + return &MockBatch_SetHeight_Call{Call: _e.mock.On("SetHeight", height)} +} + +func (_c *MockBatch_SetHeight_Call) Run(run func(height uint64)) *MockBatch_SetHeight_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 uint64 + if args[0] != nil { + arg0 = args[0].(uint64) + } + run( + arg0, + ) + }) + return _c +} + +func (_c *MockBatch_SetHeight_Call) Return(err error) *MockBatch_SetHeight_Call { + _c.Call.Return(err) + return _c +} + +func (_c *MockBatch_SetHeight_Call) RunAndReturn(run func(height uint64) error) *MockBatch_SetHeight_Call { + _c.Call.Return(run) + return _c +} + +// UpdateState provides a mock function for the type MockBatch +func (_mock *MockBatch) UpdateState(state types.State) error { + ret := _mock.Called(state) + + if len(ret) == 0 { + panic("no return value specified for UpdateState") + } + + var r0 error + if returnFunc, ok := ret.Get(0).(func(types.State) error); ok { + r0 = returnFunc(state) + } else { + r0 = ret.Error(0) + } + return r0 +} + +// MockBatch_UpdateState_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateState' +type MockBatch_UpdateState_Call struct { + *mock.Call +} + +// UpdateState is a helper method to define mock.On call +// - state types.State +func (_e *MockBatch_Expecter) UpdateState(state interface{}) *MockBatch_UpdateState_Call { + return &MockBatch_UpdateState_Call{Call: _e.mock.On("UpdateState", state)} +} + +func (_c *MockBatch_UpdateState_Call) Run(run func(state types.State)) *MockBatch_UpdateState_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 types.State + if args[0] != nil { + arg0 = args[0].(types.State) + } + run( + arg0, + ) + }) + return _c +} + +func (_c *MockBatch_UpdateState_Call) Return(err error) *MockBatch_UpdateState_Call { + _c.Call.Return(err) + return _c +} + +func (_c *MockBatch_UpdateState_Call) RunAndReturn(run func(state types.State) error) *MockBatch_UpdateState_Call { + _c.Call.Return(run) + return _c +}
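
For reference, a minimal sketch of how the generated `MockBatch` can be wired into a test, mirroring the expectations set in `TestSyncer_InitializeState_CallsReplayer` above. The `NewMockStore` constructor and the `github.com/evstack/ev-node/test/mocks` import path are assumed from the `.mockery.yaml` config (the `Store` mock is generated into the same package); the height value is illustrative:

```go
package syncing

import (
	"testing"

	"github.com/stretchr/testify/mock"
	"github.com/stretchr/testify/require"

	testmocks "github.com/evstack/ev-node/test/mocks"
	"github.com/evstack/ev-node/types"
)

// Illustrative only: drives the SetHeight/UpdateState/Commit persistence
// sequence through the generated mocks. The constructors register a
// t.Cleanup hook that asserts every expectation was met.
func TestBatchPersistence_Sketch(t *testing.T) {
	mockStore := testmocks.NewMockStore(t) // assumed generated constructor
	mockBatch := testmocks.NewMockBatch(t)

	// Expect the same sequence initializeState performs when persisting state.
	mockStore.EXPECT().NewBatch(mock.Anything).Return(mockBatch, nil)
	mockBatch.EXPECT().SetHeight(uint64(5)).Return(nil)
	mockBatch.EXPECT().UpdateState(mock.Anything).Return(nil)
	mockBatch.EXPECT().Commit().Return(nil)

	batch, err := mockStore.NewBatch(t.Context())
	require.NoError(t, err)
	require.NoError(t, batch.SetHeight(5))
	require.NoError(t, batch.UpdateState(types.State{}))
	require.NoError(t, batch.Commit())
}
```

Preferring `NewMockBatch(t)` over `new(MockBatch)` gets the expectation assertions for free via the registered cleanup, so a forgotten `Commit()` fails the test instead of passing silently.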