Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions db_changes/db/dialect_clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,11 +413,18 @@ func (d ClickhouseDialect) GetTableColumns(db *sql.DB, schemaName, tableName str
return nil, fmt.Errorf("scanning describe results: %w", err)
}

// First column is always the column name, second is the data type
// First column is always the column name, second is the data type,
// third is the default_type (MATERIALIZED, ALIAS, DEFAULT, or empty)
name := fmt.Sprintf("%v", values[0])
dataType := fmt.Sprintf("%v", values[1])
defaultType := ""
if len(values) > 2 && values[2] != nil {
defaultType = fmt.Sprintf("%v", values[2])
}

if !strings.Contains(dataType, "AggregateFunction") {
// Skip AggregateFunction columns and MATERIALIZED columns
// MATERIALIZED columns are auto-computed and cannot be inserted into
if !strings.Contains(dataType, "AggregateFunction") && defaultType != "MATERIALIZED" {
nonAggregateColumns = append(nonAggregateColumns, EscapeIdentifier(name))
}
}
Expand Down
55 changes: 55 additions & 0 deletions tests/integration/db_changes_clickhouse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,35 @@ func TestClickhouseSinker_Integration_MaterializedView(t *testing.T) {
)
}

func TestClickhouseSinker_Integration_MaterializedColumn(t *testing.T) {
runClickhouseSinkerTest(
t,
sharedDbChangesClickhouseContainer,
nil,
nil,
rawSQLInput(func(schema string) string {
return `
CREATE TABLE IF NOT EXISTS events (
id String,
timestamp DateTime,
value String,
minute DateTime MATERIALIZED toStartOfMinute(timestamp)
) ENGINE = MergeTree()
ORDER BY id;
`
}),
streamMock(
dbChangesBlockData(t, "10a", finalBlock("10a"),
insertRowSinglePK("events", "event1", "timestamp", "2021-01-01 12:34:56", "value", "test_value"),
),
),
equalsClickhouseEventsRows([]*EventsRow{
{ID: "event1", Timestamp: "2021-01-01T12:34:56Z", Value: "test_value"},
}),
"Block #10 (10a) - LIB #10 (10a)",
)
}

func runClickhouseSinkerTest(
t *testing.T,
clickhouseContainer *ClickhouseContainerExt,
Expand Down Expand Up @@ -312,6 +341,32 @@ func readClickhouseMetricsRows(t *testing.T, db *sqlx.DB, schema SchemaName) []*
return rows
}

// EventsRow represents a row in the events table with MATERIALIZED columns
type EventsRow struct {
ID string `db:"id"`
Timestamp string `db:"timestamp"`
Value string `db:"value"`
}

func equalsClickhouseEventsRows(expected []*EventsRow) func(t *testing.T, dbx *sqlx.DB, schema string) {
return func(t *testing.T, dbx *sqlx.DB, schema string) {
schemaName := NewSchemaName(schema)
require.Equal(t, expected, readClickhouseEventsRows(t, dbx, schemaName))
}
}

func readClickhouseEventsRows(t *testing.T, db *sqlx.DB, schema SchemaName) []*EventsRow {
t.Helper()

var rows []*EventsRow
// Only select the regular columns, not the MATERIALIZED columns
query := fmt.Sprintf(`SELECT id, timestamp, value FROM %s.events ORDER BY id;`, schema)
err := db.SelectContext(context.Background(), &rows, query)
require.NoError(t, err)

return rows
}

// waitForClickHouseCursor implements aggressive retry mechanism for ClickHouse cursor reads
// Uses OPTIMIZE TABLE ... FINAL and retries 500 times with 10ms intervals (5 seconds total)
func waitForClickHouseCursor(t *testing.T, ctx context.Context, db *sqlx.DB, schema SchemaName, moduleHash string) string {
Expand Down
Loading