diff --git a/benchmarks/bench_latency.cpp b/benchmarks/bench_latency.cpp index e1871c0..c279244 100644 --- a/benchmarks/bench_latency.cpp +++ b/benchmarks/bench_latency.cpp @@ -4,7 +4,6 @@ #include #include #include -#include #include @@ -176,6 +175,8 @@ BM_MPSC_PureDequeueLatency 25.4 us 23.5 us 28047 P99.9_n #else +#include + int main(int argc, char** argv) { std::cout << "This test is only for x86." << std::endl; return 0; diff --git a/benchmarks/bench_linearizable.cpp b/benchmarks/bench_linearizable.cpp index f5c3b60..d8af548 100644 --- a/benchmarks/bench_linearizable.cpp +++ b/benchmarks/bench_linearizable.cpp @@ -1,7 +1,4 @@ #include -#include -#include -#include #include #include @@ -9,6 +6,10 @@ #if DAKING_HAS_CXX20_OR_ABOVE +#include +#include +#include + struct Message { int producer_id; uint64_t seq; diff --git a/examples/command_dispatcher.cpp b/examples/command_dispatcher.cpp index e9d90f6..407bb98 100644 --- a/examples/command_dispatcher.cpp +++ b/examples/command_dispatcher.cpp @@ -3,7 +3,6 @@ #include #include #include -#include #include "daking/MPSC_queue.hpp" diff --git a/examples/log_system.cpp b/examples/log_system.cpp index 36302fd..e29e59b 100644 --- a/examples/log_system.cpp +++ b/examples/log_system.cpp @@ -6,7 +6,6 @@ #include #include #include -#include #include #include #include diff --git a/include/daking/MPSC_queue.hpp b/include/daking/MPSC_queue.hpp index f92bdd4..615cf8e 100644 --- a/include/daking/MPSC_queue.hpp +++ b/include/daking/MPSC_queue.hpp @@ -95,63 +95,13 @@ SOFTWARE. #include #include #include -#include -#include -#include #include namespace daking { - - /* - SC MP - [tail]->[]->[]->[]->[]->[head] - - SC MP - [tail]->[]->[]->[]->[]->[head] - - SC MP - [tail]->[]->[]->[]->[]->[head] - - ... - - Although all alive MPSC_queue instances share a global pool of nodes to reduce memory allocation overhead, - the consumer of each MPSC_queue could be different. 
- - All producers has a thread_local pool of nodes to reduce contention on the global pool, - and the cost of getting nodes from the global pool is O(1) , no matter ThreadLocalCapacity is 256 or larger. - This is because the global pool is organized as a stack of chunks, each chunk contains ThreadLocalCapacity nodes, - when allocate nodes from the global pool, we always pop a chunk from the stack, this is a cheap pointer exchange operation. - And the consumer thread will push back the chunk to the global pool when its thread_local pool is full. - - The chunk is freely combined of nodes, and the nodes in a chunk are not required to be contiguous in memory. - To achieve this, every node has a next_chunk_ pointer , and all of the nodes in a chunk are linked together via next_ pointer, - In MPMC_queue instance, wo focus on the next_ pointer, which is used to link the nodes in the queue. - And in chunk_stack, we focus on the next_chunk_ pointer, which is used to link the nodes in a chunk. - - The page list is used to manage the memory of nodes allocated from global pool, when the last instance is destructed, all pages will be deleted automatically. - - Page: - Blue Green Red - [ThreadLocalCapacity * node] -> [2 * ThreadLocalCapacity * node] -> [4 * ThreadLocalCapacity * node] -> ... -> nullptr - Color the nodes of contiguous memory with the same color for better illustration. - - GLOBAL: - TOP - [[B][B][B][R][G][R]] consumers pop chunks from here and producers push chunks to here - ↓ - [[R][R][G][R][R][G]] - ↓ - [[R][G][B][G][G][R]] It is obvious that the nodes in a chunk are not required to be contiguous in memory. - ↓ Actually, they are freely combined of nodes, - ... ABA problem exists when read next_chunk_ and compare stack top pointer, so we use tagged pointer to avoid it. 
- nullptr - */ - namespace detail { - template + template struct MPSC_node { - using value_type = typename Queue::value_type; - + using value_type = Ty; using node_t = MPSC_node; MPSC_node() { @@ -160,36 +110,36 @@ namespace daking { ~MPSC_node() { /* Don't call destructor of value_ here*/ } union { - value_type value_; - node_t* next_chunk_; + value_type value_; + MPSC_node* next_chunk_; // for stable }; std::atomic next_; }; - template + template struct MPSC_page{ - using size_type = typename Queue::size_type; + using size_type = std::size_t; - using node_t = MPSC_node; - using page_t = MPSC_page; + using obj_ptr = Obj*; + using page_t = MPSC_page; - MPSC_page(node_t* node, size_type count, page_t* next) + MPSC_page(obj_ptr node, size_type count, page_t* next) : node_(node), count_(count), next_(next) {} ~MPSC_page() = default; + obj_ptr node_; size_type count_; - node_t* node_; page_t* next_; }; - template + template struct MPSC_chunk_stack { - using size_type = typename Queue::size_type; + using size_type = std::size_t; - using node_t = MPSC_node; + using chunk_t = Chunk; struct tagged_ptr { - node_t* node_ = nullptr; + chunk_t* node_ = nullptr; size_type tag_ = 0; }; @@ -200,7 +150,7 @@ namespace daking { top_.store(tagged_ptr{ nullptr, 0 }); } - DAKING_ALWAYS_INLINE void push(node_t* chunk) noexcept /* Pointer Swap */ { + DAKING_NO_TSAN void push(chunk_t* chunk) noexcept /* Pointer Swap */ { tagged_ptr new_top{ chunk, 0 }; tagged_ptr old_top = top_.load(std::memory_order_relaxed); // If TB read old_top, and TA pop the old_top then @@ -217,7 +167,7 @@ namespace daking { )); } - DAKING_ALWAYS_INLINE bool try_pop(node_t*& chunk) noexcept /* Pointer Swap */ { + DAKING_NO_TSAN bool try_pop(chunk_t*& chunk) noexcept /* Pointer Swap */ { tagged_ptr old_top = top_.load(std::memory_order_acquire); tagged_ptr new_top{}; @@ -225,9 +175,9 @@ namespace daking { if (!old_top.node_) { return false; } - DAKING_TSAN_ANNOTATE_IGNORED(&old_top.node_->next_chunk_, 
sizeof(node_t*), "Reason: healthy data race"); new_top.node_ = old_top.node_->next_chunk_; new_top.tag_ = old_top.tag_ + 1; + // For stable pattern: // If TA and TB reach here at the same time // And A pop the chunk successfully, then it will construct object at old_top.node_->next_chunk_, // so that B will read a invalid value, but this value will not pass the next CAS.(old_top have been updated by A) @@ -246,148 +196,355 @@ namespace daking { std::atomic top_{}; }; - template - struct MPSC_thread_hook { - using size_type = typename Queue::size_type; + template < + std::size_t ThreadLocalCapacity = 256, + std::size_t Align = 64, /* std::hardware_destructive_interference_size */ + std::size_t ExpansionFactor = 2 + > + struct stable { + static_assert(ThreadLocalCapacity && (ThreadLocalCapacity & (ThreadLocalCapacity - 1)) == 0, "ThreadLocalCapacity must be a power of 2."); + static_assert(ExpansionFactor > 1, "ExpansionFactor must be greater than 1."); + static constexpr std::size_t thread_local_capacity = ThreadLocalCapacity; + static constexpr std::size_t align = Align; + static constexpr std::size_t expansion_factor = ExpansionFactor; + }; - using node_t = MPSC_node; - using thread_local_t = typename Queue::thread_local_t; + template + struct MPSC_stable_impl : public std::allocator_traits::template rebind_alloc> { + using value_type = Ty; + using size_type = std::size_t; - MPSC_thread_hook() : tid_(std::this_thread::get_id()) { - std::lock_guard guard(Queue::global_mutex_); - // Only being called after global_manager is not a nullptr. - pair_ = Queue::_get_global_manager().register_for(tid_); - } + static constexpr std::size_t thread_local_capacity = LowJitter::thread_local_capacity; + static constexpr std::size_t align = LowJitter::align; + static constexpr std::size_t expansion_factor = LowJitter::expansion_factor; - ~MPSC_thread_hook() { - // If this is consumer hook, release the queue tail to help destructor thread. 
- std::atomic_thread_fence(std::memory_order_release); - std::lock_guard guard(Queue::global_mutex_); - Queue::_get_global_manager().unregister_for(tid_); - } + using node_t = MPSC_node; + using page_t = MPSC_page; + using chunk_stack_t = detail::MPSC_chunk_stack; - DAKING_ALWAYS_INLINE node_t*& node_list() noexcept { - return pair_->first; - } + struct control_block_t { + void reset() { + node_list = nullptr; + node_size = 0; + } - DAKING_ALWAYS_INLINE size_type& node_size() noexcept { - return pair_->second; - } + node_t* node_list = nullptr; + size_type node_size = 0; + control_block_t* next_chunk_ = nullptr; // for recycle chunk_stack + }; - std::thread::id tid_; - thread_local_t* pair_; - }; + struct thread_hook_t { + thread_hook_t() { + control_block_ = MPSC_stable_impl::get_global_manager().get_control_block(); + } - // If allocator is stateless, there is no data race. - // But if it has stateful member: construct/destroy, you should protect these two functions by yourself, - // and other functions are protected by daking. 
- template - struct MPSC_manager : - public std::allocator_traits::template rebind_alloc>, - public std::allocator_traits::template rebind_alloc> { - using size_type = typename Queue::size_type; - - using node_t = MPSC_node; - using page_t = MPSC_page; - using thread_local_t = ThreadLocalType; - using thread_local_manager_t = std::unordered_map>; - using thread_local_recycler_t = std::vector>; - using alloc_node_t = typename std::allocator_traits::template rebind_alloc; - using altraits_node_t = std::allocator_traits; - using alloc_page_t = typename std::allocator_traits::template rebind_alloc; - using altraits_page_t = std::allocator_traits; - - MPSC_manager(const Alloc& alloc) - : alloc_node_t(alloc), alloc_page_t(alloc) {} - - ~MPSC_manager() = default; - - void reset() { - /* Already locked */ - for (auto& [tid, pair_ptr] : global_thread_local_manager_) { - auto& [node, size] = *pair_ptr; - node = nullptr; - size = 0; + ~thread_hook_t() { + MPSC_stable_impl::get_global_manager().return_control_block(control_block_); + } + + DAKING_ALWAYS_INLINE node_t*& node_list() noexcept { + return control_block_->node_list; + } + + DAKING_ALWAYS_INLINE size_type& node_size() noexcept { + return control_block_->node_size; + } + + control_block_t* control_block_; + }; + + using alloc_node_t = typename std::allocator_traits::template rebind_alloc; + using altraits_node_t = std::allocator_traits; + + struct manager_t : public std::allocator_traits::template rebind_alloc { + using alloc_page_t = typename std::allocator_traits::template rebind_alloc; + using altraits_page_t = std::allocator_traits; + + // new/delete meta data + using control_block_page_t = MPSC_page; + using control_block_recycler_t = MPSC_chunk_stack; + + manager_t(const Alloc& alloc) : alloc_page_t(alloc) {} + + ~manager_t() { + while (global_control_block_page_list_) { + delete global_control_block_page_list_->node_; + delete std::exchange( global_control_block_page_list_, 
global_control_block_page_list_->next_); + } + } + + void reset(alloc_node_t& alloc_node) { + /* Already locked */ + for (auto* control_block_page = global_control_block_page_list_; control_block_page;) { + control_block_page->node_->reset(); + control_block_page = control_block_page->next_; + } + + while (global_page_list_) { + altraits_node_t::deallocate(alloc_node, global_page_list_->node_, global_page_list_->count_); + altraits_page_t::deallocate(*this, std::exchange(global_page_list_, global_page_list_->next_), 1); + } + + global_node_count_.store(0, std::memory_order_release); } - for (auto& pair_ptr : global_thread_local_recycler_) { - auto& [node, size] = *pair_ptr; - node = nullptr; - size = 0; + + void reserve(size_type count, alloc_node_t& alloc_node) { + /* Already locked */ + node_t* new_nodes = altraits_node_t::allocate(alloc_node, count); + page_t* new_page = altraits_page_t::allocate(*this, 1); + altraits_page_t::construct(*this, new_page, new_nodes, count, global_page_list_); + global_page_list_ = new_page; + + for (size_type i = 0; i < count; i++) { + new_nodes[i].next_ = new_nodes + i + 1; // seq_cst + if ((i & (MPSC_stable_impl::thread_local_capacity - 1)) == MPSC_stable_impl::thread_local_capacity - 1) DAKING_UNLIKELY { + // chunk_count = count / ThreadLocalCapacity + new_nodes[i].next_ = nullptr; + std::atomic_thread_fence(std::memory_order_acq_rel); + // mutex don't protect global_chunk_stack_ + MPSC_stable_impl::global_chunk_stack_.push(&new_nodes[i - MPSC_stable_impl::thread_local_capacity + 1]); + } + } + + global_node_count_.store(global_node_count_ + count, std::memory_order_release); + } + + DAKING_ALWAYS_INLINE control_block_t* allocate_control_block() { + std::lock_guard guard(MPSC_stable_impl::global_mutex_); + control_block_t* control_block = new control_block_t(); + control_block_page_t* new_page = new control_block_page_t(control_block, 1, global_control_block_page_list_); + global_control_block_page_list_ = new_page; + return 
control_block; + } + + DAKING_ALWAYS_INLINE control_block_t* get_control_block() { + control_block_t* control_block; + if (!global_control_block_recycler_.try_pop(control_block)) { + control_block = allocate_control_block(); + } + return control_block; } - while (global_page_list_) { - altraits_node_t::deallocate(*this, global_page_list_->node_, global_page_list_->count_); - altraits_page_t::deallocate(*this, std::exchange(global_page_list_, global_page_list_->next_), 1); + DAKING_ALWAYS_INLINE void return_control_block(control_block_t* control_block) { + global_control_block_recycler_.push(control_block); } - global_node_count_.store(0, std::memory_order_release); + DAKING_ALWAYS_INLINE size_type node_count() noexcept { + return global_node_count_.load(std::memory_order_acquire); + } + + DAKING_ALWAYS_INLINE static manager_t* create_global_manager(const Alloc& alloc) { + static manager_t global_manager(alloc); + return &global_manager; + } + + page_t* global_page_list_ = nullptr; + std::atomic global_node_count_ = 0; + + + control_block_page_t* global_control_block_page_list_ = nullptr; + control_block_recycler_t global_control_block_recycler_{}; + }; + + using alloc_page_t = typename manager_t::alloc_page_t; + using altraits_page_t = typename manager_t::altraits_page_t; + + static_assert(std::is_empty_v, + "In the stable global manager design, Alloc must be stateless to avoid dangling references. " + ); + static_assert( + std::is_constructible_v && std::is_constructible_v, + "Alloc should have a template constructor like 'Alloc(const Alloc& alloc)' to meet internal conversion." 
+ ); + + MPSC_stable_impl(const Alloc& alloc) : alloc_node_t(alloc) { + global_instance_count_++; + std::lock_guard guard(global_mutex_); + global_manager_instance_ = manager_t::create_global_manager(alloc); // single instance } - void reserve(size_type count) { - /* Already locked */ - node_t* new_nodes = altraits_node_t::allocate(*this, count); - page_t* new_page = altraits_page_t::allocate(*this, 1); - altraits_page_t::construct(*this, new_page, new_nodes, count, global_page_list_); - global_page_list_ = new_page; - - for (size_type i = 0; i < count; i++) { - new_nodes[i].next_ = new_nodes + i + 1; // seq_cst - if ((i & (Queue::thread_local_capacity - 1)) == Queue::thread_local_capacity - 1) DAKING_UNLIKELY { - // chunk_count = count / ThreadLocalCapacity - new_nodes[i].next_ = nullptr; - std::atomic_thread_fence(std::memory_order_acq_rel); - // mutex don't protect global_chunk_stack_ - Queue::global_chunk_stack_.push(&new_nodes[i - Queue::thread_local_capacity + 1]); + ~MPSC_stable_impl() { + if (--global_instance_count_ == 0) { + // only the last instance free the global resource + std::lock_guard lock(global_mutex_); + // if a new instance constructed before i get mutex, I do nothing. 
+ if (global_instance_count_ == 0) { + free_global(); } } + } - global_node_count_.store(global_node_count_ + count, std::memory_order_release); + DAKING_ALWAYS_INLINE static manager_t& get_global_manager() noexcept { + return *global_manager_instance_; } - DAKING_ALWAYS_INLINE thread_local_t* register_for(std::thread::id tid) { - /* Already locked */ - if (!global_thread_local_recycler_.empty()) { - global_thread_local_manager_[tid] = std::move(global_thread_local_recycler_.back()); - global_thread_local_recycler_.pop_back(); + DAKING_ALWAYS_INLINE thread_hook_t& get_thread_hook() { + static thread_local thread_hook_t thread_hook; + return thread_hook; + } + + DAKING_ALWAYS_INLINE node_t*& get_thread_local_node_list() noexcept { + return get_thread_hook().node_list(); + } + + DAKING_ALWAYS_INLINE size_type& get_thread_local_node_size() noexcept { + return get_thread_hook().node_size(); + } + + DAKING_ALWAYS_INLINE node_t* allocate() { + node_t*& thread_local_node_list = get_thread_local_node_list(); + size_type& thread_local_node_size = get_thread_local_node_size(); + if (thread_local_node_size == 0) DAKING_UNLIKELY { + while (!global_chunk_stack_.try_pop(thread_local_node_list)) { + reserve_global_internal(); + } + thread_local_node_size = thread_local_capacity; } - else { - global_thread_local_manager_[tid] = std::make_unique(nullptr, 0); + thread_local_node_size--; + DAKING_TSAN_ANNOTATE_ACQUIRE(thread_local_node_list); + DAKING_TSAN_ANNOTATE_ACQUIRE(thread_local_node_list->next_); + node_t* res = std::exchange(thread_local_node_list, thread_local_node_list->next_.load(std::memory_order_relaxed)); + res->next_.store(nullptr, std::memory_order_relaxed); + return res; + } + + DAKING_ALWAYS_INLINE void deallocate(node_t* node) noexcept { + node_t*& thread_local_node_list = get_thread_local_node_list(); + node->next_.store(thread_local_node_list, std::memory_order_relaxed); + thread_local_node_list = node; + DAKING_TSAN_ANNOTATE_RELEASE(node); + if 
(++get_thread_local_node_size() >= thread_local_capacity) DAKING_UNLIKELY { + global_chunk_stack_.push(thread_local_node_list); + thread_local_node_list = nullptr; + get_thread_local_node_size() = 0; + } + } + + DAKING_ALWAYS_INLINE bool reserve_global_external(size_type chunk_count) { + manager_t& manager = get_global_manager(); + size_type global_node_count = manager.node_count(); + if (global_node_count / thread_local_capacity >= chunk_count) { + return false; + } + std::lock_guard lock(global_mutex_); + global_node_count = manager.node_count(); + if (global_node_count / thread_local_capacity >= chunk_count) { + return false; + } + + size_type count = (chunk_count - global_node_count / thread_local_capacity) * thread_local_capacity; + manager.reserve(count, *this); + return true; + } + + DAKING_ALWAYS_INLINE void reserve_global_internal() { + std::lock_guard lock(global_mutex_); + if (global_chunk_stack_.top_.load(std::memory_order_acquire).node_) { + // if anyone have already allocate chunks, I return. + return; } - return global_thread_local_manager_[tid].get(); + + constexpr size_type mask = thread_local_capacity - 1; + size_type count = (expansion_factor - 1) * get_global_manager().node_count(); + get_global_manager().reserve(std::max(thread_local_capacity, count), *this); } - DAKING_ALWAYS_INLINE void unregister_for(std::thread::id tid) { + DAKING_ALWAYS_INLINE void free_global() { /* Already locked */ - global_thread_local_recycler_.push_back(std::move(global_thread_local_manager_[tid])); - global_thread_local_manager_.erase(tid); + global_chunk_stack_.reset(); + get_global_manager().reset(*this); } - DAKING_ALWAYS_INLINE size_type node_count() noexcept { - return global_node_count_.load(std::memory_order_acquire); + DAKING_ALWAYS_INLINE size_type global_node_size_apprx() noexcept { + return global_manager_instance_ ? 
get_global_manager().node_count() : 0; } - DAKING_ALWAYS_INLINE static MPSC_manager* create_global_manager(const Alloc& alloc) { - static MPSC_manager global_manager(alloc); - return &global_manager; + DAKING_ALWAYS_INLINE bool reserve_global_chunk(size_type chunk_count) { + return global_manager_instance_ ? reserve_global_external(chunk_count) : false; } - page_t* global_page_list_ = nullptr; - std::atomic global_node_count_ = 0; - thread_local_manager_t global_thread_local_manager_; - thread_local_recycler_t global_thread_local_recycler_; + /* Global Lock Free*/ + inline static chunk_stack_t global_chunk_stack_{}; + inline static std::atomic global_instance_count_ = 0; + + /* Global Mutex*/ + inline static std::mutex global_mutex_{}; + inline static manager_t* global_manager_instance_ = nullptr; + }; + + template + struct MPSC_base; + + template + struct MPSC_base, Alloc> + : MPSC_stable_impl, Alloc> { + using memory_policy = MPSC_stable_impl, Alloc>; + + MPSC_base(const Alloc& alloc) : memory_policy(alloc) {} + ~MPSC_base() = default; }; } + using detail::stable; + /* + stable pattern will not free memory to OS at runtime, but have a stable jitter performance. + + In this mode: + + SC MP + [tail]->[]->[]->[]->[]->[head] + + SC MP + [tail]->[]->[]->[]->[]->[head] + + SC MP + [tail]->[]->[]->[]->[]->[head] + + ... + + Although all alive MPSC_queue instances share a global pool of nodes to reduce memory allocation overhead, + the consumer of each MPSC_queue could be different. + + All producers has a thread_local pool of nodes to reduce contention on the global pool, + and the cost of getting nodes from the global pool is O(1) , no matter ThreadLocalCapacity is 256 or larger. + This is because the global pool is organized as a stack of chunks, each chunk contains ThreadLocalCapacity nodes, + when allocate nodes from the global pool, we always pop a chunk from the stack, this is a cheap pointer exchange operation. 
+ And the consumer thread will push back the chunk to the global pool when its thread_local pool is full. + + A chunk is a free combination of nodes, and the nodes in a chunk are not required to be contiguous in memory. + To achieve this, every node has a next_chunk_ pointer, and all of the nodes in a chunk are linked together via the next_ pointer. + In an MPSC_queue instance, we focus on the next_ pointer, which is used to link the nodes in the queue. + And in chunk_stack, we focus on the next_chunk_ pointer, which is used to link the chunks in the stack. + + The page list is used to manage the memory of nodes allocated for the global pool; when the last instance is destructed, all pages will be freed automatically. + + Page: + Blue Green Red + [ThreadLocalCapacity * node] -> [expansion_factor^1 * ThreadLocalCapacity * node] -> [expansion_factor^2 * ThreadLocalCapacity * node] -> ... -> nullptr + Color the nodes of contiguous memory with the same color for better illustration. + + GLOBAL: + TOP + [[B][B][B][R][G][R]] producers pop chunks from here and consumers push chunks to here + ↓ + [[R][R][G][R][R][G]] + ↓ + [[R][G][B][G][G][R]] It is obvious that the nodes in a chunk are not required to be contiguous in memory. + ↓ Actually, a chunk is a free combination of nodes. + ... An ABA problem exists when reading next_chunk_ and comparing the stack top pointer, so we use a tagged pointer to avoid it. 
+ nullptr + */ + + + template < typename Ty, - std::size_t ThreadLocalCapacity = 256, - std::size_t Align = 64, /* std::hardware_destructive_interference_size */ - typename Alloc = std::allocator + typename MemoryPolicy = stable<>, + typename Alloc = std::allocator > - class MPSC_queue { + class MPSC_queue : private detail::MPSC_base { public: static_assert(std::is_object_v, "Ty must be object."); - static_assert((ThreadLocalCapacity & (ThreadLocalCapacity - 1)) == 0, "ThreadLocalCapacity must be a power of 2."); using value_type = Ty; using allocator_type = Alloc; @@ -396,39 +553,25 @@ namespace daking { using reference = Ty&; using const_reference = const Ty&; - static constexpr std::size_t thread_local_capacity = ThreadLocalCapacity; - static constexpr std::size_t align = Align; + using base = detail::MPSC_base; + using memory_policy = typename base::memory_policy; + + static constexpr std::size_t thread_local_capacity = memory_policy::thread_local_capacity; + static constexpr std::size_t align = memory_policy::align; private: - using node_t = detail::MPSC_node; - using page_t = detail::MPSC_page; - using chunk_stack_t = detail::MPSC_chunk_stack; - using thread_hook_t = detail::MPSC_thread_hook; - using thread_local_t = std::pair; - using manager_t = detail::MPSC_manager; - using alloc_node_t = typename manager_t::alloc_node_t; - using altraits_node_t = typename manager_t::altraits_node_t; - using alloc_page_t = typename manager_t::alloc_page_t; - using altraits_page_t = typename manager_t::altraits_page_t; - - static_assert( - std::is_constructible_v && // for constructor of MPSC_manager - std::is_constructible_v, - "Alloc should have a template constructor like 'Alloc(const Alloc& alloc)' to meet internal conversion." 
- ); - - friend thread_hook_t; - friend manager_t; - friend altraits_node_t; - friend altraits_page_t; + using node_t = typename memory_policy::node_t; + using alloc_node_t = typename memory_policy::alloc_node_t; + using altraits_node_t = typename memory_policy::altraits_node_t; public: MPSC_queue() : MPSC_queue(allocator_type()) {} - MPSC_queue(const allocator_type& alloc) { + MPSC_queue(const allocator_type& alloc) : base(alloc) { /* Alloc -> Alloc<...>, which means Alloc should have a template constructor */ - global_instance_count_++; - _initial(alloc); + node_t* dummy = memory_policy::allocate(); + tail_ = dummy; + head_.store(dummy, std::memory_order_release); } explicit MPSC_queue(size_type initial_global_chunk_count, const allocator_type& alloc = allocator_type()) @@ -439,20 +582,11 @@ namespace daking { ~MPSC_queue() { node_t* next = tail_->next_.load(std::memory_order_acquire); while (next) { - altraits_node_t::destroy(_get_global_manager(), std::addressof(next->value_)); - _deallocate(std::exchange(tail_, next)); + altraits_node_t::destroy(*this, std::addressof(next->value_)); + memory_policy::deallocate(std::exchange(tail_, next)); next = tail_->next_.load(std::memory_order_acquire); } - _deallocate(tail_); - - if (--global_instance_count_ == 0) { - // only the last instance free the global resource - std::lock_guard lock(global_mutex_); - // if a new instance constructed before i get mutex, I do nothing. - if (global_instance_count_ == 0) { - _free_global(); - } - } + memory_policy::deallocate(tail_); } MPSC_queue(const MPSC_queue&) = delete; @@ -462,8 +596,8 @@ namespace daking { template DAKING_ALWAYS_INLINE void emplace(Args&&... 
args) { - node_t* new_node = _allocate(); - altraits_node_t::construct(_get_global_manager(), std::addressof(new_node->value_), std::forward(args)...); + node_t* new_node = memory_policy::allocate(); + altraits_node_t::construct(*this, std::addressof(new_node->value_), std::forward(args)...); node_t* old_head = head_.exchange(new_node, std::memory_order_acq_rel); old_head->next_.store(new_node, std::memory_order_release); @@ -484,12 +618,12 @@ namespace daking { // N times thread_local operation, One time CAS operation. // So it is more efficient than N times enqueue. - node_t* first_new_node = _allocate(); + node_t* first_new_node = memory_policy::allocate(); node_t* prev_node = first_new_node; - altraits_node_t::construct(_get_global_manager(), std::addressof(first_new_node->value_), value); + altraits_node_t::construct(*this, std::addressof(first_new_node->value_), value); for (size_type i = 1; i < n; i++) { - node_t* new_node = _allocate(); - altraits_node_t::construct(_get_global_manager(), std::addressof(new_node->value_), value); + node_t* new_node = memory_policy::allocate(); + altraits_node_t::construct(*this, std::addressof(new_node->value_), value); prev_node->next_.store(new_node, std::memory_order_relaxed); prev_node = new_node; } @@ -509,13 +643,13 @@ namespace daking { static_assert(std::is_same_v::value_type, value_type>, "The value type of iterator must be same as MPSC_queue::value_type."); - node_t* first_new_node = _allocate(); + node_t* first_new_node = memory_policy::allocate(); node_t* prev_node = first_new_node; - altraits_node_t::construct(_get_global_manager(), std::addressof(first_new_node->value_), *it); + altraits_node_t::construct(*this, std::addressof(first_new_node->value_), *it); ++it; for (size_type i = 1; i < n; i++) { - node_t* new_node = _allocate(); - altraits_node_t::construct(_get_global_manager(), std::addressof(new_node->value_), *it); + node_t* new_node = memory_policy::allocate(); + altraits_node_t::construct(*this, 
std::addressof(new_node->value_), *it); prev_node->next_.store(new_node, std::memory_order_relaxed); prev_node = new_node; ++it; @@ -542,8 +676,8 @@ namespace daking { node_t* next = tail_->next_.load(std::memory_order_acquire); if (next) DAKING_LIKELY { value = std::move(next->value_); - altraits_node_t::destroy(_get_global_manager(), std::addressof(next->value_)); - _deallocate(std::exchange(tail_, next)); + altraits_node_t::destroy(*this, std::addressof(next->value_)); + memory_policy::deallocate(std::exchange(tail_, next)); return true; } else { @@ -627,111 +761,15 @@ namespace daking { return tail_->next_.load(std::memory_order_acquire) == nullptr; } - DAKING_ALWAYS_INLINE static size_type global_node_size_apprx() noexcept { - return global_manager_instance_ ? _get_global_manager().node_count() : 0; + DAKING_ALWAYS_INLINE size_type global_node_size_apprx() noexcept { + return memory_policy::global_node_size_apprx(); } - DAKING_ALWAYS_INLINE static bool reserve_global_chunk(size_type chunk_count) { - return global_manager_instance_ ? 
_reserve_global_external(chunk_count) : false; + DAKING_ALWAYS_INLINE bool reserve_global_chunk(size_type chunk_count) { + return memory_policy::reserve_global_chunk(chunk_count); } - + private: - DAKING_ALWAYS_INLINE static manager_t& _get_global_manager() noexcept { - return *global_manager_instance_; - } - - DAKING_ALWAYS_INLINE thread_hook_t& _get_thread_hook() { - static thread_local thread_hook_t thread_hook; - return thread_hook; - } - - DAKING_ALWAYS_INLINE node_t*& _get_thread_local_node_list() noexcept { - return _get_thread_hook().node_list(); - } - - DAKING_ALWAYS_INLINE size_type& _get_thread_local_node_size() noexcept { - return _get_thread_hook().node_size(); - } - - DAKING_ALWAYS_INLINE void _initial(const Alloc& alloc) { - { - std::lock_guard guard(global_mutex_); - global_manager_instance_ = manager_t::create_global_manager(alloc); // single instance - } - - node_t* dummy = _allocate(); - tail_ = dummy; - head_.store(dummy, std::memory_order_release); - } - - DAKING_ALWAYS_INLINE node_t* _allocate() { - node_t*& thread_local_node_list = _get_thread_local_node_list(); - if (!thread_local_node_list) DAKING_UNLIKELY { - while (!global_chunk_stack_.try_pop(thread_local_node_list)) { - _reserve_global_internal(); - } - } - DAKING_TSAN_ANNOTATE_ACQUIRE(thread_local_node_list); - DAKING_TSAN_ANNOTATE_ACQUIRE(thread_local_node_list->next_); - node_t* res = std::exchange(thread_local_node_list, thread_local_node_list->next_.load(std::memory_order_relaxed)); - res->next_.store(nullptr, std::memory_order_relaxed); - return res; - } - - DAKING_ALWAYS_INLINE void _deallocate(node_t* node) noexcept { - node_t*& thread_local_node_list = _get_thread_local_node_list(); - node->next_.store(thread_local_node_list, std::memory_order_relaxed); - thread_local_node_list = node; - DAKING_TSAN_ANNOTATE_RELEASE(node); - if (++_get_thread_local_node_size() >= thread_local_capacity) DAKING_UNLIKELY { - global_chunk_stack_.push(thread_local_node_list); - 
thread_local_node_list = nullptr; - _get_thread_local_node_size() = 0; - } - } - - DAKING_ALWAYS_INLINE static bool _reserve_global_external(size_type chunk_count) { - manager_t& manager = _get_global_manager(); - size_type global_node_count = manager.node_count(); - if (global_node_count / thread_local_capacity >= chunk_count) { - return false; - } - std::lock_guard lock(global_mutex_); - global_node_count = manager.node_count(); - if (global_node_count / thread_local_capacity >= chunk_count) { - return false; - } - - size_type count = (chunk_count - global_node_count / thread_local_capacity) * thread_local_capacity; - manager.reserve(count); - return true; - } - - DAKING_ALWAYS_INLINE static void _reserve_global_internal() { - std::lock_guard lock(global_mutex_); - if (global_chunk_stack_.top_.load(std::memory_order_acquire).node_) { - // if anyone have already allocate chunks, I return. - return; - } - - _get_global_manager().reserve(std::max(thread_local_capacity, _get_global_manager().node_count())); - } - - DAKING_ALWAYS_INLINE void _free_global() { - /* Already locked */ - global_chunk_stack_.reset(); - _get_global_manager().reset(); - } - - /* Global LockFree*/ - inline static chunk_stack_t global_chunk_stack_{}; - inline static std::atomic global_instance_count_ = 0; - - /* Global Mutex*/ - inline static std::mutex global_mutex_{}; - inline static manager_t* global_manager_instance_ = nullptr; - - /* MPSC */ alignas(align) std::atomic head_; alignas(align) node_t* tail_; }; diff --git a/tests/test_MPSC.cpp b/tests/test_MPSC.cpp index 6811700..4fab8b6 100644 --- a/tests/test_MPSC.cpp +++ b/tests/test_MPSC.cpp @@ -4,11 +4,11 @@ #include #include #include -#include #include "daking/MPSC_queue.hpp" using daking::MPSC_queue; +using daking::stable; // Use default template parameters for testing using TestQueue = MPSC_queue; @@ -21,7 +21,7 @@ using StringQueue = MPSC_queue; TEST(MPSCQueueBasicTest, InitialStateAndEmptyCheck) { TestQueue queue; 
EXPECT_TRUE(queue.empty()); - EXPECT_EQ(TestQueue::global_node_size_apprx(), 256); // ThreadLocalCapacity default is 256 + EXPECT_EQ(queue.global_node_size_apprx(), 256); // ThreadLocalCapacity default is 256 } TEST(MPSCQueueBasicTest, EnqueueAndTryDequeueSingle) { @@ -63,38 +63,35 @@ TEST(MPSCQueueBasicTest, EmplaceMove) { TEST(MPSCQueueMemoryTest, GlobalResourceSharingAndDestruction) { // Two instances with the same template parameters share global resources - MPSC_queue* q1 = new MPSC_queue(); - MPSC_queue* q2 = new MPSC_queue(); + auto* q1 = new MPSC_queue>(); + auto* q2 = new MPSC_queue>(); - // The current thread now has 126 free nodes, q1 and q2 each have one dummy node - - // Pre-allocate some global nodes - MPSC_queue::reserve_global_chunk(5); - size_t initial_global_size = MPSC_queue::global_node_size_apprx(); - EXPECT_GE(initial_global_size, (size_t)5 * 128); + // Allocate some global nodes + q1->reserve_global_chunk(5); + size_t initial_global_size = q1->global_node_size_apprx(); + EXPECT_EQ(initial_global_size, (size_t)5 * 128); delete q1; - size_t after_delete_size = MPSC_queue::global_node_size_apprx(); + size_t after_delete_size = q2->global_node_size_apprx(); // q2 still exists, global resources should not be released - EXPECT_GE(after_delete_size, (size_t)initial_global_size); + EXPECT_EQ(after_delete_size, (size_t)initial_global_size); delete q2; - after_delete_size = MPSC_queue::global_node_size_apprx(); - // The last instance is destructed, global resources should be released - EXPECT_EQ(after_delete_size, (size_t)0); } TEST(MPSCQueueMemoryTest, ReserveGlobalChunk) { - using Q = MPSC_queue; - size_t initial_size = Q::global_node_size_apprx(); // Initial value is 0 + using Q = MPSC_queue>; Q q; // Create an instance to register global manager - Q::reserve_global_chunk(10); // Reserve 10 * 64 nodes - size_t reserved_size = Q::global_node_size_apprx(); - EXPECT_GE(reserved_size, initial_size + 10 * 64); + size_t initial_size = 
q.global_node_size_apprx(); + EXPECT_EQ(initial_size, 64); + + q.reserve_global_chunk(10); // Reserve to 10 * 64 nodes + size_t reserved_size = q.global_node_size_apprx(); + EXPECT_EQ(reserved_size, 10 * 64); // Reserve again, but with a smaller number, should not allocate - Q::reserve_global_chunk(5); - EXPECT_EQ(Q::global_node_size_apprx(), reserved_size); + q.reserve_global_chunk(5); + EXPECT_EQ(q.global_node_size_apprx(), reserved_size); } // ------------------------------------------------------------------------- @@ -230,7 +227,7 @@ struct CountingAllocator : Counter { }; TEST(MPSCQueueAllocatorTest, CustomAllocatorUsage) { - using AllocQueue = MPSC_queue>; + using AllocQueue = MPSC_queue, CountingAllocator>; AllocQueue* queue = new AllocQueue(); const size_t n = 1000; for (size_t i = 0; i < n; ++i) {