Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions src/main/java/org/prebid/cache/config/MonitoringConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,11 @@ public CacheMonitorService storageMonitorService(ReactiveRepository<PayloadWrapp

@Bean
public Disposable monitorScheduledPoller(CacheMonitorService cacheMonitorService,
CacheConfig cacheConfig,
@Value("${cache.monitoring.intervalSec}") int intervalSec) {
final Duration startDelay = Duration.ofSeconds(new Random().nextInt(0, cacheConfig.getTimeoutMs()));
@Value("${cache.monitoring.intervalSec}") int intervalSec,
@Value("${cache.monitoring.maxStartDelayJitterSec}") int startJitterSec) {
final Duration startDelay = startJitterSec > 0
? Duration.ofSeconds(new Random().nextInt(0, startJitterSec))
: Duration.ZERO;
return Flux.interval(startDelay, Duration.ofSeconds(intervalSec))
.onBackpressureDrop()
.concatMap(counter -> cacheMonitorService.poll())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,14 @@

@Slf4j
public class AerospikeReadListener implements RecordListener {

private static final String NAME = "cache";
private final MonoSink<String> sink;
private final String recordKeyId;
private final String keyId;

public AerospikeReadListener(MonoSink<String> sink, String recordKeyId) {
public AerospikeReadListener(MonoSink<String> sink, String keyId) {
this.sink = sink;
this.recordKeyId = recordKeyId;
this.keyId = keyId;
}

@Override
Expand All @@ -31,7 +32,7 @@ public void onSuccess(Key key, Record record) {

@Override
public void onFailure(AerospikeException exception) {
log.error("Error when reading record with keyId {}", recordKeyId);
log.error("Error reading record with key id {} due to: {}", keyId, exception.getMessage());
sink.error(exception);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

@Slf4j
public class AerospikeWriteListener implements WriteListener {

private final MonoSink<String> sink;
private final String keyId;

Expand All @@ -23,7 +24,7 @@ public void onSuccess(Key key) {

@Override
public void onFailure(AerospikeException exception) {
log.error("Error when writing record with keyID : {}", keyId);
log.error("Error writing record with key id {} due to: {}", keyId, exception.getMessage());
sink.error(exception);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,6 @@ public MetricsRecorderTimer createRequestTimerForServiceType(final ServiceType s

public void recordEntryLifetime(String bucketName, Duration entryLifetime) {
meterRegistry.timer(MeasurementTag.ENTRY_LIFETIME.getTag().replaceAll(TTL_BUCKET_PLACEHOLDER, bucketName))
.record(Duration.ofSeconds(entryLifetime.getSeconds()));
.record(entryLifetime);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ private Mono<PayloadWrapper> processExpiryBucket(Long ttl, String bucketName, Pa
return repository.findById(normalizedId)
.switchIfEmpty(Mono.defer(() -> {
final Duration entryLifetime =
Duration.ofMillis(Instant.now().toEpochMilli() - payloadWrapper.getTimestamp());
Duration.between(Instant.ofEpochMilli(payloadWrapper.getTimestamp()), Instant.now());
metricsRecorder.recordEntryLifetime(bucketName, entryLifetime);
return saveNewWrapper(ttl);
}));
Expand Down
3 changes: 2 additions & 1 deletion src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ cors:
cache:
monitoring:
enabled: false
intervalSec: 1
intervalSec: 5
maxStartDelayJitterSec: 0
prefix: prebid_
expiry_sec: 300
timeout_ms: 300
Expand Down
Loading