From 9044883a16029006313d31dbbe9338de5145e17c Mon Sep 17 00:00:00 2001 From: Alan McDade Date: Fri, 18 Apr 2025 12:10:40 +0200 Subject: [PATCH] Manual commit of backport --- .../org/geowebcache/util/TMSKeyBuilder.java | 32 +++ geowebcache/s3storage/Readme.md | 30 +++ .../java/org/geowebcache/s3/S3BlobStore.java | 167 ++++++++------- .../org/geowebcache/s3/S3BlobStoreInfo.java | 3 +- .../main/java/org/geowebcache/s3/S3Ops.java | 186 ++++++++++------ .../s3/streams/BatchingIterator.java | 63 ++++++ .../s3/streams/BoundedS3KeySupplier.java | 89 ++++++++ .../s3/streams/DeleteBatchesOfS3Objects.java | 82 ++++++++ .../streams/TileDeletionListenerNotifier.java | 124 +++++++++++ .../s3/streams/UnboundedS3KeySupplier.java | 65 ++++++ .../AbstractS3BlobStoreIntegrationTest.java | 198 +++++++++++++----- .../s3/OfflineS3BlobStoreIntegrationTest.java | 9 - .../s3/S3BlobStoreConformanceTest.java | 20 ++ .../org/geowebcache/s3/TemporaryS3Folder.java | 1 + 14 files changed, 865 insertions(+), 204 deletions(-) create mode 100644 geowebcache/s3storage/Readme.md create mode 100644 geowebcache/s3storage/src/main/java/org/geowebcache/s3/streams/BatchingIterator.java create mode 100644 geowebcache/s3storage/src/main/java/org/geowebcache/s3/streams/BoundedS3KeySupplier.java create mode 100644 geowebcache/s3storage/src/main/java/org/geowebcache/s3/streams/DeleteBatchesOfS3Objects.java create mode 100644 geowebcache/s3storage/src/main/java/org/geowebcache/s3/streams/TileDeletionListenerNotifier.java create mode 100644 geowebcache/s3storage/src/main/java/org/geowebcache/s3/streams/UnboundedS3KeySupplier.java diff --git a/geowebcache/core/src/main/java/org/geowebcache/util/TMSKeyBuilder.java b/geowebcache/core/src/main/java/org/geowebcache/util/TMSKeyBuilder.java index 5364400c5..47ca0fcac 100644 --- a/geowebcache/core/src/main/java/org/geowebcache/util/TMSKeyBuilder.java +++ b/geowebcache/core/src/main/java/org/geowebcache/util/TMSKeyBuilder.java @@ -59,6 +59,15 @@ public String layerId(String layerName) { return layer.getId(); } + public String layerNameFromId(String layerId) { + for (TileLayer tileLayer : layers.getLayerList()) { + if (layerId.equals(tileLayer.getId())) { + return tileLayer.getName(); + } + } + return null; + } + public Set layerGridsets(String layerName) { TileLayer layer; try { @@ -222,4 +231,27 @@ private static String join(boolean closing, Object... elements) { } return joiner.toString(); } + + private static String parametersFromTileRange(TileRange obj) { + String parametersId = obj.getParametersId(); + if (parametersId == null) { + Map parameters = obj.getParameters(); + parametersId = ParametersUtils.getId(parameters); + if (parametersId == null) { + parametersId = "default"; + } else { + obj.setParametersId(parametersId); + } + } + return parametersId; + } + + public String forZoomLevel(TileRange tileRange, int level) { + String layerId = layerId(tileRange.getLayerName()); + String gridsetId = tileRange.getGridSetId(); + String format = tileRange.getMimeType().getFileExtension(); + String parametersId = parametersFromTileRange(tileRange); + + return join(true, prefix, layerId, gridsetId, format, parametersId, String.valueOf(level)); + } } diff --git a/geowebcache/s3storage/Readme.md b/geowebcache/s3storage/Readme.md new file mode 100644 index 000000000..b2b521e4d --- /dev/null +++ b/geowebcache/s3storage/Readme.md @@ -0,0 +1,30 @@ +Tidy up aws after working with tests +=== + +``` +aws s3 ls s3:/// | grep tmp_ | awk '{print $2}' | while read obj; do + echo "Object: $obj" + aws s3 rm s3://gwc-s3-test/$obj --recursive +done + +``` + +Replace the `` with the value configured in your system. +This will delete all the temporary object that have been created + + +Config file +==== +Add a `.gwc_s3_tests.properties` to your home directory to get the integration tests to run. + +``` +cat .gwc_s3_tests.properties +``` +_contents of file_ + +``` +bucket=gwc-s3-test +secretKey=lxL***************************** +accessKey=AK***************``` + +``` diff --git a/geowebcache/s3storage/src/main/java/org/geowebcache/s3/S3BlobStore.java b/geowebcache/s3storage/src/main/java/org/geowebcache/s3/S3BlobStore.java index 823ecb011..fb8912611 100644 --- a/geowebcache/s3storage/src/main/java/org/geowebcache/s3/S3BlobStore.java +++ b/geowebcache/s3storage/src/main/java/org/geowebcache/s3/S3BlobStore.java @@ -13,7 +13,9 @@ */ package org.geowebcache.s3; +import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; +import static java.lang.String.format; import static java.util.Objects.isNull; import com.amazonaws.AmazonServiceException; @@ -21,18 +23,12 @@ import com.amazonaws.services.s3.model.AccessControlList; import com.amazonaws.services.s3.model.BucketPolicy; import com.amazonaws.services.s3.model.CannedAccessControlList; -import com.amazonaws.services.s3.model.DeleteObjectsRequest; -import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion; import com.amazonaws.services.s3.model.Grant; import com.amazonaws.services.s3.model.ObjectMetadata; import com.amazonaws.services.s3.model.PutObjectRequest; import com.amazonaws.services.s3.model.S3Object; import com.amazonaws.services.s3.model.S3ObjectInputStream; import com.amazonaws.services.s3.model.S3ObjectSummary; -import com.google.common.base.Function; -import com.google.common.collect.AbstractIterator; -import com.google.common.collect.Iterators; -import com.google.common.collect.Lists; import com.google.common.io.ByteStreams; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -41,7 +37,6 @@ import java.nio.channels.WritableByteChannel; import java.util.ArrayList; import java.util.Arrays; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; @@ -50,7 +45,10 @@ import java.util.Set; import java.util.logging.Level; import java.util.logging.Logger; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import java.util.stream.Collectors; +import java.util.stream.IntStream; import javax.annotation.Nullable; import org.geotools.util.logging.Logging; import org.geowebcache.GeoWebCacheException; @@ -61,6 +59,7 @@ import org.geowebcache.locks.LockProvider; import org.geowebcache.mime.MimeException; import org.geowebcache.mime.MimeType; +import org.geowebcache.s3.streams.TileDeletionListenerNotifier; import org.geowebcache.storage.BlobStore; import org.geowebcache.storage.BlobStoreListener; import org.geowebcache.storage.BlobStoreListenerList; @@ -68,7 +67,6 @@ import org.geowebcache.storage.StorageException; import org.geowebcache.storage.TileObject; import org.geowebcache.storage.TileRange; -import org.geowebcache.storage.TileRangeIterator; import org.geowebcache.util.TMSKeyBuilder; public class S3BlobStore implements BlobStore { @@ -83,8 +81,6 @@ public class S3BlobStore implements BlobStore { private String bucketName; - private volatile boolean shutDown; - private final S3Ops s3Ops; private CannedAccessControlList acl; @@ -100,7 +96,7 @@ public S3BlobStore(S3BlobStoreInfo config, TileLayerDispatcher layers, LockProvi conn = validateClient(config.buildClient(), bucketName); acl = config.getAccessControlList(); - this.s3Ops = new S3Ops(conn, bucketName, keyBuilder, lockProvider); + this.s3Ops = new S3Ops(conn, bucketName, keyBuilder, lockProvider, listeners); boolean empty = !s3Ops.prefixExists(prefix); boolean existing = Objects.nonNull(s3Ops.getObjectMetadata(keyBuilder.storeMetadata())); @@ -172,7 +168,6 @@ private void checkBucketPolicy(AmazonS3Client client, String bucketName) throws @Override public void destroy() { - this.shutDown = true; AmazonS3Client conn = this.conn; this.conn = null; if (conn != null) { @@ -279,80 +274,40 @@ public boolean get(TileObject obj) throws StorageException { return true; } - private class TileToKey implements Function { - - private final String coordsPrefix; - - private final String extension; - - public TileToKey(String coordsPrefix, MimeType mimeType) { - this.coordsPrefix = coordsPrefix; - this.extension = mimeType.getInternalName(); - } - - @Override - public KeyVersion apply(long[] loc) { - long z = loc[2]; - long x = loc[0]; - long y = loc[1]; - StringBuilder sb = new StringBuilder(coordsPrefix); - sb.append(z).append('/').append(x).append('/').append(y).append('.').append(extension); - return new KeyVersion(sb.toString()); - } - } - @Override public boolean delete(final TileRange tileRange) throws StorageException { + checkNotNull(tileRange, "tile range must not be null"); + checkArgument(tileRange.getZoomStart() >= 0, "zoom start must be greater or equal than zero"); + checkArgument( + tileRange.getZoomStop() >= tileRange.getZoomStart(), + "zoom stop must be greater or equal than start zoom"); final String coordsPrefix = keyBuilder.coordinatesPrefix(tileRange, true); if (!s3Ops.prefixExists(coordsPrefix)) { return false; } - final Iterator tileLocations = new AbstractIterator<>() { - - // TileRange iterator with 1x1 meta tiling factor - private TileRangeIterator trIter = new TileRangeIterator(tileRange, new int[] {1, 1}); + // Create a prefix for each zoom level + long count = IntStream.range(tileRange.getZoomStart(), tileRange.getZoomStop() + 1) + .mapToObj(level -> scheduleDeleteForZoomLevel(tileRange, level)) + .filter(Objects::nonNull) + .count(); - @Override - protected long[] computeNext() { - long[] gridLoc = trIter.nextMetaGridLocation(new long[3]); - return gridLoc == null ? endOfData() : gridLoc; - } - }; - - if (listeners.isEmpty()) { - // if there are no listeners, don't bother requesting every tile - // metadata to notify the listeners - Iterator> partition = Iterators.partition(tileLocations, 1000); - final TileToKey tileToKey = new TileToKey(coordsPrefix, tileRange.getMimeType()); - - while (partition.hasNext() && !shutDown) { - List locations = partition.next(); - List keys = Lists.transform(locations, tileToKey); - - DeleteObjectsRequest req = new DeleteObjectsRequest(bucketName); - req.setQuiet(true); - req.setKeys(keys); - conn.deleteObjects(req); - } + // Check all ranges where scheduled + return count == (tileRange.getZoomStop() - tileRange.getZoomStart() + 1); + } - } else { - long[] xyz; - String layerName = tileRange.getLayerName(); - String gridSetId = tileRange.getGridSetId(); - String format = tileRange.getMimeType().getFormat(); - Map parameters = tileRange.getParameters(); - - while (tileLocations.hasNext()) { - xyz = tileLocations.next(); - TileObject tile = TileObject.createQueryTileObject(layerName, xyz, gridSetId, format, parameters); - tile.setParametersId(tileRange.getParametersId()); - delete(tile); - } + private String scheduleDeleteForZoomLevel(TileRange tileRange, int level) { + String zoomPath = keyBuilder.forZoomLevel(tileRange, level); + Bounds bounds = new Bounds(tileRange.rangeBounds(level)); + String prefix = format("%s?%s", zoomPath, bounds); + try { + s3Ops.scheduleAsyncDelete(prefix); + return prefix; + } catch (GeoWebCacheException e) { + log.warning("Cannot schedule delete for prefix " + prefix); + return null; } - - return true; } @Override @@ -457,8 +412,7 @@ private Properties getLayerMetadata(String layerName) { } private void putParametersMetadata(String layerName, String parametersId, Map parameters) { - assert (isNull(parametersId) == isNull(parameters)); - if (isNull(parametersId)) { + if (isNull(parameters)) { return; } Properties properties = new Properties(); @@ -519,4 +473,63 @@ public Map>> getParametersMapping(String la .map(props -> (Map) (Map) props) .collect(Collectors.toMap(ParametersUtils::getId, Optional::of)); } + + public static class Bounds { + private static final Pattern boundsRegex = + Pattern.compile("^(?.*/)\\?bounds=(?\\d+),(?\\d+),(?\\d+),(?\\d+)$"); + private final long minX, minY, maxX, maxY; + + public Bounds(long[] bound) { + minX = Math.min(bound[0], bound[2]); + minY = Math.min(bound[1], bound[3]); + maxX = Math.max(bound[0], bound[2]); + maxY = Math.max(bound[1], bound[3]); + } + + public long getMinX() { + return minX; + } + + public long getMaxX() { + return maxX; + } + + static Optional createBounds(String prefix) { + Matcher matcher = boundsRegex.matcher(prefix); + if (!matcher.matches()) { + return Optional.empty(); + } + + Bounds bounds = new Bounds(new long[] { + Long.parseLong(matcher.group("minx")), + Long.parseLong(matcher.group("miny")), + Long.parseLong(matcher.group("maxx")), + Long.parseLong(matcher.group("maxy")) + }); + return Optional.of(bounds); + } + + static String prefixWithoutBounds(String prefix) { + Matcher matcher = boundsRegex.matcher(prefix); + if (matcher.matches()) { + return matcher.group("prefix"); + } + return prefix; + } + + @Override + public String toString() { + return format("bounds=%d,%d,%d,%d", minX, minY, maxX, maxY); + } + + public boolean predicate(S3ObjectSummary s3ObjectSummary) { + var matcher = TileDeletionListenerNotifier.keyRegex.matcher(s3ObjectSummary.getKey()); + if (!matcher.matches()) { + return false; + } + long x = Long.parseLong(matcher.group(TileDeletionListenerNotifier.X_GROUP_POS)); + long y = Long.parseLong(matcher.group(TileDeletionListenerNotifier.Y_GROUP_POS)); + return x >= minX && x <= maxX && y >= minY && y <= maxY; + } + } } diff --git a/geowebcache/s3storage/src/main/java/org/geowebcache/s3/S3BlobStoreInfo.java b/geowebcache/s3storage/src/main/java/org/geowebcache/s3/S3BlobStoreInfo.java index 998911cc2..8a150ad92 100644 --- a/geowebcache/s3storage/src/main/java/org/geowebcache/s3/S3BlobStoreInfo.java +++ b/geowebcache/s3storage/src/main/java/org/geowebcache/s3/S3BlobStoreInfo.java @@ -423,7 +423,8 @@ public AmazonS3Client buildClient() { clientConfig.setUseGzip(useGzip); } log.fine("Initializing AWS S3 connection"); - AmazonS3Client client = new AmazonS3Client(getCredentialsProvider(), clientConfig); + AWSCredentialsProvider credentialsProvider = getCredentialsProvider(); + AmazonS3Client client = new AmazonS3Client(credentialsProvider, clientConfig); if (endpoint != null && !"".equals(endpoint)) { S3ClientOptions s3ClientOptions = new S3ClientOptions(); s3ClientOptions.setPathStyleAccess(true); diff --git a/geowebcache/s3storage/src/main/java/org/geowebcache/s3/S3Ops.java b/geowebcache/s3storage/src/main/java/org/geowebcache/s3/S3Ops.java index 3db22392e..f12394510 100644 --- a/geowebcache/s3storage/src/main/java/org/geowebcache/s3/S3Ops.java +++ b/geowebcache/s3storage/src/main/java/org/geowebcache/s3/S3Ops.java @@ -13,12 +13,12 @@ */ package org.geowebcache.s3; +import static org.geowebcache.s3.S3BlobStore.Bounds.prefixWithoutBounds; + import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.AmazonS3Client; import com.amazonaws.services.s3.iterable.S3Objects; import com.amazonaws.services.s3.model.AmazonS3Exception; -import com.amazonaws.services.s3.model.DeleteObjectsRequest; -import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion; import com.amazonaws.services.s3.model.ObjectMetadata; import com.amazonaws.services.s3.model.PutObjectRequest; import com.amazonaws.services.s3.model.S3Object; @@ -31,20 +31,20 @@ import java.io.InputStream; import java.io.InputStreamReader; import java.nio.charset.StandardCharsets; -import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Objects; +import java.util.Optional; import java.util.Properties; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; import java.util.function.Predicate; -import java.util.logging.Level; -import java.util.stream.Collectors; +import java.util.logging.Logger; import java.util.stream.Stream; import java.util.stream.StreamSupport; import javax.annotation.Nullable; @@ -53,11 +53,19 @@ import org.geowebcache.locks.LockProvider; import org.geowebcache.locks.LockProvider.Lock; import org.geowebcache.locks.NoOpLockProvider; +import org.geowebcache.s3.S3BlobStore.Bounds; +import org.geowebcache.s3.streams.BatchingIterator; +import org.geowebcache.s3.streams.BoundedS3KeySupplier; +import org.geowebcache.s3.streams.DeleteBatchesOfS3Objects; +import org.geowebcache.s3.streams.TileDeletionListenerNotifier; +import org.geowebcache.s3.streams.UnboundedS3KeySupplier; +import org.geowebcache.storage.BlobStoreListenerList; import org.geowebcache.storage.StorageException; import org.geowebcache.util.TMSKeyBuilder; class S3Ops { - + private static final int BATCH_SIZE = 1000; + public static final Consumer> NO_OPERATION_POST_PROCESSOR = list -> {}; private final AmazonS3Client conn; private final String bucketName; @@ -70,12 +78,20 @@ class S3Ops { private Map pendingDeletesKeyTime = new ConcurrentHashMap<>(); - public S3Ops(AmazonS3Client conn, String bucketName, TMSKeyBuilder keyBuilder, LockProvider locks) + private final BlobStoreListenerList listeners; + + public S3Ops( + AmazonS3Client conn, + String bucketName, + TMSKeyBuilder keyBuilder, + LockProvider locks, + BlobStoreListenerList listeners) throws StorageException { this.conn = conn; this.bucketName = bucketName; this.keyBuilder = keyBuilder; this.locks = locks == null ? new NoOpLockProvider() : locks; + this.listeners = listeners; this.deleteExecutorService = createDeleteExecutorService(); issuePendingBulkDeletes(); } @@ -109,8 +125,14 @@ private void issuePendingBulkDeletes() throws StorageException { final long timestamp = Long.parseLong(e.getValue().toString()); S3BlobStore.log.info( String.format("Restarting pending bulk delete on '%s/%s':%d", bucketName, prefix, timestamp)); - asyncDelete(prefix, timestamp); + pendingDeletesKeyTime.put(prefix, timestamp); + boolean nothingToDelete = !asyncDelete(prefix, timestamp); + if (nothingToDelete) { + clearPendingBulkDelete(prefix, timestamp); + } } + } catch (GeoWebCacheException e) { + S3BlobStore.log.warning("Unable to delete pending deletes: " + e.getMessage()); } finally { try { lock.release(); @@ -138,9 +160,12 @@ private void clearPendingBulkDelete(final String prefix, final long timestamp) t long storedTimestamp = storedVal == null ? Long.MIN_VALUE : Long.parseLong(storedVal); if (timestamp >= storedTimestamp) { putProperties(pendingDeletesKey, deletes); + S3BlobStore.log.info( + String.format("Bulk delete removed pendingDelete for for bucket '%s/%s'", bucketName, prefix)); + } else { S3BlobStore.log.info(String.format( - "bulk delete finished but there's a newer one ongoing for bucket '%s/%s'", bucketName, prefix)); + "Bulk delete finished but there's a newer one ongoing for bucket '%s/%s'", bucketName, prefix)); } } catch (StorageException e) { throw new RuntimeException(e); @@ -179,7 +204,7 @@ private long currentTimeSeconds() { return timestamp; } - private synchronized boolean asyncDelete(final String prefix, final long timestamp) { + private synchronized boolean asyncDelete(final String prefix, final long timestamp) throws StorageException { if (!prefixExists(prefix)) { return false; } @@ -189,7 +214,10 @@ private synchronized boolean asyncDelete(final String prefix, final long timesta return false; } - BulkDelete task = new BulkDelete(conn, bucketName, prefix, timestamp); + TileDeletionListenerNotifier tileDeletionListenerNotifier = + new TileDeletionListenerNotifier(listeners, keyBuilder, S3BlobStore.log); + BulkDelete task = + new BulkDelete(conn, bucketName, prefix, timestamp, S3BlobStore.log, tileDeletionListenerNotifier); deleteExecutorService.submit(task); pendingDeletesKeyTime.put(prefix, timestamp); @@ -285,10 +313,15 @@ public byte[] getBytes(String key) throws StorageException { /** Simply checks if there are objects starting with {@code prefix} */ public boolean prefixExists(String prefix) { - boolean hasNext = S3Objects.withPrefix(conn, bucketName, prefix) + String prefixWithoutBounds = prefixWithoutBounds(prefix); + boolean hasNext = S3Objects.withPrefix(conn, bucketName, prefixWithoutBounds) .withBatchSize(1) .iterator() .hasNext(); + + if (!hasNext) { + S3BlobStore.log.info("No prefix exists for " + prefixWithoutBounds); + } return hasNext; } @@ -334,7 +367,7 @@ public Stream objectStream(String prefix) { S3Objects.withPrefix(conn, bucketName, prefix).spliterator(), false); } - private class BulkDelete implements Callable { + public class BulkDelete implements Callable { private final String prefix; @@ -343,71 +376,104 @@ private class BulkDelete implements Callable { private final AmazonS3 conn; private final String bucketName; - - public BulkDelete(final AmazonS3 conn, final String bucketName, final String prefix, final long timestamp) { + private final Logger logger; + private final TileDeletionListenerNotifier tileDeletionListenerNotifier; + + public BulkDelete( + final AmazonS3 conn, + final String bucketName, + final String prefix, + final long timestamp, + final Logger logger, + TileDeletionListenerNotifier tileDeletionListenerNotifier) { this.conn = conn; this.bucketName = bucketName; this.prefix = prefix; this.timestamp = timestamp; + this.logger = logger; + this.tileDeletionListenerNotifier = tileDeletionListenerNotifier; } @Override public Long call() throws Exception { - long count = 0L; + LockProvider.Lock lock = locks.getLock(prefix); + logger.info(String.format("Running bulk delete on '%s/%s':%d", bucketName, prefix, timestamp)); try { - checkInterrupted(); - S3BlobStore.log.info(String.format("Running bulk delete on '%s/%s':%d", bucketName, prefix, timestamp)); - Predicate filter = new TimeStampFilter(timestamp); - AtomicInteger n = new AtomicInteger(0); - Iterable> partitions = objectStream(prefix) - .filter(filter) - .collect(Collectors.groupingBy((x) -> n.getAndIncrement() % 1000)) - .values(); - - for (List partition : partitions) { - - checkInterrupted(); - - List keys = new ArrayList<>(partition.size()); - for (S3ObjectSummary so : partition) { - String key = so.getKey(); - keys.add(new KeyVersion(key)); - } - - checkInterrupted(); - - if (!keys.isEmpty()) { - DeleteObjectsRequest deleteReq = new DeleteObjectsRequest(bucketName); - deleteReq.setQuiet(true); - deleteReq.setKeys(keys); + long tilesDeleted = deleteBatchesOfTilesAndInformListeners(); + logger.info(String.format( + "Finished bulk delete on '%s/%s':%d. %d objects deleted", + bucketName, prefix, timestamp, tilesDeleted)); - checkInterrupted(); - - conn.deleteObjects(deleteReq); - count += keys.size(); - } + // Once clear of the streams, throw the interrupt exception if required + // Streams will exit cleanly without clearing the interrupt flag + checkInterrupted(); + clearPendingBulkDelete(prefix, timestamp); + return tilesDeleted; + } catch (RuntimeException e) { + S3BlobStore.log.warning("Aborted bulk delete '" + e.getMessage() + "' from " + + e.getClass().getSimpleName()); + if (Objects.nonNull(e.getMessage())) { + S3BlobStore.log.warning("Aborted caused '" + e.getCause().getMessage() + "' from " + + e.getCause().getClass().getSimpleName()); } - } catch (InterruptedException | IllegalStateException e) { - S3BlobStore.log.info(String.format( - "S3 bulk delete aborted for '%s/%s'. Will resume on next startup.", bucketName, prefix)); - throw e; - } catch (Exception e) { - S3BlobStore.log.log( - Level.WARNING, - String.format("Unknown error performing bulk S3 delete of '%s/%s'", bucketName, prefix), - e); throw e; + } finally { + try { + lock.release(); + } catch (GeoWebCacheException e) { + // Do not allow checked exception to escape from a finally block + logger.warning("Error releasing lock: " + e.getMessage()); + } + } + } + + private long deleteBatchesOfTilesAndInformListeners() { + var possibleBounds = Bounds.createBounds(prefix); + DeleteBatchesOfS3Objects deleteBatchesOfS3Objects = + new DeleteBatchesOfS3Objects<>(bucketName, conn, S3ObjectSummary::getKey, logger); + Predicate timeStampFilter = new TimeStampFilter(timestamp); + Consumer> batchPostProcessor = + possibleBounds.isPresent() ? tileDeletionListenerNotifier : NO_OPERATION_POST_PROCESSOR; + + return BatchingIterator.batchedStreamOf( + createS3ObjectStream(possibleBounds) + .takeWhile(Objects::nonNull) + .takeWhile(o -> !Thread.currentThread().isInterrupted()) + .filter(timeStampFilter), + BATCH_SIZE) + .map(deleteBatchesOfS3Objects) + .peek(batchPostProcessor) + .mapToLong(List::size) + .sum(); + } + + private Stream createS3ObjectStream(Optional possibleBounds) { + if (possibleBounds.isPresent()) { + String prefixWithoutBounds = prefixWithoutBounds(prefix); + return boundedStreamOfS3Objects(prefixWithoutBounds, possibleBounds.get()); + } else { + return unboundedStreamOfS3Objects(prefix); } + } - S3BlobStore.log.info(String.format( - "Finished bulk delete on '%s/%s':%d. %d objects deleted", bucketName, prefix, timestamp, count)); + private Stream unboundedStreamOfS3Objects(String prefix) { + S3Objects s3Objects = S3Objects.withPrefix(conn, bucketName, prefix).withBatchSize(BATCH_SIZE); + UnboundedS3KeySupplier supplier = new UnboundedS3KeySupplier(prefix, bucketName, s3Objects, logger); + return Stream.generate(supplier).takeWhile(Objects::nonNull); + } - S3Ops.this.clearPendingBulkDelete(prefix, timestamp); - return count; + private Stream boundedStreamOfS3Objects(String prefixWithoutBounds, Bounds bounds) { + BoundedS3KeySupplier supplier = + new BoundedS3KeySupplier(prefixWithoutBounds, logger, conn, bounds, bucketName, BATCH_SIZE); + return Stream.generate(supplier) + .takeWhile(Objects::nonNull) + .filter(bounds::predicate); // Filter Y bounds as X is taken care of by the supplier } private void checkInterrupted() throws InterruptedException { if (Thread.interrupted()) { + S3BlobStore.log.info(String.format( + "S3 bulk delete aborted for '%s/%s'. Will resume on next startup.", bucketName, prefix)); throw new InterruptedException(); } } diff --git a/geowebcache/s3storage/src/main/java/org/geowebcache/s3/streams/BatchingIterator.java b/geowebcache/s3storage/src/main/java/org/geowebcache/s3/streams/BatchingIterator.java new file mode 100644 index 000000000..29bfed2c1 --- /dev/null +++ b/geowebcache/s3storage/src/main/java/org/geowebcache/s3/streams/BatchingIterator.java @@ -0,0 +1,63 @@ +/** + * This program is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser General + * Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any + * later version. + * + *

This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied + * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. + * + *

You should have received a copy of the GNU Lesser General Public License along with this program. If not, see + * . + * + *

Copyright 2025 + */ +package org.geowebcache.s3.streams; + +import static java.util.Spliterator.ORDERED; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Spliterators; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +public class BatchingIterator implements Iterator> { + /** + * Given a stream, convert it to a stream of batches no greater than the batchSize. + * + * @param originalStream to convert + * @param batchSize maximum size of a batch + * @param type of items in the stream + * @return a stream of batches taken sequentially from the original stream + */ + public static Stream> batchedStreamOf(Stream originalStream, int batchSize) { + return asStream(new BatchingIterator<>(originalStream.iterator(), batchSize)); + } + + private static Stream asStream(Iterator iterator) { + return StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, ORDERED), false); + } + + private final int batchSize; + private final Iterator sourceIterator; + + private BatchingIterator(Iterator sourceIterator, int batchSize) { + this.batchSize = batchSize; + this.sourceIterator = sourceIterator; + } + + @Override + public boolean hasNext() { + return sourceIterator.hasNext(); + } + + @Override + public List next() { + List currentBatch = new ArrayList<>(batchSize); + while (sourceIterator.hasNext() && currentBatch.size() < batchSize) { + currentBatch.add(sourceIterator.next()); + } + return currentBatch; + } +} diff --git a/geowebcache/s3storage/src/main/java/org/geowebcache/s3/streams/BoundedS3KeySupplier.java b/geowebcache/s3storage/src/main/java/org/geowebcache/s3/streams/BoundedS3KeySupplier.java new file mode 100644 index 000000000..0e7dbdd82 --- /dev/null +++ b/geowebcache/s3storage/src/main/java/org/geowebcache/s3/streams/BoundedS3KeySupplier.java @@ -0,0 +1,89 @@ +/** + * This program is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser General + * Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any + * later version. + * + *

This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied + * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. + * + *

You should have received a copy of the GNU Lesser General Public License along with this program. If not, see + * . + * + *

Copyright 2025 + */ +package org.geowebcache.s3.streams; + +import static java.lang.String.format; + +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.iterable.S3Objects; +import com.amazonaws.services.s3.model.S3ObjectSummary; +import java.util.Iterator; +import java.util.function.Supplier; +import java.util.logging.Logger; +import org.geowebcache.s3.S3BlobStore.Bounds; + +/** + * Similar to the UnboundedS3KeySupplier it retrieves keys from S3. It is slightly more optimised as it respects the x + * bounds only fetching objects that are from the range of x bounds S3ObjectPathsForPrefixSupplier This class will + * interact with the AmazonS3 connection to retrieve all the objects with prefix and bucket provided
+ * It will return these lazily one by one as the get methods is called + */ +public class BoundedS3KeySupplier implements Supplier { + private final String prefixWithoutBounds; + private final Logger logger; + private final AmazonS3 conn; + private final Bounds bounds; + private final String bucket; + private final int batch; + + public BoundedS3KeySupplier( + String prefixWithoutBounds, Logger logger, AmazonS3 conn, Bounds bounds, String bucket, int batch) { + this.prefixWithoutBounds = prefixWithoutBounds; + this.logger = logger; + this.conn = conn; + this.bounds = bounds; + this.nextX = bounds.getMinX(); + this.bucket = bucket; + this.batch = batch; + } + + private Iterator iterator; + private long nextX; + private long count = 0; + + @Override + public S3ObjectSummary get() { + return next(); + } + + private synchronized S3ObjectSummary next() { + boolean hasNext = false; + do { + hasNext = iterator != null && iterator.hasNext(); + if (!hasNext) { + iterator = null; + } + + if (iterator == null && nextX <= bounds.getMaxX()) { + String prefixWithX = format("%s%d/", prefixWithoutBounds, nextX); + S3Objects s3Objects = + S3Objects.withPrefix(conn, bucket, prefixWithX).withBatchSize(batch); + iterator = s3Objects.iterator(); + hasNext = iterator.hasNext(); + nextX++; + } + } while (!hasNext && nextX <= bounds.getMaxX()); // It is exhausted if + + if (hasNext) { + count++; + S3ObjectSummary summary = iterator.next(); + logger.fine(format("%s: %s", summary.getKey(), bounds)); + return summary; + } else { + logger.info(String.format( + "Exhausted objects with prefix: %s supplied %d", prefixWithoutBounds + bounds, count)); + return null; + } + } +} diff --git a/geowebcache/s3storage/src/main/java/org/geowebcache/s3/streams/DeleteBatchesOfS3Objects.java b/geowebcache/s3storage/src/main/java/org/geowebcache/s3/streams/DeleteBatchesOfS3Objects.java new file mode 100644 index 000000000..eb5b0c1fe --- /dev/null +++ b/geowebcache/s3storage/src/main/java/org/geowebcache/s3/streams/DeleteBatchesOfS3Objects.java @@ -0,0 +1,82 @@ +/** + * This program is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser General + * Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any + * later version. + * + *

This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied + * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. + * + *

You should have received a copy of the GNU Lesser General Public License along with this program. If not, see + * . + * + *

Copyright 2025 + */ +package org.geowebcache.s3.streams; + +import static java.util.stream.Collectors.toMap; + +import com.amazonaws.AmazonServiceException; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.model.DeleteObjectsRequest; +import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion; +import com.amazonaws.services.s3.model.DeleteObjectsResult; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.logging.Logger; +import java.util.stream.Collectors; + +/** @param The type of the data object used to track abstract the s3 class */ +public class DeleteBatchesOfS3Objects implements Function, List> { + private final String bucket; + private final AmazonS3 conn; + private final Function mapToKeyPath; + private final Logger logger; + + public DeleteBatchesOfS3Objects(String bucket, AmazonS3 conn, Function mapToKeyPath, Logger logger) { + this.bucket = bucket; + this.conn = conn; + this.mapToKeyPath = mapToKeyPath; + this.logger = logger; + } + + @Override + public List apply(List objectList) { + if (!objectList.isEmpty()) { + Map tilesByPath = makeMapOfTilesByPath(objectList); + DeleteObjectsRequest deleteObjectsRequest = buildRequest(tilesByPath); + DeleteObjectsResult deleteObjectsResult = makeRequest(deleteObjectsRequest); + return collectResults(deleteObjectsResult, tilesByPath); + } else { + logger.info("Expected a batch of object to delete received none"); + return List.of(); + } + } + + private List collectResults(DeleteObjectsResult deleteObjectsResult, Map tilesByPath) { + return deleteObjectsResult.getDeletedObjects().stream() + .map(deletedObject -> tilesByPath.get(deletedObject.getKey())) + .collect(Collectors.toList()); + } + + private DeleteObjectsResult makeRequest(DeleteObjectsRequest deleteObjectsRequest) { + try { + return conn.deleteObjects(deleteObjectsRequest); + } catch (AmazonServiceException e) { + return new DeleteObjectsResult(new ArrayList<>()); + } + } + + private DeleteObjectsRequest buildRequest(Map tilesByPath) { + DeleteObjectsRequest request = new DeleteObjectsRequest(bucket); + request.setBucketName(bucket); + request.setKeys(tilesByPath.keySet().stream().map(KeyVersion::new).collect(Collectors.toList())); + request.setQuiet(false); + return request; + } + + private Map makeMapOfTilesByPath(List tileList) { + return tileList.stream().collect(toMap(mapToKeyPath, Function.identity())); + } +} diff --git a/geowebcache/s3storage/src/main/java/org/geowebcache/s3/streams/TileDeletionListenerNotifier.java b/geowebcache/s3storage/src/main/java/org/geowebcache/s3/streams/TileDeletionListenerNotifier.java new file mode 100644 index 000000000..babed20b9 --- /dev/null +++ b/geowebcache/s3storage/src/main/java/org/geowebcache/s3/streams/TileDeletionListenerNotifier.java @@ -0,0 +1,124 @@ +/** + * This program is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser General + * Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any + * later version. + * + *

This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied + * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. + * + *

You should have received a copy of the GNU Lesser General Public License along with this program. If not, see + * . + * + *

Copyright 2025 + */ +package org.geowebcache.s3.streams; + +import static com.google.common.base.Preconditions.checkNotNull; + +import com.amazonaws.services.s3.model.S3ObjectSummary; +import java.util.List; +import java.util.Objects; +import java.util.function.Consumer; +import java.util.logging.Logger; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.geowebcache.mime.MimeException; +import org.geowebcache.mime.MimeType; +import org.geowebcache.storage.BlobStoreListenerList; +import org.geowebcache.util.TMSKeyBuilder; + +/** + * TileDeletionListenerNotifier is responsible for informing BlobStoreListeners that a tile has been deleted. The method + * is called when the + */ +public class TileDeletionListenerNotifier implements Consumer> { + public static final String LAYER_GROUP_POS = "layer"; + public static final String GRID_SET_ID_GROUP_POS = "gridSetId"; + public static final String FORMAT_GROUP_POS = "format"; + public static final String PARAMETERS_ID_GROUP_POS = "parametersId"; + public static final String X_GROUP_POS = "x"; + public static final String Y_GROUP_POS = "y"; + public static final String Z_GROUP_POS = "z"; + + public static final Pattern keyRegex = Pattern.compile( + "((?.+)/)?(?.+)/(?.+)/(?.+)/(?.+)/(?\\d+)/(?\\d+)/(?\\d+)\\.(?.+)"); + + private final BlobStoreListenerList listenerList; + private final TMSKeyBuilder keyBuilder; + + public TileDeletionListenerNotifier(BlobStoreListenerList listenerList, TMSKeyBuilder keyBuilder, Logger logger) { + checkNotNull(listenerList, "listenerList cannot be null"); + checkNotNull(keyBuilder, "keyBuilder cannot be null"); + checkNotNull(logger, "logger cannot be null"); + + this.listenerList = listenerList; + this.logger = logger; + this.keyBuilder = keyBuilder; + } + + private final Logger logger; + + @Override + public void accept(List tileObjectList) { + if (tileObjectList == null || tileObjectList.isEmpty()) { + logger.fine("There are no tiles successfully deleted in this batch"); + return; + } + + if (listenerList.isEmpty()) { + logger.fine("There are no listeners to be notified"); + return; + } + + // All the S3Objects are from the same layer + String layerName = null; + long count = 0; + for (S3ObjectSummary s3ObjectSummary : tileObjectList) { + Matcher matcher = keyRegex.matcher(s3ObjectSummary.getKey()); + if (matcher.matches()) { + String layerId = matcher.group(LAYER_GROUP_POS); + String gridSetId = matcher.group(GRID_SET_ID_GROUP_POS); + String extension = matcher.group(FORMAT_GROUP_POS); + String parametersId = matcher.group(PARAMETERS_ID_GROUP_POS); + long x = Long.parseLong(matcher.group(X_GROUP_POS)); + long y = Long.parseLong(matcher.group(Y_GROUP_POS)); + int z = Integer.parseInt(matcher.group(Z_GROUP_POS)); + + if (layerName == null) { + layerName = keyBuilder.layerNameFromId(layerId); + if (layerName == null) { + logger.warning("No layer found for id " + layerId + + "skipping tile listener notification as the tiles will not match"); + return; + } + } + + if (Objects.equals(parametersId, "default")) { + parametersId = null; + } + + MimeType mimeType = getMimeType(extension); + if (mimeType == null) { + logger.warning("Unknown extension " + extension + " cannot match a mimetype"); + continue; + } + + listenerList.sendTileDeleted( + layerName, gridSetId, mimeType.getMimeType(), parametersId, x, y, z, s3ObjectSummary.getSize()); + } else { + logger.warning("Key is in an invalid format " + s3ObjectSummary.getKey()); + } + } + logger.fine("Notified " + count + " tiles successfully deleted from a batch of " + tileObjectList.size()); + } + + private MimeType getMimeType(String extension) { + MimeType mimeType = null; + try { + mimeType = MimeType.createFromExtension(extension); + } catch (MimeException e) { + logger.warning("Unable to parse find mime type for extension " + extension); + } + return mimeType; + } +} diff --git a/geowebcache/s3storage/src/main/java/org/geowebcache/s3/streams/UnboundedS3KeySupplier.java b/geowebcache/s3storage/src/main/java/org/geowebcache/s3/streams/UnboundedS3KeySupplier.java new file mode 100644 index 000000000..bf598b5ee --- /dev/null +++ b/geowebcache/s3storage/src/main/java/org/geowebcache/s3/streams/UnboundedS3KeySupplier.java @@ -0,0 +1,65 @@ +/** + * This program is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser General + * Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any + * later version. + * + *

This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied + * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. + * + *

You should have received a copy of the GNU Lesser General Public License along with this program. If not, see + * . + * + *

Copyright 2025 + */ +package org.geowebcache.s3.streams; + +import static com.google.common.base.Preconditions.checkNotNull; + +import com.amazonaws.services.s3.iterable.S3Objects; +import com.amazonaws.services.s3.model.S3ObjectSummary; +import java.util.Iterator; +import java.util.function.Supplier; +import java.util.logging.Logger; + +/** + * UnboundedS3KeySupplier This class will interact with the AmazonS3 connection to retrieve all the objects with prefix + * and bucket provided
+ * It will return these lazily one by one as the get methods is called + */ +public class UnboundedS3KeySupplier implements Supplier { + private final String prefix; + private long count = 0; + private final Logger logger; + private final S3Objects s3Objects; + + private Iterator iterator; + + public UnboundedS3KeySupplier(String prefix, String bucket, S3Objects s3Objects, Logger logger) { + checkNotNull(prefix, "prefix must not be null"); + checkNotNull(bucket, "bucket must not be null"); + checkNotNull(s3Objects, "s3Objects must not be null"); + checkNotNull(logger, "logger must not be null"); + + this.prefix = prefix; + this.s3Objects = s3Objects; + this.logger = logger; + } + + @Override + public S3ObjectSummary get() { + return next(); + } + + private synchronized S3ObjectSummary next() { + if (iterator == null) { + iterator = s3Objects.iterator(); + } + if (iterator.hasNext()) { + count++; + return iterator.next(); + } else { + logger.info(String.format("Exhausted objects with prefix: %s supplied %d", prefix, count)); + return null; + } + } +} diff --git a/geowebcache/s3storage/src/test/java/org/geowebcache/s3/AbstractS3BlobStoreIntegrationTest.java b/geowebcache/s3storage/src/test/java/org/geowebcache/s3/AbstractS3BlobStoreIntegrationTest.java index 2c4734b52..6fecd5c53 100644 --- a/geowebcache/s3storage/src/test/java/org/geowebcache/s3/AbstractS3BlobStoreIntegrationTest.java +++ b/geowebcache/s3storage/src/test/java/org/geowebcache/s3/AbstractS3BlobStoreIntegrationTest.java @@ -16,12 +16,12 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.isNull; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -32,11 +32,16 @@ import com.google.common.io.Files; import java.io.File; import java.io.IOException; +import java.time.Duration; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import java.util.logging.Logger; +import org.apache.commons.lang3.SystemUtils; +import org.awaitility.Awaitility; import org.geotools.util.logging.Logging; import org.geowebcache.config.DefaultGridsets; import org.geowebcache.grid.GridSet; @@ -57,9 +62,10 @@ import org.geowebcache.storage.TileObject; import org.geowebcache.storage.TileRange; import org.junit.After; +import org.junit.Assume; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; -import org.mockito.Mockito; /** * Integration tests for {@link S3BlobStore}. @@ -84,12 +90,20 @@ public abstract class AbstractS3BlobStoreIntegrationTest { @Before public void before() throws Exception { + Assume.assumeFalse("Test skipped on Windows", SystemUtils.IS_OS_WINDOWS); + Assume.assumeFalse("Test skipped on Mac osx", SystemUtils.IS_OS_MAC_OSX); + + Awaitility.setDefaultPollInterval(10, TimeUnit.MILLISECONDS); + Awaitility.setDefaultPollDelay(Duration.ZERO); + Awaitility.setDefaultTimeout(Duration.ofSeconds(30L)); + S3BlobStoreInfo config = getConfiguration(); TileLayerDispatcher layers = mock(TileLayerDispatcher.class); LockProvider lockProvider = new NoOpLockProvider(); TileLayer layer = mock(TileLayer.class); when(layers.getTileLayer(eq(DEFAULT_LAYER))).thenReturn(layer); + when(layers.getLayerList()).thenReturn(List.of(layer)); when(layer.getName()).thenReturn(DEFAULT_LAYER); when(layer.getId()).thenReturn(DEFAULT_LAYER); blobStore = new S3BlobStore(config, layers, lockProvider); @@ -153,7 +167,7 @@ public void testPutWithListener() throws MimeException, StorageException { eq(tile.getLayerName()), eq(tile.getGridSetId()), eq(tile.getBlobFormat()), - anyString(), + isNull(), eq(20L), eq(30L), eq(12), @@ -170,7 +184,7 @@ public void testPutWithListener() throws MimeException, StorageException { eq(tile.getLayerName()), eq(tile.getGridSetId()), eq(tile.getBlobFormat()), - anyString(), + isNull(), eq(20L), eq(30L), eq(12), @@ -211,7 +225,7 @@ public void testDelete() throws MimeException, StorageException { eq(tile.getLayerName()), eq(tile.getGridSetId()), eq(tile.getBlobFormat()), - anyString(), + isNull(), eq(22L), eq(30L), eq(12), @@ -233,14 +247,15 @@ public void testDeleteLayer() throws Exception { tile.getXYZ()[0] = 22; blobStore.put(tile); - BlobStoreListener listener = mock(BlobStoreListener.class); - blobStore.addListener(listener); + FakeListener fakeListener = new FakeListener(); + blobStore.addListener(fakeListener); + String layerName = tile.getLayerName(); - blobStore.delete(layerName); - blobStore.destroy(); - Thread.sleep(10000); - // blobStore.delete(layerName); - // verify(listener, Mockito.atLeastOnce()).layerDeleted(eq(layerName)); + assertTrue(blobStore.delete(layerName)); + + Awaitility.await().until(() -> fakeListener.layerDeleted == 1); + assertEquals(0, fakeListener.tileDeleted); + assertEquals(1, fakeListener.total()); } @Test @@ -260,14 +275,18 @@ public void testDeleteGridSubset() throws Exception { assertTrue(blobStore.get(queryTile(DEFAULT_LAYER, "EPSG:3857", "jpeg", 0, 0, 0, "param", "value"))); } + // This test is non-deterministic + @Ignore @Test public void testLayerMetadata() { blobStore.putLayerMetadata(DEFAULT_LAYER, "prop1", "value1"); blobStore.putLayerMetadata(DEFAULT_LAYER, "prop2", "value2"); - assertNull(blobStore.getLayerMetadata(DEFAULT_LAYER, "nonExistingKey")); - assertEquals("value1", blobStore.getLayerMetadata(DEFAULT_LAYER, "prop1")); - assertEquals("value2", blobStore.getLayerMetadata(DEFAULT_LAYER, "prop2")); + Awaitility.await().untilAsserted(() -> blobStore.getLayerMetadata(DEFAULT_LAYER, "nonExistingKey")); + Awaitility.await() + .untilAsserted(() -> assertEquals("value1", blobStore.getLayerMetadata(DEFAULT_LAYER, "prop1"))); + Awaitility.await() + .untilAsserted(() -> assertEquals("value2", blobStore.getLayerMetadata(DEFAULT_LAYER, "prop2"))); } @Test @@ -294,9 +313,9 @@ public void testTruncateShortCutsIfNoTilesInParametersPrefix() throws StorageExc tileRange(DEFAULT_LAYER, DEFAULT_GRIDSET, zoomStart, zoomStop, rangeBounds, mimeType, parameters); assertFalse(blobStore.delete(tileRange)); - verify(listener, times(0)) + Awaitility.await().untilAsserted(() -> verify(listener, times(0)) .tileDeleted( - anyString(), anyString(), anyString(), anyString(), anyLong(), anyLong(), anyInt(), anyLong()); + anyString(), anyString(), anyString(), anyString(), anyLong(), anyLong(), anyInt(), anyLong())); } @Test @@ -325,15 +344,15 @@ public void testTruncateShortCutsIfNoTilesInGridsetPrefix() throws StorageExcept tileRange(DEFAULT_LAYER, gridset.getName(), zoomStart, zoomStop, rangeBounds, mimeType, parameters); assertFalse(blobStore.delete(tileRange)); - verify(listener, times(0)) + Awaitility.await().untilAsserted(() -> verify(listener, times(0)) .tileDeleted( - anyString(), anyString(), anyString(), anyString(), anyLong(), anyLong(), anyInt(), anyLong()); + anyString(), anyString(), anyString(), anyString(), anyLong(), anyLong(), anyInt(), anyLong())); } /** Seed levels 0 to 2, truncate levels 0 and 1, check level 2 didn't get deleted */ @Test public void testTruncateRespectsLevels() throws StorageException, MimeException { - + Assume.assumeFalse("Test skipped on Windows", SystemUtils.IS_OS_WINDOWS); final int zoomStart = 0; final int zoomStop = 2; @@ -346,7 +365,7 @@ public void testTruncateRespectsLevels() throws StorageException, MimeException seed(zoomStart, zoomStop, gridset.getName(), DEFAULT_FORMAT, null); - BlobStoreListener listener = mock(BlobStoreListener.class); + FakeListener listener = new FakeListener(); blobStore.addListener(listener); MimeType mimeType = MimeType.createFromExtension(DEFAULT_FORMAT); @@ -361,54 +380,39 @@ public void testTruncateRespectsLevels() throws StorageException, MimeException assertTrue(blobStore.delete(tileRange)); int expectedCount = 5; // 1 for level 0, 4 for level 1, as per seed() - - verify(listener, times(expectedCount)) - .tileDeleted( - anyString(), anyString(), anyString(), anyString(), anyLong(), anyLong(), anyInt(), anyLong()); + Awaitility.await().untilAsserted(() -> { + assertEquals(expectedCount, listener.tileDeleted); + assertEquals(expectedCount, listener.total()); + }); } - /** If there are not {@link BlobStoreListener}s, use an optimized code path (not calling delete() for each tile) */ @Test - public void testTruncateOptimizationIfNoListeners() throws StorageException, MimeException { + public void testBoundedLayerDeletion() throws StorageException, MimeException { - final int zoomStart = 0; - final int zoomStop = 2; + int level = 3; + seed(level, level); + FakeListener fakeListener = new FakeListener(); + blobStore.addListener(fakeListener); - long[][] rangeBounds = { // - {0, 0, 0, 0, 0}, // - {0, 0, 1, 1, 1}, // - {0, 0, 3, 3, 2} // - }; - - seed(zoomStart, zoomStop); + long[][] rangeBounds = {{2, 2, 3, 3, level}}; MimeType mimeType = MimeType.createFromExtension(DEFAULT_FORMAT); Map parameters = null; + TileRange tileRange = + tileRange(DEFAULT_LAYER, DEFAULT_GRIDSET, level, level, rangeBounds, mimeType, parameters); - final int truncateStart = 0, truncateStop = 1; - - TileRange tileRange = tileRange( - DEFAULT_LAYER, DEFAULT_GRIDSET, truncateStart, truncateStop, rangeBounds, mimeType, parameters); - - blobStore = Mockito.spy(blobStore); assertTrue(blobStore.delete(tileRange)); + try { + Thread.sleep(1000L); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } - verify(blobStore, times(0)).delete(Mockito.any(TileObject.class)); - assertFalse(blobStore.get(queryTile(0, 0, 0))); - assertFalse(blobStore.get(queryTile(0, 0, 1))); - assertFalse(blobStore.get(queryTile(0, 1, 1))); - assertFalse(blobStore.get(queryTile(1, 0, 1))); - assertFalse(blobStore.get(queryTile(1, 1, 1))); - - assertTrue(blobStore.get(queryTile(0, 0, 2))); - assertTrue(blobStore.get(queryTile(0, 1, 2))); - assertTrue(blobStore.get(queryTile(0, 2, 2))); - // ... - assertTrue(blobStore.get(queryTile(3, 0, 2))); - assertTrue(blobStore.get(queryTile(3, 1, 2))); - assertTrue(blobStore.get(queryTile(3, 2, 2))); - assertTrue(blobStore.get(queryTile(3, 3, 2))); + int wantedNumberOfInvocations = + (int) ((rangeBounds[0][2] - rangeBounds[0][0] + 1) * (rangeBounds[0][3] - rangeBounds[0][1] + 1)); + Awaitility.await().untilAsserted(() -> assertEquals(wantedNumberOfInvocations, fakeListener.tileDeleted)); + assertEquals(wantedNumberOfInvocations, fakeListener.total()); } private TileRange tileRange( @@ -502,4 +506,84 @@ private TileObject queryTile( TileObject tile = TileObject.createQueryTileObject(layer, new long[] {x, y, z}, gridset, format, parameters); return tile; } + + static class FakeListener implements BlobStoreListener { + int tileStored = 0; + int tileDeleted = 0; + int tileUpdated = 0; + int layerDeleted = 0; + int layerRenamed = 0; + int gridSubsetDeleted = 0; + int parametersDeleted = 0; + + @Override + public void tileStored( + String layerName, + String gridSetId, + String blobFormat, + String parametersId, + long x, + long y, + int z, + long blobSize) { + tileStored++; + } + + @Override + public void tileDeleted( + String layerName, + String gridSetId, + String blobFormat, + String parametersId, + long x, + long y, + int z, + long blobSize) { + tileDeleted++; + } + + @Override + public void tileUpdated( + String layerName, + String gridSetId, + String blobFormat, + String parametersId, + long x, + long y, + int z, + long blobSize, + long oldSize) { + tileUpdated++; + } + + @Override + public void layerDeleted(String layerName) { + layerDeleted++; + } + + @Override + public void layerRenamed(String oldLayerName, String newLayerName) { + layerRenamed++; + } + + @Override + public void gridSubsetDeleted(String layerName, String gridSetId) { + gridSubsetDeleted++; + } + + @Override + public void parametersDeleted(String layerName, String parametersId) { + parametersDeleted++; + } + + public int total() { + return tileDeleted + + tileStored + + tileUpdated + + layerDeleted + + layerRenamed + + gridSubsetDeleted + + parametersDeleted; + } + } } diff --git a/geowebcache/s3storage/src/test/java/org/geowebcache/s3/OfflineS3BlobStoreIntegrationTest.java b/geowebcache/s3storage/src/test/java/org/geowebcache/s3/OfflineS3BlobStoreIntegrationTest.java index f81a96648..c7a5bc019 100644 --- a/geowebcache/s3storage/src/test/java/org/geowebcache/s3/OfflineS3BlobStoreIntegrationTest.java +++ b/geowebcache/s3storage/src/test/java/org/geowebcache/s3/OfflineS3BlobStoreIntegrationTest.java @@ -16,7 +16,6 @@ import org.geowebcache.storage.StorageException; import org.junit.AfterClass; import org.junit.BeforeClass; -import org.junit.Ignore; import org.junit.Test; /** @@ -24,7 +23,6 @@ * *

*/ -@Ignore // this test fails very often on the AppVeyor build and frequently on Travis, disabling public class OfflineS3BlobStoreIntegrationTest extends AbstractS3BlobStoreIntegrationTest { private static S3Mock api; @@ -52,13 +50,6 @@ protected S3BlobStoreInfo getConfiguration() { @Override @Test - public void testTruncateOptimizationIfNoListeners() throws StorageException, MimeException { - super.testTruncateOptimizationIfNoListeners(); - } - - @Override - @Ignore // randomly fails - @Test public void testTruncateShortCutsIfNoTilesInGridsetPrefix() throws StorageException, MimeException { super.testTruncateShortCutsIfNoTilesInGridsetPrefix(); } diff --git a/geowebcache/s3storage/src/test/java/org/geowebcache/s3/S3BlobStoreConformanceTest.java b/geowebcache/s3storage/src/test/java/org/geowebcache/s3/S3BlobStoreConformanceTest.java index b1b11734c..27265f5c1 100644 --- a/geowebcache/s3storage/src/test/java/org/geowebcache/s3/S3BlobStoreConformanceTest.java +++ b/geowebcache/s3storage/src/test/java/org/geowebcache/s3/S3BlobStoreConformanceTest.java @@ -13,6 +13,7 @@ */ package org.geowebcache.s3; +import static java.util.concurrent.TimeUnit.SECONDS; import static org.easymock.EasyMock.createMock; import static org.easymock.EasyMock.eq; import static org.easymock.EasyMock.expect; @@ -22,13 +23,17 @@ import java.util.Arrays; import java.util.Collections; import java.util.stream.Stream; +import org.awaitility.Awaitility; import org.easymock.EasyMock; import org.geowebcache.GeoWebCacheException; +import org.geowebcache.grid.GridSet; import org.geowebcache.layer.TileLayer; import org.geowebcache.layer.TileLayerDispatcher; import org.geowebcache.locks.LockProvider; import org.geowebcache.locks.NoOpLockProvider; import org.geowebcache.storage.AbstractBlobStoreTest; +import org.geowebcache.storage.StorageException; +import org.geowebcache.storage.TileRange; import org.junit.Assume; import org.junit.Rule; @@ -63,4 +68,19 @@ public void createTestUnit() throws Exception { replay(layers); store = new S3BlobStore(config, layers, lockProvider); } + + @Override + public void assertTileRangeEmpty(String layerName, GridSet gridSet, String format, TileRange range) + throws StorageException { + Awaitility.await().atMost(30, SECONDS).untilAsserted(() -> { + for (int z = range.getZoomStart(); z <= range.getZoomStop(); z++) { + long[] bounds = range.rangeBounds(z); + for (long x = bounds[0]; x <= bounds[2]; x++) { + for (long y = bounds[1]; y < bounds[2]; y++) { + assertNoTile(layerName, x, y, z, gridSet.getName(), format, null); + } + } + } + }); + } } diff --git a/geowebcache/s3storage/src/test/java/org/geowebcache/s3/TemporaryS3Folder.java b/geowebcache/s3storage/src/test/java/org/geowebcache/s3/TemporaryS3Folder.java index 1c1959a71..dbcf915f4 100644 --- a/geowebcache/s3storage/src/test/java/org/geowebcache/s3/TemporaryS3Folder.java +++ b/geowebcache/s3storage/src/test/java/org/geowebcache/s3/TemporaryS3Folder.java @@ -86,6 +86,7 @@ public S3BlobStoreInfo getConfig() { config.setBucket(bucket); config.setAwsAccessKey(accessKey); config.setAwsSecretKey(secretKey); + config.setAccess(Access.PRIVATE); config.setPrefix(temporaryPrefix); if (properties.getProperty("endpoint") != null) { config.setEndpoint(properties.getProperty("endpoint"));