Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,11 @@ default <R> Optional<R> getSecondaryResource(Class<R> expectedType) {
<R> Set<R> getSecondaryResources(Class<R> expectedType);

default <R> Stream<R> getSecondaryResourcesAsStream(Class<R> expectedType) {
return getSecondaryResources(expectedType).stream();
return getSecondaryResourcesAsStream(expectedType, false);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we could use the former implementation and call this in case in the new method the deduplication is false.

}

<R> Stream<R> getSecondaryResourcesAsStream(Class<R> expectedType, boolean deduplicate);

<R> Optional<R> getSecondaryResource(Class<R> expectedType, String eventSourceName);

ControllerConfiguration<P> getControllerConfiguration();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.javaoperatorsdk.operator.api.reconciler;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
Expand All @@ -24,8 +25,12 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.ReconcilerUtilsInternal;
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource;
import io.javaoperatorsdk.operator.api.reconciler.dependent.managed.DefaultManagedWorkflowAndDependentResourceContext;
Expand All @@ -36,6 +41,7 @@
import io.javaoperatorsdk.operator.processing.event.ResourceID;

public class DefaultContext<P extends HasMetadata> implements Context<P> {
private static final Logger log = LoggerFactory.getLogger(DefaultContext.class);

private RetryInfo retryInfo;
private final Controller<P> controller;
Expand Down Expand Up @@ -73,11 +79,56 @@ public <T> Set<T> getSecondaryResources(Class<T> expectedType) {
return getSecondaryResourcesAsStream(expectedType).collect(Collectors.toSet());
}

@Override
public <R> Stream<R> getSecondaryResourcesAsStream(Class<R> expectedType) {
return controller.getEventSourceManager().getEventSourcesFor(expectedType).stream()
.map(es -> es.getSecondaryResources(primaryResource))
.flatMap(Set::stream);
public <R> Stream<R> getSecondaryResourcesAsStream(Class<R> expectedType, boolean deduplicate) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add comprehensive javadoc

if (deduplicate && !HasMetadata.class.isAssignableFrom(expectedType)) {
throw new IllegalArgumentException("Can only de-duplicate HasMetadata descendants");
}

final var idToLatest = deduplicate ? new HashMap<ResourceID, String>() : null;
final var stream =
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As mentioned above for sake of simplicity/readbaility can if deduplicate is false could we just use the former implementation?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could but this implementation avoids creating intermediary Streams, though I agree that it's slightly less readable. Let me see what I can do about that.

controller.getEventSourceManager().getEventSourcesFor(expectedType).stream()
.<R>mapMulti(
(es, consumer) ->
es.getSecondaryResources(primaryResource)
.forEach(
r -> {
var reject = false;
if (deduplicate) {
final boolean[] rejectAr = new boolean[1];
final var hm = (HasMetadata) r;
final var resourceVersion = hm.getMetadata().getResourceVersion();
idToLatest.merge(
ResourceID.fromResource(hm),
resourceVersion,
(existing, replacement) -> {
final var comparison =
ReconcilerUtilsInternal.compareResourceVersions(
existing, replacement);
rejectAr[0] =
comparison == 0; // rejecting resource if version is equal
return comparison >= 0 ? existing : replacement;
});
reject = rejectAr[0];
}
// only keep resources that don't have the same id and resource
// version
if (!reject) {
consumer.accept(r);
}
}));
if (deduplicate) {
//noinspection unchecked
return stream
.map(HasMetadata.class::cast)
.filter(
hm -> {
final var resourceVersion = hm.getMetadata().getResourceVersion();
return resourceVersion.equals(idToLatest.get(ResourceID.fromResource(hm)));
})
.map(hasMetadata -> (R) hasMetadata);
} else {
return stream;
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,17 @@
package io.javaoperatorsdk.operator.api.reconciler;

import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;
import java.util.stream.Collector;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -32,6 +41,7 @@
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.event.source.informer.ManagedInformerEventSource;

import static io.javaoperatorsdk.operator.ReconcilerUtilsInternal.compareResourceVersions;
import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getUID;
import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getVersion;

Expand Down Expand Up @@ -364,13 +374,13 @@ public static <R extends HasMetadata> R resourcePatch(
if (esList.isEmpty()) {
throw new IllegalStateException("No event source found for type: " + resource.getClass());
}
var es = esList.get(0);
if (esList.size() > 1) {
throw new IllegalStateException(
"Multiple event sources found for: "
+ resource.getClass()
+ " please provide the target event source");
log.warn(
"Multiple event sources found for type: {}, selecting first with name {}",
resource.getClass(),
es.name());
}
var es = esList.get(0);
if (es instanceof ManagedInformerEventSource mes) {
return resourcePatch(resource, updateOperation, mes);
} else {
Expand Down Expand Up @@ -595,4 +605,55 @@ public static <P extends HasMetadata> P addFinalizerWithSSA(
e);
}
}

/**
* Returns a collector that deduplicates Kubernetes objects by keeping only the one with the
* latest metadata.resourceVersion for each unique name and namespace combination. The intended
* use case is for the rather rare setup when there are overlapping {@link
* io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource}s for a
* resource type.
*
* @param <T> the type of HasMetadata objects
* @return a collector that produces a collection of deduplicated Kubernetes objects
*/
public static <T extends HasMetadata> Collector<T, ?, Collection<T>> latestDistinct() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wasn't this moved to ReconcileUtilsInternal

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we were to keep this implementation, this actually would be removed

return Collectors.collectingAndThen(latestDistinctToMap(), Map::values);
}

/**
* Returns a collector that deduplicates Kubernetes objects by keeping only the one with the
* latest metadata.resourceVersion for each unique name and namespace combination. The intended
* use case is for the rather rare setup when there are overlapping {@link
* io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource}s for a
* resource type.
*
* @param <T> the type of HasMetadata objects
* @return a collector that produces a List of deduplicated Kubernetes objects
*/
public static <T extends HasMetadata> Collector<T, ?, List<T>> latestDistinctList() {
return Collectors.collectingAndThen(
latestDistinctToMap(), map -> new ArrayList<>(map.values()));
}

/**
* Returns a collector that deduplicates Kubernetes objects by keeping only the one with the
* latest metadata.resourceVersion for each unique name and namespace combination. The intended
* use case is for the rather rare setup when there are overlapping {@link
* io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource}s for a
* resource type.
*
* @param <T> the type of HasMetadata objects
* @return a collector that produces a Set of deduplicated Kubernetes objects
*/
public static <T extends HasMetadata> Collector<T, ?, Set<T>> latestDistinctSet() {
return Collectors.collectingAndThen(latestDistinctToMap(), map -> new HashSet<>(map.values()));
}

private static <T extends HasMetadata> Collector<T, ?, Map<ResourceID, T>> latestDistinctToMap() {
return Collectors.toMap(
ResourceID::fromResource,
Function.identity(),
(existing, replacement) ->
compareResourceVersions(existing, replacement) >= 0 ? existing : replacement);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,8 @@ public Set<R> getSecondaryResources(P primary) {
}
return secondaryIDs.stream()
.map(this::get)
.flatMap(Optional::stream)
.filter(Optional::isPresent)
.map(Optional::get)
.collect(Collectors.toSet());
}

Expand Down
Loading