Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/trigger_files/beam_PostCommit_Python.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run.",
"modification": 32
"modification": 33
}

Original file line number Diff line number Diff line change
Expand Up @@ -755,6 +755,7 @@ class BeamModulePlugin implements Plugin<Project> {
google_cloud_dataflow_java_proto_library_all: "com.google.cloud.dataflow:google-cloud-dataflow-java-proto-library-all:0.5.160304",
google_cloud_datastore_v1_proto_client : "com.google.cloud.datastore:datastore-v1-proto-client:2.32.3", // [bomupgrader] sets version
google_cloud_firestore : "com.google.cloud:google-cloud-firestore", // google_cloud_platform_libraries_bom sets version
google_cloud_kms : "com.google.cloud:google-cloud-kms", // google_cloud_platform_libraries_bom sets version
google_cloud_pubsub : "com.google.cloud:google-cloud-pubsub", // google_cloud_platform_libraries_bom sets version
google_cloud_pubsublite : "com.google.cloud:google-cloud-pubsublite", // google_cloud_platform_libraries_bom sets version
// [bomupgrader] the BOM version is set by scripts/tools/bomupgrader.py. If update manually, also update
Expand All @@ -765,6 +766,7 @@ class BeamModulePlugin implements Plugin<Project> {
google_cloud_spanner_bom : "com.google.cloud:google-cloud-spanner-bom:$google_cloud_spanner_version",
google_cloud_spanner : "com.google.cloud:google-cloud-spanner", // google_cloud_platform_libraries_bom sets version
google_cloud_spanner_test : "com.google.cloud:google-cloud-spanner:$google_cloud_spanner_version:tests",
google_cloud_tink : "com.google.crypto.tink:tink:1.19.0",
google_cloud_vertexai : "com.google.cloud:google-cloud-vertexai", // google_cloud_platform_libraries_bom sets version
google_code_gson : "com.google.code.gson:gson:$google_code_gson_version",
// google-http-client's version is explicitly declared for sdks/java/maven-archetypes/examples
Expand Down Expand Up @@ -866,6 +868,7 @@ class BeamModulePlugin implements Plugin<Project> {
proto_google_cloud_datacatalog_v1beta1 : "com.google.api.grpc:proto-google-cloud-datacatalog-v1beta1", // google_cloud_platform_libraries_bom sets version
proto_google_cloud_datastore_v1 : "com.google.api.grpc:proto-google-cloud-datastore-v1", // google_cloud_platform_libraries_bom sets version
proto_google_cloud_firestore_v1 : "com.google.api.grpc:proto-google-cloud-firestore-v1", // google_cloud_platform_libraries_bom sets version
proto_google_cloud_kms_v1 : "com.google.api.grpc:proto-google-cloud-kms-v1", // google_cloud_platform_libraries_bom sets version
proto_google_cloud_pubsub_v1 : "com.google.api.grpc:proto-google-cloud-pubsub-v1", // google_cloud_platform_libraries_bom sets version
proto_google_cloud_pubsublite_v1 : "com.google.api.grpc:proto-google-cloud-pubsublite-v1", // google_cloud_platform_libraries_bom sets version
proto_google_cloud_secret_manager_v1 : "com.google.api.grpc:proto-google-cloud-secretmanager-v1", // google_cloud_platform_libraries_bom sets version
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
<!-- gRPC/protobuf exceptions -->
<!-- Non-vendored gRPC/protobuf imports are allowed for files that depend on libraries that expose gRPC/protobuf in its public API -->
<suppress id="ForbidNonVendoredGrpcProtobuf" files=".*sdk.*extensions.*protobuf.*" />
<suppress id="ForbidNonVendoredGrpcProtobuf" files=".*sdk.*core.*GcpHsmGeneratedSecret.*" />
<suppress id="ForbidNonVendoredGrpcProtobuf" files=".*sdk.*core.*GroupByEncryptedKeyTest.*" />
<suppress id="ForbidNonVendoredGrpcProtobuf" files=".*sdk.*core.*GroupByKeyTest.*" />
<suppress id="ForbidNonVendoredGrpcProtobuf" files=".*sdk.*core.*GroupByKeyIT.*" />
Expand Down
6 changes: 6 additions & 0 deletions sdks/java/core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ dependencies {
shadow library.java.snappy_java
shadow library.java.joda_time
implementation enforcedPlatform(library.java.google_cloud_platform_libraries_bom)
implementation library.java.gax
implementation library.java.google_cloud_kms
implementation library.java.proto_google_cloud_kms_v1
implementation library.java.google_cloud_tink
implementation library.java.google_cloud_secret_manager
implementation library.java.proto_google_cloud_secret_manager_v1
implementation library.java.protobuf_java
Expand Down Expand Up @@ -130,6 +134,8 @@ dependencies {
shadowTest library.java.log4j2_api
shadowTest library.java.jamm
shadowTest 'com.google.cloud:google-cloud-secretmanager:2.75.0'
shadowTest 'com.google.cloud:google-cloud-kms:2.75.0'
shadowTest 'com.google.crypto.tink:tink:1.19.0'
testRuntimeOnly library.java.slf4j_jdk14
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.util;

import com.google.api.gax.rpc.AlreadyExistsException;
import com.google.api.gax.rpc.NotFoundException;
import com.google.cloud.kms.v1.CryptoKeyName;
import com.google.cloud.kms.v1.EncryptResponse;
import com.google.cloud.kms.v1.KeyManagementServiceClient;
import com.google.cloud.secretmanager.v1.AccessSecretVersionResponse;
import com.google.cloud.secretmanager.v1.ProjectName;
import com.google.cloud.secretmanager.v1.Replication;
import com.google.cloud.secretmanager.v1.SecretManagerServiceClient;
import com.google.cloud.secretmanager.v1.SecretName;
import com.google.cloud.secretmanager.v1.SecretPayload;
import com.google.cloud.secretmanager.v1.SecretVersionName;
import com.google.crypto.tink.subtle.Hkdf;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import java.util.Base64;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A {@link org.apache.beam.sdk.util.Secret} manager implementation that generates a secret using
* entropy from a GCP HSM key and stores it in Google Cloud Secret Manager. If the secret already
* exists, it will be retrieved.
*/
public class GcpHsmGeneratedSecret implements Secret {
private static final Logger LOG = LoggerFactory.getLogger(GcpHsmGeneratedSecret.class);
private final String projectId;
private final String locationId;
private final String keyRingId;
private final String keyId;
private final String secretId;

private final SecureRandom random = new SecureRandom();

public GcpHsmGeneratedSecret(
String projectId, String locationId, String keyRingId, String keyId, String jobName) {
this.projectId = projectId;
this.locationId = locationId;
this.keyRingId = keyRingId;
this.keyId = keyId;
this.secretId = "HsmGeneratedSecret_" + jobName;
}

/**
* Returns the secret as a byte array. Assumes that the current active service account has
* permissions to read the secret.
*
* @return The secret as a byte array.
*/
@Override
public byte[] getSecretBytes() {
try (SecretManagerServiceClient client = SecretManagerServiceClient.create()) {
SecretVersionName secretVersionName = SecretVersionName.of(projectId, secretId, "1");

try {
AccessSecretVersionResponse response = client.accessSecretVersion(secretVersionName);
return response.getPayload().getData().toByteArray();
} catch (NotFoundException e) {
LOG.info(
"Secret version {} not found. Creating new secret and version.",
secretVersionName.toString());
}

ProjectName projectName = ProjectName.of(projectId);
SecretName secretName = SecretName.of(projectId, secretId);
try {
com.google.cloud.secretmanager.v1.Secret secret =
com.google.cloud.secretmanager.v1.Secret.newBuilder()
.setReplication(
Replication.newBuilder()
.setAutomatic(Replication.Automatic.newBuilder().build()))
.build();
client.createSecret(projectName, secretId, secret);
} catch (AlreadyExistsException e) {
LOG.info("Secret {} already exists. Adding new version.", secretName.toString());
}

byte[] newKey = generateDek();

try {
// Always retrieve remote secret as source-of-truth in case another thread created it
AccessSecretVersionResponse response = client.accessSecretVersion(secretVersionName);
return response.getPayload().getData().toByteArray();
} catch (NotFoundException e) {
LOG.info(
"Secret version {} not found after re-check. Creating new secret and version.",
secretVersionName.toString());
}

SecretPayload payload =
SecretPayload.newBuilder().setData(ByteString.copyFrom(newKey)).build();
client.addSecretVersion(secretName, payload);
AccessSecretVersionResponse response = client.accessSecretVersion(secretVersionName);
return response.getPayload().getData().toByteArray();

} catch (IOException | GeneralSecurityException e) {
throw new RuntimeException("Failed to retrieve or create secret bytes", e);
}
}

private byte[] generateDek() throws IOException, GeneralSecurityException {
int dekSize = 32;
try (KeyManagementServiceClient client = KeyManagementServiceClient.create()) {
// 1. Generate nonce_one. This doesn't need to have baked in randomness since the
// actual randomness comes from KMS.
byte[] nonceOne = new byte[dekSize];
random.nextBytes(nonceOne);

// 2. Encrypt to get nonce_two
CryptoKeyName keyName = CryptoKeyName.of(projectId, locationId, keyRingId, keyId);
EncryptResponse response = client.encrypt(keyName, ByteString.copyFrom(nonceOne));
byte[] nonceTwo = response.getCiphertext().toByteArray();

// 3. Generate DK
byte[] dk = new byte[dekSize];
random.nextBytes(dk);

// 4. Derive DEK using HKDF
byte[] dek = Hkdf.computeHkdf("HmacSha256", dk, nonceTwo, new byte[0], dekSize);

// 5. Base64 encode
return Base64.getUrlEncoder().encode(dek);
}
}

/**
* Returns the project ID of the secret.
*
* @return The project ID as a String.
*/
public String getProjectId() {
return projectId;
}

/**
* Returns the location ID of the secret.
*
* @return The location ID as a String.
*/
public String getLocationId() {
return locationId;
}

/**
* Returns the key ring ID of the secret.
*
* @return The key ring ID as a String.
*/
public String getKeyRingId() {
return keyRingId;
}

/**
* Returns the key ID of the secret.
*
* @return The key ID as a String.
*/
public String getKeyId() {
return keyId;
}

/**
* Returns the secret ID of the secret.
*
* @return The secret ID as a String.
*/
public String getSecretId() {
return secretId;
}
}
45 changes: 39 additions & 6 deletions sdks/java/core/src/main/java/org/apache/beam/sdk/util/Secret.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;

/**
* A secret management interface used for handling sensitive data.
Expand Down Expand Up @@ -70,16 +71,48 @@ static Secret parseSecretOption(String secretOption) {
paramName, gcpSecretParams));
}
}
String versionName = paramMap.get("version_name");
if (versionName == null) {
throw new RuntimeException(
"version_name must contain a valid value for versionName parameter");
}
String versionName =
Preconditions.checkNotNull(
paramMap.get("version_name"),
"version_name must contain a valid value for versionName parameter");
return new GcpSecret(versionName);
case "gcphsmgeneratedsecret":
Set<String> gcpHsmGeneratedSecretParams =
new HashSet<>(
Arrays.asList("project_id", "location_id", "key_ring_id", "key_id", "job_name"));
for (String paramName : paramMap.keySet()) {
if (!gcpHsmGeneratedSecretParams.contains(paramName)) {
throw new RuntimeException(
String.format(
"Invalid secret parameter %s, GcpHsmGeneratedSecret only supports the following parameters: %s",
paramName, gcpHsmGeneratedSecretParams));
}
}
String projectId =
Preconditions.checkNotNull(
paramMap.get("project_id"),
"project_id must contain a valid value for projectId parameter");
String locationId =
Preconditions.checkNotNull(
paramMap.get("location_id"),
"location_id must contain a valid value for locationId parameter");
String keyRingId =
Preconditions.checkNotNull(
paramMap.get("key_ring_id"),
"key_ring_id must contain a valid value for keyRingId parameter");
String keyId =
Preconditions.checkNotNull(
paramMap.get("key_id"), "key_id must contain a valid value for keyId parameter");
String jobName =
Preconditions.checkNotNull(
paramMap.get("job_name"),
"job_name must contain a valid value for jobName parameter");
return new GcpHsmGeneratedSecret(projectId, locationId, keyRingId, keyId, jobName);
default:
throw new RuntimeException(
String.format(
"Invalid secret type %s, currently only GcpSecret is supported", secretType));
"Invalid secret type %s, currently only GcpSecret and GcpHsmGeneratedSecret are supported",
secretType));
}
}
}
Loading
Loading