Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 30 additions & 20 deletions be/src/pipeline/dependency.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -318,21 +318,31 @@ Status AggSharedState::reset_hash_table() {
agg_data->method_variant);
}

void PartitionedAggSharedState::init_spill_params(size_t spill_partition_count) {
partition_count = spill_partition_count;
max_partition_index = partition_count - 1;
void PartitionedAggSharedState::init_spill_params() {
// PartitionedAgg uses hierarchical spill partitioning with fixed 8-way fanout per level.
// The former spill_partition_count parameter has been removed; fanout no longer depends on it.
//
// The existing RuntimeState::spill_aggregation_partition_count() was originally used to decide
// the number of single-level partitions. With multi-level partitioning, fanout must be stable
// across sink/source and across split levels, so we pin it to kSpillFanout=8 (same as join).
partition_count = kSpillFanout;

for (int i = 0; i < partition_count; ++i) {
spill_partitions.emplace_back(std::make_shared<AggSpillPartition>());
spill_partitions.clear();
pending_partitions.clear();
for (uint32_t i = 0; i < partition_count; ++i) {
SpillPartitionId id {.level = 0, .path = i};
auto [it, inserted] = spill_partitions.try_emplace(id.key());
it->second.id = id;
pending_partitions.emplace_back(id);
}
}

void PartitionedAggSharedState::update_spill_stream_profiles(RuntimeProfile* source_profile) {
for (auto& partition : spill_partitions) {
if (partition->spilling_stream_) {
partition->spilling_stream_->update_shared_profiles(source_profile);
for (auto& [_, partition] : spill_partitions) {
if (partition.spilling_stream) {
partition.spilling_stream->update_shared_profiles(source_profile);
}
for (auto& stream : partition->spill_streams_) {
for (auto& stream : partition.spill_streams) {
if (stream) {
stream->update_shared_profiles(source_profile);
}
Expand All @@ -343,25 +353,25 @@ void PartitionedAggSharedState::update_spill_stream_profiles(RuntimeProfile* sou
Status AggSpillPartition::get_spill_stream(RuntimeState* state, int node_id,
RuntimeProfile* profile,
vectorized::SpillStreamSPtr& spill_stream) {
if (spilling_stream_) {
spill_stream = spilling_stream_;
if (spilling_stream) {
spill_stream = spilling_stream;
return Status::OK();
}
RETURN_IF_ERROR(ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream(
state, spilling_stream_, print_id(state->query_id()), "agg", node_id,
state, spilling_stream, print_id(state->query_id()), "agg", node_id,
std::numeric_limits<int32_t>::max(), std::numeric_limits<size_t>::max(), profile));
spill_streams_.emplace_back(spilling_stream_);
spill_stream = spilling_stream_;
spill_streams.emplace_back(spilling_stream);
spill_stream = spilling_stream;
return Status::OK();
}
void AggSpillPartition::close() {
if (spilling_stream_) {
spilling_stream_.reset();
if (spilling_stream) {
spilling_stream.reset();
}
for (auto& stream : spill_streams_) {
for (auto& stream : spill_streams) {
(void)ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream);
}
spill_streams_.clear();
spill_streams.clear();
}

void PartitionedAggSharedState::close() {
Expand All @@ -372,8 +382,8 @@ void PartitionedAggSharedState::close() {
return;
}
DCHECK(!false_close && is_closed);
for (auto partition : spill_partitions) {
partition->close();
for (auto& [_, partition] : spill_partitions) {
partition.close();
}
spill_partitions.clear();
}
Expand Down
153 changes: 110 additions & 43 deletions be/src/pipeline/dependency.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#pragma once

#include "vec/common/custom_allocator.h"
#ifdef __APPLE__
#include <netinet/in.h>
#include <sys/_types/_u_int.h>
Expand All @@ -26,10 +27,12 @@
#include <sqltypes.h>

#include <atomic>
#include <deque>
#include <functional>
#include <memory>
#include <mutex>
#include <thread>
#include <unordered_map>
#include <utility>

#include "common/config.h"
Expand All @@ -39,6 +42,7 @@
#include "pipeline/common/join_utils.h"
#include "pipeline/common/set_utils.h"
#include "pipeline/exec/data_queue.h"
#include "pipeline/exec/hierarchical_spill_partition.h"
#include "pipeline/exec/join/process_hash_table_probe.h"
#include "util/brpc_closure.h"
#include "util/stack_util.h"
Expand Down Expand Up @@ -440,69 +444,89 @@ struct BasicSpillSharedState {
virtual void update_spill_stream_profiles(RuntimeProfile* source_profile) = 0;
};

struct AggSpillPartition;
struct PartitionedAggSharedState : public BasicSharedState,
public BasicSpillSharedState,
public std::enable_shared_from_this<PartitionedAggSharedState> {
ENABLE_FACTORY_CREATOR(PartitionedAggSharedState)

PartitionedAggSharedState() = default;
~PartitionedAggSharedState() override = default;

void update_spill_stream_profiles(RuntimeProfile* source_profile) override;

void init_spill_params(size_t spill_partition_count);

void close();

AggSharedState* in_mem_shared_state = nullptr;
std::shared_ptr<BasicSharedState> in_mem_shared_state_sptr;

size_t partition_count;
size_t max_partition_index;
bool is_spilled = false;
std::atomic_bool is_closed = false;
std::deque<std::shared_ptr<AggSpillPartition>> spill_partitions;

size_t get_partition_index(size_t hash_value) const { return hash_value % partition_count; }
};

struct AggSpillPartition {
static constexpr int64_t AGG_SPILL_FILE_SIZE = 1024 * 1024 * 1024; // 1G

AggSpillPartition() = default;

SpillPartitionId id;
bool is_split = false;
// Best-effort bytes written via this partition node (in block format).
// Used as a split trigger; not used for correctness.
int64_t spilled_bytes = 0;

void close();

Status get_spill_stream(RuntimeState* state, int node_id, RuntimeProfile* profile,
vectorized::SpillStreamSPtr& spilling_stream);
vectorized::SpillStreamSPtr& spill_stream);

Status flush_if_full() {
DCHECK(spilling_stream_);
DCHECK(spilling_stream);
Status status;
// avoid small spill files
if (spilling_stream_->get_written_bytes() >= AGG_SPILL_FILE_SIZE) {
status = spilling_stream_->spill_eof();
spilling_stream_.reset();
if (spilling_stream->get_written_bytes() >= AGG_SPILL_FILE_SIZE) {
status = spilling_stream->spill_eof();
spilling_stream.reset();
}
return status;
}

Status finish_current_spilling(bool eos = false) {
if (spilling_stream_) {
if (eos || spilling_stream_->get_written_bytes() >= AGG_SPILL_FILE_SIZE) {
auto status = spilling_stream_->spill_eof();
spilling_stream_.reset();
if (spilling_stream) {
if (eos || spilling_stream->get_written_bytes() >= AGG_SPILL_FILE_SIZE) {
auto status = spilling_stream->spill_eof();
spilling_stream.reset();
return status;
}
}
return Status::OK();
}

std::deque<vectorized::SpillStreamSPtr> spill_streams_;
vectorized::SpillStreamSPtr spilling_stream_;
std::deque<vectorized::SpillStreamSPtr> spill_streams;
vectorized::SpillStreamSPtr spilling_stream;
};
using AggSpillPartitionSPtr = std::shared_ptr<AggSpillPartition>;

/// Shared state for partitioned (spillable) aggregation.
///
/// Owns the hierarchical spill partitions, keyed by SpillPartitionId::key(), together with a
/// queue of partition ids (`pending_partitions`).
struct PartitionedAggSharedState : public BasicSharedState,
                                   public BasicSpillSharedState,
                                   public std::enable_shared_from_this<PartitionedAggSharedState> {
    ENABLE_FACTORY_CREATOR(PartitionedAggSharedState)

    PartitionedAggSharedState() = default;
    ~PartitionedAggSharedState() override = default;

    /// Forwards `source_profile` to the spill streams held by the partitions.
    void update_spill_stream_profiles(RuntimeProfile* source_profile) override;

    /// (Re)initializes `partition_count`, `spill_partitions` and `pending_partitions`;
    /// see the definition for the exact level-0 layout.
    void init_spill_params();

    /// Closes the spill partitions and clears the partition map; see the definition for
    /// how `is_closed` guards repeated calls.
    void close();

    AggSharedState* in_mem_shared_state = nullptr;
    std::shared_ptr<BasicSharedState> in_mem_shared_state_sptr;

    // Zero until init_spill_params() runs. Initialized explicitly so that a
    // default-initialized instance never exposes an indeterminate value to
    // get_partition_index().
    size_t partition_count = 0;
    bool is_spilled = false;
    std::atomic_bool is_closed = false;
    // Hierarchical spill partitions (multi-level split).
    // Keyed by SpillPartitionId::key(). (level-0 has kSpillFanout base partitions.)
    DorisMap<uint32_t, AggSpillPartition> spill_partitions;

    // Ids of partitions queued for later processing.
    // NOTE(review): presumably consumed by the source side; confirm against the callers.
    std::deque<SpillPartitionId> pending_partitions;

    /// Maps a hash value to a bucket in [0, partition_count).
    /// Precondition: partition_count != 0, i.e. init_spill_params() has been called.
    size_t get_partition_index(size_t hash_value) const { return hash_value % partition_count; }

    // NOTE: Aggregation has a "null key" bucket in its hash table implementation.
    // Spilled null-key rows are routed to a deterministic hash bucket so they participate in
    // the same multi-level split behavior as normal keys. That routing is not performed by
    // the accessor below, which only looks up or creates a partition.

    /// Returns the partition stored under `partition_id.key()`, default-constructing it and
    /// recording `partition_id` in it when no entry exists yet. An existing entry is
    /// returned unchanged.
    AggSpillPartition& get_or_create_agg_partition(const SpillPartitionId& partition_id) {
        auto [it, inserted] = spill_partitions.try_emplace(partition_id.key());
        if (inserted) {
            it->second.id = partition_id;
        }
        return it->second;
    }
};

struct SortSharedState : public BasicSharedState {
ENABLE_FACTORY_CREATOR(SortSharedState)
public:
Expand Down Expand Up @@ -630,24 +654,67 @@ struct HashJoinSharedState : public JoinSharedState {
bool left_semi_direct_return = false;
};

// Hierarchical spill partitioning for partitioned hash join.
// Hash-join-named aliases of the shared spill constants and partition id type. The id alias is
// used by both HashJoinSpillPartition (probe side) and HashJoinSpillBuildPartition (build side).
static constexpr uint32_t kHashJoinSpillFanout = kSpillFanout;
static constexpr uint32_t kHashJoinSpillBitsPerLevel = kSpillBitsPerLevel;
static constexpr uint32_t kHashJoinSpillMaxDepth = kSpillMaxDepth;
using HashJoinSpillPartitionId = SpillPartitionId;

// Per-partition probe-side state for hierarchical hash join spilling.
// Plain data holder: nothing in this struct updates the byte counters, so the code that
// buffers or spills rows is responsible for keeping them in sync.
struct HashJoinSpillPartition {
// Identifier of this partition.
HashJoinSpillPartitionId id;
// NOTE(review): presumably set once this partition has been split into child partitions;
// confirm against the code that writes it.
bool is_split = false;
// Probe-side buffered rows for this partition before flushing into blocks/spill.
std::unique_ptr<vectorized::MutableBlock> accumulating_block;
// Probe-side materialized blocks for this partition (in-memory).
std::vector<vectorized::Block> blocks;
// Spill stream associated with this partition.
// NOTE(review): presumably null until the partition is first spilled; confirm.
vectorized::SpillStreamSPtr spill_stream;

// Memory tracking for this partition.
int64_t in_mem_bytes = 0; // Bytes of data currently in memory (accumulating_block + blocks).
int64_t spilled_bytes = 0; // Bytes of data that have been spilled to disk.

// Sum of the in-memory and spilled byte counters.
int64_t total_bytes() const { return in_mem_bytes + spilled_bytes; }
};

// Probe-side partitions keyed by a uint32_t.
// NOTE(review): presumably the value of SpillPartitionId::key(); confirm.
using HashJoinSpillPartitionMap = DorisMap<uint32_t, HashJoinSpillPartition>;

// Per-partition build-side state for hierarchical hash join spilling.
// Plain data holder: nothing in this struct updates the counters, so the code that buffers
// or spills rows is responsible for keeping them in sync.
struct HashJoinSpillBuildPartition {
// Identifier of this partition.
HashJoinSpillPartitionId id;
// NOTE(review): presumably set once this partition has been split into child partitions;
// confirm against the code that writes it.
bool is_split = false;
// Build-side buffered rows for this partition before hash table build.
std::unique_ptr<vectorized::MutableBlock> build_block;
// NOTE(review): the in_mem_bytes comment below mentions only build_block; confirm whether
// the bytes held in `blocks` are counted there as well.
std::vector<vectorized::Block> blocks;
// Spill stream for this partition; may be null.
// PartitionedHashJoinSharedState::update_spill_stream_profiles checks it before use.
vectorized::SpillStreamSPtr spill_stream;

// Memory tracking for this partition.
int64_t in_mem_bytes = 0; // Bytes of data currently in memory (build_block).
int64_t spilled_bytes = 0; // Bytes of data currently spilled to disk.
int64_t row_count = 0; // Total number of rows currently in this partition.

// Sum of the in-memory and spilled byte counters (row_count is not involved).
int64_t total_bytes() const { return in_mem_bytes + spilled_bytes; }
};

// Build-side partitions keyed by a uint32_t.
// NOTE(review): presumably the value of SpillPartitionId::key(); confirm.
using HashJoinSpillBuildPartitionMap = DorisMap<uint32_t, HashJoinSpillBuildPartition>;

struct PartitionedHashJoinSharedState
: public HashJoinSharedState,
public BasicSpillSharedState,
public std::enable_shared_from_this<PartitionedHashJoinSharedState> {
ENABLE_FACTORY_CREATOR(PartitionedHashJoinSharedState)

void update_spill_stream_profiles(RuntimeProfile* source_profile) override {
for (auto& stream : spilled_streams) {
if (stream) {
stream->update_shared_profiles(source_profile);
for (auto& [_, partition] : build_partitions) {
if (partition.spill_stream) {
partition.spill_stream->update_shared_profiles(source_profile);
}
}
}

std::unique_ptr<RuntimeState> inner_runtime_state;
std::shared_ptr<HashJoinSharedState> inner_shared_state;
std::vector<std::unique_ptr<vectorized::MutableBlock>> partitioned_build_blocks;
std::vector<vectorized::SpillStreamSPtr> spilled_streams;
HashJoinSpillPartitionMap probe_partitions;
HashJoinSpillBuildPartitionMap build_partitions;
std::deque<HashJoinSpillPartitionId> pending_probe_partitions;
bool is_spilled = false;
};

Expand Down
11 changes: 11 additions & 0 deletions be/src/pipeline/exec/data_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <deque>
#include <memory>
#include <mutex>
#include <utility>
#include <vector>

#include "common/status.h"
Expand Down Expand Up @@ -81,6 +82,16 @@ class DataQueue {

void terminate();

// Returns {total bytes, total block count} summed over all child queues.
// Each per-child counter is read with its own load() call and no lock is taken in this
// function, so the two totals may not describe a single consistent instant.
std::pair<int64_t, uint32_t> current_queue_size() const {
    std::pair<int64_t, uint32_t> totals {0, 0};
    for (int child = 0; child < _child_count; ++child) {
        totals.first += _cur_bytes_in_queue[child].load();
        totals.second += _cur_blocks_nums_in_queue[child].load();
    }
    return totals;
}

private:
std::vector<std::unique_ptr<std::mutex>> _queue_blocks_lock;
std::vector<std::deque<std::unique_ptr<vectorized::Block>>> _queue_blocks;
Expand Down
27 changes: 26 additions & 1 deletion be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@

#include <gen_cpp/Metrics_types.h>

#include <cstdint>
#include <memory>
#include <utility>

#include "common/compiler_util.h" // IWYU pragma: keep
#include "util/runtime_profile.h"
#include "vec/exprs/vectorized_agg_fn.h"

namespace doris {
Expand Down Expand Up @@ -202,6 +204,26 @@ Status DistinctStreamingAggLocalState::_distinct_pre_agg_with_serialized_key(
_emplace_into_hash_table_to_distinct(_distinct_row, key_columns, rows);
DCHECK_LE(_distinct_row.size(), rows)
<< "_distinct_row size should be less than or equal to rows";

size_t used_memory = 0;
std::visit(vectorized::Overload {
[&](std::monostate& arg) {
// Do nothing
},
[&](auto& agg_method) {
used_memory = agg_method.hash_table->get_buffer_size_in_bytes();
}},
_agg_data->method_variant);
COUNTER_SET(_memory_used_counter,
int64_t(_distinct_row.allocated_bytes() + _arena.size() + used_memory));
} else {
std::visit(vectorized::Overload {[&](std::monostate& arg) {
// Do nothing
},
[&](auto& agg_method) { agg_method.hash_table.reset(); }},
_agg_data->method_variant);
_arena.clear(true);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里虽然要停止插入hashtable了,但是已经在hashtable 中的数据直接不管了吗?这里直接reset
这里会不会挂掉?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hash table 里面记录的应该是已经输出过的行。

COUNTER_SET(_memory_used_counter, 0);
}

bool mem_reuse = _parent->cast<DistinctStreamingAggOperatorX>()._make_nullable_keys.empty() &&
Expand Down Expand Up @@ -434,8 +456,11 @@ Status DistinctStreamingAggLocalState::close(RuntimeState* state) {
// Do nothing
},
[&](auto& agg_method) {
COUNTER_SET(_hash_table_size_counter,
if (agg_method.hash_table) {
COUNTER_SET(
_hash_table_size_counter,
int64_t(agg_method.hash_table->size()));
}
}},
_agg_data->method_variant);
}
Expand Down
Loading
Loading