diff --git a/core/src/main/java/io/questdb/client/Sender.java b/core/src/main/java/io/questdb/client/Sender.java
index b1c0677f..dccefb86 100644
--- a/core/src/main/java/io/questdb/client/Sender.java
+++ b/core/src/main/java/io/questdb/client/Sender.java
@@ -1036,7 +1036,6 @@ final class LineSenderBuilder {
private int maxBackoffMillis = PARAMETER_NOT_SET_EXPLICITLY;
private int maxDatagramSize = PARAMETER_NOT_SET_EXPLICITLY;
private int maxNameLength = PARAMETER_NOT_SET_EXPLICITLY;
- private int maxSchemasPerConnection = PARAMETER_NOT_SET_EXPLICITLY;
private int maximumBufferCapacity = PARAMETER_NOT_SET_EXPLICITLY;
private final HttpClientConfiguration httpClientConfiguration = new DefaultHttpClientConfiguration() {
@Override
@@ -1369,8 +1368,6 @@ public Sender build() {
long actualAutoFlushIntervalNanos = autoFlushIntervalMillis == PARAMETER_NOT_SET_EXPLICITLY
? DEFAULT_WS_AUTO_FLUSH_INTERVAL_NANOS
: TimeUnit.MILLISECONDS.toNanos(autoFlushIntervalMillis);
- int actualMaxSchemasPerConnection = maxSchemasPerConnection == PARAMETER_NOT_SET_EXPLICITLY
- ? QwpWebSocketSender.DEFAULT_MAX_SCHEMAS_PER_CONNECTION : maxSchemasPerConnection;
String wsAuthHeader = buildWebSocketAuthHeader();
@@ -1499,7 +1496,6 @@ public Sender build() {
actualAutoFlushBytes,
actualAutoFlushIntervalNanos,
wsAuthHeader,
- actualMaxSchemasPerConnection,
requestDurableAck,
cursorEngine,
actualCloseFlushTimeoutMillis,
@@ -2217,25 +2213,6 @@ public LineSenderBuilder maxNameLength(int maxNameLength) {
return this;
}
- /**
- * Sets the maximum number of distinct schemas the WebSocket sender may assign on one connection.
- */
- public LineSenderBuilder maxSchemasPerConnection(int maxSchemasPerConnection) {
- if (protocol != PARAMETER_NOT_SET_EXPLICITLY && protocol != PROTOCOL_WEBSOCKET) {
- throw new LineSenderException("max schemas per connection is only supported for WebSocket transport");
- }
- if (this.maxSchemasPerConnection != PARAMETER_NOT_SET_EXPLICITLY) {
- throw new LineSenderException("max schemas per connection was already configured")
- .put("[maxSchemasPerConnection=").put(this.maxSchemasPerConnection).put("]");
- }
- if (maxSchemasPerConnection < 1) {
- throw new LineSenderException("max schemas per connection must be positive")
- .put("[maxSchemasPerConnection=").put(maxSchemasPerConnection).put("]");
- }
- this.maxSchemasPerConnection = maxSchemasPerConnection;
- return this;
- }
-
/**
* Minimum expected throughput in bytes per second for HTTP requests.
*
@@ -3154,13 +3131,6 @@ private LineSenderBuilder fromConfig(CharSequence configurationString) {
} else {
throw new LineSenderException("invalid transaction [value=").put(sink).put(", allowed-values=[on, off]]");
}
- } else if (Chars.equals("max_schemas_per_connection", sink)) {
- if (protocol != PROTOCOL_WEBSOCKET) {
- throw new LineSenderException("max_schemas_per_connection is only supported for WebSocket transport");
- }
- pos = getValue(configurationString, pos, sink, "max_schemas_per_connection");
- int maxSchemas = parseIntValue(sink, "max_schemas_per_connection");
- maxSchemasPerConnection(maxSchemas);
} else if (Chars.equals("sf_dir", sink)) {
if (protocol != PROTOCOL_WEBSOCKET) {
throw new LineSenderException("sf_dir is only supported for WebSocket transport");
diff --git a/core/src/main/java/io/questdb/client/cutlass/qwp/client/ColumnView.java b/core/src/main/java/io/questdb/client/cutlass/qwp/client/ColumnView.java
index db7a51ba..1282d9f3 100644
--- a/core/src/main/java/io/questdb/client/cutlass/qwp/client/ColumnView.java
+++ b/core/src/main/java/io/questdb/client/cutlass/qwp/client/ColumnView.java
@@ -59,8 +59,8 @@
* {@code arrayRowAddr} accessors expose raw native pointers into the WebSocket
* payload buffer for SIMD or JNI consumers. The layout of the bytes behind
* these addresses follows the QWP wire format (see
- * {@code docs/qwp/wire-egress.md}); these methods are an expert API and
- * may shift as the wire format evolves.
+ * {@code https://questdb.com/docs/connect/wire-protocols/qwp-egress-websocket/});
+ * these methods are an expert API and may shift as the wire format evolves.
*
* Type contract / NULL handling. Each typed accessor delegates * to the same read path as the corresponding {@code QwpColumnBatch} diff --git a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpColumnBatch.java b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpColumnBatch.java index 20f0e82a..03f58141 100644 --- a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpColumnBatch.java +++ b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpColumnBatch.java @@ -618,7 +618,8 @@ public long getUuidLo(int col, int row) { /** * True if the cell is NULL on the wire. *
- * Note on type-specific sentinels (see {@code docs/qwp/wire-egress.md} sec 11.5): + * Note on type-specific sentinels (see + * {@code https://questdb.com/docs/connect/wire-protocols/qwp-egress-websocket/}): * QuestDB stores NULL as a sentinel value for several types -- {@code Long.MIN_VALUE} * for LONG/INT/etc., {@code 0.0.0.0} for IPv4, {@code -1} for GEOHASH, and crucially * {@code NaN} for FLOAT and DOUBLE. Egress preserves these conventions: a row carrying diff --git a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpColumnBatchHandler.java b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpColumnBatchHandler.java index a4ac77ee..6237e1de 100644 --- a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpColumnBatchHandler.java +++ b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpColumnBatchHandler.java @@ -106,8 +106,7 @@ default void onError(long requestId, byte status, String message) { * the query (with {@code failover=on}, the default). *
* {@code newNode} is the {@link QwpServerInfo} of the endpoint the client - * just bound to, or {@code null} if the new server negotiated the v1 - * protocol (no SERVER_INFO frame). + * just bound to. *
* After this callback fires, {@link #onBatch} will be invoked again with * {@code batch_seq} restarting at 0 on the new connection. Handlers that @@ -127,7 +126,7 @@ default void onFailoverReset(QwpServerInfo newNode) { * {@link #onFailoverReset(QwpServerInfo)} for backwards compatibility. * * @param requestId client-assigned request id of the query being replayed - * @param newNode the endpoint just bound to, or {@code null} if the new server negotiated v1 + * @param newNode the {@link QwpServerInfo} of the endpoint just bound to */ default void onFailoverReset(long requestId, QwpServerInfo newNode) { onFailoverReset(newNode); diff --git a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpColumnWriter.java b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpColumnWriter.java index 37833b99..f8cc669f 100644 --- a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpColumnWriter.java +++ b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpColumnWriter.java @@ -279,26 +279,16 @@ private void writeSymbolColumnWithGlobalIds(QwpTableBuffer.ColumnBuffer col, int } } - private void writeTableHeaderWithSchema(String tableName, int rowCount, int schemaId, QwpColumnDef[] columns) { + private void writeTableHeader(String tableName, int rowCount, QwpColumnDef[] columns) { buffer.putString(tableName); buffer.putVarint(rowCount); buffer.putVarint(columns.length); - buffer.putByte(SCHEMA_MODE_FULL); - buffer.putVarint(schemaId); for (QwpColumnDef col : columns) { buffer.putString(col.getName()); buffer.putByte(col.getWireTypeCode()); } } - private void writeTableHeaderWithSchemaRef(String tableName, int rowCount, int schemaId, int columnCount) { - buffer.putString(tableName); - buffer.putVarint(rowCount); - buffer.putVarint(columnCount); - buffer.putByte(SCHEMA_MODE_REFERENCE); - buffer.putVarint(schemaId); - } - private void writeTimestampColumn(long addr, int count, boolean useGorilla) { if (useGorilla && count > 2) { // Single pass: check 
feasibility and compute encoded size together @@ -325,12 +315,8 @@ private void writeTimestampColumn(long addr, int count, boolean useGorilla) { } } - void encodeTable(QwpTableBuffer tableBuffer, boolean useSchemaRef, boolean useGlobalSymbols, boolean useGorilla) { - int schemaId = tableBuffer.getSchemaId(); - if (schemaId < 0) { - schemaId = 0; - } - encodeTable(tableBuffer, tableBuffer.getRowCount(), null, null, null, useSchemaRef, useGlobalSymbols, useGorilla, schemaId); + void encodeTable(QwpTableBuffer tableBuffer, boolean useGlobalSymbols, boolean useGorilla) { + encodeTable(tableBuffer, tableBuffer.getRowCount(), null, null, null, useGlobalSymbols, useGorilla); } void encodeTable( @@ -339,23 +325,12 @@ void encodeTable( int[] limitedValueCounts, long[] limitedStringDataSizes, int[] limitedSymbolDictionarySizes, - boolean useSchemaRef, boolean useGlobalSymbols, - boolean useGorilla, - int schemaId + boolean useGorilla ) { QwpColumnDef[] columnDefs = tableBuffer.getColumnDefs(); - if (useSchemaRef) { - writeTableHeaderWithSchemaRef( - tableBuffer.getTableName(), - rowCount, - schemaId, - columnDefs.length - ); - } else { - writeTableHeaderWithSchema(tableBuffer.getTableName(), rowCount, schemaId, columnDefs); - } + writeTableHeader(tableBuffer.getTableName(), rowCount, columnDefs); for (int i = 0; i < tableBuffer.getColumnCount(); i++) { QwpTableBuffer.ColumnBuffer col = tableBuffer.getColumn(i); diff --git a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpEgressIoThread.java b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpEgressIoThread.java index 9752ace8..228360c7 100644 --- a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpEgressIoThread.java +++ b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpEgressIoThread.java @@ -214,11 +214,10 @@ public void onBinaryMessage(long payloadPtr, int payloadLen) { decodeAndEmitError(payloadPtr, payloadLen); currentQueryDone = true; } else if (msgKind == 
QwpEgressMsgKind.CACHE_RESET) { - // Server reached a configured soft cap on the connection-scoped - // SYMBOL dict or schema-fingerprint cache. Drop the indicated - // caches on this side so the next RESULT_BATCH's deltaStart and - // schema-reference ids line up with the server's fresh counter. - // No user-visible event -- CACHE_RESET never arrives between the + // Server reached its configured soft cap on the connection-scoped + // SYMBOL dict. Drop the dict on this side so the next RESULT_BATCH's + // deltaStart lines up with the server's fresh counter. No + // user-visible event -- CACHE_RESET never arrives between the // RESULT_BATCH / RESULT_END / EXEC_DONE / QUERY_ERROR of a query // and the user callback, only after it. handleCacheReset(payloadPtr, payloadLen); @@ -318,6 +317,10 @@ public void run() { currentQueryDone = false; currentRequestId = req.requestId; creditEnabled = req.initialCredit > 0L; + // The schema rides the first RESULT_BATCH (batch_seq == 0) of each + // query; invalidate any schema left from the prior query so a + // continuation batch can't bind rows to a stale schema. + decoder.resetQuerySchema(); sendQueryRequest(req); while (!currentQueryDone && !shutdown) { @@ -573,8 +576,8 @@ private void emitTransportErrorBlocking(String message) { /** * Decodes a {@code CACHE_RESET} frame body and clears the indicated - * connection-scoped caches on the client side. Body is a single byte mask: - * bit 0 = SYMBOL dict, bit 1 = schema-fingerprint cache. + * connection-scoped caches on the client side. Body is a single byte mask; + * bit 0 = SYMBOL dict is the only bit defined. 
*/ private void handleCacheReset(long payloadPtr, int payloadLen) { int bodyStart = QwpConstants.HEADER_SIZE + 1; // msg_kind byte consumed by caller diff --git a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpEgressMsgKind.java b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpEgressMsgKind.java index 9dd6287f..63d1fd43 100644 --- a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpEgressMsgKind.java +++ b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpEgressMsgKind.java @@ -32,9 +32,9 @@ public final class QwpEgressMsgKind { /** * Server -> client. Connection-scoped cache reset. Body: {@code reset_mask:u8} - * with bit 0 = SYMBOL dict, bit 1 = schema-fingerprint cache. Sent between - * queries when a cache hits its server-side soft cap. Recipient clears the - * indicated caches; subsequent RESULT_BATCH delta sections start fresh. + * with bit 0 = SYMBOL dict (the only bit defined). Sent between queries when + * the dict hits its server-side soft cap. Recipient clears the dict; + * subsequent RESULT_BATCH delta sections start fresh. */ public static final byte CACHE_RESET = 0x17; public static final byte CANCEL = 0x14; @@ -59,10 +59,6 @@ public final class QwpEgressMsgKind { * Reset mask bit: clear the connection-scoped SYMBOL dict. */ public static final byte RESET_MASK_DICT = 0x01; - /** - * Reset mask bit: clear the connection-scoped schema-fingerprint cache. - */ - public static final byte RESET_MASK_SCHEMAS = 0x02; public static final byte RESULT_BATCH = 0x11; public static final byte RESULT_END = 0x12; /** @@ -88,7 +84,7 @@ public final class QwpEgressMsgKind { public static final byte ROLE_STANDALONE = 0; /** * Server -> client. Unsolicited frame delivered as the first QWP message - * on every v2 WebSocket connection. Body (little-endian): {@code + * on every WebSocket connection. 
Body (little-endian): {@code * msg_kind:u8, role:u8, epoch:u64, capabilities:u32, server_wall_ns:i64, * cluster_id:u16_len+utf8, node_id:u16_len+utf8} followed by an optional * {@code zone_id:u16_len+utf8} when the {@link #CAP_ZONE} bit is set in diff --git a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpHostHealthTracker.java b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpHostHealthTracker.java index 4c4d76cd..166c0331 100644 --- a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpHostHealthTracker.java +++ b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpHostHealthTracker.java @@ -274,7 +274,7 @@ public void recordTransportError(int idx) { /** * Records a server-advertised zone for the given host index per * failover.md §2.1. Called once with {@code SERVER_INFO.zone_id} after a - * successful upgrade on a v2 connection (gated by {@code CAP_ZONE}), and + * successful upgrade (gated by {@code CAP_ZONE}), and * once with the {@code X-QuestDB-Zone} HTTP header on a {@code 421} * upgrade reject. *
diff --git a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpProtocolVersionException.java b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpProtocolVersionException.java index 21431170..584602e6 100644 --- a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpProtocolVersionException.java +++ b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpProtocolVersionException.java @@ -28,7 +28,7 @@ import io.questdb.client.std.str.StringSink; /** - * Server negotiated a QWP version outside {@code [VERSION_1, MAX_SUPPORTED_VERSION]}. + * Server negotiated a QWP version other than {@code QwpConstants.VERSION}. * Terminal: version negotiation is cluster-wide, so failover masks the disagreement. */ public class QwpProtocolVersionException extends QwpDecodeException { diff --git a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpQueryClient.java b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpQueryClient.java index d21aeffd..4051e1ac 100644 --- a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpQueryClient.java +++ b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpQueryClient.java @@ -63,7 +63,7 @@ * Multi-endpoint routing: the connection string accepts a comma-separated list * of {@code addr=host:port[,host:port...]} endpoints plus a {@code target=} * filter ({@code any} | {@code primary} | {@code replica}). {@link #connect()} - * walks the list in order, reads the server's role from the v2 + * walks the list in order, reads the server's role from the * {@code SERVER_INFO} frame, and picks the first endpoint matching the target. 
* When every endpoint reports a role the filter rejects, {@link #connect()} * throws {@link QwpRoleMismatchException} with the last observed info attached @@ -76,7 +76,7 @@ * {@link QwpColumnBatchHandler#onFailoverReset} callback just before replayed * batches start arriving (batch_seq restarts at 0 on the new node), so * accumulating handlers can discard rows the old connection already delivered. - * {@code failover=off} restores the pre-v2 behaviour -- terminal failures + * {@code failover=off} disables this -- terminal failures * surface immediately through {@link QwpColumnBatchHandler#onError}. *
* Terminal-failure latching: transport- or protocol-level faults detected by @@ -109,7 +109,7 @@ public class QwpQueryClient implements QuietCloseable { * sanity bound. */ public static final int MAX_BATCH_ROWS_UPPER_BOUND = 1_048_576; - public static final int QWP_MAX_VERSION = QwpConstants.MAX_SUPPORTED_VERSION; + public static final int QWP_MAX_VERSION = QwpConstants.VERSION; public static final String TARGET_ANY = "any"; public static final String TARGET_PRIMARY = "primary"; public static final String TARGET_REPLICA = "replica"; @@ -141,7 +141,7 @@ public class QwpQueryClient implements QuietCloseable { private static final long DEFAULT_FAILOVER_MAX_DURATION_MS = 30_000L; private static final int DEFAULT_IO_BUFFER_POOL_SIZE = 4; /** - * How long {@link #connect()} waits to read the v2 {@code SERVER_INFO} frame + * How long {@link #connect()} waits to read the {@code SERVER_INFO} frame * from each endpoint before giving up and moving to the next. 5 seconds is * comfortable on a WAN; the server writes SERVER_INFO into the same send * buffer as the 101 upgrade response so under normal conditions the frame @@ -262,9 +262,9 @@ public class QwpQueryClient implements QuietCloseable { // worker thread's post-requestId read. private volatile boolean pendingCancel; // Decoded SERVER_INFO from the current connection's handshake. Null before - // connect() has succeeded, and on connections that negotiated v1 (which - // doesn't emit the frame). Volatile so the I/O thread's read on the - // {@code onFailoverReset} path sees the latest reconnect. + // connect() has succeeded; non-null on every established connection (the + // server always emits the frame). Volatile so getServerInfo(), callable + // from any thread, observes the latest reconnect's value. 
private volatile QwpServerInfo serverInfo; private int serverInfoTimeoutMs = DEFAULT_SERVER_INFO_TIMEOUT_MS; // Maximum time close() will wait for the I/O thread to exit before giving up @@ -312,7 +312,7 @@ private QwpQueryClient(String host, int port) { * Per {@code failover.md} section 1, the comma form and repeated {@code addr=} keys * both accumulate into a single ordered list; empty entries are rejected. *
* Walks the endpoint list in order: for each entry it opens the TCP socket, - * runs the HTTP upgrade, reads the v2 {@code SERVER_INFO} frame, and accepts + * runs the HTTP upgrade, reads the {@code SERVER_INFO} frame, and accepts * the endpoint if the server's role matches the configured target. An * endpoint that matches becomes the bound connection and the I/O thread is * spawned. An endpoint whose role doesn't match is closed and the walk @@ -855,7 +854,6 @@ public synchronized void connect() { } QwpServerInfo lastObservedMismatch = null; QwpIngressRoleRejectedException lastUpgradeRoleReject = null; - boolean sawV1Mismatch = false; Throwable lastTransportError = null; while (true) { int i = hostTracker.pickNext(); @@ -884,19 +882,13 @@ public synchronized void connect() { cleanupFailedConnect(); continue; } + // info is non-null: connectToEndpoint() returns only after + // receiveServerInfoSync() set serverInfo (it returns non-null or throws). QwpServerInfo info = serverInfo; - if (info != null && (info.getCapabilities() & QwpEgressMsgKind.CAP_ZONE) != 0) { + if ((info.getCapabilities() & QwpEgressMsgKind.CAP_ZONE) != 0) { hostTracker.recordZone(i, info.getZoneId()); } - if (!TARGET_ANY.equals(target) && info == null) { - sawV1Mismatch = true; - hostTracker.recordRoleReject(i, false); - LOG.info("QwpQueryClient {}:{} negotiated v1 (no SERVER_INFO) and target={} requires v2; trying next", - ep.host, ep.port, target); - cleanupFailedConnect(); - continue; - } - if (info != null && !matchesTarget(info.getRole(), target)) { + if (!matchesTarget(info.getRole(), target)) { lastObservedMismatch = info; boolean isTransient = info.getRole() == QwpEgressMsgKind.ROLE_PRIMARY_CATCHUP; hostTracker.recordRoleReject(i, isTransient); @@ -920,14 +912,6 @@ public synchronized void connect() { + " cluster=" + lastObservedMismatch.getClusterId() ); } - if (sawV1Mismatch) { - throw new QwpRoleMismatchException( - target, - null, - "no endpoint matches target=" + target - + "; at 
least one endpoint negotiated v1 and cannot supply a role" - ); - } if (lastUpgradeRoleReject != null) { // 421 + X-QuestDB-Role on every host: the cluster answered with role // information at upgrade time but no host satisfied target=. Surface @@ -1061,8 +1045,7 @@ public int getNegotiatedZstdLevel() { /** * Returns the {@link QwpServerInfo} decoded from the currently-bound - * server's {@code SERVER_INFO} frame, or {@code null} if the server - * negotiated the v1 protocol (no frame sent) or the client is not + * server's {@code SERVER_INFO} frame, or {@code null} if the client is not * connected. The value is refreshed on every successful failover reconnect. */ public QwpServerInfo getServerInfo() { @@ -1309,7 +1292,7 @@ public QwpQueryClient withInsecureTls() { * Useful for latency-sensitive streaming consumers that want to start * processing the first row as soon as possible -- a smaller cap flushes * the first batch sooner, at the cost of more per-batch overhead (WS - * header, send syscall, schema-reference decode). The server clamps down + * header, send syscall). The server clamps down * to its own hard limit; a value of {@code 0} (default) omits the header * and the server uses its own cap. *
@@ -1326,7 +1309,7 @@ public QwpQueryClient withMaxBatchRows(int rows) {
}
/**
- * Overrides the {@link #DEFAULT_SERVER_INFO_TIMEOUT_MS} wait for the v2
+ * Overrides the {@link #DEFAULT_SERVER_INFO_TIMEOUT_MS} wait for the
* {@code SERVER_INFO} frame. Must be called before {@link #connect}.
*/
public void withServerInfoTimeout(int ms) {
@@ -1610,16 +1593,12 @@ private void connectToEndpoint(Endpoint ep) {
negotiatedQwpVersion = webSocketClient.getServerQwpVersion();
negotiatedZstdLevel = webSocketClient.getServerNegotiatedZstdLevel();
- // v2 servers send SERVER_INFO as the first WebSocket frame after the
+ // The server sends SERVER_INFO as the first WebSocket frame after the
// upgrade response. Consume it synchronously on the user thread before
// spawning the I/O thread, so the role filter can run without any
// cross-thread synchronisation and so a mismatched role doesn't waste
// the I/O thread setup + teardown.
- if (negotiatedQwpVersion >= QwpConstants.VERSION_2) {
- serverInfo = receiveServerInfoSync();
- } else {
- serverInfo = null;
- }
+ serverInfo = receiveServerInfoSync();
// Early probe: if we told the server we can accept zstd, make sure the
// bundled native library actually provides the decompression symbols
@@ -1926,7 +1905,6 @@ private void reconnectViaTracker() {
lastCloseTimedOut = false;
hostTracker.beginRound(false);
QwpServerInfo lastMismatch = null;
- boolean sawV1Mismatch = false;
Throwable lastError = null;
boolean retriedAfterReset = false;
while (true) {
@@ -1957,17 +1935,13 @@ private void reconnectViaTracker() {
cleanupFailedConnect();
continue;
}
+ // info is non-null: connectToEndpoint() returns only after assigning serverInfo
+ // from receiveServerInfoSync(), which either returns non-null or throws.
QwpServerInfo info = serverInfo;
- if (info != null && (info.getCapabilities() & QwpEgressMsgKind.CAP_ZONE) != 0) {
+ if ((info.getCapabilities() & QwpEgressMsgKind.CAP_ZONE) != 0) {
hostTracker.recordZone(i, info.getZoneId());
}
- if (!TARGET_ANY.equals(target) && info == null) {
- sawV1Mismatch = true;
- hostTracker.recordRoleReject(i, false);
- cleanupFailedConnect();
- continue;
- }
- if (info != null && !matchesTarget(info.getRole(), target)) {
+ if (!matchesTarget(info.getRole(), target)) {
lastMismatch = info;
boolean isTransient = info.getRole() == QwpEgressMsgKind.ROLE_PRIMARY_CATCHUP;
hostTracker.recordRoleReject(i, isTransient);
@@ -1985,11 +1959,6 @@ private void reconnectViaTracker() {
"no endpoint matches target=" + target + " on failover; last observed role="
+ QwpServerInfo.roleName(lastMismatch.getRole()));
}
- if (sawV1Mismatch) {
- throw new QwpRoleMismatchException(target, null,
- "no endpoint matches target=" + target
- + " on failover; at least one endpoint negotiated v1 and cannot supply a role");
- }
throw new HttpClientException(
"all QWP endpoints unreachable on failover [count=" + total
+ ", lastError=" + (lastError == null ? "
* The native dict buffers are reused (positions reset to 0, capacity
* retained) so a workload that churns just above the cap does not reallocate
@@ -172,9 +180,6 @@ public void applyCacheReset(byte resetMask) {
// and wipes before reading.
connDictGeneration++;
}
- if ((resetMask & QwpEgressMsgKind.RESET_MASK_SCHEMAS) != 0) {
- schemaRegistry.clear();
- }
}
@Override
@@ -200,6 +205,10 @@ public void close() {
dctx = 0;
}
connDictSize = 0;
+ // Invalidate any per-query schema captured before close(). The decoder is
+ // single-use per connection today, but resetting here keeps a future
+ // reuse from binding a continuation batch to a prior connection's schema.
+ querySchemaValid = false;
}
/**
@@ -222,6 +231,16 @@ public void decode(QwpBatchBuffer buffer, long payloadPtr, int payloadLen) throw
decodePayload(buffer, payloadPtr, payloadLen);
}
+ /**
+ * Invalidates the per-query schema captured from the last {@code batch_seq == 0} batch.
+ * The IoThread calls this when a new query starts so the next query's
+ * continuation batches can't bind rows to a stale schema; the underlying
+ * {@link QwpEgressColumnInfo} slots are retained for reuse.
+ */
+ public void resetQuerySchema() {
+ querySchemaValid = false;
+ }
+
// Pool helpers
private static long advanceFixed(QwpColumnLayout layout, long p, long limit, int sizeBytes) throws QwpDecodeException {
@@ -297,7 +316,7 @@ private void decodePayload(QwpBatchBuffer buffer, long payload, int payloadLen)
throw new QwpDecodeException("bad magic 0x" + Integer.toHexString(magic));
}
byte version = Unsafe.getUnsafe().getByte(payload + 4);
- if (version < QwpConstants.VERSION_1 || version > QwpConstants.MAX_SUPPORTED_VERSION) {
+ if (version != QwpConstants.VERSION) {
throw QwpProtocolVersionException.unsupported(version & 0xFF);
}
byte flags = Unsafe.getUnsafe().getByte(payload + QwpConstants.HEADER_OFFSET_FLAGS);
@@ -411,7 +430,11 @@ private void decodePayload(QwpBatchBuffer buffer, long payload, int payloadLen)
p = parseDeltaSymbolDict(p, limit);
}
- // Table block: name_length, name, row_count, column_count, schema, columns
+ // Table block. The schema (column_count + inline column descriptors)
+ // rides only the first batch of a query (batch_seq == 0); continuation
+ // batches carry rows against the schema parsed there:
+ // batch_seq == 0: name_length, name, row_count, column_count, columns
+ // batch_seq > 0: name_length, name, row_count
decodeVarint(p, limit);
long nameLen = varintValue;
p = varintPos;
@@ -433,29 +456,19 @@ private void decodePayload(QwpBatchBuffer buffer, long payload, int payloadLen)
}
int rowCount = (int) varintValue;
p = varintPos;
- decodeVarint(p, limit);
- if (varintValue < 0 || varintValue > QwpConstants.MAX_COLUMNS_PER_TABLE) {
- throw new QwpDecodeException("column_count out of range: " + varintValue);
- }
- int columnCount = (int) varintValue;
- p = varintPos;
-
- // Schema section
- if (p >= limit) throw new QwpDecodeException("truncated schema mode");
- byte schemaMode = Unsafe.getUnsafe().getByte(p++);
- decodeVarint(p, limit);
- // Reject schema ids that wouldn't fit in our registry (or that cast negative
- // from a hostile high varint). Without this guard, ensureSchemaSlot would
- // either OOM appending billions of nulls or AIOOBE on a negative index.
- if (varintValue < 0 || varintValue >= MAX_SCHEMAS_PER_CONNECTION) {
- throw new QwpDecodeException("schema_id out of range: " + varintValue);
- }
- int schemaId = (int) varintValue;
- p = varintPos;
ObjList
* Each {@link #flush()} encodes all buffered table data into self-contained
* datagrams (one per table) and sends them via UDP. Datagrams use local
- * symbol dictionaries (no global/delta dict) and full schema (no schema refs).
+ * symbol dictionaries (no global/delta dict) and inline column schemas.
*
* When {@code maxDatagramSize > 0}, the sender automatically flushes before
* a datagram exceeds the size limit. The in-progress row stays staged in sender
@@ -1005,9 +1005,7 @@ private int encodeCommittedPrefixPayloadForUdp(QwpTableBuffer tableBuffer) {
prefixStringDataSizeBefore,
prefixSymbolDictionarySizeBefore,
false,
- false,
- false,
- 0
+ false
);
payloadWriter.finish();
return payloadWriter.getPosition();
@@ -1016,7 +1014,7 @@ private int encodeCommittedPrefixPayloadForUdp(QwpTableBuffer tableBuffer) {
private int encodeTablePayloadForUdp(QwpTableBuffer tableBuffer) {
payloadWriter.reset();
columnWriter.setBuffer(payloadWriter);
- columnWriter.encodeTable(tableBuffer, false, false, false);
+ columnWriter.encodeTable(tableBuffer, false, false);
payloadWriter.finish();
return payloadWriter.getPosition();
}
@@ -1091,8 +1089,6 @@ private long estimateBaseForCurrentSchema() {
estimate += NativeBufferWriter.varintSize(tableNameUtf8) + tableNameUtf8;
estimate += VARINT_INT_UPPER_BOUND;
estimate += VARINT_INT_UPPER_BOUND;
- estimate += 1; // schema mode byte
- estimate += VARINT_INT_UPPER_BOUND; // schemaId
QwpColumnDef[] defs = currentTableBuffer.getColumnDefs();
for (int i = 0, n = defs.length; i < n; i++) {
@@ -1231,7 +1227,7 @@ private void sendEncodedPayload(CharSequence tableName, int payloadLength) {
headerBuffer.putByte((byte) 'W');
headerBuffer.putByte((byte) 'P');
headerBuffer.putByte((byte) '1');
- headerBuffer.putByte(VERSION_1);
+ headerBuffer.putByte(VERSION);
headerBuffer.putByte((byte) 0);
headerBuffer.putShort((short) 1);
headerBuffer.putInt(payloadLength);
diff --git a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketEncoder.java b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketEncoder.java
index f4b13733..85db3f0d 100644
--- a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketEncoder.java
+++ b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketEncoder.java
@@ -41,7 +41,7 @@ public class QwpWebSocketEncoder implements QuietCloseable {
private NativeBufferWriter buffer;
private byte flags;
private int payloadStart;
- private byte version = VERSION_1;
+ private byte version = VERSION;
public QwpWebSocketEncoder() {
this.buffer = new NativeBufferWriter();
@@ -53,8 +53,8 @@ public QwpWebSocketEncoder(int bufferSize) {
this.flags = 0;
}
- public void addTable(QwpTableBuffer tableBuffer, boolean useSchemaRef) {
- columnWriter.encodeTable(tableBuffer, useSchemaRef, true, isGorillaEnabled());
+ public void addTable(QwpTableBuffer tableBuffer) {
+ columnWriter.encodeTable(tableBuffer, true, isGorillaEnabled());
}
public void beginMessage(
@@ -89,12 +89,12 @@ public void close() {
}
}
- public int encode(QwpTableBuffer tableBuffer, boolean useSchemaRef) {
+ public int encode(QwpTableBuffer tableBuffer) {
buffer.reset();
writeHeader(1, 0);
int payloadStart = buffer.getPosition();
columnWriter.setBuffer(buffer);
- columnWriter.encodeTable(tableBuffer, useSchemaRef, false, isGorillaEnabled());
+ columnWriter.encodeTable(tableBuffer, false, isGorillaEnabled());
int payloadLength = buffer.getPosition() - payloadStart;
buffer.patchInt(8, payloadLength);
return buffer.getPosition();
@@ -104,11 +104,10 @@ public int encodeWithDeltaDict(
QwpTableBuffer tableBuffer,
GlobalSymbolDictionary globalDict,
int confirmedMaxId,
- int batchMaxId,
- boolean useSchemaRef
+ int batchMaxId
) {
beginMessage(1, globalDict, confirmedMaxId, batchMaxId);
- addTable(tableBuffer, useSchemaRef);
+ addTable(tableBuffer);
return finishMessage();
}
diff --git a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java
index 7d43d72f..c8af5fc5 100644
--- a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java
+++ b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java
@@ -126,7 +126,6 @@ public class QwpWebSocketSender implements Sender {
public static final int DEFAULT_AUTO_FLUSH_BYTES = 8 * 1024 * 1024;
public static final long DEFAULT_AUTO_FLUSH_INTERVAL_NANOS = 100_000_000L; // 100ms
public static final int DEFAULT_AUTO_FLUSH_ROWS = 1_000;
- public static final int DEFAULT_MAX_SCHEMAS_PER_CONNECTION = 65_535;
private static final int DEFAULT_BUFFER_SIZE = 8192;
private static final int DEFAULT_MICROBATCH_BUFFER_SIZE = 1024 * 1024; // 1MB
private static final Logger LOG = LoggerFactory.getLogger(QwpWebSocketSender.class);
@@ -149,7 +148,6 @@ public class QwpWebSocketSender implements Sender {
// Global symbol dictionary for delta encoding
private final GlobalSymbolDictionary globalSymbolDictionary;
private final QwpHostHealthTracker hostTracker;
- private final int maxSchemasPerConnection;
private final CharSequenceObjHashMap
- * SERVER_INFO-driven role checks (target=primary against a v2 server
+ * SERVER_INFO-driven role checks (target=primary against a server
* advertising REPLICA via the SERVER_INFO frame) belong to the parent
* QuestDB egress integration suite -- TestWebSocketServer here only
* covers the upgrade-time {@code X-QuestDB-Role} header path which is
@@ -80,6 +81,7 @@ public void testWalk_404NotFoundIsTransportNotTerminal() throws Exception {
TestWebSocketServer notFound = new TestWebSocketServer(port404, NOOP_HANDLER);
notFound.setRejectWithStatus(404, "Not Found");
TestWebSocketServer ok = new TestWebSocketServer(portOk, NOOP_HANDLER);
+ ok.setSendServerInfo(true);
try {
notFound.start();
ok.start();
@@ -106,6 +108,7 @@ public void testWalk_426UpgradeRequiredIsTransportNotTerminal() throws Exception
TestWebSocketServer rejecting = new TestWebSocketServer(port426, NOOP_HANDLER);
rejecting.setRejectWithStatus(426, "Upgrade Required");
TestWebSocketServer ok = new TestWebSocketServer(portOk, NOOP_HANDLER);
+ ok.setSendServerInfo(true);
try {
rejecting.start();
ok.start();
@@ -258,14 +261,16 @@ public void testWalk_FallThroughResetRehabilitatesPriorTopologyRejects() throws
// see every host TopologyReject (priority 5) and walk past the
// now-healthy A only to fail.
//
- // target=any keeps this v1-friendly: the spec defines
- // target=primary as requiring v2 SERVER_INFO, which the test
- // server doesn't emit. A separate integration test in the
- // parent QuestDB repo covers the SERVER_INFO path.
+ // target=any so the rehabilitated host A binds regardless of the
+ // role it advertises in its SERVER_INFO frame -- this test
+ // exercises the fall-through reset, not the role filter. The
+ // SERVER_INFO-driven role filter (target=primary/replica) is covered by the
+ // testWalk_ServerInfo* tests here and an integration test in the parent QuestDB repo.
int portA = TestPorts.findUnusedPort();
int portB = TestPorts.findUnusedPort();
TestWebSocketServer a = new TestWebSocketServer(portA, NOOP_HANDLER);
a.setRejectWithRole("REPLICA");
+ a.setSendServerInfo(true);
TestWebSocketServer b = new TestWebSocketServer(portB, NOOP_HANDLER);
b.setRejectWithRole("REPLICA");
try {
@@ -305,6 +310,7 @@ public void testWalk_FirstReachablePrimaryWins() throws Exception {
TestWebSocketServer rep = new TestWebSocketServer(portReplica, NOOP_HANDLER);
rep.setRejectWithRole("REPLICA");
TestWebSocketServer prim = new TestWebSocketServer(portPrimary, NOOP_HANDLER, false, "PRIMARY");
+ prim.setSendServerInfo(true);
try {
rep.start();
prim.start();
@@ -322,6 +328,110 @@ public void testWalk_FirstReachablePrimaryWins() throws Exception {
}
}
+ @Test
+ public void testWalk_ServerInfoReplicaRejectedForTargetPrimary() throws Exception {
+ // A node that completes a clean 101 upgrade and then advertises REPLICA
+ // only in its SERVER_INFO frame (not via the 421 X-QuestDB-Role header
+ // path the other role tests use) must be rejected by the role filter
+ // when target=primary. Pins matchesTarget(info.getRole(), target) where
+ // info is the decoded SERVER_INFO -- the branch that outlived the
+ // v1-mismatch removal. A clean 101 ignores the upgrade-time role header,
+ // so the rejection here is driven purely by the SERVER_INFO role.
+ int port = TestPorts.findUnusedPort();
+ TestWebSocketServer replica = new TestWebSocketServer(port, NOOP_HANDLER);
+ replica.setAdvertisedRole("REPLICA");
+ replica.setSendServerInfo(true);
+ try {
+ replica.start();
+ Assert.assertTrue(replica.awaitStart(5, TimeUnit.SECONDS));
+
+ try (QwpQueryClient client = QwpQueryClient.fromConfig(
+ "ws::addr=localhost:" + port + ";target=primary;auth_timeout_ms=2000;")) {
+ try {
+ client.connect();
+ Assert.fail("expected QwpRoleMismatchException for a REPLICA SERVER_INFO under target=primary");
+ } catch (QwpRoleMismatchException expected) {
+ Assert.assertFalse("a role mismatch must not leave the client connected", client.isConnected());
+ Assert.assertEquals("primary", expected.getTargetRole());
+ // The rejected role is taken from the decoded SERVER_INFO frame.
+ Assert.assertNotNull("observed SERVER_INFO must be attached", expected.getLastObserved());
+ Assert.assertEquals(QwpEgressMsgKind.ROLE_REPLICA, expected.getLastObserved().getRole());
+ Assert.assertTrue("message must mention target=primary: " + expected.getMessage(),
+ expected.getMessage().contains("target=primary"));
+ }
+ }
+ } finally {
+ replica.close();
+ }
+ }
+
+ @Test
+ public void testWalk_ServerInfoRoleFilterSkipsReplicaBindsPrimary() throws Exception {
+ // Both endpoints complete a clean 101 and advertise their role only via
+ // the SERVER_INFO frame. With target=primary the walk must skip the
+ // REPLICA endpoint -- a SERVER_INFO role mismatch is a skip, not a
+ // terminal failure -- and bind the PRIMARY one. Exercises the
+ // walk-continues side of matchesTarget(info.getRole(), target).
+ int portReplica = TestPorts.findUnusedPort();
+ int portPrimary = TestPorts.findUnusedPort();
+ TestWebSocketServer replica = new TestWebSocketServer(portReplica, NOOP_HANDLER);
+ replica.setAdvertisedRole("REPLICA");
+ replica.setSendServerInfo(true);
+ TestWebSocketServer primary = new TestWebSocketServer(portPrimary, NOOP_HANDLER);
+ primary.setAdvertisedRole("PRIMARY");
+ primary.setSendServerInfo(true);
+ try {
+ replica.start();
+ primary.start();
+ Assert.assertTrue(replica.awaitStart(5, TimeUnit.SECONDS));
+ Assert.assertTrue(primary.awaitStart(5, TimeUnit.SECONDS));
+
+ try (QwpQueryClient client = QwpQueryClient.fromConfig(
+ "ws::addr=localhost:" + portReplica + ",localhost:" + portPrimary
+ + ";target=primary;auth_timeout_ms=2000;")) {
+ client.connect();
+ Assert.assertTrue("client must skip the REPLICA and bind the PRIMARY", client.isConnected());
+ Assert.assertNotNull("bound connection must carry SERVER_INFO", client.getServerInfo());
+ Assert.assertEquals(QwpEgressMsgKind.ROLE_PRIMARY, client.getServerInfo().getRole());
+ }
+ } finally {
+ replica.close();
+ primary.close();
+ }
+ }
+
+ @Test(timeout = 15_000)
+ public void testWalk_ServerInfoTimeoutIsTransportNotTerminal() throws Exception {
+ // A node that completes the 101 upgrade but never sends the mandatory
+ // SERVER_INFO frame must be treated as a transport error so the walk
+ // continues, not a terminal failure. receiveServerInfoSync() now runs on
+ // every connect, so a silent post-upgrade peer would otherwise stall the
+ // client until the server-info timeout; bound it short here and verify
+ // the walk falls through to a healthy node.
+ int portSilent = TestPorts.findUnusedPort();
+ int portOk = TestPorts.findUnusedPort();
+ TestWebSocketServer silent = new TestWebSocketServer(portSilent, NOOP_HANDLER);
+ // sendServerInfo left off: the 101 upgrade succeeds, then the node stays silent.
+ TestWebSocketServer ok = new TestWebSocketServer(portOk, NOOP_HANDLER);
+ ok.setSendServerInfo(true);
+ try {
+ silent.start();
+ ok.start();
+ Assert.assertTrue(silent.awaitStart(5, TimeUnit.SECONDS));
+ Assert.assertTrue(ok.awaitStart(5, TimeUnit.SECONDS));
+
+ try (QwpQueryClient client = QwpQueryClient.fromConfig(
+ "ws::addr=localhost:" + portSilent + ",localhost:" + portOk + ";auth_timeout_ms=2000;")) {
+ client.withServerInfoTimeout(300);
+ client.connect();
+ Assert.assertTrue("client must walk past the SERVER_INFO-silent node", client.isConnected());
+ }
+ } finally {
+ silent.close();
+ ok.close();
+ }
+ }
+
@Test
public void testWalk_TransportFailureContinuesWalk() throws Exception {
// First port has no server (TCP refused); second is reachable.
@@ -330,6 +440,7 @@ public void testWalk_TransportFailureContinuesWalk() throws Exception {
int portDead = TestPorts.findUnusedPort();
int portOk = TestPorts.findUnusedPort();
try (TestWebSocketServer ok = new TestWebSocketServer(portOk, NOOP_HANDLER)) {
+ ok.setSendServerInfo(true);
ok.start();
Assert.assertTrue(ok.awaitStart(5, TimeUnit.SECONDS));
diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpResultBatchDecoderColumnTypesTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpResultBatchDecoderColumnTypesTest.java
index eed16251..a6872ed0 100644
--- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpResultBatchDecoderColumnTypesTest.java
+++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpResultBatchDecoderColumnTypesTest.java
@@ -47,7 +47,7 @@
* pins the rejection paths for malformed frames, this file pins the
* happy-path decoders for every wire type plus the connection-scoped state
* machines (delta SYMBOL dict, FLAG_GORILLA TIMESTAMP, FLAG_DELTA_SYMBOL_DICT,
- * CACHE_RESET, SCHEMA_MODE_REFERENCE). Each test crafts a single-column or
+ * CACHE_RESET, batch_seq schema reuse). Each test crafts a single-column or
* minimal multi-column RESULT_BATCH in native memory and verifies the values
* surface correctly through {@link io.questdb.client.cutlass.qwp.client.QwpColumnBatch}
* accessors, mirroring the production end-to-end flow without a server.
@@ -137,8 +137,6 @@ public void testCacheResetDictMaskClearsConnectionDict() throws Exception {
p = putVarint(p, 0L);
p = putVarint(p, 1L);
p = putVarint(p, 1L);
- p = putByte(p, QwpConstants.SCHEMA_MODE_FULL);
- p = putVarint(p, 0L);
p = putVarint(p, 1L);
p = putByte(p, (byte) 's');
p = putByte(p, QwpConstants.TYPE_SYMBOL);
@@ -157,7 +155,7 @@ public void testCacheResetDictMaskClearsConnectionDict() throws Exception {
p = writeHeader(p, QwpConstants.FLAG_DELTA_SYMBOL_DICT);
p = putByte(p, (byte) 0x11);
p = putLong(p, 1L);
- p = putVarint(p, 1L);
+ p = putVarint(p, 0L); // batch_seq = 0 (independent batch)
p = putVarint(p, 0L); // deltaStart = 0 after reset
p = putVarint(p, 1L);
byte[] bbb = "bbb".getBytes(StandardCharsets.UTF_8);
@@ -166,8 +164,6 @@ public void testCacheResetDictMaskClearsConnectionDict() throws Exception {
p = putVarint(p, 0L);
p = putVarint(p, 1L);
p = putVarint(p, 1L);
- p = putByte(p, QwpConstants.SCHEMA_MODE_FULL);
- p = putVarint(p, 0L);
p = putVarint(p, 1L);
p = putByte(p, (byte) 's');
p = putByte(p, QwpConstants.TYPE_SYMBOL);
@@ -185,52 +181,10 @@ public void testCacheResetDictMaskClearsConnectionDict() throws Exception {
});
}
- @Test
- public void testCacheResetSchemasMaskClearsRegistry() throws Exception {
- TestUtils.assertMemoryLeak(() -> {
- QwpResultBatchDecoder decoder = new QwpResultBatchDecoder();
- QwpBatchBuffer buffer = new QwpBatchBuffer(256);
- long staging = Unsafe.malloc(256, MemoryTag.NATIVE_DEFAULT);
- try {
- // Register schema_id=3 with one INT column, then CACHE_RESET schemas,
- // then attempt to reference schema_id=3 -- decoder must reject.
- long p = startSingleColumnFrame(staging, "n", QwpConstants.TYPE_INT, 1, 3L);
- p = putByte(p, (byte) 0);
- p = putInt(p, 1);
- buffer.copyFromPayload(staging, (int) (p - staging));
- decoder.decode(buffer);
-
- decoder.applyCacheReset(QwpEgressMsgKind.RESET_MASK_SCHEMAS);
-
- p = staging;
- p = writeHeader(p, (byte) 0);
- p = putByte(p, (byte) 0x11);
- p = putLong(p, 1L);
- p = putVarint(p, 0L);
- p = putVarint(p, 0L);
- p = putVarint(p, 0L);
- p = putVarint(p, 1L);
- p = putByte(p, QwpConstants.SCHEMA_MODE_REFERENCE);
- p = putVarint(p, 3L);
- buffer.copyFromPayload(staging, (int) (p - staging));
- try {
- decoder.decode(buffer);
- Assert.fail("schema ref must be rejected after CACHE_RESET schemas mask");
- } catch (QwpDecodeException expected) {
- Assert.assertTrue(expected.getMessage().contains("not registered"));
- }
- } finally {
- Unsafe.free(staging, 256, MemoryTag.NATIVE_DEFAULT);
- buffer.close();
- decoder.close();
- }
- });
- }
-
@Test
public void testCharColumnDecodes() throws Exception {
TestUtils.assertMemoryLeak(() -> withDecoder(256, (decoder, buffer, staging) -> {
- char[] values = {'a', 'Z', '0', ' ', ''};
+ char[] values = {'a', 'Z', '0', '\u0000', '\uffff'};
long p = startSingleColumnFrame(staging, "c", QwpConstants.TYPE_CHAR, values.length);
p = putByte(p, (byte) 0);
for (char v : values) p = putShort(p, (short) v);
@@ -292,8 +246,6 @@ public void testEmptyBatchDecodes() throws Exception {
p = putVarint(p, 0L); // table_name_len
p = putVarint(p, 0L); // row_count = 0
p = putVarint(p, 0L); // column_count = 0
- p = putByte(p, QwpConstants.SCHEMA_MODE_FULL);
- p = putVarint(p, 0L); // schema_id
buffer.copyFromPayload(staging, (int) (p - staging));
decoder.decode(buffer);
@@ -302,6 +254,39 @@ public void testEmptyBatchDecodes() throws Exception {
}));
}
+ @Test
+ public void testEmptyResultSetCarriesSchema() throws Exception {
+ TestUtils.assertMemoryLeak(() -> withDecoder(128, (decoder, buffer, staging) -> {
+ // Realistic empty result: batch_seq == 0, row_count == 0, but the schema
+ // is present (column_count > 0 with descriptors, no row bodies). The
+ // server always ships the schema on batch 0 even when the result is empty.
+ long p = staging;
+ p = writeHeader(p, (byte) 0);
+ p = putByte(p, (byte) 0x11); // RESULT_BATCH
+ p = putLong(p, 1L); // request_id
+ p = putVarint(p, 0L); // batch_seq = 0
+ p = putVarint(p, 0L); // table_name_len
+ p = putVarint(p, 0L); // row_count = 0
+ p = putVarint(p, 2L); // column_count = 2
+ p = putVarint(p, 1L);
+ p = putByte(p, (byte) 'a');
+ p = putByte(p, QwpConstants.TYPE_INT);
+ p = putVarint(p, 1L);
+ p = putByte(p, (byte) 'b');
+ p = putByte(p, QwpConstants.TYPE_LONG);
+ // Each column still carries its null-flag byte even at row_count == 0
+ // (no nulls -> 0x00); the decoder reads one per column. No row bodies.
+ p = putByte(p, (byte) 0); // col "a" null_flag
+ p = putByte(p, (byte) 0); // col "b" null_flag
+ buffer.copyFromPayload(staging, (int) (p - staging));
+ decoder.decode(buffer);
+ Assert.assertEquals(0, batchOf(buffer).getRowCount());
+ Assert.assertEquals(2, batchOf(buffer).getColumnCount());
+ Assert.assertEquals("a", batchOf(buffer).getColumnName(0));
+ Assert.assertEquals("b", batchOf(buffer).getColumnName(1));
+ }));
+ }
+
@Test
public void testGeohashColumnReadsPrecisionVarint() throws Exception {
TestUtils.assertMemoryLeak(() -> withDecoder(256, (decoder, buffer, staging) -> {
@@ -434,39 +419,45 @@ public void testNullBitmapPopulatesNonNullIdx() throws Exception {
}
@Test
- public void testSchemaReferenceModeReusesEarlierSchema() throws Exception {
+ public void testResetQuerySchemaRejectsContinuationFromPriorQuery() throws Exception {
TestUtils.assertMemoryLeak(() -> {
QwpResultBatchDecoder decoder = new QwpResultBatchDecoder();
- QwpBatchBuffer buffer = new QwpBatchBuffer(512);
- long staging = Unsafe.malloc(512, MemoryTag.NATIVE_DEFAULT);
+ QwpBatchBuffer buffer = new QwpBatchBuffer(256);
+ long staging = Unsafe.malloc(256, MemoryTag.NATIVE_DEFAULT);
try {
- // Batch 1 registers schema_id=7 with one INT column.
- long p = startSingleColumnFrame(staging, "n", QwpConstants.TYPE_INT, 1, /*schemaId=*/ 7L);
+ // Query A delivers its schema on batch_seq == 0.
+ long p = startSingleColumnFrame(staging, "n", QwpConstants.TYPE_INT, 1);
p = putByte(p, (byte) 0);
p = putInt(p, 99);
buffer.copyFromPayload(staging, (int) (p - staging));
decoder.decode(buffer);
Assert.assertEquals(99, batchOf(buffer).getIntValue(0, 0));
- // Batch 2 references schema_id=7 (REFERENCE mode) -- no schema bytes inline.
+ // The IoThread calls resetQuerySchema() when the next query starts.
+ // After that, query A's schema must no longer satisfy a continuation:
+ // a batch_seq > 0 arriving before the new query's batch_seq == 0 must
+ // be rejected, not bound to the prior query's schema.
+ decoder.resetQuerySchema();
+
p = staging;
- p = writeHeader(p, /*flags=*/ (byte) 0);
+ p = writeHeader(p, (byte) 0);
p = putByte(p, (byte) 0x11);
p = putLong(p, 1L);
- p = putVarint(p, 1L);
- p = putVarint(p, 0L);
+ p = putVarint(p, 1L); // batch_seq = 1 (continuation)
+ p = putVarint(p, 0L); // table_name_len
p = putVarint(p, 1L); // row_count
- p = putVarint(p, 1L); // column_count
- p = putByte(p, QwpConstants.SCHEMA_MODE_REFERENCE);
- p = putVarint(p, 7L); // schema_id
p = putByte(p, (byte) 0); // null_flag
p = putInt(p, 1234); // value
buffer.copyFromPayload(staging, (int) (p - staging));
- decoder.decode(buffer);
- Assert.assertEquals(1234, batchOf(buffer).getIntValue(0, 0));
- Assert.assertEquals("n", batchOf(buffer).getColumnName(0));
+ try {
+ decoder.decode(buffer);
+ Assert.fail("decoder must reject a continuation after resetQuerySchema()");
+ } catch (QwpDecodeException expected) {
+ Assert.assertTrue("error mentions the missing schema batch: " + expected.getMessage(),
+ expected.getMessage().contains("batch_seq"));
+ }
} finally {
- Unsafe.free(staging, 512, MemoryTag.NATIVE_DEFAULT);
+ Unsafe.free(staging, 256, MemoryTag.NATIVE_DEFAULT);
buffer.close();
decoder.close();
}
@@ -474,29 +465,29 @@ public void testSchemaReferenceModeReusesEarlierSchema() throws Exception {
}
@Test
- public void testSchemaReferenceModeUnregisteredRejected() throws Exception {
+ public void testSchemaMissingOnContinuationRejected() throws Exception {
TestUtils.assertMemoryLeak(() -> {
QwpResultBatchDecoder decoder = new QwpResultBatchDecoder();
QwpBatchBuffer buffer = new QwpBatchBuffer(256);
long staging = Unsafe.malloc(256, MemoryTag.NATIVE_DEFAULT);
try {
+ // A continuation batch (batch_seq > 0) arriving before any
+ // schema-bearing batch_seq == 0 must be rejected, not bound to a
+ // stale schema.
long p = staging;
p = writeHeader(p, (byte) 0);
p = putByte(p, (byte) 0x11);
p = putLong(p, 1L);
- p = putVarint(p, 0L);
- p = putVarint(p, 0L);
- p = putVarint(p, 0L);
- p = putVarint(p, 0L);
- p = putByte(p, QwpConstants.SCHEMA_MODE_REFERENCE);
- p = putVarint(p, 99L); // never registered
+ p = putVarint(p, 1L); // batch_seq = 1 (continuation)
+ p = putVarint(p, 0L); // table_name_len
+ p = putVarint(p, 0L); // row_count
buffer.copyFromPayload(staging, (int) (p - staging));
try {
decoder.decode(buffer);
- Assert.fail("decoder must reject unregistered schema reference");
+ Assert.fail("decoder must reject a continuation batch with no prior schema");
} catch (QwpDecodeException expected) {
- Assert.assertTrue("error mentions schema id: " + expected.getMessage(),
- expected.getMessage().contains("not registered"));
+ Assert.assertTrue("error mentions the missing schema batch: " + expected.getMessage(),
+ expected.getMessage().contains("batch_seq"));
}
} finally {
Unsafe.free(staging, 256, MemoryTag.NATIVE_DEFAULT);
@@ -506,6 +497,170 @@ public void testSchemaReferenceModeUnregisteredRejected() throws Exception {
});
}
+ @Test
+ public void testSchemaReusedAcrossContinuationBatches() throws Exception {
+ TestUtils.assertMemoryLeak(() -> {
+ QwpResultBatchDecoder decoder = new QwpResultBatchDecoder();
+ QwpBatchBuffer buffer = new QwpBatchBuffer(512);
+ long staging = Unsafe.malloc(512, MemoryTag.NATIVE_DEFAULT);
+ try {
+ // Batch 1 (batch_seq == 0) carries the schema: one INT column "n".
+ long p = startSingleColumnFrame(staging, "n", QwpConstants.TYPE_INT, 1);
+ p = putByte(p, (byte) 0);
+ p = putInt(p, 99);
+ buffer.copyFromPayload(staging, (int) (p - staging));
+ decoder.decode(buffer);
+ Assert.assertEquals(99, batchOf(buffer).getIntValue(0, 0));
+
+ // Batch 2 (batch_seq == 1) carries rows only and reuses the schema
+ // parsed from batch 0 -- no column_count / column descriptors inline.
+ p = staging;
+ p = writeHeader(p, /*flags=*/ (byte) 0);
+ p = putByte(p, (byte) 0x11);
+ p = putLong(p, 1L);
+ p = putVarint(p, 1L); // batch_seq = 1 (continuation)
+ p = putVarint(p, 0L); // table_name_len
+ p = putVarint(p, 1L); // row_count
+ p = putByte(p, (byte) 0); // null_flag
+ p = putInt(p, 1234); // value
+ buffer.copyFromPayload(staging, (int) (p - staging));
+ decoder.decode(buffer);
+ Assert.assertEquals(1234, batchOf(buffer).getIntValue(0, 0));
+ Assert.assertEquals("n", batchOf(buffer).getColumnName(0));
+ } finally {
+ Unsafe.free(staging, 512, MemoryTag.NATIVE_DEFAULT);
+ buffer.close();
+ decoder.close();
+ }
+ });
+ }
+
+ @Test
+ public void testSchemaSlotsReusedAcrossQueriesWithDifferentTypes() throws Exception {
+ TestUtils.assertMemoryLeak(() -> {
+ QwpResultBatchDecoder decoder = new QwpResultBatchDecoder();
+ QwpBatchBuffer buffer = new QwpBatchBuffer(512);
+ long staging = Unsafe.malloc(512, MemoryTag.NATIVE_DEFAULT);
+ try {
+ // Query A: one INT column "i". Seeds pooled schema slot 0 with
+ // name "i" / wire-type INT.
+ long p = startSingleColumnFrame(staging, "i", QwpConstants.TYPE_INT, 1);
+ p = putByte(p, (byte) 0);
+ p = putInt(p, 5);
+ buffer.copyFromPayload(staging, (int) (p - staging));
+ decoder.decode(buffer);
+ Assert.assertEquals(5, batchOf(buffer).getIntValue(0, 0));
+
+ decoder.resetQuerySchema();
+
+ // Query B batch_seq == 0: two columns LONG "p", INT "q". Slot 0 is
+ // reused from query A and must be fully overwritten (name "p",
+ // wire-type LONG), not retain A's "i"/INT; slot 1 is freshly grown.
+ p = staging;
+ p = writeHeader(p, (byte) 0);
+ p = putByte(p, (byte) 0x11);
+ p = putLong(p, 1L);
+ p = putVarint(p, 0L); // batch_seq = 0
+ p = putVarint(p, 0L); // table_name_len
+ p = putVarint(p, 1L); // row_count
+ p = putVarint(p, 2L); // column_count
+ p = putVarint(p, 1L); // col 0 name length
+ p = putByte(p, (byte) 'p');
+ p = putByte(p, QwpConstants.TYPE_LONG);
+ p = putVarint(p, 1L); // col 1 name length
+ p = putByte(p, (byte) 'q');
+ p = putByte(p, QwpConstants.TYPE_INT);
+ p = putByte(p, (byte) 0); // col 0 null_flag
+ p = putLong(p, 9_876_543_210L); // col 0 value (LONG)
+ p = putByte(p, (byte) 0); // col 1 null_flag
+ p = putInt(p, 42); // col 1 value (INT)
+ buffer.copyFromPayload(staging, (int) (p - staging));
+ decoder.decode(buffer);
+
+ Assert.assertEquals(2, batchOf(buffer).getColumnCount());
+ Assert.assertEquals("p", batchOf(buffer).getColumnName(0));
+ Assert.assertEquals(QwpConstants.TYPE_LONG, batchOf(buffer).getColumnWireType(0));
+ Assert.assertEquals(9_876_543_210L, batchOf(buffer).getLongValue(0, 0));
+ Assert.assertEquals("q", batchOf(buffer).getColumnName(1));
+ Assert.assertEquals(42, batchOf(buffer).getIntValue(1, 0));
+ } finally {
+ Unsafe.free(staging, 512, MemoryTag.NATIVE_DEFAULT);
+ buffer.close();
+ decoder.close();
+ }
+ });
+ }
+
+ @Test
+ public void testSchemaSwitchesAcrossQueriesWithDifferentColumnCount() throws Exception {
+ TestUtils.assertMemoryLeak(() -> {
+ QwpResultBatchDecoder decoder = new QwpResultBatchDecoder();
+ QwpBatchBuffer buffer = new QwpBatchBuffer(512);
+ long staging = Unsafe.malloc(512, MemoryTag.NATIVE_DEFAULT);
+ try {
+ // Query A batch_seq == 0: two INT columns "a", "b" -- grows the
+ // pooled schema to 2 slots.
+ long p = staging;
+ p = writeHeader(p, (byte) 0);
+ p = putByte(p, (byte) 0x11);
+ p = putLong(p, 1L);
+ p = putVarint(p, 0L); // batch_seq = 0
+ p = putVarint(p, 0L); // table_name_len
+ p = putVarint(p, 1L); // row_count
+ p = putVarint(p, 2L); // column_count
+ p = putVarint(p, 1L);
+ p = putByte(p, (byte) 'a');
+ p = putByte(p, QwpConstants.TYPE_INT);
+ p = putVarint(p, 1L);
+ p = putByte(p, (byte) 'b');
+ p = putByte(p, QwpConstants.TYPE_INT);
+ p = putByte(p, (byte) 0); // col a null_flag
+ p = putInt(p, 10);
+ p = putByte(p, (byte) 0); // col b null_flag
+ p = putInt(p, 20);
+ buffer.copyFromPayload(staging, (int) (p - staging));
+ decoder.decode(buffer);
+ Assert.assertEquals(2, batchOf(buffer).getColumnCount());
+ Assert.assertEquals(10, batchOf(buffer).getIntValue(0, 0));
+ Assert.assertEquals(20, batchOf(buffer).getIntValue(1, 0));
+
+ decoder.resetQuerySchema();
+
+ // Query B batch_seq == 0: a single INT column "x" -- shrinks the
+ // pooled schema back to 1 slot.
+ p = startSingleColumnFrame(staging, "x", QwpConstants.TYPE_INT, 1);
+ p = putByte(p, (byte) 0);
+ p = putInt(p, 77);
+ buffer.copyFromPayload(staging, (int) (p - staging));
+ decoder.decode(buffer);
+ Assert.assertEquals(1, batchOf(buffer).getColumnCount());
+ Assert.assertEquals(77, batchOf(buffer).getIntValue(0, 0));
+
+ // Query B continuation (batch_seq == 1): rows only. It binds to
+ // query B's 1-column schema -- columnCount derives from
+ // querySchema.size(), which must have shrunk to 1 (not stale at 2).
+ p = staging;
+ p = writeHeader(p, (byte) 0);
+ p = putByte(p, (byte) 0x11);
+ p = putLong(p, 1L);
+ p = putVarint(p, 1L); // batch_seq = 1
+ p = putVarint(p, 0L); // table_name_len
+ p = putVarint(p, 1L); // row_count
+ p = putByte(p, (byte) 0); // null_flag
+ p = putInt(p, 88);
+ buffer.copyFromPayload(staging, (int) (p - staging));
+ decoder.decode(buffer);
+ Assert.assertEquals(1, batchOf(buffer).getColumnCount());
+ Assert.assertEquals("x", batchOf(buffer).getColumnName(0));
+ Assert.assertEquals(88, batchOf(buffer).getIntValue(0, 0));
+ } finally {
+ Unsafe.free(staging, 512, MemoryTag.NATIVE_DEFAULT);
+ buffer.close();
+ decoder.close();
+ }
+ });
+ }
+
@Test
public void testSymbolColumnDeltaModeAccumulatesAcrossBatches() throws Exception {
TestUtils.assertMemoryLeak(() -> {
@@ -532,8 +687,6 @@ public void testSymbolColumnDeltaModeAccumulatesAcrossBatches() throws Exception
p = putVarint(p, 0L); // table_name_len
p = putVarint(p, 2L); // row_count
p = putVarint(p, 1L); // column_count
- p = putByte(p, QwpConstants.SCHEMA_MODE_FULL);
- p = putVarint(p, 0L); // schema_id
p = putVarint(p, 1L); // colName length
p = putByte(p, (byte) 's');
p = putByte(p, QwpConstants.TYPE_SYMBOL);
@@ -560,12 +713,6 @@ public void testSymbolColumnDeltaModeAccumulatesAcrossBatches() throws Exception
for (byte b : gamma) p = putByte(p, b);
p = putVarint(p, 0L);
p = putVarint(p, 1L);
- p = putVarint(p, 1L);
- p = putByte(p, QwpConstants.SCHEMA_MODE_FULL);
- p = putVarint(p, 0L);
- p = putVarint(p, 1L);
- p = putByte(p, (byte) 's');
- p = putByte(p, QwpConstants.TYPE_SYMBOL);
p = putByte(p, (byte) 0);
p = putVarint(p, 2L); // row 0 -> "gamma" (id=2)
@@ -591,8 +738,6 @@ public void testSymbolColumnNonDeltaInlineDictionary() throws Exception {
p = putVarint(p, 0L);
p = putVarint(p, 3L); // row_count
p = putVarint(p, 1L); // column_count
- p = putByte(p, QwpConstants.SCHEMA_MODE_FULL);
- p = putVarint(p, 0L);
p = putVarint(p, 1L);
p = putByte(p, (byte) 's');
p = putByte(p, QwpConstants.TYPE_SYMBOL);
@@ -632,8 +777,6 @@ public void testSymbolIndexOutOfRangeRejected() throws Exception {
p = putVarint(p, 0L);
p = putVarint(p, 1L);
p = putVarint(p, 1L);
- p = putByte(p, QwpConstants.SCHEMA_MODE_FULL);
- p = putVarint(p, 0L);
p = putVarint(p, 1L);
p = putByte(p, (byte) 's');
p = putByte(p, QwpConstants.TYPE_SYMBOL);
@@ -689,8 +832,6 @@ public void testTimestampColumnGorillaEncoding() throws Exception {
p = putVarint(p, 0L);
p = putVarint(p, values.length);
p = putVarint(p, 1L);
- p = putByte(p, QwpConstants.SCHEMA_MODE_FULL);
- p = putVarint(p, 0L);
p = putVarint(p, 1L);
p = putByte(p, (byte) 't');
p = putByte(p, QwpConstants.TYPE_TIMESTAMP);
@@ -745,8 +886,6 @@ public void testTimestampUnknownEncodingByteRejected() throws Exception {
p = putVarint(p, 0L);
p = putVarint(p, 3L);
p = putVarint(p, 1L);
- p = putByte(p, QwpConstants.SCHEMA_MODE_FULL);
- p = putVarint(p, 0L);
p = putVarint(p, 1L);
p = putByte(p, (byte) 't');
p = putByte(p, QwpConstants.TYPE_TIMESTAMP);
@@ -825,39 +964,6 @@ public void testTruncatedFixedColumnRejected() throws Exception {
});
}
- @Test
- public void testUnknownSchemaModeRejected() throws Exception {
- TestUtils.assertMemoryLeak(() -> {
- QwpResultBatchDecoder decoder = new QwpResultBatchDecoder();
- QwpBatchBuffer buffer = new QwpBatchBuffer(256);
- long staging = Unsafe.malloc(256, MemoryTag.NATIVE_DEFAULT);
- try {
- long p = staging;
- p = writeHeader(p, (byte) 0);
- p = putByte(p, (byte) 0x11);
- p = putLong(p, 1L);
- p = putVarint(p, 0L);
- p = putVarint(p, 0L);
- p = putVarint(p, 0L);
- p = putVarint(p, 0L);
- p = putByte(p, (byte) 0x42); // unknown schema_mode
- p = putVarint(p, 0L);
- buffer.copyFromPayload(staging, (int) (p - staging));
- try {
- decoder.decode(buffer);
- Assert.fail("decoder must reject unknown schema mode");
- } catch (QwpDecodeException expected) {
- Assert.assertTrue("error mentions schema mode: " + expected.getMessage(),
- expected.getMessage().contains("schema mode"));
- }
- } finally {
- Unsafe.free(staging, 256, MemoryTag.NATIVE_DEFAULT);
- buffer.close();
- decoder.close();
- }
- });
- }
-
@Test
public void testUnsupportedVersionRejected() throws Exception {
TestUtils.assertMemoryLeak(() -> {
@@ -1043,10 +1149,6 @@ private static long putVarint(long p, long value) {
* returned position points at where the column body starts.
*/
private static long startSingleColumnFrame(long buf, String colName, byte wireType, int rowCount) {
- return startSingleColumnFrame(buf, colName, wireType, rowCount, 0L);
- }
-
- private static long startSingleColumnFrame(long buf, String colName, byte wireType, int rowCount, long schemaId) {
long p = buf;
p = writeHeader(p, (byte) 0);
p = putByte(p, (byte) 0x11);
@@ -1055,8 +1157,6 @@ private static long startSingleColumnFrame(long buf, String colName, byte wireTy
p = putVarint(p, 0L); // table_name_len
p = putVarint(p, rowCount);
p = putVarint(p, 1L); // column_count
- p = putByte(p, QwpConstants.SCHEMA_MODE_FULL);
- p = putVarint(p, schemaId);
byte[] nameBytes = colName.getBytes(StandardCharsets.UTF_8);
p = putVarint(p, nameBytes.length);
for (byte b : nameBytes) p = putByte(p, b);
@@ -1085,9 +1185,9 @@ private static long writeHeader(long p, byte flags) {
// the decoder see flags == 0 and silently take the non-delta / non-gorilla
// / non-zstd path, leaving the delta/gorilla payload bytes at the position
// the table-block parser then advances over -- producing inscrutable
- // "unknown schema mode 0x..." errors mid-frame.
+ // table-block parse errors (bad varint, out-of-range column_count) mid-frame.
p = putInt(p, QwpConstants.MAGIC_MESSAGE);
- p = putByte(p, QwpConstants.VERSION_1);
+ p = putByte(p, QwpConstants.VERSION);
p = putByte(p, flags);
p = putByte(p, (byte) 0);
p = putByte(p, (byte) 1);
diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpResultBatchDecoderHardeningTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpResultBatchDecoderHardeningTest.java
index 41f8a2b6..073570b1 100644
--- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpResultBatchDecoderHardeningTest.java
+++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpResultBatchDecoderHardeningTest.java
@@ -45,8 +45,8 @@
* Hardening tests for {@link QwpResultBatchDecoder} against malformed RESULT_BATCH
* frames from a hostile or buggy server. Each test crafts a wire payload directly
* in native memory and asserts that the decoder rejects it cleanly with a
- * {@link QwpDecodeException} rather than reading out of bounds, growing the
- * schema registry without bound, or returning negative offsets that propagate
+ * {@link QwpDecodeException} rather than reading out of bounds, accepting an
+ * out-of-range column_count, or returning negative offsets that propagate
* into accessors.
*/
public class QwpResultBatchDecoderHardeningTest {
@@ -110,6 +110,50 @@ public void testArrayValidDimensionsAreAccepted() throws Exception {
});
}
+ /**
+ * Regression for the surviving table-block guard: a {@code column_count}
+ * above {@link QwpConstants#MAX_COLUMNS_PER_TABLE} on the schema-bearing
+ * batch_seq == 0 must be rejected before the decoder tries to parse that
+ * many column descriptors. This guard moved into the batch_seq == 0 branch
+ * when schema-reference mode was removed; the old schema_id range check that
+ * used to sit beside it is gone, so this is the only remaining bound here.
+ */
+ @Test
+ public void testColumnCountOutOfRangeIsRejected() throws Exception {
+ TestUtils.assertMemoryLeak(() -> {
+ QwpResultBatchDecoder decoder = new QwpResultBatchDecoder();
+ QwpBatchBuffer buffer = new QwpBatchBuffer(64);
+ long staging = Unsafe.malloc(64, MemoryTag.NATIVE_DEFAULT); // scratch memory for the hand-built frame; freed in finally
+ try {
+ long p = staging; // write cursor; each put* call returns the advanced position
+ p = putInt(p, QwpConstants.MAGIC_MESSAGE); // magic
+ p = putByte(p, QwpConstants.VERSION); // version
+ p = putByte(p, (byte) 0); // flags
+ p = putByte(p, (byte) 0); // msg_kind in header (unused)
+ p = putByte(p, (byte) 1); // table_count
+ p = putInt(p, 0); // payload_length (unused)
+ p = putByte(p, (byte) 0x11); // RESULT_BATCH
+ p = putLong(p, 1L); // request_id
+ p = putVarint(p, 0L); // batch_seq = 0 (schema-bearing)
+ p = putVarint(p, 0L); // table_name_len
+ p = putVarint(p, 0L); // row_count
+ p = putVarint(p, 1_000_000_000L); // column_count: far above the cap
+ buffer.copyFromPayload(staging, (int) (p - staging)); // frame stops right after column_count; no column descriptors follow
+ try {
+ decoder.decode(buffer);
+ Assert.fail("decoder must reject an out-of-range column_count");
+ } catch (QwpDecodeException expected) {
+ Assert.assertTrue("error must reference column_count: " + expected.getMessage(),
+ expected.getMessage().contains("column_count out of range"));
+ }
+ } finally {
+ Unsafe.free(staging, 64, MemoryTag.NATIVE_DEFAULT); // same size and tag as the malloc above
+ buffer.close();
+ decoder.close();
+ }
+ });
+ }
+
@Test
public void testUnsupportedVersionThrowsProtocolVersionException() throws Exception {
TestUtils.assertMemoryLeak(() -> {
@@ -117,7 +161,7 @@ public void testUnsupportedVersionThrowsProtocolVersionException() throws Except
QwpBatchBuffer buffer = new QwpBatchBuffer(128);
long staging = Unsafe.malloc(128, MemoryTag.NATIVE_DEFAULT);
try {
- int len = writeMinimalResultBatch(staging, 0L);
+ int len = writeMinimalResultBatch(staging);
Unsafe.getUnsafe().putByte(staging + 4, (byte) 99);
buffer.copyFromPayload(staging, len);
try {
@@ -137,6 +181,39 @@ public void testUnsupportedVersionThrowsProtocolVersionException() throws Except
});
}
+ /**
+ * The protocol collapsed to a single version. Version 2 -- accepted by the
+ * pre-collapse range check ({@code >= VERSION_1 && <= MAX_SUPPORTED_VERSION})
+ * -- must now be rejected by the flattened {@code != VERSION} check, the same
+ * as any other non-1 version byte. Pins the boundary that actually changed.
+ */
+ @Test
+ public void testVersionTwoIsRejected() throws Exception {
+ TestUtils.assertMemoryLeak(() -> {
+ QwpResultBatchDecoder decoder = new QwpResultBatchDecoder();
+ QwpBatchBuffer buffer = new QwpBatchBuffer(128);
+ long staging = Unsafe.malloc(128, MemoryTag.NATIVE_DEFAULT); // scratch memory for the frame; freed in finally
+ try {
+ int len = writeMinimalResultBatch(staging); // baseline frame written with QwpConstants.VERSION
+ Unsafe.getUnsafe().putByte(staging + 4, (byte) 2); // overwrite the version byte, which follows the 4-byte magic
+ buffer.copyFromPayload(staging, len);
+ try {
+ decoder.decode(buffer);
+ Assert.fail("decoder must reject protocol version 2");
+ } catch (QwpProtocolVersionException expected) {
+ Assert.assertTrue("error must reference unsupported version: " + expected.getMessage(),
+ expected.getMessage().contains("unsupported version"));
+ Assert.assertTrue("must extend QwpDecodeException",
+ expected instanceof QwpDecodeException);
+ }
+ } finally {
+ Unsafe.free(staging, 128, MemoryTag.NATIVE_DEFAULT); // same size and tag as the malloc above
+ buffer.close();
+ decoder.close();
+ }
+ });
+ }
+
/**
* Regression: a delta SYMBOL dict entry whose length exceeds
* {@link Integer#MAX_VALUE} must be rejected. Prior to the fix, the
@@ -314,73 +391,6 @@ public void testGeohashPrecisionBelowMinIsRejected() throws Exception {
});
}
- /**
- * Regression for C5: a server-supplied {@code schema_id} above the per-connection
- * cap must be rejected. Without the fix, {@code ensureSchemaSlot} would happily
- * append nulls until OOM (or AIOOBE for negative ids cast from a high varint).
- */
- @Test
- public void testHugeSchemaIdIsRejected() throws Exception {
- TestUtils.assertMemoryLeak(() -> {
- QwpResultBatchDecoder decoder = new QwpResultBatchDecoder();
- QwpBatchBuffer buffer = new QwpBatchBuffer(256);
- long staging = Unsafe.malloc(256, MemoryTag.NATIVE_DEFAULT);
- try {
- // schema_id = 1_000_000_000, well above the 65_535 cap.
- int len = writeMinimalResultBatch(staging, /*schemaId=*/ 1_000_000_000L);
- buffer.copyFromPayload(staging, len);
- try {
- decoder.decode(buffer);
- Assert.fail("decoder must reject huge schema_id");
- } catch (QwpDecodeException expected) {
- Assert.assertTrue("error message should mention schema_id: " + expected.getMessage(),
- expected.getMessage().contains("schema_id"));
- }
- } finally {
- Unsafe.free(staging, 256, MemoryTag.NATIVE_DEFAULT);
- buffer.close();
- decoder.close();
- }
- });
- }
-
- /**
- * Regression for C5: a varint that long-to-int casts to a negative value
- * (a hostile high varint with the sign bit set after the cast) must be
- * rejected, not silently passed to {@code getQuick(negativeIndex)}.
- */
- @Test
- public void testNegativeSchemaIdIsRejected() throws Exception {
- TestUtils.assertMemoryLeak(() -> {
- QwpResultBatchDecoder decoder = new QwpResultBatchDecoder();
- QwpBatchBuffer buffer = new QwpBatchBuffer(256);
- long staging = Unsafe.malloc(256, MemoryTag.NATIVE_DEFAULT);
- try {
- // 5-byte varint encoding 0x80000000 (which casts to Integer.MIN_VALUE).
- // varint bytes for 0x80000000:
- // value bits 7..0: 0x00 -> byte: 0x80 (continuation)
- // value bits 14..8: 0x00 -> byte: 0x80
- // value bits 21..15:0x00 -> byte: 0x80
- // value bits 28..22:0x00 -> byte: 0x80
- // value bits 35..29:0x08 -> byte: 0x08 (no continuation)
- int len = writeMinimalResultBatchWithRawSchemaIdVarint(
- staging, new byte[]{(byte) 0x80, (byte) 0x80, (byte) 0x80, (byte) 0x80, (byte) 0x08});
- buffer.copyFromPayload(staging, len);
- try {
- decoder.decode(buffer);
- Assert.fail("decoder must reject huge/negative schema_id");
- } catch (QwpDecodeException expected) {
- Assert.assertTrue("error message should mention schema_id: " + expected.getMessage(),
- expected.getMessage().contains("schema_id"));
- }
- } finally {
- Unsafe.free(staging, 256, MemoryTag.NATIVE_DEFAULT);
- buffer.close();
- decoder.close();
- }
- });
- }
-
/**
* Regression for C3: a hostile or buggy server can send a QUERY_ERROR frame
* that claims a 65535-byte message but supplies a tiny payload. Without the
@@ -822,9 +832,8 @@ private static void closePool(QwpEgressIoThread io) {
* table-block:
* name_len (varint), name bytes (none)
* row_count (varint)
- * column_count (varint)
- * schema_mode (1 byte) + schema_id (varint)
- * [if FULL] per column: name_len varint, name bytes, wire_type byte
+ * column_count (varint, batch_seq == 0 only)
+ * per column (batch_seq == 0 only): name_len varint, name bytes, wire_type byte
* per column: null_flag byte (+optional bitmap), then column body
*
*/
@@ -863,7 +872,7 @@ private static long putVarint(long p, long value) {
private static int writeArrayResultBatchWithDims(long buf, int[] dims) {
long p = buf;
p = putInt(p, QwpConstants.MAGIC_MESSAGE);
- p = putByte(p, QwpConstants.VERSION_1);
+ p = putByte(p, QwpConstants.VERSION);
p = putByte(p, (byte) 0);
p = putByte(p, (byte) 0);
p = putByte(p, (byte) 1);
@@ -874,8 +883,6 @@ private static int writeArrayResultBatchWithDims(long buf, int[] dims) {
p = putVarint(p, 0L); // table_name_len
p = putVarint(p, 1L); // row_count = 1
p = putVarint(p, 1L); // column_count
- p = putByte(p, QwpConstants.SCHEMA_MODE_FULL);
- p = putVarint(p, 0L); // schema_id
p = putVarint(p, 1L); // column name length
p = putByte(p, (byte) 'a');
p = putByte(p, QwpConstants.TYPE_DOUBLE_ARRAY);
@@ -901,7 +908,7 @@ private static int writeArrayResultBatchWithDims(long buf, int[] dims) {
private static int writeDeltaSymbolDictFrame(long buf, byte[] entryLenVarint) {
long p = buf;
p = putInt(p, QwpConstants.MAGIC_MESSAGE);
- p = putByte(p, QwpConstants.VERSION_1);
+ p = putByte(p, QwpConstants.VERSION);
// Flags byte lives at HEADER_OFFSET_FLAGS (=5). Existing helpers with
// flags=0 don't care about the exact slot; a delta-mode frame does.
p = putByte(p, QwpConstants.FLAG_DELTA_SYMBOL_DICT);
@@ -929,7 +936,7 @@ private static int writeDeltaSymbolDictFrame(long buf, byte[] entryLenVarint) {
private static int writeDeltaSymbolDictFrameWithRange(long buf, long deltaStart, long deltaCount) {
long p = buf;
p = putInt(p, QwpConstants.MAGIC_MESSAGE);
- p = putByte(p, QwpConstants.VERSION_1);
+ p = putByte(p, QwpConstants.VERSION);
p = putByte(p, QwpConstants.FLAG_DELTA_SYMBOL_DICT); // flags at HEADER_OFFSET_FLAGS
p = putByte(p, (byte) 0); // reserved
p = putByte(p, (byte) 1); // table_count
@@ -951,7 +958,7 @@ private static int writeDeltaSymbolDictFrameWithRange(long buf, long deltaStart,
private static int writeExecDoneTruncatedRowsAffected(long buf) {
long p = buf;
p = putInt(p, QwpConstants.MAGIC_MESSAGE);
- p = putByte(p, QwpConstants.VERSION_1);
+ p = putByte(p, QwpConstants.VERSION);
p = putByte(p, (byte) 0);
p = putByte(p, (byte) 0);
p = putByte(p, (byte) 1);
@@ -973,7 +980,7 @@ private static int writeExecDoneTruncatedRowsAffected(long buf) {
private static int writeGeohashResultBatch(long buf, long precisionBits) {
long p = buf;
p = putInt(p, QwpConstants.MAGIC_MESSAGE);
- p = putByte(p, QwpConstants.VERSION_1);
+ p = putByte(p, QwpConstants.VERSION);
p = putByte(p, (byte) 0); // header msg_kind (unused)
p = putByte(p, (byte) 0); // flags (no delta dict)
p = putByte(p, (byte) 1); // table_count
@@ -984,8 +991,6 @@ private static int writeGeohashResultBatch(long buf, long precisionBits) {
p = putVarint(p, 0L); // table_name_len
p = putVarint(p, 0L); // row_count (no data rows)
p = putVarint(p, 1L); // column_count
- p = putByte(p, QwpConstants.SCHEMA_MODE_FULL);
- p = putVarint(p, 0L); // schema_id
// Schema: one column "g" of TYPE_GEOHASH.
p = putVarint(p, 1L);
p = putByte(p, (byte) 'g');
@@ -997,10 +1002,10 @@ private static int writeGeohashResultBatch(long buf, long precisionBits) {
return (int) (p - buf);
}
- private static int writeMinimalResultBatch(long buf, long schemaId) {
+ private static int writeMinimalResultBatch(long buf) {
long p = buf;
p = putInt(p, QwpConstants.MAGIC_MESSAGE);
- p = putByte(p, QwpConstants.VERSION_1);
+ p = putByte(p, QwpConstants.VERSION);
p = putByte(p, (byte) 0);
p = putByte(p, (byte) 0);
p = putByte(p, (byte) 1);
@@ -1011,8 +1016,6 @@ private static int writeMinimalResultBatch(long buf, long schemaId) {
p = putVarint(p, 0L); // table_name_len
p = putVarint(p, 0L); // row_count = 0 (no body needed)
p = putVarint(p, 0L); // column_count = 0
- p = putByte(p, QwpConstants.SCHEMA_MODE_FULL);
- p = putVarint(p, schemaId);
return (int) (p - buf);
}
@@ -1024,7 +1027,7 @@ private static int writeMinimalResultBatch(long buf, long schemaId) {
private static int writeMinimalResultBatchWithRawNameLenVarint(long buf, byte[] nameLenVarint) {
long p = buf;
p = putInt(p, QwpConstants.MAGIC_MESSAGE);
- p = putByte(p, QwpConstants.VERSION_1);
+ p = putByte(p, QwpConstants.VERSION);
p = putByte(p, (byte) 0);
p = putByte(p, (byte) 0);
p = putByte(p, (byte) 1);
@@ -1036,30 +1039,6 @@ private static int writeMinimalResultBatchWithRawNameLenVarint(long buf, byte[]
return (int) (p - buf);
}
- /**
- * Variant that writes a custom raw varint sequence for schema_id. Lets us
- * inject a multi-byte varint that decodes to a value with the int sign bit
- * set after long-to-int truncation.
- */
- private static int writeMinimalResultBatchWithRawSchemaIdVarint(long buf, byte[] schemaIdVarint) {
- long p = buf;
- p = putInt(p, QwpConstants.MAGIC_MESSAGE);
- p = putByte(p, QwpConstants.VERSION_1);
- p = putByte(p, (byte) 0);
- p = putByte(p, (byte) 0);
- p = putByte(p, (byte) 1);
- p = putInt(p, 0);
- p = putByte(p, (byte) 0x11);
- p = putLong(p, 1L);
- p = putVarint(p, 0L);
- p = putVarint(p, 0L);
- p = putVarint(p, 0L);
- p = putVarint(p, 0L);
- p = putByte(p, QwpConstants.SCHEMA_MODE_FULL);
- for (byte b : schemaIdVarint) p = putByte(p, b);
- return (int) (p - buf);
- }
-
/**
* Crafts a RESULT_BATCH frame whose flags byte advertises FLAG_ZSTD but
* whose body is junk (not a valid zstd frame). Used by
@@ -1068,7 +1047,7 @@ private static int writeMinimalResultBatchWithRawSchemaIdVarint(long buf, byte[]
private static int writeResultBatchWithCorruptZstdBody(long buf) {
long p = buf;
p = putInt(p, QwpConstants.MAGIC_MESSAGE);
- p = putByte(p, QwpConstants.VERSION_1);
+ p = putByte(p, QwpConstants.VERSION);
p = putByte(p, QwpConstants.FLAG_ZSTD); // flags byte (offset 5) -- FLAG_ZSTD set
p = putByte(p, (byte) 0);
p = putByte(p, (byte) 1);
@@ -1091,7 +1070,7 @@ private static int writeResultBatchWithCorruptZstdBody(long buf) {
private static int writeResultEndTruncatedFinalSeq(long buf) {
long p = buf;
p = putInt(p, QwpConstants.MAGIC_MESSAGE);
- p = putByte(p, QwpConstants.VERSION_1);
+ p = putByte(p, QwpConstants.VERSION);
p = putByte(p, (byte) 0);
p = putByte(p, (byte) 0);
p = putByte(p, (byte) 1);
@@ -1110,7 +1089,7 @@ private static int writeResultEndTruncatedFinalSeq(long buf) {
private static int writeResultEndTruncatedTotalRows(long buf) {
long p = buf;
p = putInt(p, QwpConstants.MAGIC_MESSAGE);
- p = putByte(p, QwpConstants.VERSION_1);
+ p = putByte(p, QwpConstants.VERSION);
p = putByte(p, (byte) 0);
p = putByte(p, (byte) 0);
p = putByte(p, (byte) 1);
@@ -1126,7 +1105,7 @@ private static int writeStringResultBatch(long buf, int nonNull, int totalBytes)
long p = buf;
// Header: magic + version + msg_kind + flags + table_count + payload_length
p = putInt(p, QwpConstants.MAGIC_MESSAGE); // 4
- p = putByte(p, QwpConstants.VERSION_1); // 1
+ p = putByte(p, QwpConstants.VERSION); // 1
p = putByte(p, (byte) 0); // msg_kind in header (unused by client)
p = putByte(p, (byte) 0); // flags
p = putByte(p, (byte) 1); // table_count
@@ -1139,9 +1118,7 @@ private static int writeStringResultBatch(long buf, int nonNull, int totalBytes)
p = putVarint(p, 0L); // table_name_len = 0
p = putVarint(p, nonNull); // row_count
p = putVarint(p, 1L); // column_count
- p = putByte(p, QwpConstants.SCHEMA_MODE_FULL);
- p = putVarint(p, 0L); // schema_id
- // Schema entries (full): one column "s" of TYPE_VARCHAR
+ // Schema entries: one column "s" of TYPE_VARCHAR
p = putVarint(p, 1L); // column name length
p = putByte(p, (byte) 's');
p = putByte(p, QwpConstants.TYPE_VARCHAR);
@@ -1173,7 +1150,7 @@ private static int writeStringResultBatch(long buf, int nonNull, int totalBytes)
private static int writeStringResultBatchWithOffsets(long buf, int[] intOffsets, int totalBytes, int bytesLen) {
long p = buf;
p = putInt(p, QwpConstants.MAGIC_MESSAGE);
- p = putByte(p, QwpConstants.VERSION_1);
+ p = putByte(p, QwpConstants.VERSION);
p = putByte(p, (byte) 0);
p = putByte(p, (byte) 0);
p = putByte(p, (byte) 1);
@@ -1184,8 +1161,6 @@ private static int writeStringResultBatchWithOffsets(long buf, int[] intOffsets,
p = putVarint(p, 0L); // table_name_len
p = putVarint(p, intOffsets.length); // row_count = nonNull
p = putVarint(p, 1L); // column_count
- p = putByte(p, QwpConstants.SCHEMA_MODE_FULL);
- p = putVarint(p, 0L); // schema_id
p = putVarint(p, 1L); // column name length
p = putByte(p, (byte) 's');
p = putByte(p, QwpConstants.TYPE_VARCHAR);
@@ -1209,7 +1184,7 @@ private static int writeStringResultBatchWithOffsets(long buf, int[] intOffsets,
private static int writeSymbolResultBatch(long buf, long rowCount, long dictSize) {
long p = buf;
p = putInt(p, QwpConstants.MAGIC_MESSAGE);
- p = putByte(p, QwpConstants.VERSION_1);
+ p = putByte(p, QwpConstants.VERSION);
p = putByte(p, (byte) 0);
p = putByte(p, (byte) 0); // flags: no delta dict, so non-delta SYMBOL path
p = putByte(p, (byte) 1);
@@ -1220,8 +1195,6 @@ private static int writeSymbolResultBatch(long buf, long rowCount, long dictSize
p = putVarint(p, 0L); // table_name_len
p = putVarint(p, rowCount);
p = putVarint(p, 1L); // column_count
- p = putByte(p, QwpConstants.SCHEMA_MODE_FULL);
- p = putVarint(p, 0L); // schema_id
// Schema: one column "s" of TYPE_SYMBOL.
p = putVarint(p, 1L);
p = putByte(p, (byte) 's');
diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpUdpSenderTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpUdpSenderTest.java
index 98bbbdeb..067a8a97 100644
--- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpUdpSenderTest.java
+++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpUdpSenderTest.java
@@ -2325,7 +2325,7 @@ private DatagramDecoder(byte[] packet) {
private List
- * The cursor SF path used to elide schema definitions and previously-sent
- * symbols on subsequent batches over the same connection — emitting refs
- * + delta-dicts. That's wrong for SF: the bytes survive process restarts
- * and are replayed against fresh server connections (post-reconnect, or
- * via a background drainer adopting an orphan slot). A frame with a
- * schema-ref to an ID the new server has never seen is unrecoverable.
+ * The cursor SF path used to elide previously-sent symbols on subsequent
+ * batches over the same connection, emitting a delta-dict that carried only
+ * the new entries. That's wrong for SF: the bytes survive process restarts and
+ * replay against fresh server connections (post-reconnect, or via a background
+ * drainer adopting an orphan slot). A delta that references symbol ids the new
+ * server has never seen is unrecoverable.
*
- * Today every frame must carry its full schema and a complete symbol-dict
- * delta starting at id 0. This test asserts both invariants on the wire.
+ * Today every frame must carry a complete symbol-dict delta starting at id 0
+ * (column schemas travel inline on the first batch too). This test asserts the
+ * symbol-dict invariant on the wire.
*/
public class SelfSufficientFramesTest {
diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/websocket/TestWebSocketServer.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/websocket/TestWebSocketServer.java
index 3da4f829..fadc3eed 100644
--- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/websocket/TestWebSocketServer.java
+++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/websocket/TestWebSocketServer.java
@@ -66,6 +66,12 @@ public class TestWebSocketServer implements Closeable {
// The server emits the header on both the 101 success path and (when
// rejectingRole != null) the 421 misdirected-request path.
private volatile String advertisedRole;
+ // When true, the server sends a SERVER_INFO frame right after a successful
+ // 101 upgrade, mirroring the egress server contract (which always emits it).
+ // The advertised role follows advertisedRole (STANDALONE when unset). Egress
+ // QwpQueryClient tests enable this; ingress sender tests leave it off so their
+ // connections carry only ACK frames.
+ private volatile boolean sendServerInfo;
// When non-null the next handshake responds with HTTP 421 Misdirected
// Request + X-QuestDB-Role: