Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions be/src/cloud/cloud_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1595,10 +1595,20 @@ void CloudTablet::_submit_segment_download_task(const RowsetSharedPtr& rs,
// clang-format off
const auto& rowset_meta = rs->rowset_meta();
auto self = std::dynamic_pointer_cast<CloudTablet>(shared_from_this());
// Use rowset_meta->fs() instead of storage_resource->fs to support packed file.
// RowsetMeta::fs() wraps the underlying FileSystem with PackedFileSystem when
// packed_slice_locations is not empty, which correctly maps segment file paths
// to their actual locations within packed files.
auto file_system = rowset_meta->fs();
if (!file_system) {
LOG(WARNING) << "failed to get file system for tablet_id=" << _tablet_meta->tablet_id()
<< ", rowset_id=" << rowset_meta->rowset_id();
return;
}
_engine.file_cache_block_downloader().submit_download_task(io::DownloadFileMeta {
.path = storage_resource->remote_segment_path(*rowset_meta, seg_id),
.file_size = rs->rowset_meta()->segment_file_size(seg_id),
.file_system = storage_resource->fs,
.file_system = file_system,
.ctx = {
.expiration_time = expiration_time,
.is_dryrun = config::enable_reader_dryrun_when_download_file_cache,
Expand Down Expand Up @@ -1633,10 +1643,17 @@ void CloudTablet::_submit_inverted_index_download_task(const RowsetSharedPtr& rs
// clang-format off
const auto& rowset_meta = rs->rowset_meta();
auto self = std::dynamic_pointer_cast<CloudTablet>(shared_from_this());
// Use rowset_meta->fs() instead of storage_resource->fs to support packed file for idx files.
auto file_system = rowset_meta->fs();
if (!file_system) {
LOG(WARNING) << "failed to get file system for tablet_id=" << _tablet_meta->tablet_id()
<< ", rowset_id=" << rowset_meta->rowset_id();
return;
}
io::DownloadFileMeta meta {
.path = idx_path,
.file_size = idx_size,
.file_system = storage_resource->fs,
.file_system = file_system,
.ctx = {
.expiration_time = expiration_time,
.is_dryrun = config::enable_reader_dryrun_when_download_file_cache,
Expand Down
12 changes: 11 additions & 1 deletion be/src/io/cache/block_file_cache_downloader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,16 @@ void FileCacheBlockDownloader::download_file_cache_block(
LOG(WARNING) << storage_resource.error();
return;
}
// Use RowsetMeta::fs() instead of storage_resource->fs to support packed file.
// RowsetMeta::fs() wraps the underlying FileSystem with PackedFileSystem when
// packed_slice_locations is not empty, which correctly maps segment file paths
// to their actual locations within packed files.
auto file_system = find_it->second->fs();
if (!file_system) {
LOG(WARNING) << "download_file_cache_block: failed to get file system for tablet_id="
<< meta.tablet_id() << ", rowset_id=" << meta.rowset_id();
return;
}

auto download_done = [&, tablet_id = meta.tablet_id()](Status st) {
std::lock_guard lock(_inflight_mtx);
Expand Down Expand Up @@ -251,7 +261,7 @@ void FileCacheBlockDownloader::download_file_cache_block(
: -1, // To avoid trigger get file size IO
.offset = meta.offset(),
.download_size = meta.size(),
.file_system = storage_resource.value()->fs,
.file_system = file_system,
.ctx =
{
.is_index_data = meta.cache_type() == ::doris::FileCacheType::INDEX,
Expand Down
4 changes: 3 additions & 1 deletion be/src/olap/rowset/rowset_writer_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,9 @@ struct RowsetWriterContext {
append_info.tablet_id = tablet_id;
append_info.rowset_id = rowset_id.to_string();
append_info.txn_id = txn_id;
append_info.expiration_time = file_cache_ttl_sec;
append_info.expiration_time = file_cache_ttl_sec > 0 && newest_write_timestamp > 0
? newest_write_timestamp + file_cache_ttl_sec
: 0;
fs = std::make_shared<io::PackedFileSystem>(fs, append_info);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,20 @@ suite('test_alter_compute_group_properties', 'docker') {
if (!isCloudMode()) {
return;
}

// Randomly enable or disable packed_file to test both scenarios
def enablePackedFile = new Random().nextBoolean()
logger.info("Running test with enable_packed_file=${enablePackedFile}")

def options = new ClusterOptions()
options.feConfigs += [
'cloud_cluster_check_interval_second=1',
'cloud_tablet_rebalancer_interval_second=1',
'sys_log_verbose_modules=org',
]
options.beConfigs += [
"enable_packed_file=${enablePackedFile}",
]
options.setFeNum(1)
options.setBeNum(1)
options.cloudMode = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ suite('test_balance_metrics', 'docker') {
if (!isCloudMode()) {
return;
}

// Randomly enable or disable packed_file to test both scenarios
def enablePackedFile = new Random().nextBoolean()
logger.info("Running test with enable_packed_file=${enablePackedFile}")

def options = new ClusterOptions()
options.feConfigs += [
'cloud_cluster_check_interval_second=1',
Expand All @@ -35,7 +40,8 @@ suite('test_balance_metrics', 'docker') {
'report_tablet_interval_seconds=1',
'schedule_sync_tablets_interval_s=18000',
'disable_auto_compaction=true',
'sys_log_verbose_modules=*'
'sys_log_verbose_modules=*',
"enable_packed_file=${enablePackedFile}",
]
options.setFeNum(1)
options.setBeNum(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ suite('test_balance_use_compute_group_properties', 'docker') {
if (!isCloudMode()) {
return;
}

// Randomly enable or disable packed_file to test both scenarios
def enablePackedFile = new Random().nextBoolean()
logger.info("Running test with enable_packed_file=${enablePackedFile}")

def options = new ClusterOptions()
options.feConfigs += [
'cloud_cluster_check_interval_second=1',
Expand All @@ -37,7 +42,7 @@ suite('test_balance_use_compute_group_properties', 'docker') {
'schedule_sync_tablets_interval_s=18000',
'disable_auto_compaction=true',
'sys_log_verbose_modules=*',
'enable_packed_file=false',
"enable_packed_file=${enablePackedFile}",
]
options.setFeNum(1)
options.setBeNum(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ suite('test_balance_warm_up', 'docker') {
if (!isCloudMode()) {
return;
}

// Randomly enable or disable packed_file to test both scenarios
def enablePackedFile = new Random().nextBoolean()
logger.info("Running test with enable_packed_file=${enablePackedFile}")

def options = new ClusterOptions()
options.feConfigs += [
'cloud_cluster_check_interval_second=1',
Expand All @@ -38,7 +43,7 @@ suite('test_balance_warm_up', 'docker') {
'sys_log_verbose_modules=*',
'cache_read_from_peer_expired_seconds=100',
'enable_cache_read_from_peer=true',
'enable_packed_file=false',
"enable_packed_file=${enablePackedFile}",
]
options.setFeNum(1)
options.setBeNum(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ suite('test_balance_warm_up_sync_cache', 'docker') {
if (!isCloudMode()) {
return;
}

// Randomly enable or disable packed_file to test both scenarios
def enablePackedFile = new Random().nextBoolean()
logger.info("Running test with enable_packed_file=${enablePackedFile}")

def options = new ClusterOptions()
options.feConfigs += [
'cloud_cluster_check_interval_second=1',
Expand All @@ -37,7 +42,7 @@ suite('test_balance_warm_up_sync_cache', 'docker') {
'schedule_sync_tablets_interval_s=18000',
'disable_auto_compaction=true',
'sys_log_verbose_modules=*',
'enable_packed_file=false',
"enable_packed_file=${enablePackedFile}",
]
options.setFeNum(1)
options.setBeNum(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ suite('test_balance_warm_up_task_abnormal', 'docker') {
if (!isCloudMode()) {
return;
}

// Randomly enable or disable packed_file to test both scenarios
def enablePackedFile = new Random().nextBoolean()
logger.info("Running test with enable_packed_file=${enablePackedFile}")

def options = new ClusterOptions()
options.feConfigs += [
'cloud_cluster_check_interval_second=1',
Expand All @@ -37,7 +42,7 @@ suite('test_balance_warm_up_task_abnormal', 'docker') {
'schedule_sync_tablets_interval_s=18000',
'disable_auto_compaction=true',
'sys_log_verbose_modules=*',
'enable_packed_file=false',
"enable_packed_file=${enablePackedFile}",
]
options.setFeNum(1)
options.setBeNum(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ suite('test_balance_warm_up_use_peer_cache', 'docker') {
if (!isCloudMode()) {
return;
}

// Randomly enable or disable packed_file to test both scenarios
def enablePackedFile = new Random().nextBoolean()
logger.info("Running test with enable_packed_file=${enablePackedFile}")

def options = new ClusterOptions()
options.feConfigs += [
'cloud_cluster_check_interval_second=1',
Expand All @@ -41,7 +46,7 @@ suite('test_balance_warm_up_use_peer_cache', 'docker') {
'sys_log_verbose_modules=*',
'cache_read_from_peer_expired_seconds=100',
'enable_cache_read_from_peer=true',
'enable_packed_file=false',
"enable_packed_file=${enablePackedFile}",
]
options.setFeNum(1)
options.setBeNum(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ suite('test_balance_warm_up_with_compaction_use_peer_cache', 'docker') {
if (!isCloudMode()) {
return;
}

// Randomly enable or disable packed_file to test both scenarios
def enablePackedFile = new Random().nextBoolean()
logger.info("Running test with enable_packed_file=${enablePackedFile}")

def options = new ClusterOptions()
options.feConfigs += [
'cloud_cluster_check_interval_second=1',
Expand All @@ -42,7 +47,7 @@ suite('test_balance_warm_up_with_compaction_use_peer_cache', 'docker') {
'cumulative_compaction_min_deltas=5',
'cache_read_from_peer_expired_seconds=100',
'enable_cache_read_from_peer=true',
'enable_packed_file=false',
"enable_packed_file=${enablePackedFile}",
]
options.setFeNum(1)
options.setBeNum(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ suite('test_expanding_node_balance', 'docker') {
return;
}

// Randomly enable or disable packed_file to test both scenarios
def enablePackedFile = new Random().nextBoolean()
logger.info("Running test with enable_packed_file=${enablePackedFile}")

def clusterOptions = [
new ClusterOptions(),
new ClusterOptions(),
Expand All @@ -40,6 +44,9 @@ suite('test_expanding_node_balance', 'docker') {
// disable Auto Analysis Job Executor
'auto_check_statistics_in_minutes=60',
]
options.beConfigs += [
"enable_packed_file=${enablePackedFile}",
]
options.cloudMode = true
options.setFeNum(1)
options.setBeNum(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ suite('test_peer_read_async_warmup', 'docker') {
if (!isCloudMode()) {
return;
}

// Randomly enable or disable packed_file to test both scenarios
def enablePackedFile = new Random().nextBoolean()
logger.info("Running test with enable_packed_file=${enablePackedFile}")

def options = new ClusterOptions()
options.feConfigs += [
'cloud_cluster_check_interval_second=1',
Expand All @@ -39,7 +44,7 @@ suite('test_peer_read_async_warmup', 'docker') {
'disable_auto_compaction=true',
'sys_log_verbose_modules=*',
'enable_cache_read_from_peer=true',
'enable_packed_file=false',
"enable_packed_file=${enablePackedFile}",
]
options.setFeNum(1)
options.setBeNum(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ suite('test_warmup_rebalance_in_cloud', 'multi_cluster, docker') {
if (!isCloudMode()) {
return;
}

// Randomly enable or disable packed_file to test both scenarios
def enablePackedFile = new Random().nextBoolean()
logger.info("Running test with enable_packed_file=${enablePackedFile}")

def options = new ClusterOptions()
options.feConfigs += [
'cloud_cluster_check_interval_second=1',
Expand All @@ -31,6 +36,9 @@ suite('test_warmup_rebalance_in_cloud', 'multi_cluster, docker') {
'sys_log_verbose_modules=org',
'cloud_pre_heating_time_limit_sec=600'
]
options.beConfigs += [
"enable_packed_file=${enablePackedFile}",
]
options.setFeNum(2)
options.setBeNum(3)
options.cloudMode = true
Expand Down
5 changes: 5 additions & 0 deletions regression-test/suites/cloud_p0/cache/test_load_cache.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ changes to statistics are possible, only a reasonable range is required.
*/

suite('test_load_cache', 'docker') {
// Randomly enable or disable packed_file to test both scenarios
def enablePackedFile = new Random().nextBoolean()
logger.info("Running test with enable_packed_file=${enablePackedFile}")

def options = new ClusterOptions()
options.feConfigs += [
'cloud_cluster_check_interval_second=1',
Expand All @@ -41,6 +45,7 @@ suite('test_load_cache', 'docker') {
'file_cache_enter_disk_resource_limit_mode_percent=99',
'enable_evict_file_cache_in_advance=false',
'file_cache_background_monitor_interval_ms=1000',
"enable_packed_file=${enablePackedFile}",
]
options.cloudMode = true
options.beNum = 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ import org.apache.doris.regression.suite.ClusterOptions

suite("test_topn_broadcast", "docker") {

// Randomly enable or disable packed_file to test both scenarios
def enablePackedFile = new Random().nextBoolean()
logger.info("Running test with enable_packed_file=${enablePackedFile}")

def options = new ClusterOptions()

Expand All @@ -32,7 +35,8 @@ suite("test_topn_broadcast", "docker") {
options.beConfigs += ['enable_file_cache=true', 'enable_java_support=false', 'file_cache_enter_disk_resource_limit_mode_percent=99',
'file_cache_background_lru_dump_interval_ms=2000', 'file_cache_background_lru_log_replay_interval_ms=500',
'disable_auto_compation=true', 'file_cache_enter_need_evict_cache_in_advance_percent=99',
'file_cache_background_lru_dump_update_cnt_threshold=0'
'file_cache_background_lru_dump_update_cnt_threshold=0',
"enable_packed_file=${enablePackedFile}",
]

docker(options) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ suite('test_clean_stale_rs_file_cache', 'docker') {
if (!isCloudMode()) {
return;
}

// Randomly enable or disable packed_file to test both scenarios
def enablePackedFile = new Random().nextBoolean()
logger.info("Running test with enable_packed_file=${enablePackedFile}")

def options = new ClusterOptions()
options.feConfigs += [
'cloud_cluster_check_interval_second=1',
Expand All @@ -35,7 +40,7 @@ suite('test_clean_stale_rs_file_cache', 'docker') {
'tablet_rowset_stale_sweep_by_size=false',
'tablet_rowset_stale_sweep_time_sec=60',
'vacuum_stale_rowsets_interval_s=10',
'enable_packed_file=false',
"enable_packed_file=${enablePackedFile}",
]
options.setFeNum(1)
options.setBeNum(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ suite('test_clean_stale_rs_index_file_cache', 'docker') {
if (!isCloudMode()) {
return;
}

// Randomly enable or disable packed_file to test both scenarios
def enablePackedFile = new Random().nextBoolean()
logger.info("Running test with enable_packed_file=${enablePackedFile}")

def options = new ClusterOptions()
options.feConfigs += [
'cloud_cluster_check_interval_second=1',
Expand All @@ -35,7 +40,7 @@ suite('test_clean_stale_rs_index_file_cache', 'docker') {
'tablet_rowset_stale_sweep_by_size=false',
'tablet_rowset_stale_sweep_time_sec=60',
'vacuum_stale_rowsets_interval_s=10',
'enable_packed_file=false',
"enable_packed_file=${enablePackedFile}",
]
options.setFeNum(1)
options.setBeNum(1)
Expand Down
Loading
Loading