diff --git a/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergMetastore.java b/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergMetastore.java index b73aa25c7a2b..246b305bf92e 100644 --- a/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergMetastore.java +++ b/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergMetastore.java @@ -20,8 +20,10 @@ import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; +import com.fasterxml.jackson.core.type.TypeReference; import java.util.HashMap; import java.util.Map; +import java.util.stream.Collectors; import org.apache.beam.sdk.extensions.sql.TableUtils; import org.apache.beam.sdk.extensions.sql.impl.TableName; import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable; @@ -59,8 +61,14 @@ public void createTable(Table table) { getProvider(table.getType()).createTable(table); } else { String identifier = getIdentifier(table); + Map<String, String> props = + TableUtils.getObjectMapper() + .convertValue(table.getProperties(), new TypeReference<Map<String, String>>() {}) + .entrySet().stream() + .filter(p -> !p.getKey().startsWith("beam.")) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); try { - catalogConfig.createTable(identifier, table.getSchema(), table.getPartitionFields()); + catalogConfig.createTable(identifier, table.getSchema(), table.getPartitionFields(), props); } catch (TableAlreadyExistsException e) { LOG.info( "Iceberg table '{}' already exists at location '{}'.", table.getName(), identifier); diff --git a/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTable.java 
b/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTable.java index b68aa34a1777..e104b21bf132 100644 --- a/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTable.java +++ b/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTable.java @@ -19,6 +19,7 @@ import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; +import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; import java.util.HashMap; import java.util.List; @@ -56,6 +57,8 @@ class IcebergTable extends SchemaBaseBeamTable { @VisibleForTesting static final String CATALOG_PROPERTIES_FIELD = "catalog_properties"; @VisibleForTesting static final String HADOOP_CONFIG_PROPERTIES_FIELD = "config_properties"; @VisibleForTesting static final String CATALOG_NAME_FIELD = "catalog_name"; + static final String BEAM_WRITE_PROPERTY = "beam.write."; + static final String BEAM_READ_PROPERTY = "beam.read."; @VisibleForTesting static final String TRIGGERING_FREQUENCY_FIELD = "triggering_frequency_seconds"; @@ -71,9 +74,21 @@ class IcebergTable extends SchemaBaseBeamTable { this.tableIdentifier = tableIdentifier; this.catalogConfig = catalogConfig; ObjectNode properties = table.getProperties(); - if (properties.has(TRIGGERING_FREQUENCY_FIELD)) { - this.triggeringFrequency = properties.get(TRIGGERING_FREQUENCY_FIELD).asInt(); + for (Map.Entry<String, JsonNode> property : properties.properties()) { + String name = property.getKey(); + if (name.startsWith(BEAM_WRITE_PROPERTY)) { + String prop = name.substring(BEAM_WRITE_PROPERTY.length()); + if (prop.equals(TRIGGERING_FREQUENCY_FIELD)) { + this.triggeringFrequency = property.getValue().asInt(); + } else { + throw new IllegalArgumentException("Unknown Beam write property: " + name); + } + } else if (name.startsWith(BEAM_READ_PROPERTY)) { + // none supported 
yet + throw new IllegalArgumentException("Unknown Beam read property: " + name); + } } + this.partitionFields = table.getPartitionFields(); } diff --git a/sdks/java/extensions/sql/iceberg/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/PubsubToIcebergIT.java b/sdks/java/extensions/sql/iceberg/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/PubsubToIcebergIT.java index 900fdae743a1..8b250af2754a 100644 --- a/sdks/java/extensions/sql/iceberg/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/PubsubToIcebergIT.java +++ b/sdks/java/extensions/sql/iceberg/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/PubsubToIcebergIT.java @@ -156,7 +156,7 @@ public void testSimpleInsertWithPartitionedFields() throws Exception { + ") \n" + "TYPE 'iceberg' \n" + "PARTITIONED BY('id', 'truncate(name, 3)') \n" - + "TBLPROPERTIES '{ \"triggering_frequency_seconds\" : 10 }'"; + + "TBLPROPERTIES '{ \"beam.write.triggering_frequency_seconds\" : 10 }'"; String insertStatement = format("INSERT INTO %s \n", tableIdentifier) + "SELECT \n" @@ -211,7 +211,7 @@ public void testSimpleInsertFlat() throws Exception { + " name VARCHAR \n " + ") \n" + "TYPE 'iceberg' \n" - + "TBLPROPERTIES '{ \"triggering_frequency_seconds\" : 10 }'"; + + "TBLPROPERTIES '{ \"beam.write.triggering_frequency_seconds\" : 10 }'"; String insertStatement = format("INSERT INTO %s \n", tableIdentifier) + "SELECT \n" diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java index 7603e2c6259f..d461e5feb033 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java @@ -143,7 +143,10 @@ public boolean dropNamespace(String namespace, boolean cascade) { } public 
void createTable( - String tableIdentifier, Schema tableSchema, @Nullable List<String> partitionFields) { + String tableIdentifier, + Schema tableSchema, + @Nullable List<String> partitionFields, + @Nullable Map<String, String> properties) { TableIdentifier icebergIdentifier = TableIdentifier.parse(tableIdentifier); org.apache.iceberg.Schema icebergSchema = IcebergUtils.beamSchemaToIcebergSchema(tableSchema); PartitionSpec icebergSpec = PartitionUtils.toPartitionSpec(partitionFields, tableSchema); @@ -153,7 +156,11 @@ public void createTable( icebergIdentifier, icebergSchema, icebergSpec); - catalog().createTable(icebergIdentifier, icebergSchema, icebergSpec); + if (properties != null) { + catalog().createTable(icebergIdentifier, icebergSchema, icebergSpec, properties); + } else { + catalog().createTable(icebergIdentifier, icebergSchema, icebergSpec); + } LOG.info("Successfully created table '{}'.", icebergIdentifier); } catch (AlreadyExistsException e) { throw new TableAlreadyExistsException(e); diff --git a/website/www/site/content/en/documentation/dsls/sql/ddl/alter.md b/website/www/site/content/en/documentation/dsls/sql/ddl/alter.md new file mode 100644 index 000000000000..d2e04037735a --- /dev/null +++ b/website/www/site/content/en/documentation/dsls/sql/ddl/alter.md @@ -0,0 +1,75 @@ +--- +type: languages +title: "Beam SQL DDL: Alter" +--- + + +# ALTER statements + +The **ALTER** statement modifies the definition of an existing Catalog or Table. +For supported tables (like Iceberg), this enables **schema and partition evolution**. +## ALTER CATALOG +Modifies an existing catalog's properties. + +```sql +ALTER CATALOG catalog_name + [ SET ( 'key' = 'val', ... ) ] + [ RESET ( 'key', ... ) ] +``` +- **SET**: Adds new properties or updates existing ones. +- **RESET** / **UNSET**: Removes properties. + +## ALTER TABLE +Modifies an existing table's properties and evolves its partition and schema. + +```sql +ALTER TABLE table_name + [ ADD COLUMNS ( col_def, ... ) ] + [ DROP COLUMNS ( col_name, ... 
) ] + [ ADD PARTITIONS ( partition_field, ... ) ] + [ DROP PARTITIONS ( partition_field, ... ) ] + [ SET ( 'key' = 'val', ... ) ] + [ ( RESET | UNSET ) ( 'key', ... ) ]; +``` + +*Example 1: Add or remove columns* +```sql +-- Add columns +ALTER TABLE orders ADD COLUMNS ( + customer_email VARCHAR, + shipping_region VARCHAR +); + +-- Drop columns +ALTER TABLE orders DROP COLUMNS ( customer_email ); +``` + +*Example 2: Modify partition spec* +```sql +-- Add a partition field +ALTER TABLE orders ADD PARTITIONS ( 'year(order_date)' ); + +-- Remove a partition field +ALTER TABLE orders DROP PARTITIONS ( 'region_id' ); +``` + +*Example 3: Modify table properties* +```sql +ALTER TABLE orders SET ( + 'write.format.default' = 'orc', + 'write.metadata.metrics.default' = 'full' ); +ALTER TABLE orders RESET ( 'write.target-file-size-bytes' ); +``` diff --git a/website/www/site/content/en/documentation/dsls/sql/ddl/create.md b/website/www/site/content/en/documentation/dsls/sql/ddl/create.md new file mode 100644 index 000000000000..1b83300b62ab --- /dev/null +++ b/website/www/site/content/en/documentation/dsls/sql/ddl/create.md @@ -0,0 +1,120 @@ +--- +type: languages +title: "Beam SQL DDL: Create" +--- + + +# CREATE statements + +The **CREATE** command serves two potential functions depending on the connector: + +- **Registration**: By default, it registers an existing external entity in the Beam SQL session. +- **Instantiation**: For supported connectors (e.g., Iceberg), it physically creates the entity +(e.g. namespace or table) in the underlying storage. + +_**Note**: Creating a catalog or database does not automatically switch to it. Remember +to run [USE](/use) afterwards to set it as a default._ + +## `CREATE CATALOG` +Registers a new catalog instance. + +```sql +CREATE CATALOG [ IF NOT EXISTS ] catalog_name +TYPE 'type_name' +[ PROPERTIES ( 'key' = 'value' [, ...] 
) ] +``` + +_**Example**: Creating a Hadoop Catalog (Local Storage)_ +```sql +CREATE CATALOG local_catalog +TYPE iceberg +PROPERTIES ( + 'type' = 'hadoop', + 'warehouse' = 'file:///tmp/iceberg-warehouse' +) +``` + +_**Example**: Registering a BigLake Catalog (GCS)_ +```sql +CREATE CATALOG prod_iceberg +TYPE iceberg +PROPERTIES ( + 'type' = 'rest', + 'uri' = 'https://biglake.googleapis.com/iceberg/v1/restcatalog', + 'warehouse' = 'gs://my-company-bucket/warehouse', + 'header.x-goog-user-project' = 'my_prod_project', + 'rest.auth.type' = 'org.apache.iceberg.gcp.auth.GoogleAuthManager', + 'io-impl' = 'org.apache.iceberg.gcp.gcs.GCSFileIO', + 'rest-metrics-reporting-enabled' = 'false' +); +``` + +## `CREATE DATABASE` +Creates a new Database within the current Catalog (default), or the specified Catalog. +```sql +CREATE DATABASE [ IF NOT EXISTS ] [ catalog_name. ]database_name; +``` + +_**Example**: Create a database in the current active catalog_ +```sql +USE CATALOG my_catalog; +CREATE DATABASE sales_data; +``` + +_**Example**: Create a database in a specified catalog (must be registered)_ +```sql +CREATE DATABASE other_catalog.sales_data; +``` + +## `CREATE TABLE` +Creates a table within the currently active catalog and database. If the table name is fully qualified, the referenced database and catalog are used. + +```sql +CREATE EXTERNAL TABLE [ IF NOT EXISTS ] [ catalog. ][ db. ]table_name ( + col_name col_type [ NOT NULL ] [ COMMENT 'col_comment' ], + ... +) +TYPE 'type_name' +[ PARTITIONED BY ( 'partition_field' [, ... ] ) ] +[ COMMENT 'table_comment' ] +[ LOCATION 'location_uri' ] +[ TBLPROPERTIES 'properties_json_string' ]; +``` +- **TYPE**: the table type (e.g. `'iceberg'`, `'text'`, `'kafka'`) +- **PARTITIONED BY**: an ordered list of fields describing the partition spec. 
+- **LOCATION**: explicitly sets the location of the table (overriding the inferred `catalog.db.table_name` location) +- **TBLPROPERTIES**: configuration properties used when creating the table or setting up its IO connection. + +_**Example**: Creating an Iceberg Table_ +```sql +CREATE EXTERNAL TABLE prod_iceberg.sales_data.orders ( + order_id BIGINT NOT NULL COMMENT 'Unique order identifier', + amount DECIMAL(10, 2), + order_date TIMESTAMP, + region_id VARCHAR +) +TYPE 'iceberg' +PARTITIONED BY ( 'region_id', 'day(order_date)' ) +COMMENT 'Daily sales transactions' +TBLPROPERTIES '{ + "write.format.default": "parquet", + "read.split.target-size": 268435456, + "beam.write.triggering_frequency_seconds": 60 +}'; +``` +- This creates an Iceberg table named `orders` under the namespace `sales_data`, within the `prod_iceberg` catalog. +- The table is partitioned by `region_id`, then by the day value of `order_date` (using Iceberg's [hidden partitioning](https://iceberg.apache.org/docs/latest/partitioning/#icebergs-hidden-partitioning)). +- The table is created with the appropriate properties `"write.format.default"` and `"read.split.target-size"`. The Beam property `"beam.write.triggering_frequency_seconds"` configures the triggering frequency of the streaming write. +- Beam properties (prefixed with `"beam.write."` and `"beam.read."`) are intended for the relevant IOs. diff --git a/website/www/site/content/en/documentation/dsls/sql/ddl/drop.md b/website/www/site/content/en/documentation/dsls/sql/ddl/drop.md new file mode 100644 index 000000000000..fb6eab45cf37 --- /dev/null +++ b/website/www/site/content/en/documentation/dsls/sql/ddl/drop.md @@ -0,0 +1,50 @@ +--- +type: languages +title: "Beam SQL DDL: Drop" +--- + + +# DROP statements +The **DROP** command serves two potential functions depending on the connector: + +- **Unregistration**: unregisters an entity from the current Beam SQL session. +- **Deletion**: For supported connectors (like **Iceberg**), it **physically deletes** the entity + (e.g. 
namespace or table) in the underlying storage. + + **Caution:** Physical deletion can be permanent + +## DROP CATALOG +Unregisters a catalog from Beam SQL. This does not destroy external data, only the link within the SQL session. + +```sql +DROP CATALOG [ IF EXISTS ] catalog_name; +``` + +## DROP DATABASE +Unregisters a database from the current session. For supported connectors, this +will also **delete** the database from the external data source. + +```sql +DROP DATABASE [ IF EXISTS ] database_name [ RESTRICT | CASCADE ]; +``` +- **RESTRICT** (Default): Fails if the database is not empty. +- **CASCADE**: Drops the database and all tables contained within it. **Use with caution.** + +## DROP TABLE +Unregisters a table from the current session. For supported connectors, this +will also **delete** the table from the external data source. +```sql +DROP TABLE [ IF EXISTS ] table_name; +``` diff --git a/website/www/site/content/en/documentation/dsls/sql/ddl/overview.md b/website/www/site/content/en/documentation/dsls/sql/ddl/overview.md new file mode 100644 index 000000000000..420408d431bb --- /dev/null +++ b/website/www/site/content/en/documentation/dsls/sql/ddl/overview.md @@ -0,0 +1,53 @@ +--- +type: languages +title: "Beam SQL DDL Overview" +--- + + +# Beam SQL DDL Overview + +Beam SQL provides a robust hierarchy for managing metadata for external data sources. Instead of treating all tables as flat objects, Beam SQL utilizes a three-tier namespace system: +1. Catalog: The highest level container (e.g. a Glue Catalog connected to S3 or a BigLake Catalog connected to GCS). +2. Database: A logical namespace within a Catalog (often maps to "namespace" in systems like Iceberg). +3. Table: The actual data entity containing schema and rows. + +This structure allows users to connect to multiple disparate systems (e.g., a production BigQuery catalog and a dev Iceberg catalog) simultaneously and switch contexts seamlessly. 
+It also allows interactions between these systems via cross-catalog queries. + +Click below to learn about metadata management at each level: + +## Catalogs +The Catalog is the entry point for external metadata. When you initialize Beam SQL, you start off with a `default` catalog that contains a `default` database. +You can register new catalogs, switch between them, and modify their configurations. + +### [CREATE](/create#CREATE-CATALOG) +### [USE](/use#USE-CATALOG) +### [DROP](/drop#DROP-CATALOG) +### [ALTER](/alter#ALTER-CATALOG) +### [SHOW](/show#SHOW-CATALOGS) + +## Databases + +### [CREATE](/create#CREATE-DATABASE) +### [USE](/use#USE-DATABASE) +### [DROP](/drop#DROP-DATABASE) +### [SHOW](/show#SHOW-DATABASES) + +## Tables + +### [CREATE](/create#CREATE-TABLE) +### [DROP](/drop#DROP-TABLE) +### [ALTER](/alter#ALTER-TABLE) +### [SHOW](/show#SHOW-TABLES) diff --git a/website/www/site/content/en/documentation/dsls/sql/ddl/show.md b/website/www/site/content/en/documentation/dsls/sql/ddl/show.md new file mode 100644 index 000000000000..167b96d164e3 --- /dev/null +++ b/website/www/site/content/en/documentation/dsls/sql/ddl/show.md @@ -0,0 +1,83 @@ +--- +type: languages +title: "Beam SQL DDL: Show" +--- + + +# SHOW statements + +The **SHOW** statement is used to list objects within a parent container +(e.g. databases within a catalog), or inspect the currently active context. +Results can be filtered using regex patterns. + +## SHOW CATALOGS +Lists all registered catalogs (name and type). Supports regex filtering. + +_**Example**: List all catalogs_ +```sql +SHOW CATALOGS; +``` + +_**Example**: List catalogs matching a pattern_ +```sql +SHOW CATALOGS LIKE 'prod_%'; +``` + +_**Example**: Verify which catalog is currently active_ +```sql +SHOW CURRENT CATALOG; +``` + +## SHOW DATABASES +Lists databases under the currently active catalog, or a specified catalog. 
+ +_**Example**: List databases in the currently active catalog_ +```sql +SHOW DATABASES; +``` + +_**Example**: List databases in a specified catalog_ +```sql +SHOW DATABASES IN my_catalog; +``` + +_**Example**: List databases matching a pattern_ +```sql +SHOW DATABASES IN my_catalog LIKE '%geo%'; +``` + +_**Example**: Verify which database is currently active_ +```sql +SHOW CURRENT DATABASE; +``` + +## SHOW TABLES +Lists tables under the currently active database, or a specified database. + +_**Example**: List tables in the currently active database_ +```sql +SHOW TABLES; +``` + +_**Example**: List tables in a specified database_ +```sql +SHOW TABLES IN my_db; +SHOW TABLES IN my_catalog.my_db; +``` + +_**Example**: List tables matching a pattern_ +```sql +SHOW TABLES IN my_db LIKE '%orders%'; +``` diff --git a/website/www/site/content/en/documentation/dsls/sql/ddl/use.md b/website/www/site/content/en/documentation/dsls/sql/ddl/use.md new file mode 100644 index 000000000000..8ff22470598f --- /dev/null +++ b/website/www/site/content/en/documentation/dsls/sql/ddl/use.md @@ -0,0 +1,48 @@ +--- +type: languages +title: "Beam SQL DDL: Use" +--- + + +# USE statements + +The **USE** statement sets the active catalog or database for the current session. +This simplifies queries by allowing you to reference tables directly (e.g., `orders`), +avoiding the need for fully qualified names (e.g., `prod_catalog.sales_db.orders`). + +***Tip**: the [SHOW CURRENT](/show) statement helps verify what the current context is.* + +## USE CATALOG +Switches the current session's active Catalog. + +_**Note:** All subsequent DATABASE and TABLE commands will be executed under this Catalog, unless fully qualified._ + +```sql +USE CATALOG prod_iceberg; +``` + +## USE DATABASE +Switches the current session's active Database. 
+ +_**Note:** All subsequent TABLE commands will be executed under this Database, unless fully qualified._ + +```sql +USE DATABASE sales_data; +``` + +Switch to a database in a specified catalog (_**Note:** this also switches the default to that catalog_): +```sql +USE DATABASE other_catalog.sales_data; +``` diff --git a/website/www/site/layouts/partials/section-menu/en/sdks.html b/website/www/site/layouts/partials/section-menu/en/sdks.html index 45fc937ac1f0..e02dd2494f61 100644 --- a/website/www/site/layouts/partials/section-menu/en/sdks.html +++ b/website/www/site/layouts/partials/section-menu/en/sdks.html @@ -112,6 +112,17 @@
  • Overview
  • Walkthrough
  • Shell
  • +
  • + DDL + +
  • Apache Calcite dialect