diff --git a/geowebcache/azureblob/pom.xml b/geowebcache/azureblob/pom.xml
index 6c7fb1d4f..feb4eb083 100644
--- a/geowebcache/azureblob/pom.xml
+++ b/geowebcache/azureblob/pom.xml
@@ -43,6 +43,11 @@
       <artifactId>azure-storage-blob</artifactId>
       <version>${azure.version}</version>
     </dependency>
+    <dependency>
+      <groupId>com.azure</groupId>
+      <artifactId>azure-storage-blob-batch</artifactId>
+      <version>${azure.version}</version>
+    </dependency>
     <dependency>
       <groupId>org.apache.logging.log4j</groupId>
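The new azure-storage-blob-batch artifact supplies com.azure.storage.blob.batch.BlobBatchClient, which the store changes below build on. A minimal sketch of how the batch client is built and used, assuming a connection-string credential and a hypothetical container/key layout (none of these names come from the patch):

    import com.azure.storage.blob.BlobServiceClient;
    import com.azure.storage.blob.BlobServiceClientBuilder;
    import com.azure.storage.blob.batch.BlobBatchClient;
    import com.azure.storage.blob.batch.BlobBatchClientBuilder;
    import com.azure.storage.blob.models.DeleteSnapshotsOptionType;
    import java.util.List;

    public class BlobBatchSketch {
        public static void main(String[] args) {
            // credential, container and blob names below are placeholders
            BlobServiceClient service = new BlobServiceClientBuilder()
                    .connectionString(System.getenv("AZURE_STORAGE_CONNECTION_STRING"))
                    .buildClient();

            // one batch client per service client, as AzureClient now does
            BlobBatchClient batch = new BlobBatchClientBuilder(service).buildClient();

            // deleteBlobs takes absolute blob URLs, at most 256 per call
            List<String> urls = List.of(
                    service.getAccountUrl() + "/gwc-container/layer/EPSG_4326/png/default/3/2/5.png");

            batch.deleteBlobs(urls, DeleteSnapshotsOptionType.INCLUDE)
                    .forEach(response -> System.out.println("HTTP " + response.getStatusCode()));
        }
    }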
diff --git a/geowebcache/azureblob/src/main/java/org/geowebcache/azure/AzureBlobStore.java b/geowebcache/azureblob/src/main/java/org/geowebcache/azure/AzureBlobStore.java
index 14d105cba..530fbafd6 100644
--- a/geowebcache/azureblob/src/main/java/org/geowebcache/azure/AzureBlobStore.java
+++ b/geowebcache/azureblob/src/main/java/org/geowebcache/azure/AzureBlobStore.java
@@ -15,6 +15,7 @@
 import static com.google.common.base.Preconditions.checkNotNull;
 import static java.util.Objects.isNull;
+import static org.geowebcache.azure.DeleteManager.PAGE_SIZE;
 
 import com.azure.core.util.BinaryData;
 import com.azure.storage.blob.models.BlobDownloadContentResponse;
@@ -22,12 +23,11 @@
 import com.azure.storage.blob.models.BlobProperties;
 import com.azure.storage.blob.models.BlobStorageException;
 import com.azure.storage.blob.specialized.BlockBlobClient;
-import com.google.common.collect.AbstractIterator;
-import com.google.common.collect.Iterators;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.UncheckedIOException;
 import java.time.OffsetDateTime;
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -35,7 +35,10 @@
 import java.util.Properties;
 import java.util.concurrent.Callable;
 import java.util.logging.Logger;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import java.util.stream.Stream;
 import javax.annotation.Nullable;
 import org.geotools.util.logging.Logging;
@@ -53,13 +56,14 @@
 import org.geowebcache.storage.StorageException;
 import org.geowebcache.storage.TileObject;
 import org.geowebcache.storage.TileRange;
-import org.geowebcache.storage.TileRangeIterator;
 import org.geowebcache.util.TMSKeyBuilder;
 import org.springframework.http.HttpStatus;
 
 public class AzureBlobStore implements BlobStore {
 
-    static Logger log = Logging.getLogger(AzureBlobStore.class.getName());
+    private static final Logger LOG = Logging.getLogger(AzureBlobStore.class.getName());
+
+    private static final Pattern TILE_BLOB_NAME_REGEXP =
+            Pattern.compile("(?<z>\\d+)/(?<x>\\d+)/(?<y>\\d+)\\.\\w+$");
 
     private final TMSKeyBuilder keyBuilder;
     private final BlobStoreListenerList listeners = new BlobStoreListenerList();
@@ -190,58 +194,98 @@ public boolean delete(TileRange tileRange) throws StorageException {
             return false;
         }
 
-        // open an iterator over tile locations, to avoid memory accumulation
-        final Iterator<long[]> tileLocations = new AbstractIterator<>() {
-
-            // TileRange iterator with 1x1 meta tiling factor
-            private TileRangeIterator trIter = new TileRangeIterator(tileRange, new int[] {1, 1});
-
-            @Override
-            protected long[] computeNext() {
-                long[] gridLoc = trIter.nextMetaGridLocation(new long[3]);
-                return gridLoc == null ? endOfData() : gridLoc;
-            }
-        };
+        Stream<BlobItem> blobsToDelete = findTileBlobsToDelete(tileRange, coordsPrefix);
 
-        // if no listeners, we don't need to gather extra tile info, use a dedicated fast path
         if (listeners.isEmpty()) {
             // if there are no listeners, don't bother requesting every tile
             // metadata to notify the listeners
-            Iterator<String> keysIterator = Iterators.transform(
-                    tileLocations, tl -> keyBuilder.forLocation(coordsPrefix, tl, tileRange.getMimeType()));
-            // split the iteration in parts to avoid memory accumulation
-            Iterator<List<String>> partition = Iterators.partition(keysIterator, DeleteManager.PAGE_SIZE);
-
-            while (partition.hasNext() && !shutDown) {
-                List<String> locations = partition.next();
-                deleteManager.deleteParallel(locations);
+            if (!shutDown) {
+                deleteManager.deleteStreamed(blobsToDelete);
             }
-
         } else {
             // if we need to gather info, we'll end up just calling "delete" on each tile
             // this is run here instead of inside the delete manager as we need high level info
             // about tiles, e.g., TileObject, to inform the listeners
-            String layerName = tileRange.getLayerName();
-            String gridSetId = tileRange.getGridSetId();
-            String format = tileRange.getMimeType().getFormat();
-            Map<String, String> parameters = tileRange.getParameters();
-
-            Iterator<Callable<Object>> tilesIterator = Iterators.transform(tileLocations, xyz -> {
-                TileObject tile = TileObject.createQueryTileObject(layerName, xyz, gridSetId, format, parameters);
+            Stream<Callable<Object>> tilesDeletions = blobsToDelete.map(blobItem -> {
+                TileObject tile = createTileObject(blobItem, tileRange);
                 tile.setParametersId(tileRange.getParametersId());
-                return (Callable<Object>) () -> delete(tile);
+                return () -> delete(tile);
             });
 
-            Iterator<List<Callable<Object>>> partition = Iterators.partition(tilesIterator, DeleteManager.PAGE_SIZE);
-
-            // once a page of callables is ready, run them in parallel on the delete manager
-            while (partition.hasNext() && !shutDown) {
-                deleteManager.executeParallel(partition.next());
-            }
+            executeParallelDeletions(tilesDeletions);
         }
 
         return true;
     }
 
+    private Stream<BlobItem> findTileBlobsToDelete(TileRange tileRange, String coordsPrefix) {
+        return IntStream.rangeClosed(tileRange.getZoomStart(), tileRange.getZoomStop())
+                .boxed()
+                .flatMap(zoom -> {
+                    String zoomPrefix = coordsPrefix + "/" + zoom;
+
+                    if (!client.prefixExists(zoomPrefix)) {
+                        // empty level, skipping
+                        return Stream.empty();
+                    }
+
+                    long[] rangeBoundsAtZoom = tileRange.rangeBounds(zoom);
+
+                    return client.listBlobs(zoomPrefix)
+                            .filter(tb -> TILE_BLOB_NAME_REGEXP.matcher(tb.getName()).find())
+                            .filter(tb -> isTileBlobInBounds(tb, rangeBoundsAtZoom));
+                });
+    }
+
+    private boolean isTileBlobInBounds(BlobItem tileBlob, long[] bounds) {
+        long minX = bounds[0];
+        long minY = bounds[1];
+        long maxX = bounds[2];
+        long maxY = bounds[3];
+
+        long[] index = extractTileIndex(tileBlob);
+        long tileX = index[0];
+        long tileY = index[1];
+
+        return tileX >= minX && tileX <= maxX && tileY >= minY && tileY <= maxY;
+    }
+
+    private TileObject createTileObject(BlobItem blobItem, TileRange tileRange) {
+        String layerName = tileRange.getLayerName();
+        String gridSetId = tileRange.getGridSetId();
+        String format = tileRange.getMimeType().getFormat();
+        Map<String, String> parameters = tileRange.getParameters();
+        return TileObject.createQueryTileObject(layerName, extractTileIndex(blobItem), gridSetId, format, parameters);
+    }
+
+    private long[] extractTileIndex(BlobItem blobItem) {
+        Matcher matcher = TILE_BLOB_NAME_REGEXP.matcher(blobItem.getName());
+
+        if (!matcher.find()) {
+            throw new IllegalArgumentException("Invalid tile blob name");
+        }
+
+        return new long[] {
+            Long.parseLong(matcher.group("x")), Long.parseLong(matcher.group("y")), Long.parseLong(matcher.group("z"))
+        };
+    }
+
+    private void executeParallelDeletions(Stream<Callable<Object>> tilesDeletions) throws StorageException {
+        Iterator<Callable<Object>> tilesDeletionsIterator = tilesDeletions.iterator();
+
+        while (tilesDeletionsIterator.hasNext() && !shutDown) {
+
+            // once a page of callables is ready, run them in parallel on the delete manager
+            List<Callable<Object>> callables = new ArrayList<>(PAGE_SIZE);
+            for (int i = 0; i < PAGE_SIZE && tilesDeletionsIterator.hasNext(); i++) {
+                callables.add(tilesDeletionsIterator.next());
+            }
+
+            deleteManager.executeParallel(callables);
+        }
+    }
+
     @Override
     public boolean get(TileObject obj) throws StorageException {
         final String key = keyBuilder.forTile(obj);
@@ -373,7 +417,7 @@ public boolean rename(String oldLayerName, String newLayerName) throws StorageEx
         // revisit: this seems to hold true only for GeoServerTileLayer, "standalone" TileLayers
         // return getName() from getId(), as in AbstractTileLayer. Unfortunately the only option
         // for non-GeoServerTileLayers would be copy and delete. Expensive.
-        log.fine("No need to rename layers, AzureBlobStore uses layer id as key root");
+        LOG.fine("No need to rename layers, AzureBlobStore uses layer id as key root");
         if (client.prefixExists(oldLayerName)) {
             listeners.sendLayerRenamed(oldLayerName, newLayerName);
         }
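The new TILE_BLOB_NAME_REGEXP assumes every tile key ends with the z/x/y.extension segments produced by TMSKeyBuilder; blobs that don't match (e.g. metadata files) are filtered out before the bounds check. A self-contained sketch of the same named-group parsing, using a hypothetical key:

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class TileKeyParseSketch {

        // same shape as TILE_BLOB_NAME_REGEXP: the key ends in z/x/y.extension
        private static final Pattern TILE_BLOB_NAME_REGEXP =
                Pattern.compile("(?<z>\\d+)/(?<x>\\d+)/(?<y>\\d+)\\.\\w+$");

        public static void main(String[] args) {
            // hypothetical key following a layer/gridset/format/params/z/x/y.ext layout
            Matcher matcher = TILE_BLOB_NAME_REGEXP.matcher("topp_states/EPSG_4326/png/default/3/2/5.png");
            if (matcher.find()) {
                // prints x=2 y=5 z=3
                System.out.printf("x=%s y=%s z=%s%n", matcher.group("x"), matcher.group("y"), matcher.group("z"));
            }
        }
    }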
diff --git a/geowebcache/azureblob/src/main/java/org/geowebcache/azure/AzureClient.java b/geowebcache/azureblob/src/main/java/org/geowebcache/azure/AzureClient.java
index e2ca06ac8..b60ba8330 100644
--- a/geowebcache/azureblob/src/main/java/org/geowebcache/azure/AzureClient.java
+++ b/geowebcache/azureblob/src/main/java/org/geowebcache/azure/AzureClient.java
@@ -26,6 +26,8 @@
 import com.azure.storage.blob.BlobContainerClient;
 import com.azure.storage.blob.BlobServiceClient;
 import com.azure.storage.blob.BlobServiceClientBuilder;
+import com.azure.storage.blob.batch.BlobBatchClient;
+import com.azure.storage.blob.batch.BlobBatchClientBuilder;
 import com.azure.storage.blob.models.BlobDownloadContentResponse;
 import com.azure.storage.blob.models.BlobHttpHeaders;
 import com.azure.storage.blob.models.BlobItem;
@@ -55,6 +57,7 @@ public class AzureClient {
 
     private AzureBlobStoreData configuration;
     private final BlobContainerClient container;
+    private final BlobBatchClient batch;
 
     public AzureClient(AzureBlobStoreData configuration) throws StorageException {
         this.configuration = configuration;
@@ -64,6 +67,7 @@ public AzureClient(AzureBlobStoreData configuration) throws StorageException {
             String containerName = configuration.getContainer();
             this.container = getOrCreateContainer(serviceClient, containerName);
+            this.batch = new BlobBatchClientBuilder(serviceClient).buildClient();
         } catch (StorageException e) {
             throw e;
         } catch (RuntimeException e) {
@@ -284,6 +288,10 @@ public BlobContainerClient getContainer() {
         return container;
     }
 
+    public BlobBatchClient getBatch() {
+        return batch;
+    }
+
     public boolean deleteBlob(String key) {
         BlockBlobClient metadata = getBlockBlobClient(key);
         return metadata.deleteIfExists();
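Note that AzureBlobStore.executeParallelDeletions above, and KeysBulkDelete in the DeleteManager changes below, both drain a lazy Stream through its Iterator into fixed-size pages, so no more than PAGE_SIZE elements are materialized at a time. A generic sketch of that paging pattern (forEachPage is an illustrative helper, not part of the patch):

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    import java.util.function.Consumer;
    import java.util.stream.IntStream;
    import java.util.stream.Stream;

    public class StreamPagingSketch {

        static final int PAGE_SIZE = 256; // same limit as DeleteManager.PAGE_SIZE

        // drains the stream page by page; only one page is held in memory at a time
        static <T> void forEachPage(Stream<T> stream, Consumer<List<T>> action) {
            Iterator<T> it = stream.iterator();
            while (it.hasNext()) {
                List<T> page = new ArrayList<>(PAGE_SIZE);
                for (int i = 0; i < PAGE_SIZE && it.hasNext(); i++) {
                    page.add(it.next());
                }
                action.accept(page);
            }
        }

        public static void main(String[] args) {
            // 1000 fake keys -> pages of 256, 256, 256, 232
            forEachPage(IntStream.range(0, 1000).boxed(), page -> System.out.println("page of " + page.size()));
        }
    }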
diff --git a/geowebcache/azureblob/src/main/java/org/geowebcache/azure/DeleteManager.java b/geowebcache/azureblob/src/main/java/org/geowebcache/azure/DeleteManager.java
index f3be93cff..98c06ce7c 100644
--- a/geowebcache/azureblob/src/main/java/org/geowebcache/azure/DeleteManager.java
+++ b/geowebcache/azureblob/src/main/java/org/geowebcache/azure/DeleteManager.java
@@ -13,11 +13,11 @@
  */
 package org.geowebcache.azure;
 
-import static org.geowebcache.azure.AzureBlobStore.log;
-
 import com.azure.core.http.rest.PagedResponse;
 import com.azure.storage.blob.BlobContainerClient;
+import com.azure.storage.blob.batch.BlobBatchClient;
 import com.azure.storage.blob.models.BlobItem;
+import com.azure.storage.blob.models.DeleteSnapshotsOptionType;
 import com.azure.storage.blob.models.ListBlobsOptions;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.io.Closeable;
@@ -26,20 +26,22 @@
 import java.time.OffsetDateTime;
 import java.util.ArrayList;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ThreadFactory;
-import java.util.function.Predicate;
 import java.util.logging.Level;
+import java.util.logging.Logger;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.geotools.util.logging.Logging;
 import org.geowebcache.GeoWebCacheException;
 import org.geowebcache.locks.LockProvider;
 import org.geowebcache.locks.LockProvider.Lock;
@@ -50,24 +52,28 @@
 /**
  * Class handling deletes, which are normally handled in an asynchronous way in all other stores as well. These are
  * bulk operations like deleting an entire layer, or a rangeset, with potentially millions of tiles involved.
  *
- * <p>Unfortunately the Azure BLOB API has no concept of bulk delete, and no concept of containment either, so tiles
- * have to be enumerated one by one and a delete issued on each one. This calls for a parallel execution, and requires
- * avoiding accumulation of references to all tiles that need removing in memory, as they could be millions or more,
- * hence code that tries to run over the tiles in pages
+ * <p>Using {@link BlobBatchClient}, tile URLs are partitioned into batches (max 256 per batch) to issue bulk delete
+ * requests, reducing network overhead while keeping memory usage low.
 */
 class DeleteManager implements Closeable {
+
+    private static final Logger LOG = Logging.getLogger(AzureBlobStore.class.getName());
+
     /**
-     * the page size here is not about limiting the requests, but ensures that we don't end up using too much memory
-     * while processing millions of tiles, that would be otherwise all queued on the {@link ExecutorService}
+     * To manage blobs in batch through {@link BlobBatchClient}, we are limited to 256 blobs per request.
+     *
+     * @see <a href="https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch">Azure Blob Batch REST API</a>
      */
-    static final int PAGE_SIZE = 1000;
+    static final int PAGE_SIZE = 256;
 
     private final TMSKeyBuilder keyBuilder;
     private final AzureClient client;
     private final LockProvider locks;
     private final int concurrency;
-    private ExecutorService deleteExecutor;
-    private Map<String, Long> pendingDeletesKeyTime = new ConcurrentHashMap<>();
+    private final ExecutorService deleteExecutor;
+    private final Map<String, Long> pendingDeletesKeyTime = new ConcurrentHashMap<>();
 
     public DeleteManager(AzureClient client, LockProvider locks, TMSKeyBuilder keyBuilder, int maxConnections) {
         this.keyBuilder = keyBuilder;
@@ -107,12 +113,12 @@ public void executeParallel(List<Callable<Object>> callables) throws StorageException {
         }
     }
 
-    /** Executes the removal of the specified keys in a parallel fashion, returning the number of removed keys */
-    public Long deleteParallel(List<String> keys) throws StorageException {
+    /** Executes the removal of the specified blobs in a streamed fashion, returning the number of removed keys */
+    public Long deleteStreamed(Stream<BlobItem> blobs) throws StorageException {
         try {
-            return new KeysBulkDelete(keys).call();
+            return new KeysBulkDelete(blobs.map(BlobItem::getName)).call();
         } catch (Exception e) {
-            throw new StorageException("Failed to submit parallel keys execution", e);
+            throw new StorageException("Failed to submit key deletions", e);
         }
     }
 
@@ -120,7 +126,7 @@
     public boolean scheduleAsyncDelete(final String prefix) throws StorageException {
         final long timestamp = currentTimeSeconds();
         String msg = "Issuing bulk delete on '%s/%s' for objects older than %d"
                 .formatted(client.getContainerName(), prefix, timestamp);
-        log.info(msg);
+        LOG.info(msg);
 
         try {
             Lock lock = locks.getLock(prefix);
@@ -157,8 +163,8 @@ public void issuePendingBulkDeletes() throws StorageException {
         for (Map.Entry<Object, Object> e : deletes.entrySet()) {
             final String prefix = e.getKey().toString();
             final long timestamp = Long.parseLong(e.getValue().toString());
-            if (log.isLoggable(Level.INFO))
-                log.info("Restarting pending bulk delete on '%s/%s':%d"
+            if (LOG.isLoggable(Level.INFO))
+                LOG.info("Restarting pending bulk delete on '%s/%s':%d"
                         .formatted(client.getContainerName(), prefix, timestamp));
             if (!asyncDelete(prefix, timestamp)) {
                 deletesToClear.add(prefix);
@@ -186,7 +192,7 @@ public synchronized boolean asyncDelete(String prefix, long timestamp) {
         // is there any task already deleting a larger set of times in the same prefix
         // folder?
         Long currentTaskTime = pendingDeletesKeyTime.get(prefix);
-        if (currentTaskTime != null && currentTaskTime.longValue() > timestamp) {
+        if (currentTaskTime != null && currentTaskTime > timestamp) {
             return false;
         }
 
@@ -216,8 +222,8 @@ public Long call() throws Exception {
             long count = 0L;
             try {
                 checkInterrupted();
-                if (log.isLoggable(Level.INFO))
-                    log.info("Running bulk delete on '%s/%s':%d"
+                if (LOG.isLoggable(Level.INFO))
+                    LOG.info("Running bulk delete on '%s/%s':%d"
                            .formatted(client.getContainerName(), prefix, timestamp));
 
                 BlobContainerClient container = client.getContainer();
@@ -228,19 +234,26 @@ public Long call() throws Exception {
                 Iterable<PagedResponse<BlobItem>> response = container.listBlobs(options, null).iterableByPage();
 
+                BlobBatchClient batch = client.getBatch();
+
                 for (PagedResponse<BlobItem> segment : response) {
                     try (PagedResponse<BlobItem> s = segment) { // try-with-resources to please PMD
                         checkInterrupted();
-                        List<BlobItem> items = s.getValue();
-                        count += deleteItems(container, items, this::equalOrAfter);
+
+                        List<String> items = s.getValue().stream()
+                                .filter(this::equalOrAfter)
+                                .map(BlobItem::getName)
+                                .collect(Collectors.toList());
+
+                        count += deleteItems(container, batch, items);
                     }
                 }
             } catch (InterruptedException | IllegalStateException e) {
-                log.info("Azure bulk delete aborted for '%s/%s'. Will resume on next startup."
+                LOG.info("Azure bulk delete aborted for '%s/%s'. Will resume on next startup."
                         .formatted(client.getContainerName(), prefix));
                 throw e;
             } catch (RuntimeException e) {
-                log.log(
+                LOG.log(
                         Level.WARNING,
                         "Unknown error performing bulk Azure blobs delete of '%s/%s'"
                                 .formatted(client.getContainerName(), prefix),
                         e);
@@ -248,8 +261,8 @@ public Long call() throws Exception {
                 throw e;
             }
 
-            if (log.isLoggable(Level.INFO))
-                log.info("Finished bulk delete on '%s/%s':%d. %d objects deleted"
+            if (LOG.isLoggable(Level.INFO))
+                LOG.info("Finished bulk delete on '%s/%s':%d. %d objects deleted"
                         .formatted(client.getContainerName(), prefix, timestamp, count));
 
             clearPendingBulkDelete(prefix, timestamp);
@@ -268,7 +281,7 @@ private void clearPendingBulkDelete(final String prefix, final long timestamp) t
                 return; // someone else cleared it up for us. A task that run after this one but
                 // finished before?
             }
-            if (taskTime.longValue() > timestamp) {
+            if (taskTime > timestamp) {
                 return; // someone else issued a bulk delete after this one for the same key prefix
             }
             final String pendingDeletesKey = keyBuilder.pendingDeletes();
@@ -280,8 +293,8 @@ private void clearPendingBulkDelete(final String prefix, final long timestamp) t
                 long storedTimestamp = storedVal == null ? Long.MIN_VALUE : Long.parseLong(storedVal);
                 if (timestamp >= storedTimestamp) {
                     client.putProperties(pendingDeletesKey, deletes);
-                } else if (log.isLoggable(Level.INFO)) {
-                    log.info("bulk delete finished but there's a newer one ongoing for container '%s/%s'"
+                } else if (LOG.isLoggable(Level.INFO)) {
+                    LOG.info("bulk delete finished but there's a newer one ongoing for container '%s/%s'"
                             .formatted(client.getContainerName(), prefix));
                 }
             } catch (StorageException e) {
@@ -290,31 +303,14 @@
                 lock.release();
             }
         }
-
-        private long deleteItems(BlobContainerClient container, List<BlobItem> segment, Predicate<BlobItem> filter)
-                throws ExecutionException, InterruptedException {
-
-            List<Future<Object>> collect = segment.stream()
-                    .filter(filter)
-                    .map(item -> deleteExecutor.submit(() -> {
-                        deleteItem(container, item.getName());
-                        return null;
-                    }))
-                    .collect(Collectors.toList());
-
-            for (Future<Object> f : collect) {
-                f.get();
-            }
-            return collect.size();
-        }
     }
 
     public class KeysBulkDelete implements Callable<Long> {
 
-        private final List<String> keys;
+        private final Stream<String> keyStream;
 
-        public KeysBulkDelete(List<String> keys) {
-            this.keys = keys;
+        public KeysBulkDelete(Stream<String> keyStream) {
+            this.keyStream = keyStream;
         }
 
         @Override
@@ -322,44 +318,50 @@
         public Long call() throws Exception {
             long count = 0L;
             try {
                 checkInterrupted();
-                if (log.isLoggable(Level.FINER)) {
-                    log.finer("Running delete delete on list of items on '%s':%s ... (only the first 100 items listed)"
-                            .formatted(client.getContainerName(), keys.subList(0, Math.min(keys.size(), 100))));
-                }
 
                 BlobContainerClient container = client.getContainer();
+                BlobBatchClient batch = client.getBatch();
 
-                for (int i = 0; i < keys.size(); i += PAGE_SIZE) {
-                    deleteItems(container, keys.subList(i, Math.min(i + PAGE_SIZE, keys.size())));
+                Iterator<String> keysIterator = keyStream.iterator();
+
+                while (keysIterator.hasNext()) {
+
+                    List<String> keys = new ArrayList<>(PAGE_SIZE);
+                    for (int i = 0; i < PAGE_SIZE && keysIterator.hasNext(); i++) {
+                        keys.add(keysIterator.next());
+                    }
+
+                    if (LOG.isLoggable(Level.FINER)) {
+                        LOG.finer(
+                                "Running batched delete on list of items on '%s':%s ... (only the first 100 items listed)"
+                                        .formatted(
+                                                client.getContainerName(),
+                                                keys.subList(0, Math.min(keys.size(), 100))));
+                    }
+
+                    count += deleteItems(container, batch, keys);
                 }
             } catch (InterruptedException | IllegalStateException e) {
-                log.log(Level.INFO, "Azure bulk delete aborted", e);
+                LOG.log(Level.INFO, "Azure bulk delete aborted", e);
                 throw e;
             } catch (Exception e) {
-                log.log(Level.WARNING, "Unknown error performing bulk Azure delete", e);
+                LOG.log(Level.WARNING, "Unknown error performing bulk Azure delete", e);
                 throw e;
             }
 
-            if (log.isLoggable(Level.INFO))
-                log.info("Finished bulk delete on %s, %d objects deleted".formatted(client.getContainerName(), count));
+            if (LOG.isLoggable(Level.INFO))
+                LOG.info("Finished bulk delete on %s, %d objects deleted".formatted(client.getContainerName(), count));
 
             return count;
         }
-
-        private long deleteItems(BlobContainerClient container, List<String> itemNames)
-                throws ExecutionException, InterruptedException {
-            List<Future<Boolean>> collect = itemNames.stream()
-                    .map(item -> deleteExecutor.submit(() -> deleteItem(container, item)))
-                    .collect(Collectors.toList());
-
-            for (Future<Boolean> f : collect) {
-                f.get();
-            }
-            return collect.size();
-        }
     }
 
-    private boolean deleteItem(BlobContainerClient container, String key) {
-        return container.getBlobClient(key).deleteIfExists();
+    private long deleteItems(BlobContainerClient container, BlobBatchClient batch, List<String> itemNames) {
+        List<String> blobsUrls = itemNames.stream()
+                .map(n -> container.getBlobClient(n).getBlobUrl())
+                .collect(Collectors.toList());
+
+        return batch.deleteBlobs(blobsUrls, DeleteSnapshotsOptionType.INCLUDE).stream()
+                .count();
     }
 
     void checkInterrupted() throws InterruptedException {
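A note on the new deleteItems: BlobBatchClient.deleteBlobs returns a paged iterable that is consumed lazily, so the terminal stream().count() is what drives the batch request(s) and reports how many delete responses came back; per-blob failures should surface during that iteration (if memory serves, as a BlobBatchStorageException). A sketch of the call in isolation:

    import com.azure.storage.blob.batch.BlobBatchClient;
    import com.azure.storage.blob.models.DeleteSnapshotsOptionType;
    import java.util.List;

    public class BatchDeleteCountSketch {

        // mirrors the new DeleteManager.deleteItems: counting the responses
        // forces the lazy iterable, performing the I/O and yielding the
        // number of delete responses received
        static long deleteBatch(BlobBatchClient batch, List<String> blobUrls) {
            return batch.deleteBlobs(blobUrls, DeleteSnapshotsOptionType.INCLUDE)
                    .stream()
                    .count();
        }
    }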