From 416efc47f95027b6d5b67fa31be765888b463593 Mon Sep 17 00:00:00 2001 From: wb Date: Thu, 16 Apr 2026 12:14:51 +0800 Subject: [PATCH] feat(net): optimize transaction rate limiting with accurate cache size check 1. Add Manager.getCachedTransactionSize() = pushTransactionQueue + pendingTransactions + rePushTransactions to expose the true cached transaction count across all three queues. 2. Fix isTooManyPending() to include pushTransactionQueue, which was previously omitted, causing the pending threshold to be underestimated. 3. Update TransactionsMsgHandler.isBusy() to factor in the Manager cache size via TronNetDelegate.getCachedTransactionSize(), so the node stops accepting TRX INV messages when the full pipeline is busy. 4. Make the busy threshold configurable via node.maxTrxCacheSize (default: 50000), replacing the hardcoded MAX_TRX_SIZE constant. --- .../common/parameter/CommonParameter.java | 3 ++ .../org/tron/core/config/args/NodeConfig.java | 6 +++ common/src/main/resources/reference.conf | 2 + .../java/org/tron/core/config/args/Args.java | 1 + .../main/java/org/tron/core/db/Manager.java | 8 ++- .../org/tron/core/net/TronNetDelegate.java | 4 ++ .../TransactionsMsgHandler.java | 7 +-- framework/src/main/resources/config.conf | 4 ++ .../java/org/tron/core/db/ManagerTest.java | 51 +++++++++++++++++++ .../TransactionsMsgHandlerTest.java | 26 +++++++++- 10 files changed, 105 insertions(+), 7 deletions(-) diff --git a/common/src/main/java/org/tron/common/parameter/CommonParameter.java b/common/src/main/java/org/tron/common/parameter/CommonParameter.java index 24acb09443a..1f6b972d6f7 100644 --- a/common/src/main/java/org/tron/common/parameter/CommonParameter.java +++ b/common/src/main/java/org/tron/common/parameter/CommonParameter.java @@ -492,6 +492,9 @@ public class CommonParameter { public long pendingTransactionTimeout; @Getter @Setter + public int maxTrxCacheSize; + @Getter + @Setter public boolean nodeMetricsEnable = false; @Getter @Setter diff --git a/common/src/main/java/org/tron/core/config/args/NodeConfig.java b/common/src/main/java/org/tron/core/config/args/NodeConfig.java index 532c25efa44..10cc17800db 100644 --- a/common/src/main/java/org/tron/core/config/args/NodeConfig.java +++ b/common/src/main/java/org/tron/core/config/args/NodeConfig.java @@ -85,6 +85,7 @@ public class NodeConfig { private ChannelConfig channel = new ChannelConfig(); private int maxTransactionPendingSize = 2000; private long pendingTransactionTimeout = 60000; + private int maxTrxCacheSize = 50_000; private int agreeNodeCount = 0; private boolean openHistoryQueryWhenLiteFN = false; private boolean unsolidifiedBlockCheck = false; @@ -496,6 +497,11 @@ private void postProcess() { if (dynamicConfig.checkInterval <= 0) { dynamicConfig.checkInterval = 600; } + + // maxTrxCacheSize: minimum 2000 + if (maxTrxCacheSize < 2000) { + maxTrxCacheSize = 2000; + } } // =========================================================================== diff --git a/common/src/main/resources/reference.conf b/common/src/main/resources/reference.conf index 3c5e3c256c6..b6e7e287f92 100644 --- a/common/src/main/resources/reference.conf +++ b/common/src/main/resources/reference.conf @@ -344,6 +344,8 @@ node { receiveTcpMinDataLength = 2048 maxTransactionPendingSize = 2000 pendingTransactionTimeout = 60000 + # total cached trx across handler queues + pending + rePush + maxTrxCacheSize = 50000 # Consensus agreement agreeNodeCount = 0 diff --git a/framework/src/main/java/org/tron/core/config/args/Args.java b/framework/src/main/java/org/tron/core/config/args/Args.java index bed1c661c23..1d33cf335a8 100644 --- a/framework/src/main/java/org/tron/core/config/args/Args.java +++ b/framework/src/main/java/org/tron/core/config/args/Args.java @@ -640,6 +640,7 @@ private static void applyNodeConfig(NodeConfig nc) { PARAMETER.maxTransactionPendingSize = nc.getMaxTransactionPendingSize(); PARAMETER.pendingTransactionTimeout = nc.getPendingTransactionTimeout(); + PARAMETER.maxTrxCacheSize = nc.getMaxTrxCacheSize(); PARAMETER.validContractProtoThreadNum = nc.getValidContractProtoThreads(); diff --git a/framework/src/main/java/org/tron/core/db/Manager.java b/framework/src/main/java/org/tron/core/db/Manager.java index 2c188c90b30..d6ce53299f1 100644 --- a/framework/src/main/java/org/tron/core/db/Manager.java +++ b/framework/src/main/java/org/tron/core/db/Manager.java @@ -2085,9 +2085,13 @@ public NullifierStore getNullifierStore() { return chainBaseManager.getNullifierStore(); } + public int getCachedTransactionSize() { + return pushTransactionQueue.size() + getPendingTransactions().size() + + getRePushTransactions().size(); + } + public boolean isTooManyPending() { - return getPendingTransactions().size() + getRePushTransactions().size() - > maxTransactionPendingSize; + return getCachedTransactionSize() > maxTransactionPendingSize; } private void preValidateTransactionSign(List txs) diff --git a/framework/src/main/java/org/tron/core/net/TronNetDelegate.java b/framework/src/main/java/org/tron/core/net/TronNetDelegate.java index 100bad179bf..9a53172a806 100644 --- a/framework/src/main/java/org/tron/core/net/TronNetDelegate.java +++ b/framework/src/main/java/org/tron/core/net/TronNetDelegate.java @@ -384,4 +384,8 @@ public boolean isBlockUnsolidified() { return headNum - solidNum >= maxUnsolidifiedBlocks; } + public int getCachedTransactionSize() { + return dbManager.getCachedTransactionSize(); + } + } diff --git a/framework/src/main/java/org/tron/core/net/messagehandler/TransactionsMsgHandler.java b/framework/src/main/java/org/tron/core/net/messagehandler/TransactionsMsgHandler.java index d6bd439d7ff..02a1983c260 100644 --- a/framework/src/main/java/org/tron/core/net/messagehandler/TransactionsMsgHandler.java +++ b/framework/src/main/java/org/tron/core/net/messagehandler/TransactionsMsgHandler.java @@ -32,7 +32,6 @@ @Component public class TransactionsMsgHandler implements TronMsgHandler { - private static int MAX_TRX_SIZE = 50_000; private static int MAX_SMART_CONTRACT_SUBMIT_SIZE = 100; @Autowired private TronNetDelegate tronNetDelegate; @@ -41,7 +40,8 @@ public class TransactionsMsgHandler implements TronMsgHandler { @Autowired private ChainBaseManager chainBaseManager; - private BlockingQueue smartContractQueue = new LinkedBlockingQueue(MAX_TRX_SIZE); + private BlockingQueue smartContractQueue = new LinkedBlockingQueue( + Args.getInstance().getMaxTrxCacheSize()); private BlockingQueue queue = new LinkedBlockingQueue(); @@ -71,7 +71,8 @@ public void close() { } public boolean isBusy() { - return queue.size() + smartContractQueue.size() > MAX_TRX_SIZE; + return queue.size() + smartContractQueue.size() + + tronNetDelegate.getCachedTransactionSize() > Args.getInstance().getMaxTrxCacheSize(); } @Override diff --git a/framework/src/main/resources/config.conf b/framework/src/main/resources/config.conf index 6c8f2082301..07f5974a27d 100644 --- a/framework/src/main/resources/config.conf +++ b/framework/src/main/resources/config.conf @@ -160,6 +160,10 @@ node { # Range: [50, 2000], default: 500 # maxPendingBlockSize = 500 + # Maximum total number of cached transactions (handler queues + pending + rePush). + # When exceeded, the node stops accepting TRX INV messages from peers. + # maxTrxCacheSize = 50000 + # Number of validate sign thread, default availableProcessors # validateSignThreadNum = 16 diff --git a/framework/src/test/java/org/tron/core/db/ManagerTest.java b/framework/src/test/java/org/tron/core/db/ManagerTest.java index a0522417c59..211bddd1c07 100755 --- a/framework/src/test/java/org/tron/core/db/ManagerTest.java +++ b/framework/src/test/java/org/tron/core/db/ManagerTest.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -1419,6 +1420,56 @@ public void testClearSolidityContractTriggerCache() throws Exception { } } + @Test + public void testGetCachedTransactionSize() throws Exception { + BlockingQueue pushQ = new LinkedBlockingQueue<>(); + pushQ.add(new TransactionCapsule(Protocol.Transaction.getDefaultInstance())); + Field pushField = Manager.class.getDeclaredField("pushTransactionQueue"); + pushField.setAccessible(true); + pushField.set(dbManager, pushQ); + + dbManager.getPendingTransactions().clear(); + dbManager.getPendingTransactions().add( + new TransactionCapsule(Protocol.Transaction.getDefaultInstance())); + dbManager.getPendingTransactions().add( + new TransactionCapsule(Protocol.Transaction.getDefaultInstance())); + + dbManager.getRePushTransactions().clear(); + + // 1 (push) + 2 (pending) + 0 (rePush) = 3 + Assert.assertEquals(3, dbManager.getCachedTransactionSize()); + + // cleanup + pushQ.clear(); + dbManager.getPendingTransactions().clear(); + } + + @Test + public void testIsTooManyPendingIncludesPushQueue() throws Exception { + int threshold = Args.getInstance().getMaxTransactionPendingSize(); + + BlockingQueue pushQ = new LinkedBlockingQueue<>(); + Field pushField = Manager.class.getDeclaredField("pushTransactionQueue"); + pushField.setAccessible(true); + pushField.set(dbManager, pushQ); + + dbManager.getPendingTransactions().clear(); + dbManager.getRePushTransactions().clear(); + + for (int i = 0; i < threshold; i++) { + dbManager.getPendingTransactions().add( + new TransactionCapsule(Protocol.Transaction.getDefaultInstance())); + } + Assert.assertFalse(dbManager.isTooManyPending()); + + pushQ.add(new TransactionCapsule(Protocol.Transaction.getDefaultInstance())); + Assert.assertTrue(dbManager.isTooManyPending()); + + // cleanup + dbManager.getPendingTransactions().clear(); + pushQ.clear(); + } + public void adjustBalance(AccountStore accountStore, byte[] accountAddress, long amount) throws BalanceInsufficientException { Commons.adjustBalance(accountStore, accountAddress, amount, diff --git a/framework/src/test/java/org/tron/core/net/messagehandler/TransactionsMsgHandlerTest.java b/framework/src/test/java/org/tron/core/net/messagehandler/TransactionsMsgHandlerTest.java index c14f9a9c86a..beeeecbca98 100644 --- a/framework/src/test/java/org/tron/core/net/messagehandler/TransactionsMsgHandlerTest.java +++ b/framework/src/test/java/org/tron/core/net/messagehandler/TransactionsMsgHandlerTest.java @@ -48,8 +48,6 @@ public static void init() { public void testProcessMessage() { TransactionsMsgHandler transactionsMsgHandler = new TransactionsMsgHandler(); try { - Assert.assertFalse(transactionsMsgHandler.isBusy()); - transactionsMsgHandler.init(); PeerConnection peer = Mockito.mock(PeerConnection.class); @@ -60,6 +58,8 @@ public void testProcessMessage() { field.setAccessible(true); field.set(transactionsMsgHandler, tronNetDelegate); + Assert.assertFalse(transactionsMsgHandler.isBusy()); + BalanceContract.TransferContract transferContract = BalanceContract.TransferContract .newBuilder() .setAmount(10) @@ -290,6 +290,28 @@ public void testHandleTransaction() throws Exception { } } + @Test + public void testIsBusyWithCachedTransactions() throws Exception { + TransactionsMsgHandler handler = new TransactionsMsgHandler(); + + int threshold = Args.getInstance().getMaxTrxCacheSize(); + TronNetDelegate tronNetDelegateMock = Mockito.mock(TronNetDelegate.class); + Field field = TransactionsMsgHandler.class.getDeclaredField("tronNetDelegate"); + field.setAccessible(true); + field.set(handler, tronNetDelegateMock); + + // queue and smartContractQueue are empty, but cached size > threshold + Mockito.when(tronNetDelegateMock.getCachedTransactionSize()).thenReturn(threshold + 1); + Assert.assertTrue(handler.isBusy()); + + // boundary: cached size == threshold, isBusy() uses strict >, so not busy + Mockito.when(tronNetDelegateMock.getCachedTransactionSize()).thenReturn(threshold); + Assert.assertFalse(handler.isBusy()); + + Mockito.when(tronNetDelegateMock.getCachedTransactionSize()).thenReturn(0); + Assert.assertFalse(handler.isBusy()); + } + class TrxEvent { @Getter