From 7e7fcd12ae3ef465056aa5f9d2fabd61f9bcd8aa Mon Sep 17 00:00:00 2001 From: Gauri Kalra Date: Tue, 26 May 2026 08:11:48 +0000 Subject: [PATCH] fix(storage): Set the idempotency token for async rewrites --- .../async/rewriter_connection_impl.cc | 3 +- .../internal/async/rewriter_connection_impl.h | 2 + .../async/rewriter_connection_impl_test.cc | 60 +++++++++++++++++++ 3 files changed, 64 insertions(+), 1 deletion(-) diff --git a/google/cloud/storage/internal/async/rewriter_connection_impl.cc b/google/cloud/storage/internal/async/rewriter_connection_impl.cc index 6b6a987929b28..81babb43d4e5b 100644 --- a/google/cloud/storage/internal/async/rewriter_connection_impl.cc +++ b/google/cloud/storage/internal/async/rewriter_connection_impl.cc @@ -49,11 +49,12 @@ RewriterConnectionImpl::Iterate() { current_->get()->clone(), current_->get()->clone(), policy->RewriteObject(request_), cq_, - [stub = stub_]( + [stub = stub_, id = invocation_id_generator_.MakeInvocationId()]( CompletionQueue& cq, std::shared_ptr context, google::cloud::internal::ImmutableOptions options, google::storage::v2::RewriteObjectRequest const& proto) { + AddIdempotencyToken(*context, id); return stub->AsyncRewriteObject(cq, std::move(context), std::move(options), proto); }, diff --git a/google/cloud/storage/internal/async/rewriter_connection_impl.h b/google/cloud/storage/internal/async/rewriter_connection_impl.h index 06c52e85a6d04..0d79e6955f5f4 100644 --- a/google/cloud/storage/internal/async/rewriter_connection_impl.h +++ b/google/cloud/storage/internal/async/rewriter_connection_impl.h @@ -20,6 +20,7 @@ #include "google/cloud/storage/options.h" #include "google/cloud/completion_queue.h" #include "google/cloud/future.h" +#include "google/cloud/internal/invocation_id_generator.h" #include "google/cloud/status_or.h" #include "google/cloud/version.h" #include "google/storage/v2/storage.pb.h" @@ -53,6 +54,7 @@ class RewriterConnectionImpl std::shared_ptr stub_; google::cloud::internal::ImmutableOptions current_; google::storage::v2::RewriteObjectRequest request_; + google::cloud::internal::InvocationIdGenerator invocation_id_generator_; }; GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END diff --git a/google/cloud/storage/internal/async/rewriter_connection_impl_test.cc b/google/cloud/storage/internal/async/rewriter_connection_impl_test.cc index 7e37f8884a5a0..cdf74fd1ac261 100644 --- a/google/cloud/storage/internal/async/rewriter_connection_impl_test.cc +++ b/google/cloud/storage/internal/async/rewriter_connection_impl_test.cc @@ -23,6 +23,7 @@ #include "google/cloud/internal/background_threads_impl.h" #include "google/cloud/testing_util/async_sequencer.h" #include "google/cloud/testing_util/status_matchers.h" +#include "google/cloud/testing_util/validate_metadata.h" #include namespace google { @@ -238,6 +239,65 @@ TEST(RewriterConnectionImplTest, TooManyTransients) { EXPECT_THAT(r1.get(), StatusIs(TransientError().status().code())); } +TEST(RewriterConnectionImplTest, IterateReusesIdempotencyTokenOnRetry) { + google::cloud::testing_util::ValidateMetadataFixture + validate_metadata_fixture; + AsyncSequencer sequencer; + auto mock = std::make_shared(); + std::string first_token; + + EXPECT_CALL(*mock, AsyncRewriteObject) + .WillOnce([&](auto&, auto const& context, auto const&, + google::storage::v2::RewriteObjectRequest const&) { + auto metadata = validate_metadata_fixture.GetMetadata(*context); + auto l = metadata.find("x-goog-gcs-idempotency-token"); + EXPECT_NE(l, metadata.end()); + if (l != metadata.end()) { + first_token = l->second; + EXPECT_FALSE(first_token.empty()); + } + + return sequencer.PushBack("RewriteObject(1)").then([](auto) { + return TransientError(); + }); + }) + .WillOnce([&](auto&, auto const& context, auto const&, + google::storage::v2::RewriteObjectRequest const&) { + auto metadata = validate_metadata_fixture.GetMetadata(*context); + auto l = metadata.find("x-goog-gcs-idempotency-token"); + EXPECT_NE(l, metadata.end()); + if (l != metadata.end()) { + EXPECT_EQ(l->second, first_token); + } + + return sequencer.PushBack("RewriteObject(2)").then([](auto) { + google::storage::v2::RewriteResponse response; + response.set_total_bytes_rewritten(1000); + response.set_object_size(3000); + response.set_rewrite_token("test-rewrite-token"); + return make_status_or(response); + }); + }); + + internal::AutomaticallyCreatedBackgroundThreads pool(1); + auto connection = std::make_shared( + pool.cq(), std::move(mock), TestOptions(), MakeRequest()); + + auto r1 = connection->Iterate(); + auto next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "RewriteObject(1)"); + next.first.set_value(true); + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "RewriteObject(2)"); + next.first.set_value(true); + EXPECT_THAT( + r1.get(), + IsOkAndHolds(ResultOf( + "total bytes", + [](RewriteResponse const& v) { return v.total_bytes_rewritten(); }, + 1000))); +} + GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace storage_internal } // namespace cloud