Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
## Unreleased

### Added

- Added missing block detection and recovery in the indexer.[#488](https://github.com/proto-kit/framework/pull/488)
- `@dependencyFactory` for static dependency factory type safety
- Added Mempool sorting [#395](https://github.com/proto-kit/framework/pull/395)
- Introduced dynamic block building and JIT transaction fetching [#394](https://github.com/proto-kit/framework/pull/394)
Expand Down
149 changes: 96 additions & 53 deletions packages/indexer/src/IndexerNotifier.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import {
BlockTriggerBase,
BlockStorage,
BlockWithResult,
Sequencer,
sequencerModule,
SequencerModule,
Expand All @@ -12,7 +14,8 @@ import {
import { log } from "@proto-kit/common";
import { inject } from "tsyringe";

import { IndexBlockTask } from "./tasks/IndexBlockTask";
import { IndexBlockTask, IndexBlockResult } from "./tasks/IndexBlockTask";
import { IndexMissingBlocksTask } from "./tasks/IndexMissingBlocksTask";
import { IndexPendingTxTask } from "./tasks/IndexPendingTxTask";
import { IndexSettlementTask } from "./tasks/IndexSettlementTask";
import { IndexBatchTask } from "./tasks/IndexBatchTask";
Expand All @@ -30,7 +33,10 @@ export class IndexerNotifier extends SequencerModule<Record<never, never>> {
public sequencer: Sequencer<NotifierMandatorySequencerModules>,
@inject("TaskQueue")
public taskQueue: TaskQueue,
@inject("BlockStorage")
private readonly blockStorage: BlockStorage,
public indexBlockTask: IndexBlockTask,
public indexMissingBlocksTask: IndexMissingBlocksTask,
public indexPendingTxTask: IndexPendingTxTask,
public indexBatchTask: IndexBatchTask,
public indexSettlementTask: IndexSettlementTask,
Expand All @@ -39,6 +45,67 @@ export class IndexerNotifier extends SequencerModule<Record<never, never>> {
super();
}

private async pushTask(
queueName: string,
name: string,
payload: string
): Promise<void> {
const queue = await this.taskQueue.getQueue(queueName);
await queue.addTask({
name,
payload,
flowId: "",
sequencerId: this.sequencerIdProvider.getSequencerId(),
});
}

private async handleIndexBlockTaskCompleted(
payload: TaskPayload
): Promise<void> {
if (payload.name !== this.indexBlockTask.name) {
return;
}

try {
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
const result = JSON.parse(payload.payload) as IndexBlockResult;

if (
result.status !== "missing-blocks" ||
result.missingHeights.length === 0
) {
return;
}

const heights = [...result.missingHeights, result.incomingHeight];

const blocks = await Promise.all(
heights.map((h) => this.blockStorage.getBlockWithResultAt(h))
);

const filteredBlocks = blocks.filter(
(block): block is BlockWithResult => block !== undefined
);

if (filteredBlocks.length === 0) {
log.warn("No blocks found to re-send");
return;
}

const serialized = await this.indexMissingBlocksTask
.inputSerializer()
.toJSON(filteredBlocks);

await this.pushTask(
this.indexMissingBlocksTask.name,
this.indexMissingBlocksTask.name,
serialized
);
} catch (error) {
log.error("Failed to handle block task completion result", error);
}
}

public async propagateEventsAsTasks() {
const queue = await this.taskQueue.getQueue(this.indexBlockTask.name);
const inputSerializer = this.indexBlockTask.inputSerializer();
Expand All @@ -47,86 +114,62 @@ export class IndexerNotifier extends SequencerModule<Record<never, never>> {
const settlementInputSerializer =
this.indexSettlementTask.inputSerializer();

await queue.onCompleted(
async (payload) => await this.handleIndexBlockTaskCompleted(payload)
);

this.sequencer.events.on("block-metadata-produced", async (block) => {
log.debug(
"Notifiying the indexer about block",
block.block.height.toBigInt()
);
const payload = await inputSerializer.toJSON(block);
const sequencerId = this.sequencerIdProvider.getSequencerId();

const task: TaskPayload = {
name: this.indexBlockTask.name,
payload,
flowId: "", // empty for now
sequencerId,
};

await queue.addTask(task);
await this.pushTask(
this.indexBlockTask.name,
this.indexBlockTask.name,
payload
);
});

this.sequencer.events.on("mempool-transaction-added", async (tx) => {
try {
const txQueue = await this.taskQueue.getQueue(
this.indexPendingTxTask.name
);
const payload = await txInputSerializer.toJSON(tx);
const sequencerId = this.sequencerIdProvider.getSequencerId();

const task: TaskPayload = {
name: this.indexPendingTxTask.name,
payload,
flowId: "",
sequencerId,
};

await txQueue.addTask(task);
await this.pushTask(
this.indexPendingTxTask.name,
this.indexPendingTxTask.name,
payload
);
} catch (err) {
log.error("Failed to add pending-tx task", err);
}
});

this.sequencer.events.on("batch-produced", async (batch) => {
log.debug("Notifiying the indexer about batch", batch?.height);
try {
const batchQueue = await this.taskQueue.getQueue(
this.indexBatchTask.name
);

const payload = await batchInputSerializer.toJSON(batch);
const sequencerId = this.sequencerIdProvider.getSequencerId();

const task: TaskPayload = {
name: this.indexBatchTask.name,
payload,
flowId: "",
sequencerId,
};

await batchQueue.addTask(task);
await this.pushTask(
this.indexBatchTask.name,
this.indexBatchTask.name,
payload
);
} catch (err) {
log.error(`Failed to index batch ${batch?.height} ${err}`);
}
});

this.sequencer.events.on("settlement-submitted", async (settlement) => {
log.debug(
"Notifiying the indexer about settlement",
"Notifying the indexer about settlement",
settlement.transactionHash
);
try {
const settlementQueue = await this.taskQueue.getQueue(
this.indexSettlementTask.name
);

const payload = await settlementInputSerializer.toJSON(settlement);
const sequencerId = this.sequencerIdProvider.getSequencerId();

const task: TaskPayload = {
name: this.indexSettlementTask.name,
payload,
flowId: "",
sequencerId,
};

await settlementQueue.addTask(task);
await this.pushTask(
this.indexSettlementTask.name,
this.indexSettlementTask.name,
payload
);
} catch (err) {
log.error(
`Failed to add index settlement: ${settlement.transactionHash} ${err}`
Expand Down
1 change: 1 addition & 0 deletions packages/indexer/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ export * from "./tasks/IndexBlockTaskParameters";
export * from "./tasks/IndexPendingTxTask";
export * from "./tasks/IndexBatchTask";
export * from "./tasks/IndexSettlementTask";
export * from "./tasks/IndexMissingBlocksTask";
45 changes: 34 additions & 11 deletions packages/indexer/src/tasks/IndexBlockTask.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import {
BlockQueue,
BlockStorage,
Task,
TaskSerializer,
TaskWorkerModule,
Expand All @@ -12,17 +13,25 @@ import {
IndexBlockTaskParametersSerializer,
} from "./IndexBlockTaskParameters";

export interface IndexBlockResult {
status: "ok" | "missing-blocks";
missingHeights: number[];
incomingHeight: number;
}

@injectable()
export class IndexBlockTask
extends TaskWorkerModule
implements Task<IndexBlockTaskParameters, string | void>
implements Task<IndexBlockTaskParameters, IndexBlockResult>
{
public name = "index-block";

public constructor(
public taskSerializer: IndexBlockTaskParametersSerializer,
@inject("BlockQueue")
public blockStorage: BlockQueue
public blockStorage: BlockQueue,
@inject("BlockStorage")
private readonly blockRepository: BlockStorage
) {
super();
}
Expand All @@ -32,27 +41,41 @@ export class IndexBlockTask

public async compute(
input: IndexBlockTaskParameters
): Promise<string | void> {
): Promise<IndexBlockResult> {
const incomingHeight = Number(input.block.height.toBigInt());
try {
const currentHeight = await this.blockRepository.getCurrentBlockHeight();

if (incomingHeight > currentHeight) {
const missingHeights = Array.from(
{ length: incomingHeight - currentHeight },
(_, i) => currentHeight + i
);

return { status: "missing-blocks", missingHeights, incomingHeight };
}
await this.blockStorage.pushBlock(input.block);
await this.blockStorage.pushResult(input.result);

log.info(`Block ${incomingHeight} indexed successfully`);
return { status: "ok", missingHeights: [], incomingHeight };
} catch (error) {
log.error("Failed to index block", input.block.height.toBigInt(), error);
return undefined;
log.error("Failed to index block", incomingHeight, error);
return { status: "ok", missingHeights: [], incomingHeight };
}

log.info(`Block ${input.block.height.toBigInt()} indexed sucessfully`);
return "";
}

public inputSerializer(): TaskSerializer<IndexBlockTaskParameters> {
return this.taskSerializer;
}

public resultSerializer(): TaskSerializer<string | void> {
public resultSerializer(): TaskSerializer<IndexBlockResult> {
return {
fromJSON: async () => {},
toJSON: async () => "",
toJSON: async (input: IndexBlockResult) => JSON.stringify(input),

fromJSON: async (json: string) =>
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
JSON.parse(json) as IndexBlockResult,
};
}
}
74 changes: 74 additions & 0 deletions packages/indexer/src/tasks/IndexMissingBlocksTask.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import {
BlockQueue,
BlockWithResult,
Task,
TaskSerializer,
TaskWorkerModule,
} from "@proto-kit/sequencer";
import { log } from "@proto-kit/common";
import { inject, injectable } from "tsyringe";

import { IndexBlockTaskParametersSerializer } from "./IndexBlockTaskParameters";

@injectable()
export class IndexMissingBlocksTask
extends TaskWorkerModule
implements Task<BlockWithResult[], string | void>
{
public name = "index-missing-blocks";

public constructor(
public taskSerializer: IndexBlockTaskParametersSerializer,
@inject("BlockQueue")
public blockStorage: BlockQueue
) {
super();
}

// eslint-disable-next-line @typescript-eslint/no-empty-function
public async prepare(): Promise<void> {}

public async compute(input: BlockWithResult[]): Promise<string | void> {
const lastHeight = Number(input[input.length - 1].block.height.toBigInt());
for (const blockWithResult of input) {
const height = Number(blockWithResult.block.height.toBigInt());
const isLast = height === lastHeight;
try {
// eslint-disable-next-line no-await-in-loop
await this.blockStorage.pushBlock(blockWithResult.block);
// eslint-disable-next-line no-await-in-loop
await this.blockStorage.pushResult(blockWithResult.result);
log.info(
`${isLast ? "" : "Missing "}block ${height} indexed successfully`
);
} catch (error) {
log.error(
`Failed to index ${isLast ? "" : "missing "}block at height ${height}`,
error
);
return undefined;
}
}
return "";
}

public inputSerializer(): TaskSerializer<BlockWithResult[]> {
return {
toJSON: (blocks: BlockWithResult[]): string =>
JSON.stringify(blocks.map((b) => this.taskSerializer.toJSON(b))),

fromJSON: (json: string): BlockWithResult[] => {
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
const items = JSON.parse(json) as string[];
return items.map((item) => this.taskSerializer.fromJSON(item));
},
};
}

public resultSerializer(): TaskSerializer<string | void> {
return {
fromJSON: async () => {},
toJSON: async () => "",
};
}
}
Loading
Loading