Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
### Fixed

- Don't panic on invalid authorization config. Previously, a missing OPA ConfigMap would crash the operator ([#667]).
- getting_started: Add a 120 second timeout before trying to enable the DAG ([#665]).
- Fix OPA authorization for Airflow 3. Airflow 3 needs to be configured via env variables, the operator now does this correctly ([#668]).

[#665]: https://github.com/stackabletech/airflow-operator/pull/665
[#667]: https://github.com/stackabletech/airflow-operator/pull/667
[#668]: https://github.com/stackabletech/airflow-operator/pull/668

## [25.7.0] - 2025-07-23

Expand Down
9 changes: 7 additions & 2 deletions rust/operator-binary/src/airflow_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -723,8 +723,13 @@ fn build_rolegroup_config_map(
let mut config: BTreeMap<String, String> = BTreeMap::new();

// this will call default values from AirflowClientAuthenticationDetails
config::add_airflow_config(&mut config, authentication_config, authorization_config)
.context(ConstructConfigSnafu)?;
config::add_airflow_config(
&mut config,
authentication_config,
authorization_config,
&resolved_product_image.product_version,
)
.context(ConstructConfigSnafu)?;

tracing::debug!(
"Default config for {}: {:?}",
Expand Down
91 changes: 55 additions & 36 deletions rust/operator-binary/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::crd::{
AirflowAuthenticationClassResolved, AirflowClientAuthenticationDetailsResolved,
DEFAULT_OIDC_PROVIDER, FlaskRolesSyncMoment,
},
authorization::{AirflowAuthorizationResolved, OpaConfigResolved},
authorization::AirflowAuthorizationResolved,
};

pub const PYTHON_IMPORTS: &[&str] = &[
Expand Down Expand Up @@ -41,6 +41,7 @@ pub fn add_airflow_config(
config: &mut BTreeMap<String, String>,
authentication_config: &AirflowClientAuthenticationDetailsResolved,
authorization_config: &AirflowAuthorizationResolved,
product_version: &str,
) -> Result<()> {
if !config.contains_key(&*AirflowConfigOptions::AuthType.to_string()) {
config.insert(
Expand All @@ -51,7 +52,7 @@ pub fn add_airflow_config(
}

append_authentication_config(config, authentication_config)?;
append_authorization_config(config, authorization_config)?;
append_authorization_config(config, authorization_config, product_version);

Ok(())
}
Expand Down Expand Up @@ -275,32 +276,30 @@ fn append_oidc_config(
fn append_authorization_config(
config: &mut BTreeMap<String, String>,
authorization_config: &AirflowAuthorizationResolved,
) -> Result<(), Error> {
if let Some(opa_config) = &authorization_config.opa {
append_opa_config(config, opa_config)?;
product_version: &str,
) {
// See `env_vars::authorization_env_vars` for why we only care about Airflow 2
if !product_version.starts_with("2.") {
return;
}
let Some(opa_config) = &authorization_config.opa else {
return;
};

Ok(())
}

fn append_opa_config(
config: &mut BTreeMap<String, String>,
opa_config: &OpaConfigResolved,
) -> Result<(), Error> {
config.insert(
AirflowConfigOptions::AuthOpaRequestUrl.to_string(),
opa_config.connection_string.to_owned(),
);
config.insert(
AirflowConfigOptions::AuthOpaCacheTtlInSec.to_string(),
opa_config.cache_entry_time_to_live.as_secs().to_string(),
);
config.insert(
AirflowConfigOptions::AuthOpaCacheMaxsize.to_string(),
opa_config.cache_max_entries.to_string(),
);

Ok(())
config.extend([
(
AirflowConfigOptions::AuthOpaRequestUrl.to_string(),
opa_config.connection_string.to_owned(),
),
(
AirflowConfigOptions::AuthOpaCacheTtlInSec.to_string(),
opa_config.cache_entry_time_to_live.as_secs().to_string(),
),
(
AirflowConfigOptions::AuthOpaCacheMaxsize.to_string(),
opa_config.cache_max_entries.to_string(),
),
]);
}

#[cfg(test)]
Expand All @@ -325,6 +324,8 @@ mod tests {
},
};

const TEST_AIRFLOW_VERSION: &str = "3.0.1";

#[test]
fn test_auth_db_config() {
let authentication_config = AirflowClientAuthenticationDetailsResolved {
Expand All @@ -337,7 +338,13 @@ mod tests {
let authorization_config = AirflowAuthorizationResolved { opa: None };

let mut result = BTreeMap::new();
add_airflow_config(&mut result, &authentication_config, &authorization_config).expect("Ok");
add_airflow_config(
&mut result,
&authentication_config,
&authorization_config,
TEST_AIRFLOW_VERSION,
)
.expect("Ok");

assert_eq!(
BTreeMap::from([
Expand Down Expand Up @@ -382,7 +389,13 @@ mod tests {
let authorization_config = AirflowAuthorizationResolved { opa: None };

let mut result = BTreeMap::new();
add_airflow_config(&mut result, &authentication_config, &authorization_config).expect("Ok");
add_airflow_config(
&mut result,
&authentication_config,
&authorization_config,
TEST_AIRFLOW_VERSION,
)
.expect("Ok");

assert_eq!(BTreeMap::from([
("AUTH_LDAP_ALLOW_SELF_SIGNED".into(), "false".into()),
Expand Down Expand Up @@ -468,7 +481,13 @@ mod tests {
let authorization_config = AirflowAuthorizationResolved { opa: None };

let mut result = BTreeMap::new();
add_airflow_config(&mut result, &authentication_config, &authorization_config).expect("Ok");
add_airflow_config(
&mut result,
&authentication_config,
&authorization_config,
TEST_AIRFLOW_VERSION,
)
.expect("Ok");

assert_eq!(
BTreeMap::from([
Expand Down Expand Up @@ -532,16 +551,16 @@ mod tests {
};

let mut result = BTreeMap::new();
add_airflow_config(&mut result, &authentication_config, &authorization_config).expect("Ok");
add_airflow_config(
&mut result,
&authentication_config,
&authorization_config,
TEST_AIRFLOW_VERSION,
)
.expect("Ok");

assert_eq!(
BTreeMap::from([
("AUTH_OPA_CACHE_MAXSIZE".into(), "1000".into()),
("AUTH_OPA_CACHE_TTL_IN_SEC".into(), "30".into()),
(
"AUTH_OPA_REQUEST_URL".into(),
"http://opa:8081/v1/data/airflow".into()
),
("AUTH_ROLES_SYNC_AT_LOGIN".into(), "false".into()),
("AUTH_TYPE".into(), "AUTH_DB".into()),
("AUTH_USER_REGISTRATION".into(), "true".into()),
Expand Down
3 changes: 3 additions & 0 deletions rust/operator-binary/src/crd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,9 @@ pub enum AirflowConfigOptions {
AuthLdapTlsKeyfile,
AuthLdapTlsCacertfile,
AuthLdapAllowSelfSigned,
// OPA configs for Airflow 2
// Airflow 3 configs need to be passed via env variables!
// See `env_vars::authorization_env_vars` for details
AuthOpaCacheMaxsize,
AuthOpaCacheTtlInSec,
AuthOpaRequestUrl,
Expand Down
128 changes: 118 additions & 10 deletions rust/operator-binary/src/env_vars.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ use crate::{
};

const AIRFLOW_CORE_AUTH_MANAGER: &str = "AIRFLOW__CORE__AUTH_MANAGER";
// Airflow 3 envs
const AIRFLOW_CORE_AUTH_OPA_REQUEST_URL: &str = "AIRFLOW__CORE__AUTH_OPA_REQUEST_URL";
const AIRFLOW_CORE_AUTH_OPA_CACHE_TTL_IN_SEC: &str = "AIRFLOW__CORE__AUTH_OPA_CACHE_TTL_IN_SEC";
const AIRFLOW_CORE_AUTH_OPA_CACHE_MAXSIZE: &str = "AIRFLOW__CORE__AUTH_OPA_CACHE_MAXSIZE";

const AIRFLOW_LOGGING_LOGGING_CONFIG_CLASS: &str = "AIRFLOW__LOGGING__LOGGING_CONFIG_CLASS";
const AIRFLOW_METRICS_STATSD_ON: &str = "AIRFLOW__METRICS__STATSD_ON";
const AIRFLOW_METRICS_STATSD_HOST: &str = "AIRFLOW__METRICS__STATSD_HOST";
Expand Down Expand Up @@ -231,7 +236,10 @@ pub fn build_airflow_statefulset_envs(
}
AirflowRole::Webserver => {
let mut vars = authentication_env_vars(auth_config);
vars.extend(authorization_env_vars(authorization_config));
vars.extend(authorization_env_vars(
authorization_config,
&resolved_product_image.product_version,
));
env.extend(vars.into_iter().map(|var| (var.name.to_owned(), var)));
}
_ => {}
Expand Down Expand Up @@ -554,15 +562,45 @@ fn authentication_env_vars(
.collect()
}

fn authorization_env_vars(authorization_config: &AirflowAuthorizationResolved) -> Vec<EnvVar> {
let mut env = vec![];

if authorization_config.opa.is_some() {
env.push(EnvVar {
name: AIRFLOW_CORE_AUTH_MANAGER.into(),
value: Some("opa_auth_manager.opa_fab_auth_manager.OpaFabAuthManager".to_string()),
..Default::default()
});
/// Constructs the needed authorization env vars for the specific Airflow version.
///
/// `AIRFLOW__CORE__AUTH_MANAGER` always needs to be set as env var.
///
/// Airflow 2 needs to OPA settings in the `webserver_config.py` such as `AUTH_OPA_REQUEST_URL`.
/// Airflow 3 needs to OPA settings as env variables such as `AIRFLOW__CORE__AUTH_OPA_REQUEST_URL`.
fn authorization_env_vars(
authorization_config: &AirflowAuthorizationResolved,
product_version: &str,
) -> Vec<EnvVar> {
let Some(opa) = &authorization_config.opa else {
return vec![];
};

let mut env = vec![EnvVar {
name: AIRFLOW_CORE_AUTH_MANAGER.into(),
value: Some("opa_auth_manager.opa_fab_auth_manager.OpaFabAuthManager".to_string()),
..Default::default()
}];
if product_version.starts_with("2.") {
// OPA config needs to go into `webserver_config.py`
} else {
env.extend([
EnvVar {
name: AIRFLOW_CORE_AUTH_OPA_REQUEST_URL.into(),
value: Some(opa.connection_string.to_owned()),
..Default::default()
},
EnvVar {
name: AIRFLOW_CORE_AUTH_OPA_CACHE_TTL_IN_SEC.into(),
value: Some(opa.cache_entry_time_to_live.as_secs().to_string()),
..Default::default()
},
EnvVar {
name: AIRFLOW_CORE_AUTH_OPA_CACHE_MAXSIZE.into(),
value: Some(opa.cache_max_entries.to_string()),
..Default::default()
},
]);
}

env
Expand Down Expand Up @@ -600,3 +638,73 @@ fn execution_server_env_vars(airflow: &v1alpha1::AirflowCluster) -> BTreeMap<Str

env
}

#[cfg(test)]
mod tests {

use stackable_operator::time::Duration;

use super::*;
use crate::crd::authorization::OpaConfigResolved;

#[test]
fn test_airflow_2_authorization_env_vars() {
let authorization_config = get_test_authorization_config();
let authorization_env_vars = authorization_env_vars(&authorization_config, "2.10.5");
let authorization_env_vars = authorization_env_vars
.into_iter()
.map(|env| (env.name, env.value.expect("env var value must be present")))
.collect::<Vec<_>>();

assert_eq!(
authorization_env_vars,
[(
"AIRFLOW__CORE__AUTH_MANAGER".into(),
"opa_auth_manager.opa_fab_auth_manager.OpaFabAuthManager".into()
),]
);
}

#[test]
fn test_airflow_3_authorization_env_vars() {
let authorization_config = get_test_authorization_config();
let authorization_env_vars = authorization_env_vars(&authorization_config, "3.0.1");
let authorization_env_vars = authorization_env_vars
.into_iter()
.map(|env| (env.name, env.value.expect("env var value must be present")))
.collect::<Vec<_>>();

assert_eq!(
authorization_env_vars,
[
(
"AIRFLOW__CORE__AUTH_MANAGER".into(),
"opa_auth_manager.opa_fab_auth_manager.OpaFabAuthManager".into()
),
(
"AIRFLOW__CORE__AUTH_OPA_REQUEST_URL".into(),
"http://opa-server.default.svc.cluster.local:8081/v1/data/airflow".into()
),
(
"AIRFLOW__CORE__AUTH_OPA_CACHE_TTL_IN_SEC".into(),
"30".into()
),
(
"AIRFLOW__CORE__AUTH_OPA_CACHE_MAXSIZE".into(),
"1000".into()
),
]
);
}

fn get_test_authorization_config() -> AirflowAuthorizationResolved {
AirflowAuthorizationResolved {
opa: Some(OpaConfigResolved {
connection_string:
"http://opa-server.default.svc.cluster.local:8081/v1/data/airflow".to_string(),
cache_entry_time_to_live: Duration::from_secs(30),
cache_max_entries: 1000,
}),
}
}
}
2 changes: 1 addition & 1 deletion tests/templates/kuttl/opa/20-assert.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@ apiVersion: kuttl.dev/v1beta1
kind: TestAssert
timeout: 300
commands:
- script: kubectl -n $NAMESPACE rollout status daemonset opa-server-default --timeout 300s
- script: kubectl -n $NAMESPACE wait --for=condition=available --timeout=10m opacluster/test-opa
4 changes: 3 additions & 1 deletion tests/templates/kuttl/opa/20-install-opa.yaml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ metadata:
apiVersion: opa.stackable.tech/v1alpha1
kind: OpaCluster
metadata:
name: opa
# The OpaCluster is intentionally not only called "opa" to ensure that our custom OPA URL is
# used and not some default value of "opa".
name: test-opa
spec:
image:
{% if test_scenario['values']['opa-latest'].find(",") > 0 %}
Expand Down
6 changes: 4 additions & 2 deletions tests/templates/kuttl/opa/30-install-airflow.yaml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ spec:
clusterConfig:
authorization:
opa:
configMapName: opa
configMapName: test-opa
package: airflow
cache:
entryTimeToLive: 5s
Expand All @@ -54,7 +54,9 @@ spec:
configOverrides:
webserver_config.py:
WTF_CSRF_ENABLED: "False" # Allow "POST /login/" without CSRF token
AUTH_OPA_CACHE_MAXSIZE_DEFAULT: "0" # disable decision caching for easy debugging
AUTH_OPA_CACHE_MAXSIZE: "0" # Airflow 2: Disable decision caching for easy debugging
envOverrides:
AIRFLOW__CORE__AUTH_OPA_CACHE_MAXSIZE: "0" # Airflow 3: Disable decision caching for easy debugging
roleGroups:
default:
replicas: 1
Expand Down