diff --git a/src/main/java/com/uid2/shared/optout/OptOutCloudSync.java b/src/main/java/com/uid2/shared/optout/OptOutCloudSync.java index d97153ec..4aa7f118 100644 --- a/src/main/java/com/uid2/shared/optout/OptOutCloudSync.java +++ b/src/main/java/com/uid2/shared/optout/OptOutCloudSync.java @@ -27,6 +27,7 @@ public class OptOutCloudSync implements ICloudSync { private static final Logger LOGGER = LoggerFactory.getLogger(OptOutCloudSync.class); private final boolean fullSync; + private final boolean uploadOnly; private final String cloudFolder; private final String deltaConsumerDir; private final String partitionConsumerDir; @@ -48,7 +49,12 @@ public class OptOutCloudSync implements ICloudSync { private AtomicReference>>> handlersNewCloudPaths = new AtomicReference<>(new ArrayList<>()); public OptOutCloudSync(JsonObject jsonConfig, boolean fullSync) { + this(jsonConfig, fullSync, false); + } + + public OptOutCloudSync(JsonObject jsonConfig, boolean fullSync, boolean uploadOnly) { this.fullSync = fullSync; + this.uploadOnly = uploadOnly; this.cloudFolder = CloudUtils.normalizDirPath(jsonConfig.getString(Const.Config.OptOutS3FolderProp)); this.deltaConsumerDir = OptOutUtils.getDeltaConsumerDir(jsonConfig); this.partitionConsumerDir = OptOutUtils.getPartitionConsumerDir(jsonConfig); @@ -84,6 +90,19 @@ public OptOutCloudSync(JsonObject jsonConfig, boolean fullSync) { this.mkdirsBlocking(); } + /** + * Creates an upload-only OptOutCloudSync instance. + * This skips all download/refresh operations. + */ + public static OptOutCloudSync createUploadOnly(JsonObject jsonConfig, boolean fullSync) { + return new OptOutCloudSync(jsonConfig, fullSync, true); + } + + @Override + public boolean isUploadOnly() { + return this.uploadOnly; + } + @Override public String toCloudPath(String path) { if (OptOutUtils.isDeltaFile(path)) { @@ -121,6 +140,11 @@ public String toLocalPath(String path) { @Override public boolean refresh(Instant now, ICloudStorage fsCloud, ICloudStorage fsLocal, Consumer> handleDownloads, Consumer> handleDeletes) throws CloudStorageException { + // In upload-only mode, skip all download/sync operations + if (uploadOnly) { + return true; + } + // list local cached paths List cachedPathList = new ArrayList<>(); localListFiles(fsLocal, this.deltaConsumerDir, OptOutUtils.prefixDeltaFile, cachedPathList); diff --git a/src/main/java/com/uid2/shared/vertx/CloudSyncVerticle.java b/src/main/java/com/uid2/shared/vertx/CloudSyncVerticle.java index 9352c222..e0d77b11 100644 --- a/src/main/java/com/uid2/shared/vertx/CloudSyncVerticle.java +++ b/src/main/java/com/uid2/shared/vertx/CloudSyncVerticle.java @@ -167,19 +167,29 @@ public void start(Promise promise) { this.uploadExecutor = vertx.createSharedWorkerExecutor("cloudsync-" + name + "-upload-pool", this.uploadThreads); - // handle refresh event - vertx.eventBus().consumer( - eventRefresh, - o -> this.handleRefresh(o)); + // handle refresh event (skip if upload-only) + if (!cloudSync.isUploadOnly()) { + vertx.eventBus().consumer( + eventRefresh, + o -> this.handleRefresh(o)); + } else { + LOGGER.info("CloudSyncVerticle." + name + " is upload-only, skipping refresh event handler registration"); + } // upload to cloud vertx.eventBus().consumer( this.eventUpload, msg -> this.handleUpload(msg)); - cloudRefresh() - .onFailure(t -> LOGGER.error("cloudRefresh failed: " + t.getMessage(), new Exception(t))) - .onComplete(ar -> promise.handle(ar)); + // initial refresh (skip if upload-only) + if (!cloudSync.isUploadOnly()) { + cloudRefresh() + .onFailure(t -> LOGGER.error("cloudRefresh failed: " + t.getMessage(), new Exception(t))) + .onComplete(ar -> promise.handle(ar)); + } else { + LOGGER.info("CloudSyncVerticle." + name + " is upload-only, skipping initial refresh"); + promise.complete(); + } promise.future() .onSuccess(v -> { @@ -214,6 +224,10 @@ public String eventDownloaded() { } private void handleRefresh(Message m) { + if (cloudSync.isUploadOnly()) { + LOGGER.warn("handleRefresh called but this is upload-only mode"); + return; + } cloudRefresh() .onSuccess(t -> this.storeRefreshIsFailing.set(0)) .onFailure(t -> { diff --git a/src/main/java/com/uid2/shared/vertx/ICloudSync.java b/src/main/java/com/uid2/shared/vertx/ICloudSync.java index 05997255..8f11f70f 100644 --- a/src/main/java/com/uid2/shared/vertx/ICloudSync.java +++ b/src/main/java/com/uid2/shared/vertx/ICloudSync.java @@ -15,4 +15,13 @@ public interface ICloudSync { boolean refresh(Instant now, ICloudStorage fsCloud, ICloudStorage fsLocal, Consumer> handleDownloads, Consumer> handleDeletes) throws CloudStorageException; + + /** + * Returns true if this CloudSync instance only supports upload operations. + * When true, download/refresh operations will be skipped. + * Default is false for backward compatibility. + */ + default boolean isUploadOnly() { + return false; + } }