From b947f10cffc2ecda45cb3b73a6d5057150a9bc1b Mon Sep 17 00:00:00 2001 From: Gauri Kalra Date: Mon, 25 May 2026 06:59:54 +0000 Subject: [PATCH] fix(storage): Implement core clean half-close stream teardown for appendable uploads --- google/cloud/storage/async/writer.cc | 2 +- .../cloud/storage/async/writer_connection.h | 6 ++ google/cloud/storage/async/writer_test.cc | 8 +-- .../storage/internal/async/partial_upload.cc | 4 ++ .../storage/internal/async/partial_upload.h | 8 ++- .../internal/async/writer_connection_impl.cc | 26 +++++++ .../internal/async/writer_connection_impl.h | 2 + .../async/writer_connection_impl_test.cc | 67 +++++++++++++++++++ .../mocks/mock_async_writer_connection.h | 1 + 9 files changed, 118 insertions(+), 6 deletions(-) diff --git a/google/cloud/storage/async/writer.cc b/google/cloud/storage/async/writer.cc index 85eb88d427c11..5195a55ef4f05 100644 --- a/google/cloud/storage/async/writer.cc +++ b/google/cloud/storage/async/writer.cc @@ -104,7 +104,7 @@ future AsyncWriter::Close() { "closed stream", GCP_ERROR_INFO())); } - return impl_->Flush(WritePayload{}).then([impl = std::move(impl_)](auto f) { + return impl_->Close(WritePayload{}).then([impl = std::move(impl_)](auto f) { return f.get(); }); } diff --git a/google/cloud/storage/async/writer_connection.h b/google/cloud/storage/async/writer_connection.h index 43d6348f055ae..e8dbda1f3a703 100644 --- a/google/cloud/storage/async/writer_connection.h +++ b/google/cloud/storage/async/writer_connection.h @@ -116,6 +116,12 @@ class AsyncWriterConnection { /// Uploads some data to the service and flushes the value. virtual future Flush(WritePayload payload) = 0; + /// Cleanly flushes pending data and piggybacks a stream half-close + /// (WritesDone). + virtual future Close(WritePayload payload) { + return Flush(std::move(payload)); + } + /// Wait for the result of a `Flush()` call. virtual future> Query() = 0; diff --git a/google/cloud/storage/async/writer_test.cc b/google/cloud/storage/async/writer_test.cc index fc5e57eee282d..cdf3824f90f49 100644 --- a/google/cloud/storage/async/writer_test.cc +++ b/google/cloud/storage/async/writer_test.cc @@ -188,7 +188,7 @@ TEST(AsyncWriterTest, MultipleFlushesAreQueuedAndSequential) { TEST(AsyncWriterTest, Close) { auto mock = std::make_unique(); ::testing::InSequence sequence; - EXPECT_CALL(*mock, Flush(WritePayloadContents(IsEmpty()))).WillOnce([] { + EXPECT_CALL(*mock, Close(WritePayloadContents(IsEmpty()))).WillOnce([] { return make_ready_future(Status{}); }); @@ -205,7 +205,7 @@ TEST(AsyncWriterTest, CloseOnDefaultConstructed) { TEST(AsyncWriterTest, CloseOnMovedWriter) { auto mock = std::make_unique(); - EXPECT_CALL(*mock, Flush(WritePayloadContents(IsEmpty()))).WillOnce([] { + EXPECT_CALL(*mock, Close(WritePayloadContents(IsEmpty()))).WillOnce([] { return make_ready_future(Status{}); }); AsyncWriter writer(std::move(mock)); @@ -217,7 +217,7 @@ TEST(AsyncWriterTest, CloseOnMovedWriter) { TEST(AsyncWriterTest, ErrorOnWriteAfterClose) { auto mock = std::make_unique(); auto* mock_ptr = mock.get(); - EXPECT_CALL(*mock, Flush(WritePayloadContents(IsEmpty()))).WillOnce([] { + EXPECT_CALL(*mock, Close(WritePayloadContents(IsEmpty()))).WillOnce([] { return make_ready_future(Status{}); }); @@ -232,7 +232,7 @@ TEST(AsyncWriterTest, ErrorOnWriteAfterClose) { TEST(AsyncWriterTest, ErrorOnClose) { auto mock = std::make_unique(); - EXPECT_CALL(*mock, Flush(WritePayloadContents(IsEmpty()))).WillOnce([] { + EXPECT_CALL(*mock, Close(WritePayloadContents(IsEmpty()))).WillOnce([] { return make_ready_future(PermanentError()); }); diff --git a/google/cloud/storage/internal/async/partial_upload.cc b/google/cloud/storage/internal/async/partial_upload.cc index 154fc6aca49dd..e500d1ba4ec24 100644 --- a/google/cloud/storage/internal/async/partial_upload.cc +++ b/google/cloud/storage/internal/async/partial_upload.cc @@ -60,6 +60,10 @@ void PartialUpload::Write() { } else if (action_ == LastMessageAction::kFlush) { request_.set_flush(true); request_.set_state_lookup(true); + } else if (action_ == LastMessageAction::kFlushAndClose) { + request_.set_flush(true); + request_.set_state_lookup(true); + wopt.set_last_message(); } } (void)rpc_->Write(request_, std::move(wopt)) diff --git a/google/cloud/storage/internal/async/partial_upload.h b/google/cloud/storage/internal/async/partial_upload.h index bf4a7be3b2bf4..781740d77553d 100644 --- a/google/cloud/storage/internal/async/partial_upload.h +++ b/google/cloud/storage/internal/async/partial_upload.h @@ -90,7 +90,13 @@ class PartialUpload : public std::enable_shared_from_this { google::storage::v2::BidiWriteObjectRequest, google::storage::v2::BidiWriteObjectResponse>; - enum LastMessageAction { kNone, kFlush, kFinalize, kFinalizeWithChecksum }; + enum LastMessageAction { + kNone, + kFlush, + kFlushAndClose, + kFinalize, + kFinalizeWithChecksum + }; static std::shared_ptr Call( std::shared_ptr rpc, diff --git a/google/cloud/storage/internal/async/writer_connection_impl.cc b/google/cloud/storage/internal/async/writer_connection_impl.cc index a97a62960e3f6..3acafa12d7dd8 100644 --- a/google/cloud/storage/internal/async/writer_connection_impl.cc +++ b/google/cloud/storage/internal/async/writer_connection_impl.cc @@ -163,6 +163,19 @@ future AsyncWriterConnectionImpl::Flush(storage::WritePayload payload) { }); } +future AsyncWriterConnectionImpl::Close(storage::WritePayload payload) { + auto write = MakeRequest(); + auto p = WritePayloadImpl::GetImpl(payload); + auto size = p.size(); + auto coro = PartialUpload::Call(impl_, hash_function_, std::move(write), + std::move(p), PartialUpload::kFlushAndClose); + + return coro->Start().then([coro, size, this](auto f) mutable { + coro.reset(); // breaks the cycle between the completion queue and coro + return OnClose(size, f.get()); + }); +} + future> AsyncWriterConnectionImpl::Query() { return impl_->Read().then([this](auto f) { return OnQuery(f.get()); }); } @@ -202,6 +215,19 @@ future AsyncWriterConnectionImpl::OnPartialUpload( return make_ready_future(Status{}); } +future AsyncWriterConnectionImpl::OnClose(std::size_t upload_size, + StatusOr success) { + if (!success) { + return Finish().then(HandleFinishAfterError(std::move(success).status())); + } + if (!*success) { + return Finish().then( + HandleFinishAfterError("Expected Finish() error after non-ok Write()")); + } + offset_ += upload_size; + return Finish().then([](auto f) { return f.get(); }); +} + future> AsyncWriterConnectionImpl::OnFinalUpload(std::size_t upload_size, StatusOr success) { diff --git a/google/cloud/storage/internal/async/writer_connection_impl.h b/google/cloud/storage/internal/async/writer_connection_impl.h index 4cc1e979f1fe0..a98e69f907e8d 100644 --- a/google/cloud/storage/internal/async/writer_connection_impl.h +++ b/google/cloud/storage/internal/async/writer_connection_impl.h @@ -65,6 +65,7 @@ class AsyncWriterConnectionImpl : public storage::AsyncWriterConnection { future> Finalize( storage::WritePayload) override; future Flush(storage::WritePayload payload) override; + future Close(storage::WritePayload payload) override; future> Query() override; RpcMetadata GetRequestMetadata() override; @@ -83,6 +84,7 @@ class AsyncWriterConnectionImpl : public storage::AsyncWriterConnection { future OnPartialUpload(std::size_t upload_size, StatusOr success); + future OnClose(std::size_t upload_size, StatusOr success); future> OnFinalUpload( std::size_t upload_size, StatusOr success); future> OnQuery( diff --git a/google/cloud/storage/internal/async/writer_connection_impl_test.cc b/google/cloud/storage/internal/async/writer_connection_impl_test.cc index caaa3d4ae8d1c..03ecf7699cc2c 100644 --- a/google/cloud/storage/internal/async/writer_connection_impl_test.cc +++ b/google/cloud/storage/internal/async/writer_connection_impl_test.cc @@ -837,6 +837,73 @@ TEST(AsyncWriterConnectionTest, QueryUpdatesHandle) { EXPECT_EQ(seen_handles[0], "queried-handle"); } +TEST(AsyncWriterConnectionTest, CloseEmpty) { + AsyncSequencer sequencer; + auto mock = std::make_unique(); + EXPECT_CALL(*mock, Cancel).Times(1); + EXPECT_CALL(*mock, Write) + .WillOnce([&](Request const& request, grpc::WriteOptions wopt) { + EXPECT_TRUE(request.flush()); + EXPECT_TRUE(request.state_lookup()); + EXPECT_TRUE(wopt.is_last_message()); + EXPECT_EQ(request.common_object_request_params().encryption_algorithm(), + "test-only-algo"); + return sequencer.PushBack("Write"); + }); + EXPECT_CALL(*mock, Finish).WillOnce([&] { + return sequencer.PushBack("Finish").then([](auto f) { + if (f.get()) return Status{}; + return PermanentError(); + }); + }); + auto hash = std::make_shared(); + EXPECT_CALL(*hash, Update(_, An(), _)).Times(1); + EXPECT_CALL(*hash, Finish).Times(0); + + auto tested = std::make_unique( + TestOptions(), MakeRequest(), std::move(mock), hash, 1024); + auto close = tested->Close(WritePayload{}); + auto next = sequencer.PopFrontWithName(); + ASSERT_THAT(next.second, "Write"); + next.first.set_value(true); + + next = sequencer.PopFrontWithName(); + ASSERT_THAT(next.second, "Finish"); + next.first.set_value(true); + + EXPECT_THAT(close.get(), IsOk()); + tested = {}; +} + +TEST(AsyncWriterConnectionTest, CloseError) { + AsyncSequencer sequencer; + auto mock = std::make_unique(); + EXPECT_CALL(*mock, Cancel).Times(1); + EXPECT_CALL(*mock, Write).WillOnce([&](Request const&, grpc::WriteOptions) { + return sequencer.PushBack("Write"); + }); + EXPECT_CALL(*mock, Finish).WillOnce([&] { + return sequencer.PushBack("Finish").then([](auto f) { + if (f.get()) return Status{}; + return PermanentError(); + }); + }); + auto hash = std::make_shared(); + EXPECT_CALL(*hash, Update(_, An(), _)).Times(1); + EXPECT_CALL(*hash, Finish).Times(0); + + auto tested = std::make_unique( + TestOptions(), MakeRequest(), std::move(mock), hash, 1024); + auto response = tested->Close(WritePayload{}); + auto next = sequencer.PopFrontWithName(); + ASSERT_THAT(next.second, "Write"); + next.first.set_value(false); // Detect an error on Write() + next = sequencer.PopFrontWithName(); + ASSERT_THAT(next.second, "Finish"); + next.first.set_value(false); // Return error from Finish() + EXPECT_THAT(response.get(), StatusIs(PermanentError().code())); +} + GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace storage_internal } // namespace cloud diff --git a/google/cloud/storage/mocks/mock_async_writer_connection.h b/google/cloud/storage/mocks/mock_async_writer_connection.h index 5db37cdc11322..875a288a34eb3 100644 --- a/google/cloud/storage/mocks/mock_async_writer_connection.h +++ b/google/cloud/storage/mocks/mock_async_writer_connection.h @@ -37,6 +37,7 @@ class MockAsyncWriterConnection : public storage::AsyncWriterConnection { MOCK_METHOD(future>, Finalize, (storage::WritePayload), (override)); MOCK_METHOD(future, Flush, (storage::WritePayload), (override)); + MOCK_METHOD(future, Close, (storage::WritePayload), (override)); MOCK_METHOD(future>, Query, (), (override)); MOCK_METHOD(RpcMetadata, GetRequestMetadata, (), (override)); };