From b52c85f676f1d1566ccb36b27787ba7f43ca23c3 Mon Sep 17 00:00:00 2001 From: Talat Uyarer Date: Tue, 30 Sep 2025 00:46:18 -0700 Subject: [PATCH 1/4] create catalog and iceberg table details --- .../dsls/sql/extensions/create-catalog.md | 258 ++++++++++++++++++ .../sql/extensions/create-external-table.md | 171 ++++++++++++ 2 files changed, 429 insertions(+) create mode 100644 website/www/site/content/en/documentation/dsls/sql/extensions/create-catalog.md diff --git a/website/www/site/content/en/documentation/dsls/sql/extensions/create-catalog.md b/website/www/site/content/en/documentation/dsls/sql/extensions/create-catalog.md new file mode 100644 index 000000000000..0b71330efd8c --- /dev/null +++ b/website/www/site/content/en/documentation/dsls/sql/extensions/create-catalog.md @@ -0,0 +1,258 @@ +--- +type: languages +title: "Beam SQL extension: CREATE CATALOG Statement" +aliases: + - /documentation/dsls/sql/create-catalog/ + - /documentation/dsls/sql/statements/create-catalog/ +--- + + +# Beam SQL extensions: CREATE CATALOG + +Beam SQL's `CREATE CATALOG` statement creates and registers a catalog that manages metadata for external data sources. Catalogs provide a unified interface for accessing different types of data stores and enable features like schema management, table discovery, and cross-catalog queries. + +Currently, Beam SQL supports the **Apache Iceberg** catalog type, which provides access to Iceberg tables with full ACID transaction support, schema evolution, and time travel capabilities. + +## Syntax + +``` +CREATE CATALOG [ IF NOT EXISTS ] catalogName +TYPE catalogType +[PROPERTIES (propertyKey = propertyValue [, propertyKey = propertyValue ]*)] +``` + +* `IF NOT EXISTS`: Optional. If the catalog is already registered, Beam SQL + ignores the statement instead of returning an error. +* `catalogName`: The case sensitive name of the catalog to create and register, + specified as an [Identifier](/documentation/dsls/sql/calcite/lexical#identifiers). +* `catalogType`: The type of catalog to create. Currently supported values: + * `iceberg`: Apache Iceberg catalog +* `PROPERTIES`: Optional. Key-value pairs for catalog-specific configuration. + Each property is specified as `'key' = 'value'` with string literals. + +## Apache Iceberg Catalog + +The Iceberg catalog provides access to [Apache Iceberg](https://iceberg.apache.org/) tables, which are high-performance table formats for huge analytic datasets. + +### Syntax + +``` +CREATE CATALOG [ IF NOT EXISTS ] catalogName +TYPE iceberg +PROPERTIES ( + 'catalog-impl' = 'catalogImplementation', + 'warehouse' = 'warehouseLocation' + [, additionalProperties...] +) +``` + +### Required Properties + +* `catalog-impl`: The Iceberg catalog implementation class. Common values: + * `org.apache.iceberg.hadoop.HadoopCatalog`: For Hadoop-compatible storage (HDFS, S3, GCS, etc.) + * `org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog`: For BigQuery integration + * `org.apache.iceberg.jdbc.JdbcCatalog`: For JDBC-based metadata storage + * `org.apache.iceberg.rest.RESTCatalog`: For REST-based catalog access +* `warehouse`: The root location where Iceberg tables and metadata are stored. 
+ Format depends on the storage system: + * **Local filesystem**: `file:///path/to/warehouse` + * **HDFS**: `hdfs://namenode:port/path/to/warehouse` + * **S3**: `s3://bucket-name/path/to/warehouse` + * **Google Cloud Storage**: `gs://bucket-name/path/to/warehouse` + +### Optional Properties + +The available optional properties depend on the catalog implementation: + +#### Hadoop Catalog Properties + +* `io-impl`: The file I/O implementation class. Common values: + * `org.apache.iceberg.hadoop.HadoopFileIO`: For Hadoop-compatible storage + * `org.apache.iceberg.aws.s3.S3FileIO`: For S3 storage + * `org.apache.iceberg.gcp.gcs.GCSFileIO`: For Google Cloud Storage +* `hadoop.*`: Any Hadoop configuration property (e.g., `hadoop.fs.s3a.access.key`) + +#### BigQuery Metastore Catalog Properties + +* `io-impl`: Must be `org.apache.iceberg.gcp.gcs.GCSFileIO` for GCS storage +* `gcp_project`: Google Cloud Project ID +* `gcp_region`: Google Cloud region (e.g., `us-central1`) +* `gcp_location`: Alternative to `gcp_region` for specifying location + +#### JDBC Catalog Properties + +* `uri`: JDBC connection URI +* `jdbc.user`: Database username +* `jdbc.password`: Database password +* `jdbc.driver`: JDBC driver class name + +### Examples + +#### Hadoop Catalog with Local Storage + +```sql +CREATE CATALOG my_iceberg_catalog +TYPE iceberg +PROPERTIES ( + 'catalog-impl' = 'org.apache.iceberg.hadoop.HadoopCatalog', + 'warehouse' = 'file:///tmp/iceberg-warehouse' +) +``` + +#### Hadoop Catalog with S3 Storage + +```sql +CREATE CATALOG s3_iceberg_catalog +TYPE iceberg +PROPERTIES ( + 'catalog-impl' = 'org.apache.iceberg.hadoop.HadoopCatalog', + 'warehouse' = 's3://my-bucket/iceberg-warehouse', + 'io-impl' = 'org.apache.iceberg.aws.s3.S3FileIO', + 'hadoop.fs.s3a.access.key' = 'your-access-key', + 'hadoop.fs.s3a.secret.key' = 'your-secret-key' +) +``` + +#### BigQuery Metastore Catalog + +```sql +CREATE CATALOG bigquery_iceberg_catalog +TYPE iceberg +PROPERTIES ( + 'catalog-impl' = 'org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog', + 'io-impl' = 'org.apache.iceberg.gcp.gcs.GCSFileIO', + 'warehouse' = 'gs://my-bucket/iceberg-warehouse', + 'gcp_project' = 'my-gcp-project', + 'gcp_region' = 'us-central1' +) +``` + +#### JDBC Catalog + +```sql +CREATE CATALOG jdbc_iceberg_catalog +TYPE iceberg +PROPERTIES ( + 'catalog-impl' = 'org.apache.iceberg.jdbc.JdbcCatalog', + 'uri' = 'jdbc:postgresql://localhost:5432/iceberg_metadata', + 'jdbc.user' = 'iceberg_user', + 'jdbc.password' = 'iceberg_password', + 'jdbc.driver' = 'org.postgresql.Driver', + 'warehouse' = 's3://my-bucket/iceberg-warehouse' +) +``` + +## Using Catalogs + +After creating a catalog, you can use it to manage databases and tables: + +### Switch to a Catalog + +```sql +USE CATALOG catalogName +``` + +### Create and Use a Database + +```sql +-- Create a database (namespace) +CREATE DATABASE my_database + +-- Use the database +USE DATABASE my_database +``` + +### Create Tables in the Catalog + +Once you've switched to a catalog and database, you can create tables: + +```sql +-- Switch to your catalog and database +USE CATALOG my_iceberg_catalog +USE DATABASE my_database + +-- Create an Iceberg table +CREATE EXTERNAL TABLE users ( + id BIGINT, + username VARCHAR, + email VARCHAR, + created_at TIMESTAMP +) +TYPE iceberg +``` + +## Catalog Management + +### List Available Catalogs + +```sql +SHOW CATALOGS +``` + +### Drop a Catalog + +```sql +DROP CATALOG [ IF EXISTS ] catalogName +``` + +## Best Practices + +### Security + +* **Credentials**: Store 
sensitive credentials (access keys, passwords) in secure configuration systems rather than hardcoding them in SQL statements +* **IAM Roles**: Use IAM roles and service accounts when possible instead of access keys +* **Network Security**: Ensure proper network access controls for your storage systems + +### Performance + +* **Warehouse Location**: Choose a warehouse location that's geographically close to your compute resources +* **Partitioning**: Use appropriate partitioning strategies for your data access patterns +* **File Formats**: Iceberg automatically manages file formats, but consider compression settings for your use case + +### Monitoring + +* **Catalog Health**: Monitor catalog connectivity and performance +* **Storage Usage**: Track warehouse storage usage and implement lifecycle policies +* **Query Performance**: Monitor query performance and optimize table schemas as needed + +## Troubleshooting + +### Common Issues + +#### Catalog Creation Fails + +* **Check Dependencies**: Ensure all required Iceberg dependencies are available in your classpath +* **Verify Properties**: Double-check that all required properties are provided and correctly formatted +* **Storage Access**: Ensure your compute environment has access to the specified warehouse location + +#### Table Operations Fail + +* **Catalog Context**: Make sure you're using the correct catalog with `USE CATALOG` +* **Database Context**: Ensure you're in the correct database with `USE DATABASE` +* **Permissions**: Verify that your credentials have the necessary permissions for the storage system + +#### Performance Issues + +* **Partitioning**: Review your table partitioning strategy +* **File Size**: Check if files are too large or too small for your use case +* **Compression**: Consider adjusting compression settings for your data types + +### Getting Help + +For more information about Apache Iceberg: + +* [Apache Iceberg Documentation](https://iceberg.apache.org/docs/) +* [Iceberg Catalog Implementations](https://iceberg.apache.org/docs/latest/configuration/) +* [Beam SQL Documentation](/documentation/dsls/sql/) diff --git a/website/www/site/content/en/documentation/dsls/sql/extensions/create-external-table.md b/website/www/site/content/en/documentation/dsls/sql/extensions/create-external-table.md index ad6ba66beb20..62e30b515e44 100644 --- a/website/www/site/content/en/documentation/dsls/sql/extensions/create-external-table.md +++ b/website/www/site/content/en/documentation/dsls/sql/extensions/create-external-table.md @@ -72,6 +72,8 @@ tableElement: columnName fieldType [ NOT NULL ] * `kafka` * `parquet` * `text` + * `iceberg` + * `datagen` * `location`: The I/O specific location of the underlying table, specified as a [String Literal](/documentation/dsls/sql/calcite/lexical/#string-literals). @@ -748,6 +750,175 @@ TYPE text LOCATION '/home/admin/orders' ``` +## Apache Iceberg + +Beam SQL supports reading from and writing to [Apache Iceberg](https://iceberg.apache.org/) tables. Iceberg is a high-performance table format for huge analytic datasets that provides ACID transactions, schema evolution, and time travel capabilities. + +**Prerequisites**: Before creating Iceberg tables, you must first create an Iceberg catalog. See the [CREATE CATALOG](/documentation/dsls/sql/extensions/create-catalog/) documentation for details. 
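+
+For example, a minimal session setup might register a local catalog and switch to it before defining tables. This is only a sketch that reuses the Hadoop catalog example from the CREATE CATALOG page; the catalog and database names are illustrative:
+
+```sql
+CREATE CATALOG my_iceberg_catalog
+TYPE iceberg
+PROPERTIES (
+  'catalog-impl' = 'org.apache.iceberg.hadoop.HadoopCatalog',
+  'warehouse' = 'file:///tmp/iceberg-warehouse'
+)
+
+USE CATALOG my_iceberg_catalog
+CREATE DATABASE my_database
+USE DATABASE my_database
+```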
+ +### Syntax + +``` +CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName (tableElement [, tableElement ]*) +TYPE iceberg +[PARTITIONED BY (partitionField [, partitionField ]*)] +[TBLPROPERTIES tblProperties] +``` + +* `tableName`: The case sensitive name of the table to create and register. +* `tableElement`: `columnName` `fieldType` `[ NOT NULL ]` + * `columnName`: The case sensitive name of the column. + * `fieldType`: The field's type, specified as one of the following types: + * `simpleType`: `TINYINT`, `SMALLINT`, `INTEGER`, `BIGINT`, `FLOAT`, + `DOUBLE`, `DECIMAL`, `BOOLEAN`, `DATE`, `TIME`, `TIMESTAMP`, `CHAR`, + `VARCHAR`, `BINARY`, `VARBINARY` + * `MAP` + * `ARRAY` + * `ROW` + * `NOT NULL`: Optional. Indicates that the column is not nullable. +* `PARTITIONED BY`: Optional. Specifies partition fields for the table. Supports various partition functions: + * `identity(columnName)`: Identity partitioning + * `bucket(columnName, numBuckets)`: Bucket partitioning + * `truncate(columnName, width)`: Truncate partitioning + * `year(columnName)`, `month(columnName)`, `day(columnName)`, `hour(columnName)`: Time-based partitioning +* `TBLPROPERTIES`: Optional. JSON object with table-specific configuration: + * `triggering_frequency_seconds`: For streaming pipelines, specifies how often to commit snapshots (in seconds). + +### Read Mode + +Beam SQL supports reading from existing Iceberg tables. The connector automatically infers the table schema from the Iceberg table metadata and supports: + +* **Predicate push-down**: Filters are pushed down to the Iceberg scan level for better performance +* **Projection push-down**: Only requested columns are read from the table +* **Schema evolution**: Automatically handles schema changes in the underlying Iceberg table + +### Write Mode + +Beam SQL supports writing to Iceberg tables with the following features: + +* **Automatic table creation**: If the table doesn't exist, it will be created with the specified schema +* **ACID transactions**: All writes are committed as atomic transactions +* **Schema validation**: Ensures data matches the table schema +* **Partitioning**: Supports writing to partitioned tables +* **Streaming support**: For streaming pipelines, commits are performed at regular intervals + +### Schema + +Beam SQL types map to Iceberg types as follows: + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
Beam SQL Type + Iceberg Type +
TINYINT, SMALLINT, INTEGER, BIGINT   + INTEGER, LONG +
FLOAT, DOUBLE + FLOAT, DOUBLE +
DECIMAL + DECIMAL +
BOOLEAN + BOOLEAN +
DATE, TIME, TIMESTAMP + DATE, TIME, TIMESTAMP +
CHAR, VARCHAR + STRING +
BINARY, VARBINARY + BINARY +
ARRAY + LIST +
MAP + MAP +
ROW + STRUCT +
+ +### Examples + +#### Basic Table Creation + +```sql +CREATE EXTERNAL TABLE users ( + id BIGINT, + username VARCHAR, + email VARCHAR, + created_at TIMESTAMP +) +TYPE iceberg +``` + +#### Partitioned Table + +```sql +CREATE EXTERNAL TABLE events ( + event_id BIGINT, + user_id BIGINT, + event_type VARCHAR, + event_timestamp TIMESTAMP, + data ROW +) +TYPE iceberg +PARTITIONED BY ( + 'bucket(user_id, 10)', + 'day(event_timestamp)', + 'event_type' +) +``` + +#### Streaming Table with Commit Frequency + +```sql +CREATE EXTERNAL TABLE streaming_events ( + id BIGINT, + message VARCHAR, + timestamp TIMESTAMP +) +TYPE iceberg +TBLPROPERTIES '{"triggering_frequency_seconds": 60}' +``` + ## DataGen The **DataGen** connector allows for creating tables based on in-memory data generation. This is useful for developing and testing queries locally without requiring access to external systems. The DataGen connector is built-in; no additional dependencies are required.It is available for Beam 2.67.0+ From 257cd5ae1d90223fbb9e8c862086d8114cbd4389 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Fri, 6 Feb 2026 16:47:39 -0500 Subject: [PATCH 2/4] add sql ddl website documentation --- .../provider/iceberg/IcebergMetastore.java | 8 +- .../meta/provider/iceberg/IcebergTable.java | 19 +- .../provider/iceberg/PubsubToIcebergIT.java | 4 +- .../sdk/io/iceberg/IcebergCatalogConfig.java | 4 +- .../en/documentation/dsls/sql/ddl/alter.md | 75 +++++ .../en/documentation/dsls/sql/ddl/create.md | 120 ++++++++ .../en/documentation/dsls/sql/ddl/drop.md | 50 ++++ .../en/documentation/dsls/sql/ddl/overview.md | 53 ++++ .../en/documentation/dsls/sql/ddl/show.md | 83 ++++++ .../en/documentation/dsls/sql/ddl/use.md | 48 ++++ .../dsls/sql/extensions/create-catalog.md | 258 ------------------ .../sql/extensions/create-external-table.md | 171 ------------ .../partials/section-menu/en/sdks.html | 11 + 13 files changed, 468 insertions(+), 436 deletions(-) create mode 100644 website/www/site/content/en/documentation/dsls/sql/ddl/alter.md create mode 100644 website/www/site/content/en/documentation/dsls/sql/ddl/create.md create mode 100644 website/www/site/content/en/documentation/dsls/sql/ddl/drop.md create mode 100644 website/www/site/content/en/documentation/dsls/sql/ddl/overview.md create mode 100644 website/www/site/content/en/documentation/dsls/sql/ddl/show.md create mode 100644 website/www/site/content/en/documentation/dsls/sql/ddl/use.md delete mode 100644 website/www/site/content/en/documentation/dsls/sql/extensions/create-catalog.md diff --git a/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergMetastore.java b/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergMetastore.java index b73aa25c7a2b..72e8988f41e2 100644 --- a/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergMetastore.java +++ b/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergMetastore.java @@ -22,6 +22,9 @@ import java.util.HashMap; import java.util.Map; +import java.util.stream.Collectors; + +import com.fasterxml.jackson.core.type.TypeReference; import org.apache.beam.sdk.extensions.sql.TableUtils; import org.apache.beam.sdk.extensions.sql.impl.TableName; import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable; @@ -59,8 +62,11 @@ public void createTable(Table table) { 
       getProvider(table.getType()).createTable(table);
     } else {
       String identifier = getIdentifier(table);
+      Map<String, String> props = TableUtils.getObjectMapper()
+              .convertValue(table.getProperties(), new TypeReference<Map<String, String>>() {}).entrySet()
+              .stream().filter(p -> !p.getKey().startsWith("beam.")).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
       try {
-        catalogConfig.createTable(identifier, table.getSchema(), table.getPartitionFields());
+        catalogConfig.createTable(identifier, table.getSchema(), table.getPartitionFields(), props);
       } catch (TableAlreadyExistsException e) {
         LOG.info(
             "Iceberg table '{}' already exists at location '{}'.", table.getName(), identifier);
diff --git a/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTable.java b/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTable.java
index b68aa34a1777..e104b21bf132 100644
--- a/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTable.java
+++ b/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTable.java
@@ -19,6 +19,7 @@
 import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
 
+import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import java.util.HashMap;
 import java.util.List;
@@ -56,6 +57,8 @@ class IcebergTable extends SchemaBaseBeamTable {
   @VisibleForTesting static final String CATALOG_PROPERTIES_FIELD = "catalog_properties";
   @VisibleForTesting static final String HADOOP_CONFIG_PROPERTIES_FIELD = "config_properties";
   @VisibleForTesting static final String CATALOG_NAME_FIELD = "catalog_name";
+  static final String BEAM_WRITE_PROPERTY = "beam.write.";
+  static final String BEAM_READ_PROPERTY = "beam.read.";
 
   @VisibleForTesting
   static final String TRIGGERING_FREQUENCY_FIELD = "triggering_frequency_seconds";
@@ -71,9 +74,21 @@ class IcebergTable extends SchemaBaseBeamTable {
     this.tableIdentifier = tableIdentifier;
     this.catalogConfig = catalogConfig;
     ObjectNode properties = table.getProperties();
-    if (properties.has(TRIGGERING_FREQUENCY_FIELD)) {
-      this.triggeringFrequency = properties.get(TRIGGERING_FREQUENCY_FIELD).asInt();
+    // Only "beam."-prefixed properties are interpreted by the connector here; all other
+    // properties are stored on the Iceberg table itself when it is created.
+    for (Map.Entry<String, JsonNode> property : properties.properties()) {
+      String name = property.getKey();
+      if (name.startsWith(BEAM_WRITE_PROPERTY)) {
+        String prop = name.substring(BEAM_WRITE_PROPERTY.length());
+        if (prop.equals(TRIGGERING_FREQUENCY_FIELD)) {
+          this.triggeringFrequency = property.getValue().asInt();
+        } else {
+          throw new IllegalArgumentException("Unknown Beam write property: " + name);
+        }
+      } else if (name.startsWith(BEAM_READ_PROPERTY)) {
+        // none supported yet
+        throw new IllegalArgumentException("Unknown Beam read property: " + name);
+      }
     }
+
     this.partitionFields = table.getPartitionFields();
   }
diff --git a/sdks/java/extensions/sql/iceberg/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/PubsubToIcebergIT.java b/sdks/java/extensions/sql/iceberg/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/PubsubToIcebergIT.java
index 900fdae743a1..8b250af2754a 100644
--- a/sdks/java/extensions/sql/iceberg/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/PubsubToIcebergIT.java
+++ b/sdks/java/extensions/sql/iceberg/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/PubsubToIcebergIT.java
@@ -156,7 +156,7 @@ public void testSimpleInsertWithPartitionedFields() throws Exception {
             + ") \n"
             + "TYPE 'iceberg' \n"
             + "PARTITIONED BY('id', 'truncate(name, 3)') \n"
-            + "TBLPROPERTIES '{ \"triggering_frequency_seconds\" : 10 }'";
+            + "TBLPROPERTIES '{ \"beam.write.triggering_frequency_seconds\" : 10 }'";
     String insertStatement =
         format("INSERT INTO %s \n", tableIdentifier)
             + "SELECT \n"
@@ -211,7 +211,7 @@ public void testSimpleInsertFlat() throws Exception {
             + " name VARCHAR \n "
             + ") \n"
             + "TYPE 'iceberg' \n"
-            + "TBLPROPERTIES '{ \"triggering_frequency_seconds\" : 10 }'";
+            + "TBLPROPERTIES '{ \"beam.write.triggering_frequency_seconds\" : 10 }'";
     String insertStatement =
         format("INSERT INTO %s \n", tableIdentifier)
             + "SELECT \n"
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java
index 7603e2c6259f..e7ee6dfb94ef 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java
@@ -143,7 +143,7 @@ public boolean dropNamespace(String namespace, boolean cascade) {
   }
 
   public void createTable(
-      String tableIdentifier, Schema tableSchema, @Nullable List<String> partitionFields) {
+      String tableIdentifier, Schema tableSchema, @Nullable List<String> partitionFields, @Nullable Map<String, String> properties) {
     TableIdentifier icebergIdentifier = TableIdentifier.parse(tableIdentifier);
     org.apache.iceberg.Schema icebergSchema = IcebergUtils.beamSchemaToIcebergSchema(tableSchema);
     PartitionSpec icebergSpec = PartitionUtils.toPartitionSpec(partitionFields, tableSchema);
@@ -153,7 +153,7 @@ public void createTable(
         icebergIdentifier,
         icebergSchema,
         icebergSpec);
-    catalog().createTable(icebergIdentifier, icebergSchema, icebergSpec);
+    catalog().createTable(icebergIdentifier, icebergSchema, icebergSpec, properties);
     LOG.info("Successfully created table '{}'.", icebergIdentifier);
   } catch (AlreadyExistsException e) {
     throw new TableAlreadyExistsException(e);
diff --git a/website/www/site/content/en/documentation/dsls/sql/ddl/alter.md b/website/www/site/content/en/documentation/dsls/sql/ddl/alter.md
new file mode 100644
index 000000000000..d2e04037735a
--- /dev/null
+++ b/website/www/site/content/en/documentation/dsls/sql/ddl/alter.md
@@ -0,0 +1,75 @@
+---
+type: languages
+title: "Beam SQL DDL: Alter"
+---
+
+
+# ALTER statements
+
+The **ALTER** statement modifies the definition of an existing Catalog or Table.
+For supported tables (like Iceberg), this enables **schema and partition evolution**.
+
+## ALTER CATALOG
+Modifies an existing catalog's properties.
+
+```sql
+ALTER CATALOG catalog_name
+  [ SET ( 'key' = 'val', ... ) ]
+  [ RESET ( 'key', ... ) ]
+```
+- **SET**: Adds new properties or updates existing ones.
+- **RESET** / **UNSET**: Removes properties.
+
+## ALTER TABLE
+Modifies an existing table's properties and evolves its schema and partition spec.
+
+```sql
+ALTER TABLE table_name
+  [ ADD COLUMNS ( col_def, ... ) ]
+  [ DROP COLUMNS ( col_name, ... ) ]
+  [ ADD PARTITIONS ( partition_field, ... ) ]
+  [ DROP PARTITIONS ( partition_field, ... ) ]
+  [ SET ( 'key' = 'val', ... ) ]
+  [ ( RESET | UNSET ) ( 'key', ... ) ];
+```
+
+*Example 1: Add or remove columns*
+```sql
+-- Add columns
+ALTER TABLE orders ADD COLUMNS (
+  customer_email VARCHAR,
+  shipping_region VARCHAR
+);
+
+-- Drop columns
+ALTER TABLE orders DROP COLUMNS ( customer_email );
+```
+
+*Example 2: Modify partition spec*
+```sql
+-- Add a partition field
+ALTER TABLE orders ADD PARTITIONS ( 'year(order_date)' );
+
+-- Remove a partition field
+ALTER TABLE orders DROP PARTITIONS ( 'region_id' );
+```
+
+*Example 3: Modify table properties*
+```sql
+ALTER TABLE orders SET (
+  'write.format.default' = 'orc',
+  'write.metadata.metrics.default' = 'full' );
+ALTER TABLE orders RESET ( 'write.target-file-size-bytes' );
+```
diff --git a/website/www/site/content/en/documentation/dsls/sql/ddl/create.md b/website/www/site/content/en/documentation/dsls/sql/ddl/create.md
new file mode 100644
index 000000000000..73d44207c390
--- /dev/null
+++ b/website/www/site/content/en/documentation/dsls/sql/ddl/create.md
@@ -0,0 +1,120 @@
+---
+type: languages
+title: "Beam SQL DDL: Create"
+---
+
+
+# CREATE statements
+
+The **CREATE** command serves two potential functions depending on the connector:
+
+- **Registration**: By default, it registers an existing external entity in the Beam SQL session.
+- **Instantiation**: For supported connectors (e.g., Iceberg), it physically creates the entity
+(e.g. namespace or table) in the underlying storage.
+
+_**Note**: Creating a catalog or database does not automatically switch to it. Remember
+to run [USE](TODO:LINK-TO-USE) afterwards to set it as a default._
+
+## `CREATE CATALOG`
+Registers a new catalog instance.
+
+```sql
+CREATE CATALOG [ IF NOT EXISTS ] catalog_name
+TYPE 'type_name'
+[ PROPERTIES ( 'key' = 'value' [, ...] ) ]
+```
+
+_**Example**: Creating a Hadoop Catalog (Local Storage)_
+```sql
+CREATE CATALOG local_catalog
+TYPE iceberg
+PROPERTIES (
+  'type' = 'hadoop',
+  'warehouse' = 'file:///tmp/iceberg-warehouse'
+)
+```
+
+_**Example**: Registering a BigLake Catalog (GCS)_
+```sql
+CREATE CATALOG prod_iceberg
+TYPE iceberg
+PROPERTIES (
+  'type' = 'rest',
+  'uri' = 'https://biglake.googleapis.com/iceberg/v1/restcatalog',
+  'warehouse' = 'gs://my-company-bucket/warehouse',
+  'header.x-goog-user-project' = 'my_prod_project',
+  'rest.auth.type' = 'org.apache.iceberg.gcp.auth.GoogleAuthManager',
+  'io-impl' = 'org.apache.iceberg.gcp.gcs.GCSFileIO',
+  'rest-metrics-reporting-enabled' = 'false'
+);
+```
+
+## `CREATE DATABASE`
+Creates a new Database within the current Catalog (default), or the specified Catalog.
+```sql
+CREATE DATABASE [ IF NOT EXISTS ] [ catalog_name. ]database_name;
+```
+
+_**Example**: Create a database in the currently active catalog_
+```sql
+USE CATALOG my_catalog;
+CREATE DATABASE sales_data;
+```
+
+_**Example**: Create a database in a specified catalog (must be registered)_
+```sql
+CREATE DATABASE other_catalog.sales_data;
+```
+
+## `CREATE TABLE`
+Creates a table within the currently active catalog and database. If the table name is fully qualified, the referenced catalog and database are used.
+
+```sql
+CREATE EXTERNAL TABLE [ IF NOT EXISTS ] [ catalog. ][ db. ]table_name (
+  col_name col_type [ NOT NULL ] [ COMMENT 'col_comment' ],
+  ...
+)
+TYPE 'type_name'
+[ PARTITIONED BY ( 'partition_field' [, ... ] ) ]
+[ COMMENT 'table_comment' ]
+[ LOCATION 'location_uri' ]
+[ TBLPROPERTIES 'properties_json_string' ];
+```
+- **TYPE**: the table type (e.g. `'iceberg'`, `'text'`, `'kafka'`)
+- **PARTITIONED BY**: an ordered list of fields describing the partition spec.
+- **LOCATION**: explicitly sets the location of the table (overriding the inferred `catalog.db.table_name` location)
+- **TBLPROPERTIES**: configuration properties used when creating the table or setting up its IO connection.
+
+_**Example**: Creating an Iceberg Table_
+```sql
+CREATE EXTERNAL TABLE prod_iceberg.sales_data.orders (
+  order_id BIGINT NOT NULL COMMENT 'Unique order identifier',
+  amount DECIMAL(10, 2),
+  order_date TIMESTAMP,
+  region_id VARCHAR
+)
+TYPE 'iceberg'
+PARTITIONED BY ( 'region_id', 'day(order_date)' )
+COMMENT 'Daily sales transactions'
+TBLPROPERTIES '{
+  "write.format.default": "parquet",
+  "read.split.target-size": 268435456,
+  "beam.write.triggering_frequency_seconds": 60
+}';
+```
+- This creates an Iceberg table named `orders` under the namespace `sales_data`, within the `prod_iceberg` catalog.
+- The table is partitioned by `region_id`, then by the day value of `order_date` (using Iceberg's [hidden partitioning](https://iceberg.apache.org/docs/latest/partitioning/#icebergs-hidden-partitioning)).
+- The table is created with the Iceberg properties `"write.format.default"` and `"read.split.target-size"`. The Beam property `"beam.write.triggering_frequency_seconds"` controls how often the streaming write commits snapshots.
+- Beam properties (prefixed with `"beam.write."` or `"beam.read."`) are consumed by the corresponding Beam IO connector and are not stored on the table.
diff --git a/website/www/site/content/en/documentation/dsls/sql/ddl/drop.md b/website/www/site/content/en/documentation/dsls/sql/ddl/drop.md
new file mode 100644
index 000000000000..fb6eab45cf37
--- /dev/null
+++ b/website/www/site/content/en/documentation/dsls/sql/ddl/drop.md
@@ -0,0 +1,50 @@
+---
+type: languages
+title: "Beam SQL DDL: Drop"
+---
+
+
+# DROP statements
+The **DROP** command serves two potential functions depending on the connector:
+
+- **Unregistration**: unregisters an entity from the current Beam SQL session.
+- **Deletion**: For supported connectors (like **Iceberg**), it **physically deletes** the entity
+  (e.g. namespace or table) in the underlying storage.
+
+  **Caution:** Physical deletion can be permanent.
+
+## DROP CATALOG
+Unregisters a catalog from Beam SQL. This does not destroy external data, only the link within the SQL session.
+
+```sql
+DROP CATALOG [ IF EXISTS ] catalog_name;
+```
+
+## DROP DATABASE
+Unregisters a database from the current session. For supported connectors, this
+will also **delete** the database from the external data source.
+
+```sql
+DROP DATABASE [ IF EXISTS ] database_name [ RESTRICT | CASCADE ];
+```
+- **RESTRICT** (Default): Fails if the database is not empty.
+- **CASCADE**: Drops the database and all tables contained within it. **Use with caution.**
+
+## DROP TABLE
+Unregisters a table from the current session. For supported connectors, this
+will also **delete** the table from the external data source.
+```sql
+DROP TABLE [ IF EXISTS ] table_name;
+```
diff --git a/website/www/site/content/en/documentation/dsls/sql/ddl/overview.md b/website/www/site/content/en/documentation/dsls/sql/ddl/overview.md
new file mode 100644
index 000000000000..420408d431bb
--- /dev/null
+++ b/website/www/site/content/en/documentation/dsls/sql/ddl/overview.md
@@ -0,0 +1,53 @@
+---
+type: languages
+title: "Beam SQL DDL Overview"
+---
+
+
+# Beam SQL DDL Overview
+
+Beam SQL provides a robust hierarchy for managing metadata for external data sources. Instead of treating all tables as flat objects, Beam SQL utilizes a three-tier namespace system:
+1. Catalog: The highest-level container (e.g. a Glue Catalog connected to S3 or a BigLake Catalog connected to GCS).
+2. Database: A logical namespace within a Catalog (often maps to "namespace" in systems like Iceberg).
+3. Table: The actual data entity containing schema and rows.
+
+This structure allows users to connect to multiple disparate systems (e.g., a production BigQuery catalog and a dev Iceberg catalog) simultaneously and switch contexts seamlessly.
+It also allows interactions between these systems via cross-catalog queries.
+
+Click below to learn about metadata management at each level:
+
+## Catalogs
+The Catalog is the entry point for external metadata. When you initialize Beam SQL, you start off with a `default` catalog that contains a `default` database.
+You can register new catalogs, switch between them, and modify their configurations.
+
+### [CREATE](/create#CREATE-CATALOG)
+### [USE](/use#USE-CATALOG)
+### [DROP](/drop#DROP-CATALOG)
+### [ALTER](/alter#ALTER-CATALOG)
+### [SHOW](/show#SHOW-CATALOGS)
+
+## Databases
+
+### [CREATE](/create#CREATE-DATABASE)
+### [USE](/use#USE-DATABASE)
+### [DROP](/drop#DROP-DATABASE)
+### [SHOW](/show#SHOW-DATABASES)
+
+## Tables
+
+### [CREATE](/create#CREATE-TABLE)
+### [DROP](/drop#DROP-TABLE)
+### [ALTER](/alter#ALTER-TABLE)
+### [SHOW](/show#SHOW-TABLES)
diff --git a/website/www/site/content/en/documentation/dsls/sql/ddl/show.md b/website/www/site/content/en/documentation/dsls/sql/ddl/show.md
new file mode 100644
index 000000000000..167b96d164e3
--- /dev/null
+++ b/website/www/site/content/en/documentation/dsls/sql/ddl/show.md
@@ -0,0 +1,83 @@
+---
+type: languages
+title: "Beam SQL DDL: Show"
+---
+
+
+# SHOW statements
+
+The **SHOW** statement is used to list objects within a parent container
+(e.g. databases within a catalog), or to inspect the currently active context.
+Results can be filtered using regex patterns.
+
+## SHOW CATALOGS
+Lists all registered catalogs (name and type). Supports regex filtering.
+
+_**Example**: List all catalogs_
+```sql
+SHOW CATALOGS;
+```
+
+_**Example**: List catalogs matching a pattern_
+```sql
+SHOW CATALOGS LIKE 'prod_%';
+```
+
+_**Example**: Verify which catalog is currently active_
+```sql
+SHOW CURRENT CATALOG;
+```
+
+## SHOW DATABASES
+Lists databases under the currently active catalog, or a specified catalog.
+
+_**Example**: List databases in the currently active catalog_
+```sql
+SHOW DATABASES;
+```
+
+_**Example**: List databases in a specified catalog_
+```sql
+SHOW DATABASES IN my_catalog;
+```
+
+_**Example**: List databases matching a pattern_
+```sql
+SHOW DATABASES IN my_catalog LIKE '%geo%';
+```
+
+_**Example**: Verify which database is currently active_
+```sql
+SHOW CURRENT DATABASE;
+```
+
+## SHOW TABLES
+Lists tables under the currently active database, or a specified database.
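+
+As a sketch of how this composes with context switching (the `USE` statements are documented on their own page; the catalog and database names here are illustrative):
+
+```sql
+USE CATALOG prod_iceberg;
+USE DATABASE sales_data;
+SHOW TABLES;
+```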
+
+_**Example**: List tables in the currently active database_
+```sql
+SHOW TABLES;
+```
+
+_**Example**: List tables in a specified database_
+```sql
+SHOW TABLES IN my_db;
+SHOW TABLES IN my_catalog.my_db;
+```
+
+_**Example**: List tables matching a pattern_
+```sql
+SHOW TABLES IN my_db LIKE '%orders%';
+```
diff --git a/website/www/site/content/en/documentation/dsls/sql/ddl/use.md b/website/www/site/content/en/documentation/dsls/sql/ddl/use.md
new file mode 100644
index 000000000000..279b0e4ea2e7
--- /dev/null
+++ b/website/www/site/content/en/documentation/dsls/sql/ddl/use.md
@@ -0,0 +1,48 @@
+---
+type: languages
+title: "Beam SQL DDL: Use"
+---
+
+
+# USE statements
+
+The **USE** statement sets the active catalog or database for the current session.
+This simplifies queries by allowing you to reference tables directly (e.g., `orders`),
+avoiding the need for fully qualified names (e.g., `prod_catalog.sales_db.orders`).
+
+***Tip**: the SHOW CURRENT statement helps verify what the currently active entity is.*
+
+## USE CATALOG
+Switches the current session's active Catalog.
+
+_**Note:** All subsequent DATABASE and TABLE commands will be executed under this Catalog, unless fully qualified._
+
+```sql
+USE CATALOG prod_iceberg;
+```
+
+## USE DATABASE
+Switches the current session's active Database.
+
+_**Note:** All subsequent TABLE commands will be executed under this Database, unless fully qualified._
+
+```sql
+USE DATABASE sales_data;
+```
+
+Switch to a database in a specified catalog (_**Note:** this also switches the default to that catalog_):
+```sql
+USE DATABASE other_catalog.sales_data;
+```
diff --git a/website/www/site/content/en/documentation/dsls/sql/extensions/create-catalog.md b/website/www/site/content/en/documentation/dsls/sql/extensions/create-catalog.md
deleted file mode 100644
index 0b71330efd8c..000000000000
--- a/website/www/site/content/en/documentation/dsls/sql/extensions/create-catalog.md
+++ /dev/null
@@ -1,258 +0,0 @@
----
-type: languages
-title: "Beam SQL extension: CREATE CATALOG Statement"
-aliases:
-  - /documentation/dsls/sql/create-catalog/
-  - /documentation/dsls/sql/statements/create-catalog/
----
-
-
-# Beam SQL extensions: CREATE CATALOG
-
-Beam SQL's `CREATE CATALOG` statement creates and registers a catalog that manages metadata for external data sources. Catalogs provide a unified interface for accessing different types of data stores and enable features like schema management, table discovery, and cross-catalog queries.
-
-Currently, Beam SQL supports the **Apache Iceberg** catalog type, which provides access to Iceberg tables with full ACID transaction support, schema evolution, and time travel capabilities.
-
-## Syntax
-
-```
-CREATE CATALOG [ IF NOT EXISTS ] catalogName
-TYPE catalogType
-[PROPERTIES (propertyKey = propertyValue [, propertyKey = propertyValue ]*)]
-```
-
-* `IF NOT EXISTS`: Optional. If the catalog is already registered, Beam SQL
-  ignores the statement instead of returning an error.
-* `catalogName`: The case sensitive name of the catalog to create and register,
-  specified as an [Identifier](/documentation/dsls/sql/calcite/lexical#identifiers).
-* `catalogType`: The type of catalog to create. Currently supported values:
-  * `iceberg`: Apache Iceberg catalog
-* `PROPERTIES`: Optional. Key-value pairs for catalog-specific configuration.
-  Each property is specified as `'key' = 'value'` with string literals.
- -## Apache Iceberg Catalog - -The Iceberg catalog provides access to [Apache Iceberg](https://iceberg.apache.org/) tables, which are high-performance table formats for huge analytic datasets. - -### Syntax - -``` -CREATE CATALOG [ IF NOT EXISTS ] catalogName -TYPE iceberg -PROPERTIES ( - 'catalog-impl' = 'catalogImplementation', - 'warehouse' = 'warehouseLocation' - [, additionalProperties...] -) -``` - -### Required Properties - -* `catalog-impl`: The Iceberg catalog implementation class. Common values: - * `org.apache.iceberg.hadoop.HadoopCatalog`: For Hadoop-compatible storage (HDFS, S3, GCS, etc.) - * `org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog`: For BigQuery integration - * `org.apache.iceberg.jdbc.JdbcCatalog`: For JDBC-based metadata storage - * `org.apache.iceberg.rest.RESTCatalog`: For REST-based catalog access -* `warehouse`: The root location where Iceberg tables and metadata are stored. - Format depends on the storage system: - * **Local filesystem**: `file:///path/to/warehouse` - * **HDFS**: `hdfs://namenode:port/path/to/warehouse` - * **S3**: `s3://bucket-name/path/to/warehouse` - * **Google Cloud Storage**: `gs://bucket-name/path/to/warehouse` - -### Optional Properties - -The available optional properties depend on the catalog implementation: - -#### Hadoop Catalog Properties - -* `io-impl`: The file I/O implementation class. Common values: - * `org.apache.iceberg.hadoop.HadoopFileIO`: For Hadoop-compatible storage - * `org.apache.iceberg.aws.s3.S3FileIO`: For S3 storage - * `org.apache.iceberg.gcp.gcs.GCSFileIO`: For Google Cloud Storage -* `hadoop.*`: Any Hadoop configuration property (e.g., `hadoop.fs.s3a.access.key`) - -#### BigQuery Metastore Catalog Properties - -* `io-impl`: Must be `org.apache.iceberg.gcp.gcs.GCSFileIO` for GCS storage -* `gcp_project`: Google Cloud Project ID -* `gcp_region`: Google Cloud region (e.g., `us-central1`) -* `gcp_location`: Alternative to `gcp_region` for specifying location - -#### JDBC Catalog Properties - -* `uri`: JDBC connection URI -* `jdbc.user`: Database username -* `jdbc.password`: Database password -* `jdbc.driver`: JDBC driver class name - -### Examples - -#### Hadoop Catalog with Local Storage - -```sql -CREATE CATALOG my_iceberg_catalog -TYPE iceberg -PROPERTIES ( - 'catalog-impl' = 'org.apache.iceberg.hadoop.HadoopCatalog', - 'warehouse' = 'file:///tmp/iceberg-warehouse' -) -``` - -#### Hadoop Catalog with S3 Storage - -```sql -CREATE CATALOG s3_iceberg_catalog -TYPE iceberg -PROPERTIES ( - 'catalog-impl' = 'org.apache.iceberg.hadoop.HadoopCatalog', - 'warehouse' = 's3://my-bucket/iceberg-warehouse', - 'io-impl' = 'org.apache.iceberg.aws.s3.S3FileIO', - 'hadoop.fs.s3a.access.key' = 'your-access-key', - 'hadoop.fs.s3a.secret.key' = 'your-secret-key' -) -``` - -#### BigQuery Metastore Catalog - -```sql -CREATE CATALOG bigquery_iceberg_catalog -TYPE iceberg -PROPERTIES ( - 'catalog-impl' = 'org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog', - 'io-impl' = 'org.apache.iceberg.gcp.gcs.GCSFileIO', - 'warehouse' = 'gs://my-bucket/iceberg-warehouse', - 'gcp_project' = 'my-gcp-project', - 'gcp_region' = 'us-central1' -) -``` - -#### JDBC Catalog - -```sql -CREATE CATALOG jdbc_iceberg_catalog -TYPE iceberg -PROPERTIES ( - 'catalog-impl' = 'org.apache.iceberg.jdbc.JdbcCatalog', - 'uri' = 'jdbc:postgresql://localhost:5432/iceberg_metadata', - 'jdbc.user' = 'iceberg_user', - 'jdbc.password' = 'iceberg_password', - 'jdbc.driver' = 'org.postgresql.Driver', - 'warehouse' = 's3://my-bucket/iceberg-warehouse' -) -``` 
- -## Using Catalogs - -After creating a catalog, you can use it to manage databases and tables: - -### Switch to a Catalog - -```sql -USE CATALOG catalogName -``` - -### Create and Use a Database - -```sql --- Create a database (namespace) -CREATE DATABASE my_database - --- Use the database -USE DATABASE my_database -``` - -### Create Tables in the Catalog - -Once you've switched to a catalog and database, you can create tables: - -```sql --- Switch to your catalog and database -USE CATALOG my_iceberg_catalog -USE DATABASE my_database - --- Create an Iceberg table -CREATE EXTERNAL TABLE users ( - id BIGINT, - username VARCHAR, - email VARCHAR, - created_at TIMESTAMP -) -TYPE iceberg -``` - -## Catalog Management - -### List Available Catalogs - -```sql -SHOW CATALOGS -``` - -### Drop a Catalog - -```sql -DROP CATALOG [ IF EXISTS ] catalogName -``` - -## Best Practices - -### Security - -* **Credentials**: Store sensitive credentials (access keys, passwords) in secure configuration systems rather than hardcoding them in SQL statements -* **IAM Roles**: Use IAM roles and service accounts when possible instead of access keys -* **Network Security**: Ensure proper network access controls for your storage systems - -### Performance - -* **Warehouse Location**: Choose a warehouse location that's geographically close to your compute resources -* **Partitioning**: Use appropriate partitioning strategies for your data access patterns -* **File Formats**: Iceberg automatically manages file formats, but consider compression settings for your use case - -### Monitoring - -* **Catalog Health**: Monitor catalog connectivity and performance -* **Storage Usage**: Track warehouse storage usage and implement lifecycle policies -* **Query Performance**: Monitor query performance and optimize table schemas as needed - -## Troubleshooting - -### Common Issues - -#### Catalog Creation Fails - -* **Check Dependencies**: Ensure all required Iceberg dependencies are available in your classpath -* **Verify Properties**: Double-check that all required properties are provided and correctly formatted -* **Storage Access**: Ensure your compute environment has access to the specified warehouse location - -#### Table Operations Fail - -* **Catalog Context**: Make sure you're using the correct catalog with `USE CATALOG` -* **Database Context**: Ensure you're in the correct database with `USE DATABASE` -* **Permissions**: Verify that your credentials have the necessary permissions for the storage system - -#### Performance Issues - -* **Partitioning**: Review your table partitioning strategy -* **File Size**: Check if files are too large or too small for your use case -* **Compression**: Consider adjusting compression settings for your data types - -### Getting Help - -For more information about Apache Iceberg: - -* [Apache Iceberg Documentation](https://iceberg.apache.org/docs/) -* [Iceberg Catalog Implementations](https://iceberg.apache.org/docs/latest/configuration/) -* [Beam SQL Documentation](/documentation/dsls/sql/) diff --git a/website/www/site/content/en/documentation/dsls/sql/extensions/create-external-table.md b/website/www/site/content/en/documentation/dsls/sql/extensions/create-external-table.md index 62e30b515e44..ad6ba66beb20 100644 --- a/website/www/site/content/en/documentation/dsls/sql/extensions/create-external-table.md +++ b/website/www/site/content/en/documentation/dsls/sql/extensions/create-external-table.md @@ -72,8 +72,6 @@ tableElement: columnName fieldType [ NOT NULL ] * `kafka` * `parquet` * 
`text` - * `iceberg` - * `datagen` * `location`: The I/O specific location of the underlying table, specified as a [String Literal](/documentation/dsls/sql/calcite/lexical/#string-literals). @@ -750,175 +748,6 @@ TYPE text LOCATION '/home/admin/orders' ``` -## Apache Iceberg - -Beam SQL supports reading from and writing to [Apache Iceberg](https://iceberg.apache.org/) tables. Iceberg is a high-performance table format for huge analytic datasets that provides ACID transactions, schema evolution, and time travel capabilities. - -**Prerequisites**: Before creating Iceberg tables, you must first create an Iceberg catalog. See the [CREATE CATALOG](/documentation/dsls/sql/extensions/create-catalog/) documentation for details. - -### Syntax - -``` -CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName (tableElement [, tableElement ]*) -TYPE iceberg -[PARTITIONED BY (partitionField [, partitionField ]*)] -[TBLPROPERTIES tblProperties] -``` - -* `tableName`: The case sensitive name of the table to create and register. -* `tableElement`: `columnName` `fieldType` `[ NOT NULL ]` - * `columnName`: The case sensitive name of the column. - * `fieldType`: The field's type, specified as one of the following types: - * `simpleType`: `TINYINT`, `SMALLINT`, `INTEGER`, `BIGINT`, `FLOAT`, - `DOUBLE`, `DECIMAL`, `BOOLEAN`, `DATE`, `TIME`, `TIMESTAMP`, `CHAR`, - `VARCHAR`, `BINARY`, `VARBINARY` - * `MAP` - * `ARRAY` - * `ROW` - * `NOT NULL`: Optional. Indicates that the column is not nullable. -* `PARTITIONED BY`: Optional. Specifies partition fields for the table. Supports various partition functions: - * `identity(columnName)`: Identity partitioning - * `bucket(columnName, numBuckets)`: Bucket partitioning - * `truncate(columnName, width)`: Truncate partitioning - * `year(columnName)`, `month(columnName)`, `day(columnName)`, `hour(columnName)`: Time-based partitioning -* `TBLPROPERTIES`: Optional. JSON object with table-specific configuration: - * `triggering_frequency_seconds`: For streaming pipelines, specifies how often to commit snapshots (in seconds). - -### Read Mode - -Beam SQL supports reading from existing Iceberg tables. The connector automatically infers the table schema from the Iceberg table metadata and supports: - -* **Predicate push-down**: Filters are pushed down to the Iceberg scan level for better performance -* **Projection push-down**: Only requested columns are read from the table -* **Schema evolution**: Automatically handles schema changes in the underlying Iceberg table - -### Write Mode - -Beam SQL supports writing to Iceberg tables with the following features: - -* **Automatic table creation**: If the table doesn't exist, it will be created with the specified schema -* **ACID transactions**: All writes are committed as atomic transactions -* **Schema validation**: Ensures data matches the table schema -* **Partitioning**: Supports writing to partitioned tables -* **Streaming support**: For streaming pipelines, commits are performed at regular intervals - -### Schema - -Beam SQL types map to Iceberg types as follows: - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
Beam SQL Type - Iceberg Type -
TINYINT, SMALLINT, INTEGER, BIGINT   - INTEGER, LONG -
FLOAT, DOUBLE - FLOAT, DOUBLE -
DECIMAL - DECIMAL -
BOOLEAN - BOOLEAN -
DATE, TIME, TIMESTAMP - DATE, TIME, TIMESTAMP -
CHAR, VARCHAR - STRING -
BINARY, VARBINARY - BINARY -
ARRAY - LIST -
MAP - MAP -
ROW - STRUCT -
- -### Examples - -#### Basic Table Creation - -```sql -CREATE EXTERNAL TABLE users ( - id BIGINT, - username VARCHAR, - email VARCHAR, - created_at TIMESTAMP -) -TYPE iceberg -``` - -#### Partitioned Table - -```sql -CREATE EXTERNAL TABLE events ( - event_id BIGINT, - user_id BIGINT, - event_type VARCHAR, - event_timestamp TIMESTAMP, - data ROW -) -TYPE iceberg -PARTITIONED BY ( - 'bucket(user_id, 10)', - 'day(event_timestamp)', - 'event_type' -) -``` - -#### Streaming Table with Commit Frequency - -```sql -CREATE EXTERNAL TABLE streaming_events ( - id BIGINT, - message VARCHAR, - timestamp TIMESTAMP -) -TYPE iceberg -TBLPROPERTIES '{"triggering_frequency_seconds": 60}' -``` - ## DataGen The **DataGen** connector allows for creating tables based on in-memory data generation. This is useful for developing and testing queries locally without requiring access to external systems. The DataGen connector is built-in; no additional dependencies are required.It is available for Beam 2.67.0+ diff --git a/website/www/site/layouts/partials/section-menu/en/sdks.html b/website/www/site/layouts/partials/section-menu/en/sdks.html index 45fc937ac1f0..e02dd2494f61 100644 --- a/website/www/site/layouts/partials/section-menu/en/sdks.html +++ b/website/www/site/layouts/partials/section-menu/en/sdks.html @@ -112,6 +112,17 @@
  • Overview
  • Walkthrough
  • Shell
  • +
  • + DDL + +
  • Apache Calcite dialect From 0d6ea9828146cf29cab8e613160222e643c35632 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Fri, 6 Feb 2026 16:49:34 -0500 Subject: [PATCH 3/4] add links --- .../www/site/content/en/documentation/dsls/sql/ddl/create.md | 2 +- website/www/site/content/en/documentation/dsls/sql/ddl/use.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/website/www/site/content/en/documentation/dsls/sql/ddl/create.md b/website/www/site/content/en/documentation/dsls/sql/ddl/create.md index 73d44207c390..1b83300b62ab 100644 --- a/website/www/site/content/en/documentation/dsls/sql/ddl/create.md +++ b/website/www/site/content/en/documentation/dsls/sql/ddl/create.md @@ -25,7 +25,7 @@ The **CREATE** command serves two potential functions depending on the connector (e.g. namespace or table) in the underlying storage. _**Note**: Creating a catalog or database does not automatically switch to it. Remember -to run [USE](TODO:LINK-TO-USE) afterwards to set it as a default._ +to run [USE](/use) afterwards to set it as a default._ ## `CREATE CATALOG` Registers a new catalog instance. diff --git a/website/www/site/content/en/documentation/dsls/sql/ddl/use.md b/website/www/site/content/en/documentation/dsls/sql/ddl/use.md index 279b0e4ea2e7..8ff22470598f 100644 --- a/website/www/site/content/en/documentation/dsls/sql/ddl/use.md +++ b/website/www/site/content/en/documentation/dsls/sql/ddl/use.md @@ -22,7 +22,7 @@ The **USE** statement sets the active catalog or database for the current sessio This simplifies queries by allowing you to reference tables directly (e.g., `orders`), avoiding the need for fully qualified names (e.g., `prod_catalog.sales_db.orders`). -***Tip**: the SHOW CURRENT statement helps verify what the currently active entity is.* +***Tip**: the [SHOW CURRENT](/show) statement helps verify what the current context is.* ## USE CATALOG Switches the current session's active Catalog. 
From aa5d7018a69e383cd5114c46492a3df71ec746a7 Mon Sep 17 00:00:00 2001
From: Ahmed Abualsaud
Date: Sun, 8 Feb 2026 15:10:20 -0500
Subject: [PATCH 4/4] spotless

---
 .../sql/meta/provider/iceberg/IcebergMetastore.java  | 12 +++++++-----
 .../beam/sdk/io/iceberg/IcebergCatalogConfig.java    | 11 +++++++++--
 2 files changed, 16 insertions(+), 7 deletions(-)

diff --git a/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergMetastore.java b/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergMetastore.java
index 72e8988f41e2..246b305bf92e 100644
--- a/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergMetastore.java
+++ b/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergMetastore.java
@@ -20,11 +20,10 @@
 import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
 import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
 
+import com.fasterxml.jackson.core.type.TypeReference;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.stream.Collectors;
-
-import com.fasterxml.jackson.core.type.TypeReference;
 import org.apache.beam.sdk.extensions.sql.TableUtils;
 import org.apache.beam.sdk.extensions.sql.impl.TableName;
 import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
@@ -62,9 +61,12 @@ public void createTable(Table table) {
       getProvider(table.getType()).createTable(table);
     } else {
       String identifier = getIdentifier(table);
-      Map<String, String> props = TableUtils.getObjectMapper()
-              .convertValue(table.getProperties(), new TypeReference<Map<String, String>>() {}).entrySet()
-              .stream().filter(p -> !p.getKey().startsWith("beam.")).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+      Map<String, String> props =
+          TableUtils.getObjectMapper()
+              .convertValue(table.getProperties(), new TypeReference<Map<String, String>>() {})
+              .entrySet().stream()
+              .filter(p -> !p.getKey().startsWith("beam."))
+              .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
       try {
         catalogConfig.createTable(identifier, table.getSchema(), table.getPartitionFields(), props);
       } catch (TableAlreadyExistsException e) {
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java
index e7ee6dfb94ef..d461e5feb033 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java
@@ -143,7 +143,10 @@ public boolean dropNamespace(String namespace, boolean cascade) {
   }
 
   public void createTable(
-      String tableIdentifier, Schema tableSchema, @Nullable List<String> partitionFields, @Nullable Map<String, String> properties) {
+      String tableIdentifier,
+      Schema tableSchema,
+      @Nullable List<String> partitionFields,
+      @Nullable Map<String, String> properties) {
     TableIdentifier icebergIdentifier = TableIdentifier.parse(tableIdentifier);
     org.apache.iceberg.Schema icebergSchema = IcebergUtils.beamSchemaToIcebergSchema(tableSchema);
     PartitionSpec icebergSpec = PartitionUtils.toPartitionSpec(partitionFields, tableSchema);
@@ -153,7 +156,11 @@ public void createTable(
         icebergIdentifier,
         icebergSchema,
         icebergSpec);
-    catalog().createTable(icebergIdentifier, icebergSchema, icebergSpec, properties);
+    if (properties != null) {
+      catalog().createTable(icebergIdentifier, icebergSchema, icebergSpec, properties);
+    } else {
+      // No TBLPROPERTIES were supplied; avoid passing a null map to the Iceberg catalog.
+      catalog().createTable(icebergIdentifier, icebergSchema, icebergSpec);
+    }
     LOG.info("Successfully created table '{}'.", icebergIdentifier);
   } catch (AlreadyExistsException e) {
     throw new TableAlreadyExistsException(e);