Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@

import com.datastax.dse.driver.api.core.graph.FluentGraphStatement;
import com.datastax.oss.driver.api.core.ConsistencyLevel;
import com.datastax.oss.driver.api.core.RequestRoutingType;
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.cql.Statement;
import com.datastax.oss.driver.api.core.metadata.Node;
import edu.umd.cs.findbugs.annotations.Nullable;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Collections;
Expand Down Expand Up @@ -127,4 +129,10 @@ protected BytecodeGraphStatement newInstance(
readConsistencyLevel,
writeConsistencyLevel);
}

@Nullable
@Override
public RequestRoutingType getRequestType() {
return RequestRoutingType.REGULAR;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@

import com.datastax.dse.driver.api.core.graph.BatchGraphStatement;
import com.datastax.oss.driver.api.core.ConsistencyLevel;
import com.datastax.oss.driver.api.core.RequestRoutingType;
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableList;
import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Iterator;
Expand Down Expand Up @@ -151,4 +153,10 @@ protected BatchGraphStatement newInstance(
public Iterator<GraphTraversal> iterator() {
return this.traversals.iterator();
}

@Nullable
@Override
public RequestRoutingType getRequestType() {
return RequestRoutingType.REGULAR;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@

import com.datastax.dse.driver.api.core.graph.FluentGraphStatement;
import com.datastax.oss.driver.api.core.ConsistencyLevel;
import com.datastax.oss.driver.api.core.RequestRoutingType;
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.metadata.Node;
import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Map;
Expand Down Expand Up @@ -103,4 +105,10 @@ protected FluentGraphStatement newInstance(
public GraphTraversal<?, ?> getTraversal() {
return traversal;
}

@Nullable
@Override
public RequestRoutingType getRequestType() {
return RequestRoutingType.REGULAR;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.datastax.dse.driver.api.core.graph.ScriptGraphStatement;
import com.datastax.oss.driver.api.core.ConsistencyLevel;
import com.datastax.oss.driver.api.core.RequestRoutingType;
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.protocol.internal.util.collection.NullAllowingImmutableMap;
Expand Down Expand Up @@ -204,4 +205,10 @@ protected ScriptGraphStatement newInstance(
public String toString() {
return String.format("ScriptGraphStatement['%s', params: %s]", this.script, this.queryParams);
}

@Nullable
@Override
public RequestRoutingType getRequestType() {
return RequestRoutingType.REGULAR;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.datastax.oss.driver.api.core;

public enum RequestRoutingMethod {
REGULAR,
PRESERVE_REPLICA_ORDER,
TOKEN_BASED_REPLICA_SHUFFLING
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package com.datastax.oss.driver.api.core;

public enum RequestRoutingType {
REGULAR,
LWT
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package com.datastax.oss.driver.api.core.cql;

import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.RequestRoutingType;
import com.datastax.oss.driver.internal.core.cql.DefaultBatchStatement;
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableList;
import com.datastax.oss.driver.shaded.guava.common.collect.Iterables;
Expand Down Expand Up @@ -152,6 +153,8 @@ public BatchStatementBuilder clearStatements() {
@NonNull
public BatchStatement build() {
List<BatchableStatement<?>> statements = statementsBuilder.build();
RequestRoutingType routingType =
isLWT != null ? (isLWT ? RequestRoutingType.LWT : RequestRoutingType.REGULAR) : null;
return new DefaultBatchStatement(
batchType,
statements,
Expand All @@ -172,7 +175,7 @@ public BatchStatement build() {
timeout,
node,
nowInSeconds,
isLWT);
routingType);
}

public int getStatementsCount() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@

import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.DefaultProtocolVersion;
import com.datastax.oss.driver.api.core.RequestRoutingMethod;
import com.datastax.oss.driver.api.core.RequestRoutingType;
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
import com.datastax.oss.driver.api.core.config.DriverConfig;
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
Expand Down Expand Up @@ -199,4 +201,18 @@ default Partitioner getPartitioner() {
/** @return The node configured on this statement, or null if none is configured. */
@Nullable
Node getNode();

/**
* Returns the routing type configured on this statement.
*
* @return The routing method configured on this statement, or {@link RequestRoutingType#REGULAR}
* if none is configured.
*/
@Nullable
RequestRoutingType getRequestType();

@Nullable
default RequestRoutingMethod getRoutingMethod() {
return RequestRoutingMethod.REGULAR;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,9 @@
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -208,14 +206,6 @@ public void onThrottleReady(boolean wasDelayed) {
Queue<Node> queryPlan;
if (this.initialStatement.getNode() != null) {
queryPlan = new SimpleQueryPlan(this.initialStatement.getNode());
} else if (this.initialStatement.isLWT()) {
queryPlan =
getReplicas(
session.getKeyspace().orElse(null),
this.initialStatement,
context
.getLoadBalancingPolicyWrapper()
.newQueryPlan(initialStatement, executionProfile.getName(), session));
} else {
queryPlan =
context
Expand All @@ -226,26 +216,6 @@ public void onThrottleReady(boolean wasDelayed) {
sendRequest(initialStatement, null, queryPlan, 0, 0, true);
}

private Queue<Node> getReplicas(
CqlIdentifier loggedKeyspace, Statement<?> statement, Queue<Node> fallback) {
Token routingToken = getRoutingToken(statement);
CqlIdentifier keyspace = statement.getKeyspace();
if (keyspace == null) {
keyspace = statement.getRoutingKeyspace();
if (keyspace == null) {
keyspace = loggedKeyspace;
}
}

TokenMap tokenMap = context.getMetadataManager().getMetadata().getTokenMap().orElse(null);
if (routingToken == null || keyspace == null || tokenMap == null) {
return fallback;
}

Set<Node> replicas = tokenMap.getReplicas(keyspace, routingToken);
return new ConcurrentLinkedQueue<>(replicas);
}

public CompletionStage<AsyncResultSet> handle() {
return result;
}
Expand Down
Loading
Loading