diff --git a/solana/solana-dump/src/dumper.ts b/solana/solana-dump/src/dumper.ts index 80c49d173..526caf702 100644 --- a/solana/solana-dump/src/dumper.ts +++ b/solana/solana-dump/src/dumper.ts @@ -17,6 +17,7 @@ interface Options extends DumperOptions { maxConfirmationAttempts: number assertLogMessagesNotNull: boolean validateChainContinuity: boolean + txThreshold?: number } @@ -31,6 +32,7 @@ export class SolanaDumper extends Dumper { program.option('--max-confirmation-attempts ', 'Maximum number of confirmation attempts', positiveInt, 10) program.option('--assert-log-messages-not-null', 'Check if tx.meta.logMessages is not null', false) program.option('--validate-chain-continuity', 'Check if block parent hash matches previous block hash', false) + program.option('--tx-threshold ', 'Retry getBlock call if transactions count is less than threshold') } protected fixUnsafeIntegers(): boolean { @@ -69,7 +71,8 @@ export class SolanaDumper extends Dumper { url: options.endpoint, capacity: Number.MAX_SAFE_INTEGER, retryAttempts: Number.MAX_SAFE_INTEGER, - requestTimeout: 30_000 + requestTimeout: 30_000, + txThreshold: options.txThreshold, }) return new SolanaRpcDataSource({ diff --git a/solana/solana-rpc/src/rpc-remote.ts b/solana/solana-rpc/src/rpc-remote.ts index 2311d5451..f95c51fef 100644 --- a/solana/solana-rpc/src/rpc-remote.ts +++ b/solana/solana-rpc/src/rpc-remote.ts @@ -18,6 +18,7 @@ export type RemoteRpcOptions = Pick< * Remove vote transactions from all relevant responses */ noVotes?: boolean + txThreshold?: number } diff --git a/solana/solana-rpc/src/rpc-worker.ts b/solana/solana-rpc/src/rpc-worker.ts index 14c3bc3ad..a339f4ab1 100644 --- a/solana/solana-rpc/src/rpc-worker.ts +++ b/solana/solana-rpc/src/rpc-worker.ts @@ -5,12 +5,12 @@ import {Commitment, GetBlockOptions, Rpc} from './rpc' import type {RemoteRpcOptions} from './rpc-remote' -const {noVotes, ...rpcOptions} = getServerArguments() +const {noVotes, txThreshold, ...rpcOptions} = getServerArguments() const rpc = new Rpc(new RpcClient({ ...rpcOptions, fixUnsafeIntegers: true -})) +}), txThreshold) getServer() diff --git a/solana/solana-rpc/src/rpc.ts b/solana/solana-rpc/src/rpc.ts index a1c20ea32..b5155f7d2 100644 --- a/solana/solana-rpc/src/rpc.ts +++ b/solana/solana-rpc/src/rpc.ts @@ -1,7 +1,8 @@ import {createLogger} from '@subsquid/logger' -import {CallOptions, RpcClient, RpcError, RpcProtocolError} from '@subsquid/rpc-client' -import {RpcCall, RpcErrorInfo} from '@subsquid/rpc-client/lib/interfaces' -import {GetBlock} from '@subsquid/solana-rpc-data' +import {CallOptions, RetryError, RpcClient, RpcError, RpcProtocolError} from '@subsquid/rpc-client' +import {RpcCall, RpcErrorInfo, RpcRequest} from '@subsquid/rpc-client/lib/interfaces' +import {GetBlock, isVoteTransaction} from '@subsquid/solana-rpc-data' +import {assertNotNull} from '@subsquid/util-internal' import { array, B58, @@ -49,10 +50,18 @@ export interface RpcApi { export class Rpc implements RpcApi { + private requests: ThresholdRequests + constructor( public readonly client: RpcClient, - public readonly log = createLogger('sqd:solana-rpc') - ) {} + public readonly txThreshold?: number, + public readonly log = createLogger('sqd:solana-rpc'), + ) { + if (this.txThreshold != null) { + assert(this.txThreshold > 0) + } + this.requests = new ThresholdRequests() + } call(method: string, params?: any[], options?: CallOptions): Promise { return this.client.call(method, params, options) @@ -107,7 +116,7 @@ export class Rpc implements RpcApi { call[i] = {method: 'getBlock', params} } return this.reduceBatchOnRetry(call, { - validateResult: getResultValidator(nullable(GetBlock)), + validateResult: (result, req) => this.validateGetBlockResult(result, req), validateError: captureNoBlockAtSlot }) } @@ -132,6 +141,23 @@ export class Rpc implements RpcApi { return pack.flat() } + + validateGetBlockResult(result: unknown, req: RpcRequest) { + let validator = getResultValidator(nullable(GetBlock)) + let block = validator(result) + if (this.txThreshold && block != null && block.transactions != null) { + let transactions = block.transactions.filter(tx => !isVoteTransaction(tx)) + if (transactions.length < this.txThreshold) { + let slot = req.params![0] as any as number + let retries = this.requests.get(slot) + if (retries < 3) { + this.requests.inc(slot) + throw new RetryError(`transactions count is less than threshold: ${transactions.length} < ${this.txThreshold}`) + } + } + } + return block + } } @@ -152,3 +178,28 @@ function getResultValidator(validator: V): (result: unknown } } } + + +class ThresholdRequests { + inner: Map + + constructor() { + this.inner = new Map() + } + + inc(slot: number) { + if (this.inner.size > 100) { + let keys = this.inner.keys() + for (let i = 0; i < 20; i++) { + let res = keys.next() + this.inner.delete(assertNotNull(res.value)) + } + } + let val = this.inner.get(slot) ?? 0 + this.inner.set(slot, val + 1) + } + + get(slot: number) { + return this.inner.get(slot) ?? 0 + } +}