Skip to content

Commit 015a689

Browse files
GH-838: [FlightSQL][JDBC] Preserve sub-millisecond timestamp precision in PreparedStatement parameter binding
Intercept raw java.sql.Timestamp objects at the JDBC layer before Avatica serializes them to epoch milliseconds (losing sub-ms nanos). Propagate the original Timestamp through ArrowFlightMetaImpl and AvaticaParameterBinder to TimestampAvaticaParameterConverter, which reconstructs full-precision epoch values from Timestamp.getTime() + Timestamp.getNanos(). Add unit tests for micro/nano/milli/second precision with raw Timestamp, and integration tests exercising the full setTimestamp() path through the mock FlightSQL server.
1 parent 14079fc commit 015a689

File tree

9 files changed

+301
-11
lines changed

9 files changed

+301
-11
lines changed

flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightMetaImpl.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.sql.Connection;
2020
import java.sql.SQLException;
2121
import java.sql.SQLTimeoutException;
22+
import java.sql.Timestamp;
2223
import java.util.ArrayList;
2324
import java.util.Collections;
2425
import java.util.List;
@@ -31,6 +32,7 @@
3132
import org.apache.arrow.vector.types.pojo.Schema;
3233
import org.apache.calcite.avatica.AvaticaConnection;
3334
import org.apache.calcite.avatica.AvaticaParameter;
35+
import org.apache.calcite.avatica.AvaticaStatement;
3436
import org.apache.calcite.avatica.ColumnMetaData;
3537
import org.apache.calcite.avatica.MetaImpl;
3638
import org.apache.calcite.avatica.NoSuchStatementException;
@@ -105,9 +107,10 @@ public ExecuteResult execute(
105107
throw new IllegalStateException("Prepared statement not found: " + statementHandle);
106108
}
107109

110+
Map<Integer, Timestamp> rawTimestamps = getRawTimestamps(statementHandle);
108111
new AvaticaParameterBinder(
109112
preparedStatement, ((ArrowFlightConnection) connection).getBufferAllocator())
110-
.bind(typedValues);
113+
.bind(typedValues, 0, rawTimestamps);
111114

112115
if (statementHandle.signature == null
113116
|| statementHandle.signature.statementType == StatementType.IS_DML) {
@@ -149,11 +152,12 @@ public ExecuteBatchResult executeBatch(
149152
throw new IllegalStateException("Prepared statement not found: " + statementHandle);
150153
}
151154

155+
Map<Integer, Timestamp> rawTimestamps = getRawTimestamps(statementHandle);
152156
final AvaticaParameterBinder binder =
153157
new AvaticaParameterBinder(
154158
preparedStatement, ((ArrowFlightConnection) connection).getBufferAllocator());
155159
for (int i = 0; i < parameterValuesList.size(); i++) {
156-
binder.bind(parameterValuesList.get(i), i);
160+
binder.bind(parameterValuesList.get(i), i, rawTimestamps);
157161
}
158162

159163
// Update query
@@ -173,6 +177,14 @@ public Frame fetch(
173177
String.format("%s does not use frames.", this), AvaticaConnection.HELPER.unsupported());
174178
}
175179

180+
private Map<Integer, Timestamp> getRawTimestamps(StatementHandle statementHandle) {
181+
AvaticaStatement avaticaStmt = connection.statementMap.get(statementHandle.id);
182+
if (avaticaStmt instanceof ArrowFlightPreparedStatement) {
183+
return ((ArrowFlightPreparedStatement) avaticaStmt).getRawTimestamps();
184+
}
185+
return Collections.emptyMap();
186+
}
187+
176188
private PreparedStatement prepareForHandle(final String query, StatementHandle handle) {
177189
final PreparedStatement preparedStatement =
178190
((ArrowFlightConnection) connection).getClientHandler().prepare(query);

flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightPreparedStatement.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,11 @@
1818

1919
import java.sql.PreparedStatement;
2020
import java.sql.SQLException;
21+
import java.sql.Timestamp;
22+
import java.util.Calendar;
23+
import java.util.Collections;
24+
import java.util.HashMap;
25+
import java.util.Map;
2126
import org.apache.arrow.driver.jdbc.client.ArrowFlightSqlClientHandler;
2227
import org.apache.arrow.flight.FlightInfo;
2328
import org.apache.arrow.util.Preconditions;
@@ -30,6 +35,7 @@ public class ArrowFlightPreparedStatement extends AvaticaPreparedStatement
3035
implements ArrowFlightInfoStatement {
3136

3237
private final ArrowFlightSqlClientHandler.PreparedStatement preparedStatement;
38+
private final Map<Integer, Timestamp> rawTimestamps = new HashMap<>();
3339

3440
private ArrowFlightPreparedStatement(
3541
final ArrowFlightConnection connection,
@@ -74,6 +80,41 @@ public synchronized void close() throws SQLException {
7480
super.close();
7581
}
7682

83+
@Override
84+
public void setTimestamp(int parameterIndex, Timestamp x) throws SQLException {
85+
if (x != null) {
86+
rawTimestamps.put(parameterIndex, x);
87+
} else {
88+
rawTimestamps.remove(parameterIndex);
89+
}
90+
super.setTimestamp(parameterIndex, x);
91+
}
92+
93+
@Override
94+
public void setTimestamp(int parameterIndex, Timestamp x, Calendar cal) throws SQLException {
95+
if (x != null) {
96+
rawTimestamps.put(parameterIndex, x);
97+
} else {
98+
rawTimestamps.remove(parameterIndex);
99+
}
100+
super.setTimestamp(parameterIndex, x, cal);
101+
}
102+
103+
@Override
104+
public void clearParameters() throws SQLException {
105+
rawTimestamps.clear();
106+
super.clearParameters();
107+
}
108+
109+
/**
110+
* Returns the raw java.sql.Timestamp objects set via setTimestamp(), keyed by 1-based parameter
111+
* index. These preserve sub-millisecond precision (getNanos()) that Avatica's TypedValue
112+
* serialization discards.
113+
*/
114+
Map<Integer, Timestamp> getRawTimestamps() {
115+
return Collections.unmodifiableMap(rawTimestamps);
116+
}
117+
77118
@Override
78119
public FlightInfo executeFlightInfoQuery() throws SQLException {
79120
return preparedStatement.executeQuery();

flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/converter/impl/FixedSizeListAvaticaParameterConverter.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,10 @@ public boolean bindParameter(FieldVector vector, TypedValue typedValue, int inde
6666
.getType()
6767
.accept(
6868
new AvaticaParameterBinder.BinderVisitor(
69-
childVector, TypedValue.ofSerial(typedValue.componentType, val), childIndex));
69+
childVector,
70+
TypedValue.ofSerial(typedValue.componentType, val),
71+
childIndex,
72+
null));
7073
}
7174
}
7275
listVector.setValueCount(index + 1);

flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/converter/impl/LargeListAvaticaParameterConverter.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,10 @@ public boolean bindParameter(FieldVector vector, TypedValue typedValue, int inde
5555
.getType()
5656
.accept(
5757
new AvaticaParameterBinder.BinderVisitor(
58-
childVector, TypedValue.ofSerial(typedValue.componentType, val), childIndex));
58+
childVector,
59+
TypedValue.ofSerial(typedValue.componentType, val),
60+
childIndex,
61+
null));
5962
}
6063
}
6164
listVector.endValue(index, values.size());

flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/converter/impl/ListAvaticaParameterConverter.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,10 @@ public boolean bindParameter(FieldVector vector, TypedValue typedValue, int inde
5454
.getType()
5555
.accept(
5656
new AvaticaParameterBinder.BinderVisitor(
57-
childVector, TypedValue.ofSerial(typedValue.componentType, val), childIndex));
57+
childVector,
58+
TypedValue.ofSerial(typedValue.componentType, val),
59+
childIndex,
60+
null));
5861
}
5962
}
6063
listVector.endValue(index, values.size());

flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/converter/impl/TimestampAvaticaParameterConverter.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
*/
1717
package org.apache.arrow.driver.jdbc.converter.impl;
1818

19+
import java.sql.Timestamp;
20+
import org.checkerframework.checker.nullness.qual.Nullable;
1921
import org.apache.arrow.vector.FieldVector;
2022
import org.apache.arrow.vector.TimeStampMicroTZVector;
2123
import org.apache.arrow.vector.TimeStampMicroVector;
@@ -39,6 +41,30 @@ public TimestampAvaticaParameterConverter(ArrowType.Timestamp type) {
3941
this.type = type;
4042
}
4143

44+
/**
45+
* Converts a raw java.sql.Timestamp to the value in the target Arrow time unit, preserving
46+
* sub-millisecond precision from Timestamp.getNanos().
47+
*/
48+
private long convertFromTimestamp(Timestamp ts) {
49+
// Timestamp.getTime() returns epoch millis (truncated, no sub-ms precision).
50+
// Timestamp.getNanos() returns the fractional-second component in nanoseconds (0..999_999_999).
51+
// We reconstruct the full-precision value from epoch seconds + nanos to avoid double-counting.
52+
long epochSeconds = Math.floorDiv(ts.getTime(), 1_000L);
53+
int nanos = ts.getNanos(); // 0..999_999_999, full fractional second
54+
switch (type.getUnit()) {
55+
case SECOND:
56+
return epochSeconds;
57+
case MILLISECOND:
58+
return epochSeconds * 1_000L + nanos / 1_000_000;
59+
case MICROSECOND:
60+
return epochSeconds * 1_000_000L + nanos / 1_000;
61+
case NANOSECOND:
62+
return epochSeconds * 1_000_000_000L + nanos;
63+
default:
64+
throw new UnsupportedOperationException("Unsupported time unit: " + type.getUnit());
65+
}
66+
}
67+
4268
/** Converts an epoch millisecond value from Avatica to the target time unit. */
4369
private long convertFromMillis(long epochMillis) {
4470
switch (type.getUnit()) {
@@ -55,9 +81,33 @@ private long convertFromMillis(long epochMillis) {
5581
}
5682
}
5783

84+
/**
85+
* Bind a timestamp parameter, using the raw java.sql.Timestamp if available for full precision.
86+
*
87+
* @param vector FieldVector to bind to.
88+
* @param typedValue TypedValue from Avatica (epoch millis, may have lost sub-ms precision).
89+
* @param index Vector index to bind the value at.
90+
* @param rawTimestamp Optional raw java.sql.Timestamp preserving sub-millisecond nanos.
91+
* @return true if binding was successful.
92+
*/
93+
public boolean bindParameter(
94+
FieldVector vector, TypedValue typedValue, int index, @Nullable Timestamp rawTimestamp) {
95+
long value;
96+
if (rawTimestamp != null) {
97+
value = convertFromTimestamp(rawTimestamp);
98+
} else {
99+
value = convertFromMillis((long) typedValue.toLocal());
100+
}
101+
return setTimestampVector(vector, index, value);
102+
}
103+
58104
@Override
59105
public boolean bindParameter(FieldVector vector, TypedValue typedValue, int index) {
60106
long value = convertFromMillis((long) typedValue.toLocal());
107+
return setTimestampVector(vector, index, value);
108+
}
109+
110+
private boolean setTimestampVector(FieldVector vector, int index, long value) {
61111
if (vector instanceof TimeStampSecVector) {
62112
((TimeStampSecVector) vector).setSafe(index, value);
63113
return true;

flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/utils/AvaticaParameterBinder.java

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,10 @@
1616
*/
1717
package org.apache.arrow.driver.jdbc.utils;
1818

19+
import java.sql.Timestamp;
20+
import java.util.Collections;
1921
import java.util.List;
22+
import java.util.Map;
2023
import org.apache.arrow.driver.jdbc.client.ArrowFlightSqlClientHandler.PreparedStatement;
2124
import org.apache.arrow.driver.jdbc.converter.impl.BinaryAvaticaParameterConverter;
2225
import org.apache.arrow.driver.jdbc.converter.impl.BinaryViewAvaticaParameterConverter;
@@ -81,7 +84,7 @@ public AvaticaParameterBinder(
8184
* @param typedValues The parameter values.
8285
*/
8386
public void bind(List<TypedValue> typedValues) {
84-
bind(typedValues, 0);
87+
bind(typedValues, 0, Collections.emptyMap());
8588
}
8689

8790
/**
@@ -91,6 +94,19 @@ public void bind(List<TypedValue> typedValues) {
9194
* @param index index for parameter.
9295
*/
9396
public void bind(List<TypedValue> typedValues, int index) {
97+
bind(typedValues, index, Collections.emptyMap());
98+
}
99+
100+
/**
101+
* Bind the given Avatica values to the prepared statement at the given index, with optional raw
102+
* java.sql.Timestamp values that preserve sub-millisecond precision.
103+
*
104+
* @param typedValues The parameter values.
105+
* @param index index for parameter.
106+
* @param rawTimestamps Raw java.sql.Timestamp objects keyed by 1-based parameter index.
107+
*/
108+
public void bind(
109+
List<TypedValue> typedValues, int index, Map<Integer, Timestamp> rawTimestamps) {
94110
if (preparedStatement.getParameterSchema().getFields().size() != typedValues.size()) {
95111
throw new IllegalStateException(
96112
String.format(
@@ -99,7 +115,9 @@ public void bind(List<TypedValue> typedValues, int index) {
99115
}
100116

101117
for (int i = 0; i < typedValues.size(); i++) {
102-
bind(parameters.getVector(i), typedValues.get(i), index);
118+
// rawTimestamps uses 1-based JDBC parameter indices
119+
Timestamp rawTs = rawTimestamps.get(i + 1);
120+
bind(parameters.getVector(i), typedValues.get(i), index, rawTs);
103121
}
104122

105123
if (!typedValues.isEmpty()) {
@@ -114,8 +132,13 @@ public void bind(List<TypedValue> typedValues, int index) {
114132
* @param vector FieldVector to bind to.
115133
* @param typedValue TypedValue to bind to the vector.
116134
* @param index Vector index to bind the value at.
135+
* @param rawTimestamp Optional raw java.sql.Timestamp with sub-millisecond precision.
117136
*/
118-
private void bind(FieldVector vector, @Nullable TypedValue typedValue, int index) {
137+
private void bind(
138+
FieldVector vector,
139+
@Nullable TypedValue typedValue,
140+
int index,
141+
@Nullable Timestamp rawTimestamp) {
119142
try {
120143
if (typedValue == null || typedValue.value == null) {
121144
if (vector.getField().isNullable()) {
@@ -126,7 +149,7 @@ private void bind(FieldVector vector, @Nullable TypedValue typedValue, int index
126149
} else if (!vector
127150
.getField()
128151
.getType()
129-
.accept(new BinderVisitor(vector, typedValue, index))) {
152+
.accept(new BinderVisitor(vector, typedValue, index, rawTimestamp))) {
130153
throw new UnsupportedOperationException(
131154
String.format("Binding to vector type %s is not yet supported", vector.getClass()));
132155
}
@@ -146,18 +169,22 @@ public static class BinderVisitor implements ArrowType.ArrowTypeVisitor<Boolean>
146169
private final FieldVector vector;
147170
private final TypedValue typedValue;
148171
private final int index;
172+
@Nullable private final Timestamp rawTimestamp;
149173

150174
/**
151175
* Instantiate a new BinderVisitor.
152176
*
153177
* @param vector FieldVector to bind values to.
154178
* @param value TypedValue to bind.
155179
* @param index Vector index (0-based) to bind the value to.
180+
* @param rawTimestamp Optional raw java.sql.Timestamp preserving sub-millisecond precision.
156181
*/
157-
public BinderVisitor(FieldVector vector, TypedValue value, int index) {
182+
public BinderVisitor(
183+
FieldVector vector, TypedValue value, int index, @Nullable Timestamp rawTimestamp) {
158184
this.vector = vector;
159185
this.typedValue = value;
160186
this.index = index;
187+
this.rawTimestamp = rawTimestamp;
161188
}
162189

163190
@Override
@@ -266,7 +293,8 @@ public Boolean visit(ArrowType.Time type) {
266293

267294
@Override
268295
public Boolean visit(ArrowType.Timestamp type) {
269-
return new TimestampAvaticaParameterConverter(type).bindParameter(vector, typedValue, index);
296+
return new TimestampAvaticaParameterConverter(type)
297+
.bindParameter(vector, typedValue, index, rawTimestamp);
270298
}
271299

272300
@Override

0 commit comments

Comments
 (0)