Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Project.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
name = "RustyObjectStore"
uuid = "1b5eed3d-1f46-4baa-87f3-a4a892b23610"
version = "0.11.1"
version = "0.12.3"

[deps]
Base64 = "2a0f44e3-6c83-55bd-87e4-b1978d98bd5f"
Expand All @@ -20,7 +20,7 @@ ReTestItems = "1"
Sockets = "1"
Test = "1"
julia = "1.8"
object_store_ffi_jll = "0.11.1"
object_store_ffi_jll = "0.12.3"

[extras]
CloudBase = "85eb1798-d7c4-4918-bb13-c944d38e27ed"
Expand Down
102 changes: 102 additions & 0 deletions src/RustyObjectStore.jl
Original file line number Diff line number Diff line change
Expand Up @@ -746,6 +746,13 @@ struct DeleteException <: RequestException

DeleteException(msg) = new(msg, rust_message_to_reason(msg))
end
# Raised when a bulk delete fails as a whole, i.e. the error cannot be attributed to
# any single one of the paths that were requested for deletion.
# `reason` is derived from `msg` via `rust_message_to_reason`.
struct BulkDeleteException <: RequestException
    msg::String
    reason::ErrorReason

    function BulkDeleteException(msg)
        return new(msg, rust_message_to_reason(msg))
    end
end
struct ListException <: RequestException
msg::String
reason::ErrorReason
Expand Down Expand Up @@ -1058,6 +1065,101 @@ function delete_object(path::String, conf::AbstractConfig)
end
end

# =========================================================================================
# Bulk Delete
# C-compatible record describing a single path whose deletion failed, as read from the
# `failed_entries` array of a `BulkResponseFFI`.
# Both fields are C strings; `convert_bulk_failed_entry` copies them into Julia `String`s.
# NOTE(review): field order and types presumably mirror the struct on the native side —
# confirm before reordering or retyping.
struct BulkFailedEntryFFI
path::Cstring
error_message::Cstring
end

# Julia-side counterpart of `BulkFailedEntryFFI`: one element of the vector returned by
# `bulk_delete_objects`.
# - `path`: the path of the object that could not be deleted.
# - `error_message`: the error text reported for that path.
struct BulkFailedEntry
path::String
error_message::String
end

# Copy the two C strings of an FFI failure record into Julia-owned `String`s and wrap
# them in a `BulkFailedEntry`.
function convert_bulk_failed_entry(entry::BulkFailedEntryFFI)
    failed_path = unsafe_string(entry.path)
    failure_text = unsafe_string(entry.error_message)
    return BulkFailedEntry(failed_path, failure_text)
end

# Response slot handed by reference to `rust_lib.bulk_delete`.
# A fresh instance starts with `result == -1`, null pointers and a zero failure count.
# NOTE(review): the field order presumably has to match the native struct layout —
# confirm before reordering.
mutable struct BulkResponseFFI
    result::Cint
    failed_entries::Ptr{BulkFailedEntryFFI}
    failed_count::Culonglong
    error_message::Ptr{Cchar}
    context::Ptr{Cvoid}

    function BulkResponseFFI()
        return new(-1, C_NULL, 0, C_NULL, C_NULL)
    end
end

"""
    bulk_delete_objects(paths, conf)

Send a bulk delete request to the object store.

# Arguments
- `paths::Vector{String}`: The locations of the objects to delete.
- `conf::AbstractConfig`: The configuration to use for the request.
  It includes credentials and other client options.

# Returns
- `Vector{BulkFailedEntry}`: One entry (path and error message) for every object whose
  deletion failed. The vector is empty when no failures were reported.

# Throws
- `BulkDeleteException`: If the request as a whole fails, i.e. the error is not specific
  to one of the requested paths.
  Note that deletions of non-existing objects will be treated as success.
"""
function bulk_delete_objects(paths::Vector{String}, conf::AbstractConfig)
    response = BulkResponseFFI()
    ct = current_task()
    event = Base.Event()
    handle = pointer_from_objref(event)
    config = into_config(conf)
    while true
        result = GC.@preserve paths config response event begin
            preserve_task(ct)
            try
                # Pass a pointer to the array of pointers to the Cstrings
                result = @ccall rust_lib.bulk_delete(
                    paths::Ptr{Ptr{Cchar}},
                    length(paths)::Cuint,
                    config::Ref{Config},
                    response::Ref{BulkResponseFFI},
                    handle::Ptr{Cvoid}
                )::Cint

                wait_or_cancel(event, response)

                result
            finally
                unpreserve_task(ct)
            end
        end

        if result == 2
            # backoff
            sleep(0.01)
            continue
        end

        @throw_on_error(response, "bulk_delete", BulkDeleteException)

        # Nothing failed: return a typed empty vector. `BulkFailedEntry[]` has the same
        # element type as the non-empty branch, whereas `Vector{BulkFailedEntry}[]` would
        # build an empty vector *of vectors*.
        response.failed_count > 0 || return BulkFailedEntry[]

        try
            raw_entries = unsafe_wrap(Array, response.failed_entries, response.failed_count)
            # `convert_bulk_failed_entry` copies the C strings into Julia-owned memory, so
            # the result stays valid after the native buffer is released below.
            return map(convert_bulk_failed_entry, raw_entries)
        finally
            # Always hand the buffer back to the native library, even if the conversion
            # above throws, so the failed-entries array is not leaked.
            @ccall rust_lib.destroy_bulk_failed_entries(
                response.failed_entries::Ptr{BulkFailedEntryFFI},
                response.failed_count::Culonglong
            )::Cint
        end
    end
end
# =========================================================================================

mutable struct ReadResponseFFI
result::Cint
length::Culonglong
Expand Down
110 changes: 109 additions & 1 deletion test/azure_blobs_exception_tests.jl
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
@testitem "Basic BlobStorage exceptions" setup=[InitializeObjectStore] begin
using CloudBase.CloudTest: Azurite
import CloudBase
using RustyObjectStore: RustyObjectStore, get_object!, put_object, ClientOptions, AzureConfig, AWSConfig
import CloudBase
import HTTP
import Sockets

# For interactive testing, use Azurite.run() instead of Azurite.with()
# conf, p = Azurite.run(; debug=true, public=false); atexit(() -> kill(p))
Expand Down Expand Up @@ -173,6 +175,112 @@
@test occursin("The specified resource does not exist.", e.msg)
end
end

@testset "bulk_delete_objects exceptions" begin
# Checks two things against a local mock HTTP server:
#   1. a failure reported for an individual path is returned as a failed entry, and
#   2. a malformed batch response raises a `BulkDeleteException`.
#
# A mock is required because partial failures (only some of the requested deletes
# failing) have to be simulated.
# The canned multipart body below contains three sub-responses:
#   Content-ID 0 -> 202 Accepted
#   Content-ID 1 -> 403 Forbidden
#   Content-ID 2 -> 404 BlobNotFound
# NOTE(review): the 403 part carries an extra `Time:...</Message></Error>` line after its
# closing `</Error>`, which looks like a copy/paste leftover — confirm it is intentional.
crafted_res =
"""--batchresponse_66925647-d0cb-4109-b6d3-28efe3e1e5ed\r
Content-Type: application/http\r
Content-ID: 0\r
\r
HTTP/1.1 202 Accepted\r
x-ms-delete-type-permanent: true\r
x-ms-request-id: 778fdc83-801e-0000-62ff-0334671e284f\r
x-ms-version: 2018-11-09\r
\r
--batchresponse_66925647-d0cb-4109-b6d3-28efe3e1e5ed\r
Content-Type: application/http\r
Content-ID: 1\r
\r
HTTP/1.1 403 Forbidden\r
Content-Length: 223\r
Content-Type: application/xml\r
Server: Microsoft-HTTPAPI/2.0\r
x-ms-request-id: 12345678-90ab-cdef-1234-567890abcdef\r
x-ms-version: 2021-12-02\r
Date: Wed, 07 Feb 2025 12:34:56 GMT\r
\r
<?xml version="1.0" encoding="utf-8"?>\r
<Error>\r
<Code>AuthorizationPermissionMismatch</Code>\r
<Message>\r
This request is not authorized to perform this operation using this permission.\r
RequestId: 12345678-90ab-cdef-1234-567890abcdef\r
Time: 2025-02-07T12:34:56.000Z\r
</Message>\r
</Error>\r
Time:2018-06-14T16:46:54.6040685Z</Message></Error>\r
\r
--batchresponse_66925647-d0cb-4109-b6d3-28efe3e1e5ed\r
Content-Type: application/http\r
Content-ID: 2\r
\r
HTTP/1.1 404 The specified blob does not exist.\r
x-ms-error-code: BlobNotFound\r
x-ms-request-id: 778fdc83-801e-0000-62ff-0334671e2852\r
x-ms-version: 2018-11-09\r
Content-Length: 216\r
Content-Type: application/xml\r
\r
<?xml version=\"1.0\" encoding=\"utf-8\"?>\r
<Error><Code>BlobNotFound</Code><Message>The specified blob does not exist.\r
RequestId:778fdc83-801e-0000-62ff-0334671e2852\r
Time:2018-06-14T16:46:54.6040685Z</Message></Error>\r
--batchresponse_66925647-d0cb-4109-b6d3-28efe3e1e5ed--\r
"""
# The headers live in a `Ref` so they can be swapped out later in this test while the
# request handler below keeps reading the current value through `headers[]`.
headers = Ref([
"Transfer-Encoding" => "chunked",
"Content-Type" => "multipart/mixed; boundary=batchresponse_66925647-d0cb-4109-b6d3-28efe3e1e5ed",
"x-ms-request-id" => "778fdc83-801e-0000-62ff-033467000000",
"x-ms-version" => "2018-11-09",
])
# Listen on an available port, using 8081 as the starting hint.
(port, tcp_server) = Sockets.listenany(8081)
# Every request, whatever its content, is answered with the canned batch response.
http_server = HTTP.serve!(tcp_server) do request::HTTP.Request
return HTTP.Response(
200,
headers[],
crafted_res,
)
end

# Point the client at the mock server instead of the real service.
# `_credentials` and `_container` are defined outside this testset — presumably by the
# surrounding Azurite setup; verify against the enclosing test item.
mock_baseurl = "http://127.0.0.1:$port/account/container/"
mock_config = AzureConfig(;
storage_account_name=_credentials.auth.account,
container_name=_container.name,
storage_account_key=_credentials.auth.key,
host=mock_baseurl
)

# Only "b" (the 403 sub-response) is expected to come back as a failed entry; the 404
# for "c" is not expected to be reported.
# NOTE(review): if this call throws, `http_server` is never closed because the
# `close` only happens in the `finally` of the second call below.
failed_entries = RustyObjectStore.bulk_delete_objects(
["a", "b", "c"],
mock_config,
)
@test length(failed_entries) == 1
@test failed_entries[1].path == "b"
@test occursin("Forbidden (code: 403)", first(failed_entries).error_message)

# Drop all response headers so the reply is no longer a valid batch response. This
# must produce a generic exception that is independent of the paths we asked to delete.
headers[] = []
try
failed_entries = RustyObjectStore.bulk_delete_objects(
["a", "b", "c"],
mock_config,
)
# should throw because the response is invalid as it misses the
# Content-Type header
@test false
catch e
@test e isa RustyObjectStore.BulkDeleteException
@test occursin("Got invalid bulk delete response", e.msg)
@test e.reason == RustyObjectStore.UnknownError()
finally
close(http_server)
end
# Block until the mock server has finished shutting down.
wait(http_server)
end
end # Azurite.with
# Azurite is not running at this point
@testset "Connection error" begin
Expand Down
30 changes: 28 additions & 2 deletions test/basic_unified_tests.jl
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
@testsetup module ReadWriteCases
using RustyObjectStore: get_object!, put_object, get_object_stream, put_object_stream,
AbstractConfig, delete_object, list_objects, list_objects_stream, next_chunk!, finish!
AbstractConfig, bulk_delete_objects, delete_object,
list_objects, list_objects_stream, next_chunk!, finish!
using CodecZlib
using RustyObjectStore

Expand Down Expand Up @@ -475,6 +476,32 @@ function run_read_write_test_cases(read_config::AbstractConfig, write_config::Ab
end
end

@testset "bulk_delete_objects" begin
    # Round trip: upload two 100-byte objects, remove both with a single bulk delete,
    # then verify that neither can be read any more.
    payload = "1,2,3,4,5,6,7,8,9,1\n" ^ 5
    scratch = Vector{UInt8}(undef, 100)
    @assert sizeof(payload) == 100
    @assert sizeof(scratch) == sizeof(payload)

    keys_to_delete = ["test100B.csv", "test200B.csv"]
    for key in keys_to_delete
        @test put_object(codeunits(payload), key, write_config) == 100
    end

    # No per-path failures are expected for objects that were just written.
    @test isempty(bulk_delete_objects(keys_to_delete, write_config))

    for key in keys_to_delete
        try
            get_object!(scratch, key, read_config)
            @test false # should throw
        catch err
            @test err isa RustyObjectStore.GetException
            @test occursin("not found", err.msg)
        end
    end
end

# Large files should use multipart upload / download requests
@testset "20MB file, 20MB buffer" begin
input = "1,2,3,4,5,6,7,8,9,1\n" ^ 1_000_000
Expand Down Expand Up @@ -686,7 +713,6 @@ Azurite.with(; debug=true, public=false) do conf

run_sanity_test_cases(config_padded)
end # Azurite.with

end # @testitem

# NOTE: PUT on azure always requires credentials, while GET on public containers doesn't
Expand Down