Skip to content

Commit 4abc548

Browse files
matthiasrichtershahor02
authored andcommitted
TPC workflow: sending data from publishers unserialized
Providing the data type of the objects to be published in the RootTreeReader branch configuration to allow the DPL IO to decide upon serialization method. The unserialized data is extracted as span on the receiver side.
1 parent fe0776d commit 4abc548

File tree

4 files changed

+65
-39
lines changed

4 files changed

+65
-39
lines changed

Detectors/TPC/workflow/include/TPCWorkflow/PublisherSpec.h

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,12 @@
1515

1616
#include "Framework/DataProcessorSpec.h"
1717
#include "Framework/OutputSpec.h"
18+
#include "Framework/DataSpecUtils.h"
19+
#include "Framework/Output.h"
20+
#include "DPLUtils/RootTreeReader.h"
1821
#include <vector>
1922
#include <string>
23+
#include <functional>
2024

2125
namespace o2
2226
{
@@ -44,7 +48,44 @@ struct PublisherConf {
4448

4549
/// create a processor spec
4650
/// read data from multiple tree branches from ROOT file and publish
47-
framework::DataProcessorSpec getPublisherSpec(PublisherConf const& config, bool propagateMC = true);
51+
template <typename T = void>
52+
framework::DataProcessorSpec getPublisherSpec(PublisherConf const& config, bool propagateMC = true)
53+
{
54+
using Reader = o2::framework::RootTreeReader;
55+
using Output = o2::framework::Output;
56+
auto dto = o2::framework::DataSpecUtils::asConcreteDataTypeMatcher(config.dataoutput);
57+
auto mco = o2::framework::DataSpecUtils::asConcreteDataTypeMatcher(config.mcoutput);
58+
59+
// a creator callback for the actual reader instance
60+
auto creator = [dto, mco, propagateMC](const char* treename, const char* filename, int nofEvents, Reader::PublishingMode publishingMode, o2::header::DataHeader::SubSpecificationType subSpec, const char* branchname, const char* mcbranchname) {
61+
constexpr auto persistency = o2::framework::Lifetime::Timeframe;
62+
if (propagateMC) {
63+
return std::make_shared<Reader>(treename,
64+
filename,
65+
nofEvents,
66+
publishingMode,
67+
Output{mco.origin, mco.description, subSpec, persistency},
68+
mcbranchname,
69+
Reader::BranchDefinition<T>{Output{dto.origin, dto.description, subSpec, persistency}, branchname});
70+
} else {
71+
return std::make_shared<Reader>(treename,
72+
filename,
73+
nofEvents,
74+
publishingMode,
75+
Reader::BranchDefinition<T>{Output{dto.origin, dto.description, subSpec, persistency}, branchname});
76+
}
77+
};
78+
79+
return createPublisherSpec(config, propagateMC, creator);
80+
}
81+
82+
namespace workflow_reader
83+
{
84+
using Reader = o2::framework::RootTreeReader;
85+
using Creator = std::function<std::shared_ptr<Reader>(const char*, const char*, int, Reader::PublishingMode, o2::header::DataHeader::SubSpecificationType, const char*, const char*)>;
86+
} // namespace workflow_reader
87+
88+
framework::DataProcessorSpec createPublisherSpec(PublisherConf const& config, bool propagateMC, workflow_reader::Creator creator);
4889

4990
} // end namespace tpc
5091
} // end namespace o2

Detectors/TPC/workflow/src/ClustererSpec.cxx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ DataProcessorSpec getClustererSpec(bool sendMC)
9191
if (!labelKey.empty()) {
9292
inMCLabels = std::move(pc.inputs().get<const MCLabelContainer*>(labelKey.c_str()));
9393
}
94-
auto inDigits = pc.inputs().get<const std::vector<o2::tpc::Digit>>(inputKey.c_str());
94+
auto inDigits = pc.inputs().get<gsl::span<o2::tpc::Digit>>(inputKey.c_str());
9595
if (verbosity > 0 && inMCLabels) {
9696
LOG(INFO) << "received " << inDigits.size() << " digits, "
9797
<< inMCLabels->getIndexedSize() << " MC label objects"

Detectors/TPC/workflow/src/PublisherSpec.cxx

Lines changed: 10 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,8 @@
1515

1616
#include "Framework/ConfigParamRegistry.h"
1717
#include "Framework/ControlService.h"
18-
#include "Framework/DataSpecUtils.h"
1918
#include "TPCWorkflow/PublisherSpec.h"
2019
#include "Headers/DataHeader.h"
21-
#include "DPLUtils/RootTreeReader.h"
2220
#include "TPCBase/Sector.h"
2321
#include "DataFormatsTPC/TPCSectorHeader.h"
2422
#include <memory> // for make_shared, make_unique, unique_ptr
@@ -39,7 +37,7 @@ namespace tpc
3937
/// read data from multiple tree branches from ROOT file and publish
4038
/// data are expected to be stored in separated branches per sector, the default
4139
/// branch name is configurable, sector number is apended as extension '_n'
42-
DataProcessorSpec getPublisherSpec(PublisherConf const& config, bool propagateMC)
40+
DataProcessorSpec createPublisherSpec(PublisherConf const& config, bool propagateMC, workflow_reader::Creator creator)
4341
{
4442
if (config.tpcSectors.size() == 0 || config.outputIds.size() == 0) {
4543
throw std::invalid_argument("need TPC sector and output id configuration");
@@ -54,7 +52,7 @@ DataProcessorSpec getPublisherSpec(PublisherConf const& config, bool propagateMC
5452
bool finished = false;
5553
};
5654

57-
auto initFunction = [config, propagateMC](InitContext& ic) {
55+
auto initFunction = [config, propagateMC, creator](InitContext& ic) {
5856
// get the option from the init context
5957
auto filename = ic.options().get<std::string>("infile");
6058
auto treename = ic.options().get<std::string>("treename");
@@ -88,7 +86,6 @@ DataProcessorSpec getPublisherSpec(PublisherConf const& config, bool propagateMC
8886
// set up the tree interface
8987
// TODO: parallelism on sectors needs to be implemented as selector in the reader
9088
// the data is now in parallel branches, as first attempt use an array of readers
91-
constexpr auto persistency = Lifetime::Timeframe;
9289
auto outputId = outputIds.begin();
9390
for (size_t sector = 0; sector < NSectors; ++sector) {
9491
if ((activeSectors & ((uint64_t)0x1 << sector)) == 0) {
@@ -103,27 +100,14 @@ DataProcessorSpec getPublisherSpec(PublisherConf const& config, bool propagateMC
103100
}
104101
std::string clusterbranchname = clbrName + "_" + std::to_string(sector);
105102
std::string mcbranchname = mcbrName + "_" + std::to_string(sector);
106-
auto dto = DataSpecUtils::asConcreteDataTypeMatcher(config.dataoutput);
107-
auto mco = DataSpecUtils::asConcreteDataTypeMatcher(config.mcoutput);
108-
if (propagateMC) {
109-
readers[sector] = std::make_shared<RootTreeReader>(treename.c_str(), // tree name
110-
sectorfile.c_str(), // input file name
111-
nofEvents, // number of entries to publish
112-
publishingMode,
113-
Output{mco.origin, mco.description, subSpec, persistency},
114-
mcbranchname.c_str(), // name of mc label branch
115-
Output{dto.origin, dto.description, subSpec, persistency},
116-
clusterbranchname.c_str() // name of cluster branch
117-
);
118-
} else {
119-
readers[sector] = std::make_shared<RootTreeReader>(treename.c_str(), // tree name
120-
sectorfile.c_str(), // input file name
121-
nofEvents, // number of entries to publish
122-
publishingMode,
123-
Output{dto.origin, dto.description, subSpec, persistency},
124-
clusterbranchname.c_str() // name of cluster branch
125-
);
126-
}
103+
readers[sector] = creator(treename.c_str(), // tree name
104+
sectorfile.c_str(), // input file name
105+
nofEvents, // number of entries to publish
106+
publishingMode,
107+
subSpec,
108+
clusterbranchname.c_str(), // name of data branch
109+
mcbranchname.c_str() // name of mc label branch
110+
);
127111
if (++outputId == outputIds.end()) {
128112
outputId = outputIds.begin();
129113
}

Detectors/TPC/workflow/src/RecoWorkflow.cxx

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -112,17 +112,18 @@ framework::WorkflowSpec getWorkflow(std::vector<int> const& tpcSectors, std::vec
112112
// needs to be done in accordance. This means, if a new input option is added
113113
// also the dispatch trigger needs to be updated.
114114
if (inputType == InputType::Digits) {
115-
specs.emplace_back(o2::tpc::getPublisherSpec(PublisherConf{
116-
"tpc-digit-reader",
117-
"o2sim",
118-
{"digitbranch", "TPCDigit", "Digit branch"},
119-
{"mcbranch", "TPCDigitMCTruth", "MC label branch"},
120-
OutputSpec{"TPC", "DIGITS"},
121-
OutputSpec{"TPC", "DIGITSMCTR"},
122-
tpcSectors,
123-
laneConfiguration,
124-
},
125-
propagateMC));
115+
using Type = std::vector<o2::tpc::Digit>;
116+
specs.emplace_back(o2::tpc::getPublisherSpec<Type>(PublisherConf{
117+
"tpc-digit-reader",
118+
"o2sim",
119+
{"digitbranch", "TPCDigit", "Digit branch"},
120+
{"mcbranch", "TPCDigitMCTruth", "MC label branch"},
121+
OutputSpec{"TPC", "DIGITS"},
122+
OutputSpec{"TPC", "DIGITSMCTR"},
123+
tpcSectors,
124+
laneConfiguration,
125+
},
126+
propagateMC));
126127
} else if (inputType == InputType::Raw) {
127128
specs.emplace_back(o2::tpc::getPublisherSpec(PublisherConf{
128129
"tpc-raw-cluster-reader",

0 commit comments

Comments
 (0)