Skip to content
1 change: 1 addition & 0 deletions NEXT_CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,6 @@
### Documentation

### Internal Changes
* Replace the async-disabling mechanism on token refresh failure with a 1-minute retry backoff. Previously, a single failed async refresh would disable proactive token renewal until the token expired. Now, the SDK waits a short cooldown period and retries, improving resilience to transient errors.

### API Changes
66 changes: 43 additions & 23 deletions databricks/sdk/oauth.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,12 +247,11 @@ class Refreshable(TokenSource):

_EXECUTOR = None
_EXECUTOR_LOCK = threading.Lock()
# Legacy default duration for the stale period. This value is chosen to cover the
# maximum monthly downtime allowed by a 99.99% uptime SLA (~4.38 minutes).
_DEFAULT_STALE_DURATION = timedelta(minutes=5)
# Default maximum stale duration. Chosen to cover the maximum monthly downtime
# allowed by a 99.99% uptime SLA (~4.38 minutes) with generous headroom.
_MAX_STALE_DURATION = timedelta(minutes=20)
# Backoff time after an async refresh failure before trying another one.
_ASYNC_REFRESH_RETRY_BACKOFF = timedelta(minutes=1)

@classmethod
def _get_executor(cls):
Expand All @@ -271,15 +270,25 @@ def __init__(
stale_duration: Optional[timedelta] = None,
):
# Config properties
self._use_dynamic_stale_duration = stale_duration is None
self._use_legacy_stale_duration = stale_duration is not None
# Only read on the legacy path (when _use_legacy_stale_duration is True).
self._stale_duration = stale_duration if stale_duration is not None else timedelta(seconds=0)
Comment thread
mihaimitrea-db marked this conversation as resolved.
self._disable_async = disable_async
# Lock
self._lock = threading.Lock()
# Non-thread-safe properties. They should be accessed only while holding the lock above.
self._stale_after: Optional[datetime] = None
self._token_generation: int = 0
self._update_token(token or Token(""))
self._is_refreshing = False
self._refresh_err = False

def _now(self) -> datetime:
    """Return the current time, matching the tz-awareness of the cached token.

    The tzinfo is taken from the cached token's expiry when it is set, and
    otherwise from ``_stale_after``. When neither is set, a naive local
    ``datetime`` is returned.

    Returns:
        ``datetime.now()`` built with the tzinfo of the first available
        reference value (which may itself be ``None`` for naive values).
    """
    # Naive and aware datetimes cannot be compared or subtracted, so "now"
    # must carry the same tzinfo as the value it will be checked against.
    expiry = self._token.expiry
    if expiry is not None:
        return datetime.now(tz=expiry.tzinfo)

    stale_after = self._stale_after
    if stale_after is not None:
        return datetime.now(tz=stale_after.tzinfo)

    return datetime.now()

def _update_token(self, token: Token) -> None:
"""Stores the new token and pre-computes the stale threshold.
Expand All @@ -289,17 +298,28 @@ def _update_token(self, token: Token) -> None:

This ensures short-lived tokens (e.g. FastPath with 10-minute TTL) get a
proportionally smaller stale window, while standard OAuth tokens (≥1 hour TTL)
use the full cap of _DEFAULT_STALE_DURATION.
use the full cap of _MAX_STALE_DURATION.
"""
self._token = token
Comment thread
mihaimitrea-db marked this conversation as resolved.
self._token_generation += 1
self._stale_after = None

if self._use_dynamic_stale_duration and self._token.expiry:
ttl = self._token.expiry - datetime.now()

if ttl < timedelta(seconds=0):
self._stale_duration = timedelta(seconds=0)
if self._token.expiry:
if self._use_legacy_stale_duration:
self._stale_after = self._token.expiry - self._stale_duration
else:
self._stale_duration = min(ttl // 2, self._MAX_STALE_DURATION)
ttl = self._token.expiry - self._now()
stale_duration = max(timedelta(seconds=0), min(ttl // 2, self._MAX_STALE_DURATION))
self._stale_after = self._token.expiry - stale_duration

def _handle_failed_async_refresh(self) -> None:
    """Postpone the next async refresh attempt by the retry backoff.

    Sets ``_stale_after`` to the current time plus
    ``_ASYNC_REFRESH_RETRY_BACKOFF``, so the token appears fresh for a short
    cooldown period. Does nothing when ``_stale_after`` is unset.

    The new threshold may land past the token's expiry; that is safe because
    ``_token_state()`` checks expiry before staleness.
    """
    if self._stale_after is None:
        # No stale threshold is tracked, so there is nothing to push back.
        return
    self._stale_after = self._now() + self._ASYNC_REFRESH_RETRY_BACKOFF

# This is the main entry point for the Token. Do not access the token
# using any of the internal functions.
Expand Down Expand Up @@ -333,19 +353,16 @@ def _token_state(self) -> _TokenState:
if not self._token.expiry:
return _TokenState.FRESH

lifespan = self._token.expiry - datetime.now()
if lifespan < timedelta(seconds=0):
now = self._now()
if self._token.expiry < now:
return _TokenState.EXPIRED
if lifespan < self._stale_duration:
if self._stale_after and self._stale_after < now:
return _TokenState.STALE
return _TokenState.FRESH

def _blocking_token(self) -> Token:
"""Returns a token, blocking if necessary to refresh it."""
state = self._token_state()
# This is important to recover from potential previous failed attempts
# to refresh the token asynchronously.
self._refresh_err = False
self._is_refreshing = False

# It's possible that the token got refreshed (either by a _blocking_refresh or
Expand All @@ -359,28 +376,31 @@ def _blocking_token(self) -> Token:

def _trigger_async_refresh(self):
"""Starts an asynchronous refresh if none is in progress."""
gen_at_submit = self._token_generation

def _refresh_internal():
new_token = None
try:
new_token = self.refresh()
except Exception as e:
# This happens on a thread, so we don't want to propagate the error.
# Instead, if there is no new_token for any reason, we will disable async refresh below
# But we will do it inside the lock.
# Instead, if there is no new_token for any reason, we apply a retry
# backoff below so the token appears fresh for a short cooldown period.
logger.warning(f"Tried to refresh token asynchronously, but failed: {e}")

with self._lock:
if new_token is not None:
if self._token_generation != gen_at_submit:
logger.debug("Async refresh completed but token was already updated; discarding result.")
elif new_token is not None:
self._update_token(new_token)
else:
self._refresh_err = True
self._handle_failed_async_refresh()
self._is_refreshing = False

# The token may have been refreshed by another thread.
if self._token_state() == _TokenState.FRESH:
return
if not self._is_refreshing and not self._refresh_err:
if not self._is_refreshing:
self._is_refreshing = True
Refreshable._get_executor().submit(_refresh_internal)

Expand Down
Loading
Loading