diff --git a/be/src/cloud/cloud_internal_service.cpp b/be/src/cloud/cloud_internal_service.cpp index 99b08a9bf38564..5f336573894ee9 100644 --- a/be/src/cloud/cloud_internal_service.cpp +++ b/be/src/cloud/cloud_internal_service.cpp @@ -579,11 +579,13 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c .expiration_time = expiration_time, .is_dryrun = config::enable_reader_dryrun_when_download_file_cache, .is_warmup = true}, - .download_done = [=, version = rs_meta.version()](Status st) { - handle_segment_download_done(st, tablet_id, rowset_id, segment_id, - tablet, wait, version, segment_size, - request_ts, handle_ts); - }}; + .download_done = + [=, version = rs_meta.version()](Status st) { + handle_segment_download_done( + st, tablet_id, rowset_id, segment_id, tablet, wait, + version, segment_size, request_ts, handle_ts); + }, + .tablet_id = tablet_id}; g_file_cache_event_driven_warm_up_submitted_segment_num << 1; g_file_cache_event_driven_warm_up_submitted_segment_size << segment_size; @@ -604,11 +606,13 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c .expiration_time = expiration_time, .is_dryrun = config::enable_reader_dryrun_when_download_file_cache, .is_warmup = true}, - .download_done = [=, version = rs_meta.version()](Status st) { - handle_inverted_index_download_done( - st, tablet_id, rowset_id, segment_id, index_path, tablet, wait, - version, idx_size, request_ts, handle_ts); - }}; + .download_done = + [=, version = rs_meta.version()](Status st) { + handle_inverted_index_download_done( + st, tablet_id, rowset_id, segment_id, index_path, + tablet, wait, version, idx_size, request_ts, handle_ts); + }, + .tablet_id = tablet_id}; g_file_cache_event_driven_warm_up_submitted_index_num << 1; g_file_cache_event_driven_warm_up_submitted_index_size << idx_size; tablet->update_rowset_warmup_state_inverted_idx_num( diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp index 2567019301cff5..f7be421e442867 100644 --- a/be/src/cloud/cloud_tablet.cpp +++ b/be/src/cloud/cloud_tablet.cpp @@ -1718,6 +1718,7 @@ void CloudTablet::_submit_segment_download_task(const RowsetSharedPtr& rs, LOG_WARNING("add rowset warm up error ").error(st); } }}, + .tablet_id = _tablet_meta->tablet_id(), }); // clang-format on } @@ -1760,6 +1761,7 @@ void CloudTablet::_submit_inverted_index_download_task(const RowsetSharedPtr& rs LOG_WARNING("add rowset warm up error ").error(st); } }}, + .tablet_id = _tablet_meta->tablet_id(), }; self->update_rowset_warmup_state_inverted_idx_num_unlocked(WarmUpTriggerSource::SYNC_ROWSET, rowset_meta->rowset_id(), 1); _engine.file_cache_block_downloader().submit_download_task(std::move(meta)); diff --git a/be/src/cloud/cloud_warm_up_manager.cpp b/be/src/cloud/cloud_warm_up_manager.cpp index 9df39e8c305b67..6383850d48749d 100644 --- a/be/src/cloud/cloud_warm_up_manager.cpp +++ b/be/src/cloud/cloud_warm_up_manager.cpp @@ -129,7 +129,8 @@ void CloudWarmUpManager::submit_download_tasks(io::Path path, int64_t file_size, io::FileSystemSPtr file_system, int64_t expiration_time, std::shared_ptr wait, - bool is_index, std::function done_cb) { + bool is_index, std::function done_cb, + int64_t tablet_id) { VLOG_DEBUG << "submit warm up task for file: " << path << ", file_size: " << file_size << ", expiration_time: " << expiration_time << ", is_index: " << (is_index ? "true" : "false"); @@ -184,6 +185,7 @@ void CloudWarmUpManager::submit_download_tasks(io::Path path, int64_t file_size, } wait->signal(); }, + .tablet_id = tablet_id, }); offset += current_chunk_size; @@ -256,7 +258,8 @@ void CloudWarmUpManager::handle_jobs() { submit_download_tasks( storage_resource.value()->remote_segment_path(*rs, seg_id), rs->segment_file_size(cast_set(seg_id)), rs->fs(), - expiration_time, wait, false, [tablet, rs, seg_id](Status st) { + expiration_time, wait, false, + [tablet, rs, seg_id](Status st) { VLOG_DEBUG << "warmup rowset " << rs->version() << " segment " << seg_id << " completed"; if (tablet->complete_rowset_segment_warmup( @@ -266,7 +269,8 @@ void CloudWarmUpManager::handle_jobs() { VLOG_DEBUG << "warmup rowset " << rs->version() << " completed"; } - }); + }, + tablet_id); } // 2nd. download inverted index files @@ -313,7 +317,8 @@ void CloudWarmUpManager::handle_jobs() { VLOG_DEBUG << "warmup rowset " << rs->version() << " completed"; } - }); + }, + tablet_id); } } else { if (schema_ptr->has_inverted_index() || schema_ptr->has_ann_index()) { @@ -336,7 +341,8 @@ void CloudWarmUpManager::handle_jobs() { VLOG_DEBUG << "warmup rowset " << rs->version() << " completed"; } - }); + }, + tablet_id); } } } diff --git a/be/src/cloud/cloud_warm_up_manager.h b/be/src/cloud/cloud_warm_up_manager.h index 47411f657b899a..992702f162e0a1 100644 --- a/be/src/cloud/cloud_warm_up_manager.h +++ b/be/src/cloud/cloud_warm_up_manager.h @@ -114,7 +114,8 @@ class CloudWarmUpManager { void submit_download_tasks(io::Path path, int64_t file_size, io::FileSystemSPtr file_system, int64_t expiration_time, std::shared_ptr wait, bool is_index = false, - std::function done_cb = nullptr); + std::function done_cb = nullptr, + int64_t tablet_id = -1); std::mutex _mtx; std::condition_variable _cond; int64_t _cur_job_id {0}; diff --git a/be/src/io/cache/block_file_cache_downloader.cpp b/be/src/io/cache/block_file_cache_downloader.cpp index ae5fff244da375..1c4ca8577f64f7 100644 --- a/be/src/io/cache/block_file_cache_downloader.cpp +++ b/be/src/io/cache/block_file_cache_downloader.cpp @@ -286,6 +286,7 @@ void FileCacheBlockDownloader::download_file_cache_block( .is_warmup = true, }, .download_done = std::move(download_done), + .tablet_id = meta.tablet_id(), }; download_segment_file(download_meta); }); @@ -300,6 +301,7 @@ void FileCacheBlockDownloader::download_segment_file(const DownloadFileMeta& met .is_doris_table = true, .cache_base_path {}, .file_size = meta.file_size, + .tablet_id = meta.tablet_id, }; auto st = meta.file_system->open_file(meta.path, &file_reader, &opts); if (!st.ok()) { diff --git a/be/src/io/cache/block_file_cache_downloader.h b/be/src/io/cache/block_file_cache_downloader.h index c9a4689167363f..29d55a4c6ccd34 100644 --- a/be/src/io/cache/block_file_cache_downloader.h +++ b/be/src/io/cache/block_file_cache_downloader.h @@ -44,6 +44,7 @@ struct DownloadFileMeta { io::FileSystemSPtr file_system; IOContext ctx; std::function download_done; + int64_t tablet_id {-1}; }; struct DownloadTask { diff --git a/be/src/io/cache/cached_remote_file_reader.cpp b/be/src/io/cache/cached_remote_file_reader.cpp index 083d4a5e3f5d05..ee5f4ceadbd79d 100644 --- a/be/src/io/cache/cached_remote_file_reader.cpp +++ b/be/src/io/cache/cached_remote_file_reader.cpp @@ -55,7 +55,6 @@ #include "runtime/thread_context.h" #include "runtime/workload_management/io_throttle.h" #include "service/backend_options.h" -#include "storage/storage_policy.h" #include "util/bit_util.h" #include "util/brpc_client_cache.h" // BrpcClientCache #include "util/client_cache.h" @@ -93,7 +92,8 @@ bvar::Adder g_failed_get_peer_addr_counter( CachedRemoteFileReader::CachedRemoteFileReader(FileReaderSPtr remote_file_reader, const FileReaderOptions& opts) - : _remote_file_reader(std::move(remote_file_reader)) { + : _tablet_id(opts.tablet_id), _remote_file_reader(std::move(remote_file_reader)) { + DCHECK(!opts.is_doris_table || _tablet_id > 0); _is_doris_table = opts.is_doris_table; if (_is_doris_table) { _cache_hash = BlockFileCache::hash(path().filename().native()); @@ -159,28 +159,30 @@ std::pair CachedRemoteFileReader::s_align_size(size_t offset, si } namespace { -std::optional extract_tablet_id(const std::string& file_path) { - return StorageResource::parse_tablet_id_from_path(file_path); +// Execute S3 read +Status execute_s3_read(size_t empty_start, size_t& size, std::unique_ptr& buffer, + ReadStatistics& stats, const IOContext* io_ctx, + FileReaderSPtr remote_file_reader) { + s3_read_counter << 1; + SCOPED_RAW_TIMER(&stats.remote_read_timer); + stats.from_peer_cache = false; + return remote_file_reader->read_at(empty_start, Slice(buffer.get(), size), &size, io_ctx); } // Get peer connection info from tablet_id -std::pair get_peer_connection_info(const std::string& file_path) { +std::pair get_peer_connection_info(int64_t tablet_id, + const std::string& file_path) { std::string host = ""; int port = 0; - // Try to get tablet_id from actual path and lookup tablet info - if (auto tablet_id = extract_tablet_id(file_path)) { - auto& manager = ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager(); - if (auto tablet_info = manager.get_balanced_tablet_info(*tablet_id)) { - host = tablet_info->first; - port = tablet_info->second; - } else { - VLOG_DEBUG << "get peer connection info not found" - << ", tablet_id=" << *tablet_id << ", file_path=" << file_path; - } + DCHECK(tablet_id > 0); + auto& manager = ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager(); + if (auto tablet_info = manager.get_balanced_tablet_info(tablet_id)) { + host = tablet_info->first; + port = tablet_info->second; } else { - VLOG_DEBUG << "parse tablet id from path failed" - << "tablet_id=null, file_path=" << file_path; + LOG_EVERY_N(WARNING, 100) << "get peer connection info not found" + << ", tablet_id=" << tablet_id << ", file_path=" << file_path; } DBUG_EXECUTE_IF("PeerFileCacheReader::_fetch_from_peer_cache_blocks", { @@ -200,8 +202,8 @@ std::pair get_peer_connection_info(const std::string& file_pat Status execute_peer_read(const std::vector& empty_blocks, size_t empty_start, size_t& size, std::unique_ptr& buffer, const std::string& file_path, size_t file_size, bool is_doris_table, - ReadStatistics& stats, const IOContext* io_ctx) { - auto [host, port] = get_peer_connection_info(file_path); + int64_t tablet_id, ReadStatistics& stats, const IOContext* io_ctx) { + auto [host, port] = get_peer_connection_info(tablet_id, file_path); VLOG_DEBUG << "PeerFileCacheReader read from peer, host=" << host << ", port=" << port << ", file_path=" << file_path; @@ -224,16 +226,6 @@ Status execute_peer_read(const std::vector& empty_blocks, size_t return st; } -// Execute S3 read -Status execute_s3_read(size_t empty_start, size_t& size, std::unique_ptr& buffer, - ReadStatistics& stats, const IOContext* io_ctx, - FileReaderSPtr remote_file_reader) { - s3_read_counter << 1; - SCOPED_RAW_TIMER(&stats.remote_read_timer); - stats.from_peer_cache = false; - return remote_file_reader->read_at(empty_start, Slice(buffer.get(), size), &size, io_ctx); -} - } // anonymous namespace Status CachedRemoteFileReader::_execute_remote_read(const std::vector& empty_blocks, @@ -255,7 +247,7 @@ Status CachedRemoteFileReader::_execute_remote_read(const std::vectorsize(), _is_doris_table, stats, io_ctx); + this->size(), _is_doris_table, _tablet_id, stats, io_ctx); } }); @@ -269,7 +261,7 @@ Status CachedRemoteFileReader::_execute_remote_read(const std::vectorsize(), _is_doris_table, stats, io_ctx); + this->size(), _is_doris_table, _tablet_id, stats, io_ctx); if (!st.ok()) { // Restore original size for S3 fallback, as peer read may have modified it size = original_size; @@ -386,8 +378,7 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t* s_align_size(offset + already_read, bytes_req - already_read, size()); CacheContext cache_context(io_ctx); cache_context.stats = &stats; - auto tablet_id = get_tablet_id(path().string()); - cache_context.tablet_id = tablet_id.value_or(0); + cache_context.tablet_id = _tablet_id; MonotonicStopWatch sw; sw.start(); diff --git a/be/src/io/cache/cached_remote_file_reader.h b/be/src/io/cache/cached_remote_file_reader.h index a0037d42c64c19..3f2e1ceb2e1395 100644 --- a/be/src/io/cache/cached_remote_file_reader.h +++ b/be/src/io/cache/cached_remote_file_reader.h @@ -85,7 +85,8 @@ class CachedRemoteFileReader final : public FileReader, void _update_stats(const ReadStatistics& stats, FileCacheStatistics* state, bool is_inverted_index) const; - bool _is_doris_table; + bool _is_doris_table = false; + int64_t _tablet_id = -1; FileReaderSPtr _remote_file_reader; UInt128Wrapper _cache_hash; BlockFileCache* _cache; diff --git a/be/src/io/tools/file_cache_microbench.cpp b/be/src/io/tools/file_cache_microbench.cpp index 50d0eb70f7b6a0..3725083b8356be 100644 --- a/be/src/io/tools/file_cache_microbench.cpp +++ b/be/src/io/tools/file_cache_microbench.cpp @@ -1496,6 +1496,7 @@ class JobManager { doris::io::FileReaderOptions reader_opts; reader_opts.cache_type = doris::io::FileCachePolicy::FILE_BLOCK_CACHE; reader_opts.is_doris_table = true; + reader_opts.tablet_id = 1; // microbench placeholder doris::io::FileDescription fd; std::string obj_path = "s3://" + doris::config::test_s3_bucket + "/"; diff --git a/be/src/storage/compaction/collection_statistics.cpp b/be/src/storage/compaction/collection_statistics.cpp index 16decc2b15f12b..afd79f0c0f3b5b 100644 --- a/be/src/storage/compaction/collection_statistics.cpp +++ b/be/src/storage/compaction/collection_statistics.cpp @@ -165,7 +165,7 @@ Status CollectionStatistics::process_segment(const RowsetSharedPtr& rowset, int3 rowset_meta->fs(), std::string {InvertedIndexDescriptor::get_index_file_path_prefix(seg_path)}, tablet_schema->get_inverted_index_storage_format(), - rowset_meta->inverted_index_file_info(seg_id)); + rowset_meta->inverted_index_file_info(seg_id), rowset_meta->tablet_id()); RETURN_IF_ERROR(idx_file_reader->init(config::inverted_index_read_buffer_size, io_ctx)); int32_t total_seg_num_docs = 0; diff --git a/be/src/storage/compaction/compaction.cpp b/be/src/storage/compaction/compaction.cpp index 698c81f7849fd0..322530cbb66d36 100644 --- a/be/src/storage/compaction/compaction.cpp +++ b/be/src/storage/compaction/compaction.cpp @@ -819,7 +819,7 @@ Status Compaction::do_inverted_index_compaction() { fs, std::string {InvertedIndexDescriptor::get_index_file_path_prefix(seg_path.value())}, _cur_tablet_schema->get_inverted_index_storage_format(), - rowset->rowset_meta()->inverted_index_file_info(seg_id)); + rowset->rowset_meta()->inverted_index_file_info(seg_id), _tablet->tablet_id()); auto st = index_file_reader->init(config::inverted_index_read_buffer_size); DBUG_EXECUTE_IF("Compaction::do_inverted_index_compaction_init_inverted_index_file_reader", { @@ -1020,7 +1020,7 @@ static bool check_rowset_has_inverted_index(const RowsetSharedPtr& src_rs, int32 std::string {InvertedIndexDescriptor::get_index_file_path_prefix( seg_path.value())}, cur_tablet_schema->get_inverted_index_storage_format(), - rowset->rowset_meta()->inverted_index_file_info(i)); + rowset->rowset_meta()->inverted_index_file_info(i), tablet->tablet_id()); auto st = index_file_reader->init(config::inverted_index_read_buffer_size); index_file_path = index_file_reader->get_index_file_path(index_meta); DBUG_EXECUTE_IF( diff --git a/be/src/storage/index/index_file_reader.cpp b/be/src/storage/index/index_file_reader.cpp index e4ed32fdc30669..348e1399421e5a 100644 --- a/be/src/storage/index/index_file_reader.cpp +++ b/be/src/storage/index/index_file_reader.cpp @@ -63,8 +63,9 @@ Status IndexFileReader::_init_from(int32_t read_buffer_size, const io::IOContext DCHECK(_fs != nullptr) << "file system is nullptr, index_file_full_path: " << index_file_full_path; // 2. open file - auto ok = DorisFSDirectory::FSIndexInput::open( - _fs, index_file_full_path.c_str(), index_input, err, read_buffer_size, file_size); + auto ok = + DorisFSDirectory::FSIndexInput::open(_fs, index_file_full_path.c_str(), index_input, + err, read_buffer_size, file_size, _tablet_id); if (!ok) { if (err.number() == CL_ERR_FileNotFound) { return Status::Error( @@ -182,8 +183,9 @@ Result> IndexFileReader:: DCHECK(_fs != nullptr) << "file system is nullptr, index_file_path: " << index_file_path; // 2. open file - auto ok = DorisFSDirectory::FSIndexInput::open( - _fs, index_file_path.c_str(), index_input, err, _read_buffer_size, file_size); + auto ok = DorisFSDirectory::FSIndexInput::open(_fs, index_file_path.c_str(), + index_input, err, _read_buffer_size, + file_size, _tablet_id); if (!ok) { // now index_input = nullptr if (err.number() == CL_ERR_FileNotFound) { diff --git a/be/src/storage/index/index_file_reader.h b/be/src/storage/index/index_file_reader.h index 8a185f6b8b7577..72617446580b49 100644 --- a/be/src/storage/index/index_file_reader.h +++ b/be/src/storage/index/index_file_reader.h @@ -52,11 +52,13 @@ class IndexFileReader { IndexFileReader(io::FileSystemSPtr fs, std::string index_path_prefix, InvertedIndexStorageFormatPB storage_format, - InvertedIndexFileInfo idx_file_info = InvertedIndexFileInfo()) + InvertedIndexFileInfo idx_file_info = InvertedIndexFileInfo(), + int64_t tablet_id = -1) : _fs(std::move(fs)), _index_path_prefix(std::move(index_path_prefix)), _storage_format(storage_format), - _idx_file_info(idx_file_info) {} + _idx_file_info(idx_file_info), + _tablet_id(tablet_id) {} virtual ~IndexFileReader() = default; MOCK_FUNCTION Status init(int32_t read_buffer_size = config::inverted_index_read_buffer_size, @@ -90,6 +92,7 @@ class IndexFileReader { mutable std::shared_mutex _mutex; // Use mutable for const read operations bool _inited = false; InvertedIndexFileInfo _idx_file_info; + int64_t _tablet_id = -1; }; } // namespace segment_v2 diff --git a/be/src/storage/index/index_file_writer.cpp b/be/src/storage/index/index_file_writer.cpp index 9acfadf417246c..2d9c40144187de 100644 --- a/be/src/storage/index/index_file_writer.cpp +++ b/be/src/storage/index/index_file_writer.cpp @@ -41,7 +41,8 @@ namespace doris::segment_v2 { IndexFileWriter::IndexFileWriter(io::FileSystemSPtr fs, std::string index_path_prefix, std::string rowset_id, int64_t seg_id, InvertedIndexStorageFormatPB storage_format, - io::FileWriterPtr file_writer, bool can_use_ram_dir) + io::FileWriterPtr file_writer, bool can_use_ram_dir, + int64_t tablet_id) : _fs(std::move(fs)), _index_path_prefix(std::move(index_path_prefix)), _rowset_id(std::move(rowset_id)), @@ -49,7 +50,8 @@ IndexFileWriter::IndexFileWriter(io::FileSystemSPtr fs, std::string index_path_p _storage_format(storage_format), _local_fs(io::global_local_filesystem()), _idx_v2_writer(std::move(file_writer)), - _can_use_ram_dir(can_use_ram_dir) { + _can_use_ram_dir(can_use_ram_dir), + _tablet_id(tablet_id) { auto tmp_file_dir = ExecEnv::GetInstance()->get_tmp_file_dirs()->get_tmp_file_dir(); _tmp_dir = tmp_file_dir.native(); if (_storage_format == InvertedIndexStorageFormatPB::V1) { @@ -121,8 +123,8 @@ Status IndexFileWriter::delete_index(const TabletIndex* index_meta) { } Status IndexFileWriter::add_into_searcher_cache() { - auto index_file_reader = - std::make_unique(_fs, _index_path_prefix, _storage_format); + auto index_file_reader = std::make_unique( + _fs, _index_path_prefix, _storage_format, InvertedIndexFileInfo(), _tablet_id); auto st = index_file_reader->init(); if (!st.ok()) { if (dynamic_cast(_idx_v2_writer.get()) != nullptr) { diff --git a/be/src/storage/index/index_file_writer.h b/be/src/storage/index/index_file_writer.h index d968dc5fa12a7e..a303de8b68c156 100644 --- a/be/src/storage/index/index_file_writer.h +++ b/be/src/storage/index/index_file_writer.h @@ -50,7 +50,8 @@ class IndexFileWriter { public: IndexFileWriter(io::FileSystemSPtr fs, std::string index_path_prefix, std::string rowset_id, int64_t seg_id, InvertedIndexStorageFormatPB storage_format, - io::FileWriterPtr file_writer = nullptr, bool can_use_ram_dir = true); + io::FileWriterPtr file_writer = nullptr, bool can_use_ram_dir = true, + int64_t tablet_id = -1); virtual ~IndexFileWriter() = default; MOCK_FUNCTION Result> open(const TabletIndex* index_meta); @@ -111,6 +112,7 @@ class IndexFileWriter { bool _can_use_ram_dir = true; IndexStorageFormatPtr _index_storage_format; + int64_t _tablet_id = -1; friend class IndexStorageFormatV1; friend class IndexStorageFormatV2; diff --git a/be/src/storage/index/inverted/inverted_index_fs_directory.cpp b/be/src/storage/index/inverted/inverted_index_fs_directory.cpp index 6b2c4a80b05575..e65025b25a4fc7 100644 --- a/be/src/storage/index/inverted/inverted_index_fs_directory.cpp +++ b/be/src/storage/index/inverted/inverted_index_fs_directory.cpp @@ -87,7 +87,8 @@ const char* const DorisFSDirectory::WRITE_LOCK_FILE = "write.lock"; bool DorisFSDirectory::FSIndexInput::open(const io::FileSystemSPtr& fs, const char* path, IndexInput*& ret, CLuceneError& error, - int32_t buffer_size, int64_t file_size) { + int32_t buffer_size, int64_t file_size, + int64_t tablet_id) { CND_PRECONDITION(path != nullptr, "path is NULL"); if (buffer_size == -1) { @@ -100,6 +101,7 @@ bool DorisFSDirectory::FSIndexInput::open(const io::FileSystemSPtr& fs, const ch : io::FileCachePolicy::NO_CACHE; reader_options.is_doris_table = true; reader_options.file_size = file_size; + reader_options.tablet_id = tablet_id; Status st = fs->open_file(path, &h->_reader, &reader_options); DBUG_EXECUTE_IF("inverted file read error: index file not found", { st = Status::Error("index file not found"); }) diff --git a/be/src/storage/index/inverted/inverted_index_fs_directory.h b/be/src/storage/index/inverted/inverted_index_fs_directory.h index 7b1ac0bdf550d9..79854df88d235d 100644 --- a/be/src/storage/index/inverted/inverted_index_fs_directory.h +++ b/be/src/storage/index/inverted/inverted_index_fs_directory.h @@ -190,7 +190,8 @@ class DorisFSDirectory::FSIndexInput : public lucene::store::BufferedIndexInput public: static bool open(const io::FileSystemSPtr& fs, const char* path, IndexInput*& ret, - CLuceneError& error, int32_t bufferSize = -1, int64_t file_size = -1); + CLuceneError& error, int32_t bufferSize = -1, int64_t file_size = -1, + int64_t tablet_id = -1); ~FSIndexInput() override; IndexInput* clone() const override; diff --git a/be/src/storage/rowset/beta_rowset.cpp b/be/src/storage/rowset/beta_rowset.cpp index e94430731b17da..96fb7a36889c95 100644 --- a/be/src/storage/rowset/beta_rowset.cpp +++ b/be/src/storage/rowset/beta_rowset.cpp @@ -633,6 +633,7 @@ Status BetaRowset::check_current_rowset_segment() { .is_doris_table = true, .cache_base_path {}, .file_size = _rowset_meta->segment_file_size(seg_id), + .tablet_id = _rowset_meta->tablet_id(), }; auto s = segment_v2::Segment::open(fs, seg_path, _rowset_meta->tablet_id(), seg_id, @@ -843,7 +844,8 @@ Status BetaRowset::show_nested_index_file(rapidjson::Value* rowset_value, auto seg_path = DORIS_TRY(segment_path(seg_id)); auto index_file_path_prefix = InvertedIndexDescriptor::get_index_file_path_prefix(seg_path); auto index_file_reader = std::make_unique( - fs, std::string(index_file_path_prefix), storage_format); + fs, std::string(index_file_path_prefix), storage_format, InvertedIndexFileInfo(), + _rowset_meta->tablet_id()); RETURN_IF_ERROR(index_file_reader->init()); auto dirs = index_file_reader->get_all_directories(); diff --git a/be/src/storage/rowset/beta_rowset_writer.cpp b/be/src/storage/rowset/beta_rowset_writer.cpp index 25216e3de9c735..94bd38a16ef7f2 100644 --- a/be/src/storage/rowset/beta_rowset_writer.cpp +++ b/be/src/storage/rowset/beta_rowset_writer.cpp @@ -442,6 +442,7 @@ Status BetaRowsetWriter::_load_noncompacted_segment(segment_v2::SegmentSharedPtr : io::FileCachePolicy::NO_CACHE, .is_doris_table = true, .cache_base_path {}, + .tablet_id = _rowset_meta->tablet_id(), }; auto s = segment_v2::Segment::open(fs, path, _rowset_meta->tablet_id(), segment_id, rowset_id(), _context.tablet_schema, reader_options, &segment); @@ -1107,7 +1108,7 @@ Status BetaRowsetWriter::create_segment_writer_for_segcompaction( index_file_writer = std::make_unique( _context.fs(), prefix, _context.rowset_id.to_string(), _num_segcompacted, _context.tablet_schema->get_inverted_index_storage_format(), - std::move(idx_file_writer)); + std::move(idx_file_writer), true /* can_use_ram_dir */, _context.tablet_id); } segment_v2::SegmentWriterOptions writer_options; diff --git a/be/src/storage/rowset/rowset_writer.h b/be/src/storage/rowset/rowset_writer.h index 693ebfe4169757..75c0bff084bd8c 100644 --- a/be/src/storage/rowset/rowset_writer.h +++ b/be/src/storage/rowset/rowset_writer.h @@ -114,7 +114,7 @@ class RowsetWriter { *index_file_writer = std::make_unique( _context.fs(), segment_prefix, _context.rowset_id.to_string(), segment_id, _context.tablet_schema->get_inverted_index_storage_format(), - std::move(idx_file_v2_ptr), can_use_ram_dir); + std::move(idx_file_v2_ptr), can_use_ram_dir, _context.tablet_id); return Status::OK(); } diff --git a/be/src/storage/segment/segment.cpp b/be/src/storage/segment/segment.cpp index fdeeb3e752c26e..8f768ae4f89996 100644 --- a/be/src/storage/segment/segment.cpp +++ b/be/src/storage/segment/segment.cpp @@ -87,8 +87,15 @@ Status Segment::open(io::FileSystemSPtr fs, const std::string& path, int64_t tab uint32_t segment_id, RowsetId rowset_id, TabletSchemaSPtr tablet_schema, const io::FileReaderOptions& reader_options, std::shared_ptr* output, InvertedIndexFileInfo idx_file_info, OlapReaderStatistics* stats) { - auto s = _open(fs, path, segment_id, rowset_id, tablet_schema, reader_options, output, + // Ensure tablet_id is available in reader_options for CachedRemoteFileReader peer read. + io::FileReaderOptions opts_with_tablet = reader_options; + opts_with_tablet.tablet_id = tablet_id; + + auto s = _open(fs, path, segment_id, rowset_id, tablet_schema, opts_with_tablet, output, idx_file_info, stats); + if (s.ok() && output && *output) { + (*output)->_tablet_id = tablet_id; + } if (!s.ok()) { if (!config::is_cloud_mode()) { auto res = ExecEnv::get_tablet(tablet_id); @@ -235,7 +242,7 @@ Status Segment::_open_index_file_reader() { _fs, std::string {InvertedIndexDescriptor::get_index_file_path_prefix( _file_reader->path().native())}, - _tablet_schema->get_inverted_index_storage_format(), _idx_file_info); + _tablet_schema->get_inverted_index_storage_format(), _idx_file_info, _tablet_id); return Status::OK(); } diff --git a/be/src/storage/segment/segment.h b/be/src/storage/segment/segment.h index 4e434680140471..f6e8dc6db52107 100644 --- a/be/src/storage/segment/segment.h +++ b/be/src/storage/segment/segment.h @@ -300,6 +300,7 @@ class Segment : public std::enable_shared_from_this, public MetadataAdd DorisCallOnce _index_file_reader_open; InvertedIndexFileInfo _idx_file_info; + int64_t _tablet_id = -1; int _be_exec_version = BeExecVersionManager::get_newest_version(); }; diff --git a/be/src/storage/task/index_builder.cpp b/be/src/storage/task/index_builder.cpp index b7e2db3b4b68f6..93c70bf0cdb951 100644 --- a/be/src/storage/task/index_builder.cpp +++ b/be/src/storage/task/index_builder.cpp @@ -279,7 +279,8 @@ Status IndexBuilder::update_inverted_index_info() { auto idx_file_reader = std::make_unique( context.fs(), std::string {InvertedIndexDescriptor::get_index_file_path_prefix(seg_path)}, - output_rs_tablet_schema->get_inverted_index_storage_format()); + output_rs_tablet_schema->get_inverted_index_storage_format(), + InvertedIndexFileInfo(), _tablet->tablet_id()); auto st = idx_file_reader->init(); DBUG_EXECUTE_IF( "IndexBuilder::update_inverted_index_info_index_file_reader_init_not_ok", { @@ -372,7 +373,7 @@ Status IndexBuilder::handle_single_rowset(RowsetMetaSharedPtr output_rowset_meta fs, std::move(index_path_prefix), output_rowset_meta->rowset_id().to_string(), seg_ptr->id(), output_rowset_schema->get_inverted_index_storage_format(), - std::move(file_writer)); + std::move(file_writer), true /* can_use_ram_dir */, _tablet->tablet_id()); RETURN_IF_ERROR(index_file_writer->initialize(dirs)); // create inverted index writer for (auto& index_meta : _dropped_inverted_indexes) { @@ -443,12 +444,13 @@ Status IndexBuilder::handle_single_rowset(RowsetMetaSharedPtr output_rowset_meta index_file_writer = std::make_unique( fs, index_path_prefix, output_rowset_meta->rowset_id().to_string(), seg_ptr->id(), output_rowset_schema->get_inverted_index_storage_format(), - std::move(file_writer)); + std::move(file_writer), true /* can_use_ram_dir */, _tablet->tablet_id()); RETURN_IF_ERROR(index_file_writer->initialize(dirs)); } else { index_file_writer = std::make_unique( fs, index_path_prefix, output_rowset_meta->rowset_id().to_string(), - seg_ptr->id(), output_rowset_schema->get_inverted_index_storage_format()); + seg_ptr->id(), output_rowset_schema->get_inverted_index_storage_format(), + nullptr, true /* can_use_ram_dir */, _tablet->tablet_id()); } // create inverted index writer, or ann index writer for (auto inverted_index : _alter_inverted_indexes) { diff --git a/be/test/io/cache/block_file_cache_test.cpp b/be/test/io/cache/block_file_cache_test.cpp index c2fe61d5d36006..cf417b4f3478f7 100644 --- a/be/test/io/cache/block_file_cache_test.cpp +++ b/be/test/io/cache/block_file_cache_test.cpp @@ -3334,6 +3334,7 @@ TEST_F(BlockFileCacheTest, cached_remote_file_reader) { io::FileReaderOptions opts; opts.cache_type = io::cache_type_from_string("file_block_cache"); opts.is_doris_table = true; + opts.tablet_id = 10086; CachedRemoteFileReader reader(local_reader, opts); auto key = io::BlockFileCache::hash("tmp_file"); EXPECT_EQ(reader._cache_hash, key); @@ -3447,6 +3448,7 @@ TEST_F(BlockFileCacheTest, cached_remote_file_reader_tail) { io::FileReaderOptions opts; opts.cache_type = io::cache_type_from_string("file_block_cache"); opts.is_doris_table = true; + opts.tablet_id = 10086; CachedRemoteFileReader reader(local_reader, opts); { std::string buffer; @@ -3520,6 +3522,7 @@ TEST_F(BlockFileCacheTest, cached_remote_file_reader_error_handle) { io::FileReaderOptions opts; opts.cache_type = io::cache_type_from_string("file_block_cache"); opts.is_doris_table = true; + opts.tablet_id = 10086; CachedRemoteFileReader reader(local_reader, opts); auto sp = SyncPoint::get_instance(); sp->enable_processing(); @@ -3605,6 +3608,7 @@ TEST_F(BlockFileCacheTest, cached_remote_file_reader_self_heal_on_downloaded_not io::FileReaderOptions opts; opts.cache_type = io::cache_type_from_string("file_block_cache"); opts.is_doris_table = true; + opts.tablet_id = 10086; CachedRemoteFileReader reader(local_reader, opts); uint64_t before_self_heal = g_read_cache_self_heal_on_not_found.get_value(); @@ -3701,6 +3705,7 @@ TEST_F(BlockFileCacheTest, cached_remote_file_reader_no_self_heal_on_non_not_fou io::FileReaderOptions opts; opts.cache_type = io::cache_type_from_string("file_block_cache"); opts.is_doris_table = true; + opts.tablet_id = 10086; CachedRemoteFileReader reader(local_reader, opts); std::string buffer(64_kb, '\0'); @@ -3849,6 +3854,7 @@ TEST_F(BlockFileCacheTest, cached_remote_file_reader_concurrent) { io::FileReaderOptions opts; opts.cache_type = io::cache_type_from_string("file_block_cache"); opts.is_doris_table = true; + opts.tablet_id = 10086; bool flag1 = false; auto reader = std::make_shared(local_reader, opts); auto sp = SyncPoint::get_instance(); @@ -3933,6 +3939,7 @@ TEST_F(BlockFileCacheTest, cached_remote_file_reader_concurrent_2) { io::FileReaderOptions opts; opts.cache_type = io::cache_type_from_string("file_block_cache"); opts.is_doris_table = true; + opts.tablet_id = 10086; auto reader = std::make_shared(local_reader, opts); auto sp = SyncPoint::get_instance(); sp->enable_processing(); @@ -4455,6 +4462,7 @@ TEST_F(BlockFileCacheTest, cached_remote_file_reader_opt_lock) { io::FileReaderOptions opts; opts.cache_type = FileCachePolicy::FILE_BLOCK_CACHE; opts.is_doris_table = true; + opts.tablet_id = 10086; { FileReaderSPtr local_reader; ASSERT_TRUE(global_local_filesystem()->open_file(tmp_file, &local_reader).ok()); @@ -7055,6 +7063,7 @@ TEST_F(BlockFileCacheTest, reader_dryrun_when_download_file_cache) { io::FileReaderOptions opts; opts.cache_type = io::cache_type_from_string("file_block_cache"); opts.is_doris_table = true; + opts.tablet_id = 10086; CachedRemoteFileReader reader(local_reader, opts); auto key = io::BlockFileCache::hash("tmp_file"); EXPECT_EQ(reader._cache_hash, key); @@ -7550,6 +7559,7 @@ TEST_F(BlockFileCacheTest, cached_remote_file_reader_ttl_index) { io::FileReaderOptions opts; opts.cache_type = io::cache_type_from_string("file_block_cache"); opts.is_doris_table = true; + opts.tablet_id = 10086; CachedRemoteFileReader reader(local_reader, opts); auto key = io::BlockFileCache::hash("tmp_file"); EXPECT_EQ(reader._cache_hash, key); @@ -7631,6 +7641,7 @@ TEST_F(BlockFileCacheTest, cached_remote_file_reader_normal_index) { io::FileReaderOptions opts; opts.cache_type = io::cache_type_from_string("file_block_cache"); opts.is_doris_table = true; + opts.tablet_id = 10086; CachedRemoteFileReader reader(local_reader, opts); auto key = io::BlockFileCache::hash("tmp_file"); EXPECT_EQ(reader._cache_hash, key); @@ -7786,6 +7797,7 @@ TEST_F(BlockFileCacheTest, DISABLE_cached_remote_file_reader_direct_read_and_evi io::FileReaderOptions opts; opts.cache_type = io::cache_type_from_string("file_block_cache"); opts.is_doris_table = true; + opts.tablet_id = 10086; auto reader = std::make_shared(local_reader, opts); std::string buffer; @@ -7878,6 +7890,7 @@ TEST_F(BlockFileCacheTest, cached_remote_file_reader_direct_read_bytes_check) { io::FileReaderOptions opts; opts.cache_type = io::cache_type_from_string("file_block_cache"); opts.is_doris_table = true; + opts.tablet_id = 10086; auto reader = std::make_shared(local_reader, opts); std::string buffer; diff --git a/be/test/io/cache/block_file_cache_test_lru_dump.cpp b/be/test/io/cache/block_file_cache_test_lru_dump.cpp index e8a373568ef752..a5d06b5abbc3a2 100644 --- a/be/test/io/cache/block_file_cache_test_lru_dump.cpp +++ b/be/test/io/cache/block_file_cache_test_lru_dump.cpp @@ -528,6 +528,7 @@ TEST_F(BlockFileCacheTest, cached_remote_file_reader_direct_read_order_check) { io::FileReaderOptions opts; opts.cache_type = io::cache_type_from_string("file_block_cache"); opts.is_doris_table = true; + opts.tablet_id = 10086; auto reader = std::make_shared(local_reader, opts); std::string buffer; diff --git a/be/test/io/fs/packed_file_concurrency_test.cpp b/be/test/io/fs/packed_file_concurrency_test.cpp index 1e581ff59b8259..14c41d53a7b442 100644 --- a/be/test/io/fs/packed_file_concurrency_test.cpp +++ b/be/test/io/fs/packed_file_concurrency_test.cpp @@ -456,6 +456,7 @@ class MockRemoteFileSystem : public FileSystem { FileReaderOptions local_opts = opts ? *opts : FileReaderOptions(); local_opts.cache_type = FileCachePolicy::FILE_BLOCK_CACHE; local_opts.is_doris_table = true; + local_opts.tablet_id = 10086; *reader = std::make_shared(raw, local_opts); return Status::OK(); } @@ -722,6 +723,7 @@ TEST_F(MergeFileConcurrencyTest, ConcurrentWriteReadCorrectness) { FileReaderOptions opts; opts.cache_type = FileCachePolicy::FILE_BLOCK_CACHE; opts.is_doris_table = true; + opts.tablet_id = 10086; ASSERT_TRUE(reader_fs.open_file(Path(path), &reader, &opts).ok()); // After the fix, CachedRemoteFileReader wraps PackedFileReader (not vice versa) // This ensures cache key uses segment path for proper cleanup