diff --git a/README.md b/README.md
index 794e5c4f..23c10b51 100644
--- a/README.md
+++ b/README.md
@@ -402,16 +402,35 @@ The following describes how each connection can be authenticated:
[_JDBC Driver Configuration Options_](http://docs.aws.amazon.com/redshift/latest/mgmt/configure-jdbc-options.html) to add the appropriate SSL options
to the JDBC `url` used with this library.
-- **Encrypting `UNLOAD` data stored in S3 (data stored when reading from Redshift)**: According to the Redshift documentation
- on [_Unloading Data to S3_](http://docs.aws.amazon.com/redshift/latest/dg/t_Unloading_tables.html),
- "UNLOAD automatically encrypts data files using Amazon S3 server-side encryption (SSE-S3)."
+- **S3 Client Side Encryption (CSE)**: Redshift supports S3 CSE with a custom master symmetric key
+  (see: [_S3 CSE on EMRFS_](http://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-4.5.0//emr-cluster-configuration-object-encryption.html),
+  [_S3 CSE on Databricks FS_](https://docs.databricks.com/spark/latest/data-sources/amazon-s3.html?highlight=encryption#client-side-s3-encryption)).
+  This library provides `RedshiftEncryptionMaterialsProvider`, which supplies the master symmetric key to the underlying filesystem (EMRFS or Databricks FS). Users can turn on S3 CSE by:
+  1. Setting `fs.s3.cse.enabled` to `true` at the start of the job.
+  2. Setting `fs.s3.cse.encryptionMaterialsProvider` to `com.databricks.spark.redshift.RedshiftEncryptionMaterialsProvider` at the start of the job.
+  3. Setting `spark-redshift.master-sym-key` to the master symmetric key to be used by Redshift at the start of the job.
+  4. Setting the `encryption` option to `true` in the parameters.
+
+  **NOTE: Once configured, the cluster will keep using the master symmetric key to read and write any object in S3. A new job must be started to change how encryption is handled on the cluster.**
+
+ For example, in Scala add:
+ ```scala
+  sc.hadoopConfiguration.set("fs.s3.cse.enabled", "true")
+  sc.hadoopConfiguration.set("fs.s3.cse.encryptionMaterialsProvider", "com.databricks.spark.redshift.RedshiftEncryptionMaterialsProvider")
+  sc.hadoopConfiguration.set("spark-redshift.master-sym-key", "YOUR_MASTER_SYMMETRIC_KEY")
+  ```
+
+- **Encrypting `UNLOAD` data stored in S3 (data stored when reading from Redshift)**:
- Redshift also supports client-side encryption with a custom key
- (see: [_Unloading Encrypted Data Files_](http://docs.aws.amazon.com/redshift/latest/dg/t_unloading_encrypted_files.html))
- but this library currently lacks the capability to specify the required symmetric key.
+  * Using S3 server-side encryption (S3 SSE): According to the Redshift documentation
+    on [_Unloading Data to S3_](http://docs.aws.amazon.com/redshift/latest/dg/t_Unloading_tables.html),
+    "UNLOAD automatically encrypts data files using Amazon S3 server-side encryption (SSE-S3)."
+
+  * Using S3 client-side encryption (S3 CSE): Once CSE is configured using the steps in the previous section,
+    `UNLOAD` automatically encrypts data files using Amazon S3 CSE.
- **Encrypting `COPY` data stored in S3 (data stored when writing to Redshift)**:
- According to the Redshift documentation on
+  * Using S3 server-side encryption (S3 SSE): According to the Redshift documentation on
[_Loading Encrypted Data Files from Amazon S3_](http://docs.aws.amazon.com/redshift/latest/dg/c_loading-encrypted-files.html):
> You can use the COPY command to load data files that were uploaded to Amazon S3 using
@@ -423,6 +442,62 @@ The following describes how each connection can be authenticated:
are using `s3a`, `s3n`, EMRFS, etc.).
Note that the `MANIFEST` file (a list of all files written) will not be encrypted.
+  * Using S3 client-side encryption (S3 CSE): Once CSE is configured using the steps in the previous section,
+    `COPY` automatically encrypts data files using Amazon S3 CSE.
+
+The following example shows how to read and write data using S3 CSE:
+```scala
+import org.apache.spark.sql._
+
+val sc = // existing SparkContext
+val sqlContext = new SQLContext(sc)
+
+// Enable encryption by setting required hadoop configuration
+sc.hadoopConfiguration.set("fs.s3.cse.enabled", "true")
+sc.hadoopConfiguration.set("fs.s3.cse.encryptionMaterialsProvider", "com.databricks.spark.redshift.RedshiftEncryptionMaterialsProvider")
+sc.hadoopConfiguration.set("spark-redshift.master-sym-key", "YOUR_MASTER_SYMMETRIC_KEY")
+
+// Get some data from a Redshift table
+val df: DataFrame = sqlContext.read
+ .format("com.databricks.spark.redshift")
+ .option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass")
+ .option("dbtable", "my_table")
+ .option("tempdir", "s3n://path/for/temp/data")
+ .option("encryption", "true")
+ .load()
+
+// Can also load data from a Redshift query
+val df: DataFrame = sqlContext.read
+ .format("com.databricks.spark.redshift")
+ .option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass")
+ .option("query", "select x, count(*) my_table group by x")
+ .option("tempdir", "s3n://path/for/temp/data")
+ .option("encryption", "true")
+ .load()
+
+// Apply some transformations to the data as per normal, then you can use the
+// Data Source API to write the data back to another table
+
+df.write
+ .format("com.databricks.spark.redshift")
+ .option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass")
+ .option("dbtable", "my_table_copy")
+ .option("tempdir", "s3n://path/for/temp/data")
+ .option("encryption", "true")
+ .mode("error")
+ .save()
+
+// Using IAM Role based authentication
+df.write
+ .format("com.databricks.spark.redshift")
+ .option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass")
+ .option("dbtable", "my_table_copy")
+ .option("aws_iam_role", "arn:aws:iam::123456789000:role/redshift_iam_role")
+ .option("tempdir", "s3n://path/for/temp/data")
+ .option("encryption", "true")
+ .mode("error")
+ .save()
+```
### Parameters
@@ -513,6 +588,19 @@ need to be configured to allow access from your driver application.
No default |
AWS session token corresponding to provided access key. |
+
+ | encryption |
+ No |
+ false |
+
+
+  Toggles Amazon S3 Client Side Encryption (CSE). When set to true, the data staged in S3 will be
+  encrypted using CSE.
+  When using CSE, the following must also be configured (see the S3 Client Side Encryption section above):
+  1. The master symmetric key must be set in the Hadoop configuration via the "spark-redshift.master-sym-key" property.
+  2. The encryption materials provider must be set to "com.databricks.spark.redshift.RedshiftEncryptionMaterialsProvider" via the "fs.s3.cse.encryptionMaterialsProvider" property.
+
+ |
| tempdir |
Yes |
diff --git a/src/main/scala/com/databricks/spark/redshift/AWSCredentialsUtils.scala b/src/main/scala/com/databricks/spark/redshift/AWSCredentialsUtils.scala
index 47ad0b06..65985bfc 100644
--- a/src/main/scala/com/databricks/spark/redshift/AWSCredentialsUtils.scala
+++ b/src/main/scala/com/databricks/spark/redshift/AWSCredentialsUtils.scala
@@ -28,8 +28,26 @@ private[redshift] object AWSCredentialsUtils {
/**
* Generates a credentials string for use in Redshift COPY and UNLOAD statements.
* Favors a configured `aws_iam_role` if available in the parameters.
+   * Adds the master symmetric key to the credentials string if S3 CSE is enabled.
+   * NOTE: This method deals with encryption because Redshift's CREDENTIALS option
+   * considers the master symmetric key part of the credentials string.
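+   * For example: "aws_access_key_id=<id>;aws_secret_access_key=<key>;master_symmetric_key=<master-key>".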
*/
def getRedshiftCredentialsString(
+ params: MergedParameters,
+ sparkAwsCredentials: AWSCredentials,
+ hadoopConfiguration: Configuration
+ ): String = {
+
+ getRedshiftAWSCredentialsSubString(params, sparkAwsCredentials) +
+ getRedshiftEncryptionSubString(params, hadoopConfiguration)
+ }
+
+
+ /**
+ * Generates a credentials string for use in Redshift COPY and UNLOAD statements.
+ * Favors a configured `aws_iam_role` if available in the parameters.
+ */
+ private def getRedshiftAWSCredentialsSubString(
params: MergedParameters,
sparkAwsCredentials: AWSCredentials): String = {
@@ -43,6 +61,7 @@ private[redshift] object AWSCredentialsUtils {
s"aws_secret_access_key=${creds.getAWSSecretKey}"
}
}
+
if (params.iamRole.isDefined) {
s"aws_iam_role=${params.iamRole.get}"
} else if (params.temporaryAWSCredentials.isDefined) {
@@ -106,4 +125,17 @@ private[redshift] object AWSCredentialsUtils {
throw new IllegalArgumentException(s"Unrecognized scheme $other; expected s3, s3n, or s3a")
}
}
+
+ /**
+   * Generates the encryption string to be appended to the Redshift credentials string.
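+   * Returns ";master_symmetric_key=<key>" when encryption is enabled, or an empty string otherwise.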
+ */
+ private def getRedshiftEncryptionSubString(
+ params: MergedParameters,
+ hadoopConfiguration: Configuration): String = {
+ if (params.encryption) {
+ s";master_symmetric_key=${hadoopConfiguration.get("spark-redshift.master-sym-key")}"
+ } else {
+ ""
+ }
+ }
}
diff --git a/src/main/scala/com/databricks/spark/redshift/Parameters.scala b/src/main/scala/com/databricks/spark/redshift/Parameters.scala
index 875f5b75..f0d5ddf4 100644
--- a/src/main/scala/com/databricks/spark/redshift/Parameters.scala
+++ b/src/main/scala/com/databricks/spark/redshift/Parameters.scala
@@ -35,6 +35,7 @@ private[redshift] object Parameters {
"tempformat" -> "AVRO",
"csvnullstring" -> "@NULL@",
"overwrite" -> "false",
+ "encryption" -> "false",
"diststyle" -> "EVEN",
"usestagingtable" -> "true",
"preactions" -> ";",
@@ -159,6 +160,13 @@ private[redshift] object Parameters {
) yield (user, password)
}
+ /**
+   * S3 Client Side Encryption will be enabled if the encryption option is set
+   * to true. The master symmetric key is taken from the Hadoop configuration
+   * property spark-redshift.master-sym-key.
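+   * The option is enabled by passing .option("encryption", "true") to the DataFrame reader/writer.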
+ */
+ def encryption: Boolean = parameters("encryption").toBoolean
+
/**
* A JDBC URL, of the format:
*
diff --git a/src/main/scala/com/databricks/spark/redshift/RedshiftEncryptionMaterialsProvider.scala b/src/main/scala/com/databricks/spark/redshift/RedshiftEncryptionMaterialsProvider.scala
new file mode 100644
index 00000000..7f1bc060
--- /dev/null
+++ b/src/main/scala/com/databricks/spark/redshift/RedshiftEncryptionMaterialsProvider.scala
@@ -0,0 +1,67 @@
+package com.databricks.spark.redshift
+
+import java.util
+import javax.crypto.SecretKey
+import javax.crypto.spec.SecretKeySpec
+
+import com.amazonaws.services.s3.model.{EncryptionMaterials, EncryptionMaterialsProvider}
+import com.amazonaws.util.Base64
+import org.apache.hadoop.conf.{Configurable, Configuration}
+
+
+/**
+ * This class provides encryption materials to be used by the spark-redshift
+ * library. It reads the master symmetric key from the Hadoop configuration,
+ * converts it into EncryptionMaterials, and returns it.
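+ *
+ * To use it, set the "fs.s3.cse.encryptionMaterialsProvider" Hadoop property to this class's
+ * fully qualified name and supply the master symmetric key via the
+ * "spark-redshift.master-sym-key" property.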
+ */
+object RedshiftEncryptionMaterialsProvider {
+ /**
+ * Encryption algorithm used for by Redshift.
+ */
+ private val ENCRYPTION_ALGORITHM: String = "AES"
+ /**
+   * This is the Hadoop configuration property name that contains
+   * the Redshift master symmetric key.
+ */
+ private val SPARK_REDSHIFT_MASTER_SYM_KEY: String = "spark-redshift.master-sym-key"
+}
+
+class RedshiftEncryptionMaterialsProvider
+ extends EncryptionMaterialsProvider
+ with Configurable {
+ /**
+   * The Hadoop configuration.
+ */
+ private var conf: Configuration = null
+
+  override def setConf(conf: Configuration): Unit = {
+    this.conf = conf
+  }
+
+  override def getConf: Configuration = this.conf
+
+  override def refresh(): Unit = {}
+
+  def getEncryptionMaterials(materialsDescription: Map[String, String]): EncryptionMaterials =
+    getEncryptionMaterials
+
+ override def getEncryptionMaterials(
+ materialsDescription: util.Map[String, String]):
+ EncryptionMaterials = getEncryptionMaterials
+
+ override def getEncryptionMaterials: EncryptionMaterials = {
+    val masterKey: SecretKey =
+      new SecretKeySpec(
+        Base64.decode(
+          conf.get(RedshiftEncryptionMaterialsProvider.SPARK_REDSHIFT_MASTER_SYM_KEY)),
+        RedshiftEncryptionMaterialsProvider.ENCRYPTION_ALGORITHM)
+    new EncryptionMaterials(masterKey)
+ }
+}
diff --git a/src/main/scala/com/databricks/spark/redshift/RedshiftRelation.scala b/src/main/scala/com/databricks/spark/redshift/RedshiftRelation.scala
index 31dc11b2..e01277f3 100644
--- a/src/main/scala/com/databricks/spark/redshift/RedshiftRelation.scala
+++ b/src/main/scala/com/databricks/spark/redshift/RedshiftRelation.scala
@@ -181,22 +181,31 @@ private[redshift] case class RedshiftRelation(
// Always quote column names:
val columnList = requiredColumns.map(col => s""""$col"""").mkString(", ")
val whereClause = FilterPushdown.buildWhereClause(schema, filters)
+ val hadoopConfiguration = sqlContext.sparkContext.hadoopConfiguration
val credsString: String =
- AWSCredentialsUtils.getRedshiftCredentialsString(params, creds.getCredentials)
+ AWSCredentialsUtils.getRedshiftCredentialsString(
+ params, creds.getCredentials, hadoopConfiguration)
val query = {
// Since the query passed to UNLOAD will be enclosed in single quotes, we need to escape
// any backslashes and single quotes that appear in the query itself
- val escapedTableNameOrSubqury = tableNameOrSubquery.replace("\\", "\\\\").replace("'", "\\'")
- s"SELECT $columnList FROM $escapedTableNameOrSubqury $whereClause"
+ val escapedTableNameOrSubquery = tableNameOrSubquery.replace("\\", "\\\\").replace("'", "\\'")
+ s"SELECT $columnList FROM $escapedTableNameOrSubquery $whereClause"
}
// We need to remove S3 credentials from the unload path URI because they will conflict with
// the credentials passed via `credsString`.
val fixedUrl = Utils.fixS3Url(Utils.removeCredentialsFromURI(new URI(tempDir)).toString)
-
- s"UNLOAD ('$query') TO '$fixedUrl' WITH CREDENTIALS '$credsString' ESCAPE MANIFEST"
+    val encryptOptStr: String = Utils.getEncryptOptStr(params, hadoopConfiguration)
+
+ s"UNLOAD ('$query') " +
+ s"TO '$fixedUrl' " +
+ s"WITH CREDENTIALS '$credsString' " +
+ s"ESCAPE " +
+ s"MANIFEST " +
+ s"$encryptOptStr"
}
- private def pruneSchema(schema: StructType, columns: Array[String]): StructType = {
+ private def pruneSchema(schema: StructType, columns: Array[String]): StructType = {
val fieldMap = Map(schema.fields.map(x => x.name -> x): _*)
new StructType(columns.map(name => fieldMap(name)))
}
diff --git a/src/main/scala/com/databricks/spark/redshift/RedshiftWriter.scala b/src/main/scala/com/databricks/spark/redshift/RedshiftWriter.scala
index 8383231d..d62be9b6 100644
--- a/src/main/scala/com/databricks/spark/redshift/RedshiftWriter.scala
+++ b/src/main/scala/com/databricks/spark/redshift/RedshiftWriter.scala
@@ -19,17 +19,17 @@ package com.databricks.spark.redshift
import java.net.URI
import java.sql.{Connection, Date, SQLException, Timestamp}
-import com.amazonaws.auth.AWSCredentialsProvider
-import com.amazonaws.services.s3.AmazonS3Client
+import com.amazonaws.auth.{AWSCredentials, AWSCredentialsProvider}
+import com.amazonaws.services.s3.model.PutObjectRequest
+import com.amazonaws.services.s3.{AmazonS3Client, AmazonS3URI}
import org.apache.hadoop.fs.{FileSystem, Path}
-
import org.apache.spark.TaskContext
import org.slf4j.LoggerFactory
+
import scala.collection.mutable
import scala.util.control.NonFatal
-
import com.databricks.spark.redshift.Parameters.MergedParameters
-
+import org.apache.commons.io.IOUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SQLContext, SaveMode}
import org.apache.spark.sql.types._
@@ -92,14 +92,19 @@ private[redshift] class RedshiftWriter(
creds: AWSCredentialsProvider,
manifestUrl: String): String = {
val credsString: String =
- AWSCredentialsUtils.getRedshiftCredentialsString(params, creds.getCredentials)
+ AWSCredentialsUtils.getRedshiftCredentialsString(
+ params, creds.getCredentials, sqlContext.sparkContext.hadoopConfiguration)
val fixedUrl = Utils.fixS3Url(manifestUrl)
+
+ val encryptOptString: String =
+ Utils.getEncryptOptStr(params, sqlContext.sparkContext.hadoopConfiguration)
+
val format = params.tempFormat match {
case "AVRO" => "AVRO 'auto'"
case csv if csv == "CSV" || csv == "CSV GZIP" => csv + s" NULL AS '${params.nullString}'"
}
s"COPY ${params.table.get} FROM '$fixedUrl' CREDENTIALS '$credsString' FORMAT AS " +
- s"${format} manifest ${params.extraCopyOptions}"
+ s"${format} manifest ${params.extraCopyOptions} $encryptOptString"
}
/**
@@ -209,6 +214,7 @@ private[redshift] class RedshiftWriter(
private def unloadData(
sqlContext: SQLContext,
data: DataFrame,
+ creds: AWSCredentialsProvider,
tempDir: String,
tempFormat: String,
nullString: String): Option[String] = {
@@ -319,16 +325,30 @@ private[redshift] class RedshiftWriter(
}
val manifest = s"""{"entries": [${manifestEntries.mkString(",\n")}]}"""
val manifestPath = sanitizedTempDir + "/manifest.json"
- val fsDataOut = fs.create(new Path(manifestPath))
- try {
- fsDataOut.write(manifest.getBytes("utf-8"))
- } finally {
- fsDataOut.close()
- }
+
+ writeRedshiftManifestUnencrypted(creds, manifest, manifestPath)
Some(manifestPath)
}
}
+ /**
+   * Writes the Redshift manifest file to S3 without encryption.
+ */
+ private def writeRedshiftManifestUnencrypted(
+ creds: AWSCredentialsProvider,
+ manifest: String,
+      manifestPath: String): Unit = {
+    /* When S3 CSE is configured, every object written through the filesystem is encrypted on
+     * the client side. Redshift expects the manifest file to be unencrypted, so we instantiate
+     * a separate, unencrypted AmazonS3Client to write the manifest to S3.
+     */
+ val s3Client = s3ClientFactory(creds)
+ val s3URI = new AmazonS3URI(manifestPath)
+    val putObjectRequest: PutObjectRequest = new PutObjectRequest(
+      s3URI.getBucket, s3URI.getKey, IOUtils.toInputStream(manifest, "utf-8"), null)
+ s3Client.putObject(putObjectRequest)
+ }
+
/**
* Write a DataFrame to a Redshift table, using S3 and Avro or CSV serialization
*/
@@ -392,6 +412,7 @@ private[redshift] class RedshiftWriter(
val manifestUrl = unloadData(
sqlContext,
data,
+ creds,
tempDir = params.createPerQueryTempDir(),
tempFormat = params.tempFormat,
nullString = params.nullString)
diff --git a/src/main/scala/com/databricks/spark/redshift/Utils.scala b/src/main/scala/com/databricks/spark/redshift/Utils.scala
index 82c48c3a..65f9a3d7 100644
--- a/src/main/scala/com/databricks/spark/redshift/Utils.scala
+++ b/src/main/scala/com/databricks/spark/redshift/Utils.scala
@@ -19,11 +19,13 @@ package com.databricks.spark.redshift
import java.net.URI
import java.util.UUID
import scala.collection.JavaConverters._
import scala.util.control.NonFatal
-
-import com.amazonaws.services.s3.{AmazonS3URI, AmazonS3Client}
+import com.amazonaws.services.s3.{AmazonS3Client, AmazonS3URI}
import com.amazonaws.services.s3.model.BucketLifecycleConfiguration
+import com.databricks.spark.redshift.Parameters.MergedParameters
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.slf4j.LoggerFactory
@@ -204,4 +206,46 @@ private[redshift] object Utils {
case _ => None
}
}
+
+ /**
+   * Validates the Hadoop configuration against the encryption option and returns the string
+   * ("ENCRYPTED" or an empty string) to be appended to UNLOAD/COPY statements.
+ */
+ def getEncryptOptStr(params: MergedParameters, hadoopConfig: Configuration): String = {
+ val sparkRedshiftMasterSymKey = hadoopConfig.get("spark-redshift.master-sym-key")
+ val encryptionMaterialsProvider =
+ hadoopConfig.get("fs.s3.cse.encryptionMaterialsProvider")
+ val enableS3CSEEncryption = hadoopConfig.get("fs.s3.cse.enabled")
+
+ throwExceptionIfConfigurationIsInvalid(
+ params, sparkRedshiftMasterSymKey, encryptionMaterialsProvider, enableS3CSEEncryption)
+
+ if (params.encryption) {
+ "ENCRYPTED"
+ } else {
+ ""
+ }
+ }
+
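+  /**
+   * Throws an IllegalArgumentException if the encryption option is enabled but any of the
+   * required Hadoop configuration properties is missing, or if a master symmetric key is set
+   * while the encryption option is disabled.
+   */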
+ def throwExceptionIfConfigurationIsInvalid(
+ params: MergedParameters,
+ sparkRedshiftMasterSymKey: String,
+ encryptionMaterialsProvider: String,
+      enableS3CSEEncryption: String): Unit =
+ if (params.encryption &&
+ (sparkRedshiftMasterSymKey == null
+ || encryptionMaterialsProvider == null
+ || enableS3CSEEncryption == null
+ )
+ ) {
+      throw new IllegalArgumentException(
+        "The spark-redshift.master-sym-key, fs.s3.cse.encryptionMaterialsProvider and " +
+        "fs.s3.cse.enabled Hadoop configuration properties must be set immediately after " +
+        "Spark job startup if the encryption option is enabled")
+ } else if (!params.encryption && sparkRedshiftMasterSymKey != null) {
+      throw new IllegalArgumentException(
+        "The encryption option must be enabled if the " +
+        "spark-redshift.master-sym-key Hadoop configuration property is set")
+ }
}
diff --git a/src/test/scala/com/databricks/spark/redshift/AWSCredentialsUtilsSuite.scala b/src/test/scala/com/databricks/spark/redshift/AWSCredentialsUtilsSuite.scala
index 0315d3a1..b692dd9f 100644
--- a/src/test/scala/com/databricks/spark/redshift/AWSCredentialsUtilsSuite.scala
+++ b/src/test/scala/com/databricks/spark/redshift/AWSCredentialsUtilsSuite.scala
@@ -38,30 +38,75 @@ class AWSCredentialsUtilsSuite extends FunSuite {
}
test("credentialsString with regular keys") {
+ val hadoopConfiguration: Configuration = new Configuration()
val creds = new BasicAWSCredentials("ACCESSKEYID", "SECRET/KEY/WITH/SLASHES")
val params =
Parameters.mergeParameters(baseParams ++ Map("forward_spark_s3_credentials" -> "true"))
- assert(AWSCredentialsUtils.getRedshiftCredentialsString(params, creds) ===
+ assert(AWSCredentialsUtils.getRedshiftCredentialsString(params, creds, hadoopConfiguration) ===
"aws_access_key_id=ACCESSKEYID;aws_secret_access_key=SECRET/KEY/WITH/SLASHES")
}
+ test("credentialsString with regular keys and encryption") {
+ val hadoopConfiguration: Configuration = new Configuration()
+ hadoopConfiguration.set("spark-redshift.master-sym-key", "test-master-sym-key")
+ val creds = new BasicAWSCredentials("ACCESSKEYID", "SECRET/KEY/WITH/SLASHES")
+ val params =
+ Parameters.mergeParameters(
+ baseParams ++ Map("forward_spark_s3_credentials" -> "true", "encryption" -> "true")
+ )
+ assert(AWSCredentialsUtils.getRedshiftCredentialsString(params, creds, hadoopConfiguration) ===
+ "aws_access_key_id=ACCESSKEYID;" +
+ "aws_secret_access_key=SECRET/KEY/WITH/SLASHES;" +
+ "master_symmetric_key=test-master-sym-key")
+ }
+
test("credentialsString with STS temporary keys") {
+ val hadoopConfiguration: Configuration = new Configuration()
val params = Parameters.mergeParameters(baseParams ++ Map(
"temporary_aws_access_key_id" -> "ACCESSKEYID",
"temporary_aws_secret_access_key" -> "SECRET/KEY",
"temporary_aws_session_token" -> "SESSION/Token"))
- assert(AWSCredentialsUtils.getRedshiftCredentialsString(params, null) ===
+ assert(AWSCredentialsUtils.getRedshiftCredentialsString(params, null, hadoopConfiguration) ===
"aws_access_key_id=ACCESSKEYID;aws_secret_access_key=SECRET/KEY;token=SESSION/Token")
}
+ test("credentialsString with STS temporary keys and encryption") {
+ val hadoopConfiguration: Configuration = new Configuration()
+ hadoopConfiguration.set("spark-redshift.master-sym-key", "test-master-sym-key")
+ val params = Parameters.mergeParameters(baseParams ++ Map(
+ "temporary_aws_access_key_id" -> "ACCESSKEYID",
+ "temporary_aws_secret_access_key" -> "SECRET/KEY",
+ "temporary_aws_session_token" -> "SESSION/Token",
+ "encryption" -> "true"))
+ assert(AWSCredentialsUtils.getRedshiftCredentialsString(params, null, hadoopConfiguration) ===
+ "aws_access_key_id=ACCESSKEYID;" +
+ "aws_secret_access_key=SECRET/KEY;" +
+ "token=SESSION/Token;" +
+ "master_symmetric_key=test-master-sym-key")
+ }
+
test("Configured IAM roles should take precedence") {
+ val hadoopConfiguration: Configuration = new Configuration()
val creds = new BasicSessionCredentials("ACCESSKEYID", "SECRET/KEY", "SESSION/Token")
val iamRole = "arn:aws:iam::123456789000:role/redshift_iam_role"
val params = Parameters.mergeParameters(baseParams ++ Map("aws_iam_role" -> iamRole))
- assert(AWSCredentialsUtils.getRedshiftCredentialsString(params, null) ===
+ assert(AWSCredentialsUtils.getRedshiftCredentialsString(params, null, hadoopConfiguration) ===
s"aws_iam_role=$iamRole")
}
+ test("Configured IAM roles should take precedence with encryption") {
+ val hadoopConfiguration: Configuration = new Configuration()
+ hadoopConfiguration.set("spark-redshift.master-sym-key", "test-master-sym-key")
+ val creds = new BasicSessionCredentials("ACCESSKEYID", "SECRET/KEY", "SESSION/Token")
+ val iamRole = "arn:aws:iam::123456789000:role/redshift_iam_role"
+ val params = Parameters.mergeParameters(baseParams ++ Map(
+ "aws_iam_role" -> iamRole,
+ "encryption" -> "true"
+ ))
+ assert(AWSCredentialsUtils.getRedshiftCredentialsString(params, null, hadoopConfiguration) ===
+ s"aws_iam_role=$iamRole;master_symmetric_key=test-master-sym-key")
+ }
+
test("AWSCredentials.load() STS temporary keys should take precedence") {
val conf = new Configuration(false)
conf.set("fs.s3.awsAccessKeyId", "CONFID")