Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/paimon/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,7 @@ if(PAIMON_BUILD_TESTS)
common/file_index/bsi/bit_slice_index_roaring_bitmap_test.cpp
common/file_index/bloomfilter/bloom_filter_file_index_test.cpp
common/file_index/bloomfilter/fast_hash_test.cpp
common/file_index/rangebitmap/range_bitmap_file_index_test.cpp
common/global_index/complete_index_score_batch_reader_test.cpp
common/global_index/global_index_result_test.cpp
common/global_index/global_indexer_factory_test.cpp
Expand Down
10 changes: 9 additions & 1 deletion src/paimon/common/file_index/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,15 @@ set(PAIMON_FILE_INDEX_SRC
bsi/bit_slice_index_roaring_bitmap.cpp
bloomfilter/bloom_filter_file_index.cpp
bloomfilter/bloom_filter_file_index_factory.cpp
bloomfilter/fast_hash.cpp)
bloomfilter/fast_hash.cpp
rangebitmap/range_bitmap_file_index.cpp
rangebitmap/range_bitmap_file_index_factory.cpp
rangebitmap/range_bitmap.cpp
rangebitmap/bit_slice_index_bitmap.cpp
rangebitmap/dictionary/chunked_dictionary.cpp
rangebitmap/dictionary/fixed_length_chunk.cpp
rangebitmap/dictionary/key_factory.cpp
rangebitmap/utils/literal_serialization_utils.cpp)

add_paimon_lib(paimon_file_index
SOURCES
Expand Down
49 changes: 47 additions & 2 deletions src/paimon/common/file_index/file_index_format_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@
#include "paimon/common/file_index/bloomfilter/bloom_filter_file_index.h"
#include "paimon/common/file_index/bsi/bit_slice_index_bitmap_file_index.h"
#include "paimon/common/file_index/empty/empty_file_index_reader.h"
#include "paimon/common/file_index/rangebitmap/range_bitmap.h"
#include "paimon/common/file_index/rangebitmap/range_bitmap_file_index.h"
#include "paimon/data/timestamp.h"
#include "paimon/defs.h"
#include "paimon/file_index/file_index_result.h"
#include "paimon/file_index/file_indexer_factory.h"
#include "paimon/fs/local/local_file_system.h"
#include "paimon/io/byte_array_input_stream.h"
#include "paimon/memory/memory_pool.h"
Expand Down Expand Up @@ -149,6 +151,50 @@ TEST_F(FileIndexFormatTest, TestSimple) {
}
}

// Compatibility check against an index file generated by the Paimon Java implementation.
// Column type: int32
// Column data (rows 0..7): 17, 3, 5, 7, 9, null, null, 10
TEST_F(FileIndexFormatTest, TestRangeBitmapCompatibleWithJava) {
    const auto schema = arrow::schema({arrow::field("data", arrow::int32())});
    // Raw bytes of the Java-produced index file; must stay byte-for-byte identical.
    const auto index_file_bytes =
        std::make_unique<std::vector<uint8_t>>(std::initializer_list<uint8_t>{
            0, 5, 78, 78, 208, 26, 53, 174, 0, 0, 0, 1, 0, 0, 0, 56, 0, 0, 0,
            1, 0, 4, 100, 97, 116, 97, 0, 0, 0, 1, 0, 12, 114, 97, 110, 103, 101, 45,
            98, 105, 116, 109, 97, 112, 0, 0, 0, 56, 0, 0, 0, 210, 0, 0, 0, 0, 0,
            0, 0, 21, 1, 0, 0, 0, 8, 0, 0, 0, 6, 0, 0, 0, 3, 0, 0, 0,
            17, 0, 0, 0, 66, 0, 0, 0, 13, 1, 0, 0, 0, 1, 0, 0, 0, 4, 0,
            0, 0, 25, 0, 0, 0, 0, 1, 0, 0, 0, 3, 0, 0, 0, 0, 0, 0, 0,
            0, 0, 0, 0, 5, 0, 0, 0, 20, 0, 0, 0, 4, 0, 0, 0, 5, 0, 0,
            0, 7, 0, 0, 0, 9, 0, 0, 0, 10, 0, 0, 0, 17, 0, 0, 0, 34, 1,
            3, 0, 0, 0, 19, 0, 0, 0, 24, 0, 0, 0, 0, 0, 0, 0, 22, 0, 0,
            0, 22, 0, 0, 0, 20, 0, 0, 0, 42, 0, 0, 0, 20, 59, 48, 0, 0, 1,
            0, 0, 5, 0, 2, 0, 0, 0, 4, 0, 7, 0, 0, 0, 58, 48, 0, 0, 1,
            0, 0, 0, 0, 0, 2, 0, 16, 0, 0, 0, 0, 0, 2, 0, 4, 0, 58, 48,
            0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 16, 0, 0, 0, 3, 0, 4, 0, 58,
            48, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 16, 0, 0, 0, 0, 0, 7, 0,
        });
    const auto input_stream = std::make_shared<ByteArrayInputStream>(
        reinterpret_cast<char*>(index_file_bytes->data()), index_file_bytes->size());
    ASSERT_OK_AND_ASSIGN(const auto reader, FileIndexFormat::CreateReader(input_stream, pool_));
    ASSERT_OK_AND_ASSIGN(const auto index_file_readers,
                         reader->ReadColumnIndex("data", CreateArrowSchema(schema).get()));
    ASSERT_EQ(1, index_file_readers.size());
    auto* range_bitmap_reader =
        dynamic_cast<RangeBitmapFileIndexReader*>(index_file_readers[0].get());
    ASSERT_TRUE(range_bitmap_reader);

    // value == 3 only appears at row 1
    ASSERT_OK_AND_ASSIGN(const auto eq_result, range_bitmap_reader->VisitEqual(Literal(3)));
    ASSERT_TRUE(eq_result);
    ASSERT_EQ(eq_result->ToString(), "{1}");

    // values 3, 5, 7, 9 (rows 1..4) are < 10; null rows must not match
    ASSERT_OK_AND_ASSIGN(const auto lt_result, range_bitmap_reader->VisitLessThan(Literal(10)));
    ASSERT_TRUE(lt_result);
    ASSERT_EQ(lt_result->ToString(), "{1,2,3,4}");

    // rows 5 and 6 hold the two nulls
    ASSERT_OK_AND_ASSIGN(const auto is_null_result, range_bitmap_reader->VisitIsNull());
    // Fail the test cleanly instead of dereferencing a null result below.
    ASSERT_TRUE(is_null_result);
    ASSERT_EQ(is_null_result->ToString(), "{5,6}");
}

// NOLINTNEXTLINE(google-readability-function-size)
TEST_F(FileIndexFormatTest, TestBitmapIndexWithTimestamp) {
auto schema = arrow::schema({
Expand Down Expand Up @@ -816,5 +862,4 @@ TEST_F(FileIndexFormatTest, TestBitmapIndexWithTimestamp) {
check_nano("ts_nano");
check_nano("ts_tz_nano");
}

} // namespace paimon::test
276 changes: 276 additions & 0 deletions src/paimon/common/file_index/rangebitmap/bit_slice_index_bitmap.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,276 @@
/*
* Copyright 2024-present Alibaba Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "paimon/common/file_index/rangebitmap/bit_slice_index_bitmap.h"

#include <fmt/format.h>

#include <algorithm>
#include <cmath>
#include <string>

#include "paimon/common/io/memory_segment_output_stream.h"
#include "paimon/common/memory/memory_segment_utils.h"
#include "paimon/io/data_input_stream.h"
#include "paimon/result.h"
#include "paimon/status.h"

namespace paimon {

/// Opens a serialized bit-slice index that starts at `offset` inside `input_stream`.
///
/// Only the header is read here: int32 header length, int8 version, int8 slice count,
/// int32 ebm length, int32 indexes length, followed by the indexes table itself. The ebm
/// and the slice bitmaps are read later, on demand, from `input_stream`.
///
/// @param pool memory pool used for the indexes buffer and handed to the created object.
/// @param input_stream stream holding the serialized index; retained by the created object.
/// @param offset position in `input_stream` where the serialized index begins.
/// @return the reader, or `Status::Invalid` when the version is unknown or a length field
///         in the header is negative, or the error of the failing stream operation.
Result<std::unique_ptr<BitSliceIndexBitmap>> BitSliceIndexBitmap::Create(
    const std::shared_ptr<MemoryPool>& pool, const std::shared_ptr<InputStream>& input_stream,
    const int32_t offset) {
    const auto data_in = std::make_unique<DataInputStream>(input_stream);
    PAIMON_RETURN_NOT_OK(data_in->Seek(offset));
    PAIMON_ASSIGN_OR_RAISE(const auto header_length, data_in->ReadValue<int32_t>());
    PAIMON_ASSIGN_OR_RAISE(const auto version, data_in->ReadValue<int8_t>());
    if (version != CURRENT_VERSION) {
        return Status::Invalid("Unknown BitSliceBitmap Version");
    }
    PAIMON_ASSIGN_OR_RAISE(const auto slices_size, data_in->ReadValue<int8_t>());
    PAIMON_ASSIGN_OR_RAISE(const auto ebm_size, data_in->ReadValue<int32_t>());
    PAIMON_ASSIGN_OR_RAISE(const auto indexes_length, data_in->ReadValue<int32_t>());
    // These values come straight from the file and are later used as container sizes and
    // allocation lengths; a negative one would be converted to a huge unsigned size.
    if (header_length < 0 || slices_size < 0 || ebm_size < 0 || indexes_length < 0) {
        return Status::Invalid(fmt::format(
            "Corrupted BitSliceBitmap header: header length {}, slices size {}, ebm length {}, "
            "indexes length {}",
            header_length, static_cast<int32_t>(slices_size), ebm_size, indexes_length));
    }
    auto indexes = Bytes::AllocateBytes(indexes_length, pool.get());
    PAIMON_RETURN_NOT_OK(data_in->Read(indexes->data(), indexes_length));
    // The body (ebm followed by the slices) starts right after the int32 header-length field
    // and the header itself.
    const int32_t body_offset = offset + static_cast<int32_t>(sizeof(int32_t)) + header_length;
    return std::make_unique<BitSliceIndexBitmap>(pool, indexes_length, std::move(indexes), ebm_size,
                                                 slices_size, input_stream, body_offset);
}

/// Counts the zero bits above the highest set bit of `value`, viewed as a 64-bit pattern.
/// Returns 64 for zero, which the compiler builtin does not define.
int32_t NumberOfLeadingZeros(const int64_t value) {
    const auto pattern = static_cast<uint64_t>(value);
    return pattern != 0 ? __builtin_clzll(pattern) : 64;
}

/// Counts the zero bits below the lowest set bit of `value`, viewed as a 64-bit pattern.
/// Returns 64 for zero, which the compiler builtin does not define.
int32_t NumberOfTrailingZeros(const int64_t value) {
    const auto pattern = static_cast<uint64_t>(value);
    return pattern != 0 ? __builtin_ctzll(pattern) : 64;
}

/// Builds a lazily-loaded view over a serialized bit-slice index.
///
/// No bitmap is read here: the ebm and each of the `slices_size` slices start out unset and
/// are filled in by the accessors on first use.
///
/// @param pool memory pool used for the temporary read buffers.
/// @param indexes_length number of bytes in `indexes`.
/// @param indexes table of (offset, length) int32 pairs, one pair per slice.
/// @param ebm_length serialized size of the ebm, which sits at `body_offset`.
/// @param slices_size number of bit slices.
/// @param input_stream stream the bitmaps are read from.
/// @param body_offset position in `input_stream` where the ebm starts.
BitSliceIndexBitmap::BitSliceIndexBitmap(const std::shared_ptr<MemoryPool>& pool,
                                         const int32_t indexes_length,
                                         PAIMON_UNIQUE_PTR<Bytes> indexes, const int32_t ebm_length,
                                         const int32_t slices_size,
                                         const std::shared_ptr<InputStream>& input_stream,
                                         const int32_t body_offset)
    : pool_(pool),
      initialized_(false),
      // value-initialized optionals are disengaged, i.e. "not loaded yet"
      bit_slices_(slices_size),
      ebm(std::nullopt),
      input_stream_(input_stream),
      body_offset_(body_offset),
      indexes_(std::move(indexes)),
      ebm_length_(ebm_length),
      indexes_length_(indexes_length) {}

/// Returns the ebm, reading and deserializing it from the input stream on first use.
///
/// The ebm is the first `ebm_length_` bytes of the body. Once loaded it is cached, so later
/// calls do no IO.
///
/// @return pointer to the cached bitmap (owned by this object), or the error of the failing
///         seek / read / deserialize step.
Result<const RoaringBitmap32*> BitSliceIndexBitmap::GetEmtpyBitmap() {
    if (ebm.has_value()) {
        return &ebm.value();
    }
    PAIMON_RETURN_NOT_OK(input_stream_->Seek(body_offset_, FS_SEEK_SET));
    const auto bytes = Bytes::AllocateBytes(ebm_length_, pool_.get());
    PAIMON_RETURN_NOT_OK(input_stream_->Read(bytes->data(), ebm_length_));
    RoaringBitmap32 bitmap;
    PAIMON_RETURN_NOT_OK(bitmap.Deserialize(bytes->data(), ebm_length_));
    // Hand the freshly built bitmap over to the cache instead of deep-copying it.
    ebm = std::move(bitmap);
    return &ebm.value();
}

/// Returns the bitmap of slice `idx`, reading and deserializing it on first use.
///
/// The slice's (offset, length) pair is looked up in the indexes table; the offset is relative
/// to the end of the ebm inside the body. Loaded slices are cached.
///
/// @param idx slice number, must be in [0, number of slices).
/// @return pointer to the cached bitmap (owned by this object), `Status::Invalid` when `idx`
///         is out of range, or the error of the failing seek / read / deserialize step.
Result<const RoaringBitmap32*> BitSliceIndexBitmap::GetSliceBitmap(const int32_t idx) {
    // Guard the vector access below; operator[] does no bounds checking.
    if (idx < 0 || static_cast<size_t>(idx) >= bit_slices_.size()) {
        return Status::Invalid(
            fmt::format("Slice index {} out of range [0, {})", idx, bit_slices_.size()));
    }
    auto& cached_slice = bit_slices_[idx];
    if (!cached_slice.has_value()) {
        const auto data_in = std::make_unique<DataInputStream>(
            std::make_shared<ByteArrayInputStream>(indexes_->data(), indexes_length_));
        // Every index entry is two int32 values: offset, then length.
        const auto position = static_cast<int32_t>(2 * sizeof(int32_t) * idx);
        PAIMON_RETURN_NOT_OK(data_in->Seek(position));
        PAIMON_ASSIGN_OR_RAISE(const auto offset, data_in->ReadValue<int32_t>());
        PAIMON_ASSIGN_OR_RAISE(const auto length, data_in->ReadValue<int32_t>());
        PAIMON_RETURN_NOT_OK(input_stream_->Seek(body_offset_ + ebm_length_ + offset, FS_SEEK_SET));
        RoaringBitmap32 bitmap;
        const auto bytes = Bytes::AllocateBytes(length, pool_.get());
        PAIMON_RETURN_NOT_OK(input_stream_->Read(bytes->data(), length));
        PAIMON_RETURN_NOT_OK(bitmap.Deserialize(bytes->data(), length));
        // Move into the cache rather than deep-copying the bitmap.
        cached_slice = std::move(bitmap);
    }
    return &cached_slice.value();
}

/// Batch load slices [start, end) with a single read to avoid unnecessary IO.
///
/// Relies on the slices being stored back to back in the body, so that one read starting at
/// the offset of slice `start` and spanning the summed lengths covers all of them.
/// Does nothing once a previous batch load has completed, or when the range is empty.
///
/// @param start first slice to load (inclusive).
/// @param end one past the last slice to load.
/// @return `Status::Invalid` when the range does not fit the slices of this index or a stored
///         length is negative; otherwise the error of the failing stream operation, or OK.
Status BitSliceIndexBitmap::LoadSlices(const int32_t start, const int32_t end) {
    if (initialized_) {
        return Status::OK();
    }
    if (start < 0 || end < 0 || static_cast<size_t>(end) > bit_slices_.size()) {
        return Status::Invalid(fmt::format("Invalid slice range [{}, {}), slices size: {}", start,
                                           end, bit_slices_.size()));
    }
    if (start >= end) {
        // Empty range: there is no index entry to read and no slot to fill.
        return Status::OK();
    }
    auto indexes_stream = std::make_shared<ByteArrayInputStream>(indexes_->data(), indexes_length_);
    const auto data_in = std::make_unique<DataInputStream>(indexes_stream);
    // Every index entry is two int32 values: offset, then length.
    const auto position = static_cast<int32_t>(2 * sizeof(int32_t) * start);
    PAIMON_RETURN_NOT_OK(data_in->Seek(position));
    PAIMON_ASSIGN_OR_RAISE(const auto offset, data_in->ReadValue<int32_t>());

    // lengths[k] is the serialized size of slice (start + k).
    std::vector<int32_t> lengths(static_cast<size_t>(end - start));
    int32_t total_length = 0;
    for (int32_t i = start; i < end; ++i) {
        if (i != start) {
            // Skip this entry's offset; only the first one is needed for the batch read.
            PAIMON_RETURN_NOT_OK(data_in->ReadValue<int32_t>());
        }
        PAIMON_ASSIGN_OR_RAISE(const auto slice_length, data_in->ReadValue<int32_t>());
        if (slice_length < 0) {
            return Status::Invalid(
                fmt::format("Invalid length {} for slice {}", slice_length, i));
        }
        lengths[i - start] = slice_length;
        total_length += slice_length;
    }

    PAIMON_RETURN_NOT_OK(input_stream_->Seek(body_offset_ + ebm_length_ + offset, FS_SEEK_SET));
    const auto bytes = Bytes::AllocateBytes(total_length, pool_.get());
    PAIMON_RETURN_NOT_OK(input_stream_->Read(bytes->data(), total_length));

    int32_t byte_position = 0;
    for (int32_t i = start; i < end; ++i) {
        const int32_t slice_length = lengths[i - start];
        RoaringBitmap32 bitmap;
        PAIMON_RETURN_NOT_OK(bitmap.Deserialize(bytes->data() + byte_position, slice_length));
        bit_slices_[i] = std::move(bitmap);
        byte_position += slice_length;
    }
    initialized_ = true;
    return Status::OK();
}

/// Returns the keys whose stored value equals `code`.
///
/// Starts from the ebm and narrows it slice by slice, from the highest bit to the lowest:
/// where `code` has the bit set only keys present in that slice survive, otherwise keys
/// present in that slice are removed.
Result<RoaringBitmap32> BitSliceIndexBitmap::Eq(const int32_t code) {
    PAIMON_ASSIGN_OR_RAISE(const auto existing_keys, GetEmtpyBitmap());
    RoaringBitmap32 matches(*existing_keys);
    for (auto bit = static_cast<int32_t>(bit_slices_.size()); bit-- > 0;) {
        PAIMON_ASSIGN_OR_RAISE(const auto slice, GetSliceBitmap(bit));
        const bool bit_is_set = ((code >> bit) & 1) == 1;
        if (bit_is_set) {
            matches &= *slice;
        } else {
            matches -= *slice;
        }
    }
    return matches;
}

/// Returns the keys whose stored value is strictly greater than `code`.
///
/// A negative `code` matches every key that has a value. Otherwise the scan starts at the
/// lowest zero bit of `code` (the run of set bits below it cannot decide the comparison) and
/// walks up: a set bit in `code` intersects the running state with the slice, a zero bit
/// unions it. The result is finally restricted to keys that have a value.
Result<RoaringBitmap32> BitSliceIndexBitmap::Gt(const int32_t code) {
    if (code < 0) {
        return IsNotNull({});
    }
    PAIMON_ASSIGN_OR_RAISE(const auto found_set, IsNotNull({}));
    if (found_set.IsEmpty()) {
        return RoaringBitmap32();
    }
    const auto slice_count = static_cast<int32_t>(bit_slices_.size());
    const auto start = NumberOfTrailingZeros(~code);
    if (start >= slice_count) {
        // Every bit covered by a slice is set in `code`, so there is no slice to start from
        // and nothing can match. Return before asking LoadSlices for an empty range.
        return RoaringBitmap32();
    }
    PAIMON_RETURN_NOT_OK(LoadSlices(start, slice_count));
    // The state is seeded with the slice of the lowest zero bit of `code`.
    PAIMON_ASSIGN_OR_RAISE(const auto first_slice, GetSliceBitmap(start));
    RoaringBitmap32 state = *first_slice;
    for (int32_t i = start + 1; i < slice_count; ++i) {
        PAIMON_ASSIGN_OR_RAISE(const auto slice, GetSliceBitmap(i));
        if (((code >> i) & 1) == 1) {
            state &= *slice;
        } else {
            state |= *slice;
        }
    }
    state &= found_set;
    return state;
}

/// Returns the keys whose stored value is greater than or equal to `code`.
///
/// Implemented as `Gt(code - 1)`. For `code <= 0` that call would only take Gt's
/// negative-code branch, so the same result is returned directly; this also avoids the
/// signed overflow of `code - 1` when `code` is the minimum int32.
Result<RoaringBitmap32> BitSliceIndexBitmap::Gte(const int32_t code) {
    if (code <= 0) {
        return IsNotNull({});
    }
    return Gt(code - 1);
}

/// Returns the keys that have a value, optionally restricted to `found_set`.
///
/// @param found_set keys to restrict the result to; an empty bitmap means "no restriction"
///        and yields a copy of the whole ebm.
/// @return the resulting bitmap, or the error raised while lazily loading the ebm.
Result<RoaringBitmap32> BitSliceIndexBitmap::IsNotNull(const RoaringBitmap32& found_set) {
    // Reuse the shared lazy loader instead of repeating the seek/read/deserialize sequence.
    PAIMON_ASSIGN_OR_RAISE(const auto not_null_keys, GetEmtpyBitmap());
    if (found_set.IsEmpty()) {
        return RoaringBitmap32(*not_null_keys);
    }
    return RoaringBitmap32::And(*not_null_keys, found_set);
}

/// Prepares an appender for values in [min, max].
///
/// One slice is allocated per significant bit of `max`, with a minimum of one slice so that
/// `max == 0` still produces a slice.
BitSliceIndexBitmap::Appender::Appender(const std::shared_ptr<MemoryPool>& pool, const int32_t min,
                                        const int32_t max)
    : pool_(pool), min_(min), max_(max) {
    ebm_ = RoaringBitmap32{};
    const int32_t significant_bits = 64 - NumberOfLeadingZeros(max);
    slices_.resize(significant_bits > 1 ? significant_bits : 1);
}

/// Records that `key` holds `value`.
///
/// The key is added to the slice of every bit that is set in `value`, and to the ebm.
///
/// @param key non-negative key (e.g. a row position — presumably; confirm against callers).
/// @param value value to index, must lie in [min, max] given at construction.
/// @return `Status::Invalid` for a negative key, a value outside the range, or a value whose
///         bit pattern needs more slices than were allocated; OK otherwise. Nothing is
///         modified when an error is returned.
Status BitSliceIndexBitmap::Appender::Append(const int32_t key, const int32_t value) {
    if (key < 0) {
        return Status::Invalid(fmt::format("Invalid key: {}", key));
    }
    if (value < min_ || value > max_) {
        return Status::Invalid(fmt::format("value not in range [{}, {}]", min_, max_));
    }
    // Work on the unsigned bit pattern: clearing the lowest set bit with `bits - 1` is then
    // well defined for every input, including the minimum int32.
    auto bits = static_cast<uint32_t>(value);
    // A negative value accepted through a negative `min` has high bits set that the slices,
    // sized from `max`, may not cover; reject it before touching any slice.
    const auto required_slices =
        static_cast<size_t>(64 - NumberOfLeadingZeros(static_cast<int64_t>(bits)));
    if (required_slices > slices_.size()) {
        return Status::Invalid(fmt::format("value {} needs {} bit slices but only {} are available",
                                           value, required_slices, slices_.size()));
    }
    while (bits != 0) {
        slices_[NumberOfTrailingZeros(static_cast<int64_t>(bits))].Add(key);
        bits &= (bits - 1);  // drop the lowest set bit
    }
    ebm_.Add(key);
    return Status::OK();
}

/// Serializes the appended data.
///
/// Output layout, in write order:
///   int32 header size | int8 version | int8 slice count | int32 ebm length |
///   int32 index table length | (int32 offset, int32 length) per slice | ebm bytes |
///   slice bytes, back to back.
/// The header size counts everything between the header-size field and the ebm bytes. Slice
/// offsets are relative to the first slice.
Result<PAIMON_UNIQUE_PTR<Bytes>> BitSliceIndexBitmap::Appender::Serialize() const {
    // One (offset, length) pair of int32 per slice.
    const auto index_table_bytes = static_cast<int32_t>(2 * sizeof(int32_t) * slices_.size());
    const auto ebm_payload = ebm_.Serialize(pool_.get());
    const auto ebm_payload_bytes = static_cast<int32_t>(ebm_payload->size());

    // version + slice count + ebm length + index table length + the index table itself
    const int32_t header_bytes =
        static_cast<int32_t>(2 * sizeof(int8_t) + 2 * sizeof(int32_t)) + index_table_bytes;

    const auto out = std::make_unique<MemorySegmentOutputStream>(
        MemorySegmentOutputStream::DEFAULT_SEGMENT_SIZE, pool_);

    // Serialize every slice first; their sizes are needed for the index table.
    std::vector<PAIMON_UNIQUE_PTR<Bytes>> slice_payloads;
    for (const auto& slice : slices_) {
        slice_payloads.emplace_back(slice.Serialize(pool_.get()));
    }

    out->WriteValue<int32_t>(header_bytes);
    out->WriteValue<int8_t>(CURRENT_VERSION);
    out->WriteValue<int8_t>(static_cast<int8_t>(slices_.size()));
    out->WriteValue<int32_t>(ebm_payload_bytes);
    out->WriteValue<int32_t>(index_table_bytes);

    // Index table: running offset of each slice followed by its length.
    int32_t running_offset = 0;
    for (const auto& payload : slice_payloads) {
        const auto payload_bytes = static_cast<int32_t>(payload->size());
        out->WriteValue<int32_t>(running_offset);
        out->WriteValue<int32_t>(payload_bytes);
        running_offset += payload_bytes;
    }

    // Body: ebm first, then the slices in index order.
    out->Write(ebm_payload->data(), ebm_payload_bytes);
    for (const auto& payload : slice_payloads) {
        out->Write(payload->data(), payload->size());
    }

    return MemorySegmentUtils::CopyToBytes(out->Segments(), 0,
                                           static_cast<int32_t>(out->CurrentSize()), pool_.get());
}
} // namespace paimon
Loading