Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
319 changes: 319 additions & 0 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ members = [
"core/connectors/sdk",
"core/connectors/sinks/elasticsearch_sink",
"core/connectors/sinks/iceberg_sink",
"core/connectors/sinks/mongodb_sink",
"core/connectors/sinks/postgres_sink",
"core/connectors/sinks/quickwit_sink",
"core/connectors/sinks/stdout_sink",
Expand Down
23 changes: 23 additions & 0 deletions DEPENDENCIES.md
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ borsh: 1.6.0, "Apache-2.0 OR MIT",
borsh-derive: 1.6.0, "Apache-2.0",
brotli: 8.0.2, "BSD-3-Clause AND MIT",
brotli-decompressor: 5.0.0, "BSD-3-Clause OR MIT",
bson: 2.15.0, "MIT",
bstr: 1.12.1, "Apache-2.0 OR MIT",
built: 0.8.0, "MIT",
bumpalo: 3.19.1, "Apache-2.0 OR MIT",
Expand Down Expand Up @@ -254,6 +255,8 @@ der: 0.7.10, "Apache-2.0 OR MIT",
der-parser: 10.0.0, "Apache-2.0 OR MIT",
deranged: 0.5.6, "Apache-2.0 OR MIT",
derive-new: 0.7.0, "MIT",
derive-syn-parse: 0.2.0, "Apache-2.0 OR MIT",
derive-where: 1.6.0, "Apache-2.0 OR MIT",
derive_builder: 0.20.2, "Apache-2.0 OR MIT",
derive_builder_core: 0.20.2, "Apache-2.0 OR MIT",
derive_builder_macro: 0.20.2, "Apache-2.0 OR MIT",
Expand Down Expand Up @@ -289,6 +292,7 @@ embedded-io: 0.4.0, "Apache-2.0 OR MIT",
embedded-io: 0.6.1, "Apache-2.0 OR MIT",
encode_unicode: 1.0.0, "Apache-2.0 OR MIT",
encoding_rs: 0.8.35, "(Apache-2.0 OR MIT) AND BSD-3-Clause",
enum-as-inner: 0.6.1, "Apache-2.0 OR MIT",
enum_dispatch: 0.3.13, "Apache-2.0 OR MIT",
enumset: 1.1.10, "Apache-2.0 OR MIT",
enumset_derive: 0.14.0, "Apache-2.0 OR MIT",
Expand Down Expand Up @@ -402,6 +406,8 @@ heapless: 0.7.17, "Apache-2.0 OR MIT",
heck: 0.5.0, "Apache-2.0 OR MIT",
hermit-abi: 0.5.2, "Apache-2.0 OR MIT",
hex: 0.4.3, "Apache-2.0 OR MIT",
hickory-proto: 0.25.2, "Apache-2.0 OR MIT",
hickory-resolver: 0.25.2, "Apache-2.0 OR MIT",
hkdf: 0.12.4, "Apache-2.0 OR MIT",
hmac: 0.12.1, "Apache-2.0 OR MIT",
home: 0.5.12, "Apache-2.0 OR MIT",
Expand Down Expand Up @@ -450,6 +456,7 @@ iggy_common: 0.9.1-edge.1, "Apache-2.0",
iggy_connector_elasticsearch_sink: 0.3.1-edge.1, "Apache-2.0",
iggy_connector_elasticsearch_source: 0.3.1-edge.1, "Apache-2.0",
iggy_connector_iceberg_sink: 0.3.1-edge.1, "Apache-2.0",
iggy_connector_mongodb_sink: 0.3.0, "Apache-2.0",
iggy_connector_postgres_sink: 0.3.1-edge.1, "Apache-2.0",
iggy_connector_postgres_source: 0.3.1-edge.1, "Apache-2.0",
iggy_connector_quickwit_sink: 0.3.1-edge.1, "Apache-2.0",
Expand Down Expand Up @@ -478,6 +485,7 @@ interpolate_name: 0.2.4, "MIT",
inventory: 0.3.21, "Apache-2.0 OR MIT",
io-uring: 0.7.11, "Apache-2.0 OR MIT",
io_uring_buf_ring: 0.2.3, "MIT",
ipconfig: 0.3.2, "Apache-2.0 OR MIT",
ipnet: 2.11.0, "Apache-2.0 OR MIT",
iri-string: 0.7.10, "Apache-2.0 OR MIT",
is_terminal_polyfill: 1.70.2, "Apache-2.0 OR MIT",
Expand Down Expand Up @@ -542,6 +550,10 @@ loom: 0.7.2, "MIT",
loop9: 0.1.5, "MIT",
lru-slab: 0.1.2, "Apache-2.0 OR MIT OR Zlib",
lz4_flex: 0.12.0, "MIT",
macro_magic: 0.5.1, "MIT",
macro_magic_core: 0.5.1, "MIT",
macro_magic_core_macros: 0.5.1, "MIT",
macro_magic_macros: 0.5.1, "MIT",
macro_rules_attribute: 0.1.3, "MIT",
macro_rules_attribute-proc_macro: 0.1.3, "MIT",
matchers: 0.2.0, "MIT",
Expand All @@ -563,6 +575,10 @@ mio: 1.1.1, "MIT",
mockall: 0.14.0, "Apache-2.0 OR MIT",
mockall_derive: 0.14.0, "Apache-2.0 OR MIT",
moka: 0.12.13, "(Apache-2.0 OR MIT) AND Apache-2.0",
mongocrypt: 0.3.2, "Apache-2.0",
mongocrypt-sys: 0.1.5+1.15.1, "Apache-2.0",
mongodb: 3.5.1, "Apache-2.0",
mongodb-internal-macros: 3.5.1, "Apache-2.0",
moxcms: 0.7.11, "Apache-2.0 OR BSD-3-Clause",
murmur3: 0.5.2, "Apache-2.0 OR MIT",
never-say-never: 6.6.666, "Apache-2.0 OR MIT OR Zlib",
Expand Down Expand Up @@ -638,6 +654,7 @@ password-hash: 0.5.0, "Apache-2.0 OR MIT",
paste: 1.0.15, "Apache-2.0 OR MIT",
pastey: 0.1.1, "Apache-2.0 OR MIT",
pastey: 0.2.1, "Apache-2.0 OR MIT",
pbkdf2: 0.12.2, "Apache-2.0 OR MIT",
pear: 0.2.9, "Apache-2.0 OR MIT",
pear_codegen: 0.2.9, "Apache-2.0 OR MIT",
peg: 0.6.3, "MIT",
Expand Down Expand Up @@ -738,6 +755,7 @@ reqwest: 0.12.28, "Apache-2.0 OR MIT",
reqwest-middleware: 0.4.2, "Apache-2.0 OR MIT",
reqwest-retry: 0.8.0, "Apache-2.0 OR MIT",
reqwest-tracing: 0.5.8, "Apache-2.0 OR MIT",
resolv-conf: 0.7.6, "Apache-2.0 OR MIT",
resvg: 0.45.1, "Apache-2.0 OR MIT",
retry-policies: 0.5.1, "Apache-2.0 OR MIT",
rfc6979: 0.4.0, "Apache-2.0 OR MIT",
Expand All @@ -762,6 +780,7 @@ rust-ini: 0.21.3, "MIT",
rust_decimal: 1.40.0, "MIT",
rustc-hash: 2.1.1, "Apache-2.0 OR MIT",
rustc_version: 0.4.1, "Apache-2.0 OR MIT",
rustc_version_runtime: 0.3.0, "MIT",
rusticata-macros: 4.1.0, "Apache-2.0 OR MIT",
rustix: 0.38.44, "Apache-2.0 OR Apache-2.0 WITH LLVM-exception OR MIT",
rustix: 1.1.3, "Apache-2.0 OR Apache-2.0 WITH LLVM-exception OR MIT",
Expand Down Expand Up @@ -877,6 +896,7 @@ sys_traits_macros: 0.1.0, "MIT",
sysinfo: 0.37.2, "MIT",
sysinfo: 0.38.1, "MIT",
tagptr: 0.2.0, "Apache-2.0 OR MIT",
take_mut: 0.2.2, "MIT",
tap: 1.0.1, "MIT",
tar: 0.4.44, "Apache-2.0 OR MIT",
tempfile: 3.25.0, "Apache-2.0 OR MIT",
Expand Down Expand Up @@ -943,8 +963,10 @@ ttf-parser: 0.25.1, "Apache-2.0 OR MIT",
tungstenite: 0.28.0, "Apache-2.0 OR MIT",
twox-hash: 2.1.2, "MIT",
typed-builder: 0.20.1, "Apache-2.0 OR MIT",
typed-builder: 0.22.0, "Apache-2.0 OR MIT",
typed-builder: 0.23.2, "Apache-2.0 OR MIT",
typed-builder-macro: 0.20.1, "Apache-2.0 OR MIT",
typed-builder-macro: 0.22.0, "Apache-2.0 OR MIT",
typed-builder-macro: 0.23.2, "Apache-2.0 OR MIT",
typed-path: 0.12.3, "Apache-2.0 OR MIT",
typenum: 1.19.0, "Apache-2.0 OR MIT",
Expand Down Expand Up @@ -1084,6 +1106,7 @@ windows_x86_64_msvc: 0.52.6, "Apache-2.0 OR MIT",
windows_x86_64_msvc: 0.53.1, "Apache-2.0 OR MIT",
winnow: 0.5.40, "MIT",
winnow: 0.7.14, "MIT",
winreg: 0.50.0, "MIT",
winsafe: 0.0.19, "MIT",
wit-bindgen: 0.51.0, "Apache-2.0 OR Apache-2.0 WITH LLVM-exception OR MIT",
wit-bindgen-core: 0.51.0, "Apache-2.0 OR Apache-2.0 WITH LLVM-exception OR MIT",
Expand Down
3 changes: 1 addition & 2 deletions core/connectors/runtime/src/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
use crate::configs::connectors::SinkConfig;
use crate::context::RuntimeContext;
use crate::log::LOG_CALLBACK;
use crate::metrics::{ConnectorType, Metrics};
use crate::metrics::Metrics;
use crate::{
PLUGIN_ID, RuntimeError, SinkApi, SinkConnector, SinkConnectorConsumer, SinkConnectorPlugin,
SinkConnectorWrapper, resolve_plugin_path, transform,
Expand Down Expand Up @@ -324,7 +324,6 @@ pub(crate) async fn consume_messages(
error!(
"Failed to process {messages_count} messages for sink connector with ID: {plugin_id}. {error}",
);
metrics.increment_errors(plugin_key, ConnectorType::Sink);
return Err(error);
}
};
Expand Down
6 changes: 6 additions & 0 deletions core/connectors/sdk/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ pub mod transforms;
pub use log::LogCallback;
pub use transforms::Transform;

/// Re-exports consumed by the expansion of the `sink_connector!` macro.
///
/// The macro imports `DashMap` and `Lazy` through
/// `$crate::connector_macro_support` rather than naming the `dashmap` and
/// `once_cell` crates directly. The module is marked `#[doc(hidden)]`, so it
/// does not appear in the generated documentation.
// NOTE(review): presumably this lets a connector crate invoke the macro
// without declaring `dashmap`/`once_cell` as its own dependencies — confirm.
#[doc(hidden)]
pub mod connector_macro_support {
pub use dashmap::DashMap;
pub use once_cell::sync::Lazy;
}

static RUNTIME: OnceCell<Runtime> = OnceCell::new();

pub fn get_runtime() -> &'static Runtime {
Expand Down
3 changes: 1 addition & 2 deletions core/connectors/sdk/src/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,8 +222,7 @@ macro_rules! sink_connector {
assert_trait::<$type>();
};

use dashmap::DashMap;
use once_cell::sync::Lazy;
use $crate::connector_macro_support::{DashMap, Lazy};
use $crate::LogCallback;
use $crate::sink::SinkContainer;

Expand Down
42 changes: 42 additions & 0 deletions core/connectors/sinks/mongodb_sink/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

[package]
name = "iggy_connector_mongodb_sink"
version = "0.3.0"
description = "Iggy MongoDB sink connector for storing stream messages into MongoDB database"
edition = "2024"
license = "Apache-2.0"
keywords = ["iggy", "messaging", "streaming", "mongodb", "sink"]
categories = ["command-line-utilities", "database", "network-programming"]
homepage = "https://iggy.apache.org"
documentation = "https://iggy.apache.org/docs"
repository = "https://github.com/apache/iggy"
readme = "../../README.md"

[lib]
crate-type = ["cdylib", "lib"]

[dependencies]
async-trait = { workspace = true }
humantime = { workspace = true }
iggy_connector_sdk = { workspace = true }
mongodb = { version = "3.0", features = ["rustls-tls"] }
serde = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
146 changes: 146 additions & 0 deletions core/connectors/sinks/mongodb_sink/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
# MongoDB Sink Connector

Consumes messages from Iggy streams and stores them in a MongoDB collection.

## Try It

Send a JSON message through Iggy and see it land in MongoDB.

**Prerequisites**: Docker running, project built (`cargo build` from repo root).

```bash
# Start MongoDB
docker run -d --name mongo-test -p 27017:27017 mongo:7

# Start iggy-server (terminal 2)
IGGY_ROOT_USERNAME=iggy IGGY_ROOT_PASSWORD=iggy ./target/debug/iggy-server

# Create stream and topic
./target/debug/iggy -u iggy -p iggy stream create demo_stream
./target/debug/iggy -u iggy -p iggy topic create demo_stream demo_topic 1

# Setup connector config
mkdir -p /tmp/mdb-sink-test/connectors
cat > /tmp/mdb-sink-test/config.toml << 'TOML'
[iggy]
address = "localhost:8090"
username = "iggy"
password = "iggy"
[state]
path = "/tmp/mdb-sink-test/state"
[connectors]
config_type = "local"
config_dir = "/tmp/mdb-sink-test/connectors"
TOML
cat > /tmp/mdb-sink-test/connectors/sink.toml << 'TOML'
type = "sink"
key = "mongodb"
enabled = true
version = 0
name = "test"
path = "target/debug/libiggy_connector_mongodb_sink"
[[streams]]
stream = "demo_stream"
topics = ["demo_topic"]
schema = "json"
batch_length = 100
poll_interval = "100ms"
consumer_group = "test_cg"
[plugin_config]
connection_uri = "mongodb://localhost:27017"
database = "test_db"
collection = "messages"
payload_format = "json"
auto_create_collection = true
TOML

# Start connector (terminal 3)
IGGY_CONNECTORS_CONFIG_PATH=/tmp/mdb-sink-test/config.toml ./target/debug/iggy-connectors

# Send a message
./target/debug/iggy -u iggy -p iggy message send demo_stream demo_topic '{"hello":"mongodb"}'

# Verify in MongoDB
docker exec mongo-test mongosh --quiet --eval \
'db.getSiblingDB("test_db").messages.find().pretty()'
```

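Expected (abbreviated — with the default settings the stored document also contains `_id` and the remaining metadata fields):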

```json
{ "payload": { "hello": "mongodb" }, "iggy_offset": 0, "iggy_stream": "demo_stream" }
```

Cleanup: `docker rm -f mongo-test && rm -rf /tmp/mdb-sink-test`

## Quick Start

```toml
[[streams]]
stream = "demo_stream"
topics = ["demo_topic"]
schema = "json"
batch_length = 100
poll_interval = "100ms"
consumer_group = "mongodb_cg"

[plugin_config]
connection_uri = "mongodb://localhost:27017"
database = "iggy_data"
collection = "messages"
payload_format = "json"
```

## Configuration

| Option | Default | Description |
| ------ | ------- | ----------- |
| `connection_uri` | **required** | MongoDB URI |
| `database` | **required** | Target database |
| `collection` | **required** | Target collection |
| `batch_size` | `100` | Documents per `insertMany` call |
| `payload_format` | `binary` | `binary`, `json`, or `string` |
| `include_metadata` | `true` | Add iggy offset, timestamp, stream, topic, partition |
| `include_checksum` | `true` | Add message checksum |
| `include_origin_timestamp` | `true` | Add origin timestamp |
| `auto_create_collection` | `false` | Create collection if missing |
| `max_pool_size` | driver default | Connection pool size |
| `verbose_logging` | `false` | Log at `info` level instead of `debug` |
| `max_retries` | `3` | Retry attempts for transient errors |
| `retry_delay` | `1s` | Base delay between retries; the actual wait grows linearly as `retry_delay * attempt` |

## Testing

Requires Docker. Testcontainers starts MongoDB 7 + iggy-server automatically.

```bash
cargo test --test mod -- mongodb_sink
```

This runs 4 E2E tests against a real MongoDB instance:

- `json_messages_sink_to_mongodb` — JSON payloads stored as embedded BSON documents
- `binary_messages_sink_as_bson_binary` — binary payloads stored as BSON Binary
- `large_batch_processed_correctly` — batch insertion with configurable batch size
- `auto_create_collection_on_open` — collection created automatically when missing

Unit tests (no Docker):

```bash
cargo test -p iggy_connector_mongodb_sink
```

## Delivery Semantics

This connector provides **at-least-once** delivery semantics.

### Behavior

- Messages may be delivered more than once on retry or restart
- Uses Iggy message ID as MongoDB `_id` for document identity
- **Insert-only mode**: a duplicate key error is treated as a hard failure; existing documents are never upserted

### Known Limitations

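- If a network timeout occurs after MongoDB has already applied an insert, the retry re-sends documents with the same `_id` and fails with a duplicate key error
- The sink does not upsert on duplicate keys, so redelivered messages are not handled idempotently (future improvement)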
Loading
Loading