From 5f73ac6cd6603159d69dc731e43daeefdb6ccae8 Mon Sep 17 00:00:00 2001 From: Amar3tto Date: Wed, 26 Nov 2025 16:10:50 +0000 Subject: [PATCH 01/11] Set Dataflow container to release version. --- runners/google-cloud-dataflow-java/build.gradle | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle index 9f064f2432bc..a18a61e505d4 100644 --- a/runners/google-cloud-dataflow-java/build.gradle +++ b/runners/google-cloud-dataflow-java/build.gradle @@ -52,8 +52,8 @@ evaluationDependsOn(":sdks:java:container:java11") ext.dataflowLegacyEnvironmentMajorVersion = '8' ext.dataflowFnapiEnvironmentMajorVersion = '8' -ext.dataflowLegacyContainerVersion = 'beam-master-20251107' -ext.dataflowFnapiContainerVersion = 'beam-master-20251107' +ext.dataflowLegacyContainerVersion = '2.70.0' +ext.dataflowFnapiContainerVersion = '2.70.0' ext.dataflowContainerBaseRepository = 'gcr.io/cloud-dataflow/v1beta3' processResources { From e075a50101ace82386cb20637721e310c29f53f7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rados=C5=82aw=20Stankiewicz?= Date: Wed, 26 Nov 2025 21:43:15 +0100 Subject: [PATCH 02/11] fix retry scenario for query to table materialization (#36912) (#36916) * fix retry scenario for query to table materialization * fix retry scenario for query to table materialization --- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 2 ++ .../bigquery/BigQueryStorageQuerySource.java | 22 +++++++++++++++++++ 2 files changed, 24 insertions(+) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 7aef1bd1ce02..69b9c62ceea9 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -979,6 +979,8 @@ public void processElement( getParseFn(), getOutputCoder(), getBigQueryServices()); + // due to retry, table may already exist, remove it to ensure correctness + querySource.removeDestinationIfExists(options.as(BigQueryOptions.class)); Table queryResultTable = querySource.getTargetTable(options.as(BigQueryOptions.class)); BigQueryStorageTableSource output = diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageQuerySource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageQuerySource.java index a2350ef19a74..07c3273c293c 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageQuerySource.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageQuerySource.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.io.gcp.bigquery; +import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryResourceNaming.createTempTableReference; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; import com.google.api.services.bigquery.model.JobStatistics; @@ -25,6 +26,7 @@ import com.google.cloud.bigquery.storage.v1.DataFormat; import java.io.IOException; import java.io.ObjectInputStream; +import java.util.Optional; import java.util.concurrent.atomic.AtomicReference; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.QueryPriority; @@ -188,4 +190,24 @@ public long getEstimatedSizeBytes(PipelineOptions options) throws Exception { protected @Nullable String getTargetTableId(BigQueryOptions options) throws Exception { return null; } + + void removeDestinationIfExists(BigQueryOptions options) throws Exception { + DatasetService datasetService 
= bqServices.getDatasetService(options.as(BigQueryOptions.class)); + String project = queryTempProject; + if (project == null) { + project = + options.as(BigQueryOptions.class).getBigQueryProject() == null + ? options.as(BigQueryOptions.class).getProject() + : options.as(BigQueryOptions.class).getBigQueryProject(); + } + String tempTableID = + BigQueryResourceNaming.createJobIdPrefix( + options.getJobName(), stepUuid, BigQueryResourceNaming.JobType.QUERY); + TableReference tempTableReference = + createTempTableReference(project, tempTableID, Optional.ofNullable(queryTempDataset)); + Table destTable = datasetService.getTable(tempTableReference); + if (destTable != null) { + datasetService.deleteTable(tempTableReference); + } + } } From 8c36858af039dc2c2dad1433e9c8233f3d1e100c Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Wed, 26 Nov 2025 20:36:55 -0500 Subject: [PATCH 03/11] Cherrypick in extras changes (#36917) * split hdfs into extra (#36773) * split hdfs into extra * CHANGES * tox * try/catch * test fixes * add to coverage tasks * Update CHANGES to mention extras changes (#36875) --- CHANGES.md | 7 +++++-- sdks/python/apache_beam/io/hadoopfilesystem.py | 11 +++++++++-- sdks/python/apache_beam/io/hadoopfilesystem_test.py | 7 +++++++ sdks/python/setup.py | 2 +- sdks/python/tox.ini | 9 +++++---- 5 files changed, 27 insertions(+), 9 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 68af5a342d7d..33cf7070a5fe 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -81,7 +81,7 @@ Now Beam has full support for Milvus integration including Milvus enrichment and ## Breaking Changes -* X behavior was changed ([#X](https://github.com/apache/beam/issues/X)). +* (Python) Some Python dependencies have been split out into extras. 
To ensure all previously installed dependencies are installed, when installing Beam you can `pip install apache-beam[gcp,interactive,yaml,redis,hadoop,tfrecord]`, though most users will not need all of these extras ([#34554](https://github.com/apache/beam/issues/34554)). ## Deprecations @@ -123,7 +123,7 @@ Now Beam has full support for Milvus integration including Milvus enrichment and - This change only affects pipelines that explicitly use the `pickle_library=dill` pipeline option. - While `dill==0.3.1.1` is still pre-installed on the official Beam SDK base images, it is no longer a direct dependency of the apache-beam Python package. This means it can be overridden by other dependencies in your environment. - If your pipeline uses `pickle_library=dill`, you must manually ensure `dill==0.3.1.1` is installed in both your submission and runtime environments. - - Submission environment: Install the dill extra in your local environment `pip install apache-beam[gcpdill]`. + - Submission environment: Install the dill extra in your local environment `pip install apache-beam[gcp,dill]`. - Runtime (worker) environment: Your action depends on how you manage your worker's environment. - If using default containers or custom containers with the official Beam base image e.g. `FROM apache/beam_python3.10_sdk:2.69.0` - Add `dill==0.3.1.1` to your worker's requirements file (e.g., requirements.txt) @@ -137,6 +137,9 @@ Now Beam has full support for Milvus integration including Milvus enrichment and * (Python) The deterministic fallback coder for complex types like NamedTuple, Enum, and dataclasses now normalizes filepaths for better determinism guarantees. This affects streaming pipelines updating from 2.68 to 2.69 that utilize this fallback coder. If your pipeline is affected, you may see a warning like: "Using fallback deterministic coder for type X...". 
To update safely sepcify the pipeline option `--update_compatibility_version=2.68.0` ([#36345](https://github.com/apache/beam/pull/36345)). * (Python) Fixed transform naming conflict when executing DataTransform on a dictionary of PColls ([#30445](https://github.com/apache/beam/issues/30445)). This may break update compatibility if you don't provide a `--transform_name_mapping`. +* (Python) Split some extras out from the core Beam package ([#34554](https://github.com/apache/beam/issues/34554)). + - If you use Enrichment with redis, Hadoop FileSystem, TFRecord, or some other packages, you may need to install some extras. + - To retain identical behavior to before, instead of `pip install apache-beam`, use `pip install apache-beam[hadoop,gcp,interactive,redis,test,tfrecord]`. * Removed deprecated Hadoop versions (2.10.2 and 3.2.4) that are no longer supported for [Iceberg](https://github.com/apache/iceberg/issues/10940) from IcebergIO ([#36282](https://github.com/apache/beam/issues/36282)). * (Go) Coder construction on SDK side is more faithful to the specs from runners without stripping length-prefix. This may break streaming pipeline update as the underlying coder could be changed ([#36387](https://github.com/apache/beam/issues/36387)). * Minimum Go version for Beam Go updated to 1.25.2 ([#36461](https://github.com/apache/beam/issues/36461)). 
diff --git a/sdks/python/apache_beam/io/hadoopfilesystem.py b/sdks/python/apache_beam/io/hadoopfilesystem.py index cf488c228a28..3287644eed8c 100644 --- a/sdks/python/apache_beam/io/hadoopfilesystem.py +++ b/sdks/python/apache_beam/io/hadoopfilesystem.py @@ -26,8 +26,6 @@ import re from typing import BinaryIO # pylint: disable=unused-import -import hdfs - from apache_beam.io import filesystemio from apache_beam.io.filesystem import BeamIOError from apache_beam.io.filesystem import CompressedFile @@ -37,6 +35,11 @@ from apache_beam.options.pipeline_options import HadoopFileSystemOptions from apache_beam.options.pipeline_options import PipelineOptions +try: + import hdfs +except ImportError: + hdfs = None + __all__ = ['HadoopFileSystem'] _HDFS_PREFIX = 'hdfs:/' @@ -108,6 +111,10 @@ def __init__(self, pipeline_options): See :class:`~apache_beam.options.pipeline_options.HadoopFileSystemOptions`. """ super().__init__(pipeline_options) + if hdfs is None: + raise ImportError( + 'Failed to import hdfs. 
You can ensure it is ' + 'installed by installing the hadoop beam extra') logging.getLogger('hdfs.client').setLevel(logging.WARN) if pipeline_options is None: raise ValueError('pipeline_options is not set') diff --git a/sdks/python/apache_beam/io/hadoopfilesystem_test.py b/sdks/python/apache_beam/io/hadoopfilesystem_test.py index 8c21effc8823..eb0925224dd3 100644 --- a/sdks/python/apache_beam/io/hadoopfilesystem_test.py +++ b/sdks/python/apache_beam/io/hadoopfilesystem_test.py @@ -32,6 +32,11 @@ from apache_beam.options.pipeline_options import HadoopFileSystemOptions from apache_beam.options.pipeline_options import PipelineOptions +try: + import hdfs as actual_hdfs +except ImportError: + actual_hdfs = None + class FakeFile(io.BytesIO): """File object for FakeHdfs""" @@ -201,6 +206,7 @@ def checksum(self, path): @parameterized_class(('full_urls', ), [(False, ), (True, )]) +@unittest.skipIf(actual_hdfs is None, "hdfs extra not installed") class HadoopFileSystemTest(unittest.TestCase): def setUp(self): self._fake_hdfs = FakeHdfs() @@ -607,6 +613,7 @@ def test_delete_error(self): self.assertFalse(self.fs.exists(url2)) +@unittest.skipIf(actual_hdfs is None, "hdfs extra not installed") class HadoopFileSystemRuntimeValueProviderTest(unittest.TestCase): """Tests pipeline_options, in the form of a RuntimeValueProvider.runtime_options object.""" diff --git a/sdks/python/setup.py b/sdks/python/setup.py index 289433f9ea5b..b700d7969832 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -379,7 +379,6 @@ def get_portability_package_data(): # TODO(https://github.com/grpc/grpc/issues/37710): Unpin grpc 'grpcio>=1.33.1,<2,!=1.48.0,!=1.59.*,!=1.60.*,!=1.61.*,!=1.62.0,!=1.62.1,<1.66.0; python_version <= "3.12"', # pylint: disable=line-too-long 'grpcio>=1.67.0; python_version >= "3.13"', - 'hdfs>=2.1.0,<3.0.0', 'httplib2>=0.8,<0.23.0', 'jsonpickle>=3.0.0,<4.0.0', # numpy can have breaking changes in minor versions. 
@@ -563,6 +562,7 @@ def get_portability_package_data(): # `--update` / `-U` flag to replace the dask release brought in # by distributed. ], + 'hadoop': ['hdfs>=2.1.0,<3.0.0'], 'yaml': [ 'docstring-parser>=0.15,<1.0', 'jinja2>=3.0,<3.2', diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini index da0932728b20..431cd186c1b9 100644 --- a/sdks/python/tox.ini +++ b/sdks/python/tox.ini @@ -33,7 +33,7 @@ pip_pre = True # allow apps that support color to use it. passenv=TERM,CLOUDSDK_CONFIG,DOCKER_*,TESTCONTAINERS_*,TC_*,ALLOYDB_PASSWORD # Set [] options for pip installation of apache-beam tarball. -extras = test,dataframe,redis,tfrecord,yaml +extras = test,dataframe,hadoop,redis,tfrecord,yaml # Don't warn that these commands aren't installed. allowlist_externals = false @@ -97,8 +97,8 @@ install_command = {envbindir}/python.exe {envbindir}/pip.exe install --retries 1 list_dependencies_command = {envbindir}/python.exe {envbindir}/pip.exe freeze [testenv:py{310,311,312,313}-cloud] -; extras = test,gcp,interactive,dataframe,aws,azure,redis -extras = test,gcp,interactive,dataframe,aws,azure +; extras = test,gcp,interactive,dataframe,aws,azure +extras = test,hadoop,gcp,interactive,dataframe,aws,azure commands = python apache_beam/examples/complete/autocomplete_test.py bash {toxinidir}/scripts/run_pytest.sh {envname} "{posargs}" @@ -173,7 +173,7 @@ setenv = TC_SLEEP_TIME = {env:TC_SLEEP_TIME:1} # NOTE: we could add ml_test to increase the collected code coverage metrics, but it would make the suite slower. -extras = test,gcp,interactive,dataframe,aws,redis +extras = test,hadoop,gcp,interactive,dataframe,aws,redis commands = bash {toxinidir}/scripts/run_pytest.sh {envname} "{posargs}" "--cov-report=xml --cov=. 
--cov-append" @@ -228,6 +228,7 @@ deps = holdup==1.8.0 extras = gcp + hadoop allowlist_externals = bash echo From 05b3b14b41d4518e96a21ef7edb4ebecfe5f2628 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Wed, 26 Nov 2025 20:36:00 -0500 Subject: [PATCH 04/11] Re-enable "PostCommit XVR GoUsingJava Dataflow" in cron (#36909) * Test docker config to fix workflow. * enable buildx and load in python docker. * Another attempt by setting buildx project property. * Another attempt * Re-enable workflow and sickbay some failed tests for now. * Add release trigger --- .../beam_PostCommit_XVR_GoUsingJava_Dataflow.json | 4 ++-- .../beam_PostCommit_XVR_GoUsingJava_Dataflow.yml | 8 ++++++-- sdks/go/test/integration/integration.go | 3 +++ 3 files changed, 11 insertions(+), 4 deletions(-) diff --git a/.github/trigger_files/beam_PostCommit_XVR_GoUsingJava_Dataflow.json b/.github/trigger_files/beam_PostCommit_XVR_GoUsingJava_Dataflow.json index 920c8d132e4a..b26833333238 100644 --- a/.github/trigger_files/beam_PostCommit_XVR_GoUsingJava_Dataflow.json +++ b/.github/trigger_files/beam_PostCommit_XVR_GoUsingJava_Dataflow.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 1 -} \ No newline at end of file + "modification": 2 +} diff --git a/.github/workflows/beam_PostCommit_XVR_GoUsingJava_Dataflow.yml b/.github/workflows/beam_PostCommit_XVR_GoUsingJava_Dataflow.yml index cc52476ca303..76aebfccb68a 100644 --- a/.github/workflows/beam_PostCommit_XVR_GoUsingJava_Dataflow.yml +++ b/.github/workflows/beam_PostCommit_XVR_GoUsingJava_Dataflow.yml @@ -16,11 +16,13 @@ # TODO(https://github.com/apache/beam/issues/32492): re-enable the suite # on cron and add release/trigger_all_tests.json to trigger path once fixed. 
-name: PostCommit XVR GoUsingJava Dataflow (disabled) +name: PostCommit XVR GoUsingJava Dataflow on: + schedule: + - cron: '45 5/6 * * *' pull_request_target: - paths: ['.github/trigger_files/beam_PostCommit_XVR_GoUsingJava_Dataflow.json'] + paths: ['.github/trigger_files/beam_PostCommit_XVR_GoUsingJava_Dataflow.json', 'release/trigger_all_tests.json'] workflow_dispatch: #Setting explicit permissions for the action to avoid the default permissions which are `write-all` in case of pull_request_target event @@ -83,6 +85,8 @@ jobs: uses: ./.github/actions/gradle-command-self-hosted-action with: gradle-command: :runners:google-cloud-dataflow-java:validatesCrossLanguageRunnerGoUsingJava + arguments: | + -PuseDockerBuildx - name: Archive JUnit Test Results uses: actions/upload-artifact@v4 if: ${{ !success() }} diff --git a/sdks/go/test/integration/integration.go b/sdks/go/test/integration/integration.go index b6d2c60e0fb9..ea23c5f9ae0e 100644 --- a/sdks/go/test/integration/integration.go +++ b/sdks/go/test/integration/integration.go @@ -301,6 +301,9 @@ var dataflowFilters = []string{ // There is no infrastructure for running KafkaIO tests with Dataflow. "TestKafkaIO.*", "TestSpannerIO.*", + // TODO(36918) These tests are currently failing in Dataflow Runner + "TestBigQueryIO.*", + "TestBigtableIO.*", // Dataflow doesn't support any test that requires loopback. // Eg. For FileIO examples. ".*Loopback.*", From 0af21d976b187e9ac7147b66869ed9681dd633fd Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Mon, 1 Dec 2025 15:19:14 -0500 Subject: [PATCH 05/11] Enforce version upper bound for scikit-learn. 
(#36948) (#36950) --- sdks/python/setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/setup.py b/sdks/python/setup.py index b700d7969832..ef58b4f9c760 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -448,7 +448,7 @@ def get_portability_package_data(): 'pytest>=7.1.2,<9.0', 'pytest-xdist>=2.5.0,<4', 'pytest-timeout>=2.1.0,<3', - 'scikit-learn>=0.20.0', + 'scikit-learn>=0.20.0,<1.8.0', 'sqlalchemy>=1.3,<3.0', 'psycopg2-binary>=2.8.5,<3.0', 'testcontainers[mysql,kafka,milvus]>=4.0.0,<5.0.0', From 692fb4762f5f8b1c286b814734364d0c75dadeb5 Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Wed, 3 Dec 2025 10:51:11 -0500 Subject: [PATCH 06/11] Add new method of generating key for GBEK (#36891) (#36967) * Add new method of generating key for GBEK * Java version * fix deps * Imports * Secret parsing tests * docs * more docs * formatting + test cleanup * lint * lint * lint * lint * import order * Deps + style exemption * reuse key: * reuse key * Feedback * Test fixes --- .../trigger_files/beam_PostCommit_Python.json | 2 +- .../beam/gradle/BeamModulePlugin.groovy | 3 + .../beam/checkstyle/suppressions.xml | 1 + sdks/java/core/build.gradle | 6 + .../beam/sdk/util/GcpHsmGeneratedSecret.java | 191 ++++++++++++++++++ .../java/org/apache/beam/sdk/util/Secret.java | 45 ++++- .../transforms/GroupByEncryptedKeyTest.java | 80 ++++++++ .../beam/sdk/transforms/GroupByKeyIT.java | 112 ++++++++++ .../org/apache/beam/sdk/util/SecretTest.java | 18 +- .../apache_beam/transforms/core_it_test.py | 57 ++++++ sdks/python/apache_beam/transforms/util.py | 160 ++++++++++++++- .../apache_beam/transforms/util_test.py | 119 +++++++++++ sdks/python/setup.py | 1 + 13 files changed, 782 insertions(+), 13 deletions(-) create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpHsmGeneratedSecret.java diff --git a/.github/trigger_files/beam_PostCommit_Python.json b/.github/trigger_files/beam_PostCommit_Python.json index 
ed56f65ef50f..06bd728be6d7 100644 --- a/.github/trigger_files/beam_PostCommit_Python.json +++ b/.github/trigger_files/beam_PostCommit_Python.json @@ -1,5 +1,5 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run.", - "modification": 32 + "modification": 33 } diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index e941de9dfb64..595741f8efe4 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -755,6 +755,7 @@ class BeamModulePlugin implements Plugin { google_cloud_dataflow_java_proto_library_all: "com.google.cloud.dataflow:google-cloud-dataflow-java-proto-library-all:0.5.160304", google_cloud_datastore_v1_proto_client : "com.google.cloud.datastore:datastore-v1-proto-client:2.32.3", // [bomupgrader] sets version google_cloud_firestore : "com.google.cloud:google-cloud-firestore", // google_cloud_platform_libraries_bom sets version + google_cloud_kms : "com.google.cloud:google-cloud-kms", // google_cloud_platform_libraries_bom sets version google_cloud_pubsub : "com.google.cloud:google-cloud-pubsub", // google_cloud_platform_libraries_bom sets version google_cloud_pubsublite : "com.google.cloud:google-cloud-pubsublite", // google_cloud_platform_libraries_bom sets version // [bomupgrader] the BOM version is set by scripts/tools/bomupgrader.py. 
If update manually, also update @@ -765,6 +766,7 @@ class BeamModulePlugin implements Plugin { google_cloud_spanner_bom : "com.google.cloud:google-cloud-spanner-bom:$google_cloud_spanner_version", google_cloud_spanner : "com.google.cloud:google-cloud-spanner", // google_cloud_platform_libraries_bom sets version google_cloud_spanner_test : "com.google.cloud:google-cloud-spanner:$google_cloud_spanner_version:tests", + google_cloud_tink : "com.google.crypto.tink:tink:1.19.0", google_cloud_vertexai : "com.google.cloud:google-cloud-vertexai", // google_cloud_platform_libraries_bom sets version google_code_gson : "com.google.code.gson:gson:$google_code_gson_version", // google-http-client's version is explicitly declared for sdks/java/maven-archetypes/examples @@ -866,6 +868,7 @@ class BeamModulePlugin implements Plugin { proto_google_cloud_datacatalog_v1beta1 : "com.google.api.grpc:proto-google-cloud-datacatalog-v1beta1", // google_cloud_platform_libraries_bom sets version proto_google_cloud_datastore_v1 : "com.google.api.grpc:proto-google-cloud-datastore-v1", // google_cloud_platform_libraries_bom sets version proto_google_cloud_firestore_v1 : "com.google.api.grpc:proto-google-cloud-firestore-v1", // google_cloud_platform_libraries_bom sets version + proto_google_cloud_kms_v1 : "com.google.api.grpc:proto-google-cloud-kms-v1", // google_cloud_platform_libraries_bom sets version proto_google_cloud_pubsub_v1 : "com.google.api.grpc:proto-google-cloud-pubsub-v1", // google_cloud_platform_libraries_bom sets version proto_google_cloud_pubsublite_v1 : "com.google.api.grpc:proto-google-cloud-pubsublite-v1", // google_cloud_platform_libraries_bom sets version proto_google_cloud_secret_manager_v1 : "com.google.api.grpc:proto-google-cloud-secretmanager-v1", // google_cloud_platform_libraries_bom sets version diff --git a/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml b/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml index 
53cd7b7ad4d0..ef4cbdb5ba02 100644 --- a/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml +++ b/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml @@ -57,6 +57,7 @@ + diff --git a/sdks/java/core/build.gradle b/sdks/java/core/build.gradle index 4f37ad47ec4c..74b6dfe4bba7 100644 --- a/sdks/java/core/build.gradle +++ b/sdks/java/core/build.gradle @@ -102,6 +102,10 @@ dependencies { shadow library.java.snappy_java shadow library.java.joda_time implementation enforcedPlatform(library.java.google_cloud_platform_libraries_bom) + implementation library.java.gax + implementation library.java.google_cloud_kms + implementation library.java.proto_google_cloud_kms_v1 + implementation library.java.google_cloud_tink implementation library.java.google_cloud_secret_manager implementation library.java.proto_google_cloud_secret_manager_v1 implementation library.java.protobuf_java @@ -130,6 +134,8 @@ dependencies { shadowTest library.java.log4j2_api shadowTest library.java.jamm shadowTest 'com.google.cloud:google-cloud-secretmanager:2.75.0' + shadowTest 'com.google.cloud:google-cloud-kms:2.75.0' + shadowTest 'com.google.crypto.tink:tink:1.19.0' testRuntimeOnly library.java.slf4j_jdk14 } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpHsmGeneratedSecret.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpHsmGeneratedSecret.java new file mode 100644 index 000000000000..493330ad5561 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpHsmGeneratedSecret.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import com.google.api.gax.rpc.AlreadyExistsException; +import com.google.api.gax.rpc.NotFoundException; +import com.google.cloud.kms.v1.CryptoKeyName; +import com.google.cloud.kms.v1.EncryptResponse; +import com.google.cloud.kms.v1.KeyManagementServiceClient; +import com.google.cloud.secretmanager.v1.AccessSecretVersionResponse; +import com.google.cloud.secretmanager.v1.ProjectName; +import com.google.cloud.secretmanager.v1.Replication; +import com.google.cloud.secretmanager.v1.SecretManagerServiceClient; +import com.google.cloud.secretmanager.v1.SecretName; +import com.google.cloud.secretmanager.v1.SecretPayload; +import com.google.cloud.secretmanager.v1.SecretVersionName; +import com.google.crypto.tink.subtle.Hkdf; +import com.google.protobuf.ByteString; +import java.io.IOException; +import java.security.GeneralSecurityException; +import java.security.SecureRandom; +import java.util.Base64; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A {@link org.apache.beam.sdk.util.Secret} manager implementation that generates a secret using + * entropy from a GCP HSM key and stores it in Google Cloud Secret Manager. If the secret already + * exists, it will be retrieved. 
+ */ +public class GcpHsmGeneratedSecret implements Secret { + private static final Logger LOG = LoggerFactory.getLogger(GcpHsmGeneratedSecret.class); + private final String projectId; + private final String locationId; + private final String keyRingId; + private final String keyId; + private final String secretId; + + private final SecureRandom random = new SecureRandom(); + + public GcpHsmGeneratedSecret( + String projectId, String locationId, String keyRingId, String keyId, String jobName) { + this.projectId = projectId; + this.locationId = locationId; + this.keyRingId = keyRingId; + this.keyId = keyId; + this.secretId = "HsmGeneratedSecret_" + jobName; + } + + /** + * Returns the secret as a byte array. Assumes that the current active service account has + * permissions to read the secret. + * + * @return The secret as a byte array. + */ + @Override + public byte[] getSecretBytes() { + try (SecretManagerServiceClient client = SecretManagerServiceClient.create()) { + SecretVersionName secretVersionName = SecretVersionName.of(projectId, secretId, "1"); + + try { + AccessSecretVersionResponse response = client.accessSecretVersion(secretVersionName); + return response.getPayload().getData().toByteArray(); + } catch (NotFoundException e) { + LOG.info( + "Secret version {} not found. Creating new secret and version.", + secretVersionName.toString()); + } + + ProjectName projectName = ProjectName.of(projectId); + SecretName secretName = SecretName.of(projectId, secretId); + try { + com.google.cloud.secretmanager.v1.Secret secret = + com.google.cloud.secretmanager.v1.Secret.newBuilder() + .setReplication( + Replication.newBuilder() + .setAutomatic(Replication.Automatic.newBuilder().build())) + .build(); + client.createSecret(projectName, secretId, secret); + } catch (AlreadyExistsException e) { + LOG.info("Secret {} already exists. 
Adding new version.", secretName.toString()); + } + + byte[] newKey = generateDek(); + + try { + // Always retrieve remote secret as source-of-truth in case another thread created it + AccessSecretVersionResponse response = client.accessSecretVersion(secretVersionName); + return response.getPayload().getData().toByteArray(); + } catch (NotFoundException e) { + LOG.info( + "Secret version {} not found after re-check. Creating new secret and version.", + secretVersionName.toString()); + } + + SecretPayload payload = + SecretPayload.newBuilder().setData(ByteString.copyFrom(newKey)).build(); + client.addSecretVersion(secretName, payload); + AccessSecretVersionResponse response = client.accessSecretVersion(secretVersionName); + return response.getPayload().getData().toByteArray(); + + } catch (IOException | GeneralSecurityException e) { + throw new RuntimeException("Failed to retrieve or create secret bytes", e); + } + } + + private byte[] generateDek() throws IOException, GeneralSecurityException { + int dekSize = 32; + try (KeyManagementServiceClient client = KeyManagementServiceClient.create()) { + // 1. Generate nonce_one. This doesn't need to have baked in randomness since the + // actual randomness comes from KMS. + byte[] nonceOne = new byte[dekSize]; + random.nextBytes(nonceOne); + + // 2. Encrypt to get nonce_two + CryptoKeyName keyName = CryptoKeyName.of(projectId, locationId, keyRingId, keyId); + EncryptResponse response = client.encrypt(keyName, ByteString.copyFrom(nonceOne)); + byte[] nonceTwo = response.getCiphertext().toByteArray(); + + // 3. Generate DK + byte[] dk = new byte[dekSize]; + random.nextBytes(dk); + + // 4. Derive DEK using HKDF + byte[] dek = Hkdf.computeHkdf("HmacSha256", dk, nonceTwo, new byte[0], dekSize); + + // 5. Base64 encode + return Base64.getUrlEncoder().encode(dek); + } + } + + /** + * Returns the project ID of the secret. + * + * @return The project ID as a String. 
+ */ + public String getProjectId() { + return projectId; + } + + /** + * Returns the location ID of the secret. + * + * @return The location ID as a String. + */ + public String getLocationId() { + return locationId; + } + + /** + * Returns the key ring ID of the secret. + * + * @return The key ring ID as a String. + */ + public String getKeyRingId() { + return keyRingId; + } + + /** + * Returns the key ID of the secret. + * + * @return The key ID as a String. + */ + public String getKeyId() { + return keyId; + } + + /** + * Returns the secret ID of the secret. + * + * @return The secret ID as a String. + */ + public String getSecretId() { + return secretId; + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Secret.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Secret.java index a75e01c9543f..f8efde0dd44c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Secret.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Secret.java @@ -23,6 +23,7 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; /** * A secret management interface used for handling sensitive data. 
@@ -70,16 +71,48 @@ static Secret parseSecretOption(String secretOption) { paramName, gcpSecretParams)); } } - String versionName = paramMap.get("version_name"); - if (versionName == null) { - throw new RuntimeException( - "version_name must contain a valid value for versionName parameter"); - } + String versionName = + Preconditions.checkNotNull( + paramMap.get("version_name"), + "version_name must contain a valid value for versionName parameter"); return new GcpSecret(versionName); + case "gcphsmgeneratedsecret": + Set gcpHsmGeneratedSecretParams = + new HashSet<>( + Arrays.asList("project_id", "location_id", "key_ring_id", "key_id", "job_name")); + for (String paramName : paramMap.keySet()) { + if (!gcpHsmGeneratedSecretParams.contains(paramName)) { + throw new RuntimeException( + String.format( + "Invalid secret parameter %s, GcpHsmGeneratedSecret only supports the following parameters: %s", + paramName, gcpHsmGeneratedSecretParams)); + } + } + String projectId = + Preconditions.checkNotNull( + paramMap.get("project_id"), + "project_id must contain a valid value for projectId parameter"); + String locationId = + Preconditions.checkNotNull( + paramMap.get("location_id"), + "location_id must contain a valid value for locationId parameter"); + String keyRingId = + Preconditions.checkNotNull( + paramMap.get("key_ring_id"), + "key_ring_id must contain a valid value for keyRingId parameter"); + String keyId = + Preconditions.checkNotNull( + paramMap.get("key_id"), "key_id must contain a valid value for keyId parameter"); + String jobName = + Preconditions.checkNotNull( + paramMap.get("job_name"), + "job_name must contain a valid value for jobName parameter"); + return new GcpHsmGeneratedSecret(projectId, locationId, keyRingId, keyId, jobName); default: throw new RuntimeException( String.format( - "Invalid secret type %s, currently only GcpSecret is supported", secretType)); + "Invalid secret type %s, currently only GcpSecret and GcpHsmGeneratedSecret are supported", 
+ secretType)); } } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java index 31064470bd38..77195533ace3 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java @@ -39,6 +39,7 @@ import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.util.GcpHsmGeneratedSecret; import org.apache.beam.sdk.util.GcpSecret; import org.apache.beam.sdk.util.Secret; import org.apache.beam.sdk.values.KV; @@ -102,6 +103,9 @@ public void testGroupByKeyFakeSecret() { private static final String PROJECT_ID = "apache-beam-testing"; private static final String SECRET_ID = "gbek-test"; private static Secret gcpSecret; + private static Secret gcpHsmGeneratedSecret; + private static final String KEY_RING_ID = "gbek-test-key-ring"; + private static final String KEY_ID = "gbek-test-key"; @BeforeClass public static void setup() throws IOException { @@ -131,6 +135,45 @@ public static void setup() throws IOException { .build()); } gcpSecret = new GcpSecret(secretName.toString() + "/versions/latest"); + + try { + com.google.cloud.kms.v1.KeyManagementServiceClient kmsClient = + com.google.cloud.kms.v1.KeyManagementServiceClient.create(); + String locationId = "global"; + com.google.cloud.kms.v1.KeyRingName keyRingName = + com.google.cloud.kms.v1.KeyRingName.of(PROJECT_ID, locationId, KEY_RING_ID); + com.google.cloud.kms.v1.LocationName locationName = + com.google.cloud.kms.v1.LocationName.of(PROJECT_ID, locationId); + try { + kmsClient.getKeyRing(keyRingName); + } catch (Exception e) { + kmsClient.createKeyRing( + locationName, KEY_RING_ID, com.google.cloud.kms.v1.KeyRing.newBuilder().build()); + } + + 
com.google.cloud.kms.v1.CryptoKeyName keyName = + com.google.cloud.kms.v1.CryptoKeyName.of(PROJECT_ID, locationId, KEY_RING_ID, KEY_ID); + try { + kmsClient.getCryptoKey(keyName); + } catch (Exception e) { + com.google.cloud.kms.v1.CryptoKey key = + com.google.cloud.kms.v1.CryptoKey.newBuilder() + .setPurpose(com.google.cloud.kms.v1.CryptoKey.CryptoKeyPurpose.ENCRYPT_DECRYPT) + .build(); + kmsClient.createCryptoKey(keyRingName, KEY_ID, key); + } + gcpHsmGeneratedSecret = + new GcpHsmGeneratedSecret( + PROJECT_ID, + locationId, + KEY_RING_ID, + KEY_ID, + String.format("gbek-test-job-%d", new SecureRandom().nextInt(10000))); + // Validate we have crypto permissions or skip these tests. + gcpHsmGeneratedSecret.getSecretBytes(); + } catch (Exception e) { + gcpHsmGeneratedSecret = null; + } } @AfterClass @@ -183,6 +226,43 @@ public void testGroupByKeyGcpSecretThrows() { assertThrows(RuntimeException.class, () -> p.run()); } + @Test + @Category(NeedsRunner.class) + public void testGroupByKeyGcpHsmGeneratedSecret() { + if (gcpHsmGeneratedSecret == null) { + return; + } + List> ungroupedPairs = + Arrays.asList( + KV.of(null, 3), + KV.of("k1", 3), + KV.of("k5", Integer.MAX_VALUE), + KV.of("k5", Integer.MIN_VALUE), + KV.of("k2", 66), + KV.of("k1", 4), + KV.of(null, 5), + KV.of("k2", -33), + KV.of("k3", 0)); + + PCollection> input = + p.apply( + Create.of(ungroupedPairs) + .withCoder(KvCoder.of(NullableCoder.of(StringUtf8Coder.of()), VarIntCoder.of()))); + + PCollection>> output = + input.apply(GroupByEncryptedKey.create(gcpHsmGeneratedSecret)); + + PAssert.that(output.apply("Sort", MapElements.via(new SortValues()))) + .containsInAnyOrder( + KV.of("k1", Arrays.asList(3, 4)), + KV.of(null, Arrays.asList(3, 5)), + KV.of("k5", Arrays.asList(Integer.MIN_VALUE, Integer.MAX_VALUE)), + KV.of("k2", Arrays.asList(-33, 66)), + KV.of("k3", Arrays.asList(0))); + + p.run(); + } + private static class SortValues extends SimpleFunction>, KV>> { @Override diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java index 1c8168a42a03..431bdf448bea 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java @@ -17,6 +17,10 @@ */ package org.apache.beam.sdk.transforms; +import com.google.cloud.kms.v1.CryptoKey; +import com.google.cloud.kms.v1.CryptoKeyName; +import com.google.cloud.kms.v1.KeyManagementServiceClient; +import com.google.cloud.kms.v1.KeyRingName; import com.google.cloud.secretmanager.v1.ProjectName; import com.google.cloud.secretmanager.v1.SecretManagerServiceClient; import com.google.cloud.secretmanager.v1.SecretName; @@ -33,6 +37,7 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.util.GcpHsmGeneratedSecret; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.junit.AfterClass; @@ -51,7 +56,11 @@ public class GroupByKeyIT { private static final String PROJECT_ID = "apache-beam-testing"; private static final String SECRET_ID = "gbek-test"; private static String gcpSecretVersionName; + private static String gcpHsmSecretOption; private static String secretId; + private static final String KEY_RING_ID = "gbek-it-key-ring"; + private static final String KEY_ID = "gbek-it-key"; + private static final String LOCATION_ID = "global"; @BeforeClass public static void setup() throws IOException { @@ -88,6 +97,34 @@ public static void setup() throws IOException { .build()); } gcpSecretVersionName = secretName.toString() + "/versions/latest"; + + try { + KeyManagementServiceClient kmsClient = KeyManagementServiceClient.create(); + KeyRingName keyRingName = KeyRingName.of(PROJECT_ID, LOCATION_ID, KEY_RING_ID); + 
com.google.cloud.kms.v1.LocationName locationName = + com.google.cloud.kms.v1.LocationName.of(PROJECT_ID, LOCATION_ID); + try { + kmsClient.getKeyRing(keyRingName); + } catch (Exception e) { + kmsClient.createKeyRing( + locationName, KEY_RING_ID, com.google.cloud.kms.v1.KeyRing.newBuilder().build()); + } + + CryptoKeyName keyName = CryptoKeyName.of(PROJECT_ID, LOCATION_ID, KEY_RING_ID, KEY_ID); + try { + kmsClient.getCryptoKey(keyName); + } catch (Exception e) { + CryptoKey key = + CryptoKey.newBuilder().setPurpose(CryptoKey.CryptoKeyPurpose.ENCRYPT_DECRYPT).build(); + kmsClient.createCryptoKey(keyRingName, KEY_ID, key); + } + gcpHsmSecretOption = + String.format( + "type:gcphsmgeneratedsecret;project_id:%s;location_id:%s;key_ring_id:%s;key_id:%s;job_name:%s", + PROJECT_ID, LOCATION_ID, KEY_RING_ID, KEY_ID, secretId); + } catch (Exception e) { + gcpHsmSecretOption = null; + } } @AfterClass @@ -135,6 +172,81 @@ public void testGroupByKeyWithValidGcpSecretOption() throws Exception { p.run(); } + @Test + public void testGroupByKeyWithValidGcpHsmGeneratedSecretOption() throws Exception { + if (gcpHsmSecretOption == null) { + // Skip test if we couldn't set up KMS + return; + } + PipelineOptions options = TestPipeline.testingPipelineOptions(); + options.setGbek(gcpHsmSecretOption); + Pipeline p = Pipeline.create(options); + List> ungroupedPairs = + Arrays.asList( + KV.of("k1", 3), + KV.of("k5", Integer.MAX_VALUE), + KV.of("k5", Integer.MIN_VALUE), + KV.of("k2", 66), + KV.of("k1", 4), + KV.of("k2", -33), + KV.of("k3", 0)); + + PCollection> input = + p.apply( + Create.of(ungroupedPairs) + .withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))); + + PCollection>> output = input.apply(GroupByKey.create()); + + PAssert.that(output) + .containsInAnyOrder( + KV.of("k1", Arrays.asList(3, 4)), + KV.of("k5", Arrays.asList(Integer.MAX_VALUE, Integer.MIN_VALUE)), + KV.of("k2", Arrays.asList(66, -33)), + KV.of("k3", Arrays.asList(0))); + + p.run(); + } + + @Test + public 
void testGroupByKeyWithExistingGcpHsmGeneratedSecretOption() throws Exception { + if (gcpHsmSecretOption == null) { + // Skip test if we couldn't set up KMS + return; + } + // Create the secret beforehand + new GcpHsmGeneratedSecret(PROJECT_ID, "global", KEY_RING_ID, KEY_ID, secretId).getSecretBytes(); + + PipelineOptions options = TestPipeline.testingPipelineOptions(); + options.setGbek(gcpHsmSecretOption); + Pipeline p = Pipeline.create(options); + List> ungroupedPairs = + Arrays.asList( + KV.of("k1", 3), + KV.of("k5", Integer.MAX_VALUE), + KV.of("k5", Integer.MIN_VALUE), + KV.of("k2", 66), + KV.of("k1", 4), + KV.of("k2", -33), + KV.of("k3", 0)); + + PCollection> input = + p.apply( + Create.of(ungroupedPairs) + .withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))); + + PCollection>> output = input.apply(GroupByKey.create()); + + PAssert.that(output) + .containsInAnyOrder( + KV.of("k1", Arrays.asList(3, 4)), + KV.of("k5", Arrays.asList(Integer.MAX_VALUE, Integer.MIN_VALUE)), + KV.of("k2", Arrays.asList(66, -33)), + KV.of("k3", Arrays.asList(0))); + + p.run(); + } + @Test public void testGroupByKeyWithInvalidGcpSecretOption() throws Exception { if (gcpSecretVersionName == null) { diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SecretTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SecretTest.java index dd4b125d73fe..0acfa3963462 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SecretTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SecretTest.java @@ -37,6 +37,20 @@ public void testParseSecretOptionWithValidGcpSecret() { assertEquals("my_secret/versions/latest", ((GcpSecret) secret).getVersionName()); } + @Test + public void testParseSecretOptionWithValidGcpHsmGeneratedSecret() { + String secretOption = + "type:gcphsmgeneratedsecret;project_id:my-project;location_id:global;key_ring_id:my-key-ring;key_id:my-key;job_name:my-job"; + Secret secret = 
Secret.parseSecretOption(secretOption); + assertTrue(secret instanceof GcpHsmGeneratedSecret); + GcpHsmGeneratedSecret hsmSecret = (GcpHsmGeneratedSecret) secret; + assertEquals("my-project", hsmSecret.getProjectId()); + assertEquals("global", hsmSecret.getLocationId()); + assertEquals("my-key-ring", hsmSecret.getKeyRingId()); + assertEquals("my-key", hsmSecret.getKeyId()); + assertEquals("HsmGeneratedSecret_my-job", hsmSecret.getSecretId()); + } + @Test public void testParseSecretOptionWithMissingType() { String secretOption = "version_name:my_secret/versions/latest"; @@ -50,9 +64,7 @@ public void testParseSecretOptionWithUnsupportedType() { String secretOption = "type:unsupported;version_name:my_secret/versions/latest"; Exception exception = assertThrows(RuntimeException.class, () -> Secret.parseSecretOption(secretOption)); - assertEquals( - "Invalid secret type unsupported, currently only GcpSecret is supported", - exception.getMessage()); + assertTrue(exception.getMessage().contains("Invalid secret type unsupported")); } @Test diff --git a/sdks/python/apache_beam/transforms/core_it_test.py b/sdks/python/apache_beam/transforms/core_it_test.py index 18ae3f30f574..2cdb770b5972 100644 --- a/sdks/python/apache_beam/transforms/core_it_test.py +++ b/sdks/python/apache_beam/transforms/core_it_test.py @@ -38,6 +38,11 @@ except ImportError: secretmanager = None # type: ignore[assignment] +try: + from google.cloud import kms +except ImportError: + kms = None # type: ignore[assignment] + class GbekIT(unittest.TestCase): @classmethod @@ -74,6 +79,42 @@ def setUpClass(cls): cls.gcp_secret = GcpSecret(version_name) cls.secret_option = f'type:GcpSecret;version_name:{version_name}' + if kms is not None: + cls.kms_client = kms.KeyManagementServiceClient() + cls.location_id = 'global' + py_version = f'_py{sys.version_info.major}{sys.version_info.minor}' + secret_postfix = datetime.now().strftime('%m%d_%H%M%S') + py_version + cls.key_ring_id = 'gbekit_key_ring_tests' + 
cls.key_ring_path = cls.kms_client.key_ring_path( + cls.project_id, cls.location_id, cls.key_ring_id) + try: + cls.kms_client.get_key_ring(request={'name': cls.key_ring_path}) + except Exception: + parent = f'projects/{cls.project_id}/locations/{cls.location_id}' + cls.kms_client.create_key_ring( + request={ + 'parent': parent, + 'key_ring_id': cls.key_ring_id, + }) + cls.key_id = 'gbekit_key_tests' + cls.key_path = cls.kms_client.crypto_key_path( + cls.project_id, cls.location_id, cls.key_ring_id, cls.key_id) + try: + cls.kms_client.get_crypto_key(request={'name': cls.key_path}) + except Exception: + cls.kms_client.create_crypto_key( + request={ + 'parent': cls.key_ring_path, + 'crypto_key_id': cls.key_id, + 'crypto_key': { + 'purpose': kms.CryptoKey.CryptoKeyPurpose.ENCRYPT_DECRYPT + } + }) + cls.hsm_secret_option = ( + f'type:GcpHsmGeneratedSecret;project_id:{cls.project_id};' + f'location_id:{cls.location_id};key_ring_id:{cls.key_ring_id};' + f'key_id:{cls.key_id};job_name:{secret_postfix}') + @classmethod def tearDownClass(cls): if secretmanager is not None: @@ -94,6 +135,22 @@ def test_gbk_with_gbek_it(self): pipeline.run().wait_until_finish() + @pytest.mark.it_postcommit + @unittest.skipIf(secretmanager is None, 'GCP dependencies are not installed') + @unittest.skipIf(kms is None, 'GCP dependencies are not installed') + def test_gbk_with_gbek_hsm_it(self): + pipeline = TestPipeline(is_integration_test=True) + pipeline.options.view_as(SetupOptions).gbek = self.hsm_secret_option + + pcoll_1 = pipeline | 'Start 1' >> beam.Create([('a', 1), ('a', 2), ('b', 3), + ('c', 4)]) + result = (pcoll_1) | beam.GroupByKey() + sorted_result = result | beam.Map(lambda x: (x[0], sorted(x[1]))) + assert_that( + sorted_result, equal_to([('a', ([1, 2])), ('b', ([3])), ('c', ([4]))])) + + pipeline.run().wait_until_finish() + @pytest.mark.it_postcommit @unittest.skipIf(secretmanager is None, 'GCP dependencies are not installed') def test_combineValues_with_gbek_it(self): diff 
--git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py index ba79d4ddf31c..fbaab6b4ebbb 100644 --- a/sdks/python/apache_beam/transforms/util.py +++ b/sdks/python/apache_beam/transforms/util.py @@ -361,15 +361,20 @@ def parse_secret_option(secret) -> 'Secret': secret_type = param_map['type'].lower() del param_map['type'] - secret_class = None + secret_class = Secret secret_params = None if secret_type == 'gcpsecret': - secret_class = GcpSecret + secret_class = GcpSecret # type: ignore[assignment] secret_params = ['version_name'] + elif secret_type == 'gcphsmgeneratedsecret': + secret_class = GcpHsmGeneratedSecret # type: ignore[assignment] + secret_params = [ + 'project_id', 'location_id', 'key_ring_id', 'key_id', 'job_name' + ] else: raise ValueError( f'Invalid secret type {secret_type}, currently only ' - 'GcpSecret is supported') + 'GcpSecret and GcpHsmGeneratedSecret are supported') for param_name in param_map.keys(): if param_name not in secret_params: @@ -413,6 +418,155 @@ def __eq__(self, secret): return self._version_name == getattr(secret, '_version_name', None) +class GcpHsmGeneratedSecret(Secret): + """A secret manager implementation that generates a secret using a GCP HSM key + and stores it in Google Cloud Secret Manager. If the secret already exists, + it will be retrieved. + """ + def __init__( + self, + project_id: str, + location_id: str, + key_ring_id: str, + key_id: str, + job_name: str): + """Initializes a GcpHsmGeneratedSecret object. + + Args: + project_id: The GCP project ID. + location_id: The GCP location ID for the HSM key. + key_ring_id: The ID of the KMS key ring. + key_id: The ID of the KMS key. + job_name: The name of the job, used to generate a unique secret name. 
+ """ + self._project_id = project_id + self._location_id = location_id + self._key_ring_id = key_ring_id + self._key_id = key_id + self._secret_version_name = f'HsmGeneratedSecret_{job_name}' + + def get_secret_bytes(self) -> bytes: + """Retrieves the secret bytes. + + If the secret version already exists in Secret Manager, it is retrieved. + Otherwise, a new secret and version are created. The new secret is + generated using the HSM key. + + Returns: + The secret as a byte string. + """ + try: + from google.api_core import exceptions as api_exceptions + from google.cloud import secretmanager + client = secretmanager.SecretManagerServiceClient() + + project_path = f"projects/{self._project_id}" + secret_path = f"{project_path}/secrets/{self._secret_version_name}" + # Since we may generate multiple versions when doing this on workers, + # just always take the first version added to maintain consistency. + secret_version_path = f"{secret_path}/versions/1" + + try: + response = client.access_secret_version( + request={"name": secret_version_path}) + return response.payload.data + except api_exceptions.NotFound: + # Don't bother logging yet, we'll only log if we actually add the + # secret version below + pass + + try: + client.create_secret( + request={ + "parent": project_path, + "secret_id": self._secret_version_name, + "secret": { + "replication": { + "automatic": {} + } + }, + }) + except api_exceptions.AlreadyExists: + # Don't bother logging yet, we'll only log if we actually add the + # secret version below + pass + + new_key = self.generate_dek() + try: + # Try one more time in case it was created while we were generating the + # DEK. + response = client.access_secret_version( + request={"name": secret_version_path}) + return response.payload.data + except api_exceptions.NotFound: + logging.info( + "Secret version %s not found. 
" + "Creating new secret and version.", + secret_version_path) + client.add_secret_version( + request={ + "parent": secret_path, "payload": { + "data": new_key + } + }) + response = client.access_secret_version( + request={"name": secret_version_path}) + return response.payload.data + + except Exception as e: + raise RuntimeError( + f'Failed to retrieve or create secret bytes for secret ' + f'{self._secret_version_name} with exception {e}') + + def generate_dek(self, dek_size: int = 32) -> bytes: + """Generates a new Data Encryption Key (DEK) using an HSM-backed key. + + This function follows a key derivation process that incorporates entropy + from the HSM-backed key into the nonce used for key derivation. + + Args: + dek_size: The size of the DEK to generate. + + Returns: + A new DEK of the specified size, url-safe base64-encoded. + """ + try: + import base64 + import os + + from cryptography.hazmat.primitives import hashes + from cryptography.hazmat.primitives.kdf.hkdf import HKDF + from google.cloud import kms + + # 1. Generate a random nonce (nonce_one) + nonce_one = os.urandom(dek_size) + + # 2. Use the HSM-backed key to encrypt nonce_one to create nonce_two + kms_client = kms.KeyManagementServiceClient() + key_path = kms_client.crypto_key_path( + self._project_id, self._location_id, self._key_ring_id, self._key_id) + response = kms_client.encrypt( + request={ + 'name': key_path, 'plaintext': nonce_one + }) + nonce_two = response.ciphertext + + # 3. Generate a Derivation Key (DK) + dk = os.urandom(dek_size) + + # 4. 
Use a KDF to derive the DEK using DK and nonce_two + hkdf = HKDF( + algorithm=hashes.SHA256(), + length=dek_size, + salt=nonce_two, + info=None, + ) + dek = hkdf.derive(dk) + return base64.urlsafe_b64encode(dek) + except Exception as e: + raise RuntimeError(f'Failed to generate DEK with exception {e}') + + class _EncryptMessage(DoFn): """A DoFn that encrypts the key and value of each element.""" def __init__( diff --git a/sdks/python/apache_beam/transforms/util_test.py b/sdks/python/apache_beam/transforms/util_test.py index 34e251fad1c7..dd5e19519faf 100644 --- a/sdks/python/apache_beam/transforms/util_test.py +++ b/sdks/python/apache_beam/transforms/util_test.py @@ -72,6 +72,7 @@ from apache_beam.transforms.core import FlatMapTuple from apache_beam.transforms.trigger import AfterCount from apache_beam.transforms.trigger import Repeatedly +from apache_beam.transforms.util import GcpHsmGeneratedSecret from apache_beam.transforms.util import GcpSecret from apache_beam.transforms.util import Secret from apache_beam.transforms.window import FixedWindows @@ -439,6 +440,124 @@ def test_gbek_gcp_secret_manager_throws(self): result, equal_to([('a', ([1, 2])), ('b', ([3])), ('c', ([4]))])) +@unittest.skipIf(secretmanager is None, 'GCP dependencies are not installed') +class GcpHsmGeneratedSecretTest(unittest.TestCase): + def setUp(self): + self.mock_secret_manager_client = mock.MagicMock() + self.mock_kms_client = mock.MagicMock() + + # Patch the clients + self.secretmanager_patcher = mock.patch( + 'google.cloud.secretmanager.SecretManagerServiceClient', + return_value=self.mock_secret_manager_client) + self.kms_patcher = mock.patch( + 'google.cloud.kms.KeyManagementServiceClient', + return_value=self.mock_kms_client) + self.os_urandom_patcher = mock.patch('os.urandom', return_value=b'0' * 32) + self.hkdf_patcher = mock.patch( + 'cryptography.hazmat.primitives.kdf.hkdf.HKDF.derive', + return_value=b'derived_key') + + self.secretmanager_patcher.start() + 
self.kms_patcher.start() + self.os_urandom_patcher.start() + self.hkdf_patcher.start() + + def tearDown(self): + self.secretmanager_patcher.stop() + self.kms_patcher.stop() + self.os_urandom_patcher.stop() + self.hkdf_patcher.stop() + + def test_happy_path_secret_creation(self): + from google.api_core import exceptions as api_exceptions + + project_id = 'test-project' + location_id = 'global' + key_ring_id = 'test-key-ring' + key_id = 'test-key' + job_name = 'test-job' + + secret = GcpHsmGeneratedSecret( + project_id, location_id, key_ring_id, key_id, job_name) + + # Mock responses for secret creation path + self.mock_secret_manager_client.access_secret_version.side_effect = [ + api_exceptions.NotFound('not found'), # first check + api_exceptions.NotFound('not found'), # second check + mock.MagicMock(payload=mock.MagicMock(data=b'derived_key')) + ] + self.mock_kms_client.encrypt.return_value = mock.MagicMock( + ciphertext=b'encrypted_nonce') + + secret_bytes = secret.get_secret_bytes() + self.assertEqual(secret_bytes, b'derived_key') + + # Assertions on mocks + secret_version_path = ( + f'projects/{project_id}/secrets/{secret._secret_version_name}' + '/versions/1') + self.mock_secret_manager_client.access_secret_version.assert_any_call( + request={'name': secret_version_path}) + self.assertEqual( + self.mock_secret_manager_client.access_secret_version.call_count, 3) + self.mock_secret_manager_client.create_secret.assert_called_once() + self.mock_kms_client.encrypt.assert_called_once() + self.mock_secret_manager_client.add_secret_version.assert_called_once() + + def test_secret_already_exists(self): + from google.api_core import exceptions as api_exceptions + + project_id = 'test-project' + location_id = 'global' + key_ring_id = 'test-key-ring' + key_id = 'test-key' + job_name = 'test-job' + + secret = GcpHsmGeneratedSecret( + project_id, location_id, key_ring_id, key_id, job_name) + + # Mock responses for secret creation path + 
self.mock_secret_manager_client.access_secret_version.side_effect = [ + api_exceptions.NotFound('not found'), + api_exceptions.NotFound('not found'), + mock.MagicMock(payload=mock.MagicMock(data=b'derived_key')) + ] + self.mock_secret_manager_client.create_secret.side_effect = ( + api_exceptions.AlreadyExists('exists')) + self.mock_kms_client.encrypt.return_value = mock.MagicMock( + ciphertext=b'encrypted_nonce') + + secret_bytes = secret.get_secret_bytes() + self.assertEqual(secret_bytes, b'derived_key') + + # Assertions on mocks + self.mock_secret_manager_client.create_secret.assert_called_once() + self.mock_secret_manager_client.add_secret_version.assert_called_once() + + def test_secret_version_already_exists(self): + project_id = 'test-project' + location_id = 'global' + key_ring_id = 'test-key-ring' + key_id = 'test-key' + job_name = 'test-job' + + secret = GcpHsmGeneratedSecret( + project_id, location_id, key_ring_id, key_id, job_name) + + self.mock_secret_manager_client.access_secret_version.return_value = ( + mock.MagicMock(payload=mock.MagicMock(data=b'existing_dek'))) + + secret_bytes = secret.get_secret_bytes() + self.assertEqual(secret_bytes, b'existing_dek') + + # Assertions + self.mock_secret_manager_client.access_secret_version.assert_called_once() + self.mock_secret_manager_client.create_secret.assert_not_called() + self.mock_secret_manager_client.add_secret_version.assert_not_called() + self.mock_kms_client.encrypt.assert_not_called() + + class FakeClock(object): def __init__(self, now=time.time()): self._now = now diff --git a/sdks/python/setup.py b/sdks/python/setup.py index ef58b4f9c760..c74afbb52d37 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -483,6 +483,7 @@ def get_portability_package_data(): 'google-cloud-spanner>=3.0.0,<4', # GCP Packages required by ML functionality 'google-cloud-dlp>=3.0.0,<4', + 'google-cloud-kms>=3.0.0,<4', 'google-cloud-language>=2.0,<3', 'google-cloud-secret-manager>=2.0,<3', 
'google-cloud-videointelligence>=2.0,<3', From c6d9aee37117109d651a376b540d7cb570f7b22e Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Wed, 3 Dec 2025 16:26:47 -0500 Subject: [PATCH 07/11] Fix python postcommit --- .github/trigger_files/beam_PostCommit_Python.json | 2 +- .../io/hdfs_integration_test/hdfs_integration_test.sh | 2 +- sdks/python/tox.ini | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/trigger_files/beam_PostCommit_Python.json b/.github/trigger_files/beam_PostCommit_Python.json index ed56f65ef50f..c6ec17f48412 100644 --- a/.github/trigger_files/beam_PostCommit_Python.json +++ b/.github/trigger_files/beam_PostCommit_Python.json @@ -1,5 +1,5 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run.", - "modification": 32 + "modification": 35 } diff --git a/sdks/python/apache_beam/io/hdfs_integration_test/hdfs_integration_test.sh b/sdks/python/apache_beam/io/hdfs_integration_test/hdfs_integration_test.sh index 98cf4f74e4ab..7d272550ce51 100755 --- a/sdks/python/apache_beam/io/hdfs_integration_test/hdfs_integration_test.sh +++ b/sdks/python/apache_beam/io/hdfs_integration_test/hdfs_integration_test.sh @@ -40,7 +40,7 @@ cp -r ${ROOT_DIR}/sdks/python ${CONTEXT_DIR}/sdks/ cp -r ${ROOT_DIR}/model ${CONTEXT_DIR}/ # Use a unique name to allow concurrent runs on the same machine. 
-PROJECT_NAME=$(echo hdfs_IT-${BUILD_TAG:-non-jenkins}) +PROJECT_NAME=$(echo hdfs_it-${BUILD_TAG:-non-gha}) if [ -z "${BUILD_TAG:-}" ]; then COLOR_OPT="" diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini index 431cd186c1b9..52fd82d41153 100644 --- a/sdks/python/tox.ini +++ b/sdks/python/tox.ini @@ -228,7 +228,7 @@ deps = holdup==1.8.0 extras = gcp - hdfs + hadoop allowlist_externals = bash echo From 5c92498399cc651923910dd77d4c9f1401dbfe67 Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Thu, 4 Dec 2025 17:56:46 -0500 Subject: [PATCH 08/11] Enforce hypothesis version upper bound (#36952) (#36998) Co-authored-by: Shunping Huang --- sdks/python/setup.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sdks/python/setup.py b/sdks/python/setup.py index c74afbb52d37..176c84c9966b 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -453,7 +453,9 @@ def get_portability_package_data(): 'psycopg2-binary>=2.8.5,<3.0', 'testcontainers[mysql,kafka,milvus]>=4.0.0,<5.0.0', 'cryptography>=41.0.2', - 'hypothesis>5.0.0,<7.0.0', + # TODO(https://github.com/apache/beam/issues/36951): need to + # further investigate the cause + 'hypothesis>5.0.0,<6.148.4', 'virtualenv-clone>=0.5,<1.0', 'python-tds>=1.16.1', 'sqlalchemy-pytds>=1.0.2', From fecd5e12008e5d8edae5c8a72f9a42c86a700321 Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Fri, 5 Dec 2025 10:37:15 -0500 Subject: [PATCH 09/11] Fix arm deps (#36997) --- .github/trigger_files/beam_PostCommit_Python.json | 2 +- sdks/python/container/ml/py310/base_image_requirements.txt | 1 + sdks/python/container/ml/py310/gpu_image_requirements.txt | 1 + sdks/python/container/ml/py311/base_image_requirements.txt | 1 + sdks/python/container/ml/py311/gpu_image_requirements.txt | 1 + sdks/python/container/ml/py312/base_image_requirements.txt | 1 + sdks/python/container/ml/py312/gpu_image_requirements.txt | 1 + sdks/python/container/ml/py313/base_image_requirements.txt | 1 + 
sdks/python/container/py310/base_image_requirements.txt | 1 + sdks/python/container/py311/base_image_requirements.txt | 1 + sdks/python/container/py312/base_image_requirements.txt | 1 + sdks/python/container/py313/base_image_requirements.txt | 1 + 12 files changed, 12 insertions(+), 1 deletion(-) diff --git a/.github/trigger_files/beam_PostCommit_Python.json b/.github/trigger_files/beam_PostCommit_Python.json index c6ec17f48412..8b8054dac42b 100644 --- a/.github/trigger_files/beam_PostCommit_Python.json +++ b/.github/trigger_files/beam_PostCommit_Python.json @@ -1,5 +1,5 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run.", - "modification": 35 + "modification": 36 } diff --git a/sdks/python/container/ml/py310/base_image_requirements.txt b/sdks/python/container/ml/py310/base_image_requirements.txt index d4d0bdbb1686..e9dfdf83e5db 100644 --- a/sdks/python/container/ml/py310/base_image_requirements.txt +++ b/sdks/python/container/ml/py310/base_image_requirements.txt @@ -74,6 +74,7 @@ google-cloud-bigtable==2.34.0 google-cloud-core==2.5.0 google-cloud-datastore==2.21.0 google-cloud-dlp==3.33.0 +google-cloud-kms==3.7.0 google-cloud-language==2.18.0 google-cloud-profiler==4.1.0 google-cloud-pubsub==2.33.0 diff --git a/sdks/python/container/ml/py310/gpu_image_requirements.txt b/sdks/python/container/ml/py310/gpu_image_requirements.txt index 3ece38b033ff..c09fd258fbd2 100644 --- a/sdks/python/container/ml/py310/gpu_image_requirements.txt +++ b/sdks/python/container/ml/py310/gpu_image_requirements.txt @@ -91,6 +91,7 @@ google-cloud-bigtable==2.34.0 google-cloud-core==2.5.0 google-cloud-datastore==2.21.0 google-cloud-dlp==3.33.0 +google-cloud-kms==3.7.0 google-cloud-language==2.18.0 google-cloud-profiler==4.1.0 google-cloud-pubsub==2.33.0 diff --git a/sdks/python/container/ml/py311/base_image_requirements.txt b/sdks/python/container/ml/py311/base_image_requirements.txt index 0d9874d52640..be1c86d408dc 100644 --- 
a/sdks/python/container/ml/py311/base_image_requirements.txt +++ b/sdks/python/container/ml/py311/base_image_requirements.txt @@ -72,6 +72,7 @@ google-cloud-bigtable==2.34.0 google-cloud-core==2.5.0 google-cloud-datastore==2.21.0 google-cloud-dlp==3.33.0 +google-cloud-kms==3.7.0 google-cloud-language==2.18.0 google-cloud-profiler==4.1.0 google-cloud-pubsub==2.33.0 diff --git a/sdks/python/container/ml/py311/gpu_image_requirements.txt b/sdks/python/container/ml/py311/gpu_image_requirements.txt index e9c281dc0f4a..21835dcc5d46 100644 --- a/sdks/python/container/ml/py311/gpu_image_requirements.txt +++ b/sdks/python/container/ml/py311/gpu_image_requirements.txt @@ -89,6 +89,7 @@ google-cloud-bigtable==2.34.0 google-cloud-core==2.5.0 google-cloud-datastore==2.21.0 google-cloud-dlp==3.33.0 +google-cloud-kms==3.7.0 google-cloud-language==2.18.0 google-cloud-profiler==4.1.0 google-cloud-pubsub==2.33.0 diff --git a/sdks/python/container/ml/py312/base_image_requirements.txt b/sdks/python/container/ml/py312/base_image_requirements.txt index 2e9ddec66570..8dab6a2c2b78 100644 --- a/sdks/python/container/ml/py312/base_image_requirements.txt +++ b/sdks/python/container/ml/py312/base_image_requirements.txt @@ -71,6 +71,7 @@ google-cloud-bigtable==2.34.0 google-cloud-core==2.5.0 google-cloud-datastore==2.21.0 google-cloud-dlp==3.33.0 +google-cloud-kms==3.7.0 google-cloud-language==2.18.0 google-cloud-profiler==4.1.0 google-cloud-pubsub==2.33.0 diff --git a/sdks/python/container/ml/py312/gpu_image_requirements.txt b/sdks/python/container/ml/py312/gpu_image_requirements.txt index e0c1b617f3f0..15932bac8ba3 100644 --- a/sdks/python/container/ml/py312/gpu_image_requirements.txt +++ b/sdks/python/container/ml/py312/gpu_image_requirements.txt @@ -88,6 +88,7 @@ google-cloud-bigtable==2.34.0 google-cloud-core==2.5.0 google-cloud-datastore==2.21.0 google-cloud-dlp==3.33.0 +google-cloud-kms==3.7.0 google-cloud-language==2.18.0 google-cloud-profiler==4.1.0 google-cloud-pubsub==2.33.0 diff 
--git a/sdks/python/container/ml/py313/base_image_requirements.txt b/sdks/python/container/ml/py313/base_image_requirements.txt index 59eb65d6ee59..dd53fb738ea8 100644 --- a/sdks/python/container/ml/py313/base_image_requirements.txt +++ b/sdks/python/container/ml/py313/base_image_requirements.txt @@ -70,6 +70,7 @@ google-cloud-bigtable==2.34.0 google-cloud-core==2.5.0 google-cloud-datastore==2.21.0 google-cloud-dlp==3.33.0 +google-cloud-kms==3.7.0 google-cloud-language==2.18.0 google-cloud-pubsub==2.33.0 google-cloud-pubsublite==1.12.0 diff --git a/sdks/python/container/py310/base_image_requirements.txt b/sdks/python/container/py310/base_image_requirements.txt index 67bd2226557b..a4333267967b 100644 --- a/sdks/python/container/py310/base_image_requirements.txt +++ b/sdks/python/container/py310/base_image_requirements.txt @@ -68,6 +68,7 @@ google-cloud-bigtable==2.34.0 google-cloud-core==2.5.0 google-cloud-datastore==2.21.0 google-cloud-dlp==3.33.0 +google-cloud-kms==3.7.0 google-cloud-language==2.18.0 google-cloud-profiler==4.1.0 google-cloud-pubsub==2.33.0 diff --git a/sdks/python/container/py311/base_image_requirements.txt b/sdks/python/container/py311/base_image_requirements.txt index 766c8f47706e..36b439965cc6 100644 --- a/sdks/python/container/py311/base_image_requirements.txt +++ b/sdks/python/container/py311/base_image_requirements.txt @@ -66,6 +66,7 @@ google-cloud-bigtable==2.34.0 google-cloud-core==2.5.0 google-cloud-datastore==2.21.0 google-cloud-dlp==3.33.0 +google-cloud-kms==3.7.0 google-cloud-language==2.18.0 google-cloud-profiler==4.1.0 google-cloud-pubsub==2.33.0 diff --git a/sdks/python/container/py312/base_image_requirements.txt b/sdks/python/container/py312/base_image_requirements.txt index b97dbaeeb8e3..f85423f92e79 100644 --- a/sdks/python/container/py312/base_image_requirements.txt +++ b/sdks/python/container/py312/base_image_requirements.txt @@ -65,6 +65,7 @@ google-cloud-bigtable==2.34.0 google-cloud-core==2.5.0 
google-cloud-datastore==2.21.0 google-cloud-dlp==3.33.0 +google-cloud-kms==3.7.0 google-cloud-language==2.18.0 google-cloud-profiler==4.1.0 google-cloud-pubsub==2.33.0 diff --git a/sdks/python/container/py313/base_image_requirements.txt b/sdks/python/container/py313/base_image_requirements.txt index 423a5eabc862..aed8dd46345c 100644 --- a/sdks/python/container/py313/base_image_requirements.txt +++ b/sdks/python/container/py313/base_image_requirements.txt @@ -64,6 +64,7 @@ google-cloud-bigtable==2.34.0 google-cloud-core==2.5.0 google-cloud-datastore==2.21.0 google-cloud-dlp==3.33.0 +google-cloud-kms==3.7.0 google-cloud-language==2.18.0 google-cloud-pubsub==2.33.0 google-cloud-pubsublite==1.12.0 From a83a5369938d83f10e7760bacfb1423b76de6e62 Mon Sep 17 00:00:00 2001 From: Jiang Zhu Date: Fri, 5 Dec 2025 06:46:32 -0800 Subject: [PATCH 10/11] Temporary disable a few unit tests in SpannerChangeStreamErrorTest (#37003) --- .../spanner/changestreams/SpannerChangeStreamErrorTest.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/SpannerChangeStreamErrorTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/SpannerChangeStreamErrorTest.java index 8adc927b4f29..835ca0a0f5a8 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/SpannerChangeStreamErrorTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/SpannerChangeStreamErrorTest.java @@ -70,6 +70,7 @@ import org.joda.time.Duration; import org.junit.After; import org.junit.Before; +import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -115,6 +116,7 @@ public void tearDown() throws NoSuchFieldException, IllegalAccessException { } @Test + @Ignore("https://github.com/apache/beam/issues/37002 
Re-enable skipped tests.") // Error code UNAVAILABLE is retried repeatedly until the RPC times out. public void testUnavailableExceptionRetries() throws InterruptedException { DirectOptions options = PipelineOptionsFactory.as(DirectOptions.class); @@ -155,6 +157,7 @@ public void testUnavailableExceptionRetries() throws InterruptedException { } @Test + @Ignore("https://github.com/apache/beam/issues/37002 Re-enable skipped tests.") // Error code ABORTED is retried repeatedly until it times out. public void testAbortedExceptionRetries() throws InterruptedException { mockSpannerService.setExecuteStreamingSqlExecutionTime( @@ -218,6 +221,7 @@ public void testUnknownExceptionDoesNotRetry() { } @Test + @Ignore("https://github.com/apache/beam/issues/37002 Re-enable skipped tests.") // Error code RESOURCE_EXHAUSTED is retried repeatedly. public void testResourceExhaustedRetry() { mockSpannerService.setExecuteStreamingSqlExecutionTime( @@ -281,6 +285,7 @@ public void testResourceExhaustedRetryWithDefaultSettings() { } @Test + @Ignore("https://github.com/apache/beam/issues/37002 Re-enable skipped tests.") public void testInvalidRecordReceived() { final Timestamp startTimestamp = Timestamp.ofTimeSecondsAndNanos(0, 1000); final Timestamp endTimestamp = From 9abc6906b45352c013156a00eb7b0efb363d0572 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Fri, 5 Dec 2025 15:10:38 -0500 Subject: [PATCH 11/11] Update spanner client version. (#36995) * Update spanner client version. * Edit comments with a new issue id. 
--- .../groovy/org/apache/beam/gradle/BeamModulePlugin.groovy | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 595741f8efe4..5803f8b19c46 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -612,8 +612,8 @@ class BeamModulePlugin implements Plugin { def google_ads_version = "33.0.0" def google_clients_version = "2.0.0" def google_cloud_bigdataoss_version = "2.2.26" - // [bomupgrader] TODO(#35868): currently pinned, should be determined by: com.google.cloud:google-cloud-spanner, consistent with: google_cloud_platform_libraries_bom - def google_cloud_spanner_version = "6.95.1" + // [bomupgrader] TODO(#37008): currently pinned before gcp-bom moving beyond this version, should be determined by: com.google.cloud:google-cloud-spanner, consistent with: google_cloud_platform_libraries_bom + def google_cloud_spanner_version = "6.104.0" def google_code_gson_version = "2.10.1" def google_oauth_clients_version = "1.34.1" // [bomupgrader] determined by: io.grpc:grpc-netty, consistent with: google_cloud_platform_libraries_bom