diff --git a/src/main/java/org/prebid/cache/config/MonitoringConfig.java b/src/main/java/org/prebid/cache/config/MonitoringConfig.java new file mode 100644 index 0000000..c2d8049 --- /dev/null +++ b/src/main/java/org/prebid/cache/config/MonitoringConfig.java @@ -0,0 +1,43 @@ +package org.prebid.cache.config; + +import lombok.extern.slf4j.Slf4j; +import org.prebid.cache.metrics.MetricsRecorder; +import org.prebid.cache.model.PayloadWrapper; +import org.prebid.cache.repository.CacheConfig; +import org.prebid.cache.repository.ReactiveRepository; +import org.prebid.cache.service.CacheMonitorService; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import reactor.core.Disposable; +import reactor.core.publisher.Flux; + +import java.time.Duration; +import java.util.Random; + +@Slf4j +@Configuration +@ConditionalOnProperty(prefix = "cache.monitoring", name = "enabled", havingValue = "true") +public class MonitoringConfig { + + @Bean + public CacheMonitorService storageMonitorService(ReactiveRepository repository, + MetricsRecorder metricsRecorder, + CacheConfig config) { + return new CacheMonitorService(repository, metricsRecorder, config); + } + + @Bean + public Disposable monitorScheduledPoller(CacheMonitorService cacheMonitorService, + CacheConfig cacheConfig, + @Value("${cache.monitoring.intervalSec}") int intervalSec) { + final Duration startDelay = Duration.ofSeconds(new Random().nextInt(0, cacheConfig.getTimeoutMs())); + return Flux.interval(startDelay, Duration.ofSeconds(intervalSec)) + .onBackpressureDrop() + .concatMap(counter -> cacheMonitorService.poll()) + .onErrorContinue((throwable, o) -> log.error( + "Failed during cache monitor polling: " + throwable.getMessage(), throwable)) + .subscribe(); + } +} diff --git a/src/main/java/org/prebid/cache/metrics/MeasurementTag.java b/src/main/java/org/prebid/cache/metrics/MeasurementTag.java index d0572de..b8d9913 100644 --- a/src/main/java/org/prebid/cache/metrics/MeasurementTag.java +++ b/src/main/java/org/prebid/cache/metrics/MeasurementTag.java @@ -20,7 +20,8 @@ public enum MeasurementTag { ERROR_EXISTING_ID("pbc.err.existingId"), ERROR_REJECTED_EXTERNAL_ID("pbc.err.rejectedExternalId"), PROXY_SUCCESS("pbc.proxy.success"), - PROXY_FAILURE("pbc.proxy.failure"); + PROXY_FAILURE("pbc.proxy.failure"), + ENTRY_LIFETIME("pbc.entryLifetimeSec.${ttlBucket}"); private final String tag; diff --git a/src/main/java/org/prebid/cache/metrics/MetricsRecorder.java b/src/main/java/org/prebid/cache/metrics/MetricsRecorder.java index 120d015..3c4daa7 100644 --- a/src/main/java/org/prebid/cache/metrics/MetricsRecorder.java +++ b/src/main/java/org/prebid/cache/metrics/MetricsRecorder.java @@ -6,12 +6,15 @@ import org.prebid.cache.handlers.ServiceType; import org.springframework.stereotype.Component; +import java.time.Duration; + @Component public class MetricsRecorder { private final MeterRegistry meterRegistry; protected static final String PREFIX_PLACEHOLDER = "\\$\\{prefix\\}"; + protected static final String TTL_BUCKET_PLACEHOLDER = "\\$\\{ttlBucket\\}"; public MetricsRecorder(MeterRegistry meterRegistry) { this.meterRegistry = meterRegistry; @@ -66,13 +69,16 @@ public void markMeterForTag(final String prefix, final MeasurementTag measuremen public MetricsRecorderTimer createRequestTimerForServiceType(final ServiceType serviceType) { if (serviceType.equals(ServiceType.FETCH)) { return new MetricsRecorderTimer( - MeasurementTag.REQUEST_DURATION.getTag() - .replaceAll(PREFIX_PLACEHOLDER, "read")); + MeasurementTag.REQUEST_DURATION.getTag().replaceAll(PREFIX_PLACEHOLDER, "read")); } else if (serviceType.equals(ServiceType.SAVE)) { return new MetricsRecorderTimer( - MeasurementTag.REQUEST_DURATION.getTag() - .replaceAll(PREFIX_PLACEHOLDER, "write")); + MeasurementTag.REQUEST_DURATION.getTag().replaceAll(PREFIX_PLACEHOLDER, "write")); } return null; } + + public void recordEntryLifetime(String bucketName, Duration entryLifetime) { + meterRegistry.timer(MeasurementTag.ENTRY_LIFETIME.getTag().replaceAll(TTL_BUCKET_PLACEHOLDER, bucketName)) + .record(Duration.ofSeconds(entryLifetime.getSeconds())); + } } diff --git a/src/main/java/org/prebid/cache/model/PayloadWrapper.java b/src/main/java/org/prebid/cache/model/PayloadWrapper.java index fa59143..5d92aec 100644 --- a/src/main/java/org/prebid/cache/model/PayloadWrapper.java +++ b/src/main/java/org/prebid/cache/model/PayloadWrapper.java @@ -10,7 +10,7 @@ public class PayloadWrapper { String id; String prefix; Payload payload; - + Long timestamp; Long expiry; transient boolean isExternalId; diff --git a/src/main/java/org/prebid/cache/service/CacheMonitorService.java b/src/main/java/org/prebid/cache/service/CacheMonitorService.java new file mode 100644 index 0000000..1d7fd11 --- /dev/null +++ b/src/main/java/org/prebid/cache/service/CacheMonitorService.java @@ -0,0 +1,101 @@ +package org.prebid.cache.service; + +import org.prebid.cache.exceptions.PayloadWrapperPropertyException; +import org.prebid.cache.metrics.MetricsRecorder; +import org.prebid.cache.model.PayloadWrapper; +import org.prebid.cache.repository.CacheConfig; +import org.prebid.cache.repository.ReactiveRepository; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; + +import java.time.Duration; +import java.time.Instant; +import java.util.Map; +import java.util.Objects; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; + +public class CacheMonitorService { + + private static final String DEFAULT_CACHE_TTL = "default"; + private static final String MIN_CACHE_TTL = "min"; + private static final String MAX_CACHE_TTL = "max"; + private static final String STATIC_CACHE_TTL = "static"; + + private final ReactiveRepository repository; + private final MetricsRecorder metricsRecorder; + private final String prefix; + + private final Map expiryBuckets; + private final Map monitoredEntities = new ConcurrentHashMap<>(); + + public CacheMonitorService(ReactiveRepository repository, + MetricsRecorder metricsRecorder, + CacheConfig config) { + + this.repository = repository; + this.metricsRecorder = metricsRecorder; + this.prefix = config.getPrefix(); + expiryBuckets = resolveExpiryBuckets(config); + } + + public Mono poll() { + return Flux.fromIterable(expiryBuckets.entrySet()) + .parallel() + .runOn(Schedulers.parallel()) + .flatMap(entry -> + processExpiryBucket(entry.getKey(), + entry.getValue(), + monitoredEntities.get(entry.getKey())) + .doOnNext(wrapper -> monitoredEntities.put(entry.getKey(), wrapper)) + ) + .then(); + } + + private Mono processExpiryBucket(Long ttl, String bucketName, PayloadWrapper payloadWrapper) { + if (Objects.isNull(payloadWrapper)) { + return saveNewWrapper(ttl); + } + + try { + final String normalizedId = payloadWrapper.getNormalizedId(); + return repository.findById(normalizedId) + .switchIfEmpty(Mono.defer(() -> { + final Duration entryLifetime = + Duration.ofMillis(Instant.now().toEpochMilli() - payloadWrapper.getTimestamp()); + metricsRecorder.recordEntryLifetime(bucketName, entryLifetime); + return saveNewWrapper(ttl); + })); + } catch (PayloadWrapperPropertyException e) { + return Mono.error(new RuntimeException(e)); + } + } + + private Mono saveNewWrapper(Long ttl) { + final PayloadWrapper newWrapper = PayloadWrapper.builder() + .id(UUID.randomUUID().toString()) + .prefix(prefix) + .timestamp(Instant.now().toEpochMilli()) + .expiry(ttl) + .build(); + return repository.save(newWrapper) + .doOnSuccess(wrapper -> monitoredEntities.put(ttl, wrapper)); + } + + private static Map resolveExpiryBuckets(CacheConfig config) { + if (config.getMinExpiry() == config.getMaxExpiry() && config.getMinExpiry() == 0 + || config.getMinExpiry() == config.getMaxExpiry() && config.getMinExpiry() == config.getExpirySec()) { + return Map.of(config.getExpirySec(), DEFAULT_CACHE_TTL); + } else if (config.getMinExpiry() == config.getMaxExpiry()) { + return Map.of( + config.getExpirySec(), DEFAULT_CACHE_TTL, + config.getMinExpiry(), STATIC_CACHE_TTL); + } else { + return Map.of( + config.getExpirySec(), DEFAULT_CACHE_TTL, + config.getMinExpiry(), MIN_CACHE_TTL, + config.getMaxExpiry(), MAX_CACHE_TTL); + } + } +} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index fc83b1e..6c16c02 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -20,6 +20,9 @@ cors: # cache cache: + monitoring: + enabled: false + intervalSec: 1 prefix: prebid_ expiry_sec: 300 timeout_ms: 300