From 08683e150f57af18c67778762121c31cf7666833 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Wed, 10 Dec 2025 21:39:58 +0300 Subject: [PATCH 01/16] raw --- .../discovery/DiscoveryMessageFactory.java | 3 +++ .../tracing/messages/SpanContainer.java | 2 +- .../TcpDiscoveryAbstractTraceableMessage.java | 27 ++++++++++++++++--- .../messages/TcpDiscoveryNodeLeftMessage.java | 14 +++++++++- 4 files changed, 41 insertions(+), 5 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java index 1ac4567b31ea0..0b563d48716db 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java @@ -20,6 +20,7 @@ import org.apache.ignite.internal.codegen.TcpDiscoveryCheckFailedMessageSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryClientPingRequestSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryClientPingResponseSerializer; +import org.apache.ignite.internal.codegen.TcpDiscoveryNodeLeftMessageSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryPingRequestSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryPingResponseSerializer; import org.apache.ignite.plugin.extensions.communication.MessageFactory; @@ -27,6 +28,7 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCheckFailedMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingRequest; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingResponse; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeLeftMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingRequest; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingResponse; @@ -39,5 +41,6 @@ public class DiscoveryMessageFactory implements MessageFactoryProvider { factory.register((short)2, TcpDiscoveryPingResponse::new, new TcpDiscoveryPingResponseSerializer()); factory.register((short)3, TcpDiscoveryClientPingRequest::new, new TcpDiscoveryClientPingRequestSerializer()); factory.register((short)4, TcpDiscoveryClientPingResponse::new, new TcpDiscoveryClientPingResponseSerializer()); + factory.register((short)5, TcpDiscoveryNodeLeftMessage::new, new TcpDiscoveryNodeLeftMessageSerializer()); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/tracing/messages/SpanContainer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/tracing/messages/SpanContainer.java index c1f73f47d2cee..dc60d51f0d13f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/tracing/messages/SpanContainer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/tracing/messages/SpanContainer.java @@ -46,7 +46,7 @@ public byte[] serializedSpanBytes() { * @param serializedSpan Serialized span. */ public void serializedSpanBytes(byte[] serializedSpan) { - this.serializedSpanBytes = serializedSpan.clone(); + serializedSpanBytes = serializedSpan.clone(); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractTraceableMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractTraceableMessage.java index 433566d7e3021..3df159a48e806 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractTraceableMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractTraceableMessage.java @@ -17,10 +17,12 @@ package org.apache.ignite.spi.discovery.tcp.messages; -import java.io.Externalizable; import java.util.UUID; +import org.apache.ignite.internal.Order; +import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory; import org.apache.ignite.internal.processors.tracing.messages.SpanContainer; import org.apache.ignite.internal.processors.tracing.messages.TraceableMessage; +import org.jetbrains.annotations.Nullable; /** * Abstract traceable message for TCP discovery. @@ -29,8 +31,12 @@ public abstract class TcpDiscoveryAbstractTraceableMessage extends TcpDiscoveryA /** Container. */ private SpanContainer spanContainer = new SpanContainer(); + /** Serialization holder of {@link #spanContainer}'s bytes. */ + @Order(value = 5, method = "spanBytes") + private @Nullable byte[] spanBytesHolder; + /** - * Default no-arg constructor for {@link Externalizable} interface. + * Default constructor for {@link DiscoveryMessageFactory}. */ protected TcpDiscoveryAbstractTraceableMessage() { // No-op. @@ -51,7 +57,7 @@ protected TcpDiscoveryAbstractTraceableMessage(UUID creatorNodeId) { protected TcpDiscoveryAbstractTraceableMessage(TcpDiscoveryAbstractTraceableMessage msg) { super(msg); - this.spanContainer = msg.spanContainer; + spanContainer = msg.spanContainer; } /** @@ -67,6 +73,21 @@ public Object readResolve() { return this; } + /** @return {@link #spanContainer}'s bytes. */ + public @Nullable byte[] spanBytes() { + return spanContainer == null ? null : spanContainer.serializedSpanBytes(); + } + + /** @param spanBytes {@link #spanContainer}'s bytes. */ + public void spanBytes(@Nullable byte[] spanBytes) { + if (spanBytes == null) + return; + + readResolve(); + + spanContainer.serializedSpanBytes(spanBytes); + } + /** {@inheritDoc} */ @Override public SpanContainer spanContainer() { return spanContainer; diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeLeftMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeLeftMessage.java index 5d6df69b715b7..350bb3eaa647b 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeLeftMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeLeftMessage.java @@ -18,7 +18,9 @@ package org.apache.ignite.spi.discovery.tcp.messages; import java.util.UUID; +import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.Message; /** * Sent by node that is stopping to coordinator across the ring, @@ -26,10 +28,15 @@ */ @TcpDiscoveryEnsureDelivery @TcpDiscoveryRedirectToClient -public class TcpDiscoveryNodeLeftMessage extends TcpDiscoveryAbstractTraceableMessage { +public class TcpDiscoveryNodeLeftMessage extends TcpDiscoveryAbstractTraceableMessage implements Message { /** */ private static final long serialVersionUID = 0L; + /** Default constructor for {@link DiscoveryMessageFactory}. */ + public TcpDiscoveryNodeLeftMessage() { + // No-op. + } + /** * Constructor. * @@ -43,4 +50,9 @@ public TcpDiscoveryNodeLeftMessage(UUID creatorNodeId) { @Override public String toString() { return S.toString(TcpDiscoveryNodeLeftMessage.class, this, "super", super.toString()); } + + /** */ + @Override public short directType() { + return 5; + } } From ae56c1cd4b0770d3da0aaadd8222f0cb6a4b8ae6 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Thu, 11 Dec 2025 22:06:54 +0300 Subject: [PATCH 02/16] class cast fixes --- .../discovery/tcp/messages/TcpDiscoveryAbstractMessage.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java index c8fea72ec3cfd..d15a642a1db83 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java @@ -293,8 +293,10 @@ public void addFailedNode(UUID nodeId) { /** * @param failedNodes Failed nodes. */ - public void failedNodes(@Nullable Set failedNodes) { - this.failedNodes = failedNodes; + public void failedNodes(@Nullable Collection failedNodes) { + this.failedNodes = failedNodes == null + ? null + : failedNodes instanceof Set ? (Set)failedNodes : new HashSet<>(failedNodes); } /** From 71e0fe20ec367988fff2cd1667f3ae326079be22 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Fri, 12 Dec 2025 00:03:55 +0300 Subject: [PATCH 03/16] exception type fix --- .../ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java index 5c1946f0eac34..b64cea9133710 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java @@ -170,7 +170,9 @@ T readMessage() throws IgniteCheckedException, IOException { if (MESSAGE_SERIALIZATION != serMode) { detectSslAlert(serMode, in); - throw new IgniteCheckedException("Received unexpected byte while reading discovery message: " + serMode); + // There are many `X.hasCause` in the discovery errors processing which change connection recovery processing. + // It is better to throw an IOException on reading failures. Often happens at nodes stop and streams closing. + throw new IOException("Received unexpected byte while reading discovery message: " + serMode); } byte b0 = (byte)in.read(); From f8b0a8d047d80069bbd5778108a2975db9792156 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Mon, 15 Dec 2025 18:19:46 +0300 Subject: [PATCH 04/16] merged master --- .../internal/managers/discovery/DiscoveryMessageFactory.java | 4 ++-- .../discovery/tcp/messages/TcpDiscoveryNodeLeftMessage.java | 2 +- .../org/apache/ignite/internal/IgniteClientRejoinTest.java | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java index 0650e2f8e6c76..1d0367c9a72b4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java @@ -21,6 +21,7 @@ import org.apache.ignite.internal.codegen.TcpDiscoveryClientPingRequestSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryClientPingResponseSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryLoopbackProblemMessageSerializer; +import org.apache.ignite.internal.codegen.TcpDiscoveryNodeLeftMessageSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryPingRequestSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryPingResponseSerializer; import org.apache.ignite.plugin.extensions.communication.MessageFactory; @@ -29,10 +30,9 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingRequest; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingResponse; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryLoopbackProblemMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeLeftMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingRequest; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingResponse; -import org.apache.ignite.internal.codegen.TcpDiscoveryNodeLeftMessageSerializer; -import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeLeftMessage; /** Message factory for discovery messages. */ public class DiscoveryMessageFactory implements MessageFactoryProvider { diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeLeftMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeLeftMessage.java index 350bb3eaa647b..ffc1180890928 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeLeftMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeLeftMessage.java @@ -53,6 +53,6 @@ public TcpDiscoveryNodeLeftMessage(UUID creatorNodeId) { /** */ @Override public short directType() { - return 5; + return 6; } } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientRejoinTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientRejoinTest.java index b29a8171fbce4..28f3d7255db02 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientRejoinTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientRejoinTest.java @@ -264,7 +264,7 @@ public void testClientsReconnect() throws Exception { * @throws Exception If failed. */ @Test - public void testClientsReconnectDisabled() throws Exception { + public void testClientsRecocnnectDisabled() throws Exception { clientReconnectDisabled = true; Ignite srv1 = startGrid("server1"); From 9ff2e0699dedc9eac0c44f2fea6ba6da4803dcb9 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Wed, 24 Dec 2025 19:06:46 +0300 Subject: [PATCH 05/16] + master --- .../cluster/NodeMetricsMessage.java | 104 +++++++++--------- .../ignite/spi/discovery/tcp/ServerImpl.java | 3 +- .../discovery/tcp/TcpDiscoveryIoSession.java | 39 ++++++- .../messages/TcpDiscoveryAbstractMessage.java | 6 +- ...cpDiscoveryClientMetricsUpdateMessage.java | 38 +++++-- ...pDiscoveryClusterMetricsHolderMessage.java | 49 +++++++++ .../messages/TcpDiscoveryNodeLeftMessage.java | 2 +- .../internal/IgniteClientRejoinTest.java | 2 +- 8 files changed, 171 insertions(+), 72 deletions(-) create mode 100644 modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClusterMetricsHolderMessage.java diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/NodeMetricsMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/NodeMetricsMessage.java index f11ea1d48c0ba..55b51cb041dba 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/NodeMetricsMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/NodeMetricsMessage.java @@ -137,20 +137,20 @@ public class NodeMetricsMessage implements Message { private long curIdleTime = -1; /** */ - @Order(value = 25, method = "totalCpus") - private int availProcs = -1; + @Order(value = 25) + private int totalCpus = -1; /** */ @Order(value = 26, method = "currentCpuLoad") - private double load = -1; + private double curCpuLoad = -1; /** */ @Order(value = 27, method = "averageCpuLoad") - private double avgLoad = -1; + private double avgCpuLoad = -1; /** */ @Order(value = 28, method = "currentGcCpuLoad") - private double gcLoad = -1; + private double curGcCpuLoad = -1; /** */ @Order(value = 29, method = "heapMemoryInitialized") @@ -173,15 +173,15 @@ public class NodeMetricsMessage implements Message { private long heapTotal = -1; /** */ - @Order(value = 34, method = "heapMemoryInitialized") + @Order(value = 34, method = "nonHeapMemoryInitialized") private long nonHeapInit = -1; /** */ - @Order(value = 35, method = "heapMemoryUsed") + @Order(value = 35, method = "nonHeapMemoryUsed") private long nonHeapUsed = -1; /** */ - @Order(value = 36, method = "heapMemoryCommitted") + @Order(value = 36, method = "nonHeapMemoryCommitted") private long nonHeapCommitted = -1; /** */ @@ -253,8 +253,8 @@ public class NodeMetricsMessage implements Message { private long totalJobsExecTime = -1; /** */ - @Order(value = 54) - private long currentPmeDuration = -1; + @Order(value = 54, method = "currentPmeDuration") + private long curPmeDuration = -1; /** */ public NodeMetricsMessage() { @@ -295,10 +295,10 @@ public NodeMetricsMessage(Collection nodes) { totalExecTasks = 0; totalIdleTime = 0; curIdleTime = 0; - availProcs = 0; - load = 0; - avgLoad = 0; - gcLoad = 0; + totalCpus = 0; + curCpuLoad = 0; + avgCpuLoad = 0; + curGcCpuLoad = 0; heapInit = 0; heapUsed = 0; heapCommitted = 0; @@ -323,7 +323,7 @@ public NodeMetricsMessage(Collection nodes) { outMesQueueSize = 0; heapTotal = 0; totalNodes = nodes.size(); - currentPmeDuration = 0; + curPmeDuration = 0; for (ClusterNode node : nodes) { ClusterMetrics m = node.metrics(); @@ -399,9 +399,9 @@ public NodeMetricsMessage(Collection nodes) { rcvdBytesCnt += m.getReceivedBytesCount(); outMesQueueSize += m.getOutboundMessagesQueueSize(); - avgLoad += m.getCurrentCpuLoad(); + avgCpuLoad += m.getCurrentCpuLoad(); - currentPmeDuration = max(currentPmeDuration, m.getCurrentPmeDuration()); + curPmeDuration = max(curPmeDuration, m.getCurrentPmeDuration()); } curJobExecTime /= size; @@ -412,7 +412,7 @@ public NodeMetricsMessage(Collection nodes) { avgWaitingJobs /= size; avgJobExecTime /= size; avgJobWaitTime /= size; - avgLoad /= size; + avgCpuLoad /= size; if (!F.isEmpty(nodes)) { ClusterMetrics oldestNodeMetrics = oldest(nodes).metrics(); @@ -423,9 +423,9 @@ public NodeMetricsMessage(Collection nodes) { Map> neighborhood = U.neighborhood(nodes); - gcLoad = gcCpus(neighborhood); - load = cpus(neighborhood); - availProcs = cpuCnt(neighborhood); + curGcCpuLoad = currentGcCpuLoad(neighborhood); + curCpuLoad = currentCpuLoad(neighborhood); + totalCpus = cpuCnt(neighborhood); } /** */ @@ -464,10 +464,10 @@ public NodeMetricsMessage(ClusterMetrics metrics) { curIdleTime = metrics.getCurrentIdleTime(); totalIdleTime = metrics.getTotalIdleTime(); - availProcs = metrics.getTotalCpus(); - load = metrics.getCurrentCpuLoad(); - avgLoad = metrics.getAverageCpuLoad(); - gcLoad = metrics.getCurrentGcCpuLoad(); + totalCpus = metrics.getTotalCpus(); + curCpuLoad = metrics.getCurrentCpuLoad(); + avgCpuLoad = metrics.getAverageCpuLoad(); + curGcCpuLoad = metrics.getCurrentGcCpuLoad(); heapInit = metrics.getHeapMemoryInitialized(); heapUsed = metrics.getHeapMemoryUsed(); @@ -487,7 +487,7 @@ public NodeMetricsMessage(ClusterMetrics metrics) { lastDataVer = metrics.getLastDataVersion(); - currentPmeDuration = metrics.getCurrentPmeDuration(); + curPmeDuration = metrics.getCurrentPmeDuration(); totalNodes = metrics.getTotalNodes(); @@ -885,22 +885,22 @@ public void currentIdleTime(long curIdleTime) { /** */ public int totalCpus() { - return availProcs; + return totalCpus; } /** */ public double currentCpuLoad() { - return load; + return curCpuLoad; } /** */ public double averageCpuLoad() { - return avgLoad; + return avgCpuLoad; } /** */ public double currentGcCpuLoad() { - return gcLoad; + return curGcCpuLoad; } /** */ @@ -1020,43 +1020,43 @@ public int totalNodes() { /** */ public long currentPmeDuration() { - return currentPmeDuration; + return curPmeDuration; } /** * Sets available processors. * - * @param availProcs Available processors. + * @param totalCpus Available processors. */ - public void totalCpus(int availProcs) { - this.availProcs = availProcs; + public void totalCpus(int totalCpus) { + this.totalCpus = totalCpus; } /** * Sets current CPU load. * - * @param load Current CPU load. + * @param curCpuLoad Current CPU load. */ - public void currentCpuLoad(double load) { - this.load = load; + public void currentCpuLoad(double curCpuLoad) { + this.curCpuLoad = curCpuLoad; } /** * Sets CPU load average over the metrics history. * - * @param avgLoad CPU load average. + * @param avgCpuLoad CPU load average. */ - public void averageCpuLoad(double avgLoad) { - this.avgLoad = avgLoad; + public void averageCpuLoad(double avgCpuLoad) { + this.avgCpuLoad = avgCpuLoad; } /** * Sets current GC load. * - * @param gcLoad Current GC load. + * @param curGcCpuLoad Current GC load. */ - public void currentGcCpuLoad(double gcLoad) { - this.gcLoad = gcLoad; + public void currentGcCpuLoad(double curGcCpuLoad) { + this.curGcCpuLoad = curGcCpuLoad; } /** @@ -1263,7 +1263,7 @@ public void totalNodes(int totalNodes) { * @param curPmeDuration Execution duration for current partition map exchange. */ public void currentPmeDuration(long curPmeDuration) { - this.currentPmeDuration = curPmeDuration; + this.curPmeDuration = curPmeDuration; } /** @@ -1308,36 +1308,36 @@ private static int cpuCnt(Map> neighborhood) { * @param neighborhood Cluster neighborhood. * @return CPU load. */ - private static int cpus(Map> neighborhood) { - int cpus = 0; + private static double currentCpuLoad(Map> neighborhood) { + double curCpuLoad = 0.0; for (Collection nodes : neighborhood.values()) { ClusterNode first = F.first(nodes); // Projection can be empty if all nodes in it failed. if (first != null) - cpus += first.metrics().getCurrentCpuLoad(); + curCpuLoad += first.metrics().getCurrentCpuLoad(); } - return cpus; + return curCpuLoad; } /** * @param neighborhood Cluster neighborhood. * @return GC CPU load. */ - private static int gcCpus(Map> neighborhood) { - int cpus = 0; + private static double currentGcCpuLoad(Map> neighborhood) { + double curGcCpuLoad = 0; for (Collection nodes : neighborhood.values()) { ClusterNode first = F.first(nodes); // Projection can be empty if all nodes in it failed. if (first != null) - cpus += first.metrics().getCurrentGcCpuLoad(); + curGcCpuLoad += first.metrics().getCurrentGcCpuLoad(); } - return cpus; + return curGcCpuLoad; } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 473ff1533578b..5e1b77fefb2e8 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -72,6 +72,7 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.NodeValidationFailedEvent; import org.apache.ignite.failure.FailureContext; +import org.apache.ignite.internal.ClusterMetricsSnapshot; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; import org.apache.ignite.internal.IgniteInterruptedCheckedException; @@ -7580,7 +7581,7 @@ private void processClientMetricsUpdateMessage(TcpDiscoveryClientMetricsUpdateMe ClientMessageWorker wrk = clientMsgWorkers.get(msg.creatorNodeId()); if (wrk != null) - wrk.metrics(msg.metrics()); + wrk.metrics(new ClusterMetricsSnapshot(msg.metricsMessage())); else if (log.isDebugEnabled()) log.debug("Received client metrics update message from unknown client node: " + msg); } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java index b64cea9133710..00f7f69b8d4da 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java @@ -170,8 +170,8 @@ T readMessage() throws IgniteCheckedException, IOException { if (MESSAGE_SERIALIZATION != serMode) { detectSslAlert(serMode, in); - // There are many `X.hasCause` in the discovery errors processing which change connection recovery processing. - // It is better to throw an IOException on reading failures. Often happens at nodes stop and streams closing. + // IOException type is important for ServerImpl for connection error processing behavior. + // It may search the cause (X.hasCause). throw new IOException("Received unexpected byte while reading discovery message: " + serMode); } @@ -187,19 +187,48 @@ T readMessage() throws IgniteCheckedException, IOException { boolean finished; + byte[] unprocessedBytes = null; + int unprocessedBytesLen = 0; + do { // Should be cleared before first operation. msgBuf.clear(); - int read = in.read(msgBuf.array(), 0, msgBuf.limit()); + if (unprocessedBytes != null) { + assert unprocessedBytesLen == unprocessedBytes.length; + + msgBuf.put(unprocessedBytes); + + unprocessedBytes = null; + } + + int read = in.read(msgBuf.array(), msgBuf.position(), msgBuf.remaining()); if (read == -1) throw new EOFException("Connection closed before message was fully read."); - msgBuf.limit(read); + if (unprocessedBytesLen > 0) { + msgBuf.rewind(); + + msgBuf.limit(read + unprocessedBytesLen); + + unprocessedBytesLen = 0; + } + else + msgBuf.limit(read); finished = msgSer.readFrom(msg, msgReader); - } while (!finished); + + // We must keep the uprocessed bytes read from the socket. It won't return them again. + if (!finished && msgBuf.position() > 0 && msgBuf.remaining() > 0) { + unprocessedBytes = new byte[msgBuf.remaining()]; + + unprocessedBytesLen = unprocessedBytes.length; + + msgBuf.get(unprocessedBytes, 0, msgBuf.remaining()); + } + } + while (!finished); return (T)msg; } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java index 809cf9bbd798f..17083499ec434 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java @@ -289,10 +289,8 @@ public void addFailedNode(UUID nodeId) { /** * @param failedNodes Failed nodes. */ - public void failedNodes(@Nullable Collection failedNodes) { - this.failedNodes = failedNodes == null - ? null - : failedNodes instanceof Set ? (Set)failedNodes : new HashSet<>(failedNodes); + public void failedNodes(@Nullable Set failedNodes) { + this.failedNodes = failedNodes; } /** diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientMetricsUpdateMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientMetricsUpdateMessage.java index 8092ef3a7255f..144ada143e9df 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientMetricsUpdateMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientMetricsUpdateMessage.java @@ -19,20 +19,28 @@ import java.util.UUID; import org.apache.ignite.cluster.ClusterMetrics; -import org.apache.ignite.internal.ClusterMetricsSnapshot; +import org.apache.ignite.internal.Order; +import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.Message; /** * Metrics update message. *

* Client sends his metrics in this message. */ -public class TcpDiscoveryClientMetricsUpdateMessage extends TcpDiscoveryAbstractMessage { +public class TcpDiscoveryClientMetricsUpdateMessage extends TcpDiscoveryAbstractMessage implements Message { /** */ private static final long serialVersionUID = 0L; /** */ - private final byte[] metrics; + @Order(value = 5, method = "metricsMessage") + private TcpDiscoveryClusterMetricsHolderMessage metricsMsg; + + /** Constructor for {@link DiscoveryMessageFactory}. */ + public TcpDiscoveryClientMetricsUpdateMessage() { + // No-op. + } /** * Constructor. @@ -43,16 +51,30 @@ public class TcpDiscoveryClientMetricsUpdateMessage extends TcpDiscoveryAbstract public TcpDiscoveryClientMetricsUpdateMessage(UUID creatorNodeId, ClusterMetrics metrics) { super(creatorNodeId); - this.metrics = ClusterMetricsSnapshot.serialize(metrics); + metricsMsg = new TcpDiscoveryClusterMetricsHolderMessage(metrics); + } + + /** + * Gets the metrics message. + * + * @return Metrics holder message. + */ + public TcpDiscoveryClusterMetricsHolderMessage metricsMessage() { + return metricsMsg; } /** - * Gets metrics map. + * Sets the metrics message. * - * @return Metrics map. + * @param metricsMsg Metrics holder message. */ - public ClusterMetrics metrics() { - return ClusterMetricsSnapshot.deserialize(metrics, 0); + public void metricsMessage(TcpDiscoveryClusterMetricsHolderMessage metricsMsg) { + this.metricsMsg = metricsMsg; + } + + /** {@inheritDoc} */ + @Override public short directType() { + return 11; } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClusterMetricsHolderMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClusterMetricsHolderMessage.java new file mode 100644 index 0000000000000..9252a4addbb8a --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClusterMetricsHolderMessage.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.tcp.messages; + +import org.apache.ignite.cluster.ClusterMetrics; +import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory; +import org.apache.ignite.internal.processors.cluster.NodeMetricsMessage; +import org.apache.ignite.internal.util.typedef.internal.S; + +/** + * Utility container message of the node metrics. Is not a pure {@link TcpDiscoveryAbstractMessage}. + * Reuses Communication's {@link NodeMetricsMessage}. + */ +public class TcpDiscoveryClusterMetricsHolderMessage extends NodeMetricsMessage { + /** Constructor for {@link DiscoveryMessageFactory}. */ + public TcpDiscoveryClusterMetricsHolderMessage() { + // No-op. + } + + /** @param metrics Metrics. */ + public TcpDiscoveryClusterMetricsHolderMessage(ClusterMetrics metrics) { + super(metrics); + } + + /** {@inheritDoc} */ + @Override public short directType() { + return -101; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(TcpDiscoveryClusterMetricsHolderMessage.class, this, "super", super.toString()); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeLeftMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeLeftMessage.java index ffc1180890928..aec40df3a900d 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeLeftMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeLeftMessage.java @@ -53,6 +53,6 @@ public TcpDiscoveryNodeLeftMessage(UUID creatorNodeId) { /** */ @Override public short directType() { - return 6; + return 11; } } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientRejoinTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientRejoinTest.java index 28f3d7255db02..b29a8171fbce4 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientRejoinTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientRejoinTest.java @@ -264,7 +264,7 @@ public void testClientsReconnect() throws Exception { * @throws Exception If failed. */ @Test - public void testClientsRecocnnectDisabled() throws Exception { + public void testClientsReconnectDisabled() throws Exception { clientReconnectDisabled = true; Ignite srv1 = startGrid("server1"); From e911cb1d28f06e4468157dfb1a019f09fc3f7a38 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Thu, 1 Jan 2026 16:42:27 +0300 Subject: [PATCH 06/16] +master --- .../discovery/DiscoveryMessageFactory.java | 3 - .../cluster/NodeMetricsMessage.java | 104 +++++++++--------- .../tracing/messages/SpanContainer.java | 2 +- .../ignite/spi/discovery/tcp/ServerImpl.java | 3 +- .../discovery/tcp/TcpDiscoveryIoSession.java | 55 ++++----- .../TcpDiscoveryAbstractTraceableMessage.java | 2 +- ...cpDiscoveryClientMetricsUpdateMessage.java | 38 ++----- .../messages/TcpDiscoveryNodeLeftMessage.java | 14 +-- 8 files changed, 86 insertions(+), 135 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java index cf2ea37fe71b1..1c01592fd0dd5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java @@ -29,7 +29,6 @@ import org.apache.ignite.internal.codegen.TcpDiscoveryHandshakeRequestSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryHandshakeResponseSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryLoopbackProblemMessageSerializer; -import org.apache.ignite.internal.codegen.TcpDiscoveryNodeLeftMessageSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryPingRequestSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryPingResponseSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryRingLatencyCheckMessageSerializer; @@ -47,7 +46,6 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHandshakeRequest; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHandshakeResponse; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryLoopbackProblemMessage; -import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeLeftMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingRequest; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingResponse; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryRingLatencyCheckMessage; @@ -72,6 +70,5 @@ public class DiscoveryMessageFactory implements MessageFactoryProvider { factory.register((short)10, TcpDiscoveryHandshakeResponse::new, new TcpDiscoveryHandshakeResponseSerializer()); factory.register((short)11, TcpDiscoveryAuthFailedMessage::new, new TcpDiscoveryAuthFailedMessageSerializer()); factory.register((short)12, TcpDiscoveryDuplicateIdMessage::new, new TcpDiscoveryDuplicateIdMessageSerializer()); - factory.register((short)11, TcpDiscoveryNodeLeftMessage::new, new TcpDiscoveryNodeLeftMessageSerializer()); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/NodeMetricsMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/NodeMetricsMessage.java index 55b51cb041dba..f11ea1d48c0ba 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/NodeMetricsMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/NodeMetricsMessage.java @@ -137,20 +137,20 @@ public class NodeMetricsMessage implements Message { private long curIdleTime = -1; /** */ - @Order(value = 25) - private int totalCpus = -1; + @Order(value = 25, method = "totalCpus") + private int availProcs = -1; /** */ @Order(value = 26, method = "currentCpuLoad") - private double curCpuLoad = -1; + private double load = -1; /** */ @Order(value = 27, method = "averageCpuLoad") - private double avgCpuLoad = -1; + private double avgLoad = -1; /** */ @Order(value = 28, method = "currentGcCpuLoad") - private double curGcCpuLoad = -1; + private double gcLoad = -1; /** */ @Order(value = 29, method = "heapMemoryInitialized") @@ -173,15 +173,15 @@ public class NodeMetricsMessage implements Message { private long heapTotal = -1; /** */ - @Order(value = 34, method = "nonHeapMemoryInitialized") + @Order(value = 34, method = "heapMemoryInitialized") private long nonHeapInit = -1; /** */ - @Order(value = 35, method = "nonHeapMemoryUsed") + @Order(value = 35, method = "heapMemoryUsed") private long nonHeapUsed = -1; /** */ - @Order(value = 36, method = "nonHeapMemoryCommitted") + @Order(value = 36, method = "heapMemoryCommitted") private long nonHeapCommitted = -1; /** */ @@ -253,8 +253,8 @@ public class NodeMetricsMessage implements Message { private long totalJobsExecTime = -1; /** */ - @Order(value = 54, method = "currentPmeDuration") - private long curPmeDuration = -1; + @Order(value = 54) + private long currentPmeDuration = -1; /** */ public NodeMetricsMessage() { @@ -295,10 +295,10 @@ public NodeMetricsMessage(Collection nodes) { totalExecTasks = 0; totalIdleTime = 0; curIdleTime = 0; - totalCpus = 0; - curCpuLoad = 0; - avgCpuLoad = 0; - curGcCpuLoad = 0; + availProcs = 0; + load = 0; + avgLoad = 0; + gcLoad = 0; heapInit = 0; heapUsed = 0; heapCommitted = 0; @@ -323,7 +323,7 @@ public NodeMetricsMessage(Collection nodes) { outMesQueueSize = 0; heapTotal = 0; totalNodes = nodes.size(); - curPmeDuration = 0; + currentPmeDuration = 0; for (ClusterNode node : nodes) { ClusterMetrics m = node.metrics(); @@ -399,9 +399,9 @@ public NodeMetricsMessage(Collection nodes) { rcvdBytesCnt += m.getReceivedBytesCount(); outMesQueueSize += m.getOutboundMessagesQueueSize(); - avgCpuLoad += m.getCurrentCpuLoad(); + avgLoad += m.getCurrentCpuLoad(); - curPmeDuration = max(curPmeDuration, m.getCurrentPmeDuration()); + currentPmeDuration = max(currentPmeDuration, m.getCurrentPmeDuration()); } curJobExecTime /= size; @@ -412,7 +412,7 @@ public NodeMetricsMessage(Collection nodes) { avgWaitingJobs /= size; avgJobExecTime /= size; avgJobWaitTime /= size; - avgCpuLoad /= size; + avgLoad /= size; if (!F.isEmpty(nodes)) { ClusterMetrics oldestNodeMetrics = oldest(nodes).metrics(); @@ -423,9 +423,9 @@ public NodeMetricsMessage(Collection nodes) { Map> neighborhood = U.neighborhood(nodes); - curGcCpuLoad = currentGcCpuLoad(neighborhood); - curCpuLoad = currentCpuLoad(neighborhood); - totalCpus = cpuCnt(neighborhood); + gcLoad = gcCpus(neighborhood); + load = cpus(neighborhood); + availProcs = cpuCnt(neighborhood); } /** */ @@ -464,10 +464,10 @@ public NodeMetricsMessage(ClusterMetrics metrics) { curIdleTime = metrics.getCurrentIdleTime(); totalIdleTime = metrics.getTotalIdleTime(); - totalCpus = metrics.getTotalCpus(); - curCpuLoad = metrics.getCurrentCpuLoad(); - avgCpuLoad = metrics.getAverageCpuLoad(); - curGcCpuLoad = metrics.getCurrentGcCpuLoad(); + availProcs = metrics.getTotalCpus(); + load = metrics.getCurrentCpuLoad(); + avgLoad = metrics.getAverageCpuLoad(); + gcLoad = metrics.getCurrentGcCpuLoad(); heapInit = metrics.getHeapMemoryInitialized(); heapUsed = metrics.getHeapMemoryUsed(); @@ -487,7 +487,7 @@ public NodeMetricsMessage(ClusterMetrics metrics) { lastDataVer = metrics.getLastDataVersion(); - curPmeDuration = metrics.getCurrentPmeDuration(); + currentPmeDuration = metrics.getCurrentPmeDuration(); totalNodes = metrics.getTotalNodes(); @@ -885,22 +885,22 @@ public void currentIdleTime(long curIdleTime) { /** */ public int totalCpus() { - return totalCpus; + return availProcs; } /** */ public double currentCpuLoad() { - return curCpuLoad; + return load; } /** */ public double averageCpuLoad() { - return avgCpuLoad; + return avgLoad; } /** */ public double currentGcCpuLoad() { - return curGcCpuLoad; + return gcLoad; } /** */ @@ -1020,43 +1020,43 @@ public int totalNodes() { /** */ public long currentPmeDuration() { - return curPmeDuration; + return currentPmeDuration; } /** * Sets available processors. * - * @param totalCpus Available processors. + * @param availProcs Available processors. */ - public void totalCpus(int totalCpus) { - this.totalCpus = totalCpus; + public void totalCpus(int availProcs) { + this.availProcs = availProcs; } /** * Sets current CPU load. * - * @param curCpuLoad Current CPU load. + * @param load Current CPU load. */ - public void currentCpuLoad(double curCpuLoad) { - this.curCpuLoad = curCpuLoad; + public void currentCpuLoad(double load) { + this.load = load; } /** * Sets CPU load average over the metrics history. * - * @param avgCpuLoad CPU load average. + * @param avgLoad CPU load average. */ - public void averageCpuLoad(double avgCpuLoad) { - this.avgCpuLoad = avgCpuLoad; + public void averageCpuLoad(double avgLoad) { + this.avgLoad = avgLoad; } /** * Sets current GC load. * - * @param curGcCpuLoad Current GC load. + * @param gcLoad Current GC load. */ - public void currentGcCpuLoad(double curGcCpuLoad) { - this.curGcCpuLoad = curGcCpuLoad; + public void currentGcCpuLoad(double gcLoad) { + this.gcLoad = gcLoad; } /** @@ -1263,7 +1263,7 @@ public void totalNodes(int totalNodes) { * @param curPmeDuration Execution duration for current partition map exchange. */ public void currentPmeDuration(long curPmeDuration) { - this.curPmeDuration = curPmeDuration; + this.currentPmeDuration = curPmeDuration; } /** @@ -1308,36 +1308,36 @@ private static int cpuCnt(Map> neighborhood) { * @param neighborhood Cluster neighborhood. * @return CPU load. */ - private static double currentCpuLoad(Map> neighborhood) { - double curCpuLoad = 0.0; + private static int cpus(Map> neighborhood) { + int cpus = 0; for (Collection nodes : neighborhood.values()) { ClusterNode first = F.first(nodes); // Projection can be empty if all nodes in it failed. if (first != null) - curCpuLoad += first.metrics().getCurrentCpuLoad(); + cpus += first.metrics().getCurrentCpuLoad(); } - return curCpuLoad; + return cpus; } /** * @param neighborhood Cluster neighborhood. * @return GC CPU load. */ - private static double currentGcCpuLoad(Map> neighborhood) { - double curGcCpuLoad = 0; + private static int gcCpus(Map> neighborhood) { + int cpus = 0; for (Collection nodes : neighborhood.values()) { ClusterNode first = F.first(nodes); // Projection can be empty if all nodes in it failed. if (first != null) - curGcCpuLoad += first.metrics().getCurrentGcCpuLoad(); + cpus += first.metrics().getCurrentGcCpuLoad(); } - return curGcCpuLoad; + return cpus; } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/tracing/messages/SpanContainer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/tracing/messages/SpanContainer.java index dc60d51f0d13f..c1f73f47d2cee 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/tracing/messages/SpanContainer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/tracing/messages/SpanContainer.java @@ -46,7 +46,7 @@ public byte[] serializedSpanBytes() { * @param serializedSpan Serialized span. */ public void serializedSpanBytes(byte[] serializedSpan) { - serializedSpanBytes = serializedSpan.clone(); + this.serializedSpanBytes = serializedSpan.clone(); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 44ca6f0093f7c..aef67a57809d8 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -72,7 +72,6 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.NodeValidationFailedEvent; import org.apache.ignite.failure.FailureContext; -import org.apache.ignite.internal.ClusterMetricsSnapshot; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; import org.apache.ignite.internal.IgniteInterruptedCheckedException; @@ -7581,7 +7580,7 @@ private void processClientMetricsUpdateMessage(TcpDiscoveryClientMetricsUpdateMe ClientMessageWorker wrk = clientMsgWorkers.get(msg.creatorNodeId()); if (wrk != null) - wrk.metrics(new ClusterMetricsSnapshot(msg.metricsMessage())); + wrk.metrics(msg.metrics()); else if (log.isDebugEnabled()) log.debug("Received client metrics update message from unknown client node: " + msg); } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java index 00f7f69b8d4da..92a20088bf4c5 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java @@ -170,15 +170,12 @@ T readMessage() throws IgniteCheckedException, IOException { if (MESSAGE_SERIALIZATION != serMode) { detectSslAlert(serMode, in); - // IOException type is important for ServerImpl for connection error processing behavior. - // It may search the cause (X.hasCause). + // IOException type is important for ServerImpl. It may search the cause (X.hasCause). + // The connection error processing behavior depends on it. throw new IOException("Received unexpected byte while reading discovery message: " + serMode); } - byte b0 = (byte)in.read(); - byte b1 = (byte)in.read(); - - Message msg = spi.messageFactory().create(makeMessageType(b0, b1)); + Message msg = spi.messageFactory().create(makeMessageType((byte)in.read(), (byte)in.read())); msgReader.reset(); msgReader.setBuffer(msgBuf); @@ -187,48 +184,40 @@ T readMessage() throws IgniteCheckedException, IOException { boolean finished; - byte[] unprocessedBytes = null; - int unprocessedBytesLen = 0; + msgBuf.clear(); do { - // Should be cleared before first operation. - msgBuf.clear(); - - if (unprocessedBytes != null) { - assert unprocessedBytesLen == unprocessedBytes.length; - - msgBuf.put(unprocessedBytes); - - unprocessedBytes = null; - } - int read = in.read(msgBuf.array(), msgBuf.position(), msgBuf.remaining()); if (read == -1) throw new EOFException("Connection closed before message was fully read."); - if (unprocessedBytesLen > 0) { - msgBuf.rewind(); + msgBuf.limit(msgBuf.position() > 0 ? msgBuf.position() + read + 1 : read); - msgBuf.limit(read + unprocessedBytesLen); + finished = msgSer.readFrom(msg, msgReader); - unprocessedBytesLen = 0; - } - else - msgBuf.limit(read); + // We rely on the fact that Discovery only sends next message upon receiving a receipt for the previous one. + // This behaviour guarantees that we never read a next message from the buffer right after the end of + // the previous message. + assert msgBuf.remaining() == 0 || !finished : "Some data was read from the socket but left unprocessed."; - finished = msgSer.readFrom(msg, msgReader); + if (finished) + break; // We must keep the uprocessed bytes read from the socket. It won't return them again. - if (!finished && msgBuf.position() > 0 && msgBuf.remaining() > 0) { - unprocessedBytes = new byte[msgBuf.remaining()]; - - unprocessedBytesLen = unprocessedBytes.length; + byte[] unprocessedTail = null; - msgBuf.get(unprocessedBytes, 0, msgBuf.remaining()); + if (msgBuf.remaining() > 0) { + unprocessedTail = new byte[msgBuf.remaining()]; + msgBuf.get(unprocessedTail, 0, msgBuf.remaining()); } + + msgBuf.clear(); + + if (unprocessedTail != null) + msgBuf.put(unprocessedTail); } - while (!finished); + while (true); return (T)msg; } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractTraceableMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractTraceableMessage.java index 3df159a48e806..c76fc8694d9b9 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractTraceableMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractTraceableMessage.java @@ -57,7 +57,7 @@ protected TcpDiscoveryAbstractTraceableMessage(UUID creatorNodeId) { protected TcpDiscoveryAbstractTraceableMessage(TcpDiscoveryAbstractTraceableMessage msg) { super(msg); - spanContainer = msg.spanContainer; + this.spanContainer = msg.spanContainer; } /** diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientMetricsUpdateMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientMetricsUpdateMessage.java index 144ada143e9df..8092ef3a7255f 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientMetricsUpdateMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientMetricsUpdateMessage.java @@ -19,28 +19,20 @@ import java.util.UUID; import org.apache.ignite.cluster.ClusterMetrics; -import org.apache.ignite.internal.Order; -import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory; +import org.apache.ignite.internal.ClusterMetricsSnapshot; import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.plugin.extensions.communication.Message; /** * Metrics update message. *

* Client sends his metrics in this message. */ -public class TcpDiscoveryClientMetricsUpdateMessage extends TcpDiscoveryAbstractMessage implements Message { +public class TcpDiscoveryClientMetricsUpdateMessage extends TcpDiscoveryAbstractMessage { /** */ private static final long serialVersionUID = 0L; /** */ - @Order(value = 5, method = "metricsMessage") - private TcpDiscoveryClusterMetricsHolderMessage metricsMsg; - - /** Constructor for {@link DiscoveryMessageFactory}. */ - public TcpDiscoveryClientMetricsUpdateMessage() { - // No-op. - } + private final byte[] metrics; /** * Constructor. @@ -51,30 +43,16 @@ public TcpDiscoveryClientMetricsUpdateMessage() { public TcpDiscoveryClientMetricsUpdateMessage(UUID creatorNodeId, ClusterMetrics metrics) { super(creatorNodeId); - metricsMsg = new TcpDiscoveryClusterMetricsHolderMessage(metrics); - } - - /** - * Gets the metrics message. - * - * @return Metrics holder message. - */ - public TcpDiscoveryClusterMetricsHolderMessage metricsMessage() { - return metricsMsg; + this.metrics = ClusterMetricsSnapshot.serialize(metrics); } /** - * Sets the metrics message. + * Gets metrics map. * - * @param metricsMsg Metrics holder message. + * @return Metrics map. */ - public void metricsMessage(TcpDiscoveryClusterMetricsHolderMessage metricsMsg) { - this.metricsMsg = metricsMsg; - } - - /** {@inheritDoc} */ - @Override public short directType() { - return 11; + public ClusterMetrics metrics() { + return ClusterMetricsSnapshot.deserialize(metrics, 0); } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeLeftMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeLeftMessage.java index aec40df3a900d..5d6df69b715b7 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeLeftMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeLeftMessage.java @@ -18,9 +18,7 @@ package org.apache.ignite.spi.discovery.tcp.messages; import java.util.UUID; -import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory; import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.plugin.extensions.communication.Message; /** * Sent by node that is stopping to coordinator across the ring, @@ -28,15 +26,10 @@ */ @TcpDiscoveryEnsureDelivery @TcpDiscoveryRedirectToClient -public class TcpDiscoveryNodeLeftMessage extends TcpDiscoveryAbstractTraceableMessage implements Message { +public class TcpDiscoveryNodeLeftMessage extends TcpDiscoveryAbstractTraceableMessage { /** */ private static final long serialVersionUID = 0L; - /** Default constructor for {@link DiscoveryMessageFactory}. */ - public TcpDiscoveryNodeLeftMessage() { - // No-op. - } - /** * Constructor. * @@ -50,9 +43,4 @@ public TcpDiscoveryNodeLeftMessage(UUID creatorNodeId) { @Override public String toString() { return S.toString(TcpDiscoveryNodeLeftMessage.class, this, "super", super.toString()); } - - /** */ - @Override public short directType() { - return 11; - } } From c86614457cf213b1dc75622e237a4c78cebf68d7 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Sat, 3 Jan 2026 02:01:12 +0300 Subject: [PATCH 07/16] message reading fix --- .../ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java index 92a20088bf4c5..edcab473b4c91 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java @@ -192,7 +192,14 @@ T readMessage() throws IgniteCheckedException, IOException { if (read == -1) throw new EOFException("Connection closed before message was fully read."); - msgBuf.limit(msgBuf.position() > 0 ? msgBuf.position() + read + 1 : read); + if (msgBuf.position() > 0) { + msgBuf.limit(msgBuf.position() + read); + + // We've stored an unprocessed tail before. + msgBuf.rewind(); + } + else + msgBuf.limit(read); finished = msgSer.readFrom(msg, msgReader); From fe4210b416cc0713344851b7a19987001f37654c Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Sat, 3 Jan 2026 02:19:05 +0300 Subject: [PATCH 08/16] + TcpDiscoveryNodeLeftMessage --- .../discovery/DiscoveryMessageFactory.java | 3 +++ .../tcp/messages/TcpDiscoveryNodeLeftMessage.java | 14 +++++++++++++- 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java index 1c01592fd0dd5..3a2844af1ac57 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java @@ -29,6 +29,7 @@ import org.apache.ignite.internal.codegen.TcpDiscoveryHandshakeRequestSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryHandshakeResponseSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryLoopbackProblemMessageSerializer; +import org.apache.ignite.internal.codegen.TcpDiscoveryNodeLeftMessageSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryPingRequestSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryPingResponseSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryRingLatencyCheckMessageSerializer; @@ -46,6 +47,7 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHandshakeRequest; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHandshakeResponse; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryLoopbackProblemMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeLeftMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingRequest; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingResponse; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryRingLatencyCheckMessage; @@ -70,5 +72,6 @@ public class DiscoveryMessageFactory implements MessageFactoryProvider { factory.register((short)10, TcpDiscoveryHandshakeResponse::new, new TcpDiscoveryHandshakeResponseSerializer()); factory.register((short)11, TcpDiscoveryAuthFailedMessage::new, new TcpDiscoveryAuthFailedMessageSerializer()); factory.register((short)12, TcpDiscoveryDuplicateIdMessage::new, new TcpDiscoveryDuplicateIdMessageSerializer()); + factory.register((short)13, TcpDiscoveryNodeLeftMessage::new, new TcpDiscoveryNodeLeftMessageSerializer()); } } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeLeftMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeLeftMessage.java index 5d6df69b715b7..b6ec6ff137d01 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeLeftMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeLeftMessage.java @@ -18,7 +18,9 @@ package org.apache.ignite.spi.discovery.tcp.messages; import java.util.UUID; +import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.Message; /** * Sent by node that is stopping to coordinator across the ring, @@ -26,10 +28,15 @@ */ @TcpDiscoveryEnsureDelivery @TcpDiscoveryRedirectToClient -public class TcpDiscoveryNodeLeftMessage extends TcpDiscoveryAbstractTraceableMessage { +public class TcpDiscoveryNodeLeftMessage extends TcpDiscoveryAbstractTraceableMessage implements Message { /** */ private static final long serialVersionUID = 0L; + /** Constructor for {@link DiscoveryMessageFactory}. */ + public TcpDiscoveryNodeLeftMessage() { + // No-op. + } + /** * Constructor. * @@ -39,6 +46,11 @@ public TcpDiscoveryNodeLeftMessage(UUID creatorNodeId) { super(creatorNodeId); } + /** {@inheritDoc} */ + @Override public short directType() { + return 13; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(TcpDiscoveryNodeLeftMessage.class, this, "super", super.toString()); From 2e3791d95b3a551ae5cbe0cd3cb7fcc748c238b5 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Fri, 9 Jan 2026 20:20:23 +0300 Subject: [PATCH 09/16] fix --- .../org/apache/ignite/spi/discovery/tcp/ServerImpl.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index aef67a57809d8..75a68d64f85df 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -3379,10 +3379,11 @@ private void sendMessageToClients(TcpDiscoveryAbstractMessage msg) { for (ClientMessageWorker clientMsgWorker : clientMsgWorkers.values()) { if (msgBytes == null) { try { - msgBytes = U.marshal(spi.marshaller(), msg); + msgBytes = clientMsgWorker.ses.serializeMessage(msg); } - catch (IgniteCheckedException e) { - U.error(log, "Failed to marshal message: " + msg, e); + catch (IgniteCheckedException | IOException e) { + U.error(log, "Failed to serialize message to a client: " + msg + ". Client id: " + + clientMsgWorker.clientNodeId, e); break; } From 31332433b3e4647c4346ef818b054bb634c0fa15 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Tue, 13 Jan 2026 14:08:34 +0300 Subject: [PATCH 10/16] + master. + buffer --- .../discovery/DiscoveryMessageFactory.java | 3 + .../ignite/spi/discovery/tcp/ServerImpl.java | 2 +- .../discovery/tcp/TcpDiscoveryIoSession.java | 172 +++++++++++++++--- .../messages/TcpDiscoveryNodeLeftMessage.java | 2 +- 4 files changed, 156 insertions(+), 23 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java index 5f399ed0c46fd..85db770983f97 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java @@ -30,6 +30,7 @@ import org.apache.ignite.internal.codegen.TcpDiscoveryHandshakeRequestSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryHandshakeResponseSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryLoopbackProblemMessageSerializer; +import org.apache.ignite.internal.codegen.TcpDiscoveryNodeLeftMessageSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryNodeMetricsMessageSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryPingRequestSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryPingResponseSerializer; @@ -49,6 +50,7 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHandshakeRequest; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHandshakeResponse; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryLoopbackProblemMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeLeftMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeMetricsMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingRequest; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingResponse; @@ -76,5 +78,6 @@ public class DiscoveryMessageFactory implements MessageFactoryProvider { factory.register((short)11, TcpDiscoveryAuthFailedMessage::new, new TcpDiscoveryAuthFailedMessageSerializer()); factory.register((short)12, TcpDiscoveryDuplicateIdMessage::new, new TcpDiscoveryDuplicateIdMessageSerializer()); factory.register((short)13, TcpDiscoveryClientMetricsUpdateMessage::new, new TcpDiscoveryClientMetricsUpdateMessageSerializer()); + factory.register((short)14, TcpDiscoveryNodeLeftMessage::new, new TcpDiscoveryNodeLeftMessageSerializer()); } } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index c19865d6e26dd..66fea323f0919 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -3383,7 +3383,7 @@ private void sendMessageToClients(TcpDiscoveryAbstractMessage msg) { msgBytes = clientMsgWorker.ses.serializeMessage(msg); } catch (IgniteCheckedException | IOException e) { - U.error(log, "Failed to serialize message to a client: " + msg + ". Client id: " + U.error(log, "Failed to serialize message to a client: " + msg + ", client id: " + clientMsgWorker.clientNodeId, e); break; diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java index edcab473b4c91..c95e4b09b8a5d 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java @@ -39,6 +39,7 @@ import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.plugin.extensions.communication.MessageSerializer; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; +import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.makeMessageType; @@ -92,6 +93,9 @@ public class TcpDiscoveryIoSession { /** Intermediate buffer for serializing discovery messages. */ private final ByteBuffer msgBuf; + /** */ + private byte[] unprocessedReadTail; + /** * Creates a new discovery I/O session bound to the given socket. * @@ -161,10 +165,27 @@ void writeMessage(TcpDiscoveryAbstractMessage msg) throws IgniteCheckedException * @throws IgniteCheckedException If deserialization fails. */ T readMessage() throws IgniteCheckedException, IOException { - byte serMode = (byte)in.read(); + byte serMode; + InputStream in; + + if (unprocessedReadTail == null) { + in = this.in; + + serMode = (byte)in.read(); + } + else { + serMode = unprocessedReadTail[0]; - if (JAVA_SERIALIZATION == serMode) - return U.unmarshal(spi.marshaller(), in, clsLdr); + in = new PrefixedBufferedInputStream(unprocessedReadTail, 1, this.in); + + unprocessedReadTail = null; + } + + if (JAVA_SERIALIZATION == serMode) { + T m = U.unmarshal(spi.marshaller(), in, clsLdr); + + return m; + } try { if (MESSAGE_SERIALIZATION != serMode) { @@ -175,6 +196,8 @@ T readMessage() throws IgniteCheckedException, IOException { throw new IOException("Received unexpected byte while reading discovery message: " + serMode); } + msgBuf.clear(); + Message msg = spi.messageFactory().create(makeMessageType((byte)in.read(), (byte)in.read())); msgReader.reset(); @@ -184,9 +207,13 @@ T readMessage() throws IgniteCheckedException, IOException { boolean finished; - msgBuf.clear(); - do { + if (unprocessedReadTail != null) { + msgBuf.put(unprocessedReadTail); + + unprocessedReadTail = null; + } + int read = in.read(msgBuf.array(), msgBuf.position(), msgBuf.remaining()); if (read == -1) @@ -203,26 +230,20 @@ T readMessage() throws IgniteCheckedException, IOException { finished = msgSer.readFrom(msg, msgReader); - // We rely on the fact that Discovery only sends next message upon receiving a receipt for the previous one. + // Server Discovery only sends next message to next Server upon receiving a receipt for the previous one. // This behaviour guarantees that we never read a next message from the buffer right after the end of - // the previous message. - assert msgBuf.remaining() == 0 || !finished : "Some data was read from the socket but left unprocessed."; - - if (finished) - break; - - // We must keep the uprocessed bytes read from the socket. It won't return them again. - byte[] unprocessedTail = null; - + // the previous message. But it is not guaranteed for clients where messages aren't acknowledged. + // Thus, we have to keep the uprocessed bytes read from the socket. It won't return them again. if (msgBuf.remaining() > 0) { - unprocessedTail = new byte[msgBuf.remaining()]; - msgBuf.get(unprocessedTail, 0, msgBuf.remaining()); - } + unprocessedReadTail = new byte[msgBuf.remaining()]; - msgBuf.clear(); + msgBuf.get(unprocessedReadTail, 0, msgBuf.remaining()); + } - if (unprocessedTail != null) - msgBuf.put(unprocessedTail); + if (finished) + break; + else + msgBuf.clear(); } while (true); @@ -320,4 +341,113 @@ private void detectSslAlert(byte firstByte, InputStream in) throws IOException { if (hex.matches("15....00")) throw new StreamCorruptedException("invalid stream header: " + hex); } + + /** */ + private static class PrefixedBufferedInputStream extends BufferedInputStream { + /** */ + private int prefixLeft; + + /** */ + private final byte[] prefixBuf; + + /** */ + private PrefixedBufferedInputStream(byte[] prefix, int offset, InputStream in) { + super(in); + + this.prefixBuf = prefix; + prefixLeft = prefix.length - offset; + } + + /** {@inheritDoc} */ + @Override public synchronized int read() throws IOException { + if (prefixLeft > 0) { + --prefixLeft; + + return prefixBuf[prefixBuf.length - prefixLeft + 1]; + } + + return super.read(); + } + + /** {@inheritDoc} */ + @Override public synchronized int read(@NotNull byte[] b, int off, int len) throws IOException { + int len0 = 0; + + if (len > b.length - off) + len = b.length - off; + + if (prefixLeft > 0) { + len0 = Math.min(len, prefixLeft); + + System.arraycopy(prefixBuf, prefixBuf.length - prefixLeft, b, off, len0); + + prefixLeft -= len0; + + assert prefixLeft >= 0; + + if (len0 == len) + return len0; + } + + return len0 + super.read(b, off + len0, len - len0); + } + + /** {@inheritDoc} */ + @Override public int read(@NotNull byte[] b) throws IOException { + return read(b, 0, b.length); + } + + /** {@inheritDoc} */ + @Override public synchronized int available() throws IOException { + return super.available() + prefixLeft; + } + + /** {@inheritDoc} */ + @Override public synchronized void mark(int readlimit) { + throw new UnsupportedOperationException("mark() is not supported."); + } + + /** {@inheritDoc} */ + @Override public boolean markSupported() { + return false; + } + + /** {@inheritDoc} */ + @Override public synchronized void reset() throws IOException { + throw new UnsupportedOperationException("reset() is not supported."); + } + + /** {@inheritDoc} */ + @Override public synchronized long skip(long n) throws IOException { + throw new UnsupportedOperationException("skip() is not supported."); + } + + /** {@inheritDoc} */ + @Override public long transferTo(OutputStream out) throws IOException { + throw new UnsupportedOperationException("transferTo() is not supported."); + } + + /** {@inheritDoc} */ + @Override public int readNBytes(byte[] b, int off, int len) throws IOException { + throw new UnsupportedOperationException("readNBytes() is not supported."); + } + + /** {@inheritDoc} */ + @Override public @NotNull byte[] readAllBytes() throws IOException { + throw new UnsupportedOperationException("readAllBytes() is not supported."); + } + + /** {@inheritDoc} */ + @Override public @NotNull byte[] readNBytes(int len) throws IOException { + throw new UnsupportedOperationException("readNBytes() is not supported."); + } + + /** {@inheritDoc} */ + @Override public void close() throws IOException { + prefixLeft = 0; + + super.close(); + } + } } + diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeLeftMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeLeftMessage.java index b6ec6ff137d01..543a06fe4dc7f 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeLeftMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeLeftMessage.java @@ -48,7 +48,7 @@ public TcpDiscoveryNodeLeftMessage(UUID creatorNodeId) { /** {@inheritDoc} */ @Override public short directType() { - return 13; + return 14; } /** {@inheritDoc} */ From 556fb21ca8a9245f517ea73b789281f9adfeb6f4 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Tue, 13 Jan 2026 18:33:55 +0300 Subject: [PATCH 11/16] separated buffers. Combined burffer read --- .../discovery/tcp/TcpDiscoveryIoSession.java | 172 ++++++++++-------- 1 file changed, 97 insertions(+), 75 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java index c95e4b09b8a5d..66b4f514188ba 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java @@ -19,6 +19,7 @@ import java.io.BufferedInputStream; import java.io.BufferedOutputStream; +import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.EOFException; import java.io.IOException; @@ -88,13 +89,13 @@ public class TcpDiscoveryIoSession { private final OutputStream out; /** Buffered socket input stream. */ - private final InputStream in; + private final PrefixedBufferedInputStream in; - /** Intermediate buffer for serializing discovery messages. */ - private final ByteBuffer msgBuf; + /** Intermediate buffer for deserializing discovery messages. */ + private final ByteBuffer msgInBuf; - /** */ - private byte[] unprocessedReadTail; + /** Intermediate buffer for serializing discovery messages. */ + private final ByteBuffer msgOutBuf; /** * Creates a new discovery I/O session bound to the given socket. @@ -109,7 +110,8 @@ public class TcpDiscoveryIoSession { clsLdr = U.resolveClassLoader(spi.ignite().configuration()); - msgBuf = ByteBuffer.allocate(MSG_BUFFER_SIZE); + msgInBuf = ByteBuffer.allocate(MSG_BUFFER_SIZE); + msgOutBuf = ByteBuffer.allocate(MSG_BUFFER_SIZE); msgWriter = new DirectMessageWriter(spi.messageFactory()); msgReader = new DirectMessageReader(spi.messageFactory(), null); @@ -119,7 +121,7 @@ public class TcpDiscoveryIoSession { int rcvBufSize = sock.getReceiveBufferSize() > 0 ? sock.getReceiveBufferSize() : DFLT_SOCK_BUFFER_SIZE; out = new BufferedOutputStream(sock.getOutputStream(), sendBufSize); - in = new BufferedInputStream(sock.getInputStream(), rcvBufSize); + in = new PrefixedBufferedInputStream(new BufferedInputStream(sock.getInputStream(), rcvBufSize)); } catch (IOException e) { throw new IgniteException(e); @@ -165,27 +167,12 @@ void writeMessage(TcpDiscoveryAbstractMessage msg) throws IgniteCheckedException * @throws IgniteCheckedException If deserialization fails. */ T readMessage() throws IgniteCheckedException, IOException { - byte serMode; - InputStream in; + ByteBuffer msgBuf = msgInBuf; - if (unprocessedReadTail == null) { - in = this.in; + byte serMode = (byte)in.read(); - serMode = (byte)in.read(); - } - else { - serMode = unprocessedReadTail[0]; - - in = new PrefixedBufferedInputStream(unprocessedReadTail, 1, this.in); - - unprocessedReadTail = null; - } - - if (JAVA_SERIALIZATION == serMode) { - T m = U.unmarshal(spi.marshaller(), in, clsLdr); - - return m; - } + if (JAVA_SERIALIZATION == serMode) + return U.unmarshal(spi.marshaller(), in, clsLdr); try { if (MESSAGE_SERIALIZATION != serMode) { @@ -196,8 +183,6 @@ T readMessage() throws IgniteCheckedException, IOException { throw new IOException("Received unexpected byte while reading discovery message: " + serMode); } - msgBuf.clear(); - Message msg = spi.messageFactory().create(makeMessageType((byte)in.read(), (byte)in.read())); msgReader.reset(); @@ -207,43 +192,34 @@ T readMessage() throws IgniteCheckedException, IOException { boolean finished; - do { - if (unprocessedReadTail != null) { - msgBuf.put(unprocessedReadTail); - - unprocessedReadTail = null; - } + msgBuf.clear(); + do { int read = in.read(msgBuf.array(), msgBuf.position(), msgBuf.remaining()); if (read == -1) throw new EOFException("Connection closed before message was fully read."); - if (msgBuf.position() > 0) { - msgBuf.limit(msgBuf.position() + read); - - // We've stored an unprocessed tail before. - msgBuf.rewind(); - } - else - msgBuf.limit(read); + msgBuf.limit(read); finished = msgSer.readFrom(msg, msgReader); // Server Discovery only sends next message to next Server upon receiving a receipt for the previous one. // This behaviour guarantees that we never read a next message from the buffer right after the end of - // the previous message. But it is not guaranteed for clients where messages aren't acknowledged. + // the previous message. But it is not guaranteed with Client Discovery where messages aren't acknowledged. // Thus, we have to keep the uprocessed bytes read from the socket. It won't return them again. if (msgBuf.remaining() > 0) { - unprocessedReadTail = new byte[msgBuf.remaining()]; + byte[] unprocessedReadTail = new byte[msgBuf.remaining()]; msgBuf.get(unprocessedReadTail, 0, msgBuf.remaining()); + + in.acceptPrefixBuffer(unprocessedReadTail); } if (finished) break; - else - msgBuf.clear(); + + msgBuf.clear(); } while (true); @@ -305,6 +281,8 @@ public Socket socket() { * @throws IOException If serialization fails. */ private void serializeMessage(Message m, OutputStream out) throws IOException { + ByteBuffer msgBuf = msgOutBuf; + MessageSerializer msgSer = spi.messageFactory().serializer(m.directType()); msgWriter.reset(); @@ -345,51 +323,89 @@ private void detectSslAlert(byte firstByte, InputStream in) throws IOException { /** */ private static class PrefixedBufferedInputStream extends BufferedInputStream { /** */ - private int prefixLeft; + private ByteArrayInputStream bais; /** */ - private final byte[] prefixBuf; + private PrefixedBufferedInputStream(InputStream in) { + super(in); + } /** */ - private PrefixedBufferedInputStream(byte[] prefix, int offset, InputStream in) { - super(in); + public void acceptPrefixBuffer(byte[] prefixBuf) { + assert prefixBytesLeft() == 0; - this.prefixBuf = prefix; - prefixLeft = prefix.length - offset; + bais = new ByteArrayInputStream(prefixBuf); } /** {@inheritDoc} */ @Override public synchronized int read() throws IOException { - if (prefixLeft > 0) { - --prefixLeft; + if (prefixBytesLeft() > 0) { + int res = bais.read(); + + checkPrefixBufferExhausted(); - return prefixBuf[prefixBuf.length - prefixLeft + 1]; + return res; } return super.read(); } - /** {@inheritDoc} */ - @Override public synchronized int read(@NotNull byte[] b, int off, int len) throws IOException { - int len0 = 0; + /** */ + private int prefixBytesLeft() { + return bais == null ? 0 : bais.available(); + //return prefixBuf.length - prefOffset; + } + + /** */ + private void checkPrefixBufferExhausted() { + if (bais != null && bais.available() == 0) + bais = null; + } - if (len > b.length - off) - len = b.length - off; + /** {@inheritDoc} */ + @Override public int read(@NotNull byte[] b, int off, int len) throws IOException { + int len0 = readPrefixBuffer(b, off, len); + + if (len0 == len) + return len0; + +// int len0 = 0; +// +// if (len > b.length - off) +// len = b.length - off; +// +// if (prefLeft() > 0) { +// len0 = Math.min(len, prefLeft()); +// +// System.arraycopy(prefixBuf, prefOffset, b, off, len0); +// +// prefOffset += len0; +// +// assert prefLeft() >= 0; +// +// if (len0 == len) +// return len0; +// } +// + return len0 + super.read(b, off + len0, len - len0); + } - if (prefixLeft > 0) { - len0 = Math.min(len, prefixLeft); + /** */ + private int readPrefixBuffer(byte[] b, int off, int len) { + int res = 0; - System.arraycopy(prefixBuf, prefixBuf.length - prefixLeft, b, off, len0); + int prefixBytesLeft = prefixBytesLeft(); - prefixLeft -= len0; + if (prefixBytesLeft > 0) { + if (len > b.length - off) + len = b.length - off; - assert prefixLeft >= 0; + res = bais.read(b, off, Math.min(len, prefixBytesLeft)); - if (len0 == len) - return len0; + checkPrefixBufferExhausted(); } - return len0 + super.read(b, off + len0, len - len0); + return res; } /** {@inheritDoc} */ @@ -398,12 +414,12 @@ private PrefixedBufferedInputStream(byte[] prefix, int offset, InputStream in) { } /** {@inheritDoc} */ - @Override public synchronized int available() throws IOException { - return super.available() + prefixLeft; + @Override public int available() throws IOException { + return super.available() + prefixBytesLeft(); } /** {@inheritDoc} */ - @Override public synchronized void mark(int readlimit) { + @Override public void mark(int readlimit) { throw new UnsupportedOperationException("mark() is not supported."); } @@ -413,12 +429,12 @@ private PrefixedBufferedInputStream(byte[] prefix, int offset, InputStream in) { } /** {@inheritDoc} */ - @Override public synchronized void reset() throws IOException { + @Override public void reset() throws IOException { throw new UnsupportedOperationException("reset() is not supported."); } /** {@inheritDoc} */ - @Override public synchronized long skip(long n) throws IOException { + @Override public long skip(long n) throws IOException { throw new UnsupportedOperationException("skip() is not supported."); } @@ -429,7 +445,9 @@ private PrefixedBufferedInputStream(byte[] prefix, int offset, InputStream in) { /** {@inheritDoc} */ @Override public int readNBytes(byte[] b, int off, int len) throws IOException { - throw new UnsupportedOperationException("readNBytes() is not supported."); + int len0 = readPrefixBuffer(b, off, len); + + return super.readNBytes(b, off + len0, len - len0); } /** {@inheritDoc} */ @@ -444,7 +462,11 @@ private PrefixedBufferedInputStream(byte[] prefix, int offset, InputStream in) { /** {@inheritDoc} */ @Override public void close() throws IOException { - prefixLeft = 0; + if (bais != null) { + bais.close(); + + bais = null; + } super.close(); } From 26a9ab68c198f07ab3a89b967b765fbb07239c3e Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Tue, 13 Jan 2026 18:51:26 +0300 Subject: [PATCH 12/16] fix --- .../discovery/tcp/TcpDiscoveryIoSession.java | 33 ++----------------- 1 file changed, 3 insertions(+), 30 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java index 66b4f514188ba..0f47bfcaa8021 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java @@ -91,11 +91,8 @@ public class TcpDiscoveryIoSession { /** Buffered socket input stream. */ private final PrefixedBufferedInputStream in; - /** Intermediate buffer for deserializing discovery messages. */ - private final ByteBuffer msgInBuf; - /** Intermediate buffer for serializing discovery messages. */ - private final ByteBuffer msgOutBuf; + private final ByteBuffer msgBuf; /** * Creates a new discovery I/O session bound to the given socket. @@ -110,8 +107,7 @@ public class TcpDiscoveryIoSession { clsLdr = U.resolveClassLoader(spi.ignite().configuration()); - msgInBuf = ByteBuffer.allocate(MSG_BUFFER_SIZE); - msgOutBuf = ByteBuffer.allocate(MSG_BUFFER_SIZE); + msgBuf = ByteBuffer.allocate(MSG_BUFFER_SIZE); msgWriter = new DirectMessageWriter(spi.messageFactory()); msgReader = new DirectMessageReader(spi.messageFactory(), null); @@ -167,8 +163,6 @@ void writeMessage(TcpDiscoveryAbstractMessage msg) throws IgniteCheckedException * @throws IgniteCheckedException If deserialization fails. */ T readMessage() throws IgniteCheckedException, IOException { - ByteBuffer msgBuf = msgInBuf; - byte serMode = (byte)in.read(); if (JAVA_SERIALIZATION == serMode) @@ -281,8 +275,6 @@ public Socket socket() { * @throws IOException If serialization fails. */ private void serializeMessage(Message m, OutputStream out) throws IOException { - ByteBuffer msgBuf = msgOutBuf; - MessageSerializer msgSer = spi.messageFactory().serializer(m.directType()); msgWriter.reset(); @@ -338,7 +330,7 @@ public void acceptPrefixBuffer(byte[] prefixBuf) { } /** {@inheritDoc} */ - @Override public synchronized int read() throws IOException { + @Override public int read() throws IOException { if (prefixBytesLeft() > 0) { int res = bais.read(); @@ -353,7 +345,6 @@ public void acceptPrefixBuffer(byte[] prefixBuf) { /** */ private int prefixBytesLeft() { return bais == null ? 0 : bais.available(); - //return prefixBuf.length - prefOffset; } /** */ @@ -369,24 +360,6 @@ private void checkPrefixBufferExhausted() { if (len0 == len) return len0; -// int len0 = 0; -// -// if (len > b.length - off) -// len = b.length - off; -// -// if (prefLeft() > 0) { -// len0 = Math.min(len, prefLeft()); -// -// System.arraycopy(prefixBuf, prefOffset, b, off, len0); -// -// prefOffset += len0; -// -// assert prefLeft() >= 0; -// -// if (len0 == len) -// return len0; -// } -// return len0 + super.read(b, off + len0, len - len0); } From 4961ad617fbd9b2efb8c64db7cb349d8f2575234 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Wed, 14 Jan 2026 11:43:00 +0300 Subject: [PATCH 13/16] +TODO --- .../java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java | 1 + 1 file changed, 1 insertion(+) diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 66fea323f0919..b61ea8fecebcf 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -3400,6 +3400,7 @@ private void sendMessageToClients(TcpDiscoveryAbstractMessage msg) { if (clientMsgWorker.clientNodeId.equals(node.id())) { try { + // TODO: https://issues.apache.org/jira/browse/IGNITE-27556 refactor serialization. msg0 = U.unmarshal(spi.marshaller(), msgBytes, U.resolveClassLoader(spi.ignite().configuration())); From e266a332fd7081baf444eae77b880a378a42cef8 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Fri, 16 Jan 2026 10:34:07 +0300 Subject: [PATCH 14/16] review fixes --- .../discovery/tcp/TcpDiscoveryIoSession.java | 124 +++++++++--------- 1 file changed, 64 insertions(+), 60 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java index 0f47bfcaa8021..4dc254204d41e 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java @@ -186,9 +186,9 @@ T readMessage() throws IgniteCheckedException, IOException { boolean finished; - msgBuf.clear(); - do { + msgBuf.clear(); + int read = in.read(msgBuf.array(), msgBuf.position(), msgBuf.remaining()); if (read == -1) @@ -202,20 +202,15 @@ T readMessage() throws IgniteCheckedException, IOException { // This behaviour guarantees that we never read a next message from the buffer right after the end of // the previous message. But it is not guaranteed with Client Discovery where messages aren't acknowledged. // Thus, we have to keep the uprocessed bytes read from the socket. It won't return them again. - if (msgBuf.remaining() > 0) { + if (msgBuf.hasRemaining()) { byte[] unprocessedReadTail = new byte[msgBuf.remaining()]; msgBuf.get(unprocessedReadTail, 0, msgBuf.remaining()); in.acceptPrefixBuffer(unprocessedReadTail); } - - if (finished) - break; - - msgBuf.clear(); } - while (true); + while (!finished); return (T)msg; } @@ -312,27 +307,30 @@ private void detectSslAlert(byte firstByte, InputStream in) throws IOException { throw new StreamCorruptedException("invalid stream header: " + hex); } - /** */ + /** + * Input stream allowing to read some data first as a prefix of an original input stream. + * Supports only basic read methods. + */ private static class PrefixedBufferedInputStream extends BufferedInputStream { - /** */ - private ByteArrayInputStream bais; + /** Prefix data input stream to read before the original input stream. */ + @Nullable private ByteArrayInputStream prefixIs; - /** */ - private PrefixedBufferedInputStream(InputStream in) { - super(in); + /** @param srcIs Original input stream to read when {@link #prefixIs} is empty. */ + private PrefixedBufferedInputStream(InputStream srcIs) { + super(srcIs); } - /** */ - public void acceptPrefixBuffer(byte[] prefixBuf) { + /** @param prefixData Prefix data to read before the original input stream. */ + private void acceptPrefixBuffer(byte[] prefixData) { assert prefixBytesLeft() == 0; - bais = new ByteArrayInputStream(prefixBuf); + prefixIs = new ByteArrayInputStream(prefixData); } /** {@inheritDoc} */ @Override public int read() throws IOException { if (prefixBytesLeft() > 0) { - int res = bais.read(); + int res = prefixIs.read(); checkPrefixBufferExhausted(); @@ -342,27 +340,50 @@ public void acceptPrefixBuffer(byte[] prefixBuf) { return super.read(); } - /** */ - private int prefixBytesLeft() { - return bais == null ? 0 : bais.available(); - } - - /** */ - private void checkPrefixBufferExhausted() { - if (bais != null && bais.available() == 0) - bais = null; - } - /** {@inheritDoc} */ @Override public int read(@NotNull byte[] b, int off, int len) throws IOException { int len0 = readPrefixBuffer(b, off, len); + assert len0 <= len; + if (len0 == len) return len0; return len0 + super.read(b, off + len0, len - len0); } + /** {@inheritDoc} */ + @Override public int read(@NotNull byte[] b) throws IOException { + return read(b, 0, b.length); + } + + /** {@inheritDoc} */ + @Override public int readNBytes(byte[] b, int off, int len) throws IOException { + int len0 = readPrefixBuffer(b, off, len); + + return super.readNBytes(b, off + len0, len - len0); + } + + /** {@inheritDoc} */ + @Override public int available() throws IOException { + // Original input stream may return Integer#MAX_VALUE. + if (super.available() > Integer.MAX_VALUE - prefixBytesLeft()) + return super.available(); + + return super.available() + prefixBytesLeft(); + } + + /** {@inheritDoc} */ + @Override public void close() throws IOException { + if (prefixIs != null) { + prefixIs.close(); + + prefixIs = null; + } + + super.close(); + } + /** */ private int readPrefixBuffer(byte[] b, int off, int len) { int res = 0; @@ -373,7 +394,7 @@ private int readPrefixBuffer(byte[] b, int off, int len) { if (len > b.length - off) len = b.length - off; - res = bais.read(b, off, Math.min(len, prefixBytesLeft)); + res = prefixIs.read(b, off, Math.min(len, prefixBytesLeft)); checkPrefixBufferExhausted(); } @@ -381,14 +402,15 @@ private int readPrefixBuffer(byte[] b, int off, int len) { return res; } - /** {@inheritDoc} */ - @Override public int read(@NotNull byte[] b) throws IOException { - return read(b, 0, b.length); + /** */ + private int prefixBytesLeft() { + return prefixIs == null ? 0 : prefixIs.available(); } - /** {@inheritDoc} */ - @Override public int available() throws IOException { - return super.available() + prefixBytesLeft(); + /** */ + private void checkPrefixBufferExhausted() { + if (prefixIs != null && prefixIs.available() == 0) + prefixIs = null; } /** {@inheritDoc} */ @@ -402,47 +424,29 @@ private int readPrefixBuffer(byte[] b, int off, int len) { } /** {@inheritDoc} */ - @Override public void reset() throws IOException { + @Override public void reset() { throw new UnsupportedOperationException("reset() is not supported."); } /** {@inheritDoc} */ - @Override public long skip(long n) throws IOException { + @Override public long skip(long n) { throw new UnsupportedOperationException("skip() is not supported."); } /** {@inheritDoc} */ - @Override public long transferTo(OutputStream out) throws IOException { + @Override public long transferTo(OutputStream out) { throw new UnsupportedOperationException("transferTo() is not supported."); } /** {@inheritDoc} */ - @Override public int readNBytes(byte[] b, int off, int len) throws IOException { - int len0 = readPrefixBuffer(b, off, len); - - return super.readNBytes(b, off + len0, len - len0); - } - - /** {@inheritDoc} */ - @Override public @NotNull byte[] readAllBytes() throws IOException { + @Override public @NotNull byte[] readAllBytes() { throw new UnsupportedOperationException("readAllBytes() is not supported."); } /** {@inheritDoc} */ - @Override public @NotNull byte[] readNBytes(int len) throws IOException { + @Override public @NotNull byte[] readNBytes(int len) { throw new UnsupportedOperationException("readNBytes() is not supported."); } - - /** {@inheritDoc} */ - @Override public void close() throws IOException { - if (bais != null) { - bais.close(); - - bais = null; - } - - super.close(); - } } } From b0b88c99272270dedf6c9db3708d2305db99abf6 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Thu, 22 Jan 2026 11:13:30 +0300 Subject: [PATCH 15/16] checkstyle --- .../internal/managers/discovery/DiscoveryMessageFactory.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java index fb19d1adbc34d..c1421ae064f06 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java @@ -60,12 +60,11 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryLoopbackProblemMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryMetricsUpdateMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFullMetricsMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeLeftMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeMetricsMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingRequest; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingResponse; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryRingLatencyCheckMessage; -import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeLeftMessage; -import org.apache.ignite.internal.codegen.TcpDiscoveryNodeLeftMessageSerializer; /** Message factory for discovery messages. */ public class DiscoveryMessageFactory implements MessageFactoryProvider { From 18e477f4f08c950bcdf0b38f8ebd2922044fcc8f Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Thu, 22 Jan 2026 11:28:51 +0300 Subject: [PATCH 16/16] review fixes --- .../tcp/messages/TcpDiscoveryAbstractTraceableMessage.java | 1 + .../spi/discovery/tcp/messages/TcpDiscoveryNodeLeftMessage.java | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractTraceableMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractTraceableMessage.java index c76fc8694d9b9..effc8bb2006b8 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractTraceableMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractTraceableMessage.java @@ -32,6 +32,7 @@ public abstract class TcpDiscoveryAbstractTraceableMessage extends TcpDiscoveryA private SpanContainer spanContainer = new SpanContainer(); /** Serialization holder of {@link #spanContainer}'s bytes. */ + @SuppressWarnings("unused") @Order(value = 5, method = "spanBytes") private @Nullable byte[] spanBytesHolder; diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeLeftMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeLeftMessage.java index 543a06fe4dc7f..8e70196ab051b 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeLeftMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeLeftMessage.java @@ -48,7 +48,7 @@ public TcpDiscoveryNodeLeftMessage(UUID creatorNodeId) { /** {@inheritDoc} */ @Override public short directType() { - return 14; + return 16; } /** {@inheritDoc} */