diff --git a/Cargo.lock b/Cargo.lock
index f281319aa6..301b2b9428 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2395,6 +2395,7 @@ dependencies = [
  "async-trait",
  "chrono",
  "ctor",
+ "futures",
  "http 1.3.1",
  "iceberg",
  "iceberg_test_utils",
diff --git a/crates/catalog/rest/Cargo.toml b/crates/catalog/rest/Cargo.toml
index 916b5ccf75..44e4a22e32 100644
--- a/crates/catalog/rest/Cargo.toml
+++ b/crates/catalog/rest/Cargo.toml
@@ -45,6 +45,7 @@ uuid = { workspace = true, features = ["v4"] }
 
 [dev-dependencies]
 ctor = { workspace = true }
+futures = { workspace = true }
 iceberg_test_utils = { path = "../../test_utils", features = ["tests"] }
 mockito = { workspace = true }
 port_scanner = { workspace = true }
diff --git a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs
index ce5656cfca..c784f32039 100644
--- a/crates/catalog/rest/src/catalog.rs
+++ b/crates/catalog/rest/src/catalog.rs
@@ -44,8 +44,8 @@ use crate::client::{
 };
 use crate::types::{
     CatalogConfig, CommitTableRequest, CommitTableResponse, CreateTableRequest,
-    ListNamespaceResponse, ListTableResponse, LoadTableResponse, NamespaceSerde,
-    RegisterTableRequest, RenameTableRequest,
+    ListNamespaceResponse, ListTableResponse, LoadCredentialsResponse, LoadTableResponse,
+    NamespaceSerde, RegisterTableRequest, RenameTableRequest,
 };
 
 /// REST catalog URI
@@ -461,6 +461,124 @@ impl RestCatalog {
     pub async fn regenerate_token(&self) -> Result<()> {
         self.context().await?.client.regenerate_token().await
     }
+
+    /// Loads a table, optionally asking the catalog to vend storage credentials.
+    ///
+    /// When `load_credentials` is `true`, the request carries the
+    /// `X-Iceberg-Access-Delegation: vended-credentials` header. Any storage credentials
+    /// present in the response are merged into the `FileIO` configuration.
+    async fn load_table_internal(
+        &self,
+        table_ident: &TableIdent,
+        load_credentials: bool,
+    ) -> Result<Table> {
+        let context = self.context().await?;
+
+        let mut request_builder = context
+            .client
+            .request(Method::GET, context.config.table_endpoint(table_ident));
+
+        if load_credentials {
+            request_builder =
+                request_builder.header("X-Iceberg-Access-Delegation", "vended-credentials");
+        }
+
+        let request = request_builder.build()?;
+
+        let http_response = context.client.query_catalog(request).await?;
+
+        let response = match http_response.status() {
+            StatusCode::OK | StatusCode::NOT_MODIFIED => {
+                deserialize_catalog_response::<LoadTableResponse>(http_response).await?
+            }
+            StatusCode::NOT_FOUND => {
+                return Err(Error::new(
+                    ErrorKind::Unexpected,
+                    "Tried to load a table that does not exist",
+                ));
+            }
+            _ => return Err(deserialize_unexpected_catalog_error(http_response).await),
+        };
+
+        // Build config with proper precedence, with each next config overriding previous one:
+        // 1. response.config (server defaults)
+        // 2. user_config.props (user configuration)
+        // 3. storage_credentials (vended credentials - highest priority)
+        let mut config: HashMap<String, String> = response
+            .config
+            .unwrap_or_default()
+            .into_iter()
+            .chain(self.user_config.props.clone())
+            .collect();
+
+        // Per the OpenAPI spec: "Clients must first check whether the respective credentials
+        // exist in the storage-credentials field before checking the config for credentials."
+        // When vended-credentials header is set, credentials are returned in storage_credentials field.
+        //
+        // The spec also asks clients to prefer the most specific (longest) prefix when several
+        // credentials are available, so apply them from shortest to longest prefix: on a key
+        // conflict the longest prefix wins instead of whichever entry the server listed last.
+        if let Some(mut storage_credentials) = response.storage_credentials {
+            storage_credentials.sort_by_key(|cred| cred.prefix.len());
+            for cred in storage_credentials {
+                config.extend(cred.config);
+            }
+        }
+
+        let file_io = self
+            .load_file_io(response.metadata_location.as_deref(), Some(config))
+            .await?;
+
+        let table_builder = Table::builder()
+            .identifier(table_ident.clone())
+            .file_io(file_io)
+            .metadata(response.metadata);
+
+        if let Some(metadata_location) = response.metadata_location {
+            table_builder.metadata_location(metadata_location).build()
+        } else {
+            table_builder.build()
+        }
+    }
+
+    /// Loads the vended storage credentials for a table from the catalog.
+    ///
+    /// Issues a `GET` to the table endpoint with `/credentials` appended.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if the table does not exist or the catalog responds with an
+    /// unexpected status.
+    pub async fn load_table_credentials(
+        &self,
+        table_ident: &TableIdent,
+    ) -> Result<LoadCredentialsResponse> {
+        let context = self.context().await?;
+
+        let endpoint = format!("{}/credentials", context.config.table_endpoint(table_ident));
+
+        let request = context.client.request(Method::GET, endpoint).build()?;
+
+        let http_response = context.client.query_catalog(request).await?;
+
+        match http_response.status() {
+            StatusCode::OK => deserialize_catalog_response(http_response).await,
+            StatusCode::NOT_FOUND => Err(Error::new(
+                ErrorKind::Unexpected,
+                "Tried to load credentials for a table that does not exist",
+            )),
+            _ => Err(deserialize_unexpected_catalog_error(http_response).await),
+        }
+    }
+
+    /// Load a table with vended credentials from the catalog.
+    ///
+    /// This method loads the table and automatically fetches short-lived credentials
+    /// for accessing the table's data files. The credentials are merged into the
+    /// FileIO configuration.
+    pub async fn load_table_with_credentials(&self, table_ident: &TableIdent) -> Result<Table> {
+        self.load_table_internal(table_ident, true).await
+    }
 }
 
 /// All requests and expected responses are derived from the REST catalog API spec:
@@ -754,49 +872,7 @@ impl Catalog for RestCatalog {
     /// server and the config provided when creating this `RestCatalog` instance, then the value
     /// provided locally to the `RestCatalog` will take precedence.
     async fn load_table(&self, table_ident: &TableIdent) -> Result<Table> {
-        let context = self.context().await?;
-
-        let request = context
-            .client
-            .request(Method::GET, context.config.table_endpoint(table_ident))
-            .build()?;
-
-        let http_response = context.client.query_catalog(request).await?;
-
-        let response = match http_response.status() {
-            StatusCode::OK | StatusCode::NOT_MODIFIED => {
-                deserialize_catalog_response::<LoadTableResponse>(http_response).await?
-            }
-            StatusCode::NOT_FOUND => {
-                return Err(Error::new(
-                    ErrorKind::Unexpected,
-                    "Tried to load a table that does not exist",
-                ));
-            }
-            _ => return Err(deserialize_unexpected_catalog_error(http_response).await),
-        };
-
-        let config = response
-            .config
-            .unwrap_or_default()
-            .into_iter()
-            .chain(self.user_config.props.clone())
-            .collect();
-
-        let file_io = self
-            .load_file_io(response.metadata_location.as_deref(), Some(config))
-            .await?;
-
-        let table_builder = Table::builder()
-            .identifier(table_ident.clone())
-            .file_io(file_io)
-            .metadata(response.metadata);
-
-        if let Some(metadata_location) = response.metadata_location {
-            table_builder.metadata_location(metadata_location).build()
-        } else {
-            table_builder.build()
-        }
+        self.load_table_internal(table_ident, false).await
     }
 
     /// Drop a table from the catalog.
@@ -999,6 +1059,7 @@ mod tests {
     use std::sync::Arc;
 
     use chrono::{TimeZone, Utc};
+    use futures::stream::StreamExt;
     use iceberg::spec::{
         FormatVersion, NestedField, NullOrder, Operation, PrimitiveType, Schema, Snapshot,
         SnapshotLog, SortDirection, SortField, SortOrder, Summary, Transform, Type,
@@ -2764,4 +2825,117 @@ mod tests {
             assert_eq!(err.message(), "Catalog uri is required");
         }
     }
+
+    /// Integration test for vended credentials against a live Polaris catalog.
+    ///
+    /// Ignored by default: it needs `POLARIS_USER` and `POLARIS_SECRET` (and optionally
+    /// `POLARIS_URI`) plus the `tpch.sf01.nation` table to exist on the server.
+    #[tokio::test]
+    #[ignore]
+    async fn test_load_table_credentials_integration() {
+        use std::env;
+
+        let client_id =
+            env::var("POLARIS_USER").expect("POLARIS_USER environment variable must be set");
+        let client_secret =
+            env::var("POLARIS_SECRET").expect("POLARIS_SECRET environment variable must be set");
+        let catalog_uri = env::var("POLARIS_URI")
+            .unwrap_or_else(|_| "http://localhost:8181/api/catalog".to_string());
+
+        let mut props = HashMap::new();
+        props.insert("credential".to_string(), format!("{client_id}:{client_secret}"));
+        props.insert("scope".to_string(), "PRINCIPAL_ROLE:ALL".to_string());
+        props.insert(
+            "s3.endpoint".to_string(),
+            "http://localhost:9000".to_string(),
+        );
+
+        let catalog = RestCatalog::new(
+            RestCatalogConfig::builder()
+                .uri(catalog_uri)
+                .warehouse("warehouse".to_string())
+                .props(props)
+                .build(),
+        );
+
+        let table_ident = TableIdent::new(
+            NamespaceIdent::new("tpch.sf01".to_string()),
+            "nation".to_string(),
+        );
+
+        let credentials = catalog
+            .load_table_credentials(&table_ident)
+            .await
+            .expect("Failed to load table credentials");
+        println!("Successfully loaded credentials");
+        println!(
+            "Number of storage credentials: {}",
+            credentials.storage_credentials.len()
+        );
+        assert!(!credentials.storage_credentials.is_empty());
+
+        // Also test loading table with vended credentials
+        println!("\n--- Testing load_table_with_credentials ---");
+        let table = catalog
+            .load_table_with_credentials(&table_ident)
+            .await
+            .expect("Failed to load table with vended credentials");
+        println!("Successfully loaded table with vended credentials");
+        println!("Table identifier: {}", table.identifier());
+        println!("Metadata location: {:?}", table.metadata_location());
+        println!("FileIO configured with vended credentials");
+
+        // Scan the table and count rows
+        println!("\n--- Scanning table ---");
+        let scan = table.scan().build().expect("Failed to build scan");
+        let mut stream = scan
+            .to_arrow()
+            .await
+            .expect("Failed to create arrow stream");
+
+        let mut row_count = 0;
+        while let Some(batch_result) = stream.next().await {
+            let batch = batch_result.expect("Failed to read batch");
+            row_count += batch.num_rows();
+            println!(" Batch: {} rows", batch.num_rows());
+        }
+
+        println!("Total rows scanned: {}", row_count);
+        assert_eq!(row_count, 25, "Expected 25 rows in nation table");
+        println!("✓ Successfully verified 25 rows in table");
+
+        // Test loading table WITHOUT vended credentials and verify scan fails
+        println!("\n--- Testing load_table WITHOUT vended credentials (should fail) ---");
+        let table = catalog
+            .load_table(&table_ident)
+            .await
+            .expect("Failed to load table without vended credentials");
+        println!("Successfully loaded table WITHOUT vended credentials");
+        println!("Table identifier: {}", table.identifier());
+        println!("Metadata location: {:?}", table.metadata_location());
+
+        // Try to scan the table - this should fail
+        println!("\n--- Attempting to scan table without credentials ---");
+        let scan = table.scan().build().expect("Failed to build scan");
+
+        // Creating the arrow stream should fail when accessing the manifest list
+        match scan.to_arrow().await {
+            Ok(_stream) => panic!(
+                "Stream creation succeeded without vended credentials - this should not happen!"
+            ),
+            Err(e) => {
+                println!("✓ Scan failed as expected without vended credentials");
+                println!("Error: {}", e);
+                // Verify it's a permission/authentication error
+                let error_msg = e.to_string();
+                assert!(
+                    error_msg.contains("PermissionDenied")
+                        && error_msg.contains("InvalidAccessKeyId")
+                        && error_msg.contains("403"),
+                    "Expected permission/authentication error, got: {}",
+                    error_msg
+                );
+            }
+        }
+    }
 }
diff --git a/crates/catalog/rest/src/types.rs b/crates/catalog/rest/src/types.rs
index 70ed72051a..2acd18e47e 100644
--- a/crates/catalog/rest/src/types.rs
+++ b/crates/catalog/rest/src/types.rs
@@ -164,6 +164,7 @@ pub(super) struct LoadTableResponse {
     pub(super) metadata_location: Option<String>,
     pub(super) metadata: TableMetadata,
     pub(super) config: Option<HashMap<String, String>>,
+    pub(super) storage_credentials: Option<Vec<StorageCredential>>,
 }
 
 #[derive(Debug, Serialize, Deserialize)]
@@ -199,3 +200,20 @@ pub(super) struct RegisterTableRequest {
     pub(super) metadata_location: String,
     pub(super) overwrite: Option<bool>,
 }
+
+/// A storage credential vended by the catalog for a storage location prefix.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct StorageCredential {
+    /// Storage location prefix for which this credential is relevant.
+    pub prefix: String,
+    /// Configuration properties carrying the credential.
+    pub config: HashMap<String, String>,
+}
+
+/// Response body of the load-credentials endpoint.
+#[derive(Debug, Serialize, Deserialize)]
+#[serde(rename_all = "kebab-case")]
+pub struct LoadCredentialsResponse {
+    /// Credentials returned for the table (serialized as `storage-credentials`).
+    pub storage_credentials: Vec<StorageCredential>,
+}