diff --git a/src/duckdb/src/common/adbc/adbc.cpp b/src/duckdb/src/common/adbc/adbc.cpp index 97ab28ded..71bbab564 100644 --- a/src/duckdb/src/common/adbc/adbc.cpp +++ b/src/duckdb/src/common/adbc/adbc.cpp @@ -138,8 +138,6 @@ struct DuckDBAdbcStreamWrapper { duckdb::DuckDBAdbcConnectionWrapper *conn_wrapper; }; -static void MaterializeActiveStreams(duckdb::DuckDBAdbcConnectionWrapper *conn_wrapper); - static bool IsInterruptError(const char *message) { if (!message) { return false; @@ -965,14 +963,9 @@ AdbcStatusCode ConnectionRelease(struct AdbcConnection *connection, struct AdbcE if (connection && connection->private_data) { auto conn_wrapper = static_cast(connection->private_data); // Materialize active streams before disconnecting so they remain readable - MaterializeActiveStreams(conn_wrapper); + conn_wrapper->MaterializeStreams(); // Detach active streams before deleting conn_wrapper to avoid dangling pointers - for (auto *stream_wrapper : conn_wrapper->active_streams) { - if (stream_wrapper) { - stream_wrapper->conn_wrapper = nullptr; - } - } - conn_wrapper->active_streams.clear(); + conn_wrapper->DetachAndClearStreams(); auto conn = reinterpret_cast(conn_wrapper->connection); duckdb_disconnect(reinterpret_cast(&conn)); delete conn_wrapper; @@ -1072,13 +1065,9 @@ void release(struct ArrowArrayStream *stream) { } auto result_wrapper = reinterpret_cast(stream->private_data); if (result_wrapper) { - // Unregister from connection's active_streams + // Unregister from connection's active streams if (result_wrapper->conn_wrapper) { - auto &active = result_wrapper->conn_wrapper->active_streams; - auto it = std::find(active.begin(), active.end(), result_wrapper); - if (it != active.end()) { - active.erase(it); - } + result_wrapper->conn_wrapper->UnregisterStream(result_wrapper); } // Clean up materialized data if present if (result_wrapper->materialized) { @@ -1506,75 +1495,6 @@ static AdbcStatusCode IngestToTableFromBoundStream(DuckDBAdbcStatementWrapper *s 
rows_affected); } -// Materialize all active streams on a connection so that a new query can execute. -// This fetches remaining data from each streaming result into memory, making the -// streams independent of the connection's active query context. -static void MaterializeActiveStreams(duckdb::DuckDBAdbcConnectionWrapper *conn_wrapper) { - for (auto *result_wrapper : conn_wrapper->active_streams) { - if (!result_wrapper || result_wrapper->materialized) { - continue; - } - - // Collect remaining batches from the streaming result - duckdb::vector batches; - auto arrow_options = duckdb_result_get_arrow_options(&result_wrapper->result); - while (true) { - ArrowArray array; - std::memset(&array, 0, sizeof(ArrowArray)); - - auto duckdb_chunk = duckdb_fetch_chunk(result_wrapper->result); - if (!duckdb_chunk) { - break; - } - auto conversion_err = duckdb_data_chunk_to_arrow(arrow_options, duckdb_chunk, &array); - duckdb_destroy_data_chunk(&duckdb_chunk); - - if (conversion_err) { - duckdb_destroy_error_data(&conversion_err); - if (array.release) { - array.release(&array); - } - break; - } - batches.push_back(array); - } - duckdb_destroy_arrow_options(&arrow_options); - - // Store materialized data - auto mat = static_cast(malloc(sizeof(MaterializedData))); - if (!mat) { - // Allocation failed — release fetched batches and skip materialization - for (auto &batch : batches) { - if (batch.release) { - batch.release(&batch); - } - } - continue; - } - mat->current = 0; - mat->count = static_cast(batches.size()); - if (!batches.empty()) { - mat->batches = static_cast(malloc(sizeof(ArrowArray) * batches.size())); - if (!mat->batches) { - // Allocation failed — release fetched batches and skip materialization - for (auto &batch : batches) { - if (batch.release) { - batch.release(&batch); - } - } - free(mat); - continue; - } - for (idx_t i = 0; i < batches.size(); i++) { - mat->batches[i] = batches[i]; - } - } else { - mat->batches = nullptr; - } - result_wrapper->materialized = 
mat; - } -} - AdbcStatusCode StatementExecuteQuery(struct AdbcStatement *statement, struct ArrowArrayStream *out, int64_t *rows_affected, struct AdbcError *error) { if (!statement) { @@ -1595,7 +1515,7 @@ AdbcStatusCode StatementExecuteQuery(struct AdbcStatement *statement, struct Arr // Without materialization, executing a new query would silently invalidate any existing streaming results on the // same connection. if (wrapper->conn_wrapper) { - MaterializeActiveStreams(wrapper->conn_wrapper); + wrapper->conn_wrapper->MaterializeStreams(); } // TODO: Set affected rows, careful with early return @@ -1755,7 +1675,7 @@ AdbcStatusCode StatementExecuteQuery(struct AdbcStatement *statement, struct Arr out->get_last_error = get_last_error; // Register this stream wrapper so it can be materialized if another query runs if (wrapper->conn_wrapper) { - wrapper->conn_wrapper->active_streams.push_back(stream_wrapper); + wrapper->conn_wrapper->RegisterStream(stream_wrapper); } } else { // Caller didn't request a stream; clean up resources @@ -1803,7 +1723,7 @@ AdbcStatusCode StatementSetSqlQuery(struct AdbcStatement *statement, const char // Materialize any active streams before preparing if (wrapper->conn_wrapper) { - MaterializeActiveStreams(wrapper->conn_wrapper); + wrapper->conn_wrapper->MaterializeStreams(); } if (wrapper->ingestion_stream.release) { @@ -2468,6 +2388,96 @@ AdbcStatusCode ConnectionGetTableTypes(struct AdbcConnection *connection, struct } // namespace duckdb_adbc +void duckdb::DuckDBAdbcConnectionWrapper::RegisterStream(duckdb_adbc::DuckDBAdbcStreamWrapper *stream) { + const duckdb::lock_guard guard(stream_mutex); + active_streams.push_back(stream); +} + +void duckdb::DuckDBAdbcConnectionWrapper::UnregisterStream(duckdb_adbc::DuckDBAdbcStreamWrapper *stream) { + const duckdb::lock_guard guard(stream_mutex); + auto it = std::find(active_streams.begin(), active_streams.end(), stream); + if (it != active_streams.end()) { + active_streams.erase(it); + } +} + 
+void duckdb::DuckDBAdbcConnectionWrapper::MaterializeStreams() { + const duckdb::lock_guard guard(stream_mutex); + for (auto *result_wrapper : active_streams) { + if (!result_wrapper || result_wrapper->materialized) { + continue; + } + + // Collect remaining batches from the streaming result + duckdb::vector batches; + auto arrow_options = duckdb_result_get_arrow_options(&result_wrapper->result); + while (true) { + ArrowArray array; + std::memset(&array, 0, sizeof(ArrowArray)); + + auto duckdb_chunk = duckdb_fetch_chunk(result_wrapper->result); + if (!duckdb_chunk) { + break; + } + auto conversion_err = duckdb_data_chunk_to_arrow(arrow_options, duckdb_chunk, &array); + duckdb_destroy_data_chunk(&duckdb_chunk); + + if (conversion_err) { + duckdb_destroy_error_data(&conversion_err); + if (array.release) { + array.release(&array); + } + break; + } + batches.push_back(array); + } + duckdb_destroy_arrow_options(&arrow_options); + + // Store materialized data + auto mat = static_cast(malloc(sizeof(duckdb_adbc::MaterializedData))); + if (!mat) { + // Allocation failed — release fetched batches and skip materialization + for (auto &batch : batches) { + if (batch.release) { + batch.release(&batch); + } + } + continue; + } + mat->current = 0; + mat->count = static_cast(batches.size()); + if (!batches.empty()) { + mat->batches = static_cast(malloc(sizeof(ArrowArray) * batches.size())); + if (!mat->batches) { + // Allocation failed — release fetched batches and skip materialization + for (auto &batch : batches) { + if (batch.release) { + batch.release(&batch); + } + } + free(mat); + continue; + } + for (idx_t i = 0; i < batches.size(); i++) { + mat->batches[i] = batches[i]; + } + } else { + mat->batches = nullptr; + } + result_wrapper->materialized = mat; + } +} + +void duckdb::DuckDBAdbcConnectionWrapper::DetachAndClearStreams() { + const duckdb::lock_guard guard(stream_mutex); + for (auto *stream_wrapper : active_streams) { + if (stream_wrapper) { + 
stream_wrapper->conn_wrapper = nullptr; + } + } + active_streams.clear(); +} + static void ReleaseError(struct AdbcError *error) { if (error) { if (error->message) diff --git a/src/duckdb/src/execution/operator/join/physical_hash_join.cpp b/src/duckdb/src/execution/operator/join/physical_hash_join.cpp index b8f234ac8..ebef663af 100644 --- a/src/duckdb/src/execution/operator/join/physical_hash_join.cpp +++ b/src/duckdb/src/execution/operator/join/physical_hash_join.cpp @@ -886,7 +886,9 @@ unique_ptr JoinFilterPushdownInfo::FinalizeFilters(ClientContext &con break; } - if (ht && CanUseBloomFilter(context, ht, op, cmp, is_perfect_hashtable)) { + auto condition_type = op.conditions[join_condition[filter_idx]].left->return_type; + bool has_cast = condition_type != pushdown_column.storage_type; + if (!has_cast && ht && CanUseBloomFilter(context, ht, op, cmp, is_perfect_hashtable)) { PushBloomFilter(info, *ht, op, filter_col_idx); } } diff --git a/src/duckdb/src/execution/operator/persistent/physical_delete.cpp b/src/duckdb/src/execution/operator/persistent/physical_delete.cpp index 4603e6147..86e0e7a8b 100644 --- a/src/duckdb/src/execution/operator/persistent/physical_delete.cpp +++ b/src/duckdb/src/execution/operator/persistent/physical_delete.cpp @@ -9,6 +9,7 @@ #include "duckdb/storage/table/delete_state.hpp" #include "duckdb/storage/table/scan_state.hpp" #include "duckdb/transaction/duck_transaction.hpp" +#include "duckdb/transaction/local_storage.hpp" namespace duckdb { @@ -128,21 +129,10 @@ SinkResultType PhysicalDelete::Sink(ExecutionContext &context, DataChunk &chunk, l_state.delete_chunk.SetCardinality(fetch_chunk); // Append the deleted row IDs to the delete indexes. - // If we only delete local row IDs, then the delete_chunk is empty. 
if (g_state.has_unique_indexes && l_state.delete_chunk.size() != 0) { auto &local_storage = LocalStorage::Get(context.client, table.db); auto storage = local_storage.GetStorage(table); - IndexAppendInfo index_append_info(IndexAppendMode::IGNORE_DUPLICATES, nullptr); - for (auto &index : storage->delete_indexes.Indexes()) { - if (!index.IsBound() || !index.IsUnique()) { - continue; - } - auto &bound_index = index.Cast(); - auto error = bound_index.Append(l_state.delete_chunk, row_ids, index_append_info); - if (error.HasError()) { - throw InternalException("failed to update delete ART in physical delete: ", error.Message()); - } - } + storage->AppendToDeleteIndexes(row_ids, l_state.delete_chunk); } auto deleted_count = table.Delete(*l_state.delete_state, context.client, row_ids, chunk.size()); diff --git a/src/duckdb/src/function/table/system/duckdb_functions.cpp b/src/duckdb/src/function/table/system/duckdb_functions.cpp index 09ce83bcd..f2458f73c 100644 --- a/src/duckdb/src/function/table/system/duckdb_functions.cpp +++ b/src/duckdb/src/function/table/system/duckdb_functions.cpp @@ -11,13 +11,11 @@ #include "duckdb/catalog/catalog_entry/table_function_catalog_entry.hpp" #include "duckdb/catalog/catalog_entry/pragma_function_catalog_entry.hpp" #include "duckdb/parser/expression/columnref_expression.hpp" -#include "duckdb/common/algorithm.hpp" #include "duckdb/common/optional_idx.hpp" #include "duckdb/common/types.hpp" #include "duckdb/main/client_data.hpp" #include "duckdb/parser/expression/window_expression.hpp" #include "duckdb/main/database.hpp" -#include "duckdb/parser/parsed_data/create_scalar_function_info.hpp" #include "duckdb/function/scalar_function.hpp" namespace duckdb { @@ -571,7 +569,7 @@ struct PragmaFunctionExtractor { static vector ToValueVector(vector &string_vector) { vector result; for (string &str : string_vector) { - result.emplace_back(Value(str)); + result.emplace_back(str); } return result; } diff --git 
a/src/duckdb/src/function/table/version/pragma_version.cpp b/src/duckdb/src/function/table/version/pragma_version.cpp index a3623e18a..acf3eb70e 100644 --- a/src/duckdb/src/function/table/version/pragma_version.cpp +++ b/src/duckdb/src/function/table/version/pragma_version.cpp @@ -1,5 +1,5 @@ #ifndef DUCKDB_PATCH_VERSION -#define DUCKDB_PATCH_VERSION "2-dev356" +#define DUCKDB_PATCH_VERSION "2-dev408" #endif #ifndef DUCKDB_MINOR_VERSION #define DUCKDB_MINOR_VERSION 5 @@ -8,10 +8,10 @@ #define DUCKDB_MAJOR_VERSION 1 #endif #ifndef DUCKDB_VERSION -#define DUCKDB_VERSION "v1.5.2-dev356" +#define DUCKDB_VERSION "v1.5.2-dev408" #endif #ifndef DUCKDB_SOURCE_ID -#define DUCKDB_SOURCE_ID "10c4e2493e" +#define DUCKDB_SOURCE_ID "cbc6f2230e" #endif #include "duckdb/function/table/system_functions.hpp" #include "duckdb/main/database.hpp" diff --git a/src/duckdb/src/function/window/window_merge_sort_tree.cpp b/src/duckdb/src/function/window/window_merge_sort_tree.cpp index 5943e6228..b4f51f3d8 100644 --- a/src/duckdb/src/function/window/window_merge_sort_tree.cpp +++ b/src/duckdb/src/function/window/window_merge_sort_tree.cpp @@ -43,7 +43,7 @@ WindowMergeSortTree::WindowMergeSortTree(ClientContext &client, const vector(scan_types.back(), orders.size()); const auto order_type = OrderType::ASCENDING; const auto order_by_type = OrderByNullType::NULLS_LAST; - orders.emplace_back(BoundOrderByNode(order_type, order_by_type, std::move(unique_expr))); + orders.emplace_back(order_type, order_by_type, std::move(unique_expr)); key_cols.emplace_back(key_cols.size()); } diff --git a/src/duckdb/src/include/duckdb.h b/src/duckdb/src/include/duckdb.h index a9f9e79cd..56ee87b78 100644 --- a/src/duckdb/src/include/duckdb.h +++ b/src/duckdb/src/include/duckdb.h @@ -138,6 +138,8 @@ typedef enum DUCKDB_TYPE { DUCKDB_TYPE_INTEGER_LITERAL = 38, // duckdb_time_ns (nanoseconds) DUCKDB_TYPE_TIME_NS = 39, + // GEOMETRY type, WKB blob + DUCKDB_TYPE_GEOMETRY = 40, } duckdb_type; //! 
An enum over the returned state of different functions. @@ -6254,6 +6256,22 @@ Registers a custom log storage for the logger. */ DUCKDB_C_API duckdb_state duckdb_register_log_storage(duckdb_database database, duckdb_log_storage log_storage); +//---------------------------------------------------------------------------------------------------------------------- +// Geometry Helpers +//---------------------------------------------------------------------------------------------------------------------- +// DESCRIPTION: +// Functions to operate on GEOMETRY types. +//---------------------------------------------------------------------------------------------------------------------- + +/*! +Gets the CRS (Coordinate Reference System) of a GEOMETRY type. +Result must be freed with `duckdb_free`. + +* @param type The GEOMETRY type. +* @return The CRS of the GEOMETRY type, or NULL if the type is not a GEOMETRY type or has no CRS. +*/ +DUCKDB_C_API char *duckdb_geometry_type_get_crs(duckdb_logical_type type); + #endif #ifdef __cplusplus diff --git a/src/duckdb/src/include/duckdb/common/adbc/wrappers.hpp b/src/duckdb/src/include/duckdb/common/adbc/wrappers.hpp index c34ccb54f..20f6bbb11 100644 --- a/src/duckdb/src/include/duckdb/common/adbc/wrappers.hpp +++ b/src/duckdb/src/include/duckdb/common/adbc/wrappers.hpp @@ -9,6 +9,7 @@ #pragma once #include "duckdb.h" +#include "duckdb/common/mutex.hpp" #include "duckdb/common/string.hpp" #include "duckdb/common/unordered_map.hpp" #include "duckdb/common/vector.hpp" @@ -22,7 +23,18 @@ namespace duckdb { struct DuckDBAdbcConnectionWrapper { duckdb_connection connection; unordered_map options; - //! Active stream wrappers on this connection (for materialization on concurrent execute) + + //! Register a stream wrapper so it can be materialized if another query runs on this connection. + void RegisterStream(duckdb_adbc::DuckDBAdbcStreamWrapper *stream); + //! Unregister a stream wrapper. 
+ void UnregisterStream(duckdb_adbc::DuckDBAdbcStreamWrapper *stream); + //! Materialize all active streams, fetching remaining data into memory. + void MaterializeStreams(); + //! Detach all streams from this connection and clear the list (called on connection release). + void DetachAndClearStreams(); + +private: + mutex stream_mutex; vector active_streams; }; } // namespace duckdb diff --git a/src/duckdb/src/include/duckdb/main/capi/extension_api.hpp b/src/duckdb/src/include/duckdb/main/capi/extension_api.hpp index 9be892ef5..8262e4334 100644 --- a/src/duckdb/src/include/duckdb/main/capi/extension_api.hpp +++ b/src/duckdb/src/include/duckdb/main/capi/extension_api.hpp @@ -592,6 +592,9 @@ typedef struct { int64_t (*duckdb_file_handle_tell)(duckdb_file_handle file_handle); duckdb_state (*duckdb_file_handle_sync)(duckdb_file_handle file_handle); int64_t (*duckdb_file_handle_size)(duckdb_file_handle file_handle); + // API to operate on GEOMETRY types. + + char *(*duckdb_geometry_type_get_crs)(duckdb_logical_type type); // API to register a custom log storage. 
duckdb_log_storage (*duckdb_create_log_storage)(); @@ -1174,6 +1177,7 @@ inline duckdb_ext_api_v1 CreateAPIv1() { result.duckdb_file_handle_tell = duckdb_file_handle_tell; result.duckdb_file_handle_sync = duckdb_file_handle_sync; result.duckdb_file_handle_size = duckdb_file_handle_size; + result.duckdb_geometry_type_get_crs = duckdb_geometry_type_get_crs; result.duckdb_create_log_storage = duckdb_create_log_storage; result.duckdb_destroy_log_storage = duckdb_destroy_log_storage; result.duckdb_log_storage_set_write_log_entry = duckdb_log_storage_set_write_log_entry; diff --git a/src/duckdb/src/include/duckdb/optimizer/topn_window_elimination.hpp b/src/duckdb/src/include/duckdb/optimizer/topn_window_elimination.hpp index a94b006d1..a77077c97 100644 --- a/src/duckdb/src/include/duckdb/optimizer/topn_window_elimination.hpp +++ b/src/duckdb/src/include/duckdb/optimizer/topn_window_elimination.hpp @@ -58,10 +58,10 @@ class TopNWindowElimination : public BaseColumnPruner { unique_ptr CreateRowNumberGenerator(unique_ptr aggregate_column_ref) const; void AddStructExtractExprs(vector> &exprs, const LogicalType &struct_type, const unique_ptr &aggregate_column_ref) const; - static void UpdateTopmostBindings(idx_t window_idx, const unique_ptr &op, - const map &group_idxs, - const vector &topmost_bindings, - vector &new_bindings, ColumnBindingReplacer &replacer); + unique_ptr + UpdateTopmostBindings(idx_t window_idx, unique_ptr op, const vector &types, + const map &group_idxs, const vector &topmost_bindings, + vector &new_bindings, ColumnBindingReplacer &replacer); TopNWindowEliminationParameters ExtractOptimizerParameters(const LogicalWindow &window, const LogicalFilter &filter, const vector &bindings, vector> &aggregate_payload); diff --git a/src/duckdb/src/include/duckdb/transaction/local_storage.hpp b/src/duckdb/src/include/duckdb/transaction/local_storage.hpp index 2d51384b2..868e3a13a 100644 --- a/src/duckdb/src/include/duckdb/transaction/local_storage.hpp +++ 
b/src/duckdb/src/include/duckdb/transaction/local_storage.hpp @@ -182,7 +182,7 @@ class LocalStorage { const vector &bound_columns, Expression &cast_expr); void MoveStorage(DataTable &old_dt, DataTable &new_dt); - void FetchChunk(DataTable &table, Vector &row_ids, idx_t count, const vector &col_ids, + void FetchChunk(DataTable &table, const Vector &row_ids, idx_t count, const vector &col_ids, DataChunk &chunk, ColumnFetchState &fetch_state); //! Returns true, if the local storage contains the row id. bool CanFetch(DataTable &table, const row_t row_id); diff --git a/src/duckdb/src/include/duckdb_extension.h b/src/duckdb/src/include/duckdb_extension.h index 78de2d88a..651cadc2b 100644 --- a/src/duckdb/src/include/duckdb_extension.h +++ b/src/duckdb/src/include/duckdb_extension.h @@ -677,6 +677,11 @@ typedef struct { int64_t (*duckdb_file_handle_size)(duckdb_file_handle file_handle); #endif +// API to operate on GEOMETRY types. +#ifdef DUCKDB_EXTENSION_API_VERSION_UNSTABLE + char *(*duckdb_geometry_type_get_crs)(duckdb_logical_type type); +#endif + // API to register a custom log storage. 
#ifdef DUCKDB_EXTENSION_API_VERSION_UNSTABLE duckdb_log_storage (*duckdb_create_log_storage)(); @@ -1307,6 +1312,9 @@ typedef struct { #define duckdb_file_handle_sync duckdb_ext_api.duckdb_file_handle_sync #define duckdb_file_handle_close duckdb_ext_api.duckdb_file_handle_close +// Version unstable_new_geo_functions +#define duckdb_geometry_type_get_crs duckdb_ext_api.duckdb_geometry_type_get_crs + // Version unstable_new_logger_functions #define duckdb_create_log_storage duckdb_ext_api.duckdb_create_log_storage #define duckdb_destroy_log_storage duckdb_ext_api.duckdb_destroy_log_storage diff --git a/src/duckdb/src/main/attached_database.cpp b/src/duckdb/src/main/attached_database.cpp index 46696c340..00a4db663 100644 --- a/src/duckdb/src/main/attached_database.cpp +++ b/src/duckdb/src/main/attached_database.cpp @@ -77,6 +77,14 @@ AttachOptions::AttachOptions(const unordered_map &attach_options, default_table = QualifiedName::Parse(StringValue::Get(entry.second.DefaultCastAs(LogicalType::VARCHAR))); continue; } + + if (entry.first == "hidden") { + auto is_hidden = BooleanValue::Get(entry.second.DefaultCastAs(LogicalType::BOOLEAN)); + if (is_hidden) { + visibility = AttachVisibility::HIDDEN; + } + continue; + } options.emplace(entry.first, entry.second); } } diff --git a/src/duckdb/src/main/capi/helper-c.cpp b/src/duckdb/src/main/capi/helper-c.cpp index b46fa7fdb..1acbe30af 100644 --- a/src/duckdb/src/main/capi/helper-c.cpp +++ b/src/duckdb/src/main/capi/helper-c.cpp @@ -85,6 +85,8 @@ LogicalTypeId LogicalTypeIdFromC(const duckdb_type type) { return LogicalTypeId::INTEGER_LITERAL; case DUCKDB_TYPE_TIME_NS: return LogicalTypeId::TIME_NS; + case DUCKDB_TYPE_GEOMETRY: + return LogicalTypeId::GEOMETRY; default: // LCOV_EXCL_START D_ASSERT(0); return LogicalTypeId::INVALID; @@ -175,6 +177,8 @@ duckdb_type LogicalTypeIdToC(const LogicalTypeId type) { return DUCKDB_TYPE_INTEGER_LITERAL; case LogicalTypeId::TIME_NS: return DUCKDB_TYPE_TIME_NS; + case 
LogicalTypeId::GEOMETRY: + return DUCKDB_TYPE_GEOMETRY; default: // LCOV_EXCL_START D_ASSERT(0); return DUCKDB_TYPE_INVALID; diff --git a/src/duckdb/src/main/capi/logical_types-c.cpp b/src/duckdb/src/main/capi/logical_types-c.cpp index d30a2d6db..814877ec9 100644 --- a/src/duckdb/src/main/capi/logical_types-c.cpp +++ b/src/duckdb/src/main/capi/logical_types-c.cpp @@ -2,6 +2,7 @@ #include "duckdb/parser/parsed_data/create_type_info.hpp" #include "duckdb/common/type_visitor.hpp" #include "duckdb/common/helper.hpp" +#include "duckdb/common/types/geometry_crs.hpp" namespace duckdb { @@ -405,3 +406,14 @@ duckdb_state duckdb_register_logical_type(duckdb_connection connection, duckdb_l } return DuckDBSuccess; } + +char *duckdb_geometry_type_get_crs(duckdb_logical_type type) { + if (!AssertLogicalTypeId(type, duckdb::LogicalTypeId::GEOMETRY)) { + return nullptr; + } + auto &logical_type = *(reinterpret_cast(type)); + if (!duckdb::GeoType::HasCRS(logical_type)) { + return nullptr; + } + return strdup(duckdb::GeoType::GetCRS(logical_type).GetDefinition().c_str()); +} diff --git a/src/duckdb/src/optimizer/topn_window_elimination.cpp b/src/duckdb/src/optimizer/topn_window_elimination.cpp index 75c73a656..8828893a3 100644 --- a/src/duckdb/src/optimizer/topn_window_elimination.cpp +++ b/src/duckdb/src/optimizer/topn_window_elimination.cpp @@ -5,11 +5,14 @@ #include "duckdb/common/assert.hpp" #include "duckdb/common/enums/expression_type.hpp" #include "duckdb/common/helper.hpp" +#include "duckdb/common/unordered_set.hpp" #include "duckdb/common/unique_ptr.hpp" #include "duckdb/optimizer/late_materialization_helper.hpp" #include "duckdb/planner/binder.hpp" #include "duckdb/planner/operator/logical_aggregate.hpp" #include "duckdb/planner/operator/logical_comparison_join.hpp" +#include "duckdb/planner/operator/logical_cte.hpp" +#include "duckdb/planner/operator/logical_cteref.hpp" #include "duckdb/planner/operator/logical_get.hpp" #include 
"duckdb/planner/operator/logical_filter.hpp" #include "duckdb/planner/operator/logical_projection.hpp" @@ -84,6 +87,36 @@ bool BindingsReferenceRowNumber(const vector &bindings, const Log return false; } +void GatherLocalCTEInfo(const LogicalOperator &op, unordered_set &definitions, + unordered_set &references) { + switch (op.type) { + case LogicalOperatorType::LOGICAL_MATERIALIZED_CTE: + case LogicalOperatorType::LOGICAL_RECURSIVE_CTE: + definitions.insert(op.Cast().table_index); + break; + case LogicalOperatorType::LOGICAL_CTE_REF: + references.insert(op.Cast().cte_index); + break; + default: + break; + } + for (const auto &child : op.children) { + GatherLocalCTEInfo(*child, definitions, references); + } +} + +bool HasExternalCTEReferences(const LogicalOperator &op) { + unordered_set definitions; + unordered_set references; + GatherLocalCTEInfo(op, definitions, references); + for (const auto &cte_index : references) { + if (!definitions.count(cte_index)) { + return true; + } + } + return false; +} + ColumnBinding GetRowNumberColumnBinding(const unique_ptr &op) { switch (op->type) { case LogicalOperatorType::LOGICAL_UNNEST: { @@ -187,6 +220,7 @@ unique_ptr TopNWindowElimination::OptimizeInternal(unique_ptr new_bindings; if (!TraverseProjectionBindings(topmost_bindings, child, new_bindings)) { return op; @@ -224,11 +258,15 @@ unique_ptr TopNWindowElimination::OptimizeInternal(unique_ptr(std::move(op)); } @@ -675,16 +713,15 @@ bool TopNWindowElimination::TraverseProjectionBindings(const vector &op, - const map &group_idxs, - const vector &topmost_bindings, - vector &new_bindings, - ColumnBindingReplacer &replacer) { +unique_ptr +TopNWindowElimination::UpdateTopmostBindings(idx_t window_idx, unique_ptr op, + const vector &types, const map &group_idxs, + const vector &topmost_bindings, + vector &new_bindings, ColumnBindingReplacer &replacer) { // The top-most operator's column order is: // [projected groups][aggregate args/value][row number] // Now set the new 
bindings according to this order and remember replacements in replacer D_ASSERT(topmost_bindings.size() == new_bindings.size()); - replacer.replacement_bindings.reserve(new_bindings.size()); set row_id_binding_idxs; const idx_t group_table_idx = GetGroupIdx(op); @@ -706,13 +743,12 @@ void TopNWindowElimination::UpdateTopmostBindings(const idx_t window_idx, const idx_t current_column_idx = 0; for (auto group_idx : group_idxs) { const auto group_referencing_idx = group_idx.first; - const auto column_idx = - compact_group_columns ? compact_group_projection_idxs[group_idx.second] : group_idx.second; + const auto column_idx = (compact_group_columns && op->type != LogicalOperatorType::LOGICAL_COMPARISON_JOIN) + ? compact_group_projection_idxs[group_idx.second] + : group_idx.second; new_bindings[group_referencing_idx].table_index = group_table_idx; new_bindings[group_referencing_idx].column_index = column_idx; - replacer.replacement_bindings.emplace_back(topmost_bindings[group_referencing_idx], - new_bindings[group_referencing_idx]); current_column_idx++; } @@ -737,7 +773,6 @@ void TopNWindowElimination::UpdateTopmostBindings(const idx_t window_idx, const } binding.column_index = current_column_idx++; binding.table_index = aggregate_table_idx; - replacer.replacement_bindings.emplace_back(topmost_bindings[i], binding); } // Project the row number @@ -745,8 +780,27 @@ void TopNWindowElimination::UpdateTopmostBindings(const idx_t window_idx, const // Let all projections on row id point to the last output column auto &binding = new_bindings[row_id_binding_idx]; binding = GetRowNumberColumnBinding(op); - replacer.replacement_bindings.emplace_back(topmost_bindings[row_id_binding_idx], binding); } + + // If we are inside a SET operator, then replacing bindings is insufficient + // because the set operators assume that all the inputs have the same schema. + // To fix this, we have to inject another projection using the new bindings. 
+ replacer.replacement_bindings.reserve(new_bindings.size()); + const auto proj_table = optimizer.binder.GenerateTableIndex(); + vector> proj_exprs; + for (idx_t i = 0; i < topmost_bindings.size(); ++i) { + auto &new_binding = new_bindings[i]; + proj_exprs.push_back(make_uniq(types[i], new_binding)); + new_binding.table_index = proj_table; + new_binding.column_index = i; + replacer.replacement_bindings.emplace_back(topmost_bindings[i], new_binding); + } + + auto set_projection = make_uniq(proj_table, std::move(proj_exprs)); + set_projection->children.push_back(std::move(op)); + set_projection->ResolveOperatorTypes(); + + return unique_ptr(std::move(set_projection)); } TopNWindowEliminationParameters diff --git a/src/duckdb/src/planner/binder/expression/bind_lambda.cpp b/src/duckdb/src/planner/binder/expression/bind_lambda.cpp index b6f77cafd..0f7ec4865 100644 --- a/src/duckdb/src/planner/binder/expression/bind_lambda.cpp +++ b/src/duckdb/src/planner/binder/expression/bind_lambda.cpp @@ -61,6 +61,26 @@ static void ExtractParameters(LambdaExpression &expr, vector &column_nam D_ASSERT(!column_names.empty()); } +static bool IsDoubleArrowRHS(const ParsedExpression &expr) { + if (expr.GetExpressionClass() != ExpressionClass::FUNCTION) { + return false; + } + auto &func = expr.Cast(); + return func.is_operator && func.function_name == "->>" && func.children.size() == 2; +} + +static unique_ptr RestructureArrowChain(LambdaExpression &expr) { + auto &rhs_func = expr.expr->Cast(); + auto inner_lambda = make_uniq(std::move(expr.lhs), std::move(rhs_func.children[0])); + inner_lambda->syntax_type = expr.syntax_type; + vector> children; + children.push_back(std::move(inner_lambda)); + children.push_back(std::move(rhs_func.children[1])); + auto restructured = make_uniq("->>", std::move(children)); + restructured->is_operator = true; + return std::move(restructured); +} + BindResult ExpressionBinder::BindExpression(LambdaExpression &expr, idx_t depth, const vector 
&function_child_types, optional_ptr bind_lambda_function) { @@ -69,6 +89,13 @@ BindResult ExpressionBinder::BindExpression(LambdaExpression &expr, idx_t depth, } if (!bind_lambda_function) { + // The PEG parser produces A -> (B ->> C) where the standard parser left-associates to (A -> B) ->> C. + // Restructure to match standard behavior before binding. + if (IsDoubleArrowRHS(*expr.expr)) { + unique_ptr restructured = RestructureArrowChain(expr); + return BindExpression(restructured, depth); + } + // This is not a lambda expression, but the JSON arrow operator. // Remember the original expression in case of a binding error. if (!expr.copied_expr) { diff --git a/src/duckdb/src/storage/data_table.cpp b/src/duckdb/src/storage/data_table.cpp index 2984a0fdb..230573941 100644 --- a/src/duckdb/src/storage/data_table.cpp +++ b/src/duckdb/src/storage/data_table.cpp @@ -29,6 +29,7 @@ #include "duckdb/storage/table_storage_info.hpp" #include "duckdb/transaction/duck_transaction.hpp" #include "duckdb/transaction/duck_transaction_manager.hpp" +#include "duckdb/transaction/local_storage.hpp" namespace duckdb { @@ -422,7 +423,76 @@ TableStorageInfo DataTable::GetStorageInfo() { //===--------------------------------------------------------------------===// void DataTable::Fetch(DuckTransaction &transaction, DataChunk &result, const vector &column_ids, const Vector &row_identifiers, idx_t fetch_count, ColumnFetchState &state) { + D_ASSERT(row_identifiers.GetVectorType() == VectorType::FLAT_VECTOR); + auto row_ids = FlatVector::GetData(row_identifiers); + + // Quick scan to check for transaction-local row IDs (>= MAX_ROW_ID). + bool has_local = false; + for (idx_t i = 0; i < fetch_count; i++) { + if (row_ids[i] >= MAX_ROW_ID) { + has_local = true; + break; + } + } + + if (!has_local) { + // All committed rows — fast path (common case). + row_groups->Fetch(transaction, result, column_ids, row_identifiers, fetch_count, state); + return; + } + + // There are local rows. 
Classify all row IDs to determine the split. + auto &local_storage = transaction.GetLocalStorage(); + SelectionVector committed_sel(fetch_count); + SelectionVector local_sel(fetch_count); + idx_t committed_count = 0; + idx_t local_count = 0; + for (idx_t i = 0; i < fetch_count; i++) { + if (row_ids[i] >= MAX_ROW_ID) { + local_sel.set_index(local_count++, i); + } else { + committed_sel.set_index(committed_count++, i); + } + } + + if (committed_count == 0) { + // All local rows. + local_storage.FetchChunk(*this, row_identifiers, fetch_count, column_ids, result, state); + return; + } + + // Mixed: some rows are committed, some are local. + // row_groups->Fetch silently skips local row IDs, packing committed rows at 0..committed_count-1. row_groups->Fetch(transaction, result, column_ids, row_identifiers, fetch_count, state); + D_ASSERT(result.size() == committed_count); + + // Fetch local rows into a separate chunk. + auto &allocator = Allocator::Get(local_storage.GetClientContext()); + DataChunk local_chunk; + local_chunk.Initialize(allocator, result.GetTypes()); + Vector local_row_ids(row_identifiers, local_sel, local_count); + local_row_ids.Flatten(local_count); + ColumnFetchState local_fetch_state; + local_storage.FetchChunk(*this, local_row_ids, local_count, column_ids, local_chunk, local_fetch_state); + + // Append local rows after committed rows in the result. + for (idx_t col = 0; col < result.ColumnCount(); col++) { + VectorOperations::Copy(local_chunk.data[col], result.data[col], local_count, 0, committed_count); + } + result.SetCardinality(committed_count + local_count); + + // Build inverse permutation to restore original row order. + // Current layout: [committed rows in relative order | local rows in relative order]. 
+ SelectionVector inv_perm(fetch_count); + for (idx_t i = 0; i < committed_count; i++) { + inv_perm.set_index(committed_sel.get_index(i), i); + } + for (idx_t j = 0; j < local_count; j++) { + inv_perm.set_index(local_sel.get_index(j), committed_count + j); + } + for (idx_t col = 0; col < result.ColumnCount(); col++) { + result.data[col].Slice(inv_perm, fetch_count); + } } void DataTable::FetchCommitted(DataChunk &result, const vector &column_ids, const Vector &row_identifiers, diff --git a/src/duckdb/src/storage/local_storage.cpp b/src/duckdb/src/storage/local_storage.cpp index be434cbf4..3922c6b7e 100644 --- a/src/duckdb/src/storage/local_storage.cpp +++ b/src/duckdb/src/storage/local_storage.cpp @@ -431,13 +431,37 @@ void LocalTableStorage::AppendToDeleteIndexes(Vector &row_ids, DataChunk &delete return; } + // Only committed row IDs (< MAX_ROW_ID) belong in the delete indexes. + // Local row IDs (>= MAX_ROW_ID) live in transaction-local storage and are + // handled directly by LocalStorage::Delete. 
+ row_ids.Flatten(delete_chunk.size()); + auto flat_row_ids = FlatVector::GetData(row_ids); + idx_t committed_count = 0; + SelectionVector committed_sel(delete_chunk.size()); + for (idx_t i = 0; i < delete_chunk.size(); i++) { + if (flat_row_ids[i] < MAX_ROW_ID) { + committed_sel.set_index(committed_count++, i); + } + } + + if (committed_count == 0) { + return; + } + + DataChunk committed_chunk; + committed_chunk.Initialize(Allocator::DefaultAllocator(), delete_chunk.GetTypes()); + committed_chunk.Slice(delete_chunk, committed_sel, committed_count); + + Vector committed_row_ids(row_ids, committed_sel, committed_count); + committed_row_ids.Flatten(committed_count); + for (auto &index : delete_indexes.Indexes()) { D_ASSERT(index.IsBound()); if (!index.IsUnique()) { continue; } IndexAppendInfo index_append_info(IndexAppendMode::IGNORE_DUPLICATES, nullptr); - auto result = index.Cast().Append(delete_chunk, row_ids, index_append_info); + auto result = index.Cast().Append(committed_chunk, committed_row_ids, index_append_info); if (result.HasError()) { throw InternalException("unexpected constraint violation on delete ART: ", result.Message()); } @@ -695,7 +719,7 @@ void LocalStorage::ChangeType(DataTable &old_dt, DataTable &new_dt, idx_t change table_manager.InsertEntry(new_dt, std::move(new_storage)); } -void LocalStorage::FetchChunk(DataTable &table, Vector &row_ids, idx_t count, const vector &col_ids, +void LocalStorage::FetchChunk(DataTable &table, const Vector &row_ids, idx_t count, const vector &col_ids, DataChunk &chunk, ColumnFetchState &fetch_state) { auto storage = table_manager.GetStorage(table); if (!storage) {