From 1d98933a0eea73cbb7f707fbd026d84db5c19b5c Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Mon, 24 Nov 2025 15:53:54 -0500 Subject: [PATCH 01/18] Add new method of generating key for GBEK --- .../apache_beam/transforms/core_it_test.py | 67 ++++++++++ sdks/python/apache_beam/transforms/util.py | 118 ++++++++++++++++- .../apache_beam/transforms/util_test.py | 119 ++++++++++++++++++ sdks/python/setup.py | 1 + 4 files changed, 304 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/transforms/core_it_test.py b/sdks/python/apache_beam/transforms/core_it_test.py index 18ae3f30f574..a4658d51a61f 100644 --- a/sdks/python/apache_beam/transforms/core_it_test.py +++ b/sdks/python/apache_beam/transforms/core_it_test.py @@ -38,6 +38,11 @@ except ImportError: secretmanager = None # type: ignore[assignment] +try: + from google.cloud import kms +except ImportError: + kms = None # type: ignore[assignment] + class GbekIT(unittest.TestCase): @classmethod @@ -74,10 +79,56 @@ def setUpClass(cls): cls.gcp_secret = GcpSecret(version_name) cls.secret_option = f'type:GcpSecret;version_name:{version_name}' + if kms is not None: + cls.kms_client = kms.KeyManagementServiceClient() + cls.location_id = 'global' + py_version = f'_py{sys.version_info.major}{sys.version_info.minor}' + secret_postfix = datetime.now().strftime('%m%d_%H%M%S') + py_version + cls.key_ring_id = 'gbekit_key_ring_tests_' + secret_postfix + cls.key_ring_path = cls.kms_client.key_ring_path( + cls.project_id, cls.location_id, cls.key_ring_id) + try: + cls.kms_client.get_key_ring(request={'name': cls.key_ring_path}) + except Exception: + cls.kms_client.create_key_ring( + request={ + 'parent': + f'projects/{cls.project_id}/locations/{cls.location_id}', + 'key_ring_id': cls.key_ring_id, + }) + cls.key_id = 'gbekit_key_tests_' + secret_postfix + cls.key_path = cls.kms_client.crypto_key_path( + cls.project_id, cls.location_id, cls.key_ring_id, cls.key_id) + try: + cls.kms_client.get_crypto_key(request={'name': cls.key_path}) + except Exception: + cls.kms_client.create_crypto_key( + request={ + 'parent': cls.key_ring_path, + 'crypto_key_id': cls.key_id, + 'crypto_key': { + 'purpose': kms.CryptoKey.CryptoKeyPurpose.ENCRYPT_DECRYPT + } + }) + cls.hsm_secret_option = ( + f'type:GcpHsmGeneratedSecret;project_id:{cls.project_id};' + f'location_id:{cls.location_id};key_ring_id:{cls.key_ring_id};' + f'key_id:{cls.key_id};job_name:{secret_postfix}') + @classmethod def tearDownClass(cls): if secretmanager is not None: cls.client.delete_secret(request={'name': cls.secret_path}) + if kms is not None and hasattr(cls, 'kms_client') and hasattr( + cls, 'key_path'): + for version in cls.kms_client.list_crypto_key_versions( + request={'parent': cls.key_path}): + if version.state in [ + kms.CryptoKeyVersion.CryptoKeyVersionState.ENABLED, + kms.CryptoKeyVersion.CryptoKeyVersionState.DISABLED + ]: + cls.kms_client.destroy_crypto_key_version( + request={'name': version.name}) @pytest.mark.it_postcommit @unittest.skipIf(secretmanager is None, 'GCP dependencies are not installed') @@ -94,6 +145,22 @@ def test_gbk_with_gbek_it(self): pipeline.run().wait_until_finish() + @pytest.mark.it_postcommit + @unittest.skipIf(secretmanager is None, 'GCP dependencies are not installed') + @unittest.skipIf(kms is None, 'GCP dependencies are not installed') + def test_gbk_with_gbek_hsm_it(self): + pipeline = TestPipeline(is_integration_test=True) + pipeline.options.view_as(SetupOptions).gbek = self.hsm_secret_option + + pcoll_1 = pipeline | 'Start 1' >> beam.Create([('a', 1), ('a', 2), ('b', 3), + ('c', 4)]) + result = (pcoll_1) | beam.GroupByKey() + sorted_result = result | beam.Map(lambda x: (x[0], sorted(x[1]))) + assert_that( + sorted_result, equal_to([('a', ([1, 2])), ('b', ([3])), ('c', ([4]))])) + + pipeline.run().wait_until_finish() + @pytest.mark.it_postcommit @unittest.skipIf(secretmanager is None, 'GCP dependencies are not installed') def test_combineValues_with_gbek_it(self): diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py index ba79d4ddf31c..7e96a1093c1d 100644 --- a/sdks/python/apache_beam/transforms/util.py +++ b/sdks/python/apache_beam/transforms/util.py @@ -30,6 +30,7 @@ import threading import time import uuid +from datetime import datetime from collections.abc import Callable from collections.abc import Iterable from typing import TYPE_CHECKING @@ -366,10 +367,15 @@ def parse_secret_option(secret) -> 'Secret': if secret_type == 'gcpsecret': secret_class = GcpSecret secret_params = ['version_name'] + elif secret_type == 'gcphsmgeneratedsecret': + secret_class = GcpHsmGeneratedSecret + secret_params = [ + 'project_id', 'location_id', 'key_ring_id', 'key_id', 'job_name' + ] else: raise ValueError( f'Invalid secret type {secret_type}, currently only ' - 'GcpSecret is supported') + 'GcpSecret and GcpHsmGeneratedSecret are supported') for param_name in param_map.keys(): if param_name not in secret_params: @@ -413,6 +419,116 @@ def __eq__(self, secret): return self._version_name == getattr(secret, '_version_name', None) +class GcpHsmGeneratedSecret(Secret): + def __init__( + self, project_id: str, location_id: str, key_ring_id: str, key_id: str, job_name: str): + self._project_id = project_id + self._location_id = location_id + self._key_ring_id = key_ring_id + self._key_id = key_id + self._secret_version_name = f'HsmGeneratedSecret_{job_name}' + + def get_secret_bytes(self) -> bytes: + try: + from google.cloud import secretmanager + from google.api_core import exceptions as api_exceptions + client = secretmanager.SecretManagerServiceClient() + + project_path = f"projects/{self._project_id}" + secret_path = f"{project_path}/secrets/{self._secret_version_name}" + # Since we may generate multiple versions when doing this on workers, + # just always take the first version added to maintain consistency. + secret_version_path = f"{secret_path}/versions/1" + + try: + response = client.access_secret_version( + request={"name": secret_version_path}) + return response.payload.data + except api_exceptions.NotFound: + # Don't bother logging yet, we'll only log if we actually add the + # secret version below + pass + + try: + client.create_secret( + request={ + "parent": project_path, + "secret_id": self._secret_version_name, + "secret": {"replication": {"automatic": {}}}, + }) + except api_exceptions.AlreadyExists: + # Don't bother logging yet, we'll only log if we actually add the + # secret version below + pass + + new_key = self.generate_dek() + try: + # Try one more time in case it was created while we were generating the + # DEK. + response = client.access_secret_version( + request={"name": secret_version_path}) + return response.payload.data + except api_exceptions.NotFound: + logging.info( + f"Secret version {secret_version_path} not found. " + "Creating new secret and version.") + client.add_secret_version( + request={"parent": secret_path, "payload": {"data": new_key}}) + response = client.access_secret_version( + request={"name": secret_version_path}) + return response.payload.data + + except Exception as e: + raise RuntimeError( + f'Failed to retrieve or create secret bytes for secret ' + f'{self._secret_version_name} with exception {e}') + + def generate_dek(self, dek_size: int = 32) -> bytes: + """Generates a new Data Encryption Key (DEK) using an HSM-backed key. + + This function follows a key derivation process that incorporates entropy + from the HSM-backed key into the nonce used for key derivation. + + Returns: + A new DEK of the specified size. + """ + try: + import base64 + import os + from cryptography.hazmat.primitives.kdf.hkdf import HKDF + from cryptography.hazmat.primitives import hashes + from google.cloud import kms + + # 1. Generate a random nonce (nonce_one) + nonce_one = os.urandom(dek_size) + + # 2. Use the HSM-backed key to encrypt nonce_one to create nonce_two + kms_client = kms.KeyManagementServiceClient() + key_path = kms_client.crypto_key_path( + self._project_id, + self._location_id, + self._key_ring_id, + self._key_id) + response = kms_client.encrypt( + request={'name': key_path, 'plaintext': nonce_one}) + nonce_two = response.ciphertext + + # 3. Generate a Derivation Key (DK) + dk = os.urandom(dek_size) + + # 4. Use a KDF to derive the DEK using DK and nonce_two + hkdf = HKDF( + algorithm=hashes.SHA256(), + length=dek_size, + salt=nonce_two, + info=None, + ) + dek = hkdf.derive(dk) + return base64.urlsafe_b64encode(dek) + except Exception as e: + raise RuntimeError(f'Failed to generate DEK with exception {e}') + + class _EncryptMessage(DoFn): """A DoFn that encrypts the key and value of each element.""" def __init__( diff --git a/sdks/python/apache_beam/transforms/util_test.py b/sdks/python/apache_beam/transforms/util_test.py index 34e251fad1c7..a6270e9f6a0b 100644 --- a/sdks/python/apache_beam/transforms/util_test.py +++ b/sdks/python/apache_beam/transforms/util_test.py @@ -73,6 +73,7 @@ from apache_beam.transforms.trigger import AfterCount from apache_beam.transforms.trigger import Repeatedly from apache_beam.transforms.util import GcpSecret +from apache_beam.transforms.util import GcpHsmGeneratedSecret from apache_beam.transforms.util import Secret from apache_beam.transforms.window import FixedWindows from apache_beam.transforms.window import GlobalWindow @@ -439,6 +440,124 @@ def test_gbek_gcp_secret_manager_throws(self): result, equal_to([('a', ([1, 2])), ('b', ([3])), ('c', ([4]))])) +@unittest.skipIf(secretmanager is None, 'GCP dependencies are not installed') +class GcpHsmGeneratedSecretTest(unittest.TestCase): + def setUp(self): + self.mock_secret_manager_client = mock.MagicMock() + self.mock_kms_client = mock.MagicMock() + + # Patch the clients + self.secretmanager_patcher = mock.patch( + 'google.cloud.secretmanager.SecretManagerServiceClient', + return_value=self.mock_secret_manager_client) + self.kms_patcher = mock.patch( + 'google.cloud.kms.KeyManagementServiceClient', + return_value=self.mock_kms_client) + self.os_urandom_patcher = mock.patch('os.urandom', return_value=b'0' * 32) + self.hkdf_patcher = mock.patch( + 'cryptography.hazmat.primitives.kdf.hkdf.HKDF.derive', + return_value=b'derived_key') + + self.secretmanager_patcher.start() + self.kms_patcher.start() + self.os_urandom_patcher.start() + self.hkdf_patcher.start() + + def tearDown(self): + self.secretmanager_patcher.stop() + self.kms_patcher.stop() + self.os_urandom_patcher.stop() + self.hkdf_patcher.stop() + + def test_happy_path_secret_creation(self): + from google.api_core import exceptions as api_exceptions + + project_id = 'test-project' + location_id = 'global' + key_ring_id = 'test-key-ring' + key_id = 'test-key' + job_name = 'test-job' + + secret = GcpHsmGeneratedSecret( + project_id, location_id, key_ring_id, key_id, job_name) + + # Mock responses for secret creation path + self.mock_secret_manager_client.access_secret_version.side_effect = [ + api_exceptions.NotFound('not found'), # first check + api_exceptions.NotFound('not found'), # second check + mock.MagicMock(payload=mock.MagicMock(data=b'derived_key')) + ] + self.mock_kms_client.encrypt.return_value = mock.MagicMock( + ciphertext=b'encrypted_nonce') + + secret_bytes = secret.get_secret_bytes() + self.assertEqual(secret_bytes, b'derived_key') + + # Assertions on mocks + secret_version_path = ( + f'projects/{project_id}/secrets/{secret._secret_version_name}/versions/1' + ) + self.mock_secret_manager_client.access_secret_version.assert_any_call( + request={'name': secret_version_path}) + self.assertEqual( + self.mock_secret_manager_client.access_secret_version.call_count, 3) + self.mock_secret_manager_client.create_secret.assert_called_once() + self.mock_kms_client.encrypt.assert_called_once() + self.mock_secret_manager_client.add_secret_version.assert_called_once() + + def test_secret_already_exists(self): + from google.api_core import exceptions as api_exceptions + + project_id = 'test-project' + location_id = 'global' + key_ring_id = 'test-key-ring' + key_id = 'test-key' + job_name = 'test-job' + + secret = GcpHsmGeneratedSecret( + project_id, location_id, key_ring_id, key_id, job_name) + + # Mock responses for secret creation path + self.mock_secret_manager_client.access_secret_version.side_effect = [ + api_exceptions.NotFound('not found'), + api_exceptions.NotFound('not found'), + mock.MagicMock(payload=mock.MagicMock(data=b'derived_key')) + ] + self.mock_secret_manager_client.create_secret.side_effect = ( + api_exceptions.AlreadyExists('exists')) + self.mock_kms_client.encrypt.return_value = mock.MagicMock( + ciphertext=b'encrypted_nonce') + + secret_bytes = secret.get_secret_bytes() + self.assertEqual(secret_bytes, b'derived_key') + + # Assertions on mocks + self.mock_secret_manager_client.create_secret.assert_called_once() + self.mock_secret_manager_client.add_secret_version.assert_called_once() + + def test_secret_version_already_exists(self): + project_id = 'test-project' + location_id = 'global' + key_ring_id = 'test-key-ring' + key_id = 'test-key' + job_name = 'test-job' + + secret = GcpHsmGeneratedSecret( + project_id, location_id, key_ring_id, key_id, job_name) + + self.mock_secret_manager_client.access_secret_version.return_value = ( + mock.MagicMock(payload=mock.MagicMock(data=b'existing_dek'))) + + secret_bytes = secret.get_secret_bytes() + self.assertEqual(secret_bytes, b'existing_dek') + + # Assertions + self.mock_secret_manager_client.access_secret_version.assert_called_once() + self.mock_secret_manager_client.create_secret.assert_not_called() + self.mock_secret_manager_client.add_secret_version.assert_not_called() + self.mock_kms_client.encrypt.assert_not_called() + + class FakeClock(object): def __init__(self, now=time.time()): self._now = now diff --git a/sdks/python/setup.py b/sdks/python/setup.py index 074d64ae8921..80b121fd9e3b 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -485,6 +485,7 @@ def get_portability_package_data(): 'google-cloud-spanner>=3.0.0,<4', # GCP Packages required by ML functionality 'google-cloud-dlp>=3.0.0,<4', + 'google-cloud-kms>=3.0.0,<4', 'google-cloud-language>=2.0,<3', 'google-cloud-secret-manager>=2.0,<3', 'google-cloud-videointelligence>=2.0,<3', From a7bae2ada89cdc7bcf0ddbfb16216cd12af1a0c7 Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Mon, 24 Nov 2025 16:31:06 -0500 Subject: [PATCH 02/18] Java version --- .../beam/sdk/util/GcpHsmGeneratedSecret.java | 141 ++++++++++++++++++ .../java/org/apache/beam/sdk/util/Secret.java | 21 ++- .../transforms/GroupByEncryptedKeyTest.java | 87 +++++++++++ .../beam/sdk/transforms/GroupByKeyIT.java | 119 +++++++++++++++ 4 files changed, 367 insertions(+), 1 deletion(-) create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpHsmGeneratedSecret.java diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpHsmGeneratedSecret.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpHsmGeneratedSecret.java new file mode 100644 index 000000000000..6f1eea5baf6e --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpHsmGeneratedSecret.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import com.google.api.gax.rpc.AlreadyExistsException; +import com.google.api.gax.rpc.NotFoundException; +import com.google.cloud.kms.v1.CryptoKeyName; +import com.google.cloud.kms.v1.EncryptResponse; +import com.google.cloud.kms.v1.KeyManagementServiceClient; +import com.google.cloud.secretmanager.v1.AccessSecretVersionResponse; +import com.google.cloud.secretmanager.v1.ProjectName; +import com.google.cloud.secretmanager.v1.Replication; +import com.google.cloud.secretmanager.v1.SecretPayload; +import com.google.cloud.secretmanager.v1.SecretManagerServiceClient; +import com.google.cloud.secretmanager.v1.SecretName; +import com.google.cloud.secretmanager.v1.SecretVersionName; +import com.google.crypto.tink.subtle.Hkdf; +import com.google.protobuf.ByteString; +import java.io.IOException; +import java.security.GeneralSecurityException; +import java.security.SecureRandom; +import java.util.Base64; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A {@link Secret} manager implementation that retrieves secrets from Google Cloud Secret Manager. + */ +public class GcpHsmGeneratedSecret implements Secret { + private static final Logger LOG = LoggerFactory.getLogger(GcpHsmGeneratedSecret.class); + private final String projectId; + private final String locationId; + private final String keyRingId; + private final String keyId; + private final String secretId; + + public GcpHsmGeneratedSecret( + String projectId, String locationId, String keyRingId, String keyId, String jobName) { + this.projectId = projectId; + this.locationId = locationId; + this.keyRingId = keyRingId; + this.keyId = keyId; + this.secretId = "HsmGeneratedSecret_" + jobName; + } + + /** + * Returns the secret as a byte array. Assumes that the current active service account has + * permissions to read the secret. + * + * @return The secret as a byte array. + */ + @Override + public byte[] getSecretBytes() { + try (SecretManagerServiceClient client = SecretManagerServiceClient.create()) { + SecretVersionName secretVersionName = SecretVersionName.of(projectId, secretId, "1"); + + try { + AccessSecretVersionResponse response = client.accessSecretVersion(secretVersionName); + return response.getPayload().getData().toByteArray(); + } catch (NotFoundException e) { + LOG.info( + "Secret version {} not found. Creating new secret and version.", + secretVersionName.toString()); + } + + ProjectName projectName = ProjectName.of(projectId); + SecretName secretName = SecretName.of(projectId, secretId); + try { + com.google.cloud.secretmanager.v1.Secret secret = + com.google.cloud.secretmanager.v1.Secret.newBuilder() + .setReplication( + Replication.newBuilder() + .setAutomatic(Replication.Automatic.newBuilder().build())) + .build(); + client.createSecret(projectName, secretId, secret); + } catch (AlreadyExistsException e) { + LOG.info("Secret {} already exists. Adding new version.", secretName.toString()); + } + + byte[] newKey = generateDek(); + + try { + // Try to access again in case another thread created it. + AccessSecretVersionResponse response = client.accessSecretVersion(secretVersionName); + return response.getPayload().getData().toByteArray(); + } catch (NotFoundException e) { + LOG.info( + "Secret version {} not found after re-check. Creating new secret and version.", + secretVersionName.toString()); + } + + SecretPayload payload = SecretPayload.newBuilder().setData(ByteString.copyFrom(newKey)).build(); + client.addSecretVersion(secretName, payload); + AccessSecretVersionResponse response = client.accessSecretVersion(secretVersionName); + return response.getPayload().getData().toByteArray(); + + } catch (IOException | GeneralSecurityException e) { + throw new RuntimeException("Failed to retrieve or create secret bytes", e); + } + } + + private byte[] generateDek() throws IOException, GeneralSecurityException { + int dekSize = 32; + try (KeyManagementServiceClient client = KeyManagementServiceClient.create()) { + // 1. Generate nonce_one + SecureRandom random = new SecureRandom(); + byte[] nonceOne = new byte[dekSize]; + random.nextBytes(nonceOne); + + // 2. Encrypt to get nonce_two + CryptoKeyName keyName = CryptoKeyName.of(projectId, locationId, keyRingId, keyId); + EncryptResponse response = client.encrypt(keyName, ByteString.copyFrom(nonceOne)); + byte[] nonceTwo = response.getCiphertext().toByteArray(); + + // 3. Generate DK + byte[] dk = new byte[dekSize]; + random.nextBytes(dk); + + // 4. Derive DEK using HKDF + byte[] dek = Hkdf.computeHkdf("HmacSha256", dk, nonceTwo, new byte[0], dekSize); + + // 5. Base64 encode + return Base64.getUrlEncoder().encode(dek); + } + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Secret.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Secret.java index a75e01c9543f..e33d98506c27 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Secret.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Secret.java @@ -76,10 +76,29 @@ static Secret parseSecretOption(String secretOption) { "version_name must contain a valid value for versionName parameter"); } return new GcpSecret(versionName); + case "gcphsmgeneratedsecret": + Set gcpHsmGeneratedSecretParams = + new HashSet<>( + Arrays.asList("project_id", "location_id", "key_ring_id", "key_id", "job_name")); + for (String paramName : paramMap.keySet()) { + if (!gcpHsmGeneratedSecretParams.contains(paramName)) { + throw new RuntimeException( + String.format( + "Invalid secret parameter %s, GcpHsmGeneratedSecret only supports the following parameters: %s", + paramName, gcpHsmGeneratedSecretParams)); + } + } + return new GcpHsmGeneratedSecret( + paramMap.get("project_id"), + paramMap.get("location_id"), + paramMap.get("key_ring_id"), + paramMap.get("key_id"), + paramMap.get("job_name")); default: throw new RuntimeException( String.format( - "Invalid secret type %s, currently only GcpSecret is supported", secretType)); + "Invalid secret type %s, currently only GcpSecret and GcpHsmGeneratedSecret are supported", + secretType)); } } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java index 31064470bd38..d8f4b3e90b6c 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java @@ -39,6 +39,7 @@ import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.util.GcpHsmGeneratedSecret; import org.apache.beam.sdk.util.GcpSecret; import org.apache.beam.sdk.util.Secret; import org.apache.beam.sdk.values.KV; @@ -102,6 +103,9 @@ public void testGroupByKeyFakeSecret() { private static final String PROJECT_ID = "apache-beam-testing"; private static final String SECRET_ID = "gbek-test"; private static Secret gcpSecret; + private static Secret gcpHsmGeneratedSecret; + private static String keyRingId; + private static String keyId; @BeforeClass public static void setup() throws IOException { @@ -131,6 +135,37 @@ public static void setup() throws IOException { .build()); } gcpSecret = new GcpSecret(secretName.toString() + "/versions/latest"); + + try { + com.google.cloud.kms.v1.KeyManagementServiceClient kmsClient = + com.google.cloud.kms.v1.KeyManagementServiceClient.create(); + String locationId = "global"; + keyRingId = "gbek-test-key-ring-" + System.currentTimeMillis(); + com.google.cloud.kms.v1.KeyRingName keyRingName = + com.google.cloud.kms.v1.KeyRingName.of(PROJECT_ID, locationId, keyRingId); + kmsClient.createKeyRing( + keyRingName.getProject(), + keyRingName.getLocation(), + keyRingId, + com.google.cloud.kms.v1.KeyRing.newBuilder().build()); + + keyId = "gbek-test-key-" + System.currentTimeMillis(); + com.google.cloud.kms.v1.CryptoKey key = + com.google.cloud.kms.v1.CryptoKey.newBuilder() + .setPurpose( + com.google.cloud.kms.v1.CryptoKey.CryptoKeyPurpose.ENCRYPT_DECRYPT) + .build(); + kmsClient.createCryptoKey(keyRingName, keyId, key); + gcpHsmGeneratedSecret = + new GcpHsmGeneratedSecret( + PROJECT_ID, + locationId, + keyRingId, + keyId, + String.format("gbek-test-job-%d", new SecureRandom().nextInt(10000))); + } catch (Exception e) { + gcpHsmGeneratedSecret = null; + } } @AfterClass @@ -138,6 +173,21 @@ public static void tearDown() throws IOException { SecretManagerServiceClient client = SecretManagerServiceClient.create(); SecretName secretName = SecretName.of(PROJECT_ID, SECRET_ID); client.deleteSecret(secretName); + if (gcpHsmGeneratedSecret != null) { + com.google.cloud.kms.v1.KeyManagementServiceClient kmsClient = + com.google.cloud.kms.v1.KeyManagementServiceClient.create(); + com.google.cloud.kms.v1.CryptoKeyName keyName = + com.google.cloud.kms.v1.CryptoKeyName.of(PROJECT_ID, "global", keyRingId, keyId); + for (com.google.cloud.kms.v1.CryptoKeyVersion version : + kmsClient.listCryptoKeyVersions(keyName).iterateAll()) { + if (version.getState() + == com.google.cloud.kms.v1.CryptoKeyVersion.CryptoKeyVersionState.ENABLED + || version.getState() + == com.google.cloud.kms.v1.CryptoKeyVersion.CryptoKeyVersionState.DISABLED) { + kmsClient.destroyCryptoKeyVersion(version.getName()); + } + } + } } @Test @@ -183,6 +233,43 @@ public void testGroupByKeyGcpSecretThrows() { assertThrows(RuntimeException.class, () -> p.run()); } + @Test + @Category(NeedsRunner.class) + public void testGroupByKeyGcpHsmGeneratedSecret() { + if (gcpHsmGeneratedSecret == null) { + return; + } + List> ungroupedPairs = + Arrays.asList( + KV.of(null, 3), + KV.of("k1", 3), + KV.of("k5", Integer.MAX_VALUE), + KV.of("k5", Integer.MIN_VALUE), + KV.of("k2", 66), + KV.of("k1", 4), + KV.of(null, 5), + KV.of("k2", -33), + KV.of("k3", 0)); + + PCollection> input = + p.apply( + Create.of(ungroupedPairs) + .withCoder(KvCoder.of(NullableCoder.of(StringUtf8Coder.of()), VarIntCoder.of()))); + + PCollection>> output = + input.apply(GroupByEncryptedKey.create(gcpHsmGeneratedSecret)); + + PAssert.that(output.apply("Sort", MapElements.via(new SortValues()))) + .containsInAnyOrder( + KV.of("k1", Arrays.asList(3, 4)), + KV.of(null, Arrays.asList(3, 5)), + KV.of("k5", Arrays.asList(Integer.MIN_VALUE, Integer.MAX_VALUE)), + KV.of("k2", Arrays.asList(-33, 66)), + KV.of("k3", Arrays.asList(0))); + + p.run(); + } + private static class SortValues extends SimpleFunction>, KV>> { @Override diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java index 1c8168a42a03..1a6bf15bebcf 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java @@ -17,6 +17,12 @@ */ package org.apache.beam.sdk.transforms; +import com.google.cloud.secretmanager.v1.ProjectName; +import com.google.cloud.kms.v1.CryptoKey; +import com.google.cloud.kms.v1.CryptoKeyName; +import com.google.cloud.kms.v1.CryptoKeyVersion; +import com.google.cloud.kms.v1.KeyManagementServiceClient; +import com.google.cloud.kms.v1.KeyRingName; import com.google.cloud.secretmanager.v1.ProjectName; import com.google.cloud.secretmanager.v1.SecretManagerServiceClient; import com.google.cloud.secretmanager.v1.SecretName; @@ -51,7 +57,10 @@ public class GroupByKeyIT { private static final String PROJECT_ID = "apache-beam-testing"; private static final String SECRET_ID = "gbek-test"; private static String gcpSecretVersionName; + private static String gcpHsmSecretOption; private static String secretId; + private static String keyRingId; + private static String keyId; @BeforeClass public static void setup() throws IOException { @@ -88,6 +97,31 @@ public static void setup() throws IOException { .build()); } gcpSecretVersionName = secretName.toString() + "/versions/latest"; + + try { + KeyManagementServiceClient kmsClient = KeyManagementServiceClient.create(); + String locationId = "global"; + keyRingId = "gbekit-key-ring-" + new SecureRandom().nextInt(10000); + KeyRingName keyRingName = KeyRingName.of(PROJECT_ID, locationId, keyRingId); + kmsClient.createKeyRing( + keyRingName.getProject(), + keyRingName.getLocation(), + keyRingId, + com.google.cloud.kms.v1.KeyRing.newBuilder().build()); + + keyId = "gbekit-key-" + new SecureRandom().nextInt(10000); + CryptoKey key = + CryptoKey.newBuilder() + .setPurpose(CryptoKey.CryptoKeyPurpose.ENCRYPT_DECRYPT) + .build(); + kmsClient.createCryptoKey(keyRingName, keyId, key); + gcpHsmSecretOption = + String.format( + "type:gcphsmgeneratedsecret;project_id:%s;location_id:%s;key_ring_id:%s;key_id:%s;job_name:%s", + PROJECT_ID, locationId, keyRingId, keyId, secretId); + } catch (Exception e) { + gcpHsmSecretOption = null; + } } @AfterClass @@ -97,6 +131,16 @@ public static void tearDown() throws IOException { SecretName secretName = SecretName.of(PROJECT_ID, secretId); client.deleteSecret(secretName); } + if (gcpHsmSecretOption != null) { + KeyManagementServiceClient kmsClient = KeyManagementServiceClient.create(); + CryptoKeyName keyName = CryptoKeyName.of(PROJECT_ID, "global", keyRingId, keyId); + for (CryptoKeyVersion version : kmsClient.listCryptoKeyVersions(keyName).iterateAll()) { + if (version.getState() == CryptoKeyVersion.CryptoKeyVersionState.ENABLED + || version.getState() == CryptoKeyVersion.CryptoKeyVersionState.DISABLED) { + kmsClient.destroyCryptoKeyVersion(version.getName()); + } + } + } } @Test @@ -135,6 +179,81 @@ public void testGroupByKeyWithValidGcpSecretOption() throws Exception { p.run(); } + @Test + public void testGroupByKeyWithValidGcpHsmGeneratedSecretOption() throws Exception { + if (gcpHsmSecretOption == null) { + // Skip test if we couldn't set up KMS + return; + } + PipelineOptions options = TestPipeline.testingPipelineOptions(); + options.setGbek(gcpHsmSecretOption); + Pipeline p = Pipeline.create(options); + List> ungroupedPairs = + Arrays.asList( + KV.of("k1", 3), + KV.of("k5", Integer.MAX_VALUE), + KV.of("k5", Integer.MIN_VALUE), + KV.of("k2", 66), + KV.of("k1", 4), + KV.of("k2", -33), + KV.of("k3", 0)); + + PCollection> input = + p.apply( + Create.of(ungroupedPairs) + .withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))); + + PCollection>> output = input.apply(GroupByKey.create()); + + PAssert.that(output) + .containsInAnyOrder( + KV.of("k1", Arrays.asList(3, 4)), + KV.of("k5", Arrays.asList(Integer.MAX_VALUE, Integer.MIN_VALUE)), + KV.of("k2", Arrays.asList(66, -33)), + KV.of("k3", Arrays.asList(0))); + + p.run(); + } + + @Test + public void testGroupByKeyWithExistingGcpHsmGeneratedSecretOption() throws Exception { + if (gcpHsmSecretOption == null) { + // Skip test if we couldn't set up KMS + return; + } + // Create the secret beforehand + new GcpHsmGeneratedSecret(PROJECT_ID, "global", keyRingId, keyId, secretId).getSecretBytes(); + + PipelineOptions options = TestPipeline.testingPipelineOptions(); + options.setGbek(gcpHsmSecretOption); + Pipeline p = Pipeline.create(options); + List> ungroupedPairs = + Arrays.asList( + KV.of("k1", 3), + KV.of("k5", Integer.MAX_VALUE), + KV.of("k5", Integer.MIN_VALUE), + KV.of("k2", 66), + KV.of("k1", 4), + KV.of("k2", -33), + KV.of("k3", 0)); + + PCollection> input = + p.apply( + Create.of(ungroupedPairs) + .withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))); + + PCollection>> output = input.apply(GroupByKey.create()); + + PAssert.that(output) + .containsInAnyOrder( + KV.of("k1", Arrays.asList(3, 4)), + KV.of("k5", Arrays.asList(Integer.MAX_VALUE, Integer.MIN_VALUE)), + KV.of("k2", Arrays.asList(66, -33)), + KV.of("k3", Arrays.asList(0))); + + p.run(); + } + @Test public void testGroupByKeyWithInvalidGcpSecretOption() throws Exception { if (gcpSecretVersionName == null) { From bb33098d1812e4f521061d73da207d60e9597236 Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Tue, 25 Nov 2025 09:02:38 -0500 Subject: [PATCH 03/18] fix deps --- .../beam/gradle/BeamModulePlugin.groovy | 2 ++ sdks/java/core/build.gradle | 2 ++ .../beam/sdk/util/GcpHsmGeneratedSecret.java | 5 ++-- .../java/org/apache/beam/sdk/util/Secret.java | 30 +++++++++++++++---- .../transforms/GroupByEncryptedKeyTest.java | 3 +- .../beam/sdk/transforms/GroupByKeyIT.java | 5 +--- 6 files changed, 33 insertions(+), 14 deletions(-) diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 3f299916db8c..1af833a9f080 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -753,6 +753,7 @@ class BeamModulePlugin implements Plugin { google_cloud_dataflow_java_proto_library_all: "com.google.cloud.dataflow:google-cloud-dataflow-java-proto-library-all:0.5.160304", google_cloud_datastore_v1_proto_client : "com.google.cloud.datastore:datastore-v1-proto-client:2.32.3", // [bomupgrader] sets version google_cloud_firestore : "com.google.cloud:google-cloud-firestore", // google_cloud_platform_libraries_bom sets version + google_cloud_kms : "com.google.cloud:google-cloud-kms", // google_cloud_platform_libraries_bom sets version google_cloud_pubsub : "com.google.cloud:google-cloud-pubsub", // google_cloud_platform_libraries_bom sets version google_cloud_pubsublite : "com.google.cloud:google-cloud-pubsublite", // google_cloud_platform_libraries_bom sets version // [bomupgrader] the BOM version is set by scripts/tools/bomupgrader.py. If update manually, also update @@ -763,6 +764,7 @@ class BeamModulePlugin implements Plugin { google_cloud_spanner_bom : "com.google.cloud:google-cloud-spanner-bom:$google_cloud_spanner_version", google_cloud_spanner : "com.google.cloud:google-cloud-spanner", // google_cloud_platform_libraries_bom sets version google_cloud_spanner_test : "com.google.cloud:google-cloud-spanner:$google_cloud_spanner_version:tests", + google_cloud_tink : "com.google.crypto.tink:tink:1.19.0", google_cloud_vertexai : "com.google.cloud:google-cloud-vertexai", // google_cloud_platform_libraries_bom sets version google_code_gson : "com.google.code.gson:gson:$google_code_gson_version", // google-http-client's version is explicitly declared for sdks/java/maven-archetypes/examples diff --git a/sdks/java/core/build.gradle b/sdks/java/core/build.gradle index 4a6d2f11973e..f8ddead054a3 100644 --- a/sdks/java/core/build.gradle +++ b/sdks/java/core/build.gradle @@ -100,6 +100,8 @@ dependencies { shadow library.java.snappy_java shadow library.java.joda_time implementation enforcedPlatform(library.java.google_cloud_platform_libraries_bom) + implementation library.java.google_cloud_kms + implementation library.java.google_cloud_tink implementation library.java.google_cloud_secret_manager implementation library.java.proto_google_cloud_secret_manager_v1 implementation library.java.protobuf_java diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpHsmGeneratedSecret.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpHsmGeneratedSecret.java index 6f1eea5baf6e..5a34dfc5e3e2 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpHsmGeneratedSecret.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpHsmGeneratedSecret.java @@ -25,9 +25,9 @@ import com.google.cloud.secretmanager.v1.AccessSecretVersionResponse; import com.google.cloud.secretmanager.v1.ProjectName; import com.google.cloud.secretmanager.v1.Replication; -import com.google.cloud.secretmanager.v1.SecretPayload; import com.google.cloud.secretmanager.v1.SecretManagerServiceClient; import com.google.cloud.secretmanager.v1.SecretName; +import com.google.cloud.secretmanager.v1.SecretPayload; import com.google.cloud.secretmanager.v1.SecretVersionName; import com.google.crypto.tink.subtle.Hkdf; import com.google.protobuf.ByteString; @@ -104,7 +104,8 @@ public byte[] getSecretBytes() { secretVersionName.toString()); } - SecretPayload payload = SecretPayload.newBuilder().setData(ByteString.copyFrom(newKey)).build(); + SecretPayload payload = + SecretPayload.newBuilder().setData(ByteString.copyFrom(newKey)).build(); client.addSecretVersion(secretName, payload); AccessSecretVersionResponse response = client.accessSecretVersion(secretVersionName); return response.getPayload().getData().toByteArray(); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Secret.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Secret.java index e33d98506c27..ccffee181504 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Secret.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Secret.java @@ -88,12 +88,30 @@ static Secret parseSecretOption(String secretOption) { paramName, gcpHsmGeneratedSecretParams)); } } - return new GcpHsmGeneratedSecret( - paramMap.get("project_id"), - paramMap.get("location_id"), - paramMap.get("key_ring_id"), - paramMap.get("key_id"), - paramMap.get("job_name")); + String projectId = paramMap.get("project_id"); + if (projectId == null) { + throw new RuntimeException( + "project_id must contain a valid value for projectId parameter"); + } + String locationId = paramMap.get("location_id"); + if (locationId == null) { + throw new RuntimeException( + "location_id must contain a valid value for locationId parameter"); + } + String keyRingId = paramMap.get("key_ring_id"); + if (keyRingId == null) { + throw new RuntimeException( + "key_ring_id must contain a valid value for keyRingId parameter"); + } + String keyId = paramMap.get("key_id"); + if (keyId == null) { + throw new RuntimeException("key_id must contain a valid value for keyId parameter"); + } + String jobName = paramMap.get("job_name"); + if (jobName == null) { + throw new RuntimeException("job_name must contain a valid value for jobName parameter"); + } + return new GcpHsmGeneratedSecret(projectId, locationId, keyRingId, keyId, jobName); default: throw new RuntimeException( String.format( diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java index d8f4b3e90b6c..809133fa783d 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java @@ -152,8 +152,7 @@ public static void setup() throws IOException { keyId = "gbek-test-key-" + System.currentTimeMillis(); com.google.cloud.kms.v1.CryptoKey key = com.google.cloud.kms.v1.CryptoKey.newBuilder() - .setPurpose( - com.google.cloud.kms.v1.CryptoKey.CryptoKeyPurpose.ENCRYPT_DECRYPT) + .setPurpose(com.google.cloud.kms.v1.CryptoKey.CryptoKeyPurpose.ENCRYPT_DECRYPT) .build(); kmsClient.createCryptoKey(keyRingName, keyId, key); gcpHsmGeneratedSecret = diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java index 1a6bf15bebcf..0dfa2f41982c 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java @@ -17,7 +17,6 @@ */ package org.apache.beam.sdk.transforms; -import com.google.cloud.secretmanager.v1.ProjectName; import com.google.cloud.kms.v1.CryptoKey; import com.google.cloud.kms.v1.CryptoKeyName; import com.google.cloud.kms.v1.CryptoKeyVersion; @@ -111,9 +110,7 @@ public static void setup() throws IOException { keyId = "gbekit-key-" + new SecureRandom().nextInt(10000); CryptoKey key = - CryptoKey.newBuilder() - .setPurpose(CryptoKey.CryptoKeyPurpose.ENCRYPT_DECRYPT) - .build(); + CryptoKey.newBuilder().setPurpose(CryptoKey.CryptoKeyPurpose.ENCRYPT_DECRYPT).build(); kmsClient.createCryptoKey(keyRingName, keyId, key); gcpHsmSecretOption = String.format( From e620b69d8ad4b0e03110dbf6390033ed9bf745be Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Tue, 25 Nov 2025 09:35:00 -0500 Subject: [PATCH 04/18] Imports --- .../beam/sdk/transforms/GroupByEncryptedKeyTest.java | 7 +++---- .../java/org/apache/beam/sdk/transforms/GroupByKeyIT.java | 8 ++++---- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java index 809133fa783d..9fcc259d327e 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java @@ -143,11 +143,10 @@ public static void setup() throws IOException { keyRingId = "gbek-test-key-ring-" + System.currentTimeMillis(); com.google.cloud.kms.v1.KeyRingName keyRingName = com.google.cloud.kms.v1.KeyRingName.of(PROJECT_ID, locationId, keyRingId); + com.google.cloud.kms.v1.LocationName locationName = + com.google.cloud.kms.v1.LocationName.of(PROJECT_ID, locationId); kmsClient.createKeyRing( - keyRingName.getProject(), - keyRingName.getLocation(), - keyRingId, - com.google.cloud.kms.v1.KeyRing.newBuilder().build()); + locationName, keyRingId, com.google.cloud.kms.v1.KeyRing.newBuilder().build()); keyId = "gbek-test-key-" + System.currentTimeMillis(); com.google.cloud.kms.v1.CryptoKey key = diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java index 0dfa2f41982c..1657d6630104 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java @@ -22,6 +22,7 @@ import com.google.cloud.kms.v1.CryptoKeyVersion; import com.google.cloud.kms.v1.KeyManagementServiceClient; import com.google.cloud.kms.v1.KeyRingName; +import com.google.cloud.kms.v1.LocationName; import com.google.cloud.secretmanager.v1.ProjectName; import com.google.cloud.secretmanager.v1.SecretManagerServiceClient; import com.google.cloud.secretmanager.v1.SecretName; @@ -38,6 +39,7 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.util.GcpHsmGeneratedSecret; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.junit.AfterClass; @@ -102,11 +104,9 @@ public static void setup() throws IOException { String locationId = "global"; keyRingId = "gbekit-key-ring-" + new SecureRandom().nextInt(10000); KeyRingName keyRingName = KeyRingName.of(PROJECT_ID, locationId, keyRingId); + LocationName locationName = LocationName.of(PROJECT_ID, locationId); kmsClient.createKeyRing( - keyRingName.getProject(), - keyRingName.getLocation(), - keyRingId, - com.google.cloud.kms.v1.KeyRing.newBuilder().build()); + locationName, keyRingId, com.google.cloud.kms.v1.KeyRing.newBuilder().build()); keyId = "gbekit-key-" + new SecureRandom().nextInt(10000); CryptoKey key = From 33eac039b2d31e1355bcb205aa39a340eeb8c7ff Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Tue, 25 Nov 2025 09:56:28 -0500 Subject: [PATCH 05/18] Secret parsing tests --- .../beam/sdk/util/GcpHsmGeneratedSecret.java | 20 +++++++++++++++++++ .../org/apache/beam/sdk/util/SecretTest.java | 18 ++++++++++++++--- 2 files changed, 35 insertions(+), 3 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpHsmGeneratedSecret.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpHsmGeneratedSecret.java index 5a34dfc5e3e2..eeaa3e536e61 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpHsmGeneratedSecret.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpHsmGeneratedSecret.java @@ -139,4 +139,24 @@ private byte[] generateDek() throws IOException, GeneralSecurityException { return Base64.getUrlEncoder().encode(dek); } } + + public String getProjectId() { + return projectId; + } + + public String getLocationId() { + return locationId; + } + + public String getKeyRingId() { + return keyRingId; + } + + public String getKeyId() { + return keyId; + } + + public String getSecretId() { + return secretId; + } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SecretTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SecretTest.java index dd4b125d73fe..0acfa3963462 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SecretTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SecretTest.java @@ -37,6 +37,20 @@ public void testParseSecretOptionWithValidGcpSecret() { assertEquals("my_secret/versions/latest", ((GcpSecret) secret).getVersionName()); } + @Test + public void testParseSecretOptionWithValidGcpHsmGeneratedSecret() { + String secretOption = + "type:gcphsmgeneratedsecret;project_id:my-project;location_id:global;key_ring_id:my-key-ring;key_id:my-key;job_name:my-job"; + Secret secret = Secret.parseSecretOption(secretOption); + assertTrue(secret instanceof GcpHsmGeneratedSecret); + GcpHsmGeneratedSecret hsmSecret = (GcpHsmGeneratedSecret) secret; + assertEquals("my-project", hsmSecret.getProjectId()); + assertEquals("global", hsmSecret.getLocationId()); + assertEquals("my-key-ring", hsmSecret.getKeyRingId()); + assertEquals("my-key", hsmSecret.getKeyId()); + assertEquals("HsmGeneratedSecret_my-job", hsmSecret.getSecretId()); + } + @Test public void testParseSecretOptionWithMissingType() { String secretOption = "version_name:my_secret/versions/latest"; @@ -50,9 +64,7 @@ public void testParseSecretOptionWithUnsupportedType() { String secretOption = "type:unsupported;version_name:my_secret/versions/latest"; Exception exception = assertThrows(RuntimeException.class, () -> Secret.parseSecretOption(secretOption)); - assertEquals( - "Invalid secret type unsupported, currently only GcpSecret is supported", - exception.getMessage()); + assertTrue(exception.getMessage().contains("Invalid secret type unsupported")); } @Test From 48c2e9c8cb1d0c3866a1f051339ad3ed96cc1662 Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Tue, 25 Nov 2025 10:03:27 -0500 Subject: [PATCH 06/18] docs --- .../beam/sdk/util/GcpHsmGeneratedSecret.java | 28 ++++++++++++++++++- 1 file changed, 27 insertions(+), 1 deletion(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpHsmGeneratedSecret.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpHsmGeneratedSecret.java index eeaa3e536e61..542b473db3c3 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpHsmGeneratedSecret.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpHsmGeneratedSecret.java @@ -39,7 +39,8 @@ import org.slf4j.LoggerFactory; /** - * A {@link Secret} manager implementation that retrieves secrets from Google Cloud Secret Manager. + * A {@link Secret} manager implementation that generates a secret using entropy from a GCP HSM key + * and stores it in Google Cloud Secret Manager. If the secret already exists, it will be retrieved. */ public class GcpHsmGeneratedSecret implements Secret { private static final Logger LOG = LoggerFactory.getLogger(GcpHsmGeneratedSecret.class); @@ -140,22 +141,47 @@ private byte[] generateDek() throws IOException, GeneralSecurityException { } } + /** + * Returns the project ID of the secret. + * + * @return The project ID as a String. + */ public String getProjectId() { return projectId; } + /** + * Returns the location ID of the secret. + * + * @return The location ID as a String. + */ public String getLocationId() { return locationId; } + /** + * Returns the key ring ID of the secret. + * + * @return The key ring ID as a String. + */ public String getKeyRingId() { return keyRingId; } + /** + * Returns the key ID of the secret. + * + * @return The key ID as a String. + */ public String getKeyId() { return keyId; } + /** + * Returns the secret ID of the secret. + * + * @return The secret ID as a String. + */ public String getSecretId() { return secretId; } From 2063095f19d631919abdf0a5b7dadcfb666d2d69 Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Tue, 25 Nov 2025 10:17:06 -0500 Subject: [PATCH 07/18] more docs --- sdks/python/apache_beam/transforms/util.py | 57 ++++++++++++++++++---- 1 file changed, 47 insertions(+), 10 deletions(-) diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py index 7e96a1093c1d..bda77eeb0d1d 100644 --- a/sdks/python/apache_beam/transforms/util.py +++ b/sdks/python/apache_beam/transforms/util.py @@ -420,8 +420,26 @@ def __eq__(self, secret): class GcpHsmGeneratedSecret(Secret): + """A secret manager implementation that generates a secret using a GCP HSM key + and stores it in Google Cloud Secret Manager. If the secret already exists, + it will be retrieved. + """ def __init__( - self, project_id: str, location_id: str, key_ring_id: str, key_id: str, job_name: str): + self, + project_id: str, + location_id: str, + key_ring_id: str, + key_id: str, + job_name: str): + """Initializes a GcpHsmGeneratedSecret object. + + Args: + project_id: The GCP project ID. + location_id: The GCP location ID for the HSM key. + key_ring_id: The ID of the KMS key ring. + key_id: The ID of the KMS key. + job_name: The name of the job, used to generate a unique secret name. + """ self._project_id = project_id self._location_id = location_id self._key_ring_id = key_ring_id @@ -429,6 +447,15 @@ def __init__( self._secret_version_name = f'HsmGeneratedSecret_{job_name}' def get_secret_bytes(self) -> bytes: + """Retrieves the secret bytes. + + If the secret version already exists in Secret Manager, it is retrieved. + Otherwise, a new secret and version are created. The new secret is + generated using the HSM key. + + Returns: + The secret as a byte string. + """ try: from google.cloud import secretmanager from google.api_core import exceptions as api_exceptions @@ -454,7 +481,11 @@ def get_secret_bytes(self) -> bytes: request={ "parent": project_path, "secret_id": self._secret_version_name, - "secret": {"replication": {"automatic": {}}}, + "secret": { + "replication": { + "automatic": {} + } + }, }) except api_exceptions.AlreadyExists: # Don't bother logging yet, we'll only log if we actually add the @@ -473,7 +504,11 @@ def get_secret_bytes(self) -> bytes: f"Secret version {secret_version_path} not found. " "Creating new secret and version.") client.add_secret_version( - request={"parent": secret_path, "payload": {"data": new_key}}) + request={ + "parent": secret_path, "payload": { + "data": new_key + } + }) response = client.access_secret_version( request={"name": secret_version_path}) return response.payload.data @@ -482,15 +517,18 @@ def get_secret_bytes(self) -> bytes: raise RuntimeError( f'Failed to retrieve or create secret bytes for secret ' f'{self._secret_version_name} with exception {e}') - + def generate_dek(self, dek_size: int = 32) -> bytes: """Generates a new Data Encryption Key (DEK) using an HSM-backed key. This function follows a key derivation process that incorporates entropy from the HSM-backed key into the nonce used for key derivation. + Args: + dek_size: The size of the DEK to generate. + Returns: - A new DEK of the specified size. + A new DEK of the specified size, url-safe base64-encoded. """ try: import base64 @@ -505,12 +543,11 @@ def generate_dek(self, dek_size: int = 32) -> bytes: # 2. Use the HSM-backed key to encrypt nonce_one to create nonce_two kms_client = kms.KeyManagementServiceClient() key_path = kms_client.crypto_key_path( - self._project_id, - self._location_id, - self._key_ring_id, - self._key_id) + self._project_id, self._location_id, self._key_ring_id, self._key_id) response = kms_client.encrypt( - request={'name': key_path, 'plaintext': nonce_one}) + request={ + 'name': key_path, 'plaintext': nonce_one + }) nonce_two = response.ciphertext # 3. Generate a Derivation Key (DK) From 85c7ef9a140f7c49cc0ec61734036120a75ddc53 Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Tue, 25 Nov 2025 10:54:42 -0500 Subject: [PATCH 08/18] formatting + test cleanup --- .../resources/beam/checkstyle/suppressions.xml | 1 + .../python/apache_beam/transforms/core_it_test.py | 15 +++++++-------- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml b/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml index 53cd7b7ad4d0..ef4cbdb5ba02 100644 --- a/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml +++ b/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml @@ -57,6 +57,7 @@ + diff --git a/sdks/python/apache_beam/transforms/core_it_test.py b/sdks/python/apache_beam/transforms/core_it_test.py index a4658d51a61f..f24e52e97c18 100644 --- a/sdks/python/apache_beam/transforms/core_it_test.py +++ b/sdks/python/apache_beam/transforms/core_it_test.py @@ -92,8 +92,7 @@ def setUpClass(cls): except Exception: cls.kms_client.create_key_ring( request={ - 'parent': - f'projects/{cls.project_id}/locations/{cls.location_id}', + 'parent': f'projects/{cls.project_id}/locations/{cls.location_id}', 'key_ring_id': cls.key_ring_id, }) cls.key_id = 'gbekit_key_tests_' + secret_postfix @@ -119,16 +118,16 @@ def setUpClass(cls): def tearDownClass(cls): if secretmanager is not None: cls.client.delete_secret(request={'name': cls.secret_path}) - if kms is not None and hasattr(cls, 'kms_client') and hasattr( - cls, 'key_path'): + if kms is not None and hasattr(cls, 'kms_client') and hasattr(cls, + 'key_path'): for version in cls.kms_client.list_crypto_key_versions( request={'parent': cls.key_path}): - if version.state in [ - kms.CryptoKeyVersion.CryptoKeyVersionState.ENABLED, - kms.CryptoKeyVersion.CryptoKeyVersionState.DISABLED - ]: + try: cls.kms_client.destroy_crypto_key_version( request={'name': version.name}) + except: + # Best effort deletion + pass @pytest.mark.it_postcommit @unittest.skipIf(secretmanager is None, 'GCP dependencies are not installed') From b4db1c6a3f6050a63771063f0b78a60a99578980 Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Tue, 25 Nov 2025 11:32:59 -0500 Subject: [PATCH 09/18] lint --- sdks/python/apache_beam/transforms/util.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py index bda77eeb0d1d..85d65894cbb5 100644 --- a/sdks/python/apache_beam/transforms/util.py +++ b/sdks/python/apache_beam/transforms/util.py @@ -362,7 +362,7 @@ def parse_secret_option(secret) -> 'Secret': secret_type = param_map['type'].lower() del param_map['type'] - secret_class = None + secret_class: Secret secret_params = None if secret_type == 'gcpsecret': secret_class = GcpSecret From de18f762f7efaf1633ebac514ceecc01ef5711c1 Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Tue, 25 Nov 2025 12:04:01 -0500 Subject: [PATCH 10/18] lint --- sdks/python/apache_beam/transforms/core_it_test.py | 5 +++-- sdks/python/apache_beam/transforms/util.py | 8 ++++---- sdks/python/apache_beam/transforms/util_test.py | 4 ++-- 3 files changed, 9 insertions(+), 8 deletions(-) diff --git a/sdks/python/apache_beam/transforms/core_it_test.py b/sdks/python/apache_beam/transforms/core_it_test.py index f24e52e97c18..8140c9697c75 100644 --- a/sdks/python/apache_beam/transforms/core_it_test.py +++ b/sdks/python/apache_beam/transforms/core_it_test.py @@ -90,9 +90,10 @@ def setUpClass(cls): try: cls.kms_client.get_key_ring(request={'name': cls.key_ring_path}) except Exception: + parent = f'projects/{cls.project_id}/locations/{cls.location_id}' cls.kms_client.create_key_ring( request={ - 'parent': f'projects/{cls.project_id}/locations/{cls.location_id}', + 'parent': parent, 'key_ring_id': cls.key_ring_id, }) cls.key_id = 'gbekit_key_tests_' + secret_postfix @@ -125,7 +126,7 @@ def tearDownClass(cls): try: cls.kms_client.destroy_crypto_key_version( request={'name': version.name}) - except: + except Exception: # Best effort deletion pass diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py index 85d65894cbb5..38cb99955b2a 100644 --- a/sdks/python/apache_beam/transforms/util.py +++ b/sdks/python/apache_beam/transforms/util.py @@ -30,7 +30,6 @@ import threading import time import uuid -from datetime import datetime from collections.abc import Callable from collections.abc import Iterable from typing import TYPE_CHECKING @@ -362,7 +361,7 @@ def parse_secret_option(secret) -> 'Secret': secret_type = param_map['type'].lower() del param_map['type'] - secret_class: Secret + secret_class = Secret secret_params = None if secret_type == 'gcpsecret': secret_class = GcpSecret @@ -501,8 +500,9 @@ def get_secret_bytes(self) -> bytes: return response.payload.data except api_exceptions.NotFound: logging.info( - f"Secret version {secret_version_path} not found. " - "Creating new secret and version.") + "Secret version %s not found. " + "Creating new secret and version.", + secret_version_path) client.add_secret_version( request={ "parent": secret_path, "payload": { diff --git a/sdks/python/apache_beam/transforms/util_test.py b/sdks/python/apache_beam/transforms/util_test.py index a6270e9f6a0b..b154aaea840e 100644 --- a/sdks/python/apache_beam/transforms/util_test.py +++ b/sdks/python/apache_beam/transforms/util_test.py @@ -495,8 +495,8 @@ def test_happy_path_secret_creation(self): # Assertions on mocks secret_version_path = ( - f'projects/{project_id}/secrets/{secret._secret_version_name}/versions/1' - ) + f'projects/{project_id}/secrets/{secret._secret_version_name}' + '/versions/1') self.mock_secret_manager_client.access_secret_version.assert_any_call( request={'name': secret_version_path}) self.assertEqual( From 68e0903d4cedfef1c99be8b8558506f3fb524af5 Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Tue, 25 Nov 2025 12:36:28 -0500 Subject: [PATCH 11/18] lint --- sdks/python/apache_beam/transforms/util.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py index 38cb99955b2a..9c68b3081113 100644 --- a/sdks/python/apache_beam/transforms/util.py +++ b/sdks/python/apache_beam/transforms/util.py @@ -364,10 +364,10 @@ def parse_secret_option(secret) -> 'Secret': secret_class = Secret secret_params = None if secret_type == 'gcpsecret': - secret_class = GcpSecret + secret_class = GcpSecret # pylint: disable=assignment secret_params = ['version_name'] elif secret_type == 'gcphsmgeneratedsecret': - secret_class = GcpHsmGeneratedSecret + secret_class = GcpHsmGeneratedSecret # pylint: disable=assignment secret_params = [ 'project_id', 'location_id', 'key_ring_id', 'key_id', 'job_name' ] From ba425c0f231b6864f885020974d45fc0ae2b006d Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Tue, 25 Nov 2025 13:00:05 -0500 Subject: [PATCH 12/18] lint --- sdks/python/apache_beam/transforms/util.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py index 9c68b3081113..e438ed86ab9b 100644 --- a/sdks/python/apache_beam/transforms/util.py +++ b/sdks/python/apache_beam/transforms/util.py @@ -364,10 +364,10 @@ def parse_secret_option(secret) -> 'Secret': secret_class = Secret secret_params = None if secret_type == 'gcpsecret': - secret_class = GcpSecret # pylint: disable=assignment + secret_class = GcpSecret # type: ignore[assignment] secret_params = ['version_name'] elif secret_type == 'gcphsmgeneratedsecret': - secret_class = GcpHsmGeneratedSecret # pylint: disable=assignment + secret_class = GcpHsmGeneratedSecret # type: ignore[assignment] secret_params = [ 'project_id', 'location_id', 'key_ring_id', 'key_id', 'job_name' ] From 8c1f4299719f5dd8b12beb2d08d57948dae6b7cc Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Tue, 25 Nov 2025 13:32:53 -0500 Subject: [PATCH 13/18] import order --- sdks/python/apache_beam/transforms/util.py | 9 +++++---- sdks/python/apache_beam/transforms/util_test.py | 2 +- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py index e438ed86ab9b..fbaab6b4ebbb 100644 --- a/sdks/python/apache_beam/transforms/util.py +++ b/sdks/python/apache_beam/transforms/util.py @@ -364,10 +364,10 @@ def parse_secret_option(secret) -> 'Secret': secret_class = Secret secret_params = None if secret_type == 'gcpsecret': - secret_class = GcpSecret # type: ignore[assignment] + secret_class = GcpSecret # type: ignore[assignment] secret_params = ['version_name'] elif secret_type == 'gcphsmgeneratedsecret': - secret_class = GcpHsmGeneratedSecret # type: ignore[assignment] + secret_class = GcpHsmGeneratedSecret # type: ignore[assignment] secret_params = [ 'project_id', 'location_id', 'key_ring_id', 'key_id', 'job_name' ] @@ -456,8 +456,8 @@ def get_secret_bytes(self) -> bytes: The secret as a byte string. """ try: - from google.cloud import secretmanager from google.api_core import exceptions as api_exceptions + from google.cloud import secretmanager client = secretmanager.SecretManagerServiceClient() project_path = f"projects/{self._project_id}" @@ -533,8 +533,9 @@ def generate_dek(self, dek_size: int = 32) -> bytes: try: import base64 import os - from cryptography.hazmat.primitives.kdf.hkdf import HKDF + from cryptography.hazmat.primitives import hashes + from cryptography.hazmat.primitives.kdf.hkdf import HKDF from google.cloud import kms # 1. Generate a random nonce (nonce_one) diff --git a/sdks/python/apache_beam/transforms/util_test.py b/sdks/python/apache_beam/transforms/util_test.py index b154aaea840e..dd5e19519faf 100644 --- a/sdks/python/apache_beam/transforms/util_test.py +++ b/sdks/python/apache_beam/transforms/util_test.py @@ -72,8 +72,8 @@ from apache_beam.transforms.core import FlatMapTuple from apache_beam.transforms.trigger import AfterCount from apache_beam.transforms.trigger import Repeatedly -from apache_beam.transforms.util import GcpSecret from apache_beam.transforms.util import GcpHsmGeneratedSecret +from apache_beam.transforms.util import GcpSecret from apache_beam.transforms.util import Secret from apache_beam.transforms.window import FixedWindows from apache_beam.transforms.window import GlobalWindow From e005dc9feb2c90f9c1e1caeaf11c81b6e84c6023 Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Tue, 25 Nov 2025 16:21:58 -0500 Subject: [PATCH 14/18] Deps + style exemption --- .../org/apache/beam/gradle/BeamModulePlugin.groovy | 1 + sdks/java/core/build.gradle | 4 ++++ .../apache/beam/sdk/util/GcpHsmGeneratedSecret.java | 10 +++++++--- 3 files changed, 12 insertions(+), 3 deletions(-) diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 1af833a9f080..f657362275b2 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -864,6 +864,7 @@ class BeamModulePlugin implements Plugin { proto_google_cloud_datacatalog_v1beta1 : "com.google.api.grpc:proto-google-cloud-datacatalog-v1beta1", // google_cloud_platform_libraries_bom sets version proto_google_cloud_datastore_v1 : "com.google.api.grpc:proto-google-cloud-datastore-v1", // google_cloud_platform_libraries_bom sets version proto_google_cloud_firestore_v1 : "com.google.api.grpc:proto-google-cloud-firestore-v1", // google_cloud_platform_libraries_bom sets version + proto_google_cloud_kms_v1 : "com.google.api.grpc:proto-google-cloud-kms-v1", // google_cloud_platform_libraries_bom sets version proto_google_cloud_pubsub_v1 : "com.google.api.grpc:proto-google-cloud-pubsub-v1", // google_cloud_platform_libraries_bom sets version proto_google_cloud_pubsublite_v1 : "com.google.api.grpc:proto-google-cloud-pubsublite-v1", // google_cloud_platform_libraries_bom sets version proto_google_cloud_secret_manager_v1 : "com.google.api.grpc:proto-google-cloud-secretmanager-v1", // google_cloud_platform_libraries_bom sets version diff --git a/sdks/java/core/build.gradle b/sdks/java/core/build.gradle index f8ddead054a3..003ef1958f28 100644 --- a/sdks/java/core/build.gradle +++ b/sdks/java/core/build.gradle @@ -100,7 +100,9 @@ dependencies { shadow library.java.snappy_java shadow library.java.joda_time implementation enforcedPlatform(library.java.google_cloud_platform_libraries_bom) + implementation library.java.gax implementation library.java.google_cloud_kms + implementation library.java.proto_google_cloud_kms_v1 implementation library.java.google_cloud_tink implementation library.java.google_cloud_secret_manager implementation library.java.proto_google_cloud_secret_manager_v1 @@ -130,6 +132,8 @@ dependencies { shadowTest library.java.log4j2_api shadowTest library.java.jamm shadowTest 'com.google.cloud:google-cloud-secretmanager:2.75.0' + shadowTest 'com.google.cloud:google-cloud-kms:2.75.0' + shadowTest 'com.google.crypto.tink:tink:1.19.0' testRuntimeOnly library.java.slf4j_jdk14 } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpHsmGeneratedSecret.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpHsmGeneratedSecret.java index 542b473db3c3..d4296f0130d1 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpHsmGeneratedSecret.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpHsmGeneratedSecret.java @@ -31,6 +31,7 @@ import com.google.cloud.secretmanager.v1.SecretVersionName; import com.google.crypto.tink.subtle.Hkdf; import com.google.protobuf.ByteString; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.io.IOException; import java.security.GeneralSecurityException; import java.security.SecureRandom; @@ -39,8 +40,9 @@ import org.slf4j.LoggerFactory; /** - * A {@link Secret} manager implementation that generates a secret using entropy from a GCP HSM key - * and stores it in Google Cloud Secret Manager. If the secret already exists, it will be retrieved. + * A {@link org.apache.beam.sdk.util.Secret} manager implementation that generates a secret using + * entropy from a GCP HSM key and stores it in Google Cloud Secret Manager. If the secret already + * exists, it will be retrieved. */ public class GcpHsmGeneratedSecret implements Secret { private static final Logger LOG = LoggerFactory.getLogger(GcpHsmGeneratedSecret.class); @@ -116,10 +118,12 @@ public byte[] getSecretBytes() { } } + @SuppressFBWarnings("DMI_RANDOM_USED_ONLY_ONCE") // intended, used for non-random nonceOne private byte[] generateDek() throws IOException, GeneralSecurityException { int dekSize = 32; try (KeyManagementServiceClient client = KeyManagementServiceClient.create()) { - // 1. Generate nonce_one + // 1. Generate nonce_one. This doesn't need to have baked in randomness since the + // actual randomness comes from KMS. SecureRandom random = new SecureRandom(); byte[] nonceOne = new byte[dekSize]; random.nextBytes(nonceOne); From cde99a936525a890306d395e8ce841899696b059 Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Wed, 26 Nov 2025 16:07:50 -0500 Subject: [PATCH 15/18] reuse key: --- .../transforms/GroupByEncryptedKeyTest.java | 51 ++++++++----------- .../beam/sdk/transforms/GroupByKeyIT.java | 48 ++++++++--------- 2 files changed, 44 insertions(+), 55 deletions(-) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java index 9fcc259d327e..1fa68021fd5d 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java @@ -104,8 +104,8 @@ public void testGroupByKeyFakeSecret() { private static final String SECRET_ID = "gbek-test"; private static Secret gcpSecret; private static Secret gcpHsmGeneratedSecret; - private static String keyRingId; - private static String keyId; + private static final String KEY_RING_ID = "gbek-test-key-ring"; + private static final String KEY_ID = "gbek-test-key"; @BeforeClass public static void setup() throws IOException { @@ -140,26 +140,34 @@ public static void setup() throws IOException { com.google.cloud.kms.v1.KeyManagementServiceClient kmsClient = com.google.cloud.kms.v1.KeyManagementServiceClient.create(); String locationId = "global"; - keyRingId = "gbek-test-key-ring-" + System.currentTimeMillis(); com.google.cloud.kms.v1.KeyRingName keyRingName = - com.google.cloud.kms.v1.KeyRingName.of(PROJECT_ID, locationId, keyRingId); + com.google.cloud.kms.v1.KeyRingName.of(PROJECT_ID, locationId, KEY_RING_ID); com.google.cloud.kms.v1.LocationName locationName = com.google.cloud.kms.v1.LocationName.of(PROJECT_ID, locationId); - kmsClient.createKeyRing( - locationName, keyRingId, com.google.cloud.kms.v1.KeyRing.newBuilder().build()); + try { + kmsClient.getKeyRing(keyRingName); + } catch (Exception e) { + kmsClient.createKeyRing( + locationName, KEY_RING_ID, com.google.cloud.kms.v1.KeyRing.newBuilder().build()); + } - keyId = "gbek-test-key-" + System.currentTimeMillis(); - com.google.cloud.kms.v1.CryptoKey key = - com.google.cloud.kms.v1.CryptoKey.newBuilder() - .setPurpose(com.google.cloud.kms.v1.CryptoKey.CryptoKeyPurpose.ENCRYPT_DECRYPT) - .build(); - kmsClient.createCryptoKey(keyRingName, keyId, key); + com.google.cloud.kms.v1.CryptoKeyName keyName = + com.google.cloud.kms.v1.CryptoKeyName.of(PROJECT_ID, locationId, KEY_RING_ID, KEY_ID); + try { + kmsClient.getCryptoKey(keyName); + } catch (Exception e) { + com.google.cloud.kms.v1.CryptoKey key = + com.google.cloud.kms.v1.CryptoKey.newBuilder() + .setPurpose(com.google.cloud.kms.v1.CryptoKey.CryptoKeyPurpose.ENCRYPT_DECRYPT) + .build(); + kmsClient.createCryptoKey(keyRingName, KEY_ID, key); + } gcpHsmGeneratedSecret = new GcpHsmGeneratedSecret( PROJECT_ID, locationId, - keyRingId, - keyId, + KEY_RING_ID, + KEY_ID, String.format("gbek-test-job-%d", new SecureRandom().nextInt(10000))); } catch (Exception e) { gcpHsmGeneratedSecret = null; @@ -171,21 +179,6 @@ public static void tearDown() throws IOException { SecretManagerServiceClient client = SecretManagerServiceClient.create(); SecretName secretName = SecretName.of(PROJECT_ID, SECRET_ID); client.deleteSecret(secretName); - if (gcpHsmGeneratedSecret != null) { - com.google.cloud.kms.v1.KeyManagementServiceClient kmsClient = - com.google.cloud.kms.v1.KeyManagementServiceClient.create(); - com.google.cloud.kms.v1.CryptoKeyName keyName = - com.google.cloud.kms.v1.CryptoKeyName.of(PROJECT_ID, "global", keyRingId, keyId); - for (com.google.cloud.kms.v1.CryptoKeyVersion version : - kmsClient.listCryptoKeyVersions(keyName).iterateAll()) { - if (version.getState() - == com.google.cloud.kms.v1.CryptoKeyVersion.CryptoKeyVersionState.ENABLED - || version.getState() - == com.google.cloud.kms.v1.CryptoKeyVersion.CryptoKeyVersionState.DISABLED) { - kmsClient.destroyCryptoKeyVersion(version.getName()); - } - } - } } @Test diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java index 1657d6630104..4173756f643f 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java @@ -19,10 +19,8 @@ import com.google.cloud.kms.v1.CryptoKey; import com.google.cloud.kms.v1.CryptoKeyName; -import com.google.cloud.kms.v1.CryptoKeyVersion; import com.google.cloud.kms.v1.KeyManagementServiceClient; import com.google.cloud.kms.v1.KeyRingName; -import com.google.cloud.kms.v1.LocationName; import com.google.cloud.secretmanager.v1.ProjectName; import com.google.cloud.secretmanager.v1.SecretManagerServiceClient; import com.google.cloud.secretmanager.v1.SecretName; @@ -60,8 +58,8 @@ public class GroupByKeyIT { private static String gcpSecretVersionName; private static String gcpHsmSecretOption; private static String secretId; - private static String keyRingId; - private static String keyId; + private static final String KEY_RING_ID = "gbek-it-key-ring"; + private static final String KEY_ID = "gbek-it-key"; @BeforeClass public static void setup() throws IOException { @@ -102,20 +100,28 @@ public static void setup() throws IOException { try { KeyManagementServiceClient kmsClient = KeyManagementServiceClient.create(); String locationId = "global"; - keyRingId = "gbekit-key-ring-" + new SecureRandom().nextInt(10000); - KeyRingName keyRingName = KeyRingName.of(PROJECT_ID, locationId, keyRingId); - LocationName locationName = LocationName.of(PROJECT_ID, locationId); - kmsClient.createKeyRing( - locationName, keyRingId, com.google.cloud.kms.v1.KeyRing.newBuilder().build()); - - keyId = "gbekit-key-" + new SecureRandom().nextInt(10000); - CryptoKey key = - CryptoKey.newBuilder().setPurpose(CryptoKey.CryptoKeyPurpose.ENCRYPT_DECRYPT).build(); - kmsClient.createCryptoKey(keyRingName, keyId, key); + KeyRingName keyRingName = KeyRingName.of(PROJECT_ID, locationId, KEY_RING_ID); + com.google.cloud.kms.v1.LocationName locationName = + com.google.cloud.kms.v1.LocationName.of(PROJECT_ID, locationId); + try { + kmsClient.getKeyRing(keyRingName); + } catch (Exception e) { + kmsClient.createKeyRing( + locationName, KEY_RING_ID, com.google.cloud.kms.v1.KeyRing.newBuilder().build()); + } + + CryptoKeyName keyName = CryptoKeyName.of(PROJECT_ID, locationId, KEY_RING_ID, KEY_ID); + try { + kmsClient.getCryptoKey(keyName); + } catch (Exception e) { + CryptoKey key = + CryptoKey.newBuilder().setPurpose(CryptoKey.CryptoKeyPurpose.ENCRYPT_DECRYPT).build(); + kmsClient.createCryptoKey(keyRingName, KEY_ID, key); + } gcpHsmSecretOption = String.format( "type:gcphsmgeneratedsecret;project_id:%s;location_id:%s;key_ring_id:%s;key_id:%s;job_name:%s", - PROJECT_ID, locationId, keyRingId, keyId, secretId); + PROJECT_ID, locationId, KEY_RING_ID, KEY_ID, secretId); } catch (Exception e) { gcpHsmSecretOption = null; } @@ -128,16 +134,6 @@ public static void tearDown() throws IOException { SecretName secretName = SecretName.of(PROJECT_ID, secretId); client.deleteSecret(secretName); } - if (gcpHsmSecretOption != null) { - KeyManagementServiceClient kmsClient = KeyManagementServiceClient.create(); - CryptoKeyName keyName = CryptoKeyName.of(PROJECT_ID, "global", keyRingId, keyId); - for (CryptoKeyVersion version : kmsClient.listCryptoKeyVersions(keyName).iterateAll()) { - if (version.getState() == CryptoKeyVersion.CryptoKeyVersionState.ENABLED - || version.getState() == CryptoKeyVersion.CryptoKeyVersionState.DISABLED) { - kmsClient.destroyCryptoKeyVersion(version.getName()); - } - } - } } @Test @@ -219,7 +215,7 @@ public void testGroupByKeyWithExistingGcpHsmGeneratedSecretOption() throws Excep return; } // Create the secret beforehand - new GcpHsmGeneratedSecret(PROJECT_ID, "global", keyRingId, keyId, secretId).getSecretBytes(); + new GcpHsmGeneratedSecret(PROJECT_ID, "global", KEY_RING_ID, KEY_ID, secretId).getSecretBytes(); PipelineOptions options = TestPipeline.testingPipelineOptions(); options.setGbek(gcpHsmSecretOption); From 4eb9f8ece1f1097d818b4faffb53e822e08cee51 Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Wed, 26 Nov 2025 16:26:07 -0500 Subject: [PATCH 16/18] reuse key --- sdks/python/apache_beam/transforms/core_it_test.py | 14 ++------------ 1 file changed, 2 insertions(+), 12 deletions(-) diff --git a/sdks/python/apache_beam/transforms/core_it_test.py b/sdks/python/apache_beam/transforms/core_it_test.py index 8140c9697c75..2cdb770b5972 100644 --- a/sdks/python/apache_beam/transforms/core_it_test.py +++ b/sdks/python/apache_beam/transforms/core_it_test.py @@ -84,7 +84,7 @@ def setUpClass(cls): cls.location_id = 'global' py_version = f'_py{sys.version_info.major}{sys.version_info.minor}' secret_postfix = datetime.now().strftime('%m%d_%H%M%S') + py_version - cls.key_ring_id = 'gbekit_key_ring_tests_' + secret_postfix + cls.key_ring_id = 'gbekit_key_ring_tests' cls.key_ring_path = cls.kms_client.key_ring_path( cls.project_id, cls.location_id, cls.key_ring_id) try: @@ -96,7 +96,7 @@ def setUpClass(cls): 'parent': parent, 'key_ring_id': cls.key_ring_id, }) - cls.key_id = 'gbekit_key_tests_' + secret_postfix + cls.key_id = 'gbekit_key_tests' cls.key_path = cls.kms_client.crypto_key_path( cls.project_id, cls.location_id, cls.key_ring_id, cls.key_id) try: @@ -119,16 +119,6 @@ def setUpClass(cls): def tearDownClass(cls): if secretmanager is not None: cls.client.delete_secret(request={'name': cls.secret_path}) - if kms is not None and hasattr(cls, 'kms_client') and hasattr(cls, - 'key_path'): - for version in cls.kms_client.list_crypto_key_versions( - request={'parent': cls.key_path}): - try: - cls.kms_client.destroy_crypto_key_version( - request={'name': version.name}) - except Exception: - # Best effort deletion - pass @pytest.mark.it_postcommit @unittest.skipIf(secretmanager is None, 'GCP dependencies are not installed') From 8ce227ffc8649f569877760a05d624ce91ada3fb Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Tue, 2 Dec 2025 10:05:28 -0500 Subject: [PATCH 17/18] Feedback --- .../trigger_files/beam_PostCommit_Python.json | 2 +- .../beam/sdk/util/GcpHsmGeneratedSecret.java | 7 ++- .../java/org/apache/beam/sdk/util/Secret.java | 52 +++++++++---------- .../beam/sdk/transforms/GroupByKeyIT.java | 10 ++-- 4 files changed, 33 insertions(+), 38 deletions(-) diff --git a/.github/trigger_files/beam_PostCommit_Python.json b/.github/trigger_files/beam_PostCommit_Python.json index ed56f65ef50f..06bd728be6d7 100644 --- a/.github/trigger_files/beam_PostCommit_Python.json +++ b/.github/trigger_files/beam_PostCommit_Python.json @@ -1,5 +1,5 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run.", - "modification": 32 + "modification": 33 } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpHsmGeneratedSecret.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpHsmGeneratedSecret.java index d4296f0130d1..493330ad5561 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpHsmGeneratedSecret.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpHsmGeneratedSecret.java @@ -31,7 +31,6 @@ import com.google.cloud.secretmanager.v1.SecretVersionName; import com.google.crypto.tink.subtle.Hkdf; import com.google.protobuf.ByteString; -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.io.IOException; import java.security.GeneralSecurityException; import java.security.SecureRandom; @@ -52,6 +51,8 @@ public class GcpHsmGeneratedSecret implements Secret { private final String keyId; private final String secretId; + private final SecureRandom random = new SecureRandom(); + public GcpHsmGeneratedSecret( String projectId, String locationId, String keyRingId, String keyId, String jobName) { this.projectId = projectId; @@ -98,7 +99,7 @@ public byte[] getSecretBytes() { byte[] newKey = generateDek(); try { - // Try to access again in case another thread created it. + // Always retrieve remote secret as source-of-truth in case another thread created it AccessSecretVersionResponse response = client.accessSecretVersion(secretVersionName); return response.getPayload().getData().toByteArray(); } catch (NotFoundException e) { @@ -118,13 +119,11 @@ public byte[] getSecretBytes() { } } - @SuppressFBWarnings("DMI_RANDOM_USED_ONLY_ONCE") // intended, used for non-random nonceOne private byte[] generateDek() throws IOException, GeneralSecurityException { int dekSize = 32; try (KeyManagementServiceClient client = KeyManagementServiceClient.create()) { // 1. Generate nonce_one. This doesn't need to have baked in randomness since the // actual randomness comes from KMS. - SecureRandom random = new SecureRandom(); byte[] nonceOne = new byte[dekSize]; random.nextBytes(nonceOne); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Secret.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Secret.java index ccffee181504..f8efde0dd44c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Secret.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Secret.java @@ -23,6 +23,7 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; /** * A secret management interface used for handling sensitive data. @@ -70,11 +71,10 @@ static Secret parseSecretOption(String secretOption) { paramName, gcpSecretParams)); } } - String versionName = paramMap.get("version_name"); - if (versionName == null) { - throw new RuntimeException( - "version_name must contain a valid value for versionName parameter"); - } + String versionName = + Preconditions.checkNotNull( + paramMap.get("version_name"), + "version_name must contain a valid value for versionName parameter"); return new GcpSecret(versionName); case "gcphsmgeneratedsecret": Set gcpHsmGeneratedSecretParams = @@ -88,29 +88,25 @@ static Secret parseSecretOption(String secretOption) { paramName, gcpHsmGeneratedSecretParams)); } } - String projectId = paramMap.get("project_id"); - if (projectId == null) { - throw new RuntimeException( - "project_id must contain a valid value for projectId parameter"); - } - String locationId = paramMap.get("location_id"); - if (locationId == null) { - throw new RuntimeException( - "location_id must contain a valid value for locationId parameter"); - } - String keyRingId = paramMap.get("key_ring_id"); - if (keyRingId == null) { - throw new RuntimeException( - "key_ring_id must contain a valid value for keyRingId parameter"); - } - String keyId = paramMap.get("key_id"); - if (keyId == null) { - throw new RuntimeException("key_id must contain a valid value for keyId parameter"); - } - String jobName = paramMap.get("job_name"); - if (jobName == null) { - throw new RuntimeException("job_name must contain a valid value for jobName parameter"); - } + String projectId = + Preconditions.checkNotNull( + paramMap.get("project_id"), + "project_id must contain a valid value for projectId parameter"); + String locationId = + Preconditions.checkNotNull( + paramMap.get("location_id"), + "location_id must contain a valid value for locationId parameter"); + String keyRingId = + Preconditions.checkNotNull( + paramMap.get("key_ring_id"), + "key_ring_id must contain a valid value for keyRingId parameter"); + String keyId = + Preconditions.checkNotNull( + paramMap.get("key_id"), "key_id must contain a valid value for keyId parameter"); + String jobName = + Preconditions.checkNotNull( + paramMap.get("job_name"), + "job_name must contain a valid value for jobName parameter"); return new GcpHsmGeneratedSecret(projectId, locationId, keyRingId, keyId, jobName); default: throw new RuntimeException( diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java index 4173756f643f..431bdf448bea 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java @@ -60,6 +60,7 @@ public class GroupByKeyIT { private static String secretId; private static final String KEY_RING_ID = "gbek-it-key-ring"; private static final String KEY_ID = "gbek-it-key"; + private static final String LOCATION_ID = "global"; @BeforeClass public static void setup() throws IOException { @@ -99,10 +100,9 @@ public static void setup() throws IOException { try { KeyManagementServiceClient kmsClient = KeyManagementServiceClient.create(); - String locationId = "global"; - KeyRingName keyRingName = KeyRingName.of(PROJECT_ID, locationId, KEY_RING_ID); + KeyRingName keyRingName = KeyRingName.of(PROJECT_ID, LOCATION_ID, KEY_RING_ID); com.google.cloud.kms.v1.LocationName locationName = - com.google.cloud.kms.v1.LocationName.of(PROJECT_ID, locationId); + com.google.cloud.kms.v1.LocationName.of(PROJECT_ID, LOCATION_ID); try { kmsClient.getKeyRing(keyRingName); } catch (Exception e) { @@ -110,7 +110,7 @@ public static void setup() throws IOException { locationName, KEY_RING_ID, com.google.cloud.kms.v1.KeyRing.newBuilder().build()); } - CryptoKeyName keyName = CryptoKeyName.of(PROJECT_ID, locationId, KEY_RING_ID, KEY_ID); + CryptoKeyName keyName = CryptoKeyName.of(PROJECT_ID, LOCATION_ID, KEY_RING_ID, KEY_ID); try { kmsClient.getCryptoKey(keyName); } catch (Exception e) { @@ -121,7 +121,7 @@ public static void setup() throws IOException { gcpHsmSecretOption = String.format( "type:gcphsmgeneratedsecret;project_id:%s;location_id:%s;key_ring_id:%s;key_id:%s;job_name:%s", - PROJECT_ID, locationId, KEY_RING_ID, KEY_ID, secretId); + PROJECT_ID, LOCATION_ID, KEY_RING_ID, KEY_ID, secretId); } catch (Exception e) { gcpHsmSecretOption = null; } From 814097617ac9d2cde9ac195d6ef596d0017ea8a8 Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Tue, 2 Dec 2025 15:48:08 -0500 Subject: [PATCH 18/18] Test fixes --- .../org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java index 1fa68021fd5d..77195533ace3 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java @@ -169,6 +169,8 @@ public static void setup() throws IOException { KEY_RING_ID, KEY_ID, String.format("gbek-test-job-%d", new SecureRandom().nextInt(10000))); + // Validate we have crypto permissions or skip these tests. + gcpHsmGeneratedSecret.getSecretBytes(); } catch (Exception e) { gcpHsmGeneratedSecret = null; }