Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/duckdb/extension/parquet/include/parquet_reader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ struct ParquetOptions {
explicit ParquetOptions(ClientContext &context);

bool binary_as_string = false;
bool variant_legacy_encoding = false;
bool file_row_number = false;
shared_ptr<ParquetEncryptionConfig> encryption_config;

Expand Down
38 changes: 38 additions & 0 deletions src/duckdb/extension/parquet/parquet_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,38 @@ static bool IsGeometryType(const SchemaElement &s_ele, const ParquetFileMetadata
return false;
}

//! Structurally detect a (legacy-encoded) Parquet Variant group.
//! A non-shredded Variant is a group with exactly two BYTE_ARRAY fields named
//! 'metadata' and 'value'. Some writers emit the two fields in either order,
//! so both orderings are accepted. A third 'typed_value' field indicates a
//! shredded Variant, which is recognized but not yet supported.
static bool IsVariantType(const SchemaElement &root, const vector<ParquetColumnSchema> &children) {
	if (children.size() < 2) {
		return false;
	}
	auto &first = children[0];
	auto &second = children[1];

	// Both 'metadata' and 'value' must be present among the first two children,
	// but we are lenient about which comes first.
	const bool has_metadata = first.name == "metadata" || second.name == "metadata";
	const bool has_value = first.name == "value" || second.name == "value";
	if (!has_metadata || !has_value) {
		return false;
	}

	// Both fields must be stored as BYTE_ARRAY.
	if (first.parquet_type != duckdb_parquet::Type::BYTE_ARRAY ||
	    second.parquet_type != duckdb_parquet::Type::BYTE_ARRAY) {
		return false;
	}

	if (children.size() == 3) {
		// A third child is only meaningful if it is the shredded 'typed_value' field.
		if (children[2].name != "typed_value") {
			return false;
		}
		throw NotImplementedException("Shredded Variants are not supported yet");
	}
	// Any other child count is not a Variant group.
	return children.size() == 2;
}

ParquetColumnSchema ParquetReader::ParseSchemaRecursive(idx_t depth, idx_t max_define, idx_t max_repeat,
idx_t &next_schema_idx, idx_t &next_file_idx,
ClientContext &context) {
Expand Down Expand Up @@ -628,6 +660,9 @@ ParquetColumnSchema ParquetReader::ParseSchemaRecursive(idx_t depth, idx_t max_d
const bool is_map = s_ele.__isset.converted_type && s_ele.converted_type == ConvertedType::MAP;
bool is_map_kv = s_ele.__isset.converted_type && s_ele.converted_type == ConvertedType::MAP_KEY_VALUE;
bool is_variant = s_ele.__isset.logicalType && s_ele.logicalType.__isset.VARIANT == true;
if (!is_variant && parquet_options.variant_legacy_encoding && IsVariantType(s_ele, child_schemas)) {
is_variant = true;
}

if (!is_map_kv && this_idx > 0) {
// check if the parent node of this is a map
Expand Down Expand Up @@ -797,6 +832,9 @@ ParquetOptions::ParquetOptions(ClientContext &context) {
if (context.TryGetCurrentSetting("binary_as_string", lookup_value)) {
binary_as_string = lookup_value.GetValue<bool>();
}
if (context.TryGetCurrentSetting("__delta_only_variant_encoding_enabled", lookup_value)) {
variant_legacy_encoding = lookup_value.GetValue<bool>();
}
}

ParquetColumnDefinition ParquetColumnDefinition::FromSchemaValue(ClientContext &context, const Value &column_value) {
Expand Down
4 changes: 3 additions & 1 deletion src/duckdb/src/catalog/catalog_entry/duck_schema_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,9 @@ optional_ptr<CatalogEntry> DuckSchemaEntry::AddEntryInternal(CatalogTransaction
if (!set.CreateEntry(transaction, entry_name, std::move(entry), dependencies)) {
// entry already exists!
if (on_conflict == OnCreateConflict::ERROR_ON_CONFLICT) {
throw CatalogException::EntryAlreadyExists(entry_type, entry_name);
auto existing_entry = set.GetEntry(transaction, entry_name);
auto existing_type = existing_entry ? existing_entry->type : entry_type;
throw CatalogException::EntryAlreadyExists(existing_type, entry_name);
} else {
return nullptr;
}
Expand Down
2 changes: 2 additions & 0 deletions src/duckdb/src/catalog/catalog_entry/duck_table_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -745,6 +745,7 @@ unique_ptr<CatalogEntry> DuckTableEntry::RemoveColumn(ClientContext &context, Re
dropped_column_is_generated);

auto bound_create_info = binder->BindCreateTableInfo(std::move(create_info), schema);
info.new_dependencies = make_uniq<LogicalDependencyList>(std::move(bound_create_info->dependencies));
if (columns.GetColumn(LogicalIndex(removed_index)).Generated()) {
return make_uniq<DuckTableEntry>(catalog, schema, *bound_create_info, storage);
}
Expand Down Expand Up @@ -968,6 +969,7 @@ unique_ptr<CatalogEntry> DuckTableEntry::SetDefault(ClientContext &context, SetD

auto binder = Binder::CreateBinder(context);
auto bound_create_info = binder->BindCreateTableInfo(std::move(create_info), schema);
info.new_dependencies = make_uniq<LogicalDependencyList>(std::move(bound_create_info->dependencies));
return make_uniq<DuckTableEntry>(catalog, schema, *bound_create_info, storage);
}

Expand Down
22 changes: 15 additions & 7 deletions src/duckdb/src/catalog/dependency_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -677,8 +677,13 @@ void DependencyManager::AlterObject(CatalogTransaction transaction, CatalogEntry
});

// Keep old dependencies
dependency_set_t dependents;
bool has_new_dependencies = alter_info.new_dependencies.get();
ScanSubjects(transaction, old_info, [&](DependencyEntry &dep) {
if (has_new_dependencies && !dep.Subject().flags.IsOwnership()) {
// The alter provided updated dependencies - skip old non-ownership subject dependencies
// as they will be replaced by the new dependencies
return;
}
auto entry = LookupEntry(transaction, dep);
if (!entry) {
return;
Expand All @@ -689,15 +694,18 @@ void DependencyManager::AlterObject(CatalogTransaction transaction, CatalogEntry
dependencies.emplace_back(dep_info);
});

// FIXME: we should update dependencies in the future
// some alters could cause dependencies to change (imagine types of table columns)
// or DEFAULT depending on a sequence
if (!StringUtil::CIEquals(old_obj.name, new_obj.name)) {
// The name has been changed, we need to recreate the dependency links
if (has_new_dependencies || !StringUtil::CIEquals(old_obj.name, new_obj.name)) {
// The dependencies have changed (e.g. SET DEFAULT) or the name has changed
// We need to recreate the dependency links
CleanupDependencies(transaction, old_obj);
}

// Reinstate the old dependencies
if (has_new_dependencies) {
// Add the new dependencies
CreateDependencies(transaction, new_obj, *alter_info.new_dependencies);
}

// Reinstate any old dependencies
for (auto &dep : dependencies) {
CreateDependency(transaction, dep);
}
Expand Down
20 changes: 18 additions & 2 deletions src/duckdb/src/execution/operator/join/physical_hash_join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -726,7 +726,12 @@ void JoinFilterPushdownInfo::PushInFilter(const JoinFilterPushdownFilter &info,
// generate the OR-clause - note that we only need to consider unique values here (so we use a seT)
value_set_t unique_ht_values;
for (idx_t k = 0; k < key_count; k++) {
unique_ht_values.insert(build_vector.GetValue(k));
// Cast to storage type, only insert if it succeeds
auto value = build_vector.GetValue(k);
if (!value.DefaultTryCastAs(info.columns[filter_idx].storage_type)) {
return; // it's all or nothing sadly
}
unique_ht_values.insert(value);
}
vector<Value> in_list(unique_ht_values.begin(), unique_ht_values.end());

Expand Down Expand Up @@ -811,12 +816,23 @@ unique_ptr<DataChunk> JoinFilterPushdownInfo::FinalizeFilters(ClientContext &con
for (idx_t filter_idx = 0; filter_idx < join_condition.size(); filter_idx++) {
const auto cmp = op.conditions[join_condition[filter_idx]].comparison;
for (auto &info : probe_info) {
auto filter_col_idx = info.columns[filter_idx].probe_column_index.column_index;
const auto &pushdown_column = info.columns[filter_idx];
auto &filter_col_idx = pushdown_column.probe_column_index.column_index;
auto min_idx = filter_idx * 2;
auto max_idx = min_idx + 1;

auto min_val = final_min_max->data[min_idx].GetValue(0);
auto max_val = final_min_max->data[max_idx].GetValue(0);

// Cast to storage type, skip if fails
D_ASSERT(pushdown_column.storage_type.IsValid());
if (!min_val.DefaultTryCastAs(pushdown_column.storage_type)) {
continue;
}
if (!max_val.DefaultTryCastAs(pushdown_column.storage_type)) {
continue;
}

if (min_val.IsNull() || max_val.IsNull()) {
// min/max is NULL
// this can happen in case all values in the RHS column are NULL, but they are still pushed into the
Expand Down
67 changes: 33 additions & 34 deletions src/duckdb/src/function/aggregate/distributive/minmax.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -333,43 +333,42 @@ static AggregateFunction GetMinMaxOperator(const LogicalType &type) {
template <class OP, class OP_STRING, class OP_VECTOR>
unique_ptr<FunctionData> BindMinMax(ClientContext &context, AggregateFunction &function,
vector<unique_ptr<Expression>> &arguments) {
if (arguments[0]->return_type.id() == LogicalTypeId::VARCHAR) {
auto str_collation = StringType::GetCollation(arguments[0]->return_type);
if (!str_collation.empty() || !Settings::Get<DefaultCollationSetting>(context).empty()) {
// If aggr function is min/max and uses collations, replace bound_function with arg_min/arg_max
// to make sure the result's correctness.
string function_name = function.name == "min" ? "arg_min" : "arg_max";
QueryErrorContext error_context;
auto func = Catalog::GetEntry<AggregateFunctionCatalogEntry>(context, "", "", function_name,
OnEntryNotFound::RETURN_NULL, error_context);
if (!func) {
throw NotImplementedException(
"Failure while binding function \"%s\" using collations - arg_min/arg_max do not exist in the "
"catalog - load the core_functions module to fix this issue",
function.name);
}

auto &func_entry = *func;

FunctionBinder function_binder(context);
vector<LogicalType> types {arguments[0]->return_type, arguments[0]->return_type};
ErrorData error;
auto best_function = function_binder.BindFunction(func_entry.name, func_entry.functions, types, error);
if (!best_function.IsValid()) {
throw BinderException(string("Fail to find corresponding function for collation min/max: ") +
error.Message());
}
function = func_entry.functions.GetFunctionByOffset(best_function.GetIndex());
// We should also push collations for non-VARCHAR here, but we aren't ready for it yet (see internal #8704)
const auto collation = arguments[0]->return_type.id() == LogicalTypeId::VARCHAR &&
(!StringType::GetCollation(arguments[0]->return_type).empty() ||
!Settings::Get<DefaultCollationSetting>(context).empty());
auto collated_arg = collation ? arguments[0]->Copy() : nullptr;
if (collation && ExpressionBinder::PushCollation(context, collated_arg, collated_arg->return_type)) {
// If aggr function is min/max and uses collations, replace bound_function with arg_min/arg_max
// to make sure the result's correctness.
string function_name = function.name == "min" ? "arg_min" : "arg_max";
QueryErrorContext error_context;
auto func = Catalog::GetEntry<AggregateFunctionCatalogEntry>(context, "", "", function_name,
OnEntryNotFound::RETURN_NULL, error_context);
if (!func) {
throw NotImplementedException(
"Failure while binding function \"%s\" using collations - arg_min/arg_max do not exist in the "
"catalog - load the core_functions module to fix this issue",
function.name);
}

// Create a copied child and PushCollation for it.
arguments.push_back(arguments[0]->Copy());
ExpressionBinder::PushCollation(context, arguments[1], arguments[0]->return_type);
auto &func_entry = *func;

// Bind function like arg_min/arg_max.
function.arguments[0] = arguments[0]->return_type;
function.SetReturnType(arguments[0]->return_type);
return make_uniq<ArgMinMaxFunctionData>();
FunctionBinder function_binder(context);
vector<LogicalType> types {arguments[0]->return_type, collated_arg->return_type};
ErrorData error;
auto best_function = function_binder.BindFunction(func_entry.name, func_entry.functions, types, error);
if (!best_function.IsValid()) {
throw BinderException(string("Fail to find corresponding function for collation min/max: ") +
error.Message());
}
function = func_entry.functions.GetFunctionByOffset(best_function.GetIndex());

// Bind function like arg_min/arg_max.
arguments.push_back(std::move(collated_arg));
function.arguments[0] = arguments[0]->return_type;
function.SetReturnType(arguments[0]->return_type);
return make_uniq<ArgMinMaxFunctionData>();
}

auto input_type = arguments[0]->return_type;
Expand Down
6 changes: 3 additions & 3 deletions src/duckdb/src/function/table/version/pragma_version.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#ifndef DUCKDB_PATCH_VERSION
#define DUCKDB_PATCH_VERSION "2-dev329"
#define DUCKDB_PATCH_VERSION "2-dev356"
#endif
#ifndef DUCKDB_MINOR_VERSION
#define DUCKDB_MINOR_VERSION 5
Expand All @@ -8,10 +8,10 @@
#define DUCKDB_MAJOR_VERSION 1
#endif
#ifndef DUCKDB_VERSION
#define DUCKDB_VERSION "v1.5.2-dev329"
#define DUCKDB_VERSION "v1.5.2-dev356"
#endif
#ifndef DUCKDB_SOURCE_ID
#define DUCKDB_SOURCE_ID "699249af49"
#define DUCKDB_SOURCE_ID "10c4e2493e"
#endif
#include "duckdb/function/table/system_functions.hpp"
#include "duckdb/main/database.hpp"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ struct LocalUngroupedAggregateState;
//! A single column targeted by join filter pushdown: identifies the probe-side
//! column the generated filter applies to and the type it has in storage.
struct JoinFilterPushdownColumn {
	//! The probe column index to which this filter should be applied
	ColumnBinding probe_column_index;
	//! The type of the value in storage (LogicalGet)
	LogicalType storage_type;
};

struct JoinFilterGlobalState {
Expand Down
2 changes: 2 additions & 0 deletions src/duckdb/src/include/duckdb/main/config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,8 @@ struct DBConfigOptions {
idx_t allocator_flush_threshold = 134217728ULL;
//! If bulk deallocation larger than this occurs, flush outstanding allocations (1 << 30, ~1GB)
idx_t allocator_bulk_deallocation_flush_threshold = 536870912ULL;
//! Delta Only! - Fall back to recognizing Variant columns structurally
bool variant_legacy_encoding = false;
//! Metadata from DuckDB callers
string custom_user_agent;
//! The default block header size for new duckdb database files.
Expand Down
10 changes: 10 additions & 0 deletions src/duckdb/src/include/duckdb/main/settings.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,16 @@ struct Settings {
// Start of the auto-generated list of settings structures
//===----------------------------------------------------------------------===//

//! Internal-only setting (note the double-underscore prefix) used by the Delta
//! extension to make the Parquet reader recognize Variant columns structurally.
//! Not user-adjustable: SetGlobal/ResetGlobal reject user changes (see
//! custom_settings.cpp).
struct DeltaOnlyVariantEncodingEnabledSetting {
	using RETURN_TYPE = bool;
	static constexpr const char *Name = "__delta_only_variant_encoding_enabled";
	static constexpr const char *Description = "Enables the Parquet reader to identify a Variant structurally.";
	static constexpr const char *InputType = "BOOLEAN";
	static void SetGlobal(DatabaseInstance *db, DBConfig &config, const Value &parameter);
	static void ResetGlobal(DatabaseInstance *db, DBConfig &config);
	static Value GetSetting(const ClientContext &context);
};

struct AccessModeSetting {
using RETURN_TYPE = AccessMode;
static constexpr const char *Name = "access_mode";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include "duckdb/common/enums/catalog_type.hpp"
#include "duckdb/parser/parsed_data/parse_info.hpp"
#include "duckdb/common/enums/on_entry_not_found.hpp"
#include "duckdb/catalog/dependency_list.hpp"

namespace duckdb {

Expand Down Expand Up @@ -60,6 +61,8 @@ struct AlterInfo : public ParseInfo {
string name;
//! Allow altering internal entries
bool allow_internal;
//! New dependencies for the altered entry (set during binding)
unique_ptr<LogicalDependencyList> new_dependencies;

public:
virtual CatalogType GetCatalogType() const = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ class LogicalUpdate : public LogicalOperator {
DUCKDB_API static void BindExtraColumns(TableCatalogEntry &table, LogicalGet &get, LogicalProjection &proj,
LogicalUpdate &update, physical_index_set_t &bound_columns);

static void RewriteInPlaceUpdates(LogicalOperator &update_op);

protected:
vector<ColumnBinding> GetColumnBindings() override;
void ResolveTypes() override;
Expand Down
13 changes: 7 additions & 6 deletions src/duckdb/src/main/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ bool DBConfigOptions::debug_print_bindings = false;

static const ConfigurationOption internal_options[] = {

DUCKDB_GLOBAL(DeltaOnlyVariantEncodingEnabledSetting),
DUCKDB_GLOBAL(AccessModeSetting),
DUCKDB_SETTING_CALLBACK(AllocatorBackgroundThreadsSetting),
DUCKDB_GLOBAL(AllocatorBulkDeallocationFlushThresholdSetting),
Expand Down Expand Up @@ -206,12 +207,12 @@ static const ConfigurationOption internal_options[] = {
DUCKDB_SETTING(ZstdMinStringLengthSetting),
FINAL_SETTING};

static const ConfigurationAlias setting_aliases[] = {DUCKDB_SETTING_ALIAS("memory_limit", 98),
DUCKDB_SETTING_ALIAS("null_order", 42),
DUCKDB_SETTING_ALIAS("profiling_output", 117),
DUCKDB_SETTING_ALIAS("user", 132),
DUCKDB_SETTING_ALIAS("wal_autocheckpoint", 24),
DUCKDB_SETTING_ALIAS("worker_threads", 131),
static const ConfigurationAlias setting_aliases[] = {DUCKDB_SETTING_ALIAS("memory_limit", 99),
DUCKDB_SETTING_ALIAS("null_order", 43),
DUCKDB_SETTING_ALIAS("profiling_output", 118),
DUCKDB_SETTING_ALIAS("user", 133),
DUCKDB_SETTING_ALIAS("wal_autocheckpoint", 25),
DUCKDB_SETTING_ALIAS("worker_threads", 132),
FINAL_ALIAS};

vector<ConfigurationOption> DBConfig::GetOptions() {
Expand Down
16 changes: 16 additions & 0 deletions src/duckdb/src/main/settings/custom_settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,22 @@ Value AllocatorBulkDeallocationFlushThresholdSetting::GetSetting(const ClientCon
return Value(StringUtil::BytesToHumanReadableString(config.options.allocator_bulk_deallocation_flush_threshold));
}

//===----------------------------------------------------------------------===//
// Delta Only Variant Legacy Encoding
//===----------------------------------------------------------------------===//
// Internal-only setting: any user attempt to SET it is rejected; the flag is
// managed through DBConfigOptions::variant_legacy_encoding directly.
void DeltaOnlyVariantEncodingEnabledSetting::SetGlobal(DatabaseInstance *db, DBConfig &config, const Value &input) {
	throw InvalidInputException("This setting is not adjustable by a user");
}

// Internal-only setting: RESET is rejected for the same reason as SET.
void DeltaOnlyVariantEncodingEnabledSetting::ResetGlobal(DatabaseInstance *db, DBConfig &config) {
	throw InvalidInputException("This setting is not adjustable by a user");
}

// Expose the current (internally-managed) value of the flag as a BOOLEAN Value.
Value DeltaOnlyVariantEncodingEnabledSetting::GetSetting(const ClientContext &context) {
	auto &options = DBConfig::GetConfig(context).options;
	return Value::BOOLEAN(options.variant_legacy_encoding);
}

//===----------------------------------------------------------------------===//
// Allocator Flush Threshold
//===----------------------------------------------------------------------===//
Expand Down
Loading
Loading