diff --git a/google/cloud/storage/internal/async/writer_connection_resumed.cc b/google/cloud/storage/internal/async/writer_connection_resumed.cc index 0dd2a6ace9897..18bbb08f35e6d 100644 --- a/google/cloud/storage/internal/async/writer_connection_resumed.cc +++ b/google/cloud/storage/internal/async/writer_connection_resumed.cc @@ -441,16 +441,36 @@ class AsyncWriterConnectionResumedState return SetError(std::move(lk), std::move(res).status()); } // Regular resume attempt succeeded. Check state. - auto state = impl_->PersistedState(); - if (absl::holds_alternative(state)) { - // Found finalized object (maybe finalized concurrently or resumed). - return SetFinalized(std::move(lk), absl::get( - std::move(state))); + std::int64_t persisted_offset = 0; + absl::optional checksums; + + if (res->first_response.has_resource()) { + if (!res->first_response.has_write_handle()) { + // Found finalized object (maybe finalized concurrently or resumed). + return SetFinalized(std::move(lk), + std::move(*res->first_response.mutable_resource())); + } + auto const& resource = res->first_response.resource(); + persisted_offset = resource.size(); + if (resource.has_checksums()) { + checksums = resource.checksums(); + } + } else if (res->first_response.has_persisted_size()) { + persisted_offset = res->first_response.persisted_size(); + if (res->first_response.has_persisted_data_checksums()) { + checksums = res->first_response.persisted_data_checksums(); + } + } else { + auto state = impl_->PersistedState(); + if (absl::holds_alternative(state)) { + // Found finalized object (maybe finalized concurrently or resumed). + return SetFinalized( + std::move(lk), + absl::get(std::move(state))); + } + persisted_offset = absl::get(state); + checksums = impl_->PersistedChecksums(); } - // Regular resume succeeded, object not finalized. Continue writing. - auto persisted_offset = absl::get(state); - - auto checksums = impl_->PersistedChecksums(); auto hash = hash_function_; if (checksums && checksums->has_crc32c()) { diff --git a/google/cloud/storage/internal/async/writer_connection_resumed_test.cc b/google/cloud/storage/internal/async/writer_connection_resumed_test.cc index dcebb80ac8c63..07543ed643638 100644 --- a/google/cloud/storage/internal/async/writer_connection_resumed_test.cc +++ b/google/cloud/storage/internal/async/writer_connection_resumed_test.cc @@ -733,6 +733,235 @@ TEST(WriterConnectionResumed, ResetWriteOffsetOnResume) { EXPECT_THAT(write.get(), StatusIs(StatusCode::kOk)); } +TEST(WriterConnectionResumed, ResumeUsesSizeFromFirstResponse) { + AsyncSequencer sequencer; + auto mock = std::make_unique(); + auto* mock_ptr = mock.get(); + + auto initial_request = google::storage::v2::BidiWriteObjectRequest{}; + google::storage::v2::BidiWriteObjectResponse first_response; + first_response.mutable_write_handle()->set_handle("initial-handle"); + + auto mock_hash = + std::make_shared(); + EXPECT_CALL(*mock_hash, Update(::testing::An(), + ::testing::An(), + ::testing::An())) + .WillRepeatedly(Return(Status())); + + EXPECT_CALL(*mock_ptr, PersistedState) + .WillOnce(Return(MakePersistedState(0))); + + auto const payload = TestPayload(2048); + + EXPECT_CALL(*mock_ptr, Flush(_)).WillOnce([&](auto) { + return sequencer.PushBack("Flush").then([](auto f) { + if (f.get()) return Status{}; + return TransientError(); + }); + }); + + MockFactory mock_factory; + auto mock_stream = + std::make_unique>(); + auto* mock_stream_ptr = mock_stream.get(); + + EXPECT_CALL(mock_factory, Call(_)) + .WillOnce([&](google::storage::v2::BidiWriteObjectRequest const&) { + WriteObject::WriteResult result; + result.stream = std::move(mock_stream); + result.first_response.mutable_write_handle()->set_handle("new-handle"); + result.first_response.set_persisted_size(2048); + return sequencer.PushBack("Factory").then( + [r = std::move(result)](auto) mutable { + return StatusOr(std::move(r)); + }); + }); + + EXPECT_CALL(*mock_stream_ptr, Write(_, _)) + .WillOnce([&](google::storage::v2::BidiWriteObjectRequest const& request, + grpc::WriteOptions) { + EXPECT_EQ(request.write_offset(), 2048); + return sequencer.PushBack("StateLookupWrite").then([](auto) { + return true; + }); + }) + .WillOnce([&](google::storage::v2::BidiWriteObjectRequest const& request, + grpc::WriteOptions) { + EXPECT_TRUE(GetContent(request.checksummed_data()).empty()); + EXPECT_TRUE(request.flush()); + return sequencer.PushBack("GhostWrite").then([](auto) { return true; }); + }); + + google::storage::v2::BidiWriteObjectResponse read_response; + read_response.set_persisted_size(2048); + EXPECT_CALL(*mock_stream_ptr, Read) + .WillOnce([&, read_response]() { + return sequencer.PushBack("StreamRead1").then([read_response](auto) { + return absl::make_optional(read_response); + }); + }) + .WillOnce([&, read_response]() { + return sequencer.PushBack("StreamRead2").then([read_response](auto) { + return absl::make_optional(read_response); + }); + }); + + EXPECT_CALL(*mock_stream_ptr, Finish) + .WillOnce(Return(make_ready_future(Status{}))); + EXPECT_CALL(*mock_stream_ptr, Cancel).WillRepeatedly(Return()); + + auto connection = MakeWriterConnectionResumed( + mock_factory.AsStdFunction(), std::move(mock), initial_request, mock_hash, + first_response, Options{}); + + auto write = connection->Write(payload); + + auto next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Flush"); + next.first.set_value(false); + + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Factory"); + next.first.set_value(true); + + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "StateLookupWrite"); + next.first.set_value(true); + + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "StreamRead1"); + next.first.set_value(true); + + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "GhostWrite"); + next.first.set_value(true); + + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "StreamRead2"); + next.first.set_value(true); + + EXPECT_THAT(write.get(), StatusIs(StatusCode::kOk)); +} + +TEST(WriterConnectionResumed, ResumeUsesChecksumsFromFirstResponse) { + AsyncSequencer sequencer; + auto mock = std::make_unique(); + auto* mock_ptr = mock.get(); + + auto initial_request = google::storage::v2::BidiWriteObjectRequest{}; + google::storage::v2::BidiWriteObjectResponse first_response; + first_response.mutable_write_handle()->set_handle("initial-handle"); + + auto mock_hash = + std::make_shared(); + EXPECT_CALL(*mock_hash, Update(::testing::An(), + ::testing::An(), + ::testing::An())) + .WillRepeatedly(Return(Status())); + + EXPECT_CALL(*mock_ptr, PersistedState) + .WillOnce(Return(MakePersistedState(0))); + + auto const payload = TestPayload(2048); + + EXPECT_CALL(*mock_ptr, Flush(_)).WillOnce([&](auto) { + return sequencer.PushBack("Flush").then([](auto f) { + if (f.get()) return Status{}; + return TransientError(); + }); + }); + + MockFactory mock_factory; + auto mock_stream = + std::make_unique>(); + auto* mock_stream_ptr = mock_stream.get(); + + EXPECT_CALL(mock_factory, Call(_)) + .WillOnce([&](google::storage::v2::BidiWriteObjectRequest const&) { + WriteObject::WriteResult result; + result.stream = std::move(mock_stream); + result.first_response.mutable_write_handle()->set_handle("new-handle"); + result.first_response.set_persisted_size(2048); + // Set checksums in response! + result.first_response.mutable_persisted_data_checksums()->set_crc32c( + 12345); + return sequencer.PushBack("Factory").then( + [r = std::move(result)](auto) mutable { + return StatusOr(std::move(r)); + }); + }); + + EXPECT_CALL(*mock_stream_ptr, Write(_, _)) + .WillOnce([&](google::storage::v2::BidiWriteObjectRequest const& request, + grpc::WriteOptions) { + EXPECT_EQ(request.write_offset(), 2048); + return sequencer.PushBack("StateLookupWrite").then([](auto) { + return true; + }); + }) + .WillOnce([&](google::storage::v2::BidiWriteObjectRequest const& request, + grpc::WriteOptions) { + EXPECT_TRUE(GetContent(request.checksummed_data()).empty()); + EXPECT_TRUE(request.flush()); + return sequencer.PushBack("GhostWrite").then([](auto) { return true; }); + }); + + google::storage::v2::BidiWriteObjectResponse read_response; + read_response.set_persisted_size(2048); + EXPECT_CALL(*mock_stream_ptr, Read) + .WillOnce([&, read_response]() { + return sequencer.PushBack("StreamRead1").then([read_response](auto) { + return absl::make_optional(read_response); + }); + }) + .WillOnce([&, read_response]() { + return sequencer.PushBack("StreamRead2").then([read_response](auto) { + return absl::make_optional(read_response); + }); + }); + + EXPECT_CALL(*mock_stream_ptr, Finish) + .WillOnce(Return(make_ready_future(Status{}))); + EXPECT_CALL(*mock_stream_ptr, Cancel).WillRepeatedly(Return()); + + auto connection = MakeWriterConnectionResumed( + mock_factory.AsStdFunction(), std::move(mock), initial_request, mock_hash, + first_response, Options{}); + + auto write = connection->Write(payload); + + auto next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Flush"); + next.first.set_value(false); + + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Factory"); + next.first.set_value(true); + + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "StateLookupWrite"); + next.first.set_value(true); + + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "StreamRead1"); + next.first.set_value(true); + + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "GhostWrite"); + next.first.set_value(true); + + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "StreamRead2"); + next.first.set_value(true); + + EXPECT_THAT(write.get(), StatusIs(StatusCode::kOk)); +} + } // namespace GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace storage_internal