From d9ad05fa6387043b3953c2a58eb28ea0d73061e2 Mon Sep 17 00:00:00 2001 From: zeusoo001 Date: Mon, 13 Apr 2026 15:32:56 +0800 Subject: [PATCH] feat: fix TRX inv rate limit bug and add BLOCK inv rate limit Co-Authored-By: Claude Sonnet 4.6 --- .../common/parameter/CommonParameter.java | 3 + .../java/org/tron/core/config/args/Args.java | 3 + .../org/tron/core/config/args/ConfigKey.java | 1 + .../tron/core/net/P2pEventHandlerImpl.java | 43 +++++--- .../core/net/P2pEventHandlerImplTest.java | 99 ++++++++++++++++++- 5 files changed, 132 insertions(+), 17 deletions(-) diff --git a/common/src/main/java/org/tron/common/parameter/CommonParameter.java b/common/src/main/java/org/tron/common/parameter/CommonParameter.java index a73158a718a..e7957c917e2 100644 --- a/common/src/main/java/org/tron/common/parameter/CommonParameter.java +++ b/common/src/main/java/org/tron/common/parameter/CommonParameter.java @@ -119,6 +119,9 @@ public class CommonParameter { public int maxTps; // clearParam: 1000 @Getter @Setter + public int maxBlockInvPerSecond = 10; // default: 10 block inv hashes/s per peer + @Getter + @Setter public int minParticipationRate; @Getter public P2pConfig p2pConfig; diff --git a/framework/src/main/java/org/tron/core/config/args/Args.java b/framework/src/main/java/org/tron/core/config/args/Args.java index 83d7fd2c63d..78eb63ee9a6 100644 --- a/framework/src/main/java/org/tron/core/config/args/Args.java +++ b/framework/src/main/java/org/tron/core/config/args/Args.java @@ -400,6 +400,9 @@ public static void applyConfigParams( PARAMETER.maxTps = config.hasPath(ConfigKey.NODE_MAX_TPS) ? config.getInt(ConfigKey.NODE_MAX_TPS) : 1000; + int rawBlockInvRate = config.hasPath(ConfigKey.NODE_MAX_BLOCK_INV_PER_SECOND) + ? config.getInt(ConfigKey.NODE_MAX_BLOCK_INV_PER_SECOND) : 10; + PARAMETER.maxBlockInvPerSecond = Math.max(1, rawBlockInvRate); PARAMETER.minParticipationRate = config.hasPath(ConfigKey.NODE_MIN_PARTICIPATION_RATE) diff --git a/framework/src/main/java/org/tron/core/config/args/ConfigKey.java b/framework/src/main/java/org/tron/core/config/args/ConfigKey.java index b21c9c440a4..0c514100329 100644 --- a/framework/src/main/java/org/tron/core/config/args/ConfigKey.java +++ b/framework/src/main/java/org/tron/core/config/args/ConfigKey.java @@ -76,6 +76,7 @@ private ConfigKey() { public static final String NODE_ENABLE_IPV6 = "node.enableIpv6"; public static final String NODE_SYNC_FETCH_BATCH_NUM = "node.syncFetchBatchNum"; public static final String NODE_MAX_TPS = "node.maxTps"; + public static final String NODE_MAX_BLOCK_INV_PER_SECOND = "node.maxBlockInvPerSecond"; public static final String NODE_NET_MAX_TRX_PER_SECOND = "node.netMaxTrxPerSecond"; public static final String NODE_TCP_NETTY_WORK_THREAD_NUM = "node.tcpNettyWorkThreadNum"; public static final String NODE_UDP_NETTY_WORK_THREAD_NUM = "node.udpNettyWorkThreadNum"; diff --git a/framework/src/main/java/org/tron/core/net/P2pEventHandlerImpl.java b/framework/src/main/java/org/tron/core/net/P2pEventHandlerImpl.java index 9cfa5058e8c..b9173b95cde 100644 --- a/framework/src/main/java/org/tron/core/net/P2pEventHandlerImpl.java +++ b/framework/src/main/java/org/tron/core/net/P2pEventHandlerImpl.java @@ -36,6 +36,7 @@ import org.tron.core.net.service.effective.EffectiveCheckService; import org.tron.core.net.service.handshake.HandshakeService; import org.tron.core.net.service.keepalive.KeepAliveService; +import org.tron.core.net.service.statistics.MessageStatistics; import org.tron.p2p.P2pEventHandler; import org.tron.p2p.connection.Channel; import org.tron.protos.Protocol; @@ -91,6 +92,7 @@ public class P2pEventHandlerImpl extends P2pEventHandler { private byte MESSAGE_MAX_TYPE = 127; private int maxCountIn10s = Args.getInstance().getMaxTps() * 10; + private int maxBlockInvIn10s = Args.getInstance().getMaxBlockInvPerSecond() * 10; public P2pEventHandlerImpl() { Set set = new HashSet<>(); @@ -149,19 +151,8 @@ private void processMessage(PeerConnection peer, byte[] data) { msg = TronMessageFactory.create(data); type = msg.getType(); - if (INVENTORY.equals(type)) { - InventoryMessage message = (InventoryMessage) msg; - Protocol.Inventory.InventoryType inventoryType = message.getInventoryType(); - int count = peer.getPeerStatistics().messageStatistics.tronInTrxInventoryElement - .getCount(10); - if (inventoryType.equals(Protocol.Inventory.InventoryType.TRX) && count > maxCountIn10s) { - logger.warn("Drop inventory from Peer {}, cur:{}, max:{}", - peer.getInetAddress(), count, maxCountIn10s); - if (Args.getInstance().isOpenPrintLog()) { - logger.warn("[overload]Drop tx list is: {}", ((InventoryMessage) msg).getHashList()); - } - return; - } + if (INVENTORY.equals(type) && !checkInvRateLimit(peer, (InventoryMessage) msg)) { + return; } peer.getPeerStatistics().messageStatistics.addTcpInMessage(msg); @@ -224,6 +215,32 @@ private void processMessage(PeerConnection peer, byte[] data) { } } + private boolean checkInvRateLimit(PeerConnection peer, InventoryMessage msg) { + InventoryType invType = msg.getInventoryType(); + int currentSize = msg.getInventory().getIdsCount(); + MessageStatistics stats = peer.getPeerStatistics().messageStatistics; + + if (invType == InventoryType.TRX) { + int count = stats.tronInTrxInventoryElement.getCount(10); + if (count + currentSize > maxCountIn10s) { + logger.warn("Drop TRX inv from {}, window:{}, cur:{}, max:{}", + peer.getInetAddress(), count, currentSize, maxCountIn10s); + if (Args.getInstance().isOpenPrintLog()) { + logger.warn("[overload] Drop tx list: {}", msg.getHashList()); + } + return false; + } + } else if (invType == InventoryType.BLOCK) { + int count = stats.tronInBlockInventoryElement.getCount(10); + if (count + currentSize > maxBlockInvIn10s) { + logger.warn("Drop BLOCK inv from {}, window:{}, cur:{}, max:{}", + peer.getInetAddress(), count, currentSize, maxBlockInvIn10s); + return false; + } + } + return true; + } + private void updateLastInteractiveTime(PeerConnection peer, TronMessage msg) { MessageTypes type = msg.getType(); diff --git a/framework/src/test/java/org/tron/core/net/P2pEventHandlerImplTest.java b/framework/src/test/java/org/tron/core/net/P2pEventHandlerImplTest.java index 03c79f495ee..2e79bbf5809 100644 --- a/framework/src/test/java/org/tron/core/net/P2pEventHandlerImplTest.java +++ b/framework/src/test/java/org/tron/core/net/P2pEventHandlerImplTest.java @@ -34,6 +34,7 @@ public static void init() throws Exception { public void testProcessInventoryMessage() throws Exception { CommonParameter parameter = CommonParameter.getInstance(); parameter.setMaxTps(10); + parameter.setMaxBlockInvPerSecond(10); PeerStatistics peerStatistics = new PeerStatistics(); @@ -75,7 +76,7 @@ public void testProcessInventoryMessage() throws Exception { count = peer.getPeerStatistics().messageStatistics.tronInTrxInventoryElement.getCount(10); - Assert.assertEquals(110, count); + Assert.assertEquals(10, count); // 100 hashes dropped: 10+100=110 > maxCountIn10s(100) list.clear(); for (int i = 0; i < 100; i++) { @@ -88,7 +89,7 @@ public void testProcessInventoryMessage() throws Exception { count = peer.getPeerStatistics().messageStatistics.tronInTrxInventoryElement.getCount(10); - Assert.assertEquals(110, count); + Assert.assertEquals(10, count); // still dropped: window=10, 10+100=110 > 100 list.clear(); for (int i = 0; i < 200; i++) { @@ -101,7 +102,7 @@ public void testProcessInventoryMessage() throws Exception { count = peer.getPeerStatistics().messageStatistics.tronInBlockInventoryElement.getCount(10); - Assert.assertEquals(200, count); + Assert.assertEquals(0, count); // 200 hashes dropped: 0+200=200 > maxBlockInvIn10s(100) list.clear(); for (int i = 0; i < 100; i++) { @@ -114,10 +115,100 @@ public void testProcessInventoryMessage() throws Exception { count = peer.getPeerStatistics().messageStatistics.tronInBlockInventoryElement.getCount(10); - Assert.assertEquals(300, count); + Assert.assertEquals(100, count); // passes: window=0, 0+100=100, not > 100 } + @Test + public void testCheckInvRateLimitTrxBoundary() throws Exception { + // maxTps=10 → maxCountIn10s=100 + CommonParameter parameter = CommonParameter.getInstance(); + parameter.setMaxTps(10); + parameter.setMaxBlockInvPerSecond(10); + + PeerStatistics peerStatistics = new PeerStatistics(); + PeerConnection peer = mock(PeerConnection.class); + Mockito.when(peer.getPeerStatistics()).thenReturn(peerStatistics); + + P2pEventHandlerImpl handler = new P2pEventHandlerImpl(); + Method method = handler.getClass() + .getDeclaredMethod("processMessage", PeerConnection.class, byte[].class); + method.setAccessible(true); + + // Fill window to 91: send 91 TRX hashes → passes (0+91=91 ≤ 100) + List list91 = new ArrayList<>(); + for (int i = 0; i < 91; i++) { + list91.add(new Sha256Hash(i, new byte[32])); + } + InventoryMessage msg91 = new InventoryMessage(list91, InventoryType.TRX); + method.invoke(handler, peer, msg91.getSendBytes()); + Assert.assertEquals(91, + peer.getPeerStatistics().messageStatistics.tronInTrxInventoryElement.getCount(10)); + + // Send 9 more TRX hashes → passes (91+9=100, not > 100) + List list9 = new ArrayList<>(); + for (int i = 0; i < 9; i++) { + list9.add(new Sha256Hash(i, new byte[32])); + } + InventoryMessage msg9 = new InventoryMessage(list9, InventoryType.TRX); + method.invoke(handler, peer, msg9.getSendBytes()); + Assert.assertEquals(100, + peer.getPeerStatistics().messageStatistics.tronInTrxInventoryElement.getCount(10)); + + // Send 1 more TRX hash → DROPPED (100+1=101 > 100) + List list1 = new ArrayList<>(); + list1.add(new Sha256Hash(0, new byte[32])); + InventoryMessage msg1 = new InventoryMessage(list1, InventoryType.TRX); + method.invoke(handler, peer, msg1.getSendBytes()); + Assert.assertEquals(100, // count unchanged: message was dropped + peer.getPeerStatistics().messageStatistics.tronInTrxInventoryElement.getCount(10)); + } + + @Test + public void testCheckInvRateLimitBlockBoundary() throws Exception { + // maxBlockInvPerSecond=10 → maxBlockInvIn10s=100 + CommonParameter parameter = CommonParameter.getInstance(); + parameter.setMaxTps(1000); + parameter.setMaxBlockInvPerSecond(10); + + PeerStatistics peerStatistics = new PeerStatistics(); + PeerConnection peer = mock(PeerConnection.class); + Mockito.when(peer.getPeerStatistics()).thenReturn(peerStatistics); + + P2pEventHandlerImpl handler = new P2pEventHandlerImpl(); + Method method = handler.getClass() + .getDeclaredMethod("processMessage", PeerConnection.class, byte[].class); + method.setAccessible(true); + + // Send 101 BLOCK hashes → DROPPED (0+101=101 > 100) + List list101 = new ArrayList<>(); + for (int i = 0; i < 101; i++) { + list101.add(new Sha256Hash(i, new byte[32])); + } + InventoryMessage msgBlock101 = new InventoryMessage(list101, InventoryType.BLOCK); + method.invoke(handler, peer, msgBlock101.getSendBytes()); + Assert.assertEquals(0, + peer.getPeerStatistics().messageStatistics.tronInBlockInventoryElement.getCount(10)); + + // Send 100 BLOCK hashes → passes (0+100=100, not > 100) + List list100 = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + list100.add(new Sha256Hash(i, new byte[32])); + } + InventoryMessage msgBlock100 = new InventoryMessage(list100, InventoryType.BLOCK); + method.invoke(handler, peer, msgBlock100.getSendBytes()); + Assert.assertEquals(100, + peer.getPeerStatistics().messageStatistics.tronInBlockInventoryElement.getCount(10)); + + // Send 1 more BLOCK hash → DROPPED (100+1=101 > 100) + List list1 = new ArrayList<>(); + list1.add(new Sha256Hash(0, new byte[32])); + InventoryMessage msgBlock1 = new InventoryMessage(list1, InventoryType.BLOCK); + method.invoke(handler, peer, msgBlock1.getSendBytes()); + Assert.assertEquals(100, // count unchanged: message was dropped + peer.getPeerStatistics().messageStatistics.tronInBlockInventoryElement.getCount(10)); + } + @Test public void testUpdateLastInteractiveTime() throws Exception { PeerConnection peer = new PeerConnection();