Skip to content

Commit 8f3b0b9

Browse files
committed
rework index builder
1 parent 9210426 commit 8f3b0b9

File tree

9 files changed

+343
-275
lines changed

9 files changed

+343
-275
lines changed

Framework/AnalysisSupport/src/AODReaderHelpers.cxx

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ struct Buildable {
3333
header::DataDescription description;
3434
header::DataHeader::SubSpecificationType version;
3535
std::vector<o2::soa::IndexRecord> records;
36+
std::shared_ptr<arrow::Schema> outputSchema;
3637

3738
Buildable(InputSpec const& spec)
3839
: binding{spec.binding}
@@ -52,19 +53,26 @@ struct Buildable {
5253
for (auto const& r : records) {
5354
labels.emplace_back(r.label);
5455
}
56+
outputSchema = std::make_shared<arrow::Schema>([](std::vector<o2::soa::IndexRecord> const& recs) {
57+
std::vector<std::shared_ptr<arrow::Field>> fields;
58+
for (auto& r : recs) {
59+
fields.push_back(r.field());
60+
}
61+
return fields;
62+
}(records))
63+
->WithMetadata(std::make_shared<arrow::KeyValueMetadata>(std::vector{std::string{"label"}}, std::vector{std::string{binding}}));
5564
}
5665

5766
framework::Builder createBuilder() const
5867
{
5968
return {
6069
exclusive,
61-
binding,
6270
labels,
6371
records,
72+
outputSchema,
6473
origin,
6574
description,
66-
version
67-
};
75+
version};
6876
}
6977

7078
};

Framework/Core/include/Framework/ASoA.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3274,6 +3274,7 @@ consteval auto getIndexTargets()
32743274
constexpr auto a = o2::soa::mergeOriginals<typename Cs::binding_t...>(); \
32753275
return o2::aod::filterForKey<a.size(), a, Key>(); \
32763276
}(framework::pack<__VA_ARGS__>{}); \
3277+
static_assert(sources.size() == framework::pack_size(index_pack_t{}), "One of the referred tables does not have index to Key"); \
32773278
}; \
32783279
using _Name_##Metadata = _Name_##MetadataFrom<o2::aod::Hash<_Origin_ ""_h>>; \
32793280
\

Framework/Core/include/Framework/AnalysisHelpers.h

Lines changed: 108 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -27,51 +27,111 @@
2727

2828
#include <cstdio>
2929
#include <string>
30-
namespace o2::soa {
31-
enum struct IndexKind : int {
32-
IdxInvalid = -1,
33-
IdxSelf = 0,
34-
IdxSingle = 1,
35-
IdxSlice = 2,
36-
IdxArray = 3
37-
};
38-
30+
namespace o2::soa
31+
{
3932
struct IndexRecord {
4033
std::string label;
4134
std::string columnLabel;
4235
IndexKind kind;
4336
int pos;
44-
auto operator<=>(const IndexRecord&) const = default;
45-
};
37+
std::shared_ptr<arrow::DataType> type = [](IndexKind kind) -> std::shared_ptr<arrow::DataType> {
38+
switch (kind) {
39+
case IndexKind::IdxSingle:
40+
case IndexKind::IdxSelf:
41+
return arrow::int32();
42+
case IndexKind::IdxSlice:
43+
return arrow::fixed_size_list(arrow::int32(), 2);
44+
case IndexKind::IdxArray:
45+
return arrow::list(arrow::int32());
46+
default:
47+
return {nullptr};
48+
}
49+
}(kind);
4650

47-
namespace
48-
{
49-
inline constexpr int listSize(soa::IndexKind kind)
50-
{
51-
switch (kind) {
52-
case soa::IndexKind::IdxSingle:
53-
return 1;
54-
break;
55-
case soa::IndexKind::IdxSlice:
56-
return 2;
57-
break;
58-
case soa::IndexKind::IdxArray:
59-
return -1;
60-
break;
61-
default:
62-
return -2;
63-
break;
51+
auto operator==(IndexRecord const& other) const
52+
{
53+
return (this->label == other.label) && (this->columnLabel == other.columnLabel) && (this->kind == other.kind) && (this->pos == other.pos);
6454
}
65-
}
66-
} // namespace
55+
56+
std::shared_ptr<arrow::Field> field() const
57+
{
58+
return std::make_shared<arrow::Field>(columnLabel, type);
59+
}
60+
};
6761

6862
struct IndexBuilder {
69-
static std::shared_ptr<arrow::Table> materialize(const char* label, std::vector<std::shared_ptr<arrow::Table>>&& tables, std::vector<soa::IndexRecord> const& records, bool exclusive);
63+
static std::shared_ptr<arrow::Table> materialize(std::vector<std::shared_ptr<arrow::Table>>&& tables, std::vector<soa::IndexRecord> const& records, std::shared_ptr<arrow::Schema> const& schema, bool exclusive);
7064
};
7165
} // namespace o2::soa
7266

7367
namespace o2::framework
7468
{
69+
std::shared_ptr<arrow::Table> makeEmptyTableImpl(const char* name, std::shared_ptr<arrow::Schema>& schema);
70+
71+
template <soa::is_table T>
72+
auto makeEmptyTable(const char* name)
73+
{
74+
auto schema = std::make_shared<arrow::Schema>(soa::createFieldsFromColumns(typename T::table_t::persistent_columns_t{}));
75+
return makeEmptyTableImpl(name, schema);
76+
}
77+
78+
template <soa::TableRef R>
79+
auto makeEmptyTable()
80+
{
81+
auto schema = std::make_shared<arrow::Schema>(soa::createFieldsFromColumns(typename aod::MetadataTrait<aod::Hash<R.desc_hash>>::metadata::persistent_columns_t{}));
82+
return makeEmptyTableImpl(o2::aod::label<R>(), schema);
83+
}
84+
85+
template <typename... Cs>
86+
auto makeEmptyTable(const char* name, framework::pack<Cs...> p)
87+
{
88+
auto schema = std::make_shared<arrow::Schema>(soa::createFieldsFromColumns(p));
89+
return makeEmptyTableImpl(name, schema);
90+
}
91+
92+
template <aod::is_aod_hash D>
93+
auto makeEmptyTable(const char* name)
94+
{
95+
auto schema = std::make_shared<arrow::Schema>(soa::createFieldsFromColumns(typename aod::MetadataTrait<D>::metadata::persistent_columns_t{}));
96+
return makeEmptyTableImpl(name, schema);
97+
}
98+
99+
std::shared_ptr<arrow::Table> spawnerHelper(std::shared_ptr<arrow::Table> const& fullTable, std::shared_ptr<arrow::Schema> newSchema, size_t nColumns,
100+
expressions::Projector* projectors, const char* name, std::shared_ptr<gandiva::Projector>& projector);
101+
102+
std::shared_ptr<arrow::Table> spawnerHelper(std::shared_ptr<arrow::Table> const& fullTable, std::shared_ptr<arrow::Schema> newSchema,
103+
const char* name, size_t nColumns,
104+
const std::shared_ptr<gandiva::Projector>& projector);
105+
106+
/// Expression-based column generator to materialize columns
107+
template <aod::is_aod_hash D>
108+
requires(soa::has_extension<typename o2::aod::MetadataTrait<D>::metadata>)
109+
auto spawner(std::shared_ptr<arrow::Table> const& fullTable, const char* name, o2::framework::expressions::Projector* projectors, std::shared_ptr<gandiva::Projector>& projector, std::shared_ptr<arrow::Schema> const& schema)
110+
{
111+
if (fullTable->num_rows() == 0) {
112+
return makeEmptyTable<D>(name);
113+
}
114+
constexpr auto Ncol = []<typename M>() {
115+
if constexpr (soa::has_configurable_extension<M>) {
116+
return framework::pack_size(typename M::placeholders_pack_t{});
117+
} else {
118+
return framework::pack_size(typename M::expression_pack_t{});
119+
}
120+
}.template operator()<typename o2::aod::MetadataTrait<D>::metadata>();
121+
return spawnerHelper(fullTable, schema, Ncol, projectors, name, projector);
122+
}
123+
124+
template <typename... C>
125+
auto spawner(framework::pack<C...>, std::vector<std::shared_ptr<arrow::Table>>&& tables, const char* name, expressions::Projector* projectors, std::shared_ptr<gandiva::Projector>& projector, std::shared_ptr<arrow::Schema> const& schema)
126+
{
127+
std::array<const char*, 1> labels{"original"};
128+
auto fullTable = soa::ArrowHelpers::joinTables(std::move(tables), std::span<const char* const>{labels});
129+
if (fullTable->num_rows() == 0) {
130+
return makeEmptyTable(name, framework::pack<C...>{});
131+
}
132+
return spawnerHelper(fullTable, schema, sizeof...(C), projectors, name, projector);
133+
}
134+
75135
std::string serializeProjectors(std::vector<framework::expressions::Projector>& projectors);
76136
std::string serializeSchema(std::shared_ptr<arrow::Schema> schema);
77137
std::string serializeIndexRecords(std::vector<o2::soa::IndexRecord>& irs);
@@ -94,9 +154,9 @@ struct Spawner {
94154

95155
struct Builder {
96156
bool exclusive;
97-
std::string binding;
98157
std::vector<std::string> labels;
99158
std::vector<o2::soa::IndexRecord> records;
159+
std::shared_ptr<arrow::Schema> outputSchema;
100160
header::DataOrigin origin;
101161
header::DataDescription description;
102162
header::DataHeader::SubSpecificationType version;
@@ -478,29 +538,29 @@ struct TableTransform {
478538
constexpr static auto sources = M::sources;
479539

480540
template <soa::TableRef R>
481-
static constexpr auto base_spec()
541+
static auto base_spec()
482542
{
483543
return soa::tableRef2InputSpec<R>();
484544
}
485545

486546
static auto base_specs()
487547
{
488-
return []<size_t... Is>(std::index_sequence<Is...>) -> std::vector<InputSpec> {
489-
return {base_spec<sources[Is]>()...};
548+
return []<size_t... Is>(std::index_sequence<Is...>) {
549+
return std::array{base_spec<sources[Is]>()...};
490550
}(std::make_index_sequence<sources.size()>{});
491551
}
492552

493-
constexpr auto spec() const
553+
static constexpr auto spec()
494554
{
495555
return soa::tableRef2OutputSpec<Ref>();
496556
}
497557

498-
constexpr auto output() const
558+
static constexpr auto output()
499559
{
500560
return soa::tableRef2Output<Ref>();
501561
}
502562

503-
constexpr auto ref() const
563+
static constexpr auto ref()
504564
{
505565
return soa::tableRef2OutputRef<Ref>();
506566
}
@@ -526,11 +586,10 @@ struct Spawns : decltype(transformBase<T>()) {
526586
using spawnable_t = T;
527587
using metadata = decltype(transformBase<T>())::metadata;
528588
using extension_t = typename metadata::extension_table_t;
529-
using base_table_t = typename metadata::base_table_t;
530589
using expression_pack_t = typename metadata::expression_pack_t;
531590
static constexpr size_t N = framework::pack_size(expression_pack_t{});
532591

533-
constexpr auto pack()
592+
static consteval auto pack()
534593
{
535594
return expression_pack_t{};
536595
}
@@ -548,6 +607,7 @@ struct Spawns : decltype(transformBase<T>()) {
548607
{
549608
return extension->asArrowTable();
550609
}
610+
551611
std::shared_ptr<typename T::table_t> table = nullptr;
552612
std::shared_ptr<extension_t> extension = nullptr;
553613
std::array<o2::framework::expressions::Projector, N> projectors = []<typename... C>(framework::pack<C...>) -> std::array<expressions::Projector, sizeof...(C)>
@@ -556,7 +616,11 @@ struct Spawns : decltype(transformBase<T>()) {
556616
}
557617
(expression_pack_t{});
558618
std::shared_ptr<gandiva::Projector> projector = nullptr;
559-
std::shared_ptr<arrow::Schema> schema = std::make_shared<arrow::Schema>(o2::soa::createFieldsFromColumns(expression_pack_t{}));
619+
std::shared_ptr<arrow::Schema> schema = []() {
620+
auto s = std::make_shared<arrow::Schema>(o2::soa::createFieldsFromColumns(expression_pack_t{}));
621+
s->WithMetadata(std::make_shared<arrow::KeyValueMetadata>(std::vector{std::string{"label"}}, std::vector{std::string{o2::aod::label<T::ref>()}}));
622+
return s;
623+
}();
560624
};
561625

562626
template <typename T>
@@ -654,6 +718,8 @@ struct Builds : decltype(transformBase<T>()) {
654718
using Ts = typename T::rest_t;
655719
using index_pack_t = metadata::index_pack_t;
656720

721+
std::shared_ptr<arrow::Schema> outputSchema = []() { return std::make_shared<arrow::Schema>(soa::createFieldsFromColumns(index_pack_t{}))->WithMetadata(std::make_shared<arrow::KeyValueMetadata>(std::vector{std::string{"label"}}, std::vector{std::string{o2::aod::label<T::ref>()}})); }();
722+
657723
std::vector<soa::IndexRecord> map = soa::getIndexMapping<metadata>();
658724

659725
T* operator->()
@@ -678,7 +744,7 @@ struct Builds : decltype(transformBase<T>()) {
678744

679745
auto build(std::vector<std::shared_ptr<arrow::Table>>&& tables)
680746
{
681-
this->table = std::make_shared<T>(soa::IndexBuilder::materialize(o2::aod::label<T::ref>(), std::forward<std::vector<std::shared_ptr<arrow::Table>>>(tables), map, metadata::exclusive));
747+
this->table = std::make_shared<T>(soa::IndexBuilder::materialize(std::forward<std::vector<std::shared_ptr<arrow::Table>>>(tables), map, outputSchema, metadata::exclusive));
682748
return (this->table != nullptr);
683749
}
684750
};

Framework/Core/include/Framework/AnalysisManagers.h

Lines changed: 4 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -34,18 +34,6 @@ namespace o2::framework
3434

3535
namespace
3636
{
37-
template <typename O>
38-
static inline auto extractOriginal(ProcessingContext& pc)
39-
{
40-
return pc.inputs().get<TableConsumer>(aod::MetadataTrait<O>::metadata::tableLabel())->asArrowTable();
41-
}
42-
43-
template <typename... Os>
44-
static inline std::vector<std::shared_ptr<arrow::Table>> extractOriginals(framework::pack<Os...>, ProcessingContext& pc)
45-
{
46-
return {extractOriginal<Os>(pc)...};
47-
}
48-
4937
template <size_t N, std::array<soa::TableRef, N> refs>
5038
static inline auto extractOriginals(ProcessingContext& pc)
5139
{
@@ -160,12 +148,12 @@ const char* controlOption()
160148
}
161149

162150
template <typename T>
163-
concept with_base_table = requires(T const& t) { t.base_specs(); };
151+
concept with_base_table = requires { T::base_specs(); };
164152

165153
template <with_base_table T>
166154
bool requestInputs(std::vector<InputSpec>& inputs, T const& entity)
167155
{
168-
auto base_specs = entity.base_specs();
156+
auto base_specs = T::base_specs();
169157
for (auto base_spec : base_specs) {
170158
base_spec.metadata.push_back(ConfigParamSpec{std::string{controlOption<T>()}, VariantType::Bool, true, {"\"\""}});
171159
DataSpecUtils::updateInputList(inputs, std::forward<InputSpec>(base_spec));
@@ -289,9 +277,8 @@ bool prepareOutput(ProcessingContext& context, T& spawns)
289277
{
290278
using metadata = o2::aod::MetadataTrait<o2::aod::Hash<T::spawnable_t::ref.desc_hash>>::metadata;
291279
auto originalTable = soa::ArrowHelpers::joinTables(extractOriginals<metadata::sources.size(), metadata::sources>(context), std::span{metadata::base_table_t::originalLabels});
292-
if (originalTable->schema()->fields().empty() == true) {
293-
using base_table_t = typename T::base_table_t::table_t;
294-
originalTable = makeEmptyTable<base_table_t>(o2::aod::label<metadata::extension_table_t::ref>());
280+
if (originalTable->num_rows() == 0) {
281+
originalTable = makeEmptyTable<metadata::base_table_t::ref>();
295282
}
296283
using D = o2::aod::Hash<metadata::extension_table_t::ref.desc_hash>;
297284

0 commit comments

Comments
 (0)