diff --git a/README.md b/README.md
index 794e5c4f..23c10b51 100644
--- a/README.md
+++ b/README.md
@@ -402,16 +402,35 @@ The following describes how each connection can be authenticated:
   [_JDBC Driver Configuration Options_](http://docs.aws.amazon.com/redshift/latest/mgmt/configure-jdbc-options.html)
   to add the appropriate SSL options to the JDBC `url` used with this library.
 
-- **Encrypting `UNLOAD` data stored in S3 (data stored when reading from Redshift)**: According to the Redshift documentation
-  on [_Unloading Data to S3_](http://docs.aws.amazon.com/redshift/latest/dg/t_Unloading_tables.html),
-  "UNLOAD automatically encrypts data files using Amazon S3 server-side encryption (SSE-S3)."
+- **S3 Client Side Encryption (CSE)**: Redshift supports S3 CSE with a custom master symmetric key
+  (see: [_S3 CSE on EMRFS_](http://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-4.5.0//emr-cluster-configuration-object-encryption.html),
+  [S3 CSE on Databricks FS](https://docs.databricks.com/spark/latest/data-sources/amazon-s3.html?highlight=encryption#client-side-s3-encryption)).
+  This library uses `RedshiftEncryptionMaterialsProvider` to configure the underlying filesystem (EMRFS or Databricks FS). Users can turn on S3 CSE by:
+  1. Setting "fs.s3.cse.enabled" to "true" at the start of the job.
+  2. Setting "fs.s3.cse.encryptionMaterialsProvider" to "com.databricks.spark.redshift.RedshiftEncryptionMaterialsProvider" at the start of the job.
+  3. Setting "spark-redshift.master-sym-key" to the master symmetric key to be used by Redshift at the start of the job.
+  4. Setting the "encryption" option to "true" in the parameters.
+
+  **NOTE: Once configured, the cluster will keep using the master symmetric key to read/write any object to S3. A new job must be started to change encryption handling on the cluster.**
+
+  For example, in Scala add:
+  ```scala
+sc.hadoopConfiguration.set("fs.s3.cse.enabled", "true")
+sc.hadoopConfiguration.set("fs.s3.cse.encryptionMaterialsProvider", "com.databricks.spark.redshift.RedshiftEncryptionMaterialsProvider")
+sc.hadoopConfiguration.set("spark-redshift.master-sym-key", "YOUR_MASTER_SYMMETRIC_KEY")
+```
+
+- **Encrypting `UNLOAD` data stored in S3 (data stored when reading from Redshift)**:
-  Redshift also supports client-side encryption with a custom key
-  (see: [_Unloading Encrypted Data Files_](http://docs.aws.amazon.com/redshift/latest/dg/t_unloading_encrypted_files.html))
-  but this library currently lacks the capability to specify the required symmetric key.
+  * Using S3 server-side encryption (S3 SSE): According to the Redshift documentation
+  on [_Unloading Data to S3_](http://docs.aws.amazon.com/redshift/latest/dg/t_Unloading_tables.html),
+  "UNLOAD automatically encrypts data files using Amazon S3 server-side encryption (SSE-S3)."
+
+  * Using S3 client-side encryption (S3 CSE): Once configured using the steps provided in the previous section,
+  UNLOAD automatically encrypts data files using Amazon S3 CSE.
 
 - **Encrypting `COPY` data stored in S3 (data stored when writing to Redshift)**:
-  According to the Redshift documentation on
+  * Using S3 server-side encryption (S3 SSE): According to the Redshift documentation on
   [_Loading Encrypted Data Files from Amazon S3_](http://docs.aws.amazon.com/redshift/latest/dg/c_loading-encrypted-files.html):
 
   > You can use the COPY command to load data files that were uploaded to Amazon S3 using
@@ -423,6 +442,62 @@ The following describes how each connection can be authenticated:
   are using `s3a`, `s3n`, EMRFS, etc.).
Note that the `MANIFEST` file (a list of all files written) will not be encrypted.
+  * Using S3 client-side encryption (S3 CSE): Once configured using the steps provided in the previous section,
+  COPY automatically encrypts data files using Amazon S3 client-side encryption (S3 CSE).
+
+The following example shows how to read and write data using S3 CSE:
+```scala
+import org.apache.spark.sql._
+
+val sc = // existing SparkContext
+val sqlContext = new SQLContext(sc)
+
+// Enable encryption by setting the required hadoop configuration
+sc.hadoopConfiguration.set("fs.s3.cse.enabled", "true")
+sc.hadoopConfiguration.set("fs.s3.cse.encryptionMaterialsProvider", "com.databricks.spark.redshift.RedshiftEncryptionMaterialsProvider")
+sc.hadoopConfiguration.set("spark-redshift.master-sym-key", "YOUR_MASTER_SYMMETRIC_KEY")
+
+// Get some data from a Redshift table
+val df: DataFrame = sqlContext.read
+  .format("com.databricks.spark.redshift")
+  .option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass")
+  .option("dbtable", "my_table")
+  .option("tempdir", "s3n://path/for/temp/data")
+  .option("encryption", "true")
+  .load()
+
+// Can also load data from a Redshift query
+val df: DataFrame = sqlContext.read
+  .format("com.databricks.spark.redshift")
+  .option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass")
+  .option("query", "select x, count(*) from my_table group by x")
+  .option("tempdir", "s3n://path/for/temp/data")
+  .option("encryption", "true")
+  .load()
+
+// Apply some transformations to the data as per normal, then you can use the
+// Data Source API to write the data back to another table
+
+df.write
+  .format("com.databricks.spark.redshift")
+  .option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass")
+  .option("dbtable", "my_table_copy")
+  .option("tempdir", "s3n://path/for/temp/data")
+  .option("encryption", "true")
+  .mode("error")
+  .save()
+
+// Using IAM Role based authentication
+df.write
+  .format("com.databricks.spark.redshift")
+  .option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass")
+  .option("dbtable", "my_table_copy")
+  .option("aws_iam_role", "arn:aws:iam::123456789000:role/redshift_iam_role")
+  .option("tempdir", "s3n://path/for/temp/data")
+  .option("encryption", "true")
+  .mode("error")
+  .save()
+```
 
 ### Parameters
 
@@ -513,6 +588,19 @@ need to be configured to allow access from your driver application. No default
     AWS session token corresponding to provided access key.
 
+
+    encryption
+    No
+    false
+
+

+    Toggles Amazon S3 Client Side Encryption (CSE):
+    When set to true, CSE will be used.
+    When using CSE, the following must also be configured:
+    1. The master symmetric key must be set in the Hadoop configuration via the "spark-redshift.master-sym-key" property.
+    2. The EncryptionMaterialsProvider class must be set via the "fs.s3.cse.encryptionMaterialsProvider" property.
+

+ tempdir Yes diff --git a/src/main/scala/com/databricks/spark/redshift/AWSCredentialsUtils.scala b/src/main/scala/com/databricks/spark/redshift/AWSCredentialsUtils.scala index 47ad0b06..65985bfc 100644 --- a/src/main/scala/com/databricks/spark/redshift/AWSCredentialsUtils.scala +++ b/src/main/scala/com/databricks/spark/redshift/AWSCredentialsUtils.scala @@ -28,8 +28,26 @@ private[redshift] object AWSCredentialsUtils { /** * Generates a credentials string for use in Redshift COPY and UNLOAD statements. * Favors a configured `aws_iam_role` if available in the parameters. + * Adds master symmetric key to the credential string if S3 CSE is enabled. + * NOTE: This method deals with encryption because Redshift's CREDENTIALS option + * considers master symmetric key a part of credential string. */ def getRedshiftCredentialsString( + params: MergedParameters, + sparkAwsCredentials: AWSCredentials, + hadoopConfiguration: Configuration + ): String = { + + getRedshiftAWSCredentialsSubString(params, sparkAwsCredentials) + + getRedshiftEncryptionSubString(params, hadoopConfiguration) + } + + + /** + * Generates a credentials string for use in Redshift COPY and UNLOAD statements. + * Favors a configured `aws_iam_role` if available in the parameters. + */ + private def getRedshiftAWSCredentialsSubString( params: MergedParameters, sparkAwsCredentials: AWSCredentials): String = { @@ -43,6 +61,7 @@ private[redshift] object AWSCredentialsUtils { s"aws_secret_access_key=${creds.getAWSSecretKey}" } } + if (params.iamRole.isDefined) { s"aws_iam_role=${params.iamRole.get}" } else if (params.temporaryAWSCredentials.isDefined) { @@ -106,4 +125,17 @@ private[redshift] object AWSCredentialsUtils { throw new IllegalArgumentException(s"Unrecognized scheme $other; expected s3, s3n, or s3a") } } + + /** + * Generates encryption string to be appended to the Redshift Credentials string + */ + private def getRedshiftEncryptionSubString( + params: MergedParameters, + hadoopConfiguration: Configuration): String = { + if (params.encryption) { + s";master_symmetric_key=${hadoopConfiguration.get("spark-redshift.master-sym-key")}" + } else { + "" + } + } } diff --git a/src/main/scala/com/databricks/spark/redshift/Parameters.scala b/src/main/scala/com/databricks/spark/redshift/Parameters.scala index 875f5b75..f0d5ddf4 100644 --- a/src/main/scala/com/databricks/spark/redshift/Parameters.scala +++ b/src/main/scala/com/databricks/spark/redshift/Parameters.scala @@ -35,6 +35,7 @@ private[redshift] object Parameters { "tempformat" -> "AVRO", "csvnullstring" -> "@NULL@", "overwrite" -> "false", + "encryption" -> "false", "diststyle" -> "EVEN", "usestagingtable" -> "true", "preactions" -> ";", @@ -159,6 +160,13 @@ private[redshift] object Parameters { ) yield (user, password) } + /** + * S3 Client Side Encryption will be enabled if encryption option is set + * to true. 
Master symmetric key is taken from the hadoop configuration
+   * property spark-redshift.master-sym-key.
+   */
+  def encryption: Boolean = parameters("encryption").toBoolean
+
   /**
    * A JDBC URL, of the format:
    *
diff --git a/src/main/scala/com/databricks/spark/redshift/RedshiftEncryptionMaterialsProvider.scala b/src/main/scala/com/databricks/spark/redshift/RedshiftEncryptionMaterialsProvider.scala
new file mode 100644
index 00000000..7f1bc060
--- /dev/null
+++ b/src/main/scala/com/databricks/spark/redshift/RedshiftEncryptionMaterialsProvider.scala
@@ -0,0 +1,67 @@
+package com.databricks.spark.redshift
+
+import java.util
+import javax.crypto.SecretKey
+import javax.crypto.spec.SecretKeySpec
+
+import com.amazonaws.services.s3.model.{EncryptionMaterials, EncryptionMaterialsProvider}
+import com.amazonaws.util.Base64
+import org.apache.hadoop.conf.{Configurable, Configuration}
+
+
+/**
+ * This class provides encryption materials to be used by the spark-redshift
+ * library. It gets the master symmetric key from the hadoop configuration,
+ * converts it into EncryptionMaterials and returns them.
+ */
+object RedshiftEncryptionMaterialsProvider {
+  /**
+   * Encryption algorithm used by Redshift.
+   */
+  private val ENCRYPTION_ALGORITHM: String = "AES"
+  /**
+   * This is the hadoop configuration property name that contains
+   * the redshift master symmetric key.
+   */
+  private val SPARK_REDSHIFT_MASTER_SYM_KEY: String = "spark-redshift.master-sym-key"
+}
+
+class RedshiftEncryptionMaterialsProvider
+  extends EncryptionMaterialsProvider
+  with Configurable {
+  /**
+   * The hadoop configuration.
+   */
+  private var conf: Configuration = null
+
+  def setConf(conf: Configuration) {
+    this.conf = conf
+  }
+
+  def getConf: Configuration = {
+    return this.conf
+  }
+
+  def refresh {
+  }
+
+  def getEncryptionMaterials(materialsDescription: Map[String, String]):
+      EncryptionMaterials = {
+    return getEncryptionMaterials
+  }
+
+  override def getEncryptionMaterials(
+      materialsDescription: util.Map[String, String]):
+      EncryptionMaterials = getEncryptionMaterials
+
+  override def getEncryptionMaterials: EncryptionMaterials = {
+    val masterKey: SecretKey =
+      new SecretKeySpec(
+        Base64.decode(
+          conf.get(
+            RedshiftEncryptionMaterialsProvider.
+ SPARK_REDSHIFT_MASTER_SYM_KEY)), + RedshiftEncryptionMaterialsProvider.ENCRYPTION_ALGORITHM) + return new EncryptionMaterials(masterKey) + } +} diff --git a/src/main/scala/com/databricks/spark/redshift/RedshiftRelation.scala b/src/main/scala/com/databricks/spark/redshift/RedshiftRelation.scala index 31dc11b2..e01277f3 100644 --- a/src/main/scala/com/databricks/spark/redshift/RedshiftRelation.scala +++ b/src/main/scala/com/databricks/spark/redshift/RedshiftRelation.scala @@ -181,22 +181,31 @@ private[redshift] case class RedshiftRelation( // Always quote column names: val columnList = requiredColumns.map(col => s""""$col"""").mkString(", ") val whereClause = FilterPushdown.buildWhereClause(schema, filters) + val hadoopConfiguration = sqlContext.sparkContext.hadoopConfiguration val credsString: String = - AWSCredentialsUtils.getRedshiftCredentialsString(params, creds.getCredentials) + AWSCredentialsUtils.getRedshiftCredentialsString( + params, creds.getCredentials, hadoopConfiguration) val query = { // Since the query passed to UNLOAD will be enclosed in single quotes, we need to escape // any backslashes and single quotes that appear in the query itself - val escapedTableNameOrSubqury = tableNameOrSubquery.replace("\\", "\\\\").replace("'", "\\'") - s"SELECT $columnList FROM $escapedTableNameOrSubqury $whereClause" + val escapedTableNameOrSubquery = tableNameOrSubquery.replace("\\", "\\\\").replace("'", "\\'") + s"SELECT $columnList FROM $escapedTableNameOrSubquery $whereClause" } // We need to remove S3 credentials from the unload path URI because they will conflict with // the credentials passed via `credsString`. val fixedUrl = Utils.fixS3Url(Utils.removeCredentialsFromURI(new URI(tempDir)).toString) - - s"UNLOAD ('$query') TO '$fixedUrl' WITH CREDENTIALS '$credsString' ESCAPE MANIFEST" + val encryptOptStr: String = + Utils.getEncryptOptStr(params, sqlContext.sparkContext.hadoopConfiguration) + + s"UNLOAD ('$query') " + + s"TO '$fixedUrl' " + + s"WITH CREDENTIALS '$credsString' " + + s"ESCAPE " + + s"MANIFEST " + + s"$encryptOptStr" } - private def pruneSchema(schema: StructType, columns: Array[String]): StructType = { + private def pruneSchema(schema: StructType, columns: Array[String]): StructType = { val fieldMap = Map(schema.fields.map(x => x.name -> x): _*) new StructType(columns.map(name => fieldMap(name))) } diff --git a/src/main/scala/com/databricks/spark/redshift/RedshiftWriter.scala b/src/main/scala/com/databricks/spark/redshift/RedshiftWriter.scala index 8383231d..d62be9b6 100644 --- a/src/main/scala/com/databricks/spark/redshift/RedshiftWriter.scala +++ b/src/main/scala/com/databricks/spark/redshift/RedshiftWriter.scala @@ -19,17 +19,17 @@ package com.databricks.spark.redshift import java.net.URI import java.sql.{Connection, Date, SQLException, Timestamp} -import com.amazonaws.auth.AWSCredentialsProvider -import com.amazonaws.services.s3.AmazonS3Client +import com.amazonaws.auth.{AWSCredentials, AWSCredentialsProvider} +import com.amazonaws.services.s3.model.PutObjectRequest +import com.amazonaws.services.s3.{AmazonS3Client, AmazonS3URI} import org.apache.hadoop.fs.{FileSystem, Path} - import org.apache.spark.TaskContext import org.slf4j.LoggerFactory + import scala.collection.mutable import scala.util.control.NonFatal - import com.databricks.spark.redshift.Parameters.MergedParameters - +import org.apache.commons.io.IOUtils import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Row, SQLContext, SaveMode} import org.apache.spark.sql.types._ @@ -92,14 +92,19 
@@ private[redshift] class RedshiftWriter( creds: AWSCredentialsProvider, manifestUrl: String): String = { val credsString: String = - AWSCredentialsUtils.getRedshiftCredentialsString(params, creds.getCredentials) + AWSCredentialsUtils.getRedshiftCredentialsString( + params, creds.getCredentials, sqlContext.sparkContext.hadoopConfiguration) val fixedUrl = Utils.fixS3Url(manifestUrl) + + val encryptOptString: String = + Utils.getEncryptOptStr(params, sqlContext.sparkContext.hadoopConfiguration) + val format = params.tempFormat match { case "AVRO" => "AVRO 'auto'" case csv if csv == "CSV" || csv == "CSV GZIP" => csv + s" NULL AS '${params.nullString}'" } s"COPY ${params.table.get} FROM '$fixedUrl' CREDENTIALS '$credsString' FORMAT AS " + - s"${format} manifest ${params.extraCopyOptions}" + s"${format} manifest ${params.extraCopyOptions} $encryptOptString" } /** @@ -209,6 +214,7 @@ private[redshift] class RedshiftWriter( private def unloadData( sqlContext: SQLContext, data: DataFrame, + creds: AWSCredentialsProvider, tempDir: String, tempFormat: String, nullString: String): Option[String] = { @@ -319,16 +325,30 @@ private[redshift] class RedshiftWriter( } val manifest = s"""{"entries": [${manifestEntries.mkString(",\n")}]}""" val manifestPath = sanitizedTempDir + "/manifest.json" - val fsDataOut = fs.create(new Path(manifestPath)) - try { - fsDataOut.write(manifest.getBytes("utf-8")) - } finally { - fsDataOut.close() - } + + writeRedshiftManifestUnencrypted(creds, manifest, manifestPath) Some(manifestPath) } } + /** + * Writes Redshift Manifest file to S3 without encryption. + */ + private def writeRedshiftManifestUnencrypted( + creds: AWSCredentialsProvider, + manifest: String, + manifestPath: String) = { + /* By default EncryptionMaterialsProviders encrypt every S3 object using client side + * encryption. Redshift expects manifest file to be unencrypted. Hence, instantiating + * a new unencrypted AmazonS3Client object to write the manifest to S3. 
+ */ + val s3Client = s3ClientFactory(creds) + val s3URI = new AmazonS3URI(manifestPath) + val putObjectRequest: PutObjectRequest = new PutObjectRequest(s3URI.getBucket(), + s3URI.getKey, IOUtils.toInputStream(manifest, "utf-8"), null); + s3Client.putObject(putObjectRequest) + } + /** * Write a DataFrame to a Redshift table, using S3 and Avro or CSV serialization */ @@ -392,6 +412,7 @@ private[redshift] class RedshiftWriter( val manifestUrl = unloadData( sqlContext, data, + creds, tempDir = params.createPerQueryTempDir(), tempFormat = params.tempFormat, nullString = params.nullString) diff --git a/src/main/scala/com/databricks/spark/redshift/Utils.scala b/src/main/scala/com/databricks/spark/redshift/Utils.scala index 82c48c3a..65f9a3d7 100644 --- a/src/main/scala/com/databricks/spark/redshift/Utils.scala +++ b/src/main/scala/com/databricks/spark/redshift/Utils.scala @@ -19,11 +19,13 @@ package com.databricks.spark.redshift import java.net.URI import java.util.UUID +import com.amazonaws.auth.{AWSCredentials, AWSCredentialsProvider} + import scala.collection.JavaConverters._ import scala.util.control.NonFatal - -import com.amazonaws.services.s3.{AmazonS3URI, AmazonS3Client} +import com.amazonaws.services.s3.{AmazonS3Client, AmazonS3URI} import com.amazonaws.services.s3.model.BucketLifecycleConfiguration +import com.databricks.spark.redshift.Parameters.MergedParameters import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.FileSystem import org.slf4j.LoggerFactory @@ -204,4 +206,46 @@ private[redshift] object Utils { case _ => None } } + + /** + * Evaluates hadoop configuration and encryption option. Returns string required to be appended + * to UNLOAD/COPY statements to enable or disable encryption. + */ + def getEncryptOptStr(params: MergedParameters, hadoopConfig: Configuration): String = { + val sparkRedshiftMasterSymKey = hadoopConfig.get("spark-redshift.master-sym-key") + val encryptionMaterialsProvider = + hadoopConfig.get("fs.s3.cse.encryptionMaterialsProvider") + val enableS3CSEEncryption = hadoopConfig.get("fs.s3.cse.enabled") + + throwExceptionIfConfigurationIsInvalid( + params, sparkRedshiftMasterSymKey, encryptionMaterialsProvider, enableS3CSEEncryption) + + if (params.encryption) { + "ENCRYPTED" + } else { + "" + } + } + + def throwExceptionIfConfigurationIsInvalid( + params: MergedParameters, + sparkRedshiftMasterSymKey: String, + encryptionMaterialsProvider: String, + enableS3CSEEncryption: String): Unit = + + if (params.encryption && + (sparkRedshiftMasterSymKey == null + || encryptionMaterialsProvider == null + || enableS3CSEEncryption == null + ) + ) { + throw new IllegalArgumentException( + "spark-redshift.master-sym-key, fs.s3.cse.encryptionMaterialsProvider and " + + "fs.s3.cse.enabled hadoop configuration must be set immediately after spark " + + "job startup, if encryption option is enabled") + } else if (!params.encryption && sparkRedshiftMasterSymKey != null) { + throw new IllegalArgumentException( + "encryption option must be enabled if " + + "spark-redshift.master-sym-key hadoop configuration is set") + } } diff --git a/src/test/scala/com/databricks/spark/redshift/AWSCredentialsUtilsSuite.scala b/src/test/scala/com/databricks/spark/redshift/AWSCredentialsUtilsSuite.scala index 0315d3a1..b692dd9f 100644 --- a/src/test/scala/com/databricks/spark/redshift/AWSCredentialsUtilsSuite.scala +++ b/src/test/scala/com/databricks/spark/redshift/AWSCredentialsUtilsSuite.scala @@ -38,30 +38,75 @@ class AWSCredentialsUtilsSuite extends FunSuite { } 
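For reference, the configuration validation added in `Utils.getEncryptOptStr` (see the Utils.scala hunk above) has no dedicated test in this diff. A minimal sketch of one, assuming the `baseParams` fixture and imports already present in this suite (hypothetical code, not part of this change, and arguably better placed in a dedicated `UtilsSuite`):

```scala
test("getEncryptOptStr rejects encryption without the CSE hadoop configuration") {
  // No spark-redshift.master-sym-key, fs.s3.cse.encryptionMaterialsProvider or
  // fs.s3.cse.enabled is set in this Hadoop configuration.
  val hadoopConfiguration: Configuration = new Configuration()
  val params = Parameters.mergeParameters(baseParams ++ Map(
    "forward_spark_s3_credentials" -> "true",
    "encryption" -> "true"))
  // The validation added in Utils.scala should fail fast instead of emitting a broken UNLOAD/COPY.
  intercept[IllegalArgumentException] {
    Utils.getEncryptOptStr(params, hadoopConfiguration)
  }
}
```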
test("credentialsString with regular keys") { + val hadoopConfiguration: Configuration = new Configuration() val creds = new BasicAWSCredentials("ACCESSKEYID", "SECRET/KEY/WITH/SLASHES") val params = Parameters.mergeParameters(baseParams ++ Map("forward_spark_s3_credentials" -> "true")) - assert(AWSCredentialsUtils.getRedshiftCredentialsString(params, creds) === + assert(AWSCredentialsUtils.getRedshiftCredentialsString(params, creds, hadoopConfiguration) === "aws_access_key_id=ACCESSKEYID;aws_secret_access_key=SECRET/KEY/WITH/SLASHES") } + test("credentialsString with regular keys and encryption") { + val hadoopConfiguration: Configuration = new Configuration() + hadoopConfiguration.set("spark-redshift.master-sym-key", "test-master-sym-key") + val creds = new BasicAWSCredentials("ACCESSKEYID", "SECRET/KEY/WITH/SLASHES") + val params = + Parameters.mergeParameters( + baseParams ++ Map("forward_spark_s3_credentials" -> "true", "encryption" -> "true") + ) + assert(AWSCredentialsUtils.getRedshiftCredentialsString(params, creds, hadoopConfiguration) === + "aws_access_key_id=ACCESSKEYID;" + + "aws_secret_access_key=SECRET/KEY/WITH/SLASHES;" + + "master_symmetric_key=test-master-sym-key") + } + test("credentialsString with STS temporary keys") { + val hadoopConfiguration: Configuration = new Configuration() val params = Parameters.mergeParameters(baseParams ++ Map( "temporary_aws_access_key_id" -> "ACCESSKEYID", "temporary_aws_secret_access_key" -> "SECRET/KEY", "temporary_aws_session_token" -> "SESSION/Token")) - assert(AWSCredentialsUtils.getRedshiftCredentialsString(params, null) === + assert(AWSCredentialsUtils.getRedshiftCredentialsString(params, null, hadoopConfiguration) === "aws_access_key_id=ACCESSKEYID;aws_secret_access_key=SECRET/KEY;token=SESSION/Token") } + test("credentialsString with STS temporary keys and encryption") { + val hadoopConfiguration: Configuration = new Configuration() + hadoopConfiguration.set("spark-redshift.master-sym-key", "test-master-sym-key") + val params = Parameters.mergeParameters(baseParams ++ Map( + "temporary_aws_access_key_id" -> "ACCESSKEYID", + "temporary_aws_secret_access_key" -> "SECRET/KEY", + "temporary_aws_session_token" -> "SESSION/Token", + "encryption" -> "true")) + assert(AWSCredentialsUtils.getRedshiftCredentialsString(params, null, hadoopConfiguration) === + "aws_access_key_id=ACCESSKEYID;" + + "aws_secret_access_key=SECRET/KEY;" + + "token=SESSION/Token;" + + "master_symmetric_key=test-master-sym-key") + } + test("Configured IAM roles should take precedence") { + val hadoopConfiguration: Configuration = new Configuration() val creds = new BasicSessionCredentials("ACCESSKEYID", "SECRET/KEY", "SESSION/Token") val iamRole = "arn:aws:iam::123456789000:role/redshift_iam_role" val params = Parameters.mergeParameters(baseParams ++ Map("aws_iam_role" -> iamRole)) - assert(AWSCredentialsUtils.getRedshiftCredentialsString(params, null) === + assert(AWSCredentialsUtils.getRedshiftCredentialsString(params, null, hadoopConfiguration) === s"aws_iam_role=$iamRole") } + test("Configured IAM roles should take precedence with encryption") { + val hadoopConfiguration: Configuration = new Configuration() + hadoopConfiguration.set("spark-redshift.master-sym-key", "test-master-sym-key") + val creds = new BasicSessionCredentials("ACCESSKEYID", "SECRET/KEY", "SESSION/Token") + val iamRole = "arn:aws:iam::123456789000:role/redshift_iam_role" + val params = Parameters.mergeParameters(baseParams ++ Map( + "aws_iam_role" -> iamRole, + "encryption" -> "true" + )) + 
assert(AWSCredentialsUtils.getRedshiftCredentialsString(params, null, hadoopConfiguration) === + s"aws_iam_role=$iamRole;master_symmetric_key=test-master-sym-key") + } + test("AWSCredentials.load() STS temporary keys should take precedence") { val conf = new Configuration(false) conf.set("fs.s3.awsAccessKeyId", "CONFID")