Skip to content

Commit d3bb3a9

Browse files
Eyizohalxy-9602lucasfang
authored
feat: Add global config API and optimize Parquet read thread conf (#68)
* feat: Add global config API and optimize Parquet read thread configuration - Add global_config.h/cpp for managing global configurations - Change Parquet read thread configuration from thread count to boolean switch - Update related test cases to adapt to the new configuration * fix review --------- Co-authored-by: lxy <38709059+lxy-9602@users.noreply.github.com> Co-authored-by: Yonghao Fang <yonghao.fyh@alibaba-inc.com>
1 parent 775bc8c commit d3bb3a9

7 files changed

Lines changed: 91 additions & 18 deletions

File tree

include/paimon/api.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
#include "paimon/file_store_commit.h" // IWYU pragma: export
2525
#include "paimon/file_store_write.h" // IWYU pragma: export
2626
#include "paimon/fs/file_system_factory.h" // IWYU pragma: export
27+
#include "paimon/global_config.h" // IWYU pragma: export
2728
#include "paimon/memory/memory_pool.h" // IWYU pragma: export
2829
#include "paimon/predicate/predicate.h" // IWYU pragma: export
2930
#include "paimon/read_context.h" // IWYU pragma: export

include/paimon/global_config.h

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Copyright 2026-present Alibaba Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#pragma once
18+
19+
#include "paimon/result.h"
20+
#include "paimon/visibility.h"
21+
22+
namespace paimon {
23+
24+
/// Get the capacity of the arrow's global thread pool
25+
/// This is a simple wrapper of arrow::GetCpuThreadPoolCapacity()
26+
///
27+
/// Return the number of worker threads in the thread pool to which
28+
/// Arrow dispatches various CPU-bound tasks. This is an ideal number,
29+
/// not necessarily the exact number of threads at a given point in time.
30+
///
31+
/// You can change this number using SetArrowCpuThreadPoolCapacity().
32+
PAIMON_EXPORT int GetArrowCpuThreadPoolCapacity();
33+
34+
/// Set the capacity of the arrow's global thread pool
35+
/// This is a simple wrapper of arrow::SetCpuThreadPoolCapacity()
36+
///
37+
/// Set the number of worker threads in the thread pool to which
38+
/// Arrow dispatches various CPU-bound tasks.
39+
///
40+
/// The current number is returned by GetArrowCpuThreadPoolCapacity().
41+
/// Currently, this capacity will significantly affect the performance
42+
/// of parquet file batch read.
43+
PAIMON_EXPORT Status SetArrowCpuThreadPoolCapacity(int threads);
44+
45+
} // namespace paimon

src/paimon/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ set(PAIMON_COMMON_SRCS
4646
common/fs/file_system.cpp
4747
common/fs/resolving_file_system.cpp
4848
common/fs/file_system_factory.cpp
49+
common/global_config.cpp
4950
common/global_index/complete_index_score_batch_reader.cpp
5051
common/global_index/bitmap_vector_search_global_index_result.cpp
5152
common/global_index/bitmap_global_index_result.cpp
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Copyright 2026-present Alibaba Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#include "paimon/global_config.h"
18+
19+
#include "arrow/util/thread_pool.h"
20+
#include "paimon/common/utils/arrow/status_utils.h"
21+
22+
namespace paimon {
23+
PAIMON_EXPORT int GetArrowCpuThreadPoolCapacity() {
24+
return arrow::GetCpuThreadPoolCapacity();
25+
}
26+
27+
PAIMON_EXPORT Status SetArrowCpuThreadPoolCapacity(int threads) {
28+
PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::SetCpuThreadPoolCapacity(threads));
29+
return Status::OK();
30+
}
31+
} // namespace paimon

src/paimon/format/parquet/parquet_file_batch_reader.cpp

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
#include "arrow/record_batch.h"
3434
#include "arrow/type.h"
3535
#include "arrow/util/range.h"
36-
#include "arrow/util/thread_pool.h"
3736
#include "fmt/format.h"
3837
#include "paimon/common/metrics/metrics_impl.h"
3938
#include "paimon/common/utils/arrow/status_utils.h"
@@ -265,21 +264,15 @@ Result<::parquet::ReaderProperties> ParquetFileBatchReader::CreateReaderProperti
265264
Result<::parquet::ArrowReaderProperties> ParquetFileBatchReader::CreateArrowReaderProperties(
266265
const std::shared_ptr<arrow::MemoryPool>& pool,
267266
const std::map<std::string, std::string>& options, int32_t batch_size) {
268-
PAIMON_ASSIGN_OR_RAISE(
269-
uint32_t executor_thread_count,
270-
OptionsUtils::GetValueFromMap<uint32_t>(options, PARQUET_READ_EXECUTOR_THREAD_COUNT,
271-
DEFAULT_PARQUET_READ_EXECUTOR_THREAD_COUNT));
267+
PAIMON_ASSIGN_OR_RAISE(bool use_threads,
268+
OptionsUtils::GetValueFromMap<bool>(options, PARQUET_READ_USE_THREADS,
269+
DEFAULT_PARQUET_READ_USE_THREADS));
272270

273271
::parquet::ArrowReaderProperties arrow_reader_props;
274272
// TODO(jinli.zjw): set more ArrowReaderProperties (compare with java)
275273
arrow_reader_props.set_pre_buffer(true);
276274
arrow_reader_props.set_batch_size(static_cast<int64_t>(batch_size));
277-
if (executor_thread_count != 0) {
278-
PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::SetCpuThreadPoolCapacity(executor_thread_count));
279-
arrow_reader_props.set_use_threads(true);
280-
} else {
281-
arrow_reader_props.set_use_threads(false);
282-
}
275+
arrow_reader_props.set_use_threads(use_threads);
283276
PAIMON_ASSIGN_OR_RAISE(bool cache_lazy, OptionsUtils::GetValueFromMap<bool>(
284277
options, PARQUET_READ_CACHE_OPTION_LAZY, false));
285278
PAIMON_ASSIGN_OR_RAISE(

src/paimon/format/parquet/parquet_file_batch_reader_test.cpp

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
#include "paimon/format/parquet/parquet_input_stream_impl.h"
4242
#include "paimon/fs/file_system.h"
4343
#include "paimon/fs/local/local_file_system.h"
44+
#include "paimon/global_config.h"
4445
#include "paimon/memory/memory_pool.h"
4546
#include "paimon/predicate/literal.h"
4647
#include "paimon/predicate/predicate_builder.h"
@@ -394,25 +395,27 @@ TEST_F(ParquetFileBatchReaderTest, TestCreateArrowReaderProperties) {
394395
ASSERT_EQ(arrow_reader_properties.pre_buffer(), true);
395396
ASSERT_EQ(arrow_reader_properties.batch_size(), 1024);
396397
ASSERT_EQ(arrow_reader_properties.use_threads(), true);
397-
ASSERT_EQ(arrow::GetCpuThreadPoolCapacity(), 3);
398398
ASSERT_EQ(arrow_reader_properties.cache_options(), arrow::io::CacheOptions::Defaults());
399399
}
400400
{
401-
std::map<std::string, std::string> options = {{PARQUET_READ_EXECUTOR_THREAD_COUNT, "0"}};
401+
std::map<std::string, std::string> options = {{PARQUET_READ_USE_THREADS, "false"}};
402402
int32_t batch_size = 1024;
403403
ASSERT_OK_AND_ASSIGN(
404404
auto arrow_reader_properties,
405405
ParquetFileBatchReader::CreateArrowReaderProperties(pool_, options, batch_size));
406406
ASSERT_EQ(arrow_reader_properties.use_threads(), false);
407407
}
408408
{
409-
std::map<std::string, std::string> options = {{PARQUET_READ_EXECUTOR_THREAD_COUNT, "6"}};
409+
int original_capacity = GetArrowCpuThreadPoolCapacity();
410+
ASSERT_OK(SetArrowCpuThreadPoolCapacity(6));
411+
std::map<std::string, std::string> options = {{PARQUET_READ_USE_THREADS, "true"}};
410412
int32_t batch_size = 1024;
411413
ASSERT_OK_AND_ASSIGN(
412414
auto arrow_reader_properties,
413415
ParquetFileBatchReader::CreateArrowReaderProperties(pool_, options, batch_size));
414416
ASSERT_EQ(arrow_reader_properties.use_threads(), true);
415-
ASSERT_EQ(arrow::GetCpuThreadPoolCapacity(), 6);
417+
ASSERT_EQ(GetArrowCpuThreadPoolCapacity(), 6);
418+
ASSERT_OK(SetArrowCpuThreadPoolCapacity(original_capacity));
416419
}
417420
}
418421

src/paimon/format/parquet/parquet_format_defs.h

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,8 @@ static inline const char PARQUET_COMPRESSION_CODEC_ZLIB_LEVEL[] = "zlib.compress
3636
static inline const char PARQUET_COMPRESSION_CODEC_BROTLI_LEVEL[] = "compression.brotli.quality";
3737

3838
// read
39-
static inline const char PARQUET_READ_EXECUTOR_THREAD_COUNT[] =
40-
"parquet.read.executor.thread-count";
41-
static constexpr uint32_t DEFAULT_PARQUET_READ_EXECUTOR_THREAD_COUNT = 3;
39+
static inline const char PARQUET_READ_USE_THREADS[] = "parquet.read.use-threads";
40+
static inline const bool DEFAULT_PARQUET_READ_USE_THREADS = true;
4241
static inline const char PARQUET_READ_CACHE_OPTION_LAZY[] = "parquet.read.cache-option.lazy";
4342
static inline const char PARQUET_READ_CACHE_OPTION_PREFETCH_LIMIT[] =
4443
"parquet.read.cache-option.prefetch-limit";

0 commit comments

Comments
 (0)