diff --git a/Cargo.lock b/Cargo.lock index 4d5a430..729a53f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 3 +version = 4 [[package]] name = "addr2line" @@ -162,7 +162,7 @@ checksum = "6e0c28dcc82d7c8ead5cb13beb15405b57b8546e93215673ff8ca0349a028107" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn", ] [[package]] @@ -358,7 +358,7 @@ dependencies = [ "heck 0.5.0", "proc-macro2", "quote", - "syn 2.0.79", + "syn", ] [[package]] @@ -533,12 +533,6 @@ dependencies = [ "crypto-common", ] -[[package]] -name = "doc-comment" -version = "0.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10" - [[package]] name = "either" version = "1.9.0" @@ -560,7 +554,7 @@ dependencies = [ "heck 0.4.1", "proc-macro2", "quote", - "syn 2.0.79", + "syn", ] [[package]] @@ -716,7 +710,7 @@ checksum = "53b153fd91e4b0147f4aced87be237c98248656bb01050b96bf3ee89220a8ddb" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn", ] [[package]] @@ -1378,14 +1372,15 @@ dependencies = [ [[package]] name = "object_store" -version = "0.10.2" -source = "git+https://github.com/andrebsguedes/arrow-rs.git?tag=v0.10.2-beta1#3e15b9a308e29479bc33e4f06855227d93bf88a6" +version = "0.11.2" +source = "git+https://github.com/RelationalAI/arrow-rs.git?tag=v0.11.3-beta1#fa77acbd1e5e3acbf0824443b2c1d1df8609b457" dependencies = [ "async-trait", "base64", "bytes", "chrono", "futures", + "httparse", "humantime", "hyper", "itertools 0.13.0", @@ -1407,7 +1402,7 @@ dependencies = [ [[package]] name = "object_store_ffi" -version = "0.11.1" +version = "0.12.0" dependencies = [ "anyhow", "async-channel", @@ -1484,7 +1479,7 @@ checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn", ] [[package]] @@ -1588,7 +1583,7 @@ 
checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn", ] [[package]] @@ -1653,9 +1648,9 @@ checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0" [[package]] name = "quick-xml" -version = "0.36.1" +version = "0.37.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "96a05e2e8efddfa51a84ca47cec303fac86c8541b686d37cac5efc0e094417bc" +checksum = "165859e9e55f79d67b96c5d96f4e88b6f2695a1972849c15a6a3f5c59fc2c003" dependencies = [ "memchr", "serde", @@ -2041,7 +2036,7 @@ checksum = "a5831b979fd7b5439637af1752d535ff49f4860c0f341d1baeb6faf0f4242170" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn", ] [[package]] @@ -2118,24 +2113,23 @@ checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67" [[package]] name = "snafu" -version = "0.7.5" +version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e4de37ad025c587a29e8f3f5605c00f70b98715ef90b9061a815b9e59e9042d6" +checksum = "223891c85e2a29c3fe8fb900c1fae5e69c2e42415e3177752e8718475efa5019" dependencies = [ - "doc-comment", "snafu-derive", ] [[package]] name = "snafu-derive" -version = "0.7.5" +version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "990079665f075b699031e9c08fd3ab99be5029b96f3b78dc0709e8f77e4efebf" +checksum = "03c3c6b7927ffe7ecaa769ee0e3994da3b8cafc8f444578982c83ecb161af917" dependencies = [ - "heck 0.4.1", + "heck 0.5.0", "proc-macro2", "quote", - "syn 1.0.109", + "syn", ] [[package]] @@ -2169,17 +2163,6 @@ version = "2.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0d0208408ba0c3df17ed26eb06992cb1a1268d41b2c0e12e65203fbe3972cee5" -[[package]] -name = "syn" -version = "1.0.109" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" 
-dependencies = [ - "proc-macro2", - "quote", - "unicode-ident", -] - [[package]] name = "syn" version = "2.0.79" @@ -2226,7 +2209,7 @@ checksum = "266b2e40bc00e5a6c09c3584011e08b06f123c00362c92b975ba9843aaaa14b8" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn", ] [[package]] @@ -2290,7 +2273,7 @@ checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn", ] [[package]] @@ -2364,7 +2347,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn", ] [[package]] @@ -2541,7 +2524,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.79", + "syn", "wasm-bindgen-shared", ] @@ -2575,7 +2558,7 @@ checksum = "f0eb82fcb7930ae6219a7ecfd55b217f5f0893484b7a13022ebb2b2bf20b5283" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -2833,7 +2816,7 @@ checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 82d1262..ecbd3e9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "object_store_ffi" -version = "0.11.1" +version = "0.12.0" edition = "2021" [[bench]] @@ -43,7 +43,7 @@ futures-util = "0.3" reqwest = { version = "0.12", default-features = false, features = ["rustls-tls", "hickory-dns"] } # object_store = { version = "0.10.1", features = ["azure", "aws"] } # Pinned to a specific commit while waiting for upstream -object_store = { git = "https://github.com/andrebsguedes/arrow-rs.git", tag = "v0.10.2-beta1", features = ["azure", "aws", "experimental-azure-list-offset", "experimental-arbitrary-list-prefix"] } +object_store = { git = "https://github.com/RelationalAI/arrow-rs.git", tag = "v0.11.3-beta1", features = ["azure", "aws", 
"experimental-azure-list-offset", "experimental-arbitrary-list-prefix"] } hickory-resolver = "0.24" thiserror = "1" anyhow = { version = "1", features = ["backtrace"] } diff --git a/src/crud_ops.rs b/src/crud_ops.rs index c7bd5ae..4ea4c97 100644 --- a/src/crud_ops.rs +++ b/src/crud_ops.rs @@ -5,7 +5,7 @@ use ::metrics::counter; use object_store::{path::Path, ObjectStore}; use tokio_util::io::StreamReader; -use std::{ffi::{c_char, c_void}, sync::Arc}; +use std::{ffi::{c_char, c_void, CString}, sync::Arc}; use futures_util::{stream, StreamExt}; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt}; @@ -44,6 +44,90 @@ impl RawResponse for Response { } } +// ================================================================================================ +// Boiler plate code for FFI structs +// Any non-copy fields of BulkFailedEntry must be properly destroyed on destroy_bulk_failed_entries +#[repr(C)] +pub struct BulkFailedEntry { + path: *const c_char, + error_message: *const c_char +} +unsafe impl Send for BulkFailedEntry {} + +// Only stores paths of entries that the bulk operation failed on +#[repr(C)] +pub struct BulkResponse { + result: CResult, + failed_entries: *const BulkFailedEntry, + failed_count: u64, + // Generic error message for the whole operation. 
Not null implies failed_count = 0 + error_message: *mut c_char, + context: *const Context +} + +unsafe impl Send for BulkResponse {} + +impl RawResponse for BulkResponse { + type Payload = Vec<(Path, crate::Error)>; + fn result_mut(&mut self) -> &mut CResult { + &mut self.result + } + fn context_mut(&mut self) -> &mut *const Context { + &mut self.context + } + fn error_message_mut(&mut self) -> &mut *mut c_char { + &mut self.error_message + } + fn set_payload(&mut self, payload: Option<Self::Payload>) { + match payload { + Some(entries) => { + let entries = entries.into_iter().map(|(path, error)| { + BulkFailedEntry::new(path, error.to_string()) + }).collect::<Vec<_>>(); + let entries_slice = entries.into_boxed_slice(); + let entry_count = entries_slice.len() as u64; + let entries_ptr = entries_slice.as_ptr(); + std::mem::forget(entries_slice); + + self.failed_count = entry_count; + self.failed_entries = entries_ptr; + } + None => { + self.failed_entries = std::ptr::null(); + self.failed_count = 0; + } + } + } +} + +#[no_mangle] +pub extern "C" fn destroy_bulk_failed_entries( + entries: *mut BulkFailedEntry, + entry_count: u64 +) -> CResult { + let boxed_slice = unsafe { Box::from_raw(std::slice::from_raw_parts_mut(entries, entry_count as usize)) }; + for entry in &*boxed_slice { + // Safety: must properly drop all allocated fields from BulkFailedEntry here + let _ = unsafe { CString::from_raw(entry.path.cast_mut()) }; + let _ = unsafe { CString::from_raw(entry.error_message.cast_mut()) }; + } + CResult::Ok +} + +impl BulkFailedEntry { + pub fn new(path: Path, error_msg:String) -> Self { + BulkFailedEntry { + path: CString::new(path.to_string()) + .expect("should not have nulls") + .into_raw(), + error_message: CString::new(error_msg.to_string()) + .expect("should not have nulls") + .into_raw() + } + } +} +// ================================================================================================ + async fn read_to_slice(reader: &mut BoxedReader, mut slice: &mut [u8]) -> 
crate::Result<usize> { let mut received_bytes = 0; loop { @@ -143,6 +227,53 @@ impl Client { counter!(metrics::total_delete_ops).increment(1); with_retries!(self, self.delete_impl(path).await) } + + async fn bulk_delete_impl(&self, paths: &Vec<Path>) -> crate::Result<Vec<(Path, crate::Error)>> { + let stream = stream::iter(paths.iter().map(|path| Ok(path.clone()))).boxed(); + let results = self.store.delete_stream(stream) + .collect::<Vec<_>>().await; + // We count the number of results to raise an error if some paths were not + // successfully processed at all. + let num_results = results.len(); + let failures = results + .into_iter() + .enumerate() + .filter_map(|(index, result)| { + match result { + Ok(_) => { + None + }, + Err(e) => match e { + // We treat not found as success because AWS S3 does not return an error + // if the object does not exist + object_store::Error::NotFound { .. } => { + None + }, + _ => { + Some((paths[index].clone(), e.into())) + } + }, + } + }).collect::<Vec<_>>(); + + // Rail guard to catch generic errors + if num_results < paths.len() { + if num_results == 0 { + tracing::warn!("delete_stream returned zero results"); + Err(crate::Error::invalid_response("Some paths were not deleted")) + } else { + let error_string = failures[0].1.to_string(); + tracing::warn!("delete_stream returned a single generic error: {}", error_string); + Err(crate::Error::invalid_response(error_string)) + } + } else { + Ok(failures) + } + } + pub async fn bulk_delete(&self, paths: Vec<Path>) -> crate::Result<Vec<(Path, crate::Error)>> { + counter!(metrics::total_bulk_delete_ops).increment(1); + with_retries!(self, self.bulk_delete_impl(&paths).await) + } async fn multipart_get_impl(&self, path: &Path, slice: &mut [u8]) -> crate::Result<usize> { let _guard = duration_on_drop!(metrics::multipart_get_attempt_duration); let result = self.store.get_opts( @@ -253,3 +384,16 @@ export_queued_op!( }, path: *const c_char ); + +export_queued_op!( + bulk_delete, + BulkResponse, + |config, response| { + let slice: &[*const c_char] = unsafe { 
std::slice::from_raw_parts(path_c_array, num_paths) }; + let paths_vec: Vec<Path> = slice.iter().map(|ptr| { + unsafe { cstr_to_path(std::ffi::CStr::from_ptr(ptr.clone())) } + }).collect(); + Ok(Request::BulkDelete(paths_vec, config, response)) + }, + path_c_array: *const *const c_char, num_paths: usize +); diff --git a/src/lib.rs b/src/lib.rs index 16e5c2d..f9360c6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -32,7 +32,7 @@ mod list; use list::{ListResponse, ListStreamResponse}; mod crud_ops; -use crud_ops::Response; +use crud_ops::{BulkResponse, Response}; mod stream; use stream::{GetStreamResponse, PutStreamResponse}; @@ -193,6 +193,7 @@ enum Request { Get(Path, &'static mut [u8], &'static RawConfig, ResponseGuard), Put(Path, &'static [u8], &'static RawConfig, ResponseGuard), Delete(Path, &'static RawConfig, ResponseGuard), + BulkDelete(Vec<Path>, &'static RawConfig, ResponseGuard), List(Path, Option, &'static RawConfig, ResponseGuard), ListStream(Path, Option, &'static RawConfig, ResponseGuard), GetStream(Path, usize, Compression, &'static RawConfig, ResponseGuard), @@ -207,6 +208,7 @@ impl Request { Request::Get( .. , response) => response.cancelled(), Request::Put( .. , response) => response.cancelled(), Request::Delete( .. , response) => response.cancelled(), + Request::BulkDelete( .. , response) => response.cancelled(), Request::List( .. , response) => response.cancelled(), Request::ListStream( .. , response) => response.cancelled(), Request::GetStream( .. , response) => response.cancelled(), @@ -218,6 +220,7 @@ impl Request { Request::Get( .. , response) => response.into_error(error), Request::Put( .. , response) => response.into_error(error), Request::Delete( .. , response) => response.into_error(error), + Request::BulkDelete( .. , response) => response.into_error(error), Request::List( .. , response) => response.into_error(error), Request::ListStream( .. , response) => response.into_error(error), Request::GetStream( .. 
, response) => response.into_error(error), @@ -229,6 +232,7 @@ impl Request { Request::Get( .. , config, _response) => config, Request::Put( .. , config, _response) => config, Request::Delete( .. , config, _response) => config, + Request::BulkDelete( .. , config, _response) => config, Request::List( .. , config, _response) => config, Request::ListStream( .. , config, _response) => config, Request::GetStream( .. , config, _response) => config, @@ -849,6 +853,9 @@ pub extern "C" fn start( Request::Delete(path, _config, response) => { with_cancellation!(client.delete(&path), response, false); } + Request::BulkDelete(paths, _config, response) => { + with_cancellation!(client.bulk_delete(paths), response, false); + } Request::List(prefix, offset, _config, response) => { with_cancellation!(client.list(&prefix, offset.as_ref()), response); } diff --git a/src/metrics.rs b/src/metrics.rs index f735114..66981fe 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -131,6 +131,7 @@ declare_metrics! { Counter, total_get_ops, Unit::Count, "Total amount of object GET operations"; Counter, total_put_ops, Unit::Count, "Total amount of object PUT operations"; Counter, total_delete_ops, Unit::Count, "Total amount of object DELETE operations"; + Counter, total_bulk_delete_ops, Unit::Count, "Total amount of object bulk DELETE operations"; Counter, total_keyring_get, Unit::Count, "Total amount of key fetches from in-memory keyring"; Counter, total_keyring_miss, Unit::Count, "Total amount of misses while fetching key from keyring"; Counter, total_fetch_upload_info, Unit::Count, "Total amount of Snowflake stage info requests";