Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@
import org.apache.ignite.internal.codegen.NodeIdMessageSerializer;
import org.apache.ignite.internal.codegen.NodeMetricsMessageSerializer;
import org.apache.ignite.internal.codegen.PartitionReservationsMapSerializer;
import org.apache.ignite.internal.codegen.PartitionUpdateCountersMessageSerializer;
import org.apache.ignite.internal.codegen.PartitionsToReloadSerializer;
import org.apache.ignite.internal.codegen.RecoveryLastReceivedMessageSerializer;
import org.apache.ignite.internal.codegen.SchemaOperationStatusMessageSerializer;
Expand Down Expand Up @@ -464,7 +465,7 @@ public class GridIoMessageFactory implements MessageFactoryProvider {
factory.register(CacheMetricsMessage.TYPE_CODE, CacheMetricsMessage::new, new CacheMetricsMessageSerializer());
factory.register(NodeMetricsMessage.TYPE_CODE, NodeMetricsMessage::new, new NodeMetricsMessageSerializer());
factory.register(NodeFullMetricsMessage.TYPE_CODE, NodeFullMetricsMessage::new, new NodeFullMetricsMessageSerializer());
factory.register((short)157, PartitionUpdateCountersMessage::new);
factory.register((short)157, PartitionUpdateCountersMessage::new, new PartitionUpdateCountersMessageSerializer());
factory.register((short)162, GenerateEncryptionKeyRequest::new, new GenerateEncryptionKeyRequestSerializer());
factory.register((short)163, GenerateEncryptionKeyResponse::new);
factory.register((short)167, ServiceDeploymentProcessId::new, new ServiceDeploymentProcessIdSerializer());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,37 +17,32 @@

package org.apache.ignite.internal.processors.cache.distributed.dht;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Map;
import org.apache.ignite.internal.GridDirectTransient;
import org.apache.ignite.internal.IgniteCodeGeneratingFail;
import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;

/**
* Partition update counters message.
*/
@IgniteCodeGeneratingFail
public class PartitionUpdateCountersMessage implements Message {
/** */
private static final int ITEM_SIZE = 4 /* partition */ + 8 /* initial counter */ + 8 /* updates count */;

/** */
private byte data[];
/** Byte representation of partition counters. */
@Order(0)
private byte[] data;

/** */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/** */
/** Byte representation of partition counters. */

@Order(1)
private int cacheId;

/** */
@GridDirectTransient
private int size;

/** Used for assigning counters to cache entries during tx finish. */
@GridDirectTransient
private Map<Integer, Long> counters;

/** */
Expand All @@ -66,13 +61,35 @@ public PartitionUpdateCountersMessage(int cacheId, int initSize) {
data = new byte[initSize * ITEM_SIZE];
}

/**
* @return Data.
*/
public byte[] data() {
return Arrays.copyOf(data, size * ITEM_SIZE);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Possbile NPE here.
Also, we should ensure that it could not lead to a potential performance drop.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@chesnokoff , see an example:

Suggested change
return Arrays.copyOf(data, size * ITEM_SIZE);
return F.isEmpty(data) ? null : Arrays.copyOf(data, size * ITEM_SIZE);

}

/**
* @param data New data.
*/
public void data(byte[] data) {
this.data = data;
size = data == null ? 0 : data.length / ITEM_SIZE;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IgniteCachePutGetRestartAbstractTest#testTxPutGetRestart fails without null check

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MessageReader#readByteArray returns null when it couldn't fully extract the entire payload from the buffer because the complete byte array didn't fit into the read buffer.

Previously, we were setting the size after successfully reading the whole byte array using the reader.

In my opinion, the current logic seems fine.

}

/**
* @return Cache id.
*/
public int cacheId() {
return cacheId;
}

/**
* @param cacheId New cache id.
*/
public void cacheId(int cacheId) {
this.cacheId = cacheId;
}

/**
* @return Size.
*/
Expand Down Expand Up @@ -152,13 +169,6 @@ public Long nextCounter(int partId) {
return counters.computeIfPresent(partId, (key, cntr) -> cntr + 1);
}

/**
* Clears message.
*/
public void clear() {
size = 0;
}

/**
* Check if there is enough space is allocated.
*
Expand All @@ -171,63 +181,6 @@ private void ensureSpace(int newSize) {
data = Arrays.copyOf(data, data.length << 1);
}

/** {@inheritDoc} */
@Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
writer.setBuffer(buf);

if (!writer.isHeaderWritten()) {
if (!writer.writeHeader(directType()))
return false;

writer.onHeaderWritten();
}

switch (writer.state()) {
case 0:
if (!writer.writeInt(cacheId))
return false;

writer.incrementState();

case 1:
if (!writer.writeByteArray(data, 0, size * ITEM_SIZE))
return false;

writer.incrementState();

}

return true;
}

/** {@inheritDoc} */
@Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
reader.setBuffer(buf);

switch (reader.state()) {
case 0:
cacheId = reader.readInt();

if (!reader.isLastRead())
return false;

reader.incrementState();

case 1:
data = reader.readByteArray();

if (!reader.isLastRead())
return false;

size = data.length / ITEM_SIZE;

reader.incrementState();

}

return true;
}

/** {@inheritDoc} */
@Override public short directType() {
return 157;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.util.UUID;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.ignite.internal.processors.cache.distributed.dht.PartitionUpdateCountersMessage;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
Expand All @@ -38,6 +39,9 @@ public class IgniteIoCommunicationMessageSerializationTest extends AbstractMessa
if (msg instanceof NodeIdMessage)
FieldUtils.writeField(msg, "nodeId", UUID.randomUUID(), true);

if (msg instanceof PartitionUpdateCountersMessage)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this change? NPE should be properly fixed in a message.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggested removing the null check from the message and adding it to the test.
See #12402 (comment)
But the final decision is up to you.

Copy link
Contributor

@shishkovilja shishkovilja Jan 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggested removing the null check from the message and adding it to the test. See #12402 (comment) But the final decision is up to you.

We can safely send null for empty or null array. BUT, as I see now, message will be in incorrect state if data is null.

So, you are right. Let's keep it.

FieldUtils.writeField(msg, "data", new byte[0], true);

return msg;
}

Expand Down