Skip to content
This repository was archived by the owner on Mar 31, 2026. It is now read-only.

Commit ed21dee

Browse files
committed
Refactor async_connection class
1 parent 60e831f commit ed21dee

File tree

7 files changed

+194
-187
lines changed

7 files changed

+194
-187
lines changed

google/cloud/storage/_experimental/asyncio/abstracts/__init__.py

Whitespace-only changes.
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Abstract class for Async JSON and GRPC connection."""
16+
17+
import abc
18+
from google.cloud.storage._http import AGENT_VERSION
19+
from google.api_core.client_info import ClientInfo
20+
from google.cloud.storage import __version__
21+
22+
23+
class AsyncConnection(abc.ABC):
24+
"""Class for asynchronous connection with JSON and GRPC compatibility.
25+
26+
This class expose python implementation of interacting with relevant APIs.
27+
28+
Args:
29+
client: The client that owns this connection.
30+
client_info: Information about the client library.
31+
"""
32+
33+
def __init__(self, client, client_info=None):
34+
self._client = client
35+
36+
if client_info is None:
37+
client_info = ClientInfo()
38+
39+
self._client_info = client_info
40+
if self._client_info.user_agent is None:
41+
self._client_info.user_agent = AGENT_VERSION
42+
else:
43+
self._client_info.user_agent = (
44+
f"{self._client_info.user_agent} {AGENT_VERSION}"
45+
)
46+
self._client_info.client_library_version = __version__
47+
self._extra_headers = {}
48+
49+
@property
50+
def extra_headers(self):
51+
"""Returns extra headers to send with every request."""
52+
return self._extra_headers
53+
54+
@extra_headers.setter
55+
def extra_headers(self, value):
56+
"""Set the extra header property."""
57+
self._extra_headers = value
58+
59+
@property
60+
def user_agent(self):
61+
"""Returns user_agent for async HTTP transport.
62+
63+
Returns:
64+
str: The user agent string.
65+
"""
66+
return self._client_info.to_user_agent()
67+
68+
@user_agent.setter
69+
def user_agent(self, value):
70+
"""Setter for user_agent in connection."""
71+
self._client_info.user_agent = value
72+
73+
async def close(self):
74+
pass

google/cloud/storage/_experimental/asyncio/async_client.py

Lines changed: 19 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,9 @@
1616

1717
from google.cloud.storage._experimental.asyncio.async_creds import AsyncCredsWrapper
1818
from google.cloud.storage.abstracts.base_client import BaseClient
19-
from google.cloud.storage._experimental.asyncio.async_connection import AsyncConnection
19+
from google.cloud.storage._experimental.asyncio.utility.async_json_connection import AsyncJSONConnection
2020
from google.cloud.storage.abstracts import base_client
2121

22-
try:
23-
from google.auth.aio.transport import sessions
24-
AsyncSession = sessions.AsyncAuthorizedSession
25-
_AIO_AVAILABLE = True
26-
except ImportError:
27-
_AIO_AVAILABLE = False
28-
2922
_marker = base_client.marker
3023

3124

@@ -43,13 +36,6 @@ def __init__(
4336
*,
4437
api_key=None,
4538
):
46-
if not _AIO_AVAILABLE:
47-
# Python 3.9 or less comes with an older version of google-auth library which doesn't support asyncio
48-
raise ImportError(
49-
"Failed to import 'google.auth.aio', Consider using a newer python version (>=3.10)"
50-
" or newer version of google-auth library to mitigate this issue."
51-
)
52-
5339
if self._use_client_cert:
5440
# google.auth.aio.transports.sessions.AsyncAuthorizedSession currently doesn't support configuring mTLS.
5541
# In future, we can monkey patch the above, and do provide mTLS support, but that is not a priority
@@ -66,22 +52,28 @@ def __init__(
6652
api_key=api_key
6753
)
6854
self.credentials = AsyncCredsWrapper(self._credentials) # self._credential is synchronous.
69-
self._connection = AsyncConnection(self, **self.connection_kw_args) # adapter for async communication
70-
self._async_http_internal = _async_http
71-
self._async_http_passed_by_user = (_async_http is not None)
55+
self._async_http = _async_http
56+
57+
# We need both, as the same client can be used for multiple buckets.
58+
self._json_connection_internal = None
59+
self._grpc_connection_internal = None
7260

7361
@property
74-
def async_http(self):
75-
"""Returns the existing asynchronous session, or create one if it does not exists."""
76-
if self._async_http_internal is None:
77-
self._async_http_internal = AsyncSession(credentials=self.credentials)
78-
return self._async_http_internal
62+
def _grpc_connection(self):
63+
raise NotImplementedError("Not yet Implemented.")
7964

80-
async def close(self):
81-
"""Close the session, if it exists"""
82-
if self._async_http_internal is not None and not self._async_http_passed_by_user:
83-
await self._async_http_internal.close()
65+
@property
66+
def _json_connection(self):
67+
if not self._json_connection_internal:
68+
self._json_connection_internal = AsyncJSONConnection(self, _async_http=self._async_http, credentials=self.credentials, **self.connection_kw_args)
69+
return self._json_connection_internal
8470

71+
async def close(self):
72+
if self._json_connection:
73+
await self._json_connection.close()
74+
75+
if self._grpc_connection:
76+
await self._grpc_connection.close()
8577

8678
def bucket(self, bucket_name, user_project=None, generation=None):
8779
"""Factory constructor for bucket object.

google/cloud/storage/_experimental/asyncio/utility/__init__.py

Whitespace-only changes.

google/cloud/storage/_experimental/asyncio/async_connection.py renamed to google/cloud/storage/_experimental/asyncio/utility/async_json_connection.py

Lines changed: 33 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -12,87 +12,65 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
"""Create/interact with Google Cloud Storage connections in asynchronous manner."""
15+
"""Implementation of Async JSON connection"""
1616

1717
import json
1818
import collections
1919
import functools
20-
from urllib.parse import urlencode
21-
2220
import google.api_core.exceptions
21+
22+
from urllib.parse import urlencode
2323
from google.cloud import _http
2424
from google.cloud.storage import _http as storage_http
2525
from google.cloud.storage import _helpers
26-
from google.api_core.client_info import ClientInfo
2726
from google.cloud.storage._opentelemetry_tracing import create_trace_span
28-
from google.cloud.storage import __version__
29-
from google.cloud.storage._http import AGENT_VERSION
30-
27+
from google.cloud.storage._experimental.asyncio.abstracts.async_connection import AsyncConnection
3128

32-
class AsyncConnection:
33-
"""Class for asynchronous connection using google.auth.aio.
29+
try:
30+
from google.auth.aio.transport import sessions
31+
AsyncSession = sessions.AsyncAuthorizedSession
32+
_AIO_AVAILABLE = True
33+
except ImportError:
34+
_AIO_AVAILABLE = False
3435

35-
This class handles the creation of API requests, header management,
36-
user agent configuration, and error handling for the Async Storage Client.
36+
class AsyncJSONConnection(AsyncConnection):
37+
"""Implementation of Async JSON connection
3738
3839
Args:
3940
client: The client that owns this connection.
4041
client_info: Information about the client library.
4142
api_endpoint: The API endpoint to use.
4243
"""
4344

44-
def __init__(self, client, client_info=None, api_endpoint=None):
45-
self._client = client
46-
47-
if client_info is None:
48-
client_info = ClientInfo()
49-
50-
self._client_info = client_info
51-
if self._client_info.user_agent is None:
52-
self._client_info.user_agent = AGENT_VERSION
53-
else:
54-
self._client_info.user_agent = (
55-
f"{self._client_info.user_agent} {AGENT_VERSION}"
45+
def __init__(self, client, client_info=None, api_endpoint=None, _async_http=None, credentials=None):
46+
if not _AIO_AVAILABLE:
47+
# Python 3.9 or less comes with an older version of google-auth library which doesn't support asyncio
48+
raise ImportError(
49+
"Failed to import 'google.auth.aio', Consider using a newer python version (>=3.10)"
50+
" or newer version of google-auth library to mitigate this issue."
5651
)
57-
self._client_info.client_library_version = __version__
58-
self._extra_headers = {}
52+
53+
super().__init__(client, client_info=client_info)
5954

6055
self.API_BASE_URL = api_endpoint or storage_http.Connection.DEFAULT_API_ENDPOINT
6156
self.API_VERSION = storage_http.Connection.API_VERSION
6257
self.API_URL_TEMPLATE = storage_http.Connection.API_URL_TEMPLATE
63-
64-
@property
65-
def extra_headers(self):
66-
"""Returns extra headers to send with every request."""
67-
return self._extra_headers
68-
69-
@extra_headers.setter
70-
def extra_headers(self, value):
71-
"""Set the extra header property."""
72-
self._extra_headers = value
58+
59+
self.credentials = credentials
60+
self._async_http_internal = _async_http
61+
self._async_http_passed_by_user = (_async_http is not None)
7362

7463
@property
7564
def async_http(self):
76-
"""Returns the AsyncAuthorizedSession from the client.
77-
78-
Returns:
79-
google.auth.aio.transport.sessions.AsyncAuthorizedSession: The async session.
80-
"""
81-
return self._client.async_http
82-
83-
@property
84-
def user_agent(self):
85-
"""Returns user_agent for async HTTP transport.
86-
87-
Returns:
88-
str: The user agent string.
89-
"""
90-
return self._client_info.to_user_agent()
91-
92-
@user_agent.setter
93-
def user_agent(self, value):
94-
"""Setter for user_agent in connection."""
95-
self._client_info.user_agent = value
65+
"""Returns the existing asynchronous session, or create one if it does not exists."""
66+
if self._async_http_internal is None:
67+
self._async_http_internal = AsyncSession(credentials=self.credentials)
68+
return self._async_http_internal
69+
70+
async def close(self):
71+
"""Close the session, if it exists"""
72+
if self._async_http_internal is not None and not self._async_http_passed_by_user:
73+
await self._async_http_internal.close()
9674

9775
async def _make_request(
9876
self,

tests/unit/asyncio/test_async_client.py

Lines changed: 14 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,6 @@
1717
import pytest
1818
from google.auth.credentials import Credentials
1919
from google.cloud.storage._experimental.asyncio.async_client import AsyncClient
20-
from google.cloud.storage._experimental.asyncio.async_helpers import AsyncHTTPIterator
21-
22-
# Aliases to match sync test style
23-
_marker = object()
2420

2521

2622
def _make_credentials():
@@ -45,22 +41,20 @@ def test_ctor_defaults(self):
4541
PROJECT = "PROJECT"
4642
credentials = _make_credentials()
4743

48-
# We mock AsyncConnection to prevent network logic during init
49-
with mock.patch("google.cloud.storage._experimental.asyncio.async_client.AsyncConnection") as MockConn:
44+
# We mock AsyncJSONConnection to prevent network logic during init
45+
with mock.patch("google.cloud.storage._experimental.asyncio.async_client.AsyncJSONConnection") as MockConn:
5046
client = self._make_one(project=PROJECT, credentials=credentials)
5147

52-
assert client.project == PROJECT
53-
# It is the instance of the Mock
54-
assert isinstance(client._connection, mock.Mock)
55-
assert client._connection == MockConn.return_value
48+
assert client.project == PROJECT
5649

57-
# Verify specific async attributes
58-
assert client._async_http_internal is None
59-
assert client._async_http_passed_by_user is False
50+
# It is the instance of the Mock
51+
assert isinstance(client._json_connection, mock.Mock)
52+
assert client._json_connection == MockConn.return_value
53+
MockConn.assert_called_once_with(client, _async_http=None, credentials=client.credentials, client_info=None, api_endpoint=None)
6054

61-
# Verify inheritance from BaseClient worked (batch stack, etc)
62-
assert client.current_batch is None
63-
assert list(client._batch_stack) == []
55+
# Verify inheritance from BaseClient worked (batch stack, etc)
56+
assert client.current_batch is None
57+
assert list(client._batch_stack) == []
6458

6559
def test_ctor_mtls_raises_error(self):
6660
credentials = _make_credentials()
@@ -76,67 +70,19 @@ def test_ctor_w_async_http_passed(self):
7670
credentials = _make_credentials()
7771
async_http = mock.Mock()
7872

79-
with mock.patch("google.cloud.storage._experimental.asyncio.async_client.AsyncConnection"):
73+
with mock.patch("google.cloud.storage._experimental.asyncio.async_client.AsyncJSONConnection") as MockConn:
8074
client = self._make_one(
8175
project="PROJECT",
8276
credentials=credentials,
8377
_async_http=async_http
8478
)
8579

86-
assert client._async_http_internal is async_http
87-
assert client._async_http_passed_by_user is True
88-
89-
def test_async_http_property_creates_session(self):
90-
credentials = _make_credentials()
91-
with mock.patch("google.cloud.storage._experimental.asyncio.async_client.AsyncConnection"):
92-
client = self._make_one(project="PROJECT", credentials=credentials)
93-
94-
assert client._async_http_internal is None
95-
96-
# Mock the auth session class
97-
with mock.patch("google.cloud.storage._experimental.asyncio.async_client.AsyncSession") as MockSession:
98-
session = client.async_http
99-
100-
assert session is MockSession.return_value
101-
assert client._async_http_internal is session
102-
# Should be initialized with the AsyncCredsWrapper, not the raw credentials
103-
MockSession.assert_called_once()
104-
call_kwargs = MockSession.call_args[1]
105-
assert call_kwargs['credentials'] == client.credentials
106-
107-
@pytest.mark.asyncio
108-
async def test_close_manages_session_lifecycle(self):
109-
credentials = _make_credentials()
110-
with mock.patch("google.cloud.storage._experimental.asyncio.async_client.AsyncConnection"):
111-
client = self._make_one(project="PROJECT", credentials=credentials)
112-
113-
# 1. Internal session created by client -> Client closes it
114-
mock_internal = mock.AsyncMock()
115-
client._async_http_internal = mock_internal
116-
client._async_http_passed_by_user = False
117-
118-
await client.close()
119-
mock_internal.close.assert_awaited_once()
120-
121-
@pytest.mark.asyncio
122-
async def test_close_ignores_user_session(self):
123-
credentials = _make_credentials()
124-
user_session = mock.AsyncMock()
125-
126-
with mock.patch("google.cloud.storage._experimental.asyncio.async_client.AsyncConnection"):
127-
client = self._make_one(
128-
project="PROJECT",
129-
credentials=credentials,
130-
_async_http=user_session
131-
)
132-
133-
# 2. External session passed by user -> Client DOES NOT close it
134-
await client.close()
135-
user_session.close.assert_not_awaited()
80+
client._json_connection
81+
MockConn.assert_called_once_with(client, _async_http=async_http, credentials=client.credentials, client_info=None, api_endpoint=None)
13682

13783
def test_bucket_not_implemented(self):
13884
credentials = _make_credentials()
139-
with mock.patch("google.cloud.storage._experimental.asyncio.async_client.AsyncConnection"):
85+
with mock.patch("google.cloud.storage._experimental.asyncio.async_client.AsyncJSONConnection"):
14086
client = self._make_one(project="PROJECT", credentials=credentials)
14187

14288
with pytest.raises(NotImplementedError):

0 commit comments

Comments
 (0)