diff --git a/CHANGELOG.md b/CHANGELOG.md index 52fcc1e6..53f803f6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ - Use `--file-log-rotation-period` (or `FILE_LOG_ROTATION_PERIOD`) to configure the frequency of rotation. - Use `--console-log-format` (or `CONSOLE_LOG_FORMAT`) to set the format to `plain` (default) or `json`. - BREAKING: Added listener support for Superset ([#625]). +- Add internal headless service in addition to the metrics service and move listener logic to listener.rs ([#644]) ### Changed @@ -45,6 +46,7 @@ [#635]: https://github.com/stackabletech/superset-operator/pull/635 [#637]: https://github.com/stackabletech/superset-operator/pull/637 [#643]: https://github.com/stackabletech/superset-operator/pull/643 +[#644]: https://github.com/stackabletech/superset-operator/pull/644 ## [25.3.0] - 2025-03-21 diff --git a/rust/operator-binary/src/crd/mod.rs b/rust/operator-binary/src/crd/mod.rs index eceb1d7a..495859bc 100644 --- a/rust/operator-binary/src/crd/mod.rs +++ b/rust/operator-binary/src/crd/mod.rs @@ -32,7 +32,10 @@ use stackable_operator::{ }; use strum::{Display, EnumIter, EnumString, IntoEnumIterator}; -use crate::crd::v1alpha1::{SupersetConfigFragment, SupersetRoleConfig}; +use crate::{ + crd::v1alpha1::{SupersetConfigFragment, SupersetRoleConfig}, + listener::default_listener_class, +}; pub mod affinity; pub mod authentication; @@ -49,9 +52,6 @@ pub const MAX_LOG_FILES_SIZE: MemoryQuantity = MemoryQuantity { unit: BinaryMultiple::Mebi, }; -pub const LISTENER_VOLUME_NAME: &str = "listener"; -pub const LISTENER_VOLUME_DIR: &str = "/stackable/listener"; - pub const APP_PORT_NAME: &str = "http"; pub const APP_PORT: u16 = 8088; pub const METRICS_PORT_NAME: &str = "metrics"; @@ -311,10 +311,6 @@ impl Default for v1alpha1::SupersetRoleConfig { } } -fn default_listener_class() -> String { - "cluster-internal".to_string() -} - #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] #[serde(rename_all = "camelCase")] pub struct SupersetCredentials { diff --git a/rust/operator-binary/src/listener.rs b/rust/operator-binary/src/listener.rs new file mode 100644 index 00000000..a21218ac --- /dev/null +++ b/rust/operator-binary/src/listener.rs @@ -0,0 +1,61 @@ +use snafu::{ResultExt, Snafu}; +use stackable_operator::{builder::meta::ObjectMetaBuilder, crd::listener, kvp::ObjectLabels}; + +use crate::crd::{APP_PORT, APP_PORT_NAME, v1alpha1}; + +pub const LISTENER_VOLUME_NAME: &str = "listener"; +pub const LISTENER_VOLUME_DIR: &str = "/stackable/listener"; + +#[derive(Snafu, Debug)] +pub enum Error { + #[snafu(display("object is missing metadata to build owner reference"))] + ObjectMissingMetadataForOwnerRef { + source: stackable_operator::builder::meta::Error, + }, + #[snafu(display("failed to build Metadata"))] + MetadataBuild { + source: stackable_operator::builder::meta::Error, + }, +} + +pub fn build_group_listener( + superset: &v1alpha1::SupersetCluster, + object_labels: ObjectLabels, + listener_class: String, + listener_group_name: String, +) -> Result { + let metadata = ObjectMetaBuilder::new() + .name_and_namespace(superset) + .name(listener_group_name) + .ownerreference_from_resource(superset, None, Some(true)) + .context(ObjectMissingMetadataForOwnerRefSnafu)? + .with_recommended_labels(object_labels) + .context(MetadataBuildSnafu)? + .build(); + + let spec = listener::v1alpha1::ListenerSpec { + class_name: Some(listener_class), + ports: Some(listener_ports()), + ..Default::default() + }; + + let listener = listener::v1alpha1::Listener { + metadata, + spec, + status: None, + }; + + Ok(listener) +} + +pub fn listener_ports() -> Vec { + vec![listener::v1alpha1::ListenerPort { + name: APP_PORT_NAME.to_owned(), + port: APP_PORT.into(), + protocol: Some("TCP".to_owned()), + }] +} + +pub fn default_listener_class() -> String { + "cluster-internal".to_string() +} diff --git a/rust/operator-binary/src/main.rs b/rust/operator-binary/src/main.rs index 7c27956b..edf09574 100644 --- a/rust/operator-binary/src/main.rs +++ b/rust/operator-binary/src/main.rs @@ -42,9 +42,11 @@ mod config; mod controller_commons; mod crd; mod druid_connection_controller; +mod listener; mod operations; mod product_logging; mod rbac; +mod service; mod superset_controller; mod util; diff --git a/rust/operator-binary/src/service.rs b/rust/operator-binary/src/service.rs new file mode 100644 index 00000000..6c635bd3 --- /dev/null +++ b/rust/operator-binary/src/service.rs @@ -0,0 +1,155 @@ +use snafu::{ResultExt, Snafu}; +use stackable_operator::{ + builder::meta::ObjectMetaBuilder, + commons::product_image_selection::ResolvedProductImage, + k8s_openapi::api::core::v1::{Service, ServicePort, ServiceSpec}, + kvp::{Label, Labels}, + role_utils::RoleGroupRef, +}; + +use crate::{ + crd::{APP_NAME, APP_PORT, APP_PORT_NAME, METRICS_PORT, METRICS_PORT_NAME, v1alpha1}, + superset_controller::SUPERSET_CONTROLLER_NAME, + util::build_recommended_labels, +}; +#[derive(Debug, Snafu)] +pub enum Error { + #[snafu(display("object is missing metadata to build owner reference"))] + ObjectMissingMetadataForOwnerRef { + source: stackable_operator::builder::meta::Error, + }, + #[snafu(display("failed to build Metadata"))] + MetadataBuild { + source: stackable_operator::builder::meta::Error, + }, + #[snafu(display("failed to build Labels"))] + LabelBuild { + source: stackable_operator::kvp::LabelError, + }, +} + +/// The rolegroup [`Service`] is a headless service that allows direct access to the instances of a certain rolegroup +/// +/// This is mostly useful for internal communication between peers, or for clients that perform client-side load balancing. +pub fn build_node_rolegroup_headless_service( + superset: &v1alpha1::SupersetCluster, + resolved_product_image: &ResolvedProductImage, + rolegroup: &RoleGroupRef, +) -> Result { + let headless_service = Service { + metadata: ObjectMetaBuilder::new() + .name_and_namespace(superset) + .name(rolegroup_headless_service_name(rolegroup)) + .ownerreference_from_resource(superset, None, Some(true)) + .context(ObjectMissingMetadataForOwnerRefSnafu)? + .with_recommended_labels(build_recommended_labels( + superset, + SUPERSET_CONTROLLER_NAME, + &resolved_product_image.app_version_label, + &rolegroup.role, + &rolegroup.role_group, + )) + .context(MetadataBuildSnafu)? + .build(), + spec: Some(ServiceSpec { + // Internal communication does not need to be exposed + type_: Some("ClusterIP".to_owned()), + cluster_ip: Some("None".to_owned()), + ports: Some(service_ports()), + selector: Some( + Labels::role_group_selector( + superset, + APP_NAME, + &rolegroup.role, + &rolegroup.role_group, + ) + .context(LabelBuildSnafu)? + .into(), + ), + publish_not_ready_addresses: Some(true), + ..ServiceSpec::default() + }), + status: None, + }; + Ok(headless_service) +} + +/// The rolegroup metrics [`Service`] is a service that exposes metrics and a prometheus scraping label +pub fn build_node_rolegroup_metrics_service( + superset: &v1alpha1::SupersetCluster, + resolved_product_image: &ResolvedProductImage, + rolegroup: &RoleGroupRef, +) -> Result { + let metrics_service = Service { + metadata: ObjectMetaBuilder::new() + .name_and_namespace(superset) + .name(rolegroup_metrics_service_name(rolegroup)) + .ownerreference_from_resource(superset, None, Some(true)) + .context(ObjectMissingMetadataForOwnerRefSnafu)? + .with_recommended_labels(build_recommended_labels( + superset, + SUPERSET_CONTROLLER_NAME, + &resolved_product_image.app_version_label, + &rolegroup.role, + &rolegroup.role_group, + )) + .context(MetadataBuildSnafu)? + .with_label(Label::try_from(("prometheus.io/scrape", "true")).context(LabelBuildSnafu)?) + .build(), + spec: Some(ServiceSpec { + // Internal communication does not need to be exposed + type_: Some("ClusterIP".to_owned()), + cluster_ip: Some("None".to_owned()), + ports: Some(metrics_ports()), + selector: Some( + Labels::role_group_selector( + superset, + APP_NAME, + &rolegroup.role, + &rolegroup.role_group, + ) + .context(LabelBuildSnafu)? + .into(), + ), + publish_not_ready_addresses: Some(true), + ..ServiceSpec::default() + }), + status: None, + }; + + Ok(metrics_service) +} + +/// Headless service for cluster internal purposes only. +// TODO: Move to operator-rs +pub fn rolegroup_headless_service_name( + rolegroup: &RoleGroupRef, +) -> String { + format!("{name}-headless", name = rolegroup.object_name()) +} + +/// Headless metrics service exposes Prometheus endpoint only +// TODO: Move to operator-rs +pub fn rolegroup_metrics_service_name( + rolegroup: &RoleGroupRef, +) -> String { + format!("{name}-metrics", name = rolegroup.object_name()) +} + +fn metrics_ports() -> Vec { + vec![ServicePort { + name: Some(METRICS_PORT_NAME.to_string()), + port: METRICS_PORT.into(), + protocol: Some("TCP".to_string()), + ..ServicePort::default() + }] +} + +fn service_ports() -> Vec { + vec![ServicePort { + name: Some(APP_PORT_NAME.to_string()), + port: APP_PORT.into(), + protocol: Some("TCP".to_string()), + ..ServicePort::default() + }] +} diff --git a/rust/operator-binary/src/superset_controller.rs b/rust/operator-binary/src/superset_controller.rs index 2b835f5f..6d2f094c 100644 --- a/rust/operator-binary/src/superset_controller.rs +++ b/rust/operator-binary/src/superset_controller.rs @@ -32,14 +32,12 @@ use stackable_operator::{ }, cluster_resources::{ClusterResourceApplyStrategy, ClusterResources}, commons::{product_image_selection::ResolvedProductImage, rbac::build_rbac_resources}, - crd::{authentication::oidc, listener}, + crd::authentication::oidc, k8s_openapi::{ DeepMerge, api::{ apps::v1::{StatefulSet, StatefulSetSpec}, - core::v1::{ - ConfigMap, EnvVar, HTTPGetAction, Probe, Service, ServicePort, ServiceSpec, - }, + core::v1::{ConfigMap, EnvVar, HTTPGetAction, Probe}, }, apimachinery::pkg::{apis::meta::v1::LabelSelector, util::intstr::IntOrString}, }, @@ -48,7 +46,7 @@ use stackable_operator::{ core::{DeserializeGuard, error_boundary}, runtime::controller::Action, }, - kvp::{Label, Labels, ObjectLabels}, + kvp::{Label, Labels}, logging::controller::ReconcilerError, product_config_utils::{ CONFIG_OVERRIDE_FILE_FOOTER_KEY, CONFIG_OVERRIDE_FILE_HEADER_KEY, @@ -78,17 +76,22 @@ use crate::{ config::{self, PYTHON_IMPORTS}, controller_commons::{self, CONFIG_VOLUME_NAME, LOG_CONFIG_VOLUME_NAME, LOG_VOLUME_NAME}, crd::{ - APP_NAME, APP_PORT, APP_PORT_NAME, LISTENER_VOLUME_DIR, LISTENER_VOLUME_NAME, METRICS_PORT, - METRICS_PORT_NAME, PYTHONPATH, STACKABLE_CONFIG_DIR, STACKABLE_LOG_CONFIG_DIR, - STACKABLE_LOG_DIR, SUPERSET_CONFIG_FILENAME, SupersetConfigOptions, SupersetRole, + APP_NAME, APP_PORT, METRICS_PORT, METRICS_PORT_NAME, PYTHONPATH, STACKABLE_CONFIG_DIR, + STACKABLE_LOG_CONFIG_DIR, STACKABLE_LOG_DIR, SUPERSET_CONFIG_FILENAME, + SupersetConfigOptions, SupersetRole, authentication::{ SupersetAuthenticationClassResolved, SupersetClientAuthenticationDetailsResolved, }, - v1alpha1::{self, Container, SupersetCluster, SupersetClusterStatus, SupersetConfig}, + v1alpha1::{Container, SupersetCluster, SupersetClusterStatus, SupersetConfig}, }, + listener::{LISTENER_VOLUME_DIR, LISTENER_VOLUME_NAME, build_group_listener}, operations::{graceful_shutdown::add_graceful_shutdown_config, pdb::add_pdbs}, product_logging::{LOG_CONFIG_FILE, extend_config_map_with_log_config}, - util::{build_recommended_labels, rolegroup_metrics_service_name}, + service::{ + build_node_rolegroup_headless_service, build_node_rolegroup_metrics_service, + rolegroup_headless_service_name, + }, + util::build_recommended_labels, }; pub const SUPERSET_CONTROLLER_NAME: &str = "supersetcluster"; @@ -298,6 +301,10 @@ pub enum Error { ApplyGroupListener { source: stackable_operator::cluster_resources::Error, }, + #[snafu(display("failed to configure listener"))] + ListenerConfiguration { source: crate::listener::Error }, + #[snafu(display("faild to configure service"))] + ServiceConfiguration { source: crate::service::Error }, } type Result = std::result::Result; @@ -410,8 +417,6 @@ pub async fn reconcile_superset( .merged_config(&SupersetRole::Node, &rolegroup) .context(FailedToResolveConfigSnafu)?; - let rg_service = - build_node_rolegroup_service(superset, &resolved_product_image, &rolegroup)?; let rg_configmap = build_rolegroup_config_map( superset, &resolved_product_image, @@ -432,12 +437,28 @@ pub async fn reconcile_superset( &config, )?; + let rg_metrics_service = + build_node_rolegroup_metrics_service(superset, &resolved_product_image, &rolegroup) + .context(ServiceConfigurationSnafu)?; + + let rg_headless_service = + build_node_rolegroup_headless_service(superset, &resolved_product_image, &rolegroup) + .context(ServiceConfigurationSnafu)?; + cluster_resources - .add(client, rg_service) + .add(client, rg_metrics_service) .await .with_context(|_| ApplyRoleGroupServiceSnafu { rolegroup: rolegroup.clone(), })?; + + cluster_resources + .add(client, rg_headless_service) + .await + .with_context(|_| ApplyRoleGroupServiceSnafu { + rolegroup: rolegroup.clone(), + })?; + cluster_resources .add(client, rg_configmap) .await @@ -467,7 +488,8 @@ pub async fn reconcile_superset( ), listener_class.to_string(), listener_group_name, - )?; + ) + .context(ListenerConfigurationSnafu)?; cluster_resources .add(client, group_listener) .await @@ -616,99 +638,10 @@ fn build_rolegroup_config_map( }) } -/// The rolegroup [`Service`] is a headless service that allows direct access to the instances of a certain rolegroup -/// -/// This is mostly useful for internal communication between peers, or for clients that perform client-side load balancing. -fn build_node_rolegroup_service( - superset: &SupersetCluster, - resolved_product_image: &ResolvedProductImage, - rolegroup: &RoleGroupRef, -) -> Result { - let metadata = ObjectMetaBuilder::new() - .name_and_namespace(superset) - .name(rolegroup_metrics_service_name(&rolegroup.object_name())) - .ownerreference_from_resource(superset, None, Some(true)) - .context(ObjectMissingMetadataForOwnerRefSnafu)? - .with_recommended_labels(build_recommended_labels( - superset, - SUPERSET_CONTROLLER_NAME, - &resolved_product_image.app_version_label, - &rolegroup.role, - &rolegroup.role_group, - )) - .context(MetadataBuildSnafu)? - .with_label(Label::try_from(("prometheus.io/scrape", "true")).context(LabelBuildSnafu)?) - .build(); - - let spec = Some(ServiceSpec { - // Internal communication does not need to be exposed - type_: Some("ClusterIP".to_owned()), - cluster_ip: Some("None".to_owned()), - ports: Some(vec![ServicePort { - name: Some(METRICS_PORT_NAME.to_owned()), - port: METRICS_PORT.into(), - protocol: Some("TCP".to_owned()), - ..ServicePort::default() - }]), - selector: Some( - Labels::role_group_selector(superset, APP_NAME, &rolegroup.role, &rolegroup.role_group) - .context(LabelBuildSnafu)? - .into(), - ), - publish_not_ready_addresses: Some(true), - ..ServiceSpec::default() - }); - - let service = Service { - metadata, - spec, - status: None, - }; - - Ok(service) -} - -pub fn build_group_listener( - superset: &v1alpha1::SupersetCluster, - object_labels: ObjectLabels, - listener_class: String, - listener_group_name: String, -) -> Result { - let metadata = ObjectMetaBuilder::new() - .name_and_namespace(superset) - .name(listener_group_name) - .ownerreference_from_resource(superset, None, Some(true)) - .context(ObjectMissingMetadataForOwnerRefSnafu)? - .with_recommended_labels(object_labels) - .context(MetadataBuildSnafu)? - .build(); - - let spec = listener::v1alpha1::ListenerSpec { - class_name: Some(listener_class), - ports: Some(listener_ports()), - ..Default::default() - }; - - let listener = listener::v1alpha1::Listener { - metadata, - spec, - status: None, - }; - - Ok(listener) -} - -fn listener_ports() -> Vec { - vec![listener::v1alpha1::ListenerPort { - name: APP_PORT_NAME.to_owned(), - port: APP_PORT.into(), - protocol: Some("TCP".to_owned()), - }] -} - /// The rolegroup [`StatefulSet`] runs the rolegroup, as configured by the administrator. /// -/// The [`Pod`](`stackable_operator::k8s_openapi::api::core::v1::Pod`)s are accessible through the corresponding [`Service`] (from [`build_node_rolegroup_service`]). +/// The [`Pod`](`stackable_operator::k8s_openapi::api::core::v1::Pod`)s are accessible through the corresponding +/// [`Service`](`stackable_operator::k8s_openapi::api::core::v1::Service`) (via [`build_node_rolegroup_headless_service`] and metrics from [`build_node_rolegroup_metrics_service`]). #[allow(clippy::too_many_arguments)] fn build_server_rolegroup_statefulset( superset: &SupersetCluster, @@ -991,7 +924,7 @@ fn build_server_rolegroup_statefulset( ), ..LabelSelector::default() }, - service_name: Some(rolegroup_metrics_service_name(&rolegroup_ref.object_name())), + service_name: Some(rolegroup_headless_service_name(rolegroup_ref)), template: pod_template, volume_claim_templates: pvcs, ..StatefulSetSpec::default() diff --git a/rust/operator-binary/src/util.rs b/rust/operator-binary/src/util.rs index b199f21b..0ddf70e7 100644 --- a/rust/operator-binary/src/util.rs +++ b/rust/operator-binary/src/util.rs @@ -48,7 +48,3 @@ pub fn build_recommended_labels<'a, T>( role_group, } } - -pub fn rolegroup_metrics_service_name(role_group_ref_object_name: &str) -> String { - format!("{role_group_ref_object_name}-metrics") -} diff --git a/tests/templates/kuttl/external-access/30-assert.yaml b/tests/templates/kuttl/external-access/30-assert.yaml index ec9af79b..b6d41551 100644 --- a/tests/templates/kuttl/external-access/30-assert.yaml +++ b/tests/templates/kuttl/external-access/30-assert.yaml @@ -37,3 +37,10 @@ metadata: name: superset-node-default-metrics spec: type: ClusterIP +--- +apiVersion: v1 +kind: Service +metadata: + name: superset-node-default-headless +spec: + type: ClusterIP