Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7649,6 +7649,16 @@ void metrics(ClusterMetrics metrics) {
* @param msg Message.
*/
void addMessage(TcpDiscoveryAbstractMessage msg) {
try {
ses.serializeMessage(msg);
}
catch (IgniteCheckedException | IOException e) {
if (log.isDebugEnabled())
U.error(log, "Serialization failed [msg=" + msg + ']', e);

onException("Serialization failed [msg=" + msg + ']', e);
}

if (msg.highPriority())
queue.addFirst(msg);
else
Expand All @@ -7667,8 +7677,6 @@ void addMessage(TcpDiscoveryAbstractMessage msg) {
try {
assert msg.verified() : msg;

byte[] msgBytes = ses.serializeMessage(msg);

DebugLogger msgLog = messageLogger(msg);

if (msg instanceof TcpDiscoveryClientAckResponse) {
Expand All @@ -7690,7 +7698,7 @@ else if (msgLog.isDebugEnabled()) {
+ getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']');
}

spi.writeToSocket(sock, msg, msgBytes, spi.failureDetectionTimeoutEnabled() ?
spi.writeToSocket(sock, msg, msg.serializedData(), spi.failureDetectionTimeoutEnabled() ?
spi.clientFailureDetectionTimeout() : spi.getSocketTimeout());
}
}
Expand All @@ -7702,7 +7710,7 @@ else if (msgLog.isDebugEnabled()) {

assert topologyInitialized(msg) : msg;

spi.writeToSocket(sock, msg, msgBytes, spi.getEffectiveSocketTimeout(false));
spi.writeToSocket(sock, msg, msg.serializedData(), spi.getEffectiveSocketTimeout(false));
}

boolean clientFailed = msg instanceof TcpDiscoveryNodeFailedMessage &&
Expand All @@ -7712,7 +7720,7 @@ else if (msgLog.isDebugEnabled()) {

success = !clientFailed;
}
catch (IgniteCheckedException | IOException e) {
catch (IOException e) {
if (log.isDebugEnabled())
U.error(log, "Client connection failed [sock=" + sock + ", locNodeId="
+ getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']', e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,19 +242,21 @@ <T> T readMessage() throws IgniteCheckedException, IOException {
* Serializes a discovery message into a byte array.
*
* @param msg Discovery message to serialize.
* @return Serialized byte array containing the message data.
* @throws IgniteCheckedException If serialization fails.
* @throws IOException If serialization fails.
*/
byte[] serializeMessage(TcpDiscoveryAbstractMessage msg) throws IgniteCheckedException, IOException {
if (!(msg instanceof Message))
return U.marshal(spi.marshaller(), msg);
void serializeMessage(TcpDiscoveryAbstractMessage msg) throws IgniteCheckedException, IOException {
if (msg.serializedData() != null)
return;

try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
serializeMessage((Message)msg, out);
if (!(msg instanceof Message))
msg.serializedData(U.marshal(spi.marshaller(), msg));
else
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
serializeMessage((Message)msg, out);

return out.toByteArray();
}
msg.serializedData(out.toByteArray());
}
}

/** @return Socket. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ public abstract class TcpDiscoveryAbstractMessage implements Serializable {
@Order(4)
private Set<UUID> failedNodes;

/** Helps to avoid multiply serialization in case message is sent more than once using different workers. */
public byte[] serializedData() {
return serData;
}

/**
* Default no-arg constructor for {@link Externalizable} interface.
*/
Expand All @@ -105,6 +110,14 @@ protected TcpDiscoveryAbstractMessage(TcpDiscoveryAbstractMessage msg) {
flags = msg.flags;
}

/** */
public void serializedData(byte[] serData) {
this.serData = serData;
}

/** */
private byte[] serData;

/**
* @return {@code True} if need use trace logging for this message (to reduce amount of logging with debug level).
*/
Expand Down
Loading