Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,23 @@ Check new and modified lines against this list before finishing:
intended semantics.
- Silently reducing the scope of an approved plan when implementation is harder than expected.

## Native FFI in vortex-duckdb

`vortex-duckdb` vendors a pinned DuckDB version and compiles a C++ shim under
`vortex-duckdb/cpp/` from `build.rs`. When writing or modifying any callback that DuckDB
invokes through a C/C++ function pointer:

- Treat the bundled DuckDB headers as the authoritative reference for callback signatures.
Read the `typedef` directly from
`target/*/build/vortex-duckdb-*/out/duckdb-source-v*/duckdb-*/src/include/duckdb/`
(for example `function/copy_function.hpp` for `copy_to_get_written_statistics_t`).
Do not rely on memory or upstream docs, which may describe a different DuckDB version.
- Run `cargo build -p vortex-duckdb` before stopping. C++ signature mismatches only surface
at C++ compile time inside `build.rs`, so they are invisible to Rust-only checks.
- Run `cargo +nightly fmt --all -- --check` to confirm formatting matches the nightly
toolchain CI uses; stable `cargo fmt` may accept code that the nightly `Rust Lint - Format`
step rejects.

## Summaries

When summarizing work, write valid Markdown that can be copied into GitHub. Include the checks
Expand Down
30 changes: 30 additions & 0 deletions vortex-duckdb/cpp/copy_function.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,33 @@ void copy_to_finalize(ClientContext & /*context*/, FunctionData &bind_data, Glob
}
}

void c_get_written_statistics(ClientContext & /*context*/,
FunctionData &bind_data,
GlobalFunctionData &gstate,
CopyFunctionFileStatistics &statistics) {
auto &bind = bind_data.Cast<CCopyBindData>();
auto &global = gstate.Cast<CCopyGlobalData>();
if (!bind.vtab.copy_to_get_written_statistics) {
return;
}

duckdb_vx_error error_out = nullptr;
duckdb_vx_written_statistics_t stats = {};
bool has_stats = bind.vtab.copy_to_get_written_statistics(bind.ffi_data->DataPtr(),
global.ffi_data->DataPtr(),
&stats,
&error_out);
if (error_out) {
throw ExecutorException(IntoErrString(error_out));
}
if (!has_stats) {
return;
}

statistics.row_count = stats.row_count;
statistics.file_size_bytes = stats.file_size_bytes;
}

extern "C" duckdb_vx_copy_func_vtab_t *get_vtab_one() {
return &copy_vtab_one;
}
Expand All @@ -153,6 +180,9 @@ extern "C" duckdb_state duckdb_vx_copy_func_register_vtab_one(duckdb_database ff

copy_function.copy_to_sink = c_copy_to_sink;
copy_function.copy_to_finalize = copy_to_finalize;
if (copy_vtab_one.copy_to_get_written_statistics) {
copy_function.copy_to_get_written_statistics = c_get_written_statistics;
}
copy_function.extension = copy_vtab_one.extension;

// TODO(joe): expose this via c our api
Expand Down
19 changes: 19 additions & 0 deletions vortex-duckdb/cpp/include/duckdb_vx/copy_function.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ typedef struct {
// BATCH_COPY_TO_FILE
// } copy_function_execution_mode;

/// Statistics about a single written file, reported via copy_to_get_written_statistics.
typedef struct {
/// Total number of rows written to the file.
unsigned long long row_count;
/// Total size of the written file in bytes.
unsigned long long file_size_bytes;
} duckdb_vx_written_statistics_t;

// A transparent DuckDB copy function vtable, which can be used to configure a copy function.
typedef struct {
// The name of the copy function.
Expand Down Expand Up @@ -63,6 +71,17 @@ typedef struct {

void (*copy_to_finalize)(const void *bind_data, void *global_data, duckdb_vx_error *error_out);

/// Optional callback to report per-file write statistics back to DuckDB.
///
/// Called after copy_to_finalize. If non-null and the callback returns true, `stats_out`
/// is filled with aggregate statistics about the written file, and DuckDB exposes these
/// to table-format layers (e.g. duck-lake) for use in manifest or catalog entries.
/// Return false to indicate that no statistics are available for this write.
bool (*copy_to_get_written_statistics)(const void *bind_data,
const void *global_data,
duckdb_vx_written_statistics_t *stats_out,
duckdb_vx_error *error_out);

// TODO(joe): expose via c api
// copy_function_execution_mode (*execution_mode)(bool preserve_insertion_order, bool
// supports_batch_index);
Expand Down
21 changes: 20 additions & 1 deletion vortex-duckdb/src/copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use crate::duckdb::CopyFunction;
use crate::duckdb::DataChunkRef;
use crate::duckdb::DuckDbFsWriter;
use crate::duckdb::LogicalTypeRef;
use crate::duckdb::WrittenStatistics;

#[derive(Debug)]
pub struct VortexCopyFunction;
Expand All @@ -56,6 +57,9 @@ pub struct GlobalState {
// into us, and we call `RUNTIME.block_on`.
#[expect(dead_code)]
worker_pool: CurrentThreadWorkerPool,
/// Summary populated by [`VortexCopyFunction::copy_to_finalize`] for use in
/// [`VortexCopyFunction::get_written_statistics`].
write_summary: Option<WriteSummary>,
}

impl CopyFunction for VortexCopyFunction {
Expand Down Expand Up @@ -112,7 +116,8 @@ impl CopyFunction for VortexCopyFunction {
.lock()
.take()
.vortex_expect("no file to close");
task.await?;
let summary = task.await?;
init_global.write_summary = Some(summary);
Ok(())
})
}
Expand Down Expand Up @@ -145,10 +150,24 @@ impl CopyFunction for VortexCopyFunction {
worker_pool,
write_task: Mutex::new(Some(write_task)),
sink: Some(sink),
write_summary: None,
})
}

fn init_local(_global: &Self::BindData) -> VortexResult<Self::LocalState> {
Ok(())
}

fn get_written_statistics(
_bind_data: &Self::BindData,
global_state: &Self::GlobalState,
) -> VortexResult<Option<WrittenStatistics>> {
Ok(global_state
.write_summary
.as_ref()
.map(|s| WrittenStatistics {
row_count: s.row_count(),
file_size_bytes: s.size(),
}))
}
}
26 changes: 26 additions & 0 deletions vortex-duckdb/src/duckdb/copy_function/callback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use crate::cpp::duckdb_data_chunk;
use crate::cpp::duckdb_logical_type;
use crate::cpp::duckdb_vx_copy_func_bind_input;
use crate::cpp::duckdb_vx_error;
use crate::cpp::duckdb_vx_written_statistics_t;
use crate::duckdb::ClientContext;
use crate::duckdb::CopyFunction;
use crate::duckdb::Data;
Expand Down Expand Up @@ -118,3 +119,28 @@ pub(crate) unsafe extern "C-unwind" fn copy_to_finalize_callback<T: CopyFunction
Ok(())
})
}

pub(crate) unsafe extern "C-unwind" fn copy_to_get_written_statistics_callback<T: CopyFunction>(
bind_data: *const c_void,
global_data: *const c_void,
stats_out: *mut duckdb_vx_written_statistics_t,
error_out: *mut duckdb_vx_error,
) -> bool {
let bind_data =
unsafe { bind_data.cast::<T::BindData>().as_ref() }.vortex_expect("bind_data null pointer");
let global_data = unsafe { global_data.cast::<T::GlobalState>().as_ref() }
.vortex_expect("global_data null pointer");

try_or(error_out, || {
match T::get_written_statistics(bind_data, global_data)? {
Some(stats) => {
if let Some(out) = unsafe { stats_out.as_mut() } {
out.row_count = stats.row_count;
out.file_size_bytes = stats.file_size_bytes;
}
Ok(true)
}
None => Ok(false),
}
})
}
25 changes: 25 additions & 0 deletions vortex-duckdb/src/duckdb/copy_function/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,24 @@ use crate::duckdb::DatabaseRef;
use crate::duckdb::LogicalTypeRef;
use crate::duckdb::copy_function::callback::bind_callback;
use crate::duckdb::copy_function::callback::copy_to_finalize_callback;
use crate::duckdb::copy_function::callback::copy_to_get_written_statistics_callback;
use crate::duckdb::copy_function::callback::copy_to_sink_callback;
use crate::duckdb::copy_function::callback::global_callback;
use crate::duckdb::copy_function::callback::local_callback;
use crate::duckdb_try;

/// Statistics about a single written Vortex file.
///
/// Returned by [`CopyFunction::get_written_statistics`] and forwarded to DuckDB via
/// `copy_to_get_written_statistics`. Table-format layers (e.g. duck-lake) consume these
/// values to build manifest or catalog entries.
pub struct WrittenStatistics {
/// Total number of rows written to the file.
pub row_count: u64,
/// Total size of the written file in bytes.
pub file_size_bytes: u64,
}

pub trait CopyFunction: Sized + Debug {
type BindData: Send;
type GlobalState: Send + Sync;
Expand Down Expand Up @@ -61,6 +74,17 @@ pub trait CopyFunction: Sized + Debug {
/// is thread-local.
fn init_local(bind: &Self::BindData) -> VortexResult<Self::LocalState>;

/// Returns per-file write statistics after [`copy_to_finalize`][Self::copy_to_finalize].
///
/// Return `Some` to report statistics to DuckDB (e.g. for duck-lake manifest entries).
/// The default implementation returns `None`, which disables the callback.
fn get_written_statistics(
_bind_data: &Self::BindData,
_global_state: &Self::GlobalState,
) -> VortexResult<Option<WrittenStatistics>> {
Ok(None)
}

// TODO(joe): there are many more callbacks that can be configured.
}

Expand All @@ -80,6 +104,7 @@ impl DatabaseRef {
vtab.init_local = Some(local_callback::<T>);
vtab.copy_to_sink = Some(copy_to_sink_callback::<T>);
vtab.copy_to_finalize = Some(copy_to_finalize_callback::<T>);
vtab.copy_to_get_written_statistics = Some(copy_to_get_written_statistics_callback::<T>);

duckdb_try!(
unsafe { cpp::duckdb_vx_copy_func_register_vtab_one(self.as_ptr()) },
Expand Down
Loading