diff --git a/perf/bench/asio/callback/accept_churn_bench.cpp b/perf/bench/asio/callback/accept_churn_bench.cpp index 3522bda9..ead7be82 100644 --- a/perf/bench/asio/callback/accept_churn_bench.cpp +++ b/perf/bench/asio/callback/accept_churn_bench.cpp @@ -126,7 +126,7 @@ bench::benchmark_result bench_sequential_churn( double duration_s ) { perf::print_header( "Sequential Accept Churn (Asio Callbacks)" ); - asio::io_context ioc; + asio::io_context ioc( 1 ); tcp_acceptor acc( ioc.get_executor(), tcp::endpoint( tcp::v4(), 0 ) ); acc.set_option( tcp_acceptor::reuse_address( true ) ); auto ep = tcp::endpoint( asio::ip::address_v4::loopback(), acc.local_endpoint().port() ); @@ -177,7 +177,7 @@ bench::benchmark_result bench_concurrent_churn( int num_loops, double duration_s { std::cout << " Concurrent loops: " << num_loops << "\n"; - asio::io_context ioc; + asio::io_context ioc( 1 ); std::atomic running{ true }; std::vector cycle_counts( num_loops, 0 ); std::vector stats( num_loops ); @@ -325,7 +325,7 @@ bench::benchmark_result bench_burst_churn( int burst_size, double duration_s ) { std::cout << " Burst size: " << burst_size << "\n"; - asio::io_context ioc; + asio::io_context ioc( 1 ); tcp_acceptor acc( ioc.get_executor(), tcp::endpoint( tcp::v4(), 0 ) ); acc.set_option( tcp_acceptor::reuse_address( true ) ); auto ep = tcp::endpoint( asio::ip::address_v4::loopback(), acc.local_endpoint().port() ); diff --git a/perf/bench/asio/callback/fan_out_bench.cpp b/perf/bench/asio/callback/fan_out_bench.cpp index 2c325c50..cdea54e5 100644 --- a/perf/bench/asio/callback/fan_out_bench.cpp +++ b/perf/bench/asio/callback/fan_out_bench.cpp @@ -174,7 +174,7 @@ bench::benchmark_result bench_fork_join( int fan_out, double duration_s ) { std::cout << " Fan-out: " << fan_out << "\n"; - asio::io_context ioc; + asio::io_context ioc( 1 ); std::vector clients; std::vector servers; @@ -331,7 +331,7 @@ bench::benchmark_result bench_nested( std::cout << " Groups: " << groups << ", Subs/group: " << 
subs_per_group << " (total " << total_subs << ")\n"; - asio::io_context ioc; + asio::io_context ioc( 1 ); std::vector clients; std::vector servers; @@ -402,7 +402,7 @@ bench::benchmark_result bench_concurrent_parents( << fan_out << "\n"; int total_subs = num_parents * fan_out; - asio::io_context ioc; + asio::io_context ioc( 1 ); std::vector clients; std::vector servers; diff --git a/perf/bench/asio/callback/http_server_bench.cpp b/perf/bench/asio/callback/http_server_bench.cpp index 01bffb8b..39d141a5 100644 --- a/perf/bench/asio/callback/http_server_bench.cpp +++ b/perf/bench/asio/callback/http_server_bench.cpp @@ -170,7 +170,7 @@ bench::benchmark_result bench_single_connection( double duration_s ) { perf::print_header( "Single Connection (Asio Callbacks)" ); - asio::io_context ioc; + asio::io_context ioc( 1 ); auto [client, server] = asio_bench::make_socket_pair( ioc ); std::atomic running{ true }; @@ -220,7 +220,7 @@ bench::benchmark_result bench_concurrent_connections( int num_connections, doubl { std::cout << " Connections: " << num_connections << "\n"; - asio::io_context ioc; + asio::io_context ioc( 1 ); std::vector clients; std::vector servers; @@ -416,7 +416,7 @@ void run_http_server_benchmarks( // Warm up { - asio::io_context ioc; + asio::io_context ioc( 1 ); auto [c, s] = asio_bench::make_socket_pair( ioc ); char buf[256] = {}; for( int i = 0; i < 10; ++i ) diff --git a/perf/bench/asio/callback/io_context_bench.cpp b/perf/bench/asio/callback/io_context_bench.cpp index 11d1da6c..c17bb4c7 100644 --- a/perf/bench/asio/callback/io_context_bench.cpp +++ b/perf/bench/asio/callback/io_context_bench.cpp @@ -31,7 +31,7 @@ bench::benchmark_result bench_single_threaded_post( double duration_s ) { perf::print_header( "Single-threaded Handler Post (Asio Callbacks)" ); - asio::io_context ioc; + asio::io_context ioc( 1 ); int64_t counter = 0; int constexpr batch_size = 1000; @@ -145,7 +145,7 @@ bench::benchmark_result bench_interleaved_post_run( double duration_s, int 
handl { perf::print_header( "Interleaved Post/Run (Asio Callbacks)" ); - asio::io_context ioc; + asio::io_context ioc( 1 ); int64_t counter = 0; perf::stopwatch sw; @@ -249,7 +249,7 @@ void run_io_context_benchmarks( // Warm up { - asio::io_context ioc; + asio::io_context ioc( 1 ); int64_t counter = 0; for( int i = 0; i < 1000; ++i ) asio::post( ioc, [&counter] { ++counter; } ); diff --git a/perf/bench/asio/callback/socket_latency_bench.cpp b/perf/bench/asio/callback/socket_latency_bench.cpp index c1f10ee9..6b6556be 100644 --- a/perf/bench/asio/callback/socket_latency_bench.cpp +++ b/perf/bench/asio/callback/socket_latency_bench.cpp @@ -131,7 +131,7 @@ bench::benchmark_result bench_pingpong_latency( std::size_t message_size, double { std::cout << " Message size: " << message_size << " bytes\n"; - asio::io_context ioc; + asio::io_context ioc( 1 ); auto [client, server] = asio_bench::make_socket_pair( ioc ); std::atomic running{ true }; @@ -170,7 +170,7 @@ bench::benchmark_result bench_concurrent_latency( std::cout << " Concurrent pairs: " << num_pairs << ", "; std::cout << "Message size: " << message_size << " bytes\n"; - asio::io_context ioc; + asio::io_context ioc( 1 ); std::vector clients; std::vector servers; @@ -256,7 +256,7 @@ void run_socket_latency_benchmarks( // Warm up { - asio::io_context ioc; + asio::io_context ioc( 1 ); auto [c, s] = asio_bench::make_socket_pair( ioc ); char buf[64] = {}; for( int i = 0; i < 100; ++i ) diff --git a/perf/bench/asio/callback/socket_throughput_bench.cpp b/perf/bench/asio/callback/socket_throughput_bench.cpp index 447544d4..e2b95e22 100644 --- a/perf/bench/asio/callback/socket_throughput_bench.cpp +++ b/perf/bench/asio/callback/socket_throughput_bench.cpp @@ -81,7 +81,7 @@ bench::benchmark_result bench_throughput( std::size_t chunk_size, double duratio { std::cout << " Buffer size: " << chunk_size << " bytes\n"; - asio::io_context ioc; + asio::io_context ioc( 1 ); auto [writer, reader] = asio_bench::make_socket_pair( ioc ); 
std::vector write_buf( chunk_size, 'x' ); @@ -133,7 +133,7 @@ bench::benchmark_result bench_bidirectional_throughput( std::size_t chunk_size, { std::cout << " Buffer size: " << chunk_size << " bytes, bidirectional\n"; - asio::io_context ioc; + asio::io_context ioc( 1 ); auto [sock1, sock2] = asio_bench::make_socket_pair( ioc ); std::vector buf1( chunk_size, 'a' ); @@ -205,7 +205,7 @@ void run_socket_throughput_benchmarks( // Warm up { - asio::io_context ioc; + asio::io_context ioc( 1 ); auto [w, r] = asio_bench::make_socket_pair( ioc ); std::vector buf( 4096, 'w' ); asio::write( w, asio::buffer( buf ) ); diff --git a/perf/bench/asio/callback/timer_bench.cpp b/perf/bench/asio/callback/timer_bench.cpp index fa27bede..65152035 100644 --- a/perf/bench/asio/callback/timer_bench.cpp +++ b/perf/bench/asio/callback/timer_bench.cpp @@ -35,7 +35,7 @@ bench::benchmark_result bench_schedule_cancel( double duration_s ) { perf::print_header( "Timer Schedule/Cancel (Asio Callbacks)" ); - asio::io_context ioc; + asio::io_context ioc( 1 ); int64_t counter = 0; int constexpr batch_size = 1000; @@ -107,7 +107,7 @@ bench::benchmark_result bench_fire_rate( double duration_s ) { perf::print_header( "Timer Fire Rate (Asio Callbacks)" ); - asio::io_context ioc; + asio::io_context ioc( 1 ); std::atomic running{ true }; int64_t counter = 0; @@ -189,7 +189,7 @@ bench::benchmark_result bench_concurrent_timers( int num_timers, double duration { std::cout << " Timers: " << num_timers << "\n"; - asio::io_context ioc; + asio::io_context ioc( 1 ); std::atomic running{ true }; std::vector fire_counts( num_timers, 0 ); std::vector stats( num_timers ); diff --git a/perf/bench/asio/coroutine/accept_churn_bench.cpp b/perf/bench/asio/coroutine/accept_churn_bench.cpp index 4de25bde..79071145 100644 --- a/perf/bench/asio/coroutine/accept_churn_bench.cpp +++ b/perf/bench/asio/coroutine/accept_churn_bench.cpp @@ -39,7 +39,7 @@ bench::benchmark_result bench_sequential_churn( double duration_s ) { 
perf::print_header( "Sequential Accept Churn (Asio Coroutines)" ); - asio::io_context ioc; + asio::io_context ioc( 1 ); tcp_acceptor acc( ioc.get_executor(), tcp::endpoint( tcp::v4(), 0 ) ); acc.set_option( tcp_acceptor::reuse_address( true ) ); auto ep = tcp::endpoint( asio::ip::address_v4::loopback(), acc.local_endpoint().port() ); @@ -132,7 +132,7 @@ bench::benchmark_result bench_concurrent_churn( int num_loops, double duration_s { std::cout << " Concurrent loops: " << num_loops << "\n"; - asio::io_context ioc; + asio::io_context ioc( 1 ); std::atomic running{ true }; std::vector cycle_counts( num_loops, 0 ); std::vector stats( num_loops ); @@ -249,7 +249,7 @@ bench::benchmark_result bench_burst_churn( int burst_size, double duration_s ) { std::cout << " Burst size: " << burst_size << "\n"; - asio::io_context ioc; + asio::io_context ioc( 1 ); tcp_acceptor acc( ioc.get_executor(), tcp::endpoint( tcp::v4(), 0 ) ); acc.set_option( tcp_acceptor::reuse_address( true ) ); auto ep = tcp::endpoint( asio::ip::address_v4::loopback(), acc.local_endpoint().port() ); diff --git a/perf/bench/asio/coroutine/fan_out_bench.cpp b/perf/bench/asio/coroutine/fan_out_bench.cpp index aa5d28e7..c07fd1ad 100644 --- a/perf/bench/asio/coroutine/fan_out_bench.cpp +++ b/perf/bench/asio/coroutine/fan_out_bench.cpp @@ -76,7 +76,7 @@ bench::benchmark_result bench_fork_join( int fan_out, double duration_s ) { std::cout << " Fan-out: " << fan_out << "\n"; - asio::io_context ioc; + asio::io_context ioc( 1 ); std::vector clients; std::vector servers; @@ -171,7 +171,7 @@ bench::benchmark_result bench_nested( std::cout << " Groups: " << groups << ", Subs/group: " << subs_per_group << " (total " << total_subs << ")\n"; - asio::io_context ioc; + asio::io_context ioc( 1 ); std::vector clients; std::vector servers; @@ -294,7 +294,7 @@ bench::benchmark_result bench_concurrent_parents( << fan_out << "\n"; int total_subs = num_parents * fan_out; - asio::io_context ioc; + asio::io_context ioc( 1 ); 
std::vector clients; std::vector servers; diff --git a/perf/bench/asio/coroutine/http_server_bench.cpp b/perf/bench/asio/coroutine/http_server_bench.cpp index e5aa3637..21c863fc 100644 --- a/perf/bench/asio/coroutine/http_server_bench.cpp +++ b/perf/bench/asio/coroutine/http_server_bench.cpp @@ -129,7 +129,7 @@ bench::benchmark_result bench_single_connection( double duration_s ) { perf::print_header( "Single Connection (Asio Coroutines)" ); - asio::io_context ioc; + asio::io_context ioc( 1 ); auto [client, server] = make_socket_pair( ioc ); std::atomic running{ true }; @@ -180,7 +180,7 @@ bench::benchmark_result bench_concurrent_connections( int num_connections, doubl { std::cout << " Connections: " << num_connections << "\n"; - asio::io_context ioc; + asio::io_context ioc( 1 ); std::vector clients; std::vector servers; @@ -366,7 +366,7 @@ void run_http_server_benchmarks( // Warm up { - asio::io_context ioc; + asio::io_context ioc( 1 ); auto [c, s] = make_socket_pair( ioc ); char buf[256] = {}; for( int i = 0; i < 10; ++i ) diff --git a/perf/bench/asio/coroutine/io_context_bench.cpp b/perf/bench/asio/coroutine/io_context_bench.cpp index 3d77e900..2ccf7478 100644 --- a/perf/bench/asio/coroutine/io_context_bench.cpp +++ b/perf/bench/asio/coroutine/io_context_bench.cpp @@ -46,7 +46,7 @@ bench::benchmark_result bench_single_threaded_post( double duration_s ) { perf::print_header( "Single-threaded Handler Post (Asio Coroutines)" ); - asio::io_context ioc; + asio::io_context ioc( 1 ); int64_t counter = 0; int constexpr batch_size = 1000; @@ -156,7 +156,7 @@ bench::benchmark_result bench_interleaved_post_run( double duration_s, int handl { perf::print_header( "Interleaved Post/Run (Asio Coroutines)" ); - asio::io_context ioc; + asio::io_context ioc( 1 ); int64_t counter = 0; perf::stopwatch sw; @@ -258,7 +258,7 @@ void run_io_context_benchmarks( // Warm up { - asio::io_context ioc; + asio::io_context ioc( 1 ); int64_t counter = 0; for( int i = 0; i < 1000; ++i ) 
asio::co_spawn( ioc, increment_task( counter ), asio::detached ); diff --git a/perf/bench/asio/coroutine/socket_latency_bench.cpp b/perf/bench/asio/coroutine/socket_latency_bench.cpp index 20d635c3..211aa48b 100644 --- a/perf/bench/asio/coroutine/socket_latency_bench.cpp +++ b/perf/bench/asio/coroutine/socket_latency_bench.cpp @@ -82,7 +82,7 @@ bench::benchmark_result bench_pingpong_latency( std::size_t message_size, double { std::cout << " Message size: " << message_size << " bytes\n"; - asio::io_context ioc; + asio::io_context ioc( 1 ); auto [client, server] = make_socket_pair( ioc ); std::atomic running{ true }; @@ -122,7 +122,7 @@ bench::benchmark_result bench_concurrent_latency( std::cout << " Concurrent pairs: " << num_pairs << ", "; std::cout << "Message size: " << message_size << " bytes\n"; - asio::io_context ioc; + asio::io_context ioc( 1 ); std::vector clients; std::vector servers; @@ -206,7 +206,7 @@ void run_socket_latency_benchmarks( // Warm up { - asio::io_context ioc; + asio::io_context ioc( 1 ); auto [c, s] = make_socket_pair( ioc ); char buf[64] = {}; for( int i = 0; i < 100; ++i ) diff --git a/perf/bench/asio/coroutine/socket_throughput_bench.cpp b/perf/bench/asio/coroutine/socket_throughput_bench.cpp index 9589b577..6ca48ecb 100644 --- a/perf/bench/asio/coroutine/socket_throughput_bench.cpp +++ b/perf/bench/asio/coroutine/socket_throughput_bench.cpp @@ -35,7 +35,7 @@ bench::benchmark_result bench_throughput( std::size_t chunk_size, double duratio { std::cout << " Buffer size: " << chunk_size << " bytes\n"; - asio::io_context ioc; + asio::io_context ioc( 1 ); auto [writer, reader] = make_socket_pair( ioc ); std::vector write_buf( chunk_size, 'x' ); @@ -117,7 +117,7 @@ bench::benchmark_result bench_bidirectional_throughput( std::size_t chunk_size, { std::cout << " Buffer size: " << chunk_size << " bytes, bidirectional\n"; - asio::io_context ioc; + asio::io_context ioc( 1 ); auto [sock1, sock2] = make_socket_pair( ioc ); std::vector buf1( 
chunk_size, 'a' ); @@ -245,7 +245,7 @@ void run_socket_throughput_benchmarks( // Warm up { - asio::io_context ioc; + asio::io_context ioc( 1 ); auto [w, r] = make_socket_pair( ioc ); std::vector buf( 4096, 'w' ); asio::write( w, asio::buffer( buf ) ); diff --git a/perf/bench/asio/coroutine/timer_bench.cpp b/perf/bench/asio/coroutine/timer_bench.cpp index 6b564569..e1440002 100644 --- a/perf/bench/asio/coroutine/timer_bench.cpp +++ b/perf/bench/asio/coroutine/timer_bench.cpp @@ -37,7 +37,7 @@ bench::benchmark_result bench_schedule_cancel( double duration_s ) { perf::print_header( "Timer Schedule/Cancel (Asio Coroutines)" ); - asio::io_context ioc; + asio::io_context ioc( 1 ); int64_t counter = 0; int constexpr batch_size = 1000; @@ -82,7 +82,7 @@ bench::benchmark_result bench_fire_rate( double duration_s ) { perf::print_header( "Timer Fire Rate (Asio Coroutines)" ); - asio::io_context ioc; + asio::io_context ioc( 1 ); std::atomic running{ true }; int64_t counter = 0; @@ -136,7 +136,7 @@ bench::benchmark_result bench_concurrent_timers( int num_timers, double duration { std::cout << " Timers: " << num_timers << "\n"; - asio::io_context ioc; + asio::io_context ioc( 1 ); std::atomic running{ true }; std::vector fire_counts( num_timers, 0 ); std::vector stats( num_timers ); diff --git a/perf/bench/corosio/accept_churn_bench.cpp b/perf/bench/corosio/accept_churn_bench.cpp index 4e7bdc6f..b2996a64 100644 --- a/perf/bench/corosio/accept_churn_bench.cpp +++ b/perf/bench/corosio/accept_churn_bench.cpp @@ -41,7 +41,7 @@ bench::benchmark_result bench_sequential_churn( { perf::print_header( "Sequential Accept Churn (Corosio)" ); - auto ioc = factory(); + auto ioc = factory( 1 ); corosio::tcp_acceptor acc( *ioc ); auto listen_ec = acc.listen( corosio::endpoint( corosio::ipv4_address::loopback(), 0 ) ); @@ -144,7 +144,7 @@ bench::benchmark_result bench_concurrent_churn( { std::cout << " Concurrent loops: " << num_loops << "\n"; - auto ioc = factory(); + auto ioc = factory( 1 ); 
std::atomic running{ true }; std::vector cycle_counts( num_loops, 0 ); std::vector stats( num_loops ); @@ -270,7 +270,7 @@ bench::benchmark_result bench_burst_churn( { std::cout << " Burst size: " << burst_size << "\n"; - auto ioc = factory(); + auto ioc = factory( 1 ); corosio::tcp_acceptor acc( *ioc ); auto listen_ec = acc.listen( corosio::endpoint( corosio::ipv4_address::loopback(), 0 ) ); diff --git a/perf/bench/corosio/fan_out_bench.cpp b/perf/bench/corosio/fan_out_bench.cpp index 446db81a..042c4f0a 100644 --- a/perf/bench/corosio/fan_out_bench.cpp +++ b/perf/bench/corosio/fan_out_bench.cpp @@ -81,7 +81,7 @@ bench::benchmark_result bench_fork_join( { std::cout << " Fan-out: " << fan_out << "\n"; - auto ioc = factory(); + auto ioc = factory( 1 ); std::vector clients; std::vector servers; @@ -177,7 +177,7 @@ bench::benchmark_result bench_nested( std::cout << " Groups: " << groups << ", Subs/group: " << subs_per_group << " (total " << total_subs << ")\n"; - auto ioc = factory(); + auto ioc = factory( 1 ); std::vector clients; std::vector servers; @@ -294,7 +294,7 @@ bench::benchmark_result bench_concurrent_parents( << fan_out << "\n"; int total_subs = num_parents * fan_out; - auto ioc = factory(); + auto ioc = factory( 1 ); std::vector clients; std::vector servers; diff --git a/perf/bench/corosio/http_server_bench.cpp b/perf/bench/corosio/http_server_bench.cpp index 09d7bf94..913325fb 100644 --- a/perf/bench/corosio/http_server_bench.cpp +++ b/perf/bench/corosio/http_server_bench.cpp @@ -122,7 +122,7 @@ bench::benchmark_result bench_single_connection( { perf::print_header( "Single Connection (Corosio)" ); - auto ioc = factory(); + auto ioc = factory( 1 ); auto [client, server] = corosio::test::make_socket_pair( *ioc ); client.set_no_delay( true ); @@ -175,7 +175,7 @@ bench::benchmark_result bench_concurrent_connections( { std::cout << " Connections: " << num_connections << "\n"; - auto ioc = factory(); + auto ioc = factory( 1 ); std::vector clients; std::vector 
servers; @@ -261,7 +261,7 @@ bench::benchmark_result bench_multithread( std::cout << " Threads: " << num_threads << ", Connections: " << num_connections << "\n"; - auto ioc = factory(); + auto ioc = factory( num_threads ); std::vector clients; std::vector servers; @@ -362,7 +362,7 @@ void run_http_server_benchmarks( // Warm up { - auto ioc = factory(); + auto ioc = factory( 1 ); auto [c, s] = corosio::test::make_socket_pair( *ioc ); char buf[256] = {}; auto task = [&]() -> capy::task<> diff --git a/perf/bench/corosio/io_context_bench.cpp b/perf/bench/corosio/io_context_bench.cpp index bfdf2824..c6032abc 100644 --- a/perf/bench/corosio/io_context_bench.cpp +++ b/perf/bench/corosio/io_context_bench.cpp @@ -45,7 +45,7 @@ bench::benchmark_result bench_single_threaded_post( { perf::print_header( "Single-threaded Handler Post (Corosio)" ); - auto ioc = factory(); + auto ioc = factory( 1 ); auto ex = ioc->get_executor(); int64_t counter = 0; int constexpr batch_size = 1000; @@ -91,7 +91,7 @@ bench::benchmark_result bench_multithreaded_scaling( for( int num_threads = 1; num_threads <= max_threads; num_threads *= 2 ) { - auto ioc = factory(); + auto ioc = factory( max_threads ); auto ex = ioc->get_executor(); std::atomic running{ true }; std::atomic counter{ 0 }; @@ -158,7 +158,7 @@ bench::benchmark_result bench_interleaved_post_run( { perf::print_header( "Interleaved Post/Run (Corosio)" ); - auto ioc = factory(); + auto ioc = factory( 1 ); auto ex = ioc->get_executor(); int64_t counter = 0; @@ -198,7 +198,7 @@ bench::benchmark_result bench_concurrent_post_run( { perf::print_header( "Concurrent Post and Run (Corosio)" ); - auto ioc = factory(); + auto ioc = factory( num_threads ); auto ex = ioc->get_executor(); std::atomic running{ true }; std::atomic counter{ 0 }; @@ -263,7 +263,7 @@ void run_io_context_benchmarks( // Warm up { - auto ioc = factory(); + auto ioc = factory( 1 ); auto ex = ioc->get_executor(); int64_t counter = 0; for( int i = 0; i < 1000; ++i ) diff --git 
a/perf/bench/corosio/socket_latency_bench.cpp b/perf/bench/corosio/socket_latency_bench.cpp index ff2d74e7..13e59db6 100644 --- a/perf/bench/corosio/socket_latency_bench.cpp +++ b/perf/bench/corosio/socket_latency_bench.cpp @@ -81,7 +81,7 @@ bench::benchmark_result bench_pingpong_latency( { std::cout << " Message size: " << message_size << " bytes\n"; - auto ioc = factory(); + auto ioc = factory( 1 ); auto [client, server] = corosio::test::make_socket_pair( *ioc ); client.set_no_delay( true ); @@ -123,7 +123,7 @@ bench::benchmark_result bench_concurrent_latency( std::cout << " Concurrent pairs: " << num_pairs << ", "; std::cout << "Message size: " << message_size << " bytes\n"; - auto ioc = factory(); + auto ioc = factory( 1 ); std::vector clients; std::vector servers; @@ -209,7 +209,7 @@ void run_socket_latency_benchmarks( // Warm up { - auto ioc = factory(); + auto ioc = factory( 1 ); auto [c, s] = corosio::test::make_socket_pair( *ioc ); char buf[64] = {}; auto task = [&]() -> capy::task<> diff --git a/perf/bench/corosio/socket_throughput_bench.cpp b/perf/bench/corosio/socket_throughput_bench.cpp index 12b393ef..c785edb9 100644 --- a/perf/bench/corosio/socket_throughput_bench.cpp +++ b/perf/bench/corosio/socket_throughput_bench.cpp @@ -56,7 +56,7 @@ bench::benchmark_result bench_throughput( { std::cout << " Buffer size: " << chunk_size << " bytes\n"; - auto ioc = factory(); + auto ioc = factory( 1 ); auto [writer, reader] = corosio::test::make_socket_pair( *ioc ); set_nodelay( writer ); @@ -131,7 +131,7 @@ bench::benchmark_result bench_bidirectional_throughput( { std::cout << " Buffer size: " << chunk_size << " bytes, bidirectional\n"; - auto ioc = factory(); + auto ioc = factory( 1 ); auto [sock1, sock2] = corosio::test::make_socket_pair( *ioc ); set_nodelay( sock1 ); @@ -245,7 +245,7 @@ void run_socket_throughput_benchmarks( // Warm up { - auto ioc = factory(); + auto ioc = factory( 1 ); auto [w, r] = corosio::test::make_socket_pair( *ioc ); std::vector buf( 
4096, 'w' ); auto task = [&]() -> capy::task<> diff --git a/perf/bench/corosio/timer_bench.cpp b/perf/bench/corosio/timer_bench.cpp index fa34ff59..a6fb6ef1 100644 --- a/perf/bench/corosio/timer_bench.cpp +++ b/perf/bench/corosio/timer_bench.cpp @@ -37,7 +37,7 @@ bench::benchmark_result bench_schedule_cancel( { perf::print_header( "Timer Schedule/Cancel (Corosio)" ); - auto ioc = factory(); + auto ioc = factory( 1 ); int64_t counter = 0; int constexpr batch_size = 1000; @@ -83,7 +83,7 @@ bench::benchmark_result bench_fire_rate( { perf::print_header( "Timer Fire Rate (Corosio)" ); - auto ioc = factory(); + auto ioc = factory( 1 ); std::atomic running{ true }; int64_t counter = 0; @@ -136,7 +136,7 @@ bench::benchmark_result bench_concurrent_timers( { std::cout << " Timers: " << num_timers << "\n"; - auto ioc = factory(); + auto ioc = factory( 1 ); std::atomic running{ true }; std::vector fire_counts( num_timers, 0 ); std::vector stats( num_timers ); diff --git a/perf/common/backend_selection.hpp b/perf/common/backend_selection.hpp index 510fc3db..c6945d27 100644 --- a/perf/common/backend_selection.hpp +++ b/perf/common/backend_selection.hpp @@ -20,8 +20,25 @@ namespace perf { -/// Factory function pointer that creates a fresh io_context. -using context_factory = std::unique_ptr(*)(); +/// Factory that creates a fresh io_context with an optional concurrency hint. +/// +/// The hint controls whether the scheduler uses mutex locking: +/// hint == 1 → single-threaded mode (all locking elided) +/// hint != 1 → multi-threaded mode (locking enabled) +/// +/// Defaults to 0 (multi-threaded, locking enabled). Single-threaded +/// benchmarks should pass 1 explicitly to opt in to lock elision. +struct context_factory +{ + using create_fn = std::unique_ptr(*)(unsigned); + create_fn create; + + std::unique_ptr + operator()(unsigned concurrency_hint = 0) const + { + return create(concurrency_hint); + } +}; /** Return the default backend name for the current platform. 
*/ inline const char* default_backend_name() @@ -75,9 +92,9 @@ int dispatch_backend(const char* backend, Func&& func) #if BOOST_COROSIO_HAS_EPOLL if (std::strcmp(backend, "epoll") == 0) { - func([]() -> std::unique_ptr { - return std::make_unique(); - }, "epoll"); + func(context_factory{[](unsigned hint) -> std::unique_ptr { + return std::make_unique(hint); + }}, "epoll"); return 0; } #endif @@ -85,9 +102,9 @@ int dispatch_backend(const char* backend, Func&& func) #if BOOST_COROSIO_HAS_KQUEUE if (std::strcmp(backend, "kqueue") == 0) { - func([]() -> std::unique_ptr { - return std::make_unique(); - }, "kqueue"); + func(context_factory{[](unsigned hint) -> std::unique_ptr { + return std::make_unique(hint); + }}, "kqueue"); return 0; } #endif @@ -95,9 +112,9 @@ int dispatch_backend(const char* backend, Func&& func) #if BOOST_COROSIO_HAS_SELECT if (std::strcmp(backend, "select") == 0) { - func([]() -> std::unique_ptr { - return std::make_unique(); - }, "select"); + func(context_factory{[](unsigned hint) -> std::unique_ptr { + return std::make_unique(hint); + }}, "select"); return 0; } #endif @@ -105,9 +122,9 @@ int dispatch_backend(const char* backend, Func&& func) #if BOOST_COROSIO_HAS_IOCP if (std::strcmp(backend, "iocp") == 0) { - func([]() -> std::unique_ptr { - return std::make_unique(); - }, "iocp"); + func(context_factory{[](unsigned hint) -> std::unique_ptr { + return std::make_unique(hint); + }}, "iocp"); return 0; } #endif diff --git a/perf/profile/concurrent_io_bench.cpp b/perf/profile/concurrent_io_bench.cpp index 2c68df16..e461736c 100644 --- a/perf/profile/concurrent_io_bench.cpp +++ b/perf/profile/concurrent_io_bench.cpp @@ -84,7 +84,7 @@ void run_workload( int num_pairs, int num_threads) { - auto ioc = factory(); + auto ioc = factory( num_threads ); std::atomic ops{0}; std::atomic stop{false}; @@ -199,7 +199,7 @@ void run_profiler_workload( // Warmup std::cout << "Warming up (1 second)...\n"; { - auto ioc = factory(); + auto ioc = factory( 1 ); auto [a, 
b] = corosio::test::make_socket_pair(*ioc); a.set_no_delay(true); b.set_no_delay(true); diff --git a/perf/profile/coroutine_post_bench.cpp b/perf/profile/coroutine_post_bench.cpp index 511b0dbc..159e073c 100644 --- a/perf/profile/coroutine_post_bench.cpp +++ b/perf/profile/coroutine_post_bench.cpp @@ -65,7 +65,7 @@ void run_workload( int batch_size, std::size_t capture_size) { - auto ioc = factory(); + auto ioc = factory( 1 ); auto ex = ioc->get_executor(); std::atomic counter{0}; @@ -159,7 +159,7 @@ void run_profiler_workload( // Warmup std::cout << "Warming up (1 second)...\n"; { - auto ioc = factory(); + auto ioc = factory( 1 ); auto ex = ioc->get_executor(); std::atomic warmup_counter{0}; diff --git a/perf/profile/queue_depth_bench.cpp b/perf/profile/queue_depth_bench.cpp index 1d8e2ca7..24708b21 100644 --- a/perf/profile/queue_depth_bench.cpp +++ b/perf/profile/queue_depth_bench.cpp @@ -57,7 +57,7 @@ void run_workload( int queue_depth, int num_threads) { - auto ioc = factory(); + auto ioc = factory( num_threads ); auto ex = ioc->get_executor(); std::atomic counter{0}; @@ -147,7 +147,7 @@ void run_profiler_workload( // Warmup std::cout << "Warming up (1 second)...\n"; { - auto ioc = factory(); + auto ioc = factory( 1 ); auto ex = ioc->get_executor(); std::atomic warmup_counter{0}; diff --git a/perf/profile/scheduler_contention_bench.cpp b/perf/profile/scheduler_contention_bench.cpp index 1652db52..de90bd68 100644 --- a/perf/profile/scheduler_contention_bench.cpp +++ b/perf/profile/scheduler_contention_bench.cpp @@ -140,7 +140,7 @@ void run_balanced_workload( int num_threads, int batch_size) { - auto ioc = factory(); + auto ioc = factory( num_threads ); std::atomic counter{0}; std::atomic stop{false}; @@ -223,7 +223,7 @@ void run_post_only_workload( int num_threads, int batch_size) { - auto ioc = factory(); + auto ioc = factory( num_threads ); std::atomic counter{0}; std::atomic stop{false}; @@ -325,7 +325,7 @@ void run_run_only_workload( int num_threads, int 
queue_depth) { - auto ioc = factory(); + auto ioc = factory( num_threads ); std::atomic counter{0}; std::atomic stop{false}; @@ -433,7 +433,7 @@ void run_profiler_workload( // Warmup - main thread participates, no sleeping std::cout << "Warming up (1 second)...\n"; { - auto ioc = factory(); + auto ioc = factory( num_threads ); std::atomic warmup_counter{0}; std::atomic stop{false}; diff --git a/perf/profile/small_io_bench.cpp b/perf/profile/small_io_bench.cpp index e3835edb..f1a3399a 100644 --- a/perf/profile/small_io_bench.cpp +++ b/perf/profile/small_io_bench.cpp @@ -83,7 +83,7 @@ void run_workload( std::size_t buffer_size, int num_pairs) { - auto ioc = factory(); + auto ioc = factory( 1 ); std::atomic ops{0}; std::atomic stop{false}; @@ -185,7 +185,7 @@ void run_profiler_workload( // Warmup std::cout << "Warming up (1 second)...\n"; { - auto ioc = factory(); + auto ioc = factory( 1 ); auto [a, b] = corosio::test::make_socket_pair(*ioc); a.set_no_delay(true); b.set_no_delay(true); diff --git a/src/corosio/src/detail/conditional_atomic.hpp b/src/corosio/src/detail/conditional_atomic.hpp new file mode 100644 index 00000000..b94bc0a0 --- /dev/null +++ b/src/corosio/src/detail/conditional_atomic.hpp @@ -0,0 +1,102 @@ +// +// Copyright (c) 2026 Michael Vandeberg +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/corosio +// + +#ifndef BOOST_COROSIO_SRC_DETAIL_CONDITIONAL_ATOMIC_HPP +#define BOOST_COROSIO_SRC_DETAIL_CONDITIONAL_ATOMIC_HPP + +#include + +/* + Conditional atomic for single-threaded optimization. + + Keeps std::atomic as storage but when disabled (concurrency_hint == 1), + decomposes RMW ops (fetch_add, fetch_sub, exchange) into relaxed + load + modify + relaxed store — no exclusive pairs, no barriers. + On ARM64 this turns ldaxr/stlxr (~10-20 cycles) into plain + ldr/add/str (~3 cycles). 
/*
    Conditional atomic for single-threaded optimization.

    Storage is always a std::atomic<T>. While enabled, every operation
    forwards to that atomic with the caller's memory ordering.

    While disabled (concurrency_hint == 1), read-modify-write operations
    (fetch_add, fetch_sub, exchange, compare_exchange_strong) are decomposed
    into a relaxed load, a modification, and a relaxed store: no exclusive
    pairs, no barriers. On ARM64 this turns ldaxr/stlxr (~10-20 cycles) into
    plain ldr/add/str (~3 cycles). On x86 (TSO) it eliminates LOCK prefixes.
    load/store downgrade their memory ordering to relaxed when disabled.

    The decomposed operations are NOT atomic as a whole. They are only
    correct when a single thread touches the object, which is what the
    disabled state asserts.

    Memory-order parameters default to std::memory_order_seq_cst, matching
    std::atomic, so the type can replace a std::atomic<T> without touching
    call sites that relied on the default ordering.
*/

namespace boost::corosio::detail {

template<class T>
class conditional_atomic
{
public:
    /** Construct with an initial value.

        @param initial Value stored in the underlying atomic.
        @param enabled `true` for real atomic operations, `false` for the
               single-threaded (relaxed, decomposed) fast path.
    */
    explicit conditional_atomic(T initial = T{}, bool enabled = true) noexcept
        : atomic_(initial)
        , enabled_(enabled)
    {
    }

    conditional_atomic(conditional_atomic const&) = delete;
    conditional_atomic& operator=(conditional_atomic const&) = delete;

    /// Return the current value; the ordering is relaxed when disabled.
    T load(std::memory_order order = std::memory_order_seq_cst) const noexcept
    {
        return atomic_.load(effective(order));
    }

    /// Store `val`; the ordering is relaxed when disabled.
    void store(
        T val,
        std::memory_order order = std::memory_order_seq_cst) noexcept
    {
        atomic_.store(val, effective(order));
    }

    /// Add `arg` and return the previous value.
    T fetch_add(
        T arg,
        std::memory_order order = std::memory_order_seq_cst) noexcept
    {
        if (enabled_)
            return atomic_.fetch_add(arg, order);

        // Single-threaded path: plain load + store, no RMW instruction.
        T const previous = atomic_.load(std::memory_order_relaxed);
        atomic_.store(previous + arg, std::memory_order_relaxed);
        return previous;
    }

    /// Subtract `arg` and return the previous value.
    T fetch_sub(
        T arg,
        std::memory_order order = std::memory_order_seq_cst) noexcept
    {
        if (enabled_)
            return atomic_.fetch_sub(arg, order);

        T const previous = atomic_.load(std::memory_order_relaxed);
        atomic_.store(previous - arg, std::memory_order_relaxed);
        return previous;
    }

    /// Replace the value with `val` and return the previous value.
    T exchange(
        T val,
        std::memory_order order = std::memory_order_seq_cst) noexcept
    {
        if (enabled_)
            return atomic_.exchange(val, order);

        T const previous = atomic_.load(std::memory_order_relaxed);
        atomic_.store(val, std::memory_order_relaxed);
        return previous;
    }

    /** Compare-and-swap with separate success and failure orderings.

        @param expected In: value the caller expects. Out: the value
               actually observed when the exchange fails.
        @param desired Value to store when the comparison succeeds.
        @return `true` if `desired` was stored.
    */
    bool compare_exchange_strong(
        T& expected, T desired,
        std::memory_order success, std::memory_order failure) noexcept
    {
        if (enabled_)
            return atomic_.compare_exchange_strong(
                expected, desired, success, failure);
        return exchange_if_equal(expected, desired);
    }

    /// Compare-and-swap with a single ordering (same shape as std::atomic).
    bool compare_exchange_strong(
        T& expected, T desired,
        std::memory_order order = std::memory_order_seq_cst) noexcept
    {
        if (enabled_)
            return atomic_.compare_exchange_strong(expected, desired, order);
        return exchange_if_equal(expected, desired);
    }

    /// Switch between real atomics (`true`) and the decomposed path.
    void set_enabled(bool v) noexcept { enabled_ = v; }

    /// Whether operations are currently real atomic operations.
    bool enabled() const noexcept { return enabled_; }

private:
    // Ordering actually applied to plain loads and stores.
    std::memory_order effective(std::memory_order requested) const noexcept
    {
        return enabled_ ? requested : std::memory_order_relaxed;
    }

    // Disabled-mode compare-and-swap: compares by value with operator==.
    bool exchange_if_equal(T& expected, T desired) noexcept
    {
        T const observed = atomic_.load(std::memory_order_relaxed);
        if (observed == expected)
        {
            atomic_.store(desired, std::memory_order_relaxed);
            return true;
        }
        expected = observed;
        return false;
    }

    std::atomic<T> atomic_;
    bool enabled_;
};

} // namespace boost::corosio::detail
/*
    Conditional locking primitives for single-threaded optimization.

    When concurrency_hint == 1, the user guarantees single-threaded access.
    All mutex operations become no-ops, eliminating pthread_mutex overhead
    on every I/O operation.

    When locking is enabled, lock() spins briefly (spin_count_ iterations)
    before falling back to the OS mutex. This avoids the futex round-trip
    (roughly 1-2 microseconds) for the scheduler's short critical sections.

    conditional_mutex satisfies BasicLockable, so std::lock_guard works
    via CTAD. The scheduler uses conditional_unique_lock + conditional_event
    because std::condition_variable::wait() requires
    std::unique_lock<std::mutex>.
*/

namespace boost::corosio::detail {

/** Emit a CPU spin-wait hint.

    Issues `yield` on AArch64 and the `pause` builtin on x86 when built with
    a GNU-compatible compiler. On any other compiler or architecture this
    function does nothing.
*/
inline void spin_pause() noexcept
{
    // The inline-asm syntax and the builtin below are GNU extensions, so
    // they must be gated on the compiler and not only on the architecture;
    // the MSVC architecture macros alone would select code that MSVC
    // cannot compile.
#if defined(__GNUC__) || defined(__clang__)
#  if defined(__aarch64__) || defined(_M_ARM64)
    __asm__ volatile("yield");
#  elif defined(__x86_64__) || defined(__i386__) || defined(_M_X64) || defined(_M_IX86)
    __builtin_ia32_pause();
#  endif
#endif
}

/** A mutex that can be turned into a no-op at run time.

    While enabled, `lock()` tries `try_lock()` up to `spin_count` times,
    pausing between attempts, before blocking on the underlying
    `std::mutex`. While disabled, `lock()` and `unlock()` do nothing and
    `try_lock()` always reports success.

    The enabled flag and spin count are plain members; changing them is not
    synchronized with concurrent lock operations.
*/
class conditional_mutex
{
public:
    /** Construct the mutex.

        @param enabled `true` to perform real locking.
        @param spin_count Number of `try_lock()` attempts before blocking.
    */
    explicit conditional_mutex(bool enabled = true, int spin_count = 16) noexcept
        : enabled_(enabled)
        , spin_count_(spin_count)
    {
    }

    conditional_mutex(conditional_mutex const&) = delete;
    conditional_mutex& operator=(conditional_mutex const&) = delete;

    /// Acquire the mutex (spin, then block); no-op while disabled.
    void lock()
    {
        if (!enabled_)
            return;
        for (int attempt = 0; attempt < spin_count_; ++attempt)
        {
            if (mutex_.try_lock())
                return;
            spin_pause();
        }
        mutex_.lock();
    }

    /// Release the mutex; no-op while disabled.
    void unlock()
    {
        if (enabled_)
            mutex_.unlock();
    }

    /// Try to acquire without blocking; always `true` while disabled.
    bool try_lock()
    {
        return !enabled_ || mutex_.try_lock();
    }

    void set_enabled(bool v) noexcept { enabled_ = v; }
    void set_spin_count(int n) noexcept { spin_count_ = n; }
    bool enabled() const noexcept { return enabled_; }
    int spin_count() const noexcept { return spin_count_; }

    /// Access the wrapped `std::mutex` directly (ignores the enabled flag).
    std::mutex& underlying() noexcept { return mutex_; }

private:
    std::mutex mutex_;
    bool enabled_;
    int spin_count_;
};

/** Scoped lock over a `conditional_mutex`, usable with `conditional_event`.

    The enabled flag and spin count are copied from the mutex at
    construction; later changes to the mutex settings do not affect an
    existing lock object. When enabled, the constructor acquires the mutex.
    The wrapped `std::unique_lock` releases it on destruction if still owned.
*/
class conditional_unique_lock
{
public:
    explicit conditional_unique_lock(conditional_mutex& m)
        : real_lock_(m.underlying(), std::defer_lock)
        , enabled_(m.enabled())
        , spin_count_(m.spin_count())
    {
        if (enabled_)
            spin_lock();
    }

    conditional_unique_lock(conditional_unique_lock const&) = delete;
    conditional_unique_lock& operator=(conditional_unique_lock const&) = delete;

    /// Re-acquire the mutex; no-op when locking is disabled.
    void lock()
    {
        if (enabled_)
            spin_lock();
    }

    /// Release the mutex; no-op when locking is disabled.
    void unlock()
    {
        if (enabled_)
            real_lock_.unlock();
    }

    /// `true` when locking is disabled or the real lock is held.
    bool owns_lock() const noexcept
    {
        return !enabled_ || real_lock_.owns_lock();
    }

    /// Access the wrapped `std::unique_lock` (not owned when disabled).
    std::unique_lock<std::mutex>& underlying() noexcept { return real_lock_; }

private:
    // Same spin-then-block policy as conditional_mutex::lock().
    void spin_lock()
    {
        for (int attempt = 0; attempt < spin_count_; ++attempt)
        {
            if (real_lock_.try_lock())
                return;
            spin_pause();
        }
        real_lock_.lock();
    }

    std::unique_lock<std::mutex> real_lock_;
    bool enabled_;
    int spin_count_;
};

/** Condition variable that cooperates with `conditional_unique_lock`.

    `wait` and `wait_for` only block when the underlying `std::unique_lock`
    actually owns the mutex. If it does not (for example because locking is
    disabled), both return immediately without waiting.
*/
class conditional_event
{
public:
    void notify_one() { cond_.notify_one(); }
    void notify_all() { cond_.notify_all(); }

    /// Block until notified, unless the real lock is not held.
    void wait(conditional_unique_lock& lock)
    {
        if (lock.underlying().owns_lock())
            cond_.wait(lock.underlying());
    }

    /// Block until notified or `dur` elapses, unless the real lock is not held.
    template<class Rep, class Period>
    void wait_for(
        conditional_unique_lock& lock,
        std::chrono::duration<Rep, Period> const& dur)
    {
        if (lock.underlying().owns_lock())
            cond_.wait_for(lock.underlying(), dur);
    }

private:
    std::condition_variable cond_;
};

} // namespace boost::corosio::detail
op_queue& completed_ops) noexcept { if (!ctx || ctx->private_queue.empty()) @@ -365,14 +365,16 @@ operator()() kqueue_scheduler:: kqueue_scheduler( capy::execution_context& ctx, - int) + int concurrency_hint) : kq_fd_(-1) - , outstanding_work_(0) - , stopped_(false) + , mutex_(concurrency_hint != 1) + , outstanding_work_(0, concurrency_hint != 1) + , stopped_(false, concurrency_hint != 1) , shutdown_(false) , task_running_(false) , task_interrupted_(false) , state_(0) + , user_event_armed_(false, concurrency_hint != 1) { // FreeBSD 13+: kqueue1(O_CLOEXEC) available kq_fd_ = ::kqueue(); @@ -425,7 +427,7 @@ kqueue_scheduler:: shutdown() { { - std::unique_lock lock(mutex_); + conditional_unique_lock lock(mutex_); shutdown_ = true; while (auto* h = completed_ops_.pop()) @@ -494,7 +496,7 @@ post(std::coroutine_handle<> h) const // Slow path: cross-thread post requires mutex outstanding_work_.fetch_add(1, std::memory_order_relaxed); - std::unique_lock lock(mutex_); + conditional_unique_lock lock(mutex_); completed_ops_.push(ph.release()); wake_one_thread_and_unlock(lock); } @@ -515,7 +517,7 @@ post(scheduler_op* h) const // Slow path: cross-thread post requires mutex outstanding_work_.fetch_add(1, std::memory_order_relaxed); - std::unique_lock lock(mutex_); + conditional_unique_lock lock(mutex_); completed_ops_.push(h); wake_one_thread_and_unlock(lock); } @@ -549,7 +551,7 @@ void kqueue_scheduler:: stop() { - std::unique_lock lock(mutex_); + conditional_unique_lock lock(mutex_); if (!stopped_.load(std::memory_order_relaxed)) { stopped_.store(true, std::memory_order_release); @@ -569,7 +571,7 @@ void kqueue_scheduler:: restart() { - std::unique_lock lock(mutex_); + conditional_unique_lock lock(mutex_); stopped_.store(false, std::memory_order_release); } @@ -584,7 +586,7 @@ run() } thread_context_guard ctx(this); - std::unique_lock lock(mutex_); + conditional_unique_lock lock(mutex_); std::size_t n = 0; for (;;) @@ -610,7 +612,7 @@ run_one() } thread_context_guard 
ctx(this); - std::unique_lock lock(mutex_); + conditional_unique_lock lock(mutex_); return do_one(lock, -1, &ctx.frame_); } @@ -625,7 +627,7 @@ wait_one(long usec) } thread_context_guard ctx(this); - std::unique_lock lock(mutex_); + conditional_unique_lock lock(mutex_); return do_one(lock, usec, &ctx.frame_); } @@ -640,7 +642,7 @@ poll() } thread_context_guard ctx(this); - std::unique_lock lock(mutex_); + conditional_unique_lock lock(mutex_); std::size_t n = 0; for (;;) @@ -666,7 +668,7 @@ poll_one() } thread_context_guard ctx(this); - std::unique_lock lock(mutex_); + conditional_unique_lock lock(mutex_); return do_one(lock, 0, &ctx.frame_); } @@ -686,6 +688,8 @@ register_descriptor(int fd, descriptor_state* desc) const desc->registered_events = kqueue_event_read | kqueue_event_write; desc->fd = fd; desc->scheduler_ = this; + desc->mutex.set_enabled(locking_enabled()); + desc->mutex.set_spin_count(mutex_.spin_count()); std::lock_guard lock(desc->mutex); desc->read_ready = false; @@ -722,7 +726,7 @@ work_finished() const noexcept // signal_all() wakes threads waiting on the condvar. // interrupt_reactor() wakes the reactor thread blocked in kevent(). // Both are needed because they target different blocking mechanisms. 
- std::unique_lock lock(mutex_); + conditional_unique_lock lock(mutex_); signal_all(lock); if (task_running_ && !task_interrupted_) { @@ -751,7 +755,7 @@ drain_thread_queue(op_queue& queue, std::int64_t count) const if (count > 0) outstanding_work_.fetch_add(count, std::memory_order_relaxed); - std::unique_lock lock(mutex_); + conditional_unique_lock lock(mutex_); completed_ops_.splice(queue); if (count > 0) maybe_unlock_and_signal_one(lock); @@ -772,7 +776,7 @@ post_deferred_completions(op_queue& ops) const } // Slow path: add to global queue and wake a thread - std::unique_lock lock(mutex_); + conditional_unique_lock lock(mutex_); completed_ops_.splice(ops); wake_one_thread_and_unlock(lock); } @@ -798,7 +802,7 @@ interrupt_reactor() const void kqueue_scheduler:: -signal_all(std::unique_lock&) const +signal_all(conditional_unique_lock&) const { state_ |= signaled_bit; cond_.notify_all(); @@ -806,7 +810,7 @@ signal_all(std::unique_lock&) const bool kqueue_scheduler:: -maybe_unlock_and_signal_one(std::unique_lock& lock) const +maybe_unlock_and_signal_one(conditional_unique_lock& lock) const { state_ |= signaled_bit; if (state_ > signaled_bit) @@ -820,7 +824,7 @@ maybe_unlock_and_signal_one(std::unique_lock& lock) const void kqueue_scheduler:: -unlock_and_signal_one(std::unique_lock& lock) const +unlock_and_signal_one(conditional_unique_lock& lock) const { state_ |= signaled_bit; bool have_waiters = state_ > signaled_bit; @@ -838,7 +842,7 @@ clear_signal() const void kqueue_scheduler:: -wait_for_signal(std::unique_lock& lock) const +wait_for_signal(conditional_unique_lock& lock) const { while ((state_ & signaled_bit) == 0) { @@ -851,7 +855,7 @@ wait_for_signal(std::unique_lock& lock) const void kqueue_scheduler:: wait_for_signal_for( - std::unique_lock& lock, + conditional_unique_lock& lock, long timeout_us) const { if ((state_ & signaled_bit) == 0) @@ -864,7 +868,7 @@ wait_for_signal_for( void kqueue_scheduler:: -wake_one_thread_and_unlock(std::unique_lock& lock) 
const +wake_one_thread_and_unlock(conditional_unique_lock& lock) const { if (maybe_unlock_and_signal_one(lock)) return; @@ -926,7 +930,7 @@ calculate_timeout(long requested_timeout_us) const struct work_cleanup { kqueue_scheduler const* scheduler; - std::unique_lock* lock; + conditional_unique_lock* lock; scheduler_context* ctx; ~work_cleanup() @@ -979,7 +983,7 @@ struct task_cleanup void kqueue_scheduler:: -run_task(std::unique_lock& lock, scheduler_context* ctx) +run_task(conditional_unique_lock& lock, scheduler_context* ctx) { long effective_timeout_us = task_interrupted_ ? 0 : calculate_timeout(-1); @@ -1105,7 +1109,7 @@ run_task(std::unique_lock& lock, scheduler_context* ctx) std::size_t kqueue_scheduler:: -do_one(std::unique_lock& lock, long timeout_us, scheduler_context* ctx) +do_one(conditional_unique_lock& lock, long timeout_us, scheduler_context* ctx) { for (;;) { diff --git a/src/corosio/src/detail/kqueue/scheduler.hpp b/src/corosio/src/detail/kqueue/scheduler.hpp index 6f477709..c147ea72 100644 --- a/src/corosio/src/detail/kqueue/scheduler.hpp +++ b/src/corosio/src/detail/kqueue/scheduler.hpp @@ -17,14 +17,13 @@ #include #include +#include "src/detail/conditional_atomic.hpp" +#include "src/detail/conditional_mutex.hpp" #include "src/detail/scheduler_impl.hpp" #include "src/detail/scheduler_op.hpp" -#include -#include #include #include -#include namespace boost::corosio::detail { @@ -204,9 +203,9 @@ class kqueue_scheduler friend struct work_cleanup; friend struct task_cleanup; - std::size_t do_one(std::unique_lock& lock, long timeout_us, scheduler_context* ctx); - void run_task(std::unique_lock& lock, scheduler_context* ctx); - void wake_one_thread_and_unlock(std::unique_lock& lock) const; + std::size_t do_one(conditional_unique_lock& lock, long timeout_us, scheduler_context* ctx); + void run_task(conditional_unique_lock& lock, scheduler_context* ctx); + void wake_one_thread_and_unlock(conditional_unique_lock& lock) const; void interrupt_reactor() const; 
long calculate_timeout(long requested_timeout_us) const; @@ -217,7 +216,7 @@ class kqueue_scheduler @param lock The held mutex lock. */ - void signal_all(std::unique_lock& lock) const; + void signal_all(conditional_unique_lock& lock) const; /** Set the signaled state and wake one waiter if any exist. @@ -232,7 +231,7 @@ class kqueue_scheduler @return `true` if unlocked and signaled, `false` if lock still held. */ - bool maybe_unlock_and_signal_one(std::unique_lock& lock) const; + bool maybe_unlock_and_signal_one(conditional_unique_lock& lock) const; /** Set the signaled state, unlock, and wake one waiter if any exist. @@ -244,7 +243,7 @@ class kqueue_scheduler @param lock The held mutex lock. */ - void unlock_and_signal_one(std::unique_lock& lock) const; + void unlock_and_signal_one(conditional_unique_lock& lock) const; /** Clear the signaled state before waiting. @@ -264,7 +263,7 @@ class kqueue_scheduler @param lock The held mutex lock. */ - void wait_for_signal(std::unique_lock& lock) const; + void wait_for_signal(conditional_unique_lock& lock) const; /** Block until signaled or timeout expires. @@ -275,16 +274,18 @@ class kqueue_scheduler @param timeout_us Maximum time to wait in microseconds. */ void wait_for_signal_for( - std::unique_lock& lock, + conditional_unique_lock& lock, long timeout_us) const; + bool locking_enabled() const noexcept { return mutex_.enabled(); } + int kq_fd_; int max_inline_budget_ = 2; - mutable std::mutex mutex_; - mutable std::condition_variable cond_; + mutable conditional_mutex mutex_; + mutable conditional_event cond_; mutable op_queue completed_ops_; - mutable std::atomic outstanding_work_{0}; - std::atomic stopped_{false}; + mutable conditional_atomic outstanding_work_{0}; + conditional_atomic stopped_{false}; bool shutdown_ = false; // True while a thread is blocked in kevent(). 
Used by @@ -303,7 +304,7 @@ class kqueue_scheduler mutable std::size_t state_ = 0; // EVFILT_USER idempotency: prevents redundant NOTE_TRIGGER writes - mutable std::atomic user_event_armed_{false}; + mutable conditional_atomic user_event_armed_{false}; // Sentinel operation for interleaving reactor runs with handler execution. // Ensures the reactor runs periodically even when handlers are continuously