Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions NEXT_CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
- PECOBLR-1121 Arrow patch to circumvent Arrow issues with JDK 16+.

### Fixed
- Fixed Thrift polling infinite loop when server restarts invalidate operation handles, and added configurable timeout (`MetadataOperationTimeout`, default 300s) with sleep between polls for metadata operations.
- Fixed `DatabricksParameterMetaData.countParameters` and `DatabricksStatement.trimCommentsAndWhitespaces` with a `SqlCommentParser` utility class.
- Fixed `rollback()` to throw `SQLException` when called in auto-commit mode (no active transaction), aligning with JDBC spec. Previously it silently sent a ROLLBACK command to the server.
- Fixed `fetchAutoCommitStateFromServer()` to accept both `"1"`/`"0"` and `"true"`/`"false"` responses from `SET AUTOCOMMIT` query, since different server implementations return different formats.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1108,6 +1108,16 @@ public boolean treatMetadataCatalogNameAsPattern() {
return getParameter(DatabricksJdbcUrlParams.TREAT_METADATA_CATALOG_NAME_AS_PATTERN).equals("1");
}

@Override
public int getMetadataOperationTimeout() {
  // Timeout in seconds for metadata polling operations (0 = no timeout).
  // Keep in sync with the METADATA_OPERATION_TIMEOUT default in DatabricksJdbcUrlParams.
  final int defaultTimeoutSeconds = 300;
  try {
    int timeoutSeconds =
        Integer.parseInt(getParameter(DatabricksJdbcUrlParams.METADATA_OPERATION_TIMEOUT));
    if (timeoutSeconds < 0) {
      // The contract only defines 0 and positive values; a negative timeout is a
      // misconfiguration, so fall back to the default rather than propagating it.
      LOGGER.warn("Negative value for MetadataOperationTimeout, using default of 300 seconds");
      return defaultTimeoutSeconds;
    }
    return timeoutSeconds;
  } catch (NumberFormatException e) {
    // Non-numeric URL parameter must never break metadata calls; use the default.
    LOGGER.warn("Invalid value for MetadataOperationTimeout, using default of 300 seconds");
    return defaultTimeoutSeconds;
  }
}

@Override
public boolean getEnableMetricViewMetadata() {
return getParameter(DatabricksJdbcUrlParams.ENABLE_METRIC_VIEW_METADATA).equals("1");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,9 @@ public interface IDatabricksConnectionContext {

boolean treatMetadataCatalogNameAsPattern();

/**
 * Returns the timeout in seconds for metadata polling operations. 0 means no timeout.
 *
 * @return timeout in seconds for metadata polling operations; 0 disables the timeout
 */
int getMetadataOperationTimeout();

/** Returns whether batched INSERT optimization is enabled */
boolean isBatchedInsertsEnabled();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,10 @@ public enum DatabricksJdbcUrlParams {
"TreatMetadataCatalogNameAsPattern",
"Treat catalog names as patterns in Thrift metadata RPCs. When disabled (default), wildcard characters in catalog names are escaped",
"0"),
METADATA_OPERATION_TIMEOUT(
"MetadataOperationTimeout",
"Timeout in seconds for metadata polling operations (e.g. GetTables, GetColumns). 0 means no timeout",
"300"),
ENABLE_BATCHED_INSERTS("EnableBatchedInserts", "Enable batched INSERT optimization", "0"),
ENABLE_SQL_VALIDATION_FOR_IS_VALID(
"EnableSQLValidationForIsValid",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -712,9 +712,42 @@ TFetchResultsResp fetchMetadataResults(TResp response, String contextDescription
new TGetOperationStatusReq()
.setOperationHandle(operationHandle)
.setGetProgressUpdate(false);
TimeoutHandler metadataTimeoutHandler =
new TimeoutHandler(
connectionContext.getMetadataOperationTimeout(),
"Metadata operation for statement: " + statementId,
() -> {
try {
if (operationHandle != null) {
LOGGER.debug("Canceling metadata operation due to timeout: {}", operationHandle);
cancelOperation(new TCancelOperationReq().setOperationHandle(operationHandle));
}
} catch (Exception e) {
LOGGER.warn("Failed to cancel metadata operation on timeout: {}", e.getMessage());
}
},
DatabricksDriverErrorCode.OPERATION_TIMEOUT_ERROR);
while (shouldContinuePolling(statusResp)) {
metadataTimeoutHandler.checkTimeout();
statusResp = getThriftClient().GetOperationStatus(statusReq);
checkOperationStatusForErrors(statusResp, statementId);
if (!shouldContinuePolling(statusResp)) {
break;
}
try {
TimeUnit.MILLISECONDS.sleep(asyncPollIntervalMillis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOGGER.error(
"Metadata operation interrupted for statement [{}], canceling operation", statementId);
if (operationHandle != null) {
cancelOperation(new TCancelOperationReq().setOperationHandle(operationHandle));
}
throw new DatabricksSQLException(
"Metadata operation interrupted",
e,
DatabricksDriverErrorCode.THREAD_INTERRUPTED_ERROR);
}
}

if (hasResultDataInDirectResults(response)) {
Expand Down Expand Up @@ -758,9 +791,25 @@ private <T extends TBase<T, F>, F extends TFieldIdEnum> void checkResponseForErr

private void checkOperationStatusForErrors(TGetOperationStatusResp statusResp, String statementId)
throws SQLException {
if (statusResp != null
&& statusResp.isSetOperationState()
&& isErrorOperationState(statusResp.getOperationState())) {
if (statusResp == null) {
return;
}

// Check TStatus for INVALID_HANDLE_STATUS — this can happen when the server restarts
// and the operation handle becomes invalid. Without this check, the polling loop would
// continue indefinitely since operationState may not be set in the response.
if (statusResp.isSetStatus() && isErrorStatusCode(statusResp.getStatus())) {
String errorMsg =
String.format(
"Operation status check failed with status code: [%s] for statement [%s], "
+ "error: [%s]",
statusResp.getStatus().getStatusCode(), statementId, statusResp.getErrorMessage());
LOGGER.error(errorMsg);
throw new DatabricksSQLException(
errorMsg, statusResp.isSetSqlState() ? statusResp.getSqlState() : null);
}

if (statusResp.isSetOperationState() && isErrorOperationState(statusResp.getOperationState())) {
String errorMsg =
String.format(
"Operation failed with error: [%s] for statement [%s], with response [%s]",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -940,6 +940,130 @@ void testFetchResultsWithCustomMaxRowsPerBlock()
verify(thriftClient).FetchResults(expectedFetchRequest);
}

/**
 * A stale operation handle (e.g. after a server restart) must make statement polling fail
 * with a DatabricksSQLException instead of looping forever.
 */
@Test
void testPollingThrowsOnInvalidHandleStatus()
    throws TException, SQLException, DatabricksValidationException {
  setup(false);

  // Statement submission succeeds and hands back an operation handle.
  TExecuteStatementReq executeReq = new TExecuteStatementReq();
  TExecuteStatementResp executeResp =
      new TExecuteStatementResp()
          .setOperationHandle(tOperationHandle)
          .setStatus(new TStatus().setStatusCode(TStatusCode.SUCCESS_STATUS));
  when(thriftClient.ExecuteStatement(executeReq)).thenReturn(executeResp);

  // Simulate a server restart: status polling reports INVALID_HANDLE_STATUS
  // and never sets operationState on the response.
  TGetOperationStatusResp staleHandleResp =
      new TGetOperationStatusResp()
          .setStatus(new TStatus().setStatusCode(TStatusCode.INVALID_HANDLE_STATUS));
  when(thriftClient.GetOperationStatus(operationStatusReq)).thenReturn(staleHandleResp);

  Statement jdbcStatement = mock(Statement.class);
  when(parentStatement.getStatement()).thenReturn(jdbcStatement);
  when(jdbcStatement.getQueryTimeout()).thenReturn(0);

  // The poll loop must surface the invalid handle as an error.
  DatabricksSQLException thrown =
      assertThrows(
          DatabricksSQLException.class,
          () -> accessor.execute(executeReq, parentStatement, session, StatementType.SQL));
  assertTrue(thrown.getMessage().contains("INVALID_HANDLE_STATUS"));
}

/**
 * Same stale-handle scenario as the statement path, but for a metadata RPC (GetSchemas):
 * the metadata polling loop must throw rather than spin indefinitely.
 */
@Test
void testMetadataPollingThrowsOnInvalidHandleStatus()
    throws TException, SQLException, DatabricksValidationException {
  setup(false);
  lenient().when(connectionContext.getMetadataOperationTimeout()).thenReturn(300);

  // Metadata RPC is accepted by the server and returns an operation handle.
  TGetSchemasReq schemasReq = new TGetSchemasReq();
  TGetSchemasResp schemasResp =
      new TGetSchemasResp()
          .setOperationHandle(tOperationHandle)
          .setStatus(new TStatus().setStatusCode(TStatusCode.SUCCESS_STATUS));
  when(thriftClient.GetSchemas(schemasReq)).thenReturn(schemasResp);

  // Simulate a server restart: every status poll reports INVALID_HANDLE_STATUS
  // with no operation state set.
  TGetOperationStatusResp staleHandleResp =
      new TGetOperationStatusResp()
          .setStatus(new TStatus().setStatusCode(TStatusCode.INVALID_HANDLE_STATUS));
  when(thriftClient.GetOperationStatus(operationStatusReq)).thenReturn(staleHandleResp);

  DatabricksSQLException thrown =
      assertThrows(DatabricksSQLException.class, () -> accessor.getThriftResponse(schemasReq));
  assertTrue(thrown.getMessage().contains("INVALID_HANDLE_STATUS"));
}

/**
 * Verifies that a metadata operation that never finishes is canceled and surfaced as a
 * timeout error once the configured MetadataOperationTimeout elapses.
 */
@Test
void testMetadataPollingTimesOut()
throws TException, SQLException, DatabricksValidationException {
// Set the async poll interval to 200ms for faster test
when(connectionContext.getAsyncExecPollInterval()).thenReturn(200);
// Set metadata timeout to 1 second
when(connectionContext.getMetadataOperationTimeout()).thenReturn(1);

accessor = spy(new DatabricksThriftAccessor(connectionContext));
doReturn(thriftClient).when(accessor).getThriftClient();

TGetTablesReq request = new TGetTablesReq();
TGetTablesResp tGetTablesResp =
new TGetTablesResp()
.setOperationHandle(tOperationHandle)
.setStatus(new TStatus().setStatusCode(TStatusCode.SUCCESS_STATUS));
when(thriftClient.GetTables(request)).thenReturn(tGetTablesResp);

// Simulate operation that stays running forever
when(thriftClient.GetOperationStatus(operationStatusReq))
.thenReturn(operationStatusRunningResp);

// Stub CancelOperation so the timeout handler's cancel callback succeeds.
TCancelOperationResp cancelResp =
new TCancelOperationResp()
.setStatus(new TStatus().setStatusCode(TStatusCode.SUCCESS_STATUS));
when(thriftClient.CancelOperation(any(TCancelOperationReq.class))).thenReturn(cancelResp);

// The timeout handler aborts the poll loop; verify the thrown exception's
// message reports the 1-second timeout that was configured above.
DatabricksSQLException exception =
assertThrows(DatabricksSQLException.class, () -> accessor.getThriftResponse(request));
assertTrue(exception.getMessage().contains("timed-out after 1 seconds"));

// Verify cancel was called
verify(thriftClient).CancelOperation(any(TCancelOperationReq.class));
}

/**
 * Verifies that the metadata polling loop sleeps between status polls: the first poll
 * reports RUNNING, the second FINISHED, so the total elapsed time must include at least
 * one poll interval.
 */
@Test
void testMetadataPollingWithSleepBetweenPolls()
    throws TException, SQLException, DatabricksValidationException {
  // Poll every 200ms; timeout is generous so only the sleep behavior is under test.
  when(connectionContext.getAsyncExecPollInterval()).thenReturn(200);
  when(connectionContext.getMetadataOperationTimeout()).thenReturn(300);

  accessor = spy(new DatabricksThriftAccessor(connectionContext));
  doReturn(thriftClient).when(accessor).getThriftClient();

  TGetColumnsReq request = new TGetColumnsReq();
  TGetColumnsResp tGetColumnsResp =
      new TGetColumnsResp()
          .setOperationHandle(tOperationHandle)
          .setStatus(new TStatus().setStatusCode(TStatusCode.SUCCESS_STATUS));
  when(thriftClient.GetColumns(request)).thenReturn(tGetColumnsResp);

  // Simulate: first poll returns running, second returns finished
  when(thriftClient.GetOperationStatus(operationStatusReq))
      .thenReturn(operationStatusRunningResp)
      .thenReturn(operationStatusFinishedResp);
  when(thriftClient.FetchResults(getFetchResultsRequest(false))).thenReturn(fetchResultsResponse);

  long startTime = System.currentTimeMillis();
  TFetchResultsResp actualResponse = (TFetchResultsResp) accessor.getThriftResponse(request);
  long elapsed = System.currentTimeMillis() - startTime;

  // JUnit convention: expected value first, actual second — otherwise failure
  // messages report the values reversed.
  assertEquals(fetchResultsResponse, actualResponse);
  // Verify sleep happened — elapsed time should be at least ~200ms, with slack
  // below the interval to tolerate scheduler jitter.
  assertTrue(elapsed >= 150, "Expected at least 150ms elapsed due to poll sleep, got " + elapsed);
}

private TFetchResultsReq getFetchResultsRequest(boolean includeMetadata)
throws DatabricksValidationException {
TFetchResultsReq request =
Expand Down
Loading