diff --git a/CHANGELOG.md b/CHANGELOG.md index bb77b2d31..b33033fcc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,7 +7,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) ## Unreleased ### Added - +- Added missing block detection and recovery in the indexer.[#488](https://github.com/proto-kit/framework/pull/488) - `@dependencyFactory` for static dependency factory type safety - Added Mempool sorting [#395](https://github.com/proto-kit/framework/pull/395) - Introduced dynamic block building and JIT transaction fetching [#394](https://github.com/proto-kit/framework/pull/394) diff --git a/packages/indexer/src/IndexerNotifier.ts b/packages/indexer/src/IndexerNotifier.ts index 93a33921c..28d5248ea 100644 --- a/packages/indexer/src/IndexerNotifier.ts +++ b/packages/indexer/src/IndexerNotifier.ts @@ -1,5 +1,7 @@ import { BlockTriggerBase, + BlockStorage, + BlockWithResult, Sequencer, sequencerModule, SequencerModule, @@ -12,7 +14,8 @@ import { import { log } from "@proto-kit/common"; import { inject } from "tsyringe"; -import { IndexBlockTask } from "./tasks/IndexBlockTask"; +import { IndexBlockTask, IndexBlockResult } from "./tasks/IndexBlockTask"; +import { IndexMissingBlocksTask } from "./tasks/IndexMissingBlocksTask"; import { IndexPendingTxTask } from "./tasks/IndexPendingTxTask"; import { IndexSettlementTask } from "./tasks/IndexSettlementTask"; import { IndexBatchTask } from "./tasks/IndexBatchTask"; @@ -30,7 +33,10 @@ export class IndexerNotifier extends SequencerModule> { public sequencer: Sequencer, @inject("TaskQueue") public taskQueue: TaskQueue, + @inject("BlockStorage") + private readonly blockStorage: BlockStorage, public indexBlockTask: IndexBlockTask, + public indexMissingBlocksTask: IndexMissingBlocksTask, public indexPendingTxTask: IndexPendingTxTask, public indexBatchTask: IndexBatchTask, public indexSettlementTask: IndexSettlementTask, @@ -39,6 +45,67 @@ export class IndexerNotifier extends SequencerModule> { super(); } + private async pushTask( + queueName: string, + name: string, + payload: string + ): Promise { + const queue = await this.taskQueue.getQueue(queueName); + await queue.addTask({ + name, + payload, + flowId: "", + sequencerId: this.sequencerIdProvider.getSequencerId(), + }); + } + + private async handleIndexBlockTaskCompleted( + payload: TaskPayload + ): Promise { + if (payload.name !== this.indexBlockTask.name) { + return; + } + + try { + // eslint-disable-next-line @typescript-eslint/consistent-type-assertions + const result = JSON.parse(payload.payload) as IndexBlockResult; + + if ( + result.status !== "missing-blocks" || + result.missingHeights.length === 0 + ) { + return; + } + + const heights = [...result.missingHeights, result.incomingHeight]; + + const blocks = await Promise.all( + heights.map((h) => this.blockStorage.getBlockWithResultAt(h)) + ); + + const filteredBlocks = blocks.filter( + (block): block is BlockWithResult => block !== undefined + ); + + if (filteredBlocks.length === 0) { + log.warn("No blocks found to re-send"); + return; + } + + const serialized = await this.indexMissingBlocksTask + .inputSerializer() + .toJSON(filteredBlocks); + + await this.pushTask( + this.indexMissingBlocksTask.name, + this.indexMissingBlocksTask.name, + serialized + ); + } catch (error) { + log.error("Failed to handle block task completion result", error); + } + } + public async propagateEventsAsTasks() { const queue = await this.taskQueue.getQueue(this.indexBlockTask.name); const inputSerializer = this.indexBlockTask.inputSerializer(); @@ -47,86 +114,62 @@ export class IndexerNotifier extends SequencerModule> { const settlementInputSerializer = this.indexSettlementTask.inputSerializer(); + await queue.onCompleted( + async (payload) => await this.handleIndexBlockTaskCompleted(payload) + ); + this.sequencer.events.on("block-metadata-produced", async (block) => { log.debug( "Notifiying the indexer about block", block.block.height.toBigInt() ); const payload = await inputSerializer.toJSON(block); - const sequencerId = this.sequencerIdProvider.getSequencerId(); - - const task: TaskPayload = { - name: this.indexBlockTask.name, - payload, - flowId: "", // empty for now - sequencerId, - }; - - await queue.addTask(task); + await this.pushTask( + this.indexBlockTask.name, + this.indexBlockTask.name, + payload + ); }); + this.sequencer.events.on("mempool-transaction-added", async (tx) => { try { - const txQueue = await this.taskQueue.getQueue( - this.indexPendingTxTask.name - ); const payload = await txInputSerializer.toJSON(tx); - const sequencerId = this.sequencerIdProvider.getSequencerId(); - - const task: TaskPayload = { - name: this.indexPendingTxTask.name, - payload, - flowId: "", - sequencerId, - }; - - await txQueue.addTask(task); + await this.pushTask( + this.indexPendingTxTask.name, + this.indexPendingTxTask.name, + payload + ); } catch (err) { log.error("Failed to add pending-tx task", err); } }); + this.sequencer.events.on("batch-produced", async (batch) => { log.debug("Notifiying the indexer about batch", batch?.height); try { - const batchQueue = await this.taskQueue.getQueue( - this.indexBatchTask.name - ); - const payload = await batchInputSerializer.toJSON(batch); - const sequencerId = this.sequencerIdProvider.getSequencerId(); - - const task: TaskPayload = { - name: this.indexBatchTask.name, - payload, - flowId: "", - sequencerId, - }; - - await batchQueue.addTask(task); + await this.pushTask( + this.indexBatchTask.name, + this.indexBatchTask.name, + payload + ); } catch (err) { log.error(`Failed to index batch ${batch?.height} ${err}`); } }); + this.sequencer.events.on("settlement-submitted", async (settlement) => { log.debug( - "Notifiying the indexer about settlement", + "Notifying the indexer about settlement", settlement.transactionHash ); try { - const settlementQueue = await this.taskQueue.getQueue( - this.indexSettlementTask.name - ); - const payload = await settlementInputSerializer.toJSON(settlement); - const sequencerId = this.sequencerIdProvider.getSequencerId(); - - const task: TaskPayload = { - name: this.indexSettlementTask.name, - payload, - flowId: "", - sequencerId, - }; - - await settlementQueue.addTask(task); + await this.pushTask( + this.indexSettlementTask.name, + this.indexSettlementTask.name, + payload + ); } catch (err) { log.error( `Failed to add index settlement: ${settlement.transactionHash} ${err}` diff --git a/packages/indexer/src/index.ts b/packages/indexer/src/index.ts index 07f57a70c..da99f5494 100644 --- a/packages/indexer/src/index.ts +++ b/packages/indexer/src/index.ts @@ -7,3 +7,4 @@ export * from "./tasks/IndexBlockTaskParameters"; export * from "./tasks/IndexPendingTxTask"; export * from "./tasks/IndexBatchTask"; export * from "./tasks/IndexSettlementTask"; +export * from "./tasks/IndexMissingBlocksTask"; diff --git a/packages/indexer/src/tasks/IndexBlockTask.ts b/packages/indexer/src/tasks/IndexBlockTask.ts index 96d92677c..e1fe222ee 100644 --- a/packages/indexer/src/tasks/IndexBlockTask.ts +++ b/packages/indexer/src/tasks/IndexBlockTask.ts @@ -1,5 +1,6 @@ import { BlockQueue, + BlockStorage, Task, TaskSerializer, TaskWorkerModule, @@ -12,17 +13,25 @@ import { IndexBlockTaskParametersSerializer, } from "./IndexBlockTaskParameters"; +export interface IndexBlockResult { + status: "ok" | "missing-blocks"; + missingHeights: number[]; + incomingHeight: number; +} + @injectable() export class IndexBlockTask extends TaskWorkerModule - implements Task + implements Task { public name = "index-block"; public constructor( public taskSerializer: IndexBlockTaskParametersSerializer, @inject("BlockQueue") - public blockStorage: BlockQueue + public blockStorage: BlockQueue, + @inject("BlockStorage") + private readonly blockRepository: BlockStorage ) { super(); } @@ -32,27 +41,41 @@ export class IndexBlockTask public async compute( input: IndexBlockTaskParameters - ): Promise { + ): Promise { + const incomingHeight = Number(input.block.height.toBigInt()); try { + const currentHeight = await this.blockRepository.getCurrentBlockHeight(); + + if (incomingHeight > currentHeight) { + const missingHeights = Array.from( + { length: incomingHeight - currentHeight }, + (_, i) => currentHeight + i + ); + + return { status: "missing-blocks", missingHeights, incomingHeight }; + } await this.blockStorage.pushBlock(input.block); await this.blockStorage.pushResult(input.result); + + log.info(`Block ${incomingHeight} indexed successfully`); + return { status: "ok", missingHeights: [], incomingHeight }; } catch (error) { - log.error("Failed to index block", input.block.height.toBigInt(), error); - return undefined; + log.error("Failed to index block", incomingHeight, error); + return { status: "ok", missingHeights: [], incomingHeight }; } - - log.info(`Block ${input.block.height.toBigInt()} indexed sucessfully`); - return ""; } public inputSerializer(): TaskSerializer { return this.taskSerializer; } - public resultSerializer(): TaskSerializer { + public resultSerializer(): TaskSerializer { return { - fromJSON: async () => {}, - toJSON: async () => "", + toJSON: async (input: IndexBlockResult) => JSON.stringify(input), + + fromJSON: async (json: string) => + // eslint-disable-next-line @typescript-eslint/consistent-type-assertions + JSON.parse(json) as IndexBlockResult, }; } } diff --git a/packages/indexer/src/tasks/IndexMissingBlocksTask.ts b/packages/indexer/src/tasks/IndexMissingBlocksTask.ts new file mode 100644 index 000000000..bc83a1923 --- /dev/null +++ b/packages/indexer/src/tasks/IndexMissingBlocksTask.ts @@ -0,0 +1,74 @@ +import { + BlockQueue, + BlockWithResult, + Task, + TaskSerializer, + TaskWorkerModule, +} from "@proto-kit/sequencer"; +import { log } from "@proto-kit/common"; +import { inject, injectable } from "tsyringe"; + +import { IndexBlockTaskParametersSerializer } from "./IndexBlockTaskParameters"; + +@injectable() +export class IndexMissingBlocksTask + extends TaskWorkerModule + implements Task +{ + public name = "index-missing-blocks"; + + public constructor( + public taskSerializer: IndexBlockTaskParametersSerializer, + @inject("BlockQueue") + public blockStorage: BlockQueue + ) { + super(); + } + + // eslint-disable-next-line @typescript-eslint/no-empty-function + public async prepare(): Promise {} + + public async compute(input: BlockWithResult[]): Promise { + const lastHeight = Number(input[input.length - 1].block.height.toBigInt()); + for (const blockWithResult of input) { + const height = Number(blockWithResult.block.height.toBigInt()); + const isLast = height === lastHeight; + try { + // eslint-disable-next-line no-await-in-loop + await this.blockStorage.pushBlock(blockWithResult.block); + // eslint-disable-next-line no-await-in-loop + await this.blockStorage.pushResult(blockWithResult.result); + log.info( + `${isLast ? "" : "Missing "}block ${height} indexed successfully` + ); + } catch (error) { + log.error( + `Failed to index ${isLast ? "" : "missing "}block at height ${height}`, + error + ); + return undefined; + } + } + return ""; + } + + public inputSerializer(): TaskSerializer { + return { + toJSON: (blocks: BlockWithResult[]): string => + JSON.stringify(blocks.map((b) => this.taskSerializer.toJSON(b))), + + fromJSON: (json: string): BlockWithResult[] => { + // eslint-disable-next-line @typescript-eslint/consistent-type-assertions + const items = JSON.parse(json) as string[]; + return items.map((item) => this.taskSerializer.fromJSON(item)); + }, + }; + } + + public resultSerializer(): TaskSerializer { + return { + fromJSON: async () => {}, + toJSON: async () => "", + }; + } +} diff --git a/packages/persistance/src/services/prisma/PrismaBlockStorage.ts b/packages/persistance/src/services/prisma/PrismaBlockStorage.ts index f3146642e..7ac3461b4 100644 --- a/packages/persistance/src/services/prisma/PrismaBlockStorage.ts +++ b/packages/persistance/src/services/prisma/PrismaBlockStorage.ts @@ -72,6 +72,16 @@ export class PrismaBlockStorage implements BlockQueue, BlockStorage { return (await this.getBlockByQuery({ height }))?.block; } + public async getBlockWithResultAt( + height: number + ): Promise { + const data = await this.getBlockByQuery({ height }); + if (data === undefined || data.result === undefined) { + return undefined; + } + return { block: data.block, result: data.result }; + } + public async getBlock(hash: string): Promise { return (await this.getBlockByQuery({ hash }))?.block; } diff --git a/packages/sequencer/src/storage/inmemory/InMemoryBlockStorage.ts b/packages/sequencer/src/storage/inmemory/InMemoryBlockStorage.ts index 5c5306e7b..4cc1c537f 100644 --- a/packages/sequencer/src/storage/inmemory/InMemoryBlockStorage.ts +++ b/packages/sequencer/src/storage/inmemory/InMemoryBlockStorage.ts @@ -23,6 +23,17 @@ export class InMemoryBlockStorage implements BlockStorage, BlockQueue { return this.blocks.at(height); } + public async getBlockWithResultAt( + height: number + ): Promise { + const block = this.blocks.at(height); + const result = this.results.at(height); + if (block === undefined || result === undefined) { + return undefined; + } + return { block, result }; + } + public async getCurrentBlockHeight(): Promise { return this.blocks.length; } diff --git a/packages/sequencer/src/storage/repositories/BlockStorage.ts b/packages/sequencer/src/storage/repositories/BlockStorage.ts index 7ec488754..d1246cc9d 100644 --- a/packages/sequencer/src/storage/repositories/BlockStorage.ts +++ b/packages/sequencer/src/storage/repositories/BlockStorage.ts @@ -19,5 +19,8 @@ export interface BlockStorage { pushBlock: (block: Block) => Promise; getBlockAt: (height: number) => Promise; + getBlockWithResultAt: ( + height: number + ) => Promise; getBlock: (hash: string) => Promise; }