From 2a04113a91976f6ab561377fc46952bff5e2e615 Mon Sep 17 00:00:00 2001 From: Brian Botha Date: Wed, 25 Jun 2025 10:02:57 +1000 Subject: [PATCH 1/5] fix: optimised how long `queueDataFromRequest` uses the connection to fetch data --- src/nodes/NodeManager.ts | 101 +++++++++++++++++++++++---------------- 1 file changed, 61 insertions(+), 40 deletions(-) diff --git a/src/nodes/NodeManager.ts b/src/nodes/NodeManager.ts index 232297309..051453601 100644 --- a/src/nodes/NodeManager.ts +++ b/src/nodes/NodeManager.ts @@ -34,6 +34,7 @@ import type { NodeAddress, NodeBucket, NodeBucketIndex, + NodeContact, NodeContactAddressData, NodeId, NodeIdEncoded, @@ -1145,48 +1146,68 @@ class NodeManager { nodeConnectionsQueue: NodeConnectionQueue, ctx: ContextTimed, ) { - await this.nodeConnectionManager.withConnF(nodeId, ctx, async (conn) => { - const nodeIdEncoded = nodesUtils.encodeNodeId(nodeIdTarget); - const closestConnectionsRequestP = (async () => { - const resultStream = - await conn.rpcClient.methods.nodesClosestActiveConnectionsGet( - { - nodeIdEncoded: nodeIdEncoded, - }, - ctx, - ); - // Collecting results - for await (const result of resultStream) { - ctx.signal.throwIfAborted(); - const nodeIdNew = nodesUtils.decodeNodeId(result.nodeId); - if (nodeIdNew == null) { - utils.never(`failed to decode NodeId "${result.nodeId}"`); + const nodeIdEncoded = nodesUtils.encodeNodeId(nodeIdTarget); + const closestConnectionsRequestP = (async () => { + const data = await this.nodeConnectionManager.withConnF( + nodeId, + ctx, + async (conn) => { + const resultStream = + await conn.rpcClient.methods.nodesClosestActiveConnectionsGet( + { + nodeIdEncoded: nodeIdEncoded, + }, + ctx, + ); + const connections: Array = []; + // Collecting results + for await (const result of resultStream) { + ctx.signal.throwIfAborted(); + const nodeIdNew = nodesUtils.decodeNodeId(result.nodeId); + if (nodeIdNew == null) { + utils.never(`failed to decode NodeId "${result.nodeId}"`); + } + 
connections.push(nodeIdNew); } - nodeConnectionsQueue.queueNodeSignal(nodeIdNew, nodeId); - } - })(); - const closestNodesRequestP = (async () => { - const resultStream = - await conn.rpcClient.methods.nodesClosestLocalNodesGet( - { - nodeIdEncoded: nodeIdEncoded, - }, - ctx, - ); - for await (const { nodeIdEncoded, nodeContact } of resultStream) { - ctx.signal.throwIfAborted(); - const nodeId = nodesUtils.decodeNodeId(nodeIdEncoded); - if (nodeId == null) { - utils.never(`failed to decode NodeId "${nodeIdEncoded}"`); + return connections; + }, + ); + for (const nodeIdNew of data) { + nodeConnectionsQueue.queueNodeSignal(nodeIdNew, nodeId); + } + })(); + const closestNodesRequestP = (async () => { + const data = await this.nodeConnectionManager.withConnF( + nodeId, + ctx, + async (conn) => { + const resultStream = + await conn.rpcClient.methods.nodesClosestLocalNodesGet( + { + nodeIdEncoded: nodeIdEncoded, + }, + ctx, + ); + const data: Array<[NodeId, NodeContact]> = []; + for await (const { nodeIdEncoded, nodeContact } of resultStream) { + ctx.signal.throwIfAborted(); + const nodeId = nodesUtils.decodeNodeId(nodeIdEncoded); + if (nodeId == null) { + utils.never(`failed to decode NodeId "${nodeIdEncoded}"`); + } + data.push([nodeId, nodeContact]); } - nodeConnectionsQueue.queueNodeDirect(nodeId, nodeContact); - } - })(); - await Promise.allSettled([ - closestConnectionsRequestP, - closestNodesRequestP, - ]); - }); + return data; + }, + ); + for (const [nodeId, nodeContact] of data) { + nodeConnectionsQueue.queueNodeDirect(nodeId, nodeContact); + } + })(); + await Promise.allSettled([ + closestConnectionsRequestP, + closestNodesRequestP, + ]); } /** From a62c3c97f89b84f26ad941769570fee4a02de573 Mon Sep 17 00:00:00 2001 From: Brian Botha Date: Wed, 25 Jun 2025 11:00:07 +1000 Subject: [PATCH 2/5] feat: discovery now only requests claims it doesn't know about from the peer when processing a node --- src/discovery/Discovery.ts | 23 ++++++++++++++++++++++- 
src/nodes/NodeManager.ts | 19 ++++++------------- 2 files changed, 28 insertions(+), 14 deletions(-) diff --git a/src/discovery/Discovery.ts b/src/discovery/Discovery.ts index 7e51b52ab..ac1fc7ab2 100644 --- a/src/discovery/Discovery.ts +++ b/src/discovery/Discovery.ts @@ -460,7 +460,11 @@ class Discovery { return; } // Iterate over each of the claims in the chain (already verified). - for (const signedClaim of Object.values(vertexChainData)) { + const processedClaimIds: Set = new Set(); + for (const [claimIdString, signedClaim] of Object.entries( + vertexChainData, + )) { + processedClaimIds.add(claimIdString); switch (signedClaim.payload.typ) { case 'ClaimLinkNode': await this.processClaimLinkNode( @@ -483,6 +487,23 @@ class Discovery { ); } } + // Queue up known linked vertices that weren't just processed + for await (const [gestaltId, gestaltLink] of this.gestaltGraph.getLinks([ + 'node', + nodeId, + ])) { + const claimIdString = decodeClaimId( + gestaltLink[1].claim.payload.jti, + )!.toString(); + if (!processedClaimIds.has(claimIdString)) { + await this.scheduleDiscoveryForVertex( + gestaltId, + undefined, + lastProcessedCutoffTime, + ['node', nodeId], + ); + } + } await this.gestaltGraph.setVertexProcessedTime( gestaltNodeId, processedTime, diff --git a/src/nodes/NodeManager.ts b/src/nodes/NodeManager.ts index 051453601..f3128dcf5 100644 --- a/src/nodes/NodeManager.ts +++ b/src/nodes/NodeManager.ts @@ -1284,18 +1284,18 @@ class NodeManager { * For node1 -> node2 claims, the verification process also involves connecting * to node2 to verify the claim (to retrieve its signing public key). * @param targetNodeId Id of the node to connect request the chain data of. - * @param _claimId If set then we get the claims newer that this claim ID. + * @param claimId If set then we get the claims newer than this claim ID. 
* @param ctx */ public requestChainData( targetNodeId: NodeId, - _claimId?: ClaimId, + claimId?: ClaimId, ctx?: Partial, ): PromiseCancellable>; @decorators.timedCancellable(true) public async requestChainData( targetNodeId: NodeId, - _claimId: ClaimId | undefined, + claimId: ClaimId | undefined, @decorators.context ctx: ContextTimed, ): Promise> { // Verify the node's chain with its own public key @@ -1303,18 +1303,11 @@ class NodeManager { const claims: Record = {}; const client = connection.getClient(); - // Let claimIdEncoded: ClaimIdEncoded | undefined; - - // if (claimId != null) { - // claimIdEncoded = claimsUtils.encodeClaimId(claimId); - // } else { - // claimIdEncoded = undefined; - // } - + const claimIdEncoded: ClaimIdEncoded | undefined = + claimId != null ? claimsUtils.encodeClaimId(claimId) : undefined; for await (const agentClaim of await client.methods.nodesClaimsGet( { - // Needs to be addressed later - causes test failures in Discovery.test.ts - // seek: claimIdEncoded, + seek: claimIdEncoded, }, ctx, )) { From e7bd1048b731c639950362bd63f02f0306d72c8e Mon Sep 17 00:00:00 2001 From: Brian Botha Date: Wed, 25 Jun 2025 11:07:20 +1000 Subject: [PATCH 3/5] fix: removed resolved TODO --- src/utils/utils.ts | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/utils/utils.ts b/src/utils/utils.ts index 803ef16f3..cb713d4a9 100644 --- a/src/utils/utils.ts +++ b/src/utils/utils.ts @@ -114,8 +114,6 @@ function sleepCancellable(ms: number): PromiseCancellable { /** * Checks if value is an object. * Arrays are also considered objects. - * The type guard here says `o is any`. - * TODO: When TS 4.9.x is released, change this to `o is object`. * At that point `'x' in o` checks become type guards that * can assert the property's existence. 
 */ From e7daa643b84d3f52f3e3db8460ab06415a89075a Mon Sep 17 00:00:00 2001 From: Brian Botha Date: Wed, 25 Jun 2025 11:17:15 +1000 Subject: [PATCH 4/5] fix: fixed warnings about adding event handlers to abort signals in the nodes domain --- src/utils/utils.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/utils/utils.ts b/src/utils/utils.ts index cb713d4a9..b0acff9a9 100644 --- a/src/utils/utils.ts +++ b/src/utils/utils.ts @@ -296,8 +296,8 @@ function promise(): PromiseDeconstructed { * Promise constructed from signal * This rejects when the signal is aborted */ -// fixme: There is also a one signal to many `signalPromise` relationship in the NM connection queue that needs to be fixed. function signalPromise(signal: AbortSignal): PromiseCancellable { + setMaxListeners(signal); return new PromiseCancellable((resolve, _, signalCancel) => { // Short circuit if signal already aborted if (signal.aborted) return resolve(); From 7be67ce3496967c07003e31216a31a78f6a76caa Mon Sep 17 00:00:00 2001 From: Brian Botha Date: Fri, 27 Jun 2025 15:57:59 +1000 Subject: [PATCH 5/5] fix: `garbageCollectBucket` now only does a simple direct ping to the known address when checking a node rather than performing full ICE --- src/nodes/NodeManager.ts | 83 ++++++++++++++++++++++++++------- tests/nodes/NodeManager.test.ts | 16 +++---- 2 files changed, 75 insertions(+), 24 deletions(-) diff --git a/src/nodes/NodeManager.ts b/src/nodes/NodeManager.ts index f3128dcf5..783cfbfb8 100644 --- a/src/nodes/NodeManager.ts +++ b/src/nodes/NodeManager.ts @@ -1277,6 +1277,44 @@ class NodeManager { } } + /** + * Will attempt to make a direct connection without ICE. + * This will only succeed under one of these conditions: + * 1. A connection to the target already exists. + * 2. The NAT already allows the port because it has already been punched. + * 3. The port is publicly accessible due to the NAT configuration. + * Returns true if a connection was established or already exists, false otherwise.
+ */ + public pingNodeAddressMultiple( + nodeId: NodeId, + addresses: Array<[Host, Port]>, + ctx?: Partial, + ): PromiseCancellable; + @startStop.ready(new nodesErrors.ErrorNodeConnectionManagerNotRunning()) + @decorators.timedCancellable( + true, + (nodeConnectionManager: NodeConnectionManager) => + nodeConnectionManager.connectionConnectTimeoutTime, + ) + public async pingNodeAddressMultiple( + nodeId: NodeId, + addresses: Array<[Host, Port]>, + @decorators.context ctx: ContextTimed, + ): Promise { + if (this.nodeConnectionManager.hasConnection(nodeId)) return true; + try { + await this.nodeConnectionManager.createConnectionMultiple( + [nodeId], + addresses, + ctx, + ); + return true; + } catch (e) { + if (!nodesUtils.isConnectionError(e)) throw e; + return false; + } + } + /** * Connects to the target node, and retrieves its sigchain data. * Verifies and returns the decoded chain as ChainData. Note: this will drop @@ -2353,7 +2391,7 @@ class NodeManager { let removedNodes = 0; const unsetLock = new Lock(); const pendingPromises: Array> = []; - for (const [nodeId] of bucket) { + for (const [nodeId, nodeContact] of bucket) { if (removedNodes >= pendingNodes.size) break; await semaphore.waitForUnlock(ctx); if (ctx.signal?.aborted === true) break; @@ -2365,21 +2403,34 @@ class NodeManager { signal: ctx.signal, timer: connectionConnectTimeoutTime, }; - const pingResult = await this.pingNode(nodeId, pingCtx); - if (pingResult != null) { - // Succeeded so update - const [nodeAddress, nodeContactAddressData] = pingResult; - await this.setNode( - nodeId, - nodeAddress, - nodeContactAddressData, - false, - false, - undefined, - tran, - ctx, - ); - } else { + // Getting known addresses for the ping + const desiredAddresses: Array = []; + for (const [ + nodeContactAddress, + nodeContactAddressData, + ] of Object.entries(nodeContact)) { + if (nodeContactAddressData.mode === 'direct') { + desiredAddresses.push( + nodesUtils.parseNodeContactAddress(nodeContactAddress), + ); + 
} + } + + const resolvedAddresses = await networkUtils.resolveHostnames( + desiredAddresses, + undefined, + this.dnsServers, + ctx, + ); + + const pingResult = await this.pingNodeAddressMultiple( + nodeId, + resolvedAddresses, + pingCtx, + ); + + // If ping fails we remove it, otherwise we don't update + if (!pingResult) { // We don't remove node the ping was aborted if (ctx.signal.aborted) return; // We need to lock this since it's concurrent diff --git a/tests/nodes/NodeManager.test.ts b/tests/nodes/NodeManager.test.ts index 846f799c4..dd5cc55e4 100644 --- a/tests/nodes/NodeManager.test.ts +++ b/tests/nodes/NodeManager.test.ts @@ -350,7 +350,7 @@ describe(`${NodeManager.name}`, () => { ); }); test('should not add new node if bucket is full and old nodes are responsive', async () => { - const mockedPingNode = jest.spyOn(nodeManager, 'pingNode'); + const mockedPingNode = jest.spyOn(nodeManager, 'pingNodeAddressMultiple'); // Fill bucket const nodeId = generateNodeIdForBucket(keyRing.getNodeId(), 255, 0); for (let i = 0; i < 20; i++) { @@ -362,7 +362,7 @@ describe(`${NodeManager.name}`, () => { }); } - mockedPingNode.mockResolvedValue([nodeAddress, nodeContactAddressData]); + mockedPingNode.mockResolvedValue(true); // Add 21st node await nodeManager.setNode( nodeId, @@ -374,7 +374,7 @@ describe(`${NodeManager.name}`, () => { expect(await nodeGraph.getNodeContact(nodeId)).toBeUndefined(); }); test('should add new node if bucket is full and old nodes are responsive but force is set', async () => { - const mockedPingNode = jest.spyOn(nodeManager, 'pingNode'); + const mockedPingNode = jest.spyOn(nodeManager, 'pingNodeAddressMultiple'); // Fill bucket const nodeId = generateNodeIdForBucket(keyRing.getNodeId(), 255, 0); for (let i = 0; i < 20; i++) { @@ -386,7 +386,7 @@ describe(`${NodeManager.name}`, () => { }); } - mockedPingNode.mockResolvedValue([nodeAddress, nodeContactAddressData]); + mockedPingNode.mockResolvedValue(true); // Add 21st node await 
nodeManager.setNode( nodeId, @@ -399,7 +399,7 @@ describe(`${NodeManager.name}`, () => { expect(await nodeGraph.getNodeContact(nodeId)).toBeDefined(); }); test('should add new node if bucket is full and old nodes are unresponsive', async () => { - const mockedPingNode = jest.spyOn(nodeManager, 'pingNode'); + const mockedPingNode = jest.spyOn(nodeManager, 'pingNodeAddressMultiple'); // Fill bucket const nodeId = generateNodeIdForBucket(keyRing.getNodeId(), 255, 0); for (let i = 0; i < 20; i++) { @@ -407,7 +407,7 @@ describe(`${NodeManager.name}`, () => { await nodeManager.setNode(nodeId, nodeAddress, nodeContactAddressData); } - mockedPingNode.mockResolvedValue(undefined); + mockedPingNode.mockResolvedValue(false); // Add 21st node await nodeManager.setNode( nodeId, @@ -419,7 +419,7 @@ describe(`${NodeManager.name}`, () => { expect(await nodeGraph.getNodeContact(nodeId)).toBeDefined(); }); test('should not block when bucket is full', async () => { - const mockedPingNode = jest.spyOn(nodeManager, 'pingNode'); + const mockedPingNode = jest.spyOn(nodeManager, 'pingNodeAddressMultiple'); // Fill bucket const nodeId = generateNodeIdForBucket(keyRing.getNodeId(), 255, 0); for (let i = 0; i < 20; i++) { @@ -436,7 +436,7 @@ describe(`${NodeManager.name}`, () => { mockedPingNode.mockImplementation(() => { return new PromiseCancellable(async (resolve) => { await waitP; - resolve(undefined); + resolve(false); }); }); // Add 21st node