-
Notifications
You must be signed in to change notification settings - Fork 292
Limit number of S3 requests when truncating based on a tile range #1390
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
4537f2a
4b8cb05
e0d205a
ab44cdd
97b67f8
449a7c7
e2e5d85
039d22e
3d19f67
da02536
cd722f5
8629417
80fc136
1314791
6f3a40c
5c678ad
e4ba074
577ac71
3cbb23f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,30 @@ | ||
| Tidy up aws after working with tests | ||
| === | ||
|
|
||
| ``` | ||
| aws s3 ls s3://<bucket>/ | grep tmp_ | awk '{print $2}' | while read obj; do | ||
| echo "Object: $obj" | ||
| aws s3 rm s3://gwc-s3-test/$obj --recursive | ||
| done | ||
| </code> | ||
| ``` | ||
|
|
||
| Replace the `<bucket>` with the value configured in your system. | ||
| This will delete all the temporary object that have been created | ||
|
|
||
|
|
||
| Config file | ||
| ==== | ||
| Add a `.gwc_s3_tests.properties` to your home directory to get the integration tests to run. | ||
|
|
||
| ``` | ||
| cat .gwc_s3_tests.properties | ||
| ``` | ||
| _contents of file_ | ||
|
|
||
| ``` | ||
| bucket=gwc-s3-test | ||
| secretKey=lxL***************************** | ||
| accessKey=AK***************``` | ||
|
|
||
| ``` |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -13,26 +13,22 @@ | |
| */ | ||
| package org.geowebcache.s3; | ||
|
|
||
| import static com.google.common.base.Preconditions.checkArgument; | ||
| import static com.google.common.base.Preconditions.checkNotNull; | ||
| import static java.lang.String.format; | ||
| import static java.util.Objects.isNull; | ||
|
|
||
| import com.amazonaws.AmazonServiceException; | ||
| import com.amazonaws.services.s3.AmazonS3Client; | ||
| import com.amazonaws.services.s3.model.AccessControlList; | ||
| import com.amazonaws.services.s3.model.BucketPolicy; | ||
| import com.amazonaws.services.s3.model.CannedAccessControlList; | ||
| import com.amazonaws.services.s3.model.DeleteObjectsRequest; | ||
| import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion; | ||
| import com.amazonaws.services.s3.model.Grant; | ||
| import com.amazonaws.services.s3.model.ObjectMetadata; | ||
| import com.amazonaws.services.s3.model.PutObjectRequest; | ||
| import com.amazonaws.services.s3.model.S3Object; | ||
| import com.amazonaws.services.s3.model.S3ObjectInputStream; | ||
| import com.amazonaws.services.s3.model.S3ObjectSummary; | ||
| import com.google.common.base.Function; | ||
| import com.google.common.collect.AbstractIterator; | ||
| import com.google.common.collect.Iterators; | ||
| import com.google.common.collect.Lists; | ||
| import com.google.common.io.ByteStreams; | ||
| import java.io.ByteArrayInputStream; | ||
| import java.io.ByteArrayOutputStream; | ||
|
|
@@ -41,7 +37,6 @@ | |
| import java.nio.channels.WritableByteChannel; | ||
| import java.util.ArrayList; | ||
| import java.util.Arrays; | ||
| import java.util.Iterator; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.Objects; | ||
|
|
@@ -50,7 +45,10 @@ | |
| import java.util.Set; | ||
| import java.util.logging.Level; | ||
| import java.util.logging.Logger; | ||
| import java.util.regex.Matcher; | ||
| import java.util.regex.Pattern; | ||
| import java.util.stream.Collectors; | ||
| import java.util.stream.IntStream; | ||
| import javax.annotation.Nullable; | ||
| import org.geotools.util.logging.Logging; | ||
| import org.geowebcache.GeoWebCacheException; | ||
|
|
@@ -61,14 +59,14 @@ | |
| import org.geowebcache.locks.LockProvider; | ||
| import org.geowebcache.mime.MimeException; | ||
| import org.geowebcache.mime.MimeType; | ||
| import org.geowebcache.s3.streams.TileDeletionListenerNotifier; | ||
| import org.geowebcache.storage.BlobStore; | ||
| import org.geowebcache.storage.BlobStoreListener; | ||
| import org.geowebcache.storage.BlobStoreListenerList; | ||
| import org.geowebcache.storage.CompositeBlobStore; | ||
| import org.geowebcache.storage.StorageException; | ||
| import org.geowebcache.storage.TileObject; | ||
| import org.geowebcache.storage.TileRange; | ||
| import org.geowebcache.storage.TileRangeIterator; | ||
| import org.geowebcache.util.TMSKeyBuilder; | ||
|
|
||
| public class S3BlobStore implements BlobStore { | ||
|
|
@@ -83,8 +81,6 @@ public class S3BlobStore implements BlobStore { | |
|
|
||
| private String bucketName; | ||
|
|
||
| private volatile boolean shutDown; | ||
|
|
||
| private final S3Ops s3Ops; | ||
|
|
||
| private CannedAccessControlList acl; | ||
|
|
@@ -100,7 +96,7 @@ public S3BlobStore(S3BlobStoreInfo config, TileLayerDispatcher layers, LockProvi | |
| conn = validateClient(config.buildClient(), bucketName); | ||
| acl = config.getAccessControlList(); | ||
|
|
||
| this.s3Ops = new S3Ops(conn, bucketName, keyBuilder, lockProvider); | ||
| this.s3Ops = new S3Ops(conn, bucketName, keyBuilder, lockProvider, listeners); | ||
|
|
||
| boolean empty = !s3Ops.prefixExists(prefix); | ||
| boolean existing = Objects.nonNull(s3Ops.getObjectMetadata(keyBuilder.storeMetadata())); | ||
|
|
@@ -172,7 +168,6 @@ private void checkBucketPolicy(AmazonS3Client client, String bucketName) throws | |
|
|
||
| @Override | ||
| public void destroy() { | ||
| this.shutDown = true; | ||
| AmazonS3Client conn = this.conn; | ||
| this.conn = null; | ||
| if (conn != null) { | ||
|
|
@@ -279,80 +274,40 @@ public boolean get(TileObject obj) throws StorageException { | |
| return true; | ||
| } | ||
|
|
||
| private class TileToKey implements Function<long[], KeyVersion> { | ||
|
|
||
| private final String coordsPrefix; | ||
|
|
||
| private final String extension; | ||
|
|
||
| public TileToKey(String coordsPrefix, MimeType mimeType) { | ||
| this.coordsPrefix = coordsPrefix; | ||
| this.extension = mimeType.getInternalName(); | ||
| } | ||
|
|
||
| @Override | ||
| public KeyVersion apply(long[] loc) { | ||
| long z = loc[2]; | ||
| long x = loc[0]; | ||
| long y = loc[1]; | ||
| StringBuilder sb = new StringBuilder(coordsPrefix); | ||
| sb.append(z).append('/').append(x).append('/').append(y).append('.').append(extension); | ||
| return new KeyVersion(sb.toString()); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public boolean delete(final TileRange tileRange) throws StorageException { | ||
| checkNotNull(tileRange, "tile range must not be null"); | ||
| checkArgument(tileRange.getZoomStart() >= 0, "zoom start must be greater or equal than zero"); | ||
| checkArgument( | ||
| tileRange.getZoomStop() >= tileRange.getZoomStart(), | ||
| "zoom stop must be greater or equal than start zoom"); | ||
|
|
||
| final String coordsPrefix = keyBuilder.coordinatesPrefix(tileRange, true); | ||
| if (!s3Ops.prefixExists(coordsPrefix)) { | ||
| return false; | ||
| } | ||
|
|
||
| final Iterator<long[]> tileLocations = new AbstractIterator<>() { | ||
|
|
||
| // TileRange iterator with 1x1 meta tiling factor | ||
| private TileRangeIterator trIter = new TileRangeIterator(tileRange, new int[] {1, 1}); | ||
| // Create a prefix for each zoom level | ||
| long count = IntStream.range(tileRange.getZoomStart(), tileRange.getZoomStop() + 1) | ||
| .mapToObj(level -> scheduleDeleteForZoomLevel(tileRange, level)) | ||
| .filter(Objects::nonNull) | ||
| .count(); | ||
|
|
||
| @Override | ||
| protected long[] computeNext() { | ||
| long[] gridLoc = trIter.nextMetaGridLocation(new long[3]); | ||
| return gridLoc == null ? endOfData() : gridLoc; | ||
| } | ||
| }; | ||
|
|
||
| if (listeners.isEmpty()) { | ||
| // if there are no listeners, don't bother requesting every tile | ||
| // metadata to notify the listeners | ||
| Iterator<List<long[]>> partition = Iterators.partition(tileLocations, 1000); | ||
| final TileToKey tileToKey = new TileToKey(coordsPrefix, tileRange.getMimeType()); | ||
|
|
||
| while (partition.hasNext() && !shutDown) { | ||
| List<long[]> locations = partition.next(); | ||
| List<KeyVersion> keys = Lists.transform(locations, tileToKey); | ||
|
|
||
| DeleteObjectsRequest req = new DeleteObjectsRequest(bucketName); | ||
| req.setQuiet(true); | ||
| req.setKeys(keys); | ||
| conn.deleteObjects(req); | ||
| } | ||
| // Check all ranges where scheduled | ||
| return count == (tileRange.getZoomStop() - tileRange.getZoomStart() + 1); | ||
| } | ||
|
|
||
| } else { | ||
| long[] xyz; | ||
| String layerName = tileRange.getLayerName(); | ||
| String gridSetId = tileRange.getGridSetId(); | ||
| String format = tileRange.getMimeType().getFormat(); | ||
| Map<String, String> parameters = tileRange.getParameters(); | ||
|
|
||
| while (tileLocations.hasNext()) { | ||
| xyz = tileLocations.next(); | ||
| TileObject tile = TileObject.createQueryTileObject(layerName, xyz, gridSetId, format, parameters); | ||
| tile.setParametersId(tileRange.getParametersId()); | ||
| delete(tile); | ||
| } | ||
| private String scheduleDeleteForZoomLevel(TileRange tileRange, int level) { | ||
| String zoomPath = keyBuilder.forZoomLevel(tileRange, level); | ||
| Bounds bounds = new Bounds(tileRange.rangeBounds(level)); | ||
| String prefix = format("%s?%s", zoomPath, bounds); | ||
| try { | ||
| s3Ops.scheduleAsyncDelete(prefix); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This does the same as deleteByGridsetId, deleteByParametersId and delete(layerName). Looking at the changes in S3Ops, I have the impression now all asynch deletes are sending events for single tiles... if so, the listeners may end up recording a change "twice", and thus have disk quota go off synch.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Only delete tile with a bounded delete, because even the bounds are not entered by the user defaults are passed though they will be included in the prefix passed to BulkDelete. BulkDelete will only do the notifications to listeners when it is a bounded delete. |
||
| return prefix; | ||
| } catch (GeoWebCacheException e) { | ||
| log.warning("Cannot schedule delete for prefix " + prefix); | ||
| return null; | ||
| } | ||
|
|
||
| return true; | ||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -457,8 +412,7 @@ private Properties getLayerMetadata(String layerName) { | |
| } | ||
|
|
||
| private void putParametersMetadata(String layerName, String parametersId, Map<String, String> parameters) { | ||
| assert (isNull(parametersId) == isNull(parameters)); | ||
| if (isNull(parametersId)) { | ||
| if (isNull(parameters)) { | ||
| return; | ||
| } | ||
| Properties properties = new Properties(); | ||
|
|
@@ -519,4 +473,63 @@ public Map<String, Optional<Map<String, String>>> getParametersMapping(String la | |
| .map(props -> (Map<String, String>) (Map<?, ?>) props) | ||
| .collect(Collectors.toMap(ParametersUtils::getId, Optional::of)); | ||
| } | ||
|
|
||
| public static class Bounds { | ||
| private static final Pattern boundsRegex = | ||
| Pattern.compile("^(?<prefix>.*/)\\?bounds=(?<minx>\\d+),(?<miny>\\d+),(?<maxx>\\d+),(?<maxy>\\d+)$"); | ||
| private final long minX, minY, maxX, maxY; | ||
|
|
||
| public Bounds(long[] bound) { | ||
| minX = Math.min(bound[0], bound[2]); | ||
| minY = Math.min(bound[1], bound[3]); | ||
| maxX = Math.max(bound[0], bound[2]); | ||
| maxY = Math.max(bound[1], bound[3]); | ||
| } | ||
|
|
||
| public long getMinX() { | ||
| return minX; | ||
| } | ||
|
|
||
| public long getMaxX() { | ||
| return maxX; | ||
| } | ||
|
|
||
| static Optional<Bounds> createBounds(String prefix) { | ||
| Matcher matcher = boundsRegex.matcher(prefix); | ||
| if (!matcher.matches()) { | ||
| return Optional.empty(); | ||
| } | ||
|
|
||
| Bounds bounds = new Bounds(new long[] { | ||
| Long.parseLong(matcher.group("minx")), | ||
| Long.parseLong(matcher.group("miny")), | ||
| Long.parseLong(matcher.group("maxx")), | ||
| Long.parseLong(matcher.group("maxy")) | ||
| }); | ||
| return Optional.of(bounds); | ||
| } | ||
|
|
||
| static String prefixWithoutBounds(String prefix) { | ||
| Matcher matcher = boundsRegex.matcher(prefix); | ||
| if (matcher.matches()) { | ||
| return matcher.group("prefix"); | ||
| } | ||
| return prefix; | ||
| } | ||
|
|
||
| @Override | ||
| public String toString() { | ||
| return format("bounds=%d,%d,%d,%d", minX, minY, maxX, maxY); | ||
| } | ||
|
|
||
| public boolean predicate(S3ObjectSummary s3ObjectSummary) { | ||
| var matcher = TileDeletionListenerNotifier.keyRegex.matcher(s3ObjectSummary.getKey()); | ||
| if (!matcher.matches()) { | ||
| return false; | ||
| } | ||
| long x = Long.parseLong(matcher.group(TileDeletionListenerNotifier.X_GROUP_POS)); | ||
| long y = Long.parseLong(matcher.group(TileDeletionListenerNotifier.Y_GROUP_POS)); | ||
| return x >= minX && x <= maxX && y >= minY && y <= maxY; | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is going to execute a delete for all tiles in a given zoom level... which is an improvement, if the tile range does not have rangeBounds, of if the rangeBounds did cover the whole gridset area.
But if someone set up the job to remove a specific area (e.g., a city of interest) then the current code would delete everything instead.
To expedite this, I would suggest the following:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added code for bounded deletes. Simplest version applied
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added BoundedS3KeySupplier that reduces the S3ObjectSummaries inspected by scanning the x axis between the bounds.