Skip to content
30 changes: 0 additions & 30 deletions core/src/main/java/io/questdb/client/Sender.java
Original file line number Diff line number Diff line change
Expand Up @@ -1036,7 +1036,6 @@ final class LineSenderBuilder {
private int maxBackoffMillis = PARAMETER_NOT_SET_EXPLICITLY;
private int maxDatagramSize = PARAMETER_NOT_SET_EXPLICITLY;
private int maxNameLength = PARAMETER_NOT_SET_EXPLICITLY;
private int maxSchemasPerConnection = PARAMETER_NOT_SET_EXPLICITLY;
private int maximumBufferCapacity = PARAMETER_NOT_SET_EXPLICITLY;
private final HttpClientConfiguration httpClientConfiguration = new DefaultHttpClientConfiguration() {
@Override
Expand Down Expand Up @@ -1369,8 +1368,6 @@ public Sender build() {
long actualAutoFlushIntervalNanos = autoFlushIntervalMillis == PARAMETER_NOT_SET_EXPLICITLY
? DEFAULT_WS_AUTO_FLUSH_INTERVAL_NANOS
: TimeUnit.MILLISECONDS.toNanos(autoFlushIntervalMillis);
int actualMaxSchemasPerConnection = maxSchemasPerConnection == PARAMETER_NOT_SET_EXPLICITLY
? QwpWebSocketSender.DEFAULT_MAX_SCHEMAS_PER_CONNECTION : maxSchemasPerConnection;

String wsAuthHeader = buildWebSocketAuthHeader();

Expand Down Expand Up @@ -1499,7 +1496,6 @@ public Sender build() {
actualAutoFlushBytes,
actualAutoFlushIntervalNanos,
wsAuthHeader,
actualMaxSchemasPerConnection,
requestDurableAck,
cursorEngine,
actualCloseFlushTimeoutMillis,
Expand Down Expand Up @@ -2217,25 +2213,6 @@ public LineSenderBuilder maxNameLength(int maxNameLength) {
return this;
}

/**
* Sets the maximum number of distinct schemas the WebSocket sender may assign on one connection.
*/
public LineSenderBuilder maxSchemasPerConnection(int maxSchemasPerConnection) {
if (protocol != PARAMETER_NOT_SET_EXPLICITLY && protocol != PROTOCOL_WEBSOCKET) {
throw new LineSenderException("max schemas per connection is only supported for WebSocket transport");
}
if (this.maxSchemasPerConnection != PARAMETER_NOT_SET_EXPLICITLY) {
throw new LineSenderException("max schemas per connection was already configured")
.put("[maxSchemasPerConnection=").put(this.maxSchemasPerConnection).put("]");
}
if (maxSchemasPerConnection < 1) {
throw new LineSenderException("max schemas per connection must be positive")
.put("[maxSchemasPerConnection=").put(maxSchemasPerConnection).put("]");
}
this.maxSchemasPerConnection = maxSchemasPerConnection;
return this;
}

/**
* Minimum expected throughput in bytes per second for HTTP requests.
* <br>
Expand Down Expand Up @@ -3154,13 +3131,6 @@ private LineSenderBuilder fromConfig(CharSequence configurationString) {
} else {
throw new LineSenderException("invalid transaction [value=").put(sink).put(", allowed-values=[on, off]]");
}
} else if (Chars.equals("max_schemas_per_connection", sink)) {
if (protocol != PROTOCOL_WEBSOCKET) {
throw new LineSenderException("max_schemas_per_connection is only supported for WebSocket transport");
}
pos = getValue(configurationString, pos, sink, "max_schemas_per_connection");
int maxSchemas = parseIntValue(sink, "max_schemas_per_connection");
maxSchemasPerConnection(maxSchemas);
} else if (Chars.equals("sf_dir", sink)) {
if (protocol != PROTOCOL_WEBSOCKET) {
throw new LineSenderException("sf_dir is only supported for WebSocket transport");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@
* {@code arrayRowAddr} accessors expose raw native pointers into the WebSocket
* payload buffer for SIMD or JNI consumers. The layout of the bytes behind
* these addresses follows the QWP wire format (see
* {@code docs/qwp/wire-egress.md}); these methods are an expert API and
* may shift as the wire format evolves.
* {@code https://questdb.com/docs/connect/wire-protocols/qwp-egress-websocket/});
* these methods are an expert API and may shift as the wire format evolves.
* <p>
* <strong>Type contract / NULL handling.</strong> Each typed accessor delegates
* to the same read path as the corresponding {@code QwpColumnBatch}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -618,7 +618,8 @@ public long getUuidLo(int col, int row) {
/**
* True if the cell is NULL on the wire.
* <p>
* Note on type-specific sentinels (see {@code docs/qwp/wire-egress.md} sec 11.5):
* Note on type-specific sentinels (see
* {@code https://questdb.com/docs/connect/wire-protocols/qwp-egress-websocket/}):
* QuestDB stores NULL as a sentinel value for several types -- {@code Long.MIN_VALUE}
* for LONG/INT/etc., {@code 0.0.0.0} for IPv4, {@code -1} for GEOHASH, and crucially
* {@code NaN} for FLOAT and DOUBLE. Egress preserves these conventions: a row carrying
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,7 @@ default void onError(long requestId, byte status, String message) {
* the query (with {@code failover=on}, the default).
* <p>
* {@code newNode} is the {@link QwpServerInfo} of the endpoint the client
* just bound to, or {@code null} if the new server negotiated the v1
* protocol (no SERVER_INFO frame).
* just bound to.
* <p>
* After this callback fires, {@link #onBatch} will be invoked again with
* {@code batch_seq} restarting at 0 on the new connection. Handlers that
Expand All @@ -127,7 +126,7 @@ default void onFailoverReset(QwpServerInfo newNode) {
* {@link #onFailoverReset(QwpServerInfo)} for backwards compatibility.
*
* @param requestId client-assigned request id of the query being replayed
* @param newNode the endpoint just bound to, or {@code null} if the new server negotiated v1
* @param newNode the {@link QwpServerInfo} of the endpoint just bound to
*/
default void onFailoverReset(long requestId, QwpServerInfo newNode) {
onFailoverReset(newNode);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,26 +279,16 @@ private void writeSymbolColumnWithGlobalIds(QwpTableBuffer.ColumnBuffer col, int
}
}

private void writeTableHeaderWithSchema(String tableName, int rowCount, int schemaId, QwpColumnDef[] columns) {
private void writeTableHeader(String tableName, int rowCount, QwpColumnDef[] columns) {
buffer.putString(tableName);
buffer.putVarint(rowCount);
buffer.putVarint(columns.length);
buffer.putByte(SCHEMA_MODE_FULL);
buffer.putVarint(schemaId);
for (QwpColumnDef col : columns) {
buffer.putString(col.getName());
buffer.putByte(col.getWireTypeCode());
}
}

private void writeTableHeaderWithSchemaRef(String tableName, int rowCount, int schemaId, int columnCount) {
buffer.putString(tableName);
buffer.putVarint(rowCount);
buffer.putVarint(columnCount);
buffer.putByte(SCHEMA_MODE_REFERENCE);
buffer.putVarint(schemaId);
}

private void writeTimestampColumn(long addr, int count, boolean useGorilla) {
if (useGorilla && count > 2) {
// Single pass: check feasibility and compute encoded size together
Expand All @@ -325,12 +315,8 @@ private void writeTimestampColumn(long addr, int count, boolean useGorilla) {
}
}

void encodeTable(QwpTableBuffer tableBuffer, boolean useSchemaRef, boolean useGlobalSymbols, boolean useGorilla) {
int schemaId = tableBuffer.getSchemaId();
if (schemaId < 0) {
schemaId = 0;
}
encodeTable(tableBuffer, tableBuffer.getRowCount(), null, null, null, useSchemaRef, useGlobalSymbols, useGorilla, schemaId);
void encodeTable(QwpTableBuffer tableBuffer, boolean useGlobalSymbols, boolean useGorilla) {
encodeTable(tableBuffer, tableBuffer.getRowCount(), null, null, null, useGlobalSymbols, useGorilla);
}

void encodeTable(
Expand All @@ -339,23 +325,12 @@ void encodeTable(
int[] limitedValueCounts,
long[] limitedStringDataSizes,
int[] limitedSymbolDictionarySizes,
boolean useSchemaRef,
boolean useGlobalSymbols,
boolean useGorilla,
int schemaId
boolean useGorilla
) {
QwpColumnDef[] columnDefs = tableBuffer.getColumnDefs();

if (useSchemaRef) {
writeTableHeaderWithSchemaRef(
tableBuffer.getTableName(),
rowCount,
schemaId,
columnDefs.length
);
} else {
writeTableHeaderWithSchema(tableBuffer.getTableName(), rowCount, schemaId, columnDefs);
}
writeTableHeader(tableBuffer.getTableName(), rowCount, columnDefs);

for (int i = 0; i < tableBuffer.getColumnCount(); i++) {
QwpTableBuffer.ColumnBuffer col = tableBuffer.getColumn(i);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,11 +214,10 @@ public void onBinaryMessage(long payloadPtr, int payloadLen) {
decodeAndEmitError(payloadPtr, payloadLen);
currentQueryDone = true;
} else if (msgKind == QwpEgressMsgKind.CACHE_RESET) {
// Server reached a configured soft cap on the connection-scoped
// SYMBOL dict or schema-fingerprint cache. Drop the indicated
// caches on this side so the next RESULT_BATCH's deltaStart and
// schema-reference ids line up with the server's fresh counter.
// No user-visible event -- CACHE_RESET never arrives between the
// Server reached its configured soft cap on the connection-scoped
// SYMBOL dict. Drop the dict on this side so the next RESULT_BATCH's
// deltaStart lines up with the server's fresh counter. No
// user-visible event -- CACHE_RESET never arrives between the
// RESULT_BATCH / RESULT_END / EXEC_DONE / QUERY_ERROR of a query
// and the user callback, only after it.
handleCacheReset(payloadPtr, payloadLen);
Expand Down Expand Up @@ -318,6 +317,10 @@ public void run() {
currentQueryDone = false;
currentRequestId = req.requestId;
creditEnabled = req.initialCredit > 0L;
// The schema rides the first RESULT_BATCH (batch_seq == 0) of each
// query; invalidate any schema left from the prior query so a
// continuation batch can't bind rows to a stale schema.
decoder.resetQuerySchema();
sendQueryRequest(req);

while (!currentQueryDone && !shutdown) {
Expand Down Expand Up @@ -573,8 +576,8 @@ private void emitTransportErrorBlocking(String message) {

/**
* Decodes a {@code CACHE_RESET} frame body and clears the indicated
* connection-scoped caches on the client side. Body is a single byte mask:
* bit 0 = SYMBOL dict, bit 1 = schema-fingerprint cache.
* connection-scoped caches on the client side. Body is a single byte mask;
* bit 0 = SYMBOL dict is the only bit defined.
*/
private void handleCacheReset(long payloadPtr, int payloadLen) {
int bodyStart = QwpConstants.HEADER_SIZE + 1; // msg_kind byte consumed by caller
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@
public final class QwpEgressMsgKind {
/**
* Server -> client. Connection-scoped cache reset. Body: {@code reset_mask:u8}
* with bit 0 = SYMBOL dict, bit 1 = schema-fingerprint cache. Sent between
* queries when a cache hits its server-side soft cap. Recipient clears the
* indicated caches; subsequent RESULT_BATCH delta sections start fresh.
* with bit 0 = SYMBOL dict (the only bit defined). Sent between queries when
* the dict hits its server-side soft cap. Recipient clears the dict;
* subsequent RESULT_BATCH delta sections start fresh.
*/
public static final byte CACHE_RESET = 0x17;
public static final byte CANCEL = 0x14;
Expand All @@ -59,10 +59,6 @@ public final class QwpEgressMsgKind {
* Reset mask bit: clear the connection-scoped SYMBOL dict.
*/
public static final byte RESET_MASK_DICT = 0x01;
/**
* Reset mask bit: clear the connection-scoped schema-fingerprint cache.
*/
public static final byte RESET_MASK_SCHEMAS = 0x02;
public static final byte RESULT_BATCH = 0x11;
public static final byte RESULT_END = 0x12;
/**
Expand All @@ -88,7 +84,7 @@ public final class QwpEgressMsgKind {
public static final byte ROLE_STANDALONE = 0;
/**
* Server -> client. Unsolicited frame delivered as the first QWP message
* on every v2 WebSocket connection. Body (little-endian): {@code
* on every WebSocket connection. Body (little-endian): {@code
* msg_kind:u8, role:u8, epoch:u64, capabilities:u32, server_wall_ns:i64,
* cluster_id:u16_len+utf8, node_id:u16_len+utf8} followed by an optional
* {@code zone_id:u16_len+utf8} when the {@link #CAP_ZONE} bit is set in
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ public void recordTransportError(int idx) {
/**
* Records a server-advertised zone for the given host index per
* failover.md §2.1. Called once with {@code SERVER_INFO.zone_id} after a
* successful upgrade on a v2 connection (gated by {@code CAP_ZONE}), and
* successful upgrade (gated by {@code CAP_ZONE}), and
* once with the {@code X-QuestDB-Zone} HTTP header on a {@code 421}
* upgrade reject.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import io.questdb.client.std.str.StringSink;

/**
* Server negotiated a QWP version outside {@code [VERSION_1, MAX_SUPPORTED_VERSION]}.
* Server negotiated a QWP version other than {@code QwpConstants.VERSION}.
* Terminal: version negotiation is cluster-wide, so failover masks the disagreement.
*/
public class QwpProtocolVersionException extends QwpDecodeException {
Expand Down
Loading
Loading