From 805f5b5d1e7fd1aa1a2dd1762863d1cab2e9513c Mon Sep 17 00:00:00 2001 From: wb Date: Wed, 15 Apr 2026 11:13:51 +0800 Subject: [PATCH] feat(net): add P2P message deduplication and length validation - FetchInvDataMsgHandler: reject messages with duplicate hashes - TransactionsMsgHandler: reject messages with duplicate transactions - SyncBlockChainMsgHandler: reject blockIds list exceeding 30 entries - Add MAX_SYNC_CHAIN_IDS = 30 constant to NetConstants - Add unit tests covering duplicate rejection and boundary values All violations throw P2pException(BAD_MESSAGE), triggering peer disconnect via existing P2pEventHandlerImpl error path. --- .../common/parameter/CommonParameter.java | 3 + .../java/org/tron/core/config/Parameter.java | 1 + .../org/tron/core/config/args/NodeConfig.java | 6 ++ common/src/main/resources/reference.conf | 2 + .../java/org/tron/core/config/args/Args.java | 1 + .../main/java/org/tron/core/db/Manager.java | 8 +- .../org/tron/core/net/TronNetDelegate.java | 4 + .../FetchInvDataMsgHandler.java | 7 ++ .../messagehandler/InventoryMsgHandler.java | 17 +++- .../SyncBlockChainMsgHandler.java | 6 ++ .../TransactionsMsgHandler.java | 22 ++++-- framework/src/main/resources/config.conf | 4 + .../java/org/tron/core/db/ManagerTest.java | 51 ++++++++++++ .../FetchInvDataMsgHandlerTest.java | 35 ++++++++- .../InventoryMsgHandlerTest.java | 22 ++++++ .../SyncBlockChainMsgHandlerTest.java | 52 +++++++++++++ .../TransactionsMsgHandlerTest.java | 77 ++++++++++++++++++- 17 files changed, 302 insertions(+), 16 deletions(-) diff --git a/common/src/main/java/org/tron/common/parameter/CommonParameter.java b/common/src/main/java/org/tron/common/parameter/CommonParameter.java index 0028a5d50d0..1d59e3a9d10 100644 --- a/common/src/main/java/org/tron/common/parameter/CommonParameter.java +++ b/common/src/main/java/org/tron/common/parameter/CommonParameter.java @@ -492,6 +492,9 @@ public class CommonParameter { public long pendingTransactionTimeout; @Getter @Setter + public int maxTrxCacheSize; + @Getter + @Setter public boolean nodeMetricsEnable = false; @Getter @Setter diff --git a/common/src/main/java/org/tron/core/config/Parameter.java b/common/src/main/java/org/tron/core/config/Parameter.java index db88ab5e047..5349ef8d875 100644 --- a/common/src/main/java/org/tron/core/config/Parameter.java +++ b/common/src/main/java/org/tron/core/config/Parameter.java @@ -102,6 +102,7 @@ public class NetConstants { public static final int MSG_CACHE_DURATION_IN_BLOCKS = 5; public static final int MAX_BLOCK_FETCH_PER_PEER = 100; public static final int MAX_TRX_FETCH_PER_PEER = 1000; + public static final int MAX_SYNC_CHAIN_IDS = 30; } public class DatabaseConstants { diff --git a/common/src/main/java/org/tron/core/config/args/NodeConfig.java b/common/src/main/java/org/tron/core/config/args/NodeConfig.java index 620152a907a..653fac2eb2c 100644 --- a/common/src/main/java/org/tron/core/config/args/NodeConfig.java +++ b/common/src/main/java/org/tron/core/config/args/NodeConfig.java @@ -85,6 +85,7 @@ public class NodeConfig { private ChannelConfig channel = new ChannelConfig(); private int maxTransactionPendingSize = 2000; private long pendingTransactionTimeout = 60000; + private int maxTrxCacheSize = 50_000; private int agreeNodeCount = 0; private boolean openHistoryQueryWhenLiteFN = false; private boolean unsolidifiedBlockCheck = false; @@ -498,6 +499,11 @@ private void postProcess() { if (dynamicConfig.checkInterval <= 0) { dynamicConfig.checkInterval = 600; } + + // maxTrxCacheSize: minimum 2000 + if (maxTrxCacheSize < 2000) { + maxTrxCacheSize = 2000; + } } // =========================================================================== diff --git a/common/src/main/resources/reference.conf b/common/src/main/resources/reference.conf index 63e5d86a4af..67343b9b75a 100644 --- a/common/src/main/resources/reference.conf +++ b/common/src/main/resources/reference.conf @@ -346,6 +346,8 @@ node { receiveTcpMinDataLength = 2048 maxTransactionPendingSize = 2000 pendingTransactionTimeout = 60000 + # total cached trx across handler queues + pending + rePush + maxTrxCacheSize = 50000 # Consensus agreement agreeNodeCount = 0 diff --git a/framework/src/main/java/org/tron/core/config/args/Args.java b/framework/src/main/java/org/tron/core/config/args/Args.java index 652f37a90db..350f4455b4a 100644 --- a/framework/src/main/java/org/tron/core/config/args/Args.java +++ b/framework/src/main/java/org/tron/core/config/args/Args.java @@ -625,6 +625,7 @@ private static void applyNodeConfig(NodeConfig nc) { PARAMETER.maxTransactionPendingSize = nc.getMaxTransactionPendingSize(); PARAMETER.pendingTransactionTimeout = nc.getPendingTransactionTimeout(); + PARAMETER.maxTrxCacheSize = nc.getMaxTrxCacheSize(); PARAMETER.validContractProtoThreadNum = nc.getValidContractProtoThreads(); diff --git a/framework/src/main/java/org/tron/core/db/Manager.java b/framework/src/main/java/org/tron/core/db/Manager.java index 2c188c90b30..d6ce53299f1 100644 --- a/framework/src/main/java/org/tron/core/db/Manager.java +++ b/framework/src/main/java/org/tron/core/db/Manager.java @@ -2085,9 +2085,13 @@ public NullifierStore getNullifierStore() { return chainBaseManager.getNullifierStore(); } + public int getCachedTransactionSize() { + return pushTransactionQueue.size() + getPendingTransactions().size() + + getRePushTransactions().size(); + } + public boolean isTooManyPending() { - return getPendingTransactions().size() + getRePushTransactions().size() - > maxTransactionPendingSize; + return getCachedTransactionSize() > maxTransactionPendingSize; } private void preValidateTransactionSign(List txs) diff --git a/framework/src/main/java/org/tron/core/net/TronNetDelegate.java b/framework/src/main/java/org/tron/core/net/TronNetDelegate.java index 100bad179bf..9a53172a806 100644 --- a/framework/src/main/java/org/tron/core/net/TronNetDelegate.java +++ b/framework/src/main/java/org/tron/core/net/TronNetDelegate.java @@ -384,4 +384,8 @@ public boolean isBlockUnsolidified() { return headNum - solidNum >= maxUnsolidifiedBlocks; } + public int getCachedTransactionSize() { + return dbManager.getCachedTransactionSize(); + } + } diff --git a/framework/src/main/java/org/tron/core/net/messagehandler/FetchInvDataMsgHandler.java b/framework/src/main/java/org/tron/core/net/messagehandler/FetchInvDataMsgHandler.java index ecb7853ce6f..b1f26468081 100644 --- a/framework/src/main/java/org/tron/core/net/messagehandler/FetchInvDataMsgHandler.java +++ b/framework/src/main/java/org/tron/core/net/messagehandler/FetchInvDataMsgHandler.java @@ -3,6 +3,7 @@ import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import com.google.common.collect.Lists; +import java.util.HashSet; import java.util.List; import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; @@ -153,6 +154,12 @@ public boolean isAdvInv(PeerConnection peer, FetchInvDataMessage msg) { private void check(PeerConnection peer, FetchInvDataMessage fetchInvDataMsg, boolean isAdv) throws P2pException { + List hashList = fetchInvDataMsg.getHashList(); + if (hashList.size() != new HashSet<>(hashList).size()) { + throw new P2pException(TypeEnum.BAD_MESSAGE, + "FetchInvData contains duplicate hashes, size: " + hashList.size()); + } + MessageTypes type = fetchInvDataMsg.getInvMessageType(); if (type == MessageTypes.TRX) { diff --git a/framework/src/main/java/org/tron/core/net/messagehandler/InventoryMsgHandler.java b/framework/src/main/java/org/tron/core/net/messagehandler/InventoryMsgHandler.java index e8783b25e95..c92d53584a3 100644 --- a/framework/src/main/java/org/tron/core/net/messagehandler/InventoryMsgHandler.java +++ b/framework/src/main/java/org/tron/core/net/messagehandler/InventoryMsgHandler.java @@ -1,10 +1,14 @@ package org.tron.core.net.messagehandler; +import java.util.HashSet; +import java.util.List; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.tron.common.utils.Sha256Hash; import org.tron.core.config.args.Args; +import org.tron.core.exception.P2pException; +import org.tron.core.exception.P2pException.TypeEnum; import org.tron.core.net.TronNetDelegate; import org.tron.core.net.message.TronMessage; import org.tron.core.net.message.adv.InventoryMessage; @@ -27,7 +31,7 @@ public class InventoryMsgHandler implements TronMsgHandler { private TransactionsMsgHandler transactionsMsgHandler; @Override - public void processMessage(PeerConnection peer, TronMessage msg) { + public void processMessage(PeerConnection peer, TronMessage msg) throws P2pException { InventoryMessage inventoryMessage = (InventoryMessage) msg; InventoryType type = inventoryMessage.getInventoryType(); @@ -45,10 +49,17 @@ public void processMessage(PeerConnection peer, TronMessage msg) { } } - private boolean check(PeerConnection peer, InventoryMessage inventoryMessage) { + private boolean check(PeerConnection peer, InventoryMessage inventoryMessage) + throws P2pException { + + List hashList = inventoryMessage.getHashList(); + if (hashList.size() != new HashSet<>(hashList).size()) { + throw new P2pException(TypeEnum.BAD_MESSAGE, + "Inventory contains duplicate hashes, size: " + hashList.size()); + } InventoryType type = inventoryMessage.getInventoryType(); - int size = inventoryMessage.getHashList().size(); + int size = hashList.size(); if (peer.isNeedSyncFromPeer() || peer.isNeedSyncFromUs()) { logger.warn("Drop inv: {} size: {} from Peer {}, syncFromUs: {}, syncFromPeer: {}", diff --git a/framework/src/main/java/org/tron/core/net/messagehandler/SyncBlockChainMsgHandler.java b/framework/src/main/java/org/tron/core/net/messagehandler/SyncBlockChainMsgHandler.java index 71d268b22bc..5c18e014978 100644 --- a/framework/src/main/java/org/tron/core/net/messagehandler/SyncBlockChainMsgHandler.java +++ b/framework/src/main/java/org/tron/core/net/messagehandler/SyncBlockChainMsgHandler.java @@ -71,6 +71,12 @@ private boolean check(PeerConnection peer, SyncBlockChainMessage msg) throws P2p throw new P2pException(TypeEnum.BAD_MESSAGE, "SyncBlockChain blockIds is empty"); } + if (blockIds.size() > NetConstants.MAX_SYNC_CHAIN_IDS) { + throw new P2pException(TypeEnum.BAD_MESSAGE, + "SyncBlockChain blockIds size " + blockIds.size() + + " exceeds limit " + NetConstants.MAX_SYNC_CHAIN_IDS); + } + BlockId firstId = blockIds.get(0); if (!tronNetDelegate.containBlockInMainChain(firstId)) { logger.warn("Sync message from peer {} without the first block: {}", diff --git a/framework/src/main/java/org/tron/core/net/messagehandler/TransactionsMsgHandler.java b/framework/src/main/java/org/tron/core/net/messagehandler/TransactionsMsgHandler.java index d6bd439d7ff..e153e21f331 100644 --- a/framework/src/main/java/org/tron/core/net/messagehandler/TransactionsMsgHandler.java +++ b/framework/src/main/java/org/tron/core/net/messagehandler/TransactionsMsgHandler.java @@ -1,5 +1,8 @@ package org.tron.core.net.messagehandler; +import java.util.HashSet; +import java.util.List; +import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; @@ -11,6 +14,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.tron.common.es.ExecutorServiceManager; +import org.tron.common.utils.Sha256Hash; import org.tron.core.ChainBaseManager; import org.tron.core.config.args.Args; import org.tron.core.exception.P2pException; @@ -32,7 +36,6 @@ @Component public class TransactionsMsgHandler implements TronMsgHandler { - private static int MAX_TRX_SIZE = 50_000; private static int MAX_SMART_CONTRACT_SUBMIT_SIZE = 100; @Autowired private TronNetDelegate tronNetDelegate; @@ -41,7 +44,8 @@ public class TransactionsMsgHandler implements TronMsgHandler { @Autowired private ChainBaseManager chainBaseManager; - private BlockingQueue smartContractQueue = new LinkedBlockingQueue(MAX_TRX_SIZE); + private BlockingQueue smartContractQueue = new LinkedBlockingQueue( + Args.getInstance().getMaxTrxCacheSize()); private BlockingQueue queue = new LinkedBlockingQueue(); @@ -71,7 +75,8 @@ public void close() { } public boolean isBusy() { - return queue.size() + smartContractQueue.size() > MAX_TRX_SIZE; + return queue.size() + smartContractQueue.size() + + tronNetDelegate.getCachedTransactionSize() > Args.getInstance().getMaxTrxCacheSize(); } @Override @@ -120,8 +125,15 @@ public void processMessage(PeerConnection peer, TronMessage msg) throws P2pExcep } private void check(PeerConnection peer, TransactionsMessage msg) throws P2pException { - for (Transaction trx : msg.getTransactions().getTransactionsList()) { - Item item = new Item(new TransactionMessage(trx).getMessageId(), InventoryType.TRX); + List list = msg.getTransactions().getTransactionsList(); + Set seen = new HashSet<>(list.size() * 2); + for (Transaction trx : list) { + Sha256Hash id = new TransactionMessage(trx).getMessageId(); + if (!seen.add(id)) { + throw new P2pException(TypeEnum.BAD_MESSAGE, + "TransactionsMessage contains duplicate transaction: " + id); + } + Item item = new Item(id, InventoryType.TRX); if (!peer.getAdvInvRequest().containsKey(item)) { throw new P2pException(TypeEnum.BAD_MESSAGE, "trx: " + msg.getMessageId() + " without request."); diff --git a/framework/src/main/resources/config.conf b/framework/src/main/resources/config.conf index 6c8f2082301..07f5974a27d 100644 --- a/framework/src/main/resources/config.conf +++ b/framework/src/main/resources/config.conf @@ -160,6 +160,10 @@ node { # Range: [50, 2000], default: 500 # maxPendingBlockSize = 500 + # Maximum total number of cached transactions (handler queues + pending + rePush). + # When exceeded, the node stops accepting TRX INV messages from peers. + # maxTrxCacheSize = 50000 + # Number of validate sign thread, default availableProcessors # validateSignThreadNum = 16 diff --git a/framework/src/test/java/org/tron/core/db/ManagerTest.java b/framework/src/test/java/org/tron/core/db/ManagerTest.java index a0522417c59..211bddd1c07 100755 --- a/framework/src/test/java/org/tron/core/db/ManagerTest.java +++ b/framework/src/test/java/org/tron/core/db/ManagerTest.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -1419,6 +1420,56 @@ public void testClearSolidityContractTriggerCache() throws Exception { } } + @Test + public void testGetCachedTransactionSize() throws Exception { + BlockingQueue pushQ = new LinkedBlockingQueue<>(); + pushQ.add(new TransactionCapsule(Protocol.Transaction.getDefaultInstance())); + Field pushField = Manager.class.getDeclaredField("pushTransactionQueue"); + pushField.setAccessible(true); + pushField.set(dbManager, pushQ); + + dbManager.getPendingTransactions().clear(); + dbManager.getPendingTransactions().add( + new TransactionCapsule(Protocol.Transaction.getDefaultInstance())); + dbManager.getPendingTransactions().add( + new TransactionCapsule(Protocol.Transaction.getDefaultInstance())); + + dbManager.getRePushTransactions().clear(); + + // 1 (push) + 2 (pending) + 0 (rePush) = 3 + Assert.assertEquals(3, dbManager.getCachedTransactionSize()); + + // cleanup + pushQ.clear(); + dbManager.getPendingTransactions().clear(); + } + + @Test + public void testIsTooManyPendingIncludesPushQueue() throws Exception { + int threshold = Args.getInstance().getMaxTransactionPendingSize(); + + BlockingQueue pushQ = new LinkedBlockingQueue<>(); + Field pushField = Manager.class.getDeclaredField("pushTransactionQueue"); + pushField.setAccessible(true); + pushField.set(dbManager, pushQ); + + dbManager.getPendingTransactions().clear(); + dbManager.getRePushTransactions().clear(); + + for (int i = 0; i < threshold; i++) { + dbManager.getPendingTransactions().add( + new TransactionCapsule(Protocol.Transaction.getDefaultInstance())); + } + Assert.assertFalse(dbManager.isTooManyPending()); + + pushQ.add(new TransactionCapsule(Protocol.Transaction.getDefaultInstance())); + Assert.assertTrue(dbManager.isTooManyPending()); + + // cleanup + dbManager.getPendingTransactions().clear(); + pushQ.clear(); + } + public void adjustBalance(AccountStore accountStore, byte[] accountAddress, long amount) throws BalanceInsufficientException { Commons.adjustBalance(accountStore, accountAddress, amount, diff --git a/framework/src/test/java/org/tron/core/net/messagehandler/FetchInvDataMsgHandlerTest.java b/framework/src/test/java/org/tron/core/net/messagehandler/FetchInvDataMsgHandlerTest.java index e05ee29d015..e8ec4257814 100644 --- a/framework/src/test/java/org/tron/core/net/messagehandler/FetchInvDataMsgHandlerTest.java +++ b/framework/src/test/java/org/tron/core/net/messagehandler/FetchInvDataMsgHandlerTest.java @@ -15,6 +15,7 @@ import org.tron.common.utils.Sha256Hash; import org.tron.core.capsule.BlockCapsule; import org.tron.core.config.Parameter; +import org.tron.core.exception.P2pException; import org.tron.core.net.P2pRateLimiter; import org.tron.core.net.TronNetDelegate; import org.tron.core.net.message.adv.BlockMessage; @@ -124,12 +125,42 @@ public void testSyncFetchCheck() { Assert.assertEquals("minBlockNum: 16000, blockNum: 10000", e2.getMessage()); } + @Test + public void testDuplicateHashRejected() throws Exception { + FetchInvDataMsgHandler handler = new FetchInvDataMsgHandler(); + PeerConnection peer = Mockito.mock(PeerConnection.class); + AdvService advService = Mockito.mock(AdvService.class); + TronNetDelegate tronNetDelegate = Mockito.mock(TronNetDelegate.class); + + ReflectUtils.setFieldValue(handler, "advService", advService); + ReflectUtils.setFieldValue(handler, "tronNetDelegate", tronNetDelegate); + + Sha256Hash hash = Sha256Hash.ZERO_HASH; + List hashList = new LinkedList<>(); + hashList.add(hash); + hashList.add(hash); // duplicate + + FetchInvDataMessage msg = new FetchInvDataMessage(hashList, + Protocol.Inventory.InventoryType.TRX); + + Cache advInvSpread = CacheBuilder.newBuilder() + .maximumSize(20000).expireAfterWrite(1, TimeUnit.HOURS).build(); + advInvSpread.put(new Item(hash, Protocol.Inventory.InventoryType.TRX), 1L); + Mockito.when(peer.getAdvInvSpread()).thenReturn(advInvSpread); + + try { + handler.processMessage(peer, msg); + Assert.fail("Expected P2pException for duplicate hash"); + } catch (P2pException e) { + Assert.assertEquals(P2pException.TypeEnum.BAD_MESSAGE, e.getType()); + } + } + @Test public void testRateLimiter() { - BlockCapsule.BlockId blockId = new BlockCapsule.BlockId(Sha256Hash.ZERO_HASH, 10000L); List blockIds = new LinkedList<>(); for (int i = 0; i <= 100; i++) { - blockIds.add(blockId); + blockIds.add(new BlockCapsule.BlockId(Sha256Hash.ZERO_HASH, (long) i)); } FetchInvDataMessage msg = new FetchInvDataMessage(blockIds, Protocol.Inventory.InventoryType.BLOCK); diff --git a/framework/src/test/java/org/tron/core/net/messagehandler/InventoryMsgHandlerTest.java b/framework/src/test/java/org/tron/core/net/messagehandler/InventoryMsgHandlerTest.java index 1dbf7c7150f..3d24ff2a4bf 100644 --- a/framework/src/test/java/org/tron/core/net/messagehandler/InventoryMsgHandlerTest.java +++ b/framework/src/test/java/org/tron/core/net/messagehandler/InventoryMsgHandlerTest.java @@ -6,10 +6,14 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.ArrayList; +import java.util.Arrays; +import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; import org.tron.common.TestConstants; +import org.tron.common.utils.Sha256Hash; import org.tron.core.config.args.Args; +import org.tron.core.exception.P2pException; import org.tron.core.net.TronNetDelegate; import org.tron.core.net.message.adv.InventoryMessage; import org.tron.core.net.peer.PeerConnection; @@ -52,6 +56,24 @@ public void testProcessMessage() throws Exception { Mockito.verify(tronNetDelegate, Mockito.atLeastOnce()).isBlockUnsolidified(); } + @Test + public void testDuplicateHashesRejected() throws Exception { + InventoryMsgHandler handler = new InventoryMsgHandler(); + Args.setParam(new String[] {}, TestConstants.TEST_CONF); + + Sha256Hash hash = Sha256Hash.wrap(new byte[32]); + InventoryMessage msg = new InventoryMessage(Arrays.asList(hash, hash), InventoryType.TRX); + PeerConnection peer = new PeerConnection(); + peer.setChannel(getChannel("1.0.0.4", 1000)); + + try { + handler.processMessage(peer, msg); + Assert.fail("Expected P2pException for duplicate hashes"); + } catch (P2pException e) { + Assert.assertEquals(P2pException.TypeEnum.BAD_MESSAGE, e.getType()); + } + } + private Channel getChannel(String host, int port) throws Exception { Channel channel = new Channel(); InetSocketAddress inetSocketAddress = new InetSocketAddress(host, port); diff --git a/framework/src/test/java/org/tron/core/net/messagehandler/SyncBlockChainMsgHandlerTest.java b/framework/src/test/java/org/tron/core/net/messagehandler/SyncBlockChainMsgHandlerTest.java index 7960ef190f1..08c5484880f 100644 --- a/framework/src/test/java/org/tron/core/net/messagehandler/SyncBlockChainMsgHandlerTest.java +++ b/framework/src/test/java/org/tron/core/net/messagehandler/SyncBlockChainMsgHandlerTest.java @@ -16,11 +16,13 @@ import org.junit.rules.TemporaryFolder; import org.tron.common.TestConstants; import org.tron.common.application.TronApplicationContext; +import org.tron.common.utils.Sha256Hash; import org.tron.core.capsule.BlockCapsule; import org.tron.core.capsule.BlockCapsule.BlockId; import org.tron.core.config.DefaultConfig; import org.tron.core.config.args.Args; import org.tron.core.exception.P2pException; +import org.tron.core.net.TronNetDelegate; import org.tron.core.net.message.sync.BlockInventoryMessage; import org.tron.core.net.message.sync.SyncBlockChainMessage; import org.tron.core.net.peer.PeerConnection; @@ -108,6 +110,56 @@ public void testProcessMessage() throws Exception { Assert.assertEquals(1, list.size()); } + @Test + public void testBlockIdsExceedsLimit() throws Exception { + List blockIds = new ArrayList<>(); + // genesis block as first (in main chain), then 30 more = 31 total → exceeds limit + BlockId genesis = context.getBean( + TronNetDelegate.class).getGenesisBlockId(); + blockIds.add(genesis); + for (int i = 1; i <= 30; i++) { + blockIds.add(new BlockId(Sha256Hash.ZERO_HASH, i)); + } + SyncBlockChainMessage msg = new SyncBlockChainMessage(blockIds); + + try { + Method checkMethod = SyncBlockChainMsgHandler.class + .getDeclaredMethod("check", PeerConnection.class, SyncBlockChainMessage.class); + checkMethod.setAccessible(true); + checkMethod.invoke(handler, peer, msg); + Assert.fail("Expected P2pException for oversized blockIds"); + } catch (InvocationTargetException e) { + Assert.assertTrue(e.getCause() instanceof P2pException); + Assert.assertEquals(P2pException.TypeEnum.BAD_MESSAGE, + ((P2pException) e.getCause()).getType()); + } + } + + @Test + public void testBlockIdsAtLimit() throws Exception { + List blockIds = new ArrayList<>(); + BlockId genesis = context.getBean( + TronNetDelegate.class).getGenesisBlockId(); + blockIds.add(genesis); + for (int i = 1; i < 30; i++) { + blockIds.add(new BlockId(Sha256Hash.ZERO_HASH, i)); + } + // exactly 30 → should not throw for length check + SyncBlockChainMessage msg = new SyncBlockChainMessage(blockIds); + + Method checkMethod = SyncBlockChainMsgHandler.class + .getDeclaredMethod("check", PeerConnection.class, SyncBlockChainMessage.class); + checkMethod.setAccessible(true); + // does not throw P2pException due to length (may return false for other checks — that's fine) + try { + checkMethod.invoke(handler, peer, msg); + } catch (InvocationTargetException e) { + Assert.assertFalse("Should not fail with BAD_MESSAGE for length at limit", + e.getCause() instanceof P2pException + && ((P2pException) e.getCause()).getMessage().contains("exceeds limit")); + } + } + @AfterClass public static void destroy() { for (PeerConnection p : PeerManager.getPeers()) { diff --git a/framework/src/test/java/org/tron/core/net/messagehandler/TransactionsMsgHandlerTest.java b/framework/src/test/java/org/tron/core/net/messagehandler/TransactionsMsgHandlerTest.java index c14f9a9c86a..abe69e76ff2 100644 --- a/framework/src/test/java/org/tron/core/net/messagehandler/TransactionsMsgHandlerTest.java +++ b/framework/src/test/java/org/tron/core/net/messagehandler/TransactionsMsgHandlerTest.java @@ -23,6 +23,7 @@ import org.tron.common.TestConstants; import org.tron.common.runtime.TvmTestUtils; import org.tron.common.utils.ByteArray; +import org.tron.common.utils.ReflectUtils; import org.tron.core.ChainBaseManager; import org.tron.core.config.args.Args; import org.tron.core.exception.P2pException; @@ -48,8 +49,6 @@ public static void init() { public void testProcessMessage() { TransactionsMsgHandler transactionsMsgHandler = new TransactionsMsgHandler(); try { - Assert.assertFalse(transactionsMsgHandler.isBusy()); - transactionsMsgHandler.init(); PeerConnection peer = Mockito.mock(PeerConnection.class); @@ -60,6 +59,8 @@ public void testProcessMessage() { field.setAccessible(true); field.set(transactionsMsgHandler, tronNetDelegate); + Assert.assertFalse(transactionsMsgHandler.isBusy()); + BalanceContract.TransferContract transferContract = BalanceContract.TransferContract .newBuilder() .setAmount(10) @@ -142,10 +143,10 @@ public void testProcessMessageAfterClose() throws Exception { TransactionsMsgHandler handler = new TransactionsMsgHandler(); handler.init(); handler.close(); - + PeerConnection peer = Mockito.mock(PeerConnection.class); TransactionsMessage msg = Mockito.mock(TransactionsMessage.class); - + handler.processMessage(peer, msg); Mockito.verify(msg, Mockito.never()).getTransactions(); @@ -290,6 +291,74 @@ public void testHandleTransaction() throws Exception { } } + @Test + public void testDuplicateTransactionRejected() throws Exception { + TransactionsMsgHandler handler = new TransactionsMsgHandler(); + handler.init(); + try { + PeerConnection peer = Mockito.mock(PeerConnection.class); + + // Build a transaction + BalanceContract.TransferContract transferContract = BalanceContract.TransferContract + .newBuilder() + .setAmount(10) + .setOwnerAddress(ByteString.copyFrom(ByteArray.fromHexString("121212a9cf"))) + .setToAddress(ByteString.copyFrom(ByteArray.fromHexString("232323a9cf"))) + .build(); + Protocol.Transaction trx = Protocol.Transaction.newBuilder() + .setRawData(Protocol.Transaction.raw.newBuilder() + .addContract(Protocol.Transaction.Contract.newBuilder() + .setType(Protocol.Transaction.Contract.ContractType.TransferContract) + .setParameter(Any.pack(transferContract)).build()) + .build()) + .build(); + + // Same trx twice → duplicate + Protocol.Transactions transactions = Protocol.Transactions.newBuilder() + .addTransactions(trx) + .addTransactions(trx) + .build(); + TransactionsMessage msg = new TransactionsMessage(transactions.getTransactionsList()); + + TransactionMessage trxMsg = new TransactionMessage(trx); + Item item = new Item(trxMsg.getMessageId(), Protocol.Inventory.InventoryType.TRX); + Map advInvRequest = new ConcurrentHashMap<>(); + advInvRequest.put(item, System.currentTimeMillis()); + Mockito.when(peer.getAdvInvRequest()).thenReturn(advInvRequest); + + try { + handler.processMessage(peer, msg); + Assert.fail("Expected P2pException for duplicate transaction"); + } catch (P2pException e) { + Assert.assertEquals(P2pException.TypeEnum.BAD_MESSAGE, e.getType()); + } + } finally { + handler.close(); + } + } + + @Test + public void testIsBusyWithCachedTransactions() throws Exception { + TransactionsMsgHandler handler = new TransactionsMsgHandler(); + + int threshold = Args.getInstance().getMaxTrxCacheSize(); + TronNetDelegate tronNetDelegateMock = Mockito.mock(TronNetDelegate.class); + Field field = TransactionsMsgHandler.class.getDeclaredField("tronNetDelegate"); + field.setAccessible(true); + field.set(handler, tronNetDelegateMock); + + // queue and smartContractQueue are empty, but cached size > threshold + Mockito.when(tronNetDelegateMock.getCachedTransactionSize()).thenReturn(threshold + 1); + Assert.assertTrue(handler.isBusy()); + + // boundary: cached size == threshold, isBusy() uses strict >, so not busy + Mockito.when(tronNetDelegateMock.getCachedTransactionSize()).thenReturn(threshold); + Assert.assertFalse(handler.isBusy()); + + Mockito.when(tronNetDelegateMock.getCachedTransactionSize()).thenReturn(0); + Assert.assertFalse(handler.isBusy()); + } + class TrxEvent { @Getter