From b1b1cc1fdbfee10fb612cd5032133d6acbb16c16 Mon Sep 17 00:00:00 2001 From: Oleksandr Zhevedenko <720803+Net-burst@users.noreply.github.com> Date: Wed, 26 Mar 2025 21:31:05 -0400 Subject: [PATCH 1/8] Feature: Cache monitoring --- .../prebid/cache/config/MonitoringConfig.java | 44 ++++++++++ .../prebid/cache/metrics/MeasurementTag.java | 3 +- .../prebid/cache/metrics/MetricsRecorder.java | 14 +++- .../prebid/cache/model/PayloadWrapper.java | 2 +- .../cache/service/CacheMonitorService.java | 81 +++++++++++++++++++ src/main/resources/application.yml | 3 + 6 files changed, 141 insertions(+), 6 deletions(-) create mode 100644 src/main/java/org/prebid/cache/config/MonitoringConfig.java create mode 100644 src/main/java/org/prebid/cache/service/CacheMonitorService.java diff --git a/src/main/java/org/prebid/cache/config/MonitoringConfig.java b/src/main/java/org/prebid/cache/config/MonitoringConfig.java new file mode 100644 index 0000000..0eb8734 --- /dev/null +++ b/src/main/java/org/prebid/cache/config/MonitoringConfig.java @@ -0,0 +1,44 @@ +package org.prebid.cache.config; + +import lombok.extern.slf4j.Slf4j; +import org.prebid.cache.metrics.MetricsRecorder; +import org.prebid.cache.model.PayloadWrapper; +import org.prebid.cache.repository.CacheConfig; +import org.prebid.cache.repository.ReactiveRepository; +import org.prebid.cache.service.CacheMonitorService; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import reactor.core.Disposable; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.time.Duration; + +@Slf4j +@Configuration +@ConditionalOnProperty(prefix = "cache.monitoring", name = "enabled", havingValue = "true") +public class MonitoringConfig { + + @Bean + public CacheMonitorService storageMonitorService(ReactiveRepository repository, + MetricsRecorder metricsRecorder, + CacheConfig config) { + return new CacheMonitorService(repository, metricsRecorder, config); + } + + @Bean + public Disposable monitorScheduledPoller(CacheMonitorService cacheMonitorService, + @Value("${cache.monitoring.intervalSec}") int intervalSec) { + return Flux.interval(Duration.ofSeconds(intervalSec)) + .onBackpressureDrop() + .onErrorContinue((throwable, o) -> log.error( + "Failed during cache monitor polling: " + throwable.getMessage(), throwable)) + .flatMap(counter -> Mono.fromCallable(() -> { + cacheMonitorService.poll(); + return counter; + })) + .subscribe(); + } +} diff --git a/src/main/java/org/prebid/cache/metrics/MeasurementTag.java b/src/main/java/org/prebid/cache/metrics/MeasurementTag.java index d0572de..b8d9913 100644 --- a/src/main/java/org/prebid/cache/metrics/MeasurementTag.java +++ b/src/main/java/org/prebid/cache/metrics/MeasurementTag.java @@ -20,7 +20,8 @@ public enum MeasurementTag { ERROR_EXISTING_ID("pbc.err.existingId"), ERROR_REJECTED_EXTERNAL_ID("pbc.err.rejectedExternalId"), PROXY_SUCCESS("pbc.proxy.success"), - PROXY_FAILURE("pbc.proxy.failure"); + PROXY_FAILURE("pbc.proxy.failure"), + ENTRY_LIFETIME("pbc.entryLifetimeSec.${ttlBucket}"); private final String tag; diff --git a/src/main/java/org/prebid/cache/metrics/MetricsRecorder.java b/src/main/java/org/prebid/cache/metrics/MetricsRecorder.java index 120d015..3c4daa7 100644 --- a/src/main/java/org/prebid/cache/metrics/MetricsRecorder.java +++ b/src/main/java/org/prebid/cache/metrics/MetricsRecorder.java @@ -6,12 +6,15 @@ import org.prebid.cache.handlers.ServiceType; import org.springframework.stereotype.Component; +import java.time.Duration; + @Component public class MetricsRecorder { private final MeterRegistry meterRegistry; protected static final String PREFIX_PLACEHOLDER = "\\$\\{prefix\\}"; + protected static final String TTL_BUCKET_PLACEHOLDER = "\\$\\{ttlBucket\\}"; public MetricsRecorder(MeterRegistry meterRegistry) { this.meterRegistry = meterRegistry; @@ -66,13 +69,16 @@ public void markMeterForTag(final String prefix, final MeasurementTag measuremen public MetricsRecorderTimer createRequestTimerForServiceType(final ServiceType serviceType) { if (serviceType.equals(ServiceType.FETCH)) { return new MetricsRecorderTimer( - MeasurementTag.REQUEST_DURATION.getTag() - .replaceAll(PREFIX_PLACEHOLDER, "read")); + MeasurementTag.REQUEST_DURATION.getTag().replaceAll(PREFIX_PLACEHOLDER, "read")); } else if (serviceType.equals(ServiceType.SAVE)) { return new MetricsRecorderTimer( - MeasurementTag.REQUEST_DURATION.getTag() - .replaceAll(PREFIX_PLACEHOLDER, "write")); + MeasurementTag.REQUEST_DURATION.getTag().replaceAll(PREFIX_PLACEHOLDER, "write")); } return null; } + + public void recordEntryLifetime(String bucketName, Duration entryLifetime) { + meterRegistry.timer(MeasurementTag.ENTRY_LIFETIME.getTag().replaceAll(TTL_BUCKET_PLACEHOLDER, bucketName)) + .record(Duration.ofSeconds(entryLifetime.getSeconds())); + } } diff --git a/src/main/java/org/prebid/cache/model/PayloadWrapper.java b/src/main/java/org/prebid/cache/model/PayloadWrapper.java index fa59143..5d92aec 100644 --- a/src/main/java/org/prebid/cache/model/PayloadWrapper.java +++ b/src/main/java/org/prebid/cache/model/PayloadWrapper.java @@ -10,7 +10,7 @@ public class PayloadWrapper { String id; String prefix; Payload payload; - + Long timestamp; Long expiry; transient boolean isExternalId; diff --git a/src/main/java/org/prebid/cache/service/CacheMonitorService.java b/src/main/java/org/prebid/cache/service/CacheMonitorService.java new file mode 100644 index 0000000..e240cba --- /dev/null +++ b/src/main/java/org/prebid/cache/service/CacheMonitorService.java @@ -0,0 +1,81 @@ +package org.prebid.cache.service; + +import lombok.extern.slf4j.Slf4j; +import org.prebid.cache.exceptions.PayloadWrapperPropertyException; +import org.prebid.cache.metrics.MetricsRecorder; +import org.prebid.cache.model.PayloadWrapper; +import org.prebid.cache.repository.CacheConfig; +import org.prebid.cache.repository.ReactiveRepository; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; + +import java.time.Duration; +import java.time.Instant; +import java.util.Map; +import java.util.Objects; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; + +@Slf4j +public class CacheMonitorService { + private final ReactiveRepository repository; + private final MetricsRecorder metricsRecorder; + private final String prefix; + + private final Map monitoredExpiryTimes; + private final Map monitoredEntities = new ConcurrentHashMap<>(); + + public CacheMonitorService(ReactiveRepository repository, + MetricsRecorder metricsRecorder, + CacheConfig config) { + + this.repository = repository; + this.metricsRecorder = metricsRecorder; + this.prefix = config.getPrefix(); + monitoredExpiryTimes = Map.of( + config.getMinExpiry(), "min", + config.getExpirySec(), "default", + config.getMaxExpiry(), "max"); + } + + public void poll() { + Flux.fromIterable(monitoredExpiryTimes.entrySet()) + .flatMap(entry -> + Mono.defer(() -> + processExpiryBucket(entry.getKey(), entry.getValue(), monitoredEntities.get(entry.getKey())) + .doOnNext(wrapper -> monitoredEntities.put(entry.getKey(), wrapper)) + )) + .subscribeOn(Schedulers.parallel()) + .doOnError(error -> log.error("Error during cache monitor poll.", error)) + .subscribe(); + } + + private Mono processExpiryBucket(Long ttl, String bucketName, PayloadWrapper payloadWrapper) { + if (Objects.isNull(payloadWrapper)) { + return saveNewWrapper(ttl); + } else { + try { + final String normalizedId = payloadWrapper.getNormalizedId(); + return repository.findById(normalizedId) + .switchIfEmpty(Mono.defer(() -> { + final Duration entryLifetime = Duration.ofMillis(Instant.now().toEpochMilli() - payloadWrapper.getTimestamp()); + metricsRecorder.recordEntryLifetime(bucketName, entryLifetime); + return saveNewWrapper(ttl); + })); + } catch (PayloadWrapperPropertyException e) { + return Mono.error(new RuntimeException(e)); + } + } + } + + private Mono saveNewWrapper(Long ttl) { + final PayloadWrapper wrapper = PayloadWrapper.builder() + .id(UUID.randomUUID().toString()) + .prefix(prefix) + .timestamp(Instant.now().toEpochMilli()) + .expiry(ttl) + .build(); + return repository.save(wrapper); + } +} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index fc83b1e..d545cd5 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -20,6 +20,9 @@ cors: # cache cache: + monitoring: + enabled: true + intervalSec: 1 prefix: prebid_ expiry_sec: 300 timeout_ms: 300 From b708e746666c8711e066d11cb1ce3fd6fe9c7e0d Mon Sep 17 00:00:00 2001 From: Oleksandr Zhevedenko <720803+Net-burst@users.noreply.github.com> Date: Wed, 26 Mar 2025 23:34:57 -0400 Subject: [PATCH 2/8] Fix checkstyle issues --- .../java/org/prebid/cache/service/CacheMonitorService.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/prebid/cache/service/CacheMonitorService.java b/src/main/java/org/prebid/cache/service/CacheMonitorService.java index e240cba..e048fca 100644 --- a/src/main/java/org/prebid/cache/service/CacheMonitorService.java +++ b/src/main/java/org/prebid/cache/service/CacheMonitorService.java @@ -43,7 +43,9 @@ public void poll() { Flux.fromIterable(monitoredExpiryTimes.entrySet()) .flatMap(entry -> Mono.defer(() -> - processExpiryBucket(entry.getKey(), entry.getValue(), monitoredEntities.get(entry.getKey())) + processExpiryBucket(entry.getKey(), + entry.getValue(), + monitoredEntities.get(entry.getKey())) .doOnNext(wrapper -> monitoredEntities.put(entry.getKey(), wrapper)) )) .subscribeOn(Schedulers.parallel()) @@ -59,7 +61,8 @@ private Mono processExpiryBucket(Long ttl, String bucketName, Pa final String normalizedId = payloadWrapper.getNormalizedId(); return repository.findById(normalizedId) .switchIfEmpty(Mono.defer(() -> { - final Duration entryLifetime = Duration.ofMillis(Instant.now().toEpochMilli() - payloadWrapper.getTimestamp()); + final Duration entryLifetime = + Duration.ofMillis(Instant.now().toEpochMilli() - payloadWrapper.getTimestamp()); metricsRecorder.recordEntryLifetime(bucketName, entryLifetime); return saveNewWrapper(ttl); })); From 1b554bbe576f33c98608ceaa692dbe9173412388 Mon Sep 17 00:00:00 2001 From: Oleksandr Zhevedenko <720803+Net-burst@users.noreply.github.com> Date: Thu, 27 Mar 2025 21:01:53 -0400 Subject: [PATCH 3/8] Fix remarks and update logic to be async where needed --- .../prebid/cache/config/MonitoringConfig.java | 7 +-- .../cache/service/CacheMonitorService.java | 59 +++++++++++++------ src/main/resources/application.yml | 2 +- 3 files changed, 44 insertions(+), 24 deletions(-) diff --git a/src/main/java/org/prebid/cache/config/MonitoringConfig.java b/src/main/java/org/prebid/cache/config/MonitoringConfig.java index 0eb8734..1bd02a8 100644 --- a/src/main/java/org/prebid/cache/config/MonitoringConfig.java +++ b/src/main/java/org/prebid/cache/config/MonitoringConfig.java @@ -12,7 +12,6 @@ import org.springframework.context.annotation.Configuration; import reactor.core.Disposable; import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; import java.time.Duration; @@ -31,14 +30,12 @@ public CacheMonitorService storageMonitorService(ReactiveRepository log.error( "Failed during cache monitor polling: " + throwable.getMessage(), throwable)) - .flatMap(counter -> Mono.fromCallable(() -> { - cacheMonitorService.poll(); - return counter; - })) + .concatMap(counter -> cacheMonitorService.poll()) .subscribe(); } } diff --git a/src/main/java/org/prebid/cache/service/CacheMonitorService.java b/src/main/java/org/prebid/cache/service/CacheMonitorService.java index e048fca..b581f08 100644 --- a/src/main/java/org/prebid/cache/service/CacheMonitorService.java +++ b/src/main/java/org/prebid/cache/service/CacheMonitorService.java @@ -19,11 +19,17 @@ @Slf4j public class CacheMonitorService { + + private static final String DEFAULT_CACHE_TTL = "default"; + private static final String MIN_CACHE_TTL = "min"; + private static final String MAX_CACHE_TTL = "max"; + private static final String STATIC_CACHE_TTL = "static"; + private final ReactiveRepository repository; private final MetricsRecorder metricsRecorder; private final String prefix; - private final Map monitoredExpiryTimes; + private final Map expiryBuckets; private final Map monitoredEntities = new ConcurrentHashMap<>(); public CacheMonitorService(ReactiveRepository repository, @@ -33,24 +39,25 @@ public CacheMonitorService(ReactiveRepository repository this.repository = repository; this.metricsRecorder = metricsRecorder; this.prefix = config.getPrefix(); - monitoredExpiryTimes = Map.of( - config.getMinExpiry(), "min", - config.getExpirySec(), "default", - config.getMaxExpiry(), "max"); + expiryBuckets = resolveExpiryBuckets(config); + } + + public Mono poll() { + return processExpiryBuckets() + .onErrorContinue((error, o) -> log.error("Error during cache monitor poll.", error)) + .then(); } - public void poll() { - Flux.fromIterable(monitoredExpiryTimes.entrySet()) + private Flux processExpiryBuckets() { + return Flux.fromIterable(expiryBuckets.entrySet()) .flatMap(entry -> - Mono.defer(() -> - processExpiryBucket(entry.getKey(), - entry.getValue(), - monitoredEntities.get(entry.getKey())) - .doOnNext(wrapper -> monitoredEntities.put(entry.getKey(), wrapper)) - )) - .subscribeOn(Schedulers.parallel()) - .doOnError(error -> log.error("Error during cache monitor poll.", error)) - .subscribe(); + Mono.just(entry) + .flatMap(element -> + processExpiryBucket(element.getKey(), + element.getValue(), + monitoredEntities.get(element.getKey()))) + .subscribeOn(Schedulers.parallel()) + ); } private Mono processExpiryBucket(Long ttl, String bucketName, PayloadWrapper payloadWrapper) { @@ -73,12 +80,28 @@ private Mono processExpiryBucket(Long ttl, String bucketName, Pa } private Mono saveNewWrapper(Long ttl) { - final PayloadWrapper wrapper = PayloadWrapper.builder() + final PayloadWrapper newWrapper = PayloadWrapper.builder() .id(UUID.randomUUID().toString()) .prefix(prefix) .timestamp(Instant.now().toEpochMilli()) .expiry(ttl) .build(); - return repository.save(wrapper); + return repository.save(newWrapper) + .doOnSuccess(wrapper -> monitoredEntities.put(ttl, wrapper)); + } + + private Map resolveExpiryBuckets(CacheConfig config) { + if (config.getMinExpiry() == config.getMaxExpiry() && config.getMinExpiry() == 0) { + return Map.of(config.getExpirySec(), DEFAULT_CACHE_TTL); + } else if (config.getMinExpiry() == config.getMaxExpiry()) { + return Map.of( + config.getMinExpiry(), STATIC_CACHE_TTL, + config.getExpirySec(), DEFAULT_CACHE_TTL); + } else { + return Map.of( + config.getMinExpiry(), MIN_CACHE_TTL, + config.getExpirySec(), DEFAULT_CACHE_TTL, + config.getMaxExpiry(), MAX_CACHE_TTL); + } } } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index d545cd5..6c16c02 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -21,7 +21,7 @@ cors: # cache cache: monitoring: - enabled: true + enabled: false intervalSec: 1 prefix: prebid_ expiry_sec: 300 From 4073969e5f66c5ad9e7c5cced36f57c3f671d754 Mon Sep 17 00:00:00 2001 From: Oleksandr Zhevedenko <720803+Net-burst@users.noreply.github.com> Date: Fri, 28 Mar 2025 11:02:22 -0400 Subject: [PATCH 4/8] Update bucketing logic --- .../org/prebid/cache/service/CacheMonitorService.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/main/java/org/prebid/cache/service/CacheMonitorService.java b/src/main/java/org/prebid/cache/service/CacheMonitorService.java index b581f08..228cec0 100644 --- a/src/main/java/org/prebid/cache/service/CacheMonitorService.java +++ b/src/main/java/org/prebid/cache/service/CacheMonitorService.java @@ -91,16 +91,17 @@ private Mono saveNewWrapper(Long ttl) { } private Map resolveExpiryBuckets(CacheConfig config) { - if (config.getMinExpiry() == config.getMaxExpiry() && config.getMinExpiry() == 0) { + if ((config.getMinExpiry() == config.getMaxExpiry() && config.getMinExpiry() == 0) + || (config.getMinExpiry() == config.getMaxExpiry() && config.getMinExpiry() == config.getExpirySec())) { return Map.of(config.getExpirySec(), DEFAULT_CACHE_TTL); } else if (config.getMinExpiry() == config.getMaxExpiry()) { return Map.of( - config.getMinExpiry(), STATIC_CACHE_TTL, - config.getExpirySec(), DEFAULT_CACHE_TTL); + config.getExpirySec(), DEFAULT_CACHE_TTL, + config.getMinExpiry(), STATIC_CACHE_TTL); } else { return Map.of( - config.getMinExpiry(), MIN_CACHE_TTL, config.getExpirySec(), DEFAULT_CACHE_TTL, + config.getMinExpiry(), MIN_CACHE_TTL, config.getMaxExpiry(), MAX_CACHE_TTL); } } From d9a0fffc1e81fa8009401fee2fe11f1db2a479cd Mon Sep 17 00:00:00 2001 From: Oleksandr Zhevedenko <720803+Net-burst@users.noreply.github.com> Date: Fri, 28 Mar 2025 12:18:50 -0400 Subject: [PATCH 5/8] Minor update to method scope --- src/main/java/org/prebid/cache/service/CacheMonitorService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/org/prebid/cache/service/CacheMonitorService.java b/src/main/java/org/prebid/cache/service/CacheMonitorService.java index 228cec0..12ad60e 100644 --- a/src/main/java/org/prebid/cache/service/CacheMonitorService.java +++ b/src/main/java/org/prebid/cache/service/CacheMonitorService.java @@ -90,7 +90,7 @@ private Mono saveNewWrapper(Long ttl) { .doOnSuccess(wrapper -> monitoredEntities.put(ttl, wrapper)); } - private Map resolveExpiryBuckets(CacheConfig config) { + private static Map resolveExpiryBuckets(CacheConfig config) { if ((config.getMinExpiry() == config.getMaxExpiry() && config.getMinExpiry() == 0) || (config.getMinExpiry() == config.getMaxExpiry() && config.getMinExpiry() == config.getExpirySec())) { return Map.of(config.getExpirySec(), DEFAULT_CACHE_TTL); From d576623f6e695678b66353e6fe86d67b0c34e983 Mon Sep 17 00:00:00 2001 From: Oleksandr Zhevedenko <720803+Net-burst@users.noreply.github.com> Date: Fri, 28 Mar 2025 16:59:50 -0400 Subject: [PATCH 6/8] Fix Checkstyle --- .../java/org/prebid/cache/service/CacheMonitorService.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/prebid/cache/service/CacheMonitorService.java b/src/main/java/org/prebid/cache/service/CacheMonitorService.java index 12ad60e..ea2c519 100644 --- a/src/main/java/org/prebid/cache/service/CacheMonitorService.java +++ b/src/main/java/org/prebid/cache/service/CacheMonitorService.java @@ -91,8 +91,8 @@ private Mono saveNewWrapper(Long ttl) { } private static Map resolveExpiryBuckets(CacheConfig config) { - if ((config.getMinExpiry() == config.getMaxExpiry() && config.getMinExpiry() == 0) - || (config.getMinExpiry() == config.getMaxExpiry() && config.getMinExpiry() == config.getExpirySec())) { + if (config.getMinExpiry() == config.getMaxExpiry() && config.getMinExpiry() == 0 + || config.getMinExpiry() == config.getMaxExpiry() && config.getMinExpiry() == config.getExpirySec()) { return Map.of(config.getExpirySec(), DEFAULT_CACHE_TTL); } else if (config.getMinExpiry() == config.getMaxExpiry()) { return Map.of( From 5def5090c4f983eac5d82d0dc5c4557a581d89dc Mon Sep 17 00:00:00 2001 From: Oleksandr Zhevedenko <720803+Net-burst@users.noreply.github.com> Date: Mon, 31 Mar 2025 17:36:24 -0400 Subject: [PATCH 7/8] Fix remarks --- .../prebid/cache/config/MonitoringConfig.java | 8 ++-- .../cache/service/CacheMonitorService.java | 47 +++++++++---------- 2 files changed, 26 insertions(+), 29 deletions(-) diff --git a/src/main/java/org/prebid/cache/config/MonitoringConfig.java b/src/main/java/org/prebid/cache/config/MonitoringConfig.java index 1bd02a8..c2d8049 100644 --- a/src/main/java/org/prebid/cache/config/MonitoringConfig.java +++ b/src/main/java/org/prebid/cache/config/MonitoringConfig.java @@ -14,6 +14,7 @@ import reactor.core.publisher.Flux; import java.time.Duration; +import java.util.Random; @Slf4j @Configuration @@ -29,13 +30,14 @@ public CacheMonitorService storageMonitorService(ReactiveRepository cacheMonitorService.poll()) .onErrorContinue((throwable, o) -> log.error( "Failed during cache monitor polling: " + throwable.getMessage(), throwable)) - .concatMap(counter -> cacheMonitorService.poll()) .subscribe(); } } diff --git a/src/main/java/org/prebid/cache/service/CacheMonitorService.java b/src/main/java/org/prebid/cache/service/CacheMonitorService.java index ea2c519..85ffca9 100644 --- a/src/main/java/org/prebid/cache/service/CacheMonitorService.java +++ b/src/main/java/org/prebid/cache/service/CacheMonitorService.java @@ -43,39 +43,34 @@ public CacheMonitorService(ReactiveRepository repository } public Mono poll() { - return processExpiryBuckets() - .onErrorContinue((error, o) -> log.error("Error during cache monitor poll.", error)) - .then(); - } - - private Flux processExpiryBuckets() { return Flux.fromIterable(expiryBuckets.entrySet()) + .parallel() + .runOn(Schedulers.parallel()) .flatMap(entry -> - Mono.just(entry) - .flatMap(element -> - processExpiryBucket(element.getKey(), - element.getValue(), - monitoredEntities.get(element.getKey()))) - .subscribeOn(Schedulers.parallel()) - ); + processExpiryBucket(entry.getKey(), + entry.getValue(), + monitoredEntities.get(entry.getKey())) + .doOnNext(wrapper -> monitoredEntities.put(entry.getKey(), wrapper)) + ) + .then(); } private Mono processExpiryBucket(Long ttl, String bucketName, PayloadWrapper payloadWrapper) { if (Objects.isNull(payloadWrapper)) { return saveNewWrapper(ttl); - } else { - try { - final String normalizedId = payloadWrapper.getNormalizedId(); - return repository.findById(normalizedId) - .switchIfEmpty(Mono.defer(() -> { - final Duration entryLifetime = - Duration.ofMillis(Instant.now().toEpochMilli() - payloadWrapper.getTimestamp()); - metricsRecorder.recordEntryLifetime(bucketName, entryLifetime); - return saveNewWrapper(ttl); - })); - } catch (PayloadWrapperPropertyException e) { - return Mono.error(new RuntimeException(e)); - } + } + + try { + final String normalizedId = payloadWrapper.getNormalizedId(); + return repository.findById(normalizedId) + .switchIfEmpty(Mono.defer(() -> { + final Duration entryLifetime = + Duration.ofMillis(Instant.now().toEpochMilli() - payloadWrapper.getTimestamp()); + metricsRecorder.recordEntryLifetime(bucketName, entryLifetime); + return saveNewWrapper(ttl); + })); + } catch (PayloadWrapperPropertyException e) { + return Mono.error(new RuntimeException(e)); } } From 61ea29c7f2d476a2a5f78cb69c1aee9ceaaddf8a Mon Sep 17 00:00:00 2001 From: Oleksandr Zhevedenko <720803+Net-burst@users.noreply.github.com> Date: Mon, 12 May 2025 10:29:36 -0400 Subject: [PATCH 8/8] Fix remark --- src/main/java/org/prebid/cache/service/CacheMonitorService.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/main/java/org/prebid/cache/service/CacheMonitorService.java b/src/main/java/org/prebid/cache/service/CacheMonitorService.java index 85ffca9..1d7fd11 100644 --- a/src/main/java/org/prebid/cache/service/CacheMonitorService.java +++ b/src/main/java/org/prebid/cache/service/CacheMonitorService.java @@ -1,6 +1,5 @@ package org.prebid.cache.service; -import lombok.extern.slf4j.Slf4j; import org.prebid.cache.exceptions.PayloadWrapperPropertyException; import org.prebid.cache.metrics.MetricsRecorder; import org.prebid.cache.model.PayloadWrapper; @@ -17,7 +16,6 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; -@Slf4j public class CacheMonitorService { private static final String DEFAULT_CACHE_TTL = "default";