From b6c14a82c060ac7cd0f234b30e8510af64638b82 Mon Sep 17 00:00:00 2001 From: Alexander Renz-Wieland Date: Tue, 19 Nov 2024 15:02:22 +0100 Subject: [PATCH 01/10] First cut: basic tests --- src/mock_server.jl | 59 ++++++++++++++++++++++++++++++++----- test/basic_unified_tests.jl | 38 ++++++++++++++++++++++-- 2 files changed, 87 insertions(+), 10 deletions(-) diff --git a/src/mock_server.jl b/src/mock_server.jl index d68d313..cebdf00 100644 --- a/src/mock_server.jl +++ b/src/mock_server.jl @@ -1,4 +1,6 @@ -using CloudBase: CloudCredentials, AWSCredentials, AbstractStore, AWS +using CloudBase: CloudCredentials, AbstractStore +using CloudBase: AWSCredentials, AWS +using CloudBase: AzureCredentials, Azure using JSON3, HTTP, Sockets, Base64 using RustyObjectStore: SnowflakeConfig, ClientOptions using Base: UUID @@ -118,6 +120,31 @@ function construct_stage_info(credentials::AWSCredentials, store::AWS.Bucket, pa ) end +function construct_stage_info(credentials::AzureCredentials, store::Azure.Container, path::String, encrypted::Bool) + m = match(r"(https?://.*?)/(.*)", store.baseurl) + @assert !isnothing(m) + test_endpoint = m.captures[1] + rest = split(HTTP.unescapeuri(m.captures[2]), "/") + account = rest[1] + container = rest[2] + + Dict( + "locationType" => "AZURE", + "location" => container * "/", + "path" => path, + "region" => "westus2", + "storageAccount" => account, + "isClientSideEncrypted" => encrypted, + "ciphers" => encrypted ? "AES_CBC" : nothing, + "creds" => Dict( + "AZURE_SAS_TOKEN" => "dummy-token", + ), + "useS3RegionalUrl" => false, + "endPoint" => "blob.core.windows.net", + "testEndpoint" => test_endpoint, + ) +end + function next_id_and_key(gw::SFGatewayMock) @lock gw.keys_lock begin key_id = gw.next_key_id @@ -216,6 +243,8 @@ function start(gw::SFGatewayMock) stage_info = if isa(gw.credentials, AWSCredentials) && isa(gw.store, AWS.Bucket) construct_stage_info(gw.credentials, gw.store, stage_path(stage), gw.encrypted) + elseif isa(gw.credentials, AzureCredentials) && isa(gw.store, Azure.Container) + construct_stage_info(gw.credentials, gw.store, stage_path(stage), gw.encrypted) else error("unimplemented") end @@ -251,18 +280,32 @@ function start(gw::SFGatewayMock) stage_info = if isa(gw.credentials, AWSCredentials) && isa(gw.store, AWS.Bucket) construct_stage_info(gw.credentials, gw.store, stage_path(stage), gw.encrypted) + elseif isa(gw.credentials, AzureCredentials) && isa(gw.store, Azure.Container) + construct_stage_info(gw.credentials, gw.store, stage_path(stage), gw.encrypted) else error("unimplemented") end encryption_material = if gw.encrypted - # fetch key id from s3 meta and return key - response = AWS.head( - stage_info["testEndpoint"] * "/" * stage_info["location"] * path; - service="s3", region="us-east-1", credentials=gw.credentials - ) - pos = findfirst(x -> x[1] == "x-amz-meta-x-amz-matdesc", response.headers) - matdesc = JSON3.read(response.headers[pos][2]) + # fetch key id from blob meta and return key + headers, metadata_key = if isa(gw.credentials, AWSCredentials) + response = AWS.head( + stage_info["testEndpoint"] * "/" * stage_info["location"] * path; + service="s3", region="us-east-1", credentials=gw.credentials + ) + response.headers, "x-amz-meta-x-amz-matdesc" + elseif isa(gw.credentials, AzureCredentials) + response = Azure.head( + stage_info["testEndpoint"] * "/" * stage_info["storageAccount"] * "/" * stage_info["location"] * path; + service="blob", region="westus2", credentials=gw.credentials + ) + println("response headers: $(response.headers)") + response.headers, "x-ms-meta-matdesc" + else + error("unknown credentials type: $(typeof(gw.credentials))") + end + pos = findfirst(x -> x[1] == metadata_key, headers) + matdesc = JSON3.read(headers[pos][2]) key_id = matdesc["queryId"] key = find_key_by_id(gw, key_id) Dict( diff --git a/test/basic_unified_tests.jl b/test/basic_unified_tests.jl index ef106d7..a9ef61a 100644 --- a/test/basic_unified_tests.jl +++ b/test/basic_unified_tests.jl @@ -776,7 +776,7 @@ Minio.with(; debug=true, public=true) do conf end # Minio.with end # @testitem -@testitem "Basic Snowflake Stage usage" setup=[InitializeObjectStore, SnowflakeMock, ReadWriteCases] begin +@testitem "Basic Snowflake Stage usage: AWS, non-encrypted" setup=[InitializeObjectStore, SnowflakeMock, ReadWriteCases] begin using CloudBase.CloudTest: Minio using RustyObjectStore: SnowflakeConfig, ClientOptions @@ -793,7 +793,7 @@ Minio.with(; debug=true, public=false) do conf end # Minio.with end # @testitem -@testitem "Basic Snowflake Stage usage (encrypted)" setup=[InitializeObjectStore, SnowflakeMock, ReadWriteCases] begin +@testitem "Basic Snowflake Stage usage: AWS, encrypted" setup=[InitializeObjectStore, SnowflakeMock, ReadWriteCases] begin using CloudBase.CloudTest: Minio using RustyObjectStore: SnowflakeConfig, ClientOptions @@ -809,3 +809,37 @@ Minio.with(; debug=true, public=false) do conf end end # Minio.with end # @testitem + +@testitem "Basic Snowflake Stage usage: Azure, non-encrypted" setup=[InitializeObjectStore, SnowflakeMock, ReadWriteCases] begin +using CloudBase.CloudTest: Azurite +using RustyObjectStore: SnowflakeConfig, ClientOptions + +# For interactive testing, use Azurite.run() instead of Azurite.with() +# conf, p = Azurite.run(; debug=true, public=false); atexit(() -> kill(p)) +Azurite.with(; debug=true, public=false) do conf + credentials, container = conf + with(SFGatewayMock(credentials, container, false)) do config::SnowflakeConfig + run_read_write_test_cases(config) + run_stream_test_cases(config) + run_list_test_cases(config) + run_sanity_test_cases(config) + end +end # Azurite.with +end # @testitem + +@testitem "Basic Snowflake Stage usage: Azure, encrypted" setup=[InitializeObjectStore, SnowflakeMock, ReadWriteCases] begin +using CloudBase.CloudTest: Azurite +using RustyObjectStore: SnowflakeConfig, ClientOptions + +# For interactive testing, use Azurite.run() instead of Azurite.with() +# conf, p = Azurite.run(; debug=true, public=false); atexit(() -> kill(p)) +Azurite.with(; debug=true, public=false) do conf + credentials, container = conf + with(SFGatewayMock(credentials, container, true)) do config::SnowflakeConfig + run_read_write_test_cases(config) + run_stream_test_cases(config) + run_list_test_cases(config; strict_entry_size=false) + run_sanity_test_cases(config) + end +end # Azurite.with +end # @testitem From 6dc75d5c10eff8a388c379d4d56a4b2c391febba Mon Sep 17 00:00:00 2001 From: Alexander Renz-Wieland Date: Tue, 19 Nov 2024 19:15:40 +0100 Subject: [PATCH 02/10] first cut at snowflake exception tests --- test/snowflake_stage_exception_tests.jl | 441 +++++++++++++++--------- 1 file changed, 271 insertions(+), 170 deletions(-) diff --git a/test/snowflake_stage_exception_tests.jl b/test/snowflake_stage_exception_tests.jl index 36f2360..29327ae 100644 --- a/test/snowflake_stage_exception_tests.jl +++ b/test/snowflake_stage_exception_tests.jl @@ -1,106 +1,154 @@ @testitem "Basic Stage exceptions" setup=[InitializeObjectStore, SnowflakeMock] begin - using CloudBase.CloudTest: Minio + using CloudBase.CloudTest: Minio, Azurite import CloudBase - using RustyObjectStore: RustyObjectStore, get_object!, put_object, ClientOptions, SnowflakeConfig - - # For interactive testing, use Minio.run() instead of Minio.with() - # conf, p = Minio.run(; debug=true, public=false); atexit(() -> kill(p)) - Minio.with(; debug=true, public=false) do conf - credentials, container = conf - with(SFGatewayMock(credentials, container, true)) do config::SnowflakeConfig - global _stale_config = config + using RustyObjectStore: RustyObjectStore, ClientOptions, SnowflakeConfig, AbstractConfig + using RustyObjectStore: get_object!, put_object, delete_object - @testset "Insufficient output buffer size" begin - input = "1,2,3,4,5,6,7,8,9,1\n" ^ 5 - buffer = Vector{UInt8}(undef, 10) - @assert sizeof(input) == 100 - @assert sizeof(buffer) < sizeof(input) + function run_basic_test_cases(config::AbstractConfig, provider::Symbol) + @testset "Insufficient output buffer size" begin + input = "1,2,3,4,5,6,7,8,9,1\n" ^ 5 + buffer = Vector{UInt8}(undef, 10) + @assert sizeof(input) == 100 + @assert sizeof(buffer) < sizeof(input) - nbytes_written = put_object(codeunits(input), "test100B.csv", config) - @test nbytes_written == 100 + nbytes_written = put_object(codeunits(input), "test100B.csv", config) + @test nbytes_written == 100 - try - nbytes_read = get_object!(buffer, "test100B.csv", config) - @test false # Should have thrown an error - catch err - @test err isa RustyObjectStore.GetException - @test occursin("Supplied buffer was too small", err.msg) - end + try + nbytes_read = get_object!(buffer, "test100B.csv", config) + @test false # Should have thrown an error + catch err + @test err isa RustyObjectStore.GetException + @test occursin("Supplied buffer was too small", err.msg) end + end - @testset "Non-existing file" begin - buffer = Vector{UInt8}(undef, 100) - try - get_object!(buffer, "doesnt_exist.csv", config) - @test false # Should have thrown an error - catch e - @test e isa RustyObjectStore.GetException - @test occursin("404 Not Found", e.msg) + @testset "Non-existing file" begin + buffer = Vector{UInt8}(undef, 100) + try + get_object!(buffer, "doesnt_exist.csv", config) + @test false # Should have thrown an error + catch e + @test e isa RustyObjectStore.GetException + @test occursin("404 Not Found", e.msg) + println("type of config: ", typeof(config)) + println("config: ", config) + if provider == :aws @test occursin("The specified key does not exist", e.msg) + else + @test occursin("The specified blob does not exist", e.msg) end end + end - @testset "Delete non-existing file" begin + @testset "Delete non-existing file" begin + if provider == :aws # S3 semantics is to return success on deleting a non-existing file, so we expect this # to succeed delete_object("doesnt_exist.csv", config) @test true - end - - @testset "Non-existing container" begin - bad_config = SnowflakeConfig( - stage="doesnotexist", - account=config.account, - database=config.database, - schema=config.schema, - endpoint=config.endpoint, - master_token_path=config.master_token_path, - opts=ClientOptions(max_retries=2) - ) - buffer = Vector{UInt8}(undef, 100) - + else try - put_object(codeunits("a,b,c"), "invalid_credentials2.csv", bad_config) + delete_object("doesnt_exist.csv", config) @test false # Should have thrown an error catch e - @test e isa RustyObjectStore.PutException - @test occursin("Stage not found", e.msg) + @test e isa RustyObjectStore.DeleteException + @test occursin("404 Not Found", e.msg) + @test occursin("The specified blob does not exist", e.msg) end + end + end - nbytes_written = put_object(codeunits("a,b,c"), "invalid_credentials2.csv", config) - @assert nbytes_written == 5 + @testset "Non-existing container" begin + bad_config = SnowflakeConfig( + stage="doesnotexist", + account=config.account, + database=config.database, + schema=config.schema, + endpoint=config.endpoint, + master_token_path=config.master_token_path, + opts=ClientOptions(max_retries=2) + ) + buffer = Vector{UInt8}(undef, 100) - try - get_object!(buffer, "invalid_credentials2.csv", bad_config) - @test false # Should have thrown an error - catch e - @test e isa RustyObjectStore.GetException - @test occursin("Stage not found", e.msg) - end + try + put_object(codeunits("a,b,c"), "invalid_credentials2.csv", bad_config) + @test false # Should have thrown an error + catch e + @test e isa RustyObjectStore.PutException + @test occursin("Stage not found", e.msg) + end + + nbytes_written = put_object(codeunits("a,b,c"), "invalid_credentials2.csv", config) + @assert nbytes_written == 5 + + try + get_object!(buffer, "invalid_credentials2.csv", bad_config) + @test false # Should have thrown an error + catch e + @test e isa RustyObjectStore.GetException + @test occursin("Stage not found", e.msg) end - end # with(SFGatewayMock) - end # Minio.with - # Minio is not running at this point - @testset "Connection error" begin - buffer = Vector{UInt8}(undef, 100) - # These test retry the connection error - try - put_object(codeunits("a,b,c"), "still_doesnt_exist.csv", _stale_config) - @test false # Should have thrown an error - catch e - @test e isa RustyObjectStore.PutException - @test occursin("Connection refused", e.msg) || occursin("Unable to access master token file", e.msg) end + end + + function run_stale_config_test_cases(stale_config::AbstractConfig) + @testset "Connection error" begin + buffer = Vector{UInt8}(undef, 100) + # These test retry the connection error + try + put_object(codeunits("a,b,c"), "still_doesnt_exist.csv", stale_config) + @test false # Should have thrown an error + catch e + @test e isa RustyObjectStore.PutException + @test occursin("Connection refused", e.msg) || occursin("Unable to access master token file", e.msg) + end - try - get_object!(buffer, "still_doesnt_exist.csv", _stale_config) - @test false # Should have thrown an error - catch e - @test e isa RustyObjectStore.GetException - @test occursin("Connection refused", e.msg) || occursin("Unable to access master token file", e.msg) + try + get_object!(buffer, "still_doesnt_exist.csv", stale_config) + @test false # Should have thrown an error + catch e + @test e isa RustyObjectStore.GetException + @test occursin("Connection refused", e.msg) || occursin("Unable to access master token file", e.msg) + end end end + @testset "aws" begin + # For interactive testing, use Minio.run() instead of Minio.with() + # conf, p = Minio.run(; debug=true, public=false); atexit(() -> kill(p)) + Minio.with(; debug=true, public=false) do conf + credentials, container = conf + with(SFGatewayMock(credentials, container, true)) do config::SnowflakeConfig + global _stale_config = config + + run_basic_test_cases(config, :aws) + + end # with(SFGatewayMock) + end # Minio.with + + # MinIO is not running at this point + run_stale_config_test_cases(_stale_config) + end + + @testset "azure" begin + # For interactive testing, use Azurite.run() instead of Azurite.with() + # conf, p = Azurite.run(; debug=true, public=false); atexit(() -> kill(p)) + Azurite.with(; debug=true, public=false) do conf + credentials, container = conf + with(SFGatewayMock(credentials, container, true)) do config::SnowflakeConfig + global _stale_config = config + + run_basic_test_cases(config, :azure) + + end # with(SFGatewayMock) + end # Azurite.with + + # Azurite is not running at this point + run_stale_config_test_cases(_stale_config) + end + + @testset "multiple start" begin res = @ccall RustyObjectStore.rust_lib.start()::Cint @test res == 1 # Rust CResult::Error @@ -118,9 +166,32 @@ end # @testitem retry_timeout_secs = 10 request_timeout_secs = 1 + function for_all_providers(f::Function) + for provider in [:aws, :azure] + @testset "$(String(provider))" begin + f(provider) + end + end + end + + function get_creds_and_container(provider, port) + @assert provider === :aws || provider === :azure + if provider == :aws + return ( + CloudBase.AWSCredentials("dummy", "dummy"), + CloudBase.AWS.Bucket("dummy", host="http://127.0.0.1:$port"), + ) + else + return ( + CloudBase.AzureCredentials("dummy", "dummy"), + CloudBase.Azure.Container("dummy", "dummy", host="http://127.0.0.1:$port"), + ) + end + end + # Starts a TCP server that will accept the connection and start reading a few # bytes before closing the connection. - function test_tcp_error(method) + function test_tcp_error(method, provider) @assert method === :GET || method === :PUT nrequests = Ref(0) @@ -138,8 +209,7 @@ end # @testitem max_retries=max_retries, retry_timeout_secs=retry_timeout_secs ) - credentials = CloudBase.AWSCredentials("dummy", "dummy") - container = CloudBase.AWS.Bucket("dummy", host="http://127.0.0.1:$port") + credentials, container = get_creds_and_container(provider, port) with(SFGatewayMock(credentials, container, true, opts=opts)) do conf::SnowflakeConfig try method === :GET && get_object!(zeros(UInt8, 5), "blob", conf) @@ -159,7 +229,7 @@ end # @testitem # Starts an HTTP server that will respond with the headers for a GET response # and will write a partial body before closing the connection. - function test_get_stream_error() + function test_get_stream_error(provider) nrequests = Ref(0) (port, tcp_server) = Sockets.listenany(8083) @@ -176,8 +246,7 @@ end # @testitem max_retries=max_retries, retry_timeout_secs=retry_timeout_secs ) - credentials = CloudBase.AWSCredentials("dummy", "dummy") - container = CloudBase.AWS.Bucket("dummy", host="http://127.0.0.1:$port") + credentials, container = get_creds_and_container(provider, port) with(SFGatewayMock(credentials, container, false, opts=opts)) do conf::SnowflakeConfig try get_object!(zeros(UInt8, 20), "blob", conf) @@ -200,7 +269,7 @@ end # @testitem # Starts a TCP server that will accept the connection and start reading a few # bytes before forcing a TCP reset on the connection. - function test_tcp_reset(method) + function test_tcp_reset(method, provider) @assert method === :GET || method === :PUT nrequests = Ref(0) @@ -223,8 +292,7 @@ end # @testitem max_retries=max_retries, retry_timeout_secs=retry_timeout_secs ) - credentials = CloudBase.AWSCredentials("dummy", "dummy") - container = CloudBase.AWS.Bucket("dummy", host="http://127.0.0.1:$port") + credentials, container = get_creds_and_container(provider, port) with(SFGatewayMock(credentials, container, true, opts=opts)) do conf::SnowflakeConfig try method === :GET && get_object!(zeros(UInt8, 5), "blob", conf) @@ -244,7 +312,7 @@ end # @testitem # Starts an HTTP server that will respond with the headers for a GET response # and will write a partial body before forcing a TCP reset on the connection. - function test_get_stream_reset() + function test_get_stream_reset(provider) nrequests = Ref(0) (port, tcp_server) = Sockets.listenany(8083) @@ -268,8 +336,7 @@ end # @testitem max_retries=max_retries, retry_timeout_secs=retry_timeout_secs ) - credentials = CloudBase.AWSCredentials("dummy", "dummy") - container = CloudBase.AWS.Bucket("dummy", host="http://127.0.0.1:$port") + credentials, container = get_creds_and_container(provider, port) with(SFGatewayMock(credentials, container, false, opts=opts)) do conf::SnowflakeConfig try get_object!(zeros(UInt8, 20), "blob", conf) @@ -288,7 +355,7 @@ end # @testitem # Starts an HTTP server that will respond with the headers for a GET response # and will write a partial body before sleeping until the client times out. - function test_get_stream_timeout() + function test_get_stream_timeout(provider) nrequests = Ref(0) (port, tcp_server) = Sockets.listenany(8083) @@ -309,8 +376,7 @@ end # @testitem retry_timeout_secs=retry_timeout_secs, request_timeout_secs ) - credentials = CloudBase.AWSCredentials("dummy", "dummy") - container = CloudBase.AWS.Bucket("dummy", host="http://127.0.0.1:$port") + credentials, container = get_creds_and_container(provider, port) with(SFGatewayMock(credentials, container, false, opts=opts)) do conf::SnowflakeConfig try get_object!(zeros(UInt8, 20), "blob", conf) @@ -330,7 +396,7 @@ end # @testitem # Starts an HTTP server that will respond with the provided status code # when receiving a request for the provided method, it optionally returns # the provided headers in the response - function test_status(method, response_status, headers=nothing) + function test_status(method, response_status, provider, headers=nothing) @assert method === :GET || method === :PUT nrequests = Ref(0) response_body = "response body from the dummy server" @@ -350,8 +416,7 @@ end # @testitem max_retries=max_retries, retry_timeout_secs=retry_timeout_secs ) - credentials = CloudBase.AWSCredentials("dummy", "dummy") - container = CloudBase.AWS.Bucket("dummy", host="http://127.0.0.1:$port") + credentials, container = get_creds_and_container(provider, port) with(SFGatewayMock(credentials, container, true, opts=opts)) do conf::SnowflakeConfig try method === :GET && get_object!(zeros(UInt8, 5), "blob", conf) @@ -372,7 +437,7 @@ end # @testitem end # Starts an HTTP server that upon receiving the request sleeps until the client times out. - function test_timeout(method, message, wait_secs::Int = 60) + function test_timeout(method, message, provider, wait_secs::Int = 60) @assert method === :GET || method === :PUT nrequests = Ref(0) response_body = "response body from the dummy server" @@ -395,8 +460,7 @@ end # @testitem retry_timeout_secs=retry_timeout_secs, request_timeout_secs ) - credentials = CloudBase.AWSCredentials("dummy", "dummy") - container = CloudBase.AWS.Bucket("dummy", host="http://127.0.0.1:$port") + credentials, container = get_creds_and_container(provider, port) with(SFGatewayMock(credentials, container, true, opts=opts)) do conf::SnowflakeConfig try method === :GET && get_object!(zeros(UInt8, 5), "blob", conf) @@ -417,7 +481,7 @@ end # @testitem # Starts an HTTP server that upon receiving the request sleeps for 5 seconds to allow # for the client to simulate a cancellation. - function test_cancellation() + function test_cancellation(provider) nrequests = Ref(0) response_body = "response body from the dummy server" @@ -437,8 +501,7 @@ end # @testitem retry_timeout_secs=10, request_timeout_secs=10 ) - credentials = CloudBase.AWSCredentials("dummy", "dummy") - container = CloudBase.AWS.Bucket("dummy", host="http://127.0.0.1:$port") + credentials, container = get_creds_and_container(provider, port) with(SFGatewayMock(credentials, container, true, opts=opts)) do conf::SnowflakeConfig try size = 7_000_000 @@ -474,78 +537,94 @@ end # @testitem # AWS S3 can also respond with this code for other unrecoverable cases such as when # an upload exceeds the maximum allowed object size # See https://www.rfc-editor.org/rfc/rfc9110#status.400 - nrequests = test_status(:GET, 400) - @test nrequests == 1 - nrequests = test_status(:PUT, 400) - @test nrequests == 1 + for_all_providers() do provider + nrequests = test_status(:GET, 400, provider) + @test nrequests == 1 + nrequests = test_status(:PUT, 400, provider) + @test nrequests == 1 + end end @testset "403: Forbidden" begin # Returned when you pass an invalid api-key. # See https://www.rfc-editor.org/rfc/rfc9110#status.403 - nrequests = test_status(:GET, 403) - @test nrequests == 1 - nrequests = test_status(:PUT, 403) - @test nrequests == 1 + for_all_providers() do provider + nrequests = test_status(:GET, 403, provider) + @test nrequests == 1 + nrequests = test_status(:PUT, 403, provider) + @test nrequests == 1 + end end @testset "404: Not Found" begin # Returned when container not found or blob not found # See https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html # See https://www.rfc-editor.org/rfc/rfc9110#status.404 - nrequests = test_status(:GET, 404) - @test nrequests == 1 + for_all_providers() do provider + nrequests = test_status(:GET, 404, provider) + @test nrequests == 1 + end end @testset "405: Method Not Supported" begin # See https://www.rfc-editor.org/rfc/rfc9110#status.405 - nrequests = test_status(:GET, 405, ["Allow" => "PUT"]) - @test nrequests == 1 - nrequests = test_status(:PUT, 405, ["Allow" => "GET"]) - @test nrequests == 1 + for_all_providers() do provider + nrequests = test_status(:GET, 405, provider, ["Allow" => "PUT"]) + @test nrequests == 1 + nrequests = test_status(:PUT, 405, provider, ["Allow" => "GET"]) + @test nrequests == 1 + end end @testset "409: Conflict" begin # Returned when write operations conflict. # See https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html # See https://www.rfc-editor.org/rfc/rfc9110#status.409 - nrequests = test_status(:GET, 409) - @test nrequests == 1 - nrequests = test_status(:PUT, 409) - @test nrequests == 1 + for_all_providers() do provider + nrequests = test_status(:GET, 409, provider) + @test nrequests == 1 + nrequests = test_status(:PUT, 409, provider) + @test nrequests == 1 + end end @testset "412: Precondition Failed" begin # Returned when an If-Match or If-None-Match header's condition evaluates to false # See https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html # See https://www.rfc-editor.org/rfc/rfc9110#status.412 - nrequests = test_status(:GET, 412) - @test nrequests == 1 - nrequests = test_status(:PUT, 412) - @test nrequests == 1 + for_all_providers() do provider + nrequests = test_status(:GET, 412, provider) + @test nrequests == 1 + nrequests = test_status(:PUT, 412, provider) + @test nrequests == 1 + end end @testset "413: Content Too Large" begin # See https://www.rfc-editor.org/rfc/rfc9110#status.413 - nrequests = test_status(:PUT, 413) - @test nrequests == 1 + for_all_providers() do provider + nrequests = test_status(:PUT, 413, provider) + @test nrequests == 1 + end end @testset "429: Too Many Requests" begin - # See https://www.rfc-editor.org/rfc/rfc6585#section-4 - nrequests = test_status(:GET, 429) - @test nrequests == 1 - nrequests = test_status(:PUT, 429) - @test nrequests == 1 - # See https://www.rfc-editor.org/rfc/rfc9110#field.retry-after - # TODO: We probably should respect the Retry-After header, but we currently don't - # (and we don't know if AWS actually sets it) - # This can happen when AWS is throttling us, so it might be a good idea to retry with some - # larger initial backoff (very eager retries probably only make the situation worse). - nrequests = test_status(:GET, 429, ["Retry-After" => 10]) - @test nrequests == 1 + max_retries broken=true - nrequests = test_status(:PUT, 429, ["Retry-After" => 10]) - @test nrequests == 1 + max_retries broken=true + for_all_providers() do provider + # See https://www.rfc-editor.org/rfc/rfc6585#section-4 + nrequests = test_status(:GET, 429, provider) + @test nrequests == 1 + nrequests = test_status(:PUT, 429, provider) + @test nrequests == 1 + # See https://www.rfc-editor.org/rfc/rfc9110#field.retry-after + # TODO: We probably should respect the Retry-After header, but we currently don't + # (and we don't know if AWS actually sets it) + # This can happen when AWS is throttling us, so it might be a good idea to retry with some + # larger initial backoff (very eager retries probably only make the situation worse). + nrequests = test_status(:GET, 429, provider, ["Retry-After" => 10]) + @test nrequests == 1 + max_retries broken=true + nrequests = test_status(:PUT, 429, provider, ["Retry-After" => 10]) + @test nrequests == 1 + max_retries broken=true + end end @testset "502: Bad Gateway" begin @@ -554,10 +633,12 @@ end # @testitem # gateway or proxy, received an invalid response from an inbound server it accessed # while attempting to fulfill the request. # This error can occur when you enter HTTP instead of HTTPS in the connection. - nrequests = test_status(:GET, 502) - @test nrequests == 1 + max_retries - nrequests = test_status(:PUT, 502) - @test nrequests == 1 + max_retries + for_all_providers() do provider + nrequests = test_status(:GET, 502, provider) + @test nrequests == 1 + max_retries + nrequests = test_status(:PUT, 502, provider) + @test nrequests == 1 + max_retries + end end @testset "503: Service Unavailable" begin @@ -568,10 +649,12 @@ end # @testitem # header field (Section 10.2.3) to suggest an appropriate amount of time for the # client to wait before retrying the request. # See https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html - nrequests = test_status(:GET, 503) - @test nrequests == 1 + max_retries - nrequests = test_status(:PUT, 503) - @test nrequests == 1 + max_retries + for_all_providers() do provider + nrequests = test_status(:GET, 503, provider) + @test nrequests == 1 + max_retries + nrequests = test_status(:PUT, 503, provider) + @test nrequests == 1 + max_retries + end end @testset "504: Gateway Timeout" begin @@ -579,50 +662,68 @@ end # @testitem # The 504 (Gateway Timeout) status code indicates that the server, while acting as # a gateway or proxy, did not receive a timely response from an upstream server it # needed to access in order to complete the request - nrequests = test_status(:GET, 504) - @test nrequests == 1 + max_retries - nrequests = test_status(:PUT, 504) - @test nrequests == 1 + max_retries + for_all_providers() do provider + nrequests = test_status(:GET, 504, provider) + @test nrequests == 1 + max_retries + nrequests = test_status(:PUT, 504, provider) + @test nrequests == 1 + max_retries + end end @testset "Timeout" begin - nrequests = test_timeout(:GET, "timed out", 2) - @test nrequests == 1 + max_retries - nrequests = test_timeout(:PUT, "timed out", 2) - @test nrequests == 1 + max_retries + for_all_providers() do provider + nrequests = test_timeout(:GET, "timed out", provider, 2) + @test nrequests == 1 + max_retries + nrequests = test_timeout(:PUT, "timed out", provider, 2) + @test nrequests == 1 + max_retries + end end @testset "TCP Closed" begin - nrequests = test_tcp_error(:GET) - @test nrequests == 1 + max_retries - nrequests = test_tcp_error(:PUT) - @test nrequests == 1 + max_retries + for_all_providers() do provider + nrequests = test_tcp_error(:GET, provider) + @test nrequests == 1 + max_retries + nrequests = test_tcp_error(:PUT, provider) + @test nrequests == 1 + max_retries + end end @testset "TCP reset" begin - nrequests = test_tcp_reset(:GET) - @test nrequests == 1 + max_retries - nrequests = test_tcp_reset(:PUT) - @test nrequests == 1 + max_retries + for_all_providers() do provider + nrequests = test_tcp_reset(:GET, provider) + @test nrequests == 1 + max_retries + nrequests = test_tcp_reset(:PUT, provider) + @test nrequests == 1 + max_retries + end end + # TODO: failing @testset "Incomplete GET body" begin - nrequests = test_get_stream_error() - @test nrequests == 1 + max_retries + for_all_providers() do provider + nrequests = test_get_stream_error(provider) + @test nrequests == 1 + max_retries + end end + # TODO: failing @testset "Incomplete GET body reset" begin - nrequests = test_get_stream_reset() - @test nrequests == 1 + max_retries + for_all_providers() do provider + nrequests = test_get_stream_reset(provider) + @test nrequests == 1 + max_retries + end end @testset "Incomplete GET body timeout" begin - nrequests = test_get_stream_timeout() - @test nrequests == 1 + max_retries + for_all_providers() do provider + nrequests = test_get_stream_timeout(provider) + @test nrequests == 1 + max_retries + end end @testset "Cancellation" begin - nrequests = test_cancellation() - @test nrequests == 1 + for_all_providers() do provider + nrequests = test_cancellation(provider) + @test nrequests == 1 + end end end From a62fe63b2a8f55a93ca29054cb2d03b825d09482 Mon Sep 17 00:00:00 2001 From: Alexander Renz-Wieland Date: Tue, 19 Nov 2024 19:30:18 +0100 Subject: [PATCH 03/10] fix stream tests --- test/snowflake_stage_exception_tests.jl | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/test/snowflake_stage_exception_tests.jl b/test/snowflake_stage_exception_tests.jl index 29327ae..a6d3d84 100644 --- a/test/snowflake_stage_exception_tests.jl +++ b/test/snowflake_stage_exception_tests.jl @@ -237,6 +237,10 @@ end # @testitem nrequests[] += 1 HTTP.setstatus(http, 200) HTTP.setheader(http, "Content-Length" => "20") + if provider == :azure + HTTP.setheader(http, "Last-Modified" => "Tue, 15 Oct 2019 12:45:26 GMT") + HTTP.setheader(http, "ETag" => "123") + end HTTP.startwrite(http) write(http, "not enough") close(http.stream) @@ -320,6 +324,10 @@ end # @testitem nrequests[] += 1 HTTP.setstatus(http, 200) HTTP.setheader(http, "Content-Length" => "20") + if provider == :azure + HTTP.setheader(http, "Last-Modified" => "Tue, 15 Oct 2019 12:45:26 GMT") + HTTP.setheader(http, "ETag" => "123") + end HTTP.startwrite(http) write(http, "not enough") socket = HTTP.IOExtras.tcpsocket(HTTP.Connections.getrawstream(http)) @@ -697,7 +705,6 @@ end # @testitem end end - # TODO: failing @testset "Incomplete GET body" begin for_all_providers() do provider nrequests = test_get_stream_error(provider) @@ -705,7 +712,6 @@ end # @testitem end end - # TODO: failing @testset "Incomplete GET body reset" begin for_all_providers() do provider nrequests = test_get_stream_reset(provider) From d01837eb76374915b262e952ac27a526f2535b0b Mon Sep 17 00:00:00 2001 From: Alexander Renz-Wieland Date: Wed, 20 Nov 2024 10:05:08 +0100 Subject: [PATCH 04/10] cleanup --- Project.toml | 4 ++-- src/mock_server.jl | 1 - 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/Project.toml b/Project.toml index e9c7ef4..57cf96d 100644 --- a/Project.toml +++ b/Project.toml @@ -1,6 +1,6 @@ name = "RustyObjectStore" uuid = "1b5eed3d-1f46-4baa-87f3-a4a892b23610" -version = "0.10.0" +version = "0.11.0" [deps] Base64 = "2a0f44e3-6c83-55bd-87e4-b1978d98bd5f" @@ -20,7 +20,7 @@ ReTestItems = "1" Sockets = "1" Test = "1" julia = "1.8" -object_store_ffi_jll = "0.10.0" +object_store_ffi_jll = "0.11.0" [extras] CloudBase = "85eb1798-d7c4-4918-bb13-c944d38e27ed" diff --git a/src/mock_server.jl b/src/mock_server.jl index cebdf00..9ae568d 100644 --- a/src/mock_server.jl +++ b/src/mock_server.jl @@ -299,7 +299,6 @@ function start(gw::SFGatewayMock) stage_info["testEndpoint"] * "/" * stage_info["storageAccount"] * "/" * stage_info["location"] * path; service="blob", region="westus2", credentials=gw.credentials ) - println("response headers: $(response.headers)") response.headers, "x-ms-meta-matdesc" else error("unknown credentials type: $(typeof(gw.credentials))") From e5e3a734516f46dc25984dff5cb843a029677bb3 Mon Sep 17 00:00:00 2001 From: Alexander Renz-Wieland Date: Sat, 23 Nov 2024 09:01:55 +0100 Subject: [PATCH 05/10] bump as object_store_ffi v0.11.0 should be available now --- src/mock_server.jl | 1 + 1 file changed, 1 insertion(+) diff --git a/src/mock_server.jl b/src/mock_server.jl index 9ae568d..2368de1 100644 --- a/src/mock_server.jl +++ b/src/mock_server.jl @@ -128,6 +128,7 @@ function construct_stage_info(credentials::AzureCredentials, store::Azure.Contai account = rest[1] container = rest[2] + Dict( "locationType" => "AZURE", "location" => container * "/", From d8299dd9820c8d613c3edafc8a792864a636d558 Mon Sep 17 00:00:00 2001 From: Alexander Renz-Wieland Date: Sat, 23 Nov 2024 09:34:37 +0100 Subject: [PATCH 06/10] Update src/mock_server.jl MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: André Guedes --- src/mock_server.jl | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/src/mock_server.jl b/src/mock_server.jl index 2368de1..c563e1b 100644 --- a/src/mock_server.jl +++ b/src/mock_server.jl @@ -121,12 +121,8 @@ function construct_stage_info(credentials::AWSCredentials, store::AWS.Bucket, pa end function construct_stage_info(credentials::AzureCredentials, store::Azure.Container, path::String, encrypted::Bool) - m = match(r"(https?://.*?)/(.*)", store.baseurl) - @assert !isnothing(m) - test_endpoint = m.captures[1] - rest = split(HTTP.unescapeuri(m.captures[2]), "/") - account = rest[1] - container = rest[2] + ok, test_endpoint, account, container, _path = CloudStore.parseAzureAccountContainerBlob(store.baseurl; parseLocal=true) + ok || error("failed to parse Azurite baseurl") Dict( From 09d5dd17758f8ab911fd3f02d83c32e60d12475c Mon Sep 17 00:00:00 2001 From: Alexander Renz-Wieland Date: Sat, 23 Nov 2024 09:50:08 +0100 Subject: [PATCH 07/10] PR feedback --- src/mock_server.jl | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/src/mock_server.jl b/src/mock_server.jl index c563e1b..7d287f8 100644 --- a/src/mock_server.jl +++ b/src/mock_server.jl @@ -120,15 +120,18 @@ function construct_stage_info(credentials::AWSCredentials, store::AWS.Bucket, pa ) end -function construct_stage_info(credentials::AzureCredentials, store::Azure.Container, path::String, encrypted::Bool) - ok, test_endpoint, account, container, _path = CloudStore.parseAzureAccountContainerBlob(store.baseurl; parseLocal=true) - ok || error("failed to parse Azurite baseurl") - +function construct_stage_info(credentials::AzureCredentials, store::Azure.Container, encrypted::Bool) + m = match(r"(https?://.*?)/(.*)", store.baseurl) + @assert !isnothing(m) + test_endpoint = m.captures[1] + rest = split(HTTP.unescapeuri(m.captures[2]), "/") + account = rest[1] + container = rest[2] Dict( "locationType" => "AZURE", "location" => container * "/", - "path" => path, + "path" => container * "/", "region" => "westus2", "storageAccount" => account, "isClientSideEncrypted" => encrypted, @@ -241,7 +244,7 @@ function start(gw::SFGatewayMock) stage_info = if isa(gw.credentials, AWSCredentials) && isa(gw.store, AWS.Bucket) construct_stage_info(gw.credentials, gw.store, stage_path(stage), gw.encrypted) elseif isa(gw.credentials, AzureCredentials) && isa(gw.store, Azure.Container) - construct_stage_info(gw.credentials, gw.store, stage_path(stage), gw.encrypted) + construct_stage_info(gw.credentials, gw.store, gw.encrypted) else error("unimplemented") end @@ -278,7 +281,7 @@ function start(gw::SFGatewayMock) stage_info = if isa(gw.credentials, AWSCredentials) && isa(gw.store, AWS.Bucket) construct_stage_info(gw.credentials, gw.store, stage_path(stage), gw.encrypted) elseif isa(gw.credentials, AzureCredentials) && isa(gw.store, Azure.Container) - construct_stage_info(gw.credentials, gw.store, stage_path(stage), gw.encrypted) + construct_stage_info(gw.credentials, gw.store, gw.encrypted) else error("unimplemented") end From e9c59a8c5683cf7b2a610b399535156690eddb19 Mon Sep 17 00:00:00 2001 From: Alexander Renz-Wieland Date: Mon, 25 Nov 2024 16:59:21 +0100 Subject: [PATCH 08/10] use parseAzureAccountContainerBlob --- src/mock_server.jl | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/src/mock_server.jl b/src/mock_server.jl index 7d287f8..8fccda6 100644 --- a/src/mock_server.jl +++ b/src/mock_server.jl @@ -1,4 +1,4 @@ -using CloudBase: CloudCredentials, AbstractStore +using CloudBase: CloudBase, CloudCredentials, AbstractStore using CloudBase: AWSCredentials, AWS using CloudBase: AzureCredentials, Azure using JSON3, HTTP, Sockets, Base64 @@ -121,12 +121,9 @@ function construct_stage_info(credentials::AWSCredentials, store::AWS.Bucket, pa end function construct_stage_info(credentials::AzureCredentials, store::Azure.Container, encrypted::Bool) - m = match(r"(https?://.*?)/(.*)", store.baseurl) - @assert !isnothing(m) - test_endpoint = m.captures[1] - rest = split(HTTP.unescapeuri(m.captures[2]), "/") - account = rest[1] - container = rest[2] + ok, _service, test_endpoint, account, container, _path = + CloudBase.parseAzureAccountContainerBlob(rstrip(store.baseurl, '/'); parseLocal=true) + ok || error("failed to parse Azurite baseurl") Dict( "locationType" => "AZURE", From 48640d7200c6251cbd965ede31f7084d45d74ea4 Mon Sep 17 00:00:00 2001 From: Alexander Renz-Wieland Date: Mon, 25 Nov 2024 17:28:16 +0100 Subject: [PATCH 09/10] temp --- src/mock_server.jl | 1 + 1 file changed, 1 insertion(+) diff --git a/src/mock_server.jl b/src/mock_server.jl index 8fccda6..4de3a59 100644 --- a/src/mock_server.jl +++ b/src/mock_server.jl @@ -121,6 +121,7 @@ function construct_stage_info(credentials::AWSCredentials, store::AWS.Bucket, pa end function construct_stage_info(credentials::AzureCredentials, store::Azure.Container, encrypted::Bool) + println("store.baseurl: $(store.baseurl)") # TEMP ok, _service, test_endpoint, account, container, _path = CloudBase.parseAzureAccountContainerBlob(rstrip(store.baseurl, '/'); parseLocal=true) ok || error("failed to parse Azurite baseurl") From a5601b95b783bd86a3d521cdbca0faa06a5d794b Mon Sep 17 00:00:00 2001 From: Alexander Renz-Wieland Date: Tue, 26 Nov 2024 08:25:10 +0100 Subject: [PATCH 10/10] switch back to manual parsing --- src/mock_server.jl | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/mock_server.jl b/src/mock_server.jl index 4de3a59..52d5c6b 100644 --- a/src/mock_server.jl +++ b/src/mock_server.jl @@ -121,10 +121,12 @@ function construct_stage_info(credentials::AWSCredentials, store::AWS.Bucket, pa end function construct_stage_info(credentials::AzureCredentials, store::Azure.Container, encrypted::Bool) - println("store.baseurl: $(store.baseurl)") # TEMP - ok, _service, test_endpoint, account, container, _path = - CloudBase.parseAzureAccountContainerBlob(rstrip(store.baseurl, '/'); parseLocal=true) - ok || error("failed to parse Azurite baseurl") + m = match(r"(https?://.*?)/(.*)", store.baseurl) + @assert !isnothing(m) + test_endpoint = m.captures[1] + rest = split(HTTP.unescapeuri(m.captures[2]), "/") + account = rest[1] + container = rest[2] Dict( "locationType" => "AZURE",