From b383d0e1c85a1ec7c47553e758839860b62192b5 Mon Sep 17 00:00:00 2001 From: Nick Larsen Date: Fri, 30 May 2025 11:08:36 +0200 Subject: [PATCH 01/18] chore(listener): Update operator role permissions --- deploy/helm/zookeeper-operator/templates/roles.yaml | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/deploy/helm/zookeeper-operator/templates/roles.yaml b/deploy/helm/zookeeper-operator/templates/roles.yaml index 8d3779b1..4c03cf09 100644 --- a/deploy/helm/zookeeper-operator/templates/roles.yaml +++ b/deploy/helm/zookeeper-operator/templates/roles.yaml @@ -107,6 +107,17 @@ rules: verbs: - create - patch + - apiGroups: + - listeners.stackable.tech + resources: + - listeners + verbs: + - get + - list + - watch + - patch + - create + - delete - apiGroups: - {{ include "operator.name" . }}.stackable.tech resources: From bd1e084bfcacf32e5ea49550de91e28214e45fb5 Mon Sep 17 00:00:00 2001 From: Nick Larsen Date: Fri, 30 May 2025 11:16:55 +0200 Subject: [PATCH 02/18] docs(listener): Update service exposition docs --- .../zookeeper/pages/usage_guide/listenerclass.adoc | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/docs/modules/zookeeper/pages/usage_guide/listenerclass.adoc b/docs/modules/zookeeper/pages/usage_guide/listenerclass.adoc index 037f934f..f9813a8b 100644 --- a/docs/modules/zookeeper/pages/usage_guide/listenerclass.adoc +++ b/docs/modules/zookeeper/pages/usage_guide/listenerclass.adoc @@ -2,15 +2,14 @@ Apache ZooKeeper offers an API. The Operator deploys a service called `` (where `` is the name of the ZookeeperCluster) through which ZooKeeper can be reached. -This service can have either the `cluster-internal` or `external-unstable` type. `external-stable` is not supported for ZooKeeper at the moment. -Read more about the types in the xref:concepts:service-exposition.adoc[service exposition] documentation at platform level. 
- -This is how the listener class is configured: +The operator deploys a xref:listener-operator:listener.adoc[Listener] for the Server pod. +The listener defaults to only being accessible from within the Kubernetes cluster, but this can be changed by setting `.spec.nodes.config.listenerClass`: [source,yaml] ---- spec: - clusterConfig: - listenerClass: cluster-internal # <1> + servers: + config: + listenerClass: external-unstable # <1> ---- -<1> The default `cluster-internal` setting. +<1> Specify one of `external-stable`, `external-unstable`, `cluster-internal` (the default setting is `cluster-internal`). From 95105f7abcfe59dfea7fb5287cfa9d878a121420 Mon Sep 17 00:00:00 2001 From: Nick Larsen Date: Tue, 24 Jun 2025 13:56:08 +0200 Subject: [PATCH 03/18] feat!: Implement server role listener BREAKING: CRD changes .spec.clusterConfig.listenerClass to .spec.servers.roleConfig.listenerClass --- deploy/helm/zookeeper-operator/crds/crds.yaml | 20 +- rust/operator-binary/src/config/jvm.rs | 17 +- rust/operator-binary/src/crd/mod.rs | 84 ++++--- rust/operator-binary/src/discovery.rs | 201 +++++++-------- rust/operator-binary/src/listener.rs | 89 +++++++ rust/operator-binary/src/main.rs | 22 +- rust/operator-binary/src/zk_controller.rs | 231 +++++++++--------- rust/operator-binary/src/znode_controller.rs | 42 ++-- 8 files changed, 376 insertions(+), 330 deletions(-) create mode 100644 rust/operator-binary/src/listener.rs diff --git a/deploy/helm/zookeeper-operator/crds/crds.yaml b/deploy/helm/zookeeper-operator/crds/crds.yaml index 1ecc3a6a..18aa6e3e 100644 --- a/deploy/helm/zookeeper-operator/crds/crds.yaml +++ b/deploy/helm/zookeeper-operator/crds/crds.yaml @@ -28,7 +28,6 @@ spec: clusterConfig: default: authentication: [] - listenerClass: cluster-internal tls: quorumSecretClass: tls serverSecretClass: tls @@ -53,20 +52,6 @@ spec: - authenticationClass type: object type: array - listenerClass: - default: cluster-internal - description: |- - This field controls which type 
of Service the Operator creates for this ZookeeperCluster: - - * cluster-internal: Use a ClusterIP service - - * external-unstable: Use a NodePort service - - This is a temporary solution with the goal to keep yaml manifests forward compatible. In the future, this setting will control which [ListenerClass](https://docs.stackable.tech/home/nightly/listener-operator/listenerclass.html) will be used to expose the service, and ListenerClass names will stay the same, allowing for a non-breaking change. - enum: - - cluster-internal - - external-unstable - type: string tls: default: quorumSecretClass: tls @@ -444,11 +429,16 @@ spec: x-kubernetes-preserve-unknown-fields: true roleConfig: default: + listenerClass: cluster-internal podDisruptionBudget: enabled: true maxUnavailable: null description: This is a product-agnostic RoleConfig, which is sufficient for most of the products. properties: + listenerClass: + default: cluster-internal + description: This field controls which [ListenerClass](https://docs.stackable.tech/home/nightly/listener-operator/listenerclass.html) is used to expose the ZooKeeper nodes. 
+ type: string podDisruptionBudget: default: enabled: true diff --git a/rust/operator-binary/src/config/jvm.rs b/rust/operator-binary/src/config/jvm.rs index 2db69e86..12aa3bf5 100644 --- a/rust/operator-binary/src/config/jvm.rs +++ b/rust/operator-binary/src/config/jvm.rs @@ -1,13 +1,15 @@ use snafu::{OptionExt, ResultExt, Snafu}; use stackable_operator::{ memory::{BinaryMultiple, MemoryQuantity}, - role_utils::{self, GenericRoleConfig, JavaCommonConfig, JvmArgumentOverrides, Role}, + role_utils::{self, JavaCommonConfig, JvmArgumentOverrides, Role}, }; use crate::crd::{ JVM_SECURITY_PROPERTIES_FILE, LOG4J_CONFIG_FILE, LOGBACK_CONFIG_FILE, LoggingFramework, METRICS_PORT, STACKABLE_CONFIG_DIR, STACKABLE_LOG_CONFIG_DIR, - v1alpha1::{ZookeeperCluster, ZookeeperConfig, ZookeeperConfigFragment}, + v1alpha1::{ + ZookeeperCluster, ZookeeperConfig, ZookeeperConfigFragment, ZookeeperServerRoleConfig, + }, }; const JAVA_HEAP_FACTOR: f32 = 0.8; @@ -29,7 +31,7 @@ pub enum Error { /// All JVM arguments. fn construct_jvm_args( zk: &ZookeeperCluster, - role: &Role, + role: &Role, role_group: &str, ) -> Result, Error> { let logging_framework = zk.logging_framework(); @@ -63,7 +65,7 @@ fn construct_jvm_args( /// [`construct_zk_server_heap_env`]). 
pub fn construct_non_heap_jvm_args( zk: &ZookeeperCluster, - role: &Role, + role: &Role, role_group: &str, ) -> Result { let mut jvm_args = construct_jvm_args(zk, role, role_group)?; @@ -99,7 +101,10 @@ fn is_heap_jvm_argument(jvm_argument: &str) -> bool { #[cfg(test)] mod tests { use super::*; - use crate::crd::{ZookeeperRole, v1alpha1::ZookeeperConfig}; + use crate::crd::{ + ZookeeperRole, + v1alpha1::{ZookeeperConfig, ZookeeperServerRoleConfig}, + }; #[test] fn test_construct_jvm_arguments_defaults() { @@ -182,7 +187,7 @@ mod tests { ) -> ( ZookeeperCluster, ZookeeperConfig, - Role, + Role, String, ) { let zookeeper: ZookeeperCluster = diff --git a/rust/operator-binary/src/crd/mod.rs b/rust/operator-binary/src/crd/mod.rs index 532a95d2..900f886f 100644 --- a/rust/operator-binary/src/crd/mod.rs +++ b/rust/operator-binary/src/crd/mod.rs @@ -34,7 +34,7 @@ use stackable_operator::{ }; use strum::{Display, EnumIter, EnumString, IntoEnumIterator}; -use crate::crd::affinity::get_affinity; +use crate::crd::{affinity::get_affinity, v1alpha1::ZookeeperServerRoleConfig}; pub mod affinity; pub mod authentication; @@ -47,6 +47,7 @@ pub const OPERATOR_NAME: &str = "zookeeper.stackable.tech"; pub const ZOOKEEPER_PROPERTIES_FILE: &str = "zoo.cfg"; pub const JVM_SECURITY_PROPERTIES_FILE: &str = "security.properties"; +pub const METRICS_PORT_NAME: &str = "metrics"; pub const METRICS_PORT: u16 = 9505; pub const STACKABLE_DATA_DIR: &str = "/stackable/data"; @@ -72,6 +73,7 @@ pub const MAX_PREPARE_LOG_FILE_SIZE: MemoryQuantity = MemoryQuantity { pub const DOCKER_IMAGE_BASE_NAME: &str = "zookeeper"; const DEFAULT_SERVER_GRACEFUL_SHUTDOWN_TIMEOUT: Duration = Duration::from_minutes_unchecked(2); +pub const DEFAULT_LISTENER_CLASS: &str = "cluster-internal"; mod built_info { pub const CARGO_PKG_VERSION: &str = env!("CARGO_PKG_VERSION"); @@ -139,7 +141,19 @@ pub mod versioned { // no doc - it's in the struct. 
#[serde(skip_serializing_if = "Option::is_none")] - pub servers: Option>, + pub servers: + Option>, + } + + #[derive(Clone, Debug, Deserialize, JsonSchema, PartialEq, Serialize)] + #[serde(rename_all = "camelCase")] + pub struct ZookeeperServerRoleConfig { + #[serde(flatten)] + pub common: GenericRoleConfig, + + /// This field controls which [ListenerClass](DOCS_BASE_URL_PLACEHOLDER/listener-operator/listenerclass.html) is used to expose the ZooKeeper nodes. + #[serde(default = "default_listener_class")] + pub listener_class: String, } #[derive(Clone, Deserialize, Debug, Eq, JsonSchema, PartialEq, Serialize)] @@ -164,29 +178,6 @@ pub mod versioned { skip_serializing_if = "Option::is_none" )] pub tls: Option, - - /// This field controls which type of Service the Operator creates for this ZookeeperCluster: - /// - /// * cluster-internal: Use a ClusterIP service - /// - /// * external-unstable: Use a NodePort service - /// - /// This is a temporary solution with the goal to keep yaml manifests forward compatible. - /// In the future, this setting will control which [ListenerClass](DOCS_BASE_URL_PLACEHOLDER/listener-operator/listenerclass.html) - /// will be used to expose the service, and ListenerClass names will stay the same, allowing for a non-breaking change. 
- #[serde(default)] - pub listener_class: CurrentlySupportedListenerClasses, - } - - // TODO: Temporary solution until listener-operator is finished - #[derive(Clone, Debug, Default, Display, Deserialize, Eq, JsonSchema, PartialEq, Serialize)] - #[serde(rename_all = "PascalCase")] - pub enum CurrentlySupportedListenerClasses { - #[default] - #[serde(rename = "cluster-internal")] - ClusterInternal, - #[serde(rename = "external-unstable")] - ExternalUnstable, } #[derive(Clone, Debug, Default, Fragment, JsonSchema, PartialEq)] @@ -354,15 +345,18 @@ fn cluster_config_default() -> v1alpha1::ZookeeperClusterConfig { authentication: vec![], vector_aggregator_config_map_name: None, tls: tls::default_zookeeper_tls(), - listener_class: v1alpha1::CurrentlySupportedListenerClasses::default(), } } -impl v1alpha1::CurrentlySupportedListenerClasses { - pub fn k8s_service_type(&self) -> String { - match self { - v1alpha1::CurrentlySupportedListenerClasses::ClusterInternal => "ClusterIP".to_string(), - v1alpha1::CurrentlySupportedListenerClasses::ExternalUnstable => "NodePort".to_string(), +fn default_listener_class() -> String { + DEFAULT_LISTENER_CLASS.to_owned() +} + +impl Default for ZookeeperServerRoleConfig { + fn default() -> Self { + Self { + listener_class: default_listener_class(), + common: Default::default(), } } } @@ -494,6 +488,7 @@ impl HasStatusCondition for v1alpha1::ZookeeperCluster { } impl ZookeeperPodRef { + // TODO (@NickLarsenNZ): What to do here? 
pub fn fqdn(&self, cluster_info: &KubernetesClusterInfo) -> String { format!( "{pod_name}.{service_name}.{namespace}.svc.{cluster_domain}", @@ -529,16 +524,23 @@ impl v1alpha1::ZookeeperCluster { } } - /// The name of the role-level load-balanced Kubernetes `Service` - pub fn server_role_service_name(&self) -> Option { + /// The name of the role-level [Listener] + /// + /// [Listener]: stackable_operator::crd::listener::v1alpha1::Listener + pub fn server_role_listener_name(&self) -> Option { self.metadata.name.clone() } - /// The fully-qualified domain name of the role-level load-balanced Kubernetes `Service` - pub fn server_role_service_fqdn(&self, cluster_info: &KubernetesClusterInfo) -> Option { + /// The fully-qualified domain name of the role-level [Listener] + /// + /// [Listener]: stackable_operator::crd::listener::v1alpha1::Listener + pub fn server_role_listener_fqdn( + &self, + cluster_info: &KubernetesClusterInfo, + ) -> Option { Some(format!( - "{role_service_name}.{namespace}.svc.{cluster_domain}", - role_service_name = self.server_role_service_name()?, + "{role_listener_name}.{namespace}.svc.{cluster_domain}", + role_listener_name = self.server_role_listener_name()?, namespace = self.metadata.namespace.as_ref()?, cluster_domain = cluster_info.cluster_domain )) @@ -548,8 +550,10 @@ impl v1alpha1::ZookeeperCluster { pub fn role( &self, role_variant: &ZookeeperRole, - ) -> Result<&Role, Error> - { + ) -> Result< + &Role, + Error, + > { match role_variant { ZookeeperRole::Server => self.spec.servers.as_ref(), } @@ -590,7 +594,7 @@ impl v1alpha1::ZookeeperCluster { } } - pub fn role_config(&self, role: &ZookeeperRole) -> Option<&GenericRoleConfig> { + pub fn role_config(&self, role: &ZookeeperRole) -> Option<&ZookeeperServerRoleConfig> { match role { ZookeeperRole::Server => self.spec.servers.as_ref().map(|s| &s.role_config), } diff --git a/rust/operator-binary/src/discovery.rs b/rust/operator-binary/src/discovery.rs index c774d628..796389b2 100644 --- 
a/rust/operator-binary/src/discovery.rs +++ b/rust/operator-binary/src/discovery.rs @@ -4,9 +4,9 @@ use snafu::{OptionExt, ResultExt, Snafu}; use stackable_operator::{ builder::{configmap::ConfigMapBuilder, meta::ObjectMetaBuilder}, commons::product_image_selection::ResolvedProductImage, - k8s_openapi::api::core::v1::{ConfigMap, Endpoints, Service}, + crd::listener, + k8s_openapi::api::core::v1::ConfigMap, kube::{Resource, ResourceExt, runtime::reflector::ObjectRef}, - utils::cluster_info::KubernetesClusterInfo, }; use crate::{ @@ -27,29 +27,28 @@ pub enum Error { #[snafu(display("chroot path {} was relative (must be absolute)", chroot))] RelativeChroot { chroot: String }, - #[snafu(display("object has no name associated"))] - NoName, - #[snafu(display("object has no namespace associated"))] NoNamespace, #[snafu(display("failed to list expected pods"))] ExpectedPods { source: crate::crd::Error }, - #[snafu(display("could not find service port with name {}", port_name))] - NoServicePort { port_name: String }, - - #[snafu(display("service port with name {} does not have a nodePort", port_name))] - NoNodePort { port_name: String }, + #[snafu(display("{listener} does not have a port with the name {port_name:?}"))] + PortNotFound { + port_name: String, + listener: ObjectRef, + }, - #[snafu(display("could not find Endpoints for {}", svc))] - FindEndpoints { - source: stackable_operator::client::Error, - svc: ObjectRef, + #[snafu(display("expected an unsigned 16-bit port, got {port_number}"))] + InvalidPort { + source: TryFromIntError, + port_number: i32, }, - #[snafu(display("nodePort was out of range"))] - InvalidNodePort { source: TryFromIntError }, + #[snafu(display("{listener} has no ingress addresses"))] + NoListnerAddresses { + listener: ObjectRef, + }, #[snafu(display("failed to build ConfigMap"))] BuildConfigMap { @@ -62,75 +61,33 @@ pub enum Error { }, } -/// Builds discovery [`ConfigMap`]s for connecting to a [`v1alpha1::ZookeeperCluster`] for all expected 
scenarios +/// Build a discovery [`ConfigMap`] containing connection details for a [`v1alpha1::ZookeeperCluster`] from a [`listener::v1alpha1::Listener`]. +/// +/// `hosts` will usually come from either [`pod_hosts`] or [`nodeport_hosts`]. #[allow(clippy::too_many_arguments)] -pub async fn build_discovery_configmaps( +pub fn build_discovery_configmap( zk: &v1alpha1::ZookeeperCluster, owner: &impl Resource, - client: &stackable_operator::client::Client, controller_name: &str, - svc: &Service, + listener: listener::v1alpha1::Listener, chroot: Option<&str>, resolved_product_image: &ResolvedProductImage, zookeeper_security: &ZookeeperSecurity, -) -> Result> { +) -> Result { let name = owner.name_unchecked(); let namespace = owner.namespace().context(NoNamespaceSnafu)?; - let mut discovery_configmaps = vec![build_discovery_configmap( - zk, - owner, - zookeeper_security, - name.as_str(), - &namespace, - controller_name, - chroot, - pod_hosts(zk, zookeeper_security, &client.kubernetes_cluster_info)?, - resolved_product_image, - )?]; - if zk.spec.cluster_config.listener_class - == crate::crd::v1alpha1::CurrentlySupportedListenerClasses::ExternalUnstable - { - discovery_configmaps.push(build_discovery_configmap( - zk, - owner, - zookeeper_security, - &format!("{}-nodeport", name), - &namespace, - controller_name, - chroot, - nodeport_hosts(client, svc, "zk").await?, - resolved_product_image, - )?); - } + let listener_addresses = listener_addresses(&listener, "zk")?; - Ok(discovery_configmaps) -} - -/// Build a discovery [`ConfigMap`] containing information about how to connect to a certain [`v1alpha1::ZookeeperCluster`] -/// -/// `hosts` will usually come from either [`pod_hosts`] or [`nodeport_hosts`]. 
-#[allow(clippy::too_many_arguments)] -fn build_discovery_configmap( - zk: &v1alpha1::ZookeeperCluster, - owner: &impl Resource, - zookeeper_security: &ZookeeperSecurity, - name: &str, - namespace: &str, - controller_name: &str, - chroot: Option<&str>, - hosts: impl IntoIterator, u16)>, - resolved_product_image: &ResolvedProductImage, -) -> Result { // Write a connection string of the format that Java ZooKeeper client expects: // "{host1}:{port1},{host2:port2},.../{chroot}" // See https://zookeeper.apache.org/doc/current/apidocs/zookeeper-server/org/apache/zookeeper/ZooKeeper.html#ZooKeeper-java.lang.String-int-org.apache.zookeeper.Watcher- - let hosts = hosts + let listener_addresses = listener_addresses .into_iter() - .map(|(host, port)| format!("{}:{}", host.into(), port)) + .map(|(host, port)| format!("{host}:{port}")) .collect::>() .join(","); - let mut conn_str = hosts.clone(); + let mut conn_str = listener_addresses.clone(); if let Some(chroot) = chroot { if !chroot.starts_with('/') { return RelativeChrootSnafu { chroot }.fail(); @@ -158,7 +115,7 @@ fn build_discovery_configmap( ) .add_data("ZOOKEEPER", conn_str) // Some clients don't support ZooKeeper's merged `hosts/chroot` format, so export them separately for these clients - .add_data("ZOOKEEPER_HOSTS", hosts) + .add_data("ZOOKEEPER_HOSTS", listener_addresses) .add_data( "ZOOKEEPER_CLIENT_PORT", zookeeper_security.client_port().to_string(), @@ -168,57 +125,67 @@ fn build_discovery_configmap( .context(BuildConfigMapSnafu) } -/// Lists all Pods FQDNs expected to host the [`v1alpha1::ZookeeperCluster`] -fn pod_hosts<'a>( - zk: &'a v1alpha1::ZookeeperCluster, - zookeeper_security: &'a ZookeeperSecurity, - cluster_info: &'a KubernetesClusterInfo, -) -> Result + 'a> { - Ok(zk - .pods() - .context(ExpectedPodsSnafu)? 
- .map(|pod_ref| (pod_ref.fqdn(cluster_info), zookeeper_security.client_port()))) -} - -/// Lists all nodes currently hosting Pods participating in the [`Service`] -async fn nodeport_hosts( - client: &stackable_operator::client::Client, - svc: &Service, +// /// Lists all Pods FQDNs expected to host the [`v1alpha1::ZookeeperCluster`] +// fn pod_hosts<'a>( +// zk: &'a v1alpha1::ZookeeperCluster, +// zookeeper_security: &'a ZookeeperSecurity, +// cluster_info: &'a KubernetesClusterInfo, +// ) -> Result + 'a> { +// Ok(zk +// .pods() +// .context(ExpectedPodsSnafu)? +// .map(|pod_ref| (pod_ref.fqdn(cluster_info), zookeeper_security.client_port()))) +// } + +/// Lists all listener address and port number pairs for a given `port_name` for Pods participating in the [`Listener`][1] +/// +/// This returns pairs of `(Address, Port)`, where address could be a hostname or IP address of a node, clusterIP or external +/// load balancer depending on the Service type. +/// +/// ## Errors +/// +/// An error will be returned if there is no address found for the `port_name`. +/// +/// [1]: listener::v1alpha1::Listener +// TODO (@NickLarsenNZ): Move this to stackable-operator, so it can be used as listener.addresses_for_port(port_name) +fn listener_addresses( + listener: &listener::v1alpha1::Listener, port_name: &str, ) -> Result> { - let svc_port = svc - .spec + // Get addresses port pairs for addresses that have a port with the name that matches the one we are interested in + let address_port_pairs = listener + .status .as_ref() - .and_then(|svc_spec| { - svc_spec - .ports - .as_ref()? - .iter() - .find(|port| port.name.as_deref() == Some(port_name)) + .and_then(|listener_status| listener_status.ingress_addresses.as_ref()) + // TODO (@NickLarsenNZ): Rename error variant + .context(NoListnerAddressesSnafu { listener })? 
+ .iter() + // Filter the addresses that have the port we are interested in (they likely all have it though) + .filter_map(|listener_ingress| { + Some(listener_ingress.address.clone()).zip(listener_ingress.ports.get(port_name)) + }) + // Convert the port from i32 to u16 + .map(|(listener_address, &port_number)| { + let port_number: u16 = port_number + .try_into() + .context(InvalidPortSnafu { port_number })?; + Ok((listener_address, port_number)) }) - .context(NoServicePortSnafu { port_name })?; - let node_port = svc_port.node_port.context(NoNodePortSnafu { port_name })?; - let endpoints = client - .get::( - svc.metadata.name.as_deref().context(NoNameSnafu)?, - svc.metadata - .namespace - .as_deref() - .context(NoNamespaceSnafu)?, - ) - .await - .with_context(|_| FindEndpointsSnafu { - svc: ObjectRef::from_obj(svc), - })?; - let nodes = endpoints - .subsets - .into_iter() - .flatten() - .flat_map(|subset| subset.addresses) - .flatten() - .flat_map(|addr| addr.node_name); - let addrs = nodes - .map(|node| Ok((node, node_port.try_into().context(InvalidNodePortSnafu)?))) .collect::, _>>()?; - Ok(addrs) + + // An empty list is considered an error + match address_port_pairs.is_empty() { + true => PortNotFoundSnafu { + port_name, + listener, + } + .fail(), + false => Ok(address_port_pairs), + } +} + +// TODO (@NickLarsenNZ): Implement this directly on RoleGroupRef, ie: +// RoleGroupRef::metrics_service_name(&self) +pub fn build_headless_role_group_metrics_service_name(name: String) -> String { + format!("{name}-metrics") } diff --git a/rust/operator-binary/src/listener.rs b/rust/operator-binary/src/listener.rs new file mode 100644 index 00000000..81b76e9f --- /dev/null +++ b/rust/operator-binary/src/listener.rs @@ -0,0 +1,89 @@ +//! Types and functions for exposing product endpoints via [listener::v1alpha1::Listener]. 
+ +use snafu::{ResultExt as _, Snafu}; +use stackable_operator::{ + builder::meta::ObjectMetaBuilder, commons::product_image_selection::ResolvedProductImage, + crd::listener, kube::ResourceExt as _, +}; + +use crate::{ + crd::{ZookeeperRole, security::ZookeeperSecurity, v1alpha1}, + utils::build_recommended_labels, + zk_controller::ZK_CONTROLLER_NAME, +}; + +type Result = std::result::Result; + +#[derive(Snafu, Debug)] +pub enum Error { + #[snafu(display("object is missing metadata to build owner reference"))] + ObjectMissingMetadataForOwnerRef { + source: stackable_operator::builder::meta::Error, + }, + + #[snafu(display("failed to build recommended labels"))] + RecommendedLabels { + source: stackable_operator::builder::meta::Error, + }, +} + +pub fn build_role_listener( + zk: &v1alpha1::ZookeeperCluster, + zk_role: &ZookeeperRole, + resolved_product_image: &ResolvedProductImage, + zookeeper_security: &ZookeeperSecurity, +) -> Result { + // TODO (@NickLarsenNZ): Move this to a common function that takes a zk and a zk_role so that we can use it in other places like the PVC generation + let listener_name = role_listener_name(zk, zk_role); + let listener_class = &zk + .role(zk_role) + .expect("handle error: valid role") + .role_config + .listener_class; + + let listener = listener::v1alpha1::Listener { + metadata: ObjectMetaBuilder::new() + .name_and_namespace(zk) + .name(listener_name) + .ownerreference_from_resource(zk, None, Some(true)) + .context(ObjectMissingMetadataForOwnerRefSnafu)? + // Since we only make a listener for the role, which labels should we use? 
+ // We can't use with_recommended_labels because it requires an ObjectLabels which + // in turn requires RoleGroup stuff) + // TODO (@NickLarsenNZ): Make separate functions for with_recommended_labels with/without rolegroups + // .with_labels(manual_labels).build() + .with_recommended_labels(build_recommended_labels( + zk, + ZK_CONTROLLER_NAME, + &resolved_product_image.app_version_label, + &zk_role.to_string(), + "global", // TODO (@NickLarsenNZ): update build_recommended_labels to have an optional role_group + )) + .context(RecommendedLabelsSnafu)? + .build(), + spec: listener::v1alpha1::ListenerSpec { + class_name: Some(listener_class.to_owned()), + ports: Some(listener_ports(zookeeper_security)), + ..listener::v1alpha1::ListenerSpec::default() + }, + status: None, + }; + + Ok(listener) +} + +// TODO (@NickLarsenNZ): This could be a method we can put on a Resource that takes a role_name +fn role_listener_name(zk: &v1alpha1::ZookeeperCluster, zk_role: &ZookeeperRole) -> String { + // TODO (@NickLarsenNZ): Make a convention, do we use name_any() and allow empty string? or metadata.name.expect, or handle the error? + format!("{zk}-{zk_role}", zk = zk.name_any()) +} + +// We only use the http port here and intentionally omit +// the metrics one. 
+fn listener_ports(zookeeper_security: &ZookeeperSecurity) -> Vec { + vec![listener::v1alpha1::ListenerPort { + name: "zk".to_string(), + port: zookeeper_security.client_port().into(), + protocol: Some("TCP".to_string()), + }] +} diff --git a/rust/operator-binary/src/main.rs b/rust/operator-binary/src/main.rs index 37229c6d..93b7e657 100644 --- a/rust/operator-binary/src/main.rs +++ b/rust/operator-binary/src/main.rs @@ -8,7 +8,7 @@ use stackable_operator::{ cli::{Command, ProductOperatorRun}, k8s_openapi::api::{ apps::v1::StatefulSet, - core::v1::{ConfigMap, Endpoints, Service}, + core::v1::{ConfigMap, Service}, }, kube::{ Resource, @@ -31,6 +31,7 @@ mod command; mod config; pub mod crd; mod discovery; +mod listener; mod operations; mod product_logging; mod utils; @@ -99,30 +100,11 @@ async fn main() -> anyhow::Result<()> { controller: ZK_FULL_CONTROLLER_NAME.to_string(), instance: None, })); - let zk_store = zk_controller.store(); let zk_controller = zk_controller .owns( watch_namespace.get_api::>(&client), watcher::Config::default(), ) - .watches( - watch_namespace.get_api::>(&client), - watcher::Config::default(), - move |endpoints| { - zk_store - .state() - .into_iter() - .filter(move |zk| { - let Ok(zk) = &zk.0 else { - return false; - }; - let endpoints_meta = endpoints.meta(); - zk.metadata.namespace == endpoints_meta.namespace - && zk.server_role_service_name() == endpoints_meta.name - }) - .map(|zk| ObjectRef::from_obj(&*zk)) - }, - ) .owns( watch_namespace.get_api::>(&client), watcher::Config::default(), diff --git a/rust/operator-binary/src/zk_controller.rs b/rust/operator-binary/src/zk_controller.rs index ddaa9e2b..cfba85e2 100644 --- a/rust/operator-binary/src/zk_controller.rs +++ b/rust/operator-binary/src/zk_controller.rs @@ -21,7 +21,12 @@ use stackable_operator::{ self, configmap::ConfigMapBuilder, meta::ObjectMetaBuilder, - pod::{PodBuilder, container::ContainerBuilder, resources::ResourceRequirementsBuilder}, + pod::{ + PodBuilder, + 
container::ContainerBuilder, + resources::ResourceRequirementsBuilder, + volume::{ListenerOperatorVolumeSourceBuilder, ListenerReference}, + }, }, cluster_resources::{ClusterResourceApplyStrategy, ClusterResources}, commons::{product_image_selection::ResolvedProductImage, rbac::build_rbac_resources}, @@ -31,8 +36,8 @@ use stackable_operator::{ apps::v1::{StatefulSet, StatefulSetSpec}, core::v1::{ ConfigMap, ConfigMapVolumeSource, EmptyDirVolumeSource, EnvVar, EnvVarSource, - ExecAction, ObjectFieldSelector, PodSecurityContext, Probe, Service, - ServiceAccount, ServicePort, ServiceSpec, Volume, + ExecAction, ObjectFieldSelector, PersistentVolumeClaim, PodSecurityContext, Probe, + Service, ServiceAccount, ServicePort, ServiceSpec, Volume, }, }, apimachinery::pkg::apis::meta::v1::LabelSelector, @@ -56,7 +61,7 @@ use stackable_operator::{ CustomContainerLogConfig, }, }, - role_utils::{GenericRoleConfig, RoleGroupRef}, + role_utils::RoleGroupRef, status::condition::{ compute_conditions, operations::ClusterOperationsConditionBuilder, statefulset::StatefulSetConditionBuilder, @@ -72,12 +77,14 @@ use crate::{ config::jvm::{construct_non_heap_jvm_args, construct_zk_server_heap_env}, crd::{ DOCKER_IMAGE_BASE_NAME, JVM_SECURITY_PROPERTIES_FILE, MAX_PREPARE_LOG_FILE_SIZE, - MAX_ZK_LOG_FILES_SIZE, STACKABLE_CONFIG_DIR, STACKABLE_DATA_DIR, STACKABLE_LOG_CONFIG_DIR, - STACKABLE_LOG_DIR, STACKABLE_RW_CONFIG_DIR, ZOOKEEPER_PROPERTIES_FILE, ZookeeperRole, + MAX_ZK_LOG_FILES_SIZE, METRICS_PORT, METRICS_PORT_NAME, STACKABLE_CONFIG_DIR, + STACKABLE_DATA_DIR, STACKABLE_LOG_CONFIG_DIR, STACKABLE_LOG_DIR, STACKABLE_RW_CONFIG_DIR, + ZOOKEEPER_PROPERTIES_FILE, ZookeeperRole, security::{self, ZookeeperSecurity}, - v1alpha1, + v1alpha1::{self, ZookeeperServerRoleConfig}, }, - discovery::{self, build_discovery_configmaps}, + discovery::{self, build_discovery_configmap, build_headless_role_group_metrics_service_name}, + listener::build_role_listener, 
operations::{graceful_shutdown::add_graceful_shutdown_config, pdb::add_pdbs}, product_logging::extend_role_group_config_map, utils::build_recommended_labels, @@ -85,6 +92,8 @@ use crate::{ pub const ZK_CONTROLLER_NAME: &str = "zookeepercluster"; pub const ZK_FULL_CONTROLLER_NAME: &str = concatcp!(ZK_CONTROLLER_NAME, '.', OPERATOR_NAME); +pub const LISTENER_VOLUME_NAME: &str = "listener"; +pub const LISTENER_VOLUME_DIR: &str = "/stackable/listener"; pub const ZK_UID: i64 = 1000; pub struct Ctx { @@ -266,6 +275,21 @@ pub enum Error { #[snafu(display("failed to construct JVM arguments"))] ConstructJvmArguments { source: crate::config::jvm::Error }, + + #[snafu(display("failed to apply group listener"))] + ApplyGroupListener { + source: stackable_operator::cluster_resources::Error, + }, + + #[snafu(display("failed to configure listener"))] + ListenerConfiguration { source: crate::listener::Error }, + + // #[snafu(display("failed to configure listener"))] + // ListenerConfiguration { source: crate::listener::Error }, + #[snafu(display("failed to build listener volume"))] + BuildListenerPersistentVolume { + source: stackable_operator::builder::pod::volume::ListenerOperatorVolumeSourceBuilderError, + }, } impl ReconcilerError for Error { @@ -276,7 +300,7 @@ impl ReconcilerError for Error { fn secondary_object(&self) -> Option> { match self { Error::MissingSecretLifetime => None, - Error::InvalidZookeeperCluster { source: _ } => None, + Error::InvalidZookeeperCluster { .. } => None, Error::CrdValidationFailure { .. } => None, Error::NoServerRole => None, Error::RoleParseFailure { .. } => None, @@ -313,6 +337,9 @@ impl ReconcilerError for Error { Error::AddVolumeMount { .. } => None, Error::CreateClusterResources { .. } => None, Error::ConstructJvmArguments { .. } => None, + Error::ApplyGroupListener { .. } => None, + Error::BuildListenerPersistentVolume { .. } => None, + Error::ListenerConfiguration { .. 
} => None, } } } @@ -394,14 +421,6 @@ pub async fn reconcile_zk( .await .context(ApplyRoleBindingSnafu)?; - let server_role_service = cluster_resources - .add( - client, - build_server_role_service(zk, &resolved_product_image, &zookeeper_security)?, - ) - .await - .context(ApplyRoleServiceSnafu)?; - let mut ss_cond_builder = StatefulSetConditionBuilder::default(); let zk_role = ZookeeperRole::Server; @@ -411,12 +430,7 @@ pub async fn reconcile_zk( .merged_config(&ZookeeperRole::Server, &rolegroup) .context(FailedToResolveConfigSnafu)?; - let rg_service = build_server_rolegroup_service( - zk, - &rolegroup, - &resolved_product_image, - &zookeeper_security, - )?; + let rg_service = build_server_rolegroup_service(zk, &rolegroup, &resolved_product_image)?; let rg_configmap = build_server_rolegroup_config_map( zk, &rolegroup, @@ -435,6 +449,7 @@ pub async fn reconcile_zk( &merged_config, &rbac_sa, )?; + cluster_resources .add(client, rg_service) .await @@ -459,38 +474,49 @@ pub async fn reconcile_zk( } let role_config = zk.role_config(&zk_role); - if let Some(GenericRoleConfig { - pod_disruption_budget: pdb, + if let Some(ZookeeperServerRoleConfig { + common, + listener_class: _, }) = role_config { - add_pdbs(pdb, zk, &zk_role, client, &mut cluster_resources) - .await - .context(FailedToCreatePdbSnafu)?; + add_pdbs( + &common.pod_disruption_budget, + zk, + &zk_role, + client, + &mut cluster_resources, + ) + .await + .context(FailedToCreatePdbSnafu)?; } + let listener = build_role_listener(zk, &zk_role, &resolved_product_image, &zookeeper_security) + .context(ListenerConfigurationSnafu)?; + cluster_resources + .add(client, listener.clone()) + .await + .context(ApplyGroupListenerSnafu)?; + // std's SipHasher is deprecated, and DefaultHasher is unstable across Rust releases. // We don't /need/ stability, but it's still nice to avoid spurious changes where possible. 
let mut discovery_hash = FnvHasher::with_key(0); - for discovery_cm in build_discovery_configmaps( + let discovery_cm = build_discovery_configmap( zk, zk, - client, ZK_CONTROLLER_NAME, - &server_role_service, + listener, None, &resolved_product_image, &zookeeper_security, ) - .await - .context(BuildDiscoveryConfigSnafu)? - { - let discovery_cm = cluster_resources - .add(client, discovery_cm) - .await - .context(ApplyDiscoveryConfigSnafu)?; - if let Some(generation) = discovery_cm.metadata.resource_version { - discovery_hash.write(generation.as_bytes()) - } + .context(BuildDiscoveryConfigSnafu)?; + + let discovery_cm = cluster_resources + .add(client, discovery_cm) + .await + .context(ApplyDiscoveryConfigSnafu)?; + if let Some(generation) = discovery_cm.metadata.resource_version { + discovery_hash.write(generation.as_bytes()) } let cluster_operation_cond_builder = @@ -515,58 +541,6 @@ pub async fn reconcile_zk( Ok(controller::Action::await_change()) } -/// The server-role service is the primary endpoint that should be used by clients that do not perform internal load balancing, -/// including targets outside of the cluster. -/// -/// Note that you should generally *not* hard-code clients to use these services; instead, create a [`v1alpha1::ZookeeperZnode`](`v1alpha1::ZookeeperZnode`) -/// and use the connection string that it gives you. -pub fn build_server_role_service( - zk: &v1alpha1::ZookeeperCluster, - resolved_product_image: &ResolvedProductImage, - zookeeper_security: &ZookeeperSecurity, -) -> Result { - let role_name = ZookeeperRole::Server.to_string(); - let role_svc_name = zk - .server_role_service_name() - .context(GlobalServiceNameNotFoundSnafu)?; - - let metadata = ObjectMetaBuilder::new() - .name_and_namespace(zk) - .name(&role_svc_name) - .ownerreference_from_resource(zk, None, Some(true)) - .context(ObjectMissingMetadataForOwnerRefSnafu)? 
- .with_recommended_labels(build_recommended_labels( - zk, - ZK_CONTROLLER_NAME, - &resolved_product_image.app_version_label, - &role_name, - "global", - )) - .context(ObjectMetaSnafu)? - .build(); - - let service_selector_labels = - Labels::role_selector(zk, APP_NAME, &role_name).context(BuildLabelSnafu)?; - - let service_spec = ServiceSpec { - ports: Some(vec![ServicePort { - name: Some("zk".to_string()), - port: zookeeper_security.client_port().into(), - protocol: Some("TCP".to_string()), - ..ServicePort::default() - }]), - selector: Some(service_selector_labels.into()), - type_: Some(zk.spec.cluster_config.listener_class.k8s_service_type()), - ..ServiceSpec::default() - }; - - Ok(Service { - metadata, - spec: Some(service_spec), - status: None, - }) -} - /// The rolegroup [`ConfigMap`] configures the rolegroup based on the configuration given by the administrator fn build_server_rolegroup_config_map( zk: &v1alpha1::ZookeeperCluster, @@ -675,14 +649,15 @@ fn build_server_rolegroup_service( zk: &v1alpha1::ZookeeperCluster, rolegroup: &RoleGroupRef, resolved_product_image: &ResolvedProductImage, - zookeeper_security: &ZookeeperSecurity, ) -> Result { let prometheus_label = Label::try_from(("prometheus.io/scrape", "true")).context(BuildLabelSnafu)?; let metadata = ObjectMetaBuilder::new() .name_and_namespace(zk) - .name(rolegroup.object_name()) + .name(build_headless_role_group_metrics_service_name( + rolegroup.object_name(), + )) .ownerreference_from_resource(zk, None, Some(true)) .context(ObjectMissingMetadataForOwnerRefSnafu)? 
.with_recommended_labels(build_recommended_labels( @@ -704,20 +679,12 @@ fn build_server_rolegroup_service( // Internal communication does not need to be exposed type_: Some("ClusterIP".to_string()), cluster_ip: Some("None".to_string()), - ports: Some(vec![ - ServicePort { - name: Some("zk".to_string()), - port: zookeeper_security.client_port().into(), - protocol: Some("TCP".to_string()), - ..ServicePort::default() - }, - ServicePort { - name: Some("metrics".to_string()), - port: 9505, - protocol: Some("TCP".to_string()), - ..ServicePort::default() - }, - ]), + ports: Some(vec![ServicePort { + name: Some(METRICS_PORT_NAME.to_string()), + port: METRICS_PORT.into(), + protocol: Some("TCP".to_string()), + ..ServicePort::default() + }]), selector: Some(service_selector_labels.into()), publish_not_ready_addresses: Some(true), ..ServiceSpec::default() @@ -730,6 +697,19 @@ fn build_server_rolegroup_service( }) } +pub fn build_role_listener_pvc( + group_listener_name: &str, + unversioned_recommended_labels: &Labels, +) -> Result { + ListenerOperatorVolumeSourceBuilder::new( + &ListenerReference::ListenerName(group_listener_name.to_string()), + unversioned_recommended_labels, + ) + .expect("ListenerOperatorVolumeSourceBuilder::new always returns Ok()") + .build_pvc(LISTENER_VOLUME_NAME.to_string()) + .context(BuildListenerPersistentVolumeSnafu) +} + /// The rolegroup [`StatefulSet`] runs the rolegroup, as configured by the administrator. /// /// The [`Pod`](`stackable_operator::k8s_openapi::api::core::v1::Pod`)s are accessible through the corresponding [`Service`] (from [`build_server_rolegroup_service`]). 
@@ -764,7 +744,7 @@ fn build_server_rolegroup_statefulset( }) .collect::>(); - let (pvc, resources) = zk + let (original_pvcs, resources) = zk .resources(zk_role, rolegroup_ref) .context(CrdValidationFailureSnafu)?; @@ -774,6 +754,31 @@ fn build_server_rolegroup_statefulset( ContainerBuilder::new(APP_NAME).expect("invalid hard-coded container name"); let mut pod_builder = PodBuilder::new(); + // Used for PVC templates that cannot be modified once they are deployed + let unversioned_recommended_labels = Labels::recommended(build_recommended_labels( + zk, + ZK_CONTROLLER_NAME, + // A version value is required, but we need to use something constant so that we don't run into immutabile field issues. + "none", + &rolegroup_ref.role, + &rolegroup_ref.role_group, + )) + .expect("todo: LabelBuildSnafu"); + // .context(LabelBuildSnafu)?; + + let listener_pvc = build_role_listener_pvc( + &zk.server_role_listener_name() + .expect("todo: get role from zk_role"), + &unversioned_recommended_labels, + )?; + + let mut pvcs = original_pvcs; + pvcs.extend([listener_pvc]); + + cb_zookeeper + .add_volume_mount(LISTENER_VOLUME_NAME, LISTENER_VOLUME_DIR) + .context(AddVolumeMountSnafu)?; + let requested_secret_lifetime = merged_config .requested_secret_lifetime .context(MissingSecretLifetimeSnafu)?; @@ -1053,9 +1058,11 @@ fn build_server_rolegroup_statefulset( match_labels: Some(statefulset_match_labels.into()), ..LabelSelector::default() }, - service_name: Some(rolegroup_ref.object_name()), + service_name: Some(build_headless_role_group_metrics_service_name( + rolegroup_ref.object_name(), + )), template: pod_template, - volume_claim_templates: Some(pvc), + volume_claim_templates: Some(pvcs), ..StatefulSetSpec::default() }; diff --git a/rust/operator-binary/src/znode_controller.rs b/rust/operator-binary/src/znode_controller.rs index 41b9f92c..5f996b0f 100644 --- a/rust/operator-binary/src/znode_controller.rs +++ b/rust/operator-binary/src/znode_controller.rs @@ -8,7 +8,8 @@ use 
snafu::{OptionExt, ResultExt, Snafu}; use stackable_operator::{ cluster_resources::{ClusterResourceApplyStrategy, ClusterResources}, commons::product_image_selection::ResolvedProductImage, - k8s_openapi::api::core::v1::{ConfigMap, Service}, + crd::listener, + k8s_openapi::api::core::v1::ConfigMap, kube::{ self, Resource, api::ObjectMeta, @@ -25,7 +26,7 @@ use tracing::{debug, info}; use crate::{ APP_NAME, OPERATOR_NAME, crd::{DOCKER_IMAGE_BASE_NAME, security::ZookeeperSecurity, v1alpha1}, - discovery::{self, build_discovery_configmaps}, + discovery::{self, build_discovery_configmap}, }; pub const ZNODE_CONTROLLER_NAME: &str = "znode"; @@ -284,9 +285,10 @@ async fn reconcile_apply( znode_path, })?; - let server_role_service = client - .get::( - &zk.server_role_service_name() + let listener = client + .get::( + // TODO (@NickLarsenNZ): Check what this should be. Source it from the right place, or use a const + &zk.server_role_listener_name() .with_context(|| NoZkSvcNameSnafu { zk: ObjectRef::from_obj(&zk), })?, @@ -299,25 +301,23 @@ async fn reconcile_apply( .context(FindZkSvcSnafu { zk: ObjectRef::from_obj(&zk), })?; - for discovery_cm in build_discovery_configmaps( + + let discovery_cm = build_discovery_configmap( &zk, znode, - client, ZNODE_CONTROLLER_NAME, - &server_role_service, + listener, Some(znode_path), resolved_product_image, &zookeeper_security, ) - .await - .context(BuildDiscoveryConfigMapSnafu)? 
- { - let obj_ref = ObjectRef::from_obj(&discovery_cm); - cluster_resources - .add(client, discovery_cm) - .await - .with_context(|_| ApplyDiscoveryConfigMapSnafu { cm: obj_ref })?; - } + .context(BuildDiscoveryConfigMapSnafu)?; + + let obj_ref = ObjectRef::from_obj(&discovery_cm); + cluster_resources + .add(client, discovery_cm) + .await + .with_context(|_| ApplyDiscoveryConfigMapSnafu { cm: obj_ref })?; cluster_resources .delete_orphaned_resources(client) @@ -357,6 +357,7 @@ async fn reconcile_cleanup( Ok(controller::Action::await_change()) } +// TODO (@NickLarsenNZ): What to do here? fn zk_mgmt_addr( zk: &v1alpha1::ZookeeperCluster, zookeeper_security: &ZookeeperSecurity, @@ -365,12 +366,13 @@ fn zk_mgmt_addr( // Rust ZooKeeper client does not support client-side load-balancing, so use // (load-balanced) global service instead. Ok(format!( - "{}:{}", - zk.server_role_service_fqdn(cluster_info) + "{hostname}:{port}", + hostname = zk + .server_role_listener_fqdn(cluster_info) .with_context(|| NoZkFqdnSnafu { zk: ObjectRef::from_obj(zk), })?, - zookeeper_security.client_port(), + port = zookeeper_security.client_port(), )) } From c79ea85b203b7cc037da87c5c54b3ac7c5857142 Mon Sep 17 00:00:00 2001 From: Nick Larsen Date: Tue, 24 Jun 2025 15:29:06 +0200 Subject: [PATCH 04/18] chore: Update changelog --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index d8761ce9..4584fd14 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ All notable changes to this project will be documented in this file. - Use `--file-log-max-files` (or `FILE_LOG_MAX_FILES`) to limit the number of log files kept. - Use `--file-log-rotation-period` (or `FILE_LOG_ROTATION_PERIOD`) to configure the frequency of rotation. - Use `--console-log-format` (or `CONSOLE_LOG_FORMAT`) to set the format to `plain` (default) or `json`. +- BREAKING: Add listener support ([#957]). 
### Changed @@ -40,6 +41,7 @@ All notable changes to this project will be documented in this file. [#940]: https://github.com/stackabletech/zookeeper-operator/pull/940 [#942]: https://github.com/stackabletech/zookeeper-operator/pull/942 [#946]: https://github.com/stackabletech/zookeeper-operator/pull/946 +[#957]: https://github.com/stackabletech/zookeeper-operator/pull/957 ## [25.3.0] - 2025-03-21 From 0d6527a73f2ab173be2a015523b09a9e8ec4e7c3 Mon Sep 17 00:00:00 2001 From: Nick Larsen Date: Tue, 24 Jun 2025 15:34:10 +0200 Subject: [PATCH 05/18] docs: Update listener usage --- docs/modules/zookeeper/pages/usage_guide/listenerclass.adoc | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/docs/modules/zookeeper/pages/usage_guide/listenerclass.adoc b/docs/modules/zookeeper/pages/usage_guide/listenerclass.adoc index f9813a8b..4e9e8a81 100644 --- a/docs/modules/zookeeper/pages/usage_guide/listenerclass.adoc +++ b/docs/modules/zookeeper/pages/usage_guide/listenerclass.adoc @@ -1,15 +1,16 @@ = Service exposition with ListenerClasses +:description: Configure the ZooKeeper service exposure with listener classes: cluster-internal, external-unstable or external-stable Apache ZooKeeper offers an API. The Operator deploys a service called `` (where `` is the name of the ZookeeperCluster) through which ZooKeeper can be reached. -The operator deploys a xref:listener-operator:listener.adoc[Listener] for the Server pod. +The operator deploys a xref:listener-operator:listener.adoc[Listener] for the Server pods. The listener defaults to only being accessible from within the Kubernetes cluster, but this can be changed by setting `.spec.nodes.config.listenerClass`: [source,yaml] ---- spec: servers: - config: + roleConfig: listenerClass: external-unstable # <1> ---- <1> Specify one of `external-stable`, `external-unstable`, `cluster-internal` (the default setting is `cluster-internal`). 
From 69bda897e40434ab9c28154f8780a9ef1bcf97c8 Mon Sep 17 00:00:00 2001 From: Nick Larsen Date: Thu, 26 Jun 2025 13:07:22 +0200 Subject: [PATCH 06/18] fix: Use the applied listener to build the discover config map Note: This is because it relies on information in the status field --- rust/operator-binary/src/zk_controller.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/rust/operator-binary/src/zk_controller.rs b/rust/operator-binary/src/zk_controller.rs index cfba85e2..96cf43d2 100644 --- a/rust/operator-binary/src/zk_controller.rs +++ b/rust/operator-binary/src/zk_controller.rs @@ -492,8 +492,8 @@ pub async fn reconcile_zk( let listener = build_role_listener(zk, &zk_role, &resolved_product_image, &zookeeper_security) .context(ListenerConfigurationSnafu)?; - cluster_resources - .add(client, listener.clone()) + let applied_listener = cluster_resources + .add(client, listener) .await .context(ApplyGroupListenerSnafu)?; @@ -504,7 +504,7 @@ pub async fn reconcile_zk( zk, zk, ZK_CONTROLLER_NAME, - listener, + applied_listener, None, &resolved_product_image, &zookeeper_security, From 238dbd853a6ab9753c88729a0e25237aa51bdb22 Mon Sep 17 00:00:00 2001 From: Nick Larsen Date: Wed, 2 Jul 2025 10:12:49 +0200 Subject: [PATCH 07/18] feat: Move ZK ports to headless service --- rust/operator-binary/src/crd/mod.rs | 31 +++--- rust/operator-binary/src/crd/security.rs | 43 +++++++- rust/operator-binary/src/discovery.rs | 15 ++- rust/operator-binary/src/listener.rs | 2 +- rust/operator-binary/src/zk_controller.rs | 104 +++++++++++++++--- rust/operator-binary/src/znode_controller.rs | 20 ++-- tests/templates/kuttl/smoke/10-assert.yaml.j2 | 35 ++++++ tests/templates/kuttl/smoke/test_tls.sh.j2 | 4 +- tests/templates/kuttl/smoke/test_zookeeper.py | 44 ++++++-- 9 files changed, 236 insertions(+), 62 deletions(-) diff --git a/rust/operator-binary/src/crd/mod.rs b/rust/operator-binary/src/crd/mod.rs index 900f886f..5aacef1b 100644 --- 
a/rust/operator-binary/src/crd/mod.rs +++ b/rust/operator-binary/src/crd/mod.rs @@ -34,7 +34,11 @@ use stackable_operator::{ }; use strum::{Display, EnumIter, EnumString, IntoEnumIterator}; -use crate::crd::{affinity::get_affinity, v1alpha1::ZookeeperServerRoleConfig}; +use crate::{ + crd::{affinity::get_affinity, v1alpha1::ZookeeperServerRoleConfig}, + discovery::build_role_group_headless_service_name, + listener::role_listener_name, +}; pub mod affinity; pub mod authentication; @@ -47,6 +51,9 @@ pub const OPERATOR_NAME: &str = "zookeeper.stackable.tech"; pub const ZOOKEEPER_PROPERTIES_FILE: &str = "zoo.cfg"; pub const JVM_SECURITY_PROPERTIES_FILE: &str = "security.properties"; +pub const ZOOKEEPER_LEADER_PORT: u16 = 2888; +pub const ZOOKEEPER_ELECTION_PORT: u16 = 3888; + pub const METRICS_PORT_NAME: &str = "metrics"; pub const METRICS_PORT: u16 = 9505; @@ -335,7 +342,7 @@ pub enum ZookeeperRole { /// Used for service discovery. pub struct ZookeeperPodRef { pub namespace: String, - pub role_group_service_name: String, + pub role_group_headless_service_name: String, pub pod_name: String, pub zookeeper_myid: u16, } @@ -488,12 +495,11 @@ impl HasStatusCondition for v1alpha1::ZookeeperCluster { } impl ZookeeperPodRef { - // TODO (@NickLarsenNZ): What to do here? 
- pub fn fqdn(&self, cluster_info: &KubernetesClusterInfo) -> String { + pub fn internal_fqdn(&self, cluster_info: &KubernetesClusterInfo) -> String { format!( "{pod_name}.{service_name}.{namespace}.svc.{cluster_domain}", pod_name = self.pod_name, - service_name = self.role_group_service_name, + service_name = self.role_group_headless_service_name, namespace = self.namespace, cluster_domain = cluster_info.cluster_domain ) @@ -524,13 +530,6 @@ impl v1alpha1::ZookeeperCluster { } } - /// The name of the role-level [Listener] - /// - /// [Listener]: stackable_operator::crd::listener::v1alpha1::Listener - pub fn server_role_listener_name(&self) -> Option { - self.metadata.name.clone() - } - /// The fully-qualified domain name of the role-level [Listener] /// /// [Listener]: stackable_operator::crd::listener::v1alpha1::Listener @@ -540,7 +539,7 @@ impl v1alpha1::ZookeeperCluster { ) -> Option { Some(format!( "{role_listener_name}.{namespace}.svc.{cluster_domain}", - role_listener_name = self.server_role_listener_name()?, + role_listener_name = role_listener_name(self, &ZookeeperRole::Server), namespace = self.metadata.namespace.as_ref()?, cluster_domain = cluster_info.cluster_domain )) @@ -626,8 +625,10 @@ impl v1alpha1::ZookeeperCluster { for i in 0..rolegroup.replicas.unwrap_or(1) { pod_refs.push(ZookeeperPodRef { namespace: ns.clone(), - role_group_service_name: rolegroup_ref.object_name(), - pod_name: format!("{}-{}", rolegroup_ref.object_name(), i), + role_group_headless_service_name: build_role_group_headless_service_name( + rolegroup_ref.object_name(), + ), + pod_name: format!("{role_group}-{i}", role_group = rolegroup_ref.object_name()), zookeeper_myid: i + myid_offset, }); } diff --git a/rust/operator-binary/src/crd/security.rs b/rust/operator-binary/src/crd/security.rs index a667e7da..23daad4b 100644 --- a/rust/operator-binary/src/crd/security.rs +++ b/rust/operator-binary/src/crd/security.rs @@ -25,7 +25,13 @@ use stackable_operator::{ time::Duration, }; -use 
crate::crd::{authentication, authentication::ResolvedAuthenticationClasses, tls, v1alpha1}; +use crate::{ + crd::{ + authentication::{self, ResolvedAuthenticationClasses}, + tls, v1alpha1, + }, + zk_controller::LISTENER_VOLUME_NAME, +}; type Result = std::result::Result; @@ -158,7 +164,7 @@ impl ZookeeperSecurity { .add_volume_mount(tls_volume_name, Self::SERVER_TLS_DIR) .context(AddVolumeMountSnafu)?; pod_builder - .add_volume(Self::create_tls_volume( + .add_volume(Self::create_server_tls_volume( tls_volume_name, secret_class, requested_secret_lifetime, @@ -172,7 +178,7 @@ impl ZookeeperSecurity { .add_volume_mount(tls_volume_name, Self::QUORUM_TLS_DIR) .context(AddVolumeMountSnafu)?; pod_builder - .add_volume(Self::create_tls_volume( + .add_volume(Self::create_quorum_tls_volume( tls_volume_name, &self.quorum_secret_class, requested_secret_lifetime, @@ -298,8 +304,34 @@ impl ZookeeperSecurity { .or(self.server_secret_class.as_ref()) } - /// Creates ephemeral volumes to mount the `SecretClass` into the Pods - fn create_tls_volume( + /// Creates ephemeral volumes to mount the `SecretClass` with the listener-volume scope into the Pods. + /// + /// The resulting volume will contain TLS certificates with the FQDN reported in the applicable [ListenerStatus]. + /// + /// [ListenerStatus]: ::stackable_operator::crd::listener::v1alpha1::ListenerStatus + fn create_server_tls_volume( + volume_name: &str, + secret_class_name: &str, + requested_secret_lifetime: &Duration, + ) -> Result { + let volume = VolumeBuilder::new(volume_name) + .ephemeral( + SecretOperatorVolumeSourceBuilder::new(secret_class_name) + .with_listener_volume_scope(LISTENER_VOLUME_NAME) + .with_format(SecretFormat::TlsPkcs12) + .with_auto_tls_cert_lifetime(*requested_secret_lifetime) + .build() + .context(BuildTlsVolumeSnafu { volume_name })?, + ) + .build(); + + Ok(volume) + } + + /// Creates ephemeral volumes to mount the `SecretClass` with the pod scope into the Pods. 
+ /// + /// The resulting volume will contain TLS certificates with the FQDN of the Pod in relation to the StatefulSet's headless service. + fn create_quorum_tls_volume( volume_name: &str, secret_class_name: &str, requested_secret_lifetime: &Duration, @@ -308,7 +340,6 @@ impl ZookeeperSecurity { .ephemeral( SecretOperatorVolumeSourceBuilder::new(secret_class_name) .with_pod_scope() - .with_node_scope() .with_format(SecretFormat::TlsPkcs12) .with_auto_tls_cert_lifetime(*requested_secret_lifetime) .build() diff --git a/rust/operator-binary/src/discovery.rs b/rust/operator-binary/src/discovery.rs index 796389b2..574828b0 100644 --- a/rust/operator-binary/src/discovery.rs +++ b/rust/operator-binary/src/discovery.rs @@ -46,7 +46,7 @@ pub enum Error { }, #[snafu(display("{listener} has no ingress addresses"))] - NoListnerAddresses { + NoListenerIngressAddresses { listener: ObjectRef, }, @@ -157,8 +157,7 @@ fn listener_addresses( .status .as_ref() .and_then(|listener_status| listener_status.ingress_addresses.as_ref()) - // TODO (@NickLarsenNZ): Rename error variant - .context(NoListnerAddressesSnafu { listener })? + .context(NoListenerIngressAddressesSnafu { listener })? .iter() // Filter the addresses that have the port we are interested in (they likely all have it though) .filter_map(|listener_ingress| { @@ -185,7 +184,13 @@ fn listener_addresses( } // TODO (@NickLarsenNZ): Implement this directly on RoleGroupRef, ie: -// RoleGroupRef::metrics_service_name(&self) -pub fn build_headless_role_group_metrics_service_name(name: String) -> String { +// RoleGroupRef::metrics_service_name(&self) to restrict what _name_ can be. +pub fn build_role_group_headless_service_name(name: String) -> String { + format!("{name}-headless") +} + +// TODO (@NickLarsenNZ): Implement this directly on RoleGroupRef, ie: +// RoleGroupRef::metrics_service_name(&self) to restrict what _name_ can be. 
+pub fn build_role_group_metrics_service_name(name: String) -> String { format!("{name}-metrics") } diff --git a/rust/operator-binary/src/listener.rs b/rust/operator-binary/src/listener.rs index 81b76e9f..b47a201c 100644 --- a/rust/operator-binary/src/listener.rs +++ b/rust/operator-binary/src/listener.rs @@ -73,7 +73,7 @@ pub fn build_role_listener( } // TODO (@NickLarsenNZ): This could be a method we can put on a Resource that takes a role_name -fn role_listener_name(zk: &v1alpha1::ZookeeperCluster, zk_role: &ZookeeperRole) -> String { +pub fn role_listener_name(zk: &v1alpha1::ZookeeperCluster, zk_role: &ZookeeperRole) -> String { // TODO (@NickLarsenNZ): Make a convention, do we use name_any() and allow empty string? or metadata.name.expect, or handle the error? format!("{zk}-{zk_role}", zk = zk.name_any()) } diff --git a/rust/operator-binary/src/zk_controller.rs b/rust/operator-binary/src/zk_controller.rs index 96cf43d2..32023998 100644 --- a/rust/operator-binary/src/zk_controller.rs +++ b/rust/operator-binary/src/zk_controller.rs @@ -79,12 +79,15 @@ use crate::{ DOCKER_IMAGE_BASE_NAME, JVM_SECURITY_PROPERTIES_FILE, MAX_PREPARE_LOG_FILE_SIZE, MAX_ZK_LOG_FILES_SIZE, METRICS_PORT, METRICS_PORT_NAME, STACKABLE_CONFIG_DIR, STACKABLE_DATA_DIR, STACKABLE_LOG_CONFIG_DIR, STACKABLE_LOG_DIR, STACKABLE_RW_CONFIG_DIR, - ZOOKEEPER_PROPERTIES_FILE, ZookeeperRole, + ZOOKEEPER_ELECTION_PORT, ZOOKEEPER_LEADER_PORT, ZOOKEEPER_PROPERTIES_FILE, ZookeeperRole, security::{self, ZookeeperSecurity}, v1alpha1::{self, ZookeeperServerRoleConfig}, }, - discovery::{self, build_discovery_configmap, build_headless_role_group_metrics_service_name}, - listener::build_role_listener, + discovery::{ + self, build_discovery_configmap, build_role_group_headless_service_name, + build_role_group_metrics_service_name, + }, + listener::{build_role_listener, role_listener_name}, operations::{graceful_shutdown::add_graceful_shutdown_config, pdb::add_pdbs}, product_logging::extend_role_group_config_map, 
utils::build_recommended_labels, @@ -430,7 +433,10 @@ pub async fn reconcile_zk( .merged_config(&ZookeeperRole::Server, &rolegroup) .context(FailedToResolveConfigSnafu)?; - let rg_service = build_server_rolegroup_service(zk, &rolegroup, &resolved_product_image)?; + let rg_headless_service = + build_server_rolegroup_headless_service(zk, &rolegroup, &resolved_product_image)?; + let rg_metrics_service = + build_server_rolegroup_metrics_service(zk, &rolegroup, &resolved_product_image)?; let rg_configmap = build_server_rolegroup_config_map( zk, &rolegroup, @@ -451,7 +457,13 @@ pub async fn reconcile_zk( )?; cluster_resources - .add(client, rg_service) + .add(client, rg_headless_service) + .await + .with_context(|_| ApplyRoleGroupServiceSnafu { + rolegroup: rolegroup.clone(), + })?; + cluster_resources + .add(client, rg_metrics_service) .await .with_context(|_| ApplyRoleGroupServiceSnafu { rolegroup: rolegroup.clone(), @@ -556,11 +568,11 @@ fn build_server_rolegroup_config_map( .flatten() .map(|pod| { ( - format!("server.{}", pod.zookeeper_myid), + format!("server.{id}", id = pod.zookeeper_myid), format!( - "{}:2888:3888;{}", - pod.fqdn(cluster_info), - zookeeper_security.client_port() + "{internal_fqdn}:{ZOOKEEPER_LEADER_PORT}:{ZOOKEEPER_ELECTION_PORT};{client_port}", + internal_fqdn = pod.internal_fqdn(cluster_info), + client_port = zookeeper_security.client_port() ), ) }) @@ -642,10 +654,69 @@ fn build_server_rolegroup_config_map( }) } -/// The rolegroup [`Service`] is a headless service that allows direct access to the instances of a certain rolegroup +/// The rolegroup [`Service`] is a headless service that allows internal access to the instances of a certain rolegroup /// /// This is mostly useful for internal communication between peers, or for clients that perform client-side load balancing. 
-fn build_server_rolegroup_service( +fn build_server_rolegroup_headless_service( + zk: &v1alpha1::ZookeeperCluster, + rolegroup: &RoleGroupRef, + resolved_product_image: &ResolvedProductImage, +) -> Result { + let metadata = ObjectMetaBuilder::new() + .name_and_namespace(zk) + .name(build_role_group_headless_service_name( + rolegroup.object_name(), + )) + .ownerreference_from_resource(zk, None, Some(true)) + .context(ObjectMissingMetadataForOwnerRefSnafu)? + .with_recommended_labels(build_recommended_labels( + zk, + ZK_CONTROLLER_NAME, + &resolved_product_image.app_version_label, + &rolegroup.role, + &rolegroup.role_group, + )) + .context(ObjectMetaSnafu)? + .build(); + + let service_selector_labels = + Labels::role_group_selector(zk, APP_NAME, &rolegroup.role, &rolegroup.role_group) + .context(BuildLabelSnafu)?; + + let service_spec = ServiceSpec { + // Internal communication does not need to be exposed + type_: Some("ClusterIP".to_string()), + cluster_ip: Some("None".to_string()), + ports: Some(vec![ + ServicePort { + // TODO (@NickLarsenNZ): Use a const + name: Some("zk-leader".to_string()), + port: ZOOKEEPER_LEADER_PORT as i32, + protocol: Some("TCP".to_string()), + ..ServicePort::default() + }, + ServicePort { + // TODO (@NickLarsenNZ): Use a const + name: Some("zk-election".to_string()), + port: ZOOKEEPER_ELECTION_PORT as i32, + protocol: Some("TCP".to_string()), + ..ServicePort::default() + }, + ]), + selector: Some(service_selector_labels.into()), + publish_not_ready_addresses: Some(true), + ..ServiceSpec::default() + }; + + Ok(Service { + metadata, + spec: Some(service_spec), + status: None, + }) +} + +/// The rolegroup [`Service`] for exposing metrics +fn build_server_rolegroup_metrics_service( zk: &v1alpha1::ZookeeperCluster, rolegroup: &RoleGroupRef, resolved_product_image: &ResolvedProductImage, @@ -655,7 +726,7 @@ fn build_server_rolegroup_service( let metadata = ObjectMetaBuilder::new() .name_and_namespace(zk) - 
.name(build_headless_role_group_metrics_service_name( + .name(build_role_group_metrics_service_name( rolegroup.object_name(), )) .ownerreference_from_resource(zk, None, Some(true)) @@ -767,8 +838,7 @@ fn build_server_rolegroup_statefulset( // .context(LabelBuildSnafu)?; let listener_pvc = build_role_listener_pvc( - &zk.server_role_listener_name() - .expect("todo: get role from zk_role"), + &role_listener_name(zk, &ZookeeperRole::Server), &unversioned_recommended_labels, )?; @@ -901,8 +971,8 @@ fn build_server_rolegroup_statefulset( ..Probe::default() }) .add_container_port("zk", zookeeper_security.client_port().into()) - .add_container_port("zk-leader", 2888) - .add_container_port("zk-election", 3888) + .add_container_port("zk-leader", ZOOKEEPER_LEADER_PORT as i32) + .add_container_port("zk-election", ZOOKEEPER_ELECTION_PORT as i32) .add_container_port("metrics", 9505) .add_volume_mount("data", STACKABLE_DATA_DIR) .context(AddVolumeMountSnafu)? @@ -1058,7 +1128,7 @@ fn build_server_rolegroup_statefulset( match_labels: Some(statefulset_match_labels.into()), ..LabelSelector::default() }, - service_name: Some(build_headless_role_group_metrics_service_name( + service_name: Some(build_role_group_headless_service_name( rolegroup_ref.object_name(), )), template: pod_template, diff --git a/rust/operator-binary/src/znode_controller.rs b/rust/operator-binary/src/znode_controller.rs index 5f996b0f..7e797853 100644 --- a/rust/operator-binary/src/znode_controller.rs +++ b/rust/operator-binary/src/znode_controller.rs @@ -25,8 +25,9 @@ use tracing::{debug, info}; use crate::{ APP_NAME, OPERATOR_NAME, - crd::{DOCKER_IMAGE_BASE_NAME, security::ZookeeperSecurity, v1alpha1}, + crd::{DOCKER_IMAGE_BASE_NAME, ZookeeperRole, security::ZookeeperSecurity, v1alpha1}, discovery::{self, build_discovery_configmap}, + listener::role_listener_name, }; pub const ZNODE_CONTROLLER_NAME: &str = "znode"; @@ -287,11 +288,7 @@ async fn reconcile_apply( let listener = client .get::( - // TODO 
(@NickLarsenNZ): Check what this should be. Source it from the right place, or use a const
-            &zk.server_role_listener_name()
-                .with_context(|| NoZkSvcNameSnafu {
-                    zk: ObjectRef::from_obj(&zk),
-                })?,
+            &role_listener_name(&zk, &ZookeeperRole::Server),
             zk.metadata
                 .namespace
                 .as_deref()
@@ -357,7 +354,16 @@ async fn reconcile_cleanup(
     Ok(controller::Action::await_change())
 }
 
-// TODO (@NickLarsenNZ): What to do here?
+/// Get the ZooKeeper management host:port for the operator to manage the ZooKeeper cluster.
+///
+/// This uses the _Server_ Role [Listener] address because it covers ZooKeeper replicas across all
+/// RoleGroups.
+/// This does mean that when the listenerClass is `external-stable`, the operator will need to be
+/// able to access the external address (eg: Load Balancer).
+///
+/// [Listener]: ::stackable_operator::crd::listener::v1alpha1::Listener
+// NOTE (@NickLarsenNZ): If we want to keep this traffic internal, we would need to choose one of
+// the RoleGroups headless services - or make a dedicated ClusterIP service for the operator to use.
fn zk_mgmt_addr( zk: &v1alpha1::ZookeeperCluster, zookeeper_security: &ZookeeperSecurity, diff --git a/tests/templates/kuttl/smoke/10-assert.yaml.j2 b/tests/templates/kuttl/smoke/10-assert.yaml.j2 index 22ebd65a..cfaee9c0 100644 --- a/tests/templates/kuttl/smoke/10-assert.yaml.j2 +++ b/tests/templates/kuttl/smoke/10-assert.yaml.j2 @@ -83,3 +83,38 @@ status: expectedPods: 3 currentHealthy: 3 disruptionsAllowed: 1 +--- +apiVersion: v1 +kind: Service +metadata: + name: test-zk-server +spec: + type: ClusterIP # listenerClass: cluster-internal +--- +apiVersion: v1 +kind: Service +metadata: + name: test-zk-server-primary-headless +spec: + type: ClusterIP +--- +apiVersion: v1 +kind: Service +metadata: + name: test-zk-server-primary-metrics +spec: + type: ClusterIP +--- +apiVersion: v1 +kind: Service +metadata: + name: test-zk-server-secondary-headless +spec: + type: ClusterIP +--- +apiVersion: v1 +kind: Service +metadata: + name: test-zk-server-secondary-metrics +spec: + type: ClusterIP diff --git a/tests/templates/kuttl/smoke/test_tls.sh.j2 b/tests/templates/kuttl/smoke/test_tls.sh.j2 index 2d87370e..cc7be7f8 100755 --- a/tests/templates/kuttl/smoke/test_tls.sh.j2 +++ b/tests/templates/kuttl/smoke/test_tls.sh.j2 @@ -4,9 +4,9 @@ NAMESPACE=$1 {% if test_scenario['values']['use-client-auth-tls'] == 'true' or test_scenario['values']['use-server-tls'] == 'true' %} -SERVER="test-zk-server-primary-1.test-zk-server-primary.${NAMESPACE}.svc.cluster.local:2282" +SERVER="test-zk-server.${NAMESPACE}.svc.cluster.local:2282" {% else %} -SERVER="test-zk-server-primary-1.test-zk-server-primary.${NAMESPACE}.svc.cluster.local:2181" +SERVER="test-zk-server.${NAMESPACE}.svc.cluster.local:2181" {% endif %} # just to be safe... 
diff --git a/tests/templates/kuttl/smoke/test_zookeeper.py b/tests/templates/kuttl/smoke/test_zookeeper.py index dfef9589..420c9b28 100755 --- a/tests/templates/kuttl/smoke/test_zookeeper.py +++ b/tests/templates/kuttl/smoke/test_zookeeper.py @@ -3,6 +3,7 @@ import requests import time import sys + sys.tracebacklimit = 0 @@ -37,12 +38,23 @@ def check_ruok(hosts): url = host + ":8080/commands/" + cmd_ruok response = try_get(url).json() - if "command" in response and response["command"] == cmd_ruok \ - and "error" in response and response["error"] is None: + if ( + "command" in response + and response["command"] == cmd_ruok + and "error" in response + and response["error"] is None + ): continue else: - print("Error[" + cmd_ruok + "] for [" + url + "]: received " + str( - response) + " - expected {'command': 'ruok', 'error': None} ") + print( + "Error[" + + cmd_ruok + + "] for [" + + url + + "]: received " + + str(response) + + " - expected {'command': 'ruok', 'error': None} " + ) exit(-1) @@ -58,15 +70,29 @@ def check_monitoring(hosts): exit(-1) -if __name__ == '__main__': +if __name__ == "__main__": all_args = argparse.ArgumentParser(description="Test ZooKeeper.") - all_args.add_argument("-n", "--namespace", help="The namespace to run in", required=True) + all_args.add_argument( + "-n", "--namespace", help="The namespace to run in", required=True + ) args = vars(all_args.parse_args()) namespace = args["namespace"] - host_primary_0 = "http://test-zk-server-primary-0.test-zk-server-primary." + namespace + ".svc.cluster.local" - host_primary_1 = "http://test-zk-server-primary-1.test-zk-server-primary." + namespace + ".svc.cluster.local" - host_secondary = "http://test-zk-server-secondary-0.test-zk-server-secondary." + namespace + ".svc.cluster.local" + host_primary_0 = ( + "http://test-zk-server-primary-0.test-zk-server-primary-headless." + + namespace + + ".svc.cluster.local" + ) + host_primary_1 = ( + "http://test-zk-server-primary-1.test-zk-server-primary-headless." 
+ + namespace + + ".svc.cluster.local" + ) + host_secondary = ( + "http://test-zk-server-secondary-0.test-zk-server-secondary-headless." + + namespace + + ".svc.cluster.local" + ) hosts = [host_primary_0, host_primary_1, host_secondary] From c06c98c04c7d9c58f8aa4b2178011b9f2d15cc06 Mon Sep 17 00:00:00 2001 From: Nick Larsen Date: Wed, 2 Jul 2025 14:46:39 +0200 Subject: [PATCH 08/18] chore: Handle errors properly --- rust/operator-binary/src/listener.rs | 11 +++++++++-- rust/operator-binary/src/zk_controller.rs | 4 ++-- rust/operator-binary/src/znode_controller.rs | 1 + 3 files changed, 12 insertions(+), 4 deletions(-) diff --git a/rust/operator-binary/src/listener.rs b/rust/operator-binary/src/listener.rs index b47a201c..8700a924 100644 --- a/rust/operator-binary/src/listener.rs +++ b/rust/operator-binary/src/listener.rs @@ -16,6 +16,12 @@ type Result = std::result::Result; #[derive(Snafu, Debug)] pub enum Error { + #[snafu(display("Role {zk_role:?} is not defined in the ZooKeeperCluster spec"))] + InvalidRole { + source: crate::crd::Error, + zk_role: String, + }, + #[snafu(display("object is missing metadata to build owner reference"))] ObjectMissingMetadataForOwnerRef { source: stackable_operator::builder::meta::Error, @@ -33,11 +39,12 @@ pub fn build_role_listener( resolved_product_image: &ResolvedProductImage, zookeeper_security: &ZookeeperSecurity, ) -> Result { - // TODO (@NickLarsenNZ): Move this to a common function that takes a zk and a zk_role so that we can use it in other places like the PVC generation let listener_name = role_listener_name(zk, zk_role); let listener_class = &zk .role(zk_role) - .expect("handle error: valid role") + .with_context(|_| InvalidRoleSnafu { + zk_role: zk_role.to_string(), + })? 
.role_config .listener_class; diff --git a/rust/operator-binary/src/zk_controller.rs b/rust/operator-binary/src/zk_controller.rs index 32023998..20fc13eb 100644 --- a/rust/operator-binary/src/zk_controller.rs +++ b/rust/operator-binary/src/zk_controller.rs @@ -834,8 +834,7 @@ fn build_server_rolegroup_statefulset( &rolegroup_ref.role, &rolegroup_ref.role_group, )) - .expect("todo: LabelBuildSnafu"); - // .context(LabelBuildSnafu)?; + .context(BuildLabelSnafu)?; let listener_pvc = build_role_listener_pvc( &role_listener_name(zk, &ZookeeperRole::Server), @@ -970,6 +969,7 @@ fn build_server_rolegroup_statefulset( period_seconds: Some(1), ..Probe::default() }) + // TODO (@NickLarsenNZ): Use consts for the port names (since they are used in multiple places) .add_container_port("zk", zookeeper_security.client_port().into()) .add_container_port("zk-leader", ZOOKEEPER_LEADER_PORT as i32) .add_container_port("zk-election", ZOOKEEPER_ELECTION_PORT as i32) diff --git a/rust/operator-binary/src/znode_controller.rs b/rust/operator-binary/src/znode_controller.rs index 7e797853..38439863 100644 --- a/rust/operator-binary/src/znode_controller.rs +++ b/rust/operator-binary/src/znode_controller.rs @@ -274,6 +274,7 @@ async fn reconcile_apply( &znode.object_ref(&()), ClusterResourceApplyStrategy::from(&zk.spec.cluster_operation), ) + // TODO (@NickLarsenNZ): Handle this error properly. 
znode should contain namespace/name, but there is no guarantee .unwrap(); znode_mgmt::ensure_znode_exists( From 5fe88d43d09e69f97c25f462fb8f710e5a2c3e28 Mon Sep 17 00:00:00 2001 From: Nick Larsen Date: Thu, 3 Jul 2025 10:17:57 +0200 Subject: [PATCH 09/18] docs: Fix doc refs --- rust/operator-binary/src/discovery.rs | 2 -- rust/operator-binary/src/zk_controller.rs | 2 +- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/rust/operator-binary/src/discovery.rs b/rust/operator-binary/src/discovery.rs index 574828b0..4f8c1aa1 100644 --- a/rust/operator-binary/src/discovery.rs +++ b/rust/operator-binary/src/discovery.rs @@ -62,8 +62,6 @@ pub enum Error { } /// Build a discovery [`ConfigMap`] containing connection details for a [`v1alpha1::ZookeeperCluster`] from a [`listener::v1alpha1::Listener`]. -/// -/// `hosts` will usually come from either [`pod_hosts`] or [`nodeport_hosts`]. #[allow(clippy::too_many_arguments)] pub fn build_discovery_configmap( zk: &v1alpha1::ZookeeperCluster, diff --git a/rust/operator-binary/src/zk_controller.rs b/rust/operator-binary/src/zk_controller.rs index 9682dea4..102a01dc 100644 --- a/rust/operator-binary/src/zk_controller.rs +++ b/rust/operator-binary/src/zk_controller.rs @@ -797,7 +797,7 @@ pub fn build_role_listener_pvc( /// The rolegroup [`StatefulSet`] runs the rolegroup, as configured by the administrator. /// -/// The [`Pod`](`stackable_operator::k8s_openapi::api::core::v1::Pod`)s are accessible through the corresponding [`Service`] (from [`build_server_rolegroup_service`]). +/// The [`Pod`](`stackable_operator::k8s_openapi::api::core::v1::Pod`)s are accessible through the corresponding headless [`Service`] (from [`build_server_rolegroup_headless_service`]). 
#[allow(clippy::too_many_arguments)] fn build_server_rolegroup_statefulset( zk: &v1alpha1::ZookeeperCluster, From c0d6569ee0ce5262db2d02de06a8551191884f0e Mon Sep 17 00:00:00 2001 From: Nick Larsen Date: Thu, 3 Jul 2025 10:47:42 +0200 Subject: [PATCH 10/18] chore: tidy up metrics ports and use consts --- rust/operator-binary/src/config/jvm.rs | 6 ++-- rust/operator-binary/src/crd/mod.rs | 8 +++-- rust/operator-binary/src/discovery.rs | 4 +-- rust/operator-binary/src/listener.rs | 7 ++-- rust/operator-binary/src/zk_controller.rs | 40 ++++++++++++----------- 5 files changed, 35 insertions(+), 30 deletions(-) diff --git a/rust/operator-binary/src/config/jvm.rs b/rust/operator-binary/src/config/jvm.rs index 12aa3bf5..7f9b3015 100644 --- a/rust/operator-binary/src/config/jvm.rs +++ b/rust/operator-binary/src/config/jvm.rs @@ -5,8 +5,8 @@ use stackable_operator::{ }; use crate::crd::{ - JVM_SECURITY_PROPERTIES_FILE, LOG4J_CONFIG_FILE, LOGBACK_CONFIG_FILE, LoggingFramework, - METRICS_PORT, STACKABLE_CONFIG_DIR, STACKABLE_LOG_CONFIG_DIR, + JMX_METRICS_PORT, JVM_SECURITY_PROPERTIES_FILE, LOG4J_CONFIG_FILE, LOGBACK_CONFIG_FILE, + LoggingFramework, STACKABLE_CONFIG_DIR, STACKABLE_LOG_CONFIG_DIR, v1alpha1::{ ZookeeperCluster, ZookeeperConfig, ZookeeperConfigFragment, ZookeeperServerRoleConfig, }, @@ -39,7 +39,7 @@ fn construct_jvm_args( let jvm_args = vec![ format!("-Djava.security.properties={STACKABLE_CONFIG_DIR}/{JVM_SECURITY_PROPERTIES_FILE}"), format!( - "-javaagent:/stackable/jmx/jmx_prometheus_javaagent.jar={METRICS_PORT}:/stackable/jmx/server.yaml" + "-javaagent:/stackable/jmx/jmx_prometheus_javaagent.jar={JMX_METRICS_PORT}:/stackable/jmx/server.yaml" ), match logging_framework { LoggingFramework::LOG4J => { diff --git a/rust/operator-binary/src/crd/mod.rs b/rust/operator-binary/src/crd/mod.rs index 06e92c15..bd093053 100644 --- a/rust/operator-binary/src/crd/mod.rs +++ b/rust/operator-binary/src/crd/mod.rs @@ -51,12 +51,16 @@ pub const OPERATOR_NAME: &str = 
"zookeeper.stackable.tech"; pub const ZOOKEEPER_PROPERTIES_FILE: &str = "zoo.cfg"; pub const JVM_SECURITY_PROPERTIES_FILE: &str = "security.properties"; +pub const ZOOKEEPER_SERVER_PORT_NAME: &str = "zk"; +pub const ZOOKEEPER_LEADER_PORT_NAME: &str = "zk-leader"; pub const ZOOKEEPER_LEADER_PORT: u16 = 2888; +pub const ZOOKEEPER_ELECTION_PORT_NAME: &str = "zk-election"; pub const ZOOKEEPER_ELECTION_PORT: u16 = 3888; -pub const METRICS_PORT_NAME: &str = "metrics"; -pub const METRICS_PORT: u16 = 9505; +pub const JMX_METRICS_PORT_NAME: &str = "metrics"; +pub const JMX_METRICS_PORT: u16 = 9505; pub const METRICS_PROVIDER_HTTP_PORT_KEY: &str = "metricsProvider.httpPort"; +pub const METRICS_PROVIDER_HTTP_PORT_NAME: &str = "native-metrics"; pub const METRICS_PROVIDER_HTTP_PORT: u16 = 7000; pub const STACKABLE_DATA_DIR: &str = "/stackable/data"; diff --git a/rust/operator-binary/src/discovery.rs b/rust/operator-binary/src/discovery.rs index 4f8c1aa1..97e7af0c 100644 --- a/rust/operator-binary/src/discovery.rs +++ b/rust/operator-binary/src/discovery.rs @@ -10,7 +10,7 @@ use stackable_operator::{ }; use crate::{ - crd::{ZookeeperRole, security::ZookeeperSecurity, v1alpha1}, + crd::{ZOOKEEPER_SERVER_PORT_NAME, ZookeeperRole, security::ZookeeperSecurity, v1alpha1}, utils::build_recommended_labels, }; @@ -75,7 +75,7 @@ pub fn build_discovery_configmap( let name = owner.name_unchecked(); let namespace = owner.namespace().context(NoNamespaceSnafu)?; - let listener_addresses = listener_addresses(&listener, "zk")?; + let listener_addresses = listener_addresses(&listener, ZOOKEEPER_SERVER_PORT_NAME)?; // Write a connection string of the format that Java ZooKeeper client expects: // "{host1}:{port1},{host2:port2},.../{chroot}" diff --git a/rust/operator-binary/src/listener.rs b/rust/operator-binary/src/listener.rs index 8700a924..90a6f32f 100644 --- a/rust/operator-binary/src/listener.rs +++ b/rust/operator-binary/src/listener.rs @@ -7,7 +7,7 @@ use stackable_operator::{ }; use 
crate::{ - crd::{ZookeeperRole, security::ZookeeperSecurity, v1alpha1}, + crd::{ZOOKEEPER_SERVER_PORT_NAME, ZookeeperRole, security::ZookeeperSecurity, v1alpha1}, utils::build_recommended_labels, zk_controller::ZK_CONTROLLER_NAME, }; @@ -85,11 +85,10 @@ pub fn role_listener_name(zk: &v1alpha1::ZookeeperCluster, zk_role: &ZookeeperRo format!("{zk}-{zk_role}", zk = zk.name_any()) } -// We only use the http port here and intentionally omit -// the metrics one. +// We only use the server port here and intentionally omit the metrics one. fn listener_ports(zookeeper_security: &ZookeeperSecurity) -> Vec { vec![listener::v1alpha1::ListenerPort { - name: "zk".to_string(), + name: ZOOKEEPER_SERVER_PORT_NAME.to_string(), port: zookeeper_security.client_port().into(), protocol: Some("TCP".to_string()), }] diff --git a/rust/operator-binary/src/zk_controller.rs b/rust/operator-binary/src/zk_controller.rs index 102a01dc..2891cbff 100644 --- a/rust/operator-binary/src/zk_controller.rs +++ b/rust/operator-binary/src/zk_controller.rs @@ -76,11 +76,14 @@ use crate::{ command::create_init_container_command_args, config::jvm::{construct_non_heap_jvm_args, construct_zk_server_heap_env}, crd::{ - DOCKER_IMAGE_BASE_NAME, JVM_SECURITY_PROPERTIES_FILE, MAX_PREPARE_LOG_FILE_SIZE, - MAX_ZK_LOG_FILES_SIZE, METRICS_PORT, METRICS_PORT_NAME, METRICS_PROVIDER_HTTP_PORT, - METRICS_PROVIDER_HTTP_PORT_KEY, STACKABLE_CONFIG_DIR, STACKABLE_DATA_DIR, + DOCKER_IMAGE_BASE_NAME, JMX_METRICS_PORT, JMX_METRICS_PORT_NAME, + JVM_SECURITY_PROPERTIES_FILE, MAX_PREPARE_LOG_FILE_SIZE, MAX_ZK_LOG_FILES_SIZE, + METRICS_PROVIDER_HTTP_PORT, METRICS_PROVIDER_HTTP_PORT_KEY, + METRICS_PROVIDER_HTTP_PORT_NAME, STACKABLE_CONFIG_DIR, STACKABLE_DATA_DIR, STACKABLE_LOG_CONFIG_DIR, STACKABLE_LOG_DIR, STACKABLE_RW_CONFIG_DIR, - ZOOKEEPER_ELECTION_PORT, ZOOKEEPER_LEADER_PORT, ZOOKEEPER_PROPERTIES_FILE, ZookeeperRole, + ZOOKEEPER_ELECTION_PORT, ZOOKEEPER_ELECTION_PORT_NAME, ZOOKEEPER_LEADER_PORT, + ZOOKEEPER_LEADER_PORT_NAME, 
ZOOKEEPER_PROPERTIES_FILE, ZOOKEEPER_SERVER_PORT_NAME, + ZookeeperRole, security::{self, ZookeeperSecurity}, v1alpha1::{self, ZookeeperServerRoleConfig}, }, @@ -693,15 +696,13 @@ fn build_server_rolegroup_headless_service( cluster_ip: Some("None".to_string()), ports: Some(vec![ ServicePort { - // TODO (@NickLarsenNZ): Use a const - name: Some("zk-leader".to_string()), + name: Some(ZOOKEEPER_LEADER_PORT_NAME.to_string()), port: ZOOKEEPER_LEADER_PORT as i32, protocol: Some("TCP".to_string()), ..ServicePort::default() }, ServicePort { - // TODO (@NickLarsenNZ): Use a const - name: Some("zk-election".to_string()), + name: Some(ZOOKEEPER_ELECTION_PORT_NAME.to_string()), port: ZOOKEEPER_ELECTION_PORT as i32, protocol: Some("TCP".to_string()), ..ServicePort::default() @@ -757,15 +758,14 @@ fn build_server_rolegroup_metrics_service( cluster_ip: Some("None".to_string()), ports: Some(vec![ ServicePort { - name: Some(METRICS_PORT_NAME.to_string()), - port: METRICS_PORT.into(), + name: Some(JMX_METRICS_PORT_NAME.to_string()), + port: JMX_METRICS_PORT as i32, protocol: Some("TCP".to_string()), ..ServicePort::default() }, ServicePort { - // TODO (@NickLarsenNZ): Use a const: METRICS_PROVIDER_HTTP_PORT_NAME - name: Some("native-metrics".to_string()), - port: metrics_port_from_rolegroup_config(rolegroup_config).into(), + name: Some(METRICS_PROVIDER_HTTP_PORT_NAME.to_string()), + port: metrics_port_from_rolegroup_config(rolegroup_config) as i32, protocol: Some("TCP".to_string()), ..ServicePort::default() }, @@ -983,13 +983,15 @@ fn build_server_rolegroup_statefulset( period_seconds: Some(1), ..Probe::default() }) - // TODO (@NickLarsenNZ): Use consts for the port names (since they are used in multiple places) - .add_container_port("zk", zookeeper_security.client_port().into()) - .add_container_port("zk-leader", ZOOKEEPER_LEADER_PORT as i32) - .add_container_port("zk-election", ZOOKEEPER_ELECTION_PORT as i32) - .add_container_port("metrics", 9505) .add_container_port( - 
"native-metrics", + ZOOKEEPER_SERVER_PORT_NAME, + zookeeper_security.client_port() as i32, + ) + .add_container_port(ZOOKEEPER_LEADER_PORT_NAME, ZOOKEEPER_LEADER_PORT as i32) + .add_container_port(ZOOKEEPER_ELECTION_PORT_NAME, ZOOKEEPER_ELECTION_PORT as i32) + .add_container_port(JMX_METRICS_PORT_NAME, 9505) + .add_container_port( + METRICS_PROVIDER_HTTP_PORT_NAME, metrics_port_from_rolegroup_config(server_config).into(), ) .add_volume_mount("data", STACKABLE_DATA_DIR) From fd0e4a8a5bfc29d34629fd4d2c2503648b7f096d Mon Sep 17 00:00:00 2001 From: Nick <10092581+NickLarsenNZ@users.noreply.github.com> Date: Thu, 3 Jul 2025 12:32:16 +0200 Subject: [PATCH 11/18] Remove commented code --- rust/operator-binary/src/discovery.rs | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/rust/operator-binary/src/discovery.rs b/rust/operator-binary/src/discovery.rs index 97e7af0c..5f7895d1 100644 --- a/rust/operator-binary/src/discovery.rs +++ b/rust/operator-binary/src/discovery.rs @@ -123,18 +123,6 @@ pub fn build_discovery_configmap( .context(BuildConfigMapSnafu) } -// /// Lists all Pods FQDNs expected to host the [`v1alpha1::ZookeeperCluster`] -// fn pod_hosts<'a>( -// zk: &'a v1alpha1::ZookeeperCluster, -// zookeeper_security: &'a ZookeeperSecurity, -// cluster_info: &'a KubernetesClusterInfo, -// ) -> Result + 'a> { -// Ok(zk -// .pods() -// .context(ExpectedPodsSnafu)? 
-// .map(|pod_ref| (pod_ref.fqdn(cluster_info), zookeeper_security.client_port()))) -// } - /// Lists all listener address and port number pairs for a given `port_name` for Pods participating in the [`Listener`][1] /// /// This returns pairs of `(Address, Port)`, where address could be a hostname or IP address of a node, clusterIP or external From ba1180deda18ce4909a48f9d35c27d455c601a4a Mon Sep 17 00:00:00 2001 From: Nick <10092581+NickLarsenNZ@users.noreply.github.com> Date: Thu, 3 Jul 2025 15:17:37 +0200 Subject: [PATCH 12/18] Apply suggestions from code review Co-authored-by: Malte Sander --- docs/modules/zookeeper/pages/usage_guide/listenerclass.adoc | 2 +- rust/operator-binary/src/crd/mod.rs | 2 +- rust/operator-binary/src/listener.rs | 4 ++-- rust/operator-binary/src/znode_controller.rs | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/docs/modules/zookeeper/pages/usage_guide/listenerclass.adoc b/docs/modules/zookeeper/pages/usage_guide/listenerclass.adoc index 4e9e8a81..3bae38e3 100644 --- a/docs/modules/zookeeper/pages/usage_guide/listenerclass.adoc +++ b/docs/modules/zookeeper/pages/usage_guide/listenerclass.adoc @@ -4,7 +4,7 @@ Apache ZooKeeper offers an API. The Operator deploys a service called `` (where `` is the name of the ZookeeperCluster) through which ZooKeeper can be reached. The operator deploys a xref:listener-operator:listener.adoc[Listener] for the Server pods. 
-The listener defaults to only being accessible from within the Kubernetes cluster, but this can be changed by setting `.spec.nodes.config.listenerClass`: +The listener defaults to only being accessible from within the Kubernetes cluster, but this can be changed by setting `.spec.servers.roleConfig.listenerClass`: [source,yaml] ---- diff --git a/rust/operator-binary/src/crd/mod.rs b/rust/operator-binary/src/crd/mod.rs index bd093053..d05fb6ca 100644 --- a/rust/operator-binary/src/crd/mod.rs +++ b/rust/operator-binary/src/crd/mod.rs @@ -164,7 +164,7 @@ pub mod versioned { #[serde(flatten)] pub common: GenericRoleConfig, - /// This field controls which [ListenerClass](DOCS_BASE_URL_PLACEHOLDER/listener-operator/listenerclass.html) is used to expose the ZooKeeper nodes. + /// This field controls which [ListenerClass](DOCS_BASE_URL_PLACEHOLDER/listener-operator/listenerclass.html) is used to expose the ZooKeeper servers. #[serde(default = "default_listener_class")] pub listener_class: String, } diff --git a/rust/operator-binary/src/listener.rs b/rust/operator-binary/src/listener.rs index 90a6f32f..71575976 100644 --- a/rust/operator-binary/src/listener.rs +++ b/rust/operator-binary/src/listener.rs @@ -64,7 +64,7 @@ pub fn build_role_listener( ZK_CONTROLLER_NAME, &resolved_product_image.app_version_label, &zk_role.to_string(), - "global", // TODO (@NickLarsenNZ): update build_recommended_labels to have an optional role_group + "none", // TODO (@NickLarsenNZ): update build_recommended_labels to have an optional role_group )) .context(RecommendedLabelsSnafu)? .build(), @@ -81,7 +81,7 @@ pub fn build_role_listener( // TODO (@NickLarsenNZ): This could be a method we can put on a Resource that takes a role_name pub fn role_listener_name(zk: &v1alpha1::ZookeeperCluster, zk_role: &ZookeeperRole) -> String { - // TODO (@NickLarsenNZ): Make a convention, do we use name_any() and allow empty string? or metadata.name.expect, or handle the error? 
+ // TODO (@NickLarsenNZ): Make a convention, do we use name_any() and allow empty string? or handle the error (as unlikely as it would be)? format!("{zk}-{zk_role}", zk = zk.name_any()) } diff --git a/rust/operator-binary/src/znode_controller.rs b/rust/operator-binary/src/znode_controller.rs index 38439863..a90ddbfd 100644 --- a/rust/operator-binary/src/znode_controller.rs +++ b/rust/operator-binary/src/znode_controller.rs @@ -360,7 +360,7 @@ async fn reconcile_cleanup( /// This uses the _Server_ Role [Listener] address because it covers ZooKeeper replicas across all /// RoleGroups. /// This does mean that when the listenerClass is `external-stable`, the operator will need to be -/// able to access the external adress (eg: Load Balancer). +/// able to access the external address (eg: Load Balancer). /// /// [Listener]: ::stackable_operator::crd::listener::v1alpha1::Listener // NOTE (@NickLarsenNZ): If we want to keep this traffic internal, we would need to choose one of From c28806362d2ecced62ab94d7ce56184692ca6fff Mon Sep 17 00:00:00 2001 From: Nick Larsen Date: Thu, 3 Jul 2025 15:18:55 +0200 Subject: [PATCH 13/18] chore: Update CRD field descriptions --- deploy/helm/zookeeper-operator/crds/crds.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/deploy/helm/zookeeper-operator/crds/crds.yaml b/deploy/helm/zookeeper-operator/crds/crds.yaml index 18aa6e3e..0d6db85f 100644 --- a/deploy/helm/zookeeper-operator/crds/crds.yaml +++ b/deploy/helm/zookeeper-operator/crds/crds.yaml @@ -437,7 +437,7 @@ spec: properties: listenerClass: default: cluster-internal - description: This field controls which [ListenerClass](https://docs.stackable.tech/home/nightly/listener-operator/listenerclass.html) is used to expose the ZooKeeper nodes. + description: This field controls which [ListenerClass](https://docs.stackable.tech/home/nightly/listener-operator/listenerclass.html) is used to expose the ZooKeeper servers. 
type: string podDisruptionBudget: default: From 09476f371e0f5b9ea832c7194a7175189be539fe Mon Sep 17 00:00:00 2001 From: Nick Larsen Date: Thu, 3 Jul 2025 15:28:31 +0200 Subject: [PATCH 14/18] fix: Handle unlikely error instead of crashing --- rust/operator-binary/src/znode_controller.rs | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/rust/operator-binary/src/znode_controller.rs b/rust/operator-binary/src/znode_controller.rs index a90ddbfd..9e063430 100644 --- a/rust/operator-binary/src/znode_controller.rs +++ b/rust/operator-binary/src/znode_controller.rs @@ -124,6 +124,11 @@ pub enum Error { #[snafu(display("failed to initialize security context"))] FailedToInitializeSecurityContext { source: crate::crd::security::Error }, + + #[snafu(display("OwnerRef missing expected keys (name and/or namespace)"))] + OwnerRefMissingExpectedKeys { + source: stackable_operator::cluster_resources::Error, + }, } type Result = std::result::Result; @@ -155,7 +160,7 @@ impl ReconcilerError for Error { fn secondary_object(&self) -> Option> { match self { - Error::InvalidZookeeperZnode { source: _ } => None, + Error::InvalidZookeeperZnode { .. } => None, Error::ObjectMissingMetadata => None, Error::InvalidZkReference => None, Error::FindZk { zk, .. } => Some(zk.clone().erase()), @@ -165,13 +170,14 @@ impl ReconcilerError for Error { Error::NoZkFqdn { zk } => Some(zk.clone().erase()), Error::EnsureZnode { zk, .. } => Some(zk.clone().erase()), Error::EnsureZnodeMissing { zk, .. } => Some(zk.clone().erase()), - Error::BuildDiscoveryConfigMap { source: _ } => None, + Error::BuildDiscoveryConfigMap { .. } => None, Error::ApplyDiscoveryConfigMap { cm, .. } => Some(cm.clone().erase()), Error::ApplyStatus { .. } => None, - Error::Finalizer { source: _ } => None, - Error::DeleteOrphans { source: _ } => None, + Error::Finalizer { .. } => None, + Error::DeleteOrphans { .. 
} => None, Error::ObjectHasNoNamespace => None, - Error::FailedToInitializeSecurityContext { source: _ } => None, + Error::FailedToInitializeSecurityContext { .. } => None, + Error::OwnerRefMissingExpectedKeys { .. } => None, } } } @@ -274,8 +280,7 @@ async fn reconcile_apply( &znode.object_ref(&()), ClusterResourceApplyStrategy::from(&zk.spec.cluster_operation), ) - // TODO (@NickLarsenNZ): Handle this error properly. znode should contain namespace/name, but there is no guarantee - .unwrap(); + .context(OwnerRefMissingExpectedKeysSnafu)?; znode_mgmt::ensure_znode_exists( &zk_mgmt_addr(&zk, &zookeeper_security, &client.kubernetes_cluster_info)?, From 7ade4490f1a82d207bd4381fb13626fdea46f456 Mon Sep 17 00:00:00 2001 From: Nick <10092581+NickLarsenNZ@users.noreply.github.com> Date: Thu, 3 Jul 2025 15:29:52 +0200 Subject: [PATCH 15/18] Update rust/operator-binary/src/zk_controller.rs Co-authored-by: Malte Sander --- rust/operator-binary/src/zk_controller.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/rust/operator-binary/src/zk_controller.rs b/rust/operator-binary/src/zk_controller.rs index 2891cbff..b105adcb 100644 --- a/rust/operator-binary/src/zk_controller.rs +++ b/rust/operator-binary/src/zk_controller.rs @@ -493,10 +493,7 @@ pub async fn reconcile_zk( } let role_config = zk.role_config(&zk_role); - if let Some(ZookeeperServerRoleConfig { - common, - listener_class: _, - }) = role_config + if let Some(ZookeeperServerRoleConfig { common, .. 
}) = role_config { { add_pdbs( &common.pod_disruption_budget, From 9690a921bf35e492cf4b30fba59139e7829f356e Mon Sep 17 00:00:00 2001 From: Nick Larsen Date: Fri, 4 Jul 2025 08:10:50 +0200 Subject: [PATCH 16/18] chore: Add znode ref to error message --- rust/operator-binary/src/znode_controller.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/rust/operator-binary/src/znode_controller.rs b/rust/operator-binary/src/znode_controller.rs index 9e063430..1ff7a8ed 100644 --- a/rust/operator-binary/src/znode_controller.rs +++ b/rust/operator-binary/src/znode_controller.rs @@ -125,9 +125,10 @@ pub enum Error { #[snafu(display("failed to initialize security context"))] FailedToInitializeSecurityContext { source: crate::crd::security::Error }, - #[snafu(display("OwnerRef missing expected keys (name and/or namespace)"))] - OwnerRefMissingExpectedKeys { + #[snafu(display("Znode {znode:?} missing expected keys (name and/or namespace)"))] + ZnodeMissingExpectedKeys { source: stackable_operator::cluster_resources::Error, + znode: ObjectRef, }, } type Result = std::result::Result; @@ -177,7 +178,7 @@ impl ReconcilerError for Error { Error::DeleteOrphans { .. } => None, Error::ObjectHasNoNamespace => None, Error::FailedToInitializeSecurityContext { .. } => None, - Error::OwnerRefMissingExpectedKeys { .. } => None, + Error::ZnodeMissingExpectedKeys { .. 
} => None, } } } @@ -280,7 +281,7 @@ async fn reconcile_apply( &znode.object_ref(&()), ClusterResourceApplyStrategy::from(&zk.spec.cluster_operation), ) - .context(OwnerRefMissingExpectedKeysSnafu)?; + .context(ZnodeMissingExpectedKeysSnafu { znode })?; znode_mgmt::ensure_znode_exists( &zk_mgmt_addr(&zk, &zookeeper_security, &client.kubernetes_cluster_info)?, From 01d10c4f253cd53d20d56379dd1a28dff9b934e8 Mon Sep 17 00:00:00 2001 From: Nick Larsen Date: Fri, 4 Jul 2025 08:11:43 +0200 Subject: [PATCH 17/18] style: Replace empty format interpolations --- rust/operator-binary/src/znode_controller.rs | 28 ++++++++++---------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/rust/operator-binary/src/znode_controller.rs b/rust/operator-binary/src/znode_controller.rs index 1ff7a8ed..20ac36fe 100644 --- a/rust/operator-binary/src/znode_controller.rs +++ b/rust/operator-binary/src/znode_controller.rs @@ -54,7 +54,7 @@ pub enum Error { #[snafu(display("object does not refer to ZookeeperCluster"))] InvalidZkReference, - #[snafu(display("could not find {}", zk))] + #[snafu(display("could not find {zk:?}"))] FindZk { source: stackable_operator::client::Error, zk: ObjectRef, @@ -65,30 +65,30 @@ pub enum Error { zk: ObjectRef, }, - #[snafu(display("could not find server role service name for {}", zk))] + #[snafu(display("could not find server role service name for {zk:?}"))] NoZkSvcName { zk: ObjectRef, }, - #[snafu(display("could not find server role service for {}", zk))] + #[snafu(display("could not find server role service for {zk:?}"))] FindZkSvc { source: stackable_operator::client::Error, zk: ObjectRef, }, - #[snafu(display("failed to calculate FQDN for {}", zk))] + #[snafu(display("failed to calculate FQDN for {zk:?}"))] NoZkFqdn { zk: ObjectRef, }, - #[snafu(display("failed to ensure that ZNode {} exists in {}", znode_path, zk))] + #[snafu(display("failed to ensure that ZNode {znode_path:?} exists in {zk:?}"))] EnsureZnode { source: znode_mgmt::Error, zk: 
ObjectRef, znode_path: String, }, - #[snafu(display("failed to ensure that ZNode {} is missing from {}", znode_path, zk))] + #[snafu(display("failed to ensure that ZNode {znode_path:?} is missing from {zk:?}"))] EnsureZnodeMissing { source: znode_mgmt::Error, zk: ObjectRef, @@ -98,7 +98,7 @@ pub enum Error { #[snafu(display("failed to build discovery information"))] BuildDiscoveryConfigMap { source: discovery::Error }, - #[snafu(display("failed to save discovery information to {}", cm))] + #[snafu(display("failed to save discovery information to {cm:?}"))] ApplyDiscoveryConfigMap { source: stackable_operator::cluster_resources::Error, cm: ObjectRef, @@ -442,34 +442,34 @@ mod znode_mgmt { source: std::io::Error, addr: String, }, - #[snafu(display("address {} did not resolve to any socket addresses", addr))] + #[snafu(display("address {addr:?} did not resolve to any socket addresses"))] AddrResolution { addr: String }, - #[snafu(display("failed to connect to {}", addr))] + #[snafu(display("failed to connect to {addr:?}"))] Connect { source: tokio_zookeeper::error::Error, addr: SocketAddr, }, - #[snafu(display("protocol error creating znode {}", path))] + #[snafu(display("protocol error creating znode {path:?}"))] CreateZnodeProtocol { source: tokio_zookeeper::error::Error, path: String, }, - #[snafu(display("failed to create znode {}", path))] + #[snafu(display("failed to create znode {path:?}"))] CreateZnode { source: tokio_zookeeper::error::Create, path: String, }, - #[snafu(display("protocol error deleting znode {}", path))] + #[snafu(display("protocol error deleting znode {path:?}"))] DeleteZnodeProtocol { source: tokio_zookeeper::error::Error, path: String, }, - #[snafu(display("failed to delete znode {}", path))] + #[snafu(display("failed to delete znode {path:?}"))] DeleteZnode { source: tokio_zookeeper::error::Delete, path: String, }, - #[snafu(display("failed to find children to delete of {}", path))] + #[snafu(display("failed to find children to delete of 
{path:?}"))] DeleteZnodeFindChildrenProtocol { source: tokio_zookeeper::error::Error, path: String, From 43d446930ad1d327c613c4c9b72cae5c9237f42b Mon Sep 17 00:00:00 2001 From: Nick Larsen Date: Fri, 4 Jul 2025 09:55:51 +0200 Subject: [PATCH 18/18] fix: Remove extra brace --- rust/operator-binary/src/zk_controller.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/rust/operator-binary/src/zk_controller.rs b/rust/operator-binary/src/zk_controller.rs index b105adcb..cee38f97 100644 --- a/rust/operator-binary/src/zk_controller.rs +++ b/rust/operator-binary/src/zk_controller.rs @@ -494,7 +494,6 @@ pub async fn reconcile_zk( let role_config = zk.role_config(&zk_role); if let Some(ZookeeperServerRoleConfig { common, .. }) = role_config { - { add_pdbs( &common.pod_disruption_budget, zk,