From 4a18cbe3675447043b939daa1dd7d6e2e73cc864 Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Wed, 18 Dec 2024 15:14:40 -0500 Subject: [PATCH 01/43] [WIP] Update xlang kinesis to v2 --- .../trigger_files/beam_PostCommit_Python.json | 2 +- .../expansion-service/build.gradle | 39 +++ .../kinesis/KinesisTransformRegistrar.java | 267 ++++++++++++++++++ sdks/python/apache_beam/io/kinesis.py | 46 ++- .../python/test-suites/portable/common.gradle | 4 +- settings.gradle.kts | 1 + 6 files changed, 344 insertions(+), 15 deletions(-) create mode 100644 sdks/java/io/amazon-web-services2/expansion-service/build.gradle create mode 100644 sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java diff --git a/.github/trigger_files/beam_PostCommit_Python.json b/.github/trigger_files/beam_PostCommit_Python.json index 9c7a70ceed74..dd3d3e011a0c 100644 --- a/.github/trigger_files/beam_PostCommit_Python.json +++ b/.github/trigger_files/beam_PostCommit_Python.json @@ -1,5 +1,5 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run.", - "modification": 7 + "modification": 8 } diff --git a/sdks/java/io/amazon-web-services2/expansion-service/build.gradle b/sdks/java/io/amazon-web-services2/expansion-service/build.gradle new file mode 100644 index 000000000000..fd712737f53c --- /dev/null +++ b/sdks/java/io/amazon-web-services2/expansion-service/build.gradle @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +apply plugin: 'org.apache.beam.module' +apply plugin: 'application' +mainClassName = "org.apache.beam.sdk.expansion.service.ExpansionService" + +applyJavaNature( + automaticModuleName: 'org.apache.beam.sdk.io.amazon-web-services2.expansion.service', + exportJavadoc: false, + validateShadowJar: false, + shadowClosure: {}, +) + +description = "Apache Beam :: SDKs :: Java :: IO :: Amazon Web Services 2 :: Expansion Service" +ext.summary = "Expansion service serving AWS2" + +dependencies { + implementation project(":sdks:java:expansion-service") + permitUnusedDeclared project(":sdks:java:expansion-service") + implementation project(":sdks:java:io:amazon-web-services2") + permitUnusedDeclared project(":sdks:java:io:amazon-web-services2") + runtimeOnly library.java.slf4j_jdk14 +} \ No newline at end of file diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java new file mode 100644 index 000000000000..85f5c9f2db96 --- /dev/null +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.aws2.kinesis; + +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.kinesis.common.InitialPositionInStream; +import com.google.auto.service.AutoService; +import java.util.Map; +import java.util.Properties; +import org.apache.beam.sdk.expansion.ExternalTransformRegistrar; +import org.apache.beam.sdk.io.aws2.kinesis.KinesisIO; +import org.apache.beam.sdk.transforms.ExternalTransformBuilder; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Duration; +import org.joda.time.Instant; + +/** + * Exposes {@link KinesisIO.Write} and {@link KinesisIO.Read} as an external transform for + * cross-language usage. + */ +@AutoService(ExternalTransformRegistrar.class) +@SuppressWarnings({ + "nullness" // TODO(https://github.com/apache/beam/issues/20497) +}) +public class KinesisTransformRegistrar implements ExternalTransformRegistrar { + public static final String WRITE_URN = "beam:transform:org.apache.beam:kinesis_write:v2"; + public static final String READ_DATA_URN = "beam:transform:org.apache.beam:kinesis_read_data:v2"; + + @Override + public Map> knownBuilderInstances() { + return ImmutableMap.of(WRITE_URN, new WriteBuilder(), READ_DATA_URN, new ReadDataBuilder()); + } + + private abstract static class CrossLanguageConfiguration { + String streamName; + String awsAccessKey; + String awsSecretKey; + Region region; + @Nullable String serviceEndpoint; + + public void setStreamName(String streamName) { + this.streamName = streamName; + } + + public void setAwsAccessKey(String awsAccessKey) { + this.awsAccessKey = awsAccessKey; + } + + public void setAwsSecretKey(String awsSecretKey) { + this.awsSecretKey = awsSecretKey; + } + + public void setRegion(String region) { + this.region = Region.of(region); + } + + public void setServiceEndpoint(@Nullable String serviceEndpoint) { + this.serviceEndpoint = serviceEndpoint; + } + } + + public static class WriteBuilder + implements ExternalTransformBuilder, PDone> { + + public static class Configuration extends CrossLanguageConfiguration { + private Properties producerProperties; + private String partitionKey; + + public void setProducerProperties(Map producerProperties) { + if (producerProperties != null) { + Properties properties = new Properties(); + producerProperties.forEach(properties::setProperty); + this.producerProperties = properties; + } + } + + public void setPartitionKey(String partitionKey) { + this.partitionKey = partitionKey; + } + } + + @Override + public PTransform, PDone> buildExternal(Configuration configuration) { + AwsBasicCredentials creds = + AwsBasicCredentials.create(configuration.awsAccessKey, configuration.awsSecretKey); + StaticCredentialsProvider provider = StaticCredentialsProvider.create(creds); + KinesisIO.Write writeTransform = + KinesisIO.write() + .withStreamName(configuration.streamName) + .withClientConfiguration( + ClientConfiguration.builder() + .credentialsProvider(provider) + .region(Region.of(configuration.region)) + .endpoint(configuration.serviceEndpoint) + .build()) + .withPartitioner(p -> configuration.partitionKey); + + if (configuration.producerProperties != null) { + writeTransform = writeTransform.withProducerProperties(configuration.producerProperties); + } + + return writeTransform; + } + } + + public static class ReadDataBuilder + implements ExternalTransformBuilder< + ReadDataBuilder.Configuration, PBegin, PCollection> { + + public static class Configuration extends CrossLanguageConfiguration { + private @Nullable Long maxNumRecords; + private @Nullable Duration maxReadTime; + private @Nullable InitialPositionInStream initialPositionInStream; + private @Nullable Instant initialTimestampInStream; + private @Nullable Integer requestRecordsLimit; + private @Nullable Duration upToDateThreshold; + private @Nullable Long maxCapacityPerShard; + private @Nullable WatermarkPolicy watermarkPolicy; + private @Nullable Duration watermarkIdleDurationThreshold; + private @Nullable Duration rateLimit; + + public void setMaxNumRecords(@Nullable Long maxNumRecords) { + this.maxNumRecords = maxNumRecords; + } + + public void setMaxReadTime(@Nullable Long maxReadTime) { + if (maxReadTime != null) { + this.maxReadTime = Duration.millis(maxReadTime); + } + } + + public void setInitialPositionInStream(@Nullable String initialPositionInStream) { + if (initialPositionInStream != null) { + this.initialPositionInStream = InitialPositionInStream.valueOf(initialPositionInStream); + } + } + + public void setInitialTimestampInStream(@Nullable Long initialTimestampInStream) { + if (initialTimestampInStream != null) { + this.initialTimestampInStream = Instant.ofEpochMilli(initialTimestampInStream); + } + } + + public void setRequestRecordsLimit(@Nullable Long requestRecordsLimit) { + if (requestRecordsLimit != null) { + this.requestRecordsLimit = requestRecordsLimit.intValue(); + } + } + + public void setUpToDateThreshold(@Nullable Long upToDateThreshold) { + if (upToDateThreshold != null) { + this.upToDateThreshold = Duration.millis(upToDateThreshold); + } + } + + public void setMaxCapacityPerShard(@Nullable Long maxCapacityPerShard) { + this.maxCapacityPerShard = maxCapacityPerShard; + } + + public void setWatermarkPolicy(@Nullable String watermarkPolicy) { + if (watermarkPolicy != null) { + this.watermarkPolicy = WatermarkPolicy.valueOf(watermarkPolicy); + } + } + + public void setWatermarkIdleDurationThreshold(@Nullable Long watermarkIdleDurationThreshold) { + if (watermarkIdleDurationThreshold != null) { + this.watermarkIdleDurationThreshold = Duration.millis(watermarkIdleDurationThreshold); + } + } + + public void setRateLimit(@Nullable Long rateLimit) { + if (rateLimit != null) { + this.rateLimit = Duration.millis(rateLimit); + } + } + } + + private enum WatermarkPolicy { + ARRIVAL_TIME, + PROCESSING_TIME + } + + @Override + public PTransform> buildExternal( + ReadDataBuilder.Configuration configuration) { + KinesisIO.Read readTransform = + KinesisIO.readData() + .withStreamName(configuration.streamName) + .withClientConfiguration( + ClientConfiguration.builder() + .credentialsProvider(provider) + .region(Region.of(configuration.region)) + .endpoint(configuration.serviceEndpoint) + .build()); + + if (configuration.maxNumRecords != null) { + readTransform = readTransform.withMaxNumRecords(configuration.maxNumRecords); + } + if (configuration.upToDateThreshold != null) { + readTransform = readTransform.withUpToDateThreshold(configuration.upToDateThreshold); + } + if (configuration.maxCapacityPerShard != null) { + readTransform = + readTransform.withMaxCapacityPerShard(configuration.maxCapacityPerShard.intValue()); + } + if (configuration.watermarkPolicy != null) { + switch (configuration.watermarkPolicy) { + case ARRIVAL_TIME: + readTransform = + configuration.watermarkIdleDurationThreshold != null + ? readTransform.withArrivalTimeWatermarkPolicy( + configuration.watermarkIdleDurationThreshold) + : readTransform.withArrivalTimeWatermarkPolicy(); + break; + case PROCESSING_TIME: + readTransform = readTransform.withProcessingTimeWatermarkPolicy(); + break; + default: + throw new RuntimeException( + String.format( + "Unsupported watermark policy type: %s", configuration.watermarkPolicy)); + } + } + if (configuration.rateLimit != null) { + readTransform = readTransform.withFixedDelayRateLimitPolicy(configuration.rateLimit); + } + if (configuration.maxReadTime != null) { + readTransform = readTransform.withMaxReadTime(configuration.maxReadTime); + } + if (configuration.initialPositionInStream != null) { + readTransform = + readTransform.withInitialPositionInStream(configuration.initialPositionInStream); + } + if (configuration.requestRecordsLimit != null) { + readTransform = readTransform.withRequestRecordsLimit(configuration.requestRecordsLimit); + } + if (configuration.initialTimestampInStream != null) { + readTransform = + readTransform.withInitialTimestampInStream(configuration.initialTimestampInStream); + } + return readTransform; + } + } +} \ No newline at end of file diff --git a/sdks/python/apache_beam/io/kinesis.py b/sdks/python/apache_beam/io/kinesis.py index bc5e1fa787b4..0be98c122f6e 100644 --- a/sdks/python/apache_beam/io/kinesis.py +++ b/sdks/python/apache_beam/io/kinesis.py @@ -49,7 +49,8 @@ In this option, Python SDK will either download (for released Beam version) or build (when running from a Beam Git clone) a expansion service jar and use that to expand transforms. Currently Kinesis transforms use the - 'beam-sdks-java-io-kinesis-expansion-service' jar for this purpose. + 'beam-sdks-java-io-amazon-web-services2-expansion-service' jar for this + purpose. *Option 2: specify a custom expansion service* @@ -99,7 +100,7 @@ def default_io_expansion_service(): return BeamJarExpansionService( - 'sdks:java:io:kinesis:expansion-service:shadowJar') + 'sdks:java:io:amazon-web-services2:expansion-service:shadowJar') WriteToKinesisSchema = NamedTuple( @@ -111,7 +112,6 @@ def default_io_expansion_service(): ('region', str), ('partition_key', str), ('service_endpoint', Optional[str]), - ('verify_certificate', Optional[bool]), ('producer_properties', Optional[Mapping[str, str]]), ], ) @@ -123,7 +123,7 @@ class WriteToKinesis(ExternalTransform): Experimental; no backwards compatibility guarantees. """ - URN = 'beam:transform:org.apache.beam:kinesis_write:v1' + URN = 'beam:transform:org.apache.beam:kinesis_write:v2' def __init__( self, @@ -145,14 +145,26 @@ def __init__( :param aws_secret_key: Kinesis access key secret. :param region: AWS region. Example: 'us-east-1'. :param service_endpoint: Kinesis service endpoint - :param verify_certificate: Enable or disable certificate verification. - Never set to False on production. True by default. + :param verify_certificate: Deprecated - certificates will always be + verified. :param partition_key: Specify default partition key. :param producer_properties: Specify the configuration properties for Kinesis Producer Library (KPL) as dictionary. Example: {'CollectionMaxCount': '1000', 'ConnectTimeout': '10000'} :param expansion_service: The address (host:port) of the ExpansionService. """ + if verify_certificate is False: + # Previously, we supported this via + # https://javadoc.io/doc/com.amazonaws/amazon-kinesis-producer/0.14.0/com/amazonaws/services/kinesis/producer/KinesisProducerConfiguration.html#isVerifyCertificate-- + # With the new AWS client, we no longer support it and it is always True + raise ValueError( + 'verify_certificate set to False. This option is no longer ' + + 'supported and certificate verification will still happen.') + if verify_certificate is True: + logging.warning( + 'verify_certificate set to True. This option is no longer ' + + 'supported and certificate verification will automatically happen. ' + + 'This option may be removed in a future release') super().__init__( self.URN, NamedTupleBasedPayloadBuilder( @@ -163,7 +175,6 @@ def __init__( region=region, partition_key=partition_key, service_endpoint=service_endpoint, - verify_certificate=verify_certificate, producer_properties=producer_properties, )), expansion_service or default_io_expansion_service(), @@ -178,7 +189,6 @@ def __init__( ('aws_secret_key', str), ('region', str), ('service_endpoint', Optional[str]), - ('verify_certificate', Optional[bool]), ('max_num_records', Optional[int]), ('max_read_time', Optional[int]), ('initial_position_in_stream', Optional[str]), @@ -199,7 +209,7 @@ class ReadDataFromKinesis(ExternalTransform): Experimental; no backwards compatibility guarantees. """ - URN = 'beam:transform:org.apache.beam:kinesis_read_data:v1' + URN = 'beam:transform:org.apache.beam:kinesis_read_data:v2' def __init__( self, @@ -229,8 +239,8 @@ def __init__( :param aws_secret_key: Kinesis access key secret. :param region: AWS region. Example: 'us-east-1'. :param service_endpoint: Kinesis service endpoint - :param verify_certificate: Enable or disable certificate verification. - Never set to False on production. True by default. + :param verify_certificate: Deprecated - certificates will always be + verified. :param max_num_records: Specifies to read at most a given number of records. Must be greater than 0. :param max_read_time: Specifies to read records during x milliseconds. @@ -277,6 +287,19 @@ def __init__( ): logging.warning('Provided timestamp emplaced not in the past.') + if verify_certificate is False: + # Previously, we supported this via + # https://javadoc.io/doc/com.amazonaws/amazon-kinesis-producer/0.14.0/com/amazonaws/services/kinesis/producer/KinesisProducerConfiguration.html#isVerifyCertificate-- + # With the new AWS client, we no longer support it and it is always True + raise ValueError( + 'verify_certificate set to False. This option is no longer ' + + 'supported and certificate verification will still happen.') + if verify_certificate is True: + logging.warning( + 'verify_certificate set to True. This option is no longer ' + + 'supported and certificate verification will automatically happen. ' + + 'This option may be removed in a future release') + super().__init__( self.URN, NamedTupleBasedPayloadBuilder( @@ -286,7 +309,6 @@ def __init__( aws_secret_key=aws_secret_key, region=region, service_endpoint=service_endpoint, - verify_certificate=verify_certificate, max_num_records=max_num_records, max_read_time=max_read_time, initial_position_in_stream=initial_position_in_stream, diff --git a/sdks/python/test-suites/portable/common.gradle b/sdks/python/test-suites/portable/common.gradle index be87be749862..99b477b2c7db 100644 --- a/sdks/python/test-suites/portable/common.gradle +++ b/sdks/python/test-suites/portable/common.gradle @@ -376,7 +376,7 @@ project.tasks.register("postCommitPy${pythonVersionSuffix}IT") { ':sdks:java:testing:kafka-service:buildTestKafkaServiceJar', ':sdks:java:io:expansion-service:shadowJar', ':sdks:java:io:google-cloud-platform:expansion-service:shadowJar', - ':sdks:java:io:kinesis:expansion-service:shadowJar', + ':sdks:java:io:amazon-web-services2:expansion-service:shadowJar', ':sdks:java:extensions:schemaio-expansion-service:shadowJar', ':sdks:java:io:debezium:expansion-service:shadowJar' ] @@ -426,7 +426,7 @@ project.tasks.register("xlangSpannerIOIT") { ":sdks:java:container:${currentJavaVersion}:docker", ':sdks:java:io:expansion-service:shadowJar', ':sdks:java:io:google-cloud-platform:expansion-service:shadowJar', - ':sdks:java:io:kinesis:expansion-service:shadowJar', + ':sdks:java:io:amazon-web-services2:expansion-service:shadowJar', ':sdks:java:extensions:schemaio-expansion-service:shadowJar', ':sdks:java:io:debezium:expansion-service:shadowJar' ] diff --git a/settings.gradle.kts b/settings.gradle.kts index a8bee45a05ac..624e9f970d9d 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -206,6 +206,7 @@ include(":sdks:java:harness") include(":sdks:java:harness:jmh") include(":sdks:java:io:amazon-web-services") include(":sdks:java:io:amazon-web-services2") +include(":sdks:java:io:amazon-web-services2:expansion-service") include(":sdks:java:io:amqp") include(":sdks:java:io:azure") include(":sdks:java:io:azure-cosmos") From 18255573b8d9e5d5300a5e09751f27d01debed41 Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Wed, 18 Dec 2024 16:05:08 -0500 Subject: [PATCH 02/43] cleanup --- .../apache_beam/io/external/xlang_kinesisio_it_test.py | 7 +------ sdks/python/apache_beam/io/kinesis.py | 6 ++++-- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/sdks/python/apache_beam/io/external/xlang_kinesisio_it_test.py b/sdks/python/apache_beam/io/external/xlang_kinesisio_it_test.py index 151d63d84684..44e6b78f4e93 100644 --- a/sdks/python/apache_beam/io/external/xlang_kinesisio_it_test.py +++ b/sdks/python/apache_beam/io/external/xlang_kinesisio_it_test.py @@ -116,8 +116,7 @@ def run_kinesis_write(self): region=self.aws_region, service_endpoint=self.aws_service_endpoint, verify_certificate=(not self.use_localstack), - partition_key='1', - producer_properties=self.producer_properties, + partition_key='1' )) def run_kinesis_read(self): @@ -219,10 +218,6 @@ def setUp(self): self.aws_service_endpoint = known_args.aws_service_endpoint self.use_localstack = not known_args.use_real_aws self.expansion_service = known_args.expansion_service - self.producer_properties = { - 'CollectionMaxCount': str(NUM_RECORDS), - 'ConnectTimeout': str(MAX_READ_TIME), - } if self.use_localstack: self.set_localstack() diff --git a/sdks/python/apache_beam/io/kinesis.py b/sdks/python/apache_beam/io/kinesis.py index 0be98c122f6e..1640b25830b4 100644 --- a/sdks/python/apache_beam/io/kinesis.py +++ b/sdks/python/apache_beam/io/kinesis.py @@ -112,7 +112,6 @@ def default_io_expansion_service(): ('region', str), ('partition_key', str), ('service_endpoint', Optional[str]), - ('producer_properties', Optional[Mapping[str, str]]), ], ) @@ -165,6 +164,10 @@ def __init__( 'verify_certificate set to True. This option is no longer ' + 'supported and certificate verification will automatically happen. ' + 'This option may be removed in a future release') + if producer_properties is not None: + raise ValueError( + 'producer_properties is no longer supported and will be removed in ' + + 'a future release.') super().__init__( self.URN, NamedTupleBasedPayloadBuilder( @@ -175,7 +178,6 @@ def __init__( region=region, partition_key=partition_key, service_endpoint=service_endpoint, - producer_properties=producer_properties, )), expansion_service or default_io_expansion_service(), ) From adc016f0f30db265597c0e24653f68c8238fe74d Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Wed, 18 Dec 2024 16:15:16 -0500 Subject: [PATCH 03/43] Add missed file --- .../kinesis/KinesisTransformRegistrar.java | 29 ++++++------------- 1 file changed, 9 insertions(+), 20 deletions(-) diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java index 85f5c9f2db96..7ad4c4f01973 100644 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java @@ -25,6 +25,7 @@ import java.util.Map; import java.util.Properties; import org.apache.beam.sdk.expansion.ExternalTransformRegistrar; +import org.apache.beam.sdk.io.aws2.common.ClientConfiguration; import org.apache.beam.sdk.io.aws2.kinesis.KinesisIO; import org.apache.beam.sdk.transforms.ExternalTransformBuilder; import org.apache.beam.sdk.transforms.PTransform; @@ -85,17 +86,8 @@ public static class WriteBuilder implements ExternalTransformBuilder, PDone> { public static class Configuration extends CrossLanguageConfiguration { - private Properties producerProperties; private String partitionKey; - public void setProducerProperties(Map producerProperties) { - if (producerProperties != null) { - Properties properties = new Properties(); - producerProperties.forEach(properties::setProperty); - this.producerProperties = properties; - } - } - public void setPartitionKey(String partitionKey) { this.partitionKey = partitionKey; } @@ -103,24 +95,19 @@ public void setPartitionKey(String partitionKey) { @Override public PTransform, PDone> buildExternal(Configuration configuration) { - AwsBasicCredentials creds = - AwsBasicCredentials.create(configuration.awsAccessKey, configuration.awsSecretKey); + AwsBasicCredentials creds = AwsBasicCredentials.create(configuration.awsAccessKey, configuration.awsSecretKey); StaticCredentialsProvider provider = StaticCredentialsProvider.create(creds); KinesisIO.Write writeTransform = - KinesisIO.write() + KinesisIO.write() .withStreamName(configuration.streamName) .withClientConfiguration( ClientConfiguration.builder() .credentialsProvider(provider) - .region(Region.of(configuration.region)) + .region(configuration.region) .endpoint(configuration.serviceEndpoint) .build()) .withPartitioner(p -> configuration.partitionKey); - if (configuration.producerProperties != null) { - writeTransform = writeTransform.withProducerProperties(configuration.producerProperties); - } - return writeTransform; } } @@ -206,13 +193,15 @@ private enum WatermarkPolicy { @Override public PTransform> buildExternal( ReadDataBuilder.Configuration configuration) { - KinesisIO.Read readTransform = - KinesisIO.readData() + AwsBasicCredentials creds = AwsBasicCredentials.create(configuration.awsAccessKey, configuration.awsSecretKey); + StaticCredentialsProvider provider = StaticCredentialsProvider.create(creds); + KinesisIO.Read readTransform = + KinesisIO.read() .withStreamName(configuration.streamName) .withClientConfiguration( ClientConfiguration.builder() .credentialsProvider(provider) - .region(Region.of(configuration.region)) + .region(configuration.region) .endpoint(configuration.serviceEndpoint) .build()); From fb95fee846dae1952b9144da347ceab0751c1a5f Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Wed, 18 Dec 2024 16:50:35 -0500 Subject: [PATCH 04/43] Fix up --- .../io/aws2/kinesis/KinesisTransformRegistrar.java | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java index 7ad4c4f01973..8bbe9968024c 100644 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java @@ -22,6 +22,7 @@ import software.amazon.awssdk.regions.Region; import software.amazon.kinesis.common.InitialPositionInStream; import com.google.auto.service.AutoService; +import java.net.URI; import java.util.Map; import java.util.Properties; import org.apache.beam.sdk.expansion.ExternalTransformRegistrar; @@ -59,7 +60,7 @@ private abstract static class CrossLanguageConfiguration { String awsAccessKey; String awsSecretKey; Region region; - @Nullable String serviceEndpoint; + @Nullable URI serviceEndpoint; public void setStreamName(String streamName) { this.streamName = streamName; @@ -78,7 +79,9 @@ public void setRegion(String region) { } public void setServiceEndpoint(@Nullable String serviceEndpoint) { - this.serviceEndpoint = serviceEndpoint; + if (serviceEndpoint != null) { + this.serviceEndpoint = URI(serviceEndpoint); + } } } @@ -97,7 +100,7 @@ public void setPartitionKey(String partitionKey) { public PTransform, PDone> buildExternal(Configuration configuration) { AwsBasicCredentials creds = AwsBasicCredentials.create(configuration.awsAccessKey, configuration.awsSecretKey); StaticCredentialsProvider provider = StaticCredentialsProvider.create(creds); - KinesisIO.Write writeTransform = + KinesisIO.Write writeTransform = KinesisIO.write() .withStreamName(configuration.streamName) .withClientConfiguration( @@ -250,7 +253,9 @@ public PTransform> buildExternal( readTransform = readTransform.withInitialTimestampInStream(configuration.initialTimestampInStream); } - return readTransform; + // Convert back to bytes to keep consistency with previous verison: + // https://github.com/apache/beam/blob/5eed396caf9e0065d8ed82edcc236bad5b71ba22/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisTransformRegistrar.java + return readTransform.Map(kr -> kr.getDataAsBytes()); } } } \ No newline at end of file From a0cf3637cd712ecefd3b58a1ae53e1638fb35d3a Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Wed, 18 Dec 2024 16:57:21 -0500 Subject: [PATCH 05/43] Fix up --- .../beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java index 8bbe9968024c..8df97f6d5304 100644 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java @@ -32,7 +32,6 @@ import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PDone; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; @@ -86,7 +85,7 @@ public void setServiceEndpoint(@Nullable String serviceEndpoint) { } public static class WriteBuilder - implements ExternalTransformBuilder, PDone> { + implements ExternalTransformBuilder, KinesisIO.Write.Result> { public static class Configuration extends CrossLanguageConfiguration { private String partitionKey; @@ -97,7 +96,7 @@ public void setPartitionKey(String partitionKey) { } @Override - public PTransform, PDone> buildExternal(Configuration configuration) { + public PTransform, KinesisIO.Write.Result> buildExternal(Configuration configuration) { AwsBasicCredentials creds = AwsBasicCredentials.create(configuration.awsAccessKey, configuration.awsSecretKey); StaticCredentialsProvider provider = StaticCredentialsProvider.create(creds); KinesisIO.Write writeTransform = From 93e427ac01010233f114b8cba2e6b0b54d6db5d0 Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Wed, 18 Dec 2024 17:02:33 -0500 Subject: [PATCH 06/43] Fix up --- .../sdk/io/aws2/kinesis/KinesisTransformRegistrar.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java index 8df97f6d5304..5983f9a665ec 100644 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java @@ -79,7 +79,7 @@ public void setRegion(String region) { public void setServiceEndpoint(@Nullable String serviceEndpoint) { if (serviceEndpoint != null) { - this.serviceEndpoint = URI(serviceEndpoint); + this.serviceEndpoint = new URI(serviceEndpoint); } } } @@ -254,7 +254,13 @@ public PTransform> buildExternal( } // Convert back to bytes to keep consistency with previous verison: // https://github.com/apache/beam/blob/5eed396caf9e0065d8ed82edcc236bad5b71ba22/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisTransformRegistrar.java - return readTransform.Map(kr -> kr.getDataAsBytes()); + return readTransform.apply("Convert to bytes", ParDo.of(new DoFn() { + @ProcessElement + public void processElement(ProcessContext c) { + KinesisRecord record = c.element(); + return record.getDataAsBytes(); + } + })); } } } \ No newline at end of file From 3397d8554b09ebc76739d8d1c38bbe6556370114 Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Thu, 19 Dec 2024 10:14:03 -0500 Subject: [PATCH 07/43] fix --- .../kinesis/KinesisTransformRegistrar.java | 88 +++++++++++++------ 1 file changed, 59 insertions(+), 29 deletions(-) diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java index 5983f9a665ec..5f11c9d21461 100644 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java @@ -17,29 +17,31 @@ */ package org.apache.beam.sdk.io.aws2.kinesis; -import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; -import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; -import software.amazon.awssdk.regions.Region; -import software.amazon.kinesis.common.InitialPositionInStream; import com.google.auto.service.AutoService; import java.net.URI; +import java.net.URISyntaxException; import java.util.Map; -import java.util.Properties; import org.apache.beam.sdk.expansion.ExternalTransformRegistrar; import org.apache.beam.sdk.io.aws2.common.ClientConfiguration; -import org.apache.beam.sdk.io.aws2.kinesis.KinesisIO; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ExternalTransformBuilder; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; import org.joda.time.Instant; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.kinesis.common.InitialPositionInStream; /** - * Exposes {@link KinesisIO.Write} and {@link KinesisIO.Read} as an external transform for - * cross-language usage. + * Exposes {@link org.apache.beam.sdk.io.aws2.kinesis.KinesisIO.Write} and {@link + * org.apache.beam.sdk.io.aws2.kinesis.KinesisIO.Read} as an external transform for cross-language + * usage. */ @AutoService(ExternalTransformRegistrar.class) @SuppressWarnings({ @@ -79,13 +81,19 @@ public void setRegion(String region) { public void setServiceEndpoint(@Nullable String serviceEndpoint) { if (serviceEndpoint != null) { - this.serviceEndpoint = new URI(serviceEndpoint); + try { + this.serviceEndpoint = new URI(serviceEndpoint); + } catch (URISyntaxException ex) { + throw new RuntimeException( + String.format("Service endpoint must be URI format, got: %s", serviceEndpoint)); + } } } } public static class WriteBuilder - implements ExternalTransformBuilder, KinesisIO.Write.Result> { + implements ExternalTransformBuilder< + WriteBuilder.Configuration, PCollection, KinesisIO.Write.Result> { public static class Configuration extends CrossLanguageConfiguration { private String partitionKey; @@ -96,18 +104,20 @@ public void setPartitionKey(String partitionKey) { } @Override - public PTransform, KinesisIO.Write.Result> buildExternal(Configuration configuration) { - AwsBasicCredentials creds = AwsBasicCredentials.create(configuration.awsAccessKey, configuration.awsSecretKey); + public PTransform, KinesisIO.Write.Result> buildExternal( + Configuration configuration) { + AwsBasicCredentials creds = + AwsBasicCredentials.create(configuration.awsAccessKey, configuration.awsSecretKey); StaticCredentialsProvider provider = StaticCredentialsProvider.create(creds); KinesisIO.Write writeTransform = KinesisIO.write() .withStreamName(configuration.streamName) .withClientConfiguration( ClientConfiguration.builder() - .credentialsProvider(provider) - .region(configuration.region) - .endpoint(configuration.serviceEndpoint) - .build()) + .credentialsProvider(provider) + .region(configuration.region) + .endpoint(configuration.serviceEndpoint) + .build()) .withPartitioner(p -> configuration.partitionKey); return writeTransform; @@ -195,17 +205,18 @@ private enum WatermarkPolicy { @Override public PTransform> buildExternal( ReadDataBuilder.Configuration configuration) { - AwsBasicCredentials creds = AwsBasicCredentials.create(configuration.awsAccessKey, configuration.awsSecretKey); + AwsBasicCredentials creds = + AwsBasicCredentials.create(configuration.awsAccessKey, configuration.awsSecretKey); StaticCredentialsProvider provider = StaticCredentialsProvider.create(creds); KinesisIO.Read readTransform = KinesisIO.read() .withStreamName(configuration.streamName) .withClientConfiguration( ClientConfiguration.builder() - .credentialsProvider(provider) - .region(configuration.region) - .endpoint(configuration.serviceEndpoint) - .build()); + .credentialsProvider(provider) + .region(configuration.region) + .endpoint(configuration.serviceEndpoint) + .build()); if (configuration.maxNumRecords != null) { readTransform = readTransform.withMaxNumRecords(configuration.maxNumRecords); @@ -252,15 +263,34 @@ public PTransform> buildExternal( readTransform = readTransform.withInitialTimestampInStream(configuration.initialTimestampInStream); } + + return new KinesisReadToBytes(readTransform); + } + } + + public static class KinesisReadToBytes extends PTransform> { + private KinesisIO.Read readTransform; + + private KinesisReadToBytes(KinesisIO.Read readTransform) { + this.readTransform = readTransform; + } + + @Override + public PCollection expand(PBegin input) { // Convert back to bytes to keep consistency with previous verison: // https://github.com/apache/beam/blob/5eed396caf9e0065d8ed82edcc236bad5b71ba22/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisTransformRegistrar.java - return readTransform.apply("Convert to bytes", ParDo.of(new DoFn() { - @ProcessElement - public void processElement(ProcessContext c) { - KinesisRecord record = c.element(); - return record.getDataAsBytes(); - } - })); + return input + .apply(this.readTransform) + .apply( + "Convert to bytes", + ParDo.of( + new DoFn() { + @ProcessElement + public byte[] processElement(ProcessContext c) { + KinesisRecord record = c.element(); + return record.getDataAsBytes(); + } + })); } } -} \ No newline at end of file +} From fb462b4fc1fa55fad63e460744dd4a2aae633bd8 Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Thu, 19 Dec 2024 15:40:10 +0000 Subject: [PATCH 08/43] fmt --- sdks/python/apache_beam/io/kinesis.py | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/sdks/python/apache_beam/io/kinesis.py b/sdks/python/apache_beam/io/kinesis.py index 1640b25830b4..6d924e1992b8 100644 --- a/sdks/python/apache_beam/io/kinesis.py +++ b/sdks/python/apache_beam/io/kinesis.py @@ -157,17 +157,17 @@ def __init__( # https://javadoc.io/doc/com.amazonaws/amazon-kinesis-producer/0.14.0/com/amazonaws/services/kinesis/producer/KinesisProducerConfiguration.html#isVerifyCertificate-- # With the new AWS client, we no longer support it and it is always True raise ValueError( - 'verify_certificate set to False. This option is no longer ' + - 'supported and certificate verification will still happen.') + 'verify_certificate set to False. This option is no longer ' + + 'supported and certificate verification will still happen.') if verify_certificate is True: logging.warning( - 'verify_certificate set to True. This option is no longer ' + - 'supported and certificate verification will automatically happen. ' + - 'This option may be removed in a future release') + 'verify_certificate set to True. This option is no longer ' + + 'supported and certificate verification will automatically ' + + 'happen. This option may be removed in a future release') if producer_properties is not None: raise ValueError( - 'producer_properties is no longer supported and will be removed in ' + - 'a future release.') + 'producer_properties is no longer supported and will be removed ' + + 'in a future release.') super().__init__( self.URN, NamedTupleBasedPayloadBuilder( @@ -294,13 +294,13 @@ def __init__( # https://javadoc.io/doc/com.amazonaws/amazon-kinesis-producer/0.14.0/com/amazonaws/services/kinesis/producer/KinesisProducerConfiguration.html#isVerifyCertificate-- # With the new AWS client, we no longer support it and it is always True raise ValueError( - 'verify_certificate set to False. This option is no longer ' + - 'supported and certificate verification will still happen.') + 'verify_certificate set to False. This option is no longer ' + + 'supported and certificate verification will still happen.') if verify_certificate is True: logging.warning( - 'verify_certificate set to True. This option is no longer ' + - 'supported and certificate verification will automatically happen. ' + - 'This option may be removed in a future release') + 'verify_certificate set to True. This option is no longer ' + + 'supported and certificate verification will automatically ' + + 'happen. This option may be removed in a future release') super().__init__( self.URN, From d91a113dfa1a8f6cb9f52c3d272de16739d36f69 Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Thu, 19 Dec 2024 17:44:07 +0000 Subject: [PATCH 09/43] Fix test --- .../apache_beam/io/external/xlang_kinesisio_it_test.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/io/external/xlang_kinesisio_it_test.py b/sdks/python/apache_beam/io/external/xlang_kinesisio_it_test.py index 44e6b78f4e93..d37ce1f87fe3 100644 --- a/sdks/python/apache_beam/io/external/xlang_kinesisio_it_test.py +++ b/sdks/python/apache_beam/io/external/xlang_kinesisio_it_test.py @@ -115,9 +115,7 @@ def run_kinesis_write(self): aws_secret_key=self.aws_secret_key, region=self.aws_region, service_endpoint=self.aws_service_endpoint, - verify_certificate=(not self.use_localstack), - partition_key='1' - )) + partition_key='1')) def run_kinesis_read(self): records = [RECORD + str(i).encode() for i in range(NUM_RECORDS)] @@ -131,7 +129,6 @@ def run_kinesis_read(self): aws_secret_key=self.aws_secret_key, region=self.aws_region, service_endpoint=self.aws_service_endpoint, - verify_certificate=not self.use_localstack, max_num_records=NUM_RECORDS, max_read_time=MAX_READ_TIME, request_records_limit=REQUEST_RECORDS_LIMIT, From 90c13e9948c669b8f65d9be9119948ab18a4da37 Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Thu, 19 Dec 2024 17:45:05 +0000 Subject: [PATCH 10/43] lint --- sdks/python/apache_beam/io/kinesis.py | 1 - 1 file changed, 1 deletion(-) diff --git a/sdks/python/apache_beam/io/kinesis.py b/sdks/python/apache_beam/io/kinesis.py index 6d924e1992b8..cdae53e71c5a 100644 --- a/sdks/python/apache_beam/io/kinesis.py +++ b/sdks/python/apache_beam/io/kinesis.py @@ -82,7 +82,6 @@ import logging import time -from typing import Mapping from typing import NamedTuple from typing import Optional From a9eed7d7c2b7950d818b7d507b163b53071b45f8 Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Thu, 19 Dec 2024 20:57:04 +0000 Subject: [PATCH 11/43] Add serializer --- .../beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java index 5f11c9d21461..f3a5e261ff7f 100644 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java @@ -109,6 +109,7 @@ public PTransform, KinesisIO.Write.Result> buildExternal( AwsBasicCredentials creds = AwsBasicCredentials.create(configuration.awsAccessKey, configuration.awsSecretKey); StaticCredentialsProvider provider = StaticCredentialsProvider.create(creds); + SerializableFunction serializer = v -> v; KinesisIO.Write writeTransform = KinesisIO.write() .withStreamName(configuration.streamName) @@ -118,7 +119,8 @@ public PTransform, KinesisIO.Write.Result> buildExternal( .region(configuration.region) .endpoint(configuration.serviceEndpoint) .build()) - .withPartitioner(p -> configuration.partitionKey); + .withPartitioner(p -> configuration.partitionKey) + .withSerializer(serializer); return writeTransform; } From 3f01f70d9f1fc32c6d695025d2a4c9f5b1be60c9 Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Thu, 19 Dec 2024 21:22:06 +0000 Subject: [PATCH 12/43] Add serializer --- .../beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java | 1 + 1 file changed, 1 insertion(+) diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java index f3a5e261ff7f..8dfe1f07729e 100644 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java @@ -27,6 +27,7 @@ import org.apache.beam.sdk.transforms.ExternalTransformBuilder; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; From a4aa6f1944ea03548b237379f86c0278be3c539b Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Fri, 20 Dec 2024 14:44:23 +0000 Subject: [PATCH 13/43] Allow configuration to be serialized --- .../kinesis/KinesisTransformRegistrar.java | 35 +++++++++++++------ 1 file changed, 24 insertions(+), 11 deletions(-) diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java index 8dfe1f07729e..ddb80131f6b7 100644 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java @@ -62,7 +62,7 @@ private abstract static class CrossLanguageConfiguration { String awsAccessKey; String awsSecretKey; Region region; - @Nullable URI serviceEndpoint; + @Nullable String serviceEndpoint; public void setStreamName(String streamName) { this.streamName = streamName; @@ -81,14 +81,7 @@ public void setRegion(String region) { } public void setServiceEndpoint(@Nullable String serviceEndpoint) { - if (serviceEndpoint != null) { - try { - this.serviceEndpoint = new URI(serviceEndpoint); - } catch (URISyntaxException ex) { - throw new RuntimeException( - String.format("Service endpoint must be URI format, got: %s", serviceEndpoint)); - } - } + this.serviceEndpoint = serviceEndpoint; } } @@ -111,6 +104,16 @@ public PTransform, KinesisIO.Write.Result> buildExternal( AwsBasicCredentials.create(configuration.awsAccessKey, configuration.awsSecretKey); StaticCredentialsProvider provider = StaticCredentialsProvider.create(creds); SerializableFunction serializer = v -> v; + @Nullable URI endpoint; + if (configuration.serviceEndpoint != null) { + try { + endpoint = configuration.serviceEndpoint; + } + catch (URISyntaxException ex) { + throw new RuntimeException( + String.format("Service endpoint must be URI format, got: %s", serviceEndpoint)); + } + } KinesisIO.Write writeTransform = KinesisIO.write() .withStreamName(configuration.streamName) @@ -118,7 +121,7 @@ public PTransform, KinesisIO.Write.Result> buildExternal( ClientConfiguration.builder() .credentialsProvider(provider) .region(configuration.region) - .endpoint(configuration.serviceEndpoint) + .endpoint(endpoint) .build()) .withPartitioner(p -> configuration.partitionKey) .withSerializer(serializer); @@ -211,6 +214,16 @@ public PTransform> buildExternal( AwsBasicCredentials creds = AwsBasicCredentials.create(configuration.awsAccessKey, configuration.awsSecretKey); StaticCredentialsProvider provider = StaticCredentialsProvider.create(creds); + @Nullable URI endpoint; + if (configuration.serviceEndpoint != null) { + try { + endpoint = configuration.serviceEndpoint; + } + catch (URISyntaxException ex) { + throw new RuntimeException( + String.format("Service endpoint must be URI format, got: %s", serviceEndpoint)); + } + } KinesisIO.Read readTransform = KinesisIO.read() .withStreamName(configuration.streamName) @@ -218,7 +231,7 @@ public PTransform> buildExternal( ClientConfiguration.builder() .credentialsProvider(provider) .region(configuration.region) - .endpoint(configuration.serviceEndpoint) + .endpoint(endpoint) .build()); if (configuration.maxNumRecords != null) { From 7a63a0b17e5de6fe087f2c0ea871af2de23014e3 Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Fri, 20 Dec 2024 15:18:20 +0000 Subject: [PATCH 14/43] Allow configuration to be serialized --- .../io/aws2/kinesis/KinesisTransformRegistrar.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java index ddb80131f6b7..42d06e81f66e 100644 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java @@ -104,14 +104,14 @@ public PTransform, KinesisIO.Write.Result> buildExternal( AwsBasicCredentials.create(configuration.awsAccessKey, configuration.awsSecretKey); StaticCredentialsProvider provider = StaticCredentialsProvider.create(creds); SerializableFunction serializer = v -> v; - @Nullable URI endpoint; + @Nullable URI endpoint = null; if (configuration.serviceEndpoint != null) { try { - endpoint = configuration.serviceEndpoint; + endpoint = new URI(configuration.serviceEndpoint); } catch (URISyntaxException ex) { throw new RuntimeException( - String.format("Service endpoint must be URI format, got: %s", serviceEndpoint)); + String.format("Service endpoint must be URI format, got: %s", configuration.serviceEndpoint)); } } KinesisIO.Write writeTransform = @@ -214,14 +214,14 @@ public PTransform> buildExternal( AwsBasicCredentials creds = AwsBasicCredentials.create(configuration.awsAccessKey, configuration.awsSecretKey); StaticCredentialsProvider provider = StaticCredentialsProvider.create(creds); - @Nullable URI endpoint; + @Nullable URI endpoint = null; if (configuration.serviceEndpoint != null) { try { - endpoint = configuration.serviceEndpoint; + endpoint = new URI(configuration.serviceEndpoint); } catch (URISyntaxException ex) { throw new RuntimeException( - String.format("Service endpoint must be URI format, got: %s", serviceEndpoint)); + String.format("Service endpoint must be URI format, got: %s", configuration.serviceEndpoint)); } } KinesisIO.Read readTransform = From d27843403d3a8eb7d5151e692efce392dd370abf Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Fri, 20 Dec 2024 17:27:51 +0000 Subject: [PATCH 15/43] Allow configuration to be serialized --- .../sdk/io/aws2/kinesis/KinesisTransformRegistrar.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java index 42d06e81f66e..944aa7d7a202 100644 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java @@ -61,7 +61,7 @@ private abstract static class CrossLanguageConfiguration { String streamName; String awsAccessKey; String awsSecretKey; - Region region; + String region; @Nullable String serviceEndpoint; public void setStreamName(String streamName) { @@ -77,7 +77,7 @@ public void setAwsSecretKey(String awsSecretKey) { } public void setRegion(String region) { - this.region = Region.of(region); + this.region = region; } public void setServiceEndpoint(@Nullable String serviceEndpoint) { @@ -120,7 +120,7 @@ public PTransform, KinesisIO.Write.Result> buildExternal( .withClientConfiguration( ClientConfiguration.builder() .credentialsProvider(provider) - .region(configuration.region) + .region(Regions.fromName(configuration.region)) .endpoint(endpoint) .build()) .withPartitioner(p -> configuration.partitionKey) @@ -230,7 +230,7 @@ public PTransform> buildExternal( .withClientConfiguration( ClientConfiguration.builder() .credentialsProvider(provider) - .region(configuration.region) + .region(Regions.fromName(configuration.region)) .endpoint(endpoint) .build()); From 1b79da4fe7010d87aae5e911d34c0cb0a7b22f4f Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Fri, 20 Dec 2024 17:54:42 +0000 Subject: [PATCH 16/43] Allow configuration to be serialized --- .../beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java index 944aa7d7a202..0908071646a9 100644 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java @@ -120,7 +120,7 @@ public PTransform, KinesisIO.Write.Result> buildExternal( .withClientConfiguration( ClientConfiguration.builder() .credentialsProvider(provider) - .region(Regions.fromName(configuration.region)) + .region(Region.of(configuration.region)) .endpoint(endpoint) .build()) .withPartitioner(p -> configuration.partitionKey) @@ -230,7 +230,7 @@ public PTransform> buildExternal( .withClientConfiguration( ClientConfiguration.builder() .credentialsProvider(provider) - .region(Regions.fromName(configuration.region)) + .region(Region.of(configuration.region)) .endpoint(endpoint) .build()); From f0c0f5875c3030effa721b28ef73640b6350d386 Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Fri, 20 Dec 2024 20:58:38 +0000 Subject: [PATCH 17/43] debug info --- .../java/org/apache/beam/sdk/util/SerializableUtils.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SerializableUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SerializableUtils.java index 54f2a6572f6f..c4941e22c4a1 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SerializableUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SerializableUtils.java @@ -21,6 +21,8 @@ import static org.apache.beam.sdk.util.CoderUtils.encodeToByteArray; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectWriter; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -56,7 +58,8 @@ public static byte[] serializeToByteArray(Serializable value) { } return buffer.toByteArray(); } catch (IOException exn) { - throw new IllegalArgumentException("unable to serialize " + value, exn); + ObjectWriter ow = new ObjectMapper().writer().withDefaultPrettyPrinter(); + throw new IllegalArgumentException("unable to serialize " + ow.writeValueAsString(object), exn); } } From 5f816a16681a348aa4c39ea4269181913c913441 Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Fri, 20 Dec 2024 21:13:10 +0000 Subject: [PATCH 18/43] debug info --- .../main/java/org/apache/beam/sdk/util/SerializableUtils.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SerializableUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SerializableUtils.java index c4941e22c4a1..82f109d1ad43 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SerializableUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SerializableUtils.java @@ -59,7 +59,7 @@ public static byte[] serializeToByteArray(Serializable value) { return buffer.toByteArray(); } catch (IOException exn) { ObjectWriter ow = new ObjectMapper().writer().withDefaultPrettyPrinter(); - throw new IllegalArgumentException("unable to serialize " + ow.writeValueAsString(object), exn); + throw new IllegalArgumentException("unable to serialize " + ow.writeValueAsString(value), exn); } } From 0e943df8a139f95a8e89654b7151f88d6f0659ee Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Fri, 20 Dec 2024 21:25:43 +0000 Subject: [PATCH 19/43] debug info --- .../java/org/apache/beam/sdk/util/SerializableUtils.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SerializableUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SerializableUtils.java index 82f109d1ad43..fe148d917825 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SerializableUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SerializableUtils.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectWriter; +import com.fasterxml.jackson.databind.JsonMappingException; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -59,7 +60,11 @@ public static byte[] serializeToByteArray(Serializable value) { return buffer.toByteArray(); } catch (IOException exn) { ObjectWriter ow = new ObjectMapper().writer().withDefaultPrettyPrinter(); - throw new IllegalArgumentException("unable to serialize " + ow.writeValueAsString(value), exn); + try { + throw new IllegalArgumentException("unable to serialize " + ow.writeValueAsString(value), exn); + } catch (JsonProcessingException ex) { + IllegalArgumentException("unable to jsonify " + value, exn); + } } } From d13783243d9e260610c50d09b4986609b38d74b9 Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Fri, 20 Dec 2024 21:28:26 +0000 Subject: [PATCH 20/43] debug info --- .../main/java/org/apache/beam/sdk/util/SerializableUtils.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SerializableUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SerializableUtils.java index fe148d917825..20dfcf75cc41 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SerializableUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SerializableUtils.java @@ -63,7 +63,7 @@ public static byte[] serializeToByteArray(Serializable value) { try { throw new IllegalArgumentException("unable to serialize " + ow.writeValueAsString(value), exn); } catch (JsonProcessingException ex) { - IllegalArgumentException("unable to jsonify " + value, exn); + throw new IllegalArgumentException("unable to jsonify " + value, exn); } } } From c6c736d94ea9f5bc6e9332316d115822740e1763 Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Fri, 20 Dec 2024 21:30:20 +0000 Subject: [PATCH 21/43] debug info --- .../org/apache/beam/sdk/util/SerializableUtils.java | 7 ++++--- .../io/aws2/kinesis/KinesisTransformRegistrar.java | 12 ++++++------ 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SerializableUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SerializableUtils.java index 20dfcf75cc41..43c0b343c3bf 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SerializableUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SerializableUtils.java @@ -21,8 +21,8 @@ import static org.apache.beam.sdk.util.CoderUtils.encodeToByteArray; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.ObjectWriter; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectWriter; import com.fasterxml.jackson.databind.JsonMappingException; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -61,7 +61,8 @@ public static byte[] serializeToByteArray(Serializable value) { } catch (IOException exn) { ObjectWriter ow = new ObjectMapper().writer().withDefaultPrettyPrinter(); try { - throw new IllegalArgumentException("unable to serialize " + ow.writeValueAsString(value), exn); + throw new IllegalArgumentException( + "unable to serialize " + ow.writeValueAsString(value), exn); } catch (JsonProcessingException ex) { throw new IllegalArgumentException("unable to jsonify " + value, exn); } diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java index 0908071646a9..b7e71fe18f4d 100644 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java @@ -108,10 +108,10 @@ public PTransform, KinesisIO.Write.Result> buildExternal( if (configuration.serviceEndpoint != null) { try { endpoint = new URI(configuration.serviceEndpoint); - } - catch (URISyntaxException ex) { + } catch (URISyntaxException ex) { throw new RuntimeException( - String.format("Service endpoint must be URI format, got: %s", configuration.serviceEndpoint)); + String.format( + "Service endpoint must be URI format, got: %s", configuration.serviceEndpoint)); } } KinesisIO.Write writeTransform = @@ -218,10 +218,10 @@ public PTransform> buildExternal( if (configuration.serviceEndpoint != null) { try { endpoint = new URI(configuration.serviceEndpoint); - } - catch (URISyntaxException ex) { + } catch (URISyntaxException ex) { throw new RuntimeException( - String.format("Service endpoint must be URI format, got: %s", configuration.serviceEndpoint)); + String.format( + "Service endpoint must be URI format, got: %s", configuration.serviceEndpoint)); } } KinesisIO.Read readTransform = From 4260926cb82e24929e3fe8e83c5309f5d48c426b Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Fri, 20 Dec 2024 21:34:12 +0000 Subject: [PATCH 22/43] debug info --- .../main/java/org/apache/beam/sdk/util/SerializableUtils.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SerializableUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SerializableUtils.java index 43c0b343c3bf..4f01248cf2fc 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SerializableUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SerializableUtils.java @@ -21,9 +21,9 @@ import static org.apache.beam.sdk.util.CoderUtils.encodeToByteArray; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectWriter; -import com.fasterxml.jackson.databind.JsonMappingException; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; From d737e5bbf2500bacb18e673d2042e71a25cbf622 Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Mon, 23 Dec 2024 13:14:35 +0000 Subject: [PATCH 23/43] Allow writebuilder to be serialized --- .../org/apache/beam/sdk/util/SerializableUtils.java | 11 +---------- .../io/aws2/kinesis/KinesisTransformRegistrar.java | 3 ++- 2 files changed, 3 insertions(+), 11 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SerializableUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SerializableUtils.java index 4f01248cf2fc..54f2a6572f6f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SerializableUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SerializableUtils.java @@ -21,9 +21,6 @@ import static org.apache.beam.sdk.util.CoderUtils.encodeToByteArray; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.ObjectWriter; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -59,13 +56,7 @@ public static byte[] serializeToByteArray(Serializable value) { } return buffer.toByteArray(); } catch (IOException exn) { - ObjectWriter ow = new ObjectMapper().writer().withDefaultPrettyPrinter(); - try { - throw new IllegalArgumentException( - "unable to serialize " + ow.writeValueAsString(value), exn); - } catch (JsonProcessingException ex) { - throw new IllegalArgumentException("unable to jsonify " + value, exn); - } + throw new IllegalArgumentException("unable to serialize " + value, exn); } } diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java index b7e71fe18f4d..8fbf26537d4f 100644 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java @@ -102,6 +102,7 @@ public PTransform, KinesisIO.Write.Result> buildExternal( Configuration configuration) { AwsBasicCredentials creds = AwsBasicCredentials.create(configuration.awsAccessKey, configuration.awsSecretKey); + String pk = configuration.partitionKey; StaticCredentialsProvider provider = StaticCredentialsProvider.create(creds); SerializableFunction serializer = v -> v; @Nullable URI endpoint = null; @@ -123,7 +124,7 @@ public PTransform, KinesisIO.Write.Result> buildExternal( .region(Region.of(configuration.region)) .endpoint(endpoint) .build()) - .withPartitioner(p -> configuration.partitionKey) + .withPartitioner(p -> pk) .withSerializer(serializer); return writeTransform; From 64d391e4cea318731647a34f0f88b4830dffe837 Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Mon, 23 Dec 2024 11:04:26 -0500 Subject: [PATCH 24/43] Try skipping certs --- .../beam/sdk/io/aws2/common/ClientBuilderFactory.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/common/ClientBuilderFactory.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/common/ClientBuilderFactory.java index 6398de57b5c3..61916e091dbd 100644 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/common/ClientBuilderFactory.java +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/common/ClientBuilderFactory.java @@ -40,6 +40,7 @@ import software.amazon.awssdk.http.apache.ApacheHttpClient; import software.amazon.awssdk.http.apache.ProxyConfiguration; import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient; +import software.amazon.awssdk.internal.http.NoneTlsKeyManagersProvider; import software.amazon.awssdk.regions.Region; /** @@ -177,6 +178,9 @@ public , ClientT> BuilderT setOptional(httpConfig.maxConnections(), client::maxConnections); } + // TODO - gate this behind a flag. + client.tlsKeyManagersProvider(NoneTlsKeyManagersProvider.getInstance()); + // must use builder to make sure client is managed by the SDK ((SdkSyncClientBuilder) builder).httpClientBuilder(client); } else if (builder instanceof SdkAsyncClientBuilder) { @@ -199,6 +203,9 @@ public , ClientT> BuilderT httpConfig.socketTimeout(), client::writeTimeout); // fallback for writeTimeout setOptional(httpConfig.writeTimeout(), client::writeTimeout); setOptional(httpConfig.maxConnections(), client::maxConcurrency); + + // TODO - gate this behind a flag. + client.tlsKeyManagersProvider(NoneTlsKeyManagersProvider.getInstance()); } // must use builder to make sure client is managed by the SDK From 80daeb5a390efb5d14b93735e5987236b582ad61 Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Mon, 23 Dec 2024 11:18:46 -0500 Subject: [PATCH 25/43] Make sure it gets set for now --- .../beam/sdk/io/aws2/common/ClientBuilderFactory.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/common/ClientBuilderFactory.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/common/ClientBuilderFactory.java index 61916e091dbd..9a1772ce878b 100644 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/common/ClientBuilderFactory.java +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/common/ClientBuilderFactory.java @@ -203,11 +203,11 @@ public , ClientT> BuilderT httpConfig.socketTimeout(), client::writeTimeout); // fallback for writeTimeout setOptional(httpConfig.writeTimeout(), client::writeTimeout); setOptional(httpConfig.maxConnections(), client::maxConcurrency); - - // TODO - gate this behind a flag. - client.tlsKeyManagersProvider(NoneTlsKeyManagersProvider.getInstance()); } + // TODO - gate this behind a flag. + client.tlsKeyManagersProvider(NoneTlsKeyManagersProvider.getInstance()); + // must use builder to make sure client is managed by the SDK ((SdkAsyncClientBuilder) builder).httpClientBuilder(client); } From 4c8992cc13ee5ef778d17a9e3184b9506ade5397 Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Mon, 23 Dec 2024 13:17:19 -0500 Subject: [PATCH 26/43] put behind flag --- .../io/aws2/common/ClientBuilderFactory.java | 16 +++++--- .../io/aws2/common/ClientConfiguration.java | 14 +++++++ .../kinesis/KinesisTransformRegistrar.java | 7 ++++ .../io/external/xlang_kinesisio_it_test.py | 1 + sdks/python/apache_beam/io/kinesis.py | 37 ++++--------------- 5 files changed, 41 insertions(+), 34 deletions(-) diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/common/ClientBuilderFactory.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/common/ClientBuilderFactory.java index 9a1772ce878b..85aa99cea360 100644 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/common/ClientBuilderFactory.java +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/common/ClientBuilderFactory.java @@ -162,7 +162,11 @@ public , ClientT> BuilderT HttpClientConfiguration httpConfig = options.getHttpClientConfiguration(); ProxyConfiguration proxyConfig = options.getProxyConfiguration(); - if (proxyConfig != null || httpConfig != null) { + boolean skipCertificateVerification = false; + if (config.skipCertificateVerification() != null) { + skipCertificateVerification = config.skipCertificateVerification(); + } + if (proxyConfig != null || httpConfig != null || skipCertificateVerification) { if (builder instanceof SdkSyncClientBuilder) { ApacheHttpClient.Builder client = syncClientBuilder(); @@ -178,8 +182,9 @@ public , ClientT> BuilderT setOptional(httpConfig.maxConnections(), client::maxConnections); } - // TODO - gate this behind a flag. - client.tlsKeyManagersProvider(NoneTlsKeyManagersProvider.getInstance()); + if (skipCertificateVerification) { + client.tlsKeyManagersProvider(NoneTlsKeyManagersProvider.getInstance()); + } // must use builder to make sure client is managed by the SDK ((SdkSyncClientBuilder) builder).httpClientBuilder(client); @@ -205,8 +210,9 @@ public , ClientT> BuilderT setOptional(httpConfig.maxConnections(), client::maxConcurrency); } - // TODO - gate this behind a flag. - client.tlsKeyManagersProvider(NoneTlsKeyManagersProvider.getInstance()); + if (skipCertificateVerification) { + client.tlsKeyManagersProvider(NoneTlsKeyManagersProvider.getInstance()); + } // must use builder to make sure client is managed by the SDK ((SdkAsyncClientBuilder) builder).httpClientBuilder(client); diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/common/ClientConfiguration.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/common/ClientConfiguration.java index 08fb595bd037..385a25b5a13f 100644 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/common/ClientConfiguration.java +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/common/ClientConfiguration.java @@ -76,6 +76,13 @@ public abstract class ClientConfiguration implements Serializable { return regionId() != null ? Region.of(regionId()) : null; } + /** + * Optional flag to skip certificate verification. Should only be overriden for test scenarios. If + * set, this overwrites the default in {@link AwsOptions#skipCertificateVerification()}. + */ + @JsonProperty + public abstract @Nullable @Pure Boolean skipCertificateVerification(); + /** * Optional service endpoint to use AWS compatible services instead, e.g. for testing. If set, * this overwrites the default in {@link AwsOptions#getEndpoint()}. @@ -156,6 +163,13 @@ public Builder retry(Consumer retry) { return retry(builder.build()); } + /** + * Optional flag to skip certificate verification. Should only be overriden for test scenarios. + * If set, this overwrites the default in {@link AwsOptions#skipCertificateVerification()}. + */ + @JsonProperty + public abstract Builder skipCertificateVerification(boolean skipCertificateVerification); + abstract Builder regionId(String region); abstract Builder credentialsProviderAsJson(String credentialsProvider); diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java index 8fbf26537d4f..11e517872971 100644 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java @@ -63,6 +63,7 @@ private abstract static class CrossLanguageConfiguration { String awsSecretKey; String region; @Nullable String serviceEndpoint; + boolean verifyCertificate; public void setStreamName(String streamName) { this.streamName = streamName; @@ -83,6 +84,10 @@ public void setRegion(String region) { public void setServiceEndpoint(@Nullable String serviceEndpoint) { this.serviceEndpoint = serviceEndpoint; } + + public void setVerifyCertificate(@Nullable Boolean verifyCertificate) { + this.verifyCertificate = verifyCertificate == null || verifyCertificate; + } } public static class WriteBuilder @@ -123,6 +128,7 @@ public PTransform, KinesisIO.Write.Result> buildExternal( .credentialsProvider(provider) .region(Region.of(configuration.region)) .endpoint(endpoint) + .skipCertificateVerification(!configuration.verifyCertificate) .build()) .withPartitioner(p -> pk) .withSerializer(serializer); @@ -233,6 +239,7 @@ public PTransform> buildExternal( .credentialsProvider(provider) .region(Region.of(configuration.region)) .endpoint(endpoint) + .skipCertificateVerification(!configuration.verifyCertificate) .build()); if (configuration.maxNumRecords != null) { diff --git a/sdks/python/apache_beam/io/external/xlang_kinesisio_it_test.py b/sdks/python/apache_beam/io/external/xlang_kinesisio_it_test.py index d37ce1f87fe3..df0926341a5e 100644 --- a/sdks/python/apache_beam/io/external/xlang_kinesisio_it_test.py +++ b/sdks/python/apache_beam/io/external/xlang_kinesisio_it_test.py @@ -129,6 +129,7 @@ def run_kinesis_read(self): aws_secret_key=self.aws_secret_key, region=self.aws_region, service_endpoint=self.aws_service_endpoint, + verify_certificate=not self.use_localstack, max_num_records=NUM_RECORDS, max_read_time=MAX_READ_TIME, request_records_limit=REQUEST_RECORDS_LIMIT, diff --git a/sdks/python/apache_beam/io/kinesis.py b/sdks/python/apache_beam/io/kinesis.py index cdae53e71c5a..94be0c44b709 100644 --- a/sdks/python/apache_beam/io/kinesis.py +++ b/sdks/python/apache_beam/io/kinesis.py @@ -111,6 +111,7 @@ def default_io_expansion_service(): ('region', str), ('partition_key', str), ('service_endpoint', Optional[str]), + ('verify_certificate', Optional[bool]), ], ) @@ -143,26 +144,14 @@ def __init__( :param aws_secret_key: Kinesis access key secret. :param region: AWS region. Example: 'us-east-1'. :param service_endpoint: Kinesis service endpoint - :param verify_certificate: Deprecated - certificates will always be - verified. + :param verify_certificate: Enable or disable certificate verification. + Never set to False on production. True by default. :param partition_key: Specify default partition key. :param producer_properties: Specify the configuration properties for Kinesis Producer Library (KPL) as dictionary. Example: {'CollectionMaxCount': '1000', 'ConnectTimeout': '10000'} :param expansion_service: The address (host:port) of the ExpansionService. """ - if verify_certificate is False: - # Previously, we supported this via - # https://javadoc.io/doc/com.amazonaws/amazon-kinesis-producer/0.14.0/com/amazonaws/services/kinesis/producer/KinesisProducerConfiguration.html#isVerifyCertificate-- - # With the new AWS client, we no longer support it and it is always True - raise ValueError( - 'verify_certificate set to False. This option is no longer ' + - 'supported and certificate verification will still happen.') - if verify_certificate is True: - logging.warning( - 'verify_certificate set to True. This option is no longer ' + - 'supported and certificate verification will automatically ' + - 'happen. This option may be removed in a future release') if producer_properties is not None: raise ValueError( 'producer_properties is no longer supported and will be removed ' + @@ -177,6 +166,7 @@ def __init__( region=region, partition_key=partition_key, service_endpoint=service_endpoint, + verify_certificate=verify_certificate, )), expansion_service or default_io_expansion_service(), ) @@ -190,6 +180,7 @@ def __init__( ('aws_secret_key', str), ('region', str), ('service_endpoint', Optional[str]), + ('verify_certificate', Optional[bool]), ('max_num_records', Optional[int]), ('max_read_time', Optional[int]), ('initial_position_in_stream', Optional[str]), @@ -240,8 +231,8 @@ def __init__( :param aws_secret_key: Kinesis access key secret. :param region: AWS region. Example: 'us-east-1'. :param service_endpoint: Kinesis service endpoint - :param verify_certificate: Deprecated - certificates will always be - verified. + :param verify_certificate: Enable or disable certificate verification. + Never set to False on production. True by default. :param max_num_records: Specifies to read at most a given number of records. Must be greater than 0. :param max_read_time: Specifies to read records during x milliseconds. @@ -288,19 +279,6 @@ def __init__( ): logging.warning('Provided timestamp emplaced not in the past.') - if verify_certificate is False: - # Previously, we supported this via - # https://javadoc.io/doc/com.amazonaws/amazon-kinesis-producer/0.14.0/com/amazonaws/services/kinesis/producer/KinesisProducerConfiguration.html#isVerifyCertificate-- - # With the new AWS client, we no longer support it and it is always True - raise ValueError( - 'verify_certificate set to False. This option is no longer ' + - 'supported and certificate verification will still happen.') - if verify_certificate is True: - logging.warning( - 'verify_certificate set to True. This option is no longer ' + - 'supported and certificate verification will automatically ' + - 'happen. This option may be removed in a future release') - super().__init__( self.URN, NamedTupleBasedPayloadBuilder( @@ -310,6 +288,7 @@ def __init__( aws_secret_key=aws_secret_key, region=region, service_endpoint=service_endpoint, + verify_certificate=verify_certificate, max_num_records=max_num_records, max_read_time=max_read_time, initial_position_in_stream=initial_position_in_stream, From adc70adcead3c7c6b20668698ad1d7b272ec09cc Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Mon, 23 Dec 2024 14:33:40 -0500 Subject: [PATCH 27/43] Doc + debug further --- CHANGES.md | 1 + .../beam/sdk/io/aws2/common/ClientBuilderFactory.java | 2 ++ sdks/python/apache_beam/io/kinesis.py | 6 +++--- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 06b92953c662..b0dfc5af49ef 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -81,6 +81,7 @@ ## Breaking Changes * Upgraded ZetaSQL to 2024.11.1 ([#32902](https://github.com/apache/beam/pull/32902)). Java11+ is now needed if Beam's ZetaSQL component is used. +* AWS V1 I/Os have been removed (Java). As part of this, x-lang Python I/Os no longer support setting producer_properties ([#33430](https://github.com/apache/beam/issues/33430)). * X behavior was changed ([#X](https://github.com/apache/beam/issues/X)). ## Deprecations diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/common/ClientBuilderFactory.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/common/ClientBuilderFactory.java index 85aa99cea360..dc6825b1b815 100644 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/common/ClientBuilderFactory.java +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/common/ClientBuilderFactory.java @@ -184,6 +184,7 @@ public , ClientT> BuilderT if (skipCertificateVerification) { client.tlsKeyManagersProvider(NoneTlsKeyManagersProvider.getInstance()); + throw new Exception("Made it this far - probably means the tlsKeyManagersProvider is not right"); } // must use builder to make sure client is managed by the SDK @@ -212,6 +213,7 @@ public , ClientT> BuilderT if (skipCertificateVerification) { client.tlsKeyManagersProvider(NoneTlsKeyManagersProvider.getInstance()); + throw new Exception("Made it this far - probably means the tlsKeyManagersProvider is not right"); } // must use builder to make sure client is managed by the SDK diff --git a/sdks/python/apache_beam/io/kinesis.py b/sdks/python/apache_beam/io/kinesis.py index 94be0c44b709..ce0bb2623a38 100644 --- a/sdks/python/apache_beam/io/kinesis.py +++ b/sdks/python/apache_beam/io/kinesis.py @@ -147,9 +147,9 @@ def __init__( :param verify_certificate: Enable or disable certificate verification. Never set to False on production. True by default. :param partition_key: Specify default partition key. - :param producer_properties: Specify the configuration properties for Kinesis - Producer Library (KPL) as dictionary. - Example: {'CollectionMaxCount': '1000', 'ConnectTimeout': '10000'} + :param producer_properties: (Deprecated) This option no longer is available + since the AWS IOs upgraded to v2. Trying to set it will lead to an + error. For more info, see https://github.com/apache/beam/issues/33430. :param expansion_service: The address (host:port) of the ExpansionService. """ if producer_properties is not None: From 30fd6056c626e05fcee7ed5a487c3b57abe5d424 Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Mon, 23 Dec 2024 16:30:33 -0500 Subject: [PATCH 28/43] Merge in master --- .../beam/sdk/io/aws2/common/ClientBuilderFactory.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/common/ClientBuilderFactory.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/common/ClientBuilderFactory.java index dc6825b1b815..e0b3646f3644 100644 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/common/ClientBuilderFactory.java +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/common/ClientBuilderFactory.java @@ -184,7 +184,8 @@ public , ClientT> BuilderT if (skipCertificateVerification) { client.tlsKeyManagersProvider(NoneTlsKeyManagersProvider.getInstance()); - throw new Exception("Made it this far - probably means the tlsKeyManagersProvider is not right"); + throw new RuntimeException( + "Made it this far - probably means the tlsKeyManagersProvider is not right"); } // must use builder to make sure client is managed by the SDK @@ -213,7 +214,8 @@ public , ClientT> BuilderT if (skipCertificateVerification) { client.tlsKeyManagersProvider(NoneTlsKeyManagersProvider.getInstance()); - throw new Exception("Made it this far - probably means the tlsKeyManagersProvider is not right"); + throw new RuntimeException( + "Made it this far - probably means the tlsKeyManagersProvider is not right"); } // must use builder to make sure client is managed by the SDK From c8caf0c61588897d4bf72c077162f7b3a9b32fcf Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Mon, 23 Dec 2024 19:36:20 -0500 Subject: [PATCH 29/43] Debug info --- .../apache/beam/sdk/io/aws2/common/ClientBuilderFactory.java | 1 + .../beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java | 3 +++ 2 files changed, 4 insertions(+) diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/common/ClientBuilderFactory.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/common/ClientBuilderFactory.java index e0b3646f3644..d21aa7f0ec5e 100644 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/common/ClientBuilderFactory.java +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/common/ClientBuilderFactory.java @@ -165,6 +165,7 @@ public , ClientT> BuilderT boolean skipCertificateVerification = false; if (config.skipCertificateVerification() != null) { skipCertificateVerification = config.skipCertificateVerification(); + throw new RuntimeException("config was non-null " + skipCertificateVerification); } if (proxyConfig != null || httpConfig != null || skipCertificateVerification) { if (builder instanceof SdkSyncClientBuilder) { diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java index 11e517872971..2b5d628e3121 100644 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java @@ -120,6 +120,9 @@ public PTransform, KinesisIO.Write.Result> buildExternal( "Service endpoint must be URI format, got: %s", configuration.serviceEndpoint)); } } + if (configuration.verifyCertificate) { + throw new RuntimeException("This is the problem"); + } KinesisIO.Write writeTransform = KinesisIO.write() .withStreamName(configuration.streamName) From 583157b86b15f07c5bbf66374070de2b500b79ac Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Tue, 24 Dec 2024 11:18:08 -0500 Subject: [PATCH 30/43] Pass through param --- sdks/python/apache_beam/io/external/xlang_kinesisio_it_test.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sdks/python/apache_beam/io/external/xlang_kinesisio_it_test.py b/sdks/python/apache_beam/io/external/xlang_kinesisio_it_test.py index df0926341a5e..e7c840c103bd 100644 --- a/sdks/python/apache_beam/io/external/xlang_kinesisio_it_test.py +++ b/sdks/python/apache_beam/io/external/xlang_kinesisio_it_test.py @@ -115,6 +115,7 @@ def run_kinesis_write(self): aws_secret_key=self.aws_secret_key, region=self.aws_region, service_endpoint=self.aws_service_endpoint, + verify_certificate=(not self.use_localstack), partition_key='1')) def run_kinesis_read(self): From e1f306d6dba566a332e634c770db0a99c3b11503 Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Thu, 26 Dec 2024 04:40:00 -0500 Subject: [PATCH 31/43] Remove debug --- .../apache/beam/sdk/io/aws2/common/ClientBuilderFactory.java | 5 ----- .../beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java | 3 --- 2 files changed, 8 deletions(-) diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/common/ClientBuilderFactory.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/common/ClientBuilderFactory.java index d21aa7f0ec5e..85aa99cea360 100644 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/common/ClientBuilderFactory.java +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/common/ClientBuilderFactory.java @@ -165,7 +165,6 @@ public , ClientT> BuilderT boolean skipCertificateVerification = false; if (config.skipCertificateVerification() != null) { skipCertificateVerification = config.skipCertificateVerification(); - throw new RuntimeException("config was non-null " + skipCertificateVerification); } if (proxyConfig != null || httpConfig != null || skipCertificateVerification) { if (builder instanceof SdkSyncClientBuilder) { @@ -185,8 +184,6 @@ public , ClientT> BuilderT if (skipCertificateVerification) { client.tlsKeyManagersProvider(NoneTlsKeyManagersProvider.getInstance()); - throw new RuntimeException( - "Made it this far - probably means the tlsKeyManagersProvider is not right"); } // must use builder to make sure client is managed by the SDK @@ -215,8 +212,6 @@ public , ClientT> BuilderT if (skipCertificateVerification) { client.tlsKeyManagersProvider(NoneTlsKeyManagersProvider.getInstance()); - throw new RuntimeException( - "Made it this far - probably means the tlsKeyManagersProvider is not right"); } // must use builder to make sure client is managed by the SDK diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java index 2b5d628e3121..11e517872971 100644 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java @@ -120,9 +120,6 @@ public PTransform, KinesisIO.Write.Result> buildExternal( "Service endpoint must be URI format, got: %s", configuration.serviceEndpoint)); } } - if (configuration.verifyCertificate) { - throw new RuntimeException("This is the problem"); - } KinesisIO.Write writeTransform = KinesisIO.write() .withStreamName(configuration.streamName) From 4bf4425e278e4fb41a6a4534888705f9d5a160ad Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Thu, 26 Dec 2024 07:22:32 -0500 Subject: [PATCH 32/43] Remove debug --- .../beam/sdk/io/aws2/common/ClientBuilderFactory.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/common/ClientBuilderFactory.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/common/ClientBuilderFactory.java index 85aa99cea360..55c6341f1856 100644 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/common/ClientBuilderFactory.java +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/common/ClientBuilderFactory.java @@ -184,6 +184,8 @@ public , ClientT> BuilderT if (skipCertificateVerification) { client.tlsKeyManagersProvider(NoneTlsKeyManagersProvider.getInstance()); + throw new RuntimeException( + "Made it this far (SdkSyncClientBuilder) - probably means the tlsKeyManagersProvider is not right"); } // must use builder to make sure client is managed by the SDK @@ -212,10 +214,14 @@ public , ClientT> BuilderT if (skipCertificateVerification) { client.tlsKeyManagersProvider(NoneTlsKeyManagersProvider.getInstance()); + throw new RuntimeException( + "Made it this far (SdkAsyncClientBuilder) - probably means the tlsKeyManagersProvider is not right"); } // must use builder to make sure client is managed by the SDK ((SdkAsyncClientBuilder) builder).httpClientBuilder(client); + } else { + throw new RuntimeException("Surprising builder type: " + builder.toString()); } } return builder; From 3822660b58fa0b154246ea28c5d440c0f5b5a075 Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Thu, 26 Dec 2024 08:51:42 -0500 Subject: [PATCH 33/43] override trust manager --- .../io/aws2/common/ClientBuilderFactory.java | 39 ++++++++++++++++--- 1 file changed, 33 insertions(+), 6 deletions(-) diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/common/ClientBuilderFactory.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/common/ClientBuilderFactory.java index 55c6341f1856..eb8cbbe6c536 100644 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/common/ClientBuilderFactory.java +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/common/ClientBuilderFactory.java @@ -24,10 +24,14 @@ import java.io.Serializable; import java.net.URI; +import java.security.cert.CertificateException; +import java.security.cert.X509Certificate; import java.time.Duration; import java.util.function.Consumer; import java.util.function.Function; import javax.annotation.Nullable; +import javax.net.ssl.TrustManager; +import javax.net.ssl.X509TrustManager; import org.apache.beam.sdk.io.aws2.options.AwsOptions; import org.apache.beam.sdk.util.InstanceBuilder; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; @@ -37,6 +41,7 @@ import software.amazon.awssdk.core.client.builder.SdkSyncClientBuilder; import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; import software.amazon.awssdk.core.retry.RetryPolicy; +import software.amazon.awssdk.http.TlsTrustManagersProvider; import software.amazon.awssdk.http.apache.ApacheHttpClient; import software.amazon.awssdk.http.apache.ProxyConfiguration; import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient; @@ -114,6 +119,32 @@ static , ClientT> ClientT b return ClientBuilderFactory.getFactory(options).create(builder, config, options).build(); } + /** Trust provider to skip certificate verification. Should only be used for test pipelines. */ + public class SkipCertificateVerificationTrustManagerProvider implements TlsTrustManagersProvider { + public SkipCertificateVerificationTrustManagerProvider() {} + + @Override + public TrustManager[] trustManagers() { + TrustManager tm = + new X509TrustManager() { + @Override + public final void checkClientTrusted(X509Certificate[] x509CertificateArr, String str) + throws CertificateException {} + + @Override + public final void checkServerTrusted(X509Certificate[] x509CertificateArr, String str) + throws CertificateException {} + + @Override + public final X509Certificate[] getAcceptedIssuers() { + return new X509Certificate[0]; + } + }; + TrustManager[] tms = {tm}; + return tms; + } + } + /** * Default implementation of {@link ClientBuilderFactory}. This implementation can configure both, * synchronous clients using {@link ApacheHttpClient} as well as asynchronous clients using {@link @@ -184,8 +215,7 @@ public , ClientT> BuilderT if (skipCertificateVerification) { client.tlsKeyManagersProvider(NoneTlsKeyManagersProvider.getInstance()); - throw new RuntimeException( - "Made it this far (SdkSyncClientBuilder) - probably means the tlsKeyManagersProvider is not right"); + client.tlsTrustManagersProvider(new SkipCertificateVerificationTrustManagerProvider()); } // must use builder to make sure client is managed by the SDK @@ -214,14 +244,11 @@ public , ClientT> BuilderT if (skipCertificateVerification) { client.tlsKeyManagersProvider(NoneTlsKeyManagersProvider.getInstance()); - throw new RuntimeException( - "Made it this far (SdkAsyncClientBuilder) - probably means the tlsKeyManagersProvider is not right"); + client.tlsTrustManagersProvider(new SkipCertificateVerificationTrustManagerProvider()); } // must use builder to make sure client is managed by the SDK ((SdkAsyncClientBuilder) builder).httpClientBuilder(client); - } else { - throw new RuntimeException("Surprising builder type: " + builder.toString()); } } return builder; From 60c24805279802eac3d037193e1b14121104d19c Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Thu, 26 Dec 2024 08:57:38 -0500 Subject: [PATCH 34/43] checkstyle --- .../beam/sdk/io/aws2/common/ClientBuilderFactory.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/common/ClientBuilderFactory.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/common/ClientBuilderFactory.java index eb8cbbe6c536..ae97d0f8530d 100644 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/common/ClientBuilderFactory.java +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/common/ClientBuilderFactory.java @@ -120,7 +120,7 @@ static , ClientT> ClientT b } /** Trust provider to skip certificate verification. Should only be used for test pipelines. */ - public class SkipCertificateVerificationTrustManagerProvider implements TlsTrustManagersProvider { + class SkipCertificateVerificationTrustManagerProvider implements TlsTrustManagersProvider { public SkipCertificateVerificationTrustManagerProvider() {} @Override @@ -128,15 +128,15 @@ public TrustManager[] trustManagers() { TrustManager tm = new X509TrustManager() { @Override - public final void checkClientTrusted(X509Certificate[] x509CertificateArr, String str) + public void checkClientTrusted(X509Certificate[] x509CertificateArr, String str) throws CertificateException {} @Override - public final void checkServerTrusted(X509Certificate[] x509CertificateArr, String str) + public void checkServerTrusted(X509Certificate[] x509CertificateArr, String str) throws CertificateException {} @Override - public final X509Certificate[] getAcceptedIssuers() { + public X509Certificate[] getAcceptedIssuers() { return new X509Certificate[0]; } }; From 1a00a4def6c0933943517871b74a01411365ce26 Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Thu, 26 Dec 2024 15:25:39 -0500 Subject: [PATCH 35/43] Try disabling aggregation --- .../beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java index 11e517872971..51d4202e4027 100644 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java @@ -23,6 +23,7 @@ import java.util.Map; import org.apache.beam.sdk.expansion.ExternalTransformRegistrar; import org.apache.beam.sdk.io.aws2.common.ClientConfiguration; +import org.apache.beam.sdk.io.aws2.kinesis.KinesisIO; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ExternalTransformBuilder; import org.apache.beam.sdk.transforms.PTransform; @@ -131,6 +132,7 @@ public PTransform, KinesisIO.Write.Result> buildExternal( .skipCertificateVerification(!configuration.verifyCertificate) .build()) .withPartitioner(p -> pk) + .withRecordAggregationDisabled() .withSerializer(serializer); return writeTransform; From f9e617a6e798975055b12975f75adfae984d0d5a Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Thu, 26 Dec 2024 16:29:10 -0500 Subject: [PATCH 36/43] easier debugging --- build.gradle.kts | 30 +++++++++---------- .../python/test-suites/portable/common.gradle | 17 ++++++----- 2 files changed, 24 insertions(+), 23 deletions(-) diff --git a/build.gradle.kts b/build.gradle.kts index 0adb29058479..af8db249034c 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -502,40 +502,40 @@ tasks.register("pythonFormatterPreCommit") { } tasks.register("python39PostCommit") { - dependsOn(":sdks:python:test-suites:dataflow:py39:postCommitIT") - dependsOn(":sdks:python:test-suites:direct:py39:postCommitIT") - dependsOn(":sdks:python:test-suites:direct:py39:hdfsIntegrationTest") - dependsOn(":sdks:python:test-suites:direct:py39:azureIntegrationTest") + // dependsOn(":sdks:python:test-suites:dataflow:py39:postCommitIT") + // dependsOn(":sdks:python:test-suites:direct:py39:postCommitIT") + // dependsOn(":sdks:python:test-suites:direct:py39:hdfsIntegrationTest") + // dependsOn(":sdks:python:test-suites:direct:py39:azureIntegrationTest") dependsOn(":sdks:python:test-suites:portable:py39:postCommitPy39") // TODO (https://github.com/apache/beam/issues/23966) // Move this to Python 3.10 test suite once tfx-bsl has python 3.10 wheel. - dependsOn(":sdks:python:test-suites:direct:py39:inferencePostCommitIT") + // dependsOn(":sdks:python:test-suites:direct:py39:inferencePostCommitIT") } tasks.register("python310PostCommit") { - dependsOn(":sdks:python:test-suites:dataflow:py310:postCommitIT") - dependsOn(":sdks:python:test-suites:direct:py310:postCommitIT") + // dependsOn(":sdks:python:test-suites:dataflow:py310:postCommitIT") + // dependsOn(":sdks:python:test-suites:direct:py310:postCommitIT") dependsOn(":sdks:python:test-suites:portable:py310:postCommitPy310") // TODO: https://github.com/apache/beam/issues/22651 // The default container uses Python 3.10. The goal here is to // duild Docker images for TensorRT tests during run time for python versions // other than 3.10 and add these tests in other python postcommit suites. - dependsOn(":sdks:python:test-suites:dataflow:py310:inferencePostCommitIT") + // dependsOn(":sdks:python:test-suites:dataflow:py310:inferencePostCommitIT") } tasks.register("python311PostCommit") { - dependsOn(":sdks:python:test-suites:dataflow:py311:postCommitIT") - dependsOn(":sdks:python:test-suites:direct:py311:postCommitIT") - dependsOn(":sdks:python:test-suites:direct:py311:hdfsIntegrationTest") + // dependsOn(":sdks:python:test-suites:dataflow:py311:postCommitIT") + // dependsOn(":sdks:python:test-suites:direct:py311:postCommitIT") + // dependsOn(":sdks:python:test-suites:direct:py311:hdfsIntegrationTest") dependsOn(":sdks:python:test-suites:portable:py311:postCommitPy311") } tasks.register("python312PostCommit") { - dependsOn(":sdks:python:test-suites:dataflow:py312:postCommitIT") - dependsOn(":sdks:python:test-suites:direct:py312:postCommitIT") - dependsOn(":sdks:python:test-suites:direct:py312:hdfsIntegrationTest") + // dependsOn(":sdks:python:test-suites:dataflow:py312:postCommitIT") + // dependsOn(":sdks:python:test-suites:direct:py312:postCommitIT") + // dependsOn(":sdks:python:test-suites:direct:py312:hdfsIntegrationTest") dependsOn(":sdks:python:test-suites:portable:py312:postCommitPy312") - dependsOn(":sdks:python:test-suites:dataflow:py312:inferencePostCommitITPy312") + // dependsOn(":sdks:python:test-suites:dataflow:py312:inferencePostCommitITPy312") } tasks.register("portablePythonPreCommit") { diff --git a/sdks/python/test-suites/portable/common.gradle b/sdks/python/test-suites/portable/common.gradle index 99b477b2c7db..f5d5b7bbaad3 100644 --- a/sdks/python/test-suites/portable/common.gradle +++ b/sdks/python/test-suites/portable/common.gradle @@ -262,10 +262,11 @@ project.tasks.register("preCommitPy${pythonVersionSuffix}") { project.tasks.register("postCommitPy${pythonVersionSuffix}") { dependsOn = ['setupVirtualenv', "postCommitPy${pythonVersionSuffix}IT", - ':runners:spark:3:job-server:shadowJar', - 'portableLocalRunnerJuliaSetWithSetupPy', - 'portableWordCountSparkRunnerBatch', - 'portableLocalRunnerTestWithRequirementsFile'] + // ':runners:spark:3:job-server:shadowJar', + // 'portableLocalRunnerJuliaSetWithSetupPy', + // 'portableWordCountSparkRunnerBatch', + // 'portableLocalRunnerTestWithRequirementsFile' + ] } project.tasks.register("flinkExamples") { @@ -383,11 +384,11 @@ project.tasks.register("postCommitPy${pythonVersionSuffix}IT") { doLast { def tests = [ - "apache_beam/io/gcp/bigquery_read_it_test.py", - "apache_beam/io/external/xlang_jdbcio_it_test.py", - "apache_beam/io/external/xlang_kafkaio_it_test.py", + // "apache_beam/io/gcp/bigquery_read_it_test.py", + // "apache_beam/io/external/xlang_jdbcio_it_test.py", + // "apache_beam/io/external/xlang_kafkaio_it_test.py", "apache_beam/io/external/xlang_kinesisio_it_test.py", - "apache_beam/io/external/xlang_debeziumio_it_test.py", + // "apache_beam/io/external/xlang_debeziumio_it_test.py", ] def testOpts = ["${tests.join(' ')}"] + ["--log-cli-level=INFO"] def pipelineOpts = [ From ad6ad032f0e57e834524aa37cf27bc0a99ae1ba2 Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Fri, 27 Dec 2024 06:29:17 -0500 Subject: [PATCH 37/43] Try upgrading localstack --- sdks/python/apache_beam/io/external/xlang_kinesisio_it_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/io/external/xlang_kinesisio_it_test.py b/sdks/python/apache_beam/io/external/xlang_kinesisio_it_test.py index e7c840c103bd..94cb7ceae75a 100644 --- a/sdks/python/apache_beam/io/external/xlang_kinesisio_it_test.py +++ b/sdks/python/apache_beam/io/external/xlang_kinesisio_it_test.py @@ -64,7 +64,7 @@ DockerContainer = None # pylint: enable=wrong-import-order, wrong-import-position, ungrouped-imports -LOCALSTACK_VERSION = '0.11.3' +LOCALSTACK_VERSION = '3.8.1' NUM_RECORDS = 10 MAX_READ_TIME = 5 * 60 * 1000 # 5min NOW_SECONDS = time.time() From 1b1fc38422dfec1d4fcd75d4d7d7655ca7788f0e Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Fri, 27 Dec 2024 08:09:14 -0500 Subject: [PATCH 38/43] change how containers are started --- .../io/external/xlang_kinesisio_it_test.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/sdks/python/apache_beam/io/external/xlang_kinesisio_it_test.py b/sdks/python/apache_beam/io/external/xlang_kinesisio_it_test.py index 94cb7ceae75a..9e0a4dfe57a6 100644 --- a/sdks/python/apache_beam/io/external/xlang_kinesisio_it_test.py +++ b/sdks/python/apache_beam/io/external/xlang_kinesisio_it_test.py @@ -143,12 +143,11 @@ def run_kinesis_read(self): def set_localstack(self): self.localstack = DockerContainer('localstack/localstack:{}' - .format(LOCALSTACK_VERSION))\ - .with_env('SERVICES', 'kinesis')\ - .with_env('KINESIS_PORT', '4568')\ - .with_env('USE_SSL', 'true')\ - .with_exposed_ports(4568)\ - .with_volume_mapping('/var/run/docker.sock', '/var/run/docker.sock', 'rw') + .format(LOCALSTACK_VERSION))\ + .with_bind_ports(4566, 4566) + + for i in range(4510, 4560): + self.localstack = self.localstack.with_bind_ports(i, i) # Repeat if ReadTimeout is raised. for i in range(4): From 236dfcbd5a4d0da31d3bfd40509b0b6a01eb1bb4 Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Fri, 27 Dec 2024 08:23:40 -0500 Subject: [PATCH 39/43] change how containers are started --- sdks/python/apache_beam/io/external/xlang_kinesisio_it_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/io/external/xlang_kinesisio_it_test.py b/sdks/python/apache_beam/io/external/xlang_kinesisio_it_test.py index 9e0a4dfe57a6..c9181fb2a721 100644 --- a/sdks/python/apache_beam/io/external/xlang_kinesisio_it_test.py +++ b/sdks/python/apache_beam/io/external/xlang_kinesisio_it_test.py @@ -161,7 +161,7 @@ def set_localstack(self): self.aws_service_endpoint = 'https://{}:{}'.format( self.localstack.get_container_host_ip(), - self.localstack.get_exposed_port('4568'), + self.localstack.get_exposed_port('4566'), ) def setUp(self): From 8822b869ae888f44fe03cd571020c3123ec5292a Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Fri, 27 Dec 2024 08:50:42 -0500 Subject: [PATCH 40/43] force http1 --- .../apache/beam/sdk/io/aws2/common/ClientBuilderFactory.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/common/ClientBuilderFactory.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/common/ClientBuilderFactory.java index ae97d0f8530d..8d8531ce5cdf 100644 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/common/ClientBuilderFactory.java +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/common/ClientBuilderFactory.java @@ -41,6 +41,7 @@ import software.amazon.awssdk.core.client.builder.SdkSyncClientBuilder; import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; import software.amazon.awssdk.core.retry.RetryPolicy; +import software.amazon.awssdk.http.Protocol; import software.amazon.awssdk.http.TlsTrustManagersProvider; import software.amazon.awssdk.http.apache.ApacheHttpClient; import software.amazon.awssdk.http.apache.ProxyConfiguration; @@ -245,6 +246,7 @@ public , ClientT> BuilderT if (skipCertificateVerification) { client.tlsKeyManagersProvider(NoneTlsKeyManagersProvider.getInstance()); client.tlsTrustManagersProvider(new SkipCertificateVerificationTrustManagerProvider()); + client.protocol(Protocol.HTTP1_1); } // must use builder to make sure client is managed by the SDK From 71f91604da6d235da14ee96149ee8ebe1f984ad8 Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Fri, 27 Dec 2024 09:14:43 -0500 Subject: [PATCH 41/43] Add back all tests --- build.gradle.kts | 30 +++++++++---------- .../python/test-suites/portable/common.gradle | 16 +++++----- 2 files changed, 23 insertions(+), 23 deletions(-) diff --git a/build.gradle.kts b/build.gradle.kts index af8db249034c..0adb29058479 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -502,40 +502,40 @@ tasks.register("pythonFormatterPreCommit") { } tasks.register("python39PostCommit") { - // dependsOn(":sdks:python:test-suites:dataflow:py39:postCommitIT") - // dependsOn(":sdks:python:test-suites:direct:py39:postCommitIT") - // dependsOn(":sdks:python:test-suites:direct:py39:hdfsIntegrationTest") - // dependsOn(":sdks:python:test-suites:direct:py39:azureIntegrationTest") + dependsOn(":sdks:python:test-suites:dataflow:py39:postCommitIT") + dependsOn(":sdks:python:test-suites:direct:py39:postCommitIT") + dependsOn(":sdks:python:test-suites:direct:py39:hdfsIntegrationTest") + dependsOn(":sdks:python:test-suites:direct:py39:azureIntegrationTest") dependsOn(":sdks:python:test-suites:portable:py39:postCommitPy39") // TODO (https://github.com/apache/beam/issues/23966) // Move this to Python 3.10 test suite once tfx-bsl has python 3.10 wheel. - // dependsOn(":sdks:python:test-suites:direct:py39:inferencePostCommitIT") + dependsOn(":sdks:python:test-suites:direct:py39:inferencePostCommitIT") } tasks.register("python310PostCommit") { - // dependsOn(":sdks:python:test-suites:dataflow:py310:postCommitIT") - // dependsOn(":sdks:python:test-suites:direct:py310:postCommitIT") + dependsOn(":sdks:python:test-suites:dataflow:py310:postCommitIT") + dependsOn(":sdks:python:test-suites:direct:py310:postCommitIT") dependsOn(":sdks:python:test-suites:portable:py310:postCommitPy310") // TODO: https://github.com/apache/beam/issues/22651 // The default container uses Python 3.10. The goal here is to // duild Docker images for TensorRT tests during run time for python versions // other than 3.10 and add these tests in other python postcommit suites. - // dependsOn(":sdks:python:test-suites:dataflow:py310:inferencePostCommitIT") + dependsOn(":sdks:python:test-suites:dataflow:py310:inferencePostCommitIT") } tasks.register("python311PostCommit") { - // dependsOn(":sdks:python:test-suites:dataflow:py311:postCommitIT") - // dependsOn(":sdks:python:test-suites:direct:py311:postCommitIT") - // dependsOn(":sdks:python:test-suites:direct:py311:hdfsIntegrationTest") + dependsOn(":sdks:python:test-suites:dataflow:py311:postCommitIT") + dependsOn(":sdks:python:test-suites:direct:py311:postCommitIT") + dependsOn(":sdks:python:test-suites:direct:py311:hdfsIntegrationTest") dependsOn(":sdks:python:test-suites:portable:py311:postCommitPy311") } tasks.register("python312PostCommit") { - // dependsOn(":sdks:python:test-suites:dataflow:py312:postCommitIT") - // dependsOn(":sdks:python:test-suites:direct:py312:postCommitIT") - // dependsOn(":sdks:python:test-suites:direct:py312:hdfsIntegrationTest") + dependsOn(":sdks:python:test-suites:dataflow:py312:postCommitIT") + dependsOn(":sdks:python:test-suites:direct:py312:postCommitIT") + dependsOn(":sdks:python:test-suites:direct:py312:hdfsIntegrationTest") dependsOn(":sdks:python:test-suites:portable:py312:postCommitPy312") - // dependsOn(":sdks:python:test-suites:dataflow:py312:inferencePostCommitITPy312") + dependsOn(":sdks:python:test-suites:dataflow:py312:inferencePostCommitITPy312") } tasks.register("portablePythonPreCommit") { diff --git a/sdks/python/test-suites/portable/common.gradle b/sdks/python/test-suites/portable/common.gradle index f5d5b7bbaad3..2d216a01f320 100644 --- a/sdks/python/test-suites/portable/common.gradle +++ b/sdks/python/test-suites/portable/common.gradle @@ -262,10 +262,10 @@ project.tasks.register("preCommitPy${pythonVersionSuffix}") { project.tasks.register("postCommitPy${pythonVersionSuffix}") { dependsOn = ['setupVirtualenv', "postCommitPy${pythonVersionSuffix}IT", - // ':runners:spark:3:job-server:shadowJar', - // 'portableLocalRunnerJuliaSetWithSetupPy', - // 'portableWordCountSparkRunnerBatch', - // 'portableLocalRunnerTestWithRequirementsFile' + ':runners:spark:3:job-server:shadowJar', + 'portableLocalRunnerJuliaSetWithSetupPy', + 'portableWordCountSparkRunnerBatch', + 'portableLocalRunnerTestWithRequirementsFile' ] } @@ -384,11 +384,11 @@ project.tasks.register("postCommitPy${pythonVersionSuffix}IT") { doLast { def tests = [ - // "apache_beam/io/gcp/bigquery_read_it_test.py", - // "apache_beam/io/external/xlang_jdbcio_it_test.py", - // "apache_beam/io/external/xlang_kafkaio_it_test.py", + "apache_beam/io/gcp/bigquery_read_it_test.py", + "apache_beam/io/external/xlang_jdbcio_it_test.py", + "apache_beam/io/external/xlang_kafkaio_it_test.py", "apache_beam/io/external/xlang_kinesisio_it_test.py", - // "apache_beam/io/external/xlang_debeziumio_it_test.py", + "apache_beam/io/external/xlang_debeziumio_it_test.py", ] def testOpts = ["${tests.join(' ')}"] + ["--log-cli-level=INFO"] def pipelineOpts = [ From 8268fb1dc29d003b2cf117132d3e6ec1b76f0139 Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Fri, 27 Dec 2024 11:24:04 -0500 Subject: [PATCH 42/43] Update changes wording --- CHANGES.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index d8746ce7b26c..75f9548f88c2 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -70,7 +70,7 @@ ## Breaking Changes -* AWS V1 I/Os have been removed (Java). As part of this, x-lang Python I/Os no longer support setting producer_properties ([#33430](https://github.com/apache/beam/issues/33430)). +* AWS V1 I/Os have been removed (Java). As part of this, x-lang Python I/Os have been updated to consume the V2 IOs and they also no longer support setting producer_properties ([#33430](https://github.com/apache/beam/issues/33430)). ## Deprecations From f95fa7a53e71c8c88e524aa690d7e23d228a650a Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Fri, 27 Dec 2024 11:31:26 -0500 Subject: [PATCH 43/43] Better change description --- CHANGES.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index 75f9548f88c2..d5cbb76fb3d5 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -70,7 +70,7 @@ ## Breaking Changes -* AWS V1 I/Os have been removed (Java). As part of this, x-lang Python I/Os have been updated to consume the V2 IOs and they also no longer support setting producer_properties ([#33430](https://github.com/apache/beam/issues/33430)). +* AWS V1 I/Os have been removed (Java). As part of this, x-lang Python Kinesis I/O has been updated to consume the V2 IO and it also no longer supports setting producer_properties ([#33430](https://github.com/apache/beam/issues/33430)). ## Deprecations