Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions example/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ add_subdirectory(mock-stream-testing)
add_subdirectory(parallel-fetch)
add_subdirectory(parallel-tasks)
add_subdirectory(producer-consumer)
add_subdirectory(quitter-shutdown)
add_subdirectory(strand-serialization)
add_subdirectory(stream-pipeline)
add_subdirectory(timeout-cancellation)
Expand Down
22 changes: 22 additions & 0 deletions example/quitter-shutdown/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#
# Copyright (c) 2026 Michael Vandeberg
#
# Distributed under the Boost Software License, Version 1.0. (See accompanying
# file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
#
# Official repository: https://github.com/cppalliance/capy
#

file(GLOB_RECURSE PFILES CONFIGURE_DEPENDS *.cpp *.hpp
CMakeLists.txt
Jamfile)

source_group(TREE ${CMAKE_CURRENT_SOURCE_DIR} PREFIX "" FILES ${PFILES})

add_executable(capy_example_quitter_shutdown ${PFILES})

set_property(TARGET capy_example_quitter_shutdown
PROPERTY FOLDER "examples")

target_link_libraries(capy_example_quitter_shutdown
Boost::capy)
165 changes: 165 additions & 0 deletions example/quitter-shutdown/quitter_shutdown.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
//
// Copyright (c) 2026 Michael Vandeberg
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/cppalliance/capy
//

/* Quitter Shutdown Example

Demonstrates quitter<T> for responsive application shutdown.

Four workers simulate a batch file-processing pipeline: each
"downloads" data (delay), "transforms" it, and "writes" the
result (delay). Workers are quitter<> coroutines — their
bodies contain zero cancellation-handling code.

Press Ctrl+C to request shutdown. Every in-flight worker
exits at its next co_await, RAII cleanup runs (each worker
holds a resource_guard that logs its cleanup), and the
application prints a summary and exits.

Contrast with task<>:
With task<>, every co_await that touches I/O needs:
auto [ec] = co_await delay(dur);
if(ec) co_return; // <-- cancellation boilerplate
This is repeated at every suspension point.

With quitter<>, the promise intercepts the stop token
automatically. The worker body is pure business logic.
*/

#include <boost/capy.hpp>

#include <atomic>
#include <chrono>
#include <csignal>
#include <iostream>
#include <latch>
#include <sstream>
#include <stop_token>

namespace capy = boost::capy;
using namespace std::chrono_literals;

// Global stop source wired to Ctrl+C.
static std::stop_source g_stop;
static std::atomic<std::chrono::steady_clock::time_point>
g_stop_time{std::chrono::steady_clock::time_point{}};

extern "C" void signal_handler(int)
{
g_stop_time.store(std::chrono::steady_clock::now(),
std::memory_order_relaxed);
g_stop.request_stop();
}

// RAII resource that logs construction and destruction.
// Simulates holding a file handle, socket, or temp buffer
// that must be released on shutdown.
struct resource_guard
{
int id;
std::atomic<int>& cleanup_count;

resource_guard(int id_, std::atomic<int>& count)
: id(id_)
, cleanup_count(count)
{
std::ostringstream oss;
oss << " [worker " << id << "] acquired resources\n";
std::cout << oss.str();
}

~resource_guard()
{
++cleanup_count;
std::ostringstream oss;
oss << " [worker " << id << "] released resources "
<< "(cleanup)\n";
std::cout << oss.str();
}

resource_guard(resource_guard const&) = delete;
resource_guard& operator=(resource_guard const&) = delete;
};

// A single worker: download → transform → write, repeated.
// No cancellation code. quitter handles it.
capy::quitter<> worker(
int id,
std::atomic<int>& items_processed,
std::atomic<int>& cleanup_count)
{
resource_guard guard(id, cleanup_count);

for(int item = 0; ; ++item)
{
// Simulate download (200-400ms depending on worker)
auto download_time = 200ms + 50ms * id;
(void) co_await capy::delay(download_time);

// Simulate transform (CPU work — no co_await needed)
{
std::ostringstream oss;
oss << " [worker " << id << "] processing item "
<< item << "\n";
std::cout << oss.str();
}

// Simulate write (100ms)
(void) co_await capy::delay(100ms);

++items_processed;
}

// Never reached — the loop is infinite.
// quitter exits at the next co_await after stop is requested.
}

int main()
{
std::signal(SIGINT, signal_handler);
#ifdef SIGTERM
std::signal(SIGTERM, signal_handler);
#endif

constexpr int num_workers = 4;
capy::thread_pool pool(num_workers);
std::latch done(num_workers);

std::atomic<int> items_processed{0};
std::atomic<int> cleanup_count{0};

std::cout << "Starting " << num_workers
<< " workers. Press Ctrl+C to quit.\n\n";

for(int i = 0; i < num_workers; ++i)
{
capy::run_async(
pool.get_executor(),
g_stop.get_token(),
[&]() { done.count_down(); },
[&](std::exception_ptr) { done.count_down(); })(
worker(i, items_processed, cleanup_count));
}

done.wait();

auto stop_at = g_stop_time.load(std::memory_order_relaxed);
auto now = std::chrono::steady_clock::now();

std::cout << "\nShutdown complete.\n"
<< " Items processed: " << items_processed << "\n"
<< " Workers cleaned up: " << cleanup_count
<< "/" << num_workers << "\n";

if(stop_at != std::chrono::steady_clock::time_point{})
{
auto us = std::chrono::duration_cast<
std::chrono::microseconds>(now - stop_at).count();
std::cout << " Shutdown latency: " << us << " us\n";
}
}
1 change: 1 addition & 0 deletions include/boost/capy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <boost/capy/error.hpp>
#include <boost/capy/io_result.hpp>
#include <boost/capy/io_task.hpp>
#include <boost/capy/quitter.hpp>
#include <boost/capy/task.hpp>

// Algorithms
Expand Down
28 changes: 28 additions & 0 deletions include/boost/capy/detail/stop_requested_exception.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
//
// Copyright (c) 2026 Michael Vandeberg
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/cppalliance/capy
//

#ifndef BOOST_CAPY_DETAIL_STOP_REQUESTED_EXCEPTION_HPP
#define BOOST_CAPY_DETAIL_STOP_REQUESTED_EXCEPTION_HPP

namespace boost {
namespace capy {
namespace detail {

/* Lightweight sentinel thrown inside quitter<T> when the stop token
is triggered. Not derived from std::exception. Never escapes the
coroutine — unhandled_exception() catches it and sets the stopped
flag. The cost is one throw+catch per cancellation per coroutine
lifetime. */
struct stop_requested_exception {};

} // namespace detail
} // namespace capy
} // namespace boost

#endif
11 changes: 9 additions & 2 deletions include/boost/capy/ex/run_async.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -283,9 +283,16 @@ make_trampoline(Ex, Handlers, Alloc)
// promise_type ctor steals the parameters
auto& p = co_await get_promise_awaiter<
typename run_async_trampoline<Ex, Handlers, Alloc>::promise_type>{};


// Guard ensures the task frame is destroyed even when invoke_
// throws (e.g. default_handler rethrows an unhandled exception).
struct frame_guard
{
std::coroutine_handle<>& h;
~frame_guard() { h.destroy(); }
} guard{p.task_h_};

p.invoke_(p.task_promise_, p.handlers_);
p.task_h_.destroy();
}

} // namespace detail
Expand Down
Loading
Loading