Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -441,14 +441,26 @@ class AsyncWriterConnectionResumedState
return SetError(std::move(lk), std::move(res).status());
}
// Regular resume attempt succeeded. Check state.
auto state = impl_->PersistedState();
if (absl::holds_alternative<google::storage::v2::Object>(state)) {
// Found finalized object (maybe finalized concurrently or resumed).
return SetFinalized(std::move(lk), absl::get<google::storage::v2::Object>(
std::move(state)));
std::int64_t persisted_offset = 0;
if (res->first_response.has_resource()) {
if (!res->first_response.has_write_handle()) {
// Found finalized object (maybe finalized concurrently or resumed).
return SetFinalized(std::move(lk),
std::move(*res->first_response.mutable_resource()));
}
persisted_offset = res->first_response.resource().size();
} else if (res->first_response.has_persisted_size()) {
persisted_offset = res->first_response.persisted_size();
} else {
auto state = impl_->PersistedState();
if (absl::holds_alternative<google::storage::v2::Object>(state)) {
// Found finalized object (maybe finalized concurrently or resumed).
return SetFinalized(
std::move(lk),
absl::get<google::storage::v2::Object>(std::move(state)));
}
persisted_offset = absl::get<std::int64_t>(state);
}
// Regular resume succeeded, object not finalized. Continue writing.
auto persisted_offset = absl::get<std::int64_t>(state);

auto checksums = impl_->PersistedChecksums();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to update the checksum as well using the response sent by the server?


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -733,6 +733,119 @@ TEST(WriterConnectionResumed, ResetWriteOffsetOnResume) {
EXPECT_THAT(write.get(), StatusIs(StatusCode::kOk));
}

TEST(WriterConnectionResumed, ResumeUsesSizeFromFirstResponse) {
AsyncSequencer<bool> sequencer;
auto mock = std::make_unique<MockAsyncWriterConnection>();
auto* mock_ptr = mock.get();

auto initial_request = google::storage::v2::BidiWriteObjectRequest{};
google::storage::v2::BidiWriteObjectResponse first_response;
first_response.mutable_write_handle()->set_handle("initial-handle");

auto mock_hash =
std::make_shared<google::cloud::storage::testing::MockHashFunction>();
EXPECT_CALL(*mock_hash, Update(::testing::An<std::int64_t>(),
::testing::An<absl::Cord const&>(),
::testing::An<std::uint32_t>()))
.WillRepeatedly(Return(Status()));

EXPECT_CALL(*mock_ptr, PersistedState)
.WillOnce(Return(MakePersistedState(0)));

auto const payload = TestPayload(2048);

EXPECT_CALL(*mock_ptr, Flush(_)).WillOnce([&](auto) {
return sequencer.PushBack("Flush").then([](auto f) {
if (f.get()) return Status{};
return TransientError();
});
});

MockFactory mock_factory;
auto mock_stream =
std::make_unique<google::cloud::mocks::MockAsyncStreamingReadWriteRpc<
google::storage::v2::BidiWriteObjectRequest,
google::storage::v2::BidiWriteObjectResponse>>();
auto* mock_stream_ptr = mock_stream.get();

EXPECT_CALL(mock_factory, Call(_))
.WillOnce([&](google::storage::v2::BidiWriteObjectRequest const&) {
WriteObject::WriteResult result;
result.stream = std::move(mock_stream);
result.first_response.mutable_write_handle()->set_handle("new-handle");
result.first_response.set_persisted_size(2048);
return sequencer.PushBack("Factory").then(
[r = std::move(result)](auto) mutable {
return StatusOr<WriteObject::WriteResult>(std::move(r));
});
});

EXPECT_CALL(*mock_stream_ptr, Write(_, _))
.WillOnce([&](google::storage::v2::BidiWriteObjectRequest const& request,
grpc::WriteOptions) {
EXPECT_EQ(request.write_offset(), 2048);
return sequencer.PushBack("StateLookupWrite").then([](auto) {
return true;
});
})
.WillOnce([&](google::storage::v2::BidiWriteObjectRequest const& request,
grpc::WriteOptions) {
EXPECT_TRUE(GetContent(request.checksummed_data()).empty());
EXPECT_TRUE(request.flush());
return sequencer.PushBack("GhostWrite").then([](auto) { return true; });
});

google::storage::v2::BidiWriteObjectResponse read_response;
read_response.set_persisted_size(2048);
EXPECT_CALL(*mock_stream_ptr, Read)
.WillOnce([&, read_response]() {
return sequencer.PushBack("StreamRead1").then([read_response](auto) {
return absl::make_optional(read_response);
});
})
.WillOnce([&, read_response]() {
return sequencer.PushBack("StreamRead2").then([read_response](auto) {
return absl::make_optional(read_response);
});
});

EXPECT_CALL(*mock_stream_ptr, Finish)
.WillOnce(Return(make_ready_future(Status{})));
EXPECT_CALL(*mock_stream_ptr, Cancel).WillRepeatedly(Return());

auto connection = MakeWriterConnectionResumed(
mock_factory.AsStdFunction(), std::move(mock), initial_request, mock_hash,
first_response, Options{});

auto write = connection->Write(payload);

auto next = sequencer.PopFrontWithName();
EXPECT_EQ(next.second, "Flush");
next.first.set_value(false);

next = sequencer.PopFrontWithName();
EXPECT_EQ(next.second, "Factory");
next.first.set_value(true);

next = sequencer.PopFrontWithName();
EXPECT_EQ(next.second, "StateLookupWrite");
next.first.set_value(true);

next = sequencer.PopFrontWithName();
EXPECT_EQ(next.second, "StreamRead1");
next.first.set_value(true);

next = sequencer.PopFrontWithName();
EXPECT_EQ(next.second, "GhostWrite");
next.first.set_value(true);

next = sequencer.PopFrontWithName();
EXPECT_EQ(next.second, "StreamRead2");
next.first.set_value(true);

EXPECT_THAT(write.get(), StatusIs(StatusCode::kOk));
}

} // namespace
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
} // namespace storage_internal
Expand Down
Loading