diff --git a/CHANGELOG.md b/CHANGELOG.md index 631b3ae2..517ef2f8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ All notable changes to this project will be documented in this file. - Use `--file-log-rotation-period` (or `FILE_LOG_ROTATION_PERIOD`) to configure the frequency of rotation. - Use `--console-log-format` (or `CONSOLE_LOG_FORMAT`) to set the format to `plain` (default) or `json`. - Add built-in Prometheus support and expose metrics on `/metrics` path of `native-metrics` port ([#955]). +- BREAKING: Add listener support ([#957]). ### Changed @@ -47,6 +48,7 @@ All notable changes to this project will be documented in this file. [#946]: https://github.com/stackabletech/zookeeper-operator/pull/946 [#950]: https://github.com/stackabletech/zookeeper-operator/pull/950 [#955]: https://github.com/stackabletech/zookeeper-operator/pull/955 +[#957]: https://github.com/stackabletech/zookeeper-operator/pull/957 ## [25.3.0] - 2025-03-21 diff --git a/deploy/helm/zookeeper-operator/crds/crds.yaml b/deploy/helm/zookeeper-operator/crds/crds.yaml index 1ecc3a6a..0d6db85f 100644 --- a/deploy/helm/zookeeper-operator/crds/crds.yaml +++ b/deploy/helm/zookeeper-operator/crds/crds.yaml @@ -28,7 +28,6 @@ spec: clusterConfig: default: authentication: [] - listenerClass: cluster-internal tls: quorumSecretClass: tls serverSecretClass: tls @@ -53,20 +52,6 @@ spec: - authenticationClass type: object type: array - listenerClass: - default: cluster-internal - description: |- - This field controls which type of Service the Operator creates for this ZookeeperCluster: - - * cluster-internal: Use a ClusterIP service - - * external-unstable: Use a NodePort service - - This is a temporary solution with the goal to keep yaml manifests forward compatible. In the future, this setting will control which [ListenerClass](https://docs.stackable.tech/home/nightly/listener-operator/listenerclass.html) will be used to expose the service, and ListenerClass names will stay the same, allowing for a non-breaking change. - enum: - - cluster-internal - - external-unstable - type: string tls: default: quorumSecretClass: tls @@ -444,11 +429,16 @@ spec: x-kubernetes-preserve-unknown-fields: true roleConfig: default: + listenerClass: cluster-internal podDisruptionBudget: enabled: true maxUnavailable: null description: This is a product-agnostic RoleConfig, which is sufficient for most of the products. properties: + listenerClass: + default: cluster-internal + description: This field controls which [ListenerClass](https://docs.stackable.tech/home/nightly/listener-operator/listenerclass.html) is used to expose the ZooKeeper servers. + type: string podDisruptionBudget: default: enabled: true diff --git a/deploy/helm/zookeeper-operator/templates/roles.yaml b/deploy/helm/zookeeper-operator/templates/roles.yaml index 8d3779b1..4c03cf09 100644 --- a/deploy/helm/zookeeper-operator/templates/roles.yaml +++ b/deploy/helm/zookeeper-operator/templates/roles.yaml @@ -107,6 +107,17 @@ rules: verbs: - create - patch + - apiGroups: + - listeners.stackable.tech + resources: + - listeners + verbs: + - get + - list + - watch + - patch + - create + - delete - apiGroups: - {{ include "operator.name" . 
}}.stackable.tech resources: diff --git a/docs/modules/zookeeper/pages/usage_guide/listenerclass.adoc b/docs/modules/zookeeper/pages/usage_guide/listenerclass.adoc index 037f934f..3bae38e3 100644 --- a/docs/modules/zookeeper/pages/usage_guide/listenerclass.adoc +++ b/docs/modules/zookeeper/pages/usage_guide/listenerclass.adoc @@ -1,16 +1,16 @@ = Service exposition with ListenerClasses +:description: Configure the ZooKeeper service exposure with listener classes: cluster-internal, external-unstable or external-stable Apache ZooKeeper offers an API. The Operator deploys a service called `` (where `` is the name of the ZookeeperCluster) through which ZooKeeper can be reached. -This service can have either the `cluster-internal` or `external-unstable` type. `external-stable` is not supported for ZooKeeper at the moment. -Read more about the types in the xref:concepts:service-exposition.adoc[service exposition] documentation at platform level. - -This is how the listener class is configured: +The operator deploys a xref:listener-operator:listener.adoc[Listener] for the Server pods. +The listener defaults to only being accessible from within the Kubernetes cluster, but this can be changed by setting `.spec.servers.roleConfig.listenerClass`: [source,yaml] ---- spec: - clusterConfig: - listenerClass: cluster-internal # <1> + servers: + roleConfig: + listenerClass: external-unstable # <1> ---- -<1> The default `cluster-internal` setting. +<1> Specify one of `external-stable`, `external-unstable`, `cluster-internal` (the default setting is `cluster-internal`). diff --git a/rust/operator-binary/src/config/jvm.rs b/rust/operator-binary/src/config/jvm.rs index 2db69e86..7f9b3015 100644 --- a/rust/operator-binary/src/config/jvm.rs +++ b/rust/operator-binary/src/config/jvm.rs @@ -1,13 +1,15 @@ use snafu::{OptionExt, ResultExt, Snafu}; use stackable_operator::{ memory::{BinaryMultiple, MemoryQuantity}, - role_utils::{self, GenericRoleConfig, JavaCommonConfig, JvmArgumentOverrides, Role}, + role_utils::{self, JavaCommonConfig, JvmArgumentOverrides, Role}, }; use crate::crd::{ - JVM_SECURITY_PROPERTIES_FILE, LOG4J_CONFIG_FILE, LOGBACK_CONFIG_FILE, LoggingFramework, - METRICS_PORT, STACKABLE_CONFIG_DIR, STACKABLE_LOG_CONFIG_DIR, - v1alpha1::{ZookeeperCluster, ZookeeperConfig, ZookeeperConfigFragment}, + JMX_METRICS_PORT, JVM_SECURITY_PROPERTIES_FILE, LOG4J_CONFIG_FILE, LOGBACK_CONFIG_FILE, + LoggingFramework, STACKABLE_CONFIG_DIR, STACKABLE_LOG_CONFIG_DIR, + v1alpha1::{ + ZookeeperCluster, ZookeeperConfig, ZookeeperConfigFragment, ZookeeperServerRoleConfig, + }, }; const JAVA_HEAP_FACTOR: f32 = 0.8; @@ -29,7 +31,7 @@ pub enum Error { /// All JVM arguments. fn construct_jvm_args( zk: &ZookeeperCluster, - role: &Role, + role: &Role, role_group: &str, ) -> Result, Error> { let logging_framework = zk.logging_framework(); @@ -37,7 +39,7 @@ fn construct_jvm_args( let jvm_args = vec![ format!("-Djava.security.properties={STACKABLE_CONFIG_DIR}/{JVM_SECURITY_PROPERTIES_FILE}"), format!( - "-javaagent:/stackable/jmx/jmx_prometheus_javaagent.jar={METRICS_PORT}:/stackable/jmx/server.yaml" + "-javaagent:/stackable/jmx/jmx_prometheus_javaagent.jar={JMX_METRICS_PORT}:/stackable/jmx/server.yaml" ), match logging_framework { LoggingFramework::LOG4J => { @@ -63,7 +65,7 @@ fn construct_jvm_args( /// [`construct_zk_server_heap_env`]). 
pub fn construct_non_heap_jvm_args( zk: &ZookeeperCluster, - role: &Role, + role: &Role, role_group: &str, ) -> Result { let mut jvm_args = construct_jvm_args(zk, role, role_group)?; @@ -99,7 +101,10 @@ fn is_heap_jvm_argument(jvm_argument: &str) -> bool { #[cfg(test)] mod tests { use super::*; - use crate::crd::{ZookeeperRole, v1alpha1::ZookeeperConfig}; + use crate::crd::{ + ZookeeperRole, + v1alpha1::{ZookeeperConfig, ZookeeperServerRoleConfig}, + }; #[test] fn test_construct_jvm_arguments_defaults() { @@ -182,7 +187,7 @@ mod tests { ) -> ( ZookeeperCluster, ZookeeperConfig, - Role, + Role, String, ) { let zookeeper: ZookeeperCluster = diff --git a/rust/operator-binary/src/crd/mod.rs b/rust/operator-binary/src/crd/mod.rs index 8fd6291a..d05fb6ca 100644 --- a/rust/operator-binary/src/crd/mod.rs +++ b/rust/operator-binary/src/crd/mod.rs @@ -34,7 +34,11 @@ use stackable_operator::{ }; use strum::{Display, EnumIter, EnumString, IntoEnumIterator}; -use crate::crd::affinity::get_affinity; +use crate::{ + crd::{affinity::get_affinity, v1alpha1::ZookeeperServerRoleConfig}, + discovery::build_role_group_headless_service_name, + listener::role_listener_name, +}; pub mod affinity; pub mod authentication; @@ -47,8 +51,16 @@ pub const OPERATOR_NAME: &str = "zookeeper.stackable.tech"; pub const ZOOKEEPER_PROPERTIES_FILE: &str = "zoo.cfg"; pub const JVM_SECURITY_PROPERTIES_FILE: &str = "security.properties"; -pub const METRICS_PORT: u16 = 9505; +pub const ZOOKEEPER_SERVER_PORT_NAME: &str = "zk"; +pub const ZOOKEEPER_LEADER_PORT_NAME: &str = "zk-leader"; +pub const ZOOKEEPER_LEADER_PORT: u16 = 2888; +pub const ZOOKEEPER_ELECTION_PORT_NAME: &str = "zk-election"; +pub const ZOOKEEPER_ELECTION_PORT: u16 = 3888; + +pub const JMX_METRICS_PORT_NAME: &str = "metrics"; +pub const JMX_METRICS_PORT: u16 = 9505; pub const METRICS_PROVIDER_HTTP_PORT_KEY: &str = "metricsProvider.httpPort"; +pub const METRICS_PROVIDER_HTTP_PORT_NAME: &str = "native-metrics"; pub const METRICS_PROVIDER_HTTP_PORT: u16 = 7000; pub const STACKABLE_DATA_DIR: &str = "/stackable/data"; @@ -74,6 +86,7 @@ pub const MAX_PREPARE_LOG_FILE_SIZE: MemoryQuantity = MemoryQuantity { pub const DOCKER_IMAGE_BASE_NAME: &str = "zookeeper"; const DEFAULT_SERVER_GRACEFUL_SHUTDOWN_TIMEOUT: Duration = Duration::from_minutes_unchecked(2); +pub const DEFAULT_LISTENER_CLASS: &str = "cluster-internal"; mod built_info { pub const CARGO_PKG_VERSION: &str = env!("CARGO_PKG_VERSION"); @@ -141,7 +154,19 @@ pub mod versioned { // no doc - it's in the struct. #[serde(skip_serializing_if = "Option::is_none")] - pub servers: Option>, + pub servers: + Option>, + } + + #[derive(Clone, Debug, Deserialize, JsonSchema, PartialEq, Serialize)] + #[serde(rename_all = "camelCase")] + pub struct ZookeeperServerRoleConfig { + #[serde(flatten)] + pub common: GenericRoleConfig, + + /// This field controls which [ListenerClass](DOCS_BASE_URL_PLACEHOLDER/listener-operator/listenerclass.html) is used to expose the ZooKeeper servers. + #[serde(default = "default_listener_class")] + pub listener_class: String, } #[derive(Clone, Deserialize, Debug, Eq, JsonSchema, PartialEq, Serialize)] @@ -166,29 +191,6 @@ pub mod versioned { skip_serializing_if = "Option::is_none" )] pub tls: Option, - - /// This field controls which type of Service the Operator creates for this ZookeeperCluster: - /// - /// * cluster-internal: Use a ClusterIP service - /// - /// * external-unstable: Use a NodePort service - /// - /// This is a temporary solution with the goal to keep yaml manifests forward compatible. 
- /// In the future, this setting will control which [ListenerClass](DOCS_BASE_URL_PLACEHOLDER/listener-operator/listenerclass.html) - /// will be used to expose the service, and ListenerClass names will stay the same, allowing for a non-breaking change. - #[serde(default)] - pub listener_class: CurrentlySupportedListenerClasses, - } - - // TODO: Temporary solution until listener-operator is finished - #[derive(Clone, Debug, Default, Display, Deserialize, Eq, JsonSchema, PartialEq, Serialize)] - #[serde(rename_all = "PascalCase")] - pub enum CurrentlySupportedListenerClasses { - #[default] - #[serde(rename = "cluster-internal")] - ClusterInternal, - #[serde(rename = "external-unstable")] - ExternalUnstable, } #[derive(Clone, Debug, Default, Fragment, JsonSchema, PartialEq)] @@ -346,7 +348,7 @@ pub enum ZookeeperRole { /// Used for service discovery. pub struct ZookeeperPodRef { pub namespace: String, - pub role_group_service_name: String, + pub role_group_headless_service_name: String, pub pod_name: String, pub zookeeper_myid: u16, } @@ -356,15 +358,18 @@ fn cluster_config_default() -> v1alpha1::ZookeeperClusterConfig { authentication: vec![], vector_aggregator_config_map_name: None, tls: tls::default_zookeeper_tls(), - listener_class: v1alpha1::CurrentlySupportedListenerClasses::default(), } } -impl v1alpha1::CurrentlySupportedListenerClasses { - pub fn k8s_service_type(&self) -> String { - match self { - v1alpha1::CurrentlySupportedListenerClasses::ClusterInternal => "ClusterIP".to_string(), - v1alpha1::CurrentlySupportedListenerClasses::ExternalUnstable => "NodePort".to_string(), +fn default_listener_class() -> String { + DEFAULT_LISTENER_CLASS.to_owned() +} + +impl Default for ZookeeperServerRoleConfig { + fn default() -> Self { + Self { + listener_class: default_listener_class(), + common: Default::default(), } } } @@ -506,11 +511,11 @@ impl HasStatusCondition for v1alpha1::ZookeeperCluster { } impl ZookeeperPodRef { - pub fn fqdn(&self, cluster_info: &KubernetesClusterInfo) -> String { + pub fn internal_fqdn(&self, cluster_info: &KubernetesClusterInfo) -> String { format!( "{pod_name}.{service_name}.{namespace}.svc.{cluster_domain}", pod_name = self.pod_name, - service_name = self.role_group_service_name, + service_name = self.role_group_headless_service_name, namespace = self.namespace, cluster_domain = cluster_info.cluster_domain ) @@ -541,16 +546,16 @@ impl v1alpha1::ZookeeperCluster { } } - /// The name of the role-level load-balanced Kubernetes `Service` - pub fn server_role_service_name(&self) -> Option { - self.metadata.name.clone() - } - - /// The fully-qualified domain name of the role-level load-balanced Kubernetes `Service` - pub fn server_role_service_fqdn(&self, cluster_info: &KubernetesClusterInfo) -> Option { + /// The fully-qualified domain name of the role-level [Listener] + /// + /// [Listener]: stackable_operator::crd::listener::v1alpha1::Listener + pub fn server_role_listener_fqdn( + &self, + cluster_info: &KubernetesClusterInfo, + ) -> Option { Some(format!( - "{role_service_name}.{namespace}.svc.{cluster_domain}", - role_service_name = self.server_role_service_name()?, + "{role_listener_name}.{namespace}.svc.{cluster_domain}", + role_listener_name = role_listener_name(self, &ZookeeperRole::Server), namespace = self.metadata.namespace.as_ref()?, cluster_domain = cluster_info.cluster_domain )) @@ -560,8 +565,10 @@ impl v1alpha1::ZookeeperCluster { pub fn role( &self, role_variant: &ZookeeperRole, - ) -> Result<&Role, Error> - { + ) -> Result< + &Role, + Error, + > 
{ match role_variant { ZookeeperRole::Server => self.spec.servers.as_ref(), } @@ -602,7 +609,7 @@ impl v1alpha1::ZookeeperCluster { } } - pub fn role_config(&self, role: &ZookeeperRole) -> Option<&GenericRoleConfig> { + pub fn role_config(&self, role: &ZookeeperRole) -> Option<&ZookeeperServerRoleConfig> { match role { ZookeeperRole::Server => self.spec.servers.as_ref().map(|s| &s.role_config), } @@ -634,8 +641,10 @@ impl v1alpha1::ZookeeperCluster { for i in 0..rolegroup.replicas.unwrap_or(1) { pod_refs.push(ZookeeperPodRef { namespace: ns.clone(), - role_group_service_name: rolegroup_ref.object_name(), - pod_name: format!("{}-{}", rolegroup_ref.object_name(), i), + role_group_headless_service_name: build_role_group_headless_service_name( + rolegroup_ref.object_name(), + ), + pod_name: format!("{role_group}-{i}", role_group = rolegroup_ref.object_name()), zookeeper_myid: i + myid_offset, }); } diff --git a/rust/operator-binary/src/crd/security.rs b/rust/operator-binary/src/crd/security.rs index a667e7da..23daad4b 100644 --- a/rust/operator-binary/src/crd/security.rs +++ b/rust/operator-binary/src/crd/security.rs @@ -25,7 +25,13 @@ use stackable_operator::{ time::Duration, }; -use crate::crd::{authentication, authentication::ResolvedAuthenticationClasses, tls, v1alpha1}; +use crate::{ + crd::{ + authentication::{self, ResolvedAuthenticationClasses}, + tls, v1alpha1, + }, + zk_controller::LISTENER_VOLUME_NAME, +}; type Result = std::result::Result; @@ -158,7 +164,7 @@ impl ZookeeperSecurity { .add_volume_mount(tls_volume_name, Self::SERVER_TLS_DIR) .context(AddVolumeMountSnafu)?; pod_builder - .add_volume(Self::create_tls_volume( + .add_volume(Self::create_server_tls_volume( tls_volume_name, secret_class, requested_secret_lifetime, @@ -172,7 +178,7 @@ impl ZookeeperSecurity { .add_volume_mount(tls_volume_name, Self::QUORUM_TLS_DIR) .context(AddVolumeMountSnafu)?; pod_builder - .add_volume(Self::create_tls_volume( + .add_volume(Self::create_quorum_tls_volume( tls_volume_name, &self.quorum_secret_class, requested_secret_lifetime, @@ -298,8 +304,34 @@ impl ZookeeperSecurity { .or(self.server_secret_class.as_ref()) } - /// Creates ephemeral volumes to mount the `SecretClass` into the Pods - fn create_tls_volume( + /// Creates ephemeral volumes to mount the `SecretClass` with the listener-volume scope into the Pods. + /// + /// The resulting volume will contain TLS certificates with the FQDN reported in the applicable [ListenerStatus]. + /// + /// [ListenerStatus]: ::stackable_operator::crd::listener::v1alpha1::ListenerStatus + fn create_server_tls_volume( + volume_name: &str, + secret_class_name: &str, + requested_secret_lifetime: &Duration, + ) -> Result { + let volume = VolumeBuilder::new(volume_name) + .ephemeral( + SecretOperatorVolumeSourceBuilder::new(secret_class_name) + .with_listener_volume_scope(LISTENER_VOLUME_NAME) + .with_format(SecretFormat::TlsPkcs12) + .with_auto_tls_cert_lifetime(*requested_secret_lifetime) + .build() + .context(BuildTlsVolumeSnafu { volume_name })?, + ) + .build(); + + Ok(volume) + } + + /// Creates ephemeral volumes to mount the `SecretClass` with the pod scope into the Pods. + /// + /// The resulting volume will contain TLS certificates with the FQDN of the Pod in relation to the StatefulSet's headless service. 
+ fn create_quorum_tls_volume( volume_name: &str, secret_class_name: &str, requested_secret_lifetime: &Duration, @@ -308,7 +340,6 @@ impl ZookeeperSecurity { .ephemeral( SecretOperatorVolumeSourceBuilder::new(secret_class_name) .with_pod_scope() - .with_node_scope() .with_format(SecretFormat::TlsPkcs12) .with_auto_tls_cert_lifetime(*requested_secret_lifetime) .build() diff --git a/rust/operator-binary/src/discovery.rs b/rust/operator-binary/src/discovery.rs index c774d628..5f7895d1 100644 --- a/rust/operator-binary/src/discovery.rs +++ b/rust/operator-binary/src/discovery.rs @@ -4,13 +4,13 @@ use snafu::{OptionExt, ResultExt, Snafu}; use stackable_operator::{ builder::{configmap::ConfigMapBuilder, meta::ObjectMetaBuilder}, commons::product_image_selection::ResolvedProductImage, - k8s_openapi::api::core::v1::{ConfigMap, Endpoints, Service}, + crd::listener, + k8s_openapi::api::core::v1::ConfigMap, kube::{Resource, ResourceExt, runtime::reflector::ObjectRef}, - utils::cluster_info::KubernetesClusterInfo, }; use crate::{ - crd::{ZookeeperRole, security::ZookeeperSecurity, v1alpha1}, + crd::{ZOOKEEPER_SERVER_PORT_NAME, ZookeeperRole, security::ZookeeperSecurity, v1alpha1}, utils::build_recommended_labels, }; @@ -27,29 +27,28 @@ pub enum Error { #[snafu(display("chroot path {} was relative (must be absolute)", chroot))] RelativeChroot { chroot: String }, - #[snafu(display("object has no name associated"))] - NoName, - #[snafu(display("object has no namespace associated"))] NoNamespace, #[snafu(display("failed to list expected pods"))] ExpectedPods { source: crate::crd::Error }, - #[snafu(display("could not find service port with name {}", port_name))] - NoServicePort { port_name: String }, - - #[snafu(display("service port with name {} does not have a nodePort", port_name))] - NoNodePort { port_name: String }, + #[snafu(display("{listener} does not have a port with the name {port_name:?}"))] + PortNotFound { + port_name: String, + listener: ObjectRef, + }, - #[snafu(display("could not find Endpoints for {}", svc))] - FindEndpoints { - source: stackable_operator::client::Error, - svc: ObjectRef, + #[snafu(display("expected an unsigned 16-bit port, got {port_number}"))] + InvalidPort { + source: TryFromIntError, + port_number: i32, }, - #[snafu(display("nodePort was out of range"))] - InvalidNodePort { source: TryFromIntError }, + #[snafu(display("{listener} has no ingress addresses"))] + NoListenerIngressAddresses { + listener: ObjectRef, + }, #[snafu(display("failed to build ConfigMap"))] BuildConfigMap { @@ -62,75 +61,31 @@ pub enum Error { }, } -/// Builds discovery [`ConfigMap`]s for connecting to a [`v1alpha1::ZookeeperCluster`] for all expected scenarios +/// Build a discovery [`ConfigMap`] containing connection details for a [`v1alpha1::ZookeeperCluster`] from a [`listener::v1alpha1::Listener`]. 
#[allow(clippy::too_many_arguments)] -pub async fn build_discovery_configmaps( +pub fn build_discovery_configmap( zk: &v1alpha1::ZookeeperCluster, owner: &impl Resource, - client: &stackable_operator::client::Client, controller_name: &str, - svc: &Service, + listener: listener::v1alpha1::Listener, chroot: Option<&str>, resolved_product_image: &ResolvedProductImage, zookeeper_security: &ZookeeperSecurity, -) -> Result> { +) -> Result { let name = owner.name_unchecked(); let namespace = owner.namespace().context(NoNamespaceSnafu)?; - let mut discovery_configmaps = vec![build_discovery_configmap( - zk, - owner, - zookeeper_security, - name.as_str(), - &namespace, - controller_name, - chroot, - pod_hosts(zk, zookeeper_security, &client.kubernetes_cluster_info)?, - resolved_product_image, - )?]; - if zk.spec.cluster_config.listener_class - == crate::crd::v1alpha1::CurrentlySupportedListenerClasses::ExternalUnstable - { - discovery_configmaps.push(build_discovery_configmap( - zk, - owner, - zookeeper_security, - &format!("{}-nodeport", name), - &namespace, - controller_name, - chroot, - nodeport_hosts(client, svc, "zk").await?, - resolved_product_image, - )?); - } - - Ok(discovery_configmaps) -} + let listener_addresses = listener_addresses(&listener, ZOOKEEPER_SERVER_PORT_NAME)?; -/// Build a discovery [`ConfigMap`] containing information about how to connect to a certain [`v1alpha1::ZookeeperCluster`] -/// -/// `hosts` will usually come from either [`pod_hosts`] or [`nodeport_hosts`]. -#[allow(clippy::too_many_arguments)] -fn build_discovery_configmap( - zk: &v1alpha1::ZookeeperCluster, - owner: &impl Resource, - zookeeper_security: &ZookeeperSecurity, - name: &str, - namespace: &str, - controller_name: &str, - chroot: Option<&str>, - hosts: impl IntoIterator, u16)>, - resolved_product_image: &ResolvedProductImage, -) -> Result { // Write a connection string of the format that Java ZooKeeper client expects: // "{host1}:{port1},{host2:port2},.../{chroot}" // See https://zookeeper.apache.org/doc/current/apidocs/zookeeper-server/org/apache/zookeeper/ZooKeeper.html#ZooKeeper-java.lang.String-int-org.apache.zookeeper.Watcher- - let hosts = hosts + let listener_addresses = listener_addresses .into_iter() - .map(|(host, port)| format!("{}:{}", host.into(), port)) + .map(|(host, port)| format!("{host}:{port}")) .collect::>() .join(","); - let mut conn_str = hosts.clone(); + let mut conn_str = listener_addresses.clone(); if let Some(chroot) = chroot { if !chroot.starts_with('/') { return RelativeChrootSnafu { chroot }.fail(); @@ -158,7 +113,7 @@ fn build_discovery_configmap( ) .add_data("ZOOKEEPER", conn_str) // Some clients don't support ZooKeeper's merged `hosts/chroot` format, so export them separately for these clients - .add_data("ZOOKEEPER_HOSTS", hosts) + .add_data("ZOOKEEPER_HOSTS", listener_addresses) .add_data( "ZOOKEEPER_CLIENT_PORT", zookeeper_security.client_port().to_string(), @@ -168,57 +123,60 @@ fn build_discovery_configmap( .context(BuildConfigMapSnafu) } -/// Lists all Pods FQDNs expected to host the [`v1alpha1::ZookeeperCluster`] -fn pod_hosts<'a>( - zk: &'a v1alpha1::ZookeeperCluster, - zookeeper_security: &'a ZookeeperSecurity, - cluster_info: &'a KubernetesClusterInfo, -) -> Result + 'a> { - Ok(zk - .pods() - .context(ExpectedPodsSnafu)? 
- .map(|pod_ref| (pod_ref.fqdn(cluster_info), zookeeper_security.client_port()))) -} - -/// Lists all nodes currently hosting Pods participating in the [`Service`] -async fn nodeport_hosts( - client: &stackable_operator::client::Client, - svc: &Service, +/// Lists all listener address and port number pairs for a given `port_name` for Pods participating in the [`Listener`][1] +/// +/// This returns pairs of `(Address, Port)`, where address could be a hostname or IP address of a node, clusterIP or external +/// load balancer depending on the Service type. +/// +/// ## Errors +/// +/// An error will be returned if there is no address found for the `port_name`. +/// +/// [1]: listener::v1alpha1::Listener +// TODO (@NickLarsenNZ): Move this to stackable-operator, so it can be used as listener.addresses_for_port(port_name) +fn listener_addresses( + listener: &listener::v1alpha1::Listener, port_name: &str, ) -> Result> { - let svc_port = svc - .spec + // Get addresses port pairs for addresses that have a port with the name that matches the one we are interested in + let address_port_pairs = listener + .status .as_ref() - .and_then(|svc_spec| { - svc_spec - .ports - .as_ref()? - .iter() - .find(|port| port.name.as_deref() == Some(port_name)) + .and_then(|listener_status| listener_status.ingress_addresses.as_ref()) + .context(NoListenerIngressAddressesSnafu { listener })? + .iter() + // Filter the addresses that have the port we are interested in (they likely all have it though) + .filter_map(|listener_ingress| { + Some(listener_ingress.address.clone()).zip(listener_ingress.ports.get(port_name)) + }) + // Convert the port from i32 to u16 + .map(|(listener_address, &port_number)| { + let port_number: u16 = port_number + .try_into() + .context(InvalidPortSnafu { port_number })?; + Ok((listener_address, port_number)) }) - .context(NoServicePortSnafu { port_name })?; - let node_port = svc_port.node_port.context(NoNodePortSnafu { port_name })?; - let endpoints = client - .get::( - svc.metadata.name.as_deref().context(NoNameSnafu)?, - svc.metadata - .namespace - .as_deref() - .context(NoNamespaceSnafu)?, - ) - .await - .with_context(|_| FindEndpointsSnafu { - svc: ObjectRef::from_obj(svc), - })?; - let nodes = endpoints - .subsets - .into_iter() - .flatten() - .flat_map(|subset| subset.addresses) - .flatten() - .flat_map(|addr| addr.node_name); - let addrs = nodes - .map(|node| Ok((node, node_port.try_into().context(InvalidNodePortSnafu)?))) .collect::, _>>()?; - Ok(addrs) + + // An empty list is considered an error + match address_port_pairs.is_empty() { + true => PortNotFoundSnafu { + port_name, + listener, + } + .fail(), + false => Ok(address_port_pairs), + } +} + +// TODO (@NickLarsenNZ): Implement this directly on RoleGroupRef, ie: +// RoleGroupRef::metrics_service_name(&self) to restrict what _name_ can be. +pub fn build_role_group_headless_service_name(name: String) -> String { + format!("{name}-headless") +} + +// TODO (@NickLarsenNZ): Implement this directly on RoleGroupRef, ie: +// RoleGroupRef::metrics_service_name(&self) to restrict what _name_ can be. +pub fn build_role_group_metrics_service_name(name: String) -> String { + format!("{name}-metrics") } diff --git a/rust/operator-binary/src/listener.rs b/rust/operator-binary/src/listener.rs new file mode 100644 index 00000000..71575976 --- /dev/null +++ b/rust/operator-binary/src/listener.rs @@ -0,0 +1,95 @@ +//! Types and functions for exposing product endpoints via [listener::v1alpha1::Listener]. 
+ +use snafu::{ResultExt as _, Snafu}; +use stackable_operator::{ + builder::meta::ObjectMetaBuilder, commons::product_image_selection::ResolvedProductImage, + crd::listener, kube::ResourceExt as _, +}; + +use crate::{ + crd::{ZOOKEEPER_SERVER_PORT_NAME, ZookeeperRole, security::ZookeeperSecurity, v1alpha1}, + utils::build_recommended_labels, + zk_controller::ZK_CONTROLLER_NAME, +}; + +type Result = std::result::Result; + +#[derive(Snafu, Debug)] +pub enum Error { + #[snafu(display("Role {zk_role:?} is not defined in the ZooKeeperCluster spec"))] + InvalidRole { + source: crate::crd::Error, + zk_role: String, + }, + + #[snafu(display("object is missing metadata to build owner reference"))] + ObjectMissingMetadataForOwnerRef { + source: stackable_operator::builder::meta::Error, + }, + + #[snafu(display("failed to build recommended labels"))] + RecommendedLabels { + source: stackable_operator::builder::meta::Error, + }, +} + +pub fn build_role_listener( + zk: &v1alpha1::ZookeeperCluster, + zk_role: &ZookeeperRole, + resolved_product_image: &ResolvedProductImage, + zookeeper_security: &ZookeeperSecurity, +) -> Result { + let listener_name = role_listener_name(zk, zk_role); + let listener_class = &zk + .role(zk_role) + .with_context(|_| InvalidRoleSnafu { + zk_role: zk_role.to_string(), + })? + .role_config + .listener_class; + + let listener = listener::v1alpha1::Listener { + metadata: ObjectMetaBuilder::new() + .name_and_namespace(zk) + .name(listener_name) + .ownerreference_from_resource(zk, None, Some(true)) + .context(ObjectMissingMetadataForOwnerRefSnafu)? + // Since we only make a listener for the role, which labels should we use? + // We can't use with_recommended_labels because it requires an ObjectLabels which + // in turn requires RoleGroup stuff) + // TODO (@NickLarsenNZ): Make separate functions for with_recommended_labels with/without rolegroups + // .with_labels(manual_labels).build() + .with_recommended_labels(build_recommended_labels( + zk, + ZK_CONTROLLER_NAME, + &resolved_product_image.app_version_label, + &zk_role.to_string(), + "none", // TODO (@NickLarsenNZ): update build_recommended_labels to have an optional role_group + )) + .context(RecommendedLabelsSnafu)? + .build(), + spec: listener::v1alpha1::ListenerSpec { + class_name: Some(listener_class.to_owned()), + ports: Some(listener_ports(zookeeper_security)), + ..listener::v1alpha1::ListenerSpec::default() + }, + status: None, + }; + + Ok(listener) +} + +// TODO (@NickLarsenNZ): This could be a method we can put on a Resource that takes a role_name +pub fn role_listener_name(zk: &v1alpha1::ZookeeperCluster, zk_role: &ZookeeperRole) -> String { + // TODO (@NickLarsenNZ): Make a convention, do we use name_any() and allow empty string? or handle the error (as unlikely as it would be)? + format!("{zk}-{zk_role}", zk = zk.name_any()) +} + +// We only use the server port here and intentionally omit the metrics one. 
+fn listener_ports(zookeeper_security: &ZookeeperSecurity) -> Vec { + vec![listener::v1alpha1::ListenerPort { + name: ZOOKEEPER_SERVER_PORT_NAME.to_string(), + port: zookeeper_security.client_port().into(), + protocol: Some("TCP".to_string()), + }] +} diff --git a/rust/operator-binary/src/main.rs b/rust/operator-binary/src/main.rs index ed548c2d..0fb628de 100644 --- a/rust/operator-binary/src/main.rs +++ b/rust/operator-binary/src/main.rs @@ -8,7 +8,7 @@ use stackable_operator::{ cli::{Command, ProductOperatorRun}, k8s_openapi::api::{ apps::v1::StatefulSet, - core::v1::{ConfigMap, Endpoints, Service}, + core::v1::{ConfigMap, Service}, }, kube::{ Resource, @@ -31,6 +31,7 @@ mod command; mod config; pub mod crd; mod discovery; +mod listener; mod operations; mod product_logging; mod utils; @@ -102,30 +103,11 @@ async fn main() -> anyhow::Result<()> { instance: None, }, )); - let zk_store = zk_controller.store(); let zk_controller = zk_controller .owns( watch_namespace.get_api::>(&client), watcher::Config::default(), ) - .watches( - watch_namespace.get_api::>(&client), - watcher::Config::default(), - move |endpoints| { - zk_store - .state() - .into_iter() - .filter(move |zk| { - let Ok(zk) = &zk.0 else { - return false; - }; - let endpoints_meta = endpoints.meta(); - zk.metadata.namespace == endpoints_meta.namespace - && zk.server_role_service_name() == endpoints_meta.name - }) - .map(|zk| ObjectRef::from_obj(&*zk)) - }, - ) .owns( watch_namespace.get_api::>(&client), watcher::Config::default(), diff --git a/rust/operator-binary/src/zk_controller.rs b/rust/operator-binary/src/zk_controller.rs index 706677d3..cee38f97 100644 --- a/rust/operator-binary/src/zk_controller.rs +++ b/rust/operator-binary/src/zk_controller.rs @@ -21,7 +21,12 @@ use stackable_operator::{ self, configmap::ConfigMapBuilder, meta::ObjectMetaBuilder, - pod::{PodBuilder, container::ContainerBuilder, resources::ResourceRequirementsBuilder}, + pod::{ + PodBuilder, + container::ContainerBuilder, + resources::ResourceRequirementsBuilder, + volume::{ListenerOperatorVolumeSourceBuilder, ListenerReference}, + }, }, cluster_resources::{ClusterResourceApplyStrategy, ClusterResources}, commons::{product_image_selection::ResolvedProductImage, rbac::build_rbac_resources}, @@ -31,8 +36,8 @@ use stackable_operator::{ apps::v1::{StatefulSet, StatefulSetSpec}, core::v1::{ ConfigMap, ConfigMapVolumeSource, EmptyDirVolumeSource, EnvVar, EnvVarSource, - ExecAction, ObjectFieldSelector, PodSecurityContext, Probe, Service, - ServiceAccount, ServicePort, ServiceSpec, Volume, + ExecAction, ObjectFieldSelector, PersistentVolumeClaim, PodSecurityContext, Probe, + Service, ServiceAccount, ServicePort, ServiceSpec, Volume, }, }, apimachinery::pkg::apis::meta::v1::LabelSelector, @@ -56,7 +61,7 @@ use stackable_operator::{ CustomContainerLogConfig, }, }, - role_utils::{GenericRoleConfig, RoleGroupRef}, + role_utils::RoleGroupRef, status::condition::{ compute_conditions, operations::ClusterOperationsConditionBuilder, statefulset::StatefulSetConditionBuilder, @@ -71,14 +76,22 @@ use crate::{ command::create_init_container_command_args, config::jvm::{construct_non_heap_jvm_args, construct_zk_server_heap_env}, crd::{ - DOCKER_IMAGE_BASE_NAME, JVM_SECURITY_PROPERTIES_FILE, MAX_PREPARE_LOG_FILE_SIZE, - MAX_ZK_LOG_FILES_SIZE, METRICS_PROVIDER_HTTP_PORT, METRICS_PROVIDER_HTTP_PORT_KEY, - STACKABLE_CONFIG_DIR, STACKABLE_DATA_DIR, STACKABLE_LOG_CONFIG_DIR, STACKABLE_LOG_DIR, - STACKABLE_RW_CONFIG_DIR, ZOOKEEPER_PROPERTIES_FILE, ZookeeperRole, + 
DOCKER_IMAGE_BASE_NAME, JMX_METRICS_PORT, JMX_METRICS_PORT_NAME, + JVM_SECURITY_PROPERTIES_FILE, MAX_PREPARE_LOG_FILE_SIZE, MAX_ZK_LOG_FILES_SIZE, + METRICS_PROVIDER_HTTP_PORT, METRICS_PROVIDER_HTTP_PORT_KEY, + METRICS_PROVIDER_HTTP_PORT_NAME, STACKABLE_CONFIG_DIR, STACKABLE_DATA_DIR, + STACKABLE_LOG_CONFIG_DIR, STACKABLE_LOG_DIR, STACKABLE_RW_CONFIG_DIR, + ZOOKEEPER_ELECTION_PORT, ZOOKEEPER_ELECTION_PORT_NAME, ZOOKEEPER_LEADER_PORT, + ZOOKEEPER_LEADER_PORT_NAME, ZOOKEEPER_PROPERTIES_FILE, ZOOKEEPER_SERVER_PORT_NAME, + ZookeeperRole, security::{self, ZookeeperSecurity}, - v1alpha1, + v1alpha1::{self, ZookeeperServerRoleConfig}, + }, + discovery::{ + self, build_discovery_configmap, build_role_group_headless_service_name, + build_role_group_metrics_service_name, }, - discovery::{self, build_discovery_configmaps}, + listener::{build_role_listener, role_listener_name}, operations::{graceful_shutdown::add_graceful_shutdown_config, pdb::add_pdbs}, product_logging::extend_role_group_config_map, utils::build_recommended_labels, @@ -86,6 +99,8 @@ use crate::{ pub const ZK_CONTROLLER_NAME: &str = "zookeepercluster"; pub const ZK_FULL_CONTROLLER_NAME: &str = concatcp!(ZK_CONTROLLER_NAME, '.', OPERATOR_NAME); +pub const LISTENER_VOLUME_NAME: &str = "listener"; +pub const LISTENER_VOLUME_DIR: &str = "/stackable/listener"; pub struct Ctx { pub client: stackable_operator::client::Client, @@ -266,6 +281,21 @@ pub enum Error { #[snafu(display("failed to construct JVM arguments"))] ConstructJvmArguments { source: crate::config::jvm::Error }, + + #[snafu(display("failed to apply group listener"))] + ApplyGroupListener { + source: stackable_operator::cluster_resources::Error, + }, + + #[snafu(display("failed to configure listener"))] + ListenerConfiguration { source: crate::listener::Error }, + + // #[snafu(display("failed to configure listener"))] + // ListenerConfiguration { source: crate::listener::Error }, + #[snafu(display("failed to build listener volume"))] + BuildListenerPersistentVolume { + source: stackable_operator::builder::pod::volume::ListenerOperatorVolumeSourceBuilderError, + }, } impl ReconcilerError for Error { @@ -276,7 +306,7 @@ impl ReconcilerError for Error { fn secondary_object(&self) -> Option> { match self { Error::MissingSecretLifetime => None, - Error::InvalidZookeeperCluster { source: _ } => None, + Error::InvalidZookeeperCluster { .. } => None, Error::CrdValidationFailure { .. } => None, Error::NoServerRole => None, Error::RoleParseFailure { .. } => None, @@ -313,6 +343,9 @@ impl ReconcilerError for Error { Error::AddVolumeMount { .. } => None, Error::CreateClusterResources { .. } => None, Error::ConstructJvmArguments { .. } => None, + Error::ApplyGroupListener { .. } => None, + Error::BuildListenerPersistentVolume { .. } => None, + Error::ListenerConfiguration { .. 
} => None, } } } @@ -394,14 +427,6 @@ pub async fn reconcile_zk( .await .context(ApplyRoleBindingSnafu)?; - let server_role_service = cluster_resources - .add( - client, - build_server_role_service(zk, &resolved_product_image, &zookeeper_security)?, - ) - .await - .context(ApplyRoleServiceSnafu)?; - let mut ss_cond_builder = StatefulSetConditionBuilder::default(); let zk_role = ZookeeperRole::Server; @@ -411,11 +436,12 @@ pub async fn reconcile_zk( .merged_config(&ZookeeperRole::Server, &rolegroup) .context(FailedToResolveConfigSnafu)?; - let rg_service = build_server_rolegroup_service( + let rg_headless_service = + build_server_rolegroup_headless_service(zk, &rolegroup, &resolved_product_image)?; + let rg_metrics_service = build_server_rolegroup_metrics_service( zk, &rolegroup, &resolved_product_image, - &zookeeper_security, rolegroup_config, )?; let rg_configmap = build_server_rolegroup_config_map( @@ -436,8 +462,15 @@ pub async fn reconcile_zk( &merged_config, &rbac_sa, )?; + cluster_resources - .add(client, rg_service) + .add(client, rg_headless_service) + .await + .with_context(|_| ApplyRoleGroupServiceSnafu { + rolegroup: rolegroup.clone(), + })?; + cluster_resources + .add(client, rg_metrics_service) .await .with_context(|_| ApplyRoleGroupServiceSnafu { rolegroup: rolegroup.clone(), @@ -460,38 +493,45 @@ pub async fn reconcile_zk( } let role_config = zk.role_config(&zk_role); - if let Some(GenericRoleConfig { - pod_disruption_budget: pdb, - }) = role_config - { - add_pdbs(pdb, zk, &zk_role, client, &mut cluster_resources) - .await - .context(FailedToCreatePdbSnafu)?; + if let Some(ZookeeperServerRoleConfig { common, .. }) = role_config { + add_pdbs( + &common.pod_disruption_budget, + zk, + &zk_role, + client, + &mut cluster_resources, + ) + .await + .context(FailedToCreatePdbSnafu)?; } + let listener = build_role_listener(zk, &zk_role, &resolved_product_image, &zookeeper_security) + .context(ListenerConfigurationSnafu)?; + let applied_listener = cluster_resources + .add(client, listener) + .await + .context(ApplyGroupListenerSnafu)?; + // std's SipHasher is deprecated, and DefaultHasher is unstable across Rust releases. // We don't /need/ stability, but it's still nice to avoid spurious changes where possible. let mut discovery_hash = FnvHasher::with_key(0); - for discovery_cm in build_discovery_configmaps( + let discovery_cm = build_discovery_configmap( zk, zk, - client, ZK_CONTROLLER_NAME, - &server_role_service, + applied_listener, None, &resolved_product_image, &zookeeper_security, ) - .await - .context(BuildDiscoveryConfigSnafu)? - { - let discovery_cm = cluster_resources - .add(client, discovery_cm) - .await - .context(ApplyDiscoveryConfigSnafu)?; - if let Some(generation) = discovery_cm.metadata.resource_version { - discovery_hash.write(generation.as_bytes()) - } + .context(BuildDiscoveryConfigSnafu)?; + + let discovery_cm = cluster_resources + .add(client, discovery_cm) + .await + .context(ApplyDiscoveryConfigSnafu)?; + if let Some(generation) = discovery_cm.metadata.resource_version { + discovery_hash.write(generation.as_bytes()) } let cluster_operation_cond_builder = @@ -516,58 +556,6 @@ pub async fn reconcile_zk( Ok(controller::Action::await_change()) } -/// The server-role service is the primary endpoint that should be used by clients that do not perform internal load balancing, -/// including targets outside of the cluster. 
-/// -/// Note that you should generally *not* hard-code clients to use these services; instead, create a [`v1alpha1::ZookeeperZnode`](`v1alpha1::ZookeeperZnode`) -/// and use the connection string that it gives you. -pub fn build_server_role_service( - zk: &v1alpha1::ZookeeperCluster, - resolved_product_image: &ResolvedProductImage, - zookeeper_security: &ZookeeperSecurity, -) -> Result { - let role_name = ZookeeperRole::Server.to_string(); - let role_svc_name = zk - .server_role_service_name() - .context(GlobalServiceNameNotFoundSnafu)?; - - let metadata = ObjectMetaBuilder::new() - .name_and_namespace(zk) - .name(&role_svc_name) - .ownerreference_from_resource(zk, None, Some(true)) - .context(ObjectMissingMetadataForOwnerRefSnafu)? - .with_recommended_labels(build_recommended_labels( - zk, - ZK_CONTROLLER_NAME, - &resolved_product_image.app_version_label, - &role_name, - "global", - )) - .context(ObjectMetaSnafu)? - .build(); - - let service_selector_labels = - Labels::role_selector(zk, APP_NAME, &role_name).context(BuildLabelSnafu)?; - - let service_spec = ServiceSpec { - ports: Some(vec![ServicePort { - name: Some("zk".to_string()), - port: zookeeper_security.client_port().into(), - protocol: Some("TCP".to_string()), - ..ServicePort::default() - }]), - selector: Some(service_selector_labels.into()), - type_: Some(zk.spec.cluster_config.listener_class.k8s_service_type()), - ..ServiceSpec::default() - }; - - Ok(Service { - metadata, - spec: Some(service_spec), - status: None, - }) -} - /// The rolegroup [`ConfigMap`] configures the rolegroup based on the configuration given by the administrator fn build_server_rolegroup_config_map( zk: &v1alpha1::ZookeeperCluster, @@ -583,11 +571,11 @@ fn build_server_rolegroup_config_map( .flatten() .map(|pod| { ( - format!("server.{}", pod.zookeeper_myid), + format!("server.{id}", id = pod.zookeeper_myid), format!( - "{}:2888:3888;{}", - pod.fqdn(cluster_info), - zookeeper_security.client_port() + "{internal_fqdn}:{ZOOKEEPER_LEADER_PORT}:{ZOOKEEPER_ELECTION_PORT};{client_port}", + internal_fqdn = pod.internal_fqdn(cluster_info), + client_port = zookeeper_security.client_port() ), ) }) @@ -669,14 +657,70 @@ fn build_server_rolegroup_config_map( }) } -/// The rolegroup [`Service`] is a headless service that allows direct access to the instances of a certain rolegroup +/// The rolegroup [`Service`] is a headless service that allows internal access to the instances of a certain rolegroup /// /// This is mostly useful for internal communication between peers, or for clients that perform client-side load balancing. -fn build_server_rolegroup_service( +fn build_server_rolegroup_headless_service( + zk: &v1alpha1::ZookeeperCluster, + rolegroup: &RoleGroupRef, + resolved_product_image: &ResolvedProductImage, +) -> Result { + let metadata = ObjectMetaBuilder::new() + .name_and_namespace(zk) + .name(build_role_group_headless_service_name( + rolegroup.object_name(), + )) + .ownerreference_from_resource(zk, None, Some(true)) + .context(ObjectMissingMetadataForOwnerRefSnafu)? + .with_recommended_labels(build_recommended_labels( + zk, + ZK_CONTROLLER_NAME, + &resolved_product_image.app_version_label, + &rolegroup.role, + &rolegroup.role_group, + )) + .context(ObjectMetaSnafu)? 
+ .build(); + + let service_selector_labels = + Labels::role_group_selector(zk, APP_NAME, &rolegroup.role, &rolegroup.role_group) + .context(BuildLabelSnafu)?; + + let service_spec = ServiceSpec { + // Internal communication does not need to be exposed + type_: Some("ClusterIP".to_string()), + cluster_ip: Some("None".to_string()), + ports: Some(vec![ + ServicePort { + name: Some(ZOOKEEPER_LEADER_PORT_NAME.to_string()), + port: ZOOKEEPER_LEADER_PORT as i32, + protocol: Some("TCP".to_string()), + ..ServicePort::default() + }, + ServicePort { + name: Some(ZOOKEEPER_ELECTION_PORT_NAME.to_string()), + port: ZOOKEEPER_ELECTION_PORT as i32, + protocol: Some("TCP".to_string()), + ..ServicePort::default() + }, + ]), + selector: Some(service_selector_labels.into()), + publish_not_ready_addresses: Some(true), + ..ServiceSpec::default() + }; + + Ok(Service { + metadata, + spec: Some(service_spec), + status: None, + }) +} + +/// The rolegroup [`Service`] for exposing metrics +fn build_server_rolegroup_metrics_service( zk: &v1alpha1::ZookeeperCluster, rolegroup: &RoleGroupRef, resolved_product_image: &ResolvedProductImage, - zookeeper_security: &ZookeeperSecurity, rolegroup_config: &HashMap>, ) -> Result { let prometheus_label = @@ -684,7 +728,9 @@ fn build_server_rolegroup_service( let metadata = ObjectMetaBuilder::new() .name_and_namespace(zk) - .name(rolegroup.object_name()) + .name(build_role_group_metrics_service_name( + rolegroup.object_name(), + )) .ownerreference_from_resource(zk, None, Some(true)) .context(ObjectMissingMetadataForOwnerRefSnafu)? .with_recommended_labels(build_recommended_labels( @@ -708,20 +754,14 @@ fn build_server_rolegroup_service( cluster_ip: Some("None".to_string()), ports: Some(vec![ ServicePort { - name: Some("zk".to_string()), - port: zookeeper_security.client_port().into(), - protocol: Some("TCP".to_string()), - ..ServicePort::default() - }, - ServicePort { - name: Some("metrics".to_string()), - port: 9505, + name: Some(JMX_METRICS_PORT_NAME.to_string()), + port: JMX_METRICS_PORT as i32, protocol: Some("TCP".to_string()), ..ServicePort::default() }, ServicePort { - name: Some("native-metrics".to_string()), - port: metrics_port_from_rolegroup_config(rolegroup_config).into(), + name: Some(METRICS_PROVIDER_HTTP_PORT_NAME.to_string()), + port: metrics_port_from_rolegroup_config(rolegroup_config) as i32, protocol: Some("TCP".to_string()), ..ServicePort::default() }, @@ -738,9 +778,22 @@ fn build_server_rolegroup_service( }) } +pub fn build_role_listener_pvc( + group_listener_name: &str, + unversioned_recommended_labels: &Labels, +) -> Result { + ListenerOperatorVolumeSourceBuilder::new( + &ListenerReference::ListenerName(group_listener_name.to_string()), + unversioned_recommended_labels, + ) + .expect("ListenerOperatorVolumeSourceBuilder::new always returns Ok()") + .build_pvc(LISTENER_VOLUME_NAME.to_string()) + .context(BuildListenerPersistentVolumeSnafu) +} + /// The rolegroup [`StatefulSet`] runs the rolegroup, as configured by the administrator. /// -/// The [`Pod`](`stackable_operator::k8s_openapi::api::core::v1::Pod`)s are accessible through the corresponding [`Service`] (from [`build_server_rolegroup_service`]). +/// The [`Pod`](`stackable_operator::k8s_openapi::api::core::v1::Pod`)s are accessible through the corresponding headless [`Service`] (from [`build_server_rolegroup_headless_service`]). 
#[allow(clippy::too_many_arguments)] fn build_server_rolegroup_statefulset( zk: &v1alpha1::ZookeeperCluster, @@ -772,7 +825,7 @@ fn build_server_rolegroup_statefulset( }) .collect::>(); - let (pvc, resources) = zk + let (original_pvcs, resources) = zk .resources(zk_role, rolegroup_ref) .context(CrdValidationFailureSnafu)?; @@ -782,6 +835,29 @@ fn build_server_rolegroup_statefulset( ContainerBuilder::new(APP_NAME).expect("invalid hard-coded container name"); let mut pod_builder = PodBuilder::new(); + // Used for PVC templates that cannot be modified once they are deployed + let unversioned_recommended_labels = Labels::recommended(build_recommended_labels( + zk, + ZK_CONTROLLER_NAME, + // A version value is required, but we need to use something constant so that we don't run into immutabile field issues. + "none", + &rolegroup_ref.role, + &rolegroup_ref.role_group, + )) + .context(BuildLabelSnafu)?; + + let listener_pvc = build_role_listener_pvc( + &role_listener_name(zk, &ZookeeperRole::Server), + &unversioned_recommended_labels, + )?; + + let mut pvcs = original_pvcs; + pvcs.extend([listener_pvc]); + + cb_zookeeper + .add_volume_mount(LISTENER_VOLUME_NAME, LISTENER_VOLUME_DIR) + .context(AddVolumeMountSnafu)?; + let requested_secret_lifetime = merged_config .requested_secret_lifetime .context(MissingSecretLifetimeSnafu)?; @@ -903,12 +979,15 @@ fn build_server_rolegroup_statefulset( period_seconds: Some(1), ..Probe::default() }) - .add_container_port("zk", zookeeper_security.client_port().into()) - .add_container_port("zk-leader", 2888) - .add_container_port("zk-election", 3888) - .add_container_port("metrics", 9505) .add_container_port( - "native-metrics", + ZOOKEEPER_SERVER_PORT_NAME, + zookeeper_security.client_port() as i32, + ) + .add_container_port(ZOOKEEPER_LEADER_PORT_NAME, ZOOKEEPER_LEADER_PORT as i32) + .add_container_port(ZOOKEEPER_ELECTION_PORT_NAME, ZOOKEEPER_ELECTION_PORT as i32) + .add_container_port(JMX_METRICS_PORT_NAME, 9505) + .add_container_port( + METRICS_PROVIDER_HTTP_PORT_NAME, metrics_port_from_rolegroup_config(server_config).into(), ) .add_volume_mount("data", STACKABLE_DATA_DIR) @@ -1063,9 +1142,11 @@ fn build_server_rolegroup_statefulset( match_labels: Some(statefulset_match_labels.into()), ..LabelSelector::default() }, - service_name: Some(rolegroup_ref.object_name()), + service_name: Some(build_role_group_headless_service_name( + rolegroup_ref.object_name(), + )), template: pod_template, - volume_claim_templates: Some(pvc), + volume_claim_templates: Some(pvcs), ..StatefulSetSpec::default() }; diff --git a/rust/operator-binary/src/znode_controller.rs b/rust/operator-binary/src/znode_controller.rs index 41b9f92c..20ac36fe 100644 --- a/rust/operator-binary/src/znode_controller.rs +++ b/rust/operator-binary/src/znode_controller.rs @@ -8,7 +8,8 @@ use snafu::{OptionExt, ResultExt, Snafu}; use stackable_operator::{ cluster_resources::{ClusterResourceApplyStrategy, ClusterResources}, commons::product_image_selection::ResolvedProductImage, - k8s_openapi::api::core::v1::{ConfigMap, Service}, + crd::listener, + k8s_openapi::api::core::v1::ConfigMap, kube::{ self, Resource, api::ObjectMeta, @@ -24,8 +25,9 @@ use tracing::{debug, info}; use crate::{ APP_NAME, OPERATOR_NAME, - crd::{DOCKER_IMAGE_BASE_NAME, security::ZookeeperSecurity, v1alpha1}, - discovery::{self, build_discovery_configmaps}, + crd::{DOCKER_IMAGE_BASE_NAME, ZookeeperRole, security::ZookeeperSecurity, v1alpha1}, + discovery::{self, build_discovery_configmap}, + listener::role_listener_name, }; pub const 
ZNODE_CONTROLLER_NAME: &str = "znode"; @@ -52,7 +54,7 @@ pub enum Error { #[snafu(display("object does not refer to ZookeeperCluster"))] InvalidZkReference, - #[snafu(display("could not find {}", zk))] + #[snafu(display("could not find {zk:?}"))] FindZk { source: stackable_operator::client::Error, zk: ObjectRef, @@ -63,30 +65,30 @@ pub enum Error { zk: ObjectRef, }, - #[snafu(display("could not find server role service name for {}", zk))] + #[snafu(display("could not find server role service name for {zk:?}"))] NoZkSvcName { zk: ObjectRef, }, - #[snafu(display("could not find server role service for {}", zk))] + #[snafu(display("could not find server role service for {zk:?}"))] FindZkSvc { source: stackable_operator::client::Error, zk: ObjectRef, }, - #[snafu(display("failed to calculate FQDN for {}", zk))] + #[snafu(display("failed to calculate FQDN for {zk:?}"))] NoZkFqdn { zk: ObjectRef, }, - #[snafu(display("failed to ensure that ZNode {} exists in {}", znode_path, zk))] + #[snafu(display("failed to ensure that ZNode {znode_path:?} exists in {zk:?}"))] EnsureZnode { source: znode_mgmt::Error, zk: ObjectRef, znode_path: String, }, - #[snafu(display("failed to ensure that ZNode {} is missing from {}", znode_path, zk))] + #[snafu(display("failed to ensure that ZNode {znode_path:?} is missing from {zk:?}"))] EnsureZnodeMissing { source: znode_mgmt::Error, zk: ObjectRef, @@ -96,7 +98,7 @@ pub enum Error { #[snafu(display("failed to build discovery information"))] BuildDiscoveryConfigMap { source: discovery::Error }, - #[snafu(display("failed to save discovery information to {}", cm))] + #[snafu(display("failed to save discovery information to {cm:?}"))] ApplyDiscoveryConfigMap { source: stackable_operator::cluster_resources::Error, cm: ObjectRef, @@ -122,6 +124,12 @@ pub enum Error { #[snafu(display("failed to initialize security context"))] FailedToInitializeSecurityContext { source: crate::crd::security::Error }, + + #[snafu(display("Znode {znode:?} missing expected keys (name and/or namespace)"))] + ZnodeMissingExpectedKeys { + source: stackable_operator::cluster_resources::Error, + znode: ObjectRef, + }, } type Result = std::result::Result; @@ -153,7 +161,7 @@ impl ReconcilerError for Error { fn secondary_object(&self) -> Option> { match self { - Error::InvalidZookeeperZnode { source: _ } => None, + Error::InvalidZookeeperZnode { .. } => None, Error::ObjectMissingMetadata => None, Error::InvalidZkReference => None, Error::FindZk { zk, .. } => Some(zk.clone().erase()), @@ -163,13 +171,14 @@ impl ReconcilerError for Error { Error::NoZkFqdn { zk } => Some(zk.clone().erase()), Error::EnsureZnode { zk, .. } => Some(zk.clone().erase()), Error::EnsureZnodeMissing { zk, .. } => Some(zk.clone().erase()), - Error::BuildDiscoveryConfigMap { source: _ } => None, + Error::BuildDiscoveryConfigMap { .. } => None, Error::ApplyDiscoveryConfigMap { cm, .. } => Some(cm.clone().erase()), Error::ApplyStatus { .. } => None, - Error::Finalizer { source: _ } => None, - Error::DeleteOrphans { source: _ } => None, + Error::Finalizer { .. } => None, + Error::DeleteOrphans { .. } => None, Error::ObjectHasNoNamespace => None, - Error::FailedToInitializeSecurityContext { source: _ } => None, + Error::FailedToInitializeSecurityContext { .. } => None, + Error::ZnodeMissingExpectedKeys { .. 
} => None, } } } @@ -272,7 +281,7 @@ async fn reconcile_apply( &znode.object_ref(&()), ClusterResourceApplyStrategy::from(&zk.spec.cluster_operation), ) - .unwrap(); + .context(ZnodeMissingExpectedKeysSnafu { znode })?; znode_mgmt::ensure_znode_exists( &zk_mgmt_addr(&zk, &zookeeper_security, &client.kubernetes_cluster_info)?, @@ -284,12 +293,9 @@ async fn reconcile_apply( znode_path, })?; - let server_role_service = client - .get::( - &zk.server_role_service_name() - .with_context(|| NoZkSvcNameSnafu { - zk: ObjectRef::from_obj(&zk), - })?, + let listener = client + .get::( + &role_listener_name(&zk, &ZookeeperRole::Server), zk.metadata .namespace .as_deref() @@ -299,25 +305,23 @@ async fn reconcile_apply( .context(FindZkSvcSnafu { zk: ObjectRef::from_obj(&zk), })?; - for discovery_cm in build_discovery_configmaps( + + let discovery_cm = build_discovery_configmap( &zk, znode, - client, ZNODE_CONTROLLER_NAME, - &server_role_service, + listener, Some(znode_path), resolved_product_image, &zookeeper_security, ) - .await - .context(BuildDiscoveryConfigMapSnafu)? - { - let obj_ref = ObjectRef::from_obj(&discovery_cm); - cluster_resources - .add(client, discovery_cm) - .await - .with_context(|_| ApplyDiscoveryConfigMapSnafu { cm: obj_ref })?; - } + .context(BuildDiscoveryConfigMapSnafu)?; + + let obj_ref = ObjectRef::from_obj(&discovery_cm); + cluster_resources + .add(client, discovery_cm) + .await + .with_context(|_| ApplyDiscoveryConfigMapSnafu { cm: obj_ref })?; cluster_resources .delete_orphaned_resources(client) @@ -357,6 +361,16 @@ async fn reconcile_cleanup( Ok(controller::Action::await_change()) } +/// Get the ZooKeeper management host:port for the operator to manage the ZooKeeper cluster. +/// +/// This uses the _Server_ Role [Listener] address because it covers ZooKeeper replicas across all +/// RoleGroups. +/// This does mean that when the listenerClass is `external-stable`, the operator will need to be +/// able to access the external address (eg: Load Balancer). +/// +/// [Listener]: ::stackable_operator::crd::listener::v1alpha1::Listener +// NOTE (@NickLarsenNZ): If we want to keep this traffic internal, we would need to choose one of +// the RoleGroups headless services - or make a dedicated ClusterIP service for the operator to use. fn zk_mgmt_addr( zk: &v1alpha1::ZookeeperCluster, zookeeper_security: &ZookeeperSecurity, @@ -365,12 +379,13 @@ fn zk_mgmt_addr( // Rust ZooKeeper client does not support client-side load-balancing, so use // (load-balanced) global service instead. 
Ok(format!( - "{}:{}", - zk.server_role_service_fqdn(cluster_info) + "{hostname}:{port}", + hostname = zk + .server_role_listener_fqdn(cluster_info) .with_context(|| NoZkFqdnSnafu { zk: ObjectRef::from_obj(zk), })?, - zookeeper_security.client_port(), + port = zookeeper_security.client_port(), )) } @@ -427,34 +442,34 @@ mod znode_mgmt { source: std::io::Error, addr: String, }, - #[snafu(display("address {} did not resolve to any socket addresses", addr))] + #[snafu(display("address {addr:?} did not resolve to any socket addresses"))] AddrResolution { addr: String }, - #[snafu(display("failed to connect to {}", addr))] + #[snafu(display("failed to connect to {addr:?}"))] Connect { source: tokio_zookeeper::error::Error, addr: SocketAddr, }, - #[snafu(display("protocol error creating znode {}", path))] + #[snafu(display("protocol error creating znode {path:?}"))] CreateZnodeProtocol { source: tokio_zookeeper::error::Error, path: String, }, - #[snafu(display("failed to create znode {}", path))] + #[snafu(display("failed to create znode {path:?}"))] CreateZnode { source: tokio_zookeeper::error::Create, path: String, }, - #[snafu(display("protocol error deleting znode {}", path))] + #[snafu(display("protocol error deleting znode {path:?}"))] DeleteZnodeProtocol { source: tokio_zookeeper::error::Error, path: String, }, - #[snafu(display("failed to delete znode {}", path))] + #[snafu(display("failed to delete znode {path:?}"))] DeleteZnode { source: tokio_zookeeper::error::Delete, path: String, }, - #[snafu(display("failed to find children to delete of {}", path))] + #[snafu(display("failed to find children to delete of {path:?}"))] DeleteZnodeFindChildrenProtocol { source: tokio_zookeeper::error::Error, path: String, diff --git a/tests/templates/kuttl/smoke/10-assert.yaml.j2 b/tests/templates/kuttl/smoke/10-assert.yaml.j2 index 22ebd65a..cfaee9c0 100644 --- a/tests/templates/kuttl/smoke/10-assert.yaml.j2 +++ b/tests/templates/kuttl/smoke/10-assert.yaml.j2 @@ -83,3 +83,38 @@ status: expectedPods: 3 currentHealthy: 3 disruptionsAllowed: 1 +--- +apiVersion: v1 +kind: Service +metadata: + name: test-zk-server +spec: + type: ClusterIP # listenerClass: cluster-internal +--- +apiVersion: v1 +kind: Service +metadata: + name: test-zk-server-primary-headless +spec: + type: ClusterIP +--- +apiVersion: v1 +kind: Service +metadata: + name: test-zk-server-primary-metrics +spec: + type: ClusterIP +--- +apiVersion: v1 +kind: Service +metadata: + name: test-zk-server-secondary-headless +spec: + type: ClusterIP +--- +apiVersion: v1 +kind: Service +metadata: + name: test-zk-server-secondary-metrics +spec: + type: ClusterIP diff --git a/tests/templates/kuttl/smoke/test_tls.sh.j2 b/tests/templates/kuttl/smoke/test_tls.sh.j2 index 2d87370e..cc7be7f8 100755 --- a/tests/templates/kuttl/smoke/test_tls.sh.j2 +++ b/tests/templates/kuttl/smoke/test_tls.sh.j2 @@ -4,9 +4,9 @@ NAMESPACE=$1 {% if test_scenario['values']['use-client-auth-tls'] == 'true' or test_scenario['values']['use-server-tls'] == 'true' %} -SERVER="test-zk-server-primary-1.test-zk-server-primary.${NAMESPACE}.svc.cluster.local:2282" +SERVER="test-zk-server.${NAMESPACE}.svc.cluster.local:2282" {% else %} -SERVER="test-zk-server-primary-1.test-zk-server-primary.${NAMESPACE}.svc.cluster.local:2181" +SERVER="test-zk-server.${NAMESPACE}.svc.cluster.local:2181" {% endif %} # just to be safe... 
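For context on what the updated tests expect: the discovery ConfigMap produced by `build_discovery_configmap` now derives its addresses from the role Listener's ingress status instead of from node ports, which is why the tests above point clients at `test-zk-server` rather than at individual role-group services. Below is a minimal sketch of the resulting ConfigMap data, assuming a ZookeeperCluster named `test-zk` in the `default` namespace with the default `cluster-internal` listener class and the plaintext client port 2181; the actual ingress address and port depend on the configured ListenerClass and TLS settings.

[source,yaml]
----
apiVersion: v1
kind: ConfigMap
metadata:
  name: test-zk  # discovery ConfigMaps are named after their owner (the cluster or a ZookeeperZnode)
data:
  # Connection string in the format the Java client expects: "{host1}:{port1},{host2}:{port2}/{chroot}"
  # (znode discovery ConfigMaps append the znode path as the chroot)
  ZOOKEEPER: test-zk-server.default.svc.cluster.local:2181
  # Exported separately for clients that do not support the merged hosts/chroot format
  ZOOKEEPER_HOSTS: test-zk-server.default.svc.cluster.local:2181
  ZOOKEEPER_CLIENT_PORT: "2181"
----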
diff --git a/tests/templates/kuttl/smoke/test_zookeeper.py b/tests/templates/kuttl/smoke/test_zookeeper.py index 76f5b561..42fbe927 100755 --- a/tests/templates/kuttl/smoke/test_zookeeper.py +++ b/tests/templates/kuttl/smoke/test_zookeeper.py @@ -90,21 +90,12 @@ def check_monitoring(hosts): args = vars(all_args.parse_args()) namespace = args["namespace"] - host_primary_0 = ( - "http://test-zk-server-primary-0.test-zk-server-primary." - + namespace - + ".svc.cluster.local" - ) - host_primary_1 = ( - "http://test-zk-server-primary-1.test-zk-server-primary." - + namespace - + ".svc.cluster.local" - ) - host_secondary = ( - "http://test-zk-server-secondary-0.test-zk-server-secondary." - + namespace - + ".svc.cluster.local" - ) + # Pod FQDNs via the headless service. Note: Metrics will still be accessible + # even though they are exposed on a different service, e.g. + # test-zk-server-primary-metrics.{namespace}.svc.cluster.local + host_primary_0 = f"http://test-zk-server-primary-0.test-zk-server-primary-headless.{namespace}.svc.cluster.local" + host_primary_1 = f"http://test-zk-server-primary-1.test-zk-server-primary-headless.{namespace}.svc.cluster.local" + host_secondary = f"http://test-zk-server-secondary-0.test-zk-server-secondary-headless.{namespace}.svc.cluster.local" hosts = [host_primary_0, host_primary_1, host_secondary]
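Taken together, the user-facing effect of this change is that `listenerClass` moves from `spec.clusterConfig` to the server role's `roleConfig`, as reflected in the CRD and documentation changes above. A minimal sketch of a manifest using the new field follows; the cluster name, product version and replica count are illustrative.

[source,yaml]
----
apiVersion: zookeeper.stackable.tech/v1alpha1
kind: ZookeeperCluster
metadata:
  name: test-zk
spec:
  image:
    productVersion: 3.9.2  # illustrative version
  servers:
    roleConfig:
      # One of cluster-internal (the default), external-unstable or external-stable
      listenerClass: external-unstable
    roleGroups:
      default:
        replicas: 3
----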