Skip to content

Commit fe8c2ce

Browse files
authored
DPL: send error count in a delayed manner (#5371)
Further effort to reduce the number of metrics sent.
1 parent df17886 commit fe8c2ce

File tree

4 files changed

+8
-13
lines changed

4 files changed

+8
-13
lines changed

Framework/Core/include/Framework/DataProcessingDevice.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,6 @@ struct DataProcessorContext {
7474
AlgorithmSpec::ErrorCallback* error = nullptr;
7575

7676
std::function<void(o2::framework::RuntimeErrorRef e, InputRecord& record)>* errorHandling = nullptr;
77-
int* errorCount = nullptr;
7877
};
7978

8079
/// A device actually carrying out all the DPL
@@ -124,7 +123,6 @@ class DataProcessingDevice : public FairMQDevice
124123
/// Completed actions
125124
std::vector<DataRelayer::RecordAction> mCompleted;
126125

127-
int mErrorCount;
128126
uint64_t mLastSlowMetricSentTimestamp = 0; /// The timestamp of the last time we sent slow metrics
129127
uint64_t mLastMetricFlushedTimestamp = 0; /// The timestamp of the last time we actually flushed metrics
130128
uint64_t mBeginIterationTimestamp = 0; /// The timestamp of when the current ConditionalRun was started

Framework/Core/include/Framework/DataProcessingStats.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ struct DataProcessingStats {
2525
int minLatency = 0;
2626
int maxLatency = 0;
2727
};
28+
std::atomic<int> errorCount = 0;
2829
std::atomic<int> pendingInputs = 0;
2930
std::atomic<int> incomplete = 0;
3031
std::atomic<int> inputParts = 0;

Framework/Core/src/CommonServices.cxx

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -480,6 +480,7 @@ auto sendRelayerMetrics(ServiceRegistry& registry, DataProcessingStats& stats) -
480480
monitoring.send(Metric{(int)relayerStats.droppedIncomingMessages, "dropped_incoming_messages"}.addTag(Key::Subsystem, Value::DPL));
481481
monitoring.send(Metric{(int)relayerStats.relayedMessages, "relayed_messages"}.addTag(Key::Subsystem, Value::DPL));
482482

483+
monitoring.send(Metric{(int)stats.errorCount, "errors"}.addTag(Key::Subsystem, Value::DPL));
483484
monitoring.send(Metric{(int)stats.pendingInputs, "inputs/relayed/pending"}.addTag(Key::Subsystem, Value::DPL));
484485
monitoring.send(Metric{(int)stats.incomplete, "inputs/relayed/incomplete"}.addTag(Key::Subsystem, Value::DPL));
485486
monitoring.send(Metric{(int)stats.inputParts, "inputs/relayed/total"}.addTag(Key::Subsystem, Value::DPL));

Framework/Core/src/DataProcessingDevice.cxx

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -94,8 +94,7 @@ DataProcessingDevice::DataProcessingDevice(DeviceSpec const& spec, ServiceRegist
9494
mError{spec.algorithm.onError},
9595
mConfigRegistry{nullptr},
9696
mAllocator{&mTimingInfo, &registry, spec.outputs},
97-
mServiceRegistry{registry},
98-
mErrorCount{0}
97+
mServiceRegistry{registry}
9998
{
10099
/// FIXME: move error handling to a service?
101100
if (mError != nullptr) {
@@ -427,7 +426,6 @@ void DataProcessingDevice::fillContext(DataProcessorContext& context)
427426
context.error = &mError;
428427
/// Callback for the error handling
429428
context.errorHandling = &mErrorHandling;
430-
context.errorCount = &mErrorCount;
431429
}
432430

433431
void DataProcessingDevice::PreRun()
@@ -684,9 +682,8 @@ void DataProcessingDevice::handleData(DataProcessorContext& context, FairMQParts
684682
return results;
685683
};
686684

687-
auto reportError = [& registry = *context.registry, &context](const char* message) {
688-
context.errorCount++;
689-
registry.get<Monitoring>().send(Metric{*context.errorCount, "errors"}.addTag(Key::Subsystem, Value::DPL));
685+
auto reportError = [&registry = *context.registry, &context](const char* message) {
686+
registry.get<DataProcessingStats>().errorCount++;
690687
};
691688

692689
auto handleValidMessages = [&parts, &context = context, &relayer = *context.relayer, &reportError](std::vector<InputType> const& types) {
@@ -782,9 +779,8 @@ bool DataProcessingDevice::tryDispatchComputation(DataProcessorContext& context,
782779
// should work just fine.
783780
std::vector<MessageSet> currentSetOfInputs;
784781

785-
auto reportError = [& registry = *context.registry, &context](const char* message) {
786-
context.errorCount++;
787-
registry.get<Monitoring>().send(Metric{*context.errorCount, "errors"}.addTag(Key::Subsystem, Value::DPL));
782+
auto reportError = [&registry = *context.registry, &context](const char* message) {
783+
registry.get<DataProcessingStats>().errorCount++;
788784
};
789785

790786
// For the moment we have a simple "immediately dispatch" policy for stuff
@@ -1077,8 +1073,7 @@ bool DataProcessingDevice::tryDispatchComputation(DataProcessorContext& context,
10771073
void DataProcessingDevice::error(const char* msg)
10781074
{
10791075
LOG(ERROR) << msg;
1080-
mErrorCount++;
1081-
mServiceRegistry.get<Monitoring>().send(Metric{mErrorCount, "errors"}.addTag(Key::Subsystem, Value::DPL));
1076+
mServiceRegistry.get<DataProcessingStats>().errorCount++;
10821077
}
10831078

10841079
} // namespace o2::framework

0 commit comments

Comments
 (0)