Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -167,4 +167,4 @@ jobs:
meson compile -C builddir
- name: Test Iceberg
run: |
meson test -C builddir --timeout-multiplier 0
meson test -C builddir --timeout-multiplier 0 --print-errorlogs
1 change: 1 addition & 0 deletions src/iceberg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ set(ICEBERG_SOURCES
update/update_snapshot_reference.cc
update/update_sort_order.cc
update/update_statistics.cc
util/base64.cc
util/bucket_util.cc
util/content_file_util.cc
util/conversions.cc
Expand Down
1 change: 1 addition & 0 deletions src/iceberg/catalog/rest/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ set(ICEBERG_REST_SOURCES
auth/auth_properties.cc
auth/auth_session.cc
auth/oauth2_util.cc
auth/token_refresh_scheduler.cc
catalog_properties.cc
endpoint.cc
error_handlers.cc
Expand Down
12 changes: 6 additions & 6 deletions src/iceberg/catalog/rest/auth/auth_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
#include "iceberg/catalog/rest/auth/auth_properties.h"
#include "iceberg/catalog/rest/auth/auth_session.h"
#include "iceberg/catalog/rest/auth/oauth2_util.h"
#include "iceberg/util/base64.h"
#include "iceberg/util/macros.h"
#include "iceberg/util/transform_util.h"

namespace iceberg::rest::auth {

Expand Down Expand Up @@ -83,8 +83,7 @@ class BasicAuthManager : public AuthManager {
"Missing required property '{}'", AuthProperties::kBasicPassword);
std::string credential = username_it->second + ":" + password_it->second;
return AuthSession::MakeDefault(
{{std::string(kAuthorizationHeader),
"Basic " + TransformUtil::Base64Encode(credential)}});
{{std::string(kAuthorizationHeader), "Basic " + Base64::Encode(credential)}});
}
};

Expand Down Expand Up @@ -130,7 +129,8 @@ class OAuth2Manager : public AuthManager {
init_token_response_.reset();
return AuthSession::MakeOAuth2(token_response, config.oauth2_server_uri(),
config.client_id(), config.client_secret(),
config.scope(), client);
config.scope(), config.keep_refreshed(),
config.optional_oauth_params(), client);
}

// If token is provided, use it directly.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Java wraps configured access tokens in fromAccessToken() so expiring tokens can be refreshed. Returning a static session here means JWT or token-expires-in-ms based tokens never refresh.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. This is tightly coupled to the token-exchange work (a configured token has no credential, so Java refreshes it via exchange), so I'll bundle it into the same follow-up PR: add token-expires-in-ms, populate expires_at_millis_, and wrap configured tokens in a refreshable session. Will link the PR here.

Expand All @@ -143,10 +143,10 @@ class OAuth2Manager : public AuthManager {
auto base_session = AuthSession::MakeDefault(AuthHeaders(config.token()));
OAuthTokenResponse token_response;
ICEBERG_ASSIGN_OR_RAISE(token_response, FetchToken(client, *base_session, config));
// TODO(lishuxu): should we directly pass config to the MakeOAuth2 call?
return AuthSession::MakeOAuth2(token_response, config.oauth2_server_uri(),
config.client_id(), config.client_secret(),
config.scope(), client);
config.scope(), config.keep_refreshed(),
config.optional_oauth_params(), client);
}

return AuthSession::MakeDefault({});
Expand Down
210 changes: 204 additions & 6 deletions src/iceberg/catalog/rest/auth/auth_session.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,16 @@

#include "iceberg/catalog/rest/auth/auth_session.h"

#include <algorithm>
#include <chrono>
#include <memory>
#include <shared_mutex>
#include <utility>

#include "iceberg/catalog/rest/auth/auth_properties.h"
#include "iceberg/catalog/rest/auth/oauth2_util.h"
#include "iceberg/catalog/rest/auth/token_refresh_scheduler.h"
#include "iceberg/catalog/rest/http_client.h"

namespace iceberg::rest::auth {

Expand All @@ -44,6 +51,189 @@ class DefaultAuthSession : public AuthSession {
std::unordered_map<std::string, std::string> headers_;
};

/// \brief OAuth2 session with automatic token refresh.
class OAuth2AuthSession : public AuthSession,
public std::enable_shared_from_this<OAuth2AuthSession> {
public:
/// \brief Settings the session keeps for building token refresh requests.
///
/// Every field has a well-defined default, so a default-constructed Config
/// never carries an indeterminate `keep_refreshed` flag.
struct Config {
  /// Passed as the OAuth2 server URI property of each refresh request.
  std::string token_endpoint;
  /// Client id; when non-empty it is joined with the secret as "id:secret".
  std::string client_id;
  /// Client secret; used alone as the credential when `client_id` is empty.
  std::string client_secret;
  /// Passed as the scope property of each refresh request.
  std::string scope;
  /// Extra key/value pairs copied into the refresh request properties.
  std::unordered_map<std::string, std::string> optional_oauth_params;
  /// Whether refresh tasks are scheduled at all; off unless explicitly enabled.
  bool keep_refreshed = false;
};

/// \brief Create an OAuth2 session and optionally schedule refresh.
static std::shared_ptr<OAuth2AuthSession> Make(const OAuthTokenResponse& initial_token,
Config config, HttpClient& client) {
auto session = std::shared_ptr<OAuth2AuthSession>(
new OAuth2AuthSession(std::move(config), client));
session->SetInitialToken(initial_token);
return session;
}

/// \brief Add this session's auth headers to \p headers.
///
/// Entries already present in \p headers are left untouched; only missing
/// keys are added. Takes a shared lock on mutex_ while reading headers_.
Status Authenticate(std::unordered_map<std::string, std::string>& headers) override {
  std::shared_lock<std::shared_mutex> read_guard(mutex_);
  // Range insert skips keys that already exist in the destination map.
  headers.insert(headers_.begin(), headers_.end());
  return {};
}

/// \brief Mark the session closed and cancel the most recently scheduled task.
///
/// Only the first call does any work; later calls return immediately.
Status Close() override {
  // exchange() hands back the previous flag, so exactly one caller sees false.
  const bool was_closed = closed_.exchange(true);
  if (!was_closed) {
    // NOTE(review): the id is still 0 if nothing was ever scheduled; how
    // Cancel treats that value is not visible here.
    const auto pending_id = scheduled_task_id_.load();
    TokenRefreshScheduler::Instance().Cancel(pending_id);
  }
  return {};
}

private:
// Private constructor: takes the refresh settings by value and moves them in,
// and binds client_ to the caller's HttpClient.
// NOTE(review): client_ is a reference member, so the HttpClient presumably
// has to outlive this session — confirm with callers.
OAuth2AuthSession(Config config, HttpClient& client)
: config_(std::move(config)), client_(client) {}

/// \brief Install the first token, derive its expiry and maybe schedule refresh.
///
/// Writes token_, headers_ and expires_at_ without taking mutex_.
/// NOTE(review): this is presumably safe because the only visible caller is
/// Make(), before the session is returned to anyone — confirm.
void SetInitialToken(const OAuthTokenResponse& token_response) {
  using SteadyClock = std::chrono::steady_clock;
  using SystemClock = std::chrono::system_clock;

  token_ = token_response.access_token;
  headers_.clear();
  headers_.emplace(std::string(kAuthorizationHeader),
                   std::string(kBearerPrefix) + token_);

  // Prefer the lifetime from the response; otherwise fall back to whatever
  // expiry ExpiresAtMillis can read out of the token itself.
  if (const auto& lifetime_secs = token_response.expires_in_secs;
      lifetime_secs.has_value()) {
    expires_at_ = SteadyClock::now() + std::chrono::seconds(*lifetime_secs);
  } else if (const auto token_exp_ms = ExpiresAtMillis(token_);
             token_exp_ms.has_value()) {
    // Translate the absolute epoch-millis expiry onto the steady clock by
    // adding the remaining wall-clock time to the current steady time.
    const auto wall_now = SystemClock::now();
    const auto mono_now = SteadyClock::now();
    const auto wall_expiry =
        SystemClock::time_point(std::chrono::milliseconds(*token_exp_ms));
    expires_at_ = mono_now + (wall_expiry - wall_now);
  }

  // A default-constructed time_point means "no expiry known".
  const bool expiry_known = expires_at_ != SteadyClock::time_point{};
  if (config_.keep_refreshed && expiry_known) {
    ScheduleRefresh();
  }
}

/// \brief Start a refresh cycle: first attempt (index 0) with a 200 ms
/// initial retry backoff.
void DoRefresh() { DoRefreshAttempt(0, std::chrono::milliseconds(200)); }

/// \brief Single refresh attempt. On failure, schedules a retry via the
/// scheduler (non-blocking) instead of sleeping on the worker thread.
void DoRefreshAttempt(int attempt, std::chrono::milliseconds backoff) {
static constexpr int kMaxRetries = 5;
static constexpr auto kMaxBackoff = std::chrono::milliseconds(10'000);

if (closed_.load()) return;

// Build credential and properties for this attempt (rebuilt on every retry;
// the inputs come from config_ and do not change between retries)
std::string credential = config_.client_id.empty()
? config_.client_secret
: config_.client_id + ":" + config_.client_secret;

// Use an empty session for the refresh request (no auth headers —
// avoids circular dependency of using an expired token to refresh itself)
auto empty_session = AuthSession::MakeDefault({});

AuthProperties props;
Comment thread
lishuxu marked this conversation as resolved.
props.Set(AuthProperties::kCredential, credential);
props.Set(AuthProperties::kScope, config_.scope);
props.Set(AuthProperties::kOAuth2ServerUri, config_.token_endpoint);
for (const auto& [key, value] : config_.optional_oauth_params) {
props.mutable_configs().insert_or_assign(key, value);
}

auto result = FetchToken(client_, *empty_session, props);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Java refreshes through token exchange by default and only falls back to client credentials in specific cases. This path always uses client_credentials and ignores token-exchange-enabled, which breaks parity for exchange-based refresh.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. This path always uses client_credentials and ignores token-exchange-enabled, unlike Java which refreshes via token exchange by default. I'll address this in a dedicated follow-up PR (exchange request builder + refresh flow rework), and link it here. Keeping this PR scoped to client_credentials auto-refresh.

if (result.has_value()) {
auto& response = result.value();
{
std::unique_lock lock(mutex_);
token_ = response.access_token;
headers_ = {
{std::string(kAuthorizationHeader), std::string(kBearerPrefix) + token_}};

// Reset before deriving new expiry
expires_at_ = std::chrono::steady_clock::time_point{};

if (response.expires_in_secs.has_value()) {
Comment thread
lishuxu marked this conversation as resolved.
expires_at_ = std::chrono::steady_clock::now() +
std::chrono::seconds(*response.expires_in_secs);
} else if (auto exp_ms = ExpiresAtMillis(token_); exp_ms.has_value()) {
auto now_sys = std::chrono::system_clock::now();
auto now_steady = std::chrono::steady_clock::now();
auto exp_sys =
std::chrono::system_clock::time_point(std::chrono::milliseconds(*exp_ms));
expires_at_ = now_steady + (exp_sys - now_sys);
}
}
// Note: ScheduleRefresh must be called outside the lock.
ScheduleRefresh();
return; // Success
}

// Schedule retry with exponential backoff (non-blocking)
if (attempt + 1 < kMaxRetries) {
auto next_backoff =
std::min(std::chrono::duration_cast<std::chrono::milliseconds>(backoff * 2),
kMaxBackoff);
std::weak_ptr<OAuth2AuthSession> weak_self = shared_from_this();
TokenRefreshScheduler::Instance().Schedule(
backoff,
[weak_self = std::move(weak_self), next_attempt = attempt + 1, next_backoff] {
if (auto self = weak_self.lock()) {
self->DoRefreshAttempt(next_attempt, next_backoff);
}
});
}
// All retries exhausted — stop refreshing silently.
// Next request will use the expired token; server returns 401.
}

/// \brief Schedule the next token refresh based on expiration time.
///
/// Must be called outside any lock on mutex_ (CalculateRefreshDelay
/// acquires shared_lock internally).
void ScheduleRefresh() {
if (!config_.keep_refreshed || closed_.load()) return;

auto delay = CalculateRefreshDelay();
if (delay <= std::chrono::milliseconds::zero()) return;

std::weak_ptr<OAuth2AuthSession> weak_self = shared_from_this();
auto new_id = TokenRefreshScheduler::Instance().Schedule(
delay, [weak_self = std::move(weak_self)] {
if (auto self = weak_self.lock()) {
self->DoRefresh();
}
});
scheduled_task_id_.store(new_id);
}

/// \brief Compute how long to wait before refreshing the current token.
///
/// \return Zero when expires_at_ is not in the future; otherwise the time left
/// minus a safety window (10% of the time left, at most 5 minutes), but never
/// less than 10 ms. Takes a shared lock on mutex_.
std::chrono::milliseconds CalculateRefreshDelay() const {
  using std::chrono::milliseconds;
  constexpr milliseconds kMaxRefreshWindow{300'000};  // 5 minutes
  constexpr milliseconds kMinDelay{10};

  std::shared_lock<std::shared_mutex> read_guard(mutex_);
  const auto current = std::chrono::steady_clock::now();
  if (current >= expires_at_) {
    return milliseconds::zero();
  }

  const auto remaining = std::chrono::duration_cast<milliseconds>(expires_at_ - current);
  const milliseconds tenth = remaining / 10;
  const milliseconds window = tenth < kMaxRefreshWindow ? tenth : kMaxRefreshWindow;
  const milliseconds wait = remaining - window;
  return wait < kMinDelay ? kMinDelay : wait;
}

mutable std::shared_mutex mutex_; // protects token_, headers_, expires_at_
std::string token_;
std::unordered_map<std::string, std::string> headers_;
std::chrono::steady_clock::time_point expires_at_{};

Config config_;
HttpClient& client_;
std::atomic<uint64_t> scheduled_task_id_{0};
std::atomic<bool> closed_{false};
};

} // namespace

std::shared_ptr<AuthSession> AuthSession::MakeDefault(
Expand All @@ -52,12 +242,20 @@ std::shared_ptr<AuthSession> AuthSession::MakeDefault(
}

std::shared_ptr<AuthSession> AuthSession::MakeOAuth2(
const OAuthTokenResponse& initial_token, const std::string& /*token_endpoint*/,
const std::string& /*client_id*/, const std::string& /*client_secret*/,
const std::string& /*scope*/, HttpClient& /*client*/) {
// TODO(lishuxu): Create OAuth2AuthSession with auto-refresh support.
return MakeDefault({{std::string(kAuthorizationHeader),
std::string(kBearerPrefix) + initial_token.access_token}});
const OAuthTokenResponse& initial_token, const std::string& token_endpoint,
const std::string& client_id, const std::string& client_secret,
const std::string& scope, bool keep_refreshed,
const std::unordered_map<std::string, std::string>& optional_oauth_params,
HttpClient& client) {
OAuth2AuthSession::Config config{
.token_endpoint = token_endpoint,
.client_id = client_id,
.client_secret = client_secret,
.scope = scope,
.optional_oauth_params = optional_oauth_params,
.keep_refreshed = keep_refreshed,
};
return OAuth2AuthSession::Make(initial_token, std::move(config), client);
}

} // namespace iceberg::rest::auth
14 changes: 8 additions & 6 deletions src/iceberg/catalog/rest/auth/auth_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,16 @@ class ICEBERG_REST_EXPORT AuthSession {
/// \param client_id OAuth2 client ID for refresh requests.
/// \param client_secret OAuth2 client secret for re-fetch if refresh fails.
/// \param scope OAuth2 scope for refresh requests.
/// \param keep_refreshed Whether to schedule automatic token refresh.
/// \param optional_oauth_params Optional OAuth params (audience, resource) for refresh.
/// \param client HTTP client for making refresh requests.
/// \return A new session that manages token lifecycle automatically.
static std::shared_ptr<AuthSession> MakeOAuth2(const OAuthTokenResponse& initial_token,
const std::string& token_endpoint,
const std::string& client_id,
const std::string& client_secret,
const std::string& scope,
HttpClient& client);
static std::shared_ptr<AuthSession> MakeOAuth2(
const OAuthTokenResponse& initial_token, const std::string& token_endpoint,
const std::string& client_id, const std::string& client_secret,
const std::string& scope, bool keep_refreshed,
const std::unordered_map<std::string, std::string>& optional_oauth_params,
HttpClient& client);
};

} // namespace iceberg::rest::auth
46 changes: 46 additions & 0 deletions src/iceberg/catalog/rest/auth/oauth2_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "iceberg/catalog/rest/http_client.h"
#include "iceberg/catalog/rest/json_serde_internal.h"
#include "iceberg/json_serde_internal.h"
#include "iceberg/util/base64.h"
#include "iceberg/util/macros.h"

namespace iceberg::rest::auth {
Expand Down Expand Up @@ -74,4 +75,49 @@ Result<OAuthTokenResponse> FetchToken(HttpClient& client, AuthSession& session,
return token_response;
}

std::optional<int64_t> ExpiresAtMillis(std::string_view token) {
if (token.empty()) {
return std::nullopt;
}

// A JWT has exactly 3 dot-separated parts: header.payload.signature
auto first_dot = token.find('.');
if (first_dot == std::string_view::npos) {
return std::nullopt;
}
auto second_dot = token.find('.', first_dot + 1);
if (second_dot == std::string_view::npos) {
return std::nullopt;
}
// Ensure there are exactly 3 parts (no additional dots after the signature).
// Note: JWE tokens have 5 segments — they are intentionally not supported here
// and will return nullopt (graceful degradation to not scheduling refresh).
if (token.find('.', second_dot + 1) != std::string_view::npos) {
return std::nullopt;
}

// Extract and decode the payload (second part).
// Note: Base64::UrlDecode returns an error on invalid input, and Ok("") on empty input.
// A valid JWT payload is never empty (at minimum "{}"), so empty result reliably
// indicates the token is not a JWT we can parse.
std::string_view payload_b64 = token.substr(first_dot + 1, second_dot - first_dot - 1);
auto payload_result = Base64::UrlDecode(payload_b64);
if (!payload_result.has_value() || payload_result->empty()) {
return std::nullopt;
}
const std::string& payload = *payload_result;

// Parse JSON and extract "exp" claim
auto json = nlohmann::json::parse(payload, nullptr, false);
if (json.is_discarded() || !json.is_object()) {
return std::nullopt;
}
auto it = json.find("exp");
if (it == json.end() || !it->is_number()) {
return std::nullopt;
}
auto exp_seconds = static_cast<int64_t>(it->get<double>());
return exp_seconds * 1000; // Convert seconds to milliseconds
}

} // namespace iceberg::rest::auth
Loading
Loading