Skip to content

Commit 9cb10c0

Browse files
committed
Executable for splitting large raw-data files into chunks
Allows one to split a single large raw-data file, or a group of them (with many TFs), into chunks each containing at most N TFs. One can also change the grouping of the output files, e.g.: o2-raw-file-split --detect-tf0 --input-conf raw.cfg -n 10 -o split/ch --file-for cru will regroup the data into chunks of at most 10 TFs each, written to the split/ch_0 ... split/ch_N directories, creating a single output file per CRU. The configuration file split/ch_X/<DET>raw.cfg will be created for every chunk.
1 parent ea5fd7c commit 9cb10c0

File tree

4 files changed

+220
-2
lines changed

4 files changed

+220
-2
lines changed

Detectors/Raw/CMakeLists.txt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,18 @@ o2_add_executable(file-check
3535
PUBLIC_LINK_LIBRARIES O2::DetectorsRaw
3636
Boost::program_options)
3737

38+
# Executable of the "raw" component built from src/rawfileSplit.cxx and linked against O2::DetectorsRaw
o2_add_executable(file-split
39+
COMPONENT_NAME raw
40+
SOURCES src/rawfileSplit.cxx
41+
PUBLIC_LINK_LIBRARIES O2::DetectorsRaw)
42+
3843
o2_add_executable(file-reader-workflow
3944
COMPONENT_NAME raw
4045
SOURCES src/rawfile-reader-workflow.cxx
4146
src/RawFileReaderWorkflow.cxx
4247
PUBLIC_LINK_LIBRARIES O2::DetectorsRaw)
4348

49+
4450
o2_add_test(HBFUtils
4551
PUBLIC_LINK_LIBRARIES O2::DetectorsRaw
4652
O2::Steer

Detectors/Raw/include/DetectorsRaw/RawFileWriter.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,8 @@ class RawFileWriter
240240
return it->first;
241241
}
242242

243+
/// Access the output file entry stored in mFName2File under the file name of the given link.
/// NOTE(review): the lookup uses operator[]; presumably the link was registered beforehand so
/// that the entry already exists - confirm against the container type of mFName2File.
OutputFile& getOutputFileForLink(const LinkData& lnk)
{
  const auto& outFileName = lnk.fileName;
  return mFName2File[outFileName];
}
244+
243245
int getSuperPageSize() const { return mSuperPageSize; }
244246
void setSuperPageSize(int nbytes);
245247

Detectors/Raw/src/RawFileWriter.cxx

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -613,8 +613,7 @@ void RawFileWriter::LinkData::fillEmptyHBHs(const IR& ir, bool dataAdded)
613613
std::string RawFileWriter::LinkData::describe() const
614614
{
615615
std::stringstream ss;
616-
ss << "Link SubSpec=0x" << std::hex << std::setw(8) << std::setfill('0')
617-
<< RDHUtils::getSubSpec(rdhCopy) << std::dec
616+
ss << "Link SubSpec=0x" << std::hex << std::setw(8) << std::setfill('0') << subspec << std::dec
618617
<< '(' << std::setw(3) << int(RDHUtils::getCRUID(rdhCopy)) << ':' << std::setw(2) << int(RDHUtils::getLinkID(rdhCopy)) << ':'
619618
<< int(RDHUtils::getEndPointID(rdhCopy)) << ") feeID=0x" << std::hex << std::setw(4) << std::setfill('0') << RDHUtils::getFEEID(rdhCopy);
620619
return ss.str();

Detectors/Raw/src/rawfileSplit.cxx

Lines changed: 211 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,211 @@
1+
// Copyright CERN and copyright holders of ALICE O2. This software is
2+
// distributed under the terms of the GNU General Public License v3 (GPL
3+
// Version 3), copied verbatim in the file "COPYING".
4+
//
5+
// See http://alice-o2.web.cern.ch/license for full licensing information.
6+
//
7+
// In applying this license CERN does not waive the privileges and immunities
8+
// granted to it by virtue of its status as an Intergovernmental Organization
9+
// or submit itself to any jurisdiction.
10+
11+
/// @file rawfileSplit.cxx
12+
/// @author ruben.shahoyan@cern.ch
13+
/// @brief Splitter of large raw data files into chunks with a limited number of TFs
14+
15+
#include "DetectorsRaw/RawFileReader.h"
16+
#include "DetectorsRaw/RawFileWriter.h"
17+
#include "CommonUtils/ConfigurableParam.h"
18+
#include "CommonUtils/StringUtils.h"
19+
#include "Framework/Logger.h"
20+
#include <TStopwatch.h>
21+
#include <boost/program_options.hpp>
22+
#include <iostream>
23+
#include <string>
24+
#include <vector>
25+
#include <TSystem.h>
26+
27+
namespace bpo = boost::program_options;
28+
29+
using namespace o2::raw;
30+
31+
int main(int argc, char* argv[])
32+
{
33+
RawFileReader reader;
34+
std::vector<std::string> fnames;
35+
std::string config, configKeyValues;
36+
bpo::variables_map vm;
37+
bpo::options_description descOpt("Options");
38+
auto desc_add_option = descOpt.add_options();
39+
desc_add_option("help,h", "print this help message.");
40+
desc_add_option("input-conf,c", bpo::value(&config)->default_value(""), "read input from configuration file");
41+
desc_add_option("max-tf,m", bpo::value<uint32_t>()->default_value(0xffffffff), " ID to read (counts from 0)");
42+
desc_add_option("verbosity,v", bpo::value<int>()->default_value(reader.getVerbosity()), "1: long report, 2 or 3: print or dump all RDH");
43+
desc_add_option("spsize,s", bpo::value<int>()->default_value(reader.getNominalSPageSize()), "nominal super-page size in bytes");
44+
desc_add_option("buffer-size,b", bpo::value<size_t>()->default_value(reader.getNominalSPageSize()), "buffer size for files preprocessing");
45+
desc_add_option("detect-tf0", "autodetect HBFUtils start Orbit/BC from 1st TF seen");
46+
desc_add_option("rorc", "impose RORC as default detector mode");
47+
desc_add_option("tfs-per-chunk,n", bpo::value<uint32_t>()->default_value(0xffffffff), " number of output TFs per chunk");
48+
desc_add_option("output-dir-prefix,o", bpo::value<std::string>()->default_value("./chunk"), "output directory prefix for raw data chunk (chunk ID will be added)");
49+
desc_add_option("file-for,f", bpo::value<std::string>()->default_value("all"), "single file per: all,cru,link");
50+
51+
desc_add_option("configKeyValues", bpo::value(&configKeyValues)->default_value(""), "semicolon separated key=value strings");
52+
for (int i = 0; i < RawFileReader::NErrorsDefined; i++) {
53+
auto ei = RawFileReader::ErrTypes(i);
54+
desc_add_option(RawFileReader::nochk_opt(ei).c_str(), RawFileReader::nochk_expl(ei).c_str());
55+
}
56+
57+
bpo::options_description hiddenOpt("hidden");
58+
hiddenOpt.add_options()("files", bpo::value(&fnames)->composing(), "");
59+
60+
bpo::options_description fullOpt("cmd");
61+
fullOpt.add(descOpt).add(hiddenOpt);
62+
63+
bpo::positional_options_description posOpt;
64+
posOpt.add("files", -1);
65+
66+
auto printHelp = [&](std::ostream& stream) {
67+
stream << "Usage: " << argv[0] << " [options] file0 [... fileN]" << std::endl;
68+
stream << descOpt << std::endl;
69+
stream << " (input files are optional if config file was provided)" << std::endl;
70+
};
71+
72+
try {
73+
bpo::store(bpo::command_line_parser(argc, argv)
74+
.options(fullOpt)
75+
.positional(posOpt)
76+
.allow_unregistered()
77+
.run(),
78+
vm);
79+
bpo::notify(vm);
80+
if (argc == 1 || vm.count("help") || (fnames.empty() && config.empty())) {
81+
printHelp(std::cout);
82+
return 0;
83+
}
84+
o2::conf::ConfigurableParam::updateFromString(configKeyValues);
85+
} catch (const bpo::error& e) {
86+
std::cerr << e.what() << "\n\n";
87+
std::cerr << "Error parsing command line arguments\n";
88+
printHelp(std::cerr);
89+
return -1;
90+
}
91+
92+
RawFileReader::RDH rdh;
93+
LOG(INFO) << "RawDataHeader v" << int(rdh.version) << " is assumed";
94+
95+
RawFileReader::ReadoutCardType rocard = vm.count("rorc") ? RawFileReader::ReadoutCardType::RORC : RawFileReader::ReadoutCardType::CRU;
96+
97+
reader.setVerbosity(vm["verbosity"].as<int>());
98+
reader.setNominalSPageSize(vm["spsize"].as<int>());
99+
reader.setMaxTFToRead(vm["max-tf"].as<uint32_t>());
100+
reader.setBufferSize(vm["buffer-size"].as<size_t>());
101+
reader.setDefaultReadoutCardType(rocard);
102+
reader.setTFAutodetect(vm.count("detect-tf0") ? RawFileReader::FirstTFDetection::Pending : RawFileReader::FirstTFDetection::Disabled);
103+
104+
std::string_view fileFor = vm["file-for"].as<std::string>();
105+
106+
uint32_t errmap = 0;
107+
for (int i = RawFileReader::NErrorsDefined; i--;) {
108+
auto ei = RawFileReader::ErrTypes(i);
109+
if (RawFileReader::ErrCheckDefaults[i]) {
110+
errmap |= 0x1 << i;
111+
}
112+
if (vm.count(RawFileReader::nochk_opt(ei).c_str())) { // toggle
113+
errmap ^= 0x1 << i;
114+
}
115+
LOG(INFO) << ((errmap & (0x1 << i)) ? "apply " : "ignore") << " check for " << RawFileReader::ErrNames[i].data();
116+
}
117+
118+
if (!config.empty()) {
119+
auto inp = RawFileReader::parseInput(config);
120+
reader.loadFromInputsMap(inp);
121+
}
122+
123+
for (int i = 0; i < fnames.size(); i++) {
124+
reader.addFile(fnames[i]);
125+
}
126+
127+
TStopwatch sw;
128+
sw.Start();
129+
130+
reader.setCheckErrors(errmap);
131+
reader.init();
132+
133+
sw.Print();
134+
int maxTFPerChunk = vm["tfs-per-chunk"].as<uint32_t>();
135+
std::string outDirPrefix = vm["output-dir-prefix"].as<std::string>(), outDir = "";
136+
int ntf = reader.getNTimeFrames();
137+
int nlinks = reader.getNLinks();
138+
std::vector<RawFileReader::PartStat> partsSP;
139+
std::vector<char> buffer;
140+
std::unique_ptr<RawFileWriter> writer;
141+
int chunkID = -1;
142+
143+
for (int itf = 0; itf < ntf; itf++) {
144+
reader.setNextTFToRead(itf);
145+
bool reinitWriter = false;
146+
if ((itf % maxTFPerChunk) == 0) {
147+
reinitWriter = true;
148+
chunkID++;
149+
}
150+
for (int il = 0; il < nlinks; il++) {
151+
auto& link = reader.getLink(il);
152+
if (!link.rewindToTF(itf)) {
153+
continue; // this link has no data for wanted TF
154+
}
155+
int nParts = link.getNextTFSuperPagesStat(partsSP);
156+
for (int ip = 0; ip < nParts; ip++) {
157+
buffer.resize(partsSP[ip].size);
158+
auto bread = link.readNextSuperPage(buffer.data(), &partsSP[ip]);
159+
if (bread != partsSP[ip].size) {
160+
LOG(ERROR) << "Link " << il << " read " << bread << " bytes instead of " << partsSP[ip].size << " expected in TF=" << itf << " part=" << ip;
161+
}
162+
163+
if (reinitWriter) {
164+
if (writer) { // generate config for previous chunk
165+
writer->writeConfFile(writer->getOrigin().str, "RAWDATA", o2::utils::concat_string(outDir, '/', writer->getOrigin().str, "raw.cfg"));
166+
}
167+
outDir = o2::utils::concat_string(outDirPrefix, "_", std::to_string(chunkID));
168+
if (gSystem->AccessPathName(outDir.data())) {
169+
if (gSystem->mkdir(outDir.data(), kTRUE)) {
170+
LOG(FATAL) << "could not create output directory " << outDir;
171+
} else {
172+
LOG(INFO) << "created output directory " << outDir;
173+
}
174+
}
175+
writer = std::make_unique<RawFileWriter>(link.origin, link.cruDetector);
176+
writer->useRDHVersion(RDHUtils::getVersion(link.rdhl));
177+
reinitWriter = false;
178+
}
179+
if (!writer->isLinkRegistered(RDHUtils::getSubSpec(RDHUtils::getCRUID(link.rdhl), RDHUtils::getLinkID(link.rdhl), RDHUtils::getEndPointID(link.rdhl), RDHUtils::getFEEID(link.rdhl)))) { // register the output link
180+
std::string outFileName;
181+
182+
if (fileFor == "all") { // single file for all links
183+
outFileName = o2::utils::concat_string(outDir, "/", fileFor, ".raw");
184+
} else if (fileFor == "cru") {
185+
outFileName = o2::utils::concat_string(outDir, "/", fileFor, "_", std::to_string(RDHUtils::getCRUID(link.rdhl)), ".raw");
186+
} else if (fileFor == "link") {
187+
outFileName = o2::utils::concat_string(outDir, "/", fileFor,
188+
"_", std::to_string(RDHUtils::getLinkID(link.rdhl)),
189+
"_cru", std::to_string(RDHUtils::getCRUID(link.rdhl)),
190+
"_ep", std::to_string(RDHUtils::getEndPointID(link.rdhl)),
191+
"_feeid", std::to_string(RDHUtils::getFEEID(link.rdhl)), ".raw");
192+
} else {
193+
throw std::runtime_error("invalid option provided for file grouping");
194+
}
195+
196+
writer->registerLink(link.rdhl, outFileName);
197+
}
198+
199+
auto& linkW = writer->getLinkWithSubSpec(link.rdhl);
200+
auto& outF = writer->getOutputFileForLink(linkW);
201+
outF.write(buffer.data(), bread);
202+
}
203+
}
204+
}
205+
if (writer) { // generate config for previous chunk
206+
writer->writeConfFile(writer->getOrigin().str, "RAWDATA", o2::utils::concat_string(outDir, '/', writer->getOrigin().str, "raw.cfg"));
207+
}
208+
writer.reset();
209+
210+
return 0;
211+
}

0 commit comments

Comments
 (0)