diff --git a/db_changes/db/dialect_clickhouse.go b/db_changes/db/dialect_clickhouse.go index 79b8b0e..2fb4ea7 100644 --- a/db_changes/db/dialect_clickhouse.go +++ b/db_changes/db/dialect_clickhouse.go @@ -413,11 +413,18 @@ func (d ClickhouseDialect) GetTableColumns(db *sql.DB, schemaName, tableName str return nil, fmt.Errorf("scanning describe results: %w", err) } - // First column is always the column name, second is the data type + // First column is always the column name, second is the data type, + // third is the default_type (MATERIALIZED, ALIAS, DEFAULT, or empty) name := fmt.Sprintf("%v", values[0]) dataType := fmt.Sprintf("%v", values[1]) + defaultType := "" + if len(values) > 2 && values[2] != nil { + defaultType = fmt.Sprintf("%v", values[2]) + } - if !strings.Contains(dataType, "AggregateFunction") { + // Skip AggregateFunction columns and MATERIALIZED columns + // MATERIALIZED columns are auto-computed and cannot be inserted into + if !strings.Contains(dataType, "AggregateFunction") && defaultType != "MATERIALIZED" { nonAggregateColumns = append(nonAggregateColumns, EscapeIdentifier(name)) } } diff --git a/tests/integration/db_changes_clickhouse_test.go b/tests/integration/db_changes_clickhouse_test.go index 5a2562d..71c41a1 100644 --- a/tests/integration/db_changes_clickhouse_test.go +++ b/tests/integration/db_changes_clickhouse_test.go @@ -120,6 +120,35 @@ func TestClickhouseSinker_Integration_MaterializedView(t *testing.T) { ) } +func TestClickhouseSinker_Integration_MaterializedColumn(t *testing.T) { + runClickhouseSinkerTest( + t, + sharedDbChangesClickhouseContainer, + nil, + nil, + rawSQLInput(func(schema string) string { + return ` + CREATE TABLE IF NOT EXISTS events ( + id String, + timestamp DateTime, + value String, + minute DateTime MATERIALIZED toStartOfMinute(timestamp) + ) ENGINE = MergeTree() + ORDER BY id; + ` + }), + streamMock( + dbChangesBlockData(t, "10a", finalBlock("10a"), + insertRowSinglePK("events", "event1", "timestamp", "2021-01-01 12:34:56", "value", "test_value"), + ), + ), + equalsClickhouseEventsRows([]*EventsRow{ + {ID: "event1", Timestamp: "2021-01-01T12:34:56Z", Value: "test_value"}, + }), + "Block #10 (10a) - LIB #10 (10a)", + ) +} + func runClickhouseSinkerTest( t *testing.T, clickhouseContainer *ClickhouseContainerExt, @@ -312,6 +341,32 @@ func readClickhouseMetricsRows(t *testing.T, db *sqlx.DB, schema SchemaName) []* return rows } +// EventsRow represents a row in the events table with MATERIALIZED columns +type EventsRow struct { + ID string `db:"id"` + Timestamp string `db:"timestamp"` + Value string `db:"value"` +} + +func equalsClickhouseEventsRows(expected []*EventsRow) func(t *testing.T, dbx *sqlx.DB, schema string) { + return func(t *testing.T, dbx *sqlx.DB, schema string) { + schemaName := NewSchemaName(schema) + require.Equal(t, expected, readClickhouseEventsRows(t, dbx, schemaName)) + } +} + +func readClickhouseEventsRows(t *testing.T, db *sqlx.DB, schema SchemaName) []*EventsRow { + t.Helper() + + var rows []*EventsRow + // Only select the regular columns, not the MATERIALIZED columns + query := fmt.Sprintf(`SELECT id, timestamp, value FROM %s.events ORDER BY id;`, schema) + err := db.SelectContext(context.Background(), &rows, query) + require.NoError(t, err) + + return rows +} + // waitForClickHouseCursor implements aggressive retry mechanism for ClickHouse cursor reads // Uses OPTIMIZE TABLE ... FINAL and retries 500 times with 10ms intervals (5 seconds total) func waitForClickHouseCursor(t *testing.T, ctx context.Context, db *sqlx.DB, schema SchemaName, moduleHash string) string {