Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions common/src/main/java/org/tron/core/config/Parameter.java
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ public class NetConstants {
public static final int MSG_CACHE_DURATION_IN_BLOCKS = 5;
public static final int MAX_BLOCK_FETCH_PER_PEER = 100;
public static final int MAX_TRX_FETCH_PER_PEER = 1000;
public static final int MAX_SYNC_CHAIN_IDS = 30;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice to have a named constant here instead of a magic number! 👍

Quick question — how was 30 chosen? Observed from real traffic, or estimated from getBlockChainSummary()'s log-step algorithm?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Based on the chain digest algorithm, using a binary search from the solidified block to the head block, 30 blocks can generate 2^30 block digests, which is theoretically large enough, and 30 blocks will not put pressure on the system.

}

public class DatabaseConstants {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Lists;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -153,6 +154,12 @@ public boolean isAdvInv(PeerConnection peer, FetchInvDataMessage msg) {

private void check(PeerConnection peer, FetchInvDataMessage fetchInvDataMsg,
boolean isAdv) throws P2pException {
List<Sha256Hash> hashList = fetchInvDataMsg.getHashList();
if (hashList.size() != new HashSet<>(hashList).size()) {
throw new P2pException(TypeEnum.BAD_MESSAGE,
"FetchInvData contains duplicate hashes, size: " + hashList.size());
}
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good check. Validation has already been added for FetchInvDataMessage, SyncBlockChainMessage, and TransactionsMessage. Why not also add deduplication checks for InventoryMessage and ChainInventoryMessage?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inventory Messages have rate limits and perform deduplication during processing, and they don't consume CPU, so there's no need to perform a second check before processing. ChainInventoryMessages have stricter checks than deduplication, while also ensuring that there are no duplicate lists.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ChainInventoryMessages's block number's order must be increasing, we can ignore it. But although inventory messages don't need consume CPU, deduplication checks are still needed to keep consistency with other message handling logic and to defend against malicious peers.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

InventoryMessages does not constitute an attack, and the lack of duplicate checks has no impact on the system.


MessageTypes type = fetchInvDataMsg.getInvMessageType();

if (type == MessageTypes.TRX) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
package org.tron.core.net.messagehandler;

import java.util.HashSet;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.tron.common.utils.Sha256Hash;
import org.tron.core.config.args.Args;
import org.tron.core.exception.P2pException;
import org.tron.core.exception.P2pException.TypeEnum;
import org.tron.core.net.TronNetDelegate;
import org.tron.core.net.message.TronMessage;
import org.tron.core.net.message.adv.InventoryMessage;
Expand All @@ -27,7 +31,7 @@ public class InventoryMsgHandler implements TronMsgHandler {
private TransactionsMsgHandler transactionsMsgHandler;

@Override
public void processMessage(PeerConnection peer, TronMessage msg) {
public void processMessage(PeerConnection peer, TronMessage msg) throws P2pException {
InventoryMessage inventoryMessage = (InventoryMessage) msg;
InventoryType type = inventoryMessage.getInventoryType();

Expand All @@ -45,10 +49,17 @@ public void processMessage(PeerConnection peer, TronMessage msg) {
}
}

private boolean check(PeerConnection peer, InventoryMessage inventoryMessage) {
private boolean check(PeerConnection peer, InventoryMessage inventoryMessage)
throws P2pException {

List<Sha256Hash> hashList = inventoryMessage.getHashList();
if (hashList.size() != new HashSet<>(hashList).size()) {
throw new P2pException(TypeEnum.BAD_MESSAGE,
"Inventory contains duplicate hashes, size: " + hashList.size());
}

InventoryType type = inventoryMessage.getInventoryType();
int size = inventoryMessage.getHashList().size();
int size = hashList.size();

if (peer.isNeedSyncFromPeer() || peer.isNeedSyncFromUs()) {
logger.warn("Drop inv: {} size: {} from Peer {}, syncFromUs: {}, syncFromPeer: {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@ private boolean check(PeerConnection peer, SyncBlockChainMessage msg) throws P2p
throw new P2pException(TypeEnum.BAD_MESSAGE, "SyncBlockChain blockIds is empty");
}

if (blockIds.size() > NetConstants.MAX_SYNC_CHAIN_IDS) {
throw new P2pException(TypeEnum.BAD_MESSAGE,
"SyncBlockChain blockIds size " + blockIds.size()
+ " exceeds limit " + NetConstants.MAX_SYNC_CHAIN_IDS);
}

BlockId firstId = blockIds.get(0);
if (!tronNetDelegate.containBlockInMainChain(firstId)) {
logger.warn("Sync message from peer {} without the first block: {}",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package org.tron.core.net.messagehandler;

import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
Expand All @@ -11,6 +14,7 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.tron.common.es.ExecutorServiceManager;
import org.tron.common.utils.Sha256Hash;
import org.tron.core.ChainBaseManager;
import org.tron.core.config.args.Args;
import org.tron.core.exception.P2pException;
Expand Down Expand Up @@ -121,8 +125,15 @@ public void processMessage(PeerConnection peer, TronMessage msg) throws P2pExcep
}

private void check(PeerConnection peer, TransactionsMessage msg) throws P2pException {
for (Transaction trx : msg.getTransactions().getTransactionsList()) {
Item item = new Item(new TransactionMessage(trx).getMessageId(), InventoryType.TRX);
List<Transaction> list = msg.getTransactions().getTransactionsList();
Set<Sha256Hash> seen = new HashSet<>(list.size() * 2);
for (Transaction trx : list) {
Sha256Hash id = new TransactionMessage(trx).getMessageId();
if (!seen.add(id)) {
throw new P2pException(TypeEnum.BAD_MESSAGE,
"TransactionsMessage contains duplicate transaction: " + id);
}
Item item = new Item(id, InventoryType.TRX);
if (!peer.getAdvInvRequest().containsKey(item)) {
throw new P2pException(TypeEnum.BAD_MESSAGE,
"trx: " + msg.getMessageId() + " without request.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.tron.common.utils.Sha256Hash;
import org.tron.core.capsule.BlockCapsule;
import org.tron.core.config.Parameter;
import org.tron.core.exception.P2pException;
import org.tron.core.net.P2pRateLimiter;
import org.tron.core.net.TronNetDelegate;
import org.tron.core.net.message.adv.BlockMessage;
Expand Down Expand Up @@ -124,12 +125,42 @@ public void testSyncFetchCheck() {
Assert.assertEquals("minBlockNum: 16000, blockNum: 10000", e2.getMessage());
}

@Test
public void testDuplicateHashRejected() throws Exception {
FetchInvDataMsgHandler handler = new FetchInvDataMsgHandler();
PeerConnection peer = Mockito.mock(PeerConnection.class);
AdvService advService = Mockito.mock(AdvService.class);
TronNetDelegate tronNetDelegate = Mockito.mock(TronNetDelegate.class);

ReflectUtils.setFieldValue(handler, "advService", advService);
ReflectUtils.setFieldValue(handler, "tronNetDelegate", tronNetDelegate);

Sha256Hash hash = Sha256Hash.ZERO_HASH;
List<Sha256Hash> hashList = new LinkedList<>();
hashList.add(hash);
hashList.add(hash); // duplicate

FetchInvDataMessage msg = new FetchInvDataMessage(hashList,
Protocol.Inventory.InventoryType.TRX);

Cache<Item, Long> advInvSpread = CacheBuilder.newBuilder()
.maximumSize(20000).expireAfterWrite(1, TimeUnit.HOURS).build();
advInvSpread.put(new Item(hash, Protocol.Inventory.InventoryType.TRX), 1L);
Mockito.when(peer.getAdvInvSpread()).thenReturn(advInvSpread);

try {
handler.processMessage(peer, msg);
Assert.fail("Expected P2pException for duplicate hash");
} catch (P2pException e) {
Assert.assertEquals(P2pException.TypeEnum.BAD_MESSAGE, e.getType());
}
}

@Test
public void testRateLimiter() {
BlockCapsule.BlockId blockId = new BlockCapsule.BlockId(Sha256Hash.ZERO_HASH, 10000L);
List<Sha256Hash> blockIds = new LinkedList<>();
for (int i = 0; i <= 100; i++) {
blockIds.add(blockId);
blockIds.add(new BlockCapsule.BlockId(Sha256Hash.ZERO_HASH, (long) i));
}
FetchInvDataMessage msg =
new FetchInvDataMessage(blockIds, Protocol.Inventory.InventoryType.BLOCK);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,14 @@
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.tron.common.TestConstants;
import org.tron.common.utils.Sha256Hash;
import org.tron.core.config.args.Args;
import org.tron.core.exception.P2pException;
import org.tron.core.net.TronNetDelegate;
import org.tron.core.net.message.adv.InventoryMessage;
import org.tron.core.net.peer.PeerConnection;
Expand Down Expand Up @@ -52,6 +56,24 @@ public void testProcessMessage() throws Exception {
Mockito.verify(tronNetDelegate, Mockito.atLeastOnce()).isBlockUnsolidified();
}

@Test
public void testDuplicateHashesRejected() throws Exception {
InventoryMsgHandler handler = new InventoryMsgHandler();
Args.setParam(new String[] {}, TestConstants.TEST_CONF);

Sha256Hash hash = Sha256Hash.wrap(new byte[32]);
InventoryMessage msg = new InventoryMessage(Arrays.asList(hash, hash), InventoryType.TRX);
PeerConnection peer = new PeerConnection();
peer.setChannel(getChannel("1.0.0.4", 1000));

try {
handler.processMessage(peer, msg);
Assert.fail("Expected P2pException for duplicate hashes");
} catch (P2pException e) {
Assert.assertEquals(P2pException.TypeEnum.BAD_MESSAGE, e.getType());
}
}

private Channel getChannel(String host, int port) throws Exception {
Channel channel = new Channel();
InetSocketAddress inetSocketAddress = new InetSocketAddress(host, port);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@
import org.junit.rules.TemporaryFolder;
import org.tron.common.TestConstants;
import org.tron.common.application.TronApplicationContext;
import org.tron.common.utils.Sha256Hash;
import org.tron.core.capsule.BlockCapsule;
import org.tron.core.capsule.BlockCapsule.BlockId;
import org.tron.core.config.DefaultConfig;
import org.tron.core.config.args.Args;
import org.tron.core.exception.P2pException;
import org.tron.core.net.TronNetDelegate;
import org.tron.core.net.message.sync.BlockInventoryMessage;
import org.tron.core.net.message.sync.SyncBlockChainMessage;
import org.tron.core.net.peer.PeerConnection;
Expand Down Expand Up @@ -108,6 +110,56 @@ public void testProcessMessage() throws Exception {
Assert.assertEquals(1, list.size());
}

@Test
public void testBlockIdsExceedsLimit() throws Exception {
List<BlockId> blockIds = new ArrayList<>();
// genesis block as first (in main chain), then 30 more = 31 total → exceeds limit
BlockId genesis = context.getBean(
TronNetDelegate.class).getGenesisBlockId();
blockIds.add(genesis);
for (int i = 1; i <= 30; i++) {
blockIds.add(new BlockId(Sha256Hash.ZERO_HASH, i));
}
SyncBlockChainMessage msg = new SyncBlockChainMessage(blockIds);

try {
Method checkMethod = SyncBlockChainMsgHandler.class
.getDeclaredMethod("check", PeerConnection.class, SyncBlockChainMessage.class);
checkMethod.setAccessible(true);
checkMethod.invoke(handler, peer, msg);
Assert.fail("Expected P2pException for oversized blockIds");
} catch (InvocationTargetException e) {
Assert.assertTrue(e.getCause() instanceof P2pException);
Assert.assertEquals(P2pException.TypeEnum.BAD_MESSAGE,
((P2pException) e.getCause()).getType());
}
}

@Test
public void testBlockIdsAtLimit() throws Exception {
List<BlockId> blockIds = new ArrayList<>();
BlockId genesis = context.getBean(
TronNetDelegate.class).getGenesisBlockId();
blockIds.add(genesis);
for (int i = 1; i < 30; i++) {
blockIds.add(new BlockId(Sha256Hash.ZERO_HASH, i));
}
// exactly 30 → should not throw for length check
SyncBlockChainMessage msg = new SyncBlockChainMessage(blockIds);

Method checkMethod = SyncBlockChainMsgHandler.class
.getDeclaredMethod("check", PeerConnection.class, SyncBlockChainMessage.class);
checkMethod.setAccessible(true);
// does not throw P2pException due to length (may return false for other checks — that's fine)
try {
checkMethod.invoke(handler, peer, msg);
} catch (InvocationTargetException e) {
Assert.assertFalse("Should not fail with BAD_MESSAGE for length at limit",
e.getCause() instanceof P2pException
&& ((P2pException) e.getCause()).getMessage().contains("exceeds limit"));
}
}

@AfterClass
public static void destroy() {
for (PeerConnection p : PeerManager.getPeers()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.tron.common.TestConstants;
import org.tron.common.runtime.TvmTestUtils;
import org.tron.common.utils.ByteArray;
import org.tron.common.utils.ReflectUtils;
import org.tron.core.ChainBaseManager;
import org.tron.core.config.args.Args;
import org.tron.core.exception.P2pException;
Expand Down Expand Up @@ -142,10 +143,10 @@ public void testProcessMessageAfterClose() throws Exception {
TransactionsMsgHandler handler = new TransactionsMsgHandler();
handler.init();
handler.close();

PeerConnection peer = Mockito.mock(PeerConnection.class);
TransactionsMessage msg = Mockito.mock(TransactionsMessage.class);

handler.processMessage(peer, msg);

Mockito.verify(msg, Mockito.never()).getTransactions();
Expand Down Expand Up @@ -290,6 +291,52 @@ public void testHandleTransaction() throws Exception {
}
}

@Test
public void testDuplicateTransactionRejected() throws Exception {
TransactionsMsgHandler handler = new TransactionsMsgHandler();
handler.init();
try {
PeerConnection peer = Mockito.mock(PeerConnection.class);

// Build a transaction
BalanceContract.TransferContract transferContract = BalanceContract.TransferContract
.newBuilder()
.setAmount(10)
.setOwnerAddress(ByteString.copyFrom(ByteArray.fromHexString("121212a9cf")))
.setToAddress(ByteString.copyFrom(ByteArray.fromHexString("232323a9cf")))
.build();
Protocol.Transaction trx = Protocol.Transaction.newBuilder()
.setRawData(Protocol.Transaction.raw.newBuilder()
.addContract(Protocol.Transaction.Contract.newBuilder()
.setType(Protocol.Transaction.Contract.ContractType.TransferContract)
.setParameter(Any.pack(transferContract)).build())
.build())
.build();

// Same trx twice → duplicate
Protocol.Transactions transactions = Protocol.Transactions.newBuilder()
.addTransactions(trx)
.addTransactions(trx)
.build();
TransactionsMessage msg = new TransactionsMessage(transactions.getTransactionsList());

TransactionMessage trxMsg = new TransactionMessage(trx);
Item item = new Item(trxMsg.getMessageId(), Protocol.Inventory.InventoryType.TRX);
Map<Item, Long> advInvRequest = new ConcurrentHashMap<>();
advInvRequest.put(item, System.currentTimeMillis());
Mockito.when(peer.getAdvInvRequest()).thenReturn(advInvRequest);

try {
handler.processMessage(peer, msg);
Assert.fail("Expected P2pException for duplicate transaction");
} catch (P2pException e) {
Assert.assertEquals(P2pException.TypeEnum.BAD_MESSAGE, e.getType());
}
} finally {
handler.close();
}
}

@Test
public void testIsBusyWithCachedTransactions() throws Exception {
TransactionsMsgHandler handler = new TransactionsMsgHandler();
Expand Down
Loading