+ List<Callable<?>> callables = new ArrayList<>(PAGE_SIZE);
+ for (int i = 0; i < PAGE_SIZE && tilesDeletionsIterator.hasNext(); i++) {
+ callables.add(tilesDeletionsIterator.next());
+ }
+
+ deleteManager.executeParallel(callables);
+ }
+ }
+
@Override
public boolean get(TileObject obj) throws StorageException {
final String key = keyBuilder.forTile(obj);
@@ -373,7 +417,7 @@ public boolean rename(String oldLayerName, String newLayerName) throws StorageEx
// revisit: this seems to hold true only for GeoServerTileLayer, "standalone" TileLayers
// return getName() from getId(), as in AbstractTileLayer. Unfortunately the only option
// for non-GeoServerTileLayers would be copy and delete. Expensive.
- log.fine("No need to rename layers, AzureBlobStore uses layer id as key root");
+ LOG.fine("No need to rename layers, AzureBlobStore uses layer id as key root");
if (client.prefixExists(oldLayerName)) {
listeners.sendLayerRenamed(oldLayerName, newLayerName);
}
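The hunk above drains the tile-deletion iterator one page at a time instead of collecting every task up front. A minimal sketch of the pattern, with hypothetical names (`drainInPages` and the `executePage` callback stand in for `DeleteManager#executeParallel`; only one page of tasks is ever held in memory):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.function.Consumer;

// Hypothetical illustration of the paging pattern in the hunk above:
// pull at most pageSize delete tasks from the iterator, run them, repeat.
static void drainInPages(
        Iterator<Callable<?>> tasks, int pageSize, Consumer<List<Callable<?>>> executePage) {
    while (tasks.hasNext()) {
        List<Callable<?>> page = new ArrayList<>(pageSize);
        for (int i = 0; i < pageSize && tasks.hasNext(); i++) {
            page.add(tasks.next());
        }
        // e.g. DeleteManager#executeParallel: blocks until the page completes,
        // so memory stays bounded no matter how many tiles are deleted
        executePage.accept(page);
    }
}
```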
diff --git a/geowebcache/azureblob/src/main/java/org/geowebcache/azure/AzureClient.java b/geowebcache/azureblob/src/main/java/org/geowebcache/azure/AzureClient.java
index 69ee385c2..994ae1979 100644
--- a/geowebcache/azureblob/src/main/java/org/geowebcache/azure/AzureClient.java
+++ b/geowebcache/azureblob/src/main/java/org/geowebcache/azure/AzureClient.java
@@ -27,6 +27,8 @@
import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.BlobServiceClientBuilder;
+import com.azure.storage.blob.batch.BlobBatchClient;
+import com.azure.storage.blob.batch.BlobBatchClientBuilder;
import com.azure.storage.blob.models.BlobDownloadContentResponse;
import com.azure.storage.blob.models.BlobHttpHeaders;
import com.azure.storage.blob.models.BlobItem;
@@ -56,6 +58,7 @@ public class AzureClient {
private AzureBlobStoreData configuration;
private final BlobContainerClient container;
+ private final BlobBatchClient batch;
public AzureClient(AzureBlobStoreData configuration) throws StorageException {
this.configuration = configuration;
@@ -65,6 +68,7 @@ public AzureClient(AzureBlobStoreData configuration) throws StorageException {
String containerName = configuration.getContainer();
this.container = getOrCreateContainer(serviceClient, containerName);
+ this.batch = new BlobBatchClientBuilder(serviceClient).buildClient();
} catch (StorageException e) {
throw e;
} catch (RuntimeException e) {
@@ -285,6 +289,10 @@ public BlobContainerClient getContainer() {
return container;
}
+ public BlobBatchClient getBatch() {
+ return batch;
+ }
+
public boolean deleteBlob(String key) {
BlockBlobClient metadata = getBlockBlobClient(key);
return metadata.deleteIfExists();
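For context on how the new `getBatch()` accessor is meant to be used: `BlobBatchClient#deleteBlobs` takes absolute blob URLs and accepts at most 256 of them per call. A hedged sketch (the helper name and the naive URL concatenation are illustrative, not part of this patch; real code would URL-encode the key):

```java
import com.azure.core.http.rest.Response;
import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.batch.BlobBatchClient;
import com.azure.storage.blob.models.DeleteSnapshotsOptionType;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper: delete one page of keys (at most 256) in a single
// batched HTTP request. deleteBlobs expects absolute blob URLs, so keys
// are resolved against the container URL.
static int deletePage(BlobBatchClient batch, BlobContainerClient container, List<String> keys) {
    List<String> urls = keys.stream()
            .map(key -> container.getBlobContainerUrl() + "/" + key)
            .collect(Collectors.toList());
    int deleted = 0;
    // iterating the lazy result sends the batch; a failed sub-delete
    // surfaces as an exception during iteration
    for (Response<Void> response : batch.deleteBlobs(urls, DeleteSnapshotsOptionType.INCLUDE)) {
        deleted++;
    }
    return deleted;
}
```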
diff --git a/geowebcache/azureblob/src/main/java/org/geowebcache/azure/DeleteManager.java b/geowebcache/azureblob/src/main/java/org/geowebcache/azure/DeleteManager.java
index b1795be83..4e94e8531 100644
--- a/geowebcache/azureblob/src/main/java/org/geowebcache/azure/DeleteManager.java
+++ b/geowebcache/azureblob/src/main/java/org/geowebcache/azure/DeleteManager.java
@@ -13,11 +13,11 @@
*/
package org.geowebcache.azure;
-import static org.geowebcache.azure.AzureBlobStore.log;
-
import com.azure.core.http.rest.PagedResponse;
import com.azure.storage.blob.BlobContainerClient;
+import com.azure.storage.blob.batch.BlobBatchClient;
import com.azure.storage.blob.models.BlobItem;
+import com.azure.storage.blob.models.DeleteSnapshotsOptionType;
import com.azure.storage.blob.models.ListBlobsOptions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.Closeable;
@@ -26,20 +26,22 @@
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
-import java.util.function.Predicate;
import java.util.logging.Level;
+import java.util.logging.Logger;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.geotools.util.logging.Logging;
import org.geowebcache.GeoWebCacheException;
import org.geowebcache.locks.LockProvider;
import org.geowebcache.locks.LockProvider.Lock;
@@ -50,24 +52,28 @@
* Class handling deletes, which are normally handled in an asynchronous way in all other stores as well. These are bulk
* operations like deleting an entire layer, or a rangeset, with potentially millions of tiles involved.
*
- * Unfortunately the Azure BLOB API has no concept of bulk delete, and no concept of containment either, so tiles
- * have to be enumerated one by one and a delete issued on each one. This calls for a parallel execution, and requires
- * avoiding accumulation of references to all tiles that need removing in memory, as they could be millions or more,
- * hence code that tries to run over the tiles in pages
+ * <p>Using {@link BlobBatchClient}, tile URLs are partitioned into batches (max 256 per batch) to issue bulk delete
+ * requests, reducing network overhead, while keeping memory usage low.
*/
class DeleteManager implements Closeable {
+
+ private static final Logger LOG = Logging.getLogger(AzureBlobStore.class.getName());
+
/**
- * the page size here is not about limiting the requests, but ensures that we don't end up using too much memory
- * while processing millions of tiles, that would be otherwise all queued on the {@link ExecutorService}
+ * Deletes are issued in batches through {@link BlobBatchClient}, which is limited to 256 blobs per request.
+ *
+ * @see <a href="https://learn.microsoft.com/rest/api/storageservices/blob-batch">Azure
+ *     Blob Batch REST API</a>
*/
- static final int PAGE_SIZE = 1000;
+ static final int PAGE_SIZE = 256;
private final TMSKeyBuilder keyBuilder;
private final AzureClient client;
private final LockProvider locks;
private final int concurrency;
- private ExecutorService deleteExecutor;
- private Map<String, Long> pendingDeletesKeyTime = new ConcurrentHashMap<>();
+ private final ExecutorService deleteExecutor;
+ private final Map<String, Long> pendingDeletesKeyTime = new ConcurrentHashMap<>();
public DeleteManager(AzureClient client, LockProvider locks, TMSKeyBuilder keyBuilder, int maxConnections) {
this.keyBuilder = keyBuilder;
@@ -107,12 +113,12 @@ public void executeParallel(List<Callable<?>> callables) throws StorageException
}
}
- /** Executes the removal of the specified keys in a parallel fashion, returning the number of removed keys */
- public Long deleteParallel(List<String> keys) throws StorageException {
+ /** Executes the removal of the specified blobs in a streamed fashion, returning the number of removed keys */
+ public Long deleteStreamed(Stream<BlobItem> blobs) throws StorageException {
try {
- return new KeysBulkDelete(keys).call();
+ return new KeysBulkDelete(blobs.map(BlobItem::getName)).call();
} catch (Exception e) {
- throw new StorageException("Failed to submit parallel keys execution", e);
+ throw new StorageException("Failed to submit key deletions", e);
}
}
@@ -121,7 +127,7 @@ public boolean scheduleAsyncDelete(final String prefix) throws StorageException
String msg = String.format(
"Issuing bulk delete on '%s/%s' for objects older than %d",
client.getContainerName(), prefix, timestamp);
- log.info(msg);
+ LOG.info(msg);
try {
Lock lock = locks.getLock(prefix);
@@ -157,8 +163,8 @@ public void issuePendingBulkDeletes() throws StorageException {
for (Map.Entry<String, Long> entry : pendingDeletesKeyTime.entrySet()) {
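The streamed contract of `deleteStreamed` boils down to partitioning a lazy stream of keys into `PAGE_SIZE` chunks and issuing one batch per chunk. A sketch under those assumptions (`deletePage` is a hypothetical callback for one batched delete; Guava, already on the classpath for `ThreadFactoryBuilder`, provides the partitioning):

```java
import com.google.common.collect.Iterators;
import java.util.Iterator;
import java.util.List;
import java.util.function.ToIntFunction;
import java.util.stream.Stream;

// Illustrative sketch, not the patch's KeysBulkDelete: walk a stream of
// blob keys in fixed-size chunks so each request stays within the
// 256-blob batch limit, never materializing the full key set in memory.
static long deleteInPages(Stream<String> keys, int pageSize, ToIntFunction<List<String>> deletePage) {
    long deleted = 0;
    Iterator<List<String>> pages = Iterators.partition(keys.iterator(), pageSize);
    while (pages.hasNext()) {
        // one batched delete per page; the callback returns how many blobs it removed
        deleted += deletePage.applyAsInt(pages.next());
    }
    return deleted;
}
```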