From fec20a215a423820ebb0161cdc705abb9a61b902 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Wed, 21 Jan 2026 14:19:59 +0100 Subject: [PATCH 1/9] feat: distict latest resource collector MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../api/reconciler/ReconcileUtils.java | 8 + .../api/reconciler/ReconcileUtilsTest.java | 269 ++++++++++++++++++ 2 files changed, 277 insertions(+) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ReconcileUtils.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ReconcileUtils.java index ed02c56a01..8d359004c2 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ReconcileUtils.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ReconcileUtils.java @@ -16,8 +16,16 @@ package io.javaoperatorsdk.operator.api.reconciler; import java.lang.reflect.InvocationTargetException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.function.Predicate; import java.util.function.UnaryOperator; +import java.util.stream.Collector; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/reconciler/ReconcileUtilsTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/reconciler/ReconcileUtilsTest.java index f76ec61e16..f865c72a8f 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/reconciler/ReconcileUtilsTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/reconciler/ReconcileUtilsTest.java @@ -15,9 +15,12 @@ */ package io.javaoperatorsdk.operator.api.reconciler; +import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Set; import java.util.function.UnaryOperator; +import java.util.stream.Stream; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -325,4 +328,270 @@ void resourcePatchThrowsWhenEventSourceIsNotManagedInformer() { assertThat(exception.getMessage()).contains("Target event source must be a subclass off"); assertThat(exception.getMessage()).contains("ManagedInformerEventSource"); } + + @Test + void latestDistinctKeepsOnlyLatestResourceVersion() { + // Create multiple resources with same name and namespace but different versions + HasMetadata pod1v1 = + new PodBuilder() + .withMetadata( + new ObjectMetaBuilder() + .withName("pod1") + .withNamespace("default") + .withResourceVersion("100") + .build()) + .build(); + + HasMetadata pod1v2 = + new PodBuilder() + .withMetadata( + new ObjectMetaBuilder() + .withName("pod1") + .withNamespace("default") + .withResourceVersion("200") + .build()) + .build(); + + HasMetadata pod1v3 = + new PodBuilder() + .withMetadata( + new ObjectMetaBuilder() + .withName("pod1") + .withNamespace("default") + .withResourceVersion("150") + .build()) + .build(); + + // Create a resource with different name + HasMetadata pod2v1 = + new PodBuilder() + .withMetadata( + new ObjectMetaBuilder() + .withName("pod2") + .withNamespace("default") + .withResourceVersion("100") + .build()) + .build(); + + // Create a resource with same name but different namespace + HasMetadata pod1OtherNsv1 = + new PodBuilder() + .withMetadata( + new ObjectMetaBuilder() + .withName("pod1") + .withNamespace("other") + .withResourceVersion("50") + .build()) + .build(); + + Collection result = + Stream.of(pod1v1, pod1v2, pod1v3, pod2v1, pod1OtherNsv1) + .collect(ReconcileUtils.latestDistinct()); + + // Should have 3 resources: pod1 in default (latest version 200), pod2 in default, and pod1 in + // other + assertThat(result).hasSize(3); + + // Find pod1 in default namespace - should have version 200 + HasMetadata pod1InDefault = + result.stream() + .filter( + r -> + "pod1".equals(r.getMetadata().getName()) + && "default".equals(r.getMetadata().getNamespace())) + .findFirst() + .orElseThrow(); + assertThat(pod1InDefault.getMetadata().getResourceVersion()).isEqualTo("200"); + + // Find pod2 in default namespace - should exist + HasMetadata pod2InDefault = + result.stream() + .filter( + r -> + "pod2".equals(r.getMetadata().getName()) + && "default".equals(r.getMetadata().getNamespace())) + .findFirst() + .orElseThrow(); + assertThat(pod2InDefault.getMetadata().getResourceVersion()).isEqualTo("100"); + + // Find pod1 in other namespace - should exist + HasMetadata pod1InOther = + result.stream() + .filter( + r -> + "pod1".equals(r.getMetadata().getName()) + && "other".equals(r.getMetadata().getNamespace())) + .findFirst() + .orElseThrow(); + assertThat(pod1InOther.getMetadata().getResourceVersion()).isEqualTo("50"); + } + + @Test + void latestDistinctHandlesEmptyStream() { + Collection result = + Stream.empty().collect(ReconcileUtils.latestDistinct()); + + assertThat(result).isEmpty(); + } + + @Test + void latestDistinctHandlesSingleResource() { + HasMetadata pod = + new PodBuilder() + .withMetadata( + new ObjectMetaBuilder() + .withName("pod1") + .withNamespace("default") + .withResourceVersion("100") + .build()) + .build(); + + Collection result = Stream.of(pod).collect(ReconcileUtils.latestDistinct()); + + assertThat(result).hasSize(1); + assertThat(result).contains(pod); + } + + @Test + void latestDistinctComparesNumericVersionsCorrectly() { + // Test that version 1000 is greater than version 999 (not lexicographic) + HasMetadata podV999 = + new PodBuilder() + .withMetadata( + new ObjectMetaBuilder() + .withName("pod1") + .withNamespace("default") + .withResourceVersion("999") + .build()) + .build(); + + HasMetadata podV1000 = + new PodBuilder() + .withMetadata( + new ObjectMetaBuilder() + .withName("pod1") + .withNamespace("default") + .withResourceVersion("1000") + .build()) + .build(); + + Collection result = + Stream.of(podV999, podV1000).collect(ReconcileUtils.latestDistinct()); + + assertThat(result).hasSize(1); + HasMetadata resultPod = result.iterator().next(); + assertThat(resultPod.getMetadata().getResourceVersion()).isEqualTo("1000"); + } + + @Test + void latestDistinctListReturnsListType() { + Pod pod1v1 = + new PodBuilder() + .withMetadata( + new ObjectMetaBuilder() + .withName("pod1") + .withNamespace("default") + .withResourceVersion("100") + .build()) + .build(); + + Pod pod1v2 = + new PodBuilder() + .withMetadata( + new ObjectMetaBuilder() + .withName("pod1") + .withNamespace("default") + .withResourceVersion("200") + .build()) + .build(); + + Pod pod2v1 = + new PodBuilder() + .withMetadata( + new ObjectMetaBuilder() + .withName("pod2") + .withNamespace("default") + .withResourceVersion("100") + .build()) + .build(); + + List result = + Stream.of(pod1v1, pod1v2, pod2v1).collect(ReconcileUtils.latestDistinctList()); + + assertThat(result).isInstanceOf(List.class); + assertThat(result).hasSize(2); + + // Verify the list contains the correct resources + Pod pod1 = + result.stream() + .filter(r -> "pod1".equals(r.getMetadata().getName())) + .findFirst() + .orElseThrow(); + assertThat(pod1.getMetadata().getResourceVersion()).isEqualTo("200"); + } + + @Test + void latestDistinctSetReturnsSetType() { + Pod pod1v1 = + new PodBuilder() + .withMetadata( + new ObjectMetaBuilder() + .withName("pod1") + .withNamespace("default") + .withResourceVersion("100") + .build()) + .build(); + + Pod pod1v2 = + new PodBuilder() + .withMetadata( + new ObjectMetaBuilder() + .withName("pod1") + .withNamespace("default") + .withResourceVersion("200") + .build()) + .build(); + + Pod pod2v1 = + new PodBuilder() + .withMetadata( + new ObjectMetaBuilder() + .withName("pod2") + .withNamespace("default") + .withResourceVersion("100") + .build()) + .build(); + + Set result = Stream.of(pod1v1, pod1v2, pod2v1).collect(ReconcileUtils.latestDistinctSet()); + + assertThat(result).isInstanceOf(java.util.Set.class); + assertThat(result).hasSize(2); + + // Verify the set contains the correct resources + Pod pod1 = + result.stream() + .filter(r -> "pod1".equals(r.getMetadata().getName())) + .findFirst() + .orElseThrow(); + assertThat(pod1.getMetadata().getResourceVersion()).isEqualTo("200"); + } + + @Test + void latestDistinctListHandlesEmptyStream() { + List result = + Stream.empty().collect(ReconcileUtils.latestDistinctList()); + + assertThat(result).isEmpty(); + assertThat(result).isInstanceOf(List.class); + } + + @Test + void latestDistinctSetHandlesEmptyStream() { + Set result = + Stream.empty().collect(ReconcileUtils.latestDistinctSet()); + + assertThat(result).isEmpty(); + assertThat(result).isInstanceOf(Set.class); + } + } From f26ad07ac7ace2631ec27b30fd9688d1e024a8a7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Wed, 21 Jan 2026 15:04:40 +0100 Subject: [PATCH 2/9] wip MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../api/reconciler/ReconcileUtils.java | 53 +++++ .../latestdistinct/LatestDistinctIT.java | 219 ++++++++++++++++++ .../LatestDistinctTestReconciler.java | 170 ++++++++++++++ .../LatestDistinctTestResource.java | 40 ++++ .../LatestDistinctTestResourceSpec.java | 28 +++ .../LatestDistinctTestResourceStatus.java | 46 ++++ 6 files changed, 556 insertions(+) create mode 100644 operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/latestdistinct/LatestDistinctIT.java create mode 100644 operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/latestdistinct/LatestDistinctTestReconciler.java create mode 100644 operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/latestdistinct/LatestDistinctTestResource.java create mode 100644 operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/latestdistinct/LatestDistinctTestResourceSpec.java create mode 100644 operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/latestdistinct/LatestDistinctTestResourceStatus.java diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ReconcileUtils.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ReconcileUtils.java index 8d359004c2..1c52a733eb 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ReconcileUtils.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ReconcileUtils.java @@ -603,4 +603,57 @@ public static

P addFinalizerWithSSA( e); } } + + /** + * Returns a collector that deduplicates Kubernetes objects by keeping only the one with the + * latest metadata.resourceVersion for each unique name and namespace combination. The intended + * use case is for the rather rare setup when there are overlapping {@link + * io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource}s for a + * resource type. + * + * @param the type of HasMetadata objects + * @return a collector that produces a collection of deduplicated Kubernetes objects + */ + public static Collector> latestDistinct() { + return Collectors.collectingAndThen(latestDistinctToMap(), Map::values); + } + + /** + * Returns a collector that deduplicates Kubernetes objects by keeping only the one with the + * latest metadata.resourceVersion for each unique name and namespace combination. The intended + * use case is for the rather rare setup when there are overlapping {@link + * io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource}s for a + * resource type. + * + * @param the type of HasMetadata objects + * @return a collector that produces a List of deduplicated Kubernetes objects + */ + public static Collector> latestDistinctList() { + return Collectors.collectingAndThen( + latestDistinctToMap(), map -> new ArrayList<>(map.values())); + } + + /** + * Returns a collector that deduplicates Kubernetes objects by keeping only the one with the + * latest metadata.resourceVersion for each unique name and namespace combination. The intended + * use case is for the rather rare setup when there are overlapping {@link + * io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource}s for a + * resource type. + * + * @param the type of HasMetadata objects + * @return a collector that produces a Set of deduplicated Kubernetes objects + */ + public static Collector> latestDistinctSet() { + return Collectors.collectingAndThen(latestDistinctToMap(), map -> new HashSet<>(map.values())); + } + + private static Collector> latestDistinctToMap() { + return Collectors.toMap( + resource -> + new ResourceID(resource.getMetadata().getName(), resource.getMetadata().getNamespace()), + resource -> resource, + (existing, replacement) -> + compareResourceVersions(existing, replacement) >= 0 ? existing : replacement); + } + } diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/latestdistinct/LatestDistinctIT.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/latestdistinct/LatestDistinctIT.java new file mode 100644 index 0000000000..7281287aca --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/latestdistinct/LatestDistinctIT.java @@ -0,0 +1,219 @@ +/* + * Copyright Java Operator SDK Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.javaoperatorsdk.operator.baseapi.latestdistinct; + +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.fabric8.kubernetes.api.model.ConfigMap; +import io.fabric8.kubernetes.api.model.ConfigMapBuilder; +import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; +import io.javaoperatorsdk.annotation.Sample; +import io.javaoperatorsdk.operator.junit.LocallyRunOperatorExtension; + +import static io.javaoperatorsdk.operator.baseapi.latestdistinct.LatestDistinctTestReconciler.LABEL_KEY; +import static io.javaoperatorsdk.operator.baseapi.latestdistinct.LatestDistinctTestReconciler.LABEL_TYPE_1; +import static io.javaoperatorsdk.operator.baseapi.latestdistinct.LatestDistinctTestReconciler.LABEL_TYPE_2; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +@Sample( + tldr = "Latest Distinct with Multiple InformerEventSources", + description = + """ + Demonstrates using two separate InformerEventSource instances for ConfigMaps with \ + overlapping watches, combined with latestDistinctList() to deduplicate resources by \ + keeping the latest version. Also tests ReconcileUtils methods for patching resources \ + with proper cache updates. + """) +class LatestDistinctIT { + + public static final String TEST_RESOURCE_NAME = "test-resource"; + public static final String CONFIG_MAP_1 = "config-map-1"; + public static final String CONFIG_MAP_2 = "config-map-2"; + public static final String CONFIG_MAP_3 = "config-map-3"; + + @RegisterExtension + LocallyRunOperatorExtension operator = + LocallyRunOperatorExtension.builder() + .withReconciler(LatestDistinctTestReconciler.class) + .build(); + + @Test + void testLatestDistinctListWithTwoInformerEventSources() { + // Create the custom resource + var resource = createTestCustomResource(); + operator.create(resource); + + // Create ConfigMaps with type1 label (watched by first event source) + var cm1 = createConfigMap(CONFIG_MAP_1, LABEL_TYPE_1, resource, "value1"); + operator.create(cm1); + + var cm2 = createConfigMap(CONFIG_MAP_2, LABEL_TYPE_1, resource, "value2"); + operator.create(cm2); + + // Create ConfigMap with type2 label (watched by second event source) + var cm3 = createConfigMap(CONFIG_MAP_3, LABEL_TYPE_2, resource, "value3"); + operator.create(cm3); + + // Wait for reconciliation + var reconciler = operator.getReconcilerOfType(LatestDistinctTestReconciler.class); + await() + .atMost(Duration.ofSeconds(5)) + .pollDelay(Duration.ofMillis(300)) + .untilAsserted( + () -> { + assertThat(reconciler.getNumberOfExecutions()).isGreaterThanOrEqualTo(1); + var updatedResource = + operator.get(LatestDistinctTestResource.class, TEST_RESOURCE_NAME); + assertThat(updatedResource.getStatus()).isNotNull(); + // Should see 3 distinct ConfigMaps + assertThat(updatedResource.getStatus().getConfigMapCount()).isEqualTo(3); + assertThat(updatedResource.getStatus().getDataFromConfigMaps()) + .isEqualTo("value1,value2,value3"); + // Verify ReconcileUtils was used + assertThat(updatedResource.getStatus().isReconcileUtilsCalled()).isTrue(); + }); + + // Verify distinct ConfigMap names + assertThat(reconciler.getDistinctConfigMapNames()) + .containsExactlyInAnyOrder(CONFIG_MAP_1, CONFIG_MAP_2, CONFIG_MAP_3); + } + + @Test + void testLatestDistinctDeduplication() { + // Create the custom resource + var resource = createTestCustomResource(); + operator.create(resource); + + // Create a ConfigMap with type1 label + var cm1 = createConfigMap(CONFIG_MAP_1, LABEL_TYPE_1, resource, "initialValue"); + operator.create(cm1); + + // Wait for initial reconciliation + var reconciler = operator.getReconcilerOfType(LatestDistinctTestReconciler.class); + await() + .atMost(Duration.ofSeconds(5)) + .pollDelay(Duration.ofMillis(300)) + .untilAsserted( + () -> { + var updatedResource = + operator.get(LatestDistinctTestResource.class, TEST_RESOURCE_NAME); + assertThat(updatedResource.getStatus()).isNotNull(); + assertThat(updatedResource.getStatus().getConfigMapCount()).isEqualTo(1); + assertThat(updatedResource.getStatus().getDataFromConfigMaps()) + .isEqualTo("initialValue"); + }); + + int executionsBeforeUpdate = reconciler.getNumberOfExecutions(); + + // Update the ConfigMap + cm1 = operator.get(ConfigMap.class, CONFIG_MAP_1); + cm1.getData().put("key", "updatedValue"); + operator.replace(cm1); + + // Wait for reconciliation after update + await() + .atMost(Duration.ofSeconds(5)) + .pollDelay(Duration.ofMillis(300)) + .untilAsserted( + () -> { + assertThat(reconciler.getNumberOfExecutions()).isGreaterThan(executionsBeforeUpdate); + var updatedResource = + operator.get(LatestDistinctTestResource.class, TEST_RESOURCE_NAME); + assertThat(updatedResource.getStatus()).isNotNull(); + // Still should see only 1 distinct ConfigMap (same name, updated version) + assertThat(updatedResource.getStatus().getConfigMapCount()).isEqualTo(1); + assertThat(updatedResource.getStatus().getDataFromConfigMaps()) + .isEqualTo("updatedValue"); + }); + } + + @Test + void testReconcileUtilsServerSideApply() { + // Create the custom resource with initial spec value + var resource = createTestCustomResource(); + resource.getSpec().setValue("initialSpecValue"); + operator.create(resource); + + // Create a ConfigMap + var cm1 = createConfigMap(CONFIG_MAP_1, LABEL_TYPE_1, resource, "value1"); + operator.create(cm1); + + // Wait for reconciliation + var reconciler = operator.getReconcilerOfType(LatestDistinctTestReconciler.class); + await() + .atMost(Duration.ofSeconds(5)) + .pollDelay(Duration.ofMillis(300)) + .untilAsserted( + () -> { + var updatedResource = + operator.get(LatestDistinctTestResource.class, TEST_RESOURCE_NAME); + assertThat(updatedResource.getStatus()).isNotNull(); + assertThat(updatedResource.getStatus().isReconcileUtilsCalled()).isTrue(); + // Verify that the status was updated using ReconcileUtils.serverSideApplyStatus + assertThat(updatedResource.getStatus().getConfigMapCount()).isEqualTo(1); + }); + + // Verify no errors occurred + assertThat(reconciler.isErrorOccurred()).isFalse(); + } + + private LatestDistinctTestResource createTestCustomResource() { + var resource = new LatestDistinctTestResource(); + resource.setMetadata( + new ObjectMetaBuilder() + .withName(TEST_RESOURCE_NAME) + .withNamespace(operator.getNamespace()) + .build()); + resource.setSpec(new LatestDistinctTestResourceSpec()); + return resource; + } + + private ConfigMap createConfigMap( + String name, String labelValue, LatestDistinctTestResource owner, String dataValue) { + Map labels = new HashMap<>(); + labels.put(LABEL_KEY, labelValue); + + Map data = new HashMap<>(); + data.put("key", dataValue); + + return new ConfigMapBuilder() + .withMetadata( + new ObjectMetaBuilder() + .withName(name) + .withNamespace(operator.getNamespace()) + .withLabels(labels) + .build()) + .withData(data) + .withNewMetadata() + .withName(name) + .withNamespace(operator.getNamespace()) + .withLabels(labels) + .addNewOwnerReference() + .withApiVersion(owner.getApiVersion()) + .withKind(owner.getKind()) + .withName(owner.getMetadata().getName()) + .withUid(owner.getMetadata().getUid()) + .endOwnerReference() + .endMetadata() + .build(); + } +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/latestdistinct/LatestDistinctTestReconciler.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/latestdistinct/LatestDistinctTestReconciler.java new file mode 100644 index 0000000000..ee1d4ab473 --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/latestdistinct/LatestDistinctTestReconciler.java @@ -0,0 +1,170 @@ +/* + * Copyright Java Operator SDK Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.javaoperatorsdk.operator.baseapi.latestdistinct; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import io.fabric8.kubernetes.api.model.ConfigMap; +import io.javaoperatorsdk.operator.api.config.informer.InformerEventSourceConfiguration; +import io.javaoperatorsdk.operator.api.reconciler.Context; +import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration; +import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusUpdateControl; +import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext; +import io.javaoperatorsdk.operator.api.reconciler.ReconcileUtils; +import io.javaoperatorsdk.operator.api.reconciler.Reconciler; +import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; +import io.javaoperatorsdk.operator.processing.event.ResourceID; +import io.javaoperatorsdk.operator.processing.event.source.EventSource; +import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource; +import io.javaoperatorsdk.operator.support.TestExecutionInfoProvider; + +@ControllerConfiguration +public class LatestDistinctTestReconciler + implements Reconciler, TestExecutionInfoProvider { + + public static final String EVENT_SOURCE_1_NAME = "configmap-es-1"; + public static final String EVENT_SOURCE_2_NAME = "configmap-es-2"; + public static final String LABEL_TYPE_1 = "type1"; + public static final String LABEL_TYPE_2 = "type2"; + public static final String LABEL_KEY = "configmap-type"; + + private final AtomicInteger numberOfExecutions = new AtomicInteger(0); + private volatile boolean errorOccurred = false; + private final List distinctConfigMapNames = new ArrayList<>(); + + @Override + public UpdateControl reconcile( + LatestDistinctTestResource resource, Context context) { + numberOfExecutions.incrementAndGet(); + + // Get ConfigMaps from both event sources + var eventSource1 = + (InformerEventSource) + context.eventSourceRetriever().getEventSourceFor(ConfigMap.class, EVENT_SOURCE_1_NAME); + var eventSource2 = + (InformerEventSource) + context.eventSourceRetriever().getEventSourceFor(ConfigMap.class, EVENT_SOURCE_2_NAME); + + // Get all ConfigMaps from both event sources + // Using list() with a predicate that always returns true to get all resources + var configMapsFromEs1 = eventSource1.list(cm -> true); + var configMapsFromEs2 = eventSource2.list(cm -> true); + + // Use latestDistinctList to deduplicate ConfigMaps by keeping the latest version + List distinctConfigMaps = + Stream.concat(configMapsFromEs1, configMapsFromEs2) + .collect(ReconcileUtils.latestDistinctList()); + + // Store the distinct ConfigMap names for verification + synchronized (distinctConfigMapNames) { + distinctConfigMapNames.clear(); + distinctConfigMapNames.addAll( + distinctConfigMaps.stream() + .map(cm -> cm.getMetadata().getName()) + .sorted() + .collect(Collectors.toList())); + } + + // Update status with information from ConfigMaps + if (resource.getStatus() == null) { + resource.setStatus(new LatestDistinctTestResourceStatus()); + } + + resource.getStatus().setConfigMapCount(distinctConfigMaps.size()); + + // Concatenate data from all distinct ConfigMaps + String data = + distinctConfigMaps.stream() + .map(cm -> cm.getData() != null ? cm.getData().getOrDefault("key", "") : "") + .filter(s -> !s.isEmpty()) + .collect(Collectors.joining(",")); + + resource.getStatus().setDataFromConfigMaps(data); + + // Use ReconcileUtils to update the status + // This tests serverSideApplyStatus method + resource.getStatus().setReconcileUtilsCalled(true); + return UpdateControl.patchStatus(ReconcileUtils.serverSideApplyStatus(context, resource)); + } + + @Override + public List> prepareEventSources( + EventSourceContext context) { + // Create two separate InformerEventSource instances for ConfigMaps + // Each watches ConfigMaps with different labels + + // First event source: watches ConfigMaps with label "configmap-type: type1" + var configEs1 = + InformerEventSourceConfiguration.from(ConfigMap.class, LatestDistinctTestResource.class) + .withName(EVENT_SOURCE_1_NAME) + .withNamespacesInheritedFromController() + .withLabelSelector(LABEL_KEY + "=" + LABEL_TYPE_1) + .withSecondaryToPrimaryMapper( + cm -> + Set.of( + new ResourceID( + cm.getMetadata().getOwnerReferences().get(0).getName(), + cm.getMetadata().getNamespace()))) + .build(); + + // Second event source: watches ConfigMaps with label "configmap-type: type2" + var configEs2 = + InformerEventSourceConfiguration.from(ConfigMap.class, LatestDistinctTestResource.class) + .withName(EVENT_SOURCE_2_NAME) + .withNamespacesInheritedFromController() + .withLabelSelector(LABEL_KEY + "=" + LABEL_TYPE_2) + .withSecondaryToPrimaryMapper( + cm -> + Set.of( + new ResourceID( + cm.getMetadata().getOwnerReferences().get(0).getName(), + cm.getMetadata().getNamespace()))) + .build(); + + return List.of( + new InformerEventSource<>(configEs1, context), + new InformerEventSource<>(configEs2, context)); + } + + @Override + public int getNumberOfExecutions() { + return numberOfExecutions.get(); + } + + public List getDistinctConfigMapNames() { + synchronized (distinctConfigMapNames) { + return new ArrayList<>(distinctConfigMapNames); + } + } + + @Override + public ErrorStatusUpdateControl updateErrorStatus( + LatestDistinctTestResource resource, + Context context, + Exception e) { + errorOccurred = true; + return ErrorStatusUpdateControl.noStatusUpdate(); + } + + public boolean isErrorOccurred() { + return errorOccurred; + } +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/latestdistinct/LatestDistinctTestResource.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/latestdistinct/LatestDistinctTestResource.java new file mode 100644 index 0000000000..546e349b0a --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/latestdistinct/LatestDistinctTestResource.java @@ -0,0 +1,40 @@ +/* + * Copyright Java Operator SDK Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.javaoperatorsdk.operator.baseapi.latestdistinct; + +import io.fabric8.kubernetes.api.model.Namespaced; +import io.fabric8.kubernetes.client.CustomResource; +import io.fabric8.kubernetes.model.annotation.Group; +import io.fabric8.kubernetes.model.annotation.ShortNames; +import io.fabric8.kubernetes.model.annotation.Version; + +@Group("sample.javaoperatorsdk") +@Version("v1") +@ShortNames("ldt") +public class LatestDistinctTestResource + extends CustomResource + implements Namespaced { + + @Override + protected LatestDistinctTestResourceSpec initSpec() { + return new LatestDistinctTestResourceSpec(); + } + + @Override + protected LatestDistinctTestResourceStatus initStatus() { + return new LatestDistinctTestResourceStatus(); + } +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/latestdistinct/LatestDistinctTestResourceSpec.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/latestdistinct/LatestDistinctTestResourceSpec.java new file mode 100644 index 0000000000..acfefab85e --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/latestdistinct/LatestDistinctTestResourceSpec.java @@ -0,0 +1,28 @@ +/* + * Copyright Java Operator SDK Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.javaoperatorsdk.operator.baseapi.latestdistinct; + +public class LatestDistinctTestResourceSpec { + private String value; + + public String getValue() { + return value; + } + + public void setValue(String value) { + this.value = value; + } +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/latestdistinct/LatestDistinctTestResourceStatus.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/latestdistinct/LatestDistinctTestResourceStatus.java new file mode 100644 index 0000000000..199b957729 --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/latestdistinct/LatestDistinctTestResourceStatus.java @@ -0,0 +1,46 @@ +/* + * Copyright Java Operator SDK Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.javaoperatorsdk.operator.baseapi.latestdistinct; + +public class LatestDistinctTestResourceStatus { + private int configMapCount; + private String dataFromConfigMaps; + private boolean reconcileUtilsCalled; + + public int getConfigMapCount() { + return configMapCount; + } + + public void setConfigMapCount(int configMapCount) { + this.configMapCount = configMapCount; + } + + public String getDataFromConfigMaps() { + return dataFromConfigMaps; + } + + public void setDataFromConfigMaps(String dataFromConfigMaps) { + this.dataFromConfigMaps = dataFromConfigMaps; + } + + public boolean isReconcileUtilsCalled() { + return reconcileUtilsCalled; + } + + public void setReconcileUtilsCalled(boolean reconcileUtilsCalled) { + this.reconcileUtilsCalled = reconcileUtilsCalled; + } +} From 97df3017a2b5a4f6ef9ad0af4ce62d8c2bd8f1ab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Mon, 26 Jan 2026 15:17:04 +0100 Subject: [PATCH 3/9] wip MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../api/reconciler/ReconcileUtils.java | 10 +- .../api/reconciler/ReconcileUtilsTest.java | 11 +- .../latestdistinct/LatestDistinctIT.java | 130 +++--------------- .../LatestDistinctTestReconciler.java | 113 ++++++--------- .../LatestDistinctTestResourceStatus.java | 18 --- 5 files changed, 72 insertions(+), 210 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ReconcileUtils.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ReconcileUtils.java index 1c52a733eb..84bd546031 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ReconcileUtils.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ReconcileUtils.java @@ -372,13 +372,13 @@ public static R resourcePatch( if (esList.isEmpty()) { throw new IllegalStateException("No event source found for type: " + resource.getClass()); } + var es = esList.get(0); if (esList.size() > 1) { - throw new IllegalStateException( - "Multiple event sources found for: " - + resource.getClass() - + " please provide the target event source"); + log.warn( + "Multiple event source found for type: {}, selecting first with name {}", + resource.getClass(), + log.isWarnEnabled() ? es.name() : null); } - var es = esList.get(0); if (es instanceof ManagedInformerEventSource mes) { return resourcePatch(resource, updateOperation, mes); } else { diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/reconciler/ReconcileUtilsTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/reconciler/ReconcileUtilsTest.java index f865c72a8f..d6cc12dfeb 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/reconciler/ReconcileUtilsTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/reconciler/ReconcileUtilsTest.java @@ -291,7 +291,7 @@ void resourcePatchThrowsWhenNoEventSourceFound() { } @Test - void resourcePatchThrowsWhenMultipleEventSourcesFound() { + void resourcePatchUsesFirstEventSourceIfMultipleEventSourcesPresent() { var resource = TestUtils.testCustomResource1(); var eventSourceRetriever = mock(EventSourceRetriever.class); var eventSource1 = mock(ManagedInformerEventSource.class); @@ -301,13 +301,10 @@ void resourcePatchThrowsWhenMultipleEventSourcesFound() { when(eventSourceRetriever.getEventSourcesFor(TestCustomResource.class)) .thenReturn(List.of(eventSource1, eventSource2)); - var exception = - assertThrows( - IllegalStateException.class, - () -> ReconcileUtils.resourcePatch(context, resource, UnaryOperator.identity())); + ReconcileUtils.resourcePatch(context, resource, UnaryOperator.identity()); - assertThat(exception.getMessage()).contains("Multiple event sources found for"); - assertThat(exception.getMessage()).contains("please provide the target event source"); + verify(eventSource1, times(1)) + .eventFilteringUpdateAndCacheResource(any(), any(UnaryOperator.class)); } @Test diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/latestdistinct/LatestDistinctIT.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/latestdistinct/LatestDistinctIT.java index 7281287aca..c66b0dc4de 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/latestdistinct/LatestDistinctIT.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/latestdistinct/LatestDistinctIT.java @@ -29,8 +29,6 @@ import io.javaoperatorsdk.operator.junit.LocallyRunOperatorExtension; import static io.javaoperatorsdk.operator.baseapi.latestdistinct.LatestDistinctTestReconciler.LABEL_KEY; -import static io.javaoperatorsdk.operator.baseapi.latestdistinct.LatestDistinctTestReconciler.LABEL_TYPE_1; -import static io.javaoperatorsdk.operator.baseapi.latestdistinct.LatestDistinctTestReconciler.LABEL_TYPE_2; import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; @@ -47,11 +45,10 @@ class LatestDistinctIT { public static final String TEST_RESOURCE_NAME = "test-resource"; public static final String CONFIG_MAP_1 = "config-map-1"; - public static final String CONFIG_MAP_2 = "config-map-2"; - public static final String CONFIG_MAP_3 = "config-map-3"; + public static final String DEFAULT_VALUE = "defaultValue"; @RegisterExtension - LocallyRunOperatorExtension operator = + LocallyRunOperatorExtension extension = LocallyRunOperatorExtension.builder() .withReconciler(LatestDistinctTestReconciler.class) .build(); @@ -60,120 +57,30 @@ class LatestDistinctIT { void testLatestDistinctListWithTwoInformerEventSources() { // Create the custom resource var resource = createTestCustomResource(); - operator.create(resource); + resource = extension.create(resource); // Create ConfigMaps with type1 label (watched by first event source) - var cm1 = createConfigMap(CONFIG_MAP_1, LABEL_TYPE_1, resource, "value1"); - operator.create(cm1); - - var cm2 = createConfigMap(CONFIG_MAP_2, LABEL_TYPE_1, resource, "value2"); - operator.create(cm2); - - // Create ConfigMap with type2 label (watched by second event source) - var cm3 = createConfigMap(CONFIG_MAP_3, LABEL_TYPE_2, resource, "value3"); - operator.create(cm3); + var cm1 = createConfigMap(CONFIG_MAP_1, resource); + extension.create(cm1); // Wait for reconciliation - var reconciler = operator.getReconcilerOfType(LatestDistinctTestReconciler.class); + var reconciler = extension.getReconcilerOfType(LatestDistinctTestReconciler.class); await() .atMost(Duration.ofSeconds(5)) .pollDelay(Duration.ofMillis(300)) .untilAsserted( () -> { - assertThat(reconciler.getNumberOfExecutions()).isGreaterThanOrEqualTo(1); var updatedResource = - operator.get(LatestDistinctTestResource.class, TEST_RESOURCE_NAME); + extension.get(LatestDistinctTestResource.class, TEST_RESOURCE_NAME); assertThat(updatedResource.getStatus()).isNotNull(); // Should see 3 distinct ConfigMaps - assertThat(updatedResource.getStatus().getConfigMapCount()).isEqualTo(3); - assertThat(updatedResource.getStatus().getDataFromConfigMaps()) - .isEqualTo("value1,value2,value3"); - // Verify ReconcileUtils was used - assertThat(updatedResource.getStatus().isReconcileUtilsCalled()).isTrue(); - }); - - // Verify distinct ConfigMap names - assertThat(reconciler.getDistinctConfigMapNames()) - .containsExactlyInAnyOrder(CONFIG_MAP_1, CONFIG_MAP_2, CONFIG_MAP_3); - } - - @Test - void testLatestDistinctDeduplication() { - // Create the custom resource - var resource = createTestCustomResource(); - operator.create(resource); - - // Create a ConfigMap with type1 label - var cm1 = createConfigMap(CONFIG_MAP_1, LABEL_TYPE_1, resource, "initialValue"); - operator.create(cm1); - - // Wait for initial reconciliation - var reconciler = operator.getReconcilerOfType(LatestDistinctTestReconciler.class); - await() - .atMost(Duration.ofSeconds(5)) - .pollDelay(Duration.ofMillis(300)) - .untilAsserted( - () -> { - var updatedResource = - operator.get(LatestDistinctTestResource.class, TEST_RESOURCE_NAME); - assertThat(updatedResource.getStatus()).isNotNull(); assertThat(updatedResource.getStatus().getConfigMapCount()).isEqualTo(1); - assertThat(updatedResource.getStatus().getDataFromConfigMaps()) - .isEqualTo("initialValue"); + assertThat(reconciler.isErrorOccurred()).isFalse(); + // note that since there are two event source, and we do the update through one event + // source + // the other will still propagate an event + assertThat(reconciler.getNumberOfExecutions()).isEqualTo(2); }); - - int executionsBeforeUpdate = reconciler.getNumberOfExecutions(); - - // Update the ConfigMap - cm1 = operator.get(ConfigMap.class, CONFIG_MAP_1); - cm1.getData().put("key", "updatedValue"); - operator.replace(cm1); - - // Wait for reconciliation after update - await() - .atMost(Duration.ofSeconds(5)) - .pollDelay(Duration.ofMillis(300)) - .untilAsserted( - () -> { - assertThat(reconciler.getNumberOfExecutions()).isGreaterThan(executionsBeforeUpdate); - var updatedResource = - operator.get(LatestDistinctTestResource.class, TEST_RESOURCE_NAME); - assertThat(updatedResource.getStatus()).isNotNull(); - // Still should see only 1 distinct ConfigMap (same name, updated version) - assertThat(updatedResource.getStatus().getConfigMapCount()).isEqualTo(1); - assertThat(updatedResource.getStatus().getDataFromConfigMaps()) - .isEqualTo("updatedValue"); - }); - } - - @Test - void testReconcileUtilsServerSideApply() { - // Create the custom resource with initial spec value - var resource = createTestCustomResource(); - resource.getSpec().setValue("initialSpecValue"); - operator.create(resource); - - // Create a ConfigMap - var cm1 = createConfigMap(CONFIG_MAP_1, LABEL_TYPE_1, resource, "value1"); - operator.create(cm1); - - // Wait for reconciliation - var reconciler = operator.getReconcilerOfType(LatestDistinctTestReconciler.class); - await() - .atMost(Duration.ofSeconds(5)) - .pollDelay(Duration.ofMillis(300)) - .untilAsserted( - () -> { - var updatedResource = - operator.get(LatestDistinctTestResource.class, TEST_RESOURCE_NAME); - assertThat(updatedResource.getStatus()).isNotNull(); - assertThat(updatedResource.getStatus().isReconcileUtilsCalled()).isTrue(); - // Verify that the status was updated using ReconcileUtils.serverSideApplyStatus - assertThat(updatedResource.getStatus().getConfigMapCount()).isEqualTo(1); - }); - - // Verify no errors occurred - assertThat(reconciler.isErrorOccurred()).isFalse(); } private LatestDistinctTestResource createTestCustomResource() { @@ -181,31 +88,30 @@ private LatestDistinctTestResource createTestCustomResource() { resource.setMetadata( new ObjectMetaBuilder() .withName(TEST_RESOURCE_NAME) - .withNamespace(operator.getNamespace()) + .withNamespace(extension.getNamespace()) .build()); resource.setSpec(new LatestDistinctTestResourceSpec()); return resource; } - private ConfigMap createConfigMap( - String name, String labelValue, LatestDistinctTestResource owner, String dataValue) { + private ConfigMap createConfigMap(String name, LatestDistinctTestResource owner) { Map labels = new HashMap<>(); - labels.put(LABEL_KEY, labelValue); + labels.put(LABEL_KEY, "val"); Map data = new HashMap<>(); - data.put("key", dataValue); + data.put("key", DEFAULT_VALUE); return new ConfigMapBuilder() .withMetadata( new ObjectMetaBuilder() .withName(name) - .withNamespace(operator.getNamespace()) + .withNamespace(extension.getNamespace()) .withLabels(labels) .build()) .withData(data) .withNewMetadata() .withName(name) - .withNamespace(operator.getNamespace()) + .withNamespace(extension.getNamespace()) .withLabels(labels) .addNewOwnerReference() .withApiVersion(owner.getApiVersion()) diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/latestdistinct/LatestDistinctTestReconciler.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/latestdistinct/LatestDistinctTestReconciler.java index ee1d4ab473..d182b52824 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/latestdistinct/LatestDistinctTestReconciler.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/latestdistinct/LatestDistinctTestReconciler.java @@ -15,12 +15,10 @@ */ package io.javaoperatorsdk.operator.baseapi.latestdistinct; -import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.Collectors; -import java.util.stream.Stream; import io.fabric8.kubernetes.api.model.ConfigMap; import io.javaoperatorsdk.operator.api.config.informer.InformerEventSourceConfiguration; @@ -34,89 +32,76 @@ import io.javaoperatorsdk.operator.processing.event.ResourceID; import io.javaoperatorsdk.operator.processing.event.source.EventSource; import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource; -import io.javaoperatorsdk.operator.support.TestExecutionInfoProvider; @ControllerConfiguration -public class LatestDistinctTestReconciler - implements Reconciler, TestExecutionInfoProvider { +public class LatestDistinctTestReconciler implements Reconciler { public static final String EVENT_SOURCE_1_NAME = "configmap-es-1"; public static final String EVENT_SOURCE_2_NAME = "configmap-es-2"; - public static final String LABEL_TYPE_1 = "type1"; - public static final String LABEL_TYPE_2 = "type2"; public static final String LABEL_KEY = "configmap-type"; + public static final String KEY_2 = "key2"; private final AtomicInteger numberOfExecutions = new AtomicInteger(0); private volatile boolean errorOccurred = false; - private final List distinctConfigMapNames = new ArrayList<>(); @Override public UpdateControl reconcile( LatestDistinctTestResource resource, Context context) { - numberOfExecutions.incrementAndGet(); - - // Get ConfigMaps from both event sources - var eventSource1 = - (InformerEventSource) - context.eventSourceRetriever().getEventSourceFor(ConfigMap.class, EVENT_SOURCE_1_NAME); - var eventSource2 = - (InformerEventSource) - context.eventSourceRetriever().getEventSourceFor(ConfigMap.class, EVENT_SOURCE_2_NAME); - - // Get all ConfigMaps from both event sources - // Using list() with a predicate that always returns true to get all resources - var configMapsFromEs1 = eventSource1.list(cm -> true); - var configMapsFromEs2 = eventSource2.list(cm -> true); - - // Use latestDistinctList to deduplicate ConfigMaps by keeping the latest version - List distinctConfigMaps = - Stream.concat(configMapsFromEs1, configMapsFromEs2) - .collect(ReconcileUtils.latestDistinctList()); - - // Store the distinct ConfigMap names for verification - synchronized (distinctConfigMapNames) { - distinctConfigMapNames.clear(); - distinctConfigMapNames.addAll( - distinctConfigMaps.stream() - .map(cm -> cm.getMetadata().getName()) - .sorted() - .collect(Collectors.toList())); - } // Update status with information from ConfigMaps if (resource.getStatus() == null) { resource.setStatus(new LatestDistinctTestResourceStatus()); } + var allConfigMaps = context.getSecondaryResourcesAsStream(ConfigMap.class).toList(); + if (allConfigMaps.size() < 2) { + // wait until both informers see the config map + return UpdateControl.noUpdate(); + } + // makes sure that distinc config maps returned + var distinctConfigMaps = + context + .getSecondaryResourcesAsStream(ConfigMap.class) + .collect(ReconcileUtils.latestDistinctList()); + if (distinctConfigMaps.size() != 1) { + errorOccurred = true; + throw new IllegalStateException(); + } resource.getStatus().setConfigMapCount(distinctConfigMaps.size()); + distinctConfigMaps.get(0).setData(Map.of(KEY_2, "val2")); + var updated = ReconcileUtils.update(context, distinctConfigMaps.get(0)); - // Concatenate data from all distinct ConfigMaps - String data = - distinctConfigMaps.stream() - .map(cm -> cm.getData() != null ? cm.getData().getOrDefault("key", "") : "") - .filter(s -> !s.isEmpty()) - .collect(Collectors.joining(",")); - - resource.getStatus().setDataFromConfigMaps(data); + // makes sure that distinc config maps returned + distinctConfigMaps = + context + .getSecondaryResourcesAsStream(ConfigMap.class) + .collect(ReconcileUtils.latestDistinctList()); - // Use ReconcileUtils to update the status - // This tests serverSideApplyStatus method - resource.getStatus().setReconcileUtilsCalled(true); - return UpdateControl.patchStatus(ReconcileUtils.serverSideApplyStatus(context, resource)); + if (distinctConfigMaps.size() != 1) { + errorOccurred = true; + throw new IllegalStateException(); + } + if (!distinctConfigMaps.get(0).getData().containsKey(KEY_2) + || !distinctConfigMaps + .get(0) + .getMetadata() + .getResourceVersion() + .equals(updated.getMetadata().getResourceVersion())) { + errorOccurred = true; + throw new IllegalStateException(); + } + numberOfExecutions.incrementAndGet(); + return UpdateControl.patchStatus(resource); } @Override public List> prepareEventSources( EventSourceContext context) { - // Create two separate InformerEventSource instances for ConfigMaps - // Each watches ConfigMaps with different labels - - // First event source: watches ConfigMaps with label "configmap-type: type1" var configEs1 = InformerEventSourceConfiguration.from(ConfigMap.class, LatestDistinctTestResource.class) .withName(EVENT_SOURCE_1_NAME) + .withLabelSelector(LABEL_KEY) .withNamespacesInheritedFromController() - .withLabelSelector(LABEL_KEY + "=" + LABEL_TYPE_1) .withSecondaryToPrimaryMapper( cm -> Set.of( @@ -125,12 +110,11 @@ public List> prepareEventSources( cm.getMetadata().getNamespace()))) .build(); - // Second event source: watches ConfigMaps with label "configmap-type: type2" var configEs2 = InformerEventSourceConfiguration.from(ConfigMap.class, LatestDistinctTestResource.class) .withName(EVENT_SOURCE_2_NAME) + .withLabelSelector(LABEL_KEY) .withNamespacesInheritedFromController() - .withLabelSelector(LABEL_KEY + "=" + LABEL_TYPE_2) .withSecondaryToPrimaryMapper( cm -> Set.of( @@ -144,17 +128,6 @@ public List> prepareEventSources( new InformerEventSource<>(configEs2, context)); } - @Override - public int getNumberOfExecutions() { - return numberOfExecutions.get(); - } - - public List getDistinctConfigMapNames() { - synchronized (distinctConfigMapNames) { - return new ArrayList<>(distinctConfigMapNames); - } - } - @Override public ErrorStatusUpdateControl updateErrorStatus( LatestDistinctTestResource resource, @@ -167,4 +140,8 @@ public ErrorStatusUpdateControl updateErrorStatus( public boolean isErrorOccurred() { return errorOccurred; } + + public int getNumberOfExecutions() { + return numberOfExecutions.get(); + } } diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/latestdistinct/LatestDistinctTestResourceStatus.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/latestdistinct/LatestDistinctTestResourceStatus.java index 199b957729..fd5ff82df5 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/latestdistinct/LatestDistinctTestResourceStatus.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/latestdistinct/LatestDistinctTestResourceStatus.java @@ -17,8 +17,6 @@ public class LatestDistinctTestResourceStatus { private int configMapCount; - private String dataFromConfigMaps; - private boolean reconcileUtilsCalled; public int getConfigMapCount() { return configMapCount; @@ -27,20 +25,4 @@ public int getConfigMapCount() { public void setConfigMapCount(int configMapCount) { this.configMapCount = configMapCount; } - - public String getDataFromConfigMaps() { - return dataFromConfigMaps; - } - - public void setDataFromConfigMaps(String dataFromConfigMaps) { - this.dataFromConfigMaps = dataFromConfigMaps; - } - - public boolean isReconcileUtilsCalled() { - return reconcileUtilsCalled; - } - - public void setReconcileUtilsCalled(boolean reconcileUtilsCalled) { - this.reconcileUtilsCalled = reconcileUtilsCalled; - } } From a67b6730b5a14ed4cc9bb47d13afe279c9c96d0b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Tue, 27 Jan 2026 12:03:10 +0100 Subject: [PATCH 4/9] Update operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ReconcileUtils.java Co-authored-by: Martin Stefanko --- .../javaoperatorsdk/operator/api/reconciler/ReconcileUtils.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ReconcileUtils.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ReconcileUtils.java index 84bd546031..9a8ba14599 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ReconcileUtils.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ReconcileUtils.java @@ -375,7 +375,7 @@ public static R resourcePatch( var es = esList.get(0); if (esList.size() > 1) { log.warn( - "Multiple event source found for type: {}, selecting first with name {}", + "Multiple event sources found for type: {}, selecting first with name {}", resource.getClass(), log.isWarnEnabled() ? es.name() : null); } From d14556271680b68e91e0a3b60b32e6740bbd7172 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Tue, 27 Jan 2026 12:08:45 +0100 Subject: [PATCH 5/9] wip MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../javaoperatorsdk/operator/api/reconciler/ReconcileUtils.java | 2 +- .../operator/api/reconciler/ReconcileUtilsTest.java | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ReconcileUtils.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ReconcileUtils.java index 9a8ba14599..f8b42fc2a9 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ReconcileUtils.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ReconcileUtils.java @@ -377,7 +377,7 @@ public static R resourcePatch( log.warn( "Multiple event sources found for type: {}, selecting first with name {}", resource.getClass(), - log.isWarnEnabled() ? es.name() : null); + es.name()); } if (es instanceof ManagedInformerEventSource mes) { return resourcePatch(resource, updateOperation, mes); diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/reconciler/ReconcileUtilsTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/reconciler/ReconcileUtilsTest.java index d6cc12dfeb..aea27eff8d 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/reconciler/ReconcileUtilsTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/reconciler/ReconcileUtilsTest.java @@ -579,7 +579,6 @@ void latestDistinctListHandlesEmptyStream() { Stream.empty().collect(ReconcileUtils.latestDistinctList()); assertThat(result).isEmpty(); - assertThat(result).isInstanceOf(List.class); } @Test @@ -588,7 +587,6 @@ void latestDistinctSetHandlesEmptyStream() { Stream.empty().collect(ReconcileUtils.latestDistinctSet()); assertThat(result).isEmpty(); - assertThat(result).isInstanceOf(Set.class); } } From 5a5027a004cdae48f37cd61426abcaebde127ca5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Wed, 28 Jan 2026 08:26:29 +0100 Subject: [PATCH 6/9] wip MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../api/reconciler/ReconcileUtils.java | 98 +++++++++---------- .../api/reconciler/ReconcileUtilsTest.java | 5 +- 2 files changed, 53 insertions(+), 50 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ReconcileUtils.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ReconcileUtils.java index f8b42fc2a9..66d644cd01 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ReconcileUtils.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ReconcileUtils.java @@ -40,6 +40,7 @@ import io.javaoperatorsdk.operator.processing.event.ResourceID; import io.javaoperatorsdk.operator.processing.event.source.informer.ManagedInformerEventSource; +import static io.javaoperatorsdk.operator.ReconcilerUtilsInternal.compareResourceVersions; import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getUID; import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getVersion; @@ -604,56 +605,55 @@ public static

P addFinalizerWithSSA( } } - /** - * Returns a collector that deduplicates Kubernetes objects by keeping only the one with the - * latest metadata.resourceVersion for each unique name and namespace combination. The intended - * use case is for the rather rare setup when there are overlapping {@link - * io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource}s for a - * resource type. - * - * @param the type of HasMetadata objects - * @return a collector that produces a collection of deduplicated Kubernetes objects - */ - public static Collector> latestDistinct() { - return Collectors.collectingAndThen(latestDistinctToMap(), Map::values); - } - - /** - * Returns a collector that deduplicates Kubernetes objects by keeping only the one with the - * latest metadata.resourceVersion for each unique name and namespace combination. The intended - * use case is for the rather rare setup when there are overlapping {@link - * io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource}s for a - * resource type. - * - * @param the type of HasMetadata objects - * @return a collector that produces a List of deduplicated Kubernetes objects - */ - public static Collector> latestDistinctList() { - return Collectors.collectingAndThen( - latestDistinctToMap(), map -> new ArrayList<>(map.values())); - } + /** + * Returns a collector that deduplicates Kubernetes objects by keeping only the one with the + * latest metadata.resourceVersion for each unique name and namespace combination. The intended + * use case is for the rather rare setup when there are overlapping {@link + * io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource}s for a + * resource type. + * + * @param the type of HasMetadata objects + * @return a collector that produces a collection of deduplicated Kubernetes objects + */ + public static Collector> latestDistinct() { + return Collectors.collectingAndThen(latestDistinctToMap(), Map::values); + } - /** - * Returns a collector that deduplicates Kubernetes objects by keeping only the one with the - * latest metadata.resourceVersion for each unique name and namespace combination. The intended - * use case is for the rather rare setup when there are overlapping {@link - * io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource}s for a - * resource type. - * - * @param the type of HasMetadata objects - * @return a collector that produces a Set of deduplicated Kubernetes objects - */ - public static Collector> latestDistinctSet() { - return Collectors.collectingAndThen(latestDistinctToMap(), map -> new HashSet<>(map.values())); - } + /** + * Returns a collector that deduplicates Kubernetes objects by keeping only the one with the + * latest metadata.resourceVersion for each unique name and namespace combination. The intended + * use case is for the rather rare setup when there are overlapping {@link + * io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource}s for a + * resource type. + * + * @param the type of HasMetadata objects + * @return a collector that produces a List of deduplicated Kubernetes objects + */ + public static Collector> latestDistinctList() { + return Collectors.collectingAndThen( + latestDistinctToMap(), map -> new ArrayList<>(map.values())); + } - private static Collector> latestDistinctToMap() { - return Collectors.toMap( - resource -> - new ResourceID(resource.getMetadata().getName(), resource.getMetadata().getNamespace()), - resource -> resource, - (existing, replacement) -> - compareResourceVersions(existing, replacement) >= 0 ? existing : replacement); - } + /** + * Returns a collector that deduplicates Kubernetes objects by keeping only the one with the + * latest metadata.resourceVersion for each unique name and namespace combination. The intended + * use case is for the rather rare setup when there are overlapping {@link + * io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource}s for a + * resource type. + * + * @param the type of HasMetadata objects + * @return a collector that produces a Set of deduplicated Kubernetes objects + */ + public static Collector> latestDistinctSet() { + return Collectors.collectingAndThen(latestDistinctToMap(), map -> new HashSet<>(map.values())); + } + private static Collector> latestDistinctToMap() { + return Collectors.toMap( + resource -> + new ResourceID(resource.getMetadata().getName(), resource.getMetadata().getNamespace()), + resource -> resource, + (existing, replacement) -> + compareResourceVersions(existing, replacement) >= 0 ? existing : replacement); + } } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/reconciler/ReconcileUtilsTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/reconciler/ReconcileUtilsTest.java index aea27eff8d..a73d0c6a7a 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/reconciler/ReconcileUtilsTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/reconciler/ReconcileUtilsTest.java @@ -25,6 +25,10 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.PodBuilder; import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.KubernetesClientException; import io.fabric8.kubernetes.client.dsl.MixedOperation; @@ -588,5 +592,4 @@ void latestDistinctSetHandlesEmptyStream() { assertThat(result).isEmpty(); } - } From fd78e57519d1e26e9500165ee6fba7f231d1b7cf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Thu, 29 Jan 2026 10:58:31 +0100 Subject: [PATCH 7/9] wip MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../operator/api/reconciler/ReconcileUtilsTest.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/reconciler/ReconcileUtilsTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/reconciler/ReconcileUtilsTest.java index a73d0c6a7a..5da89ba9c4 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/reconciler/ReconcileUtilsTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/reconciler/ReconcileUtilsTest.java @@ -519,7 +519,6 @@ void latestDistinctListReturnsListType() { List result = Stream.of(pod1v1, pod1v2, pod2v1).collect(ReconcileUtils.latestDistinctList()); - assertThat(result).isInstanceOf(List.class); assertThat(result).hasSize(2); // Verify the list contains the correct resources @@ -565,7 +564,6 @@ void latestDistinctSetReturnsSetType() { Set result = Stream.of(pod1v1, pod1v2, pod2v1).collect(ReconcileUtils.latestDistinctSet()); - assertThat(result).isInstanceOf(java.util.Set.class); assertThat(result).hasSize(2); // Verify the set contains the correct resources From ade5d7ffa7f1133b57e541f1af72026758043760 Mon Sep 17 00:00:00 2001 From: Chris Laprun Date: Thu, 29 Jan 2026 23:08:16 +0100 Subject: [PATCH 8/9] feat: provide de-duplicated secondary resources stream on Context Signed-off-by: Chris Laprun --- .../operator/api/reconciler/Context.java | 4 ++- .../api/reconciler/DefaultContext.java | 36 ++++++++++++++++--- .../api/reconciler/ReconcileUtils.java | 6 ++-- .../source/informer/InformerEventSource.java | 3 +- .../LatestDistinctTestReconciler.java | 5 +-- 5 files changed, 40 insertions(+), 14 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/Context.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/Context.java index cc7c865dc5..f87aa7bcb1 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/Context.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/Context.java @@ -38,9 +38,11 @@ default Optional getSecondaryResource(Class expectedType) { Set getSecondaryResources(Class expectedType); default Stream getSecondaryResourcesAsStream(Class expectedType) { - return getSecondaryResources(expectedType).stream(); + return getSecondaryResourcesAsStream(expectedType, false); } + Stream getSecondaryResourcesAsStream(Class expectedType, boolean deduplicate); + Optional getSecondaryResource(Class expectedType, String eventSourceName); ControllerConfiguration

getControllerConfiguration(); diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/DefaultContext.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/DefaultContext.java index f1aeadd52a..e7ecf4dcb1 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/DefaultContext.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/DefaultContext.java @@ -24,6 +24,9 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import io.fabric8.kubernetes.api.model.HasMetadata; import io.fabric8.kubernetes.client.KubernetesClient; import io.javaoperatorsdk.operator.api.config.ControllerConfiguration; @@ -35,7 +38,10 @@ import io.javaoperatorsdk.operator.processing.event.NoEventSourceForClassException; import io.javaoperatorsdk.operator.processing.event.ResourceID; +import static io.javaoperatorsdk.operator.ReconcilerUtilsInternal.compareResourceVersions; + public class DefaultContext

implements Context

{ + private static final Logger log = LoggerFactory.getLogger(DefaultContext.class); private RetryInfo retryInfo; private final Controller

controller; @@ -73,11 +79,31 @@ public Set getSecondaryResources(Class expectedType) { return getSecondaryResourcesAsStream(expectedType).collect(Collectors.toSet()); } - @Override - public Stream getSecondaryResourcesAsStream(Class expectedType) { - return controller.getEventSourceManager().getEventSourcesFor(expectedType).stream() - .map(es -> es.getSecondaryResources(primaryResource)) - .flatMap(Set::stream); + public Stream getSecondaryResourcesAsStream(Class expectedType, boolean deduplicate) { + if (deduplicate && !HasMetadata.class.isAssignableFrom(expectedType)) { + throw new IllegalArgumentException("Can only de-duplicate HasMetadata descendants"); + } + + final var stream = + controller.getEventSourceManager().getEventSourcesFor(expectedType).stream() + .mapMulti( + (es, consumer) -> es.getSecondaryResources(primaryResource).forEach(consumer)); + if (deduplicate) { + //noinspection unchecked + return stream + .map(HasMetadata.class::cast) + .collect( + Collectors.toUnmodifiableMap( + ResourceID::fromResource, + Function.identity(), + (existing, replacement) -> + compareResourceVersions(existing, replacement) >= 0 ? existing : replacement)) + .values() + .stream() + .map(hasMetadata -> (R) hasMetadata); + } else { + return stream; + } } @Override diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ReconcileUtils.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ReconcileUtils.java index 66d644cd01..cf3256ac4d 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ReconcileUtils.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ReconcileUtils.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.Function; import java.util.function.Predicate; import java.util.function.UnaryOperator; import java.util.stream.Collector; @@ -650,9 +651,8 @@ public static

P addFinalizerWithSSA( private static Collector> latestDistinctToMap() { return Collectors.toMap( - resource -> - new ResourceID(resource.getMetadata().getName(), resource.getMetadata().getNamespace()), - resource -> resource, + ResourceID::fromResource, + Function.identity(), (existing, replacement) -> compareResourceVersions(existing, replacement) >= 0 ? existing : replacement); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java index 6743ff436a..b778747417 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java @@ -218,7 +218,8 @@ public Set getSecondaryResources(P primary) { } return secondaryIDs.stream() .map(this::get) - .flatMap(Optional::stream) + .filter(Optional::isPresent) + .map(Optional::get) .collect(Collectors.toSet()); } diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/latestdistinct/LatestDistinctTestReconciler.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/latestdistinct/LatestDistinctTestReconciler.java index d182b52824..ccceb0d836 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/latestdistinct/LatestDistinctTestReconciler.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/latestdistinct/LatestDistinctTestReconciler.java @@ -58,10 +58,7 @@ public UpdateControl reconcile( return UpdateControl.noUpdate(); } // makes sure that distinc config maps returned - var distinctConfigMaps = - context - .getSecondaryResourcesAsStream(ConfigMap.class) - .collect(ReconcileUtils.latestDistinctList()); + var distinctConfigMaps = context.getSecondaryResourcesAsStream(ConfigMap.class, true).toList(); if (distinctConfigMaps.size() != 1) { errorOccurred = true; throw new IllegalStateException(); From fdfaa630e793cdf1e4c0dca5ae01302914c49549 Mon Sep 17 00:00:00 2001 From: Chris Laprun Date: Thu, 29 Jan 2026 23:51:42 +0100 Subject: [PATCH 9/9] feat: provide de-duplicated secondary resources stream on Context Alternative without requiring collecting to a Map Signed-off-by: Chris Laprun --- .../api/reconciler/DefaultContext.java | 47 ++++++++++++++----- 1 file changed, 36 insertions(+), 11 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/DefaultContext.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/DefaultContext.java index e7ecf4dcb1..40c6061fd0 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/DefaultContext.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/DefaultContext.java @@ -15,6 +15,7 @@ */ package io.javaoperatorsdk.operator.api.reconciler; +import java.util.HashMap; import java.util.Map; import java.util.Optional; import java.util.Set; @@ -29,6 +30,7 @@ import io.fabric8.kubernetes.api.model.HasMetadata; import io.fabric8.kubernetes.client.KubernetesClient; +import io.javaoperatorsdk.operator.ReconcilerUtilsInternal; import io.javaoperatorsdk.operator.api.config.ControllerConfiguration; import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource; import io.javaoperatorsdk.operator.api.reconciler.dependent.managed.DefaultManagedWorkflowAndDependentResourceContext; @@ -38,8 +40,6 @@ import io.javaoperatorsdk.operator.processing.event.NoEventSourceForClassException; import io.javaoperatorsdk.operator.processing.event.ResourceID; -import static io.javaoperatorsdk.operator.ReconcilerUtilsInternal.compareResourceVersions; - public class DefaultContext

implements Context

{ private static final Logger log = LoggerFactory.getLogger(DefaultContext.class); @@ -84,22 +84,47 @@ public Stream getSecondaryResourcesAsStream(Class expectedType, boolea throw new IllegalArgumentException("Can only de-duplicate HasMetadata descendants"); } + final var idToLatest = deduplicate ? new HashMap() : null; final var stream = controller.getEventSourceManager().getEventSourcesFor(expectedType).stream() .mapMulti( - (es, consumer) -> es.getSecondaryResources(primaryResource).forEach(consumer)); + (es, consumer) -> + es.getSecondaryResources(primaryResource) + .forEach( + r -> { + var reject = false; + if (deduplicate) { + final boolean[] rejectAr = new boolean[1]; + final var hm = (HasMetadata) r; + final var resourceVersion = hm.getMetadata().getResourceVersion(); + idToLatest.merge( + ResourceID.fromResource(hm), + resourceVersion, + (existing, replacement) -> { + final var comparison = + ReconcilerUtilsInternal.compareResourceVersions( + existing, replacement); + rejectAr[0] = + comparison == 0; // rejecting resource if version is equal + return comparison >= 0 ? existing : replacement; + }); + reject = rejectAr[0]; + } + // only keep resources that don't have the same id and resource + // version + if (!reject) { + consumer.accept(r); + } + })); if (deduplicate) { //noinspection unchecked return stream .map(HasMetadata.class::cast) - .collect( - Collectors.toUnmodifiableMap( - ResourceID::fromResource, - Function.identity(), - (existing, replacement) -> - compareResourceVersions(existing, replacement) >= 0 ? existing : replacement)) - .values() - .stream() + .filter( + hm -> { + final var resourceVersion = hm.getMetadata().getResourceVersion(); + return resourceVersion.equals(idToLatest.get(ResourceID.fromResource(hm))); + }) .map(hasMetadata -> (R) hasMetadata); } else { return stream;