Skip to content

Commit 279fcaa

Browse files
committed
index mapping serialization
1 parent dc5465a commit 279fcaa

File tree

7 files changed

+366
-54
lines changed

7 files changed

+366
-54
lines changed

Framework/AnalysisSupport/src/AODReaderHelpers.cxx

Lines changed: 46 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -18,20 +18,22 @@
1818
#include "Framework/CallbackService.h"
1919
#include "Framework/DataSpecUtils.h"
2020
#include "../src/ExpressionJSONHelpers.h"
21+
#include "../src/IndexJSONHelpers.h"
2122
#include "Framework/ConfigContext.h"
2223
#include "Framework/AnalysisContext.h"
2324

2425
namespace o2::framework::readers
2526
{
27+
namespace
28+
{
2629
template <size_t N, std::array<soa::TableRef, N> refs>
2730
static inline auto extractOriginals(ProcessingContext& pc)
2831
{
2932
return [&]<size_t... Is>(std::index_sequence<Is...>) -> std::vector<std::shared_ptr<arrow::Table>> {
3033
return {pc.inputs().get<TableConsumer>(o2::aod::label<refs[Is]>())->asArrowTable()...};
3134
}(std::make_index_sequence<refs.size()>());
3235
}
33-
namespace
34-
{
36+
3537
template <typename D>
3638
requires(D::exclusive)
3739
auto make_build(D metadata, InputSpec const& input, ProcessingContext& pc)
@@ -58,16 +60,39 @@ auto make_build(D metadata, InputSpec const& input, ProcessingContext& pc)
5860
index_pack_t{});
5961
}
6062

63+
static inline auto extractSources(ProcessingContext& pc, std::vector<std::string> const& labels)
64+
{
65+
std::vector<std::shared_ptr<arrow::Table>> tables;
66+
for (auto const& label : labels) {
67+
tables.emplace_back(pc.inputs().get<TableConsumer>(label.c_str())->asArrowTable());
68+
}
69+
return tables;
70+
}
71+
6172
struct Builder {
73+
std::string binding;
74+
std::vector<std::string> labels;
75+
std::vector<o2::soa::IndexRecord> records;
76+
header::DataOrigin origin;
77+
header::DataDescription description;
78+
header::DataHeader::SubSpecificationType version;
79+
80+
std::shared_ptr<arrow::Table> build(ProcessingContext& pc) const
81+
{
82+
std::shared_ptr<arrow::Table> result;
83+
auto tables = extractSources(pc, labels);
84+
return result;
85+
}
6286

6387
};
6488

6589
struct Buildable {
6690
std::string binding;
67-
91+
std::vector<std::string> labels;
6892
header::DataOrigin origin;
6993
header::DataDescription description;
7094
header::DataHeader::SubSpecificationType version;
95+
std::vector<o2::soa::IndexRecord> records;
7196

7297
Buildable(InputSpec const& spec)
7398
: binding{spec.binding}
@@ -77,14 +102,25 @@ struct Buildable {
77102
description = description_;
78103
version = version_;
79104

80-
// The following components are needed to build an index table
81-
// 1. the labels of the source tables to extract from inputRecord -> extracted from input metadata
82-
// 2. the mapping, in the order of the definition of columns, of the
83-
// position in each source table of an index column pointing to the Key
84-
// and the types of index to write (self, single-valued, slice or array)
85-
// the mapping has to be created at the point where the type information is available and
86-
// put into the input spec metadata as a vector of (type, label, pos)
105+
auto loc = std::find_if(spec.metadata.begin(), spec.metadata.end(), [](ConfigParamSpec const& cps) { return cps.name.compare("index-records") == 0; });
106+
std::stringstream iws(loc->defaultValue.get<std::string>());
107+
records = IndexJSONHelpers::read(iws);
87108

109+
for (auto const& r : records) {
110+
labels.emplace_back(r.label);
111+
}
112+
}
113+
114+
Builder createBuilder() const
115+
{
116+
return Builder{
117+
binding,
118+
labels,
119+
records,
120+
origin,
121+
description,
122+
version
123+
};
88124
}
89125

90126
};
@@ -132,21 +168,6 @@ AlgorithmSpec AODReaderHelpers::indexBuilderCallback(ConfigContext const& ctx)
132168

133169
namespace
134170
{
135-
template <o2::aod::is_aod_hash D>
136-
auto make_spawn(InputSpec const& input, ProcessingContext& pc)
137-
{
138-
using metadata_t = o2::aod::MetadataTrait<D>::metadata;
139-
constexpr auto sources = metadata_t::sources;
140-
static std::shared_ptr<gandiva::Projector> projector = nullptr;
141-
static std::shared_ptr<arrow::Schema> schema = std::make_shared<arrow::Schema>(o2::soa::createFieldsFromColumns(typename metadata_t::expression_pack_t{}));
142-
static auto projectors = []<typename... C>(framework::pack<C...>) -> std::array<expressions::Projector, sizeof...(C)>
143-
{
144-
return {{std::move(C::Projector())...}};
145-
}
146-
(typename metadata_t::expression_pack_t{});
147-
return o2::framework::spawner<D>(extractOriginals<sources.size(), sources>(pc), input.binding.c_str(), projectors.data(), projector, schema);
148-
}
149-
150171
struct Maker {
151172
std::string binding;
152173
std::vector<std::string> labels;

Framework/Core/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,7 @@ o2_add_library(Framework
142142
src/Variant.cxx
143143
src/VariantJSONHelpers.cxx
144144
src/ExpressionJSONHelpers.cxx
145+
src/IndexJSONHelpers.cxx
145146
src/VariantPropertyTreeHelpers.cxx
146147
src/WorkflowCustomizationHelpers.cxx
147148
src/WorkflowHelpers.cxx

Framework/Core/include/Framework/ASoA.h

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,19 @@ using is_self_index_t = typename std::conditional_t<is_self_index_column<C>, std
217217

218218
namespace o2::aod
219219
{
220+
namespace {
221+
template <typename Key, size_t N, std::array<bool, N> map>
222+
static consteval int getIndexPosToKey_impl()
223+
{
224+
constexpr const auto pos = std::find(map.begin(), map.end(), true);
225+
if constexpr (pos != map.end()) {
226+
return std::distance(map.begin(), pos);
227+
} else {
228+
return -1;
229+
}
230+
}
231+
}
232+
220233
/// Base type for table metadata
221234
template <typename D, typename... Cs>
222235
struct TableMetadata {
@@ -243,17 +256,6 @@ struct TableMetadata {
243256
return getIndexPosToKey_impl<Key, framework::pack_size(persistent_columns_t{}), getMap<Key>(persistent_columns_t{})>();
244257
}
245258

246-
template <typename Key, size_t N, std::array<bool, N> map>
247-
static consteval int getIndexPosToKey_impl()
248-
{
249-
constexpr const auto pos = std::find(map.begin(), map.end(), true);
250-
if constexpr (pos != map.end()) {
251-
return std::distance(map.begin(), pos);
252-
} else {
253-
return -1;
254-
}
255-
}
256-
257259
static std::shared_ptr<arrow::Schema> getSchema()
258260
{
259261
return std::make_shared<arrow::Schema>([]<typename... C>(framework::pack<C...>&& p) { return o2::soa::createFieldsFromColumns(p); }(persistent_columns_t{}));

Framework/Core/include/Framework/AnalysisHelpers.h

Lines changed: 64 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,30 @@
2525
#include "Framework/TableBuilder.h"
2626
#include "Framework/Traits.h"
2727

28+
#include <cstdio>
2829
#include <string>
30+
namespace o2::soa {
31+
enum struct IndexKind : int {
32+
IdxInvalid = -1,
33+
IdxSelf = 0,
34+
IdxSingle = 1,
35+
IdxSlice = 2,
36+
IdxArray = 3
37+
};
38+
39+
struct IndexRecord {
40+
std::string label;
41+
IndexKind kind;
42+
int pos;
43+
auto operator<=>(const IndexRecord&) const = default;
44+
};
45+
} // namespace o2::soa
46+
2947
namespace o2::framework
3048
{
3149
std::string serializeProjectors(std::vector<framework::expressions::Projector>& projectors);
3250
std::string serializeSchema(std::shared_ptr<arrow::Schema> schema);
51+
std::string serializeIndexRecords(std::vector<o2::soa::IndexRecord>& irs);
3352
} // namespace o2::framework
3453

3554
namespace o2::soa
@@ -54,19 +73,6 @@ constexpr auto tableRef2Schema()
5473
{"\"\""}};
5574
}
5675

57-
enum struct IndexKind : unsigned {
58-
IdxSelf = 0,
59-
IdxSingle = 1,
60-
IdxSlice = 2,
61-
IdxArray = 3
62-
};
63-
64-
struct IndexRecord {
65-
std::string label;
66-
IndexKind kind;
67-
size_t pos;
68-
};
69-
7076
namespace
7177
{
7278
template <soa::with_sources T>
@@ -102,6 +108,47 @@ inline constexpr auto getCCDBUrls()
102108
return result;
103109
}
104110

111+
template <typename T>
112+
requires(std::same_as<T, int>)
113+
consteval IndexKind getIndexKind()
114+
{
115+
return IndexKind::IdxSingle;
116+
}
117+
118+
template <typename T>
119+
requires(std::is_bounded_array_v<T>)
120+
consteval IndexKind getIndexKind()
121+
{
122+
return IndexKind::IdxSlice;
123+
}
124+
125+
template <typename T>
126+
requires(framework::is_specialization_v<T, std::vector>)
127+
consteval IndexKind getIndexKind()
128+
{
129+
return IndexKind::IdxArray;
130+
}
131+
132+
template <soa::with_index_pack T>
133+
inline constexpr auto getIndexMapping()
134+
{
135+
std::vector<IndexRecord> idx;
136+
using indices = T::index_pack_t;
137+
using Key = T::Key;
138+
[&idx]<size_t... Is>(std::index_sequence<Is...>) mutable {
139+
constexpr auto refs = T::sources;
140+
([&idx]<TableRef ref, typename CT>() mutable {
141+
constexpr auto pos = o2::aod::MetadataTrait<o2::aod::Hash<ref.desc_hash>>::metadata::template getIndexPosToKey<Key>();
142+
if constexpr (pos == -1) {
143+
idx.emplace_back(o2::aod::label<ref>(), IndexKind::IdxSelf, pos);
144+
} else {
145+
idx.emplace_back(o2::aod::label<ref>(), getIndexKind<CT>(), pos);
146+
}
147+
}.template operator()<refs[Is], typename framework::pack_element_t<Is, indices>::type>(), ...);
148+
}(std::make_index_sequence<framework::pack_size(indices{})>());;
149+
return idx;
150+
}
151+
105152
template <soa::with_sources T>
106153
constexpr auto getInputMetadata() -> std::vector<framework::ConfigParamSpec>
107154
{
@@ -170,8 +217,8 @@ constexpr auto getExpressionMetadata() -> std::vector<framework::ConfigParamSpec
170217
template <soa::with_index_pack T>
171218
constexpr auto getIndexMetadata() -> std::vector<framework::ConfigParamSpec>
172219
{
173-
174-
return {};
220+
auto map = getIndexMapping<T>();
221+
return {framework::ConfigParamSpec{"index-records", framework::VariantType::String, framework::serializeIndexRecords(map), {"\"\""}}};
175222
}
176223

177224
template <typename T>
@@ -193,6 +240,8 @@ constexpr auto tableRef2InputSpec()
193240
metadata.insert(metadata.end(), ccdbMetadata.begin(), ccdbMetadata.end());
194241
auto p = getExpressionMetadata<typename o2::aod::MetadataTrait<o2::aod::Hash<R.desc_hash>>::metadata>();
195242
metadata.insert(metadata.end(), p.begin(), p.end());
243+
auto idx = getIndexMetadata<typename o2::aod::MetadataTrait<o2::aod::Hash<R.desc_hash>>::metadata>();
244+
metadata.insert(metadata.end(), idx.begin(), idx.end());
196245
if constexpr (!soa::with_ccdb_urls<typename o2::aod::MetadataTrait<o2::aod::Hash<R.desc_hash>>::metadata>) {
197246
metadata.emplace_back(framework::ConfigParamSpec{"schema", framework::VariantType::String, framework::serializeSchema(o2::aod::MetadataTrait<o2::aod::Hash<R.desc_hash>>::metadata::getSchema()), {"\"\""}});
198247
}

Framework/Core/src/AnalysisHelpers.cxx

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,10 @@
88
// In applying this license CERN does not waive the privileges and immunities
99
// granted to it by virtue of its status as an Intergovernmental Organization
1010
// or submit itself to any jurisdiction.
11+
#include "Framework/AnalysisHelpers.h"
1112
#include "Framework/ExpressionHelpers.h"
1213
#include "ExpressionJSONHelpers.h"
14+
#include "IndexJSONHelpers.h"
1315

1416
namespace o2::framework
1517
{
@@ -41,4 +43,11 @@ std::string serializeSchema(std::shared_ptr<arrow::Schema> schema)
4143
ArrowJSONHelpers::write(osm, schema);
4244
return osm.str();
4345
}
46+
47+
std::string serializeIndexRecords(std::vector<o2::soa::IndexRecord>& irs)
48+
{
49+
std::stringstream osm;
50+
IndexJSONHelpers::write(osm, irs);
51+
return osm.str();
52+
}
4453
} // namespace o2::framework

0 commit comments

Comments
 (0)