1010// or submit itself to any jurisdiction.
1111
1212#include " AODReaderHelpers.h"
13+ #include " ../src/ExpressionJSONHelpers.h"
14+ #include " ../src/IndexJSONHelpers.h"
15+
16+ #include " Framework/AnalysisDataModel.h"
1317#include " Framework/AnalysisHelpers.h"
14- #include " Framework/AnalysisDataModelHelpers.h"
15- #include " Framework/ExpressionHelpers.h"
1618#include " Framework/DataProcessingHelpers.h"
1719#include " Framework/AlgorithmSpec.h"
18- #include " Framework/CallbackService.h"
1920#include " Framework/DataSpecUtils.h"
20- #include " ../src/ExpressionJSONHelpers.h"
21- #include " ../src/IndexJSONHelpers.h"
2221#include " Framework/ConfigContext.h"
2322#include " Framework/AnalysisContext.h"
2423
2524namespace o2 ::framework::readers
2625{
2726namespace
2827{
29- template <size_t N, std::array<soa::TableRef, N> refs>
30- static inline auto extractOriginals (ProcessingContext& pc)
31- {
32- return [&]<size_t ... Is>(std::index_sequence<Is...>) -> std::vector<std::shared_ptr<arrow::Table>> {
33- return {pc.inputs ().get <TableConsumer>(o2::aod::label<refs[Is]>())->asArrowTable ()...};
34- }(std::make_index_sequence<refs.size ()>());
35- }
36-
37- template <typename D>
38- requires (D::exclusive)
39- auto make_build (D metadata, InputSpec const & input, ProcessingContext& pc)
40- {
41- using metadata_t = decltype (metadata);
42- using Key = typename metadata_t ::Key;
43- using index_pack_t = typename metadata_t ::index_pack_t ;
44- constexpr auto sources = metadata_t ::sources;
45- return o2::framework::IndexBuilder<o2::framework::Exclusive>::indexBuilder<Key, sources.size (), sources>(input.binding .c_str (),
46- extractOriginals<sources.size (), sources>(pc),
47- index_pack_t {});
48- }
49-
50- template <typename D>
51- requires (!D::exclusive)
52- auto make_build (D metadata, InputSpec const & input, ProcessingContext& pc)
53- {
54- using metadata_t = decltype (metadata);
55- using Key = typename metadata_t ::Key;
56- using index_pack_t = typename metadata_t ::index_pack_t ;
57- constexpr auto sources = metadata_t ::sources;
58- return o2::framework::IndexBuilder<o2::framework::Sparse>::indexBuilder<Key, sources.size (), sources>(input.binding .c_str (),
59- extractOriginals<sources.size (), sources>(pc),
60- index_pack_t {});
61- }
62-
63- static inline auto extractSources (ProcessingContext& pc, std::vector<std::string> const & labels)
64- {
65- std::vector<std::shared_ptr<arrow::Table>> tables;
66- for (auto const & label : labels) {
67- tables.emplace_back (pc.inputs ().get <TableConsumer>(label.c_str ())->asArrowTable ());
68- }
69- return tables;
70- }
71-
72- struct Builder {
73- std::string binding;
74- std::vector<std::string> labels;
75- std::vector<o2::soa::IndexRecord> records;
76- header::DataOrigin origin;
77- header::DataDescription description;
78- header::DataHeader::SubSpecificationType version;
79-
80- std::shared_ptr<arrow::Table> build (ProcessingContext& pc) const
81- {
82- std::shared_ptr<arrow::Table> result;
83- auto tables = extractSources (pc, labels);
84- return result;
85- }
86-
87- };
88-
8928struct Buildable {
29+ bool exclusive = false ;
9030 std::string binding;
9131 std::vector<std::string> labels;
9232 header::DataOrigin origin;
@@ -106,14 +46,18 @@ struct Buildable {
10646 std::stringstream iws (loc->defaultValue .get <std::string>());
10747 records = IndexJSONHelpers::read (iws);
10848
49+ loc = std::find_if (spec.metadata .begin (), spec.metadata .end (), [](ConfigParamSpec const & cps){ return cps.name .compare (" index-exclusive" ) == 0 ; });
50+ exclusive = loc->defaultValue .get <bool >();
51+
10952 for (auto const & r : records) {
11053 labels.emplace_back (r.label );
11154 }
11255 }
11356
114- Builder createBuilder () const
57+ framework:: Builder createBuilder () const
11558 {
116- return Builder{
59+ return {
60+ exclusive,
11761 binding,
11862 labels,
11963 records,
@@ -131,70 +75,25 @@ AlgorithmSpec AODReaderHelpers::indexBuilderCallback(ConfigContext const& ctx)
13175{
13276 auto & ac = ctx.services ().get <AnalysisContext>();
13377 return AlgorithmSpec::InitCallback{[requested = ac.requestedIDXs ](InitContext& /* ic*/ ) {
134- return [requested](ProcessingContext& pc) {
78+ std::vector<Buildable> buildables;
79+ for (auto & i : requested) {
80+ buildables.emplace_back (i);
81+ }
82+ std::vector<Builder> builders;
83+ for (auto & b : buildables) {
84+ builders.push_back (b.createBuilder ());
85+ }
86+ return [builders](ProcessingContext& pc) {
13587 auto outputs = pc.outputs ();
136- // spawn tables
137- for (auto & input : requested) {
138- auto && [origin, description, version] = DataSpecUtils::asConcreteDataMatcher (input);
139- if (description == header::DataDescription{" MA_RN2_EX" }) {
140- outputs.adopt (Output{origin, description, version}, make_build (o2::aod::Run2MatchedExclusiveMetadata{}, input, pc));
141- } else if (description == header::DataDescription{" MA_RN2_SP" }) {
142- outputs.adopt (Output{origin, description, version}, make_build (o2::aod::Run2MatchedSparseMetadata{}, input, pc));
143- } else if (description == header::DataDescription{" MA_RN3_EX" }) {
144- outputs.adopt (Output{origin, description, version}, make_build (o2::aod::Run3MatchedExclusiveMetadata{}, input, pc));
145- } else if (description == header::DataDescription{" MA_RN3_SP" }) {
146- outputs.adopt (Output{origin, description, version}, make_build (o2::aod::Run3MatchedSparseMetadata{}, input, pc));
147- } else if (description == header::DataDescription{" MA_BCCOL_EX" }) {
148- outputs.adopt (Output{origin, description, version}, make_build (o2::aod::MatchedBCCollisionsExclusiveMetadata{}, input, pc));
149- } else if (description == header::DataDescription{" MA_BCCOL_SP" }) {
150- outputs.adopt (Output{origin, description, version}, make_build (o2::aod::MatchedBCCollisionsSparseMetadata{}, input, pc));
151- } else if (description == header::DataDescription{" MA_BCCOLS_EX" }) {
152- outputs.adopt (Output{origin, description, version}, make_build (o2::aod::MatchedBCCollisionsExclusiveMultiMetadata{}, input, pc));
153- } else if (description == header::DataDescription{" MA_BCCOLS_SP" }) {
154- outputs.adopt (Output{origin, description, version}, make_build (o2::aod::MatchedBCCollisionsSparseMultiMetadata{}, input, pc));
155- } else if (description == header::DataDescription{" MA_RN3_BC_SP" }) {
156- outputs.adopt (Output{origin, description, version}, make_build (o2::aod::Run3MatchedToBCSparseMetadata{}, input, pc));
157- } else if (description == header::DataDescription{" MA_RN3_BC_EX" }) {
158- outputs.adopt (Output{origin, description, version}, make_build (o2::aod::Run3MatchedToBCExclusiveMetadata{}, input, pc));
159- } else if (description == header::DataDescription{" MA_RN2_BC_SP" }) {
160- outputs.adopt (Output{origin, description, version}, make_build (o2::aod::Run2MatchedToBCSparseMetadata{}, input, pc));
161- } else {
162- throw std::runtime_error (" Not an index table" );
163- }
88+ for (auto & builder : builders) {
89+ outputs.adopt (Output{builder.origin , builder.description , builder.version }, builder.materialize (pc));
16490 }
16591 };
16692 }};
16793}
16894
16995namespace
17096{
171- struct Maker {
172- std::string binding;
173- std::vector<std::string> labels;
174- std::vector<std::shared_ptr<gandiva::Expression>> expressions;
175- std::shared_ptr<gandiva::Projector> projector = nullptr ;
176- std::shared_ptr<arrow::Schema> schema = nullptr ;
177- std::shared_ptr<arrow::Schema> inputSchema = nullptr ;
178-
179- header::DataOrigin origin;
180- header::DataDescription description;
181- header::DataHeader::SubSpecificationType version;
182-
183- std::shared_ptr<arrow::Table> make (ProcessingContext& pc) const
184- {
185- std::vector<std::shared_ptr<arrow::Table>> originals;
186- for (auto const & label : labels) {
187- originals.push_back (pc.inputs ().get <TableConsumer>(label)->asArrowTable ());
188- }
189- auto fullTable = soa::ArrowHelpers::joinTables (std::move (originals), std::span{labels.begin (), labels.size ()});
190- if (fullTable->num_rows () == 0 ) {
191- return arrow::Table::MakeEmpty (schema).ValueOrDie ();
192- }
193-
194- return spawnerHelper (fullTable, schema, binding.c_str (), schema->num_fields (), projector);
195- }
196- };
197-
19897struct Spawnable {
19998 std::string binding;
20099 std::vector<std::string> labels;
@@ -222,6 +121,7 @@ struct Spawnable {
222121 iws.clear ();
223122 iws.str (loc->defaultValue .get <std::string>());
224123 outputSchema = ArrowJSONHelpers::read (iws);
124+ o2::framework::addLabelToSchema (outputSchema, binding.c_str ());
225125
226126 std::vector<std::shared_ptr<arrow::Schema>> schemas;
227127 for (auto & i : spec.metadata ) {
@@ -233,22 +133,14 @@ struct Spawnable {
233133 schemas.emplace_back (ArrowJSONHelpers::read (iws));
234134 }
235135 }
136+
236137 std::vector<std::shared_ptr<arrow::Field>> fields;
237138 for (auto & s : schemas) {
238139 std::copy (s->fields ().begin (), s->fields ().end (), std::back_inserter (fields));
239140 }
240- inputSchema = std::make_shared<arrow::Schema>(fields);
241141
242- int i = 0 ;
243- for (auto & p : projectors) {
244- expressions.push_back (
245- expressions::makeExpression (
246- expressions::createExpressionTree (
247- expressions::createOperations (p),
248- inputSchema),
249- outputSchema->field (i)));
250- ++i;
251- }
142+ inputSchema = std::make_shared<arrow::Schema>(fields);
143+ expressions = expressions::materializeProjectors (projectors, inputSchema, outputSchema->fields ());
252144 }
253145
254146 std::shared_ptr<gandiva::Projector> makeProjector () const
@@ -264,9 +156,8 @@ struct Spawnable {
264156 return p;
265157 }
266158
267- Maker createMaker () const
159+ framework::Spawner createMaker () const
268160 {
269- o2::framework::addLabelToSchema (outputSchema, binding.c_str ());
270161 return {
271162 binding,
272163 labels,
@@ -290,15 +181,15 @@ AlgorithmSpec AODReaderHelpers::aodSpawnerCallback(ConfigContext const& ctx)
290181 for (auto & i : requested) {
291182 spawnables.emplace_back (i);
292183 }
293- std::vector<Maker> makers ;
184+ std::vector<Spawner> spawners ;
294185 for (auto & s : spawnables) {
295- makers .push_back (s.createMaker ());
186+ spawners .push_back (s.createMaker ());
296187 }
297188
298- return [makers ](ProcessingContext& pc) mutable {
189+ return [spawners ](ProcessingContext& pc) mutable {
299190 auto outputs = pc.outputs ();
300- for (auto & maker : makers ) {
301- outputs.adopt (Output{maker .origin , maker .description , maker .version }, maker. make (pc));
191+ for (auto & spawner : spawners ) {
192+ outputs.adopt (Output{spawner .origin , spawner .description , spawner .version }, spawner. materialize (pc));
302193 }
303194 };
304195 }};
0 commit comments