EventLoop: implement main idle fiber
commit 1e79f2c12f
parent 2c1ceb4c9c
@@ -11,19 +11,21 @@ cond: std.Thread.Condition,
queue: std.DoublyLinkedList(void),
free: std.DoublyLinkedList(void),
main_fiber_buffer: [@sizeOf(Fiber) + max_result_len]u8 align(@alignOf(Fiber)),
exiting: bool,
exit_awaiter: ?*Fiber,
idle_count: usize,
threads: std.ArrayListUnmanaged(Thread),

threadlocal var current_thread: *Thread = undefined;
threadlocal var current_fiber: *Fiber = undefined;
threadlocal var current_idle_context: *Context = undefined;
threadlocal var current_fiber_context: *Context = undefined;

const max_result_len = 64;
const min_stack_size = 4 * 1024 * 1024;
const idle_stack_size = 32 * 1024;
const stack_align = 16;

const Thread = struct {
    thread: std.Thread,
    idle_fiber: Fiber,
    idle_context: Context,
};

const Fiber = struct {
@@ -54,6 +56,11 @@ const Fiber = struct {
};

pub fn init(el: *EventLoop, gpa: Allocator) error{OutOfMemory}!void {
    const threads_bytes = ((std.Thread.getCpuCount() catch 1) -| 1) * @sizeOf(Thread);
    const idle_context_offset = std.mem.alignForward(usize, threads_bytes, @alignOf(Context));
    const idle_stack_end_offset = std.mem.alignForward(usize, idle_context_offset + idle_stack_size, std.heap.page_size_max);
    const allocated_slice = try gpa.alignedAlloc(u8, @max(@alignOf(Thread), @alignOf(Context), stack_align), idle_stack_end_offset);
    errdefer gpa.free(allocated_slice);
    el.* = .{
        .gpa = gpa,
        .mutex = .{},
@@ -61,28 +68,37 @@ pub fn init(el: *EventLoop, gpa: Allocator) error{OutOfMemory}!void {
        .queue = .{},
        .free = .{},
        .main_fiber_buffer = undefined,
        .exiting = false,
        .exit_awaiter = null,
        .idle_count = 0,
        .threads = try .initCapacity(gpa, @max(std.Thread.getCpuCount() catch 1, 1)),
        .threads = .initBuffer(@ptrCast(allocated_slice[0..threads_bytes])),
    };
    current_thread = el.threads.addOneAssumeCapacity();
    current_fiber = @ptrCast(&el.main_fiber_buffer);
    const main_idle_context: *Context = @alignCast(std.mem.bytesAsValue(Context, allocated_slice[idle_context_offset..][0..@sizeOf(Context)]));
    const idle_stack_end: [*]align(stack_align) usize = @alignCast(@ptrCast(allocated_slice[idle_stack_end_offset..].ptr));
    (idle_stack_end - 1)[0..1].* = .{@intFromPtr(el)};
    main_idle_context.* = .{
        .rsp = @intFromPtr(idle_stack_end - 1),
        .rbp = 0,
        .rip = @intFromPtr(&mainIdleEntry),
    };
    std.log.debug("created main idle {*}", .{main_idle_context});
    current_idle_context = main_idle_context;
    const current_fiber: *Fiber = @ptrCast(&el.main_fiber_buffer);
    std.log.debug("created main fiber {*}", .{current_fiber});
    current_fiber_context = &current_fiber.context;
}
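init now carves a single allocation into three regions: the Thread array at offset 0, the main idle Context at the next suitably aligned offset, and the main idle fiber's stack, whose end is rounded up to a page boundary so deinit can free the whole block in one call. A standalone sketch of the same offset arithmetic, using stand-in Thread and Context types and the same std.heap.page_size_max constant the change relies on:

const std = @import("std");

// Stand-in types for illustration only; the real Thread and Context are shown in the diff above.
const Thread = struct { dummy: usize };
const Context = struct { rsp: usize, rbp: usize, rip: usize };
const idle_stack_size = 32 * 1024;

pub fn main() void {
    const threads_bytes = ((std.Thread.getCpuCount() catch 1) -| 1) * @sizeOf(Thread);
    const idle_context_offset = std.mem.alignForward(usize, threads_bytes, @alignOf(Context));
    const idle_stack_end_offset = std.mem.alignForward(usize, idle_context_offset + idle_stack_size, std.heap.page_size_max);
    std.debug.print("threads: [0, {d}), idle context at {d}, idle stack end at {d}\n", .{
        threads_bytes, idle_context_offset, idle_stack_end_offset,
    });
}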

pub fn deinit(el: *EventLoop) void {
    {
        el.mutex.lock();
        defer el.mutex.unlock();
        assert(el.queue.len == 0); // pending async
        el.exiting = true;
    }
    el.cond.broadcast();
    assert(el.queue.len == 0); // pending async
    el.yield(null, &el.exit_awaiter);
    while (el.free.pop()) |free_node| {
        const free_fiber: *Fiber = @fieldParentPtr("queue_node", free_node);
        el.gpa.free(free_fiber.allocatedSlice());
    }
    for (el.threads.items[1..]) |*thread| thread.thread.join();
    el.threads.deinit(el.gpa);
    const idle_context_offset = std.mem.alignForward(usize, el.threads.items.len * @sizeOf(Thread), @alignOf(Context));
    const idle_stack_end = std.mem.alignForward(usize, idle_context_offset + idle_stack_size, std.heap.page_size_max);
    const allocated_ptr: [*]align(@max(@alignOf(Thread), @alignOf(Context), stack_align)) u8 = @alignCast(@ptrCast(el.threads.items.ptr));
    for (el.threads.items) |*thread| thread.thread.join();
    el.gpa.free(allocated_ptr[0..idle_stack_end]);
}

fn allocateFiber(el: *EventLoop, result_len: usize) error{OutOfMemory}!*Fiber {
@@ -103,40 +119,44 @@ fn allocateFiber(el: *EventLoop, result_len: usize) error{OutOfMemory}!*Fiber {
}

fn yield(el: *EventLoop, optional_fiber: ?*Fiber, register_awaiter: ?*?*Fiber) void {
    const ready_fiber: *Fiber = optional_fiber orelse if (ready_node: {
        el.mutex.lock();
        defer el.mutex.unlock();
        break :ready_node el.queue.pop();
    }) |ready_node|
        @fieldParentPtr("queue_node", ready_node)
    else
        &current_thread.idle_fiber;
    const ready_context: *Context = ready_context: {
        const ready_fiber: *Fiber = optional_fiber orelse if (ready_node: {
            el.mutex.lock();
            defer el.mutex.unlock();
            break :ready_node el.queue.pop();
        }) |ready_node|
            @fieldParentPtr("queue_node", ready_node)
        else
            break :ready_context current_idle_context;
        break :ready_context &ready_fiber.context;
    };
    const message: SwitchMessage = .{
        .prev_context = &current_fiber.context,
        .ready_context = &ready_fiber.context,
        .prev_context = current_fiber_context,
        .ready_context = ready_context,
        .register_awaiter = register_awaiter,
    };
    std.log.debug("switching from {*} to {*}", .{
        @as(*Fiber, @fieldParentPtr("context", message.prev_context)),
        @as(*Fiber, @fieldParentPtr("context", message.ready_context)),
    });
    std.log.debug("switching from {*} to {*}", .{ message.prev_context, message.ready_context });
    contextSwitch(&message).handle(el);
}

fn schedule(el: *EventLoop, fiber: *Fiber) void {
    signal: {
        el.mutex.lock();
        defer el.mutex.unlock();
        el.queue.append(&fiber.queue_node);
        if (el.idle_count > 0) break :signal;
        if (el.threads.items.len == el.threads.capacity) return;
        const thread = el.threads.addOneAssumeCapacity();
        thread.thread = std.Thread.spawn(.{
            .stack_size = min_stack_size,
            .allocator = el.gpa,
        }, threadEntry, .{ el, thread }) catch return;
    el.mutex.lock();
    el.queue.append(&fiber.queue_node);
    if (el.idle_count > 0) {
        el.mutex.unlock();
        el.cond.signal();
        return;
    }
    el.cond.signal();
    defer el.mutex.unlock();
    if (el.threads.items.len == el.threads.capacity) return;
    const thread = el.threads.addOneAssumeCapacity();
    thread.thread = std.Thread.spawn(.{
        .stack_size = idle_stack_size,
        .allocator = el.gpa,
    }, threadEntry, .{ el, thread }) catch {
        el.threads.items.len -= 1;
        return;
    };
}

fn recycle(el: *EventLoop, fiber: *Fiber) void {
@@ -148,14 +168,28 @@ fn recycle(el: *EventLoop, fiber: *Fiber) void {
    el.free.append(&fiber.queue_node);
}

fn mainIdle(el: *EventLoop, message: *const SwitchMessage) callconv(.c) noreturn {
    message.handle(el);
    el.yield(el.idle(), null);
    unreachable; // switched to dead fiber
}

fn threadEntry(el: *EventLoop, thread: *Thread) void {
    current_thread = thread;
    current_fiber = &thread.idle_fiber;
    std.log.debug("created thread idle {*}", .{&thread.idle_context});
    current_idle_context = &thread.idle_context;
    current_fiber_context = &thread.idle_context;
    _ = el.idle();
}

fn idle(el: *EventLoop) *Fiber {
    while (true) {
        el.yield(null, null);
        if (@atomicLoad(?*Fiber, &el.exit_awaiter, .acquire)) |exit_awaiter| {
            el.cond.broadcast();
            return exit_awaiter;
        }
        el.mutex.lock();
        defer el.mutex.unlock();
        if (el.exiting) return;
        el.idle_count += 1;
        defer el.idle_count -= 1;
        el.cond.wait(&el.mutex);
@@ -169,7 +203,7 @@ const SwitchMessage = extern struct {

    fn handle(message: *const SwitchMessage, el: *EventLoop) void {
        const prev_fiber: *Fiber = @fieldParentPtr("context", message.prev_context);
        current_fiber = @fieldParentPtr("context", message.ready_context);
        current_fiber_context = message.ready_context;
        if (message.register_awaiter) |awaiter| if (@atomicRmw(?*Fiber, awaiter, .Xchg, prev_fiber, .acq_rel) == Fiber.finished) el.schedule(prev_fiber);
    }
};
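The register_awaiter exchange above is the park-or-resume handshake: the fiber being switched away from publishes itself into the awaiter slot, and if that slot already held the Fiber.finished sentinel, the awaited operation completed first, so the fiber goes straight back onto the run queue. A minimal sketch of the handshake with a hypothetical Fiber type and sentinel (the real Fiber.finished is defined outside this diff):

const std = @import("std");

const Fiber = struct {
    queue_node: usize = 0, // placeholder field so *Fiber is a real runtime pointer

    // Hypothetical sentinel; the actual definition is not part of this hunk.
    var finished_storage: Fiber = .{};
    const finished: *Fiber = &finished_storage;
};

/// Returns true when the awaited work already finished, meaning `prev` must be
/// rescheduled immediately instead of staying parked in the awaiter slot.
fn registerAwaiter(awaiter: *?*Fiber, prev: *Fiber) bool {
    return @atomicRmw(?*Fiber, awaiter, .Xchg, prev, .acq_rel) == Fiber.finished;
}

test "registerAwaiter handshake" {
    var prev: Fiber = .{};
    var slot: ?*Fiber = null;
    try std.testing.expect(!registerAwaiter(&slot, &prev)); // nothing finished yet; stay parked
    slot = Fiber.finished;
    try std.testing.expect(registerAwaiter(&slot, &prev)); // completed first; reschedule
}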
@@ -208,6 +242,18 @@ inline fn contextSwitch(message: *const SwitchMessage) *const SwitchMessage {
    };
}

fn mainIdleEntry() callconv(.naked) void {
    switch (builtin.cpu.arch) {
        .x86_64 => asm volatile (
            \\ movq (%%rsp), %%rdi
            \\ jmp %[mainIdle:P]
            :
            : [mainIdle] "X" (&mainIdle),
        ),
        else => |arch| @compileError("unimplemented architecture: " ++ @tagName(arch)),
    }
}
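mainIdleEntry is reached by a context switch rather than a call, so it cannot receive arguments normally. init parks @intFromPtr(el) in the last slot of the main idle stack and points the saved rsp at that slot, so the naked entry only has to load it into rdi, the first C argument register, before jumping to mainIdle. The same hand-off sketched in plain Zig on an ordinary array (illustrative only; the names and values here are made up):

const std = @import("std");

pub fn main() void {
    var fake_stack: [16]usize align(16) = undefined;
    const parked: usize = 0xDEAD_BEEF; // stands in for @intFromPtr(el)

    // "init" side: write the value into the last slot and record that slot's
    // address, the way init seeds the idle stack and sets the Context rsp.
    const stack_base: [*]usize = &fake_stack;
    const stack_end = stack_base + fake_stack.len;
    (stack_end - 1)[0..1].* = .{parked};
    const rsp = @intFromPtr(stack_end - 1);

    // "entry" side: reload the parked value through the saved address, which is
    // all that `movq (%rsp), %rdi` has to do before jumping to mainIdle.
    const recovered: *const usize = @ptrFromInt(rsp);
    std.debug.print("recovered 0x{x}\n", .{recovered.*});
}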

fn fiberEntry() callconv(.naked) void {
    switch (builtin.cpu.arch) {
        .x86_64 => asm volatile (
@@ -238,7 +284,7 @@ pub fn @"async"(
    const closure: *AsyncClosure = @ptrFromInt(std.mem.alignBackward(
        usize,
        @intFromPtr(fiber.stackEndPointer() - @sizeOf(AsyncClosure)),
        @alignOf(AsyncClosure),
        @max(@alignOf(AsyncClosure), stack_align),
    ));
    closure.* = .{
        .event_loop = event_loop,
@@ -246,7 +292,7 @@ pub fn @"async"(
        .fiber = fiber,
        .start = start,
    };
    const stack_end: [*]align(16) usize = @alignCast(@ptrCast(closure));
    const stack_end: [*]align(stack_align) usize = @alignCast(@ptrCast(closure));
    fiber.context = .{
        .rsp = @intFromPtr(stack_end - 1),
        .rbp = 0,
@@ -258,7 +304,6 @@ pub fn @"async"(
}

const AsyncClosure = struct {
    _: void align(16) = {},
    event_loop: *EventLoop,
    context: ?*anyopaque,
    fiber: *Fiber,