Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 8 additions & 14 deletions examples/espressif/esp/src/lwip/exports.zig
Original file line number Diff line number Diff line change
Expand Up @@ -106,16 +106,13 @@ export fn sys_mbox_set_invalid(ptr: *c.sys_mbox_t) void {

export fn sys_mbox_post(ptr: *c.sys_mbox_t, element: MailboxElement) void {
const mailbox: *Mailbox = @ptrCast(@alignCast(ptr.*));
mailbox.put_one(element, null) catch unreachable;
mailbox.put_one(element, .never) catch unreachable;
}

export fn sys_mbox_trypost(ptr: *c.sys_mbox_t, element: MailboxElement) c.err_t {
const mailbox: *Mailbox = @ptrCast(@alignCast(ptr.*));
if (mailbox.put_one_non_blocking(element)) {
return c.ERR_OK;
} else {
return c.ERR_MEM;
}
mailbox.put_one(element, .non_blocking) catch return c.ERR_MEM;
return c.ERR_OK;
}

comptime {
Expand All @@ -125,7 +122,7 @@ comptime {
export fn sys_arch_mbox_fetch(ptr: *c.sys_mbox_t, element_ptr: *MailboxElement, timeout: u32) u32 {
const mailbox: *Mailbox = @ptrCast(@alignCast(ptr.*));
const now = esp.time.get_time_since_boot();
element_ptr.* = mailbox.get_one(if (timeout != 0) .from_ms(timeout) else null) catch {
element_ptr.* = mailbox.get_one(if (timeout != 0) .{ .after = .from_ms(timeout) } else .never) catch {
return c.SYS_ARCH_TIMEOUT;
};
// returns waiting time in ms
Expand All @@ -134,12 +131,9 @@ export fn sys_arch_mbox_fetch(ptr: *c.sys_mbox_t, element_ptr: *MailboxElement,

export fn sys_arch_mbox_tryfetch(ptr: *c.sys_mbox_t, element_ptr: *MailboxElement) u32 {
const mailbox: *Mailbox = @ptrCast(@alignCast(ptr.*));
if (mailbox.get_one_non_blocking()) |element| {
element_ptr.* = element;
return 0;
} else {
return c.SYS_MBOX_EMPTY;
}
const element = mailbox.get_one(.non_blocking) catch return c.SYS_MBOX_EMPTY;
element_ptr.* = element;
return 0;
}

export fn sys_sem_new(ptr: *c.sys_sem_t, count: u8) c.err_t {
Expand All @@ -165,7 +159,7 @@ export fn sys_sem_signal(ptr: *c.sys_sem_t) void {
export fn sys_arch_sem_wait(ptr: *c.sys_sem_t, timeout: u32) u32 {
const sem: *rtos.Semaphore = @ptrCast(@alignCast(ptr.*));
const now = esp.time.get_time_since_boot();
sem.take_with_timeout(if (timeout != 0) .from_ms(timeout) else null) catch {
sem.take_with_timeout(if (timeout != 0) .{ .after = .from_ms(timeout) } else .never) catch {
return c.SYS_ARCH_TIMEOUT;
};
// returns waiting time in ms
Expand Down
4 changes: 2 additions & 2 deletions examples/espressif/esp/src/rtos.zig
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub const microzig_options: microzig.Options = .{

fn task1(queue: *rtos.Queue(u32)) void {
for (0..5) |i| {
queue.put_one(i, null) catch unreachable;
queue.put_one(i, .never) catch unreachable;
rtos.sleep(.from_ms(500));
}
}
Expand All @@ -44,7 +44,7 @@ pub fn main() !void {
defer rtos.wait_and_free(gpa, task);

while (true) {
const item = try queue.get_one(.from_ms(1000));
const item = try queue.get_one(.{ .after = .from_ms(1000) });
std.log.info("got item: {}", .{item});
}
}
7 changes: 4 additions & 3 deletions examples/espressif/esp/src/tcp_server.zig
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ const SERVER_PORT = 3333;

var maybe_netif: ?*lwip.c.netif = null;

var ip_ready_semaphore: rtos.Semaphore = .init(0, 1);
var ip_ready_signal: rtos.Signal(void) = .{};
var ip: lwip.c.ip_addr_t = undefined;

extern fn netconn_new_with_proto_and_callback(t: lwip.c.enum_netconn_type, proto: lwip.c.u8_t, callback: ?*const anyopaque) [*c]lwip.c.struct_netconn;
Expand Down Expand Up @@ -103,9 +103,10 @@ pub fn main() !void {
}

try radio.wifi.connect_blocking();

try lwip.c_err(lwip.c.netifapi_netif_common(&netif, lwip.c.netif_set_link_up, null));

ip_ready_semaphore.take();
ip_ready_signal.wait();
std.log.info("Listening on {f}:{}", .{ IP_Formatter.init(netif.ip_addr), SERVER_PORT });

const server_conn = netconn_new_with_proto_and_callback(lwip.c.NETCONN_TCP, 0, null) orelse {
Expand Down Expand Up @@ -201,7 +202,7 @@ fn netif_status_callback(netif_ptr: [*c]lwip.c.netif) callconv(.c) void {
const netif = &netif_ptr[0];
if (netif.ip_addr.u_addr.ip4.addr != 0) {
ip = netif.ip_addr;
ip_ready_semaphore.give();
ip_ready_signal.put({});
}
}

Expand Down
59 changes: 29 additions & 30 deletions port/espressif/esp/src/hal/radio/osi.zig
Original file line number Diff line number Diff line change
Expand Up @@ -319,11 +319,12 @@ pub fn semphr_take(ptr: ?*anyopaque, tick: u32) callconv(.c) i32 {
log.debug("semphr_take {?} {}", .{ ptr, tick });

const sem: *rtos.Semaphore = @ptrCast(@alignCast(ptr));
const maybe_timeout: ?rtos.Duration = if (tick == c.OSI_FUNCS_TIME_BLOCKING)
.from_ticks(tick)
else
null;
sem.take_with_timeout(maybe_timeout) catch {
const timeout: rtos.Timeout = switch (tick) {
0 => .non_blocking,
c.OSI_FUNCS_TIME_BLOCKING => .never,
else => |ticks| .{ .after = .from_ticks(ticks) },
};
sem.take_with_timeout(timeout) catch {
log.debug(">>>> return from semaphore take with timeout: {*}", .{sem});
return 1;
};
Expand Down Expand Up @@ -368,10 +369,11 @@ const RecursiveMutex = struct {
if (@intFromEnum(current_task.priority) > @intFromEnum(owning_task.priority)) {
mutex.prev_priority = owning_task.priority;
owning_task.priority = current_task.priority;
rtos.make_ready(owning_task);
var _hptw = false;
rtos.make_ready(owning_task, &_hptw);
}

mutex.wait_queue.wait(current_task, null);
mutex.wait_queue.wait(null);
}

assert(mutex.value == 0);
Expand All @@ -395,8 +397,13 @@ const RecursiveMutex = struct {
owning_task.priority = prev_priority;
mutex.prev_priority = null;
}

mutex.owning_task = null;
mutex.wait_queue.wake_one();

var hptw = false;
mutex.wait_queue.wake_one(&hptw);
if (hptw) rtos.yield_from_cs(.reschedule);

return true;
} else {
return false;
Expand Down Expand Up @@ -505,17 +512,12 @@ pub fn queue_send(ptr: ?*anyopaque, item_ptr: ?*anyopaque, block_time_tick: u32)
const queue: *QueueWrapper = @ptrCast(@alignCast(ptr));
const item: [*]const u8 = @ptrCast(@alignCast(item_ptr));

const size = switch (block_time_tick) {
0 => queue.inner.put_non_blocking(item[0..queue.item_len]),
else => queue.inner.put(
item[0..queue.item_len],
1,
if (block_time_tick != c.OSI_FUNCS_TIME_BLOCKING)
.from_ticks(block_time_tick)
else
null,
),
const timeout: rtos.Timeout = switch (block_time_tick) {
0 => .non_blocking,
c.OSI_FUNCS_TIME_BLOCKING => .never,
else => |ticks| .{ .after = .from_ticks(ticks) },
};
const size = queue.inner.put(item[0..queue.item_len], 1, timeout);
if (size == 0) return -1;
return 1;
}
Expand All @@ -525,9 +527,11 @@ pub fn queue_send_from_isr(ptr: ?*anyopaque, item_ptr: ?*anyopaque, _hptw: ?*any

const queue: *QueueWrapper = @ptrCast(@alignCast(ptr));
const item: [*]const u8 = @ptrCast(@alignCast(item_ptr));
const n = @divExact(queue.inner.put_non_blocking(item[0..queue.item_len]), queue.item_len);

@as(*u32, @ptrCast(@alignCast(_hptw))).* = @intFromBool(rtos.is_a_higher_priority_task_ready());
var hptw = false;
const n = @divExact(queue.inner.put_from_isr(item[0..queue.item_len], &hptw), queue.item_len);

@as(*u32, @ptrCast(@alignCast(_hptw))).* |= @intFromBool(hptw);

return @intCast(n);
}
Expand All @@ -546,17 +550,12 @@ pub fn queue_recv(ptr: ?*anyopaque, item_ptr: ?*anyopaque, block_time_tick: u32)
const queue: *QueueWrapper = @ptrCast(@alignCast(ptr));
const item: [*]u8 = @ptrCast(@alignCast(item_ptr));

const size = switch (block_time_tick) {
0 => queue.inner.get_non_blocking(item[0..queue.item_len]),
else => queue.inner.get(
item[0..queue.item_len],
queue.item_len,
if (block_time_tick != c.OSI_FUNCS_TIME_BLOCKING)
.from_ticks(block_time_tick)
else
null,
),
const timeout: rtos.Timeout = switch (block_time_tick) {
0 => .non_blocking,
c.OSI_FUNCS_TIME_BLOCKING => .never,
else => |ticks| .{ .after = .from_ticks(ticks) },
};
const size = queue.inner.get(item[0..queue.item_len], 1, timeout);
if (size == 0) return -1;
return 1;
}
Expand Down
8 changes: 4 additions & 4 deletions port/espressif/esp/src/hal/radio/timer.zig
Original file line number Diff line number Diff line change
Expand Up @@ -150,16 +150,16 @@ fn task_fn() void {
callback(arg);
}

const sleep_duration: ?rtos.Duration = blk: {
const timeout: rtos.Timeout = blk: {
mutex.lock();
defer mutex.unlock();
break :blk if (find_next_wake_absolute()) |next_wake_absolute|
.from_us(@truncate(next_wake_absolute.diff(now).to_us()))
.{ .after = .from_us(@truncate(next_wake_absolute.diff(now).to_us())) }
else
null;
.never;
};

reload_semaphore.take_with_timeout(sleep_duration) catch {};
reload_semaphore.take_with_timeout(timeout) catch {};
}
}

Expand Down
92 changes: 36 additions & 56 deletions port/espressif/esp/src/hal/radio/wifi.zig
Original file line number Diff line number Diff line change
Expand Up @@ -448,21 +448,27 @@ pub fn disconnect() InternalError!void {

pub fn start_blocking() InternalError!void {
const mode = try get_mode();
var events: EventSet = .initEmpty();
if (mode.is_sta()) events.setPresent(.StaStart, true);
if (mode.is_ap()) events.setPresent(.ApStart, true);
clear_events(events);

var wait_set: EventGroup.Set = .initEmpty();
if (mode.is_sta()) wait_set.setPresent(.StaStart, true);
if (mode.is_ap()) wait_set.setPresent(.ApStart, true);

events.clear(wait_set);

try start();
wait_for_all_events(events);

_ = events.wait(wait_set, .{ .wait_for_all = true });
}

pub const ConnectError = error{FailedToConnect} || InternalError;

pub fn connect_blocking() ConnectError!void {
const events: EventSet = .initMany(&.{ .StaConnected, .StaDisconnected });
clear_events(events);
const wait_set: EventGroup.Set = comptime .initMany(&.{ .StaConnected, .StaDisconnected });
events.clear(wait_set);

try connect();
if (wait_for_any_event(events).contains(.StaDisconnected))

if (events.wait(wait_set, .{}).contains(.StaDisconnected))
return error.FailedToConnect;
}

Expand Down Expand Up @@ -678,61 +684,19 @@ pub const Event = union(EventType) {
StaNeighborRep: c.wifi_event_neighbor_report_t,
};

pub const EventSet = std.EnumSet(EventType);

var event_mutex: rtos.Mutex = .{};
var event_condition: rtos.Condition = .{};
var active_events: EventSet = .{};
pub const EventGroup = rtos.EventGroup(EventType);

pub fn wait_for_any_event(events: EventSet) EventSet {
event_mutex.lock();
defer event_mutex.unlock();

while (true) {
const intersection = active_events.intersectWith(events);
if (intersection.count() > 0) return intersection;
event_condition.wait(&event_mutex);
}
}

pub fn wait_for_all_events(events: EventSet) void {
event_mutex.lock();
defer event_mutex.unlock();

while (!active_events.supersetOf(events)) {
event_condition.wait(&event_mutex);
}
}

pub fn wait_for_event(event: EventType) void {
event_mutex.lock();
defer event_mutex.unlock();

while (!active_events.contains(event)) {
event_condition.wait(&event_mutex);
}
}

pub fn clear_events(events: EventSet) void {
event_mutex.lock();
defer event_mutex.unlock();

active_events = active_events.differenceWith(events);
}
pub var events: EventGroup = .{};

/// Internal function. Called by osi layer.
pub fn on_event_post(id: i32, data: ?*anyopaque, data_size: usize) void {
const event_type: EventType = @enumFromInt(id);
log.debug("event received: {t}", .{event_type});

update_sta_state(event_type);
update_ap_state(event_type);

{
event_mutex.lock();
defer event_mutex.unlock();
active_events.setPresent(event_type, true);
}
event_condition.broadcast();
events.set(.initOne(event_type));

const event = switch (event_type) {
inline else => |tag| blk: {
Expand Down Expand Up @@ -768,19 +732,35 @@ pub const StaState = enum(u32) {
}
};

pub const ApState = enum(u32) {
none,
started,
stopped,
};

var sta_state: std.atomic.Value(StaState) = .init(.none);
var ap_state: std.atomic.Value(ApState) = .init(.none);

fn update_sta_state(event: EventType) void {
const new_sta_state: StaState = switch (event) {
.StaStart => .started,
.StaConnected => .connected,
.StaDisconnected => .disconnected,
.StaConnected, .ApStaconnected => .connected,
.StaDisconnected, .ApStadisconnected => .disconnected,
.StaStop => .stopped,
else => return,
};
sta_state.store(new_sta_state, .monotonic);
}

fn update_ap_state(event: EventType) void {
const new_ap_state: ApState = switch (event) {
.ApStart => .started,
.StaStop => .stopped,
else => return,
};
ap_state.store(new_ap_state, .monotonic);
}

pub fn get_sta_state() StaState {
return sta_state.load(.monotonic);
}
Expand Down
Loading
Loading