From 709026ea424c7c8589f275e5551953d7ec6c8844 Mon Sep 17 00:00:00 2001 From: chesnokoff Date: Thu, 15 Jan 2026 01:13:34 +0300 Subject: [PATCH 1/5] IGNITE-26646 Use MessageSerializer for PartitionUpdateCountersMessage --- .../communication/GridIoMessageFactory.java | 3 +- .../dht/PartitionUpdateCountersMessage.java | 101 +++++------------- 2 files changed, 29 insertions(+), 75 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index 6dc2d02dd0af5..83d8712dea7a4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -150,6 +150,7 @@ import org.apache.ignite.internal.codegen.NodeIdMessageSerializer; import org.apache.ignite.internal.codegen.NodeMetricsMessageSerializer; import org.apache.ignite.internal.codegen.PartitionReservationsMapSerializer; +import org.apache.ignite.internal.codegen.PartitionUpdateCountersMessageSerializer; import org.apache.ignite.internal.codegen.PartitionsToReloadSerializer; import org.apache.ignite.internal.codegen.RecoveryLastReceivedMessageSerializer; import org.apache.ignite.internal.codegen.SchemaOperationStatusMessageSerializer; @@ -464,7 +465,7 @@ public class GridIoMessageFactory implements MessageFactoryProvider { factory.register(CacheMetricsMessage.TYPE_CODE, CacheMetricsMessage::new, new CacheMetricsMessageSerializer()); factory.register(NodeMetricsMessage.TYPE_CODE, NodeMetricsMessage::new, new NodeMetricsMessageSerializer()); factory.register(NodeFullMetricsMessage.TYPE_CODE, NodeFullMetricsMessage::new, new NodeFullMetricsMessageSerializer()); - factory.register((short)157, PartitionUpdateCountersMessage::new); + factory.register((short)157, PartitionUpdateCountersMessage::new, new PartitionUpdateCountersMessageSerializer()); factory.register((short)162, GenerateEncryptionKeyRequest::new, new GenerateEncryptionKeyRequestSerializer()); factory.register((short)163, GenerateEncryptionKeyResponse::new); factory.register((short)167, ServiceDeploymentProcessId::new, new ServiceDeploymentProcessIdSerializer()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/PartitionUpdateCountersMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/PartitionUpdateCountersMessage.java index 8911359032ff3..8dd8a44b796c0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/PartitionUpdateCountersMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/PartitionUpdateCountersMessage.java @@ -17,37 +17,32 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; -import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Map; -import org.apache.ignite.internal.GridDirectTransient; -import org.apache.ignite.internal.IgniteCodeGeneratingFail; +import org.apache.ignite.internal.Order; import org.apache.ignite.internal.util.GridUnsafe; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.plugin.extensions.communication.Message; -import org.apache.ignite.plugin.extensions.communication.MessageReader; -import org.apache.ignite.plugin.extensions.communication.MessageWriter; /** * Partition update counters message. */ -@IgniteCodeGeneratingFail public class PartitionUpdateCountersMessage implements Message { /** */ private static final int ITEM_SIZE = 4 /* partition */ + 8 /* initial counter */ + 8 /* updates count */; /** */ - private byte data[]; + @Order(0) + private int cacheId; /** */ - private int cacheId; + @Order(value = 1, method = "payload") + private byte[] data; /** */ - @GridDirectTransient private int size; /** Used for assigning counters to cache entries during tx finish. */ - @GridDirectTransient private Map counters; /** */ @@ -66,6 +61,21 @@ public PartitionUpdateCountersMessage(int cacheId, int initSize) { data = new byte[initSize * ITEM_SIZE]; } + /** + * @return Payload. + */ + public byte[] payload() { + return Arrays.copyOf(data, size * ITEM_SIZE); + } + + /** + * @param payload New payload. + */ + public void payload(byte[] payload) { + this.data = payload; + this.size = this.data.length / ITEM_SIZE; + } + /** * @return Cache id. */ @@ -73,6 +83,13 @@ public int cacheId() { return cacheId; } + /** + * @param cacheId New cache id. + */ + public void cacheId(int cacheId) { + this.cacheId = cacheId; + } + /** * @return Size. */ @@ -152,13 +169,6 @@ public Long nextCounter(int partId) { return counters.computeIfPresent(partId, (key, cntr) -> cntr + 1); } - /** - * Clears message. - */ - public void clear() { - size = 0; - } - /** * Check if there is enough space is allocated. * @@ -171,63 +181,6 @@ private void ensureSpace(int newSize) { data = Arrays.copyOf(data, data.length << 1); } - /** {@inheritDoc} */ - @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { - writer.setBuffer(buf); - - if (!writer.isHeaderWritten()) { - if (!writer.writeHeader(directType())) - return false; - - writer.onHeaderWritten(); - } - - switch (writer.state()) { - case 0: - if (!writer.writeInt(cacheId)) - return false; - - writer.incrementState(); - - case 1: - if (!writer.writeByteArray(data, 0, size * ITEM_SIZE)) - return false; - - writer.incrementState(); - - } - - return true; - } - - /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { - reader.setBuffer(buf); - - switch (reader.state()) { - case 0: - cacheId = reader.readInt(); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 1: - data = reader.readByteArray(); - - if (!reader.isLastRead()) - return false; - - size = data.length / ITEM_SIZE; - - reader.incrementState(); - - } - - return true; - } - /** {@inheritDoc} */ @Override public short directType() { return 157; From 5d3d229347d10cc0fffb6ea6e615a7073e162fc2 Mon Sep 17 00:00:00 2001 From: chesnokoff Date: Fri, 16 Jan 2026 00:51:13 +0300 Subject: [PATCH 2/5] IGNITE-26646 Fix org.apache.ignite.internal.managers.communication.IgniteIoCommunicationMessageSerializationTest#testMessageSerializationAndDeserializationConsistency --- .../cache/distributed/dht/PartitionUpdateCountersMessage.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/PartitionUpdateCountersMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/PartitionUpdateCountersMessage.java index 8dd8a44b796c0..684e895a15d67 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/PartitionUpdateCountersMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/PartitionUpdateCountersMessage.java @@ -65,6 +65,9 @@ public PartitionUpdateCountersMessage(int cacheId, int initSize) { * @return Payload. */ public byte[] payload() { + if (data == null) + return new byte[0]; + return Arrays.copyOf(data, size * ITEM_SIZE); } From b535028ba8635d42f47e8dd4126620d84d52af63 Mon Sep 17 00:00:00 2001 From: chesnokoff Date: Fri, 16 Jan 2026 13:36:54 +0300 Subject: [PATCH 3/5] IGNITE-26646 Add fix for IgniteCachePutGetRestartAbstractTest#testTxPutGetRestart --- .../cache/distributed/dht/PartitionUpdateCountersMessage.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/PartitionUpdateCountersMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/PartitionUpdateCountersMessage.java index 684e895a15d67..4ff9fb02631b0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/PartitionUpdateCountersMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/PartitionUpdateCountersMessage.java @@ -75,8 +75,8 @@ public byte[] payload() { * @param payload New payload. */ public void payload(byte[] payload) { - this.data = payload; - this.size = this.data.length / ITEM_SIZE; + data = payload; + size = data == null ? 0 : data.length / ITEM_SIZE; } /** From f43390123d776092aedc51d163126b4ae263e9de Mon Sep 17 00:00:00 2001 From: chesnokoff Date: Mon, 19 Jan 2026 12:34:47 +0300 Subject: [PATCH 4/5] IGNITE-26646 Fix IgniteIoCommunicationMessageSerializationTest#testMessageSerializationAndDeserializationConsistency --- .../cache/distributed/dht/PartitionUpdateCountersMessage.java | 3 --- .../IgniteIoCommunicationMessageSerializationTest.java | 4 ++++ 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/PartitionUpdateCountersMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/PartitionUpdateCountersMessage.java index 4ff9fb02631b0..08391d0fb19d0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/PartitionUpdateCountersMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/PartitionUpdateCountersMessage.java @@ -65,9 +65,6 @@ public PartitionUpdateCountersMessage(int cacheId, int initSize) { * @return Payload. */ public byte[] payload() { - if (data == null) - return new byte[0]; - return Arrays.copyOf(data, size * ITEM_SIZE); } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteIoCommunicationMessageSerializationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteIoCommunicationMessageSerializationTest.java index c5d6afb5bc63c..a05bb4e9ac693 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteIoCommunicationMessageSerializationTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteIoCommunicationMessageSerializationTest.java @@ -19,6 +19,7 @@ import java.util.UUID; import org.apache.commons.lang3.reflect.FieldUtils; +import org.apache.ignite.internal.processors.cache.distributed.dht.PartitionUpdateCountersMessage; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider; @@ -38,6 +39,9 @@ public class IgniteIoCommunicationMessageSerializationTest extends AbstractMessa if (msg instanceof NodeIdMessage) FieldUtils.writeField(msg, "nodeId", UUID.randomUUID(), true); + if (msg instanceof PartitionUpdateCountersMessage) + FieldUtils.writeField(msg, "data", new byte[0], true); + return msg; } From 97c0519eefefa72a3e3e3984443b94dafe7d39e2 Mon Sep 17 00:00:00 2001 From: chesnokoff Date: Mon, 19 Jan 2026 21:59:12 +0300 Subject: [PATCH 5/5] IGNITE-26646 Fix comments --- .../dht/PartitionUpdateCountersMessage.java | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/PartitionUpdateCountersMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/PartitionUpdateCountersMessage.java index 08391d0fb19d0..bf36020f1ee8b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/PartitionUpdateCountersMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/PartitionUpdateCountersMessage.java @@ -31,13 +31,13 @@ public class PartitionUpdateCountersMessage implements Message { /** */ private static final int ITEM_SIZE = 4 /* partition */ + 8 /* initial counter */ + 8 /* updates count */; - /** */ + /** Byte representation of partition counters. */ @Order(0) - private int cacheId; + private byte[] data; /** */ - @Order(value = 1, method = "payload") - private byte[] data; + @Order(1) + private int cacheId; /** */ private int size; @@ -62,17 +62,17 @@ public PartitionUpdateCountersMessage(int cacheId, int initSize) { } /** - * @return Payload. + * @return Data. */ - public byte[] payload() { + public byte[] data() { return Arrays.copyOf(data, size * ITEM_SIZE); } /** - * @param payload New payload. + * @param data New data. */ - public void payload(byte[] payload) { - data = payload; + public void data(byte[] data) { + this.data = data; size = data == null ? 0 : data.length / ITEM_SIZE; }