
feat(connectors): Clickhouse Sink Connector#2886

Open
kriti-sc wants to merge 3 commits into apache:master from kriti-sc:clickhouse-sink

Conversation

Contributor

@kriti-sc kriti-sc commented Mar 6, 2026

Which issue does this PR close?

Closes #2539

Rationale

ClickHouse is a real-time data analytics engine that is very popular in modern analytics architectures.

What changed?

This PR introduces a ClickHouse Sink Connector that enables writing data from Iggy to ClickHouse.

The ClickHouse writing logic is heavily inspired by the official ClickHouse Kafka Connector.

Local Execution

  • Produced 30456 + 29060 rows with schema user_id: String, user_type: u8, email: String, source: String, state: String, created_at: DateTime, message: String using the sample data producer.
  • Consumed the messages using the ClickHouse sink and inserted them into the target ClickHouse table.
  • Verified the schema and number of rows in ClickHouse.
  • Added unit tests and e2e tests; both pass.

Images 1&2: Produced 30456 + 29060 rows into Iggy in two batches
Image 3: Verified schema and number of rows in Clickhouse


AI Usage

  1. Which tools? (e.g., GitHub Copilot, Claude, ChatGPT) Claude Code
  2. Scope of usage? (e.g., autocomplete, generated functions, entire implementation) generated functions
  3. How did you verify the generated code works correctly? Manual testing by producing data into Iggy and then running the sink and verifying insertion into Clickhouse, unit tests and e2e tests for different Clickhouse insert configurations.
  4. Can you explain every line of the code if asked? Yes

@codecov

codecov bot commented Mar 6, 2026

Codecov Report

❌ Patch coverage is 79.47903% with 323 lines in your changes missing coverage. Please review.
✅ Project coverage is 68.54%. Comparing base (ecd7709) to head (fc34e4d).

Files with missing lines Patch % Lines
...ore/connectors/sinks/clickhouse_sink/src/binary.rs 79.30% 102 Missing and 46 partials ⚠️
...ore/connectors/sinks/clickhouse_sink/src/schema.rs 76.73% 22 Missing and 65 partials ⚠️
...ore/connectors/sinks/clickhouse_sink/src/client.rs 63.53% 55 Missing and 11 partials ⚠️
core/connectors/sinks/clickhouse_sink/src/body.rs 91.66% 12 Missing and 1 partial ⚠️
core/connectors/sinks/clickhouse_sink/src/lib.rs 96.37% 3 Missing and 2 partials ⚠️
core/connectors/sinks/clickhouse_sink/src/sink.rs 60.00% 4 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master    #2886      +/-   ##
============================================
+ Coverage     68.36%   68.54%   +0.18%     
  Complexity      739      739              
============================================
  Files          1053     1059       +6     
  Lines         84763    86337    +1574     
  Branches      61297    62879    +1582     
============================================
+ Hits          57948    59183    +1235     
- Misses        24448    24649     +201     
- Partials       2367     2505     +138     
Flag Coverage Δ
csharp 67.43% <ø> (-0.19%) ⬇️
go 6.27% <ø> (ø)
java 54.83% <ø> (ø)
node 92.18% <ø> (-0.23%) ⬇️
python 81.57% <ø> (ø)
rust 70.30% <79.47%> (+0.24%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

Files with missing lines Coverage Δ
core/connectors/sinks/clickhouse_sink/src/sink.rs 60.00% <60.00%> (ø)
core/connectors/sinks/clickhouse_sink/src/lib.rs 96.37% <96.37%> (ø)
core/connectors/sinks/clickhouse_sink/src/body.rs 91.66% <91.66%> (ø)
...ore/connectors/sinks/clickhouse_sink/src/client.rs 63.53% <63.53%> (ø)
...ore/connectors/sinks/clickhouse_sink/src/schema.rs 76.73% <76.73%> (ø)
...ore/connectors/sinks/clickhouse_sink/src/binary.rs 79.30% <79.30%> (ø)

... and 17 files with indirect coverage changes


Comment on lines +19 to +30
//! RowBinary / RowBinaryWithDefaults byte serialization.
//!
//! Follows the ClickHouse binary format specification:
//! <https://clickhouse.com/docs/en/interfaces/formats#rowbinary>
//!
//! Key layout rules:
//! - All integers are **little-endian**.
//! - Strings are prefixed with an **unsigned LEB128 varint** length.
//! - `Nullable(T)`: 1-byte null marker (`0x01` = null, `0x00` = not null)
//! followed by T bytes when not null.
//! - `RowBinaryWithDefaults`: each top-level column is preceded by a 1-byte
//! flag (`0x01` = use server DEFAULT, `0x00` = value follows).
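The layout rules quoted above can be sketched as follows (a minimal illustration with hypothetical helper names, not the connector's actual code):

```rust
/// Encode an unsigned LEB128 varint, as used for string length prefixes.
fn write_leb128(mut value: u64, buf: &mut Vec<u8>) {
    loop {
        let mut byte = (value & 0x7f) as u8;
        value >>= 7;
        if value != 0 {
            byte |= 0x80; // high bit set: more bytes follow
        }
        buf.push(byte);
        if value == 0 {
            break;
        }
    }
}

/// String: varint length prefix followed by the raw UTF-8 bytes.
fn write_string(s: &str, buf: &mut Vec<u8>) {
    write_leb128(s.len() as u64, buf);
    buf.extend_from_slice(s.as_bytes());
}

/// Nullable(T): 1-byte marker (0x01 = null, 0x00 = not null),
/// followed by T's bytes when not null.
fn write_nullable_u32(v: Option<u32>, buf: &mut Vec<u8>) {
    match v {
        None => buf.push(0x01),
        Some(x) => {
            buf.push(0x00);
            buf.extend_from_slice(&x.to_le_bytes()); // integers are little-endian
        }
    }
}

fn main() {
    let mut buf = Vec::new();
    write_string("hi", &mut buf);
    write_nullable_u32(Some(1), &mut buf);
    write_nullable_u32(None, &mut buf);
    println!("{:02x?}", buf);
}
```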
@abonander abonander Mar 6, 2026

Would you mind explaining the reasoning to choose a bespoke implementation here over using the official Rust client? It uses HTTP and RowBinary serialization by default, so it's not clear what's being gained here.

Contributor Author

The official client is not suitable for use with Iggy because it requires the target table schema to be defined at compile time using statically typed Rust structs. In contrast, Iggy connectors expect the schema to be provided dynamically via configuration.

Even if the ClickHouse client were used, a dynamic encoder would still need to be implemented to convert runtime data into the required binary format. In that case, the client would only simplify some HTTP request handling while leaving the core complexity unresolved.

Supporting the binary ingestion format is important because it provides the best ingestion performance in ClickHouse.

Let me know if this addresses your question, or if there are other considerations I may have overlooked.
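The dynamic-schema requirement described above can be sketched roughly as follows, with a runtime type tag matched against a runtime value (all names here are illustrative, not the connector's actual types):

```rust
/// Column types known only at runtime, e.g. parsed from connector config.
enum ChType {
    UInt8,
    String,
}

/// A runtime value decoded from an Iggy message payload.
enum ChValue {
    UInt8(u8),
    String(String),
}

/// Unsigned LEB128 varint, used for string length prefixes in RowBinary.
fn write_leb128(mut v: u64, buf: &mut Vec<u8>) {
    loop {
        let mut b = (v & 0x7f) as u8;
        v >>= 7;
        if v != 0 {
            b |= 0x80;
        }
        buf.push(b);
        if v == 0 {
            break;
        }
    }
}

/// Encode one value according to its runtime column type,
/// failing on a schema/value mismatch.
fn encode(ty: &ChType, val: &ChValue, buf: &mut Vec<u8>) -> Result<(), String> {
    match (ty, val) {
        (ChType::UInt8, ChValue::UInt8(x)) => buf.push(*x),
        (ChType::String, ChValue::String(s)) => {
            write_leb128(s.len() as u64, buf);
            buf.extend_from_slice(s.as_bytes());
        }
        _ => return Err("type mismatch between schema and value".into()),
    }
    Ok(())
}

fn main() {
    let schema = vec![ChType::String, ChType::UInt8];
    let row = vec![ChValue::String("user-1".into()), ChValue::UInt8(3)];
    let mut buf = Vec::new();
    for (ty, val) in schema.iter().zip(&row) {
        encode(ty, val, &mut buf).unwrap();
    }
    println!("{} bytes", buf.len());
}
```

This is the shape a statically typed client cannot express directly: the match over `ChType` is decided per row at runtime rather than by a compile-time struct definition.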

@kriti-sc kriti-sc requested a review from abonander March 7, 2026 10:58
Comment on lines +194 to +208
let f = coerce_f64(value)?;
let scale_factor = 10f64.powi(*scale as i32);
let int_val = (f * scale_factor).round() as i128;
if *precision <= 9 {
buf.extend_from_slice(&(int_val as i32).to_le_bytes());
} else if *precision <= 18 {
buf.extend_from_slice(&(int_val as i64).to_le_bytes());
} else {
// Int128: two little-endian 64-bit words, low word first
let lo = int_val as i64;
let hi = (int_val >> 64) as i64;
buf.extend_from_slice(&lo.to_le_bytes());
buf.extend_from_slice(&hi.to_le_bytes());
}
}
Contributor

The `as i32` / `as i64` casts on `int_val` are truncating in Rust: if the incoming value exceeds the column's declared precision, the lower bits silently wrap around and you'll write wrong data into ClickHouse with no error. ClickHouse won't reject it, since RowBinary is trusted input. Could you add bounds checks before the casts and return `InvalidRecord` if the scaled value doesn't fit?
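One way to sketch the requested bounds checks, using `try_from` instead of truncating `as` casts (the error type name is hypothetical):

```rust
#[derive(Debug)]
struct InvalidRecord(String);

/// Encode an already-scaled Decimal integer, rejecting values that don't
/// fit the column's storage width instead of silently wrapping.
fn encode_decimal(int_val: i128, precision: u32, buf: &mut Vec<u8>) -> Result<(), InvalidRecord> {
    if precision <= 9 {
        // Decimal32 is stored as Int32.
        let v = i32::try_from(int_val)
            .map_err(|_| InvalidRecord(format!("{int_val} does not fit Decimal32")))?;
        buf.extend_from_slice(&v.to_le_bytes());
    } else if precision <= 18 {
        // Decimal64 is stored as Int64.
        let v = i64::try_from(int_val)
            .map_err(|_| InvalidRecord(format!("{int_val} does not fit Decimal64")))?;
        buf.extend_from_slice(&v.to_le_bytes());
    } else {
        // Int128: two little-endian 64-bit words, low word first.
        buf.extend_from_slice(&(int_val as u64).to_le_bytes());
        buf.extend_from_slice(&((int_val >> 64) as i64).to_le_bytes());
    }
    Ok(())
}

fn main() {
    let mut buf = Vec::new();
    assert!(encode_decimal(12345, 9, &mut buf).is_ok());
    assert!(encode_decimal(i128::from(i64::MAX), 9, &mut buf).is_err());
}
```

A stricter version would also check `int_val` against ±10^precision, since the full `i32`/`i64` range exceeds what the declared precision allows.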


Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Implement ClickHouse Sink Connector

3 participants