diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java index 7e2940a2c35b..2edc5c7ba45b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java @@ -78,6 +78,8 @@ public class FileSystems { private static final Pattern FILE_SCHEME_PATTERN = Pattern.compile("(?[a-zA-Z][-a-zA-Z0-9+.]*):/.*"); private static final Pattern GLOB_PATTERN = Pattern.compile("[*?{}]"); + private static final Pattern ESCAPED_GLOB_PATTERN = Pattern.compile("\\\\[*?{}]"); + private static final String GLOB_ESCAPE_PREFIX = "\\"; private static final AtomicReference> FILESYSTEM_REVISION = new AtomicReference<>(); @@ -92,6 +94,40 @@ public static boolean hasGlobWildcard(String spec) { return GLOB_PATTERN.matcher(spec).find(); } + /** + * Escapes glob wildcard characters in the given spec so they are treated as literals. + * + *
<p>
This method escapes the characters '*', '?', '{', and '}' by prefixing them with a + * backslash, allowing them to be treated as literal characters in a file path rather than as + * glob wildcards. + * + *
<p>
Example: {@code escapeGlobWildcards("file*.txt")} returns {@code "file\\*.txt"} + * + * @param spec the file path specification to escape + * @return the escaped specification + */ + public static String escapeGlobWildcards(String spec) { + checkNotNull(spec, "spec cannot be null"); + return spec.replaceAll("([*?{}])", "\\\\$1"); + } + + /** + * Unescapes glob wildcard characters in the given spec that were previously escaped with {@link + * #escapeGlobWildcards(String)}. + * + *
<p>
This method removes the backslash prefix from escaped glob characters ('*', '?', '{', '}'), + * restoring them to their unescaped form. + * + *
<p>
Example: {@code unescapeGlobWildcards("file\\*.txt")} returns {@code "file*.txt"} + * + * @param spec the file path specification to unescape + * @return the unescaped specification + */ + public static String unescapeGlobWildcards(String spec) { + checkNotNull(spec, "spec cannot be null"); + return spec.replaceAll("\\\\([*?{}])", "$1"); + } + /** * This is the entry point to convert user-provided specs to {@link ResourceId ResourceIds}. * Callers should use {@link #match} to resolve users specs ambiguities before calling other diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileSystemsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileSystemsTest.java index 34567309c7d0..a513c1b1f462 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileSystemsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileSystemsTest.java @@ -354,6 +354,56 @@ private void createFileWithContent(Path path, String content) throws Exception { } } + @Test + public void testEscapeGlobWildcards() { + // Test escaping asterisk + assertEquals("file\\*.txt", FileSystems.escapeGlobWildcards("file*.txt")); + + // Test escaping question mark + assertEquals("file\\?.txt", FileSystems.escapeGlobWildcards("file?.txt")); + + // Test escaping braces + assertEquals("file\\{1,2\\}.txt", FileSystems.escapeGlobWildcards("file{1,2}.txt")); + + // Test escaping multiple characters + assertEquals("\\*\\?\\{\\}.txt", FileSystems.escapeGlobWildcards("*?{}.txt")); + + // Test string with no glob characters + assertEquals("file.txt", FileSystems.escapeGlobWildcards("file.txt")); + + // Test empty string + assertEquals("", FileSystems.escapeGlobWildcards("")); + } + + @Test + public void testUnescapeGlobWildcards() { + // Test unescaping asterisk + assertEquals("file*.txt", FileSystems.unescapeGlobWildcards("file\\*.txt")); + + // Test unescaping question mark + assertEquals("file?.txt", FileSystems.unescapeGlobWildcards("file\\?.txt")); + + // Test unescaping braces + assertEquals("file{1,2}.txt", FileSystems.unescapeGlobWildcards("file\\{1,2\\}.txt")); + + // Test unescaping multiple characters + assertEquals("*?{}.txt", FileSystems.unescapeGlobWildcards("\\*\\?\\{\\}.txt")); + + // Test string with no escaped characters + assertEquals("file.txt", FileSystems.unescapeGlobWildcards("file.txt")); + + // Test empty string + assertEquals("", FileSystems.unescapeGlobWildcards("")); + } + + @Test + public void testEscapeUnescapeRoundTrip() { + String original = "file*test?.txt"; + String escaped = FileSystems.escapeGlobWildcards(original); + String unescaped = FileSystems.unescapeGlobWildcards(escaped); + assertEquals(original, unescaped); + } + private LocalResourceId toLocalResourceId(String str) throws Exception { boolean isDirectory; if (SystemUtils.IS_OS_WINDOWS) { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 7aef1bd1ce02..7589270c3646 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -3330,8 +3330,9 @@ public Write withLoadJobProjectId(ValueProvider loadJobProjectId) { /** * Choose the frequency at which file writes are triggered. * - *
<p>
This is only applicable when the write method is set to {@link Method#FILE_LOADS} or - * {@link Method#STORAGE_WRITE_API}, and only when writing an unbounded {@link PCollection}. + *
<p>
This is only applicable when the write method is set to {@link Method#FILE_LOADS}, {@link + * Method#STORAGE_WRITE_API}, or {@link Method#STORAGE_API_AT_LEAST_ONCE}, and only when writing + * an unbounded {@link PCollection}. * *
<p>
Every triggeringFrequency duration, a BigQuery load job will be generated for all the data * written since the last load job. BigQuery has limits on how many load jobs can be triggered @@ -3736,19 +3737,22 @@ public WriteResult expand(PCollection input) { BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class); Write.Method method = resolveMethod(input); if (input.isBounded() == IsBounded.UNBOUNDED) { - if (method == Write.Method.FILE_LOADS || method == Write.Method.STORAGE_WRITE_API) { + if (method == Write.Method.FILE_LOADS + || method == Write.Method.STORAGE_WRITE_API + || method == Write.Method.STORAGE_API_AT_LEAST_ONCE) { Duration triggeringFrequency = - (method == Write.Method.STORAGE_WRITE_API) + (method == Write.Method.STORAGE_WRITE_API + || method == Write.Method.STORAGE_API_AT_LEAST_ONCE) ? getStorageApiTriggeringFrequency(bqOptions) : getTriggeringFrequency(); checkArgument( triggeringFrequency != null, - "When writing an unbounded PCollection via FILE_LOADS or STORAGE_WRITE_API, " + "When writing an unbounded PCollection via FILE_LOADS, STORAGE_WRITE_API, or STORAGE_API_AT_LEAST_ONCE, " + "triggering frequency must be specified"); } else { checkArgument( getTriggeringFrequency() == null, - "Triggering frequency can be specified only when writing via FILE_LOADS or STORAGE_WRITE_API, but the method was %s.", + "Triggering frequency can be specified only when writing via FILE_LOADS, STORAGE_WRITE_API, or STORAGE_API_AT_LEAST_ONCE, but the method was %s.", method); } if (method != Method.FILE_LOADS) { @@ -3757,13 +3761,7 @@ public WriteResult expand(PCollection input) { "Number of file shards can be specified only when writing via FILE_LOADS, but the method was %s.", method); } - if (method == Method.STORAGE_API_AT_LEAST_ONCE - && getStorageApiTriggeringFrequency(bqOptions) != null) { - LOG.warn( - "Storage API triggering frequency option will be ignored is it can only be specified only " - + "when writing via STORAGE_WRITE_API, but the method was {}.", - method); - } + if (getAutoSharding()) { if (method == Method.STORAGE_WRITE_API && getStorageApiNumStreams(bqOptions) > 0) { LOG.warn( diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java index f6d10b47ccf2..14f93a95fa6d 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java @@ -219,7 +219,7 @@ public PCollectionTuple expand(PCollectionwrite() + .withAvroFormatFunction(r -> new GenericData.Record(r.getSchema())) + .to("dataset.table") + .withMethod(Method.STORAGE_API_AT_LEAST_ONCE) + .withCreateDisposition(CreateDisposition.CREATE_NEVER)); + } + + @Test + public void testStreamingWriteValidateSucceedsWithTriggeringFrequencyForStorageApiAtLeastOnce() { + assumeTrue(useStreaming); + assumeTrue(useStorageApiApproximate); // Test STORAGE_API_AT_LEAST_ONCE specifically + p.enableAbandonedNodeEnforcement(false); + + // This should not throw - STORAGE_API_AT_LEAST_ONCE with triggering frequency should be valid + p.getOptions().as(BigQueryOptions.class).setStorageWriteApiTriggeringFrequencySec(30); + p.apply(Create.empty(INPUT_RECORD_CODER)) + .setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED) + 
.apply( + BigQueryIO.write() + .withAvroFormatFunction(r -> new GenericData.Record(r.getSchema())) + .to("dataset.table") + .withMethod(Method.STORAGE_API_AT_LEAST_ONCE) + .withCreateDisposition(CreateDisposition.CREATE_NEVER) + .withTestServices(fakeBqServices) + .withoutValidation()); + // Should validate without throwing + p.run(); + } + + @Test + public void testBoundedWriteValidateSucceedsWithoutTriggeringFrequencyForStorageApiAtLeastOnce() { + assumeTrue(!useStreaming); // Test bounded PCollection + assumeTrue(useStorageApiApproximate); // Test STORAGE_API_AT_LEAST_ONCE specifically + + // Bounded collections should not require triggering frequency even for + // STORAGE_API_AT_LEAST_ONCE + p.getOptions().as(BigQueryOptions.class).setStorageWriteApiTriggeringFrequencySec(null); + p.apply(Create.empty(INPUT_RECORD_CODER)) + .setIsBoundedInternal(PCollection.IsBounded.BOUNDED) + .apply( + BigQueryIO.write() + .withAvroFormatFunction(r -> new GenericData.Record(r.getSchema())) + .to("dataset.table") + .withMethod(Method.STORAGE_API_AT_LEAST_ONCE) + .withCreateDisposition(CreateDisposition.CREATE_NEVER) + .withTestServices(fakeBqServices) + .withoutValidation()); + // Should validate without throwing + p.run(); + } + @Test public void testBigQueryIOGetName() { assertEquals( @@ -4924,4 +4991,49 @@ public void testCustomGcsTempLocationNull() throws Exception { fakeDatasetService.getAllRows("project-id", "dataset-id", "table-id"), containsInAnyOrder(new TableRow().set("name", "a"), new TableRow().set("name", "b"))); } + + @Test + public void testCdcWithStorageWriteApiDoesNotThrowIllegalStateException() throws Exception { + // Test for issue #31422: CDC with STORAGE_WRITE_API should not throw IllegalStateException + assumeTrue(useStorageApi); + assumeTrue(!useStorageApiApproximate); // Test STORAGE_WRITE_API specifically + + TableSchema schema = + new TableSchema() + .setFields( + ImmutableList.of( + new TableFieldSchema().setName("id").setType("INTEGER"), + new TableFieldSchema().setName("name").setType("STRING"))); + + // Create a write transform with CDC enabled using RowMutationInformation + BigQueryIO.Write write = + BigQueryIO.write() + .to("project-id:dataset-id.table-id") + .withSchema(schema) + .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API) + .withRowMutationInformationFn( + (Row row) -> { + return RowMutationInformation.of( + RowMutationInformation.MutationType.UPSERT, row.getValue("id").toString()); + }) + .withTestServices(fakeBqServices) + .withoutValidation(); + + // Create test data with CDC-style updates + Schema beamSchema = Schema.builder().addInt32Field("id").addStringField("name").build(); + + List testData = + ImmutableList.of( + Row.withSchema(beamSchema).addValues(1, "Alice").build(), + Row.withSchema(beamSchema).addValues(2, "Bob").build(), + Row.withSchema(beamSchema).addValues(1, "Alice Updated").build() // Update row with id=1 + ); + + // This should not throw an IllegalStateException + PCollection input = p.apply(Create.of(testData).withRowSchema(beamSchema)); + + WriteResult result = input.apply("WriteCdcToBQ", write); + + p.run(); // Should complete successfully without IllegalStateException + } } diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py index f6bf5e5d44ec..c3d8fc25bcac 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets.py +++ b/sdks/python/apache_beam/examples/snippets/snippets.py @@ -1451,6 +1451,73 @@ def cross_join(left, rights): return pipeline, result +def 
side_input_slow_update_global_window(): + # [START SideInputPatternSlowUpdateGlobalWindowSnip1] + import apache_beam as beam + from apache_beam.transforms.periodicsequence import PeriodicSequence + from apache_beam import pvalue + from apache_beam.transforms import window + from apache_beam.transforms.trigger import Repeatedly, AfterProcessingTime + from apache_beam.utils.timestamp import MAX_TIMESTAMP + import time + import logging + + def placeholder_external_service_read_test_data(element): + """Placeholder function that represents an external service generating test data.""" + # Replace with actual external service call + return { + 'Key_A': time.strftime('%H:%M:%S', time.localtime(time.time())) + } + + # Create pipeline + pipeline = beam.Pipeline() + + # Create a side input that updates every 5 seconds. + # View as an iterable, not singleton, so that if we happen to trigger more + # than once before Latest.Globally is computed we can handle both elements. + side_input = ( + pipeline + | 'SideInputSequence' >> PeriodicSequence( + start=time.time(), + stop=MAX_TIMESTAMP, # Run indefinitely + interval=5) # Update every 5 seconds + | 'FetchExternalData' >> beam.Map(placeholder_external_service_read_test_data) + | 'GlobalWindow' >> beam.WindowInto( + window.GlobalWindows(), + trigger=Repeatedly(AfterProcessingTime(delay=0)), + accumulation_mode=beam.transforms.trigger.AccumulationMode.DISCARDING) + | 'LatestValue' >> beam.combiners.Latest.Globally().without_defaults() + | 'ViewAsIterable' >> beam.pvalue.AsIter()) + + # Consume side input. Use PeriodicSequence for test data. + # Use a real source (like PubSubIO or KafkaIO) in production. + main_result = ( + pipeline + | 'MainSequence' >> PeriodicSequence( + start=time.time() - 1, + stop=MAX_TIMESTAMP, + interval=1) # Generate every 1 second + | 'FixedWindow' >> beam.WindowInto(window.FixedWindows(1)) + | 'SumGlobally' >> beam.CombineGlobally(sum).without_defaults() + | 'ProcessWithSideInput' >> beam.FlatMap( + lambda element, side_data: [ + { + 'main_value': element, + 'side_input_key_a': list(side_data)[0].get('Key_A', 'N/A') + if side_data else 'No side input', + 'timestamp': time.strftime('%H:%M:%S') + } + ], + side_data=side_input) + | 'LogResults' >> beam.Map( + lambda result: logging.info( + f"Value is {result['main_value']} with timestamp {result['timestamp']}, " + f"using key A from side input with time {result['side_input_key_a']}.") or result)) + # [END SideInputPatternSlowUpdateGlobalWindowSnip1] + + return pipeline, main_result + + def bigqueryio_deadletter(): # [START BigQueryIODeadLetter] diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py index d7dd5e6af191..4f25bbd7450f 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets_test.py +++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py @@ -1469,6 +1469,20 @@ def test_side_input_slow_update(self): os.unlink(src_file_pattern + str(first_ts + interval * i)) +class SideInputGlobalWindowTest(unittest.TestCase): + """Tests for side input pattern with global windows.""" + def test_side_input_slow_update_global_window(self): + """Test the global window side input pattern with mock data.""" + # This test validates that the pattern can be constructed without external dependencies + try: + pipeline, result = snippets.side_input_slow_update_global_window() + self.assertIsNotNone(pipeline) + self.assertIsNotNone(result) + except ImportError as e: + # Skip test if PeriodicSequence is not 
available in this environment + self.skipTest(f"PeriodicSequence not available: {e}") + + class ValueProviderInfoTest(unittest.TestCase): """Tests for accessing value provider info after run.""" def test_accessing_valueprovider_info_after_run(self): diff --git a/sdks/python/apache_beam/yaml/examples/README.md b/sdks/python/apache_beam/yaml/examples/README.md index 55fd19bd8c40..2218dd686159 100644 --- a/sdks/python/apache_beam/yaml/examples/README.md +++ b/sdks/python/apache_beam/yaml/examples/README.md @@ -100,6 +100,18 @@ These examples leverage the built-in mapping transforms including `MapToFields`, `Filter` and `Explode`. More information can be found about mapping transforms [here](https://beam.apache.org/documentation/sdks/yaml-udf/). +### SQL + +Examples that demonstrate SQL transforms with various database dialect configurations: + +- [Basic SQL Transform](transforms/sql/sql_basic_example.yaml) - Simple SQL queries without special configuration +- [PostgreSQL Functions](transforms/sql/sql_postgresql_functions.yaml) - Using PostgreSQL-specific functions like SPLIT_PART +- [BigQuery Functions](transforms/sql/sql_bigquery_functions.yaml) - BigQuery syntax and functions with proper calcite_connection_properties +- [MySQL Functions](transforms/sql/sql_mysql_functions.yaml) - MySQL-specific date and string functions +- [Advanced Configuration](transforms/sql/sql_advanced_configuration.yaml) - Multiple calcite_connection_properties options + +These examples show how to use the `calcite_connection_properties` pipeline option to configure SQL transforms for different database dialects and enable dialect-specific functions and syntax. + ### IO #### Spanner diff --git a/sdks/python/apache_beam/yaml/examples/transforms/sql/README.md b/sdks/python/apache_beam/yaml/examples/transforms/sql/README.md new file mode 100644 index 000000000000..265f3e46ab62 --- /dev/null +++ b/sdks/python/apache_beam/yaml/examples/transforms/sql/README.md @@ -0,0 +1,104 @@ +# SQL Transform calcite_connection_properties Configuration Guide + +This directory contains examples demonstrating how to use `calcite_connection_properties` in Beam YAML pipelines to configure SQL transforms for different database dialects and use cases. + +## Overview + +The `calcite_connection_properties` option in pipeline options allows you to configure Apache Calcite's SQL parser and function library to support database-specific SQL syntax and functions. This is particularly useful when you need to use SQL functions or syntax that are specific to certain databases like PostgreSQL, BigQuery, MySQL, or Oracle. + +## Configuration Options + +The most commonly used `calcite_connection_properties` include: + +### Function Libraries (`fun`) +- `"standard"` - Standard SQL functions (default) +- `"postgresql"` - PostgreSQL-specific functions (e.g., SPLIT_PART, STRING_AGG) +- `"bigquery"` - BigQuery-specific functions (e.g., FORMAT_TIMESTAMP, ARRAY_TO_STRING) +- `"mysql"` - MySQL-specific functions (e.g., DATEDIFF, SUBSTRING_INDEX) +- `"oracle"` - Oracle-specific functions (e.g., NVL, SUBSTR) + +### Lexical Rules (`lex`) +- `"standard"` - Standard SQL lexical rules (default) +- `"big_query"` - BigQuery lexical rules and syntax +- `"mysql"` - MySQL lexical rules +- `"oracle"` - Oracle lexical rules + +### Other Properties +- `conformance` - SQL conformance level ("LENIENT", "STRICT", etc.) 
+- `caseSensitive` - Whether identifiers are case sensitive ("true"/"false") +- `quotedCasing` - How to handle quoted identifiers ("UNCHANGED", "TO_UPPER", "TO_LOWER") +- `unquotedCasing` - How to handle unquoted identifiers + +## Usage Patterns + +### Basic Configuration +```yaml +options: + calcite_connection_properties: + fun: "postgresql" +``` + +### Advanced Configuration +```yaml +options: + calcite_connection_properties: + fun: "bigquery" + lex: "big_query" + conformance: "LENIENT" + caseSensitive: "false" +``` + +## Examples in this Directory + +1. **sql_basic_example.yaml** - Basic SQL transform without special configuration +2. **sql_postgresql_functions.yaml** - Using PostgreSQL functions like SPLIT_PART +3. **sql_bigquery_functions.yaml** - BigQuery syntax and functions +4. **sql_mysql_functions.yaml** - MySQL-specific date and string functions +5. **sql_advanced_configuration.yaml** - Multiple configuration options + +## Common Use Cases + +### PostgreSQL Functions +Useful for string manipulation and array operations: +```yaml +options: + calcite_connection_properties: + fun: "postgresql" +``` + +### BigQuery Compatibility +For BigQuery-style syntax and functions: +```yaml +options: + calcite_connection_properties: + fun: "bigquery" + lex: "big_query" +``` + +### Lenient SQL Parsing +For more flexible SQL parsing: +```yaml +options: + calcite_connection_properties: + conformance: "LENIENT" +``` + +## Important Notes + +- These properties affect only the SQL parsing and function availability, not the actual data processing semantics +- Some database-specific functions may not be available depending on the Calcite version used +- Always test your SQL queries with the intended configuration before deploying to production +- The `calcite_connection_properties` must be specified in the pipeline `options` section, not in individual transform configurations + +## Troubleshooting + +If you encounter SQL parsing errors: + +1. Check that the function you're using is supported by the specified function library +2. Verify that the lexical rules (`lex`) match your SQL syntax style +3. Try using `conformance: "LENIENT"` for more flexible parsing +4. Refer to the Apache Calcite documentation for supported functions in each dialect + +For more information about Beam SQL and supported functions, see: +- [Beam SQL Documentation](https://beam.apache.org/documentation/dsls/sql/overview/) +- [Apache Calcite SQL Reference](https://calcite.apache.org/docs/reference.html) \ No newline at end of file diff --git a/sdks/python/apache_beam/yaml/examples/transforms/sql/sql_advanced_configuration.yaml b/sdks/python/apache_beam/yaml/examples/transforms/sql/sql_advanced_configuration.yaml new file mode 100644 index 000000000000..85e6348696a0 --- /dev/null +++ b/sdks/python/apache_beam/yaml/examples/transforms/sql/sql_advanced_configuration.yaml @@ -0,0 +1,87 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Advanced SQL Transform Configuration Examples +# This example demonstrates multiple calcite_connection_properties and their effects. + +pipeline: + transforms: + - type: Create + name: CreateComplexData + config: + elements: + - {id: 1, name: "Product A", price: 29.99, tags: ["electronics", "gadget"], metadata: '{"brand": "TechCorp", "warranty": 12}'} + - {id: 2, name: "Product B", price: 15.50, tags: ["books", "fiction"], metadata: '{"author": "John Doe", "pages": 320}'} + - {id: 3, name: "Product C", price: 199.99, tags: ["electronics", "computer"], metadata: '{"brand": "CompuTech", "warranty": 24}'} + + # Example 1: Standard SQL with strict conformance + - type: Sql + name: StandardSQL + input: CreateComplexData + config: + query: | + SELECT + id, + name, + price, + CASE + WHEN price < 20 THEN 'Budget' + WHEN price < 100 THEN 'Mid-range' + ELSE 'Premium' + END as price_category + FROM PCOLLECTION + WHERE price > 10 + ORDER BY price + + # Example 2: Using Oracle-style functions + - type: Sql + name: OracleStyleSQL + input: CreateComplexData + config: + query: | + SELECT + id, + name, + price, + -- Oracle-style string functions + SUBSTR(name, 1, 10) as short_name, + LENGTH(name) as name_length, + NVL(name, 'Unknown') as safe_name + FROM PCOLLECTION + + - type: LogForTesting + input: StandardSQL + + - type: LogForTesting + input: OracleStyleSQL + +# Multiple calcite_connection_properties can be configured: +# - conformance: Controls SQL conformance level (LENIENT, STRICT, etc.) +# - caseSensitive: Whether identifiers are case sensitive +# - quotedCasing: How to handle quoted identifiers (UNCHANGED, TO_UPPER, TO_LOWER) +# - unquotedCasing: How to handle unquoted identifiers +# - fun: SQL function library (standard, oracle, mysql, postgresql, bigquery, etc.) +# - lex: Lexical analysis rules (standard, oracle, mysql, big_query, etc.) +options: + calcite_connection_properties: + conformance: "LENIENT" + fun: "oracle" + lex: "oracle" + caseSensitive: "false" + quotedCasing: "UNCHANGED" + unquotedCasing: "TO_UPPER" + streaming: false \ No newline at end of file diff --git a/sdks/python/apache_beam/yaml/examples/transforms/sql/sql_basic_example.yaml b/sdks/python/apache_beam/yaml/examples/transforms/sql/sql_basic_example.yaml new file mode 100644 index 000000000000..3fb5f5acafa3 --- /dev/null +++ b/sdks/python/apache_beam/yaml/examples/transforms/sql/sql_basic_example.yaml @@ -0,0 +1,50 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +# Basic SQL Transform Example +# This example demonstrates basic SQL transform usage with default Calcite configuration. + +pipeline: + transforms: + - type: Create + name: CreateData + config: + elements: + - {id: 1, name: "Alice", age: 30, city: "Seattle"} + - {id: 2, name: "Bob", age: 25, city: "Portland"} + - {id: 3, name: "Charlie", age: 35, city: "San Francisco"} + - {id: 4, name: "Diana", age: 28, city: "Seattle"} + + - type: Sql + name: FilterAndGroup + input: CreateData + config: + query: | + SELECT + city, + COUNT(*) as person_count, + AVG(age) as avg_age + FROM PCOLLECTION + WHERE age >= 25 + GROUP BY city + ORDER BY city + + - type: LogForTesting + input: FilterAndGroup + +options: + streaming: false \ No newline at end of file diff --git a/sdks/python/apache_beam/yaml/examples/transforms/sql/sql_bigquery_functions.yaml b/sdks/python/apache_beam/yaml/examples/transforms/sql/sql_bigquery_functions.yaml new file mode 100644 index 000000000000..bab20413af77 --- /dev/null +++ b/sdks/python/apache_beam/yaml/examples/transforms/sql/sql_bigquery_functions.yaml @@ -0,0 +1,65 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# SQL Transform with BigQuery Functions and Syntax +# This example demonstrates using BigQuery-specific SQL syntax and functions. +# The calcite_connection_properties enable BigQuery function library and lexical rules. 
+ +pipeline: + transforms: + - type: Create + name: CreateSalesData + config: + elements: + - {transaction_id: "txn_001", customer_id: 101, amount: 250.75, timestamp: "2024-01-15T10:30:00Z", product_categories: ["electronics", "accessories"]} + - {transaction_id: "txn_002", customer_id: 102, amount: 89.99, timestamp: "2024-01-15T11:45:00Z", product_categories: ["books", "education"]} + - {transaction_id: "txn_003", customer_id: 103, amount: 1250.00, timestamp: "2024-01-15T14:20:00Z", product_categories: ["electronics", "computers"]} + - {transaction_id: "txn_004", customer_id: 101, amount: 45.50, timestamp: "2024-01-16T09:15:00Z", product_categories: ["food", "groceries"]} + + - type: Sql + name: AnalyzeSalesData + input: CreateSalesData + config: + query: | + SELECT + customer_id, + COUNT(*) as transaction_count, + SUM(amount) as total_spent, + AVG(amount) as avg_transaction_amount, + -- BigQuery-style date/time functions + FORMAT_TIMESTAMP('%Y-%m-%d', PARSE_TIMESTAMP('%Y-%m-%dT%H:%M:%SZ', timestamp)) as transaction_date, + -- BigQuery array functions (when available) + ARRAY_TO_STRING(product_categories, ', ') as categories_str, + -- Conditional aggregation using BigQuery syntax + COUNTIF(amount > 100) as high_value_transactions, + -- BigQuery mathematical functions + ROUND(amount, 2) as rounded_amount + FROM PCOLLECTION + GROUP BY customer_id, transaction_date, categories_str, rounded_amount + ORDER BY customer_id, total_spent DESC + + - type: LogForTesting + input: AnalyzeSalesData + +# Configure Calcite to use BigQuery function library and syntax +# 'fun': 'bigquery' enables BigQuery-specific functions +# 'lex': 'big_query' enables BigQuery lexical rules and syntax +options: + calcite_connection_properties: + fun: "bigquery" + lex: "big_query" + streaming: false \ No newline at end of file diff --git a/sdks/python/apache_beam/yaml/examples/transforms/sql/sql_mysql_functions.yaml b/sdks/python/apache_beam/yaml/examples/transforms/sql/sql_mysql_functions.yaml new file mode 100644 index 000000000000..523ea41b743c --- /dev/null +++ b/sdks/python/apache_beam/yaml/examples/transforms/sql/sql_mysql_functions.yaml @@ -0,0 +1,66 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# SQL Transform with MySQL Functions and Syntax +# This example shows how to configure calcite_connection_properties for MySQL-specific SQL features. 
+ +pipeline: + transforms: + - type: Create + name: CreateUserData + config: + elements: + - {user_id: 1, username: "alice_123", registration_date: "2024-01-15", last_login: "2024-01-20 14:30:15", profile_data: '{"age": 30, "city": "Seattle"}'} + - {user_id: 2, username: "bob.jones", registration_date: "2024-01-10", last_login: "2024-01-19 09:45:22", profile_data: '{"age": 25, "city": "Portland"}'} + - {user_id: 3, username: "charlie_brown", registration_date: "2024-01-05", last_login: "2024-01-21 16:20:33", profile_data: '{"age": 35, "city": "San Francisco"}'} + + - type: Sql + name: ProcessUserData + input: CreateUserData + config: + query: | + SELECT + user_id, + username, + registration_date, + last_login, + -- MySQL date/time functions + DATEDIFF(last_login, registration_date) as days_since_registration, + DATE_FORMAT(last_login, '%Y-%m-%d') as login_date, + DATE_FORMAT(last_login, '%H:%i:%s') as login_time, + -- MySQL string functions + SUBSTRING_INDEX(username, '_', 1) as username_prefix, + CASE + WHEN LOCATE('_', username) > 0 THEN 'has_underscore' + WHEN LOCATE('.', username) > 0 THEN 'has_dot' + ELSE 'alphanumeric_only' + END as username_format, + -- MySQL conditional functions + IF(DATEDIFF(CURRENT_DATE, registration_date) < 7, 'new_user', 'existing_user') as user_type + FROM PCOLLECTION + ORDER BY days_since_registration DESC + + - type: LogForTesting + input: ProcessUserData + +# Configure Calcite for MySQL SQL dialect +# This enables MySQL-specific functions like DATEDIFF, SUBSTRING_INDEX, LOCATE, etc. +options: + calcite_connection_properties: + fun: "mysql" + lex: "mysql" + streaming: false \ No newline at end of file diff --git a/sdks/python/apache_beam/yaml/examples/transforms/sql/sql_postgresql_functions.yaml b/sdks/python/apache_beam/yaml/examples/transforms/sql/sql_postgresql_functions.yaml new file mode 100644 index 000000000000..7c9dadbeb91a --- /dev/null +++ b/sdks/python/apache_beam/yaml/examples/transforms/sql/sql_postgresql_functions.yaml @@ -0,0 +1,60 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# SQL Transform with PostgreSQL Functions +# This example shows how to use calcite_connection_properties to enable PostgreSQL-specific SQL functions. +# The 'fun' property tells Calcite to use PostgreSQL function library, enabling functions like SPLIT_PART. 
+ +pipeline: + transforms: + - type: Create + name: CreateEmailData + config: + elements: + - {id: 1, email: "alice@example.com", full_name: "Alice Smith"} + - {id: 2, email: "bob.jones@company.org", full_name: "Bob Jones"} + - {id: 3, email: "charlie_brown@test.net", full_name: "Charlie Brown"} + - {id: 4, email: "diana.wilson@work.co", full_name: "Diana Wilson"} + + - type: Sql + name: ExtractEmailParts + input: CreateEmailData + config: + query: | + SELECT + id, + full_name, + email, + SPLIT_PART(email, '@', 1) as username, + SPLIT_PART(email, '@', 2) as domain, + CASE + WHEN SPLIT_PART(email, '@', 2) LIKE '%.com' THEN 'Commercial' + WHEN SPLIT_PART(email, '@', 2) LIKE '%.org' THEN 'Organization' + ELSE 'Other' + END as domain_type + FROM PCOLLECTION + ORDER BY domain + + - type: LogForTesting + input: ExtractEmailParts + +# Pipeline options demonstrate how to configure calcite_connection_properties +# The 'fun' property enables PostgreSQL function library in Calcite SQL parser +options: + calcite_connection_properties: + fun: "postgresql" + streaming: false \ No newline at end of file diff --git a/sdks/python/apache_beam/yaml/tests/sql.yaml b/sdks/python/apache_beam/yaml/tests/sql.yaml index 0040a2790c54..a598fffc4181 100644 --- a/sdks/python/apache_beam/yaml/tests/sql.yaml +++ b/sdks/python/apache_beam/yaml/tests/sql.yaml @@ -93,3 +93,99 @@ pipelines: - type: PyTransform config: constructor: apache_beam.transforms.util.LogElements + + # Test calcite_connection_properties with PostgreSQL functions + - pipeline: + type: chain + transforms: + - type: Create + name: CreateEmailData + config: + elements: + - {email: "alice@example.com", id: 1} + - {email: "bob@test.org", id: 2} + + - type: Sql + name: PostgreSQLFunctions + config: + query: | + SELECT + id, + email, + SPLIT_PART(email, '@', 1) as username, + SPLIT_PART(email, '@', 2) as domain + FROM PCOLLECTION + + - type: AssertEqual + config: + elements: + - {id: 1, email: "alice@example.com", username: "alice", domain: "example.com"} + - {id: 2, email: "bob@test.org", username: "bob", domain: "test.org"} + + options: + calcite_connection_properties: + fun: "postgresql" + + # Test calcite_connection_properties with BigQuery syntax + - pipeline: + type: chain + transforms: + - type: Create + name: CreateArrayData + config: + elements: + - {id: 1, tags: ["tag1", "tag2", "tag3"]} + - {id: 2, tags: ["tagA", "tagB"]} + + - type: Sql + name: BigQueryArrayFunctions + config: + query: | + SELECT + id, + ARRAY_TO_STRING(tags, '|') as tags_string, + ARRAY_LENGTH(tags) as tag_count + FROM PCOLLECTION + + - type: AssertEqual + config: + elements: + - {id: 1, tags_string: "tag1|tag2|tag3", tag_count: 3} + - {id: 2, tags_string: "tagA|tagB", tag_count: 2} + + options: + calcite_connection_properties: + fun: "bigquery" + lex: "big_query" + + # Test calcite_connection_properties with case sensitivity settings + - pipeline: + type: chain + transforms: + - type: Create + name: CreateCaseTestData + config: + elements: + - {ID: 1, Name: "Alice", Email: "alice@test.com"} + - {ID: 2, Name: "Bob", Email: "bob@test.com"} + + - type: Sql + name: CaseSensitiveSQL + config: + query: | + SELECT + id as lower_id, + name as lower_name, + email as lower_email + FROM PCOLLECTION + + - type: AssertEqual + config: + elements: + - {lower_id: 1, lower_name: "Alice", lower_email: "alice@test.com"} + - {lower_id: 2, lower_name: "Bob", lower_email: "bob@test.com"} + + options: + calcite_connection_properties: + caseSensitive: "false" + unquotedCasing: "TO_LOWER" diff --git 
a/website/www/site/content/en/documentation/patterns/side-inputs.md b/website/www/site/content/en/documentation/patterns/side-inputs.md index 136eeef29ada..675503a32baf 100644 --- a/website/www/site/content/en/documentation/patterns/side-inputs.md +++ b/website/www/site/content/en/documentation/patterns/side-inputs.md @@ -48,7 +48,7 @@ For instance, the following code sample uses a `Map` to create a `DoFn`. The `Ma {{< /highlight >}} {{< highlight py >}} -No sample present. +{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets.py" SideInputPatternSlowUpdateGlobalWindowSnip1 >}} {{< /highlight >}}
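Usage sketch (not part of the patch above): a minimal illustration of the new FileSystems.escapeGlobWildcards / unescapeGlobWildcards helpers added in FileSystems.java. The file name and class name are hypothetical; the sketch only shows the escape/unescape round trip, not any particular FileSystem's glob semantics.

import org.apache.beam.sdk.io.FileSystems;

public class EscapeGlobExample {
  public static void main(String[] args) {
    // Hypothetical file name that literally contains glob characters.
    String literal = "reports/2024-{Q1}.csv";

    // Escape so downstream glob-aware code treats '{' and '}' as literal characters.
    String escaped = FileSystems.escapeGlobWildcards(literal);
    System.out.println(escaped); // reports/2024-\{Q1\}.csv

    // Unescaping restores the original spec.
    String roundTripped = FileSystems.unescapeGlobWildcards(escaped);
    System.out.println(literal.equals(roundTripped)); // true
  }
}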
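Usage sketch (not part of the patch above): how the relaxed validation for STORAGE_API_AT_LEAST_ONCE on an unbounded input might be exercised from user code, with the triggering frequency supplied through BigQueryOptions the same way the new tests do. The table name, the unbounded source, and the class name are placeholders.

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;

public class AtLeastOnceWriteSketch {
  public static void main(String[] args) {
    BigQueryOptions options = PipelineOptionsFactory.fromArgs(args).as(BigQueryOptions.class);
    // With this change, an unbounded write via STORAGE_API_AT_LEAST_ONCE must have a
    // triggering frequency; here it is supplied through BigQueryOptions.
    options.setStorageWriteApiTriggeringFrequencySec(30);

    Pipeline pipeline = Pipeline.create(options);
    PCollection<TableRow> rows = readUnboundedRows(pipeline); // placeholder unbounded source

    rows.apply(
        "WriteAtLeastOnce",
        BigQueryIO.writeTableRows()
            .to("my-project:my_dataset.my_table") // placeholder table
            .withMethod(BigQueryIO.Write.Method.STORAGE_API_AT_LEAST_ONCE)
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER));

    pipeline.run();
  }

  private static PCollection<TableRow> readUnboundedRows(Pipeline p) {
    // Placeholder: replace with a real unbounded source, e.g. PubsubIO, in an actual pipeline.
    throw new UnsupportedOperationException("replace with a real unbounded source");
  }
}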