Skip to content

Commit e42e84f

Browse files
committed
DPL: Use DataDescriptorMatcher on incoming data
This changes the DataRelayer so that we use a DataDescriptorMatcher, rather than the InputSpec to match incoming data. At the moment the matcher is derived from the InputSpec hardcoded values.
1 parent 1741811 commit e42e84f

File tree

2 files changed

+43
-13
lines changed

2 files changed

+43
-13
lines changed

Framework/Core/include/Framework/DataRelayer.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
#define FRAMEWORK_DATARELAYER_H
1212

1313
#include "Framework/InputRoute.h"
14+
#include "Framework/DataDescriptorMatcher.h"
1415
#include "Framework/ForwardRoute.h"
1516
#include "Framework/CompletionPolicy.h"
1617
#include "Framework/PartRef.h"
@@ -97,6 +98,7 @@ class DataRelayer {
9798
std::vector<bool> mForwardingMask;
9899
CompletionPolicy mCompletionPolicy;
99100
std::vector<size_t> mDistinctRoutesIndex;
101+
std::vector<data_matcher::DataDescriptorMatcher> mInputMatchers;
100102
static std::vector<std::string> sMetricsNames;
101103
};
102104

Framework/Core/src/DataRelayer.cxx

Lines changed: 41 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -8,25 +8,28 @@
88
// granted to it by virtue of its status as an Intergovernmental Organization
99
// or submit itself to any jurisdiction.
1010
#include "Framework/DataRelayer.h"
11+
12+
#include "Framework/DataDescriptorMatcher.h"
1113
#include "Framework/DataSpecUtils.h"
1214
#include "Framework/DataProcessingHeader.h"
1315
#include "Framework/DataRef.h"
1416
#include "Framework/InputRecord.h"
1517
#include "Framework/CompletionPolicy.h"
1618
#include "Framework/PartRef.h"
17-
#include "fairmq/FairMQLogger.h"
1819
#include "Framework/TimesliceIndex.h"
1920

2021
#include <Monitoring/Monitoring.h>
2122

23+
#include <fairmq/FairMQLogger.h>
24+
2225
#include <gsl/span>
2326

27+
using namespace o2::framework::data_matcher;
2428
using DataHeader = o2::header::DataHeader;
2529
using DataProcessingHeader = o2::framework::DataProcessingHeader;
2630

2731
constexpr size_t MAX_PARALLEL_TIMESLICES = 256;
2832

29-
3033
namespace o2
3134
{
3235
namespace framework
@@ -45,6 +48,28 @@ std::vector<size_t> createDistinctRouteIndex(std::vector<InputRoute> const& rout
4548
}
4649
return result;
4750
}
51+
52+
/// This converts from InputRoute to the associated DataDescriptorMatcher.
53+
std::vector<DataDescriptorMatcher> createInputMatchers(std::vector<InputRoute> const& routes)
54+
{
55+
std::vector<DataDescriptorMatcher> result;
56+
57+
for (auto& route : routes) {
58+
DataDescriptorMatcher matcher{
59+
DataDescriptorMatcher::Op::And,
60+
OriginValueMatcher{ route.matcher.origin.str },
61+
std::make_unique<DataDescriptorMatcher>(
62+
DataDescriptorMatcher::Op::And,
63+
DescriptionValueMatcher{ route.matcher.description.str },
64+
std::make_unique<DataDescriptorMatcher>(
65+
DataDescriptorMatcher::Op::Just,
66+
SubSpecificationTypeValueMatcher{ route.matcher.subSpec }))
67+
};
68+
result.emplace_back(std::move(matcher));
69+
}
70+
71+
return result;
72+
}
4873
}
4974

5075
constexpr int INVALID_INPUT = -1;
@@ -64,7 +89,8 @@ DataRelayer::DataRelayer(const CompletionPolicy& policy,
6489
mTimesliceIndex{ index },
6590
mMetrics{ metrics },
6691
mCompletionPolicy{ policy },
67-
mDistinctRoutesIndex{ createDistinctRouteIndex(inputRoutes) }
92+
mDistinctRoutesIndex{ createDistinctRouteIndex(inputRoutes) },
93+
mInputMatchers{ createInputMatchers(inputRoutes) }
6894
{
6995
setPipelineLength(DEFAULT_PIPELINE_LENGTH);
7096
for (size_t ci = 0; ci < mCache.size(); ci++) {
@@ -107,22 +133,24 @@ void DataRelayer::processDanglingInputs(std::vector<ExpirationHandler> const& ex
107133
/// reason why these might diffent is that when you have timepipelining
108134
/// you have one route per timeslice, even if the type is the same.
109135
size_t
110-
assignInputSpecId(void *data, std::vector<InputRoute> const &routes) {
111-
for (size_t ri = 0, re = routes.size(); ri < re; ++ri) {
112-
auto &route = routes[ri];
136+
assignInputSpecId(void* data, std::vector<DataDescriptorMatcher> const& matchers)
137+
{
138+
/// FIXME: for the moment we have a global context, since we do not support
139+
/// yet generic matchers as InputSpec.
140+
std::vector<ContextElement> context{};
141+
142+
for (size_t ri = 0, re = matchers.size(); ri < re; ++ri) {
143+
auto& matcher = matchers[ri];
113144
const DataHeader* h = o2::header::get<DataHeader*>(data);
114145
if (h == nullptr) {
115146
return re;
116147
}
117148

118-
if (DataSpecUtils::match(route.matcher,
119-
h->dataOrigin,
120-
h->dataDescription,
121-
h->subSpecification)) {
149+
if (matcher.match(*h, context)) {
122150
return ri;
123151
}
124152
}
125-
return routes.size();
153+
return matchers.size();
126154
}
127155

128156
DataRelayer::RelayChoice
@@ -145,8 +173,8 @@ DataRelayer::relay(std::unique_ptr<FairMQMessage> &&header,
145173
// This returns the identifier for the given input. We use a separate
146174
// function because while it's trivial now, the actual matchmaking will
147175
// become more complicated when we will start supporting ranges.
148-
auto getInput = [&inputRoutes,&header] () -> int {
149-
return assignInputSpecId(header->GetData(), inputRoutes);
176+
auto getInput = [&matchers = mInputMatchers ,&header] () -> int {
177+
return assignInputSpecId(header->GetData(), matchers);
150178
};
151179

152180
// This will check if the input is valid. We hide the details so that

0 commit comments

Comments
 (0)