Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions src/main/java/org/prebid/cache/config/MonitoringConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package org.prebid.cache.config;

import lombok.extern.slf4j.Slf4j;
import org.prebid.cache.metrics.MetricsRecorder;
import org.prebid.cache.model.PayloadWrapper;
import org.prebid.cache.repository.CacheConfig;
import org.prebid.cache.repository.ReactiveRepository;
import org.prebid.cache.service.CacheMonitorService;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;

import java.time.Duration;
import java.util.Random;

@Slf4j
@Configuration
@ConditionalOnProperty(prefix = "cache.monitoring", name = "enabled", havingValue = "true")
public class MonitoringConfig {

    /**
     * Service that periodically writes sentinel entries to the cache backend and
     * records how long they actually survive (see {@code CacheMonitorService}).
     */
    @Bean
    public CacheMonitorService storageMonitorService(ReactiveRepository<PayloadWrapper, String> repository,
                                                     MetricsRecorder metricsRecorder,
                                                     CacheConfig config) {
        return new CacheMonitorService(repository, metricsRecorder, config);
    }

    /**
     * Schedules {@code CacheMonitorService#poll()} every {@code cache.monitoring.intervalSec}
     * seconds, with a random initial delay so multiple instances do not poll in lockstep.
     *
     * @param intervalSec polling period in seconds
     * @return the subscription handle; disposed by Spring when the context shuts down
     */
    @Bean
    public Disposable monitorScheduledPoller(CacheMonitorService cacheMonitorService,
                                             CacheConfig cacheConfig,
                                             @Value("${cache.monitoring.intervalSec}") int intervalSec) {
        // Math.max guards Random.nextInt(origin, bound), which throws
        // IllegalArgumentException when bound <= origin (i.e. timeoutMs <= 0).
        // NOTE(review): the jitter bound reuses timeoutMs as a SECONDS range — confirm intended.
        final Duration startDelay =
                Duration.ofSeconds(new Random().nextInt(0, Math.max(1, cacheConfig.getTimeoutMs())));
        return Flux.interval(startDelay, Duration.ofSeconds(intervalSec))
                .onBackpressureDrop()
                .concatMap(counter -> cacheMonitorService.poll())
                // Log and keep polling; parameterized logging avoids eager string concatenation.
                .onErrorContinue((throwable, o) -> log.error(
                        "Failed during cache monitor polling: {}", throwable.getMessage(), throwable))
                .subscribe();
    }
}
3 changes: 2 additions & 1 deletion src/main/java/org/prebid/cache/metrics/MeasurementTag.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ public enum MeasurementTag {
ERROR_EXISTING_ID("pbc.err.existingId"),
ERROR_REJECTED_EXTERNAL_ID("pbc.err.rejectedExternalId"),
PROXY_SUCCESS("pbc.proxy.success"),
PROXY_FAILURE("pbc.proxy.failure");
PROXY_FAILURE("pbc.proxy.failure"),
ENTRY_LIFETIME("pbc.entryLifetimeSec.${ttlBucket}");

private final String tag;

Expand Down
14 changes: 10 additions & 4 deletions src/main/java/org/prebid/cache/metrics/MetricsRecorder.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,15 @@
import org.prebid.cache.handlers.ServiceType;
import org.springframework.stereotype.Component;

import java.time.Duration;

@Component
public class MetricsRecorder {

private final MeterRegistry meterRegistry;

protected static final String PREFIX_PLACEHOLDER = "\\$\\{prefix\\}";
protected static final String TTL_BUCKET_PLACEHOLDER = "\\$\\{ttlBucket\\}";

public MetricsRecorder(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
Expand Down Expand Up @@ -66,13 +69,16 @@ public void markMeterForTag(final String prefix, final MeasurementTag measuremen
public MetricsRecorderTimer createRequestTimerForServiceType(final ServiceType serviceType) {
if (serviceType.equals(ServiceType.FETCH)) {
return new MetricsRecorderTimer(
MeasurementTag.REQUEST_DURATION.getTag()
.replaceAll(PREFIX_PLACEHOLDER, "read"));
MeasurementTag.REQUEST_DURATION.getTag().replaceAll(PREFIX_PLACEHOLDER, "read"));
} else if (serviceType.equals(ServiceType.SAVE)) {
return new MetricsRecorderTimer(
MeasurementTag.REQUEST_DURATION.getTag()
.replaceAll(PREFIX_PLACEHOLDER, "write"));
MeasurementTag.REQUEST_DURATION.getTag().replaceAll(PREFIX_PLACEHOLDER, "write"));
}
return null;
}

public void recordEntryLifetime(String bucketName, Duration entryLifetime) {
    // Resolve the per-bucket metric name, e.g. "pbc.entryLifetimeSec.default".
    final String metricName =
            MeasurementTag.ENTRY_LIFETIME.getTag().replaceAll(TTL_BUCKET_PLACEHOLDER, bucketName);
    // Truncate to whole seconds — the metric is defined at second resolution.
    final Duration wholeSeconds = Duration.ofSeconds(entryLifetime.getSeconds());
    meterRegistry.timer(metricName).record(wholeSeconds);
}
}
2 changes: 1 addition & 1 deletion src/main/java/org/prebid/cache/model/PayloadWrapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ public class PayloadWrapper {
String id;
String prefix;
Payload payload;

Long timestamp;
Long expiry;

transient boolean isExternalId;
Expand Down
101 changes: 101 additions & 0 deletions src/main/java/org/prebid/cache/service/CacheMonitorService.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package org.prebid.cache.service;

import org.prebid.cache.exceptions.PayloadWrapperPropertyException;
import org.prebid.cache.metrics.MetricsRecorder;
import org.prebid.cache.model.PayloadWrapper;
import org.prebid.cache.repository.CacheConfig;
import org.prebid.cache.repository.ReactiveRepository;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Probes the cache backend by writing one sentinel entry per configured ttl
 * "bucket" and, on each poll, checking whether the previous sentinel is still
 * readable. When a sentinel has vanished, the elapsed time since it was written
 * is recorded as the entry's observed lifetime and a fresh sentinel is saved.
 */
public class CacheMonitorService {

    private static final String DEFAULT_CACHE_TTL = "default";
    private static final String MIN_CACHE_TTL = "min";
    private static final String MAX_CACHE_TTL = "max";
    private static final String STATIC_CACHE_TTL = "static";

    private final ReactiveRepository<PayloadWrapper, String> repository;
    private final MetricsRecorder metricsRecorder;
    private final String prefix;

    // ttl in seconds -> metric bucket name; fixed at construction time.
    private final Map<Long, String> expiryBuckets;
    // ttl -> most recently written sentinel wrapper for that bucket.
    private final Map<Long, PayloadWrapper> monitoredEntities = new ConcurrentHashMap<>();

    public CacheMonitorService(ReactiveRepository<PayloadWrapper, String> repository,
                               MetricsRecorder metricsRecorder,
                               CacheConfig config) {

        this.repository = repository;
        this.metricsRecorder = metricsRecorder;
        this.prefix = config.getPrefix();
        expiryBuckets = resolveExpiryBuckets(config);
    }

    /**
     * Checks every expiry bucket in parallel and completes when all are processed.
     *
     * @return completion signal; individual bucket errors propagate through it
     */
    public Mono<Void> poll() {
        return Flux.fromIterable(expiryBuckets.entrySet())
                .parallel()
                .runOn(Schedulers.parallel())
                .flatMap(entry ->
                        processExpiryBucket(entry.getKey(),
                                entry.getValue(),
                                monitoredEntities.get(entry.getKey()))
                                .doOnNext(wrapper -> monitoredEntities.put(entry.getKey(), wrapper))
                )
                .then();
    }

    /**
     * Looks up the current sentinel for a bucket; if it has expired (not found),
     * records its observed lifetime and writes a replacement sentinel.
     *
     * @param payloadWrapper previously written sentinel, or {@code null} on first poll
     */
    private Mono<PayloadWrapper> processExpiryBucket(Long ttl, String bucketName, PayloadWrapper payloadWrapper) {
        if (Objects.isNull(payloadWrapper)) {
            // First poll for this bucket: seed it with an initial sentinel.
            return saveNewWrapper(ttl);
        }

        try {
            final String normalizedId = payloadWrapper.getNormalizedId();
            return repository.findById(normalizedId)
                    // Empty lookup means the entry expired: measure how long it lived.
                    .switchIfEmpty(Mono.defer(() -> {
                        final Duration entryLifetime =
                                Duration.ofMillis(Instant.now().toEpochMilli() - payloadWrapper.getTimestamp());
                        metricsRecorder.recordEntryLifetime(bucketName, entryLifetime);
                        return saveNewWrapper(ttl);
                    }));
        } catch (PayloadWrapperPropertyException e) {
            // Wrap the checked exception, preserving it as the cause.
            return Mono.error(new RuntimeException(e));
        }
    }

    /** Writes a new sentinel with the given ttl and caches it for the next poll. */
    private Mono<PayloadWrapper> saveNewWrapper(Long ttl) {
        final PayloadWrapper newWrapper = PayloadWrapper.builder()
                .id(UUID.randomUUID().toString())
                .prefix(prefix)
                .timestamp(Instant.now().toEpochMilli())
                .expiry(ttl)
                .build();
        return repository.save(newWrapper)
                .doOnSuccess(wrapper -> monitoredEntities.put(ttl, wrapper));
    }

    /**
     * Maps each distinct configured ttl to a metric bucket name.
     *
     * <p>Built with {@code putIfAbsent} on a mutable map rather than {@code Map.of}:
     * {@code Map.of} throws {@link IllegalArgumentException} on duplicate keys, which
     * happens whenever {@code expirySec} coincides with {@code minExpiry} or
     * {@code maxExpiry} — a perfectly legal configuration.
     */
    private static Map<Long, String> resolveExpiryBuckets(CacheConfig config) {
        final long minExpiry = config.getMinExpiry();
        final long maxExpiry = config.getMaxExpiry();
        final long defaultExpiry = config.getExpirySec();

        final Map<Long, String> buckets = new ConcurrentHashMap<>();
        buckets.put(defaultExpiry, DEFAULT_CACHE_TTL);
        if (minExpiry == maxExpiry) {
            // A single fixed ttl: 0 means "not configured", and a value equal to the
            // default collapses into the default bucket; otherwise track it as "static".
            if (minExpiry != 0 && minExpiry != defaultExpiry) {
                buckets.put(minExpiry, STATIC_CACHE_TTL);
            }
        } else {
            // The default bucket wins when min or max coincides with it.
            buckets.putIfAbsent(minExpiry, MIN_CACHE_TTL);
            buckets.putIfAbsent(maxExpiry, MAX_CACHE_TTL);
        }
        return buckets;
    }
}
3 changes: 3 additions & 0 deletions src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ cors:

# cache
cache:
monitoring:
enabled: false
intervalSec: 1
prefix: prebid_
expiry_sec: 300
timeout_ms: 300
Expand Down
Loading