
feat(connectors): add MongoDB sink connector #2815

Open
amuldotexe wants to merge 7 commits into apache:master from amuldotexe:codex/2739-sink-sync


@amuldotexe

Which issue does this PR close?

Partially addresses #2739 (MongoDB sink only).
MongoDB source support will follow in a separate PR.

Rationale

We need a MongoDB sink connector with explicit failure behavior so writes are never reported as successful when they are not.

What changed?

Before this change, MongoDB sink support was missing for connector runtime users.
This PR adds the MongoDB sink connector, including insert/write logic, retry handling for transient failures, explicit duplicate-key failure behavior, metadata mapping, and delivery-semantics documentation.
It also adds sink-focused integration and unit tests to validate payload formats, batch behavior, auto-create collection behavior, and non-silent failure paths.
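
The retry handling for transient failures mentioned above could look roughly like the sketch below. This is an illustration only, not the connector's actual code: `WriteError`, `retry_transient`, and the attempt limit are hypothetical names introduced for the example, and backoff between attempts is left out to keep it short.

```rust
/// Hypothetical error classification for a sink write (not the connector's real type).
#[derive(Debug, Clone, PartialEq)]
pub enum WriteError {
    /// Worth retrying, for example a network blip or a primary election.
    Transient(String),
    /// Retrying cannot help, for example a duplicate key or a schema violation.
    Permanent(String),
}

/// Calls `op` until it succeeds, returns a permanent error, or `max_attempts` is used up.
/// `op` receives the 1-based attempt number and is always called at least once.
/// The last error is handed back to the caller, so a failed write is never reported as success.
pub fn retry_transient<T, F>(max_attempts: u32, mut op: F) -> Result<T, WriteError>
where
    F: FnMut(u32) -> Result<T, WriteError>,
{
    let mut attempt = 1;
    loop {
        match op(attempt) {
            Ok(value) => return Ok(value),
            // Only transient errors are retried, and only while attempts remain.
            Err(WriteError::Transient(_)) if attempt < max_attempts => attempt += 1,
            // Permanent errors and exhausted retries surface unchanged.
            Err(err) => return Err(err),
        }
    }
}
```

The point of the shape is that a permanent error short-circuits immediately and an exhausted retry budget still returns `Err`, which matches the "never report success when the write failed" goal stated in the rationale.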

Local Execution

  • Passed: cargo fmt --all -- --check
  • Passed: cargo clippy -p iggy_connector_mongodb_sink --all-targets -- -D warnings
  • Passed: cargo test -p iggy_connector_mongodb_sink (13 passed)
  • Passed: cargo test -p integration --test mod -- mongodb_sink (10 passed)
  • Pre-commit hooks ran
  • Docker E2E proof image: ghcr.io/amuldotexe/iggy-mongodb-sink-demo:issue-2739-05fbec16
  • Docker smoke result: SMOKE_OK ... docs=3
  • GHCR package: https://github.com/users/amuldotexe/packages/container/package/iggy-mongodb-sink-demo
  • Key sink tests:
    • duplicate_key_is_explicit_failure_and_not_silent_success
    • ordered_duplicate_partial_insert_has_exact_accounting
    • schema_validation_mid_batch_surfaces_hard_error_and_partial_prefix
    • write_concern_timeout_does_not_report_full_success
    • retryable_write_failover_keeps_single_doc_per_id
    • no_writes_performed_label_path_preserves_state_accuracy
    • json_messages_sink_to_mongodb
    • binary_messages_sink_as_bson_binary
    • large_batch_processed_correctly
    • auto_create_collection_on_open
    • given_no_client_should_return_error_not_silent_ok

AI Usage

  1. Which tools?
    Codex, Claude Code, Rust Rover
  2. Scope of usage?
    PRD, connector precedent analysis, TDD, implementation, and PR prep
  3. How did you verify the generated code works correctly?
    Ran all local format, clippy, unit, and integration checks listed above
  4. Can you explain every line of the code if asked?
    Yes

@amuldotexe
Author

amuldotexe commented Feb 25, 2026

Docker proof of working (sink-only demo artifact, outside PR code scope):

Quick pull:

docker pull ghcr.io/amuldotexe/iggy-mongodb-sink-demo:issue-2739-05fbec16

Contributor

@krishvishal krishvishal left a comment


@amuldotexe I've added a few comments.

It seems that inserts in MongoDB can fail after partial writes. Check whether other places where insert is used are affected by this and handle them accordingly.

@amuldotexe
Author

Thanks @krishvishal for the detailed feedback.

I will work on this and get back with an updated PR.

@codecov

codecov bot commented Feb 26, 2026

Codecov Report

❌ Patch coverage is 87.05036% with 72 lines in your changes missing coverage. Please review.
✅ Project coverage is 68.70%. Comparing base (8c1d844) to head (081ebf2).

Files with missing lines Patch % Lines
core/connectors/sinks/mongodb_sink/src/lib.rs 87.05% 59 Missing and 13 partials ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master    #2815      +/-   ##
============================================
+ Coverage     68.57%   68.70%   +0.13%     
  Complexity      739      739              
============================================
  Files          1037     1038       +1     
  Lines         85610    86165     +555     
  Branches      62145    62709     +564     
============================================
+ Hits          58708    59203     +495     
- Misses        24499    24540      +41     
- Partials       2403     2422      +19     
Flag Coverage Δ
csharp 67.43% <ø> (-0.19%) ⬇️
go 4.37% <ø> (ø)
java 54.83% <ø> (ø)
node 92.26% <ø> (-0.04%) ⬇️
python 81.57% <ø> (ø)
rust 70.67% <87.05%> (+0.18%) ⬆️


Files with missing lines Coverage Δ
core/connectors/runtime/src/sink.rs 74.51% <ø> (+0.17%) ⬆️
core/connectors/sdk/src/lib.rs 56.00% <ø> (ø)
core/connectors/sdk/src/sink.rs 76.82% <ø> (+10.59%) ⬆️
core/connectors/sinks/mongodb_sink/src/lib.rs 87.05% <87.05%> (ø)

... and 12 files with indirect coverage changes


@atharvalade
Contributor

Adding to @krishvishal's points, which already cover a lot of ground. Two more things I think are blockers here.

First, there's a liveness issue with how partial failures interact with offset commits. When any batch inside process_messages fails, the whole call returns Err, so the consumer offset never advances. But ordered insert_many already committed the documents before the failure point. On the next poll the upstream re-sends everything, those already committed docs hit duplicate _id errors, which are non-transient so retries fail immediately, offset still doesn't commit, and you're stuck in a permanent loop. The connector can never recover from even a single transient mid-batch failure.
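
One possible way out of the loop described above is to insert unordered and treat duplicate-key errors on replay as "already written", so the offset can advance. The sketch below illustrates that classification under stated assumptions: `ItemError`, `BatchOutcome`, and `classify_batch` are hypothetical names, not the MongoDB driver's types, and the approach is only safe if `_id` uniquely identifies a message and no other unique index can raise the same error. The only fact taken from MongoDB itself is the duplicate-key error code, 11000.

```rust
/// MongoDB's server error code for a duplicate key violation.
pub const DUPLICATE_KEY_CODE: i32 = 11000;

/// Hypothetical per-document failure reported by an unordered bulk insert.
#[derive(Debug, Clone, PartialEq)]
pub struct ItemError {
    /// Position of the failed document within the batch.
    pub index: usize,
    /// Server error code for that document.
    pub code: i32,
}

/// Hypothetical result of inspecting one batch.
#[derive(Debug, PartialEq)]
pub enum BatchOutcome {
    /// Every document is in the collection, so the offset may advance.
    Committed { inserted: usize, replayed: usize },
    /// At least one document failed for a reason other than a duplicate key.
    Failed { first_failed_index: usize, code: i32 },
}

/// Classifies a batch of `batch_len` documents given its per-document errors.
pub fn classify_batch(batch_len: usize, errors: &[ItemError]) -> BatchOutcome {
    // Any non-duplicate error is a real failure: surface the earliest one.
    if let Some(err) = errors
        .iter()
        .filter(|e| e.code != DUPLICATE_KEY_CODE)
        .min_by_key(|e| e.index)
    {
        return BatchOutcome::Failed {
            first_failed_index: err.index,
            code: err.code,
        };
    }
    // Only duplicate-key errors remain: those documents were written by an earlier attempt.
    let replayed = errors.len();
    BatchOutcome::Committed {
        inserted: batch_len.saturating_sub(replayed),
        replayed,
    }
}
```

With this shape, a replayed batch whose documents all collide on `_id` yields `Committed` with `inserted: 0`, so the consumer offset moves forward instead of retrying forever.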

Second, message.id.to_string() as the MongoDB _id isn't safe when you're consuming multiple topics into the same collection. Two unrelated messages from different topics can easily share the same id, and whichever one lands first wins while the other silently fails. The _id should be a composite of stream, topic, partition, and message id to avoid collisions and quiet data loss.
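
A minimal sketch of the composite `_id` suggested above follows. The struct, its field types, and the `:` separator are assumptions made for illustration; the connector's real types and encoding may differ.

```rust
/// Hypothetical coordinates that locate a message (field types are assumed).
pub struct MessageCoordinates<'a> {
    pub stream: &'a str,
    pub topic: &'a str,
    pub partition: u32,
    pub message_id: u128,
}

/// Builds a document `_id` from stream, topic, partition, and message id,
/// so equal message ids from different topics no longer collide.
pub fn composite_id(c: &MessageCoordinates<'_>) -> String {
    format!("{}:{}:{}:{}", c.stream, c.topic, c.partition, c.message_id)
}
```

A string key like this stays unambiguous only if stream and topic names cannot contain the separator. If they can, storing the four parts as a structured `_id` subdocument is a safer encoding.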

@amuldotexe
Author

Addressed the review feedback in 07bd2317.

What changed:

  • Removed the shared runtime sink callback-status change from this PR. core/connectors/runtime/src/sink.rs is back to connector-runtime behavior, so the MongoDB PR no longer changes sink behavior for all connectors.
  • Completed the remaining range-safe metadata handling in the MongoDB sink for checksum, timestamp, and origin timestamp.
  • Kept the sink-local fixes already on the branch: ordered(false) batch inserts, duplicate-key replay tolerance, composite _id, AtomicU64 counters, config normalization in new(), and warning on unknown payload_format.
  • Re-scoped the two MongoDB integration tests that had been asserting runtime last_error / ConnectorStatus::Error semantics. They now validate connector-local MongoDB outcomes only.
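
Two of the sink-local fixes listed above, config normalization with a warning on unknown payload_format and AtomicU64 counters, might look roughly like this. Everything here is a sketch under assumptions: `PayloadFormat`, `normalize_payload_format`, and `SinkCounters` are hypothetical names, the accepted values `json` and `binary` are inferred from the test names earlier in the thread, and falling back to binary on an unknown value is an assumed default that the thread does not confirm.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical payload formats (inferred from the test names, not confirmed).
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum PayloadFormat {
    Json,
    Binary,
}

/// Normalizes a raw config value once, at construction time.
/// Returns the chosen format plus an optional warning for the caller to log.
/// ASSUMPTION: unknown values fall back to `Binary`.
pub fn normalize_payload_format(raw: &str) -> (PayloadFormat, Option<String>) {
    match raw.trim().to_ascii_lowercase().as_str() {
        "json" => (PayloadFormat::Json, None),
        "binary" => (PayloadFormat::Binary, None),
        other => (
            PayloadFormat::Binary,
            Some(format!(
                "unknown payload_format '{}', falling back to binary",
                other
            )),
        ),
    }
}

/// Counters that can be updated through a shared reference without a lock.
#[derive(Default)]
pub struct SinkCounters {
    inserted: AtomicU64,
    failed: AtomicU64,
}

impl SinkCounters {
    /// Adds the results of one batch to the running totals.
    pub fn record_batch(&self, inserted: u64, failed: u64) {
        self.inserted.fetch_add(inserted, Ordering::Relaxed);
        self.failed.fetch_add(failed, Ordering::Relaxed);
    }

    /// Returns `(inserted, failed)` as currently observed.
    pub fn snapshot(&self) -> (u64, u64) {
        (
            self.inserted.load(Ordering::Relaxed),
            self.failed.load(Ordering::Relaxed),
        )
    }
}
```

Returning the warning instead of logging inside the function keeps the sketch free of a logging dependency. `Ordering::Relaxed` is enough here because the counters are independent statistics and do not guard other memory.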

Validation run on this head:

  • cargo clippy -p iggy_connector_mongodb_sink -p iggy-connectors --all-targets --all-features -- -D warnings
  • cargo test -p iggy-connectors
  • cargo test -p iggy_connector_mongodb_sink
  • cargo build -p server --bin iggy-server
  • cargo build -p iggy-connectors --bin iggy-connectors
  • cargo build -p iggy_connector_mongodb_sink --lib
  • cargo test -p integration --test mod connectors::mongodb::mongodb_sink::
  • cargo build --all-targets --all-features

Current PR head: 07bd2317f52b25484e91c0c697dd43d32ec4b2d9.

@krishvishal @atharvalade please take another look when convenient.

Contributor

@krishvishal krishvishal left a comment


All review comments addressed. LGTM.

numinnex previously approved these changes Mar 9, 2026
@hubcio
Contributor

hubcio commented Mar 9, 2026

@amuldotexe looks good, but please fix the CI. after that we can merge.

@amuldotexe
Author

@hubcio fixed CI; was an unused import

5 participants