Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 20 additions & 9 deletions include/boost/capy/delay.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,22 +152,33 @@ class delay_awaitable

ts_ = &env->executor.context().use_service<detail::timer_service>();

// Schedule timer (won't fire inline since deadline is in the future)
tid_ = ts_->schedule_after(dur_,
// Register stop callback before arming the timer.
// Once the timer is armed, another thread can fire it,
// resume the coroutine, and destroy this awaitable.
stop_cb_active_ = true;
::new(stop_cb_buf_) stop_cb_t(
env->stop_token,
cancel_fn{this, env->executor, h});

// If the stop callback already claimed the resume
// (inline invocation), skip the timer entirely.
if(claimed_.load(std::memory_order_acquire))
return std::noop_coroutine();

// Schedule timer using the output-reference overload so
// that tid_ is written while the timer_service lock is
// held — the timer thread cannot fire the callback until
// after the lock is released, at which point tid_ is set.
ts_->schedule_after(dur_,
[this, h, ex = env->executor]()
{
if(!claimed_.exchange(
true, std::memory_order_acq_rel))
{
ex.post(h);
}
});

// Register stop callback (may fire inline)
::new(stop_cb_buf_) stop_cb_t(
env->stop_token,
cancel_fn{this, env->executor, h});
stop_cb_active_ = true;
},
tid_);

return std::noop_coroutine();
}
Expand Down
18 changes: 12 additions & 6 deletions include/boost/capy/ex/detail/timer_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,20 @@ class BOOST_CAPY_DECL
The callback is invoked on the timer service's background
thread. It must not block for extended periods.

@return An id that can be passed to cancel().
The id is written to @p out while the internal lock is
held, so the timer thread cannot fire the callback before
the write completes. This is required when the id
destination lives in memory that the callback itself may
free (e.g. a coroutine frame).
*/
template<typename Rep, typename Period>
timer_id schedule_after(
void schedule_after(
std::chrono::duration<Rep, Period> dur,
std::function<void()> cb)
std::function<void()> cb,
timer_id& out)
{
auto deadline = std::chrono::steady_clock::now() + dur;
return schedule_at(deadline, std::move(cb));
schedule_at(deadline, std::move(cb), out);
}

/** Cancel a pending timer.
Expand Down Expand Up @@ -97,9 +102,10 @@ class BOOST_CAPY_DECL
}
};

timer_id schedule_at(
void schedule_at(
std::chrono::steady_clock::time_point deadline,
std::function<void()> cb);
std::function<void()> cb,
timer_id& out);

void run();

Expand Down
7 changes: 4 additions & 3 deletions src/ex/detail/timer_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,19 @@ timer_service::
stop_and_join();
}

timer_service::timer_id
void
timer_service::
schedule_at(
std::chrono::steady_clock::time_point deadline,
std::function<void()> cb)
std::function<void()> cb,
timer_id& out)
{
std::lock_guard lock(mutex_);
auto id = ++next_id_;
out = id;
active_ids_.insert(id);
queue_.push(entry{deadline, id, std::move(cb)});
cv_.notify_one();
return id;
}

void
Expand Down
56 changes: 34 additions & 22 deletions test/unit/ex/detail/timer_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,11 @@ struct timer_service_test
std::latch done(1);
bool fired = false;

detail::timer_service::timer_id id;
ts.schedule_after(1ms, [&] {
fired = true;
done.count_down();
});
}, id);

done.wait();
BOOST_TEST(fired);
Expand All @@ -54,9 +55,10 @@ struct timer_service_test

bool fired = false;

auto id = ts.schedule_after(1s, [&] {
detail::timer_service::timer_id id;
ts.schedule_after(1s, [&] {
fired = true;
});
}, id);

ts.cancel(id);

Expand All @@ -75,9 +77,10 @@ struct timer_service_test

std::latch done(1);

auto id = ts.schedule_after(1ms, [&] {
detail::timer_service::timer_id id;
ts.schedule_after(1ms, [&] {
done.count_down();
});
}, id);

done.wait();
// Should not block or crash
Expand All @@ -98,21 +101,22 @@ struct timer_service_test

auto const scale = failsafe_scale;

detail::timer_service::timer_id id;
ts.schedule_after(30ms * scale, [&] {
std::lock_guard lock(mu);
order.push_back(3);
done.count_down();
});
}, id);
ts.schedule_after(10ms * scale, [&] {
std::lock_guard lock(mu);
order.push_back(1);
done.count_down();
});
}, id);
ts.schedule_after(20ms * scale, [&] {
std::lock_guard lock(mu);
order.push_back(2);
done.count_down();
});
}, id);

done.wait();
BOOST_TEST_EQ(order.size(), 3u);
Expand All @@ -132,9 +136,10 @@ struct timer_service_test
std::latch done(1);
auto start = std::chrono::steady_clock::now();

detail::timer_service::timer_id id;
ts.schedule_after(0ms, [&] {
done.count_down();
});
}, id);

done.wait();
auto elapsed = std::chrono::steady_clock::now() - start;
Expand All @@ -153,12 +158,13 @@ struct timer_service_test
std::atomic<int> count{0};
std::latch done(N);

detail::timer_service::timer_id id;
for(int i = 0; i < N; ++i)
{
ts.schedule_after(1ms, [&] {
count.fetch_add(1, std::memory_order_relaxed);
done.count_down();
});
}, id);
}

done.wait();
Expand All @@ -176,16 +182,19 @@ struct timer_service_test
std::atomic<int> count{0};
std::latch done(1);

auto id1 = ts.schedule_after(10ms, [&] {
detail::timer_service::timer_id id1;
ts.schedule_after(10ms, [&] {
count.fetch_add(1, std::memory_order_relaxed);
});
}, id1);
detail::timer_service::timer_id id2;
ts.schedule_after(10ms, [&] {
count.fetch_add(1, std::memory_order_relaxed);
done.count_down();
});
auto id3 = ts.schedule_after(10ms, [&] {
}, id2);
detail::timer_service::timer_id id3;
ts.schedule_after(10ms, [&] {
count.fetch_add(1, std::memory_order_relaxed);
});
}, id3);

ts.cancel(id1);
ts.cancel(id3);
Expand All @@ -207,9 +216,10 @@ struct timer_service_test
.use_service<detail::timer_service>();

// Schedule timers far in the future
ts.schedule_after(10s, [] {});
ts.schedule_after(10s, [] {});
ts.schedule_after(10s, [] {});
detail::timer_service::timer_id id;
ts.schedule_after(10s, [] {}, id);
ts.schedule_after(10s, [] {}, id);
ts.schedule_after(10s, [] {}, id);

// pool destructor calls shutdown — should not hang
}
Expand All @@ -228,9 +238,10 @@ struct timer_service_test
auto start = std::chrono::steady_clock::now();
auto dur = 50ms;

detail::timer_service::timer_id id;
ts.schedule_after(dur, [&] {
done.count_down();
});
}, id);

done.wait();
auto elapsed = std::chrono::steady_clock::now() - start;
Expand All @@ -249,12 +260,13 @@ struct timer_service_test
std::atomic<bool> callback_finished{false};
std::latch started(1);

auto id = ts.schedule_after(1ms, [&] {
detail::timer_service::timer_id id;
ts.schedule_after(1ms, [&] {
callback_started.store(true);
started.count_down();
std::this_thread::sleep_for(50ms);
callback_finished.store(true);
});
}, id);

// Wait for callback to start executing
started.wait();
Expand Down Expand Up @@ -284,4 +296,4 @@ struct timer_service_test
TEST_SUITE(timer_service_test, "capy.ex.timer_service");

} // capy
} // boost
} // boost
Loading