Skip to content

Commit 548ed10

Browse files
knopers8ktf
authored andcommitted
Reconfigurable dispatcher
1 parent cc593eb commit 548ed10

File tree

3 files changed

+225
-0
lines changed

3 files changed

+225
-0
lines changed

Framework/Core/CMakeLists.txt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ set(SRCS
6363
src/TextControlService.cxx
6464
src/TableBuilder.cxx
6565
src/TableConsumer.cxx
66+
src/WorkflowDispatcher.cxx
6667
src/WorkflowHelpers.cxx
6768
src/WorkflowSpec.cxx
6869
src/runDataProcessing.cxx
@@ -150,6 +151,8 @@ set(HEADERS
150151
include/Framework/DPLBoostSerializer.h
151152
include/Framework/TableBuilder.h
152153
include/Framework/FairMQResizableBuffer.h
154+
include/Framework/Dispatcher.h
155+
include/Framework/WorkflowDispatcher.h
153156
src/ComputingResource.h
154157
src/DDSConfigHelpers.h
155158
src/DeviceSpecHelpers.h
@@ -248,6 +251,7 @@ set(TEST_SRCS
248251
test/test_TimeParallelPipelining.cxx
249252
test/test_TypeTraits.cxx
250253
test/test_Variants.cxx
254+
# test/test_WorkflowDispatcher.cxx
251255
test/test_WorkflowHelpers.cxx
252256
)
253257

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
// Copyright CERN and copyright holders of ALICE O2. This software is
2+
// distributed under the terms of the GNU General Public License v3 (GPL
3+
// Version 3), copied verbatim in the file "COPYING".
4+
//
5+
// See http://alice-o2.web.cern.ch/license for full licensing information.
6+
//
7+
// In applying this license CERN does not waive the privileges and immunities
8+
// granted to it by virtue of its status as an Intergovernmental Organization
9+
// or submit itself to any jurisdiction.
10+
11+
/// \file WorkflowDispatcher.h
12+
/// \brief Declaration of WorkflowDispatcher for O2 Data Sampling
13+
///
14+
/// \author Piotr Konopka, piotr.jan.konopka@cern.ch
15+
16+
#ifndef ALICEO2_WORKFLOWDISPATCHER_H
17+
#define ALICEO2_WORKFLOWDISPATCHER_H
18+
19+
#include "Framework/DataProcessorSpec.h"
20+
#include "Framework/DataSamplingPolicy.h"
21+
#include "Framework/Task.h"
22+
23+
namespace o2
24+
{
25+
namespace framework
26+
{
27+
28+
class WorkflowDispatcher : public Task
29+
{
30+
public:
31+
/// \brief Constructor
32+
WorkflowDispatcher(const std::string name = std::string(), const std::string reconfSource = std::string());
33+
/// \brief Destructor
34+
~WorkflowDispatcher();
35+
36+
/// \brief Dispatcher init callback
37+
void init(InitContext& ctx) override;
38+
/// \brief Dispatcher process callback
39+
void run(ProcessingContext& ctx) override;
40+
41+
/// \brief Create appropriate inputSpecs and outputSpecs for sampled data during the workflow declaration phase.
42+
void registerPath(const std::pair<InputSpec, OutputSpec>&);
43+
44+
const std::string& getName();
45+
Inputs getInputSpecs();
46+
Outputs getOutputSpecs();
47+
48+
private:
49+
void send(DataAllocator& dataAllocator, const DataRef& inputData, const Output& output) const;
50+
void sendFairMQ(const FairMQDevice* device, const DataRef& inputData, const std::string& fairMQChannel) const;
51+
52+
std::string mName;
53+
std::string mReconfigurationSource;
54+
Inputs inputs;
55+
Outputs outputs;
56+
// policies should be shared between all pipeline threads
57+
std::vector<std::shared_ptr<DataSamplingPolicy>> mPolicies;
58+
};
59+
60+
} // namespace framework
61+
} // namespace o2
62+
63+
#endif //ALICEO2_WORKFLOWDISPATCHER_H
Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
// Copyright CERN and copyright holders of ALICE O2. This software is
2+
// distributed under the terms of the GNU General Public License v3 (GPL
3+
// Version 3), copied verbatim in the file "COPYING".
4+
//
5+
// See http://alice-o2.web.cern.ch/license for full licensing information.
6+
//
7+
// In applying this license CERN does not waive the privileges and immunities
8+
// granted to it by virtue of its status as an Intergovernmental Organization
9+
// or submit itself to any jurisdiction.
10+
11+
/// \file WorkflowDispatcher.cxx
12+
/// \brief Implementation of WorkflowDispatcher for O2 Data Sampling
13+
///
14+
/// \author Piotr Konopka, piotr.jan.konopka@cern.ch
15+
16+
#include <Configuration/ConfigurationInterface.h>
17+
#include <Configuration/ConfigurationFactory.h>
18+
#include "Framework/WorkflowDispatcher.h"
19+
#include "Framework/RawDeviceService.h"
20+
#include "Framework/DataSamplingPolicy.h"
21+
#include "Framework/DataProcessingHeader.h"
22+
#include <FairMQDevice.h>
23+
#include <FairLogger.h>
24+
25+
using namespace o2::configuration;
26+
27+
namespace o2
28+
{
29+
namespace framework
30+
{
31+
32+
WorkflowDispatcher::WorkflowDispatcher(std::string name, const std::string reconfigurationSource)
33+
: mReconfigurationSource(reconfigurationSource)
34+
{
35+
mName = name.empty() ? std::string("Dispatcher_") + getenv("HOSTNAME") : name;
36+
}
37+
38+
WorkflowDispatcher::~WorkflowDispatcher()
39+
{
40+
}
41+
42+
void WorkflowDispatcher::init(InitContext& ctx)
43+
{
44+
LOG(DEBUG) << "Reading Data Sampling Policies...";
45+
46+
std::unique_ptr<ConfigurationInterface> cfg = ConfigurationFactory::getConfiguration(mReconfigurationSource);
47+
auto policiesTree = cfg->getRecursive("dataSamplingPolicies");
48+
mPolicies.clear();
49+
50+
for (auto&& policyConfig : policiesTree) {
51+
mPolicies.emplace_back(std::make_shared<DataSamplingPolicy>(policyConfig.second));
52+
}
53+
}
54+
55+
void WorkflowDispatcher::run(ProcessingContext& ctx)
56+
{
57+
for (const auto& input : ctx.inputs()) {
58+
if (input.header != nullptr && input.spec != nullptr) {
59+
60+
for (auto& policy : mPolicies) {
61+
// todo: evaluate order's impact on performance
62+
// todo: consider getting the outputSpec in match to improve performance
63+
// todo: consider matching (and deciding) in completion policy to save some time
64+
if (policy->match(*input.spec) && policy->decide(input)) {
65+
66+
67+
if (!policy->getFairMQOutputChannel().empty()) {
68+
sendFairMQ(ctx.services().get<RawDeviceService>().device(), input, policy->getFairMQOutputChannelName());
69+
} else {
70+
send(ctx.outputs(), input, policy->prepareOutput(*input.spec));
71+
}
72+
}
73+
}
74+
}
75+
}
76+
}
77+
78+
void WorkflowDispatcher::send(DataAllocator& dataAllocator, const DataRef& inputData, const Output& output) const
79+
{
80+
const auto* inputHeader = header::get<header::DataHeader*>(inputData.header);
81+
if (inputHeader->payloadSerializationMethod == header::gSerializationMethodInvalid) {
82+
LOG(WARNING) << "DataSampling::dispatcherCallback: input of origin'" << inputHeader->dataOrigin.str
83+
<< "', description '" << inputHeader->dataDescription.str
84+
<< "' has gSerializationMethodInvalid.";
85+
} else if (inputHeader->payloadSerializationMethod == header::gSerializationMethodROOT) {
86+
dataAllocator.adopt(output, DataRefUtils::as<TObject>(inputData).release());
87+
} else { // POD
88+
// todo: do it non-copy, when API is available
89+
auto outputMessage = dataAllocator.newChunk(output, inputHeader->payloadSize);
90+
memcpy(outputMessage.data, inputData.payload, inputHeader->payloadSize);
91+
}
92+
}
93+
94+
// ideally this should be in a separate proxy device or use Lifetime::External
95+
void WorkflowDispatcher::sendFairMQ(const FairMQDevice* device, const DataRef& inputData, const std::string& fairMQChannel) const
96+
{
97+
const auto* dh = header::get<header::DataHeader*>(inputData.header);
98+
assert(dh);
99+
const auto* dph = header::get<DataProcessingHeader*>(inputData.header);
100+
assert(dph);
101+
102+
header::DataHeader dhout{dh->dataDescription, dh->dataOrigin, dh->subSpecification, dh->payloadSize};
103+
dhout.payloadSerializationMethod = dh->payloadSerializationMethod;
104+
DataProcessingHeader dphout{ dph->startTime, dph->duration};
105+
o2::header::Stack headerStack{ dhout, dphout };
106+
107+
auto channelAlloc = o2::memory_resource::getTransportAllocator(device->Transport());
108+
FairMQMessagePtr msgHeaderStack = o2::memory_resource::getMessage(std::move(headerStack), channelAlloc);
109+
110+
char* payloadCopy = new char[dh->payloadSize];
111+
memcpy(payloadCopy, inputData.payload, dh->payloadSize);
112+
auto cleanupFcn = [](void* data, void*) { delete[] reinterpret_cast<char*>(data); };
113+
FairMQMessagePtr msgPayload(device->NewMessage(payloadCopy, dh->payloadSize, cleanupFcn, payloadCopy));
114+
115+
FairMQParts message;
116+
message.AddPart(move(msgHeaderStack));
117+
message.AddPart(move(msgPayload));
118+
119+
int64_t bytesSent = device->Send(message, fairMQChannel);
120+
}
121+
122+
void WorkflowDispatcher::registerPath(const std::pair<InputSpec, OutputSpec>& path)
123+
{
124+
//todo: take care of inputs inclusive in others
125+
auto cmp = [a = path.first](const InputSpec b)
126+
{
127+
return a.origin == b.origin && a.description == b.description && a.subSpec == b.subSpec && a.lifetime == b.lifetime;
128+
};
129+
130+
if (std::find_if(inputs.begin(), inputs.end(), cmp) == inputs.end()) {
131+
inputs.push_back(path.first);
132+
LOG(DEBUG) << "Registering input " << path.first.origin.str << " " << path.first.description.str << " "
133+
<< path.first.subSpec;
134+
} else {
135+
LOG(DEBUG) << "Input " << path.first.origin.str << " " << path.first.description.str << " " << path.first.subSpec
136+
<< " already registered";
137+
}
138+
139+
outputs.push_back(path.second);
140+
}
141+
142+
const std::string& WorkflowDispatcher::getName()
143+
{
144+
return mName;
145+
}
146+
147+
Inputs WorkflowDispatcher::getInputSpecs()
148+
{
149+
return inputs;
150+
}
151+
152+
Outputs WorkflowDispatcher::getOutputSpecs()
153+
{
154+
return outputs;
155+
}
156+
157+
} // namespace framework
158+
} // namespace o2

0 commit comments

Comments
 (0)