diff --git a/.github/workflows/release-docker-image.yaml b/.github/workflows/release-docker-image.yaml index ce29a769..92d831a4 100644 --- a/.github/workflows/release-docker-image.yaml +++ b/.github/workflows/release-docker-image.yaml @@ -67,4 +67,3 @@ jobs: admin_branch: ${{ inputs.admin_branch }} operator_branch: ${{ inputs.operator_branch }} secrets: inherit - \ No newline at end of file diff --git a/conf/default-config.json b/conf/default-config.json index 0543dcad..ddc82ab1 100644 --- a/conf/default-config.json +++ b/conf/default-config.json @@ -9,9 +9,7 @@ "optout_observe_only": false, "optout_s3_path_compat": false, "optout_partner_endpoint_mock": false, - "optout_add_entry_timeout_ms": 1000, "optout_data_dir": "/opt/uid2/optout/", - "optout_replica_uris": null, "optout_producer_replica_id": -1, "optout_producer_replica_id_offset": 0, "optout_producer_max_replicas": 3, @@ -24,8 +22,6 @@ "optout_delete_expired": true, "optout_s3_bucket": null, "optout_s3_folder": null, - "optout_legacy_producer_s3_folder": null, - "optout_sqs_s3_folder": null, "optout_sqs_queue_url": null, "cloud_download_threads": 8, "cloud_upload_threads": 2, @@ -36,7 +32,6 @@ "partners_config_path": "partners/config.json", "operator_type": "public", "uid_instance_id_prefix": "local-optout", - "optout_enqueue_sqs_enabled": false, "optout_sqs_max_queue_size": 0, "optout_sqs_max_messages_per_poll": 10, "optout_sqs_visibility_timeout": 300, @@ -46,4 +41,4 @@ "traffic_filter_config_path": null, "traffic_calc_config_path": null, "manual_override_s3_path": null -} \ No newline at end of file +} diff --git a/conf/integ-config.json b/conf/integ-config.json index aeb81143..b6c75b9c 100644 --- a/conf/integ-config.json +++ b/conf/integ-config.json @@ -5,7 +5,6 @@ "att_token_enc_salt": "", "optout_s3_folder": "optout/", "optout_internal_api_token": "test-optout-internal-key", - "optout_replica_uris": "http://localhost:8081/optout/write,http://localhost:8081/optout/write,http://localhost:8081/optout/write", "partners_config_path": "/com.uid2.core/test/partners/config.json", "partners_metadata_path": "http://localhost:8088/partners/refresh", "operators_metadata_path": "http://localhost:8088/operators/refresh", @@ -13,4 +12,4 @@ "core_api_token": "trusted-partner-key", "enforceJwt": false, "uid_instance_id_prefix": "local-optout" -} \ No newline at end of file +} diff --git a/conf/local-config.json b/conf/local-config.json index b4e2d0e9..ecd65096 100644 --- a/conf/local-config.json +++ b/conf/local-config.json @@ -7,7 +7,6 @@ "optout_partner_endpoint_mock": true, "optout_internal_api_token": "test-optout-internal-key", "optout_producer_max_replicas": 1, - "optout_replica_uris": "http://127.0.0.1:8081/optout/write,http://127.0.0.1:8081/optout/write,http://127.0.0.1:8081/optout/write", "partners_config_path": "/com.uid2.core/test/partners/config.json", "partners_metadata_path": "/com.uid2.core/test/partners/metadata.json", "clients_metadata_path": "/com.uid2.core/test/clients/metadata.json", @@ -18,4 +17,4 @@ "optout_url": "http://localhost:8081", "enforceJwt": false, "uid_instance_id_prefix": "local-optout" -} \ No newline at end of file +} diff --git a/conf/local-e2e-docker-config.json b/conf/local-e2e-docker-config.json index 4a5f9c7a..d1b0eca5 100644 --- a/conf/local-e2e-docker-config.json +++ b/conf/local-e2e-docker-config.json @@ -1,20 +1,25 @@ { "service_instances": 1, + "service_verbose": true, "storage_mock": false, "aws_s3_endpoint": "http://localstack:5001", + "aws_sqs_endpoint": "http://localstack:5001", 
"optout_s3_bucket": "test-optout-bucket", + "optout_s3_bucket_dropped_requests": "test-optout-bucket", + "optout_sqs_queue_url": "http://localstack:5001/000000000000/optout-queue", "pre_signed_url_expiry": 1800, - "aws_region": "us-east-2", + "aws_region": "us-east-1", "aws_access_key_id": "no access key needed for test", "aws_secret_access_key": "no secret key needed for test", "att_token_enc_key": "", "att_token_enc_salt": "", "optout_producer_max_replicas": 1, + "optout_producer_buffer_size": 65536, "optout_s3_folder": "optout-v2/", "optout_internal_api_token": "test-optout-internal-key", - "optout_replica_uris": "http://localhost:8081/optout/write,http://localhost:8081/optout/write,http://localhost:8081/optout/write", "optout_producer_replica_id": 0, "optout_delta_rotate_interval": 60, + "optout_sqs_delta_window_seconds": 0, "partners_metadata_path": "http://core:8088/partners/refresh", "operators_metadata_path": "http://core:8088/operators/refresh", "core_attest_url": "http://core:8088/attest", diff --git a/src/main/java/com/uid2/optout/Const.java b/src/main/java/com/uid2/optout/Const.java index 296e166f..24b9d082 100644 --- a/src/main/java/com/uid2/optout/Const.java +++ b/src/main/java/com/uid2/optout/Const.java @@ -11,16 +11,12 @@ public static class Config extends com.uid2.shared.Const.Config { public static final String OptOutPartnerEndpointMockProp = "optout_partner_endpoint_mock"; public static final String OptOutObserveOnlyProp = "optout_observe_only"; public static final String OptOutS3PathCompatProp = "optout_s3_path_compat"; - public static final String OptOutAddEntryTimeoutMsProp = "optout_add_entry_timeout_ms"; public static final String OptOutProducerBufferSizeProp = "optout_producer_buffer_size"; public static final String OptOutSenderReplicaIdProp = "optout_sender_replica_id"; public static final String OptOutDeleteExpiredProp = "optout_delete_expired"; public static final String PartnersConfigPathProp = "partners_config_path"; public static final String PartnersMetadataPathProp = "partners_metadata_path"; public static final String OptOutSqsQueueUrlProp = "optout_sqs_queue_url"; - public static final String OptOutSqsEnabledProp = "optout_enqueue_sqs_enabled"; - public static final String OptOutSqsS3FolderProp = "optout_sqs_s3_folder"; // sqs delta producer writes to this folder - public static final String OptOutLegacyProducerS3FolderProp = "optout_legacy_producer_s3_folder"; // legacy producer writes to this folder public static final String OptOutSqsMaxMessagesPerPollProp = "optout_sqs_max_messages_per_poll"; public static final String OptOutSqsVisibilityTimeoutProp = "optout_sqs_visibility_timeout"; public static final String OptOutDeltaJobTimeoutSecondsProp = "optout_delta_job_timeout_seconds"; @@ -35,14 +31,13 @@ public static class Config extends com.uid2.shared.Const.Config { public static final String OptOutTrafficCalcAllowlistRangesProp = "traffic_calc_allowlist_ranges"; public static final String OptOutSqsDeltaWindowSecondsProp = "optout_sqs_delta_window_seconds"; public static final String OptOutSqsMaxQueueSizeProp = "optout_sqs_max_queue_size"; + public static final String AwsSqsEndpointProp = "aws_sqs_endpoint"; } public static class Event { - public static final String DeltaProduce = "delta.produce"; public static final String DeltaProduced = "delta.produced"; public static final String DeltaSentRemote = "delta.sent_remote"; public static final String PartitionProduce = "partition.produce"; public static final String PartitionProduced = 
"partition.produced"; - public static final String EntryAdd = "entry.add"; } } diff --git a/src/main/java/com/uid2/optout/Main.java b/src/main/java/com/uid2/optout/Main.java index fcd4aa6e..65bc3fbd 100644 --- a/src/main/java/com/uid2/optout/Main.java +++ b/src/main/java/com/uid2/optout/Main.java @@ -11,10 +11,8 @@ import com.uid2.shared.audit.UidInstanceIdProvider; import com.uid2.shared.auth.RotatingOperatorKeyProvider; import com.uid2.shared.cloud.*; -import com.uid2.shared.health.HealthManager; import com.uid2.shared.jmx.AdminApi; import com.uid2.shared.optout.OptOutCloudSync; -import com.uid2.shared.optout.OptOutUtils; import com.uid2.shared.store.CloudPath; import com.uid2.shared.store.scope.GlobalScope; import com.uid2.shared.vertx.CloudSyncVerticle; @@ -41,18 +39,15 @@ import javax.management.*; import java.io.IOException; import java.lang.management.ManagementFactory; +import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; -import java.time.Instant; import java.util.*; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; // // produces events: // - cloudsync.optout.refresh (timer-based) -// - delta.produce (timer-based) // public class Main { private static final Logger LOGGER = LoggerFactory.getLogger(Main.class); @@ -65,7 +60,6 @@ public class Main { private final ICloudStorage fsPartnerConfig; private final RotatingOperatorKeyProvider operatorKeyProvider; private final boolean observeOnly; - private final boolean enqueueSqsEnabled; private final UidInstanceIdProvider uidInstanceIdProvider; public Main(Vertx vertx, JsonObject config) throws Exception { @@ -75,7 +69,6 @@ public Main(Vertx vertx, JsonObject config) throws Exception { if (this.observeOnly) { LOGGER.warn("Running Observe ONLY mode: no producer, no sender"); } - this.enqueueSqsEnabled = config.getBoolean(Const.Config.OptOutSqsEnabledProp, false); this.uidInstanceIdProvider = new UidInstanceIdProvider(config); boolean useStorageMock = config.getBoolean(Const.Config.StorageMockProp, false); @@ -153,6 +146,7 @@ public Main(Vertx vertx, JsonObject config) throws Exception { } public static void main(String[] args) { + final String vertxConfigPath = System.getProperty(Const.Config.VERTX_CONFIG_PATH_PROP); if (vertxConfigPath != null) { LOGGER.info("Running CUSTOM CONFIG mode, config: {}", vertxConfigPath); @@ -246,11 +240,6 @@ public void run(String[] args) throws IOException { List futs = new ArrayList<>(); - // determine folder configuration for old producer vs readers - String legacyFolder = this.config.getString(Const.Config.OptOutLegacyProducerS3FolderProp); - String mainFolder = this.config.getString(Const.Config.OptOutS3FolderProp); - boolean useLegacyFolder = legacyFolder != null && !legacyFolder.equals(mainFolder); - // main CloudSyncVerticle - downloads from optout_s3_folder (for readers: partition generator, log sender) OptOutCloudSync cs = new OptOutCloudSync(this.config, true); CloudSyncVerticle cloudSyncVerticle = new CloudSyncVerticle("optout", this.fsOptOut, this.fsLocal, cs, this.config); @@ -265,70 +254,28 @@ public void run(String[] args) throws IOException { // enable partition producing (reads from main folder via cs) cs.enableDeltaMerging(vertx, Const.Event.PartitionProduce); + // deploy partition producer verticle - uploads to S3 via CloudSyncVerticle + OptOutLogProducer partitionProducer = new OptOutLogProducer(this.config, cloudSyncVerticle.eventUpload()); + 
futs.add(this.deploySingleInstance(partitionProducer)); + // create partners config monitor futs.add(this.createPartnerConfigMonitor(cloudSyncVerticle.eventDownloaded())); - // create CloudSyncVerticle for old producer uploads - // if legacy folder is configured, create upload-only verticle; otherwise reuse main verticle - OptOutCloudSync uploadCs; - CloudSyncVerticle uploadVerticle; - if (useLegacyFolder) { - LOGGER.info("old producer uploads configured to use folder: {} (readers use: {})", legacyFolder, mainFolder); - JsonObject legacyConfig = new JsonObject().mergeIn(this.config) - .put(Const.Config.OptOutS3FolderProp, legacyFolder); - uploadCs = new OptOutCloudSync(legacyConfig, true, true); - // upload-only verticle: won't download legacy files - uploadVerticle = new CloudSyncVerticle("optout-legacy", this.fsOptOut, this.fsLocal, uploadCs, this.config); - futs.add(this.deploySingleInstance(uploadVerticle)); - } else { - // no legacy folder configured, old producer uploads to main folder - uploadCs = cs; - uploadVerticle = cloudSyncVerticle; - } - - // create & deploy log producer verticle - // deltas go to uploadVerticle (legacy folder in legacy mode, main folder otherwise) - // partitions always go to main folder (cloudSyncVerticle) since they read from main folder - OptOutLogProducer logProducer = new OptOutLogProducer(this.config, uploadVerticle.eventUpload(), cloudSyncVerticle.eventUpload()); - futs.add(this.deploySingleInstance(logProducer)); - - // upload last delta produced and potentially not uploaded yet - // always use main cs/cloudSyncVerticle for refresh mechanism (legacy verticle is upload-only) - // uploadCs is used only for determining the cloud path (legacy folder vs main folder) - futs.add((this.uploadLastDelta(cs, uploadCs, logProducer, cloudSyncVerticle.eventRefresh()))); - } - - // deploy sqs producer if enabled - if (this.enqueueSqsEnabled) { - LOGGER.info("sqs enabled, deploying OptOutSqsLogProducer"); + // deploy sqs log producer try { - // sqs delta production uses a separate s3 folder (default: "sqs-delta") - // OptOutCloudSync reads from optout_s3_folder, so we override it with optout_sqs_s3_folder - String sqsFolder = this.config.getString(Const.Config.OptOutSqsS3FolderProp, "sqs-delta"); - JsonObject sqsCloudSyncConfig = new JsonObject().mergeIn(this.config) - .put(Const.Config.OptOutS3FolderProp, sqsFolder); - OptOutCloudSync sqsCs = new OptOutCloudSync(sqsCloudSyncConfig, true); - - // create cloud storage instances - ICloudStorage fsSqs; - boolean useStorageMock = this.config.getBoolean(Const.Config.StorageMockProp, false); - if (useStorageMock) { - fsSqs = this.fsOptOut; - } else { - String optoutBucket = this.config.getString(Const.Config.OptOutS3BucketProp); - fsSqs = CloudUtils.createStorage(optoutBucket, this.config); - } + // Create default config files for local/e2e testing if needed + createDefaultConfigFilesIfNeeded(); String optoutBucketDroppedRequests = this.config.getString(Const.Config.OptOutS3BucketDroppedRequestsProp); ICloudStorage fsSqsDroppedRequests = CloudUtils.createStorage(optoutBucketDroppedRequests, this.config); // deploy sqs log producer - // fires DeltaProduced (notification) not DeltaProduce (trigger) to avoid triggering old producer - OptOutSqsLogProducer sqsLogProducer = new OptOutSqsLogProducer(this.config, fsSqs, fsSqsDroppedRequests, sqsCs, Const.Event.DeltaProduced, null); + OptOutSqsLogProducer sqsLogProducer = new OptOutSqsLogProducer(this.config, this.fsOptOut, fsSqsDroppedRequests, cs, Const.Event.DeltaProduced, 
null); futs.add(this.deploySingleInstance(sqsLogProducer)); LOGGER.info("sqs log producer deployed, bucket={}, folder={}", - this.config.getString(Const.Config.OptOutS3BucketProp), sqsFolder); + this.config.getString(Const.Config.OptOutS3BucketProp), + this.config.getString(Const.Config.OptOutS3FolderProp)); } catch (IOException e) { LOGGER.error("circuit_breaker_config_error: failed to initialize sqs log producer, delta production will be disabled: {}", e.getMessage(), e); } catch (MalformedTrafficFilterConfigException e) { @@ -339,7 +286,7 @@ public void run(String[] args) throws IOException { } Supplier svcSupplier = () -> { - OptOutServiceVerticle svc = new OptOutServiceVerticle(vertx, this.operatorKeyProvider, this.fsOptOut, this.config, this.uidInstanceIdProvider); + OptOutServiceVerticle svc = new OptOutServiceVerticle(vertx, this.operatorKeyProvider, this.fsOptOut, this.config); // configure where OptOutService receives the latest cloud paths cs.registerNewCloudPathsHandler(ps -> svc.setCloudPaths(ps)); return svc; @@ -367,69 +314,6 @@ public void run(String[] args) throws IOException { }); } - - private Future uploadLastDelta(OptOutCloudSync cs, OptOutCloudSync uploadCs, OptOutLogProducer logProducer, String eventRefresh) { - final String deltaLocalPath; - try { - deltaLocalPath = logProducer.getLastDelta(); - // no need to upload if delta cannot be found - if (deltaLocalPath == null) { - LOGGER.info("found no last delta on disk"); - return Future.succeededFuture(); - } - } catch (Exception ex) { - LOGGER.error("uploadLastDelta error: " + ex.getMessage(), ex); - return Future.failedFuture(ex); - } - - Promise promise = Promise.promise(); - AtomicReference handler = new AtomicReference<>(); - handler.set(cs.registerNewCloudPathsHandler(cloudPaths -> { - try { - cs.unregisterNewCloudPathsHandler(handler.get()); - // use uploadCs for cloud path (may be different folder than cs) - final String deltaCloudPath = uploadCs.toCloudPath(deltaLocalPath); - if (cloudPaths.contains(deltaCloudPath)) { - // if delta is already uploaded, the work is already done - LOGGER.info("found no last delta that needs to be uploaded"); - } else { - this.fsOptOut.upload(deltaLocalPath, deltaCloudPath); - LOGGER.warn("found last delta that is not uploaded " + deltaLocalPath); - LOGGER.warn("uploaded last delta to " + deltaCloudPath); - } - promise.complete(); - } catch (Exception ex) { - final String msg = "unable handle last delta upload: " + ex.getMessage(); - LOGGER.error(msg, ex); - promise.fail(new Exception(msg, ex)); - } - })); - - // refresh now to mitigate a race-condition (cloud refreshed before cloudPaths handler is registered) - vertx.eventBus().send(eventRefresh, 0); - - AtomicInteger counter = new AtomicInteger(0); - vertx.setPeriodic(60*1000, id -> { - if (HealthManager.instance.isHealthy()) { - vertx.cancelTimer(id); - return; - } - - int count = counter.incrementAndGet(); - if (count >= 10) { - LOGGER.error("Unable to refresh from cloud storage and upload last delta..."); - vertx.close(); - System.exit(1); - return; - } - - LOGGER.warn("Waiting for cloud refresh to complete. 
Sending " + count + " " + eventRefresh + "..."); - vertx.eventBus().send(eventRefresh, 0); - }); - - return promise.future(); - } - private Future createOperatorKeyRotator() { RotatingStoreVerticle rotatingStore = new RotatingStoreVerticle("operators", 10000, operatorKeyProvider); return this.deploySingleInstance(rotatingStore); @@ -502,40 +386,55 @@ private Future setupTimerEvents(String eventCloudRefresh) { // refresh now to ready optout service verticles vertx.eventBus().send(eventCloudRefresh, 0); - int rotateInterval = config.getInteger(Const.Config.OptOutDeltaRotateIntervalProp); int cloudRefreshInterval = config.getInteger(Const.Config.CloudRefreshIntervalProp); - // if we plan to consolidate logs from multiple replicas, we need to make sure they are produced at roughly - // the same time, e.g. if the logs are produced every 5 mins, ideally we'd like to send log.produce event - // at 00, 05, 10, 15 mins etc, of each hour. - // - // first calculate seconds to sleep to get to the above exact intervals - final int secondsToSleep = OptOutUtils.getSecondsBeforeNextSlot(Instant.now(), rotateInterval); - final int msToSleep = secondsToSleep > 0 ? secondsToSleep * 1000 : 1; - LOGGER.info("sleep for " + secondsToSleep + "s before scheduling the first log rotate event"); - vertx.setTimer(msToSleep, v -> { - // at the right starting time, start periodically emitting log.produce event - vertx.setPeriodic(1000 * rotateInterval, id -> { - LOGGER.trace("sending " + Const.Event.DeltaProduce); - vertx.eventBus().send(Const.Event.DeltaProduce, id); - }); - }); - - // add 15s offset to do s3 refresh also synchronized - final int secondsToSleep2 = (secondsToSleep + 15) % cloudRefreshInterval; - final int msToSleep2 = secondsToSleep2 > 0 ? secondsToSleep2 * 1000 : 1; - LOGGER.info("sleep for " + secondsToSleep2 + "s before scheduling the first s3 refresh event"); - vertx.setTimer(msToSleep2, v -> { - LOGGER.info("sending the 1st " + eventCloudRefresh); - vertx.eventBus().send(eventCloudRefresh, -1); - - // periodically emit s3.refresh event - vertx.setPeriodic(1000 * cloudRefreshInterval, id -> { - LOGGER.trace("sending " + eventCloudRefresh); - vertx.eventBus().send(eventCloudRefresh, id); - }); + // periodically emit s3.refresh event + vertx.setPeriodic(1000 * cloudRefreshInterval, id -> { + LOGGER.trace("sending " + eventCloudRefresh); + vertx.eventBus().send(eventCloudRefresh, id); }); return Future.succeededFuture(); } + + /** + * Creates default traffic filter and traffic calc config files for local/e2e testing. + * Only creates files if running in non-production mode and config paths are not set. 
+ */ + private void createDefaultConfigFilesIfNeeded() { + if (Utils.isProductionEnvironment()) { + return; + } + + try { + Path tempDir = Files.createTempDirectory("optout-config"); + + // Create traffic filter config if not set + String filterPath = config.getString(Const.Config.TrafficFilterConfigPathProp); + if (filterPath == null || filterPath.isEmpty()) { + Path filterConfigPath = tempDir.resolve("traffic-filter-config.json"); + String filterConfig = "{\n \"denylist_requests\": []\n}"; + Files.writeString(filterConfigPath, filterConfig); + config.put(Const.Config.TrafficFilterConfigPathProp, filterConfigPath.toString()); + LOGGER.info("Created default traffic filter config at: {}", filterConfigPath); + } + + // Create traffic calc config if not set + String calcPath = config.getString(Const.Config.TrafficCalcConfigPathProp); + if (calcPath == null || calcPath.isEmpty()) { + Path calcConfigPath = tempDir.resolve("traffic-calc-config.json"); + String calcConfig = "{\n" + + " \"traffic_calc_evaluation_window_seconds\": 86400,\n" + + " \"traffic_calc_baseline_traffic\": 1000000,\n" + + " \"traffic_calc_threshold_multiplier\": 10,\n" + + " \"traffic_calc_allowlist_ranges\": []\n" + + "}"; + Files.writeString(calcConfigPath, calcConfig); + config.put(Const.Config.TrafficCalcConfigPathProp, calcConfigPath.toString()); + LOGGER.info("Created default traffic calc config at: {}", calcConfigPath); + } + } catch (IOException e) { + LOGGER.warn("Failed to create default config files: {}", e.getMessage()); + } + } } diff --git a/src/main/java/com/uid2/optout/vertx/DeltaProduceJobStatus.java b/src/main/java/com/uid2/optout/vertx/DeltaProduceJobStatus.java deleted file mode 100644 index e503e200..00000000 --- a/src/main/java/com/uid2/optout/vertx/DeltaProduceJobStatus.java +++ /dev/null @@ -1,85 +0,0 @@ -package com.uid2.optout.vertx; - -import io.vertx.core.json.JsonObject; -import java.time.Instant; - -/** - * Represents the status and result of an async delta production job on a pod. - * - * This class tracks the lifecycle of a delta production job including its state - * (running, completed, failed), timing information, and result or error details. - * - */ -public class DeltaProduceJobStatus { - private final Instant startTime; - private volatile JobState state; - private volatile JsonObject result; - private volatile String errorMessage; - private volatile Instant endTime; - - public enum JobState { - RUNNING, - COMPLETED, - FAILED - } - - public DeltaProduceJobStatus() { - this.startTime = Instant.now(); - this.state = JobState.RUNNING; - } - - /** - * Mark the job as completed with the given result. - * @param result The result details as a JsonObject - */ - public void complete(JsonObject result) { - this.result = result; - this.state = JobState.COMPLETED; - this.endTime = Instant.now(); - } - - /** - * Mark the job as failed with the given error message. - * @param errorMessage Description of the failure - */ - public void fail(String errorMessage) { - this.errorMessage = errorMessage; - this.state = JobState.FAILED; - this.endTime = Instant.now(); - } - - /** - * Get the current state of the job. - * @return The job state - */ - public JobState getState() { - return state; - } - - /** - * Convert the job status to a JSON representation for API responses. 
- * @return JsonObject with state, timing, and result/error information - */ - public JsonObject toJson() { - JsonObject json = new JsonObject() - .put("state", state.name().toLowerCase()) - .put("start_time", startTime.toString()); - - if (endTime != null) { - json.put("end_time", endTime.toString()); - long durationSeconds = endTime.getEpochSecond() - startTime.getEpochSecond(); - json.put("duration_seconds", durationSeconds); - } - - if (state == JobState.COMPLETED && result != null) { - json.put("result", result); - } - - if (state == JobState.FAILED && errorMessage != null) { - json.put("error", errorMessage); - } - - return json; - } -} - diff --git a/src/main/java/com/uid2/optout/vertx/DeltaProductionResult.java b/src/main/java/com/uid2/optout/vertx/DeltaProductionResult.java deleted file mode 100644 index 7f2eb8fb..00000000 --- a/src/main/java/com/uid2/optout/vertx/DeltaProductionResult.java +++ /dev/null @@ -1,35 +0,0 @@ -package com.uid2.optout.vertx; - -/** - * Result object containing statistics from delta production. - */ -public class DeltaProductionResult { - private final int deltasProduced; - private final int entriesProcessed; - - /* - * indicates that there are still messages in the queue, however, - * not enough time has elapsed to produce a delta file. - * We produce in batches of (5 minutes) - */ - private final boolean stoppedDueToMessagesTooRecent; - - public DeltaProductionResult(int deltasProduced, int entriesProcessed, boolean stoppedDueToMessagesTooRecent) { - this.deltasProduced = deltasProduced; - this.entriesProcessed = entriesProcessed; - this.stoppedDueToMessagesTooRecent = stoppedDueToMessagesTooRecent; - } - - public int getDeltasProduced() { - return deltasProduced; - } - - public int getEntriesProcessed() { - return entriesProcessed; - } - - public boolean stoppedDueToMessagesTooRecent() { - return stoppedDueToMessagesTooRecent; - } -} - diff --git a/src/main/java/com/uid2/optout/vertx/Endpoints.java b/src/main/java/com/uid2/optout/vertx/Endpoints.java index 4356d9af..be6c58cf 100644 --- a/src/main/java/com/uid2/optout/vertx/Endpoints.java +++ b/src/main/java/com/uid2/optout/vertx/Endpoints.java @@ -7,7 +7,6 @@ public enum Endpoints { OPS_HEALTHCHECK("/ops/healthcheck"), OPTOUT_REFRESH("/optout/refresh"), - OPTOUT_WRITE("/optout/write"), OPTOUT_REPLICATE("/optout/replicate"), OPTOUT_DELTA_PRODUCE("/optout/deltaproduce"), OPTOUT_PARTNER_MOCK("/optout/partner_mock"); diff --git a/src/main/java/com/uid2/optout/vertx/OptOutLogProducer.java b/src/main/java/com/uid2/optout/vertx/OptOutLogProducer.java index b9efc427..9467d3d4 100644 --- a/src/main/java/com/uid2/optout/vertx/OptOutLogProducer.java +++ b/src/main/java/com/uid2/optout/vertx/OptOutLogProducer.java @@ -7,7 +7,6 @@ import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.Metrics; import io.vertx.core.AbstractVerticle; -import io.vertx.core.Future; import io.vertx.core.Promise; import io.vertx.core.WorkerExecutor; import io.vertx.core.eventbus.EventBus; @@ -18,77 +17,45 @@ import org.slf4j.LoggerFactory; import java.io.File; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.ByteOrder; -import java.nio.channels.FileChannel; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; -import java.nio.file.StandardOpenOption; import java.time.Instant; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Optional; // // consumes event: -// - entry.add (string identityHash,advertisingId) -// - delta.produce 
(json logPaths) // - partition.produce (json logPaths) // // produces events: -// - delta.produced (String filePath) // - partition.produced (String filePath) // -// there are 2 types of optout log file: delta and partition -// delta is raw optout log file produced at a regular cadence (e.g. 5 mins) // partition is merge-sorted logs for a given larger time window (e.g. every 24hr) // public class OptOutLogProducer extends AbstractVerticle { private static final Logger LOGGER = LoggerFactory.getLogger(OptOutLogProducer.class); - private final HealthComponent healthComponent = HealthManager.instance.registerComponent("log-producer"); - private final String deltaProducerDir; + private final HealthComponent healthComponent = HealthManager.instance.registerComponent("partition-producer"); private final String partitionProducerDir; private final int replicaId; - private final ArrayList> bufferedMessages; - private final String eventDeltaProduced; - private final String eventPartitionProduced; + private final String eventUpload; private final FileUtils fileUtils; - private Counter counterDeltaProduced = Counter - .builder("uid2_optout_delta_produced_total") - .description("counter for how many optout delta files are produced") - .register(Metrics.globalRegistry); private Counter counterPartitionProduced = Counter .builder("uid2_optout_partition_produced_total") .description("counter for how many optout partition files are produced") .register(Metrics.globalRegistry); - private ByteBuffer buffer; - private String currentDeltaFileName = null; - private boolean writeInProgress = false; - private boolean shutdownInProgress = false; - private FileChannel fileChannel = null; private WorkerExecutor partitionProducerExecutor = null; - private int writeErrorsSinceDeltaOpen = 0; - public OptOutLogProducer(JsonObject jsonConfig) throws IOException { - this(jsonConfig, Const.Event.DeltaProduced, Const.Event.PartitionProduced); + public OptOutLogProducer(JsonObject jsonConfig) { + this(jsonConfig, null); } - public OptOutLogProducer(JsonObject jsonConfig, String eventDeltaProduced, String eventPartitionProduced) throws IOException { + public OptOutLogProducer(JsonObject jsonConfig, String eventUpload) { this.healthComponent.setHealthStatus(false, "not started"); - this.deltaProducerDir = this.getDeltaProducerDir(jsonConfig); this.partitionProducerDir = this.getPartitionProducerDir(jsonConfig); this.replicaId = OptOutUtils.getReplicaId(jsonConfig); LOGGER.info("replica id is set to " + this.replicaId); - int bufferSize = jsonConfig.getInteger(Const.Config.OptOutProducerBufferSizeProp); - this.buffer = ByteBuffer.allocate(bufferSize).order(ByteOrder.LITTLE_ENDIAN); - this.bufferedMessages = new ArrayList>(); - - this.eventDeltaProduced = eventDeltaProduced; - this.eventPartitionProduced = eventPartitionProduced; + this.eventUpload = eventUpload; this.fileUtils = new FileUtils(jsonConfig); @@ -103,17 +70,14 @@ public void start(Promise startPromise) throws Exception { this.healthComponent.setHealthStatus(false, "still starting"); try { - // create a special worker pool for partition producer, so that it doesn't block log producer + // create a special worker pool for partition producer this.partitionProducerExecutor = vertx.createSharedWorkerExecutor("partition-worker-pool"); EventBus eb = vertx.eventBus(); - eb.consumer(Const.Event.EntryAdd, msg -> this.handleEntryAdd(msg)); - eb.consumer(Const.Event.DeltaProduce, msg -> this.handleDeltaProduce(msg)); eb.consumer(Const.Event.PartitionProduce, msg -> 
this.handlePartitionProduce(msg)); - // start delta rotating - this.deltaRotate(false).onComplete( - ar -> startPromise.handle(ar)); + this.mkdirsBlocking(); + startPromise.complete(); } catch (Exception ex) { LOGGER.error(ex.getMessage(), ex); startPromise.fail(new Throwable(ex)); @@ -133,298 +97,42 @@ public void start(Promise startPromise) throws Exception { @Override public void stop(Promise stopPromise) throws Exception { LOGGER.info("shutting down OptOutLogProducer."); - this.deltaRotate(true).onComplete( - ar -> stopPromise.handle(ar)); - stopPromise.future() - .onSuccess(v -> LOGGER.info("stopped OptOutLogProducer")) - .onFailure(e -> LOGGER.error("failed stopping OptOutLogProducer", e)); - } - - public String getLastDelta() { - String[] deltaList = (new File(this.deltaProducerDir)).list(); - if (deltaList == null) return null; - Optional last = Arrays.stream(deltaList) - .sorted(OptOutUtils.DeltaFilenameComparatorDescending) - .findFirst(); - if (last.isPresent()) return Paths.get(this.deltaProducerDir, last.get()).toString(); - return null; - } - - private Future deltaRotate(boolean shuttingDown) { - Promise promise = Promise.promise(); - vertx.executeBlocking( - blockingPromise -> { - try { - String newDelta = this.deltaRotateBlocking(shuttingDown); - if (newDelta != null) this.publishDeltaProducedEvent(newDelta); - ; - blockingPromise.complete(); - } catch (Exception ex) { - LOGGER.error(ex.getMessage(), ex); - blockingPromise.fail(new Throwable(ex)); - } - }, - res -> promise.handle(res) - ); - return promise.future(); - } - - private void publishDeltaProducedEvent(String newDelta) { - assert newDelta != null; - this.counterDeltaProduced.increment(); - vertx.eventBus().publish(this.eventDeltaProduced, newDelta); + if (this.partitionProducerExecutor != null) { + this.partitionProducerExecutor.close(); + } + stopPromise.complete(); + LOGGER.info("stopped OptOutLogProducer"); } - private void publishPartitionProducedEvent(String newPartition) { + private void uploadPartition(String newPartition) { assert newPartition != null; this.counterPartitionProduced.increment(); - vertx.eventBus().publish(this.eventPartitionProduced, newPartition); - } - - private void handleEntryAdd(Message entryMsg) { - if (this.shutdownInProgress) { - // if this event is received after shutdownInProgress is set, there is no file to write to at this point - entryMsg.reply(false); - return; - } - - String body = entryMsg.body(); - if (!body.contains(",")) { - LOGGER.error("unexpected optout entry format: " + body); - // fast fail if the message doesn't contain a comma (identity_hash,advertising_id) - entryMsg.reply(false); - return; - } - - String[] parts = body.split(","); - if (parts.length != 3) { - LOGGER.error("unexpected optout entry format: " + body); - // fast fail if the message doesn't contain a comma (identity_hash,advertising_id) - entryMsg.reply(false); - return; - } - - byte[] identityHash = OptOutUtils.base64StringTobyteArray(parts[0]); - if (identityHash == null) { - LOGGER.error("unexpected optout identity_hash: " + parts[0]); - // fast fail if the message doesn't contain a valid identity_hash - entryMsg.reply(false); - return; - } - - byte[] advertisingId = OptOutUtils.base64StringTobyteArray(parts[1]); - if (advertisingId == null) { - LOGGER.error("unexpected optout identity_hash: " + parts[1]); - // fast fail if the message doesn't contain a valid advertising_id - entryMsg.reply(false); - return; - } - - long timestampEpoch = -1; - try { - timestampEpoch = Long.valueOf(parts[2]); - } 
catch (NumberFormatException e) { - LOGGER.error("unexpected optout timestamp: " + parts[2]); - // fast fail if the message doesn't contain a valid unix epoch timestamp for optout entry - entryMsg.reply(false); - return; + if (this.eventUpload != null) { + LOGGER.info("Partition produced, sending for upload: {}", newPartition); + vertx.eventBus().send(this.eventUpload, newPartition); + } else { + LOGGER.warn("Partition produced but no upload event configured: {}", newPartition); } - - // add current msg to buffer - bufferedMessages.add(entryMsg); - - // if there are no blocking write in progress, start one - if (!this.writeInProgress) { - this.writeInProgress = true; - this.kickoffWrite(); - } - } - - private void kickoffWrite() { - // current msg will be written as a batch - ArrayList batch = new ArrayList(bufferedMessages); - bufferedMessages.clear(); - - vertx.executeBlocking( - promise -> { - this.writeLogBlocking(batch); - promise.complete(); - }, - res -> kickoffWriteCallback() - ); - } - - private void kickoffWriteCallback() { - if (bufferedMessages.size() > 0) - kickoffWrite(); - else - this.writeInProgress = false; - } - - private void handleDeltaProduce(Message m) { - vertx.executeBlocking( - promise -> promise.complete(this.deltaRotateBlocking(false)), - res -> this.publishDeltaProducedEvent(res.result()) - ); } private void handlePartitionProduce(Message msg) { // convert input string into array of delta files to combine into new partition String[] files = OptOutUtils.jsonArrayToStringArray(msg.body()); + LOGGER.info("handlePartitionProduce received event with {} files: {}", files.length, msg.body()); // execute blocking operation using special worker-pool // when completed, publish partition.produced event this.partitionProducerExecutor.executeBlocking( promise -> promise.complete(this.producePartitionBlocking(files)), - res -> this.publishPartitionProducedEvent(res.result()) - ); - } - - // this function is no-throw - private void writeLogBlocking(ArrayList batch) { - if (this.shutdownInProgress) { - // if flag is set, file is already closed and we can't write any more due to verticle being shutdown - assert this.fileChannel == null; - for (Message m : batch) { - m.reply(false); - } - return; - } - - try { - assert this.fileChannel != null; - - // make sure buffer size is large enough - this.checkBufferSize(batch.size() * OptOutConst.EntrySize); - - // write optout entries - for (Message m : batch) { - String body = m.body(); - String[] parts = body.split(","); - assert parts.length == 3; - byte[] identityHash = OptOutUtils.base64StringTobyteArray(parts[0]); - byte[] advertisingId = OptOutUtils.base64StringTobyteArray(parts[1]); - long timestamp = Long.valueOf(parts[2]); - assert identityHash != null && advertisingId != null; - - OptOutEntry.writeTo(buffer, identityHash, advertisingId, timestamp); - } - buffer.flip(); - this.fileChannel.write(buffer); - } catch (Exception ex) { - LOGGER.error("write delta failed: " + ex.getMessage(), ex); - // report unhealthy status - ++this.writeErrorsSinceDeltaOpen; - - // if file write failed, reply false with original messages for entry.add - for (Message m : batch) { - m.reply(false); - } - return; - } finally { - // clearing the buffer - buffer.clear(); - } - - // on success, reply true with original messages - for (Message m : batch) { - m.reply(true); - } - } - - private void checkBufferSize(int dataSize) { - ByteBuffer b = this.buffer; - if (b.capacity() < dataSize) { - int newCapacity = Integer.highestOneBit(dataSize) << 1; - 
LOGGER.warn("Expanding buffer size: current " + b.capacity() + ", need " + dataSize + ", new " + newCapacity); - this.buffer = ByteBuffer.allocate(newCapacity).order(ByteOrder.LITTLE_ENDIAN); - } - } - - // this function is no-throw - private String deltaRotateBlocking(boolean shuttingDown) { - this.mkdirsBlocking(); - String logProduced = null; - - // close current delta file if needed - if (this.fileChannel != null) { - logProduced = this.currentDeltaFileName; - assert logProduced != null; - - // add a special last entry with ffff hash and the current timestamp - buffer.put(OptOutUtils.onesHashBytes); - buffer.put(OptOutUtils.onesHashBytes); - buffer.putLong(OptOutUtils.nowEpochSeconds()); - try { - buffer.flip(); - this.fileChannel.write(buffer); - } catch (Exception ex) { - // report unhealthy status - ++this.writeErrorsSinceDeltaOpen; - LOGGER.error("write last entry to delta failed: " + ex.getMessage(), ex); - assert false; - } finally { - buffer.clear(); - } - - // save old delta file - try { - this.fileChannel.close(); - this.fileChannel = null; - } catch (Exception ex) { - // report unhealthy status - ++this.writeErrorsSinceDeltaOpen; - LOGGER.error("close delta file failed: " + ex.getMessage(), ex); - assert false; - } - } - - // create a new file if not shutting down - if (!shuttingDown) { - // create new delta file - Path logPath = Paths.get(this.newDeltaFileName()); - try { - this.fileChannel = FileChannel.open(logPath, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW, StandardOpenOption.SYNC); - this.currentDeltaFileName = logPath.toString(); - } catch (Exception ex) { - // report unhealthy status - ++this.writeErrorsSinceDeltaOpen; - LOGGER.error("open delta file failed" + ex.getMessage(), ex); - assert false; - } - - // asserting buffer is clear - assert this.buffer.position() == 0; - - // add a special first entry with null hash and the current timestamp - buffer.put(OptOutUtils.nullHashBytes); - buffer.put(OptOutUtils.nullHashBytes); - buffer.putLong(OptOutUtils.nowEpochSeconds()); - try { - buffer.flip(); - this.fileChannel.write(buffer); - } catch (Exception ex) { - // report unhealthy status - ++this.writeErrorsSinceDeltaOpen; - LOGGER.error("write first entry to delta failed: " + ex.getMessage(), ex); - assert false; - } finally { - buffer.clear(); + res -> { + if (res.succeeded()) { + LOGGER.info("partition produced successfully: {}", res.result()); + this.uploadPartition(res.result()); + } else { + LOGGER.error("Failed to produce partition: " + res.cause().getMessage(), res.cause()); + } } - - // reset isHealthy status when a new file is open - this.writeErrorsSinceDeltaOpen = 0; - } - - if (shuttingDown) { - // there is a race condition, between: - // a) the delta being rotated for shutting down - // b) the verticle being shutdown - // and set the flag here to fail those entry.add requests that are received in between - this.shutdownInProgress = true; - } - - return logProduced; + ); } // this function is no throw @@ -461,46 +169,38 @@ private String producePartitionBlocking(String[] logFiles) { private void mkdirsBlocking() { FileSystem fs = vertx.fileSystem(); - if (!fs.existsBlocking(this.deltaProducerDir)) { - fs.mkdirsBlocking(this.deltaProducerDir); - } if (!fs.existsBlocking(this.partitionProducerDir)) { fs.mkdirsBlocking(this.partitionProducerDir); } } - - private void deleteExpiredLogsLocally() throws IOException { - deleteExpiredLogsLocally(this.deltaProducerDir); + private void deleteExpiredLogsLocally() { 
deleteExpiredLogsLocally(this.partitionProducerDir); } - private void deleteExpiredLogsLocally(String dirName) throws IOException { + private void deleteExpiredLogsLocally(String dirName) { if (!Files.exists(Paths.get(dirName))) return; Instant now = Instant.now(); File dir = new File(dirName); File[] files = dir.listFiles(); + if (files == null) return; for (File f : files) { if (fileUtils.isDeltaOrPartitionExpired(now, f.getName())) { - Files.delete(f.toPath()); - LOGGER.warn("deleted expired log: " + f.getName()); + try { + Files.delete(f.toPath()); + LOGGER.warn("deleted expired log: " + f.getName()); + } catch (Exception e) { + LOGGER.error("Error deleting expired log: " + f.getName(), e); + } } } } - private String newDeltaFileName() { - return Paths.get(this.deltaProducerDir, OptOutUtils.newDeltaFileName(this.replicaId)).toString(); - } - private String newPartitionFileName() { return Paths.get(this.partitionProducerDir, OptOutUtils.newPartitionFileName(this.replicaId)).toString(); } - public static String getDeltaProducerDir(JsonObject config) { - return String.format("%s/producer/delta", config.getString(Const.Config.OptOutDataDirProp)); - } - public static String getPartitionProducerDir(JsonObject config) { return String.format("%s/producer/partition", config.getString(Const.Config.OptOutDataDirProp)); } diff --git a/src/main/java/com/uid2/optout/vertx/OptOutServiceVerticle.java b/src/main/java/com/uid2/optout/vertx/OptOutServiceVerticle.java index 5197ee44..20cc0600 100644 --- a/src/main/java/com/uid2/optout/vertx/OptOutServiceVerticle.java +++ b/src/main/java/com/uid2/optout/vertx/OptOutServiceVerticle.java @@ -1,15 +1,12 @@ package com.uid2.optout.vertx; import com.uid2.optout.Const; -import com.uid2.optout.auth.InternalAuthMiddleware; import com.uid2.optout.sqs.SqsMessageOperations; -import com.uid2.optout.web.QuorumWebClient; import com.uid2.shared.Utils; import com.uid2.shared.attest.AttestationTokenService; import com.uid2.shared.attest.IAttestationTokenService; import com.uid2.shared.attest.JwtService; import com.uid2.shared.audit.Audit; -import com.uid2.shared.audit.UidInstanceIdProvider; import com.uid2.shared.auth.IAuthorizableProvider; import com.uid2.shared.auth.OperatorKey; import com.uid2.shared.auth.Role; @@ -24,8 +21,6 @@ import com.uid2.shared.optout.OptOutUtils; import com.uid2.shared.vertx.RequestCapturingHandler; import io.vertx.core.*; -import io.vertx.core.eventbus.DeliveryOptions; -import io.vertx.core.eventbus.Message; import io.vertx.core.http.*; import io.vertx.core.json.JsonObject; import org.slf4j.Logger; @@ -34,10 +29,15 @@ import io.vertx.ext.web.RoutingContext; import io.vertx.ext.web.handler.BodyHandler; import io.vertx.ext.web.handler.CorsHandler; +import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.sqs.SqsClient; +import software.amazon.awssdk.services.sqs.SqsClientBuilder; import software.amazon.awssdk.services.sqs.model.SendMessageRequest; -import software.amazon.awssdk.services.sqs.model.SendMessageResponse; +import java.net.URI; import java.net.URL; import java.time.Instant; import java.util.ArrayList; @@ -62,24 +62,17 @@ public class OptOutServiceVerticle extends AbstractVerticle { private final boolean isVerbose; private final int listenPort; private final int deltaRotateInterval; - 
private final QuorumWebClient replicaWriteClient; - private final DeliveryOptions defaultDeliveryOptions; private final AtomicReference> cloudPaths = new AtomicReference<>(); private final ICloudStorage cloudStorage; private final boolean enableOptOutPartnerMock; - private final String internalApiKey; - private final InternalAuthMiddleware internalAuth; private final SqsClient sqsClient; private final String sqsQueueUrl; - private final boolean sqsEnabled; private final int sqsMaxQueueSize; - private final String podName; public OptOutServiceVerticle(Vertx vertx, IAuthorizableProvider clientKeyProvider, ICloudStorage cloudStorage, - JsonObject jsonConfig, - UidInstanceIdProvider uidInstanceIdProvider) { + JsonObject jsonConfig) { this.healthComponent.setHealthStatus(false, "not started"); this.cloudStorage = cloudStorage; @@ -103,45 +96,41 @@ public OptOutServiceVerticle(Vertx vertx, this.deltaRotateInterval = jsonConfig.getInteger(Const.Config.OptOutDeltaRotateIntervalProp); this.isVerbose = jsonConfig.getBoolean(Const.Config.ServiceVerboseProp, false); - String replicaUrisConfig = jsonConfig.getString(Const.Config.OptOutReplicaUris); - if (replicaUrisConfig == null) { - LOGGER.warn(Const.Config.OptOutReplicaUris + " not configured, not instantiating multi-replica write client"); - this.replicaWriteClient = null; - } else { - String[] replicaUris = replicaUrisConfig.split(","); - this.replicaWriteClient = new QuorumWebClient(vertx, replicaUris, uidInstanceIdProvider); - } - - this.defaultDeliveryOptions = new DeliveryOptions(); - int addEntryTimeoutMs = jsonConfig.getInteger(Const.Config.OptOutAddEntryTimeoutMsProp); - this.defaultDeliveryOptions.setSendTimeout(addEntryTimeoutMs); - - this.internalApiKey = jsonConfig.getString(Const.Config.OptOutInternalApiTokenProp); - this.internalAuth = new InternalAuthMiddleware(this.internalApiKey, "optout"); this.enableOptOutPartnerMock = jsonConfig.getBoolean(Const.Config.OptOutPartnerEndpointMockProp); - this.sqsEnabled = jsonConfig.getBoolean(Const.Config.OptOutSqsEnabledProp, false); this.sqsQueueUrl = jsonConfig.getString(Const.Config.OptOutSqsQueueUrlProp); this.sqsMaxQueueSize = jsonConfig.getInteger(Const.Config.OptOutSqsMaxQueueSizeProp, 0); // 0 = no limit - this.podName = jsonConfig.getString("POD_NAME"); SqsClient tempSqsClient = null; - if (this.sqsEnabled) { - if (this.sqsQueueUrl == null || this.sqsQueueUrl.isEmpty()) { - LOGGER.warn("SQS enabled but queue URL not configured"); - } else { - try { - tempSqsClient = SqsClient.builder().build(); - LOGGER.info("SQS client initialized successfully"); - LOGGER.info("SQS client region: " + tempSqsClient.serviceClientConfiguration().region()); - LOGGER.info("SQS queue URL configured: " + this.sqsQueueUrl); - } catch (Exception e) { - LOGGER.error("Failed to initialize SQS client: " + e.getMessage(), e); - tempSqsClient = null; + if (this.sqsQueueUrl == null || this.sqsQueueUrl.isEmpty()) { + LOGGER.error("sqs_error: queue url not configured"); + } else { + try { + SqsClientBuilder builder = SqsClient.builder(); + + // Support custom endpoint for LocalStack + String awsEndpoint = jsonConfig.getString(Const.Config.AwsSqsEndpointProp); + LOGGER.info("SQS endpoint from config: {}", awsEndpoint); + if (awsEndpoint != null && !awsEndpoint.isEmpty()) { + builder.endpointOverride(URI.create(awsEndpoint)); + String region = jsonConfig.getString("aws_region"); + LOGGER.info("AWS region from config: {}", region); + if (region == null || region.isEmpty()) { + throw new 
IllegalArgumentException("aws_region must be configured when using custom SQS endpoint"); + } + builder.region(Region.of(region)); + // Use static credentials for LocalStack + builder.credentialsProvider(StaticCredentialsProvider.create( + AwsBasicCredentials.create("test", "test"))); + LOGGER.info("SQS client using custom endpoint: {}, region: {}", awsEndpoint, region); } + + tempSqsClient = builder.build(); + LOGGER.info("SQS client initialized successfully for queue: {}", this.sqsQueueUrl); + } catch (Exception e) { + LOGGER.error("Failed to initialize SQS client: " + e.getMessage(), e); + tempSqsClient = null; } - } else { - LOGGER.info("SQS integration disabled"); } this.sqsClient = tempSqsClient; } @@ -214,8 +203,6 @@ private Router createRouter() { .allowedHeader("Access-Control-Allow-Headers") .allowedHeader("Content-Type")); - router.route(Endpoints.OPTOUT_WRITE.toString()) - .handler(internalAuth.handleWithAudit(this::handleWrite)); router.route(Endpoints.OPTOUT_REPLICATE.toString()) .handler(auth.handleWithAudit(this::handleReplicate, Arrays.asList(Role.OPTOUT))); router.route(Endpoints.OPTOUT_REFRESH.toString()) @@ -292,81 +279,18 @@ private void handleHealthCheck(RoutingContext rc) { } private void handleReplicate(RoutingContext routingContext) { - - if(this.sqsEnabled && this.sqsClient != null){ - this.handleQueue(routingContext); - } - HttpServerRequest req = routingContext.request(); - - MultiMap params = req.params(); - String identityHash = req.getParam(IDENTITY_HASH); - String advertisingId = req.getParam(ADVERTISING_ID); - JsonObject body = routingContext.body().asJsonObject(); - HttpServerResponse resp = routingContext.response(); - if (identityHash == null || params.getAll(IDENTITY_HASH).size() != 1) { - this.sendBadRequestError(resp); - return; - } - if (advertisingId == null || params.getAll(ADVERTISING_ID).size() != 1) { - this.sendBadRequestError(resp); - return; - } - - if (!this.isGetOrPost(req)) { - this.sendBadRequestError(resp); - } else if (this.replicaWriteClient == null) { - this.sendInternalServerError(resp, "optout replicas not configured"); - } - else { - try { - this.replicaWriteClient.get(r -> { - r.setQueryParam(IDENTITY_HASH, identityHash); - r.setQueryParam(ADVERTISING_ID, advertisingId); - r.headers().set("Authorization", "Bearer " + internalApiKey); - r.headers().set(Audit.UID_INSTANCE_ID_HEADER, this.replicaWriteClient.getInstanceId()); - return r; - }).onComplete(ar -> { - final String maskedId1 = Utils.maskPii(identityHash); - final String maskedId2 = Utils.maskPii(advertisingId); - if (ar.failed()) { - LOGGER.error("failed sending optout/write to remote endpoints - identity_hash: " + maskedId1 + ", advertising_id: " + maskedId2); - LOGGER.error(ar.cause().getMessage(), new Exception(ar.cause())); - this.sendInternalServerError(resp, ar.cause().toString()); - } else { - String timestamp = null; - for (io.vertx.ext.web.client.HttpResponse replicaResp : ar.result()) { - if (replicaResp != null && replicaResp.statusCode() == 200) { - timestamp = replicaResp.bodyAsString(); - } - } - - if (timestamp == null) { - sendInternalServerError(resp, "Unexpected result calling internal write api"); - } else { - LOGGER.info("sent optout/write to remote endpoints - identity_hash: " + maskedId1 + ", advertising_id: " + maskedId1); - resp.setStatusCode(200) - .setChunked(true) - .write(timestamp); - resp.end(); - } - } - }); - } catch (Exception ex) { - LOGGER.error("error creating requests for remote optout/write call:", ex); - 
this.sendInternalServerError(resp, ex.getMessage()); - } - } - } - - private void handleQueue(RoutingContext routingContext) { - HttpServerRequest req = routingContext.request(); // skip sqs queueing for validator operators (reference-operator, candidate-operator) // this avoids triple processing of the same request String instanceId = req.getHeader(Audit.UID_INSTANCE_ID_HEADER); if (isValidatorOperatorRequest(instanceId)) { + long optoutEpoch = OptOutUtils.nowEpochSeconds(); + resp.setStatusCode(200) + .setChunked(true) + .write(String.valueOf(optoutEpoch)); + resp.end(); return; } @@ -379,22 +303,36 @@ private void handleQueue(RoutingContext routingContext) { String email = body != null ? body.getString(EMAIL) : null; String phone = body != null ? body.getString(PHONE) : null; - HttpServerResponse resp = routingContext.response(); - - // while old delta production is enabled, response is handled by replicate logic - - // validate parameters - same as replicate + // validate parameters if (identityHash == null || params.getAll(IDENTITY_HASH).size() != 1) { - // this.sendBadRequestError(resp); + this.sendBadRequestError(resp); return; } if (advertisingId == null || params.getAll(ADVERTISING_ID).size() != 1) { - // this.sendBadRequestError(resp); + this.sendBadRequestError(resp); + return; + } + + // validate base64 decoding + byte[] hashBytes = OptOutUtils.base64StringTobyteArray(identityHash); + byte[] idBytes = OptOutUtils.base64StringTobyteArray(advertisingId); + if (hashBytes == null) { + this.sendBadRequestError(resp); + return; + } + if (idBytes == null) { + this.sendBadRequestError(resp); + return; + } + + // optout null/ones is not allowed + if (OptOutEntry.isSpecialHash(hashBytes)) { + this.sendBadRequestError(resp); return; } if (!this.isGetOrPost(req)) { - // this.sendBadRequestError(resp); + this.sendBadRequestError(resp); return; } @@ -429,64 +367,19 @@ private void handleQueue(RoutingContext routingContext) { .build(); this.sqsClient.sendMessage(sendMsgRequest); - return null; + return OptOutUtils.nowEpochSeconds(); + }).onSuccess(optoutEpoch -> { + resp.setStatusCode(200) + .setChunked(true) + .write(String.valueOf(optoutEpoch)); + resp.end(); }).onFailure(cause -> { LOGGER.error("failed to queue message, cause={}", cause.getMessage()); + resp.setStatusCode(500).end(); }); } catch (Exception ex) { - // this.sendInternalServerError(resp, ex.getMessage()); - LOGGER.error("Error processing queue request: " + ex.getMessage(), ex); - } - } - - private void handleWrite(RoutingContext routingContext) { - HttpServerRequest req = routingContext.request(); - MultiMap params = req.params(); - String identityHash = req.getParam(IDENTITY_HASH); - String advertisingId = req.getParam(ADVERTISING_ID); - JsonObject body = routingContext.body().asJsonObject(); - - HttpServerResponse resp = routingContext.response(); - if (identityHash == null || params.getAll(IDENTITY_HASH).size() != 1) { - this.sendBadRequestError(resp); - return; - } - if (advertisingId == null || params.getAll(ADVERTISING_ID).size() != 1) { - this.sendBadRequestError(resp); - return; - } - - byte[] hashBytes = OptOutUtils.base64StringTobyteArray(identityHash); - byte[] idBytes = OptOutUtils.base64StringTobyteArray(advertisingId); - if (hashBytes == null) { - this.sendBadRequestError(resp); - } else if (idBytes == null) { - this.sendBadRequestError(resp); - } else if (!this.isGetOrPost(req)) { - this.sendBadRequestError(resp); - } else if (body != null) { - this.sendBadRequestError(resp); - } else if 
(OptOutEntry.isSpecialHash(hashBytes)) { - // optout null/ones is not allowed - this.sendBadRequestError(resp); - } else { - long optoutEpoch = OptOutUtils.nowEpochSeconds(); - String msg = identityHash + "," + advertisingId + "," + String.valueOf(optoutEpoch); - vertx.eventBus().request(Const.Event.EntryAdd, msg, this.defaultDeliveryOptions, - ar -> this.handleEntryAdded(ar, resp, optoutEpoch)); - } - } - - private void handleEntryAdded(AsyncResult> res, HttpServerResponse resp, long optoutEpoch) { - if (res.failed()) { - this.sendInternalServerError(resp, res.cause().toString()); - } else if (!res.result().body().equals(true)) { - this.sendInternalServerError(resp, "Unexpected msg reply: " + res.result().body()); - } else { - resp.setStatusCode(200) - .setChunked(true) - .write(String.valueOf(optoutEpoch)); - resp.end(); + LOGGER.error("Error processing replicate request: " + ex.getMessage(), ex); + resp.setStatusCode(500).end(); } } @@ -517,17 +410,6 @@ private void sendInternalServerError(HttpServerResponse resp, String why) { } } - private void sendServiceUnavailableError(HttpServerResponse resp, String why) { - if (this.isVerbose && why != null) { - resp.setStatusCode(503); - resp.setChunked(true); - resp.write(why); - resp.end(); - } else { - sendStatus(503, resp); - } - } - private void sendBadRequestError(HttpServerResponse response) { sendStatus(400, response); } diff --git a/src/main/java/com/uid2/optout/vertx/OptOutSqsLogProducer.java b/src/main/java/com/uid2/optout/vertx/OptOutSqsLogProducer.java index c33b11dc..c19e20f5 100644 --- a/src/main/java/com/uid2/optout/vertx/OptOutSqsLogProducer.java +++ b/src/main/java/com/uid2/optout/vertx/OptOutSqsLogProducer.java @@ -30,10 +30,17 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.sqs.SqsClient; +import software.amazon.awssdk.services.sqs.SqsClientBuilder; import static com.uid2.optout.util.HttpResponseHelper.*; +import java.net.URI; + import java.io.IOException; import java.util.concurrent.atomic.AtomicReference; @@ -110,7 +117,28 @@ public OptOutSqsLogProducer(JsonObject jsonConfig, ICloudStorage cloudStorage, I if (queueUrl == null || queueUrl.isEmpty()) { throw new IOException("sqs queue url not configured"); } - this.sqsClient = sqsClient != null ? 
diff --git a/src/main/java/com/uid2/optout/vertx/OptOutSqsLogProducer.java b/src/main/java/com/uid2/optout/vertx/OptOutSqsLogProducer.java
index c33b11dc..c19e20f5 100644
--- a/src/main/java/com/uid2/optout/vertx/OptOutSqsLogProducer.java
+++ b/src/main/java/com/uid2/optout/vertx/OptOutSqsLogProducer.java
@@ -30,10 +30,17 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.regions.Region;
 import software.amazon.awssdk.services.sqs.SqsClient;
+import software.amazon.awssdk.services.sqs.SqsClientBuilder;
 
 import static com.uid2.optout.util.HttpResponseHelper.*;
 
+import java.net.URI;
+
 import java.io.IOException;
 import java.util.concurrent.atomic.AtomicReference;
@@ -110,7 +117,28 @@ public OptOutSqsLogProducer(JsonObject jsonConfig, ICloudStorage cloudStorage, I
         if (queueUrl == null || queueUrl.isEmpty()) {
             throw new IOException("sqs queue url not configured");
         }
-        this.sqsClient = sqsClient != null ? sqsClient : SqsClient.builder().build();
+        if (sqsClient != null) {
+            this.sqsClient = sqsClient;
+        } else {
+            SqsClientBuilder builder = SqsClient.builder();
+            // Support custom endpoint for LocalStack
+            String awsEndpoint = jsonConfig.getString(Const.Config.AwsSqsEndpointProp);
+            LOGGER.info("SQS endpoint from config: {}", awsEndpoint);
+            if (awsEndpoint != null && !awsEndpoint.isEmpty()) {
+                builder.endpointOverride(URI.create(awsEndpoint));
+                String region = jsonConfig.getString("aws_region");
+                LOGGER.info("AWS region from config: {}", region);
+                if (region == null || region.isEmpty()) {
+                    throw new IllegalArgumentException("aws_region must be configured when using custom SQS endpoint");
+                }
+                builder.region(Region.of(region));
+                // Use static credentials for LocalStack
+                builder.credentialsProvider(StaticCredentialsProvider.create(
+                        AwsBasicCredentials.create("test", "test")));
+                LOGGER.info("SQS client using custom endpoint: {}, region: {}", awsEndpoint, region);
+            }
+            this.sqsClient = builder.build();
+        }
         LOGGER.info("sqs client initialized for queue: {}", queueUrl);
 
         // http server configuration
@@ -122,7 +150,7 @@ public OptOutSqsLogProducer(JsonObject jsonConfig, ICloudStorage cloudStorage, I
         // circuit breaker tools
         this.trafficFilter = new TrafficFilter(jsonConfig.getString(Const.Config.TrafficFilterConfigPathProp));
-        this.trafficCalculator = new TrafficCalculator(cloudStorage, jsonConfig.getString(Const.Config.OptOutSqsS3FolderProp), jsonConfig.getString(Const.Config.TrafficCalcConfigPathProp));
+        this.trafficCalculator = new TrafficCalculator(cloudStorage, jsonConfig.getString(Const.Config.OptOutS3FolderProp), jsonConfig.getString(Const.Config.TrafficCalcConfigPathProp));
 
         // configuration values for orchestrator setup
         int replicaId = OptOutUtils.getReplicaId(jsonConfig);
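Pulled out of the constructor above, the LocalStack-specific wiring reduces to the sketch below; the endpoint and region values mirror conf/local-e2e-docker-config.json, and "test"/"test" are the dummy credentials LocalStack accepts:

import java.net.URI;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sqs.SqsClient;

class LocalStackSqsClientSketch {
    // Same builder path the constructor takes when aws_sqs_endpoint is set.
    static SqsClient build(String endpoint, String region) {
        return SqsClient.builder()
                .endpointOverride(URI.create(endpoint))               // e.g. http://localstack:5001
                .region(Region.of(region))                            // e.g. us-east-1
                .credentialsProvider(StaticCredentialsProvider.create(
                        AwsBasicCredentials.create("test", "test")))  // dummy creds for LocalStack
                .build();
    }
}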
diff --git a/src/main/java/com/uid2/optout/web/QuorumWebClient.java b/src/main/java/com/uid2/optout/web/QuorumWebClient.java
deleted file mode 100644
index f5c99aa9..00000000
--- a/src/main/java/com/uid2/optout/web/QuorumWebClient.java
+++ /dev/null
@@ -1,82 +0,0 @@
-package com.uid2.optout.web;
-
-import com.uid2.shared.audit.UidInstanceIdProvider;
-import io.vertx.core.Future;
-import io.vertx.core.Promise;
-import io.vertx.core.Vertx;
-import io.vertx.core.buffer.Buffer;
-import io.vertx.core.http.HttpMethod;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import io.vertx.ext.web.client.HttpRequest;
-import io.vertx.ext.web.client.HttpResponse;
-import io.vertx.ext.web.client.WebClient;
-
-import java.net.URI;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Function;
-
-public class QuorumWebClient {
-    private static final Logger LOGGER = LoggerFactory.getLogger(QuorumWebClient.class);
-    private final UidInstanceIdProvider uidInstanceIdProvider;
-
-    final URI[] uris;
-    final WebClient[] clients;
-
-    public QuorumWebClient(Vertx vertx, String[] uris, UidInstanceIdProvider uidInstanceIdProvider) {
-        this.uris = new URI[uris.length];
-        this.clients = new WebClient[uris.length];
-        for (int i = 0; i < uris.length; ++i) {
-            LOGGER.info("creating QuorumWebClient " + i + " " + uris[i].toString());
-            this.uris[i] = URI.create(uris[i]);
-            this.clients[i] = WebClient.create(vertx);
-        }
-        this.uidInstanceIdProvider = uidInstanceIdProvider;
-    }
-
-    public String getInstanceId() {
-        return this.uidInstanceIdProvider.getInstanceId();
-    }
-
-    public Future[]> get(Function, HttpRequest> requestCreator) {
-        Promise[]> promise = Promise.promise();
-        HttpResponse[] resps = new HttpResponse[this.uris.length];
-        AtomicInteger succeeded = new AtomicInteger(0);
-        AtomicInteger completed = new AtomicInteger(0);
-        for (int i = 0; i < this.uris.length; ++i) {
-            final int iterations = i;
-            final URI uri = this.uris[i];
-            HttpRequest req = this.clients[i].requestAbs(HttpMethod.GET, uri.toString());
-            requestCreator.apply(req).send(ar -> {
-                final int quorum = this.quorumThreshold();
-                if (ar.succeeded()) {
-                    HttpResponse resp = ar.result();
-                    resps[iterations] = resp;
-
-                    if (resp.statusCode() != 200) {
-                        LOGGER.error("remote optout/write request " + uri + " returned " + resp.statusCode());
-                    } else if (succeeded.incrementAndGet() == quorum) {
-                        promise.complete(resps);
-                        return;
-                    }
-                } else {
-                    LOGGER.error("Failed sending request to " + uri, ar.cause());
-                }
-
-                if (completed.incrementAndGet() == this.uris.length) {
-                    // TODO construct aggregate errors
-                    promise.fail(new Throwable("Failed on quorum"));
-                }
-            });
-        }
-        return promise.future();
-    }
-
-    private int quorumThreshold() {
-        //// Uncomment this to use simple majority quorum
-        // return this.uris.length / 2 + 1;
-
-        // As of now, quorum is simply 1 replica returns 200
-        return 1;
-    }
-}
diff --git a/src/main/resources/localstack/init-aws.sh b/src/main/resources/localstack/init-aws.sh
index 58e3fabf..e4b51cdc 100755
--- a/src/main/resources/localstack/init-aws.sh
+++ b/src/main/resources/localstack/init-aws.sh
@@ -1,9 +1,18 @@
 #!/usr/bin/env bash
+echo "Starting optout LocalStack initialization..."
+
+# Set region to match optout service config
+export AWS_DEFAULT_REGION=us-east-1
+
 date="$(date '+%Y-%m-%d')"
 full_ts="$(date '+%Y-%m-%dT%H.%M.%SZ')"
 delta_file="optout-delta-000_${full_ts}_64692b14.dat"
 
-aws s3 --endpoint-url http://localhost:5001 mb s3://test-optout-bucket
-aws s3 --endpoint-url http://localhost:5001 cp /s3/optout/optout-v2/delta/2023-01-01/ s3://test-optout-bucket/optout-v2/delta/2023-01-01/ --recursive
-aws s3 --endpoint-url http://localhost:5001 cp /s3/optout/optout-v2/delta/optout-delta-000.dat "s3://test-optout-bucket/optout-v2/delta/${date}/${delta_file}"
\ No newline at end of file
+awslocal s3 mb s3://test-optout-bucket || echo "Bucket may already exist"
+
+awslocal s3 cp /s3/optout/optout-v2/delta/2023-01-01/ s3://test-optout-bucket/optout-v2/delta/2023-01-01/ --recursive
+awslocal s3 cp /s3/optout/optout-v2/delta/optout-delta-000.dat "s3://test-optout-bucket/optout-v2/delta/${date}/${delta_file}"
+
+awslocal sqs create-queue --queue-name optout-queue
+echo "Queue creation exit code: $?"
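If the init script above ran cleanly, the queue can be sanity-checked from Java against the same LocalStack edge port; a minimal sketch, assuming port 5001 is reachable from the host as in the script:

import java.net.URI;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.GetQueueUrlRequest;

class LocalStackQueueCheck {
    public static void main(String[] args) {
        try (SqsClient sqs = SqsClient.builder()
                .endpointOverride(URI.create("http://localhost:5001"))
                .region(Region.US_EAST_1)
                .credentialsProvider(StaticCredentialsProvider.create(
                        AwsBasicCredentials.create("test", "test")))
                .build()) {
            // Throws QueueDoesNotExistException if init-aws.sh did not create the queue.
            String url = sqs.getQueueUrl(GetQueueUrlRequest.builder()
                    .queueName("optout-queue").build()).queueUrl();
            System.out.println("optout-queue is available at " + url);
        }
    }
}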
diff --git a/src/test/java/com/uid2/optout/TestUtils.java b/src/test/java/com/uid2/optout/TestUtils.java
index 541349c8..c358b3d0 100644
--- a/src/test/java/com/uid2/optout/TestUtils.java
+++ b/src/test/java/com/uid2/optout/TestUtils.java
@@ -1,9 +1,7 @@
 package com.uid2.optout;
 
-import com.uid2.optout.vertx.OptOutLogProducer;
 import com.uid2.optout.vertx.OptOutServiceVerticle;
 import com.uid2.optout.vertx.TestOperatorKeyProvider;
-import com.uid2.shared.audit.UidInstanceIdProvider;
 import com.uid2.shared.optout.*;
 import io.vertx.core.Vertx;
 import io.vertx.core.json.JsonObject;
@@ -20,12 +18,8 @@ import java.util.stream.Collectors;
 
 public class TestUtils {
-    public static OptOutLogProducer createOptOutLogProducer(Vertx vertx, JsonObject config) throws Exception {
-        return new OptOutLogProducer(config);
-    }
-
-    public static OptOutServiceVerticle createOptOutService(Vertx vertx, JsonObject config, UidInstanceIdProvider uidInstanceIdProvider) throws Exception {
-        return new OptOutServiceVerticle(vertx, new TestOperatorKeyProvider(), null, config, uidInstanceIdProvider);
+    public static OptOutServiceVerticle createOptOutService(Vertx vertx, JsonObject config) throws Exception {
+        return new OptOutServiceVerticle(vertx, new TestOperatorKeyProvider(), null, config);
     }
 
     public static OptOutEntry[] toEntries(long... ids) {
diff --git a/src/test/java/com/uid2/optout/vertx/OptOutLogProducerTest.java b/src/test/java/com/uid2/optout/vertx/OptOutLogProducerTest.java
index bca39067..b383dfd4 100644
--- a/src/test/java/com/uid2/optout/vertx/OptOutLogProducerTest.java
+++ b/src/test/java/com/uid2/optout/vertx/OptOutLogProducerTest.java
@@ -5,7 +5,6 @@
 import com.uid2.shared.optout.*;
 import com.uid2.shared.vertx.VertxUtils;
 import io.vertx.core.*;
-import io.vertx.core.eventbus.DeliveryOptions;
 import io.vertx.core.eventbus.MessageConsumer;
 import io.vertx.core.json.JsonObject;
 import io.vertx.ext.unit.Async;
@@ -16,19 +15,14 @@
 import org.junit.Test;
 import org.junit.runner.RunWith;
 
-import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
-import java.nio.file.Path;
 import java.nio.file.Paths;
-import java.time.Instant;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Set;
 
 @RunWith(VertxUnitRunner.class)
 public class OptOutLogProducerTest {
-    // set data_dir option to use tmpDir during test
+    private static final String TEST_UPLOAD_EVENT = "test.partition.upload";
     private Vertx vertx;
 
     public OptOutLogProducerTest() throws IOException {
@@ -40,24 +34,15 @@ public void setup(TestContext context) throws Exception {
         JsonObject config = VertxUtils.getJsonConfig(vertx);
         // set data_dir option to use tmpDir during test
         config.put(Const.Config.OptOutDataDirProp, OptOutUtils.tmpDir);
-        OptOutLogProducer producer = TestUtils.createOptOutLogProducer(vertx, config);
+        OptOutLogProducer producer = new OptOutLogProducer(config, TEST_UPLOAD_EVENT);
         vertx.deployVerticle(producer, context.asyncAssertSuccess());
     }
 
     @After
     public void tearDown(TestContext context) {
-        // context.assertTrue(OptOutLogProducer.instance.isHealthy());
         vertx.close(context.asyncAssertSuccess());
     }
 
-    @Test
-    public void callLogProduce_expectLogProduced(TestContext ctx) {
-        long[] p = OptOutUtils.toLongArray(0);
-        long[] n = OptOutUtils.toLongArray(1, 2, 3);
-        verifySuccessLogProduce(ctx).onFailure(ex -> ctx.fail(ex))
-            .onSuccess(logFile -> verifyLog(ctx, logFile, p, n));
-    }
-
     @Test
     public void callSnapshotProduce_expectSnapshotProduced(TestContext ctx) {
         String log1 = TestUtils.newDeltaFile(1, 2, 3);
@@ -72,233 +57,19 @@ public void callSnapshotProduce_expectSnapshotProduced(TestContext ctx) {
             .onSuccess(logFile -> verifySnapshot(ctx, logFile, p, n));
     }
 
-    @Test
-    public void addEntry_verifyResults(TestContext ctx) {
-        verifySuccessEntryAdd(ctx, OptOutEntry.idHashB64FromLong(1));
-    }
-
-    @Test
-    public void addEntriesParallel_verifyResults(TestContext ctx) {
-        for (int i = 0; i < 100; ++i) {
-            verifySuccessEntryAdd(ctx, OptOutEntry.idHashB64FromLong(i));
-        }
-    }
-
-    @Test
-    public void addEntriesSerial_verifyResults(TestContext ctx) {
-        Future f = Future.succeededFuture();
-        for (int i = 0; i < 100; ++i) {
-            final long id = i;
-            f = f.compose(v -> verifySuccessEntryAdd(ctx, OptOutEntry.idHashB64FromLong(id)));
-        }
-        f.onComplete(ctx.asyncAssertSuccess());
-    }
-
-    @Test
-    public void addEntriesParallelAndProduce_verifyResults(TestContext ctx) {
-        List fs = new ArrayList<>();
-        List p = new ArrayList<>();
-        List n = new ArrayList<>();
-        for (long i = 0; i < 100; ++i) {
-            p.add(i);
-            n.add(1000 + i);
-            fs.add(verifySuccessEntryAdd(ctx, OptOutEntry.idHashB64FromLong(i)));
-        }
-
-        CompositeFuture.all(fs)
-            .compose(v -> verifySuccessLogProduce(ctx))
-            .onFailure(ex -> ctx.fail(ex))
-            .onSuccess(logFile -> verifyLog(ctx, logFile, OptOutUtils.toArray(p), OptOutUtils.toArray(n)));
-    }
-
-    @Test
-    public void addEntriesSerialAndProduce_verifyResults(TestContext ctx) {
-        Future f = Future.succeededFuture();
-        List p = new ArrayList<>();
-        List n = new ArrayList<>();
-        for (long i = 0; i < 100; ++i) {
-            p.add(i);
-            n.add(1000 + i);
-            final long id = i;
-            f = f.compose(v -> verifySuccessEntryAdd(ctx, OptOutEntry.idHashB64FromLong(id)));
-        }
-
-        f.compose(v -> verifySuccessLogProduce(ctx))
-            .onFailure(ex -> ctx.fail(ex))
-            .onSuccess(logFile -> verifyLog(ctx, logFile, OptOutUtils.toArray(p), OptOutUtils.toArray(n)));
-    }
-
-    @Test
-    public void shutdown_verifyLogProduced(TestContext ctx) {
-        verifySuccessLogProduceOnShutdown(ctx).onComplete(ctx.asyncAssertSuccess());
-    }
-
-    @Test
-    public void shutdownAfterEntriesAdded_verifyLogProduced(TestContext ctx) {
-        List fs = new ArrayList<>();
-        for (long i = 0; i < 100; ++i) {
-            fs.add(verifyReceivedEntryAdd(ctx, OptOutEntry.idHashB64FromLong(i)));
-        }
-
-        CompositeFuture.all(fs)
-            .compose(v -> verifySuccessLogProduceOnShutdown(ctx))
-            .onComplete(ctx.asyncAssertSuccess());
-    }
-
-    @Test
-    public void shutdownBeforeEntriesAdded_verifyLogProduced(TestContext ctx) {
-        List fs = new ArrayList<>();
-        fs.add(verifySuccessLogProduceOnShutdown(ctx));
-        for (long i = 0; i < 100; ++i) {
-            fs.add(verifyReceivedEntryAdd(ctx, OptOutEntry.idHashB64FromLong(i)));
-        }
-
-        CompositeFuture.all(fs)
-            .onComplete(ctx.asyncAssertSuccess());
-    }
-
-    @Test
-    public void shutdownWhileEntriesAdded_verifyLogProduced(TestContext ctx) {
-        List fs = new ArrayList<>();
-        for (long i = 0; i < 100; ++i) {
-            fs.add(verifyReceivedEntryAdd(ctx, OptOutEntry.idHashB64FromLong(i)));
-        }
-        fs.add(verifySuccessLogProduceOnShutdown(ctx));
-        for (long i = 0; i < 100; ++i) {
-            fs.add(verifyReceivedEntryAdd(ctx, OptOutEntry.idHashB64FromLong(100 + i)));
-        }
-
-        CompositeFuture.all(fs)
-            .onComplete(ctx.asyncAssertSuccess());
-    }
-
-    @Test
-    public void internal_testLog(TestContext ctx) throws IOException {
-        long[] p = OptOutUtils.toLongArray(1, 2, 3);
-        long[] n = OptOutUtils.toLongArray(4, 5, 6);
-        String log = TestUtils.newDeltaFile(p);
-        verifyLog(ctx, log, p, n);
-    }
-
     @Test
     public void internal_testSnapshot(TestContext ctx) throws IOException {
         long[] p = OptOutUtils.toLongArray(1, 2, 3);
         long[] n = OptOutUtils.toLongArray(4, 5, 6);
         String snapshot = TestUtils.newPartitionFile(p);
-        verifyLog(ctx, snapshot, p, n);
-    }
-
-    private Future verifyReceivedEntryAdd(TestContext ctx, String identityHash) {
-        return this.verifyReceivedEntryAdd(ctx, identityHash, identityHash);
-    }
-
-    private Future verifyReceivedEntryAdd(TestContext ctx, String identityHash, String advertisingId) {
-        Promise promise = Promise.promise();
-        long ts = Instant.now().getEpochSecond();
-        String msg = identityHash + "," + advertisingId + "," + String.valueOf(ts);
-        Async async = ctx.async();
-        DeliveryOptions opts = new DeliveryOptions();
-        opts.setSendTimeout(500);
-        vertx.eventBus().request(Const.Event.EntryAdd, msg, opts, ar -> {
-            // System.out.format("msg: %s, isFailed: %b\n", msg, ar.failed());
-            if (ar.failed()) {
-                promise.complete();
-            } else if (!ar.result().body().equals(true) && !ar.result().body().equals(false)) {
-                promise.fail("Unknonwn msg resp: " + ar.result().body());
-                ctx.fail();
-            } else {
-                promise.complete();
-            }
-            async.complete();
-        });
-        return promise.future();
-    }
-
-    private Future verifySuccessEntryAdd(TestContext ctx, String identityHash) {
-        // using the same value for identity_hash and advertising_id
-        return this.verifySuccessEntryAdd(ctx, identityHash, identityHash);
-    }
-
-    private Future verifySuccessEntryAdd(TestContext ctx, String identityHash, String advertisingId) {
-        Promise promise = Promise.promise();
-        String msg = identityHash + "," + advertisingId + "," + Instant.now().getEpochSecond();
-        Async async = ctx.async();
-        vertx.eventBus().request(Const.Event.EntryAdd, msg, ar -> {
-            if (ar.failed()) {
-                promise.fail(ar.cause());
-                ctx.fail();
-            } else if (!ar.result().body().equals(true)) {
-                promise.fail("Unknonwn msg resp: " + ar.result());
-                ctx.fail();
-            } else {
-                promise.complete();
-            }
-            async.complete();
-        });
-        return promise.future();
-    }
-
-    private Future verifySuccessLogProduce(TestContext ctx) {
-        Promise promise = Promise.promise();
-        Async async = ctx.async();
-
-        MessageConsumer c = vertx.eventBus().consumer(Const.Event.DeltaProduced);
-        c.handler(m -> {
-            String newLog = m.body();
-            ctx.assertTrue(newLog != null);
-            ctx.assertTrue(OptOutUtils.isDeltaFile(newLog));
-
-            Path newLogPath = Paths.get(newLog);
-            ctx.assertTrue(Files.exists(newLogPath));
-            File newLogFile = new File(newLog);
-            long fileSize = newLogFile.length();
-
-            // file size must be multiples of optout entry size
-            ctx.assertTrue(fileSize > 0);
-            ctx.assertTrue(0 == (fileSize % OptOutConst.EntrySize));
-
-            c.unregister(); // unregister this consumer once validation is done
-            async.complete();
-            promise.complete(newLog);
-        });
-
-        vertx.eventBus().send(Const.Event.DeltaProduce, null);
-        return promise.future();
-    }
-
-    private Future verifySuccessLogProduceOnShutdown(TestContext ctx) {
-        Promise promise = Promise.promise();
-        Async async = ctx.async();
-
-        MessageConsumer c = vertx.eventBus().consumer(Const.Event.DeltaProduced);
-        c.handler(m -> {
-            String newLog = m.body();
-            ctx.assertTrue(newLog != null);
-            ctx.assertTrue(OptOutUtils.isDeltaFile(newLog));
-
-            Path newLogPath = Paths.get(newLog);
-            ctx.assertTrue(Files.exists(newLogPath));
-            File newLogFile = new File(newLog);
-            long fileSize = newLogFile.length();
-
-            // file size must be multiples of optout entry size
-            ctx.assertTrue(fileSize > 0);
-            ctx.assertTrue(0 == (fileSize % OptOutConst.EntrySize));
-
-            c.unregister(); // unregister this consumer once validation is done
-            async.complete();
-            promise.complete(newLog);
-        });
-
-        vertx.deploymentIDs().forEach(vertx::undeploy);
-        return promise.future();
+        verifySnapshot(ctx, snapshot, p, n);
     }
 
     private Future verifySuccessSnapshotProduce(TestContext ctx, String msg) {
         Promise promise = Promise.promise();
         Async async = ctx.async();
-        MessageConsumer c = vertx.eventBus().consumer(Const.Event.PartitionProduced);
+        MessageConsumer c = vertx.eventBus().consumer(TEST_UPLOAD_EVENT);
         c.handler(m -> {
             String newSnap = m.body();
             ctx.assertTrue(newSnap != null);
@@ -312,28 +83,6 @@ private Future verifySuccessSnapshotProduce(TestContext ctx, String msg)
         return promise.future();
     }
 
-    private void verifyLog(TestContext ctx, String logFile, long[] positiveIds, long[] negativeIds) {
-        try {
-            Set pSet = OptOutUtils.toSet(positiveIds);
-            Set nSet = OptOutUtils.toSet(negativeIds);
-            byte[] data = Files.readAllBytes(Paths.get(logFile));
-            OptOutCollection s = new OptOutCollection(data);
-
-            s.forEach(e -> {
-                if (e.isSpecialHash()) return;
-                long idHash = e.idHashAsLong();
-                long adsId = e.advertisingIdAsLong();
-                System.out.format("check idHash %d, adsId %d\n", idHash, adsId);
-                ctx.assertTrue(pSet.contains(idHash));
-                ctx.assertFalse(nSet.contains(idHash));
-                ctx.assertTrue(pSet.contains(adsId));
-                ctx.assertFalse(nSet.contains(adsId));
-            });
-        } catch (IOException ex) {
-            ctx.fail(ex);
-        }
-    }
-
     private void verifySnapshot(TestContext ctx, String snapshotFile, long[] positiveIds, long[] negativeIds) {
         try {
             Set pSet = OptOutUtils.toSet(positiveIds);
@@ -343,13 +92,11 @@ private void verifySnapshot(TestContext ctx, String snapshotFile, long[] positiv
             // verify all positive ids are contained in snapshot
             for (long id : positiveIds) {
-                // System.out.format("pos: %d\n", id);
                 ctx.assertTrue(s.contains(OptOutEntry.idHashFromLong(id)));
             }
 
             // verify all negative ids are not contained in snapshot
             for (long id : negativeIds) {
-                // System.out.format("neg: %d\n", id);
                 ctx.assertFalse(s.contains(OptOutEntry.idHashFromLong(id)));
             }
diff --git a/src/test/java/com/uid2/optout/vertx/OptOutServiceVerticleTest.java b/src/test/java/com/uid2/optout/vertx/OptOutServiceVerticleTest.java
index 28b03b27..386b8de7 100644
--- a/src/test/java/com/uid2/optout/vertx/OptOutServiceVerticleTest.java
+++ b/src/test/java/com/uid2/optout/vertx/OptOutServiceVerticleTest.java
@@ -2,12 +2,8 @@
 import com.uid2.optout.Const;
 import com.uid2.optout.TestUtils;
-import com.uid2.optout.web.QuorumWebClient;
-import com.uid2.shared.audit.UidInstanceIdProvider;
-import com.uid2.shared.optout.OptOutEntry;
 import com.uid2.shared.optout.OptOutUtils;
 import com.uid2.shared.vertx.VertxUtils;
-import io.vertx.core.CompositeFuture;
 import io.vertx.core.Future;
 import io.vertx.core.Promise;
 import io.vertx.core.Vertx;
@@ -22,27 +18,17 @@
 import org.junit.runner.RunWith;
 
 import java.util.ArrayList;
-import java.util.List;
 
 @RunWith(VertxUnitRunner.class)
 public class OptOutServiceVerticleTest {
     private static final String INTERNAL_TEST_KEY = "test-operator-key";
-    private static final String INTERNAL_OPTOUT_KEY = "test-optout-operator-key";
     private static Vertx vertx;
-    private static final UidInstanceIdProvider uidInstanceIdProvider = new UidInstanceIdProvider("test-instance", "id");
 
     @BeforeClass
     public static void suiteSetup(TestContext context) throws Exception {
         vertx = Vertx.vertx();
         JsonObject config = VertxUtils.getJsonConfig(vertx);
-        deployLogProducer(context, config)
-            .compose(v -> {
-                try {
-                    return deployService(context, config);
-                } catch (Exception e) {
-                    return Future.failedFuture(e);
-                }
-            })
+        deployService(context, config)
             .onComplete(context.asyncAssertSuccess());
     }
@@ -51,24 +37,14 @@ public static void suiteTearDown(TestContext context) {
         vertx.close(context.asyncAssertSuccess());
     }
-
-    private static Future deployLogProducer(TestContext context, JsonObject config) throws Exception {
+    private static Future deployService(TestContext context, JsonObject config) throws Exception {
         Promise promise = Promise.promise();
-        // set data_dir option to use tmpDir during test
         config
             .put(Const.Config.OptOutDataDirProp, OptOutUtils.tmpDir)
-            .put(Const.Config.OptOutInternalApiTokenProp, INTERNAL_TEST_KEY)
-            .put(Const.Config.OptOutReplicaUris, "http://127.0.0.1:8081/optout/write,http://127.0.0.1:8081/optout/write,http://127.0.0.1:8081/optout/write");
+            .put(Const.Config.OptOutInternalApiTokenProp, INTERNAL_TEST_KEY);
 
-        OptOutLogProducer producer = TestUtils.createOptOutLogProducer(vertx, config);
-        vertx.deployVerticle(producer, ar -> promise.handle(ar));
-        return promise.future();
-    }
-
-    private static Future deployService(TestContext context, JsonObject config) throws Exception {
-        Promise promise = Promise.promise();
-        OptOutServiceVerticle svc = TestUtils.createOptOutService(vertx, config, uidInstanceIdProvider);
+        OptOutServiceVerticle svc = TestUtils.createOptOutService(vertx, config);
         vertx.deployVerticle(svc, ar -> {
             // set an empty cloud paths
             svc.setCloudPaths(new ArrayList<>());
@@ -77,142 +53,20 @@ private static Future deployService(TestContext context, JsonObject conf
         return promise.future();
     }
 
-    @Test
-    public void writeNull_expect400(TestContext context) {
-        verifyStatus(context, writeQuery(OptOutUtils.nullHashBytes), 400);
-    }
-
-    @Test
-    public void writeOnes_expect400(TestContext context) {
-        verifyStatus(context, writeQuery(OptOutUtils.onesHashBytes), 400);
-    }
-
-    @Test
-    public void writeId_expect200(TestContext context) {
-        verifyStatus(context, writeQuery(100), 200);
-    }
-
     @Test
     public void getHealthCheck_expectOK(TestContext context) {
         verifyStatus(context, Endpoints.OPS_HEALTHCHECK.toString(), 200);
     }
 
     @Test
-    public void writeMultiple_expect200(TestContext context) {
-        Future f = Future.succeededFuture();
-        for (int i = 0; i < 3; ++i) {
-            final long id = 1 + i * 100;
-            f.compose(v -> verifyStatus(context, writeQuery(id), 200));
-        }
-        f.onComplete(context.asyncAssertSuccess());
-    }
-
-    @Test
-    public void writeIdsSerial_expect200(TestContext context) {
-        Future f = Future.succeededFuture();
-        for (int i = 0; i < 100; ++i) {
-            final long id = 100 + i;
-            f.compose(v -> verifyStatus(context, writeQuery(id), 200));
-        }
-        f.onComplete(context.asyncAssertSuccess());
-    }
-
-    @Test
-    public void writeIdsParallel_expect200(TestContext context) {
-        List fs = new ArrayList();
-        for (int i = 0; i < 100; ++i) {
-            final long id = 100 + i;
-            fs.add(verifyStatus(context, writeQuery(id), 200));
-        }
-        CompositeFuture.all(fs).onComplete(context.asyncAssertSuccess());
-    }
-
-    // optout/add forwards request to remote optout/write api endpoints
-    @Test
-    public void replicateWithoutOptoutRole_expect401(TestContext context) {
-        verifyStatus(context, replicateQuery(234), 401);
-    }
-
-    @Test
-    public void replicate_expect200(TestContext context) {
-        verifyStatus(context, replicateQuery(234), 200, INTERNAL_OPTOUT_KEY);
-    }
-
-    @Test
-    public void testQuorumClient_expectSuccess(TestContext context) {
-        String[] uris = new String[3];
-        for (int i = 0; i < 3; ++i) {
-            uris[i] = String.format("http://127.0.0.1:%d%s", Const.Port.ServicePortForOptOut, Endpoints.OPTOUT_WRITE);
-        }
-
-        QuorumWebClient quorumClient = new QuorumWebClient(vertx, uris, uidInstanceIdProvider);
-        quorumClient.get(req -> {
-            req.addQueryParam(OptOutServiceVerticle.IDENTITY_HASH, OptOutEntry.idHashB64FromLong(123));
-            req.addQueryParam(OptOutServiceVerticle.ADVERTISING_ID, OptOutEntry.idHashB64FromLong(456));
-            req.bearerTokenAuthentication(INTERNAL_TEST_KEY);
-            return req;
-        }).onComplete(context.asyncAssertSuccess());
-    }
-
-    @Test
-    public void testQuorumClient1Failure_expectSuccess(TestContext context) {
-        String[] uris = new String[3];
-        for (int i = 0; i < 2; ++i) {
-            uris[i] = String.format("http://127.0.0.1:%d%s", Const.Port.ServicePortForOptOut, Endpoints.OPTOUT_WRITE);
-        }
-        uris[2] = "http://httpstat.us/404";
-
-        QuorumWebClient quorumClient = new QuorumWebClient(vertx, uris, uidInstanceIdProvider);
-        quorumClient.get(req -> {
-            req.addQueryParam(OptOutServiceVerticle.IDENTITY_HASH, OptOutEntry.idHashB64FromLong(123));
-            req.addQueryParam(OptOutServiceVerticle.ADVERTISING_ID, OptOutEntry.idHashB64FromLong(456));
-            req.bearerTokenAuthentication(INTERNAL_TEST_KEY);
-            return req;
-        }).onComplete(context.asyncAssertSuccess());
-    }
-
-    @Test
-    public void testQuorumClientAllFailures_expectSuccess(TestContext context) {
-        String[] uris = new String[3];
-        for (int i = 0; i < 3; ++i) {
-            uris[i] = "http://httpstat.us/404";
-        }
-
-        QuorumWebClient quorumClient = new QuorumWebClient(vertx, uris, uidInstanceIdProvider);
-        quorumClient.get(req -> {
-            req.addQueryParam(OptOutServiceVerticle.IDENTITY_HASH, OptOutEntry.idHashB64FromLong(123));
-            req.addQueryParam(OptOutServiceVerticle.ADVERTISING_ID, OptOutEntry.idHashB64FromLong(456));
-            return req;
-        }).onComplete(context.asyncAssertFailure());
-    }
-
-    private String writeQuery(long id) {
-        return this.writeQuery(OptOutEntry.idHashB64FromLong(id));
-    }
-
-    private String writeQuery(String identityHashB64) {
-        return this.writeQuery(identityHashB64, identityHashB64);
-    }
-
-    private String writeQuery(byte[] identityHash) {
-        return this.writeQuery(identityHash, identityHash);
-    }
-
-    private String writeQuery(byte[] identityHash, byte[] advertisingId) {
-        return this.writeQuery(OptOutUtils.byteArrayToBase64String(identityHash),
-            OptOutUtils.byteArrayToBase64String(advertisingId));
-    }
-
-    private String writeQuery(String identityHashB64, String advertisingIdB64) {
-        return String.format("%s?%s=%s&%s=%s", Endpoints.OPTOUT_WRITE,
-            OptOutServiceVerticle.IDENTITY_HASH,
-            identityHashB64,
-            OptOutServiceVerticle.ADVERTISING_ID,
-            advertisingIdB64);
+    public void replicateWithoutAuth_expect401(TestContext context) {
+        verifyStatus(context, replicateQuery(234), 401, null);
     }
 
     private String replicateQuery(long id) {
-        return this.replicateQuery(OptOutEntry.idHashB64FromLong(id),
-            OptOutEntry.idHashB64FromLong(id));
+        return this.replicateQuery(
+            com.uid2.shared.optout.OptOutEntry.idHashB64FromLong(id),
+            com.uid2.shared.optout.OptOutEntry.idHashB64FromLong(id));
     }
 
     private String replicateQuery(String identityHashB64, String advertisingIdB64) {
@@ -222,6 +76,7 @@ private String replicateQuery(String identityHashB64, String advertisingIdB64) {
             OptOutServiceVerticle.ADVERTISING_ID,
             advertisingIdB64);
     }
+
     private Future verifyStatus(TestContext context, String pq, int status) {
         return verifyStatus(context, pq, status, INTERNAL_TEST_KEY);
     }
@@ -232,15 +87,18 @@ private Future verifyStatus(TestContext context, String pq, int status, St
         int port = Const.Port.ServicePortForOptOut;
         vertx.createHttpClient()
             .request(HttpMethod.GET, port, "127.0.0.1", pq)
-            .compose(req -> req
-                .putHeader("Authorization", "Bearer " + token)
-                .send()
+            .compose(req -> {
+                if (token != null) {
+                    req.putHeader("Authorization", "Bearer " + token);
+                }
+                return req.send()
                 .compose(resp -> {
                     context.assertEquals(status, resp.statusCode());
                     async.complete();
                     promise.complete();
                     return resp.body();
-                }));
+                });
+            });
         return promise.future();
     }
 }
diff --git a/src/test/java/com/uid2/optout/vertx/OptOutSqsLogProducerTest.java b/src/test/java/com/uid2/optout/vertx/OptOutSqsLogProducerTest.java
index 556f72ca..0f92be13 100644
--- a/src/test/java/com/uid2/optout/vertx/OptOutSqsLogProducerTest.java
+++ b/src/test/java/com/uid2/optout/vertx/OptOutSqsLogProducerTest.java
@@ -81,7 +81,7 @@ public void setup(Vertx vertx, VertxTestContext testContext) throws Exception {
             .put(Const.Config.TrafficCalcConfigPathProp, TRAFFIC_CALC_CONFIG_PATH)
             .put(Const.Config.ManualOverrideS3PathProp, MANUAL_OVERRIDE_S3_PATH)
             .put(Const.Config.OptOutS3BucketDroppedRequestsProp, TEST_BUCKET_DROPPED_REQUESTS)
-            .put(Const.Config.OptOutSqsS3FolderProp, S3_DELTA_PREFIX)
+            .put(Const.Config.OptOutS3FolderProp, S3_DELTA_PREFIX)
             .put(Const.Config.OptOutMaxMessagesPerFileProp, 100);
 
         // mock cloud sync to return proper s3 paths
@@ -133,7 +133,7 @@ public void setup(Vertx vertx, VertxTestContext testContext) throws Exception {
         }
 
         // create producer with mocks
-        producer = new OptOutSqsLogProducer(config, cloudStorage, cloudStorageDroppedRequests, cloudSync, Const.Event.DeltaProduce, sqsClient);
+        producer = new OptOutSqsLogProducer(config, cloudStorage, cloudStorageDroppedRequests, cloudSync, Const.Event.DeltaProduced, sqsClient);
 
         // deploy verticle
         vertx.deployVerticle(producer, testContext.succeeding(id -> testContext.completeNow()));
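Because the SqsClient is injected through the last constructor argument above, enqueue behaviour can also be asserted without LocalStack. A rough sketch, assuming Mockito is available on the test classpath (the real wiring lives in OptOutSqsLogProducerTest#setup):

import static org.mockito.Mockito.*;

import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
import software.amazon.awssdk.services.sqs.model.SendMessageResponse;

class SqsClientMockSketch {
    // Returns a stub that accepts any SendMessageRequest and reports a fixed message id.
    static SqsClient stubbedSqsClient() {
        SqsClient sqsClient = mock(SqsClient.class);
        when(sqsClient.sendMessage(any(SendMessageRequest.class)))
                .thenReturn(SendMessageResponse.builder().messageId("test-message-id").build());
        return sqsClient;
    }
}

A test can then pass the stub into the constructor and use verify(sqsClient).sendMessage(any(SendMessageRequest.class)) to confirm the enqueue happened.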