diff --git a/lib/std/Io/EventLoop.zig b/lib/std/Io/EventLoop.zig index 802089cc55..fbab98ff00 100644 --- a/lib/std/Io/EventLoop.zig +++ b/lib/std/Io/EventLoop.zig @@ -18,7 +18,11 @@ threads: std.ArrayListUnmanaged(Thread), threadlocal var current_idle_context: *Context = undefined; threadlocal var current_fiber_context: *Context = undefined; +/// Also used for context. const max_result_len = 64; +/// Also used for context. +const max_result_align: std.mem.Alignment = .@"16"; + const min_stack_size = 4 * 1024 * 1024; const idle_stack_size = 32 * 1024; const stack_align = 16; @@ -29,6 +33,8 @@ const Thread = struct { }; const Fiber = struct { + _: void align(max_result_align.toByteUnits()) = {}, + context: Context, awaiter: ?*Fiber, queue_node: std.DoublyLinkedList(void).Node, @@ -39,14 +45,27 @@ const Fiber = struct { const base: [*]align(@alignOf(Fiber)) u8 = @ptrCast(f); return base[0..std.mem.alignForward( usize, - @sizeOf(Fiber) + max_result_len + min_stack_size, + resultOffset() + max_result_len + min_stack_size, std.heap.page_size_max, )]; } + fn argsOffset() usize { + return max_result_align.forward(@sizeOf(Fiber)); + } + + fn resultOffset() usize { + return max_result_align.forward(argsOffset() + max_result_len); + } + + fn argsSlice(f: *Fiber) []u8 { + const base: [*]align(@alignOf(Fiber)) u8 = @ptrCast(f); + return base[argsOffset()..][0..max_result_len]; + } + fn resultSlice(f: *Fiber) []u8 { const base: [*]align(@alignOf(Fiber)) u8 = @ptrCast(f); - return base[@sizeOf(Fiber)..][0..max_result_len]; + return base[resultOffset()..][0..max_result_len]; } fn stackEndPointer(f: *Fiber) [*]u8 { @@ -102,7 +121,7 @@ pub fn deinit(el: *EventLoop) void { assert(el.queue.len == 0); // pending async el.yield(null, &el.exit_awaiter); while (el.free.pop()) |free_node| { - const free_fiber: *Fiber = @fieldParentPtr("queue_node", free_node); + const free_fiber: *Fiber = @alignCast(@fieldParentPtr("queue_node", free_node)); el.gpa.free(free_fiber.allocatedSlice()); } const idle_context_offset = std.mem.alignForward(usize, el.threads.items.len * @sizeOf(Thread), @alignOf(Context)); @@ -112,8 +131,7 @@ pub fn deinit(el: *EventLoop) void { el.gpa.free(allocated_ptr[0..idle_stack_end]); } -fn allocateFiber(el: *EventLoop, result_len: usize) error{OutOfMemory}!*Fiber { - assert(result_len <= max_result_len); +fn allocateFiber(el: *EventLoop) error{OutOfMemory}!*Fiber { const free_node = free_node: { el.mutex.lock(); defer el.mutex.unlock(); @@ -121,12 +139,12 @@ fn allocateFiber(el: *EventLoop, result_len: usize) error{OutOfMemory}!*Fiber { } orelse { const n = std.mem.alignForward( usize, - @sizeOf(Fiber) + max_result_len + min_stack_size, + Fiber.resultOffset() + max_result_len + min_stack_size, std.heap.page_size_max, ); return @alignCast(@ptrCast(try el.gpa.alignedAlloc(u8, @alignOf(Fiber), n))); }; - return @fieldParentPtr("queue_node", free_node); + return @alignCast(@fieldParentPtr("queue_node", free_node)); } fn yield(el: *EventLoop, optional_fiber: ?*Fiber, register_awaiter: ?*?*Fiber) void { @@ -136,7 +154,7 @@ fn yield(el: *EventLoop, optional_fiber: ?*Fiber, register_awaiter: ?*?*Fiber) v defer el.mutex.unlock(); break :ready_node el.queue.pop(); }) |ready_node| - @fieldParentPtr("queue_node", ready_node) + @alignCast(@fieldParentPtr("queue_node", ready_node)) else break :ready_context current_idle_context; break :ready_context &ready_fiber.context; @@ -213,7 +231,7 @@ const SwitchMessage = extern struct { register_awaiter: ?*?*Fiber, fn handle(message: *const SwitchMessage, el: *EventLoop) void { - const prev_fiber: *Fiber = @fieldParentPtr("context", message.prev_context); + const prev_fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.prev_context)); current_fiber_context = message.ready_context; if (message.register_awaiter) |awaiter| if (@atomicRmw(?*Fiber, awaiter, .Xchg, prev_fiber, .acq_rel) == Fiber.finished) el.schedule(prev_fiber); } @@ -279,17 +297,25 @@ fn fiberEntry() callconv(.naked) void { pub fn @"async"( userdata: ?*anyopaque, - eager_result: []u8, - context: ?*anyopaque, - start: *const fn (context: ?*anyopaque, result: *anyopaque) void, + result: []u8, + result_alignment: std.mem.Alignment, + context: []const u8, + context_alignment: std.mem.Alignment, + start: *const fn (context: *const anyopaque, result: *anyopaque) void, ) ?*std.Io.AnyFuture { + assert(result_alignment.compare(.lte, max_result_align)); // TODO + assert(context_alignment.compare(.lte, max_result_align)); // TODO + assert(result.len <= max_result_len); // TODO + assert(context.len <= max_result_len); // TODO + const event_loop: *EventLoop = @alignCast(@ptrCast(userdata)); - const fiber = event_loop.allocateFiber(eager_result.len) catch { - start(context, eager_result.ptr); + const fiber = event_loop.allocateFiber() catch { + start(context.ptr, result.ptr); return null; }; fiber.awaiter = null; fiber.queue_node = .{ .data = {} }; + @memcpy(fiber.argsSlice()[0..context.len], context); std.log.debug("allocated {*}", .{fiber}); const closure: *AsyncClosure = @ptrFromInt(std.mem.alignBackward( @@ -299,7 +325,6 @@ pub fn @"async"( )); closure.* = .{ .event_loop = event_loop, - .context = context, .fiber = fiber, .start = start, }; @@ -316,14 +341,13 @@ pub fn @"async"( const AsyncClosure = struct { event_loop: *EventLoop, - context: ?*anyopaque, fiber: *Fiber, - start: *const fn (context: ?*anyopaque, result: *anyopaque) void, + start: *const fn (context: *const anyopaque, result: *anyopaque) void, fn call(closure: *AsyncClosure, message: *const SwitchMessage) callconv(.c) noreturn { message.handle(closure.event_loop); std.log.debug("{*} performing async", .{closure.fiber}); - closure.start(closure.context, closure.fiber.resultSlice().ptr); + closure.start(closure.fiber.argsSlice().ptr, closure.fiber.resultSlice().ptr); const awaiter = @atomicRmw(?*Fiber, &closure.fiber.awaiter, .Xchg, Fiber.finished, .acq_rel); closure.event_loop.yield(awaiter, null); unreachable; // switched to dead fiber