Skip to content

Commit cc593eb

Browse files
knopers8ktf
authored andcommitted
Add a class representing certain policy of sampling data.
1 parent 18de589 commit cc593eb

File tree

4 files changed

+380
-0
lines changed

4 files changed

+380
-0
lines changed

Framework/Core/CMakeLists.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ set(SRCS
3838
src/DataSamplingConditionNConsecutive.cxx
3939
src/DataSamplingConditionFactory.cxx
4040
src/DataSamplingReadoutAdapter.cxx
41+
src/DataSamplingPolicy.cxx
4142
src/DataSourceDevice.cxx
4243
src/DeviceMetricsInfo.cxx
4344
src/DeviceSpec.cxx
@@ -145,6 +146,7 @@ set(HEADERS
145146
include/Framework/DataSamplingCondition.h
146147
include/Framework/DataSamplingConditionFactory.h
147148
include/Framework/DataSamplingReadoutAdapter.h
149+
include/Framework/DataSamplingPolicy.h
148150
include/Framework/DPLBoostSerializer.h
149151
include/Framework/TableBuilder.h
150152
include/Framework/FairMQResizableBuffer.h
@@ -218,6 +220,7 @@ set(TEST_SRCS
218220
test/test_DataRelayer.cxx
219221
test/test_DataSampling.cxx
220222
test/test_DataSamplingCondition.cxx
223+
test/test_DataSamplingPolicy.cxx
221224
test/test_DeviceMetricsInfo.cxx
222225
test/test_DeviceSpec.cxx
223226
test/test_ExternalFairMQDeviceProxy.cxx
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
// Copyright CERN and copyright holders of ALICE O2. This software is
2+
// distributed under the terms of the GNU General Public License v3 (GPL
3+
// Version 3), copied verbatim in the file "COPYING".
4+
//
5+
// See http://alice-o2.web.cern.ch/license for full licensing information.
6+
//
7+
// In applying this license CERN does not waive the privileges and immunities
8+
// granted to it by virtue of its status as an Intergovernmental Organization
9+
// or submit itself to any jurisdiction.
10+
11+
#ifndef ALICEO2_DATASAMPLINGPOLICY_H
12+
#define ALICEO2_DATASAMPLINGPOLICY_H
13+
14+
/// \file DataSamplingPolicy.h
15+
/// \brief A declaration of O2 Data Sampling Policy
16+
///
17+
/// \author Piotr Konopka, piotr.jan.konopka@cern.ch
18+
19+
#include <mutex>
20+
#include <boost/property_tree/ptree.hpp>
21+
22+
#include "Headers/DataHeader.h"
23+
#include "Framework/InputSpec.h"
24+
#include "Framework/Output.h"
25+
#include "Framework/OutputSpec.h"
26+
#include "Framework/DataSamplingCondition.h"
27+
28+
namespace o2
29+
{
30+
namespace framework
31+
{
32+
33+
/// A class representing certain policy of sampling data.
34+
///
35+
/// This class stores information about specified sampling policy - data headers and conditions of sampling.
36+
/// For given InputSpec, it can provide corresponding Output to pass data further.
37+
class DataSamplingPolicy
38+
{
39+
private:
40+
struct inputSpecHasher {
41+
size_t operator()(const InputSpec& i) const
42+
{
43+
return (static_cast<size_t>(i.description.itg[0]) << 32 |
44+
static_cast<size_t>(i.description.itg[1])) ^
45+
static_cast<size_t>(i.origin.itg[0]);
46+
}
47+
};
48+
struct inputSpecEqual {
49+
bool operator()(const InputSpec& a, const InputSpec& b) const
50+
{
51+
// -1 means match all subSpec
52+
if (a.subSpec == -1 || b.subSpec == -1) {
53+
return a.description == b.description && a.origin == b.origin;
54+
} else {
55+
return a.description == b.description && a.origin == b.origin && a.subSpec == b.subSpec;
56+
}
57+
}
58+
};
59+
using PathMap = std::unordered_map<InputSpec, OutputSpec, inputSpecHasher, inputSpecEqual>;
60+
61+
public:
62+
/// \brief Constructor.
63+
DataSamplingPolicy();
64+
/// \brief Constructor.
65+
DataSamplingPolicy(const boost::property_tree::ptree&);
66+
/// \brief Destructor
67+
~DataSamplingPolicy();
68+
69+
/// \brief Configures a policy using structured configuration entry.
70+
void configure(const boost::property_tree::ptree&);
71+
/// \brief Returns true if this policy requires data with given InputSpec.
72+
bool match(const InputSpec&) const;
73+
/// \brief Returns true if user-defined conditions of sampling are fulfilled.
74+
bool decide(const o2::framework::DataRef&);
75+
/// \brief Returns Output for given InputSpec to pass data forward.
76+
const Output prepareOutput(const InputSpec&) const;
77+
78+
const std::string& getName() const;
79+
// const std::vector<InputSpec>& getInputs() const;
80+
const PathMap& getPathMap() const;
81+
const header::DataHeader::SubSpecificationType getSubSpec() const;
82+
// optional fairmq channel to send stuff outside of DPL
83+
const std::string& getFairMQOutputChannel() const;
84+
std::string getFairMQOutputChannelName() const;
85+
86+
static header::DataOrigin policyDataOrigin();
87+
static header::DataDescription policyDataDescription(std::string policyName, size_t id);
88+
89+
private:
90+
std::string mName;
91+
PathMap mPaths;
92+
header::DataHeader::SubSpecificationType mSubSpec;
93+
std::vector<std::unique_ptr<DataSamplingCondition>> mConditions;
94+
std::string mFairMQOutputChannel;
95+
96+
std::mutex mDecisionMutex;
97+
};
98+
99+
} // namespace framework
100+
} // namespace o2
101+
102+
#endif // ALICEO2_DATASAMPLINGPOLICY_H
Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
// Copyright CERN and copyright holders of ALICE O2. This software is
2+
// distributed under the terms of the GNU General Public License v3 (GPL
3+
// Version 3), copied verbatim in the file "COPYING".
4+
//
5+
// See http://alice-o2.web.cern.ch/license for full licensing information.
6+
//
7+
// In applying this license CERN does not waive the privileges and immunities
8+
// granted to it by virtue of its status as an Intergovernmental Organization
9+
// or submit itself to any jurisdiction.
10+
11+
/// \file DataSamplingPolicy.cxx
12+
/// \brief Implementation of O2 Data Sampling Policy
13+
///
14+
/// \author Piotr Konopka, piotr.jan.konopka@cern.ch
15+
16+
#include "Framework/DataSamplingPolicy.h"
17+
#include "Framework/DataSamplingConditionFactory.h"
18+
19+
namespace o2
20+
{
21+
namespace framework
22+
{
23+
24+
using boost::property_tree::ptree;
25+
26+
DataSamplingPolicy::DataSamplingPolicy()
27+
{
28+
}
29+
30+
DataSamplingPolicy::DataSamplingPolicy(const ptree& config)
31+
{
32+
configure(config);
33+
}
34+
35+
DataSamplingPolicy::~DataSamplingPolicy()
36+
{
37+
}
38+
39+
void DataSamplingPolicy::configure(const ptree& config)
40+
{
41+
mName = config.get<std::string>("id");
42+
if (mName.size() > 14) {
43+
LOG(WARNING) << "DataSamplingPolicy name '" << mName << "' is longer than 14 characters, trimming.";
44+
mName.resize(14);
45+
}
46+
47+
// todo: get the sub spec range if there is a requirement and when dpl supports it
48+
auto subSpecString = config.get<std::string>("subSpec");
49+
mSubSpec = subSpecString.find_first_of("-*") != std::string::npos ? -1 : std::strtoull(subSpecString.c_str(), nullptr, 10);
50+
mPaths.clear();
51+
size_t outputId = 0;
52+
53+
for (const auto& dataHeaderConfig : config.get_child("dataHeaders")) {
54+
55+
header::DataOrigin origin;
56+
header::DataDescription description;
57+
origin.runtimeInit(dataHeaderConfig.second.get<std::string>("dataOrigin").c_str());
58+
description.runtimeInit(dataHeaderConfig.second.get<std::string>("dataDescription").c_str());
59+
60+
InputSpec inputSpec{
61+
dataHeaderConfig.second.get<std::string>("binding"),
62+
origin,
63+
description,
64+
mSubSpec
65+
};
66+
67+
OutputSpec outputSpec{
68+
{ dataHeaderConfig.second.get<std::string>("binding") },
69+
policyDataOrigin(),
70+
policyDataDescription(mName, outputId++),
71+
mSubSpec
72+
};
73+
74+
mPaths.emplace(inputSpec, outputSpec);
75+
if (outputId > 9) {
76+
LOG(ERROR) << "Maximum 10 inputs in DataSamplingPolicy are supported";
77+
break;
78+
}
79+
}
80+
81+
mConditions.clear();
82+
for (const auto& conditionConfig : config.get_child("samplingConditions")) {
83+
mConditions.push_back(DataSamplingConditionFactory::create(conditionConfig.second.get<std::string>("condition")));
84+
mConditions.back()->configure(conditionConfig.second);
85+
}
86+
87+
mFairMQOutputChannel = config.get_optional<std::string>("fairMQOutput").value_or("");
88+
}
89+
90+
bool DataSamplingPolicy::match(const InputSpec& input) const
91+
{
92+
return mPaths.find(input) != mPaths.end();
93+
}
94+
95+
bool DataSamplingPolicy::decide(const o2::framework::DataRef& dataRef)
96+
{
97+
// protect from accessing conditions from different time-pipeline threads
98+
std::lock_guard<std::mutex> guard(mDecisionMutex);
99+
100+
return std::all_of(mConditions.begin(), mConditions.end(),
101+
[dataRef](std::unique_ptr<DataSamplingCondition>& condition) {
102+
return condition->decide(dataRef);
103+
});
104+
}
105+
106+
const Output DataSamplingPolicy::prepareOutput(const InputSpec& input) const
107+
{
108+
auto result = mPaths.find(input);
109+
return result != mPaths.end() ?
110+
Output{ result->second.origin, result->second.description, input.subSpec, result->second.lifetime } :
111+
Output{ header::gDataOriginInvalid, header::gDataDescriptionInvalid };
112+
}
113+
114+
const std::string& DataSamplingPolicy::getName() const
115+
{
116+
return mName;
117+
}
118+
119+
//const std::vector<InputSpec>& DataSamplingPolicy::getInputs() const
120+
//{
121+
// return mInputs;
122+
//}
123+
124+
const DataSamplingPolicy::PathMap& DataSamplingPolicy::getPathMap() const
125+
{
126+
return mPaths;
127+
}
128+
129+
const std::string& DataSamplingPolicy::getFairMQOutputChannel() const
130+
{
131+
return mFairMQOutputChannel;
132+
}
133+
134+
std::string DataSamplingPolicy::getFairMQOutputChannelName() const
135+
{
136+
size_t nameBegin = mFairMQOutputChannel.find("name=") + sizeof("name=") - 1;
137+
size_t nameEnd = mFairMQOutputChannel.find_first_of(',', nameBegin);
138+
std::string name = mFairMQOutputChannel.substr(nameBegin, nameEnd - nameBegin);
139+
return name;
140+
}
141+
142+
143+
const header::DataHeader::SubSpecificationType DataSamplingPolicy::getSubSpec() const
144+
{
145+
return mSubSpec;
146+
}
147+
148+
header::DataOrigin DataSamplingPolicy::policyDataOrigin()
149+
{
150+
return header::DataOrigin("DS");
151+
}
152+
153+
header::DataDescription DataSamplingPolicy::policyDataDescription(std::string policyName, size_t id)
154+
{
155+
if (policyName.size() > 14) {
156+
LOG(WARNING) << "DataSamplingPolicy name '" << policyName << "' is longer than 14 characters, trimming in dataDescription.";
157+
policyName.resize(14);
158+
}
159+
160+
header::DataDescription outputDescription;
161+
outputDescription.runtimeInit(std::string(policyName + "-" + std::to_string(id)).c_str());
162+
return outputDescription;
163+
}
164+
165+
} // namespace framework
166+
} // namespace o2

0 commit comments

Comments
 (0)