diff --git a/Project.toml b/Project.toml
index fab7bef..56d676c 100644
--- a/Project.toml
+++ b/Project.toml
@@ -1,6 +1,6 @@
 name = "RustyObjectStore"
 uuid = "1b5eed3d-1f46-4baa-87f3-a4a892b23610"
-version = "0.11.1"
+version = "0.12.3"
 
 [deps]
 Base64 = "2a0f44e3-6c83-55bd-87e4-b1978d98bd5f"
@@ -20,7 +20,7 @@ ReTestItems = "1"
 Sockets = "1"
 Test = "1"
 julia = "1.8"
-object_store_ffi_jll = "0.11.1"
+object_store_ffi_jll = "0.12.3"
 
 [extras]
 CloudBase = "85eb1798-d7c4-4918-bb13-c944d38e27ed"
diff --git a/src/RustyObjectStore.jl b/src/RustyObjectStore.jl
index a6f6f00..4d98058 100644
--- a/src/RustyObjectStore.jl
+++ b/src/RustyObjectStore.jl
@@ -746,6 +746,13 @@ struct DeleteException <: RequestException
 
     DeleteException(msg) = new(msg, rust_message_to_reason(msg))
 end
+# Thrown for request-level bulk delete failures not tied to any single path to delete
+struct BulkDeleteException <: RequestException
+    msg::String
+    reason::ErrorReason
+
+    BulkDeleteException(msg) = new(msg, rust_message_to_reason(msg))
+end
 struct ListException <: RequestException
     msg::String
     reason::ErrorReason
@@ -1058,6 +1065,112 @@ function delete_object(path::String, conf::AbstractConfig)
     end
 end
 
+# =========================================================================================
+# Bulk Delete
+# FFI layout of one failed delete (element type of `BulkResponseFFI.failed_entries`).
+struct BulkFailedEntryFFI
+    path::Cstring
+    error_message::Cstring
+end
+
+# Julia-side copy of a failed delete: the object path and the error reported for it.
+struct BulkFailedEntry
+    path::String
+    error_message::String
+end
+
+# Copy both C strings of an FFI entry into a `BulkFailedEntry`.
+function convert_bulk_failed_entry(entry::BulkFailedEntryFFI)
+    return BulkFailedEntry(
+        unsafe_string(entry.path),
+        unsafe_string(entry.error_message),
+    )
+end
+
+# Response slot handed by reference to `rust_lib.bulk_delete`.
+mutable struct BulkResponseFFI
+    result::Cint
+    failed_entries::Ptr{BulkFailedEntryFFI}
+    failed_count::Culonglong
+    error_message::Ptr{Cchar}
+    context::Ptr{Cvoid}
+
+    BulkResponseFFI() = new(-1, C_NULL, 0, C_NULL, C_NULL)
+end
+
+"""
+    bulk_delete_objects(paths, conf) -> Vector{BulkFailedEntry}
+
+Send a bulk delete request to the object store.
+
+# Arguments
+- `paths::Vector{String}`: The locations of the objects to delete.
+- `conf::AbstractConfig`: The configuration to use for the request.
+  It includes credentials and other client options.
+
+# Returns
+- `Vector{BulkFailedEntry}`: One entry (path and error message) per object that
+  could not be deleted; empty when no failed entries were reported.
+
+# Throws
+- `BulkDeleteException`: If the request as a whole fails, independently of any
+  single path. Failures of individual paths are returned, not thrown.
+  Note that deletions of non-existing objects will be treated as success.
+"""
+function bulk_delete_objects(paths::Vector{String}, conf::AbstractConfig)
+    response = BulkResponseFFI()
+    ct = current_task()
+    event = Base.Event()
+    handle = pointer_from_objref(event)
+    config = into_config(conf)
+    while true
+        result = GC.@preserve paths config response event begin
+            preserve_task(ct)
+            try
+                # Pass a pointer to the array of pointers to the Cstrings
+                result = @ccall rust_lib.bulk_delete(
+                    paths::Ptr{Ptr{Cchar}},
+                    length(paths)::Cuint,
+                    config::Ref{Config},
+                    response::Ref{BulkResponseFFI},
+                    handle::Ptr{Cvoid}
+                )::Cint
+
+                wait_or_cancel(event, response)
+
+                result
+            finally
+                unpreserve_task(ct)
+            end
+        end
+
+        if result == 2
+            # backoff: retry the request after a short pause
+            sleep(0.01)
+            continue
+        end
+
+        @throw_on_error(response, "bulk_delete", BulkDeleteException)
+
+        # No per-path failures: return an empty vector with the same element type as
+        # the failure branch so the result is always a `Vector{BulkFailedEntry}`.
+        response.failed_count > 0 || return BulkFailedEntry[]
+
+        # Copy the entries into Julia strings before the foreign buffer is destroyed;
+        # `finally` makes sure it is destroyed even if the conversion throws.
+        try
+            raw_entries = unsafe_wrap(Array, response.failed_entries, response.failed_count)
+            return map(convert_bulk_failed_entry, raw_entries)
+        finally
+            @ccall rust_lib.destroy_bulk_failed_entries(
+                response.failed_entries::Ptr{BulkFailedEntryFFI},
+                response.failed_count::Culonglong
+            )::Cint
+        end
+    end
+end
+# =========================================================================================
+
 mutable struct ReadResponseFFI
     result::Cint
     length::Culonglong
diff --git a/test/azure_blobs_exception_tests.jl b/test/azure_blobs_exception_tests.jl
index 2bb6603..22ed43d 100644
--- a/test/azure_blobs_exception_tests.jl
+++ b/test/azure_blobs_exception_tests.jl
@@ -1,7 +1,9 @@
 @testitem "Basic BlobStorage exceptions" setup=[InitializeObjectStore] begin
     using CloudBase.CloudTest: Azurite
-    import CloudBase
     using RustyObjectStore: RustyObjectStore, get_object!, put_object, ClientOptions, AzureConfig, AWSConfig
+    import CloudBase
+    import HTTP
+    import Sockets
 
     # For interactive testing, use Azurite.run() instead of Azurite.with()
     # conf, p = Azurite.run(; debug=true, public=false); atexit(() -> kill(p))
@@ -173,6 +175,115 @@
                 @test occursin("The specified resource does not exist.", e.msg)
             end
         end
+
+        @testset "bulk_delete_objects exceptions" begin
+            # We have to mock to simulate partial failures for some of the
+            # requested deletes
+crafted_res =
+"""--batchresponse_66925647-d0cb-4109-b6d3-28efe3e1e5ed\r
+Content-Type: application/http\r
+Content-ID: 0\r
+\r
+HTTP/1.1 202 Accepted\r
+x-ms-delete-type-permanent: true\r
+x-ms-request-id: 778fdc83-801e-0000-62ff-0334671e284f\r
+x-ms-version: 2018-11-09\r
+\r
+--batchresponse_66925647-d0cb-4109-b6d3-28efe3e1e5ed\r
+Content-Type: application/http\r
+Content-ID: 1\r
+\r
+HTTP/1.1 403 Forbidden\r
+Content-Length: 223\r
+Content-Type: application/xml\r
+Server: Microsoft-HTTPAPI/2.0\r
+x-ms-request-id: 12345678-90ab-cdef-1234-567890abcdef\r
+x-ms-version: 2021-12-02\r
+Date: Wed, 07 Feb 2025 12:34:56 GMT\r
+\r
+\r
+\r
+ AuthorizationPermissionMismatch\r
+ \r
+ This request is not authorized to perform this operation using this permission.\r
+ RequestId: 12345678-90ab-cdef-1234-567890abcdef\r
+ Time: 2025-02-07T12:34:56.000Z\r
+ \r
+\r
+Time:2018-06-14T16:46:54.6040685Z\r
+\r
+--batchresponse_66925647-d0cb-4109-b6d3-28efe3e1e5ed\r
+Content-Type: application/http\r
+Content-ID: 2\r
+\r
+HTTP/1.1 404 The specified blob does not exist.\r
+x-ms-error-code: BlobNotFound\r
+x-ms-request-id: 778fdc83-801e-0000-62ff-0334671e2852\r
+x-ms-version: 2018-11-09\r
+Content-Length: 216\r
+Content-Type: application/xml\r
+\r
+\r
+BlobNotFoundThe specified blob does not exist.\r
+RequestId:778fdc83-801e-0000-62ff-0334671e2852\r
+Time:2018-06-14T16:46:54.6040685Z\r
+--batchresponse_66925647-d0cb-4109-b6d3-28efe3e1e5ed--\r
+"""
+            headers = Ref([
+                "Transfer-Encoding" => "chunked",
+                "Content-Type" => "multipart/mixed; boundary=batchresponse_66925647-d0cb-4109-b6d3-28efe3e1e5ed",
+                "x-ms-request-id" => "778fdc83-801e-0000-62ff-033467000000",
+                "x-ms-version" => "2018-11-09",
+            ])
+            (port, tcp_server) = Sockets.listenany(8081)
+            http_server = HTTP.serve!(tcp_server) do request::HTTP.Request
+                return HTTP.Response(
+                    200,
+                    headers[],
+                    crafted_res,
+                )
+            end
+
+            # Shut the mock server down on every exit path, including an unexpected
+            # exception from the first (partial failure) request.
+            try
+                mock_baseurl = "http://127.0.0.1:$port/account/container/"
+                mock_config = AzureConfig(;
+                    storage_account_name=_credentials.auth.account,
+                    container_name=_container.name,
+                    storage_account_key=_credentials.auth.key,
+                    host=mock_baseurl
+                )
+
+                failed_entries = RustyObjectStore.bulk_delete_objects(
+                    ["a", "b", "c"],
+                    mock_config,
+                )
+                @test length(failed_entries) == 1
+                @test failed_entries[1].path == "b"
+                @test occursin("Forbidden (code: 403)", first(failed_entries).error_message)
+
+                # Corrupt response headers to generate a generic exception independent of the paths
+                # we asked to delete
+                headers[] = []
+                try
+                    failed_entries = RustyObjectStore.bulk_delete_objects(
+                        ["a", "b", "c"],
+                        mock_config,
+                    )
+                    # should throw because the response is invalid as it misses the
+                    # Content-Type header
+                    @test false
+                catch e
+                    @test e isa RustyObjectStore.BulkDeleteException
+                    @test occursin("Got invalid bulk delete response", e.msg)
+                    @test e.reason == RustyObjectStore.UnknownError()
+                end
+            finally
+                close(http_server)
+                wait(http_server)
+            end
+        end
     end # Azurite.with
     # Azurite is not running at this point
     @testset "Connection error" begin
diff --git a/test/basic_unified_tests.jl b/test/basic_unified_tests.jl
index a9ef61a..b4b1b97 100644
--- a/test/basic_unified_tests.jl
+++ b/test/basic_unified_tests.jl
@@ -1,6 +1,7 @@
 @testsetup module ReadWriteCases
 using RustyObjectStore: get_object!, put_object, get_object_stream, put_object_stream,
-    AbstractConfig, delete_object, list_objects, list_objects_stream, next_chunk!, finish!
+    AbstractConfig, bulk_delete_objects, delete_object,
+    list_objects, list_objects_stream, next_chunk!, finish!
 using CodecZlib
 using RustyObjectStore
 
@@ -475,6 +476,32 @@ function run_read_write_test_cases(read_config::AbstractConfig, write_config::Ab
         end
     end
 
+    @testset "bulk_delete_objects" begin
+        input = "1,2,3,4,5,6,7,8,9,1\n" ^ 5
+        buffer = Vector{UInt8}(undef, 100)
+        @assert sizeof(input) == 100
+        @assert sizeof(buffer) == sizeof(input)
+
+        object_names = ["test100B.csv", "test200B.csv"]
+        for name in object_names
+            nbytes_written = put_object(codeunits(input), name, write_config)
+            @test nbytes_written == 100
+        end
+
+        failed_entries = bulk_delete_objects(object_names, write_config)
+        @test isempty(failed_entries)
+
+        for name in object_names
+            try
+                nbytes_read = get_object!(buffer, name, read_config)
+                @test false # should throw
+            catch e
+                @test e isa RustyObjectStore.GetException
+                @test occursin("not found", e.msg)
+            end
+        end
+    end
+
     # Large files should use multipart upload / download requests
     @testset "20MB file, 20MB buffer" begin
         input = "1,2,3,4,5,6,7,8,9,1\n" ^ 1_000_000
@@ -686,7 +713,6 @@ Azurite.with(; debug=true, public=false) do conf
     run_sanity_test_cases(config_padded)
 end # Azurite.with
 
-
 end # @testitem
 
 # NOTE: PUT on azure always requires credentials, while GET on public containers doesn't