Skip to content

Commit 032b405

Browse files
authored
DPL: allow configuring FairMQ channels used by DPL (#4995)
In particular use it to configure the logging rate.
1 parent ad10690 commit 032b405

14 files changed

+161
-102
lines changed

Framework/Core/include/Framework/ChannelConfigurationPolicy.h

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,20 +7,20 @@
77
// In applying this license CERN does not waive the privileges and immunities
88
// granted to it by virtue of its status as an Intergovernmental Organization
99
// or submit itself to any jurisdiction.
10-
#ifndef FRAMEWORK_CHANNELCONFIGURATIONPOLICY_H
11-
#define FRAMEWORK_CHANNELCONFIGURATIONPOLICY_H
10+
#ifndef O2_FRAMEWORK_CHANNELCONFIGURATIONPOLICY_H_
11+
#define O2_FRAMEWORK_CHANNELCONFIGURATIONPOLICY_H_
1212

1313
#include "Framework/ChannelConfigurationPolicyHelpers.h"
1414
#include "Framework/ChannelSpec.h"
1515

1616
#include <vector>
1717
#include <functional>
1818

19-
namespace o2
20-
{
21-
namespace framework
19+
namespace o2::framework
2220
{
2321

22+
struct ConfigContext;
23+
2424
/// A ChannelConfigurationPolicy allows the user
2525
/// to customise connection method and type
2626
/// for a channel created between two devices.
@@ -37,10 +37,10 @@ struct ChannelConfigurationPolicy {
3737
Helpers::InputChannelModifier modifyInput = nullptr;
3838
Helpers::OutputChannelModifier modifyOutput = nullptr;
3939

40-
static std::vector<ChannelConfigurationPolicy> createDefaultPolicies();
40+
/// Default policies to use, based on the contents of the @configContex content
41+
static std::vector<ChannelConfigurationPolicy> createDefaultPolicies(ConfigContext const& configContext);
4142
};
4243

43-
} // namespace framework
44-
} // namespace o2
44+
} // namespace o2::framework
4545

46-
#endif // FRAMEWORK_CHANNELCONFIGURATIONPOLICY_H
46+
#endif // O2_FRAMEWORK_CHANNELCONFIGURATIONPOLICY_H_

Framework/Core/include/Framework/ChannelConfigurationPolicyHelpers.h

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,13 @@
1414

1515
#include <functional>
1616

17-
namespace o2
18-
{
19-
namespace framework
17+
namespace o2::framework
2018
{
2119

20+
struct FairMQChannelConfigSpec {
21+
int64_t rateLogging;
22+
};
23+
2224
/// A set of helpers for common ChannelConfigurationPolicy behaviors
2325
struct ChannelConfigurationPolicyHelpers {
2426
// TODO: currently we allow matching of the policy only based on
@@ -41,24 +43,23 @@ struct ChannelConfigurationPolicyHelpers {
4143

4244
// Some trivial modifier which can be used by the policy.
4345
/// Makes the passed input channel connect and subscribe
44-
static InputChannelModifier subscribeInput;
46+
static InputChannelModifier subscribeInput(FairMQChannelConfigSpec const& spec);
4547
/// Makes the passed output channel bind and subscribe
46-
static OutputChannelModifier publishOutput;
48+
static OutputChannelModifier publishOutput(FairMQChannelConfigSpec const& spec);
4749
/// Makes the passed input channel connect and pull
48-
static InputChannelModifier pullInput;
50+
static InputChannelModifier pullInput(FairMQChannelConfigSpec const& spec);
4951
/// Makes the passed output channel bind and push
50-
static OutputChannelModifier pushOutput;
52+
static OutputChannelModifier pushOutput(FairMQChannelConfigSpec const& spec);
5153
/// Makes the passed input channel connect and request
52-
static InputChannelModifier reqInput;
54+
static InputChannelModifier reqInput(FairMQChannelConfigSpec const& spec);
5355
/// Makes the passed output channel bind and reply
54-
static OutputChannelModifier replyOutput;
56+
static OutputChannelModifier replyOutput(FairMQChannelConfigSpec const& spec);
5557
/// Makes the passed input channel connect and pair
56-
static InputChannelModifier pairInput;
58+
static InputChannelModifier pairInput(FairMQChannelConfigSpec const& spec);
5759
/// Makes the passed output channel bind and pair
58-
static OutputChannelModifier pairOutput;
60+
static OutputChannelModifier pairOutput(FairMQChannelConfigSpec const& spec);
5961
};
6062

61-
} // namespace framework
62-
} // namespace o2
63+
} // namespace o2::framework
6364

64-
#endif // FRAMEWORK_CHANNELCONFIGURATIONPOLICY_H
65+
#endif // O2_FRAMEWORK_CHANNELCONFIGURATIONPOLICY_H_

Framework/Core/include/Framework/ChannelSpec.h

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,12 @@
77
// In applying this license CERN does not waive the privileges and immunities
88
// granted to it by virtue of its status as an Intergovernmental Organization
99
// or submit itself to any jurisdiction.
10-
#ifndef FRAMEWORK_CHANNELSPEC_H
11-
#define FRAMEWORK_CHANNELSPEC_H
10+
#ifndef O2_FRAMEWORK_CHANNELSPEC_H_
11+
#define O2_FRAMEWORK_CHANNELSPEC_H_
1212

1313
#include <string>
1414

15-
namespace o2
16-
{
17-
namespace framework
15+
namespace o2::framework
1816
{
1917

2018
/// These map to zeromq connection
@@ -50,6 +48,7 @@ struct InputChannelSpec {
5048
std::string hostname;
5149
unsigned short port;
5250
ChannelProtocol protocol = ChannelProtocol::Network;
51+
size_t rateLogging = 60;
5352
};
5453

5554
/// This describes an output channel. Output channels are semantically
@@ -65,9 +64,9 @@ struct OutputChannelSpec {
6564
unsigned short port;
6665
size_t listeners;
6766
ChannelProtocol protocol = ChannelProtocol::Network;
67+
size_t rateLogging = 60;
6868
};
6969

70-
} // namespace framework
71-
} // namespace o2
70+
} // namespace o2::framework
7271

73-
#endif // FRAMEWORK_CHANNELSPEC_H
72+
#endif // O2_FRAMEWORK_CHANNELSPEC_H_

Framework/Core/include/Framework/runDataProcessing.h

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,8 @@ int main(int argc, char** argv)
138138
workflowOptions.push_back(ConfigParamSpec{"aod-writer-ntfmerge", VariantType::Int, -1, {"Number of time frames to merge into one file"}});
139139
workflowOptions.push_back(ConfigParamSpec{"aod-writer-keep", VariantType::String, "", {"Comma separated list of ORIGIN/DESCRIPTION/SUBSPECIFICATION:treename:col1/col2/..:filename"}});
140140

141+
workflowOptions.push_back(ConfigParamSpec{"fairmq-rate-logging", VariantType::Int, 60, {"Rate logging for FairMQ channels"}});
142+
141143
workflowOptions.push_back(ConfigParamSpec{"forwarding-policy",
142144
VariantType::String,
143145
"dangling",
@@ -150,10 +152,6 @@ int main(int argc, char** argv)
150152
{"Destination for forwarded messages."
151153
" file: write to file,"
152154
" fairmq: send to output proxy"}});
153-
std::vector<ChannelConfigurationPolicy> channelPolicies;
154-
UserCustomizationsHelper::userDefinedCustomization(channelPolicies, 0);
155-
auto defaultChannelPolicies = ChannelConfigurationPolicy::createDefaultPolicies();
156-
channelPolicies.insert(std::end(channelPolicies), std::begin(defaultChannelPolicies), std::end(defaultChannelPolicies));
157155

158156
std::vector<CompletionPolicy> completionPolicies;
159157
UserCustomizationsHelper::userDefinedCustomization(completionPolicies, 0);
@@ -178,6 +176,10 @@ int main(int argc, char** argv)
178176
for (auto& spec : specs) {
179177
UserCustomizationsHelper::userDefinedCustomization(spec.requiredServices, 0);
180178
}
179+
std::vector<ChannelConfigurationPolicy> channelPolicies;
180+
UserCustomizationsHelper::userDefinedCustomization(channelPolicies, 0);
181+
auto defaultChannelPolicies = ChannelConfigurationPolicy::createDefaultPolicies(configContext);
182+
channelPolicies.insert(std::end(channelPolicies), std::begin(defaultChannelPolicies), std::end(defaultChannelPolicies));
181183
result = doMain(argc, argv, specs, channelPolicies, completionPolicies, dispatchPolicies, workflowOptions, configContext);
182184
} catch (boost::exception& e) {
183185
doBoostException(e);

Framework/Core/src/ChannelConfigurationPolicy.cxx

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,20 +9,19 @@
99
// or submit itself to any jurisdiction.
1010

1111
#include "Framework/ChannelConfigurationPolicy.h"
12+
#include "Framework/ConfigContext.h"
1213

13-
namespace o2
14-
{
15-
namespace framework
14+
namespace o2::framework
1615
{
1716

18-
std::vector<ChannelConfigurationPolicy> ChannelConfigurationPolicy::createDefaultPolicies()
17+
std::vector<ChannelConfigurationPolicy> ChannelConfigurationPolicy::createDefaultPolicies(ConfigContext const& configContext)
1918
{
2019
ChannelConfigurationPolicy defaultPolicy;
2120
defaultPolicy.match = ChannelConfigurationPolicyHelpers::matchAny;
22-
defaultPolicy.modifyInput = ChannelConfigurationPolicyHelpers::pullInput;
23-
defaultPolicy.modifyOutput = ChannelConfigurationPolicyHelpers::pushOutput;
21+
defaultPolicy.modifyInput = ChannelConfigurationPolicyHelpers::pullInput({configContext.options().get<int>("fairmq-rate-logging")});
22+
defaultPolicy.modifyOutput = ChannelConfigurationPolicyHelpers::pushOutput({configContext.options().get<int>("fairmq-rate-logging")});
23+
2424
return {defaultPolicy};
2525
}
2626

27-
} // namespace framework
28-
} // namespace o2
27+
} // namespace o2::framework

Framework/Core/src/ChannelConfigurationPolicyHelpers.cxx

Lines changed: 32 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,7 @@
1313
#include <string>
1414
#include "Framework/ChannelSpec.h"
1515

16-
namespace o2
17-
{
18-
namespace framework
16+
namespace o2::framework
1917
{
2018

2119
ChannelConfigurationPolicyHelpers::PolicyMatcher ChannelConfigurationPolicyHelpers::matchAny =
@@ -37,41 +35,58 @@ ChannelConfigurationPolicyHelpers::PolicyMatcher ChannelConfigurationPolicyHelpe
3735
return [nameString](std::string const&, std::string const& consumerId) -> bool { return consumerId == nameString; };
3836
}
3937

40-
ChannelConfigurationPolicyHelpers::InputChannelModifier ChannelConfigurationPolicyHelpers::subscribeInput =
41-
[](InputChannelSpec& channel) {
38+
ChannelConfigurationPolicyHelpers::InputChannelModifier ChannelConfigurationPolicyHelpers::subscribeInput(FairMQChannelConfigSpec const& spec)
39+
{
40+
return [spec](InputChannelSpec& channel) {
4241
channel.method = ChannelMethod::Connect;
4342
channel.type = ChannelType::Sub;
43+
channel.rateLogging = spec.rateLogging;
4444
};
45+
}
4546

46-
ChannelConfigurationPolicyHelpers::OutputChannelModifier ChannelConfigurationPolicyHelpers::publishOutput =
47-
[](OutputChannelSpec& channel) {
47+
ChannelConfigurationPolicyHelpers::OutputChannelModifier ChannelConfigurationPolicyHelpers::publishOutput(FairMQChannelConfigSpec const& spec)
48+
{
49+
return [spec](OutputChannelSpec& channel) {
4850
channel.method = ChannelMethod::Bind;
4951
channel.type = ChannelType::Pub;
52+
channel.rateLogging = spec.rateLogging;
5053
};
54+
}
5155

52-
ChannelConfigurationPolicyHelpers::InputChannelModifier ChannelConfigurationPolicyHelpers::pullInput =
53-
[](InputChannelSpec& channel) {
56+
ChannelConfigurationPolicyHelpers::InputChannelModifier ChannelConfigurationPolicyHelpers::pullInput(FairMQChannelConfigSpec const& spec)
57+
{
58+
return [spec](InputChannelSpec& channel) {
5459
channel.method = ChannelMethod::Connect;
5560
channel.type = ChannelType::Pull;
61+
channel.rateLogging = spec.rateLogging;
5662
};
63+
}
5764

58-
ChannelConfigurationPolicyHelpers::OutputChannelModifier ChannelConfigurationPolicyHelpers::pushOutput =
59-
[](OutputChannelSpec& channel) {
65+
ChannelConfigurationPolicyHelpers::OutputChannelModifier ChannelConfigurationPolicyHelpers::pushOutput(FairMQChannelConfigSpec const& spec)
66+
{
67+
return [spec](OutputChannelSpec& channel) {
6068
channel.method = ChannelMethod::Bind;
6169
channel.type = ChannelType::Push;
70+
channel.rateLogging = spec.rateLogging;
6271
};
72+
}
6373

64-
ChannelConfigurationPolicyHelpers::InputChannelModifier ChannelConfigurationPolicyHelpers::pairInput =
65-
[](InputChannelSpec& channel) {
74+
ChannelConfigurationPolicyHelpers::InputChannelModifier ChannelConfigurationPolicyHelpers::pairInput(FairMQChannelConfigSpec const& spec)
75+
{
76+
return [spec](InputChannelSpec& channel) {
6677
channel.method = ChannelMethod::Connect;
6778
channel.type = ChannelType::Pair;
79+
channel.rateLogging = spec.rateLogging;
6880
};
81+
}
6982

70-
ChannelConfigurationPolicyHelpers::OutputChannelModifier ChannelConfigurationPolicyHelpers::pairOutput =
71-
[](OutputChannelSpec& channel) {
83+
ChannelConfigurationPolicyHelpers::OutputChannelModifier ChannelConfigurationPolicyHelpers::pairOutput(FairMQChannelConfigSpec const& spec)
84+
{
85+
return [spec](OutputChannelSpec& channel) {
7286
channel.method = ChannelMethod::Bind;
7387
channel.type = ChannelType::Pair;
88+
channel.rateLogging = spec.rateLogging;
7489
};
90+
}
7591

76-
} // namespace framework
77-
} // namespace o2
92+
} // namespace o2::framework

Framework/Core/src/DeviceSpecHelpers.cxx

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,7 @@ std::string DeviceSpecHelpers::inputChannel2String(const InputChannelSpec& chann
253253
result += std::string("type=") + ChannelSpecHelpers::typeAsString(channel.type);
254254
result += std::string(",method=") + ChannelSpecHelpers::methodAsString(channel.method);
255255
result += std::string(",address=") + ChannelSpecHelpers::channelUrl(channel);
256-
result += std::string(",rateLogging=60");
256+
result += std::string(",rateLogging=" + std::to_string(channel.rateLogging));
257257

258258
return result;
259259
}
@@ -268,7 +268,7 @@ std::string DeviceSpecHelpers::outputChannel2String(const OutputChannelSpec& cha
268268
result += std::string("type=") + ChannelSpecHelpers::typeAsString(channel.type);
269269
result += std::string(",method=") + ChannelSpecHelpers::methodAsString(channel.method);
270270
result += std::string(",address=") + ChannelSpecHelpers::channelUrl(channel);
271-
result += std::string(",rateLogging=60");
271+
result += std::string(",rateLogging=" + std::to_string(channel.rateLogging));
272272

273273
return result;
274274
}

Framework/Core/test/Mocking.h

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
// Copyright CERN and copyright holders of ALICE O2. This software is
2+
// distributed under the terms of the GNU General Public License v3 (GPL
3+
// Version 3), copied verbatim in the file "COPYING".
4+
//
5+
// See http://alice-o2.web.cern.ch/license for full licensing information.
6+
//
7+
// In applying this license CERN does not waive the privileges and immunities
8+
// granted to it by virtue of its status as an Intergovernmental Organization
9+
// or submit itself to any jurisdiction.
10+
#include "test_HelperMacros.h"
11+
#include "Framework/ConfigContext.h"
12+
#include "Framework/WorkflowSpec.h"
13+
#include "Framework/DataSpecUtils.h"
14+
#include "Framework/SimpleOptionsRetriever.h"
15+
#include "../src/WorkflowHelpers.h"
16+
17+
using namespace o2::framework;
18+
19+
std::unique_ptr<ConfigContext> makeEmptyConfigContext()
20+
{
21+
// FIXME: Ugly... We need to fix ownership and make sure the ConfigContext
22+
// either owns or shares ownership of the registry.
23+
std::vector<std::unique_ptr<ParamRetriever>> retrievers;
24+
static std::vector<ConfigParamSpec> specs = {
25+
ConfigParamSpec{"forwarding-policy",
26+
VariantType::String,
27+
"dangling",
28+
{""}},
29+
ConfigParamSpec{"forwarding-destination",
30+
VariantType::String,
31+
"file",
32+
{"what to do with dangling outputs. file: write to file, fairmq: send to output proxy"}},
33+
ConfigParamSpec{"fairmq-rate-logging",
34+
VariantType::Int,
35+
60,
36+
{"rateLogging"}},
37+
};
38+
specs.push_back(ConfigParamSpec{"aod-memory-rate-limit", VariantType::String, "0", {"rate"}});
39+
auto store = std::make_unique<ConfigParamStore>(specs, std::move(retrievers));
40+
store->preload();
41+
store->activate();
42+
static ConfigParamRegistry registry(std::move(store));
43+
auto context = std::make_unique<ConfigContext>(registry, 0, nullptr);
44+
return context;
45+
}

0 commit comments

Comments
 (0)