Skip to content

Commit 73179de

Browse files
authored
QC-1143 Merge objects before publishing at End Of Stream (#12998)
and make sure it is tested
1 parent 38140b6 commit 73179de

File tree

8 files changed

+80
-28
lines changed

8 files changed

+80
-28
lines changed

Utilities/Mergers/include/Mergers/IntegratingMerger.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ class IntegratingMerger : public framework::Task
5555
void endOfStream(framework::EndOfStreamContext& eosContext) override;
5656

5757
private:
58+
void finishCycle(framework::DataAllocator& outputs);
5859
void publishIntegral(framework::DataAllocator& allocator);
5960
void publishMovingWindow(framework::DataAllocator& allocator);
6061
static void merge(ObjectStore& mMergedDelta, ObjectStore&& other);

Utilities/Mergers/src/FullHistoryMerger.cxx

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ void FullHistoryMerger::run(framework::ProcessingContext& ctx)
9090

9191
void FullHistoryMerger::endOfStream(framework::EndOfStreamContext& eosContext)
9292
{
93+
mergeCache();
9394
publish(eosContext.outputs());
9495
}
9596

Utilities/Mergers/src/IntegratingMerger.cxx

Lines changed: 25 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -69,31 +69,37 @@ void IntegratingMerger::run(framework::ProcessingContext& ctx)
6969
}
7070

7171
if (ctx.inputs().isValid("timer-publish")) {
72-
mCyclesSinceReset++;
72+
finishCycle(ctx.outputs());
73+
}
74+
}
7375

74-
if (mConfig.publishMovingWindow.value == PublishMovingWindow::Yes) {
75-
publishMovingWindow(ctx.outputs());
76-
}
76+
void IntegratingMerger::finishCycle(DataAllocator& outputs)
77+
{
78+
mCyclesSinceReset++;
7779

78-
if (!std::holds_alternative<std::monostate>(mMergedObjectLastCycle)) {
79-
merge(mMergedObjectIntegral, std::move(mMergedObjectLastCycle));
80-
}
81-
mMergedObjectLastCycle = std::monostate{};
82-
mTotalDeltasMerged += mDeltasMerged;
80+
if (mConfig.publishMovingWindow.value == PublishMovingWindow::Yes) {
81+
publishMovingWindow(outputs);
82+
}
8383

84-
publishIntegral(ctx.outputs());
84+
if (!std::holds_alternative<std::monostate>(mMergedObjectLastCycle)) {
85+
merge(mMergedObjectIntegral, std::move(mMergedObjectLastCycle));
86+
}
87+
mMergedObjectLastCycle = std::monostate{};
88+
mTotalDeltasMerged += mDeltasMerged;
8589

86-
if (mConfig.mergedObjectTimespan.value == MergedObjectTimespan::LastDifference ||
87-
mConfig.mergedObjectTimespan.value == MergedObjectTimespan::NCycles && mConfig.mergedObjectTimespan.param == mCyclesSinceReset) {
88-
clear();
89-
}
90+
publishIntegral(outputs);
9091

91-
mCollector->send({mTotalDeltasMerged, "total_deltas_merged"}, monitoring::DerivedMetricMode::RATE);
92-
mCollector->send({mDeltasMerged, "deltas_merged_since_last_publication"});
93-
mCollector->send({mCyclesSinceReset, "cycles_since_reset"});
94-
mDeltasMerged = 0;
92+
if (mConfig.mergedObjectTimespan.value == MergedObjectTimespan::LastDifference ||
93+
mConfig.mergedObjectTimespan.value == MergedObjectTimespan::NCycles && mConfig.mergedObjectTimespan.param == mCyclesSinceReset) {
94+
clear();
9595
}
96+
97+
mCollector->send({mTotalDeltasMerged, "total_deltas_merged"}, monitoring::DerivedMetricMode::RATE);
98+
mCollector->send({mDeltasMerged, "deltas_merged_since_last_publication"});
99+
mCollector->send({mCyclesSinceReset, "cycles_since_reset"});
100+
mDeltasMerged = 0;
96101
}
102+
97103
void IntegratingMerger::merge(ObjectStore& target, ObjectStore&& other)
98104
{
99105
if (std::holds_alternative<std::monostate>(target)) {
@@ -121,7 +127,7 @@ void IntegratingMerger::merge(ObjectStore& target, ObjectStore&& other)
121127

122128
void IntegratingMerger::endOfStream(framework::EndOfStreamContext& eosContext)
123129
{
124-
publishIntegral(eosContext.outputs());
130+
finishCycle(eosContext.outputs());
125131
}
126132

127133
// I am not calling it reset(), because it does not have to be performed during the FairMQs reset.

Utilities/Mergers/test/common.h

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,34 @@
1515
#include <sstream>
1616
#include <gsl/span>
1717
#include <TH1.h>
18+
#include <Framework/CallbackService.h>
1819

1920
namespace o2::mergers::test
2021
{
2122
inline auto to_span(const TH1F& histo)
2223
{
2324
return gsl::span(histo.GetArray(), histo.GetSize());
2425
}
26+
27+
void registerCallbacksForTestFailure(framework::CallbackService& cb, std::shared_ptr<bool> success)
28+
{
29+
cb.set<framework::CallbackService::Id::EndOfStream>([success](framework::EndOfStreamContext& ctx) {
30+
if (*success == false) {
31+
LOG(fatal) << "Received an EndOfStream without having received the expected object";
32+
}
33+
});
34+
cb.set<framework::CallbackService::Id::Stop>([success]() {
35+
if (*success == false) {
36+
LOG(fatal) << "STOP transition without having received the expected object";
37+
}
38+
});
39+
cb.set<framework::CallbackService::Id::ExitRequested>([success](framework::ServiceRegistryRef) {
40+
if (*success == false) {
41+
LOG(fatal) << "EXIT transition without having received the expected object";
42+
}
43+
});
44+
}
45+
2546
} // namespace o2::mergers::test
2647

2748
namespace std

Utilities/Mergers/test/customTopologyCommon.h

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
#include <Mergers/CustomMergeableObject.h>
2525
#include <Mergers/MergerBuilder.h>
2626
#include <Mergers/MergerInfrastructureBuilder.h>
27+
#include "common.h"
2728

2829
void customize(std::vector<o2::framework::CompletionPolicy>& policies)
2930
{
@@ -109,9 +110,12 @@ class CustomMergerTestGenerator
109110
},
110111
Outputs{},
111112
AlgorithmSpec{
112-
AlgorithmSpec::InitCallback{[expectedResult = mExpectedResult](InitContext&) {
113+
AlgorithmSpec::InitCallback{[expectedResult = mExpectedResult](InitContext& initContext) {
114+
auto success = std::make_shared<bool>(false);
115+
mergers::test::registerCallbacksForTestFailure(initContext.services().get<CallbackService>(), success);
116+
113117
return AlgorithmSpec::ProcessCallback{
114-
[expectedResult, numberOfCalls = 0, numberOfObjects = 0, numberOfMovingWindows = 0, lastObjectValue = 0, retries = 5](ProcessingContext& processingContext) mutable {
118+
[expectedResult, numberOfCalls = 0, numberOfObjects = 0, numberOfMovingWindows = 0, lastObjectValue = 0, retries = 5, success](ProcessingContext& processingContext) mutable {
115119
numberOfCalls++;
116120

117121
if (processingContext.inputs().isValid("custom")) {
@@ -139,7 +143,10 @@ class CustomMergerTestGenerator
139143
if (lastObjectValue != expectedResult) {
140144
LOG(fatal) << "got wrong secret from object: " << lastObjectValue << ", expected: " << expectedResult;
141145
}
146+
return;
142147
}
148+
LOG(info) << "Received the expected objects, test successful";
149+
*success = true;
143150
}
144151
}};
145152
}}}});
@@ -154,12 +161,17 @@ class CustomMergerTestGenerator
154161
},
155162
Outputs{},
156163
AlgorithmSpec{
157-
AlgorithmSpec::InitCallback{[expectedResult = mExpectedResult](InitContext&) {
164+
AlgorithmSpec::InitCallback{[expectedResult = mExpectedResult](InitContext& initContext) {
165+
auto success = std::make_shared<bool>(false);
166+
mergers::test::registerCallbacksForTestFailure(initContext.services().get<CallbackService>(), success);
167+
158168
return AlgorithmSpec::ProcessCallback{
159-
[expectedResult, retryNumber = 0, numberOfRetries = 5](ProcessingContext& processingContext) mutable {
169+
[expectedResult, retryNumber = 0, numberOfRetries = 5, success](ProcessingContext& processingContext) mutable {
160170
const auto obj = processingContext.inputs().get<mergers::CustomMergeableObject*>("custom");
161171

162172
if (obj->getSecret() == expectedResult) {
173+
LOG(info) << "Received the expected object, test successful";
174+
*success = true;
163175
processingContext.services().get<ControlService>().readyToQuit(QuitRequest::All);
164176
return;
165177
}

Utilities/Mergers/test/histosTopologyCommon.h

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ class HistosMergerTestGenerator
7171
"histo", "histo", histoBinsCount, histoMin, histoMax);
7272
histo.Fill(5);
7373
histo.Fill(producerIdx);
74+
processingContext.services().get<ControlService>().endOfStream();
7475
processingContext.services().get<ControlService>().readyToQuit(QuitRequest::Me);
7576
})}});
7677
}
@@ -103,14 +104,19 @@ class HistosMergerTestGenerator
103104
Inputs{{"histo", origin, description, 0, Lifetime::Sporadic}},
104105
Outputs{},
105106
AlgorithmSpec{
106-
AlgorithmSpec::InitCallback{[expectedResult = mExpectedResult](InitContext&) {
107+
AlgorithmSpec::InitCallback{[expectedResult = mExpectedResult](InitContext& initContext) {
108+
auto success = std::make_shared<bool>(false);
109+
mergers::test::registerCallbacksForTestFailure(initContext.services().get<CallbackService>(), success);
110+
107111
// reason for this crude retry is that multiple layers are not synchronized between each other and publish on their own timers,
108112
// number of retries is chosen arbitrarily as we need to retry at least twice
109-
return AlgorithmSpec::ProcessCallback{[expectedResult, retryNumber = 1, retries = 5](ProcessingContext& processingContext) mutable {
113+
return AlgorithmSpec::ProcessCallback{[expectedResult, retryNumber = 1, retries = 5, success](ProcessingContext& processingContext) mutable {
110114
const auto histo = processingContext.inputs().get<TH1F*>("histo");
111115

112116
LOG(info) << "RETRY: " << retryNumber << ": comparing: " << std::to_string(histo) << " to the expected: " << std::to_string(expectedResult);
113117
if (std::equal(expectedResult.begin(), expectedResult.end(), histo->GetArray(), histo->GetArray() + histo->GetSize())) {
118+
LOG(info) << "Received the expected object, test successful";
119+
*success = true;
114120
processingContext.services().get<ControlService>().readyToQuit(QuitRequest::All);
115121
return;
116122
}

Utilities/Mergers/test/test_MergerTopologyHistosFullHistory.cxx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ WorkflowSpec defineDataProcessing(ConfigContext const&)
3232

3333
const auto mergersInputs = generator.generateHistoProducers(specs, producersCount);
3434

35-
generator.generateMergers(specs, mergersInputs, InputObjectsTimespan::LastDifference);
35+
generator.generateMergers(specs, mergersInputs, InputObjectsTimespan::FullHistory);
3636

3737
generator.generateChecker(specs);
3838

Utilities/Mergers/test/vectorTopologyCommon.h

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,15 +127,20 @@ class VectorMergerTestGenerator
127127
Inputs{{"vec", origin, description, 0, Lifetime::Sporadic}},
128128
Outputs{},
129129
AlgorithmSpec{
130-
AlgorithmSpec::InitCallback{[expectedResult = mExpectedResult](InitContext&) {
130+
AlgorithmSpec::InitCallback{[expectedResult = mExpectedResult](InitContext& initContext) {
131+
auto success = std::make_shared<bool>(false);
132+
mergers::test::registerCallbacksForTestFailure(initContext.services().get<CallbackService>(), success);
133+
131134
// reason for this crude retry is that multiple layers are not synchronized between each other and publish on their own timers.
132135
// number of retries was chosen a bit randomly, as we need to have at least 2 runs through this function because of publish
133136
// timers inside of the mergers
134-
return AlgorithmSpec::ProcessCallback{[expectedResult, retryNumber = 1, retries = 5](ProcessingContext& processingContext) mutable {
137+
return AlgorithmSpec::ProcessCallback{[expectedResult, retryNumber = 1, retries = 5, success](ProcessingContext& processingContext) mutable {
135138
const auto vectorOfHistos = processingContext.inputs().get<std::vector<TObject*>*>("vec");
136139

137140
LOG(info) << "RETRY: " << retryNumber << ": comparing: " << std::to_string(vectorOfHistos) << " to the expected: " << std::to_string(expectedResult);
138141
if (vectorOfHistos == expectedResult) {
142+
LOG(info) << "Received the expected object, test successful";
143+
*success = true;
139144
processingContext.services().get<ControlService>().readyToQuit(QuitRequest::All);
140145
return;
141146
}

0 commit comments

Comments
 (0)