From c408ffa5f1dbe9943106ab40d2a44d8544fd74f7 Mon Sep 17 00:00:00 2001 From: Ruoshi Date: Wed, 28 Jan 2026 21:32:00 -0800 Subject: [PATCH 1/7] feat: implement DuckDB filesystem integration for Vortex file handling --- vortex-duckdb/build.rs | 1 + vortex-duckdb/cpp/copy_function.cpp | 3 +- vortex-duckdb/cpp/file_system.cpp | 203 ++++++++++++ vortex-duckdb/cpp/include/duckdb_vx.h | 1 + .../cpp/include/duckdb_vx/copy_function.h | 3 +- .../cpp/include/duckdb_vx/file_system.h | 56 ++++ vortex-duckdb/src/copy.rs | 19 +- .../src/duckdb/copy_function/callback.rs | 5 +- vortex-duckdb/src/duckdb/copy_function/mod.rs | 2 + vortex-duckdb/src/duckdb/file_system.rs | 296 +++++++++++++++++ vortex-duckdb/src/duckdb/mod.rs | 2 + .../src/e2e_test/vortex_scan_test.rs | 47 +++ vortex-duckdb/src/scan.rs | 67 ++-- vortex-duckdb/src/utils/glob.rs | 306 ++++-------------- 14 files changed, 746 insertions(+), 265 deletions(-) create mode 100644 vortex-duckdb/cpp/file_system.cpp create mode 100644 vortex-duckdb/cpp/include/duckdb_vx/file_system.h create mode 100644 vortex-duckdb/src/duckdb/file_system.rs diff --git a/vortex-duckdb/build.rs b/vortex-duckdb/build.rs index 7a058322935..f12ed48166e 100644 --- a/vortex-duckdb/build.rs +++ b/vortex-duckdb/build.rs @@ -465,6 +465,7 @@ fn main() { .file("cpp/data_chunk.cpp") .file("cpp/error.cpp") .file("cpp/expr.cpp") + .file("cpp/file_system.cpp") .file("cpp/logical_type.cpp") .file("cpp/object_cache.cpp") .file("cpp/replacement_scan.cpp") diff --git a/vortex-duckdb/cpp/copy_function.cpp b/vortex-duckdb/cpp/copy_function.cpp index 3768ba9d1a5..819df028afe 100644 --- a/vortex-duckdb/cpp/copy_function.cpp +++ b/vortex-duckdb/cpp/copy_function.cpp @@ -71,7 +71,8 @@ unique_ptr c_init_global(ClientContext &context, FunctionDat const string &file_path) { auto &bind = bind_data.Cast(); duckdb_vx_error error_out = nullptr; - auto global_data = bind.vtab.init_global(bind.ffi_data->DataPtr(), file_path.c_str(), &error_out); + auto global_data = bind.vtab.init_global(reinterpret_cast(&context), + bind.ffi_data->DataPtr(), file_path.c_str(), &error_out); if (error_out) { throw ExecutorException(IntoErrString(error_out)); } diff --git a/vortex-duckdb/cpp/file_system.cpp b/vortex-duckdb/cpp/file_system.cpp new file mode 100644 index 00000000000..d515fa9b280 --- /dev/null +++ b/vortex-duckdb/cpp/file_system.cpp @@ -0,0 +1,203 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +#include "duckdb_vx.h" + +#include +#include +#include + +#include +#include + +using namespace duckdb; + +namespace { +struct FileHandleWrapper { + explicit FileHandleWrapper(unique_ptr handle_p) : handle(std::move(handle_p)) { + } + + unique_ptr handle; +}; + +void SetError(duckdb_vx_error *error_out, const std::string &message) { + if (!error_out) { + return; + } + *error_out = duckdb_vx_error_create(message.data(), message.size()); +} + +duckdb_state HandleException(duckdb_vx_error *error_out) { + try { + throw; + } catch (const Exception &ex) { + SetError(error_out, ex.what()); + } catch (const std::exception &ex) { + SetError(error_out, ex.what()); + } catch (...) 
{ + SetError(error_out, "Unknown error"); + } + return DuckDBError; +} + +} // namespace + +extern "C" duckdb_vx_file_handle duckdb_vx_fs_open(duckdb_vx_client_context ctx, const char *path, + duckdb_vx_error *error_out) { + if (!ctx || !path) { + SetError(error_out, "Invalid filesystem open arguments"); + return nullptr; + } + + try { + auto *client_context = reinterpret_cast(ctx); + auto &fs = FileSystem::GetFileSystem(*client_context); + auto handle = fs.OpenFile(path, FileFlags::FILE_FLAGS_READ); + return reinterpret_cast(new FileHandleWrapper(std::move(handle))); + } catch (...) { + HandleException(error_out); + return nullptr; + } +} + +extern "C" duckdb_vx_file_handle duckdb_vx_fs_create(duckdb_vx_client_context ctx, const char *path, + duckdb_vx_error *error_out) { + if (!ctx || !path) { + SetError(error_out, "Invalid filesystem create arguments"); + return nullptr; + } + + try { + auto *client_context = reinterpret_cast(ctx); + auto &fs = FileSystem::GetFileSystem(*client_context); + auto handle = fs.OpenFile(path, FileFlags::FILE_FLAGS_WRITE | FileFlags::FILE_FLAGS_FILE_CREATE); + handle->Truncate(0); + return reinterpret_cast(new FileHandleWrapper(std::move(handle))); + } catch (...) { + HandleException(error_out); + return nullptr; + } +} + +extern "C" void duckdb_vx_fs_close(duckdb_vx_file_handle *handle) { + if (!handle || !*handle) { + return; + } + auto wrapper = reinterpret_cast(*handle); + delete wrapper; + *handle = nullptr; +} + +extern "C" duckdb_state duckdb_vx_fs_get_size(duckdb_vx_file_handle handle, idx_t *size_out, + duckdb_vx_error *error_out) { + if (!handle || !size_out) { + SetError(error_out, "Invalid arguments to fs_get_size"); + return DuckDBError; + } + + try { + auto *wrapper = reinterpret_cast(handle); + *size_out = wrapper->handle->GetFileSize(); + return DuckDBSuccess; + } catch (...) { + return HandleException(error_out); + } +} + +extern "C" duckdb_state duckdb_vx_fs_read(duckdb_vx_file_handle handle, idx_t offset, idx_t len, uint8_t *buffer, + idx_t *out_len, duckdb_vx_error *error_out) { + if (!handle || !buffer || !out_len) { + SetError(error_out, "Invalid arguments to fs_read"); + return DuckDBError; + } + + try { + auto *wrapper = reinterpret_cast(handle); + wrapper->handle->Read(buffer, len, offset); + *out_len = len; + return DuckDBSuccess; + } catch (...) { + return HandleException(error_out); + } +} + +extern "C" duckdb_state duckdb_vx_fs_write(duckdb_vx_file_handle handle, idx_t offset, idx_t len, + const uint8_t *buffer, idx_t *out_len, + duckdb_vx_error *error_out) { + if (!handle || !buffer || !out_len) { + SetError(error_out, "Invalid arguments to fs_write"); + return DuckDBError; + } + + try { + auto *wrapper = reinterpret_cast(handle); + wrapper->handle->Seek(offset); + wrapper->handle->Write(const_cast(buffer), len); + *out_len = len; + return DuckDBSuccess; + } catch (...) 
{ + return HandleException(error_out); + } +} + +extern "C" duckdb_vx_string_list duckdb_vx_fs_glob(duckdb_vx_client_context ctx, const char *pattern, + duckdb_vx_error *error_out) { + duckdb_vx_string_list result{nullptr, 0}; + + if (!ctx || !pattern) { + SetError(error_out, "Invalid arguments to fs_glob"); + return result; + } + + try { + auto *client_context = reinterpret_cast(ctx); + auto &fs = FileSystem::GetFileSystem(*client_context); + auto matches = fs.Glob(pattern); + + if (matches.empty()) { + return result; + } + + result.count = matches.size(); + result.entries = static_cast(malloc(sizeof(char *) * matches.size())); + for (size_t i = 0; i < matches.size(); i++) { + const auto &entry = matches[i].path; + auto *owned = static_cast(malloc(entry.size() + 1)); + std::memcpy(owned, entry.data(), entry.size()); + owned[entry.size()] = '\0'; + result.entries[i] = owned; + } + + return result; + } catch (...) { + HandleException(error_out); + return result; + } +} + +extern "C" void duckdb_vx_string_list_free(duckdb_vx_string_list *list) { + if (!list || !list->entries) { + return; + } + for (size_t i = 0; i < list->count; i++) { + free(const_cast(list->entries[i])); + } + free(list->entries); + list->entries = nullptr; + list->count = 0; +} + +extern "C" duckdb_state duckdb_vx_fs_sync(duckdb_vx_file_handle handle, duckdb_vx_error *error_out) { + if (!handle) { + SetError(error_out, "Invalid arguments to fs_sync"); + return DuckDBError; + } + + try { + auto *wrapper = reinterpret_cast(handle); + wrapper->handle->Sync(); + return DuckDBSuccess; + } catch (...) { + return HandleException(error_out); + } +} diff --git a/vortex-duckdb/cpp/include/duckdb_vx.h b/vortex-duckdb/cpp/include/duckdb_vx.h index b18eb79c522..9882afbb310 100644 --- a/vortex-duckdb/cpp/include/duckdb_vx.h +++ b/vortex-duckdb/cpp/include/duckdb_vx.h @@ -10,6 +10,7 @@ #include "duckdb_vx/data_chunk.h" #include "duckdb_vx/error.h" #include "duckdb_vx/expr.h" +#include "duckdb_vx/file_system.h" #include "duckdb_vx/logical_type.h" #include "duckdb_vx/object_cache.h" #include "duckdb_vx/replacement_scan.h" diff --git a/vortex-duckdb/cpp/include/duckdb_vx/copy_function.h b/vortex-duckdb/cpp/include/duckdb_vx/copy_function.h index 168921fb3a4..a5cd89fdf63 100644 --- a/vortex-duckdb/cpp/include/duckdb_vx/copy_function.h +++ b/vortex-duckdb/cpp/include/duckdb_vx/copy_function.h @@ -45,7 +45,8 @@ typedef struct { unsigned long column_name_count, const duckdb_logical_type *column_types, unsigned long column_type_count, duckdb_vx_error *error_out); - duckdb_vx_data (*init_global)(const void *bind_data, const char *file_path, duckdb_vx_error *error_out); + duckdb_vx_data (*init_global)(duckdb_vx_client_context ctx, const void *bind_data, const char *file_path, + duckdb_vx_error *error_out); duckdb_vx_data (*init_local)(const void *bind_data, duckdb_vx_error *error_out); diff --git a/vortex-duckdb/cpp/include/duckdb_vx/file_system.h b/vortex-duckdb/cpp/include/duckdb_vx/file_system.h new file mode 100644 index 00000000000..59caff68d64 --- /dev/null +++ b/vortex-duckdb/cpp/include/duckdb_vx/file_system.h @@ -0,0 +1,56 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +#pragma once + +#include "duckdb.h" +#include "duckdb_vx/client_context.h" +#include "duckdb_vx/error.h" + +#ifdef __cplusplus /* If compiled as C++, use C ABI */ +extern "C" { +#endif + +typedef struct duckdb_vx_file_handle_ *duckdb_vx_file_handle; + +typedef struct { + const char **entries; + size_t count; +} 
duckdb_vx_string_list; + +// Open a file using DuckDB's filesystem (supports httpfs, s3, etc.). +duckdb_vx_file_handle duckdb_vx_fs_open(duckdb_vx_client_context ctx, const char *path, + duckdb_vx_error *error_out); + +// Close a previously opened file handle. +void duckdb_vx_fs_close(duckdb_vx_file_handle *handle); + +// Get the size of an opened file. +duckdb_state duckdb_vx_fs_get_size(duckdb_vx_file_handle handle, idx_t *size_out, + duckdb_vx_error *error_out); + +// Read up to len bytes at the given offset into buffer. Returns bytes read via out_len. +duckdb_state duckdb_vx_fs_read(duckdb_vx_file_handle handle, idx_t offset, idx_t len, uint8_t *buffer, + idx_t *out_len, duckdb_vx_error *error_out); + +// Expand a glob using DuckDB's filesystem. +duckdb_vx_string_list duckdb_vx_fs_glob(duckdb_vx_client_context ctx, const char *pattern, + duckdb_vx_error *error_out); + +// Free a string list allocated by duckdb_vx_fs_glob. +void duckdb_vx_string_list_free(duckdb_vx_string_list *list); + +// Create/truncate a file for writing using DuckDB's filesystem. +duckdb_vx_file_handle duckdb_vx_fs_create(duckdb_vx_client_context ctx, const char *path, + duckdb_vx_error *error_out); + +// Write len bytes at the given offset from buffer. +duckdb_state duckdb_vx_fs_write(duckdb_vx_file_handle handle, idx_t offset, idx_t len, const uint8_t *buffer, + idx_t *out_len, duckdb_vx_error *error_out); + +// Flush pending writes to storage. +duckdb_state duckdb_vx_fs_sync(duckdb_vx_file_handle handle, duckdb_vx_error *error_out); + +#ifdef __cplusplus /* End C ABI */ +} +#endif diff --git a/vortex-duckdb/src/copy.rs b/vortex-duckdb/src/copy.rs index 40a0c72ea58..56f32fc41e2 100644 --- a/vortex-duckdb/src/copy.rs +++ b/vortex-duckdb/src/copy.rs @@ -29,13 +29,20 @@ use crate::RUNTIME; use crate::SESSION; use crate::convert::data_chunk_to_vortex; use crate::convert::from_duckdb_table; +use crate::cpp; +use crate::duckdb::ClientContext; use crate::duckdb::CopyFunction; use crate::duckdb::DataChunk; use crate::duckdb::LogicalType; +use crate::duckdb::duckdb_fs_create_writer; #[derive(Debug)] pub struct VortexCopyFunction; +#[derive(Clone, Copy)] +struct SendableClientCtx(usize); +unsafe impl Send for SendableClientCtx {} + pub struct BindData { dtype: DType, fields: StructFields, @@ -118,6 +125,7 @@ impl CopyFunction for VortexCopyFunction { } fn init_global( + client_context: ClientContext, bind_data: &Self::BindData, file_path: String, ) -> VortexResult { @@ -126,9 +134,16 @@ impl CopyFunction for VortexCopyFunction { let array_stream = ArrayStreamAdapter::new(bind_data.dtype.clone(), rx.into_stream()); let handle = SESSION.handle(); + let ctx_ptr = SendableClientCtx(client_context.as_ptr() as usize); let write_task = handle.spawn(async move { - let mut file = async_fs::File::create(file_path).await?; - SESSION.write_options().write(&mut file, array_stream).await + // Prefer DuckDB FS (httpfs/s3/etc.), fallback to local async fs if unavailable. 
+ let ctx_raw = ctx_ptr.0 as cpp::duckdb_vx_client_context; + if let Ok(writer) = unsafe { duckdb_fs_create_writer(ctx_raw, &file_path) } { + SESSION.write_options().write(writer, array_stream).await + } else { + let mut file = async_fs::File::create(&file_path).await?; + SESSION.write_options().write(&mut file, array_stream).await + } }); let worker_pool = RUNTIME.new_pool(); diff --git a/vortex-duckdb/src/duckdb/copy_function/callback.rs b/vortex-duckdb/src/duckdb/copy_function/callback.rs index 05982003544..783bd9f2a2e 100644 --- a/vortex-duckdb/src/duckdb/copy_function/callback.rs +++ b/vortex-duckdb/src/duckdb/copy_function/callback.rs @@ -15,6 +15,7 @@ use crate::cpp::duckdb_data_chunk; use crate::cpp::duckdb_logical_type; use crate::cpp::duckdb_vx_copy_func_bind_input; use crate::cpp::duckdb_vx_error; +use crate::duckdb::ClientContext; use crate::duckdb::CopyFunction; use crate::duckdb::Data; use crate::duckdb::DataChunk; @@ -52,6 +53,7 @@ pub(crate) unsafe extern "C-unwind" fn bind_callback( } pub(crate) unsafe extern "C-unwind" fn global_callback( + client_context: cpp::duckdb_vx_client_context, bind_data: *const c_void, file_path: *const c_char, error_out: *mut duckdb_vx_error, @@ -62,7 +64,8 @@ pub(crate) unsafe extern "C-unwind" fn global_callback( let bind_data = unsafe { bind_data.cast::().as_ref() } .vortex_expect("global_init_data null pointer"); try_or_null(error_out, || { - let bind_data = T::init_global(bind_data, file_path)?; + let ctx = unsafe { ClientContext::borrow(client_context) }; + let bind_data = T::init_global(ctx, bind_data, file_path)?; Ok(Data::from(Box::new(bind_data)).as_ptr()) }) } diff --git a/vortex-duckdb/src/duckdb/copy_function/mod.rs b/vortex-duckdb/src/duckdb/copy_function/mod.rs index fc8a51d0fd9..0c4a6f903b8 100644 --- a/vortex-duckdb/src/duckdb/copy_function/mod.rs +++ b/vortex-duckdb/src/duckdb/copy_function/mod.rs @@ -11,6 +11,7 @@ use vortex::error::VortexResult; use crate::Connection; use crate::cpp; +use crate::duckdb::ClientContext; use crate::duckdb::DataChunk; use crate::duckdb::LogicalType; use crate::duckdb::copy_function::callback::bind_callback; @@ -49,6 +50,7 @@ pub trait CopyFunction: Sized + Debug { /// The global operator state is used to keep track of the progress in the copy function and /// is shared between all threads working on the copy function. 
fn init_global( + client_context: ClientContext, bind_data: &Self::BindData, file_path: String, ) -> VortexResult; diff --git a/vortex-duckdb/src/duckdb/file_system.rs b/vortex-duckdb/src/duckdb/file_system.rs new file mode 100644 index 00000000000..8e7dfe30212 --- /dev/null +++ b/vortex-duckdb/src/duckdb/file_system.rs @@ -0,0 +1,296 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::ffi::CStr; +use std::ffi::CString; +use std::ptr; +use std::sync::Arc; +use std::sync::OnceLock; + +use futures::FutureExt; +use futures::future::BoxFuture; +use parking_lot::Mutex; +use vortex::buffer::Alignment; +use vortex::buffer::ByteBuffer; +use vortex::buffer::ByteBufferMut; +use vortex::error::VortexError; +use vortex::error::VortexResult; +use vortex::error::vortex_err; +use vortex::io::CoalesceConfig; +use vortex::io::VortexReadAt; +use vortex::io::runtime::BlockingRuntime; +use vortex::io::IoBuf; +use vortex::io::VortexWrite; + +use crate::RUNTIME; +use crate::cpp; +use crate::duckdb::ClientContext; +use crate::lifetime_wrapper; + +const DEFAULT_COALESCE: CoalesceConfig = CoalesceConfig { + distance: 1024 * 1024, // 1 MB + max_size: 8 * 1024 * 1024, // 8 MB +}; + +const DEFAULT_CONCURRENCY: usize = 64; + +lifetime_wrapper!(FsFileHandle, cpp::duckdb_vx_file_handle, cpp::duckdb_vx_fs_close, [owned, ref]); +unsafe impl Send for FsFileHandle {} +unsafe impl Sync for FsFileHandle {} +unsafe impl Send for cpp::duckdb_vx_client_context_ {} +unsafe impl Sync for cpp::duckdb_vx_client_context_ {} + +fn fs_error(err: cpp::duckdb_vx_error) -> VortexError { + if err.is_null() { + return vortex_err!("DuckDB filesystem error (unknown)"); + } + let message = unsafe { CStr::from_ptr(cpp::duckdb_vx_error_value(err)) } + .to_string_lossy() + .to_string(); + unsafe { cpp::duckdb_vx_error_free(err) }; + vortex_err!("{message}") +} + +pub fn duckdb_fs_glob(ctx: &ClientContext, pattern: &str) -> VortexResult> { + let c_pattern = CString::new(pattern).map_err(|e| vortex_err!("Invalid glob pattern: {e}"))?; + let mut err: cpp::duckdb_vx_error = ptr::null_mut(); + let mut list = + unsafe { cpp::duckdb_vx_fs_glob(ctx.as_ptr(), c_pattern.as_ptr(), &raw mut err) }; + if !err.is_null() { + return Err(fs_error(err)); + } + + let mut urls = Vec::with_capacity(list.count); + for idx in 0..list.count { + let entry = unsafe { CStr::from_ptr(*list.entries.add(idx)) }; + let entry_str = entry.to_string_lossy(); + let url = url::Url::parse(&entry_str) + .map_err(|e| vortex_err!("Invalid URL returned by DuckDB glob {entry_str}: {e}"))?; + urls.push(url); + } + + unsafe { cpp::duckdb_vx_string_list_free(&raw mut list) }; + + Ok(urls) +} + +pub unsafe fn duckdb_fs_create_writer( + ctx: cpp::duckdb_vx_client_context, + path: &str, +) -> VortexResult { + unsafe { DuckDbFsWriter::create(ctx, path) } +} + +/// A VortexReadAt implementation backed by DuckDB's filesystem (e.g., httpfs/s3). 
+pub struct DuckDbFsReadAt { + handle: Arc>, + uri: Arc, + size: Arc>, +} + +impl DuckDbFsReadAt { + pub unsafe fn open_url( + ctx: cpp::duckdb_vx_client_context, + url: &url::Url, + ) -> VortexResult { + let c_path = CString::new(url.as_str()).map_err(|e| vortex_err!("Invalid URL: {e}"))?; + let mut err: cpp::duckdb_vx_error = ptr::null_mut(); + let handle = unsafe { cpp::duckdb_vx_fs_open(ctx, c_path.as_ptr(), &raw mut err) }; + if handle.is_null() { + return Err(fs_error(err)); + } + + Ok(Self { + handle: Arc::new(Mutex::new(unsafe { FsFileHandle::own(handle) })), + uri: Arc::from(url.as_str()), + size: Arc::new(OnceLock::new()), + }) + } +} + +impl VortexReadAt for DuckDbFsReadAt { + fn uri(&self) -> Option<&Arc> { + Some(&self.uri) + } + + fn coalesce_config(&self) -> Option { + Some(DEFAULT_COALESCE) + } + + fn concurrency(&self) -> usize { + DEFAULT_CONCURRENCY + } + + fn size(&self) -> BoxFuture<'static, VortexResult> { + let handle = self.handle.clone(); + let size_cell = self.size.clone(); + + async move { + if let Some(size) = size_cell.get() { + return Ok(*size); + } + + let runtime = RUNTIME.handle(); + let size = runtime + .spawn_blocking(move || { + let mut err: cpp::duckdb_vx_error = ptr::null_mut(); + let mut size_out: cpp::idx_t = 0; + let status = unsafe { + cpp::duckdb_vx_fs_get_size( + handle.lock().as_ptr(), + &raw mut size_out, + &raw mut err, + ) + }; + if status != cpp::duckdb_state::DuckDBSuccess { + return Err(fs_error(err)); + } + Ok::<_, VortexError>(size_out as u64) + }) + .await?; + + let _ = size_cell.set(size); + Ok(size) + } + .boxed() + } + + fn read_at( + &self, + offset: u64, + length: usize, + alignment: Alignment, + ) -> BoxFuture<'static, VortexResult> { + let handle = self.handle.clone(); + + async move { + let runtime = RUNTIME.handle(); + runtime + .spawn_blocking(move || { + let mut buffer = ByteBufferMut::with_capacity_aligned(length, alignment); + unsafe { buffer.set_len(length) }; + + let mut err: cpp::duckdb_vx_error = ptr::null_mut(); + let mut out_len: cpp::idx_t = 0; + let status = unsafe { + cpp::duckdb_vx_fs_read( + handle.lock().as_ptr(), + offset as cpp::idx_t, + length as cpp::idx_t, + buffer.as_mut_slice().as_mut_ptr(), + &raw mut out_len, + &raw mut err, + ) + }; + + if status != cpp::duckdb_state::DuckDBSuccess { + return Err(fs_error(err)); + } + + let used = usize::try_from(out_len) + .map_err(|e| vortex_err!("Invalid read len: {e}"))?; + unsafe { buffer.set_len(used) }; + + Ok::<_, VortexError>(buffer.freeze()) + }) + .await + } + .boxed() + } +} + +// SAFETY: Access is serialized via a mutex and DuckDB's file handles are thread-safe for reads. 
+unsafe impl Send for DuckDbFsReadAt {} +unsafe impl Sync for DuckDbFsReadAt {} + +pub struct DuckDbFsWriter { + handle: FsFileHandle, + pos: u64, +} + +impl DuckDbFsWriter { + pub unsafe fn create(ctx: cpp::duckdb_vx_client_context, path: &str) -> VortexResult { + let c_path = CString::new(path).map_err(|e| vortex_err!("Invalid path: {e}"))?; + let mut err: cpp::duckdb_vx_error = ptr::null_mut(); + let handle = unsafe { cpp::duckdb_vx_fs_create(ctx, c_path.as_ptr(), &raw mut err) }; + if handle.is_null() { + return Err(fs_error(err)); + } + + Ok(Self { + handle: unsafe { FsFileHandle::own(handle) }, + pos: 0, + }) + } +} + +impl VortexWrite for DuckDbFsWriter { + async fn write_all(&mut self, buffer: B) -> std::io::Result { + let len = buffer.bytes_init(); + let mut err: cpp::duckdb_vx_error = ptr::null_mut(); + let mut out_len: cpp::idx_t = 0; + + let status = unsafe { + cpp::duckdb_vx_fs_write( + self.handle.as_ptr(), + self.pos as cpp::idx_t, + len as cpp::idx_t, + buffer.read_ptr(), + &raw mut out_len, + &raw mut err, + ) + }; + + if status != cpp::duckdb_state::DuckDBSuccess { + return Err(std::io::Error::other(fs_error(err).to_string())); + } + + self.pos += len as u64; + Ok(buffer) + } + + async fn flush(&mut self) -> std::io::Result<()> { + let mut err: cpp::duckdb_vx_error = ptr::null_mut(); + let status = unsafe { cpp::duckdb_vx_fs_sync(self.handle.as_ptr(), &raw mut err) }; + if status != cpp::duckdb_state::DuckDBSuccess { + return Err(std::io::Error::other(fs_error(err).to_string())); + } + Ok(()) + } + + async fn shutdown(&mut self) -> std::io::Result<()> { + self.flush().await + } +} + +#[cfg(test)] +mod tests { + use std::fs; + use std::path::PathBuf; + + use super::*; + use crate::duckdb::Database; + + #[test] + fn test_writer_roundtrip_local() { + let db = Database::open_in_memory().unwrap(); + let conn = db.connect().unwrap(); + let ctx = conn.client_context().unwrap(); + + let dir = tempfile::tempdir().unwrap(); + let path: PathBuf = dir.path().join("writer_local.vortex"); + let path_str = path.to_string_lossy(); + + let mut writer = unsafe { duckdb_fs_create_writer(ctx.as_ptr(), &path_str) }.unwrap(); + + futures::executor::block_on(async { + VortexWrite::write_all(&mut writer, vec![1_u8, 2, 3]) + .await + .unwrap(); + VortexWrite::flush(&mut writer).await.unwrap(); + }); + + let data = fs::read(path).unwrap(); + assert_eq!(data, vec![1, 2, 3]); + } +} diff --git a/vortex-duckdb/src/duckdb/mod.rs b/vortex-duckdb/src/duckdb/mod.rs index 1960eb71ded..a88610a63e0 100644 --- a/vortex-duckdb/src/duckdb/mod.rs +++ b/vortex-duckdb/src/duckdb/mod.rs @@ -9,6 +9,7 @@ mod data; mod data_chunk; mod database; mod expr; +mod file_system; pub mod footer_cache; mod logical_type; mod macro_; @@ -34,6 +35,7 @@ pub use data::*; pub use data_chunk::*; pub use database::*; pub use expr::*; +pub use file_system::*; pub use logical_type::*; pub use object_cache::*; pub use query_result::*; diff --git a/vortex-duckdb/src/e2e_test/vortex_scan_test.rs b/vortex-duckdb/src/e2e_test/vortex_scan_test.rs index 9812dcd2728..c271985eb47 100644 --- a/vortex-duckdb/src/e2e_test/vortex_scan_test.rs +++ b/vortex-duckdb/src/e2e_test/vortex_scan_test.rs @@ -4,6 +4,8 @@ //! This module contains tests for the `vortex_scan` table function. 
use std::ffi::CStr; +use std::io::Write; +use std::net::TcpListener; use std::path::Path; use std::slice; use std::str::FromStr; @@ -347,6 +349,51 @@ fn test_vortex_scan_multiple_files() { assert_eq!(total_sum, 21); } +#[test] +fn test_vortex_scan_over_http() { + let file = RUNTIME.block_on(async { + let strings = VarBinArray::from(vec!["a", "b", "c"]); + write_single_column_vortex_file("strings", strings).await + }); + + let file_bytes = std::fs::read(file.path()).unwrap(); + let listener = TcpListener::bind("127.0.0.1:0").unwrap(); + let addr = listener.local_addr().unwrap(); + + std::thread::spawn(move || { + for _ in 0..2 { + if let Ok((mut stream, _)) = listener.accept() { + let response = format!( + "HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n", + file_bytes.len() + ); + stream.write_all(response.as_bytes()).unwrap(); + stream.write_all(&file_bytes).unwrap(); + } + } + }); + + let conn = database_connection(); + conn.query("INSTALL httpfs;").unwrap(); + conn.query("LOAD httpfs;").unwrap(); + + let url = format!( + "http://{}/{}", + addr, + file.path().file_name().unwrap().to_string_lossy() + ); + + let result = conn + .query(&format!("SELECT COUNT(*) FROM read_vortex('{url}')")) + .unwrap(); + let chunk = result.into_iter().next().unwrap(); + let count = chunk + .get_vector(0) + .as_slice_with_len::(chunk.len().as_())[0]; + + assert_eq!(count, 3); +} + #[test] fn test_write_file() { let conn = database_connection(); diff --git a/vortex-duckdb/src/scan.rs b/vortex-duckdb/src/scan.rs index f48ce764fc5..786ed15102c 100644 --- a/vortex-duckdb/src/scan.rs +++ b/vortex-duckdb/src/scan.rs @@ -54,12 +54,14 @@ use crate::RUNTIME; use crate::SESSION; use crate::convert::try_from_bound_expression; use crate::convert::try_from_table_filter; +use crate::cpp; use crate::duckdb; use crate::duckdb::BindInput; use crate::duckdb::BindResult; use crate::duckdb::Cardinality; use crate::duckdb::ClientContext; use crate::duckdb::DataChunk; +use crate::duckdb::DuckDbFsReadAt; use crate::duckdb::ExtractedValue; use crate::duckdb::LogicalType; use crate::duckdb::TableFunction; @@ -203,26 +205,50 @@ fn extract_table_filter_expr( Ok(and_collect(table_filter_exprs.into_iter().collect_vec())) } -/// Helper function to open a Vortex file from either a local or S3 URL -async fn open_file(url: Url, options: VortexOpenOptions) -> VortexResult { - if url.scheme() == "s3" { - assert_eq!(url.scheme(), "s3"); - let bucket = url - .host_str() - .ok_or_else(|| vortex_err!("Failed to extract bucket name from URL: {url}"))?; +/// Helper function to open a Vortex file from either a local path or a URL supported by DuckDB's filesystem. 
+#[derive(Clone, Copy)] +struct SendableClientCtx(usize); +unsafe impl Send for SendableClientCtx {} +unsafe impl Sync for SendableClientCtx {} - let path = url - .path() - .strip_prefix("/") - .ok_or_else(|| vortex_err!("Invalid S3 path: {url}"))?; +fn open_duckdb_reader(client_ctx: SendableClientCtx, url: &Url) -> VortexResult { + let ctx_ptr = client_ctx.0 as cpp::duckdb_vx_client_context; + unsafe { DuckDbFsReadAt::open_url(ctx_ptr, url) } +} - options.open_object_store(&s3_store(bucket)?, path).await - } else { - let path = url - .to_file_path() - .map_err(|_| vortex_err!("Invalid file URL: {url}"))?; +async fn open_file( + client_ctx: SendableClientCtx, + url: Url, + options: VortexOpenOptions, +) -> VortexResult { + match url.scheme() { + "http" | "https" | "s3" => { + let reader = open_duckdb_reader(client_ctx, &url); + + // Fallback to the legacy object_store path for s3 if DuckDB fs isn't configured. + if url.scheme() == "s3" && reader.is_err() { + let bucket = url + .host_str() + .ok_or_else(|| vortex_err!("Failed to extract bucket name from URL: {url}"))?; + + let path = url + .path() + .strip_prefix("/") + .ok_or_else(|| vortex_err!("Invalid S3 path: {url}"))?; + + return options.open_object_store(&s3_store(bucket)?, path).await; + } - options.open_path(path).await + let reader = Arc::new(reader?); + options.open(reader).await + } + _ => { + let path = url + .to_file_path() + .map_err(|_| vortex_err!("Invalid file URL: {url}"))?; + + options.open_path(path).await + } } } @@ -277,6 +303,7 @@ impl TableFunction for VortexTableFunction { tracing::trace!("running scan with max_threads {max_threads}"); let (file_urls, _metadata) = RUNTIME.block_on(Compat::new(expand_glob( + ctx, file_glob_string.as_ref().as_string(), )))?; @@ -286,10 +313,11 @@ impl TableFunction for VortexTableFunction { }; let footer_cache = FooterCache::new(ctx.object_cache()); + let client_ctx = SendableClientCtx(ctx.as_ptr() as usize); let entry = footer_cache.entry(first_file_url.as_ref()); let first_file = RUNTIME.block_on(async move { let options = entry.apply_to_file(SESSION.open_options()); - let file = open_file(first_file_url.clone(), options).await?; + let file = open_file(client_ctx, first_file_url.clone(), options).await?; entry.put_if_absent(|| file.footer().clone()); VortexResult::Ok(file) })?; @@ -392,6 +420,7 @@ impl TableFunction for VortexTableFunction { let num_workers = bind_data.max_threads as usize; let client_context = init_input.client_context()?; + let client_ctx_ptr = SendableClientCtx(client_context.as_ptr() as usize); let object_cache = client_context.object_cache(); let handle = RUNTIME.handle(); @@ -415,7 +444,7 @@ impl TableFunction for VortexTableFunction { let cache = FooterCache::new(object_cache); let entry = cache.entry(url.as_ref()); let options = entry.apply_to_file(SESSION.open_options()); - let file = open_file(url.clone(), options).await?; + let file = open_file(client_ctx_ptr, url.clone(), options).await?; entry.put_if_absent(|| file.footer().clone()); VortexResult::Ok(file) }?; diff --git a/vortex-duckdb/src/utils/glob.rs b/vortex-duckdb/src/utils/glob.rs index 0c21c4f3034..13718ea63dd 100644 --- a/vortex-duckdb/src/utils/glob.rs +++ b/vortex-duckdb/src/utils/glob.rs @@ -1,141 +1,48 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors -use futures::StreamExt; +#[cfg(test)] +use std::sync::OnceLock; + use object_store::ObjectMeta; +#[cfg(test)] +use parking_lot::Mutex; use url::Url; use vortex::error::VortexResult; use 
vortex::error::vortex_bail; use vortex::error::vortex_err; -use super::object_store::s3_store; +use crate::duckdb::ClientContext; +use crate::duckdb::duckdb_fs_glob; + +#[cfg(test)] +type DuckdbGlobHook = Box VortexResult> + Send + Sync>; + +#[cfg(test)] +static DUCKDB_GLOB_HOOK: OnceLock>> = OnceLock::new(); + +#[cfg(test)] +static GLOB_TEST_GUARD: OnceLock> = OnceLock::new(); /// Expand a glob pattern into a list of URLs. /// /// Example: s3://bucket/files/*.vortex -> (urls, Some(object_store_metadata)) pub async fn expand_glob>( + client_context: &ClientContext, url_glob: T, ) -> VortexResult<(Vec, Option>)> { let url_str = url_glob.as_ref(); // We prefer using string prefix matching here over extracting a URL scheme // as local files with an absolute path but without the file:// prefix can't // be parsed into a URL. - match &url_str[..url_str.len().min(5)] { - "s3://" => s3::expand_glob(&url_glob).await, - "gs://" => vortex_bail!("GCS glob expansion not yet implemented"), - _ => local_filesystem::expand_glob(url_str), - } -} - -mod s3 { - use object_store::ObjectMeta; - - use super::*; - - /// Expand a glob pattern into a list of S3 URLs. - /// - /// Makes a single request based on the position of the first glob character - /// and filters the results on the client side. In case no glob characters - /// are provided, the last directory in the path is used as the list prefix. - pub(super) async fn expand_glob>( - url_glob: T, - ) -> VortexResult<(Vec, Option>)> { - validate_glob(&url_glob)?; - assert_eq!("s3://", &url_glob.as_ref()[..5]); - let url = Url::parse(url_glob.as_ref())?; - - let bucket = url - .host_str() - .ok_or_else(|| vortex_err!("Failed to extract bucket name from URL: {url}"))?; - - let list_prefix = list_prefix(url_path(&url)?); - let object_store = s3_store(bucket)?; - - // The AWS S3 `ListObjectsV2` API returns multiple objects per HTTP request, up to 1000 by default. - let stream = object_store.list(Some(&object_store::path::Path::from(list_prefix))); - - let glob_pattern = glob::Pattern::new(url_glob.as_ref()) - .map_err(|e| vortex_err!("Invalid glob pattern: {}", e))?; - - let matching_paths = process_object_store_stream(stream, &glob_pattern, bucket).await?; - - let (urls, metadata) = matching_paths.into_iter().unzip(); - Ok((urls, Some(metadata))) - } - - /// Validates that a glob pattern does not contain escaped glob characters. - /// Returns an error if backslash-escaped characters like \*, \?, etc. are found. - pub(super) fn validate_glob>(pattern: T) -> VortexResult<()> { - let pattern_str = pattern.as_ref(); - - // Check for backslash escape sequences. - for escape_pattern in ["\\*", "\\?", "\\["] { - if pattern_str.contains(escape_pattern) { - vortex_bail!( - "Escaped glob characters are not allowed in patterns. Found '{}' in: {}", - escape_pattern, - pattern_str - ); - } + match url_str { + s if s.starts_with("s3://") || s.starts_with("https://") || s.starts_with("http://") => { + duckdb_fs_glob(client_context, url_str).map(|urls| (urls, None)) } - - Ok(()) - } - - /// Returns the path from an S3 URL. - /// - /// Example: "s3://bucket/path/to/file.txt" -> "path/to/file.txt" - pub(super) fn url_path(url: &Url) -> VortexResult<&str> { - url.path() - .strip_prefix("/") - .ok_or_else(|| vortex_err!("Invalid URL: {url}")) - } - - /// Returns the list prefix for a URL path which can contain glob characters. - /// - /// Unlike `aws s3 ls`, the object store crate does not support support - /// incomplete file names as a prefix. 
Therefore, the prefix is the - /// directory path up to the first glob character. - /// - /// Example: "path/to/file_*.txt" -> "path/to/" - pub(super) fn list_prefix>(url_path: T) -> String { - let url_path = url_path.as_ref(); - // Find first glob character index. - let special_char_index = url_path - .find(|c| ['*', '?', '['].contains(&c)) - .unwrap_or(url_path.len()); - - match &url_path[..special_char_index].rsplit_once('/') { - Some((dir_path, _)) => format!("{dir_path}/"), - None => url_path[..special_char_index].to_string(), + s if s.starts_with("gs://") => { + vortex_bail!("GCS glob expansion not yet implemented") } - } - - /// Process an object store stream and filter the results on the client - /// based on the glob pattern. - async fn process_object_store_stream( - stream: impl futures::Stream>, - glob_pattern: &glob::Pattern, - bucket: &str, - ) -> VortexResult> { - let matching_paths: Vec<(Url, ObjectMeta)> = stream - .map(|object_meta| async move { - if let Ok(object_meta) = object_meta { - let url_string = format!("s3://{}/{}", bucket, object_meta.location); - if glob_pattern.matches(&url_string) - && let Ok(parsed_url) = Url::parse(&url_string) - { - return Some((parsed_url, object_meta)); - } - } - None - }) - .buffer_unordered(16) - .filter_map(|result| async { result }) - .collect() - .await; - - Ok(matching_paths) + _ => local_filesystem::expand_glob(url_str), } } @@ -177,6 +84,18 @@ mod tests { use tempfile::TempDir; use super::*; + use crate::duckdb::Database; + + fn ctx() -> ClientContext { + let db = Database::open_in_memory().unwrap(); + db.connect().unwrap().client_context().unwrap() + } + + fn set_glob_hook(hook: Option) { + let cell = DUCKDB_GLOB_HOOK.get_or_init(|| Mutex::new(None)); + let mut guard = cell.lock(); + *guard = hook; + } #[test] fn test_expand_local_disk_glob_relative_path() { @@ -198,6 +117,36 @@ mod tests { env::set_current_dir(&original_dir).unwrap(); } + #[test] + fn test_duckdb_glob_http_hook() { + let _guard = GLOB_TEST_GUARD.get_or_init(|| Mutex::new(())).lock(); + let url = Url::parse("http://example.com/data.vortex").unwrap(); + let expected = url.clone(); + set_glob_hook(Some(Box::new(move |_, _| Ok(vec![url.clone()])))); + + let (urls, meta) = + futures::executor::block_on(expand_glob(&ctx(), "http://example.com/data.vortex")) + .unwrap(); + assert_eq!(meta, None); + assert_eq!(urls.len(), 1); + assert_eq!(urls[0], expected); + + set_glob_hook(None); + } + + #[test] + fn test_duckdb_glob_http_empty() { + let _guard = GLOB_TEST_GUARD.get_or_init(|| Mutex::new(())).lock(); + set_glob_hook(Some(Box::new(|_, _| Ok(vec![])))); + + let (urls, _) = + futures::executor::block_on(expand_glob(&ctx(), "http://example.com/none.vortex")) + .unwrap(); + assert!(urls.is_empty()); + + set_glob_hook(None); + } + #[test] fn test_expand_local_disk_glob_single_file() { let temp_dir = TempDir::new().unwrap(); @@ -268,129 +217,4 @@ mod tests { assert_eq!(result.0.len(), 2); } - - #[test] - fn test_extract_s3_url_path() { - // Test valid S3 URL - let url = Url::parse("s3://bucket/path/to/file.txt").unwrap(); - let result = s3::url_path(&url).unwrap(); - assert_eq!(result, "path/to/file.txt"); - - // Test URL with nested path - let url = Url::parse("s3://my-bucket/folder/subfolder/data.parquet").unwrap(); - let result = s3::url_path(&url).unwrap(); - assert_eq!(result, "folder/subfolder/data.parquet"); - - // Test URL with root path - let url = Url::parse("s3://bucket/file.txt").unwrap(); - let result = s3::url_path(&url).unwrap(); - assert_eq!(result, 
"file.txt"); - - // Test URL without leading slash should fail - let url = Url::parse("s3://bucket").unwrap(); - let result = s3::url_path(&url); - assert!(result.is_err()); - } - - #[test] - fn test_calculate_list_prefix() { - // Test with wildcard in filename - let result = s3::list_prefix("folder/file*.txt"); - assert_eq!(result, "folder/"); - - // Test with wildcard in directory - let result = s3::list_prefix("folder/*/file.txt"); - assert_eq!(result, "folder/"); - - // Test with nested directories and wildcard - let result = s3::list_prefix("data/2023/*/logs/*.log"); - assert_eq!(result, "data/2023/"); - - // Test with wildcard at root level - let result = s3::list_prefix("*.txt"); - assert_eq!(result, ""); - - // Test with no wildcards - let result = s3::list_prefix("folder/subfolder/file.txt"); - assert_eq!(result, "folder/subfolder/"); - - // Test with question mark wildcard - let result = s3::list_prefix("folder/file?.txt"); - assert_eq!(result, "folder/"); - - // Test with bracket wildcards - let result = s3::list_prefix("folder/file[abc].txt"); - assert_eq!(result, "folder/"); - - // Test empty path - let result = s3::list_prefix(""); - assert_eq!(result, ""); - } - - #[test] - fn test_s3_url_parsing_integration() { - // Test complete S3 URL parsing workflow - let url = Url::parse("s3://my-bucket/data/year=2023/month=*/day=*/events.parquet").unwrap(); - - let url_path = s3::url_path(&url).unwrap(); - assert_eq!(url_path, "data/year=2023/month=*/day=*/events.parquet"); - - let list_prefix = s3::list_prefix(url_path); - assert_eq!(list_prefix, "data/year=2023/"); - } - - #[test] - fn test_s3_url_parsing_edge_cases() { - // Test URL with multiple consecutive wildcards - let url = Url::parse("s3://bucket/logs/**/*.log").unwrap(); - let url_path = s3::url_path(&url).unwrap(); - let list_prefix = s3::list_prefix(url_path); - assert_eq!(list_prefix, "logs/"); - - // Test URL with wildcard at the beginning - let url = Url::parse("s3://bucket/*.txt").unwrap(); - let url_path = s3::url_path(&url).unwrap(); - let list_prefix = s3::list_prefix(url_path); - assert_eq!(list_prefix, ""); - - // Test deeply nested path with wildcard - let url = Url::parse("s3://bucket/a/b/c/d/e/f/g/*.json").unwrap(); - let url_path = s3::url_path(&url).unwrap(); - let list_prefix = s3::list_prefix(url_path); - assert_eq!(list_prefix, "a/b/c/d/e/f/g/"); - } - - #[test] - fn test_s3_url_parsing_no_wildcards() { - let url = Url::parse("s3://bucket/path/to/specific/file.txt").unwrap(); - let url_path = s3::url_path(&url).unwrap(); - let list_prefix = s3::list_prefix(url_path); - assert_eq!(list_prefix, "path/to/specific/"); - } - - #[test] - fn test_validate_glob_valid_pattern() { - assert!(s3::validate_glob("s3://bucket/path/*.txt").is_ok()); - } - - #[test] - fn test_validate_glob_escaped_asterisk() { - let result = s3::validate_glob("s3://bucket/path\\*.txt"); - assert!(result.is_err()); - assert!(result.unwrap_err().to_string().contains("\\*")); - } - - #[test] - fn test_validate_glob_escaped_question_mark() { - let result = s3::validate_glob("s3://bucket/path\\?.txt"); - assert!(result.is_err()); - assert!(result.unwrap_err().to_string().contains("\\?")); - } - - #[test] - fn test_validate_glob_escaped_bracket() { - let result = s3::validate_glob("s3://bucket/path\\[test].txt"); - assert!(result.is_err()); - assert!(result.unwrap_err().to_string().contains("\\[")); - } } From 4fffe00d03cd0cf7edcba971d5bd475adfb3af87 Mon Sep 17 00:00:00 2001 From: Ruoshi Date: Wed, 28 Jan 2026 21:42:41 -0800 Subject: [PATCH 
2/7] refactor(tests): update context setup for DuckDB glob tests --- vortex-duckdb/src/utils/glob.rs | 29 +++++++++++------------------ 1 file changed, 11 insertions(+), 18 deletions(-) diff --git a/vortex-duckdb/src/utils/glob.rs b/vortex-duckdb/src/utils/glob.rs index 13718ea63dd..c799c2afff3 100644 --- a/vortex-duckdb/src/utils/glob.rs +++ b/vortex-duckdb/src/utils/glob.rs @@ -86,9 +86,11 @@ mod tests { use super::*; use crate::duckdb::Database; - fn ctx() -> ClientContext { + fn ctx_bundle() -> (Database, crate::duckdb::Connection, ClientContext) { let db = Database::open_in_memory().unwrap(); - db.connect().unwrap().client_context().unwrap() + let conn = db.connect().unwrap(); + let ctx = conn.client_context().unwrap(); + (db, conn, ctx) } fn set_glob_hook(hook: Option) { @@ -124,9 +126,13 @@ mod tests { let expected = url.clone(); set_glob_hook(Some(Box::new(move |_, _| Ok(vec![url.clone()])))); - let (urls, meta) = - futures::executor::block_on(expand_glob(&ctx(), "http://example.com/data.vortex")) - .unwrap(); + let (_db, _conn, ctx) = ctx_bundle(); + + let (urls, meta) = futures::executor::block_on(expand_glob( + &ctx, + "http://example.com/data.vortex", + )) + .unwrap(); assert_eq!(meta, None); assert_eq!(urls.len(), 1); assert_eq!(urls[0], expected); @@ -134,19 +140,6 @@ mod tests { set_glob_hook(None); } - #[test] - fn test_duckdb_glob_http_empty() { - let _guard = GLOB_TEST_GUARD.get_or_init(|| Mutex::new(())).lock(); - set_glob_hook(Some(Box::new(|_, _| Ok(vec![])))); - - let (urls, _) = - futures::executor::block_on(expand_glob(&ctx(), "http://example.com/none.vortex")) - .unwrap(); - assert!(urls.is_empty()); - - set_glob_hook(None); - } - #[test] fn test_expand_local_disk_glob_single_file() { let temp_dir = TempDir::new().unwrap(); From a131078864195a6e5ea4244334acc004a0dab2c7 Mon Sep 17 00:00:00 2001 From: Ruoshi Date: Sat, 31 Jan 2026 13:56:58 -0800 Subject: [PATCH 3/7] refactor: address PR comments. 
--- .gitignore | 3 +- vortex-duckdb/cpp/file_system.cpp | 49 ++++++++++++++----------- vortex-duckdb/src/copy.rs | 28 +++++++++----- vortex-duckdb/src/duckdb/file_system.rs | 37 ++++++++++--------- vortex-duckdb/src/scan.rs | 34 ++++++----------- vortex-duckdb/src/utils/glob.rs | 8 ++-- vortex-duckdb/src/utils/mod.rs | 1 - vortex-duckdb/src/utils/object_store.rs | 37 ------------------- 8 files changed, 80 insertions(+), 117 deletions(-) delete mode 100644 vortex-duckdb/src/utils/object_store.rs diff --git a/.gitignore b/.gitignore index a68417f916f..63fd9846d40 100644 --- a/.gitignore +++ b/.gitignore @@ -218,8 +218,9 @@ profile.json.gz # Clang compilation database (https://clang.llvm.org/docs/JSONCompilationDatabase.html) compile_commands.json -# Claude +# AI Agents .claude +.opencode # cargo sweep output sweep.timestamp diff --git a/vortex-duckdb/cpp/file_system.cpp b/vortex-duckdb/cpp/file_system.cpp index d515fa9b280..ba98567b754 100644 --- a/vortex-duckdb/cpp/file_system.cpp +++ b/vortex-duckdb/cpp/file_system.cpp @@ -5,14 +5,16 @@ #include #include +#include #include -#include #include +#include +#include +#include using namespace duckdb; -namespace { struct FileHandleWrapper { explicit FileHandleWrapper(unique_ptr handle_p) : handle(std::move(handle_p)) { } @@ -20,28 +22,31 @@ struct FileHandleWrapper { unique_ptr handle; }; -void SetError(duckdb_vx_error *error_out, const std::string &message) { +static void SetError(duckdb_vx_error *error_out, const std::string &message) { if (!error_out) { return; } *error_out = duckdb_vx_error_create(message.data(), message.size()); } -duckdb_state HandleException(duckdb_vx_error *error_out) { +static duckdb_state HandleException(std::exception_ptr ex, duckdb_vx_error *error_out) { + if (!ex) { + SetError(error_out, "Unknown error"); + return DuckDBError; + } + try { - throw; - } catch (const Exception &ex) { - SetError(error_out, ex.what()); - } catch (const std::exception &ex) { - SetError(error_out, ex.what()); + std::rethrow_exception(ex); + } catch (const Exception &caught) { + SetError(error_out, caught.what()); + } catch (const std::exception &caught) { + SetError(error_out, caught.what()); } catch (...) { SetError(error_out, "Unknown error"); } return DuckDBError; } -} // namespace - extern "C" duckdb_vx_file_handle duckdb_vx_fs_open(duckdb_vx_client_context ctx, const char *path, duckdb_vx_error *error_out) { if (!ctx || !path) { @@ -55,7 +60,7 @@ extern "C" duckdb_vx_file_handle duckdb_vx_fs_open(duckdb_vx_client_context ctx, auto handle = fs.OpenFile(path, FileFlags::FILE_FLAGS_READ); return reinterpret_cast(new FileHandleWrapper(std::move(handle))); } catch (...) { - HandleException(error_out); + HandleException(std::current_exception(), error_out); return nullptr; } } @@ -74,7 +79,7 @@ extern "C" duckdb_vx_file_handle duckdb_vx_fs_create(duckdb_vx_client_context ct handle->Truncate(0); return reinterpret_cast(new FileHandleWrapper(std::move(handle))); } catch (...) { - HandleException(error_out); + HandleException(std::current_exception(), error_out); return nullptr; } } @@ -100,7 +105,7 @@ extern "C" duckdb_state duckdb_vx_fs_get_size(duckdb_vx_file_handle handle, idx_ *size_out = wrapper->handle->GetFileSize(); return DuckDBSuccess; } catch (...) { - return HandleException(error_out); + return HandleException(std::current_exception(), error_out); } } @@ -117,7 +122,7 @@ extern "C" duckdb_state duckdb_vx_fs_read(duckdb_vx_file_handle handle, idx_t of *out_len = len; return DuckDBSuccess; } catch (...) 
{ - return HandleException(error_out); + return HandleException(std::current_exception(), error_out); } } @@ -136,7 +141,7 @@ extern "C" duckdb_state duckdb_vx_fs_write(duckdb_vx_file_handle handle, idx_t o *out_len = len; return DuckDBSuccess; } catch (...) { - return HandleException(error_out); + return HandleException(std::current_exception(), error_out); } } @@ -159,10 +164,10 @@ extern "C" duckdb_vx_string_list duckdb_vx_fs_glob(duckdb_vx_client_context ctx, } result.count = matches.size(); - result.entries = static_cast(malloc(sizeof(char *) * matches.size())); + result.entries = static_cast(duckdb_malloc(sizeof(char *) * matches.size())); for (size_t i = 0; i < matches.size(); i++) { const auto &entry = matches[i].path; - auto *owned = static_cast(malloc(entry.size() + 1)); + auto *owned = static_cast(duckdb_malloc(entry.size() + 1)); std::memcpy(owned, entry.data(), entry.size()); owned[entry.size()] = '\0'; result.entries[i] = owned; @@ -170,7 +175,7 @@ extern "C" duckdb_vx_string_list duckdb_vx_fs_glob(duckdb_vx_client_context ctx, return result; } catch (...) { - HandleException(error_out); + HandleException(std::current_exception(), error_out); return result; } } @@ -180,9 +185,9 @@ extern "C" void duckdb_vx_string_list_free(duckdb_vx_string_list *list) { return; } for (size_t i = 0; i < list->count; i++) { - free(const_cast(list->entries[i])); + duckdb_free(const_cast(list->entries[i])); } - free(list->entries); + duckdb_free(list->entries); list->entries = nullptr; list->count = 0; } @@ -198,6 +203,6 @@ extern "C" duckdb_state duckdb_vx_fs_sync(duckdb_vx_file_handle handle, duckdb_v wrapper->handle->Sync(); return DuckDBSuccess; } catch (...) { - return HandleException(error_out); + return HandleException(std::current_exception(), error_out); } } diff --git a/vortex-duckdb/src/copy.rs b/vortex-duckdb/src/copy.rs index 56f32fc41e2..725c8a75132 100644 --- a/vortex-duckdb/src/copy.rs +++ b/vortex-duckdb/src/copy.rs @@ -3,6 +3,7 @@ use std::fmt::Debug; use std::iter; +use std::ptr::NonNull; use futures::SinkExt; use futures::TryStreamExt; @@ -40,9 +41,15 @@ use crate::duckdb::duckdb_fs_create_writer; pub struct VortexCopyFunction; #[derive(Clone, Copy)] -struct SendableClientCtx(usize); +struct SendableClientCtx(NonNull); unsafe impl Send for SendableClientCtx {} +impl SendableClientCtx { + fn as_ptr(self) -> cpp::duckdb_vx_client_context { + self.0.as_ptr() + } +} + pub struct BindData { dtype: DType, fields: StructFields, @@ -134,16 +141,17 @@ impl CopyFunction for VortexCopyFunction { let array_stream = ArrayStreamAdapter::new(bind_data.dtype.clone(), rx.into_stream()); let handle = SESSION.handle(); - let ctx_ptr = SendableClientCtx(client_context.as_ptr() as usize); + let ctx_ptr = SendableClientCtx( + NonNull::new(client_context.as_ptr()) + .vortex_expect("Client context pointer should not be null"), + ); let write_task = handle.spawn(async move { - // Prefer DuckDB FS (httpfs/s3/etc.), fallback to local async fs if unavailable. - let ctx_raw = ctx_ptr.0 as cpp::duckdb_vx_client_context; - if let Ok(writer) = unsafe { duckdb_fs_create_writer(ctx_raw, &file_path) } { - SESSION.write_options().write(writer, array_stream).await - } else { - let mut file = async_fs::File::create(&file_path).await?; - SESSION.write_options().write(&mut file, array_stream).await - } + // Use DuckDB FS exclusively to match the DuckDB client context configuration. 
+ let writer = + unsafe { duckdb_fs_create_writer(ctx_ptr.as_ptr(), &file_path) }.map_err(|e| { + vortex_err!("Failed to create DuckDB FS writer for {file_path}: {e}") + })?; + SESSION.write_options().write(writer, array_stream).await }); let worker_pool = RUNTIME.new_pool(); diff --git a/vortex-duckdb/src/duckdb/file_system.rs b/vortex-duckdb/src/duckdb/file_system.rs index 8e7dfe30212..8f70b627d07 100644 --- a/vortex-duckdb/src/duckdb/file_system.rs +++ b/vortex-duckdb/src/duckdb/file_system.rs @@ -9,18 +9,17 @@ use std::sync::OnceLock; use futures::FutureExt; use futures::future::BoxFuture; -use parking_lot::Mutex; +use vortex::array::buffer::BufferHandle; use vortex::buffer::Alignment; -use vortex::buffer::ByteBuffer; use vortex::buffer::ByteBufferMut; use vortex::error::VortexError; use vortex::error::VortexResult; use vortex::error::vortex_err; use vortex::io::CoalesceConfig; -use vortex::io::VortexReadAt; -use vortex::io::runtime::BlockingRuntime; use vortex::io::IoBuf; +use vortex::io::VortexReadAt; use vortex::io::VortexWrite; +use vortex::io::runtime::BlockingRuntime; use crate::RUNTIME; use crate::cpp; @@ -32,6 +31,10 @@ const DEFAULT_COALESCE: CoalesceConfig = CoalesceConfig { max_size: 8 * 1024 * 1024, // 8 MB }; +// Local cap to keep remote reads parallel without overwhelming typical per-host connection limits. +// The DuckDB httpfs extension does not expose a fixed default concurrency in the vendored sources; +// 64 is a conservative ceiling that stays well below common cloud limits while keeping range reads +// busy. const DEFAULT_CONCURRENCY: usize = 64; lifetime_wrapper!(FsFileHandle, cpp::duckdb_vx_file_handle, cpp::duckdb_vx_fs_close, [owned, ref]); @@ -83,7 +86,7 @@ pub unsafe fn duckdb_fs_create_writer( /// A VortexReadAt implementation backed by DuckDB's filesystem (e.g., httpfs/s3). 
pub struct DuckDbFsReadAt { - handle: Arc>, + handle: Arc, uri: Arc, size: Arc>, } @@ -101,7 +104,7 @@ impl DuckDbFsReadAt { } Ok(Self { - handle: Arc::new(Mutex::new(unsafe { FsFileHandle::own(handle) })), + handle: Arc::new(unsafe { FsFileHandle::own(handle) }), uri: Arc::from(url.as_str()), size: Arc::new(OnceLock::new()), }) @@ -136,11 +139,7 @@ impl VortexReadAt for DuckDbFsReadAt { let mut err: cpp::duckdb_vx_error = ptr::null_mut(); let mut size_out: cpp::idx_t = 0; let status = unsafe { - cpp::duckdb_vx_fs_get_size( - handle.lock().as_ptr(), - &raw mut size_out, - &raw mut err, - ) + cpp::duckdb_vx_fs_get_size(handle.as_ptr(), &raw mut size_out, &raw mut err) }; if status != cpp::duckdb_state::DuckDBSuccess { return Err(fs_error(err)); @@ -160,13 +159,13 @@ impl VortexReadAt for DuckDbFsReadAt { offset: u64, length: usize, alignment: Alignment, - ) -> BoxFuture<'static, VortexResult> { + ) -> BoxFuture<'static, VortexResult> { let handle = self.handle.clone(); async move { let runtime = RUNTIME.handle(); - runtime - .spawn_blocking(move || { + let result: VortexResult = runtime + .spawn_blocking(move || -> VortexResult { let mut buffer = ByteBufferMut::with_capacity_aligned(length, alignment); unsafe { buffer.set_len(length) }; @@ -174,7 +173,7 @@ impl VortexReadAt for DuckDbFsReadAt { let mut out_len: cpp::idx_t = 0; let status = unsafe { cpp::duckdb_vx_fs_read( - handle.lock().as_ptr(), + handle.as_ptr(), offset as cpp::idx_t, length as cpp::idx_t, buffer.as_mut_slice().as_mut_ptr(), @@ -191,15 +190,17 @@ impl VortexReadAt for DuckDbFsReadAt { .map_err(|e| vortex_err!("Invalid read len: {e}"))?; unsafe { buffer.set_len(used) }; - Ok::<_, VortexError>(buffer.freeze()) + let frozen = buffer.freeze(); + Ok::<_, VortexError>(BufferHandle::new_host(frozen)) }) - .await + .await; + result } .boxed() } } -// SAFETY: Access is serialized via a mutex and DuckDB's file handles are thread-safe for reads. +// SAFETY: DuckDB file handles are thread-safe for reads; writes are done via DuckDbFsWriter. unsafe impl Send for DuckDbFsReadAt {} unsafe impl Sync for DuckDbFsReadAt {} diff --git a/vortex-duckdb/src/scan.rs b/vortex-duckdb/src/scan.rs index 786ed15102c..c1d179036b3 100644 --- a/vortex-duckdb/src/scan.rs +++ b/vortex-duckdb/src/scan.rs @@ -7,6 +7,7 @@ use std::fmt; use std::fmt::Debug; use std::fmt::Formatter; use std::pin::Pin; +use std::ptr::NonNull; use std::sync::Arc; use std::sync::atomic::AtomicU64; use std::sync::atomic::Ordering; @@ -71,7 +72,6 @@ use crate::duckdb::footer_cache::FooterCache; use crate::exporter::ArrayExporter; use crate::exporter::ConversionCache; use crate::utils::glob::expand_glob; -use crate::utils::object_store::s3_store; pub struct VortexBindData { first_file: VortexFile, @@ -207,13 +207,12 @@ fn extract_table_filter_expr( /// Helper function to open a Vortex file from either a local path or a URL supported by DuckDB's filesystem. 
#[derive(Clone, Copy)] -struct SendableClientCtx(usize); +struct SendableClientCtx(NonNull); unsafe impl Send for SendableClientCtx {} unsafe impl Sync for SendableClientCtx {} fn open_duckdb_reader(client_ctx: SendableClientCtx, url: &Url) -> VortexResult { - let ctx_ptr = client_ctx.0 as cpp::duckdb_vx_client_context; - unsafe { DuckDbFsReadAt::open_url(ctx_ptr, url) } + unsafe { DuckDbFsReadAt::open_url(client_ctx.0.as_ptr(), url) } } async fn open_file( @@ -223,23 +222,7 @@ async fn open_file( ) -> VortexResult { match url.scheme() { "http" | "https" | "s3" => { - let reader = open_duckdb_reader(client_ctx, &url); - - // Fallback to the legacy object_store path for s3 if DuckDB fs isn't configured. - if url.scheme() == "s3" && reader.is_err() { - let bucket = url - .host_str() - .ok_or_else(|| vortex_err!("Failed to extract bucket name from URL: {url}"))?; - - let path = url - .path() - .strip_prefix("/") - .ok_or_else(|| vortex_err!("Invalid S3 path: {url}"))?; - - return options.open_object_store(&s3_store(bucket)?, path).await; - } - - let reader = Arc::new(reader?); + let reader = Arc::new(open_duckdb_reader(client_ctx, &url)?); options.open(reader).await } _ => { @@ -313,7 +296,9 @@ impl TableFunction for VortexTableFunction { }; let footer_cache = FooterCache::new(ctx.object_cache()); - let client_ctx = SendableClientCtx(ctx.as_ptr() as usize); + let client_ctx = SendableClientCtx( + NonNull::new(ctx.as_ptr()).vortex_expect("Client context pointer should not be null"), + ); let entry = footer_cache.entry(first_file_url.as_ref()); let first_file = RUNTIME.block_on(async move { let options = entry.apply_to_file(SESSION.open_options()); @@ -420,7 +405,10 @@ impl TableFunction for VortexTableFunction { let num_workers = bind_data.max_threads as usize; let client_context = init_input.client_context()?; - let client_ctx_ptr = SendableClientCtx(client_context.as_ptr() as usize); + let client_ctx_ptr = SendableClientCtx( + NonNull::new(client_context.as_ptr()) + .vortex_expect("Client context pointer should not be null"), + ); let object_cache = client_context.object_cache(); let handle = RUNTIME.handle(); diff --git a/vortex-duckdb/src/utils/glob.rs b/vortex-duckdb/src/utils/glob.rs index c799c2afff3..01db505b03b 100644 --- a/vortex-duckdb/src/utils/glob.rs +++ b/vortex-duckdb/src/utils/glob.rs @@ -128,11 +128,9 @@ mod tests { let (_db, _conn, ctx) = ctx_bundle(); - let (urls, meta) = futures::executor::block_on(expand_glob( - &ctx, - "http://example.com/data.vortex", - )) - .unwrap(); + let (urls, meta) = + futures::executor::block_on(expand_glob(&ctx, "http://example.com/data.vortex")) + .unwrap(); assert_eq!(meta, None); assert_eq!(urls.len(), 1); assert_eq!(urls[0], expected); diff --git a/vortex-duckdb/src/utils/mod.rs b/vortex-duckdb/src/utils/mod.rs index ecb071cbc82..b5090ad8fd4 100644 --- a/vortex-duckdb/src/utils/mod.rs +++ b/vortex-duckdb/src/utils/mod.rs @@ -2,4 +2,3 @@ // SPDX-FileCopyrightText: Copyright the Vortex contributors pub mod glob; -pub mod object_store; diff --git a/vortex-duckdb/src/utils/object_store.rs b/vortex-duckdb/src/utils/object_store.rs deleted file mode 100644 index d9be329177f..00000000000 --- a/vortex-duckdb/src/utils/object_store.rs +++ /dev/null @@ -1,37 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 -// SPDX-FileCopyrightText: Copyright the Vortex contributors - -use std::sync::Arc; -use std::sync::OnceLock; - -use object_store::ObjectStore; -use object_store::aws::AmazonS3Builder; -use vortex::error::VortexResult; -use 
vortex::error::vortex_err; -use vortex_utils::aliases::dash_map::DashMap; - -// Global S3 object store cache. -pub fn s3_store(bucket: &str) -> VortexResult> { - static S3_STORES: OnceLock>> = OnceLock::new(); - let stores = S3_STORES.get_or_init(|| DashMap::with_hasher(Default::default())); - - fn create_s3_object_store(bucket: &str) -> VortexResult> { - Ok(Arc::new( - AmazonS3Builder::from_env() - .with_bucket_name(bucket) - .build() - .map_err(|e| vortex_err!("Failed to create S3 store: {}", e))?, - ) as Arc) - } - - let object_store = match stores.get(bucket) { - Some(store) => store.clone(), - None => { - let store = create_s3_object_store(bucket)?; - stores.insert(bucket.to_string(), store.clone()); - store - } - }; - - Ok(object_store) -} From 4a07f61144999bd87bc1b9618e14ac72482cd185 Mon Sep 17 00:00:00 2001 From: Ruoshi Date: Wed, 4 Feb 2026 10:12:28 -0800 Subject: [PATCH 4/7] refactor: remove unused dependencies and improve error handling in DuckDB integration --- Cargo.lock | 2 - vortex-duckdb/Cargo.toml | 2 - vortex-duckdb/cpp/error.cpp | 32 ++- vortex-duckdb/cpp/file_system.cpp | 35 +-- vortex-duckdb/cpp/include/duckdb_vx/error.hpp | 5 + vortex-duckdb/src/copy.rs | 17 +- vortex-duckdb/src/duckdb/client_context.rs | 22 ++ vortex-duckdb/src/duckdb/file_system.rs | 24 +- vortex-duckdb/src/lib.rs | 1 - vortex-duckdb/src/scan.rs | 78 ++++--- vortex-duckdb/src/utils/glob.rs | 211 ------------------ vortex-duckdb/src/utils/mod.rs | 4 - 12 files changed, 125 insertions(+), 308 deletions(-) delete mode 100644 vortex-duckdb/src/utils/glob.rs delete mode 100644 vortex-duckdb/src/utils/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 1d3854d23ef..c16fdcb7e27 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10582,7 +10582,6 @@ name = "vortex-duckdb" version = "0.1.0" dependencies = [ "anyhow", - "async-compat", "async-fs", "bindgen", "bitvec", @@ -10593,7 +10592,6 @@ dependencies = [ "itertools 0.14.0", "jiff", "num-traits", - "object_store", "once_cell", "parking_lot", "paste", diff --git a/vortex-duckdb/Cargo.toml b/vortex-duckdb/Cargo.toml index bcd3d84a365..9dfb1edeba5 100644 --- a/vortex-duckdb/Cargo.toml +++ b/vortex-duckdb/Cargo.toml @@ -25,14 +25,12 @@ crate-type = ["staticlib", "cdylib", "rlib"] [dependencies] anyhow = { workspace = true } -async-compat = { workspace = true } async-fs = { workspace = true } bitvec = { workspace = true } futures = { workspace = true } glob = { workspace = true } itertools = { workspace = true } num-traits = { workspace = true } -object_store = { workspace = true, features = ["aws"] } parking_lot = { workspace = true } paste = { workspace = true } tempfile = { workspace = true } diff --git a/vortex-duckdb/cpp/error.cpp b/vortex-duckdb/cpp/error.cpp index 99561cc10f6..112d721b336 100644 --- a/vortex-duckdb/cpp/error.cpp +++ b/vortex-duckdb/cpp/error.cpp @@ -1,6 +1,10 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors +#include +#include + +#include "duckdb/common/exception.hpp" #include "duckdb/common/types/vector_buffer.hpp" #include "duckdb/common/types/vector.hpp" @@ -24,9 +28,35 @@ namespace vortex { std::string IntoErrString(duckdb_vx_error error) { if (!error) { - return nullptr; + return {}; } return *reinterpret_cast(error); } +void SetError(duckdb_vx_error *error_out, const std::string &message) { + assert(error_out && "SetError called with null error_out"); + if (!error_out) { + assert(error_out && "SetError called with null error_out"); + } + *error_out = 
duckdb_vx_error_create(message.data(), message.size()); +} + +duckdb_state HandleException(std::exception_ptr ex, duckdb_vx_error *error_out) { + if (!ex) { + SetError(error_out, "Unknown error"); + return DuckDBError; + } + + try { + std::rethrow_exception(ex); + } catch (const duckdb::Exception &caught) { + SetError(error_out, caught.what()); + } catch (const std::exception &caught) { + SetError(error_out, caught.what()); + } catch (...) { + SetError(error_out, "Unknown error"); + } + return DuckDBError; +} + } // namespace vortex diff --git a/vortex-duckdb/cpp/file_system.cpp b/vortex-duckdb/cpp/file_system.cpp index ba98567b754..28ba173adbb 100644 --- a/vortex-duckdb/cpp/file_system.cpp +++ b/vortex-duckdb/cpp/file_system.cpp @@ -2,6 +2,7 @@ // SPDX-FileCopyrightText: Copyright the Vortex contributors #include "duckdb_vx.h" +#include "duckdb_vx/error.hpp" #include #include @@ -22,30 +23,8 @@ struct FileHandleWrapper { unique_ptr handle; }; -static void SetError(duckdb_vx_error *error_out, const std::string &message) { - if (!error_out) { - return; - } - *error_out = duckdb_vx_error_create(message.data(), message.size()); -} - -static duckdb_state HandleException(std::exception_ptr ex, duckdb_vx_error *error_out) { - if (!ex) { - SetError(error_out, "Unknown error"); - return DuckDBError; - } - - try { - std::rethrow_exception(ex); - } catch (const Exception &caught) { - SetError(error_out, caught.what()); - } catch (const std::exception &caught) { - SetError(error_out, caught.what()); - } catch (...) { - SetError(error_out, "Unknown error"); - } - return DuckDBError; -} +using vortex::HandleException; +using vortex::SetError; extern "C" duckdb_vx_file_handle duckdb_vx_fs_open(duckdb_vx_client_context ctx, const char *path, duckdb_vx_error *error_out) { @@ -57,7 +36,7 @@ extern "C" duckdb_vx_file_handle duckdb_vx_fs_open(duckdb_vx_client_context ctx, try { auto *client_context = reinterpret_cast(ctx); auto &fs = FileSystem::GetFileSystem(*client_context); - auto handle = fs.OpenFile(path, FileFlags::FILE_FLAGS_READ); + auto handle = fs.OpenFile(path, FileFlags::FILE_FLAGS_READ | FileFlags::FILE_FLAGS_PARALLEL_ACCESS); return reinterpret_cast(new FileHandleWrapper(std::move(handle))); } catch (...) { HandleException(std::current_exception(), error_out); @@ -75,7 +54,8 @@ extern "C" duckdb_vx_file_handle duckdb_vx_fs_create(duckdb_vx_client_context ct try { auto *client_context = reinterpret_cast(ctx); auto &fs = FileSystem::GetFileSystem(*client_context); - auto handle = fs.OpenFile(path, FileFlags::FILE_FLAGS_WRITE | FileFlags::FILE_FLAGS_FILE_CREATE); + auto handle = fs.OpenFile(path, FileFlags::FILE_FLAGS_WRITE | FileFlags::FILE_FLAGS_FILE_CREATE | + FileFlags::FILE_FLAGS_PARALLEL_ACCESS); handle->Truncate(0); return reinterpret_cast(new FileHandleWrapper(std::move(handle))); } catch (...) { @@ -136,8 +116,7 @@ extern "C" duckdb_state duckdb_vx_fs_write(duckdb_vx_file_handle handle, idx_t o try { auto *wrapper = reinterpret_cast(handle); - wrapper->handle->Seek(offset); - wrapper->handle->Write(const_cast(buffer), len); + wrapper->handle->Write(QueryContext(), const_cast(buffer), len, offset); *out_len = len; return DuckDBSuccess; } catch (...) 
{ diff --git a/vortex-duckdb/cpp/include/duckdb_vx/error.hpp b/vortex-duckdb/cpp/include/duckdb_vx/error.hpp index 0e709d59a80..15f2aaa0798 100644 --- a/vortex-duckdb/cpp/include/duckdb_vx/error.hpp +++ b/vortex-duckdb/cpp/include/duckdb_vx/error.hpp @@ -3,9 +3,14 @@ #pragma once +#include #include + +#include "duckdb.h" #include "duckdb_vx/error.h" namespace vortex { std::string IntoErrString(duckdb_vx_error error); +void SetError(duckdb_vx_error *error_out, const std::string &message); +duckdb_state HandleException(std::exception_ptr ex, duckdb_vx_error *error_out); } diff --git a/vortex-duckdb/src/copy.rs b/vortex-duckdb/src/copy.rs index 725c8a75132..00428a4574c 100644 --- a/vortex-duckdb/src/copy.rs +++ b/vortex-duckdb/src/copy.rs @@ -3,7 +3,6 @@ use std::fmt::Debug; use std::iter; -use std::ptr::NonNull; use futures::SinkExt; use futures::TryStreamExt; @@ -30,7 +29,6 @@ use crate::RUNTIME; use crate::SESSION; use crate::convert::data_chunk_to_vortex; use crate::convert::from_duckdb_table; -use crate::cpp; use crate::duckdb::ClientContext; use crate::duckdb::CopyFunction; use crate::duckdb::DataChunk; @@ -40,16 +38,6 @@ use crate::duckdb::duckdb_fs_create_writer; #[derive(Debug)] pub struct VortexCopyFunction; -#[derive(Clone, Copy)] -struct SendableClientCtx(NonNull); -unsafe impl Send for SendableClientCtx {} - -impl SendableClientCtx { - fn as_ptr(self) -> cpp::duckdb_vx_client_context { - self.0.as_ptr() - } -} - pub struct BindData { dtype: DType, fields: StructFields, @@ -141,10 +129,7 @@ impl CopyFunction for VortexCopyFunction { let array_stream = ArrayStreamAdapter::new(bind_data.dtype.clone(), rx.into_stream()); let handle = SESSION.handle(); - let ctx_ptr = SendableClientCtx( - NonNull::new(client_context.as_ptr()) - .vortex_expect("Client context pointer should not be null"), - ); + let ctx_ptr = client_context.as_sendable(); let write_task = handle.spawn(async move { // Use DuckDB FS exclusively to match the DuckDB client context configuration. let writer = diff --git a/vortex-duckdb/src/duckdb/client_context.rs b/vortex-duckdb/src/duckdb/client_context.rs index 318df5324ab..87b063c89b9 100644 --- a/vortex-duckdb/src/duckdb/client_context.rs +++ b/vortex-duckdb/src/duckdb/client_context.rs @@ -17,7 +17,16 @@ wrapper!( |_| {} ); +#[derive(Clone, Copy)] +pub(crate) struct SendableClientContext(cpp::duckdb_vx_client_context); +unsafe impl Send for SendableClientContext {} +unsafe impl Sync for SendableClientContext {} + impl ClientContext { + pub(crate) fn as_sendable(&self) -> SendableClientContext { + SendableClientContext(self.as_ptr()) + } + /// Get the object cache for this client context. pub fn object_cache(&self) -> ObjectCacheRef<'static> { unsafe { @@ -43,3 +52,16 @@ impl ClientContext { } } } + +impl SendableClientContext { + pub(crate) fn as_ptr(self) -> cpp::duckdb_vx_client_context { + self.0 + } +} + +// SAFETY: ClientContext is an opaque pointer owned by DuckDB and remains valid for the lifetime of +// the DuckDB connection. DuckDB guards access to client context state internally (see +// duckdb/main/client_context.hpp) so passing it across threads for FFI calls is safe when the +// connection is alive. 
+unsafe impl Send for ClientContext {} +unsafe impl Sync for ClientContext {} diff --git a/vortex-duckdb/src/duckdb/file_system.rs b/vortex-duckdb/src/duckdb/file_system.rs index 8f70b627d07..31b43b2fe0a 100644 --- a/vortex-duckdb/src/duckdb/file_system.rs +++ b/vortex-duckdb/src/duckdb/file_system.rs @@ -40,9 +40,6 @@ const DEFAULT_CONCURRENCY: usize = 64; lifetime_wrapper!(FsFileHandle, cpp::duckdb_vx_file_handle, cpp::duckdb_vx_fs_close, [owned, ref]); unsafe impl Send for FsFileHandle {} unsafe impl Sync for FsFileHandle {} -unsafe impl Send for cpp::duckdb_vx_client_context_ {} -unsafe impl Sync for cpp::duckdb_vx_client_context_ {} - fn fs_error(err: cpp::duckdb_vx_error) -> VortexError { if err.is_null() { return vortex_err!("DuckDB filesystem error (unknown)"); @@ -67,7 +64,7 @@ pub fn duckdb_fs_glob(ctx: &ClientContext, pattern: &str) -> VortexResult, uri: Arc, size: Arc>, } -impl DuckDbFsReadAt { +impl DuckDbFsReader { pub unsafe fn open_url( ctx: cpp::duckdb_vx_client_context, url: &url::Url, @@ -111,7 +108,7 @@ impl DuckDbFsReadAt { } } -impl VortexReadAt for DuckDbFsReadAt { +impl VortexReadAt for DuckDbFsReader { fn uri(&self) -> Option<&Arc> { Some(&self.uri) } @@ -200,9 +197,11 @@ impl VortexReadAt for DuckDbFsReadAt { } } -// SAFETY: DuckDB file handles are thread-safe for reads; writes are done via DuckDbFsWriter. -unsafe impl Send for DuckDbFsReadAt {} -unsafe impl Sync for DuckDbFsReadAt {} +// SAFETY: DuckDB file handles can be used across threads when operations are position-based. The +// C++ bridge opens handles with FILE_FLAGS_PARALLEL_ACCESS, and writes use explicit offsets, so +// there is no shared cursor state. +unsafe impl Send for DuckDbFsReader {} +unsafe impl Sync for DuckDbFsReader {} pub struct DuckDbFsWriter { handle: FsFileHandle, @@ -230,11 +229,12 @@ impl VortexWrite for DuckDbFsWriter { let len = buffer.bytes_init(); let mut err: cpp::duckdb_vx_error = ptr::null_mut(); let mut out_len: cpp::idx_t = 0; + let offset = self.pos; let status = unsafe { cpp::duckdb_vx_fs_write( self.handle.as_ptr(), - self.pos as cpp::idx_t, + offset as cpp::idx_t, len as cpp::idx_t, buffer.read_ptr(), &raw mut out_len, @@ -246,7 +246,7 @@ impl VortexWrite for DuckDbFsWriter { return Err(std::io::Error::other(fs_error(err).to_string())); } - self.pos += len as u64; + self.pos = offset + len as u64; Ok(buffer) } diff --git a/vortex-duckdb/src/lib.rs b/vortex-duckdb/src/lib.rs index d4ffaf90eb0..28e29560ebe 100644 --- a/vortex-duckdb/src/lib.rs +++ b/vortex-duckdb/src/lib.rs @@ -27,7 +27,6 @@ mod convert; pub mod duckdb; pub mod exporter; mod scan; -mod utils; #[rustfmt::skip] #[path = "./cpp.rs"] diff --git a/vortex-duckdb/src/scan.rs b/vortex-duckdb/src/scan.rs index c1d179036b3..d42b7016930 100644 --- a/vortex-duckdb/src/scan.rs +++ b/vortex-duckdb/src/scan.rs @@ -7,20 +7,19 @@ use std::fmt; use std::fmt::Debug; use std::fmt::Formatter; use std::pin::Pin; -use std::ptr::NonNull; use std::sync::Arc; use std::sync::atomic::AtomicU64; use std::sync::atomic::Ordering; use std::task::Context; use std::task::Poll; -use async_compat::Compat; use futures::FutureExt; use futures::Stream; use futures::StreamExt; use futures::stream; use futures::stream::BoxStream; use futures::stream::SelectAll; +use glob::glob; use itertools::Itertools; use num_traits::AsPrimitive; use url::Url; @@ -37,7 +36,7 @@ use vortex::error::VortexExpect; use vortex::error::VortexResult; use vortex::error::vortex_bail; use vortex::error::vortex_err; -use vortex::expr::Expression; +use 
vortex::expr::Expression as VortexExpression; use vortex::expr::Pack; use vortex::expr::and_collect; use vortex::expr::col; @@ -55,27 +54,26 @@ use crate::RUNTIME; use crate::SESSION; use crate::convert::try_from_bound_expression; use crate::convert::try_from_table_filter; -use crate::cpp; -use crate::duckdb; use crate::duckdb::BindInput; use crate::duckdb::BindResult; use crate::duckdb::Cardinality; use crate::duckdb::ClientContext; use crate::duckdb::DataChunk; -use crate::duckdb::DuckDbFsReadAt; +use crate::duckdb::DuckDbFsReader; use crate::duckdb::ExtractedValue; use crate::duckdb::LogicalType; +use crate::duckdb::SendableClientContext; use crate::duckdb::TableFunction; use crate::duckdb::TableInitInput; use crate::duckdb::VirtualColumnsResult; +use crate::duckdb::duckdb_fs_glob; use crate::duckdb::footer_cache::FooterCache; use crate::exporter::ArrayExporter; use crate::exporter::ConversionCache; -use crate::utils::glob::expand_glob; pub struct VortexBindData { first_file: VortexFile, - filter_exprs: Vec, + filter_exprs: Vec, file_urls: Vec, column_names: Vec, column_types: Vec, @@ -148,7 +146,7 @@ fn extract_schema_from_vortex_file( } /// Creates a projection expression based on the table initialization input. -fn extract_projection_expr(init: &TableInitInput) -> Expression { +fn extract_projection_expr(init: &TableInitInput) -> VortexExpression { let projection_ids = init.projection_ids().unwrap_or(&[]); let column_ids = init.column_ids(); @@ -176,8 +174,8 @@ fn extract_projection_expr(init: &TableInitInput) -> Expres fn extract_table_filter_expr( init: &TableInitInput, column_ids: &[u64], -) -> VortexResult> { - let mut table_filter_exprs: HashSet = if let Some(filter) = init.table_filter_set() +) -> VortexResult> { + let mut table_filter_exprs: HashSet = if let Some(filter) = init.table_filter_set() { filter .into_iter() @@ -206,17 +204,15 @@ fn extract_table_filter_expr( } /// Helper function to open a Vortex file from either a local path or a URL supported by DuckDB's filesystem. -#[derive(Clone, Copy)] -struct SendableClientCtx(NonNull); -unsafe impl Send for SendableClientCtx {} -unsafe impl Sync for SendableClientCtx {} - -fn open_duckdb_reader(client_ctx: SendableClientCtx, url: &Url) -> VortexResult { - unsafe { DuckDbFsReadAt::open_url(client_ctx.0.as_ptr(), url) } +fn open_duckdb_reader( + client_ctx: SendableClientContext, + url: &Url, +) -> VortexResult { + unsafe { DuckDbFsReader::open_url(client_ctx.as_ptr(), url) } } async fn open_file( - client_ctx: SendableClientCtx, + client_ctx: SendableClientContext, url: Url, options: VortexOpenOptions, ) -> VortexResult { @@ -285,10 +281,35 @@ impl TableFunction for VortexTableFunction { tracing::trace!("running scan with max_threads {max_threads}"); - let (file_urls, _metadata) = RUNTIME.block_on(Compat::new(expand_glob( - ctx, - file_glob_string.as_ref().as_string(), - )))?; + let file_glob = file_glob_string.as_ref().as_string(); + let file_urls = match file_glob.as_str() { + s if s.starts_with("s3://") + || s.starts_with("https://") + || s.starts_with("http://") => + { + duckdb_fs_glob(ctx, s)? + } + s if s.starts_with("gs://") => { + vortex_bail!("GCS glob expansion not yet implemented") + } + _ => { + let paths = glob(file_glob.as_ref()) + .map_err(|e| vortex_err!("Failed to glob files: {e}"))? 
+ .collect::, _>>() + .map_err(|e| vortex_err!("Failed to glob files: {e}"))?; + + paths + .into_iter() + .map(|path| { + let canonical = path.canonicalize().map_err(|e| { + vortex_err!("Cannot canonicalize file path {path:?}: {e}") + })?; + Url::from_file_path(&canonical) + .map_err(|_| vortex_err!("Invalid file path: {canonical:?}")) + }) + .collect::>>()? + } + }; // The first file is skipped in `create_file_paths_queue`. let Some(first_file_url) = file_urls.first() else { @@ -296,9 +317,7 @@ impl TableFunction for VortexTableFunction { }; let footer_cache = FooterCache::new(ctx.object_cache()); - let client_ctx = SendableClientCtx( - NonNull::new(ctx.as_ptr()).vortex_expect("Client context pointer should not be null"), - ); + let client_ctx = ctx.as_sendable(); let entry = footer_cache.entry(first_file_url.as_ref()); let first_file = RUNTIME.block_on(async move { let options = entry.apply_to_file(SESSION.open_options()); @@ -405,10 +424,7 @@ impl TableFunction for VortexTableFunction { let num_workers = bind_data.max_threads as usize; let client_context = init_input.client_context()?; - let client_ctx_ptr = SendableClientCtx( - NonNull::new(client_context.as_ptr()) - .vortex_expect("Client context pointer should not be null"), - ); + let client_ctx_ptr = client_context.as_sendable(); let object_cache = client_context.object_cache(); let handle = RUNTIME.handle(); @@ -486,7 +502,7 @@ impl TableFunction for VortexTableFunction { fn pushdown_complex_filter( bind_data: &mut Self::BindData, - expr: &duckdb::Expression, + expr: &crate::duckdb::Expression, ) -> VortexResult { let Some(expr) = try_from_bound_expression(expr)? else { return Ok(false); diff --git a/vortex-duckdb/src/utils/glob.rs b/vortex-duckdb/src/utils/glob.rs deleted file mode 100644 index 01db505b03b..00000000000 --- a/vortex-duckdb/src/utils/glob.rs +++ /dev/null @@ -1,211 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 -// SPDX-FileCopyrightText: Copyright the Vortex contributors - -#[cfg(test)] -use std::sync::OnceLock; - -use object_store::ObjectMeta; -#[cfg(test)] -use parking_lot::Mutex; -use url::Url; -use vortex::error::VortexResult; -use vortex::error::vortex_bail; -use vortex::error::vortex_err; - -use crate::duckdb::ClientContext; -use crate::duckdb::duckdb_fs_glob; - -#[cfg(test)] -type DuckdbGlobHook = Box VortexResult> + Send + Sync>; - -#[cfg(test)] -static DUCKDB_GLOB_HOOK: OnceLock>> = OnceLock::new(); - -#[cfg(test)] -static GLOB_TEST_GUARD: OnceLock> = OnceLock::new(); - -/// Expand a glob pattern into a list of URLs. -/// -/// Example: s3://bucket/files/*.vortex -> (urls, Some(object_store_metadata)) -pub async fn expand_glob>( - client_context: &ClientContext, - url_glob: T, -) -> VortexResult<(Vec, Option>)> { - let url_str = url_glob.as_ref(); - // We prefer using string prefix matching here over extracting a URL scheme - // as local files with an absolute path but without the file:// prefix can't - // be parsed into a URL. - match url_str { - s if s.starts_with("s3://") || s.starts_with("https://") || s.starts_with("http://") => { - duckdb_fs_glob(client_context, url_str).map(|urls| (urls, None)) - } - s if s.starts_with("gs://") => { - vortex_bail!("GCS glob expansion not yet implemented") - } - _ => local_filesystem::expand_glob(url_str), - } -} - -mod local_filesystem { - use super::*; - - /// Expand a glob pattern into a list of local disk URLs. - /// Returns URLs without metadata for simplicity and performance. 
- pub(super) fn expand_glob>( - url_glob: T, - ) -> VortexResult<(Vec, Option>)> { - let paths = glob::glob(url_glob.as_ref()) - .map_err(|e| vortex_err!("Failed to glob files: {}", e))? - .collect::, _>>() - .map_err(|e| vortex_err!("Failed to glob files: {}", e))?; - - let urls = paths - .into_iter() - .map(|p| { - let path_clone = p - .canonicalize() - .map_err(|_| vortex_err!("Cannot canonicalize file path: {:?}", p))?; - Url::from_file_path(&path_clone) - .map_err(|_| vortex_err!("Invalid file path: {:?}", path_clone)) - }) - .collect::, _>>()?; - - Ok((urls, None)) - } -} - -#[cfg(test)] -mod tests { - use std::env; - use std::fs::File; - use std::fs::{self}; - use std::path::PathBuf; - - use tempfile::TempDir; - - use super::*; - use crate::duckdb::Database; - - fn ctx_bundle() -> (Database, crate::duckdb::Connection, ClientContext) { - let db = Database::open_in_memory().unwrap(); - let conn = db.connect().unwrap(); - let ctx = conn.client_context().unwrap(); - (db, conn, ctx) - } - - fn set_glob_hook(hook: Option) { - let cell = DUCKDB_GLOB_HOOK.get_or_init(|| Mutex::new(None)); - let mut guard = cell.lock(); - *guard = hook; - } - - #[test] - fn test_expand_local_disk_glob_relative_path() { - let temp_dir = TempDir::new().unwrap(); - let file_path = "test.txt"; - - let original_dir = env::current_dir().unwrap(); - env::set_current_dir(temp_dir.path()).unwrap(); - - File::create(file_path).unwrap(); - let result = local_filesystem::expand_glob(file_path).unwrap(); - - assert_eq!(result.0.len(), 1); - assert_eq!( - result.0[0].to_file_path().unwrap(), - PathBuf::from(file_path).canonicalize().unwrap() - ); - - env::set_current_dir(&original_dir).unwrap(); - } - - #[test] - fn test_duckdb_glob_http_hook() { - let _guard = GLOB_TEST_GUARD.get_or_init(|| Mutex::new(())).lock(); - let url = Url::parse("http://example.com/data.vortex").unwrap(); - let expected = url.clone(); - set_glob_hook(Some(Box::new(move |_, _| Ok(vec![url.clone()])))); - - let (_db, _conn, ctx) = ctx_bundle(); - - let (urls, meta) = - futures::executor::block_on(expand_glob(&ctx, "http://example.com/data.vortex")) - .unwrap(); - assert_eq!(meta, None); - assert_eq!(urls.len(), 1); - assert_eq!(urls[0], expected); - - set_glob_hook(None); - } - - #[test] - fn test_expand_local_disk_glob_single_file() { - let temp_dir = TempDir::new().unwrap(); - let file_path = temp_dir.path().join("test.txt"); - File::create(&file_path).unwrap(); - - let glob_pattern = file_path.to_string_lossy().to_string(); - let result = local_filesystem::expand_glob(&glob_pattern).unwrap(); - - assert_eq!(result.0.len(), 1); - assert_eq!( - result.0[0].to_file_path().unwrap(), - file_path.canonicalize().unwrap() - ); - } - - #[test] - fn test_expand_local_disk_glob_wildcard() { - let temp_dir = TempDir::new().unwrap(); - - File::create(temp_dir.path().join("file1.txt")).unwrap(); - File::create(temp_dir.path().join("file2.txt")).unwrap(); - File::create(temp_dir.path().join("other.log")).unwrap(); - - let glob_pattern = format!("{}/*.txt", temp_dir.path().display()); - let result = local_filesystem::expand_glob(&glob_pattern).unwrap(); - - assert_eq!(result.0.len(), 2); - - let file_names: Vec = result - .0 - .iter() - .map(|url| { - url.to_file_path() - .unwrap() - .file_name() - .unwrap() - .to_string_lossy() - .to_string() - }) - .collect(); - - assert!(file_names.contains(&"file1.txt".to_string())); - assert!(file_names.contains(&"file2.txt".to_string())); - } - - #[test] - fn test_expand_local_disk_glob_no_matches() { - let temp_dir = 
TempDir::new().unwrap();
-        let glob_pattern = format!("{}/*.nonexistent", temp_dir.path().display());
-        let result = local_filesystem::expand_glob(&glob_pattern).unwrap();
-        assert_eq!(result.0.len(), 0);
-    }
-
-    #[test]
-    fn test_expand_local_disk_glob_subdirectories() {
-        let temp_dir = TempDir::new().unwrap();
-
-        // Create nested directory structure
-        let subdir = temp_dir.path().join("subdir");
-        fs::create_dir(&subdir).unwrap();
-
-        File::create(temp_dir.path().join("root.txt")).unwrap();
-        File::create(subdir.join("nested.txt")).unwrap();
-
-        let glob_pattern = format!("{}/**/*.txt", temp_dir.path().display());
-        let result = local_filesystem::expand_glob(&glob_pattern).unwrap();
-
-        assert_eq!(result.0.len(), 2);
-    }
-}
diff --git a/vortex-duckdb/src/utils/mod.rs b/vortex-duckdb/src/utils/mod.rs
deleted file mode 100644
index b5090ad8fd4..00000000000
--- a/vortex-duckdb/src/utils/mod.rs
+++ /dev/null
@@ -1,4 +0,0 @@
-// SPDX-License-Identifier: Apache-2.0
-// SPDX-FileCopyrightText: Copyright the Vortex contributors
-
-pub mod glob;

From 3778b8f7726b36e2167903f3568311fcaaafa147 Mon Sep 17 00:00:00 2001
From: Ruoshi Li
Date: Wed, 4 Feb 2026 22:06:40 -0800
Subject: [PATCH 5/7] Update vortex-duckdb/cpp/error.cpp

Co-authored-by: Alexander Droste
Signed-off-by: Ruoshi Li
---
 vortex-duckdb/cpp/error.cpp | 5 +----
 1 file changed, 1 insertion(+), 4 deletions(-)

diff --git a/vortex-duckdb/cpp/error.cpp b/vortex-duckdb/cpp/error.cpp
index 112d721b336..513f94c746d 100644
--- a/vortex-duckdb/cpp/error.cpp
+++ b/vortex-duckdb/cpp/error.cpp
@@ -34,10 +34,7 @@ std::string IntoErrString(duckdb_vx_error error) {
 }
 
 void SetError(duckdb_vx_error *error_out, const std::string &message) {
-    assert(error_out && "SetError called with null error_out");
-    if (!error_out) {
-        assert(error_out && "SetError called with null error_out");
-    }
+    assert(error_out != nullptr && "SetError called with null error_out");
     *error_out = duckdb_vx_error_create(message.data(), message.size());
 }
 
From 061124b253d9ba2aa44d1c9e30e8ce519ae823f9 Mon Sep 17 00:00:00 2001
From: Ruoshi
Date: Thu, 5 Feb 2026 09:51:23 -0800
Subject: [PATCH 6/7] refactor: rename string list to URI list and simplify glob impl in scan.rs

---
 Cargo.lock | 1 -
 vortex-duckdb/Cargo.toml | 1 -
 vortex-duckdb/cpp/file_system.cpp | 8 +-
 .../cpp/include/duckdb_vx/file_system.h | 8 +-
 vortex-duckdb/src/duckdb/client_context.rs | 3 +
 vortex-duckdb/src/duckdb/file_system.rs | 16 +++-
 vortex-duckdb/src/scan.rs | 79 ++++++-------------
 7 files changed, 50 insertions(+), 66 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index c16fdcb7e27..52b9d2a0113 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -10588,7 +10588,6 @@ dependencies = [
  "cbindgen",
  "cc",
  "futures",
- "glob",
  "itertools 0.14.0",
  "jiff",
  "num-traits",
diff --git a/vortex-duckdb/Cargo.toml b/vortex-duckdb/Cargo.toml
index 9dfb1edeba5..c41a1147931 100644
--- a/vortex-duckdb/Cargo.toml
+++ b/vortex-duckdb/Cargo.toml
@@ -28,7 +28,6 @@ anyhow = { workspace = true }
 async-fs = { workspace = true }
 bitvec = { workspace = true }
 futures = { workspace = true }
-glob = { workspace = true }
 itertools = { workspace = true }
 num-traits = { workspace = true }
 parking_lot = { workspace = true }
diff --git a/vortex-duckdb/cpp/file_system.cpp b/vortex-duckdb/cpp/file_system.cpp
index 28ba173adbb..747f857d6e8 100644
--- a/vortex-duckdb/cpp/file_system.cpp
+++ b/vortex-duckdb/cpp/file_system.cpp
@@ -124,9 +124,9 @@ extern "C" duckdb_state duckdb_vx_fs_write(duckdb_vx_file_handle handle, idx_t o
     }
 }
 
-extern "C"
duckdb_vx_string_list duckdb_vx_fs_glob(duckdb_vx_client_context ctx, const char *pattern, - duckdb_vx_error *error_out) { - duckdb_vx_string_list result{nullptr, 0}; +extern "C" duckdb_vx_uri_list duckdb_vx_fs_glob(duckdb_vx_client_context ctx, const char *pattern, + duckdb_vx_error *error_out) { + duckdb_vx_uri_list result{nullptr, 0}; if (!ctx || !pattern) { SetError(error_out, "Invalid arguments to fs_glob"); @@ -159,7 +159,7 @@ extern "C" duckdb_vx_string_list duckdb_vx_fs_glob(duckdb_vx_client_context ctx, } } -extern "C" void duckdb_vx_string_list_free(duckdb_vx_string_list *list) { +extern "C" void duckdb_vx_uri_list_free(duckdb_vx_uri_list *list) { if (!list || !list->entries) { return; } diff --git a/vortex-duckdb/cpp/include/duckdb_vx/file_system.h b/vortex-duckdb/cpp/include/duckdb_vx/file_system.h index 59caff68d64..c6282c115ba 100644 --- a/vortex-duckdb/cpp/include/duckdb_vx/file_system.h +++ b/vortex-duckdb/cpp/include/duckdb_vx/file_system.h @@ -16,7 +16,7 @@ typedef struct duckdb_vx_file_handle_ *duckdb_vx_file_handle; typedef struct { const char **entries; size_t count; -} duckdb_vx_string_list; +} duckdb_vx_uri_list; // Open a file using DuckDB's filesystem (supports httpfs, s3, etc.). duckdb_vx_file_handle duckdb_vx_fs_open(duckdb_vx_client_context ctx, const char *path, @@ -34,11 +34,11 @@ duckdb_state duckdb_vx_fs_read(duckdb_vx_file_handle handle, idx_t offset, idx_t idx_t *out_len, duckdb_vx_error *error_out); // Expand a glob using DuckDB's filesystem. -duckdb_vx_string_list duckdb_vx_fs_glob(duckdb_vx_client_context ctx, const char *pattern, - duckdb_vx_error *error_out); +duckdb_vx_uri_list duckdb_vx_fs_glob(duckdb_vx_client_context ctx, const char *pattern, + duckdb_vx_error *error_out); // Free a string list allocated by duckdb_vx_fs_glob. -void duckdb_vx_string_list_free(duckdb_vx_string_list *list); +void duckdb_vx_uri_list_free(duckdb_vx_uri_list *list); // Create/truncate a file for writing using DuckDB's filesystem. duckdb_vx_file_handle duckdb_vx_fs_create(duckdb_vx_client_context ctx, const char *path, diff --git a/vortex-duckdb/src/duckdb/client_context.rs b/vortex-duckdb/src/duckdb/client_context.rs index 87b063c89b9..efb13236363 100644 --- a/vortex-duckdb/src/duckdb/client_context.rs +++ b/vortex-duckdb/src/duckdb/client_context.rs @@ -19,6 +19,9 @@ wrapper!( #[derive(Clone, Copy)] pub(crate) struct SendableClientContext(cpp::duckdb_vx_client_context); +// SAFETY: SendableClientContext carries the same opaque pointer as ClientContext. It is safe to +// send/share across threads under the same guarantees as ClientContext: the underlying DuckDB +// context is valid for the connection lifetime and DuckDB synchronizes internal state. unsafe impl Send for SendableClientContext {} unsafe impl Sync for SendableClientContext {} diff --git a/vortex-duckdb/src/duckdb/file_system.rs b/vortex-duckdb/src/duckdb/file_system.rs index 31b43b2fe0a..14f3439366d 100644 --- a/vortex-duckdb/src/duckdb/file_system.rs +++ b/vortex-duckdb/src/duckdb/file_system.rs @@ -64,12 +64,22 @@ pub fn duckdb_fs_glob(ctx: &ClientContext, pattern: &str) -> VortexResult url, + Err(parse_err) => { + let path = std::path::Path::new(entry_str.as_ref()); + let canonical = path + .canonicalize() + .map_err(|e| vortex_err!("Cannot canonicalize file path {path:?}: {e}"))?; + url::Url::from_file_path(&canonical).map_err(|_| { + vortex_err!("Invalid URL returned by DuckDB glob {entry_str}: {parse_err}") + })? 
+ } + }; urls.push(url); } - unsafe { cpp::duckdb_vx_string_list_free(&raw mut list) }; + unsafe { cpp::duckdb_vx_uri_list_free(&raw mut list) }; Ok(urls) } diff --git a/vortex-duckdb/src/scan.rs b/vortex-duckdb/src/scan.rs index d42b7016930..b69428e2bc2 100644 --- a/vortex-duckdb/src/scan.rs +++ b/vortex-duckdb/src/scan.rs @@ -19,7 +19,6 @@ use futures::StreamExt; use futures::stream; use futures::stream::BoxStream; use futures::stream::SelectAll; -use glob::glob; use itertools::Itertools; use num_traits::AsPrimitive; use url::Url; @@ -60,6 +59,7 @@ use crate::duckdb::Cardinality; use crate::duckdb::ClientContext; use crate::duckdb::DataChunk; use crate::duckdb::DuckDbFsReader; +use crate::duckdb::Expression; use crate::duckdb::ExtractedValue; use crate::duckdb::LogicalType; use crate::duckdb::SendableClientContext; @@ -175,29 +175,29 @@ fn extract_table_filter_expr( init: &TableInitInput, column_ids: &[u64], ) -> VortexResult> { - let mut table_filter_exprs: HashSet = if let Some(filter) = init.table_filter_set() - { - filter - .into_iter() - .map(|(idx, ex)| { - let idx_u: usize = idx.as_(); - let col_idx: usize = column_ids[idx_u].as_(); - let name = init - .bind_data() - .column_names - .get(col_idx) - .vortex_expect("exists"); - try_from_table_filter( - &ex, - &col(name.as_str()), - init.bind_data().first_file.dtype(), - ) - }) - .collect::>>>()? - .unwrap_or_else(HashSet::new) - } else { - HashSet::new() - }; + let mut table_filter_exprs: HashSet = + if let Some(filter) = init.table_filter_set() { + filter + .into_iter() + .map(|(idx, ex)| { + let idx_u: usize = idx.as_(); + let col_idx: usize = column_ids[idx_u].as_(); + let name = init + .bind_data() + .column_names + .get(col_idx) + .vortex_expect("exists"); + try_from_table_filter( + &ex, + &col(name.as_str()), + init.bind_data().first_file.dtype(), + ) + }) + .collect::>>>()? + .unwrap_or_else(HashSet::new) + } else { + HashSet::new() + }; table_filter_exprs.extend(init.bind_data().filter_exprs.clone()); Ok(and_collect(table_filter_exprs.into_iter().collect_vec())) @@ -282,34 +282,7 @@ impl TableFunction for VortexTableFunction { tracing::trace!("running scan with max_threads {max_threads}"); let file_glob = file_glob_string.as_ref().as_string(); - let file_urls = match file_glob.as_str() { - s if s.starts_with("s3://") - || s.starts_with("https://") - || s.starts_with("http://") => - { - duckdb_fs_glob(ctx, s)? - } - s if s.starts_with("gs://") => { - vortex_bail!("GCS glob expansion not yet implemented") - } - _ => { - let paths = glob(file_glob.as_ref()) - .map_err(|e| vortex_err!("Failed to glob files: {e}"))? - .collect::, _>>() - .map_err(|e| vortex_err!("Failed to glob files: {e}"))?; - - paths - .into_iter() - .map(|path| { - let canonical = path.canonicalize().map_err(|e| { - vortex_err!("Cannot canonicalize file path {path:?}: {e}") - })?; - Url::from_file_path(&canonical) - .map_err(|_| vortex_err!("Invalid file path: {canonical:?}")) - }) - .collect::>>()? - } - }; + let file_urls = duckdb_fs_glob(ctx, file_glob.as_ref())?; // The first file is skipped in `create_file_paths_queue`. let Some(first_file_url) = file_urls.first() else { @@ -502,7 +475,7 @@ impl TableFunction for VortexTableFunction { fn pushdown_complex_filter( bind_data: &mut Self::BindData, - expr: &crate::duckdb::Expression, + expr: &Expression, ) -> VortexResult { let Some(expr) = try_from_bound_expression(expr)? 
else { return Ok(false); From 6234de1b89e30fc5823c567065909761b39af248 Mon Sep 17 00:00:00 2001 From: Ruoshi Date: Fri, 6 Feb 2026 11:37:13 -0800 Subject: [PATCH 7/7] fix: remove unused async_compat::Compat import --- vortex-duckdb/src/scan.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/vortex-duckdb/src/scan.rs b/vortex-duckdb/src/scan.rs index 94080c24272..3fb6e248350 100644 --- a/vortex-duckdb/src/scan.rs +++ b/vortex-duckdb/src/scan.rs @@ -13,7 +13,6 @@ use std::sync::atomic::Ordering; use std::task::Context; use std::task::Poll; -use async_compat::Compat; use custom_labels::CURRENT_LABELSET; use futures::FutureExt; use futures::Stream;