Skip to content

Commit e821dcf

Browse files
committed
perf(runtime): improve scheduler, run queue, and task execution model
- optimize RuntimeExecutor for better task dispatch efficiency - refine Scheduler and Worker coordination to reduce contention - improve RunQueue structure and task scheduling behavior - adjust Budget handling for more predictable execution slices - update Task and Runtime internals for lower overhead Result: - reduced latency under load - improved throughput and stability in concurrent scenarios - groundwork for higher scalability in HTTP/WebSocket workloads
1 parent 8da3b1c commit e821dcf

7 files changed

Lines changed: 761 additions & 101 deletions

File tree

include/vix/executor/RuntimeExecutor.hpp

Lines changed: 186 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -141,13 +141,13 @@ namespace vix::executor
141141
{
142142
std::lock_guard<std::mutex> lock(lifecycleMutex_);
143143

144+
state_->accepting.store(false, std::memory_order_release);
145+
144146
if (!started_.load(std::memory_order_acquire))
145147
{
146-
state_->accepting.store(false, std::memory_order_release);
147148
return;
148149
}
149150

150-
state_->accepting.store(false, std::memory_order_release);
151151
runtime_->stop();
152152
started_.store(false, std::memory_order_release);
153153
}
@@ -162,6 +162,7 @@ namespace vix::executor
162162
*/
163163
void stop_and_wait()
164164
{
165+
state_->accepting.store(false, std::memory_order_release);
165166
wait_idle();
166167
stop();
167168
}
@@ -186,6 +187,16 @@ namespace vix::executor
186187
return runtime_->running();
187188
}
188189

190+
/**
191+
* @brief Return whether the executor is currently accepting new work.
192+
*
193+
* @return true if new submissions may still be accepted.
194+
*/
195+
[[nodiscard]] bool accepting() const noexcept
196+
{
197+
return state_->accepting.load(std::memory_order_acquire);
198+
}
199+
189200
/**
190201
* @brief Submit a pre-built runtime task directly.
191202
*
@@ -195,12 +206,23 @@ namespace vix::executor
195206
*/
196207
[[nodiscard]] bool submit(vix::runtime::Task task)
197208
{
198-
if (!state_->accepting.load(std::memory_order_acquire))
209+
if (!accepting())
199210
{
211+
state_->rejected.fetch_add(1, std::memory_order_relaxed);
200212
return false;
201213
}
202214

203-
return runtime_->submit(std::move(task));
215+
const bool accepted = runtime_->submit(std::move(task));
216+
if (accepted)
217+
{
218+
state_->submitted.fetch_add(1, std::memory_order_relaxed);
219+
}
220+
else
221+
{
222+
state_->rejected.fetch_add(1, std::memory_order_relaxed);
223+
}
224+
225+
return accepted;
204226
}
205227

206228
/**
@@ -217,12 +239,104 @@ namespace vix::executor
217239
[[nodiscard]] bool submit(vix::runtime::TaskFn fn,
218240
std::uint32_t affinity = 0)
219241
{
220-
if (!state_->accepting.load(std::memory_order_acquire))
242+
if (!fn)
243+
{
244+
state_->rejected.fetch_add(1, std::memory_order_relaxed);
245+
return false;
246+
}
247+
248+
if (!accepting())
249+
{
250+
state_->rejected.fetch_add(1, std::memory_order_relaxed);
251+
return false;
252+
}
253+
254+
const bool accepted = runtime_->submit(std::move(fn), affinity);
255+
if (accepted)
256+
{
257+
state_->submitted.fetch_add(1, std::memory_order_relaxed);
258+
}
259+
else
260+
{
261+
state_->rejected.fetch_add(1, std::memory_order_relaxed);
262+
}
263+
264+
return accepted;
265+
}
266+
267+
/**
268+
* @brief Submit a hot-path HTTP task with minimal executor overhead.
269+
*
270+
* This path is intentionally lighter than @ref post:
271+
* - no active task tracking
272+
* - no timeout measurement
273+
* - no extra wrapper bookkeeping beyond submission counters
274+
*
275+
* It is designed for short-lived request handlers that complete quickly
276+
* and do not require generic executor accounting.
277+
*
278+
* @param fn Runtime task function.
279+
* @param affinity Optional worker affinity hint.
280+
*
281+
* @return true if the task was accepted, false otherwise.
282+
*/
283+
[[nodiscard]] bool post_http_fast(vix::runtime::TaskFn fn,
284+
std::uint32_t affinity = 0)
285+
{
286+
if (!fn)
287+
{
288+
state_->rejected.fetch_add(1, std::memory_order_relaxed);
289+
return false;
290+
}
291+
292+
if (!accepting())
293+
{
294+
state_->rejected.fetch_add(1, std::memory_order_relaxed);
295+
return false;
296+
}
297+
298+
const bool accepted = runtime_->submit(std::move(fn), affinity);
299+
if (accepted)
300+
{
301+
state_->submitted.fetch_add(1, std::memory_order_relaxed);
302+
state_->fastSubmitted.fetch_add(1, std::memory_order_relaxed);
303+
}
304+
else
305+
{
306+
state_->rejected.fetch_add(1, std::memory_order_relaxed);
307+
}
308+
309+
return accepted;
310+
}
311+
312+
/**
313+
* @brief Submit a simple HTTP task from a void callable.
314+
*
315+
* This is a convenience overload for short handlers that do not need to
316+
* return a runtime result manually.
317+
*
318+
* @param fn Void task function.
319+
* @param affinity Optional worker affinity hint.
320+
*
321+
* @return true if the task was accepted, false otherwise.
322+
*/
323+
[[nodiscard]] bool post_http_fast(std::function<void()> fn,
324+
std::uint32_t affinity = 0)
325+
{
326+
if (!fn)
221327
{
328+
state_->rejected.fetch_add(1, std::memory_order_relaxed);
222329
return false;
223330
}
224331

225-
return runtime_->submit(std::move(fn), affinity);
332+
vix::runtime::TaskFn runtimeFn =
333+
[task = std::move(fn)]() mutable -> vix::runtime::TaskResult
334+
{
335+
task();
336+
return vix::runtime::TaskResult::complete;
337+
};
338+
339+
return post_http_fast(std::move(runtimeFn), affinity);
226340
}
227341

228342
/**
@@ -245,17 +359,19 @@ namespace vix::executor
245359
{
246360
if (!fn)
247361
{
362+
state_->rejected.fetch_add(1, std::memory_order_relaxed);
248363
return false;
249364
}
250365

251-
if (!state_->accepting.load(std::memory_order_acquire))
366+
if (!accepting())
252367
{
368+
state_->rejected.fetch_add(1, std::memory_order_relaxed);
253369
return false;
254370
}
255371

256372
const auto shared = state_;
257373

258-
return runtime_->submit(
374+
const bool accepted = runtime_->submit(
259375
[shared,
260376
task = std::move(fn),
261377
options = opt]() mutable -> vix::runtime::TaskResult
@@ -278,7 +394,7 @@ namespace vix::executor
278394

279395
if (elapsed > options.timeout)
280396
{
281-
shared->timed_out.fetch_add(1, std::memory_order_relaxed);
397+
shared->timedOut.fetch_add(1, std::memory_order_relaxed);
282398
}
283399
}
284400

@@ -296,13 +412,25 @@ namespace vix::executor
296412

297413
if (elapsed > options.timeout)
298414
{
299-
shared->timed_out.fetch_add(1, std::memory_order_relaxed);
415+
shared->timedOut.fetch_add(1, std::memory_order_relaxed);
300416
}
301417
}
302418

419+
shared->failed.fetch_add(1, std::memory_order_relaxed);
303420
return vix::runtime::TaskResult::failed;
304421
}
305422
});
423+
424+
if (accepted)
425+
{
426+
state_->submitted.fetch_add(1, std::memory_order_relaxed);
427+
}
428+
else
429+
{
430+
state_->rejected.fetch_add(1, std::memory_order_relaxed);
431+
}
432+
433+
return accepted;
306434
}
307435

308436
/**
@@ -315,7 +443,7 @@ namespace vix::executor
315443
vix::executor::Metrics m;
316444
m.pending = static_cast<std::uint64_t>(runtime_->size());
317445
m.active = state_->active.load(std::memory_order_relaxed);
318-
m.timed_out = state_->timed_out.load(std::memory_order_relaxed);
446+
m.timed_out = state_->timedOut.load(std::memory_order_relaxed);
319447
return m;
320448
}
321449

@@ -338,6 +466,46 @@ namespace vix::executor
338466
}
339467
}
340468

469+
/**
470+
* @brief Return the number of submitted tasks.
471+
*
472+
* @return Submitted task count.
473+
*/
474+
[[nodiscard]] std::uint64_t submitted_tasks() const noexcept
475+
{
476+
return state_->submitted.load(std::memory_order_relaxed);
477+
}
478+
479+
/**
480+
* @brief Return the number of rejected tasks.
481+
*
482+
* @return Rejected task count.
483+
*/
484+
[[nodiscard]] std::uint64_t rejected_tasks() const noexcept
485+
{
486+
return state_->rejected.load(std::memory_order_relaxed);
487+
}
488+
489+
/**
490+
* @brief Return the number of fast-path HTTP submissions.
491+
*
492+
* @return Fast-path HTTP task count.
493+
*/
494+
[[nodiscard]] std::uint64_t fast_http_submitted_tasks() const noexcept
495+
{
496+
return state_->fastSubmitted.load(std::memory_order_relaxed);
497+
}
498+
499+
/**
500+
* @brief Return the number of failed posted tasks.
501+
*
502+
* @return Failed task count observed by post().
503+
*/
504+
[[nodiscard]] std::uint64_t failed_tasks() const noexcept
505+
{
506+
return state_->failed.load(std::memory_order_relaxed);
507+
}
508+
341509
/**
342510
* @brief Return the underlying runtime.
343511
*
@@ -368,7 +536,11 @@ namespace vix::executor
368536
struct SharedState
369537
{
370538
std::atomic<std::uint64_t> active{0};
371-
std::atomic<std::uint64_t> timed_out{0};
539+
std::atomic<std::uint64_t> timedOut{0};
540+
std::atomic<std::uint64_t> submitted{0};
541+
std::atomic<std::uint64_t> rejected{0};
542+
std::atomic<std::uint64_t> fastSubmitted{0};
543+
std::atomic<std::uint64_t> failed{0};
372544
std::atomic<bool> accepting{false};
373545
};
374546

@@ -381,10 +553,10 @@ namespace vix::executor
381553
*/
382554
[[nodiscard]] static vix::runtime::RuntimeConfig make_config_from_workers(std::uint32_t workers)
383555
{
384-
const std::uint32_t normalized_workers = std::max<std::uint32_t>(1u, workers);
556+
const std::uint32_t normalizedWorkers = std::max<std::uint32_t>(1u, workers);
385557

386558
return vix::runtime::RuntimeConfig{
387-
normalized_workers,
559+
normalizedWorkers,
388560
vix::runtime::BudgetConfig{16}};
389561
}
390562

0 commit comments

Comments
 (0)