feat(auth): implement regional access boundary support for standalone JWT and async service accounts #17025
nbayati wants to merge 10 commits into
…ement async refresh manager
…support Regional Access Boundary logic
Code Review
This pull request implements asynchronous support for Regional Access Boundary (RAB) management, including background refresh tasks and mTLS endpoint support. Key changes include the addition of _AsyncRegionalAccessBoundaryRefreshManager, updates to JWT and service account credentials to handle RAB state during cloning and serialization, and comprehensive test coverage for these new flows. However, a critical issue was identified where the refresh method in google/auth/jwt.py was renamed to _perform_refresh_token, which will break token updates as the base class expects a refresh method. Additionally, a typo was found in a test assertion URL.
        self._worker_task = None

    def __getstate__(self):
        """Pickle helper that serializes the _lock and _worker_task attributes."""

This comment seems misleading/incorrect to me. From what I can tell, this removes these attributes from the serialization of this manager entirely (it doesn't help in/allow the attributes to be serialized).
        return state

    def __setstate__(self, state):
        """Pickle helper that deserializes the _lock and _worker_task attributes."""

Similarly, I don't think this actually deserializes anything for these attributes; it re-initializes them.
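To make the two docstring comments above concrete, here is a minimal sketch of the pattern being described. `RefreshManagerSketch` and its `data` attribute are hypothetical stand-ins, not the PR's class; the last lines mimic what pickle does by calling the two hooks directly.

```python
import threading


class RefreshManagerSketch:
    """Hypothetical stand-in for a refresh manager; not the PR's class."""

    def __init__(self):
        self._lock = threading.Lock()
        self._worker_task = None
        self.data = {"example": "value"}

    def __getstate__(self):
        """Return picklable state, excluding _lock and _worker_task."""
        state = self.__dict__.copy()
        state.pop("_lock", None)
        state.pop("_worker_task", None)
        return state

    def __setstate__(self, state):
        """Restore pickled state, then re-create _lock and _worker_task."""
        self.__dict__.update(state)
        self._lock = threading.Lock()
        self._worker_task = None


# Mimic a pickle round trip by calling the hooks directly.
state = RefreshManagerSketch().__getstate__()
restored = RefreshManagerSketch.__new__(RefreshManagerSketch)
restored.__setstate__(state)
```

The state dictionary never contains `_lock` or `_worker_task`, so docstrings along the lines of "excludes" and "re-creates" would describe the behavior more accurately than "serializes" and "deserializes".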
    except asyncio.TimeoutError:
        return False, {}, False

    response_body1 = await response.content()

This is an async_generator, not an awaitable method. I think this crashes as written. Should call await response.read() instead.
Let's also add a test that would catch this, and wrap the call in a try/except block.
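A small sketch of the difference, using a hypothetical `FakeResponse` that only imitates the shape described in the comment above (an async-generator `content()` and a coroutine `read()`). The helper names and the fail-open `None` return are assumptions for illustration.

```python
import asyncio


class FakeResponse:
    """Hypothetical stand-in that imitates an aio transport response."""

    status_code = 200

    async def content(self, chunk_size=1024):
        # Async generator: the call returns an object to iterate, not to await.
        yield b'{"encodedLocations": '
        yield b'"0x0"}'

    async def read(self):
        # Coroutine: awaiting it returns the whole body as bytes.
        chunks = [chunk async for chunk in self.content()]
        return b"".join(chunks)


async def read_body(response):
    """Read the full body, returning None (fail open) if reading fails."""
    try:
        return await response.read()
    except Exception:
        # Assumption: a failed read should not break the caller.
        return None


async def awaiting_content_fails(response):
    """Return True if `await response.content()` raises TypeError."""
    try:
        await response.content()
    except TypeError:
        return True
    return False
```

A test in this style, run against a response double whose `content()` really is an async generator, would have caught the original call.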
    try:
        if timeout:
            response = await asyncio.wait_for(
                request(method="GET", url=url, headers=headers), timeout=timeout

Should timeout be passed to the request here?
            )
        else:
            response = await request(method="GET", url=url, headers=headers)
    except asyncio.TimeoutError:

What happens if the request fails? I think we may need to catch additional errors here (timeout errors, transport errors, ...).
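One way the broader error handling could look, as a sketch only. `TransportErrorSketch` is a hypothetical stand-in for whatever transport-level exception the real request callable raises, and returning `None` on failure is an assumed fail-open convention.

```python
import asyncio


class TransportErrorSketch(Exception):
    """Hypothetical stand-in for a transport-level failure."""


async def lookup_with_timeout(request, url, headers, timeout=None):
    """Return the response, or None if the request times out or fails."""
    try:
        call = request(method="GET", url=url, headers=headers)
        if timeout:
            return await asyncio.wait_for(call, timeout=timeout)
        return await call
    except asyncio.TimeoutError:
        # The lookup took too long; fail open.
        return None
    except TransportErrorSketch:
        # The transport itself failed; fail open as well.
        return None
```

Both failure modes end in the same fail-open result here, so a caller only has to handle a missing response.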
@@ -369,6 +401,8 @@ def _copy_regional_access_boundary_manager(self, target):
        # but share the immutable data reference to avoid unnecessary initial lookups.
        new_manager = _regional_access_boundary_utils._RegionalAccessBoundaryManager()
        new_manager._data = self._rab_manager._data
        # Preserve the type of refresh manager (sync or async)
        new_manager.refresh_manager = self._rab_manager.refresh_manager.__class__()

I don't think this is safe across sync/async creds.
- Reusing the source's refresh-manager type means a sync credential can end up with an async refresh manager, which can later call asyncio.create_task() from a sync before_request.
- _use_blocking_regional_access_boundary_lookup is not preserved, so a credential configured for blocking RAB lookups can become non-blocking after with_scopes(), with_quota_project(), etc. (breaking gcloud).
I think the credential should keep the manager type it was initialized with, and we should copy only the RAB data/config.
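A rough sketch of that suggestion. All class names, the `use_blocking_lookup` attribute, and `copy_rab_state` are hypothetical and only illustrate the idea of copying data and configuration while leaving the target's refresh manager alone.

```python
class SyncRefreshManagerSketch:
    """Hypothetical sync refresh manager."""


class AsyncRefreshManagerSketch:
    """Hypothetical async refresh manager."""


class RabManagerSketch:
    """Hypothetical holder for RAB data, config, and a refresh manager."""

    def __init__(self, refresh_manager, use_blocking_lookup=False):
        self._data = None
        self.refresh_manager = refresh_manager
        self.use_blocking_lookup = use_blocking_lookup


def copy_rab_state(source, target):
    """Copy shared data and config; leave target.refresh_manager untouched."""
    target._data = source._data  # share the immutable data reference
    target.use_blocking_lookup = source.use_blocking_lookup
```

Under this approach a clone made by `with_scopes()` or `with_quota_project()` would keep whatever manager its own constructor installed, and the blocking-lookup setting would survive the copy.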
        # Error was already logged by _lookup_regional_access_boundary_request
        return None

    if "encodedLocations" not in response_data:

Nit: should fail open on bad server responses, so we should check that response_data is a dict
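A minimal sketch of such a guard. The function name `extract_encoded_locations` is hypothetical, and returning `None` for a malformed response is an assumed fail-open convention, not the PR's confirmed behavior.

```python
def extract_encoded_locations(response_data):
    """Return the encodedLocations value, or None for malformed responses."""
    if not isinstance(response_data, dict):
        # A string, list, or None body should not raise; fail open instead.
        return None
    if "encodedLocations" not in response_data:
        return None
    return response_data["encodedLocations"]
```

Without the `isinstance` check, a `None` body would raise `TypeError` on the membership test, and a plain string body would silently run a substring search instead.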
    except ValueError:
        response_data = response_body

    if response.status == http_client.OK:

In google.auth.aio.transport.Response, the HTTP status code is exposed via the status_code property, not status. Passing a compliant google.auth.aio transport callable raises AttributeError: 'Response' object has no attribute 'status'. Please update the async lookup and grant methods to check .status_code.
@@ -288,3 +289,145 @@ async def refresh_grant(
        request, token_uri, body, can_retry=can_retry
    )
    return client._handle_refresh_grant_response(response_data, refresh_token)

Is _jwt_async.py out of scope?
        await credentials_async.Credentials.before_request(
            self, request, method, url, headers
        )
        self._maybe_start_regional_access_boundary_refresh(request, url)
        self._rab_manager.apply_headers(headers)

This may be redundant. Why is it needed?
            # A refresh is already in progress.
            return

        async def _worker():

Unlike the synchronous refresh manager which safely deepcopies the transport (copied_request = copy.deepcopy(request)), the async manager passes the exact same request instance directly into the background coroutine task. Because start_refresh is invoked inside before_request, the main application coroutine immediately proceeds to make its actual service API HTTP call using the exact same request transport while the background task is concurrently using it, risking HTTP state corruption or interleaved headers.
Additionally, spawning asyncio.create_task(_worker()) without tracking cancellation hooks upon client session closure can potentially cause dangling tasks that raise RuntimeError: Session is closed when executing against closed client sessions.
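For the dangling-task concern, one possible shape is to keep a reference to the background task and cancel it on shutdown. This is a sketch under assumptions: `BackgroundRefreshSketch`, its `close()` hook, and the `lookup` callable are hypothetical, and it does not address sharing the request transport between coroutines.

```python
import asyncio


class BackgroundRefreshSketch:
    """Hypothetical manager that tracks and cancels its background task."""

    def __init__(self):
        self._worker_task = None

    def start_refresh(self, lookup):
        """Start `lookup()` in the background unless one is still running."""
        if self._worker_task is not None and not self._worker_task.done():
            # A refresh is already in progress.
            return
        self._worker_task = asyncio.create_task(lookup())

    async def close(self):
        """Cancel a running refresh and wait for the cancellation to finish."""
        task = self._worker_task
        if task is None or task.done():
            return
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            # Expected: the task was cancelled by the line above.
            pass
```

Calling `close()` before the client session is closed would keep the worker from running against a closed session. `start_refresh` must be called from within a running event loop, since `asyncio.create_task` requires one.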
@@ -66,6 +67,12 @@ class Credentials(
        credentials = credentials.with_quota_project('myproject-123')
    """

    def __init__(self, *args, **kwargs):

_service_account_async.Credentials lacks a __setstate__ override. When older pickled async credentials unpickle, they fall back to CredentialsWithRegionalAccessBoundary.__setstate__, which attaches a synchronous _RegionalAccessBoundaryRefreshManager. If a background RAB lookup triggers on this unpickled credential, a synchronous background thread will invoke async def _lookup_regional_access_boundary synchronously without awaiting it, causing a fatal thread crash (AttributeError: 'coroutine' object has no attribute 'get'). Please implement __setstate__ to ensure self._rab_manager.refresh_manager is always restored as an _AsyncRegionalAccessBoundaryRefreshManager().
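A sketch of the requested override. Every class here is a hypothetical stand-in for the real credential and manager classes, and the base `__setstate__` behavior is assumed from the description in the comment above.

```python
class SyncRefreshManagerSketch:
    """Hypothetical sync refresh manager."""


class AsyncRefreshManagerSketch:
    """Hypothetical async refresh manager."""


class RabManagerSketch:
    """Hypothetical RAB manager that defaults to a sync refresh manager."""

    def __init__(self):
        self.refresh_manager = SyncRefreshManagerSketch()


class BaseCredentialsSketch:
    """Hypothetical base class with the sync-oriented __setstate__."""

    def __setstate__(self, state):
        self.__dict__.update(state)
        if "_rab_manager" not in state:
            # Older pickles carry no manager, so a default one is attached.
            self._rab_manager = RabManagerSketch()


class AsyncCredentialsSketch(BaseCredentialsSketch):
    """Hypothetical async credentials that always restore an async manager."""

    def __setstate__(self, state):
        super().__setstate__(state)
        manager = self._rab_manager
        if not isinstance(manager.refresh_manager, AsyncRefreshManagerSketch):
            manager.refresh_manager = AsyncRefreshManagerSketch()


# Simulate unpickling state that predates the RAB manager.
creds = AsyncCredentialsSketch.__new__(AsyncCredentialsSketch)
creds.__setstate__({"token": "example-token"})
```

After the override runs, the restored credential holds an async refresh manager regardless of what the base class attached.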
This PR implements the following changes: