102 changes: 95 additions & 7 deletions README.md
@@ -402,16 +402,35 @@ The following describes how each connection can be authenticated:
[_JDBC Driver Configuration Options_](http://docs.aws.amazon.com/redshift/latest/mgmt/configure-jdbc-options.html) to add the appropriate SSL options
to the JDBC `url` used with this library.

- **Encrypting `UNLOAD` data stored in S3 (data stored when reading from Redshift)**: According to the Redshift documentation
on [_Unloading Data to S3_](http://docs.aws.amazon.com/redshift/latest/dg/t_Unloading_tables.html),
"UNLOAD automatically encrypts data files using Amazon S3 server-side encryption (SSE-S3)."
- **S3 Client Side Encryption (CSE)**: Redshift supports S3 CSE with a custom master symmetric key
(see: [_S3 CSE on EMRFS_](http://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-4.5.0//emr-cluster-configuration-object-encryption.html),
[S3 CSE on Databricks FS](https://docs.databricks.com/spark/latest/data-sources/amazon-s3.html?highlight=encryption#client-side-s3-encryption)).
This library uses `RedshiftEncryptionMaterialsProvider` to configure the underlying filesystem (EMRFS or Databricks FS). Users can turn on S3 CSE by:
1. Setting "fs.s3.cse.enabled" to "true" at the start of the job.
2. Setting "fs.s3.cse.encryptionMaterialsProvider" to "com.databricks.spark.redshift.RedshiftEncryptionMaterialsProvider" at the start of the job.
3. Setting "spark-redshift.master-sym-key" to the master symmetric key to be used by Redshift at the start of the job.
4. Setting the "encryption" option to "true" in the parameters (see the combined read/write example below).

**NOTE: Once configured, the cluster will keep using the master symmetric key to read/write any object in S3. A new job must be started to change encryption handling on the cluster.**

For example, in Scala add:
```scala
sc.hadoopConfiguration.set("fs.s3.cse.enabled", "true")
sc.hadoopConfiguration.set("fs.s3.cse.encryptionMaterialsProvider", "com.databricks.spark.redshift.RedshiftEncryptionMaterialsProvider")
sc.hadoopConfiguration.set("spark-redshift.master-sym-key", "YOUR_MASTER_SYMMETRIC_KEY")
```

- **Encrypting `UNLOAD` data stored in S3 (data stored when reading from Redshift)**:

Redshift also supports client-side encryption with a custom key
(see: [_Unloading Encrypted Data Files_](http://docs.aws.amazon.com/redshift/latest/dg/t_unloading_encrypted_files.html))
but this library currently lacks the capability to specify the required symmetric key.
* Using S3 server-side encryption (S3 SSE): According to the Redshift documentation
on [_Unloading Data to S3_](http://docs.aws.amazon.com/redshift/latest/dg/t_Unloading_tables.html),
"UNLOAD automatically encrypts data files using Amazon S3 server-side encryption (SSE-S3)."

* Using S3 client-side encryption (S3 CSE): Once configured using the steps provided in the previous section,
UNLOAD automatically encrypts data files using Amazon S3 CSE.

- **Encrypting `COPY` data stored in S3 (data stored when writing to Redshift)**:
* Using S3 server-side encryption (S3 SSE): According to the Redshift documentation on
[_Loading Encrypted Data Files from Amazon S3_](http://docs.aws.amazon.com/redshift/latest/dg/c_loading-encrypted-files.html):

> You can use the COPY command to load data files that were uploaded to Amazon S3 using
@@ -423,6 +442,62 @@ The following describes how each connection can be authenticated:
are using `s3a`, `s3n`, EMRFS, etc.).
Note that the `MANIFEST` file (a list of all files written) will not be encrypted.

* Using S3 client-side encryption (S3 CSE): Once configured using the steps provided in the previous section,
COPY automatically encrypts data files using Amazon S3 client-side encryption (S3 CSE).

The following example shows how to read and write data using S3 CSE:
```scala
import org.apache.spark.sql._

val sc = // existing SparkContext
val sqlContext = new SQLContext(sc)

// Enable encryption by setting the required Hadoop configuration properties
sc.hadoopConfiguration.set("fs.s3.cse.enabled", "true")
sc.hadoopConfiguration.set("fs.s3.cse.encryptionMaterialsProvider", "com.databricks.spark.redshift.RedshiftEncryptionMaterialsProvider")
sc.hadoopConfiguration.set("spark-redshift.master-sym-key", "YOUR_MASTER_SYMMETRIC_KEY")

// Get some data from a Redshift table
val df: DataFrame = sqlContext.read
  .format("com.databricks.spark.redshift")
  .option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass")
  .option("dbtable", "my_table")
  .option("tempdir", "s3n://path/for/temp/data")
  .option("encryption", "true")
  .load()

// Can also load data from a Redshift query
val df: DataFrame = sqlContext.read
  .format("com.databricks.spark.redshift")
  .option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass")
  .option("query", "select x, count(*) from my_table group by x")
  .option("tempdir", "s3n://path/for/temp/data")
  .option("encryption", "true")
  .load()

// Apply some transformations to the data as per normal, then you can use the
// Data Source API to write the data back to another table

df.write
  .format("com.databricks.spark.redshift")
  .option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass")
  .option("dbtable", "my_table_copy")
  .option("tempdir", "s3n://path/for/temp/data")
  .option("encryption", "true")
  .mode("error")
  .save()

// Using IAM Role based authentication
df.write
  .format("com.databricks.spark.redshift")
  .option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass")
  .option("dbtable", "my_table_copy")
  .option("aws_iam_role", "arn:aws:iam::123456789000:role/redshift_iam_role")
  .option("tempdir", "s3n://path/for/temp/data")
  .option("encryption", "true")
  .mode("error")
  .save()
```

### Parameters

@@ -513,6 +588,19 @@ need to be configured to allow access from your driver application.
<td>No default</td>
<td>AWS session token corresponding to provided access key.</td>
</tr>
<tr>
<td><tt>encryption</tt></td>
<td>No</td>
<td>false</td>
<td>
<p>
Toggles Amazon S3 client-side encryption (CSE) for the temporary data this library writes to and reads from S3.<br>
When set to <tt>true</tt>, CSE is used. In that case the following must also be configured:<br>
1. The master symmetric key must be set in the Hadoop configuration via the <tt>spark-redshift.master-sym-key</tt> property.<br>
2. The encryption materials provider must be set via the <tt>fs.s3.cse.encryptionMaterialsProvider</tt> property.
    </p>
  </td>
</tr>
<tr>
<td><tt>tempdir</tt></td>
<td>Yes</td>
@@ -28,8 +28,26 @@ private[redshift] object AWSCredentialsUtils {
/**
* Generates a credentials string for use in Redshift COPY and UNLOAD statements.
* Favors a configured `aws_iam_role` if available in the parameters.
* Adds the master symmetric key to the credentials string if S3 CSE is enabled.
* NOTE: This method deals with encryption because Redshift's CREDENTIALS option
* treats the master symmetric key as part of the credentials string.
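* For example, with an IAM role and encryption enabled, the resulting string has
* the form (illustrative values only):
*   aws_iam_role=arn:aws:iam::123456789000:role/redshift_iam_role;master_symmetric_key=<base64-encoded-key>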
*/
def getRedshiftCredentialsString(
params: MergedParameters,
sparkAwsCredentials: AWSCredentials,
hadoopConfiguration: Configuration
): String = {

getRedshiftAWSCredentialsSubString(params, sparkAwsCredentials) +
getRedshiftEncryptionSubString(params, hadoopConfiguration)
}


/**
* Generates a credentials string for use in Redshift COPY and UNLOAD statements.
* Favors a configured `aws_iam_role` if available in the parameters.
*/
private def getRedshiftAWSCredentialsSubString(
params: MergedParameters,
sparkAwsCredentials: AWSCredentials): String = {

@@ -43,6 +61,7 @@ private[redshift] object AWSCredentialsUtils {
s"aws_secret_access_key=${creds.getAWSSecretKey}"
}
}

if (params.iamRole.isDefined) {
s"aws_iam_role=${params.iamRole.get}"
} else if (params.temporaryAWSCredentials.isDefined) {
@@ -106,4 +125,17 @@ private[redshift] object AWSCredentialsUtils {
throw new IllegalArgumentException(s"Unrecognized scheme $other; expected s3, s3n, or s3a")
}
}

/**
* Generates the encryption substring to be appended to the Redshift credentials string.
*/
private def getRedshiftEncryptionSubString(
params: MergedParameters,
hadoopConfiguration: Configuration): String = {
if (params.encryption) {
s";master_symmetric_key=${hadoopConfiguration.get("spark-redshift.master-sym-key")}"
} else {
""
}
}
}
8 changes: 8 additions & 0 deletions src/main/scala/com/databricks/spark/redshift/Parameters.scala
@@ -35,6 +35,7 @@ private[redshift] object Parameters {
"tempformat" -> "AVRO",
"csvnullstring" -> "@NULL@",
"overwrite" -> "false",
"encryption" -> "false",
"diststyle" -> "EVEN",
"usestagingtable" -> "true",
"preactions" -> ";",
@@ -159,6 +160,13 @@ private[redshift] object Parameters {
) yield (user, password)
}

/**
* S3 client-side encryption will be enabled if the `encryption` option is set
* to true. The master symmetric key is taken from the Hadoop configuration
* property `spark-redshift.master-sym-key`.
*/
def encryption: Boolean = parameters("encryption").toBoolean

/**
* A JDBC URL, of the format:
*
@@ -0,0 +1,67 @@
package com.databricks.spark.redshift

import java.util
import javax.crypto.SecretKey
import javax.crypto.spec.SecretKeySpec

import com.amazonaws.services.s3.model.{EncryptionMaterials, EncryptionMaterialsProvider}
import com.amazonaws.util.Base64
import org.apache.hadoop.conf.{Configurable, Configuration}


/**
* This class provides encryption materials to be used by the spark-redshift
* library. It gets the master symmetric key from the Hadoop configuration,
* converts it into EncryptionMaterials, and returns them.
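* To use it, set "fs.s3.cse.encryptionMaterialsProvider" to this class's fully
* qualified name and "spark-redshift.master-sym-key" to a Base64-encoded AES master
* key in the Hadoop configuration (see the README section on S3 CSE for an example).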
*/
object RedshiftEncryptionMaterialsProvider {
/**
* Encryption algorithm used by Redshift.
*/
private val ENCRYPTION_ALGORITHM: String = "AES"
/**
* This is the Hadoop configuration property name that contains the
* Redshift master symmetric key.
*/
private val SPARK_REDSHIFT_MASTER_SYM_KEY: String = "spark-redshift.master-sym-key"
}

class RedshiftEncryptionMaterialsProvider
extends EncryptionMaterialsProvider
with Configurable {
  /**
   * The Hadoop configuration from which the master symmetric key is read.
   */
  private var conf: Configuration = null

  def setConf(conf: Configuration): Unit = {
    this.conf = conf
  }

  def getConf: Configuration = this.conf

  def refresh(): Unit = {}

  /** Scala-friendly overload that delegates to the no-argument variant. */
  def getEncryptionMaterials(materialsDescription: Map[String, String]): EncryptionMaterials =
    getEncryptionMaterials

  override def getEncryptionMaterials(
      materialsDescription: util.Map[String, String]): EncryptionMaterials =
    getEncryptionMaterials

  override def getEncryptionMaterials: EncryptionMaterials = {
    // Build an AES master key from the Base64-encoded value stored in the Hadoop
    // configuration and wrap it in EncryptionMaterials for the S3 client.
    val masterKey: SecretKey = new SecretKeySpec(
      Base64.decode(
        conf.get(RedshiftEncryptionMaterialsProvider.SPARK_REDSHIFT_MASTER_SYM_KEY)),
      RedshiftEncryptionMaterialsProvider.ENCRYPTION_ALGORITHM)
    new EncryptionMaterials(masterKey)
  }
}
@@ -181,22 +181,31 @@ private[redshift] case class RedshiftRelation(
// Always quote column names:
val columnList = requiredColumns.map(col => s""""$col"""").mkString(", ")
val whereClause = FilterPushdown.buildWhereClause(schema, filters)
val hadoopConfiguration = sqlContext.sparkContext.hadoopConfiguration
val credsString: String =
AWSCredentialsUtils.getRedshiftCredentialsString(params, creds.getCredentials)
AWSCredentialsUtils.getRedshiftCredentialsString(
params, creds.getCredentials, hadoopConfiguration)
val query = {
// Since the query passed to UNLOAD will be enclosed in single quotes, we need to escape
// any backslashes and single quotes that appear in the query itself
val escapedTableNameOrSubqury = tableNameOrSubquery.replace("\\", "\\\\").replace("'", "\\'")
s"SELECT $columnList FROM $escapedTableNameOrSubqury $whereClause"
val escapedTableNameOrSubquery = tableNameOrSubquery.replace("\\", "\\\\").replace("'", "\\'")
s"SELECT $columnList FROM $escapedTableNameOrSubquery $whereClause"
}
// We need to remove S3 credentials from the unload path URI because they will conflict with
// the credentials passed via `credsString`.
val fixedUrl = Utils.fixS3Url(Utils.removeCredentialsFromURI(new URI(tempDir)).toString)

s"UNLOAD ('$query') TO '$fixedUrl' WITH CREDENTIALS '$credsString' ESCAPE MANIFEST"
val encryptOptStr: String =
Utils.getEncryptOptStr(params, sqlContext.sparkContext.hadoopConfiguration)

s"UNLOAD ('$query') " +
s"TO '$fixedUrl' " +
s"WITH CREDENTIALS '$credsString' " +
s"ESCAPE " +
s"MANIFEST " +
s"$encryptOptStr"
}

private def pruneSchema(schema: StructType, columns: Array[String]): StructType = {
val fieldMap = Map(schema.fields.map(x => x.name -> x): _*)
new StructType(columns.map(name => fieldMap(name)))
}