diff --git a/lib/std/Io/EventLoop.zig b/lib/std/Io/EventLoop.zig index 1fda9cb3d1..7ad43fd6a3 100644 --- a/lib/std/Io/EventLoop.zig +++ b/lib/std/Io/EventLoop.zig @@ -11,19 +11,21 @@ cond: std.Thread.Condition, queue: std.DoublyLinkedList(void), free: std.DoublyLinkedList(void), main_fiber_buffer: [@sizeOf(Fiber) + max_result_len]u8 align(@alignOf(Fiber)), -exiting: bool, +exit_awaiter: ?*Fiber, idle_count: usize, threads: std.ArrayListUnmanaged(Thread), -threadlocal var current_thread: *Thread = undefined; -threadlocal var current_fiber: *Fiber = undefined; +threadlocal var current_idle_context: *Context = undefined; +threadlocal var current_fiber_context: *Context = undefined; const max_result_len = 64; const min_stack_size = 4 * 1024 * 1024; +const idle_stack_size = 32 * 1024; +const stack_align = 16; const Thread = struct { thread: std.Thread, - idle_fiber: Fiber, + idle_context: Context, }; const Fiber = struct { @@ -54,6 +56,11 @@ const Fiber = struct { }; pub fn init(el: *EventLoop, gpa: Allocator) error{OutOfMemory}!void { + const threads_bytes = ((std.Thread.getCpuCount() catch 1) -| 1) * @sizeOf(Thread); + const idle_context_offset = std.mem.alignForward(usize, threads_bytes, @alignOf(Context)); + const idle_stack_end_offset = std.mem.alignForward(usize, idle_context_offset + idle_stack_size, std.heap.page_size_max); + const allocated_slice = try gpa.alignedAlloc(u8, @max(@alignOf(Thread), @alignOf(Context), stack_align), idle_stack_end_offset); + errdefer gpa.free(allocated_slice); el.* = .{ .gpa = gpa, .mutex = .{}, @@ -61,28 +68,37 @@ pub fn init(el: *EventLoop, gpa: Allocator) error{OutOfMemory}!void { .queue = .{}, .free = .{}, .main_fiber_buffer = undefined, - .exiting = false, + .exit_awaiter = null, .idle_count = 0, - .threads = try .initCapacity(gpa, @max(std.Thread.getCpuCount() catch 1, 1)), + .threads = .initBuffer(@ptrCast(allocated_slice[0..threads_bytes])), }; - current_thread = el.threads.addOneAssumeCapacity(); - current_fiber = 
@ptrCast(&el.main_fiber_buffer); + const main_idle_context: *Context = @alignCast(std.mem.bytesAsValue(Context, allocated_slice[idle_context_offset..][0..@sizeOf(Context)])); + const idle_stack_end: [*]align(stack_align) usize = @alignCast(@ptrCast(allocated_slice[idle_stack_end_offset..].ptr)); + (idle_stack_end - 1)[0..1].* = .{@intFromPtr(el)}; + main_idle_context.* = .{ + .rsp = @intFromPtr(idle_stack_end - 1), + .rbp = 0, + .rip = @intFromPtr(&mainIdleEntry), + }; + std.log.debug("created main idle {*}", .{main_idle_context}); + current_idle_context = main_idle_context; + const current_fiber: *Fiber = @ptrCast(&el.main_fiber_buffer); + std.log.debug("created main fiber {*}", .{current_fiber}); + current_fiber_context = &current_fiber.context; } pub fn deinit(el: *EventLoop) void { - { - el.mutex.lock(); - defer el.mutex.unlock(); - assert(el.queue.len == 0); // pending async - el.exiting = true; - } - el.cond.broadcast(); + assert(el.queue.len == 0); // pending async + el.yield(null, &el.exit_awaiter); while (el.free.pop()) |free_node| { const free_fiber: *Fiber = @fieldParentPtr("queue_node", free_node); el.gpa.free(free_fiber.allocatedSlice()); } - for (el.threads.items[1..]) |*thread| thread.thread.join(); - el.threads.deinit(el.gpa); + const idle_context_offset = std.mem.alignForward(usize, el.threads.items.len * @sizeOf(Thread), @alignOf(Context)); + const idle_stack_end = std.mem.alignForward(usize, idle_context_offset + idle_stack_size, std.heap.page_size_max); + const allocated_ptr: [*]align(@max(@alignOf(Thread), @alignOf(Context), stack_align)) u8 = @alignCast(@ptrCast(el.threads.items.ptr)); + for (el.threads.items) |*thread| thread.thread.join(); + el.gpa.free(allocated_ptr[0..idle_stack_end]); } fn allocateFiber(el: *EventLoop, result_len: usize) error{OutOfMemory}!*Fiber { @@ -103,40 +119,44 @@ fn allocateFiber(el: *EventLoop, result_len: usize) error{OutOfMemory}!*Fiber { } fn yield(el: *EventLoop, optional_fiber: ?*Fiber, register_awaiter: ?*?*Fiber) 
void { - const ready_fiber: *Fiber = optional_fiber orelse if (ready_node: { - el.mutex.lock(); - defer el.mutex.unlock(); - break :ready_node el.queue.pop(); - }) |ready_node| - @fieldParentPtr("queue_node", ready_node) - else - &current_thread.idle_fiber; + const ready_context: *Context = ready_context: { + const ready_fiber: *Fiber = optional_fiber orelse if (ready_node: { + el.mutex.lock(); + defer el.mutex.unlock(); + break :ready_node el.queue.pop(); + }) |ready_node| + @fieldParentPtr("queue_node", ready_node) + else + break :ready_context current_idle_context; + break :ready_context &ready_fiber.context; + }; const message: SwitchMessage = .{ - .prev_context = &current_fiber.context, - .ready_context = &ready_fiber.context, + .prev_context = current_fiber_context, + .ready_context = ready_context, .register_awaiter = register_awaiter, }; - std.log.debug("switching from {*} to {*}", .{ - @as(*Fiber, @fieldParentPtr("context", message.prev_context)), - @as(*Fiber, @fieldParentPtr("context", message.ready_context)), - }); + std.log.debug("switching from {*} to {*}", .{ message.prev_context, message.ready_context }); contextSwitch(&message).handle(el); } fn schedule(el: *EventLoop, fiber: *Fiber) void { - signal: { - el.mutex.lock(); - defer el.mutex.unlock(); - el.queue.append(&fiber.queue_node); - if (el.idle_count > 0) break :signal; - if (el.threads.items.len == el.threads.capacity) return; - const thread = el.threads.addOneAssumeCapacity(); - thread.thread = std.Thread.spawn(.{ - .stack_size = min_stack_size, - .allocator = el.gpa, - }, threadEntry, .{ el, thread }) catch return; + el.mutex.lock(); + el.queue.append(&fiber.queue_node); + if (el.idle_count > 0) { + el.mutex.unlock(); + el.cond.signal(); + return; } - el.cond.signal(); + defer el.mutex.unlock(); + if (el.threads.items.len == el.threads.capacity) return; + const thread = el.threads.addOneAssumeCapacity(); + thread.thread = std.Thread.spawn(.{ + .stack_size = idle_stack_size, + .allocator = el.gpa, + }, 
threadEntry, .{ el, thread }) catch { + el.threads.items.len -= 1; + return; + }; } fn recycle(el: *EventLoop, fiber: *Fiber) void { @@ -148,14 +168,28 @@ fn recycle(el: *EventLoop, fiber: *Fiber) void { el.free.append(&fiber.queue_node); } +fn mainIdle(el: *EventLoop, message: *const SwitchMessage) callconv(.c) noreturn { + message.handle(el); + el.yield(el.idle(), null); + unreachable; // switched to dead fiber +} + fn threadEntry(el: *EventLoop, thread: *Thread) void { - current_thread = thread; - current_fiber = &thread.idle_fiber; + std.log.debug("created thread idle {*}", .{&thread.idle_context}); + current_idle_context = &thread.idle_context; + current_fiber_context = &thread.idle_context; + _ = el.idle(); +} + +fn idle(el: *EventLoop) *Fiber { while (true) { el.yield(null, null); + if (@atomicLoad(?*Fiber, &el.exit_awaiter, .acquire)) |exit_awaiter| { + el.cond.broadcast(); + return exit_awaiter; + } el.mutex.lock(); defer el.mutex.unlock(); - if (el.exiting) return; el.idle_count += 1; defer el.idle_count -= 1; el.cond.wait(&el.mutex); @@ -169,7 +203,7 @@ const SwitchMessage = extern struct { fn handle(message: *const SwitchMessage, el: *EventLoop) void { const prev_fiber: *Fiber = @fieldParentPtr("context", message.prev_context); - current_fiber = @fieldParentPtr("context", message.ready_context); + current_fiber_context = message.ready_context; if (message.register_awaiter) |awaiter| if (@atomicRmw(?*Fiber, awaiter, .Xchg, prev_fiber, .acq_rel) == Fiber.finished) el.schedule(prev_fiber); } }; @@ -208,6 +242,18 @@ inline fn contextSwitch(message: *const SwitchMessage) *const SwitchMessage { }; } +fn mainIdleEntry() callconv(.naked) void { + switch (builtin.cpu.arch) { + .x86_64 => asm volatile ( + \\ movq (%%rsp), %%rdi + \\ jmp %[mainIdle:P] + : + : [mainIdle] "X" (&mainIdle), + ), + else => |arch| @compileError("unimplemented architecture: " ++ @tagName(arch)), + } +} + fn fiberEntry() callconv(.naked) void { switch (builtin.cpu.arch) { .x86_64 => asm 
volatile ( @@ -238,7 +284,7 @@ pub fn @"async"( const closure: *AsyncClosure = @ptrFromInt(std.mem.alignBackward( usize, @intFromPtr(fiber.stackEndPointer() - @sizeOf(AsyncClosure)), - @alignOf(AsyncClosure), + @max(@alignOf(AsyncClosure), stack_align), )); closure.* = .{ .event_loop = event_loop, @@ -246,7 +292,7 @@ pub fn @"async"( .fiber = fiber, .start = start, }; - const stack_end: [*]align(16) usize = @alignCast(@ptrCast(closure)); + const stack_end: [*]align(stack_align) usize = @alignCast(@ptrCast(closure)); fiber.context = .{ .rsp = @intFromPtr(stack_end - 1), .rbp = 0, @@ -258,7 +304,6 @@ pub fn @"async"( } const AsyncClosure = struct { - _: void align(16) = {}, event_loop: *EventLoop, context: ?*anyopaque, fiber: *Fiber,