Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 91 additions & 19 deletions src/main/java/org/prebid/server/cache/CoreCacheService.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.client.utils.URIBuilder;
import org.prebid.server.auction.model.AuctionContext;
import org.prebid.server.auction.model.BidInfo;
import org.prebid.server.auction.model.CachedDebugLog;
Expand All @@ -22,6 +23,7 @@
import org.prebid.server.cache.model.DebugHttpCall;
import org.prebid.server.cache.proto.request.bid.BidCacheRequest;
import org.prebid.server.cache.proto.request.bid.BidPutObject;
import org.prebid.server.cache.proto.response.CacheErrorResponse;
import org.prebid.server.cache.proto.response.bid.BidCacheResponse;
import org.prebid.server.cache.proto.response.bid.CacheObject;
import org.prebid.server.cache.utils.CacheServiceUtil;
Expand All @@ -45,6 +47,7 @@
import org.prebid.server.vertx.httpclient.HttpClient;
import org.prebid.server.vertx.httpclient.model.HttpClientResponse;

import java.net.URISyntaxException;
import java.net.URL;
import java.time.Clock;
import java.util.ArrayList;
Expand All @@ -55,6 +58,7 @@
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand All @@ -66,6 +70,8 @@ public class CoreCacheService {
private static final String BID_WURL_ATTRIBUTE = "wurl";
private static final String TRACE_INFO_SEPARATOR = "-";
private static final int MAX_DATACENTER_REGION_LENGTH = 4;
private static final String UUID_QUERY_PARAMETER = "uuid";
private static final String CH_QUERY_PARAMETER = "ch";

private final HttpClient httpClient;
private final URL externalEndpointUrl;
Expand Down Expand Up @@ -186,18 +192,27 @@ private Future<BidCacheResponse> makeRequest(BidCacheRequest bidCacheRequest,
cacheHeaders,
mapper.encodeToString(bidCacheRequest),
remainingTimeout)
.map(response -> toBidCacheResponse(
.map(response -> processVtrackWriteCacheResponse(
response.getStatusCode(), response.getBody(), bidCount, accountId, startTime))
.recover(exception -> failResponse(exception, accountId, startTime));
.recover(exception -> failVtrackCacheWriteResponse(exception, accountId, startTime));
}

private Future<BidCacheResponse> failResponse(Throwable exception, String accountId, long startTime) {
metrics.updateCacheRequestFailedTime(accountId, clock.millis() - startTime);
private BidCacheResponse processVtrackWriteCacheResponse(int statusCode,
String responseBody,
int bidCount,
String accountId,
long startTime) {

logger.warn("Error occurred while interacting with cache service: {}", exception.getMessage());
logger.debug("Error occurred while interacting with cache service", exception);
final BidCacheResponse bidCacheResponse = toBidCacheResponse(statusCode, responseBody, bidCount);
metrics.updateVtrackCacheWriteRequestTime(accountId, clock.millis() - startTime, MetricName.ok);
return bidCacheResponse;
}

return Future.failedFuture(exception);
private <T> Future<T> failVtrackCacheWriteResponse(Throwable exception, String accountId, long startTime) {
if (exception instanceof PreBidException) {
metrics.updateVtrackCacheWriteRequestTime(accountId, clock.millis() - startTime, MetricName.err);
}
return failResponse(exception);
}

public Future<BidCacheResponse> cachePutObjects(List<BidPutObject> bidPutObjects,
Expand All @@ -210,7 +225,10 @@ public Future<BidCacheResponse> cachePutObjects(List<BidPutObject> bidPutObjects
final List<CachedCreative> cachedCreatives =
updatePutObjects(bidPutObjects, isEventsEnabled, biddersAllowingVastUpdate, accountId, integration);

updateCreativeMetrics(accountId, cachedCreatives);
updateCreativeMetrics(
cachedCreatives,
(ttl, type) -> metrics.updateVtrackCacheCreativeTtl(accountId, ttl, type),
(size, type) -> metrics.updateVtrackCacheCreativeSize(accountId, size, type));
Comment on lines +228 to +231
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need for lambdas, just pass accountId as method argument

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't get, you mean it should be two separate updateCreativeMetrics methods for vtrack and auction?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need, it seemed to me that the methods were the same


return makeRequest(toBidCacheRequest(cachedCreatives), cachedCreatives.size(), timeout, accountId);
}
Expand Down Expand Up @@ -309,7 +327,10 @@ private Future<CacheServiceResult> doCacheOpenrtb(List<CacheBid> bids,

final BidCacheRequest bidCacheRequest = toBidCacheRequest(cachedCreatives);

updateCreativeMetrics(accountId, cachedCreatives);
updateCreativeMetrics(
cachedCreatives,
(ttl, type) -> metrics.updateCacheCreativeTtl(accountId, ttl, type),
(size, type) -> metrics.updateCacheCreativeSize(accountId, size, type));

final String url = ObjectUtils.firstNonNull(internalEndpointUrl, externalEndpointUrl).toString();
final String body = mapper.encodeToString(bidCacheRequest);
Expand Down Expand Up @@ -343,8 +364,8 @@ private CacheServiceResult processResponseOpenrtb(HttpClientResponse response,
externalEndpointUrl.toString(), httpRequest, httpResponse, startTime);
final BidCacheResponse bidCacheResponse;
try {
bidCacheResponse = toBidCacheResponse(
responseStatusCode, response.getBody(), bidCount, accountId, startTime);
bidCacheResponse = toBidCacheResponse(responseStatusCode, response.getBody(), bidCount);
metrics.updateAuctionCacheRequestTime(accountId, clock.millis() - startTime, MetricName.ok);
} catch (PreBidException e) {
return CacheServiceResult.of(httpCall, e, Collections.emptyMap());
}
Expand All @@ -361,7 +382,7 @@ private CacheServiceResult failResponseOpenrtb(Throwable exception,
logger.warn("Error occurred while interacting with cache service: {}", exception.getMessage());
logger.debug("Error occurred while interacting with cache service", exception);

metrics.updateCacheRequestFailedTime(accountId, clock.millis() - startTime);
metrics.updateAuctionCacheRequestTime(accountId, clock.millis() - startTime, MetricName.err);

final DebugHttpCall httpCall = makeDebugHttpCall(externalEndpointUrl.toString(), request, null, startTime);
return CacheServiceResult.of(httpCall, exception, Collections.emptyMap());
Expand Down Expand Up @@ -460,9 +481,7 @@ private String generateWinUrl(String bidId,

private BidCacheResponse toBidCacheResponse(int statusCode,
String responseBody,
int bidCount,
String accountId,
long startTime) {
int bidCount) {

if (statusCode != 200) {
throw new PreBidException("HTTP status code " + statusCode);
Expand All @@ -480,7 +499,6 @@ private BidCacheResponse toBidCacheResponse(int statusCode,
throw new PreBidException("The number of response cache objects doesn't match with bids");
}

metrics.updateCacheRequestSuccessTime(accountId, clock.millis() - startTime);
return bidCacheResponse;
}

Expand Down Expand Up @@ -538,17 +556,20 @@ private static String resolveVideoBidUuid(String uuid, String hbCacheId) {
return hbCacheId != null && uuid.endsWith(hbCacheId) ? hbCacheId : uuid;
}

private void updateCreativeMetrics(String accountId, List<CachedCreative> cachedCreatives) {
private void updateCreativeMetrics(List<CachedCreative> cachedCreatives,
BiConsumer<Integer, MetricName> updateCreativeTtlMetric,
BiConsumer<Integer, MetricName> updateCreativeSiseMetric) {

for (CachedCreative cachedCreative : cachedCreatives) {
final BidPutObject payload = cachedCreative.getPayload();
final MetricName creativeType = resolveCreativeTypeName(payload);
final Integer creativeTtl = ObjectUtils.defaultIfNull(payload.getTtlseconds(), payload.getExpiry());

if (creativeTtl != null) {
metrics.updateCacheCreativeTtl(accountId, creativeTtl, creativeType);
updateCreativeTtlMetric.accept(creativeTtl, creativeType);
}

metrics.updateCacheCreativeSize(accountId, cachedCreative.getSize(), creativeType);
updateCreativeSiseMetric.accept(cachedCreative.getSize(), creativeType);
}
}

Expand Down Expand Up @@ -627,4 +648,55 @@ private static String normalizeDatacenterRegion(String datacenterRegion) {
? trimmedDatacenterRegion.substring(0, MAX_DATACENTER_REGION_LENGTH)
: trimmedDatacenterRegion;
}

public Future<HttpClientResponse> getCachedObject(String key, String ch, Timeout timeout) {
final long remainingTimeout = timeout.remaining();
if (remainingTimeout <= 0) {
return Future.failedFuture(new TimeoutException("Timeout has been exceeded"));
}

final URL endpointUrl = ObjectUtils.firstNonNull(internalEndpointUrl, externalEndpointUrl);
final String url;
try {
final URIBuilder uriBuilder = new URIBuilder(endpointUrl.toString());
uriBuilder.addParameter(UUID_QUERY_PARAMETER, key);
if (StringUtils.isNotBlank(ch)) {
uriBuilder.addParameter(CH_QUERY_PARAMETER, ch);
}
url = uriBuilder.build().toString();
} catch (URISyntaxException e) {
return Future.failedFuture(new IllegalArgumentException("Configured cache url is malformed", e));
}

final long startTime = clock.millis();
return httpClient.get(url, cacheHeaders, remainingTimeout)
.map(response -> processVtrackReadResponse(response, startTime))
.recover(CoreCacheService::failResponse);
}

private HttpClientResponse processVtrackReadResponse(HttpClientResponse response, long startTime) {
final int statusCode = response.getStatusCode();
final String body = response.getBody();

if (statusCode == 200) {
metrics.updateVtrackCacheReadRequestTime(clock.millis() - startTime, MetricName.ok);
return response;
}

metrics.updateVtrackCacheReadRequestTime(clock.millis() - startTime, MetricName.err);

try {
final CacheErrorResponse errorResponse = mapper.decodeValue(body, CacheErrorResponse.class);
return HttpClientResponse.of(statusCode, response.getHeaders(), errorResponse.getMessage());
} catch (DecodeException e) {
throw new PreBidException("Cannot parse response: " + body, e);
}
}

private static <T> Future<T> failResponse(Throwable exception) {
logger.warn("Error occurred while interacting with cache service: {}", exception.getMessage());
logger.debug("Error occurred while interacting with cache service", exception);

return Future.failedFuture(exception);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package org.prebid.server.cache.proto.response;

import lombok.Builder;
import lombok.Value;

@Value
@Builder
public class CacheErrorResponse {

String error;

Integer status;

String path;

String message;

Long timestamp;
}
108 changes: 108 additions & 0 deletions src/main/java/org/prebid/server/handler/GetVtrackHandler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package org.prebid.server.handler;

import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.vertx.core.AsyncResult;
import io.vertx.core.MultiMap;
import io.vertx.core.http.HttpMethod;
import io.vertx.ext.web.RoutingContext;
import org.apache.commons.lang3.StringUtils;
import org.prebid.server.cache.CoreCacheService;
import org.prebid.server.execution.timeout.Timeout;
import org.prebid.server.execution.timeout.TimeoutFactory;
import org.prebid.server.log.Logger;
import org.prebid.server.log.LoggerFactory;
import org.prebid.server.model.Endpoint;
import org.prebid.server.util.HttpUtil;
import org.prebid.server.vertx.httpclient.model.HttpClientResponse;
import org.prebid.server.vertx.verticles.server.HttpEndpoint;
import org.prebid.server.vertx.verticles.server.application.ApplicationResource;

import java.util.Collections;
import java.util.List;
import java.util.Objects;

public class GetVtrackHandler implements ApplicationResource {

private static final Logger logger = LoggerFactory.getLogger(GetVtrackHandler.class);

private static final String UUID_PARAMETER = "uuid";
private static final String CH_PARAMETER = "ch";

private final long defaultTimeout;
private final CoreCacheService coreCacheService;
private final TimeoutFactory timeoutFactory;

public GetVtrackHandler(long defaultTimeout, CoreCacheService coreCacheService, TimeoutFactory timeoutFactory) {
this.defaultTimeout = defaultTimeout;
this.coreCacheService = Objects.requireNonNull(coreCacheService);
this.timeoutFactory = Objects.requireNonNull(timeoutFactory);
}

@Override
public List<HttpEndpoint> endpoints() {
return Collections.singletonList(HttpEndpoint.of(HttpMethod.GET, Endpoint.vtrack.value()));
}

@Override
public void handle(RoutingContext routingContext) {
final String uuid = routingContext.request().getParam(UUID_PARAMETER);
final String ch = routingContext.request().getParam(CH_PARAMETER);
if (StringUtils.isBlank(uuid)) {
respondWith(
routingContext,
HttpResponseStatus.BAD_REQUEST,
"'%s' is a required query parameter and can't be empty".formatted(UUID_PARAMETER));
return;
}

final Timeout timeout = timeoutFactory.create(defaultTimeout);

coreCacheService.getCachedObject(uuid, ch, timeout)
.onComplete(asyncCache -> handleCacheResult(asyncCache, routingContext));
}

private static void respondWithServerError(RoutingContext routingContext, Throwable exception) {
logger.error("Error occurred while sending request to cache", exception);
respondWith(routingContext, HttpResponseStatus.INTERNAL_SERVER_ERROR,
"%s: %s".formatted("Error occurred while sending request to cache", exception.getMessage()));
}

private static void respondWith(RoutingContext routingContext,
HttpResponseStatus status,
MultiMap headers,
String body) {

HttpUtil.executeSafely(
routingContext,
Endpoint.vtrack,
response -> {
headers.forEach(response::putHeader);
response.setStatusCode(status.code()) .end(body);
});
}

private static void respondWith(RoutingContext routingContext, HttpResponseStatus status, String body) {
HttpUtil.executeSafely(
routingContext,
Endpoint.vtrack,
response -> response
.putHeader(HttpUtil.CONTENT_TYPE_HEADER, HttpHeaderValues.APPLICATION_JSON)
.setStatusCode(status.code())
.end(body));
}

private void handleCacheResult(AsyncResult<HttpClientResponse> async, RoutingContext routingContext) {
if (async.failed()) {
respondWithServerError(routingContext, async.cause());
} else {
final HttpClientResponse response = async.result();
final HttpResponseStatus status = HttpResponseStatus.valueOf(response.getStatusCode());
if (status == HttpResponseStatus.OK) {
respondWith(routingContext, status, response.getHeaders(), response.getBody());
} else {
respondWith(routingContext, status, response.getBody());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@
import java.util.Set;
import java.util.stream.Collectors;

public class VtrackHandler implements ApplicationResource {
public class PostVtrackHandler implements ApplicationResource {

private static final Logger logger = LoggerFactory.getLogger(VtrackHandler.class);
private static final Logger logger = LoggerFactory.getLogger(PostVtrackHandler.class);

private static final String ACCOUNT_PARAMETER = "a";
private static final String INTEGRATION_PARAMETER = "int";
Expand All @@ -56,14 +56,14 @@ public class VtrackHandler implements ApplicationResource {
private final TimeoutFactory timeoutFactory;
private final JacksonMapper mapper;

public VtrackHandler(long defaultTimeout,
boolean allowUnknownBidder,
boolean modifyVastForUnknownBidder,
ApplicationSettings applicationSettings,
BidderCatalog bidderCatalog,
CoreCacheService coreCacheService,
TimeoutFactory timeoutFactory,
JacksonMapper mapper) {
public PostVtrackHandler(long defaultTimeout,
boolean allowUnknownBidder,
boolean modifyVastForUnknownBidder,
ApplicationSettings applicationSettings,
BidderCatalog bidderCatalog,
CoreCacheService coreCacheService,
TimeoutFactory timeoutFactory,
JacksonMapper mapper) {

this.defaultTimeout = defaultTimeout;
this.allowUnknownBidder = allowUnknownBidder;
Expand Down
Loading
Loading