@@ -66,82 +66,7 @@ inline constexpr int listSize(soa::IndexKind kind)
6666} // namespace
6767
6868struct IndexBuilder {
69- template <bool Exclusive>
70- static auto materialize (const char * label, std::vector<std::shared_ptr<arrow::Table>>&& tables, std::vector<soa::IndexRecord> const & records)
71- {
72- auto pool = arrow::default_memory_pool ();
73- std::vector<std::shared_ptr<framework::SelfIndexColumnBuilder>> builders;
74- framework::SelfIndexColumnBuilder self{records[0 ].columnLabel .c_str (), pool};
75- std::unique_ptr<framework::ChunkedArrayIterator> keyIndex = nullptr ;
76- if (records[0 ].kind != soa::IndexKind::IdxSelf) {
77- keyIndex = std::make_unique<framework::ChunkedArrayIterator>(tables[0 ]->column (records[0 ].pos ));
78- }
79-
80- for (auto i = 1U ; i < records.size (); ++i) {
81- if (records[i].kind == soa::IndexKind::IdxSelf) {
82- builders.emplace_back (std::make_shared<framework::SelfIndexColumnBuilder>(records[i].columnLabel .c_str (), pool));
83- } else {
84- builders.emplace_back (std::make_shared<framework::IndexColumnBuilder>(tables[i]->column (records[i].pos ), records[i].columnLabel .c_str (), listSize (records[i].kind ), pool));
85- }
86- }
87-
88- std::vector<bool > finds;
89- finds.resize (builders.size ());
90- for (int64_t counter = 0 ; counter < tables[0 ]->num_rows (); ++counter) {
91- int64_t idx = -1 ;
92- if (keyIndex == nullptr ) {
93- idx = counter;
94- } else {
95- idx = keyIndex->valueAt (counter);
96- }
97- for (auto i = 0U ; i < builders.size (); ++i) {
98- if (records[i+1 ].kind == soa::IndexKind::IdxSelf) {
99- finds[i] = builders[i]->find (idx);
100- } else {
101- finds[i] = std::static_pointer_cast<framework::IndexColumnBuilder>(builders[i])->find (idx);
102- }
103- }
104- if constexpr (Exclusive) {
105- if (std::none_of (finds.begin (), finds.end (), [](bool const x) { return x == false ; })) {
106- for (auto i = 0U ; i < builders.size (); ++i) {
107- if (records[i+1 ].kind == soa::IndexKind::IdxSelf) {
108- builders[i]->fill (idx);
109- } else {
110- std::static_pointer_cast<framework::IndexColumnBuilder>(builders[i])->fill (idx);
111- }
112- }
113- self.fill (counter);
114- }
115- } else {
116- for (auto i = 0U ; i < builders.size (); ++i) {
117- if (records[i+1 ].kind == soa::IndexKind::IdxSelf) {
118- builders[i]->fill (idx);
119- } else {
120- std::static_pointer_cast<framework::IndexColumnBuilder>(builders[i])->fill (idx);
121- }
122- }
123- self.fill (counter);
124- }
125- }
126-
127- std::vector<std::shared_ptr<arrow::ChunkedArray>> arrays;
128- arrays.reserve (records.size ());
129- std::vector<std::shared_ptr<arrow::Field>> fields;
130- fields.reserve (records.size ());
131- arrays.push_back (self.result ());
132- fields.push_back (self.field ());
133- for (auto i = 0U ; i < builders.size (); ++i) {
134- if (records[i+1 ].kind == soa::IndexKind::IdxSelf) {
135- arrays.push_back (builders[i]->result ());
136- fields.push_back (builders[i]->field ());
137- } else {
138- arrays.push_back (std::static_pointer_cast<framework::IndexColumnBuilder>(builders[i])->result ());
139- fields.push_back (std::static_pointer_cast<framework::IndexColumnBuilder>(builders[i])->field ());
140- }
141- }
142-
143- return framework::makeArrowTable (label, std::move (arrays), std::move (fields));
144- }
69+ static std::shared_ptr<arrow::Table> materialize (const char * label, std::vector<std::shared_ptr<arrow::Table>>&& tables, std::vector<soa::IndexRecord> const & records, bool exclusive);
14570};
14671} // namespace o2::soa
14772
@@ -150,6 +75,7 @@ namespace o2::framework
15075std::string serializeProjectors (std::vector<framework::expressions::Projector>& projectors);
15176std::string serializeSchema (std::shared_ptr<arrow::Schema> schema);
15277std::string serializeIndexRecords (std::vector<o2::soa::IndexRecord>& irs);
78+ std::vector<std::shared_ptr<arrow::Table>> extractSources (ProcessingContext& pc, std::vector<std::string> const & labels);
15379
15480struct Spawner {
15581 std::string binding;
@@ -163,32 +89,9 @@ struct Spawner {
16389 header::DataDescription description;
16490 header::DataHeader::SubSpecificationType version;
16591
166- std::shared_ptr<arrow::Table> materialize (ProcessingContext& pc) const
167- {
168- std::vector<std::shared_ptr<arrow::Table>> originals;
169- for (auto const & label : labels) {
170- originals.push_back (pc.inputs ().get <TableConsumer>(label)->asArrowTable ());
171- }
172- auto fullTable = soa::ArrowHelpers::joinTables (std::move (originals), std::span{labels.begin (), labels.size ()});
173- if (fullTable->num_rows () == 0 ) {
174- return arrow::Table::MakeEmpty (schema).ValueOrDie ();
175- }
176-
177- return spawnerHelper (fullTable, schema, binding.c_str (), schema->num_fields (), projector);
178- }
92+ std::shared_ptr<arrow::Table> materialize (ProcessingContext& pc) const ;
17993};
18094
181- namespace {
182- static inline auto extractSources (ProcessingContext& pc, std::vector<std::string> const & labels)
183- {
184- std::vector<std::shared_ptr<arrow::Table>> tables;
185- for (auto const & label : labels) {
186- tables.emplace_back (pc.inputs ().get <TableConsumer>(label.c_str ())->asArrowTable ());
187- }
188- return tables;
189- }
190- }
191-
19295struct Builder {
19396 bool exclusive;
19497 std::string binding;
@@ -198,17 +101,7 @@ struct Builder {
198101 header::DataDescription description;
199102 header::DataHeader::SubSpecificationType version;
200103
201- std::shared_ptr<arrow::Table> materialize (ProcessingContext& pc) const
202- {
203- std::shared_ptr<arrow::Table> result;
204- auto tables = extractSources (pc, labels);
205- if (exclusive) {
206- result = o2::soa::IndexBuilder::materialize<true >(binding.c_str (), std::move (tables), records);
207- } else {
208- result = o2::soa::IndexBuilder::materialize<false >(binding.c_str (), std::move (tables), records);
209- }
210- return result;
211- }
104+ std::shared_ptr<arrow::Table> materialize (ProcessingContext& pc) const ;
212105};
213106} // namespace o2::framework
214107
@@ -761,7 +654,7 @@ struct Builds : decltype(transformBase<T>()) {
761654 using Ts = typename T::rest_t ;
762655 using index_pack_t = metadata::index_pack_t ;
763656
764- std::vector<soa::IndexRecord> map;
657+ std::vector<soa::IndexRecord> map = soa::getIndexMapping<metadata>() ;
765658
766659 T* operator ->()
767660 {
@@ -785,10 +678,7 @@ struct Builds : decltype(transformBase<T>()) {
785678
786679 auto build (std::vector<std::shared_ptr<arrow::Table>>&& tables)
787680 {
788- if (map.empty ()) {
789- map = soa::getIndexMapping<metadata>();
790- }
791- this ->table = std::make_shared<T>(soa::IndexBuilder::materialize<metadata::exclusive>(o2::aod::label<T::ref>(), std::forward<std::vector<std::shared_ptr<arrow::Table>>>(tables), map));
681+ this ->table = std::make_shared<T>(soa::IndexBuilder::materialize (o2::aod::label<T::ref>(), std::forward<std::vector<std::shared_ptr<arrow::Table>>>(tables), map, metadata::exclusive));
792682 return (this ->table != nullptr );
793683 }
794684};
0 commit comments