From 8b4ff5ef05895f9671727de0211f9058f495b99e Mon Sep 17 00:00:00 2001 From: Adnan Alhomssi Date: Wed, 5 Feb 2025 00:35:46 +0100 Subject: [PATCH 1/8] draft --- src/RustyObjectStore.jl | 96 +++++++++++++++++++++++++++++++++++++ test/basic_unified_tests.jl | 49 +++++++++++++++++++ 2 files changed, 145 insertions(+) diff --git a/src/RustyObjectStore.jl b/src/RustyObjectStore.jl index b72780a..7ce46f0 100644 --- a/src/RustyObjectStore.jl +++ b/src/RustyObjectStore.jl @@ -905,6 +905,102 @@ function delete_object(path::String, conf::AbstractConfig) end end +# ========================================================================================= +# Bulk Delete +struct BulkFailedEntryFFI + path::Cstring + error_message::Cstring +end + +struct BulkFailedEntry + path::String + error_message::String +end + +function convert_bulk_failed_entry(entry::BulkFailedEntryFFI) + return BulkFailedEntry( + unsafe_string(entry.path), + unsafe_string(entry.error_message), + ) +end + +mutable struct BulkResponseFFI + result::Cint + failed_entries::Ptr{BulkFailedEntryFFI} + failed_count::Culonglong + error_message::Ptr{Cchar} + context::Ptr{Cvoid} + + BulkResponseFFI() = new(-1, C_NULL, 0, C_NULL, C_NULL) +end + +""" + bulk_delete_objects(path, conf) + +Send a delete request to the object store. + +# Arguments +- `path::String`: The location of the object to delete. +- `conf::AbstractConfig`: The configuration to use for the request. + It includes credentials and other client options. + +# Throws +- `DeleteException`: If the request fails for any reason. Note that S3 will treat a delete request + to a non-existing object as a success, while Azure Blob will treat it as a 404 error. +""" +function bulk_delete_objects(paths::Vector{String}, conf::AbstractConfig) + response = BulkResponseFFI() + ct = current_task() + event = Base.Event() + handle = pointer_from_objref(event) + config = into_config(conf) + while true + preserve_task(ct) + # Convert Julia strings to Cstrings + c_paths = [Base.cconvert(Cstring, path) for path in paths] + # Create an array of pointers to the Cstrings + paths_array = [pointer(c_path) for c_path in c_paths] + result = GC.@preserve paths c_paths paths_array config response event try + # Pass a pointer to the array of pointers to the Cstrings + c_paths_ptr = pointer(paths_array) + result = @ccall rust_lib.bulk_delete( + c_paths_ptr::Ptr{Ptr{Cchar}}, + length(paths)::Cuint, + config::Ref{Config}, + response::Ref{BulkResponseFFI}, + handle::Ptr{Cvoid} + )::Cint + + wait_or_cancel(event, response) + + result + finally + unpreserve_task(ct) + end + + if result == 2 + # backoff + sleep(0.01) + continue + end + + entries = if response.failed_count > 0 + raw_entries = unsafe_wrap(Array, response.failed_entries, response.failed_count) + vector = map(convert_bulk_failed_entry, raw_entries) + @ccall rust_lib.destroy_bulk_failed_entries( + response.failed_entries::Ptr{BulkFailedEntryFFI}, + response.failed_count::Culonglong + )::Cint + vector + else + Vector{BulkFailedEntry}[] + end + + return entries + end +end +# ========================================================================================= + mutable struct ReadResponseFFI result::Cint length::Culonglong diff --git a/test/basic_unified_tests.jl b/test/basic_unified_tests.jl index a7cae70..7078a3c 100644 --- a/test/basic_unified_tests.jl +++ b/test/basic_unified_tests.jl @@ -679,6 +679,55 @@ end # Azurite.with end # @testitem + +@testitem "playground" setup=[InitializeObjectStore, ReadWriteCases] begin +using CloudBase.CloudTest: Azurite +using RustyObjectStore: AzureConfig, ClientOptions + + +# For interactive testing, use Azurite.run() instead of Azurite.with() +# conf, p = Azurite.run(; debug=true, public=false); atexit(() -> kill(p)) +Azurite.with(; debug=true, public=false) do conf + _credentials, _container = conf + base_url = _container.baseurl + config = AzureConfig(; + storage_account_name=_credentials.auth.account, + container_name=_container.name, + storage_account_key=_credentials.auth.key, + host=base_url + ) + + @testset "delete_object" begin + input = "1,2,3,4,5,6,7,8,9,1\n" ^ 5 + buffer = Vector{UInt8}(undef, 100) + @assert sizeof(input) == 100 + @assert sizeof(buffer) == sizeof(input) + + nbytes_written = put_object(codeunits(input), "test100B.csv", config) + @test nbytes_written == 100 + + failed_entries = RustyObjectStore.bulk_delete_objects( + ["test100B.csv", "not_to_be_found.csv"], + config, + ) + # Even though `not_to_be_found.csv` doesn't exist, it's not an error + # to try to delete it. This intentional behavior because AWS S3 doesn't + # return an error when trying to delete a non-existent object. + @test length(failed_entries) == 0 + + try + nbytes_read = get_object!(buffer, "test100B.csv", config) + @test false # should throw + catch e + @test e isa RustyObjectStore.GetException + @test occursin("not found", e.msg) + end + end + +end # Azurite.with + +end # @testitem + # NOTE: PUT on azure always requires credentials, while GET on public containers doesn't @testitem "Basic BlobStorage usage (anonymous read enabled)" setup=[InitializeObjectStore, ReadWriteCases] begin using CloudBase.CloudTest: Azurite From 796b372b55e73773a4cd8af17f9d5e315379603b Mon Sep 17 00:00:00 2001 From: Adnan Alhomssi Date: Wed, 5 Feb 2025 00:53:24 +0100 Subject: [PATCH 2/8] BulkDeleteException to catch generic exceptions --- src/RustyObjectStore.jl | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/RustyObjectStore.jl b/src/RustyObjectStore.jl index 7ce46f0..0589db6 100644 --- a/src/RustyObjectStore.jl +++ b/src/RustyObjectStore.jl @@ -596,6 +596,13 @@ struct DeleteException <: RequestException DeleteException(msg) = new(msg, rust_message_to_reason(msg)) end +# Used for generic exceptions that are not specific to one of the to be deleted paths +struct BulkDeleteException <: RequestException + msg::String + reason::ErrorReason + + BulkDeleteException(msg) = new(msg, rust_message_to_reason(msg)) +end struct ListException <: RequestException msg::String reason::ErrorReason @@ -984,6 +991,8 @@ function bulk_delete_objects(paths::Vector{String}, conf::AbstractConfig) continue end + @throw_on_error(response, "bulk_delete", BulkDeleteException) + entries = if response.failed_count > 0 raw_entries = unsafe_wrap(Array, response.failed_entries, response.failed_count) vector = map(convert_bulk_failed_entry, raw_entries) From 52a0eebd91468efb51eceaa4797df2e181b8b3ca Mon Sep 17 00:00:00 2001 From: Adnan Alhomssi Date: Fri, 7 Feb 2025 18:18:20 +0100 Subject: [PATCH 3/8] test cases --- test/azure_blobs_exception_tests.jl | 109 +++++++++++++++++++++++++++- test/basic_unified_tests.jl | 79 +++++++------------- 2 files changed, 136 insertions(+), 52 deletions(-) diff --git a/test/azure_blobs_exception_tests.jl b/test/azure_blobs_exception_tests.jl index f2dbeb6..b5047bc 100644 --- a/test/azure_blobs_exception_tests.jl +++ b/test/azure_blobs_exception_tests.jl @@ -1,7 +1,9 @@ @testitem "Basic BlobStorage exceptions" setup=[InitializeObjectStore] begin using CloudBase.CloudTest: Azurite - import CloudBase using RustyObjectStore: RustyObjectStore, get_object!, put_object, ClientOptions, AzureConfig, AWSConfig + import CloudBase + import HTTP + import Sockets # For interactive testing, use Azurite.run() instead of Azurite.with() # conf, p = Azurite.run(; debug=true, public=false); atexit(() -> kill(p)) @@ -173,6 +175,111 @@ @test occursin("The specified resource does not exist.", e.msg) end end + + @testset "bulk_delete_objects exceptions" begin + # We have to mock to simulate partial failures for some of the + # requested deletes +crafted_res = +"""--batchresponse_66925647-d0cb-4109-b6d3-28efe3e1e5ed\r +Content-Type: application/http\r +Content-ID: 0\r +\r +HTTP/1.1 202 Accepted\r +x-ms-delete-type-permanent: true\r +x-ms-request-id: 778fdc83-801e-0000-62ff-0334671e284f\r +x-ms-version: 2018-11-09\r +\r +--batchresponse_66925647-d0cb-4109-b6d3-28efe3e1e5ed\r +Content-Type: application/http\r +Content-ID: 1\r +\r +HTTP/1.1 403 Forbidden\r +Content-Length: 223\r +Content-Type: application/xml\r +Server: Microsoft-HTTPAPI/2.0\r +x-ms-request-id: 12345678-90ab-cdef-1234-567890abcdef\r +x-ms-version: 2021-12-02\r +Date: Wed, 07 Feb 2025 12:34:56 GMT\r +\r +\r +\r + AuthorizationPermissionMismatch\r + \r + This request is not authorized to perform this operation using this permission.\r + RequestId: 12345678-90ab-cdef-1234-567890abcdef\r + Time: 2025-02-07T12:34:56.000Z\r + \r +\r +Time:2018-06-14T16:46:54.6040685Z\r +\r +--batchresponse_66925647-d0cb-4109-b6d3-28efe3e1e5ed\r +Content-Type: application/http\r +Content-ID: 2\r +\r +HTTP/1.1 404 The specified blob does not exist.\r +x-ms-error-code: BlobNotFound\r +x-ms-request-id: 778fdc83-801e-0000-62ff-0334671e2852\r +x-ms-version: 2018-11-09\r +Content-Length: 216\r +Content-Type: application/xml\r +\r +\r +BlobNotFoundThe specified blob does not exist.\r +RequestId:778fdc83-801e-0000-62ff-0334671e2852\r +Time:2018-06-14T16:46:54.6040685Z\r +--batchresponse_66925647-d0cb-4109-b6d3-28efe3e1e5ed--\r +""" + headers = Ref([ + "Transfer-Encoding" => "chunked", + "Content-Type" => "multipart/mixed; boundary=batchresponse_66925647-d0cb-4109-b6d3-28efe3e1e5ed", + "x-ms-request-id" => "778fdc83-801e-0000-62ff-033467000000", + "x-ms-version" => "2018-11-09", + ]) + (port, tcp_server) = Sockets.listenany(8081) + http_server = HTTP.serve!(tcp_server) do request::HTTP.Request + return HTTP.Response( + 200, + headers[], + crafted_res, + ) + end + + mock_baseurl = "http://127.0.0.1:$port/account/container/" + mock_config = AzureConfig(; + storage_account_name=_credentials.auth.account, + container_name=_container.name, + storage_account_key=_credentials.auth.key, + host=mock_baseurl + ) + + failed_entries = RustyObjectStore.bulk_delete_objects( + ["a", "b", "c"], + mock_config, + ) + @test length(failed_entries) == 1 + @test occursin("Forbidden (code: 403)", first(failed_entries).error_message) + + # Corrupt response headers to generate a generic exception independent of the paths + # we asked to delete + headers[] = [] + try + failed_entries = RustyObjectStore.bulk_delete_objects( + ["a", "b", "c"], + mock_config, + ) + # should throw because the response is invalid as it misses the + # Content-Type header + @test false + catch e + @test e isa RustyObjectStore.BulkDeleteException + @test occursin("Got invalid bulk delete response", e.msg) + @test e.reason == RustyObjectStore.UnknownError() + finally + close(http_server) + end + wait(http_server) + # Test + end end # Azurite.with # Azurite is not running at this point @testset "Connection error" begin diff --git a/test/basic_unified_tests.jl b/test/basic_unified_tests.jl index 7078a3c..3633ee0 100644 --- a/test/basic_unified_tests.jl +++ b/test/basic_unified_tests.jl @@ -1,6 +1,7 @@ @testsetup module ReadWriteCases using RustyObjectStore: get_object!, put_object, get_object_stream, put_object_stream, - AbstractConfig, delete_object, list_objects, list_objects_stream, next_chunk!, finish! + AbstractConfig, bulk_delete_objects, delete_object, + list_objects, list_objects_stream, next_chunk!, finish! using CodecZlib using RustyObjectStore @@ -475,6 +476,32 @@ function run_read_write_test_cases(read_config::AbstractConfig, write_config::Ab end end + @testset "bulk_delete_objects" begin + input = "1,2,3,4,5,6,7,8,9,1\n" ^ 5 + buffer = Vector{UInt8}(undef, 100) + @assert sizeof(input) == 100 + @assert sizeof(buffer) == sizeof(input) + + object_names = ["test100B.csv", "test200B.csv"] + for name in object_names + nbytes_written = put_object(codeunits(input), name, write_config) + @test nbytes_written == 100 + end + + failed_entries = bulk_delete_objects(object_names, write_config) + @test isempty(failed_entries) + + for name in object_names + try + nbytes_read = get_object!(buffer, name, read_config) + @test false # should throw + catch e + @test e isa RustyObjectStore.GetException + @test occursin("not found", e.msg) + end + end + end + # Large files should use multipart upload / download requests @testset "20MB file, 20MB buffer" begin input = "1,2,3,4,5,6,7,8,9,1\n" ^ 1_000_000 @@ -676,56 +703,6 @@ Azurite.with(; debug=true, public=false) do conf run_sanity_test_cases(config_padded) end # Azurite.with - -end # @testitem - - -@testitem "playground" setup=[InitializeObjectStore, ReadWriteCases] begin -using CloudBase.CloudTest: Azurite -using RustyObjectStore: AzureConfig, ClientOptions - - -# For interactive testing, use Azurite.run() instead of Azurite.with() -# conf, p = Azurite.run(; debug=true, public=false); atexit(() -> kill(p)) -Azurite.with(; debug=true, public=false) do conf - _credentials, _container = conf - base_url = _container.baseurl - config = AzureConfig(; - storage_account_name=_credentials.auth.account, - container_name=_container.name, - storage_account_key=_credentials.auth.key, - host=base_url - ) - - @testset "delete_object" begin - input = "1,2,3,4,5,6,7,8,9,1\n" ^ 5 - buffer = Vector{UInt8}(undef, 100) - @assert sizeof(input) == 100 - @assert sizeof(buffer) == sizeof(input) - - nbytes_written = put_object(codeunits(input), "test100B.csv", config) - @test nbytes_written == 100 - - failed_entries = RustyObjectStore.bulk_delete_objects( - ["test100B.csv", "not_to_be_found.csv"], - config, - ) - # Even though `not_to_be_found.csv` doesn't exist, it's not an error - # to try to delete it. This intentional behavior because AWS S3 doesn't - # return an error when trying to delete a non-existent object. - @test length(failed_entries) == 0 - - try - nbytes_read = get_object!(buffer, "test100B.csv", config) - @test false # should throw - catch e - @test e isa RustyObjectStore.GetException - @test occursin("not found", e.msg) - end - end - -end # Azurite.with - end # @testitem # NOTE: PUT on azure always requires credentials, while GET on public containers doesn't From bf063d10f3a9d2c0aa1b9dfe98011eba2afdcbff Mon Sep 17 00:00:00 2001 From: Adnan Alhomssi Date: Tue, 11 Feb 2025 12:22:40 +0100 Subject: [PATCH 4/8] PR feedback --- src/RustyObjectStore.jl | 47 +++++++++++++++++++---------------------- 1 file changed, 22 insertions(+), 25 deletions(-) diff --git a/src/RustyObjectStore.jl b/src/RustyObjectStore.jl index 0589db6..b67a808 100644 --- a/src/RustyObjectStore.jl +++ b/src/RustyObjectStore.jl @@ -944,16 +944,16 @@ end """ bulk_delete_objects(path, conf) -Send a delete request to the object store. +Send a bulk delete request to the object store. # Arguments -- `path::String`: The location of the object to delete. +- `paths::Vector{String}`: The locations of the objects to delete. - `conf::AbstractConfig`: The configuration to use for the request. It includes credentials and other client options. # Throws -- `DeleteException`: If the request fails for any reason. Note that S3 will treat a delete request - to a non-existing object as a success, while Azure Blob will treat it as a 404 error. +- `BulkDeleteException`: If the request fails for any reason. + Note that deletions of non-existing objects will be treated as success. """ function bulk_delete_objects(paths::Vector{String}, conf::AbstractConfig) response = BulkResponseFFI() @@ -962,27 +962,24 @@ function bulk_delete_objects(paths::Vector{String}, conf::AbstractConfig) handle = pointer_from_objref(event) config = into_config(conf) while true - preserve_task(ct) - # Convert Julia strings to Cstrings - c_paths = [Base.cconvert(Cstring, path) for path in paths] - # Create an array of pointers to the Cstrings - paths_array = [pointer(c_path) for c_path in c_paths] - result = GC.@preserve paths c_paths paths_array config response event try - # Pass a pointer to the array of pointers to the Cstrings - c_paths_ptr = pointer(paths_array) - result = @ccall rust_lib.bulk_delete( - c_paths_ptr::Ptr{Ptr{Cchar}}, - length(paths)::Cuint, - config::Ref{Config}, - response::Ref{BulkResponseFFI}, - handle::Ptr{Cvoid} - )::Cint - - wait_or_cancel(event, response) - - result - finally - unpreserve_task(ct) + result = GC.@preserve paths config response event begin + preserve_task(ct) + try + # Pass a pointer to the array of pointers to the Cstrings + result = @ccall rust_lib.bulk_delete( + paths::Ptr{Ptr{Cchar}}, + length(paths)::Cuint, + config::Ref{Config}, + response::Ref{BulkResponseFFI}, + handle::Ptr{Cvoid} + )::Cint + + wait_or_cancel(event, response) + + result + finally + unpreserve_task(ct) + end end if result == 2 From 24c211ece9449cdefa5a0c02a7fafee57267df76 Mon Sep 17 00:00:00 2001 From: Adnan Alhomssi Date: Tue, 11 Feb 2025 20:31:12 +0100 Subject: [PATCH 5/8] minor test --- test/azure_blobs_exception_tests.jl | 1 + 1 file changed, 1 insertion(+) diff --git a/test/azure_blobs_exception_tests.jl b/test/azure_blobs_exception_tests.jl index 4525635..22ed43d 100644 --- a/test/azure_blobs_exception_tests.jl +++ b/test/azure_blobs_exception_tests.jl @@ -257,6 +257,7 @@ Time:2018-06-14T16:46:54.6040685Z\r mock_config, ) @test length(failed_entries) == 1 + @test failed_entries[1].path == "b" @test occursin("Forbidden (code: 403)", first(failed_entries).error_message) # Corrupt response headers to generate a generic exception independent of the paths From 7edc067d0a4a0c7a1563fc97c0f19d561b4a3fc9 Mon Sep 17 00:00:00 2001 From: Adnan Alhomssi Date: Tue, 11 Feb 2025 22:25:39 +0100 Subject: [PATCH 6/8] update object_store_ffi_jll --- Project.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Project.toml b/Project.toml index fab7bef..2d1a6c5 100644 --- a/Project.toml +++ b/Project.toml @@ -20,7 +20,7 @@ ReTestItems = "1" Sockets = "1" Test = "1" julia = "1.8" -object_store_ffi_jll = "0.11.1" +object_store_ffi_jll = "0.12.2" [extras] CloudBase = "85eb1798-d7c4-4918-bb13-c944d38e27ed" From 71701ca10d6f8eddb447fcbf958f8a473e13adbd Mon Sep 17 00:00:00 2001 From: Adnan Alhomssi Date: Wed, 12 Feb 2025 01:02:45 +0100 Subject: [PATCH 7/8] bump RustyObjectStore version --- Project.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Project.toml b/Project.toml index 2d1a6c5..2b1d163 100644 --- a/Project.toml +++ b/Project.toml @@ -1,6 +1,6 @@ name = "RustyObjectStore" uuid = "1b5eed3d-1f46-4baa-87f3-a4a892b23610" -version = "0.11.1" +version = "0.12.2" [deps] Base64 = "2a0f44e3-6c83-55bd-87e4-b1978d98bd5f" From a13f10365435046bddf31d57d6555e6f82981680 Mon Sep 17 00:00:00 2001 From: Andre Guedes Date: Fri, 28 Feb 2025 23:52:26 -0300 Subject: [PATCH 8/8] [HOTFIX] workaround Azure token expiration (version bump) --- Project.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Project.toml b/Project.toml index 2b1d163..56d676c 100644 --- a/Project.toml +++ b/Project.toml @@ -1,6 +1,6 @@ name = "RustyObjectStore" uuid = "1b5eed3d-1f46-4baa-87f3-a4a892b23610" -version = "0.12.2" +version = "0.12.3" [deps] Base64 = "2a0f44e3-6c83-55bd-87e4-b1978d98bd5f" @@ -20,7 +20,7 @@ ReTestItems = "1" Sockets = "1" Test = "1" julia = "1.8" -object_store_ffi_jll = "0.12.2" +object_store_ffi_jll = "0.12.3" [extras] CloudBase = "85eb1798-d7c4-4918-bb13-c944d38e27ed"