Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions src/main/java/com/uid2/shared/optout/OptOutCloudSync.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public class OptOutCloudSync implements ICloudSync {
private static final Logger LOGGER = LoggerFactory.getLogger(OptOutCloudSync.class);

private final boolean fullSync;
private final boolean uploadOnly;
private final String cloudFolder;
private final String deltaConsumerDir;
private final String partitionConsumerDir;
Expand All @@ -48,7 +49,12 @@ public class OptOutCloudSync implements ICloudSync {
private AtomicReference<List<Consumer<Collection<String>>>> handlersNewCloudPaths = new AtomicReference<>(new ArrayList<>());

public OptOutCloudSync(JsonObject jsonConfig, boolean fullSync) {
this(jsonConfig, fullSync, false);
}

public OptOutCloudSync(JsonObject jsonConfig, boolean fullSync, boolean uploadOnly) {
this.fullSync = fullSync;
this.uploadOnly = uploadOnly;
this.cloudFolder = CloudUtils.normalizDirPath(jsonConfig.getString(Const.Config.OptOutS3FolderProp));
this.deltaConsumerDir = OptOutUtils.getDeltaConsumerDir(jsonConfig);
this.partitionConsumerDir = OptOutUtils.getPartitionConsumerDir(jsonConfig);
Expand Down Expand Up @@ -84,6 +90,19 @@ public OptOutCloudSync(JsonObject jsonConfig, boolean fullSync) {
this.mkdirsBlocking();
}

/**
* Creates an upload-only OptOutCloudSync instance.
* This skips all download/refresh operations.
*/
public static OptOutCloudSync createUploadOnly(JsonObject jsonConfig, boolean fullSync) {
return new OptOutCloudSync(jsonConfig, fullSync, true);
}

@Override
public boolean isUploadOnly() {
return this.uploadOnly;
}

@Override
public String toCloudPath(String path) {
if (OptOutUtils.isDeltaFile(path)) {
Expand Down Expand Up @@ -121,6 +140,11 @@ public String toLocalPath(String path) {

@Override
public boolean refresh(Instant now, ICloudStorage fsCloud, ICloudStorage fsLocal, Consumer<Set<String>> handleDownloads, Consumer<Set<String>> handleDeletes) throws CloudStorageException {
// In upload-only mode, skip all download/sync operations
if (uploadOnly) {
return true;
}

// list local cached paths
List<String> cachedPathList = new ArrayList<>();
localListFiles(fsLocal, this.deltaConsumerDir, OptOutUtils.prefixDeltaFile, cachedPathList);
Expand Down
28 changes: 21 additions & 7 deletions src/main/java/com/uid2/shared/vertx/CloudSyncVerticle.java
Original file line number Diff line number Diff line change
Expand Up @@ -167,19 +167,29 @@ public void start(Promise<Void> promise) {
this.uploadExecutor = vertx.createSharedWorkerExecutor("cloudsync-" + name + "-upload-pool",
this.uploadThreads);

// handle refresh event
vertx.eventBus().consumer(
eventRefresh,
o -> this.handleRefresh(o));
// handle refresh event (skip if upload-only)
if (!cloudSync.isUploadOnly()) {
vertx.eventBus().consumer(
eventRefresh,
o -> this.handleRefresh(o));
} else {
LOGGER.info("CloudSyncVerticle." + name + " is upload-only, skipping refresh event handler registration");
}

// upload to cloud
vertx.eventBus().<String>consumer(
this.eventUpload,
msg -> this.handleUpload(msg));

cloudRefresh()
.onFailure(t -> LOGGER.error("cloudRefresh failed: " + t.getMessage(), new Exception(t)))
.onComplete(ar -> promise.handle(ar));
// initial refresh (skip if upload-only)
if (!cloudSync.isUploadOnly()) {
cloudRefresh()
.onFailure(t -> LOGGER.error("cloudRefresh failed: " + t.getMessage(), new Exception(t)))
.onComplete(ar -> promise.handle(ar));
} else {
LOGGER.info("CloudSyncVerticle." + name + " is upload-only, skipping initial refresh");
promise.complete();
}

promise.future()
.onSuccess(v -> {
Expand Down Expand Up @@ -214,6 +224,10 @@ public String eventDownloaded() {
}

private void handleRefresh(Message m) {
if (cloudSync.isUploadOnly()) {
LOGGER.warn("handleRefresh called but this is upload-only mode");
return;
}
cloudRefresh()
.onSuccess(t -> this.storeRefreshIsFailing.set(0))
.onFailure(t -> {
Expand Down
9 changes: 9 additions & 0 deletions src/main/java/com/uid2/shared/vertx/ICloudSync.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,13 @@ public interface ICloudSync {
boolean refresh(Instant now, ICloudStorage fsCloud, ICloudStorage fsLocal,
Consumer<Set<String>> handleDownloads, Consumer<Set<String>> handleDeletes)
throws CloudStorageException;

/**
* Returns true if this CloudSync instance only supports upload operations.
* When true, download/refresh operations will be skipped.
* Default is false for backward compatibility.
*/
default boolean isUploadOnly() {
return false;
}
}
Loading