Skip to content

Commit 50336fe

Browse files
authored
DPL: provide metric for rate in and out of each channel (#7393)
1 parent 20c1494 commit 50336fe

File tree

2 files changed

+29
-0
lines changed

2 files changed

+29
-0
lines changed

Framework/Core/include/Framework/DataProcessingStats.h

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -40,12 +40,16 @@ struct DataProcessingStats {
4040
std::atomic<uint64_t> availableManagedShm = 0; /// Available shared memory in bytes.
4141

4242
std::atomic<uint64_t> lastSlowMetricSentTimestamp = 0; /// The timestamp of the last time we sent slow metrics
43+
std::atomic<uint64_t> lastVerySlowMetricSentTimestamp = 0; /// The timestamp of the last time we sent very slow metrics
4344
std::atomic<uint64_t> lastMetricFlushedTimestamp = 0; /// The timestamp of the last time we actually flushed metrics
4445
std::atomic<uint64_t> beginIterationTimestamp = 0; /// The timestamp of when the current ConditionalRun was started
4546

4647
std::atomic<uint64_t> performedComputations = 0; // The number of computations which have completed so far
4748
std::atomic<uint64_t> lastReportedPerformedComputations = 0; // The number of computations which have completed until lastSlowMetricSentTimestamp
4849

50+
std::vector<uint64_t> channelBytesIn; // How many incoming bytes have gone through the channels
51+
std::vector<uint64_t> channelBytesOut; // How many outgoing bytes have gone through the channels
52+
4953
InputLatency lastLatency = {0, 0};
5054

5155
std::atomic<int> relayerState[MAX_RELAYER_STATES];

Framework/Core/src/CommonServices.cxx

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -440,6 +440,7 @@ namespace
440440
auto sendRelayerMetrics(ServiceRegistry& registry, DataProcessingStats& stats) -> void
441441
{
442442
auto timeSinceLastUpdate = stats.beginIterationTimestamp - stats.lastSlowMetricSentTimestamp;
443+
auto timeSinceLastLongUpdate = stats.beginIterationTimestamp - stats.lastVerySlowMetricSentTimestamp;
443444
if (timeSinceLastUpdate < 5000) {
444445
return;
445446
}
@@ -503,6 +504,30 @@ auto sendRelayerMetrics(ServiceRegistry& registry, DataProcessingStats& stats) -
503504
stats.lastSlowMetricSentTimestamp.store(stats.beginIterationTimestamp.load());
504505
stats.lastReportedPerformedComputations.store(stats.performedComputations.load());
505506
O2_SIGNPOST_END(MonitoringStatus::ID, MonitoringStatus::SEND, 0, 0, O2_SIGNPOST_BLUE);
507+
508+
// Things which we report every 30s
509+
if (timeSinceLastLongUpdate < 30000) {
510+
return;
511+
}
512+
513+
auto device = registry.get<RawDeviceService>().device();
514+
515+
stats.channelBytesIn.resize(device->fChannels.size());
516+
stats.channelBytesOut.resize(device->fChannels.size());
517+
size_t ci = 0;
518+
for (auto& channel : device->fChannels) {
519+
auto newBytesOut = channel.second[0].GetBytesTx();
520+
auto newBytesIn = channel.second[0].GetBytesRx();
521+
monitoring.send(Metric{(float)(newBytesOut - stats.channelBytesOut[ci]) / 1000000.f / (timeSinceLastLongUpdate / 1000.f), fmt::format("channel_{}_rate_in_mb_s", channel.first)}
522+
.addTag(Key::Subsystem, Value::DPL));
523+
monitoring.send(Metric{(float)(newBytesIn - stats.channelBytesIn[ci]) / 1000000.f / (timeSinceLastLongUpdate / 1000.f), fmt::format("channel_{}_rate_out_mb_s", channel.first)}
524+
.addTag(Key::Subsystem, Value::DPL));
525+
stats.channelBytesOut[ci] = newBytesOut;
526+
stats.channelBytesIn[ci] = newBytesIn;
527+
ci++;
528+
}
529+
530+
stats.lastVerySlowMetricSentTimestamp.store(stats.beginIterationTimestamp.load());
506531
};
507532

508533
/// This will flush metrics only once every second.

0 commit comments

Comments
 (0)