Skip to content

Commit db79526

Browse files
feat: Inject vector aggregator address as env into vector config (#589)
* start watching configmaps that are referenced in hive spec * wip: Inject the vector aggregator address into the vector config using an env var * add changelog entry * remove local patch for operator-rs * run cargo fmt * rename store * chore: Use borrows --------- Co-authored-by: Nick Larsen <nick.larsen@stackable.tech>
1 parent f98fe20 commit db79526

File tree

6 files changed

+113
-128
lines changed

6 files changed

+113
-128
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,16 @@ All notable changes to this project will be documented in this file.
1010
- BREAKING: The file log directory was set by `HIVE_OPERATOR_LOG_DIRECTORY`, and is now set by `ROLLING_LOGS`
1111
(or via `--rolling-logs <DIRECTORY>`).
1212
- Replace stackable-operator `print_startup_string` with `tracing::info!` with fields.
13+
- BREAKING: Inject the vector aggregator address into the vector config using the env var `VECTOR_AGGREGATOR_ADDRESS` instead
14+
of having the operator write it to the vector config ([#589]).
1315

1416
### Fixed
1517

1618
- Use `json` file extension for log files ([#591]).
19+
- Fix a bug where changes to ConfigMaps that are referenced in the HiveCluster spec didn't trigger a reconciliation ([#589]).
1720

1821
[#585]: https://github.com/stackabletech/hive-operator/pull/585
22+
[#589]: https://github.com/stackabletech/hdfs-operator/pull/589
1923
[#591]: https://github.com/stackabletech/hive-operator/pull/591
2024

2125
## [25.3.0] - 2025-03-21

Cargo.lock

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ repository = "https://github.com/stackabletech/hive-operator"
1111

1212
[workspace.dependencies]
1313
product-config = { git = "https://github.com/stackabletech/product-config.git", tag = "0.7.0" }
14-
stackable-operator = { git = "https://github.com/stackabletech/operator-rs.git", tag = "stackable-operator-0.89.1" }
14+
stackable-operator = { git = "https://github.com/stackabletech/operator-rs.git", tag = "stackable-operator-0.90.0" }
1515
stackable-telemetry = { git = "https://github.com/stackabletech/operator-rs.git", tag = "stackable-telemetry-0.4.0" }
1616
stackable-versioned = { git = "https://github.com/stackabletech/operator-rs.git", features = ["k8s"], tag = "stackable-versioned-0.7.1" }
1717

@@ -34,4 +34,5 @@ tokio = { version = "1.40", features = ["full"] }
3434
tracing = "0.1"
3535

3636
# [patch."https://github.com/stackabletech/operator-rs.git"]
37+
# stackable-operator = { path = "../operator-rs/crates/stackable-operator" }
3738
# stackable-operator = { git = "https://github.com/stackabletech//operator-rs.git", branch = "main" }

rust/operator-binary/src/controller.rs

Lines changed: 31 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ use crate::{
9494
kerberos_container_start_commands,
9595
},
9696
operations::{graceful_shutdown::add_graceful_shutdown_config, pdb::add_pdbs},
97-
product_logging::{extend_role_group_config_map, resolve_vector_aggregator_address},
97+
product_logging::extend_role_group_config_map,
9898
};
9999

100100
pub const HIVE_CONTROLLER_NAME: &str = "hivecluster";
@@ -228,10 +228,8 @@ pub enum Error {
228228
source: stackable_operator::cluster_resources::Error,
229229
},
230230

231-
#[snafu(display("failed to resolve the Vector aggregator address"))]
232-
ResolveVectorAggregatorAddress {
233-
source: crate::product_logging::Error,
234-
},
231+
#[snafu(display("vector agent is enabled but vector aggregator ConfigMap is missing"))]
232+
VectorAggregatorConfigMapMissing,
235233

236234
#[snafu(display("failed to add the logging configuration to the ConfigMap [{cm_name}]"))]
237235
InvalidLoggingConfig {
@@ -439,10 +437,6 @@ pub async fn reconcile_hive(
439437
.await
440438
.context(ApplyRoleServiceSnafu)?;
441439

442-
let vector_aggregator_address = resolve_vector_aggregator_address(hive, client)
443-
.await
444-
.context(ResolveVectorAggregatorAddressSnafu)?;
445-
446440
let mut ss_cond_builder = StatefulSetConditionBuilder::default();
447441

448442
for (rolegroup_name, rolegroup_config) in metastore_config.iter() {
@@ -461,7 +455,6 @@ pub async fn reconcile_hive(
461455
rolegroup_config,
462456
s3_connection_spec.as_ref(),
463457
&config,
464-
vector_aggregator_address.as_deref(),
465458
&client.kubernetes_cluster_info,
466459
)?;
467460
let rg_statefulset = build_metastore_rolegroup_statefulset(
@@ -604,7 +597,6 @@ fn build_metastore_rolegroup_config_map(
604597
role_group_config: &HashMap<PropertyNameKind, BTreeMap<String, String>>,
605598
s3_connection_spec: Option<&S3ConnectionSpec>,
606599
merged_config: &MetaStoreConfig,
607-
vector_aggregator_address: Option<&str>,
608600
cluster_info: &KubernetesClusterInfo,
609601
) -> Result<ConfigMap> {
610602
let mut hive_site_data = String::new();
@@ -718,15 +710,11 @@ fn build_metastore_rolegroup_config_map(
718710
cm_builder.add_data(CORE_SITE_XML, to_hadoop_xml(data.iter()));
719711
}
720712

721-
extend_role_group_config_map(
722-
rolegroup,
723-
vector_aggregator_address,
724-
&merged_config.logging,
725-
&mut cm_builder,
726-
)
727-
.context(InvalidLoggingConfigSnafu {
728-
cm_name: rolegroup.object_name(),
729-
})?;
713+
extend_role_group_config_map(rolegroup, &merged_config.logging, &mut cm_builder).context(
714+
InvalidLoggingConfigSnafu {
715+
cm_name: rolegroup.object_name(),
716+
},
717+
)?;
730718

731719
cm_builder
732720
.build()
@@ -1049,21 +1037,29 @@ fn build_metastore_rolegroup_statefulset(
10491037
// N.B. the vector container should *follow* the hive container so that the hive one is the
10501038
// default, is started first and can provide any dependencies that vector expects
10511039
if merged_config.logging.enable_vector_agent {
1052-
pod_builder.add_container(
1053-
product_logging::framework::vector_container(
1054-
resolved_product_image,
1055-
STACKABLE_CONFIG_MOUNT_DIR_NAME,
1056-
STACKABLE_LOG_DIR_NAME,
1057-
merged_config.logging.containers.get(&Container::Vector),
1058-
ResourceRequirementsBuilder::new()
1059-
.with_cpu_request("250m")
1060-
.with_cpu_limit("500m")
1061-
.with_memory_request("128Mi")
1062-
.with_memory_limit("128Mi")
1063-
.build(),
1064-
)
1065-
.context(BuildVectorContainerSnafu)?,
1066-
);
1040+
match &hive.spec.cluster_config.vector_aggregator_config_map_name {
1041+
Some(vector_aggregator_config_map_name) => {
1042+
pod_builder.add_container(
1043+
product_logging::framework::vector_container(
1044+
resolved_product_image,
1045+
STACKABLE_CONFIG_MOUNT_DIR_NAME,
1046+
STACKABLE_LOG_DIR_NAME,
1047+
merged_config.logging.containers.get(&Container::Vector),
1048+
ResourceRequirementsBuilder::new()
1049+
.with_cpu_request("250m")
1050+
.with_cpu_limit("500m")
1051+
.with_memory_request("128Mi")
1052+
.with_memory_limit("128Mi")
1053+
.build(),
1054+
vector_aggregator_config_map_name,
1055+
)
1056+
.context(BuildVectorContainerSnafu)?,
1057+
);
1058+
}
1059+
None => {
1060+
VectorAggregatorConfigMapMissingSnafu.fail()?;
1061+
}
1062+
}
10671063
}
10681064

10691065
let mut pod_template = pod_builder.build_template();

rust/operator-binary/src/main.rs

Lines changed: 70 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,12 @@ use stackable_operator::{
1919
core::v1::{ConfigMap, Service},
2020
},
2121
kube::{
22+
ResourceExt,
2223
core::DeserializeGuard,
2324
runtime::{
2425
Controller,
2526
events::{Recorder, Reporter},
27+
reflector::ObjectRef,
2628
watcher,
2729
},
2830
},
@@ -128,51 +130,78 @@ async fn main() -> anyhow::Result<()> {
128130
instance: None,
129131
}));
130132

131-
Controller::new(
133+
let hive_controller = Controller::new(
132134
watch_namespace.get_api::<DeserializeGuard<v1alpha1::HiveCluster>>(&client),
133135
watcher::Config::default(),
134-
)
135-
.owns(
136-
watch_namespace.get_api::<Service>(&client),
137-
watcher::Config::default(),
138-
)
139-
.owns(
140-
watch_namespace.get_api::<StatefulSet>(&client),
141-
watcher::Config::default(),
142-
)
143-
.owns(
144-
watch_namespace.get_api::<ConfigMap>(&client),
145-
watcher::Config::default(),
146-
)
147-
.shutdown_on_signal()
148-
.run(
149-
controller::reconcile_hive,
150-
controller::error_policy,
151-
Arc::new(controller::Ctx {
152-
client: client.clone(),
153-
product_config,
154-
}),
155-
)
156-
// We can let the reporting happen in the background
157-
.for_each_concurrent(
158-
16, // concurrency limit
159-
|result| {
160-
// The event_recorder needs to be shared across all invocations, so that
161-
// events are correctly aggregated
162-
let event_recorder = event_recorder.clone();
163-
async move {
164-
report_controller_reconciled(
165-
&event_recorder,
166-
HIVE_FULL_CONTROLLER_NAME,
167-
&result,
168-
)
169-
.await;
170-
}
171-
},
172-
)
173-
.await;
136+
);
137+
let config_map_store = hive_controller.store();
138+
hive_controller
139+
.owns(
140+
watch_namespace.get_api::<Service>(&client),
141+
watcher::Config::default(),
142+
)
143+
.owns(
144+
watch_namespace.get_api::<StatefulSet>(&client),
145+
watcher::Config::default(),
146+
)
147+
.owns(
148+
watch_namespace.get_api::<ConfigMap>(&client),
149+
watcher::Config::default(),
150+
)
151+
.shutdown_on_signal()
152+
.watches(
153+
watch_namespace.get_api::<DeserializeGuard<ConfigMap>>(&client),
154+
watcher::Config::default(),
155+
move |config_map| {
156+
config_map_store
157+
.state()
158+
.into_iter()
159+
.filter(move |hive| references_config_map(hive, &config_map))
160+
.map(|hive| ObjectRef::from_obj(&*hive))
161+
},
162+
)
163+
.run(
164+
controller::reconcile_hive,
165+
controller::error_policy,
166+
Arc::new(controller::Ctx {
167+
client: client.clone(),
168+
product_config,
169+
}),
170+
)
171+
// We can let the reporting happen in the background
172+
.for_each_concurrent(
173+
16, // concurrency limit
174+
|result| {
175+
// The event_recorder needs to be shared across all invocations, so that
176+
// events are correctly aggregated
177+
let event_recorder = event_recorder.clone();
178+
async move {
179+
report_controller_reconciled(
180+
&event_recorder,
181+
HIVE_FULL_CONTROLLER_NAME,
182+
&result,
183+
)
184+
.await;
185+
}
186+
},
187+
)
188+
.await;
174189
}
175190
}
176191

177192
Ok(())
178193
}
194+
195+
fn references_config_map(
196+
hive: &DeserializeGuard<v1alpha1::HiveCluster>,
197+
config_map: &DeserializeGuard<ConfigMap>,
198+
) -> bool {
199+
let Ok(hive) = &hive.0 else {
200+
return false;
201+
};
202+
203+
match &hive.spec.cluster_config.hdfs {
204+
Some(hdfs_connection) => hdfs_connection.config_map == config_map.name_any(),
205+
None => false,
206+
}
207+
}

rust/operator-binary/src/product_logging.rs

Lines changed: 2 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,6 @@
1-
use snafu::{OptionExt, ResultExt, Snafu};
1+
use snafu::Snafu;
22
use stackable_operator::{
33
builder::configmap::ConfigMapBuilder,
4-
client::Client,
5-
k8s_openapi::api::core::v1::ConfigMap,
6-
kube::ResourceExt,
74
memory::BinaryMultiple,
85
product_logging::{
96
self,
@@ -33,54 +30,16 @@ pub enum Error {
3330
},
3431
#[snafu(display("crd validation failure"))]
3532
CrdValidationFailure { source: crate::crd::Error },
36-
#[snafu(display("vectorAggregatorConfigMapName must be set"))]
37-
MissingVectorAggregatorAddress,
3833
}
3934

4035
type Result<T, E = Error> = std::result::Result<T, E>;
4136

42-
const VECTOR_AGGREGATOR_CM_ENTRY: &str = "ADDRESS";
4337
const CONSOLE_CONVERSION_PATTERN: &str = "%d{ISO8601} %5p [%t] %c{2}: %m%n";
4438
const HIVE_LOG_FILE: &str = "hive.log4j2.xml";
4539

46-
/// Return the address of the Vector aggregator if the corresponding ConfigMap name is given in the
47-
/// cluster spec
48-
pub async fn resolve_vector_aggregator_address(
49-
hive: &v1alpha1::HiveCluster,
50-
client: &Client,
51-
) -> Result<Option<String>> {
52-
let vector_aggregator_address = if let Some(vector_aggregator_config_map_name) =
53-
&hive.spec.cluster_config.vector_aggregator_config_map_name
54-
{
55-
let vector_aggregator_address = client
56-
.get::<ConfigMap>(
57-
vector_aggregator_config_map_name,
58-
hive.namespace()
59-
.as_deref()
60-
.context(ObjectHasNoNamespaceSnafu)?,
61-
)
62-
.await
63-
.context(ConfigMapNotFoundSnafu {
64-
cm_name: vector_aggregator_config_map_name.to_string(),
65-
})?
66-
.data
67-
.and_then(|mut data| data.remove(VECTOR_AGGREGATOR_CM_ENTRY))
68-
.context(MissingConfigMapEntrySnafu {
69-
entry: VECTOR_AGGREGATOR_CM_ENTRY,
70-
cm_name: vector_aggregator_config_map_name.to_string(),
71-
})?;
72-
Some(vector_aggregator_address)
73-
} else {
74-
None
75-
};
76-
77-
Ok(vector_aggregator_address)
78-
}
79-
8040
/// Extend the role group ConfigMap with logging and Vector configurations
8141
pub fn extend_role_group_config_map(
8242
rolegroup: &RoleGroupRef<v1alpha1::HiveCluster>,
83-
vector_aggregator_address: Option<&str>,
8443
logging: &Logging<Container>,
8544
cm_builder: &mut ConfigMapBuilder,
8645
) -> Result<()> {
@@ -118,11 +77,7 @@ pub fn extend_role_group_config_map(
11877
if logging.enable_vector_agent {
11978
cm_builder.add_data(
12079
product_logging::framework::VECTOR_CONFIG_FILE,
121-
product_logging::framework::create_vector_config(
122-
rolegroup,
123-
vector_aggregator_address.context(MissingVectorAggregatorAddressSnafu)?,
124-
vector_log_config,
125-
),
80+
product_logging::framework::create_vector_config(rolegroup, vector_log_config),
12681
);
12782
}
12883

0 commit comments

Comments
 (0)