Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/catalog/rest/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ uuid = { workspace = true, features = ["v4"] }

[dev-dependencies]
ctor = { workspace = true }
futures = { workspace = true }
iceberg_test_utils = { path = "../../test_utils", features = ["tests"] }
mockito = { workspace = true }
port_scanner = { workspace = true }
Expand Down
288 changes: 243 additions & 45 deletions crates/catalog/rest/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ use crate::client::{
};
use crate::types::{
CatalogConfig, CommitTableRequest, CommitTableResponse, CreateTableRequest,
ListNamespaceResponse, ListTableResponse, LoadTableResponse, NamespaceSerde,
RegisterTableRequest, RenameTableRequest,
ListNamespaceResponse, ListTableResponse, LoadCredentialsResponse, LoadTableResponse,
NamespaceSerde, RegisterTableRequest, RenameTableRequest,
};

/// REST catalog URI
Expand Down Expand Up @@ -461,6 +461,108 @@ impl RestCatalog {
pub async fn regenerate_token(&self) -> Result<()> {
self.context().await?.client.regenerate_token().await
}

/// Shared implementation behind the table-loading entry points.
///
/// Sends a `GET` to the table endpoint and turns the response into a [`Table`].
/// When `load_credentials` is `true`, the request also carries the
/// `X-Iceberg-Access-Delegation: vended-credentials` header.
///
/// # Errors
///
/// Returns an `ErrorKind::Unexpected` error when the catalog answers
/// `404 Not Found`. Any status other than `200 OK` / `304 Not Modified` is
/// converted with `deserialize_unexpected_catalog_error`. Failures while
/// obtaining the context, building or sending the request, deserializing the
/// body, or creating the FileIO are propagated.
async fn load_table_internal(
    &self,
    table_ident: &TableIdent,
    load_credentials: bool,
) -> Result<Table> {
    let context = self.context().await?;

    let base_request = context
        .client
        .request(Method::GET, context.config.table_endpoint(table_ident));
    let delegated_request = if load_credentials {
        base_request.header("X-Iceberg-Access-Delegation", "vended-credentials")
    } else {
        base_request
    };
    let request = delegated_request.build()?;

    let http_response = context.client.query_catalog(request).await?;

    // Reject every status we cannot deserialize as a table before touching the body.
    let status = http_response.status();
    if status == StatusCode::NOT_FOUND {
        return Err(Error::new(
            ErrorKind::Unexpected,
            "Tried to load a table that does not exist",
        ));
    }
    if status != StatusCode::OK && status != StatusCode::NOT_MODIFIED {
        return Err(deserialize_unexpected_catalog_error(http_response).await);
    }
    let response = deserialize_catalog_response::<LoadTableResponse>(http_response).await?;

    // Layer the FileIO properties; each later layer overrides keys from the earlier ones:
    //   1. `response.config`          (server defaults)
    //   2. `user_config.props`        (user configuration)
    //   3. `storage_credentials`      (vended credentials, highest priority)
    let mut config: HashMap<String, String> = response.config.unwrap_or_default();
    config.extend(self.user_config.props.clone());

    // Per the OpenAPI spec: "Clients must first check whether the respective credentials
    // exist in the storage-credentials field before checking the config for credentials."
    // With the vended-credentials header set, credentials arrive in `storage_credentials`.
    for cred in response.storage_credentials.into_iter().flatten() {
        config.extend(cred.config);
    }

    let file_io = self
        .load_file_io(response.metadata_location.as_deref(), Some(config))
        .await?;

    let table_builder = Table::builder()
        .identifier(table_ident.clone())
        .file_io(file_io)
        .metadata(response.metadata);

    // The metadata location is optional in the response; only set it when present.
    match response.metadata_location {
        Some(metadata_location) => table_builder.metadata_location(metadata_location).build(),
        None => table_builder.build(),
    }
}

/// Load vended credentials for a table from the catalog.
///
/// Sends a `GET` to the table endpoint with `/credentials` appended and
/// deserializes a `200 OK` body into a [`LoadCredentialsResponse`].
///
/// # Errors
///
/// Returns an `ErrorKind::Unexpected` error when the catalog answers
/// `404 Not Found`. Any other non-`200` status is converted with
/// `deserialize_unexpected_catalog_error`. Failures while obtaining the
/// context, building the request, or querying the catalog are propagated.
pub async fn load_table_credentials(
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure if this should be part of the Catalog trait...

&self,
table_ident: &TableIdent,
) -> Result<LoadCredentialsResponse> {
let context = self.context().await?;

let endpoint = format!("{}/credentials", context.config.table_endpoint(table_ident));

let request = context.client.request(Method::GET, endpoint).build()?;

let http_response = context.client.query_catalog(request).await?;

match http_response.status() {
StatusCode::OK => deserialize_catalog_response(http_response).await,
StatusCode::NOT_FOUND => Err(Error::new(
ErrorKind::Unexpected,
"Tried to load credentials for a table that does not exist",
)),
_ => Err(deserialize_unexpected_catalog_error(http_response).await),
}
}

/// Load a table and ask the catalog to vend storage credentials for it.
///
/// Delegates to `load_table_internal` with credential loading enabled. That
/// path adds the `X-Iceberg-Access-Delegation: vended-credentials` header to
/// the load-table request and merges any storage credentials returned by the
/// catalog into the FileIO configuration of the resulting [`Table`].
pub async fn load_table_with_credentials(&self, table_ident: &TableIdent) -> Result<Table> {
    let load_credentials = true;
    self.load_table_internal(table_ident, load_credentials).await
}
}

/// All requests and expected responses are derived from the REST catalog API spec:
Expand Down Expand Up @@ -754,49 +856,7 @@ impl Catalog for RestCatalog {
/// server and the config provided when creating this `RestCatalog` instance, then the value
/// provided locally to the `RestCatalog` will take precedence.
async fn load_table(&self, table_ident: &TableIdent) -> Result<Table> {
// NOTE(review): the hunk header above reports 49 old lines vs 7 new lines, so the
// inline implementation below (down to the `table_builder` if/else) appears to be
// the removed pre-refactor body shown without its `-` markers — confirm against
// the merged file.
let context = self.context().await?;

let request = context
.client
.request(Method::GET, context.config.table_endpoint(table_ident))
.build()?;

let http_response = context.client.query_catalog(request).await?;

let response = match http_response.status() {
StatusCode::OK | StatusCode::NOT_MODIFIED => {
deserialize_catalog_response::<LoadTableResponse>(http_response).await?
}
StatusCode::NOT_FOUND => {
return Err(Error::new(
ErrorKind::Unexpected,
"Tried to load a table that does not exist",
));
}
_ => return Err(deserialize_unexpected_catalog_error(http_response).await),
};

// Server-provided config first, then user props, so user props override on key clashes.
let config = response
.config
.unwrap_or_default()
.into_iter()
.chain(self.user_config.props.clone())
.collect();

let file_io = self
.load_file_io(response.metadata_location.as_deref(), Some(config))
.await?;

let table_builder = Table::builder()
.identifier(table_ident.clone())
.file_io(file_io)
.metadata(response.metadata);

if let Some(metadata_location) = response.metadata_location {
table_builder.metadata_location(metadata_location).build()
} else {
table_builder.build()
}
// NOTE(review): presumably the single added line of the hunk — the trait method
// delegates to the shared loader with credential loading disabled.
self.load_table_internal(table_ident, false).await
}

/// Drop a table from the catalog.
Expand Down Expand Up @@ -999,6 +1059,7 @@ mod tests {
use std::sync::Arc;

use chrono::{TimeZone, Utc};
use futures::stream::StreamExt;
use iceberg::spec::{
FormatVersion, NestedField, NullOrder, Operation, PrimitiveType, Schema, Snapshot,
SnapshotLog, SortDirection, SortField, SortOrder, Summary, Transform, Type,
Expand Down Expand Up @@ -2764,4 +2825,141 @@ mod tests {
assert_eq!(err.message(), "Catalog uri is required");
}
}

/// Manual integration test (ignored by default) for credential vending.
///
/// Reads `POLARIS_USER` and `POLARIS_SECRET` (required) and `POLARIS_URI`
/// (optional, defaults to `http://localhost:8181/api/catalog`) from the
/// environment, then checks three things against the `tpch.sf01.nation` table:
/// 1. `load_table_credentials` returns at least one storage credential;
/// 2. a table loaded via `load_table_with_credentials` scans to exactly 25 rows;
/// 3. a table loaded via plain `load_table` fails to start a scan with a
///    permission/authentication error.
#[tokio::test]
#[ignore]
async fn test_load_table_credentials_integration() {
    use std::env;

    let client_id =
        env::var("POLARIS_USER").expect("POLARIS_USER environment variable must be set");
    let client_secret =
        env::var("POLARIS_SECRET").expect("POLARIS_SECRET environment variable must be set");
    let catalog_uri = env::var("POLARIS_URI")
        .unwrap_or_else(|_| "http://localhost:8181/api/catalog".to_string());

    let props = HashMap::from([
        (
            "credential".to_string(),
            format!("{}:{}", client_id, client_secret),
        ),
        ("scope".to_string(), "PRINCIPAL_ROLE:ALL".to_string()),
        (
            "s3.endpoint".to_string(),
            "http://localhost:9000".to_string(),
        ),
    ]);

    let catalog_config = RestCatalogConfig::builder()
        .uri(catalog_uri)
        .warehouse("warehouse".to_string())
        .props(props)
        .build();
    let catalog = RestCatalog::new(catalog_config);

    let namespace = NamespaceIdent::new("tpch.sf01".to_string());
    let table_ident = TableIdent::new(namespace, "nation".to_string());

    // Step 1: the dedicated credentials endpoint must return something.
    let credentials = catalog
        .load_table_credentials(&table_ident)
        .await
        .unwrap_or_else(|e| panic!("Failed to load table credentials: {:?}", e));
    println!("Successfully loaded credentials");
    println!(
        "Number of storage credentials: {}",
        credentials.storage_credentials.len()
    );
    assert!(!credentials.storage_credentials.is_empty());

    // Step 2: a table loaded with vended credentials must be fully scannable.
    println!("\n--- Testing load_table_with_credentials ---");
    let vended_table = catalog
        .load_table_with_credentials(&table_ident)
        .await
        .unwrap_or_else(|e| panic!("Failed to load table with vended credentials: {:?}", e));
    println!("Successfully loaded table with vended credentials");
    println!("Table identifier: {}", vended_table.identifier());
    println!("Metadata location: {:?}", vended_table.metadata_location());
    println!("FileIO configured with vended credentials");

    println!("\n--- Scanning table ---");
    let vended_scan = vended_table.scan().build().expect("Failed to build scan");
    let mut stream = vended_scan
        .to_arrow()
        .await
        .expect("Failed to create arrow stream");

    let mut row_count = 0;
    while let Some(batch_result) = stream.next().await {
        let batch = batch_result.unwrap_or_else(|e| panic!("Failed to read batch: {:?}", e));
        row_count += batch.num_rows();
        println!(" Batch: {} rows", batch.num_rows());
    }

    println!("Total rows scanned: {}", row_count);
    assert_eq!(row_count, 25, "Expected 25 rows in nation table");
    println!("✓ Successfully verified 25 rows in table");

    // Step 3: without vended credentials the load succeeds but the scan must not.
    println!("\n--- Testing load_table WITHOUT vended credentials (should fail) ---");
    let plain_table = catalog
        .load_table(&table_ident)
        .await
        .unwrap_or_else(|e| panic!("Failed to load table without vended credentials: {:?}", e));
    println!("Successfully loaded table WITHOUT vended credentials");
    println!("Table identifier: {}", plain_table.identifier());
    println!("Metadata location: {:?}", plain_table.metadata_location());

    println!("\n--- Attempting to scan table without credentials ---");
    let plain_scan = plain_table.scan().build().expect("Failed to build scan");

    // Creating the arrow stream is expected to fail when accessing the manifest list.
    let scan_error = match plain_scan.to_arrow().await {
        Err(e) => e,
        Ok(_stream) => {
            panic!("Stream creation succeeded without vended credentials - this should not happen!")
        }
    };
    println!("✓ Scan failed as expected without vended credentials");
    println!("Error: {}", scan_error);

    // All three markers must appear for the failure to count as a permission/auth error.
    let error_msg = scan_error.to_string();
    let expected_markers = ["PermissionDenied", "InvalidAccessKeyId", "403"];
    assert!(
        expected_markers
            .iter()
            .all(|marker| error_msg.contains(*marker)),
        "Expected permission/authentication error, got: {}",
        error_msg
    );
}
}
13 changes: 13 additions & 0 deletions crates/catalog/rest/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ pub(super) struct LoadTableResponse {
pub(super) metadata_location: Option<String>,
pub(super) metadata: TableMetadata,
pub(super) config: Option<HashMap<String, String>>,
pub(super) storage_credentials: Option<Vec<StorageCredential>>,
}

#[derive(Debug, Serialize, Deserialize)]
Expand Down Expand Up @@ -199,3 +200,15 @@ pub(super) struct RegisterTableRequest {
pub(super) metadata_location: String,
pub(super) overwrite: Option<bool>,
}

/// One storage credential entry as (de)serialized in catalog responses.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StorageCredential {
/// Prefix associated with this credential.
// NOTE(review): presumably the storage location prefix the credential applies to —
// confirm against the REST catalog spec.
pub prefix: String,
/// Key/value properties carried by this credential.
pub config: HashMap<String, String>,
}

/// Response body holding the storage credentials for a table.
///
/// Fields are (de)serialized with kebab-case names, so `storage_credentials`
/// maps to the `storage-credentials` key.
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub struct LoadCredentialsResponse {
/// The credential entries in the response; the field is required when deserializing.
pub storage_credentials: Vec<StorageCredential>,
}
Loading