diff --git a/conf/default-config.json b/conf/default-config.json
index c5de0d87b..9921e6fd4 100644
--- a/conf/default-config.json
+++ b/conf/default-config.json
@@ -40,5 +40,6 @@
"sharing_token_expiry_seconds": 2592000,
"operator_type": "public",
"enable_remote_config": true,
- "uid_instance_id_prefix": "local-operator"
+ "uid_instance_id_prefix": "local-operator",
+ "enable_async_batch_request": true
}
diff --git a/pom.xml b/pom.xml
index 70f69138a..ada90c898 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
com.uid2
uid2-operator
- 5.64.11
+ 5.64.14-alpha-302-SNAPSHOT
UTF-8
diff --git a/src/main/java/com/uid2/operator/Const.java b/src/main/java/com/uid2/operator/Const.java
index 14f878371..ec0004f25 100644
--- a/src/main/java/com/uid2/operator/Const.java
+++ b/src/main/java/com/uid2/operator/Const.java
@@ -39,5 +39,9 @@ public class Config extends com.uid2.shared.Const.Config {
public static final String RuntimeConfigMetadataPathProp = "runtime_config_metadata_path";
public static final String IdentityEnvironmentProp = "identity_environment";
+
+ public static final String EnableAsyncBatchRequestProp = "enable_async_batch_request";
+
+ public static final String DefaultWorkerPoolThreadCount = "default_worker_pool_thread_count";
}
}
diff --git a/src/main/java/com/uid2/operator/Main.java b/src/main/java/com/uid2/operator/Main.java
index b4e79f5c2..8fbd2d4fc 100644
--- a/src/main/java/com/uid2/operator/Main.java
+++ b/src/main/java/com/uid2/operator/Main.java
@@ -488,7 +488,7 @@ private static Vertx createVertx() {
MicrometerMetricsOptions metricOptions = new MicrometerMetricsOptions()
.setPrometheusOptions(prometheusOptions)
- .setLabels(EnumSet.of(Label.HTTP_METHOD, Label.HTTP_CODE, Label.HTTP_PATH))
+ .setLabels(EnumSet.of(Label.HTTP_METHOD, Label.HTTP_CODE, Label.HTTP_PATH, Label.POOL_NAME))
.setJvmMetricsEnabled(true)
.setEnabled(true);
setupMetrics(metricOptions);
@@ -497,9 +497,14 @@ private static Vertx createVertx() {
? 60 * 1000
: 3600 * 1000;
+ final int defaultWorkerPoolSize = Math.max(2, (Runtime.getRuntime().availableProcessors() - 2) / 2 + 1);
+ final int workerPoolSize = getEnvInt(Const.Config.DefaultWorkerPoolThreadCount, defaultWorkerPoolSize);
+ LOGGER.info("Creating Vertx with default worker pool size: {}", workerPoolSize);
+
VertxOptions vertxOptions = new VertxOptions()
.setMetricsOptions(metricOptions)
- .setBlockedThreadCheckInterval(threadBlockedCheckInterval);
+ .setBlockedThreadCheckInterval(threadBlockedCheckInterval)
+ .setWorkerPoolSize(workerPoolSize);
return Vertx.vertx(vertxOptions);
}
@@ -524,6 +529,7 @@ private static void setupMetrics(MicrometerMetricsOptions metricOptions) {
Objects.equals(id.getTag(Label.HTTP_CODE.toString()), "404")))
.meterFilter(new MeterFilter() {
private final String httpServerResponseTime = MetricsDomain.HTTP_SERVER.getPrefix() + MetricsNaming.v4Names().getHttpResponseTime();
+ private final String poolQueueTime = MetricsDomain.NAMED_POOLS.getPrefix() + MetricsNaming.v4Names().getPoolQueueTime();
@Override
public DistributionStatisticConfig configure(Meter.Id id, DistributionStatisticConfig config) {
@@ -533,6 +539,12 @@ public DistributionStatisticConfig configure(Meter.Id id, DistributionStatisticC
.build()
.merge(config);
}
+ if (id.getName().equals(poolQueueTime)) {
+ return DistributionStatisticConfig.builder()
+ .percentiles(0.50, 0.90, 0.95, 0.99)
+ .build()
+ .merge(config);
+ }
return config;
}
})
@@ -626,4 +638,17 @@ private IOperatorKeyRetriever createOperatorKeyRetriever() throws Exception {
}
}
}
+
+ private static int getEnvInt(String name, int defaultValue) {
+ String value = System.getenv(name);
+ if (value == null || value.isEmpty()) {
+ return defaultValue;
+ }
+ try {
+ return Integer.parseInt(value);
+ } catch (NumberFormatException e) {
+ LOGGER.warn("Invalid integer value for environment variable {}: '{}', using default: {}", name, value, defaultValue);
+ return defaultValue;
+ }
+ }
}
diff --git a/src/main/java/com/uid2/operator/vertx/UIDOperatorVerticle.java b/src/main/java/com/uid2/operator/vertx/UIDOperatorVerticle.java
index 5f879d768..88c8b2ab5 100644
--- a/src/main/java/com/uid2/operator/vertx/UIDOperatorVerticle.java
+++ b/src/main/java/com/uid2/operator/vertx/UIDOperatorVerticle.java
@@ -139,6 +139,8 @@ public class UIDOperatorVerticle extends AbstractVerticle {
private final int optOutStatusMaxRequestSize;
private final boolean optOutStatusApiEnabled;
+ private final boolean isAsyncBatchRequestsEnabled;
+
//"Android" is from https://github.com/IABTechLab/uid2-android-sdk/blob/ff93ebf597f5de7d440a84f7015a334ba4138ede/sdk/src/main/java/com/uid2/UID2Client.kt#L46
//"ios"/"tvos" is from https://github.com/IABTechLab/uid2-ios-sdk/blob/91c290d29a7093cfc209eca493d1fee80c17e16a/Sources/UID2/UID2Client.swift#L36-L38
private static final List SUPPORTED_IN_APP = Arrays.asList("Android", "ios", "tvos");
@@ -197,6 +199,7 @@ public UIDOperatorVerticle(IConfigStore configStore,
this.identityV3Enabled = config.getBoolean(IdentityV3Prop, false);
this.disableOptoutToken = config.getBoolean(DisableOptoutTokenProp, false);
this.uidInstanceIdProvider = uidInstanceIdProvider;
+ this.isAsyncBatchRequestsEnabled = config.getBoolean(EnableAsyncBatchRequestProp, false);
}
@Override
@@ -281,16 +284,8 @@ private void setUpEncryptedRoutes(Router mainRouter, BodyHandler bodyHandler) {
rc -> encryptedPayloadHandler.handleTokenRefresh(rc, this::handleTokenRefreshV2)));
mainRouter.post(V2_TOKEN_VALIDATE.toString()).handler(bodyHandler).handler(auth.handleV1(
rc -> encryptedPayloadHandler.handle(rc, this::handleTokenValidateV2), Role.GENERATOR));
- mainRouter.post(V2_IDENTITY_BUCKETS.toString()).handler(bodyHandler).handler(auth.handleV1(
- rc -> encryptedPayloadHandler.handle(rc, this::handleBucketsV2), Role.MAPPER));
- mainRouter.post(V2_IDENTITY_MAP.toString()).handler(bodyHandler).handler(auth.handleV1(
- rc -> encryptedPayloadHandler.handle(rc, this::handleIdentityMapV2), Role.MAPPER));
mainRouter.post(V2_KEY_LATEST.toString()).handler(bodyHandler).handler(auth.handleV1(
rc -> encryptedPayloadHandler.handle(rc, this::handleKeysRequestV2), Role.ID_READER));
- mainRouter.post(V2_KEY_SHARING.toString()).handler(bodyHandler).handler(auth.handleV1(
- rc -> encryptedPayloadHandler.handle(rc, this::handleKeysSharing), Role.SHARER, Role.ID_READER));
- mainRouter.post(V2_KEY_BIDSTREAM.toString()).handler(bodyHandler).handler(auth.handleV1(
- rc -> encryptedPayloadHandler.handle(rc, this::handleKeysBidstream), Role.ID_READER));
mainRouter.post(V2_TOKEN_LOGOUT.toString()).handler(bodyHandler).handler(auth.handleV1(
rc -> encryptedPayloadHandler.handleAsync(rc, this::handleLogoutAsyncV2), Role.OPTOUT));
if (this.optOutStatusApiEnabled) {
@@ -302,8 +297,31 @@ private void setUpEncryptedRoutes(Router mainRouter, BodyHandler bodyHandler) {
if (this.clientSideTokenGenerate)
mainRouter.post(V2_TOKEN_CLIENTGENERATE.toString()).handler(bodyHandler).handler(this::handleClientSideTokenGenerate);
- mainRouter.post(V3_IDENTITY_MAP.toString()).handler(bodyHandler).handler(auth.handleV1(
- rc -> encryptedPayloadHandler.handle(rc, this::handleIdentityMapV3), Role.MAPPER));
+ if (isAsyncBatchRequestsEnabled) {
+ LOGGER.info("Async batch requests enabled");
+ mainRouter.post(V2_KEY_SHARING.toString()).handler(bodyHandler).blockingHandler(auth.handleV1(
+ rc -> encryptedPayloadHandler.handle(rc, this::handleKeysSharing), Role.SHARER, Role.ID_READER), false);
+ mainRouter.post(V2_KEY_BIDSTREAM.toString()).handler(bodyHandler).blockingHandler(auth.handleV1(
+ rc -> encryptedPayloadHandler.handle(rc, this::handleKeysBidstream), Role.ID_READER), false);
+ mainRouter.post(V2_IDENTITY_BUCKETS.toString()).handler(bodyHandler).blockingHandler(auth.handleV1(
+ rc -> encryptedPayloadHandler.handle(rc, this::handleBucketsV2), Role.MAPPER), false);
+ mainRouter.post(V2_IDENTITY_MAP.toString()).handler(bodyHandler).blockingHandler(auth.handleV1(
+ rc -> encryptedPayloadHandler.handle(rc, this::handleIdentityMapV2), Role.MAPPER), false);
+ mainRouter.post(V3_IDENTITY_MAP.toString()).handler(bodyHandler).blockingHandler(auth.handleV1(
+ rc -> encryptedPayloadHandler.handle(rc, this::handleIdentityMapV3), Role.MAPPER), false);
+ } else {
+ LOGGER.info("Async batch requests disabled");
+ mainRouter.post(V2_KEY_SHARING.toString()).handler(bodyHandler).handler(auth.handleV1(
+ rc -> encryptedPayloadHandler.handle(rc, this::handleKeysSharing), Role.SHARER, Role.ID_READER));
+ mainRouter.post(V2_KEY_BIDSTREAM.toString()).handler(bodyHandler).handler(auth.handleV1(
+ rc -> encryptedPayloadHandler.handle(rc, this::handleKeysBidstream), Role.ID_READER));
+ mainRouter.post(V2_IDENTITY_BUCKETS.toString()).handler(bodyHandler).handler(auth.handleV1(
+ rc -> encryptedPayloadHandler.handle(rc, this::handleBucketsV2), Role.MAPPER));
+ mainRouter.post(V2_IDENTITY_MAP.toString()).handler(bodyHandler).handler(auth.handleV1(
+ rc -> encryptedPayloadHandler.handle(rc, this::handleIdentityMapV2), Role.MAPPER));
+ mainRouter.post(V3_IDENTITY_MAP.toString()).handler(bodyHandler).handler(auth.handleV1(
+ rc -> encryptedPayloadHandler.handle(rc, this::handleIdentityMapV3), Role.MAPPER));
+ }
}
private void handleClientSideTokenGenerate(RoutingContext rc) {