Skip to content

Commit 83dcad1

Browse files
committed
DPL: allow saving all dangling outputs to a file
Ability to actually select what to save (and drop everything by default) being worked on.
1 parent 1b3dd05 commit 83dcad1

File tree

5 files changed

+179
-17
lines changed

5 files changed

+179
-17
lines changed

Framework/Core/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ set(SRCS
7272
src/FrameworkGUIDeviceInspector.cxx
7373
src/FrameworkGUIDataRelayerUsage.cxx
7474
src/PaletteHelpers.cxx
75+
src/CommonDataProcessors.cxx
7576
${GUI_SOURCES}
7677
test/TestClasses.cxx
7778
src/Variant.cxx
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
// Copyright CERN and copyright holders of ALICE O2. This software is
2+
// distributed under the terms of the GNU General Public License v3 (GPL
3+
// Version 3), copied verbatim in the file "COPYING".
4+
//
5+
// See http://alice-o2.web.cern.ch/license for full licensing information.
6+
//
7+
// In applying this license CERN does not waive the privileges and immunities
8+
// granted to it by virtue of its status as an Intergovernmental Organization
9+
// or submit itself to any jurisdiction.
10+
#ifndef o2_framework_CommonDataProcessors_H_INCLUDED
11+
#define o2_framework_CommonDataProcessors_H_INCLUDED
12+
13+
#include "Framework/DataProcessorSpec.h"
14+
#include "Framework/InputSpec.h"
15+
16+
#include <vector>
17+
18+
namespace o2
19+
{
20+
namespace framework
21+
{
22+
23+
/// Helpers to create a few general data processors
24+
struct CommonDataProcessors {
25+
/// Given the list of @a danglingInputs @return a DataProcessor which does
26+
/// a binary dump for all the dangling inputs matching the Timeframe
27+
/// lifetime. @a unmatched will be filled with all the InputSpecs which are
28+
/// not going to be used by the returned DataProcessorSpec.
29+
static DataProcessorSpec getGlobalFileSink(std::vector<InputSpec> const& danglingInputs,
30+
std::vector<InputSpec>& unmatched);
31+
/// @return a dummy DataProcessorSpec which requires all the passed @a InputSpec
32+
/// and simply discards them.
33+
static DataProcessorSpec getDummySink(std::vector<InputSpec> const& danglingInputs);
34+
};
35+
36+
} // namespace framework
37+
} // namespace o2
38+
39+
#endif // o2_framework_CommonDataProcessors_H_INCLUDED
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
// Copyright CERN and copyright holders of ALICE O2. This software is
2+
// distributed under the terms of the GNU General Public License v3 (GPL
3+
// Version 3), copied verbatim in the file "COPYING".
4+
//
5+
// See http://alice-o2.web.cern.ch/license for full licensing information.
6+
//
7+
// In applying this license CERN does not waive the privileges and immunities
8+
// granted to it by virtue of its status as an Intergovernmental Organization
9+
// or submit itself to any jurisdiction.
10+
#include "Framework/CommonDataProcessors.h"
11+
12+
#include "Framework/AlgorithmSpec.h"
13+
#include "Framework/DataProcessingHeader.h"
14+
#include "Framework/DataDescriptorQueryBuilder.h"
15+
#include "Framework/DataDescriptorMatcher.h"
16+
#include "Framework/DataProcessorSpec.h"
17+
#include "Framework/InitContext.h"
18+
#include "Framework/InitContext.h"
19+
#include "Framework/InputSpec.h"
20+
#include "Framework/OutputSpec.h"
21+
#include "Framework/Variant.h"
22+
23+
#include <exception>
24+
#include <fstream>
25+
#include <functional>
26+
#include <memory>
27+
#include <string>
28+
29+
namespace o2
30+
{
31+
namespace framework
32+
{
33+
34+
DataProcessorSpec CommonDataProcessors::getGlobalFileSink(std::vector<InputSpec> const& danglingOutputInputs,
35+
std::vector<InputSpec>& unmatched)
36+
{
37+
auto writerFunction = [danglingOutputInputs](InitContext& ic) -> std::function<void(ProcessingContext&)> {
38+
auto filename = ic.options().get<std::string>("outfile");
39+
auto keepString = ic.options().get<std::string>("keep");
40+
41+
if (filename.empty()) {
42+
throw std::runtime_error("output file missing");
43+
}
44+
45+
bool hasOutputsToWrite = false;
46+
auto outputMatcher = DataDescriptorQueryBuilder::buildFromKeepConfig(keepString);
47+
for (auto& spec : danglingOutputInputs) {
48+
if (outputMatcher->match(spec)) {
49+
hasOutputsToWrite = true;
50+
}
51+
}
52+
if (hasOutputsToWrite == false) {
53+
return std::move([](ProcessingContext& pc) mutable -> void {
54+
static bool once = false;
55+
/// We do it like this until we can use the interruptible sleep
56+
/// provided by recent FairMQ releases.
57+
if (!once) {
58+
LOG(INFO) << "No dangling output to be dumped.";
59+
once = true;
60+
}
61+
sleep(1);
62+
});
63+
}
64+
auto output = std::make_shared<std::ofstream>(filename.c_str(), std::ios_base::binary);
65+
return std::move([ output, matcher = outputMatcher ](ProcessingContext & pc) mutable->void {
66+
LOG(INFO) << "processing data set with " << pc.inputs().size() << " entries";
67+
for (const auto& entry : pc.inputs()) {
68+
LOG(INFO) << " " << *(entry.spec);
69+
auto header = DataRefUtils::getHeader<header::DataHeader*>(entry);
70+
auto dataProcessingHeader = DataRefUtils::getHeader<DataProcessingHeader*>(entry);
71+
if (matcher->match(*header) == false) {
72+
continue;
73+
}
74+
output->write(reinterpret_cast<char const*>(header), sizeof(header::DataHeader));
75+
output->write(reinterpret_cast<char const*>(dataProcessingHeader), sizeof(DataProcessingHeader));
76+
output->write(entry.payload, o2::framework::DataRefUtils::getPayloadSize(entry));
77+
LOG(INFO) << "wrote data, size " << o2::framework::DataRefUtils::getPayloadSize(entry);
78+
}
79+
});
80+
};
81+
82+
std::vector<InputSpec> validBinaryInputs;
83+
auto onlyTimeframe = [](InputSpec const& input) {
84+
return input.lifetime == Lifetime::Timeframe;
85+
};
86+
87+
auto noTimeframe = [](InputSpec const& input) {
88+
return input.lifetime != Lifetime::Timeframe;
89+
};
90+
91+
std::copy_if(danglingOutputInputs.begin(), danglingOutputInputs.end(),
92+
std::back_inserter(validBinaryInputs), onlyTimeframe);
93+
std::copy_if(danglingOutputInputs.begin(), danglingOutputInputs.end(),
94+
std::back_inserter(unmatched), noTimeframe);
95+
96+
DataProcessorSpec spec{
97+
"dpl-global-binary-file-sink",
98+
validBinaryInputs,
99+
Outputs{},
100+
AlgorithmSpec(writerFunction),
101+
{ { "outfile", VariantType::String, "dpl-out.bin", { "Name of the output file" } },
102+
{ "keep", VariantType::String, "", { "Comma separated list of ORIGIN/DESCRIPTION/SUBSPECIFICATION to save in outfile" } } }
103+
};
104+
105+
return spec;
106+
}
107+
108+
DataProcessorSpec CommonDataProcessors::getDummySink(std::vector<InputSpec> const& danglingOutputInputs)
109+
{
110+
return DataProcessorSpec{
111+
"dpl-dummy-sink",
112+
danglingOutputInputs,
113+
Outputs{},
114+
};
115+
}
116+
117+
} // namespace framework
118+
} // namespace o2

Framework/Core/src/WorkflowHelpers.cxx

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
// or submit itself to any jurisdiction.
1010
#include "WorkflowHelpers.h"
1111
#include "Framework/ChannelMatching.h"
12+
#include "Framework/CommonDataProcessors.h"
1213
#include "Framework/DeviceSpec.h"
1314
#include "Framework/AlgorithmSpec.h"
1415
#include <algorithm>
@@ -157,13 +158,6 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow)
157158
}
158159
}
159160

160-
auto danglingOutputsInputs = computeDanglingOutputs(workflow);
161-
DataProcessorSpec dplInternalSync{
162-
"internal-dpl-sink",
163-
danglingOutputsInputs,
164-
{},
165-
};
166-
167161
if (ccdbBackend.outputs.empty() == false) {
168162
workflow.push_back(ccdbBackend);
169163
}
@@ -176,10 +170,19 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow)
176170
if (timer.outputs.empty() == false) {
177171
workflow.push_back(timer);
178172
}
179-
/// This will inject a fake sink so that any dangling
180-
/// output is actually requested by it.
181-
if (danglingOutputsInputs.empty() == false) {
182-
workflow.push_back(dplInternalSync);
173+
/// This will inject a file sink so that any dangling
174+
/// output is actually written to it.
175+
auto danglingOutputsInputs = computeDanglingOutputs(workflow);
176+
177+
std::vector<InputSpec> unmatched;
178+
if (danglingOutputsInputs.size() > 0) {
179+
auto fileSink = CommonDataProcessors::getGlobalFileSink(danglingOutputsInputs, unmatched);
180+
if (unmatched.size() != danglingOutputsInputs.size()) {
181+
workflow.push_back(fileSink);
182+
}
183+
}
184+
if (unmatched.size() > 0) {
185+
workflow.push_back(CommonDataProcessors::getDummySink(unmatched));
183186
}
184187
}
185188

Framework/Core/src/runDataProcessing.cxx

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -843,9 +843,7 @@ int runStateMachine(DataProcessorSpecs const& workflow, DriverControl& driverCon
843843
case DriverState::MATERIALISE_WORKFLOW:
844844
try {
845845
std::vector<ComputingResource> resources = resourceManager->getAvailableResources();
846-
auto physicalWorkflow = workflow;
847-
WorkflowHelpers::injectServiceDevices(physicalWorkflow);
848-
DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(physicalWorkflow, driverInfo.channelPolicies, driverInfo.completionPolicies, deviceSpecs, resources);
846+
DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, driverInfo.channelPolicies, driverInfo.completionPolicies, deviceSpecs, resources);
849847
// This should expand nodes so that we can build a consistent DAG.
850848
} catch (std::runtime_error& e) {
851849
std::cerr << "Invalid workflow: " << e.what() << std::endl;
@@ -1070,10 +1068,13 @@ int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& workflow,
10701068

10711069
bpo::options_description visibleOptions;
10721070
visibleOptions.add(executorOptions);
1071+
1072+
auto physicalWorkflow = workflow;
1073+
WorkflowHelpers::injectServiceDevices(physicalWorkflow);
10731074
// Use the hidden options as veto, all config specs matching a definition
10741075
// in the hidden options are skipped in order to avoid duplicate definitions
10751076
// in the main parser. Note: all config specs are forwarded to devices
1076-
visibleOptions.add(ConfigParamsHelper::prepareOptionDescriptions(workflow, workflowOptions, gHiddenDeviceOptions));
1077+
visibleOptions.add(ConfigParamsHelper::prepareOptionDescriptions(physicalWorkflow, workflowOptions, gHiddenDeviceOptions));
10771078

10781079
bpo::options_description od;
10791080
od.add(visibleOptions);
@@ -1093,7 +1094,7 @@ int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& workflow,
10931094
bpo::options_description helpOptions;
10941095
helpOptions.add(executorOptions);
10951096
// this time no veto is applied, so all the options are added for printout
1096-
helpOptions.add(ConfigParamsHelper::prepareOptionDescriptions(workflow, workflowOptions));
1097+
helpOptions.add(ConfigParamsHelper::prepareOptionDescriptions(physicalWorkflow, workflowOptions));
10971098
std::cout << helpOptions << std::endl;
10981099
exit(0);
10991100
}
@@ -1132,5 +1133,5 @@ int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& workflow,
11321133
driverInfo.startPort = finder.port();
11331134
driverInfo.portRange = finder.range();
11341135
}
1135-
return runStateMachine(workflow, driverControl, driverInfo, gDeviceMetricsInfos, frameworkId);
1136+
return runStateMachine(physicalWorkflow, driverControl, driverInfo, gDeviceMetricsInfos, frameworkId);
11361137
}

0 commit comments

Comments
 (0)