From 266bcfbf2f9fc6fa2c13229f8a00d57e57613c91 Mon Sep 17 00:00:00 2001
From: Andrew Kelley
Date: Mon, 31 Mar 2025 14:36:20 -0700
Subject: [PATCH] EventLoop: implement detached async

data races on deinit tho
---
 lib/std/Io.zig           |  57 ++++++++++---------
 lib/std/Io/EventLoop.zig | 116 ++++++++++++++++++++++++++++++++++-----
 2 files changed, 133 insertions(+), 40 deletions(-)

diff --git a/lib/std/Io.zig b/lib/std/Io.zig
index 8b241add2d..413d31b396 100644
--- a/lib/std/Io.zig
+++ b/lib/std/Io.zig
@@ -626,7 +626,7 @@ pub const VTable = struct {
     /// Thread-safe.
     cancelRequested: *const fn (?*anyopaque) bool,
 
-    mutexLock: *const fn (?*anyopaque, prev_state: Mutex.State, mutex: *Mutex) error{Canceled}!void,
+    mutexLock: *const fn (?*anyopaque, prev_state: Mutex.State, mutex: *Mutex) Cancelable!void,
     mutexUnlock: *const fn (?*anyopaque, prev_state: Mutex.State, mutex: *Mutex) void,
 
     conditionWait: *const fn (?*anyopaque, cond: *Condition, mutex: *Mutex, timeout_ns: ?u64) Condition.WaitError!void,
@@ -645,11 +645,11 @@ pub const VTable = struct {
 pub const OpenFlags = fs.File.OpenFlags;
 pub const CreateFlags = fs.File.CreateFlags;
 
-pub const FileOpenError = fs.File.OpenError || error{Canceled};
-pub const FileReadError = fs.File.ReadError || error{Canceled};
-pub const FilePReadError = fs.File.PReadError || error{Canceled};
-pub const FileWriteError = fs.File.WriteError || error{Canceled};
-pub const FilePWriteError = fs.File.PWriteError || error{Canceled};
+pub const FileOpenError = fs.File.OpenError || Cancelable;
+pub const FileReadError = fs.File.ReadError || Cancelable;
+pub const FilePReadError = fs.File.PReadError || Cancelable;
+pub const FileWriteError = fs.File.WriteError || Cancelable;
+pub const FilePWriteError = fs.File.PWriteError || Cancelable;
 
 pub const Timestamp = enum(i96) {
     _,
@@ -666,7 +666,7 @@ pub const Deadline = union(enum) {
     nanoseconds: i96,
     timestamp: Timestamp,
 };
-pub const ClockGetTimeError = std.posix.ClockGetTimeError || error{Canceled};
+pub const ClockGetTimeError = std.posix.ClockGetTimeError || Cancelable;
 pub const SleepError = error{ UnsupportedClock, Unexpected, Canceled };
 
 pub const AnyFuture = opaque {};
@@ -734,7 +734,7 @@ pub const Mutex = if (true) struct {
         return prev_state.isUnlocked();
     }
 
-    pub fn lock(mutex: *Mutex, io: std.Io) error{Canceled}!void {
+    pub fn lock(mutex: *Mutex, io: std.Io) Cancelable!void {
         const prev_state: State = @enumFromInt(@atomicRmw(
             usize,
             @as(*usize, @ptrCast(&mutex.state)),
@@ -783,7 +783,7 @@
     }
 
     /// Avoids the vtable for uncontended locks.
-    pub fn lock(m: *Mutex, io: Io) error{Canceled}!void {
+    pub fn lock(m: *Mutex, io: Io) Cancelable!void {
         if (!m.tryLock()) {
             @branchHint(.unlikely);
             try io.vtable.mutexLock(io.userdata, {}, m);
@@ -809,10 +809,10 @@
         all,
     };
 
-    pub fn wait(cond: *Condition, io: Io, mutex: *Mutex) void {
+    pub fn wait(cond: *Condition, io: Io, mutex: *Mutex) Cancelable!void {
         io.vtable.conditionWait(io.userdata, cond, mutex, null) catch |err| switch (err) {
             error.Timeout => unreachable, // no timeout provided so we shouldn't have timed-out
-            error.Canceled => return, // handled as spurious wakeup
+            error.Canceled => return error.Canceled,
         };
     }
 
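With `Mutex.lock` and `Condition.wait` now returning `Cancelable!void`, blocking callers propagate `error.Canceled` with `try` rather than having it silently treated as a spurious wakeup. A minimal caller-side sketch (the helper and its names are hypothetical, not part of this patch):

    const std = @import("std");
    const Io = std.Io;

    /// Hypothetical helper: blocks until `flag.*` is true, propagating
    /// cancelation to the caller instead of swallowing it.
    fn waitUntilSet(io: Io, mutex: *Io.Mutex, cond: *Io.Condition, flag: *const bool) Io.Cancelable!void {
        try mutex.lock(io);
        defer mutex.unlock(io);
        while (!flag.*) try cond.wait(io, mutex); // error.Canceled surfaces here
    }

The `Cancelable` error set that these signatures share is introduced in the next hunk.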
@@ -829,6 +829,11 @@
     }
 };
 
+pub const Cancelable = error{
+    /// Caller has requested the async operation to stop.
+    Canceled,
+};
+
 pub const TypeErasedQueue = struct {
     mutex: Mutex,
 
@@ -852,7 +857,7 @@
 
     pub fn init(buffer: []u8) TypeErasedQueue {
         return .{
-            .mutex = .{},
+            .mutex = .init,
             .buffer = buffer,
             .put_index = 0,
             .get_index = 0,
@@ -861,10 +866,10 @@
         };
     }
 
-    pub fn put(q: *TypeErasedQueue, io: Io, elements: []const u8, min: usize) usize {
+    pub fn put(q: *TypeErasedQueue, io: Io, elements: []const u8, min: usize) Cancelable!usize {
         assert(elements.len >= min);
 
-        q.mutex.lock(io);
+        try q.mutex.lock(io);
         defer q.mutex.unlock(io);
 
         // Getters have first priority on the data, and only when the getters
@@ -911,15 +916,15 @@
                 .data = .{ .remaining = remaining, .condition = .{} },
             };
             q.putters.append(&node);
-            node.data.condition.wait(io, &q.mutex);
+            try node.data.condition.wait(io, &q.mutex);
             remaining = node.data.remaining;
         }
     }
 
-    pub fn get(q: *@This(), io: Io, buffer: []u8, min: usize) usize {
+    pub fn get(q: *@This(), io: Io, buffer: []u8, min: usize) Cancelable!usize {
         assert(buffer.len >= min);
 
-        q.mutex.lock(io);
+        try q.mutex.lock(io);
         defer q.mutex.unlock(io);
 
         // The ring buffer gets first priority, then data should come from any
@@ -976,7 +981,7 @@
                 .data = .{ .remaining = remaining, .condition = .{} },
            };
             q.getters.append(&node);
-            node.data.condition.wait(io, &q.mutex);
+            try node.data.condition.wait(io, &q.mutex);
             remaining = node.data.remaining;
         }
     }
@@ -1030,8 +1035,8 @@
         /// Returns how many elements have been added to the queue.
         ///
         /// Asserts that `elements.len >= min`.
-        pub fn put(q: *@This(), io: Io, elements: []const Elem, min: usize) usize {
-            return @divExact(q.type_erased.put(io, @ptrCast(elements), min * @sizeOf(Elem)), @sizeOf(Elem));
+        pub fn put(q: *@This(), io: Io, elements: []const Elem, min: usize) Cancelable!usize {
+            return @divExact(try q.type_erased.put(io, @ptrCast(elements), min * @sizeOf(Elem)), @sizeOf(Elem));
         }
 
@@ -1041,17 +1046,17 @@
         /// Receives elements from the beginning of the queue. The function
         /// returns when at least `min` elements have been populated.
         ///
         /// Returns how many elements of `buffer` have been populated.
         ///
         /// Asserts that `buffer.len >= min`.
-        pub fn get(q: *@This(), io: Io, buffer: []Elem, min: usize) usize {
-            return @divExact(q.type_erased.get(io, @ptrCast(buffer), min * @sizeOf(Elem)), @sizeOf(Elem));
+        pub fn get(q: *@This(), io: Io, buffer: []Elem, min: usize) Cancelable!usize {
+            return @divExact(try q.type_erased.get(io, @ptrCast(buffer), min * @sizeOf(Elem)), @sizeOf(Elem));
         }
 
-        pub fn putOne(q: *@This(), io: Io, item: Elem) void {
-            assert(q.put(io, &.{item}, 1) == 1);
+        pub fn putOne(q: *@This(), io: Io, item: Elem) Cancelable!void {
+            assert(try q.put(io, &.{item}, 1) == 1);
         }
 
-        pub fn getOne(q: *@This(), io: Io) Elem {
+        pub fn getOne(q: *@This(), io: Io) Cancelable!Elem {
             var buf: [1]Elem = undefined;
-            assert(q.get(io, &buf, 1) == 1);
+            assert(try q.get(io, &buf, 1) == 1);
             return buf[0];
         }
     };
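Since every queue operation can now fail with `error.Canceled`, callers decide what cancelation means for them. A hedged usage sketch (the consumer loop and its names are invented for illustration, not taken from the patch):

    const std = @import("std");
    const Io = std.Io;

    /// Hypothetical consumer: treats cancelation as a clean shutdown signal.
    fn drain(io: Io, queue: *Io.Queue(u32)) void {
        while (true) {
            const item = queue.getOne(io) catch |err| switch (err) {
                error.Canceled => return, // stop consuming when canceled
            };
            std.log.info("got {d}", .{item});
        }
    }

The `switch` is exhaustive because `Cancelable` contains only `Canceled`, so no other error can leak through.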
diff --git a/lib/std/Io/EventLoop.zig b/lib/std/Io/EventLoop.zig
index d642a0e227..d03f339b52 100644
--- a/lib/std/Io/EventLoop.zig
+++ b/lib/std/Io/EventLoop.zig
@@ -27,6 +27,7 @@ const Thread = struct {
     current_context: *Context,
     ready_queue: ?*Fiber,
     free_queue: ?*Fiber,
+    detached_queue: ?*Fiber,
     io_uring: IoUring,
     idle_search_index: u32,
     steal_ready_search_index: u32,
@@ -208,6 +209,7 @@
         .current_context = &main_fiber.context,
         .ready_queue = null,
         .free_queue = null,
+        .detached_queue = null,
        .io_uring = try IoUring.init(io_uring_entries, 0),
         .idle_search_index = 1,
         .steal_ready_search_index = 1,
@@ -218,7 +220,16 @@
 }
 
 pub fn deinit(el: *EventLoop) void {
+    // Wait for detached fibers.
     const active_threads = @atomicLoad(u32, &el.threads.active, .acquire);
+    for (el.threads.allocated[0..active_threads]) |*thread| {
+        while (thread.detached_queue) |detached_fiber| {
+            if (@atomicLoad(?*Fiber, &detached_fiber.awaiter, .acquire) != Fiber.finished)
+                el.yield(null, .{ .register_awaiter = &detached_fiber.awaiter });
+            detached_fiber.recycle();
+        }
+    }
+
     for (el.threads.allocated[0..active_threads]) |*thread| {
         const ready_fiber = @atomicLoad(?*Fiber, &thread.ready_queue, .monotonic);
         assert(ready_fiber == null or ready_fiber == Fiber.finished); // pending async
@@ -336,6 +347,7 @@
             .current_context = &new_thread.idle_context,
             .ready_queue = ready_queue.head,
             .free_queue = null,
+            .detached_queue = null,
             .io_uring = IoUring.init(io_uring_entries, 0) catch |err| {
                 @atomicStore(u32, &el.threads.reserved, new_thread_index, .release);
                 // no more access to `thread` after giving up reservation
@@ -470,6 +482,7 @@
     const PendingTask = union(enum) {
         nothing,
         reschedule,
+        recycle: *Fiber,
         register_awaiter: *?*Fiber,
         lock_mutex: struct {
            prev_state: Io.Mutex.State,
@@ -488,6 +501,9 @@
                 assert(prev_fiber.queue_next == null);
                 el.schedule(thread, .{ .head = prev_fiber, .tail = prev_fiber });
             },
+            .recycle => |fiber| {
+                fiber.recycle();
+            },
             .register_awaiter => |awaiter| {
                 const prev_fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.contexts.prev));
                 assert(prev_fiber.queue_next == null);
@@ -612,6 +628,18 @@ fn fiberEntry() callconv(.naked) void {
     }
 }
 
+fn fiberEntryDetached() callconv(.naked) void {
+    switch (builtin.cpu.arch) {
+        .x86_64 => asm volatile (
+            \\ leaq 8(%%rsp), %%rdi
+            \\ jmp %[DetachedClosure_call:P]
+            :
+            : [DetachedClosure_call] "X" (&DetachedClosure.call),
+        ),
+        else => |arch| @compileError("unimplemented architecture: " ++ @tagName(arch)),
+    }
+}
+
 const AsyncClosure = struct {
     event_loop: *EventLoop,
     fiber: *Fiber,
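The shutdown handshake shared by `deinit` (above) and `DetachedClosure.call` (next hunk) is a pair of atomic exchanges on `fiber.awaiter`: the detached side publishes `finished` with `.acq_rel`, and whichever side loses the race is responsible for the wakeup. A thread-based reduction of the same pattern, for intuition only; every name here is a stand-in for the fiber machinery, not code from this patch:

    const std = @import("std");

    const State = enum(u8) { pending, awaited, finished };

    fn detachedWorker(state: *State, parked: *std.Thread.ResetEvent) void {
        // ... detached work runs here ...
        // Publish completion; learn whether a waiter registered first.
        if (@atomicRmw(State, state, .Xchg, .finished, .acq_rel) == .awaited)
            parked.set(); // the waiter got there first: wake it
    }

    pub fn main() !void {
        var state: State = .pending;
        var parked: std.Thread.ResetEvent = .{};
        const t = try std.Thread.spawn(.{}, detachedWorker, .{ &state, &parked });
        defer t.join();
        // Shutdown side: register as the awaiter unless already finished.
        if (@atomicRmw(State, &state, .Xchg, .awaited, .acq_rel) != .finished)
            parked.wait();
    }

Either interleaving is safe: if the worker finishes first, the shutdown side sees `.finished` and never blocks; if the shutdown side registers first, the worker observes `.awaited` and performs the wakeup.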
@@ -632,6 +660,31 @@
     }
 };
 
+const DetachedClosure = struct {
+    event_loop: *EventLoop,
+    fiber: *Fiber,
+    start: *const fn (context: *const anyopaque) void,
+
+    fn contextPointer(closure: *DetachedClosure) [*]align(Fiber.max_context_align.toByteUnits()) u8 {
+        return @alignCast(@as([*]u8, @ptrCast(closure)) + @sizeOf(DetachedClosure));
+    }
+
+    fn call(closure: *DetachedClosure, message: *const SwitchMessage) callconv(.withStackAlign(.c, @alignOf(DetachedClosure))) noreturn {
+        message.handle(closure.event_loop);
+        std.log.debug("{*} performing async detached", .{closure.fiber});
+        closure.start(closure.contextPointer());
+        const current_thread: *Thread = .current();
+        current_thread.detached_queue = closure.fiber.queue_next;
+        const awaiter = @atomicRmw(?*Fiber, &closure.fiber.awaiter, .Xchg, Fiber.finished, .acq_rel);
+        if (awaiter) |a| {
+            closure.event_loop.yield(a, .nothing);
+        } else {
+            closure.event_loop.yield(null, .{ .recycle = closure.fiber });
+        }
+        unreachable; // switched to dead fiber
+    }
+};
+
 fn @"async"(
     userdata: ?*anyopaque,
     result: []u8,
@@ -682,6 +735,53 @@
     return @ptrCast(fiber);
 }
 
+fn go(
+    userdata: ?*anyopaque,
+    context: []const u8,
+    context_alignment: std.mem.Alignment,
+    start: *const fn (context: *const anyopaque) void,
+) void {
+    assert(context_alignment.compare(.lte, Fiber.max_context_align)); // TODO
+    assert(context.len <= Fiber.max_context_size); // TODO
+
+    const event_loop: *EventLoop = @alignCast(@ptrCast(userdata));
+    const fiber = Fiber.allocate(event_loop) catch {
+        start(context.ptr);
+        return;
+    };
+    std.log.debug("allocated {*}", .{fiber});
+
+    const current_thread: *Thread = .current();
+    const closure: *DetachedClosure = @ptrFromInt(Fiber.max_context_align.max(.of(DetachedClosure)).backward(
+        @intFromPtr(fiber.allocatedEnd()) - Fiber.max_context_size,
+    ) - @sizeOf(DetachedClosure));
+    fiber.* = .{
+        .required_align = {},
+        .context = switch (builtin.cpu.arch) {
+            .x86_64 => .{
+                .rsp = @intFromPtr(closure) - @sizeOf(usize),
+                .rbp = 0,
+                .rip = @intFromPtr(&fiberEntryDetached),
+            },
+            else => |arch| @compileError("unimplemented architecture: " ++ @tagName(arch)),
+        },
+        .awaiter = null,
+        .queue_next = current_thread.detached_queue,
+        .cancel_thread = null,
+        .awaiting_completions = .initEmpty(),
+    };
+    current_thread.detached_queue = fiber;
+    closure.* = .{
+        .event_loop = event_loop,
+        .fiber = fiber,
+        .start = start,
+    };
+    @memcpy(closure.contextPointer(), context);
+
+    event_loop.schedule(current_thread, .{ .head = fiber, .tail = fiber });
+}
+
 fn @"await"(
     userdata: ?*anyopaque,
     any_future: *std.Io.AnyFuture,
@@ -690,24 +790,12 @@
     result: []u8,
     result_alignment: std.mem.Alignment,
 ) void {
     const event_loop: *EventLoop = @alignCast(@ptrCast(userdata));
     const future_fiber: *Fiber = @alignCast(@ptrCast(any_future));
-    if (@atomicLoad(?*Fiber, &future_fiber.awaiter, .acquire) != Fiber.finished) event_loop.yield(null, .{ .register_awaiter = &future_fiber.awaiter });
+    if (@atomicLoad(?*Fiber, &future_fiber.awaiter, .acquire) != Fiber.finished)
+        event_loop.yield(null, .{ .register_awaiter = &future_fiber.awaiter });
     @memcpy(result, future_fiber.resultBytes(result_alignment));
     future_fiber.recycle();
 }
 
-fn go(
-    userdata: ?*anyopaque,
-    context: []const u8,
-    context_alignment: std.mem.Alignment,
-    start: *const fn (context: *const anyopaque) void,
-) void {
-    _ = userdata;
-    _ = context;
-    _ = context_alignment;
-    _ = start;
-    @panic("TODO");
-}
-
 fn cancel(
     userdata: ?*anyopaque,
     any_future: *std.Io.AnyFuture,
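For context, a caller-side sketch of the detached path this commit implements. The diff shows only the event-loop side; the call below goes straight through the `go` vtable slot that the removed stub previously implemented, since any friendlier public wrapper is not shown here, and all names are illustrative assumptions:

    const std = @import("std");

    fn logTick(context: *const anyopaque) void {
        const n: *const u32 = @ptrCast(@alignCast(context));
        std.log.info("tick {d}", .{n.*});
    }

    /// Hypothetical fire-and-forget spawn: no AnyFuture is returned, so there
    /// is nothing to await; EventLoop.deinit now drains such fibers instead
    /// of tearing the loop down underneath them.
    fn spawnTick(io: std.Io, n: *const u32) void {
        io.vtable.go(io.userdata, std.mem.asBytes(n), .of(u32), logTick);
    }

Note that `go` copies `context` into the fiber's own storage (`@memcpy(closure.contextPointer(), context)` above), so the caller's bytes need not outlive the call, and on allocation failure it degrades to running `start` synchronously.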