From 6976021ad198ae1ca79df7ba1c5ddac6da1c43c9 Mon Sep 17 00:00:00 2001 From: Maxi Wittich Date: Thu, 3 Jul 2025 15:22:39 +0200 Subject: [PATCH 1/8] Add headless service, move listener logic to own file --- rust/operator-binary/src/crd/mod.rs | 51 ++++- rust/operator-binary/src/listener.rs | 61 ++++++ rust/operator-binary/src/main.rs | 1 + .../src/superset_controller.rs | 198 +++++++++--------- 4 files changed, 203 insertions(+), 108 deletions(-) create mode 100644 rust/operator-binary/src/listener.rs diff --git a/rust/operator-binary/src/crd/mod.rs b/rust/operator-binary/src/crd/mod.rs index eceb1d7a..65ffd8cc 100644 --- a/rust/operator-binary/src/crd/mod.rs +++ b/rust/operator-binary/src/crd/mod.rs @@ -19,7 +19,7 @@ use stackable_operator::{ fragment::{self, Fragment, ValidationError}, merge::Merge, }, - k8s_openapi::apimachinery::pkg::api::resource::Quantity, + k8s_openapi::{api::core::v1::ServicePort, apimachinery::pkg::api::resource::Quantity}, kube::{CustomResource, ResourceExt, runtime::reflector::ObjectRef}, memory::{BinaryMultiple, MemoryQuantity}, product_config_utils::{self, Configuration}, @@ -32,7 +32,10 @@ use stackable_operator::{ }; use strum::{Display, EnumIter, EnumString, IntoEnumIterator}; -use crate::crd::v1alpha1::{SupersetConfigFragment, SupersetRoleConfig}; +use crate::{ + crd::v1alpha1::{SupersetConfigFragment, SupersetRoleConfig}, + listener::default_listener_class, +}; pub mod affinity; pub mod authentication; @@ -49,9 +52,6 @@ pub const MAX_LOG_FILES_SIZE: MemoryQuantity = MemoryQuantity { unit: BinaryMultiple::Mebi, }; -pub const LISTENER_VOLUME_NAME: &str = "listener"; -pub const LISTENER_VOLUME_DIR: &str = "/stackable/listener"; - pub const APP_PORT_NAME: &str = "http"; pub const APP_PORT: u16 = 8088; pub const METRICS_PORT_NAME: &str = "metrics"; @@ -311,10 +311,6 @@ impl Default for v1alpha1::SupersetRoleConfig { } } -fn default_listener_class() -> String { - "cluster-internal".to_string() -} - #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] #[serde(rename_all = "camelCase")] pub struct SupersetCredentials { @@ -528,6 +524,43 @@ impl v1alpha1::SupersetCluster { } } + /// Set of functions to define service names on rolegroup level. + /// Headless service for cluster internal purposes only. + // TODO: Move to operator-rs + pub fn rolegroup_headless_service_name( + &self, + rolegroup: &RoleGroupRef, + ) -> String { + format!("{name}-headless", name = rolegroup.object_name()) + } + + /// Headless metrics service exposes Prometheus endpoint only + // TODO: Move to operator-rs + pub fn rolegroup_headless_metrics_service_name( + &self, + rolegroup: &RoleGroupRef, + ) -> String { + format!("{name}-metrics", name = rolegroup.object_name()) + } + + pub fn metrics_ports(&self) -> Vec { + vec![ServicePort { + name: Some(METRICS_PORT_NAME.to_string()), + port: METRICS_PORT.into(), + protocol: Some("TCP".to_string()), + ..ServicePort::default() + }] + } + + pub fn service_ports(&self) -> Vec { + vec![ServicePort { + name: Some(APP_PORT_NAME.to_string()), + port: APP_PORT.into(), + protocol: Some("TCP".to_string()), + ..ServicePort::default() + }] + } + pub fn generic_role_config(&self, role: &SupersetRole) -> Option { self.get_role_config(role).map(|r| r.common.to_owned()) } diff --git a/rust/operator-binary/src/listener.rs b/rust/operator-binary/src/listener.rs new file mode 100644 index 00000000..a21218ac --- /dev/null +++ b/rust/operator-binary/src/listener.rs @@ -0,0 +1,61 @@ +use snafu::{ResultExt, Snafu}; +use stackable_operator::{builder::meta::ObjectMetaBuilder, crd::listener, kvp::ObjectLabels}; + +use crate::crd::{APP_PORT, APP_PORT_NAME, v1alpha1}; + +pub const LISTENER_VOLUME_NAME: &str = "listener"; +pub const LISTENER_VOLUME_DIR: &str = "/stackable/listener"; + +#[derive(Snafu, Debug)] +pub enum Error { + #[snafu(display("object is missing metadata to build owner reference"))] + ObjectMissingMetadataForOwnerRef { + source: stackable_operator::builder::meta::Error, + }, + #[snafu(display("failed to build Metadata"))] + MetadataBuild { + source: stackable_operator::builder::meta::Error, + }, +} + +pub fn build_group_listener( + superset: &v1alpha1::SupersetCluster, + object_labels: ObjectLabels, + listener_class: String, + listener_group_name: String, +) -> Result { + let metadata = ObjectMetaBuilder::new() + .name_and_namespace(superset) + .name(listener_group_name) + .ownerreference_from_resource(superset, None, Some(true)) + .context(ObjectMissingMetadataForOwnerRefSnafu)? + .with_recommended_labels(object_labels) + .context(MetadataBuildSnafu)? + .build(); + + let spec = listener::v1alpha1::ListenerSpec { + class_name: Some(listener_class), + ports: Some(listener_ports()), + ..Default::default() + }; + + let listener = listener::v1alpha1::Listener { + metadata, + spec, + status: None, + }; + + Ok(listener) +} + +pub fn listener_ports() -> Vec { + vec![listener::v1alpha1::ListenerPort { + name: APP_PORT_NAME.to_owned(), + port: APP_PORT.into(), + protocol: Some("TCP".to_owned()), + }] +} + +pub fn default_listener_class() -> String { + "cluster-internal".to_string() +} diff --git a/rust/operator-binary/src/main.rs b/rust/operator-binary/src/main.rs index 7c27956b..bd3c4987 100644 --- a/rust/operator-binary/src/main.rs +++ b/rust/operator-binary/src/main.rs @@ -42,6 +42,7 @@ mod config; mod controller_commons; mod crd; mod druid_connection_controller; +mod listener; mod operations; mod product_logging; mod rbac; diff --git a/rust/operator-binary/src/superset_controller.rs b/rust/operator-binary/src/superset_controller.rs index 2b835f5f..476e2fd3 100644 --- a/rust/operator-binary/src/superset_controller.rs +++ b/rust/operator-binary/src/superset_controller.rs @@ -32,14 +32,12 @@ use stackable_operator::{ }, cluster_resources::{ClusterResourceApplyStrategy, ClusterResources}, commons::{product_image_selection::ResolvedProductImage, rbac::build_rbac_resources}, - crd::{authentication::oidc, listener}, + crd::authentication::oidc, k8s_openapi::{ DeepMerge, api::{ apps::v1::{StatefulSet, StatefulSetSpec}, - core::v1::{ - ConfigMap, EnvVar, HTTPGetAction, Probe, Service, ServicePort, ServiceSpec, - }, + core::v1::{ConfigMap, EnvVar, HTTPGetAction, Probe, Service, ServiceSpec}, }, apimachinery::pkg::{apis::meta::v1::LabelSelector, util::intstr::IntOrString}, }, @@ -48,7 +46,7 @@ use stackable_operator::{ core::{DeserializeGuard, error_boundary}, runtime::controller::Action, }, - kvp::{Label, Labels, ObjectLabels}, + kvp::{Label, Labels}, logging::controller::ReconcilerError, product_config_utils::{ CONFIG_OVERRIDE_FILE_FOOTER_KEY, CONFIG_OVERRIDE_FILE_HEADER_KEY, @@ -78,14 +76,15 @@ use crate::{ config::{self, PYTHON_IMPORTS}, controller_commons::{self, CONFIG_VOLUME_NAME, LOG_CONFIG_VOLUME_NAME, LOG_VOLUME_NAME}, crd::{ - APP_NAME, APP_PORT, APP_PORT_NAME, LISTENER_VOLUME_DIR, LISTENER_VOLUME_NAME, METRICS_PORT, - METRICS_PORT_NAME, PYTHONPATH, STACKABLE_CONFIG_DIR, STACKABLE_LOG_CONFIG_DIR, - STACKABLE_LOG_DIR, SUPERSET_CONFIG_FILENAME, SupersetConfigOptions, SupersetRole, + APP_NAME, APP_PORT, METRICS_PORT, METRICS_PORT_NAME, PYTHONPATH, STACKABLE_CONFIG_DIR, + STACKABLE_LOG_CONFIG_DIR, STACKABLE_LOG_DIR, SUPERSET_CONFIG_FILENAME, + SupersetConfigOptions, SupersetRole, authentication::{ SupersetAuthenticationClassResolved, SupersetClientAuthenticationDetailsResolved, }, - v1alpha1::{self, Container, SupersetCluster, SupersetClusterStatus, SupersetConfig}, + v1alpha1::{Container, SupersetCluster, SupersetClusterStatus, SupersetConfig}, }, + listener::{LISTENER_VOLUME_DIR, LISTENER_VOLUME_NAME, build_group_listener}, operations::{graceful_shutdown::add_graceful_shutdown_config, pdb::add_pdbs}, product_logging::{LOG_CONFIG_FILE, extend_config_map_with_log_config}, util::{build_recommended_labels, rolegroup_metrics_service_name}, @@ -298,6 +297,8 @@ pub enum Error { ApplyGroupListener { source: stackable_operator::cluster_resources::Error, }, + #[snafu(display("failed to configure listener"))] + ListenerConfiguration { source: crate::listener::Error }, } type Result = std::result::Result; @@ -410,8 +411,6 @@ pub async fn reconcile_superset( .merged_config(&SupersetRole::Node, &rolegroup) .context(FailedToResolveConfigSnafu)?; - let rg_service = - build_node_rolegroup_service(superset, &resolved_product_image, &rolegroup)?; let rg_configmap = build_rolegroup_config_map( superset, &resolved_product_image, @@ -431,13 +430,16 @@ pub async fn reconcile_superset( &rbac_sa.name_any(), &config, )?; - - cluster_resources - .add(client, rg_service) - .await - .with_context(|_| ApplyRoleGroupServiceSnafu { - rolegroup: rolegroup.clone(), - })?; + for rg_service in + build_node_rolegroup_services(superset, &resolved_product_image, &rolegroup)? + { + cluster_resources + .add(client, rg_service) + .await + .with_context(|_| ApplyRoleGroupServiceSnafu { + rolegroup: rolegroup.clone(), + })?; + } cluster_resources .add(client, rg_configmap) .await @@ -467,7 +469,8 @@ pub async fn reconcile_superset( ), listener_class.to_string(), listener_group_name, - )?; + ) + .context(ListenerConfigurationSnafu)?; cluster_resources .add(client, group_listener) .await @@ -619,93 +622,90 @@ fn build_rolegroup_config_map( /// The rolegroup [`Service`] is a headless service that allows direct access to the instances of a certain rolegroup /// /// This is mostly useful for internal communication between peers, or for clients that perform client-side load balancing. -fn build_node_rolegroup_service( +fn build_node_rolegroup_services( superset: &SupersetCluster, resolved_product_image: &ResolvedProductImage, rolegroup: &RoleGroupRef, -) -> Result { - let metadata = ObjectMetaBuilder::new() - .name_and_namespace(superset) - .name(rolegroup_metrics_service_name(&rolegroup.object_name())) - .ownerreference_from_resource(superset, None, Some(true)) - .context(ObjectMissingMetadataForOwnerRefSnafu)? - .with_recommended_labels(build_recommended_labels( - superset, - SUPERSET_CONTROLLER_NAME, - &resolved_product_image.app_version_label, - &rolegroup.role, - &rolegroup.role_group, - )) - .context(MetadataBuildSnafu)? - .with_label(Label::try_from(("prometheus.io/scrape", "true")).context(LabelBuildSnafu)?) - .build(); - - let spec = Some(ServiceSpec { - // Internal communication does not need to be exposed - type_: Some("ClusterIP".to_owned()), - cluster_ip: Some("None".to_owned()), - ports: Some(vec![ServicePort { - name: Some(METRICS_PORT_NAME.to_owned()), - port: METRICS_PORT.into(), - protocol: Some("TCP".to_owned()), - ..ServicePort::default() - }]), - selector: Some( - Labels::role_group_selector(superset, APP_NAME, &rolegroup.role, &rolegroup.role_group) - .context(LabelBuildSnafu)? - .into(), - ), - publish_not_ready_addresses: Some(true), - ..ServiceSpec::default() - }); - - let service = Service { - metadata, - spec, - status: None, - }; +) -> Result> { + let service = vec![ + Service { + metadata: ObjectMetaBuilder::new() + .name_and_namespace(superset) + .name(superset.rolegroup_headless_metrics_service_name(&rolegroup)) + .ownerreference_from_resource(superset, None, Some(true)) + .context(ObjectMissingMetadataForOwnerRefSnafu)? + .with_recommended_labels(build_recommended_labels( + superset, + SUPERSET_CONTROLLER_NAME, + &resolved_product_image.app_version_label, + &rolegroup.role, + &rolegroup.role_group, + )) + .context(MetadataBuildSnafu)? + .with_label( + Label::try_from(("prometheus.io/scrape", "true")).context(LabelBuildSnafu)?, + ) + .build(), + spec: Some(ServiceSpec { + // Internal communication does not need to be exposed + type_: Some("ClusterIP".to_owned()), + cluster_ip: Some("None".to_owned()), + ports: Some(superset.metrics_ports()), + selector: Some( + Labels::role_group_selector( + superset, + APP_NAME, + &rolegroup.role, + &rolegroup.role_group, + ) + .context(LabelBuildSnafu)? + .into(), + ), + publish_not_ready_addresses: Some(true), + ..ServiceSpec::default() + }), + status: None, + }, + Service { + metadata: ObjectMetaBuilder::new() + .name_and_namespace(superset) + .name(superset.rolegroup_headless_service_name(&rolegroup)) + .ownerreference_from_resource(superset, None, Some(true)) + .context(ObjectMissingMetadataForOwnerRefSnafu)? + .with_recommended_labels(build_recommended_labels( + superset, + SUPERSET_CONTROLLER_NAME, + &resolved_product_image.app_version_label, + &rolegroup.role, + &rolegroup.role_group, + )) + .context(MetadataBuildSnafu)? + .build(), + spec: Some(ServiceSpec { + // Internal communication does not need to be exposed + type_: Some("ClusterIP".to_owned()), + cluster_ip: Some("None".to_owned()), + ports: Some(superset.service_ports()), + selector: Some( + Labels::role_group_selector( + superset, + APP_NAME, + &rolegroup.role, + &rolegroup.role_group, + ) + .context(LabelBuildSnafu)? + .into(), + ), + publish_not_ready_addresses: Some(true), + ..ServiceSpec::default() + }), + status: None, + }, + ]; Ok(service) } -pub fn build_group_listener( - superset: &v1alpha1::SupersetCluster, - object_labels: ObjectLabels, - listener_class: String, - listener_group_name: String, -) -> Result { - let metadata = ObjectMetaBuilder::new() - .name_and_namespace(superset) - .name(listener_group_name) - .ownerreference_from_resource(superset, None, Some(true)) - .context(ObjectMissingMetadataForOwnerRefSnafu)? - .with_recommended_labels(object_labels) - .context(MetadataBuildSnafu)? - .build(); - - let spec = listener::v1alpha1::ListenerSpec { - class_name: Some(listener_class), - ports: Some(listener_ports()), - ..Default::default() - }; - - let listener = listener::v1alpha1::Listener { - metadata, - spec, - status: None, - }; - - Ok(listener) -} - -fn listener_ports() -> Vec { - vec![listener::v1alpha1::ListenerPort { - name: APP_PORT_NAME.to_owned(), - port: APP_PORT.into(), - protocol: Some("TCP".to_owned()), - }] -} - /// The rolegroup [`StatefulSet`] runs the rolegroup, as configured by the administrator. /// /// The [`Pod`](`stackable_operator::k8s_openapi::api::core::v1::Pod`)s are accessible through the corresponding [`Service`] (from [`build_node_rolegroup_service`]). From 85b8d9ec876c6bac5a7f2b236814378e8d46ca56 Mon Sep 17 00:00:00 2001 From: Maxi Wittich Date: Thu, 3 Jul 2025 15:27:27 +0200 Subject: [PATCH 2/8] Clippy lints --- rust/operator-binary/src/superset_controller.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rust/operator-binary/src/superset_controller.rs b/rust/operator-binary/src/superset_controller.rs index 476e2fd3..1c781612 100644 --- a/rust/operator-binary/src/superset_controller.rs +++ b/rust/operator-binary/src/superset_controller.rs @@ -631,7 +631,7 @@ fn build_node_rolegroup_services( Service { metadata: ObjectMetaBuilder::new() .name_and_namespace(superset) - .name(superset.rolegroup_headless_metrics_service_name(&rolegroup)) + .name(superset.rolegroup_headless_metrics_service_name(rolegroup)) .ownerreference_from_resource(superset, None, Some(true)) .context(ObjectMissingMetadataForOwnerRefSnafu)? .with_recommended_labels(build_recommended_labels( @@ -669,7 +669,7 @@ fn build_node_rolegroup_services( Service { metadata: ObjectMetaBuilder::new() .name_and_namespace(superset) - .name(superset.rolegroup_headless_service_name(&rolegroup)) + .name(superset.rolegroup_headless_service_name(rolegroup)) .ownerreference_from_resource(superset, None, Some(true)) .context(ObjectMissingMetadataForOwnerRefSnafu)? .with_recommended_labels(build_recommended_labels( From dd308151506f90ca1237041199cf316fb352f62a Mon Sep 17 00:00:00 2001 From: Maxi Wittich Date: Thu, 3 Jul 2025 15:31:44 +0200 Subject: [PATCH 3/8] fix docs --- rust/operator-binary/src/superset_controller.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/operator-binary/src/superset_controller.rs b/rust/operator-binary/src/superset_controller.rs index 1c781612..1d073d6b 100644 --- a/rust/operator-binary/src/superset_controller.rs +++ b/rust/operator-binary/src/superset_controller.rs @@ -708,7 +708,7 @@ fn build_node_rolegroup_services( /// The rolegroup [`StatefulSet`] runs the rolegroup, as configured by the administrator. /// -/// The [`Pod`](`stackable_operator::k8s_openapi::api::core::v1::Pod`)s are accessible through the corresponding [`Service`] (from [`build_node_rolegroup_service`]). +/// The [`Pod`](`stackable_operator::k8s_openapi::api::core::v1::Pod`)s are accessible through the corresponding [`Service`] (from [`build_node_rolegroup_services`]). #[allow(clippy::too_many_arguments)] fn build_server_rolegroup_statefulset( superset: &SupersetCluster, From 1ebcbc410b9f6fac92b7dff2fcc622d581eda9be Mon Sep 17 00:00:00 2001 From: Maxi Wittich Date: Thu, 3 Jul 2025 15:38:47 +0200 Subject: [PATCH 4/8] Update test to probe for headless service --- tests/templates/kuttl/external-access/30-assert.yaml | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/tests/templates/kuttl/external-access/30-assert.yaml b/tests/templates/kuttl/external-access/30-assert.yaml index ec9af79b..b6d41551 100644 --- a/tests/templates/kuttl/external-access/30-assert.yaml +++ b/tests/templates/kuttl/external-access/30-assert.yaml @@ -37,3 +37,10 @@ metadata: name: superset-node-default-metrics spec: type: ClusterIP +--- +apiVersion: v1 +kind: Service +metadata: + name: superset-node-default-headless +spec: + type: ClusterIP From a668504fd67a5dcb135c1f13084cffa0685cbd26 Mon Sep 17 00:00:00 2001 From: Maxi Wittich Date: Thu, 3 Jul 2025 15:53:45 +0200 Subject: [PATCH 5/8] Update changelog --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 52fcc1e6..53f803f6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ - Use `--file-log-rotation-period` (or `FILE_LOG_ROTATION_PERIOD`) to configure the frequency of rotation. - Use `--console-log-format` (or `CONSOLE_LOG_FORMAT`) to set the format to `plain` (default) or `json`. - BREAKING: Added listener support for Superset ([#625]). +- Add internal headless service in addition to the metrics service and move listener logic to listener.rs ([#644]) ### Changed @@ -45,6 +46,7 @@ [#635]: https://github.com/stackabletech/superset-operator/pull/635 [#637]: https://github.com/stackabletech/superset-operator/pull/637 [#643]: https://github.com/stackabletech/superset-operator/pull/643 +[#644]: https://github.com/stackabletech/superset-operator/pull/644 ## [25.3.0] - 2025-03-21 From a6c579aa40a57333cd978441cce97bc45a27d767 Mon Sep 17 00:00:00 2001 From: Maxi Wittich Date: Thu, 3 Jul 2025 19:33:21 +0200 Subject: [PATCH 6/8] Moving services into own module --- rust/operator-binary/src/crd/mod.rs | 39 +---- rust/operator-binary/src/main.rs | 1 + rust/operator-binary/src/service.rs | 155 ++++++++++++++++++ .../src/superset_controller.rs | 128 ++++----------- 4 files changed, 186 insertions(+), 137 deletions(-) create mode 100644 rust/operator-binary/src/service.rs diff --git a/rust/operator-binary/src/crd/mod.rs b/rust/operator-binary/src/crd/mod.rs index 65ffd8cc..495859bc 100644 --- a/rust/operator-binary/src/crd/mod.rs +++ b/rust/operator-binary/src/crd/mod.rs @@ -19,7 +19,7 @@ use stackable_operator::{ fragment::{self, Fragment, ValidationError}, merge::Merge, }, - k8s_openapi::{api::core::v1::ServicePort, apimachinery::pkg::api::resource::Quantity}, + k8s_openapi::apimachinery::pkg::api::resource::Quantity, kube::{CustomResource, ResourceExt, runtime::reflector::ObjectRef}, memory::{BinaryMultiple, MemoryQuantity}, product_config_utils::{self, Configuration}, @@ -524,43 +524,6 @@ impl v1alpha1::SupersetCluster { } } - /// Set of functions to define service names on rolegroup level. - /// Headless service for cluster internal purposes only. - // TODO: Move to operator-rs - pub fn rolegroup_headless_service_name( - &self, - rolegroup: &RoleGroupRef, - ) -> String { - format!("{name}-headless", name = rolegroup.object_name()) - } - - /// Headless metrics service exposes Prometheus endpoint only - // TODO: Move to operator-rs - pub fn rolegroup_headless_metrics_service_name( - &self, - rolegroup: &RoleGroupRef, - ) -> String { - format!("{name}-metrics", name = rolegroup.object_name()) - } - - pub fn metrics_ports(&self) -> Vec { - vec![ServicePort { - name: Some(METRICS_PORT_NAME.to_string()), - port: METRICS_PORT.into(), - protocol: Some("TCP".to_string()), - ..ServicePort::default() - }] - } - - pub fn service_ports(&self) -> Vec { - vec![ServicePort { - name: Some(APP_PORT_NAME.to_string()), - port: APP_PORT.into(), - protocol: Some("TCP".to_string()), - ..ServicePort::default() - }] - } - pub fn generic_role_config(&self, role: &SupersetRole) -> Option { self.get_role_config(role).map(|r| r.common.to_owned()) } diff --git a/rust/operator-binary/src/main.rs b/rust/operator-binary/src/main.rs index bd3c4987..edf09574 100644 --- a/rust/operator-binary/src/main.rs +++ b/rust/operator-binary/src/main.rs @@ -46,6 +46,7 @@ mod listener; mod operations; mod product_logging; mod rbac; +mod service; mod superset_controller; mod util; diff --git a/rust/operator-binary/src/service.rs b/rust/operator-binary/src/service.rs new file mode 100644 index 00000000..8ffc2d83 --- /dev/null +++ b/rust/operator-binary/src/service.rs @@ -0,0 +1,155 @@ +use snafu::{ResultExt, Snafu}; +use stackable_operator::{ + builder::meta::ObjectMetaBuilder, + commons::product_image_selection::ResolvedProductImage, + k8s_openapi::api::core::v1::{Service, ServicePort, ServiceSpec}, + kvp::{Label, Labels}, + role_utils::RoleGroupRef, +}; + +use crate::{ + crd::{APP_NAME, APP_PORT, APP_PORT_NAME, METRICS_PORT, METRICS_PORT_NAME, v1alpha1}, + superset_controller::SUPERSET_CONTROLLER_NAME, + util::build_recommended_labels, +}; +#[derive(Debug, Snafu)] +pub enum Error { + #[snafu(display("object is missing metadata to build owner reference"))] + ObjectMissingMetadataForOwnerRef { + source: stackable_operator::builder::meta::Error, + }, + #[snafu(display("failed to build Metadata"))] + MetadataBuild { + source: stackable_operator::builder::meta::Error, + }, + #[snafu(display("failed to build Labels"))] + LabelBuild { + source: stackable_operator::kvp::LabelError, + }, +} + +/// The rolegroup [`Service`] is a headless service that allows direct access to the instances of a certain rolegroup +/// +/// This is mostly useful for internal communication between peers, or for clients that perform client-side load balancing. +pub fn build_node_rolegroup_headless_service( + superset: &v1alpha1::SupersetCluster, + resolved_product_image: &ResolvedProductImage, + rolegroup: &RoleGroupRef, +) -> Result { + let headless_service = Service { + metadata: ObjectMetaBuilder::new() + .name_and_namespace(superset) + .name(rolegroup_headless_service_name(rolegroup)) + .ownerreference_from_resource(superset, None, Some(true)) + .context(ObjectMissingMetadataForOwnerRefSnafu)? + .with_recommended_labels(build_recommended_labels( + superset, + SUPERSET_CONTROLLER_NAME, + &resolved_product_image.app_version_label, + &rolegroup.role, + &rolegroup.role_group, + )) + .context(MetadataBuildSnafu)? + .build(), + spec: Some(ServiceSpec { + // Internal communication does not need to be exposed + type_: Some("ClusterIP".to_owned()), + cluster_ip: Some("None".to_owned()), + ports: Some(service_ports()), + selector: Some( + Labels::role_group_selector( + superset, + APP_NAME, + &rolegroup.role, + &rolegroup.role_group, + ) + .context(LabelBuildSnafu)? + .into(), + ), + publish_not_ready_addresses: Some(true), + ..ServiceSpec::default() + }), + status: None, + }; + Ok(headless_service) +} + +/// The rolegroup metrics [`Service`] is a service that exposes metrics and a prometheus scraping label +pub fn build_node_rolegroup_metrics_service( + superset: &v1alpha1::SupersetCluster, + resolved_product_image: &ResolvedProductImage, + rolegroup: &RoleGroupRef, +) -> Result { + let metrics_service = Service { + metadata: ObjectMetaBuilder::new() + .name_and_namespace(superset) + .name(rolegroup_headless_metrics_service_name(rolegroup)) + .ownerreference_from_resource(superset, None, Some(true)) + .context(ObjectMissingMetadataForOwnerRefSnafu)? + .with_recommended_labels(build_recommended_labels( + superset, + SUPERSET_CONTROLLER_NAME, + &resolved_product_image.app_version_label, + &rolegroup.role, + &rolegroup.role_group, + )) + .context(MetadataBuildSnafu)? + .with_label(Label::try_from(("prometheus.io/scrape", "true")).context(LabelBuildSnafu)?) + .build(), + spec: Some(ServiceSpec { + // Internal communication does not need to be exposed + type_: Some("ClusterIP".to_owned()), + cluster_ip: Some("None".to_owned()), + ports: Some(metrics_ports()), + selector: Some( + Labels::role_group_selector( + superset, + APP_NAME, + &rolegroup.role, + &rolegroup.role_group, + ) + .context(LabelBuildSnafu)? + .into(), + ), + publish_not_ready_addresses: Some(true), + ..ServiceSpec::default() + }), + status: None, + }; + + Ok(metrics_service) +} + +/// Headless service for cluster internal purposes only. +// TODO: Move to operator-rs +pub fn rolegroup_headless_service_name( + rolegroup: &RoleGroupRef, +) -> String { + format!("{name}-headless", name = rolegroup.object_name()) +} + +/// Headless metrics service exposes Prometheus endpoint only +// TODO: Move to operator-rs +pub fn rolegroup_headless_metrics_service_name( + rolegroup: &RoleGroupRef, +) -> String { + format!("{name}-metrics", name = rolegroup.object_name()) +} + +fn metrics_ports() -> Vec { + vec![ServicePort { + name: Some(METRICS_PORT_NAME.to_string()), + port: METRICS_PORT.into(), + protocol: Some("TCP".to_string()), + ..ServicePort::default() + }] +} + +fn service_ports() -> Vec { + vec![ServicePort { + name: Some(APP_PORT_NAME.to_string()), + port: APP_PORT.into(), + protocol: Some("TCP".to_string()), + ..ServicePort::default() + }] +} diff --git a/rust/operator-binary/src/superset_controller.rs b/rust/operator-binary/src/superset_controller.rs index 1d073d6b..7e312727 100644 --- a/rust/operator-binary/src/superset_controller.rs +++ b/rust/operator-binary/src/superset_controller.rs @@ -37,7 +37,7 @@ use stackable_operator::{ DeepMerge, api::{ apps::v1::{StatefulSet, StatefulSetSpec}, - core::v1::{ConfigMap, EnvVar, HTTPGetAction, Probe, Service, ServiceSpec}, + core::v1::{ConfigMap, EnvVar, HTTPGetAction, Probe}, }, apimachinery::pkg::{apis::meta::v1::LabelSelector, util::intstr::IntOrString}, }, @@ -87,6 +87,7 @@ use crate::{ listener::{LISTENER_VOLUME_DIR, LISTENER_VOLUME_NAME, build_group_listener}, operations::{graceful_shutdown::add_graceful_shutdown_config, pdb::add_pdbs}, product_logging::{LOG_CONFIG_FILE, extend_config_map_with_log_config}, + service::{build_node_rolegroup_headless_service, build_node_rolegroup_metrics_service}, util::{build_recommended_labels, rolegroup_metrics_service_name}, }; @@ -299,6 +300,8 @@ pub enum Error { }, #[snafu(display("failed to configure listener"))] ListenerConfiguration { source: crate::listener::Error }, + #[snafu(display("faild to configure service"))] + ServiceConfiguration { source: crate::service::Error }, } type Result = std::result::Result; @@ -430,16 +433,29 @@ pub async fn reconcile_superset( &rbac_sa.name_any(), &config, )?; - for rg_service in - build_node_rolegroup_services(superset, &resolved_product_image, &rolegroup)? - { - cluster_resources - .add(client, rg_service) - .await - .with_context(|_| ApplyRoleGroupServiceSnafu { - rolegroup: rolegroup.clone(), - })?; - } + + let rg_metrics_service = + build_node_rolegroup_metrics_service(superset, &resolved_product_image, &rolegroup) + .context(ServiceConfigurationSnafu)?; + + let rg_headless_service = + build_node_rolegroup_headless_service(superset, &resolved_product_image, &rolegroup) + .context(ServiceConfigurationSnafu)?; + + cluster_resources + .add(client, rg_metrics_service) + .await + .with_context(|_| ApplyRoleGroupServiceSnafu { + rolegroup: rolegroup.clone(), + })?; + + cluster_resources + .add(client, rg_headless_service) + .await + .with_context(|_| ApplyRoleGroupServiceSnafu { + rolegroup: rolegroup.clone(), + })?; + cluster_resources .add(client, rg_configmap) .await @@ -619,96 +635,10 @@ fn build_rolegroup_config_map( }) } -/// The rolegroup [`Service`] is a headless service that allows direct access to the instances of a certain rolegroup -/// -/// This is mostly useful for internal communication between peers, or for clients that perform client-side load balancing. -fn build_node_rolegroup_services( - superset: &SupersetCluster, - resolved_product_image: &ResolvedProductImage, - rolegroup: &RoleGroupRef, -) -> Result> { - let service = vec![ - Service { - metadata: ObjectMetaBuilder::new() - .name_and_namespace(superset) - .name(superset.rolegroup_headless_metrics_service_name(rolegroup)) - .ownerreference_from_resource(superset, None, Some(true)) - .context(ObjectMissingMetadataForOwnerRefSnafu)? - .with_recommended_labels(build_recommended_labels( - superset, - SUPERSET_CONTROLLER_NAME, - &resolved_product_image.app_version_label, - &rolegroup.role, - &rolegroup.role_group, - )) - .context(MetadataBuildSnafu)? - .with_label( - Label::try_from(("prometheus.io/scrape", "true")).context(LabelBuildSnafu)?, - ) - .build(), - spec: Some(ServiceSpec { - // Internal communication does not need to be exposed - type_: Some("ClusterIP".to_owned()), - cluster_ip: Some("None".to_owned()), - ports: Some(superset.metrics_ports()), - selector: Some( - Labels::role_group_selector( - superset, - APP_NAME, - &rolegroup.role, - &rolegroup.role_group, - ) - .context(LabelBuildSnafu)? - .into(), - ), - publish_not_ready_addresses: Some(true), - ..ServiceSpec::default() - }), - status: None, - }, - Service { - metadata: ObjectMetaBuilder::new() - .name_and_namespace(superset) - .name(superset.rolegroup_headless_service_name(rolegroup)) - .ownerreference_from_resource(superset, None, Some(true)) - .context(ObjectMissingMetadataForOwnerRefSnafu)? - .with_recommended_labels(build_recommended_labels( - superset, - SUPERSET_CONTROLLER_NAME, - &resolved_product_image.app_version_label, - &rolegroup.role, - &rolegroup.role_group, - )) - .context(MetadataBuildSnafu)? - .build(), - spec: Some(ServiceSpec { - // Internal communication does not need to be exposed - type_: Some("ClusterIP".to_owned()), - cluster_ip: Some("None".to_owned()), - ports: Some(superset.service_ports()), - selector: Some( - Labels::role_group_selector( - superset, - APP_NAME, - &rolegroup.role, - &rolegroup.role_group, - ) - .context(LabelBuildSnafu)? - .into(), - ), - publish_not_ready_addresses: Some(true), - ..ServiceSpec::default() - }), - status: None, - }, - ]; - - Ok(service) -} - /// The rolegroup [`StatefulSet`] runs the rolegroup, as configured by the administrator. /// -/// The [`Pod`](`stackable_operator::k8s_openapi::api::core::v1::Pod`)s are accessible through the corresponding [`Service`] (from [`build_node_rolegroup_services`]). +/// The [`Pod`](`stackable_operator::k8s_openapi::api::core::v1::Pod`)s are accessible through the corresponding +/// [`Service`](`stackable_operator::k8s_openapi::api::core::v1::Service`) (from [`build_node_rolegroup_headless_service`] and [`build_node_rolegroup_metrics_service`]). #[allow(clippy::too_many_arguments)] fn build_server_rolegroup_statefulset( superset: &SupersetCluster, From 0586ddda1ab92bd1ad7b91709464d1c2103f0f71 Mon Sep 17 00:00:00 2001 From: Maxi Wittich Date: Fri, 4 Jul 2025 15:31:44 +0200 Subject: [PATCH 7/8] Updating according to feedback on hive --- rust/operator-binary/src/service.rs | 4 ++-- rust/operator-binary/src/superset_controller.rs | 9 ++++++--- rust/operator-binary/src/util.rs | 4 ---- 3 files changed, 8 insertions(+), 9 deletions(-) diff --git a/rust/operator-binary/src/service.rs b/rust/operator-binary/src/service.rs index 8ffc2d83..6c635bd3 100644 --- a/rust/operator-binary/src/service.rs +++ b/rust/operator-binary/src/service.rs @@ -83,7 +83,7 @@ pub fn build_node_rolegroup_metrics_service( let metrics_service = Service { metadata: ObjectMetaBuilder::new() .name_and_namespace(superset) - .name(rolegroup_headless_metrics_service_name(rolegroup)) + .name(rolegroup_metrics_service_name(rolegroup)) .ownerreference_from_resource(superset, None, Some(true)) .context(ObjectMissingMetadataForOwnerRefSnafu)? .with_recommended_labels(build_recommended_labels( @@ -130,7 +130,7 @@ pub fn rolegroup_headless_service_name( /// Headless metrics service exposes Prometheus endpoint only // TODO: Move to operator-rs -pub fn rolegroup_headless_metrics_service_name( +pub fn rolegroup_metrics_service_name( rolegroup: &RoleGroupRef, ) -> String { format!("{name}-metrics", name = rolegroup.object_name()) diff --git a/rust/operator-binary/src/superset_controller.rs b/rust/operator-binary/src/superset_controller.rs index 7e312727..e9ede441 100644 --- a/rust/operator-binary/src/superset_controller.rs +++ b/rust/operator-binary/src/superset_controller.rs @@ -87,8 +87,11 @@ use crate::{ listener::{LISTENER_VOLUME_DIR, LISTENER_VOLUME_NAME, build_group_listener}, operations::{graceful_shutdown::add_graceful_shutdown_config, pdb::add_pdbs}, product_logging::{LOG_CONFIG_FILE, extend_config_map_with_log_config}, - service::{build_node_rolegroup_headless_service, build_node_rolegroup_metrics_service}, - util::{build_recommended_labels, rolegroup_metrics_service_name}, + service::{ + build_node_rolegroup_headless_service, build_node_rolegroup_metrics_service, + rolegroup_headless_service_name, + }, + util::build_recommended_labels, }; pub const SUPERSET_CONTROLLER_NAME: &str = "supersetcluster"; @@ -921,7 +924,7 @@ fn build_server_rolegroup_statefulset( ), ..LabelSelector::default() }, - service_name: Some(rolegroup_metrics_service_name(&rolegroup_ref.object_name())), + service_name: Some(rolegroup_headless_service_name(rolegroup_ref)), template: pod_template, volume_claim_templates: pvcs, ..StatefulSetSpec::default() diff --git a/rust/operator-binary/src/util.rs b/rust/operator-binary/src/util.rs index b199f21b..0ddf70e7 100644 --- a/rust/operator-binary/src/util.rs +++ b/rust/operator-binary/src/util.rs @@ -48,7 +48,3 @@ pub fn build_recommended_labels<'a, T>( role_group, } } - -pub fn rolegroup_metrics_service_name(role_group_ref_object_name: &str) -> String { - format!("{role_group_ref_object_name}-metrics") -} From aba4f1c00b327179dae9a5b9cb945cefd5973b8f Mon Sep 17 00:00:00 2001 From: Maxi Wittich Date: Fri, 4 Jul 2025 15:33:47 +0200 Subject: [PATCH 8/8] Update rustdocs --- rust/operator-binary/src/superset_controller.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/operator-binary/src/superset_controller.rs b/rust/operator-binary/src/superset_controller.rs index e9ede441..6d2f094c 100644 --- a/rust/operator-binary/src/superset_controller.rs +++ b/rust/operator-binary/src/superset_controller.rs @@ -641,7 +641,7 @@ fn build_rolegroup_config_map( /// The rolegroup [`StatefulSet`] runs the rolegroup, as configured by the administrator. /// /// The [`Pod`](`stackable_operator::k8s_openapi::api::core::v1::Pod`)s are accessible through the corresponding -/// [`Service`](`stackable_operator::k8s_openapi::api::core::v1::Service`) (from [`build_node_rolegroup_headless_service`] and [`build_node_rolegroup_metrics_service`]). +/// [`Service`](`stackable_operator::k8s_openapi::api::core::v1::Service`) (via [`build_node_rolegroup_headless_service`] and metrics from [`build_node_rolegroup_metrics_service`]). #[allow(clippy::too_many_arguments)] fn build_server_rolegroup_statefulset( superset: &SupersetCluster,