From 13ce63fe02393d98601d53dc599e70bed3ca7f3d Mon Sep 17 00:00:00 2001 From: Vaibhav Pratap Date: Tue, 26 May 2026 07:13:05 +0000 Subject: [PATCH 1/5] fix(storage): C++ SDK might be replaying more writes than necessary on write stream reconnects --- .../async/writer_connection_resumed.cc | 24 ++-- .../async/writer_connection_resumed_test.cc | 111 ++++++++++++++++++ 2 files changed, 128 insertions(+), 7 deletions(-) diff --git a/google/cloud/storage/internal/async/writer_connection_resumed.cc b/google/cloud/storage/internal/async/writer_connection_resumed.cc index 0dd2a6ace9897..e1bee17c5529f 100644 --- a/google/cloud/storage/internal/async/writer_connection_resumed.cc +++ b/google/cloud/storage/internal/async/writer_connection_resumed.cc @@ -441,14 +441,24 @@ class AsyncWriterConnectionResumedState return SetError(std::move(lk), std::move(res).status()); } // Regular resume attempt succeeded. Check state. - auto state = impl_->PersistedState(); - if (absl::holds_alternative(state)) { - // Found finalized object (maybe finalized concurrently or resumed). - return SetFinalized(std::move(lk), absl::get( - std::move(state))); + std::int64_t persisted_offset = 0; + if (res->first_response.has_resource()) { + if (!res->first_response.has_write_handle()) { + // Found finalized object (maybe finalized concurrently or resumed). + return SetFinalized(std::move(lk), res->first_response.resource()); + } + persisted_offset = res->first_response.resource().size(); + } else if (res->first_response.has_persisted_size()) { + persisted_offset = res->first_response.persisted_size(); + } else { + auto state = impl_->PersistedState(); + if (absl::holds_alternative(state)) { + // Found finalized object (maybe finalized concurrently or resumed). + return SetFinalized(std::move(lk), absl::get( + std::move(state))); + } + persisted_offset = absl::get(state); } - // Regular resume succeeded, object not finalized. Continue writing. - auto persisted_offset = absl::get(state); auto checksums = impl_->PersistedChecksums(); diff --git a/google/cloud/storage/internal/async/writer_connection_resumed_test.cc b/google/cloud/storage/internal/async/writer_connection_resumed_test.cc index dcebb80ac8c63..542a0d3c7cc0f 100644 --- a/google/cloud/storage/internal/async/writer_connection_resumed_test.cc +++ b/google/cloud/storage/internal/async/writer_connection_resumed_test.cc @@ -733,6 +733,117 @@ TEST(WriterConnectionResumed, ResetWriteOffsetOnResume) { EXPECT_THAT(write.get(), StatusIs(StatusCode::kOk)); } +TEST(WriterConnectionResumed, ResumeUsesSizeFromFirstResponse) { + AsyncSequencer sequencer; + auto mock = std::make_unique(); + auto* mock_ptr = mock.get(); + + auto initial_request = google::storage::v2::BidiWriteObjectRequest{}; + google::storage::v2::BidiWriteObjectResponse first_response; + first_response.mutable_write_handle()->set_handle("initial-handle"); + + auto mock_hash = + std::make_shared(); + EXPECT_CALL(*mock_hash, Update(::testing::An(), + ::testing::An(), + ::testing::An())) + .WillRepeatedly(Return(Status())); + + EXPECT_CALL(*mock_ptr, PersistedState) + .WillOnce(Return(MakePersistedState(0))); + + auto const payload = TestPayload(2048); + + EXPECT_CALL(*mock_ptr, Flush(_)).WillOnce([&](auto) { + return sequencer.PushBack("Flush").then([](auto f) { + if (f.get()) return Status{}; + return TransientError(); + }); + }); + + MockFactory mock_factory; + auto mock_stream = + std::make_unique>(); + auto* mock_stream_ptr = mock_stream.get(); + + EXPECT_CALL(mock_factory, Call(_)) + .WillOnce([&](google::storage::v2::BidiWriteObjectRequest const&) { + WriteObject::WriteResult result; + result.stream = std::move(mock_stream); + result.first_response.mutable_write_handle()->set_handle("new-handle"); + result.first_response.set_persisted_size(2048); + return sequencer.PushBack("Factory").then( + [r = std::move(result)](auto) mutable { + return StatusOr(std::move(r)); + }); + }); + + EXPECT_CALL(*mock_stream_ptr, Write(_, _)) + .WillOnce([&](google::storage::v2::BidiWriteObjectRequest const& request, + grpc::WriteOptions) { + EXPECT_EQ(request.write_offset(), 2048); + return sequencer.PushBack("StateLookupWrite").then([](auto) { return true; }); + }) + .WillOnce([&](google::storage::v2::BidiWriteObjectRequest const& request, + grpc::WriteOptions) { + EXPECT_TRUE(GetContent(request.checksummed_data()).empty()); + EXPECT_TRUE(request.flush()); + return sequencer.PushBack("GhostWrite").then([](auto) { return true; }); + }); + + google::storage::v2::BidiWriteObjectResponse read_response; + read_response.set_persisted_size(2048); + EXPECT_CALL(*mock_stream_ptr, Read) + .WillOnce([&, read_response]() { + return sequencer.PushBack("StreamRead1").then([read_response](auto) { + return absl::make_optional(read_response); + }); + }) + .WillOnce([&, read_response]() { + return sequencer.PushBack("StreamRead2").then([read_response](auto) { + return absl::make_optional(read_response); + }); + }); + + EXPECT_CALL(*mock_stream_ptr, Finish) + .WillOnce(Return(make_ready_future(Status{}))); + EXPECT_CALL(*mock_stream_ptr, Cancel).WillRepeatedly(Return()); + + auto connection = MakeWriterConnectionResumed( + mock_factory.AsStdFunction(), std::move(mock), initial_request, mock_hash, + first_response, Options{}); + + auto write = connection->Write(payload); + + auto next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Flush"); + next.first.set_value(false); + + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Factory"); + next.first.set_value(true); + + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "StateLookupWrite"); + next.first.set_value(true); + + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "StreamRead1"); + next.first.set_value(true); + + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "GhostWrite"); + next.first.set_value(true); + + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "StreamRead2"); + next.first.set_value(true); + + EXPECT_THAT(write.get(), StatusIs(StatusCode::kOk)); +} + } // namespace GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace storage_internal From f725eb737bd35078789830969cecddda18481dbc Mon Sep 17 00:00:00 2001 From: Vaibhav Pratap Date: Tue, 26 May 2026 09:06:26 +0000 Subject: [PATCH 2/5] fix the format --- .../storage/internal/async/writer_connection_resumed.cc | 5 +++-- .../storage/internal/async/writer_connection_resumed_test.cc | 4 +++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/google/cloud/storage/internal/async/writer_connection_resumed.cc b/google/cloud/storage/internal/async/writer_connection_resumed.cc index e1bee17c5529f..f8a5dfdc51abf 100644 --- a/google/cloud/storage/internal/async/writer_connection_resumed.cc +++ b/google/cloud/storage/internal/async/writer_connection_resumed.cc @@ -454,8 +454,9 @@ class AsyncWriterConnectionResumedState auto state = impl_->PersistedState(); if (absl::holds_alternative(state)) { // Found finalized object (maybe finalized concurrently or resumed). - return SetFinalized(std::move(lk), absl::get( - std::move(state))); + return SetFinalized( + std::move(lk), + absl::get(std::move(state))); } persisted_offset = absl::get(state); } diff --git a/google/cloud/storage/internal/async/writer_connection_resumed_test.cc b/google/cloud/storage/internal/async/writer_connection_resumed_test.cc index 542a0d3c7cc0f..6c33bb2d7051c 100644 --- a/google/cloud/storage/internal/async/writer_connection_resumed_test.cc +++ b/google/cloud/storage/internal/async/writer_connection_resumed_test.cc @@ -784,7 +784,9 @@ TEST(WriterConnectionResumed, ResumeUsesSizeFromFirstResponse) { .WillOnce([&](google::storage::v2::BidiWriteObjectRequest const& request, grpc::WriteOptions) { EXPECT_EQ(request.write_offset(), 2048); - return sequencer.PushBack("StateLookupWrite").then([](auto) { return true; }); + return sequencer.PushBack("StateLookupWrite").then([](auto) { + return true; + }); }) .WillOnce([&](google::storage::v2::BidiWriteObjectRequest const& request, grpc::WriteOptions) { From ddfdfd60ad020857296a58418762fdaba49a8498 Mon Sep 17 00:00:00 2001 From: Vaibhav Pratap Date: Tue, 26 May 2026 09:31:12 +0000 Subject: [PATCH 3/5] resolve ai comments --- .../cloud/storage/internal/async/writer_connection_resumed.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/google/cloud/storage/internal/async/writer_connection_resumed.cc b/google/cloud/storage/internal/async/writer_connection_resumed.cc index f8a5dfdc51abf..7bfcd7c86b576 100644 --- a/google/cloud/storage/internal/async/writer_connection_resumed.cc +++ b/google/cloud/storage/internal/async/writer_connection_resumed.cc @@ -445,7 +445,8 @@ class AsyncWriterConnectionResumedState if (res->first_response.has_resource()) { if (!res->first_response.has_write_handle()) { // Found finalized object (maybe finalized concurrently or resumed). - return SetFinalized(std::move(lk), res->first_response.resource()); + return SetFinalized(std::move(lk), + std::move(*res->first_response.mutable_resource())); } persisted_offset = res->first_response.resource().size(); } else if (res->first_response.has_persisted_size()) { From 0903aeb477cfacad0ca354ab8b51ee4cbf168eb0 Mon Sep 17 00:00:00 2001 From: Vaibhav Pratap Date: Wed, 27 May 2026 10:21:42 +0000 Subject: [PATCH 4/5] update checksum as well --- .../async/writer_connection_resumed.cc | 14 ++- .../async/writer_connection_resumed_test.cc | 115 ++++++++++++++++++ 2 files changed, 126 insertions(+), 3 deletions(-) diff --git a/google/cloud/storage/internal/async/writer_connection_resumed.cc b/google/cloud/storage/internal/async/writer_connection_resumed.cc index 7bfcd7c86b576..18bbb08f35e6d 100644 --- a/google/cloud/storage/internal/async/writer_connection_resumed.cc +++ b/google/cloud/storage/internal/async/writer_connection_resumed.cc @@ -442,15 +442,24 @@ class AsyncWriterConnectionResumedState } // Regular resume attempt succeeded. Check state. std::int64_t persisted_offset = 0; + absl::optional checksums; + if (res->first_response.has_resource()) { if (!res->first_response.has_write_handle()) { // Found finalized object (maybe finalized concurrently or resumed). return SetFinalized(std::move(lk), std::move(*res->first_response.mutable_resource())); } - persisted_offset = res->first_response.resource().size(); + auto const& resource = res->first_response.resource(); + persisted_offset = resource.size(); + if (resource.has_checksums()) { + checksums = resource.checksums(); + } } else if (res->first_response.has_persisted_size()) { persisted_offset = res->first_response.persisted_size(); + if (res->first_response.has_persisted_data_checksums()) { + checksums = res->first_response.persisted_data_checksums(); + } } else { auto state = impl_->PersistedState(); if (absl::holds_alternative(state)) { @@ -460,10 +469,9 @@ class AsyncWriterConnectionResumedState absl::get(std::move(state))); } persisted_offset = absl::get(state); + checksums = impl_->PersistedChecksums(); } - auto checksums = impl_->PersistedChecksums(); - auto hash = hash_function_; if (checksums && checksums->has_crc32c()) { hash = std::make_shared< diff --git a/google/cloud/storage/internal/async/writer_connection_resumed_test.cc b/google/cloud/storage/internal/async/writer_connection_resumed_test.cc index 6c33bb2d7051c..2aa1e7e883e31 100644 --- a/google/cloud/storage/internal/async/writer_connection_resumed_test.cc +++ b/google/cloud/storage/internal/async/writer_connection_resumed_test.cc @@ -846,6 +846,121 @@ TEST(WriterConnectionResumed, ResumeUsesSizeFromFirstResponse) { EXPECT_THAT(write.get(), StatusIs(StatusCode::kOk)); } +TEST(WriterConnectionResumed, ResumeUsesChecksumsFromFirstResponse) { + AsyncSequencer sequencer; + auto mock = std::make_unique(); + auto* mock_ptr = mock.get(); + + auto initial_request = google::storage::v2::BidiWriteObjectRequest{}; + google::storage::v2::BidiWriteObjectResponse first_response; + first_response.mutable_write_handle()->set_handle("initial-handle"); + + auto mock_hash = + std::make_shared(); + EXPECT_CALL(*mock_hash, Update(::testing::An(), + ::testing::An(), + ::testing::An())) + .WillRepeatedly(Return(Status())); + + EXPECT_CALL(*mock_ptr, PersistedState) + .WillOnce(Return(MakePersistedState(0))); + + auto const payload = TestPayload(2048); + + EXPECT_CALL(*mock_ptr, Flush(_)).WillOnce([&](auto) { + return sequencer.PushBack("Flush").then([](auto f) { + if (f.get()) return Status{}; + return TransientError(); + }); + }); + + MockFactory mock_factory; + auto mock_stream = + std::make_unique>(); + auto* mock_stream_ptr = mock_stream.get(); + + EXPECT_CALL(mock_factory, Call(_)) + .WillOnce([&](google::storage::v2::BidiWriteObjectRequest const&) { + WriteObject::WriteResult result; + result.stream = std::move(mock_stream); + result.first_response.mutable_write_handle()->set_handle("new-handle"); + result.first_response.set_persisted_size(2048); + // Set checksums in response! + result.first_response.mutable_persisted_data_checksums()->set_crc32c(12345); + return sequencer.PushBack("Factory").then( + [r = std::move(result)](auto) mutable { + return StatusOr(std::move(r)); + }); + }); + + EXPECT_CALL(*mock_stream_ptr, Write(_, _)) + .WillOnce([&](google::storage::v2::BidiWriteObjectRequest const& request, + grpc::WriteOptions) { + EXPECT_EQ(request.write_offset(), 2048); + return sequencer.PushBack("StateLookupWrite").then([](auto) { + return true; + }); + }) + .WillOnce([&](google::storage::v2::BidiWriteObjectRequest const& request, + grpc::WriteOptions) { + EXPECT_TRUE(GetContent(request.checksummed_data()).empty()); + EXPECT_TRUE(request.flush()); + return sequencer.PushBack("GhostWrite").then([](auto) { return true; }); + }); + + google::storage::v2::BidiWriteObjectResponse read_response; + read_response.set_persisted_size(2048); + EXPECT_CALL(*mock_stream_ptr, Read) + .WillOnce([&, read_response]() { + return sequencer.PushBack("StreamRead1").then([read_response](auto) { + return absl::make_optional(read_response); + }); + }) + .WillOnce([&, read_response]() { + return sequencer.PushBack("StreamRead2").then([read_response](auto) { + return absl::make_optional(read_response); + }); + }); + + EXPECT_CALL(*mock_stream_ptr, Finish) + .WillOnce(Return(make_ready_future(Status{}))); + EXPECT_CALL(*mock_stream_ptr, Cancel).WillRepeatedly(Return()); + + auto connection = MakeWriterConnectionResumed( + mock_factory.AsStdFunction(), std::move(mock), initial_request, mock_hash, + first_response, Options{}); + + auto write = connection->Write(payload); + + auto next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Flush"); + next.first.set_value(false); + + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Factory"); + next.first.set_value(true); + + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "StateLookupWrite"); + next.first.set_value(true); + + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "StreamRead1"); + next.first.set_value(true); + + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "GhostWrite"); + next.first.set_value(true); + + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "StreamRead2"); + next.first.set_value(true); + + EXPECT_THAT(write.get(), StatusIs(StatusCode::kOk)); +} + } // namespace GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace storage_internal From b45618c12b431f71d3f2cf1ac7c4671fe6eb3e33 Mon Sep 17 00:00:00 2001 From: Vaibhav Pratap Date: Wed, 27 May 2026 10:27:49 +0000 Subject: [PATCH 5/5] fix the format --- .../storage/internal/async/writer_connection_resumed_test.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/google/cloud/storage/internal/async/writer_connection_resumed_test.cc b/google/cloud/storage/internal/async/writer_connection_resumed_test.cc index 2aa1e7e883e31..07543ed643638 100644 --- a/google/cloud/storage/internal/async/writer_connection_resumed_test.cc +++ b/google/cloud/storage/internal/async/writer_connection_resumed_test.cc @@ -888,7 +888,8 @@ TEST(WriterConnectionResumed, ResumeUsesChecksumsFromFirstResponse) { result.first_response.mutable_write_handle()->set_handle("new-handle"); result.first_response.set_persisted_size(2048); // Set checksums in response! - result.first_response.mutable_persisted_data_checksums()->set_crc32c(12345); + result.first_response.mutable_persisted_data_checksums()->set_crc32c( + 12345); return sequencer.PushBack("Factory").then( [r = std::move(result)](auto) mutable { return StatusOr(std::move(r));