Skip to content
6 changes: 6 additions & 0 deletions google/cloud/storage/async/writer_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,12 @@ class AsyncWriterConnection {
/// Returns the latest write handle, if any.
virtual absl::optional<google::storage::v2::BidiWriteHandle> WriteHandle()
const = 0;

/// Returns the latest persisted data checksums, if any.
virtual absl::optional<google::storage::v2::ObjectChecksums>
PersistedChecksums() const {
return absl::nullopt;
}
};

/**
Expand Down
63 changes: 44 additions & 19 deletions google/cloud/storage/internal/async/connection_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,47 @@ std::unique_ptr<storage::internal::HashFunction> CreateHashFunction(
return storage::internal::CreateNullHashFunction();
}

StatusOr<std::unique_ptr<storage::AsyncWriterConnection>> MakeAppendableWriter(
google::cloud::internal::ImmutableOptions const& current,
google::storage::v2::BidiWriteObjectRequest request,
std::int64_t persisted_size,
std::function<future<StatusOr<WriteObject::WriteResult>>(
google::storage::v2::BidiWriteObjectRequest)>
factory,
StatusOr<WriteObject::WriteResult> rpc) {
if (!rpc) return std::move(rpc).status();

std::shared_ptr<storage::internal::HashFunction> hash;
std::unique_ptr<AsyncWriterConnectionImpl> impl;

if (rpc->first_response.has_resource()) {
auto const& resource = rpc->first_response.resource();
if (current->get<storage::EnableCrc32cValidationOption>() &&
resource.has_checksums() && resource.checksums().has_crc32c()) {
hash = std::make_shared<
::google::cloud::storage::internal::Crc32cHashFunction>(
resource.checksums().crc32c(), resource.size());
} else {
hash = CreateHashFunction(*current);
}
impl = std::make_unique<AsyncWriterConnectionImpl>(
current, request, std::move(rpc->stream), hash, resource, false);
} else {
persisted_size = rpc->first_response.persisted_size();
hash = CreateHashFunction(*current);
auto checksums = rpc->first_response.has_persisted_data_checksums()
? absl::make_optional(
rpc->first_response.persisted_data_checksums())
: absl::nullopt;
impl = std::make_unique<AsyncWriterConnectionImpl>(
current, request, std::move(rpc->stream), hash, persisted_size, false,
checksums);
}
return MakeWriterConnectionResumed(std::move(factory), std::move(impl),
std::move(request), std::move(hash),
rpc->first_response, *current);
}

std::unique_ptr<storage::internal::HashValidator> CreateHashValidator(
google::storage::v2::ReadObjectRequest const& request,
Options const& options) {
Expand Down Expand Up @@ -315,8 +356,6 @@ AsyncConnectionImpl::AppendableObjectUploadImpl(AppendableUploadParams p) {
auto current = internal::MakeImmutableOptions(std::move(p.options));
auto request = p.request;
std::int64_t persisted_size = 0;
std::shared_ptr<storage::internal::HashFunction> hash_function =
CreateHashFunction(*current);
auto retry =
std::shared_ptr<storage::AsyncRetryPolicy>(retry_policy(*current));
auto backoff =
Expand Down Expand Up @@ -404,24 +443,10 @@ AsyncConnectionImpl::AppendableObjectUploadImpl(AppendableUploadParams p) {
auto pending = factory(std::move(request));
return pending.then(
[current, request = std::move(p.request), persisted_size,
hash = std::move(hash_function), fa = std::move(factory)](auto f) mutable
fa = std::move(factory)](auto f) mutable
-> StatusOr<std::unique_ptr<storage::AsyncWriterConnection>> {
auto rpc = f.get();
if (!rpc) return std::move(rpc).status();
std::unique_ptr<AsyncWriterConnectionImpl> impl;
if (rpc->first_response.has_resource()) {
impl = std::make_unique<AsyncWriterConnectionImpl>(
current, request, std::move(rpc->stream), hash,
rpc->first_response.resource(), false);
} else {
persisted_size = rpc->first_response.persisted_size();
impl = std::make_unique<AsyncWriterConnectionImpl>(
current, request, std::move(rpc->stream), hash, persisted_size,
false);
}
return MakeWriterConnectionResumed(std::move(fa), std::move(impl),
std::move(request), std::move(hash),
rpc->first_response, *current);
return MakeAppendableWriter(current, std::move(request), persisted_size,
std::move(fa), f.get());
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include "google/cloud/storage/async/options.h"
#include "google/cloud/storage/async/retry_policy.h"
#include "google/cloud/storage/async/writer_connection.h"
#include "google/cloud/storage/internal/async/connection_impl.h"
#include "google/cloud/storage/internal/async/default_options.h"
#include "google/cloud/storage/internal/async/write_object.h"
#include "google/cloud/storage/internal/async/writer_connection_impl.h"
#include "google/cloud/storage/internal/crc32c.h"
#include "google/cloud/storage/testing/canonical_errors.h"
#include "google/cloud/storage/testing/mock_storage_stub.h"
#include "google/cloud/common_options.h"
Expand Down Expand Up @@ -627,6 +631,257 @@ TEST_F(AsyncConnectionImplAppendableTest, AppendableUploadRedirectNoHandle) {
next.first.set_value(true);
}

TEST_F(AsyncConnectionImplAppendableTest,
StartAppendableObjectUploadWithChecksum) {
auto constexpr kRequestText = R"pb(
write_object_spec {
resource {
bucket: "projects/_/buckets/test-bucket"
name: "test-object"
content_type: "text/plain"
}
}
)pb";
AsyncSequencer<bool> sequencer;
auto mock = std::make_shared<storage::testing::MockStorageStub>();

google::storage::v2::Object initial_resource;
initial_resource.set_bucket("projects/_/buckets/test-bucket");
initial_resource.set_name("test-object");
initial_resource.set_size(1024);
initial_resource.mutable_checksums()->set_crc32c(12345); // Some dummy CRC

auto stream = std::make_unique<MockAsyncBidiWriteObjectStream>();
EXPECT_CALL(*stream, Start).WillOnce([&] {
return sequencer.PushBack("Start");
});

EXPECT_CALL(*stream, Read)
.WillOnce([&, initial_resource] {
return sequencer.PushBack("Read(Takeover)")
.then([initial_resource](auto) {
auto response = google::storage::v2::BidiWriteObjectResponse{};
*response.mutable_resource() = initial_resource;
return absl::make_optional(std::move(response));
});
})
.WillOnce([&, initial_resource] {
return sequencer.PushBack("Read(FinalObject)")
.then([initial_resource](auto) {
auto response = google::storage::v2::BidiWriteObjectResponse{};
*response.mutable_resource() = initial_resource;
response.mutable_resource()->set_size(
initial_resource.size() + 9); // "some data" size is 9
return absl::make_optional(std::move(response));
});
});

EXPECT_CALL(*stream, Cancel).Times(1);
EXPECT_CALL(*stream, Finish).WillOnce([&] {
return sequencer.PushBack("Finish").then([](auto) { return Status{}; });
});

EXPECT_CALL(*stream, Write)
.WillOnce([&](google::storage::v2::BidiWriteObjectRequest const& request,
grpc::WriteOptions wopt) {
EXPECT_TRUE(request.state_lookup());
EXPECT_FALSE(wopt.is_last_message());
return sequencer.PushBack("Write(StateLookup)");
})
.WillOnce(
[&](google::storage::v2::BidiWriteObjectRequest const& /*request*/,
grpc::WriteOptions wopt) {
EXPECT_FALSE(wopt.is_last_message());
return sequencer.PushBack("Write(data)");
})
.WillOnce([&](google::storage::v2::BidiWriteObjectRequest const& request,
grpc::WriteOptions wopt) {
EXPECT_TRUE(request.finish_write());
EXPECT_TRUE(wopt.is_last_message());
// Here we expect full checksums to be set because we had the resource
// in takeover.
EXPECT_TRUE(request.has_object_checksums());
Comment thread
v-pratap marked this conversation as resolved.
auto expected_crc =
google::cloud::storage_internal::ExtendCrc32c(12345, "some data");
EXPECT_EQ(request.object_checksums().crc32c(), expected_crc);
return sequencer.PushBack("Write(Finalize)");
});

EXPECT_CALL(*mock, AsyncBidiWriteObject).WillOnce([&] {
return std::unique_ptr<AsyncBidiWriteObjectStream>(std::move(stream));
});

internal::AutomaticallyCreatedBackgroundThreads pool(1);
// Enable CRC32C validation in options
auto options = TestOptions().set<storage::EnableCrc32cValidationOption>(true);
auto connection = MakeTestConnection(pool.cq(), mock, options);

auto request = google::storage::v2::BidiWriteObjectRequest{};
ASSERT_TRUE(TextFormat::ParseFromString(kRequestText, &request));
request.mutable_write_object_spec()->set_appendable(true);

auto pending = connection->StartAppendableObjectUpload(
{std::move(request), connection->options()});

auto next = sequencer.PopFrontWithName();
EXPECT_EQ(next.second, "Start");
next.first.set_value(true);

next = sequencer.PopFrontWithName();
EXPECT_EQ(next.second, "Write(StateLookup)");
next.first.set_value(true);

next = sequencer.PopFrontWithName();
EXPECT_EQ(next.second, "Read(Takeover)");
next.first.set_value(true);

auto r = pending.get();
ASSERT_STATUS_OK(r);
auto writer = *std::move(r);

// Write some data.
auto w1 = writer->Write(storage::WritePayload("some data"));
next = sequencer.PopFrontWithName();
EXPECT_EQ(next.second, "Write(data)");
next.first.set_value(true);
EXPECT_STATUS_OK(w1.get());

// Finalize the upload.
auto w2 = writer->Finalize({});
next = sequencer.PopFrontWithName();
EXPECT_EQ(next.second, "Write(Finalize)");
next.first.set_value(true);
next = sequencer.PopFrontWithName();
EXPECT_EQ(next.second, "Read(FinalObject)");
next.first.set_value(true);

auto response = w2.get();
ASSERT_STATUS_OK(response);

writer.reset();
next = sequencer.PopFrontWithName();
EXPECT_EQ(next.second, "Finish");
next.first.set_value(true);
}

TEST_F(AsyncConnectionImplAppendableTest,
ResumeAppendableObjectUploadWithChecksum) {
auto constexpr kRequestText = R"pb(
append_object_spec { object: "test-object" }
)pb";
AsyncSequencer<bool> sequencer;
auto mock = std::make_shared<storage::testing::MockStorageStub>();

constexpr std::int64_t kPersistedSize = 16384;
constexpr std::uint32_t kPersistedCrc = 12345;

auto stream = std::make_unique<MockAsyncBidiWriteObjectStream>();
EXPECT_CALL(*stream, Start).WillOnce([&] {
return sequencer.PushBack("Start");
});

EXPECT_CALL(*stream, Read)
.WillOnce([&] {
return sequencer.PushBack("Read(PersistedSize)").then([](auto) {
auto response = google::storage::v2::BidiWriteObjectResponse{};
response.set_persisted_size(kPersistedSize);
response.mutable_persisted_data_checksums()->set_crc32c(
kPersistedCrc);
return absl::make_optional(std::move(response));
});
})
.WillOnce([&] {
return sequencer.PushBack("Read(FinalObject)").then([](auto) {
auto response = google::storage::v2::BidiWriteObjectResponse{};
auto object = google::storage::v2::Object{};
object.set_bucket("projects/_/buckets/test-bucket");
object.set_name("test-object");
object.set_size(kPersistedSize + 9);
*response.mutable_resource() = std::move(object);
return absl::make_optional(std::move(response));
});
});

EXPECT_CALL(*stream, Cancel).Times(1);
EXPECT_CALL(*stream, Finish).WillOnce([&] {
return sequencer.PushBack("Finish").then([](auto) { return Status{}; });
});

EXPECT_CALL(*stream, Write)
.WillOnce([&](google::storage::v2::BidiWriteObjectRequest const& request,
grpc::WriteOptions wopt) {
EXPECT_TRUE(request.state_lookup());
EXPECT_FALSE(wopt.is_last_message());
return sequencer.PushBack("Write(StateLookup)");
})
.WillOnce(
[&](google::storage::v2::BidiWriteObjectRequest const& /*request*/,
grpc::WriteOptions wopt) {
EXPECT_FALSE(wopt.is_last_message());
return sequencer.PushBack("Write(data)");
})
.WillOnce([&](google::storage::v2::BidiWriteObjectRequest const& request,
grpc::WriteOptions wopt) {
EXPECT_TRUE(request.finish_write());
EXPECT_TRUE(wopt.is_last_message());
EXPECT_TRUE(request.has_object_checksums());
EXPECT_EQ(request.object_checksums().crc32c(), 2901820631);
return sequencer.PushBack("Write(Finalize)");
});

EXPECT_CALL(*mock, AsyncBidiWriteObject)
.WillOnce([&](auto const&, auto, auto) {
return std::unique_ptr<AsyncBidiWriteObjectStream>(std::move(stream));
});

internal::AutomaticallyCreatedBackgroundThreads pool(1);
auto options = TestOptions().set<storage::EnableCrc32cValidationOption>(true);
auto connection = MakeTestConnection(pool.cq(), mock, options);

auto request = google::storage::v2::BidiWriteObjectRequest{};
ASSERT_TRUE(TextFormat::ParseFromString(kRequestText, &request));
auto pending = connection->ResumeAppendableObjectUpload(
{std::move(request), connection->options()});

auto next = sequencer.PopFrontWithName();
EXPECT_EQ(next.second, "Start");
next.first.set_value(true);

next = sequencer.PopFrontWithName();
EXPECT_EQ(next.second, "Write(StateLookup)");
next.first.set_value(true);

next = sequencer.PopFrontWithName();
EXPECT_EQ(next.second, "Read(PersistedSize)");
next.first.set_value(true);

auto r = pending.get();
ASSERT_STATUS_OK(r);
auto writer = *std::move(r);

auto w1 = writer->Write(storage::WritePayload("some data"));
next = sequencer.PopFrontWithName();
EXPECT_EQ(next.second, "Write(data)");
next.first.set_value(true);
EXPECT_STATUS_OK(w1.get());

auto w2 = writer->Finalize({});
next = sequencer.PopFrontWithName();
EXPECT_EQ(next.second, "Write(Finalize)");
next.first.set_value(true);
next = sequencer.PopFrontWithName();
EXPECT_EQ(next.second, "Read(FinalObject)");
next.first.set_value(true);

auto response = w2.get();
ASSERT_STATUS_OK(response);

writer.reset();
next = sequencer.PopFrontWithName();
EXPECT_EQ(next.second, "Finish");
next.first.set_value(true);
}

} // namespace
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
} // namespace storage_internal
Expand Down
Loading
Loading