Skip to content

Commit 8baaa65

Browse files
committed
feat: add background P2P init retries to SyncService
1 parent f70e6da commit 8baaa65

File tree

1 file changed

+79
-36
lines changed

1 file changed

+79
-36
lines changed

pkg/sync/sync_service.go

Lines changed: 79 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,10 @@ type SyncService[H header.Header[H]] struct {
6161
getterByHeight GetterByHeightFunc[H]
6262
rangeGetter RangeGetterFunc[H]
6363
storeInitialized atomic.Bool
64+
65+
// context for background operations
66+
bgCtx context.Context
67+
bgCancel context.CancelFunc
6468
}
6569

6670
// DataSyncService is the P2P Sync Service for blocks.
@@ -153,6 +157,8 @@ func newSyncService[H header.Header[H]](
153157
return nil, fmt.Errorf("failed to initialize the %s store: %w", syncType, err)
154158
}
155159

160+
bgCtx, bgCancel := context.WithCancel(context.Background())
161+
156162
svc := &SyncService[H]{
157163
conf: conf,
158164
genesis: genesis,
@@ -164,6 +170,8 @@ func newSyncService[H header.Header[H]](
164170
syncType: syncType,
165171
logger: logger,
166172
syncerStatus: new(SyncerStatus),
173+
bgCtx: bgCtx,
174+
bgCancel: bgCancel,
167175
}
168176

169177
return svc, nil
@@ -389,6 +397,42 @@ func (syncService *SyncService[H]) startSubscriber(ctx context.Context) error {
389397
return nil
390398
}
391399

400+
// tryInit attempts to initialize the syncer from P2P once.
401+
// Returns true if successful, false otherwise with an error.
402+
func (syncService *SyncService[H]) tryInit(ctx context.Context) (bool, error) {
403+
var (
404+
trusted H
405+
err error
406+
heightToQuery uint64
407+
)
408+
409+
head, headErr := syncService.store.Head(ctx)
410+
switch {
411+
case errors.Is(headErr, header.ErrNotFound), errors.Is(headErr, header.ErrEmptyStore):
412+
heightToQuery = syncService.genesis.InitialHeight
413+
case headErr != nil:
414+
return false, fmt.Errorf("failed to inspect local store head: %w", headErr)
415+
default:
416+
heightToQuery = head.Height()
417+
}
418+
419+
if trusted, err = syncService.ex.GetByHeight(ctx, heightToQuery); err != nil {
420+
return false, fmt.Errorf("failed to fetch height %d from peers: %w", heightToQuery, err)
421+
}
422+
423+
if syncService.storeInitialized.CompareAndSwap(false, true) {
424+
if _, err := syncService.initStore(ctx, trusted); err != nil {
425+
syncService.storeInitialized.Store(false)
426+
return false, fmt.Errorf("failed to initialize the store: %w", err)
427+
}
428+
}
429+
if err := syncService.startSyncer(ctx); err != nil {
430+
return false, err
431+
}
432+
433+
return true, nil
434+
}
435+
392436
// initFromP2PWithRetry initializes the syncer from P2P with a retry mechanism.
393437
// It inspects the local store to determine the first height to request:
394438
// - when the store already contains items, it reuses the latest height as the starting point;
@@ -398,48 +442,15 @@ func (syncService *SyncService[H]) initFromP2PWithRetry(ctx context.Context, pee
398442
return nil
399443
}
400444

401-
tryInit := func(ctx context.Context) (bool, error) {
402-
var (
403-
trusted H
404-
err error
405-
heightToQuery uint64
406-
)
407-
408-
head, headErr := syncService.store.Head(ctx)
409-
switch {
410-
case errors.Is(headErr, header.ErrNotFound), errors.Is(headErr, header.ErrEmptyStore):
411-
heightToQuery = syncService.genesis.InitialHeight
412-
case headErr != nil:
413-
return false, fmt.Errorf("failed to inspect local store head: %w", headErr)
414-
default:
415-
heightToQuery = head.Height()
416-
}
417-
418-
if trusted, err = syncService.ex.GetByHeight(ctx, heightToQuery); err != nil {
419-
return false, fmt.Errorf("failed to fetch height %d from peers: %w", heightToQuery, err)
420-
}
421-
422-
if syncService.storeInitialized.CompareAndSwap(false, true) {
423-
if _, err := syncService.initStore(ctx, trusted); err != nil {
424-
syncService.storeInitialized.Store(false)
425-
return false, fmt.Errorf("failed to initialize the store: %w", err)
426-
}
427-
}
428-
if err := syncService.startSyncer(ctx); err != nil {
429-
return false, err
430-
}
431-
return true, nil
432-
}
433-
434445
// block with exponential backoff until initialization succeeds or context is canceled.
435446
backoff := 1 * time.Second
436447
maxBackoff := 10 * time.Second
437448

438-
timeoutTimer := time.NewTimer(time.Minute * 10)
449+
timeoutTimer := time.NewTimer(time.Minute * 2)
439450
defer timeoutTimer.Stop()
440451

441452
for {
442-
ok, err := tryInit(ctx)
453+
ok, err := syncService.tryInit(ctx)
443454
if ok {
444455
return nil
445456
}
@@ -450,7 +461,9 @@ func (syncService *SyncService[H]) initFromP2PWithRetry(ctx context.Context, pee
450461
case <-ctx.Done():
451462
return ctx.Err()
452463
case <-timeoutTimer.C:
453-
return fmt.Errorf("timeout reached while trying to initialize the store after 10 minutes: %w", err)
464+
syncService.logger.Warn().Err(err).Msg("timeout reached while trying to initialize the store, scheduling background retry")
465+
go syncService.retryInitInBackground()
466+
return nil
454467
case <-time.After(backoff):
455468
}
456469

@@ -461,10 +474,40 @@ func (syncService *SyncService[H]) initFromP2PWithRetry(ctx context.Context, pee
461474
}
462475
}
463476

477+
// retryInitInBackground continues attempting to initialize the syncer in the background.
478+
func (syncService *SyncService[H]) retryInitInBackground() {
479+
backoff := 15 * time.Second
480+
maxBackoff := 5 * time.Minute
481+
482+
for {
483+
select {
484+
case <-syncService.bgCtx.Done():
485+
syncService.logger.Info().Msg("background retry cancelled")
486+
return
487+
case <-time.After(backoff):
488+
}
489+
490+
ok, err := syncService.tryInit(syncService.bgCtx)
491+
if ok {
492+
syncService.logger.Info().Msg("successfully initialized store from P2P in background")
493+
return
494+
}
495+
496+
syncService.logger.Info().Err(err).Dur("retry_in", backoff).Msg("background retry: headers not yet available from peers")
497+
498+
backoff *= 2
499+
if backoff > maxBackoff {
500+
backoff = maxBackoff
501+
}
502+
}
503+
}
504+
464505
// Stop is a part of Service interface.
465506
//
466507
// `store` is closed last because it's used by other services.
467508
func (syncService *SyncService[H]) Stop(ctx context.Context) error {
509+
syncService.bgCancel()
510+
468511
// unsubscribe from topic first so that sub.Stop() does not fail
469512
syncService.topicSubscription.Cancel()
470513
err := errors.Join(

0 commit comments

Comments
 (0)