Mirror of https://github.com/ziglang/zig.git, synced 2025-12-06 06:13:07 +00:00.
Commit: Mostly picking the same paths as FreeBSD. We need a little special handling for crt files, as NetBSD uses its own (rather than GCC's), with slightly different names.
913 lines · 38 KiB · Zig
const std = @import("../index.zig");
const builtin = @import("builtin");
const assert = std.debug.assert;
const testing = std.testing;
const mem = std.mem;
const AtomicRmwOp = builtin.AtomicRmwOp;
const AtomicOrder = builtin.AtomicOrder;
const fs = std.event.fs;
const os = std.os;
const posix = os.posix;
const windows = os.windows;
const maxInt = std.math.maxInt;

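/// A multiplexing event loop: suspended coroutines are queued and resumed on
/// a small pool of kernel threads, using the OS readiness mechanism (epoll,
/// kqueue, or an I/O completion port) to wake idle workers. Minimal usage,
/// as exercised by the tests at the bottom of this file:
///
///     var loop: Loop = undefined;
///     try loop.initMultiThreaded(allocator);
///     defer loop.deinit();
///     loop.run();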
pub const Loop = struct {
    allocator: *mem.Allocator,
    next_tick_queue: std.atomic.Queue(promise),
    os_data: OsData,
    final_resume_node: ResumeNode,
    pending_event_count: usize,
    extra_threads: []*os.Thread,

    // pre-allocated eventfds. all permanently active.
    // this is how we send promises to be resumed on other threads.
    available_eventfd_resume_nodes: std.atomic.Stack(ResumeNode.EventFd),
    eventfd_resume_nodes: []std.atomic.Stack(ResumeNode.EventFd).Node,

    pub const NextTickNode = std.atomic.Queue(promise).Node;

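    /// Ties a suspended coroutine (`handle`) to the OS-specific completion
    /// object that will resume it: an epoll data pointer on Linux, a kevent
    /// udata field on the kqueue platforms, or an OVERLAPPED pointer on
    /// Windows.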
    pub const ResumeNode = struct {
        id: Id,
        handle: promise,
        overlapped: Overlapped,

        pub const overlapped_init = switch (builtin.os) {
            builtin.Os.windows => windows.OVERLAPPED{
                .Internal = 0,
                .InternalHigh = 0,
                .Offset = 0,
                .OffsetHigh = 0,
                .hEvent = null,
            },
            else => {},
        };
        pub const Overlapped = @typeOf(overlapped_init);

        pub const Id = enum {
            Basic,
            Stop,
            EventFd,
        };

        pub const EventFd = switch (builtin.os) {
            builtin.Os.macosx, builtin.Os.freebsd, builtin.Os.netbsd => KEventFd,
            builtin.Os.linux => struct {
                base: ResumeNode,
                epoll_op: u32,
                eventfd: i32,
            },
            builtin.Os.windows => struct {
                base: ResumeNode,
                completion_key: usize,
            },
            else => @compileError("unsupported OS"),
        };

        const KEventFd = struct {
            base: ResumeNode,
            kevent: posix.Kevent,
        };

        pub const Basic = switch (builtin.os) {
            builtin.Os.macosx, builtin.Os.freebsd, builtin.Os.netbsd => KEventBasic,
            builtin.Os.linux => struct {
                base: ResumeNode,
            },
            builtin.Os.windows => struct {
                base: ResumeNode,
            },
            else => @compileError("unsupported OS"),
        };

        const KEventBasic = struct {
            base: ResumeNode,
            kev: posix.Kevent,
        };
    };

    /// After initialization, call run().
    /// TODO copy elision / named return values so that the threads referencing *Loop
    /// have the correct pointer value.
    pub fn initSingleThreaded(self: *Loop, allocator: *mem.Allocator) !void {
        return self.initInternal(allocator, 1);
    }

    /// The allocator must be thread-safe because we use it for multiplexing
    /// coroutines onto kernel threads.
    /// After initialization, call run().
    /// TODO copy elision / named return values so that the threads referencing *Loop
    /// have the correct pointer value.
    pub fn initMultiThreaded(self: *Loop, allocator: *mem.Allocator) !void {
        if (builtin.single_threaded) @compileError("initMultiThreaded unavailable when building in single-threaded mode");
        const core_count = try os.cpuCount(allocator);
        return self.initInternal(allocator, core_count);
    }

    /// Thread count is the total thread count. The thread pool size will be
    /// max(thread_count - 1, 0)
    fn initInternal(self: *Loop, allocator: *mem.Allocator, thread_count: usize) !void {
        self.* = Loop{
            .pending_event_count = 1,
            .allocator = allocator,
            .os_data = undefined,
            .next_tick_queue = std.atomic.Queue(promise).init(),
            .extra_threads = undefined,
            .available_eventfd_resume_nodes = std.atomic.Stack(ResumeNode.EventFd).init(),
            .eventfd_resume_nodes = undefined,
            .final_resume_node = ResumeNode{
                .id = ResumeNode.Id.Stop,
                .handle = undefined,
                .overlapped = ResumeNode.overlapped_init,
            },
        };
        const extra_thread_count = thread_count - 1;
        self.eventfd_resume_nodes = try self.allocator.alloc(
            std.atomic.Stack(ResumeNode.EventFd).Node,
            extra_thread_count,
        );
        errdefer self.allocator.free(self.eventfd_resume_nodes);

        self.extra_threads = try self.allocator.alloc(*os.Thread, extra_thread_count);
        errdefer self.allocator.free(self.extra_threads);

        try self.initOsData(extra_thread_count);
        errdefer self.deinitOsData();
    }

    pub fn deinit(self: *Loop) void {
        self.deinitOsData();
        self.allocator.free(self.extra_threads);
    }

    const InitOsDataError = os.LinuxEpollCreateError || mem.Allocator.Error || os.LinuxEventFdError ||
        os.SpawnThreadError || os.LinuxEpollCtlError || os.BsdKEventError ||
        os.WindowsCreateIoCompletionPortError;

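    // Eight nonzero bytes: writing them to an eventfd adds the value to its
    // u64 counter, which is enough to make it readable and wake an epoll wait.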
    const wakeup_bytes = []u8{0x1} ** 8;

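    // Sets up the per-OS readiness machinery: epoll + eventfds on Linux,
    // kqueue + EVFILT_USER user events on macOS/FreeBSD/NetBSD, and an I/O
    // completion port on Windows.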
    fn initOsData(self: *Loop, extra_thread_count: usize) InitOsDataError!void {
        switch (builtin.os) {
            builtin.Os.linux => {
                self.os_data.fs_queue = std.atomic.Queue(fs.Request).init();
                self.os_data.fs_queue_item = 0;
                // we need another thread for the file system because Linux does not have an async
                // file system I/O API.
                self.os_data.fs_end_request = fs.RequestNode{
                    .prev = undefined,
                    .next = undefined,
                    .data = fs.Request{
                        .msg = fs.Request.Msg.End,
                        .finish = fs.Request.Finish.NoAction,
                    },
                };

                errdefer {
                    while (self.available_eventfd_resume_nodes.pop()) |node| os.close(node.data.eventfd);
                }
                for (self.eventfd_resume_nodes) |*eventfd_node| {
                    eventfd_node.* = std.atomic.Stack(ResumeNode.EventFd).Node{
                        .data = ResumeNode.EventFd{
                            .base = ResumeNode{
                                .id = ResumeNode.Id.EventFd,
                                .handle = undefined,
                                .overlapped = ResumeNode.overlapped_init,
                            },
                            .eventfd = try os.linuxEventFd(1, posix.EFD_CLOEXEC | posix.EFD_NONBLOCK),
                            .epoll_op = posix.EPOLL_CTL_ADD,
                        },
                        .next = undefined,
                    };
                    self.available_eventfd_resume_nodes.push(eventfd_node);
                }

                self.os_data.epollfd = try os.linuxEpollCreate(posix.EPOLL_CLOEXEC);
                errdefer os.close(self.os_data.epollfd);

                self.os_data.final_eventfd = try os.linuxEventFd(0, posix.EFD_CLOEXEC | posix.EFD_NONBLOCK);
                errdefer os.close(self.os_data.final_eventfd);

                self.os_data.final_eventfd_event = posix.epoll_event{
                    .events = posix.EPOLLIN,
                    .data = posix.epoll_data{ .ptr = @ptrToInt(&self.final_resume_node) },
                };
                try os.linuxEpollCtl(
                    self.os_data.epollfd,
                    posix.EPOLL_CTL_ADD,
                    self.os_data.final_eventfd,
                    &self.os_data.final_eventfd_event,
                );

                self.os_data.fs_thread = try os.spawnThread(self, posixFsRun);
                errdefer {
                    self.posixFsRequest(&self.os_data.fs_end_request);
                    self.os_data.fs_thread.wait();
                }

                if (builtin.single_threaded) {
                    assert(extra_thread_count == 0);
                    return;
                }

                var extra_thread_index: usize = 0;
                errdefer {
                    // writing 8 bytes to an eventfd cannot fail
                    os.posixWrite(self.os_data.final_eventfd, wakeup_bytes) catch unreachable;
                    while (extra_thread_index != 0) {
                        extra_thread_index -= 1;
                        self.extra_threads[extra_thread_index].wait();
                    }
                }
                while (extra_thread_index < extra_thread_count) : (extra_thread_index += 1) {
                    self.extra_threads[extra_thread_index] = try os.spawnThread(self, workerRun);
                }
            },
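            // On the kqueue platforms, each wakeup node is a user-triggered
            // kevent: it is registered once with EV_ADD | EV_DISABLE, and
            // dispatch() later re-submits it with EV_ENABLE | NOTE_TRIGGER to
            // fire it.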
            builtin.Os.macosx, builtin.Os.freebsd, builtin.Os.netbsd => {
                self.os_data.kqfd = try os.bsdKQueue();
                errdefer os.close(self.os_data.kqfd);

                self.os_data.fs_kqfd = try os.bsdKQueue();
                errdefer os.close(self.os_data.fs_kqfd);

                self.os_data.fs_queue = std.atomic.Queue(fs.Request).init();
                // we need another thread for the file system because Darwin does not have an async
                // file system I/O API.
                self.os_data.fs_end_request = fs.RequestNode{
                    .prev = undefined,
                    .next = undefined,
                    .data = fs.Request{
                        .msg = fs.Request.Msg.End,
                        .finish = fs.Request.Finish.NoAction,
                    },
                };

                const empty_kevs = ([*]posix.Kevent)(undefined)[0..0];

                for (self.eventfd_resume_nodes) |*eventfd_node, i| {
                    eventfd_node.* = std.atomic.Stack(ResumeNode.EventFd).Node{
                        .data = ResumeNode.EventFd{
                            .base = ResumeNode{
                                .id = ResumeNode.Id.EventFd,
                                .handle = undefined,
                                .overlapped = ResumeNode.overlapped_init,
                            },
                            // this one is for sending events
                            .kevent = posix.Kevent{
                                .ident = i,
                                .filter = posix.EVFILT_USER,
                                .flags = posix.EV_CLEAR | posix.EV_ADD | posix.EV_DISABLE,
                                .fflags = 0,
                                .data = 0,
                                .udata = @ptrToInt(&eventfd_node.data.base),
                            },
                        },
                        .next = undefined,
                    };
                    self.available_eventfd_resume_nodes.push(eventfd_node);
                    const kevent_array = (*[1]posix.Kevent)(&eventfd_node.data.kevent);
                    _ = try os.bsdKEvent(self.os_data.kqfd, kevent_array, empty_kevs, null);
                    eventfd_node.data.kevent.flags = posix.EV_CLEAR | posix.EV_ENABLE;
                    eventfd_node.data.kevent.fflags = posix.NOTE_TRIGGER;
                }

                // Pre-add so that we cannot get error.SystemResources
                // later when we try to activate it.
                self.os_data.final_kevent = posix.Kevent{
                    .ident = extra_thread_count,
                    .filter = posix.EVFILT_USER,
                    .flags = posix.EV_ADD | posix.EV_DISABLE,
                    .fflags = 0,
                    .data = 0,
                    .udata = @ptrToInt(&self.final_resume_node),
                };
                const final_kev_arr = (*[1]posix.Kevent)(&self.os_data.final_kevent);
                _ = try os.bsdKEvent(self.os_data.kqfd, final_kev_arr, empty_kevs, null);
                self.os_data.final_kevent.flags = posix.EV_ENABLE;
                self.os_data.final_kevent.fflags = posix.NOTE_TRIGGER;

                self.os_data.fs_kevent_wake = posix.Kevent{
                    .ident = 0,
                    .filter = posix.EVFILT_USER,
                    .flags = posix.EV_ADD | posix.EV_ENABLE,
                    .fflags = posix.NOTE_TRIGGER,
                    .data = 0,
                    .udata = undefined,
                };

                self.os_data.fs_kevent_wait = posix.Kevent{
                    .ident = 0,
                    .filter = posix.EVFILT_USER,
                    .flags = posix.EV_ADD | posix.EV_CLEAR,
                    .fflags = 0,
                    .data = 0,
                    .udata = undefined,
                };

                self.os_data.fs_thread = try os.spawnThread(self, posixFsRun);
                errdefer {
                    self.posixFsRequest(&self.os_data.fs_end_request);
                    self.os_data.fs_thread.wait();
                }

                if (builtin.single_threaded) {
                    assert(extra_thread_count == 0);
                    return;
                }

                var extra_thread_index: usize = 0;
                errdefer {
                    _ = os.bsdKEvent(self.os_data.kqfd, final_kev_arr, empty_kevs, null) catch unreachable;
                    while (extra_thread_index != 0) {
                        extra_thread_index -= 1;
                        self.extra_threads[extra_thread_index].wait();
                    }
                }
                while (extra_thread_index < extra_thread_count) : (extra_thread_index += 1) {
                    self.extra_threads[extra_thread_index] = try os.spawnThread(self, workerRun);
                }
            },
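            // On Windows there is no per-node registration step: wakeups are
            // completion packets posted to the I/O completion port, and the
            // OVERLAPPED pointer identifies the resume node.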
            builtin.Os.windows => {
                self.os_data.io_port = try os.windowsCreateIoCompletionPort(
                    windows.INVALID_HANDLE_VALUE,
                    null,
                    undefined,
                    maxInt(windows.DWORD),
                );
                errdefer os.close(self.os_data.io_port);

                for (self.eventfd_resume_nodes) |*eventfd_node, i| {
                    eventfd_node.* = std.atomic.Stack(ResumeNode.EventFd).Node{
                        .data = ResumeNode.EventFd{
                            .base = ResumeNode{
                                .id = ResumeNode.Id.EventFd,
                                .handle = undefined,
                                .overlapped = ResumeNode.overlapped_init,
                            },
                            // this one is for sending events
                            .completion_key = @ptrToInt(&eventfd_node.data.base),
                        },
                        .next = undefined,
                    };
                    self.available_eventfd_resume_nodes.push(eventfd_node);
                }

                if (builtin.single_threaded) {
                    assert(extra_thread_count == 0);
                    return;
                }

                var extra_thread_index: usize = 0;
                errdefer {
                    var i: usize = 0;
                    while (i < extra_thread_index) : (i += 1) {
                        while (true) {
                            const overlapped = &self.final_resume_node.overlapped;
                            os.windowsPostQueuedCompletionStatus(self.os_data.io_port, undefined, undefined, overlapped) catch continue;
                            break;
                        }
                    }
                    while (extra_thread_index != 0) {
                        extra_thread_index -= 1;
                        self.extra_threads[extra_thread_index].wait();
                    }
                }
                while (extra_thread_index < extra_thread_count) : (extra_thread_index += 1) {
                    self.extra_threads[extra_thread_index] = try os.spawnThread(self, workerRun);
                }
            },
            else => {},
        }
    }

    fn deinitOsData(self: *Loop) void {
        switch (builtin.os) {
            builtin.Os.linux => {
                os.close(self.os_data.final_eventfd);
                while (self.available_eventfd_resume_nodes.pop()) |node| os.close(node.data.eventfd);
                os.close(self.os_data.epollfd);
                self.allocator.free(self.eventfd_resume_nodes);
            },
            builtin.Os.macosx, builtin.Os.freebsd, builtin.Os.netbsd => {
                os.close(self.os_data.kqfd);
                os.close(self.os_data.fs_kqfd);
            },
            builtin.Os.windows => {
                os.close(self.os_data.io_port);
            },
            else => {},
        }
    }

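    // Linux-only fd readiness helpers built on epoll. Registration must be
    // edge-triggered (EPOLLET is asserted below) so that each readiness
    // change produces a single wakeup for the suspended coroutine.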
    /// resume_node must live longer than the promise that it holds a reference to.
    /// flags must contain EPOLLET
    pub fn linuxAddFd(self: *Loop, fd: i32, resume_node: *ResumeNode, flags: u32) !void {
        assert(flags & posix.EPOLLET == posix.EPOLLET);
        self.beginOneEvent();
        errdefer self.finishOneEvent();
        try self.linuxModFd(
            fd,
            posix.EPOLL_CTL_ADD,
            flags,
            resume_node,
        );
    }

    pub fn linuxModFd(self: *Loop, fd: i32, op: u32, flags: u32, resume_node: *ResumeNode) !void {
        assert(flags & posix.EPOLLET == posix.EPOLLET);
        var ev = os.linux.epoll_event{
            .events = flags,
            .data = os.linux.epoll_data{ .ptr = @ptrToInt(resume_node) },
        };
        try os.linuxEpollCtl(self.os_data.epollfd, op, fd, &ev);
    }

    pub fn linuxRemoveFd(self: *Loop, fd: i32) void {
        os.linuxEpollCtl(self.os_data.epollfd, os.linux.EPOLL_CTL_DEL, fd, undefined) catch {};
        self.finishOneEvent();
    }

    pub async fn linuxWaitFd(self: *Loop, fd: i32, flags: u32) !void {
        defer self.linuxRemoveFd(fd);
        suspend {
            // TODO explicitly put this memory in the coroutine frame #1194
            var resume_node = ResumeNode.Basic{
                .base = ResumeNode{
                    .id = ResumeNode.Id.Basic,
                    .handle = @handle(),
                    .overlapped = ResumeNode.overlapped_init,
                },
            };
            try self.linuxAddFd(fd, &resume_node.base, flags);
        }
    }

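    // The kqueue analogue of linuxWaitFd: registers a kevent for (ident,
    // filter), suspends until a worker thread sees it fire, and returns the
    // kevent that was delivered. The immediate suspend/resume below relates
    // to the coroutine frame memory issue tracked in #1194.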
    pub async fn bsdWaitKev(self: *Loop, ident: usize, filter: i16, fflags: u32) !posix.Kevent {
        // TODO #1194
        suspend {
            resume @handle();
        }
        var resume_node = ResumeNode.Basic{
            .base = ResumeNode{
                .id = ResumeNode.Id.Basic,
                .handle = @handle(),
                .overlapped = ResumeNode.overlapped_init,
            },
            .kev = undefined,
        };
        defer self.bsdRemoveKev(ident, filter);
        suspend {
            try self.bsdAddKev(&resume_node, ident, filter, fflags);
        }
        return resume_node.kev;
    }

    /// resume_node must live longer than the promise that it holds a reference to.
    pub fn bsdAddKev(self: *Loop, resume_node: *ResumeNode.Basic, ident: usize, filter: i16, fflags: u32) !void {
        self.beginOneEvent();
        errdefer self.finishOneEvent();
        var kev = posix.Kevent{
            .ident = ident,
            .filter = filter,
            .flags = posix.EV_ADD | posix.EV_ENABLE | posix.EV_CLEAR,
            .fflags = fflags,
            .data = 0,
            .udata = @ptrToInt(&resume_node.base),
        };
        const kevent_array = (*[1]posix.Kevent)(&kev);
        const empty_kevs = ([*]posix.Kevent)(undefined)[0..0];
        _ = try os.bsdKEvent(self.os_data.kqfd, kevent_array, empty_kevs, null);
    }

    pub fn bsdRemoveKev(self: *Loop, ident: usize, filter: i16) void {
        var kev = posix.Kevent{
            .ident = ident,
            .filter = filter,
            .flags = posix.EV_DELETE,
            .fflags = 0,
            .data = 0,
            .udata = 0,
        };
        const kevent_array = (*[1]posix.Kevent)(&kev);
        const empty_kevs = ([*]posix.Kevent)(undefined)[0..0];
        _ = os.bsdKEvent(self.os_data.kqfd, kevent_array, empty_kevs, null) catch undefined;
        self.finishOneEvent();
    }

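    // Pairs queued promises with available wakeup nodes and pokes the OS so
    // that an idle worker thread resumes them. If waking the OS fails, the
    // promise is put back on the queue and the node returned to the free
    // stack, to be retried on a later dispatch.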
    fn dispatch(self: *Loop) void {
        while (self.available_eventfd_resume_nodes.pop()) |resume_stack_node| {
            const next_tick_node = self.next_tick_queue.get() orelse {
                self.available_eventfd_resume_nodes.push(resume_stack_node);
                return;
            };
            const eventfd_node = &resume_stack_node.data;
            eventfd_node.base.handle = next_tick_node.data;
            switch (builtin.os) {
                builtin.Os.macosx, builtin.Os.freebsd, builtin.Os.netbsd => {
                    const kevent_array = (*[1]posix.Kevent)(&eventfd_node.kevent);
                    const empty_kevs = ([*]posix.Kevent)(undefined)[0..0];
                    _ = os.bsdKEvent(self.os_data.kqfd, kevent_array, empty_kevs, null) catch {
                        self.next_tick_queue.unget(next_tick_node);
                        self.available_eventfd_resume_nodes.push(resume_stack_node);
                        return;
                    };
                },
                builtin.Os.linux => {
                    // the pending count is already accounted for
                    const epoll_events = posix.EPOLLONESHOT | os.linux.EPOLLIN | os.linux.EPOLLOUT |
                        os.linux.EPOLLET;
                    self.linuxModFd(
                        eventfd_node.eventfd,
                        eventfd_node.epoll_op,
                        epoll_events,
                        &eventfd_node.base,
                    ) catch {
                        self.next_tick_queue.unget(next_tick_node);
                        self.available_eventfd_resume_nodes.push(resume_stack_node);
                        return;
                    };
                },
                builtin.Os.windows => {
                    os.windowsPostQueuedCompletionStatus(
                        self.os_data.io_port,
                        undefined,
                        undefined,
                        &eventfd_node.base.overlapped,
                    ) catch {
                        self.next_tick_queue.unget(next_tick_node);
                        self.available_eventfd_resume_nodes.push(resume_stack_node);
                        return;
                    };
                },
                else => @compileError("unsupported OS"),
            }
        }
    }

    /// Bring your own linked list node. This means it can't fail.
    pub fn onNextTick(self: *Loop, node: *NextTickNode) void {
        self.beginOneEvent(); // finished in dispatch()
        self.next_tick_queue.put(node);
        self.dispatch();
    }

    pub fn cancelOnNextTick(self: *Loop, node: *NextTickNode) void {
        if (self.next_tick_queue.remove(node)) {
            self.finishOneEvent();
        }
    }

    pub fn run(self: *Loop) void {
        self.finishOneEvent(); // the reference we start with

        self.workerRun();

        switch (builtin.os) {
            builtin.Os.linux,
            builtin.Os.macosx,
            builtin.Os.freebsd,
            builtin.Os.netbsd,
            => self.os_data.fs_thread.wait(),
            else => {},
        }

        for (self.extra_threads) |extra_thread| {
            extra_thread.wait();
        }
    }

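    // Example, mirroring the "std.event.Loop - call" test at the bottom of
    // this file:
    //
    //     const handle = try loop.call(testEventLoop);
    //     const handle2 = try loop.call(testEventLoop2, handle, &did_it);
    //     defer cancel handle2;
    //     loop.run();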
    /// This is equivalent to an async call, except instead of beginning execution of the async function,
    /// it immediately returns to the caller, and the async function is queued in the event loop. It still
    /// returns a promise to be awaited.
    pub fn call(self: *Loop, comptime func: var, args: ...) !(promise->@typeOf(func).ReturnType) {
        const S = struct {
            async fn asyncFunc(loop: *Loop, handle: *promise->@typeOf(func).ReturnType, args2: ...) @typeOf(func).ReturnType {
                suspend {
                    handle.* = @handle();
                    var my_tick_node = Loop.NextTickNode{
                        .prev = undefined,
                        .next = undefined,
                        .data = @handle(),
                    };
                    loop.onNextTick(&my_tick_node);
                }
                // TODO guaranteed allocation elision for await in same func as async
                return await (async func(args2) catch unreachable);
            }
        };
        var handle: promise->@typeOf(func).ReturnType = undefined;
        return async<self.allocator> S.asyncFunc(self, &handle, args);
    }

    /// Awaiting a yield lets the event loop run, starting any unstarted async operations.
    /// Note that async operations automatically start when a function yields for any other reason,
    /// for example, when async I/O is performed. This function is intended to be used only when
    /// CPU bound tasks would be waiting in the event loop but never get started because no async I/O
    /// is performed.
    pub async fn yield(self: *Loop) void {
        suspend {
            var my_tick_node = Loop.NextTickNode{
                .prev = undefined,
                .next = undefined,
                .data = @handle(),
            };
            self.onNextTick(&my_tick_node);
        }
    }

    /// call finishOneEvent when done
    pub fn beginOneEvent(self: *Loop) void {
        _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);
    }

    pub fn finishOneEvent(self: *Loop) void {
        const prev = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
        if (prev == 1) {
            // cause all the threads to stop
            switch (builtin.os) {
                builtin.Os.linux => {
                    self.posixFsRequest(&self.os_data.fs_end_request);
                    // writing 8 bytes to an eventfd cannot fail
                    os.posixWrite(self.os_data.final_eventfd, wakeup_bytes) catch unreachable;
                    return;
                },
                builtin.Os.macosx, builtin.Os.freebsd, builtin.Os.netbsd => {
                    self.posixFsRequest(&self.os_data.fs_end_request);
                    const final_kevent = (*[1]posix.Kevent)(&self.os_data.final_kevent);
                    const empty_kevs = ([*]posix.Kevent)(undefined)[0..0];
                    // cannot fail because we already added it and this just enables it
                    _ = os.bsdKEvent(self.os_data.kqfd, final_kevent, empty_kevs, null) catch unreachable;
                    return;
                },
                builtin.Os.windows => {
                    var i: usize = 0;
                    while (i < self.extra_threads.len + 1) : (i += 1) {
                        while (true) {
                            const overlapped = &self.final_resume_node.overlapped;
                            os.windowsPostQueuedCompletionStatus(self.os_data.io_port, undefined, undefined, overlapped) catch continue;
                            break;
                        }
                    }
                    return;
                },
                else => @compileError("unsupported OS"),
            }
        }
    }

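    // Each worker thread loops here: it drains the next-tick queue, then
    // blocks on the OS for one event at a time. A Stop resume node, triggered
    // by finishOneEvent() when pending_event_count reaches zero, makes the
    // worker return.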
    fn workerRun(self: *Loop) void {
        while (true) {
            while (true) {
                const next_tick_node = self.next_tick_queue.get() orelse break;
                self.dispatch();
                resume next_tick_node.data;
                self.finishOneEvent();
            }

            switch (builtin.os) {
                builtin.Os.linux => {
                    // only process 1 event so we don't steal from other threads
                    var events: [1]os.linux.epoll_event = undefined;
                    const count = os.linuxEpollWait(self.os_data.epollfd, events[0..], -1);
                    for (events[0..count]) |ev| {
                        const resume_node = @intToPtr(*ResumeNode, ev.data.ptr);
                        const handle = resume_node.handle;
                        const resume_node_id = resume_node.id;
                        switch (resume_node_id) {
                            ResumeNode.Id.Basic => {},
                            ResumeNode.Id.Stop => return,
                            ResumeNode.Id.EventFd => {
                                const event_fd_node = @fieldParentPtr(ResumeNode.EventFd, "base", resume_node);
                                event_fd_node.epoll_op = posix.EPOLL_CTL_MOD;
                                const stack_node = @fieldParentPtr(std.atomic.Stack(ResumeNode.EventFd).Node, "data", event_fd_node);
                                self.available_eventfd_resume_nodes.push(stack_node);
                            },
                        }
                        resume handle;
                        if (resume_node_id == ResumeNode.Id.EventFd) {
                            self.finishOneEvent();
                        }
                    }
                },
                builtin.Os.macosx, builtin.Os.freebsd, builtin.Os.netbsd => {
                    var eventlist: [1]posix.Kevent = undefined;
                    const empty_kevs = ([*]posix.Kevent)(undefined)[0..0];
                    const count = os.bsdKEvent(self.os_data.kqfd, empty_kevs, eventlist[0..], null) catch unreachable;
                    for (eventlist[0..count]) |ev| {
                        const resume_node = @intToPtr(*ResumeNode, ev.udata);
                        const handle = resume_node.handle;
                        const resume_node_id = resume_node.id;
                        switch (resume_node_id) {
                            ResumeNode.Id.Basic => {
                                const basic_node = @fieldParentPtr(ResumeNode.Basic, "base", resume_node);
                                basic_node.kev = ev;
                            },
                            ResumeNode.Id.Stop => return,
                            ResumeNode.Id.EventFd => {
                                const event_fd_node = @fieldParentPtr(ResumeNode.EventFd, "base", resume_node);
                                const stack_node = @fieldParentPtr(std.atomic.Stack(ResumeNode.EventFd).Node, "data", event_fd_node);
                                self.available_eventfd_resume_nodes.push(stack_node);
                            },
                        }
                        resume handle;
                        if (resume_node_id == ResumeNode.Id.EventFd) {
                            self.finishOneEvent();
                        }
                    }
                },
                builtin.Os.windows => {
                    var completion_key: usize = undefined;
                    const overlapped = while (true) {
                        var nbytes: windows.DWORD = undefined;
                        var overlapped: ?*windows.OVERLAPPED = undefined;
                        switch (os.windowsGetQueuedCompletionStatus(self.os_data.io_port, &nbytes, &completion_key, &overlapped, windows.INFINITE)) {
                            os.WindowsWaitResult.Aborted => return,
                            os.WindowsWaitResult.Normal => {},
                            os.WindowsWaitResult.EOF => {},
                            os.WindowsWaitResult.Cancelled => continue,
                        }
                        if (overlapped) |o| break o;
                    } else unreachable; // TODO else unreachable should not be necessary
                    const resume_node = @fieldParentPtr(ResumeNode, "overlapped", overlapped);
                    const handle = resume_node.handle;
                    const resume_node_id = resume_node.id;
                    switch (resume_node_id) {
                        ResumeNode.Id.Basic => {},
                        ResumeNode.Id.Stop => return,
                        ResumeNode.Id.EventFd => {
                            const event_fd_node = @fieldParentPtr(ResumeNode.EventFd, "base", resume_node);
                            const stack_node = @fieldParentPtr(std.atomic.Stack(ResumeNode.EventFd).Node, "data", event_fd_node);
                            self.available_eventfd_resume_nodes.push(stack_node);
                        },
                    }
                    resume handle;
                    self.finishOneEvent();
                },
                else => @compileError("unsupported OS"),
            }
        }
    }

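    // File system requests are serviced on the dedicated fs thread running
    // posixFsRun, since neither Linux nor Darwin exposes the async file
    // system API this loop needs. posixFsRequest enqueues a request and then
    // wakes that thread: via futex on Linux, via a user kevent on the kqueue
    // platforms.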
    fn posixFsRequest(self: *Loop, request_node: *fs.RequestNode) void {
        self.beginOneEvent(); // finished in posixFsRun after processing the msg
        self.os_data.fs_queue.put(request_node);
        switch (builtin.os) {
            builtin.Os.macosx, builtin.Os.freebsd, builtin.Os.netbsd => {
                const fs_kevs = (*[1]posix.Kevent)(&self.os_data.fs_kevent_wake);
                const empty_kevs = ([*]posix.Kevent)(undefined)[0..0];
                _ = os.bsdKEvent(self.os_data.fs_kqfd, fs_kevs, empty_kevs, null) catch unreachable;
            },
            builtin.Os.linux => {
                _ = @atomicRmw(i32, &self.os_data.fs_queue_item, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst);
                const rc = os.linux.futex_wake(&self.os_data.fs_queue_item, os.linux.FUTEX_WAKE, 1);
                switch (os.linux.getErrno(rc)) {
                    0 => {},
                    posix.EINVAL => unreachable,
                    else => unreachable,
                }
            },
            else => @compileError("Unsupported OS"),
        }
    }

    fn posixFsCancel(self: *Loop, request_node: *fs.RequestNode) void {
        if (self.os_data.fs_queue.remove(request_node)) {
            self.finishOneEvent();
        }
    }

    fn posixFsRun(self: *Loop) void {
        while (true) {
            if (builtin.os == builtin.Os.linux) {
                _ = @atomicRmw(i32, &self.os_data.fs_queue_item, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
            }
            while (self.os_data.fs_queue.get()) |node| {
                switch (node.data.msg) {
                    @TagType(fs.Request.Msg).End => return,
                    @TagType(fs.Request.Msg).PWriteV => |*msg| {
                        msg.result = os.posix_pwritev(msg.fd, msg.iov.ptr, msg.iov.len, msg.offset);
                    },
                    @TagType(fs.Request.Msg).PReadV => |*msg| {
                        msg.result = os.posix_preadv(msg.fd, msg.iov.ptr, msg.iov.len, msg.offset);
                    },
                    @TagType(fs.Request.Msg).Open => |*msg| {
                        msg.result = os.posixOpenC(msg.path.ptr, msg.flags, msg.mode);
                    },
                    @TagType(fs.Request.Msg).Close => |*msg| os.close(msg.fd),
                    @TagType(fs.Request.Msg).WriteFile => |*msg| blk: {
                        const flags = posix.O_LARGEFILE | posix.O_WRONLY | posix.O_CREAT |
                            posix.O_CLOEXEC | posix.O_TRUNC;
                        const fd = os.posixOpenC(msg.path.ptr, flags, msg.mode) catch |err| {
                            msg.result = err;
                            break :blk;
                        };
                        defer os.close(fd);
                        msg.result = os.posixWrite(fd, msg.contents);
                    },
                }
                switch (node.data.finish) {
                    @TagType(fs.Request.Finish).TickNode => |*tick_node| self.onNextTick(tick_node),
                    @TagType(fs.Request.Finish).DeallocCloseOperation => |close_op| {
                        self.allocator.destroy(close_op);
                    },
                    @TagType(fs.Request.Finish).NoAction => {},
                }
                self.finishOneEvent();
            }
            switch (builtin.os) {
                builtin.Os.linux => {
                    const rc = os.linux.futex_wait(&self.os_data.fs_queue_item, os.linux.FUTEX_WAIT, 0, null);
                    switch (os.linux.getErrno(rc)) {
                        0, posix.EINTR, posix.EAGAIN => continue,
                        else => unreachable,
                    }
                },
                builtin.Os.macosx, builtin.Os.freebsd, builtin.Os.netbsd => {
                    const fs_kevs = (*[1]posix.Kevent)(&self.os_data.fs_kevent_wait);
                    var out_kevs: [1]posix.Kevent = undefined;
                    _ = os.bsdKEvent(self.os_data.fs_kqfd, fs_kevs, out_kevs[0..], null) catch unreachable;
                },
                else => @compileError("Unsupported OS"),
            }
        }
    }

    const OsData = switch (builtin.os) {
        builtin.Os.linux => LinuxOsData,
        builtin.Os.macosx, builtin.Os.freebsd, builtin.Os.netbsd => KEventData,
        builtin.Os.windows => struct {
            io_port: windows.HANDLE,
            extra_thread_count: usize,
        },
        else => struct {},
    };

    const KEventData = struct {
        kqfd: i32,
        final_kevent: posix.Kevent,
        fs_kevent_wake: posix.Kevent,
        fs_kevent_wait: posix.Kevent,
        fs_thread: *os.Thread,
        fs_kqfd: i32,
        fs_queue: std.atomic.Queue(fs.Request),
        fs_end_request: fs.RequestNode,
    };

    const LinuxOsData = struct {
        epollfd: i32,
        final_eventfd: i32,
        final_eventfd_event: os.linux.epoll_event,
        fs_thread: *os.Thread,
        fs_queue_item: i32,
        fs_queue: std.atomic.Queue(fs.Request),
        fs_end_request: fs.RequestNode,
    };
};

test "std.event.Loop - basic" {
|
|
// https://github.com/ziglang/zig/issues/1908
|
|
if (builtin.single_threaded) return error.SkipZigTest;
|
|
|
|
var da = std.heap.DirectAllocator.init();
|
|
defer da.deinit();
|
|
|
|
const allocator = &da.allocator;
|
|
|
|
var loop: Loop = undefined;
|
|
try loop.initMultiThreaded(allocator);
|
|
defer loop.deinit();
|
|
|
|
loop.run();
|
|
}
|
|
|
|
test "std.event.Loop - call" {
|
|
// https://github.com/ziglang/zig/issues/1908
|
|
if (builtin.single_threaded) return error.SkipZigTest;
|
|
|
|
var da = std.heap.DirectAllocator.init();
|
|
defer da.deinit();
|
|
|
|
const allocator = &da.allocator;
|
|
|
|
var loop: Loop = undefined;
|
|
try loop.initMultiThreaded(allocator);
|
|
defer loop.deinit();
|
|
|
|
var did_it = false;
|
|
const handle = try loop.call(testEventLoop);
|
|
const handle2 = try loop.call(testEventLoop2, handle, &did_it);
|
|
defer cancel handle2;
|
|
|
|
loop.run();
|
|
|
|
testing.expect(did_it);
|
|
}
|
|
|
|
async fn testEventLoop() i32 {
|
|
return 1234;
|
|
}
|
|
|
|
async fn testEventLoop2(h: promise->i32, did_it: *bool) void {
|
|
const value = await h;
|
|
testing.expect(value == 1234);
|
|
did_it.* = true;
|
|
}
|