diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index 6dc2d02dd0af5..83d8712dea7a4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -150,6 +150,7 @@ import org.apache.ignite.internal.codegen.NodeIdMessageSerializer; import org.apache.ignite.internal.codegen.NodeMetricsMessageSerializer; import org.apache.ignite.internal.codegen.PartitionReservationsMapSerializer; +import org.apache.ignite.internal.codegen.PartitionUpdateCountersMessageSerializer; import org.apache.ignite.internal.codegen.PartitionsToReloadSerializer; import org.apache.ignite.internal.codegen.RecoveryLastReceivedMessageSerializer; import org.apache.ignite.internal.codegen.SchemaOperationStatusMessageSerializer; @@ -464,7 +465,7 @@ public class GridIoMessageFactory implements MessageFactoryProvider { factory.register(CacheMetricsMessage.TYPE_CODE, CacheMetricsMessage::new, new CacheMetricsMessageSerializer()); factory.register(NodeMetricsMessage.TYPE_CODE, NodeMetricsMessage::new, new NodeMetricsMessageSerializer()); factory.register(NodeFullMetricsMessage.TYPE_CODE, NodeFullMetricsMessage::new, new NodeFullMetricsMessageSerializer()); - factory.register((short)157, PartitionUpdateCountersMessage::new); + factory.register((short)157, PartitionUpdateCountersMessage::new, new PartitionUpdateCountersMessageSerializer()); factory.register((short)162, GenerateEncryptionKeyRequest::new, new GenerateEncryptionKeyRequestSerializer()); factory.register((short)163, GenerateEncryptionKeyResponse::new); factory.register((short)167, ServiceDeploymentProcessId::new, new ServiceDeploymentProcessIdSerializer()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/PartitionUpdateCountersMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/PartitionUpdateCountersMessage.java index 8911359032ff3..bf36020f1ee8b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/PartitionUpdateCountersMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/PartitionUpdateCountersMessage.java @@ -17,37 +17,32 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; -import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Map; -import org.apache.ignite.internal.GridDirectTransient; -import org.apache.ignite.internal.IgniteCodeGeneratingFail; +import org.apache.ignite.internal.Order; import org.apache.ignite.internal.util.GridUnsafe; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.plugin.extensions.communication.Message; -import org.apache.ignite.plugin.extensions.communication.MessageReader; -import org.apache.ignite.plugin.extensions.communication.MessageWriter; /** * Partition update counters message. */ -@IgniteCodeGeneratingFail public class PartitionUpdateCountersMessage implements Message { /** */ private static final int ITEM_SIZE = 4 /* partition */ + 8 /* initial counter */ + 8 /* updates count */; - /** */ - private byte data[]; + /** Byte representation of partition counters. */ + @Order(0) + private byte[] data; /** */ + @Order(1) private int cacheId; /** */ - @GridDirectTransient private int size; /** Used for assigning counters to cache entries during tx finish. */ - @GridDirectTransient private Map counters; /** */ @@ -66,6 +61,21 @@ public PartitionUpdateCountersMessage(int cacheId, int initSize) { data = new byte[initSize * ITEM_SIZE]; } + /** + * @return Data. + */ + public byte[] data() { + return Arrays.copyOf(data, size * ITEM_SIZE); + } + + /** + * @param data New data. + */ + public void data(byte[] data) { + this.data = data; + size = data == null ? 0 : data.length / ITEM_SIZE; + } + /** * @return Cache id. */ @@ -73,6 +83,13 @@ public int cacheId() { return cacheId; } + /** + * @param cacheId New cache id. + */ + public void cacheId(int cacheId) { + this.cacheId = cacheId; + } + /** * @return Size. */ @@ -152,13 +169,6 @@ public Long nextCounter(int partId) { return counters.computeIfPresent(partId, (key, cntr) -> cntr + 1); } - /** - * Clears message. - */ - public void clear() { - size = 0; - } - /** * Check if there is enough space is allocated. * @@ -171,63 +181,6 @@ private void ensureSpace(int newSize) { data = Arrays.copyOf(data, data.length << 1); } - /** {@inheritDoc} */ - @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { - writer.setBuffer(buf); - - if (!writer.isHeaderWritten()) { - if (!writer.writeHeader(directType())) - return false; - - writer.onHeaderWritten(); - } - - switch (writer.state()) { - case 0: - if (!writer.writeInt(cacheId)) - return false; - - writer.incrementState(); - - case 1: - if (!writer.writeByteArray(data, 0, size * ITEM_SIZE)) - return false; - - writer.incrementState(); - - } - - return true; - } - - /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { - reader.setBuffer(buf); - - switch (reader.state()) { - case 0: - cacheId = reader.readInt(); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 1: - data = reader.readByteArray(); - - if (!reader.isLastRead()) - return false; - - size = data.length / ITEM_SIZE; - - reader.incrementState(); - - } - - return true; - } - /** {@inheritDoc} */ @Override public short directType() { return 157; diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteIoCommunicationMessageSerializationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteIoCommunicationMessageSerializationTest.java index c5d6afb5bc63c..a05bb4e9ac693 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteIoCommunicationMessageSerializationTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteIoCommunicationMessageSerializationTest.java @@ -19,6 +19,7 @@ import java.util.UUID; import org.apache.commons.lang3.reflect.FieldUtils; +import org.apache.ignite.internal.processors.cache.distributed.dht.PartitionUpdateCountersMessage; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider; @@ -38,6 +39,9 @@ public class IgniteIoCommunicationMessageSerializationTest extends AbstractMessa if (msg instanceof NodeIdMessage) FieldUtils.writeField(msg, "nodeId", UUID.randomUUID(), true); + if (msg instanceof PartitionUpdateCountersMessage) + FieldUtils.writeField(msg, "data", new byte[0], true); + return msg; }