From 9353561ff6873d5ffb062ede3731190beb65b71a Mon Sep 17 00:00:00 2001 From: Adnan Alhomssi Date: Wed, 5 Feb 2025 00:35:58 +0100 Subject: [PATCH 1/9] draft --- src/crud_ops.rs | 129 +++++++++++++++++++++++++++++++++++++++++++++++- src/lib.rs | 9 +++- src/metrics.rs | 1 + 3 files changed, 137 insertions(+), 2 deletions(-) diff --git a/src/crud_ops.rs b/src/crud_ops.rs index c7bd5ae..0263c52 100644 --- a/src/crud_ops.rs +++ b/src/crud_ops.rs @@ -5,7 +5,7 @@ use ::metrics::counter; use object_store::{path::Path, ObjectStore}; use tokio_util::io::StreamReader; -use std::{ffi::{c_char, c_void}, sync::Arc}; +use std::{ffi::{c_char, c_void, CString}, sync::{atomic::{AtomicUsize, Ordering}, Arc}}; use futures_util::{stream, StreamExt}; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt}; @@ -44,6 +44,85 @@ impl RawResponse for Response { } } +// ================================================================================================ +// Boiler plate code for FFI structs +// Any non-copy fields of ListEntry must be properly destroyed on destroy_list_entries +#[repr(C)] +pub struct BulkFailedEntry { + path: *const c_char, + error_message: *const c_char +} +unsafe impl Send for BulkFailedEntry {} + +// Only stores paths of entries that the bulk operation failed on +#[repr(C)] +pub struct BulkResponse { + result: CResult, + failed_entries: *const BulkFailedEntry, + failed_count: u64, + error_message: *mut c_char, + context: *const Context +} + +unsafe impl Send for BulkResponse {} + +impl RawResponse for BulkResponse { + type Payload = Vec; + fn result_mut(&mut self) -> &mut CResult { + &mut self.result + } + fn context_mut(&mut self) -> &mut *const Context { + &mut self.context + } + fn error_message_mut(&mut self) -> &mut *mut c_char { + &mut self.error_message + } + fn set_payload(&mut self, payload: Option) { + match payload { + Some(entries) => { + let entries_slice = entries.into_boxed_slice(); + let entry_count = entries_slice.len() as u64; + let entries_ptr = entries_slice.as_ptr(); + std::mem::forget(entries_slice); + + self.failed_count = entry_count; + self.failed_entries = entries_ptr; + } + None => { + self.failed_entries = std::ptr::null(); + self.failed_count = 0; + } + } + } +} + +#[no_mangle] +pub extern "C" fn destroy_bulk_failed_entries( + entries: *mut BulkFailedEntry, + entry_count: u64 +) -> CResult { + let boxed_slice = unsafe { Box::from_raw(std::slice::from_raw_parts_mut(entries, entry_count as usize)) }; + for entry in &*boxed_slice { + // Safety: must properly drop all allocated fields from ListEntry here + let _ = unsafe { CString::from_raw(entry.path.cast_mut()) }; + } + CResult::Ok +} + +impl BulkFailedEntry { + pub fn new(path: Path, error_msg:String) -> Self { + BulkFailedEntry { + path: CString::new(path.to_string()) + .expect("should not have nulls") + .into_raw(), + error_message: CString::new(error_msg.to_string()) + .expect("should not have nulls") + .into_raw() + } + } +} +// ================================================================================================ + async fn read_to_slice(reader: &mut BoxedReader, mut slice: &mut [u8]) -> crate::Result { let mut received_bytes = 0; loop { @@ -143,6 +222,38 @@ impl Client { counter!(metrics::total_delete_ops).increment(1); with_retries!(self, self.delete_impl(path).await) } + async fn bulk_delete_impl(&self, paths: &Vec) -> crate::Result> { + let stream = stream::iter(paths.iter().map(|path| Ok(path.clone()))).boxed(); + // Counter to keep track of the index of the path that failed to delete + let counter = Arc::new(AtomicUsize::new(0)); + let bulk_failed_entries = self.store.delete_stream(stream) + .filter_map(|result| async { + let counter_clone = Arc::clone(&counter); + let index = counter_clone.fetch_add(1, Ordering::SeqCst); + match result { + Ok(_) => None, + Err(e) => match e { + // We treat not found as success because AWS S3 does not return an error + // if the object does not exist + object_store::Error::NotFound { path: _, source: _ } => { + None + }, + _ => { + Some(BulkFailedEntry::new( + paths[index].clone(), e.to_string()) + ) + } + }, + } + }) + .collect::>() + .await; + Ok(bulk_failed_entries) + } + pub async fn bulk_delete(&self, paths: Vec) -> crate::Result> { + counter!(metrics::total_bulk_delete_ops).increment(1); + with_retries!(self, self.bulk_delete_impl(&paths).await) + } async fn multipart_get_impl(&self, path: &Path, slice: &mut [u8]) -> crate::Result { let _guard = duration_on_drop!(metrics::multipart_get_attempt_duration); let result = self.store.get_opts( @@ -253,3 +364,19 @@ export_queued_op!( }, path: *const c_char ); + +export_queued_op!( + bulk_delete, + BulkResponse, + |config, response| { + let mut paths_vec:Vec = Vec::new(); + for i in 0..num_paths as isize { + let path_ptr = unsafe { *path_c_array.offset(i)}; + let path = unsafe { std::ffi::CStr::from_ptr(path_ptr) }; + let path = unsafe { cstr_to_path(path) }; + paths_vec.push(path); + } + Ok(Request::BulkDelete(paths_vec, config, response)) + }, + path_c_array: *const *const c_char, num_paths: usize +); diff --git a/src/lib.rs b/src/lib.rs index 16e5c2d..f9360c6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -32,7 +32,7 @@ mod list; use list::{ListResponse, ListStreamResponse}; mod crud_ops; -use crud_ops::Response; +use crud_ops::{BulkResponse, Response}; mod stream; use stream::{GetStreamResponse, PutStreamResponse}; @@ -193,6 +193,7 @@ enum Request { Get(Path, &'static mut [u8], &'static RawConfig, ResponseGuard), Put(Path, &'static [u8], &'static RawConfig, ResponseGuard), Delete(Path, &'static RawConfig, ResponseGuard), + BulkDelete(Vec, &'static RawConfig, ResponseGuard), List(Path, Option, &'static RawConfig, ResponseGuard), ListStream(Path, Option, &'static RawConfig, ResponseGuard), GetStream(Path, usize, Compression, &'static RawConfig, ResponseGuard), @@ -207,6 +208,7 @@ impl Request { Request::Get( .. , response) => response.cancelled(), Request::Put( .. , response) => response.cancelled(), Request::Delete( .. , response) => response.cancelled(), + Request::BulkDelete( .. , response) => response.cancelled(), Request::List( .. , response) => response.cancelled(), Request::ListStream( .. , response) => response.cancelled(), Request::GetStream( .. , response) => response.cancelled(), @@ -218,6 +220,7 @@ impl Request { Request::Get( .. , response) => response.into_error(error), Request::Put( .. , response) => response.into_error(error), Request::Delete( .. , response) => response.into_error(error), + Request::BulkDelete( .. , response) => response.into_error(error), Request::List( .. , response) => response.into_error(error), Request::ListStream( .. , response) => response.into_error(error), Request::GetStream( .. , response) => response.into_error(error), @@ -229,6 +232,7 @@ impl Request { Request::Get( .. , config, _response) => config, Request::Put( .. , config, _response) => config, Request::Delete( .. , config, _response) => config, + Request::BulkDelete( .. , config, _response) => config, Request::List( .. , config, _response) => config, Request::ListStream( .. , config, _response) => config, Request::GetStream( .. , config, _response) => config, @@ -849,6 +853,9 @@ pub extern "C" fn start( Request::Delete(path, _config, response) => { with_cancellation!(client.delete(&path), response, false); } + Request::BulkDelete(paths, _config, response) => { + with_cancellation!(client.bulk_delete(paths), response, false); + } Request::List(prefix, offset, _config, response) => { with_cancellation!(client.list(&prefix, offset.as_ref()), response); } diff --git a/src/metrics.rs b/src/metrics.rs index f735114..66981fe 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -131,6 +131,7 @@ declare_metrics! { Counter, total_get_ops, Unit::Count, "Total amount of object GET operations"; Counter, total_put_ops, Unit::Count, "Total amount of object PUT operations"; Counter, total_delete_ops, Unit::Count, "Total amount of object DELETE operations"; + Counter, total_bulk_delete_ops, Unit::Count, "Total amount of object bulk DELETE operations"; Counter, total_keyring_get, Unit::Count, "Total amount of key fetches from in-memory keyring"; Counter, total_keyring_miss, Unit::Count, "Total amount of misses while fetching key from keyring"; Counter, total_fetch_upload_info, Unit::Count, "Total amount of Snowflake stage info requests"; From 4a6752c1624e2602ede8a4a9de96f064ad8deaee Mon Sep 17 00:00:00 2001 From: Adnan Alhomssi Date: Fri, 7 Feb 2025 10:49:10 +0100 Subject: [PATCH 2/9] . --- src/crud_ops.rs | 25 ++++++++++++++++--------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/src/crud_ops.rs b/src/crud_ops.rs index 0263c52..a0cb049 100644 --- a/src/crud_ops.rs +++ b/src/crud_ops.rs @@ -67,7 +67,7 @@ pub struct BulkResponse { unsafe impl Send for BulkResponse {} impl RawResponse for BulkResponse { - type Payload = Vec; + type Payload = Vec<(Path, String)>; fn result_mut(&mut self) -> &mut CResult { &mut self.result } @@ -80,6 +80,9 @@ impl RawResponse for BulkResponse { fn set_payload(&mut self, payload: Option) { match payload { Some(entries) => { + let entries = entries.into_iter().map(|(path, error_msg)| { + BulkFailedEntry::new(path, error_msg) + }).collect::>(); let entries_slice = entries.into_boxed_slice(); let entry_count = entries_slice.len() as u64; let entries_ptr = entries_slice.as_ptr(); @@ -222,7 +225,8 @@ impl Client { counter!(metrics::total_delete_ops).increment(1); with_retries!(self, self.delete_impl(path).await) } - async fn bulk_delete_impl(&self, paths: &Vec) -> crate::Result> { + + async fn bulk_delete_impl(&self, paths: &Vec) -> crate::Result> { let stream = stream::iter(paths.iter().map(|path| Ok(path.clone()))).boxed(); // Counter to keep track of the index of the path that failed to delete let counter = Arc::new(AtomicUsize::new(0)); @@ -231,26 +235,29 @@ impl Client { let counter_clone = Arc::clone(&counter); let index = counter_clone.fetch_add(1, Ordering::SeqCst); match result { - Ok(_) => None, + Ok(path) => { + println!("H2 Path found: {}", path); + None + }, Err(e) => match e { // We treat not found as success because AWS S3 does not return an error // if the object does not exist - object_store::Error::NotFound { path: _, source: _ } => { + object_store::Error::NotFound { path: _, source } => { + println!("H3 Path not found: {} {}", paths[index], source); None }, _ => { - Some(BulkFailedEntry::new( - paths[index].clone(), e.to_string()) - ) + println!("H2 Path error: {} \n {}", paths[index].clone(), e.to_string()); + Some((paths[index].clone(), e.to_string())) } }, } }) - .collect::>() + .collect::>() .await; Ok(bulk_failed_entries) } - pub async fn bulk_delete(&self, paths: Vec) -> crate::Result> { + pub async fn bulk_delete(&self, paths: Vec) -> crate::Result> { counter!(metrics::total_bulk_delete_ops).increment(1); with_retries!(self, self.bulk_delete_impl(&paths).await) } From 25e8c2ea1548f2ca6294cc549455d4fb783199b7 Mon Sep 17 00:00:00 2001 From: Adnan Alhomssi Date: Fri, 7 Feb 2025 18:07:14 +0100 Subject: [PATCH 3/9] * update object_store * add railguard for cases when num of callbacks < provided paths * cleanup --- Cargo.lock | 67 ++++++++++++++++++------------------------------- Cargo.toml | 2 +- src/crud_ops.rs | 15 ++++++++--- 3 files changed, 37 insertions(+), 47 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4d5a430..f7831be 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 3 +version = 4 [[package]] name = "addr2line" @@ -162,7 +162,7 @@ checksum = "6e0c28dcc82d7c8ead5cb13beb15405b57b8546e93215673ff8ca0349a028107" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn", ] [[package]] @@ -358,7 +358,7 @@ dependencies = [ "heck 0.5.0", "proc-macro2", "quote", - "syn 2.0.79", + "syn", ] [[package]] @@ -533,12 +533,6 @@ dependencies = [ "crypto-common", ] -[[package]] -name = "doc-comment" -version = "0.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10" - [[package]] name = "either" version = "1.9.0" @@ -560,7 +554,7 @@ dependencies = [ "heck 0.4.1", "proc-macro2", "quote", - "syn 2.0.79", + "syn", ] [[package]] @@ -716,7 +710,7 @@ checksum = "53b153fd91e4b0147f4aced87be237c98248656bb01050b96bf3ee89220a8ddb" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn", ] [[package]] @@ -1378,14 +1372,15 @@ dependencies = [ [[package]] name = "object_store" -version = "0.10.2" -source = "git+https://github.com/andrebsguedes/arrow-rs.git?tag=v0.10.2-beta1#3e15b9a308e29479bc33e4f06855227d93bf88a6" +version = "0.11.2" +source = "git+https://github.com/RelationalAI/arrow-rs.git?branch=object_store_0.11.3-beta1#fa77acbd1e5e3acbf0824443b2c1d1df8609b457" dependencies = [ "async-trait", "base64", "bytes", "chrono", "futures", + "httparse", "humantime", "hyper", "itertools 0.13.0", @@ -1484,7 +1479,7 @@ checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn", ] [[package]] @@ -1588,7 +1583,7 @@ checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn", ] [[package]] @@ -1653,9 +1648,9 @@ checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0" [[package]] name = "quick-xml" -version = "0.36.1" +version = "0.37.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "96a05e2e8efddfa51a84ca47cec303fac86c8541b686d37cac5efc0e094417bc" +checksum = "165859e9e55f79d67b96c5d96f4e88b6f2695a1972849c15a6a3f5c59fc2c003" dependencies = [ "memchr", "serde", @@ -2041,7 +2036,7 @@ checksum = "a5831b979fd7b5439637af1752d535ff49f4860c0f341d1baeb6faf0f4242170" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn", ] [[package]] @@ -2118,24 +2113,23 @@ checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67" [[package]] name = "snafu" -version = "0.7.5" +version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e4de37ad025c587a29e8f3f5605c00f70b98715ef90b9061a815b9e59e9042d6" +checksum = "223891c85e2a29c3fe8fb900c1fae5e69c2e42415e3177752e8718475efa5019" dependencies = [ - "doc-comment", "snafu-derive", ] [[package]] name = "snafu-derive" -version = "0.7.5" +version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "990079665f075b699031e9c08fd3ab99be5029b96f3b78dc0709e8f77e4efebf" +checksum = "03c3c6b7927ffe7ecaa769ee0e3994da3b8cafc8f444578982c83ecb161af917" dependencies = [ - "heck 0.4.1", + "heck 0.5.0", "proc-macro2", "quote", - "syn 1.0.109", + "syn", ] [[package]] @@ -2169,17 +2163,6 @@ version = "2.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0d0208408ba0c3df17ed26eb06992cb1a1268d41b2c0e12e65203fbe3972cee5" -[[package]] -name = "syn" -version = "1.0.109" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" -dependencies = [ - "proc-macro2", - "quote", - "unicode-ident", -] - [[package]] name = "syn" version = "2.0.79" @@ -2226,7 +2209,7 @@ checksum = "266b2e40bc00e5a6c09c3584011e08b06f123c00362c92b975ba9843aaaa14b8" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn", ] [[package]] @@ -2290,7 +2273,7 @@ checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn", ] [[package]] @@ -2364,7 +2347,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn", ] [[package]] @@ -2541,7 +2524,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.79", + "syn", "wasm-bindgen-shared", ] @@ -2575,7 +2558,7 @@ checksum = "f0eb82fcb7930ae6219a7ecfd55b217f5f0893484b7a13022ebb2b2bf20b5283" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -2833,7 +2816,7 @@ checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 82d1262..d90d162 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,7 +43,7 @@ futures-util = "0.3" reqwest = { version = "0.12", default-features = false, features = ["rustls-tls", "hickory-dns"] } # object_store = { version = "0.10.1", features = ["azure", "aws"] } # Pinned to a specific commit while waiting for upstream -object_store = { git = "https://github.com/andrebsguedes/arrow-rs.git", tag = "v0.10.2-beta1", features = ["azure", "aws", "experimental-azure-list-offset", "experimental-arbitrary-list-prefix"] } +object_store = { git = "https://github.com/RelationalAI/arrow-rs.git", branch = "object_store_0.11.3-beta1", features = ["azure", "aws", "experimental-azure-list-offset", "experimental-arbitrary-list-prefix"] } hickory-resolver = "0.24" thiserror = "1" anyhow = { version = "1", features = ["backtrace"] } diff --git a/src/crud_ops.rs b/src/crud_ops.rs index a0cb049..829ec17 100644 --- a/src/crud_ops.rs +++ b/src/crud_ops.rs @@ -236,18 +236,15 @@ impl Client { let index = counter_clone.fetch_add(1, Ordering::SeqCst); match result { Ok(path) => { - println!("H2 Path found: {}", path); None }, Err(e) => match e { // We treat not found as success because AWS S3 does not return an error // if the object does not exist object_store::Error::NotFound { path: _, source } => { - println!("H3 Path not found: {} {}", paths[index], source); None }, _ => { - println!("H2 Path error: {} \n {}", paths[index].clone(), e.to_string()); Some((paths[index].clone(), e.to_string())) } }, @@ -255,7 +252,17 @@ impl Client { }) .collect::>() .await; - Ok(bulk_failed_entries) + // Rail guard to catch generic errors + let callbacks_called = counter.load(Ordering::SeqCst); + if callbacks_called < paths.len() { + if callbacks_called == 0 { + Err(crate::Error::invalid_response("Some paths were not deleted")) + } else { + Err(crate::Error::invalid_response(bulk_failed_entries[0].1.clone())) + } + } else { + Ok(bulk_failed_entries) + } } pub async fn bulk_delete(&self, paths: Vec) -> crate::Result> { counter!(metrics::total_bulk_delete_ops).increment(1); From d654670919c22366d57f0d3699fa5298af25aa9d Mon Sep 17 00:00:00 2001 From: Adnan Alhomssi Date: Mon, 10 Feb 2025 19:29:10 +0100 Subject: [PATCH 4/9] PR feedback --- src/crud_ops.rs | 75 ++++++++++++++++++++++++------------------------- 1 file changed, 37 insertions(+), 38 deletions(-) diff --git a/src/crud_ops.rs b/src/crud_ops.rs index 829ec17..64732c0 100644 --- a/src/crud_ops.rs +++ b/src/crud_ops.rs @@ -46,7 +46,7 @@ impl RawResponse for Response { // ================================================================================================ // Boiler plate code for FFI structs -// Any non-copy fields of ListEntry must be properly destroyed on destroy_list_entries +// Any non-copy fields of BulkFailedEntry must be properly destroyed on destroy_bulk_failed_entries #[repr(C)] pub struct BulkFailedEntry { path: *const c_char, @@ -67,7 +67,7 @@ pub struct BulkResponse { unsafe impl Send for BulkResponse {} impl RawResponse for BulkResponse { - type Payload = Vec<(Path, String)>; + type Payload = Vec<(Path, crate::Error)>; fn result_mut(&mut self) -> &mut CResult { &mut self.result } @@ -80,8 +80,8 @@ impl RawResponse for BulkResponse { fn set_payload(&mut self, payload: Option) { match payload { Some(entries) => { - let entries = entries.into_iter().map(|(path, error_msg)| { - BulkFailedEntry::new(path, error_msg) + let entries = entries.into_iter().map(|(path, error)| { + BulkFailedEntry::new(path, error.to_string()) }).collect::>(); let entries_slice = entries.into_boxed_slice(); let entry_count = entries_slice.len() as u64; @@ -106,7 +106,7 @@ pub extern "C" fn destroy_bulk_failed_entries( ) -> CResult { let boxed_slice = unsafe { Box::from_raw(std::slice::from_raw_parts_mut(entries, entry_count as usize)) }; for entry in &*boxed_slice { - // Safety: must properly drop all allocated fields from ListEntry here + // Safety: must properly drop all allocated fields from BulkFailedEntry here let _ = unsafe { CString::from_raw(entry.path.cast_mut()) }; } CResult::Ok @@ -226,45 +226,47 @@ impl Client { with_retries!(self, self.delete_impl(path).await) } - async fn bulk_delete_impl(&self, paths: &Vec) -> crate::Result> { + async fn bulk_delete_impl(&self, paths: &Vec) -> crate::Result> { let stream = stream::iter(paths.iter().map(|path| Ok(path.clone()))).boxed(); - // Counter to keep track of the index of the path that failed to delete - let counter = Arc::new(AtomicUsize::new(0)); - let bulk_failed_entries = self.store.delete_stream(stream) - .filter_map(|result| async { - let counter_clone = Arc::clone(&counter); - let index = counter_clone.fetch_add(1, Ordering::SeqCst); - match result { - Ok(path) => { + let indexed_results = self.store.delete_stream(stream) + .enumerate() + .filter_map(|(index, result)| async move { + Some((index, result)) + }).collect::)>>().await; + // We need to count the number of callbacks called to determine if we need to raise an error + // if some paths were not processed at all. + let callbacks_called = indexed_results.len(); + let bulk_failed_entries = indexed_results.into_iter().filter_map(|(index, result)| { + match result { + Ok(_) => { + None + }, + Err(e) => match e { + // We treat not found as success because AWS S3 does not return an error + // if the object does not exist + object_store::Error::NotFound { .. } => { None }, - Err(e) => match e { - // We treat not found as success because AWS S3 does not return an error - // if the object does not exist - object_store::Error::NotFound { path: _, source } => { - None - }, - _ => { - Some((paths[index].clone(), e.to_string())) - } - }, - } - }) - .collect::>() - .await; + _ => { + Some((paths[index].clone(), e.into())) + } + }, + } + }).collect::>(); + // Rail guard to catch generic errors - let callbacks_called = counter.load(Ordering::SeqCst); if callbacks_called < paths.len() { if callbacks_called == 0 { Err(crate::Error::invalid_response("Some paths were not deleted")) } else { - Err(crate::Error::invalid_response(bulk_failed_entries[0].1.clone())) + let error_string = bulk_failed_entries[0].1.to_string(); + Err(crate::Error::invalid_response(error_string)) } } else { Ok(bulk_failed_entries) } } - pub async fn bulk_delete(&self, paths: Vec) -> crate::Result> { + pub async fn bulk_delete(&self, paths: Vec) -> crate::Result> { counter!(metrics::total_bulk_delete_ops).increment(1); with_retries!(self, self.bulk_delete_impl(&paths).await) } @@ -383,13 +385,10 @@ export_queued_op!( bulk_delete, BulkResponse, |config, response| { - let mut paths_vec:Vec = Vec::new(); - for i in 0..num_paths as isize { - let path_ptr = unsafe { *path_c_array.offset(i)}; - let path = unsafe { std::ffi::CStr::from_ptr(path_ptr) }; - let path = unsafe { cstr_to_path(path) }; - paths_vec.push(path); - } + let slice: &[*const c_char] = unsafe { std::slice::from_raw_parts(path_c_array, num_paths) }; + let paths_vec: Vec = slice.iter().map(|ptr| { + unsafe { cstr_to_path(std::ffi::CStr::from_ptr(ptr.clone())) } + }).collect(); Ok(Request::BulkDelete(paths_vec, config, response)) }, path_c_array: *const *const c_char, num_paths: usize From a25a1085af057abe7a6f098be988fe14641251e3 Mon Sep 17 00:00:00 2001 From: Adnan Alhomssi Date: Tue, 11 Feb 2025 10:55:43 +0100 Subject: [PATCH 5/9] PR feedback --- src/crud_ops.rs | 50 ++++++++++++++++++++++++------------------------- 1 file changed, 25 insertions(+), 25 deletions(-) diff --git a/src/crud_ops.rs b/src/crud_ops.rs index 64732c0..c6d09db 100644 --- a/src/crud_ops.rs +++ b/src/crud_ops.rs @@ -228,42 +228,42 @@ impl Client { async fn bulk_delete_impl(&self, paths: &Vec) -> crate::Result> { let stream = stream::iter(paths.iter().map(|path| Ok(path.clone()))).boxed(); - let indexed_results = self.store.delete_stream(stream) - .enumerate() - .filter_map(|(index, result)| async move { - Some((index, result)) - }).collect::)>>().await; + let results = self.store.delete_stream(stream) + .collect::>().await; // We need to count the number of callbacks called to determine if we need to raise an error // if some paths were not processed at all. - let callbacks_called = indexed_results.len(); - let bulk_failed_entries = indexed_results.into_iter().filter_map(|(index, result)| { - match result { - Ok(_) => { - None - }, - Err(e) => match e { - // We treat not found as success because AWS S3 does not return an error - // if the object does not exist - object_store::Error::NotFound { .. } => { + let num_results = results.len(); + let failures = results + .into_iter() + .enumerate() + .filter_map(|(index, result)| { + match result { + Ok(_) => { None }, - _ => { - Some((paths[index].clone(), e.into())) - } - }, - } - }).collect::>(); + Err(e) => match e { + // We treat not found as success because AWS S3 does not return an error + // if the object does not exist + object_store::Error::NotFound { .. } => { + None + }, + _ => { + Some((paths[index].clone(), e.into())) + } + }, + } + }).collect::>(); // Rail guard to catch generic errors - if callbacks_called < paths.len() { - if callbacks_called == 0 { + if num_results < paths.len() { + if num_results == 0 { Err(crate::Error::invalid_response("Some paths were not deleted")) } else { - let error_string = bulk_failed_entries[0].1.to_string(); + let error_string = failures[0].1.to_string(); Err(crate::Error::invalid_response(error_string)) } } else { - Ok(bulk_failed_entries) + Ok(failures) } } pub async fn bulk_delete(&self, paths: Vec) -> crate::Result> { From 2c6589d889e924deb8dd0d9f618536b1284f5b34 Mon Sep 17 00:00:00 2001 From: Adnan Alhomssi Date: Tue, 11 Feb 2025 11:00:20 +0100 Subject: [PATCH 6/9] minor --- src/crud_ops.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/crud_ops.rs b/src/crud_ops.rs index c6d09db..1b9be42 100644 --- a/src/crud_ops.rs +++ b/src/crud_ops.rs @@ -230,8 +230,8 @@ impl Client { let stream = stream::iter(paths.iter().map(|path| Ok(path.clone()))).boxed(); let results = self.store.delete_stream(stream) .collect::>().await; - // We need to count the number of callbacks called to determine if we need to raise an error - // if some paths were not processed at all. + // We count the number of results to raise an error if some paths were not + // successfully processed at all. let num_results = results.len(); let failures = results .into_iter() @@ -257,9 +257,11 @@ impl Client { // Rail guard to catch generic errors if num_results < paths.len() { if num_results == 0 { + tracing::warn!("delete_stream returned zero results"); Err(crate::Error::invalid_response("Some paths were not deleted")) } else { let error_string = failures[0].1.to_string(); + tracing::warn!("delete_stream returned a single generic error: {}", error_string); Err(crate::Error::invalid_response(error_string)) } } else { From 0e54fecb1a3158986ea2b5d2aed77c3633590cf7 Mon Sep 17 00:00:00 2001 From: Adnan Alhomssi Date: Tue, 11 Feb 2025 11:05:49 +0100 Subject: [PATCH 7/9] use RAI arrow-rs tag --- Cargo.lock | 2 +- Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f7831be..d09444b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1373,7 +1373,7 @@ dependencies = [ [[package]] name = "object_store" version = "0.11.2" -source = "git+https://github.com/RelationalAI/arrow-rs.git?branch=object_store_0.11.3-beta1#fa77acbd1e5e3acbf0824443b2c1d1df8609b457" +source = "git+https://github.com/RelationalAI/arrow-rs.git?tag=v0.11.3-beta1#fa77acbd1e5e3acbf0824443b2c1d1df8609b457" dependencies = [ "async-trait", "base64", diff --git a/Cargo.toml b/Cargo.toml index d90d162..7e63e1d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,7 +43,7 @@ futures-util = "0.3" reqwest = { version = "0.12", default-features = false, features = ["rustls-tls", "hickory-dns"] } # object_store = { version = "0.10.1", features = ["azure", "aws"] } # Pinned to a specific commit while waiting for upstream -object_store = { git = "https://github.com/RelationalAI/arrow-rs.git", branch = "object_store_0.11.3-beta1", features = ["azure", "aws", "experimental-azure-list-offset", "experimental-arbitrary-list-prefix"] } +object_store = { git = "https://github.com/RelationalAI/arrow-rs.git", tag = "v0.11.3-beta1", features = ["azure", "aws", "experimental-azure-list-offset", "experimental-arbitrary-list-prefix"] } hickory-resolver = "0.24" thiserror = "1" anyhow = { version = "1", features = ["backtrace"] } From 352fcb3258635954d3d7558b35b624d03e1d27e4 Mon Sep 17 00:00:00 2001 From: Adnan Alhomssi Date: Tue, 11 Feb 2025 11:14:10 +0100 Subject: [PATCH 8/9] also destroy error_message --- src/crud_ops.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/crud_ops.rs b/src/crud_ops.rs index 1b9be42..4ea4c97 100644 --- a/src/crud_ops.rs +++ b/src/crud_ops.rs @@ -5,7 +5,7 @@ use ::metrics::counter; use object_store::{path::Path, ObjectStore}; use tokio_util::io::StreamReader; -use std::{ffi::{c_char, c_void, CString}, sync::{atomic::{AtomicUsize, Ordering}, Arc}}; +use std::{ffi::{c_char, c_void, CString}, sync::Arc}; use futures_util::{stream, StreamExt}; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt}; @@ -60,6 +60,7 @@ pub struct BulkResponse { result: CResult, failed_entries: *const BulkFailedEntry, failed_count: u64, + // Generic error message for the whole operation. Not null implies failed_count = 0 error_message: *mut c_char, context: *const Context } @@ -108,6 +109,7 @@ pub extern "C" fn destroy_bulk_failed_entries( for entry in &*boxed_slice { // Safety: must properly drop all allocated fields from BulkFailedEntry here let _ = unsafe { CString::from_raw(entry.path.cast_mut()) }; + let _ = unsafe { CString::from_raw(entry.error_message.cast_mut()) }; } CResult::Ok } From 7c0356d65bbe921d1261e8d8bb56fefa546eedf9 Mon Sep 17 00:00:00 2001 From: Adnan Alhomssi Date: Tue, 11 Feb 2025 11:40:36 +0100 Subject: [PATCH 9/9] Bump version to 12.0 --- Cargo.lock | 2 +- Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d09444b..729a53f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1402,7 +1402,7 @@ dependencies = [ [[package]] name = "object_store_ffi" -version = "0.11.1" +version = "0.12.0" dependencies = [ "anyhow", "async-channel", diff --git a/Cargo.toml b/Cargo.toml index 7e63e1d..ecbd3e9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "object_store_ffi" -version = "0.11.1" +version = "0.12.0" edition = "2021" [[bench]]