Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion solana/solana-dump/src/dumper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ interface Options extends DumperOptions {
maxConfirmationAttempts: number
assertLogMessagesNotNull: boolean
validateChainContinuity: boolean
txThreshold?: number
}


Expand All @@ -31,6 +32,7 @@ export class SolanaDumper extends Dumper<Block, Options> {
program.option('--max-confirmation-attempts <N>', 'Maximum number of confirmation attempts', positiveInt, 10)
program.option('--assert-log-messages-not-null', 'Check if tx.meta.logMessages is not null', false)
program.option('--validate-chain-continuity', 'Check if block parent hash matches previous block hash', false)
program.option('--tx-threshold <N>', 'Retry getBlock call if transactions count is less than threshold')
}

protected fixUnsafeIntegers(): boolean {
Expand Down Expand Up @@ -69,7 +71,8 @@ export class SolanaDumper extends Dumper<Block, Options> {
url: options.endpoint,
capacity: Number.MAX_SAFE_INTEGER,
retryAttempts: Number.MAX_SAFE_INTEGER,
requestTimeout: 30_000
requestTimeout: 30_000,
txThreshold: options.txThreshold,
})

return new SolanaRpcDataSource({
Expand Down
1 change: 1 addition & 0 deletions solana/solana-rpc/src/rpc-remote.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ export type RemoteRpcOptions = Pick<
* Remove vote transactions from all relevant responses
*/
noVotes?: boolean
txThreshold?: number
}


Expand Down
4 changes: 2 additions & 2 deletions solana/solana-rpc/src/rpc-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ import {Commitment, GetBlockOptions, Rpc} from './rpc'
import type {RemoteRpcOptions} from './rpc-remote'


const {noVotes, ...rpcOptions} = getServerArguments<RemoteRpcOptions>()
const {noVotes, txThreshold, ...rpcOptions} = getServerArguments<RemoteRpcOptions>()

const rpc = new Rpc(new RpcClient({
...rpcOptions,
fixUnsafeIntegers: true
}))
}), txThreshold)


getServer()
Expand Down
63 changes: 57 additions & 6 deletions solana/solana-rpc/src/rpc.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import {createLogger} from '@subsquid/logger'
import {CallOptions, RpcClient, RpcError, RpcProtocolError} from '@subsquid/rpc-client'
import {RpcCall, RpcErrorInfo} from '@subsquid/rpc-client/lib/interfaces'
import {GetBlock} from '@subsquid/solana-rpc-data'
import {CallOptions, RetryError, RpcClient, RpcError, RpcProtocolError} from '@subsquid/rpc-client'
import {RpcCall, RpcErrorInfo, RpcRequest} from '@subsquid/rpc-client/lib/interfaces'
import {GetBlock, isVoteTransaction} from '@subsquid/solana-rpc-data'
import {assertNotNull} from '@subsquid/util-internal'
import {
array,
B58,
Expand Down Expand Up @@ -49,10 +50,18 @@ export interface RpcApi {


export class Rpc implements RpcApi {
private requests: ThresholdRequests

constructor(
public readonly client: RpcClient,
public readonly log = createLogger('sqd:solana-rpc')
) {}
public readonly txThreshold?: number,
public readonly log = createLogger('sqd:solana-rpc'),
) {
if (this.txThreshold != null) {
assert(this.txThreshold > 0)
}
this.requests = new ThresholdRequests()
}

call<T=any>(method: string, params?: any[], options?: CallOptions<T>): Promise<T> {
return this.client.call(method, params, options)
Expand Down Expand Up @@ -107,7 +116,7 @@ export class Rpc implements RpcApi {
call[i] = {method: 'getBlock', params}
}
return this.reduceBatchOnRetry<GetBlock | 'skipped' | null | undefined>(call, {
validateResult: getResultValidator(nullable(GetBlock)),
validateResult: (result, req) => this.validateGetBlockResult(result, req),
validateError: captureNoBlockAtSlot
})
}
Expand All @@ -132,6 +141,23 @@ export class Rpc implements RpcApi {

return pack.flat()
}

validateGetBlockResult(result: unknown, req: RpcRequest) {
let validator = getResultValidator(nullable(GetBlock))
let block = validator(result)
if (this.txThreshold && block != null && block.transactions != null) {
let transactions = block.transactions.filter(tx => !isVoteTransaction(tx))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

my bad i didn't understand my measurements at first. it doesn't take ~30ms to filter it's just ~0.030ms in average

if (transactions.length < this.txThreshold) {
let slot = req.params![0] as any as number
let retries = this.requests.get(slot)
if (retries < 3) {
this.requests.inc(slot)
throw new RetryError(`transactions count is less than threshold: ${transactions.length} < ${this.txThreshold}`)
}
}
}
return block
}
}


Expand All @@ -152,3 +178,28 @@ function getResultValidator<V extends Validator>(validator: V): (result: unknown
}
}
}


class ThresholdRequests {
inner: Map<number, number>

constructor() {
this.inner = new Map()
}

inc(slot: number) {
if (this.inner.size > 100) {
let keys = this.inner.keys()
for (let i = 0; i < 20; i++) {
let res = keys.next()
this.inner.delete(assertNotNull(res.value))
}
}
let val = this.inner.get(slot) ?? 0
this.inner.set(slot, val + 1)
}

get(slot: number) {
return this.inner.get(slot) ?? 0
}
}
Loading