Skip to content

Commit a6fdf04

Browse files
ktfdavidrohr
authored andcommitted
DPL: add finaliseOutputs callback for services
This is invoked after the processing is done (therefore no other outputs are expected from user code) but before the postProcessing (which is were data is actually sent to their consumers.
1 parent f54a1b2 commit a6fdf04

File tree

8 files changed

+53
-1
lines changed

8 files changed

+53
-1
lines changed

Framework/Core/include/Framework/CallbackService.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,9 @@ class CallbackService
7171
NewTimeslice,
7272
/// Invoked before the processing callback
7373
PreProcessing,
74+
/// Invoked after the processing callback and before the post processing
75+
/// callback to allow for injecting data in the output stream.
76+
FinaliseOutputs,
7477
/// Invoked after the processing callback,
7578
PostProcessing,
7679
/// Invoked whenever an object from CCDB is deserialised via ROOT.
@@ -94,6 +97,7 @@ class CallbackService
9497
using RegionInfoCallback = std::function<void(fair::mq::RegionInfo const&)>;
9598
using NewTimesliceCallback = std::function<void(o2::header::DataHeader&, DataProcessingHeader&)>;
9699
using PreProcessingCallback = std::function<void(ServiceRegistryRef, int)>;
100+
using FinaliseOutputsCallback = std::function<void(ServiceRegistryRef, int)>;
97101
using PostProcessingCallback = std::function<void(ServiceRegistryRef, int)>;
98102
using CCDBDeserializedCallback = std::function<void(ConcreteDataMatcher&, void*)>;
99103
using DomainInfoUpdatedCallback = std::function<void(ServiceRegistryRef, size_t timeslice, ChannelIndex index)>;
@@ -111,6 +115,7 @@ class CallbackService
111115
RegistryPair<Id, Id::RegionInfoCallback, RegionInfoCallback>, //
112116
RegistryPair<Id, Id::NewTimeslice, NewTimesliceCallback>, //
113117
RegistryPair<Id, Id::PreProcessing, PreProcessingCallback>, //
118+
RegistryPair<Id, Id::FinaliseOutputs, FinaliseOutputsCallback>, //
114119
RegistryPair<Id, Id::PostProcessing, PostProcessingCallback>, //
115120
RegistryPair<Id, Id::CCDBDeserialised, CCDBDeserializedCallback>, //
116121
RegistryPair<Id, Id::DomainInfoUpdated, DomainInfoUpdatedCallback>, //

Framework/Core/include/Framework/DataProcessingContext.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,9 @@ struct DataProcessorContext {
5353
void preStartCallbacks(ServiceRegistryRef);
5454
/// Invoke callbacks to be executed before every process method invokation
5555
void preProcessingCallbacks(ProcessingContext&);
56+
/// Invoke callbacks to be executed after the outputs have been created
57+
/// by the processing, but before the post processing callbacks.
58+
void finaliseOutputsCallbacks(ProcessingContext&);
5659
/// Invoke callbacks to be executed after every process method invokation
5760
void postProcessingCallbacks(ProcessingContext&);
5861
/// Invoke callbacks to be executed before every dangling check
@@ -91,6 +94,9 @@ struct DataProcessorContext {
9194
mutable std::vector<ServiceProcessingHandle> preProcessingHandlers;
9295
/// Callback for services to be executed after every processing.
9396
/// The callback MUST BE REENTRANT and threadsafe.
97+
mutable std::vector<ServiceProcessingHandle> finaliseOutputsHandles;
98+
/// Callback for services to be executed after every processing.
99+
/// The callback MUST BE REENTRANT and threadsafe.
94100
mutable std::vector<ServiceProcessingHandle> postProcessingHandlers;
95101
/// Callbacks for services to be executed before every dangling check
96102
mutable std::vector<ServiceDanglingHandle> preDanglingHandles;

Framework/Core/include/Framework/ServiceSpec.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,9 @@ struct ServiceSpec {
146146
ServiceConfigureCallback configure = nullptr;
147147
/// Callback executed before actual processing happens.
148148
ServiceProcessingCallback preProcessing = nullptr;
149+
/// Callback executed after the processing callback is completed
150+
/// and the user provided outputs have been created.
151+
ServiceProcessingCallback finaliseOutputs = nullptr;
149152
/// Callback executed once actual processing happened.
150153
ServiceProcessingCallback postProcessing = nullptr;
151154
/// Callback executed before the dangling inputs loop
@@ -170,7 +173,7 @@ struct ServiceSpec {
170173
ServicePreSchedule preSchedule = nullptr;
171174
ServicePostSchedule postSchedule = nullptr;
172175

173-
///Callback executed after each metric is received by the driver.
176+
/// Callback executed after each metric is received by the driver.
174177
ServiceMetricHandling metricHandling = nullptr;
175178

176179
/// Callback executed after a given input record has been successfully

Framework/Core/include/Framework/StreamContext.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@ struct StreamContext {
3838
void preStartStreamCallbacks(ServiceRegistryRef);
3939

4040
void preProcessingCallbacks(ProcessingContext& pcx);
41+
/// Invoke callbacks to be executed after the outputs have been created
42+
/// by the processing, but before the post processing callbacks.
43+
void finaliseOutputsCallbacks(ProcessingContext&);
4144
void postProcessingCallbacks(ProcessingContext& pcx);
4245

4346
/// Invoke callbacks to be executed before every EOS user callback invokation
@@ -47,6 +50,7 @@ struct StreamContext {
4750

4851
/// Callbacks for services to be executed before every process method invokation
4952
std::vector<ServiceProcessingHandle> preProcessingHandles;
53+
std::vector<ServiceProcessingHandle> finaliseOutputsHandles;
5054
/// Callbacks for services to be executed after every process method invokation
5155
std::vector<ServiceProcessingHandle> postProcessingHandles;
5256

Framework/Core/src/ContextHelpers.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,9 @@ void ContextHelpers::bindStreamService(DataProcessorContext& dpContext, StreamCo
4141
if (spec.preProcessing) {
4242
context.preProcessingHandles.push_back(ServiceProcessingHandle{spec, spec.preProcessing, service});
4343
}
44+
if (spec.finaliseOutputs) {
45+
context.finaliseOutputsHandles.push_back(ServiceProcessingHandle{spec, spec.finaliseOutputs, service});
46+
}
4447
if (spec.postProcessing) {
4548
context.postProcessingHandles.push_back(ServiceProcessingHandle{spec, spec.postProcessing, service});
4649
}
@@ -59,6 +62,9 @@ void ContextHelpers::bindProcessorService(DataProcessorContext& dataProcessorCon
5962
if (spec.preProcessing) {
6063
dataProcessorContext.preProcessingHandlers.push_back(ServiceProcessingHandle{spec, spec.preProcessing, service});
6164
}
65+
if (spec.finaliseOutputs) {
66+
dataProcessorContext.finaliseOutputsHandles.push_back(ServiceProcessingHandle{spec, spec.finaliseOutputs, service});
67+
}
6268
if (spec.postProcessing) {
6369
dataProcessorContext.postProcessingHandlers.push_back(ServiceProcessingHandle{spec, spec.postProcessing, service});
6470
}

Framework/Core/src/DataProcessingContext.cxx

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,14 @@ void DataProcessorContext::preProcessingCallbacks(ProcessingContext& ctx)
2222
}
2323
}
2424

25+
void DataProcessorContext::finaliseOutputsCallbacks(ProcessingContext& ctx)
26+
{
27+
for (auto& handle : finaliseOutputsHandles) {
28+
LOGP(debug, "Invoking postProcessingCallback for service {}", handle.spec.name);
29+
handle.callback(ctx, handle.service);
30+
}
31+
}
32+
2533
/// Invoke callbacks to be executed before every dangling check
2634
void DataProcessorContext::postProcessingCallbacks(ProcessingContext& ctx)
2735
{

Framework/Core/src/DataProcessingDevice.cxx

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2299,6 +2299,15 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v
22992299
allocator.make<int>(OutputRef{"dpl-summary", compile_time_hash(spec.name.c_str())}, 1);
23002300
}
23012301

2302+
// Extra callback which allows a service to add extra outputs.
2303+
// This is needed e.g. to ensure that injected CCDB outputs are added
2304+
// before an end of stream.
2305+
{
2306+
ref.get<CallbackService>().call<CallbackService::Id::FinaliseOutputs>(o2::framework::ServiceRegistryRef{ref}, (int)action.op);
2307+
dpContext.finaliseOutputsCallbacks(processContext);
2308+
streamContext.finaliseOutputsCallbacks(processContext);
2309+
}
2310+
23022311
{
23032312
ZoneScopedN("service post processing");
23042313
ref.get<CallbackService>().call<CallbackService::Id::PostProcessing>(o2::framework::ServiceRegistryRef{ref}, (int)action.op);

Framework/Core/src/StreamContext.cxx

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,17 @@ void StreamContext::preProcessingCallbacks(ProcessingContext& pcx)
3535
}
3636
}
3737

38+
/// Invoke callbacks to be executed after every process method invokation
39+
void StreamContext::finaliseOutputsCallbacks(ProcessingContext& pcx)
40+
{
41+
for (auto& handle : finaliseOutputsHandles) {
42+
LOG(debug) << "Invoking finaliseOutputsCallbacks for " << handle.service;
43+
assert(handle.service);
44+
assert(handle.callback);
45+
handle.callback(pcx, handle.service);
46+
}
47+
}
48+
3849
/// Invoke callbacks to be executed after every process method invokation
3950
void StreamContext::postProcessingCallbacks(ProcessingContext& pcx)
4051
{

0 commit comments

Comments
 (0)