Skip to content

Commit 76bbd91

Browse files
committed
Add reason dimension to ingest/events/thrownAway metric
1 parent ab969cb commit 76bbd91

File tree

22 files changed

+794
-50
lines changed

22 files changed

+794
-50
lines changed

docs/operations/metrics.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -270,7 +270,7 @@ batch ingestion emit the following metrics. These metrics are deltas for each em
270270
|`ingest/events/processed`|Number of events processed per emission period.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Equal to the number of events per emission period.|
271271
|`ingest/events/processedWithError`|Number of events processed with some partial errors per emission period. Events processed with partial errors are counted towards both this metric and `ingest/events/processed`.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|0|
272272
|`ingest/events/unparseable`|Number of events rejected because the events are unparseable.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|0|
273-
|`ingest/events/thrownAway`|Number of events rejected because they are null, or filtered by `transformSpec`, or outside one of `lateMessageRejectionPeriod`, `earlyMessageRejectionPeriod`.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|0|
273+
|`ingest/events/thrownAway`|Number of events rejected because they are null, or filtered by `transformSpec`, or outside one of `lateMessageRejectionPeriod`, `earlyMessageRejectionPeriod`. The `reason` dimension indicates why the event was thrown away: `null` (null or empty event), `beforeMinMessageTime` (late message rejection), `afterMaxMessageTime` (early message rejection), or `filtered` (filtered by transformSpec).|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`, `reason`|0|
274274
|`ingest/events/duplicate`|Number of events rejected because the events are duplicated.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|0|
275275
|`ingest/input/bytes`|Number of bytes read from input sources, after decompression but prior to parsing. This covers all data read, including data that does not end up being fully processed and ingested. For example, this includes data that ends up being rejected for being unparseable or filtered out.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Depends on the amount of data read.|
276276
|`ingest/rows/output`|Number of Druid rows persisted.|`dataSource`, `taskId`, `taskType`, `groupId`|Your number of events with rollup.|

extensions-contrib/dropwizard-emitter/src/main/resources/defaultMetricDimensions.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,8 @@
126126
},
127127
"ingest/events/thrownAway": {
128128
"dimensions": [
129-
"dataSource"
129+
"dataSource",
130+
"reason"
130131
],
131132
"type": "counter"
132133
},

extensions-contrib/opentsdb-emitter/src/main/resources/defaultMetrics.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,8 @@
5454
"query/cache/total/timeouts": [],
5555
"query/cache/total/errors": [],
5656
"ingest/events/thrownAway": [
57-
"dataSource"
57+
"dataSource",
58+
"reason"
5859
],
5960
"ingest/events/unparseable": [
6061
"dataSource"

extensions-contrib/prometheus-emitter/src/main/resources/defaultMetrics.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@
104104
"ingest/events/processed" : { "dimensions" : ["dataSource"], "type" : "count", "help": "Number of events successfully processed per emission period." },
105105
"ingest/events/processedWithError" : { "dimensions" : ["dataSource"], "type" : "count", "help": "Number of events processed with some partial errors per emission period." },
106106
"ingest/events/unparseable" : { "dimensions" : ["dataSource"], "type" : "count", "help": "Number of events rejected because the events are unparseable." },
107-
"ingest/events/thrownAway" : { "dimensions" : ["dataSource"], "type" : "count", "help": "Number of events rejected because they are outside the windowPeriod."},
107+
"ingest/events/thrownAway" : { "dimensions" : ["dataSource", "reason"], "type" : "count", "help": "Number of events rejected because they are null, filtered by transformSpec, or outside the message rejection periods. The reason dimension indicates why: null, beforeMinMessageTime, afterMaxMessageTime, or filtered."},
108108
"ingest/events/duplicate" : { "dimensions" : ["dataSource"], "type" : "count", "help": "Number of events rejected because the events are duplicated."},
109109
"ingest/input/bytes" : { "dimensions" : ["dataSource"], "type" : "count", "help": "Number of bytes read from input sources, after decompression but prior to parsing." },
110110
"ingest/rows/output" : { "dimensions" : ["dataSource"], "type" : "count", "help": "Number of Druid rows persisted."},

extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
"query/cache/total/timeouts" : { "dimensions" : [], "type" : "gauge" },
3939
"query/cache/total/errors" : { "dimensions" : [], "type" : "gauge" },
4040

41-
"ingest/events/thrownAway" : { "dimensions" : ["dataSource"], "type" : "count" },
41+
"ingest/events/thrownAway" : { "dimensions" : ["dataSource", "reason"], "type" : "count" },
4242
"ingest/events/unparseable" : { "dimensions" : ["dataSource"], "type" : "count" },
4343
"ingest/events/duplicate" : { "dimensions" : ["dataSource"], "type" : "count" },
4444
"ingest/events/processed" : { "dimensions" : ["dataSource"], "type" : "count" },

indexing-service/src/main/java/org/apache/druid/indexing/common/stats/DropwizardRowIngestionMeters.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@
2323
import com.codahale.metrics.MetricRegistry;
2424
import org.apache.druid.segment.incremental.RowIngestionMeters;
2525
import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
26+
import org.apache.druid.segment.incremental.ThrownAwayReason;
2627

28+
import java.util.EnumMap;
2729
import java.util.HashMap;
2830
import java.util.Map;
2931

@@ -33,11 +35,14 @@ public class DropwizardRowIngestionMeters implements RowIngestionMeters
3335
public static final String FIVE_MINUTE_NAME = "5m";
3436
public static final String FIFTEEN_MINUTE_NAME = "15m";
3537

38+
private static final int NUM_THROWN_AWAY_REASONS = ThrownAwayReason.values().length;
39+
3640
private final Meter processed;
3741
private final Meter processedBytes;
3842
private final Meter processedWithError;
3943
private final Meter unparseable;
4044
private final Meter thrownAway;
45+
private final Meter[] thrownAwayByReason = new Meter[NUM_THROWN_AWAY_REASONS];
4146

4247
public DropwizardRowIngestionMeters()
4348
{
@@ -47,6 +52,9 @@ public DropwizardRowIngestionMeters()
4752
this.processedWithError = metricRegistry.meter(PROCESSED_WITH_ERROR);
4853
this.unparseable = metricRegistry.meter(UNPARSEABLE);
4954
this.thrownAway = metricRegistry.meter(THROWN_AWAY);
55+
for (ThrownAwayReason reason : ThrownAwayReason.values()) {
56+
this.thrownAwayByReason[reason.ordinal()] = metricRegistry.meter(THROWN_AWAY + "_" + reason.name());
57+
}
5058
}
5159

5260
@Override
@@ -109,6 +117,23 @@ public void incrementThrownAway()
109117
thrownAway.mark();
110118
}
111119

120+
@Override
121+
public void incrementThrownAway(ThrownAwayReason reason)
122+
{
123+
thrownAway.mark();
124+
thrownAwayByReason[reason.ordinal()].mark();
125+
}
126+
127+
@Override
128+
public Map<ThrownAwayReason, Long> getThrownAwayByReason()
129+
{
130+
EnumMap<ThrownAwayReason, Long> result = new EnumMap<>(ThrownAwayReason.class);
131+
for (ThrownAwayReason reason : ThrownAwayReason.values()) {
132+
result.put(reason, thrownAwayByReason[reason.ordinal()].getCount());
133+
}
134+
return result;
135+
}
136+
112137
@Override
113138
public RowIngestionMetersTotals getTotals()
114139
{

indexing-service/src/main/java/org/apache/druid/indexing/common/stats/TaskRealtimeMetricsMonitor.java

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,21 +25,27 @@
2525
import org.apache.druid.java.util.metrics.AbstractMonitor;
2626
import org.apache.druid.segment.incremental.RowIngestionMeters;
2727
import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
28+
import org.apache.druid.segment.incremental.ThrownAwayReason;
2829
import org.apache.druid.segment.realtime.SegmentGenerationMetrics;
2930

31+
import java.util.EnumMap;
32+
import java.util.Map;
33+
3034
/**
3135
* Emits metrics from {@link SegmentGenerationMetrics} and {@link RowIngestionMeters}.
3236
*/
3337
public class TaskRealtimeMetricsMonitor extends AbstractMonitor
3438
{
3539
private static final EmittingLogger log = new EmittingLogger(TaskRealtimeMetricsMonitor.class);
40+
private static final String REASON_DIMENSION = "reason";
3641

3742
private final SegmentGenerationMetrics segmentGenerationMetrics;
3843
private final RowIngestionMeters rowIngestionMeters;
3944
private final ServiceMetricEvent.Builder builder;
4045

4146
private SegmentGenerationMetrics previousSegmentGenerationMetrics;
4247
private RowIngestionMetersTotals previousRowIngestionMetersTotals;
48+
private Map<ThrownAwayReason, Long> previousThrownAwayByReason;
4349

4450
public TaskRealtimeMetricsMonitor(
4551
SegmentGenerationMetrics segmentGenerationMetrics,
@@ -52,6 +58,7 @@ public TaskRealtimeMetricsMonitor(
5258
this.builder = metricEventBuilder;
5359
previousSegmentGenerationMetrics = new SegmentGenerationMetrics();
5460
previousRowIngestionMetersTotals = new RowIngestionMetersTotals(0, 0, 0, 0, 0);
61+
previousThrownAwayByReason = new EnumMap<>(ThrownAwayReason.class);
5562
}
5663

5764
@Override
@@ -60,14 +67,28 @@ public boolean doMonitor(ServiceEmitter emitter)
6067
SegmentGenerationMetrics metrics = segmentGenerationMetrics.snapshot();
6168
RowIngestionMetersTotals rowIngestionMetersTotals = rowIngestionMeters.getTotals();
6269

63-
final long thrownAway = rowIngestionMetersTotals.getThrownAway() - previousRowIngestionMetersTotals.getThrownAway();
64-
if (thrownAway > 0) {
70+
// Emit per-reason metrics with the reason dimension
71+
final Map<ThrownAwayReason, Long> currentThrownAwayByReason = rowIngestionMeters.getThrownAwayByReason();
72+
long totalThrownAway = 0;
73+
for (ThrownAwayReason reason : ThrownAwayReason.values()) {
74+
final long currentCount = currentThrownAwayByReason.getOrDefault(reason, 0L);
75+
final long previousCount = previousThrownAwayByReason.getOrDefault(reason, 0L);
76+
final long delta = currentCount - previousCount;
77+
if (delta > 0) {
78+
totalThrownAway += delta;
79+
emitter.emit(
80+
builder.setDimension(REASON_DIMENSION, reason.getMetricValue())
81+
.setMetric("ingest/events/thrownAway", delta)
82+
);
83+
}
84+
}
85+
previousThrownAwayByReason = currentThrownAwayByReason;
86+
if (totalThrownAway > 0) {
6587
log.warn(
6688
"[%,d] events thrown away. Possible causes: null events, events filtered out by transformSpec, or events outside earlyMessageRejectionPeriod / lateMessageRejectionPeriod.",
67-
thrownAway
89+
totalThrownAway
6890
);
6991
}
70-
emitter.emit(builder.setMetric("ingest/events/thrownAway", thrownAway));
7192

7293
final long unparseable = rowIngestionMetersTotals.getUnparseable()
7394
- previousRowIngestionMetersTotals.getUnparseable();

indexing-service/src/main/java/org/apache/druid/indexing/common/task/FilteringCloseableInputRowIterator.java

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,34 +24,49 @@
2424
import org.apache.druid.java.util.common.parsers.ParseException;
2525
import org.apache.druid.segment.incremental.ParseExceptionHandler;
2626
import org.apache.druid.segment.incremental.RowIngestionMeters;
27+
import org.apache.druid.segment.incremental.ThrownAwayReason;
2728

2829
import java.io.IOException;
2930
import java.util.NoSuchElementException;
3031
import java.util.function.Predicate;
3132

3233
/**
3334
* An {@link InputRow} iterator used by ingestion {@link Task}s. It can filter out rows which do not satisfy the given
34-
* {@link #filter} or throw {@link ParseException} while parsing them. The relevant metric should be counted whenever
35+
* {@link RowFilter} or throw {@link ParseException} while parsing them. The relevant metric should be counted whenever
3536
* it filters out rows based on the filter. ParseException handling is delegatged to {@link ParseExceptionHandler}.
3637
*/
3738
public class FilteringCloseableInputRowIterator implements CloseableIterator<InputRow>
3839
{
3940
private final CloseableIterator<InputRow> delegate;
40-
private final Predicate<InputRow> filter;
41+
private final RowFilter rowFilter;
4142
private final RowIngestionMeters rowIngestionMeters;
4243
private final ParseExceptionHandler parseExceptionHandler;
4344

4445
private InputRow next;
4546

47+
/**
48+
* @deprecated Use the constructor with {@link RowFilter} for better observability of thrown away reasons.
49+
*/
50+
@Deprecated
4651
public FilteringCloseableInputRowIterator(
4752
CloseableIterator<InputRow> delegate,
4853
Predicate<InputRow> filter,
4954
RowIngestionMeters rowIngestionMeters,
5055
ParseExceptionHandler parseExceptionHandler
5156
)
57+
{
58+
this(delegate, RowFilter.fromPredicate(filter), rowIngestionMeters, parseExceptionHandler);
59+
}
60+
61+
public FilteringCloseableInputRowIterator(
62+
CloseableIterator<InputRow> delegate,
63+
RowFilter rowFilter,
64+
RowIngestionMeters rowIngestionMeters,
65+
ParseExceptionHandler parseExceptionHandler
66+
)
5267
{
5368
this.delegate = delegate;
54-
this.filter = filter;
69+
this.rowFilter = rowFilter;
5570
this.rowIngestionMeters = rowIngestionMeters;
5671
this.parseExceptionHandler = parseExceptionHandler;
5772
}
@@ -66,11 +81,12 @@ public boolean hasNext()
6681
while (next == null && delegate.hasNext()) {
6782
// delegate.next() can throw ParseException
6883
final InputRow row = delegate.next();
69-
// filter.test() can throw ParseException
70-
if (filter.test(row)) {
84+
// rowFilter.test() can throw ParseException, returns null if accepted, or reason if rejected
85+
final ThrownAwayReason rejectionReason = rowFilter.test(row);
86+
if (rejectionReason == null) {
7187
next = row;
7288
} else {
73-
rowIngestionMeters.incrementThrownAway();
89+
rowIngestionMeters.incrementThrownAway(rejectionReason);
7490
}
7591
}
7692
break;
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.druid.indexing.common.task;
21+
22+
import org.apache.druid.data.input.InputRow;
23+
import org.apache.druid.segment.incremental.ThrownAwayReason;
24+
25+
import javax.annotation.Nullable;
26+
import java.util.function.Predicate;
27+
28+
/**
29+
* A filter for input rows during ingestion that can report the reason for rejection.
30+
* This is similar to {@link Predicate} but returns the rejection reason instead of just a boolean.
31+
*/
32+
@FunctionalInterface
33+
public interface RowFilter
34+
{
35+
/**
36+
* Tests whether the given row should be accepted.
37+
*
38+
* @param row the input row to test
39+
* @return null if the row should be accepted, or the {@link ThrownAwayReason} if the row should be rejected
40+
*/
41+
@Nullable
42+
ThrownAwayReason test(InputRow row);
43+
44+
/**
45+
* Creates a RowFilter from a Predicate. When the predicate returns false,
46+
* the rejection reason will be {@link ThrownAwayReason#FILTERED}.
47+
*/
48+
static RowFilter fromPredicate(Predicate<InputRow> predicate)
49+
{
50+
return row -> predicate.test(row) ? null : ThrownAwayReason.FILTERED;
51+
}
52+
53+
/**
54+
* Combines this filter with another filter. A row is rejected if either filter rejects it.
55+
* The rejection reason from the first rejecting filter (this filter first) is returned.
56+
*/
57+
default RowFilter and(RowFilter other)
58+
{
59+
return row -> {
60+
ThrownAwayReason reason = this.test(row);
61+
if (reason != null) {
62+
return reason;
63+
}
64+
return other.test(row);
65+
};
66+
}
67+
}
68+

indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666
import org.apache.druid.indexing.common.actions.TaskLocks;
6767
import org.apache.druid.indexing.common.actions.TimeChunkLockAcquireAction;
6868
import org.apache.druid.indexing.common.stats.TaskRealtimeMetricsMonitor;
69+
import org.apache.druid.indexing.common.task.RowFilter;
6970
import org.apache.druid.indexing.input.InputRowSchemas;
7071
import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
7172
import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
@@ -81,6 +82,7 @@
8182
import org.apache.druid.segment.incremental.ParseExceptionHandler;
8283
import org.apache.druid.segment.incremental.ParseExceptionReport;
8384
import org.apache.druid.segment.incremental.RowIngestionMeters;
85+
import org.apache.druid.segment.incremental.ThrownAwayReason;
8486
import org.apache.druid.segment.realtime.ChatHandler;
8587
import org.apache.druid.segment.realtime.SegmentGenerationMetrics;
8688
import org.apache.druid.segment.realtime.appenderator.Appenderator;
@@ -419,7 +421,7 @@ private TaskStatus runInternal(TaskToolbox toolbox) throws Exception
419421
inputRowSchema,
420422
task.getDataSchema().getTransformSpec(),
421423
toolbox.getIndexingTmpDir(),
422-
row -> row != null && withinMinMaxRecordTime(row),
424+
this::getRowRejectionReason,
423425
rowIngestionMeters,
424426
parseExceptionHandler
425427
);
@@ -2144,26 +2146,36 @@ private void refreshMinMaxMessageTime()
21442146
);
21452147
}
21462148

2147-
public boolean withinMinMaxRecordTime(final InputRow row)
2149+
/**
2150+
* Returns the rejection reason for a row, or null if the row should be accepted.
2151+
* This method is used as a {@link RowFilter} for the {@link StreamChunkParser}.
2152+
*/
2153+
@Nullable
2154+
ThrownAwayReason getRowRejectionReason(final InputRow row)
21482155
{
2149-
final boolean beforeMinimumMessageTime = minMessageTime.isAfter(row.getTimestamp());
2150-
final boolean afterMaximumMessageTime = maxMessageTime.isBefore(row.getTimestamp());
2151-
2152-
if (log.isDebugEnabled()) {
2153-
if (beforeMinimumMessageTime) {
2156+
if (row == null) {
2157+
return ThrownAwayReason.NULL;
2158+
}
2159+
if (minMessageTime.isAfter(row.getTimestamp())) {
2160+
if (log.isDebugEnabled()) {
21542161
log.debug(
21552162
"CurrentTimeStamp[%s] is before MinimumMessageTime[%s]",
21562163
row.getTimestamp(),
21572164
minMessageTime
21582165
);
2159-
} else if (afterMaximumMessageTime) {
2166+
}
2167+
return ThrownAwayReason.BEFORE_MIN_MESSAGE_TIME;
2168+
}
2169+
if (maxMessageTime.isBefore(row.getTimestamp())) {
2170+
if (log.isDebugEnabled()) {
21602171
log.debug(
21612172
"CurrentTimeStamp[%s] is after MaximumMessageTime[%s]",
21622173
row.getTimestamp(),
21632174
maxMessageTime
21642175
);
21652176
}
2177+
return ThrownAwayReason.AFTER_MAX_MESSAGE_TIME;
21662178
}
2167-
return !beforeMinimumMessageTime && !afterMaximumMessageTime;
2179+
return null;
21682180
}
21692181
}

0 commit comments

Comments
 (0)