Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,12 @@ public enum ProtocolBitmaskFeature {
/**
* Client supports SQL_UPDATE_COUNTERS_2 error extension (single binary value instead of array).
*/
SQL_UPDATE_COUNTERS_2(18);
SQL_UPDATE_COUNTERS_2(18),

/**
* Allow rolling back direct transactions using the first request id.
*/
TX_ROLLBACK_USING_FIRST_REQUEST(19);

private static final EnumSet<ProtocolBitmaskFeature> ALL_FEATURES_AS_ENUM_SET =
EnumSet.allOf(ProtocolBitmaskFeature.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,7 @@ public void testServerReturnsAllItsFeatures() throws IOException {
expected.set(16);
expected.set(17);
expected.set(18);
expected.set(19);

assertEquals(expected, supportedFeatures);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ public class ClientHandlerModule implements IgniteComponent, PlatformComputeTran
ProtocolBitmaskFeature.TX_DIRECT_MAPPING_SEND_REMOTE_WRITES,
ProtocolBitmaskFeature.SQL_PARTITION_AWARENESS_TABLE_NAME,
ProtocolBitmaskFeature.TX_DIRECT_MAPPING_SEND_DISCARD,
ProtocolBitmaskFeature.SQL_UPDATE_COUNTERS_2
ProtocolBitmaskFeature.SQL_UPDATE_COUNTERS_2,
ProtocolBitmaskFeature.TX_ROLLBACK_USING_FIRST_REQUEST
));

/** Connection id generator.
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class ClientSqlExecuteBatchRequest {
* @param cancelHandleMap Registry of handlers. Request must register itself in this registry before switching to another
* thread.
* @param username Authenticated user name.
* @param reqToTxMap Tracker for first request of direct transactions.
* @return Future representing result of operation.
*/
public static CompletableFuture<ResponseWriter> process(
Expand All @@ -58,6 +59,7 @@ public static CompletableFuture<ResponseWriter> process(
long requestId,
Map<Long, CancelHandle> cancelHandleMap,
HybridTimestampTracker tsTracker,
Map<Long, Long> reqToTxMap,
String username
) {
CancelHandle cancelHandle = CancelHandle.create();
Expand All @@ -71,7 +73,9 @@ public static CompletableFuture<ResponseWriter> process(
null,
null,
null,
null
null,
requestId,
reqToTxMap
);

ClientSqlProperties props = new ClientSqlProperties(in, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public class ClientSqlExecuteRequest {
* transaction.
* @param notificationSender Notification sender is required to send acknowledge for underlying write operation within a remote
* transaction.
* @param reqToTxMap Tracker for first request of direct transactions.
* @param username Authenticated user name or {@code null} for unknown user.
* @return Future representing result of operation.
*/
Expand All @@ -97,6 +98,7 @@ public static CompletableFuture<ResponseWriter> process(
IgniteTables tables,
ClockService clockService,
NotificationSender notificationSender,
Map<Long, Long> reqToTxMap,
@Nullable String username,
boolean sqlMultistatementSupported,
boolean sqlPartitionAwarenessQualifiedNameSupported,
Expand All @@ -119,7 +121,9 @@ public static CompletableFuture<ResponseWriter> process(
txManager,
tables,
notificationSender,
resIdHolder
resIdHolder,
requestId,
reqToTxMap
);

ClientSqlProperties props = new ClientSqlProperties(in, sqlMultistatementSupported);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTx;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import org.apache.ignite.client.handler.ClientHandlerMetricSource;
Expand All @@ -43,6 +44,8 @@ public class ClientSqlQueryMetadataRequest {
* @param in Unpacker.
* @param processor SQL API.
* @param resources Resources.
* @param requestId Id of the request.
* @param reqToTxMap Tracker for first request of direct transactions.
* @return Future representing result of operation.
*/
public static CompletableFuture<ResponseWriter> process(
Expand All @@ -51,9 +54,21 @@ public static CompletableFuture<ResponseWriter> process(
QueryProcessor processor,
ClientResourceRegistry resources,
ClientHandlerMetricSource metrics,
HybridTimestampTracker tsTracker
HybridTimestampTracker tsTracker,
long requestId,
Map<Long, Long> reqToTxMap
) {
CompletableFuture<InternalTransaction> txFut = readTx(in, tsTracker, resources, metrics, null, null, null, null);
CompletableFuture<InternalTransaction> txFut = readTx(in,
tsTracker,
resources,
metrics,
null,
null,
null,
null,
requestId,
reqToTxMap
);

String schema = in.unpackString();
String query = in.unpackString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.BitSet;
import java.util.Collection;
import java.util.EnumSet;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.handler.ClientHandlerMetricSource;
Expand Down Expand Up @@ -422,6 +423,8 @@ public static TableNotFoundException tableIdNotFoundException(Integer tableId) {
* @param txManager Tx manager.
* @param notificationSender Notification sender.
* @param resourceIdHolder Resource id holder.
* @param requestId Id of the request.
* @param reqToTxMap Tracker for first request of direct transactions.
* @return Transaction, if present, or null.
*/
public static CompletableFuture<@Nullable InternalTransaction> readTx(
Expand All @@ -432,7 +435,9 @@ public static TableNotFoundException tableIdNotFoundException(Integer tableId) {
@Nullable TxManager txManager,
@Nullable IgniteTables tables,
@Nullable NotificationSender notificationSender,
long[] resourceIdHolder
long[] resourceIdHolder,
long requestId,
Map<Long, Long> reqToTxMap
) {
return readTx(
in,
Expand All @@ -443,6 +448,8 @@ public static TableNotFoundException tableIdNotFoundException(Integer tableId) {
tables,
notificationSender,
resourceIdHolder,
requestId,
reqToTxMap,
EnumSet.noneOf(RequestOptions.class)
);
}
Expand All @@ -456,6 +463,8 @@ public static TableNotFoundException tableIdNotFoundException(Integer tableId) {
* @param txManager Tx manager.
* @param notificationSender Notification sender.
* @param resourceIdHolder Resource id holder.
* @param requestId Id of the request.
* @param reqToTxMap Tracker for first request of direct transactions.
* @param options Request options. Defines how a request is processed.
* @return Transaction, if present, or null.
*/
Expand All @@ -468,6 +477,8 @@ public static TableNotFoundException tableIdNotFoundException(Integer tableId) {
@Nullable IgniteTables tables,
@Nullable NotificationSender notificationSender,
long[] resourceIdHolder,
long requestId,
Map<Long, Long> reqToTxMap,
EnumSet<RequestOptions> options
) {
if (in.tryUnpackNil()) {
Expand Down Expand Up @@ -509,11 +520,18 @@ public static TableNotFoundException tableIdNotFoundException(Integer tableId) {
});

InternalTxOptions txOptions = builder.build();
var tx = startExplicitTx(tsUpdater, txManager, HybridTimestamp.nullableHybridTimestamp(observableTs), readOnly, txOptions);
var tx = new DirectTransactionWithFirstRequest(
startExplicitTx(tsUpdater, txManager, HybridTimestamp.nullableHybridTimestamp(observableTs), readOnly, txOptions),
reqToTxMap,
requestId
);

// Attach resource id only on first direct request.
resourceIdHolder[0] = resources.put(new ClientResource(tx, tx::rollbackAsync));

// Record the mapping between first request and resourceId.
reqToTxMap.put(requestId, resourceIdHolder[0]);

metrics.transactionsActiveIncrement();

return completedFuture(tx);
Expand Down Expand Up @@ -596,9 +614,23 @@ static CompletableFuture<InternalTransaction> readOrStartImplicitTx(
IgniteTables tables,
EnumSet<RequestOptions> options,
@Nullable NotificationSender notificationSender,
long[] resourceIdHolder
long[] resourceIdHolder,
long requestId,
Map<Long, Long> reqToTxMap
) {
return readTx(in, readTs, resources, metrics, txManager, tables, notificationSender, resourceIdHolder, options)
return readTx(
in,
readTs,
resources,
metrics,
txManager,
tables,
notificationSender,
resourceIdHolder,
requestId,
reqToTxMap,
options
)
.thenApply(tx -> {
if (tx == null) {
// Implicit transactions do not use an observation timestamp because RW never depends on it,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static org.apache.ignite.client.handler.requests.table.ClientTupleRequestBase.RequestOptions.KEY_ONLY;

import java.util.EnumSet;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.handler.ClientHandlerMetricSource;
import org.apache.ignite.client.handler.ClientResourceRegistry;
Expand All @@ -47,6 +48,8 @@ public class ClientTupleContainsAllKeysRequest {
* @param txManager Transaction manager.
* @param clockService Clock service.
* @param tsTracker Tracker.
* @param requestId Id of the request.
* @param reqToTxMap Tracker for first request of direct transactions.
* @param supportsOptions {@code True} if supports tx options.
* @return Future.
*/
Expand All @@ -58,11 +61,13 @@ public static CompletableFuture<ResponseWriter> process(
TxManager txManager,
ClockService clockService,
HybridTimestampTracker tsTracker,
long requestId,
Map<Long, Long> reqToTxMap,
boolean supportsOptions
) {
EnumSet<RequestOptions> options = supportsOptions ? of(KEY_ONLY, HAS_OPTIONS) : of(KEY_ONLY);

return ClientTuplesRequestBase.readAsync(in, tables, resources, metrics, txManager, null, tsTracker, options)
return ClientTuplesRequestBase.readAsync(in, tables, resources, metrics, txManager, null, tsTracker, options, requestId, reqToTxMap)
.thenCompose(req -> req.table().recordView().containsAllAsync(req.tx(), req.tuples())
.thenApply(containsAll -> out -> {
writeTxMeta(out, tsTracker, clockService, req);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTxMeta;
import static org.apache.ignite.client.handler.requests.table.ClientTupleRequestBase.RequestOptions.KEY_ONLY;
import static org.apache.ignite.client.handler.requests.table.ClientTupleRequestBase.RequestOptions.READ_ONLY;
import static org.apache.ignite.client.handler.requests.table.ClientTupleRequestBase.readAsync;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.handler.ClientHandlerMetricSource;
import org.apache.ignite.client.handler.ClientResourceRegistry;
Expand All @@ -43,6 +45,8 @@ public class ClientTupleContainsKeyRequest {
* @param tables Ignite tables.
* @param resources Resource registry.
* @param txManager Transaction manager.
* @param requestId Id of the request.
* @param reqToTxMap Tracker for first request of direct transactions.
* @return Future.
*/
public static CompletableFuture<ResponseWriter> process(
Expand All @@ -52,9 +56,11 @@ public static CompletableFuture<ResponseWriter> process(
ClientHandlerMetricSource metrics,
TxManager txManager,
ClockService clockService,
HybridTimestampTracker tsTracker
HybridTimestampTracker tsTracker,
long requestId,
Map<Long, Long> reqToTxMap
) {
return ClientTupleRequestBase.readAsync(in, tables, resources, metrics, txManager, null, tsTracker, of(READ_ONLY, KEY_ONLY))
return readAsync(in, tables, resources, metrics, txManager, null, tsTracker, of(READ_ONLY, KEY_ONLY), requestId, reqToTxMap)
.thenCompose(req -> req.table().recordView().containsAsync(req.tx(), req.tuple())
.thenApply(res -> out -> {
writeTxMeta(out, tsTracker, clockService, req);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTxMeta;
import static org.apache.ignite.client.handler.requests.table.ClientTuplesRequestBase.readAsync;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.handler.ClientHandlerMetricSource;
import org.apache.ignite.client.handler.ClientResourceRegistry;
Expand All @@ -45,6 +46,8 @@ public class ClientTupleDeleteAllExactRequest {
* @param tables Ignite tables.
* @param resources Resource registry.
* @param txManager Ignite transactions.
* @param requestId Id of the request.
* @param reqToTxMap Tracker for first request of direct transactions.
* @return Future.
*/
public static CompletableFuture<ResponseWriter> process(
Expand All @@ -55,13 +58,27 @@ public static CompletableFuture<ResponseWriter> process(
TxManager txManager,
ClockService clockService,
NotificationSender notificationSender,
HybridTimestampTracker tsTracker
HybridTimestampTracker tsTracker,
long requestId,
Map<Long, Long> reqToTxMap
) {
return readAsync(in, tables, resources, metrics, txManager, notificationSender, tsTracker, noneOf(RequestOptions.class))
return readAsync(
in,
tables,
resources,
metrics,
txManager,
notificationSender,
tsTracker,
noneOf(RequestOptions.class),
requestId,
reqToTxMap
)
.thenCompose(req -> req.table().recordView().deleteAllExactAsync(req.tx(), req.tuples())
.thenApply(skippedTuples -> out -> {
writeTxMeta(out, tsTracker, clockService, req);
writeTuples(out, skippedTuples, req.table().schemaView());
}));
})
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTuples;
import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTxMeta;
import static org.apache.ignite.client.handler.requests.table.ClientTupleRequestBase.RequestOptions.KEY_ONLY;
import static org.apache.ignite.client.handler.requests.table.ClientTuplesRequestBase.readAsync;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.handler.ClientHandlerMetricSource;
import org.apache.ignite.client.handler.ClientResourceRegistry;
Expand All @@ -45,6 +47,8 @@ public class ClientTupleDeleteAllRequest {
* @param tables Ignite tables.
* @param resources Resource registry.
* @param txManager Ignite transactions.
* @param requestId Id of the request.
* @param reqToTxMap Tracker for first request of direct transactions.
* @return Future.
*/
public static CompletableFuture<ResponseWriter> process(
Expand All @@ -55,9 +59,11 @@ public static CompletableFuture<ResponseWriter> process(
TxManager txManager,
ClockService clockService,
NotificationSender notificationSender,
HybridTimestampTracker tsTracker
HybridTimestampTracker tsTracker,
long requestId,
Map<Long, Long> reqToTxMap
) {
return ClientTuplesRequestBase.readAsync(in, tables, resources, metrics, txManager, notificationSender, tsTracker, of(KEY_ONLY))
return readAsync(in, tables, resources, metrics, txManager, notificationSender, tsTracker, of(KEY_ONLY), requestId, reqToTxMap)
.thenCompose(req -> req.table().recordView().deleteAllAsync(req.tx(), req.tuples())
.thenApply(skippedTuples -> out -> {
writeTxMeta(out, tsTracker, clockService, req);
Expand Down
Loading