
Commit 19d64d3

feat(storage): access raw protobuf directly in read_resumption_strategy
1 parent 4d98e32 commit 19d64d3

File tree

5 files changed: +48 / -40 lines


google/cloud/storage/asyncio/retry/reads_resumption_strategy.py

Lines changed: 15 additions & 14 deletions
@@ -81,23 +81,24 @@ def update_state_from_response(
         self, response: storage_v2.BidiReadObjectResponse, state: Dict[str, Any]
     ) -> None:
         """Processes a server response, performs integrity checks, and updates state."""
-
-        # Capture read_handle if provided.
-        if response.read_handle:
-            state["read_handle"] = response.read_handle
+        proto = getattr(response, "_pb", response)
+        if proto.read_handle:
+            state["read_handle"] = storage_v2.BidiReadHandle(
+                handle=proto.read_handle.handle
+            )

         download_states = state["download_states"]

-        for object_data_range in response.object_data_ranges:
-            # Ignore empty ranges or ranges for IDs not in our state
-            # (e.g., from a previously cancelled request on the same stream).
-            if not object_data_range.read_range:
+        for object_data_range in proto.object_data_ranges:
+            if not object_data_range.HasField("read_range"):
                 logger.warning(
                     "Received response with missing read_range field; ignoring."
                 )
                 continue

-            read_id = object_data_range.read_range.read_id
+            read_range_pb = object_data_range.read_range
+            read_id = read_range_pb.read_id
+
             if read_id not in download_states:
                 logger.warning(
                     f"Received data for unknown or stale read_id {read_id}; ignoring."
@@ -107,7 +108,7 @@ def update_state_from_response(
             read_state = download_states[read_id]

             # Offset Verification
-            chunk_offset = object_data_range.read_range.read_offset
+            chunk_offset = read_range_pb.read_offset
             if chunk_offset != read_state.next_expected_offset:
                 raise DataCorruption(
                     response,
@@ -116,11 +117,11 @@ def update_state_from_response(
             )

             # Checksum Verification
-            # We must validate data before updating state or writing to buffer.
-            data = object_data_range.checksummed_data.content
-            server_checksum = object_data_range.checksummed_data.crc32c
+            checksummed_data = object_data_range.checksummed_data
+            data = checksummed_data.content

-            if server_checksum is not None:
+            if checksummed_data.HasField("crc32c"):
+                server_checksum = checksummed_data.crc32c
                 client_checksum = int.from_bytes(Checksum(data).digest(), "big")
                 if server_checksum != client_checksum:
                     raise DataCorruption(
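For reference, the checksum comparison above converts a CRC32C digest to an integer before comparing it with the server-reported value. A self-contained example, assuming `Checksum` in this module comes from the `google-crc32c` package:

import google_crc32c

data = b"example payload"

# digest() returns the CRC32C as 4 big-endian bytes; the server reports
# the same checksum as a uint32, so compare them as integers.
client_checksum = int.from_bytes(google_crc32c.Checksum(data).digest(), "big")
assert client_checksum == google_crc32c.value(data)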

tests/perf/microbenchmarks/_utils.py

Lines changed: 9 additions & 8 deletions
@@ -18,7 +18,8 @@
 import socket
 import psutil

-_C4_STANDARD_192_NIC = "ens3" # can be fetched via ip link show
+_C4_STANDARD_192_NIC = "ens3"  # can be fetched via ip link show
+

 def publish_benchmark_extra_info(
     benchmark: Any,
@@ -28,7 +29,6 @@ def publish_benchmark_extra_info(
     download_bytes_list: Optional[List[int]] = None,
     duration: Optional[int] = None,
 ) -> None:
-
     """
     Helper function to publish benchmark parameters to the extra_info property.
     """
@@ -48,14 +48,15 @@ def publish_benchmark_extra_info(
     benchmark.group = benchmark_group

     if download_bytes_list is not None:
-        assert duration is not None, "Duration must be provided if total_bytes_transferred is provided."
+        assert (
+            duration is not None
+        ), "Duration must be provided if total_bytes_transferred is provided."
         throughputs_list = [x / duration / (1024 * 1024) for x in download_bytes_list]
         min_throughput = min(throughputs_list)
         max_throughput = max(throughputs_list)
         mean_throughput = statistics.mean(throughputs_list)
         median_throughput = statistics.median(throughputs_list)

-
     else:
         object_size = params.file_size_bytes
         num_files = params.num_files
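The list comprehension above turns per-file byte counts into MiB/s throughputs; a quick illustration with made-up numbers:

import statistics

duration = 10  # seconds
download_bytes_list = [524_288_000, 1_048_576_000]  # 500 MiB and 1000 MiB

# bytes / seconds / (1024 * 1024) -> MiB per second
throughputs_list = [x / duration / (1024 * 1024) for x in download_bytes_list]
print(throughputs_list)                     # [50.0, 100.0]
print(statistics.mean(throughputs_list))    # 75.0
print(statistics.median(throughputs_list))  # 75.0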
@@ -211,13 +212,13 @@ def get_affinity(irq):

 def get_primary_interface_name():
     primary_ip = None
-
+
     # 1. Determine the Local IP used for internet access
     # We use UDP (SOCK_DGRAM) so we don't actually send a handshake/packet
     s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
     try:
         # connect() to a public IP (Google DNS) to force route resolution
-        s.connect(('8.8.8.8', 80))
+        s.connect(("8.8.8.8", 80))
         primary_ip = s.getsockname()[0]
     except Exception:
         # Fallback if no internet
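The hunk shows only the route-resolution half of get_primary_interface_name; the rest of the function presumably maps primary_ip back to an interface name. A plausible completion using psutil.net_if_addrs() (a sketch under that assumption, not the code actually in the file):

import socket
import psutil

def interface_for_ip(primary_ip):
    # net_if_addrs() maps interface names to address lists; return the
    # interface bound to the IP chosen for the default route.
    for name, addrs in psutil.net_if_addrs().items():
        for addr in addrs:
            if addr.family == socket.AF_INET and addr.address == primary_ip:
                return name
    return None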
@@ -248,7 +249,7 @@ def get_irq_affinity():
     for irq in irqs:
         affinity_str = get_affinity(irq)
         if affinity_str != "N/A":
-            for part in affinity_str.split(','):
-                if '-' not in part:
+            for part in affinity_str.split(","):
+                if "-" not in part:
                     cpus.add(int(part))
     return cpus
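For context on the parsing loop: /proc/irq/*/smp_affinity_list values mix single CPUs and ranges (for example "0-3,8,10"), and the code above keeps only the single-CPU entries:

affinity_str = "0-3,8,10"  # typical smp_affinity_list format

cpus = set()
for part in affinity_str.split(","):
    if "-" not in part:  # range entries such as "0-3" are skipped
        cpus.add(int(part))

print(sorted(cpus))  # [8, 10]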

tests/perf/microbenchmarks/time_based/conftest.py

Lines changed: 1 addition & 1 deletion
@@ -17,5 +17,5 @@
 @pytest.fixture
 def workload_params(request):
     params = request.param
-    files_names = [f'fio-go_storage_fio.0.{i}' for i in range(0, params.num_processes)]
+    files_names = [f"fio-go_storage_fio.0.{i}" for i in range(0, params.num_processes)]
     return params, files_names
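Because the fixture reads request.param, tests presumably supply parameters via indirect parametrization. A hypothetical usage sketch (the params object and test are illustrative, not taken from this repo):

from types import SimpleNamespace

import pytest

@pytest.mark.parametrize(
    "workload_params",
    [SimpleNamespace(num_processes=2)],  # stand-in for the real params type
    indirect=True,  # routes each value to the fixture as request.param
)
def test_names(workload_params):
    params, files_names = workload_params
    assert files_names == ["fio-go_storage_fio.0.0", "fio-go_storage_fio.0.1"]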

tests/perf/microbenchmarks/time_based/reads/test_reads.py

Lines changed: 0 additions & 1 deletion
@@ -159,7 +159,6 @@ async def _download_time_based_async(client, filename, params):


 def _download_files_worker(process_idx, filename, params, bucket_type):
-
     if bucket_type == "zonal":
         return worker_loop.run_until_complete(
             _download_time_based_async(worker_client, filename, params)
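_download_files_worker leans on module-level worker_loop and worker_client, which points to a per-process initializer pattern: event loops and async clients are not fork-safe, so each multiprocessing worker builds its own once and reuses it. A generic sketch of that pattern (the initializer and make_client are assumptions, not shown in this diff):

import asyncio

worker_loop = None
worker_client = None

def init_worker(make_client):
    # Intended to run once per process, e.g. via
    # multiprocessing.Pool(initializer=init_worker, initargs=(make_client,)).
    global worker_loop, worker_client
    worker_loop = asyncio.new_event_loop()
    worker_client = worker_loop.run_until_complete(make_client())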

tests/unit/asyncio/test_async_appendable_object_writer.py

Lines changed: 23 additions & 16 deletions
@@ -175,9 +175,9 @@ async def test_state_lookup(self, mock_appendable_writer):
         writer._is_stream_open = True
         writer.write_obj_stream = mock_appendable_writer["mock_stream"]

-        mock_appendable_writer["mock_stream"].recv.return_value = (
-            storage_type.BidiWriteObjectResponse(persisted_size=100)
-        )
+        mock_appendable_writer[
+            "mock_stream"
+        ].recv.return_value = storage_type.BidiWriteObjectResponse(persisted_size=100)

         size = await writer.state_lookup()

@@ -246,9 +246,7 @@ async def test_append_data_less_than_flush_interval(self, mock_appendable_writer
         ],
     )
     @pytest.mark.asyncio
-    async def test_append(
-        self, data_len, mock_appendable_writer
-    ):
+    async def test_append(self, data_len, mock_appendable_writer):
         """Verify append orchestrates manager and drives the internal generator."""
         # Arrange
         writer = self._make_one(mock_appendable_writer["mock_client"])
@@ -272,10 +270,19 @@ async def test_append(
         # Assert
         expected_recv_count = data_len // _DEFAULT_FLUSH_INTERVAL_BYTES
         assert writer.offset == data_len
-        assert writer.bytes_appended_since_last_flush == data_len % _DEFAULT_FLUSH_INTERVAL_BYTES
-        assert writer.persisted_size == expected_recv_count*_DEFAULT_FLUSH_INTERVAL_BYTES
-        assert writer.write_obj_stream.send.await_count == -(-data_len // _MAX_CHUNK_SIZE_BYTES) # Ceiling division for number of chunks
-        assert writer.write_obj_stream.recv.await_count == expected_recv_count # Expect 1 recv per flush interval
+        assert (
+            writer.bytes_appended_since_last_flush
+            == data_len % _DEFAULT_FLUSH_INTERVAL_BYTES
+        )
+        assert (
+            writer.persisted_size == expected_recv_count * _DEFAULT_FLUSH_INTERVAL_BYTES
+        )
+        assert writer.write_obj_stream.send.await_count == -(
+            -data_len // _MAX_CHUNK_SIZE_BYTES
+        )  # Ceiling division for number of chunks
+        assert (
+            writer.write_obj_stream.recv.await_count == expected_recv_count
+        )  # Expect 1 recv per flush interval

     @pytest.mark.asyncio
     async def test_append_recovery_reopens_stream(self, mock_appendable_writer):
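The -(-data_len // _MAX_CHUNK_SIZE_BYTES) expression in the send-count assert is ceiling division written with floor division, as its comment notes; a quick check:

import math

data_len, chunk = 10 * 1024 * 1024, 3 * 1024 * 1024

# Floor division rounds toward negative infinity, so negating the
# operand and the result rounds the quotient up instead.
assert -(-data_len // chunk) == math.ceil(data_len / chunk) == 4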
@@ -339,9 +346,9 @@ async def test_flush_resets_counters(self, mock_appendable_writer):
         writer.write_obj_stream = mock_appendable_writer["mock_stream"]
         writer.bytes_appended_since_last_flush = 100

-        mock_appendable_writer["mock_stream"].recv.return_value = (
-            storage_type.BidiWriteObjectResponse(persisted_size=200)
-        )
+        mock_appendable_writer[
+            "mock_stream"
+        ].recv.return_value = storage_type.BidiWriteObjectResponse(persisted_size=200)

         await writer.flush()

@@ -382,9 +389,9 @@ async def test_finalize_lifecycle(self, mock_appendable_writer):
         writer.write_obj_stream = mock_appendable_writer["mock_stream"]

         resource = storage_type.Object(size=999)
-        mock_appendable_writer["mock_stream"].recv.return_value = (
-            storage_type.BidiWriteObjectResponse(resource=resource)
-        )
+        mock_appendable_writer[
+            "mock_stream"
+        ].recv.return_value = storage_type.BidiWriteObjectResponse(resource=resource)

         res = await writer.finalize()
