diff --git a/CHANGELOG.md b/CHANGELOG.md index 87ffaaf6..65dcc281 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,10 +5,10 @@ ### Fixed - Don't panic on invalid authorization config. Previously, a missing OPA ConfigMap would crash the operator ([#667]). -- getting_started: Add a 120 second timeout before trying to enable the DAG ([#665]). +- Fix OPA authorization for Airflow 3. Airflow 3 needs to be configured via env variables, the operator now does this correctly ([#668]). -[#665]: https://github.com/stackabletech/airflow-operator/pull/665 [#667]: https://github.com/stackabletech/airflow-operator/pull/667 +[#668]: https://github.com/stackabletech/airflow-operator/pull/668 ## [25.7.0] - 2025-07-23 diff --git a/rust/operator-binary/src/airflow_controller.rs b/rust/operator-binary/src/airflow_controller.rs index fa526521..e6354935 100644 --- a/rust/operator-binary/src/airflow_controller.rs +++ b/rust/operator-binary/src/airflow_controller.rs @@ -723,8 +723,13 @@ fn build_rolegroup_config_map( let mut config: BTreeMap = BTreeMap::new(); // this will call default values from AirflowClientAuthenticationDetails - config::add_airflow_config(&mut config, authentication_config, authorization_config) - .context(ConstructConfigSnafu)?; + config::add_airflow_config( + &mut config, + authentication_config, + authorization_config, + &resolved_product_image.product_version, + ) + .context(ConstructConfigSnafu)?; tracing::debug!( "Default config for {}: {:?}", diff --git a/rust/operator-binary/src/config.rs b/rust/operator-binary/src/config.rs index f130e5e9..374f68d3 100644 --- a/rust/operator-binary/src/config.rs +++ b/rust/operator-binary/src/config.rs @@ -13,7 +13,7 @@ use crate::crd::{ AirflowAuthenticationClassResolved, AirflowClientAuthenticationDetailsResolved, DEFAULT_OIDC_PROVIDER, FlaskRolesSyncMoment, }, - authorization::{AirflowAuthorizationResolved, OpaConfigResolved}, + authorization::AirflowAuthorizationResolved, }; pub const PYTHON_IMPORTS: &[&str] = &[ @@ 
-41,6 +41,7 @@ pub fn add_airflow_config( config: &mut BTreeMap, authentication_config: &AirflowClientAuthenticationDetailsResolved, authorization_config: &AirflowAuthorizationResolved, + product_version: &str, ) -> Result<()> { if !config.contains_key(&*AirflowConfigOptions::AuthType.to_string()) { config.insert( @@ -51,7 +52,7 @@ pub fn add_airflow_config( } append_authentication_config(config, authentication_config)?; - append_authorization_config(config, authorization_config)?; + append_authorization_config(config, authorization_config, product_version); Ok(()) } @@ -275,32 +276,30 @@ fn append_oidc_config( fn append_authorization_config( config: &mut BTreeMap, authorization_config: &AirflowAuthorizationResolved, -) -> Result<(), Error> { - if let Some(opa_config) = &authorization_config.opa { - append_opa_config(config, opa_config)?; + product_version: &str, +) { + // See `env_vars::authorization_env_vars` for why we only care about Airflow 2 + if !product_version.starts_with("2.") { + return; } + let Some(opa_config) = &authorization_config.opa else { + return; + }; - Ok(()) -} - -fn append_opa_config( - config: &mut BTreeMap, - opa_config: &OpaConfigResolved, -) -> Result<(), Error> { - config.insert( - AirflowConfigOptions::AuthOpaRequestUrl.to_string(), - opa_config.connection_string.to_owned(), - ); - config.insert( - AirflowConfigOptions::AuthOpaCacheTtlInSec.to_string(), - opa_config.cache_entry_time_to_live.as_secs().to_string(), - ); - config.insert( - AirflowConfigOptions::AuthOpaCacheMaxsize.to_string(), - opa_config.cache_max_entries.to_string(), - ); - - Ok(()) + config.extend([ + ( + AirflowConfigOptions::AuthOpaRequestUrl.to_string(), + opa_config.connection_string.to_owned(), + ), + ( + AirflowConfigOptions::AuthOpaCacheTtlInSec.to_string(), + opa_config.cache_entry_time_to_live.as_secs().to_string(), + ), + ( + AirflowConfigOptions::AuthOpaCacheMaxsize.to_string(), + opa_config.cache_max_entries.to_string(), + ), + ]); } #[cfg(test)] @@ -325,6 
+324,8 @@ mod tests { }, }; + const TEST_AIRFLOW_VERSION: &str = "3.0.1"; + #[test] fn test_auth_db_config() { let authentication_config = AirflowClientAuthenticationDetailsResolved { @@ -337,7 +338,13 @@ mod tests { let authorization_config = AirflowAuthorizationResolved { opa: None }; let mut result = BTreeMap::new(); - add_airflow_config(&mut result, &authentication_config, &authorization_config).expect("Ok"); + add_airflow_config( + &mut result, + &authentication_config, + &authorization_config, + TEST_AIRFLOW_VERSION, + ) + .expect("Ok"); assert_eq!( BTreeMap::from([ @@ -382,7 +389,13 @@ mod tests { let authorization_config = AirflowAuthorizationResolved { opa: None }; let mut result = BTreeMap::new(); - add_airflow_config(&mut result, &authentication_config, &authorization_config).expect("Ok"); + add_airflow_config( + &mut result, + &authentication_config, + &authorization_config, + TEST_AIRFLOW_VERSION, + ) + .expect("Ok"); assert_eq!(BTreeMap::from([ ("AUTH_LDAP_ALLOW_SELF_SIGNED".into(), "false".into()), @@ -468,7 +481,13 @@ mod tests { let authorization_config = AirflowAuthorizationResolved { opa: None }; let mut result = BTreeMap::new(); - add_airflow_config(&mut result, &authentication_config, &authorization_config).expect("Ok"); + add_airflow_config( + &mut result, + &authentication_config, + &authorization_config, + TEST_AIRFLOW_VERSION, + ) + .expect("Ok"); assert_eq!( BTreeMap::from([ @@ -532,16 +551,16 @@ mod tests { }; let mut result = BTreeMap::new(); - add_airflow_config(&mut result, &authentication_config, &authorization_config).expect("Ok"); + add_airflow_config( + &mut result, + &authentication_config, + &authorization_config, + TEST_AIRFLOW_VERSION, + ) + .expect("Ok"); assert_eq!( BTreeMap::from([ - ("AUTH_OPA_CACHE_MAXSIZE".into(), "1000".into()), - ("AUTH_OPA_CACHE_TTL_IN_SEC".into(), "30".into()), - ( - "AUTH_OPA_REQUEST_URL".into(), - "http://opa:8081/v1/data/airflow".into() - ), ("AUTH_ROLES_SYNC_AT_LOGIN".into(), "false".into()), 
("AUTH_TYPE".into(), "AUTH_DB".into()), ("AUTH_USER_REGISTRATION".into(), "true".into()), diff --git a/rust/operator-binary/src/crd/mod.rs b/rust/operator-binary/src/crd/mod.rs index 8704eff4..e4b357ff 100644 --- a/rust/operator-binary/src/crd/mod.rs +++ b/rust/operator-binary/src/crd/mod.rs @@ -128,6 +128,9 @@ pub enum AirflowConfigOptions { AuthLdapTlsKeyfile, AuthLdapTlsCacertfile, AuthLdapAllowSelfSigned, + // OPA configs for Airflow 2 + // Airflow 3 configs need to be passed via env variables! + // See `env_vars::authorization_env_vars` for details AuthOpaCacheMaxsize, AuthOpaCacheTtlInSec, AuthOpaRequestUrl, diff --git a/rust/operator-binary/src/env_vars.rs b/rust/operator-binary/src/env_vars.rs index 9a10ad0b..25a6a112 100644 --- a/rust/operator-binary/src/env_vars.rs +++ b/rust/operator-binary/src/env_vars.rs @@ -27,6 +27,11 @@ use crate::{ }; const AIRFLOW_CORE_AUTH_MANAGER: &str = "AIRFLOW__CORE__AUTH_MANAGER"; +// Airflow 3 envs +const AIRFLOW_CORE_AUTH_OPA_REQUEST_URL: &str = "AIRFLOW__CORE__AUTH_OPA_REQUEST_URL"; +const AIRFLOW_CORE_AUTH_OPA_CACHE_TTL_IN_SEC: &str = "AIRFLOW__CORE__AUTH_OPA_CACHE_TTL_IN_SEC"; +const AIRFLOW_CORE_AUTH_OPA_CACHE_MAXSIZE: &str = "AIRFLOW__CORE__AUTH_OPA_CACHE_MAXSIZE"; + const AIRFLOW_LOGGING_LOGGING_CONFIG_CLASS: &str = "AIRFLOW__LOGGING__LOGGING_CONFIG_CLASS"; const AIRFLOW_METRICS_STATSD_ON: &str = "AIRFLOW__METRICS__STATSD_ON"; const AIRFLOW_METRICS_STATSD_HOST: &str = "AIRFLOW__METRICS__STATSD_HOST"; @@ -231,7 +236,10 @@ pub fn build_airflow_statefulset_envs( } AirflowRole::Webserver => { let mut vars = authentication_env_vars(auth_config); - vars.extend(authorization_env_vars(authorization_config)); + vars.extend(authorization_env_vars( + authorization_config, + &resolved_product_image.product_version, + )); env.extend(vars.into_iter().map(|var| (var.name.to_owned(), var))); } _ => {} @@ -554,15 +562,45 @@ fn authentication_env_vars( .collect() } -fn authorization_env_vars(authorization_config: 
&AirflowAuthorizationResolved) -> Vec { - let mut env = vec![]; - - if authorization_config.opa.is_some() { - env.push(EnvVar { - name: AIRFLOW_CORE_AUTH_MANAGER.into(), - value: Some("opa_auth_manager.opa_fab_auth_manager.OpaFabAuthManager".to_string()), - ..Default::default() - }); +/// Constructs the needed authorization env vars for the specific Airflow version. +/// +/// `AIRFLOW__CORE__AUTH_MANAGER` always needs to be set as an env var. +/// +/// Airflow 2 needs the OPA settings in the `webserver_config.py`, such as `AUTH_OPA_REQUEST_URL`. +/// Airflow 3 needs the OPA settings as env variables, such as `AIRFLOW__CORE__AUTH_OPA_REQUEST_URL`. +fn authorization_env_vars( + authorization_config: &AirflowAuthorizationResolved, + product_version: &str, +) -> Vec { + let Some(opa) = &authorization_config.opa else { + return vec![]; + }; + + let mut env = vec![EnvVar { + name: AIRFLOW_CORE_AUTH_MANAGER.into(), + value: Some("opa_auth_manager.opa_fab_auth_manager.OpaFabAuthManager".to_string()), + ..Default::default() + }]; + if product_version.starts_with("2.") { + // OPA config needs to go into `webserver_config.py` + } else { + env.extend([ + EnvVar { + name: AIRFLOW_CORE_AUTH_OPA_REQUEST_URL.into(), + value: Some(opa.connection_string.to_owned()), + ..Default::default() + }, + EnvVar { + name: AIRFLOW_CORE_AUTH_OPA_CACHE_TTL_IN_SEC.into(), + value: Some(opa.cache_entry_time_to_live.as_secs().to_string()), + ..Default::default() + }, + EnvVar { + name: AIRFLOW_CORE_AUTH_OPA_CACHE_MAXSIZE.into(), + value: Some(opa.cache_max_entries.to_string()), + ..Default::default() + }, + ]); } env @@ -600,3 +638,73 @@ fn execution_server_env_vars(airflow: &v1alpha1::AirflowCluster) -> BTreeMap>(); + + assert_eq!( + authorization_env_vars, + [( + "AIRFLOW__CORE__AUTH_MANAGER".into(), + "opa_auth_manager.opa_fab_auth_manager.OpaFabAuthManager".into() + ),] + ); + } + + #[test] + fn test_airflow_3_authorization_env_vars() { + let authorization_config = get_test_authorization_config(); +
let authorization_env_vars = authorization_env_vars(&authorization_config, "3.0.1"); + let authorization_env_vars = authorization_env_vars + .into_iter() + .map(|env| (env.name, env.value.expect("env var value must be present"))) + .collect::>(); + + assert_eq!( + authorization_env_vars, + [ + ( + "AIRFLOW__CORE__AUTH_MANAGER".into(), + "opa_auth_manager.opa_fab_auth_manager.OpaFabAuthManager".into() + ), + ( + "AIRFLOW__CORE__AUTH_OPA_REQUEST_URL".into(), + "http://opa-server.default.svc.cluster.local:8081/v1/data/airflow".into() + ), + ( + "AIRFLOW__CORE__AUTH_OPA_CACHE_TTL_IN_SEC".into(), + "30".into() + ), + ( + "AIRFLOW__CORE__AUTH_OPA_CACHE_MAXSIZE".into(), + "1000".into() + ), + ] + ); + } + + fn get_test_authorization_config() -> AirflowAuthorizationResolved { + AirflowAuthorizationResolved { + opa: Some(OpaConfigResolved { + connection_string: + "http://opa-server.default.svc.cluster.local:8081/v1/data/airflow".to_string(), + cache_entry_time_to_live: Duration::from_secs(30), + cache_max_entries: 1000, + }), + } + } +} diff --git a/tests/templates/kuttl/opa/20-assert.yaml b/tests/templates/kuttl/opa/20-assert.yaml index e868cdaf..f55ef436 100644 --- a/tests/templates/kuttl/opa/20-assert.yaml +++ b/tests/templates/kuttl/opa/20-assert.yaml @@ -3,4 +3,4 @@ apiVersion: kuttl.dev/v1beta1 kind: TestAssert timeout: 300 commands: - - script: kubectl -n $NAMESPACE rollout status daemonset opa-server-default --timeout 300s + - script: kubectl -n $NAMESPACE wait --for=condition=available --timeout=10m opacluster/test-opa diff --git a/tests/templates/kuttl/opa/20-install-opa.yaml.j2 b/tests/templates/kuttl/opa/20-install-opa.yaml.j2 index b82531c9..d7e4e0ba 100644 --- a/tests/templates/kuttl/opa/20-install-opa.yaml.j2 +++ b/tests/templates/kuttl/opa/20-install-opa.yaml.j2 @@ -7,7 +7,9 @@ metadata: apiVersion: opa.stackable.tech/v1alpha1 kind: OpaCluster metadata: - name: opa + # The OpaCluster is intentionally not only called "opa" to ensure that our custom OPA URL is 
+ # used and not some default value of "opa". + name: test-opa spec: image: {% if test_scenario['values']['opa-latest'].find(",") > 0 %} diff --git a/tests/templates/kuttl/opa/30-install-airflow.yaml.j2 b/tests/templates/kuttl/opa/30-install-airflow.yaml.j2 index fcf057c1..631602ac 100644 --- a/tests/templates/kuttl/opa/30-install-airflow.yaml.j2 +++ b/tests/templates/kuttl/opa/30-install-airflow.yaml.j2 @@ -34,7 +34,7 @@ spec: clusterConfig: authorization: opa: - configMapName: opa + configMapName: test-opa package: airflow cache: entryTimeToLive: 5s @@ -54,7 +54,9 @@ spec: configOverrides: webserver_config.py: WTF_CSRF_ENABLED: "False" # Allow "POST /login/" without CSRF token - AUTH_OPA_CACHE_MAXSIZE_DEFAULT: "0" # disable decision caching for easy debugging + AUTH_OPA_CACHE_MAXSIZE: "0" # Airflow 2: Disable decision caching for easy debugging + envOverrides: + AIRFLOW__CORE__AUTH_OPA_CACHE_MAXSIZE: "0" # Airflow 3: Disable decision caching for easy debugging roleGroups: default: replicas: 1