From 3c49cb24803e713531785f1123cdae1e4e4133cb Mon Sep 17 00:00:00 2001 From: wb Date: Tue, 21 Apr 2026 11:44:47 +0800 Subject: [PATCH] feat(sync): reduce memory pressure by deferring block deserialization and throttling in-flight requests --- .../common/parameter/CommonParameter.java | 3 + .../org/tron/core/config/args/NodeConfig.java | 9 ++ common/src/main/resources/reference.conf | 2 + .../java/org/tron/core/config/args/Args.java | 1 + .../core/net/service/sync/SyncService.java | 60 +++++++++---- .../core/net/service/sync/UnparsedBlock.java | 46 ++++++++++ framework/src/main/resources/config.conf | 5 ++ .../core/net/services/SyncServiceTest.java | 84 ++++++++++++++----- 8 files changed, 177 insertions(+), 33 deletions(-) create mode 100644 framework/src/main/java/org/tron/core/net/service/sync/UnparsedBlock.java diff --git a/common/src/main/java/org/tron/common/parameter/CommonParameter.java b/common/src/main/java/org/tron/common/parameter/CommonParameter.java index a73158a718a..f66f50841dc 100644 --- a/common/src/main/java/org/tron/common/parameter/CommonParameter.java +++ b/common/src/main/java/org/tron/common/parameter/CommonParameter.java @@ -146,6 +146,9 @@ public class CommonParameter { @Getter @Setter public long syncFetchBatchNum; // clearParam: 2000 + @Getter + @Setter + public int maxPendingBlockSize; // If you are running a solidity node for java tron, // this flag is set to true diff --git a/common/src/main/java/org/tron/core/config/args/NodeConfig.java b/common/src/main/java/org/tron/core/config/args/NodeConfig.java index c3305e976de..105b285e37a 100644 --- a/common/src/main/java/org/tron/core/config/args/NodeConfig.java +++ b/common/src/main/java/org/tron/core/config/args/NodeConfig.java @@ -25,6 +25,7 @@ public class NodeConfig { private String trustNode = ""; private boolean walletExtensionApi = false; private int syncFetchBatchNum = 2000; + private int maxPendingBlockSize = 500; private int validateSignThreadNum = 0; // 0 = auto 
(availableProcessors) private int maxConnections = 30; private int minConnections = 8; @@ -439,6 +440,14 @@ private void postProcess() { syncFetchBatchNum = 100; } + // maxPendingBlockSize: clamp to [50, 2000] + if (maxPendingBlockSize > 2000) { + maxPendingBlockSize = 2000; + } + if (maxPendingBlockSize < 50) { + maxPendingBlockSize = 50; + } + // blockProducedTimeOut: clamp to [30, 100] if (blockProducedTimeOut < 30) { blockProducedTimeOut = 30; diff --git a/common/src/main/resources/reference.conf b/common/src/main/resources/reference.conf index 11970a0a673..679d195f3c4 100644 --- a/common/src/main/resources/reference.conf +++ b/common/src/main/resources/reference.conf @@ -202,6 +202,8 @@ node { # Number of blocks to fetch in one batch during sync. Range: [100, 2000]. syncFetchBatchNum = 2000 + # Max in-flight (requested but not yet processed) blocks during sync. Range: [50, 2000]. + maxPendingBlockSize = 500 # Number of validate sign threads, default availableProcessors # Number of validate sign threads, 0 = auto (availableProcessors) diff --git a/framework/src/main/java/org/tron/core/config/args/Args.java b/framework/src/main/java/org/tron/core/config/args/Args.java index f91c6a437ac..e9e2d413cbf 100644 --- a/framework/src/main/java/org/tron/core/config/args/Args.java +++ b/framework/src/main/java/org/tron/core/config/args/Args.java @@ -620,6 +620,7 @@ private static void applyNodeConfig(NodeConfig nc) { PARAMETER.nodeEnableIpv6 = nc.isEnableIpv6(); PARAMETER.syncFetchBatchNum = nc.getSyncFetchBatchNum(); + PARAMETER.maxPendingBlockSize = nc.getMaxPendingBlockSize(); PARAMETER.solidityThreads = nc.getSolidityThreads(); PARAMETER.blockProducedTimeOut = nc.getBlockProducedTimeOut(); diff --git a/framework/src/main/java/org/tron/core/net/service/sync/SyncService.java b/framework/src/main/java/org/tron/core/net/service/sync/SyncService.java index 75349bd4c19..32230612743 100644 --- a/framework/src/main/java/org/tron/core/net/service/sync/SyncService.java +++ 
b/framework/src/main/java/org/tron/core/net/service/sync/SyncService.java @@ -5,6 +5,7 @@ import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.LinkedList; @@ -44,9 +45,9 @@ public class SyncService { @Autowired private PbftDataSyncHandler pbftDataSyncHandler; - private Map blockWaitToProcess = new ConcurrentHashMap<>(); + private Map blockWaitToProcess = new ConcurrentHashMap<>(); - private Map blockJustReceived = new ConcurrentHashMap<>(); + private Map blockJustReceived = new ConcurrentHashMap<>(); private long blockCacheTimeout = Args.getInstance().getBlockCacheTimeout(); private Cache requestBlockIds = CacheBuilder.newBuilder() @@ -69,6 +70,10 @@ public class SyncService { private final long syncFetchBatchNum = Args.getInstance().getSyncFetchBatchNum(); + private final int maxPendingBlockSize = Args.getInstance().getMaxPendingBlockSize(); + + private volatile long maxRequestedBlockNum = 0; + public void init() { ExecutorServiceManager.scheduleWithFixedDelay(fetchExecutor, () -> { try { @@ -135,7 +140,9 @@ public void syncNext(PeerConnection peer) { public void processBlock(PeerConnection peer, BlockMessage blockMessage) { synchronized (blockJustReceived) { - blockJustReceived.put(blockMessage, peer); + UnparsedBlock unparsedBlock = new UnparsedBlock( + blockMessage.getBlockId(), blockMessage.getData()); + blockJustReceived.put(unparsedBlock, peer); } handleFlag = true; if (peer.isSyncIdle()) { @@ -227,8 +234,18 @@ private BlockId getBlockIdByNum(long num) throws P2pException { } private void startFetchSyncBlock() { + Collection activePeers = tronNetDelegate.getActivePeer(); + int reqNum = activePeers.stream() + .mapToInt(p -> p.getSyncBlockRequested().size()).sum(); + int remainNum; + synchronized (blockJustReceived) { + remainNum = maxPendingBlockSize - reqNum + - blockJustReceived.size() - 
blockWaitToProcess.size();
+    }
+
     HashMap<PeerConnection, List<BlockId>> send = new HashMap<>();
-    tronNetDelegate.getActivePeer().stream()
+    int[] fetchingBlockSize = {0};
+    activePeers.stream()
         .filter(peer -> peer.isNeedSyncFromPeer() && peer.isSyncIdle())
         .filter(peer -> peer.isFetchAble())
         .forEach(peer -> {
@@ -238,9 +255,16 @@ private void startFetchSyncBlock() {
           for (BlockId blockId : peer.getSyncBlockToFetch()) {
             if (requestBlockIds.getIfPresent(blockId) == null
                 && !peer.getSyncBlockInProcess().contains(blockId)) {
+              if (fetchingBlockSize[0] >= remainNum && blockId.getNum() > maxRequestedBlockNum) {
+                break;
+              }
+              if (blockId.getNum() > maxRequestedBlockNum) {
+                maxRequestedBlockNum = blockId.getNum();
+              }
               requestBlockIds.put(blockId, peer);
               peer.getSyncBlockRequested().put(blockId, System.currentTimeMillis());
               send.get(peer).add(blockId);
+              fetchingBlockSize[0]++;
               if (send.get(peer).size() >= MAX_BLOCK_FETCH_PER_PEER) {
                 break;
               }
@@ -269,29 +293,39 @@ private synchronized void handleSyncBlock() {
 
       isProcessed[0] = false;
 
-      blockWaitToProcess.forEach((msg, peerConnection) -> {
+      blockWaitToProcess.forEach((unparsedBlock, peerConnection) -> {
         synchronized (tronNetDelegate.getBlockLock()) {
+          BlockId blockId = unparsedBlock.getBlockId();
           if (peerConnection.isDisconnect()) {
-            blockWaitToProcess.remove(msg);
-            invalid(msg.getBlockId(), peerConnection);
+            blockWaitToProcess.remove(unparsedBlock);
+            invalid(blockId, peerConnection);
             return;
           }
-          if (msg.getBlockId().getNum() <= solidNum) {
-            blockWaitToProcess.remove(msg);
-            peerConnection.getSyncBlockInProcess().remove(msg.getBlockId());
+          if (blockId.getNum() <= solidNum) {
+            blockWaitToProcess.remove(unparsedBlock);
+            peerConnection.getSyncBlockInProcess().remove(blockId);
             return;
           }
           final boolean[] isFound = {false};
           tronNetDelegate.getActivePeer().stream()
-              .filter(peer -> msg.getBlockId().equals(peer.getSyncBlockToFetch().peek()))
+              .filter(peer -> blockId.equals(peer.getSyncBlockToFetch().peek()))
               .forEach(peer -> {
                 isFound[0] = true;
               });
           if (isFound[0]) {
-            blockWaitToProcess.remove(msg);
+            blockWaitToProcess.remove(unparsedBlock);
             isProcessed[0] = true;
-            processSyncBlock(msg.getBlockCapsule(), peerConnection);
-            peerConnection.getSyncBlockInProcess().remove(msg.getBlockId());
+            BlockCapsule block;
+            try {
+              block = new BlockCapsule(unparsedBlock.getData());
+            } catch (Exception e) {
+              logger.warn("Deserialize block {} failed", blockId.getString(), e);
+              // Clear the in-process marker, otherwise this peer never re-requests the block.
+              peerConnection.getSyncBlockInProcess().remove(blockId);
+              return;
+            }
+            processSyncBlock(block, peerConnection);
+            peerConnection.getSyncBlockInProcess().remove(blockId);
           }
         }
       });
diff --git a/framework/src/main/java/org/tron/core/net/service/sync/UnparsedBlock.java b/framework/src/main/java/org/tron/core/net/service/sync/UnparsedBlock.java
new file mode 100644
index 00000000000..129c992ce7b
--- /dev/null
+++ b/framework/src/main/java/org/tron/core/net/service/sync/UnparsedBlock.java
@@ -0,0 +1,51 @@
+package org.tron.core.net.service.sync;
+
+import org.tron.core.capsule.BlockCapsule;
+
+/**
+ * A block received during sync, kept as its id plus the raw serialized bytes so that
+ * building the {@link BlockCapsule} can be postponed until the block is processed.
+ * Equality and hashing use the block id only.
+ */
+public class UnparsedBlock {
+
+  private final BlockCapsule.BlockId blockId;
+  private final byte[] data;
+
+  public UnparsedBlock(BlockCapsule.BlockId blockId, byte[] data) {
+    if (blockId == null) {
+      throw new IllegalArgumentException("blockId must not be null");
+    }
+    this.blockId = blockId;
+    this.data = data;
+  }
+
+  public BlockCapsule.BlockId getBlockId() {
+    return blockId;
+  }
+
+  public byte[] getData() {
+    return data;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof UnparsedBlock)) {
+      return false;
+    }
+    return blockId.equals(((UnparsedBlock) o).blockId);
+  }
+
+  @Override
+  public int hashCode() {
+    return blockId.hashCode();
+  }
+
+  @Override
+  public String toString() {
+    return blockId.getString();
+  }
+}
diff --git a/framework/src/main/resources/config.conf b/framework/src/main/resources/config.conf
index 369924074bc..bec9141fa18 100644
--- a/framework/src/main/resources/config.conf
+++ b/framework/src/main/resources/config.conf
@@
-163,6 +163,11 @@ node { fetchBlock.timeout = 200 # syncFetchBatchNum = 2000 + # Maximum number of blocks allowed in-flight (requested but not yet processed). + # Throttles block download to reduce memory pressure during sync. + # Range: [50, 2000], default: 500 + # maxPendingBlockSize = 500 + # Number of validate sign thread, default availableProcessors # validateSignThreadNum = 16 diff --git a/framework/src/test/java/org/tron/core/net/services/SyncServiceTest.java b/framework/src/test/java/org/tron/core/net/services/SyncServiceTest.java index 45ecbe866ab..2366aab3ab5 100644 --- a/framework/src/test/java/org/tron/core/net/services/SyncServiceTest.java +++ b/framework/src/test/java/org/tron/core/net/services/SyncServiceTest.java @@ -23,6 +23,7 @@ import org.tron.core.net.peer.PeerManager; import org.tron.core.net.peer.TronState; import org.tron.core.net.service.sync.SyncService; +import org.tron.core.net.service.sync.UnparsedBlock; import org.tron.p2p.connection.Channel; import org.tron.protos.Protocol; @@ -98,12 +99,22 @@ public void testProcessBlock() { ReflectUtils.setFieldValue(c1, "inetSocketAddress", inetSocketAddress); ReflectUtils.setFieldValue(c1, "inetAddress", inetSocketAddress.getAddress()); peer.setChannel(c1); - service.processBlock(peer, - new BlockMessage(new BlockCapsule(Protocol.Block.newBuilder().build()))); + + BlockCapsule blockCapsule = new BlockCapsule(Protocol.Block.newBuilder().build()); + BlockMessage blockMessage = new BlockMessage(blockCapsule); + service.processBlock(peer, blockMessage); + boolean fetchFlag = (boolean) ReflectUtils.getFieldObject(service, "fetchFlag"); boolean handleFlag = (boolean) ReflectUtils.getFieldObject(service, "handleFlag"); Assert.assertTrue(fetchFlag); Assert.assertTrue(handleFlag); + + Map blockJustReceived = + (Map) + ReflectUtils.getFieldObject(service, "blockJustReceived"); + Assert.assertEquals(1, blockJustReceived.size()); + UnparsedBlock stored = blockJustReceived.keySet().iterator().next(); + 
Assert.assertEquals(blockMessage.getBlockId(), stored.getBlockId()); } @Test @@ -169,6 +180,46 @@ public void testStartFetchSyncBlock() throws Exception { peer.getSyncBlockRequested().remove(blockId); method.invoke(service); Assert.assertTrue(peer.getSyncBlockRequested().get(blockId) == null); + + // reset maxRequestedBlockNum to 0 + Field maxRequestedBlockNumField = service.getClass().getDeclaredField("maxRequestedBlockNum"); + maxRequestedBlockNumField.setAccessible(true); + maxRequestedBlockNumField.set(service, 0L); + + Map blockWaitToProcess = + (Map) + ReflectUtils.getFieldObject(service, "blockWaitToProcess"); + + // target block has num=1, above maxRequestedBlockNum=0 so it can be throttled + BlockCapsule.BlockId highBlockId = new BlockCapsule.BlockId(Sha256Hash.ZERO_HASH, 1); + peer.getSyncBlockToFetch().clear(); + peer.getSyncBlockToFetch().add(highBlockId); + peer.getSyncBlockRequested().clear(); + requestBlockIds.invalidateAll(); + + // fill blockWaitToProcess to reach maxPendingBlockSize (default 500) + int maxPendingBlockSize = (int) ReflectUtils.getFieldObject(service, "maxPendingBlockSize"); + for (int i = 0; i < maxPendingBlockSize; i++) { + BlockCapsule.BlockId fillId = new BlockCapsule.BlockId(Sha256Hash.ZERO_HASH, 10000 + i); + blockWaitToProcess.put(new UnparsedBlock(fillId, new byte[0]), peer); + } + method.invoke(service); + // highBlockId must NOT be requested: remainNum <= 0 and num > maxRequestedBlockNum + Assert.assertNull(peer.getSyncBlockRequested().get(highBlockId)); + + // Symmetric retry-exemption case: budget still saturated, but the target block's num + // is below maxRequestedBlockNum, so it must still be requested (deadlock-avoidance + // retry path — guards an explicit invariant of the throttling design). 
+ maxRequestedBlockNumField.set(service, 100L); + BlockCapsule.BlockId retryBlockId = new BlockCapsule.BlockId(Sha256Hash.ZERO_HASH, 50); + peer.getSyncBlockToFetch().clear(); + peer.getSyncBlockToFetch().add(retryBlockId); + peer.getSyncBlockRequested().clear(); + requestBlockIds.invalidateAll(); + method.invoke(service); + // retryBlockId MUST be requested: remainNum <= 0 but num=50 <= maxRequestedBlockNum=100 + Assert.assertNotNull(peer.getSyncBlockRequested().get(retryBlockId)); + blockWaitToProcess.clear(); } @Test @@ -181,24 +232,19 @@ public void testHandleSyncBlock() throws Exception { Method method = service.getClass().getDeclaredMethod("handleSyncBlock"); method.setAccessible(true); - Map blockJustReceived = - (Map) + Map blockJustReceived = + (Map) ReflectUtils.getFieldObject(service, "blockJustReceived"); - Protocol.BlockHeader.raw.Builder blockHeaderRawBuild = Protocol.BlockHeader.raw.newBuilder(); - Protocol.BlockHeader.raw blockHeaderRaw = blockHeaderRawBuild + + Protocol.BlockHeader.raw blockHeaderRaw = Protocol.BlockHeader.raw.newBuilder() .setNumber(100000) .build(); - - // block header - Protocol.BlockHeader.Builder blockHeaderBuild = Protocol.BlockHeader.newBuilder(); - Protocol.BlockHeader blockHeader = blockHeaderBuild.setRawData(blockHeaderRaw).build(); - - BlockCapsule blockCapsule = new BlockCapsule(Protocol.Block.newBuilder() - .setBlockHeader(blockHeader).build()); - + Protocol.BlockHeader blockHeader = Protocol.BlockHeader.newBuilder() + .setRawData(blockHeaderRaw).build(); + BlockCapsule blockCapsule = new BlockCapsule( + Protocol.Block.newBuilder().setBlockHeader(blockHeader).build()); BlockCapsule.BlockId blockId = blockCapsule.getBlockId(); - InetSocketAddress a1 = new InetSocketAddress("127.0.0.1", 10001); Channel c1 = mock(Channel.class); Mockito.when(c1.getInetSocketAddress()).thenReturn(a1); @@ -206,14 +252,14 @@ public void testHandleSyncBlock() throws Exception { PeerManager.add(ctx, c1); peer = PeerManager.getPeers().get(0); - 
blockJustReceived.put(new BlockMessage(blockCapsule), peer); + UnparsedBlock unparsedBlock = new UnparsedBlock(blockId, blockCapsule.getData()); + blockJustReceived.put(unparsedBlock, peer); peer.getSyncBlockToFetch().add(blockId); Cache requestBlockIds = - (Cache) - ReflectUtils.getFieldObject(service, "requestBlockIds"); - + (Cache) + ReflectUtils.getFieldObject(service, "requestBlockIds"); requestBlockIds.put(blockId, peer); method.invoke(service);