Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/sql-lexer/src/keywords.txt
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ Bpchar
Broken
Broker
Brokers
Bucket
By
Bytes
Capture
Expand Down Expand Up @@ -492,6 +493,7 @@ Transactional
Transform
Trim
True
Truncate
Tunnel
Type
Types
Expand Down Expand Up @@ -521,6 +523,7 @@ Verbose
Version
View
Views
Void
Wait
Warehouse
Warning
Expand Down
73 changes: 73 additions & 0 deletions src/sql-parser/src/ast/defs/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -725,6 +725,72 @@ impl AstDisplay for IcebergSinkMode {
}
impl_display!(IcebergSinkMode);

/// An Iceberg partition transform applied to a source column.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum IcebergPartitionTransform {
    Identity,
    Year,
    Month,
    Day,
    Hour,
    Void,
    Bucket(u32),
    Truncate(u32),
}

impl fmt::Display for IcebergPartitionTransform {
    /// Writes the lowercase transform keyword. Parameterized variants
    /// (`Bucket`, `Truncate`) print only their name here; the numeric
    /// argument and column are rendered by `IcebergPartitionField`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let keyword = match self {
            Self::Identity => "identity",
            Self::Year => "year",
            Self::Month => "month",
            Self::Day => "day",
            Self::Hour => "hour",
            Self::Void => "void",
            Self::Bucket(_) => "bucket",
            Self::Truncate(_) => "truncate",
        };
        f.write_str(keyword)
    }
}

/// A single field in a `PARTITION BY` clause for an Iceberg sink.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct IcebergPartitionField {
    // The source column the transform is applied to.
    pub column: Ident,
    // How the column value is mapped to a partition value
    // (identity, a temporal truncation, void, bucket(N), or truncate(N)).
    pub transform: IcebergPartitionTransform,
}

impl AstDisplay for IcebergPartitionField {
    /// Renders the field in the same form the parser accepts, so the SQL
    /// round-trips: identity as the bare column name, parameterized
    /// transforms as `name(N, col)`, and the rest as `name(col)`.
    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
        match &self.transform {
            IcebergPartitionTransform::Identity => {
                // Identity is normalized to the bare column (no `identity(...)`).
                f.write_node(&self.column);
            }
            IcebergPartitionTransform::Bucket(n) => {
                f.write_str("bucket(");
                f.write_str(&n.to_string());
                f.write_str(", ");
                f.write_node(&self.column);
                f.write_str(")");
            }
            IcebergPartitionTransform::Truncate(w) => {
                f.write_str("truncate(");
                f.write_str(&w.to_string());
                f.write_str(", ");
                f.write_node(&self.column);
                f.write_str(")");
            }
            // Exhaustive on purpose: a future parameterized variant must not
            // silently fall into the `name(col)` form and lose its argument.
            IcebergPartitionTransform::Year
            | IcebergPartitionTransform::Month
            | IcebergPartitionTransform::Day
            | IcebergPartitionTransform::Hour
            | IcebergPartitionTransform::Void => {
                f.write_str(&self.transform.to_string());
                f.write_str("(");
                f.write_node(&self.column);
                f.write_str(")");
            }
        }
    }
}
impl_display!(IcebergPartitionField);

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SubscribeOutput<T: AstInfo> {
Diffs,
Expand Down Expand Up @@ -1563,6 +1629,7 @@ pub enum CreateSinkConnection<T: AstInfo> {
connection: T::ItemName,
aws_connection: T::ItemName,
key: Option<SinkKey>,
partition_by: Option<Vec<IcebergPartitionField>>,
options: Vec<IcebergSinkConfigOption<T>>,
},
}
Expand Down Expand Up @@ -1596,6 +1663,7 @@ impl<T: AstInfo> AstDisplay for CreateSinkConnection<T> {
connection,
aws_connection,
key,
partition_by,
options,
} => {
f.write_str("ICEBERG CATALOG CONNECTION ");
Expand All @@ -1611,6 +1679,11 @@ impl<T: AstInfo> AstDisplay for CreateSinkConnection<T> {
f.write_str(" ");
f.write_node(key);
}
if let Some(fields) = partition_by {
f.write_str(" PARTITION BY (");
f.write_node(&display::comma_separated(fields));
f.write_str(")");
}
}
}
}
Expand Down
59 changes: 59 additions & 0 deletions src/sql-parser/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2644,6 +2644,55 @@ impl<'a> Parser<'a> {
})
}

/// Parse a single Iceberg partition field: `col`, `day(col)`, or `bucket(N, col)`.
fn parse_iceberg_partition_field(&mut self) -> Result<IcebergPartitionField, ParserError> {
    let head = self.parse_identifier()?;

    // A bare column name (no opening paren) is an identity partition.
    if !self.consume_token(&Token::LParen) {
        return Ok(IcebergPartitionField {
            column: head,
            transform: IcebergPartitionTransform::Identity,
        });
    }

    // Otherwise `head` named a transform; the column follows inside the parens.
    let transform_name = head.as_str().to_lowercase();
    let transform = if transform_name == "bucket" || transform_name == "truncate" {
        // These two take a leading integer parameter: `bucket(N, col)`.
        let raw = self.parse_literal_uint()?;
        let param = u32::try_from(raw).map_err(|_| {
            self.error(
                self.peek_prev_pos(),
                format!("{transform_name} parameter must fit in u32, got {raw}"),
            )
        })?;
        self.expect_token(&Token::Comma)?;
        if transform_name == "bucket" {
            IcebergPartitionTransform::Bucket(param)
        } else {
            IcebergPartitionTransform::Truncate(param)
        }
    } else {
        match transform_name.as_str() {
            "identity" => IcebergPartitionTransform::Identity,
            "year" => IcebergPartitionTransform::Year,
            "month" => IcebergPartitionTransform::Month,
            "day" => IcebergPartitionTransform::Day,
            "hour" => IcebergPartitionTransform::Hour,
            "void" => IcebergPartitionTransform::Void,
            other => {
                return Err(self.error(
                    self.peek_prev_pos(),
                    format!(
                        "unknown Iceberg partition transform '{other}', \
                        expected identity, year, month, day, hour, void, bucket, or truncate"
                    ),
                ));
            }
        }
    };
    let column = self.parse_identifier()?;
    self.expect_token(&Token::RParen)?;
    Ok(IcebergPartitionField { column, transform })
}

fn parse_kafka_sink_config_option(
&mut self,
) -> Result<KafkaSinkConfigOption<Raw>, ParserError> {
Expand Down Expand Up @@ -3854,10 +3903,20 @@ impl<'a> Parser<'a> {
None
};

let partition_by = if self.parse_keywords(&[PARTITION, BY]) {
self.expect_token(&Token::LParen)?;
let fields = self.parse_comma_separated(Parser::parse_iceberg_partition_field)?;
self.expect_token(&Token::RParen)?;
Some(fields)
} else {
None
};

Ok(CreateSinkConnection::Iceberg {
connection,
aws_connection,
key,
partition_by,
options,
})
}
Expand Down
48 changes: 45 additions & 3 deletions src/sql-parser/tests/testdata/ddl
Original file line number Diff line number Diff line change
Expand Up @@ -919,14 +919,14 @@ CREATE SINK bar FROM foo INTO ICEBERG CATALOG CONNECTION s3tables (NAMESPACE = '
----
CREATE SINK bar FROM foo INTO ICEBERG CATALOG CONNECTION s3tables (NAMESPACE = 'testnamespace', TABLE = 'daily_sales') USING AWS CONNECTION aws_conn KEY (a) NOT ENFORCED MODE UPSERT
=>
CreateSink(CreateSinkStatement { name: Some(UnresolvedItemName([Ident("bar")])), in_cluster: None, if_not_exists: false, from: Name(UnresolvedItemName([Ident("foo")])), connection: Iceberg { connection: Name(UnresolvedItemName([Ident("s3tables")])), aws_connection: Name(UnresolvedItemName([Ident("aws_conn")])), key: Some(SinkKey { key_columns: [Ident("a")], not_enforced: true }), options: [IcebergSinkConfigOption { name: Namespace, value: Some(Value(String("testnamespace"))) }, IcebergSinkConfigOption { name: Table, value: Some(Value(String("daily_sales"))) }] }, format: None, envelope: None, mode: Some(Upsert), with_options: [] })
CreateSink(CreateSinkStatement { name: Some(UnresolvedItemName([Ident("bar")])), in_cluster: None, if_not_exists: false, from: Name(UnresolvedItemName([Ident("foo")])), connection: Iceberg { connection: Name(UnresolvedItemName([Ident("s3tables")])), aws_connection: Name(UnresolvedItemName([Ident("aws_conn")])), key: Some(SinkKey { key_columns: [Ident("a")], not_enforced: true }), partition_by: None, options: [IcebergSinkConfigOption { name: Namespace, value: Some(Value(String("testnamespace"))) }, IcebergSinkConfigOption { name: Table, value: Some(Value(String("daily_sales"))) }] }, format: None, envelope: None, mode: Some(Upsert), with_options: [] })

parse-statement
CREATE SINK bar FROM foo INTO ICEBERG CATALOG CONNECTION s3tables (NAMESPACE = 'testnamespace', TABLE = 'daily_sales') USING AWS CONNECTION aws_conn KEY (a) MODE UPSERT;
----
CREATE SINK bar FROM foo INTO ICEBERG CATALOG CONNECTION s3tables (NAMESPACE = 'testnamespace', TABLE = 'daily_sales') USING AWS CONNECTION aws_conn KEY (a) MODE UPSERT
=>
CreateSink(CreateSinkStatement { name: Some(UnresolvedItemName([Ident("bar")])), in_cluster: None, if_not_exists: false, from: Name(UnresolvedItemName([Ident("foo")])), connection: Iceberg { connection: Name(UnresolvedItemName([Ident("s3tables")])), aws_connection: Name(UnresolvedItemName([Ident("aws_conn")])), key: Some(SinkKey { key_columns: [Ident("a")], not_enforced: false }), options: [IcebergSinkConfigOption { name: Namespace, value: Some(Value(String("testnamespace"))) }, IcebergSinkConfigOption { name: Table, value: Some(Value(String("daily_sales"))) }] }, format: None, envelope: None, mode: Some(Upsert), with_options: [] })
CreateSink(CreateSinkStatement { name: Some(UnresolvedItemName([Ident("bar")])), in_cluster: None, if_not_exists: false, from: Name(UnresolvedItemName([Ident("foo")])), connection: Iceberg { connection: Name(UnresolvedItemName([Ident("s3tables")])), aws_connection: Name(UnresolvedItemName([Ident("aws_conn")])), key: Some(SinkKey { key_columns: [Ident("a")], not_enforced: false }), partition_by: None, options: [IcebergSinkConfigOption { name: Namespace, value: Some(Value(String("testnamespace"))) }, IcebergSinkConfigOption { name: Table, value: Some(Value(String("daily_sales"))) }] }, format: None, envelope: None, mode: Some(Upsert), with_options: [] })

parse-statement
CREATE SINK bar FROM foo INTO ICEBERG CATALOG CONNECTION s3tables (BLAH = 'boo!') USING AWS CONNECTION aws_conn MODE UPSERT;
Expand All @@ -940,7 +940,49 @@ CREATE SINK bar FROM foo INTO ICEBERG CATALOG CONNECTION s3tables (NAMESPACE = '
----
CREATE SINK bar FROM foo INTO ICEBERG CATALOG CONNECTION s3tables (NAMESPACE = 'testnamespace', TABLE = 'daily_sales') USING AWS CONNECTION aws_conn MODE APPEND
=>
CreateSink(CreateSinkStatement { name: Some(UnresolvedItemName([Ident("bar")])), in_cluster: None, if_not_exists: false, from: Name(UnresolvedItemName([Ident("foo")])), connection: Iceberg { connection: Name(UnresolvedItemName([Ident("s3tables")])), aws_connection: Name(UnresolvedItemName([Ident("aws_conn")])), key: None, options: [IcebergSinkConfigOption { name: Namespace, value: Some(Value(String("testnamespace"))) }, IcebergSinkConfigOption { name: Table, value: Some(Value(String("daily_sales"))) }] }, format: None, envelope: None, mode: Some(Append), with_options: [] })
CreateSink(CreateSinkStatement { name: Some(UnresolvedItemName([Ident("bar")])), in_cluster: None, if_not_exists: false, from: Name(UnresolvedItemName([Ident("foo")])), connection: Iceberg { connection: Name(UnresolvedItemName([Ident("s3tables")])), aws_connection: Name(UnresolvedItemName([Ident("aws_conn")])), key: None, partition_by: None, options: [IcebergSinkConfigOption { name: Namespace, value: Some(Value(String("testnamespace"))) }, IcebergSinkConfigOption { name: Table, value: Some(Value(String("daily_sales"))) }] }, format: None, envelope: None, mode: Some(Append), with_options: [] })

parse-statement
CREATE SINK bar FROM foo INTO ICEBERG CATALOG CONNECTION s3tables (NAMESPACE = 'testnamespace', TABLE = 'daily_sales') USING AWS CONNECTION aws_conn PARTITION BY (region) MODE APPEND;
----
CREATE SINK bar FROM foo INTO ICEBERG CATALOG CONNECTION s3tables (NAMESPACE = 'testnamespace', TABLE = 'daily_sales') USING AWS CONNECTION aws_conn PARTITION BY (region) MODE APPEND
=>
CreateSink(CreateSinkStatement { name: Some(UnresolvedItemName([Ident("bar")])), in_cluster: None, if_not_exists: false, from: Name(UnresolvedItemName([Ident("foo")])), connection: Iceberg { connection: Name(UnresolvedItemName([Ident("s3tables")])), aws_connection: Name(UnresolvedItemName([Ident("aws_conn")])), key: None, partition_by: Some([IcebergPartitionField { column: Ident("region"), transform: Identity }]), options: [IcebergSinkConfigOption { name: Namespace, value: Some(Value(String("testnamespace"))) }, IcebergSinkConfigOption { name: Table, value: Some(Value(String("daily_sales"))) }] }, format: None, envelope: None, mode: Some(Append), with_options: [] })

parse-statement
CREATE SINK bar FROM foo INTO ICEBERG CATALOG CONNECTION s3tables (NAMESPACE = 'testnamespace', TABLE = 'daily_sales') USING AWS CONNECTION aws_conn KEY (a) PARTITION BY (day(event_ts), bucket(16, user_id)) MODE UPSERT;
----
CREATE SINK bar FROM foo INTO ICEBERG CATALOG CONNECTION s3tables (NAMESPACE = 'testnamespace', TABLE = 'daily_sales') USING AWS CONNECTION aws_conn KEY (a) PARTITION BY (day(event_ts), bucket(16, user_id)) MODE UPSERT
=>
CreateSink(CreateSinkStatement { name: Some(UnresolvedItemName([Ident("bar")])), in_cluster: None, if_not_exists: false, from: Name(UnresolvedItemName([Ident("foo")])), connection: Iceberg { connection: Name(UnresolvedItemName([Ident("s3tables")])), aws_connection: Name(UnresolvedItemName([Ident("aws_conn")])), key: Some(SinkKey { key_columns: [Ident("a")], not_enforced: false }), partition_by: Some([IcebergPartitionField { column: Ident("event_ts"), transform: Day }, IcebergPartitionField { column: Ident("user_id"), transform: Bucket(16) }]), options: [IcebergSinkConfigOption { name: Namespace, value: Some(Value(String("testnamespace"))) }, IcebergSinkConfigOption { name: Table, value: Some(Value(String("daily_sales"))) }] }, format: None, envelope: None, mode: Some(Upsert), with_options: [] })

parse-statement
CREATE SINK bar FROM foo INTO ICEBERG CATALOG CONNECTION s3tables (NAMESPACE = 'testnamespace', TABLE = 'daily_sales') USING AWS CONNECTION aws_conn PARTITION BY (year(ts), month(ts), truncate(10, name)) MODE APPEND;
----
CREATE SINK bar FROM foo INTO ICEBERG CATALOG CONNECTION s3tables (NAMESPACE = 'testnamespace', TABLE = 'daily_sales') USING AWS CONNECTION aws_conn PARTITION BY (year(ts), month(ts), truncate(10, name)) MODE APPEND
=>
CreateSink(CreateSinkStatement { name: Some(UnresolvedItemName([Ident("bar")])), in_cluster: None, if_not_exists: false, from: Name(UnresolvedItemName([Ident("foo")])), connection: Iceberg { connection: Name(UnresolvedItemName([Ident("s3tables")])), aws_connection: Name(UnresolvedItemName([Ident("aws_conn")])), key: None, partition_by: Some([IcebergPartitionField { column: Ident("ts"), transform: Year }, IcebergPartitionField { column: Ident("ts"), transform: Month }, IcebergPartitionField { column: Ident("name"), transform: Truncate(10) }]), options: [IcebergSinkConfigOption { name: Namespace, value: Some(Value(String("testnamespace"))) }, IcebergSinkConfigOption { name: Table, value: Some(Value(String("daily_sales"))) }] }, format: None, envelope: None, mode: Some(Append), with_options: [] })

parse-statement
CREATE SINK bar FROM foo INTO ICEBERG CATALOG CONNECTION s3tables (NAMESPACE = 'testnamespace', TABLE = 'daily_sales') USING AWS CONNECTION aws_conn PARTITION BY (identity(region)) MODE APPEND;
----
CREATE SINK bar FROM foo INTO ICEBERG CATALOG CONNECTION s3tables (NAMESPACE = 'testnamespace', TABLE = 'daily_sales') USING AWS CONNECTION aws_conn PARTITION BY (region) MODE APPEND
=>
CreateSink(CreateSinkStatement { name: Some(UnresolvedItemName([Ident("bar")])), in_cluster: None, if_not_exists: false, from: Name(UnresolvedItemName([Ident("foo")])), connection: Iceberg { connection: Name(UnresolvedItemName([Ident("s3tables")])), aws_connection: Name(UnresolvedItemName([Ident("aws_conn")])), key: None, partition_by: Some([IcebergPartitionField { column: Ident("region"), transform: Identity }]), options: [IcebergSinkConfigOption { name: Namespace, value: Some(Value(String("testnamespace"))) }, IcebergSinkConfigOption { name: Table, value: Some(Value(String("daily_sales"))) }] }, format: None, envelope: None, mode: Some(Append), with_options: [] })

parse-statement
CREATE SINK bar FROM foo INTO ICEBERG CATALOG CONNECTION s3tables (NAMESPACE = 'testnamespace', TABLE = 'daily_sales') USING AWS CONNECTION aws_conn PARTITION BY (hour(ts), void(x)) MODE APPEND;
----
CREATE SINK bar FROM foo INTO ICEBERG CATALOG CONNECTION s3tables (NAMESPACE = 'testnamespace', TABLE = 'daily_sales') USING AWS CONNECTION aws_conn PARTITION BY (hour(ts), void(x)) MODE APPEND
=>
CreateSink(CreateSinkStatement { name: Some(UnresolvedItemName([Ident("bar")])), in_cluster: None, if_not_exists: false, from: Name(UnresolvedItemName([Ident("foo")])), connection: Iceberg { connection: Name(UnresolvedItemName([Ident("s3tables")])), aws_connection: Name(UnresolvedItemName([Ident("aws_conn")])), key: None, partition_by: Some([IcebergPartitionField { column: Ident("ts"), transform: Hour }, IcebergPartitionField { column: Ident("x"), transform: Void }]), options: [IcebergSinkConfigOption { name: Namespace, value: Some(Value(String("testnamespace"))) }, IcebergSinkConfigOption { name: Table, value: Some(Value(String("daily_sales"))) }] }, format: None, envelope: None, mode: Some(Append), with_options: [] })

parse-statement
CREATE SINK bar FROM foo INTO ICEBERG CATALOG CONNECTION s3tables (NAMESPACE = 'testnamespace', TABLE = 'daily_sales') USING AWS CONNECTION aws_conn PARTITION BY (badtransform(x)) MODE APPEND;
----
error: unknown Iceberg partition transform 'badtransform', expected identity, year, month, day, hour, void, bucket, or truncate
CREATE SINK bar FROM foo INTO ICEBERG CATALOG CONNECTION s3tables (NAMESPACE = 'testnamespace', TABLE = 'daily_sales') USING AWS CONNECTION aws_conn PARTITION BY (badtransform(x)) MODE APPEND;
^

parse-statement
CREATE INDEX foo ON myschema.bar (a, b)
Expand Down
Loading
Loading