Skip to content

Commit 5b1f9fa

Browse files
authored
DPL: introduce dangling-outputs-policy (#4381)
Default / "file": old behaviour. Messages matching --keep rule will be dumped to a file. "fairmq": a new "output proxy" device is created and all the dangling outputs are bound to, which will then push them to a fairmq channel.
1 parent 6826ec1 commit 5b1f9fa

File tree

5 files changed

+41
-6
lines changed

5 files changed

+41
-6
lines changed

Framework/Core/include/Framework/CommonDataProcessors.h

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,7 @@
1717

1818
#include <vector>
1919

20-
namespace o2
21-
{
22-
namespace framework
20+
namespace o2::framework
2321
{
2422
using outputTasks = std::vector<std::pair<uint32_t, std::string>>;
2523
using outputObjects = std::vector<std::pair<uint32_t, std::vector<std::string>>>;
@@ -38,6 +36,12 @@ struct CommonDataProcessors {
3836
/// not going to be used by the returned DataProcessorSpec.
3937
static DataProcessorSpec getGlobalFileSink(std::vector<InputSpec> const& danglingInputs,
4038
std::vector<InputSpec>& unmatched);
39+
/// Given the list of @a danglingInputs @return a DataProcessor which
40+
/// exposes them through a FairMQ channel.
41+
/// @fixme: for now we only support shmem and ipc
42+
/// @fixme: for now only the dangling inputs are forwarded.
43+
static DataProcessorSpec getGlobalFairMQSink(std::vector<InputSpec> const& danglingInputs);
44+
4145
/// writes inputs of kind AOD to file
4246
static DataProcessorSpec getGlobalAODSink(std::vector<InputSpec> const& OutputInputs,
4347
std::vector<bool> const& isdangling);
@@ -46,7 +50,6 @@ struct CommonDataProcessors {
4650
static DataProcessorSpec getDummySink(std::vector<InputSpec> const& danglingInputs);
4751
};
4852

49-
} // namespace framework
50-
} // namespace o2
53+
} // namespace o2::framework
5154

5255
#endif // o2_framework_CommonDataProcessors_H_INCLUDED

Framework/Core/include/Framework/runDataProcessing.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,8 @@ int main(int argc, char** argv)
126126
UserCustomizationsHelper::userDefinedCustomization(workflowOptions, 0);
127127
workflowOptions.push_back(ConfigParamSpec{"readers", VariantType::Int64, 1ll, {"number of parallel readers to use"}});
128128
workflowOptions.push_back(ConfigParamSpec{"pipeline", VariantType::String, "", {"override default pipeline size"}});
129+
workflowOptions.push_back(ConfigParamSpec{"dangling-outputs-policy", VariantType::String, "file", {"what to do with dangling outputs. file: write to file, fairmq: send to output proxy"}});
130+
129131
std::vector<ChannelConfigurationPolicy> channelPolicies;
130132
UserCustomizationsHelper::userDefinedCustomization(channelPolicies, 0);
131133
auto defaultChannelPolicies = ChannelConfigurationPolicy::createDefaultPolicies();

Framework/Core/src/CommonDataProcessors.cxx

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
#include "Framework/OutputObjHeader.h"
3030
#include "Framework/TableTreeHelpers.h"
3131
#include "Framework/StringHelpers.h"
32+
#include "Framework/ChannelSpec.h"
33+
#include "Framework/ExternalFairMQDeviceProxy.h"
3234

3335
#include "TFile.h"
3436
#include "TTree.h"
@@ -574,6 +576,30 @@ DataProcessorSpec
574576
return spec;
575577
}
576578

579+
DataProcessorSpec CommonDataProcessors::getGlobalFairMQSink(std::vector<InputSpec> const& danglingOutputInputs)
580+
{
581+
582+
// we build the default channel configuration from the binding of the first input
583+
// in order to have more than one we would need to possibility to have support for
584+
// vectored options
585+
// use the OutputChannelSpec as a tool to create the default configuration for the out-of-band channel
586+
OutputChannelSpec externalChannelSpec;
587+
externalChannelSpec.name = "output_0_0";
588+
externalChannelSpec.type = ChannelType::Push;
589+
externalChannelSpec.method = ChannelMethod::Bind;
590+
externalChannelSpec.hostname = "localhost";
591+
externalChannelSpec.port = 0;
592+
externalChannelSpec.listeners = 0;
593+
// in principle, protocol and transport are two different things but fur simplicity
594+
// we use ipc when shared memory is selected and the normal tcp url whith zeromq,
595+
// this is for building the default configuration which can be simply changed from the
596+
// command line
597+
externalChannelSpec.protocol = ChannelProtocol::IPC;
598+
std::string defaultChannelConfig = formatExternalChannelConfiguration(externalChannelSpec);
599+
// at some point the formatting tool might add the transport as well so we have to check
600+
return specifyFairMQDeviceOutputProxy("internal-dpl-output-proxy", danglingOutputInputs, defaultChannelConfig.c_str());
601+
}
602+
577603
DataProcessorSpec CommonDataProcessors::getDummySink(std::vector<InputSpec> const& danglingOutputInputs)
578604
{
579605
return DataProcessorSpec{

Framework/Core/src/WorkflowHelpers.cxx

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -400,11 +400,14 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext
400400
}
401401

402402
std::vector<InputSpec> unmatched;
403-
if (outputsInputsDangling.size() > 0) {
403+
if (outputsInputsDangling.size() > 0 && ctx.options().get<std::string>("dangling-outputs-policy") == "file") {
404404
auto fileSink = CommonDataProcessors::getGlobalFileSink(outputsInputsDangling, unmatched);
405405
if (unmatched.size() != outputsInputsDangling.size()) {
406406
extraSpecs.push_back(fileSink);
407407
}
408+
} else if (outputsInputsDangling.size() > 0 && ctx.options().get<std::string>("dangling-outputs-policy") == "fairmq") {
409+
auto fairMQSink = CommonDataProcessors::getGlobalFairMQSink(outputsInputsDangling);
410+
extraSpecs.push_back(fairMQSink);
408411
}
409412
if (unmatched.size() > 0) {
410413
extraSpecs.push_back(CommonDataProcessors::getDummySink(unmatched));

Framework/Core/test/test_WorkflowHelpers.cxx

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ std::unique_ptr<ConfigContext> makeEmptyConfigContext()
3030
// either owns or shares ownership of the registry.
3131
std::vector<std::unique_ptr<ParamRetriever>> retrievers;
3232
static std::vector<ConfigParamSpec> specs;
33+
specs.push_back(ConfigParamSpec{"dangling-outputs-policy", VariantType::String, "file", {"what to do with dangling outputs. file: write to file, fairmq: send to output proxy"}});
3334
auto store = std::make_unique<ConfigParamStore>(specs, std::move(retrievers));
3435
store->preload();
3536
store->activate();

0 commit comments

Comments
 (0)