diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java index 46b101055c52..ce84944bb63b 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -39,6 +39,7 @@ import org.apache.commons.io.FileUtils; import org.apache.curator.test.TestingCluster; import org.apache.druid.cli.CliPeon; +import org.apache.druid.cli.CliPeonTest; import org.apache.druid.cli.PeonLoadSpecHolder; import org.apache.druid.cli.PeonTaskHolder; import org.apache.druid.data.input.InputEntity; @@ -88,6 +89,12 @@ import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.java.util.common.parsers.CloseableIterator; import org.apache.druid.java.util.common.parsers.ParseException; +import org.apache.druid.java.util.emitter.core.Emitter; +import org.apache.druid.java.util.emitter.core.Event; +import org.apache.druid.java.util.emitter.core.EventMap; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; +import org.apache.druid.java.util.metrics.StubServiceEmitter; +import org.apache.druid.java.util.metrics.TaskHolder; import org.apache.druid.math.expr.ExprMacroTable; import org.apache.druid.query.DefaultGenericQueryMetricsFactory; import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate; @@ -121,7 +128,6 @@ import org.apache.druid.server.coordination.BroadcastDatasourceLoadingSpec; import org.apache.druid.server.lookup.cache.LookupLoadingSpec; import org.apache.druid.server.metrics.LoadSpecHolder; -import org.apache.druid.server.metrics.TaskHolder; import org.apache.druid.server.security.Action; import org.apache.druid.server.security.Resource; import org.apache.druid.server.security.ResourceAction; @@ -3487,6 +3493,64 @@ public void testTaskWithTransformSpecDoesNotCauseCliPeonCyclicDependency() verifyPersistAndMergeTimeMetricsArePositive(observedSegmentGenerationMetrics); } + @Test(timeout = 60_000L) + public void testKafkaTaskContainsAllTaskDimensions() + throws IOException, ExecutionException, InterruptedException + { + insertData(); + + final KafkaIndexTask task = createTask( + "index_kafka_test_id1", + new KafkaIndexTaskIOConfig( + 0, + "sequence0", + new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(new KafkaTopicPartition(false, topic, 0), 2L), ImmutableSet.of()), + new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(new KafkaTopicPartition(false, topic, 0), 5L)), + kafkaServer.consumerProperties(), + KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, + true, + null, + null, + INPUT_FORMAT, + null, + Duration.standardHours(2).getStandardMinutes() + ) + ); + + Injector peonInjector = CliPeonTest.makePeonInjectorWithStubEmitter(task, temporaryFolder, OBJECT_MAPPER); + Emitter peonEmitter = peonInjector.getInstance(Emitter.class); + Assert.assertTrue(peonEmitter instanceof StubServiceEmitter); + emitter = (StubServiceEmitter) peonEmitter; + emitter.start(); + makeToolboxFactory(); + + final ListenableFuture future = runTask(task); + + // Wait for task to exit + Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode()); + Assert.assertTrue(emitter.getNumEmittedEvents() > 0); + + // Check published metadata & segments in deep storage + final List publishedDescriptors = publishedDescriptors(); + Assert.assertEquals( + new KafkaDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(new KafkaTopicPartition(false, topic, 0), 5L))), + newDataSchemaMetadata() + ); + Assert.assertEquals(ImmutableList.of("c"), readSegmentColumn("dim1", publishedDescriptors.get(0))); + + Assert.assertTrue(emitter.getNumEmittedEvents() > 0); + for (Event event : emitter.getEvents()) { + if (event instanceof ServiceMetricEvent) { + EventMap observedEvent = event.toMap(); + Assert.assertEquals("test_ds", observedEvent.get("dataSource")); + Assert.assertEquals("index_kafka_test_id1", observedEvent.get("id")); + Assert.assertEquals("index_kafka_test_id1", observedEvent.get("taskId")); + Assert.assertEquals("index_kafka", observedEvent.get("taskType")); + Assert.assertEquals("index_kafka_test_ds", observedEvent.get("groupId")); + } + } + } + public static class TestKafkaInputFormat implements InputFormat { final InputFormat baseInputFormat; diff --git a/processing/src/main/java/org/apache/druid/java/util/emitter/service/ServiceEmitter.java b/processing/src/main/java/org/apache/druid/java/util/emitter/service/ServiceEmitter.java index 13593fd146fc..d5e241df5a43 100644 --- a/processing/src/main/java/org/apache/druid/java/util/emitter/service/ServiceEmitter.java +++ b/processing/src/main/java/org/apache/druid/java/util/emitter/service/ServiceEmitter.java @@ -25,39 +25,56 @@ import org.apache.druid.java.util.common.lifecycle.LifecycleStop; import org.apache.druid.java.util.emitter.core.Emitter; import org.apache.druid.java.util.emitter.core.Event; +import org.apache.druid.java.util.metrics.NoopTaskHolder; +import org.apache.druid.java.util.metrics.TaskHolder; import java.io.IOException; public class ServiceEmitter implements Emitter { - private final ImmutableMap serviceDimensions; private final Emitter emitter; + private final String service; + private final ImmutableMap otherServiceDimensions; + private final String host; + private final TaskHolder taskHolder; + + /** + * This is initialized in {@link #start()} rather than in the constructor, since calling {@link TaskHolder#getMetricDimensions()} + * may introduce cyclic dependencies. So we defer initialization until {@link #start()} which is {@link LifecycleStart} managed. + */ + private ImmutableMap serviceDimensions; public ServiceEmitter(String service, String host, Emitter emitter) { - this(service, host, emitter, ImmutableMap.of()); + this(service, host, emitter, ImmutableMap.of(), new NoopTaskHolder()); } public ServiceEmitter( String service, String host, Emitter emitter, - ImmutableMap otherServiceDimensions + ImmutableMap otherServiceDimensions, + TaskHolder taskHolder ) { - this.serviceDimensions = ImmutableMap - .builder() - .put("service", Preconditions.checkNotNull(service, "service should be non-null")) - .put("host", Preconditions.checkNotNull(host, "host should be non-null")) - .putAll(otherServiceDimensions) - .build(); + this.service = Preconditions.checkNotNull(service, "service should be non-null"); + this.host = Preconditions.checkNotNull(host, "host should be non-null"); + this.otherServiceDimensions = otherServiceDimensions; this.emitter = emitter; + this.taskHolder = taskHolder; } @Override @LifecycleStart public void start() { + serviceDimensions = ImmutableMap + .builder() + .put(Event.SERVICE, service) + .put(Event.HOST, host) + .putAll(otherServiceDimensions) + .putAll(taskHolder.getMetricDimensions()) + .build(); emitter.start(); } diff --git a/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupCpuMonitor.java b/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupCpuMonitor.java index d4589df73c08..1c6dc7abcea0 100644 --- a/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupCpuMonitor.java +++ b/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupCpuMonitor.java @@ -40,23 +40,21 @@ public class CgroupCpuMonitor extends FeedDefiningMonitor private static final Logger LOG = new Logger(CgroupCpuMonitor.class); private static final Long DEFAULT_USER_HZ = 100L; final CgroupDiscoverer cgroupDiscoverer; - final Map dimensions; private Long userHz; private final KeyedDiff jiffies = new KeyedDiff(); private long prevJiffiesSnapshotAt = 0; private final boolean isRunningOnCgroupsV2; private final CgroupV2CpuMonitor cgroupV2CpuMonitor; - public CgroupCpuMonitor(CgroupDiscoverer cgroupDiscoverer, final Map dimensions, String feed) + public CgroupCpuMonitor(CgroupDiscoverer cgroupDiscoverer, String feed) { super(feed); this.cgroupDiscoverer = cgroupDiscoverer; - this.dimensions = dimensions; - + // Check if we're running on cgroups v2 this.isRunningOnCgroupsV2 = cgroupDiscoverer.getCgroupVersion().equals(CgroupVersion.V2); if (isRunningOnCgroupsV2) { - this.cgroupV2CpuMonitor = new CgroupV2CpuMonitor(cgroupDiscoverer, dimensions, feed); + this.cgroupV2CpuMonitor = new CgroupV2CpuMonitor(cgroupDiscoverer, feed); LOG.info("Detected cgroups v2, using CgroupV2CpuMonitor behavior for accurate metrics"); } else { this.cgroupV2CpuMonitor = null; @@ -65,19 +63,14 @@ public CgroupCpuMonitor(CgroupDiscoverer cgroupDiscoverer, final Map dimensions, String feed) - { - this(ProcSelfCgroupDiscoverer.autoCgroupDiscoverer(), dimensions, feed); - } - - public CgroupCpuMonitor(final Map dimensions) + public CgroupCpuMonitor(String feed) { - this(dimensions, DEFAULT_METRICS_FEED); + this(ProcSelfCgroupDiscoverer.autoCgroupDiscoverer(), feed); } public CgroupCpuMonitor() { - this(ImmutableMap.of()); + this(DEFAULT_METRICS_FEED); } @Override @@ -96,7 +89,6 @@ private boolean doMonitorV1(ServiceEmitter emitter) long now = Instant.now().getEpochSecond(); final ServiceMetricEvent.Builder builder = builder(); - MonitorUtils.addDimensionsToBuilder(builder, dimensions); builder.setDimension("cgroupversion", cgroupDiscoverer.getCgroupVersion().name()); emitter.emit(builder.setMetric("cgroup/cpu/shares", cpuSnapshot.getShares())); diff --git a/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupCpuSetMonitor.java b/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupCpuSetMonitor.java index 4d4ae2008bb5..90ab47545819 100644 --- a/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupCpuSetMonitor.java +++ b/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupCpuSetMonitor.java @@ -19,15 +19,12 @@ package org.apache.druid.java.util.metrics; -import com.google.common.collect.ImmutableMap; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.java.util.metrics.cgroups.CgroupDiscoverer; import org.apache.druid.java.util.metrics.cgroups.CpuSet; import org.apache.druid.java.util.metrics.cgroups.ProcSelfCgroupDiscoverer; -import java.util.Map; - /** * Monitor that reports CPU set metrics from cgroups both v1 and v2. */ @@ -35,28 +32,21 @@ public class CgroupCpuSetMonitor extends FeedDefiningMonitor { final CgroupDiscoverer cgroupDiscoverer; - final Map dimensions; - public CgroupCpuSetMonitor(CgroupDiscoverer cgroupDiscoverer, final Map dimensions, String feed) + public CgroupCpuSetMonitor(CgroupDiscoverer cgroupDiscoverer, String feed) { super(feed); this.cgroupDiscoverer = cgroupDiscoverer; - this.dimensions = dimensions; - } - - public CgroupCpuSetMonitor(final Map dimensions, String feed) - { - this(ProcSelfCgroupDiscoverer.autoCgroupDiscoverer(), dimensions, feed); } - public CgroupCpuSetMonitor(final Map dimensions) + public CgroupCpuSetMonitor(String feed) { - this(dimensions, DEFAULT_METRICS_FEED); + this(ProcSelfCgroupDiscoverer.autoCgroupDiscoverer(), feed); } public CgroupCpuSetMonitor() { - this(ImmutableMap.of()); + this(DEFAULT_METRICS_FEED); } @Override @@ -64,7 +54,6 @@ public boolean doMonitor(ServiceEmitter emitter) { final CpuSet.CpuSetMetric cpusetSnapshot = cgroupDiscoverer.getCpuSetMetrics(); final ServiceMetricEvent.Builder builder = builder(); - MonitorUtils.addDimensionsToBuilder(builder, dimensions); builder.setDimension("cgroupversion", cgroupDiscoverer.getCgroupVersion().name()); emitter.emit(builder.setMetric( "cgroup/cpuset/cpu_count", diff --git a/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupDiskMonitor.java b/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupDiskMonitor.java index d120cd8a28bf..28c985c1ffc2 100644 --- a/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupDiskMonitor.java +++ b/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupDiskMonitor.java @@ -33,41 +33,34 @@ public class CgroupDiskMonitor extends FeedDefiningMonitor { private static final Logger LOG = new Logger(CgroupDiskMonitor.class); - final CgroupDiscoverer cgroupDiscoverer; - final Map dimensions; + private final CgroupDiscoverer cgroupDiscoverer; private final KeyedDiff diff = new KeyedDiff(); private final boolean isRunningOnCgroupsV2; private final CgroupV2DiskMonitor cgroupV2DiskMonitor; - public CgroupDiskMonitor(CgroupDiscoverer cgroupDiscoverer, final Map dimensions, String feed) + public CgroupDiskMonitor(CgroupDiscoverer cgroupDiscoverer, String feed) { super(feed); this.cgroupDiscoverer = cgroupDiscoverer; - this.dimensions = dimensions; - + // Check if we're running on cgroups v2 this.isRunningOnCgroupsV2 = cgroupDiscoverer.getCgroupVersion().equals(CgroupVersion.V2); if (isRunningOnCgroupsV2) { - this.cgroupV2DiskMonitor = new CgroupV2DiskMonitor(cgroupDiscoverer, dimensions, feed); + this.cgroupV2DiskMonitor = new CgroupV2DiskMonitor(cgroupDiscoverer, feed); LOG.info("Detected cgroups v2, using CgroupV2DiskMonitor behavior for accurate metrics"); } else { this.cgroupV2DiskMonitor = null; } } - public CgroupDiskMonitor(final Map dimensions, String feed) - { - this(ProcSelfCgroupDiscoverer.autoCgroupDiscoverer(), dimensions, feed); - } - - public CgroupDiskMonitor(final Map dimensions) + public CgroupDiskMonitor(String feed) { - this(dimensions, DEFAULT_METRICS_FEED); + this(ProcSelfCgroupDiscoverer.autoCgroupDiscoverer(), feed); } public CgroupDiskMonitor() { - this(ImmutableMap.of()); + this(DEFAULT_METRICS_FEED); } @Override @@ -97,7 +90,6 @@ private boolean doMonitorV1(ServiceEmitter emitter) if (stats != null) { final ServiceMetricEvent.Builder builder = builder() .setDimension("diskName", entry.getValue().getDiskName()); - MonitorUtils.addDimensionsToBuilder(builder, dimensions); builder.setDimension("cgroupversion", cgroupDiscoverer.getCgroupVersion()); for (Map.Entry stat : stats.entrySet()) { emitter.emit(builder.setMetric(stat.getKey(), stat.getValue())); diff --git a/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupMemoryMonitor.java b/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupMemoryMonitor.java index 24c2aa35bc59..ecffdfb779e6 100644 --- a/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupMemoryMonitor.java +++ b/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupMemoryMonitor.java @@ -19,7 +19,6 @@ package org.apache.druid.java.util.metrics; -import com.google.common.collect.ImmutableMap; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.emitter.service.ServiceEmitter; @@ -29,8 +28,6 @@ import org.apache.druid.java.util.metrics.cgroups.Memory; import org.apache.druid.java.util.metrics.cgroups.ProcSelfCgroupDiscoverer; -import java.util.Map; - public class CgroupMemoryMonitor extends FeedDefiningMonitor { private static final Logger LOG = new Logger(CgroupMemoryMonitor.class); @@ -38,40 +35,33 @@ public class CgroupMemoryMonitor extends FeedDefiningMonitor private static final String MEMORY_LIMIT_FILE = "memory.limit_in_bytes"; final CgroupDiscoverer cgroupDiscoverer; - final Map dimensions; private final boolean isRunningOnCgroupsV2; private final CgroupV2MemoryMonitor cgroupV2MemoryMonitor; - public CgroupMemoryMonitor(CgroupDiscoverer cgroupDiscoverer, final Map dimensions, String feed) + public CgroupMemoryMonitor(CgroupDiscoverer cgroupDiscoverer, String feed) { super(feed); this.cgroupDiscoverer = cgroupDiscoverer; - this.dimensions = dimensions; - + // Check if we're running on cgroups v2 this.isRunningOnCgroupsV2 = cgroupDiscoverer.getCgroupVersion().equals(CgroupVersion.V2); if (isRunningOnCgroupsV2) { - this.cgroupV2MemoryMonitor = new CgroupV2MemoryMonitor(cgroupDiscoverer, dimensions, feed); + this.cgroupV2MemoryMonitor = new CgroupV2MemoryMonitor(cgroupDiscoverer, feed); LOG.info("Detected cgroups v2, using CgroupV2MemoryMonitor behavior for accurate metrics"); } else { this.cgroupV2MemoryMonitor = null; } } - public CgroupMemoryMonitor(final Map dimensions, String feed) - { - this(ProcSelfCgroupDiscoverer.autoCgroupDiscoverer(), dimensions, feed); - } - - public CgroupMemoryMonitor(final Map dimensions) + public CgroupMemoryMonitor(String feed) { - this(dimensions, DEFAULT_METRICS_FEED); + this(ProcSelfCgroupDiscoverer.autoCgroupDiscoverer(), feed); } public CgroupMemoryMonitor() { - this(ImmutableMap.of()); + this(DEFAULT_METRICS_FEED); } @Override @@ -80,7 +70,7 @@ public boolean doMonitor(ServiceEmitter emitter) if (isRunningOnCgroupsV2) { return cgroupV2MemoryMonitor.doMonitor(emitter); } else { - return parseAndEmit(emitter, cgroupDiscoverer, dimensions, MEMORY_USAGE_FILE, MEMORY_LIMIT_FILE, this); + return parseAndEmit(emitter, cgroupDiscoverer, MEMORY_USAGE_FILE, MEMORY_LIMIT_FILE, this); } } @@ -90,7 +80,6 @@ public boolean doMonitor(ServiceEmitter emitter) public static boolean parseAndEmit( ServiceEmitter emitter, CgroupDiscoverer cgroupDiscoverer, - Map dimensions, String memoryUsageFile, String memoryLimitFile, FeedDefiningMonitor feedDefiningMonitor @@ -99,7 +88,6 @@ public static boolean parseAndEmit( final Memory memory = new Memory(cgroupDiscoverer); final Memory.MemoryStat stat = memory.snapshot(memoryUsageFile, memoryLimitFile); final ServiceMetricEvent.Builder builder = feedDefiningMonitor.builder(); - MonitorUtils.addDimensionsToBuilder(builder, dimensions); builder.setDimension("cgroupversion", cgroupDiscoverer.getCgroupVersion().name()); emitter.emit(builder.setMetric("cgroup/memory/usage/bytes", stat.getUsage())); emitter.emit(builder.setMetric("cgroup/memory/limit/bytes", stat.getLimit())); diff --git a/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupV2CpuMonitor.java b/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupV2CpuMonitor.java index 3e9dbb4a3fe5..bf95394485cd 100644 --- a/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupV2CpuMonitor.java +++ b/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupV2CpuMonitor.java @@ -39,20 +39,18 @@ public class CgroupV2CpuMonitor extends FeedDefiningMonitor { private static final String SNAPSHOT = "snapshot"; final CgroupDiscoverer cgroupDiscoverer; - final Map dimensions; private final KeyedDiff diff = new KeyedDiff(); - public CgroupV2CpuMonitor(CgroupDiscoverer cgroupDiscoverer, final Map dimensions, String feed) + public CgroupV2CpuMonitor(CgroupDiscoverer cgroupDiscoverer, String feed) { super(feed); this.cgroupDiscoverer = cgroupDiscoverer; - this.dimensions = dimensions; } @VisibleForTesting CgroupV2CpuMonitor(CgroupDiscoverer cgroupDiscoverer) { - this(cgroupDiscoverer, ImmutableMap.of(), DEFAULT_METRICS_FEED); + this(cgroupDiscoverer, DEFAULT_METRICS_FEED); } CgroupV2CpuMonitor() @@ -64,7 +62,6 @@ public CgroupV2CpuMonitor(CgroupDiscoverer cgroupDiscoverer, final Map dimensions, String feed) + public CgroupV2CpuSetMonitor(CgroupDiscoverer cgroupDiscoverer, String feed) { - super(cgroupDiscoverer, dimensions, feed); + super(cgroupDiscoverer, feed); } @VisibleForTesting CgroupV2CpuSetMonitor(CgroupDiscoverer cgroupDiscoverer) { - this(cgroupDiscoverer, ImmutableMap.of(), DEFAULT_METRICS_FEED); + this(cgroupDiscoverer, DEFAULT_METRICS_FEED); } CgroupV2CpuSetMonitor() diff --git a/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupV2DiskMonitor.java b/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupV2DiskMonitor.java index b35f780851b0..1df7181677b7 100644 --- a/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupV2DiskMonitor.java +++ b/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupV2DiskMonitor.java @@ -47,20 +47,18 @@ public class CgroupV2DiskMonitor extends FeedDefiningMonitor private static final Logger LOG = new Logger(CgroupV2DiskMonitor.class); private static final String IO_STAT = "io.stat"; final CgroupDiscoverer cgroupDiscoverer; - final Map dimensions; private final KeyedDiff diff = new KeyedDiff(); - public CgroupV2DiskMonitor(CgroupDiscoverer cgroupDiscoverer, final Map dimensions, String feed) + public CgroupV2DiskMonitor(CgroupDiscoverer cgroupDiscoverer, String feed) { super(feed); this.cgroupDiscoverer = cgroupDiscoverer; - this.dimensions = dimensions; } @VisibleForTesting CgroupV2DiskMonitor(CgroupDiscoverer cgroupDiscoverer) { - this(cgroupDiscoverer, ImmutableMap.of(), DEFAULT_METRICS_FEED); + this(cgroupDiscoverer, DEFAULT_METRICS_FEED); } CgroupV2DiskMonitor() @@ -86,7 +84,6 @@ public boolean doMonitor(ServiceEmitter emitter) if (stats != null) { final ServiceMetricEvent.Builder builder = builder() .setDimension("diskName", entry.getDiskName()); - MonitorUtils.addDimensionsToBuilder(builder, dimensions); builder.setDimension("cgroupversion", cgroupDiscoverer.getCgroupVersion()); for (Map.Entry stat : stats.entrySet()) { emitter.emit(builder.setMetric(stat.getKey(), stat.getValue())); diff --git a/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupV2MemoryMonitor.java b/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupV2MemoryMonitor.java index 43b15cbf728b..8207cc07f8d2 100644 --- a/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupV2MemoryMonitor.java +++ b/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupV2MemoryMonitor.java @@ -20,14 +20,11 @@ package org.apache.druid.java.util.metrics; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableMap; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.metrics.cgroups.CgroupDiscoverer; import org.apache.druid.java.util.metrics.cgroups.ProcCgroupV2Discoverer; import org.apache.druid.java.util.metrics.cgroups.ProcSelfCgroupDiscoverer; -import java.util.Map; - /** * Monitor that reports memory usage stats by reading `memory.*` files reported by cgroupv2 */ @@ -37,40 +34,31 @@ public class CgroupV2MemoryMonitor extends FeedDefiningMonitor private static final String MEMORY_USAGE_FILE = "memory.current"; private static final String MEMORY_LIMIT_FILE = "memory.max"; private final CgroupDiscoverer cgroupDiscoverer; - private final Map dimensions; @VisibleForTesting - CgroupV2MemoryMonitor(CgroupDiscoverer cgroupDiscoverer, final Map dimensions, String feed) + CgroupV2MemoryMonitor(CgroupDiscoverer cgroupDiscoverer, String feed) { super(feed); this.cgroupDiscoverer = cgroupDiscoverer; - this.dimensions = dimensions; } - public CgroupV2MemoryMonitor(final Map dimensions, String feed) + public CgroupV2MemoryMonitor(String feed) { - this(new ProcSelfCgroupDiscoverer(ProcCgroupV2Discoverer.class), dimensions, feed); - } - - public CgroupV2MemoryMonitor(final Map dimensions) - { - this(dimensions, DEFAULT_METRICS_FEED); + this(new ProcSelfCgroupDiscoverer(ProcCgroupV2Discoverer.class), feed); } public CgroupV2MemoryMonitor() { - this(ImmutableMap.of()); + this(DEFAULT_METRICS_FEED); } - @Override public boolean doMonitor(ServiceEmitter emitter) { return CgroupMemoryMonitor.parseAndEmit( emitter, cgroupDiscoverer, - dimensions, MEMORY_USAGE_FILE, MEMORY_LIMIT_FILE, this diff --git a/processing/src/main/java/org/apache/druid/java/util/metrics/CpuAcctDeltaMonitor.java b/processing/src/main/java/org/apache/druid/java/util/metrics/CpuAcctDeltaMonitor.java index 10d46258556c..7d6364422d87 100644 --- a/processing/src/main/java/org/apache/druid/java/util/metrics/CpuAcctDeltaMonitor.java +++ b/processing/src/main/java/org/apache/druid/java/util/metrics/CpuAcctDeltaMonitor.java @@ -20,7 +20,6 @@ package org.apache.druid.java.util.metrics; import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableMap; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.logger.Logger; @@ -32,7 +31,6 @@ import org.apache.druid.java.util.metrics.cgroups.ProcSelfCgroupDiscoverer; import org.joda.time.DateTime; -import java.util.Map; import java.util.concurrent.atomic.AtomicReference; public class CpuAcctDeltaMonitor extends FeedDefiningMonitor @@ -44,35 +42,23 @@ public class CpuAcctDeltaMonitor extends FeedDefiningMonitor CgroupV2CpuMonitor.class.getSimpleName() ); private final AtomicReference priorSnapshot = new AtomicReference<>(null); - private final Map dimensions; private final CgroupDiscoverer cgroupDiscoverer; private final boolean isRunningOnCgroupsV2; public CpuAcctDeltaMonitor() { - this(ImmutableMap.of()); + this(DEFAULT_METRICS_FEED); } - public CpuAcctDeltaMonitor(final Map dimensions) + public CpuAcctDeltaMonitor(final String feed) { - this(dimensions, DEFAULT_METRICS_FEED); + this(feed, ProcSelfCgroupDiscoverer.autoCgroupDiscoverer()); } - public CpuAcctDeltaMonitor(final Map dimensions, final String feed) - { - this(feed, dimensions, ProcSelfCgroupDiscoverer.autoCgroupDiscoverer()); - } - - public CpuAcctDeltaMonitor( - String feed, - Map dimensions, - CgroupDiscoverer cgroupDiscoverer - ) + public CpuAcctDeltaMonitor(String feed, CgroupDiscoverer cgroupDiscoverer) { super(feed); - Preconditions.checkNotNull(dimensions); - this.dimensions = ImmutableMap.copyOf(dimensions); this.cgroupDiscoverer = Preconditions.checkNotNull(cgroupDiscoverer, "cgroupDiscoverer required"); isRunningOnCgroupsV2 = cgroupDiscoverer.getCgroupVersion().equals(CgroupVersion.V2); @@ -119,8 +105,6 @@ public boolean doMonitor(ServiceEmitter emitter) .setDimension("cpuName", Integer.toString(i)) .setDimension("cpuTime", "sys") .setDimension("cgroupversion", cgroupDiscoverer.getCgroupVersion().name()); - MonitorUtils.addDimensionsToBuilder(builderUsr, dimensions); - MonitorUtils.addDimensionsToBuilder(builderSys, dimensions); emitter.emit(builderUsr.setCreatedTime(dateTime).setMetric( "cgroup/cpu_time_delta_ns", snapshot.usrTime(i) - priorSnapshotHolder.metric.usrTime(i) diff --git a/processing/src/main/java/org/apache/druid/java/util/metrics/JvmCpuMonitor.java b/processing/src/main/java/org/apache/druid/java/util/metrics/JvmCpuMonitor.java index b587d26ee607..606be1daba95 100644 --- a/processing/src/main/java/org/apache/druid/java/util/metrics/JvmCpuMonitor.java +++ b/processing/src/main/java/org/apache/druid/java/util/metrics/JvmCpuMonitor.java @@ -19,7 +19,6 @@ package org.apache.druid.java.util.metrics; -import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.emitter.service.ServiceEmitter; @@ -39,23 +38,14 @@ public class JvmCpuMonitor extends FeedDefiningMonitor private final KeyedDiff diff = new KeyedDiff(); - private Map dimensions; - public JvmCpuMonitor() { - this(ImmutableMap.of()); - } - - public JvmCpuMonitor(Map dimensions) - { - this(dimensions, DEFAULT_METRICS_FEED); + this(DEFAULT_METRICS_FEED); } - public JvmCpuMonitor(Map dimensions, String feed) + public JvmCpuMonitor(String feed) { super(feed); - Preconditions.checkNotNull(dimensions); - this.dimensions = ImmutableMap.copyOf(dimensions); } @Override @@ -65,7 +55,6 @@ public boolean doMonitor(ServiceEmitter emitter) try { ProcCpu procCpu = sigar.getProcCpu(currentProcessId); final ServiceMetricEvent.Builder builder = builder(); - MonitorUtils.addDimensionsToBuilder(builder, dimensions); // delta for total, sys, user Map procDiff = diff.to( "proc/cpu", ImmutableMap.of( diff --git a/processing/src/main/java/org/apache/druid/java/util/metrics/JvmMonitor.java b/processing/src/main/java/org/apache/druid/java/util/metrics/JvmMonitor.java index 2a2bfdeda57f..08a9ab5bfc8e 100644 --- a/processing/src/main/java/org/apache/druid/java/util/metrics/JvmMonitor.java +++ b/processing/src/main/java/org/apache/druid/java/util/metrics/JvmMonitor.java @@ -20,7 +20,6 @@ package org.apache.druid.java.util.metrics; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.emitter.service.ServiceEmitter; @@ -45,25 +44,17 @@ public class JvmMonitor extends FeedDefiningMonitor @VisibleForTesting @Nullable final GcCollectors gcCollectors; - private final Map dimensions; @Nullable private final AllocationMetricCollector collector; public JvmMonitor() { - this(ImmutableMap.of()); + this(DEFAULT_METRICS_FEED); } - public JvmMonitor(Map dimensions) - { - this(dimensions, DEFAULT_METRICS_FEED); - } - - public JvmMonitor(Map dimensions, String feed) + public JvmMonitor(String feed) { super(feed); - Preconditions.checkNotNull(dimensions); - this.dimensions = ImmutableMap.copyOf(dimensions); this.collector = AllocationMetricCollectors.getAllocationMetricCollector(); this.gcCollectors = new GcCollectors(); } @@ -82,7 +73,6 @@ public boolean doMonitor(ServiceEmitter emitter) private void emitThreadAllocationMetrics(ServiceEmitter emitter) { final ServiceMetricEvent.Builder builder = builder(); - MonitorUtils.addDimensionsToBuilder(builder, dimensions); builder.setDimension(JVM_VERSION, JAVA_VERSION); if (collector != null) { long delta = collector.calculateDelta(); @@ -108,7 +98,6 @@ private void emitJvmMemMetrics(ServiceEmitter emitter) final ServiceMetricEvent.Builder builder = builder() .setDimension("memKind", kind) .setDimension(JVM_VERSION, JAVA_VERSION); - MonitorUtils.addDimensionsToBuilder(builder, dimensions); emitter.emit(builder.setMetric("jvm/mem/max", usage.getMax())); emitter.emit(builder.setMetric("jvm/mem/committed", usage.getCommitted())); @@ -124,7 +113,6 @@ private void emitJvmMemMetrics(ServiceEmitter emitter) .setDimension("poolKind", kind) .setDimension("poolName", pool.getName()) .setDimension(JVM_VERSION, JAVA_VERSION); - MonitorUtils.addDimensionsToBuilder(builder, dimensions); emitter.emit(builder.setMetric("jvm/pool/max", usage.getMax())); emitter.emit(builder.setMetric("jvm/pool/committed", usage.getCommitted())); @@ -139,7 +127,6 @@ private void emitDirectMemMetrics(ServiceEmitter emitter) final ServiceMetricEvent.Builder builder = builder() .setDimension("bufferpoolName", pool.getName()) .setDimension(JVM_VERSION, JAVA_VERSION); - MonitorUtils.addDimensionsToBuilder(builder, dimensions); emitter.emit(builder.setMetric("jvm/bufferpool/capacity", pool.getTotalCapacity())); emitter.emit(builder.setMetric("jvm/bufferpool/used", pool.getMemoryUsed())); @@ -149,7 +136,7 @@ private void emitDirectMemMetrics(ServiceEmitter emitter) private void emitGcMetrics(ServiceEmitter emitter) { - gcCollectors.emit(emitter, dimensions); + gcCollectors.emit(emitter); } private class GcCollectors @@ -174,14 +161,14 @@ private class GcCollectors } - void emit(ServiceEmitter emitter, Map dimensions) + void emit(ServiceEmitter emitter) { for (GcGenerationCollector generationCollector : generationCollectors) { - generationCollector.emit(emitter, dimensions); + generationCollector.emit(emitter); } for (GcSpaceCollector spaceCollector : spaceCollectors) { - spaceCollector.emit(emitter, dimensions); + spaceCollector.emit(emitter); } } } @@ -253,11 +240,10 @@ private Pair getReadableName(String name) } } - void emit(ServiceEmitter emitter, Map dimensions) + void emit(ServiceEmitter emitter) { ImmutableMap.Builder dimensionsCopyBuilder = ImmutableMap .builder() - .putAll(dimensions) .put("gcGen", new String[]{generation}); dimensionsCopyBuilder.put("gcName", new String[]{collectorName}); @@ -289,10 +275,10 @@ public GcSpaceCollector(MemoryUsage collectionUsage, String name) spaces.add(new GcGenerationSpace(collectionUsage, name)); } - void emit(ServiceEmitter emitter, Map dimensions) + void emit(ServiceEmitter emitter) { for (GcGenerationSpace space : spaces) { - space.emit(emitter, dimensions); + space.emit(emitter); } } } @@ -308,11 +294,9 @@ public GcGenerationSpace(MemoryUsage memoryUsage, String name) this.name = name; } - void emit(ServiceEmitter emitter, Map dimensions) + void emit(ServiceEmitter emitter) { final ServiceMetricEvent.Builder builder = builder(); - MonitorUtils.addDimensionsToBuilder(builder, dimensions); - builder .setDimension(JVM_VERSION, JAVA_VERSION) .setDimension("gcGenSpaceName", name); diff --git a/processing/src/main/java/org/apache/druid/java/util/metrics/JvmThreadsMonitor.java b/processing/src/main/java/org/apache/druid/java/util/metrics/JvmThreadsMonitor.java index d81fab69429e..b37e38dd04a9 100644 --- a/processing/src/main/java/org/apache/druid/java/util/metrics/JvmThreadsMonitor.java +++ b/processing/src/main/java/org/apache/druid/java/util/metrics/JvmThreadsMonitor.java @@ -19,32 +19,25 @@ package org.apache.druid.java.util.metrics; -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableMap; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import java.lang.management.ManagementFactory; import java.lang.management.ThreadMXBean; -import java.util.Map; public class JvmThreadsMonitor extends FeedDefiningMonitor { - private final Map dimensions; - private int lastLiveThreads = 0; private long lastStartedThreads = 0; - public JvmThreadsMonitor(Map dimensions) + public JvmThreadsMonitor() { - this(dimensions, DEFAULT_METRICS_FEED); + this(DEFAULT_METRICS_FEED); } - public JvmThreadsMonitor(Map dimensions, String feed) + public JvmThreadsMonitor(String feed) { super(feed); - Preconditions.checkNotNull(dimensions, "dimensions"); - this.dimensions = ImmutableMap.copyOf(dimensions); } @Override @@ -53,7 +46,6 @@ public boolean doMonitor(ServiceEmitter emitter) ThreadMXBean threadBean = ManagementFactory.getThreadMXBean(); final ServiceMetricEvent.Builder builder = builder(); - MonitorUtils.addDimensionsToBuilder(builder, dimensions); // Because between next two calls on ThreadMXBean new threads can be started we can observe some inconsistency // in counters values and finished counter could be even negative diff --git a/processing/src/main/java/org/apache/druid/java/util/metrics/Monitors.java b/processing/src/main/java/org/apache/druid/java/util/metrics/Monitors.java index 3893f1a6feb3..7af8d54eed3a 100644 --- a/processing/src/main/java/org/apache/druid/java/util/metrics/Monitors.java +++ b/processing/src/main/java/org/apache/druid/java/util/metrics/Monitors.java @@ -20,22 +20,19 @@ package org.apache.druid.java.util.metrics; import java.util.List; -import java.util.Map; public class Monitors { /** * Creates a JVM monitor, configured with the given dimensions, that gathers all currently available JVM-wide * monitors. Emitted events have default feed {@link FeedDefiningMonitor#DEFAULT_METRICS_FEED} - * See: {@link Monitors#createCompoundJvmMonitor(Map, String)} - * - * @param dimensions common dimensions to configure the JVM monitor with + * See: {@link Monitors#createCompoundJvmMonitor(String)} * * @return a universally useful JVM-wide monitor */ - public static Monitor createCompoundJvmMonitor(Map dimensions) + public static Monitor createCompoundJvmMonitor() { - return createCompoundJvmMonitor(dimensions, FeedDefiningMonitor.DEFAULT_METRICS_FEED); + return createCompoundJvmMonitor(FeedDefiningMonitor.DEFAULT_METRICS_FEED); } /** @@ -43,20 +40,19 @@ public static Monitor createCompoundJvmMonitor(Map dimensions) * monitors: {@link JvmMonitor}, {@link JvmCpuMonitor} and {@link JvmThreadsMonitor} (this list may * change in any future release of this library, including a minor release). * - * @param dimensions common dimensions to configure the JVM monitor with * @param feed feed for all emitted events * * @return a universally useful JVM-wide monitor */ - public static Monitor createCompoundJvmMonitor(Map dimensions, String feed) + public static Monitor createCompoundJvmMonitor(String feed) { // This list doesn't include SysMonitor because it should probably be run only in one JVM, if several JVMs are // running on the same instance, so most of the time SysMonitor should be configured/set up differently than // "simple" JVM monitors, created below. return and(// Could equally be or(), because all member monitors always return true from their monitor() methods. - new JvmMonitor(dimensions, feed), - new JvmCpuMonitor(dimensions, feed), - new JvmThreadsMonitor(dimensions, feed) + new JvmMonitor(feed), + new JvmCpuMonitor(feed), + new JvmThreadsMonitor(feed) ); } diff --git a/processing/src/main/java/org/apache/druid/java/util/metrics/NoopOshiSysMonitor.java b/processing/src/main/java/org/apache/druid/java/util/metrics/NoopOshiSysMonitor.java index 9a693f7bfdf4..8d66ee21b391 100644 --- a/processing/src/main/java/org/apache/druid/java/util/metrics/NoopOshiSysMonitor.java +++ b/processing/src/main/java/org/apache/druid/java/util/metrics/NoopOshiSysMonitor.java @@ -20,14 +20,13 @@ package org.apache.druid.java.util.metrics; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; import org.apache.druid.java.util.emitter.service.ServiceEmitter; public class NoopOshiSysMonitor extends OshiSysMonitor { public NoopOshiSysMonitor() { - super(ImmutableMap.of(), new OshiSysMonitorConfig(ImmutableList.of())); + super(new OshiSysMonitorConfig(ImmutableList.of())); } @Override diff --git a/server/src/main/java/org/apache/druid/server/metrics/NoopTaskHolder.java b/processing/src/main/java/org/apache/druid/java/util/metrics/NoopTaskHolder.java similarity index 79% rename from server/src/main/java/org/apache/druid/server/metrics/NoopTaskHolder.java rename to processing/src/main/java/org/apache/druid/java/util/metrics/NoopTaskHolder.java index d870e4869e7b..5cfe957891da 100644 --- a/server/src/main/java/org/apache/druid/server/metrics/NoopTaskHolder.java +++ b/processing/src/main/java/org/apache/druid/java/util/metrics/NoopTaskHolder.java @@ -17,9 +17,10 @@ * under the License. */ -package org.apache.druid.server.metrics; +package org.apache.druid.java.util.metrics; import javax.annotation.Nullable; +import java.util.Map; /** * A TaskHolder implementation for all servers that are not {@code CliPeon}. @@ -42,4 +43,24 @@ public String getTaskId() { return null; } + + @Nullable + @Override + public String getTaskType() + { + return null; + } + + @Nullable + @Override + public String getGroupId() + { + return null; + } + + @Override + public Map getMetricDimensions() + { + return Map.of(); + } } diff --git a/processing/src/main/java/org/apache/druid/java/util/metrics/OshiSysMonitor.java b/processing/src/main/java/org/apache/druid/java/util/metrics/OshiSysMonitor.java index 130c353c8ad7..f60ae8e963c2 100644 --- a/processing/src/main/java/org/apache/druid/java/util/metrics/OshiSysMonitor.java +++ b/processing/src/main/java/org/apache/druid/java/util/metrics/OshiSysMonitor.java @@ -20,7 +20,6 @@ package org.apache.druid.java.util.metrics; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import org.apache.druid.java.util.emitter.service.ServiceEmitter; @@ -73,7 +72,6 @@ public class OshiSysMonitor extends FeedDefiningMonitor private final SysStats sysStats; private final TcpStats tcpStats; - private final Map dimensions; private final OshiSysMonitorConfig config; private final Map> monitoringFunctions = ImmutableMap.of( "mem", this::monitorMemStats, @@ -86,18 +84,16 @@ public class OshiSysMonitor extends FeedDefiningMonitor "tcp", this::monitorTcpStats ); - public OshiSysMonitor(Map dimensions, OshiSysMonitorConfig config) + public OshiSysMonitor(OshiSysMonitorConfig config) { - this(dimensions, config, new SystemInfo()); + this(config, new SystemInfo()); } // Create an object with mocked systemInfo for testing purposes @VisibleForTesting - public OshiSysMonitor(Map dimensions, OshiSysMonitorConfig config, SystemInfo systemInfo) + public OshiSysMonitor(OshiSysMonitorConfig config, SystemInfo systemInfo) { super(DEFAULT_METRICS_FEED); - Preconditions.checkNotNull(dimensions); - this.dimensions = ImmutableMap.copyOf(dimensions); this.config = config; this.si = systemInfo; @@ -188,7 +184,6 @@ public void emit(ServiceEmitter emitter) mem.getAvailable() ); final ServiceMetricEvent.Builder builder = builder(); - MonitorUtils.addDimensionsToBuilder(builder, dimensions); for (Map.Entry entry : stats.entrySet()) { emitter.emit(builder.setMetric(entry.getKey(), entry.getValue())); } @@ -216,7 +211,6 @@ public void emit(ServiceEmitter emitter) ); final ServiceMetricEvent.Builder builder = builder(); - MonitorUtils.addDimensionsToBuilder(builder, dimensions); for (Map.Entry entry : stats.entrySet()) { emitter.emit(builder.setMetric(entry.getKey(), entry.getValue())); } @@ -243,7 +237,6 @@ public void emit(ServiceEmitter emitter) final ServiceMetricEvent.Builder builder = builder() .setDimension("fsDevName", fs.getVolume()) .setDimension("fsDirName", fs.getMount()); - MonitorUtils.addDimensionsToBuilder(builder, dimensions); for (Map.Entry entry : stats.entrySet()) { emitter.emit(builder.setMetric(entry.getKey(), entry.getValue())); } @@ -277,7 +270,6 @@ public void emit(ServiceEmitter emitter) if (stats != null) { final ServiceMetricEvent.Builder builder = builder() .setDimension("diskName", disk.getName()); - MonitorUtils.addDimensionsToBuilder(builder, dimensions); for (Map.Entry entry : stats.entrySet()) { emitter.emit(builder.setMetric(entry.getKey(), entry.getValue())); } @@ -319,7 +311,6 @@ public void emit(ServiceEmitter emitter) .setDimension("netName", net.getName()) .setDimension("netAddress", addr) .setDimension("netHwaddr", net.getMacaddr()); - MonitorUtils.addDimensionsToBuilder(builder, dimensions); for (Map.Entry entry : stats.entrySet()) { emitter.emit(builder.setMetric(entry.getKey(), entry.getValue())); } @@ -371,7 +362,6 @@ public void emit(ServiceEmitter emitter) final ServiceMetricEvent.Builder builder = builder() .setDimension("cpuName", name) .setDimension("cpuTime", entry.getKey()); - MonitorUtils.addDimensionsToBuilder(builder, dimensions); if (total != 0) { // prevent divide by 0 exception and don't emit such events emitter.emit(builder.setMetric("sys/cpu", entry.getValue() * 100 / total)); // [0,100] @@ -389,8 +379,6 @@ private class SysStats public void emit(ServiceEmitter emitter) { final ServiceMetricEvent.Builder builder = builder(); - MonitorUtils.addDimensionsToBuilder(builder, dimensions); - long uptime = os.getSystemUptime(); final Map stats = ImmutableMap.of( @@ -422,7 +410,6 @@ private class TcpStats public void emit(ServiceEmitter emitter) { final ServiceMetricEvent.Builder builder = builder(); - MonitorUtils.addDimensionsToBuilder(builder, dimensions); InternetProtocolStats ipstats = os.getInternetProtocolStats(); InternetProtocolStats.TcpStats tcpv4 = ipstats.getTCPv4Stats(); diff --git a/processing/src/main/java/org/apache/druid/java/util/metrics/SysMonitor.java b/processing/src/main/java/org/apache/druid/java/util/metrics/SysMonitor.java index 6082fb261e38..031574d5371b 100644 --- a/processing/src/main/java/org/apache/druid/java/util/metrics/SysMonitor.java +++ b/processing/src/main/java/org/apache/druid/java/util/metrics/SysMonitor.java @@ -20,7 +20,6 @@ package org.apache.druid.java.util.metrics; import com.google.common.base.Joiner; -import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import org.apache.druid.java.util.common.logger.Logger; @@ -65,23 +64,14 @@ public class SysMonitor extends FeedDefiningMonitor private final List statsList; - private Map dimensions; - public SysMonitor() { - this(ImmutableMap.of()); - } - - public SysMonitor(Map dimensions) - { - this(dimensions, DEFAULT_METRICS_FEED); + this(DEFAULT_METRICS_FEED); } - public SysMonitor(Map dimensions, String feed) + public SysMonitor(String feed) { super(feed); - Preconditions.checkNotNull(dimensions); - this.dimensions = ImmutableMap.copyOf(dimensions); sigar.enableLogging(true); @@ -142,7 +132,6 @@ public void emit(ServiceEmitter emitter) "sys/mem/actual/free", mem.getActualFree() ); final ServiceMetricEvent.Builder builder = builder(); - MonitorUtils.addDimensionsToBuilder(builder, dimensions); for (Map.Entry entry : stats.entrySet()) { emitter.emit(builder.setMetric(entry.getKey(), entry.getValue())); } @@ -193,7 +182,6 @@ public void emit(ServiceEmitter emitter) ); final ServiceMetricEvent.Builder builder = builder(); - MonitorUtils.addDimensionsToBuilder(builder, dimensions); for (Map.Entry entry : stats.entrySet()) { emitter.emit(builder.setMetric(entry.getKey(), entry.getValue())); } @@ -233,7 +221,6 @@ public void emit(ServiceEmitter emitter) ); final ServiceMetricEvent.Builder builder = builder() .setDimension("fsDirName", dir); // fsDirName because FsStats uses fsDirName - MonitorUtils.addDimensionsToBuilder(builder, dimensions); for (Map.Entry entry : stats.entrySet()) { emitter.emit(builder.setMetric(entry.getKey(), entry.getValue())); } @@ -279,7 +266,6 @@ public void emit(ServiceEmitter emitter) .setDimension("fsTypeName", fs.getTypeName()) .setDimension("fsSysTypeName", fs.getSysTypeName()) .setDimension("fsOptions", fs.getOptions().split(",")); - MonitorUtils.addDimensionsToBuilder(builder, dimensions); for (Map.Entry entry : stats.entrySet()) { emitter.emit(builder.setMetric(entry.getKey(), entry.getValue())); } @@ -340,7 +326,6 @@ public void emit(ServiceEmitter emitter) .setDimension("fsTypeName", fs.getTypeName()) .setDimension("fsSysTypeName", fs.getSysTypeName()) .setDimension("fsOptions", fs.getOptions().split(",")); - MonitorUtils.addDimensionsToBuilder(builder, dimensions); for (Map.Entry entry : stats.entrySet()) { emitter.emit(builder.setMetric(entry.getKey(), entry.getValue())); } @@ -410,7 +395,6 @@ public void emit(ServiceEmitter emitter) .setDimension("netName", netconf.getName()) .setDimension("netAddress", netconf.getAddress()) .setDimension("netHwaddr", netconf.getHwaddr()); - MonitorUtils.addDimensionsToBuilder(builder, dimensions); for (Map.Entry entry : stats.entrySet()) { emitter.emit(builder.setMetric(entry.getKey(), entry.getValue())); } @@ -463,7 +447,6 @@ public void emit(ServiceEmitter emitter) final ServiceMetricEvent.Builder builder = builder() .setDimension("cpuName", name) .setDimension("cpuTime", entry.getKey()); - MonitorUtils.addDimensionsToBuilder(builder, dimensions); emitter.emit(builder.setMetric("sys/cpu", entry.getValue() * 100 / total)); // [0,100] } } @@ -478,7 +461,6 @@ private class SysStats implements Stats public void emit(ServiceEmitter emitter) { final ServiceMetricEvent.Builder builder = builder(); - MonitorUtils.addDimensionsToBuilder(builder, dimensions); Uptime uptime = null; try { @@ -526,7 +508,6 @@ private class TcpStats implements Stats public void emit(ServiceEmitter emitter) { final ServiceMetricEvent.Builder builder = builder(); - MonitorUtils.addDimensionsToBuilder(builder, dimensions); Tcp tcp = null; try { diff --git a/server/src/main/java/org/apache/druid/server/metrics/TaskHolder.java b/processing/src/main/java/org/apache/druid/java/util/metrics/TaskHolder.java similarity index 70% rename from server/src/main/java/org/apache/druid/server/metrics/TaskHolder.java rename to processing/src/main/java/org/apache/druid/java/util/metrics/TaskHolder.java index 5f6bb4e07212..9f0f11077bc7 100644 --- a/server/src/main/java/org/apache/druid/server/metrics/TaskHolder.java +++ b/processing/src/main/java/org/apache/druid/java/util/metrics/TaskHolder.java @@ -17,9 +17,10 @@ * under the License. */ -package org.apache.druid.server.metrics; +package org.apache.druid.java.util.metrics; import javax.annotation.Nullable; +import java.util.Map; /** * Provides identifying information for a task. Implementations return {@code null} @@ -38,4 +39,21 @@ public interface TaskHolder */ @Nullable String getTaskId(); + + /** + * @return the type name of this task, or {@code null} if called from a server that is not {@code CliPeon}. + */ + @Nullable + String getTaskType(); + + /** + * @return the group ID of this task, or {@code null} if called from a server that is not {@code CliPeon}. + */ + @Nullable + String getGroupId(); + + /** + * @return a map of task holder dimensions, or an empty map if called from a server that is not {@code CliPeon}. + */ + Map getMetricDimensions(); } diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupCpuMonitorTest.java b/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupCpuMonitorTest.java index 990addd6d82e..b8f4bd288828 100644 --- a/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupCpuMonitorTest.java +++ b/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupCpuMonitorTest.java @@ -19,7 +19,6 @@ package org.apache.druid.java.util.metrics; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import org.apache.druid.java.util.common.FileUtils; import org.apache.druid.java.util.emitter.core.Event; @@ -76,8 +75,8 @@ public void setUp() throws IOException @Test public void testMonitor() throws IOException, InterruptedException { - final CgroupCpuMonitor monitor = new CgroupCpuMonitor(discoverer, ImmutableMap.of(), "some_feed"); - final StubServiceEmitter emitter = new StubServiceEmitter("service", "host"); + final CgroupCpuMonitor monitor = new CgroupCpuMonitor(discoverer, "some_feed"); + final StubServiceEmitter emitter = StubServiceEmitter.createStarted(); Assert.assertTrue(monitor.doMonitor(emitter)); final List actualEvents = emitter.getEvents(); Assert.assertEquals(2, actualEvents.size()); @@ -131,9 +130,9 @@ public void testCgroupsV2Detection() throws IOException, URISyntaxException CgroupDiscoverer v2Discoverer = ProcSelfCgroupDiscoverer.autoCgroupDiscoverer(procV2Dir.toPath()); // Constructor should detect v2 and log warning - CgroupCpuMonitor monitor = new CgroupCpuMonitor(v2Discoverer, ImmutableMap.of(), "test-feed"); + CgroupCpuMonitor monitor = new CgroupCpuMonitor(v2Discoverer, "test-feed"); - final StubServiceEmitter emitter = new StubServiceEmitter("service", "host"); + final StubServiceEmitter emitter = StubServiceEmitter.createStarted(); // doMonitor should return true Assert.assertTrue(monitor.doMonitor(emitter)); @@ -147,8 +146,8 @@ public void testCgroupsV1MonitoringContinuesNormally() throws IOException, Inter { // This test verifies that the existing v1 monitoring continues to work // after the v2 detection changes - final CgroupCpuMonitor monitor = new CgroupCpuMonitor(discoverer, ImmutableMap.of(), "some_feed"); - final StubServiceEmitter emitter = new StubServiceEmitter("service", "host"); + final CgroupCpuMonitor monitor = new CgroupCpuMonitor(discoverer, "some_feed"); + final StubServiceEmitter emitter = StubServiceEmitter.createStarted(); Assert.assertTrue(monitor.doMonitor(emitter)); final List actualEvents = emitter.getEvents(); diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupCpuSetMonitorTest.java b/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupCpuSetMonitorTest.java index 71ea59ef5fa6..db17161869d1 100644 --- a/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupCpuSetMonitorTest.java +++ b/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupCpuSetMonitorTest.java @@ -19,7 +19,6 @@ package org.apache.druid.java.util.metrics; -import com.google.common.collect.ImmutableMap; import org.apache.druid.java.util.common.FileUtils; import org.apache.druid.java.util.metrics.cgroups.CgroupDiscoverer; import org.apache.druid.java.util.metrics.cgroups.CgroupVersion; @@ -71,8 +70,8 @@ public void setUp() throws IOException @Test public void testMonitor() { - final CgroupCpuSetMonitor monitor = new CgroupCpuSetMonitor(discoverer, ImmutableMap.of(), "some_feed"); - final StubServiceEmitter emitter = new StubServiceEmitter("service", "host"); + final CgroupCpuSetMonitor monitor = new CgroupCpuSetMonitor(discoverer, "some_feed"); + final StubServiceEmitter emitter = StubServiceEmitter.createStarted(); Assert.assertTrue(monitor.doMonitor(emitter)); Assert.assertEquals(4, emitter.getNumEmittedEvents()); @@ -101,9 +100,9 @@ public void testCgroupsV2DetectionInConstructor() throws IOException Assert.assertEquals(CgroupVersion.V2, v2Discoverer.getCgroupVersion()); // Constructor should detect v2 and log warning - CgroupCpuSetMonitor monitor = new CgroupCpuSetMonitor(v2Discoverer, ImmutableMap.of(), "test-feed"); + CgroupCpuSetMonitor monitor = new CgroupCpuSetMonitor(v2Discoverer, "test-feed"); - final StubServiceEmitter emitter = new StubServiceEmitter("service", "host"); + final StubServiceEmitter emitter = StubServiceEmitter.createStarted(); // doMonitor should return true but skip actual monitoring Assert.assertTrue(monitor.doMonitor(emitter)); diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupDiskMonitorTest.java b/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupDiskMonitorTest.java index d75af320f125..ad07a35a6002 100644 --- a/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupDiskMonitorTest.java +++ b/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupDiskMonitorTest.java @@ -19,7 +19,6 @@ package org.apache.druid.java.util.metrics; -import com.google.common.collect.ImmutableMap; import org.apache.druid.java.util.common.FileUtils; import org.apache.druid.java.util.metrics.cgroups.CgroupDiscoverer; import org.apache.druid.java.util.metrics.cgroups.ProcCgroupDiscoverer; @@ -65,8 +64,8 @@ public void setUp() throws IOException @Test public void testMonitor() throws IOException { - final CgroupDiskMonitor monitor = new CgroupDiskMonitor(discoverer, ImmutableMap.of(), "some_feed"); - final StubServiceEmitter emitter = new StubServiceEmitter("service", "host"); + final CgroupDiskMonitor monitor = new CgroupDiskMonitor(discoverer, "some_feed"); + final StubServiceEmitter emitter = StubServiceEmitter.createStarted(); Assert.assertTrue(monitor.doMonitor(emitter)); Assert.assertEquals(0, emitter.getNumEmittedEvents()); diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupMemoryMonitorTest.java b/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupMemoryMonitorTest.java index 6538cb9691a0..8daad7e543fb 100644 --- a/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupMemoryMonitorTest.java +++ b/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupMemoryMonitorTest.java @@ -19,7 +19,6 @@ package org.apache.druid.java.util.metrics; -import com.google.common.collect.ImmutableMap; import org.apache.druid.java.util.common.FileUtils; import org.apache.druid.java.util.metrics.cgroups.CgroupDiscoverer; import org.apache.druid.java.util.metrics.cgroups.ProcCgroupDiscoverer; @@ -66,7 +65,7 @@ public void setUp() throws IOException @Test public void testMonitor() { - final CgroupMemoryMonitor monitor = new CgroupMemoryMonitor(discoverer, ImmutableMap.of(), "some_feed"); + final CgroupMemoryMonitor monitor = new CgroupMemoryMonitor(discoverer, "some_feed"); final StubServiceEmitter emitter = new StubServiceEmitter("service", "host"); Assert.assertTrue(monitor.doMonitor(emitter)); Assert.assertEquals(46, emitter.getNumEmittedEvents()); diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupV2CpuMonitorTest.java b/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupV2CpuMonitorTest.java index 9a32dbbf86aa..48544d41a021 100644 --- a/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupV2CpuMonitorTest.java +++ b/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupV2CpuMonitorTest.java @@ -61,7 +61,7 @@ public void setUp() throws IOException public void testMonitor() throws IOException, InterruptedException { final CgroupV2CpuMonitor monitor = new CgroupV2CpuMonitor(discoverer); - final StubServiceEmitter emitter = new StubServiceEmitter("service", "host"); + final StubServiceEmitter emitter = StubServiceEmitter.createStarted(); Assert.assertTrue(monitor.doMonitor(emitter)); Assert.assertEquals(2, emitter.getNumEmittedEvents()); diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupV2DiskMonitorTest.java b/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupV2DiskMonitorTest.java index f9a977edd92e..119937e47a93 100644 --- a/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupV2DiskMonitorTest.java +++ b/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupV2DiskMonitorTest.java @@ -59,7 +59,7 @@ public void setUp() throws IOException public void testMonitor() throws IOException { final CgroupV2DiskMonitor monitor = new CgroupV2DiskMonitor(discoverer); - final StubServiceEmitter emitter = new StubServiceEmitter("service", "host"); + final StubServiceEmitter emitter = StubServiceEmitter.createStarted(); Assert.assertTrue(monitor.doMonitor(emitter)); Assert.assertEquals(0, emitter.getNumEmittedEvents()); diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupV2MemoryMonitorTest.java b/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupV2MemoryMonitorTest.java index eb8f57a9c466..082f89b0acd8 100644 --- a/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupV2MemoryMonitorTest.java +++ b/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupV2MemoryMonitorTest.java @@ -19,7 +19,6 @@ package org.apache.druid.java.util.metrics; -import com.google.common.collect.ImmutableMap; import org.apache.druid.java.util.emitter.core.Event; import org.apache.druid.java.util.metrics.cgroups.CgroupDiscoverer; import org.apache.druid.java.util.metrics.cgroups.ProcCgroupV2Discoverer; @@ -65,7 +64,6 @@ public void testMonitor() { final CgroupV2MemoryMonitor monitor = new CgroupV2MemoryMonitor( discoverer, - ImmutableMap.of(), FeedDefiningMonitor.DEFAULT_METRICS_FEED ); final StubServiceEmitter emitter = new StubServiceEmitter("service", "host"); diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/CpuAcctDeltaMonitorTest.java b/processing/src/test/java/org/apache/druid/java/util/metrics/CpuAcctDeltaMonitorTest.java index fd7a0d0ef54b..c33b7b789f9a 100644 --- a/processing/src/test/java/org/apache/druid/java/util/metrics/CpuAcctDeltaMonitorTest.java +++ b/processing/src/test/java/org/apache/druid/java/util/metrics/CpuAcctDeltaMonitorTest.java @@ -19,7 +19,6 @@ package org.apache.druid.java.util.metrics; -import com.google.common.collect.ImmutableMap; import org.apache.druid.java.util.common.FileUtils; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.metrics.cgroups.CgroupDiscoverer; @@ -69,7 +68,6 @@ public void testMonitorWontCrash() { final CpuAcctDeltaMonitor monitor = new CpuAcctDeltaMonitor( "some_feed", - ImmutableMap.of(), TestUtils.exceptionThrowingDiscoverer() ); final StubServiceEmitter emitter = new StubServiceEmitter("service", "host"); @@ -91,7 +89,6 @@ public void testSimpleMonitor() throws Exception } final CpuAcctDeltaMonitor monitor = new CpuAcctDeltaMonitor( "some_feed", - ImmutableMap.of(), new CgroupDiscoverer() { @Override diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/MonitorsTest.java b/processing/src/test/java/org/apache/druid/java/util/metrics/MonitorsTest.java index 6f58fa47d454..4e00093cfb27 100644 --- a/processing/src/test/java/org/apache/druid/java/util/metrics/MonitorsTest.java +++ b/processing/src/test/java/org/apache/druid/java/util/metrics/MonitorsTest.java @@ -19,7 +19,6 @@ package org.apache.druid.java.util.metrics; -import com.google.common.collect.ImmutableMap; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.emitter.core.Event; import org.junit.Assert; @@ -45,7 +44,7 @@ public void testSetFeed() { String feed = "testFeed"; StubServiceEmitter emitter = new StubServiceEmitter("dev/monitor-test", "localhost:0000"); - Monitor m = Monitors.createCompoundJvmMonitor(ImmutableMap.of(), feed); + Monitor m = Monitors.createCompoundJvmMonitor(feed); m.start(); m.monitor(emitter); m.stop(); @@ -56,7 +55,7 @@ public void testSetFeed() public void testDefaultFeed() { StubServiceEmitter emitter = new StubServiceEmitter("dev/monitor-test", "localhost:0000"); - Monitor m = Monitors.createCompoundJvmMonitor(ImmutableMap.of()); + Monitor m = Monitors.createCompoundJvmMonitor(); m.start(); m.monitor(emitter); m.stop(); diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/OshiSysMonitorTest.java b/processing/src/test/java/org/apache/druid/java/util/metrics/OshiSysMonitorTest.java index d6c982169fe5..bfc28f99ee9f 100644 --- a/processing/src/test/java/org/apache/druid/java/util/metrics/OshiSysMonitorTest.java +++ b/processing/src/test/java/org/apache/druid/java/util/metrics/OshiSysMonitorTest.java @@ -647,6 +647,6 @@ private OshiSysMonitor createMonitor(SystemInfo si) private OshiSysMonitor createMonitor(SystemInfo si, List categories) { - return new OshiSysMonitor(ImmutableMap.of(), new OshiSysMonitorConfig(categories), si); + return new OshiSysMonitor(new OshiSysMonitorConfig(categories), si); } } diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java b/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java index 8d1146441319..4bcdd58f182c 100644 --- a/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java +++ b/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java @@ -19,12 +19,15 @@ package org.apache.druid.java.util.metrics; +import com.google.common.collect.ImmutableMap; import org.apache.druid.java.util.emitter.core.Event; +import org.apache.druid.java.util.emitter.core.NoopEmitter; import org.apache.druid.java.util.emitter.service.AlertEvent; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import javax.annotation.Nullable; +import java.io.IOException; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collections; @@ -41,18 +44,32 @@ */ public class StubServiceEmitter extends ServiceEmitter implements MetricsVerifier { + public static final String TYPE = "stub"; + private final Deque events = new ConcurrentLinkedDeque<>(); private final Deque alertEvents = new ConcurrentLinkedDeque<>(); private final ConcurrentHashMap> metricEvents = new ConcurrentHashMap<>(); public StubServiceEmitter() { - super("testing", "localhost", null); + this("testing", "localhost"); } public StubServiceEmitter(String service, String host) { - super(service, host, null); + this(service, host, new NoopTaskHolder()); + } + + public StubServiceEmitter(String service, String host, TaskHolder taskHolder) + { + super(service, host, new NoopEmitter(), ImmutableMap.of(), taskHolder); + } + + public static StubServiceEmitter createStarted() + { + final StubServiceEmitter stubServiceEmitter = new StubServiceEmitter(); + stubServiceEmitter.start(); + return stubServiceEmitter; } @Override @@ -103,7 +120,7 @@ public List getAlerts() @Override public List getMetricValues( String metricName, - Map dimensionFilters + @Nullable Map dimensionFilters ) { final List values = new ArrayList<>(); @@ -160,6 +177,7 @@ public Number getLatestMetricEventValue(String metricName, Number defaultValue) @Override public void start() { + super.start(); } @Override @@ -173,5 +191,11 @@ public void flush() @Override public void close() { + try { + super.close(); + } + catch (IOException e) { + throw new RuntimeException(e); + } } } diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitterModule.java b/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitterModule.java new file mode 100644 index 000000000000..cb244b19174b --- /dev/null +++ b/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitterModule.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.java.util.metrics; + +import com.google.inject.Binder; +import com.google.inject.Module; +import com.google.inject.Provides; +import com.google.inject.name.Named; +import org.apache.druid.guice.ManageLifecycle; +import org.apache.druid.java.util.emitter.core.Emitter; + +public class StubServiceEmitterModule implements Module +{ + @Override + public void configure(Binder binder) + { + } + + @Provides + @ManageLifecycle + @Named(StubServiceEmitter.TYPE) + public Emitter getEmitter(TaskHolder taskHolder) + { + return new StubServiceEmitter("test", "host", taskHolder); + } +} diff --git a/processing/src/test/java/org/apache/druid/query/CPUTimeMetricQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/CPUTimeMetricQueryRunnerTest.java index 32d9852d0c1b..b55f19c62be3 100644 --- a/processing/src/test/java/org/apache/druid/query/CPUTimeMetricQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/CPUTimeMetricQueryRunnerTest.java @@ -41,7 +41,7 @@ public class CPUTimeMetricQueryRunnerTest @Test public void testCpuTimeMetric() { - final StubServiceEmitter emitter = new StubServiceEmitter("s", "h"); + final StubServiceEmitter emitter = StubServiceEmitter.createStarted(); final AtomicLong accumulator = new AtomicLong(); final List> expectedResults = Collections.singletonList( diff --git a/processing/src/test/java/org/apache/druid/query/DefaultQueryMetricsTest.java b/processing/src/test/java/org/apache/druid/query/DefaultQueryMetricsTest.java index ee82f206f79c..8cea00215e05 100644 --- a/processing/src/test/java/org/apache/druid/query/DefaultQueryMetricsTest.java +++ b/processing/src/test/java/org/apache/druid/query/DefaultQueryMetricsTest.java @@ -48,7 +48,7 @@ public class DefaultQueryMetricsTest extends InitializedNullHandlingTest @Test public void testDefaultQueryMetricsQuery() { - final StubServiceEmitter serviceEmitter = new StubServiceEmitter("", ""); + final StubServiceEmitter serviceEmitter = StubServiceEmitter.createStarted(); DefaultQueryMetrics> queryMetrics = new DefaultQueryMetrics<>(); TopNQuery query = new TopNQueryBuilder() .dataSource("xx") @@ -75,8 +75,8 @@ public void testDefaultQueryMetricsQuery() Assert.assertEquals(13, actualEvent.size()); Assert.assertTrue(actualEvent.containsKey("feed")); Assert.assertTrue(actualEvent.containsKey("timestamp")); - Assert.assertEquals("", actualEvent.get("host")); - Assert.assertEquals("", actualEvent.get("service")); + Assert.assertEquals("localhost", actualEvent.get("host")); + Assert.assertEquals("testing", actualEvent.get("service")); Assert.assertEquals("xx", actualEvent.get(DruidMetrics.DATASOURCE)); Assert.assertEquals(query.getType(), actualEvent.get(DruidMetrics.TYPE)); List expectedIntervals = QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC.getIntervals(); @@ -144,7 +144,7 @@ public static void testQueryMetricsDefaultMetricNamesAndUnits( @Test public void testVectorizedDimensionInMetrics() { - final StubServiceEmitter serviceEmitter = new StubServiceEmitter("", ""); + final StubServiceEmitter serviceEmitter = StubServiceEmitter.createStarted(); DefaultQueryMetrics> queryMetrics = new DefaultQueryMetrics<>(); queryMetrics.vectorized(true); queryMetrics.reportSegmentTime(0).emit(serviceEmitter); @@ -152,8 +152,8 @@ public void testVectorizedDimensionInMetrics() Assert.assertEquals(7, actualEvent.size()); Assert.assertTrue(actualEvent.containsKey("feed")); Assert.assertTrue(actualEvent.containsKey("timestamp")); - Assert.assertEquals("", actualEvent.get("host")); - Assert.assertEquals("", actualEvent.get("service")); + Assert.assertEquals("localhost", actualEvent.get("host")); + Assert.assertEquals("testing", actualEvent.get("service")); Assert.assertEquals("query/segment/time", actualEvent.get("metric")); Assert.assertEquals(0L, actualEvent.get("value")); Assert.assertEquals(true, actualEvent.get("vectorized")); diff --git a/processing/src/test/java/org/apache/druid/query/groupby/DefaultGroupByQueryMetricsTest.java b/processing/src/test/java/org/apache/druid/query/groupby/DefaultGroupByQueryMetricsTest.java index 80c0ad405883..66f89a92cec6 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/DefaultGroupByQueryMetricsTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/DefaultGroupByQueryMetricsTest.java @@ -51,7 +51,7 @@ public class DefaultGroupByQueryMetricsTest extends InitializedNullHandlingTest @Test public void testDefaultGroupByQueryMetricsQuery() { - final StubServiceEmitter serviceEmitter = new StubServiceEmitter("", ""); + final StubServiceEmitter serviceEmitter = StubServiceEmitter.createStarted(); DefaultGroupByQueryMetrics queryMetrics = new DefaultGroupByQueryMetrics(); GroupByQuery.Builder builder = GroupByQuery .builder() @@ -78,8 +78,8 @@ public void testDefaultGroupByQueryMetricsQuery() Assert.assertEquals(16, actualEvent.size()); Assert.assertTrue(actualEvent.containsKey("feed")); Assert.assertTrue(actualEvent.containsKey("timestamp")); - Assert.assertEquals("", actualEvent.get("host")); - Assert.assertEquals("", actualEvent.get("service")); + Assert.assertEquals("localhost", actualEvent.get("host")); + Assert.assertEquals("testing", actualEvent.get("service")); Assert.assertEquals(QueryRunnerTestHelper.DATA_SOURCE, actualEvent.get(DruidMetrics.DATASOURCE)); Assert.assertEquals(query.getType(), actualEvent.get(DruidMetrics.TYPE)); Interval expectedInterval = Intervals.of("2011-04-02/2011-04-04"); diff --git a/processing/src/test/java/org/apache/druid/query/search/DefaultSearchQueryMetricsTest.java b/processing/src/test/java/org/apache/druid/query/search/DefaultSearchQueryMetricsTest.java index f29f93ff8624..8f613b71a9ed 100644 --- a/processing/src/test/java/org/apache/druid/query/search/DefaultSearchQueryMetricsTest.java +++ b/processing/src/test/java/org/apache/druid/query/search/DefaultSearchQueryMetricsTest.java @@ -47,7 +47,7 @@ public class DefaultSearchQueryMetricsTest extends InitializedNullHandlingTest @Test public void testDefaultSearchQueryMetricsQuery() { - final StubServiceEmitter serviceEmitter = new StubServiceEmitter("", ""); + final StubServiceEmitter serviceEmitter = StubServiceEmitter.createStarted(); SearchQuery query = Druids .newSearchQueryBuilder() .dataSource(QueryRunnerTestHelper.DATA_SOURCE) @@ -70,8 +70,8 @@ public void testDefaultSearchQueryMetricsQuery() Assert.assertEquals(13, actualEvent.size()); Assert.assertTrue(actualEvent.containsKey("feed")); Assert.assertTrue(actualEvent.containsKey("timestamp")); - Assert.assertEquals("", actualEvent.get("host")); - Assert.assertEquals("", actualEvent.get("service")); + Assert.assertEquals("localhost", actualEvent.get("host")); + Assert.assertEquals("testing", actualEvent.get("service")); Assert.assertEquals(QueryRunnerTestHelper.DATA_SOURCE, actualEvent.get(DruidMetrics.DATASOURCE)); Assert.assertEquals(query.getType(), actualEvent.get(DruidMetrics.TYPE)); List expectedIntervals = QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC.getIntervals(); diff --git a/processing/src/test/java/org/apache/druid/query/timeseries/DefaultTimeseriesQueryMetricsTest.java b/processing/src/test/java/org/apache/druid/query/timeseries/DefaultTimeseriesQueryMetricsTest.java index 61b9dc5edba8..0b426b08be5c 100644 --- a/processing/src/test/java/org/apache/druid/query/timeseries/DefaultTimeseriesQueryMetricsTest.java +++ b/processing/src/test/java/org/apache/druid/query/timeseries/DefaultTimeseriesQueryMetricsTest.java @@ -44,7 +44,7 @@ public class DefaultTimeseriesQueryMetricsTest extends InitializedNullHandlingTe @Test public void testDefaultTimeseriesQueryMetricsQuery() { - final StubServiceEmitter serviceEmitter = new StubServiceEmitter("", ""); + final StubServiceEmitter serviceEmitter = StubServiceEmitter.createStarted(); DefaultTimeseriesQueryMetrics queryMetrics = new DefaultTimeseriesQueryMetrics(); TimeseriesQuery query = Druids .newTimeseriesQueryBuilder() @@ -63,8 +63,8 @@ public void testDefaultTimeseriesQueryMetricsQuery() Assert.assertEquals(16, actualEvent.size()); Assert.assertTrue(actualEvent.containsKey("feed")); Assert.assertTrue(actualEvent.containsKey("timestamp")); - Assert.assertEquals("", actualEvent.get("host")); - Assert.assertEquals("", actualEvent.get("service")); + Assert.assertEquals("localhost", actualEvent.get("host")); + Assert.assertEquals("testing", actualEvent.get("service")); Assert.assertEquals(QueryRunnerTestHelper.DATA_SOURCE, actualEvent.get(DruidMetrics.DATASOURCE)); Assert.assertEquals(query.getType(), actualEvent.get(DruidMetrics.TYPE)); List expectedIntervals = QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC.getIntervals(); diff --git a/processing/src/test/java/org/apache/druid/query/topn/DefaultTopNQueryMetricsTest.java b/processing/src/test/java/org/apache/druid/query/topn/DefaultTopNQueryMetricsTest.java index e232e6a64052..074ea10330a0 100644 --- a/processing/src/test/java/org/apache/druid/query/topn/DefaultTopNQueryMetricsTest.java +++ b/processing/src/test/java/org/apache/druid/query/topn/DefaultTopNQueryMetricsTest.java @@ -49,7 +49,7 @@ public class DefaultTopNQueryMetricsTest extends InitializedNullHandlingTest @Test public void testDefaultTopNQueryMetricsQuery() { - final StubServiceEmitter serviceEmitter = new StubServiceEmitter("", ""); + final StubServiceEmitter serviceEmitter = StubServiceEmitter.createStarted(); DefaultTopNQueryMetrics queryMetrics = new DefaultTopNQueryMetrics(); TopNQuery query = new TopNQueryBuilder() .dataSource("xx") @@ -73,8 +73,8 @@ public void testDefaultTopNQueryMetricsQuery() Assert.assertEquals(17, actualEvent.size()); Assert.assertTrue(actualEvent.containsKey("feed")); Assert.assertTrue(actualEvent.containsKey("timestamp")); - Assert.assertEquals("", actualEvent.get("host")); - Assert.assertEquals("", actualEvent.get("service")); + Assert.assertEquals("localhost", actualEvent.get("host")); + Assert.assertEquals("testing", actualEvent.get("service")); Assert.assertEquals("xx", actualEvent.get(DruidMetrics.DATASOURCE)); Assert.assertEquals(query.getType(), actualEvent.get(DruidMetrics.TYPE)); List expectedIntervals = QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC.getIntervals(); diff --git a/server/src/main/java/org/apache/druid/guice/DefaultServerHolderModule.java b/server/src/main/java/org/apache/druid/guice/DefaultServerHolderModule.java index bd8d9c3c1f23..927223b6ee77 100644 --- a/server/src/main/java/org/apache/druid/guice/DefaultServerHolderModule.java +++ b/server/src/main/java/org/apache/druid/guice/DefaultServerHolderModule.java @@ -24,10 +24,10 @@ import org.apache.druid.discovery.NodeRole; import org.apache.druid.guice.annotations.ExcludeScope; import org.apache.druid.initialization.DruidModule; +import org.apache.druid.java.util.metrics.NoopTaskHolder; +import org.apache.druid.java.util.metrics.TaskHolder; import org.apache.druid.server.metrics.DefaultLoadSpecHolder; import org.apache.druid.server.metrics.LoadSpecHolder; -import org.apache.druid.server.metrics.NoopTaskHolder; -import org.apache.druid.server.metrics.TaskHolder; /** * Binds the following holder configs for all servers except {@code CliPeon}: diff --git a/server/src/main/java/org/apache/druid/query/lookup/LookupListeningAnnouncerConfig.java b/server/src/main/java/org/apache/druid/query/lookup/LookupListeningAnnouncerConfig.java index ffc8447a9920..da9b46e66076 100644 --- a/server/src/main/java/org/apache/druid/query/lookup/LookupListeningAnnouncerConfig.java +++ b/server/src/main/java/org/apache/druid/query/lookup/LookupListeningAnnouncerConfig.java @@ -24,9 +24,9 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.metrics.TaskHolder; import org.apache.druid.server.lookup.cache.LookupLoadingSpec; import org.apache.druid.server.metrics.LoadSpecHolder; -import org.apache.druid.server.metrics.TaskHolder; class LookupListeningAnnouncerConfig { diff --git a/server/src/main/java/org/apache/druid/segment/realtime/ChatHandlerResource.java b/server/src/main/java/org/apache/druid/segment/realtime/ChatHandlerResource.java index 5745e42ac68d..32775fcdf063 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/ChatHandlerResource.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/ChatHandlerResource.java @@ -23,9 +23,9 @@ import com.google.common.collect.Iterables; import com.google.inject.Inject; import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.metrics.TaskHolder; import org.apache.druid.server.initialization.jetty.BadRequestException; import org.apache.druid.server.initialization.jetty.ServiceUnavailableException; -import org.apache.druid.server.metrics.TaskHolder; import javax.annotation.Nullable; import javax.ws.rs.Path; diff --git a/server/src/main/java/org/apache/druid/server/emitter/EmitterModule.java b/server/src/main/java/org/apache/druid/server/emitter/EmitterModule.java index 686d993873b6..41312c592086 100644 --- a/server/src/main/java/org/apache/druid/server/emitter/EmitterModule.java +++ b/server/src/main/java/org/apache/druid/server/emitter/EmitterModule.java @@ -42,6 +42,7 @@ import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.emitter.core.Emitter; import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.java.util.metrics.TaskHolder; import org.apache.druid.server.DruidNode; import java.lang.annotation.Annotation; @@ -72,7 +73,6 @@ public void setProps( public void configure(Binder binder) { String emitterType = props.getProperty(EMITTER_PROPERTY, ""); - binder.install(new NoopEmitterModule()); binder.install(new LogEmitterModule()); binder.install(new HttpEmitterModule()); @@ -99,16 +99,18 @@ public void configure(Binder binder) public ServiceEmitter getServiceEmitter( @Self Supplier configSupplier, Emitter emitter, - @ExtraServiceDimensions Map extraServiceDimensions + @ExtraServiceDimensions Map extraServiceDimensions, + TaskHolder taskHolder ) { final DruidNode config = configSupplier.get(); - log.info("Using emitter [%s] for metrics and alerts, with dimensions [%s].", emitter, extraServiceDimensions); + log.info("Using emitter [%s] for metrics and alerts, with dimensions [%s] and taskHolder[%s].", emitter, extraServiceDimensions, taskHolder); final ServiceEmitter retVal = new ServiceEmitter( config.getServiceName(), config.getHostAndPortToUse(), emitter, - ImmutableMap.copyOf(extraServiceDimensions) + ImmutableMap.copyOf(extraServiceDimensions), + taskHolder ); EmittingLogger.registerEmitter(retVal); return retVal; diff --git a/server/src/main/java/org/apache/druid/server/initialization/jetty/ChatHandlerServerModule.java b/server/src/main/java/org/apache/druid/server/initialization/jetty/ChatHandlerServerModule.java index a1c2debf629f..fa70e7c36cd9 100644 --- a/server/src/main/java/org/apache/druid/server/initialization/jetty/ChatHandlerServerModule.java +++ b/server/src/main/java/org/apache/druid/server/initialization/jetty/ChatHandlerServerModule.java @@ -31,11 +31,11 @@ import org.apache.druid.guice.annotations.RemoteChatHandler; import org.apache.druid.guice.annotations.Self; import org.apache.druid.java.util.common.lifecycle.Lifecycle; +import org.apache.druid.java.util.metrics.TaskHolder; import org.apache.druid.segment.realtime.ChatHandlerResource; import org.apache.druid.server.DruidNode; import org.apache.druid.server.initialization.ServerConfig; import org.apache.druid.server.initialization.TLSServerConfig; -import org.apache.druid.server.metrics.TaskHolder; import org.apache.druid.server.security.TLSCertificateChecker; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.util.ssl.SslContextFactory; diff --git a/server/src/main/java/org/apache/druid/server/initialization/jetty/CliIndexerServerModule.java b/server/src/main/java/org/apache/druid/server/initialization/jetty/CliIndexerServerModule.java index 4532d6c87af0..c1c10547d880 100644 --- a/server/src/main/java/org/apache/druid/server/initialization/jetty/CliIndexerServerModule.java +++ b/server/src/main/java/org/apache/druid/server/initialization/jetty/CliIndexerServerModule.java @@ -31,12 +31,12 @@ import org.apache.druid.guice.annotations.RemoteChatHandler; import org.apache.druid.guice.annotations.Self; import org.apache.druid.java.util.common.lifecycle.Lifecycle; +import org.apache.druid.java.util.metrics.TaskHolder; import org.apache.druid.query.lookup.LookupModule; import org.apache.druid.segment.realtime.ChatHandlerResource; import org.apache.druid.server.DruidNode; import org.apache.druid.server.initialization.ServerConfig; import org.apache.druid.server.initialization.TLSServerConfig; -import org.apache.druid.server.metrics.TaskHolder; import org.apache.druid.server.security.TLSCertificateChecker; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.util.ssl.SslContextFactory; diff --git a/server/src/main/java/org/apache/druid/server/initialization/jetty/JettyServerModule.java b/server/src/main/java/org/apache/druid/server/initialization/jetty/JettyServerModule.java index 5d54bf8b4d40..41cd7bf7fbdb 100644 --- a/server/src/main/java/org/apache/druid/server/initialization/jetty/JettyServerModule.java +++ b/server/src/main/java/org/apache/druid/server/initialization/jetty/JettyServerModule.java @@ -48,14 +48,11 @@ import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.java.util.metrics.AbstractMonitor; -import org.apache.druid.java.util.metrics.MonitorUtils; import org.apache.druid.server.DruidNode; import org.apache.druid.server.StatusResource; import org.apache.druid.server.initialization.ServerConfig; import org.apache.druid.server.initialization.TLSServerConfig; import org.apache.druid.server.metrics.MetricsModule; -import org.apache.druid.server.metrics.MonitorsConfig; -import org.apache.druid.server.metrics.TaskHolder; import org.apache.druid.server.security.CustomCheckX509TrustManager; import org.apache.druid.server.security.TLSCertificateChecker; import org.eclipse.jetty.server.ConnectionFactory; @@ -92,7 +89,6 @@ import java.util.Arrays; import java.util.Collection; import java.util.List; -import java.util.Map; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; @@ -517,27 +513,17 @@ private static int getTCPAcceptQueueSize() @Provides @LazySingleton - public JettyMonitor getJettyMonitor(TaskHolder taskHolder) + public JettyMonitor getJettyMonitor() { - return new JettyMonitor( - MonitorsConfig.mapOfTaskHolderDimensions(taskHolder) - ); + return new JettyMonitor(); } public static class JettyMonitor extends AbstractMonitor { - private final Map dimensions; - - public JettyMonitor(Map dimensions) - { - this.dimensions = dimensions; - } - @Override public boolean doMonitor(ServiceEmitter emitter) { final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder(); - MonitorUtils.addDimensionsToBuilder(builder, dimensions); emitter.emit(builder.setMetric("jetty/numOpenConnections", ACTIVE_CONNECTIONS.get())); if (jettyServerThreadPool != null) { emitter.emit(builder.setMetric("jetty/threadPool/total", jettyServerThreadPool.getThreads())); diff --git a/server/src/main/java/org/apache/druid/server/metrics/GroupByStatsMonitor.java b/server/src/main/java/org/apache/druid/server/metrics/GroupByStatsMonitor.java index 453fb4daaed8..10985b4b4d3a 100644 --- a/server/src/main/java/org/apache/druid/server/metrics/GroupByStatsMonitor.java +++ b/server/src/main/java/org/apache/druid/server/metrics/GroupByStatsMonitor.java @@ -27,11 +27,9 @@ import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.java.util.metrics.AbstractMonitor; -import org.apache.druid.java.util.metrics.MonitorUtils; import org.apache.druid.query.groupby.GroupByStatsProvider; import java.nio.ByteBuffer; -import java.util.Map; @LoadScope(roles = { NodeRole.BROKER_JSON_NAME, @@ -43,25 +41,21 @@ public class GroupByStatsMonitor extends AbstractMonitor { private final GroupByStatsProvider groupByStatsProvider; private final BlockingPool mergeBufferPool; - private final Map dimensions; @Inject public GroupByStatsMonitor( GroupByStatsProvider groupByStatsProvider, - @Merging BlockingPool mergeBufferPool, - TaskHolder taskHolder + @Merging BlockingPool mergeBufferPool ) { this.groupByStatsProvider = groupByStatsProvider; this.mergeBufferPool = mergeBufferPool; - this.dimensions = MonitorsConfig.mapOfTaskHolderDimensions(taskHolder); } @Override public boolean doMonitor(ServiceEmitter emitter) { final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder(); - MonitorUtils.addDimensionsToBuilder(builder, dimensions); emitter.emit(builder.setMetric("mergeBuffer/pendingRequests", mergeBufferPool.getPendingRequests())); diff --git a/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java b/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java index 0b59409de0d2..c1425b81652c 100644 --- a/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java +++ b/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java @@ -57,7 +57,6 @@ import java.time.Duration; import java.util.ArrayList; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.stream.Collectors; @@ -143,48 +142,39 @@ public MonitorScheduler getMonitorScheduler( @Provides @ManageLifecycle - public JvmMonitor getJvmMonitor( - TaskHolder taskHolder - ) + public JvmMonitor getJvmMonitor() { - Map dimensions = MonitorsConfig.mapOfTaskHolderDimensions(taskHolder); - return new JvmMonitor(dimensions); + return new JvmMonitor(); } @Provides @ManageLifecycle - public JvmCpuMonitor getJvmCpuMonitor( - TaskHolder taskHolder - ) + public JvmCpuMonitor getJvmCpuMonitor() { - Map dimensions = MonitorsConfig.mapOfTaskHolderDimensions(taskHolder); - return new JvmCpuMonitor(dimensions); + return new JvmCpuMonitor(); } @Provides @ManageLifecycle - public JvmThreadsMonitor getJvmThreadsMonitor(TaskHolder taskHolder) + public JvmThreadsMonitor getJvmThreadsMonitor() { - Map dimensions = MonitorsConfig.mapOfTaskHolderDimensions(taskHolder); - return new JvmThreadsMonitor(dimensions); + return new JvmThreadsMonitor(); } @Provides @ManageLifecycle - public SysMonitor getSysMonitor(TaskHolder taskHolder, @Self Set nodeRoles) + public SysMonitor getSysMonitor(@Self Set nodeRoles) { if (nodeRoles.contains(NodeRole.PEON)) { return new NoopSysMonitor(); } else { - Map dimensions = MonitorsConfig.mapOfTaskHolderDimensions(taskHolder); - return new SysMonitor(dimensions); + return new SysMonitor(); } } @Provides @ManageLifecycle public OshiSysMonitor getOshiSysMonitor( - TaskHolder taskHolder, @Self Set nodeRoles, OshiSysMonitorConfig oshiSysConfig ) @@ -192,8 +182,7 @@ public OshiSysMonitor getOshiSysMonitor( if (nodeRoles.contains(NodeRole.PEON)) { return new NoopOshiSysMonitor(); } else { - Map dimensions = MonitorsConfig.mapOfTaskHolderDimensions(taskHolder); - return new OshiSysMonitor(dimensions, oshiSysConfig); + return new OshiSysMonitor(oshiSysConfig); } } diff --git a/server/src/main/java/org/apache/druid/server/metrics/MonitorsConfig.java b/server/src/main/java/org/apache/druid/server/metrics/MonitorsConfig.java index 261eb052d058..41561b2bb777 100644 --- a/server/src/main/java/org/apache/druid/server/metrics/MonitorsConfig.java +++ b/server/src/main/java/org/apache/druid/server/metrics/MonitorsConfig.java @@ -20,16 +20,13 @@ package org.apache.druid.server.metrics; import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.collect.ImmutableMap; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.metrics.Monitor; -import org.apache.druid.query.DruidMetrics; import javax.validation.constraints.NotNull; import java.util.ArrayList; import java.util.List; -import java.util.Map; /** */ @@ -77,28 +74,6 @@ public String toString() '}'; } - /** - * @return a map of task holder dimensions from the provided {@link TaskHolder} if {@link TaskHolder#getDataSource()} - * and {@link TaskHolder#getTaskId()} are non-null. - *

The task ID ({@link TaskHolder#getTaskId()}) is added to both {@link DruidMetrics#TASK_ID} - * and {@link DruidMetrics#ID} dimensions to the map for backward compatibility. {@link DruidMetrics#ID} is - * deprecated because it's ambiguous and will be removed in a future release.

- */ - public static Map mapOfTaskHolderDimensions(final TaskHolder taskHolder) - { - final String dataSource = taskHolder.getDataSource(); - final String taskId = taskHolder.getTaskId(); - final ImmutableMap.Builder builder = ImmutableMap.builder(); - if (dataSource != null) { - builder.put(DruidMetrics.DATASOURCE, new String[]{dataSource}); - } - if (taskId != null) { - builder.put(DruidMetrics.TASK_ID, new String[]{taskId}); - builder.put(DruidMetrics.ID, new String[]{taskId}); - } - return builder.build(); - } - private static List> getMonitorsFromNames(List monitorNames) { List> monitors = new ArrayList<>(); diff --git a/server/src/test/java/org/apache/druid/guice/DefaultServerHolderModuleTest.java b/server/src/test/java/org/apache/druid/guice/DefaultServerHolderModuleTest.java index 4d377001ef84..d8c2751da5f1 100644 --- a/server/src/test/java/org/apache/druid/guice/DefaultServerHolderModuleTest.java +++ b/server/src/test/java/org/apache/druid/guice/DefaultServerHolderModuleTest.java @@ -23,10 +23,10 @@ import com.google.inject.Injector; import org.apache.druid.discovery.NodeRole; import org.apache.druid.initialization.CoreInjectorBuilder; +import org.apache.druid.java.util.metrics.NoopTaskHolder; +import org.apache.druid.java.util.metrics.TaskHolder; import org.apache.druid.server.metrics.DefaultLoadSpecHolder; import org.apache.druid.server.metrics.LoadSpecHolder; -import org.apache.druid.server.metrics.NoopTaskHolder; -import org.apache.druid.server.metrics.TaskHolder; import org.junit.Assert; import org.junit.Test; diff --git a/server/src/test/java/org/apache/druid/query/lookup/LookupListeningAnnouncerConfigTest.java b/server/src/test/java/org/apache/druid/query/lookup/LookupListeningAnnouncerConfigTest.java index 0038b7d36c50..61586e47d3dc 100644 --- a/server/src/test/java/org/apache/druid/query/lookup/LookupListeningAnnouncerConfigTest.java +++ b/server/src/test/java/org/apache/druid/query/lookup/LookupListeningAnnouncerConfigTest.java @@ -29,12 +29,13 @@ import org.apache.druid.guice.JsonConfigurator; import org.apache.druid.guice.annotations.Self; import org.apache.druid.initialization.Initialization; +import org.apache.druid.java.util.metrics.TaskHolder; import org.apache.druid.server.DruidNode; import org.apache.druid.server.coordination.BroadcastDatasourceLoadingSpec; import org.apache.druid.server.lookup.cache.LookupLoadingSpec; import org.apache.druid.server.metrics.DefaultLoadSpecHolder; import org.apache.druid.server.metrics.LoadSpecHolder; -import org.apache.druid.server.metrics.TaskHolder; +import org.apache.druid.server.metrics.TestLoadSpecHolder; import org.apache.druid.server.metrics.TestTaskHolder; import org.junit.Assert; import org.junit.Before; @@ -59,21 +60,10 @@ public void configure(Binder binder) Key.get(DruidNode.class, Self.class), new DruidNode("test-inject", null, false, null, null, true, false) ); - binder.bind(TaskHolder.class).toInstance(new TestTaskHolder("some_datasource", "some_taskid")); - binder.bind(LoadSpecHolder.class).toInstance(new LoadSpecHolder() - { - @Override - public LookupLoadingSpec getLookupLoadingSpec() - { - return LookupLoadingSpec.loadOnly(Set.of("lookupName1", "lookupName2")); - } - - @Override - public BroadcastDatasourceLoadingSpec getBroadcastDatasourceLoadingSpec() - { - return BroadcastDatasourceLoadingSpec.ALL; - } - }); + binder.bind(TaskHolder.class).toInstance(new TestTaskHolder("some_datasource", "some_taskid", "test_tasktype", "test_groupid")); + binder.bind(LoadSpecHolder.class).toInstance( + new TestLoadSpecHolder(LookupLoadingSpec.loadOnly(Set.of("lookupName1", "lookupName2")), BroadcastDatasourceLoadingSpec.ALL) + ); } }, new LookupModule() diff --git a/server/src/test/java/org/apache/druid/segment/realtime/ChatHandlerResourceTest.java b/server/src/test/java/org/apache/druid/segment/realtime/ChatHandlerResourceTest.java index 7e5320d74b60..4636e6523d86 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/ChatHandlerResourceTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/ChatHandlerResourceTest.java @@ -21,8 +21,8 @@ package org.apache.druid.segment.realtime; import com.google.common.base.Optional; +import org.apache.druid.java.util.metrics.TaskHolder; import org.apache.druid.server.initialization.jetty.ServiceUnavailableException; -import org.apache.druid.server.metrics.TaskHolder; import org.easymock.EasyMock; import org.easymock.EasyMockRunner; import org.easymock.EasyMockSupport; diff --git a/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java b/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java index 021227a3ca36..b374bbb820f6 100644 --- a/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java +++ b/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java @@ -221,7 +221,7 @@ public class ClientQuerySegmentWalkerTest private Closer closer; private QueryRunnerFactoryConglomerate conglomerate; - private final StubServiceEmitter emitter = new StubServiceEmitter(); + private final StubServiceEmitter emitter = StubServiceEmitter.createStarted(); // Queries that are issued; checked by "testQuery" against its "expectedQueries" parameter. private final List issuedQueries = new ArrayList<>(); diff --git a/server/src/test/java/org/apache/druid/server/QueryResourceTest.java b/server/src/test/java/org/apache/druid/server/QueryResourceTest.java index 72c895e46a03..b1ff73cace0a 100644 --- a/server/src/test/java/org/apache/druid/server/QueryResourceTest.java +++ b/server/src/test/java/org/apache/druid/server/QueryResourceTest.java @@ -258,7 +258,7 @@ public void setup() queryScheduler = QueryStackTests.DEFAULT_NOOP_SCHEDULER; testRequestLogger = new TestRequestLogger(); - emitter = new StubServiceEmitter(); + emitter = StubServiceEmitter.createStarted(); queryResource = createQueryResource(ResponseContextConfig.newConfig(true)); } diff --git a/server/src/test/java/org/apache/druid/server/coordination/SegmentCacheBootstrapperTest.java b/server/src/test/java/org/apache/druid/server/coordination/SegmentCacheBootstrapperTest.java index cd06239cbcc5..d64f39fb8ee4 100644 --- a/server/src/test/java/org/apache/druid/server/coordination/SegmentCacheBootstrapperTest.java +++ b/server/src/test/java/org/apache/druid/server/coordination/SegmentCacheBootstrapperTest.java @@ -30,7 +30,7 @@ import org.apache.druid.server.SegmentManager; import org.apache.druid.server.lookup.cache.LookupLoadingSpec; import org.apache.druid.server.metrics.DefaultLoadSpecHolder; -import org.apache.druid.server.metrics.LoadSpecHolder; +import org.apache.druid.server.metrics.TestLoadSpecHolder; import org.apache.druid.test.utils.TestSegmentCacheManager; import org.apache.druid.timeline.DataSegment; import org.junit.Assert; @@ -300,20 +300,7 @@ public void testLoadNoBootstrapSegments() throws Exception new ServerTypeConfig(ServerType.HISTORICAL), coordinatorClient, serviceEmitter, - new LoadSpecHolder() - { - @Override - public LookupLoadingSpec getLookupLoadingSpec() - { - return LookupLoadingSpec.ALL; - } - - @Override - public BroadcastDatasourceLoadingSpec getBroadcastDatasourceLoadingSpec() - { - return BroadcastDatasourceLoadingSpec.NONE; - } - } + new TestLoadSpecHolder(LookupLoadingSpec.ALL, BroadcastDatasourceLoadingSpec.NONE) ); Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty()); @@ -362,20 +349,7 @@ public void testLoadOnlyRequiredBootstrapSegments() throws Exception new ServerTypeConfig(ServerType.HISTORICAL), coordinatorClient, serviceEmitter, - new LoadSpecHolder() - { - @Override - public LookupLoadingSpec getLookupLoadingSpec() - { - return LookupLoadingSpec.NONE; - } - - @Override - public BroadcastDatasourceLoadingSpec getBroadcastDatasourceLoadingSpec() - { - return BroadcastDatasourceLoadingSpec.loadOnly(Set.of("test1")); - } - } + new TestLoadSpecHolder(LookupLoadingSpec.NONE, BroadcastDatasourceLoadingSpec.loadOnly(Set.of("test1"))) ); Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty()); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/RunRulesTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/RunRulesTest.java index f7e9ececc1ac..a3042db2df18 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/RunRulesTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/RunRulesTest.java @@ -96,7 +96,7 @@ public class RunRulesTest public void setUp() { mockPeon = EasyMock.createMock(LoadQueuePeon.class); - emitter = new StubServiceEmitter("coordinator", "host"); + emitter = StubServiceEmitter.createStarted(); EmittingLogger.registerEmitter(emitter); databaseRuleManager = EasyMock.createMock(MetadataRuleManager.class); ruleRunner = new RunRules((ds, set) -> set.size(), databaseRuleManager::getRulesWithDefault); diff --git a/server/src/test/java/org/apache/druid/server/emitter/EmitterModuleTest.java b/server/src/test/java/org/apache/druid/server/emitter/EmitterModuleTest.java index 3467d7bee640..536ff5c34ce5 100644 --- a/server/src/test/java/org/apache/druid/server/emitter/EmitterModuleTest.java +++ b/server/src/test/java/org/apache/druid/server/emitter/EmitterModuleTest.java @@ -20,19 +20,39 @@ package org.apache.druid.server.emitter; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; import com.google.inject.Binder; import com.google.inject.Guice; import com.google.inject.Injector; +import com.google.inject.Key; import com.google.inject.Module; +import com.google.inject.Scopes; +import org.apache.druid.discovery.NodeRole; +import org.apache.druid.guice.DefaultServerHolderModule; import org.apache.druid.guice.DruidGuiceExtensions; +import org.apache.druid.guice.JsonConfigProvider; import org.apache.druid.guice.JsonConfigurator; import org.apache.druid.guice.LazySingleton; import org.apache.druid.guice.LifecycleModule; import org.apache.druid.guice.ServerModule; +import org.apache.druid.guice.annotations.Self; +import org.apache.druid.initialization.ServerInjectorBuilder; import org.apache.druid.jackson.JacksonModule; import org.apache.druid.java.util.emitter.core.Emitter; +import org.apache.druid.java.util.emitter.core.Event; +import org.apache.druid.java.util.emitter.core.EventMap; import org.apache.druid.java.util.emitter.core.NoopEmitter; import org.apache.druid.java.util.emitter.core.ParametrizedUriEmitter; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; +import org.apache.druid.java.util.metrics.StubServiceEmitter; +import org.apache.druid.java.util.metrics.StubServiceEmitterModule; +import org.apache.druid.java.util.metrics.TaskHolder; +import org.apache.druid.query.DruidMetrics; +import org.apache.druid.server.DruidNode; +import org.apache.druid.server.metrics.DefaultLoadSpecHolder; +import org.apache.druid.server.metrics.LoadSpecHolder; +import org.apache.druid.server.metrics.TestTaskHolder; import org.hamcrest.CoreMatchers; import org.junit.Assert; import org.junit.Rule; @@ -41,6 +61,7 @@ import javax.validation.Validation; import javax.validation.Validator; +import java.util.List; import java.util.Properties; public class EmitterModuleTest @@ -87,6 +108,78 @@ public void testInvalidEmitterType() makeInjectorWithProperties(props).getInstance(Emitter.class); } + @Test + public void testEmitterForTaskContainsAllTaskDimensions() + { + Properties props = new Properties(); + props.setProperty("druid.emitter", "stub"); + EmitterModule emitterModule = new EmitterModule(); + emitterModule.setProps(props); + + ImmutableSet nodeRoles = ImmutableSet.of(); + + TestTaskHolder testTaskHolder = new TestTaskHolder("wiki", "id1", "type1", "group1"); + Injector injector = Guice.createInjector( + new JacksonModule(), + new LifecycleModule(), + binder -> { + JsonConfigProvider.bindInstance( + binder, + Key.get(DruidNode.class, Self.class), + new DruidNode("test-inject", null, false, null, null, true, false) + ); + binder.bind(Validator.class).toInstance(Validation.buildDefaultValidatorFactory().getValidator()); + binder.bindScope(LazySingleton.class, Scopes.SINGLETON); + binder.bind(Properties.class).toInstance(props); + binder.bind(TaskHolder.class).toInstance(testTaskHolder); + binder.bind(LoadSpecHolder.class).to(DefaultLoadSpecHolder.class).in(LazySingleton.class); + }, + ServerInjectorBuilder.registerNodeRoleModule(nodeRoles), + emitterModule, + new StubServiceEmitterModule() + ); + + ServiceEmitter instance = injector.getInstance(ServiceEmitter.class); + Assert.assertNotNull(instance); + instance.start(); + final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder(); + builder.setDimension("foo", "bar"); + builder.setMetric("metric1", 1); + instance.emit(builder); + + Emitter instance1 = injector.getInstance(Emitter.class); + Assert.assertTrue(instance1 instanceof StubServiceEmitter); + StubServiceEmitter stubEmitter = (StubServiceEmitter) instance1; + + stubEmitter.verifyEmitted("metric1", 1); + List events = stubEmitter.getEvents(); + Assert.assertEquals(1, events.size()); + ServiceMetricEvent event = (ServiceMetricEvent) events.get(0); + EventMap map = event.toMap(); + Assert.assertEquals("id1", map.get(DruidMetrics.TASK_ID)); + Assert.assertEquals("id1", map.get(DruidMetrics.ID)); + Assert.assertEquals("type1", map.get(DruidMetrics.TASK_TYPE)); + Assert.assertEquals("group1", map.get(DruidMetrics.GROUP_ID)); + Assert.assertEquals("wiki", map.get(DruidMetrics.DATASOURCE)); + stubEmitter.flush(); + + // Override a dimension and verify that is emitted + final ServiceMetricEvent.Builder builder2 = new ServiceMetricEvent.Builder(); + builder2.setDimension("taskId", "id2"); + builder2.setMetric("metric2", 1); + instance.emit(builder2); + + List events2 = stubEmitter.getEvents(); + Assert.assertEquals(1, events2.size()); + ServiceMetricEvent event2 = (ServiceMetricEvent) events2.get(0); + EventMap map2 = event2.toMap(); + Assert.assertEquals("id2", map2.get(DruidMetrics.TASK_ID)); + Assert.assertEquals("id1", map2.get(DruidMetrics.ID)); + Assert.assertEquals("type1", map2.get(DruidMetrics.TASK_TYPE)); + Assert.assertEquals("group1", map2.get(DruidMetrics.GROUP_ID)); + Assert.assertEquals("wiki", map2.get(DruidMetrics.DATASOURCE)); + } + private Injector makeInjectorWithProperties(final Properties props) { EmitterModule emitterModule = new EmitterModule(); @@ -97,6 +190,7 @@ private Injector makeInjectorWithProperties(final Properties props) new LifecycleModule(), new ServerModule(), new JacksonModule(), + new DefaultServerHolderModule(), new Module() { @Override diff --git a/server/src/test/java/org/apache/druid/server/initialization/jetty/JettyServerModuleTest.java b/server/src/test/java/org/apache/druid/server/initialization/jetty/JettyServerModuleTest.java index 18c3317c84d7..317c69e17e2c 100644 --- a/server/src/test/java/org/apache/druid/server/initialization/jetty/JettyServerModuleTest.java +++ b/server/src/test/java/org/apache/druid/server/initialization/jetty/JettyServerModuleTest.java @@ -28,8 +28,6 @@ import org.apache.druid.server.DruidNode; import org.apache.druid.server.initialization.ServerConfig; import org.apache.druid.server.initialization.TLSServerConfig; -import org.apache.druid.server.metrics.MonitorsConfig; -import org.apache.druid.server.metrics.TestTaskHolder; import org.apache.druid.server.security.TLSCertificateChecker; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.util.ssl.SslContextFactory; @@ -40,7 +38,6 @@ import java.util.Arrays; import java.util.List; -import java.util.Map; public class JettyServerModuleTest { @@ -57,12 +54,9 @@ public void testJettyServerModule() Mockito.when(jettyServerThreadPool.getQueueSize()).thenReturn(50); Mockito.when(jettyServerThreadPool.getBusyThreads()).thenReturn(60); - final Map dimensionMap = MonitorsConfig.mapOfTaskHolderDimensions( - new TestTaskHolder("ds", "t0") - ); - JettyServerModule.JettyMonitor jettyMonitor = new JettyServerModule.JettyMonitor(dimensionMap); + JettyServerModule.JettyMonitor jettyMonitor = new JettyServerModule.JettyMonitor(); - final StubServiceEmitter serviceEmitter = new StubServiceEmitter("service", "host"); + final StubServiceEmitter serviceEmitter = StubServiceEmitter.createStarted(); jettyMonitor.doMonitor(serviceEmitter); serviceEmitter.verifyValue("jetty/numOpenConnections", 0); diff --git a/server/src/test/java/org/apache/druid/server/metrics/GroupByStatsMonitorTest.java b/server/src/test/java/org/apache/druid/server/metrics/GroupByStatsMonitorTest.java index ce4fbf1b68ed..94da5889d39c 100644 --- a/server/src/test/java/org/apache/druid/server/metrics/GroupByStatsMonitorTest.java +++ b/server/src/test/java/org/apache/druid/server/metrics/GroupByStatsMonitorTest.java @@ -21,7 +21,10 @@ import org.apache.druid.collections.BlockingPool; import org.apache.druid.collections.DefaultBlockingPool; +import org.apache.druid.java.util.emitter.core.EventMap; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.java.util.metrics.StubServiceEmitter; +import org.apache.druid.query.DruidMetrics; import org.apache.druid.query.groupby.GroupByStatsProvider; import org.junit.After; import org.junit.Assert; @@ -32,6 +35,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -76,8 +80,9 @@ public void tearDown() public void testMonitor() { final GroupByStatsMonitor monitor = - new GroupByStatsMonitor(groupByStatsProvider, mergeBufferPool, new NoopTaskHolder()); + new GroupByStatsMonitor(groupByStatsProvider, mergeBufferPool); final StubServiceEmitter emitter = new StubServiceEmitter("service", "host"); + emitter.start(); monitor.doMonitor(emitter); emitter.flush(); // Trigger metric emission @@ -98,30 +103,35 @@ public void testMonitorWithServiceDimensions() { final String dataSource = "fooDs"; final String taskId = "taskId1"; + final String groupId = "test_groupid"; + final String taskType = "test_tasktype"; final GroupByStatsMonitor monitor = new GroupByStatsMonitor( groupByStatsProvider, - mergeBufferPool, - new TestTaskHolder(dataSource, taskId) + mergeBufferPool ); - final StubServiceEmitter emitter = new StubServiceEmitter("service", "host"); + final StubServiceEmitter emitter = new StubServiceEmitter("service", "host", new TestTaskHolder(dataSource, taskId, taskType, groupId)); + emitter.start(); monitor.doMonitor(emitter); emitter.flush(); // Trigger metric emission monitor.doMonitor(emitter); final Map dimFilters = Map.of( - "taskId", List.of(taskId), "dataSource", List.of(dataSource), "id", List.of(taskId) + DruidMetrics.DATASOURCE, dataSource, + DruidMetrics.TASK_ID, taskId, + DruidMetrics.ID, taskId, + DruidMetrics.TASK_TYPE, + taskType, DruidMetrics.GROUP_ID, groupId ); - Assert.assertEquals(7, emitter.getNumEmittedEvents()); - emitter.verifyValue("mergeBuffer/pendingRequests", dimFilters, 0L); - emitter.verifyValue("mergeBuffer/used", dimFilters, 0L); - emitter.verifyValue("mergeBuffer/queries", dimFilters, 1L); - emitter.verifyValue("mergeBuffer/acquisitionTimeNs", dimFilters, 100L); - emitter.verifyValue("groupBy/spilledQueries", dimFilters, 2L); - emitter.verifyValue("groupBy/spilledBytes", dimFilters, 200L); - emitter.verifyValue("groupBy/mergeDictionarySize", dimFilters, 300L); - } + verifyMetricValue(emitter, "mergeBuffer/pendingRequests", dimFilters, 0L); + verifyMetricValue(emitter, "mergeBuffer/used", dimFilters, 0L); + verifyMetricValue(emitter, "mergeBuffer/queries", dimFilters, 1L); + verifyMetricValue(emitter, "mergeBuffer/acquisitionTimeNs", dimFilters, 100L); + verifyMetricValue(emitter, "groupBy/spilledQueries", dimFilters, 2L); + verifyMetricValue(emitter, "groupBy/spilledBytes", dimFilters, 200L); + verifyMetricValue(emitter, "groupBy/mergeDictionarySize", dimFilters, 300L); + } @Test public void testMonitoringMergeBuffer_acquiredCount() @@ -132,7 +142,7 @@ public void testMonitoringMergeBuffer_acquiredCount() }).get(20, TimeUnit.SECONDS); final GroupByStatsMonitor monitor = - new GroupByStatsMonitor(groupByStatsProvider, mergeBufferPool, new NoopTaskHolder()); + new GroupByStatsMonitor(groupByStatsProvider, mergeBufferPool); final StubServiceEmitter emitter = new StubServiceEmitter("DummyService", "DummyHost"); boolean ret = monitor.doMonitor(emitter); Assert.assertTrue(ret); @@ -161,7 +171,7 @@ public void testMonitoringMergeBuffer_pendingRequests() } final GroupByStatsMonitor monitor = - new GroupByStatsMonitor(groupByStatsProvider, mergeBufferPool, new NoopTaskHolder()); + new GroupByStatsMonitor(groupByStatsProvider, mergeBufferPool); final StubServiceEmitter emitter = new StubServiceEmitter("DummyService", "DummyHost"); boolean ret = monitor.doMonitor(emitter); Assert.assertTrue(ret); @@ -174,4 +184,16 @@ public void testMonitoringMergeBuffer_pendingRequests() // do nothing } } + + private void verifyMetricValue(StubServiceEmitter emitter, String metricName, Map dimFilters, Number expectedValue) + { + final List observedMetricEvents = emitter.getMetricEvents(metricName); + Assert.assertEquals(1, observedMetricEvents.size()); + final ServiceMetricEvent event = observedMetricEvents.get(0); + final EventMap map = event.toMap(); + final boolean matchesDims = dimFilters.entrySet().stream() + .allMatch(e -> Objects.equals(e.getValue(), map.get(e.getKey()))); + Assert.assertTrue(matchesDims); + Assert.assertEquals(expectedValue, event.getValue()); + } } diff --git a/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java b/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java index 8f5ad4979284..e9779c1ddd68 100644 --- a/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java +++ b/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java @@ -24,6 +24,7 @@ import org.apache.druid.java.util.emitter.core.Event; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.java.util.metrics.StubServiceEmitter; +import org.apache.druid.java.util.metrics.TaskHolder; import org.hamcrest.Matcher; import org.hamcrest.Matchers; import org.junit.jupiter.api.Timeout; @@ -72,9 +73,9 @@ public class LatchableEmitter extends StubServiceEmitter /** * Creates a {@link StubServiceEmitter} that may be used in embedded tests. */ - public LatchableEmitter(String service, String host, LatchableEmitterConfig config) + public LatchableEmitter(String service, String host, LatchableEmitterConfig config, TaskHolder taskHolder) { - super(service, host); + super(service, host, taskHolder); this.defaultWaitTimeoutMillis = config.getDefaultWaitTimeoutMillis(); } diff --git a/server/src/test/java/org/apache/druid/server/metrics/MetricsModuleTest.java b/server/src/test/java/org/apache/druid/server/metrics/MetricsModuleTest.java index ad0e6aac3d94..5b54f0d5ccf8 100644 --- a/server/src/test/java/org/apache/druid/server/metrics/MetricsModuleTest.java +++ b/server/src/test/java/org/apache/druid/server/metrics/MetricsModuleTest.java @@ -49,9 +49,12 @@ import org.apache.druid.java.util.metrics.MonitorScheduler; import org.apache.druid.java.util.metrics.NoopOshiSysMonitor; import org.apache.druid.java.util.metrics.NoopSysMonitor; +import org.apache.druid.java.util.metrics.NoopTaskHolder; import org.apache.druid.java.util.metrics.OshiSysMonitor; import org.apache.druid.java.util.metrics.OshiSysMonitorConfig; import org.apache.druid.java.util.metrics.SysMonitor; +import org.apache.druid.java.util.metrics.TaskHolder; +import org.apache.druid.query.DruidMetrics; import org.apache.druid.server.DruidNode; import org.hamcrest.CoreMatchers; import org.junit.Assert; @@ -64,6 +67,7 @@ import javax.validation.Validation; import javax.validation.Validator; +import java.util.Map; import java.util.Properties; import java.util.Set; @@ -99,8 +103,10 @@ public void configure(Binder binder) @Test public void testSimpleInjectionWithValues() { - final String dataSource = "some datasource"; - final String taskId = "some task"; + final String dataSource = "some_datasource"; + final String taskId = "some_taskid"; + final String taskType = "some_task_type"; + final String groupId = "some_groupid"; final Injector injector = Initialization.makeInjectorWithModules( GuiceInjectors.makeStartupInjector(), ImmutableList.of(new Module() @@ -113,7 +119,7 @@ public void configure(Binder binder) Key.get(DruidNode.class, Self.class), new DruidNode("test-inject", null, false, null, null, true, false) ); - binder.bind(TaskHolder.class).toInstance(new TestTaskHolder(dataSource, taskId)); + binder.bind(TaskHolder.class).toInstance(new TestTaskHolder(dataSource, taskId, taskType, groupId)); binder.bind(LoadSpecHolder.class).to(DefaultLoadSpecHolder.class).in(LazySingleton.class); } }) @@ -121,6 +127,17 @@ public void configure(Binder binder) TaskHolder taskHolder = injector.getInstance(TaskHolder.class); Assert.assertEquals(dataSource, taskHolder.getDataSource()); Assert.assertEquals(taskId, taskHolder.getTaskId()); + Assert.assertEquals(taskType, taskHolder.getTaskType()); + Assert.assertEquals(groupId, taskHolder.getGroupId()); + Map expectedTaskDims = Map.of( + DruidMetrics.DATASOURCE, dataSource, + DruidMetrics.TASK_ID, taskId, + DruidMetrics.ID, taskId, + DruidMetrics.TASK_TYPE, taskType, + DruidMetrics.GROUP_ID, groupId + ); + + Assert.assertEquals(expectedTaskDims, taskHolder.getMetricDimensions()); } @Test diff --git a/server/src/test/java/org/apache/druid/server/metrics/TestLoadSpecHolder.java b/server/src/test/java/org/apache/druid/server/metrics/TestLoadSpecHolder.java new file mode 100644 index 000000000000..b66686b9a720 --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/metrics/TestLoadSpecHolder.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.metrics; + +import org.apache.druid.server.coordination.BroadcastDatasourceLoadingSpec; +import org.apache.druid.server.lookup.cache.LookupLoadingSpec; + +public class TestLoadSpecHolder implements LoadSpecHolder +{ + private final LookupLoadingSpec lookupLoadingSpec; + private final BroadcastDatasourceLoadingSpec broadcastDatasourceLoadingSpec; + + public TestLoadSpecHolder( + final LookupLoadingSpec lookupLoadingSpec, + final BroadcastDatasourceLoadingSpec broadcastDatasourceLoadingSpec + ) + { + this.lookupLoadingSpec = lookupLoadingSpec; + this.broadcastDatasourceLoadingSpec = broadcastDatasourceLoadingSpec; + } + + @Override + public LookupLoadingSpec getLookupLoadingSpec() + { + return lookupLoadingSpec; + } + + @Override + public BroadcastDatasourceLoadingSpec getBroadcastDatasourceLoadingSpec() + { + return broadcastDatasourceLoadingSpec; + } +} diff --git a/server/src/test/java/org/apache/druid/server/metrics/TestTaskHolder.java b/server/src/test/java/org/apache/druid/server/metrics/TestTaskHolder.java index eed9bcab8272..474ac71cbbc1 100644 --- a/server/src/test/java/org/apache/druid/server/metrics/TestTaskHolder.java +++ b/server/src/test/java/org/apache/druid/server/metrics/TestTaskHolder.java @@ -19,15 +19,24 @@ package org.apache.druid.server.metrics; +import org.apache.druid.java.util.metrics.TaskHolder; +import org.apache.druid.query.DruidMetrics; + +import java.util.Map; + public class TestTaskHolder implements TaskHolder { private final String dataSource; private final String taskId; + private final String taskType; + private final String groupId; - public TestTaskHolder(final String dataSource, final String taskId) + public TestTaskHolder(final String dataSource, final String taskId, final String taskType, final String groupId) { this.dataSource = dataSource; this.taskId = taskId; + this.taskType = taskType; + this.groupId = groupId; } @Override @@ -41,4 +50,28 @@ public String getTaskId() { return taskId; } + + @Override + public String getTaskType() + { + return taskType; + } + + @Override + public String getGroupId() + { + return groupId; + } + + @Override + public Map getMetricDimensions() + { + return Map.of( + DruidMetrics.DATASOURCE, dataSource, + DruidMetrics.TASK_ID, taskId, + DruidMetrics.ID, taskId, + DruidMetrics.TASK_TYPE, taskType, + DruidMetrics.GROUP_ID, groupId + ); + } } diff --git a/services/src/main/java/org/apache/druid/cli/CliPeon.java b/services/src/main/java/org/apache/druid/cli/CliPeon.java index 7aa859ebe326..84a6046d1ba4 100644 --- a/services/src/main/java/org/apache/druid/cli/CliPeon.java +++ b/services/src/main/java/org/apache/druid/cli/CliPeon.java @@ -94,6 +94,7 @@ import org.apache.druid.indexing.worker.shuffle.LocalIntermediaryDataManager; import org.apache.druid.java.util.common.lifecycle.Lifecycle; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.java.util.metrics.TaskHolder; import org.apache.druid.metadata.input.InputSourceModule; import org.apache.druid.msq.guice.MSQDurableStorageModule; import org.apache.druid.msq.guice.MSQExternalDataSourceModule; @@ -131,7 +132,6 @@ import org.apache.druid.server.initialization.jetty.JettyServerInitializer; import org.apache.druid.server.metrics.LoadSpecHolder; import org.apache.druid.server.metrics.ServiceStatusMonitor; -import org.apache.druid.server.metrics.TaskHolder; import org.apache.druid.storage.local.LocalTmpStorageConfig; import org.apache.druid.tasklogs.TaskPayloadManager; import org.eclipse.jetty.server.Server; diff --git a/services/src/main/java/org/apache/druid/cli/PeonTaskHolder.java b/services/src/main/java/org/apache/druid/cli/PeonTaskHolder.java index 1c223dd6d39c..74ce547ed900 100644 --- a/services/src/main/java/org/apache/druid/cli/PeonTaskHolder.java +++ b/services/src/main/java/org/apache/druid/cli/PeonTaskHolder.java @@ -23,11 +23,14 @@ import com.google.inject.Provider; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.java.util.metrics.AbstractMonitor; +import org.apache.druid.java.util.metrics.TaskHolder; +import org.apache.druid.query.DruidMetrics; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.lookup.LookupModule; import org.apache.druid.segment.transform.TransformSpec; import org.apache.druid.server.metrics.MetricsModule; -import org.apache.druid.server.metrics.TaskHolder; + +import java.util.Map; /** * TaskHolder implementation for {@code CliPeon} processes. @@ -59,4 +62,35 @@ public String getTaskId() { return taskProvider.get().getId(); } + + @Override + public String getTaskType() + { + return taskProvider.get().getType(); + } + + @Override + public String getGroupId() + { + return taskProvider.get().getGroupId(); + } + + /** + * @return a map of all task-specific dimensions applicable to this peon. + * The task ID ({@link TaskHolder#getTaskId()}) is added to both {@link DruidMetrics#TASK_ID} + * {@link DruidMetrics#ID} dimensions to the map for backward compatibility. {@link DruidMetrics#ID} is + * deprecated because it's ambiguous and can be removed in a future release.

+ */ + @Override + public Map getMetricDimensions() + { + final Task task = taskProvider.get(); + return Map.of( + DruidMetrics.DATASOURCE, task.getDataSource(), + DruidMetrics.TASK_ID, task.getId(), + DruidMetrics.ID, task.getId(), + DruidMetrics.TASK_TYPE, task.getType(), + DruidMetrics.GROUP_ID, task.getGroupId() + ); + } } diff --git a/services/src/test/java/org/apache/druid/cli/CliPeonTest.java b/services/src/test/java/org/apache/druid/cli/CliPeonTest.java index 718933e7cfff..962b716a5b28 100644 --- a/services/src/test/java/org/apache/druid/cli/CliPeonTest.java +++ b/services/src/test/java/org/apache/druid/cli/CliPeonTest.java @@ -22,7 +22,10 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.inject.Guice; import com.google.inject.Injector; +import com.google.inject.Module; +import com.google.inject.Scopes; import org.apache.commons.io.FileUtils; import org.apache.druid.data.input.impl.ByteEntity; import org.apache.druid.data.input.impl.CsvInputFormat; @@ -30,6 +33,8 @@ import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.discovery.NodeRole; import org.apache.druid.guice.GuiceInjectors; +import org.apache.druid.guice.LazySingleton; +import org.apache.druid.guice.LifecycleModule; import org.apache.druid.indexer.granularity.ArbitraryGranularitySpec; import org.apache.druid.indexing.common.SegmentCacheManagerFactory; import org.apache.druid.indexing.common.TaskToolbox; @@ -46,9 +51,18 @@ import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers; import org.apache.druid.indexing.seekablestream.common.RecordSupplier; import org.apache.druid.indexing.worker.executor.ExecutorLifecycleConfig; +import org.apache.druid.jackson.JacksonModule; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.granularity.AllGranularity; +import org.apache.druid.java.util.emitter.core.Emitter; +import org.apache.druid.java.util.emitter.core.Event; +import org.apache.druid.java.util.emitter.core.EventMap; +import org.apache.druid.java.util.emitter.service.ServiceEventBuilder; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; +import org.apache.druid.java.util.metrics.StubServiceEmitter; +import org.apache.druid.java.util.metrics.StubServiceEmitterModule; +import org.apache.druid.java.util.metrics.TaskHolder; import org.apache.druid.query.DruidMetrics; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.CountAggregatorFactory; @@ -61,7 +75,6 @@ import org.apache.druid.server.coordination.BroadcastDatasourceLoadingSpec; import org.apache.druid.server.lookup.cache.LookupLoadingSpec; import org.apache.druid.server.metrics.LoadSpecHolder; -import org.apache.druid.server.metrics.TaskHolder; import org.apache.druid.storage.local.LocalTmpStorageConfig; import org.joda.time.Duration; import org.junit.Assert; @@ -70,10 +83,14 @@ import org.junit.rules.TemporaryFolder; import javax.annotation.Nullable; +import javax.validation.Validation; +import javax.validation.Validator; import java.io.File; import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; @@ -254,6 +271,43 @@ public void testTaskWithMonitorsAndMetricsSpecDoNotCauseCyclicDependency() throw verifyTaskHolder(peonInjector.getInstance(TaskHolder.class), compactionTask); } + @Test + public void testCliPeonMetricsContainAllTaskDimensions() throws IOException + { + final CompactionTask compactionTask = compactBuilder + .metricsSpec(new AggregatorFactory[]{ + new CountAggregatorFactory("cnt"), + new LongSumAggregatorFactory("val", "val") + }).build(); + + final Injector peonInjector = makePeonInjectorWithStubEmitter(compactionTask, temporaryFolder, mapper); + verifyLoadSpecHolder(peonInjector.getInstance(LoadSpecHolder.class), compactionTask); + verifyTaskHolder(peonInjector.getInstance(TaskHolder.class), compactionTask); + + Emitter instance = peonInjector.getInstance(Emitter.class); + Assert.assertTrue(instance instanceof StubServiceEmitter); + instance.start(); + + ServiceMetricEvent.Builder builder = ServiceMetricEvent.builder(); + ServiceEventBuilder eventBuilder = builder.setMetric("foo", 1.0); + builder.setDimension("dd", "wikipedia"); + builder.setDimension("ee", "index"); + ((StubServiceEmitter) instance).emit(eventBuilder); + + StubServiceEmitter stubEmitter = (StubServiceEmitter) instance; + Assert.assertEquals(1, stubEmitter.getNumEmittedEvents()); + List events = stubEmitter.getEvents(); + for (Event event : events) { + Assert.assertTrue(event instanceof ServiceMetricEvent); + EventMap map = event.toMap(); + Assert.assertEquals(compactionTask.getDataSource(), map.get("dataSource")); + Assert.assertEquals(compactionTask.getId(), map.get("id")); + Assert.assertEquals(compactionTask.getId(), map.get("taskId")); + Assert.assertEquals(compactionTask.getType(), map.get("taskType")); + Assert.assertEquals(compactionTask.getGroupId(), map.get("groupId")); + } + } + private Injector makePeonInjector(File taskFile, Properties properties) { final CliPeon peon = new CliPeon(); @@ -264,6 +318,43 @@ private Injector makePeonInjector(File taskFile, Properties properties) return peon.makeInjector(Set.of(NodeRole.PEON)); } + public static Injector makePeonInjectorWithStubEmitter(Task task, TemporaryFolder temporaryFolder, ObjectMapper mapper) throws IOException + { + File taskFile = temporaryFolder.newFile("task.json"); + FileUtils.write(taskFile, mapper.writeValueAsString(task), StandardCharsets.UTF_8); + + final Properties properties = new Properties(); + properties.setProperty("druid.emitter", "stub"); + + final Injector baseInjector = Guice.createInjector( + new JacksonModule(), + new LifecycleModule(), + binder -> { + binder.bind(Validator.class).toInstance(Validation.buildDefaultValidatorFactory().getValidator()); + binder.bindScope(LazySingleton.class, Scopes.SINGLETON); + binder.bind(Properties.class).toInstance(properties); + } + ); + + final CliPeon peon = new CliPeon() + { + @Override + protected List getModules() + { + // Load the CliPeon with StubServiceEmitter + List modules = new ArrayList<>(super.getModules()); + modules.add(new StubServiceEmitterModule()); + return modules; + } + }; + + peon.taskAndStatusFile = ImmutableList.of(taskFile.getParent(), "1"); + + peon.configure(properties); + peon.configure(properties, baseInjector); + return peon.makeInjector(Set.of(NodeRole.PEON)); + } + private Injector makePeonInjector(Task task, Properties properties) throws IOException { File taskFile = temporaryFolder.newFile("task.json"); diff --git a/services/src/test/java/org/apache/druid/server/AsyncQueryForwardingServletTest.java b/services/src/test/java/org/apache/druid/server/AsyncQueryForwardingServletTest.java index 24416859ea2f..f79e239d586d 100644 --- a/services/src/test/java/org/apache/druid/server/AsyncQueryForwardingServletTest.java +++ b/services/src/test/java/org/apache/druid/server/AsyncQueryForwardingServletTest.java @@ -500,7 +500,7 @@ public Throwable getFailure() } }; - final StubServiceEmitter stubServiceEmitter = new StubServiceEmitter("", ""); + final StubServiceEmitter stubServiceEmitter = StubServiceEmitter.createStarted(); final AsyncQueryForwardingServlet servlet = new AsyncQueryForwardingServlet( new MapQueryToolChestWarehouse(ImmutableMap.of()), TestHelper.makeJsonMapper(), @@ -571,7 +571,7 @@ public Throwable getFailure() } }; - final StubServiceEmitter stubServiceEmitter = new StubServiceEmitter("", ""); + final StubServiceEmitter stubServiceEmitter = StubServiceEmitter.createStarted(); final AsyncQueryForwardingServlet servlet = new AsyncQueryForwardingServlet( new MapQueryToolChestWarehouse(ImmutableMap.of()), TestHelper.makeJsonMapper(), @@ -624,7 +624,7 @@ public void testOnFailureWithExceptionAndUnassignedStatusCode() Mockito.when(responseMock.getStatus()).thenReturn(0); // Test unassigned http status code case from server Mockito.when(responseMock.getHeaders()).thenReturn(HttpFields.build()); - final StubServiceEmitter stubServiceEmitter = new StubServiceEmitter("", ""); + final StubServiceEmitter stubServiceEmitter = StubServiceEmitter.createStarted(); final AsyncQueryForwardingServlet servlet = new AsyncQueryForwardingServlet( new MapQueryToolChestWarehouse(ImmutableMap.of()), TestHelper.makeJsonMapper(), @@ -871,7 +871,7 @@ public CompletableFuture abort(Throwable throwable) } }; final Result result = new Result(proxyRequestMock, response); - final StubServiceEmitter stubServiceEmitter = new StubServiceEmitter("", ""); + final StubServiceEmitter stubServiceEmitter = StubServiceEmitter.createStarted(); final AsyncQueryForwardingServlet servlet = new AsyncQueryForwardingServlet( new MapQueryToolChestWarehouse(ImmutableMap.of()), jsonMapper, diff --git a/services/src/test/java/org/apache/druid/testing/embedded/emitter/LatchableEmitterModule.java b/services/src/test/java/org/apache/druid/testing/embedded/emitter/LatchableEmitterModule.java index 78603090e5f1..9bd3c8bc30e3 100644 --- a/services/src/test/java/org/apache/druid/testing/embedded/emitter/LatchableEmitterModule.java +++ b/services/src/test/java/org/apache/druid/testing/embedded/emitter/LatchableEmitterModule.java @@ -28,6 +28,7 @@ import org.apache.druid.guice.annotations.Self; import org.apache.druid.initialization.DruidModule; import org.apache.druid.java.util.emitter.core.Emitter; +import org.apache.druid.java.util.metrics.TaskHolder; import org.apache.druid.server.DruidNode; import org.apache.druid.server.metrics.LatchableEmitter; import org.apache.druid.server.metrics.LatchableEmitterConfig; @@ -51,9 +52,10 @@ public void configure(Binder binder) @ManageLifecycle public LatchableEmitter makeEmitter( @Self DruidNode selfNode, - LatchableEmitterConfig config + LatchableEmitterConfig config, + TaskHolder taskHolder ) { - return new LatchableEmitter(selfNode.getServiceName(), selfNode.getHost(), config); + return new LatchableEmitter(selfNode.getServiceName(), selfNode.getHost(), config, taskHolder); } } diff --git a/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java b/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java index 894619b9ea3c..6fb974a527cd 100644 --- a/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java +++ b/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java @@ -277,7 +277,7 @@ public void add(String sqlQueryId, Cancelable lifecycle) } } }; - stubServiceEmitter = new StubServiceEmitter("test", "test"); + stubServiceEmitter = StubServiceEmitter.createStarted(); final AuthConfig authConfig = new AuthConfig(); final DefaultQueryConfig defaultQueryConfig = new DefaultQueryConfig(ImmutableMap.of()); final SqlToolbox sqlToolbox = new SqlToolbox(