From ef410e663729a3cbcf88c98b9a7dbe25f11c5b19 Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Fri, 1 Aug 2025 10:34:54 +0200 Subject: [PATCH 1/5] fix: OPA authorization for Airflow 3 --- CHANGELOG.md | 3 +- .../operator-binary/src/airflow_controller.rs | 9 +- rust/operator-binary/src/config.rs | 121 +++++++---------- rust/operator-binary/src/crd/mod.rs | 3 + rust/operator-binary/src/env_vars.rs | 128 ++++++++++++++++-- tests/templates/kuttl/opa/20-assert.yaml | 2 +- .../kuttl/opa/20-install-opa.yaml.j2 | 4 +- .../kuttl/opa/30-install-airflow.yaml.j2 | 6 +- 8 files changed, 187 insertions(+), 89 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 87ffaaf6..11dd70e3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,10 +5,9 @@ ### Fixed - Don't panic on invalid authorization config. Previously, a missing OPA ConfigMap would crash the operator ([#667]). -- getting_started: Add a 120 second timeout before trying to enable the DAG ([#665]). +- Fix OPA authorization for Airflow 3. Airflow 3 now needs to be configured via env variables, the operator now does this correctly ([#XXX]). [#665]: https://github.com/stackabletech/airflow-operator/pull/665 -[#667]: https://github.com/stackabletech/airflow-operator/pull/667 ## [25.7.0] - 2025-07-23 diff --git a/rust/operator-binary/src/airflow_controller.rs b/rust/operator-binary/src/airflow_controller.rs index fa526521..e6354935 100644 --- a/rust/operator-binary/src/airflow_controller.rs +++ b/rust/operator-binary/src/airflow_controller.rs @@ -723,8 +723,13 @@ fn build_rolegroup_config_map( let mut config: BTreeMap = BTreeMap::new(); // this will call default values from AirflowClientAuthenticationDetails - config::add_airflow_config(&mut config, authentication_config, authorization_config) - .context(ConstructConfigSnafu)?; + config::add_airflow_config( + &mut config, + authentication_config, + authorization_config, + &resolved_product_image.product_version, + ) + .context(ConstructConfigSnafu)?; tracing::debug!( "Default config for {}: {:?}", diff --git a/rust/operator-binary/src/config.rs b/rust/operator-binary/src/config.rs index f130e5e9..18f21a76 100644 --- a/rust/operator-binary/src/config.rs +++ b/rust/operator-binary/src/config.rs @@ -13,7 +13,7 @@ use crate::crd::{ AirflowAuthenticationClassResolved, AirflowClientAuthenticationDetailsResolved, DEFAULT_OIDC_PROVIDER, FlaskRolesSyncMoment, }, - authorization::{AirflowAuthorizationResolved, OpaConfigResolved}, + authorization::AirflowAuthorizationResolved, }; pub const PYTHON_IMPORTS: &[&str] = &[ @@ -41,6 +41,7 @@ pub fn add_airflow_config( config: &mut BTreeMap, authentication_config: &AirflowClientAuthenticationDetailsResolved, authorization_config: &AirflowAuthorizationResolved, + product_version: &str, ) -> Result<()> { if !config.contains_key(&*AirflowConfigOptions::AuthType.to_string()) { config.insert( @@ -51,7 +52,7 @@ pub fn add_airflow_config( } append_authentication_config(config, authentication_config)?; - append_authorization_config(config, authorization_config)?; + append_authorization_config(config, authorization_config, product_version); Ok(()) } @@ -275,32 +276,30 @@ fn append_oidc_config( fn append_authorization_config( config: &mut BTreeMap, authorization_config: &AirflowAuthorizationResolved, -) -> Result<(), Error> { - if let Some(opa_config) = &authorization_config.opa { - append_opa_config(config, opa_config)?; + product_version: &str, +) { + // See `env_vars::authorization_env_vars` for why we only care about Airflow 2 + if !product_version.starts_with("2.") { + return; } + let Some(opa_config) = &authorization_config.opa else { + return; + }; - Ok(()) -} - -fn append_opa_config( - config: &mut BTreeMap, - opa_config: &OpaConfigResolved, -) -> Result<(), Error> { - config.insert( - AirflowConfigOptions::AuthOpaRequestUrl.to_string(), - opa_config.connection_string.to_owned(), - ); - config.insert( - AirflowConfigOptions::AuthOpaCacheTtlInSec.to_string(), - opa_config.cache_entry_time_to_live.as_secs().to_string(), - ); - config.insert( - AirflowConfigOptions::AuthOpaCacheMaxsize.to_string(), - opa_config.cache_max_entries.to_string(), - ); - - Ok(()) + config.extend([ + ( + AirflowConfigOptions::AuthOpaRequestUrl.to_string(), + opa_config.connection_string.to_owned(), + ), + ( + AirflowConfigOptions::AuthOpaCacheTtlInSec.to_string(), + opa_config.cache_entry_time_to_live.as_secs().to_string(), + ), + ( + AirflowConfigOptions::AuthOpaCacheMaxsize.to_string(), + opa_config.cache_max_entries.to_string(), + ), + ]); } #[cfg(test)] @@ -309,10 +308,7 @@ mod tests { use indoc::formatdoc; use rstest::rstest; - use stackable_operator::{ - crd::authentication::{ldap, oidc}, - time::Duration, - }; + use stackable_operator::crd::authentication::{ldap, oidc}; use crate::{ config::add_airflow_config, @@ -321,10 +317,12 @@ mod tests { AirflowAuthenticationClassResolved, AirflowClientAuthenticationDetailsResolved, FlaskRolesSyncMoment, default_user_registration, }, - authorization::{AirflowAuthorizationResolved, OpaConfigResolved}, + authorization::AirflowAuthorizationResolved, }, }; + const TEST_AIRFLOW_VERSION: &str = "3.0.1"; + #[test] fn test_auth_db_config() { let authentication_config = AirflowClientAuthenticationDetailsResolved { @@ -337,7 +335,13 @@ mod tests { let authorization_config = AirflowAuthorizationResolved { opa: None }; let mut result = BTreeMap::new(); - add_airflow_config(&mut result, &authentication_config, &authorization_config).expect("Ok"); + add_airflow_config( + &mut result, + &authentication_config, + &authorization_config, + TEST_AIRFLOW_VERSION, + ) + .expect("Ok"); assert_eq!( BTreeMap::from([ @@ -382,7 +386,13 @@ mod tests { let authorization_config = AirflowAuthorizationResolved { opa: None }; let mut result = BTreeMap::new(); - add_airflow_config(&mut result, &authentication_config, &authorization_config).expect("Ok"); + add_airflow_config( + &mut result, + &authentication_config, + &authorization_config, + TEST_AIRFLOW_VERSION, + ) + .expect("Ok"); assert_eq!(BTreeMap::from([ ("AUTH_LDAP_ALLOW_SELF_SIGNED".into(), "false".into()), @@ -468,7 +478,13 @@ mod tests { let authorization_config = AirflowAuthorizationResolved { opa: None }; let mut result = BTreeMap::new(); - add_airflow_config(&mut result, &authentication_config, &authorization_config).expect("Ok"); + add_airflow_config( + &mut result, + &authentication_config, + &authorization_config, + TEST_AIRFLOW_VERSION, + ) + .expect("Ok"); assert_eq!( BTreeMap::from([ @@ -513,41 +529,4 @@ mod tests { result ); } - - #[test] - fn test_opa_config() { - let authentication_config = AirflowClientAuthenticationDetailsResolved { - authentication_classes_resolved: vec![], - user_registration: true, - user_registration_role: "User".to_string(), - sync_roles_at: FlaskRolesSyncMoment::Registration, - }; - - let authorization_config = AirflowAuthorizationResolved { - opa: Some(OpaConfigResolved { - connection_string: "http://opa:8081/v1/data/airflow".to_string(), - cache_entry_time_to_live: Duration::from_secs(30), - cache_max_entries: 1000, - }), - }; - - let mut result = BTreeMap::new(); - add_airflow_config(&mut result, &authentication_config, &authorization_config).expect("Ok"); - - assert_eq!( - BTreeMap::from([ - ("AUTH_OPA_CACHE_MAXSIZE".into(), "1000".into()), - ("AUTH_OPA_CACHE_TTL_IN_SEC".into(), "30".into()), - ( - "AUTH_OPA_REQUEST_URL".into(), - "http://opa:8081/v1/data/airflow".into() - ), - ("AUTH_ROLES_SYNC_AT_LOGIN".into(), "false".into()), - ("AUTH_TYPE".into(), "AUTH_DB".into()), - ("AUTH_USER_REGISTRATION".into(), "true".into()), - ("AUTH_USER_REGISTRATION_ROLE".into(), "User".into()) - ]), - result - ); - } } diff --git a/rust/operator-binary/src/crd/mod.rs b/rust/operator-binary/src/crd/mod.rs index 8704eff4..e4b357ff 100644 --- a/rust/operator-binary/src/crd/mod.rs +++ b/rust/operator-binary/src/crd/mod.rs @@ -128,6 +128,9 @@ pub enum AirflowConfigOptions { AuthLdapTlsKeyfile, AuthLdapTlsCacertfile, AuthLdapAllowSelfSigned, + // OPA configs for Airflow 2 + // Airflow 3 configs need to be passed via env variables! + // See `env_vars::authorization_env_vars` for details AuthOpaCacheMaxsize, AuthOpaCacheTtlInSec, AuthOpaRequestUrl, diff --git a/rust/operator-binary/src/env_vars.rs b/rust/operator-binary/src/env_vars.rs index 9a10ad0b..25a6a112 100644 --- a/rust/operator-binary/src/env_vars.rs +++ b/rust/operator-binary/src/env_vars.rs @@ -27,6 +27,11 @@ use crate::{ }; const AIRFLOW_CORE_AUTH_MANAGER: &str = "AIRFLOW__CORE__AUTH_MANAGER"; +// Airflow 3 envs +const AIRFLOW_CORE_AUTH_OPA_REQUEST_URL: &str = "AIRFLOW__CORE__AUTH_OPA_REQUEST_URL"; +const AIRFLOW_CORE_AUTH_OPA_CACHE_TTL_IN_SEC: &str = "AIRFLOW__CORE__AUTH_OPA_CACHE_TTL_IN_SEC"; +const AIRFLOW_CORE_AUTH_OPA_CACHE_MAXSIZE: &str = "AIRFLOW__CORE__AUTH_OPA_CACHE_MAXSIZE"; + const AIRFLOW_LOGGING_LOGGING_CONFIG_CLASS: &str = "AIRFLOW__LOGGING__LOGGING_CONFIG_CLASS"; const AIRFLOW_METRICS_STATSD_ON: &str = "AIRFLOW__METRICS__STATSD_ON"; const AIRFLOW_METRICS_STATSD_HOST: &str = "AIRFLOW__METRICS__STATSD_HOST"; @@ -231,7 +236,10 @@ pub fn build_airflow_statefulset_envs( } AirflowRole::Webserver => { let mut vars = authentication_env_vars(auth_config); - vars.extend(authorization_env_vars(authorization_config)); + vars.extend(authorization_env_vars( + authorization_config, + &resolved_product_image.product_version, + )); env.extend(vars.into_iter().map(|var| (var.name.to_owned(), var))); } _ => {} @@ -554,15 +562,45 @@ fn authentication_env_vars( .collect() } -fn authorization_env_vars(authorization_config: &AirflowAuthorizationResolved) -> Vec { - let mut env = vec![]; - - if authorization_config.opa.is_some() { - env.push(EnvVar { - name: AIRFLOW_CORE_AUTH_MANAGER.into(), - value: Some("opa_auth_manager.opa_fab_auth_manager.OpaFabAuthManager".to_string()), - ..Default::default() - }); +/// Constructs the needed authorization env vars for the specific Airflow version. +/// +/// `AIRFLOW__CORE__AUTH_MANAGER` always needs to be set as env var. +/// +/// Airflow 2 needs to OPA settings in the `webserver_config.py` such as `AUTH_OPA_REQUEST_URL`. +/// Airflow 3 needs to OPA settings as env variables such as `AIRFLOW__CORE__AUTH_OPA_REQUEST_URL`. +fn authorization_env_vars( + authorization_config: &AirflowAuthorizationResolved, + product_version: &str, +) -> Vec { + let Some(opa) = &authorization_config.opa else { + return vec![]; + }; + + let mut env = vec![EnvVar { + name: AIRFLOW_CORE_AUTH_MANAGER.into(), + value: Some("opa_auth_manager.opa_fab_auth_manager.OpaFabAuthManager".to_string()), + ..Default::default() + }]; + if product_version.starts_with("2.") { + // OPA config needs to go into `webserver_config.py` + } else { + env.extend([ + EnvVar { + name: AIRFLOW_CORE_AUTH_OPA_REQUEST_URL.into(), + value: Some(opa.connection_string.to_owned()), + ..Default::default() + }, + EnvVar { + name: AIRFLOW_CORE_AUTH_OPA_CACHE_TTL_IN_SEC.into(), + value: Some(opa.cache_entry_time_to_live.as_secs().to_string()), + ..Default::default() + }, + EnvVar { + name: AIRFLOW_CORE_AUTH_OPA_CACHE_MAXSIZE.into(), + value: Some(opa.cache_max_entries.to_string()), + ..Default::default() + }, + ]); } env @@ -600,3 +638,73 @@ fn execution_server_env_vars(airflow: &v1alpha1::AirflowCluster) -> BTreeMap>(); + + assert_eq!( + authorization_env_vars, + [( + "AIRFLOW__CORE__AUTH_MANAGER".into(), + "opa_auth_manager.opa_fab_auth_manager.OpaFabAuthManager".into() + ),] + ); + } + + #[test] + fn test_airflow_3_authorization_env_vars() { + let authorization_config = get_test_authorization_config(); + let authorization_env_vars = authorization_env_vars(&authorization_config, "3.0.1"); + let authorization_env_vars = authorization_env_vars + .into_iter() + .map(|env| (env.name, env.value.expect("env var value must be present"))) + .collect::>(); + + assert_eq!( + authorization_env_vars, + [ + ( + "AIRFLOW__CORE__AUTH_MANAGER".into(), + "opa_auth_manager.opa_fab_auth_manager.OpaFabAuthManager".into() + ), + ( + "AIRFLOW__CORE__AUTH_OPA_REQUEST_URL".into(), + "http://opa-server.default.svc.cluster.local:8081/v1/data/airflow".into() + ), + ( + "AIRFLOW__CORE__AUTH_OPA_CACHE_TTL_IN_SEC".into(), + "30".into() + ), + ( + "AIRFLOW__CORE__AUTH_OPA_CACHE_MAXSIZE".into(), + "1000".into() + ), + ] + ); + } + + fn get_test_authorization_config() -> AirflowAuthorizationResolved { + AirflowAuthorizationResolved { + opa: Some(OpaConfigResolved { + connection_string: + "http://opa-server.default.svc.cluster.local:8081/v1/data/airflow".to_string(), + cache_entry_time_to_live: Duration::from_secs(30), + cache_max_entries: 1000, + }), + } + } +} diff --git a/tests/templates/kuttl/opa/20-assert.yaml b/tests/templates/kuttl/opa/20-assert.yaml index e868cdaf..f55ef436 100644 --- a/tests/templates/kuttl/opa/20-assert.yaml +++ b/tests/templates/kuttl/opa/20-assert.yaml @@ -3,4 +3,4 @@ apiVersion: kuttl.dev/v1beta1 kind: TestAssert timeout: 300 commands: - - script: kubectl -n $NAMESPACE rollout status daemonset opa-server-default --timeout 300s + - script: kubectl -n $NAMESPACE wait --for=condition=available --timeout=10m opacluster/test-opa diff --git a/tests/templates/kuttl/opa/20-install-opa.yaml.j2 b/tests/templates/kuttl/opa/20-install-opa.yaml.j2 index b82531c9..d7e4e0ba 100644 --- a/tests/templates/kuttl/opa/20-install-opa.yaml.j2 +++ b/tests/templates/kuttl/opa/20-install-opa.yaml.j2 @@ -7,7 +7,9 @@ metadata: apiVersion: opa.stackable.tech/v1alpha1 kind: OpaCluster metadata: - name: opa + # The OpaCluster is intentionally not only called "opa" to ensure that our custom OPA URL is + # used and not some default value of "opa". + name: test-opa spec: image: {% if test_scenario['values']['opa-latest'].find(",") > 0 %} diff --git a/tests/templates/kuttl/opa/30-install-airflow.yaml.j2 b/tests/templates/kuttl/opa/30-install-airflow.yaml.j2 index fcf057c1..631602ac 100644 --- a/tests/templates/kuttl/opa/30-install-airflow.yaml.j2 +++ b/tests/templates/kuttl/opa/30-install-airflow.yaml.j2 @@ -34,7 +34,7 @@ spec: clusterConfig: authorization: opa: - configMapName: opa + configMapName: test-opa package: airflow cache: entryTimeToLive: 5s @@ -54,7 +54,9 @@ spec: configOverrides: webserver_config.py: WTF_CSRF_ENABLED: "False" # Allow "POST /login/" without CSRF token - AUTH_OPA_CACHE_MAXSIZE_DEFAULT: "0" # disable decision caching for easy debugging + AUTH_OPA_CACHE_MAXSIZE: "0" # Airflow 2: Disable decision caching for easy debugging + envOverrides: + AIRFLOW__CORE__AUTH_OPA_CACHE_MAXSIZE: "0" # Airflow 3: Disable decision caching for easy debugging roleGroups: default: replicas: 1 From b36a34d6cd2418e5a36ec6d11ba3098f79152b0b Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Fri, 1 Aug 2025 10:39:00 +0200 Subject: [PATCH 2/5] changelog --- CHANGELOG.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 11dd70e3..51d70443 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,9 +5,10 @@ ### Fixed - Don't panic on invalid authorization config. Previously, a missing OPA ConfigMap would crash the operator ([#667]). -- Fix OPA authorization for Airflow 3. Airflow 3 now needs to be configured via env variables, the operator now does this correctly ([#XXX]). +- Fix OPA authorization for Airflow 3. Airflow 3 now needs to be configured via env variables, the operator now does this correctly ([#668]). [#665]: https://github.com/stackabletech/airflow-operator/pull/665 +[#668]: https://github.com/stackabletech/airflow-operator/pull/668 ## [25.7.0] - 2025-07-23 From acbbf8037641951081ec7f1e10dd5a85476a888a Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Fri, 1 Aug 2025 10:39:30 +0200 Subject: [PATCH 3/5] changelog --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 51d70443..2d096026 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,7 +5,7 @@ ### Fixed - Don't panic on invalid authorization config. Previously, a missing OPA ConfigMap would crash the operator ([#667]). -- Fix OPA authorization for Airflow 3. Airflow 3 now needs to be configured via env variables, the operator now does this correctly ([#668]). +- Fix OPA authorization for Airflow 3. Airflow 3 needs to be configured via env variables, the operator now does this correctly ([#668]). [#665]: https://github.com/stackabletech/airflow-operator/pull/665 [#668]: https://github.com/stackabletech/airflow-operator/pull/668 From 325d25b24d3bbeab99597315aa2efbad9fcd774e Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Fri, 1 Aug 2025 10:56:46 +0200 Subject: [PATCH 4/5] Re-add AuhtZ test --- rust/operator-binary/src/config.rs | 44 ++++++++++++++++++++++++++++-- 1 file changed, 42 insertions(+), 2 deletions(-) diff --git a/rust/operator-binary/src/config.rs b/rust/operator-binary/src/config.rs index 18f21a76..374f68d3 100644 --- a/rust/operator-binary/src/config.rs +++ b/rust/operator-binary/src/config.rs @@ -308,7 +308,10 @@ mod tests { use indoc::formatdoc; use rstest::rstest; - use stackable_operator::crd::authentication::{ldap, oidc}; + use stackable_operator::{ + crd::authentication::{ldap, oidc}, + time::Duration, + }; use crate::{ config::add_airflow_config, @@ -317,7 +320,7 @@ mod tests { AirflowAuthenticationClassResolved, AirflowClientAuthenticationDetailsResolved, FlaskRolesSyncMoment, default_user_registration, }, - authorization::AirflowAuthorizationResolved, + authorization::{AirflowAuthorizationResolved, OpaConfigResolved}, }, }; @@ -529,4 +532,41 @@ mod tests { result ); } + + #[test] + fn test_opa_config() { + let authentication_config = AirflowClientAuthenticationDetailsResolved { + authentication_classes_resolved: vec![], + user_registration: true, + user_registration_role: "User".to_string(), + sync_roles_at: FlaskRolesSyncMoment::Registration, + }; + + let authorization_config = AirflowAuthorizationResolved { + opa: Some(OpaConfigResolved { + connection_string: "http://opa:8081/v1/data/airflow".to_string(), + cache_entry_time_to_live: Duration::from_secs(30), + cache_max_entries: 1000, + }), + }; + + let mut result = BTreeMap::new(); + add_airflow_config( + &mut result, + &authentication_config, + &authorization_config, + TEST_AIRFLOW_VERSION, + ) + .expect("Ok"); + + assert_eq!( + BTreeMap::from([ + ("AUTH_ROLES_SYNC_AT_LOGIN".into(), "false".into()), + ("AUTH_TYPE".into(), "AUTH_DB".into()), + ("AUTH_USER_REGISTRATION".into(), "true".into()), + ("AUTH_USER_REGISTRATION_ROLE".into(), "User".into()) + ]), + result + ); + } } From 4a019d7664d495952162d82cf1fb75c5a05a20ee Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Fri, 1 Aug 2025 11:02:59 +0200 Subject: [PATCH 5/5] changelog --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2d096026..65dcc281 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,7 +7,7 @@ - Don't panic on invalid authorization config. Previously, a missing OPA ConfigMap would crash the operator ([#667]). - Fix OPA authorization for Airflow 3. Airflow 3 needs to be configured via env variables, the operator now does this correctly ([#668]). -[#665]: https://github.com/stackabletech/airflow-operator/pull/665 +[#667]: https://github.com/stackabletech/airflow-operator/pull/667 [#668]: https://github.com/stackabletech/airflow-operator/pull/668 ## [25.7.0] - 2025-07-23