Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions src/java/org/apache/cassandra/tcm/transformations/Register.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,24 @@ public Kind kind()
@Override
public Result execute(ClusterMetadata prev)
{
// Ensure the joining node can read existing cluster metadata.
// Skip check for empty directory (first node in a new cluster).
if (!prev.directory.isEmpty())
{
Version clusterVersion = prev.directory.commonSerializationVersion;
Version newNodeVersion = version.serializationVersion();
if (newNodeVersion.isBefore(clusterVersion))
{
return new Rejected(INVALID,
String.format("Cannot register node: this node's metadata serialization version %s " +
"is lower than the cluster's minimum required version %s. " +
"Node would not be able to read cluster metadata. " +
"Please upgrade the node to a Cassandra version that supports " +
"metadata serialization version %s or higher before joining the cluster.",
newNodeVersion, clusterVersion, clusterVersion));
}
}

for (Map.Entry<NodeId, NodeAddresses> entry : prev.directory.addresses.entrySet())
{
NodeAddresses existingAddresses = entry.getValue();
Expand Down
15 changes: 15 additions & 0 deletions src/java/org/apache/cassandra/tcm/transformations/Startup.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,21 @@ public Kind kind()
@Override
public Result execute(ClusterMetadata prev)
{
// Prevent downgrade to a version that cannot read cluster metadata.
// This protects against restarting a node with an older binary.
Version clusterVersion = prev.directory.commonSerializationVersion;
Version newNodeVersion = nodeVersion.serializationVersion();
if (newNodeVersion.isBefore(clusterVersion))
{
return new Rejected(INVALID,
String.format("Cannot start node: this node's metadata serialization version %s " +
"is lower than the cluster's minimum required version %s. " +
"Node would not be able to read cluster metadata. " +
"Please upgrade the node to a Cassandra version that supports " +
"metadata serialization version %s or higher before restarting.",
newNodeVersion, clusterVersion, clusterVersion));
}

ClusterMetadata.Transformer next = prev.transformer();
if (!prev.directory.addresses.get(nodeId).equals(addresses))
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.distributed.test.log;

import net.bytebuddy.ByteBuddy;
import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
import net.bytebuddy.implementation.MethodDelegation;

import org.junit.Assert;
import org.junit.Test;

import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.test.TestBaseImpl;
import org.apache.cassandra.tcm.membership.NodeVersion;
import org.apache.cassandra.tcm.serialization.Version;

import static net.bytebuddy.matcher.ElementMatchers.named;
import static org.apache.cassandra.tcm.membership.NodeVersion.CURRENT_METADATA_VERSION;

public class IncompatibleMetadataSerializationVersionTest extends TestBaseImpl
{
@Test
public void incompatibleVersionsCauseStartupFailureTest() throws Throwable
{
try (Cluster cluster = builder().withNodes(2)
.withInstanceInitializer(BB::install)
.createWithoutStarting())
{
cluster.get(1).startup();
// node1 has joined as normal so any entries committed to the metadata log will be serialized with
// NodeVersion.CURRENT_METADATA_VERSION. We will join node2, but the BB class used as an instanceInitializer
// will force it not to recognise this version. This simulates a node running an older, incompatible version
// attempting to join the cluster and should fail as the metadata log and snapshots it receives at startup
// are unreadable to it.
// We'll also set up the uncaught exceptions filter so that errors reported by node2 do not automatically
// trigger a failure, so that we can assert that the specific error we're expecting is thrown and logged.
cluster.setUncaughtExceptionsFilter((i, t) -> i != 2);
try
{
cluster.get(2).startup();
Assert.fail("Node2 startup should fail due to unsupported metadata versions");
}
catch (Exception e)
{
String expectedError = String.format("Unsupported metadata version \\(%s\\)", CURRENT_METADATA_VERSION.asInt());
Assert.assertFalse(cluster.get(2)
.logs()
.grep(expectedError)
.getResult()
.isEmpty());
}
}
}

public static class BB
{
static void install(ClassLoader cl, int node)
{
// only change behaviour of node2
if (node == 2)
{
new ByteBuddy().rebase(Version.class)
.method(named("fromInt"))
.intercept(MethodDelegation.to(BB.class))
.make()
.load(cl, ClassLoadingStrategy.Default.INJECTION);

new ByteBuddy().rebase(NodeVersion.class)
.method(named("serializationVersion"))
.intercept(MethodDelegation.to(BB.class))
.make()
.load(cl, ClassLoadingStrategy.Default.INJECTION);
}
}

public static Version serializationVersion()
{
// This is called during node startup when initializing the LogState class and in particular its static
// defaultMessageSerializer field. We will emulate the behaviour of a node running an old version.
return Version.V0;
}

public static Version fromInt(int i)
{
// Behave as if the supplied version is invalid, unless it is the V0 value we are returning from the other
// intercepted method. This will cause any other version encountered, such as when receiving versioned log
// entries from another node, to appear unreadable.
if (i == Version.V0.asInt())
return Version.V0;

throw new IllegalArgumentException("Unsupported metadata version (" + i + ")");
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import org.apache.cassandra.tcm.serialization.Version;
import org.apache.cassandra.tcm.transformations.PrepareLeave;
import org.apache.cassandra.tcm.transformations.Register;
import org.apache.cassandra.tcm.transformations.Startup;
import org.apache.cassandra.tcm.transformations.TriggerSnapshot;
import org.apache.cassandra.tcm.transformations.Unregister;
import org.apache.cassandra.utils.CassandraVersion;
Expand All @@ -58,6 +57,8 @@

public class RegisterTest extends TestBaseImpl
{
private static final Location TEST_LOCATION = new Location("datacenter1", "rack1");

@Test
public void testRegistrationIdempotence() throws Throwable
{
Expand Down Expand Up @@ -103,28 +104,28 @@ public void serializationVersionCeilingTest() throws Throwable
try (Cluster cluster = builder().withNodes(1)
.createWithoutStarting())
{
final String firstNodeEndpoint = "127.0.0.10";
cluster.get(1).startup();
cluster.get(1).runOnInstance(() -> {
try
{
// Register a ghost node with V0 to fake-force V0 serialization. In a real world cluster we will always be upgrading from a smaller version.
ClusterMetadataService.instance().commit(new Register(new NodeAddresses(InetAddressAndPort.getByName(firstNodeEndpoint)),
ClusterMetadata.current().directory.location(ClusterMetadata.current().myNodeId()),
// Unregister to make directory empty
ClusterMetadataService.instance().commit(new Unregister(ClusterMetadata.current().myNodeId(),
EnumSet.allOf(NodeState.class),
ClusterMetadataService.instance().placementProvider()));

// Register a ghost node with V0 (bypasses version check because directory is now empty).
// In a real world cluster we will always be upgrading from a smaller version.
ClusterMetadataService.instance().commit(new Register(new NodeAddresses(InetAddressAndPort.getByName("127.0.0.100")),
TEST_LOCATION,
new NodeVersion(NodeVersion.CURRENT.cassandraVersion, Version.V0)));
NodeId oldNode = ClusterMetadata.current().directory.peerId(InetAddressAndPort.getByName(firstNodeEndpoint));
// Fake an upgrade of this node and assert we continue to serialize so that the one which only
// supports V0 can deserialize. In a real cluster it wouldn't happen exactly in this way (here the
// min serialization version actually goes backwards from CURRENT to V0 when we upgrade, which would
// not happen in a real cluster as we would never register like oldNode, with the current C* version
// but an older metadata version
NodeId oldNode = ClusterMetadata.current().directory.peerId(InetAddressAndPort.getByName("127.0.0.100"));

// Register a node with upgraded version
CassandraVersion currentVersion = NodeVersion.CURRENT.cassandraVersion;
NodeVersion upgraded = new NodeVersion(new CassandraVersion(String.format("%d.%d.%d", currentVersion.major + 1, 0, 0)),
NodeVersion.CURRENT_METADATA_VERSION);
ClusterMetadata metadata = ClusterMetadata.current();
NodeId id = metadata.myNodeId();
Startup startup = new Startup(id, metadata.directory.getNodeAddresses(id), upgraded);
ClusterMetadataService.instance().commit(startup);
ClusterMetadataService.instance().commit(new Register(new NodeAddresses(InetAddressAndPort.getByName("127.0.0.200")), TEST_LOCATION, upgraded));

// Doesn't matter which specific Transformation we use here, we're testing that the serializer uses
// the correct lower bound
Transformation t = new Register(NodeAddresses.current(), new Location("DC", "RACK"), NodeVersion.CURRENT);
Expand Down Expand Up @@ -173,9 +174,15 @@ public void replayLocallyFromV0Snapshot() throws Throwable
cluster.get(1).runOnInstance(() -> {
try
{
// Register a ghost node with V0 to fake-force V0 serialization. In a real world cluster we will always be upgrading from a smaller version.
ClusterMetadataService.instance().commit(new Register(new NodeAddresses(InetAddressAndPort.getByName("127.0.0.10")),
ClusterMetadata.current().directory.location(ClusterMetadata.current().myNodeId()),
// Unregister to make directory empty
ClusterMetadataService.instance().commit(new Unregister(ClusterMetadata.current().myNodeId(),
EnumSet.allOf(NodeState.class),
ClusterMetadataService.instance().placementProvider()));

// Register a ghost node with V0 (bypasses version check because directory is now empty).
// In a real world cluster we will always be upgrading from a smaller version.
ClusterMetadataService.instance().commit(new Register(new NodeAddresses(InetAddressAndPort.getByName("127.0.0.100")),
TEST_LOCATION,
new NodeVersion(NodeVersion.CURRENT.cassandraVersion, Version.V0)));
}
catch (UnknownHostException e)
Expand All @@ -187,9 +194,6 @@ public void replayLocallyFromV0Snapshot() throws Throwable
ClusterMetadata cm = new MetadataSnapshots.SystemKeyspaceMetadataSnapshots().getSnapshot(ClusterMetadata.current().epoch);
cm.equals(ClusterMetadata.current());
});


}
}

}
12 changes: 0 additions & 12 deletions test/unit/org/apache/cassandra/Util.java
Original file line number Diff line number Diff line change
Expand Up @@ -151,13 +151,8 @@
import org.apache.cassandra.streaming.StreamResultFuture;
import org.apache.cassandra.streaming.StreamState;
import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.tcm.membership.NodeAddresses;
import org.apache.cassandra.tcm.membership.NodeVersion;
import org.apache.cassandra.tcm.serialization.Version;
import org.apache.cassandra.tcm.transformations.Register;
import org.apache.cassandra.transport.ProtocolVersion;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.CassandraVersion;
import org.apache.cassandra.utils.CounterId;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.FilterFactory;
Expand Down Expand Up @@ -1205,13 +1200,6 @@ public static void disableBloomFilter(ColumnFamilyStore cfs)
assertEquals(0, ((SSTableReaderWithFilter) reader).getFilterOffHeapSize());
}

public static void setUpgradeFromVersion(String version)
{
InetAddressAndPort ep = InetAddressAndPort.getByNameUnchecked("127.0.0.10");
Register.register(new NodeAddresses(ep),
new NodeVersion(new CassandraVersion(version), Version.OLD));
}

/**
* Sets the length of the file to given size. File will be created if not exist.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,16 @@
import java.util.Collection;
import java.util.List;

import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import org.apache.cassandra.ServerTestUtils;
import org.apache.cassandra.cql3.CQLTester;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.exceptions.SyntaxException;
import org.apache.cassandra.tcm.membership.NodeVersion;

/* InsertUpdateIfConditionCollectionsTest class has been split into multiple ones because of timeout issues (CASSANDRA-16670)
* Any changes here check if they apply to the other classes
Expand All @@ -46,18 +45,16 @@ public class InsertUpdateIfConditionCollectionsTest extends CQLTester
@Parameterized.Parameters(name = "{index}: clusterMinVersion={0}")
public static Collection<Object[]> data()
{
ServerTestUtils.daemonInitialization();

return InsertUpdateIfConditionTest.data();
}

@Parameterized.Parameter(0)
public String clusterMinVersion;
public NodeVersion clusterMinVersion;

@BeforeClass
public static void beforeClass()
public static void setUpClass()
{
InsertUpdateIfConditionTest.beforeClass();
InsertUpdateIfConditionTest.setUpClass();
}

@Before
Expand All @@ -66,12 +63,6 @@ public void before()
InsertUpdateIfConditionTest.beforeSetup(clusterMinVersion);
}

@AfterClass
public static void afterClass()
{
InsertUpdateIfConditionTest.afterClass();
}

/**
* Migrated from cql_tests.py:TestCQL.bug_6069_test()
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import java.util.Collection;

import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
Expand All @@ -29,6 +28,7 @@

import org.apache.cassandra.ServerTestUtils;
import org.apache.cassandra.cql3.CQLTester;
import org.apache.cassandra.tcm.membership.NodeVersion;

/* InsertUpdateIfConditionCollectionsTest class has been split into multiple ones because of timeout issues (CASSANDRA-16670)
* Any changes here check if they apply to the other classes
Expand All @@ -47,12 +47,12 @@ public static Collection<Object[]> data()
}

@Parameterized.Parameter(0)
public String clusterMinVersion;
public NodeVersion clusterMinVersion;

@BeforeClass
public static void beforeClass()
public static void setUpClass()
{
InsertUpdateIfConditionTest.beforeClass();
InsertUpdateIfConditionTest.setUpClass();
}

@Before
Expand All @@ -61,12 +61,6 @@ public void before()
InsertUpdateIfConditionTest.beforeSetup(clusterMinVersion);
}

@AfterClass
public static void afterClass()
{
InsertUpdateIfConditionTest.afterClass();
}

/**
* Migrated from cql_tests.py:TestCQL.static_columns_cas_test()
*/
Expand Down
Loading