Skip to content

Commit 6f841a6

Browse files
committed
DPL: move ROOT file reading completely to the plugin
This way we can have the plugin depend on JAlien and not require everyone does.
1 parent f04c23d commit 6f841a6

File tree

8 files changed

+354
-247
lines changed

8 files changed

+354
-247
lines changed

Framework/AnalysisSupport/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,5 +12,6 @@
1212
# Drop this once we move to GCC 8.2+
1313
o2_add_library(FrameworkAnalysisSupport
1414
SOURCES src/Plugin.cxx
15+
src/AODJAlienReaderHelpers.cxx
1516
PRIVATE_INCLUDE_DIRECTORIES ${CMAKE_CURRENT_LIST_DIR}/src
1617
PUBLIC_LINK_LIBRARIES O2::Framework)
Lines changed: 323 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,323 @@
1+
// Copyright CERN and copyright holders of ALICE O2. This software is
2+
// distributed under the terms of the GNU General Public License v3 (GPL
3+
// Version 3), copied verbatim in the file "COPYING".
4+
//
5+
// See http://alice-o2.web.cern.ch/license for full licensing information.
6+
//
7+
// In applying this license CERN does not waive the privileges and immunities
8+
// granted to it by virtue of its status as an Intergovernmental Organization
9+
// or submit itself to any jurisdiction.
10+
11+
#include "AODJAlienReaderHelpers.h"
12+
#include "Framework/TableTreeHelpers.h"
13+
#include "Framework/AnalysisHelpers.h"
14+
#include "Framework/RootTableBuilderHelpers.h"
15+
#include "Framework/AlgorithmSpec.h"
16+
#include "Framework/ConfigParamRegistry.h"
17+
#include "Framework/ControlService.h"
18+
#include "Framework/CallbackService.h"
19+
#include "Framework/EndOfStreamContext.h"
20+
#include "Framework/DeviceSpec.h"
21+
#include "Framework/RawDeviceService.h"
22+
#include "Framework/DataSpecUtils.h"
23+
#include "Framework/DataInputDirector.h"
24+
#include "Framework/SourceInfoHeader.h"
25+
#include "Framework/ChannelInfo.h"
26+
#include "Framework/Logger.h"
27+
28+
#include <Monitoring/Monitoring.h>
29+
30+
#include <ROOT/RDataFrame.hxx>
31+
#include <TGrid.h>
32+
#include <TFile.h>
33+
#include <TTreeCache.h>
34+
#include <TTreePerfStats.h>
35+
36+
#include <arrow/ipc/reader.h>
37+
#include <arrow/ipc/writer.h>
38+
#include <arrow/io/interfaces.h>
39+
#include <arrow/table.h>
40+
#include <arrow/util/key_value_metadata.h>
41+
42+
#include <thread>
43+
44+
using namespace o2;
45+
using namespace o2::aod;
46+
47+
struct RuntimeWatchdog {
48+
int numberTimeFrames;
49+
uint64_t startTime;
50+
uint64_t lastTime;
51+
double runTime;
52+
uint64_t runTimeLimit;
53+
54+
RuntimeWatchdog(Long64_t limit)
55+
{
56+
numberTimeFrames = -1;
57+
startTime = uv_hrtime();
58+
lastTime = startTime;
59+
runTime = 0.;
60+
runTimeLimit = limit;
61+
}
62+
63+
bool update()
64+
{
65+
numberTimeFrames++;
66+
if (runTimeLimit <= 0) {
67+
return true;
68+
}
69+
70+
auto nowTime = uv_hrtime();
71+
72+
// time spent to process the time frame
73+
double time_spent = numberTimeFrames < 1 ? (double)(nowTime - lastTime) / 1.E9 : 0.;
74+
runTime += time_spent;
75+
lastTime = nowTime;
76+
77+
return ((double)(lastTime - startTime) / 1.E9 + runTime / (numberTimeFrames + 1)) < runTimeLimit;
78+
}
79+
80+
void printOut()
81+
{
82+
LOGP(INFO, "RuntimeWatchdog");
83+
LOGP(INFO, " run time limit: {}", runTimeLimit);
84+
LOGP(INFO, " number of time frames: {}", numberTimeFrames);
85+
LOGP(INFO, " estimated run time per time frame: {}", (numberTimeFrames >= 0) ? runTime / (numberTimeFrames + 1) : 0.);
86+
LOGP(INFO, " estimated total run time: {}", (double)(lastTime - startTime) / 1.E9 + ((numberTimeFrames >= 0) ? runTime / (numberTimeFrames + 1) : 0.));
87+
}
88+
};
89+
90+
template <typename... C>
91+
static constexpr auto columnNamesTrait(framework::pack<C...>)
92+
{
93+
return std::vector<std::string>{C::columnLabel()...};
94+
}
95+
96+
std::vector<std::string> getColumnNames(header::DataHeader dh)
97+
{
98+
auto description = std::string(dh.dataDescription.str);
99+
auto origin = std::string(dh.dataOrigin.str);
100+
101+
// get column names
102+
// AOD / RN2
103+
if (origin == "AOD") {
104+
if (description == "TRACK:PAR") {
105+
return columnNamesTrait(typename StoredTracksMetadata::table_t::persistent_columns_t{});
106+
} else if (description == "TRACK:PARCOV") {
107+
return columnNamesTrait(typename StoredTracksCovMetadata::table_t::persistent_columns_t{});
108+
} else if (description == "TRACK:EXTRA") {
109+
return columnNamesTrait(typename TracksExtraMetadata::table_t::persistent_columns_t{});
110+
}
111+
}
112+
113+
// default: column names = {}
114+
return std::vector<std::string>({});
115+
}
116+
117+
using o2::monitoring::Metric;
118+
using o2::monitoring::Monitoring;
119+
using o2::monitoring::tags::Key;
120+
using o2::monitoring::tags::Value;
121+
122+
namespace o2::framework::readers
123+
{
124+
auto setEOSCallback(InitContext& ic)
125+
{
126+
ic.services().get<CallbackService>().set(CallbackService::Id::EndOfStream,
127+
[](EndOfStreamContext& eosc) {
128+
auto& control = eosc.services().get<ControlService>();
129+
control.endOfStream();
130+
control.readyToQuit(QuitRequest::Me);
131+
});
132+
}
133+
134+
template <typename O>
135+
static inline auto extractTypedOriginal(ProcessingContext& pc)
136+
{
137+
///FIXME: this should be done in invokeProcess() as some of the originals may be compound tables
138+
return O{pc.inputs().get<TableConsumer>(aod::MetadataTrait<O>::metadata::tableLabel())->asArrowTable()};
139+
}
140+
141+
template <typename... Os>
142+
static inline auto extractOriginalsTuple(framework::pack<Os...>, ProcessingContext& pc)
143+
{
144+
return std::make_tuple(extractTypedOriginal<Os>(pc)...);
145+
}
146+
147+
AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback()
148+
{
149+
auto callback = AlgorithmSpec{adaptStateful([](ConfigParamRegistry const& options,
150+
DeviceSpec const& spec,
151+
Monitoring& monitoring) {
152+
monitoring.send(Metric{(uint64_t)0, "arrow-bytes-created"}.addTag(Key::Subsystem, monitoring::tags::Value::DPL));
153+
monitoring.send(Metric{(uint64_t)0, "arrow-messages-created"}.addTag(Key::Subsystem, monitoring::tags::Value::DPL));
154+
monitoring.send(Metric{(uint64_t)0, "arrow-bytes-destroyed"}.addTag(Key::Subsystem, monitoring::tags::Value::DPL));
155+
monitoring.send(Metric{(uint64_t)0, "arrow-messages-destroyed"}.addTag(Key::Subsystem, monitoring::tags::Value::DPL));
156+
monitoring.flushBuffer();
157+
158+
if (!options.isSet("aod-file")) {
159+
LOGP(ERROR, "No input file defined!");
160+
throw std::runtime_error("Processing is stopped!");
161+
}
162+
163+
auto filename = options.get<std::string>("aod-file");
164+
165+
// create a DataInputDirector
166+
auto didir = std::make_shared<DataInputDirector>(filename);
167+
if (options.isSet("aod-reader-json")) {
168+
auto jsonFile = options.get<std::string>("aod-reader-json");
169+
if (!didir->readJson(jsonFile)) {
170+
LOGP(ERROR, "Check the JSON document! Can not be properly parsed!");
171+
}
172+
}
173+
174+
// get the run time watchdog
175+
auto* watchdog = new RuntimeWatchdog(options.get<int64_t>("time-limit"));
176+
177+
// selected the TFN input and
178+
// create list of requested tables
179+
header::DataHeader TFNumberHeader;
180+
std::vector<OutputRoute> requestedTables;
181+
std::vector<OutputRoute> routes(spec.outputs);
182+
for (auto route : routes) {
183+
if (DataSpecUtils::partialMatch(route.matcher, header::DataOrigin("TFN"))) {
184+
auto concrete = DataSpecUtils::asConcreteDataMatcher(route.matcher);
185+
TFNumberHeader = header::DataHeader(concrete.description, concrete.origin, concrete.subSpec);
186+
} else {
187+
requestedTables.emplace_back(route);
188+
}
189+
}
190+
191+
auto fileCounter = std::make_shared<int>(0);
192+
auto numTF = std::make_shared<int>(-1);
193+
return adaptStateless([TFNumberHeader,
194+
requestedTables,
195+
fileCounter,
196+
numTF,
197+
watchdog,
198+
didir](Monitoring& monitoring, DataAllocator& outputs, ControlService& control, DeviceSpec const& device) {
199+
// check if RuntimeLimit is reached
200+
if (!watchdog->update()) {
201+
LOGP(INFO, "Run time exceeds run time limit of {} seconds!", watchdog->runTimeLimit);
202+
LOGP(INFO, "Stopping reader {} after time frame {}.", device.inputTimesliceId, watchdog->numberTimeFrames - 1);
203+
monitoring.flushBuffer();
204+
didir->closeInputFiles();
205+
control.endOfStream();
206+
control.readyToQuit(QuitRequest::Me);
207+
return;
208+
}
209+
210+
// Each parallel reader device.inputTimesliceId reads the files fileCounter*device.maxInputTimeslices+device.inputTimesliceId
211+
// the TF to read is numTF
212+
assert(device.inputTimesliceId < device.maxInputTimeslices);
213+
uint64_t timeFrameNumber = 0;
214+
int fcnt = (*fileCounter * device.maxInputTimeslices) + device.inputTimesliceId;
215+
int ntf = *numTF + 1;
216+
monitoring.send(Metric{(uint64_t)ntf, "tf-sent"}.addTag(Key::Subsystem, monitoring::tags::Value::DPL));
217+
static int currentFileCounter = -1;
218+
static int filesProcessed = 0;
219+
if (currentFileCounter != *fileCounter) {
220+
currentFileCounter = *fileCounter;
221+
monitoring.send(Metric{(uint64_t)++filesProcessed, "files-opened"}.addTag(Key::Subsystem, monitoring::tags::Value::DPL));
222+
}
223+
224+
// loop over requested tables
225+
TTree* tr = nullptr;
226+
bool first = true;
227+
static size_t totalSizeUncompressed = 0;
228+
static size_t totalSizeCompressed = 0;
229+
static size_t totalReadCalls = 0;
230+
231+
for (auto route : requestedTables) {
232+
233+
// create header
234+
auto concrete = DataSpecUtils::asConcreteDataMatcher(route.matcher);
235+
auto dh = header::DataHeader(concrete.description, concrete.origin, concrete.subSpec);
236+
237+
// create a TreeToTable object
238+
auto info = didir->getFileFolder(dh, fcnt, ntf);
239+
size_t before = 0;
240+
if (info.file) {
241+
info.file->GetReadCalls();
242+
}
243+
tr = didir->getDataTree(dh, fcnt, ntf);
244+
if (!tr) {
245+
if (first) {
246+
// check if there is a next file to read
247+
fcnt += device.maxInputTimeslices;
248+
if (didir->atEnd(fcnt)) {
249+
LOGP(INFO, "No input files left to read for reader {}!", device.inputTimesliceId);
250+
didir->closeInputFiles();
251+
control.endOfStream();
252+
control.readyToQuit(QuitRequest::Me);
253+
return;
254+
}
255+
// get first folder of next file
256+
ntf = 0;
257+
tr = didir->getDataTree(dh, fcnt, ntf);
258+
if (!tr) {
259+
LOGP(FATAL, "Can not retrieve tree for table {}: fileCounter {}, timeFrame {}", concrete.origin, fcnt, ntf);
260+
throw std::runtime_error("Processing is stopped!");
261+
}
262+
} else {
263+
LOGP(FATAL, "Can not retrieve tree for table {}: fileCounter {}, timeFrame {}", concrete.origin, fcnt, ntf);
264+
throw std::runtime_error("Processing is stopped!");
265+
}
266+
}
267+
TTreePerfStats ps("ioperf", tr);
268+
269+
if (first) {
270+
timeFrameNumber = didir->getTimeFrameNumber(dh, fcnt, ntf);
271+
auto o = Output(TFNumberHeader);
272+
outputs.make<uint64_t>(o) = timeFrameNumber;
273+
}
274+
275+
// create table output
276+
auto o = Output(dh);
277+
auto& t2t = outputs.make<TreeToTable>(o);
278+
279+
// add branches to read
280+
// fill the table
281+
282+
auto colnames = getColumnNames(dh);
283+
if (colnames.size() == 0) {
284+
totalSizeCompressed += tr->GetZipBytes();
285+
totalSizeUncompressed += tr->GetTotBytes();
286+
t2t.addAllColumns(tr);
287+
} else {
288+
for (auto& colname : colnames) {
289+
TBranch* branch = tr->GetBranch(colname.c_str());
290+
totalSizeCompressed += branch->GetZipBytes("*");
291+
totalSizeUncompressed += branch->GetTotBytes("*");
292+
t2t.addColumn(colname.c_str());
293+
}
294+
}
295+
t2t.fill(tr);
296+
if (info.file) {
297+
totalReadCalls += info.file->GetReadCalls() - before;
298+
static std::string currentFileRead = "";
299+
std::string nextFileRead = info.file->GetPath();
300+
if (currentFileRead != nextFileRead) {
301+
currentFileRead = nextFileRead;
302+
monitoring.send(Metric{currentFileRead, "aod-file-read-path"}.addTag(Key::Subsystem, monitoring::tags::Value::DPL));
303+
}
304+
}
305+
monitoring.send(Metric{(double)ps.GetReadCalls(), "aod-tree-read-calls"}.addTag(Key::Subsystem, monitoring::tags::Value::DPL));
306+
delete tr;
307+
308+
first = false;
309+
}
310+
monitoring.send(Metric{(uint64_t)totalSizeUncompressed / 1000, "aod-bytes-read-uncompressed"}.addTag(Key::Subsystem, monitoring::tags::Value::DPL));
311+
monitoring.send(Metric{(uint64_t)totalSizeCompressed / 1000, "aod-bytes-read-compressed"}.addTag(Key::Subsystem, monitoring::tags::Value::DPL));
312+
monitoring.send(Metric{(uint64_t)totalReadCalls, "aod-total-read-calls"}.addTag(Key::Subsystem, monitoring::tags::Value::DPL));
313+
314+
// save file number and time frame
315+
*fileCounter = (fcnt - device.inputTimesliceId) / device.maxInputTimeslices;
316+
*numTF = ntf;
317+
});
318+
})};
319+
320+
return callback;
321+
}
322+
323+
} // namespace o2::framework::readers
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
// Copyright CERN and copyright holders of ALICE O2. This software is
2+
// distributed under the terms of the GNU General Public License v3 (GPL
3+
// Version 3), copied verbatim in the file "COPYING".
4+
//
5+
// See http://alice-o2.web.cern.ch/license for full licensing information.
6+
//
7+
// In applying this license CERN does not waive the privileges and immunities
8+
// granted to it by virtue of its status as an Intergovernmental Organization
9+
// or submit itself to any jurisdiction.
10+
11+
#ifndef O2_FRAMEWORK_AODJALIENREADERHELPERS_H_
12+
#define O2_FRAMEWORK_AODJALIENREADERHELPERS_H_
13+
14+
#include "Framework/TableBuilder.h"
15+
#include "Framework/AlgorithmSpec.h"
16+
#include "Framework/Logger.h"
17+
#include <uv.h>
18+
19+
namespace o2::framework::readers
20+
{
21+
22+
struct AODJAlienReaderHelpers {
23+
static AlgorithmSpec rootFileReaderCallback();
24+
};
25+
26+
} // namespace o2::framework::readers
27+
28+
#endif // O2_FRAMEWORK_AODREADERHELPERS_H_

Framework/AnalysisSupport/src/Plugin.cxx

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,12 @@
99
// or submit itself to any jurisdiction.
1010
#include "Framework/Plugins.h"
1111
#include "Framework/AlgorithmSpec.h"
12-
#include "Framework/AODReaderHelpers.h"
12+
#include "AODJAlienReaderHelpers.h"
1313

1414
struct ROOTFileReader : o2::framework::AlgorithmPlugin {
1515
o2::framework::AlgorithmSpec create() override
1616
{
17-
return o2::framework::readers::AODReaderHelpers::rootFileReaderCallback();
17+
return o2::framework::readers::AODJAlienReaderHelpers::rootFileReaderCallback();
1818
}
1919
};
2020

0 commit comments

Comments
 (0)