Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions google/cloud/storage/async/object_descriptor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ std::pair<AsyncReader, AsyncToken> ObjectDescriptor::ReadLast(
return {AsyncReader(std::move(reader)), std::move(token)};
}

bool ObjectDescriptor::IsOpen() const { return impl_->IsOpen(); }

GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
} // namespace storage
} // namespace cloud
Expand Down
8 changes: 8 additions & 0 deletions google/cloud/storage/async/object_descriptor.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,14 @@ class ObjectDescriptor {
*/
std::pair<AsyncReader, AsyncToken> ReadLast(std::int64_t limit);

/**
* Returns true if the descriptor is open.
*
* A descriptor is open if it has not been cancelled and has not hit a
* permanent failure.
*/
bool IsOpen() const;

private:
std::shared_ptr<ObjectDescriptorConnection> impl_;
};
Expand Down
8 changes: 8 additions & 0 deletions google/cloud/storage/async/object_descriptor_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,14 @@ class ObjectDescriptorConnection {
virtual std::unique_ptr<AsyncReaderConnection> Read(ReadParams p) = 0;

virtual void MakeSubsequentStream() = 0;

/**
* Returns true if the descriptor is open.
*
* A descriptor is open if it has not been cancelled and has not hit a
* permanent failure.
*/
virtual bool IsOpen() const { return true; }
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this have a default implementation that returns true or should it be a pure virtual method with no implementation?

};

GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
Expand Down
9 changes: 9 additions & 0 deletions google/cloud/storage/async/object_descriptor_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,15 @@ TEST(ObjectDescriptor, ReadExceedsMaxRange) {
EXPECT_FALSE(token.valid());
}

TEST(ObjectDescriptor, IsOpen) {
auto mock = std::make_shared<MockAsyncObjectDescriptorConnection>();
EXPECT_CALL(*mock, IsOpen).WillOnce(Return(true)).WillOnce(Return(false));

auto tested = ObjectDescriptor(mock);
EXPECT_TRUE(tested.IsOpen());
EXPECT_FALSE(tested.IsOpen());
}

} // namespace
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
} // namespace storage
Expand Down
41 changes: 28 additions & 13 deletions google/cloud/storage/internal/async/connection_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
#include "google/cloud/internal/async_streaming_read_rpc_timeout.h"
#include "google/cloud/internal/async_streaming_write_rpc_timeout.h"
#include "google/cloud/internal/make_status.h"
#include <grpcpp/grpcpp.h>
#include <memory>
#include <utility>

Expand Down Expand Up @@ -231,19 +232,33 @@ AsyncConnectionImpl::Open(OpenParams p) {

auto pending = factory(std::move(initial_request));
using ReturnType = std::shared_ptr<storage::ObjectDescriptorConnection>;
return pending.then(
[rp = std::move(resume_policy), fa = std::move(factory),
rs = std::move(p.read_spec),
options = std::move(p.options)](auto f) mutable -> StatusOr<ReturnType> {
auto result = f.get();
if (!result) return std::move(result).status();

auto impl = std::make_shared<ObjectDescriptorImpl>(
std::move(rp), std::move(fa), std::move(rs),
std::move(result->stream), std::move(options));
impl->Start(std::move(result->first_response));
return ReturnType(impl);
});
return pending.then([rp = std::move(resume_policy), fa = std::move(factory),
rs = std::move(p.read_spec),
options = std::move(p.options), refresh = refresh_](
auto f) mutable -> StatusOr<ReturnType> {
auto result = f.get();
if (!result) return std::move(result).status();

// The descriptor remains open if at least one gRPC channel is in a
// functional state. We consider READY, IDLE, and CONNECTING to be
// functional. TRANSIENT_FAILURE and SHUTDOWN are not included because they
// indicate a definitive loss of connectivity or terminal closure.
auto transport_ok = [refresh] {
if (!refresh) return true;
auto const& channels = refresh->channels();
return std::any_of(
channels.begin(), channels.end(), [](auto const& channel) {
auto state = channel->GetState(false);
return state == GRPC_CHANNEL_READY || state == GRPC_CHANNEL_IDLE ||
state == GRPC_CHANNEL_CONNECTING;
});
};
auto impl = std::make_shared<ObjectDescriptorImpl>(
std::move(rp), std::move(fa), std::move(rs), std::move(result->stream),
std::move(options), std::move(transport_ok));
impl->Start(std::move(result->first_response));
return ReturnType(impl);
});
}

future<StatusOr<std::unique_ptr<storage::AsyncReaderConnection>>>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class AsyncObjectDescriptorConnectionTracing
absl::optional<google::storage::v2::Object> metadata() const override {
return impl_->metadata();
}
bool IsOpen() const override { return impl_->IsOpen(); }

std::unique_ptr<storage::AsyncReaderConnection> Read(ReadParams p) override {
internal::OTelScope scope(span_);
Expand Down
16 changes: 14 additions & 2 deletions google/cloud/storage/internal/async/object_descriptor_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,13 @@ ObjectDescriptorImpl::ObjectDescriptorImpl(
std::unique_ptr<storage::ResumePolicy> resume_policy,
OpenStreamFactory make_stream,
google::storage::v2::BidiReadObjectSpec read_object_spec,
std::shared_ptr<OpenStream> stream, Options options)
std::shared_ptr<OpenStream> stream, Options options,
std::function<bool()> transport_ok)
: resume_policy_prototype_(std::move(resume_policy)),
make_stream_(std::move(make_stream)),
read_object_spec_(std::move(read_object_spec)),
options_(std::move(options)) {
options_(std::move(options)),
transport_ok_(std::move(transport_ok)) {
stream_manager_ = std::make_unique<StreamManager>(
[]() -> std::shared_ptr<ReadStream> { return nullptr; }, // NOLINT
std::make_shared<ReadStream>(std::move(stream),
Expand All @@ -62,8 +64,18 @@ void ObjectDescriptorImpl::Start(
}
}

bool ObjectDescriptorImpl::IsOpen() const {
{
std::unique_lock<std::mutex> lk(mu_);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

std::scoped_lock would be more appropriate here.

if (cancelled_) return false;
if (stream_manager_->Empty()) return false;
}
return !transport_ok_ || transport_ok_();
}

void ObjectDescriptorImpl::Cancel() {
std::unique_lock<std::mutex> lk(mu_);
if (cancelled_) return;
cancelled_ = true;
if (stream_manager_) stream_manager_->CancelAll();
if (pending_stream_.valid()) pending_stream_.cancel();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "absl/types/optional.h"
#include "google/storage/v2/storage.pb.h"
#include <cstdint>
#include <functional>
#include <memory>
#include <mutex>
#include <unordered_map>
Expand Down Expand Up @@ -59,8 +60,8 @@ class ObjectDescriptorImpl
ObjectDescriptorImpl(std::unique_ptr<storage::ResumePolicy> resume_policy,
OpenStreamFactory make_stream,
google::storage::v2::BidiReadObjectSpec read_object_spec,
std::shared_ptr<OpenStream> stream,
Options options = {});
std::shared_ptr<OpenStream> stream, Options options = {},
std::function<bool()> transport_ok = {});
~ObjectDescriptorImpl() override;

// Start the read loop.
Expand All @@ -82,6 +83,8 @@ class ObjectDescriptorImpl

std::size_t StreamSize() const;

bool IsOpen() const override;

private:
using StreamManager = MultiStreamManager<ReadStream, ReadRange>;
using StreamIterator =
Expand Down Expand Up @@ -123,6 +126,7 @@ class ObjectDescriptorImpl
google::cloud::StatusOr<storage_internal::OpenStreamResult>>
pending_stream_;
bool cancelled_ = false;
std::function<bool()> transport_ok_;
};

GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,18 @@ auto constexpr kMetadataText = R"pb(

auto NoResume() { return storage::LimitedErrorCountResumePolicy(0)(); }

auto MakeTested(std::unique_ptr<storage::ResumePolicy> resume_policy,
OpenStreamFactory make_stream,
google::storage::v2::BidiReadObjectSpec read_object_spec,
std::shared_ptr<OpenStream> stream) {
auto MakeTested(
std::unique_ptr<storage::ResumePolicy> resume_policy,
OpenStreamFactory make_stream,
google::storage::v2::BidiReadObjectSpec read_object_spec,
std::shared_ptr<OpenStream> stream,
std::function<bool()> transport_ok = [] { return true; }) {
Options options;
options.set<storage::EnableMultiStreamOptimizationOption>(true);
return std::make_shared<ObjectDescriptorImpl>(
std::move(resume_policy), std::move(make_stream),
std::move(read_object_spec), std::move(stream), std::move(options));
std::move(read_object_spec), std::move(stream), std::move(options),
std::move(transport_ok));
}

MATCHER_P(IsProtoEqualModuloRepeatedFieldOrdering, value,
Expand Down Expand Up @@ -1784,6 +1787,83 @@ TEST(ObjectDescriptorImpl, MultiStreamOptimizationDisabled) {
tested.reset();
}

/// @test Verify that IsOpen() is true by default.
TEST(ObjectDescriptorImpl, IsOpenTrueByDefault) {
MockFactory factory;
auto stream = std::make_unique<MockStream>();
EXPECT_CALL(*stream, Finish).WillOnce(Return(make_ready_future(Status{})));
EXPECT_CALL(*stream, Cancel).Times(AtMost(1));
auto tested = MakeTested(NoResume(), factory.AsStdFunction(),
google::storage::v2::BidiReadObjectSpec{},
std::make_shared<OpenStream>(std::move(stream)));
EXPECT_TRUE(tested->IsOpen());
}

/// @test Verify that IsOpen() is false after Cancel().
TEST(ObjectDescriptorImpl, IsOpenFalseOnCancel) {
MockFactory factory;
auto stream = std::make_unique<MockStream>();
EXPECT_CALL(*stream, Finish).WillOnce(Return(make_ready_future(Status{})));
EXPECT_CALL(*stream, Cancel).Times(AtMost(2));
auto tested = MakeTested(NoResume(), factory.AsStdFunction(),
google::storage::v2::BidiReadObjectSpec{},
std::make_shared<OpenStream>(std::move(stream)));
EXPECT_TRUE(tested->IsOpen());
tested->Cancel();
EXPECT_FALSE(tested->IsOpen());
}

/// @test Verify that IsOpen() is false if transport health check fails.
TEST(ObjectDescriptorImpl, IsOpenFalseOnPermanentError) {
MockFactory factory;
auto stream = std::make_unique<MockStream>();
EXPECT_CALL(*stream, Finish).WillOnce(Return(make_ready_future(Status{})));
EXPECT_CALL(*stream, Cancel).Times(AtMost(1));
bool transport_ok = true;
auto transport_ok_callback = [&transport_ok] { return transport_ok; };

auto tested = MakeTested(NoResume(), factory.AsStdFunction(),
google::storage::v2::BidiReadObjectSpec{},
std::make_shared<OpenStream>(std::move(stream)),
transport_ok_callback);

EXPECT_TRUE(tested->IsOpen());
transport_ok = false;
EXPECT_FALSE(tested->IsOpen());
}

TEST(ObjectDescriptorImpl, IsOpenFalseOnTransportFailure) {
auto stream = std::make_unique<MockStream>();
EXPECT_CALL(*stream, Cancel).Times(1);
EXPECT_CALL(*stream, Finish).WillOnce([] {
return make_ready_future(Status{});
});
MockFactory factory;
auto transport_ok = [] { return false; };
auto tested = std::make_shared<ObjectDescriptorImpl>(
NoResume(), factory.AsStdFunction(),
google::storage::v2::BidiReadObjectSpec{},
std::make_shared<OpenStream>(std::move(stream)), Options{},
std::move(transport_ok));
EXPECT_FALSE(tested->IsOpen());
}

TEST(ObjectDescriptorImpl, IsOpenTrueOnTransportSuccess) {
auto stream = std::make_unique<MockStream>();
EXPECT_CALL(*stream, Cancel).Times(1);
EXPECT_CALL(*stream, Finish).WillOnce([] {
return make_ready_future(Status{});
});
MockFactory factory;
auto transport_ok = [] { return true; };
auto tested = std::make_shared<ObjectDescriptorImpl>(
NoResume(), factory.AsStdFunction(),
google::storage::v2::BidiReadObjectSpec{},
std::make_shared<OpenStream>(std::move(stream)), Options{},
std::move(transport_ok));
EXPECT_TRUE(tested->IsOpen());
}

} // namespace
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
} // namespace storage_internal
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class MockAsyncObjectDescriptorConnection
(ReadParams), (override));
MOCK_METHOD(void, MakeSubsequentStream, (), (override));
MOCK_METHOD(Options, options, (), (const, override));
MOCK_METHOD(bool, IsOpen, (), (const, override));
};

GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
Expand Down
Loading