Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/actions/spelling/allow.txt
Original file line number Diff line number Diff line change
Expand Up @@ -564,3 +564,4 @@ sighup
CLAUDE
linting
lexers
dlq
73 changes: 73 additions & 0 deletions changelog.d/1772_add_dlq.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
Add support for Dead Letter Queue (dlq) on sink and enabling it in Elasticsearch sink.

## Simple use case
Create two indexes, one with the field `value` mapped as type `long` and one without any mappings, like this:

```bash
curl -X PUT "localhost:9200/test-index" -H 'Content-Type: application/json' -d '
{
"mappings": {
"properties": {
"value": {
"type": "long"
}
}
}
}' -v

curl -X PUT "localhost:9200/test-index-no-mapping" -H 'Content-Type: application/json' -d '{}' -v
```

Then, configure vector like this:

```yaml
api:
enabled: true
sources:
my_source:
type: demo_logs
format: shuffle
lines:
- '{"value":1,"tag":"ok"}'
- '{"value":2,"tag":"ok"}'
- '{"value":"bad","tag":"bad"}'
- '{"value":"bad2","tag":"bad"}'
- '{"value":3,"tag":"bad"}'

transforms:
my_transform:
type: remap
inputs:
- my_source
source: |
. = parse_json!(.message)
my_dlq_transform:
type: remap
inputs:
- "es_out.dlq"
source: |
.enter_dlq = true
sinks:
es_out:
type: elasticsearch
inputs:
- my_transform
endpoints:
- "http://localhost:9200"
bulk:
index: "test-index"
es_dlq_out:
type: elasticsearch
inputs:
- "my_dlq_transform"
endpoints:
- "http://localhost:9200"
bulk:
index: "test-index-no-mapping"

```

You should see that events whose `value` field is a string are sent to the `test-index-no-mapping` index, while those whose `value` field is a long are sent to the `test-index` index.
As in the example above, `es_out.dlq` can be used as an input for a `transform`, or directly as an input for another `sink` (for example, a file sink).

authors: tanganellilore
72 changes: 72 additions & 0 deletions lib/vector-core/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,72 @@ impl SourceOutput {
}
}

/// A single output exposed by a sink component (e.g. a dead-letter-queue
/// port), identified by an optional named port and the type of events it
/// emits.
#[derive(Debug, Clone, PartialEq)]
pub struct SinkOutput {
    /// Optional port name; `None` denotes the default (unnamed) output.
    pub port: Option<String>,
    /// The data type(s) of events this output can emit.
    pub ty: DataType,

    // NOTE: schema definitions are only implemented/supported for log-type events. There is no
    // inherent blocker to support other types as well, but it'll require additional work to add
    // the relevant schemas, and store them separately in this type.
    pub schema_definition: Option<Arc<schema::Definition>>,
}

impl SinkOutput {
    /// Builds a `SinkOutput` of the given data type, attaching the schema
    /// definition only when the type can carry log events (schema
    /// definitions are only supported for logs).
    #[must_use]
    pub fn new_maybe_logs(ty: DataType, schema_definition: schema::Definition) -> Self {
        let schema_definition = if ty.contains(DataType::Log) {
            Some(Arc::new(schema_definition))
        } else {
            None
        };

        Self {
            port: None,
            ty,
            schema_definition,
        }
    }

    /// Builds a metric-typed output with no schema definition.
    #[must_use]
    pub fn new_metrics() -> Self {
        Self {
            port: None,
            ty: DataType::Metric,
            schema_definition: None,
        }
    }

    /// Builds a trace-typed output with no schema definition.
    #[must_use]
    pub fn new_traces() -> Self {
        Self {
            port: None,
            ty: DataType::Trace,
            schema_definition: None,
        }
    }

    /// Returns a clone of this output's schema definition, if it has one.
    ///
    /// When `schema_enabled` is false, a default definition for the same
    /// log namespaces is returned instead, carrying over only the
    /// meanings from the original definition.
    #[must_use]
    pub fn schema_definition(&self, schema_enabled: bool) -> Option<schema::Definition> {
        let definition = self.schema_definition.as_ref()?;

        Some(if schema_enabled {
            definition.as_ref().clone()
        } else {
            let mut fallback =
                schema::Definition::default_for_namespace(definition.log_namespaces());
            fallback.add_meanings(definition.meanings());
            fallback
        })
    }

    /// Assigns a named port to this output, consuming and returning `self`.
    #[must_use]
    pub fn with_port(mut self, name: impl Into<String>) -> Self {
        self.port = Some(name.into());
        self
    }
}

fn fmt_helper(
f: &mut fmt::Formatter<'_>,
maybe_port: Option<&String>,
Expand All @@ -209,6 +275,12 @@ impl fmt::Display for SourceOutput {
}
}

// Delegates to the shared `fmt_helper` so sink outputs render the same
// way as the other output kinds in this module.
impl fmt::Display for SinkOutput {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt_helper(f, self.port.as_ref(), self.ty)
    }
}

#[derive(Debug, Clone, PartialEq)]
pub struct TransformOutput {
pub port: Option<String>,
Expand Down
43 changes: 42 additions & 1 deletion lib/vector-core/src/source_sender/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use vector_buffers::topology::channel::LimitedReceiver;
use vector_common::internal_event::DEFAULT_OUTPUT;

use super::{CHUNK_SIZE, LAG_TIME_NAME, Output, SourceSender, SourceSenderItem};
use crate::config::{ComponentKey, OutputId, SourceOutput};
use crate::config::{ComponentKey, OutputId, SinkOutput, SourceOutput};

pub struct Builder {
buf_size: usize,
Expand Down Expand Up @@ -89,6 +89,47 @@ impl Builder {
}
}

/// Registers a sink output on this builder and returns the receiver
/// side of its buffered channel.
///
/// An output without a port becomes the default output; a named port is
/// stored under its name in the named-output map.
pub fn add_sink_output(
    &mut self,
    output: SinkOutput,
    component_key: ComponentKey,
) -> LimitedReceiver<SourceSenderItem> {
    // Identity of this output within the topology.
    let output_id = OutputId {
        component: component_key,
        port: output.port.clone(),
    };

    // Named ports keep their name; the unnamed port uses the default
    // output name for metrics/telemetry purposes.
    let channel_name = output
        .port
        .clone()
        .unwrap_or_else(|| DEFAULT_OUTPUT.to_owned());

    let (built, rx) = Output::new_with_buffer(
        self.buf_size,
        channel_name,
        self.lag_time.clone(),
        output.schema_definition.clone(),
        output_id,
        self.timeout,
        self.ewma_half_life_seconds,
    );

    // Store the built output in the slot matching its port.
    match output.port {
        None => self.default_output = Some(built),
        Some(name) => {
            self.named_outputs.insert(name, built);
        }
    }

    rx
}

pub fn build(self) -> SourceSender {
SourceSender {
default_output: self.default_output,
Expand Down
4 changes: 2 additions & 2 deletions lib/vector-lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ pub mod config {
pub use vector_common::config::ComponentKey;
pub use vector_core::config::{
AcknowledgementsConfig, DataType, GlobalOptions, Input, LegacyKey, LogNamespace, LogSchema,
MEMORY_BUFFER_DEFAULT_MAX_EVENTS, OutputId, SourceAcknowledgementsConfig, SourceOutput,
Tags, Telemetry, TransformOutput, WildcardMatching, clone_input_definitions,
MEMORY_BUFFER_DEFAULT_MAX_EVENTS, OutputId, SinkOutput, SourceAcknowledgementsConfig,
SourceOutput, Tags, Telemetry, TransformOutput, WildcardMatching, clone_input_definitions,
init_log_schema, init_telemetry, log_schema, proxy, telemetry,
};
}
Expand Down
2 changes: 1 addition & 1 deletion lib/vector-top/src/dashboard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ impl<'a> Widgets<'a> {
.map(Cell::from)
.collect::<Vec<_>>();
data[1] = Cell::from(id.as_str());
data[5] = Cell::from(sent_events_metric);
data[6] = Cell::from(sent_events_metric);
items.push(Row::new(data).style(Style::default()));
}
}
Expand Down
18 changes: 16 additions & 2 deletions src/api/schema/metrics/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,20 @@ fn sum_metrics_owned<I: IntoIterator<Item = Metric>>(metrics: I) -> Option<Metri
Some(iter.fold(m, |mut m1, m2| if m1.update(&m2) { m1 } else { m2 }))
}

/// Sums the sent-events metrics for a component into a single total.
///
/// Metrics without an `output` tag represent component-level totals, so
/// when any are present only those are summed (avoiding double-counting
/// events that are also reported under per-output tags); otherwise all
/// metrics are summed. Returns `None` when `metrics` is empty.
fn component_total_sent_events_metric(metrics: Vec<Metric>) -> Option<Metric> {
    // Check for output-agnostic metrics up front instead of cloning the
    // matching metrics into a temporary Vec just to test for emptiness.
    if metrics.iter().any(|m| m.tag_value("output").is_none()) {
        // Filter lazily over the owned values; no clone-and-collect needed.
        sum_metrics_owned(
            metrics
                .into_iter()
                .filter(|m| m.tag_value("output").is_none()),
        )
    } else {
        sum_metrics_owned(metrics)
    }
}

pub trait MetricsFilter<'a> {
fn received_bytes_total(&self) -> Option<ReceivedBytesTotal>;
fn received_events_total(&self) -> Option<ReceivedEventsTotal>;
Expand Down Expand Up @@ -310,7 +324,7 @@ pub fn component_sent_events_totals_metrics_with_outputs(
})
.collect();

let sum = sum_metrics_owned(metrics)?;
let sum = component_total_sent_events_metric(metrics)?;
match sum.value() {
MetricValue::Counter { value }
if cache.insert(id, *value).unwrap_or(0.00) < *value =>
Expand Down Expand Up @@ -350,7 +364,7 @@ pub fn component_sent_events_total_throughputs_with_outputs(
})
.collect::<Vec<_>>();

let sum = sum_metrics_owned(metrics)?;
let sum = component_total_sent_events_metric(metrics)?;
let total_throughput = throughput(&sum, id.clone(), &mut cache)?;
Some((
ComponentKey::from(id),
Expand Down
Loading
Loading