Skip to content

Commit fd342d2

Browse files
knopers8ktf
authored andcommitted
Automatic InputSpecs generation for a policy
1 parent 8ab6f83 commit fd342d2

File tree

3 files changed

+53
-0
lines changed

3 files changed

+53
-0
lines changed

Framework/Core/include/Framework/DataSampling.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,10 @@ class DataSampling
7878
static void CustomizeInfrastructure(std::vector<CompletionPolicy>&);
7979
/// \brief Applies blocking/nonblocking data sampling configuration to the workflow.
8080
static void CustomizeInfrastructure(std::vector<ChannelConfigurationPolicy>&);
81+
/// \brief Provides InputSpecs to receive data for given DataSamplingPolicy
82+
static std::vector<InputSpec> InputSpecsForPolicy(const std::string& policiesSource, const std::string& policyName);
83+
/// \brief Provides InputSpecs to receive data for given DataSamplingPolicy
84+
static std::vector<InputSpec> InputSpecsForPolicy(configuration::ConfigurationInterface* const config, const std::string& policyName);
8185

8286
private:
8387
// Internal functions, used by GenerateInfrastructure()

Framework/Core/src/DataSampling.cxx

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,5 +112,38 @@ void DataSampling::CustomizeInfrastructure(std::vector<ChannelConfigurationPolic
112112
// now it cannot be done, since matching is possible only using data processors names
113113
}
114114

115+
std::vector<InputSpec> DataSampling::InputSpecsForPolicy(const std::string& policiesSource, const std::string& policyName)
116+
{
117+
std::unique_ptr<ConfigurationInterface> config = ConfigurationFactory::getConfiguration(policiesSource);
118+
return InputSpecsForPolicy(config.get(), policyName);
119+
}
120+
121+
std::vector<InputSpec> DataSampling::InputSpecsForPolicy(ConfigurationInterface* const config, const std::string& policyName)
122+
{
123+
std::vector<InputSpec> inputs;
124+
auto policiesTree = config->getRecursive("dataSamplingPolicies");
125+
126+
for (auto&& policyConfig : policiesTree) {
127+
if (policyConfig.second.get<std::string>("id") == policyName) {
128+
DataSamplingPolicy policy(policyConfig.second);
129+
if (policy.getSubSpec() == -1) {
130+
//fixme: support it, when wildcards are available
131+
LOG(WARNING) << "InputSpecsForPolicy does not support subscriptions to all subSpecs yet.";
132+
}
133+
for (const auto& path : policy.getPathMap()) {
134+
inputs.push_back(
135+
InputSpec{
136+
path.second.binding.value,
137+
path.second.origin,
138+
path.second.description,
139+
path.second.subSpec,
140+
path.second.lifetime });
141+
}
142+
}
143+
break;
144+
}
145+
return inputs;
146+
}
147+
115148
} // namespace framework
116149
} // namespace o2

Framework/Core/test/test_DataSampling.cxx

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,3 +235,19 @@ BOOST_AUTO_TEST_CASE(DataSamplingFairMq)
235235
BOOST_REQUIRE(channelConfig != disp->options.end());
236236
}
237237

238+
BOOST_AUTO_TEST_CASE(InputSpecsForPolicy)
239+
{
240+
std::string configFilePath = "json://" + std::string(getenv("O2_ROOT")) + "/share/tests/test_DataSampling.json";
241+
std::vector<InputSpec> inputs = DataSampling::InputSpecsForPolicy(configFilePath, "tpcclusters");
242+
243+
BOOST_CHECK_EQUAL(inputs.size(), 2);
244+
BOOST_CHECK_EQUAL(inputs[0], (InputSpec{ "clusters_p", "DS", "tpcclusters-1", static_cast<DataHeader::SubSpecificationType>(-1) }));
245+
BOOST_CHECK_EQUAL(inputs[0].binding, "clusters_p");
246+
BOOST_CHECK_EQUAL(inputs[1], (InputSpec{ "clusters", "DS", "tpcclusters-0", static_cast<DataHeader::SubSpecificationType>(-1) }));
247+
BOOST_CHECK_EQUAL(inputs[1].binding, "clusters");
248+
249+
std::unique_ptr<ConfigurationInterface> config = ConfigurationFactory::getConfiguration(configFilePath);
250+
inputs = DataSampling::InputSpecsForPolicy(config.get(), "tpcclusters");
251+
252+
BOOST_CHECK_EQUAL(inputs.size(), 2);
253+
}

0 commit comments

Comments
 (0)