From 6534e4f3222e477ed3f06090f558378bf1b3e5ed Mon Sep 17 00:00:00 2001 From: Tudor Andrei Dicu Date: Thu, 12 Feb 2026 09:52:36 +0200 Subject: [PATCH 1/3] Sync primitives timeout refactor and high prio task yield --- examples/espressif/esp/src/lwip/exports.zig | 22 +- examples/espressif/esp/src/rtos.zig | 4 +- port/espressif/esp/src/hal/radio/osi.zig | 59 ++-- port/espressif/esp/src/hal/radio/timer.zig | 8 +- port/espressif/esp/src/hal/rtos.zig | 369 ++++++++++++-------- 5 files changed, 260 insertions(+), 202 deletions(-) diff --git a/examples/espressif/esp/src/lwip/exports.zig b/examples/espressif/esp/src/lwip/exports.zig index 4e4a88e7b..0eed8d1c2 100644 --- a/examples/espressif/esp/src/lwip/exports.zig +++ b/examples/espressif/esp/src/lwip/exports.zig @@ -106,16 +106,13 @@ export fn sys_mbox_set_invalid(ptr: *c.sys_mbox_t) void { export fn sys_mbox_post(ptr: *c.sys_mbox_t, element: MailboxElement) void { const mailbox: *Mailbox = @ptrCast(@alignCast(ptr.*)); - mailbox.put_one(element, null) catch unreachable; + mailbox.put_one(element, .never) catch unreachable; } export fn sys_mbox_trypost(ptr: *c.sys_mbox_t, element: MailboxElement) c.err_t { const mailbox: *Mailbox = @ptrCast(@alignCast(ptr.*)); - if (mailbox.put_one_non_blocking(element)) { - return c.ERR_OK; - } else { - return c.ERR_MEM; - } + mailbox.put_one(element, .non_blocking) catch return c.ERR_MEM; + return c.ERR_OK; } comptime { @@ -125,7 +122,7 @@ comptime { export fn sys_arch_mbox_fetch(ptr: *c.sys_mbox_t, element_ptr: *MailboxElement, timeout: u32) u32 { const mailbox: *Mailbox = @ptrCast(@alignCast(ptr.*)); const now = esp.time.get_time_since_boot(); - element_ptr.* = mailbox.get_one(if (timeout != 0) .from_ms(timeout) else null) catch { + element_ptr.* = mailbox.get_one(if (timeout != 0) .{ .after = .from_ms(timeout) } else .never) catch { return c.SYS_ARCH_TIMEOUT; }; // returns waiting time in ms @@ -134,12 +131,9 @@ export fn sys_arch_mbox_fetch(ptr: *c.sys_mbox_t, element_ptr: *MailboxElement, export fn sys_arch_mbox_tryfetch(ptr: *c.sys_mbox_t, element_ptr: *MailboxElement) u32 { const mailbox: *Mailbox = @ptrCast(@alignCast(ptr.*)); - if (mailbox.get_one_non_blocking()) |element| { - element_ptr.* = element; - return 0; - } else { - return c.SYS_MBOX_EMPTY; - } + const element = mailbox.get_one(.non_blocking) catch return c.SYS_MBOX_EMPTY; + element_ptr.* = element; + return 0; } export fn sys_sem_new(ptr: *c.sys_sem_t, count: u8) c.err_t { @@ -165,7 +159,7 @@ export fn sys_sem_signal(ptr: *c.sys_sem_t) void { export fn sys_arch_sem_wait(ptr: *c.sys_sem_t, timeout: u32) u32 { const sem: *rtos.Semaphore = @ptrCast(@alignCast(ptr.*)); const now = esp.time.get_time_since_boot(); - sem.take_with_timeout(if (timeout != 0) .from_ms(timeout) else null) catch { + sem.take_with_timeout(if (timeout != 0) .{ .after = .from_ms(timeout) } else .never) catch { return c.SYS_ARCH_TIMEOUT; }; // returns waiting time in ms diff --git a/examples/espressif/esp/src/rtos.zig b/examples/espressif/esp/src/rtos.zig index 8e065010f..eb1a816c3 100644 --- a/examples/espressif/esp/src/rtos.zig +++ b/examples/espressif/esp/src/rtos.zig @@ -26,7 +26,7 @@ pub const microzig_options: microzig.Options = .{ fn task1(queue: *rtos.Queue(u32)) void { for (0..5) |i| { - queue.put_one(i, null) catch unreachable; + queue.put_one(i, .never) catch unreachable; rtos.sleep(.from_ms(500)); } } @@ -44,7 +44,7 @@ pub fn main() !void { defer rtos.wait_and_free(gpa, task); while (true) { - const item = try queue.get_one(.from_ms(1000)); + const item = try queue.get_one(.{ .after = .from_ms(1000) }); std.log.info("got item: {}", .{item}); } } diff --git a/port/espressif/esp/src/hal/radio/osi.zig b/port/espressif/esp/src/hal/radio/osi.zig index 61907eda3..0bbc9dc14 100644 --- a/port/espressif/esp/src/hal/radio/osi.zig +++ b/port/espressif/esp/src/hal/radio/osi.zig @@ -319,11 +319,12 @@ pub fn semphr_take(ptr: ?*anyopaque, tick: u32) callconv(.c) i32 { log.debug("semphr_take {?} {}", .{ ptr, tick }); const sem: *rtos.Semaphore = @ptrCast(@alignCast(ptr)); - const maybe_timeout: ?rtos.Duration = if (tick == c.OSI_FUNCS_TIME_BLOCKING) - .from_ticks(tick) - else - null; - sem.take_with_timeout(maybe_timeout) catch { + const timeout: rtos.Timeout = switch (tick) { + 0 => .non_blocking, + c.OSI_FUNCS_TIME_BLOCKING => .never, + else => |ticks| .{ .after = .from_ticks(ticks) }, + }; + sem.take_with_timeout(timeout) catch { log.debug(">>>> return from semaphore take with timeout: {*}", .{sem}); return 1; }; @@ -368,10 +369,11 @@ const RecursiveMutex = struct { if (@intFromEnum(current_task.priority) > @intFromEnum(owning_task.priority)) { mutex.prev_priority = owning_task.priority; owning_task.priority = current_task.priority; - rtos.make_ready(owning_task); + var _hptw = false; + rtos.make_ready(owning_task, &_hptw); } - mutex.wait_queue.wait(current_task, null); + mutex.wait_queue.wait(null); } assert(mutex.value == 0); @@ -395,8 +397,13 @@ const RecursiveMutex = struct { owning_task.priority = prev_priority; mutex.prev_priority = null; } + mutex.owning_task = null; - mutex.wait_queue.wake_one(); + + var hptw = false; + mutex.wait_queue.wake_one(&hptw); + if (hptw) rtos.yield_from_cs(.reschedule); + return true; } else { return false; @@ -505,17 +512,12 @@ pub fn queue_send(ptr: ?*anyopaque, item_ptr: ?*anyopaque, block_time_tick: u32) const queue: *QueueWrapper = @ptrCast(@alignCast(ptr)); const item: [*]const u8 = @ptrCast(@alignCast(item_ptr)); - const size = switch (block_time_tick) { - 0 => queue.inner.put_non_blocking(item[0..queue.item_len]), - else => queue.inner.put( - item[0..queue.item_len], - 1, - if (block_time_tick != c.OSI_FUNCS_TIME_BLOCKING) - .from_ticks(block_time_tick) - else - null, - ), + const timeout: rtos.Timeout = switch (block_time_tick) { + 0 => .non_blocking, + c.OSI_FUNCS_TIME_BLOCKING => .never, + else => |ticks| .{ .after = .from_ticks(ticks) }, }; + const size = queue.inner.put(item[0..queue.item_len], 1, timeout); if (size == 0) return -1; return 1; } @@ -525,9 +527,11 @@ pub fn queue_send_from_isr(ptr: ?*anyopaque, item_ptr: ?*anyopaque, _hptw: ?*any const queue: *QueueWrapper = @ptrCast(@alignCast(ptr)); const item: [*]const u8 = @ptrCast(@alignCast(item_ptr)); - const n = @divExact(queue.inner.put_non_blocking(item[0..queue.item_len]), queue.item_len); - @as(*u32, @ptrCast(@alignCast(_hptw))).* = @intFromBool(rtos.is_a_higher_priority_task_ready()); + var hptw = false; + const n = @divExact(queue.inner.put_from_isr(item[0..queue.item_len], &hptw), queue.item_len); + + @as(*u32, @ptrCast(@alignCast(_hptw))).* |= @intFromBool(hptw); return @intCast(n); } @@ -546,17 +550,12 @@ pub fn queue_recv(ptr: ?*anyopaque, item_ptr: ?*anyopaque, block_time_tick: u32) const queue: *QueueWrapper = @ptrCast(@alignCast(ptr)); const item: [*]u8 = @ptrCast(@alignCast(item_ptr)); - const size = switch (block_time_tick) { - 0 => queue.inner.get_non_blocking(item[0..queue.item_len]), - else => queue.inner.get( - item[0..queue.item_len], - queue.item_len, - if (block_time_tick != c.OSI_FUNCS_TIME_BLOCKING) - .from_ticks(block_time_tick) - else - null, - ), + const timeout: rtos.Timeout = switch (block_time_tick) { + 0 => .non_blocking, + c.OSI_FUNCS_TIME_BLOCKING => .never, + else => |ticks| .{ .after = .from_ticks(ticks) }, }; + const size = queue.inner.get(item[0..queue.item_len], 1, timeout); if (size == 0) return -1; return 1; } diff --git a/port/espressif/esp/src/hal/radio/timer.zig b/port/espressif/esp/src/hal/radio/timer.zig index 3653923af..f912e7203 100644 --- a/port/espressif/esp/src/hal/radio/timer.zig +++ b/port/espressif/esp/src/hal/radio/timer.zig @@ -150,16 +150,16 @@ fn task_fn() void { callback(arg); } - const sleep_duration: ?rtos.Duration = blk: { + const timeout: rtos.Timeout = blk: { mutex.lock(); defer mutex.unlock(); break :blk if (find_next_wake_absolute()) |next_wake_absolute| - .from_us(@truncate(next_wake_absolute.diff(now).to_us())) + .{ .after = .from_us(@truncate(next_wake_absolute.diff(now).to_us())) } else - null; + .never; }; - reload_semaphore.take_with_timeout(sleep_duration) catch {}; + reload_semaphore.take_with_timeout(timeout) catch {}; } } diff --git a/port/espressif/esp/src/hal/rtos.zig b/port/espressif/esp/src/hal/rtos.zig index b64b53036..3a0752e37 100644 --- a/port/espressif/esp/src/hal/rtos.zig +++ b/port/espressif/esp/src/hal/rtos.zig @@ -20,9 +20,11 @@ const systimer = @import("systimer.zig"); // task that was interrupted by force and vice versa. Because of the forced // yield, tasks are required to have a minimum stack size available at all // times. +// +// `hptw` stands for "high priority task woken". -// TODO: trigger context switch if a higher priority is awaken in sync -// primitives +// TODO: remove Condition +// TODO: reimplement TypeErasedQueue to store item_len // TODO: low power mode where tick interrupt is only triggered when necessary // TODO: investigate tick interrupt assembly and improve generated code // TODO: stack overflow detection @@ -148,7 +150,9 @@ pub fn init() void { if (rtos_options.paint_stack_byte) |paint_byte| { @memset(&idle_stack, paint_byte); } - make_ready(&idle_task); + + var _hptw = false; + make_ready(&idle_task, &_hptw); // unit0 is already enabled as it is used by `hal.time`. if (rtos_options.systimer_unit != .unit0) { @@ -255,7 +259,9 @@ pub fn spawn( const cs = enter_critical_section(); defer cs.leave(); - make_ready(task); + var hptw = false; + make_ready(task, &hptw); + if (hptw) yield_from_cs(.reschedule); return task; } @@ -268,7 +274,7 @@ pub fn wait_and_free(gpa: std.mem.Allocator, task: *Task) void { defer cs.leave(); if (task.state != .exited) { task.awaiter = rtos_state.current_task; - yield(.wait); + yield_from_cs(.wait); } } // alloc_size = stack_end - task @@ -278,7 +284,7 @@ pub fn wait_and_free(gpa: std.mem.Allocator, task: *Task) void { } /// Must execute inside a critical section. -pub fn make_ready(task: *Task) linksection(".ram_text") void { +pub fn make_ready(task: *Task, hptw: *bool) linksection(".ram_text") void { switch (task.state) { .ready, .running, .exited => return, .none, .suspended => {}, @@ -292,6 +298,7 @@ pub fn make_ready(task: *Task) linksection(".ram_text") void { task.state = .ready; rtos_state.ready_queue.put(task); + hptw.* |= @intFromEnum(task.priority) > @intFromEnum(rtos_state.current_task.priority); } pub const YieldAction = union(enum) { @@ -304,13 +311,16 @@ pub const YieldAction = union(enum) { pub inline fn yield(action: YieldAction) void { const cs = enter_critical_section(); defer cs.leave(); + yield_from_cs(action); +} +pub inline fn yield_from_cs(action: YieldAction) void { const current_task, const next_task = yield_inner(action); context_switch(¤t_task.context, &next_task.context); } fn yield_inner(action: YieldAction) linksection(".ram_text") struct { *Task, *Task } { - assert(microzig.cpu.csr.mscratch.read_raw() == 0); + assert(!in_isr()); const current_task = rtos_state.current_task; action: switch (action) { @@ -359,7 +369,8 @@ fn yield_inner(action: YieldAction) linksection(".ram_text") struct { *Task, *Ta current_task.state = .exited; if (current_task.awaiter) |awaiter| { - make_ready(awaiter); + awaiter.state = .ready; + rtos_state.ready_queue.put(awaiter); } }, } @@ -440,13 +451,6 @@ pub fn yield_from_isr() linksection(".ram_text") void { rtos_options.cpu_interrupt.set_pending(true); } -pub fn is_a_higher_priority_task_ready() linksection(".ram_text") bool { - const cs = enter_critical_section(); - defer cs.leave(); - - return @intFromEnum(rtos_state.ready_queue.max_ready_priority() orelse .idle) > @intFromEnum(rtos_state.current_task.priority); -} - pub const tick_interrupt_handler: microzig.cpu.InterruptHandler = .{ .naked = struct { pub fn handler_fn() linksection(".ram_vectors") callconv(.naked) void { @@ -669,6 +673,10 @@ pub fn log_task_info(task: *Task) void { } } +inline fn in_isr() bool { + return microzig.cpu.csr.mscratch.read_raw() != 0; +} + pub const Task = struct { name: ?[]const u8 = null, @@ -838,27 +846,6 @@ pub const Duration = enum(u32) { } }; -/// Must be used only from a critical section. -pub const Timeout = struct { - end_ticks: u64, - - pub fn after(duration: Duration) Timeout { - const current_ticks = (@as(u64, rtos_state.overflow_count) << 32) | rtos_state.current_ticks; - return .{ - .end_ticks = current_ticks + duration.to_ticks(), - }; - } - - pub fn get_remaining_sleep_duration(timeout: Timeout) ?Duration { - const current_ticks = (@as(u64, rtos_state.overflow_count) << 32) | rtos_state.current_ticks; - const remaining = timeout.end_ticks -| current_ticks; - if (remaining == 0) return null; - return .from_ticks(@truncate(remaining)); - } -}; - -pub const TimeoutError = error{Timeout}; - pub const PriorityWaitQueue = struct { list: DoublyLinkedList = .{}, @@ -868,24 +855,27 @@ pub const PriorityWaitQueue = struct { node: LinkedListNode = .{}, }; - /// Must execute inside a critical section. - pub fn wake_one(q: *PriorityWaitQueue) void { + /// Wakes one waiting task. Must execute inside a critical section. + pub fn wake_one(q: *PriorityWaitQueue, hptw: *bool) void { if (q.list.first) |first_node| { const waiter: *Waiter = @alignCast(@fieldParentPtr("node", first_node)); - make_ready(waiter.task); + make_ready(waiter.task, hptw); } } - /// Must execute inside a critical section. - pub fn wake_all(q: *PriorityWaitQueue) void { + /// Wakes all waiting tasks. Must execute inside a critical section. + pub fn wake_all(q: *PriorityWaitQueue, hptw: *bool) void { while (q.list.pop_first()) |current_node| { const current_waiter: *Waiter = @alignCast(@fieldParentPtr("node", current_node)); - make_ready(current_waiter.task); + make_ready(current_waiter.task, hptw); } } - /// Must execute inside a critical section. - pub fn wait(q: *PriorityWaitQueue, task: *Task, maybe_timeout_duration: ?Duration) void { + /// Puts the task to sleep. Must execute inside a critical section. + pub fn wait(q: *PriorityWaitQueue, maybe_timeout_duration: ?Duration) void { + assert(!in_isr()); + + const task = get_current_task(); var waiter: Waiter = .{ .task = task, .priority = task.priority, @@ -903,42 +893,73 @@ pub const PriorityWaitQueue = struct { } if (maybe_timeout_duration) |duration| { - yield(.{ .sleep = duration }); + yield_from_cs(.{ .sleep = duration }); } else { - yield(.wait); + yield_from_cs(.wait); } q.list.remove(&waiter.node); } }; +pub const TimeoutError = error{Timeout}; + +pub const Timeout = union(enum) { + never, + after: Duration, + non_blocking, +}; + +/// Must be used only from a critical section. +pub const ResolvedTimeout = enum(u64) { + non_blocking = std.math.maxInt(u64) - 1, + never = std.math.maxInt(u64), + _, + + pub fn init(timeout: Timeout) ResolvedTimeout { + return switch (timeout) { + .non_blocking => .non_blocking, + .never => .never, + .after => |duration| blk: { + const current_ticks = (@as(u64, rtos_state.overflow_count) << 32) | rtos_state.current_ticks; + break :blk @enumFromInt(current_ticks + duration.to_ticks()); + }, + }; + } + + pub fn tick(resolved_timeout: ResolvedTimeout) TimeoutError!?Duration { + return switch (resolved_timeout) { + .non_blocking => error.Timeout, + .never => null, + else => { + const current_ticks = (@as(u64, rtos_state.overflow_count) << 32) | rtos_state.current_ticks; + const remaining = @intFromEnum(resolved_timeout) -| current_ticks; + if (remaining == 0) return error.Timeout; + return .from_ticks(@truncate(remaining)); + }, + }; + } +}; + pub const Mutex = struct { locked: ?*Task = null, prev_priority: ?Priority = null, wait_queue: PriorityWaitQueue = .{}, pub fn lock(mutex: *Mutex) void { - mutex.lock_with_timeout(null) catch unreachable; + mutex.lock_with_timeout(.never) catch unreachable; } - pub fn lock_with_timeout(mutex: *Mutex, maybe_timeout_after: ?Duration) TimeoutError!void { + pub fn lock_with_timeout(mutex: *Mutex, timeout: Timeout) TimeoutError!void { const cs = enter_critical_section(); defer cs.leave(); const current_task = get_current_task(); - - const maybe_timeout: ?Timeout = if (maybe_timeout_after) |duration| - .after(duration) - else - null; + const resolved_timeout: ResolvedTimeout = .init(timeout); assert(mutex.locked != current_task); - while (mutex.locked) |owning_task| { - const maybe_remaining_duration = if (maybe_timeout) |timeout| - timeout.get_remaining_sleep_duration() orelse return error.Timeout - else - null; + const maybe_remaining_duration = try resolved_timeout.tick(); // Owning task inherits the priority of the current task if it the // current task has a bigger priority. @@ -946,10 +967,12 @@ pub const Mutex = struct { if (mutex.prev_priority == null) mutex.prev_priority = owning_task.priority; owning_task.priority = current_task.priority; - make_ready(owning_task); + + var _hptw = false; + make_ready(owning_task, &_hptw); } - mutex.wait_queue.wait(current_task, maybe_remaining_duration); + mutex.wait_queue.wait(maybe_remaining_duration); } mutex.locked = current_task; @@ -964,6 +987,7 @@ pub const Mutex = struct { fn unlock_impl(mutex: *Mutex) void { const owning_task = mutex.locked.?; + assert(owning_task == get_current_task()); // Restore the priority of the task if (mutex.prev_priority) |prev_priority| { @@ -972,7 +996,10 @@ pub const Mutex = struct { } mutex.locked = null; - mutex.wait_queue.wake_one(); + + var hptw = false; + mutex.wait_queue.wake_one(&hptw); + if (hptw) yield_from_cs(.reschedule); } }; @@ -980,26 +1007,27 @@ pub const Condition = struct { wait_queue: PriorityWaitQueue = .{}, pub fn wait(cond: *Condition, mutex: *Mutex) void { - { - const cs = enter_critical_section(); - defer cs.leave(); - mutex.unlock_impl(); - cond.wait_queue.wait(get_current_task(), null); - } - + const cs = enter_critical_section(); + defer cs.leave(); + mutex.unlock_impl(); + cond.wait_queue.wait(null); mutex.lock(); } pub fn signal(cond: *Condition) void { const cs = enter_critical_section(); defer cs.leave(); - cond.wait_queue.wake_one(); + var hptw = false; + cond.wait_queue.wake_one(&hptw); + if (hptw) yield_from_cs(.reschedule); } pub fn broadcast(cond: *Condition) void { const cs = enter_critical_section(); defer cs.leave(); - cond.wait_queue.wake_all(); + var hptw = false; + cond.wait_queue.wake_all(&hptw); + if (hptw) yield_from_cs(.reschedule); } }; @@ -1019,24 +1047,17 @@ pub const Semaphore = struct { } pub fn take(sem: *Semaphore) void { - sem.take_with_timeout(null) catch unreachable; + sem.take_with_timeout(.never) catch unreachable; } - pub fn take_with_timeout(sem: *Semaphore, maybe_timeout_after: ?Duration) TimeoutError!void { + pub fn take_with_timeout(sem: *Semaphore, timeout: Timeout) TimeoutError!void { const cs = enter_critical_section(); defer cs.leave(); - const maybe_timeout: ?Timeout = if (maybe_timeout_after) |duration| - .after(duration) - else - null; - + const resolved_timeout: ResolvedTimeout = .init(timeout); while (sem.current_value <= 0) { - const maybe_remaining_duration = if (maybe_timeout) |timeout| - timeout.get_remaining_sleep_duration() orelse return error.Timeout - else - null; - sem.wait_queue.wait(rtos_state.current_task, maybe_remaining_duration); + const maybe_remaining_duration = try resolved_timeout.tick(); + sem.wait_queue.wait(maybe_remaining_duration); } sem.current_value -= 1; @@ -1046,31 +1067,40 @@ pub const Semaphore = struct { const cs = enter_critical_section(); defer cs.leave(); + var hptw = false; + sem.give_impl(&hptw); + if (hptw) yield_from_cs(.reschedule); + } + + pub fn give_from_isr(sem: *Semaphore, hptw: *bool) void { + const cs = enter_critical_section(); + defer cs.leave(); + + sem.give_impl(&hptw); + } + + fn give_impl(sem: *Semaphore, hptw: *bool) void { sem.current_value += 1; if (sem.current_value > sem.max_value) { sem.current_value = sem.max_value; } else { - sem.wait_queue.wake_one(); + sem.wait_queue.wake_one(hptw); } } }; pub const TypeErasedQueue = struct { buffer: []u8, - start: usize, - len: usize, + start: usize = 0, + len: usize = 0, - putters: PriorityWaitQueue, - getters: PriorityWaitQueue, + putters: PriorityWaitQueue = .{}, + getters: PriorityWaitQueue = .{}, pub fn init(buffer: []u8) TypeErasedQueue { assert(buffer.len != 0); // buffer len must be greater than 0 return .{ .buffer = buffer, - .start = 0, - .len = 0, - .putters = .{}, - .getters = .{}, }; } @@ -1078,41 +1108,36 @@ pub const TypeErasedQueue = struct { q: *TypeErasedQueue, elements: []const u8, min: usize, - maybe_timeout_after: ?Duration, + timeout: Timeout, ) usize { assert(elements.len >= min); if (elements.len == 0) return 0; - const maybe_timeout: ?Timeout = if (maybe_timeout_after) |duration| - .after(duration) - else - null; - - var n: usize = 0; - const cs = enter_critical_section(); defer cs.leave(); + const resolved_timeout: ResolvedTimeout = .init(timeout); + + var n: usize = 0; while (true) { - n += q.put_non_blocking_from_cs(elements[n..]); + var hptw = false; + n += q.put_non_blocking_from_cs(elements[n..], &hptw); + if (hptw) yield_from_cs(.reschedule); + if (n >= min) return n; - const maybe_remaining_duration = if (maybe_timeout) |timeout| - timeout.get_remaining_sleep_duration() orelse return n - else - null; - q.putters.wait(rtos_state.current_task, maybe_remaining_duration); + const maybe_remaining_duration = resolved_timeout.tick() catch return n; + q.putters.wait(maybe_remaining_duration); } } - pub fn put_non_blocking(q: *TypeErasedQueue, elements: []const u8) usize { + pub fn put_from_isr(q: *TypeErasedQueue, elements: []const u8, hptw: *bool) usize { const cs = enter_critical_section(); defer cs.leave(); - - return q.put_non_blocking_from_cs(elements); + return q.put_non_blocking_from_cs(elements, hptw); } - fn put_non_blocking_from_cs(q: *TypeErasedQueue, elements: []const u8) usize { + fn put_non_blocking_from_cs(q: *TypeErasedQueue, elements: []const u8, hptw: *bool) usize { var n: usize = 0; while (q.puttable_slice()) |slice| { const copy_len = @min(slice.len, elements.len - n); @@ -1122,7 +1147,7 @@ pub const TypeErasedQueue = struct { n += copy_len; if (n == elements.len) break; } - if (n > 0) q.getters.wake_one(); + if (n > 0) q.getters.wake_one(hptw); return n; } @@ -1140,41 +1165,30 @@ pub const TypeErasedQueue = struct { q: *TypeErasedQueue, buffer: []u8, min: usize, - maybe_timeout_after: ?Duration, + timeout: Timeout, ) usize { assert(buffer.len >= min); if (buffer.len == 0) return 0; - const maybe_timeout: ?Timeout = if (maybe_timeout_after) |duration| - .after(duration) - else - null; - - var n: usize = 0; - const cs = enter_critical_section(); defer cs.leave(); + const resolved_timeout: ResolvedTimeout = .init(timeout); + + var n: usize = 0; while (true) { - n += q.get_non_blocking_from_cs(buffer[n..]); + var hptw = false; + n += q.get_non_blocking_from_cs(buffer[n..], &hptw); + if (hptw) yield_from_cs(.reschedule); + if (n >= min) return n; - const maybe_remaining_duration = if (maybe_timeout) |timeout| - timeout.get_remaining_sleep_duration() orelse return n - else - null; - q.getters.wait(rtos_state.current_task, maybe_remaining_duration); + const maybe_remaining_duration = resolved_timeout.tick() catch return n; + q.getters.wait(maybe_remaining_duration); } } - pub fn get_non_blocking(q: *TypeErasedQueue, buffer: []u8) usize { - const cs = enter_critical_section(); - defer cs.leave(); - - return q.get_non_blocking_from_cs(buffer); - } - - fn get_non_blocking_from_cs(q: *TypeErasedQueue, buffer: []u8) usize { + fn get_non_blocking_from_cs(q: *TypeErasedQueue, buffer: []u8, hptw: *bool) usize { var n: usize = 0; while (q.gettable_slice()) |slice| { const copy_len = @min(slice.len, buffer.len - n); @@ -1186,7 +1200,7 @@ pub const TypeErasedQueue = struct { n += copy_len; if (n == buffer.len) break; } - if (n > 0) q.putters.wake_one(); + if (n > 0) q.putters.wake_one(hptw); return n; } @@ -1207,50 +1221,101 @@ pub fn Queue(Elem: type) type { return .{ .type_erased = .init(@ptrCast(buffer)) }; } - pub fn put(q: *Self, elements: []const Elem, min: usize, timeout: ?Duration) usize { + pub fn put(q: *Self, elements: []const Elem, min: usize, timeout: Timeout) usize { return @divExact(q.type_erased.put(@ptrCast(elements), min * @sizeOf(Elem), timeout), @sizeOf(Elem)); } - pub fn put_all(q: *Self, elements: []const Elem, timeout: ?Duration) TimeoutError!void { + pub fn put_from_isr(q: *Self, elements: []const Elem, hptw: *bool) usize { + return @divExact(q.type_erased.put_from_isr(@ptrCast(elements), hptw), @sizeOf(Elem)); + } + + pub fn put_all(q: *Self, elements: []const Elem, timeout: Timeout) TimeoutError!void { if (q.put(elements, elements.len, timeout) != elements.len) return error.Timeout; } - pub fn put_one(q: *Self, item: Elem, timeout: ?Duration) TimeoutError!void { + pub fn put_one(q: *Self, item: Elem, timeout: Timeout) TimeoutError!void { if (q.put(&.{item}, 1, timeout) != 1) return error.Timeout; } - pub fn put_non_blocking(q: *Self, elements: []const Elem) usize { - return @divExact(q.type_erased.put_non_blocking(@ptrCast(elements)), @sizeOf(Elem)); - } - - pub fn put_one_non_blocking(q: *Self, item: Elem) bool { - return q.put_non_blocking(@ptrCast(&item)) == 1; + pub fn put_one_from_isr(q: *Self, item: Elem, hptw: *bool) bool { + return q.put_from_isr(&.{item}, hptw) == 1; } - pub fn get(q: *Self, buffer: []Elem, target: usize, timeout: ?Duration) usize { + pub fn get(q: *Self, buffer: []Elem, target: usize, timeout: Timeout) usize { return @divExact(q.type_erased.get(@ptrCast(buffer), target * @sizeOf(Elem), timeout), @sizeOf(Elem)); } - pub fn get_one(q: *Self, timeout: ?Duration) TimeoutError!Elem { + pub fn get_one(q: *Self, timeout: Timeout) TimeoutError!Elem { var buf: [1]Elem = undefined; if (q.get(&buf, 1, timeout) != 1) return error.Timeout; return buf[0]; } - pub fn get_one_non_blocking(q: *Self) ?Elem { - var buf: [1]Elem = undefined; - if (q.type_erased.get_non_blocking(@ptrCast(&buf)) == 1) { - return buf[0]; - } else { - return null; + pub fn capacity(q: *const Self) usize { + return @divExact(q.type_erased.buffer.len, @sizeOf(Elem)); + } + }; +} + +pub fn Signal(T: type) type { + return struct { + const Self = @This(); + + value: ?T = null, + awaiter: ?*Task = null, + + pub fn put(s: *Self, value: T) void { + const cs = enter_critical_section(); + defer cs.leave(); + + var hptw = false; + s.put_impl(value, &hptw); + if (hptw) yield_from_cs(.reschedule); + } + + pub fn put_from_isr(s: *Self, value: T, hptw: *bool) void { + const cs = enter_critical_section(); + defer cs.leave(); + + s.put_impl(value, hptw); + } + + fn put_impl(s: *Self, value: T, hptw: *bool) void { + s.value = value; + if (s.awaiter) |awaiter| { + make_ready(awaiter, hptw); } } - pub fn capacity(q: *const Self) usize { - return @divExact(q.type_erased.buffer.len, @sizeOf(Elem)); + pub fn wait(s: *Self) T { + return s.wait_with_timeout(.never) catch unreachable; + } + + pub fn wait_with_timeout(s: *Self, timeout: Timeout) TimeoutError!T { + assert(s.awaiter == null); + + const cs = enter_critical_section(); + defer cs.leave(); + + const resolved_timeout: ResolvedTimeout = .init(timeout); + + s.awaiter = get_current_task(); + while (s.value == null) { + const maybe_remaining_duration = try resolved_timeout.tick(); + if (maybe_remaining_duration) |remaining_duration| { + yield_from_cs(.{ .sleep = remaining_duration }); + } else { + yield_from_cs(.wait); + } + } + + const value = s.value.?; + s.value = null; + s.awaiter = null; + return value; } }; } From ba1584c9218765d1b2a57d1da7b63c25b6d66523 Mon Sep 17 00:00:00 2001 From: Tudor Andrei Dicu Date: Sun, 15 Feb 2026 22:56:56 +0200 Subject: [PATCH 2/3] Remove Condition, add EventGroup --- port/espressif/esp/src/hal/radio/wifi.zig | 92 ++++++-------- port/espressif/esp/src/hal/rtos.zig | 140 +++++++++++++++++----- 2 files changed, 147 insertions(+), 85 deletions(-) diff --git a/port/espressif/esp/src/hal/radio/wifi.zig b/port/espressif/esp/src/hal/radio/wifi.zig index ee0d84aba..2c8bd7ed6 100644 --- a/port/espressif/esp/src/hal/radio/wifi.zig +++ b/port/espressif/esp/src/hal/radio/wifi.zig @@ -448,21 +448,27 @@ pub fn disconnect() InternalError!void { pub fn start_blocking() InternalError!void { const mode = try get_mode(); - var events: EventSet = .initEmpty(); - if (mode.is_sta()) events.setPresent(.StaStart, true); - if (mode.is_ap()) events.setPresent(.ApStart, true); - clear_events(events); + + var wait_set: EventGroup.Set = .initEmpty(); + if (mode.is_sta()) wait_set.setPresent(.StaStart, true); + if (mode.is_ap()) wait_set.setPresent(.ApStart, true); + + events.clear(wait_set); + try start(); - wait_for_all_events(events); + + _ = events.wait(wait_set, .{ .wait_for_all = true }); } pub const ConnectError = error{FailedToConnect} || InternalError; pub fn connect_blocking() ConnectError!void { - const events: EventSet = .initMany(&.{ .StaConnected, .StaDisconnected }); - clear_events(events); + const wait_set: EventGroup.Set = comptime .initMany(&.{ .StaConnected, .StaDisconnected }); + events.clear(wait_set); + try connect(); - if (wait_for_any_event(events).contains(.StaDisconnected)) + + if (events.wait(wait_set, .{}).contains(.StaDisconnected)) return error.FailedToConnect; } @@ -678,47 +684,9 @@ pub const Event = union(EventType) { StaNeighborRep: c.wifi_event_neighbor_report_t, }; -pub const EventSet = std.EnumSet(EventType); - -var event_mutex: rtos.Mutex = .{}; -var event_condition: rtos.Condition = .{}; -var active_events: EventSet = .{}; +pub const EventGroup = rtos.EventGroup(EventType); -pub fn wait_for_any_event(events: EventSet) EventSet { - event_mutex.lock(); - defer event_mutex.unlock(); - - while (true) { - const intersection = active_events.intersectWith(events); - if (intersection.count() > 0) return intersection; - event_condition.wait(&event_mutex); - } -} - -pub fn wait_for_all_events(events: EventSet) void { - event_mutex.lock(); - defer event_mutex.unlock(); - - while (!active_events.supersetOf(events)) { - event_condition.wait(&event_mutex); - } -} - -pub fn wait_for_event(event: EventType) void { - event_mutex.lock(); - defer event_mutex.unlock(); - - while (!active_events.contains(event)) { - event_condition.wait(&event_mutex); - } -} - -pub fn clear_events(events: EventSet) void { - event_mutex.lock(); - defer event_mutex.unlock(); - - active_events = active_events.differenceWith(events); -} +pub var events: EventGroup = .{}; /// Internal function. Called by osi layer. pub fn on_event_post(id: i32, data: ?*anyopaque, data_size: usize) void { @@ -726,13 +694,9 @@ pub fn on_event_post(id: i32, data: ?*anyopaque, data_size: usize) void { log.debug("event received: {t}", .{event_type}); update_sta_state(event_type); + update_ap_state(event_type); - { - event_mutex.lock(); - defer event_mutex.unlock(); - active_events.setPresent(event_type, true); - } - event_condition.broadcast(); + events.set(.initOne(event_type)); const event = switch (event_type) { inline else => |tag| blk: { @@ -768,19 +732,35 @@ pub const StaState = enum(u32) { } }; +pub const ApState = enum(u32) { + none, + started, + stopped, +}; + var sta_state: std.atomic.Value(StaState) = .init(.none); +var ap_state: std.atomic.Value(ApState) = .init(.none); fn update_sta_state(event: EventType) void { const new_sta_state: StaState = switch (event) { .StaStart => .started, - .StaConnected => .connected, - .StaDisconnected => .disconnected, + .StaConnected, .ApStaconnected => .connected, + .StaDisconnected, .ApStadisconnected => .disconnected, .StaStop => .stopped, else => return, }; sta_state.store(new_sta_state, .monotonic); } +fn update_ap_state(event: EventType) void { + const new_ap_state: ApState = switch (event) { + .ApStart => .started, + .StaStop => .stopped, + else => return, + }; + ap_state.store(new_ap_state, .monotonic); +} + pub fn get_sta_state() StaState { return sta_state.load(.monotonic); } diff --git a/port/espressif/esp/src/hal/rtos.zig b/port/espressif/esp/src/hal/rtos.zig index 3a0752e37..466a93bb6 100644 --- a/port/espressif/esp/src/hal/rtos.zig +++ b/port/espressif/esp/src/hal/rtos.zig @@ -23,7 +23,6 @@ const systimer = @import("systimer.zig"); // // `hptw` stands for "high priority task woken". -// TODO: remove Condition // TODO: reimplement TypeErasedQueue to store item_len // TODO: low power mode where tick interrupt is only triggered when necessary // TODO: investigate tick interrupt assembly and improve generated code @@ -1003,34 +1002,6 @@ pub const Mutex = struct { } }; -pub const Condition = struct { - wait_queue: PriorityWaitQueue = .{}, - - pub fn wait(cond: *Condition, mutex: *Mutex) void { - const cs = enter_critical_section(); - defer cs.leave(); - mutex.unlock_impl(); - cond.wait_queue.wait(null); - mutex.lock(); - } - - pub fn signal(cond: *Condition) void { - const cs = enter_critical_section(); - defer cs.leave(); - var hptw = false; - cond.wait_queue.wake_one(&hptw); - if (hptw) yield_from_cs(.reschedule); - } - - pub fn broadcast(cond: *Condition) void { - const cs = enter_critical_section(); - defer cs.leave(); - var hptw = false; - cond.wait_queue.wake_all(&hptw); - if (hptw) yield_from_cs(.reschedule); - } -}; - pub const Semaphore = struct { current_value: u32, max_value: u32, @@ -1320,6 +1291,117 @@ pub fn Signal(T: type) type { }; } +pub fn EventGroup(T: type) type { + return struct { + const Self = @This(); + + pub const Set = std.EnumSet(T); + + mutex: Mutex = .{}, + state: Set = .initEmpty(), + listeners: DoublyLinkedList = .{}, + + pub const WaitOptions = packed struct(u2) { + clear_on_exit: bool = false, + wait_for_all: bool = false, + }; + + pub const Listener = struct { + wait_events: Set, + options: WaitOptions, + signal: *Signal(void), + node: LinkedListNode = .{}, + + pub const Flags = packed struct(u8) { + clear_on_exit: bool, + wait_for_all: bool, + }; + }; + + pub fn set(self: *Self, events: Set) void { + self.mutex.lock(); + defer self.mutex.unlock(); + + self.state.setUnion(events); + + var clear_set: Set = .initEmpty(); + + var it = self.listeners.first; + while (it) |node| : (it = node.next) { + const listener: *Listener = @alignCast(@fieldParentPtr("node", node)); + + const satisfied = self.state.intersectWith(listener.wait_events); + + const should_wake = + (listener.options.wait_for_all and satisfied.eql(listener.wait_events)) or + (!listener.options.wait_for_all and !satisfied.eql(.initEmpty())); + + if (should_wake) { + listener.wait_events = self.state; + listener.signal.put({}); + self.listeners.remove(&listener.node); + + if (listener.options.clear_on_exit) { + clear_set.setUnion(listener.wait_events); + } + } + } + + self.state = self.state.differenceWith(clear_set); + } + + pub fn clear(self: *Self, events: Set) void { + self.mutex.lock(); + defer self.mutex.unlock(); + + self.state = self.state.differenceWith(events); + } + + pub fn wait(self: *Self, wait_events: Set, options: WaitOptions) Set { + return self.wait_with_timeout(wait_events, .never, options) catch unreachable; + } + + pub fn wait_with_timeout(self: *Self, wait_events: Set, timeout: Timeout, options: WaitOptions) TimeoutError!Set { + var signal: Signal(void) = .{}; + + var listener: Listener = .{ + .wait_events = wait_events, + .options = .{ + .clear_on_exit = options.clear_on_exit, + .wait_for_all = options.wait_for_all, + }, + .signal = &signal, + }; + + { + self.mutex.lock(); + defer self.mutex.unlock(); + + const satisfied = self.state.intersectWith(wait_events); + if ((options.wait_for_all and satisfied.eql(wait_events)) or + (!options.wait_for_all and !satisfied.eql(.initEmpty()))) + { + if (options.clear_on_exit) { + self.state = self.state.differenceWith(wait_events); + } + return self.state; + } + + self.listeners.append(&listener.node); + } + + signal.wait_with_timeout(timeout) catch |err| { + self.mutex.lock(); + defer self.mutex.unlock(); + self.listeners.remove(&listener.node); + return err; + }; + + return listener.wait_events; + } + }; +} + pub const LinkedListNode = struct { prev: ?*LinkedListNode = null, next: ?*LinkedListNode = null, From 5b0289f600d05e3a7ca7d042f095dc479c29fb39 Mon Sep 17 00:00:00 2001 From: Tudor Andrei Dicu Date: Sun, 15 Feb 2026 23:07:54 +0200 Subject: [PATCH 3/3] RTOS bug fixing and update example to use Signal --- examples/espressif/esp/src/tcp_server.zig | 7 +- port/espressif/esp/src/hal/rtos.zig | 324 ++++++++-------------- 2 files changed, 123 insertions(+), 208 deletions(-) diff --git a/examples/espressif/esp/src/tcp_server.zig b/examples/espressif/esp/src/tcp_server.zig index aa4457164..9666ae754 100644 --- a/examples/espressif/esp/src/tcp_server.zig +++ b/examples/espressif/esp/src/tcp_server.zig @@ -52,7 +52,7 @@ const SERVER_PORT = 3333; var maybe_netif: ?*lwip.c.netif = null; -var ip_ready_semaphore: rtos.Semaphore = .init(0, 1); +var ip_ready_signal: rtos.Signal(void) = .{}; var ip: lwip.c.ip_addr_t = undefined; extern fn netconn_new_with_proto_and_callback(t: lwip.c.enum_netconn_type, proto: lwip.c.u8_t, callback: ?*const anyopaque) [*c]lwip.c.struct_netconn; @@ -103,9 +103,10 @@ pub fn main() !void { } try radio.wifi.connect_blocking(); + try lwip.c_err(lwip.c.netifapi_netif_common(&netif, lwip.c.netif_set_link_up, null)); - ip_ready_semaphore.take(); + ip_ready_signal.wait(); std.log.info("Listening on {f}:{}", .{ IP_Formatter.init(netif.ip_addr), SERVER_PORT }); const server_conn = netconn_new_with_proto_and_callback(lwip.c.NETCONN_TCP, 0, null) orelse { @@ -201,7 +202,7 @@ fn netif_status_callback(netif_ptr: [*c]lwip.c.netif) callconv(.c) void { const netif = &netif_ptr[0]; if (netif.ip_addr.u_addr.ip4.addr != 0) { ip = netif.ip_addr; - ip_ready_semaphore.give(); + ip_ready_signal.put({}); } } diff --git a/port/espressif/esp/src/hal/rtos.zig b/port/espressif/esp/src/hal/rtos.zig index 466a93bb6..2e4f3f938 100644 --- a/port/espressif/esp/src/hal/rtos.zig +++ b/port/espressif/esp/src/hal/rtos.zig @@ -27,7 +27,6 @@ const systimer = @import("systimer.zig"); // TODO: low power mode where tick interrupt is only triggered when necessary // TODO: investigate tick interrupt assembly and improve generated code // TODO: stack overflow detection -// TODO: direct task signaling // TODO: implement std.Io // TODO: use @stackUpperBound when implemented // TODO: support SMP for other esp32 chips with multicore (far future) @@ -300,6 +299,16 @@ pub fn make_ready(task: *Task, hptw: *bool) linksection(".ram_text") void { hptw.* |= @intFromEnum(task.priority) > @intFromEnum(rtos_state.current_task.priority); } +pub fn change_priority(task: *Task, new_priority: Priority) void { + if (task.state == .ready) { + rtos_state.ready_queue.remove(task); + } + task.priority = new_priority; + if (task.state == .ready) { + rtos_state.ready_queue.put(task); + } +} + pub const YieldAction = union(enum) { reschedule, sleep: Duration, @@ -687,7 +696,7 @@ pub const Task = struct { state: State = .none, /// Node used for rtos internal lists. - node: LinkedListNode = .{}, + node: DoublyLinkedList.Node = .{}, /// Ticks for when the task will wake. ticks: u32 = 0, @@ -730,16 +739,27 @@ pub const ReadyTaskConstraint = union(enum) { none, at_least_prio: Priority, more_than_prio: Priority, + + pub fn check(constraint: ReadyTaskConstraint, prio: Priority) bool { + switch (constraint) { + .none => {}, + inline else => |constraint_priority, tag| { + if ((tag == .at_least_prio and @intFromEnum(prio) < @intFromEnum(constraint_priority)) or + (tag == .more_than_prio and @intFromEnum(prio) <= @intFromEnum(constraint_priority))) + { + return false; + } + }, + } + return true; + } }; pub const ReadyPriorityQueue = if (ready_queue_use_buckets) struct { const ReadySet = std.EnumSet(Priority); ready: ReadySet = .initEmpty(), - lists: std.EnumArray(Priority, LinkedList(.{ - .use_last = true, - .use_prev = false, - })) = .initFill(.{}), + lists: std.EnumArray(Priority, DoublyLinkedList) = .initFill(.{}), pub fn max_ready_priority(pq: *ReadyPriorityQueue) ?Priority { const raw_prio = pq.ready.bits.findLastSet() orelse return null; @@ -748,16 +768,8 @@ pub const ReadyPriorityQueue = if (ready_queue_use_buckets) struct { pub fn pop(pq: *ReadyPriorityQueue, constraint: ReadyTaskConstraint) ?*Task { const prio = pq.max_ready_priority() orelse return null; - switch (constraint) { - .none => {}, - inline else => |constraint_priority, tag| { - if ((tag == .at_least_prio and @intFromEnum(prio) < @intFromEnum(constraint_priority)) or - (tag == .more_than_prio and @intFromEnum(prio) <= @intFromEnum(constraint_priority))) - { - return null; - } - }, - } + if (!constraint.check(prio)) + return null; const bucket = pq.lists.getPtr(prio); @@ -777,6 +789,14 @@ pub const ReadyPriorityQueue = if (ready_queue_use_buckets) struct { pq.lists.getPtr(new_task.priority).append(&new_task.node); pq.ready.setPresent(new_task.priority, true); } + + pub fn remove(pq: *ReadyPriorityQueue, task: *Task) void { + const bucket = pq.lists.getPtr(task.priority); + bucket.remove(&task.node); + if (bucket.first == null) { + pq.ready.remove(task.priority); + } + } } else struct { inner: DoublyLinkedList = .{}, @@ -795,13 +815,10 @@ pub const ReadyPriorityQueue = if (ready_queue_use_buckets) struct { null; } - pub fn pop(pq: *ReadyPriorityQueue, maybe_more_than_prio: ?Priority) ?*Task { + pub fn pop(pq: *ReadyPriorityQueue, constraint: ReadyTaskConstraint) ?*Task { if (pq.peek_top()) |task| { - if (maybe_more_than_prio) |more_than_prio| { - if (@intFromEnum(task.priority) <= @intFromEnum(more_than_prio)) { - return null; - } - } + if (!constraint.check(task.priority)) + return null; pq.inner.remove(&task.node); return task; } @@ -820,6 +837,10 @@ pub const ReadyPriorityQueue = if (ready_queue_use_buckets) struct { pq.inner.append(&new_task.node); } } + + pub fn remove(pq: *ReadyPriorityQueue, task: *Task) void { + pq.inner.remove(&task.node); + } }; pub const Duration = enum(u32) { @@ -851,7 +872,7 @@ pub const PriorityWaitQueue = struct { pub const Waiter = struct { task: *Task, priority: Priority, - node: LinkedListNode = .{}, + node: DoublyLinkedList.Node = .{}, }; /// Wakes one waiting task. Must execute inside a critical section. @@ -965,10 +986,7 @@ pub const Mutex = struct { if (@intFromEnum(current_task.priority) > @intFromEnum(owning_task.priority)) { if (mutex.prev_priority == null) mutex.prev_priority = owning_task.priority; - owning_task.priority = current_task.priority; - - var _hptw = false; - make_ready(owning_task, &_hptw); + change_priority(owning_task, current_task.priority); } mutex.wait_queue.wait(maybe_remaining_duration); @@ -990,7 +1008,7 @@ pub const Mutex = struct { // Restore the priority of the task if (mutex.prev_priority) |prev_priority| { - owning_task.priority = prev_priority; + change_priority(owning_task, prev_priority); mutex.prev_priority = null; } @@ -1308,14 +1326,9 @@ pub fn EventGroup(T: type) type { pub const Listener = struct { wait_events: Set, - options: WaitOptions, signal: *Signal(void), - node: LinkedListNode = .{}, - - pub const Flags = packed struct(u8) { - clear_on_exit: bool, - wait_for_all: bool, - }; + options: WaitOptions, + node: DoublyLinkedList.Node = .{}, }; pub fn set(self: *Self, events: Set) void { @@ -1337,13 +1350,12 @@ pub fn EventGroup(T: type) type { (!listener.options.wait_for_all and !satisfied.eql(.initEmpty())); if (should_wake) { - listener.wait_events = self.state; - listener.signal.put({}); - self.listeners.remove(&listener.node); - if (listener.options.clear_on_exit) { clear_set.setUnion(listener.wait_events); } + listener.wait_events = self.state; + self.listeners.remove(&listener.node); + listener.signal.put({}); } } @@ -1366,11 +1378,11 @@ pub fn EventGroup(T: type) type { var listener: Listener = .{ .wait_events = wait_events, + .signal = &signal, .options = .{ .clear_on_exit = options.clear_on_exit, .wait_for_all = options.wait_for_all, }, - .signal = &signal, }; { @@ -1402,191 +1414,93 @@ pub fn EventGroup(T: type) type { }; } -pub const LinkedListNode = struct { - prev: ?*LinkedListNode = null, - next: ?*LinkedListNode = null, -}; - -pub const DoublyLinkedList = LinkedList(.{ - .use_last = true, - .use_prev = true, -}); - -pub const LinkedListCapabilities = struct { - use_last: bool = true, - use_prev: bool = true, -}; - -pub fn LinkedList(comptime caps: LinkedListCapabilities) type { - return struct { - const Self = @This(); +pub const DoublyLinkedList = struct { + first: ?*Node = null, + last: ?*Node = null, - first: ?*LinkedListNode = null, - last: if (caps.use_last) ?*LinkedListNode else noreturn = null, - - pub const append = if (caps.use_last) struct { - fn append(ll: *Self, node: *LinkedListNode) void { - if (caps.use_prev) node.prev = ll.last; - node.next = null; - if (ll.last) |last| { - last.next = node; - ll.last = node; - } else { - ll.first = node; - ll.last = node; - } - } - }.append else @compileError("linked list does not support append"); + pub const Node = struct { + prev: ?*Node = null, + next: ?*Node = null, + }; - pub fn prepend(ll: *Self, node: *LinkedListNode) void { - if (caps.use_prev) { - node.prev = null; - if (ll.first) |first| { - first.prev = node; - } - } - node.next = ll.first; - if (caps.use_last and ll.first == null) { - ll.last = node; - } + pub fn append(ll: *DoublyLinkedList, node: *Node) void { + node.prev = ll.last; + node.next = null; + if (ll.last) |last| { + last.next = node; + ll.last = node; + } else { ll.first = node; + ll.last = node; } + } - pub fn pop_first(ll: *Self) ?*LinkedListNode { - if (ll.first) |first| { - ll.first = first.next; - if (caps.use_last) { - if (ll.last == first) { - ll.last = null; - } - } - if (caps.use_prev) { - if (ll.first) |new_first| { - new_first.prev = null; - } - } - return first; - } else return null; + pub fn prepend(ll: *DoublyLinkedList, node: *Node) void { + node.prev = null; + if (ll.first) |first| { + first.prev = node; + } + node.next = ll.first; + if (ll.first == null) { + ll.last = node; } + ll.first = node; + } - pub const insert_before = if (caps.use_prev) struct { - pub fn insert_before(ll: *Self, existing_node: *LinkedListNode, new_node: *LinkedListNode) void { - new_node.next = existing_node; - if (existing_node.prev) |prev_node| { - // Intermediate node. - new_node.prev = prev_node; - prev_node.next = new_node; - } else { - // First element of the list. - new_node.prev = null; - ll.first = new_node; - } - existing_node.prev = new_node; + pub fn pop_first(ll: *DoublyLinkedList) ?*Node { + if (ll.first) |first| { + ll.first = first.next; + if (ll.last == first) { + ll.last = null; } - }.insert_before else @compileError("linked list does not support insert_before"); - - pub const remove = if (caps.use_prev) struct { - pub fn remove(ll: *Self, node: *LinkedListNode) void { - if (node.prev) |prev_node| { - // Intermediate node. - prev_node.next = node.next; - } else { - // First element of the list. - ll.first = node.next; - } - - if (node.next) |next_node| { - // Intermediate node. - next_node.prev = node.prev; - } else { - // Last element of the list. - if (caps.use_last) ll.last = node.prev; - } + if (ll.first) |new_first| { + new_first.prev = null; } - }.remove else @compileError("linked list does not support remove"); - }; -} - -test "LinkedList.with_last" { - const expect = std.testing.expect; - const TestNode = struct { - data: i32, - node: LinkedListNode = .{}, - }; - - var list: LinkedList(.{ - .use_prev = false, - .use_last = true, - }) = .{}; - - var n1: TestNode = .{ .data = 1 }; - var n2: TestNode = .{ .data = 2 }; - var n3: TestNode = .{ .data = 3 }; - - // 1. Test Append on empty - list.append(&n1.node); - - // State: [1] - try expect(list.first == &n1.node); - try expect(list.last == &n1.node); - try expect(n1.node.next == null); - - // 2. Test Append on existing - list.append(&n2.node); - - // State: [1, 2] - try expect(list.first == &n1.node); - try expect(list.last == &n2.node); - try expect(n1.node.next == &n2.node); - try expect(n2.node.next == null); - - // 3. Test Prepend - list.prepend(&n3.node); - - // State: [3, 1, 2] - try expect(list.first == &n3.node); - try expect(list.last == &n2.node); - try expect(n3.node.next == &n1.node); - - // 4. Test Pop (FIFO if we pop from head) - const p1 = list.pop_first(); - - // State: [1, 2] - try expect(p1 == &n3.node); - try expect(list.first == &n1.node); - - if (p1) |node_ptr| { - const parent: *TestNode = @fieldParentPtr("node", node_ptr); - try expect(parent.data == 3); + return first; + } else return null; } - const p2 = list.pop_first(); - // State: [2] - try expect(p2 == &n1.node); - try expect(list.first == &n2.node); - try expect(list.last == &n2.node); + pub fn insert_before(ll: *DoublyLinkedList, existing_node: *Node, new_node: *Node) void { + new_node.next = existing_node; + if (existing_node.prev) |prev_node| { + // Intermediate node. + new_node.prev = prev_node; + prev_node.next = new_node; + } else { + // First element of the list. + new_node.prev = null; + ll.first = new_node; + } + existing_node.prev = new_node; + } - const p3 = list.pop_first(); - // State: [] - try expect(p3 == &n2.node); - try expect(list.first == null); - try expect(list.last == null); + pub fn remove(ll: *DoublyLinkedList, node: *Node) void { + if (node.prev) |prev_node| { + // Intermediate node. + prev_node.next = node.next; + } else { + // First element of the list. + ll.first = node.next; + } - // 5. Test Pop on empty - try expect(list.pop_first() == null); -} + if (node.next) |next_node| { + // Intermediate node. + next_node.prev = node.prev; + } else { + // Last element of the list. + ll.last = node.prev; + } + } +}; test "LinkedList.doubly_linked" { const expect = std.testing.expect; const TestNode = struct { data: i32, - node: LinkedListNode = .{}, + node: DoublyLinkedList.Node = .{}, }; - var list: LinkedList(.{ - .use_prev = true, - .use_last = true, - }) = .{}; + var list: DoublyLinkedList = .{}; var n1: TestNode = .{ .data = 10 }; var n2: TestNode = .{ .data = 20 };