Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion google/cloud/storage/async/writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ future<Status> AsyncWriter::Close() {
"closed stream", GCP_ERROR_INFO()));
}

return impl_->Flush(WritePayload{}).then([impl = std::move(impl_)](auto f) {
return impl_->Close(WritePayload{}).then([impl = std::move(impl_)](auto f) {
return f.get();
});
}
Expand Down
6 changes: 6 additions & 0 deletions google/cloud/storage/async/writer_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,12 @@ class AsyncWriterConnection {
/// Uploads some data to the service and flushes the value.
virtual future<Status> Flush(WritePayload payload) = 0;

/// Cleanly flushes pending data and piggybacks a stream half-close
/// (WritesDone).
virtual future<Status> Close(WritePayload payload) {
return Flush(std::move(payload));
}

/// Wait for the result of a `Flush()` call.
virtual future<StatusOr<std::int64_t>> Query() = 0;

Expand Down
8 changes: 4 additions & 4 deletions google/cloud/storage/async/writer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ TEST(AsyncWriterTest, MultipleFlushesAreQueuedAndSequential) {
TEST(AsyncWriterTest, Close) {
auto mock = std::make_unique<MockAsyncWriterConnection>();
::testing::InSequence sequence;
EXPECT_CALL(*mock, Flush(WritePayloadContents(IsEmpty()))).WillOnce([] {
EXPECT_CALL(*mock, Close(WritePayloadContents(IsEmpty()))).WillOnce([] {
return make_ready_future(Status{});
});

Expand All @@ -205,7 +205,7 @@ TEST(AsyncWriterTest, CloseOnDefaultConstructed) {

TEST(AsyncWriterTest, CloseOnMovedWriter) {
auto mock = std::make_unique<MockAsyncWriterConnection>();
EXPECT_CALL(*mock, Flush(WritePayloadContents(IsEmpty()))).WillOnce([] {
EXPECT_CALL(*mock, Close(WritePayloadContents(IsEmpty()))).WillOnce([] {
return make_ready_future(Status{});
});
AsyncWriter writer(std::move(mock));
Expand All @@ -217,7 +217,7 @@ TEST(AsyncWriterTest, CloseOnMovedWriter) {
TEST(AsyncWriterTest, ErrorOnWriteAfterClose) {
auto mock = std::make_unique<MockAsyncWriterConnection>();
auto* mock_ptr = mock.get();
EXPECT_CALL(*mock, Flush(WritePayloadContents(IsEmpty()))).WillOnce([] {
EXPECT_CALL(*mock, Close(WritePayloadContents(IsEmpty()))).WillOnce([] {
return make_ready_future(Status{});
});

Expand All @@ -232,7 +232,7 @@ TEST(AsyncWriterTest, ErrorOnWriteAfterClose) {

TEST(AsyncWriterTest, ErrorOnClose) {
auto mock = std::make_unique<MockAsyncWriterConnection>();
EXPECT_CALL(*mock, Flush(WritePayloadContents(IsEmpty()))).WillOnce([] {
EXPECT_CALL(*mock, Close(WritePayloadContents(IsEmpty()))).WillOnce([] {
return make_ready_future(PermanentError());
});

Expand Down
4 changes: 4 additions & 0 deletions google/cloud/storage/internal/async/partial_upload.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ void PartialUpload::Write() {
} else if (action_ == LastMessageAction::kFlush) {
request_.set_flush(true);
request_.set_state_lookup(true);
} else if (action_ == LastMessageAction::kFlushAndClose) {
request_.set_flush(true);
request_.set_state_lookup(true);
wopt.set_last_message();
}
}
(void)rpc_->Write(request_, std::move(wopt))
Expand Down
8 changes: 7 additions & 1 deletion google/cloud/storage/internal/async/partial_upload.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,13 @@ class PartialUpload : public std::enable_shared_from_this<PartialUpload> {
google::storage::v2::BidiWriteObjectRequest,
google::storage::v2::BidiWriteObjectResponse>;

enum LastMessageAction { kNone, kFlush, kFinalize, kFinalizeWithChecksum };
enum LastMessageAction {
kNone,
kFlush,
kFlushAndClose,
kFinalize,
kFinalizeWithChecksum
};

static std::shared_ptr<PartialUpload> Call(
std::shared_ptr<StreamingWriteRpc> rpc,
Expand Down
26 changes: 26 additions & 0 deletions google/cloud/storage/internal/async/writer_connection_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,19 @@ future<Status> AsyncWriterConnectionImpl::Flush(storage::WritePayload payload) {
});
}

future<Status> AsyncWriterConnectionImpl::Close(storage::WritePayload payload) {
auto write = MakeRequest();
auto p = WritePayloadImpl::GetImpl(payload);
auto size = p.size();
auto coro = PartialUpload::Call(impl_, hash_function_, std::move(write),
std::move(p), PartialUpload::kFlushAndClose);

return coro->Start().then([coro, size, this](auto f) mutable {
coro.reset(); // breaks the cycle between the completion queue and coro
return OnClose(size, f.get());
});
}
Comment thread
kalragauri marked this conversation as resolved.

future<StatusOr<std::int64_t>> AsyncWriterConnectionImpl::Query() {
return impl_->Read().then([this](auto f) { return OnQuery(f.get()); });
}
Expand Down Expand Up @@ -202,6 +215,19 @@ future<Status> AsyncWriterConnectionImpl::OnPartialUpload(
return make_ready_future(Status{});
}

future<Status> AsyncWriterConnectionImpl::OnClose(std::size_t upload_size,
StatusOr<bool> success) {
if (!success) {
return Finish().then(HandleFinishAfterError(std::move(success).status()));
}
if (!*success) {
return Finish().then(
HandleFinishAfterError("Expected Finish() error after non-ok Write()"));
}
offset_ += upload_size;
return Finish().then([](auto f) { return f.get(); });
}

future<StatusOr<google::storage::v2::Object>>
AsyncWriterConnectionImpl::OnFinalUpload(std::size_t upload_size,
StatusOr<bool> success) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ class AsyncWriterConnectionImpl : public storage::AsyncWriterConnection {
future<StatusOr<google::storage::v2::Object>> Finalize(
storage::WritePayload) override;
future<Status> Flush(storage::WritePayload payload) override;
future<Status> Close(storage::WritePayload payload) override;
future<StatusOr<std::int64_t>> Query() override;
RpcMetadata GetRequestMetadata() override;

Expand All @@ -83,6 +84,7 @@ class AsyncWriterConnectionImpl : public storage::AsyncWriterConnection {

future<Status> OnPartialUpload(std::size_t upload_size,
StatusOr<bool> success);
future<Status> OnClose(std::size_t upload_size, StatusOr<bool> success);
future<StatusOr<google::storage::v2::Object>> OnFinalUpload(
std::size_t upload_size, StatusOr<bool> success);
future<StatusOr<std::int64_t>> OnQuery(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -837,6 +837,73 @@ TEST(AsyncWriterConnectionTest, QueryUpdatesHandle) {
EXPECT_EQ(seen_handles[0], "queried-handle");
}

TEST(AsyncWriterConnectionTest, CloseEmpty) {
AsyncSequencer<bool> sequencer;
auto mock = std::make_unique<MockStream>();
EXPECT_CALL(*mock, Cancel).Times(1);
EXPECT_CALL(*mock, Write)
.WillOnce([&](Request const& request, grpc::WriteOptions wopt) {
EXPECT_TRUE(request.flush());
EXPECT_TRUE(request.state_lookup());
EXPECT_TRUE(wopt.is_last_message());
EXPECT_EQ(request.common_object_request_params().encryption_algorithm(),
"test-only-algo");
return sequencer.PushBack("Write");
});
EXPECT_CALL(*mock, Finish).WillOnce([&] {
return sequencer.PushBack("Finish").then([](auto f) {
if (f.get()) return Status{};
return PermanentError();
});
});
auto hash = std::make_shared<MockHashFunction>();
EXPECT_CALL(*hash, Update(_, An<absl::Cord const&>(), _)).Times(1);
EXPECT_CALL(*hash, Finish).Times(0);

auto tested = std::make_unique<AsyncWriterConnectionImpl>(
TestOptions(), MakeRequest(), std::move(mock), hash, 1024);
auto close = tested->Close(WritePayload{});
auto next = sequencer.PopFrontWithName();
ASSERT_THAT(next.second, "Write");
next.first.set_value(true);

next = sequencer.PopFrontWithName();
ASSERT_THAT(next.second, "Finish");
next.first.set_value(true);

EXPECT_THAT(close.get(), IsOk());
tested = {};
}

TEST(AsyncWriterConnectionTest, CloseError) {
AsyncSequencer<bool> sequencer;
auto mock = std::make_unique<MockStream>();
EXPECT_CALL(*mock, Cancel).Times(1);
EXPECT_CALL(*mock, Write).WillOnce([&](Request const&, grpc::WriteOptions) {
return sequencer.PushBack("Write");
});
EXPECT_CALL(*mock, Finish).WillOnce([&] {
return sequencer.PushBack("Finish").then([](auto f) {
if (f.get()) return Status{};
return PermanentError();
});
});
auto hash = std::make_shared<MockHashFunction>();
EXPECT_CALL(*hash, Update(_, An<absl::Cord const&>(), _)).Times(1);
EXPECT_CALL(*hash, Finish).Times(0);

auto tested = std::make_unique<AsyncWriterConnectionImpl>(
TestOptions(), MakeRequest(), std::move(mock), hash, 1024);
auto response = tested->Close(WritePayload{});
auto next = sequencer.PopFrontWithName();
ASSERT_THAT(next.second, "Write");
next.first.set_value(false); // Detect an error on Write()
next = sequencer.PopFrontWithName();
ASSERT_THAT(next.second, "Finish");
next.first.set_value(false); // Return error from Finish()
EXPECT_THAT(response.get(), StatusIs(PermanentError().code()));
}

GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
} // namespace storage_internal
} // namespace cloud
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class MockAsyncWriterConnection : public storage::AsyncWriterConnection {
MOCK_METHOD(future<StatusOr<google::storage::v2::Object>>, Finalize,
(storage::WritePayload), (override));
MOCK_METHOD(future<Status>, Flush, (storage::WritePayload), (override));
MOCK_METHOD(future<Status>, Close, (storage::WritePayload), (override));
MOCK_METHOD(future<StatusOr<std::int64_t>>, Query, (), (override));
MOCK_METHOD(RpcMetadata, GetRequestMetadata, (), (override));
};
Expand Down
Loading