diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 8e19d4b9..296dc042 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -197,8 +197,8 @@ jobs: shared: false coverage: true build-type: "Debug" - cxxflags: "--coverage -fprofile-arcs -ftest-coverage" - ccflags: "--coverage -fprofile-arcs -ftest-coverage" + cxxflags: "--coverage -fprofile-arcs -ftest-coverage -fprofile-update=atomic" + ccflags: "--coverage -fprofile-arcs -ftest-coverage -fprofile-update=atomic" install: "lcov wget unzip" # Linux Clang (5 configurations) diff --git a/src/corosio/src/detail/epoll/acceptors.cpp b/src/corosio/src/detail/epoll/acceptors.cpp index 0dd59993..5f5bfbde 100644 --- a/src/corosio/src/detail/epoll/acceptors.cpp +++ b/src/corosio/src/detail/epoll/acceptors.cpp @@ -16,6 +16,8 @@ #include "src/detail/endpoint_convert.hpp" #include "src/detail/make_err.hpp" +#include + #include #include #include @@ -64,11 +66,14 @@ operator()() impl.set_socket(accepted_fd); // Register accepted socket with epoll (edge-triggered mode) - impl.desc_data_.fd = accepted_fd; - impl.desc_data_.read_op.store(nullptr, std::memory_order_relaxed); - impl.desc_data_.write_op.store(nullptr, std::memory_order_relaxed); - impl.desc_data_.connect_op.store(nullptr, std::memory_order_relaxed); - socket_svc->scheduler().register_descriptor(accepted_fd, &impl.desc_data_); + impl.desc_state_.fd = accepted_fd; + { + std::lock_guard lock(impl.desc_state_.mutex); + impl.desc_state_.read_op = nullptr; + impl.desc_state_.write_op = nullptr; + impl.desc_state_.connect_op = nullptr; + } + socket_svc->scheduler().register_descriptor(accepted_fd, &impl.desc_state_); sockaddr_in local_addr{}; socklen_t local_len = sizeof(local_addr); @@ -137,13 +142,6 @@ epoll_acceptor_impl(epoll_acceptor_service& svc) noexcept { } -void -epoll_acceptor_impl:: -update_epoll_events() noexcept -{ - svc_.scheduler().update_descriptor_events(fd_, &desc_data_, 0); -} - void epoll_acceptor_impl:: release() @@ -177,7 +175,10 @@ accept( if (accepted >= 0) { - desc_data_.read_ready.store(false, std::memory_order_relaxed); + { + std::lock_guard lock(desc_state_.mutex); + desc_state_.read_ready = false; + } op.accepted_fd = accepted; op.complete(0, 0); op.impl_ptr = shared_from_this(); @@ -191,33 +192,45 @@ accept( svc_.work_started(); op.impl_ptr = shared_from_this(); - desc_data_.read_op.store(&op, std::memory_order_release); - std::atomic_thread_fence(std::memory_order_seq_cst); + bool perform_now = false; + { + std::lock_guard lock(desc_state_.mutex); + if (desc_state_.read_ready) + { + desc_state_.read_ready = false; + perform_now = true; + } + else + { + desc_state_.read_op = &op; + } + } - if (desc_data_.read_ready.exchange(false, std::memory_order_acquire)) + if (perform_now) { - auto* claimed = desc_data_.read_op.exchange(nullptr, std::memory_order_acq_rel); - if (claimed) + op.perform_io(); + if (op.errn == EAGAIN || op.errn == EWOULDBLOCK) { - claimed->perform_io(); - if (claimed->errn == EAGAIN || claimed->errn == EWOULDBLOCK) - { - claimed->errn = 0; - desc_data_.read_op.store(claimed, std::memory_order_release); - } - else - { - svc_.post(claimed); - svc_.work_finished(); - } - // completion is always posted to scheduler queue, never inline. 
- return std::noop_coroutine(); + op.errn = 0; + std::lock_guard lock(desc_state_.mutex); + desc_state_.read_op = &op; + } + else + { + svc_.post(&op); + svc_.work_finished(); } + return std::noop_coroutine(); } if (op.cancelled.load(std::memory_order_acquire)) { - auto* claimed = desc_data_.read_op.exchange(nullptr, std::memory_order_acq_rel); + epoll_op* claimed = nullptr; + { + std::lock_guard lock(desc_state_.mutex); + if (desc_state_.read_op == &op) + claimed = std::exchange(desc_state_.read_op, nullptr); + } if (claimed) { svc_.post(claimed); @@ -247,9 +260,14 @@ cancel() noexcept } acc_.request_cancel(); - // Use atomic exchange - only one of cancellation or reactor will succeed - auto* claimed = desc_data_.read_op.exchange(nullptr, std::memory_order_acq_rel); - if (claimed == &acc_) + + epoll_op* claimed = nullptr; + { + std::lock_guard lock(desc_state_.mutex); + if (desc_state_.read_op == &acc_) + claimed = std::exchange(desc_state_.read_op, nullptr); + } + if (claimed) { acc_.impl_ptr = self; svc_.post(&acc_); @@ -263,9 +281,13 @@ cancel_single_op(epoll_op& op) noexcept { op.request_cancel(); - // Use atomic exchange - only one of cancellation or reactor will succeed - auto* claimed = desc_data_.read_op.exchange(nullptr, std::memory_order_acq_rel); - if (claimed == &op) + epoll_op* claimed = nullptr; + { + std::lock_guard lock(desc_state_.mutex); + if (desc_state_.read_op == &op) + claimed = std::exchange(desc_state_.read_op, nullptr); + } + if (claimed) { try { op.impl_ptr = shared_from_this(); @@ -281,20 +303,29 @@ close_socket() noexcept { cancel(); + if (desc_state_.is_enqueued_.load(std::memory_order_acquire)) + { + try { + desc_state_.impl_ref_ = shared_from_this(); + } catch (std::bad_weak_ptr const&) {} + } + if (fd_ >= 0) { - if (desc_data_.registered_events != 0) + if (desc_state_.registered_events != 0) svc_.scheduler().deregister_descriptor(fd_); ::close(fd_); fd_ = -1; } - desc_data_.fd = -1; - desc_data_.is_registered = false; - desc_data_.read_op.store(nullptr, std::memory_order_relaxed); - desc_data_.read_ready.store(false, std::memory_order_relaxed); - desc_data_.write_ready.store(false, std::memory_order_relaxed); - desc_data_.registered_events = 0; + desc_state_.fd = -1; + { + std::lock_guard lock(desc_state_.mutex); + desc_state_.read_op = nullptr; + desc_state_.read_ready = false; + desc_state_.write_ready = false; + } + desc_state_.registered_events = 0; // Clear cached endpoint local_endpoint_ = endpoint{}; @@ -383,9 +414,12 @@ open_acceptor( epoll_impl->fd_ = fd; // Register fd with epoll (edge-triggered mode) - epoll_impl->desc_data_.fd = fd; - epoll_impl->desc_data_.read_op.store(nullptr, std::memory_order_relaxed); - scheduler().register_descriptor(fd, &epoll_impl->desc_data_); + epoll_impl->desc_state_.fd = fd; + { + std::lock_guard lock(epoll_impl->desc_state_.mutex); + epoll_impl->desc_state_.read_op = nullptr; + } + scheduler().register_descriptor(fd, &epoll_impl->desc_state_); // Cache the local endpoint (queries OS for ephemeral port if port was 0) sockaddr_in local_addr{}; diff --git a/src/corosio/src/detail/epoll/acceptors.hpp b/src/corosio/src/detail/epoll/acceptors.hpp index 6477ba8d..d9c91e98 100644 --- a/src/corosio/src/detail/epoll/acceptors.hpp +++ b/src/corosio/src/detail/epoll/acceptors.hpp @@ -60,13 +60,12 @@ class epoll_acceptor_impl void cancel() noexcept override; void cancel_single_op(epoll_op& op) noexcept; void close_socket() noexcept; - void update_epoll_events() noexcept; void set_local_endpoint(endpoint ep) noexcept { 
local_endpoint_ = ep; } epoll_acceptor_service& service() noexcept { return svc_; } epoll_accept_op acc_; - descriptor_data desc_data_; + descriptor_state desc_state_; private: epoll_acceptor_service& svc_; diff --git a/src/corosio/src/detail/epoll/op.hpp b/src/corosio/src/detail/epoll/op.hpp index 0f0b65e2..05373f7f 100644 --- a/src/corosio/src/detail/epoll/op.hpp +++ b/src/corosio/src/detail/epoll/op.hpp @@ -33,6 +33,7 @@ #include #include #include +#include #include #include @@ -51,8 +52,8 @@ Persistent Registration ----------------------- - File descriptors are registered with epoll once (via descriptor_data) and - stay registered until closed. The descriptor_data tracks which operations + File descriptors are registered with epoll once (via descriptor_state) and + stay registered until closed. The descriptor_state tracks which operations are pending (read_op, write_op, connect_op). When an event arrives, the reactor dispatches to the appropriate pending operation. @@ -82,42 +83,68 @@ class epoll_socket_impl; class epoll_acceptor_impl; struct epoll_op; +// Forward declaration +class epoll_scheduler; + /** Per-descriptor state for persistent epoll registration. Tracks pending operations for a file descriptor. The fd is registered - once with epoll and stays registered until closed. Events are dispatched - to the appropriate pending operation (EPOLLIN -> read_op, etc.). + once with epoll and stays registered until closed. + + This struct extends scheduler_op to support deferred I/O processing. + When epoll events arrive, the reactor sets ready_events and queues + this descriptor for processing. When popped from the scheduler queue, + operator() performs the actual I/O and queues completion handlers. + + @par Deferred I/O Model + The reactor no longer performs I/O directly. Instead: + 1. Reactor sets ready_events and queues descriptor_state + 2. Scheduler pops descriptor_state and calls operator() + 3. operator() performs I/O under mutex and queues completions - With edge-triggered epoll (EPOLLET), atomic operations are required to - synchronize between operation registration and reactor event delivery. - The read_ready/write_ready flags cache edge events that arrived before - an operation was registered. + This eliminates per-descriptor mutex locking from the reactor hot path. + + @par Thread Safety + The mutex protects operation pointers and ready flags during I/O. + ready_events_ and is_enqueued_ are atomic for lock-free reactor access. */ -struct descriptor_data +struct descriptor_state : scheduler_op { - /// Currently registered events (EPOLLIN, EPOLLOUT, etc.) 
- std::uint32_t registered_events = 0; + std::mutex mutex; - /// Pending read operation (nullptr if none) - std::atomic read_op{nullptr}; + // Protected by mutex + epoll_op* read_op = nullptr; + epoll_op* write_op = nullptr; + epoll_op* connect_op = nullptr; - /// Pending write operation (nullptr if none) - std::atomic write_op{nullptr}; + // Caches edge events that arrived before an op was registered + bool read_ready = false; + bool write_ready = false; - /// Pending connect operation (nullptr if none) - std::atomic connect_op{nullptr}; + // Set during registration only (no mutex needed) + std::uint32_t registered_events = 0; + int fd = -1; - /// Cached read readiness (edge event arrived before op registered) - std::atomic read_ready{false}; + // For deferred I/O - set by reactor, read by scheduler + std::atomic ready_events_{0}; + std::atomic is_enqueued_{false}; + epoll_scheduler const* scheduler_ = nullptr; - /// Cached write readiness (edge event arrived before op registered) - std::atomic write_ready{false}; + // Prevents impl destruction while this descriptor_state is queued. + // Set by close_socket() when is_enqueued_ is true, cleared by operator(). + std::shared_ptr impl_ref_; - /// The file descriptor - int fd = -1; + /// Add ready events atomically. + void add_ready_events(std::uint32_t ev) noexcept + { + ready_events_.fetch_or(ev, std::memory_order_relaxed); + } + + /// Perform deferred I/O and queue completions. + void operator()() override; - /// Whether this descriptor is managed by persistent registration - bool is_registered = false; + /// Destroy without invoking. + void destroy() override {} }; struct epoll_op : scheduler_op @@ -206,17 +233,6 @@ struct epoll_op : scheduler_op cancelled.store(true, std::memory_order_release); } - void start(std::stop_token token) - { - cancelled.store(false, std::memory_order_release); - stop_cb.reset(); - socket_impl_ = nullptr; - acceptor_impl_ = nullptr; - - if (token.stop_possible()) - stop_cb.emplace(token, canceller{this}); - } - void start(std::stop_token token, epoll_socket_impl* impl) { cancelled.store(false, std::memory_order_release); diff --git a/src/corosio/src/detail/epoll/scheduler.cpp b/src/corosio/src/detail/epoll/scheduler.cpp index 537d3b62..dc550478 100644 --- a/src/corosio/src/detail/epoll/scheduler.cpp +++ b/src/corosio/src/detail/epoll/scheduler.cpp @@ -23,6 +23,7 @@ #include #include #include +#include #include #include @@ -44,7 +45,7 @@ Thread Model ------------ - ONE thread runs epoll_wait() at a time (the reactor thread) - - OTHER threads wait on wakeup_event_ (condition variable) for handlers + - OTHER threads wait on cond_ (condition variable) for handlers - When work is posted, exactly one waiting thread wakes via notify_one() - This matches Windows IOCP semantics where N posted items wake N threads @@ -56,20 +57,30 @@ - Run epoll_wait (unlocked), queue I/O completions, loop back 4. If queue empty and reactor running: wait on condvar for work - The reactor_running_ flag ensures only one thread owns epoll_wait(). + The task_running_ flag ensures only one thread owns epoll_wait(). After the reactor queues I/O completions, it loops back to try getting a handler, giving priority to handler execution over more I/O polling. 
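For reference, a minimal standalone sketch of the signaled-bit / waiter-count scheme that the "Signaling State" notes just below describe. This is an illustration only, not corosio code: demo_event and its members are hypothetical names, and the real scheduler keeps the same state in state_ and cond_ guarded by mutex_.

#include <condition_variable>
#include <cstddef>
#include <mutex>

// Hypothetical demo class: models only the state_ encoding, nothing else.
class demo_event
{
public:
    // Precondition: lock holds the associated mutex.
    // Wakes one waiter only if at least one thread is actually waiting.
    bool maybe_unlock_and_signal_one(std::unique_lock<std::mutex>& lock)
    {
        state_ |= 1;                  // bit 0: signaled (persists until cleared)
        if (state_ > 1)               // upper bits: 2 per waiting thread
        {
            lock.unlock();
            cond_.notify_one();
            return true;
        }
        return false;                 // no waiters; lock still held
    }

    // Precondition: lock holds the associated mutex.
    void clear_signal() { state_ &= ~std::size_t(1); }

    // Precondition: lock holds the associated mutex.
    void wait_for_signal(std::unique_lock<std::mutex>& lock)
    {
        while ((state_ & 1) == 0)
        {
            state_ += 2;              // register as a waiter before blocking
            cond_.wait(lock);
            state_ -= 2;              // deregister after waking
        }
    }

private:
    std::size_t state_ = 0;
    std::condition_variable cond_;
};

Because the signaled bit persists, a post() that finds no waiters (state_ <= 1) still records the signal and the caller can fall back to interrupting epoll_wait() via the eventfd, which is exactly the branch wake_one_thread_and_unlock() takes below.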
+ Signaling State (state_) + ------------------------ + The state_ variable encodes two pieces of information: + - Bit 0: signaled flag (1 = signaled, persists until cleared) + - Upper bits: waiter count (each waiter adds 2 before blocking) + + This allows efficient coordination: + - Signalers only call notify when waiters exist (state_ > 1) + - Waiters check if already signaled before blocking (fast-path) + Wake Coordination (wake_one_thread_and_unlock) ---------------------------------------------- When posting work: - - If idle threads exist: notify_one() wakes exactly one worker + - If waiters exist (state_ > 1): signal and notify_one() - Else if reactor running: interrupt via eventfd write - Else: no-op (thread will find work when it checks queue) - This is critical for matching IOCP behavior. With the old model, posting - N handlers would wake all threads (thundering herd). Now each post() - wakes at most one thread, and that thread handles exactly one item. + This avoids waking threads unnecessarily. With cascading wakes, + each handler execution wakes at most one additional thread if + more work exists in the queue. Work Counting ------------- @@ -86,8 +97,6 @@ namespace boost::corosio::detail { -namespace { - struct scheduler_context { epoll_scheduler const* key; @@ -103,6 +112,8 @@ struct scheduler_context } }; +namespace { + corosio::detail::thread_local_ptr context_stack; struct thread_context_guard @@ -133,8 +144,164 @@ find_context(epoll_scheduler const* self) noexcept return nullptr; } +/// Flush private work count to global counter. +void +flush_private_work( + scheduler_context* ctx, + std::atomic& outstanding_work) noexcept +{ + if (ctx && ctx->private_outstanding_work > 0) + { + outstanding_work.fetch_add( + ctx->private_outstanding_work, std::memory_order_relaxed); + ctx->private_outstanding_work = 0; + } +} + +/// Drain private queue to global queue, flushing work count first. +/// +/// @return True if any ops were drained. 
+bool +drain_private_queue( + scheduler_context* ctx, + std::atomic& outstanding_work, + op_queue& completed_ops) noexcept +{ + if (!ctx || ctx->private_queue.empty()) + return false; + + flush_private_work(ctx, outstanding_work); + completed_ops.splice(ctx->private_queue); + return true; +} + } // namespace +void +descriptor_state:: +operator()() +{ + is_enqueued_.store(false, std::memory_order_relaxed); + + // Take ownership of impl ref set by close_socket() to prevent + // the owning impl from being freed while we're executing + auto prevent_impl_destruction = std::move(impl_ref_); + + std::uint32_t ev = ready_events_.exchange(0, std::memory_order_acquire); + if (ev == 0) + { + scheduler_->compensating_work_started(); + return; + } + + op_queue local_ops; + + int err = 0; + if (ev & EPOLLERR) + { + socklen_t len = sizeof(err); + if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0) + err = errno; + if (err == 0) + err = EIO; + } + + epoll_op* rd = nullptr; + epoll_op* wr = nullptr; + epoll_op* cn = nullptr; + { + std::lock_guard lock(mutex); + if (ev & EPOLLIN) + { + rd = std::exchange(read_op, nullptr); + if (!rd) + read_ready = true; + } + if (ev & EPOLLOUT) + { + cn = std::exchange(connect_op, nullptr); + wr = std::exchange(write_op, nullptr); + if (!cn && !wr) + write_ready = true; + } + if (err && !(ev & (EPOLLIN | EPOLLOUT))) + { + rd = std::exchange(read_op, nullptr); + wr = std::exchange(write_op, nullptr); + cn = std::exchange(connect_op, nullptr); + } + } + + // Non-null after I/O means EAGAIN; re-register under lock below + if (rd) + { + if (err) + rd->complete(err, 0); + else + rd->perform_io(); + + if (rd->errn == EAGAIN || rd->errn == EWOULDBLOCK) + { + rd->errn = 0; + } + else + { + local_ops.push(rd); + rd = nullptr; + } + } + + if (cn) + { + if (err) + cn->complete(err, 0); + else + cn->perform_io(); + local_ops.push(cn); + cn = nullptr; + } + + if (wr) + { + if (err) + wr->complete(err, 0); + else + wr->perform_io(); + + if (wr->errn == EAGAIN || wr->errn == EWOULDBLOCK) + { + wr->errn = 0; + } + else + { + local_ops.push(wr); + wr = nullptr; + } + } + + if (rd || wr) + { + std::lock_guard lock(mutex); + if (rd) + read_op = rd; + if (wr) + write_op = wr; + } + + // Execute first handler inline — the scheduler's work_cleanup + // accounts for this as the "consumed" work item + scheduler_op* first = local_ops.pop(); + if (first) + { + scheduler_->post_deferred_completions(local_ops); + (*first)(); + } + else + { + scheduler_->compensating_work_started(); + } +} + epoll_scheduler:: epoll_scheduler( capy::execution_context& ctx, @@ -145,9 +312,9 @@ epoll_scheduler( , outstanding_work_(0) , stopped_(false) , shutdown_(false) - , reactor_running_(false) - , reactor_interrupted_(false) - , idle_thread_count_(0) + , task_running_(false) + , task_interrupted_(false) + , state_(0) { epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC); if (epoll_fd_ < 0) @@ -237,14 +404,14 @@ shutdown() h->destroy(); lock.lock(); } + + signal_all(lock); } outstanding_work_.store(0, std::memory_order_release); if (event_fd_ >= 0) interrupt_reactor(); - - wakeup_event_.notify_all(); } void @@ -280,10 +447,10 @@ post(capy::coro h) const auto ph = std::make_unique(h); - // Fast path: same thread posts to private queue without locking + // Fast path: same thread posts to private queue + // Only count locally; work_cleanup batches to global counter if (auto* ctx = find_context(this)) { - outstanding_work_.fetch_add(1, std::memory_order_relaxed); ++ctx->private_outstanding_work; 
ctx->private_queue.push(ph.release()); return; @@ -301,10 +468,10 @@ void epoll_scheduler:: post(scheduler_op* h) const { - // Fast path: same thread posts to private queue without locking + // Fast path: same thread posts to private queue + // Only count locally; work_cleanup batches to global counter if (auto* ctx = find_context(this)) { - outstanding_work_.fetch_add(1, std::memory_order_relaxed); ++ctx->private_outstanding_work; ctx->private_queue.push(h); return; @@ -347,15 +514,11 @@ void epoll_scheduler:: stop() { - bool expected = false; - if (stopped_.compare_exchange_strong(expected, true, - std::memory_order_release, std::memory_order_relaxed)) + std::unique_lock lock(mutex_); + if (!stopped_) { - // Wake all threads so they notice stopped_ and exit - { - std::lock_guard lock(mutex_); - wakeup_event_.notify_all(); - } + stopped_ = true; + signal_all(lock); interrupt_reactor(); } } @@ -364,23 +527,22 @@ bool epoll_scheduler:: stopped() const noexcept { - return stopped_.load(std::memory_order_acquire); + std::unique_lock lock(mutex_); + return stopped_; } void epoll_scheduler:: restart() { - stopped_.store(false, std::memory_order_release); + std::unique_lock lock(mutex_); + stopped_ = false; } std::size_t epoll_scheduler:: run() { - if (stopped_.load(std::memory_order_acquire)) - return 0; - if (outstanding_work_.load(std::memory_order_acquire) == 0) { stop(); @@ -388,11 +550,18 @@ run() } thread_context_guard ctx(this); + std::unique_lock lock(mutex_); std::size_t n = 0; - while (do_one(-1)) + for (;;) + { + if (!do_one(lock, -1, &ctx.frame_)) + break; if (n != (std::numeric_limits::max)()) ++n; + if (!lock.owns_lock()) + lock.lock(); + } return n; } @@ -400,9 +569,6 @@ std::size_t epoll_scheduler:: run_one() { - if (stopped_.load(std::memory_order_acquire)) - return 0; - if (outstanding_work_.load(std::memory_order_acquire) == 0) { stop(); @@ -410,16 +576,14 @@ run_one() } thread_context_guard ctx(this); - return do_one(-1); + std::unique_lock lock(mutex_); + return do_one(lock, -1, &ctx.frame_); } std::size_t epoll_scheduler:: wait_one(long usec) { - if (stopped_.load(std::memory_order_acquire)) - return 0; - if (outstanding_work_.load(std::memory_order_acquire) == 0) { stop(); @@ -427,16 +591,14 @@ wait_one(long usec) } thread_context_guard ctx(this); - return do_one(usec); + std::unique_lock lock(mutex_); + return do_one(lock, usec, &ctx.frame_); } std::size_t epoll_scheduler:: poll() { - if (stopped_.load(std::memory_order_acquire)) - return 0; - if (outstanding_work_.load(std::memory_order_acquire) == 0) { stop(); @@ -444,11 +606,18 @@ poll() } thread_context_guard ctx(this); + std::unique_lock lock(mutex_); std::size_t n = 0; - while (do_one(0)) + for (;;) + { + if (!do_one(lock, 0, &ctx.frame_)) + break; if (n != (std::numeric_limits::max)()) ++n; + if (!lock.owns_lock()) + lock.lock(); + } return n; } @@ -456,9 +625,6 @@ std::size_t epoll_scheduler:: poll_one() { - if (stopped_.load(std::memory_order_acquire)) - return 0; - if (outstanding_work_.load(std::memory_order_acquire) == 0) { stop(); @@ -466,12 +632,13 @@ poll_one() } thread_context_guard ctx(this); - return do_one(0); + std::unique_lock lock(mutex_); + return do_one(lock, 0, &ctx.frame_); } void epoll_scheduler:: -register_descriptor(int fd, descriptor_data* desc) const +register_descriptor(int fd, descriptor_state* desc) const { epoll_event ev{}; ev.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP; @@ -481,18 +648,12 @@ register_descriptor(int fd, descriptor_data* desc) const 
detail::throw_system_error(make_err(errno), "epoll_ctl (register)"); desc->registered_events = ev.events; - desc->is_registered = true; desc->fd = fd; - desc->read_ready.store(false, std::memory_order_relaxed); - desc->write_ready.store(false, std::memory_order_relaxed); -} + desc->scheduler_ = this; -void -epoll_scheduler:: -update_descriptor_events(int, descriptor_data*, std::uint32_t) const -{ - // Provides memory fence for operation pointer visibility across threads - std::atomic_thread_fence(std::memory_order_seq_cst); + std::lock_guard lock(desc->mutex); + desc->read_ready = false; + desc->write_ready = false; } void @@ -516,29 +677,58 @@ work_finished() const noexcept if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1) { // Last work item completed - wake all threads so they can exit. - // notify_all() wakes threads waiting on the condvar. + // signal_all() wakes threads waiting on the condvar. // interrupt_reactor() wakes the reactor thread blocked in epoll_wait(). // Both are needed because they target different blocking mechanisms. std::unique_lock lock(mutex_); - wakeup_event_.notify_all(); - if (reactor_running_ && !reactor_interrupted_) + signal_all(lock); + if (task_running_ && !task_interrupted_) { - reactor_interrupted_ = true; + task_interrupted_ = true; lock.unlock(); interrupt_reactor(); } } } +void +epoll_scheduler:: +compensating_work_started() const noexcept +{ + auto* ctx = find_context(this); + if (ctx) + ++ctx->private_outstanding_work; +} + void epoll_scheduler:: drain_thread_queue(op_queue& queue, long count) const { - std::lock_guard lock(mutex_); // Note: outstanding_work_ was already incremented when posting + std::unique_lock lock(mutex_); completed_ops_.splice(queue); if (count > 0) - wakeup_event_.notify_all(); + maybe_unlock_and_signal_one(lock); +} + +void +epoll_scheduler:: +post_deferred_completions(op_queue& ops) const +{ + if (ops.empty()) + return; + + // Fast path: if on scheduler thread, use private queue + if (auto* ctx = find_context(this)) + { + ctx->private_queue.splice(ops); + return; + } + + // Slow path: add to global queue and wake a thread + std::unique_lock lock(mutex_); + completed_ops_.splice(ops); + wake_one_thread_and_unlock(lock); } void @@ -557,16 +747,80 @@ interrupt_reactor() const void epoll_scheduler:: -wake_one_thread_and_unlock(std::unique_lock& lock) const +signal_all(std::unique_lock&) const +{ + state_ |= 1; + cond_.notify_all(); +} + +bool +epoll_scheduler:: +maybe_unlock_and_signal_one(std::unique_lock& lock) const { - if (idle_thread_count_ > 0) + state_ |= 1; + if (state_ > 1) { - wakeup_event_.notify_one(); lock.unlock(); + cond_.notify_one(); + return true; + } + return false; +} + +void +epoll_scheduler:: +unlock_and_signal_one(std::unique_lock& lock) const +{ + state_ |= 1; + bool have_waiters = state_ > 1; + lock.unlock(); + if (have_waiters) + cond_.notify_one(); +} + +void +epoll_scheduler:: +clear_signal() const +{ + state_ &= ~std::size_t(1); +} + +void +epoll_scheduler:: +wait_for_signal(std::unique_lock& lock) const +{ + while ((state_ & 1) == 0) + { + state_ += 2; + cond_.wait(lock); + state_ -= 2; + } +} + +void +epoll_scheduler:: +wait_for_signal_for( + std::unique_lock& lock, + long timeout_us) const +{ + if ((state_ & 1) == 0) + { + state_ += 2; + cond_.wait_for(lock, std::chrono::microseconds(timeout_us)); + state_ -= 2; } - else if (reactor_running_ && !reactor_interrupted_) +} + +void +epoll_scheduler:: +wake_one_thread_and_unlock(std::unique_lock& lock) const +{ + if 
(maybe_unlock_and_signal_one(lock)) + return; + + if (task_running_ && !task_interrupted_) { - reactor_interrupted_ = true; + task_interrupted_ = true; lock.unlock(); interrupt_reactor(); } @@ -576,10 +830,68 @@ wake_one_thread_and_unlock(std::unique_lock& lock) const } } -struct work_guard +/** RAII guard for handler execution work accounting. + + Handler consumes 1 work item, may produce N new items via fast-path posts. + Net change = N - 1: + - If N > 1: add (N-1) to global (more work produced than consumed) + - If N == 1: net zero, do nothing + - If N < 1: call work_finished() (work consumed, may trigger stop) + + Also drains private queue to global for other threads to process. +*/ +struct work_cleanup { - epoll_scheduler const* self; - ~work_guard() { self->work_finished(); } + epoll_scheduler const* scheduler; + std::unique_lock* lock; + scheduler_context* ctx; + + ~work_cleanup() + { + if (ctx) + { + long produced = ctx->private_outstanding_work; + if (produced > 1) + scheduler->outstanding_work_.fetch_add(produced - 1, std::memory_order_relaxed); + else if (produced < 1) + scheduler->work_finished(); + // produced == 1: net zero, handler consumed what it produced + ctx->private_outstanding_work = 0; + + if (!ctx->private_queue.empty()) + { + lock->lock(); + scheduler->completed_ops_.splice(ctx->private_queue); + } + } + else + { + // No thread context - slow-path op was already counted globally + scheduler->work_finished(); + } + } +}; + +/** RAII guard for reactor work accounting. + + Reactor only produces work via timer/signal callbacks posting handlers. + Unlike handler execution which consumes 1, the reactor consumes nothing. + All produced work must be flushed to global counter. +*/ +struct task_cleanup +{ + epoll_scheduler const* scheduler; + scheduler_context* ctx; + + ~task_cleanup() + { + if (ctx && ctx->private_outstanding_work > 0) + { + scheduler->outstanding_work_.fetch_add( + ctx->private_outstanding_work, std::memory_order_relaxed); + ctx->private_outstanding_work = 0; + } + } }; void @@ -621,13 +933,18 @@ update_timerfd() const void epoll_scheduler:: -run_reactor(std::unique_lock& lock) +run_task(std::unique_lock& lock, scheduler_context* ctx) { - int timeout_ms = reactor_interrupted_ ? 0 : -1; + int timeout_ms = task_interrupted_ ? 
0 : -1; - lock.unlock(); + if (lock.owns_lock()) + lock.unlock(); + + // Flush private work count when reactor completes + task_cleanup on_exit{this, ctx}; + (void)on_exit; - // --- Event loop runs WITHOUT the mutex (like Asio) --- + // Event loop runs without mutex held epoll_event events[128]; int nfds = ::epoll_wait(epoll_fd_, events, 128, timeout_ms); @@ -659,136 +976,22 @@ run_reactor(std::unique_lock& lock) continue; } - auto* desc = static_cast(events[i].data.ptr); - std::uint32_t ev = events[i].events; - int err = 0; - - if (ev & (EPOLLERR | EPOLLHUP)) - { - socklen_t len = sizeof(err); - if (::getsockopt(desc->fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0) - err = errno; - if (err == 0) - err = EIO; - } - - if (ev & EPOLLIN) - { - auto* op = desc->read_op.exchange(nullptr, std::memory_order_acq_rel); - if (op) - { - if (err) - { - op->complete(err, 0); - local_ops.push(op); - ++completions_queued; - } - else - { - op->perform_io(); - if (op->errn == EAGAIN || op->errn == EWOULDBLOCK) - { - op->errn = 0; - desc->read_op.store(op, std::memory_order_release); - } - else - { - local_ops.push(op); - ++completions_queued; - } - } - } - else - { - desc->read_ready.store(true, std::memory_order_release); - } - } + // Deferred I/O: just set ready events and enqueue descriptor + // No per-descriptor mutex locking in reactor hot path! + auto* desc = static_cast(events[i].data.ptr); + desc->add_ready_events(events[i].events); - if (ev & EPOLLOUT) + // Only enqueue if not already enqueued + bool expected = false; + if (desc->is_enqueued_.compare_exchange_strong(expected, true, + std::memory_order_release, std::memory_order_relaxed)) { - auto* conn_op = desc->connect_op.exchange(nullptr, std::memory_order_acq_rel); - if (conn_op) - { - if (err) - { - conn_op->complete(err, 0); - local_ops.push(conn_op); - ++completions_queued; - } - else - { - conn_op->perform_io(); - if (conn_op->errn == EAGAIN || conn_op->errn == EWOULDBLOCK) - { - conn_op->errn = 0; - desc->connect_op.store(conn_op, std::memory_order_release); - } - else - { - local_ops.push(conn_op); - ++completions_queued; - } - } - } - - auto* write_op = desc->write_op.exchange(nullptr, std::memory_order_acq_rel); - if (write_op) - { - if (err) - { - write_op->complete(err, 0); - local_ops.push(write_op); - ++completions_queued; - } - else - { - write_op->perform_io(); - if (write_op->errn == EAGAIN || write_op->errn == EWOULDBLOCK) - { - write_op->errn = 0; - desc->write_op.store(write_op, std::memory_order_release); - } - else - { - local_ops.push(write_op); - ++completions_queued; - } - } - } - - if (!conn_op && !write_op) - desc->write_ready.store(true, std::memory_order_release); - } - - if (err && !(ev & (EPOLLIN | EPOLLOUT))) - { - auto* read_op = desc->read_op.exchange(nullptr, std::memory_order_acq_rel); - if (read_op) - { - read_op->complete(err, 0); - local_ops.push(read_op); - ++completions_queued; - } - - auto* write_op = desc->write_op.exchange(nullptr, std::memory_order_acq_rel); - if (write_op) - { - write_op->complete(err, 0); - local_ops.push(write_op); - ++completions_queued; - } - - auto* conn_op = desc->connect_op.exchange(nullptr, std::memory_order_acq_rel); - if (conn_op) - { - conn_op->complete(err, 0); - local_ops.push(conn_op); - ++completions_queued; - } + local_ops.push(desc); + ++completions_queued; } } - // Process timers only when timerfd fires (like Asio's check_timers pattern) + // Process timers only when timerfd fires if (check_timers) { timer_svc_->process_expired(); @@ -801,104 +1004,90 @@ 
run_reactor(std::unique_lock& lock) if (!local_ops.empty()) completed_ops_.splice(local_ops); - // Drain private queue (outstanding_work_ was already incremented when posting) - if (auto* ctx = find_context(this)) + // Drain private queue to global (work count handled by task_cleanup) + if (ctx && !ctx->private_queue.empty()) { - if (!ctx->private_queue.empty()) - { - completions_queued += ctx->private_outstanding_work; - ctx->private_outstanding_work = 0; - completed_ops_.splice(ctx->private_queue); - } + completions_queued += ctx->private_outstanding_work; + completed_ops_.splice(ctx->private_queue); } - // Only wake threads that are actually idle, and only as many as we have work - if (completions_queued > 0 && idle_thread_count_ > 0) + // Signal and wake one waiter if work is queued + if (completions_queued > 0) { - int threads_to_wake = (std::min)(completions_queued, idle_thread_count_); - for (int i = 0; i < threads_to_wake; ++i) - wakeup_event_.notify_one(); + if (maybe_unlock_and_signal_one(lock)) + lock.lock(); } } std::size_t epoll_scheduler:: -do_one(long timeout_us) +do_one(std::unique_lock& lock, long timeout_us, scheduler_context* ctx) { - std::unique_lock lock(mutex_); - for (;;) { - if (stopped_.load(std::memory_order_acquire)) + if (stopped_) return 0; scheduler_op* op = completed_ops_.pop(); + // Handle reactor sentinel - time to poll for I/O if (op == &task_op_) { - // Check both global queue and private queue for pending handlers - auto* ctx = find_context(this); bool more_handlers = !completed_ops_.empty() || (ctx && !ctx->private_queue.empty()); - if (!more_handlers) + // Nothing to run the reactor for: no pending work to wait on, + // or caller requested a non-blocking poll + if (!more_handlers && + (outstanding_work_.load(std::memory_order_acquire) == 0 || + timeout_us == 0)) { - if (outstanding_work_.load(std::memory_order_acquire) == 0) - { - completed_ops_.push(&task_op_); - return 0; - } - if (timeout_us == 0) - { - completed_ops_.push(&task_op_); - return 0; - } + completed_ops_.push(&task_op_); + return 0; } - reactor_interrupted_ = more_handlers || timeout_us == 0; - reactor_running_ = true; + task_interrupted_ = more_handlers || timeout_us == 0; + task_running_ = true; - if (more_handlers && idle_thread_count_ > 0) - wakeup_event_.notify_one(); + if (more_handlers) + unlock_and_signal_one(lock); - run_reactor(lock); + run_task(lock, ctx); - reactor_running_ = false; + task_running_ = false; completed_ops_.push(&task_op_); continue; } + // Handle operation if (op != nullptr) { - lock.unlock(); - work_guard g{this}; + if (!completed_ops_.empty()) + unlock_and_signal_one(lock); + else + lock.unlock(); + + work_cleanup on_exit{this, &lock, ctx}; + (void)on_exit; + (*op)(); return 1; } - if (outstanding_work_.load(std::memory_order_acquire) == 0) - return 0; + // No work from global queue - try private queue before blocking + if (drain_private_queue(ctx, outstanding_work_, completed_ops_)) + continue; - if (timeout_us == 0) + // No pending work to wait on, or caller requested non-blocking poll + if (outstanding_work_.load(std::memory_order_acquire) == 0 || + timeout_us == 0) return 0; - // Drain private queue before blocking (outstanding_work_ was already incremented) - if (auto* ctx = find_context(this)) - { - if (!ctx->private_queue.empty()) - { - ctx->private_outstanding_work = 0; - completed_ops_.splice(ctx->private_queue); - continue; - } - } - - ++idle_thread_count_; + clear_signal(); if (timeout_us < 0) - wakeup_event_.wait(lock); + wait_for_signal(lock); 
else - wakeup_event_.wait_for(lock, std::chrono::microseconds(timeout_us)); - --idle_thread_count_; + wait_for_signal_for(lock, timeout_us); } } diff --git a/src/corosio/src/detail/epoll/scheduler.hpp b/src/corosio/src/detail/epoll/scheduler.hpp index 4d1f2ce6..c035ecd7 100644 --- a/src/corosio/src/detail/epoll/scheduler.hpp +++ b/src/corosio/src/detail/epoll/scheduler.hpp @@ -30,7 +30,8 @@ namespace boost::corosio::detail { struct epoll_op; -struct descriptor_data; +struct descriptor_state; +struct scheduler_context; /** Linux scheduler using epoll for I/O multiplexing. @@ -103,21 +104,13 @@ class epoll_scheduler /** Register a descriptor for persistent monitoring. The fd is registered once and stays registered until explicitly - deregistered. Events are dispatched via descriptor_data which + deregistered. Events are dispatched via descriptor_state which tracks pending read/write/connect operations. @param fd The file descriptor to register. @param desc Pointer to descriptor data (stored in epoll_event.data.ptr). */ - void register_descriptor(int fd, descriptor_data* desc) const; - - /** Update events for a persistently registered descriptor. - - @param fd The file descriptor. - @param desc Pointer to descriptor data. - @param events The new events to monitor. - */ - void update_descriptor_events(int fd, descriptor_data* desc, std::uint32_t events) const; + void register_descriptor(int fd, descriptor_state* desc) const; /** Deregister a persistently registered descriptor. @@ -131,6 +124,13 @@ class epoll_scheduler /** For use by I/O operations to track completed work. */ void work_finished() const noexcept override; + /** Offset a forthcoming work_finished from work_cleanup. + + Called by descriptor_state when all I/O returned EAGAIN and no + handler will be executed. Must be called from a scheduler thread. + */ + void compensating_work_started() const noexcept; + /** Drain work from thread context's private queue to global queue. Called by thread_context_guard destructor when a thread exits run(). @@ -141,36 +141,127 @@ class epoll_scheduler */ void drain_thread_queue(op_queue& queue, long count) const; + /** Post completed operations for deferred invocation. + + If called from a thread running this scheduler, operations go to + the thread's private queue (fast path). Otherwise, operations are + added to the global queue under mutex and a waiter is signaled. + + @par Preconditions + work_started() must have been called for each operation. + + @param ops Queue of operations to post. + */ + void post_deferred_completions(op_queue& ops) const; + private: - std::size_t do_one(long timeout_us); - void run_reactor(std::unique_lock& lock); + friend struct work_cleanup; + friend struct task_cleanup; + + std::size_t do_one(std::unique_lock& lock, long timeout_us, scheduler_context* ctx); + void run_task(std::unique_lock& lock, scheduler_context* ctx); void wake_one_thread_and_unlock(std::unique_lock& lock) const; void interrupt_reactor() const; void update_timerfd() const; + /** Set the signaled state and wake all waiting threads. + + @par Preconditions + Mutex must be held. + + @param lock The held mutex lock. + */ + void signal_all(std::unique_lock& lock) const; + + /** Set the signaled state and wake one waiter if any exist. + + Only unlocks and signals if at least one thread is waiting. + Use this when the caller needs to perform a fallback action + (such as interrupting the reactor) when no waiters exist. + + @par Preconditions + Mutex must be held. + + @param lock The held mutex lock. 
+ + @return `true` if unlocked and signaled, `false` if lock still held. + */ + bool maybe_unlock_and_signal_one(std::unique_lock& lock) const; + + /** Set the signaled state, unlock, and wake one waiter if any exist. + + Always unlocks the mutex. Use this when the caller will release + the lock regardless of whether a waiter exists. + + @par Preconditions + Mutex must be held. + + @param lock The held mutex lock. + */ + void unlock_and_signal_one(std::unique_lock& lock) const; + + /** Clear the signaled state before waiting. + + @par Preconditions + Mutex must be held. + */ + void clear_signal() const; + + /** Block until the signaled state is set. + + Returns immediately if already signaled (fast-path). Otherwise + increments the waiter count, waits on the condition variable, + and decrements the waiter count upon waking. + + @par Preconditions + Mutex must be held. + + @param lock The held mutex lock. + */ + void wait_for_signal(std::unique_lock& lock) const; + + /** Block until signaled or timeout expires. + + @par Preconditions + Mutex must be held. + + @param lock The held mutex lock. + @param timeout_us Maximum time to wait in microseconds. + */ + void wait_for_signal_for( + std::unique_lock& lock, + long timeout_us) const; + int epoll_fd_; int event_fd_; // for interrupting reactor int timer_fd_; // timerfd for kernel-managed timer expiry mutable std::mutex mutex_; - mutable std::condition_variable wakeup_event_; + mutable std::condition_variable cond_; mutable op_queue completed_ops_; mutable std::atomic outstanding_work_; - std::atomic stopped_; + bool stopped_; bool shutdown_; timer_service* timer_svc_ = nullptr; - // Single reactor thread coordination - mutable bool reactor_running_ = false; - mutable bool reactor_interrupted_ = false; - mutable int idle_thread_count_ = 0; + // True while a thread is blocked in epoll_wait. Used by + // wake_one_thread_and_unlock and work_finished to know when + // an eventfd interrupt is needed instead of a condvar signal. + mutable bool task_running_ = false; + + // True when the reactor has been told to do a non-blocking poll + // (more handlers queued or poll mode). Prevents redundant eventfd + // writes and controls the epoll_wait timeout. + mutable bool task_interrupted_ = false; + + // Signaling state: bit 0 = signaled, upper bits = waiter count (incremented by 2) + mutable std::size_t state_ = 0; // Edge-triggered eventfd state mutable std::atomic eventfd_armed_{false}; - // Sentinel operation for interleaving reactor runs with handler execution. // Ensures the reactor runs periodically even when handlers are continuously - // posted, preventing timer starvation. + // posted, preventing starvation of I/O events, timers, and signals. 
struct task_op final : scheduler_op { void operator()() override {} diff --git a/src/corosio/src/detail/epoll/sockets.cpp b/src/corosio/src/detail/epoll/sockets.cpp index 8b8b16b9..0ad89653 100644 --- a/src/corosio/src/detail/epoll/sockets.cpp +++ b/src/corosio/src/detail/epoll/sockets.cpp @@ -19,6 +19,8 @@ #include #include +#include + #include #include #include @@ -115,14 +117,6 @@ epoll_socket_impl(epoll_socket_service& svc) noexcept epoll_socket_impl:: ~epoll_socket_impl() = default; -void -epoll_socket_impl:: -update_epoll_events() noexcept -{ - // With EPOLLET, update_descriptor_events just provides a memory fence - svc_.scheduler().update_descriptor_events(fd_, &desc_data_, 0); -} - void epoll_socket_impl:: release() @@ -174,32 +168,45 @@ connect( svc_.work_started(); op.impl_ptr = shared_from_this(); - desc_data_.connect_op.store(&op, std::memory_order_seq_cst); + bool perform_now = false; + { + std::lock_guard lock(desc_state_.mutex); + if (desc_state_.write_ready) + { + desc_state_.write_ready = false; + perform_now = true; + } + else + { + desc_state_.connect_op = &op; + } + } - if (desc_data_.write_ready.exchange(false, std::memory_order_seq_cst)) + if (perform_now) { - auto* claimed = desc_data_.connect_op.exchange(nullptr, std::memory_order_acq_rel); - if (claimed) + op.perform_io(); + if (op.errn == EAGAIN || op.errn == EWOULDBLOCK) { - claimed->perform_io(); - if (claimed->errn == EAGAIN || claimed->errn == EWOULDBLOCK) - { - claimed->errn = 0; - desc_data_.connect_op.store(claimed, std::memory_order_release); - } - else - { - svc_.post(claimed); - svc_.work_finished(); - } - // completion is always posted to scheduler queue, never inline. - return std::noop_coroutine(); + op.errn = 0; + std::lock_guard lock(desc_state_.mutex); + desc_state_.connect_op = &op; } + else + { + svc_.post(&op); + svc_.work_finished(); + } + return std::noop_coroutine(); } if (op.cancelled.load(std::memory_order_acquire)) { - auto* claimed = desc_data_.connect_op.exchange(nullptr, std::memory_order_acq_rel); + epoll_op* claimed = nullptr; + { + std::lock_guard lock(desc_state_.mutex); + if (desc_state_.connect_op == &op) + claimed = std::exchange(desc_state_.connect_op, nullptr); + } if (claimed) { svc_.post(claimed); @@ -227,7 +234,10 @@ do_read_io() if (n > 0) { - desc_data_.read_ready.store(false, std::memory_order_relaxed); + { + std::lock_guard lock(desc_state_.mutex); + desc_state_.read_ready = false; + } op.complete(0, static_cast(n)); svc_.post(&op); return; @@ -235,7 +245,10 @@ do_read_io() if (n == 0) { - desc_data_.read_ready.store(false, std::memory_order_relaxed); + { + std::lock_guard lock(desc_state_.mutex); + desc_state_.read_ready = false; + } op.complete(0, 0); svc_.post(&op); return; @@ -245,31 +258,45 @@ do_read_io() { svc_.work_started(); - desc_data_.read_op.store(&op, std::memory_order_seq_cst); + bool perform_now = false; + { + std::lock_guard lock(desc_state_.mutex); + if (desc_state_.read_ready) + { + desc_state_.read_ready = false; + perform_now = true; + } + else + { + desc_state_.read_op = &op; + } + } - if (desc_data_.read_ready.exchange(false, std::memory_order_seq_cst)) + if (perform_now) { - auto* claimed = desc_data_.read_op.exchange(nullptr, std::memory_order_acq_rel); - if (claimed) + op.perform_io(); + if (op.errn == EAGAIN || op.errn == EWOULDBLOCK) + { + op.errn = 0; + std::lock_guard lock(desc_state_.mutex); + desc_state_.read_op = &op; + } + else { - claimed->perform_io(); - if (claimed->errn == EAGAIN || claimed->errn == EWOULDBLOCK) - { - claimed->errn = 
0; - desc_data_.read_op.store(claimed, std::memory_order_release); - } - else - { - svc_.post(claimed); - svc_.work_finished(); - } - return; + svc_.post(&op); + svc_.work_finished(); } + return; } if (op.cancelled.load(std::memory_order_acquire)) { - auto* claimed = desc_data_.read_op.exchange(nullptr, std::memory_order_acq_rel); + epoll_op* claimed = nullptr; + { + std::lock_guard lock(desc_state_.mutex); + if (desc_state_.read_op == &op) + claimed = std::exchange(desc_state_.read_op, nullptr); + } if (claimed) { svc_.post(claimed); @@ -297,7 +324,10 @@ do_write_io() if (n > 0) { - desc_data_.write_ready.store(false, std::memory_order_relaxed); + { + std::lock_guard lock(desc_state_.mutex); + desc_state_.write_ready = false; + } op.complete(0, static_cast(n)); svc_.post(&op); return; @@ -307,31 +337,45 @@ do_write_io() { svc_.work_started(); - desc_data_.write_op.store(&op, std::memory_order_seq_cst); + bool perform_now = false; + { + std::lock_guard lock(desc_state_.mutex); + if (desc_state_.write_ready) + { + desc_state_.write_ready = false; + perform_now = true; + } + else + { + desc_state_.write_op = &op; + } + } - if (desc_data_.write_ready.exchange(false, std::memory_order_seq_cst)) + if (perform_now) { - auto* claimed = desc_data_.write_op.exchange(nullptr, std::memory_order_acq_rel); - if (claimed) + op.perform_io(); + if (op.errn == EAGAIN || op.errn == EWOULDBLOCK) + { + op.errn = 0; + std::lock_guard lock(desc_state_.mutex); + desc_state_.write_op = &op; + } + else { - claimed->perform_io(); - if (claimed->errn == EAGAIN || claimed->errn == EWOULDBLOCK) - { - claimed->errn = 0; - desc_data_.write_op.store(claimed, std::memory_order_release); - } - else - { - svc_.post(claimed); - svc_.work_finished(); - } - return; + svc_.post(&op); + svc_.work_finished(); } + return; } if (op.cancelled.load(std::memory_order_acquire)) { - auto* claimed = desc_data_.write_op.exchange(nullptr, std::memory_order_acq_rel); + epoll_op* claimed = nullptr; + { + std::lock_guard lock(desc_state_.mutex); + if (desc_state_.write_op == &op) + claimed = std::exchange(desc_state_.write_op, nullptr); + } if (claimed) { svc_.post(claimed); @@ -584,22 +628,41 @@ cancel() noexcept return; } - // Use atomic exchange to claim operations - only one of cancellation - // or reactor will succeed - auto cancel_atomic_op = [this, &self](epoll_op& op, std::atomic& desc_op_ptr) { - op.request_cancel(); - auto* claimed = desc_op_ptr.exchange(nullptr, std::memory_order_acq_rel); - if (claimed == &op) - { - op.impl_ptr = self; - svc_.post(&op); - svc_.work_finished(); - } - }; + conn_.request_cancel(); + rd_.request_cancel(); + wr_.request_cancel(); - cancel_atomic_op(conn_, desc_data_.connect_op); - cancel_atomic_op(rd_, desc_data_.read_op); - cancel_atomic_op(wr_, desc_data_.write_op); + epoll_op* conn_claimed = nullptr; + epoll_op* rd_claimed = nullptr; + epoll_op* wr_claimed = nullptr; + { + std::lock_guard lock(desc_state_.mutex); + if (desc_state_.connect_op == &conn_) + conn_claimed = std::exchange(desc_state_.connect_op, nullptr); + if (desc_state_.read_op == &rd_) + rd_claimed = std::exchange(desc_state_.read_op, nullptr); + if (desc_state_.write_op == &wr_) + wr_claimed = std::exchange(desc_state_.write_op, nullptr); + } + + if (conn_claimed) + { + conn_.impl_ptr = self; + svc_.post(&conn_); + svc_.work_finished(); + } + if (rd_claimed) + { + rd_.impl_ptr = self; + svc_.post(&rd_); + svc_.work_finished(); + } + if (wr_claimed) + { + wr_.impl_ptr = self; + svc_.post(&wr_); + svc_.work_finished(); + } } void @@ 
-608,16 +671,20 @@ cancel_single_op(epoll_op& op) noexcept { op.request_cancel(); - std::atomic* desc_op_ptr = nullptr; - if (&op == &conn_) desc_op_ptr = &desc_data_.connect_op; - else if (&op == &rd_) desc_op_ptr = &desc_data_.read_op; - else if (&op == &wr_) desc_op_ptr = &desc_data_.write_op; + epoll_op** desc_op_ptr = nullptr; + if (&op == &conn_) desc_op_ptr = &desc_state_.connect_op; + else if (&op == &rd_) desc_op_ptr = &desc_state_.read_op; + else if (&op == &wr_) desc_op_ptr = &desc_state_.write_op; if (desc_op_ptr) { - // Use atomic exchange - only one of cancellation or reactor will succeed - auto* claimed = desc_op_ptr->exchange(nullptr, std::memory_order_acq_rel); - if (claimed == &op) + epoll_op* claimed = nullptr; + { + std::lock_guard lock(desc_state_.mutex); + if (*desc_op_ptr == &op) + claimed = std::exchange(*desc_op_ptr, nullptr); + } + if (claimed) { try { op.impl_ptr = shared_from_this(); @@ -634,22 +701,34 @@ close_socket() noexcept { cancel(); + // Keep impl alive if descriptor_state is queued in the scheduler. + // Without this, destroy_impl() drops the last shared_ptr while + // the queued descriptor_state node would become dangling. + if (desc_state_.is_enqueued_.load(std::memory_order_acquire)) + { + try { + desc_state_.impl_ref_ = shared_from_this(); + } catch (std::bad_weak_ptr const&) {} + } + if (fd_ >= 0) { - if (desc_data_.registered_events != 0) + if (desc_state_.registered_events != 0) svc_.scheduler().deregister_descriptor(fd_); ::close(fd_); fd_ = -1; } - desc_data_.fd = -1; - desc_data_.is_registered = false; - desc_data_.read_op.store(nullptr, std::memory_order_relaxed); - desc_data_.write_op.store(nullptr, std::memory_order_relaxed); - desc_data_.connect_op.store(nullptr, std::memory_order_relaxed); - desc_data_.read_ready.store(false, std::memory_order_relaxed); - desc_data_.write_ready.store(false, std::memory_order_relaxed); - desc_data_.registered_events = 0; + desc_state_.fd = -1; + { + std::lock_guard lock(desc_state_.mutex); + desc_state_.read_op = nullptr; + desc_state_.write_op = nullptr; + desc_state_.connect_op = nullptr; + desc_state_.read_ready = false; + desc_state_.write_ready = false; + } + desc_state_.registered_events = 0; local_endpoint_ = endpoint{}; remote_endpoint_ = endpoint{}; @@ -718,11 +797,14 @@ open_socket(tcp_socket::socket_impl& impl) epoll_impl->fd_ = fd; // Register fd with epoll (edge-triggered mode) - epoll_impl->desc_data_.fd = fd; - epoll_impl->desc_data_.read_op.store(nullptr, std::memory_order_relaxed); - epoll_impl->desc_data_.write_op.store(nullptr, std::memory_order_relaxed); - epoll_impl->desc_data_.connect_op.store(nullptr, std::memory_order_relaxed); - scheduler().register_descriptor(fd, &epoll_impl->desc_data_); + epoll_impl->desc_state_.fd = fd; + { + std::lock_guard lock(epoll_impl->desc_state_.mutex); + epoll_impl->desc_state_.read_op = nullptr; + epoll_impl->desc_state_.write_op = nullptr; + epoll_impl->desc_state_.connect_op = nullptr; + } + scheduler().register_descriptor(fd, &epoll_impl->desc_state_); return {}; } diff --git a/src/corosio/src/detail/epoll/sockets.hpp b/src/corosio/src/detail/epoll/sockets.hpp index 880d9b7b..165860dc 100644 --- a/src/corosio/src/detail/epoll/sockets.hpp +++ b/src/corosio/src/detail/epoll/sockets.hpp @@ -147,7 +147,6 @@ class epoll_socket_impl void cancel() noexcept override; void cancel_single_op(epoll_op& op) noexcept; void close_socket() noexcept; - void update_epoll_events() noexcept; void set_socket(int fd) noexcept { fd_ = fd; } void set_endpoints(endpoint 
local, endpoint remote) noexcept { @@ -160,7 +159,7 @@ class epoll_socket_impl epoll_write_op wr_; /// Per-descriptor state for persistent epoll registration - descriptor_data desc_data_; + descriptor_state desc_state_; cached_initiator read_initiator_; cached_initiator write_initiator_; @@ -222,9 +221,6 @@ class epoll_socket_service : public socket_service std::unique_ptr state_; }; -// Backward compatibility alias -using epoll_sockets = epoll_socket_service; - } // namespace boost::corosio::detail #endif // BOOST_COROSIO_HAS_EPOLL diff --git a/src/corosio/src/detail/scheduler_op.hpp b/src/corosio/src/detail/scheduler_op.hpp index 56c6dfee..8b174075 100644 --- a/src/corosio/src/detail/scheduler_op.hpp +++ b/src/corosio/src/detail/scheduler_op.hpp @@ -117,6 +117,10 @@ class scheduler_op : public intrusive_queue::node } func_type func_; + + // Pad to 32 bytes so derived structs (descriptor_state, epoll_op) + // keep hot fields on optimal cache line boundaries + std::byte reserved_[sizeof(void*)] = {}; }; //------------------------------------------------------------------------------ diff --git a/src/corosio/src/detail/timer_service.cpp b/src/corosio/src/detail/timer_service.cpp index 69c98e86..6449dfcf 100644 --- a/src/corosio/src/detail/timer_service.cpp +++ b/src/corosio/src/detail/timer_service.cpp @@ -11,7 +11,7 @@ #include #include "src/detail/intrusive.hpp" -#include "src/detail/resume_coro.hpp" +#include "src/detail/scheduler_op.hpp" #include #include #include @@ -28,6 +28,60 @@ namespace boost::corosio::detail { class timer_service_impl; +// Completion operation posted to scheduler when timer expires or is cancelled. +// Runs inside work_cleanup scope so work accounting is batched correctly. +struct timer_op final : scheduler_op +{ + capy::coro h; + capy::executor_ref d; + std::error_code* ec_out = nullptr; + std::error_code ec_value; + scheduler* sched = nullptr; + + timer_op() noexcept + : scheduler_op(&timer_op::do_complete) + { + } + + static void do_complete( + void* owner, + scheduler_op* base, + std::uint32_t, + std::uint32_t) + { + auto* self = static_cast(base); + if (!owner) + { + delete self; + return; + } + (*self)(); + } + + void operator()() override + { + if (ec_out) + *ec_out = ec_value; + + // Capture before posting (coro may destroy this op) + auto* service = sched; + sched = nullptr; + + d.post(h); + + // Balance the on_work_started() from timer_impl::wait() + if (service) + service->on_work_finished(); + + delete this; + } + + void destroy() override + { + delete this; + } +}; + struct timer_impl : timer::timer_impl , intrusive_list::node @@ -194,14 +248,16 @@ class timer_service_impl : public timer_service notify = (impl.heap_index_ == 0); } - // Resume cancelled waiter outside lock + // Post cancelled waiter as scheduler_op (runs inside work_cleanup scope) if (was_waiting) { - if (ec_out) - *ec_out = make_error_code(capy::error::canceled); - resume_coro(d, h); - // Call on_work_finished AFTER the coroutine resumes - sched_->on_work_finished(); + auto* op = new timer_op; + op->h = h; + op->d = std::move(d); + op->ec_out = ec_out; + op->ec_value = make_error_code(capy::error::canceled); + op->sched = sched_; + sched_->post(op); } if (notify) @@ -234,14 +290,16 @@ class timer_service_impl : public timer_service } } - // Dispatch outside lock + // Post cancelled waiter as scheduler_op (runs inside work_cleanup scope) if (was_waiting) { - if (ec_out) - *ec_out = make_error_code(capy::error::canceled); - resume_coro(d, h); - // Call on_work_finished AFTER the 
coroutine resumes - sched_->on_work_finished(); + auto* op = new timer_op; + op->h = h; + op->d = std::move(d); + op->ec_out = ec_out; + op->ec_value = make_error_code(capy::error::canceled); + op->sched = sched_; + sched_->post(op); } } @@ -259,14 +317,8 @@ class timer_service_impl : public timer_service std::size_t process_expired() override { - // Collect expired timers while holding lock - struct expired_entry - { - std::coroutine_handle<> h; - capy::executor_ref d; - std::error_code* ec_out; - }; - std::vector expired; + // Collect expired timer_ops while holding lock + std::vector expired; { std::lock_guard lock(mutex_); @@ -280,23 +332,22 @@ class timer_service_impl : public timer_service if (t->waiting_) { t->waiting_ = false; - expired.push_back({t->h_, std::move(t->d_), t->ec_out_}); + auto* op = new timer_op; + op->h = t->h_; + op->d = std::move(t->d_); + op->ec_out = t->ec_out_; + op->ec_value = {}; // Success + op->sched = sched_; + expired.push_back(op); } // If not waiting, timer is removed but not dispatched - // wait() will handle this by checking expiry } } - // Dispatch outside lock - for (auto& e : expired) - { - if (e.ec_out) - *e.ec_out = {}; - resume_coro(e.d, e.h); - // Call on_work_finished AFTER the coroutine resumes, so it has a - // chance to add new work before we potentially trigger stop() - sched_->on_work_finished(); - } + // Post ops to scheduler (they run inside work_cleanup scope) + for (auto* op : expired) + sched_->post(op); return expired.size(); } @@ -390,12 +441,10 @@ wait( if (already_expired) { - // Timer already expired - dispatch immediately + // Timer already expired - post for work tracking if (ec) *ec = {}; - // Note: no work tracking needed - we dispatch synchronously - resume_coro(d, h); - // completion is always posted to scheduler queue, never inline. + d.post(h); return std::noop_coroutine(); }
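As an illustration of the deferred-I/O handoff used throughout the hunks above: the reactor merely records readiness and enqueues the descriptor, and a scheduler thread later performs the I/O in descriptor_state::operator(). The sketch below models that handoff in isolation under simplified assumptions; demo_descriptor, demo_queue, and run_one are hypothetical names, the real code queues the descriptor_state itself as a scheduler_op rather than using a separate deque, and error/EAGAIN handling is omitted.

#include <atomic>
#include <cstdint>
#include <deque>
#include <mutex>

struct demo_descriptor
{
    std::atomic<std::uint32_t> ready_events{0};   // set by reactor, drained by scheduler
    std::atomic<bool> enqueued{false};            // guards against double-enqueue
    std::mutex m;                                 // protects pending-op pointers during I/O
};

class demo_queue
{
public:
    // Reactor side: record readiness, enqueue the descriptor at most once.
    void on_epoll_event(demo_descriptor& d, std::uint32_t ev)
    {
        d.ready_events.fetch_or(ev, std::memory_order_relaxed);
        bool expected = false;
        if (d.enqueued.compare_exchange_strong(expected, true,
                std::memory_order_release, std::memory_order_relaxed))
        {
            std::lock_guard<std::mutex> lock(m_);
            q_.push_back(&d);
        }
    }

    // Scheduler side: pop one descriptor and perform the deferred I/O.
    bool run_one()
    {
        demo_descriptor* d = nullptr;
        {
            std::lock_guard<std::mutex> lock(m_);
            if (q_.empty())
                return false;
            d = q_.front();
            q_.pop_front();
        }
        d->enqueued.store(false, std::memory_order_relaxed);
        std::uint32_t ev = d->ready_events.exchange(0, std::memory_order_acquire);
        if (ev == 0)
            return true;                          // readiness already consumed
        std::lock_guard<std::mutex> lock(d->m);
        // ...claim read_op / write_op / connect_op here and perform
        //    non-blocking I/O, re-parking any op that reports EAGAIN...
        return true;
    }

private:
    std::mutex m_;
    std::deque<demo_descriptor*> q_;
};

The single compare-exchange on enqueued is what keeps the reactor hot path down to one queue push per readiness burst, matching the "no per-descriptor mutex locking in reactor hot path" comment in run_task(); the ev == 0 early return corresponds to the compensating_work_started() path in descriptor_state::operator().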