Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
[submodule "modules/accord"]
path = modules/accord
url = https://github.com/apache/cassandra-accord.git
branch = trunk
url = https://github.com/alanwang67/cassandra-accord.git
branch = deleteCommandStores
Original file line number Diff line number Diff line change
Expand Up @@ -988,9 +988,9 @@ UnfilteredRowIterator result(JournalKey journalKey, DecoratedKey partitionKey) t
if (read.kind() == Repeat && !hasWritten)
{
Invariants.require(lastImage != null);
write = new TopologyImage(read.epoch(), Image, lastImage.getUpdate());
write = new TopologyImage(read.epoch(), Image, lastImage.update());
}
else if (hasWritten && read.kind() == Repeat && lastImage.getUpdate().isEquivalent(read.getUpdate()))
else if (hasWritten && read.kind() == Repeat && lastImage.update().isEquivalent(read.update()))
{
write = read.asRepeat();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -405,12 +405,13 @@ public TopologyUpdate next()
logger.error("Encountered TopologyImage Repeat record for epoch {}, but no prior image record was found", ref.key().id.epoch());
return null;
}
prev = reader.read().asImage(Invariants.nonNull(prev.getUpdate()));
prev = reader.read().asImage(Invariants.nonNull(prev.update()));
}
else prev = reader.read();

return new TopologyUpdate(prev.getUpdate().commandStores,
prev.getUpdate().global);
return new TopologyUpdate(prev.update().commandStores,
prev.update().global,
prev.update().previouslyOwned);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import accord.api.Journal;
import accord.local.CommandStores;
import accord.local.CommandStores.PreviouslyOwned;
import accord.primitives.Ranges;
import accord.topology.Topology;
import accord.utils.Invariants;
Expand All @@ -48,7 +49,7 @@ public interface AccordTopologyUpdate
long epoch();
AccordTopologyUpdate asRepeat();

Journal.TopologyUpdate getUpdate();
Journal.TopologyUpdate update();
static AccordTopologyUpdate newTopology(Journal.TopologyUpdate update)
{
return new NewTopology(update);
Expand Down Expand Up @@ -97,12 +98,20 @@ public long serializedSize(CommandStores.RangesForEpoch from)

class TopologyUpdateSerializer implements UnversionedSerializer<Journal.TopologyUpdate>
{
private static final int TOP_BIT = 0x40000000;
public static final TopologyUpdateSerializer instance = new TopologyUpdateSerializer();

@Override
public void serialize(Journal.TopologyUpdate from, DataOutputPlus out) throws IOException
{
out.writeUnsignedVInt32(from.commandStores.size());
out.writeUnsignedVInt32(from.commandStores.size() | TOP_BIT);
out.writeUnsignedVInt32(0);
out.writeUnsignedVInt32(from.previouslyOwned.size());
for (int i = 0 ; i < from.previouslyOwned.size() ; ++i)
{
out.writeUnsignedVInt(from.previouslyOwned.epochs(i));
KeySerializers.ranges.serialize(from.previouslyOwned.ranges(i), out);
}
for (Map.Entry<Integer, CommandStores.RangesForEpoch> e : from.commandStores.entrySet())
{
out.writeUnsignedVInt32(e.getKey());
Expand All @@ -115,6 +124,23 @@ public void serialize(Journal.TopologyUpdate from, DataOutputPlus out) throws IO
public Journal.TopologyUpdate deserialize(DataInputPlus in) throws IOException
{
int commandStoresSize = in.readUnsignedVInt32();
int flags = 0;
PreviouslyOwned previouslyOwned = PreviouslyOwned.EMPTY;
if ((commandStoresSize & TOP_BIT) != 0)
{
commandStoresSize ^= TOP_BIT;
// future proofing
flags = in.readUnsignedVInt32();
int previouslyOwnedSize = in.readUnsignedVInt32();
long[] epochs = new long[previouslyOwnedSize];
Ranges[] ranges = new Ranges[previouslyOwnedSize];
for (int i = 0 ; i < previouslyOwnedSize ; ++i)
{
epochs[i] = in .readUnsignedVInt();
ranges[i] = KeySerializers.ranges.deserialize(in);
}
previouslyOwned = new PreviouslyOwned(epochs.length == 0 ? 0 : epochs[0], epochs, ranges);
}
Int2ObjectHashMap<CommandStores.RangesForEpoch> commandStores = new Int2ObjectHashMap<>();
for (int j = 0; j < commandStoresSize; j++)
{
Expand All @@ -123,13 +149,20 @@ public Journal.TopologyUpdate deserialize(DataInputPlus in) throws IOException
commandStores.put(commandStoreId, rangesForEpoch);
}
Topology global = TopologySerializers.compactTopology.deserialize(in);
return new Journal.TopologyUpdate(commandStores, global);
return new Journal.TopologyUpdate(commandStores, global, previouslyOwned);
}

@Override
public long serializedSize(Journal.TopologyUpdate from)
{
long size = TypeSizes.sizeofUnsignedVInt(from.commandStores.size());
long size = TypeSizes.sizeofUnsignedVInt(from.commandStores.size() | TOP_BIT);
size += TypeSizes.sizeofUnsignedVInt(0);
size += TypeSizes.sizeofUnsignedVInt(from.previouslyOwned.size());
for (int i = 0 ; i < from.previouslyOwned.size() ; ++i)
{
size += TypeSizes.sizeofUnsignedVInt(from.previouslyOwned.epochs(i));
size += KeySerializers.ranges.serializedSize(from.previouslyOwned.ranges(i));
}
for (Map.Entry<Integer, CommandStores.RangesForEpoch> e : from.commandStores.entrySet())
{
size += TypeSizes.sizeofUnsignedVInt(e.getKey());
Expand Down Expand Up @@ -289,7 +322,7 @@ public long epoch()
}

@Override
public Journal.TopologyUpdate getUpdate()
public Journal.TopologyUpdate update()
{
return update;
}
Expand Down Expand Up @@ -350,7 +383,7 @@ public long epoch()
}

@Override
public Journal.TopologyUpdate getUpdate()
public Journal.TopologyUpdate update()
{
return update;
}
Expand Down Expand Up @@ -413,7 +446,7 @@ public TopologyImage read()
public void read(AccordTopologyUpdate update)
{
if (Objects.requireNonNull(update.kind()) == Kind.New)
read = new TopologyImage(update.epoch(), Kind.Image, update.getUpdate());
read = new TopologyImage(update.epoch(), Kind.Image, update.update());
else
read = (TopologyImage) update;
write = read;
Expand Down
74 changes: 74 additions & 0 deletions src/java/org/apache/cassandra/tcm/sequences/Move.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,22 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;

import javax.annotation.Nullable;

import com.google.common.annotations.VisibleForTesting;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import accord.api.TopologyListener;
import accord.primitives.Ranges;
import accord.primitives.Routables;
import accord.topology.ActiveEpoch;
import accord.topology.ActiveEpochs;
import accord.topology.EpochReady;
import accord.topology.Topology;
import accord.topology.TopologyManager;
import accord.topology.TopologyManager.RegainingEpochRange;

import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.SystemKeyspace;
Expand All @@ -53,6 +63,7 @@
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.service.accord.AccordService;
import org.apache.cassandra.service.accord.AccordTopology;
import org.apache.cassandra.streaming.StreamOperation;
import org.apache.cassandra.streaming.StreamPlan;
import org.apache.cassandra.streaming.StreamResultFuture;
Expand All @@ -73,17 +84,21 @@
import org.apache.cassandra.tcm.transformations.PrepareMove;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.concurrent.Condition;
import org.apache.cassandra.utils.concurrent.Future;
import org.apache.cassandra.utils.concurrent.FutureCombiner;
import org.apache.cassandra.utils.vint.VIntCoding;

import static accord.primitives.AbstractRanges.UnionMode.MERGE_ADJACENT;
import static accord.primitives.Routables.Slice.Minimal;
import static com.google.common.collect.ImmutableList.of;
import static org.apache.cassandra.tcm.MultiStepOperation.Kind.MOVE;
import static org.apache.cassandra.tcm.Transformation.Kind.FINISH_MOVE;
import static org.apache.cassandra.tcm.Transformation.Kind.MID_MOVE;
import static org.apache.cassandra.tcm.Transformation.Kind.START_MOVE;
import static org.apache.cassandra.tcm.sequences.SequenceState.continuable;
import static org.apache.cassandra.tcm.sequences.SequenceState.error;
import static org.apache.cassandra.utils.concurrent.Condition.newOneTimeCondition;

public class Move extends MultiStepOperation<Epoch>
{
Expand Down Expand Up @@ -194,15 +209,69 @@ public Transformation.Result applyTo(ClusterMetadata metadata)
return applyMultipleTransformations(metadata, next, of(startMove, midMove, finishMove));
}

/**
 * Topology listener that accumulates retired ranges and signals {@link #condition}
 * once every range in {@link #waitingForRanges} has been reported as retired.
 *
 * Only retirement notifications for epochs at or after {@link #waitingForEpoch}
 * are taken into account by {@link #onEpochRetired}.
 */
static class WaitForEpochAndRangeRetirement implements TopologyListener
{
    /** Signalled once the accumulated retired ranges contain all of {@link #waitingForRanges}. */
    final Condition condition = newOneTimeCondition();
    final long waitingForEpoch;
    final Ranges waitingForRanges;
    /** Subset of {@link #waitingForRanges} seen as retired so far; mutated only under this object's monitor. */
    Ranges retiredRanges = Ranges.EMPTY;

    public WaitForEpochAndRangeRetirement(long waitingForEpoch, Ranges waitingForRanges)
    {
        this.waitingForEpoch = waitingForEpoch;
        this.waitingForRanges = waitingForRanges;
    }

    /**
     * Folds {@code ranges} into the retired set and signals the condition when the set
     * covers everything being waited for.
     *
     * @param ranges ranges reported as retired; only the part intersecting
     *               {@link #waitingForRanges} and not already recorded is used
     */
    synchronized void updateRetiredRanges(Ranges ranges)
    {
        // Restrict to the ranges we care about, then drop what has already been recorded.
        Ranges relevant = ranges.slice(waitingForRanges, Minimal);
        Ranges newlyRetired = relevant.without(retiredRanges);
        if (newlyRetired.isEmpty())
            return;

        retiredRanges = retiredRanges.union(MERGE_ADJACENT, newlyRetired);
        if (retiredRanges.containsAll(waitingForRanges))
            condition.signal();
    }

    /**
     * Records the retired ranges when the notification is for an epoch at or after
     * {@link #waitingForEpoch}; notifications for earlier epochs are ignored.
     */
    @Override
    public synchronized void onEpochRetired(Ranges ranges, long epoch, @Nullable Topology topology)
    {
        if (epoch < waitingForEpoch)
            return;

        updateRetiredRanges(ranges);
    }
}

@Override
public SequenceState executeNext()
{
switch (next)
{
case START_MOVE:
WaitForEpochAndRangeRetirement wait = null;

try
{
ClusterMetadata metadata = ClusterMetadata.current();
TopologyManager topologyManager = AccordService.instance().topology();
AccordService.toFuture(topologyManager.await(metadata.epoch.getEpoch() - 1, null))
.awaitThrowUncheckedOnInterrupt().rethrowIfFailed();

ActiveEpochs activeEpochs = topologyManager.active();
Topology current = activeEpochs.globalForEpoch(metadata.epoch.getEpoch() - 1);
RegainingEpochRange regaining = topologyManager.computeRegaining(current, AccordTopology.createAccordTopology(applyTo(metadata).success().metadata));

if (regaining != null)
{
    // Block the move until the ranges being regained have been retired for the regaining epoch.
    wait = new WaitForEpochAndRangeRetirement(regaining.epoch(), regaining.ranges());
    topologyManager.addListener(wait);

    // Seed the listener with whatever the epoch already reports as retired, so that a
    // retirement recorded before the listener was registered is not missed.
    // NOTE(review): activeEpochs was obtained before addListener; confirm it cannot be stale here.
    ActiveEpoch e = activeEpochs.ifExists(regaining.epoch());
    if (e != null)
        wait.updateRetiredRanges(e.retired());

    // Await the listener's own condition: it is the one updateRetiredRanges signals.
    // Awaiting a separate, locally created condition would never return.
    wait.condition.awaitThrowUncheckedOnInterrupt();
}

logger.info("Moving {} from {} to {}.",
metadata.directory.endpoint(startMove.nodeId()),
metadata.tokenMap.tokens(startMove.nodeId()),
Expand All @@ -214,6 +283,11 @@ public SequenceState executeNext()
JVMStabilityInspector.inspectThrowable(t);
return continuable();
}
finally
{
if (wait != null)
AccordService.instance().topology().removeListener(wait);
}
break;
case MID_MOVE:
try
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.distributed.test.accord;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import accord.local.CommandStore;
import accord.local.PreLoadContext;
import accord.primitives.Ranges;
import org.apache.cassandra.distributed.api.Feature;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.service.accord.AccordService;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.cassandra.service.accord.AccordService.getBlocking;
import static com.google.common.collect.Iterables.getOnlyElement;
import static org.junit.Assert.assertFalse;

import org.junit.BeforeClass;
import org.junit.Test;

/**
 * In-JVM distributed test that moves node 2's token away and back again, and checks that
 * command stores which had no current ranges after the first move are no longer present
 * on node 2 after the second move.
 */
public class AccordDeleteCommandStoreTest extends AccordTestBase
{
    // Use this test's own class so log output is attributed to the right test.
    private static final Logger logger = LoggerFactory.getLogger(AccordDeleteCommandStoreTest.class);

    @Override
    protected Logger logger()
    {
        return logger;
    }

    /**
     * Starts a 6-node cluster without vnodes, with networking and gossip enabled and the
     * Accord shard durability settings overridden for the test.
     */
    @BeforeClass
    public static void setupClass() throws IOException
    {
        AccordTestBase.setupCluster(builder -> builder
                                               .withoutVNodes()
                                               .withConfig(config ->
                                                           config
                                                           .set("accord.shard_durability_target_splits", "1")
                                                           .set("accord.shard_durability_cycle", "20s")
                                                           .with(Feature.NETWORK, Feature.GOSSIP)), 6);
    }

    /**
     * Moves node 2 to (node 1's token + 100), records the ids of command stores on node 2
     * whose current ranges are empty, moves node 2 back to its original token, and asserts
     * that none of the recorded command stores is still listed on node 2.
     */
    @Test
    public void deleteCommandStoresTest() throws Throwable
    {
        List<String> ddls = Arrays.asList("DROP KEYSPACE IF EXISTS " + KEYSPACE + ';',
                                          "CREATE KEYSPACE " + KEYSPACE + " WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor': 1}",
                                          "CREATE TABLE " + qualifiedAccordTableName + " (k int PRIMARY KEY, v int) WITH transactional_mode='full'");
        test(ddls, cluster -> {
            // newToken is node 1's token (the move target is derived from it below);
            // originalToken is node 2's token before any move.
            String newToken = cluster.get(1).callOnInstance(() -> getOnlyElement(StorageService.instance.getTokens()));
            String originalToken = cluster.get(2).callOnInstance(() -> getOnlyElement(StorageService.instance.getTokens()));

            cluster.get(2).runOnInstance(() -> {
                StorageService.instance.move(Long.toString(Long.parseLong(newToken) + 100));
            });

            cluster.get(2).runOnInstance(() -> {
                Set<Integer> commandStoresThatWillBeRemoved = new HashSet<>();

                // Collect the command stores that hold no current ranges after the first move.
                // NOTE(review): if none are empty the assertions below pass vacuously;
                // consider also asserting that this set is non-empty.
                for (CommandStore commandStore : AccordService.instance().node().commandStores().all())
                {
                    Ranges ranges = getBlocking(commandStore.submit((PreLoadContext.Empty) () -> "Get rangesForEpoch", safeCommandStore -> safeCommandStore.ranges().currentRanges()));

                    if (ranges.isEmpty())
                        commandStoresThatWillBeRemoved.add(commandStore.id());
                }

                StorageService.instance.move(originalToken);

                // After moving back, none of the previously empty command stores may still be listed.
                for (CommandStore commandStore : AccordService.instance().node().commandStores().all())
                {
                    assertFalse("Command store " + commandStore.id() + " should have been removed",
                                commandStoresThatWillBeRemoved.contains(commandStore.id()));
                }
            });
        });
    }
}

Loading