@@ -39,6 +39,7 @@
import org.apache.commons.io.FileUtils;
import org.apache.curator.test.TestingCluster;
import org.apache.druid.cli.CliPeon;
import org.apache.druid.cli.CliPeonTest;
import org.apache.druid.cli.PeonLoadSpecHolder;
import org.apache.druid.cli.PeonTaskHolder;
import org.apache.druid.data.input.InputEntity;
@@ -88,6 +89,12 @@
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.java.util.emitter.core.Emitter;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.core.EventMap;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.java.util.metrics.TaskHolder;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.DefaultGenericQueryMetricsFactory;
import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate;
@@ -121,7 +128,6 @@
import org.apache.druid.server.coordination.BroadcastDatasourceLoadingSpec;
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
import org.apache.druid.server.metrics.LoadSpecHolder;
import org.apache.druid.server.metrics.TaskHolder;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
@@ -3487,6 +3493,64 @@ public void testTaskWithTransformSpecDoesNotCauseCliPeonCyclicDependency()
verifyPersistAndMergeTimeMetricsArePositive(observedSegmentGenerationMetrics);
}

@Test(timeout = 60_000L)
public void testKafkaTaskContainsAllTaskDimensions()
throws IOException, ExecutionException, InterruptedException
{
insertData();

final KafkaIndexTask task = createTask(
"index_kafka_test_id1",
new KafkaIndexTaskIOConfig(
0,
"sequence0",
new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(new KafkaTopicPartition(false, topic, 0), 2L), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(new KafkaTopicPartition(false, topic, 0), 5L)),
kafkaServer.consumerProperties(),
KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
true,
null,
null,
INPUT_FORMAT,
null,
Duration.standardHours(2).getStandardMinutes()
)
);

Injector peonInjector = CliPeonTest.makePeonInjectorWithStubEmitter(task, temporaryFolder, OBJECT_MAPPER);
Emitter peonEmitter = peonInjector.getInstance(Emitter.class);
Assert.assertTrue(peonEmitter instanceof StubServiceEmitter);
emitter = (StubServiceEmitter) peonEmitter;
emitter.start();
makeToolboxFactory();

final ListenableFuture<TaskStatus> future = runTask(task);

// Wait for task to exit
Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
Assert.assertTrue(emitter.getNumEmittedEvents() > 0);

// Check published metadata & segments in deep storage
final List<SegmentDescriptor> publishedDescriptors = publishedDescriptors();
Assert.assertEquals(
new KafkaDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(new KafkaTopicPartition(false, topic, 0), 5L))),
newDataSchemaMetadata()
);
Assert.assertEquals(ImmutableList.of("c"), readSegmentColumn("dim1", publishedDescriptors.get(0)));

Assert.assertTrue(emitter.getNumEmittedEvents() > 0);
for (Event event : emitter.getEvents()) {
if (event instanceof ServiceMetricEvent) {
EventMap observedEvent = event.toMap();
Assert.assertEquals("test_ds", observedEvent.get("dataSource"));
Assert.assertEquals("index_kafka_test_id1", observedEvent.get("id"));
Assert.assertEquals("index_kafka_test_id1", observedEvent.get("taskId"));
Assert.assertEquals("index_kafka", observedEvent.get("taskType"));
Assert.assertEquals("index_kafka_test_ds", observedEvent.get("groupId"));
}
}
}

public static class TestKafkaInputFormat implements InputFormat
{
final InputFormat baseInputFormat;
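A minimal sketch (not part of this change) of how the per-event dimension check in the test above could be factored into a reusable helper. It relies only on APIs already exercised in the test (StubServiceEmitter#getEvents(), Event#toMap(), EventMap#get(), JUnit's Assert); the class and method names are hypothetical.

import java.util.Map;

import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.core.EventMap;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.junit.Assert;

public final class TaskDimensionAssertions
{
  private TaskDimensionAssertions()
  {
  }

  // Asserts that every captured ServiceMetricEvent carries the expected task dimensions
  // (dataSource, id, taskId, taskType, groupId in the test above).
  public static void assertMetricEventsContain(StubServiceEmitter emitter, Map<String, String> expectedDimensions)
  {
    for (Event event : emitter.getEvents()) {
      if (event instanceof ServiceMetricEvent) {
        EventMap observedEvent = event.toMap();
        for (Map.Entry<String, String> dim : expectedDimensions.entrySet()) {
          Assert.assertEquals(dim.getKey(), dim.getValue(), observedEvent.get(dim.getKey()));
        }
      }
    }
  }
}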
@@ -25,39 +25,56 @@
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.emitter.core.Emitter;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.metrics.NoopTaskHolder;
import org.apache.druid.java.util.metrics.TaskHolder;

import java.io.IOException;

public class ServiceEmitter implements Emitter
{
private final ImmutableMap<String, String> serviceDimensions;
private final Emitter emitter;
private final String service;
private final ImmutableMap<String, String> otherServiceDimensions;
private final String host;
private final TaskHolder taskHolder;

/**
* This is initialized in {@link #start()} rather than in the constructor, since calling {@link TaskHolder#getMetricDimensions()}
* may introduce cyclic dependencies. So we defer initialization until {@link #start()}, which is managed via {@link LifecycleStart}.
*/
private ImmutableMap<String, String> serviceDimensions;

public ServiceEmitter(String service, String host, Emitter emitter)
{
this(service, host, emitter, ImmutableMap.of());
this(service, host, emitter, ImmutableMap.of(), new NoopTaskHolder());
}

public ServiceEmitter(
String service,
String host,
Emitter emitter,
ImmutableMap<String, String> otherServiceDimensions
ImmutableMap<String, String> otherServiceDimensions,
TaskHolder taskHolder
)
{
this.serviceDimensions = ImmutableMap
.<String, String>builder()
.put("service", Preconditions.checkNotNull(service, "service should be non-null"))
.put("host", Preconditions.checkNotNull(host, "host should be non-null"))
.putAll(otherServiceDimensions)
.build();
this.service = Preconditions.checkNotNull(service, "service should be non-null");
this.host = Preconditions.checkNotNull(host, "host should be non-null");
this.otherServiceDimensions = otherServiceDimensions;
this.emitter = emitter;
this.taskHolder = taskHolder;
}

@Override
@LifecycleStart
public void start()
{
serviceDimensions = ImmutableMap
.<String, String>builder()
.put(Event.SERVICE, service)
.put(Event.HOST, host)
.putAll(otherServiceDimensions)
.putAll(taskHolder.getMetricDimensions())
.build();
emitter.start();
}

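A minimal sketch of how task-level dimensions now reach every emitted event: a TaskHolder supplies extra dimensions that ServiceEmitter merges with the service and host dimensions when the lifecycle invokes start(). The name "ExampleTaskHolder" and all literal service, host, and dimension values are hypothetical, and the sketch assumes TaskHolder is a single-method interface whose getMetricDimensions() returns a Map<String, String>; only the ServiceEmitter constructors shown in the diff and NoopTaskHolder are taken from the change.

import java.util.Map;

import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.emitter.core.Emitter;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.metrics.TaskHolder;

public class TaskDimensionEmitterExample
{
  // Hypothetical holder exposing the identity of the task running in this process.
  static class ExampleTaskHolder implements TaskHolder
  {
    @Override
    public Map<String, String> getMetricDimensions()
    {
      return ImmutableMap.of(
          "dataSource", "test_ds",
          "taskId", "index_kafka_test_id1",
          "taskType", "index_kafka",
          "groupId", "index_kafka_test_ds"
      );
    }
  }

  public static ServiceEmitter makeEmitter(Emitter delegate)
  {
    ServiceEmitter serviceEmitter = new ServiceEmitter(
        "druid/peon",        // service (example value)
        "localhost:8100",    // host (example value)
        delegate,
        ImmutableMap.of(),   // otherServiceDimensions
        new ExampleTaskHolder()
    );
    // The dimension map (service, host, other dimensions, task dimensions) is assembled
    // here rather than in the constructor, so resolving TaskHolder cannot create a
    // cyclic dependency during injection.
    serviceEmitter.start();
    return serviceEmitter;
  }
}

Callers with no task context keep the old behavior through the three-argument constructor, which delegates with a NoopTaskHolder.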
@@ -40,23 +40,21 @@ public class CgroupCpuMonitor extends FeedDefiningMonitor
private static final Logger LOG = new Logger(CgroupCpuMonitor.class);
private static final Long DEFAULT_USER_HZ = 100L;
final CgroupDiscoverer cgroupDiscoverer;
final Map<String, String[]> dimensions;
private Long userHz;
private final KeyedDiff jiffies = new KeyedDiff();
private long prevJiffiesSnapshotAt = 0;
private final boolean isRunningOnCgroupsV2;
private final CgroupV2CpuMonitor cgroupV2CpuMonitor;

public CgroupCpuMonitor(CgroupDiscoverer cgroupDiscoverer, final Map<String, String[]> dimensions, String feed)
public CgroupCpuMonitor(CgroupDiscoverer cgroupDiscoverer, String feed)
{
super(feed);
this.cgroupDiscoverer = cgroupDiscoverer;
this.dimensions = dimensions;


// Check if we're running on cgroups v2
this.isRunningOnCgroupsV2 = cgroupDiscoverer.getCgroupVersion().equals(CgroupVersion.V2);
if (isRunningOnCgroupsV2) {
this.cgroupV2CpuMonitor = new CgroupV2CpuMonitor(cgroupDiscoverer, dimensions, feed);
this.cgroupV2CpuMonitor = new CgroupV2CpuMonitor(cgroupDiscoverer, feed);
LOG.info("Detected cgroups v2, using CgroupV2CpuMonitor behavior for accurate metrics");
} else {
this.cgroupV2CpuMonitor = null;
@@ -65,19 +63,14 @@ public CgroupCpuMonitor(CgroupDiscoverer cgroupDiscoverer, final Map<String, Str

}

public CgroupCpuMonitor(final Map<String, String[]> dimensions, String feed)
{
this(ProcSelfCgroupDiscoverer.autoCgroupDiscoverer(), dimensions, feed);
}

public CgroupCpuMonitor(final Map<String, String[]> dimensions)
public CgroupCpuMonitor(String feed)
{
this(dimensions, DEFAULT_METRICS_FEED);
this(ProcSelfCgroupDiscoverer.autoCgroupDiscoverer(), feed);
}

public CgroupCpuMonitor()
{
this(ImmutableMap.of());
this(DEFAULT_METRICS_FEED);
}

@Override
@@ -96,7 +89,6 @@ private boolean doMonitorV1(ServiceEmitter emitter)
long now = Instant.now().getEpochSecond();

final ServiceMetricEvent.Builder builder = builder();
MonitorUtils.addDimensionsToBuilder(builder, dimensions);
builder.setDimension("cgroupversion", cgroupDiscoverer.getCgroupVersion().name());

emitter.emit(builder.setMetric("cgroup/cpu/shares", cpuSnapshot.getShares()));
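The cgroup monitors no longer accept a per-monitor dimensions map; with common dimensions now applied by ServiceEmitter (see the sketch above), a monitor only needs an optional feed. A minimal usage sketch under that assumption follows; the class name is hypothetical, and the CgroupCpuSetMonitor and CgroupDiskMonitor hunks below show the same constructor simplification.

import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.metrics.CgroupCpuMonitor;

public class CgroupCpuMonitorUsage
{
  public static void emitOnce(ServiceEmitter emitter)
  {
    // Before this change: new CgroupCpuMonitor(ImmutableMap.of(...), "metrics")
    // After: the no-argument constructor uses the default metrics feed; a String feed
    // can still be passed explicitly.
    CgroupCpuMonitor monitor = new CgroupCpuMonitor();
    monitor.doMonitor(emitter); // emits cgroup/cpu/* metrics through the given emitter
  }
}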
@@ -19,52 +19,41 @@

package org.apache.druid.java.util.metrics;

import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.java.util.metrics.cgroups.CgroupDiscoverer;
import org.apache.druid.java.util.metrics.cgroups.CpuSet;
import org.apache.druid.java.util.metrics.cgroups.ProcSelfCgroupDiscoverer;

import java.util.Map;

/**
* Monitor that reports CPU set metrics from both cgroups v1 and v2.
*/

public class CgroupCpuSetMonitor extends FeedDefiningMonitor
{
final CgroupDiscoverer cgroupDiscoverer;
final Map<String, String[]> dimensions;

public CgroupCpuSetMonitor(CgroupDiscoverer cgroupDiscoverer, final Map<String, String[]> dimensions, String feed)
public CgroupCpuSetMonitor(CgroupDiscoverer cgroupDiscoverer, String feed)
{
super(feed);
this.cgroupDiscoverer = cgroupDiscoverer;
this.dimensions = dimensions;
}

public CgroupCpuSetMonitor(final Map<String, String[]> dimensions, String feed)
{
this(ProcSelfCgroupDiscoverer.autoCgroupDiscoverer(), dimensions, feed);
}

public CgroupCpuSetMonitor(final Map<String, String[]> dimensions)
public CgroupCpuSetMonitor(String feed)
{
this(dimensions, DEFAULT_METRICS_FEED);
this(ProcSelfCgroupDiscoverer.autoCgroupDiscoverer(), feed);
}

public CgroupCpuSetMonitor()
{
this(ImmutableMap.of());
this(DEFAULT_METRICS_FEED);
}

@Override
public boolean doMonitor(ServiceEmitter emitter)
{
final CpuSet.CpuSetMetric cpusetSnapshot = cgroupDiscoverer.getCpuSetMetrics();
final ServiceMetricEvent.Builder builder = builder();
MonitorUtils.addDimensionsToBuilder(builder, dimensions);
builder.setDimension("cgroupversion", cgroupDiscoverer.getCgroupVersion().name());
emitter.emit(builder.setMetric(
"cgroup/cpuset/cpu_count",
@@ -33,41 +33,34 @@
public class CgroupDiskMonitor extends FeedDefiningMonitor
{
private static final Logger LOG = new Logger(CgroupDiskMonitor.class);
final CgroupDiscoverer cgroupDiscoverer;
final Map<String, String[]> dimensions;
private final CgroupDiscoverer cgroupDiscoverer;
private final KeyedDiff diff = new KeyedDiff();
private final boolean isRunningOnCgroupsV2;
private final CgroupV2DiskMonitor cgroupV2DiskMonitor;

public CgroupDiskMonitor(CgroupDiscoverer cgroupDiscoverer, final Map<String, String[]> dimensions, String feed)
public CgroupDiskMonitor(CgroupDiscoverer cgroupDiscoverer, String feed)
{
super(feed);
this.cgroupDiscoverer = cgroupDiscoverer;
this.dimensions = dimensions;


// Check if we're running on cgroups v2
this.isRunningOnCgroupsV2 = cgroupDiscoverer.getCgroupVersion().equals(CgroupVersion.V2);
if (isRunningOnCgroupsV2) {
this.cgroupV2DiskMonitor = new CgroupV2DiskMonitor(cgroupDiscoverer, dimensions, feed);
this.cgroupV2DiskMonitor = new CgroupV2DiskMonitor(cgroupDiscoverer, feed);
LOG.info("Detected cgroups v2, using CgroupV2DiskMonitor behavior for accurate metrics");
} else {
this.cgroupV2DiskMonitor = null;
}
}

public CgroupDiskMonitor(final Map<String, String[]> dimensions, String feed)
{
this(ProcSelfCgroupDiscoverer.autoCgroupDiscoverer(), dimensions, feed);
}

public CgroupDiskMonitor(final Map<String, String[]> dimensions)
public CgroupDiskMonitor(String feed)
{
this(dimensions, DEFAULT_METRICS_FEED);
this(ProcSelfCgroupDiscoverer.autoCgroupDiscoverer(), feed);
}

public CgroupDiskMonitor()
{
this(ImmutableMap.of());
this(DEFAULT_METRICS_FEED);
}

@Override
@@ -97,7 +90,6 @@ private boolean doMonitorV1(ServiceEmitter emitter)
if (stats != null) {
final ServiceMetricEvent.Builder builder = builder()
.setDimension("diskName", entry.getValue().getDiskName());
MonitorUtils.addDimensionsToBuilder(builder, dimensions);
builder.setDimension("cgroupversion", cgroupDiscoverer.getCgroupVersion());
for (Map.Entry<String, Long> stat : stats.entrySet()) {
emitter.emit(builder.setMetric(stat.getKey(), stat.getValue()));