Skip to content
Empty file modified mvnw
100644 → 100755
Empty file.
39 changes: 38 additions & 1 deletion src/main/java/dev/openfeature/sdk/EventProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@

import dev.openfeature.sdk.internal.ConfigurableThreadFactory;
import dev.openfeature.sdk.internal.TriConsumer;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import lombok.extern.slf4j.Slf4j;

/**
Expand All @@ -22,6 +25,7 @@
@Slf4j
public abstract class EventProvider implements FeatureProvider {
private EventProviderListener eventProviderListener;
private final List<BiConsumer<ProviderEvent, ProviderEventDetails>> eventObservers = new CopyOnWriteArrayList<>();
private final ExecutorService emitterExecutor =
Executors.newCachedThreadPool(new ConfigurableThreadFactory("openfeature-event-emitter-thread", true));

Expand Down Expand Up @@ -54,6 +58,31 @@ void detach() {
this.onEmit = null;
}

/**
* Add a provider event observer.
*
* <p>Observers are invoked whenever this provider emits an event and are intended for advanced
* provider composition scenarios.
*
* @param observer observer callback
*/
public void addEventObserver(BiConsumer<ProviderEvent, ProviderEventDetails> observer) {
if (observer != null) {
eventObservers.add(observer);
}
}

/**
* Remove a previously registered provider event observer.
*
* @param observer observer callback
*/
public void removeEventObserver(BiConsumer<ProviderEvent, ProviderEventDetails> observer) {
if (observer != null) {
eventObservers.remove(observer);
}
}

/**
* Stop the event emitter executor and block until either termination has completed
* or timeout period has elapsed.
Expand Down Expand Up @@ -81,8 +110,9 @@ public void shutdown() {
public Awaitable emit(final ProviderEvent event, final ProviderEventDetails details) {
final var localEventProviderListener = this.eventProviderListener;
final var localOnEmit = this.onEmit;
final var localEventObservers = this.eventObservers;

if (localEventProviderListener == null && localOnEmit == null) {
if (localEventProviderListener == null && localOnEmit == null && localEventObservers.isEmpty()) {
return Awaitable.FINISHED;
}

Expand All @@ -98,6 +128,13 @@ public Awaitable emit(final ProviderEvent event, final ProviderEventDetails deta
if (localOnEmit != null) {
localOnEmit.accept(this, event, details);
}
for (BiConsumer<ProviderEvent, ProviderEventDetails> observer : localEventObservers) {
try {
observer.accept(event, details);
} catch (Exception e) {
log.error("Exception in provider event observer {}", observer, e);
}
}
} finally {
awaitable.wakeup();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
package dev.openfeature.sdk.multiprovider;

import dev.openfeature.sdk.ErrorCode;
import dev.openfeature.sdk.EvaluationContext;
import dev.openfeature.sdk.FeatureProvider;
import dev.openfeature.sdk.ProviderEvaluation;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.Getter;

/**
* Comparison strategy.
*
* <p>Evaluates all providers in parallel and compares successful results.
* If all providers agree on the value, the fallback provider's result is returned.
* If providers disagree, the optional {@code onMismatch} callback is invoked
* and the fallback provider's result is returned.
* If any provider returns an error, all errors are collected and a
* {@link ErrorCode#GENERAL} error is returned.
*/
public class ComparisonStrategy implements Strategy {

private static final long DEFAULT_TIMEOUT_MS = 30_000;

@Getter
private final String fallbackProvider;

private final BiConsumer<String, Map<String, ProviderEvaluation<?>>> onMismatch;
private final ExecutorService executorService;
private final long timeoutMs;

/**
* Constructs a comparison strategy with a fallback provider.
*
* <p>Uses a shared {@link ForkJoinPool#commonPool()} for parallel evaluation.
*
* @param fallbackProvider provider name to use as fallback when successful
* providers disagree
*/
public ComparisonStrategy(String fallbackProvider) {
this(fallbackProvider, null);
}

/**
* Constructs a comparison strategy with fallback provider and mismatch callback.
*
* <p>Uses a shared {@link ForkJoinPool#commonPool()} for parallel evaluation.
*
* @param fallbackProvider provider name to use as fallback when successful
* providers disagree
* @param onMismatch callback invoked with all successful evaluations
* when they disagree
*/
public ComparisonStrategy(
String fallbackProvider, BiConsumer<String, Map<String, ProviderEvaluation<?>>> onMismatch) {
this(fallbackProvider, onMismatch, ForkJoinPool.commonPool(), DEFAULT_TIMEOUT_MS);
}

/**
* Constructs a comparison strategy with a caller-supplied executor.
*
* @param fallbackProvider provider name to use as fallback when successful
* providers disagree
* @param onMismatch callback invoked with all successful evaluations
* when they disagree (may be {@code null})
* @param executorService executor to use for parallel evaluation
* @param timeoutMs maximum time in milliseconds to wait for all
* providers to complete
*/
public ComparisonStrategy(
String fallbackProvider,
BiConsumer<String, Map<String, ProviderEvaluation<?>>> onMismatch,
ExecutorService executorService,
long timeoutMs) {
this.fallbackProvider = Objects.requireNonNull(fallbackProvider, "fallbackProvider must not be null");
this.onMismatch = onMismatch;
this.executorService = Objects.requireNonNull(executorService, "executorService must not be null");
this.timeoutMs = timeoutMs;
}

@Override
public <T> ProviderEvaluation<T> evaluate(

Check failure on line 95 in src/main/java/dev/openfeature/sdk/multiprovider/ComparisonStrategy.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this method to reduce its Cognitive Complexity from 20 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=open-feature_java-sdk&issues=AZ4I0vsHn67pI1HeG1OB&open=AZ4I0vsHn67pI1HeG1OB&pullRequest=1897
Map<String, FeatureProvider> providers,
String key,
T defaultValue,
EvaluationContext ctx,
Function<FeatureProvider, ProviderEvaluation<T>> providerFunction) {
if (providers.isEmpty()) {
return ProviderEvaluation.<T>builder()
.errorCode(ErrorCode.GENERAL)
.errorMessage("No providers configured")
.build();
}
if (!providers.containsKey(fallbackProvider)) {
throw new IllegalArgumentException("fallbackProvider not found in providers: " + fallbackProvider);
}

int capacity = providers.size() * 4 / 3 + 1;
Map<String, ProviderEvaluation<T>> successfulResults = new ConcurrentHashMap<>(capacity);
Map<String, String> providerErrors = new ConcurrentHashMap<>(capacity);

try {
List<Callable<Void>> tasks = new ArrayList<>(providers.size());
for (Map.Entry<String, FeatureProvider> entry : providers.entrySet()) {
String providerName = entry.getKey();
FeatureProvider provider = entry.getValue();
tasks.add(() -> {
try {
ProviderEvaluation<T> evaluation = providerFunction.apply(provider);
if (evaluation == null) {
providerErrors.put(providerName, "null evaluation");
} else if (evaluation.getErrorCode() == null) {
successfulResults.put(providerName, evaluation);
} else {
providerErrors.put(
providerName, evaluation.getErrorCode() + ": " + evaluation.getErrorMessage());
}
} catch (Exception e) {
providerErrors.put(providerName, e.getClass().getSimpleName() + ": " + e.getMessage());
}
return null;
});
}
List<Future<Void>> futures = executorService.invokeAll(tasks, timeoutMs, TimeUnit.MILLISECONDS);
for (Future<Void> future : futures) {
if (future.isCancelled()) {
return ProviderEvaluation.<T>builder()
.errorCode(ErrorCode.GENERAL)
.errorMessage("Comparison strategy timed out after " + timeoutMs + "ms")
.build();
}
future.get();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return ProviderEvaluation.<T>builder()
.errorCode(ErrorCode.GENERAL)
.errorMessage("Comparison strategy interrupted: " + e.getMessage())
.build();
} catch (Exception e) {
return ProviderEvaluation.<T>builder()
.errorCode(ErrorCode.GENERAL)
.errorMessage("Comparison strategy failed: " + e.getMessage())
.build();
}

if (!providerErrors.isEmpty()) {
return ProviderEvaluation.<T>builder()
.errorCode(ErrorCode.GENERAL)
.errorMessage("Provider errors: " + buildErrorSummary(providerErrors))
Comment on lines +158 to +163
.build();
}

ProviderEvaluation<T> fallbackResult = successfulResults.get(fallbackProvider);
if (fallbackResult == null) {
return ProviderEvaluation.<T>builder()
.errorCode(ErrorCode.GENERAL)
.errorMessage("Fallback provider did not return a successful evaluation: " + fallbackProvider)
.build();
}

if (allEvaluationsMatch(successfulResults)) {
return fallbackResult;
}

if (onMismatch != null) {
Map<String, ProviderEvaluation<?>> mismatchPayload = new LinkedHashMap<>(successfulResults);
onMismatch.accept(key, Collections.unmodifiableMap(mismatchPayload));
}
return fallbackResult;
}

private String buildErrorSummary(Map<String, String> errors) {
return errors.entrySet().stream()
.map(e -> e.getKey() + " -> " + e.getValue())
.collect(Collectors.joining("; "));
}

Comment on lines +180 to +191
private <T> boolean allEvaluationsMatch(Map<String, ProviderEvaluation<T>> results) {
ProviderEvaluation<T> baseline = null;
for (ProviderEvaluation<T> evaluation : results.values()) {
if (baseline == null) {
baseline = evaluation;
continue;
}
if (!Objects.equals(baseline.getValue(), evaluation.getValue())) {
return false;
}
}
return true;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package dev.openfeature.sdk.multiprovider;

import dev.openfeature.sdk.ClientMetadata;
import java.util.Map;

/** Captures hook lifecycle context (client metadata and hints) for per-provider hook execution. */
final class HookExecutionContext {
final ClientMetadata clientMetadata;
final Map<String, Object> hints;

HookExecutionContext(ClientMetadata clientMetadata, Map<String, Object> hints) {
this.clientMetadata = clientMetadata;
this.hints = hints;
}
}
Loading
Loading