diff --git a/modules/codegen/src/main/java/org/apache/ignite/internal/MessageSerializerGenerator.java b/modules/codegen/src/main/java/org/apache/ignite/internal/MessageSerializerGenerator.java index fef259eec6a08..893a1ccaa4878 100644 --- a/modules/codegen/src/main/java/org/apache/ignite/internal/MessageSerializerGenerator.java +++ b/modules/codegen/src/main/java/org/apache/ignite/internal/MessageSerializerGenerator.java @@ -724,7 +724,7 @@ private String messageCollectionItemTypes(VariableElement field, TypeMirror type String descName = field.getSimpleName() + "CollDesc"; String typeName = desc.substring(desc.indexOf(' ') + 1, desc.indexOf('(')); - fields.add("private final static " + typeName + " " + descName + " = " + desc + ";"); + fields.add("private static final " + typeName + " " + descName + " = " + desc + ";"); return descName; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/ClusterMetricsSnapshot.java b/modules/core/src/main/java/org/apache/ignite/internal/ClusterMetricsSnapshot.java index 60484f3280adf..4d2795abffa01 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/ClusterMetricsSnapshot.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/ClusterMetricsSnapshot.java @@ -109,6 +109,9 @@ public ClusterMetricsSnapshot() { * Creates snapshot based on the handled message. */ public ClusterMetricsSnapshot(NodeMetricsMessage m) { + // As in #deserialize(). + m.lastUpdateTime = U.currentTimeMillis(); + this.m = m; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java index fe0a323461a7d..dd55f8363daf5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java @@ -138,6 +138,8 @@ import org.apache.ignite.spi.communication.tcp.internal.TcpConnectionRequestDiscoveryMessageSerializer; import org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacket; import org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacketSerializer; +import org.apache.ignite.spi.discovery.tcp.messages.IgniteProductVersionMessage; +import org.apache.ignite.spi.discovery.tcp.messages.IgniteProductVersionMessageSerializer; import org.apache.ignite.spi.discovery.tcp.messages.InetAddressMessage; import org.apache.ignite.spi.discovery.tcp.messages.InetAddressMessageSerializer; import org.apache.ignite.spi.discovery.tcp.messages.InetSocketAddressMessage; @@ -192,6 +194,8 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFullMetricsMessageSerializer; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeLeftMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeLeftMessageMarshallableSerializer; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeMessageMarshallableSerializer; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeMetricsMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeMetricsMessageSerializer; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingRequest; @@ -227,6 +231,8 @@ public DiscoveryMessageFactory(Marshaller marsh, ClassLoader clsLdr) { factory.register((short)-200, TcpDiscoveryCollectionMessage::new, new TcpDiscoveryCollectionMessageMarshallableSerializer(marsh, clsLdr)); + factory.register((short)-117, TcpDiscoveryNodeMessage::new, new TcpDiscoveryNodeMessageMarshallableSerializer(marsh, clsLdr)); + factory.register((short)-116, IgniteProductVersionMessage::new, new IgniteProductVersionMessageSerializer()); factory.register((short)-115, SchemaAlterTableAddColumnOperation::new, new SchemaAlterTableAddColumnOperationSerializer()); factory.register((short)-114, SchemaIndexCreateOperation::new, diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java index de920fddadf31..5fb2d97a02d13 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java @@ -120,6 +120,7 @@ import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteProductVersion; +import org.apache.ignite.spi.discovery.tcp.messages.IgniteProductVersionMessage; import org.apache.ignite.spi.encryption.EncryptionSpi; import org.apache.ignite.spi.encryption.noop.NoopEncryptionSpi; import org.jetbrains.annotations.Nullable; @@ -850,7 +851,7 @@ WALRecord readPlainRecord(RecordType type, ByteBufferBackedDataInput in, long flags = in.readLong(); - byte[] revHash = new byte[IgniteProductVersion.REV_HASH_SIZE]; + byte[] revHash = new byte[IgniteProductVersionMessage.REV_HASH_SIZE]; byte maj = in.readByte(); byte min = in.readByte(); byte maint = in.readByte(); diff --git a/modules/core/src/main/java/org/apache/ignite/lang/IgniteProductVersion.java b/modules/core/src/main/java/org/apache/ignite/lang/IgniteProductVersion.java index 1c78694550bd5..c62c2ff717720 100644 --- a/modules/core/src/main/java/org/apache/ignite/lang/IgniteProductVersion.java +++ b/modules/core/src/main/java/org/apache/ignite/lang/IgniteProductVersion.java @@ -27,6 +27,7 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.IgniteVersionUtils; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.spi.discovery.tcp.messages.IgniteProductVersionMessage; import org.jetbrains.annotations.NotNull; /** @@ -41,7 +42,7 @@ public class IgniteProductVersion implements Comparable, E /** */ private static final long serialVersionUID = 0L; - /** Size of the {@link #revHash }*/ + /** Size of the {@link #revisionHash()} }*/ public static final int REV_HASH_SIZE = 20; /** Size in bytes of serialized: 3 bytes (maj, min, maintenance version), 8 bytes - timestamp */ @@ -51,29 +52,19 @@ public class IgniteProductVersion implements Comparable, E private static final Pattern VER_PATTERN = Pattern.compile("(\\d+)\\.(\\d+)\\.(\\d+)([-.]([^0123456789][^-]+)(-SNAPSHOT)?)?(-(\\d+))?(-([\\da-f]+))?"); - /** Major version number. */ - private byte major; - - /** Minor version number. */ - private byte minor; - - /** Maintenance version number. */ - private byte maintenance; - - /** Stage of development. */ - private String stage; - - /** Revision timestamp. */ - private long revTs; - - /** Revision hash. */ - private byte[] revHash; + /** The values holder. */ + private IgniteProductVersionMessage valuesHolder; /** * Empty constructor required by {@link Externalizable}. */ public IgniteProductVersion() { - // No-op. + valuesHolder = new IgniteProductVersionMessage(); + } + + /** @param ver Version. */ + public IgniteProductVersion(IgniteProductVersionMessage ver) { + valuesHolder = ver; } /** @@ -96,17 +87,12 @@ public IgniteProductVersion(byte major, byte minor, byte maintenance, long revTs * @param revHash Revision hash. */ public IgniteProductVersion(byte major, byte minor, byte maintenance, String stage, long revTs, byte[] revHash) { + this(new IgniteProductVersionMessage(major, minor, maintenance, stage, revTs, revHash)); + if (revHash != null && revHash.length != REV_HASH_SIZE) { throw new IllegalArgumentException("Invalid length for SHA1 hash (must be " + REV_HASH_SIZE + "): " + revHash.length); } - - this.major = major; - this.minor = minor; - this.maintenance = maintenance; - this.stage = stage; - this.revTs = revTs; - this.revHash = revHash != null ? revHash : new byte[REV_HASH_SIZE]; } /** @@ -115,7 +101,7 @@ public IgniteProductVersion(byte major, byte minor, byte maintenance, String sta * @return Major version number. */ public byte major() { - return major; + return valuesHolder.major; } /** @@ -124,7 +110,7 @@ public byte major() { * @return Minor version number. */ public byte minor() { - return minor; + return valuesHolder.minor; } /** @@ -133,14 +119,14 @@ public byte minor() { * @return Maintenance version number. */ public byte maintenance() { - return maintenance; + return valuesHolder.maintenance; } /** * @return Stage of development. */ public String stage() { - return stage; + return valuesHolder.stage; } /** @@ -149,7 +135,7 @@ public String stage() { * @return Revision timestamp. */ public long revisionTimestamp() { - return revTs; + return valuesHolder.revTs; } /** @@ -158,7 +144,7 @@ public long revisionTimestamp() { * @return Revision hash. */ public byte[] revisionHash() { - return revHash; + return valuesHolder.revHash; } /** @@ -167,7 +153,7 @@ public byte[] revisionHash() { * @return Release date. */ public Date releaseDate() { - return new Date(revTs * 1000); + return new Date(revisionTimestamp() * 1000); } /** @@ -178,31 +164,31 @@ public Date releaseDate() { */ public boolean greaterThanEqual(int major, int minor, int maintenance) { // NOTE: Unknown version is less than any other version. - if (major == this.major) - return minor == this.minor ? this.maintenance >= maintenance : this.minor > minor; + if (major == major()) + return minor == minor() ? maintenance() >= maintenance : minor() > minor; else - return this.major > major; + return major() > major; } /** {@inheritDoc} */ @Override public int compareTo(@NotNull IgniteProductVersion o) { // NOTE: Unknown version is less than any other version. - int res = Integer.compare(major, o.major); + int res = Integer.compare(major(), o.major()); if (res != 0) return res; - res = Integer.compare(minor, o.minor); + res = Integer.compare(minor(), o.minor()); if (res != 0) return res; - res = Integer.compare(maintenance, o.maintenance); + res = Integer.compare(maintenance(), o.maintenance()); if (res != 0) return res; - return Long.compare(revTs, o.revTs); + return Long.compare(revisionTimestamp(), o.revisionTimestamp()); } /** @@ -210,17 +196,17 @@ public boolean greaterThanEqual(int major, int minor, int maintenance) { * @return Compare result. */ public int compareToIgnoreTimestamp(@NotNull IgniteProductVersion o) { - int res = Integer.compare(major, o.major); + int res = Integer.compare(major(), o.major()); if (res != 0) return res; - res = Integer.compare(minor, o.minor); + res = Integer.compare(minor(), o.minor()); if (res != 0) return res; - return Integer.compare(maintenance, o.maintenance); + return Integer.compare(maintenance(), o.maintenance()); } /** {@inheritDoc} */ @@ -233,47 +219,48 @@ public int compareToIgnoreTimestamp(@NotNull IgniteProductVersion o) { IgniteProductVersion that = (IgniteProductVersion)o; - return revTs == that.revTs && maintenance == that.maintenance && minor == that.minor && major == that.major; + return revisionTimestamp() == that.revisionTimestamp() && maintenance() == that.maintenance() + && minor() == that.minor() && major() == that.major(); } /** {@inheritDoc} */ @Override public int hashCode() { - int res = major; + int res = major(); - res = 31 * res + minor; - res = 31 * res + maintenance; - res = 31 * res + (int)(revTs ^ (revTs >>> 32)); + res = 31 * res + minor(); + res = 31 * res + maintenance(); + res = 31 * res + (int)(revisionTimestamp() ^ (revisionTimestamp() >>> 32)); return res; } /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeByte(major); - out.writeByte(minor); - out.writeByte(maintenance); - out.writeLong(revTs); - U.writeByteArray(out, revHash); + out.writeByte(major()); + out.writeByte(minor()); + out.writeByte(maintenance()); + out.writeLong(revisionTimestamp()); + U.writeByteArray(out, revisionHash()); } /** {@inheritDoc} */ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - major = in.readByte(); - minor = in.readByte(); - maintenance = in.readByte(); - revTs = in.readLong(); - revHash = U.readByteArray(in); + valuesHolder.major = in.readByte(); + valuesHolder.minor = in.readByte(); + valuesHolder.maintenance = in.readByte(); + valuesHolder.revTs = in.readLong(); + valuesHolder.revHash = U.readByteArray(in); } /** {@inheritDoc} */ @Override public String toString() { - String revTsStr = IgniteVersionUtils.formatBuildTimeStamp(revTs * 1000); + String revTsStr = IgniteVersionUtils.formatBuildTimeStamp(revisionTimestamp() * 1000); - String hash = U.byteArray2HexString(revHash).toLowerCase(); + String hash = U.byteArray2HexString(revisionHash()).toLowerCase(); hash = hash.length() > 8 ? hash.substring(0, 8) : hash; - return major + "." + minor + "." + maintenance + "#" + revTsStr + "-sha1:" + hash; + return major() + "." + minor() + "." + maintenance() + "#" + revTsStr + "-sha1:" + hash; } /** diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index 02f56983d4114..d4a2343cb2829 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -2214,7 +2214,7 @@ private void processNodeAddedMessage(TcpDiscoveryNodeAddedMessage msg) { if (spi.getSpiContext().isStopping()) return; - TcpDiscoveryNode node = msg.node(); + TcpDiscoveryNode node = new TcpDiscoveryNode(msg.nodeMsg); UUID newNodeId = node.id(); @@ -2239,8 +2239,7 @@ private void processNodeAddedMessage(TcpDiscoveryNodeAddedMessage msg) { nodeAdded = true; - if (msg.topologyHistory() != null) - topHist.putAll(msg.topologyHistory()); + topHist.putAll(msg.topologyHistory()); } else { if (log.isDebugEnabled()) diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index f549529989963..715f7b2d45796 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -1860,9 +1860,7 @@ private void prepareNodeAddedMessage( if (msg instanceof TcpDiscoveryNodeAddedMessage) { TcpDiscoveryNodeAddedMessage nodeAddedMsg = (TcpDiscoveryNodeAddedMessage)msg; - TcpDiscoveryNode node = nodeAddedMsg.node(); - - if (node.id().equals(destNodeId)) { + if (nodeAddedMsg.nodeMsg.id.equals(destNodeId)) { Collection allNodes = ring.allNodes(); Collection topToSnd = new ArrayList<>(allNodes.size()); @@ -1873,7 +1871,7 @@ private void prepareNodeAddedMessage( // in case this message is resent due to failures/leaves. // There will be separate messages for nodes with greater // internal order. - if (n0.internalOrder() < nodeAddedMsg.node().internalOrder()) + if (n0.internalOrder() < nodeAddedMsg.nodeMsg.intOrder) topToSnd.add(n0); } @@ -2434,18 +2432,6 @@ private void processMessageFailedNodes(TcpDiscoveryAbstractMessage msg) { } } - /** */ - private static void enrichNodeWithAttribute(TcpDiscoveryNode node, String attrName, @Nullable Object attrVal) { - if (attrVal == null) - return; - - Map attrs = new HashMap<>(node.getAttributes()); - - attrs.put(attrName, attrVal); - - node.setAttributes(attrs); - } - /** */ private static WorkersRegistry getWorkerRegistry(TcpDiscoverySpi spi) { return spi.ignite() instanceof IgniteEx ? ((IgniteEx)spi.ignite()).context().workersRegistry() : null; @@ -2473,9 +2459,7 @@ void add(TcpDiscoveryAbstractMessage msg) { msg = addedMsg; - TcpDiscoveryNode node = addedMsg.node(); - - if (node.clientRouterNodeId() != null && !msgs.contains(msg)) { + if (addedMsg.nodeMsg.clientRouterNodeId != null && !msgs.contains(msg)) { Collection allNodes = ring.allNodes(); Collection top = new ArrayList<>(allNodes.size()); @@ -2483,7 +2467,7 @@ void add(TcpDiscoveryAbstractMessage msg) { for (TcpDiscoveryNode n0 : allNodes) { assert n0.internalOrder() > 0 : n0; - if (n0.internalOrder() < node.internalOrder()) + if (n0.internalOrder() < addedMsg.nodeMsg.intOrder) top.add(n0); } @@ -2581,7 +2565,7 @@ private void clearClientAddFinished(UUID clientId) { synchronized (msgs) { for (TcpDiscoveryAbstractMessage msg : msgs) { if (msg instanceof TcpDiscoveryNodeAddedMessage) { - if (node.id().equals(((TcpDiscoveryNodeAddedMessage)msg).node().id())) + if (node.id().equals(((TcpDiscoveryNodeAddedMessage)msg).nodeMsg.id)) res = new ArrayList<>(msgs.size()); } @@ -2644,7 +2628,7 @@ private TcpDiscoveryAbstractMessage prepare(TcpDiscoveryAbstractMessage msg, UUI if (msg instanceof TcpDiscoveryNodeAddedMessage) { TcpDiscoveryNodeAddedMessage addedMsg = (TcpDiscoveryNodeAddedMessage)msg; - if (addedMsg.node().id().equals(destNodeId)) { + if (addedMsg.nodeMsg.id.equals(destNodeId)) { assert addedMsg.clientTopology() != null : addedMsg; TcpDiscoveryNodeAddedMessage msg0 = new TcpDiscoveryNodeAddedMessage(addedMsg); @@ -3130,7 +3114,7 @@ protected void runTasks() { boolean proc = false; if (msg instanceof TcpDiscoveryNodeAddedMessage) - proc = ((TcpDiscoveryNodeAddedMessage)msg).node().equals(locNode); + proc = ((TcpDiscoveryNodeAddedMessage)msg).nodeMsg.id.equals(locNode.id()); if (!proc) { if (log.isDebugEnabled()) { @@ -3284,7 +3268,7 @@ private void sendMessageToClients(TcpDiscoveryAbstractMessage msg) { if (msg instanceof TcpDiscoveryNodeAddedMessage) { TcpDiscoveryNodeAddedMessage nodeAddedMsg = (TcpDiscoveryNodeAddedMessage)msg; - if (clientMsgWorker.clientNodeId.equals(nodeAddedMsg.node().id())) { + if (clientMsgWorker.clientNodeId.equals(nodeAddedMsg.nodeMsg.id)) { msg0 = new TcpDiscoveryNodeAddedMessage(nodeAddedMsg); prepareNodeAddedMessage(msg0, clientMsgWorker.clientNodeId, null); @@ -3512,7 +3496,7 @@ else if (log.isTraceEnabled()) if (nextOrder != next.internalOrder()) { // Is next currently being added? boolean nextNew = (msg instanceof TcpDiscoveryNodeAddedMessage && - ((TcpDiscoveryNodeAddedMessage)msg).node().id().equals(nextId)); + ((TcpDiscoveryNodeAddedMessage)msg).nodeMsg.id.equals(nextId)); if (!nextNew) nextNew = hasPendingAddMessage(nextId); @@ -4003,7 +3987,7 @@ private boolean hasPendingAddMessage(UUID nodeId) { if (pendingMsg.msg instanceof TcpDiscoveryNodeAddedMessage) { TcpDiscoveryNodeAddedMessage addMsg = (TcpDiscoveryNodeAddedMessage)pendingMsg.msg; - if (addMsg.node().id().equals(nodeId)) + if (addMsg.nodeMsg.id.equals(nodeId)) return true; } } @@ -4019,7 +4003,7 @@ private boolean hasPendingAddMessage(UUID nodeId) { private void processJoinRequestMessage(final TcpDiscoveryJoinRequestMessage msg) { assert msg != null; - final TcpDiscoveryNode node = msg.node(); + final TcpDiscoveryNode node = new TcpDiscoveryNode(msg.nodeMsg); final UUID locNodeId = getLocalNodeId(); @@ -4836,7 +4820,7 @@ private void sendDirectlyToClient(UUID clientNodeId, TcpDiscoveryAbstractMessage private void processNodeAddedMessage(TcpDiscoveryNodeAddedMessage msg) { assert msg != null; - TcpDiscoveryNode node = msg.node(); + TcpDiscoveryNode node = new TcpDiscoveryNode(msg.nodeMsg); assert node != null; @@ -5058,8 +5042,8 @@ else if (spiState == CONNECTING) // Initialize topology. Collection top = msg.topology(); - if (top != null && !top.isEmpty()) { - spi.gridStartTime = msg.gridStartTime(); + if (!F.isEmpty(top)) { + spi.gridStartTime = msg.gridStartTime; for (TcpDiscoveryNode n : top) { assert n.internalOrder() < node.internalOrder() : @@ -6968,8 +6952,11 @@ else if (msg instanceof TcpDiscoveryJoinRequestMessage) { // Current node holds connection with the node that is joining the cluster. Therefore, it can // save certificates with which the connection was established to joining node attributes. - if (spi.nodeAuth != null && nodeId.equals(req.node().id())) - enrichNodeWithAttribute(req.node(), ATTR_NODE_CERTIFICATES, ses.extractCertificates()); + if (spi.nodeAuth != null && nodeId.equals(req.nodeMsg.id) && ses.extractCertificates() != null) { + req.nodeMsg.attrs = new HashMap<>(req.nodeMsg.attrs); + + req.nodeMsg.attrs.put(ATTR_NODE_CERTIFICATES, ses.extractCertificates()); + } if (!req.responded()) { boolean ok = processJoinRequestMessage(req, clientMsgWrk); @@ -7484,11 +7471,11 @@ private boolean processJoinRequestMessage( long sockTimeout = spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() : spi.getSocketTimeout(); - if (state == CONNECTED) { - TcpDiscoveryNode node = msg.node(); + TcpDiscoveryNode node = new TcpDiscoveryNode(msg.nodeMsg); + if (state == CONNECTED) { // Check that joining node can accept incoming connections. - if (node.clientRouterNodeId() == null) { + if (msg.nodeMsg.clientRouterNodeId == null) { if (!pingJoiningNode(node)) { spi.writeToSocket(msg, sock, RES_JOIN_IMPOSSIBLE, sockTimeout); @@ -7504,7 +7491,7 @@ private boolean processJoinRequestMessage( msg.responded(true); if (clientMsgWrk != null && clientMsgWrk.runner() == null && !clientMsgWrk.isDone()) { - clientMsgWrk.clientVersion(U.productVersion(msg.node())); + clientMsgWrk.clientVersion(U.productVersion(node)); new MessageWorkerThreadWithCleanup<>(clientMsgWrk, log).start(); } @@ -7538,7 +7525,7 @@ private boolean processJoinRequestMessage( if (log.isDebugEnabled()) log.debug("Responded to join request message [msg=" + msg + ", res=" + res + ']'); - fromAddrs.addAll(msg.node().socketAddresses()); + fromAddrs.addAll(node.socketAddresses()); spi.stats.onMessageProcessingFinished(msg); @@ -7816,8 +7803,8 @@ private boolean topologyInitialized(TcpDiscoveryAbstractMessage msg) { if (msg instanceof TcpDiscoveryNodeAddedMessage) { TcpDiscoveryNodeAddedMessage addedMsg = (TcpDiscoveryNodeAddedMessage)msg; - if (clientNodeId.equals(addedMsg.node().id())) - return addedMsg.topology() != null; + if (clientNodeId.equals(addedMsg.nodeMsg.id)) + return addedMsg.topMsgs != null; } return true; diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java index 89f1f492e9272..f8ec004b4a5e1 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java @@ -32,7 +32,6 @@ import java.util.UUID; import org.apache.ignite.cache.CacheMetrics; import org.apache.ignite.cluster.ClusterMetrics; -import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.ClusterMetricsSnapshot; import org.apache.ignite.internal.IgniteNodeAttributes; import org.apache.ignite.internal.managers.discovery.IgniteClusterNode; @@ -46,6 +45,7 @@ import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.spi.discovery.DiscoveryMetricsProvider; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeMessage; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_NODE_CONSISTENT_ID; @@ -112,7 +112,7 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Ignite /** Metrics provider (transient). */ @GridToStringExclude - private DiscoveryMetricsProvider metricsProvider; + private @Nullable DiscoveryMetricsProvider metricsProvider; /** Visible flag (transient). */ @GridToStringExclude @@ -158,20 +158,21 @@ public TcpDiscoveryNode() { * @param addrs Addresses. * @param hostNames Host names. * @param discPort Port. - * @param metricsProvider Metrics provider. + * @param nodeMetrics Node metrics. * @param ver Version. * @param consistentId Node consistent ID. */ - public TcpDiscoveryNode(UUID id, + private TcpDiscoveryNode( + UUID id, Collection addrs, Collection hostNames, int discPort, - DiscoveryMetricsProvider metricsProvider, + ClusterMetrics nodeMetrics, IgniteProductVersion ver, - Serializable consistentId + Object consistentId ) { assert id != null; - assert metricsProvider != null; + assert nodeMetrics != null; assert ver != null; this.id = id; @@ -183,16 +184,52 @@ public TcpDiscoveryNode(UUID id, this.addrs = sortedAddrs; this.hostNames = hostNames; this.discPort = discPort; - this.metricsProvider = metricsProvider; this.ver = ver; this.consistentId = consistentId != null ? consistentId : U.consistentId(sortedAddrs, discPort); - metrics = metricsProvider.metrics(); + metrics = nodeMetrics; + } + + /** + * Constructor. + * + * @param id Node Id. + * @param addrs Addresses. + * @param hostNames Host names. + * @param discPort Port. + * @param metricsProvider Metrics provider. + * @param ver Version. + * @param consistentId Node consistent ID. + */ + public TcpDiscoveryNode( + UUID id, + Collection addrs, + Collection hostNames, + int discPort, + DiscoveryMetricsProvider metricsProvider, + IgniteProductVersion ver, + Object consistentId + ) { + this(id, addrs, hostNames, discPort, metricsProvider.metrics(), ver, consistentId); + + this.metricsProvider = metricsProvider; cacheMetrics = metricsProvider.cacheMetrics(); + sockAddrs = U.toSocketAddresses(this, discPort); } + /** @param msg The transfer message. */ + public TcpDiscoveryNode(TcpDiscoveryNodeMessage msg) { + this(msg.id, msg.addrs, msg.hostNames, msg.discPort, new ClusterMetricsSnapshot(msg.metricsMsg), + new IgniteProductVersion(msg.verMsg), msg.consistentId); + + attrs = msg.attrs; + order = msg.order; + intOrder = msg.intOrder; + clientRouterNodeId = msg.clientRouterNodeId; + } + /** * @return Last successfully connected address. */ @@ -303,8 +340,8 @@ public Map getAttributes() { } /** {@inheritDoc} */ - @Override public void setCacheMetrics(Map cacheMetrics) { - this.cacheMetrics = cacheMetrics != null ? cacheMetrics : Collections.emptyMap(); + @Override public void setCacheMetrics(@Nullable Map cacheMetrics) { + this.cacheMetrics = !F.isEmpty(cacheMetrics) ? cacheMetrics : Collections.emptyMap(); } /** @@ -463,7 +500,7 @@ public void visible(boolean visible) { /** {@inheritDoc} */ @Override public boolean isClient() { if (!cacheCliInit) { - Boolean clientModeAttr = ((ClusterNode)this).attribute(IgniteNodeAttributes.ATTR_CLIENT_MODE); + Boolean clientModeAttr = (Boolean)attrs.get(IgniteNodeAttributes.ATTR_CLIENT_MODE); cacheCli = clientModeAttr != null && clientModeAttr; @@ -631,21 +668,4 @@ public TcpDiscoveryNode clientReconnectNode(Map nodeAttrs) { @Override public String toString() { return S.toString(TcpDiscoveryNode.class, this, "isClient", isClient(), "dataCenterId", dataCenterId()); } - - /** - * IMPORTANT! - * Only purpose of this constructor is creating node which contains necessary data to store on disc only - * @param node to copy data from - */ - public TcpDiscoveryNode(ClusterNode node) { - this.id = node.id(); - this.consistentId = node.consistentId(); - this.addrs = node.addresses(); - this.hostNames = node.hostNames(); - this.order = node.order(); - this.ver = node.version(); - this.clientRouterNodeId = node.isClient() ? node.id() : null; - - attrs = Collections.singletonMap(ATTR_NODE_CONSISTENT_ID, consistentId); - } } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/IgniteProductVersionMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/IgniteProductVersionMessage.java new file mode 100644 index 0000000000000..b10cee6d0397d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/IgniteProductVersionMessage.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.tcp.messages; + +import org.apache.ignite.internal.Order; +import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory; +import org.apache.ignite.lang.IgniteProductVersion; +import org.apache.ignite.plugin.extensions.communication.Message; + +/** Message for {@link IgniteProductVersion}.*/ +public class IgniteProductVersionMessage implements Message { + /** Size of the {@link #revHash }*/ + public static final int REV_HASH_SIZE = 20; + + /** Major version number. */ + @Order(0) + public byte major; + + /** Minor version number. */ + @Order(1) + public byte minor; + + /** Maintenance version number. */ + @Order(2) + public byte maintenance; + + /** Stage of development. */ + @Order(3) + public String stage; + + /** Revision timestamp. */ + @Order(4) + public long revTs; + + /** Revision hash. */ + @Order(5) + public byte[] revHash; + + /** Constructor for {@link DiscoveryMessageFactory}. */ + public IgniteProductVersionMessage() { + // No-op. + } + + /** + * @param major Major version. + * @param minor Minor version. + * @param maintenance Maintenance. + * @param stage Stage. + * @param revTs Revision timestamp. + * @param revHash Revision hash. + */ + public IgniteProductVersionMessage(byte major, byte minor, byte maintenance, String stage, long revTs, byte[] revHash) { + this.major = major; + this.minor = minor; + this.maintenance = maintenance; + this.stage = stage; + this.revTs = revTs; + this.revHash = revHash != null ? revHash : new byte[REV_HASH_SIZE]; + } + + /** @param ver Product version. */ + public IgniteProductVersionMessage(IgniteProductVersion ver) { + this( + ver.major(), + ver.minor(), + ver.maintenance(), + ver.stage(), + ver.revisionTimestamp(), + ver.revisionHash() + ); + } + + /** {@inheritDoc} */ + @Override public short directType() { + return -116; + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryJoinRequestMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryJoinRequestMessage.java index 1e39d5b29fe2d..9bcad97e08c8a 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryJoinRequestMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryJoinRequestMessage.java @@ -17,32 +17,22 @@ package org.apache.ignite.spi.discovery.tcp.messages; -import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.Order; import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.marshaller.Marshaller; -import org.apache.ignite.plugin.extensions.communication.MarshallableMessage; import org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacket; import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode; -import static org.apache.ignite.internal.util.lang.ClusterNodeFunc.eqNodes; - /** * Initial message sent by a node that wants to enter topology. * Sent to random node during SPI start. Then forwarded directly to coordinator. */ -public class TcpDiscoveryJoinRequestMessage extends TcpDiscoveryAbstractTraceableMessage implements MarshallableMessage { +public class TcpDiscoveryJoinRequestMessage extends TcpDiscoveryAbstractTraceableMessage { /** */ private static final long serialVersionUID = 0L; - /** New node that wants to join the topology. */ - private TcpDiscoveryNode node; - - /** Serialized {@link #node}. */ - // TODO Remove the field after completing https://issues.apache.org/jira/browse/IGNITE-27899. + /** Message holding new node that wants to join the topology. */ @Order(0) - byte[] nodeBytes; + public TcpDiscoveryNodeMessage nodeMsg; /** Discovery data container. */ @Order(1) @@ -62,19 +52,10 @@ public TcpDiscoveryJoinRequestMessage() { public TcpDiscoveryJoinRequestMessage(TcpDiscoveryNode node, DiscoveryDataPacket dataPacket) { super(node.id()); - this.node = node; + nodeMsg = new TcpDiscoveryNodeMessage(node); this.dataPacket = dataPacket; } - /** - * Gets new node that wants to join the topology. - * - * @return Node that wants to join the topology. - */ - public TcpDiscoveryNode node() { - return node; - } - /** @return Discovery data container that collects data from all cluster nodes. */ public DiscoveryDataPacket gridDiscoveryData() { return dataPacket; @@ -94,17 +75,6 @@ public void responded(boolean responded) { setFlag(RESPONDED_FLAG_POS, responded); } - /** {@inheritDoc} */ - @Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException { - if (node != null) - nodeBytes = U.marshal(marsh, node); - } - - /** {@inheritDoc} */ - @Override public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) throws IgniteCheckedException { - if (nodeBytes != null) - node = U.unmarshal(marsh, nodeBytes, clsLdr); - } /** {@inheritDoc} */ @Override public boolean equals(Object obj) { @@ -116,7 +86,7 @@ public void responded(boolean responded) { TcpDiscoveryJoinRequestMessage other = (TcpDiscoveryJoinRequestMessage)obj; - return eqNodes(other.node, node); + return nodeMsg.id.equals(other.nodeMsg.id); } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java index c11bee62e8513..73176cbb9988a 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java @@ -25,7 +25,6 @@ import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.marshaller.Marshaller; -import org.apache.ignite.plugin.extensions.communication.MarshallableMessage; import org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacket; import org.jetbrains.annotations.Nullable; @@ -34,7 +33,7 @@ */ @TcpDiscoveryEnsureDelivery @TcpDiscoveryRedirectToClient -public class TcpDiscoveryNodeAddFinishedMessage extends TcpDiscoveryAbstractTraceableMessage implements MarshallableMessage { +public class TcpDiscoveryNodeAddFinishedMessage extends TcpDiscoveryAbstractTraceableMessage { /** */ private static final long serialVersionUID = 0L; diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java index bdb75e1978301..b7c7fe7153167 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java @@ -18,42 +18,35 @@ package org.apache.ignite.spi.discovery.tcp.messages; import java.util.Collection; +import java.util.Collections; import java.util.Map; import java.util.UUID; -import org.apache.ignite.IgniteCheckedException; +import java.util.stream.Collectors; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.Order; import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory; -import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.marshaller.Marshaller; -import org.apache.ignite.plugin.extensions.communication.MarshallableMessage; import org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacket; import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode; import org.jetbrains.annotations.Nullable; /** - * TODO: Use NodeMessage for {@link TcpDiscoveryNode} and {@link ClusterNode} after https://issues.apache.org/jira/browse/IGNITE-27899 * Message telling nodes that new node should be added to topology. * When newly added node receives the message it connects to its next and finishes * join process. */ @TcpDiscoveryEnsureDelivery @TcpDiscoveryRedirectToClient -public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractTraceableMessage implements MarshallableMessage { +public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractTraceableMessage { /** */ private static final long serialVersionUID = 0L; - /** Added node. */ - private TcpDiscoveryNode node; - - /** Marshalled {@link #node}. */ + /** Message of the added node. */ @Order(0) - @GridToStringExclude - byte[] nodeBytes; + public TcpDiscoveryNodeMessage nodeMsg; /** */ @Order(1) @@ -65,28 +58,20 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractTraceableM /** Current topology. Initialized by coordinator. */ @GridToStringInclude - private Collection top; - - /** Marshalled {@link #top}. */ @Order(3) - @GridToStringExclude - @Nullable byte[] topBytes; + public @Nullable Collection topMsgs; /** */ @GridToStringInclude private transient Collection clientTop; /** Topology snapshots history. */ - private Map> topHist; - - /** Marshalled {@link #topHist}. */ @Order(4) - @GridToStringExclude - @Nullable byte[] topHistBytes; + Map> topHistMsgs; /** Start time of the first grid node. */ @Order(5) - long gridStartTime; + public long gridStartTime; /** Constructor for {@link DiscoveryMessageFactory}. */ public TcpDiscoveryNodeAddedMessage() { @@ -112,7 +97,7 @@ public TcpDiscoveryNodeAddedMessage( assert node != null; assert gridStartTime > 0; - this.node = node; + nodeMsg = new TcpDiscoveryNodeMessage(node); this.dataPacket = dataPacket; this.gridStartTime = gridStartTime; } @@ -123,14 +108,11 @@ public TcpDiscoveryNodeAddedMessage( public TcpDiscoveryNodeAddedMessage(TcpDiscoveryNodeAddedMessage msg) { super(msg); - node = msg.node; - nodeBytes = msg.nodeBytes; + nodeMsg = msg.nodeMsg; pendingMsgsMsg = msg.pendingMsgsMsg; - top = msg.top; - topBytes = msg.topBytes; + topMsgs = msg.topMsgs; clientTop = msg.clientTop; - topHist = msg.topHist; - topHistBytes = msg.topHistBytes; + topHistMsgs = msg.topHistMsgs; dataPacket = msg.dataPacket; gridStartTime = msg.gridStartTime; } @@ -141,7 +123,7 @@ public TcpDiscoveryNodeAddedMessage(TcpDiscoveryNodeAddedMessage msg) { * @return New node. */ public TcpDiscoveryNode node() { - return node; + return new TcpDiscoveryNode(nodeMsg); } /** @@ -159,7 +141,7 @@ public TcpDiscoveryNode node() { * @param msgs Pending messages to send to new node. */ public void messages(@Nullable Collection msgs) { - pendingMsgsMsg = F.isEmpty(msgs) ? null : new TcpDiscoveryCollectionMessage(msgs); + pendingMsgsMsg = msgs == null ? null : new TcpDiscoveryCollectionMessage(msgs); } /** @@ -168,7 +150,7 @@ public void messages(@Nullable Collection msgs) { * @return Current topology. */ @Nullable public Collection topology() { - return top; + return topMsgs == null ? null : topMsgs.stream().map(TcpDiscoveryNode::new).collect(Collectors.toList()); } /** @@ -177,8 +159,9 @@ public void messages(@Nullable Collection msgs) { * @param top Current topology. */ public void topology(@Nullable Collection top) { - this.top = top; - topBytes = null; + topMsgs = top == null + ? null + : top.stream().map(TcpDiscoveryNodeMessage::new).collect(Collectors.toList());; } /** @@ -203,7 +186,15 @@ public Collection clientTopology() { * @return Map with topology snapshots history. */ public Map> topologyHistory() { - return topHist; + if (F.isEmpty(topMsgs)) + return Collections.emptyMap(); + + Map> res = U.newHashMap(topHistMsgs.size()); + + topHistMsgs.forEach((id, nodeMsgs) -> + res.put(id, nodeMsgs.stream().map(TcpDiscoveryNode::new).collect(Collectors.toList()))); + + return res; } /** @@ -212,8 +203,16 @@ public Map> topologyHistory() { * @param topHist Map with topology snapshots history. */ public void topologyHistory(@Nullable Map> topHist) { - this.topHist = topHist; - topHistBytes = null; + if (F.isEmpty(topHist)) { + topHistMsgs = null; + + return; + } + + topHistMsgs = U.newHashMap(topHist.size()); + + topHist.forEach((id, nodes) -> + topHistMsgs.put(id, nodes.stream().map(TcpDiscoveryNodeMessage::new).collect(Collectors.toList()))); } /** @@ -246,35 +245,6 @@ public long gridStartTime() { return gridStartTime; } - /** @param marsh marshaller. */ - @Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException { - if (node != null) - nodeBytes = U.marshal(marsh, node); - - if (top != null) - topBytes = U.marshal(marsh, top); - - if (topHist != null) - topHistBytes = U.marshal(marsh, topHist); - } - - /** {@inheritDoc} */ - @Override public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) throws IgniteCheckedException { - if (nodeBytes != null) - node = U.unmarshal(marsh, nodeBytes, clsLdr); - - if (topBytes != null) - top = U.unmarshal(marsh, topBytes, clsLdr); - - if (topHistBytes != null) - topHist = U.unmarshal(marsh, topHistBytes, clsLdr); - - nodeBytes = null; - topBytes = null; - topHistBytes = null; - } - - /** {@inheritDoc} */ @Override public short directType() { return 29; diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeMessage.java new file mode 100644 index 0000000000000..e40e6a166ccaf --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeMessage.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.tcp.messages; + +import java.util.Collection; +import java.util.Map; +import java.util.UUID; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.Order; +import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.marshaller.Marshaller; +import org.apache.ignite.plugin.extensions.communication.MarshallableMessage; +import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode; +import org.jetbrains.annotations.Nullable; + +/** Message for {@link TcpDiscoveryNode}. */ +public class TcpDiscoveryNodeMessage implements MarshallableMessage { + /** Node ID. */ + @Order(0) + public UUID id; + + /** */ + public Object consistentId; + + /** Serialized {@link #consistentId}. */ + @Order(1) + byte[] consistentIdBytes; + + /** */ + public Map attrs; + + /** */ + @Order(2) + byte[] attrsBytes; + + /** Internal discovery addresses as strings. */ + @Order(3) + public Collection addrs; + + /** Internal discovery host names as strings. */ + @Order(4) + public Collection hostNames; + + /** */ + @Order(5) + public int discPort; + + /** */ + @Order(6) + public TcpDiscoveryNodeMetricsMessage metricsMsg; + + /** */ + @Order(7) + public long order; + + /** */ + @Order(8) + public long intOrder; + + /** */ + @Order(9) + public IgniteProductVersionMessage verMsg; + + /** */ + @Order(10) + public @Nullable UUID clientRouterNodeId; + + /** Constructor for {@link DiscoveryMessageFactory}. */ + public TcpDiscoveryNodeMessage() { + // No-op. + } + + /** @param clusterNode Cluster node. */ + public TcpDiscoveryNodeMessage(ClusterNode clusterNode) { + assert clusterNode instanceof TcpDiscoveryNode; + + TcpDiscoveryNode n = (TcpDiscoveryNode)clusterNode; + + id = n.id(); + consistentId = n.consistentId(); + attrs = n.getAttributes(); + addrs = n.addresses(); + hostNames = n.hostNames(); + metricsMsg = new TcpDiscoveryNodeMetricsMessage(n.metrics()); + order = n.order(); + verMsg = new IgniteProductVersionMessage(n.version()); + + discPort = n.discoveryPort(); + intOrder = n.internalOrder(); + clientRouterNodeId = n.clientRouterNodeId(); + } + + /** {@inheritDoc} */ + @Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException { + if (attrs != null) + attrsBytes = U.marshal(marsh, attrs); + + if (consistentId != null) + consistentIdBytes = U.marshal(marsh, consistentId); + } + + /** {@inheritDoc} */ + @Override public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) throws IgniteCheckedException { + if (attrsBytes != null) + attrs = U.unmarshal(marsh, attrsBytes, clsLdr); + + if (consistentIdBytes != null) + consistentId = U.unmarshal(marsh, consistentIdBytes, clsLdr); + + attrsBytes = null; + consistentIdBytes = null; + } + + /** {@inheritDoc} */ + @Override public short directType() { + return -117; + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentV2Test.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentV2Test.java index 5d8e966851c3c..f994cee551f1d 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentV2Test.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentV2Test.java @@ -29,7 +29,10 @@ import java.util.List; import java.util.Set; import java.util.UUID; + +import org.apache.ignite.cluster.ClusterMetrics; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.ClusterMetricsSnapshot; import org.apache.ignite.internal.util.collection.BitSetIntSet; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteProductVersion; @@ -49,7 +52,11 @@ */ public class GridAffinityAssignmentV2Test { /** */ - protected DiscoveryMetricsProvider metrics = new SerializableMetricsProvider(); + protected DiscoveryMetricsProvider metrics = new SerializableMetricsProvider() { + @Override public ClusterMetrics metrics() { + return new ClusterMetricsSnapshot(); + } + }; /** */ protected IgniteProductVersion ver = new IgniteProductVersion(); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/security/NodeSecurityContextPropagationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/NodeSecurityContextPropagationTest.java index 43cb4a6dcae25..05aa8b6e27809 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/security/NodeSecurityContextPropagationTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/NodeSecurityContextPropagationTest.java @@ -170,7 +170,7 @@ public void testProcessCustomDiscoveryMessageFromLeftNode() throws Exception { private boolean isDiscoveryNodeAddedMessage(Object msg, int joiningNdeIdx) { return msg instanceof TcpDiscoveryNodeAddedMessage && Objects.equals(getTestIgniteInstanceName(joiningNdeIdx), - ((TcpDiscoveryNodeAddedMessage)msg).node().attribute(ATTR_IGNITE_INSTANCE_NAME)); + ((TcpDiscoveryNodeAddedMessage)msg).nodeMsg.attrs.get(ATTR_IGNITE_INSTANCE_NAME)); } /** */ diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java index 8bc4664977ad3..14f403440e06a 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java @@ -2684,7 +2684,7 @@ public void resumeAll() { if (delayJoinAckFor != null && msg instanceof TcpDiscoveryJoinRequestMessage) { TcpDiscoveryJoinRequestMessage msg0 = (TcpDiscoveryJoinRequestMessage)msg; - if (delayJoinAckFor.equals(msg0.node().attribute(IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME))) { + if (delayJoinAckFor.equals(msg0.nodeMsg.attrs.get(IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME))) { log.info("Delay response [sock=" + sock + ", msg=" + msg0 + ", res=" + res + ']'); delayJoinAckFor = null; diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryDeadNodeAddressResolvingTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryDeadNodeAddressResolvingTest.java index e07aee83326bf..ddd01e7c66482 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryDeadNodeAddressResolvingTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryDeadNodeAddressResolvingTest.java @@ -100,8 +100,11 @@ private void checkSockAddrsNull(GridDiscoveryManager disco, int topology, String ClusterNode clusterNode = node.get(); - Object sockAddrs = GridTestUtils.getFieldValue(clusterNode, "sockAddrs"); - assertNull(sockAddrs); + if (clusterNode instanceof TcpDiscoveryNode) { + Object sockAddrs = GridTestUtils.getFieldValue(clusterNode, "sockAddrs"); + + assertNull(sockAddrs); + } } /** diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java index 4175f0b61fa87..5e6a96d33ad84 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java @@ -58,6 +58,7 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHandshakeRequest; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHandshakeResponse; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeMessage; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.ListeningTestLogger; import org.apache.ignite.testframework.LogListener; @@ -693,7 +694,7 @@ private static final class TestTcpDiscoveryNode extends TcpDiscoveryNode { * @param simulatedAddrs Simulated addresses of {@code node} */ public TestTcpDiscoveryNode(TcpDiscoveryNode node, Collection simulatedAddrs) { - super(node); + super(new TcpDiscoveryNodeMessage(node)); setAttributes(node.attributes()); diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNodeJoinAndFailureTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNodeJoinAndFailureTest.java index f449be13c8e85..e31045a241cad 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNodeJoinAndFailureTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNodeJoinAndFailureTest.java @@ -152,7 +152,7 @@ Test reproduces the needed behavior (two nodes in CONNECTING state) doing the fo if (msg instanceof TcpDiscoveryJoinRequestMessage) { TcpDiscoveryJoinRequestMessage joinReq = (TcpDiscoveryJoinRequestMessage)msg; - if (joinReq.node().id().equals(node2Id)) + if (joinReq.nodeMsg.id.equals(node2Id)) joinReqsCntr.incrementAndGet(); } } @@ -195,14 +195,14 @@ Test reproduces the needed behavior (two nodes in CONNECTING state) doing the fo int joinReqsCnt = joinReqsCntr.get(); - if (joinReq.node().id().equals(node2Id) && joinReqsCnt == 1) + if (joinReq.nodeMsg.id.equals(node2Id) && joinReqsCnt == 1) throw new RuntimeException("Stop node1 exception by subsequent join req"); } if (msg instanceof TcpDiscoveryNodeAddedMessage) { TcpDiscoveryNodeAddedMessage addedMsg = (TcpDiscoveryNodeAddedMessage)msg; - if (addedMsg.node().discoveryPort() == 47503) + if (addedMsg.nodeMsg.discPort == 47503) throw new RuntimeException("Stop node1 exception by new node added msg"); } } diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java index 95494a0bcdb30..751550d816f8c 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java @@ -17,7 +17,6 @@ package org.apache.ignite.spi.discovery.tcp; -import java.io.ByteArrayOutputStream; import java.io.IOException; import java.net.InetSocketAddress; import java.net.SocketTimeoutException; @@ -331,38 +330,6 @@ public void testThreeNodesStartStop() throws Exception { } } - /** - * @throws Exception If any errors occur. - */ - @Test - public void testNodeConnectMessageSize() throws Exception { - try { - Ignite g1 = startGrid(1); - - final AtomicInteger igniteInstanceNameIdx = new AtomicInteger(1); - - GridTestUtils.runMultiThreaded(new Callable() { - @Nullable @Override public Object call() throws Exception { - startGrid(igniteInstanceNameIdx.incrementAndGet()); - - return null; - } - }, 4, "grid-starter"); - - Collection nodes = ((ServerImpl)discoMap.get(g1.name()).impl).ring().allNodes(); - - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - - marshaller(g1).marshal(nodes, bos); - - info(">>> Approximate node connect message size [topSize=" + nodes.size() + - ", msgSize=" + bos.size() / 1024.0 + "KB]"); - } - finally { - stopAllGrids(false); - } - } - /** * @throws Exception If any error occurs. */ diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java index 7ccf15f67733f..caed628d1d61b 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java @@ -106,7 +106,7 @@ @SuppressWarnings("TransientFieldInNonSerializableClass") public class IgniteProcessProxy implements IgniteEx { /** Grid proxies. */ - private static final transient ConcurrentMap gridProxies = new ConcurrentHashMap<>(); + private static final ConcurrentMap gridProxies = new ConcurrentHashMap<>(); /** Property that specify alternative {@code JAVA_HOME}. */ private static final String TEST_MULTIJVM_JAVA_HOME = "test.multijvm.java.home";