Skip to content

Commit 10b2c38

Browse files
committed
DPL: add benchmark for DataRelayer
As I plan to change a bit the internals of InputRoute and use a matcher this will come handy to spot performance regressions.
1 parent 8a345d5 commit 10b2c38

File tree

2 files changed

+244
-1
lines changed

2 files changed

+244
-1
lines changed

Framework/Core/CMakeLists.txt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -250,7 +250,9 @@ set(TEST_SRCS
250250
test/test_WorkflowHelpers.cxx
251251
)
252252

253-
set(BENCH_SRCS test/benchmark_DataDescriptorMatcher.cxx)
253+
set(BENCH_SRCS
254+
test/benchmark_DataDescriptorMatcher.cxx
255+
test/benchmark_DataRelayer.cxx)
254256

255257
O2_GENERATE_TESTS(
256258
MODULE_LIBRARY_NAME ${LIBRARY_NAME}
Lines changed: 241 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,241 @@
1+
// Copyright CERN and copyright holders of ALICE O2. This software is
2+
// distributed under the terms of the GNU General Public License v3 (GPL
3+
// Version 3), copied verbatim in the file "COPYING".
4+
//
5+
// See http://alice-o2.web.cern.ch/license for full licensing information.
6+
//
7+
// In applying this license CERN does not waive the privileges and immunities
8+
// granted to it by virtue of its status as an Intergovernmental Organization
9+
// or submit itself to any jurisdiction.
10+
#include <benchmark/benchmark.h>
11+
12+
#include "Headers/DataHeader.h"
13+
#include "Headers/Stack.h"
14+
#include "Framework/CompletionPolicyHelpers.h"
15+
#include "Framework/DataRelayer.h"
16+
#include "Framework/DataProcessingHeader.h"
17+
#include <Monitoring/Monitoring.h>
18+
#include <fairmq/FairMQTransportFactory.h>
19+
#include <cstring>
20+
21+
using Monitoring = o2::monitoring::Monitoring;
22+
using namespace o2::framework;
23+
using DataHeader = o2::header::DataHeader;
24+
using Stack = o2::header::Stack;
25+
26+
// A simple test where an input is provided
27+
// and the subsequent InputRecord is immediately requested.
28+
static void BM_RelayMessageCreation(benchmark::State& state)
29+
{
30+
Monitoring metrics;
31+
InputSpec spec{ "clusters", "TPC", "CLUSTERS" };
32+
33+
std::vector<InputRoute> inputs = {
34+
InputRoute{ spec, "Fake", 0 }
35+
};
36+
37+
std::vector<ForwardRoute> forwards;
38+
TimesliceIndex index;
39+
40+
auto policy = CompletionPolicyHelpers::consumeWhenAny();
41+
DataRelayer relayer(policy, inputs, forwards, metrics, index);
42+
relayer.setPipelineLength(4);
43+
44+
// Let's create a dummy O2 Message with two headers in the stack:
45+
// - DataHeader matching the one provided in the input
46+
DataHeader dh;
47+
dh.dataDescription = "CLUSTERS";
48+
dh.dataOrigin = "TPC";
49+
dh.subSpecification = 0;
50+
51+
DataProcessingHeader dph{ 0, 1 };
52+
Stack stack{ dh, dph };
53+
auto transport = FairMQTransportFactory::CreateTransportFactory("zeromq");
54+
55+
for (auto _ : state) {
56+
// FIXME: Understand why pausing the timer makes it slower..
57+
//state.PauseTiming();
58+
FairMQMessagePtr header = transport->CreateMessage(stack.size());
59+
FairMQMessagePtr payload = transport->CreateMessage(1000);
60+
memcpy(header->GetData(), stack.data(), stack.size());
61+
//state.ResumeTiming();
62+
}
63+
// One for the header, one for the payload
64+
}
65+
66+
BENCHMARK(BM_RelayMessageCreation);
67+
68+
// A simple test where an input is provided
69+
// and the subsequent InputRecord is immediately requested.
70+
static void BM_RelaySingleSlot(benchmark::State& state)
71+
{
72+
Monitoring metrics;
73+
InputSpec spec{ "clusters", "TPC", "CLUSTERS" };
74+
75+
std::vector<InputRoute> inputs = {
76+
InputRoute{ spec, "Fake", 0 }
77+
};
78+
79+
std::vector<ForwardRoute> forwards;
80+
TimesliceIndex index;
81+
82+
auto policy = CompletionPolicyHelpers::consumeWhenAny();
83+
DataRelayer relayer(policy, inputs, forwards, metrics, index);
84+
relayer.setPipelineLength(4);
85+
86+
// Let's create a dummy O2 Message with two headers in the stack:
87+
// - DataHeader matching the one provided in the input
88+
DataHeader dh;
89+
dh.dataDescription = "CLUSTERS";
90+
dh.dataOrigin = "TPC";
91+
dh.subSpecification = 0;
92+
93+
DataProcessingHeader dph{ 0, 1 };
94+
Stack stack{ dh, dph };
95+
auto transport = FairMQTransportFactory::CreateTransportFactory("zeromq");
96+
97+
for (auto _ : state) {
98+
// FIXME: Understand why pausing the timer makes it slower..
99+
//state.PauseTiming();
100+
FairMQMessagePtr header = transport->CreateMessage(stack.size());
101+
FairMQMessagePtr payload = transport->CreateMessage(1000);
102+
memcpy(header->GetData(), stack.data(), stack.size());
103+
//state.ResumeTiming();
104+
105+
relayer.relay(std::move(header), std::move(payload));
106+
auto ready = relayer.getReadyToProcess();
107+
assert(ready.size() == 1);
108+
assert(ready[0].slot.index == 0);
109+
assert(ready[0].op == CompletionPolicy::CompletionOp::Consume);
110+
auto result = relayer.getInputsForTimeslice(ready[0].slot);
111+
assert(result.size() == 2);
112+
}
113+
// One for the header, one for the payload
114+
}
115+
116+
BENCHMARK(BM_RelaySingleSlot);
117+
118+
// This one will simulate a single input.
119+
static void BM_RelayMultipleSlots(benchmark::State& state)
120+
{
121+
Monitoring metrics;
122+
InputSpec spec{ "clusters", "TPC", "CLUSTERS" };
123+
124+
std::vector<InputRoute> inputs = {
125+
InputRoute{ spec, "Fake", 0 }
126+
};
127+
128+
std::vector<ForwardRoute> forwards;
129+
TimesliceIndex index;
130+
131+
auto policy = CompletionPolicyHelpers::consumeWhenAny();
132+
DataRelayer relayer(policy, inputs, forwards, metrics, index);
133+
relayer.setPipelineLength(4);
134+
135+
// Let's create a dummy O2 Message with two headers in the stack:
136+
// - DataHeader matching the one provided in the input
137+
DataHeader dh;
138+
dh.dataDescription = "CLUSTERS";
139+
dh.dataOrigin = "TPC";
140+
dh.subSpecification = 0;
141+
142+
auto transport = FairMQTransportFactory::CreateTransportFactory("zeromq");
143+
size_t timeslice = 0;
144+
145+
for (auto _ : state) {
146+
// FIXME: Understand why pausing the timer makes it slower..
147+
//state.PauseTiming();
148+
149+
DataProcessingHeader dph{ timeslice++, 1 };
150+
Stack stack{ dh, dph };
151+
FairMQMessagePtr header = transport->CreateMessage(stack.size());
152+
FairMQMessagePtr payload = transport->CreateMessage(1000);
153+
154+
memcpy(header->GetData(), stack.data(), stack.size());
155+
//state.ResumeTiming();
156+
157+
relayer.relay(std::move(header), std::move(payload));
158+
auto ready = relayer.getReadyToProcess();
159+
assert(ready.size() == 1);
160+
assert(ready[0].op == CompletionPolicy::CompletionOp::Consume);
161+
auto result = relayer.getInputsForTimeslice(ready[0].slot);
162+
assert(result.size() == 2);
163+
}
164+
// One for the header, one for the payload
165+
}
166+
167+
BENCHMARK(BM_RelayMultipleSlots);
168+
169+
/// In this case we have a record with two entries
170+
static void BM_RelayMultipleRoutes(benchmark::State& state)
171+
{
172+
Monitoring metrics;
173+
InputSpec spec1{ "clusters", "TPC", "CLUSTERS" };
174+
InputSpec spec2{ "tracks", "TPC", "TRACKS" };
175+
176+
std::vector<InputRoute> inputs = {
177+
InputRoute{ spec1, "Fake1", 0 },
178+
InputRoute{ spec2, "Fake2", 0 }
179+
};
180+
181+
std::vector<ForwardRoute> forwards;
182+
TimesliceIndex index;
183+
184+
auto policy = CompletionPolicyHelpers::consumeWhenAny();
185+
DataRelayer relayer(policy, inputs, forwards, metrics, index);
186+
relayer.setPipelineLength(4);
187+
188+
// Let's create a dummy O2 Message with two headers in the stack:
189+
// - DataHeader matching the one provided in the input
190+
DataHeader dh1;
191+
dh1.dataDescription = "CLUSTERS";
192+
dh1.dataOrigin = "TPC";
193+
dh1.subSpecification = 0;
194+
195+
DataHeader dh2;
196+
dh2.dataDescription = "TRACKS";
197+
dh2.dataOrigin = "TPC";
198+
dh2.subSpecification = 0;
199+
200+
auto transport = FairMQTransportFactory::CreateTransportFactory("zeromq");
201+
size_t timeslice = 0;
202+
203+
for (auto _ : state) {
204+
// FIXME: Understand why pausing the timer makes it slower..
205+
//state.PauseTiming();
206+
207+
DataProcessingHeader dph1{ timeslice, 1 };
208+
Stack stack1{ dh1, dph1 };
209+
210+
FairMQMessagePtr header1 = transport->CreateMessage(stack1.size());
211+
FairMQMessagePtr payload1 = transport->CreateMessage(1000);
212+
213+
memcpy(header1->GetData(), stack1.data(), stack1.size());
214+
215+
DataProcessingHeader dph2{ timeslice, 1 };
216+
Stack stack2{ dh2, dph2 };
217+
218+
FairMQMessagePtr header2 = transport->CreateMessage(stack2.size());
219+
FairMQMessagePtr payload2 = transport->CreateMessage(1000);
220+
221+
memcpy(header2->GetData(), stack2.data(), stack2.size());
222+
//state.ResumeTiming();
223+
224+
relayer.relay(std::move(header1), std::move(payload1));
225+
auto ready = relayer.getReadyToProcess();
226+
assert(ready.size() == 1);
227+
assert(ready[0].op == CompletionPolicy::CompletionOp::Consume);
228+
229+
relayer.relay(std::move(header2), std::move(payload2));
230+
ready = relayer.getReadyToProcess();
231+
assert(ready.size() == 1);
232+
assert(ready[0].op == CompletionPolicy::CompletionOp::Consume);
233+
auto result = relayer.getInputsForTimeslice(ready[0].slot);
234+
assert(result.size() == 4);
235+
}
236+
// One for the header, one for the payload
237+
}
238+
239+
BENCHMARK(BM_RelayMultipleRoutes);
240+
241+
BENCHMARK_MAIN()

0 commit comments

Comments
 (0)