diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index 46b101055c52..ce84944bb63b 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -39,6 +39,7 @@
import org.apache.commons.io.FileUtils;
import org.apache.curator.test.TestingCluster;
import org.apache.druid.cli.CliPeon;
+import org.apache.druid.cli.CliPeonTest;
import org.apache.druid.cli.PeonLoadSpecHolder;
import org.apache.druid.cli.PeonTaskHolder;
import org.apache.druid.data.input.InputEntity;
@@ -88,6 +89,12 @@
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.common.parsers.ParseException;
+import org.apache.druid.java.util.emitter.core.Emitter;
+import org.apache.druid.java.util.emitter.core.Event;
+import org.apache.druid.java.util.emitter.core.EventMap;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
+import org.apache.druid.java.util.metrics.TaskHolder;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.DefaultGenericQueryMetricsFactory;
import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate;
@@ -121,7 +128,6 @@
import org.apache.druid.server.coordination.BroadcastDatasourceLoadingSpec;
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
import org.apache.druid.server.metrics.LoadSpecHolder;
-import org.apache.druid.server.metrics.TaskHolder;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
@@ -3487,6 +3493,64 @@ public void testTaskWithTransformSpecDoesNotCauseCliPeonCyclicDependency()
verifyPersistAndMergeTimeMetricsArePositive(observedSegmentGenerationMetrics);
}
+ @Test(timeout = 60_000L)
+ public void testKafkaTaskContainsAllTaskDimensions()
+ throws IOException, ExecutionException, InterruptedException
+ {
+ insertData();
+
+ final KafkaIndexTask task = createTask(
+ "index_kafka_test_id1",
+ new KafkaIndexTaskIOConfig(
+ 0,
+ "sequence0",
+ new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(new KafkaTopicPartition(false, topic, 0), 2L), ImmutableSet.of()),
+ new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(new KafkaTopicPartition(false, topic, 0), 5L)),
+ kafkaServer.consumerProperties(),
+ KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
+ true,
+ null,
+ null,
+ INPUT_FORMAT,
+ null,
+ Duration.standardHours(2).getStandardMinutes()
+ )
+ );
+
+ Injector peonInjector = CliPeonTest.makePeonInjectorWithStubEmitter(task, temporaryFolder, OBJECT_MAPPER);
+ Emitter peonEmitter = peonInjector.getInstance(Emitter.class);
+ Assert.assertTrue(peonEmitter instanceof StubServiceEmitter);
+ emitter = (StubServiceEmitter) peonEmitter;
+ emitter.start();
+ makeToolboxFactory();
+
+ final ListenableFuture future = runTask(task);
+
+ // Wait for task to exit
+ Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
+ Assert.assertTrue(emitter.getNumEmittedEvents() > 0);
+
+ // Check published metadata & segments in deep storage
+ final List publishedDescriptors = publishedDescriptors();
+ Assert.assertEquals(
+ new KafkaDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(new KafkaTopicPartition(false, topic, 0), 5L))),
+ newDataSchemaMetadata()
+ );
+ Assert.assertEquals(ImmutableList.of("c"), readSegmentColumn("dim1", publishedDescriptors.get(0)));
+
+ Assert.assertTrue(emitter.getNumEmittedEvents() > 0);
+ for (Event event : emitter.getEvents()) {
+ if (event instanceof ServiceMetricEvent) {
+ EventMap observedEvent = event.toMap();
+ Assert.assertEquals("test_ds", observedEvent.get("dataSource"));
+ Assert.assertEquals("index_kafka_test_id1", observedEvent.get("id"));
+ Assert.assertEquals("index_kafka_test_id1", observedEvent.get("taskId"));
+ Assert.assertEquals("index_kafka", observedEvent.get("taskType"));
+ Assert.assertEquals("index_kafka_test_ds", observedEvent.get("groupId"));
+ }
+ }
+ }
+
public static class TestKafkaInputFormat implements InputFormat
{
final InputFormat baseInputFormat;
diff --git a/processing/src/main/java/org/apache/druid/java/util/emitter/service/ServiceEmitter.java b/processing/src/main/java/org/apache/druid/java/util/emitter/service/ServiceEmitter.java
index 13593fd146fc..d5e241df5a43 100644
--- a/processing/src/main/java/org/apache/druid/java/util/emitter/service/ServiceEmitter.java
+++ b/processing/src/main/java/org/apache/druid/java/util/emitter/service/ServiceEmitter.java
@@ -25,39 +25,56 @@
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.emitter.core.Emitter;
import org.apache.druid.java.util.emitter.core.Event;
+import org.apache.druid.java.util.metrics.NoopTaskHolder;
+import org.apache.druid.java.util.metrics.TaskHolder;
import java.io.IOException;
public class ServiceEmitter implements Emitter
{
- private final ImmutableMap serviceDimensions;
private final Emitter emitter;
+ private final String service;
+ private final ImmutableMap otherServiceDimensions;
+ private final String host;
+ private final TaskHolder taskHolder;
+
+ /**
+ * This is initialized in {@link #start()} rather than in the constructor, since calling {@link TaskHolder#getMetricDimensions()}
+ * may introduce cyclic dependencies. So we defer initialization until {@link #start()} which is {@link LifecycleStart} managed.
+ */
+ private ImmutableMap serviceDimensions;
public ServiceEmitter(String service, String host, Emitter emitter)
{
- this(service, host, emitter, ImmutableMap.of());
+ this(service, host, emitter, ImmutableMap.of(), new NoopTaskHolder());
}
public ServiceEmitter(
String service,
String host,
Emitter emitter,
- ImmutableMap otherServiceDimensions
+ ImmutableMap otherServiceDimensions,
+ TaskHolder taskHolder
)
{
- this.serviceDimensions = ImmutableMap
- .builder()
- .put("service", Preconditions.checkNotNull(service, "service should be non-null"))
- .put("host", Preconditions.checkNotNull(host, "host should be non-null"))
- .putAll(otherServiceDimensions)
- .build();
+ this.service = Preconditions.checkNotNull(service, "service should be non-null");
+ this.host = Preconditions.checkNotNull(host, "host should be non-null");
+ this.otherServiceDimensions = otherServiceDimensions;
this.emitter = emitter;
+ this.taskHolder = taskHolder;
}
@Override
@LifecycleStart
public void start()
{
+ serviceDimensions = ImmutableMap
+ .builder()
+ .put(Event.SERVICE, service)
+ .put(Event.HOST, host)
+ .putAll(otherServiceDimensions)
+ .putAll(taskHolder.getMetricDimensions())
+ .build();
emitter.start();
}
diff --git a/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupCpuMonitor.java b/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupCpuMonitor.java
index d4589df73c08..1c6dc7abcea0 100644
--- a/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupCpuMonitor.java
+++ b/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupCpuMonitor.java
@@ -40,23 +40,21 @@ public class CgroupCpuMonitor extends FeedDefiningMonitor
private static final Logger LOG = new Logger(CgroupCpuMonitor.class);
private static final Long DEFAULT_USER_HZ = 100L;
final CgroupDiscoverer cgroupDiscoverer;
- final Map dimensions;
private Long userHz;
private final KeyedDiff jiffies = new KeyedDiff();
private long prevJiffiesSnapshotAt = 0;
private final boolean isRunningOnCgroupsV2;
private final CgroupV2CpuMonitor cgroupV2CpuMonitor;
- public CgroupCpuMonitor(CgroupDiscoverer cgroupDiscoverer, final Map dimensions, String feed)
+ public CgroupCpuMonitor(CgroupDiscoverer cgroupDiscoverer, String feed)
{
super(feed);
this.cgroupDiscoverer = cgroupDiscoverer;
- this.dimensions = dimensions;
-
+
// Check if we're running on cgroups v2
this.isRunningOnCgroupsV2 = cgroupDiscoverer.getCgroupVersion().equals(CgroupVersion.V2);
if (isRunningOnCgroupsV2) {
- this.cgroupV2CpuMonitor = new CgroupV2CpuMonitor(cgroupDiscoverer, dimensions, feed);
+ this.cgroupV2CpuMonitor = new CgroupV2CpuMonitor(cgroupDiscoverer, feed);
LOG.info("Detected cgroups v2, using CgroupV2CpuMonitor behavior for accurate metrics");
} else {
this.cgroupV2CpuMonitor = null;
@@ -65,19 +63,14 @@ public CgroupCpuMonitor(CgroupDiscoverer cgroupDiscoverer, final Map dimensions, String feed)
- {
- this(ProcSelfCgroupDiscoverer.autoCgroupDiscoverer(), dimensions, feed);
- }
-
- public CgroupCpuMonitor(final Map dimensions)
+ public CgroupCpuMonitor(String feed)
{
- this(dimensions, DEFAULT_METRICS_FEED);
+ this(ProcSelfCgroupDiscoverer.autoCgroupDiscoverer(), feed);
}
public CgroupCpuMonitor()
{
- this(ImmutableMap.of());
+ this(DEFAULT_METRICS_FEED);
}
@Override
@@ -96,7 +89,6 @@ private boolean doMonitorV1(ServiceEmitter emitter)
long now = Instant.now().getEpochSecond();
final ServiceMetricEvent.Builder builder = builder();
- MonitorUtils.addDimensionsToBuilder(builder, dimensions);
builder.setDimension("cgroupversion", cgroupDiscoverer.getCgroupVersion().name());
emitter.emit(builder.setMetric("cgroup/cpu/shares", cpuSnapshot.getShares()));
diff --git a/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupCpuSetMonitor.java b/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupCpuSetMonitor.java
index 4d4ae2008bb5..90ab47545819 100644
--- a/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupCpuSetMonitor.java
+++ b/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupCpuSetMonitor.java
@@ -19,15 +19,12 @@
package org.apache.druid.java.util.metrics;
-import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.java.util.metrics.cgroups.CgroupDiscoverer;
import org.apache.druid.java.util.metrics.cgroups.CpuSet;
import org.apache.druid.java.util.metrics.cgroups.ProcSelfCgroupDiscoverer;
-import java.util.Map;
-
/**
* Monitor that reports CPU set metrics from cgroups both v1 and v2.
*/
@@ -35,28 +32,21 @@
public class CgroupCpuSetMonitor extends FeedDefiningMonitor
{
final CgroupDiscoverer cgroupDiscoverer;
- final Map dimensions;
- public CgroupCpuSetMonitor(CgroupDiscoverer cgroupDiscoverer, final Map dimensions, String feed)
+ public CgroupCpuSetMonitor(CgroupDiscoverer cgroupDiscoverer, String feed)
{
super(feed);
this.cgroupDiscoverer = cgroupDiscoverer;
- this.dimensions = dimensions;
- }
-
- public CgroupCpuSetMonitor(final Map dimensions, String feed)
- {
- this(ProcSelfCgroupDiscoverer.autoCgroupDiscoverer(), dimensions, feed);
}
- public CgroupCpuSetMonitor(final Map dimensions)
+ public CgroupCpuSetMonitor(String feed)
{
- this(dimensions, DEFAULT_METRICS_FEED);
+ this(ProcSelfCgroupDiscoverer.autoCgroupDiscoverer(), feed);
}
public CgroupCpuSetMonitor()
{
- this(ImmutableMap.of());
+ this(DEFAULT_METRICS_FEED);
}
@Override
@@ -64,7 +54,6 @@ public boolean doMonitor(ServiceEmitter emitter)
{
final CpuSet.CpuSetMetric cpusetSnapshot = cgroupDiscoverer.getCpuSetMetrics();
final ServiceMetricEvent.Builder builder = builder();
- MonitorUtils.addDimensionsToBuilder(builder, dimensions);
builder.setDimension("cgroupversion", cgroupDiscoverer.getCgroupVersion().name());
emitter.emit(builder.setMetric(
"cgroup/cpuset/cpu_count",
diff --git a/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupDiskMonitor.java b/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupDiskMonitor.java
index d120cd8a28bf..28c985c1ffc2 100644
--- a/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupDiskMonitor.java
+++ b/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupDiskMonitor.java
@@ -33,41 +33,34 @@
public class CgroupDiskMonitor extends FeedDefiningMonitor
{
private static final Logger LOG = new Logger(CgroupDiskMonitor.class);
- final CgroupDiscoverer cgroupDiscoverer;
- final Map dimensions;
+ private final CgroupDiscoverer cgroupDiscoverer;
private final KeyedDiff diff = new KeyedDiff();
private final boolean isRunningOnCgroupsV2;
private final CgroupV2DiskMonitor cgroupV2DiskMonitor;
- public CgroupDiskMonitor(CgroupDiscoverer cgroupDiscoverer, final Map dimensions, String feed)
+ public CgroupDiskMonitor(CgroupDiscoverer cgroupDiscoverer, String feed)
{
super(feed);
this.cgroupDiscoverer = cgroupDiscoverer;
- this.dimensions = dimensions;
-
+
// Check if we're running on cgroups v2
this.isRunningOnCgroupsV2 = cgroupDiscoverer.getCgroupVersion().equals(CgroupVersion.V2);
if (isRunningOnCgroupsV2) {
- this.cgroupV2DiskMonitor = new CgroupV2DiskMonitor(cgroupDiscoverer, dimensions, feed);
+ this.cgroupV2DiskMonitor = new CgroupV2DiskMonitor(cgroupDiscoverer, feed);
LOG.info("Detected cgroups v2, using CgroupV2DiskMonitor behavior for accurate metrics");
} else {
this.cgroupV2DiskMonitor = null;
}
}
- public CgroupDiskMonitor(final Map dimensions, String feed)
- {
- this(ProcSelfCgroupDiscoverer.autoCgroupDiscoverer(), dimensions, feed);
- }
-
- public CgroupDiskMonitor(final Map dimensions)
+ public CgroupDiskMonitor(String feed)
{
- this(dimensions, DEFAULT_METRICS_FEED);
+ this(ProcSelfCgroupDiscoverer.autoCgroupDiscoverer(), feed);
}
public CgroupDiskMonitor()
{
- this(ImmutableMap.of());
+ this(DEFAULT_METRICS_FEED);
}
@Override
@@ -97,7 +90,6 @@ private boolean doMonitorV1(ServiceEmitter emitter)
if (stats != null) {
final ServiceMetricEvent.Builder builder = builder()
.setDimension("diskName", entry.getValue().getDiskName());
- MonitorUtils.addDimensionsToBuilder(builder, dimensions);
builder.setDimension("cgroupversion", cgroupDiscoverer.getCgroupVersion());
for (Map.Entry stat : stats.entrySet()) {
emitter.emit(builder.setMetric(stat.getKey(), stat.getValue()));
diff --git a/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupMemoryMonitor.java b/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupMemoryMonitor.java
index 24c2aa35bc59..ecffdfb779e6 100644
--- a/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupMemoryMonitor.java
+++ b/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupMemoryMonitor.java
@@ -19,7 +19,6 @@
package org.apache.druid.java.util.metrics;
-import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
@@ -29,8 +28,6 @@
import org.apache.druid.java.util.metrics.cgroups.Memory;
import org.apache.druid.java.util.metrics.cgroups.ProcSelfCgroupDiscoverer;
-import java.util.Map;
-
public class CgroupMemoryMonitor extends FeedDefiningMonitor
{
private static final Logger LOG = new Logger(CgroupMemoryMonitor.class);
@@ -38,40 +35,33 @@ public class CgroupMemoryMonitor extends FeedDefiningMonitor
private static final String MEMORY_LIMIT_FILE = "memory.limit_in_bytes";
final CgroupDiscoverer cgroupDiscoverer;
- final Map dimensions;
private final boolean isRunningOnCgroupsV2;
private final CgroupV2MemoryMonitor cgroupV2MemoryMonitor;
- public CgroupMemoryMonitor(CgroupDiscoverer cgroupDiscoverer, final Map dimensions, String feed)
+ public CgroupMemoryMonitor(CgroupDiscoverer cgroupDiscoverer, String feed)
{
super(feed);
this.cgroupDiscoverer = cgroupDiscoverer;
- this.dimensions = dimensions;
-
+
// Check if we're running on cgroups v2
this.isRunningOnCgroupsV2 = cgroupDiscoverer.getCgroupVersion().equals(CgroupVersion.V2);
if (isRunningOnCgroupsV2) {
- this.cgroupV2MemoryMonitor = new CgroupV2MemoryMonitor(cgroupDiscoverer, dimensions, feed);
+ this.cgroupV2MemoryMonitor = new CgroupV2MemoryMonitor(cgroupDiscoverer, feed);
LOG.info("Detected cgroups v2, using CgroupV2MemoryMonitor behavior for accurate metrics");
} else {
this.cgroupV2MemoryMonitor = null;
}
}
- public CgroupMemoryMonitor(final Map dimensions, String feed)
- {
- this(ProcSelfCgroupDiscoverer.autoCgroupDiscoverer(), dimensions, feed);
- }
-
- public CgroupMemoryMonitor(final Map dimensions)
+ public CgroupMemoryMonitor(String feed)
{
- this(dimensions, DEFAULT_METRICS_FEED);
+ this(ProcSelfCgroupDiscoverer.autoCgroupDiscoverer(), feed);
}
public CgroupMemoryMonitor()
{
- this(ImmutableMap.of());
+ this(DEFAULT_METRICS_FEED);
}
@Override
@@ -80,7 +70,7 @@ public boolean doMonitor(ServiceEmitter emitter)
if (isRunningOnCgroupsV2) {
return cgroupV2MemoryMonitor.doMonitor(emitter);
} else {
- return parseAndEmit(emitter, cgroupDiscoverer, dimensions, MEMORY_USAGE_FILE, MEMORY_LIMIT_FILE, this);
+ return parseAndEmit(emitter, cgroupDiscoverer, MEMORY_USAGE_FILE, MEMORY_LIMIT_FILE, this);
}
}
@@ -90,7 +80,6 @@ public boolean doMonitor(ServiceEmitter emitter)
public static boolean parseAndEmit(
ServiceEmitter emitter,
CgroupDiscoverer cgroupDiscoverer,
- Map dimensions,
String memoryUsageFile,
String memoryLimitFile,
FeedDefiningMonitor feedDefiningMonitor
@@ -99,7 +88,6 @@ public static boolean parseAndEmit(
final Memory memory = new Memory(cgroupDiscoverer);
final Memory.MemoryStat stat = memory.snapshot(memoryUsageFile, memoryLimitFile);
final ServiceMetricEvent.Builder builder = feedDefiningMonitor.builder();
- MonitorUtils.addDimensionsToBuilder(builder, dimensions);
builder.setDimension("cgroupversion", cgroupDiscoverer.getCgroupVersion().name());
emitter.emit(builder.setMetric("cgroup/memory/usage/bytes", stat.getUsage()));
emitter.emit(builder.setMetric("cgroup/memory/limit/bytes", stat.getLimit()));
diff --git a/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupV2CpuMonitor.java b/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupV2CpuMonitor.java
index 3e9dbb4a3fe5..bf95394485cd 100644
--- a/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupV2CpuMonitor.java
+++ b/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupV2CpuMonitor.java
@@ -39,20 +39,18 @@ public class CgroupV2CpuMonitor extends FeedDefiningMonitor
{
private static final String SNAPSHOT = "snapshot";
final CgroupDiscoverer cgroupDiscoverer;
- final Map dimensions;
private final KeyedDiff diff = new KeyedDiff();
- public CgroupV2CpuMonitor(CgroupDiscoverer cgroupDiscoverer, final Map dimensions, String feed)
+ public CgroupV2CpuMonitor(CgroupDiscoverer cgroupDiscoverer, String feed)
{
super(feed);
this.cgroupDiscoverer = cgroupDiscoverer;
- this.dimensions = dimensions;
}
@VisibleForTesting
CgroupV2CpuMonitor(CgroupDiscoverer cgroupDiscoverer)
{
- this(cgroupDiscoverer, ImmutableMap.of(), DEFAULT_METRICS_FEED);
+ this(cgroupDiscoverer, DEFAULT_METRICS_FEED);
}
CgroupV2CpuMonitor()
@@ -64,7 +62,6 @@ public CgroupV2CpuMonitor(CgroupDiscoverer cgroupDiscoverer, final Map dimensions, String feed)
+ public CgroupV2CpuSetMonitor(CgroupDiscoverer cgroupDiscoverer, String feed)
{
- super(cgroupDiscoverer, dimensions, feed);
+ super(cgroupDiscoverer, feed);
}
@VisibleForTesting
CgroupV2CpuSetMonitor(CgroupDiscoverer cgroupDiscoverer)
{
- this(cgroupDiscoverer, ImmutableMap.of(), DEFAULT_METRICS_FEED);
+ this(cgroupDiscoverer, DEFAULT_METRICS_FEED);
}
CgroupV2CpuSetMonitor()
diff --git a/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupV2DiskMonitor.java b/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupV2DiskMonitor.java
index b35f780851b0..1df7181677b7 100644
--- a/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupV2DiskMonitor.java
+++ b/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupV2DiskMonitor.java
@@ -47,20 +47,18 @@ public class CgroupV2DiskMonitor extends FeedDefiningMonitor
private static final Logger LOG = new Logger(CgroupV2DiskMonitor.class);
private static final String IO_STAT = "io.stat";
final CgroupDiscoverer cgroupDiscoverer;
- final Map dimensions;
private final KeyedDiff diff = new KeyedDiff();
- public CgroupV2DiskMonitor(CgroupDiscoverer cgroupDiscoverer, final Map dimensions, String feed)
+ public CgroupV2DiskMonitor(CgroupDiscoverer cgroupDiscoverer, String feed)
{
super(feed);
this.cgroupDiscoverer = cgroupDiscoverer;
- this.dimensions = dimensions;
}
@VisibleForTesting
CgroupV2DiskMonitor(CgroupDiscoverer cgroupDiscoverer)
{
- this(cgroupDiscoverer, ImmutableMap.of(), DEFAULT_METRICS_FEED);
+ this(cgroupDiscoverer, DEFAULT_METRICS_FEED);
}
CgroupV2DiskMonitor()
@@ -86,7 +84,6 @@ public boolean doMonitor(ServiceEmitter emitter)
if (stats != null) {
final ServiceMetricEvent.Builder builder = builder()
.setDimension("diskName", entry.getDiskName());
- MonitorUtils.addDimensionsToBuilder(builder, dimensions);
builder.setDimension("cgroupversion", cgroupDiscoverer.getCgroupVersion());
for (Map.Entry stat : stats.entrySet()) {
emitter.emit(builder.setMetric(stat.getKey(), stat.getValue()));
diff --git a/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupV2MemoryMonitor.java b/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupV2MemoryMonitor.java
index 43b15cbf728b..8207cc07f8d2 100644
--- a/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupV2MemoryMonitor.java
+++ b/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupV2MemoryMonitor.java
@@ -20,14 +20,11 @@
package org.apache.druid.java.util.metrics;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.metrics.cgroups.CgroupDiscoverer;
import org.apache.druid.java.util.metrics.cgroups.ProcCgroupV2Discoverer;
import org.apache.druid.java.util.metrics.cgroups.ProcSelfCgroupDiscoverer;
-import java.util.Map;
-
/**
* Monitor that reports memory usage stats by reading `memory.*` files reported by cgroupv2
*/
@@ -37,40 +34,31 @@ public class CgroupV2MemoryMonitor extends FeedDefiningMonitor
private static final String MEMORY_USAGE_FILE = "memory.current";
private static final String MEMORY_LIMIT_FILE = "memory.max";
private final CgroupDiscoverer cgroupDiscoverer;
- private final Map dimensions;
@VisibleForTesting
- CgroupV2MemoryMonitor(CgroupDiscoverer cgroupDiscoverer, final Map dimensions, String feed)
+ CgroupV2MemoryMonitor(CgroupDiscoverer cgroupDiscoverer, String feed)
{
super(feed);
this.cgroupDiscoverer = cgroupDiscoverer;
- this.dimensions = dimensions;
}
- public CgroupV2MemoryMonitor(final Map dimensions, String feed)
+ public CgroupV2MemoryMonitor(String feed)
{
- this(new ProcSelfCgroupDiscoverer(ProcCgroupV2Discoverer.class), dimensions, feed);
- }
-
- public CgroupV2MemoryMonitor(final Map dimensions)
- {
- this(dimensions, DEFAULT_METRICS_FEED);
+ this(new ProcSelfCgroupDiscoverer(ProcCgroupV2Discoverer.class), feed);
}
public CgroupV2MemoryMonitor()
{
- this(ImmutableMap.of());
+ this(DEFAULT_METRICS_FEED);
}
-
@Override
public boolean doMonitor(ServiceEmitter emitter)
{
return CgroupMemoryMonitor.parseAndEmit(
emitter,
cgroupDiscoverer,
- dimensions,
MEMORY_USAGE_FILE,
MEMORY_LIMIT_FILE,
this
diff --git a/processing/src/main/java/org/apache/druid/java/util/metrics/CpuAcctDeltaMonitor.java b/processing/src/main/java/org/apache/druid/java/util/metrics/CpuAcctDeltaMonitor.java
index 10d46258556c..7d6364422d87 100644
--- a/processing/src/main/java/org/apache/druid/java/util/metrics/CpuAcctDeltaMonitor.java
+++ b/processing/src/main/java/org/apache/druid/java/util/metrics/CpuAcctDeltaMonitor.java
@@ -20,7 +20,6 @@
package org.apache.druid.java.util.metrics;
import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
@@ -32,7 +31,6 @@
import org.apache.druid.java.util.metrics.cgroups.ProcSelfCgroupDiscoverer;
import org.joda.time.DateTime;
-import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
public class CpuAcctDeltaMonitor extends FeedDefiningMonitor
@@ -44,35 +42,23 @@ public class CpuAcctDeltaMonitor extends FeedDefiningMonitor
CgroupV2CpuMonitor.class.getSimpleName()
);
private final AtomicReference priorSnapshot = new AtomicReference<>(null);
- private final Map dimensions;
private final CgroupDiscoverer cgroupDiscoverer;
private final boolean isRunningOnCgroupsV2;
public CpuAcctDeltaMonitor()
{
- this(ImmutableMap.of());
+ this(DEFAULT_METRICS_FEED);
}
- public CpuAcctDeltaMonitor(final Map dimensions)
+ public CpuAcctDeltaMonitor(final String feed)
{
- this(dimensions, DEFAULT_METRICS_FEED);
+ this(feed, ProcSelfCgroupDiscoverer.autoCgroupDiscoverer());
}
- public CpuAcctDeltaMonitor(final Map dimensions, final String feed)
- {
- this(feed, dimensions, ProcSelfCgroupDiscoverer.autoCgroupDiscoverer());
- }
-
- public CpuAcctDeltaMonitor(
- String feed,
- Map dimensions,
- CgroupDiscoverer cgroupDiscoverer
- )
+ public CpuAcctDeltaMonitor(String feed, CgroupDiscoverer cgroupDiscoverer)
{
super(feed);
- Preconditions.checkNotNull(dimensions);
- this.dimensions = ImmutableMap.copyOf(dimensions);
this.cgroupDiscoverer = Preconditions.checkNotNull(cgroupDiscoverer, "cgroupDiscoverer required");
isRunningOnCgroupsV2 = cgroupDiscoverer.getCgroupVersion().equals(CgroupVersion.V2);
@@ -119,8 +105,6 @@ public boolean doMonitor(ServiceEmitter emitter)
.setDimension("cpuName", Integer.toString(i))
.setDimension("cpuTime", "sys")
.setDimension("cgroupversion", cgroupDiscoverer.getCgroupVersion().name());
- MonitorUtils.addDimensionsToBuilder(builderUsr, dimensions);
- MonitorUtils.addDimensionsToBuilder(builderSys, dimensions);
emitter.emit(builderUsr.setCreatedTime(dateTime).setMetric(
"cgroup/cpu_time_delta_ns",
snapshot.usrTime(i) - priorSnapshotHolder.metric.usrTime(i)
diff --git a/processing/src/main/java/org/apache/druid/java/util/metrics/JvmCpuMonitor.java b/processing/src/main/java/org/apache/druid/java/util/metrics/JvmCpuMonitor.java
index b587d26ee607..606be1daba95 100644
--- a/processing/src/main/java/org/apache/druid/java/util/metrics/JvmCpuMonitor.java
+++ b/processing/src/main/java/org/apache/druid/java/util/metrics/JvmCpuMonitor.java
@@ -19,7 +19,6 @@
package org.apache.druid.java.util.metrics;
-import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
@@ -39,23 +38,14 @@ public class JvmCpuMonitor extends FeedDefiningMonitor
private final KeyedDiff diff = new KeyedDiff();
- private Map dimensions;
-
public JvmCpuMonitor()
{
- this(ImmutableMap.of());
- }
-
- public JvmCpuMonitor(Map dimensions)
- {
- this(dimensions, DEFAULT_METRICS_FEED);
+ this(DEFAULT_METRICS_FEED);
}
- public JvmCpuMonitor(Map dimensions, String feed)
+ public JvmCpuMonitor(String feed)
{
super(feed);
- Preconditions.checkNotNull(dimensions);
- this.dimensions = ImmutableMap.copyOf(dimensions);
}
@Override
@@ -65,7 +55,6 @@ public boolean doMonitor(ServiceEmitter emitter)
try {
ProcCpu procCpu = sigar.getProcCpu(currentProcessId);
final ServiceMetricEvent.Builder builder = builder();
- MonitorUtils.addDimensionsToBuilder(builder, dimensions);
// delta for total, sys, user
Map procDiff = diff.to(
"proc/cpu", ImmutableMap.of(
diff --git a/processing/src/main/java/org/apache/druid/java/util/metrics/JvmMonitor.java b/processing/src/main/java/org/apache/druid/java/util/metrics/JvmMonitor.java
index 2a2bfdeda57f..08a9ab5bfc8e 100644
--- a/processing/src/main/java/org/apache/druid/java/util/metrics/JvmMonitor.java
+++ b/processing/src/main/java/org/apache/druid/java/util/metrics/JvmMonitor.java
@@ -20,7 +20,6 @@
package org.apache.druid.java.util.metrics;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
@@ -45,25 +44,17 @@ public class JvmMonitor extends FeedDefiningMonitor
@VisibleForTesting
@Nullable
final GcCollectors gcCollectors;
- private final Map dimensions;
@Nullable
private final AllocationMetricCollector collector;
public JvmMonitor()
{
- this(ImmutableMap.of());
+ this(DEFAULT_METRICS_FEED);
}
- public JvmMonitor(Map dimensions)
- {
- this(dimensions, DEFAULT_METRICS_FEED);
- }
-
- public JvmMonitor(Map dimensions, String feed)
+ public JvmMonitor(String feed)
{
super(feed);
- Preconditions.checkNotNull(dimensions);
- this.dimensions = ImmutableMap.copyOf(dimensions);
this.collector = AllocationMetricCollectors.getAllocationMetricCollector();
this.gcCollectors = new GcCollectors();
}
@@ -82,7 +73,6 @@ public boolean doMonitor(ServiceEmitter emitter)
private void emitThreadAllocationMetrics(ServiceEmitter emitter)
{
final ServiceMetricEvent.Builder builder = builder();
- MonitorUtils.addDimensionsToBuilder(builder, dimensions);
builder.setDimension(JVM_VERSION, JAVA_VERSION);
if (collector != null) {
long delta = collector.calculateDelta();
@@ -108,7 +98,6 @@ private void emitJvmMemMetrics(ServiceEmitter emitter)
final ServiceMetricEvent.Builder builder = builder()
.setDimension("memKind", kind)
.setDimension(JVM_VERSION, JAVA_VERSION);
- MonitorUtils.addDimensionsToBuilder(builder, dimensions);
emitter.emit(builder.setMetric("jvm/mem/max", usage.getMax()));
emitter.emit(builder.setMetric("jvm/mem/committed", usage.getCommitted()));
@@ -124,7 +113,6 @@ private void emitJvmMemMetrics(ServiceEmitter emitter)
.setDimension("poolKind", kind)
.setDimension("poolName", pool.getName())
.setDimension(JVM_VERSION, JAVA_VERSION);
- MonitorUtils.addDimensionsToBuilder(builder, dimensions);
emitter.emit(builder.setMetric("jvm/pool/max", usage.getMax()));
emitter.emit(builder.setMetric("jvm/pool/committed", usage.getCommitted()));
@@ -139,7 +127,6 @@ private void emitDirectMemMetrics(ServiceEmitter emitter)
final ServiceMetricEvent.Builder builder = builder()
.setDimension("bufferpoolName", pool.getName())
.setDimension(JVM_VERSION, JAVA_VERSION);
- MonitorUtils.addDimensionsToBuilder(builder, dimensions);
emitter.emit(builder.setMetric("jvm/bufferpool/capacity", pool.getTotalCapacity()));
emitter.emit(builder.setMetric("jvm/bufferpool/used", pool.getMemoryUsed()));
@@ -149,7 +136,7 @@ private void emitDirectMemMetrics(ServiceEmitter emitter)
private void emitGcMetrics(ServiceEmitter emitter)
{
- gcCollectors.emit(emitter, dimensions);
+ gcCollectors.emit(emitter);
}
private class GcCollectors
@@ -174,14 +161,14 @@ private class GcCollectors
}
- void emit(ServiceEmitter emitter, Map dimensions)
+ void emit(ServiceEmitter emitter)
{
for (GcGenerationCollector generationCollector : generationCollectors) {
- generationCollector.emit(emitter, dimensions);
+ generationCollector.emit(emitter);
}
for (GcSpaceCollector spaceCollector : spaceCollectors) {
- spaceCollector.emit(emitter, dimensions);
+ spaceCollector.emit(emitter);
}
}
}
@@ -253,11 +240,10 @@ private Pair getReadableName(String name)
}
}
- void emit(ServiceEmitter emitter, Map dimensions)
+ void emit(ServiceEmitter emitter)
{
ImmutableMap.Builder dimensionsCopyBuilder = ImmutableMap
.builder()
- .putAll(dimensions)
.put("gcGen", new String[]{generation});
dimensionsCopyBuilder.put("gcName", new String[]{collectorName});
@@ -289,10 +275,10 @@ public GcSpaceCollector(MemoryUsage collectionUsage, String name)
spaces.add(new GcGenerationSpace(collectionUsage, name));
}
- void emit(ServiceEmitter emitter, Map dimensions)
+ void emit(ServiceEmitter emitter)
{
for (GcGenerationSpace space : spaces) {
- space.emit(emitter, dimensions);
+ space.emit(emitter);
}
}
}
@@ -308,11 +294,9 @@ public GcGenerationSpace(MemoryUsage memoryUsage, String name)
this.name = name;
}
- void emit(ServiceEmitter emitter, Map dimensions)
+ void emit(ServiceEmitter emitter)
{
final ServiceMetricEvent.Builder builder = builder();
- MonitorUtils.addDimensionsToBuilder(builder, dimensions);
-
builder
.setDimension(JVM_VERSION, JAVA_VERSION)
.setDimension("gcGenSpaceName", name);
diff --git a/processing/src/main/java/org/apache/druid/java/util/metrics/JvmThreadsMonitor.java b/processing/src/main/java/org/apache/druid/java/util/metrics/JvmThreadsMonitor.java
index d81fab69429e..b37e38dd04a9 100644
--- a/processing/src/main/java/org/apache/druid/java/util/metrics/JvmThreadsMonitor.java
+++ b/processing/src/main/java/org/apache/druid/java/util/metrics/JvmThreadsMonitor.java
@@ -19,32 +19,25 @@
package org.apache.druid.java.util.metrics;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
-import java.util.Map;
public class JvmThreadsMonitor extends FeedDefiningMonitor
{
- private final Map dimensions;
-
private int lastLiveThreads = 0;
private long lastStartedThreads = 0;
- public JvmThreadsMonitor(Map dimensions)
+ public JvmThreadsMonitor()
{
- this(dimensions, DEFAULT_METRICS_FEED);
+ this(DEFAULT_METRICS_FEED);
}
- public JvmThreadsMonitor(Map dimensions, String feed)
+ public JvmThreadsMonitor(String feed)
{
super(feed);
- Preconditions.checkNotNull(dimensions, "dimensions");
- this.dimensions = ImmutableMap.copyOf(dimensions);
}
@Override
@@ -53,7 +46,6 @@ public boolean doMonitor(ServiceEmitter emitter)
ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
final ServiceMetricEvent.Builder builder = builder();
- MonitorUtils.addDimensionsToBuilder(builder, dimensions);
// Because between next two calls on ThreadMXBean new threads can be started we can observe some inconsistency
// in counters values and finished counter could be even negative
diff --git a/processing/src/main/java/org/apache/druid/java/util/metrics/Monitors.java b/processing/src/main/java/org/apache/druid/java/util/metrics/Monitors.java
index 3893f1a6feb3..7af8d54eed3a 100644
--- a/processing/src/main/java/org/apache/druid/java/util/metrics/Monitors.java
+++ b/processing/src/main/java/org/apache/druid/java/util/metrics/Monitors.java
@@ -20,22 +20,19 @@
package org.apache.druid.java.util.metrics;
import java.util.List;
-import java.util.Map;
public class Monitors
{
/**
* Creates a JVM monitor, configured with the given dimensions, that gathers all currently available JVM-wide
* monitors. Emitted events have default feed {@link FeedDefiningMonitor#DEFAULT_METRICS_FEED}
- * See: {@link Monitors#createCompoundJvmMonitor(Map, String)}
- *
- * @param dimensions common dimensions to configure the JVM monitor with
+ * See: {@link Monitors#createCompoundJvmMonitor(String)}
*
* @return a universally useful JVM-wide monitor
*/
- public static Monitor createCompoundJvmMonitor(Map dimensions)
+ public static Monitor createCompoundJvmMonitor()
{
- return createCompoundJvmMonitor(dimensions, FeedDefiningMonitor.DEFAULT_METRICS_FEED);
+ return createCompoundJvmMonitor(FeedDefiningMonitor.DEFAULT_METRICS_FEED);
}
/**
@@ -43,20 +40,19 @@ public static Monitor createCompoundJvmMonitor(Map dimensions)
* monitors: {@link JvmMonitor}, {@link JvmCpuMonitor} and {@link JvmThreadsMonitor} (this list may
* change in any future release of this library, including a minor release).
*
- * @param dimensions common dimensions to configure the JVM monitor with
* @param feed feed for all emitted events
*
* @return a universally useful JVM-wide monitor
*/
- public static Monitor createCompoundJvmMonitor(Map dimensions, String feed)
+ public static Monitor createCompoundJvmMonitor(String feed)
{
// This list doesn't include SysMonitor because it should probably be run only in one JVM, if several JVMs are
// running on the same instance, so most of the time SysMonitor should be configured/set up differently than
// "simple" JVM monitors, created below.
return and(// Could equally be or(), because all member monitors always return true from their monitor() methods.
- new JvmMonitor(dimensions, feed),
- new JvmCpuMonitor(dimensions, feed),
- new JvmThreadsMonitor(dimensions, feed)
+ new JvmMonitor(feed),
+ new JvmCpuMonitor(feed),
+ new JvmThreadsMonitor(feed)
);
}
diff --git a/processing/src/main/java/org/apache/druid/java/util/metrics/NoopOshiSysMonitor.java b/processing/src/main/java/org/apache/druid/java/util/metrics/NoopOshiSysMonitor.java
index 9a693f7bfdf4..8d66ee21b391 100644
--- a/processing/src/main/java/org/apache/druid/java/util/metrics/NoopOshiSysMonitor.java
+++ b/processing/src/main/java/org/apache/druid/java/util/metrics/NoopOshiSysMonitor.java
@@ -20,14 +20,13 @@
package org.apache.druid.java.util.metrics;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
public class NoopOshiSysMonitor extends OshiSysMonitor
{
public NoopOshiSysMonitor()
{
- super(ImmutableMap.of(), new OshiSysMonitorConfig(ImmutableList.of()));
+ super(new OshiSysMonitorConfig(ImmutableList.of()));
}
@Override
diff --git a/server/src/main/java/org/apache/druid/server/metrics/NoopTaskHolder.java b/processing/src/main/java/org/apache/druid/java/util/metrics/NoopTaskHolder.java
similarity index 79%
rename from server/src/main/java/org/apache/druid/server/metrics/NoopTaskHolder.java
rename to processing/src/main/java/org/apache/druid/java/util/metrics/NoopTaskHolder.java
index d870e4869e7b..5cfe957891da 100644
--- a/server/src/main/java/org/apache/druid/server/metrics/NoopTaskHolder.java
+++ b/processing/src/main/java/org/apache/druid/java/util/metrics/NoopTaskHolder.java
@@ -17,9 +17,10 @@
* under the License.
*/
-package org.apache.druid.server.metrics;
+package org.apache.druid.java.util.metrics;
import javax.annotation.Nullable;
+import java.util.Map;
/**
* A TaskHolder implementation for all servers that are not {@code CliPeon}.
@@ -42,4 +43,24 @@ public String getTaskId()
{
return null;
}
+
+ @Nullable
+ @Override
+ public String getTaskType()
+ {
+ return null;
+ }
+
+ @Nullable
+ @Override
+ public String getGroupId()
+ {
+ return null;
+ }
+
+ @Override
+ public Map getMetricDimensions()
+ {
+ return Map.of();
+ }
}
diff --git a/processing/src/main/java/org/apache/druid/java/util/metrics/OshiSysMonitor.java b/processing/src/main/java/org/apache/druid/java/util/metrics/OshiSysMonitor.java
index 130c353c8ad7..f60ae8e963c2 100644
--- a/processing/src/main/java/org/apache/druid/java/util/metrics/OshiSysMonitor.java
+++ b/processing/src/main/java/org/apache/druid/java/util/metrics/OshiSysMonitor.java
@@ -20,7 +20,6 @@
package org.apache.druid.java.util.metrics;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
@@ -73,7 +72,6 @@ public class OshiSysMonitor extends FeedDefiningMonitor
private final SysStats sysStats;
private final TcpStats tcpStats;
- private final Map dimensions;
private final OshiSysMonitorConfig config;
private final Map> monitoringFunctions = ImmutableMap.of(
"mem", this::monitorMemStats,
@@ -86,18 +84,16 @@ public class OshiSysMonitor extends FeedDefiningMonitor
"tcp", this::monitorTcpStats
);
- public OshiSysMonitor(Map dimensions, OshiSysMonitorConfig config)
+ public OshiSysMonitor(OshiSysMonitorConfig config)
{
- this(dimensions, config, new SystemInfo());
+ this(config, new SystemInfo());
}
// Create an object with mocked systemInfo for testing purposes
@VisibleForTesting
- public OshiSysMonitor(Map dimensions, OshiSysMonitorConfig config, SystemInfo systemInfo)
+ public OshiSysMonitor(OshiSysMonitorConfig config, SystemInfo systemInfo)
{
super(DEFAULT_METRICS_FEED);
- Preconditions.checkNotNull(dimensions);
- this.dimensions = ImmutableMap.copyOf(dimensions);
this.config = config;
this.si = systemInfo;
@@ -188,7 +184,6 @@ public void emit(ServiceEmitter emitter)
mem.getAvailable()
);
final ServiceMetricEvent.Builder builder = builder();
- MonitorUtils.addDimensionsToBuilder(builder, dimensions);
for (Map.Entry entry : stats.entrySet()) {
emitter.emit(builder.setMetric(entry.getKey(), entry.getValue()));
}
@@ -216,7 +211,6 @@ public void emit(ServiceEmitter emitter)
);
final ServiceMetricEvent.Builder builder = builder();
- MonitorUtils.addDimensionsToBuilder(builder, dimensions);
for (Map.Entry entry : stats.entrySet()) {
emitter.emit(builder.setMetric(entry.getKey(), entry.getValue()));
}
@@ -243,7 +237,6 @@ public void emit(ServiceEmitter emitter)
final ServiceMetricEvent.Builder builder = builder()
.setDimension("fsDevName", fs.getVolume())
.setDimension("fsDirName", fs.getMount());
- MonitorUtils.addDimensionsToBuilder(builder, dimensions);
for (Map.Entry entry : stats.entrySet()) {
emitter.emit(builder.setMetric(entry.getKey(), entry.getValue()));
}
@@ -277,7 +270,6 @@ public void emit(ServiceEmitter emitter)
if (stats != null) {
final ServiceMetricEvent.Builder builder = builder()
.setDimension("diskName", disk.getName());
- MonitorUtils.addDimensionsToBuilder(builder, dimensions);
for (Map.Entry entry : stats.entrySet()) {
emitter.emit(builder.setMetric(entry.getKey(), entry.getValue()));
}
@@ -319,7 +311,6 @@ public void emit(ServiceEmitter emitter)
.setDimension("netName", net.getName())
.setDimension("netAddress", addr)
.setDimension("netHwaddr", net.getMacaddr());
- MonitorUtils.addDimensionsToBuilder(builder, dimensions);
for (Map.Entry entry : stats.entrySet()) {
emitter.emit(builder.setMetric(entry.getKey(), entry.getValue()));
}
@@ -371,7 +362,6 @@ public void emit(ServiceEmitter emitter)
final ServiceMetricEvent.Builder builder = builder()
.setDimension("cpuName", name)
.setDimension("cpuTime", entry.getKey());
- MonitorUtils.addDimensionsToBuilder(builder, dimensions);
if (total != 0) {
// prevent divide by 0 exception and don't emit such events
emitter.emit(builder.setMetric("sys/cpu", entry.getValue() * 100 / total)); // [0,100]
@@ -389,8 +379,6 @@ private class SysStats
public void emit(ServiceEmitter emitter)
{
final ServiceMetricEvent.Builder builder = builder();
- MonitorUtils.addDimensionsToBuilder(builder, dimensions);
-
long uptime = os.getSystemUptime();
final Map stats = ImmutableMap.of(
@@ -422,7 +410,6 @@ private class TcpStats
public void emit(ServiceEmitter emitter)
{
final ServiceMetricEvent.Builder builder = builder();
- MonitorUtils.addDimensionsToBuilder(builder, dimensions);
InternetProtocolStats ipstats = os.getInternetProtocolStats();
InternetProtocolStats.TcpStats tcpv4 = ipstats.getTCPv4Stats();
diff --git a/processing/src/main/java/org/apache/druid/java/util/metrics/SysMonitor.java b/processing/src/main/java/org/apache/druid/java/util/metrics/SysMonitor.java
index 6082fb261e38..031574d5371b 100644
--- a/processing/src/main/java/org/apache/druid/java/util/metrics/SysMonitor.java
+++ b/processing/src/main/java/org/apache/druid/java/util/metrics/SysMonitor.java
@@ -20,7 +20,6 @@
package org.apache.druid.java.util.metrics;
import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.logger.Logger;
@@ -65,23 +64,14 @@ public class SysMonitor extends FeedDefiningMonitor
private final List statsList;
- private Map dimensions;
-
public SysMonitor()
{
- this(ImmutableMap.of());
- }
-
- public SysMonitor(Map dimensions)
- {
- this(dimensions, DEFAULT_METRICS_FEED);
+ this(DEFAULT_METRICS_FEED);
}
- public SysMonitor(Map dimensions, String feed)
+ public SysMonitor(String feed)
{
super(feed);
- Preconditions.checkNotNull(dimensions);
- this.dimensions = ImmutableMap.copyOf(dimensions);
sigar.enableLogging(true);
@@ -142,7 +132,6 @@ public void emit(ServiceEmitter emitter)
"sys/mem/actual/free", mem.getActualFree()
);
final ServiceMetricEvent.Builder builder = builder();
- MonitorUtils.addDimensionsToBuilder(builder, dimensions);
for (Map.Entry entry : stats.entrySet()) {
emitter.emit(builder.setMetric(entry.getKey(), entry.getValue()));
}
@@ -193,7 +182,6 @@ public void emit(ServiceEmitter emitter)
);
final ServiceMetricEvent.Builder builder = builder();
- MonitorUtils.addDimensionsToBuilder(builder, dimensions);
for (Map.Entry entry : stats.entrySet()) {
emitter.emit(builder.setMetric(entry.getKey(), entry.getValue()));
}
@@ -233,7 +221,6 @@ public void emit(ServiceEmitter emitter)
);
final ServiceMetricEvent.Builder builder = builder()
.setDimension("fsDirName", dir); // fsDirName because FsStats uses fsDirName
- MonitorUtils.addDimensionsToBuilder(builder, dimensions);
for (Map.Entry entry : stats.entrySet()) {
emitter.emit(builder.setMetric(entry.getKey(), entry.getValue()));
}
@@ -279,7 +266,6 @@ public void emit(ServiceEmitter emitter)
.setDimension("fsTypeName", fs.getTypeName())
.setDimension("fsSysTypeName", fs.getSysTypeName())
.setDimension("fsOptions", fs.getOptions().split(","));
- MonitorUtils.addDimensionsToBuilder(builder, dimensions);
for (Map.Entry entry : stats.entrySet()) {
emitter.emit(builder.setMetric(entry.getKey(), entry.getValue()));
}
@@ -340,7 +326,6 @@ public void emit(ServiceEmitter emitter)
.setDimension("fsTypeName", fs.getTypeName())
.setDimension("fsSysTypeName", fs.getSysTypeName())
.setDimension("fsOptions", fs.getOptions().split(","));
- MonitorUtils.addDimensionsToBuilder(builder, dimensions);
for (Map.Entry entry : stats.entrySet()) {
emitter.emit(builder.setMetric(entry.getKey(), entry.getValue()));
}
@@ -410,7 +395,6 @@ public void emit(ServiceEmitter emitter)
.setDimension("netName", netconf.getName())
.setDimension("netAddress", netconf.getAddress())
.setDimension("netHwaddr", netconf.getHwaddr());
- MonitorUtils.addDimensionsToBuilder(builder, dimensions);
for (Map.Entry entry : stats.entrySet()) {
emitter.emit(builder.setMetric(entry.getKey(), entry.getValue()));
}
@@ -463,7 +447,6 @@ public void emit(ServiceEmitter emitter)
final ServiceMetricEvent.Builder builder = builder()
.setDimension("cpuName", name)
.setDimension("cpuTime", entry.getKey());
- MonitorUtils.addDimensionsToBuilder(builder, dimensions);
emitter.emit(builder.setMetric("sys/cpu", entry.getValue() * 100 / total)); // [0,100]
}
}
@@ -478,7 +461,6 @@ private class SysStats implements Stats
public void emit(ServiceEmitter emitter)
{
final ServiceMetricEvent.Builder builder = builder();
- MonitorUtils.addDimensionsToBuilder(builder, dimensions);
Uptime uptime = null;
try {
@@ -526,7 +508,6 @@ private class TcpStats implements Stats
public void emit(ServiceEmitter emitter)
{
final ServiceMetricEvent.Builder builder = builder();
- MonitorUtils.addDimensionsToBuilder(builder, dimensions);
Tcp tcp = null;
try {
diff --git a/server/src/main/java/org/apache/druid/server/metrics/TaskHolder.java b/processing/src/main/java/org/apache/druid/java/util/metrics/TaskHolder.java
similarity index 70%
rename from server/src/main/java/org/apache/druid/server/metrics/TaskHolder.java
rename to processing/src/main/java/org/apache/druid/java/util/metrics/TaskHolder.java
index 5f6bb4e07212..9f0f11077bc7 100644
--- a/server/src/main/java/org/apache/druid/server/metrics/TaskHolder.java
+++ b/processing/src/main/java/org/apache/druid/java/util/metrics/TaskHolder.java
@@ -17,9 +17,10 @@
* under the License.
*/
-package org.apache.druid.server.metrics;
+package org.apache.druid.java.util.metrics;
import javax.annotation.Nullable;
+import java.util.Map;
/**
* Provides identifying information for a task. Implementations return {@code null}
@@ -38,4 +39,21 @@ public interface TaskHolder
*/
@Nullable
String getTaskId();
+
+ /**
+ * @return the type name of this task, or {@code null} if called from a server that is not {@code CliPeon}.
+ */
+ @Nullable
+ String getTaskType();
+
+ /**
+ * @return the group ID of this task, or {@code null} if called from a server that is not {@code CliPeon}.
+ */
+ @Nullable
+ String getGroupId();
+
+ /**
+ * @return a map of task holder dimensions, or an empty map if called from a server that is not {@code CliPeon}.
+ */
+ Map getMetricDimensions();
}
diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupCpuMonitorTest.java b/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupCpuMonitorTest.java
index 990addd6d82e..b8f4bd288828 100644
--- a/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupCpuMonitorTest.java
+++ b/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupCpuMonitorTest.java
@@ -19,7 +19,6 @@
package org.apache.druid.java.util.metrics;
-import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.emitter.core.Event;
@@ -76,8 +75,8 @@ public void setUp() throws IOException
@Test
public void testMonitor() throws IOException, InterruptedException
{
- final CgroupCpuMonitor monitor = new CgroupCpuMonitor(discoverer, ImmutableMap.of(), "some_feed");
- final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
+ final CgroupCpuMonitor monitor = new CgroupCpuMonitor(discoverer, "some_feed");
+ final StubServiceEmitter emitter = StubServiceEmitter.createStarted();
Assert.assertTrue(monitor.doMonitor(emitter));
final List actualEvents = emitter.getEvents();
Assert.assertEquals(2, actualEvents.size());
@@ -131,9 +130,9 @@ public void testCgroupsV2Detection() throws IOException, URISyntaxException
CgroupDiscoverer v2Discoverer = ProcSelfCgroupDiscoverer.autoCgroupDiscoverer(procV2Dir.toPath());
// Constructor should detect v2 and log warning
- CgroupCpuMonitor monitor = new CgroupCpuMonitor(v2Discoverer, ImmutableMap.of(), "test-feed");
+ CgroupCpuMonitor monitor = new CgroupCpuMonitor(v2Discoverer, "test-feed");
- final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
+ final StubServiceEmitter emitter = StubServiceEmitter.createStarted();
// doMonitor should return true
Assert.assertTrue(monitor.doMonitor(emitter));
@@ -147,8 +146,8 @@ public void testCgroupsV1MonitoringContinuesNormally() throws IOException, Inter
{
// This test verifies that the existing v1 monitoring continues to work
// after the v2 detection changes
- final CgroupCpuMonitor monitor = new CgroupCpuMonitor(discoverer, ImmutableMap.of(), "some_feed");
- final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
+ final CgroupCpuMonitor monitor = new CgroupCpuMonitor(discoverer, "some_feed");
+ final StubServiceEmitter emitter = StubServiceEmitter.createStarted();
Assert.assertTrue(monitor.doMonitor(emitter));
final List actualEvents = emitter.getEvents();
diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupCpuSetMonitorTest.java b/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupCpuSetMonitorTest.java
index 71ea59ef5fa6..db17161869d1 100644
--- a/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupCpuSetMonitorTest.java
+++ b/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupCpuSetMonitorTest.java
@@ -19,7 +19,6 @@
package org.apache.druid.java.util.metrics;
-import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.metrics.cgroups.CgroupDiscoverer;
import org.apache.druid.java.util.metrics.cgroups.CgroupVersion;
@@ -71,8 +70,8 @@ public void setUp() throws IOException
@Test
public void testMonitor()
{
- final CgroupCpuSetMonitor monitor = new CgroupCpuSetMonitor(discoverer, ImmutableMap.of(), "some_feed");
- final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
+ final CgroupCpuSetMonitor monitor = new CgroupCpuSetMonitor(discoverer, "some_feed");
+ final StubServiceEmitter emitter = StubServiceEmitter.createStarted();
Assert.assertTrue(monitor.doMonitor(emitter));
Assert.assertEquals(4, emitter.getNumEmittedEvents());
@@ -101,9 +100,9 @@ public void testCgroupsV2DetectionInConstructor() throws IOException
Assert.assertEquals(CgroupVersion.V2, v2Discoverer.getCgroupVersion());
// Constructor should detect v2 and log warning
- CgroupCpuSetMonitor monitor = new CgroupCpuSetMonitor(v2Discoverer, ImmutableMap.of(), "test-feed");
+ CgroupCpuSetMonitor monitor = new CgroupCpuSetMonitor(v2Discoverer, "test-feed");
- final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
+ final StubServiceEmitter emitter = StubServiceEmitter.createStarted();
// doMonitor should return true but skip actual monitoring
Assert.assertTrue(monitor.doMonitor(emitter));
diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupDiskMonitorTest.java b/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupDiskMonitorTest.java
index d75af320f125..ad07a35a6002 100644
--- a/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupDiskMonitorTest.java
+++ b/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupDiskMonitorTest.java
@@ -19,7 +19,6 @@
package org.apache.druid.java.util.metrics;
-import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.metrics.cgroups.CgroupDiscoverer;
import org.apache.druid.java.util.metrics.cgroups.ProcCgroupDiscoverer;
@@ -65,8 +64,8 @@ public void setUp() throws IOException
@Test
public void testMonitor() throws IOException
{
- final CgroupDiskMonitor monitor = new CgroupDiskMonitor(discoverer, ImmutableMap.of(), "some_feed");
- final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
+ final CgroupDiskMonitor monitor = new CgroupDiskMonitor(discoverer, "some_feed");
+ final StubServiceEmitter emitter = StubServiceEmitter.createStarted();
Assert.assertTrue(monitor.doMonitor(emitter));
Assert.assertEquals(0, emitter.getNumEmittedEvents());
diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupMemoryMonitorTest.java b/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupMemoryMonitorTest.java
index 6538cb9691a0..8daad7e543fb 100644
--- a/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupMemoryMonitorTest.java
+++ b/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupMemoryMonitorTest.java
@@ -19,7 +19,6 @@
package org.apache.druid.java.util.metrics;
-import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.metrics.cgroups.CgroupDiscoverer;
import org.apache.druid.java.util.metrics.cgroups.ProcCgroupDiscoverer;
@@ -66,7 +65,7 @@ public void setUp() throws IOException
@Test
public void testMonitor()
{
- final CgroupMemoryMonitor monitor = new CgroupMemoryMonitor(discoverer, ImmutableMap.of(), "some_feed");
+ final CgroupMemoryMonitor monitor = new CgroupMemoryMonitor(discoverer, "some_feed");
final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
Assert.assertTrue(monitor.doMonitor(emitter));
Assert.assertEquals(46, emitter.getNumEmittedEvents());
diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupV2CpuMonitorTest.java b/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupV2CpuMonitorTest.java
index 9a32dbbf86aa..48544d41a021 100644
--- a/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupV2CpuMonitorTest.java
+++ b/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupV2CpuMonitorTest.java
@@ -61,7 +61,7 @@ public void setUp() throws IOException
public void testMonitor() throws IOException, InterruptedException
{
final CgroupV2CpuMonitor monitor = new CgroupV2CpuMonitor(discoverer);
- final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
+ final StubServiceEmitter emitter = StubServiceEmitter.createStarted();
Assert.assertTrue(monitor.doMonitor(emitter));
Assert.assertEquals(2, emitter.getNumEmittedEvents());
diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupV2DiskMonitorTest.java b/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupV2DiskMonitorTest.java
index f9a977edd92e..119937e47a93 100644
--- a/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupV2DiskMonitorTest.java
+++ b/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupV2DiskMonitorTest.java
@@ -59,7 +59,7 @@ public void setUp() throws IOException
public void testMonitor() throws IOException
{
final CgroupV2DiskMonitor monitor = new CgroupV2DiskMonitor(discoverer);
- final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
+ final StubServiceEmitter emitter = StubServiceEmitter.createStarted();
Assert.assertTrue(monitor.doMonitor(emitter));
Assert.assertEquals(0, emitter.getNumEmittedEvents());
diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupV2MemoryMonitorTest.java b/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupV2MemoryMonitorTest.java
index eb8f57a9c466..082f89b0acd8 100644
--- a/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupV2MemoryMonitorTest.java
+++ b/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupV2MemoryMonitorTest.java
@@ -19,7 +19,6 @@
package org.apache.druid.java.util.metrics;
-import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.metrics.cgroups.CgroupDiscoverer;
import org.apache.druid.java.util.metrics.cgroups.ProcCgroupV2Discoverer;
@@ -65,7 +64,6 @@ public void testMonitor()
{
final CgroupV2MemoryMonitor monitor = new CgroupV2MemoryMonitor(
discoverer,
- ImmutableMap.of(),
FeedDefiningMonitor.DEFAULT_METRICS_FEED
);
final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/CpuAcctDeltaMonitorTest.java b/processing/src/test/java/org/apache/druid/java/util/metrics/CpuAcctDeltaMonitorTest.java
index fd7a0d0ef54b..c33b7b789f9a 100644
--- a/processing/src/test/java/org/apache/druid/java/util/metrics/CpuAcctDeltaMonitorTest.java
+++ b/processing/src/test/java/org/apache/druid/java/util/metrics/CpuAcctDeltaMonitorTest.java
@@ -19,7 +19,6 @@
package org.apache.druid.java.util.metrics;
-import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.metrics.cgroups.CgroupDiscoverer;
@@ -69,7 +68,6 @@ public void testMonitorWontCrash()
{
final CpuAcctDeltaMonitor monitor = new CpuAcctDeltaMonitor(
"some_feed",
- ImmutableMap.of(),
TestUtils.exceptionThrowingDiscoverer()
);
final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
@@ -91,7 +89,6 @@ public void testSimpleMonitor() throws Exception
}
final CpuAcctDeltaMonitor monitor = new CpuAcctDeltaMonitor(
"some_feed",
- ImmutableMap.of(),
new CgroupDiscoverer()
{
@Override
diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/MonitorsTest.java b/processing/src/test/java/org/apache/druid/java/util/metrics/MonitorsTest.java
index 6f58fa47d454..4e00093cfb27 100644
--- a/processing/src/test/java/org/apache/druid/java/util/metrics/MonitorsTest.java
+++ b/processing/src/test/java/org/apache/druid/java/util/metrics/MonitorsTest.java
@@ -19,7 +19,6 @@
package org.apache.druid.java.util.metrics;
-import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.core.Event;
import org.junit.Assert;
@@ -45,7 +44,7 @@ public void testSetFeed()
{
String feed = "testFeed";
StubServiceEmitter emitter = new StubServiceEmitter("dev/monitor-test", "localhost:0000");
- Monitor m = Monitors.createCompoundJvmMonitor(ImmutableMap.of(), feed);
+ Monitor m = Monitors.createCompoundJvmMonitor(feed);
m.start();
m.monitor(emitter);
m.stop();
@@ -56,7 +55,7 @@ public void testSetFeed()
public void testDefaultFeed()
{
StubServiceEmitter emitter = new StubServiceEmitter("dev/monitor-test", "localhost:0000");
- Monitor m = Monitors.createCompoundJvmMonitor(ImmutableMap.of());
+ Monitor m = Monitors.createCompoundJvmMonitor();
m.start();
m.monitor(emitter);
m.stop();
diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/OshiSysMonitorTest.java b/processing/src/test/java/org/apache/druid/java/util/metrics/OshiSysMonitorTest.java
index d6c982169fe5..bfc28f99ee9f 100644
--- a/processing/src/test/java/org/apache/druid/java/util/metrics/OshiSysMonitorTest.java
+++ b/processing/src/test/java/org/apache/druid/java/util/metrics/OshiSysMonitorTest.java
@@ -647,6 +647,6 @@ private OshiSysMonitor createMonitor(SystemInfo si)
private OshiSysMonitor createMonitor(SystemInfo si, List categories)
{
- return new OshiSysMonitor(ImmutableMap.of(), new OshiSysMonitorConfig(categories), si);
+ return new OshiSysMonitor(new OshiSysMonitorConfig(categories), si);
}
}
diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java b/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java
index 8d1146441319..4bcdd58f182c 100644
--- a/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java
+++ b/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java
@@ -19,12 +19,15 @@
package org.apache.druid.java.util.metrics;
+import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.emitter.core.Event;
+import org.apache.druid.java.util.emitter.core.NoopEmitter;
import org.apache.druid.java.util.emitter.service.AlertEvent;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import javax.annotation.Nullable;
+import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
@@ -41,18 +44,32 @@
*/
public class StubServiceEmitter extends ServiceEmitter implements MetricsVerifier
{
+ public static final String TYPE = "stub";
+
private final Deque events = new ConcurrentLinkedDeque<>();
private final Deque alertEvents = new ConcurrentLinkedDeque<>();
private final ConcurrentHashMap> metricEvents = new ConcurrentHashMap<>();
public StubServiceEmitter()
{
- super("testing", "localhost", null);
+ this("testing", "localhost");
}
public StubServiceEmitter(String service, String host)
{
- super(service, host, null);
+ this(service, host, new NoopTaskHolder());
+ }
+
+ public StubServiceEmitter(String service, String host, TaskHolder taskHolder)
+ {
+ super(service, host, new NoopEmitter(), ImmutableMap.of(), taskHolder);
+ }
+
+ public static StubServiceEmitter createStarted()
+ {
+ final StubServiceEmitter stubServiceEmitter = new StubServiceEmitter();
+ stubServiceEmitter.start();
+ return stubServiceEmitter;
}
@Override
@@ -103,7 +120,7 @@ public List getAlerts()
@Override
public List getMetricValues(
String metricName,
- Map dimensionFilters
+ @Nullable Map dimensionFilters
)
{
final List values = new ArrayList<>();
@@ -160,6 +177,7 @@ public Number getLatestMetricEventValue(String metricName, Number defaultValue)
@Override
public void start()
{
+ super.start();
}
@Override
@@ -173,5 +191,11 @@ public void flush()
@Override
public void close()
{
+ try {
+ super.close();
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e);
+ }
}
}
diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitterModule.java b/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitterModule.java
new file mode 100644
index 000000000000..cb244b19174b
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitterModule.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.java.util.metrics;
+
+import com.google.inject.Binder;
+import com.google.inject.Module;
+import com.google.inject.Provides;
+import com.google.inject.name.Named;
+import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.java.util.emitter.core.Emitter;
+
+public class StubServiceEmitterModule implements Module
+{
+ @Override
+ public void configure(Binder binder)
+ {
+ }
+
+ @Provides
+ @ManageLifecycle
+ @Named(StubServiceEmitter.TYPE)
+ public Emitter getEmitter(TaskHolder taskHolder)
+ {
+ return new StubServiceEmitter("test", "host", taskHolder);
+ }
+}
diff --git a/processing/src/test/java/org/apache/druid/query/CPUTimeMetricQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/CPUTimeMetricQueryRunnerTest.java
index 32d9852d0c1b..b55f19c62be3 100644
--- a/processing/src/test/java/org/apache/druid/query/CPUTimeMetricQueryRunnerTest.java
+++ b/processing/src/test/java/org/apache/druid/query/CPUTimeMetricQueryRunnerTest.java
@@ -41,7 +41,7 @@ public class CPUTimeMetricQueryRunnerTest
@Test
public void testCpuTimeMetric()
{
- final StubServiceEmitter emitter = new StubServiceEmitter("s", "h");
+ final StubServiceEmitter emitter = StubServiceEmitter.createStarted();
final AtomicLong accumulator = new AtomicLong();
final List> expectedResults = Collections.singletonList(
diff --git a/processing/src/test/java/org/apache/druid/query/DefaultQueryMetricsTest.java b/processing/src/test/java/org/apache/druid/query/DefaultQueryMetricsTest.java
index ee82f206f79c..8cea00215e05 100644
--- a/processing/src/test/java/org/apache/druid/query/DefaultQueryMetricsTest.java
+++ b/processing/src/test/java/org/apache/druid/query/DefaultQueryMetricsTest.java
@@ -48,7 +48,7 @@ public class DefaultQueryMetricsTest extends InitializedNullHandlingTest
@Test
public void testDefaultQueryMetricsQuery()
{
- final StubServiceEmitter serviceEmitter = new StubServiceEmitter("", "");
+ final StubServiceEmitter serviceEmitter = StubServiceEmitter.createStarted();
DefaultQueryMetrics> queryMetrics = new DefaultQueryMetrics<>();
TopNQuery query = new TopNQueryBuilder()
.dataSource("xx")
@@ -75,8 +75,8 @@ public void testDefaultQueryMetricsQuery()
Assert.assertEquals(13, actualEvent.size());
Assert.assertTrue(actualEvent.containsKey("feed"));
Assert.assertTrue(actualEvent.containsKey("timestamp"));
- Assert.assertEquals("", actualEvent.get("host"));
- Assert.assertEquals("", actualEvent.get("service"));
+ Assert.assertEquals("localhost", actualEvent.get("host"));
+ Assert.assertEquals("testing", actualEvent.get("service"));
Assert.assertEquals("xx", actualEvent.get(DruidMetrics.DATASOURCE));
Assert.assertEquals(query.getType(), actualEvent.get(DruidMetrics.TYPE));
List expectedIntervals = QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC.getIntervals();
@@ -144,7 +144,7 @@ public static void testQueryMetricsDefaultMetricNamesAndUnits(
@Test
public void testVectorizedDimensionInMetrics()
{
- final StubServiceEmitter serviceEmitter = new StubServiceEmitter("", "");
+ final StubServiceEmitter serviceEmitter = StubServiceEmitter.createStarted();
DefaultQueryMetrics> queryMetrics = new DefaultQueryMetrics<>();
queryMetrics.vectorized(true);
queryMetrics.reportSegmentTime(0).emit(serviceEmitter);
@@ -152,8 +152,8 @@ public void testVectorizedDimensionInMetrics()
Assert.assertEquals(7, actualEvent.size());
Assert.assertTrue(actualEvent.containsKey("feed"));
Assert.assertTrue(actualEvent.containsKey("timestamp"));
- Assert.assertEquals("", actualEvent.get("host"));
- Assert.assertEquals("", actualEvent.get("service"));
+ Assert.assertEquals("localhost", actualEvent.get("host"));
+ Assert.assertEquals("testing", actualEvent.get("service"));
Assert.assertEquals("query/segment/time", actualEvent.get("metric"));
Assert.assertEquals(0L, actualEvent.get("value"));
Assert.assertEquals(true, actualEvent.get("vectorized"));
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/DefaultGroupByQueryMetricsTest.java b/processing/src/test/java/org/apache/druid/query/groupby/DefaultGroupByQueryMetricsTest.java
index 80c0ad405883..66f89a92cec6 100644
--- a/processing/src/test/java/org/apache/druid/query/groupby/DefaultGroupByQueryMetricsTest.java
+++ b/processing/src/test/java/org/apache/druid/query/groupby/DefaultGroupByQueryMetricsTest.java
@@ -51,7 +51,7 @@ public class DefaultGroupByQueryMetricsTest extends InitializedNullHandlingTest
@Test
public void testDefaultGroupByQueryMetricsQuery()
{
- final StubServiceEmitter serviceEmitter = new StubServiceEmitter("", "");
+ final StubServiceEmitter serviceEmitter = StubServiceEmitter.createStarted();
DefaultGroupByQueryMetrics queryMetrics = new DefaultGroupByQueryMetrics();
GroupByQuery.Builder builder = GroupByQuery
.builder()
@@ -78,8 +78,8 @@ public void testDefaultGroupByQueryMetricsQuery()
Assert.assertEquals(16, actualEvent.size());
Assert.assertTrue(actualEvent.containsKey("feed"));
Assert.assertTrue(actualEvent.containsKey("timestamp"));
- Assert.assertEquals("", actualEvent.get("host"));
- Assert.assertEquals("", actualEvent.get("service"));
+ Assert.assertEquals("localhost", actualEvent.get("host"));
+ Assert.assertEquals("testing", actualEvent.get("service"));
Assert.assertEquals(QueryRunnerTestHelper.DATA_SOURCE, actualEvent.get(DruidMetrics.DATASOURCE));
Assert.assertEquals(query.getType(), actualEvent.get(DruidMetrics.TYPE));
Interval expectedInterval = Intervals.of("2011-04-02/2011-04-04");
diff --git a/processing/src/test/java/org/apache/druid/query/search/DefaultSearchQueryMetricsTest.java b/processing/src/test/java/org/apache/druid/query/search/DefaultSearchQueryMetricsTest.java
index f29f93ff8624..8f613b71a9ed 100644
--- a/processing/src/test/java/org/apache/druid/query/search/DefaultSearchQueryMetricsTest.java
+++ b/processing/src/test/java/org/apache/druid/query/search/DefaultSearchQueryMetricsTest.java
@@ -47,7 +47,7 @@ public class DefaultSearchQueryMetricsTest extends InitializedNullHandlingTest
@Test
public void testDefaultSearchQueryMetricsQuery()
{
- final StubServiceEmitter serviceEmitter = new StubServiceEmitter("", "");
+ final StubServiceEmitter serviceEmitter = StubServiceEmitter.createStarted();
SearchQuery query = Druids
.newSearchQueryBuilder()
.dataSource(QueryRunnerTestHelper.DATA_SOURCE)
@@ -70,8 +70,8 @@ public void testDefaultSearchQueryMetricsQuery()
Assert.assertEquals(13, actualEvent.size());
Assert.assertTrue(actualEvent.containsKey("feed"));
Assert.assertTrue(actualEvent.containsKey("timestamp"));
- Assert.assertEquals("", actualEvent.get("host"));
- Assert.assertEquals("", actualEvent.get("service"));
+ Assert.assertEquals("localhost", actualEvent.get("host"));
+ Assert.assertEquals("testing", actualEvent.get("service"));
Assert.assertEquals(QueryRunnerTestHelper.DATA_SOURCE, actualEvent.get(DruidMetrics.DATASOURCE));
Assert.assertEquals(query.getType(), actualEvent.get(DruidMetrics.TYPE));
List expectedIntervals = QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC.getIntervals();
diff --git a/processing/src/test/java/org/apache/druid/query/timeseries/DefaultTimeseriesQueryMetricsTest.java b/processing/src/test/java/org/apache/druid/query/timeseries/DefaultTimeseriesQueryMetricsTest.java
index 61b9dc5edba8..0b426b08be5c 100644
--- a/processing/src/test/java/org/apache/druid/query/timeseries/DefaultTimeseriesQueryMetricsTest.java
+++ b/processing/src/test/java/org/apache/druid/query/timeseries/DefaultTimeseriesQueryMetricsTest.java
@@ -44,7 +44,7 @@ public class DefaultTimeseriesQueryMetricsTest extends InitializedNullHandlingTe
@Test
public void testDefaultTimeseriesQueryMetricsQuery()
{
- final StubServiceEmitter serviceEmitter = new StubServiceEmitter("", "");
+ final StubServiceEmitter serviceEmitter = StubServiceEmitter.createStarted();
DefaultTimeseriesQueryMetrics queryMetrics = new DefaultTimeseriesQueryMetrics();
TimeseriesQuery query = Druids
.newTimeseriesQueryBuilder()
@@ -63,8 +63,8 @@ public void testDefaultTimeseriesQueryMetricsQuery()
Assert.assertEquals(16, actualEvent.size());
Assert.assertTrue(actualEvent.containsKey("feed"));
Assert.assertTrue(actualEvent.containsKey("timestamp"));
- Assert.assertEquals("", actualEvent.get("host"));
- Assert.assertEquals("", actualEvent.get("service"));
+ Assert.assertEquals("localhost", actualEvent.get("host"));
+ Assert.assertEquals("testing", actualEvent.get("service"));
Assert.assertEquals(QueryRunnerTestHelper.DATA_SOURCE, actualEvent.get(DruidMetrics.DATASOURCE));
Assert.assertEquals(query.getType(), actualEvent.get(DruidMetrics.TYPE));
List expectedIntervals = QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC.getIntervals();
diff --git a/processing/src/test/java/org/apache/druid/query/topn/DefaultTopNQueryMetricsTest.java b/processing/src/test/java/org/apache/druid/query/topn/DefaultTopNQueryMetricsTest.java
index e232e6a64052..074ea10330a0 100644
--- a/processing/src/test/java/org/apache/druid/query/topn/DefaultTopNQueryMetricsTest.java
+++ b/processing/src/test/java/org/apache/druid/query/topn/DefaultTopNQueryMetricsTest.java
@@ -49,7 +49,7 @@ public class DefaultTopNQueryMetricsTest extends InitializedNullHandlingTest
@Test
public void testDefaultTopNQueryMetricsQuery()
{
- final StubServiceEmitter serviceEmitter = new StubServiceEmitter("", "");
+ final StubServiceEmitter serviceEmitter = StubServiceEmitter.createStarted();
DefaultTopNQueryMetrics queryMetrics = new DefaultTopNQueryMetrics();
TopNQuery query = new TopNQueryBuilder()
.dataSource("xx")
@@ -73,8 +73,8 @@ public void testDefaultTopNQueryMetricsQuery()
Assert.assertEquals(17, actualEvent.size());
Assert.assertTrue(actualEvent.containsKey("feed"));
Assert.assertTrue(actualEvent.containsKey("timestamp"));
- Assert.assertEquals("", actualEvent.get("host"));
- Assert.assertEquals("", actualEvent.get("service"));
+ Assert.assertEquals("localhost", actualEvent.get("host"));
+ Assert.assertEquals("testing", actualEvent.get("service"));
Assert.assertEquals("xx", actualEvent.get(DruidMetrics.DATASOURCE));
Assert.assertEquals(query.getType(), actualEvent.get(DruidMetrics.TYPE));
List expectedIntervals = QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC.getIntervals();
diff --git a/server/src/main/java/org/apache/druid/guice/DefaultServerHolderModule.java b/server/src/main/java/org/apache/druid/guice/DefaultServerHolderModule.java
index bd8d9c3c1f23..927223b6ee77 100644
--- a/server/src/main/java/org/apache/druid/guice/DefaultServerHolderModule.java
+++ b/server/src/main/java/org/apache/druid/guice/DefaultServerHolderModule.java
@@ -24,10 +24,10 @@
import org.apache.druid.discovery.NodeRole;
import org.apache.druid.guice.annotations.ExcludeScope;
import org.apache.druid.initialization.DruidModule;
+import org.apache.druid.java.util.metrics.NoopTaskHolder;
+import org.apache.druid.java.util.metrics.TaskHolder;
import org.apache.druid.server.metrics.DefaultLoadSpecHolder;
import org.apache.druid.server.metrics.LoadSpecHolder;
-import org.apache.druid.server.metrics.NoopTaskHolder;
-import org.apache.druid.server.metrics.TaskHolder;
/**
* Binds the following holder configs for all servers except {@code CliPeon}:
diff --git a/server/src/main/java/org/apache/druid/query/lookup/LookupListeningAnnouncerConfig.java b/server/src/main/java/org/apache/druid/query/lookup/LookupListeningAnnouncerConfig.java
index ffc8447a9920..da9b46e66076 100644
--- a/server/src/main/java/org/apache/druid/query/lookup/LookupListeningAnnouncerConfig.java
+++ b/server/src/main/java/org/apache/druid/query/lookup/LookupListeningAnnouncerConfig.java
@@ -24,9 +24,9 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.metrics.TaskHolder;
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
import org.apache.druid.server.metrics.LoadSpecHolder;
-import org.apache.druid.server.metrics.TaskHolder;
class LookupListeningAnnouncerConfig
{
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/ChatHandlerResource.java b/server/src/main/java/org/apache/druid/segment/realtime/ChatHandlerResource.java
index 5745e42ac68d..32775fcdf063 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/ChatHandlerResource.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/ChatHandlerResource.java
@@ -23,9 +23,9 @@
import com.google.common.collect.Iterables;
import com.google.inject.Inject;
import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.metrics.TaskHolder;
import org.apache.druid.server.initialization.jetty.BadRequestException;
import org.apache.druid.server.initialization.jetty.ServiceUnavailableException;
-import org.apache.druid.server.metrics.TaskHolder;
import javax.annotation.Nullable;
import javax.ws.rs.Path;
diff --git a/server/src/main/java/org/apache/druid/server/emitter/EmitterModule.java b/server/src/main/java/org/apache/druid/server/emitter/EmitterModule.java
index 686d993873b6..41312c592086 100644
--- a/server/src/main/java/org/apache/druid/server/emitter/EmitterModule.java
+++ b/server/src/main/java/org/apache/druid/server/emitter/EmitterModule.java
@@ -42,6 +42,7 @@
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.core.Emitter;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.metrics.TaskHolder;
import org.apache.druid.server.DruidNode;
import java.lang.annotation.Annotation;
@@ -72,7 +73,6 @@ public void setProps(
public void configure(Binder binder)
{
String emitterType = props.getProperty(EMITTER_PROPERTY, "");
-
binder.install(new NoopEmitterModule());
binder.install(new LogEmitterModule());
binder.install(new HttpEmitterModule());
@@ -99,16 +99,18 @@ public void configure(Binder binder)
public ServiceEmitter getServiceEmitter(
@Self Supplier configSupplier,
Emitter emitter,
- @ExtraServiceDimensions Map extraServiceDimensions
+ @ExtraServiceDimensions Map extraServiceDimensions,
+ TaskHolder taskHolder
)
{
final DruidNode config = configSupplier.get();
- log.info("Using emitter [%s] for metrics and alerts, with dimensions [%s].", emitter, extraServiceDimensions);
+ log.info("Using emitter [%s] for metrics and alerts, with dimensions [%s] and taskHolder[%s].", emitter, extraServiceDimensions, taskHolder);
final ServiceEmitter retVal = new ServiceEmitter(
config.getServiceName(),
config.getHostAndPortToUse(),
emitter,
- ImmutableMap.copyOf(extraServiceDimensions)
+ ImmutableMap.copyOf(extraServiceDimensions),
+ taskHolder
);
EmittingLogger.registerEmitter(retVal);
return retVal;
diff --git a/server/src/main/java/org/apache/druid/server/initialization/jetty/ChatHandlerServerModule.java b/server/src/main/java/org/apache/druid/server/initialization/jetty/ChatHandlerServerModule.java
index a1c2debf629f..fa70e7c36cd9 100644
--- a/server/src/main/java/org/apache/druid/server/initialization/jetty/ChatHandlerServerModule.java
+++ b/server/src/main/java/org/apache/druid/server/initialization/jetty/ChatHandlerServerModule.java
@@ -31,11 +31,11 @@
import org.apache.druid.guice.annotations.RemoteChatHandler;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.java.util.common.lifecycle.Lifecycle;
+import org.apache.druid.java.util.metrics.TaskHolder;
import org.apache.druid.segment.realtime.ChatHandlerResource;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.initialization.ServerConfig;
import org.apache.druid.server.initialization.TLSServerConfig;
-import org.apache.druid.server.metrics.TaskHolder;
import org.apache.druid.server.security.TLSCertificateChecker;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.util.ssl.SslContextFactory;
diff --git a/server/src/main/java/org/apache/druid/server/initialization/jetty/CliIndexerServerModule.java b/server/src/main/java/org/apache/druid/server/initialization/jetty/CliIndexerServerModule.java
index 4532d6c87af0..c1c10547d880 100644
--- a/server/src/main/java/org/apache/druid/server/initialization/jetty/CliIndexerServerModule.java
+++ b/server/src/main/java/org/apache/druid/server/initialization/jetty/CliIndexerServerModule.java
@@ -31,12 +31,12 @@
import org.apache.druid.guice.annotations.RemoteChatHandler;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.java.util.common.lifecycle.Lifecycle;
+import org.apache.druid.java.util.metrics.TaskHolder;
import org.apache.druid.query.lookup.LookupModule;
import org.apache.druid.segment.realtime.ChatHandlerResource;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.initialization.ServerConfig;
import org.apache.druid.server.initialization.TLSServerConfig;
-import org.apache.druid.server.metrics.TaskHolder;
import org.apache.druid.server.security.TLSCertificateChecker;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.util.ssl.SslContextFactory;
diff --git a/server/src/main/java/org/apache/druid/server/initialization/jetty/JettyServerModule.java b/server/src/main/java/org/apache/druid/server/initialization/jetty/JettyServerModule.java
index 5d54bf8b4d40..41cd7bf7fbdb 100644
--- a/server/src/main/java/org/apache/druid/server/initialization/jetty/JettyServerModule.java
+++ b/server/src/main/java/org/apache/druid/server/initialization/jetty/JettyServerModule.java
@@ -48,14 +48,11 @@
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.java.util.metrics.AbstractMonitor;
-import org.apache.druid.java.util.metrics.MonitorUtils;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.StatusResource;
import org.apache.druid.server.initialization.ServerConfig;
import org.apache.druid.server.initialization.TLSServerConfig;
import org.apache.druid.server.metrics.MetricsModule;
-import org.apache.druid.server.metrics.MonitorsConfig;
-import org.apache.druid.server.metrics.TaskHolder;
import org.apache.druid.server.security.CustomCheckX509TrustManager;
import org.apache.druid.server.security.TLSCertificateChecker;
import org.eclipse.jetty.server.ConnectionFactory;
@@ -92,7 +89,6 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
@@ -517,27 +513,17 @@ private static int getTCPAcceptQueueSize()
@Provides
@LazySingleton
- public JettyMonitor getJettyMonitor(TaskHolder taskHolder)
+ public JettyMonitor getJettyMonitor()
{
- return new JettyMonitor(
- MonitorsConfig.mapOfTaskHolderDimensions(taskHolder)
- );
+ return new JettyMonitor();
}
public static class JettyMonitor extends AbstractMonitor
{
- private final Map dimensions;
-
- public JettyMonitor(Map dimensions)
- {
- this.dimensions = dimensions;
- }
-
@Override
public boolean doMonitor(ServiceEmitter emitter)
{
final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder();
- MonitorUtils.addDimensionsToBuilder(builder, dimensions);
emitter.emit(builder.setMetric("jetty/numOpenConnections", ACTIVE_CONNECTIONS.get()));
if (jettyServerThreadPool != null) {
emitter.emit(builder.setMetric("jetty/threadPool/total", jettyServerThreadPool.getThreads()));
diff --git a/server/src/main/java/org/apache/druid/server/metrics/GroupByStatsMonitor.java b/server/src/main/java/org/apache/druid/server/metrics/GroupByStatsMonitor.java
index 453fb4daaed8..10985b4b4d3a 100644
--- a/server/src/main/java/org/apache/druid/server/metrics/GroupByStatsMonitor.java
+++ b/server/src/main/java/org/apache/druid/server/metrics/GroupByStatsMonitor.java
@@ -27,11 +27,9 @@
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.java.util.metrics.AbstractMonitor;
-import org.apache.druid.java.util.metrics.MonitorUtils;
import org.apache.druid.query.groupby.GroupByStatsProvider;
import java.nio.ByteBuffer;
-import java.util.Map;
@LoadScope(roles = {
NodeRole.BROKER_JSON_NAME,
@@ -43,25 +41,21 @@ public class GroupByStatsMonitor extends AbstractMonitor
{
private final GroupByStatsProvider groupByStatsProvider;
private final BlockingPool mergeBufferPool;
- private final Map dimensions;
@Inject
public GroupByStatsMonitor(
GroupByStatsProvider groupByStatsProvider,
- @Merging BlockingPool mergeBufferPool,
- TaskHolder taskHolder
+ @Merging BlockingPool mergeBufferPool
)
{
this.groupByStatsProvider = groupByStatsProvider;
this.mergeBufferPool = mergeBufferPool;
- this.dimensions = MonitorsConfig.mapOfTaskHolderDimensions(taskHolder);
}
@Override
public boolean doMonitor(ServiceEmitter emitter)
{
final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder();
- MonitorUtils.addDimensionsToBuilder(builder, dimensions);
emitter.emit(builder.setMetric("mergeBuffer/pendingRequests", mergeBufferPool.getPendingRequests()));
diff --git a/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java b/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java
index 0b59409de0d2..c1425b81652c 100644
--- a/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java
+++ b/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java
@@ -57,7 +57,6 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
@@ -143,48 +142,39 @@ public MonitorScheduler getMonitorScheduler(
@Provides
@ManageLifecycle
- public JvmMonitor getJvmMonitor(
- TaskHolder taskHolder
- )
+ public JvmMonitor getJvmMonitor()
{
- Map dimensions = MonitorsConfig.mapOfTaskHolderDimensions(taskHolder);
- return new JvmMonitor(dimensions);
+ return new JvmMonitor();
}
@Provides
@ManageLifecycle
- public JvmCpuMonitor getJvmCpuMonitor(
- TaskHolder taskHolder
- )
+ public JvmCpuMonitor getJvmCpuMonitor()
{
- Map dimensions = MonitorsConfig.mapOfTaskHolderDimensions(taskHolder);
- return new JvmCpuMonitor(dimensions);
+ return new JvmCpuMonitor();
}
@Provides
@ManageLifecycle
- public JvmThreadsMonitor getJvmThreadsMonitor(TaskHolder taskHolder)
+ public JvmThreadsMonitor getJvmThreadsMonitor()
{
- Map dimensions = MonitorsConfig.mapOfTaskHolderDimensions(taskHolder);
- return new JvmThreadsMonitor(dimensions);
+ return new JvmThreadsMonitor();
}
@Provides
@ManageLifecycle
- public SysMonitor getSysMonitor(TaskHolder taskHolder, @Self Set nodeRoles)
+ public SysMonitor getSysMonitor(@Self Set nodeRoles)
{
if (nodeRoles.contains(NodeRole.PEON)) {
return new NoopSysMonitor();
} else {
- Map dimensions = MonitorsConfig.mapOfTaskHolderDimensions(taskHolder);
- return new SysMonitor(dimensions);
+ return new SysMonitor();
}
}
@Provides
@ManageLifecycle
public OshiSysMonitor getOshiSysMonitor(
- TaskHolder taskHolder,
@Self Set nodeRoles,
OshiSysMonitorConfig oshiSysConfig
)
@@ -192,8 +182,7 @@ public OshiSysMonitor getOshiSysMonitor(
if (nodeRoles.contains(NodeRole.PEON)) {
return new NoopOshiSysMonitor();
} else {
- Map dimensions = MonitorsConfig.mapOfTaskHolderDimensions(taskHolder);
- return new OshiSysMonitor(dimensions, oshiSysConfig);
+ return new OshiSysMonitor(oshiSysConfig);
}
}
diff --git a/server/src/main/java/org/apache/druid/server/metrics/MonitorsConfig.java b/server/src/main/java/org/apache/druid/server/metrics/MonitorsConfig.java
index 261eb052d058..41561b2bb777 100644
--- a/server/src/main/java/org/apache/druid/server/metrics/MonitorsConfig.java
+++ b/server/src/main/java/org/apache/druid/server/metrics/MonitorsConfig.java
@@ -20,16 +20,13 @@
package org.apache.druid.server.metrics;
import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.metrics.Monitor;
-import org.apache.druid.query.DruidMetrics;
import javax.validation.constraints.NotNull;
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
/**
*/
@@ -77,28 +74,6 @@ public String toString()
'}';
}
- /**
- * @return a map of task holder dimensions from the provided {@link TaskHolder} if {@link TaskHolder#getDataSource()}
- * and {@link TaskHolder#getTaskId()} are non-null.
- * The task ID ({@link TaskHolder#getTaskId()}) is added to both {@link DruidMetrics#TASK_ID}
- * and {@link DruidMetrics#ID} dimensions to the map for backward compatibility. {@link DruidMetrics#ID} is
- * deprecated because it's ambiguous and will be removed in a future release.
- */
- public static Map mapOfTaskHolderDimensions(final TaskHolder taskHolder)
- {
- final String dataSource = taskHolder.getDataSource();
- final String taskId = taskHolder.getTaskId();
- final ImmutableMap.Builder builder = ImmutableMap.builder();
- if (dataSource != null) {
- builder.put(DruidMetrics.DATASOURCE, new String[]{dataSource});
- }
- if (taskId != null) {
- builder.put(DruidMetrics.TASK_ID, new String[]{taskId});
- builder.put(DruidMetrics.ID, new String[]{taskId});
- }
- return builder.build();
- }
-
private static List> getMonitorsFromNames(List monitorNames)
{
List> monitors = new ArrayList<>();
diff --git a/server/src/test/java/org/apache/druid/guice/DefaultServerHolderModuleTest.java b/server/src/test/java/org/apache/druid/guice/DefaultServerHolderModuleTest.java
index 4d377001ef84..d8c2751da5f1 100644
--- a/server/src/test/java/org/apache/druid/guice/DefaultServerHolderModuleTest.java
+++ b/server/src/test/java/org/apache/druid/guice/DefaultServerHolderModuleTest.java
@@ -23,10 +23,10 @@
import com.google.inject.Injector;
import org.apache.druid.discovery.NodeRole;
import org.apache.druid.initialization.CoreInjectorBuilder;
+import org.apache.druid.java.util.metrics.NoopTaskHolder;
+import org.apache.druid.java.util.metrics.TaskHolder;
import org.apache.druid.server.metrics.DefaultLoadSpecHolder;
import org.apache.druid.server.metrics.LoadSpecHolder;
-import org.apache.druid.server.metrics.NoopTaskHolder;
-import org.apache.druid.server.metrics.TaskHolder;
import org.junit.Assert;
import org.junit.Test;
diff --git a/server/src/test/java/org/apache/druid/query/lookup/LookupListeningAnnouncerConfigTest.java b/server/src/test/java/org/apache/druid/query/lookup/LookupListeningAnnouncerConfigTest.java
index 0038b7d36c50..61586e47d3dc 100644
--- a/server/src/test/java/org/apache/druid/query/lookup/LookupListeningAnnouncerConfigTest.java
+++ b/server/src/test/java/org/apache/druid/query/lookup/LookupListeningAnnouncerConfigTest.java
@@ -29,12 +29,13 @@
import org.apache.druid.guice.JsonConfigurator;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.initialization.Initialization;
+import org.apache.druid.java.util.metrics.TaskHolder;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.coordination.BroadcastDatasourceLoadingSpec;
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
import org.apache.druid.server.metrics.DefaultLoadSpecHolder;
import org.apache.druid.server.metrics.LoadSpecHolder;
-import org.apache.druid.server.metrics.TaskHolder;
+import org.apache.druid.server.metrics.TestLoadSpecHolder;
import org.apache.druid.server.metrics.TestTaskHolder;
import org.junit.Assert;
import org.junit.Before;
@@ -59,21 +60,10 @@ public void configure(Binder binder)
Key.get(DruidNode.class, Self.class),
new DruidNode("test-inject", null, false, null, null, true, false)
);
- binder.bind(TaskHolder.class).toInstance(new TestTaskHolder("some_datasource", "some_taskid"));
- binder.bind(LoadSpecHolder.class).toInstance(new LoadSpecHolder()
- {
- @Override
- public LookupLoadingSpec getLookupLoadingSpec()
- {
- return LookupLoadingSpec.loadOnly(Set.of("lookupName1", "lookupName2"));
- }
-
- @Override
- public BroadcastDatasourceLoadingSpec getBroadcastDatasourceLoadingSpec()
- {
- return BroadcastDatasourceLoadingSpec.ALL;
- }
- });
+ binder.bind(TaskHolder.class).toInstance(new TestTaskHolder("some_datasource", "some_taskid", "test_tasktype", "test_groupid"));
+ binder.bind(LoadSpecHolder.class).toInstance(
+ new TestLoadSpecHolder(LookupLoadingSpec.loadOnly(Set.of("lookupName1", "lookupName2")), BroadcastDatasourceLoadingSpec.ALL)
+ );
}
},
new LookupModule()
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/ChatHandlerResourceTest.java b/server/src/test/java/org/apache/druid/segment/realtime/ChatHandlerResourceTest.java
index 7e5320d74b60..4636e6523d86 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/ChatHandlerResourceTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/ChatHandlerResourceTest.java
@@ -21,8 +21,8 @@
package org.apache.druid.segment.realtime;
import com.google.common.base.Optional;
+import org.apache.druid.java.util.metrics.TaskHolder;
import org.apache.druid.server.initialization.jetty.ServiceUnavailableException;
-import org.apache.druid.server.metrics.TaskHolder;
import org.easymock.EasyMock;
import org.easymock.EasyMockRunner;
import org.easymock.EasyMockSupport;
diff --git a/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java b/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java
index 021227a3ca36..b374bbb820f6 100644
--- a/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java
+++ b/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java
@@ -221,7 +221,7 @@ public class ClientQuerySegmentWalkerTest
private Closer closer;
private QueryRunnerFactoryConglomerate conglomerate;
- private final StubServiceEmitter emitter = new StubServiceEmitter();
+ private final StubServiceEmitter emitter = StubServiceEmitter.createStarted();
// Queries that are issued; checked by "testQuery" against its "expectedQueries" parameter.
private final List issuedQueries = new ArrayList<>();
diff --git a/server/src/test/java/org/apache/druid/server/QueryResourceTest.java b/server/src/test/java/org/apache/druid/server/QueryResourceTest.java
index 72c895e46a03..b1ff73cace0a 100644
--- a/server/src/test/java/org/apache/druid/server/QueryResourceTest.java
+++ b/server/src/test/java/org/apache/druid/server/QueryResourceTest.java
@@ -258,7 +258,7 @@ public void setup()
queryScheduler = QueryStackTests.DEFAULT_NOOP_SCHEDULER;
testRequestLogger = new TestRequestLogger();
- emitter = new StubServiceEmitter();
+ emitter = StubServiceEmitter.createStarted();
queryResource = createQueryResource(ResponseContextConfig.newConfig(true));
}
diff --git a/server/src/test/java/org/apache/druid/server/coordination/SegmentCacheBootstrapperTest.java b/server/src/test/java/org/apache/druid/server/coordination/SegmentCacheBootstrapperTest.java
index cd06239cbcc5..d64f39fb8ee4 100644
--- a/server/src/test/java/org/apache/druid/server/coordination/SegmentCacheBootstrapperTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordination/SegmentCacheBootstrapperTest.java
@@ -30,7 +30,7 @@
import org.apache.druid.server.SegmentManager;
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
import org.apache.druid.server.metrics.DefaultLoadSpecHolder;
-import org.apache.druid.server.metrics.LoadSpecHolder;
+import org.apache.druid.server.metrics.TestLoadSpecHolder;
import org.apache.druid.test.utils.TestSegmentCacheManager;
import org.apache.druid.timeline.DataSegment;
import org.junit.Assert;
@@ -300,20 +300,7 @@ public void testLoadNoBootstrapSegments() throws Exception
new ServerTypeConfig(ServerType.HISTORICAL),
coordinatorClient,
serviceEmitter,
- new LoadSpecHolder()
- {
- @Override
- public LookupLoadingSpec getLookupLoadingSpec()
- {
- return LookupLoadingSpec.ALL;
- }
-
- @Override
- public BroadcastDatasourceLoadingSpec getBroadcastDatasourceLoadingSpec()
- {
- return BroadcastDatasourceLoadingSpec.NONE;
- }
- }
+ new TestLoadSpecHolder(LookupLoadingSpec.ALL, BroadcastDatasourceLoadingSpec.NONE)
);
Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty());
@@ -362,20 +349,7 @@ public void testLoadOnlyRequiredBootstrapSegments() throws Exception
new ServerTypeConfig(ServerType.HISTORICAL),
coordinatorClient,
serviceEmitter,
- new LoadSpecHolder()
- {
- @Override
- public LookupLoadingSpec getLookupLoadingSpec()
- {
- return LookupLoadingSpec.NONE;
- }
-
- @Override
- public BroadcastDatasourceLoadingSpec getBroadcastDatasourceLoadingSpec()
- {
- return BroadcastDatasourceLoadingSpec.loadOnly(Set.of("test1"));
- }
- }
+ new TestLoadSpecHolder(LookupLoadingSpec.NONE, BroadcastDatasourceLoadingSpec.loadOnly(Set.of("test1")))
);
Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty());
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/RunRulesTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/RunRulesTest.java
index f7e9ececc1ac..a3042db2df18 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/RunRulesTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/RunRulesTest.java
@@ -96,7 +96,7 @@ public class RunRulesTest
public void setUp()
{
mockPeon = EasyMock.createMock(LoadQueuePeon.class);
- emitter = new StubServiceEmitter("coordinator", "host");
+ emitter = StubServiceEmitter.createStarted();
EmittingLogger.registerEmitter(emitter);
databaseRuleManager = EasyMock.createMock(MetadataRuleManager.class);
ruleRunner = new RunRules((ds, set) -> set.size(), databaseRuleManager::getRulesWithDefault);
diff --git a/server/src/test/java/org/apache/druid/server/emitter/EmitterModuleTest.java b/server/src/test/java/org/apache/druid/server/emitter/EmitterModuleTest.java
index 3467d7bee640..536ff5c34ce5 100644
--- a/server/src/test/java/org/apache/druid/server/emitter/EmitterModuleTest.java
+++ b/server/src/test/java/org/apache/druid/server/emitter/EmitterModuleTest.java
@@ -20,19 +20,39 @@
package org.apache.druid.server.emitter;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
import com.google.inject.Binder;
import com.google.inject.Guice;
import com.google.inject.Injector;
+import com.google.inject.Key;
import com.google.inject.Module;
+import com.google.inject.Scopes;
+import org.apache.druid.discovery.NodeRole;
+import org.apache.druid.guice.DefaultServerHolderModule;
import org.apache.druid.guice.DruidGuiceExtensions;
+import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.JsonConfigurator;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.LifecycleModule;
import org.apache.druid.guice.ServerModule;
+import org.apache.druid.guice.annotations.Self;
+import org.apache.druid.initialization.ServerInjectorBuilder;
import org.apache.druid.jackson.JacksonModule;
import org.apache.druid.java.util.emitter.core.Emitter;
+import org.apache.druid.java.util.emitter.core.Event;
+import org.apache.druid.java.util.emitter.core.EventMap;
import org.apache.druid.java.util.emitter.core.NoopEmitter;
import org.apache.druid.java.util.emitter.core.ParametrizedUriEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
+import org.apache.druid.java.util.metrics.StubServiceEmitterModule;
+import org.apache.druid.java.util.metrics.TaskHolder;
+import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.server.DruidNode;
+import org.apache.druid.server.metrics.DefaultLoadSpecHolder;
+import org.apache.druid.server.metrics.LoadSpecHolder;
+import org.apache.druid.server.metrics.TestTaskHolder;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.Rule;
@@ -41,6 +61,7 @@
import javax.validation.Validation;
import javax.validation.Validator;
+import java.util.List;
import java.util.Properties;
public class EmitterModuleTest
@@ -87,6 +108,78 @@ public void testInvalidEmitterType()
makeInjectorWithProperties(props).getInstance(Emitter.class);
}
+ @Test
+ public void testEmitterForTaskContainsAllTaskDimensions()
+ {
+ Properties props = new Properties();
+ props.setProperty("druid.emitter", "stub");
+ EmitterModule emitterModule = new EmitterModule();
+ emitterModule.setProps(props);
+
+ ImmutableSet nodeRoles = ImmutableSet.of();
+
+ TestTaskHolder testTaskHolder = new TestTaskHolder("wiki", "id1", "type1", "group1");
+ Injector injector = Guice.createInjector(
+ new JacksonModule(),
+ new LifecycleModule(),
+ binder -> {
+ JsonConfigProvider.bindInstance(
+ binder,
+ Key.get(DruidNode.class, Self.class),
+ new DruidNode("test-inject", null, false, null, null, true, false)
+ );
+ binder.bind(Validator.class).toInstance(Validation.buildDefaultValidatorFactory().getValidator());
+ binder.bindScope(LazySingleton.class, Scopes.SINGLETON);
+ binder.bind(Properties.class).toInstance(props);
+ binder.bind(TaskHolder.class).toInstance(testTaskHolder);
+ binder.bind(LoadSpecHolder.class).to(DefaultLoadSpecHolder.class).in(LazySingleton.class);
+ },
+ ServerInjectorBuilder.registerNodeRoleModule(nodeRoles),
+ emitterModule,
+ new StubServiceEmitterModule()
+ );
+
+ ServiceEmitter instance = injector.getInstance(ServiceEmitter.class);
+ Assert.assertNotNull(instance);
+ instance.start();
+ final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder();
+ builder.setDimension("foo", "bar");
+ builder.setMetric("metric1", 1);
+ instance.emit(builder);
+
+ Emitter instance1 = injector.getInstance(Emitter.class);
+ Assert.assertTrue(instance1 instanceof StubServiceEmitter);
+ StubServiceEmitter stubEmitter = (StubServiceEmitter) instance1;
+
+ stubEmitter.verifyEmitted("metric1", 1);
+ List events = stubEmitter.getEvents();
+ Assert.assertEquals(1, events.size());
+ ServiceMetricEvent event = (ServiceMetricEvent) events.get(0);
+ EventMap map = event.toMap();
+ Assert.assertEquals("id1", map.get(DruidMetrics.TASK_ID));
+ Assert.assertEquals("id1", map.get(DruidMetrics.ID));
+ Assert.assertEquals("type1", map.get(DruidMetrics.TASK_TYPE));
+ Assert.assertEquals("group1", map.get(DruidMetrics.GROUP_ID));
+ Assert.assertEquals("wiki", map.get(DruidMetrics.DATASOURCE));
+ stubEmitter.flush();
+
+ // Override a dimension and verify that is emitted
+ final ServiceMetricEvent.Builder builder2 = new ServiceMetricEvent.Builder();
+ builder2.setDimension("taskId", "id2");
+ builder2.setMetric("metric2", 1);
+ instance.emit(builder2);
+
+ List events2 = stubEmitter.getEvents();
+ Assert.assertEquals(1, events2.size());
+ ServiceMetricEvent event2 = (ServiceMetricEvent) events2.get(0);
+ EventMap map2 = event2.toMap();
+ Assert.assertEquals("id2", map2.get(DruidMetrics.TASK_ID));
+ Assert.assertEquals("id1", map2.get(DruidMetrics.ID));
+ Assert.assertEquals("type1", map2.get(DruidMetrics.TASK_TYPE));
+ Assert.assertEquals("group1", map2.get(DruidMetrics.GROUP_ID));
+ Assert.assertEquals("wiki", map2.get(DruidMetrics.DATASOURCE));
+ }
+
private Injector makeInjectorWithProperties(final Properties props)
{
EmitterModule emitterModule = new EmitterModule();
@@ -97,6 +190,7 @@ private Injector makeInjectorWithProperties(final Properties props)
new LifecycleModule(),
new ServerModule(),
new JacksonModule(),
+ new DefaultServerHolderModule(),
new Module()
{
@Override
diff --git a/server/src/test/java/org/apache/druid/server/initialization/jetty/JettyServerModuleTest.java b/server/src/test/java/org/apache/druid/server/initialization/jetty/JettyServerModuleTest.java
index 18c3317c84d7..317c69e17e2c 100644
--- a/server/src/test/java/org/apache/druid/server/initialization/jetty/JettyServerModuleTest.java
+++ b/server/src/test/java/org/apache/druid/server/initialization/jetty/JettyServerModuleTest.java
@@ -28,8 +28,6 @@
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.initialization.ServerConfig;
import org.apache.druid.server.initialization.TLSServerConfig;
-import org.apache.druid.server.metrics.MonitorsConfig;
-import org.apache.druid.server.metrics.TestTaskHolder;
import org.apache.druid.server.security.TLSCertificateChecker;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.util.ssl.SslContextFactory;
@@ -40,7 +38,6 @@
import java.util.Arrays;
import java.util.List;
-import java.util.Map;
public class JettyServerModuleTest
{
@@ -57,12 +54,9 @@ public void testJettyServerModule()
Mockito.when(jettyServerThreadPool.getQueueSize()).thenReturn(50);
Mockito.when(jettyServerThreadPool.getBusyThreads()).thenReturn(60);
- final Map dimensionMap = MonitorsConfig.mapOfTaskHolderDimensions(
- new TestTaskHolder("ds", "t0")
- );
- JettyServerModule.JettyMonitor jettyMonitor = new JettyServerModule.JettyMonitor(dimensionMap);
+ JettyServerModule.JettyMonitor jettyMonitor = new JettyServerModule.JettyMonitor();
- final StubServiceEmitter serviceEmitter = new StubServiceEmitter("service", "host");
+ final StubServiceEmitter serviceEmitter = StubServiceEmitter.createStarted();
jettyMonitor.doMonitor(serviceEmitter);
serviceEmitter.verifyValue("jetty/numOpenConnections", 0);
diff --git a/server/src/test/java/org/apache/druid/server/metrics/GroupByStatsMonitorTest.java b/server/src/test/java/org/apache/druid/server/metrics/GroupByStatsMonitorTest.java
index ce4fbf1b68ed..94da5889d39c 100644
--- a/server/src/test/java/org/apache/druid/server/metrics/GroupByStatsMonitorTest.java
+++ b/server/src/test/java/org/apache/druid/server/metrics/GroupByStatsMonitorTest.java
@@ -21,7 +21,10 @@
import org.apache.druid.collections.BlockingPool;
import org.apache.druid.collections.DefaultBlockingPool;
+import org.apache.druid.java.util.emitter.core.EventMap;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
+import org.apache.druid.query.DruidMetrics;
import org.apache.druid.query.groupby.GroupByStatsProvider;
import org.junit.After;
import org.junit.Assert;
@@ -32,6 +35,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -76,8 +80,9 @@ public void tearDown()
public void testMonitor()
{
final GroupByStatsMonitor monitor =
- new GroupByStatsMonitor(groupByStatsProvider, mergeBufferPool, new NoopTaskHolder());
+ new GroupByStatsMonitor(groupByStatsProvider, mergeBufferPool);
final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
+ emitter.start();
monitor.doMonitor(emitter);
emitter.flush();
// Trigger metric emission
@@ -98,30 +103,35 @@ public void testMonitorWithServiceDimensions()
{
final String dataSource = "fooDs";
final String taskId = "taskId1";
+ final String groupId = "test_groupid";
+ final String taskType = "test_tasktype";
final GroupByStatsMonitor monitor = new GroupByStatsMonitor(
groupByStatsProvider,
- mergeBufferPool,
- new TestTaskHolder(dataSource, taskId)
+ mergeBufferPool
);
- final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
+ final StubServiceEmitter emitter = new StubServiceEmitter("service", "host", new TestTaskHolder(dataSource, taskId, taskType, groupId));
+ emitter.start();
monitor.doMonitor(emitter);
emitter.flush();
// Trigger metric emission
monitor.doMonitor(emitter);
final Map dimFilters = Map.of(
- "taskId", List.of(taskId), "dataSource", List.of(dataSource), "id", List.of(taskId)
+ DruidMetrics.DATASOURCE, dataSource,
+ DruidMetrics.TASK_ID, taskId,
+ DruidMetrics.ID, taskId,
+ DruidMetrics.TASK_TYPE,
+ taskType, DruidMetrics.GROUP_ID, groupId
);
- Assert.assertEquals(7, emitter.getNumEmittedEvents());
- emitter.verifyValue("mergeBuffer/pendingRequests", dimFilters, 0L);
- emitter.verifyValue("mergeBuffer/used", dimFilters, 0L);
- emitter.verifyValue("mergeBuffer/queries", dimFilters, 1L);
- emitter.verifyValue("mergeBuffer/acquisitionTimeNs", dimFilters, 100L);
- emitter.verifyValue("groupBy/spilledQueries", dimFilters, 2L);
- emitter.verifyValue("groupBy/spilledBytes", dimFilters, 200L);
- emitter.verifyValue("groupBy/mergeDictionarySize", dimFilters, 300L);
- }
+ verifyMetricValue(emitter, "mergeBuffer/pendingRequests", dimFilters, 0L);
+ verifyMetricValue(emitter, "mergeBuffer/used", dimFilters, 0L);
+ verifyMetricValue(emitter, "mergeBuffer/queries", dimFilters, 1L);
+ verifyMetricValue(emitter, "mergeBuffer/acquisitionTimeNs", dimFilters, 100L);
+ verifyMetricValue(emitter, "groupBy/spilledQueries", dimFilters, 2L);
+ verifyMetricValue(emitter, "groupBy/spilledBytes", dimFilters, 200L);
+ verifyMetricValue(emitter, "groupBy/mergeDictionarySize", dimFilters, 300L);
+ }
@Test
public void testMonitoringMergeBuffer_acquiredCount()
@@ -132,7 +142,7 @@ public void testMonitoringMergeBuffer_acquiredCount()
}).get(20, TimeUnit.SECONDS);
final GroupByStatsMonitor monitor =
- new GroupByStatsMonitor(groupByStatsProvider, mergeBufferPool, new NoopTaskHolder());
+ new GroupByStatsMonitor(groupByStatsProvider, mergeBufferPool);
final StubServiceEmitter emitter = new StubServiceEmitter("DummyService", "DummyHost");
boolean ret = monitor.doMonitor(emitter);
Assert.assertTrue(ret);
@@ -161,7 +171,7 @@ public void testMonitoringMergeBuffer_pendingRequests()
}
final GroupByStatsMonitor monitor =
- new GroupByStatsMonitor(groupByStatsProvider, mergeBufferPool, new NoopTaskHolder());
+ new GroupByStatsMonitor(groupByStatsProvider, mergeBufferPool);
final StubServiceEmitter emitter = new StubServiceEmitter("DummyService", "DummyHost");
boolean ret = monitor.doMonitor(emitter);
Assert.assertTrue(ret);
@@ -174,4 +184,16 @@ public void testMonitoringMergeBuffer_pendingRequests()
// do nothing
}
}
+
+ private void verifyMetricValue(StubServiceEmitter emitter, String metricName, Map dimFilters, Number expectedValue)
+ {
+ final List observedMetricEvents = emitter.getMetricEvents(metricName);
+ Assert.assertEquals(1, observedMetricEvents.size());
+ final ServiceMetricEvent event = observedMetricEvents.get(0);
+ final EventMap map = event.toMap();
+ final boolean matchesDims = dimFilters.entrySet().stream()
+ .allMatch(e -> Objects.equals(e.getValue(), map.get(e.getKey())));
+ Assert.assertTrue(matchesDims);
+ Assert.assertEquals(expectedValue, event.getValue());
+ }
}
diff --git a/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java b/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java
index 8f5ad4979284..e9779c1ddd68 100644
--- a/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java
+++ b/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java
@@ -24,6 +24,7 @@
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
+import org.apache.druid.java.util.metrics.TaskHolder;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Timeout;
@@ -72,9 +73,9 @@ public class LatchableEmitter extends StubServiceEmitter
/**
* Creates a {@link StubServiceEmitter} that may be used in embedded tests.
*/
- public LatchableEmitter(String service, String host, LatchableEmitterConfig config)
+ public LatchableEmitter(String service, String host, LatchableEmitterConfig config, TaskHolder taskHolder)
{
- super(service, host);
+ super(service, host, taskHolder);
this.defaultWaitTimeoutMillis = config.getDefaultWaitTimeoutMillis();
}
diff --git a/server/src/test/java/org/apache/druid/server/metrics/MetricsModuleTest.java b/server/src/test/java/org/apache/druid/server/metrics/MetricsModuleTest.java
index ad0e6aac3d94..5b54f0d5ccf8 100644
--- a/server/src/test/java/org/apache/druid/server/metrics/MetricsModuleTest.java
+++ b/server/src/test/java/org/apache/druid/server/metrics/MetricsModuleTest.java
@@ -49,9 +49,12 @@
import org.apache.druid.java.util.metrics.MonitorScheduler;
import org.apache.druid.java.util.metrics.NoopOshiSysMonitor;
import org.apache.druid.java.util.metrics.NoopSysMonitor;
+import org.apache.druid.java.util.metrics.NoopTaskHolder;
import org.apache.druid.java.util.metrics.OshiSysMonitor;
import org.apache.druid.java.util.metrics.OshiSysMonitorConfig;
import org.apache.druid.java.util.metrics.SysMonitor;
+import org.apache.druid.java.util.metrics.TaskHolder;
+import org.apache.druid.query.DruidMetrics;
import org.apache.druid.server.DruidNode;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
@@ -64,6 +67,7 @@
import javax.validation.Validation;
import javax.validation.Validator;
+import java.util.Map;
import java.util.Properties;
import java.util.Set;
@@ -99,8 +103,10 @@ public void configure(Binder binder)
@Test
public void testSimpleInjectionWithValues()
{
- final String dataSource = "some datasource";
- final String taskId = "some task";
+ final String dataSource = "some_datasource";
+ final String taskId = "some_taskid";
+ final String taskType = "some_task_type";
+ final String groupId = "some_groupid";
final Injector injector = Initialization.makeInjectorWithModules(
GuiceInjectors.makeStartupInjector(),
ImmutableList.of(new Module()
@@ -113,7 +119,7 @@ public void configure(Binder binder)
Key.get(DruidNode.class, Self.class),
new DruidNode("test-inject", null, false, null, null, true, false)
);
- binder.bind(TaskHolder.class).toInstance(new TestTaskHolder(dataSource, taskId));
+ binder.bind(TaskHolder.class).toInstance(new TestTaskHolder(dataSource, taskId, taskType, groupId));
binder.bind(LoadSpecHolder.class).to(DefaultLoadSpecHolder.class).in(LazySingleton.class);
}
})
@@ -121,6 +127,17 @@ public void configure(Binder binder)
TaskHolder taskHolder = injector.getInstance(TaskHolder.class);
Assert.assertEquals(dataSource, taskHolder.getDataSource());
Assert.assertEquals(taskId, taskHolder.getTaskId());
+ Assert.assertEquals(taskType, taskHolder.getTaskType());
+ Assert.assertEquals(groupId, taskHolder.getGroupId());
+ Map expectedTaskDims = Map.of(
+ DruidMetrics.DATASOURCE, dataSource,
+ DruidMetrics.TASK_ID, taskId,
+ DruidMetrics.ID, taskId,
+ DruidMetrics.TASK_TYPE, taskType,
+ DruidMetrics.GROUP_ID, groupId
+ );
+
+ Assert.assertEquals(expectedTaskDims, taskHolder.getMetricDimensions());
}
@Test
diff --git a/server/src/test/java/org/apache/druid/server/metrics/TestLoadSpecHolder.java b/server/src/test/java/org/apache/druid/server/metrics/TestLoadSpecHolder.java
new file mode 100644
index 000000000000..b66686b9a720
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/metrics/TestLoadSpecHolder.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.metrics;
+
+import org.apache.druid.server.coordination.BroadcastDatasourceLoadingSpec;
+import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
+
+public class TestLoadSpecHolder implements LoadSpecHolder
+{
+ private final LookupLoadingSpec lookupLoadingSpec;
+ private final BroadcastDatasourceLoadingSpec broadcastDatasourceLoadingSpec;
+
+ public TestLoadSpecHolder(
+ final LookupLoadingSpec lookupLoadingSpec,
+ final BroadcastDatasourceLoadingSpec broadcastDatasourceLoadingSpec
+ )
+ {
+ this.lookupLoadingSpec = lookupLoadingSpec;
+ this.broadcastDatasourceLoadingSpec = broadcastDatasourceLoadingSpec;
+ }
+
+ @Override
+ public LookupLoadingSpec getLookupLoadingSpec()
+ {
+ return lookupLoadingSpec;
+ }
+
+ @Override
+ public BroadcastDatasourceLoadingSpec getBroadcastDatasourceLoadingSpec()
+ {
+ return broadcastDatasourceLoadingSpec;
+ }
+}
diff --git a/server/src/test/java/org/apache/druid/server/metrics/TestTaskHolder.java b/server/src/test/java/org/apache/druid/server/metrics/TestTaskHolder.java
index eed9bcab8272..474ac71cbbc1 100644
--- a/server/src/test/java/org/apache/druid/server/metrics/TestTaskHolder.java
+++ b/server/src/test/java/org/apache/druid/server/metrics/TestTaskHolder.java
@@ -19,15 +19,24 @@
package org.apache.druid.server.metrics;
+import org.apache.druid.java.util.metrics.TaskHolder;
+import org.apache.druid.query.DruidMetrics;
+
+import java.util.Map;
+
public class TestTaskHolder implements TaskHolder
{
private final String dataSource;
private final String taskId;
+ private final String taskType;
+ private final String groupId;
- public TestTaskHolder(final String dataSource, final String taskId)
+ public TestTaskHolder(final String dataSource, final String taskId, final String taskType, final String groupId)
{
this.dataSource = dataSource;
this.taskId = taskId;
+ this.taskType = taskType;
+ this.groupId = groupId;
}
@Override
@@ -41,4 +50,28 @@ public String getTaskId()
{
return taskId;
}
+
+ @Override
+ public String getTaskType()
+ {
+ return taskType;
+ }
+
+ @Override
+ public String getGroupId()
+ {
+ return groupId;
+ }
+
+ @Override
+ public Map getMetricDimensions()
+ {
+ return Map.of(
+ DruidMetrics.DATASOURCE, dataSource,
+ DruidMetrics.TASK_ID, taskId,
+ DruidMetrics.ID, taskId,
+ DruidMetrics.TASK_TYPE, taskType,
+ DruidMetrics.GROUP_ID, groupId
+ );
+ }
}
diff --git a/services/src/main/java/org/apache/druid/cli/CliPeon.java b/services/src/main/java/org/apache/druid/cli/CliPeon.java
index 7aa859ebe326..84a6046d1ba4 100644
--- a/services/src/main/java/org/apache/druid/cli/CliPeon.java
+++ b/services/src/main/java/org/apache/druid/cli/CliPeon.java
@@ -94,6 +94,7 @@
import org.apache.druid.indexing.worker.shuffle.LocalIntermediaryDataManager;
import org.apache.druid.java.util.common.lifecycle.Lifecycle;
import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.metrics.TaskHolder;
import org.apache.druid.metadata.input.InputSourceModule;
import org.apache.druid.msq.guice.MSQDurableStorageModule;
import org.apache.druid.msq.guice.MSQExternalDataSourceModule;
@@ -131,7 +132,6 @@
import org.apache.druid.server.initialization.jetty.JettyServerInitializer;
import org.apache.druid.server.metrics.LoadSpecHolder;
import org.apache.druid.server.metrics.ServiceStatusMonitor;
-import org.apache.druid.server.metrics.TaskHolder;
import org.apache.druid.storage.local.LocalTmpStorageConfig;
import org.apache.druid.tasklogs.TaskPayloadManager;
import org.eclipse.jetty.server.Server;
diff --git a/services/src/main/java/org/apache/druid/cli/PeonTaskHolder.java b/services/src/main/java/org/apache/druid/cli/PeonTaskHolder.java
index 1c223dd6d39c..74ce547ed900 100644
--- a/services/src/main/java/org/apache/druid/cli/PeonTaskHolder.java
+++ b/services/src/main/java/org/apache/druid/cli/PeonTaskHolder.java
@@ -23,11 +23,14 @@
import com.google.inject.Provider;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.java.util.metrics.AbstractMonitor;
+import org.apache.druid.java.util.metrics.TaskHolder;
+import org.apache.druid.query.DruidMetrics;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.lookup.LookupModule;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.server.metrics.MetricsModule;
-import org.apache.druid.server.metrics.TaskHolder;
+
+import java.util.Map;
/**
* TaskHolder implementation for {@code CliPeon} processes.
@@ -59,4 +62,35 @@ public String getTaskId()
{
return taskProvider.get().getId();
}
+
+ @Override
+ public String getTaskType()
+ {
+ return taskProvider.get().getType();
+ }
+
+ @Override
+ public String getGroupId()
+ {
+ return taskProvider.get().getGroupId();
+ }
+
+ /**
+ * @return a map of all task-specific dimensions applicable to this peon.
+ * The task ID ({@link TaskHolder#getTaskId()}) is added to both {@link DruidMetrics#TASK_ID}
+ * {@link DruidMetrics#ID} dimensions to the map for backward compatibility. {@link DruidMetrics#ID} is
+ * deprecated because it's ambiguous and can be removed in a future release.
+ */
+ @Override
+ public Map getMetricDimensions()
+ {
+ final Task task = taskProvider.get();
+ return Map.of(
+ DruidMetrics.DATASOURCE, task.getDataSource(),
+ DruidMetrics.TASK_ID, task.getId(),
+ DruidMetrics.ID, task.getId(),
+ DruidMetrics.TASK_TYPE, task.getType(),
+ DruidMetrics.GROUP_ID, task.getGroupId()
+ );
+ }
}
diff --git a/services/src/test/java/org/apache/druid/cli/CliPeonTest.java b/services/src/test/java/org/apache/druid/cli/CliPeonTest.java
index 718933e7cfff..962b716a5b28 100644
--- a/services/src/test/java/org/apache/druid/cli/CliPeonTest.java
+++ b/services/src/test/java/org/apache/druid/cli/CliPeonTest.java
@@ -22,7 +22,10 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.inject.Guice;
import com.google.inject.Injector;
+import com.google.inject.Module;
+import com.google.inject.Scopes;
import org.apache.commons.io.FileUtils;
import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.data.input.impl.CsvInputFormat;
@@ -30,6 +33,8 @@
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.discovery.NodeRole;
import org.apache.druid.guice.GuiceInjectors;
+import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.guice.LifecycleModule;
import org.apache.druid.indexer.granularity.ArbitraryGranularitySpec;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
import org.apache.druid.indexing.common.TaskToolbox;
@@ -46,9 +51,18 @@
import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.worker.executor.ExecutorLifecycleConfig;
+import org.apache.druid.jackson.JacksonModule;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.AllGranularity;
+import org.apache.druid.java.util.emitter.core.Emitter;
+import org.apache.druid.java.util.emitter.core.Event;
+import org.apache.druid.java.util.emitter.core.EventMap;
+import org.apache.druid.java.util.emitter.service.ServiceEventBuilder;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
+import org.apache.druid.java.util.metrics.StubServiceEmitterModule;
+import org.apache.druid.java.util.metrics.TaskHolder;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
@@ -61,7 +75,6 @@
import org.apache.druid.server.coordination.BroadcastDatasourceLoadingSpec;
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
import org.apache.druid.server.metrics.LoadSpecHolder;
-import org.apache.druid.server.metrics.TaskHolder;
import org.apache.druid.storage.local.LocalTmpStorageConfig;
import org.joda.time.Duration;
import org.junit.Assert;
@@ -70,10 +83,14 @@
import org.junit.rules.TemporaryFolder;
import javax.annotation.Nullable;
+import javax.validation.Validation;
+import javax.validation.Validator;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
@@ -254,6 +271,43 @@ public void testTaskWithMonitorsAndMetricsSpecDoNotCauseCyclicDependency() throw
verifyTaskHolder(peonInjector.getInstance(TaskHolder.class), compactionTask);
}
+ @Test
+ public void testCliPeonMetricsContainAllTaskDimensions() throws IOException
+ {
+ final CompactionTask compactionTask = compactBuilder
+ .metricsSpec(new AggregatorFactory[]{
+ new CountAggregatorFactory("cnt"),
+ new LongSumAggregatorFactory("val", "val")
+ }).build();
+
+ final Injector peonInjector = makePeonInjectorWithStubEmitter(compactionTask, temporaryFolder, mapper);
+ verifyLoadSpecHolder(peonInjector.getInstance(LoadSpecHolder.class), compactionTask);
+ verifyTaskHolder(peonInjector.getInstance(TaskHolder.class), compactionTask);
+
+ Emitter instance = peonInjector.getInstance(Emitter.class);
+ Assert.assertTrue(instance instanceof StubServiceEmitter);
+ instance.start();
+
+ ServiceMetricEvent.Builder builder = ServiceMetricEvent.builder();
+ ServiceEventBuilder eventBuilder = builder.setMetric("foo", 1.0);
+ builder.setDimension("dd", "wikipedia");
+ builder.setDimension("ee", "index");
+ ((StubServiceEmitter) instance).emit(eventBuilder);
+
+ StubServiceEmitter stubEmitter = (StubServiceEmitter) instance;
+ Assert.assertEquals(1, stubEmitter.getNumEmittedEvents());
+ List events = stubEmitter.getEvents();
+ for (Event event : events) {
+ Assert.assertTrue(event instanceof ServiceMetricEvent);
+ EventMap map = event.toMap();
+ Assert.assertEquals(compactionTask.getDataSource(), map.get("dataSource"));
+ Assert.assertEquals(compactionTask.getId(), map.get("id"));
+ Assert.assertEquals(compactionTask.getId(), map.get("taskId"));
+ Assert.assertEquals(compactionTask.getType(), map.get("taskType"));
+ Assert.assertEquals(compactionTask.getGroupId(), map.get("groupId"));
+ }
+ }
+
private Injector makePeonInjector(File taskFile, Properties properties)
{
final CliPeon peon = new CliPeon();
@@ -264,6 +318,43 @@ private Injector makePeonInjector(File taskFile, Properties properties)
return peon.makeInjector(Set.of(NodeRole.PEON));
}
+ public static Injector makePeonInjectorWithStubEmitter(Task task, TemporaryFolder temporaryFolder, ObjectMapper mapper) throws IOException
+ {
+ File taskFile = temporaryFolder.newFile("task.json");
+ FileUtils.write(taskFile, mapper.writeValueAsString(task), StandardCharsets.UTF_8);
+
+ final Properties properties = new Properties();
+ properties.setProperty("druid.emitter", "stub");
+
+ final Injector baseInjector = Guice.createInjector(
+ new JacksonModule(),
+ new LifecycleModule(),
+ binder -> {
+ binder.bind(Validator.class).toInstance(Validation.buildDefaultValidatorFactory().getValidator());
+ binder.bindScope(LazySingleton.class, Scopes.SINGLETON);
+ binder.bind(Properties.class).toInstance(properties);
+ }
+ );
+
+ final CliPeon peon = new CliPeon()
+ {
+ @Override
+ protected List extends Module> getModules()
+ {
+ // Load the CliPeon with StubServiceEmitter
+ List modules = new ArrayList<>(super.getModules());
+ modules.add(new StubServiceEmitterModule());
+ return modules;
+ }
+ };
+
+ peon.taskAndStatusFile = ImmutableList.of(taskFile.getParent(), "1");
+
+ peon.configure(properties);
+ peon.configure(properties, baseInjector);
+ return peon.makeInjector(Set.of(NodeRole.PEON));
+ }
+
private Injector makePeonInjector(Task task, Properties properties) throws IOException
{
File taskFile = temporaryFolder.newFile("task.json");
diff --git a/services/src/test/java/org/apache/druid/server/AsyncQueryForwardingServletTest.java b/services/src/test/java/org/apache/druid/server/AsyncQueryForwardingServletTest.java
index 24416859ea2f..f79e239d586d 100644
--- a/services/src/test/java/org/apache/druid/server/AsyncQueryForwardingServletTest.java
+++ b/services/src/test/java/org/apache/druid/server/AsyncQueryForwardingServletTest.java
@@ -500,7 +500,7 @@ public Throwable getFailure()
}
};
- final StubServiceEmitter stubServiceEmitter = new StubServiceEmitter("", "");
+ final StubServiceEmitter stubServiceEmitter = StubServiceEmitter.createStarted();
final AsyncQueryForwardingServlet servlet = new AsyncQueryForwardingServlet(
new MapQueryToolChestWarehouse(ImmutableMap.of()),
TestHelper.makeJsonMapper(),
@@ -571,7 +571,7 @@ public Throwable getFailure()
}
};
- final StubServiceEmitter stubServiceEmitter = new StubServiceEmitter("", "");
+ final StubServiceEmitter stubServiceEmitter = StubServiceEmitter.createStarted();
final AsyncQueryForwardingServlet servlet = new AsyncQueryForwardingServlet(
new MapQueryToolChestWarehouse(ImmutableMap.of()),
TestHelper.makeJsonMapper(),
@@ -624,7 +624,7 @@ public void testOnFailureWithExceptionAndUnassignedStatusCode()
Mockito.when(responseMock.getStatus()).thenReturn(0); // Test unassigned http status code case from server
Mockito.when(responseMock.getHeaders()).thenReturn(HttpFields.build());
- final StubServiceEmitter stubServiceEmitter = new StubServiceEmitter("", "");
+ final StubServiceEmitter stubServiceEmitter = StubServiceEmitter.createStarted();
final AsyncQueryForwardingServlet servlet = new AsyncQueryForwardingServlet(
new MapQueryToolChestWarehouse(ImmutableMap.of()),
TestHelper.makeJsonMapper(),
@@ -871,7 +871,7 @@ public CompletableFuture abort(Throwable throwable)
}
};
final Result result = new Result(proxyRequestMock, response);
- final StubServiceEmitter stubServiceEmitter = new StubServiceEmitter("", "");
+ final StubServiceEmitter stubServiceEmitter = StubServiceEmitter.createStarted();
final AsyncQueryForwardingServlet servlet = new AsyncQueryForwardingServlet(
new MapQueryToolChestWarehouse(ImmutableMap.of()),
jsonMapper,
diff --git a/services/src/test/java/org/apache/druid/testing/embedded/emitter/LatchableEmitterModule.java b/services/src/test/java/org/apache/druid/testing/embedded/emitter/LatchableEmitterModule.java
index 78603090e5f1..9bd3c8bc30e3 100644
--- a/services/src/test/java/org/apache/druid/testing/embedded/emitter/LatchableEmitterModule.java
+++ b/services/src/test/java/org/apache/druid/testing/embedded/emitter/LatchableEmitterModule.java
@@ -28,6 +28,7 @@
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.initialization.DruidModule;
import org.apache.druid.java.util.emitter.core.Emitter;
+import org.apache.druid.java.util.metrics.TaskHolder;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.metrics.LatchableEmitter;
import org.apache.druid.server.metrics.LatchableEmitterConfig;
@@ -51,9 +52,10 @@ public void configure(Binder binder)
@ManageLifecycle
public LatchableEmitter makeEmitter(
@Self DruidNode selfNode,
- LatchableEmitterConfig config
+ LatchableEmitterConfig config,
+ TaskHolder taskHolder
)
{
- return new LatchableEmitter(selfNode.getServiceName(), selfNode.getHost(), config);
+ return new LatchableEmitter(selfNode.getServiceName(), selfNode.getHost(), config, taskHolder);
}
}
diff --git a/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java b/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java
index 894619b9ea3c..6fb974a527cd 100644
--- a/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java
@@ -277,7 +277,7 @@ public void add(String sqlQueryId, Cancelable lifecycle)
}
}
};
- stubServiceEmitter = new StubServiceEmitter("test", "test");
+ stubServiceEmitter = StubServiceEmitter.createStarted();
final AuthConfig authConfig = new AuthConfig();
final DefaultQueryConfig defaultQueryConfig = new DefaultQueryConfig(ImmutableMap.of());
final SqlToolbox sqlToolbox = new SqlToolbox(