diff --git a/conf/default-config.json b/conf/default-config.json index c5de0d87b..9921e6fd4 100644 --- a/conf/default-config.json +++ b/conf/default-config.json @@ -40,5 +40,6 @@ "sharing_token_expiry_seconds": 2592000, "operator_type": "public", "enable_remote_config": true, - "uid_instance_id_prefix": "local-operator" + "uid_instance_id_prefix": "local-operator", + "enable_async_batch_request": true } diff --git a/pom.xml b/pom.xml index 70f69138a..ada90c898 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.uid2 uid2-operator - 5.64.11 + 5.64.14-alpha-302-SNAPSHOT UTF-8 diff --git a/src/main/java/com/uid2/operator/Const.java b/src/main/java/com/uid2/operator/Const.java index 14f878371..ec0004f25 100644 --- a/src/main/java/com/uid2/operator/Const.java +++ b/src/main/java/com/uid2/operator/Const.java @@ -39,5 +39,9 @@ public class Config extends com.uid2.shared.Const.Config { public static final String RuntimeConfigMetadataPathProp = "runtime_config_metadata_path"; public static final String IdentityEnvironmentProp = "identity_environment"; + + public static final String EnableAsyncBatchRequestProp = "enable_async_batch_request"; + + public static final String DefaultWorkerPoolThreadCount = "default_worker_pool_thread_count"; } } diff --git a/src/main/java/com/uid2/operator/Main.java b/src/main/java/com/uid2/operator/Main.java index b4e79f5c2..8fbd2d4fc 100644 --- a/src/main/java/com/uid2/operator/Main.java +++ b/src/main/java/com/uid2/operator/Main.java @@ -488,7 +488,7 @@ private static Vertx createVertx() { MicrometerMetricsOptions metricOptions = new MicrometerMetricsOptions() .setPrometheusOptions(prometheusOptions) - .setLabels(EnumSet.of(Label.HTTP_METHOD, Label.HTTP_CODE, Label.HTTP_PATH)) + .setLabels(EnumSet.of(Label.HTTP_METHOD, Label.HTTP_CODE, Label.HTTP_PATH, Label.POOL_NAME)) .setJvmMetricsEnabled(true) .setEnabled(true); setupMetrics(metricOptions); @@ -497,9 +497,14 @@ private static Vertx createVertx() { ? 60 * 1000 : 3600 * 1000; + final int defaultWorkerPoolSize = Math.max(2, (Runtime.getRuntime().availableProcessors() - 2) / 2 + 1); + final int workerPoolSize = getEnvInt(Const.Config.DefaultWorkerPoolThreadCount, defaultWorkerPoolSize); + LOGGER.info("Creating Vertx with default worker pool size: {}", workerPoolSize); + VertxOptions vertxOptions = new VertxOptions() .setMetricsOptions(metricOptions) - .setBlockedThreadCheckInterval(threadBlockedCheckInterval); + .setBlockedThreadCheckInterval(threadBlockedCheckInterval) + .setWorkerPoolSize(workerPoolSize); return Vertx.vertx(vertxOptions); } @@ -524,6 +529,7 @@ private static void setupMetrics(MicrometerMetricsOptions metricOptions) { Objects.equals(id.getTag(Label.HTTP_CODE.toString()), "404"))) .meterFilter(new MeterFilter() { private final String httpServerResponseTime = MetricsDomain.HTTP_SERVER.getPrefix() + MetricsNaming.v4Names().getHttpResponseTime(); + private final String poolQueueTime = MetricsDomain.NAMED_POOLS.getPrefix() + MetricsNaming.v4Names().getPoolQueueTime(); @Override public DistributionStatisticConfig configure(Meter.Id id, DistributionStatisticConfig config) { @@ -533,6 +539,12 @@ public DistributionStatisticConfig configure(Meter.Id id, DistributionStatisticC .build() .merge(config); } + if (id.getName().equals(poolQueueTime)) { + return DistributionStatisticConfig.builder() + .percentiles(0.50, 0.90, 0.95, 0.99) + .build() + .merge(config); + } return config; } }) @@ -626,4 +638,17 @@ private IOperatorKeyRetriever createOperatorKeyRetriever() throws Exception { } } } + + private static int getEnvInt(String name, int defaultValue) { + String value = System.getenv(name); + if (value == null || value.isEmpty()) { + return defaultValue; + } + try { + return Integer.parseInt(value); + } catch (NumberFormatException e) { + LOGGER.warn("Invalid integer value for environment variable {}: '{}', using default: {}", name, value, defaultValue); + return defaultValue; + } + } } diff --git a/src/main/java/com/uid2/operator/vertx/UIDOperatorVerticle.java b/src/main/java/com/uid2/operator/vertx/UIDOperatorVerticle.java index 5f879d768..88c8b2ab5 100644 --- a/src/main/java/com/uid2/operator/vertx/UIDOperatorVerticle.java +++ b/src/main/java/com/uid2/operator/vertx/UIDOperatorVerticle.java @@ -139,6 +139,8 @@ public class UIDOperatorVerticle extends AbstractVerticle { private final int optOutStatusMaxRequestSize; private final boolean optOutStatusApiEnabled; + private final boolean isAsyncBatchRequestsEnabled; + //"Android" is from https://github.com/IABTechLab/uid2-android-sdk/blob/ff93ebf597f5de7d440a84f7015a334ba4138ede/sdk/src/main/java/com/uid2/UID2Client.kt#L46 //"ios"/"tvos" is from https://github.com/IABTechLab/uid2-ios-sdk/blob/91c290d29a7093cfc209eca493d1fee80c17e16a/Sources/UID2/UID2Client.swift#L36-L38 private static final List SUPPORTED_IN_APP = Arrays.asList("Android", "ios", "tvos"); @@ -197,6 +199,7 @@ public UIDOperatorVerticle(IConfigStore configStore, this.identityV3Enabled = config.getBoolean(IdentityV3Prop, false); this.disableOptoutToken = config.getBoolean(DisableOptoutTokenProp, false); this.uidInstanceIdProvider = uidInstanceIdProvider; + this.isAsyncBatchRequestsEnabled = config.getBoolean(EnableAsyncBatchRequestProp, false); } @Override @@ -281,16 +284,8 @@ private void setUpEncryptedRoutes(Router mainRouter, BodyHandler bodyHandler) { rc -> encryptedPayloadHandler.handleTokenRefresh(rc, this::handleTokenRefreshV2))); mainRouter.post(V2_TOKEN_VALIDATE.toString()).handler(bodyHandler).handler(auth.handleV1( rc -> encryptedPayloadHandler.handle(rc, this::handleTokenValidateV2), Role.GENERATOR)); - mainRouter.post(V2_IDENTITY_BUCKETS.toString()).handler(bodyHandler).handler(auth.handleV1( - rc -> encryptedPayloadHandler.handle(rc, this::handleBucketsV2), Role.MAPPER)); - mainRouter.post(V2_IDENTITY_MAP.toString()).handler(bodyHandler).handler(auth.handleV1( - rc -> encryptedPayloadHandler.handle(rc, this::handleIdentityMapV2), Role.MAPPER)); mainRouter.post(V2_KEY_LATEST.toString()).handler(bodyHandler).handler(auth.handleV1( rc -> encryptedPayloadHandler.handle(rc, this::handleKeysRequestV2), Role.ID_READER)); - mainRouter.post(V2_KEY_SHARING.toString()).handler(bodyHandler).handler(auth.handleV1( - rc -> encryptedPayloadHandler.handle(rc, this::handleKeysSharing), Role.SHARER, Role.ID_READER)); - mainRouter.post(V2_KEY_BIDSTREAM.toString()).handler(bodyHandler).handler(auth.handleV1( - rc -> encryptedPayloadHandler.handle(rc, this::handleKeysBidstream), Role.ID_READER)); mainRouter.post(V2_TOKEN_LOGOUT.toString()).handler(bodyHandler).handler(auth.handleV1( rc -> encryptedPayloadHandler.handleAsync(rc, this::handleLogoutAsyncV2), Role.OPTOUT)); if (this.optOutStatusApiEnabled) { @@ -302,8 +297,31 @@ private void setUpEncryptedRoutes(Router mainRouter, BodyHandler bodyHandler) { if (this.clientSideTokenGenerate) mainRouter.post(V2_TOKEN_CLIENTGENERATE.toString()).handler(bodyHandler).handler(this::handleClientSideTokenGenerate); - mainRouter.post(V3_IDENTITY_MAP.toString()).handler(bodyHandler).handler(auth.handleV1( - rc -> encryptedPayloadHandler.handle(rc, this::handleIdentityMapV3), Role.MAPPER)); + if (isAsyncBatchRequestsEnabled) { + LOGGER.info("Async batch requests enabled"); + mainRouter.post(V2_KEY_SHARING.toString()).handler(bodyHandler).blockingHandler(auth.handleV1( + rc -> encryptedPayloadHandler.handle(rc, this::handleKeysSharing), Role.SHARER, Role.ID_READER), false); + mainRouter.post(V2_KEY_BIDSTREAM.toString()).handler(bodyHandler).blockingHandler(auth.handleV1( + rc -> encryptedPayloadHandler.handle(rc, this::handleKeysBidstream), Role.ID_READER), false); + mainRouter.post(V2_IDENTITY_BUCKETS.toString()).handler(bodyHandler).blockingHandler(auth.handleV1( + rc -> encryptedPayloadHandler.handle(rc, this::handleBucketsV2), Role.MAPPER), false); + mainRouter.post(V2_IDENTITY_MAP.toString()).handler(bodyHandler).blockingHandler(auth.handleV1( + rc -> encryptedPayloadHandler.handle(rc, this::handleIdentityMapV2), Role.MAPPER), false); + mainRouter.post(V3_IDENTITY_MAP.toString()).handler(bodyHandler).blockingHandler(auth.handleV1( + rc -> encryptedPayloadHandler.handle(rc, this::handleIdentityMapV3), Role.MAPPER), false); + } else { + LOGGER.info("Async batch requests disabled"); + mainRouter.post(V2_KEY_SHARING.toString()).handler(bodyHandler).handler(auth.handleV1( + rc -> encryptedPayloadHandler.handle(rc, this::handleKeysSharing), Role.SHARER, Role.ID_READER)); + mainRouter.post(V2_KEY_BIDSTREAM.toString()).handler(bodyHandler).handler(auth.handleV1( + rc -> encryptedPayloadHandler.handle(rc, this::handleKeysBidstream), Role.ID_READER)); + mainRouter.post(V2_IDENTITY_BUCKETS.toString()).handler(bodyHandler).handler(auth.handleV1( + rc -> encryptedPayloadHandler.handle(rc, this::handleBucketsV2), Role.MAPPER)); + mainRouter.post(V2_IDENTITY_MAP.toString()).handler(bodyHandler).handler(auth.handleV1( + rc -> encryptedPayloadHandler.handle(rc, this::handleIdentityMapV2), Role.MAPPER)); + mainRouter.post(V3_IDENTITY_MAP.toString()).handler(bodyHandler).handler(auth.handleV1( + rc -> encryptedPayloadHandler.handle(rc, this::handleIdentityMapV3), Role.MAPPER)); + } } private void handleClientSideTokenGenerate(RoutingContext rc) {