Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ static struct InitFiu
/// We should define different types of failpoints here. There are four types of them:
/// - ONCE: the failpoint will only be triggered once.
/// - REGULAR: the failpoint will always be triggered until disableFailPoint is called.
/// - PAUSEABLE_ONCE: the failpoint will be blocked one time when pauseFailPoint is called, util disableFailPoint is called.
/// - PAUSEABLE: the failpoint will be blocked every time when pauseFailPoint is called, util disableFailPoint is called.
/// - PAUSEABLE_ONCE: the failpoint will be blocked one time when pauseFailPoint is called, until disableFailPoint is called.
/// - PAUSEABLE: the failpoint will be blocked every time when pauseFailPoint is called, until disableFailPoint is called.

#define APPLY_FOR_FAILPOINTS(ONCE, REGULAR, PAUSEABLE_ONCE, PAUSEABLE) \
ONCE(replicated_merge_tree_commit_zk_fail_after_op) \
Expand Down Expand Up @@ -131,9 +131,13 @@ static struct InitFiu
REGULAR(rmt_delay_commit_part) \
ONCE(smt_commit_exception_before_op) \
ONCE(backup_add_empty_memory_table) \
ONCE(local_object_storage_network_error_during_remove) \
ONCE(parallel_replicas_check_read_mode_always) \
REGULAR(lightweight_show_tables) \
PAUSEABLE_ONCE(drop_database_before_exclusive_ddl_lock) \
REGULAR(storage_merge_tree_background_schedule_merge_fail) \
REGULAR(refresh_task_stop_racing_for_running_refresh)


namespace FailPoints
{
#define M(NAME) extern const char(NAME)[] = #NAME "";
Expand Down
162 changes: 162 additions & 0 deletions src/Storages/MergeTree/IMergeTreeCleanupThread.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
#include <Storages/MergeTree/IMergeTreeCleanupThread.h>

#include <Interpreters/Context.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergeTreeSettings.h>
#include <Common/ZooKeeper/KeeperException.h>

namespace DB
{

namespace MergeTreeSetting
{
extern const MergeTreeSettingsUInt64 cleanup_delay_period;
extern const MergeTreeSettingsUInt64 cleanup_delay_period_random_add;
extern const MergeTreeSettingsUInt64 cleanup_thread_preferred_points_per_iteration;
extern const MergeTreeSettingsUInt64 max_cleanup_delay_period;
}

IMergeTreeCleanupThread::IMergeTreeCleanupThread(MergeTreeData & data_)
: data(data_)
, log_name(data.getStorageID().getFullTableName() + " (CleanupThread)")
, log(getLogger(log_name))
, sleep_ms((*data.getSettings())[MergeTreeSetting::cleanup_delay_period] * 1000)
{
task = data.getContext()->getSchedulePool().createTask(data.getStorageID(), log_name, [this] { run(); });
}

IMergeTreeCleanupThread::~IMergeTreeCleanupThread() = default;

void IMergeTreeCleanupThread::start()
{
task->activateAndSchedule();
}

void IMergeTreeCleanupThread::wakeup()
{
task->schedule();
}

void IMergeTreeCleanupThread::stop()
{
task->deactivate();
}

void IMergeTreeCleanupThread::wakeupEarlierIfNeeded()
{
/// It may happen that the tables was idle for a long time, but then a user started to aggressively insert (or mutate) data.
/// In this case, sleep_ms was set to the highest possible value, the task is not going to wake up soon,
/// but the number of objects to clean up is growing. We need to wakeup the task earlier.
auto storage_settings = data.getSettings();
if (!(*storage_settings)[MergeTreeSetting::cleanup_thread_preferred_points_per_iteration])
return;

/// The number of other objects (logs, blocks, etc) is usually correlated with the number of Outdated parts.
/// Do not wake up unless we have too many.
size_t number_of_outdated_objects = data.getOutdatedPartsCount();
if (number_of_outdated_objects < (*storage_settings)[MergeTreeSetting::cleanup_thread_preferred_points_per_iteration] * 2)
return;

/// A race condition is possible here, but it's okay
if (is_running.load(std::memory_order_relaxed))
return;

/// Do not re-check all parts too often (avoid constantly calling getNumberOfOutdatedPartsWithExpiredRemovalTime())
if (!wakeup_check_timer.compareAndRestart(static_cast<double>((*storage_settings)[MergeTreeSetting::cleanup_delay_period]) / 4.0))
return;

UInt64 prev_run_timestamp_ms = prev_cleanup_timestamp_ms.load(std::memory_order_relaxed);
UInt64 now_ms = clock_gettime_ns_adjusted(prev_run_timestamp_ms * 1'000'000) / 1'000'000;
if (!prev_run_timestamp_ms || now_ms <= prev_run_timestamp_ms)
return;

/// Don't run it more often than cleanup_delay_period
UInt64 seconds_passed = (now_ms - prev_run_timestamp_ms) / 1000;
if (seconds_passed < (*storage_settings)[MergeTreeSetting::cleanup_delay_period])
return;

/// Do not count parts that cannot be removed anyway. Do not wake up unless we have too many.
number_of_outdated_objects = data.getNumberOfOutdatedPartsWithExpiredRemovalTime();
if (number_of_outdated_objects < (*storage_settings)[MergeTreeSetting::cleanup_thread_preferred_points_per_iteration] * 2)
return;

LOG_TRACE(
log,
"Waking up cleanup thread because there are {} outdated objects and previous cleanup finished {}s ago",
number_of_outdated_objects,
seconds_passed);

wakeup();
}

void IMergeTreeCleanupThread::run()
{
if (cleanup_blocker.isCancelled())
{
LOG_TRACE(LogFrequencyLimiter(log, 30), "Cleanup is cancelled, exiting");
return;
}

SCOPE_EXIT({ is_running.store(false, std::memory_order_relaxed); });
is_running.store(true, std::memory_order_relaxed);

auto storage_settings = data.getSettings();

Float32 cleanup_points = 0;
try
{
cleanup_points = iterate();
}
catch (const Coordination::Exception & e)
{
tryLogCurrentException(log, __PRETTY_FUNCTION__);

if (e.code == Coordination::Error::ZSESSIONEXPIRED)
return;
}
catch (...)
{
tryLogCurrentException(log, __PRETTY_FUNCTION__);
}

UInt64 prev_timestamp = prev_cleanup_timestamp_ms.load(std::memory_order_relaxed);
UInt64 now_ms = clock_gettime_ns_adjusted(prev_timestamp * 1'000'000) / 1'000'000;

/// Do not adjust sleep_ms on the first run after starting the server
if (prev_timestamp && (*storage_settings)[MergeTreeSetting::cleanup_thread_preferred_points_per_iteration])
{
/// We don't want to run the task too often when the table was barely changed and there's almost nothing to cleanup.
/// But we cannot simply sleep max_cleanup_delay_period (300s) when nothing was cleaned up and cleanup_delay_period (30s)
/// when we removed something, because inserting one part per 30s will lead to running cleanup each 30s just to remove one part.
/// So we need some interpolation based on preferred batch size.
auto expected_cleanup_points = (*storage_settings)[MergeTreeSetting::cleanup_thread_preferred_points_per_iteration];

/// How long should we sleep to remove cleanup_thread_preferred_points_per_iteration on the next iteration?
Float32 ratio = cleanup_points / static_cast<Float32>(expected_cleanup_points);
if (ratio == 0)
sleep_ms = (*storage_settings)[MergeTreeSetting::max_cleanup_delay_period] * 1000;
else
sleep_ms = static_cast<UInt64>(static_cast<Float32>(sleep_ms) / ratio);

sleep_ms = std::clamp(
sleep_ms,
(*storage_settings)[MergeTreeSetting::cleanup_delay_period] * 1000,
(*storage_settings)[MergeTreeSetting::max_cleanup_delay_period] * 1000);

UInt64 interval_ms = now_ms - prev_timestamp;
LOG_TRACE(
log,
"Scheduling next cleanup after {}ms (points: {}, interval: {}ms, ratio: {}, points per minute: {})",
sleep_ms,
cleanup_points,
interval_ms,
ratio,
cleanup_points / static_cast<Float32>(interval_ms * 60'000));
}
prev_cleanup_timestamp_ms.store(now_ms, std::memory_order_relaxed);

sleep_ms += std::uniform_int_distribution<UInt64>(0, (*storage_settings)[MergeTreeSetting::cleanup_delay_period_random_add] * 1000)(rng);
task->scheduleAfter(sleep_ms);
}

}
56 changes: 56 additions & 0 deletions src/Storages/MergeTree/IMergeTreeCleanupThread.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
#pragma once

#include <Core/BackgroundSchedulePool.h>
#include <Common/ActionBlocker.h>
#include <Common/Stopwatch.h>
#include <Common/randomSeed.h>

#include <pcg_random.hpp>

namespace DB
{

class MergeTreeData;

/// Removes obsolete data from a table of type [Replicated]MergeTree.
class IMergeTreeCleanupThread
{
public:
explicit IMergeTreeCleanupThread(MergeTreeData & data_);

virtual ~IMergeTreeCleanupThread();

void start();

void wakeup();

void stop();

void wakeupEarlierIfNeeded();

ActionLock getCleanupLock() { return cleanup_blocker.cancel(); }

protected:
MergeTreeData & data;

String log_name;
LoggerPtr log;
BackgroundSchedulePoolTaskHolder task;
pcg64 rng{randomSeed()};

UInt64 sleep_ms;

std::atomic<UInt64> prev_cleanup_timestamp_ms = 0;
std::atomic<bool> is_running = false;

AtomicStopwatch wakeup_check_timer;

ActionBlocker cleanup_blocker;

void run();

/// Returns a number this is directly proportional to the number of cleaned up blocks
virtual Float32 iterate() = 0;
};

}
63 changes: 63 additions & 0 deletions src/Storages/MergeTree/MergeTreeCleanupThread.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
#include <Storages/MergeTree/MergeTreeCleanupThread.h>

#include <Storages/MergeTree/MergeTreeSettings.h>
#include <Storages/StorageMergeTree.h>

namespace DB
{

namespace MergeTreeSetting
{
extern const MergeTreeSettingsSeconds lock_acquire_timeout_for_background_operations;
extern const MergeTreeSettingsUInt64 merge_tree_clear_old_parts_interval_seconds;
extern const MergeTreeSettingsUInt64 merge_tree_clear_old_temporary_directories_interval_seconds;
extern const MergeTreeSettingsSeconds temporary_directories_lifetime;
}

MergeTreeCleanupThread::MergeTreeCleanupThread(StorageMergeTree & storage_)
: IMergeTreeCleanupThread(storage_)
, storage(storage_)
{
}

void MergeTreeCleanupThread::start()
{
time_after_previous_cleanup_parts.restart();
time_after_previous_cleanup_temporary_directories.restart();
IMergeTreeCleanupThread::start();
}

Float32 MergeTreeCleanupThread::iterate()
{
size_t cleaned_other = 0;
size_t cleaned_part_like = 0;
size_t cleaned_parts = 0;

auto storage_settings = storage.getSettings();

auto shared_lock
= storage.lockForShare(RWLockImpl::NO_QUERY, (*storage_settings)[MergeTreeSetting::lock_acquire_timeout_for_background_operations]);
if (auto lock = time_after_previous_cleanup_temporary_directories.compareAndRestartDeferred(
static_cast<double>((*storage_settings)[MergeTreeSetting::merge_tree_clear_old_temporary_directories_interval_seconds])))
{
/// Both use relative_data_path which changes during rename, so we do it under share lock
cleaned_part_like += storage.clearOldTemporaryDirectories(
(*storage.getSettings())[MergeTreeSetting::temporary_directories_lifetime].totalSeconds());
}

if (auto lock = time_after_previous_cleanup_parts.compareAndRestartDeferred(
static_cast<double>((*storage_settings)[MergeTreeSetting::merge_tree_clear_old_parts_interval_seconds])))
{
cleaned_parts += storage.clearOldPartsFromFilesystem(/* force */ false, /* with_pause_point */ true);
cleaned_other += storage.clearOldMutations();
cleaned_part_like += storage.clearEmptyParts();
cleaned_part_like += storage.clearUnusedPatchParts();
cleaned_part_like += storage.unloadPrimaryKeysAndClearCachesOfOutdatedParts();
}

constexpr Float32 parts_number_amplification = 1.3f; /// Assuming we merge 4-5 parts each time
Float32 cleaned_inserted_parts = static_cast<Float32>(cleaned_parts) / parts_number_amplification;
return cleaned_inserted_parts + static_cast<Float32>(cleaned_part_like) + static_cast<Float32>(cleaned_other);
}

}
31 changes: 31 additions & 0 deletions src/Storages/MergeTree/MergeTreeCleanupThread.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#pragma once

#include <Storages/MergeTree/IMergeTreeCleanupThread.h>
#include <Common/Stopwatch.h>

namespace DB
{

class StorageMergeTree;

class MergeTreeCleanupThread : public IMergeTreeCleanupThread
{
public:
explicit MergeTreeCleanupThread(StorageMergeTree & storage_);

/// Shadows IMergeTreeCleanupThread::start() to restart cleanup timers
/// before activating the background task. This ensures the thread waits
/// a full interval after the manual cleanup done in startup().
void start();

private:
StorageMergeTree & storage;

AtomicStopwatch time_after_previous_cleanup_parts;
AtomicStopwatch time_after_previous_cleanup_temporary_directories;

/// Returns a number that is directly proportional to the number of cleaned up objects
Float32 iterate() override;
};

}
Loading
Loading