diff --git a/lib/std/Io/EventLoop.zig b/lib/std/Io/EventLoop.zig index 065ddfa55f..84ea623954 100644 --- a/lib/std/Io/EventLoop.zig +++ b/lib/std/Io/EventLoop.zig @@ -4,28 +4,23 @@ const assert = std.debug.assert; const Allocator = std.mem.Allocator; const Io = std.Io; const EventLoop = @This(); +const Alignment = std.mem.Alignment; gpa: Allocator, mutex: std.Thread.Mutex, cond: std.Thread.Condition, queue: std.DoublyLinkedList(void), free: std.DoublyLinkedList(void), -main_fiber_buffer: [@sizeOf(Fiber) + max_result_len]u8 align(@alignOf(Fiber)), +main_context: Context, exit_awaiter: ?*Fiber, idle_count: usize, threads: std.ArrayListUnmanaged(Thread), threadlocal var current_idle_context: *Context = undefined; -threadlocal var current_fiber_context: *Context = undefined; +threadlocal var current_context: *Context = undefined; -/// Also used for context. -const max_result_len = 64; -/// Also used for context. -const max_result_align: std.mem.Alignment = .@"16"; - -const min_stack_size = 4 * 1024 * 1024; +/// Empirically saw 10KB being used by the self-hosted backend for logging. const idle_stack_size = 32 * 1024; -const stack_align = 16; const Thread = struct { thread: std.Thread, @@ -33,45 +28,53 @@ const Thread = struct { }; const Fiber = struct { - _: void align(max_result_align.toByteUnits()) = {}, - context: Context, awaiter: ?*Fiber, queue_node: std.DoublyLinkedList(void).Node, + result_align: Alignment, const finished: ?*Fiber = @ptrFromInt(std.mem.alignBackward(usize, std.math.maxInt(usize), @alignOf(Fiber))); - fn allocatedSlice(f: *Fiber) []align(@alignOf(Fiber)) u8 { - const base: [*]align(@alignOf(Fiber)) u8 = @ptrCast(f); - return base[0..std.mem.alignForward( + const max_result_align: Alignment = .@"16"; + const max_result_size = max_result_align.forward(64); + /// This includes any stack realignments that need to happen, and also the + /// initial frame return address slot and argument frame, depending on target. + const min_stack_size = 4 * 1024 * 1024; + const max_context_align: Alignment = .@"16"; + const max_context_size = max_context_align.forward(1024); + const allocation_size = std.mem.alignForward( + usize, + std.mem.alignForward( usize, - resultOffset() + max_result_len + min_stack_size, - std.heap.page_size_max, - )]; + max_result_align.forward(@sizeOf(Fiber)) + max_result_size + min_stack_size, + @max(@alignOf(AsyncClosure), max_context_align.toByteUnits()), + ) + @sizeOf(AsyncClosure) + max_context_size, + std.heap.page_size_max, + ); + + fn allocate(el: *EventLoop) error{OutOfMemory}!*Fiber { + return if (free_node: { + el.mutex.lock(); + defer el.mutex.unlock(); + break :free_node el.free.pop(); + }) |free_node| + @alignCast(@fieldParentPtr("queue_node", free_node)) + else + @ptrCast(try el.gpa.alignedAlloc(u8, @alignOf(Fiber), allocation_size)); } - fn argsOffset() usize { - return max_result_align.forward(@sizeOf(Fiber)); + fn allocatedSlice(f: *Fiber) []align(@alignOf(Fiber)) u8 { + return @as([*]align(@alignOf(Fiber)) u8, @ptrCast(f))[0..allocation_size]; } - fn resultOffset() usize { - return max_result_align.forward(argsOffset() + max_result_len); - } - - fn argsSlice(f: *Fiber) []u8 { - const base: [*]align(@alignOf(Fiber)) u8 = @ptrCast(f); - return base[argsOffset()..][0..max_result_len]; - } - - fn resultSlice(f: *Fiber) []u8 { - const base: [*]align(@alignOf(Fiber)) u8 = @ptrCast(f); - return base[resultOffset()..][0..max_result_len]; - } - - fn stackEndPointer(f: *Fiber) [*]u8 { + fn allocatedEnd(f: *Fiber) [*]u8 { const allocated_slice = f.allocatedSlice(); return allocated_slice[allocated_slice.len..].ptr; } + + fn resultPointer(f: *Fiber) [*]u8 { + return @ptrFromInt(f.result_align.forward(@intFromPtr(f) + @sizeOf(Fiber))); + } }; pub fn io(el: *EventLoop) Io { @@ -88,7 +91,7 @@ pub fn init(el: *EventLoop, gpa: Allocator) error{OutOfMemory}!void { const threads_bytes = ((std.Thread.getCpuCount() catch 1) -| 1) * @sizeOf(Thread); const idle_context_offset = std.mem.alignForward(usize, threads_bytes, @alignOf(Context)); const idle_stack_end_offset = std.mem.alignForward(usize, idle_context_offset + idle_stack_size, std.heap.page_size_max); - const allocated_slice = try gpa.alignedAlloc(u8, @max(@alignOf(Thread), @alignOf(Context), stack_align), idle_stack_end_offset); + const allocated_slice = try gpa.alignedAlloc(u8, @max(@alignOf(Thread), @alignOf(Context)), idle_stack_end_offset); errdefer gpa.free(allocated_slice); el.* = .{ .gpa = gpa, @@ -96,13 +99,13 @@ pub fn init(el: *EventLoop, gpa: Allocator) error{OutOfMemory}!void { .cond = .{}, .queue = .{}, .free = .{}, - .main_fiber_buffer = undefined, + .main_context = undefined, .exit_awaiter = null, .idle_count = 0, .threads = .initBuffer(@ptrCast(allocated_slice[0..threads_bytes])), }; const main_idle_context: *Context = @alignCast(std.mem.bytesAsValue(Context, allocated_slice[idle_context_offset..][0..@sizeOf(Context)])); - const idle_stack_end: [*]align(stack_align) usize = @alignCast(@ptrCast(allocated_slice[idle_stack_end_offset..].ptr)); + const idle_stack_end: [*]align(@max(@alignOf(Thread), @alignOf(Context))) usize = @alignCast(@ptrCast(allocated_slice[idle_stack_end_offset..].ptr)); (idle_stack_end - 1)[0..1].* = .{@intFromPtr(el)}; main_idle_context.* = .{ .rsp = @intFromPtr(idle_stack_end - 1), @@ -111,9 +114,8 @@ pub fn init(el: *EventLoop, gpa: Allocator) error{OutOfMemory}!void { }; std.log.debug("created main idle {*}", .{main_idle_context}); current_idle_context = main_idle_context; - const current_fiber: *Fiber = @ptrCast(&el.main_fiber_buffer); - std.log.debug("created main fiber {*}", .{current_fiber}); - current_fiber_context = ¤t_fiber.context; + std.log.debug("created main {*}", .{&el.main_context}); + current_context = &el.main_context; } pub fn deinit(el: *EventLoop) void { @@ -125,27 +127,11 @@ pub fn deinit(el: *EventLoop) void { } const idle_context_offset = std.mem.alignForward(usize, el.threads.items.len * @sizeOf(Thread), @alignOf(Context)); const idle_stack_end = std.mem.alignForward(usize, idle_context_offset + idle_stack_size, std.heap.page_size_max); - const allocated_ptr: [*]align(@max(@alignOf(Thread), @alignOf(Context), stack_align)) u8 = @alignCast(@ptrCast(el.threads.items.ptr)); + const allocated_ptr: [*]align(@max(@alignOf(Thread), @alignOf(Context))) u8 = @alignCast(@ptrCast(el.threads.items.ptr)); for (el.threads.items) |*thread| thread.thread.join(); el.gpa.free(allocated_ptr[0..idle_stack_end]); } -fn allocateFiber(el: *EventLoop) error{OutOfMemory}!*Fiber { - const free_node = free_node: { - el.mutex.lock(); - defer el.mutex.unlock(); - break :free_node el.free.pop(); - } orelse { - const n = std.mem.alignForward( - usize, - Fiber.resultOffset() + max_result_len + min_stack_size, - std.heap.page_size_max, - ); - return @alignCast(@ptrCast(try el.gpa.alignedAlloc(u8, @alignOf(Fiber), n))); - }; - return @alignCast(@fieldParentPtr("queue_node", free_node)); -} - fn yield(el: *EventLoop, optional_fiber: ?*Fiber, register_awaiter: ?*?*Fiber) void { const ready_context: *Context = ready_context: { const ready_fiber: *Fiber = optional_fiber orelse if (ready_node: { @@ -159,7 +145,7 @@ fn yield(el: *EventLoop, optional_fiber: ?*Fiber, register_awaiter: ?*?*Fiber) v break :ready_context &ready_fiber.context; }; const message: SwitchMessage = .{ - .prev_context = current_fiber_context, + .prev_context = current_context, .ready_context = ready_context, .register_awaiter = register_awaiter, }; @@ -189,14 +175,13 @@ fn schedule(el: *EventLoop, fiber: *Fiber) void { fn recycle(el: *EventLoop, fiber: *Fiber) void { std.log.debug("recyling {*}", .{fiber}); - fiber.awaiter = undefined; - @memset(fiber.resultSlice(), undefined); + @memset(fiber.allocatedSlice(), undefined); el.mutex.lock(); defer el.mutex.unlock(); el.free.append(&fiber.queue_node); } -fn mainIdle(el: *EventLoop, message: *const SwitchMessage) callconv(.c) noreturn { +fn mainIdle(el: *EventLoop, message: *const SwitchMessage) callconv(.withStackAlign(.c, @max(@alignOf(Thread), @alignOf(Context)))) noreturn { message.handle(el); el.yield(el.idle(), null); unreachable; // switched to dead fiber @@ -205,7 +190,7 @@ fn mainIdle(el: *EventLoop, message: *const SwitchMessage) callconv(.c) noreturn fn threadEntry(el: *EventLoop, thread: *Thread) void { std.log.debug("created thread idle {*}", .{&thread.idle_context}); current_idle_context = &thread.idle_context; - current_fiber_context = &thread.idle_context; + current_context = &thread.idle_context; _ = el.idle(); } @@ -230,7 +215,7 @@ const SwitchMessage = extern struct { register_awaiter: ?*?*Fiber, fn handle(message: *const SwitchMessage, el: *EventLoop) void { - current_fiber_context = message.ready_context; + current_context = message.ready_context; if (message.register_awaiter) |awaiter| { const prev_fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.prev_context)); if (@atomicRmw(?*Fiber, awaiter, .Xchg, prev_fiber, .acq_rel) == Fiber.finished) el.schedule(prev_fiber); @@ -238,10 +223,13 @@ const SwitchMessage = extern struct { } }; -const Context = extern struct { - rsp: usize, - rbp: usize, - rip: usize, +const Context = switch (builtin.cpu.arch) { + .x86_64 => extern struct { + rsp: u64, + rbp: u64, + rip: u64, + }, + else => |arch| @compileError("unimplemented architecture: " ++ @tagName(arch)), }; inline fn contextSwitch(message: *const SwitchMessage) *const SwitchMessage { @@ -299,42 +287,45 @@ fn fiberEntry() callconv(.naked) void { pub fn @"async"( userdata: ?*anyopaque, result: []u8, - result_alignment: std.mem.Alignment, + result_alignment: Alignment, context: []const u8, - context_alignment: std.mem.Alignment, + context_alignment: Alignment, start: *const fn (context: *const anyopaque, result: *anyopaque) void, ) ?*std.Io.AnyFuture { - assert(result_alignment.compare(.lte, max_result_align)); // TODO - assert(context_alignment.compare(.lte, max_result_align)); // TODO - assert(result.len <= max_result_len); // TODO - assert(context.len <= max_result_len); // TODO + assert(result_alignment.compare(.lte, Fiber.max_result_align)); // TODO + assert(context_alignment.compare(.lte, Fiber.max_context_align)); // TODO + assert(result.len <= Fiber.max_result_size); // TODO + assert(context.len <= Fiber.max_context_size); // TODO const event_loop: *EventLoop = @alignCast(@ptrCast(userdata)); - const fiber = event_loop.allocateFiber() catch { + const fiber = Fiber.allocate(event_loop) catch { start(context.ptr, result.ptr); return null; }; - fiber.awaiter = null; - fiber.queue_node = .{ .data = {} }; - @memcpy(fiber.argsSlice()[0..context.len], context); std.log.debug("allocated {*}", .{fiber}); - const closure: *AsyncClosure = @ptrFromInt(std.mem.alignBackward( - usize, - @intFromPtr(fiber.stackEndPointer() - @sizeOf(AsyncClosure)), - @max(@alignOf(AsyncClosure), stack_align), - )); + const closure: *AsyncClosure = @ptrFromInt(Fiber.max_context_align.max(.of(AsyncClosure)).backward( + @intFromPtr(fiber.allocatedEnd()) - Fiber.max_context_size, + ) - @sizeOf(AsyncClosure)); + fiber.* = .{ + .context = switch (builtin.cpu.arch) { + .x86_64 => .{ + .rsp = @intFromPtr(closure) - @sizeOf(usize), + .rbp = 0, + .rip = @intFromPtr(&fiberEntry), + }, + else => |arch| @compileError("unimplemented architecture: " ++ @tagName(arch)), + }, + .awaiter = null, + .queue_node = undefined, + .result_align = result_alignment, + }; closure.* = .{ .event_loop = event_loop, .fiber = fiber, .start = start, }; - const stack_end: [*]align(stack_align) usize = @alignCast(@ptrCast(closure)); - fiber.context = .{ - .rsp = @intFromPtr(stack_end - 1), - .rbp = 0, - .rip = @intFromPtr(&fiberEntry), - }; + @memcpy(closure.contextPointer(), context); event_loop.schedule(fiber); return @ptrCast(fiber); @@ -345,10 +336,14 @@ const AsyncClosure = struct { fiber: *Fiber, start: *const fn (context: *const anyopaque, result: *anyopaque) void, - fn call(closure: *AsyncClosure, message: *const SwitchMessage) callconv(.c) noreturn { + fn contextPointer(closure: *AsyncClosure) [*]align(Fiber.max_context_align.toByteUnits()) u8 { + return @alignCast(@as([*]u8, @ptrCast(closure)) + @sizeOf(AsyncClosure)); + } + + fn call(closure: *AsyncClosure, message: *const SwitchMessage) callconv(.withStackAlign(.c, @alignOf(AsyncClosure))) noreturn { message.handle(closure.event_loop); std.log.debug("{*} performing async", .{closure.fiber}); - closure.start(closure.fiber.argsSlice().ptr, closure.fiber.resultSlice().ptr); + closure.start(closure.contextPointer(), closure.fiber.resultPointer()); const awaiter = @atomicRmw(?*Fiber, &closure.fiber.awaiter, .Xchg, Fiber.finished, .acq_rel); closure.event_loop.yield(awaiter, null); unreachable; // switched to dead fiber @@ -358,8 +353,7 @@ const AsyncClosure = struct { pub fn @"await"(userdata: ?*anyopaque, any_future: *std.Io.AnyFuture, result: []u8) void { const event_loop: *EventLoop = @alignCast(@ptrCast(userdata)); const future_fiber: *Fiber = @alignCast(@ptrCast(any_future)); - const result_src = future_fiber.resultSlice()[0..result.len]; if (@atomicLoad(?*Fiber, &future_fiber.awaiter, .acquire) != Fiber.finished) event_loop.yield(null, &future_fiber.awaiter); - @memcpy(result, result_src); + @memcpy(result, future_fiber.resultPointer()); event_loop.recycle(future_fiber); }