From 2407aac488dd496fde171b044033431015a9d0f2 Mon Sep 17 00:00:00 2001 From: Vukasin Stefanovic Date: Wed, 24 Dec 2025 12:57:22 +0100 Subject: [PATCH 1/3] added load_table_credentials --- crates/catalog/rest/src/catalog.rs | 30 ++++++++++++++++++++++++++++-- crates/catalog/rest/src/types.rs | 12 ++++++++++++ 2 files changed, 40 insertions(+), 2 deletions(-) diff --git a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs index ce5656cfca..b31e636165 100644 --- a/crates/catalog/rest/src/catalog.rs +++ b/crates/catalog/rest/src/catalog.rs @@ -44,8 +44,8 @@ use crate::client::{ }; use crate::types::{ CatalogConfig, CommitTableRequest, CommitTableResponse, CreateTableRequest, - ListNamespaceResponse, ListTableResponse, LoadTableResponse, NamespaceSerde, - RegisterTableRequest, RenameTableRequest, + ListNamespaceResponse, ListTableResponse, LoadCredentialsResponse, LoadTableResponse, + NamespaceSerde, RegisterTableRequest, RenameTableRequest, }; /// REST catalog URI @@ -461,6 +461,32 @@ impl RestCatalog { pub async fn regenerate_token(&self) -> Result<()> { self.context().await?.client.regenerate_token().await } + + /// Load vended credentials for a table from the catalog. + pub async fn load_table_credentials( + &self, + table_ident: &TableIdent, + ) -> Result { + let context = self.context().await?; + + let endpoint = format!( + "{}/credentials", + context.config.table_endpoint(table_ident) + ); + + let request = context.client.request(Method::GET, endpoint).build()?; + + let http_response = context.client.query_catalog(request).await?; + + match http_response.status() { + StatusCode::OK => deserialize_catalog_response(http_response).await, + StatusCode::NOT_FOUND => Err(Error::new( + ErrorKind::Unexpected, + "Tried to load credentials for a table that does not exist", + )), + _ => Err(deserialize_unexpected_catalog_error(http_response).await), + } + } } /// All requests and expected responses are derived from the REST catalog API spec: diff --git a/crates/catalog/rest/src/types.rs b/crates/catalog/rest/src/types.rs index 70ed72051a..83759ef40e 100644 --- a/crates/catalog/rest/src/types.rs +++ b/crates/catalog/rest/src/types.rs @@ -199,3 +199,15 @@ pub(super) struct RegisterTableRequest { pub(super) metadata_location: String, pub(super) overwrite: Option, } + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct StorageCredential { + pub prefix: String, + pub config: HashMap, +} + +#[derive(Debug, Serialize, Deserialize)] +#[serde(rename_all = "kebab-case")] +pub struct LoadCredentialsResponse { + pub storage_credentials: Vec, +} From c0b84703277e8ac82710c4ffc4d74f26db017b5b Mon Sep 17 00:00:00 2001 From: Vukasin Stefanovic Date: Wed, 24 Dec 2025 14:12:17 +0100 Subject: [PATCH 2/3] Support catalog credentials (both during load and during --- Cargo.lock | 1 + crates/catalog/rest/Cargo.toml | 1 + crates/catalog/rest/src/catalog.rs | 244 ++++++++++++++++++++++++----- crates/catalog/rest/src/types.rs | 1 + 4 files changed, 204 insertions(+), 43 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f281319aa6..301b2b9428 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2395,6 +2395,7 @@ dependencies = [ "async-trait", "chrono", "ctor", + "futures", "http 1.3.1", "iceberg", "iceberg_test_utils", diff --git a/crates/catalog/rest/Cargo.toml b/crates/catalog/rest/Cargo.toml index 916b5ccf75..44e4a22e32 100644 --- a/crates/catalog/rest/Cargo.toml +++ b/crates/catalog/rest/Cargo.toml @@ -45,6 +45,7 @@ uuid = { workspace = true, features = ["v4"] } [dev-dependencies] ctor = { workspace = true } +futures = { workspace = true } iceberg_test_utils = { path = "../../test_utils", features = ["tests"] } mockito = { workspace = true } port_scanner = { workspace = true } diff --git a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs index b31e636165..03ef5f6baf 100644 --- a/crates/catalog/rest/src/catalog.rs +++ b/crates/catalog/rest/src/catalog.rs @@ -462,6 +462,71 @@ impl RestCatalog { self.context().await?.client.regenerate_token().await } + /// The actual logic for loading table, that supports loading vended credentials if requested. + async fn load_table_internal(&self, table_ident: &TableIdent, load_credentials: bool) -> Result { + let context = self.context().await?; + + let mut request_builder = context + .client + .request(Method::GET, context.config.table_endpoint(table_ident)); + + if load_credentials { + request_builder = request_builder.header("X-Iceberg-Access-Delegation", "vended-credentials"); + } + + let request = request_builder.build()?; + + let http_response = context.client.query_catalog(request).await?; + + let response = match http_response.status() { + StatusCode::OK | StatusCode::NOT_MODIFIED => { + deserialize_catalog_response::(http_response).await? + } + StatusCode::NOT_FOUND => { + return Err(Error::new( + ErrorKind::Unexpected, + "Tried to load a table that does not exist", + )); + } + _ => return Err(deserialize_unexpected_catalog_error(http_response).await), + }; + + // Build config with proper precedence, with each next config overriding previous one: + // 1. response.config (server defaults) + // 2. user_config.props (user configuration) + // 3. storage_credentials (vended credentials - highest priority) + let mut config: HashMap = response + .config + .unwrap_or_default() + .into_iter() + .chain(self.user_config.props.clone()) + .collect(); + + // Per the OpenAPI spec: "Clients must first check whether the respective credentials + // exist in the storage-credentials field before checking the config for credentials." + // When vended-credentials header is set, credentials are returned in storage_credentials field. + if let Some(storage_credentials) = response.storage_credentials { + for cred in storage_credentials { + config.extend(cred.config); + } + } + + let file_io = self + .load_file_io(response.metadata_location.as_deref(), Some(config)) + .await?; + + let table_builder = Table::builder() + .identifier(table_ident.clone()) + .file_io(file_io) + .metadata(response.metadata); + + if let Some(metadata_location) = response.metadata_location { + table_builder.metadata_location(metadata_location).build() + } else { + table_builder.build() + } + } + /// Load vended credentials for a table from the catalog. pub async fn load_table_credentials( &self, @@ -487,6 +552,15 @@ impl RestCatalog { _ => Err(deserialize_unexpected_catalog_error(http_response).await), } } + + /// Load a table with vended credentials from the catalog. + /// + /// This method loads the table and automatically fetches short-lived credentials + /// for accessing the table's data files. The credentials are merged into the + /// FileIO configuration. + pub async fn load_table_with_credentials(&self, table_ident: &TableIdent) -> Result
{ + self.load_table_internal(table_ident, true).await + } } /// All requests and expected responses are derived from the REST catalog API spec: @@ -780,49 +854,7 @@ impl Catalog for RestCatalog { /// server and the config provided when creating this `RestCatalog` instance, then the value /// provided locally to the `RestCatalog` will take precedence. async fn load_table(&self, table_ident: &TableIdent) -> Result
{ - let context = self.context().await?; - - let request = context - .client - .request(Method::GET, context.config.table_endpoint(table_ident)) - .build()?; - - let http_response = context.client.query_catalog(request).await?; - - let response = match http_response.status() { - StatusCode::OK | StatusCode::NOT_MODIFIED => { - deserialize_catalog_response::(http_response).await? - } - StatusCode::NOT_FOUND => { - return Err(Error::new( - ErrorKind::Unexpected, - "Tried to load a table that does not exist", - )); - } - _ => return Err(deserialize_unexpected_catalog_error(http_response).await), - }; - - let config = response - .config - .unwrap_or_default() - .into_iter() - .chain(self.user_config.props.clone()) - .collect(); - - let file_io = self - .load_file_io(response.metadata_location.as_deref(), Some(config)) - .await?; - - let table_builder = Table::builder() - .identifier(table_ident.clone()) - .file_io(file_io) - .metadata(response.metadata); - - if let Some(metadata_location) = response.metadata_location { - table_builder.metadata_location(metadata_location).build() - } else { - table_builder.build() - } + self.load_table_internal(table_ident, false).await } /// Drop a table from the catalog. @@ -1020,6 +1052,7 @@ impl Catalog for RestCatalog { #[cfg(test)] mod tests { + use futures::stream::StreamExt; use std::fs::File; use std::io::BufReader; use std::sync::Arc; @@ -2790,4 +2823,129 @@ mod tests { assert_eq!(err.message(), "Catalog uri is required"); } } + + #[tokio::test] + #[ignore] + async fn test_load_table_credentials_integration() { + use std::env; + + let client_id = env::var("POLARIS_USER") + .expect("POLARIS_USER environment variable must be set"); + let client_secret = env::var("POLARIS_SECRET") + .expect("POLARIS_SECRET environment variable must be set"); + let catalog_uri = env::var("POLARIS_URI") + .unwrap_or_else(|_| "http://localhost:8181/api/catalog".to_string()); + + let mut props = HashMap::new(); + props.insert( + "credential".to_string(), + format!("{}:{}", client_id, client_secret), + ); + props.insert("scope".to_string(), "PRINCIPAL_ROLE:ALL".to_string()); + props.insert("s3.endpoint".to_string(), "http://localhost:9000".to_string()); + + let catalog = RestCatalog::new( + RestCatalogConfig::builder() + .uri(catalog_uri) + .warehouse("warehouse".to_string()) + .props(props) + .build(), + ); + + let table_ident = TableIdent::new( + NamespaceIdent::new("tpch.sf01".to_string()), + "nation".to_string(), + ); + + let credentials_result = catalog.load_table_credentials(&table_ident).await; + + match credentials_result { + Ok(credentials) => { + println!("Successfully loaded credentials"); + println!("Number of storage credentials: {}", credentials.storage_credentials.len()); + // println!("Full response: {:#?}", credentials); + assert!(!credentials.storage_credentials.is_empty()); + } + Err(e) => { + panic!("Failed to load table credentials: {:?}", e); + } + } + + // Also test loading table with vended credentials + println!("\n--- Testing load_table_with_credentials ---"); + let table_result = catalog.load_table_with_credentials(&table_ident).await; + + match table_result { + Ok(table) => { + println!("Successfully loaded table with vended credentials"); + println!("Table identifier: {}", table.identifier()); + println!("Metadata location: {:?}", table.metadata_location()); + println!("FileIO configured with vended credentials"); + + // Scan the table and count rows + println!("\n--- Scanning table ---"); + let scan = table.scan().build().expect("Failed to build scan"); + let mut row_count = 0; + + let mut stream = scan.to_arrow().await.expect("Failed to create arrow stream"); + + while let Some(batch_result) = stream.next().await { + match batch_result { + Ok(batch) => { + row_count += batch.num_rows(); + println!(" Batch: {} rows", batch.num_rows()); + } + Err(e) => { + panic!("Failed to read batch: {:?}", e); + } + } + } + + println!("Total rows scanned: {}", row_count); + assert_eq!(row_count, 25, "Expected 25 rows in nation table"); + println!("✓ Successfully verified 25 rows in table"); + } + Err(e) => { + panic!("Failed to load table with vended credentials: {:?}", e); + } + } + + // Test loading table WITHOUT vended credentials and verify scan fails + println!("\n--- Testing load_table WITHOUT vended credentials (should fail) ---"); + let table_result_no_creds = catalog.load_table(&table_ident).await; + + match table_result_no_creds { + Ok(table) => { + println!("Successfully loaded table WITHOUT vended credentials"); + println!("Table identifier: {}", table.identifier()); + println!("Metadata location: {:?}", table.metadata_location()); + + // Try to scan the table - this should fail + println!("\n--- Attempting to scan table without credentials ---"); + let scan = table.scan().build().expect("Failed to build scan"); + + // Try to create arrow stream - this should fail when accessing manifest list + match scan.to_arrow().await { + Ok(_stream) => { + panic!("Stream creation succeeded without vended credentials - this should not happen!"); + } + Err(e) => { + println!("✓ Scan failed as expected without vended credentials"); + println!("Error: {}", e); + // Verify it's a permission/authentication error + let error_msg = e.to_string(); + assert!( + error_msg.contains("PermissionDenied") && + error_msg.contains("InvalidAccessKeyId") && + error_msg.contains("403"), + "Expected permission/authentication error, got: {}", error_msg + ); + } + } + } + Err(e) => { + panic!("Failed to load table without vended credentials: {:?}", e); + } + } + } } diff --git a/crates/catalog/rest/src/types.rs b/crates/catalog/rest/src/types.rs index 83759ef40e..2acd18e47e 100644 --- a/crates/catalog/rest/src/types.rs +++ b/crates/catalog/rest/src/types.rs @@ -164,6 +164,7 @@ pub(super) struct LoadTableResponse { pub(super) metadata_location: Option, pub(super) metadata: TableMetadata, pub(super) config: Option>, + pub(super) storage_credentials: Option>, } #[derive(Debug, Serialize, Deserialize)] From a71d13659eeb747a4b3fee052ea8741a59aa375d Mon Sep 17 00:00:00 2001 From: Vukasin Stefanovic Date: Wed, 24 Dec 2025 16:20:48 +0100 Subject: [PATCH 3/3] cargo fmt --- crates/catalog/rest/src/catalog.rs | 52 +++++++++++++++++++----------- 1 file changed, 33 insertions(+), 19 deletions(-) diff --git a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs index 03ef5f6baf..c784f32039 100644 --- a/crates/catalog/rest/src/catalog.rs +++ b/crates/catalog/rest/src/catalog.rs @@ -463,7 +463,11 @@ impl RestCatalog { } /// The actual logic for loading table, that supports loading vended credentials if requested. - async fn load_table_internal(&self, table_ident: &TableIdent, load_credentials: bool) -> Result
{ + async fn load_table_internal( + &self, + table_ident: &TableIdent, + load_credentials: bool, + ) -> Result
{ let context = self.context().await?; let mut request_builder = context @@ -471,7 +475,8 @@ impl RestCatalog { .request(Method::GET, context.config.table_endpoint(table_ident)); if load_credentials { - request_builder = request_builder.header("X-Iceberg-Access-Delegation", "vended-credentials"); + request_builder = + request_builder.header("X-Iceberg-Access-Delegation", "vended-credentials"); } let request = request_builder.build()?; @@ -534,10 +539,7 @@ impl RestCatalog { ) -> Result { let context = self.context().await?; - let endpoint = format!( - "{}/credentials", - context.config.table_endpoint(table_ident) - ); + let endpoint = format!("{}/credentials", context.config.table_endpoint(table_ident)); let request = context.client.request(Method::GET, endpoint).build()?; @@ -1052,12 +1054,12 @@ impl Catalog for RestCatalog { #[cfg(test)] mod tests { - use futures::stream::StreamExt; use std::fs::File; use std::io::BufReader; use std::sync::Arc; use chrono::{TimeZone, Utc}; + use futures::stream::StreamExt; use iceberg::spec::{ FormatVersion, NestedField, NullOrder, Operation, PrimitiveType, Schema, Snapshot, SnapshotLog, SortDirection, SortField, SortOrder, Summary, Transform, Type, @@ -2829,10 +2831,10 @@ mod tests { async fn test_load_table_credentials_integration() { use std::env; - let client_id = env::var("POLARIS_USER") - .expect("POLARIS_USER environment variable must be set"); - let client_secret = env::var("POLARIS_SECRET") - .expect("POLARIS_SECRET environment variable must be set"); + let client_id = + env::var("POLARIS_USER").expect("POLARIS_USER environment variable must be set"); + let client_secret = + env::var("POLARIS_SECRET").expect("POLARIS_SECRET environment variable must be set"); let catalog_uri = env::var("POLARIS_URI") .unwrap_or_else(|_| "http://localhost:8181/api/catalog".to_string()); @@ -2842,7 +2844,10 @@ mod tests { format!("{}:{}", client_id, client_secret), ); props.insert("scope".to_string(), "PRINCIPAL_ROLE:ALL".to_string()); - props.insert("s3.endpoint".to_string(), "http://localhost:9000".to_string()); + props.insert( + "s3.endpoint".to_string(), + "http://localhost:9000".to_string(), + ); let catalog = RestCatalog::new( RestCatalogConfig::builder() @@ -2862,7 +2867,10 @@ mod tests { match credentials_result { Ok(credentials) => { println!("Successfully loaded credentials"); - println!("Number of storage credentials: {}", credentials.storage_credentials.len()); + println!( + "Number of storage credentials: {}", + credentials.storage_credentials.len() + ); // println!("Full response: {:#?}", credentials); assert!(!credentials.storage_credentials.is_empty()); } @@ -2887,7 +2895,10 @@ mod tests { let scan = table.scan().build().expect("Failed to build scan"); let mut row_count = 0; - let mut stream = scan.to_arrow().await.expect("Failed to create arrow stream"); + let mut stream = scan + .to_arrow() + .await + .expect("Failed to create arrow stream"); while let Some(batch_result) = stream.next().await { match batch_result { @@ -2927,7 +2938,9 @@ mod tests { // Try to create arrow stream - this should fail when accessing manifest list match scan.to_arrow().await { Ok(_stream) => { - panic!("Stream creation succeeded without vended credentials - this should not happen!"); + panic!( + "Stream creation succeeded without vended credentials - this should not happen!" + ); } Err(e) => { println!("✓ Scan failed as expected without vended credentials"); @@ -2935,10 +2948,11 @@ mod tests { // Verify it's a permission/authentication error let error_msg = e.to_string(); assert!( - error_msg.contains("PermissionDenied") && - error_msg.contains("InvalidAccessKeyId") && - error_msg.contains("403"), - "Expected permission/authentication error, got: {}", error_msg + error_msg.contains("PermissionDenied") + && error_msg.contains("InvalidAccessKeyId") + && error_msg.contains("403"), + "Expected permission/authentication error, got: {}", + error_msg ); } }