2 changes: 1 addition & 1 deletion .github/trigger_files/beam_PostCommit_Python.json
@@ -1,5 +1,5 @@
{
  "comment": "Modify this file in a trivial way to cause this test suite to run.",
-  "modification": 32
+  "modification": 36
}

@@ -1,4 +1,4 @@
{
  "comment": "Modify this file in a trivial way to cause this test suite to run",
-  "modification": 1
-}
+  "modification": 2
+}
@@ -16,11 +16,13 @@
# TODO(https://github.com/apache/beam/issues/32492): re-enable the suite
# on cron and add release/trigger_all_tests.json to trigger path once fixed.

-name: PostCommit XVR GoUsingJava Dataflow (disabled)
+name: PostCommit XVR GoUsingJava Dataflow

on:
+  schedule:
+    - cron: '45 5/6 * * *'
  pull_request_target:
-    paths: ['.github/trigger_files/beam_PostCommit_XVR_GoUsingJava_Dataflow.json']
+    paths: ['.github/trigger_files/beam_PostCommit_XVR_GoUsingJava_Dataflow.json', 'release/trigger_all_tests.json']
  workflow_dispatch:

#Setting explicit permissions for the action to avoid the default permissions which are `write-all` in case of pull_request_target event
@@ -83,6 +85,8 @@ jobs:
        uses: ./.github/actions/gradle-command-self-hosted-action
        with:
          gradle-command: :runners:google-cloud-dataflow-java:validatesCrossLanguageRunnerGoUsingJava
+          arguments: |
+            -PuseDockerBuildx
      - name: Archive JUnit Test Results
        uses: actions/upload-artifact@v4
        if: ${{ !success() }}
7 changes: 5 additions & 2 deletions CHANGES.md
@@ -81,7 +81,7 @@ Now Beam has full support for Milvus integration including Milvus enrichment and

## Breaking Changes

-* X behavior was changed ([#X](https://github.com/apache/beam/issues/X)).
+* (Python) Some Python dependencies have been split out into extras. To ensure all previously installed dependencies are installed, when installing Beam you can `pip install apache-beam[gcp,interactive,yaml,redis,hadoop,tfrecord]`, though most users will not need all of these extras ([#34554](https://github.com/apache/beam/issues/34554)).

## Deprecations

@@ -123,7 +123,7 @@ Now Beam has full support for Milvus integration including Milvus enrichment and
  - This change only affects pipelines that explicitly use the `pickle_library=dill` pipeline option.
  - While `dill==0.3.1.1` is still pre-installed on the official Beam SDK base images, it is no longer a direct dependency of the apache-beam Python package. This means it can be overridden by other dependencies in your environment.
  - If your pipeline uses `pickle_library=dill`, you must manually ensure `dill==0.3.1.1` is installed in both your submission and runtime environments.
-    - Submission environment: Install the dill extra in your local environment `pip install apache-beam[gcpdill]`.
+    - Submission environment: Install the dill extra in your local environment `pip install apache-beam[gcp,dill]`.
    - Runtime (worker) environment: Your action depends on how you manage your worker's environment.
      - If using default containers or custom containers with the official Beam base image e.g. `FROM apache/beam_python3.10_sdk:2.69.0`
        - Add `dill==0.3.1.1` to your worker's requirements file (e.g., requirements.txt)
@@ -137,6 +137,9 @@ Now Beam has full support for Milvus integration including Milvus enrichment and
* (Python) The deterministic fallback coder for complex types like NamedTuple, Enum, and dataclasses now normalizes filepaths for better determinism guarantees. This affects streaming pipelines updating from 2.68 to 2.69 that use this fallback coder. If your pipeline is affected, you may see a warning like: "Using fallback deterministic coder for type X...". To update safely, specify the pipeline option `--update_compatibility_version=2.68.0` ([#36345](https://github.com/apache/beam/pull/36345)).
* (Python) Fixed transform naming conflict when executing DataTransform on a dictionary of PColls ([#30445](https://github.com/apache/beam/issues/30445)).
  This may break update compatibility if you don't provide a `--transform_name_mapping`.
+* (Python) Split some extras out from the core Beam package ([#30445](https://github.com/apache/beam/issues/30445)).
+  - If you use Enrichment with Redis, Hadoop FileSystem, TFRecord, or some other packages, you may need to install some extras.
+  - To retain identical behavior to before, instead of `pip install apache-beam`, use `pip install apache-beam[hadoop,gcp,interactive,redis,test,tfrecord]`.
* Removed deprecated Hadoop versions (2.10.2 and 3.2.4) that are no longer supported for [Iceberg](https://github.com/apache/iceberg/issues/10940) from IcebergIO ([#36282](https://github.com/apache/beam/issues/36282)).
* (Go) Coder construction on SDK side is more faithful to the specs from runners without stripping length-prefix. This may break streaming pipeline update as the underlying coder could be changed ([#36387](https://github.com/apache/beam/issues/36387)).
* Minimum Go version for Beam Go updated to 1.25.2 ([#36461](https://github.com/apache/beam/issues/36461)).
@@ -612,8 +612,8 @@ class BeamModulePlugin implements Plugin<Project> {
    def google_ads_version = "33.0.0"
    def google_clients_version = "2.0.0"
    def google_cloud_bigdataoss_version = "2.2.26"
-    // [bomupgrader] TODO(#35868): currently pinned, should be determined by: com.google.cloud:google-cloud-spanner, consistent with: google_cloud_platform_libraries_bom
-    def google_cloud_spanner_version = "6.95.1"
+    // [bomupgrader] TODO(#37008): currently pinned before gcp-bom moving beyond this version, should be determined by: com.google.cloud:google-cloud-spanner, consistent with: google_cloud_platform_libraries_bom
+    def google_cloud_spanner_version = "6.104.0"
    def google_code_gson_version = "2.10.1"
    def google_oauth_clients_version = "1.34.1"
    // [bomupgrader] determined by: io.grpc:grpc-netty, consistent with: google_cloud_platform_libraries_bom
@@ -755,6 +755,7 @@ class BeamModulePlugin implements Plugin<Project> {
        google_cloud_dataflow_java_proto_library_all: "com.google.cloud.dataflow:google-cloud-dataflow-java-proto-library-all:0.5.160304",
        google_cloud_datastore_v1_proto_client : "com.google.cloud.datastore:datastore-v1-proto-client:2.32.3", // [bomupgrader] sets version
        google_cloud_firestore : "com.google.cloud:google-cloud-firestore", // google_cloud_platform_libraries_bom sets version
+        google_cloud_kms : "com.google.cloud:google-cloud-kms", // google_cloud_platform_libraries_bom sets version
        google_cloud_pubsub : "com.google.cloud:google-cloud-pubsub", // google_cloud_platform_libraries_bom sets version
        google_cloud_pubsublite : "com.google.cloud:google-cloud-pubsublite", // google_cloud_platform_libraries_bom sets version
        // [bomupgrader] the BOM version is set by scripts/tools/bomupgrader.py. If update manually, also update
@@ -765,6 +766,7 @@ class BeamModulePlugin implements Plugin<Project> {
        google_cloud_spanner_bom : "com.google.cloud:google-cloud-spanner-bom:$google_cloud_spanner_version",
        google_cloud_spanner : "com.google.cloud:google-cloud-spanner", // google_cloud_platform_libraries_bom sets version
        google_cloud_spanner_test : "com.google.cloud:google-cloud-spanner:$google_cloud_spanner_version:tests",
+        google_cloud_tink : "com.google.crypto.tink:tink:1.19.0",
        google_cloud_vertexai : "com.google.cloud:google-cloud-vertexai", // google_cloud_platform_libraries_bom sets version
        google_code_gson : "com.google.code.gson:gson:$google_code_gson_version",
        // google-http-client's version is explicitly declared for sdks/java/maven-archetypes/examples
@@ -866,6 +868,7 @@ class BeamModulePlugin implements Plugin<Project> {
        proto_google_cloud_datacatalog_v1beta1 : "com.google.api.grpc:proto-google-cloud-datacatalog-v1beta1", // google_cloud_platform_libraries_bom sets version
        proto_google_cloud_datastore_v1 : "com.google.api.grpc:proto-google-cloud-datastore-v1", // google_cloud_platform_libraries_bom sets version
        proto_google_cloud_firestore_v1 : "com.google.api.grpc:proto-google-cloud-firestore-v1", // google_cloud_platform_libraries_bom sets version
+        proto_google_cloud_kms_v1 : "com.google.api.grpc:proto-google-cloud-kms-v1", // google_cloud_platform_libraries_bom sets version
        proto_google_cloud_pubsub_v1 : "com.google.api.grpc:proto-google-cloud-pubsub-v1", // google_cloud_platform_libraries_bom sets version
        proto_google_cloud_pubsublite_v1 : "com.google.api.grpc:proto-google-cloud-pubsublite-v1", // google_cloud_platform_libraries_bom sets version
        proto_google_cloud_secret_manager_v1 : "com.google.api.grpc:proto-google-cloud-secretmanager-v1", // google_cloud_platform_libraries_bom sets version
4 changes: 2 additions & 2 deletions runners/google-cloud-dataflow-java/build.gradle
@@ -52,8 +52,8 @@ evaluationDependsOn(":sdks:java:container:java11")

ext.dataflowLegacyEnvironmentMajorVersion = '8'
ext.dataflowFnapiEnvironmentMajorVersion = '8'
-ext.dataflowLegacyContainerVersion = 'beam-master-20251107'
-ext.dataflowFnapiContainerVersion = 'beam-master-20251107'
+ext.dataflowLegacyContainerVersion = '2.70.0'
+ext.dataflowFnapiContainerVersion = '2.70.0'
ext.dataflowContainerBaseRepository = 'gcr.io/cloud-dataflow/v1beta3'

processResources {
3 changes: 3 additions & 0 deletions sdks/go/test/integration/integration.go
@@ -301,6 +301,9 @@ var dataflowFilters = []string{
	// There is no infrastructure for running KafkaIO tests with Dataflow.
	"TestKafkaIO.*",
	"TestSpannerIO.*",
+	// TODO(https://github.com/apache/beam/issues/36918): These tests are currently failing on the Dataflow runner.
+	"TestBigQueryIO.*",
+	"TestBigtableIO.*",
	// Dataflow doesn't support any test that requires loopback.
	// Eg. For FileIO examples.
	".*Loopback.*",
@@ -57,6 +57,7 @@
  <!-- gRPC/protobuf exceptions -->
  <!-- Non-vendored gRPC/protobuf imports are allowed for files that depend on libraries that expose gRPC/protobuf in their public API -->
  <suppress id="ForbidNonVendoredGrpcProtobuf" files=".*sdk.*extensions.*protobuf.*" />
+  <suppress id="ForbidNonVendoredGrpcProtobuf" files=".*sdk.*core.*GcpHsmGeneratedSecret.*" />
  <suppress id="ForbidNonVendoredGrpcProtobuf" files=".*sdk.*core.*GroupByEncryptedKeyTest.*" />
  <suppress id="ForbidNonVendoredGrpcProtobuf" files=".*sdk.*core.*GroupByKeyTest.*" />
  <suppress id="ForbidNonVendoredGrpcProtobuf" files=".*sdk.*core.*GroupByKeyIT.*" />
6 changes: 6 additions & 0 deletions sdks/java/core/build.gradle
@@ -102,6 +102,10 @@ dependencies {
  shadow library.java.snappy_java
  shadow library.java.joda_time
  implementation enforcedPlatform(library.java.google_cloud_platform_libraries_bom)
+  implementation library.java.gax
+  implementation library.java.google_cloud_kms
+  implementation library.java.proto_google_cloud_kms_v1
+  implementation library.java.google_cloud_tink
  implementation library.java.google_cloud_secret_manager
  implementation library.java.proto_google_cloud_secret_manager_v1
  implementation library.java.protobuf_java
@@ -130,6 +134,8 @@ dependencies {
  shadowTest library.java.log4j2_api
  shadowTest library.java.jamm
  shadowTest 'com.google.cloud:google-cloud-secretmanager:2.75.0'
+  shadowTest 'com.google.cloud:google-cloud-kms:2.75.0'
+  shadowTest 'com.google.crypto.tink:tink:1.19.0'
  testRuntimeOnly library.java.slf4j_jdk14
}

@@ -0,0 +1,191 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.sdk.util;

import com.google.api.gax.rpc.AlreadyExistsException;
import com.google.api.gax.rpc.NotFoundException;
import com.google.cloud.kms.v1.CryptoKeyName;
import com.google.cloud.kms.v1.EncryptResponse;
import com.google.cloud.kms.v1.KeyManagementServiceClient;
import com.google.cloud.secretmanager.v1.AccessSecretVersionResponse;
import com.google.cloud.secretmanager.v1.ProjectName;
import com.google.cloud.secretmanager.v1.Replication;
import com.google.cloud.secretmanager.v1.SecretManagerServiceClient;
import com.google.cloud.secretmanager.v1.SecretName;
import com.google.cloud.secretmanager.v1.SecretPayload;
import com.google.cloud.secretmanager.v1.SecretVersionName;
import com.google.crypto.tink.subtle.Hkdf;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import java.util.Base64;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link org.apache.beam.sdk.util.Secret} manager implementation that generates a secret using
 * entropy from a GCP HSM key and stores it in Google Cloud Secret Manager. If the secret already
 * exists, it will be retrieved.
 */
public class GcpHsmGeneratedSecret implements Secret {
  private static final Logger LOG = LoggerFactory.getLogger(GcpHsmGeneratedSecret.class);
  private final String projectId;
  private final String locationId;
  private final String keyRingId;
  private final String keyId;
  private final String secretId;

  private final SecureRandom random = new SecureRandom();

  public GcpHsmGeneratedSecret(
      String projectId, String locationId, String keyRingId, String keyId, String jobName) {
    this.projectId = projectId;
    this.locationId = locationId;
    this.keyRingId = keyRingId;
    this.keyId = keyId;
    this.secretId = "HsmGeneratedSecret_" + jobName;
  }

  /**
   * Returns the secret as a byte array. Assumes that the current active service account has
   * permissions to read the secret.
   *
   * @return The secret as a byte array.
   */
  @Override
  public byte[] getSecretBytes() {
    try (SecretManagerServiceClient client = SecretManagerServiceClient.create()) {
      SecretVersionName secretVersionName = SecretVersionName.of(projectId, secretId, "1");

      try {
        AccessSecretVersionResponse response = client.accessSecretVersion(secretVersionName);
        return response.getPayload().getData().toByteArray();
      } catch (NotFoundException e) {
        LOG.info(
            "Secret version {} not found. Creating new secret and version.",
            secretVersionName.toString());
      }

      ProjectName projectName = ProjectName.of(projectId);
      SecretName secretName = SecretName.of(projectId, secretId);
      try {
        com.google.cloud.secretmanager.v1.Secret secret =
            com.google.cloud.secretmanager.v1.Secret.newBuilder()
                .setReplication(
                    Replication.newBuilder()
                        .setAutomatic(Replication.Automatic.newBuilder().build()))
                .build();
        client.createSecret(projectName, secretId, secret);
      } catch (AlreadyExistsException e) {
        LOG.info("Secret {} already exists. Adding new version.", secretName.toString());
      }

      byte[] newKey = generateDek();

      try {
        // Always retrieve the remote secret as the source of truth, in case another thread
        // created it first.
        AccessSecretVersionResponse response = client.accessSecretVersion(secretVersionName);
        return response.getPayload().getData().toByteArray();
      } catch (NotFoundException e) {
        LOG.info(
            "Secret version {} not found after re-check. Creating new secret and version.",
            secretVersionName.toString());
      }

      SecretPayload payload =
          SecretPayload.newBuilder().setData(ByteString.copyFrom(newKey)).build();
      client.addSecretVersion(secretName, payload);
      AccessSecretVersionResponse response = client.accessSecretVersion(secretVersionName);
      return response.getPayload().getData().toByteArray();

    } catch (IOException | GeneralSecurityException e) {
      throw new RuntimeException("Failed to retrieve or create secret bytes", e);
    }
  }

  private byte[] generateDek() throws IOException, GeneralSecurityException {
    int dekSize = 32;
    try (KeyManagementServiceClient client = KeyManagementServiceClient.create()) {
      // 1. Generate nonce_one locally. This value does not need strong randomness, since the
      // actual entropy comes from the KMS encryption below.
      byte[] nonceOne = new byte[dekSize];
      random.nextBytes(nonceOne);

      // 2. Encrypt nonce_one with the HSM-backed key to get nonce_two.
      CryptoKeyName keyName = CryptoKeyName.of(projectId, locationId, keyRingId, keyId);
      EncryptResponse response = client.encrypt(keyName, ByteString.copyFrom(nonceOne));
      byte[] nonceTwo = response.getCiphertext().toByteArray();

      // 3. Generate the local input keying material (dk).
      byte[] dk = new byte[dekSize];
      random.nextBytes(dk);

      // 4. Derive the DEK using HKDF, with nonce_two as the salt.
      byte[] dek = Hkdf.computeHkdf("HmacSha256", dk, nonceTwo, new byte[0], dekSize);

      // 5. Return the DEK URL-safe Base64-encoded.
      return Base64.getUrlEncoder().encode(dek);
    }
  }

  /**
   * Returns the project ID of the secret.
   *
   * @return The project ID as a String.
   */
  public String getProjectId() {
    return projectId;
  }

  /**
   * Returns the location ID of the secret.
   *
   * @return The location ID as a String.
   */
  public String getLocationId() {
    return locationId;
  }

  /**
   * Returns the key ring ID of the secret.
   *
   * @return The key ring ID as a String.
   */
  public String getKeyRingId() {
    return keyRingId;
  }

  /**
   * Returns the key ID of the secret.
   *
   * @return The key ID as a String.
   */
  public String getKeyId() {
    return keyId;
  }

  /**
   * Returns the secret ID of the secret.
   *
   * @return The secret ID as a String.
   */
  public String getSecretId() {
    return secretId;
  }
}
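
For orientation, here is a minimal, hypothetical usage sketch of the class above. The project, location, key-ring, key, and job names are illustrative placeholders (not values from the Beam codebase), and the calls assume the active service account has the necessary Secret Manager and Cloud KMS permissions:

import java.util.Base64;
import org.apache.beam.sdk.util.GcpHsmGeneratedSecret;
import org.apache.beam.sdk.util.Secret;

public class GcpHsmGeneratedSecretExample {
  public static void main(String[] args) {
    // All identifiers below are hypothetical placeholders.
    Secret secret =
        new GcpHsmGeneratedSecret(
            "my-project", "us-central1", "my-key-ring", "my-hsm-key", "my-job");

    // The first call generates the DEK and stores it in Secret Manager; subsequent
    // calls (including from other workers) read back the stored version "1".
    byte[] secretBytes = secret.getSecretBytes();

    // generateDek() stores the DEK URL-safe Base64-encoded, so decode it to recover
    // the raw 32-byte key material.
    byte[] rawKey = Base64.getUrlDecoder().decode(secretBytes);
    System.out.println("Derived key length: " + rawKey.length + " bytes");
  }
}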