Skip to content

Commit 93e9e59

Browse files
committed
DPL: add propaedeutic changes for consumed timeframes
1 parent 3560d18 commit 93e9e59

13 files changed

+96
-42
lines changed

Framework/Core/include/Framework/AnalysisHelpers.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -448,7 +448,7 @@ struct OutputObj {
448448
s << std::hex << mTaskHash;
449449
s << std::hex << reinterpret_cast<uint64_t>(this);
450450
std::memcpy(desc.str, s.str().c_str(), 12);
451-
return OutputSpec{OutputLabel{label}, "ATSK", desc, 0};
451+
return OutputSpec{OutputLabel{label}, "ATSK", desc, 0, Lifetime::QA};
452452
}
453453

454454
T* operator->()

Framework/Core/include/Framework/CallbackService.h

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -8,18 +8,17 @@
88
// In applying this license CERN does not waive the privileges and immunities
99
// granted to it by virtue of its status as an Intergovernmental Organization
1010
// or submit itself to any jurisdiction.
11-
#ifndef FRAMEWORK_CALLBACKSERVICE_H
12-
#define FRAMEWORK_CALLBACKSERVICE_H
11+
#ifndef O2_FRAMEWORK_CALLBACKSERVICE_H_
12+
#define O2_FRAMEWORK_CALLBACKSERVICE_H_
1313

1414
#include "CallbackRegistry.h"
1515
#include "Framework/ServiceHandle.h"
16+
#include "ServiceRegistry.h"
1617
#include <tuple>
1718

1819
#include <fairmq/FwdDecls.h>
1920

20-
namespace o2
21-
{
22-
namespace framework
21+
namespace o2::framework
2322
{
2423

2524
class EndOfStreamContext;
@@ -34,11 +33,12 @@ class CallbackService
3433
constexpr static ServiceKind service_kind = ServiceKind::Global;
3534
/// the defined processing steps at which a callback can be invoked
3635
enum class Id {
37-
Start, /**< Invoked before the inner loop is started */
38-
Stop, /**< Invoked when the device is about to be stoped */
39-
Reset, /**< Invoked on device rest */
40-
Idle, /**< Invoked when there was no computation scheduled */
41-
ClockTick, /**< Invoked every iteration of the inner loop */
36+
Start, /**< Invoked before the inner loop is started */
37+
Stop, /**< Invoked when the device is about to be stopped */
38+
Reset, /**< Invoked on device reset */
39+
Idle, /**< Invoked when there was no computation scheduled */
40+
ClockTick, /**< Invoked every iteration of the inner loop */
41+
DataConsumed, /**< Invoked whenever data has been consumed */
4242
/// Invoked when we are notified that no further data will arrive.
4343
/// Notice that one could have more "EndOfData" notifications. Because
4444
/// we could be signaled by control that the data flow restarted.
@@ -66,6 +66,7 @@ class CallbackService
6666
using ResetCallback = std::function<void()>;
6767
using IdleCallback = std::function<void()>;
6868
using ClockTickCallback = std::function<void()>;
69+
using DataConsumedCallback = std::function<void(ServiceRegistry&)>;
6970
using EndOfStreamCallback = std::function<void(EndOfStreamContext&)>;
7071
using RegionInfoCallback = std::function<void(FairMQRegionInfo const&)>;
7172

@@ -75,6 +76,7 @@ class CallbackService
7576
RegistryPair<Id, Id::Reset, ResetCallback>, //
7677
RegistryPair<Id, Id::Idle, IdleCallback>, //
7778
RegistryPair<Id, Id::ClockTick, ClockTickCallback>, //
79+
RegistryPair<Id, Id::DataConsumed, DataConsumedCallback>, //
7880
RegistryPair<Id, Id::EndOfStream, EndOfStreamCallback>, //
7981
RegistryPair<Id, Id::RegionInfoCallback, RegionInfoCallback> //
8082
>; //
@@ -97,6 +99,5 @@ class CallbackService
9799
Callbacks mCallbacks;
98100
};
99101

100-
} // namespace framework
101-
} // namespace o2
102-
#endif // FRAMEWORK_CALLBACKSERVICE_H
102+
} // namespace o2::framework
103+
#endif // O2_FRAMEWORK_CALLBACKSERVICE_H_

Framework/Core/include/Framework/CompletionPolicy.h

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,12 @@ struct CompletionPolicy {
4646
Wait,
4747
/// Do not run the ProcessCallback. Contents of the record will
4848
/// be forwarded to the next consumer, if any.
49-
Discard
49+
Discard,
50+
/// ConsumeExisting: run the ProcessCallback on the InputRecord. After
51+
/// we are done, the processed payloads will be deallocated (but
52+
/// not the headers) while we wait for the record to be actually fully
53+
/// Consumed.
54+
ConsumeExisting
5055
};
5156

5257
using Matcher = std::function<bool(DeviceSpec const& device)>;

Framework/Core/include/Framework/CompletionPolicyHelpers.h

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@
88
// In applying this license CERN does not waive the privileges and immunities
99
// granted to it by virtue of its status as an Intergovernmental Organization
1010
// or submit itself to any jurisdiction.
11-
#ifndef FRAMEWORK_COMPLETIONPOLICYHELPERS_H
12-
#define FRAMEWORK_COMPLETIONPOLICYHELPERS_H
11+
#ifndef O2_FRAMEWORK_COMPLETIONPOLICYHELPERS_H_
12+
#define O2_FRAMEWORK_COMPLETIONPOLICYHELPERS_H_
1313

1414
#include "Framework/ChannelSpec.h"
1515
#include "Framework/CompletionPolicyHelpers.h"
@@ -20,9 +20,7 @@
2020
#include <string>
2121
#include <type_traits>
2222

23-
namespace o2
24-
{
25-
namespace framework
23+
namespace o2::framework
2624
{
2725

2826
/// Helper class which holds commonly used policies.
@@ -41,6 +39,11 @@ struct CompletionPolicyHelpers {
4139
{
4240
return consumeWhenAny("consume-any", matcher);
4341
}
42+
/// When any of the parts of the record have been received, process the existing and free the associated payloads.
43+
/// This allows freeing things as early as possible, while still being able to wait for
44+
/// all the parts before disposing the timeslice completely
45+
static CompletionPolicy consumeExistingWhenAny(const char* name, CompletionPolicy::Matcher matcher);
46+
4447
/// When any of the parts of the record have been received, process them,
4548
/// without actually consuming them.
4649
static CompletionPolicy processWhenAny(const char* name, CompletionPolicy::Matcher matcher);
@@ -64,7 +67,6 @@ struct CompletionPolicyHelpers {
6467
}
6568
};
6669

67-
} // namespace framework
68-
} // namespace o2
70+
} // namespace o2::framework
6971

70-
#endif // FRAMEWORK_COMPLETIONPOLICYHELPERS_H
72+
#endif // O2_FRAMEWORK_COMPLETIONPOLICYHELPERS_H_

Framework/Core/include/Framework/DataProcessingStats.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ struct DataProcessingStats {
3636
std::atomic<int> lastProcessedSize = 0;
3737
std::atomic<int> totalProcessedSize = 0;
3838
std::atomic<int> totalSigusr1 = 0;
39+
std::atomic<int> consumedTimeframes = 0;
3940

4041
std::atomic<uint64_t> lastSlowMetricSentTimestamp = 0; /// The timestamp of the last time we sent slow metrics
4142
std::atomic<uint64_t> lastMetricFlushedTimestamp = 0; /// The timestamp of the last time we actually flushed metrics

Framework/Core/src/CommonDataProcessors.cxx

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include "Framework/DataDescriptorMatcher.h"
2020
#include "Framework/DataOutputDirector.h"
2121
#include "Framework/DataProcessorSpec.h"
22+
#include "Framework/DataProcessingStats.h"
2223
#include "Framework/DataSpecUtils.h"
2324
#include "Framework/TableBuilder.h"
2425
#include "Framework/EndOfStreamContext.h"
@@ -34,6 +35,7 @@
3435
#include "Framework/ChannelSpec.h"
3536
#include "Framework/ExternalFairMQDeviceProxy.h"
3637
#include "Framework/RuntimeError.h"
38+
#include <Monitoring/Monitoring.h>
3739

3840
#include "TFile.h"
3941
#include "TTree.h"
@@ -53,9 +55,10 @@ template class std::vector<o2::framework::OutputObjectInfo>;
5355
template class std::vector<o2::framework::OutputTaskInfo>;
5456
using namespace o2::framework::data_matcher;
5557

56-
namespace o2
57-
{
58-
namespace framework
58+
#pragma GCC diagnostic push
59+
#pragma GCC diagnostic ignored "-Wpedantic"
60+
61+
namespace o2::framework
5962
{
6063

6164
struct InputObjectRoute {
@@ -230,12 +233,12 @@ DataProcessorSpec CommonDataProcessors::getOutputObjHistSink(std::vector<OutputO
230233
};
231234
};
232235

236+
char const* name = "internal-dpl-aod-global-analysis-file-sink";
233237
DataProcessorSpec spec{
234-
"internal-dpl-aod-global-analysis-file-sink",
235-
{InputSpec("x", DataSpecUtils::dataDescriptorMatcherFrom(header::DataOrigin{"ATSK"}))},
236-
Outputs{},
237-
AlgorithmSpec(writerFunction),
238-
{}};
238+
.name = name,
239+
.inputs = {InputSpec("x", DataSpecUtils::dataDescriptorMatcherFrom(header::DataOrigin{"ATSK"}))},
240+
.algorithm = {writerFunction},
241+
};
239242

240243
return spec;
241244
}
@@ -498,11 +501,17 @@ DataProcessorSpec CommonDataProcessors::getGlobalFairMQSink(std::vector<InputSpe
498501
DataProcessorSpec CommonDataProcessors::getDummySink(std::vector<InputSpec> const& danglingOutputInputs)
499502
{
500503
return DataProcessorSpec{
501-
"internal-dpl-injected-dummy-sink",
502-
danglingOutputInputs,
503-
Outputs{},
504-
AlgorithmSpec([](ProcessingContext& ctx) {})};
504+
.name = "internal-dpl-injected-dummy-sink",
505+
.inputs = danglingOutputInputs,
506+
.algorithm = AlgorithmSpec{adaptStateful([](CallbackService& callbacks) {
507+
auto dataConsumed = [](ServiceRegistry& services) {
508+
services.get<DataProcessingStats>().consumedTimeframes++;
509+
};
510+
callbacks.set(CallbackService::Id::DataConsumed, dataConsumed);
511+
512+
return adaptStateless([]() {});
513+
})}};
505514
}
506515

507-
} // namespace framework
508-
} // namespace o2
516+
#pragma GCC diagnostic pop
517+
} // namespace o2::framework

Framework/Core/src/CommonServices.cxx

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -473,6 +473,10 @@ auto sendRelayerMetrics(ServiceRegistry& registry, DataProcessingStats& stats) -
473473
.addTag(Key::Subsystem, Value::DPL));
474474
monitoring.send(Metric{((float)performedComputationsSinceLastUpdate / (float)timeSinceLastUpdate) * 1000, "processing_rate_hz"}.addTag(Key::Subsystem, Value::DPL));
475475

476+
if (stats.consumedTimeframes) {
477+
monitoring.send(Metric{stats.consumedTimeframes, "consumed-timeframes"}.addTag(Key::Subsystem, Value::DPL));
478+
}
479+
476480
stats.lastSlowMetricSentTimestamp.store(stats.beginIterationTimestamp.load());
477481
stats.lastReportedPerformedComputations.store(stats.performedComputations.load());
478482
O2_SIGNPOST_END(MonitoringStatus::ID, MonitoringStatus::SEND, 0, 0, O2_SIGNPOST_BLUE);

Framework/Core/src/CompletionPolicy.cxx

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
#include "Framework/CompletionPolicy.h"
1313
#include "Framework/CompletionPolicyHelpers.h"
1414
#include "Framework/InputRecord.h"
15+
#include "Framework/DeviceSpec.h"
1516
#include <functional>
1617
#include <iostream>
1718

@@ -44,6 +45,8 @@ std::ostream& operator<<(std::ostream& oss, CompletionPolicy::CompletionOp const
4445
case CompletionPolicy::CompletionOp::Discard:
4546
oss << "discard";
4647
break;
48+
case CompletionPolicy::CompletionOp::ConsumeExisting:
49+
oss << "consumeExisting";
4750
};
4851
return oss;
4952
}

Framework/Core/src/CompletionPolicyHelpers.cxx

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,9 @@ CompletionPolicy CompletionPolicyHelpers::defineByName(std::string const& name,
8181
case CompletionPolicy::CompletionOp::Consume:
8282
return CompletionPolicy{"always-consume", matcher, callback};
8383
break;
84+
case CompletionPolicy::CompletionOp::ConsumeExisting:
85+
return CompletionPolicy{"consume-existing", matcher, callback};
86+
break;
8487
case CompletionPolicy::CompletionOp::Process:
8588
return CompletionPolicy{"always-process", matcher, callback};
8689
break;
@@ -107,6 +110,25 @@ CompletionPolicy CompletionPolicyHelpers::consumeWhenAll(const char* name, Compl
107110
return CompletionPolicy{name, matcher, callback};
108111
}
109112

113+
CompletionPolicy CompletionPolicyHelpers::consumeExistingWhenAny(const char* name, CompletionPolicy::Matcher matcher)
114+
{
115+
auto callback = [](InputSpan const& inputs) -> CompletionPolicy::CompletionOp {
116+
size_t present = 0;
117+
for (auto& input : inputs) {
118+
if (input.header != nullptr) {
119+
present++;
120+
}
121+
}
122+
if (present == inputs.size()) {
123+
return CompletionPolicy::CompletionOp::Consume;
124+
} else if (present == 0) {
125+
return CompletionPolicy::CompletionOp::Wait;
126+
}
127+
return CompletionPolicy::CompletionOp::ConsumeExisting;
128+
};
129+
return CompletionPolicy{name, matcher, callback};
130+
}
131+
110132
CompletionPolicy CompletionPolicyHelpers::consumeWhenAny(const char* name, CompletionPolicy::Matcher matcher)
111133
{
112134
auto callback = [](InputSpan const& inputs) -> CompletionPolicy::CompletionOp {

Framework/Core/src/DataProcessingDevice.cxx

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@
4242
#include "DataProcessingHelpers.h"
4343
#include "DataRelayerHelpers.h"
4444
#include "ProcessingPoliciesHelpers.h"
45+
#include "Headers/DataHeader.h"
46+
#include "Headers/DataHeaderHelpers.h"
4547

4648
#include "ScopedExit.h"
4749

@@ -1188,7 +1190,7 @@ bool DataProcessingDevice::tryDispatchComputation(DataProcessorContext& context,
11881190
// FIXME: do it in a smarter way than O(N^2)
11891191
auto forwardInputs = [&reportError,
11901192
&spec = context.deviceContext->spec,
1191-
&device = context.deviceContext->device, &currentSetOfInputs](TimesliceSlot slot, InputRecord& record, bool copy) {
1193+
&device = context.deviceContext->device, &currentSetOfInputs](TimesliceSlot slot, InputRecord& record, bool copy, bool consume = true) {
11921194
ZoneScopedN("forward inputs");
11931195
assert(record.size() == currentSetOfInputs.size());
11941196
// we collect all messages per forward in a map and send them together
@@ -1367,7 +1369,7 @@ bool DataProcessingDevice::tryDispatchComputation(DataProcessorContext& context,
13671369

13681370
static bool noCatch = getenv("O2_NO_CATCHALL_EXCEPTIONS") && strcmp(getenv("O2_NO_CATCHALL_EXCEPTIONS"), "0");
13691371

1370-
auto runNoCatch = [&context, &processContext]() {
1372+
auto runNoCatch = [&context, &processContext](DataRelayer::RecordAction& action) {
13711373
if (context.deviceContext->state->quitRequested == false) {
13721374
if (*context.statefulProcess) {
13731375
ZoneScopedN("statefull process");
@@ -1386,10 +1388,10 @@ bool DataProcessingDevice::tryDispatchComputation(DataProcessorContext& context,
13861388
};
13871389

13881390
if (noCatch) {
1389-
runNoCatch();
1391+
runNoCatch(action);
13901392
} else {
13911393
try {
1392-
runNoCatch();
1394+
runNoCatch(action);
13931395
} catch (std::exception& ex) {
13941396
ZoneScopedN("error handling");
13951397
/// Convert a standard exception to a RuntimeErrorRef

0 commit comments

Comments
 (0)