Skip to content

Commit b549d4d

Browse files
committed
DPL: add support for dangling outputs
Sometimes it's handy to leave dangling outputs visible to a data processor, so that one can reuse a given algorithm in different scenarious where they might or not be requested. This changes the previous behavior that was to wipe the output completely, so that invoking make() on it would have thrown an exception. The new behavior will allow the user to invoke make and pass the output to an internal device that acts as a sink. Future updates of this feature will allow the user to query if a given output is dangling or not and provide a callback based API for make which will invoke the callback only if the output is not dangling. We could also exploit this feature to have preconfigured actions (e.g. write to a ROOT file) for dangling outputs, so that the user does not need to instanciate a writer himself, but can rely on the system to create one for all the dangling outputs.
1 parent d4557ac commit b549d4d

File tree

6 files changed

+180
-4
lines changed

6 files changed

+180
-4
lines changed

Framework/Core/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,7 @@ set(TEST_SRCS
208208
test/test_CustomGUI.cxx
209209
test/test_CompletionPolicy.cxx
210210
test/test_DanglingInputs.cxx
211+
test/test_DanglingOutputs.cxx
211212
test/test_DataAllocator.cxx
212213
test/test_DataProcessorSpec.cxx
213214
test/test_DataRefUtils.cxx

Framework/Core/include/Framework/OutputSpec.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ struct OutputSpec {
6868
{
6969
}
7070

71-
bool operator==(const OutputSpec& that)
71+
bool operator==(OutputSpec const& that) const
7272
{
7373
return origin == that.origin && description == that.description && subSpec == that.subSpec &&
7474
lifetime == that.lifetime;

Framework/Core/src/WorkflowHelpers.cxx

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,13 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow)
157157
}
158158
}
159159

160+
auto danglingOutputsInputs = computeDanglingOutputs(workflow);
161+
DataProcessorSpec dplInternalSync{
162+
"internal-dpl-sink",
163+
danglingOutputsInputs,
164+
{},
165+
};
166+
160167
if (ccdbBackend.outputs.empty() == false) {
161168
workflow.push_back(ccdbBackend);
162169
}
@@ -169,6 +176,11 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow)
169176
if (timer.outputs.empty() == false) {
170177
workflow.push_back(timer);
171178
}
179+
/// This will inject a fake sink so that any dangling
180+
/// output is actually requested by it.
181+
if (danglingOutputsInputs.empty() == false) {
182+
workflow.push_back(dplInternalSync);
183+
}
172184
}
173185

174186
void
@@ -477,5 +489,62 @@ WorkflowHelpers::verifyWorkflow(const o2::framework::WorkflowSpec &workflow) {
477489
}
478490
}
479491

492+
struct UnifiedDataSpecType {
493+
header::DataOrigin origin;
494+
header::DataDescription description;
495+
uint64_t subSpec;
496+
int isOutput;
497+
};
498+
499+
std::vector<InputSpec> WorkflowHelpers::computeDanglingOutputs(WorkflowSpec const& workflow)
500+
{
501+
std::vector<UnifiedDataSpecType> tmp;
502+
std::vector<InputSpec> results;
503+
504+
for (auto& spec : workflow) {
505+
for (auto& input : spec.inputs) {
506+
tmp.push_back({ input.origin, input.description, input.subSpec, 0 });
507+
}
508+
for (auto& output : spec.outputs) {
509+
tmp.push_back({ output.origin, output.description, output.subSpec, 1 });
510+
}
511+
}
512+
513+
auto cmp = [](UnifiedDataSpecType const& lhs, UnifiedDataSpecType const& rhs) -> bool {
514+
return std::tie(lhs.origin, lhs.description, lhs.subSpec, lhs.isOutput) <
515+
std::tie(rhs.origin, rhs.description, rhs.subSpec, rhs.isOutput);
516+
};
517+
std::sort(tmp.begin(), tmp.end(), cmp);
518+
519+
// Once sorted, all the transitions which begin with an output
520+
// mean there was no input.
521+
bool isFirst = true;
522+
UnifiedDataSpecType last;
523+
int i = 0;
524+
for (auto& unified : tmp) {
525+
if (last.origin != unified.origin ||
526+
last.description != unified.description ||
527+
last.subSpec != unified.subSpec) {
528+
isFirst = true;
529+
last = unified;
530+
}
531+
if (isFirst && unified.isOutput == 0) {
532+
isFirst = false;
533+
continue;
534+
}
535+
if (isFirst && unified.isOutput == 1) {
536+
isFirst = false;
537+
char buf[64];
538+
results.push_back(InputSpec{ (snprintf(buf, 64, "dangling%d", i), buf), unified.origin,
539+
unified.description, unified.subSpec });
540+
++i;
541+
continue;
542+
}
543+
assert(isFirst == false);
544+
}
545+
546+
return results;
547+
}
548+
480549
} // namespace framwork
481550
} // namespace o2

Framework/Core/src/WorkflowHelpers.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,11 @@ struct WorkflowHelpers {
165165
static std::vector<EdgeAction> computeInEdgeActions(
166166
const std::vector<DeviceConnectionEdge> &edges,
167167
const std::vector<size_t> &index);
168+
169+
/// Given @a workflow it finds the OutputSpec in every module which do not have
170+
/// a corresponding InputSpec. I.e. they are dangling.
171+
/// @return a vector of InputSpec which would have matched said dangling outputs.
172+
static std::vector<InputSpec> computeDanglingOutputs(WorkflowSpec const& workflow);
168173
};
169174

170175
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
// Copyright CERN and copyright holders of ALICE O2. This software is
2+
// distributed under the terms of the GNU General Public License v3 (GPL
3+
// Version 3), copied verbatim in the file "COPYING".
4+
//
5+
// See http://alice-o2.web.cern.ch/license for full licensing information.
6+
//
7+
// In applying this license CERN does not waive the privileges and immunities
8+
// granted to it by virtue of its status as an Intergovernmental Organization
9+
// or submit itself to any jurisdiction.
10+
#include "Framework/ConfigParamSpec.h"
11+
#include "Framework/DeviceSpec.h"
12+
#include <InfoLogger/InfoLogger.hxx>
13+
#include <vector>
14+
#include "Framework/runDataProcessing.h"
15+
#include "Framework/ControlService.h"
16+
17+
using namespace o2::framework;
18+
19+
AlgorithmSpec simplePipe(std::string const& what, int minDelay)
20+
{
21+
return AlgorithmSpec{ [what, minDelay](InitContext& ic) {
22+
srand(getpid());
23+
return [what, minDelay](ProcessingContext& ctx) {
24+
auto bData = ctx.outputs().make<int>(OutputRef{ what }, 1);
25+
};
26+
} };
27+
}
28+
29+
// a1 is not actually used by anything, however it might.
30+
WorkflowSpec defineDataProcessing(ConfigContext const& specs)
31+
{
32+
return WorkflowSpec{
33+
{ "A",
34+
Inputs{},
35+
{ OutputSpec{ { "a1" }, "TST", "A1" },
36+
OutputSpec{ { "a2" }, "TST", "A2" } },
37+
AlgorithmSpec{
38+
[](ProcessingContext& ctx) {
39+
sleep(rand() % 2 + 1);
40+
auto aData1 = ctx.outputs().make<int>(OutputRef{ "a1" }, 1);
41+
auto aData2 = ctx.outputs().make<int>(Output{ "TST", "A2" }, 1);
42+
} } },
43+
{ "B",
44+
{ InputSpec{ { "a1" }, "TST", "A1" } },
45+
{},
46+
AlgorithmSpec{
47+
[](ProcessingContext& ctx) {
48+
ctx.services().get<ControlService>().readyToQuit(true);
49+
} } }
50+
};
51+
}

Framework/Core/test/test_WorkflowHelpers.cxx

Lines changed: 53 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#include "../src/WorkflowHelpers.h"
1717
#include <boost/test/unit_test.hpp>
1818
#include <boost/test/tools/detail/per_element_manip.hpp>
19+
#include <algorithm>
1920

2021
using namespace o2::framework;
2122

@@ -418,10 +419,59 @@ BOOST_AUTO_TEST_CASE(TestExternalInput)
418419

419420
BOOST_CHECK_EQUAL(workflow.size(), 1);
420421
WorkflowHelpers::injectServiceDevices(workflow);
421-
// The added device is the one which should connect to
422-
// the condition DB.
423-
BOOST_CHECK_EQUAL(workflow.size(), 2);
422+
// The added devices are the one which should connect to
423+
// the condition DB and the sink for the dangling outputs.
424+
BOOST_CHECK_EQUAL(workflow.size(), 3);
424425
WorkflowHelpers::constructGraph(workflow, logicalEdges,
425426
outputs,
426427
availableForwardsInfo);
427428
}
429+
430+
BOOST_AUTO_TEST_CASE(DetermineDanglingOutputs)
431+
{
432+
WorkflowSpec workflow0{
433+
{ "A", Inputs{}, { OutputSpec{ "TST", "A" } } },
434+
{ "B", { InputSpec{ "a", "TST", "A" } }, Outputs{} }
435+
};
436+
437+
WorkflowSpec workflow1{
438+
{ "A",
439+
Inputs{},
440+
Outputs{ OutputSpec{ "TST", "A" } } }
441+
};
442+
443+
WorkflowSpec workflow2{
444+
{ "A", Inputs{}, { OutputSpec{ "TST", "A" } } },
445+
{ "B", { InputSpec{ "a", "TST", "B" } }, Outputs{} }
446+
};
447+
448+
WorkflowSpec workflow3{
449+
{ "A", Inputs{}, { OutputSpec{ "TST", "A" }, OutputSpec{ "TST", "B" } } },
450+
{ "B", { InputSpec{ "a", "TST", "A" } }, Outputs{} }
451+
};
452+
453+
WorkflowSpec workflow4{
454+
{ "A", Inputs{}, { OutputSpec{ "TST", "A" }, OutputSpec{ "TST", "B" }, OutputSpec{ "TST", "C" } } },
455+
{ "B", { InputSpec{ "a", "TST", "A" } }, Outputs{} }
456+
};
457+
458+
auto dangling0 = WorkflowHelpers::computeDanglingOutputs(workflow0);
459+
std::vector<InputSpec> expected0{};
460+
BOOST_TEST(dangling0 == expected0, boost::test_tools::per_element());
461+
462+
auto dangling1 = WorkflowHelpers::computeDanglingOutputs(workflow1);
463+
std::vector<InputSpec> expected1{ InputSpec{ "dangling0", "TST", "A" } };
464+
BOOST_TEST(dangling1 == expected1, boost::test_tools::per_element());
465+
466+
auto dangling2 = WorkflowHelpers::computeDanglingOutputs(workflow2);
467+
std::vector<InputSpec> expected2{ InputSpec{ "dangling0", "TST", "A" } };
468+
BOOST_TEST(dangling2 == expected2, boost::test_tools::per_element());
469+
470+
auto dangling3 = WorkflowHelpers::computeDanglingOutputs(workflow3);
471+
std::vector<InputSpec> expected3{ InputSpec{ "dangling0", "TST", "B" } };
472+
BOOST_TEST(dangling3 == expected3, boost::test_tools::per_element());
473+
474+
auto dangling4 = WorkflowHelpers::computeDanglingOutputs(workflow4);
475+
std::vector<InputSpec> expected4{ InputSpec{ "dangling0", "TST", "B" }, InputSpec{ "dangling1", "TST", "C" } };
476+
BOOST_TEST(dangling4 == expected4, boost::test_tools::per_element());
477+
}

0 commit comments

Comments
 (0)