diff --git a/Cargo.lock b/Cargo.lock
index f281319aa6..301b2b9428 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2395,6 +2395,7 @@ dependencies = [
"async-trait",
"chrono",
"ctor",
+ "futures",
"http 1.3.1",
"iceberg",
"iceberg_test_utils",
diff --git a/crates/catalog/rest/Cargo.toml b/crates/catalog/rest/Cargo.toml
index 916b5ccf75..44e4a22e32 100644
--- a/crates/catalog/rest/Cargo.toml
+++ b/crates/catalog/rest/Cargo.toml
@@ -45,6 +45,7 @@ uuid = { workspace = true, features = ["v4"] }
[dev-dependencies]
ctor = { workspace = true }
+futures = { workspace = true }
iceberg_test_utils = { path = "../../test_utils", features = ["tests"] }
mockito = { workspace = true }
port_scanner = { workspace = true }
diff --git a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs
index ce5656cfca..c784f32039 100644
--- a/crates/catalog/rest/src/catalog.rs
+++ b/crates/catalog/rest/src/catalog.rs
@@ -44,8 +44,8 @@ use crate::client::{
};
use crate::types::{
CatalogConfig, CommitTableRequest, CommitTableResponse, CreateTableRequest,
- ListNamespaceResponse, ListTableResponse, LoadTableResponse, NamespaceSerde,
- RegisterTableRequest, RenameTableRequest,
+ ListNamespaceResponse, ListTableResponse, LoadCredentialsResponse, LoadTableResponse,
+ NamespaceSerde, RegisterTableRequest, RenameTableRequest,
};
/// REST catalog URI
@@ -461,6 +461,108 @@ impl RestCatalog {
pub async fn regenerate_token(&self) -> Result<()> {
self.context().await?.client.regenerate_token().await
}
+
+ /// The actual logic for loading table, that supports loading vended credentials if requested.
+ async fn load_table_internal(
+ &self,
+ table_ident: &TableIdent,
+ load_credentials: bool,
+ ) -> Result
{
+ let context = self.context().await?;
+
+ let mut request_builder = context
+ .client
+ .request(Method::GET, context.config.table_endpoint(table_ident));
+
+ if load_credentials {
+ request_builder =
+ request_builder.header("X-Iceberg-Access-Delegation", "vended-credentials");
+ }
+
+ let request = request_builder.build()?;
+
+ let http_response = context.client.query_catalog(request).await?;
+
+ let response = match http_response.status() {
+ StatusCode::OK | StatusCode::NOT_MODIFIED => {
+ deserialize_catalog_response::(http_response).await?
+ }
+ StatusCode::NOT_FOUND => {
+ return Err(Error::new(
+ ErrorKind::Unexpected,
+ "Tried to load a table that does not exist",
+ ));
+ }
+ _ => return Err(deserialize_unexpected_catalog_error(http_response).await),
+ };
+
+ // Build config with proper precedence, with each next config overriding previous one:
+ // 1. response.config (server defaults)
+ // 2. user_config.props (user configuration)
+ // 3. storage_credentials (vended credentials - highest priority)
+ let mut config: HashMap = response
+ .config
+ .unwrap_or_default()
+ .into_iter()
+ .chain(self.user_config.props.clone())
+ .collect();
+
+ // Per the OpenAPI spec: "Clients must first check whether the respective credentials
+ // exist in the storage-credentials field before checking the config for credentials."
+ // When vended-credentials header is set, credentials are returned in storage_credentials field.
+ if let Some(storage_credentials) = response.storage_credentials {
+ for cred in storage_credentials {
+ config.extend(cred.config);
+ }
+ }
+
+ let file_io = self
+ .load_file_io(response.metadata_location.as_deref(), Some(config))
+ .await?;
+
+ let table_builder = Table::builder()
+ .identifier(table_ident.clone())
+ .file_io(file_io)
+ .metadata(response.metadata);
+
+ if let Some(metadata_location) = response.metadata_location {
+ table_builder.metadata_location(metadata_location).build()
+ } else {
+ table_builder.build()
+ }
+ }
+
+ /// Load vended credentials for a table from the catalog.
+ pub async fn load_table_credentials(
+ &self,
+ table_ident: &TableIdent,
+ ) -> Result {
+ let context = self.context().await?;
+
+ let endpoint = format!("{}/credentials", context.config.table_endpoint(table_ident));
+
+ let request = context.client.request(Method::GET, endpoint).build()?;
+
+ let http_response = context.client.query_catalog(request).await?;
+
+ match http_response.status() {
+ StatusCode::OK => deserialize_catalog_response(http_response).await,
+ StatusCode::NOT_FOUND => Err(Error::new(
+ ErrorKind::Unexpected,
+ "Tried to load credentials for a table that does not exist",
+ )),
+ _ => Err(deserialize_unexpected_catalog_error(http_response).await),
+ }
+ }
+
+ /// Load a table with vended credentials from the catalog.
+ ///
+ /// This method loads the table and automatically fetches short-lived credentials
+ /// for accessing the table's data files. The credentials are merged into the
+ /// FileIO configuration.
+ pub async fn load_table_with_credentials(&self, table_ident: &TableIdent) -> Result {
+ self.load_table_internal(table_ident, true).await
+ }
}
/// All requests and expected responses are derived from the REST catalog API spec:
@@ -754,49 +856,7 @@ impl Catalog for RestCatalog {
/// server and the config provided when creating this `RestCatalog` instance, then the value
/// provided locally to the `RestCatalog` will take precedence.
async fn load_table(&self, table_ident: &TableIdent) -> Result {
- let context = self.context().await?;
-
- let request = context
- .client
- .request(Method::GET, context.config.table_endpoint(table_ident))
- .build()?;
-
- let http_response = context.client.query_catalog(request).await?;
-
- let response = match http_response.status() {
- StatusCode::OK | StatusCode::NOT_MODIFIED => {
- deserialize_catalog_response::(http_response).await?
- }
- StatusCode::NOT_FOUND => {
- return Err(Error::new(
- ErrorKind::Unexpected,
- "Tried to load a table that does not exist",
- ));
- }
- _ => return Err(deserialize_unexpected_catalog_error(http_response).await),
- };
-
- let config = response
- .config
- .unwrap_or_default()
- .into_iter()
- .chain(self.user_config.props.clone())
- .collect();
-
- let file_io = self
- .load_file_io(response.metadata_location.as_deref(), Some(config))
- .await?;
-
- let table_builder = Table::builder()
- .identifier(table_ident.clone())
- .file_io(file_io)
- .metadata(response.metadata);
-
- if let Some(metadata_location) = response.metadata_location {
- table_builder.metadata_location(metadata_location).build()
- } else {
- table_builder.build()
- }
+ self.load_table_internal(table_ident, false).await
}
/// Drop a table from the catalog.
@@ -999,6 +1059,7 @@ mod tests {
use std::sync::Arc;
use chrono::{TimeZone, Utc};
+ use futures::stream::StreamExt;
use iceberg::spec::{
FormatVersion, NestedField, NullOrder, Operation, PrimitiveType, Schema, Snapshot,
SnapshotLog, SortDirection, SortField, SortOrder, Summary, Transform, Type,
@@ -2764,4 +2825,141 @@ mod tests {
assert_eq!(err.message(), "Catalog uri is required");
}
}
+
+ #[tokio::test]
+ #[ignore]
+ async fn test_load_table_credentials_integration() {
+ use std::env;
+
+ let client_id =
+ env::var("POLARIS_USER").expect("POLARIS_USER environment variable must be set");
+ let client_secret =
+ env::var("POLARIS_SECRET").expect("POLARIS_SECRET environment variable must be set");
+ let catalog_uri = env::var("POLARIS_URI")
+ .unwrap_or_else(|_| "http://localhost:8181/api/catalog".to_string());
+
+ let mut props = HashMap::new();
+ props.insert(
+ "credential".to_string(),
+ format!("{}:{}", client_id, client_secret),
+ );
+ props.insert("scope".to_string(), "PRINCIPAL_ROLE:ALL".to_string());
+ props.insert(
+ "s3.endpoint".to_string(),
+ "http://localhost:9000".to_string(),
+ );
+
+ let catalog = RestCatalog::new(
+ RestCatalogConfig::builder()
+ .uri(catalog_uri)
+ .warehouse("warehouse".to_string())
+ .props(props)
+ .build(),
+ );
+
+ let table_ident = TableIdent::new(
+ NamespaceIdent::new("tpch.sf01".to_string()),
+ "nation".to_string(),
+ );
+
+ let credentials_result = catalog.load_table_credentials(&table_ident).await;
+
+ match credentials_result {
+ Ok(credentials) => {
+ println!("Successfully loaded credentials");
+ println!(
+ "Number of storage credentials: {}",
+ credentials.storage_credentials.len()
+ );
+ // println!("Full response: {:#?}", credentials);
+ assert!(!credentials.storage_credentials.is_empty());
+ }
+ Err(e) => {
+ panic!("Failed to load table credentials: {:?}", e);
+ }
+ }
+
+ // Also test loading table with vended credentials
+ println!("\n--- Testing load_table_with_credentials ---");
+ let table_result = catalog.load_table_with_credentials(&table_ident).await;
+
+ match table_result {
+ Ok(table) => {
+ println!("Successfully loaded table with vended credentials");
+ println!("Table identifier: {}", table.identifier());
+ println!("Metadata location: {:?}", table.metadata_location());
+ println!("FileIO configured with vended credentials");
+
+ // Scan the table and count rows
+ println!("\n--- Scanning table ---");
+ let scan = table.scan().build().expect("Failed to build scan");
+ let mut row_count = 0;
+
+ let mut stream = scan
+ .to_arrow()
+ .await
+ .expect("Failed to create arrow stream");
+
+ while let Some(batch_result) = stream.next().await {
+ match batch_result {
+ Ok(batch) => {
+ row_count += batch.num_rows();
+ println!(" Batch: {} rows", batch.num_rows());
+ }
+ Err(e) => {
+ panic!("Failed to read batch: {:?}", e);
+ }
+ }
+ }
+
+ println!("Total rows scanned: {}", row_count);
+ assert_eq!(row_count, 25, "Expected 25 rows in nation table");
+ println!("✓ Successfully verified 25 rows in table");
+ }
+ Err(e) => {
+ panic!("Failed to load table with vended credentials: {:?}", e);
+ }
+ }
+
+ // Test loading table WITHOUT vended credentials and verify scan fails
+ println!("\n--- Testing load_table WITHOUT vended credentials (should fail) ---");
+ let table_result_no_creds = catalog.load_table(&table_ident).await;
+
+ match table_result_no_creds {
+ Ok(table) => {
+ println!("Successfully loaded table WITHOUT vended credentials");
+ println!("Table identifier: {}", table.identifier());
+ println!("Metadata location: {:?}", table.metadata_location());
+
+ // Try to scan the table - this should fail
+ println!("\n--- Attempting to scan table without credentials ---");
+ let scan = table.scan().build().expect("Failed to build scan");
+
+ // Try to create arrow stream - this should fail when accessing manifest list
+ match scan.to_arrow().await {
+ Ok(_stream) => {
+ panic!(
+ "Stream creation succeeded without vended credentials - this should not happen!"
+ );
+ }
+ Err(e) => {
+ println!("✓ Scan failed as expected without vended credentials");
+ println!("Error: {}", e);
+ // Verify it's a permission/authentication error
+ let error_msg = e.to_string();
+ assert!(
+ error_msg.contains("PermissionDenied")
+ && error_msg.contains("InvalidAccessKeyId")
+ && error_msg.contains("403"),
+ "Expected permission/authentication error, got: {}",
+ error_msg
+ );
+ }
+ }
+ }
+ Err(e) => {
+ panic!("Failed to load table without vended credentials: {:?}", e);
+ }
+ }
+ }
}
diff --git a/crates/catalog/rest/src/types.rs b/crates/catalog/rest/src/types.rs
index 70ed72051a..2acd18e47e 100644
--- a/crates/catalog/rest/src/types.rs
+++ b/crates/catalog/rest/src/types.rs
@@ -164,6 +164,7 @@ pub(super) struct LoadTableResponse {
pub(super) metadata_location: Option,
pub(super) metadata: TableMetadata,
pub(super) config: Option>,
+ pub(super) storage_credentials: Option>,
}
#[derive(Debug, Serialize, Deserialize)]
@@ -199,3 +200,15 @@ pub(super) struct RegisterTableRequest {
pub(super) metadata_location: String,
pub(super) overwrite: Option,
}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct StorageCredential {
+ pub prefix: String,
+ pub config: HashMap,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+#[serde(rename_all = "kebab-case")]
+pub struct LoadCredentialsResponse {
+ pub storage_credentials: Vec,
+}