From 2407aac488dd496fde171b044033431015a9d0f2 Mon Sep 17 00:00:00 2001 From: Vukasin Stefanovic Date: Wed, 24 Dec 2025 12:57:22 +0100 Subject: [PATCH 1/9] added load_table_credentials --- crates/catalog/rest/src/catalog.rs | 30 ++++++++++++++++++++++++++++-- crates/catalog/rest/src/types.rs | 12 ++++++++++++ 2 files changed, 40 insertions(+), 2 deletions(-) diff --git a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs index ce5656cfca..b31e636165 100644 --- a/crates/catalog/rest/src/catalog.rs +++ b/crates/catalog/rest/src/catalog.rs @@ -44,8 +44,8 @@ use crate::client::{ }; use crate::types::{ CatalogConfig, CommitTableRequest, CommitTableResponse, CreateTableRequest, - ListNamespaceResponse, ListTableResponse, LoadTableResponse, NamespaceSerde, - RegisterTableRequest, RenameTableRequest, + ListNamespaceResponse, ListTableResponse, LoadCredentialsResponse, LoadTableResponse, + NamespaceSerde, RegisterTableRequest, RenameTableRequest, }; /// REST catalog URI @@ -461,6 +461,32 @@ impl RestCatalog { pub async fn regenerate_token(&self) -> Result<()> { self.context().await?.client.regenerate_token().await } + + /// Load vended credentials for a table from the catalog. + pub async fn load_table_credentials( + &self, + table_ident: &TableIdent, + ) -> Result { + let context = self.context().await?; + + let endpoint = format!( + "{}/credentials", + context.config.table_endpoint(table_ident) + ); + + let request = context.client.request(Method::GET, endpoint).build()?; + + let http_response = context.client.query_catalog(request).await?; + + match http_response.status() { + StatusCode::OK => deserialize_catalog_response(http_response).await, + StatusCode::NOT_FOUND => Err(Error::new( + ErrorKind::Unexpected, + "Tried to load credentials for a table that does not exist", + )), + _ => Err(deserialize_unexpected_catalog_error(http_response).await), + } + } } /// All requests and expected responses are derived from the REST catalog API spec: diff --git a/crates/catalog/rest/src/types.rs b/crates/catalog/rest/src/types.rs index 70ed72051a..83759ef40e 100644 --- a/crates/catalog/rest/src/types.rs +++ b/crates/catalog/rest/src/types.rs @@ -199,3 +199,15 @@ pub(super) struct RegisterTableRequest { pub(super) metadata_location: String, pub(super) overwrite: Option, } + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct StorageCredential { + pub prefix: String, + pub config: HashMap, +} + +#[derive(Debug, Serialize, Deserialize)] +#[serde(rename_all = "kebab-case")] +pub struct LoadCredentialsResponse { + pub storage_credentials: Vec, +} From c0b84703277e8ac82710c4ffc4d74f26db017b5b Mon Sep 17 00:00:00 2001 From: Vukasin Stefanovic Date: Wed, 24 Dec 2025 14:12:17 +0100 Subject: [PATCH 2/9] Support catalog credentials (both during load and during --- Cargo.lock | 1 + crates/catalog/rest/Cargo.toml | 1 + crates/catalog/rest/src/catalog.rs | 244 ++++++++++++++++++++++++----- crates/catalog/rest/src/types.rs | 1 + 4 files changed, 204 insertions(+), 43 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f281319aa6..301b2b9428 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2395,6 +2395,7 @@ dependencies = [ "async-trait", "chrono", "ctor", + "futures", "http 1.3.1", "iceberg", "iceberg_test_utils", diff --git a/crates/catalog/rest/Cargo.toml b/crates/catalog/rest/Cargo.toml index 916b5ccf75..44e4a22e32 100644 --- a/crates/catalog/rest/Cargo.toml +++ b/crates/catalog/rest/Cargo.toml @@ -45,6 +45,7 @@ uuid = { workspace = true, features = ["v4"] } [dev-dependencies] ctor = { workspace = true } +futures = { workspace = true } iceberg_test_utils = { path = "../../test_utils", features = ["tests"] } mockito = { workspace = true } port_scanner = { workspace = true } diff --git a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs index b31e636165..03ef5f6baf 100644 --- a/crates/catalog/rest/src/catalog.rs +++ b/crates/catalog/rest/src/catalog.rs @@ -462,6 +462,71 @@ impl RestCatalog { self.context().await?.client.regenerate_token().await } + /// The actual logic for loading table, that supports loading vended credentials if requested. + async fn load_table_internal(&self, table_ident: &TableIdent, load_credentials: bool) -> Result { + let context = self.context().await?; + + let mut request_builder = context + .client + .request(Method::GET, context.config.table_endpoint(table_ident)); + + if load_credentials { + request_builder = request_builder.header("X-Iceberg-Access-Delegation", "vended-credentials"); + } + + let request = request_builder.build()?; + + let http_response = context.client.query_catalog(request).await?; + + let response = match http_response.status() { + StatusCode::OK | StatusCode::NOT_MODIFIED => { + deserialize_catalog_response::(http_response).await? + } + StatusCode::NOT_FOUND => { + return Err(Error::new( + ErrorKind::Unexpected, + "Tried to load a table that does not exist", + )); + } + _ => return Err(deserialize_unexpected_catalog_error(http_response).await), + }; + + // Build config with proper precedence, with each next config overriding previous one: + // 1. response.config (server defaults) + // 2. user_config.props (user configuration) + // 3. storage_credentials (vended credentials - highest priority) + let mut config: HashMap = response + .config + .unwrap_or_default() + .into_iter() + .chain(self.user_config.props.clone()) + .collect(); + + // Per the OpenAPI spec: "Clients must first check whether the respective credentials + // exist in the storage-credentials field before checking the config for credentials." + // When vended-credentials header is set, credentials are returned in storage_credentials field. + if let Some(storage_credentials) = response.storage_credentials { + for cred in storage_credentials { + config.extend(cred.config); + } + } + + let file_io = self + .load_file_io(response.metadata_location.as_deref(), Some(config)) + .await?; + + let table_builder = Table::builder() + .identifier(table_ident.clone()) + .file_io(file_io) + .metadata(response.metadata); + + if let Some(metadata_location) = response.metadata_location { + table_builder.metadata_location(metadata_location).build() + } else { + table_builder.build() + } + } + /// Load vended credentials for a table from the catalog. pub async fn load_table_credentials( &self, @@ -487,6 +552,15 @@ impl RestCatalog { _ => Err(deserialize_unexpected_catalog_error(http_response).await), } } + + /// Load a table with vended credentials from the catalog. + /// + /// This method loads the table and automatically fetches short-lived credentials + /// for accessing the table's data files. The credentials are merged into the + /// FileIO configuration. + pub async fn load_table_with_credentials(&self, table_ident: &TableIdent) -> Result
{ + self.load_table_internal(table_ident, true).await + } } /// All requests and expected responses are derived from the REST catalog API spec: @@ -780,49 +854,7 @@ impl Catalog for RestCatalog { /// server and the config provided when creating this `RestCatalog` instance, then the value /// provided locally to the `RestCatalog` will take precedence. async fn load_table(&self, table_ident: &TableIdent) -> Result
{ - let context = self.context().await?; - - let request = context - .client - .request(Method::GET, context.config.table_endpoint(table_ident)) - .build()?; - - let http_response = context.client.query_catalog(request).await?; - - let response = match http_response.status() { - StatusCode::OK | StatusCode::NOT_MODIFIED => { - deserialize_catalog_response::(http_response).await? - } - StatusCode::NOT_FOUND => { - return Err(Error::new( - ErrorKind::Unexpected, - "Tried to load a table that does not exist", - )); - } - _ => return Err(deserialize_unexpected_catalog_error(http_response).await), - }; - - let config = response - .config - .unwrap_or_default() - .into_iter() - .chain(self.user_config.props.clone()) - .collect(); - - let file_io = self - .load_file_io(response.metadata_location.as_deref(), Some(config)) - .await?; - - let table_builder = Table::builder() - .identifier(table_ident.clone()) - .file_io(file_io) - .metadata(response.metadata); - - if let Some(metadata_location) = response.metadata_location { - table_builder.metadata_location(metadata_location).build() - } else { - table_builder.build() - } + self.load_table_internal(table_ident, false).await } /// Drop a table from the catalog. @@ -1020,6 +1052,7 @@ impl Catalog for RestCatalog { #[cfg(test)] mod tests { + use futures::stream::StreamExt; use std::fs::File; use std::io::BufReader; use std::sync::Arc; @@ -2790,4 +2823,129 @@ mod tests { assert_eq!(err.message(), "Catalog uri is required"); } } + + #[tokio::test] + #[ignore] + async fn test_load_table_credentials_integration() { + use std::env; + + let client_id = env::var("POLARIS_USER") + .expect("POLARIS_USER environment variable must be set"); + let client_secret = env::var("POLARIS_SECRET") + .expect("POLARIS_SECRET environment variable must be set"); + let catalog_uri = env::var("POLARIS_URI") + .unwrap_or_else(|_| "http://localhost:8181/api/catalog".to_string()); + + let mut props = HashMap::new(); + props.insert( + "credential".to_string(), + format!("{}:{}", client_id, client_secret), + ); + props.insert("scope".to_string(), "PRINCIPAL_ROLE:ALL".to_string()); + props.insert("s3.endpoint".to_string(), "http://localhost:9000".to_string()); + + let catalog = RestCatalog::new( + RestCatalogConfig::builder() + .uri(catalog_uri) + .warehouse("warehouse".to_string()) + .props(props) + .build(), + ); + + let table_ident = TableIdent::new( + NamespaceIdent::new("tpch.sf01".to_string()), + "nation".to_string(), + ); + + let credentials_result = catalog.load_table_credentials(&table_ident).await; + + match credentials_result { + Ok(credentials) => { + println!("Successfully loaded credentials"); + println!("Number of storage credentials: {}", credentials.storage_credentials.len()); + // println!("Full response: {:#?}", credentials); + assert!(!credentials.storage_credentials.is_empty()); + } + Err(e) => { + panic!("Failed to load table credentials: {:?}", e); + } + } + + // Also test loading table with vended credentials + println!("\n--- Testing load_table_with_credentials ---"); + let table_result = catalog.load_table_with_credentials(&table_ident).await; + + match table_result { + Ok(table) => { + println!("Successfully loaded table with vended credentials"); + println!("Table identifier: {}", table.identifier()); + println!("Metadata location: {:?}", table.metadata_location()); + println!("FileIO configured with vended credentials"); + + // Scan the table and count rows + println!("\n--- Scanning table ---"); + let scan = table.scan().build().expect("Failed to build scan"); + let mut row_count = 0; + + let mut stream = scan.to_arrow().await.expect("Failed to create arrow stream"); + + while let Some(batch_result) = stream.next().await { + match batch_result { + Ok(batch) => { + row_count += batch.num_rows(); + println!(" Batch: {} rows", batch.num_rows()); + } + Err(e) => { + panic!("Failed to read batch: {:?}", e); + } + } + } + + println!("Total rows scanned: {}", row_count); + assert_eq!(row_count, 25, "Expected 25 rows in nation table"); + println!("✓ Successfully verified 25 rows in table"); + } + Err(e) => { + panic!("Failed to load table with vended credentials: {:?}", e); + } + } + + // Test loading table WITHOUT vended credentials and verify scan fails + println!("\n--- Testing load_table WITHOUT vended credentials (should fail) ---"); + let table_result_no_creds = catalog.load_table(&table_ident).await; + + match table_result_no_creds { + Ok(table) => { + println!("Successfully loaded table WITHOUT vended credentials"); + println!("Table identifier: {}", table.identifier()); + println!("Metadata location: {:?}", table.metadata_location()); + + // Try to scan the table - this should fail + println!("\n--- Attempting to scan table without credentials ---"); + let scan = table.scan().build().expect("Failed to build scan"); + + // Try to create arrow stream - this should fail when accessing manifest list + match scan.to_arrow().await { + Ok(_stream) => { + panic!("Stream creation succeeded without vended credentials - this should not happen!"); + } + Err(e) => { + println!("✓ Scan failed as expected without vended credentials"); + println!("Error: {}", e); + // Verify it's a permission/authentication error + let error_msg = e.to_string(); + assert!( + error_msg.contains("PermissionDenied") && + error_msg.contains("InvalidAccessKeyId") && + error_msg.contains("403"), + "Expected permission/authentication error, got: {}", error_msg + ); + } + } + } + Err(e) => { + panic!("Failed to load table without vended credentials: {:?}", e); + } + } + } } diff --git a/crates/catalog/rest/src/types.rs b/crates/catalog/rest/src/types.rs index 83759ef40e..2acd18e47e 100644 --- a/crates/catalog/rest/src/types.rs +++ b/crates/catalog/rest/src/types.rs @@ -164,6 +164,7 @@ pub(super) struct LoadTableResponse { pub(super) metadata_location: Option, pub(super) metadata: TableMetadata, pub(super) config: Option>, + pub(super) storage_credentials: Option>, } #[derive(Debug, Serialize, Deserialize)] From 7dca3e098c7705c55c13e7ddadb7e63c39d652ce Mon Sep 17 00:00:00 2001 From: Vukasin Stefanovic Date: Wed, 24 Dec 2025 16:08:43 +0100 Subject: [PATCH 3/9] Draft azdls.sas-token. prefix support --- crates/catalog/rest/Cargo.toml | 3 + crates/catalog/rest/src/catalog.rs | 89 +++++++++--------- crates/iceberg/src/io/storage_azdls.rs | 124 +++++++++++++++++++++---- 3 files changed, 151 insertions(+), 65 deletions(-) diff --git a/crates/catalog/rest/Cargo.toml b/crates/catalog/rest/Cargo.toml index 44e4a22e32..81003f2851 100644 --- a/crates/catalog/rest/Cargo.toml +++ b/crates/catalog/rest/Cargo.toml @@ -28,6 +28,9 @@ keywords = ["iceberg", "rest", "catalog"] license = { workspace = true } repository = { workspace = true } +[features] +storage-azdls = ["iceberg/storage-azdls"] + [dependencies] async-trait = { workspace = true } chrono = { workspace = true } diff --git a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs index 03ef5f6baf..ae13d11235 100644 --- a/crates/catalog/rest/src/catalog.rs +++ b/crates/catalog/rest/src/catalog.rs @@ -2834,7 +2834,7 @@ mod tests { let client_secret = env::var("POLARIS_SECRET") .expect("POLARIS_SECRET environment variable must be set"); let catalog_uri = env::var("POLARIS_URI") - .unwrap_or_else(|_| "http://localhost:8181/api/catalog".to_string()); + .unwrap_or_else(|_| "https://apb52872.snowflakecomputing.com/polaris/api/catalog".to_string()); let mut props = HashMap::new(); props.insert( @@ -2842,19 +2842,19 @@ mod tests { format!("{}:{}", client_id, client_secret), ); props.insert("scope".to_string(), "PRINCIPAL_ROLE:ALL".to_string()); - props.insert("s3.endpoint".to_string(), "http://localhost:9000".to_string()); + // props.insert("s3.endpoint".to_string(), "http://localhost:9000".to_string()); let catalog = RestCatalog::new( RestCatalogConfig::builder() .uri(catalog_uri) - .warehouse("warehouse".to_string()) + .warehouse("azure_cat".to_string()) .props(props) .build(), ); let table_ident = TableIdent::new( - NamespaceIdent::new("tpch.sf01".to_string()), - "nation".to_string(), + NamespaceIdent::new("ns1".to_string()), + "T2".to_string(), ); let credentials_result = catalog.load_table_credentials(&table_ident).await; @@ -2863,7 +2863,7 @@ mod tests { Ok(credentials) => { println!("Successfully loaded credentials"); println!("Number of storage credentials: {}", credentials.storage_credentials.len()); - // println!("Full response: {:#?}", credentials); + println!("Full response: {:#?}", credentials); assert!(!credentials.storage_credentials.is_empty()); } Err(e) => { @@ -2902,50 +2902,49 @@ mod tests { } println!("Total rows scanned: {}", row_count); - assert_eq!(row_count, 25, "Expected 25 rows in nation table"); - println!("✓ Successfully verified 25 rows in table"); + assert_eq!(row_count, 1, "Expected 1 rows in the table"); } Err(e) => { panic!("Failed to load table with vended credentials: {:?}", e); } } - // Test loading table WITHOUT vended credentials and verify scan fails - println!("\n--- Testing load_table WITHOUT vended credentials (should fail) ---"); - let table_result_no_creds = catalog.load_table(&table_ident).await; - - match table_result_no_creds { - Ok(table) => { - println!("Successfully loaded table WITHOUT vended credentials"); - println!("Table identifier: {}", table.identifier()); - println!("Metadata location: {:?}", table.metadata_location()); - - // Try to scan the table - this should fail - println!("\n--- Attempting to scan table without credentials ---"); - let scan = table.scan().build().expect("Failed to build scan"); - - // Try to create arrow stream - this should fail when accessing manifest list - match scan.to_arrow().await { - Ok(_stream) => { - panic!("Stream creation succeeded without vended credentials - this should not happen!"); - } - Err(e) => { - println!("✓ Scan failed as expected without vended credentials"); - println!("Error: {}", e); - // Verify it's a permission/authentication error - let error_msg = e.to_string(); - assert!( - error_msg.contains("PermissionDenied") && - error_msg.contains("InvalidAccessKeyId") && - error_msg.contains("403"), - "Expected permission/authentication error, got: {}", error_msg - ); - } - } - } - Err(e) => { - panic!("Failed to load table without vended credentials: {:?}", e); - } - } + // // Test loading table WITHOUT vended credentials and verify scan fails + // println!("\n--- Testing load_table WITHOUT vended credentials (should fail) ---"); + // let table_result_no_creds = catalog.load_table(&table_ident).await; + + // match table_result_no_creds { + // Ok(table) => { + // println!("Successfully loaded table WITHOUT vended credentials"); + // println!("Table identifier: {}", table.identifier()); + // println!("Metadata location: {:?}", table.metadata_location()); + + // // Try to scan the table - this should fail + // println!("\n--- Attempting to scan table without credentials ---"); + // let scan = table.scan().build().expect("Failed to build scan"); + + // // Try to create arrow stream - this should fail when accessing manifest list + // match scan.to_arrow().await { + // Ok(_stream) => { + // panic!("Stream creation succeeded without vended credentials - this should not happen!"); + // } + // Err(e) => { + // println!("✓ Scan failed as expected without vended credentials"); + // println!("Error: {}", e); + // // Verify it's a permission/authentication error + // let error_msg = e.to_string(); + // assert!( + // error_msg.contains("PermissionDenied") && + // error_msg.contains("InvalidAccessKeyId") && + // error_msg.contains("403"), + // "Expected permission/authentication error, got: {}", error_msg + // ); + // } + // } + // } + // Err(e) => { + // panic!("Failed to load table without vended credentials: {:?}", e); + // } + // } } } diff --git a/crates/iceberg/src/io/storage_azdls.rs b/crates/iceberg/src/io/storage_azdls.rs index b61ecd9af8..c78a389724 100644 --- a/crates/iceberg/src/io/storage_azdls.rs +++ b/crates/iceberg/src/io/storage_azdls.rs @@ -80,7 +80,7 @@ pub(crate) fn azdls_config_parse(mut properties: HashMap) -> Res config.account_key = Some(account_key); } - if let Some(sas_token) = properties.remove(ADLS_SAS_TOKEN) { + if let Some(sas_token) = find_sas_token(&properties, config.account_name.as_deref()) { config.sas_token = Some(sas_token); } @@ -103,6 +103,37 @@ pub(crate) fn azdls_config_parse(mut properties: HashMap) -> Res Ok(config) } +/// Finds the appropriate SAS token from properties based on account name. +/// +/// Strategy: +/// 1. If account name is known, search for keys matching `adls.sas-token.` prefix +/// 2. If not found, fall back to searching for keys matching `adls.sas-token` prefix +/// 3. Return the shortest matching key (least specific) +/// 4. Trim leading '?' from the token if present +fn find_sas_token(properties: &HashMap, account_name: Option<&str>) -> Option { + // Helper function to search for token with a given prefix + let find_with_prefix = |prefix: &str| { + properties + .iter() + .filter(|(key, _)| { + key.as_str() == prefix || key.starts_with(&format!("{}.", prefix)) + }) + .min_by_key(|(key, _)| key.len()) + .map(|(_, value)| value.strip_prefix('?').unwrap_or(value).to_string()) + }; + + // Try account-specific prefix first if account name is known, then fall back to base + if let Some(account) = account_name { + let account_prefix = format!("{}.{}", ADLS_SAS_TOKEN, account); + if let Some(token) = find_with_prefix(&account_prefix) { + return Some(token); + } + } + + // Fall back to base prefix (adls.sas-token) + find_with_prefix(ADLS_SAS_TOKEN) +} + /// Builds an OpenDAL operator from the AzdlsConfig and path. /// /// The path is expected to include the scheme in a format like: @@ -327,28 +358,29 @@ fn parse_azure_storage_endpoint(url: &Url) -> Result<(&str, &str, &str)> { } fn validate_storage_and_scheme( - storage_service: &str, + _storage_service: &str, scheme_str: &str, ) -> Result { let scheme = scheme_str.parse::()?; - match scheme { - AzureStorageScheme::Abfss | AzureStorageScheme::Abfs => { - ensure_data_valid!( - storage_service == "dfs", - "AzureStoragePath: Unexpected storage service for abfs[s]: {}", - storage_service - ); - Ok(scheme) - } - AzureStorageScheme::Wasbs | AzureStorageScheme::Wasb => { - ensure_data_valid!( - storage_service == "blob", - "AzureStoragePath: Unexpected storage service for wasb[s]: {}", - storage_service - ); - Ok(scheme) - } - } + Ok(scheme) + // match scheme { + // AzureStorageScheme::Abfss | AzureStorageScheme::Abfs => { + // ensure_data_valid!( + // storage_service == "dfs", + // "AzureStoragePath: Unexpected storage service for abfs[s]: {}", + // storage_service + // ); + // Ok(scheme) + // } + // AzureStorageScheme::Wasbs | AzureStorageScheme::Wasb => { + // ensure_data_valid!( + // storage_service == "blob", + // "AzureStoragePath: Unexpected storage service for wasb[s]: {}", + // storage_service + // ); + // Ok(scheme) + // } + // } } #[cfg(test)] @@ -423,6 +455,58 @@ mod tests { ..Default::default() }), ), + ( + "account-specific SAS token with full domain", + HashMap::from([ + (super::ADLS_ACCOUNT_NAME.to_string(), "vukasineusstorage1".to_string()), + ( + "adls.sas-token.vukasineusstorage1.blob.core.windows.net".to_string(), + "token-full".to_string() + ), + ( + "adls.sas-token.vukasineusstorage1".to_string(), + "token-account".to_string() + ), + ]), + Some(AzdlsConfig { + account_name: Some("vukasineusstorage1".to_string()), + sas_token: Some("token-account".to_string()), // Should pick the shorter one + ..Default::default() + }), + ), + ( + "account-specific SAS token with only full domain", + HashMap::from([ + (super::ADLS_ACCOUNT_NAME.to_string(), "myaccount".to_string()), + ( + "adls.sas-token.myaccount.blob.core.windows.net".to_string(), + "token-specific".to_string() + ), + ]), + Some(AzdlsConfig { + account_name: Some("myaccount".to_string()), + sas_token: Some("token-specific".to_string()), + ..Default::default() + }), + ), + ( + "SAS token without account name picks shortest", + HashMap::from([ + (super::ADLS_SAS_TOKEN.to_string(), "token-generic".to_string()), + ( + "adls.sas-token.someaccount".to_string(), + "token-account".to_string() + ), + ( + "adls.sas-token.someaccount.blob.core.windows.net".to_string(), + "token-specific".to_string() + ), + ]), + Some(AzdlsConfig { + sas_token: Some("token-generic".to_string()), // Should pick the shortest one + ..Default::default() + }), + ), ]; for (name, properties, expected) in test_cases { From a71d13659eeb747a4b3fee052ea8741a59aa375d Mon Sep 17 00:00:00 2001 From: Vukasin Stefanovic Date: Wed, 24 Dec 2025 16:20:48 +0100 Subject: [PATCH 4/9] cargo fmt --- crates/catalog/rest/src/catalog.rs | 52 +++++++++++++++++++----------- 1 file changed, 33 insertions(+), 19 deletions(-) diff --git a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs index 03ef5f6baf..c784f32039 100644 --- a/crates/catalog/rest/src/catalog.rs +++ b/crates/catalog/rest/src/catalog.rs @@ -463,7 +463,11 @@ impl RestCatalog { } /// The actual logic for loading table, that supports loading vended credentials if requested. - async fn load_table_internal(&self, table_ident: &TableIdent, load_credentials: bool) -> Result
{ + async fn load_table_internal( + &self, + table_ident: &TableIdent, + load_credentials: bool, + ) -> Result
{ let context = self.context().await?; let mut request_builder = context @@ -471,7 +475,8 @@ impl RestCatalog { .request(Method::GET, context.config.table_endpoint(table_ident)); if load_credentials { - request_builder = request_builder.header("X-Iceberg-Access-Delegation", "vended-credentials"); + request_builder = + request_builder.header("X-Iceberg-Access-Delegation", "vended-credentials"); } let request = request_builder.build()?; @@ -534,10 +539,7 @@ impl RestCatalog { ) -> Result { let context = self.context().await?; - let endpoint = format!( - "{}/credentials", - context.config.table_endpoint(table_ident) - ); + let endpoint = format!("{}/credentials", context.config.table_endpoint(table_ident)); let request = context.client.request(Method::GET, endpoint).build()?; @@ -1052,12 +1054,12 @@ impl Catalog for RestCatalog { #[cfg(test)] mod tests { - use futures::stream::StreamExt; use std::fs::File; use std::io::BufReader; use std::sync::Arc; use chrono::{TimeZone, Utc}; + use futures::stream::StreamExt; use iceberg::spec::{ FormatVersion, NestedField, NullOrder, Operation, PrimitiveType, Schema, Snapshot, SnapshotLog, SortDirection, SortField, SortOrder, Summary, Transform, Type, @@ -2829,10 +2831,10 @@ mod tests { async fn test_load_table_credentials_integration() { use std::env; - let client_id = env::var("POLARIS_USER") - .expect("POLARIS_USER environment variable must be set"); - let client_secret = env::var("POLARIS_SECRET") - .expect("POLARIS_SECRET environment variable must be set"); + let client_id = + env::var("POLARIS_USER").expect("POLARIS_USER environment variable must be set"); + let client_secret = + env::var("POLARIS_SECRET").expect("POLARIS_SECRET environment variable must be set"); let catalog_uri = env::var("POLARIS_URI") .unwrap_or_else(|_| "http://localhost:8181/api/catalog".to_string()); @@ -2842,7 +2844,10 @@ mod tests { format!("{}:{}", client_id, client_secret), ); props.insert("scope".to_string(), "PRINCIPAL_ROLE:ALL".to_string()); - props.insert("s3.endpoint".to_string(), "http://localhost:9000".to_string()); + props.insert( + "s3.endpoint".to_string(), + "http://localhost:9000".to_string(), + ); let catalog = RestCatalog::new( RestCatalogConfig::builder() @@ -2862,7 +2867,10 @@ mod tests { match credentials_result { Ok(credentials) => { println!("Successfully loaded credentials"); - println!("Number of storage credentials: {}", credentials.storage_credentials.len()); + println!( + "Number of storage credentials: {}", + credentials.storage_credentials.len() + ); // println!("Full response: {:#?}", credentials); assert!(!credentials.storage_credentials.is_empty()); } @@ -2887,7 +2895,10 @@ mod tests { let scan = table.scan().build().expect("Failed to build scan"); let mut row_count = 0; - let mut stream = scan.to_arrow().await.expect("Failed to create arrow stream"); + let mut stream = scan + .to_arrow() + .await + .expect("Failed to create arrow stream"); while let Some(batch_result) = stream.next().await { match batch_result { @@ -2927,7 +2938,9 @@ mod tests { // Try to create arrow stream - this should fail when accessing manifest list match scan.to_arrow().await { Ok(_stream) => { - panic!("Stream creation succeeded without vended credentials - this should not happen!"); + panic!( + "Stream creation succeeded without vended credentials - this should not happen!" + ); } Err(e) => { println!("✓ Scan failed as expected without vended credentials"); @@ -2935,10 +2948,11 @@ mod tests { // Verify it's a permission/authentication error let error_msg = e.to_string(); assert!( - error_msg.contains("PermissionDenied") && - error_msg.contains("InvalidAccessKeyId") && - error_msg.contains("403"), - "Expected permission/authentication error, got: {}", error_msg + error_msg.contains("PermissionDenied") + && error_msg.contains("InvalidAccessKeyId") + && error_msg.contains("403"), + "Expected permission/authentication error, got: {}", + error_msg ); } } From 41147e8f945a01f76bdc6dcf9128884a0e47de10 Mon Sep 17 00:00:00 2001 From: Vukasin Stefanovic Date: Wed, 24 Dec 2025 16:21:12 +0100 Subject: [PATCH 5/9] cargo fmt --- crates/catalog/rest/src/catalog.rs | 44 +++++++++++++++----------- crates/iceberg/src/io/storage_azdls.rs | 34 +++++++++++++------- 2 files changed, 47 insertions(+), 31 deletions(-) diff --git a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs index ae13d11235..d92b4e22d1 100644 --- a/crates/catalog/rest/src/catalog.rs +++ b/crates/catalog/rest/src/catalog.rs @@ -463,7 +463,11 @@ impl RestCatalog { } /// The actual logic for loading table, that supports loading vended credentials if requested. - async fn load_table_internal(&self, table_ident: &TableIdent, load_credentials: bool) -> Result
{ + async fn load_table_internal( + &self, + table_ident: &TableIdent, + load_credentials: bool, + ) -> Result
{ let context = self.context().await?; let mut request_builder = context @@ -471,7 +475,8 @@ impl RestCatalog { .request(Method::GET, context.config.table_endpoint(table_ident)); if load_credentials { - request_builder = request_builder.header("X-Iceberg-Access-Delegation", "vended-credentials"); + request_builder = + request_builder.header("X-Iceberg-Access-Delegation", "vended-credentials"); } let request = request_builder.build()?; @@ -534,10 +539,7 @@ impl RestCatalog { ) -> Result { let context = self.context().await?; - let endpoint = format!( - "{}/credentials", - context.config.table_endpoint(table_ident) - ); + let endpoint = format!("{}/credentials", context.config.table_endpoint(table_ident)); let request = context.client.request(Method::GET, endpoint).build()?; @@ -1052,12 +1054,12 @@ impl Catalog for RestCatalog { #[cfg(test)] mod tests { - use futures::stream::StreamExt; use std::fs::File; use std::io::BufReader; use std::sync::Arc; use chrono::{TimeZone, Utc}; + use futures::stream::StreamExt; use iceberg::spec::{ FormatVersion, NestedField, NullOrder, Operation, PrimitiveType, Schema, Snapshot, SnapshotLog, SortDirection, SortField, SortOrder, Summary, Transform, Type, @@ -2829,12 +2831,13 @@ mod tests { async fn test_load_table_credentials_integration() { use std::env; - let client_id = env::var("POLARIS_USER") - .expect("POLARIS_USER environment variable must be set"); - let client_secret = env::var("POLARIS_SECRET") - .expect("POLARIS_SECRET environment variable must be set"); - let catalog_uri = env::var("POLARIS_URI") - .unwrap_or_else(|_| "https://apb52872.snowflakecomputing.com/polaris/api/catalog".to_string()); + let client_id = + env::var("POLARIS_USER").expect("POLARIS_USER environment variable must be set"); + let client_secret = + env::var("POLARIS_SECRET").expect("POLARIS_SECRET environment variable must be set"); + let catalog_uri = env::var("POLARIS_URI").unwrap_or_else(|_| { + "https://apb52872.snowflakecomputing.com/polaris/api/catalog".to_string() + }); let mut props = HashMap::new(); props.insert( @@ -2852,17 +2855,17 @@ mod tests { .build(), ); - let table_ident = TableIdent::new( - NamespaceIdent::new("ns1".to_string()), - "T2".to_string(), - ); + let table_ident = TableIdent::new(NamespaceIdent::new("ns1".to_string()), "T2".to_string()); let credentials_result = catalog.load_table_credentials(&table_ident).await; match credentials_result { Ok(credentials) => { println!("Successfully loaded credentials"); - println!("Number of storage credentials: {}", credentials.storage_credentials.len()); + println!( + "Number of storage credentials: {}", + credentials.storage_credentials.len() + ); println!("Full response: {:#?}", credentials); assert!(!credentials.storage_credentials.is_empty()); } @@ -2887,7 +2890,10 @@ mod tests { let scan = table.scan().build().expect("Failed to build scan"); let mut row_count = 0; - let mut stream = scan.to_arrow().await.expect("Failed to create arrow stream"); + let mut stream = scan + .to_arrow() + .await + .expect("Failed to create arrow stream"); while let Some(batch_result) = stream.next().await { match batch_result { diff --git a/crates/iceberg/src/io/storage_azdls.rs b/crates/iceberg/src/io/storage_azdls.rs index c78a389724..2d0ad02e0a 100644 --- a/crates/iceberg/src/io/storage_azdls.rs +++ b/crates/iceberg/src/io/storage_azdls.rs @@ -110,14 +110,15 @@ pub(crate) fn azdls_config_parse(mut properties: HashMap) -> Res /// 2. If not found, fall back to searching for keys matching `adls.sas-token` prefix /// 3. Return the shortest matching key (least specific) /// 4. Trim leading '?' from the token if present -fn find_sas_token(properties: &HashMap, account_name: Option<&str>) -> Option { +fn find_sas_token( + properties: &HashMap, + account_name: Option<&str>, +) -> Option { // Helper function to search for token with a given prefix let find_with_prefix = |prefix: &str| { properties .iter() - .filter(|(key, _)| { - key.as_str() == prefix || key.starts_with(&format!("{}.", prefix)) - }) + .filter(|(key, _)| key.as_str() == prefix || key.starts_with(&format!("{}.", prefix))) .min_by_key(|(key, _)| key.len()) .map(|(_, value)| value.strip_prefix('?').unwrap_or(value).to_string()) }; @@ -458,14 +459,17 @@ mod tests { ( "account-specific SAS token with full domain", HashMap::from([ - (super::ADLS_ACCOUNT_NAME.to_string(), "vukasineusstorage1".to_string()), + ( + super::ADLS_ACCOUNT_NAME.to_string(), + "vukasineusstorage1".to_string(), + ), ( "adls.sas-token.vukasineusstorage1.blob.core.windows.net".to_string(), - "token-full".to_string() + "token-full".to_string(), ), ( "adls.sas-token.vukasineusstorage1".to_string(), - "token-account".to_string() + "token-account".to_string(), ), ]), Some(AzdlsConfig { @@ -477,10 +481,13 @@ mod tests { ( "account-specific SAS token with only full domain", HashMap::from([ - (super::ADLS_ACCOUNT_NAME.to_string(), "myaccount".to_string()), + ( + super::ADLS_ACCOUNT_NAME.to_string(), + "myaccount".to_string(), + ), ( "adls.sas-token.myaccount.blob.core.windows.net".to_string(), - "token-specific".to_string() + "token-specific".to_string(), ), ]), Some(AzdlsConfig { @@ -492,14 +499,17 @@ mod tests { ( "SAS token without account name picks shortest", HashMap::from([ - (super::ADLS_SAS_TOKEN.to_string(), "token-generic".to_string()), + ( + super::ADLS_SAS_TOKEN.to_string(), + "token-generic".to_string(), + ), ( "adls.sas-token.someaccount".to_string(), - "token-account".to_string() + "token-account".to_string(), ), ( "adls.sas-token.someaccount.blob.core.windows.net".to_string(), - "token-specific".to_string() + "token-specific".to_string(), ), ]), Some(AzdlsConfig { From bf8cd40723523d7bf3d3f2d6b57acf5d5732ea3f Mon Sep 17 00:00:00 2001 From: Vukasin Stefanovic Date: Fri, 26 Dec 2025 09:42:54 +0100 Subject: [PATCH 6/9] fix test - we have to validate scheme, to ensure at least there's one of these two. But we allow both. --- crates/iceberg/src/io/storage_azdls.rs | 39 +++++++++++++------------- 1 file changed, 19 insertions(+), 20 deletions(-) diff --git a/crates/iceberg/src/io/storage_azdls.rs b/crates/iceberg/src/io/storage_azdls.rs index 2d0ad02e0a..cadaf13f46 100644 --- a/crates/iceberg/src/io/storage_azdls.rs +++ b/crates/iceberg/src/io/storage_azdls.rs @@ -359,29 +359,28 @@ fn parse_azure_storage_endpoint(url: &Url) -> Result<(&str, &str, &str)> { } fn validate_storage_and_scheme( - _storage_service: &str, + storage_service: &str, scheme_str: &str, ) -> Result { let scheme = scheme_str.parse::()?; - Ok(scheme) - // match scheme { - // AzureStorageScheme::Abfss | AzureStorageScheme::Abfs => { - // ensure_data_valid!( - // storage_service == "dfs", - // "AzureStoragePath: Unexpected storage service for abfs[s]: {}", - // storage_service - // ); - // Ok(scheme) - // } - // AzureStorageScheme::Wasbs | AzureStorageScheme::Wasb => { - // ensure_data_valid!( - // storage_service == "blob", - // "AzureStoragePath: Unexpected storage service for wasb[s]: {}", - // storage_service - // ); - // Ok(scheme) - // } - // } + match scheme { + AzureStorageScheme::Abfss | AzureStorageScheme::Abfs => { + ensure_data_valid!( + storage_service == "dfs" || storage_service == "blob", + "AzureStoragePath: Unexpected storage service for abfs[s]: {}", + storage_service + ); + Ok(scheme) + } + AzureStorageScheme::Wasbs | AzureStorageScheme::Wasb => { + ensure_data_valid!( + storage_service == "blob" || storage_service == "dfs", + "AzureStoragePath: Unexpected storage service for wasb[s]: {}", + storage_service + ); + Ok(scheme) + } + } } #[cfg(test)] From ae2a13dcf3019792e2d2741623e6550404d2b48d Mon Sep 17 00:00:00 2001 From: Vukasin Stefanovic Date: Fri, 26 Dec 2025 09:43:50 +0100 Subject: [PATCH 7/9] . --- crates/iceberg/src/io/storage_azdls.rs | 24 ++++++------------------ 1 file changed, 6 insertions(+), 18 deletions(-) diff --git a/crates/iceberg/src/io/storage_azdls.rs b/crates/iceberg/src/io/storage_azdls.rs index cadaf13f46..4781577773 100644 --- a/crates/iceberg/src/io/storage_azdls.rs +++ b/crates/iceberg/src/io/storage_azdls.rs @@ -363,24 +363,12 @@ fn validate_storage_and_scheme( scheme_str: &str, ) -> Result { let scheme = scheme_str.parse::()?; - match scheme { - AzureStorageScheme::Abfss | AzureStorageScheme::Abfs => { - ensure_data_valid!( - storage_service == "dfs" || storage_service == "blob", - "AzureStoragePath: Unexpected storage service for abfs[s]: {}", - storage_service - ); - Ok(scheme) - } - AzureStorageScheme::Wasbs | AzureStorageScheme::Wasb => { - ensure_data_valid!( - storage_service == "blob" || storage_service == "dfs", - "AzureStoragePath: Unexpected storage service for wasb[s]: {}", - storage_service - ); - Ok(scheme) - } - } + ensure_data_valid!( + storage_service == "dfs" || storage_service == "blob", + "AzureStoragePath: Unexpected storage service for abfs[s]: {}", + storage_service + ); + Ok(scheme) } #[cfg(test)] From eaa6d84f9ae98a1456ac342fcc8c832a1ae2209a Mon Sep 17 00:00:00 2001 From: Vukasin Stefanovic Date: Fri, 26 Dec 2025 11:49:11 +0100 Subject: [PATCH 8/9] . --- crates/catalog/rest/Cargo.toml | 3 --- 1 file changed, 3 deletions(-) diff --git a/crates/catalog/rest/Cargo.toml b/crates/catalog/rest/Cargo.toml index 81003f2851..44e4a22e32 100644 --- a/crates/catalog/rest/Cargo.toml +++ b/crates/catalog/rest/Cargo.toml @@ -28,9 +28,6 @@ keywords = ["iceberg", "rest", "catalog"] license = { workspace = true } repository = { workspace = true } -[features] -storage-azdls = ["iceberg/storage-azdls"] - [dependencies] async-trait = { workspace = true } chrono = { workspace = true } From f92f7564a0ea4278a9159970c85fc8200ad70016 Mon Sep 17 00:00:00 2001 From: Vukasin Stefanovic Date: Fri, 26 Dec 2025 11:53:30 +0100 Subject: [PATCH 9/9] comment --- crates/iceberg/src/io/storage_azdls.rs | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/crates/iceberg/src/io/storage_azdls.rs b/crates/iceberg/src/io/storage_azdls.rs index 4781577773..7294896551 100644 --- a/crates/iceberg/src/io/storage_azdls.rs +++ b/crates/iceberg/src/io/storage_azdls.rs @@ -363,6 +363,13 @@ fn validate_storage_and_scheme( scheme_str: &str, ) -> Result { let scheme = scheme_str.parse::()?; + // Azure actually is oblivious to what we use for the scheme here. + // It actually supports both dfs and blob endpoints for all storage kinds. + // We should route those to different OpenDAL operators, but given that we don't + // do that today but map both schemes/endpoints to the same ADLS OpenDAL operator + // (which uses dfs endpoint), we might as well accept wasb URL for dfs endpoint, + // and abfs URL for blob endpoint. Especially since some implementations (e.g. Snowflake) + // always use abfs in URL, regardless of the endpoint. ensure_data_valid!( storage_service == "dfs" || storage_service == "blob", "AzureStoragePath: Unexpected storage service for abfs[s]: {}", @@ -448,19 +455,19 @@ mod tests { HashMap::from([ ( super::ADLS_ACCOUNT_NAME.to_string(), - "vukasineusstorage1".to_string(), + "azteststorage".to_string(), ), ( - "adls.sas-token.vukasineusstorage1.blob.core.windows.net".to_string(), + "adls.sas-token.azteststorage.blob.core.windows.net".to_string(), "token-full".to_string(), ), ( - "adls.sas-token.vukasineusstorage1".to_string(), + "adls.sas-token.azteststorage".to_string(), "token-account".to_string(), ), ]), Some(AzdlsConfig { - account_name: Some("vukasineusstorage1".to_string()), + account_name: Some("azteststorage".to_string()), sas_token: Some("token-account".to_string()), // Should pick the shorter one ..Default::default() }),