std.event.Loop: promote the fs thread to be available for all OS's

This commit is contained in:
Andrew Kelley 2020-05-02 00:41:19 -04:00
parent 45bce27b8f
commit 2272a07ca0
6 changed files with 137 additions and 193 deletions

View File

@ -112,39 +112,43 @@ pub fn detectTTYConfig() TTY.Config {
/// Tries to print the current stack trace to stderr, unbuffered, and ignores any error returned.
/// TODO multithreaded awareness
pub fn dumpCurrentStackTrace(start_addr: ?usize) void {
const stderr = getStderrStream();
if (builtin.strip_debug_info) {
noasync stderr.print("Unable to dump stack trace: debug info stripped\n", .{}) catch return;
return;
noasync {
const stderr = getStderrStream();
if (builtin.strip_debug_info) {
stderr.print("Unable to dump stack trace: debug info stripped\n", .{}) catch return;
return;
}
const debug_info = getSelfDebugInfo() catch |err| {
stderr.print("Unable to dump stack trace: Unable to open debug info: {}\n", .{@errorName(err)}) catch return;
return;
};
writeCurrentStackTrace(stderr, debug_info, detectTTYConfig(), start_addr) catch |err| {
stderr.print("Unable to dump stack trace: {}\n", .{@errorName(err)}) catch return;
return;
};
}
const debug_info = getSelfDebugInfo() catch |err| {
noasync stderr.print("Unable to dump stack trace: Unable to open debug info: {}\n", .{@errorName(err)}) catch return;
return;
};
writeCurrentStackTrace(stderr, debug_info, detectTTYConfig(), start_addr) catch |err| {
noasync stderr.print("Unable to dump stack trace: {}\n", .{@errorName(err)}) catch return;
return;
};
}
/// Tries to print the stack trace starting from the supplied base pointer to stderr,
/// unbuffered, and ignores any error returned.
/// TODO multithreaded awareness
pub fn dumpStackTraceFromBase(bp: usize, ip: usize) void {
const stderr = getStderrStream();
if (builtin.strip_debug_info) {
noasync stderr.print("Unable to dump stack trace: debug info stripped\n", .{}) catch return;
return;
}
const debug_info = getSelfDebugInfo() catch |err| {
noasync stderr.print("Unable to dump stack trace: Unable to open debug info: {}\n", .{@errorName(err)}) catch return;
return;
};
const tty_config = detectTTYConfig();
printSourceAtAddress(debug_info, stderr, ip, tty_config) catch return;
var it = StackIterator.init(null, bp);
while (it.next()) |return_address| {
printSourceAtAddress(debug_info, stderr, return_address - 1, tty_config) catch return;
noasync {
const stderr = getStderrStream();
if (builtin.strip_debug_info) {
stderr.print("Unable to dump stack trace: debug info stripped\n", .{}) catch return;
return;
}
const debug_info = getSelfDebugInfo() catch |err| {
stderr.print("Unable to dump stack trace: Unable to open debug info: {}\n", .{@errorName(err)}) catch return;
return;
};
const tty_config = detectTTYConfig();
printSourceAtAddress(debug_info, stderr, ip, tty_config) catch return;
var it = StackIterator.init(null, bp);
while (it.next()) |return_address| {
printSourceAtAddress(debug_info, stderr, return_address - 1, tty_config) catch return;
}
}
}
@ -199,19 +203,21 @@ pub fn captureStackTrace(first_address: ?usize, stack_trace: *builtin.StackTrace
/// Tries to print a stack trace to stderr, unbuffered, and ignores any error returned.
/// TODO multithreaded awareness
pub fn dumpStackTrace(stack_trace: builtin.StackTrace) void {
const stderr = getStderrStream();
if (builtin.strip_debug_info) {
noasync stderr.print("Unable to dump stack trace: debug info stripped\n", .{}) catch return;
return;
noasync {
const stderr = getStderrStream();
if (builtin.strip_debug_info) {
stderr.print("Unable to dump stack trace: debug info stripped\n", .{}) catch return;
return;
}
const debug_info = getSelfDebugInfo() catch |err| {
stderr.print("Unable to dump stack trace: Unable to open debug info: {}\n", .{@errorName(err)}) catch return;
return;
};
writeStackTrace(stack_trace, stderr, getDebugInfoAllocator(), debug_info, detectTTYConfig()) catch |err| {
stderr.print("Unable to dump stack trace: {}\n", .{@errorName(err)}) catch return;
return;
};
}
const debug_info = getSelfDebugInfo() catch |err| {
noasync stderr.print("Unable to dump stack trace: Unable to open debug info: {}\n", .{@errorName(err)}) catch return;
return;
};
writeStackTrace(stack_trace, stderr, getDebugInfoAllocator(), debug_info, detectTTYConfig()) catch |err| {
noasync stderr.print("Unable to dump stack trace: {}\n", .{@errorName(err)}) catch return;
return;
};
}
/// This function invokes undefined behavior when `ok` is `false`.
@ -255,7 +261,7 @@ pub fn panicExtra(trace: ?*const builtin.StackTrace, first_trace_addr: ?usize, c
resetSegfaultHandler();
}
switch (panic_stage) {
noasync switch (panic_stage) {
0 => {
panic_stage = 1;
@ -267,7 +273,7 @@ pub fn panicExtra(trace: ?*const builtin.StackTrace, first_trace_addr: ?usize, c
defer held.release();
const stderr = getStderrStream();
noasync stderr.print(format ++ "\n", args) catch os.abort();
stderr.print(format ++ "\n", args) catch os.abort();
if (trace) |t| {
dumpStackTrace(t.*);
}
@ -292,12 +298,12 @@ pub fn panicExtra(trace: ?*const builtin.StackTrace, first_trace_addr: ?usize, c
// we're still holding the mutex but that's fine as we're going to
// call abort()
const stderr = getStderrStream();
noasync stderr.print("Panicked during a panic. Aborting.\n", .{}) catch os.abort();
stderr.print("Panicked during a panic. Aborting.\n", .{}) catch os.abort();
},
else => {
// Panicked while printing "Panicked during a panic."
},
}
};
os.abort();
}

View File

@ -4,19 +4,27 @@ const root = @import("root");
const assert = std.debug.assert;
const testing = std.testing;
const mem = std.mem;
const AtomicRmwOp = builtin.AtomicRmwOp;
const AtomicOrder = builtin.AtomicOrder;
const os = std.os;
const windows = os.windows;
const maxInt = std.math.maxInt;
const Thread = std.Thread;
const is_windows = std.Target.current.os.tag == .windows;
pub const Loop = struct {
next_tick_queue: std.atomic.Queue(anyframe),
os_data: OsData,
final_resume_node: ResumeNode,
pending_event_count: usize,
extra_threads: []*Thread,
/// TODO change this to a pool of configurable number of threads
/// and rename it to be not file-system-specific. it will become
/// a thread pool for turning non-CPU-bound blocking things into
/// async things. A fallback for any missing OS-specific API.
fs_thread: *Thread,
fs_queue: std.atomic.Queue(Request),
fs_end_request: Request.Node,
fs_thread_wakeup: std.ResetEvent,
/// For resources that have the same lifetime as the `Loop`.
/// This is only used by `Loop` for the thread pool and associated resources.
@ -143,7 +151,12 @@ pub const Loop = struct {
.handle = undefined,
.overlapped = ResumeNode.overlapped_init,
},
.fs_end_request = .{ .data = .{ .msg = .end, .finish = .NoAction } },
.fs_queue = std.atomic.Queue(Request).init(),
.fs_thread = undefined,
.fs_thread_wakeup = std.ResetEvent.init(),
};
errdefer self.fs_thread_wakeup.deinit();
errdefer self.arena.deinit();
// We need at least one of these in case the fs thread wants to use onNextTick
@ -158,10 +171,19 @@ pub const Loop = struct {
try self.initOsData(extra_thread_count);
errdefer self.deinitOsData();
if (!builtin.single_threaded) {
self.fs_thread = try Thread.spawn(self, posixFsRun);
}
errdefer if (!builtin.single_threaded) {
self.posixFsRequest(&self.fs_end_request);
self.fs_thread.wait();
};
}
pub fn deinit(self: *Loop) void {
self.deinitOsData();
self.fs_thread_wakeup.deinit();
self.arena.deinit();
self.* = undefined;
}
@ -173,21 +195,10 @@ pub const Loop = struct {
const wakeup_bytes = [_]u8{0x1} ** 8;
fn initOsData(self: *Loop, extra_thread_count: usize) InitOsDataError!void {
switch (builtin.os.tag) {
noasync switch (builtin.os.tag) {
.linux => {
self.os_data.fs_queue = std.atomic.Queue(Request).init();
self.os_data.fs_queue_item = 0;
// we need another thread for the file system because Linux does not have an async
// file system I/O API.
self.os_data.fs_end_request = Request.Node{
.data = Request{
.msg = .end,
.finish = .NoAction,
},
};
errdefer {
while (self.available_eventfd_resume_nodes.pop()) |node| noasync os.close(node.data.eventfd);
while (self.available_eventfd_resume_nodes.pop()) |node| os.close(node.data.eventfd);
}
for (self.eventfd_resume_nodes) |*eventfd_node| {
eventfd_node.* = std.atomic.Stack(ResumeNode.EventFd).Node{
@ -206,10 +217,10 @@ pub const Loop = struct {
}
self.os_data.epollfd = try os.epoll_create1(os.EPOLL_CLOEXEC);
errdefer noasync os.close(self.os_data.epollfd);
errdefer os.close(self.os_data.epollfd);
self.os_data.final_eventfd = try os.eventfd(0, os.EFD_CLOEXEC | os.EFD_NONBLOCK);
errdefer noasync os.close(self.os_data.final_eventfd);
errdefer os.close(self.os_data.final_eventfd);
self.os_data.final_eventfd_event = os.epoll_event{
.events = os.EPOLLIN,
@ -222,12 +233,6 @@ pub const Loop = struct {
&self.os_data.final_eventfd_event,
);
self.os_data.fs_thread = try Thread.spawn(self, posixFsRun);
errdefer {
self.posixFsRequest(&self.os_data.fs_end_request);
self.os_data.fs_thread.wait();
}
if (builtin.single_threaded) {
assert(extra_thread_count == 0);
return;
@ -236,7 +241,7 @@ pub const Loop = struct {
var extra_thread_index: usize = 0;
errdefer {
// writing 8 bytes to an eventfd cannot fail
const amt = noasync os.write(self.os_data.final_eventfd, &wakeup_bytes) catch unreachable;
const amt = os.write(self.os_data.final_eventfd, &wakeup_bytes) catch unreachable;
assert(amt == wakeup_bytes.len);
while (extra_thread_index != 0) {
extra_thread_index -= 1;
@ -249,22 +254,7 @@ pub const Loop = struct {
},
.macosx, .freebsd, .netbsd, .dragonfly => {
self.os_data.kqfd = try os.kqueue();
errdefer noasync os.close(self.os_data.kqfd);
self.os_data.fs_kqfd = try os.kqueue();
errdefer noasync os.close(self.os_data.fs_kqfd);
self.os_data.fs_queue = std.atomic.Queue(Request).init();
// we need another thread for the file system because Darwin does not have an async
// file system I/O API.
self.os_data.fs_end_request = Request.Node{
.prev = undefined,
.next = undefined,
.data = Request{
.msg = .end,
.finish = .NoAction,
},
};
errdefer os.close(self.os_data.kqfd);
const empty_kevs = &[0]os.Kevent{};
@ -310,30 +300,6 @@ pub const Loop = struct {
self.os_data.final_kevent.flags = os.EV_ENABLE;
self.os_data.final_kevent.fflags = os.NOTE_TRIGGER;
self.os_data.fs_kevent_wake = os.Kevent{
.ident = 0,
.filter = os.EVFILT_USER,
.flags = os.EV_ADD | os.EV_ENABLE,
.fflags = os.NOTE_TRIGGER,
.data = 0,
.udata = undefined,
};
self.os_data.fs_kevent_wait = os.Kevent{
.ident = 0,
.filter = os.EVFILT_USER,
.flags = os.EV_ADD | os.EV_CLEAR,
.fflags = 0,
.data = 0,
.udata = undefined,
};
self.os_data.fs_thread = try Thread.spawn(self, posixFsRun);
errdefer {
self.posixFsRequest(&self.os_data.fs_end_request);
self.os_data.fs_thread.wait();
}
if (builtin.single_threaded) {
assert(extra_thread_count == 0);
return;
@ -401,25 +367,24 @@ pub const Loop = struct {
}
},
else => {},
}
};
}
fn deinitOsData(self: *Loop) void {
switch (builtin.os.tag) {
noasync switch (builtin.os.tag) {
.linux => {
noasync os.close(self.os_data.final_eventfd);
while (self.available_eventfd_resume_nodes.pop()) |node| noasync os.close(node.data.eventfd);
noasync os.close(self.os_data.epollfd);
os.close(self.os_data.final_eventfd);
while (self.available_eventfd_resume_nodes.pop()) |node| os.close(node.data.eventfd);
os.close(self.os_data.epollfd);
},
.macosx, .freebsd, .netbsd, .dragonfly => {
noasync os.close(self.os_data.kqfd);
noasync os.close(self.os_data.fs_kqfd);
os.close(self.os_data.kqfd);
},
.windows => {
windows.CloseHandle(self.os_data.io_port);
},
else => {},
}
};
}
/// resume_node must live longer than the anyframe that it holds a reference to.
@ -635,7 +600,7 @@ pub const Loop = struct {
.freebsd,
.netbsd,
.dragonfly,
=> self.os_data.fs_thread.wait(),
=> self.fs_thread.wait(),
else => {},
}
@ -672,23 +637,25 @@ pub const Loop = struct {
/// call finishOneEvent when done
pub fn beginOneEvent(self: *Loop) void {
_ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);
_ = @atomicRmw(usize, &self.pending_event_count, .Add, 1, .SeqCst);
}
pub fn finishOneEvent(self: *Loop) void {
const prev = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
if (prev == 1) {
noasync {
const prev = @atomicRmw(usize, &self.pending_event_count, .Sub, 1, .SeqCst);
if (prev != 1) return;
// cause all the threads to stop
self.posixFsRequest(&self.fs_end_request);
switch (builtin.os.tag) {
.linux => {
self.posixFsRequest(&self.os_data.fs_end_request);
// writing 8 bytes to an eventfd cannot fail
const amt = noasync os.write(self.os_data.final_eventfd, &wakeup_bytes) catch unreachable;
const amt = os.write(self.os_data.final_eventfd, &wakeup_bytes) catch unreachable;
assert(amt == wakeup_bytes.len);
return;
},
.macosx, .freebsd, .netbsd, .dragonfly => {
self.posixFsRequest(&self.os_data.fs_end_request);
const final_kevent = @as(*const [1]os.Kevent, &self.os_data.final_kevent);
const empty_kevs = &[0]os.Kevent{};
// cannot fail because we already added it and this just enables it
@ -1041,73 +1008,55 @@ pub const Loop = struct {
fn posixFsRequest(self: *Loop, request_node: *Request.Node) void {
self.beginOneEvent(); // finished in posixFsRun after processing the msg
self.os_data.fs_queue.put(request_node);
switch (builtin.os.tag) {
.macosx, .freebsd, .netbsd, .dragonfly => {
const fs_kevs = @as(*const [1]os.Kevent, &self.os_data.fs_kevent_wake);
const empty_kevs = &[0]os.Kevent{};
_ = os.kevent(self.os_data.fs_kqfd, fs_kevs, empty_kevs, null) catch unreachable;
},
.linux => {
@atomicStore(i32, &self.os_data.fs_queue_item, 1, AtomicOrder.SeqCst);
const rc = os.linux.futex_wake(&self.os_data.fs_queue_item, os.linux.FUTEX_WAKE, 1);
switch (os.linux.getErrno(rc)) {
0 => {},
os.EINVAL => unreachable,
else => unreachable,
}
},
else => @compileError("Unsupported OS"),
}
self.fs_queue.put(request_node);
self.fs_thread_wakeup.set();
}
fn posixFsCancel(self: *Loop, request_node: *Request.Node) void {
if (self.os_data.fs_queue.remove(request_node)) {
if (self.fs_queue.remove(request_node)) {
self.finishOneEvent();
}
}
// TODO make this whole function noasync
// https://github.com/ziglang/zig/issues/3157
fn posixFsRun(self: *Loop) void {
while (true) {
if (builtin.os.tag == .linux) {
@atomicStore(i32, &self.os_data.fs_queue_item, 0, .SeqCst);
}
while (self.os_data.fs_queue.get()) |node| {
noasync while (true) {
self.fs_thread_wakeup.reset();
while (self.fs_queue.get()) |node| {
switch (node.data.msg) {
.end => return,
.read => |*msg| {
msg.result = noasync os.read(msg.fd, msg.buf);
msg.result = os.read(msg.fd, msg.buf);
},
.readv => |*msg| {
msg.result = noasync os.readv(msg.fd, msg.iov);
msg.result = os.readv(msg.fd, msg.iov);
},
.write => |*msg| {
msg.result = noasync os.write(msg.fd, msg.bytes);
msg.result = os.write(msg.fd, msg.bytes);
},
.writev => |*msg| {
msg.result = noasync os.writev(msg.fd, msg.iov);
msg.result = os.writev(msg.fd, msg.iov);
},
.pwritev => |*msg| {
msg.result = noasync os.pwritev(msg.fd, msg.iov, msg.offset);
msg.result = os.pwritev(msg.fd, msg.iov, msg.offset);
},
.pread => |*msg| {
msg.result = noasync os.pread(msg.fd, msg.buf, msg.offset);
msg.result = os.pread(msg.fd, msg.buf, msg.offset);
},
.preadv => |*msg| {
msg.result = noasync os.preadv(msg.fd, msg.iov, msg.offset);
msg.result = os.preadv(msg.fd, msg.iov, msg.offset);
},
.open => |*msg| {
msg.result = noasync os.openZ(msg.path, msg.flags, msg.mode);
if (is_windows) unreachable; // TODO
msg.result = os.openZ(msg.path, msg.flags, msg.mode);
},
.openat => |*msg| {
msg.result = noasync os.openatZ(msg.fd, msg.path, msg.flags, msg.mode);
if (is_windows) unreachable; // TODO
msg.result = os.openatZ(msg.fd, msg.path, msg.flags, msg.mode);
},
.faccessat => |*msg| {
msg.result = noasync os.faccessatZ(msg.dirfd, msg.path, msg.mode, msg.flags);
msg.result = os.faccessatZ(msg.dirfd, msg.path, msg.mode, msg.flags);
},
.close => |*msg| noasync os.close(msg.fd),
.close => |*msg| os.close(msg.fd),
}
switch (node.data.finish) {
.TickNode => |*tick_node| self.onNextTick(tick_node),
@ -1115,22 +1064,8 @@ pub const Loop = struct {
}
self.finishOneEvent();
}
switch (builtin.os.tag) {
.linux => {
const rc = os.linux.futex_wait(&self.os_data.fs_queue_item, os.linux.FUTEX_WAIT, 0, null);
switch (os.linux.getErrno(rc)) {
0, os.EINTR, os.EAGAIN => continue,
else => unreachable,
}
},
.macosx, .freebsd, .netbsd, .dragonfly => {
const fs_kevs = @as(*const [1]os.Kevent, &self.os_data.fs_kevent_wait);
var out_kevs: [1]os.Kevent = undefined;
_ = os.kevent(self.os_data.fs_kqfd, fs_kevs, out_kevs[0..], null) catch unreachable;
},
else => @compileError("Unsupported OS"),
}
}
self.fs_thread_wakeup.wait();
};
}
const OsData = switch (builtin.os.tag) {
@ -1146,22 +1081,12 @@ pub const Loop = struct {
const KEventData = struct {
kqfd: i32,
final_kevent: os.Kevent,
fs_kevent_wake: os.Kevent,
fs_kevent_wait: os.Kevent,
fs_thread: *Thread,
fs_kqfd: i32,
fs_queue: std.atomic.Queue(Request),
fs_end_request: Request.Node,
};
const LinuxOsData = struct {
epollfd: i32,
final_eventfd: i32,
final_eventfd_event: os.linux.epoll_event,
fs_thread: *Thread,
fs_queue_item: i32,
fs_queue: std.atomic.Queue(Request),
fs_end_request: Request.Node,
};
pub const Request = struct {
@ -1302,11 +1227,11 @@ test "std.event.Loop - basic" {
loop.run();
}
async fn testEventLoop() i32 {
fn testEventLoop() i32 {
return 1234;
}
async fn testEventLoop2(h: anyframe->i32, did_it: *bool) void {
fn testEventLoop2(h: anyframe->i32, did_it: *bool) void {
const value = await h;
testing.expect(value == 1234);
did_it.* = true;

View File

@ -734,7 +734,7 @@ pub const Dir = struct {
.dir = self.fd,
.access_mask = w.SYNCHRONIZE | w.GENERIC_WRITE | read_flag,
.share_access = switch (flags.lock) {
.None => @as(?w.ULONG, null),
.None => w.FILE_SHARE_WRITE | w.FILE_SHARE_READ | w.FILE_SHARE_DELETE,
.Shared => w.FILE_SHARE_READ | w.FILE_SHARE_DELETE,
.Exclusive => w.FILE_SHARE_DELETE,
},

View File

@ -854,8 +854,11 @@ pub const OpenError = error{
/// Open and possibly create a file. Keeps trying if it gets interrupted.
/// See also `openC`.
/// TODO support windows
pub fn open(file_path: []const u8, flags: u32, perm: usize) OpenError!fd_t {
if (std.Target.current.os.tag == .windows) {
const file_path_w = try windows.sliceToPrefixedFileW(file_path);
return openW(&file_path_w, flags, perm);
}
const file_path_c = try toPosixPath(file_path);
return openZ(&file_path_c, flags, perm);
}
@ -864,8 +867,11 @@ pub const openC = @compileError("deprecated: renamed to openZ");
/// Open and possibly create a file. Keeps trying if it gets interrupted.
/// See also `open`.
/// TODO support windows
pub fn openZ(file_path: [*:0]const u8, flags: u32, perm: usize) OpenError!fd_t {
if (std.Target.current.os.tag == .windows) {
const file_path_w = try windows.cStrToPrefixedFileW(file_path);
return openW(&file_path_w, flags, perm);
}
while (true) {
const rc = system.open(file_path, flags, perm);
switch (errno(rc)) {
@ -895,6 +901,13 @@ pub fn openZ(file_path: [*:0]const u8, flags: u32, perm: usize) OpenError!fd_t {
}
}
/// Windows-only. The path parameter is
/// [WTF-16](https://simonsapin.github.io/wtf-8/#potentially-ill-formed-utf-16) encoded.
/// Translates the POSIX open API call to a Windows API call.
pub fn openW(file_path_w: []const u16, flags: u32, perm: usize) OpenError!fd_t {
@compileError("TODO implement openW for windows");
}
/// Open and possibly create a file. Keeps trying if it gets interrupted.
/// `file_path` is relative to the open directory handle `dir_fd`.
/// See also `openatC`.
@ -1683,7 +1696,7 @@ pub fn renameatW(
.dir = old_dir_fd,
.access_mask = windows.SYNCHRONIZE | windows.GENERIC_WRITE | windows.DELETE,
.creation = windows.FILE_OPEN,
.enable_async_io = false,
.io_mode = .blocking,
}) catch |err| switch (err) {
error.WouldBlock => unreachable, // Not possible without `.share_access_nonblocking = true`.
else => |e| return e,

View File

@ -116,10 +116,10 @@ pub const OpenFileOptions = struct {
/// TODO when share_access_nonblocking is false, this implementation uses
/// untinterruptible sleep() to block. This is not the final iteration of the API.
pub fn OpenFile(sub_path_w: []const u16, options: OpenFileOptions) OpenError!HANDLE {
if (sub_path_w[0] == '.' and sub_path_w[1] == 0) {
if (mem.eql(u16, sub_path_w, ".")) {
return error.IsDir;
}
if (sub_path_w[0] == '.' and sub_path_w[1] == '.' and sub_path_w[2] == 0) {
if (mem.eql(u16, sub_path_w, "..")) {
return error.IsDir;
}

View File

@ -52,7 +52,7 @@ pub const ResetEvent = struct {
/// Wait for the event to be set by blocking the current thread.
/// A timeout in nanoseconds can be provided as a hint for how
/// long the thread should block on the unset event before throwind error.TimedOut.
/// long the thread should block on the unset event before throwing error.TimedOut.
pub fn timedWait(self: *ResetEvent, timeout_ns: u64) !void {
return self.os_event.wait(timeout_ns);
}