Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
184 changes: 97 additions & 87 deletions src/duckdb/src/common/adbc/adbc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,6 @@ struct DuckDBAdbcStreamWrapper {
duckdb::DuckDBAdbcConnectionWrapper *conn_wrapper;
};

static void MaterializeActiveStreams(duckdb::DuckDBAdbcConnectionWrapper *conn_wrapper);

static bool IsInterruptError(const char *message) {
if (!message) {
return false;
Expand Down Expand Up @@ -965,14 +963,9 @@ AdbcStatusCode ConnectionRelease(struct AdbcConnection *connection, struct AdbcE
if (connection && connection->private_data) {
auto conn_wrapper = static_cast<duckdb::DuckDBAdbcConnectionWrapper *>(connection->private_data);
// Materialize active streams before disconnecting so they remain readable
MaterializeActiveStreams(conn_wrapper);
conn_wrapper->MaterializeStreams();
// Detach active streams before deleting conn_wrapper to avoid dangling pointers
for (auto *stream_wrapper : conn_wrapper->active_streams) {
if (stream_wrapper) {
stream_wrapper->conn_wrapper = nullptr;
}
}
conn_wrapper->active_streams.clear();
conn_wrapper->DetachAndClearStreams();
auto conn = reinterpret_cast<duckdb::Connection *>(conn_wrapper->connection);
duckdb_disconnect(reinterpret_cast<duckdb_connection *>(&conn));
delete conn_wrapper;
Expand Down Expand Up @@ -1072,13 +1065,9 @@ void release(struct ArrowArrayStream *stream) {
}
auto result_wrapper = reinterpret_cast<DuckDBAdbcStreamWrapper *>(stream->private_data);
if (result_wrapper) {
// Unregister from connection's active_streams
// Unregister from connection's active streams
if (result_wrapper->conn_wrapper) {
auto &active = result_wrapper->conn_wrapper->active_streams;
auto it = std::find(active.begin(), active.end(), result_wrapper);
if (it != active.end()) {
active.erase(it);
}
result_wrapper->conn_wrapper->UnregisterStream(result_wrapper);
}
// Clean up materialized data if present
if (result_wrapper->materialized) {
Expand Down Expand Up @@ -1506,75 +1495,6 @@ static AdbcStatusCode IngestToTableFromBoundStream(DuckDBAdbcStatementWrapper *s
rows_affected);
}

// Materialize all active streams on a connection so that a new query can execute.
// This fetches remaining data from each streaming result into memory, making the
// streams independent of the connection's active query context.
static void MaterializeActiveStreams(duckdb::DuckDBAdbcConnectionWrapper *conn_wrapper) {
for (auto *result_wrapper : conn_wrapper->active_streams) {
if (!result_wrapper || result_wrapper->materialized) {
continue;
}

// Collect remaining batches from the streaming result
duckdb::vector<ArrowArray> batches;
auto arrow_options = duckdb_result_get_arrow_options(&result_wrapper->result);
while (true) {
ArrowArray array;
std::memset(&array, 0, sizeof(ArrowArray));

auto duckdb_chunk = duckdb_fetch_chunk(result_wrapper->result);
if (!duckdb_chunk) {
break;
}
auto conversion_err = duckdb_data_chunk_to_arrow(arrow_options, duckdb_chunk, &array);
duckdb_destroy_data_chunk(&duckdb_chunk);

if (conversion_err) {
duckdb_destroy_error_data(&conversion_err);
if (array.release) {
array.release(&array);
}
break;
}
batches.push_back(array);
}
duckdb_destroy_arrow_options(&arrow_options);

// Store materialized data
auto mat = static_cast<MaterializedData *>(malloc(sizeof(MaterializedData)));
if (!mat) {
// Allocation failed — release fetched batches and skip materialization
for (auto &batch : batches) {
if (batch.release) {
batch.release(&batch);
}
}
continue;
}
mat->current = 0;
mat->count = static_cast<idx_t>(batches.size());
if (!batches.empty()) {
mat->batches = static_cast<ArrowArray *>(malloc(sizeof(ArrowArray) * batches.size()));
if (!mat->batches) {
// Allocation failed — release fetched batches and skip materialization
for (auto &batch : batches) {
if (batch.release) {
batch.release(&batch);
}
}
free(mat);
continue;
}
for (idx_t i = 0; i < batches.size(); i++) {
mat->batches[i] = batches[i];
}
} else {
mat->batches = nullptr;
}
result_wrapper->materialized = mat;
}
}

AdbcStatusCode StatementExecuteQuery(struct AdbcStatement *statement, struct ArrowArrayStream *out,
int64_t *rows_affected, struct AdbcError *error) {
if (!statement) {
Expand All @@ -1595,7 +1515,7 @@ AdbcStatusCode StatementExecuteQuery(struct AdbcStatement *statement, struct Arr
// Without materialization, executing a new query would silently invalidate any existing streaming results on the
// same connection.
if (wrapper->conn_wrapper) {
MaterializeActiveStreams(wrapper->conn_wrapper);
wrapper->conn_wrapper->MaterializeStreams();
}

// TODO: Set affected rows, careful with early return
Expand Down Expand Up @@ -1755,7 +1675,7 @@ AdbcStatusCode StatementExecuteQuery(struct AdbcStatement *statement, struct Arr
out->get_last_error = get_last_error;
// Register this stream wrapper so it can be materialized if another query runs
if (wrapper->conn_wrapper) {
wrapper->conn_wrapper->active_streams.push_back(stream_wrapper);
wrapper->conn_wrapper->RegisterStream(stream_wrapper);
}
} else {
// Caller didn't request a stream; clean up resources
Expand Down Expand Up @@ -1803,7 +1723,7 @@ AdbcStatusCode StatementSetSqlQuery(struct AdbcStatement *statement, const char

// Materialize any active streams before preparing
if (wrapper->conn_wrapper) {
MaterializeActiveStreams(wrapper->conn_wrapper);
wrapper->conn_wrapper->MaterializeStreams();
}

if (wrapper->ingestion_stream.release) {
Expand Down Expand Up @@ -2468,6 +2388,96 @@ AdbcStatusCode ConnectionGetTableTypes(struct AdbcConnection *connection, struct

} // namespace duckdb_adbc

void duckdb::DuckDBAdbcConnectionWrapper::RegisterStream(duckdb_adbc::DuckDBAdbcStreamWrapper *stream) {
const duckdb::lock_guard<duckdb::mutex> guard(stream_mutex);
active_streams.push_back(stream);
}

void duckdb::DuckDBAdbcConnectionWrapper::UnregisterStream(duckdb_adbc::DuckDBAdbcStreamWrapper *stream) {
const duckdb::lock_guard<duckdb::mutex> guard(stream_mutex);
auto it = std::find(active_streams.begin(), active_streams.end(), stream);
if (it != active_streams.end()) {
active_streams.erase(it);
}
}

void duckdb::DuckDBAdbcConnectionWrapper::MaterializeStreams() {
const duckdb::lock_guard<duckdb::mutex> guard(stream_mutex);
for (auto *result_wrapper : active_streams) {
if (!result_wrapper || result_wrapper->materialized) {
continue;
}

// Collect remaining batches from the streaming result
duckdb::vector<ArrowArray> batches;
auto arrow_options = duckdb_result_get_arrow_options(&result_wrapper->result);
while (true) {
ArrowArray array;
std::memset(&array, 0, sizeof(ArrowArray));

auto duckdb_chunk = duckdb_fetch_chunk(result_wrapper->result);
if (!duckdb_chunk) {
break;
}
auto conversion_err = duckdb_data_chunk_to_arrow(arrow_options, duckdb_chunk, &array);
duckdb_destroy_data_chunk(&duckdb_chunk);

if (conversion_err) {
duckdb_destroy_error_data(&conversion_err);
if (array.release) {
array.release(&array);
}
break;
}
batches.push_back(array);
}
duckdb_destroy_arrow_options(&arrow_options);

// Store materialized data
auto mat = static_cast<duckdb_adbc::MaterializedData *>(malloc(sizeof(duckdb_adbc::MaterializedData)));
if (!mat) {
// Allocation failed — release fetched batches and skip materialization
for (auto &batch : batches) {
if (batch.release) {
batch.release(&batch);
}
}
continue;
}
mat->current = 0;
mat->count = static_cast<idx_t>(batches.size());
if (!batches.empty()) {
mat->batches = static_cast<ArrowArray *>(malloc(sizeof(ArrowArray) * batches.size()));
if (!mat->batches) {
// Allocation failed — release fetched batches and skip materialization
for (auto &batch : batches) {
if (batch.release) {
batch.release(&batch);
}
}
free(mat);
continue;
}
for (idx_t i = 0; i < batches.size(); i++) {
mat->batches[i] = batches[i];
}
} else {
mat->batches = nullptr;
}
result_wrapper->materialized = mat;
}
}

void duckdb::DuckDBAdbcConnectionWrapper::DetachAndClearStreams() {
const duckdb::lock_guard<duckdb::mutex> guard(stream_mutex);
for (auto *stream_wrapper : active_streams) {
if (stream_wrapper) {
stream_wrapper->conn_wrapper = nullptr;
}
}
active_streams.clear();
}

static void ReleaseError(struct AdbcError *error) {
if (error) {
if (error->message)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -886,7 +886,9 @@ unique_ptr<DataChunk> JoinFilterPushdownInfo::FinalizeFilters(ClientContext &con
break;
}

if (ht && CanUseBloomFilter(context, ht, op, cmp, is_perfect_hashtable)) {
auto condition_type = op.conditions[join_condition[filter_idx]].left->return_type;
bool has_cast = condition_type != pushdown_column.storage_type;
if (!has_cast && ht && CanUseBloomFilter(context, ht, op, cmp, is_perfect_hashtable)) {
PushBloomFilter(info, *ht, op, filter_col_idx);
}
}
Expand Down
14 changes: 2 additions & 12 deletions src/duckdb/src/execution/operator/persistent/physical_delete.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "duckdb/storage/table/delete_state.hpp"
#include "duckdb/storage/table/scan_state.hpp"
#include "duckdb/transaction/duck_transaction.hpp"
#include "duckdb/transaction/local_storage.hpp"

namespace duckdb {

Expand Down Expand Up @@ -128,21 +129,10 @@ SinkResultType PhysicalDelete::Sink(ExecutionContext &context, DataChunk &chunk,
l_state.delete_chunk.SetCardinality(fetch_chunk);

// Append the deleted row IDs to the delete indexes.
// If we only delete local row IDs, then the delete_chunk is empty.
if (g_state.has_unique_indexes && l_state.delete_chunk.size() != 0) {
auto &local_storage = LocalStorage::Get(context.client, table.db);
auto storage = local_storage.GetStorage(table);
IndexAppendInfo index_append_info(IndexAppendMode::IGNORE_DUPLICATES, nullptr);
for (auto &index : storage->delete_indexes.Indexes()) {
if (!index.IsBound() || !index.IsUnique()) {
continue;
}
auto &bound_index = index.Cast<BoundIndex>();
auto error = bound_index.Append(l_state.delete_chunk, row_ids, index_append_info);
if (error.HasError()) {
throw InternalException("failed to update delete ART in physical delete: ", error.Message());
}
}
storage->AppendToDeleteIndexes(row_ids, l_state.delete_chunk);
}

auto deleted_count = table.Delete(*l_state.delete_state, context.client, row_ids, chunk.size());
Expand Down
4 changes: 1 addition & 3 deletions src/duckdb/src/function/table/system/duckdb_functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,11 @@
#include "duckdb/catalog/catalog_entry/table_function_catalog_entry.hpp"
#include "duckdb/catalog/catalog_entry/pragma_function_catalog_entry.hpp"
#include "duckdb/parser/expression/columnref_expression.hpp"
#include "duckdb/common/algorithm.hpp"
#include "duckdb/common/optional_idx.hpp"
#include "duckdb/common/types.hpp"
#include "duckdb/main/client_data.hpp"
#include "duckdb/parser/expression/window_expression.hpp"
#include "duckdb/main/database.hpp"
#include "duckdb/parser/parsed_data/create_scalar_function_info.hpp"
#include "duckdb/function/scalar_function.hpp"

namespace duckdb {
Expand Down Expand Up @@ -571,7 +569,7 @@ struct PragmaFunctionExtractor {
static vector<Value> ToValueVector(vector<string> &string_vector) {
vector<Value> result;
for (string &str : string_vector) {
result.emplace_back(Value(str));
result.emplace_back(str);
}
return result;
}
Expand Down
6 changes: 3 additions & 3 deletions src/duckdb/src/function/table/version/pragma_version.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#ifndef DUCKDB_PATCH_VERSION
#define DUCKDB_PATCH_VERSION "2-dev356"
#define DUCKDB_PATCH_VERSION "2-dev408"
#endif
#ifndef DUCKDB_MINOR_VERSION
#define DUCKDB_MINOR_VERSION 5
Expand All @@ -8,10 +8,10 @@
#define DUCKDB_MAJOR_VERSION 1
#endif
#ifndef DUCKDB_VERSION
#define DUCKDB_VERSION "v1.5.2-dev356"
#define DUCKDB_VERSION "v1.5.2-dev408"
#endif
#ifndef DUCKDB_SOURCE_ID
#define DUCKDB_SOURCE_ID "10c4e2493e"
#define DUCKDB_SOURCE_ID "cbc6f2230e"
#endif
#include "duckdb/function/table/system_functions.hpp"
#include "duckdb/main/database.hpp"
Expand Down
2 changes: 1 addition & 1 deletion src/duckdb/src/function/window/window_merge_sort_tree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ WindowMergeSortTree::WindowMergeSortTree(ClientContext &client, const vector<Bou
auto unique_expr = make_uniq<BoundReferenceExpression>(scan_types.back(), orders.size());
const auto order_type = OrderType::ASCENDING;
const auto order_by_type = OrderByNullType::NULLS_LAST;
orders.emplace_back(BoundOrderByNode(order_type, order_by_type, std::move(unique_expr)));
orders.emplace_back(order_type, order_by_type, std::move(unique_expr));
key_cols.emplace_back(key_cols.size());
}

Expand Down
18 changes: 18 additions & 0 deletions src/duckdb/src/include/duckdb.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ typedef enum DUCKDB_TYPE {
DUCKDB_TYPE_INTEGER_LITERAL = 38,
// duckdb_time_ns (nanoseconds)
DUCKDB_TYPE_TIME_NS = 39,
// GEOMETRY type, WKB blob
DUCKDB_TYPE_GEOMETRY = 40,
} duckdb_type;

//! An enum over the returned state of different functions.
Expand Down Expand Up @@ -6254,6 +6256,22 @@ Registers a custom log storage for the logger.
*/
DUCKDB_C_API duckdb_state duckdb_register_log_storage(duckdb_database database, duckdb_log_storage log_storage);

//----------------------------------------------------------------------------------------------------------------------
// Geometry Helpers
//----------------------------------------------------------------------------------------------------------------------
// DESCRIPTION:
// Functions to operate on GEOMETRY types`.
//----------------------------------------------------------------------------------------------------------------------

/*!
Gets the CRS (Coordinate Reference System) of a GEOMETRY type.
Result must be freed with `duckdb_free`.

* @param type The GEOMETRY type.
* @return The CRS of the GEOMETRY type, or NULL if the type is not a GEOMETRY type.
*/
DUCKDB_C_API char *duckdb_geometry_type_get_crs(duckdb_logical_type type);

#endif

#ifdef __cplusplus
Expand Down
14 changes: 13 additions & 1 deletion src/duckdb/src/include/duckdb/common/adbc/wrappers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#pragma once

#include "duckdb.h"
#include "duckdb/common/mutex.hpp"
#include "duckdb/common/string.hpp"
#include "duckdb/common/unordered_map.hpp"
#include "duckdb/common/vector.hpp"
Expand All @@ -22,7 +23,18 @@ namespace duckdb {
struct DuckDBAdbcConnectionWrapper {
duckdb_connection connection;
unordered_map<string, string> options;
//! Active stream wrappers on this connection (for materialization on concurrent execute)

//! Register a stream wrapper so it can be materialized if another query runs on this connection.
void RegisterStream(duckdb_adbc::DuckDBAdbcStreamWrapper *stream);
//! Unregister a stream wrapper.
void UnregisterStream(duckdb_adbc::DuckDBAdbcStreamWrapper *stream);
//! Materialize all active streams, fetching remaining data into memory.
void MaterializeStreams();
//! Detach all streams from this connection and clear the list (called on connection release).
void DetachAndClearStreams();

private:
mutex stream_mutex;
vector<duckdb_adbc::DuckDBAdbcStreamWrapper *> active_streams;
};
} // namespace duckdb
Loading
Loading