Skip to content

Commit 55142b8

Browse files
authored
[MCH] allow sending full HBframes in MCH custom raw page reader (#4286)
This commit adds an option to the MCH CRU page reader that allows to buffer the pages belonging to the same orbit and send them in one single DPL message. The default behavior is to send each CRU page in a separate DPL message.
1 parent e2c6ead commit 55142b8

File tree

1 file changed

+81
-55
lines changed

1 file changed

+81
-55
lines changed

Detectors/MUON/MCH/Workflow/src/cru-page-reader-workflow.cxx

Lines changed: 81 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
// granted to it by virtue of its status as an Intergovernmental Organization
99
// or submit itself to any jurisdiction.
1010

11+
///
1112
///
1213
/// \file cru-page-reader-workflow.cxx
1314
/// \author Andrea Ferrero
@@ -62,6 +63,7 @@ class FileReaderTask
6263
LOG(INFO) << "initializing file reader";
6364
mFrameMax = ic.options().get<int>("nframes");
6465
mPrint = ic.options().get<bool>("print");
66+
mFullHBF = ic.options().get<bool>("full-hbf");
6567

6668
auto inputFileName = ic.options().get<std::string>("infile");
6769
mInputFile.open(inputFileName, std::ios::binary);
@@ -82,80 +84,103 @@ class FileReaderTask
8284
{
8385
/// send one RDH block via DPL
8486
RDH rdh;
87+
char* buf{nullptr};
88+
size_t bufSize{0};
8589

86-
// stop if the required number of frames has been reached
87-
if (mFrameMax == 0) {
88-
pc.services().get<ControlService>().endOfStream();
89-
return;
90-
}
90+
while (true) {
9191

92-
if (mPrint) {
93-
printf("mFrameMax: %d\n", mFrameMax);
94-
}
95-
if (mFrameMax > 0) {
96-
mFrameMax -= 1;
97-
}
92+
// stop if the required number of frames has been reached
93+
if (mFrameMax == 0) {
94+
pc.services().get<ControlService>().endOfStream();
95+
return;
96+
}
9897

99-
// read the next RDH, stop if no more data is available
100-
mInputFile.read((char*)(&rdh), sizeof(RDH));
101-
if (mInputFile.fail()) {
10298
if (mPrint) {
103-
std::cout << "end of file reached" << std::endl;
99+
printf("mFrameMax: %d\n", mFrameMax);
100+
}
101+
if (mFrameMax > 0) {
102+
mFrameMax -= 1;
104103
}
105-
pc.services().get<ControlService>().endOfStream();
106-
return; // probably reached eof
107-
}
108104

109-
// check that the RDH version is ok (only RDH versions from 4 to 6 are supported at the moment)
110-
auto rdhVersion = o2::raw::RDHUtils::getVersion(rdh);
111-
auto rdhHeaderSize = o2::raw::RDHUtils::getHeaderSize(rdh);
112-
if (mPrint) {
113-
std::cout << "header_version=" << (int)rdhVersion << std::endl;
114-
}
115-
if (rdhVersion < 4 || rdhVersion > 6 || rdhHeaderSize != 64) {
116-
return;
117-
}
105+
// read the next RDH, stop if no more data is available
106+
mInputFile.read((char*)(&rdh), sizeof(RDH));
107+
if (mInputFile.fail()) {
108+
if (mPrint) {
109+
std::cout << "end of file reached" << std::endl;
110+
}
111+
pc.services().get<ControlService>().endOfStream();
112+
return; // probably reached eof
113+
}
118114

119-
// get the frame size from the RDH offsetToNext field
120-
auto frameSize = o2::raw::RDHUtils::getOffsetToNext(rdh);
121-
if (mPrint) {
122-
std::cout << "frameSize=" << frameSize << std::endl;
123-
}
115+
// check that the RDH version is ok (only RDH versions from 4 to 6 are supported at the moment)
116+
auto rdhVersion = o2::raw::RDHUtils::getVersion(rdh);
117+
auto rdhHeaderSize = o2::raw::RDHUtils::getHeaderSize(rdh);
118+
if (mPrint) {
119+
std::cout << "header_version=" << (int)rdhVersion << std::endl;
120+
}
121+
if (rdhVersion < 4 || rdhVersion > 6 || rdhHeaderSize != 64) {
122+
return;
123+
}
124124

125-
// stop if the frame size is too small
126-
if (frameSize < rdhHeaderSize) {
127-
std::cout << mFrameMax << " - frameSize too small: " << frameSize << std::endl;
128-
pc.services().get<ControlService>().endOfStream();
129-
return;
130-
}
125+
// get the frame size from the RDH offsetToNext field
126+
auto frameSize = o2::raw::RDHUtils::getOffsetToNext(rdh);
127+
if (mPrint) {
128+
std::cout << "frameSize=" << frameSize << std::endl;
129+
}
131130

132-
// allocate the output buffer
133-
char* buf = (char*)malloc(frameSize);
131+
// stop if the frame size is too small
132+
if (frameSize < rdhHeaderSize) {
133+
std::cout << mFrameMax << " - frameSize too small: " << frameSize << std::endl;
134+
pc.services().get<ControlService>().endOfStream();
135+
return;
136+
}
134137

135-
// copy the RDH into the output buffer
136-
memcpy(buf, &rdh, rdhHeaderSize);
138+
// allocate the output buffer
139+
buf = (char*)realloc(buf, bufSize + frameSize);
140+
if (buf == nullptr) {
141+
std::cout << mFrameMax << " - failed to allocate buffer" << std::endl;
142+
pc.services().get<ControlService>().endOfStream();
143+
return;
144+
}
137145

138-
// read the frame payload into the output buffer
139-
mInputFile.read(buf + rdhHeaderSize, frameSize - rdhHeaderSize);
146+
// copy the RDH into the output buffer
147+
memcpy(buf + bufSize, &rdh, rdhHeaderSize);
140148

141-
// stop if data cannot be read completely
142-
if (mInputFile.fail()) {
143-
if (mPrint) {
144-
std::cout << "end of file reached" << std::endl;
149+
// read the frame payload into the output buffer
150+
mInputFile.read(buf + bufSize + rdhHeaderSize, frameSize - rdhHeaderSize);
151+
152+
// stop if data cannot be read completely
153+
if (mInputFile.fail()) {
154+
if (mPrint) {
155+
std::cout << "end of file reached" << std::endl;
156+
}
157+
free(buf);
158+
pc.services().get<ControlService>().endOfStream();
159+
return; // probably reached eof
145160
}
146-
free(buf);
147-
pc.services().get<ControlService>().endOfStream();
148-
return; // probably reached eof
149-
}
150161

151-
// create the output message
152-
auto freefct = [](void* data, void* /*hint*/) { free(data); };
153-
pc.outputs().adoptChunk(Output{"ROUT", "RAWDATA"}, buf, frameSize, freefct, nullptr);
162+
// increment the total buffer size
163+
bufSize += frameSize;
164+
165+
auto stopBit = o2::raw::RDHUtils::getStop(rdh);
166+
167+
// when requesting full HBframes, the output message is sent only when the stop RDH is reached
168+
// otherwise we send one message for each CRU page
169+
if ((stopBit != 0) || (mFullHBF == false)) {
170+
// create the output message
171+
auto freefct = [](void* data, void* /*hint*/) { free(data); };
172+
pc.outputs().adoptChunk(Output{"ROUT", "RAWDATA"}, buf, bufSize, freefct, nullptr);
173+
174+
// stop the readout loop
175+
break;
176+
}
177+
} // while (true)
154178
}
155179

156180
private:
157181
std::ifstream mInputFile{}; ///< input file
158182
int mFrameMax; ///< number of frames to process
183+
bool mFullHBF; ///< send full HeartBeat frames
159184
bool mPrint = false; ///< print debug messages
160185
};
161186

@@ -170,6 +195,7 @@ o2::framework::DataProcessorSpec getFileReaderSpec()
170195
AlgorithmSpec{adaptFromTask<FileReaderTask>()},
171196
Options{{"infile", VariantType::String, "", {"input file name"}},
172197
{"nframes", VariantType::Int, -1, {"number of frames to process"}},
198+
{"full-hbf", VariantType::Bool, false, {"send full HeartBeat frames"}},
173199
{"print", VariantType::Bool, false, {"verbose output"}}}};
174200
}
175201
// clang-format on

0 commit comments

Comments
 (0)