diff --git a/CMakeLists.txt b/CMakeLists.txt index 6eaad4d3..3dae2718 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -111,5 +111,5 @@ endif () # #------------------------------------------------- if (BOOST_CAPY_BUILD_EXAMPLES) - # add_subdirectory(example) + add_subdirectory(example) endif () diff --git a/doc/modules/ROOT/pages/examples/buffer-composition.adoc b/doc/modules/ROOT/pages/examples/buffer-composition.adoc index d93afd82..a9f3be13 100644 --- a/doc/modules/ROOT/pages/examples/buffer-composition.adoc +++ b/doc/modules/ROOT/pages/examples/buffer-composition.adoc @@ -5,8 +5,8 @@ Composing buffer sequences without allocation for scatter/gather I/O. == What You Will Learn * Creating buffers from different sources -* Composing buffer sequences with `cat()` -* Zero-allocation scatter/gather patterns +* Using `const_buffer_pair` and `mutable_buffer_pair` for scatter/gather I/O +* Zero-allocation buffer sequence patterns == Prerequisites @@ -21,80 +21,112 @@ Composing buffer sequences without allocation for scatter/gather I/O. #include #include #include +#include using namespace boost::capy; -void demonstrate_buffers() +void demonstrate_single_buffers() { - // Individual buffers from various sources + std::cout << "=== Single Buffer Examples ===\n\n"; + + // Create buffers from various sources + std::string str = "Hello, World!"; + char arr[] = "Array data"; + std::vector vec = {'V', 'e', 'c', 't', 'o', 'r'}; + + // make_buffer creates buffer views (no copies) + auto str_buf = make_buffer(str); // mutable_buffer + auto arr_buf = make_buffer(arr, sizeof(arr) - 1); // mutable_buffer - Exclude null terminator + auto vec_buf = make_buffer(vec); // mutable_buffer + + std::cout << "String buffer: " << str_buf.size() << " bytes\n"; + std::cout << "Array buffer: " << arr_buf.size() << " bytes\n"; + std::cout << "Vector buffer: " << vec_buf.size() << " bytes\n"; +} + +void demonstrate_buffer_pair() +{ + std::cout << "\n=== Buffer Pair (Scatter/Gather) ===\n\n"; + + // const_buffer_pair is std::array std::string header = "Content-Type: text/plain\r\n\r\n"; std::string body = "Hello, World!"; - char footer[] = "\r\n--END--"; - - // Create buffer views (no copies) - auto header_buf = make_buffer(header); - auto body_buf = make_buffer(body); - auto footer_buf = make_buffer(footer, sizeof(footer) - 1); - // Compose into a single sequence (no allocation!) 
- auto message = cat(header_buf, body_buf, footer_buf); + const_buffer_pair message = {{ + make_buffer(header), + make_buffer(body) + }}; - // Measure - std::cout << "Total message size: " << buffer_size(message) << " bytes\n"; + // Calculate total size + std::size_t total = buffer_size(message); + std::cout << "Total message size: " << total << " bytes\n"; std::cout << "Buffer count: " << buffer_length(message) << "\n"; - // Iterate (for demonstration) + // Iterate through buffers std::cout << "\nBuffer contents:\n"; - for (auto it = begin(message); it != end(message); ++it) + for (auto const& buf : message) // const_buffer const& { - const_buffer buf = *it; std::cout << " [" << buf.size() << " bytes]: "; std::cout.write(static_cast(buf.data()), buf.size()); std::cout << "\n"; } } -// HTTP-style message assembly -struct http_message +void demonstrate_buffer_array() { - std::string status_line = "HTTP/1.1 200 OK\r\n"; - std::array, 2> headers = {{ - {"Content-Type", "application/json"}, - {"Server", "Capy/1.0"} - }}; + std::cout << "\n=== Multi-Buffer Array ===\n\n"; + + // Use std::array for more than 2 buffers + std::string status = "HTTP/1.1 200 OK\r\n"; + std::string content_type = "Content-Type: application/json\r\n"; + std::string server = "Server: Capy/1.0\r\n"; + std::string empty_line = "\r\n"; std::string body = R"({"status":"ok"})"; - // Returns a composed buffer sequence - auto buffers() const - { - // Format headers - static constexpr char crlf[] = "\r\n"; - static constexpr char sep[] = ": "; - static constexpr char empty_line[] = "\r\n"; - - return cat( - make_buffer(status_line), - make_buffer(headers[0].first), make_buffer(sep, 2), - make_buffer(headers[0].second), make_buffer(crlf, 2), - make_buffer(headers[1].first), make_buffer(sep, 2), - make_buffer(headers[1].second), make_buffer(crlf, 2), - make_buffer(empty_line, 2), - make_buffer(body) - ); - } -}; + std::array http_response = {{ + make_buffer(status), + make_buffer(content_type), + make_buffer(server), + make_buffer(empty_line), + make_buffer(body) + }}; + + std::size_t total = buffer_size(http_response); + std::cout << "HTTP response size: " << total << " bytes\n"; + std::cout << "Buffer count: " << buffer_length(http_response) << "\n"; + + // In real code with streams: + // co_await write(stream, http_response); + // This performs scatter/gather I/O - single syscall for all buffers +} -int main() +void demonstrate_mutable_buffers() { - demonstrate_buffers(); + std::cout << "\n=== Mutable Buffer Example ===\n\n"; + + // Mutable buffers for receiving data + char buf1[64]; + char buf2[64]; - std::cout << "\n--- HTTP Message ---\n"; - http_message msg; - auto bufs = msg.buffers(); - std::cout << "Message size: " << buffer_size(bufs) << " bytes\n"; + mutable_buffer_pair recv_buffers = {{ + mutable_buffer(buf1, sizeof(buf1)), + mutable_buffer(buf2, sizeof(buf2)) + }}; + + std::cout << "Prepared " << buffer_length(recv_buffers) + << " buffers with " << buffer_size(recv_buffers) + << " bytes total capacity\n"; - // In real code: co_await write(stream, msg.buffers()); - // Single system call writes all buffers (scatter/gather) + // In real code: + // auto [ec, n] = co_await stream.read_some(recv_buffers); +} + +int main() +{ + demonstrate_single_buffers(); + demonstrate_buffer_pair(); + demonstrate_buffer_array(); + demonstrate_mutable_buffers(); return 0; } @@ -114,30 +146,45 @@ target_link_libraries(buffer_composition PRIVATE capy) [source,cpp] ---- -auto header_buf = make_buffer(header); -auto body_buf = 
make_buffer(body); -auto footer_buf = make_buffer(footer, sizeof(footer) - 1); +auto str_buf = make_buffer(str); // mutable_buffer +auto arr_buf = make_buffer(arr, sizeof(arr) - 1); // mutable_buffer ---- `make_buffer` creates buffer views from various sources. No data is copied—the buffers reference the original storage. -=== Zero-Allocation Composition +=== Buffer Pairs [source,cpp] ---- -auto message = cat(header_buf, body_buf, footer_buf); +const_buffer_pair message = {{ + make_buffer(header), + make_buffer(body) +}}; ---- -`cat()` composes buffer sequences without allocation. The returned object stores references and iterates through all buffers in sequence. +`const_buffer_pair` is `std::array` — a fixed-size buffer sequence for scatter/gather I/O. Similarly, `mutable_buffer_pair` holds two mutable buffers. + +=== Multi-Buffer Arrays + +[source,cpp] +---- +std::array http_response = {{ + make_buffer(status), + make_buffer(content_type), + // ... +}}; +---- + +For more than two buffers, use `std::array` directly. Buffer sequences support `buffer_size()` and `buffer_length()` for querying total bytes and buffer count. === Scatter/Gather I/O [source,cpp] ---- -co_await write(stream, msg.buffers()); +co_await write(stream, http_response); ---- -When you write a composed buffer sequence, the OS receives all buffers in a single system call. This is *scatter/gather I/O*: +When you write a buffer sequence, the OS receives all buffers in a single system call. This is *scatter/gather I/O*: * No intermediate buffer allocation * No copying data together @@ -146,18 +193,31 @@ When you write a composed buffer sequence, the OS receives all buffers in a sing == Output ---- -Total message size: 55 bytes -Buffer count: 3 +=== Single Buffer Examples === + +String buffer: 13 bytes +Array buffer: 10 bytes +Vector buffer: 6 bytes + +=== Buffer Pair (Scatter/Gather) === + +Total message size: 41 bytes +Buffer count: 2 Buffer contents: - [27 bytes]: Content-Type: text/plain + [28 bytes]: Content-Type: text/plain + [13 bytes]: Hello, World! - [9 bytes]: ---END-- ---- HTTP Message --- -Message size: 87 bytes +=== Multi-Buffer Array === + +HTTP response size: 84 bytes +Buffer count: 5 + +=== Mutable Buffer Example === + +Prepared 2 buffers with 128 bytes total capacity ---- == Exercises diff --git a/doc/modules/ROOT/pages/examples/custom-dynamic-buffer.adoc b/doc/modules/ROOT/pages/examples/custom-dynamic-buffer.adoc index cffe1c83..fb733558 100644 --- a/doc/modules/ROOT/pages/examples/custom-dynamic-buffer.adoc +++ b/doc/modules/ROOT/pages/examples/custom-dynamic-buffer.adoc @@ -20,9 +20,12 @@ Implementing the DynamicBuffer concept for a custom allocation strategy. 
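Before looking at the library types, the prepare/commit/consume cycle that the `tracked_buffer` below implements can be sketched with nothing but `std::string`. Everything in this sketch — the `toy_buffer` name and its member signatures — is illustrative and is not Capy's `DynamicBuffer` interface; it only mirrors the bookkeeping, including the clamped `consume()`.

[source,cpp]
----
#include <algorithm>
#include <cstddef>
#include <iostream>
#include <string>

// Illustrative only: a std::string-backed buffer with the same
// prepare/commit/consume bookkeeping, independent of Boost.Capy's types.
class toy_buffer
{
    std::string storage_;
    std::size_t read_pos_ = 0;   // start of readable data
    std::size_t write_pos_ = 0;  // end of readable data

public:
    // prepare: return a writable region of n bytes past the committed data
    char* prepare(std::size_t n)
    {
        if (write_pos_ + n > storage_.size())
            storage_.resize(write_pos_ + n);
        return storage_.data() + write_pos_;
    }

    // commit: make n of the prepared bytes readable
    void commit(std::size_t n) { write_pos_ += n; }

    // consume: drop processed bytes, clamped to what is actually readable
    void consume(std::size_t n)
    {
        read_pos_ += std::min(n, size());
        if (read_pos_ == write_pos_)
            read_pos_ = write_pos_ = 0;   // empty: reset both cursors
    }

    std::size_t size() const { return write_pos_ - read_pos_; }
    char const* data() const { return storage_.data() + read_pos_; }
};

int main()
{
    toy_buffer buf;
    char* p = buf.prepare(16);             // ask for 16 writable bytes
    std::copy_n("hello", 5, p);            // a producer fills in 5 of them
    buf.commit(5);
    buf.consume(3);                        // mark "hel" as processed
    std::cout.write(buf.data(), buf.size()) << '\n';   // prints "lo"
}
----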
#include #include #include +#include +#include #include #include #include +#include using namespace boost::capy; @@ -106,8 +109,9 @@ public: // Consumer: mark bytes as processed void consume(std::size_t n) { - total_consumed_ += n; - read_pos_ += n; + std::size_t actual = std::min(n, size()); // std::size_t + total_consumed_ += actual; + read_pos_ += actual; if (read_pos_ == write_pos_) { @@ -143,18 +147,19 @@ private: }; // Demonstrate using the custom buffer -task<> read_into_tracked_buffer(any_stream& stream, tracked_buffer& buffer) +task<> read_into_tracked_buffer(test::stream& stream, tracked_buffer& buffer) { // Read data in chunks while (true) { - auto space = buffer.prepare(256); + auto space = buffer.prepare(256); // mutable_buffer + // ec: std::error_code, n: std::size_t auto [ec, n] = co_await stream.read_some(space); if (ec == cond::eof) break; - if (ec.failed()) + if (ec) throw std::system_error(ec); buffer.commit(n); @@ -169,19 +174,19 @@ void demo_tracked_buffer() std::cout << "=== Tracked Buffer Demo ===\n\n"; // Setup mock stream with test data - test::stream mock; + test::fuse f; + test::stream mock(f); mock.provide("Hello, "); mock.provide("World! "); mock.provide("This is a test of the custom buffer.\n"); - mock.provide_eof(); + // Stream returns eof when data is exhausted - any_stream stream{mock}; tracked_buffer buffer; - test::run_blocking(read_into_tracked_buffer(stream, buffer)); + test::run_blocking()(read_into_tracked_buffer(mock, buffer)); std::cout << "\nFinal buffer contents: "; - auto data = buffer.data(); + auto data = buffer.data(); // const_buffer std::cout.write(static_cast(data.data()), data.size()); std::cout << "\n\n"; @@ -235,16 +240,17 @@ std::size_t capacity() const; // Currently allocated [source,cpp] ---- // 1. Producer prepares space -auto space = buffer.prepare(256); +auto space = buffer.prepare(256); // mutable_buffer // 2. Data is written into space +// ec: std::error_code, n: std::size_t auto [ec, n] = co_await stream.read_some(space); // 3. Producer commits written bytes buffer.commit(n); // 4. Consumer reads data -auto data = buffer.data(); +auto data = buffer.data(); // const_buffer process(data); // 5. Consumer marks bytes as processed @@ -265,26 +271,25 @@ The `tracked_buffer` implementation: ---- === Tracked Buffer Demo === -Read 7 bytes, buffer size now: 7 -Read 7 bytes, buffer size now: 14 -Read 37 bytes, buffer size now: 51 +Read 51 bytes, buffer size now: 51 Final buffer contents: Hello, World! This is a test of the custom buffer. + Buffer statistics: - Total prepared: 768 bytes + Total prepared: 512 bytes Total committed: 51 bytes Total consumed: 0 bytes Current size: 51 bytes - Capacity: 256 bytes + Capacity: 1024 bytes Consuming 7 bytes... Buffer statistics: - Total prepared: 768 bytes + Total prepared: 512 bytes Total committed: 51 bytes Total consumed: 7 bytes Current size: 44 bytes - Capacity: 249 bytes + Capacity: 1017 bytes ---- == Exercises diff --git a/doc/modules/ROOT/pages/examples/mock-stream-testing.adoc b/doc/modules/ROOT/pages/examples/mock-stream-testing.adoc index 61f30911..7d9c67c2 100644 --- a/doc/modules/ROOT/pages/examples/mock-stream-testing.adoc +++ b/doc/modules/ROOT/pages/examples/mock-stream-testing.adoc @@ -18,16 +18,18 @@ Unit testing protocol code with mock streams and error injection. 
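One change in the example below is that the echo path now loops on `write_some` until every byte has been written, instead of assuming a single call suffices. The short-write loop is independent of any I/O library; the sketch below uses a made-up `write_all` helper and a 3-byte `short_writer` stand-in rather than Capy's stream API.

[source,cpp]
----
#include <algorithm>
#include <cstddef>
#include <iostream>
#include <string>
#include <string_view>

// Keep calling a short-writing function until the whole message is accepted.
// WriteSome is any callable taking the remaining bytes and returning how many
// it consumed -- a stand-in for a stream's write_some().
template <class WriteSome>
std::size_t write_all(WriteSome&& write_some, std::string_view msg)
{
    std::size_t written = 0;
    while (written < msg.size())
    {
        std::size_t n = write_some(msg.substr(written));  // may be partial
        if (n == 0)
            break;      // no progress: stop (real code would report an error)
        written += n;
    }
    return written;
}

int main()
{
    std::string sink;

    // Simulated sink that accepts at most 3 bytes per call
    auto short_writer = [&sink](std::string_view chunk) {
        std::size_t n = std::min<std::size_t>(3, chunk.size());
        sink.append(chunk.substr(0, n));
        return n;
    };

    std::size_t n = write_all(short_writer, "HELLO\n");
    std::cout << "wrote " << n << " bytes: " << sink;   // wrote 6 bytes: HELLO
}
----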
[source,cpp] ---- #include -#include -#include +#include #include -#include +#include +#include #include #include +#include using namespace boost::capy; // A simple protocol: read until newline, echo back uppercase +// Takes any_stream& so the function is transport-independent task echo_line_uppercase(any_stream& stream) { std::string line; @@ -36,10 +38,15 @@ task echo_line_uppercase(any_stream& stream) // Read character by character until newline while (true) { + // ec: std::error_code, n: std::size_t auto [ec, n] = co_await stream.read_some(mutable_buffer(&c, 1)); - if (ec.failed()) + if (ec) + { + if (ec == cond::eof) + break; co_return false; + } if (c == '\n') break; @@ -49,25 +56,40 @@ task echo_line_uppercase(any_stream& stream) line += '\n'; - // Echo uppercase - auto [ec, n] = co_await write(stream, make_buffer(line)); + // Echo uppercase - loop until all bytes written + std::size_t written = 0; // std::size_t - total bytes written + while (written < line.size()) + { + // wec: std::error_code, wn: std::size_t + auto [wec, wn] = co_await stream.write_some( + const_buffer(line.data() + written, line.size() - written)); + + if (wec) + co_return false; + + written += wn; + } - co_return !ec.failed(); + co_return true; } void test_happy_path() { std::cout << "Test: happy path\n"; - test::stream mock; + // Use fuse in disarmed mode (no error injection) for happy path + test::fuse f; // test::fuse + test::stream mock(f); // test::stream mock.provide("hello\n"); - any_stream stream{mock}; + // Wrap mock in any_stream using pointer construction for reference semantics + any_stream stream{&mock}; // any_stream - bool result = test::run_blocking(echo_line_uppercase(stream)); + bool result = false; // bool + test::run_blocking([&](bool r) { result = r; })(echo_line_uppercase(stream)); assert(result == true); - assert(mock.output() == "HELLO\n"); + assert(mock.data() == "HELLO\n"); std::cout << " PASSED\n"; } @@ -76,16 +98,20 @@ void test_partial_reads() { std::cout << "Test: partial reads (1 byte at a time)\n"; + // Use fuse in disarmed mode (no error injection) + test::fuse f; // test::fuse // Mock returns at most 1 byte per read_some - test::stream mock(1); // max_read_size = 1 + test::stream mock(f, 1); // test::stream, max_read_size = 1 mock.provide("hi\n"); - any_stream stream{mock}; + // Wrap mock in any_stream using pointer construction for reference semantics + any_stream stream{&mock}; // any_stream - bool result = test::run_blocking(echo_line_uppercase(stream)); + bool result = false; // bool + test::run_blocking([&](bool r) { result = r; })(echo_line_uppercase(stream)); assert(result == true); - assert(mock.output() == "HI\n"); + assert(mock.data() == "HI\n"); std::cout << " PASSED\n"; } @@ -94,27 +120,43 @@ void test_with_error_injection() { std::cout << "Test: error injection\n"; + int success_count = 0; + int error_count = 0; + // fuse::armed runs the test repeatedly, failing at each // operation point until all paths are covered - test::fuse::armed([](test::fuse& f) { - test::stream mock; + test::fuse f; // test::fuse + auto r = f.armed([&](test::fuse&) -> task<> { // fuse::result + test::stream mock(f); // test::stream mock.provide("test\n"); - // Associate fuse with mock for error injection - mock.set_fuse(&f); - - any_stream stream{mock}; + // Wrap mock in any_stream using pointer construction for reference semantics + any_stream stream{&mock}; // any_stream // Run the protocol - fuse will inject errors at each step - auto result = 
test::run_blocking(echo_line_uppercase(stream)); + bool result = co_await echo_line_uppercase(stream); // bool // Either succeeds with correct output, or fails cleanly if (result) { - f.expect(mock.output() == "TEST\n"); + ++success_count; + assert(mock.data() == "TEST\n"); + } + else + { + ++error_count; } }); + // Verify that fuse testing exercised both paths + std::cout << " Runs: " << (success_count + error_count) + << " (success=" << success_count + << ", error=" << error_count << ")\n"; + + assert(r.success); + assert(success_count > 0); // At least one successful run + assert(error_count > 0); // At least one error-injected run + std::cout << " PASSED (all error paths tested)\n"; } @@ -143,31 +185,45 @@ target_link_libraries(mock_stream_testing PRIVATE capy) [source,cpp] ---- -test::stream mock; +test::fuse f; // test::fuse +test::stream mock(f); // test::stream mock.provide("hello\n"); ---- `test::stream` is a bidirectional mock that satisfies both `ReadStream` and `WriteStream`: +* Constructor takes a `fuse&` for error injection * `provide(data)` — Supplies data for reads -* `output()` — Returns data written to the mock -* Constructor parameter controls max bytes per operation +* `data()` — Returns data written to the mock +* Second constructor parameter controls max bytes per operation + +=== Type-Erased Streams + +[source,cpp] +---- +// Wrap mock in any_stream using pointer construction for reference semantics +any_stream stream{&mock}; // any_stream +---- + +Use pointer construction (`&mock`) so the `any_stream` wrapper references the mock without taking ownership. This allows inspecting `mock.data()` after operations. === Synchronous Testing [source,cpp] ---- -bool result = test::run_blocking(echo_line_uppercase(stream)); +bool result = false; // bool +test::run_blocking([&](bool r) { result = r; })(echo_line_uppercase(stream)); ---- -`run_blocking` executes a coroutine synchronously, blocking until complete. This enables traditional unit test patterns with coroutines. +`run_blocking` executes a coroutine synchronously, blocking until complete. Pass a handler to capture the result. === Error Injection [source,cpp] ---- -test::fuse::armed([](test::fuse& f) { - mock.set_fuse(&f); +test::fuse f; // test::fuse +auto r = f.armed([&](test::fuse&) -> task<> { + test::stream mock(f); // test::stream // ... run test ... }); ---- @@ -188,6 +244,7 @@ Test: happy path Test: partial reads (1 byte at a time) PASSED Test: error injection + Runs: 9 (success=2, error=7) PASSED (all error paths tested) All tests passed! diff --git a/doc/modules/ROOT/pages/examples/parallel-fetch.adoc b/doc/modules/ROOT/pages/examples/parallel-fetch.adoc index 38fef0ba..40b7f262 100644 --- a/doc/modules/ROOT/pages/examples/parallel-fetch.adoc +++ b/doc/modules/ROOT/pages/examples/parallel-fetch.adoc @@ -19,6 +19,7 @@ Running multiple operations concurrently with `when_all`. 
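The revised `main()` below keeps the process alive with a `std::latch` that each completion handler counts down. That pattern is plain C++20 and can be seen without any coroutine machinery; in the sketch below, `std::jthread` workers stand in for the `run_async` launches.

[source,cpp]
----
#include <iostream>
#include <latch>
#include <string>
#include <thread>

int main()
{
    std::latch done(3);                          // expect three completions

    auto worker = [&done](int id) {
        // One cout call per line so concurrent output stays readable
        std::cout << ("task " + std::to_string(id) + " finished\n");
        done.count_down();                       // signal this task's completion
    };

    std::jthread t1(worker, 1);
    std::jthread t2(worker, 2);
    std::jthread t3(worker, 3);

    done.wait();                                 // block until all three signal
    std::cout << "all tasks complete\n";
}
----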
---- #include #include +#include #include using namespace boost::capy; @@ -28,7 +29,7 @@ task fetch_user_id(std::string username) { std::cout << "Fetching user ID for: " << username << "\n"; // In real code: co_await http_get("/users/" + username); - co_return username.length() * 100; // Fake ID + co_return static_cast(username.length()) * 100; // Fake ID } task fetch_user_name(int id) @@ -60,6 +61,7 @@ task<> fetch_user_dashboard(std::string username) // Now fetch all user data in parallel std::cout << "Starting parallel fetches...\n"; + // name: std::string, orders: int, balance: double auto [name, orders, balance] = co_await when_all( fetch_user_name(user_id), fetch_order_count(user_id), @@ -90,11 +92,12 @@ task fetch_with_side_effects() std::cout << "\n=== Fetch with side effects ===\n"; // void tasks don't contribute to result tuple - auto [data] = co_await when_all( + std::tuple results = co_await when_all( log_access("api/data"), // void - no result update_metrics("api_calls"), // void - no result fetch_user_name(42) // returns string ); + std::string data = std::get<0>(results); // std::string std::cout << "Data: " << data << "\n"; co_return data; @@ -120,6 +123,7 @@ task<> demonstrate_error_handling() try { + // a: int, b: int, c: int auto [a, b, c] = co_await when_all( might_fail(false, "A"), might_fail(true, "B"), // This one fails @@ -139,10 +143,18 @@ int main() { thread_pool pool; - run_async(pool.get_executor())(fetch_user_dashboard("alice")); - run_async(pool.get_executor())(fetch_with_side_effects()); - run_async(pool.get_executor())(demonstrate_error_handling()); + std::latch done(3); // std::latch - wait for 3 tasks + // Completion handlers signal the latch when each task finishes + // Use generic lambda to accept any result type (or no result for task) + auto on_complete = [&done](auto&&...) { done.count_down(); }; + auto on_error = [&done](std::exception_ptr) { done.count_down(); }; + + run_async(pool.get_executor(), on_complete, on_error)(fetch_user_dashboard("alice")); + run_async(pool.get_executor(), on_complete, on_error)(fetch_with_side_effects()); + run_async(pool.get_executor(), on_complete, on_error)(demonstrate_error_handling()); + + done.wait(); // Block until all tasks complete return 0; } ---- @@ -174,11 +186,12 @@ All three tasks run concurrently. `when_all` completes when all tasks finish. Re [source,cpp] ---- -auto [data] = co_await when_all( +std::tuple results = co_await when_all( log_access("api/data"), // void - filtered out update_metrics("api_calls"), // void - filtered out fetch_user_name(42) // string - in tuple ); +std::string data = std::get<0>(results); // std::string ---- Tasks returning `void` don't contribute to the result tuple. Only non-void results appear. diff --git a/doc/modules/ROOT/pages/examples/producer-consumer.adoc b/doc/modules/ROOT/pages/examples/producer-consumer.adoc index 8b0dcfd4..3b6de961 100644 --- a/doc/modules/ROOT/pages/examples/producer-consumer.adoc +++ b/doc/modules/ROOT/pages/examples/producer-consumer.adoc @@ -1,11 +1,12 @@ = Producer-Consumer -Two tasks communicating via an async event. +Two tasks communicating via an async event, with strand serialization. == What You Will Learn * Using `async_event` for coroutine synchronization * Running multiple concurrent tasks with `when_all` +* Using `strand` to serialize access to shared state * Task-to-task communication patterns == Prerequisites @@ -18,46 +19,49 @@ Two tasks communicating via an async event. 
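The code below signals readiness through `async_event`. The same one-shot handshake can be expressed with standard facilities alone; in this sketch `std::promise<void>`/`std::future<void>` stands in for the event and two `std::jthread`s stand in for the producer and consumer tasks — none of these stand-ins come from the example itself.

[source,cpp]
----
#include <future>
#include <iostream>
#include <thread>

int main()
{
    std::promise<void> data_ready;          // one-shot signal
    std::future<void> signalled = data_ready.get_future();
    int shared_value = 0;

    std::jthread producer([&] {
        shared_value = 42;                  // publish the data first...
        data_ready.set_value();             // ...then signal
    });

    std::jthread consumer([&] {
        signalled.wait();                   // block until the producer signals
        std::cout << "received " << shared_value << "\n";
    });
}                                           // both jthreads join here
----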
[source,cpp] ---- #include +#include #include +#include using namespace boost::capy; -async_event data_ready; -int shared_value = 0; - -task<> producer() -{ - std::cout << "Producer: preparing data...\n"; - - // Simulate work - shared_value = 42; - - std::cout << "Producer: data ready, signaling\n"; - data_ready.set(); - - co_return; -} - -task<> consumer() -{ - std::cout << "Consumer: waiting for data...\n"; - - co_await data_ready.wait(); - - std::cout << "Consumer: received value " << shared_value << "\n"; - - co_return; -} - -task<> run_both() -{ - co_await when_all(producer(), consumer()); -} - int main() { - thread_pool pool; - run_async(pool.get_executor())(run_both()); + thread_pool pool; // thread_pool + strand s{pool.get_executor()}; // strand - serializes execution + std::latch done(1); // std::latch - wait for completion + + auto on_complete = [&done](auto&&...) { done.count_down(); }; // lambda + auto on_error = [&done](std::exception_ptr) { done.count_down(); }; // lambda + + async_event data_ready; // async_event + int shared_value = 0; // int + + auto producer = [&]() -> task<> { + std::cout << "Producer: preparing data...\n"; + shared_value = 42; + std::cout << "Producer: data ready, signaling\n"; + data_ready.set(); + co_return; + }; + + auto consumer = [&]() -> task<> { + std::cout << "Consumer: waiting for data...\n"; + co_await data_ready.wait(); + std::cout << "Consumer: received value " << shared_value << "\n"; + co_return; + }; + + // Run both tasks concurrently using when_all, through a strand. + // The strand serializes execution, ensuring thread-safe access + // to the shared async_event and shared_value. + auto run_both = [&]() -> task<> { + co_await when_all(producer(), consumer()); + }; + + run_async(s, on_complete, on_error)(run_both()); + + done.wait(); // Block until tasks complete return 0; } ---- @@ -72,11 +76,20 @@ target_link_libraries(producer_consumer PRIVATE capy) == Walkthrough +=== The Strand + +[source,cpp] +---- +strand s{pool.get_executor()}; // strand - serializes execution +---- + +A `strand` is an executor adaptor that serializes execution. All coroutines dispatched through a strand are guaranteed not to run concurrently, making it safe to access shared state without explicit locking. Note that `async_event` is not thread-safe, so using a strand ensures safe access. + === The Event [source,cpp] ---- -async_event data_ready; +async_event data_ready; // async_event ---- `async_event` is a one-shot signaling mechanism. One task can `set()` it; other tasks can `wait()` for it. When set, all waiting tasks resume. @@ -85,12 +98,13 @@ async_event data_ready; [source,cpp] ---- -task<> producer() -{ +auto producer = [&]() -> task<> { + std::cout << "Producer: preparing data...\n"; shared_value = 42; + std::cout << "Producer: data ready, signaling\n"; data_ready.set(); co_return; -} +}; ---- The producer prepares data and signals completion by calling `set()`. @@ -99,12 +113,12 @@ The producer prepares data and signals completion by calling `set()`. [source,cpp] ---- -task<> consumer() -{ +auto consumer = [&]() -> task<> { + std::cout << "Consumer: waiting for data...\n"; co_await data_ready.wait(); std::cout << "Consumer: received value " << shared_value << "\n"; co_return; -} +}; ---- The consumer waits until the event is set. The `co_await data_ready.wait()` suspends until `set()` is called. @@ -113,22 +127,37 @@ The consumer waits until the event is set. 
The `co_await data_ready.wait()` susp [source,cpp] ---- -task<> run_both() -{ +// Run both tasks concurrently using when_all, through a strand. +// The strand serializes execution, ensuring thread-safe access +// to the shared async_event and shared_value. +auto run_both = [&]() -> task<> { co_await when_all(producer(), consumer()); -} +}; + +run_async(s, on_complete, on_error)(run_both()); ---- -`when_all` runs both tasks concurrently. It completes when both tasks have finished. +`when_all` runs both tasks concurrently within the same parent coroutine context, but the strand ensures they don't run at the same time on different threads. The producer signals `data_ready` when the value is set, and the consumer waits for the signal before reading. + +=== Completion Synchronization + +[source,cpp] +---- +std::latch done(1); // std::latch - wait for completion +auto on_complete = [&done](auto&&...) { done.count_down(); }; +auto on_error = [&done](std::exception_ptr) { done.count_down(); }; +// ... +done.wait(); // Block until tasks complete +---- -The order of execution depends on scheduling, but synchronization ensures the consumer sees the producer's data. +The `std::latch` ensures `main()` waits for the tasks to complete before returning. == Output ---- Producer: preparing data... -Consumer: waiting for data... Producer: data ready, signaling +Consumer: waiting for data... Consumer: received value 42 ---- diff --git a/doc/modules/ROOT/pages/examples/stream-pipeline.adoc b/doc/modules/ROOT/pages/examples/stream-pipeline.adoc index 7aa2ba9c..d95462c8 100644 --- a/doc/modules/ROOT/pages/examples/stream-pipeline.adoc +++ b/doc/modules/ROOT/pages/examples/stream-pipeline.adoc @@ -17,23 +17,38 @@ Data transformation through a pipeline of sources and sinks. [source,cpp] ---- +// +// Stream Pipeline Example +// +// This example demonstrates chaining buffer sources to create a data +// processing pipeline. Data flows through transform stages: +// +// input -> uppercase_transform -> line_numbering_transform -> output +// +// Each transform is a BufferSource that wraps an upstream any_buffer_source, +// enabling type-erased composition of arbitrary transform chains. +// + #include #include #include -#include +#include +#include +#include #include #include #include +#include using namespace boost::capy; // A transform stage that converts to uppercase class uppercase_transform { - any_buffer_source* source_; - std::vector buffer_; - std::size_t offset_ = 0; - bool exhausted_ = false; + any_buffer_source* source_; // any_buffer_source* + std::vector buffer_; // std::vector + std::size_t consumed_ = 0; // std::size_t + bool exhausted_ = false; // bool public: explicit uppercase_transform(any_buffer_source& source) @@ -41,61 +56,88 @@ public: { } - // BufferSource interface - io_result pull(const_buffer* arr, std::size_t max_count) + // BufferSource::consume - advance past processed bytes + void consume(std::size_t n) noexcept { - if (exhausted_ && offset_ >= buffer_.size()) - co_return {error_code{}, 0}; // Exhausted - - // Need more data? - if (offset_ >= buffer_.size()) + consumed_ += n; + if (consumed_ >= buffer_.size()) { buffer_.clear(); - offset_ = 0; - - // Pull from upstream - const_buffer upstream[8]; - auto [ec, count] = co_await source_->pull(upstream, 8); - - if (ec.failed()) - co_return {ec, 0}; + consumed_ = 0; + } + } + + // BufferSource::pull - returns task<> to enable co_await on upstream + task>> + pull(std::span dest) + { + // Already have unconsumed data? 
+ if (consumed_ < buffer_.size()) + { + if (dest.empty()) + co_return {std::error_code{}, std::span{}}; - if (count == 0) - { - exhausted_ = true; - co_return {error_code{}, 0}; - } + dest[0] = const_buffer( + buffer_.data() + consumed_, + buffer_.size() - consumed_); + co_return {std::error_code{}, dest.first(1)}; + } + + // Upstream exhausted? + if (exhausted_) + co_return {std::error_code{}, std::span{}}; + + // Pull from upstream + buffer_.clear(); + consumed_ = 0; + + const_buffer upstream[8]; // const_buffer[8] + // ec: std::error_code, bufs: std::span + auto [ec, bufs] = co_await source_->pull(upstream); + + if (ec) + co_return {ec, std::span{}}; + + if (bufs.empty()) + { + exhausted_ = true; + co_return {std::error_code{}, std::span{}}; + } + + // Transform: uppercase each byte + for (auto const& buf : bufs) // const_buffer const& + { + auto const* data = static_cast(buf.data()); // char const* + auto size = buf.size(); // std::size_t - // Transform: copy and uppercase - for (std::size_t i = 0; i < count; ++i) + for (std::size_t i = 0; i < size; ++i) { - auto data = static_cast(upstream[i].data()); - auto size = upstream[i].size(); - - for (std::size_t j = 0; j < size; ++j) - { - buffer_.push_back(static_cast( - std::toupper(static_cast(data[j])))); - } + buffer_.push_back(static_cast( + std::toupper(static_cast(data[i])))); } } - // Return our buffer - arr[0] = const_buffer(buffer_.data() + offset_, buffer_.size() - offset_); - offset_ = buffer_.size(); // Mark as consumed + // Consume from upstream + source_->consume(buffer_size(bufs)); - co_return {error_code{}, 1}; + // Return transformed data + if (dest.empty() || buffer_.empty()) + co_return {std::error_code{}, std::span{}}; + + dest[0] = const_buffer(buffer_.data(), buffer_.size()); + co_return {std::error_code{}, dest.first(1)}; } }; // A transform that adds line numbers class line_numbering_transform { - any_buffer_source* source_; - std::string buffer_; - std::size_t line_num_ = 1; - bool exhausted_ = false; - bool at_line_start_ = true; + any_buffer_source* source_; // any_buffer_source* + std::string buffer_; // std::string + std::size_t consumed_ = 0; // std::size_t + std::size_t line_num_ = 1; // std::size_t + bool at_line_start_ = true; // bool + bool exhausted_ = false; // bool public: explicit line_numbering_transform(any_buffer_source& source) @@ -103,81 +145,111 @@ public: { } - io_result pull(const_buffer* arr, std::size_t max_count) + void consume(std::size_t n) noexcept { - if (exhausted_ && buffer_.empty()) - co_return {error_code{}, 0}; - - // Pull more data if needed - if (buffer_.empty()) + consumed_ += n; + if (consumed_ >= buffer_.size()) { - const_buffer upstream[8]; - auto [ec, count] = co_await source_->pull(upstream, 8); - - if (ec.failed()) - co_return {ec, 0}; + buffer_.clear(); + consumed_ = 0; + } + } + + task>> + pull(std::span dest) + { + if (consumed_ < buffer_.size()) + { + if (dest.empty()) + co_return {std::error_code{}, std::span{}}; - if (count == 0) - { - exhausted_ = true; - co_return {error_code{}, 0}; - } + dest[0] = const_buffer( + buffer_.data() + consumed_, + buffer_.size() - consumed_); + co_return {std::error_code{}, dest.first(1)}; + } + + if (exhausted_) + co_return {std::error_code{}, std::span{}}; + + buffer_.clear(); + consumed_ = 0; + + const_buffer upstream[8]; // const_buffer[8] + // ec: std::error_code, bufs: std::span + auto [ec, bufs] = co_await source_->pull(upstream); + + if (ec) + co_return {ec, std::span{}}; + + if (bufs.empty()) + { + exhausted_ = true; + 
co_return {std::error_code{}, std::span{}}; + } + + // Transform: add line numbers + for (auto const& buf : bufs) // const_buffer const& + { + auto const* data = static_cast(buf.data()); // char const* + auto size = buf.size(); // std::size_t - // Transform: add line numbers - for (std::size_t i = 0; i < count; ++i) + for (std::size_t i = 0; i < size; ++i) { - auto data = static_cast(upstream[i].data()); - auto size = upstream[i].size(); - - for (std::size_t j = 0; j < size; ++j) + if (at_line_start_) { - if (at_line_start_) - { - buffer_ += std::to_string(line_num_++) + ": "; - at_line_start_ = false; - } - - buffer_ += data[j]; - - if (data[j] == '\n') - at_line_start_ = true; + buffer_ += std::to_string(line_num_++) + ": "; + at_line_start_ = false; } + buffer_ += data[i]; + if (data[i] == '\n') + at_line_start_ = true; } } - arr[0] = const_buffer(buffer_.data(), buffer_.size()); - buffer_.clear(); + source_->consume(buffer_size(bufs)); - co_return {error_code{}, 1}; + if (dest.empty() || buffer_.empty()) + co_return {std::error_code{}, std::span{}}; + + dest[0] = const_buffer(buffer_.data(), buffer_.size()); + co_return {std::error_code{}, dest.first(1)}; } }; // Transfer from source to sink task transfer(any_buffer_source& source, any_write_sink& sink) { - std::size_t total = 0; - const_buffer bufs[8]; + std::size_t total = 0; // std::size_t + const_buffer bufs[8]; // const_buffer[8] for (;;) { - auto [ec, count] = co_await source.pull(bufs, 8); + // ec: std::error_code, spans: std::span + auto [ec, spans] = co_await source.pull(bufs); - if (ec.failed()) + if (ec) throw std::system_error(ec); - if (count == 0) + if (spans.empty()) break; - for (std::size_t i = 0; i < count; ++i) + for (auto const& buf : spans) // const_buffer const& { - auto [wec, n] = co_await sink.write(bufs[i]); - if (wec.failed()) + // wec: std::error_code, n: std::size_t + auto [wec, n] = co_await sink.write(buf); + if (wec) throw std::system_error(wec); total += n; } + + source.consume(buffer_size(spans)); } - co_await sink.write_eof(); + io_result<> eof_result = co_await sink.write_eof(); + if (eof_result.ec) + throw std::system_error(eof_result.ec); + co_return total; } @@ -185,31 +257,34 @@ void demo_pipeline() { std::cout << "=== Stream Pipeline Demo ===\n\n"; - // Source data + // Input data std::string input = "hello world\nthis is a test\nof the pipeline\n"; std::cout << "Input:\n" << input << "\n"; - // Create source from string - test::buffer_source source; + // Create mock source with input data + test::fuse f; // test::fuse + test::buffer_source source(f); // test::buffer_source source.provide(input); - source.provide_eof(); - // Wrap as any_buffer_source - any_buffer_source src{source}; + // Build the pipeline using type-erased buffer sources. + // Using pointer construction (&source) for reference semantics - + // the wrapper does not take ownership, so source must outlive src. 
+ any_buffer_source src{&source}; // any_buffer_source - // Create transform stages - uppercase_transform upper{src}; - any_buffer_source upper_src{upper}; + uppercase_transform upper{src}; // uppercase_transform + any_buffer_source upper_src{&upper}; // any_buffer_source - line_numbering_transform numbered{upper_src}; - any_buffer_source numbered_src{numbered}; + line_numbering_transform numbered{upper_src}; // line_numbering_transform + any_buffer_source numbered_src{&numbered}; // any_buffer_source - // Create sink - test::write_sink sink; - any_write_sink dst{sink}; + // Create sink - pointer construction ensures sink outlives dst + test::write_sink sink(f); // test::write_sink + any_write_sink dst{&sink}; // any_write_sink // Run pipeline - auto bytes = test::run_blocking(transfer(numbered_src, dst)); + std::size_t bytes = 0; // std::size_t + test::run_blocking([&](std::size_t n) { bytes = n; })( + transfer(numbered_src, dst)); std::cout << "Output (" << bytes << " bytes):\n"; std::cout << sink.data() << "\n"; @@ -217,7 +292,15 @@ void demo_pipeline() int main() { - demo_pipeline(); + try + { + demo_pipeline(); + } + catch (std::system_error const& e) + { + std::cerr << "Pipeline error: " << e.what() << "\n"; + return 1; + } return 0; } ---- @@ -249,32 +332,43 @@ Data flows through the pipeline: [source,cpp] ---- -io_result pull(const_buffer* arr, std::size_t max_count) +task>> +pull(std::span dest) { // Pull from upstream - auto [ec, count] = co_await source_->pull(upstream, 8); + // ec: std::error_code, bufs: std::span + auto [ec, bufs] = co_await source_->pull(upstream); - // Transform data - // ... + // Transform data... + + // Consume from upstream + source_->consume(buffer_size(bufs)); // Return transformed buffer - arr[0] = const_buffer(buffer_.data(), buffer_.size()); - co_return {error_code{}, 1}; + dest[0] = const_buffer(buffer_.data(), buffer_.size()); + co_return {std::error_code{}, dest.first(1)}; } ---- -Each stage pulls from upstream, transforms, and provides output buffers. +Each stage: + +1. Pulls buffers from upstream using `co_await` +2. Transforms the data +3. Calls `consume()` on upstream to indicate bytes processed +4. Returns transformed buffers -=== Type Erasure +=== Type Erasure with Pointer Construction [source,cpp] ---- -any_buffer_source src{source}; -uppercase_transform upper{src}; -any_buffer_source upper_src{upper}; +// Using pointer construction (&source) for reference semantics +any_buffer_source src{&source}; // any_buffer_source + +uppercase_transform upper{src}; // uppercase_transform +any_buffer_source upper_src{&upper}; // any_buffer_source ---- -`any_buffer_source` wraps each stage, allowing uniform composition. +`any_buffer_source` wraps each stage using pointer construction, allowing uniform composition while preserving the lifetime of the underlying objects. == Output @@ -286,7 +380,7 @@ hello world this is a test of the pipeline -Output (63 bytes): +Output (52 bytes): 1: HELLO WORLD 2: THIS IS A TEST 3: OF THE PIPELINE diff --git a/doc/modules/ROOT/pages/examples/timeout-cancellation.adoc b/doc/modules/ROOT/pages/examples/timeout-cancellation.adoc index b3875718..302c75b6 100644 --- a/doc/modules/ROOT/pages/examples/timeout-cancellation.adoc +++ b/doc/modules/ROOT/pages/examples/timeout-cancellation.adoc @@ -20,8 +20,9 @@ Using stop tokens to implement operation timeouts. 
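The coroutines in this example obtain a `std::stop_token` from their caller and poll `stop_requested()` between steps. The token mechanics are standard C++20 and can be demonstrated without coroutines at all; the sketch below drives a plain `std::jthread` worker and is not tied to Capy.

[source,cpp]
----
#include <chrono>
#include <iostream>
#include <stop_token>
#include <thread>

int main()
{
    using namespace std::chrono_literals;

    // std::jthread passes its stop_token to a callable that accepts one
    std::jthread worker([](std::stop_token token) {
        for (int step = 0; step < 10; ++step)
        {
            if (token.stop_requested())
            {
                std::cout << "cancelled at step " << step << "\n";
                return;
            }
            std::cout << "completed step " << step << "\n";
            std::this_thread::sleep_for(20ms);
        }
    });

    std::this_thread::sleep_for(50ms);   // simulate a timeout deadline
    worker.request_stop();               // cooperative cancellation request
}   // the jthread joins here, after the worker observes the stop request
----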
#include #include #include -#include #include +#include +#include #include using namespace boost::capy; @@ -29,7 +30,7 @@ using namespace boost::capy; // A slow operation that respects cancellation task slow_fetch(int steps) { - auto token = co_await get_stop_token(); + auto token = co_await this_coro::stop_token; // std::stop_token std::string result; for (int i = 0; i < steps; ++i) @@ -37,15 +38,19 @@ task slow_fetch(int steps) // Check cancellation before each step if (token.stop_requested()) { - std::cout << " Cancelled at step " << i << "\n"; + std::cout << " Cancelled at step " << i << std::endl; throw std::system_error( make_error_code(std::errc::operation_canceled)); } result += "step" + std::to_string(i) + " "; - // Simulate work (in real code, this would be I/O) - std::cout << " Completed step " << i << "\n"; + // Simulate slow work (in real code, this would be I/O) + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + std::cout << " Completed step " << i << std::endl; + + // Yield to allow stop request to be processed before next check + std::this_thread::sleep_for(std::chrono::milliseconds(15)); } co_return result; @@ -54,11 +59,11 @@ task slow_fetch(int steps) // Run with timeout (conceptual - real implementation needs timer) task> fetch_with_timeout() { - auto token = co_await get_stop_token(); + auto token = co_await this_coro::stop_token; // std::stop_token try { - auto result = co_await slow_fetch(5); + auto result = co_await slow_fetch(5); // std::string co_return result; } catch (std::system_error const& e) @@ -75,15 +80,20 @@ void demo_normal_completion() thread_pool pool; std::stop_source source; + std::latch done(1); // std::latch - wait for 1 task run_async(pool.get_executor(), source.get_token(), - [](std::optional result) { + [&done](std::optional result) { if (result) std::cout << "Result: " << *result << "\n"; else std::cout << "Cancelled\n"; - } + done.count_down(); + }, + [&done](std::exception_ptr) { done.count_down(); } )(fetch_with_timeout()); + + done.wait(); // Block until task completes } void demo_cancellation() @@ -92,30 +102,37 @@ void demo_cancellation() thread_pool pool; std::stop_source source; + std::latch done(1); // std::latch - wait for 1 task // Launch the task run_async(pool.get_executor(), source.get_token(), - [](std::optional result) { + [&done](std::optional result) { if (result) std::cout << "Result: " << *result << "\n"; else std::cout << "Cancelled (returned nullopt)\n"; - } + done.count_down(); + }, + [&done](std::exception_ptr) { done.count_down(); } )(fetch_with_timeout()); - // Simulate timeout: cancel after brief delay - std::this_thread::sleep_for(std::chrono::milliseconds(10)); - std::cout << " Requesting stop...\n"; + // Simulate timeout: cancel after 2 steps complete + // Timing: each step is 10ms work + 15ms yield = 25ms total + // Stop at 42ms: after step 1 print, before step 2 check + std::this_thread::sleep_for(std::chrono::milliseconds(42)); + std::cout << " Requesting stop..." 
<< std::endl; source.request_stop(); + + done.wait(); // Block until task completes (after cancellation) } // Example: Manual stop token checking task process_items(std::vector const& items) { - auto token = co_await get_stop_token(); + auto token = co_await this_coro::stop_token; // std::stop_token int sum = 0; - for (auto item : items) + for (auto item : items) // int { if (token.stop_requested()) { @@ -152,10 +169,10 @@ target_link_libraries(timeout_cancellation PRIVATE capy) [source,cpp] ---- -auto token = co_await get_stop_token(); +auto token = co_await this_coro::stop_token; // std::stop_token ---- -Inside a task, `get_stop_token()` retrieves the stop token propagated from the caller. +Inside a task, `this_coro::stop_token` retrieves the stop token propagated from the caller. === Checking for Cancellation diff --git a/doc/modules/ROOT/pages/examples/type-erased-echo.adoc b/doc/modules/ROOT/pages/examples/type-erased-echo.adoc index 53f7c87b..0527599c 100644 --- a/doc/modules/ROOT/pages/examples/type-erased-echo.adoc +++ b/doc/modules/ROOT/pages/examples/type-erased-echo.adoc @@ -42,6 +42,8 @@ boost::capy::task<> echo_session(boost::capy::any_stream& stream); #include "echo.hpp" #include #include +#include +#include namespace myapp { @@ -54,18 +56,20 @@ task<> echo_session(any_stream& stream) for (;;) { // Read some data - auto [ec, n] = co_await stream.read_some(mutable_buffer(buffer)); + // ec: std::error_code, n: std::size_t + auto [ec, n] = co_await stream.read_some(make_buffer(buffer)); if (ec == cond::eof) co_return; // Client closed connection - if (ec.failed()) + if (ec) throw std::system_error(ec); // Echo it back + // wec: std::error_code, wn: std::size_t auto [wec, wn] = co_await write(stream, const_buffer(buffer, n)); - if (wec.failed()) + if (wec) throw std::system_error(wec); } } @@ -80,6 +84,7 @@ task<> echo_session(any_stream& stream) #include "echo.hpp" #include #include +#include #include #include @@ -87,27 +92,32 @@ using namespace boost::capy; void test_with_mock() { - test::stream mock; + test::fuse f; + test::stream mock(f); mock.provide("Hello, "); mock.provide("World!\n"); - mock.provide_eof(); + // Stream returns eof when no more data is available - any_stream stream{mock}; - test::run_blocking(myapp::echo_session(stream)); + // Using pointer construction (&mock) for reference semantics - the + // wrapper does not take ownership, so mock must outlive stream. + any_stream stream{&mock}; // any_stream + test::run_blocking()(myapp::echo_session(stream)); - std::cout << "Echo output: " << mock.output() << "\n"; + std::cout << "Echo output: " << mock.data() << "\n"; } +// With real sockets (using Corosio), you would write: +// +// task<> handle_client(corosio::tcp::socket socket) +// { +// // Value construction moves socket into wrapper (transfers ownership) +// any_stream stream{std::move(socket)}; +// co_await myapp::echo_session(stream); +// } + int main() { test_with_mock(); - - // With real sockets (using Corosio): - // tcp::socket socket; - // ... accept connection ... - // any_stream stream{socket}; - // co_await myapp::echo_session(stream); - return 0; } ---- diff --git a/example/CMakeLists.txt b/example/CMakeLists.txt new file mode 100644 index 00000000..5ae31913 --- /dev/null +++ b/example/CMakeLists.txt @@ -0,0 +1,23 @@ +# +# Copyright (c) 2026 Mungo Gill +# +# Distributed under the Boost Software License, Version 1.0. 
(See accompanying +# file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +# +# Official repository: https://github.com/cppalliance/capy +# + +add_subdirectory(hello-task) +add_subdirectory(producer-consumer) +add_subdirectory(buffer-composition) +add_subdirectory(mock-stream-testing) +add_subdirectory(type-erased-echo) +add_subdirectory(timeout-cancellation) +add_subdirectory(parallel-fetch) +add_subdirectory(custom-dynamic-buffer) +add_subdirectory(stream-pipeline) + +# Requires Corosio dependency +if(TARGET Boost::corosio) + add_subdirectory(echo-server-corosio) +endif() diff --git a/example/Jamfile b/example/Jamfile new file mode 100644 index 00000000..a3a08279 --- /dev/null +++ b/example/Jamfile @@ -0,0 +1,18 @@ +# +# Copyright (c) 2026 Mungo Gill +# +# Distributed under the Boost Software License, Version 1.0. (See accompanying +# file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +# +# Official repository: https://github.com/cppalliance/capy +# + +build-project hello-task ; +build-project producer-consumer ; +build-project buffer-composition ; +build-project mock-stream-testing ; +build-project type-erased-echo ; +build-project timeout-cancellation ; +build-project parallel-fetch ; +build-project custom-dynamic-buffer ; +build-project stream-pipeline ; diff --git a/example/README.md b/example/README.md new file mode 100644 index 00000000..09e846e5 --- /dev/null +++ b/example/README.md @@ -0,0 +1,60 @@ +# Boost.Capy Examples + +This directory contains example programs demonstrating Boost.Capy usage. + +## Examples + +### hello-task/ + +Minimal Capy program: a task that prints a message. + +### producer-consumer/ + +Two tasks communicating via an async event. + +### buffer-composition/ + +Composing buffer sequences without allocation for scatter/gather I/O. + +### mock-stream-testing/ + +Unit testing protocol code with mock streams and error injection. + +### type-erased-echo/ + +Echo server demonstrating the compilation firewall pattern. + +### timeout-cancellation/ + +Using stop tokens to implement operation timeouts. + +### parallel-fetch/ + +Running multiple operations concurrently with `when_all`. + +### custom-dynamic-buffer/ + +Implementing the DynamicBuffer concept for a custom allocation strategy. + +### echo-server-corosio/ + +A complete echo server using Corosio for real network I/O. Requires Corosio. + +### stream-pipeline/ + +Data transformation through a pipeline of sources and sinks. + +## Building + +### CMake + +```bash +cmake -B build -DBOOST_CAPY_BUILD_EXAMPLES=ON +cmake --build build +``` + +### B2 (BJam) + +```bash +b2 example +``` diff --git a/example/buffer-composition/CMakeLists.txt b/example/buffer-composition/CMakeLists.txt new file mode 100644 index 00000000..5a40b42b --- /dev/null +++ b/example/buffer-composition/CMakeLists.txt @@ -0,0 +1,22 @@ +# +# Copyright (c) 2026 Mungo Gill +# +# Distributed under the Boost Software License, Version 1.0. 
(See accompanying +# file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +# +# Official repository: https://github.com/cppalliance/capy +# + +file(GLOB_RECURSE PFILES CONFIGURE_DEPENDS *.cpp *.hpp + CMakeLists.txt + Jamfile) + +source_group(TREE ${CMAKE_CURRENT_SOURCE_DIR} PREFIX "" FILES ${PFILES}) + +add_executable(capy_example_buffer_composition ${PFILES}) + +set_property(TARGET capy_example_buffer_composition + PROPERTY FOLDER "examples") + +target_link_libraries(capy_example_buffer_composition + Boost::capy) diff --git a/example/buffer-composition/Jamfile b/example/buffer-composition/Jamfile new file mode 100644 index 00000000..1a9c1b37 --- /dev/null +++ b/example/buffer-composition/Jamfile @@ -0,0 +1,18 @@ +# +# Copyright (c) 2026 Mungo Gill +# +# Distributed under the Boost Software License, Version 1.0. (See accompanying +# file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +# +# Official repository: https://github.com/cppalliance/capy +# + +project + : requirements + /boost/capy//boost_capy + . + ; + +exe buffer_composition : + [ glob *.cpp ] + ; diff --git a/example/buffer-composition/buffer_composition.cpp b/example/buffer-composition/buffer_composition.cpp new file mode 100644 index 00000000..8f39e6c9 --- /dev/null +++ b/example/buffer-composition/buffer_composition.cpp @@ -0,0 +1,122 @@ +// +// Copyright (c) 2026 Mungo Gill +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/capy +// + +#include +#include +#include +#include +#include + +using namespace boost::capy; + +void demonstrate_single_buffers() +{ + std::cout << "=== Single Buffer Examples ===\n\n"; + + // Create buffers from various sources + std::string str = "Hello, World!"; + char arr[] = "Array data"; + std::vector vec = {'V', 'e', 'c', 't', 'o', 'r'}; + + // make_buffer creates buffer views (no copies) + auto str_buf = make_buffer(str); // mutable_buffer + auto arr_buf = make_buffer(arr, sizeof(arr) - 1); // mutable_buffer - Exclude null terminator + auto vec_buf = make_buffer(vec); // mutable_buffer + + std::cout << "String buffer: " << str_buf.size() << " bytes\n"; + std::cout << "Array buffer: " << arr_buf.size() << " bytes\n"; + std::cout << "Vector buffer: " << vec_buf.size() << " bytes\n"; +} + +void demonstrate_buffer_pair() +{ + std::cout << "\n=== Buffer Pair (Scatter/Gather) ===\n\n"; + + // const_buffer_pair is std::array + std::string header = "Content-Type: text/plain\r\n\r\n"; + std::string body = "Hello, World!"; + + const_buffer_pair message = {{ + make_buffer(header), + make_buffer(body) + }}; + + // Calculate total size + std::size_t total = buffer_size(message); + std::cout << "Total message size: " << total << " bytes\n"; + std::cout << "Buffer count: " << buffer_length(message) << "\n"; + + // Iterate through buffers + std::cout << "\nBuffer contents:\n"; + for (auto const& buf : message) // const_buffer const& + { + std::cout << " [" << buf.size() << " bytes]: "; + std::cout.write(static_cast(buf.data()), buf.size()); + std::cout << "\n"; + } +} + +void demonstrate_buffer_array() +{ + std::cout << "\n=== Multi-Buffer Array ===\n\n"; + + // Use std::array for more than 2 buffers + std::string status = "HTTP/1.1 200 OK\r\n"; + std::string content_type = "Content-Type: application/json\r\n"; + std::string server = "Server: Capy/1.0\r\n"; + std::string empty_line = "\r\n"; + std::string body = 
R"({"status":"ok"})"; + + std::array http_response = {{ + make_buffer(status), + make_buffer(content_type), + make_buffer(server), + make_buffer(empty_line), + make_buffer(body) + }}; + + std::size_t total = buffer_size(http_response); + std::cout << "HTTP response size: " << total << " bytes\n"; + std::cout << "Buffer count: " << buffer_length(http_response) << "\n"; + + // In real code with streams: + // co_await write(stream, http_response); + // This performs scatter/gather I/O - single syscall for all buffers +} + +void demonstrate_mutable_buffers() +{ + std::cout << "\n=== Mutable Buffer Example ===\n\n"; + + // Mutable buffers for receiving data + char buf1[64]; + char buf2[64]; + + mutable_buffer_pair recv_buffers = {{ + mutable_buffer(buf1, sizeof(buf1)), + mutable_buffer(buf2, sizeof(buf2)) + }}; + + std::cout << "Prepared " << buffer_length(recv_buffers) + << " buffers with " << buffer_size(recv_buffers) + << " bytes total capacity\n"; + + // In real code: + // auto [ec, n] = co_await stream.read_some(recv_buffers); +} + +int main() +{ + demonstrate_single_buffers(); + demonstrate_buffer_pair(); + demonstrate_buffer_array(); + demonstrate_mutable_buffers(); + + return 0; +} diff --git a/example/custom-dynamic-buffer/CMakeLists.txt b/example/custom-dynamic-buffer/CMakeLists.txt new file mode 100644 index 00000000..4f9bf694 --- /dev/null +++ b/example/custom-dynamic-buffer/CMakeLists.txt @@ -0,0 +1,22 @@ +# +# Copyright (c) 2026 Mungo Gill +# +# Distributed under the Boost Software License, Version 1.0. (See accompanying +# file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +# +# Official repository: https://github.com/cppalliance/capy +# + +file(GLOB_RECURSE PFILES CONFIGURE_DEPENDS *.cpp *.hpp + CMakeLists.txt + Jamfile) + +source_group(TREE ${CMAKE_CURRENT_SOURCE_DIR} PREFIX "" FILES ${PFILES}) + +add_executable(capy_example_custom_dynamic_buffer ${PFILES}) + +set_property(TARGET capy_example_custom_dynamic_buffer + PROPERTY FOLDER "examples") + +target_link_libraries(capy_example_custom_dynamic_buffer + Boost::capy) diff --git a/example/custom-dynamic-buffer/Jamfile b/example/custom-dynamic-buffer/Jamfile new file mode 100644 index 00000000..e1a05f71 --- /dev/null +++ b/example/custom-dynamic-buffer/Jamfile @@ -0,0 +1,18 @@ +# +# Copyright (c) 2026 Mungo Gill +# +# Distributed under the Boost Software License, Version 1.0. (See accompanying +# file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +# +# Official repository: https://github.com/cppalliance/capy +# + +project + : requirements + /boost/capy//boost_capy + . + ; + +exe custom_dynamic_buffer : + [ glob *.cpp ] + ; diff --git a/example/custom-dynamic-buffer/custom_dynamic_buffer.cpp b/example/custom-dynamic-buffer/custom_dynamic_buffer.cpp new file mode 100644 index 00000000..7b4527cc --- /dev/null +++ b/example/custom-dynamic-buffer/custom_dynamic_buffer.cpp @@ -0,0 +1,195 @@ +// +// Copyright (c) 2026 Mungo Gill +// +// Distributed under the Boost Software License, Version 1.0. 
(See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/capy +// + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace boost::capy; + +// Custom dynamic buffer with statistics tracking +class tracked_buffer +{ + std::vector storage_; + std::size_t read_pos_ = 0; // Start of readable data + std::size_t write_pos_ = 0; // End of readable data + std::size_t max_size_; + + // Statistics + std::size_t total_prepared_ = 0; + std::size_t total_committed_ = 0; + std::size_t total_consumed_ = 0; + +public: + explicit tracked_buffer(std::size_t max_size = 65536) + : max_size_(max_size) + { + storage_.reserve(1024); + } + + // === DynamicBuffer interface === + + // Consumer: readable data + const_buffer data() const noexcept + { + return const_buffer( + storage_.data() + read_pos_, + write_pos_ - read_pos_); + } + + // Capacity queries + std::size_t size() const noexcept + { + return write_pos_ - read_pos_; + } + + std::size_t max_size() const noexcept + { + return max_size_; + } + + std::size_t capacity() const noexcept + { + return storage_.capacity() - read_pos_; + } + + // Producer: prepare space for writing + mutable_buffer prepare(std::size_t n) + { + total_prepared_ += n; + + // Compact if needed + if (storage_.size() + n > storage_.capacity() && read_pos_ > 0) + { + compact(); + } + + // Grow if needed + std::size_t required = write_pos_ + n; + if (required > max_size_) + throw std::length_error("tracked_buffer: max_size exceeded"); + + if (required > storage_.size()) + storage_.resize(required); + + return mutable_buffer( + storage_.data() + write_pos_, + n); + } + + // Producer: mark bytes as written + void commit(std::size_t n) + { + total_committed_ += n; + write_pos_ += n; + } + + // Consumer: mark bytes as processed + void consume(std::size_t n) + { + std::size_t actual = std::min(n, size()); // std::size_t + total_consumed_ += actual; + read_pos_ += actual; + + if (read_pos_ == write_pos_) + { + // Buffer empty, reset positions + read_pos_ = 0; + write_pos_ = 0; + } + } + + // === Statistics === + + void print_stats() const + { + std::cout << "Buffer statistics:\n" + << " Total prepared: " << total_prepared_ << " bytes\n" + << " Total committed: " << total_committed_ << " bytes\n" + << " Total consumed: " << total_consumed_ << " bytes\n" + << " Current size: " << size() << " bytes\n" + << " Capacity: " << capacity() << " bytes\n"; + } + +private: + void compact() + { + if (read_pos_ == 0) + return; + + std::size_t len = write_pos_ - read_pos_; + std::memmove(storage_.data(), storage_.data() + read_pos_, len); + read_pos_ = 0; + write_pos_ = len; + } +}; + +// Demonstrate using the custom buffer +task<> read_into_tracked_buffer(test::stream& stream, tracked_buffer& buffer) +{ + // Read data in chunks + while (true) + { + auto space = buffer.prepare(256); // mutable_buffer + // ec: std::error_code, n: std::size_t + auto [ec, n] = co_await stream.read_some(space); + + if (ec == cond::eof) + break; + + if (ec) + throw std::system_error(ec); + + buffer.commit(n); + + std::cout << "Read " << n << " bytes, buffer size now: " + << buffer.size() << "\n"; + } +} + +void demo_tracked_buffer() +{ + std::cout << "=== Tracked Buffer Demo ===\n\n"; + + // Setup mock stream with test data + test::fuse f; + test::stream mock(f); + mock.provide("Hello, "); + mock.provide("World! 
"); + mock.provide("This is a test of the custom buffer.\n"); + // Stream returns eof when data is exhausted + + tracked_buffer buffer; + + test::run_blocking()(read_into_tracked_buffer(mock, buffer)); + + std::cout << "\nFinal buffer contents: "; + auto data = buffer.data(); // const_buffer + std::cout.write(static_cast(data.data()), data.size()); + std::cout << "\n\n"; + + buffer.print_stats(); + + // Consume some data + std::cout << "\nConsuming 7 bytes...\n"; + buffer.consume(7); + buffer.print_stats(); +} + +int main() +{ + demo_tracked_buffer(); + return 0; +} diff --git a/example/echo-server-corosio/CMakeLists.txt b/example/echo-server-corosio/CMakeLists.txt new file mode 100644 index 00000000..d8376b52 --- /dev/null +++ b/example/echo-server-corosio/CMakeLists.txt @@ -0,0 +1,23 @@ +# +# Copyright (c) 2026 Mungo Gill +# +# Distributed under the Boost Software License, Version 1.0. (See accompanying +# file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +# +# Official repository: https://github.com/cppalliance/capy +# + +file(GLOB_RECURSE PFILES CONFIGURE_DEPENDS *.cpp *.hpp + CMakeLists.txt + Jamfile) + +source_group(TREE ${CMAKE_CURRENT_SOURCE_DIR} PREFIX "" FILES ${PFILES}) + +add_executable(capy_example_echo_server_corosio ${PFILES}) + +set_property(TARGET capy_example_echo_server_corosio + PROPERTY FOLDER "examples") + +target_link_libraries(capy_example_echo_server_corosio + Boost::capy + Boost::corosio) diff --git a/example/echo-server-corosio/Jamfile b/example/echo-server-corosio/Jamfile new file mode 100644 index 00000000..c2ed5b01 --- /dev/null +++ b/example/echo-server-corosio/Jamfile @@ -0,0 +1,19 @@ +# +# Copyright (c) 2026 Mungo Gill +# +# Distributed under the Boost Software License, Version 1.0. (See accompanying +# file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +# +# Official repository: https://github.com/cppalliance/capy +# + +project + : requirements + /boost/capy//boost_capy + /boost/corosio//boost_corosio + . + ; + +exe echo_server_corosio : + [ glob *.cpp ] + ; diff --git a/example/echo-server-corosio/echo_server.cpp b/example/echo-server-corosio/echo_server.cpp new file mode 100644 index 00000000..bc21ffe4 --- /dev/null +++ b/example/echo-server-corosio/echo_server.cpp @@ -0,0 +1,144 @@ +// +// Copyright (c) 2026 Mungo Gill +// +// Distributed under the Boost Software License, Version 1.0. 
(See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/capy +// + +#include +#include +#include + +using namespace boost::capy; +namespace tcp = boost::corosio::tcp; + +// Echo handler: receives data and sends it back +task<> echo_session(any_stream& stream, std::string client_info) +{ + std::cout << "[" << client_info << "] Session started\n"; + + char buffer[1024]; + std::size_t total_bytes = 0; + + try + { + for (;;) + { + // Read some data + // ec: std::error_code, n: std::size_t + auto [ec, n] = co_await stream.read_some(mutable_buffer(buffer)); + + if (ec == cond::eof) + { + std::cout << "[" << client_info << "] Client disconnected\n"; + break; + } + + if (ec) + { + std::cout << "[" << client_info << "] Read error: " + << ec.message() << "\n"; + break; + } + + total_bytes += n; + + // Echo it back + // wec: std::error_code, wn: std::size_t + auto [wec, wn] = co_await write(stream, const_buffer(buffer, n)); + + if (wec) + { + std::cout << "[" << client_info << "] Write error: " + << wec.message() << "\n"; + break; + } + } + } + catch (std::exception const& e) + { + std::cout << "[" << client_info << "] Exception: " << e.what() << "\n"; + } + + std::cout << "[" << client_info << "] Session ended, " + << total_bytes << " bytes echoed\n"; +} + +// Accept loop: accepts connections and spawns handlers +task<> accept_loop(tcp::acceptor& acceptor, executor_ref ex) +{ + std::cout << "Server listening on port " + << acceptor.local_endpoint().port() << "\n"; + + int connection_id = 0; + + for (;;) + { + // Accept a connection + // ec: std::error_code, socket: tcp::socket + auto [ec, socket] = co_await acceptor.async_accept(); + + if (ec) + { + std::cout << "Accept error: " << ec.message() << "\n"; + continue; + } + + // Build client info string + auto remote = socket.remote_endpoint(); // tcp::endpoint + std::string client_info = + std::to_string(++connection_id) + ":" + + remote.address().to_string() + ":" + + std::to_string(remote.port()); + + std::cout << "[" << client_info << "] Connection accepted\n"; + + // Wrap socket and spawn handler + // Note: socket ownership transfers to the lambda + run_async(ex)( + [](tcp::socket sock, std::string info) -> task<> { + any_stream stream{sock}; + co_await echo_session(stream, std::move(info)); + }(std::move(socket), std::move(client_info)) + ); + } +} + +int main(int argc, char* argv[]) +{ + try + { + // Parse port from command line + unsigned short port = 8080; + if (argc > 1) + port = static_cast(std::stoi(argv[1])); + + // Create I/O context and thread pool + boost::corosio::io_context ioc; + thread_pool pool(4); + + // Create acceptor + tcp::endpoint endpoint(tcp::v4(), port); + tcp::acceptor acceptor(ioc, endpoint); + acceptor.set_option(tcp::acceptor::reuse_address(true)); + + std::cout << "Starting echo server...\n"; + + // Run accept loop + run_async(pool.get_executor())( + accept_loop(acceptor, pool.get_executor()) + ); + + // Run the I/O context (this blocks) + ioc.run(); + } + catch (std::exception const& e) + { + std::cerr << "Error: " << e.what() << "\n"; + return 1; + } + + return 0; +} diff --git a/example/hello-task/CMakeLists.txt b/example/hello-task/CMakeLists.txt new file mode 100644 index 00000000..5abe101c --- /dev/null +++ b/example/hello-task/CMakeLists.txt @@ -0,0 +1,22 @@ +# +# Copyright (c) 2026 Mungo Gill +# +# Distributed under the Boost Software License, Version 1.0. 
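// Illustrative sketch: echo_session above relies on the composed write()
// operation to send the whole buffer. The equivalent manual loop, assuming
// only any_stream::write_some as used elsewhere in these examples
// (write_all_sketch is a hypothetical name, not a Capy API):
task<std::error_code> write_all_sketch(
    any_stream& stream, char const* data, std::size_t size)
{
    std::size_t written = 0;
    while (written < size)
    {
        // Retry until every byte has been accepted or an error occurs
        auto [ec, n] = co_await stream.write_some(
            const_buffer(data + written, size - written));
        if (ec)
            co_return ec;
        written += n;
    }
    co_return std::error_code{};
}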
(See accompanying +# file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +# +# Official repository: https://github.com/cppalliance/capy +# + +file(GLOB_RECURSE PFILES CONFIGURE_DEPENDS *.cpp *.hpp + CMakeLists.txt + Jamfile) + +source_group(TREE ${CMAKE_CURRENT_SOURCE_DIR} PREFIX "" FILES ${PFILES}) + +add_executable(capy_example_hello_task ${PFILES}) + +set_property(TARGET capy_example_hello_task + PROPERTY FOLDER "examples") + +target_link_libraries(capy_example_hello_task + Boost::capy) diff --git a/example/hello-task/Jamfile b/example/hello-task/Jamfile new file mode 100644 index 00000000..48556929 --- /dev/null +++ b/example/hello-task/Jamfile @@ -0,0 +1,18 @@ +# +# Copyright (c) 2026 Mungo Gill +# +# Distributed under the Boost Software License, Version 1.0. (See accompanying +# file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +# +# Official repository: https://github.com/cppalliance/capy +# + +project + : requirements + /boost/capy//boost_capy + . + ; + +exe hello_task : + [ glob *.cpp ] + ; diff --git a/example/hello-task/hello_task.cpp b/example/hello-task/hello_task.cpp new file mode 100644 index 00000000..e6aa44ce --- /dev/null +++ b/example/hello-task/hello_task.cpp @@ -0,0 +1,26 @@ +// +// Copyright (c) 2026 Mungo Gill +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/capy +// + +#include +#include + +using namespace boost::capy; + +task<> say_hello() +{ + std::cout << "Hello from Capy!\n"; + co_return; +} + +int main() +{ + thread_pool pool; + run_async(pool.get_executor())(say_hello()); + return 0; +} diff --git a/example/mock-stream-testing/CMakeLists.txt b/example/mock-stream-testing/CMakeLists.txt new file mode 100644 index 00000000..bd715fb5 --- /dev/null +++ b/example/mock-stream-testing/CMakeLists.txt @@ -0,0 +1,22 @@ +# +# Copyright (c) 2026 Mungo Gill +# +# Distributed under the Boost Software License, Version 1.0. (See accompanying +# file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +# +# Official repository: https://github.com/cppalliance/capy +# + +file(GLOB_RECURSE PFILES CONFIGURE_DEPENDS *.cpp *.hpp + CMakeLists.txt + Jamfile) + +source_group(TREE ${CMAKE_CURRENT_SOURCE_DIR} PREFIX "" FILES ${PFILES}) + +add_executable(capy_example_mock_stream_testing ${PFILES}) + +set_property(TARGET capy_example_mock_stream_testing + PROPERTY FOLDER "examples") + +target_link_libraries(capy_example_mock_stream_testing + Boost::capy) diff --git a/example/mock-stream-testing/Jamfile b/example/mock-stream-testing/Jamfile new file mode 100644 index 00000000..d28d199a --- /dev/null +++ b/example/mock-stream-testing/Jamfile @@ -0,0 +1,18 @@ +# +# Copyright (c) 2026 Mungo Gill +# +# Distributed under the Boost Software License, Version 1.0. (See accompanying +# file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +# +# Official repository: https://github.com/cppalliance/capy +# + +project + : requirements + /boost/capy//boost_capy + . + ; + +exe mock_stream_testing : + [ glob *.cpp ] + ; diff --git a/example/mock-stream-testing/mock_stream_testing.cpp b/example/mock-stream-testing/mock_stream_testing.cpp new file mode 100644 index 00000000..beba8b09 --- /dev/null +++ b/example/mock-stream-testing/mock_stream_testing.cpp @@ -0,0 +1,161 @@ +// +// Copyright (c) 2026 Mungo Gill +// +// Distributed under the Boost Software License, Version 1.0. 
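// Illustrative sketch: hello_task.cpp fires a coroutine and lets the pool
// wind down. When the coroutine produces a value the caller needs, a typed
// completion handler plus std::latch can deliver it, mirroring the pattern
// used in the parallel-fetch and timeout-cancellation examples. Names ending
// in _sketch are hypothetical; assumes <latch> is available.
task<int> compute_answer_sketch()
{
    co_return 42;
}

inline int hello_with_result_sketch()
{
    thread_pool pool;
    std::latch done(1);
    int answer = 0;

    run_async(pool.get_executor(),
        [&](int value) { answer = value; done.count_down(); },
        [&](std::exception_ptr) { done.count_down(); }
    )(compute_answer_sketch());

    done.wait();    // block until the coroutine has finished
    return answer;  // 42
}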
(See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/capy +// + +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace boost::capy; + +// A simple protocol: read until newline, echo back uppercase +// Takes any_stream& so the function is transport-independent +task echo_line_uppercase(any_stream& stream) +{ + std::string line; + char c; + + // Read character by character until newline + while (true) + { + // ec: std::error_code, n: std::size_t + auto [ec, n] = co_await stream.read_some(mutable_buffer(&c, 1)); + + if (ec) + { + if (ec == cond::eof) + break; + co_return false; + } + + if (c == '\n') + break; + + line += static_cast(std::toupper(static_cast(c))); + } + + line += '\n'; + + // Echo uppercase - loop until all bytes written + std::size_t written = 0; // std::size_t - total bytes written + while (written < line.size()) + { + // wec: std::error_code, wn: std::size_t + auto [wec, wn] = co_await stream.write_some( + const_buffer(line.data() + written, line.size() - written)); + + if (wec) + co_return false; + + written += wn; + } + + co_return true; +} + +void test_happy_path() +{ + std::cout << "Test: happy path\n"; + + // Use fuse in disarmed mode (no error injection) for happy path + test::fuse f; // test::fuse + test::stream mock(f); // test::stream + mock.provide("hello\n"); + + // Wrap mock in any_stream using pointer construction for reference semantics + any_stream stream{&mock}; // any_stream + + bool result = false; // bool + test::run_blocking([&](bool r) { result = r; })(echo_line_uppercase(stream)); + + assert(result == true); + assert(mock.data() == "HELLO\n"); + + std::cout << " PASSED\n"; +} + +void test_partial_reads() +{ + std::cout << "Test: partial reads (1 byte at a time)\n"; + + // Use fuse in disarmed mode (no error injection) + test::fuse f; // test::fuse + // Mock returns at most 1 byte per read_some + test::stream mock(f, 1); // test::stream, max_read_size = 1 + mock.provide("hi\n"); + + // Wrap mock in any_stream using pointer construction for reference semantics + any_stream stream{&mock}; // any_stream + + bool result = false; // bool + test::run_blocking([&](bool r) { result = r; })(echo_line_uppercase(stream)); + + assert(result == true); + assert(mock.data() == "HI\n"); + + std::cout << " PASSED\n"; +} + +void test_with_error_injection() +{ + std::cout << "Test: error injection\n"; + + int success_count = 0; + int error_count = 0; + + // fuse::armed runs the test repeatedly, failing at each + // operation point until all paths are covered + test::fuse f; // test::fuse + auto r = f.armed([&](test::fuse&) -> task<> { // fuse::result + test::stream mock(f); // test::stream + mock.provide("test\n"); + + // Wrap mock in any_stream using pointer construction for reference semantics + any_stream stream{&mock}; // any_stream + + // Run the protocol - fuse will inject errors at each step + bool result = co_await echo_line_uppercase(stream); // bool + + // Either succeeds with correct output, or fails cleanly + if (result) + { + ++success_count; + assert(mock.data() == "TEST\n"); + } + else + { + ++error_count; + } + }); + + // Verify that fuse testing exercised both paths + std::cout << " Runs: " << (success_count + error_count) + << " (success=" << success_count + << ", error=" << error_count << ")\n"; + + assert(r.success); + assert(success_count > 0); // At least one successful run + assert(error_count > 0); // At 
least one error-injected run + + std::cout << " PASSED (all error paths tested)\n"; +} + +int main() +{ + test_happy_path(); + test_partial_reads(); + test_with_error_injection(); + + std::cout << "\nAll tests passed!\n"; + return 0; +} diff --git a/example/parallel-fetch/CMakeLists.txt b/example/parallel-fetch/CMakeLists.txt new file mode 100644 index 00000000..8dcc9a82 --- /dev/null +++ b/example/parallel-fetch/CMakeLists.txt @@ -0,0 +1,22 @@ +# +# Copyright (c) 2026 Mungo Gill +# +# Distributed under the Boost Software License, Version 1.0. (See accompanying +# file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +# +# Official repository: https://github.com/cppalliance/capy +# + +file(GLOB_RECURSE PFILES CONFIGURE_DEPENDS *.cpp *.hpp + CMakeLists.txt + Jamfile) + +source_group(TREE ${CMAKE_CURRENT_SOURCE_DIR} PREFIX "" FILES ${PFILES}) + +add_executable(capy_example_parallel_fetch ${PFILES}) + +set_property(TARGET capy_example_parallel_fetch + PROPERTY FOLDER "examples") + +target_link_libraries(capy_example_parallel_fetch + Boost::capy) diff --git a/example/parallel-fetch/Jamfile b/example/parallel-fetch/Jamfile new file mode 100644 index 00000000..4542d2c5 --- /dev/null +++ b/example/parallel-fetch/Jamfile @@ -0,0 +1,18 @@ +# +# Copyright (c) 2026 Mungo Gill +# +# Distributed under the Boost Software License, Version 1.0. (See accompanying +# file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +# +# Official repository: https://github.com/cppalliance/capy +# + +project + : requirements + /boost/capy//boost_capy + . + ; + +exe parallel_fetch : + [ glob *.cpp ] + ; diff --git a/example/parallel-fetch/parallel_fetch.cpp b/example/parallel-fetch/parallel_fetch.cpp new file mode 100644 index 00000000..3528714d --- /dev/null +++ b/example/parallel-fetch/parallel_fetch.cpp @@ -0,0 +1,148 @@ +// +// Copyright (c) 2026 Mungo Gill +// +// Distributed under the Boost Software License, Version 1.0. 
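// Illustrative sketch: the armed-fuse pattern above generalises to any
// coroutine under test - the lambda is re-run with a failure injected at each
// successive I/O point until every error path has been exercised. Assumes
// only the test::fuse, test::stream, and any_stream usage shown in
// mock_stream_testing.cpp; fuse_smoke_sketch is a hypothetical name.
inline void fuse_smoke_sketch()
{
    test::fuse f;
    auto r = f.armed([&](test::fuse&) -> task<> {
        test::stream mock(f);
        mock.provide("x");
        any_stream stream{&mock};

        char c = 0;
        // On some reruns this read fails with an injected error instead
        auto [ec, n] = co_await stream.read_some(mutable_buffer(&c, 1));
        (void)ec;
        (void)n;
    });
    std::cout << (r.success ? "all error paths covered\n" : "a path failed\n");
}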
(See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/capy +// + +#include +#include +#include +#include + +using namespace boost::capy; + +// Simulated async operations +task fetch_user_id(std::string username) +{ + std::cout << "Fetching user ID for: " << username << "\n"; + // In real code: co_await http_get("/users/" + username); + co_return static_cast(username.length()) * 100; // Fake ID +} + +task fetch_user_name(int id) +{ + std::cout << "Fetching name for user ID: " << id << "\n"; + co_return "User" + std::to_string(id); +} + +task fetch_order_count(int user_id) +{ + std::cout << "Fetching order count for user: " << user_id << "\n"; + co_return user_id / 10; // Fake count +} + +task fetch_account_balance(int user_id) +{ + std::cout << "Fetching balance for user: " << user_id << "\n"; + co_return user_id * 1.5; // Fake balance +} + +// Fetch all user data in parallel +task<> fetch_user_dashboard(std::string username) +{ + std::cout << "\n=== Fetching dashboard for: " << username << " ===\n"; + + // First, get the user ID (needed for other queries) + int user_id = co_await fetch_user_id(username); + std::cout << "Got user ID: " << user_id << "\n\n"; + + // Now fetch all user data in parallel + std::cout << "Starting parallel fetches...\n"; + // name: std::string, orders: int, balance: double + auto [name, orders, balance] = co_await when_all( + fetch_user_name(user_id), + fetch_order_count(user_id), + fetch_account_balance(user_id) + ); + + std::cout << "\nDashboard results:\n"; + std::cout << " Name: " << name << "\n"; + std::cout << " Orders: " << orders << "\n"; + std::cout << " Balance: $" << balance << "\n"; +} + +// Example with void tasks +task<> log_access(std::string resource) +{ + std::cout << "Logging access to: " << resource << "\n"; + co_return; +} + +task<> update_metrics(std::string metric) +{ + std::cout << "Updating metric: " << metric << "\n"; + co_return; +} + +task fetch_with_side_effects() +{ + std::cout << "\n=== Fetch with side effects ===\n"; + + // void tasks don't contribute to result tuple + std::tuple results = co_await when_all( + log_access("api/data"), // void - no result + update_metrics("api_calls"), // void - no result + fetch_user_name(42) // returns string + ); + std::string data = std::get<0>(results); // std::string + + std::cout << "Data: " << data << "\n"; + co_return data; +} + +// Error handling example +task might_fail(bool should_fail, std::string name) +{ + std::cout << "Task " << name << " starting\n"; + + if (should_fail) + { + throw std::runtime_error(name + " failed!"); + } + + std::cout << "Task " << name << " completed\n"; + co_return 42; +} + +task<> demonstrate_error_handling() +{ + std::cout << "\n=== Error handling ===\n"; + + try + { + // a: int, b: int, c: int + auto [a, b, c] = co_await when_all( + might_fail(false, "A"), + might_fail(true, "B"), // This one fails + might_fail(false, "C") + ); + std::cout << "All succeeded: " << a << ", " << b << ", " << c << "\n"; + } + catch (std::runtime_error const& e) + { + std::cout << "Caught error: " << e.what() << "\n"; + // Note: when_all waits for all tasks to complete (or respond to stop) + // before propagating the first exception + } +} + +int main() +{ + thread_pool pool; + std::latch done(3); // std::latch - wait for 3 tasks + + // Completion handlers signal the latch when each task finishes + // Use generic lambda to accept any result type (or no result for task) + auto 
on_complete = [&done](auto&&...) { done.count_down(); }; + auto on_error = [&done](std::exception_ptr) { done.count_down(); }; + + run_async(pool.get_executor(), on_complete, on_error)(fetch_user_dashboard("alice")); + run_async(pool.get_executor(), on_complete, on_error)(fetch_with_side_effects()); + run_async(pool.get_executor(), on_complete, on_error)(demonstrate_error_handling()); + + done.wait(); // Block until all tasks complete + return 0; +} diff --git a/example/producer-consumer/CMakeLists.txt b/example/producer-consumer/CMakeLists.txt new file mode 100644 index 00000000..c26aeded --- /dev/null +++ b/example/producer-consumer/CMakeLists.txt @@ -0,0 +1,22 @@ +# +# Copyright (c) 2026 Mungo Gill +# +# Distributed under the Boost Software License, Version 1.0. (See accompanying +# file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +# +# Official repository: https://github.com/cppalliance/capy +# + +file(GLOB_RECURSE PFILES CONFIGURE_DEPENDS *.cpp *.hpp + CMakeLists.txt + Jamfile) + +source_group(TREE ${CMAKE_CURRENT_SOURCE_DIR} PREFIX "" FILES ${PFILES}) + +add_executable(capy_example_producer_consumer ${PFILES}) + +set_property(TARGET capy_example_producer_consumer + PROPERTY FOLDER "examples") + +target_link_libraries(capy_example_producer_consumer + Boost::capy) diff --git a/example/producer-consumer/Jamfile b/example/producer-consumer/Jamfile new file mode 100644 index 00000000..549bd8a5 --- /dev/null +++ b/example/producer-consumer/Jamfile @@ -0,0 +1,18 @@ +# +# Copyright (c) 2026 Mungo Gill +# +# Distributed under the Boost Software License, Version 1.0. (See accompanying +# file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +# +# Official repository: https://github.com/cppalliance/capy +# + +project + : requirements + /boost/capy//boost_capy + . + ; + +exe producer_consumer : + [ glob *.cpp ] + ; diff --git a/example/producer-consumer/producer_consumer.cpp b/example/producer-consumer/producer_consumer.cpp new file mode 100644 index 00000000..ed1505f4 --- /dev/null +++ b/example/producer-consumer/producer_consumer.cpp @@ -0,0 +1,62 @@ +// +// Copyright (c) 2026 Mungo Gill +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/capy +// + +// +// Producer-Consumer Example +// +// Demonstrates coordination between coroutines using async_event +// for signaling when data is ready, with strand for serialization. +// + +#include +#include +#include +#include + +using namespace boost::capy; + +int main() +{ + thread_pool pool; // thread_pool + strand s{pool.get_executor()}; // strand - serializes execution + std::latch done(1); // std::latch - wait for completion + + auto on_complete = [&done](auto&&...) { done.count_down(); }; // lambda + auto on_error = [&done](std::exception_ptr) { done.count_down(); }; // lambda + + async_event data_ready; // async_event + int shared_value = 0; // int + + auto producer = [&]() -> task<> { + std::cout << "Producer: preparing data...\n"; + shared_value = 42; + std::cout << "Producer: data ready, signaling\n"; + data_ready.set(); + co_return; + }; + + auto consumer = [&]() -> task<> { + std::cout << "Consumer: waiting for data...\n"; + co_await data_ready.wait(); + std::cout << "Consumer: received value " << shared_value << "\n"; + co_return; + }; + + // Run both tasks concurrently using when_all, through a strand. 
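// Illustrative sketch: as in the parallel-fetch example above, when_all joins
// several tasks and yields a tuple holding only the non-void results, in the
// order the tasks were passed. Names ending in _sketch are hypothetical.
task<int>         answer_sketch() { co_return 7; }
task<>            log_sketch()    { co_return; }
task<std::string> label_sketch()  { co_return "seven"; }

task<> when_all_tuple_sketch()
{
    // The void result is skipped; the tuple holds {int, std::string}
    auto [n, s] = co_await when_all(answer_sketch(), log_sketch(), label_sketch());
    std::cout << n << " = " << s << "\n";
}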
+ // The strand serializes execution, ensuring thread-safe access + // to the shared async_event and shared_value. + auto run_both = [&]() -> task<> { + co_await when_all(producer(), consumer()); + }; + + run_async(s, on_complete, on_error)(run_both()); + + done.wait(); // Block until tasks complete + return 0; +} diff --git a/example/stream-pipeline/CMakeLists.txt b/example/stream-pipeline/CMakeLists.txt new file mode 100644 index 00000000..6fd38c3a --- /dev/null +++ b/example/stream-pipeline/CMakeLists.txt @@ -0,0 +1,22 @@ +# +# Copyright (c) 2026 Mungo Gill +# +# Distributed under the Boost Software License, Version 1.0. (See accompanying +# file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +# +# Official repository: https://github.com/cppalliance/capy +# + +file(GLOB_RECURSE PFILES CONFIGURE_DEPENDS *.cpp *.hpp + CMakeLists.txt + Jamfile) + +source_group(TREE ${CMAKE_CURRENT_SOURCE_DIR} PREFIX "" FILES ${PFILES}) + +add_executable(capy_example_stream_pipeline ${PFILES}) + +set_property(TARGET capy_example_stream_pipeline + PROPERTY FOLDER "examples") + +target_link_libraries(capy_example_stream_pipeline + Boost::capy) diff --git a/example/stream-pipeline/Jamfile b/example/stream-pipeline/Jamfile new file mode 100644 index 00000000..92d49423 --- /dev/null +++ b/example/stream-pipeline/Jamfile @@ -0,0 +1,18 @@ +# +# Copyright (c) 2026 Mungo Gill +# +# Distributed under the Boost Software License, Version 1.0. (See accompanying +# file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +# +# Official repository: https://github.com/cppalliance/capy +# + +project + : requirements + /boost/capy//boost_capy + . + ; + +exe stream_pipeline : + [ glob *.cpp ] + ; diff --git a/example/stream-pipeline/stream_pipeline.cpp b/example/stream-pipeline/stream_pipeline.cpp new file mode 100644 index 00000000..a84e199f --- /dev/null +++ b/example/stream-pipeline/stream_pipeline.cpp @@ -0,0 +1,351 @@ +// +// Copyright (c) 2026 Mungo Gill +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/capy +// + +// +// Stream Pipeline Example +// +// This example demonstrates chaining buffer sources to create a data +// processing pipeline. Data flows through transform stages: +// +// input -> uppercase_transform -> line_numbering_transform -> output +// +// Each transform is a BufferSource that wraps an upstream any_buffer_source, +// enabling type-erased composition of arbitrary transform chains. +// +// The transforms use task<> coroutines for their pull() methods, allowing +// them to properly co_await the upstream source. +// + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace boost::capy; + +//------------------------------------------------------------------------------ +// +// Transform: uppercase_transform +// +// A BufferSource that pulls from an upstream source and converts all +// characters to uppercase. Demonstrates a simple byte-by-byte transform. 
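// Illustrative sketch: the producer-consumer example above signals in one
// direction only. A request/response handshake needs nothing more than a
// second async_event; assumes only the set()/wait() members used there, and
// that both coroutines run through the same strand so the shared ints are
// never touched concurrently (handshake_sketch is a hypothetical name).
task<> handshake_sketch(async_event& request, async_event& reply,
                        int& slot, int& result)
{
    co_await request.wait();   // wait until the requester has filled `slot`
    result = slot * 2;         // do the "work"
    reply.set();               // tell the requester the result is ready
}
// The requester mirrors this: fill slot, request.set(), co_await reply.wait(),
// then read result.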
+// +//------------------------------------------------------------------------------ + +class uppercase_transform +{ + any_buffer_source* source_; // any_buffer_source* + std::vector buffer_; // std::vector - transformed data + std::size_t consumed_ = 0; // std::size_t - bytes consumed by downstream + bool exhausted_ = false; // bool - upstream exhausted + +public: + explicit uppercase_transform(any_buffer_source& source) + : source_(&source) + { + } + + // BufferSource::consume - advance past processed bytes + void + consume(std::size_t n) noexcept + { + consumed_ += n; + // Compact buffer when fully consumed + if (consumed_ >= buffer_.size()) + { + buffer_.clear(); + consumed_ = 0; + } + } + + // BufferSource::pull - returns task<> to enable co_await on upstream + task>> + pull(std::span dest) + { + // Already have unconsumed data? + if (consumed_ < buffer_.size()) + { + if (dest.empty()) + co_return {std::error_code{}, std::span{}}; + + dest[0] = const_buffer( + buffer_.data() + consumed_, + buffer_.size() - consumed_); + co_return {std::error_code{}, dest.first(1)}; + } + + // Upstream exhausted? + if (exhausted_) + co_return {std::error_code{}, std::span{}}; + + // Pull from upstream + buffer_.clear(); + consumed_ = 0; + + const_buffer upstream[8]; // const_buffer[8] + // ec: std::error_code, bufs: std::span + auto [ec, bufs] = co_await source_->pull(upstream); + + if (ec) + co_return {ec, std::span{}}; + + if (bufs.empty()) + { + exhausted_ = true; + co_return {std::error_code{}, std::span{}}; + } + + // Transform: uppercase each byte + for (auto const& buf : bufs) // const_buffer const& + { + auto const* data = static_cast(buf.data()); // char const* + auto size = buf.size(); // std::size_t + + for (std::size_t i = 0; i < size; ++i) + { + buffer_.push_back(static_cast( + std::toupper(static_cast(data[i])))); + } + } + + // Consume from upstream + source_->consume(buffer_size(bufs)); + + // Return transformed data + if (dest.empty() || buffer_.empty()) + co_return {std::error_code{}, std::span{}}; + + dest[0] = const_buffer(buffer_.data(), buffer_.size()); + co_return {std::error_code{}, dest.first(1)}; + } +}; + +//------------------------------------------------------------------------------ +// +// Transform: line_numbering_transform +// +// A BufferSource that pulls from an upstream source and prepends line +// numbers to each line. Demonstrates a transform that changes data size. +// +//------------------------------------------------------------------------------ + +class line_numbering_transform +{ + any_buffer_source* source_; // any_buffer_source* + std::string buffer_; // std::string - transformed data + std::size_t consumed_ = 0; // std::size_t - bytes consumed by downstream + std::size_t line_num_ = 1; // std::size_t - current line number + bool at_line_start_ = true; // bool - are we at start of a line? + bool exhausted_ = false; // bool - upstream exhausted + +public: + explicit line_numbering_transform(any_buffer_source& source) + : source_(&source) + { + } + + // BufferSource::consume - advance past processed bytes + void + consume(std::size_t n) noexcept + { + consumed_ += n; + // Compact buffer when fully consumed + if (consumed_ >= buffer_.size()) + { + buffer_.clear(); + consumed_ = 0; + } + } + + // BufferSource::pull - returns task<> to enable co_await on upstream + task>> + pull(std::span dest) + { + // Already have unconsumed data? 
+ if (consumed_ < buffer_.size()) + { + if (dest.empty()) + co_return {std::error_code{}, std::span{}}; + + dest[0] = const_buffer( + buffer_.data() + consumed_, + buffer_.size() - consumed_); + co_return {std::error_code{}, dest.first(1)}; + } + + // Upstream exhausted? + if (exhausted_) + co_return {std::error_code{}, std::span{}}; + + // Pull from upstream + buffer_.clear(); + consumed_ = 0; + + const_buffer upstream[8]; // const_buffer[8] + // ec: std::error_code, bufs: std::span + auto [ec, bufs] = co_await source_->pull(upstream); + + if (ec) + co_return {ec, std::span{}}; + + if (bufs.empty()) + { + exhausted_ = true; + co_return {std::error_code{}, std::span{}}; + } + + // Transform: add line numbers + for (auto const& buf : bufs) // const_buffer const& + { + auto const* data = static_cast(buf.data()); // char const* + auto size = buf.size(); // std::size_t + + for (std::size_t i = 0; i < size; ++i) + { + if (at_line_start_) + { + buffer_ += std::to_string(line_num_++) + ": "; + at_line_start_ = false; + } + buffer_ += data[i]; + if (data[i] == '\n') + at_line_start_ = true; + } + } + + // Consume from upstream + source_->consume(buffer_size(bufs)); + + // Return transformed data + if (dest.empty() || buffer_.empty()) + co_return {std::error_code{}, std::span{}}; + + dest[0] = const_buffer(buffer_.data(), buffer_.size()); + co_return {std::error_code{}, dest.first(1)}; + } +}; + +//------------------------------------------------------------------------------ +// +// transfer: Pull from source and write to sink until exhausted +// +//------------------------------------------------------------------------------ + +task transfer(any_buffer_source& source, any_write_sink& sink) +{ + std::size_t total = 0; // std::size_t + const_buffer bufs[8]; // const_buffer[8] + + for (;;) + { + // ec: std::error_code, spans: std::span + auto [ec, spans] = co_await source.pull(bufs); + + if (ec) + throw std::system_error(ec); + + if (spans.empty()) + break; + + // Write each buffer to sink + for (auto const& buf : spans) // const_buffer const& + { + // wec: std::error_code, n: std::size_t + auto [wec, n] = co_await sink.write(buf); + if (wec) + throw std::system_error(wec); + total += n; + } + + // Consume what we read + source.consume(buffer_size(spans)); + } + + io_result<> eof_result = co_await sink.write_eof(); + if (eof_result.ec) + throw std::system_error(eof_result.ec); + + co_return total; +} + +//------------------------------------------------------------------------------ +// +// demo_pipeline: Demonstrate chained transforms +// +//------------------------------------------------------------------------------ + +void demo_pipeline() +{ + std::cout << "=== Stream Pipeline Demo ===\n\n"; + + // Input data - three lines + std::string input = "hello world\nthis is a test\nof the pipeline\n"; + std::cout << "Input:\n" << input << "\n"; + + // Create mock source with input data + test::fuse f; // test::fuse + test::buffer_source source(f); // test::buffer_source + source.provide(input); + + // Build the pipeline using type-erased buffer sources: + // source -> [uppercase] -> [line_numbering] -> sink + + // Stage 1: Wrap raw source as any_buffer_source. + // Using pointer construction (&source) for reference semantics - the + // wrapper does not take ownership, so source must outlive src. + any_buffer_source src{&source}; // any_buffer_source + + // Stage 2: Uppercase transform wraps src. + // Again using pointer construction so upper_src references upper + // without taking ownership. 
+ uppercase_transform upper{src}; // uppercase_transform + any_buffer_source upper_src{&upper}; // any_buffer_source + + // Stage 3: Line numbering transform wraps upper_src. + line_numbering_transform numbered{upper_src}; // line_numbering_transform + any_buffer_source numbered_src{&numbered}; // any_buffer_source + + // Create sink to collect output. + // Pointer construction ensures sink outlives dst. + test::write_sink sink(f); // test::write_sink + any_write_sink dst{&sink}; // any_write_sink + + // Run the pipeline + std::size_t bytes = 0; // std::size_t + test::run_blocking([&](std::size_t n) { bytes = n; })( + transfer(numbered_src, dst)); + + std::cout << "Output (" << bytes << " bytes):\n"; + std::cout << sink.data() << "\n"; + + // Expected output: + // 1: HELLO WORLD + // 2: THIS IS A TEST + // 3: OF THE PIPELINE +} + +int main() +{ + try + { + demo_pipeline(); + } + catch (std::system_error const& e) + { + std::cerr << "Pipeline error: " << e.what() << "\n"; + return 1; + } + return 0; +} diff --git a/example/timeout-cancellation/CMakeLists.txt b/example/timeout-cancellation/CMakeLists.txt new file mode 100644 index 00000000..434421f5 --- /dev/null +++ b/example/timeout-cancellation/CMakeLists.txt @@ -0,0 +1,22 @@ +# +# Copyright (c) 2026 Mungo Gill +# +# Distributed under the Boost Software License, Version 1.0. (See accompanying +# file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +# +# Official repository: https://github.com/cppalliance/capy +# + +file(GLOB_RECURSE PFILES CONFIGURE_DEPENDS *.cpp *.hpp + CMakeLists.txt + Jamfile) + +source_group(TREE ${CMAKE_CURRENT_SOURCE_DIR} PREFIX "" FILES ${PFILES}) + +add_executable(capy_example_timeout_cancellation ${PFILES}) + +set_property(TARGET capy_example_timeout_cancellation + PROPERTY FOLDER "examples") + +target_link_libraries(capy_example_timeout_cancellation + Boost::capy) diff --git a/example/timeout-cancellation/Jamfile b/example/timeout-cancellation/Jamfile new file mode 100644 index 00000000..48eb641d --- /dev/null +++ b/example/timeout-cancellation/Jamfile @@ -0,0 +1,18 @@ +# +# Copyright (c) 2026 Mungo Gill +# +# Distributed under the Boost Software License, Version 1.0. (See accompanying +# file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +# +# Official repository: https://github.com/cppalliance/capy +# + +project + : requirements + /boost/capy//boost_capy + . + ; + +exe timeout_cancellation : + [ glob *.cpp ] + ; diff --git a/example/timeout-cancellation/timeout_cancellation.cpp b/example/timeout-cancellation/timeout_cancellation.cpp new file mode 100644 index 00000000..45050fc5 --- /dev/null +++ b/example/timeout-cancellation/timeout_cancellation.cpp @@ -0,0 +1,147 @@ +// +// Copyright (c) 2026 Mungo Gill +// +// Distributed under the Boost Software License, Version 1.0. 
(See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/capy +// + +#include +#include +#include +#include +#include +#include +#include + +using namespace boost::capy; + +// A slow operation that respects cancellation +task slow_fetch(int steps) +{ + auto token = co_await this_coro::stop_token; // std::stop_token + std::string result; + + for (int i = 0; i < steps; ++i) + { + // Check cancellation before each step + if (token.stop_requested()) + { + std::cout << " Cancelled at step " << i << std::endl; + throw std::system_error( + make_error_code(std::errc::operation_canceled)); + } + + result += "step" + std::to_string(i) + " "; + + // Simulate slow work (in real code, this would be I/O) + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + std::cout << " Completed step " << i << std::endl; + + // Yield to allow stop request to be processed before next check + // Extra 5ms ensures print completes before main thread prints + std::this_thread::sleep_for(std::chrono::milliseconds(15)); + } + + co_return result; +} + +// Run with timeout (conceptual - real implementation needs timer) +task> fetch_with_timeout() +{ + auto token = co_await this_coro::stop_token; // std::stop_token + + try + { + auto result = co_await slow_fetch(5); // std::string + co_return result; + } + catch (std::system_error const& e) + { + if (e.code() == std::errc::operation_canceled) + co_return std::nullopt; + throw; + } +} + +void demo_normal_completion() +{ + std::cout << "Demo: Normal completion\n"; + + thread_pool pool; + std::stop_source source; + std::latch done(1); // std::latch - wait for 1 task + + run_async(pool.get_executor(), source.get_token(), + [&done](std::optional result) { + if (result) + std::cout << "Result: " << *result << "\n"; + else + std::cout << "Cancelled\n"; + done.count_down(); + }, + [&done](std::exception_ptr) { done.count_down(); } + )(fetch_with_timeout()); + + done.wait(); // Block until task completes +} + +void demo_cancellation() +{ + std::cout << "\nDemo: Cancellation after 2 steps\n"; + + thread_pool pool; + std::stop_source source; + std::latch done(1); // std::latch - wait for 1 task + + // Launch the task + run_async(pool.get_executor(), source.get_token(), + [&done](std::optional result) { + if (result) + std::cout << "Result: " << *result << "\n"; + else + std::cout << "Cancelled (returned nullopt)\n"; + done.count_down(); + }, + [&done](std::exception_ptr) { done.count_down(); } + )(fetch_with_timeout()); + + // Simulate timeout: cancel after 2 steps complete + // Timing: each step is 10ms work + 15ms yield = 25ms total + // Step 1 prints at 35ms, step 2 check at 50ms + // Stop at 42ms: after step 1 print, before step 2 check + std::this_thread::sleep_for(std::chrono::milliseconds(42)); + std::cout << " Requesting stop..." 
<< std::endl; + source.request_stop(); + + done.wait(); // Block until task completes (after cancellation) +} + +// Example: Manual stop token checking +task process_items(std::vector const& items) +{ + auto token = co_await this_coro::stop_token; // std::stop_token + int sum = 0; + + for (auto item : items) // int + { + if (token.stop_requested()) + { + std::cout << "Processing cancelled, partial sum: " << sum << "\n"; + co_return sum; // Return partial result + } + + sum += item; + } + + co_return sum; +} + +int main() +{ + demo_normal_completion(); + demo_cancellation(); + + return 0; +} diff --git a/example/type-erased-echo/CMakeLists.txt b/example/type-erased-echo/CMakeLists.txt new file mode 100644 index 00000000..54cc2b9a --- /dev/null +++ b/example/type-erased-echo/CMakeLists.txt @@ -0,0 +1,22 @@ +# +# Copyright (c) 2026 Mungo Gill +# +# Distributed under the Boost Software License, Version 1.0. (See accompanying +# file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +# +# Official repository: https://github.com/cppalliance/capy +# + +file(GLOB_RECURSE PFILES CONFIGURE_DEPENDS *.cpp *.hpp + CMakeLists.txt + Jamfile) + +source_group(TREE ${CMAKE_CURRENT_SOURCE_DIR} PREFIX "" FILES ${PFILES}) + +add_executable(capy_example_type_erased_echo ${PFILES}) + +set_property(TARGET capy_example_type_erased_echo + PROPERTY FOLDER "examples") + +target_link_libraries(capy_example_type_erased_echo + Boost::capy) diff --git a/example/type-erased-echo/Jamfile b/example/type-erased-echo/Jamfile new file mode 100644 index 00000000..537524f2 --- /dev/null +++ b/example/type-erased-echo/Jamfile @@ -0,0 +1,18 @@ +# +# Copyright (c) 2026 Mungo Gill +# +# Distributed under the Boost Software License, Version 1.0. (See accompanying +# file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +# +# Official repository: https://github.com/cppalliance/capy +# + +project + : requirements + /boost/capy//boost_capy + . + ; + +exe type_erased_echo : + [ glob *.cpp ] + ; diff --git a/example/type-erased-echo/echo.cpp b/example/type-erased-echo/echo.cpp new file mode 100644 index 00000000..fe04912b --- /dev/null +++ b/example/type-erased-echo/echo.cpp @@ -0,0 +1,45 @@ +// +// Copyright (c) 2026 Mungo Gill +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/capy +// + +#include "echo.hpp" +#include +#include +#include +#include + +namespace myapp { + +using namespace boost::capy; + +task<> echo_session(any_stream& stream) +{ + char buffer[1024]; + + for (;;) + { + // Read some data + // ec: std::error_code, n: std::size_t + auto [ec, n] = co_await stream.read_some(make_buffer(buffer)); + + if (ec == cond::eof) + co_return; // Client closed connection + + if (ec) + throw std::system_error(ec); + + // Echo it back + // wec: std::error_code, wn: std::size_t + auto [wec, wn] = co_await write(stream, const_buffer(buffer, n)); + + if (wec) + throw std::system_error(wec); + } +} + +} // namespace myapp diff --git a/example/type-erased-echo/echo.hpp b/example/type-erased-echo/echo.hpp new file mode 100644 index 00000000..1dcf9fe6 --- /dev/null +++ b/example/type-erased-echo/echo.hpp @@ -0,0 +1,23 @@ +// +// Copyright (c) 2026 Mungo Gill +// +// Distributed under the Boost Software License, Version 1.0. 
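// Illustrative sketch: process_items above checks the stop token itself, so
// cancelling it mid-run simply yields a partial sum. Mirrors the run_async
// call shape used in demo_cancellation; demo_partial_sum_sketch is a
// hypothetical name.
inline void demo_partial_sum_sketch()
{
    thread_pool pool;
    std::stop_source source;
    std::latch done(1);

    std::vector<int> items(1'000'000, 1);
    run_async(pool.get_executor(), source.get_token(),
        [&](int sum) { std::cout << "partial sum: " << sum << "\n"; done.count_down(); },
        [&](std::exception_ptr) { done.count_down(); }
    )(process_items(items));

    source.request_stop();   // may land before, during, or after the loop
    done.wait();             // the printed sum is whatever was reached
}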
(See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/capy +// + +#ifndef ECHO_HPP +#define ECHO_HPP + +#include +#include + +namespace myapp { + +// Type-erased interface: no template dependencies +boost::capy::task<> echo_session(boost::capy::any_stream& stream); + +} // namespace myapp + +#endif diff --git a/example/type-erased-echo/main.cpp b/example/type-erased-echo/main.cpp new file mode 100644 index 00000000..cbd4e528 --- /dev/null +++ b/example/type-erased-echo/main.cpp @@ -0,0 +1,48 @@ +// +// Copyright (c) 2026 Mungo Gill +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/capy +// + +#include "echo.hpp" +#include +#include +#include +#include +#include + +using namespace boost::capy; + +void test_with_mock() +{ + test::fuse f; + test::stream mock(f); + mock.provide("Hello, "); + mock.provide("World!\n"); + // Stream returns eof when no more data is available + + // Using pointer construction (&mock) for reference semantics - the + // wrapper does not take ownership, so mock must outlive stream. + any_stream stream{&mock}; // any_stream + test::run_blocking()(myapp::echo_session(stream)); + + std::cout << "Echo output: " << mock.data() << "\n"; +} + +// With real sockets (using Corosio), you would write: +// +// task<> handle_client(corosio::tcp::socket socket) +// { +// // Value construction moves socket into wrapper (transfers ownership) +// any_stream stream{std::move(socket)}; +// co_await myapp::echo_session(stream); +// } + +int main() +{ + test_with_mock(); + return 0; +} diff --git a/include/boost/capy/test/buffer_sink.hpp b/include/boost/capy/test/buffer_sink.hpp index 4e669ec3..f1dfd1e2 100644 --- a/include/boost/capy/test/buffer_sink.hpp +++ b/include/boost/capy/test/buffer_sink.hpp @@ -165,6 +165,17 @@ class buffer_sink bool await_ready() const noexcept { return true; } + // This method is required to satisfy Capy's IoAwaitable concept, + // but is never called because await_ready() returns true. + // + // Capy uses a two-layer awaitable system: the promise's + // await_transform wraps awaitables in a transform_awaiter whose + // standard await_suspend(coroutine_handle) calls this custom + // 3-argument overload, passing the executor and stop_token from + // the coroutine's context. For synchronous test awaitables like + // this one, the coroutine never suspends, so this is not invoked. + // The signature exists to allow the same awaitable type to work + // with both synchronous (test) and asynchronous (real I/O) code. void await_suspend( coro, executor_ref, @@ -212,6 +223,9 @@ class buffer_sink bool await_ready() const noexcept { return true; } + // This method is required to satisfy Capy's IoAwaitable concept, + // but is never called because await_ready() returns true. + // See the comment on commit(std::size_t) for a detailed explanation. void await_suspend( coro, executor_ref, @@ -258,6 +272,9 @@ class buffer_sink bool await_ready() const noexcept { return true; } + // This method is required to satisfy Capy's IoAwaitable concept, + // but is never called because await_ready() returns true. + // See the comment on commit(std::size_t) for a detailed explanation. 
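// Illustrative sketch: the shape documented by these comments - an
// always-ready awaitable that still exposes the three-argument await_suspend
// - looks roughly like the outline below. Parameter spellings follow the test
// awaitables in this diff; treat it as an outline, not Capy's exact interface.
//
//     struct ready_awaitable_sketch
//     {
//         int value;
//
//         bool await_ready() const noexcept { return true; }
//
//         // Required by the IoAwaitable concept; never called here because
//         // await_ready() already returned true.
//         void await_suspend(coro, executor_ref, std::stop_token) {}
//
//         int await_resume() noexcept { return value; }
//     };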
void await_suspend( coro, executor_ref, diff --git a/include/boost/capy/test/buffer_source.hpp b/include/boost/capy/test/buffer_source.hpp index f72b9be6..372f608c 100644 --- a/include/boost/capy/test/buffer_source.hpp +++ b/include/boost/capy/test/buffer_source.hpp @@ -153,6 +153,17 @@ class buffer_source bool await_ready() const noexcept { return true; } + // This method is required to satisfy Capy's IoAwaitable concept, + // but is never called because await_ready() returns true. + // + // Capy uses a two-layer awaitable system: the promise's + // await_transform wraps awaitables in a transform_awaiter whose + // standard await_suspend(coroutine_handle) calls this custom + // 3-argument overload, passing the executor and stop_token from + // the coroutine's context. For synchronous test awaitables like + // this one, the coroutine never suspends, so this is not invoked. + // The signature exists to allow the same awaitable type to work + // with both synchronous (test) and asynchronous (real I/O) code. void await_suspend( coro, executor_ref, diff --git a/include/boost/capy/test/read_source.hpp b/include/boost/capy/test/read_source.hpp index f2dce81e..d279556f 100644 --- a/include/boost/capy/test/read_source.hpp +++ b/include/boost/capy/test/read_source.hpp @@ -145,6 +145,17 @@ class read_source bool await_ready() const noexcept { return true; } + // This method is required to satisfy Capy's IoAwaitable concept, + // but is never called because await_ready() returns true. + // + // Capy uses a two-layer awaitable system: the promise's + // await_transform wraps awaitables in a transform_awaiter whose + // standard await_suspend(coroutine_handle) calls this custom + // 3-argument overload, passing the executor and stop_token from + // the coroutine's context. For synchronous test awaitables like + // this one, the coroutine never suspends, so this is not invoked. + // The signature exists to allow the same awaitable type to work + // with both synchronous (test) and asynchronous (real I/O) code. void await_suspend( coro, executor_ref, diff --git a/include/boost/capy/test/read_stream.hpp b/include/boost/capy/test/read_stream.hpp index 44a7c0ce..4927f93e 100644 --- a/include/boost/capy/test/read_stream.hpp +++ b/include/boost/capy/test/read_stream.hpp @@ -141,6 +141,17 @@ class read_stream bool await_ready() const noexcept { return true; } + // This method is required to satisfy Capy's IoAwaitable concept, + // but is never called because await_ready() returns true. + // + // Capy uses a two-layer awaitable system: the promise's + // await_transform wraps awaitables in a transform_awaiter whose + // standard await_suspend(coroutine_handle) calls this custom + // 3-argument overload, passing the executor and stop_token from + // the coroutine's context. For synchronous test awaitables like + // this one, the coroutine never suspends, so this is not invoked. + // The signature exists to allow the same awaitable type to work + // with both synchronous (test) and asynchronous (real I/O) code. void await_suspend( coro, executor_ref, diff --git a/include/boost/capy/test/stream.hpp b/include/boost/capy/test/stream.hpp index 068d2d37..475fbf6d 100644 --- a/include/boost/capy/test/stream.hpp +++ b/include/boost/capy/test/stream.hpp @@ -178,6 +178,17 @@ class stream bool await_ready() const noexcept { return true; } + // This method is required to satisfy Capy's IoAwaitable concept, + // but is never called because await_ready() returns true. 
+ // + // Capy uses a two-layer awaitable system: the promise's + // await_transform wraps awaitables in a transform_awaiter whose + // standard await_suspend(coroutine_handle) calls this custom + // 3-argument overload, passing the executor and stop_token from + // the coroutine's context. For synchronous test awaitables like + // this one, the coroutine never suspends, so this is not invoked. + // The signature exists to allow the same awaitable type to work + // with both synchronous (test) and asynchronous (real I/O) code. void await_suspend( coro, executor_ref, @@ -278,6 +289,17 @@ class stream bool await_ready() const noexcept { return true; } + // This method is required to satisfy Capy's IoAwaitable concept, + // but is never called because await_ready() returns true. + // + // Capy uses a two-layer awaitable system: the promise's + // await_transform wraps awaitables in a transform_awaiter whose + // standard await_suspend(coroutine_handle) calls this custom + // 3-argument overload, passing the executor and stop_token from + // the coroutine's context. For synchronous test awaitables like + // this one, the coroutine never suspends, so this is not invoked. + // The signature exists to allow the same awaitable type to work + // with both synchronous (test) and asynchronous (real I/O) code. void await_suspend( coro, executor_ref, diff --git a/include/boost/capy/test/write_sink.hpp b/include/boost/capy/test/write_sink.hpp index c8fecb5b..53549ddf 100644 --- a/include/boost/capy/test/write_sink.hpp +++ b/include/boost/capy/test/write_sink.hpp @@ -180,6 +180,17 @@ class write_sink bool await_ready() const noexcept { return true; } + // This method is required to satisfy Capy's IoAwaitable concept, + // but is never called because await_ready() returns true. + // + // Capy uses a two-layer awaitable system: the promise's + // await_transform wraps awaitables in a transform_awaiter whose + // standard await_suspend(coroutine_handle) calls this custom + // 3-argument overload, passing the executor and stop_token from + // the coroutine's context. For synchronous test awaitables like + // this one, the coroutine never suspends, so this is not invoked. + // The signature exists to allow the same awaitable type to work + // with both synchronous (test) and asynchronous (real I/O) code. void await_suspend( coro, executor_ref, @@ -250,6 +261,9 @@ class write_sink bool await_ready() const noexcept { return true; } + // This method is required to satisfy Capy's IoAwaitable concept, + // but is never called because await_ready() returns true. + // See the comment on write(CB buffers) for a detailed explanation. void await_suspend( coro, executor_ref, @@ -313,6 +327,9 @@ class write_sink bool await_ready() const noexcept { return true; } + // This method is required to satisfy Capy's IoAwaitable concept, + // but is never called because await_ready() returns true. + // See the comment on write(CB buffers) for a detailed explanation. void await_suspend( coro, executor_ref, diff --git a/include/boost/capy/test/write_stream.hpp b/include/boost/capy/test/write_stream.hpp index 3ac41f83..99271252 100644 --- a/include/boost/capy/test/write_stream.hpp +++ b/include/boost/capy/test/write_stream.hpp @@ -158,6 +158,17 @@ class write_stream bool await_ready() const noexcept { return true; } + // This method is required to satisfy Capy's IoAwaitable concept, + // but is never called because await_ready() returns true. 
+ // + // Capy uses a two-layer awaitable system: the promise's + // await_transform wraps awaitables in a transform_awaiter whose + // standard await_suspend(coroutine_handle) calls this custom + // 3-argument overload, passing the executor and stop_token from + // the coroutine's context. For synchronous test awaitables like + // this one, the coroutine never suspends, so this is not invoked. + // The signature exists to allow the same awaitable type to work + // with both synchronous (test) and asynchronous (real I/O) code. void await_suspend( coro, executor_ref,