Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
54 commits
Select commit Hold shift + click to select a range
2374611
First draft
emmyzhou-db May 28, 2025
5148136
Update test
emmyzhou-db May 28, 2025
78c03a8
Clean up unit tests
emmyzhou-db May 30, 2025
5a37c59
Clean up comments
emmyzhou-db May 30, 2025
f5e4f8a
Add Javadoc to Token.java
emmyzhou-db May 30, 2025
3e65109
Add a token expiry buffer field
emmyzhou-db May 30, 2025
dfce414
Fix for comments
emmyzhou-db Jun 2, 2025
5bd4215
Update tests
emmyzhou-db Jun 3, 2025
93e0baf
Clean up tests
emmyzhou-db Jun 3, 2025
15e221d
Add logging
emmyzhou-db Jun 3, 2025
7589dab
Performance optimization
emmyzhou-db Jun 3, 2025
d97c734
Furter optimizations
emmyzhou-db Jun 3, 2025
105bc99
Add extra token state check in async refresh
emmyzhou-db Jun 3, 2025
b24a0fc
Change LocalDateTime to Instant
emmyzhou-db Jun 3, 2025
6f81b4c
Update parseExpiry in CilTokenSource
emmyzhou-db Jun 4, 2025
daac1b2
Update javadoc
emmyzhou-db Jun 4, 2025
f3d4b8a
Update javadoc
emmyzhou-db Jun 4, 2025
12123a9
Retrigger tests
emmyzhou-db Jun 5, 2025
def03c5
Merge branch 'main' into emmyzhou-db/localdatetime-to-instant
parthban-db Jun 5, 2025
8705ff5
Save progress
emmyzhou-db Jun 6, 2025
fdc50ef
Removed redundant date formattters
emmyzhou-db Jun 6, 2025
70934b2
Change clock supplier to use UTC time
emmyzhou-db Jun 6, 2025
8f19819
Update async tests
emmyzhou-db Jun 7, 2025
333d22e
Update async test
emmyzhou-db Jun 7, 2025
1bd052f
Add support for space separated expiry strings
emmyzhou-db Jun 7, 2025
408f3b4
revert test data
emmyzhou-db Jun 7, 2025
3f06444
Passing all tests
emmyzhou-db Jun 10, 2025
f12024f
Wrapped all token sources in cached token source
emmyzhou-db Jun 10, 2025
c32b752
update names
emmyzhou-db Jun 10, 2025
7e441d6
Small change
emmyzhou-db Jun 10, 2025
35cf2a4
Revert DatabricksConfig
emmyzhou-db Jun 10, 2025
b55a4e8
Merge branch 'emmyzhou-db/localdatetime-to-instant' into emmyzhou-db/…
emmyzhou-db Jun 10, 2025
457d02c
Merge branch 'emmyzhou-db/localdatetime-to-instant' into emmyzhou-db/…
emmyzhou-db Jun 10, 2025
1c99d4b
Merge branch 'emmyzhou-db/localdatetime-to-instant' into emmyzhou-db/…
emmyzhou-db Jun 10, 2025
63d246c
Use Instant with CachedTokenSource
emmyzhou-db Jun 10, 2025
114982e
Update AzureCliCredentials
emmyzhou-db Jun 11, 2025
8417e1b
Revert AzureCliCredentialsProvider
emmyzhou-db Jun 11, 2025
e843dc7
Update variable names
emmyzhou-db Jun 11, 2025
6cc046c
Merge branch 'main' into emmyzhou-db/async_token_cache_wrapper
emmyzhou-db Jun 16, 2025
b774e1a
clean up CachedTokenSource
emmyzhou-db Jun 16, 2025
e846dd7
Rename SystemClockSupplier to UtcClockSupplier
emmyzhou-db Jun 16, 2025
8970e1b
Refactor
emmyzhou-db Jun 17, 2025
e3499a8
Merge branch 'main' into emmyzhou-db/async_token_cache_wrapper
emmyzhou-db Jun 18, 2025
c880323
Clean up
emmyzhou-db Jun 18, 2025
75367bf
Add javadoc to builder class
emmyzhou-db Jun 18, 2025
68edacc
Add new attribute to config
emmyzhou-db Jun 20, 2025
a6b8df2
Merge branch 'main' into emmyzhou-db/async_token_cache_wrapper
emmyzhou-db Jun 26, 2025
bc66554
Use set instead of with for cached token source builder
emmyzhou-db Jun 26, 2025
3d6c39a
Update names
emmyzhou-db Jun 26, 2025
7e7ddae
Merge branch 'emmyzhou-db/async_token_cache_wrapper' into emmyzhou-db…
emmyzhou-db Jun 26, 2025
28bcb03
Enabled async by default via config
emmyzhou-db Jun 26, 2025
20ca154
revert formatter change
emmyzhou-db Jun 26, 2025
4c3d768
Merge branch 'main' into emmyzhou-db/disable-async-via-config
emmyzhou-db Jun 27, 2025
70e576c
Update changelog
emmyzhou-db Jun 27, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions NEXT_CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
## Release v0.55.0

### New Features and Improvements
* Enabled asynchronous token refreshes by default. A new `disable_async_token_refresh` configuration option has been added to allow disabling this feature if necessary.
To disable asynchronous token refresh, set the environment variable `DATABRICKS_DISABLE_ASYNC_TOKEN_REFRESH=true` or configure it within your configuration object.
The previous `DATABRICKS_ENABLE_EXPERIMENTAL_ASYNC_TOKEN_REFRESH` option has been removed as asynchronous refresh is now the default behavior.

### Bug Fixes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,10 @@ public CachedTokenSource tokenSourceFor(DatabricksConfig config, String resource
protected CachedTokenSource getTokenSource(DatabricksConfig config, List<String> cmd) {
CliTokenSource tokenSource =
new CliTokenSource(cmd, "tokenType", "accessToken", "expiresOn", config.getEnv());
CachedTokenSource cachedTokenSource = new CachedTokenSource.Builder(tokenSource).build();
CachedTokenSource cachedTokenSource =
new CachedTokenSource.Builder(tokenSource)
.setAsyncDisabled(config.getDisableAsyncTokenRefresh())
.build();
cachedTokenSource.getToken(); // Check if the CLI is installed and to validate the config.
return cachedTokenSource;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,10 @@ public OAuthHeaderFactory configure(DatabricksConfig config) {
return null;
}

CachedTokenSource cachedTokenSource = new CachedTokenSource.Builder(tokenSource).build();
CachedTokenSource cachedTokenSource =
new CachedTokenSource.Builder(tokenSource)
.setAsyncDisabled(config.getDisableAsyncTokenRefresh())
.build();
cachedTokenSource.getToken(); // We need this for checking if databricks CLI is installed.

return OAuthHeaderFactory.fromTokenSource(cachedTokenSource);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,10 @@ public class DatabricksConfig {
@ConfigAttribute(env = "DATABRICKS_OIDC_TOKEN_ENV", auth = "env-oidc")
private String oidcTokenEnv;

/** Disable asynchronous token refresh when set to true. */
@ConfigAttribute(env = "DATABRICKS_DISABLE_ASYNC_TOKEN_REFRESH")
private Boolean disableAsyncTokenRefresh;

public Environment getEnv() {
return env;
}
Expand Down Expand Up @@ -575,6 +579,15 @@ public DatabricksConfig setOidcTokenEnv(String oidcTokenEnv) {
return this;
}

public boolean getDisableAsyncTokenRefresh() {
return disableAsyncTokenRefresh != null && disableAsyncTokenRefresh;
}

public DatabricksConfig setDisableAsyncTokenRefresh(boolean disableAsyncTokenRefresh) {
this.disableAsyncTokenRefresh = disableAsyncTokenRefresh;
return this;
}

public boolean isAzure() {
if (azureWorkspaceResourceId != null) {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,10 @@ public OAuthHeaderFactory configure(DatabricksConfig config) {
config.getEffectiveAzureLoginAppId(),
idToken.get(),
"urn:ietf:params:oauth:client-assertion-type:jwt-bearer");
CachedTokenSource cachedTokenSource = new CachedTokenSource.Builder(tokenSource).build();
CachedTokenSource cachedTokenSource =
new CachedTokenSource.Builder(tokenSource)
.setAsyncDisabled(config.getDisableAsyncTokenRefresh())
.build();
return OAuthHeaderFactory.fromTokenSource(cachedTokenSource);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ private static CachedTokenSource tokenSourceFor(DatabricksConfig config, String
.withEndpointParametersSupplier(() -> endpointParams)
.withAuthParameterPosition(AuthParameterPosition.BODY)
.build();
return new CachedTokenSource.Builder(clientCredentials).build();
return new CachedTokenSource.Builder(clientCredentials)
.setAsyncDisabled(config.getDisableAsyncTokenRefresh())
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,7 @@ private enum TokenState {
// The token source to use for refreshing the token.
private final TokenSource tokenSource;
// Whether asynchronous refresh is enabled.
private boolean asyncEnabled =
Boolean.parseBoolean(System.getenv("DATABRICKS_ENABLE_EXPERIMENTAL_ASYNC_TOKEN_REFRESH"));
private boolean asyncDisabled = false;
// Duration before expiry to consider a token as 'stale'.
private final Duration staleDuration;
// Additional buffer before expiry to consider a token as expired.
Expand All @@ -50,15 +49,15 @@ private enum TokenState {
private final ClockSupplier clockSupplier;

// The current OAuth token. May be null if not yet fetched.
private volatile Token token;
protected volatile Token token;
// Whether a refresh is currently in progress (for async refresh).
private boolean refreshInProgress = false;
// Whether the last refresh attempt succeeded.
private boolean lastRefreshSucceeded = true;

private CachedTokenSource(Builder builder) {
this.tokenSource = builder.tokenSource;
this.asyncEnabled = builder.asyncEnabled;
this.asyncDisabled = builder.asyncDisabled;
this.staleDuration = builder.staleDuration;
this.expiryBuffer = builder.expiryBuffer;
this.clockSupplier = builder.clockSupplier;
Expand All @@ -73,7 +72,7 @@ private CachedTokenSource(Builder builder) {
*/
public static class Builder {
private final TokenSource tokenSource;
private boolean asyncEnabled = false;
private boolean asyncDisabled = false;
private Duration staleDuration = DEFAULT_STALE_DURATION;
private Duration expiryBuffer = DEFAULT_EXPIRY_BUFFER;
private ClockSupplier clockSupplier = new UtcClockSupplier();
Expand Down Expand Up @@ -110,11 +109,11 @@ public Builder setToken(Token token) {
* current token. When disabled, all refreshes are performed synchronously and will block the
* calling thread.
*
* @param asyncEnabled True to enable asynchronous refresh, false to disable.
* @param asyncDisabled True to disable asynchronous refresh, false to enable.
* @return This builder instance for method chaining.
*/
public Builder setAsyncEnabled(boolean asyncEnabled) {
this.asyncEnabled = asyncEnabled;
public Builder setAsyncDisabled(boolean asyncDisabled) {
this.asyncDisabled = asyncDisabled;
return this;
}

Expand Down Expand Up @@ -182,10 +181,10 @@ public CachedTokenSource build() {
* @return The current valid token
*/
public Token getToken() {
if (asyncEnabled) {
return getTokenAsync();
if (asyncDisabled) {
return getTokenBlocking();
}
return getTokenBlocking();
return getTokenAsync();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ public class DataPlaneTokenSource {
private final HttpClient httpClient;
private final TokenSource cpTokenSource;
private final String host;
private final boolean asyncDisabled;
private final ConcurrentHashMap<TokenSourceKey, CachedTokenSource> sourcesCache;
/**
* Caching key for {@link EndpointTokenSource}, based on endpoint and authorization details. This
Expand All @@ -42,11 +43,13 @@ static TokenSourceKey create(String endpoint, String authDetails) {
* @throws NullPointerException if any parameter is null.
* @throws IllegalArgumentException if the host is empty.
*/
public DataPlaneTokenSource(HttpClient httpClient, TokenSource cpTokenSource, String host) {
public DataPlaneTokenSource(
HttpClient httpClient, TokenSource cpTokenSource, String host, boolean asyncDisabled) {
this.httpClient = Objects.requireNonNull(httpClient, "HTTP client cannot be null");
this.cpTokenSource =
Objects.requireNonNull(cpTokenSource, "Control plane token source cannot be null");
this.host = Objects.requireNonNull(host, "Host cannot be null");
this.asyncDisabled = asyncDisabled;

if (host.isEmpty()) {
throw new IllegalArgumentException("Host cannot be empty");
Expand Down Expand Up @@ -85,6 +88,7 @@ public Token getToken(String endpoint, String authDetails) {
new CachedTokenSource.Builder(
new EndpointTokenSource(
this.cpTokenSource, k.authDetails(), this.httpClient, this.host))
.setAsyncDisabled(asyncDisabled)
.build());

return specificSource.getToken();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,10 @@ public OAuthHeaderFactory configure(DatabricksConfig config) {
Optional.of(config.getEffectiveOAuthRedirectUrl()),
Optional.of(tokenCache));

CachedTokenSource cachedTokenSource = new CachedTokenSource.Builder(tokenSource).build();
CachedTokenSource cachedTokenSource =
new CachedTokenSource.Builder(tokenSource)
.setAsyncDisabled(config.getDisableAsyncTokenRefresh())
.build();
LOGGER.debug("Using cached token, will immediately refresh");
cachedTokenSource.getToken();
return OAuthHeaderFactory.fromTokenSource(cachedTokenSource);
Expand Down Expand Up @@ -128,6 +131,9 @@ CachedTokenSource performBrowserAuth(
Optional.ofNullable(config.getEffectiveOAuthRedirectUrl()),
Optional.ofNullable(tokenCache));

return new CachedTokenSource.Builder(tokenSource).setToken(token).build();
return new CachedTokenSource.Builder(tokenSource)
.setToken(token)
.setAsyncDisabled(config.getDisableAsyncTokenRefresh())
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,10 @@ public HeaderFactory configure(DatabricksConfig config) throws DatabricksExcepti
.build())
.build();

CachedTokenSource cachedTokenSource = new CachedTokenSource.Builder(clientCredentials).build();
CachedTokenSource cachedTokenSource =
new CachedTokenSource.Builder(clientCredentials)
.setAsyncDisabled(config.getDisableAsyncTokenRefresh())
.build();

return () -> {
Map<String, String> headers = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ public OAuthHeaderFactory configure(DatabricksConfig config) {
.build();

CachedTokenSource cachedTokenSource =
new CachedTokenSource.Builder(clientCredentials).build();
new CachedTokenSource.Builder(clientCredentials)
.setAsyncDisabled(config.getDisableAsyncTokenRefresh())
.build();

return OAuthHeaderFactory.fromTokenSource(cachedTokenSource);
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ public OAuthHeaderFactory configure(DatabricksConfig config) {
TokenSource cachedTokenSource =
(tokenSource instanceof CachedTokenSource)
? tokenSource
: new CachedTokenSource.Builder(tokenSource).build();
: new CachedTokenSource.Builder(tokenSource)
.setAsyncDisabled(config.getDisableAsyncTokenRefresh())
.build();

try {
// Validate that we can get a token before returning a HeaderFactory
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,20 @@ public class CachedTokenSourceTest {

private static Stream<Arguments> provideAsyncRefreshScenarios() {
return Stream.of(
Arguments.of("Fresh token, async enabled", FRESH_MINUTES, true, false, INITIAL_TOKEN),
Arguments.of("Stale token, async enabled", STALE_MINUTES, true, true, INITIAL_TOKEN),
Arguments.of("Expired token, async enabled", EXPIRED_MINUTES, true, true, REFRESH_TOKEN),
Arguments.of("Fresh token, async disabled", FRESH_MINUTES, false, false, INITIAL_TOKEN),
Arguments.of("Stale token, async disabled", STALE_MINUTES, false, false, INITIAL_TOKEN),
Arguments.of("Expired token, async disabled", EXPIRED_MINUTES, false, true, REFRESH_TOKEN));
Arguments.of("Fresh token, async enabled", FRESH_MINUTES, false, false, INITIAL_TOKEN),
Arguments.of("Stale token, async enabled", STALE_MINUTES, false, true, INITIAL_TOKEN),
Arguments.of("Expired token, async enabled", EXPIRED_MINUTES, false, true, REFRESH_TOKEN),
Arguments.of("Fresh token, async disabled", FRESH_MINUTES, true, false, INITIAL_TOKEN),
Arguments.of("Stale token, async disabled", STALE_MINUTES, true, false, INITIAL_TOKEN),
Arguments.of("Expired token, async disabled", EXPIRED_MINUTES, true, true, REFRESH_TOKEN));
}

@ParameterizedTest(name = "{0}")
@MethodSource("provideAsyncRefreshScenarios")
void testAsyncRefreshParametrized(
String testName,
long minutesUntilExpiry,
boolean asyncEnabled,
boolean asyncDisabled,
boolean expectRefresh,
String expectedToken)
throws Exception {
Expand Down Expand Up @@ -67,7 +67,7 @@ public Token getToken() {

CachedTokenSource source =
new CachedTokenSource.Builder(tokenSource)
.setAsyncEnabled(asyncEnabled)
.setAsyncDisabled(asyncDisabled)
.setToken(initialToken)
.build();

Expand Down Expand Up @@ -127,7 +127,7 @@ public Token getToken() {
TestSource testSource = new TestSource();
CachedTokenSource source =
new CachedTokenSource.Builder(testSource)
.setAsyncEnabled(true)
.setAsyncDisabled(false)
.setToken(staleToken)
.setClockSupplier(clockSupplier)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,11 +184,13 @@ void testDataPlaneTokenSource(
assertThrows(
expectedException,
() -> {
DataPlaneTokenSource source = new DataPlaneTokenSource(httpClient, cpTokenSource, host);
DataPlaneTokenSource source =
new DataPlaneTokenSource(httpClient, cpTokenSource, host, false);
source.getToken(endpoint, authDetails);
});
} else {
DataPlaneTokenSource source = new DataPlaneTokenSource(httpClient, cpTokenSource, host);
DataPlaneTokenSource source =
new DataPlaneTokenSource(httpClient, cpTokenSource, host, false);
Token token = source.getToken(endpoint, authDetails);
assertNotNull(token);
assertEquals(expectedToken.getAccessToken(), token.getAccessToken());
Expand All @@ -214,7 +216,7 @@ void testEndpointTokenSourceCaching() throws Exception {
try (MockedConstruction<EndpointTokenSource> mockedConstruction =
mockConstruction(EndpointTokenSource.class)) {
DataPlaneTokenSource source =
new DataPlaneTokenSource(mockHttpClient, mockCpTokenSource, TEST_HOST);
new DataPlaneTokenSource(mockHttpClient, mockCpTokenSource, TEST_HOST, false);

// First call - should create new EndpointTokenSource
source.getToken(TEST_ENDPOINT_1, TEST_AUTH_DETAILS_1);
Expand Down
Loading