diff --git a/lib/std/Io/EventLoop.zig b/lib/std/Io/EventLoop.zig index 1837798296..1fda9cb3d1 100644 --- a/lib/std/Io/EventLoop.zig +++ b/lib/std/Io/EventLoop.zig @@ -7,15 +7,25 @@ const EventLoop = @This(); gpa: Allocator, mutex: std.Thread.Mutex, +cond: std.Thread.Condition, queue: std.DoublyLinkedList(void), free: std.DoublyLinkedList(void), main_fiber_buffer: [@sizeOf(Fiber) + max_result_len]u8 align(@alignOf(Fiber)), +exiting: bool, +idle_count: usize, +threads: std.ArrayListUnmanaged(Thread), +threadlocal var current_thread: *Thread = undefined; threadlocal var current_fiber: *Fiber = undefined; const max_result_len = 64; const min_stack_size = 4 * 1024 * 1024; +const Thread = struct { + thread: std.Thread, + idle_fiber: Fiber, +}; + const Fiber = struct { context: Context, awaiter: ?*Fiber, @@ -23,32 +33,58 @@ const Fiber = struct { const finished: ?*Fiber = @ptrFromInt(std.mem.alignBackward(usize, std.math.maxInt(usize), @alignOf(Fiber))); - fn resultPointer(f: *Fiber) [*]u8 { - const base: [*]u8 = @ptrCast(f); - return base + @sizeOf(Fiber); - } - - fn stackEndPointer(f: *Fiber) [*]u8 { - const base: [*]u8 = @ptrCast(f); - return base + std.mem.alignForward( + fn allocatedSlice(f: *Fiber) []align(@alignOf(Fiber)) u8 { + const base: [*]align(@alignOf(Fiber)) u8 = @ptrCast(f); + return base[0..std.mem.alignForward( usize, @sizeOf(Fiber) + max_result_len + min_stack_size, std.heap.page_size_max, - ); + )]; + } + + fn resultSlice(f: *Fiber) []u8 { + const base: [*]align(@alignOf(Fiber)) u8 = @ptrCast(f); + return base[@sizeOf(Fiber)..][0..max_result_len]; + } + + fn stackEndPointer(f: *Fiber) [*]u8 { + const allocated_slice = f.allocatedSlice(); + return allocated_slice[allocated_slice.len..].ptr; } }; -pub fn init(el: *EventLoop, gpa: Allocator) void { +pub fn init(el: *EventLoop, gpa: Allocator) error{OutOfMemory}!void { el.* = .{ .gpa = gpa, .mutex = .{}, + .cond = .{}, .queue = .{}, .free = .{}, .main_fiber_buffer = undefined, + .exiting = false, + .idle_count = 0, + .threads = try .initCapacity(gpa, @max(std.Thread.getCpuCount() catch 1, 1)), }; + current_thread = el.threads.addOneAssumeCapacity(); current_fiber = @ptrCast(&el.main_fiber_buffer); } +pub fn deinit(el: *EventLoop) void { + { + el.mutex.lock(); + defer el.mutex.unlock(); + assert(el.queue.len == 0); // pending async + el.exiting = true; + } + el.cond.broadcast(); + while (el.free.pop()) |free_node| { + const free_fiber: *Fiber = @fieldParentPtr("queue_node", free_node); + el.gpa.free(free_fiber.allocatedSlice()); + } + for (el.threads.items[1..]) |*thread| thread.thread.join(); + el.threads.deinit(el.gpa); +} + fn allocateFiber(el: *EventLoop, result_len: usize) error{OutOfMemory}!*Fiber { assert(result_len <= max_result_len); const free_node = free_node: { @@ -73,10 +109,8 @@ fn yield(el: *EventLoop, optional_fiber: ?*Fiber, register_awaiter: ?*?*Fiber) v break :ready_node el.queue.pop(); }) |ready_node| @fieldParentPtr("queue_node", ready_node) - else if (register_awaiter) |_| // time to switch to an idle fiber? - @panic("no other fiber to switch to in order to be able to register this fiber as an awaiter") - else // nothing to do - return; + else + ¤t_thread.idle_fiber; const message: SwitchMessage = .{ .prev_context = ¤t_fiber.context, .ready_context = &ready_fiber.context, @@ -90,20 +124,44 @@ fn yield(el: *EventLoop, optional_fiber: ?*Fiber, register_awaiter: ?*?*Fiber) v } fn schedule(el: *EventLoop, fiber: *Fiber) void { - el.mutex.lock(); - defer el.mutex.unlock(); - el.queue.append(&fiber.queue_node); + signal: { + el.mutex.lock(); + defer el.mutex.unlock(); + el.queue.append(&fiber.queue_node); + if (el.idle_count > 0) break :signal; + if (el.threads.items.len == el.threads.capacity) return; + const thread = el.threads.addOneAssumeCapacity(); + thread.thread = std.Thread.spawn(.{ + .stack_size = min_stack_size, + .allocator = el.gpa, + }, threadEntry, .{ el, thread }) catch return; + } + el.cond.signal(); } fn recycle(el: *EventLoop, fiber: *Fiber) void { std.log.debug("recyling {*}", .{fiber}); fiber.awaiter = undefined; - @memset(fiber.resultPointer()[0..max_result_len], undefined); + @memset(fiber.resultSlice(), undefined); el.mutex.lock(); defer el.mutex.unlock(); el.free.append(&fiber.queue_node); } +fn threadEntry(el: *EventLoop, thread: *Thread) void { + current_thread = thread; + current_fiber = &thread.idle_fiber; + while (true) { + el.yield(null, null); + el.mutex.lock(); + defer el.mutex.unlock(); + if (el.exiting) return; + el.idle_count += 1; + defer el.idle_count -= 1; + el.cond.wait(&el.mutex); + } +} + const SwitchMessage = extern struct { prev_context: *Context, ready_context: *Context, @@ -209,7 +267,7 @@ const AsyncClosure = struct { fn call(closure: *AsyncClosure, message: *const SwitchMessage) callconv(.c) noreturn { message.handle(closure.event_loop); std.log.debug("{*} performing async", .{closure.fiber}); - closure.start(closure.context, closure.fiber.resultPointer()); + closure.start(closure.context, closure.fiber.resultSlice().ptr); const awaiter = @atomicRmw(?*Fiber, &closure.fiber.awaiter, .Xchg, Fiber.finished, .acq_rel); closure.event_loop.yield(awaiter, null); unreachable; // switched to dead fiber @@ -219,7 +277,7 @@ const AsyncClosure = struct { pub fn @"await"(userdata: ?*anyopaque, any_future: *std.Io.AnyFuture, result: []u8) void { const event_loop: *EventLoop = @alignCast(@ptrCast(userdata)); const future_fiber: *Fiber = @alignCast(@ptrCast(any_future)); - const result_src = future_fiber.resultPointer()[0..result.len]; + const result_src = future_fiber.resultSlice()[0..result.len]; if (@atomicLoad(?*Fiber, &future_fiber.awaiter, .acquire) != Fiber.finished) event_loop.yield(null, &future_fiber.awaiter); @memcpy(result, result_src); event_loop.recycle(future_fiber);