Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.SQLTimeoutException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand All @@ -31,6 +32,7 @@
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.calcite.avatica.AvaticaConnection;
import org.apache.calcite.avatica.AvaticaParameter;
import org.apache.calcite.avatica.AvaticaStatement;
import org.apache.calcite.avatica.ColumnMetaData;
import org.apache.calcite.avatica.MetaImpl;
import org.apache.calcite.avatica.NoSuchStatementException;
Expand Down Expand Up @@ -105,9 +107,10 @@ public ExecuteResult execute(
throw new IllegalStateException("Prepared statement not found: " + statementHandle);
}

Map<Integer, Timestamp> rawTimestamps = getRawTimestamps(statementHandle);
new AvaticaParameterBinder(
preparedStatement, ((ArrowFlightConnection) connection).getBufferAllocator())
.bind(typedValues);
.bind(typedValues, 0, rawTimestamps);

if (statementHandle.signature == null
|| statementHandle.signature.statementType == StatementType.IS_DML) {
Expand Down Expand Up @@ -149,11 +152,12 @@ public ExecuteBatchResult executeBatch(
throw new IllegalStateException("Prepared statement not found: " + statementHandle);
}

Map<Integer, Timestamp> rawTimestamps = getRawTimestamps(statementHandle);
final AvaticaParameterBinder binder =
new AvaticaParameterBinder(
preparedStatement, ((ArrowFlightConnection) connection).getBufferAllocator());
for (int i = 0; i < parameterValuesList.size(); i++) {
binder.bind(parameterValuesList.get(i), i);
binder.bind(parameterValuesList.get(i), i, rawTimestamps);
}

// Update query
Expand All @@ -173,6 +177,14 @@ public Frame fetch(
String.format("%s does not use frames.", this), AvaticaConnection.HELPER.unsupported());
}

private Map<Integer, Timestamp> getRawTimestamps(StatementHandle statementHandle) {
AvaticaStatement avaticaStmt = connection.statementMap.get(statementHandle.id);
if (avaticaStmt instanceof ArrowFlightPreparedStatement) {
return ((ArrowFlightPreparedStatement) avaticaStmt).getRawTimestamps();
}
return Collections.emptyMap();
}

private PreparedStatement prepareForHandle(final String query, StatementHandle handle) {
final PreparedStatement preparedStatement =
((ArrowFlightConnection) connection).getClientHandler().prepare(query);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@

import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.Calendar;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.arrow.driver.jdbc.client.ArrowFlightSqlClientHandler;
import org.apache.arrow.flight.FlightInfo;
import org.apache.arrow.util.Preconditions;
Expand All @@ -30,6 +35,7 @@ public class ArrowFlightPreparedStatement extends AvaticaPreparedStatement
implements ArrowFlightInfoStatement {

private final ArrowFlightSqlClientHandler.PreparedStatement preparedStatement;
private final Map<Integer, Timestamp> rawTimestamps = new HashMap<>();

private ArrowFlightPreparedStatement(
final ArrowFlightConnection connection,
Expand Down Expand Up @@ -74,6 +80,41 @@ public synchronized void close() throws SQLException {
super.close();
}

@Override
public void setTimestamp(int parameterIndex, Timestamp x) throws SQLException {
if (x != null) {
rawTimestamps.put(parameterIndex, x);
} else {
rawTimestamps.remove(parameterIndex);
}
super.setTimestamp(parameterIndex, x);
}

@Override
public void setTimestamp(int parameterIndex, Timestamp x, Calendar cal) throws SQLException {
if (x != null) {
rawTimestamps.put(parameterIndex, x);
} else {
rawTimestamps.remove(parameterIndex);
}
super.setTimestamp(parameterIndex, x, cal);
}

@Override
public void clearParameters() throws SQLException {
rawTimestamps.clear();
super.clearParameters();
}

/**
* Returns the raw java.sql.Timestamp objects set via setTimestamp(), keyed by 1-based parameter
* index. These preserve sub-millisecond precision (getNanos()) that Avatica's TypedValue
* serialization discards.
*/
Map<Integer, Timestamp> getRawTimestamps() {
return Collections.unmodifiableMap(rawTimestamps);
}

@Override
public FlightInfo executeFlightInfoQuery() throws SQLException {
return preparedStatement.executeQuery();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,10 @@ public boolean bindParameter(FieldVector vector, TypedValue typedValue, int inde
.getType()
.accept(
new AvaticaParameterBinder.BinderVisitor(
childVector, TypedValue.ofSerial(typedValue.componentType, val), childIndex));
childVector,
TypedValue.ofSerial(typedValue.componentType, val),
childIndex,
null));
}
}
listVector.setValueCount(index + 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,10 @@ public boolean bindParameter(FieldVector vector, TypedValue typedValue, int inde
.getType()
.accept(
new AvaticaParameterBinder.BinderVisitor(
childVector, TypedValue.ofSerial(typedValue.componentType, val), childIndex));
childVector,
TypedValue.ofSerial(typedValue.componentType, val),
childIndex,
null));
}
}
listVector.endValue(index, values.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,10 @@ public boolean bindParameter(FieldVector vector, TypedValue typedValue, int inde
.getType()
.accept(
new AvaticaParameterBinder.BinderVisitor(
childVector, TypedValue.ofSerial(typedValue.componentType, val), childIndex));
childVector,
TypedValue.ofSerial(typedValue.componentType, val),
childIndex,
null));
}
}
listVector.endValue(index, values.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.arrow.driver.jdbc.converter.impl;

import java.sql.Timestamp;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.TimeStampMicroTZVector;
import org.apache.arrow.vector.TimeStampMicroVector;
Expand All @@ -33,11 +35,79 @@
/** AvaticaParameterConverter for Timestamp Arrow types. */
public class TimestampAvaticaParameterConverter extends BaseAvaticaParameterConverter {

public TimestampAvaticaParameterConverter(ArrowType.Timestamp type) {}
private final ArrowType.Timestamp type;

public TimestampAvaticaParameterConverter(ArrowType.Timestamp type) {
this.type = type;
}

/**
* Converts a raw java.sql.Timestamp to the value in the target Arrow time unit, preserving
* sub-millisecond precision from Timestamp.getNanos().
*/
private long convertFromTimestamp(Timestamp ts) {
// Timestamp.getTime() returns epoch millis (truncated, no sub-ms precision).
// Timestamp.getNanos() returns the fractional-second component in nanoseconds (0..999_999_999).
// We reconstruct the full-precision value from epoch seconds + nanos to avoid double-counting.
long epochSeconds = Math.floorDiv(ts.getTime(), 1_000L);
int nanos = ts.getNanos(); // 0..999_999_999, full fractional second
switch (type.getUnit()) {
case SECOND:
return epochSeconds;
case MILLISECOND:
return epochSeconds * 1_000L + nanos / 1_000_000;
case MICROSECOND:
return epochSeconds * 1_000_000L + nanos / 1_000;
case NANOSECOND:
return epochSeconds * 1_000_000_000L + nanos;
default:
throw new UnsupportedOperationException("Unsupported time unit: " + type.getUnit());
}
}

/** Converts an epoch millisecond value from Avatica to the target time unit. */
private long convertFromMillis(long epochMillis) {
switch (type.getUnit()) {
case SECOND:
return epochMillis / 1_000L;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

epochMillis / 1_000L truncates toward zero. It means that -999ms will end as 0 instead of -1.
This differs from the convertFromTimestamp method which uses Math.floorDiv.

I suggest to use Math.floorDiv(epochMillis, 1_000L) here.

case MILLISECOND:
return epochMillis;
case MICROSECOND:
return epochMillis * 1_000L;
case NANOSECOND:
return epochMillis * 1_000_000L;
default:
throw new UnsupportedOperationException("Unsupported time unit: " + type.getUnit());
}
}

/**
* Bind a timestamp parameter, using the raw java.sql.Timestamp if available for full precision.
*
* @param vector FieldVector to bind to.
* @param typedValue TypedValue from Avatica (epoch millis, may have lost sub-ms precision).
* @param index Vector index to bind the value at.
* @param rawTimestamp Optional raw java.sql.Timestamp preserving sub-millisecond nanos.
* @return true if binding was successful.
*/
public boolean bindParameter(
FieldVector vector, TypedValue typedValue, int index, @Nullable Timestamp rawTimestamp) {
long value;
if (rawTimestamp != null) {
value = convertFromTimestamp(rawTimestamp);
} else {
value = convertFromMillis((long) typedValue.toLocal());
}
return setTimestampVector(vector, index, value);
}

@Override
public boolean bindParameter(FieldVector vector, TypedValue typedValue, int index) {
long value = (long) typedValue.toLocal();
long value = convertFromMillis((long) typedValue.toLocal());
return setTimestampVector(vector, index, value);
}

private boolean setTimestampVector(FieldVector vector, int index, long value) {
if (vector instanceof TimeStampSecVector) {
((TimeStampSecVector) vector).setSafe(index, value);
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@
*/
package org.apache.arrow.driver.jdbc.utils;

import java.sql.Timestamp;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.arrow.driver.jdbc.client.ArrowFlightSqlClientHandler.PreparedStatement;
import org.apache.arrow.driver.jdbc.converter.impl.BinaryAvaticaParameterConverter;
import org.apache.arrow.driver.jdbc.converter.impl.BinaryViewAvaticaParameterConverter;
Expand Down Expand Up @@ -81,7 +84,7 @@ public AvaticaParameterBinder(
* @param typedValues The parameter values.
*/
public void bind(List<TypedValue> typedValues) {
bind(typedValues, 0);
bind(typedValues, 0, Collections.emptyMap());
}

/**
Expand All @@ -91,6 +94,19 @@ public void bind(List<TypedValue> typedValues) {
* @param index index for parameter.
*/
public void bind(List<TypedValue> typedValues, int index) {
bind(typedValues, index, Collections.emptyMap());
}

/**
* Bind the given Avatica values to the prepared statement at the given index, with optional raw
* java.sql.Timestamp values that preserve sub-millisecond precision.
*
* @param typedValues The parameter values.
* @param index index for parameter.
* @param rawTimestamps Raw java.sql.Timestamp objects keyed by 1-based parameter index.
*/
public void bind(
List<TypedValue> typedValues, int index, Map<Integer, Timestamp> rawTimestamps) {
if (preparedStatement.getParameterSchema().getFields().size() != typedValues.size()) {
throw new IllegalStateException(
String.format(
Expand All @@ -99,7 +115,9 @@ public void bind(List<TypedValue> typedValues, int index) {
}

for (int i = 0; i < typedValues.size(); i++) {
bind(parameters.getVector(i), typedValues.get(i), index);
// rawTimestamps uses 1-based JDBC parameter indices
Timestamp rawTs = rawTimestamps.get(i + 1);
bind(parameters.getVector(i), typedValues.get(i), index, rawTs);
}

if (!typedValues.isEmpty()) {
Expand All @@ -114,8 +132,13 @@ public void bind(List<TypedValue> typedValues, int index) {
* @param vector FieldVector to bind to.
* @param typedValue TypedValue to bind to the vector.
* @param index Vector index to bind the value at.
* @param rawTimestamp Optional raw java.sql.Timestamp with sub-millisecond precision.
*/
private void bind(FieldVector vector, @Nullable TypedValue typedValue, int index) {
private void bind(
FieldVector vector,
@Nullable TypedValue typedValue,
int index,
@Nullable Timestamp rawTimestamp) {
try {
if (typedValue == null || typedValue.value == null) {
if (vector.getField().isNullable()) {
Expand All @@ -126,7 +149,7 @@ private void bind(FieldVector vector, @Nullable TypedValue typedValue, int index
} else if (!vector
.getField()
.getType()
.accept(new BinderVisitor(vector, typedValue, index))) {
.accept(new BinderVisitor(vector, typedValue, index, rawTimestamp))) {
throw new UnsupportedOperationException(
String.format("Binding to vector type %s is not yet supported", vector.getClass()));
}
Expand All @@ -146,18 +169,22 @@ public static class BinderVisitor implements ArrowType.ArrowTypeVisitor<Boolean>
private final FieldVector vector;
private final TypedValue typedValue;
private final int index;
@Nullable private final Timestamp rawTimestamp;

/**
* Instantiate a new BinderVisitor.
*
* @param vector FieldVector to bind values to.
* @param value TypedValue to bind.
* @param index Vector index (0-based) to bind the value to.
* @param rawTimestamp Optional raw java.sql.Timestamp preserving sub-millisecond precision.
*/
public BinderVisitor(FieldVector vector, TypedValue value, int index) {
public BinderVisitor(
FieldVector vector, TypedValue value, int index, @Nullable Timestamp rawTimestamp) {
this.vector = vector;
this.typedValue = value;
this.index = index;
this.rawTimestamp = rawTimestamp;
}

@Override
Expand Down Expand Up @@ -266,7 +293,8 @@ public Boolean visit(ArrowType.Time type) {

@Override
public Boolean visit(ArrowType.Timestamp type) {
return new TimestampAvaticaParameterConverter(type).bindParameter(vector, typedValue, index);
return new TimestampAvaticaParameterConverter(type)
.bindParameter(vector, typedValue, index, rawTimestamp);
}

@Override
Expand Down
Loading
Loading