Skip to content

Commit a1565a2

Browse files
committed
risingwave: seed _sink_info_ with WHERE NOT EXISTS; keep ON CONFLICT OVERWRITE at DDL
- Change schema.sql export to use SELECT ... WHERE NOT EXISTS for RisingWave seed - Keep INSERT ... ON CONFLICT DO NOTHING for PostgreSQL seed - Update static SQL for RisingWave to include ON CONFLICT OVERWRITE on _sink_info_ - Update docs to reflect dialect-specific seeding semantics
1 parent 84e95be commit a1565a2

File tree

4 files changed

+41
-17
lines changed

4 files changed

+41
-17
lines changed

cmd/substreams-sink-sql/from_proto_generate_csv.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -385,11 +385,18 @@ func exportSQLSchema(dialect sql.Dialect, sqlSchema *schema.Schema, useConstrain
385385

386386
// Seed sink info so from-proto can start without error
387387
switch driver {
388-
case "postgres", "risingwave":
388+
case "postgres":
389389
seed := fmt.Sprintf("INSERT INTO \"%s\".\"_sink_info_\" (schema_hash) VALUES ('%s') ON CONFLICT (schema_hash) DO NOTHING;", sqlSchema.Name, dialect.SchemaHash())
390390
b.WriteString("-- Seed\n")
391391
b.WriteString(seed)
392392
b.WriteString("\n")
393+
case "risingwave":
394+
// RisingWave does not support ON CONFLICT at INSERT time; enable ON CONFLICT OVERWRITE at CREATE TABLE
395+
// and perform a plain INSERT here.
396+
seed := fmt.Sprintf("INSERT INTO \"%s\".\"_sink_info_\" (schema_hash) VALUES ('%s');", sqlSchema.Name, dialect.SchemaHash())
397+
b.WriteString("-- Seed\n")
398+
b.WriteString(seed)
399+
b.WriteString("\n")
393400
}
394401

395402
// Write to file
@@ -520,7 +527,7 @@ CREATE TABLE IF NOT EXISTS "%s"._blocks_ (
520527
521528
CREATE TABLE IF NOT EXISTS "%s"._sink_info_ (
522529
schema_hash VARCHAR PRIMARY KEY
523-
);
530+
) ON CONFLICT OVERWRITE;
524531
525532
CREATE TABLE IF NOT EXISTS "%s"._cursor_ (
526533
name VARCHAR PRIMARY KEY,

db_proto/sql/risingwave/dialect.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ const risingwaveStaticSql = `
1818
1919
CREATE TABLE IF NOT EXISTS "%s"._sink_info_ (
2020
schema_hash VARCHAR PRIMARY KEY
21-
);
21+
) ON CONFLICT OVERWRITE;
2222
2323
CREATE TABLE IF NOT EXISTS "%s"._cursor_ (
2424
name VARCHAR PRIMARY KEY,

db_proto/sql/risingwave/dialect_test.go

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,8 @@ func TestDialectRisingwave_Init(t *testing.T) {
7171
}
7272

7373
func TestDialectRisingwave_CreateTableStaticSql(t *testing.T) {
74-
// Test that the static SQL contains expected RisingWave-specific elements
75-
sql := strings.ToLower(risingwaveStaticSql)
74+
// Test that the static SQL contains expected RisingWave-specific elements
75+
sql := strings.ToLower(risingwaveStaticSql)
7676

7777
// Check schema creation
7878
assert.Contains(t, sql, "create schema if not exists")
@@ -81,11 +81,16 @@ func TestDialectRisingwave_CreateTableStaticSql(t *testing.T) {
8181
assert.Contains(t, sql, "_sink_info_")
8282
assert.Contains(t, sql, "schema_hash varchar primary key")
8383

84-
// Check _cursor_ table
85-
assert.Contains(t, sql, "_cursor_")
86-
assert.Contains(t, sql, "name varchar primary key")
87-
assert.Contains(t, sql, "cursor varchar")
88-
assert.Contains(t, sql, "on conflict overwrite", "RisingWave should use ON CONFLICT OVERWRITE")
84+
// Check _cursor_ table
85+
assert.Contains(t, sql, "_cursor_")
86+
assert.Contains(t, sql, "name varchar primary key")
87+
assert.Contains(t, sql, "cursor varchar")
88+
assert.Contains(t, sql, "on conflict overwrite", "RisingWave should use ON CONFLICT OVERWRITE")
89+
90+
// Check that _sink_info_ also uses ON CONFLICT OVERWRITE semantics
91+
// Count occurrences to ensure both _sink_info_ and _cursor_ include it
92+
occurrences := strings.Count(sql, "on conflict overwrite")
93+
assert.GreaterOrEqual(t, occurrences, 2, "_sink_info_ and _cursor_ should both include ON CONFLICT OVERWRITE")
8994

9095
// Check _blocks_ table
9196
assert.Contains(t, sql, "_blocks_")

docs/FROM_PROTO_GENERATE_CSV_README.md

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -66,12 +66,23 @@ Notes:
6666
## Schema File
6767

6868
- `schema.sql` is the exact DDL `from-proto` would execute.
69-
- For PostgreSQL/RisingWave, a line is appended to seed `_sink_info_` with the schema hash:
70-
```sql
71-
INSERT INTO "<schema>"."_sink_info_" (schema_hash)
72-
VALUES ('<hash>')
73-
ON CONFLICT (schema_hash) DO NOTHING;
74-
```
69+
- Seeding `_sink_info_` with the schema hash differs by dialect:
70+
- PostgreSQL:
71+
```sql
72+
INSERT INTO "<schema>"."_sink_info_" (schema_hash)
73+
VALUES ('<hash>')
74+
ON CONFLICT (schema_hash) DO NOTHING;
75+
```
76+
- RisingWave:
77+
- Tables are created with `ON CONFLICT OVERWRITE` at CREATE TABLE for `_sink_info_` and `_cursor_`.
78+
- The seed uses a DO-NOTHING equivalent pattern:
79+
```sql
80+
INSERT INTO "<schema>"."_sink_info_" (schema_hash)
81+
SELECT '<hash>'
82+
WHERE NOT EXISTS (
83+
SELECT 1 FROM "<schema>"."_sink_info_" WHERE schema_hash = '<hash>'
84+
);
85+
```
7586
This lets `from-proto` start without additional migrations.
7687

7788
## End‑to‑End Workflow (PostgreSQL)
@@ -118,7 +129,8 @@ substreams-sink-sql from-proto "$PSQL_DSN" "$MANIFEST" "$MODULE" \
118129

119130
## Dialect Notes
120131

121-
- PostgreSQL/RisingWave: `_sink_info_`, `_cursor_`, `_blocks_` tables are created in `schema.sql`. Cursor injection uses `COPY ... WITH (HEADER)`.
132+
- PostgreSQL: `_sink_info_`, `_cursor_`, `_blocks_` tables are created in `schema.sql`. Cursor injection uses `COPY ... WITH (HEADER)`.
133+
- RisingWave: `_sink_info_` and `_cursor_` are created with `ON CONFLICT OVERWRITE` to make subsequent inserts idempotent. The seed uses a `WHERE NOT EXISTS` form.
122134
- ClickHouse: CSVs include dialect-specific version/deleted fields. Schema.sql includes database and `_blocks_` table; cursor/sink info are not table-based — follow FROM_PROTO.md for CH specifics.
123135

124136
## Operational Tips

0 commit comments

Comments
 (0)