Skip to content

Commit 4fc655c

Browse files
committed
IGNITE-27604 Use MessageSerializer for TcpDiscoveryNodeAddFinishedMessage
1 parent 9a4ddbf commit 4fc655c

5 files changed

Lines changed: 260 additions & 27 deletions

File tree

modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,10 @@
1717

1818
package org.apache.ignite.internal.managers.discovery;
1919

20+
import org.apache.ignite.internal.codegen.DiscoveryDataPacketSerializer;
2021
import org.apache.ignite.internal.codegen.InetAddressMessageSerializer;
2122
import org.apache.ignite.internal.codegen.InetSocketAddressMessageSerializer;
23+
import org.apache.ignite.internal.codegen.NodeSpecificDataSerializer;
2224
import org.apache.ignite.internal.codegen.TcpDiscoveryAuthFailedMessageSerializer;
2325
import org.apache.ignite.internal.codegen.TcpDiscoveryCacheMetricsMessageSerializer;
2426
import org.apache.ignite.internal.codegen.TcpDiscoveryCheckFailedMessageSerializer;
@@ -34,6 +36,7 @@
3436
import org.apache.ignite.internal.codegen.TcpDiscoveryHandshakeResponseSerializer;
3537
import org.apache.ignite.internal.codegen.TcpDiscoveryLoopbackProblemMessageSerializer;
3638
import org.apache.ignite.internal.codegen.TcpDiscoveryMetricsUpdateMessageSerializer;
39+
import org.apache.ignite.internal.codegen.TcpDiscoveryNodeAddFinishedMessageSerializer;
3740
import org.apache.ignite.internal.codegen.TcpDiscoveryNodeFullMetricsMessageSerializer;
3841
import org.apache.ignite.internal.codegen.TcpDiscoveryNodeLeftMessageSerializer;
3942
import org.apache.ignite.internal.codegen.TcpDiscoveryNodeMetricsMessageSerializer;
@@ -42,8 +45,10 @@
4245
import org.apache.ignite.internal.codegen.TcpDiscoveryRingLatencyCheckMessageSerializer;
4346
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
4447
import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
48+
import org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacket;
4549
import org.apache.ignite.spi.discovery.tcp.messages.InetAddressMessage;
4650
import org.apache.ignite.spi.discovery.tcp.messages.InetSocketAddressMessage;
51+
import org.apache.ignite.spi.discovery.tcp.messages.NodeSpecificData;
4752
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAuthFailedMessage;
4853
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCacheMetricsMessage;
4954
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCheckFailedMessage;
@@ -59,6 +64,7 @@
5964
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHandshakeResponse;
6065
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryLoopbackProblemMessage;
6166
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryMetricsUpdateMessage;
67+
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessage;
6268
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFullMetricsMessage;
6369
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeLeftMessage;
6470
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeMetricsMessage;
@@ -70,6 +76,8 @@
7076
public class DiscoveryMessageFactory implements MessageFactoryProvider {
7177
/** {@inheritDoc} */
7278
@Override public void registerAll(MessageFactory factory) {
79+
factory.register((short)-107, NodeSpecificData::new, new NodeSpecificDataSerializer());
80+
factory.register((short)-106, DiscoveryDataPacket::new, new DiscoveryDataPacketSerializer());
7381
factory.register((short)-105, TcpDiscoveryNodeFullMetricsMessage::new,
7482
new TcpDiscoveryNodeFullMetricsMessageSerializer());
7583
factory.register((short)-104, TcpDiscoveryClientNodesMetricsMessage::new, new TcpDiscoveryClientNodesMetricsMessageSerializer());
@@ -95,5 +103,6 @@ public class DiscoveryMessageFactory implements MessageFactoryProvider {
95103
factory.register((short)14, TcpDiscoveryMetricsUpdateMessage::new, new TcpDiscoveryMetricsUpdateMessageSerializer());
96104
factory.register((short)15, TcpDiscoveryClientAckResponse::new, new TcpDiscoveryClientAckResponseSerializer());
97105
factory.register((short)16, TcpDiscoveryNodeLeftMessage::new, new TcpDiscoveryNodeLeftMessageSerializer());
106+
factory.register((short)17, TcpDiscoveryNodeAddFinishedMessage::new, new TcpDiscoveryNodeAddFinishedMessageSerializer());
98107
}
99108
}

modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/DiscoveryDataPacket.java

Lines changed: 95 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -28,42 +28,54 @@
2828
import org.apache.ignite.IgniteException;
2929
import org.apache.ignite.IgniteLogger;
3030
import org.apache.ignite.internal.GridComponent;
31+
import org.apache.ignite.internal.Order;
3132
import org.apache.ignite.internal.util.typedef.X;
3233
import org.apache.ignite.internal.util.typedef.internal.U;
3334
import org.apache.ignite.marshaller.Marshaller;
35+
import org.apache.ignite.plugin.extensions.communication.Message;
3436
import org.apache.ignite.spi.discovery.DiscoveryDataBag;
37+
import org.apache.ignite.spi.discovery.tcp.messages.NodeSpecificData;
3538

3639
import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.CONTINUOUS_PROC;
3740

3841
/**
3942
* Carries discovery data in marshalled form
4043
* and allows convenient way of converting it to and from {@link DiscoveryDataBag} objects.
4144
*/
42-
public class DiscoveryDataPacket implements Serializable {
43-
/** Local file header signature(read as a little-endian number). */
44-
private static int ZIP_HEADER_SIGNATURE = 0x04034b50;
45+
public class DiscoveryDataPacket implements Serializable, Message {
46+
/** Local file header signature (read as a little-endian number). */
47+
private static final int ZIP_HEADER_SIGNATURE = 0x04034b50;
4548

4649
/** */
4750
private static final long serialVersionUID = 0L;
4851

4952
/** */
50-
private final UUID joiningNodeId;
53+
@Order(0)
54+
private UUID joiningNodeId;
5155

5256
/** */
57+
@Order(1)
5358
private Map<Integer, byte[]> joiningNodeData = new HashMap<>();
5459

5560
/** */
5661
private transient Map<Integer, Serializable> unmarshalledJoiningNodeData;
5762

5863
/** */
64+
@Order(2)
5965
private Map<Integer, byte[]> commonData = new HashMap<>();
6066

6167
/** */
62-
private Map<UUID, Map<Integer, byte[]>> nodeSpecificData = new LinkedHashMap<>();
68+
@Order(3)
69+
private Map<UUID, NodeSpecificData> nodeSpecificData = new LinkedHashMap<>();
6370

6471
/** */
6572
private transient boolean joiningNodeClient;
6673

74+
/** Constructor. */
75+
public DiscoveryDataPacket() {
76+
// No-op.
77+
}
78+
6779
/**
6880
* @param joiningNodeId Joining node id.
6981
*/
@@ -78,6 +90,55 @@ public UUID joiningNodeId() {
7890
return joiningNodeId;
7991
}
8092

93+
/**
94+
* @param joiningNodeId Joining node ID.
95+
*/
96+
public void joiningNodeId(UUID joiningNodeId) {
97+
this.joiningNodeId = joiningNodeId;
98+
}
99+
100+
/**
101+
* @return Joining node data.
102+
*/
103+
public Map<Integer, byte[]> joiningNodeData() {
104+
return joiningNodeData;
105+
}
106+
107+
/**
108+
* @param joiningNodeData Joining node data.
109+
*/
110+
public void joiningNodeData(Map<Integer, byte[]> joiningNodeData) {
111+
this.joiningNodeData = joiningNodeData;
112+
}
113+
114+
/**
115+
* @return Common data.
116+
*/
117+
public Map<Integer, byte[]> commonData() {
118+
return commonData;
119+
}
120+
121+
/**
122+
* @param commonData Common data.
123+
*/
124+
public void commonData(Map<Integer, byte[]> commonData) {
125+
this.commonData = commonData;
126+
}
127+
128+
/**
129+
* @return Node specific data.
130+
*/
131+
public Map<UUID, NodeSpecificData> nodeSpecificData() {
132+
return nodeSpecificData;
133+
}
134+
135+
/**
136+
* @param nodeSpecificData New node specific data.
137+
*/
138+
public void nodeSpecificData(Map<UUID, NodeSpecificData> nodeSpecificData) {
139+
this.nodeSpecificData = nodeSpecificData;
140+
}
141+
81142
/**
82143
* @param bag Bag.
83144
* @param nodeId Node id.
@@ -98,7 +159,7 @@ public void marshalGridNodeData(DiscoveryDataBag bag, UUID nodeId, Marshaller ma
98159
filterDuplicatedData(marshLocNodeSpecificData);
99160

100161
if (!marshLocNodeSpecificData.isEmpty())
101-
nodeSpecificData.put(nodeId, marshLocNodeSpecificData);
162+
nodeSpecificData.put(nodeId, new NodeSpecificData(marshLocNodeSpecificData));
102163
}
103164
}
104165

@@ -132,8 +193,11 @@ public DiscoveryDataBag unmarshalGridData(
132193
if (nodeSpecificData != null && !nodeSpecificData.isEmpty()) {
133194
Map<UUID, Map<Integer, Serializable>> unmarshNodeSpecData = U.newLinkedHashMap(nodeSpecificData.size());
134195

135-
for (Map.Entry<UUID, Map<Integer, byte[]>> nodeBinEntry : nodeSpecificData.entrySet()) {
136-
Map<Integer, byte[]> nodeBinData = nodeBinEntry.getValue();
196+
for (Map.Entry<UUID, NodeSpecificData> nodeBinEntry : nodeSpecificData.entrySet()) {
197+
if (nodeBinEntry.getValue() == null)
198+
continue;
199+
200+
Map<Integer, byte[]> nodeBinData = nodeBinEntry.getValue().nodeSpecificData();
137201

138202
if (nodeBinData == null || nodeBinData.isEmpty())
139203
continue;
@@ -260,12 +324,17 @@ public boolean mergeDataFrom(
260324
}
261325

262326
if (nodeSpecificData.size() != mrgdSpecifDataKeys.size()) {
263-
for (Map.Entry<UUID, Map<Integer, byte[]>> e : nodeSpecificData.entrySet()) {
327+
for (Map.Entry<UUID, NodeSpecificData> e : nodeSpecificData.entrySet()) {
264328
if (!mrgdSpecifDataKeys.contains(e.getKey())) {
265-
Map<Integer, byte[]> data = existingDataPacket.nodeSpecificData.get(e.getKey());
329+
NodeSpecificData dataMsg = existingDataPacket.nodeSpecificData.get(e.getKey());
266330

267-
if (data != null && mapsEqual(e.getValue(), data)) {
268-
e.setValue(data);
331+
if (dataMsg == null)
332+
continue;
333+
334+
Map<Integer, byte[]> data = dataMsg.nodeSpecificData();
335+
336+
if (data != null && mapsEqual(e.getValue().nodeSpecificData(), data)) {
337+
e.setValue(new NodeSpecificData(data));
269338

270339
boolean add = mrgdSpecifDataKeys.add(e.getKey());
271340

@@ -310,7 +379,7 @@ private boolean mapsEqual(Map<Integer, byte[]> m1, Map<Integer, byte[]> m2) {
310379
* @param clientNode Client node.
311380
* @param log Logger.
312381
* @param panic Throw unmarshalling if {@code true}.
313-
* @throws IgniteCheckedException If {@code panic} is {@true} and unmarshalling failed.
382+
* @throws IgniteCheckedException If {@code panic} is {@code True} and unmarshalling failed.
314383
*/
315384
private Map<Integer, Serializable> unmarshalData(
316385
Map<Integer, byte[]> src,
@@ -358,11 +427,11 @@ else if (binEntry.getKey() < GridComponent.DiscoveryDataExchangeType.VALUES.leng
358427
}
359428

360429
/**
361-
* @param value Value to check.
430+
* @param val Value to check.
362431
* @return {@code true} if value is zipped.
363432
*/
364-
private boolean isZipped(byte[] value) {
365-
return value != null && value.length > 3 && makeInt(value) == ZIP_HEADER_SIGNATURE;
433+
private boolean isZipped(byte[] val) {
434+
return val != null && val.length > 3 && makeInt(val) == ZIP_HEADER_SIGNATURE;
366435
}
367436

368437
/**
@@ -391,7 +460,7 @@ private void marshalData(
391460
int compressionLevel,
392461
IgniteLogger log
393462
) {
394-
//may happen if nothing was collected from components,
463+
// may happen if nothing was collected from components,
395464
// corresponding map (for common data or for node specific data) left null
396465
if (src == null)
397466
return;
@@ -411,13 +480,15 @@ private void marshalData(
411480
* TODO https://issues.apache.org/jira/browse/IGNITE-4435
412481
*/
413482
private void filterDuplicatedData(Map<Integer, byte[]> discoData) {
414-
for (Map<Integer, byte[]> existingData : nodeSpecificData.values()) {
483+
for (NodeSpecificData existingData : nodeSpecificData.values()) {
415484
Iterator<Map.Entry<Integer, byte[]>> it = discoData.entrySet().iterator();
416485

417486
while (it.hasNext()) {
418487
Map.Entry<Integer, byte[]> discoDataEntry = it.next();
419488

420-
byte[] curData = existingData.get(discoDataEntry.getKey());
489+
byte[] curData = (existingData == null || existingData.nodeSpecificData() == null)
490+
? null
491+
: existingData.nodeSpecificData().get(discoDataEntry.getKey());
421492

422493
if (Arrays.equals(curData, discoDataEntry.getValue()))
423494
it.remove();
@@ -454,4 +525,9 @@ public void joiningNodeClient(boolean joiningNodeClient) {
454525
public void clearUnmarshalledJoiningNodeData() {
455526
unmarshalledJoiningNodeData = null;
456527
}
528+
529+
/** {@inheritDoc} */
530+
@Override public short directType() {
531+
return -106;
532+
}
457533
}
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.ignite.spi.discovery.tcp.messages;
19+
20+
import java.io.Serializable;
21+
import java.util.Map;
22+
import java.util.Objects;
23+
import org.apache.ignite.internal.Order;
24+
import org.apache.ignite.plugin.extensions.communication.Message;
25+
26+
/** */
27+
public class NodeSpecificData implements Message, Serializable {
28+
/** */
29+
private static final long serialVersionUID = 0L;
30+
31+
/** */
32+
@Order(0)
33+
private Map<Integer, byte[]> nodeSpecificData;
34+
35+
/** */
36+
public NodeSpecificData() {
37+
// No-op.
38+
}
39+
40+
/**
41+
* @param nodeSpecificData Node specific data.
42+
*/
43+
public NodeSpecificData(Map<Integer, byte[]> nodeSpecificData) {
44+
this.nodeSpecificData = nodeSpecificData;
45+
}
46+
47+
/**
48+
* @return Node specific data.
49+
*/
50+
public Map<Integer, byte[]> nodeSpecificData() {
51+
return nodeSpecificData;
52+
}
53+
54+
/**
55+
* @param nodeSpecificData New node specific data.
56+
*/
57+
public void nodeSpecificData(Map<Integer, byte[]> nodeSpecificData) {
58+
this.nodeSpecificData = nodeSpecificData;
59+
}
60+
61+
/** {@inheritDoc} */
62+
@Override public short directType() {
63+
return -107;
64+
}
65+
66+
/** {@inheritDoc} */
67+
@Override public boolean equals(Object o) {
68+
if (this == o)
69+
return true;
70+
71+
if (o == null || getClass() != o.getClass())
72+
return false;
73+
74+
NodeSpecificData that = (NodeSpecificData)o;
75+
76+
return Objects.equals(nodeSpecificData, that.nodeSpecificData);
77+
}
78+
79+
/** {@inheritDoc} */
80+
@Override public int hashCode() {
81+
return Objects.hashCode(nodeSpecificData);
82+
}
83+
}

0 commit comments

Comments
 (0)