From a8f3bae1b7055aa2e18c208e88d53144d0249dbd Mon Sep 17 00:00:00 2001
From: Dalibor Novak
Date: Fri, 21 Jul 2017 18:27:01 +0100
Subject: [PATCH] Update bucket lifecycle check

The `getPrefix` method on `Rule` [got deprecated](https://github.com/aws/aws-sdk-java/blob/355424771b951ef0066b19c3eab4b4356e270cf4/aws-java-sdk-s3/src/main/java/com/amazonaws/services/s3/model/BucketLifecycleConfiguration.java#L145-L153).
It seems that the response on the wire was also changed, so this method no
longer returns the prefix even on older versions of the AWS SDK (such as the
one used by this project).

I've bumped the AWS SDK dependency version and implemented the check using
the new visitor pattern. I am not sure it is the nicest Scala code, but I
think it works. Tests still pass.

I believe this fixes #346.
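To make the failure concrete: a rule that now comes back from S3 with a
`<Filter>` element carries its prefix only inside the filter predicate, and
the deprecated field stays unset. A minimal sketch (the rule id, prefix, and
expiration are made-up values; paste into a Scala REPL with aws-java-sdk-s3
1.11.166 on the classpath):

```scala
import com.amazonaws.services.s3.model.BucketLifecycleConfiguration
import com.amazonaws.services.s3.model.lifecycle.{LifecycleFilter, LifecyclePrefixPredicate}

// Roughly the shape of a prefix-scoped rule as the new SDK represents it.
val rule = new BucketLifecycleConfiguration.Rule()
  .withId("expire-temp") // made-up rule id
  .withFilter(new LifecycleFilter(new LifecyclePrefixPredicate("temp/")))
  .withStatus(BucketLifecycleConfiguration.ENABLED)
  .withExpirationInDays(1)

rule.getPrefix              // null: the deprecated field is not populated
rule.getFilter.getPredicate // the LifecyclePrefixPredicate carrying "temp/"
```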
---
 project/SparkRedshiftBuild.scala              |  2 +-
 .../com/databricks/spark/redshift/Utils.scala | 26 +++++++++++++++++++++-
 2 files changed, 26 insertions(+), 2 deletions(-)

diff --git a/project/SparkRedshiftBuild.scala b/project/SparkRedshiftBuild.scala
index 1a5301f9..9f4ce429 100644
--- a/project/SparkRedshiftBuild.scala
+++ b/project/SparkRedshiftBuild.scala
@@ -51,7 +51,7 @@ object SparkRedshiftBuild extends Build {
     testSparkVersion := sys.props.get("spark.testVersion").getOrElse(sparkVersion.value),
     testSparkAvroVersion := sys.props.get("sparkAvro.testVersion").getOrElse("3.0.0"),
     testHadoopVersion := sys.props.get("hadoop.testVersion").getOrElse("2.2.0"),
-    testAWSJavaSDKVersion := sys.props.get("aws.testVersion").getOrElse("1.10.22"),
+    testAWSJavaSDKVersion := sys.props.get("aws.testVersion").getOrElse("1.11.166"),
     spName := "databricks/spark-redshift",
     sparkComponents ++= Seq("sql", "hive"),
     spIgnoreProvided := true,
diff --git a/src/main/scala/com/databricks/spark/redshift/Utils.scala b/src/main/scala/com/databricks/spark/redshift/Utils.scala
index 82c48c3a..5fabb0d1 100644
--- a/src/main/scala/com/databricks/spark/redshift/Utils.scala
+++ b/src/main/scala/com/databricks/spark/redshift/Utils.scala
@@ -24,6 +24,7 @@ import scala.util.control.NonFatal
 
 import com.amazonaws.services.s3.{AmazonS3URI, AmazonS3Client}
 import com.amazonaws.services.s3.model.BucketLifecycleConfiguration
+import com.amazonaws.services.s3.model.lifecycle.{LifecycleAndOperator, LifecyclePredicateVisitor, LifecyclePrefixPredicate, LifecycleTagPredicate}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.FileSystem
 import org.slf4j.LoggerFactory
@@ -133,11 +134,17 @@ private[redshift] object Utils {
       val rules = Option(s3Client.getBucketLifecycleConfiguration(bucket))
         .map(_.getRules.asScala)
         .getOrElse(Seq.empty)
       rules.exists { rule =>
         // Note: this only checks that there is an active rule which matches the temp directory;
         // it does not actually check that the rule will delete the files. This check is still
         // better than nothing, though, and we can always improve it later.
-        rule.getStatus == BucketLifecycleConfiguration.ENABLED && key.startsWith(rule.getPrefix)
+        // Use a fresh visitor per rule so that a match recorded for one rule cannot
+        // leak into the next one, and skip rules that carry no filter at all.
+        val keyPrefixMatchingVisitor = new KeyPrefixMatchingVisitor(key)
+        Option(rule.getFilter).flatMap(f => Option(f.getPredicate))
+          .foreach(_.accept(keyPrefixMatchingVisitor))
+        rule.getStatus == BucketLifecycleConfiguration.ENABLED &&
+          keyPrefixMatchingVisitor.matchFound
       }
     }
     if (!hasMatchingBucketLifecycleRule) {
@@ -205,3 +212,20 @@ private[redshift] object Utils {
     }
   }
 }
+
+private class KeyPrefixMatchingVisitor(key: String) extends LifecyclePredicateVisitor {
+  var matchFound = false
+
+  override def visit(lifecyclePrefixPredicate: LifecyclePrefixPredicate): Unit = {
+    if (!matchFound && key.startsWith(lifecyclePrefixPredicate.getPrefix)) {
+      matchFound = true
+    }
+  }
+
+  override def visit(lifecycleTagPredicate: LifecycleTagPredicate): Unit = {}
+
+  // A filter may combine a prefix with tags via an And operator; descend into it.
+  override def visit(lifecycleAndOperator: LifecycleAndOperator): Unit = {
+    lifecycleAndOperator.getOperands.asScala.foreach(_.accept(this))
+  }
+}
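
PS: a quick way to exercise the visitor end to end, including the
`LifecycleAndOperator` case (the prefix, tag, and key below are made-up
values, and since `KeyPrefixMatchingVisitor` is package-private this assumes
the snippet runs from `com.databricks.spark.redshift`):

```scala
import scala.collection.JavaConverters._

import com.amazonaws.services.s3.model.{BucketLifecycleConfiguration, Tag}
import com.amazonaws.services.s3.model.lifecycle.{LifecycleAndOperator, LifecycleFilter, LifecycleFilterPredicate, LifecyclePrefixPredicate, LifecycleTagPredicate}

// A rule whose filter nests the prefix inside an And operator (prefix AND tag).
val operands: java.util.List[LifecycleFilterPredicate] = List[LifecycleFilterPredicate](
  new LifecyclePrefixPredicate("temp/"),
  new LifecycleTagPredicate(new Tag("purpose", "staging"))
).asJava
val rule = new BucketLifecycleConfiguration.Rule()
  .withFilter(new LifecycleFilter(new LifecycleAndOperator(operands)))
  .withStatus(BucketLifecycleConfiguration.ENABLED)

val visitor = new KeyPrefixMatchingVisitor("temp/spark-redshift/part-00000")
rule.getFilter.getPredicate.accept(visitor)
assert(visitor.matchFound) // true: the visitor descends into the And operator
```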