Skip to content

Commit 54fb57a

Browse files
committed
fixup! rework index builder
1 parent 8f3b0b9 commit 54fb57a

File tree

6 files changed

+693
-332
lines changed

6 files changed

+693
-332
lines changed

Framework/AnalysisSupport/src/AODReaderHelpers.cxx

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -72,7 +72,7 @@ struct Buildable {
7272
outputSchema,
7373
origin,
7474
description,
75-
version};
75+
version, nullptr};
7676
}
7777

7878
};
@@ -91,7 +91,7 @@ AlgorithmSpec AODReaderHelpers::indexBuilderCallback(ConfigContext const& ctx)
9191
for (auto& b : buildables) {
9292
builders.push_back(b.createBuilder());
9393
}
94-
return [builders](ProcessingContext& pc) {
94+
return [builders](ProcessingContext& pc) mutable {
9595
auto outputs = pc.outputs();
9696
for (auto& builder : builders) {
9797
outputs.adopt(Output{builder.origin, builder.description, builder.version}, builder.materialize(pc));

Framework/Core/include/Framework/AnalysisHelpers.h

Lines changed: 11 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -60,7 +60,11 @@ struct IndexRecord {
6060
};
6161

6262
struct IndexBuilder {
63-
static std::shared_ptr<arrow::Table> materialize(std::vector<std::shared_ptr<arrow::Table>>&& tables, std::vector<soa::IndexRecord> const& records, std::shared_ptr<arrow::Schema> const& schema, bool exclusive);
63+
static std::vector<framework::IndexColumnBuilderNG> makeBuilders(std::vector<std::shared_ptr<arrow::Table>>&& tables, std::vector<soa::IndexRecord> const& records);
64+
static void resetBuilders(std::vector<framework::IndexColumnBuilderNG>& builders, std::vector<std::shared_ptr<arrow::Table>>&& tables);
65+
66+
// static std::shared_ptr<arrow::Table> materialize(std::vector<std::shared_ptr<arrow::Table>>&& tables, std::vector<soa::IndexRecord> const& records, std::shared_ptr<arrow::Schema> const& schema, bool exclusive);
67+
static std::shared_ptr<arrow::Table> materializeNG(std::vector<framework::IndexColumnBuilderNG>& builders, std::vector<std::shared_ptr<arrow::Table>>&& tables, std::vector<soa::IndexRecord> const& records, std::shared_ptr<arrow::Schema> const& schema, bool exclusive);
6468
};
6569
} // namespace o2::soa
6670

@@ -161,7 +165,9 @@ struct Builder {
161165
header::DataDescription description;
162166
header::DataHeader::SubSpecificationType version;
163167

164-
std::shared_ptr<arrow::Table> materialize(ProcessingContext& pc) const;
168+
std::shared_ptr<std::vector<framework::IndexColumnBuilderNG>> builders = nullptr;
169+
170+
std::shared_ptr<arrow::Table> materialize(ProcessingContext& pc);
165171
};
166172
} // namespace o2::framework
167173

@@ -722,6 +728,8 @@ struct Builds : decltype(transformBase<T>()) {
722728

723729
std::vector<soa::IndexRecord> map = soa::getIndexMapping<metadata>();
724730

731+
std::vector<framework::IndexColumnBuilderNG> builders;
732+
725733
T* operator->()
726734
{
727735
return table.get();
@@ -744,7 +752,7 @@ struct Builds : decltype(transformBase<T>()) {
744752

745753
auto build(std::vector<std::shared_ptr<arrow::Table>>&& tables)
746754
{
747-
this->table = std::make_shared<T>(soa::IndexBuilder::materialize(std::forward<std::vector<std::shared_ptr<arrow::Table>>>(tables), map, outputSchema, metadata::exclusive));
755+
this->table = std::make_shared<T>(soa::IndexBuilder::materializeNG(builders, std::forward<std::vector<std::shared_ptr<arrow::Table>>>(tables), map, outputSchema, metadata::exclusive));
748756
return (this->table != nullptr);
749757
}
750758
};

Framework/Core/include/Framework/IndexBuilderHelpers.h

Lines changed: 136 additions & 125 deletions
Original file line number | Diff line number | Diff line change
@@ -14,7 +14,6 @@
1414
#include <arrow/chunked_array.h>
1515
#include <arrow/builder.h>
1616
#include <arrow/memory_pool.h>
17-
#include <string>
1817
#include <memory>
1918

2019
namespace o2::soa
@@ -35,6 +34,7 @@ void cannotCreateIndexBuilder();
3534

3635
struct ChunkedArrayIterator {
3736
ChunkedArrayIterator(std::shared_ptr<arrow::ChunkedArray> source);
37+
void reset(std::shared_ptr<arrow::ChunkedArray>& source);
3838

3939
std::shared_ptr<arrow::ChunkedArray> mSource = nullptr;
4040
size_t mPosition = 0;
@@ -44,6 +44,7 @@ struct ChunkedArrayIterator {
4444
int const* mCurrent = nullptr;
4545
int const* mLast = nullptr;
4646
size_t mFirstIndex = 0;
47+
size_t mSourceSize = 0;
4748

4849
std::shared_ptr<arrow::Int32Array> getCurrentArray();
4950
void nextChunk();
@@ -53,160 +54,170 @@ struct ChunkedArrayIterator {
5354

5455
struct SelfBuilder {
5556
std::unique_ptr<arrow::ArrayBuilder> mBuilder = nullptr;
57+
std::unique_ptr<framework::ChunkedArrayIterator> keyIndex = nullptr;
5658
SelfBuilder(arrow::MemoryPool* pool);
59+
void reset(std::shared_ptr<arrow::ChunkedArray>);
60+
61+
inline bool find(int) const
62+
{
63+
return true;
64+
}
65+
void fill(int idx);
66+
std::shared_ptr<arrow::ChunkedArray> result() const;
5767
};
5868

59-
struct SingleBuilder {
60-
ChunkedArrayIterator arrayIterator;
69+
struct SingleBuilder : public ChunkedArrayIterator {
6170
std::unique_ptr<arrow::ArrayBuilder> mBuilder = nullptr;
6271
SingleBuilder(std::shared_ptr<arrow::ChunkedArray> source, arrow::MemoryPool* pool);
72+
void reset(std::shared_ptr<arrow::ChunkedArray> source);
73+
74+
bool find(int idx);
75+
void fill(int idx);
76+
std::shared_ptr<arrow::ChunkedArray> result() const;
6377
};
6478

65-
struct SliceBuilder {
66-
ChunkedArrayIterator arrayIterator;
79+
struct SliceBuilder : public ChunkedArrayIterator {
6780
arrow::ArrayBuilder* mValueBuilder = nullptr;
6881
std::unique_ptr<arrow::ArrayBuilder> mListBuilder = nullptr;
6982
std::shared_ptr<arrow::NumericArray<arrow::Int32Type>> mValues = nullptr;
7083
std::shared_ptr<arrow::NumericArray<arrow::Int64Type>> mCounts = nullptr;
84+
int mValuePos = 0;
7185
SliceBuilder(std::shared_ptr<arrow::ChunkedArray> source, arrow::MemoryPool* pool);
86+
void reset(std::shared_ptr<arrow::ChunkedArray> source);
87+
88+
bool find(int idx);
89+
void fill(int idx);
90+
std::shared_ptr<arrow::ChunkedArray> result() const;
7291

7392
arrow::Status preSlice();
7493
};
7594

76-
struct ArrayBuilder {
77-
ChunkedArrayIterator arrayIterator;
95+
struct ArrayBuilder : public ChunkedArrayIterator {
7896
arrow::ArrayBuilder* mValueBuilder = nullptr;
7997
std::vector<int> mValues;
8098
std::vector<std::vector<int>> mIndices;
8199
std::unique_ptr<arrow::ArrayBuilder> mListBuilder = nullptr;
82100
ArrayBuilder(std::shared_ptr<arrow::ChunkedArray> source, arrow::MemoryPool* pool);
101+
void reset(std::shared_ptr<arrow::ChunkedArray> source);
102+
103+
bool find(int idx);
104+
void fill(int idx);
105+
std::shared_ptr<arrow::ChunkedArray> result() const;
83106

84107
arrow::Status preFind();
85108
};
86109

87110
struct IndexColumnBuilderNG {
88111
std::variant<std::monostate, SelfBuilder, SingleBuilder, SliceBuilder, ArrayBuilder> builder;
89-
90-
IndexColumnBuilderNG(soa::IndexKind kind, arrow::MemoryPool* pool, std::shared_ptr<arrow::ChunkedArray> source = nullptr)
91-
{
92-
switch (kind) {
93-
case soa::IndexKind::IdxSelf:
94-
builder = SelfBuilder{pool};
95-
break;
96-
case soa::IndexKind::IdxSingle:
97-
builder = SingleBuilder{source, pool};
98-
break;
99-
case soa::IndexKind::IdxSlice:
100-
builder = SliceBuilder{source, pool};
101-
break;
102-
case soa::IndexKind::IdxArray:
103-
builder = ArrayBuilder{source, pool};
104-
break;
105-
default:
106-
cannotCreateIndexBuilder();
107-
}
108-
}
109-
};
110-
111-
struct SelfIndexColumnBuilder {
112-
SelfIndexColumnBuilder(const char* name, arrow::MemoryPool* pool);
113-
virtual ~SelfIndexColumnBuilder() = default;
114-
115-
inline std::shared_ptr<arrow::ChunkedArray> result() const
116-
{
117-
std::shared_ptr<arrow::Array> array;
118-
auto status = static_cast<arrow::Int32Builder*>(mBuilder.get())->Finish(&array);
119-
if (!status.ok()) {
120-
cannotBuildAnArray();
121-
}
122-
123-
return std::make_shared<arrow::ChunkedArray>(array);
124-
}
125-
126-
inline bool find(int)
127-
{
128-
return true;
129-
}
130-
131-
inline void fill(int idx)
132-
{
133-
(void)static_cast<arrow::Int32Builder*>(mBuilder.get())->Append(idx);
134-
}
135-
136-
std::string mColumnName;
137-
std::unique_ptr<arrow::ArrayBuilder> mBuilder = nullptr;
138-
};
139-
140-
class IndexColumnBuilder : public SelfIndexColumnBuilder, public ChunkedArrayIterator
141-
{
142-
public:
143-
IndexColumnBuilder(std::shared_ptr<arrow::ChunkedArray> source, const char* name, int listSize, arrow::MemoryPool* pool);
144-
~IndexColumnBuilder() override = default;
145-
146-
inline std::shared_ptr<arrow::ChunkedArray> result() const
147-
{
148-
if (mListSize == -1) {
149-
return resultMulti();
150-
} else if (mListSize == 2) {
151-
return resultSlice();
152-
} else {
153-
return resultSingle();
154-
}
155-
}
156-
157-
inline bool find(int idx)
158-
{
159-
if (mListSize == -1) {
160-
return findMulti(idx);
161-
} else if (mListSize == 2) {
162-
return findSlice(idx);
163-
} else {
164-
return findSingle(idx);
165-
}
166-
}
167-
168-
inline void fill(int idx)
169-
{
170-
++mResultSize;
171-
if (mListSize == -1) {
172-
fillMulti(idx);
173-
} else if (mListSize == 2) {
174-
fillSlice(idx);
175-
} else {
176-
fillSingle(idx);
177-
}
178-
}
179-
180-
private:
181-
arrow::Status preSlice();
182-
arrow::Status preFind();
183-
184-
bool findSingle(int idx);
185-
bool findSlice(int idx);
186-
bool findMulti(int idx);
187-
188-
void fillSingle(int idx);
189-
void fillSlice(int idx);
190-
void fillMulti(int idx);
191-
192-
std::shared_ptr<arrow::ChunkedArray> resultSingle() const;
193-
std::shared_ptr<arrow::ChunkedArray> resultSlice() const;
194-
std::shared_ptr<arrow::ChunkedArray> resultMulti() const;
195-
196-
int mListSize = 1;
197-
arrow::ArrayBuilder* mValueBuilder = nullptr;
198-
std::unique_ptr<arrow::ArrayBuilder> mListBuilder = nullptr;
199-
200-
size_t mSourceSize = 0;
201112
size_t mResultSize = 0;
113+
int mColumnPos = -1;
114+
IndexColumnBuilderNG(soa::IndexKind kind, int pos, arrow::MemoryPool* pool, std::shared_ptr<arrow::ChunkedArray> source = nullptr);
115+
void reset(std::shared_ptr<arrow::ChunkedArray> source = nullptr);
202116

203-
std::shared_ptr<arrow::NumericArray<arrow::Int32Type>> mValuesArrow = nullptr;
204-
std::shared_ptr<arrow::NumericArray<arrow::Int64Type>> mCounts = nullptr;
205-
std::vector<int> mValues;
206-
std::vector<std::vector<int>> mIndices;
207-
int mFillOffset = 0;
208-
int mValuePos = 0;
117+
bool find(int idx);
118+
void fill(int idx);
119+
std::shared_ptr<arrow::ChunkedArray> result() const;
209120
};
121+
122+
// struct SelfIndexColumnBuilder {
123+
// SelfIndexColumnBuilder(const char* name, arrow::MemoryPool* pool);
124+
// virtual ~SelfIndexColumnBuilder() = default;
125+
126+
// inline std::shared_ptr<arrow::ChunkedArray> result() const
127+
// {
128+
// std::shared_ptr<arrow::Array> array;
129+
// auto status = static_cast<arrow::Int32Builder*>(mBuilder.get())->Finish(&array);
130+
// if (!status.ok()) {
131+
// cannotBuildAnArray();
132+
// }
133+
134+
// return std::make_shared<arrow::ChunkedArray>(array);
135+
// }
136+
137+
// inline bool find(int)
138+
// {
139+
// return true;
140+
// }
141+
142+
// inline void fill(int idx)
143+
// {
144+
// (void)static_cast<arrow::Int32Builder*>(mBuilder.get())->Append(idx);
145+
// }
146+
147+
// std::string mColumnName;
148+
// std::unique_ptr<arrow::ArrayBuilder> mBuilder = nullptr;
149+
// };
150+
151+
// class IndexColumnBuilder : public SelfIndexColumnBuilder, public ChunkedArrayIterator
152+
// {
153+
// public:
154+
// IndexColumnBuilder(std::shared_ptr<arrow::ChunkedArray> source, const char* name, int listSize, arrow::MemoryPool* pool);
155+
// ~IndexColumnBuilder() override = default;
156+
157+
// inline std::shared_ptr<arrow::ChunkedArray> result() const
158+
// {
159+
// if (mListSize == -1) {
160+
// return resultMulti();
161+
// } else if (mListSize == 2) {
162+
// return resultSlice();
163+
// } else {
164+
// return resultSingle();
165+
// }
166+
// }
167+
168+
// inline bool find(int idx)
169+
// {
170+
// if (mListSize == -1) {
171+
// return findMulti(idx);
172+
// } else if (mListSize == 2) {
173+
// return findSlice(idx);
174+
// } else {
175+
// return findSingle(idx);
176+
// }
177+
// }
178+
179+
// inline void fill(int idx)
180+
// {
181+
// ++mResultSize;
182+
// if (mListSize == -1) {
183+
// fillMulti(idx);
184+
// } else if (mListSize == 2) {
185+
// fillSlice(idx);
186+
// } else {
187+
// fillSingle(idx);
188+
// }
189+
// }
190+
191+
// private:
192+
// arrow::Status preSlice();
193+
// arrow::Status preFind();
194+
195+
// bool findSingle(int idx);
196+
// bool findSlice(int idx);
197+
// bool findMulti(int idx);
198+
199+
// void fillSingle(int idx);
200+
// void fillSlice(int idx);
201+
// void fillMulti(int idx);
202+
203+
// std::shared_ptr<arrow::ChunkedArray> resultSingle() const;
204+
// std::shared_ptr<arrow::ChunkedArray> resultSlice() const;
205+
// std::shared_ptr<arrow::ChunkedArray> resultMulti() const;
206+
207+
// int mListSize = 1;
208+
// arrow::ArrayBuilder* mValueBuilder = nullptr;
209+
// std::unique_ptr<arrow::ArrayBuilder> mListBuilder = nullptr;
210+
211+
// size_t mSourceSize = 0;
212+
// size_t mResultSize = 0;
213+
214+
// std::shared_ptr<arrow::NumericArray<arrow::Int32Type>> mValuesArrow = nullptr;
215+
// std::shared_ptr<arrow::NumericArray<arrow::Int64Type>> mCounts = nullptr;
216+
// std::vector<int> mValues;
217+
// std::vector<std::vector<int>> mIndices;
218+
// int mFillOffset = 0;
219+
// int mValuePos = 0;
220+
// };
210221
} // namespace o2::framework
211222

212223
#endif // O2_FRAMEWORK_INDEXBUILDERHELPERS_H_

0 commit comments

Comments
 (0)