4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
@@ -197,8 +197,8 @@ jobs:
shared: false
coverage: true
build-type: "Debug"
cxxflags: "--coverage -fprofile-arcs -ftest-coverage"
ccflags: "--coverage -fprofile-arcs -ftest-coverage"
cxxflags: "--coverage -fprofile-arcs -ftest-coverage -fprofile-update=atomic"
ccflags: "--coverage -fprofile-arcs -ftest-coverage -fprofile-update=atomic"
install: "lcov wget unzip"

# Linux Clang (5 configurations)
130 changes: 82 additions & 48 deletions src/corosio/src/detail/epoll/acceptors.cpp
@@ -16,6 +16,8 @@
#include "src/detail/endpoint_convert.hpp"
#include "src/detail/make_err.hpp"

#include <utility>

#include <errno.h>
#include <netinet/in.h>
#include <sys/epoll.h>
@@ -64,11 +66,14 @@ operator()()
impl.set_socket(accepted_fd);

// Register accepted socket with epoll (edge-triggered mode)
impl.desc_data_.fd = accepted_fd;
impl.desc_data_.read_op.store(nullptr, std::memory_order_relaxed);
impl.desc_data_.write_op.store(nullptr, std::memory_order_relaxed);
impl.desc_data_.connect_op.store(nullptr, std::memory_order_relaxed);
socket_svc->scheduler().register_descriptor(accepted_fd, &impl.desc_data_);
impl.desc_state_.fd = accepted_fd;
{
std::lock_guard lock(impl.desc_state_.mutex);
impl.desc_state_.read_op = nullptr;
impl.desc_state_.write_op = nullptr;
impl.desc_state_.connect_op = nullptr;
}
socket_svc->scheduler().register_descriptor(accepted_fd, &impl.desc_state_);

sockaddr_in local_addr{};
socklen_t local_len = sizeof(local_addr);
@@ -137,13 +142,6 @@ epoll_acceptor_impl(epoll_acceptor_service& svc) noexcept
{
}

void
epoll_acceptor_impl::
update_epoll_events() noexcept
{
svc_.scheduler().update_descriptor_events(fd_, &desc_data_, 0);
}

void
epoll_acceptor_impl::
release()
@@ -177,7 +175,10 @@ accept(

if (accepted >= 0)
{
desc_data_.read_ready.store(false, std::memory_order_relaxed);
{
std::lock_guard lock(desc_state_.mutex);
desc_state_.read_ready = false;
}
op.accepted_fd = accepted;
op.complete(0, 0);
op.impl_ptr = shared_from_this();
@@ -191,33 +192,45 @@ accept(
svc_.work_started();
op.impl_ptr = shared_from_this();

desc_data_.read_op.store(&op, std::memory_order_release);
std::atomic_thread_fence(std::memory_order_seq_cst);
bool perform_now = false;
{
std::lock_guard lock(desc_state_.mutex);
if (desc_state_.read_ready)
{
desc_state_.read_ready = false;
perform_now = true;
}
else
{
desc_state_.read_op = &op;
}
}

if (desc_data_.read_ready.exchange(false, std::memory_order_acquire))
if (perform_now)
{
auto* claimed = desc_data_.read_op.exchange(nullptr, std::memory_order_acq_rel);
if (claimed)
op.perform_io();
if (op.errn == EAGAIN || op.errn == EWOULDBLOCK)
{
claimed->perform_io();
if (claimed->errn == EAGAIN || claimed->errn == EWOULDBLOCK)
{
claimed->errn = 0;
desc_data_.read_op.store(claimed, std::memory_order_release);
}
else
{
svc_.post(claimed);
svc_.work_finished();
}
// completion is always posted to scheduler queue, never inline.
return std::noop_coroutine();
op.errn = 0;
std::lock_guard lock(desc_state_.mutex);
desc_state_.read_op = &op;
}
else
{
svc_.post(&op);
svc_.work_finished();
}
return std::noop_coroutine();
}

if (op.cancelled.load(std::memory_order_acquire))
{
auto* claimed = desc_data_.read_op.exchange(nullptr, std::memory_order_acq_rel);
epoll_op* claimed = nullptr;
{
std::lock_guard lock(desc_state_.mutex);
if (desc_state_.read_op == &op)
claimed = std::exchange(desc_state_.read_op, nullptr);
}
if (claimed)
{
svc_.post(claimed);
@@ -247,9 +260,14 @@ cancel() noexcept
}

acc_.request_cancel();
// Use atomic exchange - only one of cancellation or reactor will succeed
auto* claimed = desc_data_.read_op.exchange(nullptr, std::memory_order_acq_rel);
if (claimed == &acc_)

epoll_op* claimed = nullptr;
{
std::lock_guard lock(desc_state_.mutex);
if (desc_state_.read_op == &acc_)
claimed = std::exchange(desc_state_.read_op, nullptr);
}
if (claimed)
{
acc_.impl_ptr = self;
svc_.post(&acc_);
@@ -263,9 +281,13 @@ cancel_single_op(epoll_op& op) noexcept
{
op.request_cancel();

// Use atomic exchange - only one of cancellation or reactor will succeed
auto* claimed = desc_data_.read_op.exchange(nullptr, std::memory_order_acq_rel);
if (claimed == &op)
epoll_op* claimed = nullptr;
{
std::lock_guard lock(desc_state_.mutex);
if (desc_state_.read_op == &op)
claimed = std::exchange(desc_state_.read_op, nullptr);
}
if (claimed)
{
try {
op.impl_ptr = shared_from_this();
@@ -281,20 +303,29 @@ close_socket() noexcept
{
cancel();

if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
{
try {
desc_state_.impl_ref_ = shared_from_this();
} catch (std::bad_weak_ptr const&) {}
}

if (fd_ >= 0)
{
if (desc_data_.registered_events != 0)
if (desc_state_.registered_events != 0)
svc_.scheduler().deregister_descriptor(fd_);
::close(fd_);
fd_ = -1;
}

desc_data_.fd = -1;
desc_data_.is_registered = false;
desc_data_.read_op.store(nullptr, std::memory_order_relaxed);
desc_data_.read_ready.store(false, std::memory_order_relaxed);
desc_data_.write_ready.store(false, std::memory_order_relaxed);
desc_data_.registered_events = 0;
desc_state_.fd = -1;
{
std::lock_guard lock(desc_state_.mutex);
desc_state_.read_op = nullptr;
desc_state_.read_ready = false;
desc_state_.write_ready = false;
}
desc_state_.registered_events = 0;

// Clear cached endpoint
local_endpoint_ = endpoint{};
@@ -383,9 +414,12 @@ open_acceptor(
epoll_impl->fd_ = fd;

// Register fd with epoll (edge-triggered mode)
epoll_impl->desc_data_.fd = fd;
epoll_impl->desc_data_.read_op.store(nullptr, std::memory_order_relaxed);
scheduler().register_descriptor(fd, &epoll_impl->desc_data_);
epoll_impl->desc_state_.fd = fd;
{
std::lock_guard lock(epoll_impl->desc_state_.mutex);
epoll_impl->desc_state_.read_op = nullptr;
}
scheduler().register_descriptor(fd, &epoll_impl->desc_state_);

// Cache the local endpoint (queries OS for ephemeral port if port was 0)
sockaddr_in local_addr{};
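Between the source and header diffs, it may help to see the locking pattern the new acceptors.cpp code relies on, reduced to a standalone sketch. The names pending_op, state, try_start and try_claim are hypothetical stand-ins for epoll_op and descriptor_state; the point is that a plain mutex plus std::exchange lets exactly one party (initiator, reactor, or cancellation) end up owning a parked operation.

#include <mutex>
#include <utility>

// Hypothetical stand-ins for epoll_op / descriptor_state (illustration only).
struct pending_op { /* completion bookkeeping would live here */ };

struct state
{
    std::mutex mutex;
    pending_op* read_op = nullptr;  // protected by mutex
    bool read_ready = false;        // protected by mutex
};

// Initiating path: either consume a cached edge event or park the op.
bool try_start(state& s, pending_op& op)
{
    std::lock_guard lock(s.mutex);
    if (s.read_ready)
    {
        s.read_ready = false;  // consume the cached readiness
        return true;           // caller performs the I/O immediately
    }
    s.read_op = &op;           // park until the reactor reports readiness
    return false;
}

// Cancellation (or reactor) path: claim the op only if it is still parked.
pending_op* try_claim(state& s, pending_op& op)
{
    std::lock_guard lock(s.mutex);
    if (s.read_op == &op)
        return std::exchange(s.read_op, nullptr);  // this caller now owns it
    return nullptr;  // someone else already claimed it
}

int main()
{
    state s;
    pending_op op;
    bool immediate = try_start(s, op);       // parks op: no cached readiness
    pending_op* claimed = try_claim(s, op);  // cancellation wins the race here
    return (!immediate && claimed == &op) ? 0 : 1;
}

Making the read_ready check and the op registration one critical section is what lets the new accept() drop the seq_cst fence and the exchange-and-re-arm dance visible in the removed lines above.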
3 changes: 1 addition & 2 deletions src/corosio/src/detail/epoll/acceptors.hpp
@@ -60,13 +60,12 @@ class epoll_acceptor_impl
void cancel() noexcept override;
void cancel_single_op(epoll_op& op) noexcept;
void close_socket() noexcept;
void update_epoll_events() noexcept;
void set_local_endpoint(endpoint ep) noexcept { local_endpoint_ = ep; }

epoll_acceptor_service& service() noexcept { return svc_; }

epoll_accept_op acc_;
descriptor_data desc_data_;
descriptor_state desc_state_;

private:
epoll_acceptor_service& svc_;
88 changes: 52 additions & 36 deletions src/corosio/src/detail/epoll/op.hpp
@@ -33,6 +33,7 @@
#include <atomic>
#include <cstddef>
#include <memory>
#include <mutex>
#include <optional>
#include <stop_token>

@@ -51,8 +52,8 @@

Persistent Registration
-----------------------
File descriptors are registered with epoll once (via descriptor_data) and
stay registered until closed. The descriptor_data tracks which operations
File descriptors are registered with epoll once (via descriptor_state) and
stay registered until closed. The descriptor_state tracks which operations
are pending (read_op, write_op, connect_op). When an event arrives, the
reactor dispatches to the appropriate pending operation.

@@ -82,42 +83,68 @@ class epoll_socket_impl;
class epoll_acceptor_impl;
struct epoll_op;

// Forward declaration
class epoll_scheduler;

/** Per-descriptor state for persistent epoll registration.

Tracks pending operations for a file descriptor. The fd is registered
once with epoll and stays registered until closed. Events are dispatched
to the appropriate pending operation (EPOLLIN -> read_op, etc.).
once with epoll and stays registered until closed.

This struct extends scheduler_op to support deferred I/O processing.
When epoll events arrive, the reactor sets ready_events and queues
this descriptor for processing. When popped from the scheduler queue,
operator() performs the actual I/O and queues completion handlers.

@par Deferred I/O Model
The reactor no longer performs I/O directly. Instead:
1. Reactor sets ready_events and queues descriptor_state
2. Scheduler pops descriptor_state and calls operator()
3. operator() performs I/O under mutex and queues completions

With edge-triggered epoll (EPOLLET), atomic operations are required to
synchronize between operation registration and reactor event delivery.
The read_ready/write_ready flags cache edge events that arrived before
an operation was registered.
This eliminates per-descriptor mutex locking from the reactor hot path.

@par Thread Safety
The mutex protects operation pointers and ready flags during I/O.
ready_events_ and is_enqueued_ are atomic for lock-free reactor access.
*/
struct descriptor_data
struct descriptor_state : scheduler_op
{
/// Currently registered events (EPOLLIN, EPOLLOUT, etc.)
std::uint32_t registered_events = 0;
std::mutex mutex;

/// Pending read operation (nullptr if none)
std::atomic<epoll_op*> read_op{nullptr};
// Protected by mutex
epoll_op* read_op = nullptr;
epoll_op* write_op = nullptr;
epoll_op* connect_op = nullptr;

/// Pending write operation (nullptr if none)
std::atomic<epoll_op*> write_op{nullptr};
// Caches edge events that arrived before an op was registered
bool read_ready = false;
bool write_ready = false;

/// Pending connect operation (nullptr if none)
std::atomic<epoll_op*> connect_op{nullptr};
// Set during registration only (no mutex needed)
std::uint32_t registered_events = 0;
int fd = -1;

/// Cached read readiness (edge event arrived before op registered)
std::atomic<bool> read_ready{false};
// For deferred I/O - set by reactor, read by scheduler
std::atomic<std::uint32_t> ready_events_{0};
std::atomic<bool> is_enqueued_{false};
epoll_scheduler const* scheduler_ = nullptr;

/// Cached write readiness (edge event arrived before op registered)
std::atomic<bool> write_ready{false};
// Prevents impl destruction while this descriptor_state is queued.
// Set by close_socket() when is_enqueued_ is true, cleared by operator().
std::shared_ptr<void> impl_ref_;

/// The file descriptor
int fd = -1;
/// Add ready events atomically.
void add_ready_events(std::uint32_t ev) noexcept
{
ready_events_.fetch_or(ev, std::memory_order_relaxed);
}
Comment on lines +137 to +141
⚠️ Potential issue | 🔴 Critical

Change fetch_or to use memory_order_release to pair with consumer's memory_order_acquire.

The producer-side fetch_or with memory_order_relaxed (line 140) does not provide the release semantics needed to synchronize with the consumer's exchange(..., memory_order_acquire) (scheduler.cpp:190). Acquire semantics require a corresponding release store to establish a happens-before relationship; relaxed operations provide no such guarantee. This can result in I/O side-effects from the reactor thread not being visible to the scheduler thread when it reads ready_events_.

Change to memory_order_release on the producer side:

ready_events_.fetch_or(ev, std::memory_order_release);
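As a standalone illustration of the pairing this comment describes (illustrative names, not the project's code), the sketch below shows why a release on the publishing side is what makes the consumer's acquire meaningful: with relaxed ordering on the producer, the write to payload would not be guaranteed visible.

#include <atomic>
#include <cassert>
#include <cstdint>
#include <thread>

std::atomic<std::uint32_t> ready_events{0};
int payload = 0;  // stands in for I/O state written before publishing

void reactor()  // producer
{
    payload = 42;                                           // side effect
    ready_events.fetch_or(0x1, std::memory_order_release);  // publish
}

void scheduler()  // consumer
{
    while (ready_events.exchange(0, std::memory_order_acquire) == 0)
        ;                   // spin until an event is published
    assert(payload == 42);  // visible thanks to the release/acquire pair
}

int main()
{
    std::thread a(reactor), b(scheduler);
    a.join();
    b.join();
}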

/// Perform deferred I/O and queue completions.
void operator()() override;

/// Whether this descriptor is managed by persistent registration
bool is_registered = false;
/// Destroy without invoking.
void destroy() override {}
Comment on lines +146 to +147

⚠️ Potential issue | 🟠 Major

destroy() should clear impl_ref_ to prevent a memory leak during shutdown.

close_socket() sets impl_ref_ (holding a shared_ptr to the owning impl) when is_enqueued_ is true. If the scheduler shuts down without draining its queue, scheduler_op_queue::~scheduler_op_queue() calls destroy() on remaining items. The empty override leaves impl_ref_ set, creating a self-referencing cycle (impl → descriptor_state::impl_ref_ → impl) that is never broken.

🐛 Proposed fix
     /// Destroy without invoking.
-    void destroy() override {}
+    void destroy() override { impl_ref_.reset(); }
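A reduced sketch of the cycle being described, using hypothetical owner/queued_state types in place of epoll_acceptor_impl and descriptor_state: with an empty destroy() the final shared_ptr reference lives inside the very object it keeps alive, so nothing is ever freed, while resetting impl_ref_ releases the owner.

#include <cstdio>
#include <memory>

// Hypothetical stand-ins for descriptor_state / epoll_acceptor_impl.
struct queued_state
{
    std::shared_ptr<void> impl_ref_;       // back-reference set while enqueued
    void destroy() { impl_ref_.reset(); }  // the proposed fix: break the cycle
};

struct owner
{
    queued_state state_;  // the impl owns its descriptor state by value
    ~owner() { std::puts("owner destroyed"); }
};

int main()
{
    auto impl = std::make_shared<owner>();
    queued_state* queued = &impl->state_;  // the op queue holds a raw pointer
    queued->impl_ref_ = impl;  // cycle: impl -> state_ -> impl_ref_ -> impl

    impl.reset();       // last external reference gone, use_count is still 1
    queued->destroy();  // queue teardown; with an empty destroy() this would leak
    return 0;           // "owner destroyed" prints only because of the reset()
}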

};

struct epoll_op : scheduler_op
@@ -206,17 +233,6 @@ struct epoll_op : scheduler_op
cancelled.store(true, std::memory_order_release);
}

void start(std::stop_token token)
{
cancelled.store(false, std::memory_order_release);
stop_cb.reset();
socket_impl_ = nullptr;
acceptor_impl_ = nullptr;

if (token.stop_possible())
stop_cb.emplace(token, canceller{this});
}

void start(std::stop_token token, epoll_socket_impl* impl)
{
cancelled.store(false, std::memory_order_release);
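To make the deferred I/O model described in the new descriptor_state comment concrete, here is a reduced single-threaded sketch. The desc type and run_queue are illustrative, not the real scheduler API: the reactor path only touches atomics and enqueues the descriptor at most once, while the mutex and the actual I/O dispatch are deferred to the scheduler side.

#include <atomic>
#include <cstdint>
#include <deque>
#include <mutex>

// Illustrative stand-ins for descriptor_state and the scheduler's run queue.
struct desc
{
    std::mutex mutex;
    bool read_ready = false;  // protected by mutex
    std::atomic<std::uint32_t> ready_events_{0};
    std::atomic<bool> is_enqueued_{false};

    void add_ready_events(std::uint32_t ev)
    {
        ready_events_.fetch_or(ev, std::memory_order_release);
    }

    // Roughly what operator() would do: drain events, then work under the mutex.
    void run()
    {
        std::uint32_t ev = ready_events_.exchange(0, std::memory_order_acquire);
        {
            std::lock_guard lock(mutex);
            if (ev & 0x1 /* EPOLLIN */)
                read_ready = true;  // or hand the event to a parked read_op
        }
        is_enqueued_.store(false, std::memory_order_release);
    }
};

std::deque<desc*> run_queue;  // stands in for the scheduler op queue

// Reactor hot path: atomics only, no per-descriptor mutex.
void on_epoll_event(desc& d, std::uint32_t events)
{
    d.add_ready_events(events);
    if (!d.is_enqueued_.exchange(true, std::memory_order_acq_rel))
        run_queue.push_back(&d);  // enqueue at most once until it runs
}

int main()
{
    desc d;
    on_epoll_event(d, 0x1);      // what the reactor thread would do
    on_epoll_event(d, 0x1);      // coalesced: already enqueued
    while (!run_queue.empty())   // what the scheduler thread would do
    {
        desc* next = run_queue.front();
        run_queue.pop_front();
        next->run();
    }
    return d.read_ready ? 0 : 1;
}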