Skip to content

Commit 18de589

Browse files
committed
DPL: prevent InputSpec default constructor
As we move closer to have InputSpec as a veritable "matcher" object rather than a set of variables to be equal compared, I need to tight the access to the member variables. The first adiabatic step in such a direction is to make sure an InputSpec is built all at once, and not incrementally, as that ability will be lost once we move to use a DataDescriptorMatcher to describe the InputSpec internal State.
1 parent c14e7d6 commit 18de589

File tree

8 files changed

+114
-154
lines changed

8 files changed

+114
-154
lines changed

Framework/Core/include/Framework/InputSpec.h

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,22 @@ namespace framework
2525
/// input or in output. This can be used, for example to match
2626
/// specific payloads in a timeframe.
2727
struct InputSpec {
28+
/// This is the legacy way to construct things. For the moment we still allow
29+
/// accessing directly the members, but this will change as well at some point.
30+
InputSpec(std::string binding_, header::DataOrigin origin_, header::DataDescription description_, header::DataHeader::SubSpecificationType subSpec_ = 0, enum Lifetime lifetime_ = Lifetime::Timeframe)
31+
: binding{binding_},
32+
origin{origin_},
33+
description{description_},
34+
subSpec{subSpec_},
35+
lifetime{lifetime_}
36+
{
37+
}
38+
2839
std::string binding;
2940
header::DataOrigin origin;
3041
header::DataDescription description;
31-
header::DataHeader::SubSpecificationType subSpec = 0;
32-
enum Lifetime lifetime = Lifetime::Timeframe;
42+
header::DataHeader::SubSpecificationType subSpec;
43+
enum Lifetime lifetime;
3344

3445
bool operator==(InputSpec const& that) const
3546
{

Framework/Core/include/Framework/LifetimeHelpers.h

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,16 @@
1313
#include "Framework/ExpirationHandler.h"
1414
#include "Framework/PartRef.h"
1515

16-
#include <functional>
1716
#include <chrono>
17+
#include <functional>
18+
#include <string>
1819

1920
namespace o2
2021
{
2122
namespace framework
2223
{
2324

24-
struct InputRoute;
25+
struct InputSpec;
2526

2627
/// Lifetime handlers are used to manage the cases in which data is not coming
2728
/// from the dataflow, but from some other source or trigger, e.g.,
@@ -58,13 +59,13 @@ struct LifetimeHelpers {
5859
/// FIXME: provide a way to customise the histogram from the configuration.
5960
static ExpirationHandler::Handler fetchFromObjectRegistry();
6061

61-
/// Enumerate entries on every invokation. @a route is the route which the
62+
/// Enumerate entries on every invokation. @a matcher is the InputSpec which the
6263
/// given enumeration refers to. In particular messages created by the
6364
/// returned ExpirationHandler will have an header which matches the
6465
/// dataOrigin, dataDescrition and dataSpecification of the given @a route.
6566
/// The payload of each message will contain an incremental number for each
6667
/// message being created.
67-
static ExpirationHandler::Handler enumerate(InputRoute const& route);
68+
static ExpirationHandler::Handler enumerate(InputSpec const& spec, std::string const& sourceChannel);
6869
};
6970

7071
} // namespace framework

Framework/Core/src/DataSampling.cxx

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -185,21 +185,24 @@ QcTaskConfigurations DataSampling::readQcTasksConfiguration(const std::string& c
185185

186186
for (auto&& input : taskInputsSplit) {
187187

188-
InputSpec desiredData;
189-
try {
190-
desiredData.binding = configFile->get<std::string>(prefixConfigTasks + input + ".inputName");
191-
192-
std::string origin = configFile->get<std::string>(prefixConfigTasks + input + ".dataOrigin");
193-
origin.copy(desiredData.origin.str, (size_t)desiredData.origin.size);
188+
std::string binding;
189+
header::DataOrigin origin;
190+
header::DataDescription description;
194191

195-
std::string description = configFile->get<std::string>(prefixConfigTasks + input + ".dataDescription");
196-
description.copy(desiredData.description.str, (size_t)desiredData.description.size);
192+
try {
193+
binding = configFile->get<std::string>(prefixConfigTasks + input + ".inputName");
194+
std::string originStr = configFile->get<std::string>(prefixConfigTasks + input + ".dataOrigin");
195+
std::string descriptionStr = configFile->get<std::string>(prefixConfigTasks + input + ".dataDescription");
197196

197+
originStr.copy(origin.str, (size_t)origin.size);
198+
descriptionStr.copy(description.str, (size_t)description.size);
198199
} catch (...) {
199200
LOG(ERROR) << "QC Task configuration error. In file " << configurationSource << " input " << input
200201
<< " has missing values";
201202
continue;
202203
}
204+
205+
InputSpec desiredData{binding, origin, description, 0};
203206
task.desiredDataSpecs.push_back(desiredData);
204207

205208
// for temporary feature

Framework/Core/src/DeviceSpecHelpers.cxx

Lines changed: 40 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -58,10 +58,10 @@ struct ExpirationHandlerHelpers {
5858
return [](ConfigParamRegistry const&) { return LifetimeHelpers::expireNever(); };
5959
}
6060

61-
static InputRoute::ExpirationConfigurator expiringConditionConfigurator(InputRoute& route)
61+
static InputRoute::ExpirationConfigurator expiringConditionConfigurator(InputSpec const& matcher)
6262
{
63-
return [route](ConfigParamRegistry const&) {
64-
std::string prefix = std::string{ "/" } + route.matcher.origin.str + "/" + route.matcher.description.str;
63+
return [matcher](ConfigParamRegistry const&) {
64+
std::string prefix = std::string{ "/" } + matcher.origin.str + "/" + matcher.description.str;
6565
return LifetimeHelpers::fetchFromCCDBCache(prefix);
6666
};
6767
}
@@ -79,18 +79,18 @@ struct ExpirationHandlerHelpers {
7979
return [](ConfigParamRegistry const&) { return LifetimeHelpers::fetchFromQARegistry(); };
8080
}
8181

82-
static InputRoute::DanglingConfigurator danglingTimerConfigurator(InputRoute& route)
82+
static InputRoute::DanglingConfigurator danglingTimerConfigurator(InputSpec const& matcher)
8383
{
84-
return [route](ConfigParamRegistry const& options) {
85-
std::string rateName = std::string{ "period-" } + route.matcher.origin.str + "-" + route.matcher.description.str;
84+
return [matcher](ConfigParamRegistry const& options) {
85+
std::string rateName = std::string{ "period-" } + matcher.binding;
8686
auto period = options.get<int>(rateName.c_str());
8787
return LifetimeHelpers::expireTimed(std::chrono::milliseconds(period));
8888
};
8989
}
9090

91-
static InputRoute::ExpirationConfigurator expiringTimerConfigurator(InputRoute& route)
91+
static InputRoute::ExpirationConfigurator expiringTimerConfigurator(InputSpec const& matcher, std::string const& sourceChannel)
9292
{
93-
return [route](ConfigParamRegistry const&) { return LifetimeHelpers::enumerate(route); };
93+
return [matcher, sourceChannel](ConfigParamRegistry const&) { return LifetimeHelpers::enumerate(matcher, sourceChannel); };
9494
}
9595

9696
static InputRoute::DanglingConfigurator danglingTransientConfigurator()
@@ -101,7 +101,7 @@ struct ExpirationHandlerHelpers {
101101
return [](ConfigParamRegistry const&) { return LifetimeHelpers::expireNever(); };
102102
}
103103

104-
static InputRoute::ExpirationConfigurator expiringTransientConfigurator(InputRoute& route)
104+
static InputRoute::ExpirationConfigurator expiringTransientConfigurator(InputSpec const& matcher)
105105
{
106106
return [](ConfigParamRegistry const&) { return LifetimeHelpers::fetchFromObjectRegistry(); };
107107
}
@@ -252,11 +252,12 @@ void DeviceSpecHelpers::processOutEdgeActions(std::vector<DeviceSpec>& devices,
252252
};
253253
device.outputs.emplace_back(route);
254254
} else {
255-
ForwardRoute route;
256-
route.matcher = workflow[edge.consumer].inputs[edge.consumerInputIndex];
257-
route.channel = channel.name;
258-
route.timeslice = edge.timeIndex;
259-
route.maxTimeslices = consumer.maxInputTimeslices;
255+
ForwardRoute route{
256+
edge.timeIndex,
257+
consumer.maxInputTimeslices,
258+
workflow[edge.consumer].inputs[edge.consumerInputIndex],
259+
channel.name
260+
};
260261
device.forwards.emplace_back(route);
261262
}
262263
};
@@ -423,32 +424,43 @@ void DeviceSpecHelpers::processInEdgeActions(std::vector<DeviceSpec>& devices,
423424
auto const& edge = logicalEdges[ei];
424425
auto const& consumer = workflow[edge.consumer];
425426
auto& consumerDevice = devices[di];
426-
InputRoute route;
427-
route.matcher = consumer.inputs[edge.consumerInputIndex];
428-
route.sourceChannel = consumerDevice.inputChannels[ci].name;
429-
route.timeslice = edge.producerTimeIndex;
427+
428+
InputRoute::DanglingConfigurator danglingConfigurator;
429+
InputRoute::ExpirationConfigurator expirationConfigurator;
430+
auto const& inputSpec = consumer.inputs[edge.consumerInputIndex];
431+
auto const& sourceChannel = consumerDevice.inputChannels[ci].name;
432+
430433
switch (consumer.inputs[edge.consumerInputIndex].lifetime) {
431434
case Lifetime::Timeframe:
432-
route.danglingConfigurator = ExpirationHandlerHelpers::danglingTimeframeConfigurator();
433-
route.expirationConfigurator = ExpirationHandlerHelpers::expiringTimeframeConfigurator();
435+
danglingConfigurator = ExpirationHandlerHelpers::danglingTimeframeConfigurator();
436+
expirationConfigurator = ExpirationHandlerHelpers::expiringTimeframeConfigurator();
434437
break;
435438
case Lifetime::Condition:
436-
route.danglingConfigurator = ExpirationHandlerHelpers::danglingConditionConfigurator();
437-
route.expirationConfigurator = ExpirationHandlerHelpers::expiringConditionConfigurator(route);
439+
danglingConfigurator = ExpirationHandlerHelpers::danglingConditionConfigurator();
440+
expirationConfigurator = ExpirationHandlerHelpers::expiringConditionConfigurator(inputSpec);
438441
break;
439442
case Lifetime::QA:
440-
route.danglingConfigurator = ExpirationHandlerHelpers::danglingQAConfigurator();
441-
route.expirationConfigurator = ExpirationHandlerHelpers::expiringQAConfigurator();
443+
danglingConfigurator = ExpirationHandlerHelpers::danglingQAConfigurator();
444+
expirationConfigurator = ExpirationHandlerHelpers::expiringQAConfigurator();
442445
break;
443446
case Lifetime::Timer:
444-
route.danglingConfigurator = ExpirationHandlerHelpers::danglingTimerConfigurator(route);
445-
route.expirationConfigurator = ExpirationHandlerHelpers::expiringTimerConfigurator(route);
447+
danglingConfigurator = ExpirationHandlerHelpers::danglingTimerConfigurator(inputSpec);
448+
expirationConfigurator = ExpirationHandlerHelpers::expiringTimerConfigurator(inputSpec, sourceChannel);
446449
break;
447450
case Lifetime::Transient:
448-
route.danglingConfigurator = ExpirationHandlerHelpers::danglingTransientConfigurator();
449-
route.expirationConfigurator = ExpirationHandlerHelpers::expiringTransientConfigurator(route);
451+
danglingConfigurator = ExpirationHandlerHelpers::danglingTransientConfigurator();
452+
expirationConfigurator = ExpirationHandlerHelpers::expiringTransientConfigurator(inputSpec);
450453
break;
451454
}
455+
456+
InputRoute route{
457+
inputSpec,
458+
sourceChannel,
459+
edge.producerTimeIndex,
460+
danglingConfigurator,
461+
expirationConfigurator
462+
};
463+
452464
consumerDevice.inputs.push_back(route);
453465
};
454466

Framework/Core/src/LifetimeHelpers.cxx

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
#include "Framework/LifetimeHelpers.h"
1212
#include "Framework/ServiceRegistry.h"
1313
#include "Framework/RawDeviceService.h"
14-
#include "Framework/InputRoute.h"
14+
#include "Framework/InputSpec.h"
1515
#include "Headers/DataHeader.h"
1616
#include "Headers/Stack.h"
1717
#include "MemoryResources/MemoryResources.h"
@@ -107,25 +107,25 @@ ExpirationHandler::Handler LifetimeHelpers::fetchFromObjectRegistry()
107107
}
108108

109109
/// Enumerate entries on every invokation.
110-
ExpirationHandler::Handler LifetimeHelpers::enumerate(InputRoute const& route)
110+
ExpirationHandler::Handler LifetimeHelpers::enumerate(InputSpec const& matcher, std::string const& sourceChannel)
111111
{
112112
auto counter = std::make_shared<int64_t>(0);
113-
auto f = [route, counter](ServiceRegistry& services, PartRef& ref, uint64_t timestamp) -> void {
113+
auto f = [matcher, counter, sourceChannel](ServiceRegistry& services, PartRef& ref, uint64_t timestamp) -> void {
114114
// We should invoke the handler only once.
115115
assert(!ref.header);
116116
assert(!ref.payload);
117117
auto& rawDeviceService = services.get<RawDeviceService>();
118118

119119
DataHeader dh;
120-
dh.dataOrigin = route.matcher.origin;
121-
dh.dataDescription = route.matcher.description;
122-
dh.subSpecification = route.matcher.subSpec;
120+
dh.dataOrigin = matcher.origin;
121+
dh.dataDescription = matcher.description;
122+
dh.subSpecification = matcher.subSpec;
123123
dh.payloadSize = 8;
124124
dh.payloadSerializationMethod = gSerializationMethodNone;
125125

126126
DataProcessingHeader dph{ timestamp, 1 };
127127

128-
auto&& transport = rawDeviceService.device()->GetChannel(route.sourceChannel, 0).Transport();
128+
auto&& transport = rawDeviceService.device()->GetChannel(sourceChannel, 0).Transport();
129129
auto channelAlloc = o2::memory_resource::getTransportAllocator(transport);
130130
auto header = o2::memory_resource::getMessage(o2::header::Stack{ channelAlloc, dh, dph });
131131
ref.header = std::move(header);

Framework/Core/test/test_DataRelayer.cxx

Lines changed: 26 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -32,21 +32,12 @@ using Stack = o2::header::Stack;
3232
// and the subsequent InputRecord is immediately requested.
3333
BOOST_AUTO_TEST_CASE(TestNoWait) {
3434
Monitoring metrics;
35-
InputSpec spec;
36-
spec.binding = "clusters";
37-
spec.description = "CLUSTERS";
38-
spec.origin = "TPC";
39-
spec.subSpec = 0;
40-
spec.lifetime = Lifetime::Timeframe;
41-
42-
InputRoute route;
43-
route.sourceChannel = "Fake";
44-
route.matcher = spec;
45-
route.timeslice = 0;
35+
InputSpec spec{ "clusters", "TPC", "CLUSTERS" };
4636

4737
std::vector<InputRoute> inputs = {
48-
route
38+
InputRoute{ spec, "Fake", 0 }
4939
};
40+
5041
std::vector<ForwardRoute> forwards;
5142
TimesliceIndex index;
5243

@@ -81,31 +72,22 @@ BOOST_AUTO_TEST_CASE(TestNoWait) {
8172
// correctly relayed before being processed.
8273
BOOST_AUTO_TEST_CASE(TestRelay) {
8374
Monitoring metrics;
84-
InputSpec spec1;
85-
spec1.binding = "clusters";
86-
spec1.description = "CLUSTERS";
87-
spec1.origin = "TPC";
88-
spec1.subSpec = 0;
89-
spec1.lifetime = Lifetime::Timeframe;
90-
91-
InputSpec spec2;
92-
spec2.binding = "clusters_its";
93-
spec2.description = "CLUSTERS";
94-
spec2.origin = "ITS";
95-
spec2.subSpec = 0;
96-
spec2.lifetime = Lifetime::Timeframe;
97-
98-
InputRoute route1;
99-
route1.sourceChannel = "Fake";
100-
route1.matcher = spec1;
101-
route1.timeslice = 0;
102-
103-
InputRoute route2;
104-
route2.sourceChannel = "Fake";
105-
route2.matcher = spec2;
106-
route2.timeslice = 0;
107-
108-
std::vector<InputRoute> inputs = { route1, route2 };
75+
InputSpec spec1{
76+
"clusters",
77+
"TPC",
78+
"CLUSTERS",
79+
};
80+
InputSpec spec2{
81+
"clusters_its",
82+
"ITS",
83+
"CLUSTERS",
84+
};
85+
86+
std::vector<InputRoute> inputs = {
87+
InputRoute{ spec1, "Fake1", 0 },
88+
InputRoute{ spec2, "Fake2", 0 }
89+
};
90+
10991
std::vector<ForwardRoute> forwards;
11092

11193
TimesliceIndex index;
@@ -157,20 +139,10 @@ BOOST_AUTO_TEST_CASE(TestRelay) {
157139
// the cache.
158140
BOOST_AUTO_TEST_CASE(TestCache) {
159141
Monitoring metrics;
160-
InputSpec spec;
161-
spec.binding = "clusters";
162-
spec.description = "CLUSTERS";
163-
spec.origin = "TPC";
164-
spec.subSpec = 0;
165-
spec.lifetime = Lifetime::Timeframe;
166-
167-
InputRoute route;
168-
route.sourceChannel = "Fake";
169-
route.matcher = spec;
170-
route.timeslice = 0;
142+
InputSpec spec{ "clusters", "TPC", "CLUSTERS" };
171143

172144
std::vector<InputRoute> inputs = {
173-
route
145+
InputRoute{ spec, "Fake", 0 }
174146
};
175147
std::vector<ForwardRoute> forwards;
176148

@@ -231,34 +203,14 @@ BOOST_AUTO_TEST_CASE(TestCache) {
231203
// it will run immediately.
232204
BOOST_AUTO_TEST_CASE(TestPolicies) {
233205
Monitoring metrics;
234-
InputSpec spec1;
235-
spec1.binding = "clusters";
236-
spec1.description = "CLUSTERS";
237-
spec1.origin = "TPC";
238-
spec1.subSpec = 0;
239-
spec1.lifetime = Lifetime::Timeframe;
240-
241-
InputSpec spec2;
242-
spec2.binding = "tracks";
243-
spec2.description = "TRACKS";
244-
spec2.origin = "TPC";
245-
spec2.subSpec = 0;
246-
spec2.lifetime = Lifetime::Timeframe;
247-
248-
InputRoute route1;
249-
route1.sourceChannel = "Fake";
250-
route1.matcher = spec1;
251-
route1.timeslice = 0;
252-
253-
InputRoute route2;
254-
route2.sourceChannel = "Fake2";
255-
route2.matcher = spec2;
256-
route2.timeslice = 0;
206+
InputSpec spec1{ "clusters", "TPC", "CLUSTERS" };
207+
InputSpec spec2{ "tracks", "TPC", "TRACKS" };
257208

258209
std::vector<InputRoute> inputs = {
259-
route1,
260-
route2
210+
InputRoute{ spec1, "Fake1", 0 },
211+
InputRoute{ spec2, "Fake2", 0 },
261212
};
213+
262214
std::vector<ForwardRoute> forwards;
263215
TimesliceIndex index;
264216

0 commit comments

Comments
 (0)