From d958077203bbf022947e36935def869dcb66be12 Mon Sep 17 00:00:00 2001
From: Jacob Young
Date: Sat, 29 Mar 2025 10:56:45 -0400
Subject: [PATCH] EventLoop: fix futex usage

How silly of me to forget that the kernel doesn't implement its own API.
The scheduling is not great, but at least doesn't deadlock or hammer.
---
 lib/std/Io/EventLoop.zig | 79 +++++++++++++++++++++-------------------
 1 file changed, 41 insertions(+), 38 deletions(-)

diff --git a/lib/std/Io/EventLoop.zig b/lib/std/Io/EventLoop.zig
index a02e1f3b40..7baca5c853 100644
--- a/lib/std/Io/EventLoop.zig
+++ b/lib/std/Io/EventLoop.zig
@@ -11,7 +11,7 @@ gpa: Allocator,
 mutex: std.Thread.Mutex,
 queue: std.DoublyLinkedList(void),
 /// Atomic copy of queue.len
-queue_len: usize,
+queue_len: u32,
 free: std.DoublyLinkedList(void),
 main_fiber: Fiber,
 idle_count: usize,
@@ -20,8 +20,8 @@ exiting: bool,
 
 threadlocal var thread_index: u32 = undefined;
 
-/// Empirically saw 10KB being used by the self-hosted backend for logging.
-const idle_stack_size = 64 * 1024;
+/// Empirically saw >128KB being used by the self-hosted backend to panic.
+const idle_stack_size = 256 * 1024;
 
 const io_uring_entries = 64;
 
@@ -143,6 +143,7 @@ pub fn deinit(el: *EventLoop) void {
     const allocated_ptr: [*]align(@alignOf(Thread)) u8 = @alignCast(@ptrCast(el.threads.items.ptr));
     for (el.threads.items[1..]) |*thread| thread.thread.join();
     el.gpa.free(allocated_ptr[0..idle_stack_end_offset]);
+    el.* = undefined;
 }
 
 fn yield(el: *EventLoop, optional_fiber: ?*Fiber, pending_task: SwitchMessage.PendingTask) void {
@@ -151,8 +152,9 @@ fn yield(el: *EventLoop, optional_fiber: ?*Fiber, pending_task: SwitchMessage.Pe
     const ready_fiber: *Fiber = optional_fiber orelse if (ready_node: {
         el.mutex.lock();
         defer el.mutex.unlock();
+        const expected_queue_len = std.math.lossyCast(u32, el.queue.len);
         const ready_node = el.queue.pop();
-        @atomicStore(usize, &el.queue_len, el.queue.len, .unordered);
+        _ = @cmpxchgStrong(u32, &el.queue_len, expected_queue_len, std.math.lossyCast(u32, el.queue.len), .monotonic, .monotonic);
         break :ready_node ready_node;
     }) |ready_node|
         @alignCast(@fieldParentPtr("queue_node", ready_node))
@@ -172,20 +174,16 @@ fn yield(el: *EventLoop, optional_fiber: ?*Fiber, pending_task: SwitchMessage.Pe
 }
 
 fn schedule(el: *EventLoop, fiber: *Fiber) void {
+    std.log.debug("scheduling {*}", .{fiber});
     if (idle_count: {
         el.mutex.lock();
         defer el.mutex.unlock();
+        const expected_queue_len = std.math.lossyCast(u32, el.queue.len);
         el.queue.append(&fiber.queue_node);
-        @atomicStore(usize, &el.queue_len, el.queue.len, .unordered);
+        _ = @cmpxchgStrong(u32, &el.queue_len, expected_queue_len, std.math.lossyCast(u32, el.queue.len), .monotonic, .monotonic);
         break :idle_count el.idle_count;
     } > 0) {
-        _ = std.os.linux.futex2_wake(&el.queue_len, std.math.maxInt(usize), 1, switch (@bitSizeOf(usize)) {
-            8 => std.os.linux.FUTEX2.SIZE_U8,
-            16 => std.os.linux.FUTEX2.SIZE_U16,
-            32 => std.os.linux.FUTEX2.SIZE_U32,
-            64 => std.os.linux.FUTEX2.SIZE_U64,
-            else => @compileError("unsupported @sizeOf(usize)"),
-        } | std.os.linux.FUTEX2.PRIVATE); // TODO: io_uring
+        _ = std.os.linux.futex2_wake(&el.queue_len, std.math.maxInt(u32), 1, std.os.linux.FUTEX2.SIZE_U32 | std.os.linux.FUTEX2.PRIVATE); // TODO: io_uring
         return;
     }
     if (el.threads.items.len == el.threads.capacity) return;
@@ -226,8 +224,8 @@ fn threadEntry(el: *EventLoop, index: usize) void {
     el.idle();
 }
 
-const UserData = enum(u64) {
-    queue_len_futex_wait,
+const CompletionKey = enum(u64) {
+    queue_len_futex_wait = 1,
     _,
 };
 
@@ -235,33 +233,44 @@ fn idle(el: *EventLoop) void {
     const thread: *Thread = &el.threads.items[thread_index];
     const iou = &thread.io_uring;
     var cqes_buffer: [io_uring_entries]std.os.linux.io_uring_cqe = undefined;
-    var futex_is_scheduled: bool = false;
+    var queue_len_futex_is_scheduled: bool = false;
     while (true) {
         el.yield(null, .nothing);
         if (@atomicLoad(bool, &el.exiting, .acquire)) return;
-        if (!futex_is_scheduled) {
+        if (!queue_len_futex_is_scheduled) {
             const sqe = getSqe(&thread.io_uring);
-            sqe.prep_rw(.FUTEX_WAIT, switch (@bitSizeOf(usize)) {
-                8 => std.os.linux.FUTEX2.SIZE_U8,
-                16 => std.os.linux.FUTEX2.SIZE_U16,
-                32 => std.os.linux.FUTEX2.SIZE_U32,
-                64 => std.os.linux.FUTEX2.SIZE_U64,
-                else => @compileError("unsupported @sizeOf(usize)"),
-            } | std.os.linux.FUTEX2.PRIVATE, @intFromPtr(&el.queue_len), 0, 0);
-            sqe.addr3 = std.math.maxInt(u64);
-            sqe.user_data = @intFromEnum(UserData.queue_len_futex_wait);
-            futex_is_scheduled = true;
+            sqe.prep_rw(.FUTEX_WAIT, std.os.linux.FUTEX2.SIZE_U32 | std.os.linux.FUTEX2.PRIVATE, @intFromPtr(&el.queue_len), 0, 0);
+            sqe.addr3 = std.math.maxInt(u32);
+            sqe.user_data = @intFromEnum(CompletionKey.queue_len_futex_wait);
+            queue_len_futex_is_scheduled = true;
         }
         _ = iou.submit_and_wait(1) catch |err| switch (err) {
-            error.SignalInterrupt => 0,
+            error.SignalInterrupt => std.log.debug("submit_and_wait: SignalInterrupt", .{}),
             else => @panic(@errorName(err)),
         };
         for (cqes_buffer[0 .. iou.copy_cqes(&cqes_buffer, 1) catch |err| switch (err) {
-            error.SignalInterrupt => 0,
+            error.SignalInterrupt => cqes_len: {
+                std.log.debug("copy_cqes: SignalInterrupt", .{});
+                break :cqes_len 0;
+            },
             else => @panic(@errorName(err)),
-        }]) |cqe| switch (@as(UserData, @enumFromInt(cqe.user_data))) {
-            .queue_len_futex_wait => futex_is_scheduled = false,
+        }]) |cqe| switch (@as(CompletionKey, @enumFromInt(cqe.user_data))) {
+            .queue_len_futex_wait => {
+                switch (errno(cqe.res)) {
+                    .SUCCESS, .AGAIN => {},
+                    .INVAL => unreachable,
+                    else => |err| {
+                        std.posix.unexpectedErrno(err) catch {};
+                        @panic("unexpected");
+                    },
+                }
+                std.log.debug("{*} woken up with queue size of {d}", .{
+                    &thread.idle_context,
+                    @atomicLoad(u32, &el.queue_len, .unordered),
+                });
+                queue_len_futex_is_scheduled = false;
+            },
             _ => {
                 const fiber: *Fiber = @ptrFromInt(cqe.user_data);
                 const res: *i32 = @ptrCast(@alignCast(fiber.resultPointer()));
@@ -296,14 +305,8 @@ const SwitchMessage = struct {
             },
             .exit => {
                 @atomicStore(bool, &el.exiting, true, .unordered);
-                @atomicStore(usize, &el.queue_len, std.math.maxInt(usize), .release);
-                _ = std.os.linux.futex2_wake(&el.queue_len, std.math.maxInt(usize), std.math.maxInt(i32), switch (@bitSizeOf(usize)) {
-                    8 => std.os.linux.FUTEX2.SIZE_U8,
-                    16 => std.os.linux.FUTEX2.SIZE_U16,
-                    32 => std.os.linux.FUTEX2.SIZE_U32,
-                    64 => std.os.linux.FUTEX2.SIZE_U64,
-                    else => @compileError("unsupported @sizeOf(usize)"),
-                } | std.os.linux.FUTEX2.PRIVATE); // TODO: use io_uring
+                @atomicStore(u32, &el.queue_len, std.math.maxInt(u32), .release);
+                _ = std.os.linux.futex2_wake(&el.queue_len, std.math.maxInt(u32), std.math.maxInt(i32), std.os.linux.FUTEX2.SIZE_U32 | std.os.linux.FUTEX2.PRIVATE); // TODO: use io_uring
             },
         }
     }
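
Background for the change above: the futex2 syscall interface defines SIZE_U8/U16/U64 flags, but the kernel currently only implements 32-bit futex words, so waiting or waking on a usize-sized queue_len fails; the patch therefore narrows the field to u32 and always passes FUTEX2.SIZE_U32. The wake side reduces to the following sketch; it is illustrative rather than taken from the patch, the standalone queue_len variable and the wakeOneIdleThread name are made up here, and the futex2_wake call simply mirrors the one in schedule.

    const std = @import("std");

    // Illustrative 32-bit futex word, matching the narrowed `queue_len: u32`
    // field in the patch; futex2 only accepts 32-bit words in practice.
    var queue_len: u32 = 0;

    // Wake at most one idle thread waiting on `queue_len`, as `schedule` does.
    fn wakeOneIdleThread() void {
        _ = std.os.linux.futex2_wake(
            &queue_len,
            std.math.maxInt(u32), // bitset mask: match any waiter
            1, // wake at most one waiter
            std.os.linux.FUTEX2.SIZE_U32 | std.os.linux.FUTEX2.PRIVATE,
        );
    }

The wait side goes through io_uring's FUTEX_WAIT opcode in idle with the same SIZE_U32 | PRIVATE flags and an all-ones mask in sqe.addr3; a completion with EAGAIN (the word changed before the wait was armed) is treated the same as a normal wakeup.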