From 0b5bcd2f56a84e66d5c700744ec1838381893667 Mon Sep 17 00:00:00 2001 From: Andrew Kelley Date: Thu, 6 Feb 2020 17:56:40 -0500 Subject: [PATCH] more std lib async I/O integration * `zig test` gainst `--test-evented-io` parameter and gains the ability to seamlessly run async tests. * `std.ChildProcess` opens its child process pipe with O_NONBLOCK when using evented I/O * `std.io.getStdErr()` gives a File that is blocking even in evented I/O mode. * Delete `std.event.fs`. The functionality is now merged into `std.fs` and async file system access (using a dedicated thread) is automatically handled. * `std.fs.File` can be configured to specify whether its handle is expected to block, and whether that is OK to block even when in async I/O mode. This makes async I/O work correctly for e.g. the file system as well as network. * `std.fs.File` has some deprecated functions removed. * Missing readv,writev,pread,pwrite,preadv,pwritev functions are added to `std.os` and `std.fs.File`. They are all integrated with async I/O. * `std.fs.Watch` is still bit rotted and needs to be audited in light of the new async/await syntax. * `std.io.OutStream` integrates with async I/O * linked list nodes in the std lib have default `null` values for `prev` and `next`. * Windows async I/O integration is enabled for reading/writing file handles. * Added `std.os.mode_t`. Integer sizes need to be audited. * Fixed #4403 which was causing compiler to crash. This is working towards: ./zig test ../test/stage1/behavior.zig --test-evented-io Which does not successfully build yet. I'd like to enable behavioral tests and std lib tests with --test-evented-io in the test matrix in the future, to prevent regressions. --- lib/std/builtin.zig | 1 + lib/std/child_process.zig | 59 +- lib/std/debug.zig | 48 +- lib/std/event.zig | 2 - lib/std/event/loop.zig | 367 ++++++++++-- lib/std/fs.zig | 37 +- lib/std/fs/file.zig | 151 ++--- lib/std/{event/fs.zig => fs/watch.zig} | 751 +------------------------ lib/std/io.zig | 16 +- lib/std/io/out_stream.zig | 24 +- lib/std/linked_list.zig | 9 +- lib/std/os.zig | 205 ++++++- lib/std/os/bits/darwin.zig | 1 + lib/std/os/bits/dragonfly.zig | 1 + lib/std/os/bits/freebsd.zig | 1 + lib/std/os/bits/linux/x86_64.zig | 2 + lib/std/os/bits/netbsd.zig | 1 + lib/std/os/bits/wasi.zig | 1 + lib/std/os/bits/windows.zig | 1 + lib/std/os/windows.zig | 155 ++++- lib/std/special/test_runner.zig | 26 +- src/analyze.cpp | 20 +- src/codegen.cpp | 42 +- 23 files changed, 901 insertions(+), 1020 deletions(-) rename lib/std/{event/fs.zig => fs/watch.zig} (53%) diff --git a/lib/std/builtin.zig b/lib/std/builtin.zig index d8f24753d3..58a3f1a5bf 100644 --- a/lib/std/builtin.zig +++ b/lib/std/builtin.zig @@ -458,6 +458,7 @@ pub const ExportOptions = struct { pub const TestFn = struct { name: []const u8, func: fn () anyerror!void, + async_frame_size: ?usize, }; /// This function type is used by the Zig language code generation and diff --git a/lib/std/child_process.zig b/lib/std/child_process.zig index ab99abe28a..fd67e3a680 100644 --- a/lib/std/child_process.zig +++ b/lib/std/child_process.zig @@ -329,17 +329,18 @@ pub const ChildProcess = struct { } fn spawnPosix(self: *ChildProcess) SpawnError!void { - const stdin_pipe = if (self.stdin_behavior == StdIo.Pipe) try os.pipe() else undefined; + const pipe_flags = if (io.is_async) os.O_NONBLOCK else 0; + const stdin_pipe = if (self.stdin_behavior == StdIo.Pipe) try os.pipe2(pipe_flags) else undefined; errdefer if (self.stdin_behavior == StdIo.Pipe) { destroyPipe(stdin_pipe); }; - const stdout_pipe = if (self.stdout_behavior == StdIo.Pipe) try os.pipe() else undefined; + const stdout_pipe = if (self.stdout_behavior == StdIo.Pipe) try os.pipe2(pipe_flags) else undefined; errdefer if (self.stdout_behavior == StdIo.Pipe) { destroyPipe(stdout_pipe); }; - const stderr_pipe = if (self.stderr_behavior == StdIo.Pipe) try os.pipe() else undefined; + const stderr_pipe = if (self.stderr_behavior == StdIo.Pipe) try os.pipe2(pipe_flags) else undefined; errdefer if (self.stderr_behavior == StdIo.Pipe) { destroyPipe(stderr_pipe); }; @@ -426,17 +427,26 @@ pub const ChildProcess = struct { // we are the parent const pid = @intCast(i32, pid_result); if (self.stdin_behavior == StdIo.Pipe) { - self.stdin = File.openHandle(stdin_pipe[1]); + self.stdin = File{ + .handle = stdin_pipe[1], + .io_mode = std.io.mode, + }; } else { self.stdin = null; } if (self.stdout_behavior == StdIo.Pipe) { - self.stdout = File.openHandle(stdout_pipe[0]); + self.stdout = File{ + .handle = stdout_pipe[0], + .io_mode = std.io.mode, + }; } else { self.stdout = null; } if (self.stderr_behavior == StdIo.Pipe) { - self.stderr = File.openHandle(stderr_pipe[0]); + self.stderr = File{ + .handle = stderr_pipe[0], + .io_mode = std.io.mode, + }; } else { self.stderr = null; } @@ -661,17 +671,26 @@ pub const ChildProcess = struct { }; if (g_hChildStd_IN_Wr) |h| { - self.stdin = File.openHandle(h); + self.stdin = File{ + .handle = h, + .io_mode = io.mode, + }; } else { self.stdin = null; } if (g_hChildStd_OUT_Rd) |h| { - self.stdout = File.openHandle(h); + self.stdout = File{ + .handle = h, + .io_mode = io.mode, + }; } else { self.stdout = null; } if (g_hChildStd_ERR_Rd) |h| { - self.stderr = File.openHandle(h); + self.stderr = File{ + .handle = h, + .io_mode = io.mode, + }; } else { self.stderr = null; } @@ -693,10 +712,10 @@ pub const ChildProcess = struct { fn setUpChildIo(stdio: StdIo, pipe_fd: i32, std_fileno: i32, dev_null_fd: i32) !void { switch (stdio) { - StdIo.Pipe => try os.dup2(pipe_fd, std_fileno), - StdIo.Close => os.close(std_fileno), - StdIo.Inherit => {}, - StdIo.Ignore => try os.dup2(dev_null_fd, std_fileno), + .Pipe => try os.dup2(pipe_fd, std_fileno), + .Close => os.close(std_fileno), + .Inherit => {}, + .Ignore => try os.dup2(dev_null_fd, std_fileno), } } }; @@ -811,12 +830,22 @@ fn forkChildErrReport(fd: i32, err: ChildProcess.SpawnError) noreturn { const ErrInt = @IntType(false, @sizeOf(anyerror) * 8); fn writeIntFd(fd: i32, value: ErrInt) !void { - const stream = &File.openHandle(fd).outStream().stream; + const file = File{ + .handle = fd, + .io_mode = .blocking, + .async_block_allowed = File.async_block_allowed_yes, + }; + const stream = &file.outStream().stream; stream.writeIntNative(u64, @intCast(u64, value)) catch return error.SystemResources; } fn readIntFd(fd: i32) !ErrInt { - const stream = &File.openHandle(fd).inStream().stream; + const file = File{ + .handle = fd, + .io_mode = .blocking, + .async_block_allowed = File.async_block_allowed_yes, + }; + const stream = &file.inStream().stream; return @intCast(ErrInt, stream.readIntNative(u64) catch return error.SystemResources); } diff --git a/lib/std/debug.zig b/lib/std/debug.zig index 8a0503a30d..4a743c8dfb 100644 --- a/lib/std/debug.zig +++ b/lib/std/debug.zig @@ -50,7 +50,7 @@ pub fn warn(comptime fmt: []const u8, args: var) void { const held = stderr_mutex.acquire(); defer held.release(); const stderr = getStderrStream(); - stderr.print(fmt, args) catch return; + noasync stderr.print(fmt, args) catch return; } pub fn getStderrStream() *io.OutStream(File.WriteError) { @@ -102,15 +102,15 @@ pub fn detectTTYConfig() TTY.Config { pub fn dumpCurrentStackTrace(start_addr: ?usize) void { const stderr = getStderrStream(); if (builtin.strip_debug_info) { - stderr.print("Unable to dump stack trace: debug info stripped\n", .{}) catch return; + noasync stderr.print("Unable to dump stack trace: debug info stripped\n", .{}) catch return; return; } const debug_info = getSelfDebugInfo() catch |err| { - stderr.print("Unable to dump stack trace: Unable to open debug info: {}\n", .{@errorName(err)}) catch return; + noasync stderr.print("Unable to dump stack trace: Unable to open debug info: {}\n", .{@errorName(err)}) catch return; return; }; writeCurrentStackTrace(stderr, debug_info, detectTTYConfig(), start_addr) catch |err| { - stderr.print("Unable to dump stack trace: {}\n", .{@errorName(err)}) catch return; + noasync stderr.print("Unable to dump stack trace: {}\n", .{@errorName(err)}) catch return; return; }; } @@ -121,11 +121,11 @@ pub fn dumpCurrentStackTrace(start_addr: ?usize) void { pub fn dumpStackTraceFromBase(bp: usize, ip: usize) void { const stderr = getStderrStream(); if (builtin.strip_debug_info) { - stderr.print("Unable to dump stack trace: debug info stripped\n", .{}) catch return; + noasync stderr.print("Unable to dump stack trace: debug info stripped\n", .{}) catch return; return; } const debug_info = getSelfDebugInfo() catch |err| { - stderr.print("Unable to dump stack trace: Unable to open debug info: {}\n", .{@errorName(err)}) catch return; + noasync stderr.print("Unable to dump stack trace: Unable to open debug info: {}\n", .{@errorName(err)}) catch return; return; }; const tty_config = detectTTYConfig(); @@ -195,15 +195,15 @@ pub fn captureStackTrace(first_address: ?usize, stack_trace: *builtin.StackTrace pub fn dumpStackTrace(stack_trace: builtin.StackTrace) void { const stderr = getStderrStream(); if (builtin.strip_debug_info) { - stderr.print("Unable to dump stack trace: debug info stripped\n", .{}) catch return; + noasync stderr.print("Unable to dump stack trace: debug info stripped\n", .{}) catch return; return; } const debug_info = getSelfDebugInfo() catch |err| { - stderr.print("Unable to dump stack trace: Unable to open debug info: {}\n", .{@errorName(err)}) catch return; + noasync stderr.print("Unable to dump stack trace: Unable to open debug info: {}\n", .{@errorName(err)}) catch return; return; }; writeStackTrace(stack_trace, stderr, getDebugInfoAllocator(), debug_info, detectTTYConfig()) catch |err| { - stderr.print("Unable to dump stack trace: {}\n", .{@errorName(err)}) catch return; + noasync stderr.print("Unable to dump stack trace: {}\n", .{@errorName(err)}) catch return; return; }; } @@ -244,7 +244,7 @@ pub fn panicExtra(trace: ?*const builtin.StackTrace, first_trace_addr: ?usize, c switch (@atomicRmw(u8, &panicking, .Add, 1, .SeqCst)) { 0 => { const stderr = getStderrStream(); - stderr.print(format ++ "\n", args) catch os.abort(); + noasync stderr.print(format ++ "\n", args) catch os.abort(); if (trace) |t| { dumpStackTrace(t.*); } @@ -556,12 +556,12 @@ pub const TTY = struct { switch (conf) { .no_color => return, .escape_codes => switch (color) { - .Red => out_stream.write(RED) catch return, - .Green => out_stream.write(GREEN) catch return, - .Cyan => out_stream.write(CYAN) catch return, - .White, .Bold => out_stream.write(WHITE) catch return, - .Dim => out_stream.write(DIM) catch return, - .Reset => out_stream.write(RESET) catch return, + .Red => noasync out_stream.write(RED) catch return, + .Green => noasync out_stream.write(GREEN) catch return, + .Cyan => noasync out_stream.write(CYAN) catch return, + .White, .Bold => noasync out_stream.write(WHITE) catch return, + .Dim => noasync out_stream.write(DIM) catch return, + .Reset => noasync out_stream.write(RESET) catch return, }, .windows_api => if (builtin.os == .windows) { const S = struct { @@ -717,17 +717,17 @@ fn printLineInfo( tty_config.setColor(out_stream, .White); if (line_info) |*li| { - try out_stream.print("{}:{}:{}", .{ li.file_name, li.line, li.column }); + try noasync out_stream.print("{}:{}:{}", .{ li.file_name, li.line, li.column }); } else { - try out_stream.print("???:?:?", .{}); + try noasync out_stream.write("???:?:?"); } tty_config.setColor(out_stream, .Reset); - try out_stream.write(": "); + try noasync out_stream.write(": "); tty_config.setColor(out_stream, .Dim); - try out_stream.print("0x{x} in {} ({})", .{ address, symbol_name, compile_unit_name }); + try noasync out_stream.print("0x{x} in {} ({})", .{ address, symbol_name, compile_unit_name }); tty_config.setColor(out_stream, .Reset); - try out_stream.write("\n"); + try noasync out_stream.write("\n"); // Show the matching source code line if possible if (line_info) |li| { @@ -736,12 +736,12 @@ fn printLineInfo( // The caret already takes one char const space_needed = @intCast(usize, li.column - 1); - try out_stream.writeByteNTimes(' ', space_needed); + try noasync out_stream.writeByteNTimes(' ', space_needed); tty_config.setColor(out_stream, .Green); - try out_stream.write("^"); + try noasync out_stream.write("^"); tty_config.setColor(out_stream, .Reset); } - try out_stream.write("\n"); + try noasync out_stream.write("\n"); } else |err| switch (err) { error.EndOfFile, error.FileNotFound => {}, error.BadPathName => {}, diff --git a/lib/std/event.zig b/lib/std/event.zig index 2c72c22588..64d73a25e1 100644 --- a/lib/std/event.zig +++ b/lib/std/event.zig @@ -6,11 +6,9 @@ pub const Locked = @import("event/locked.zig").Locked; pub const RwLock = @import("event/rwlock.zig").RwLock; pub const RwLocked = @import("event/rwlocked.zig").RwLocked; pub const Loop = @import("event/loop.zig").Loop; -pub const fs = @import("event/fs.zig"); test "import event tests" { _ = @import("event/channel.zig"); - _ = @import("event/fs.zig"); _ = @import("event/future.zig"); _ = @import("event/group.zig"); _ = @import("event/lock.zig"); diff --git a/lib/std/event/loop.zig b/lib/std/event/loop.zig index e80266c640..555dba3000 100644 --- a/lib/std/event/loop.zig +++ b/lib/std/event/loop.zig @@ -6,7 +6,6 @@ const testing = std.testing; const mem = std.mem; const AtomicRmwOp = builtin.AtomicRmwOp; const AtomicOrder = builtin.AtomicOrder; -const fs = std.event.fs; const os = std.os; const windows = os.windows; const maxInt = std.math.maxInt; @@ -174,21 +173,19 @@ pub const Loop = struct { fn initOsData(self: *Loop, extra_thread_count: usize) InitOsDataError!void { switch (builtin.os) { .linux => { - self.os_data.fs_queue = std.atomic.Queue(fs.Request).init(); + self.os_data.fs_queue = std.atomic.Queue(Request).init(); self.os_data.fs_queue_item = 0; // we need another thread for the file system because Linux does not have an async // file system I/O API. - self.os_data.fs_end_request = fs.RequestNode{ - .prev = undefined, - .next = undefined, - .data = fs.Request{ - .msg = fs.Request.Msg.End, - .finish = fs.Request.Finish.NoAction, + self.os_data.fs_end_request = Request.Node{ + .data = Request{ + .msg = .end, + .finish = .NoAction, }, }; errdefer { - while (self.available_eventfd_resume_nodes.pop()) |node| os.close(node.data.eventfd); + while (self.available_eventfd_resume_nodes.pop()) |node| noasync os.close(node.data.eventfd); } for (self.eventfd_resume_nodes) |*eventfd_node| { eventfd_node.* = std.atomic.Stack(ResumeNode.EventFd).Node{ @@ -207,10 +204,10 @@ pub const Loop = struct { } self.os_data.epollfd = try os.epoll_create1(os.EPOLL_CLOEXEC); - errdefer os.close(self.os_data.epollfd); + errdefer noasync os.close(self.os_data.epollfd); self.os_data.final_eventfd = try os.eventfd(0, os.EFD_CLOEXEC | os.EFD_NONBLOCK); - errdefer os.close(self.os_data.final_eventfd); + errdefer noasync os.close(self.os_data.final_eventfd); self.os_data.final_eventfd_event = os.epoll_event{ .events = os.EPOLLIN, @@ -237,7 +234,7 @@ pub const Loop = struct { var extra_thread_index: usize = 0; errdefer { // writing 8 bytes to an eventfd cannot fail - os.write(self.os_data.final_eventfd, &wakeup_bytes) catch unreachable; + noasync os.write(self.os_data.final_eventfd, &wakeup_bytes) catch unreachable; while (extra_thread_index != 0) { extra_thread_index -= 1; self.extra_threads[extra_thread_index].wait(); @@ -249,20 +246,20 @@ pub const Loop = struct { }, .macosx, .freebsd, .netbsd, .dragonfly => { self.os_data.kqfd = try os.kqueue(); - errdefer os.close(self.os_data.kqfd); + errdefer noasync os.close(self.os_data.kqfd); self.os_data.fs_kqfd = try os.kqueue(); - errdefer os.close(self.os_data.fs_kqfd); + errdefer noasync os.close(self.os_data.fs_kqfd); - self.os_data.fs_queue = std.atomic.Queue(fs.Request).init(); + self.os_data.fs_queue = std.atomic.Queue(Request).init(); // we need another thread for the file system because Darwin does not have an async // file system I/O API. - self.os_data.fs_end_request = fs.RequestNode{ + self.os_data.fs_end_request = Request.Node{ .prev = undefined, .next = undefined, - .data = fs.Request{ - .msg = fs.Request.Msg.End, - .finish = fs.Request.Finish.NoAction, + .data = Request{ + .msg = .end, + .finish = .NoAction, }, }; @@ -407,14 +404,14 @@ pub const Loop = struct { fn deinitOsData(self: *Loop) void { switch (builtin.os) { .linux => { - os.close(self.os_data.final_eventfd); - while (self.available_eventfd_resume_nodes.pop()) |node| os.close(node.data.eventfd); - os.close(self.os_data.epollfd); + noasync os.close(self.os_data.final_eventfd); + while (self.available_eventfd_resume_nodes.pop()) |node| noasync os.close(node.data.eventfd); + noasync os.close(self.os_data.epollfd); self.allocator.free(self.eventfd_resume_nodes); }, .macosx, .freebsd, .netbsd, .dragonfly => { - os.close(self.os_data.kqfd); - os.close(self.os_data.fs_kqfd); + noasync os.close(self.os_data.kqfd); + noasync os.close(self.os_data.fs_kqfd); }, .windows => { windows.CloseHandle(self.os_data.io_port); @@ -711,6 +708,190 @@ pub const Loop = struct { } } + /// Performs an async `os.open` using a separate thread. + pub fn openZ(self: *Loop, file_path: [*:0]const u8, flags: u32, mode: usize) os.OpenError!os.fd_t { + var req_node = Request.Node{ + .data = .{ + .msg = .{ + .open = .{ + .path = file_path, + .flags = flags, + .mode = mode, + .result = undefined, + }, + }, + .finish = .{ .TickNode = .{ .data = @frame() } }, + }, + }; + suspend { + self.posixFsRequest(&req_node); + } + return req_node.data.msg.open.result; + } + + /// Performs an async `os.opent` using a separate thread. + pub fn openatZ(self: *Loop, fd: os.fd_t, file_path: [*:0]const u8, flags: u32, mode: usize) os.OpenError!os.fd_t { + var req_node = Request.Node{ + .data = .{ + .msg = .{ + .openat = .{ + .fd = fd, + .path = file_path, + .flags = flags, + .mode = mode, + .result = undefined, + }, + }, + .finish = .{ .TickNode = .{ .data = @frame() } }, + }, + }; + suspend { + self.posixFsRequest(&req_node); + } + return req_node.data.msg.openat.result; + } + + /// Performs an async `os.close` using a separate thread. + pub fn close(self: *Loop, fd: os.fd_t) void { + var req_node = Request.Node{ + .data = .{ + .msg = .{ .close = .{ .fd = fd } }, + .finish = .{ .TickNode = .{ .data = @frame() } }, + }, + }; + suspend { + self.posixFsRequest(&req_node); + } + } + + /// Performs an async `os.read` using a separate thread. + /// `fd` must block and not return EAGAIN. + pub fn read(self: *Loop, fd: os.fd_t, buf: []u8) os.ReadError!usize { + var req_node = Request.Node{ + .data = .{ + .msg = .{ + .read = .{ + .fd = fd, + .buf = buf, + .result = undefined, + }, + }, + .finish = .{ .TickNode = .{ .data = @frame() } }, + }, + }; + suspend { + self.posixFsRequest(&req_node); + } + return req_node.data.msg.read.result; + } + + /// Performs an async `os.readv` using a separate thread. + /// `fd` must block and not return EAGAIN. + pub fn readv(self: *Loop, fd: os.fd_t, iov: []const os.iovec) os.ReadError!usize { + var req_node = Request.Node{ + .data = .{ + .msg = .{ + .readv = .{ + .fd = fd, + .iov = iov, + .result = undefined, + }, + }, + .finish = .{ .TickNode = .{ .data = @frame() } }, + }, + }; + suspend { + self.posixFsRequest(&req_node); + } + return req_node.data.msg.readv.result; + } + + /// Performs an async `os.preadv` using a separate thread. + /// `fd` must block and not return EAGAIN. + pub fn preadv(self: *Loop, fd: os.fd_t, iov: []const os.iovec, offset: u64) os.ReadError!usize { + var req_node = Request.Node{ + .data = .{ + .msg = .{ + .preadv = .{ + .fd = fd, + .iov = iov, + .offset = offset, + .result = undefined, + }, + }, + .finish = .{ .TickNode = .{ .data = @frame() } }, + }, + }; + suspend { + self.posixFsRequest(&req_node); + } + return req_node.data.msg.preadv.result; + } + + /// Performs an async `os.write` using a separate thread. + /// `fd` must block and not return EAGAIN. + pub fn write(self: *Loop, fd: os.fd_t, bytes: []const u8) os.WriteError!void { + var req_node = Request.Node{ + .data = .{ + .msg = .{ + .write = .{ + .fd = fd, + .bytes = bytes, + .result = undefined, + }, + }, + .finish = .{ .TickNode = .{ .data = @frame() } }, + }, + }; + suspend { + self.posixFsRequest(&req_node); + } + return req_node.data.msg.write.result; + } + + /// Performs an async `os.writev` using a separate thread. + /// `fd` must block and not return EAGAIN. + pub fn writev(self: *Loop, fd: os.fd_t, iov: []const os.iovec_const) WriteError!void { + var req_node = Request.Node{ + .data = .{ + .msg = .{ + .writev = .{ + .fd = fd, + .iov = iov, + .result = undefined, + }, + }, + .finish = .{ .TickNode = .{ .data = @frame() } }, + }, + }; + suspend { + self.posixFsRequest(&req_node); + } + return req_node.data.msg.writev.result; + } + + /// Performs an async `os.pwritev` using a separate thread. + /// `fd` must block and not return EAGAIN. + pub fn pwritev(self: *Loop, fd: os.fd_t, iov: []const os.iovec_const, offset: u64) WriteError!void { + var req_node = Request.Node{ + .data = .{ + .msg = .{ + .pwritev = .{ + .fd = fd, + .iov = iov, + .offset = offset, + .result = undefined, + }, + }, + .finish = .{ .TickNode = .{ .data = @frame() } }, + }, + }; + suspend { + self.posixFsRequest(&req_node); + } + return req_node.data.msg.pwritev.result; + } + fn workerRun(self: *Loop) void { while (true) { while (true) { @@ -804,7 +985,7 @@ pub const Loop = struct { } } - fn posixFsRequest(self: *Loop, request_node: *fs.RequestNode) void { + fn posixFsRequest(self: *Loop, request_node: *Request.Node) void { self.beginOneEvent(); // finished in posixFsRun after processing the msg self.os_data.fs_queue.put(request_node); switch (builtin.os) { @@ -826,7 +1007,7 @@ pub const Loop = struct { } } - fn posixFsCancel(self: *Loop, request_node: *fs.RequestNode) void { + fn posixFsCancel(self: *Loop, request_node: *Request.Node) void { if (self.os_data.fs_queue.remove(request_node)) { self.finishOneEvent(); } @@ -841,37 +1022,32 @@ pub const Loop = struct { } while (self.os_data.fs_queue.get()) |node| { switch (node.data.msg) { - .End => return, - .WriteV => |*msg| { + .end => return, + .read => |*msg| { + msg.result = noasync os.read(msg.fd, msg.buf); + }, + .write => |*msg| { + msg.result = noasync os.write(msg.fd, msg.bytes); + }, + .writev => |*msg| { msg.result = noasync os.writev(msg.fd, msg.iov); }, - .PWriteV => |*msg| { + .pwritev => |*msg| { msg.result = noasync os.pwritev(msg.fd, msg.iov, msg.offset); }, - .PReadV => |*msg| { + .preadv => |*msg| { msg.result = noasync os.preadv(msg.fd, msg.iov, msg.offset); }, - .Open => |*msg| { - msg.result = noasync os.openC(msg.path.ptr, msg.flags, msg.mode); + .open => |*msg| { + msg.result = noasync os.openC(msg.path, msg.flags, msg.mode); }, - .Close => |*msg| noasync os.close(msg.fd), - .WriteFile => |*msg| blk: { - const O_LARGEFILE = if (@hasDecl(os, "O_LARGEFILE")) os.O_LARGEFILE else 0; - const flags = O_LARGEFILE | os.O_WRONLY | os.O_CREAT | - os.O_CLOEXEC | os.O_TRUNC; - const fd = noasync os.openC(msg.path.ptr, flags, msg.mode) catch |err| { - msg.result = err; - break :blk; - }; - defer noasync os.close(fd); - msg.result = noasync os.write(fd, msg.contents); + .openat => |*msg| { + msg.result = noasync os.openatC(msg.fd, msg.path, msg.flags, msg.mode); }, + .close => |*msg| noasync os.close(msg.fd), } switch (node.data.finish) { .TickNode => |*tick_node| self.onNextTick(tick_node), - .DeallocCloseOperation => |close_op| { - self.allocator.destroy(close_op); - }, .NoAction => {}, } self.finishOneEvent(); @@ -911,8 +1087,8 @@ pub const Loop = struct { fs_kevent_wait: os.Kevent, fs_thread: *Thread, fs_kqfd: i32, - fs_queue: std.atomic.Queue(fs.Request), - fs_end_request: fs.RequestNode, + fs_queue: std.atomic.Queue(Request), + fs_end_request: Request.Node, }; const LinuxOsData = struct { @@ -921,8 +1097,99 @@ pub const Loop = struct { final_eventfd_event: os.linux.epoll_event, fs_thread: *Thread, fs_queue_item: i32, - fs_queue: std.atomic.Queue(fs.Request), - fs_end_request: fs.RequestNode, + fs_queue: std.atomic.Queue(Request), + fs_end_request: Request.Node, + }; + + pub const Request = struct { + msg: Msg, + finish: Finish, + + pub const Node = std.atomic.Queue(Request).Node; + + pub const Finish = union(enum) { + TickNode: Loop.NextTickNode, + NoAction, + }; + + pub const Msg = union(enum) { + read: Read, + write: Write, + writev: WriteV, + pwritev: PWriteV, + preadv: PReadV, + open: Open, + openat: OpenAt, + close: Close, + + /// special - means the fs thread should exit + end, + + pub const Read = struct { + fd: os.fd_t, + buf: []u8, + result: Error!usize, + + pub const Error = os.ReadError; + }; + + pub const Write = struct { + fd: os.fd_t, + bytes: []const u8, + result: Error!void, + + pub const Error = os.WriteError; + }; + + pub const WriteV = struct { + fd: os.fd_t, + iov: []const os.iovec_const, + result: Error!void, + + pub const Error = os.WriteError; + }; + + pub const PWriteV = struct { + fd: os.fd_t, + iov: []const os.iovec_const, + offset: usize, + result: Error!void, + + pub const Error = os.WriteError; + }; + + pub const PReadV = struct { + fd: os.fd_t, + iov: []const os.iovec, + offset: usize, + result: Error!usize, + + pub const Error = os.ReadError; + }; + + pub const Open = struct { + path: [*:0]const u8, + flags: u32, + mode: os.mode_t, + result: Error!os.fd_t, + + pub const Error = os.OpenError; + }; + + pub const OpenAt = struct { + fd: os.fd_t, + path: [*:0]const u8, + flags: u32, + mode: os.mode_t, + result: Error!os.fd_t, + + pub const Error = os.OpenError; + }; + + pub const Close = struct { + fd: os.fd_t, + }; + }; }; }; diff --git a/lib/std/fs.zig b/lib/std/fs.zig index fe59b6ee2e..72b12af466 100644 --- a/lib/std/fs.zig +++ b/lib/std/fs.zig @@ -23,6 +23,8 @@ pub const realpathW = os.realpathW; pub const getAppDataDir = @import("fs/get_app_data_dir.zig").getAppDataDir; pub const GetAppDataDirError = @import("fs/get_app_data_dir.zig").GetAppDataDirError; +pub const Watch = @import("fs/watch.zig").Watch; + /// This represents the maximum size of a UTF-8 encoded file path. /// All file system operations which return a path are guaranteed to /// fit into a UTF-8 encoded array of this length. @@ -43,6 +45,13 @@ pub const base64_encoder = base64.Base64Encoder.init( base64.standard_pad_char, ); +/// Whether or not async file system syscalls need a dedicated thread because the operating +/// system does not support non-blocking I/O on the file system. +pub const need_async_thread = std.io.is_async and switch (builtin.os) { + .windows, .other => false, + else => true, +}; + /// TODO remove the allocator requirement from this API pub fn atomicSymLink(allocator: *Allocator, existing_path: []const u8, new_path: []const u8) !void { if (symLink(existing_path, new_path)) { @@ -688,7 +697,11 @@ pub const Dir = struct { } pub fn close(self: *Dir) void { - os.close(self.fd); + if (need_async_thread) { + std.event.Loop.instance.?.close(self.fd); + } else { + os.close(self.fd); + } self.* = undefined; } @@ -718,8 +731,11 @@ pub const Dir = struct { @as(u32, os.O_WRONLY) else @as(u32, os.O_RDONLY); - const fd = try os.openatC(self.fd, sub_path, os_flags, 0); - return File{ .handle = fd }; + const fd = if (need_async_thread) + try std.event.Loop.instance.?.openatZ(self.fd, sub_path, os_flags, 0) + else + try os.openatC(self.fd, sub_path, os_flags, 0); + return File{ .handle = fd, .io_mode = .blocking }; } /// Same as `openFile` but Windows-only and the path parameter is @@ -756,8 +772,11 @@ pub const Dir = struct { (if (flags.truncate) @as(u32, os.O_TRUNC) else 0) | (if (flags.read) @as(u32, os.O_RDWR) else os.O_WRONLY) | (if (flags.exclusive) @as(u32, os.O_EXCL) else 0); - const fd = try os.openatC(self.fd, sub_path_c, os_flags, flags.mode); - return File{ .handle = fd }; + const fd = if (need_async_thread) + try std.event.Loop.instance.?.openatZ(self.fd, sub_path_c, os_flags, flags.mode) + else + try os.openatC(self.fd, sub_path_c, os_flags, flags.mode); + return File{ .handle = fd, .io_mode = .blocking }; } /// Same as `createFile` but Windows-only and the path parameter is @@ -919,7 +938,12 @@ pub const Dir = struct { } fn openDirFlagsC(self: Dir, sub_path_c: [*:0]const u8, flags: u32) OpenError!Dir { - const fd = os.openatC(self.fd, sub_path_c, flags | os.O_DIRECTORY, 0) catch |err| switch (err) { + const os_flags = flags | os.O_DIRECTORY; + const result = if (need_async_thread) + std.event.Loop.instance.?.openatZ(self.fd, sub_path_c, os_flags, 0) + else + os.openatC(self.fd, sub_path_c, os_flags, 0); + const fd = result catch |err| switch (err) { error.FileTooBig => unreachable, // can't happen for directories error.IsDir => unreachable, // we're providing O_DIRECTORY error.NoSpaceLeft => unreachable, // not providing O_CREAT @@ -1588,4 +1612,5 @@ test "" { _ = @import("fs/path.zig"); _ = @import("fs/file.zig"); _ = @import("fs/get_app_data_dir.zig"); + _ = @import("fs/watch.zig"); } diff --git a/lib/std/fs/file.zig b/lib/std/fs/file.zig index 924f10401e..e14f5fb053 100644 --- a/lib/std/fs/file.zig +++ b/lib/std/fs/file.zig @@ -8,18 +8,29 @@ const assert = std.debug.assert; const windows = os.windows; const Os = builtin.Os; const maxInt = std.math.maxInt; +const need_async_thread = std.fs.need_async_thread; pub const File = struct { /// The OS-specific file descriptor or file handle. handle: os.fd_t, - pub const Mode = switch (builtin.os) { - Os.windows => void, - else => u32, - }; + /// On some systems, such as Linux, file system file descriptors are incapable of non-blocking I/O. + /// This forces us to perform asynchronous I/O on a dedicated thread, to achieve non-blocking + /// file-system I/O. To do this, `File` must be aware of whether it is a file system file descriptor, + /// or, more specifically, whether the I/O is blocking. + io_mode: io.Mode, + + /// Even when std.io.mode is async, it is still sometimes desirable to perform blocking I/O, although + /// not by default. For example, when printing a stack trace to stderr. + async_block_allowed: @TypeOf(async_block_allowed_no) = async_block_allowed_no, + + pub const async_block_allowed_yes = if (io.is_async) true else {}; + pub const async_block_allowed_no = if (io.is_async) false else {}; + + pub const Mode = os.mode_t; pub const default_mode = switch (builtin.os) { - Os.windows => {}, + .windows => 0, else => 0o666, }; @@ -49,87 +60,27 @@ pub const File = struct { mode: Mode = default_mode, }; - /// Deprecated; call `std.fs.Dir.openFile` directly. - pub fn openRead(path: []const u8) OpenError!File { - return std.fs.cwd().openFile(path, .{}); - } - - /// Deprecated; call `std.fs.Dir.openFileC` directly. - pub fn openReadC(path_c: [*:0]const u8) OpenError!File { - return std.fs.cwd().openFileC(path_c, .{}); - } - - /// Deprecated; call `std.fs.Dir.openFileW` directly. - pub fn openReadW(path_w: [*:0]const u16) OpenError!File { - return std.fs.cwd().openFileW(path_w, .{}); - } - - /// Deprecated; call `std.fs.Dir.createFile` directly. - pub fn openWrite(path: []const u8) OpenError!File { - return std.fs.cwd().createFile(path, .{}); - } - - /// Deprecated; call `std.fs.Dir.createFile` directly. - pub fn openWriteMode(path: []const u8, file_mode: Mode) OpenError!File { - return std.fs.cwd().createFile(path, .{ .mode = file_mode }); - } - - /// Deprecated; call `std.fs.Dir.createFileC` directly. - pub fn openWriteModeC(path_c: [*:0]const u8, file_mode: Mode) OpenError!File { - return std.fs.cwd().createFileC(path_c, .{ .mode = file_mode }); - } - - /// Deprecated; call `std.fs.Dir.createFileW` directly. - pub fn openWriteModeW(path_w: [*:0]const u16, file_mode: Mode) OpenError!File { - return std.fs.cwd().createFileW(path_w, .{ .mode = file_mode }); - } - - /// Deprecated; call `std.fs.Dir.createFile` directly. - pub fn openWriteNoClobber(path: []const u8, file_mode: Mode) OpenError!File { - return std.fs.cwd().createFile(path, .{ - .mode = file_mode, - .exclusive = true, - }); - } - - /// Deprecated; call `std.fs.Dir.createFileC` directly. - pub fn openWriteNoClobberC(path_c: [*:0]const u8, file_mode: Mode) OpenError!File { - return std.fs.cwd().createFileC(path_c, .{ - .mode = file_mode, - .exclusive = true, - }); - } - - /// Deprecated; call `std.fs.Dir.createFileW` directly. - pub fn openWriteNoClobberW(path_w: [*:0]const u16, file_mode: Mode) OpenError!File { - return std.fs.cwd().createFileW(path_w, .{ - .mode = file_mode, - .exclusive = true, - }); - } - - pub fn openHandle(handle: os.fd_t) File { - return File{ .handle = handle }; - } - /// Test for the existence of `path`. /// `path` is UTF8-encoded. /// In general it is recommended to avoid this function. For example, /// instead of testing if a file exists and then opening it, just /// open it and handle the error for file not found. /// TODO: deprecate this and move it to `std.fs.Dir`. + /// TODO: integrate with async I/O pub fn access(path: []const u8) !void { return os.access(path, os.F_OK); } /// Same as `access` except the parameter is null-terminated. /// TODO: deprecate this and move it to `std.fs.Dir`. + /// TODO: integrate with async I/O pub fn accessC(path: [*:0]const u8) !void { return os.accessC(path, os.F_OK); } /// Same as `access` except the parameter is null-terminated UTF16LE-encoded. /// TODO: deprecate this and move it to `std.fs.Dir`. + /// TODO: integrate with async I/O pub fn accessW(path: [*:0]const u16) !void { return os.accessW(path, os.F_OK); } @@ -137,7 +88,11 @@ pub const File = struct { /// Upon success, the stream is in an uninitialized state. To continue using it, /// you must use the open() function. pub fn close(self: File) void { - return os.close(self.handle); + if (need_async_thread and self.io_mode == .blocking and !self.async_block_allowed) { + std.event.Loop.instance.?.close(self.handle); + } else { + return os.close(self.handle); + } } /// Test whether the file refers to a terminal. @@ -167,26 +122,31 @@ pub const File = struct { pub const SeekError = os.SeekError; /// Repositions read/write file offset relative to the current offset. + /// TODO: integrate with async I/O pub fn seekBy(self: File, offset: i64) SeekError!void { return os.lseek_CUR(self.handle, offset); } /// Repositions read/write file offset relative to the end. + /// TODO: integrate with async I/O pub fn seekFromEnd(self: File, offset: i64) SeekError!void { return os.lseek_END(self.handle, offset); } /// Repositions read/write file offset relative to the beginning. + /// TODO: integrate with async I/O pub fn seekTo(self: File, offset: u64) SeekError!void { return os.lseek_SET(self.handle, offset); } pub const GetPosError = os.SeekError || os.FStatError; + /// TODO: integrate with async I/O pub fn getPos(self: File) GetPosError!u64 { return os.lseek_CUR_get(self.handle); } + /// TODO: integrate with async I/O pub fn getEndPos(self: File) GetPosError!u64 { if (builtin.os == .windows) { return windows.GetFileSizeEx(self.handle); @@ -196,6 +156,7 @@ pub const File = struct { pub const ModeError = os.FStatError; + /// TODO: integrate with async I/O pub fn mode(self: File) ModeError!Mode { if (builtin.os == .windows) { return {}; @@ -219,6 +180,7 @@ pub const File = struct { pub const StatError = os.FStatError; + /// TODO: integrate with async I/O pub fn stat(self: File) StatError!Stat { if (builtin.os == .windows) { var io_status_block: windows.IO_STATUS_BLOCK = undefined; @@ -259,6 +221,7 @@ pub const File = struct { /// and therefore this function cannot guarantee any precision will be stored. /// Further, the maximum value is limited by the system ABI. When a value is provided /// that exceeds this range, the value is clamped to the maximum. + /// TODO: integrate with async I/O pub fn updateTimes( self: File, /// access timestamp in nanoseconds @@ -287,21 +250,61 @@ pub const File = struct { pub const ReadError = os.ReadError; pub fn read(self: File, buffer: []u8) ReadError!usize { + if (need_async_thread and self.io_mode == .blocking and !self.async_block_allowed) { + return std.event.Loop.instance.?.read(self.handle, buffer); + } return os.read(self.handle, buffer); } + pub fn pread(self: File, buffer: []u8, offset: u64) ReadError!usize { + if (need_async_thread and self.io_mode == .blocking and !self.async_block_allowed) { + return std.event.Loop.instance.?.pread(self.handle, buffer); + } + return os.pread(self.handle, buffer, offset); + } + + pub fn readv(self: File, iovecs: []const os.iovec) ReadError!usize { + if (need_async_thread and self.io_mode == .blocking and !self.async_block_allowed) { + return std.event.Loop.instance.?.readv(self.handle, iovecs); + } + return os.readv(self.handle, iovecs); + } + + pub fn preadv(self: File, iovecs: []const os.iovec, offset: u64) ReadError!usize { + if (need_async_thread and self.io_mode == .blocking and !self.async_block_allowed) { + return std.event.Loop.instance.?.preadv(self.handle, iovecs, offset); + } + return os.preadv(self.handle, iovecs, offset); + } + pub const WriteError = os.WriteError; pub fn write(self: File, bytes: []const u8) WriteError!void { + if (need_async_thread and self.io_mode == .blocking and !self.async_block_allowed) { + return std.event.Loop.instance.?.write(self.handle, bytes); + } return os.write(self.handle, bytes); } - pub fn writev_iovec(self: File, iovecs: []const os.iovec_const) WriteError!void { - if (std.event.Loop.instance) |loop| { - return std.event.fs.writevPosix(loop, self.handle, iovecs); - } else { - return os.writev(self.handle, iovecs); + pub fn pwrite(self: File, bytes: []const u8, offset: u64) WriteError!void { + if (need_async_thread and self.io_mode == .blocking and !self.async_block_allowed) { + return std.event.Loop.instance.?.pwrite(self.handle, bytes, offset); } + return os.pwrite(self.handle, bytes, offset); + } + + pub fn writev(self: File, iovecs: []const os.iovec_const) WriteError!void { + if (need_async_thread and self.io_mode == .blocking and !self.async_block_allowed) { + return std.event.Loop.instance.?.writev(self.handle, iovecs); + } + return os.writev(self.handle, iovecs); + } + + pub fn pwritev(self: File, iovecs: []const os.iovec_const, offset: usize) WriteError!void { + if (need_async_thread and self.io_mode == .blocking and !self.async_block_allowed) { + return std.event.Loop.instance.?.pwritev(self.handle, iovecs); + } + return os.pwritev(self.handle, iovecs); } pub fn inStream(file: File) InStream { diff --git a/lib/std/event/fs.zig b/lib/std/fs/watch.zig similarity index 53% rename from lib/std/event/fs.zig rename to lib/std/fs/watch.zig index 7581d2a1fb..8eab1ce36e 100644 --- a/lib/std/event/fs.zig +++ b/lib/std/fs/watch.zig @@ -1,5 +1,5 @@ -const builtin = @import("builtin"); const std = @import("../std.zig"); +const builtin = @import("builtin"); const event = std.event; const assert = std.debug.assert; const testing = std.testing; @@ -11,702 +11,7 @@ const fd_t = os.fd_t; const File = std.fs.File; const Allocator = mem.Allocator; -//! TODO mege this with `std.fs` - -const global_event_loop = Loop.instance orelse - @compileError("std.event.fs currently only works with event-based I/O"); - -pub const RequestNode = std.atomic.Queue(Request).Node; - -pub const Request = struct { - msg: Msg, - finish: Finish, - - pub const Finish = union(enum) { - TickNode: Loop.NextTickNode, - DeallocCloseOperation: *CloseOperation, - NoAction, - }; - - pub const Msg = union(enum) { - WriteV: WriteV, - PWriteV: PWriteV, - PReadV: PReadV, - Open: Open, - Close: Close, - WriteFile: WriteFile, - End, // special - means the fs thread should exit - - pub const WriteV = struct { - fd: fd_t, - iov: []const os.iovec_const, - result: Error!void, - - pub const Error = os.WriteError; - }; - - pub const PWriteV = struct { - fd: fd_t, - iov: []const os.iovec_const, - offset: usize, - result: Error!void, - - pub const Error = os.WriteError; - }; - - pub const PReadV = struct { - fd: fd_t, - iov: []const os.iovec, - offset: usize, - result: Error!usize, - - pub const Error = os.ReadError; - }; - - pub const Open = struct { - path: [:0]const u8, - flags: u32, - mode: File.Mode, - result: Error!fd_t, - - pub const Error = File.OpenError; - }; - - pub const WriteFile = struct { - path: [:0]const u8, - contents: []const u8, - mode: File.Mode, - result: Error!void, - - pub const Error = File.OpenError || File.WriteError; - }; - - pub const Close = struct { - fd: fd_t, - }; - }; -}; - -pub const PWriteVError = error{OutOfMemory} || File.WriteError; - -/// data - just the inner references - must live until pwritev frame completes. -pub fn pwritev(allocator: *Allocator, fd: fd_t, data: []const []const u8, offset: usize) PWriteVError!void { - switch (builtin.os) { - .macosx, - .linux, - .freebsd, - .netbsd, - .dragonfly, - => { - const iovecs = try allocator.alloc(os.iovec_const, data.len); - defer allocator.free(iovecs); - - for (data) |buf, i| { - iovecs[i] = os.iovec_const{ - .iov_base = buf.ptr, - .iov_len = buf.len, - }; - } - - return pwritevPosix(fd, iovecs, offset); - }, - .windows => { - const data_copy = try std.mem.dupe(allocator, []const u8, data); - defer allocator.free(data_copy); - return pwritevWindows(fd, data, offset); - }, - else => @compileError("Unsupported OS"), - } -} - -/// data must outlive the returned frame -pub fn pwritevWindows(fd: fd_t, data: []const []const u8, offset: usize) os.WindowsWriteError!void { - if (data.len == 0) return; - if (data.len == 1) return pwriteWindows(fd, data[0], offset); - - // TODO do these in parallel - var off = offset; - for (data) |buf| { - try pwriteWindows(fd, buf, off); - off += buf.len; - } -} - -pub fn pwriteWindows(fd: fd_t, data: []const u8, offset: u64) os.WindowsWriteError!void { - var resume_node = Loop.ResumeNode.Basic{ - .base = Loop.ResumeNode{ - .id = Loop.ResumeNode.Id.Basic, - .handle = @frame(), - .overlapped = windows.OVERLAPPED{ - .Internal = 0, - .InternalHigh = 0, - .Offset = @truncate(u32, offset), - .OffsetHigh = @truncate(u32, offset >> 32), - .hEvent = null, - }, - }, - }; - // TODO only call create io completion port once per fd - _ = windows.CreateIoCompletionPort(fd, global_event_loop.os_data.io_port, undefined, undefined); - global_event_loop.beginOneEvent(); - errdefer global_event_loop.finishOneEvent(); - - errdefer { - _ = windows.kernel32.CancelIoEx(fd, &resume_node.base.overlapped); - } - suspend { - _ = windows.kernel32.WriteFile(fd, data.ptr, @intCast(windows.DWORD, data.len), null, &resume_node.base.overlapped); - } - var bytes_transferred: windows.DWORD = undefined; - if (windows.kernel32.GetOverlappedResult(fd, &resume_node.base.overlapped, &bytes_transferred, windows.FALSE) == 0) { - switch (windows.kernel32.GetLastError()) { - .IO_PENDING => unreachable, - .INVALID_USER_BUFFER => return error.SystemResources, - .NOT_ENOUGH_MEMORY => return error.SystemResources, - .OPERATION_ABORTED => return error.OperationAborted, - .NOT_ENOUGH_QUOTA => return error.SystemResources, - .BROKEN_PIPE => return error.BrokenPipe, - else => |err| return windows.unexpectedError(err), - } - } -} - -/// iovecs must live until pwritev frame completes. -pub fn pwritevPosix(fd: fd_t, iovecs: []const os.iovec_const, offset: usize) os.WriteError!void { - var req_node = RequestNode{ - .prev = null, - .next = null, - .data = Request{ - .msg = Request.Msg{ - .PWriteV = Request.Msg.PWriteV{ - .fd = fd, - .iov = iovecs, - .offset = offset, - .result = undefined, - }, - }, - .finish = Request.Finish{ - .TickNode = Loop.NextTickNode{ - .prev = null, - .next = null, - .data = @frame(), - }, - }, - }, - }; - - errdefer global_event_loop.posixFsCancel(&req_node); - - suspend { - global_event_loop.posixFsRequest(&req_node); - } - - return req_node.data.msg.PWriteV.result; -} - -/// iovecs must live until pwritev frame completes. -pub fn writevPosix(fd: fd_t, iovecs: []const os.iovec_const) os.WriteError!void { - var req_node = RequestNode{ - .prev = null, - .next = null, - .data = Request{ - .msg = Request.Msg{ - .WriteV = Request.Msg.WriteV{ - .fd = fd, - .iov = iovecs, - .result = undefined, - }, - }, - .finish = Request.Finish{ - .TickNode = Loop.NextTickNode{ - .prev = null, - .next = null, - .data = @frame(), - }, - }, - }, - }; - - suspend { - global_event_loop.posixFsRequest(&req_node); - } - - return req_node.data.msg.WriteV.result; -} - -pub const PReadVError = error{OutOfMemory} || File.ReadError; - -/// data - just the inner references - must live until preadv frame completes. -pub fn preadv(allocator: *Allocator, fd: fd_t, data: []const []u8, offset: usize) PReadVError!usize { - assert(data.len != 0); - switch (builtin.os) { - .macosx, - .linux, - .freebsd, - .netbsd, - .dragonfly, - => { - const iovecs = try allocator.alloc(os.iovec, data.len); - defer allocator.free(iovecs); - - for (data) |buf, i| { - iovecs[i] = os.iovec{ - .iov_base = buf.ptr, - .iov_len = buf.len, - }; - } - - return preadvPosix(fd, iovecs, offset); - }, - .windows => { - const data_copy = try std.mem.dupe(allocator, []u8, data); - defer allocator.free(data_copy); - return preadvWindows(fd, data_copy, offset); - }, - else => @compileError("Unsupported OS"), - } -} - -/// data must outlive the returned frame -pub fn preadvWindows(fd: fd_t, data: []const []u8, offset: u64) !usize { - assert(data.len != 0); - if (data.len == 1) return preadWindows(fd, data[0], offset); - - // TODO do these in parallel? - var off: usize = 0; - var iov_i: usize = 0; - var inner_off: usize = 0; - while (true) { - const v = data[iov_i]; - const amt_read = try preadWindows(fd, v[inner_off .. v.len - inner_off], offset + off); - off += amt_read; - inner_off += amt_read; - if (inner_off == v.len) { - iov_i += 1; - inner_off = 0; - if (iov_i == data.len) { - return off; - } - } - if (amt_read == 0) return off; // EOF - } -} - -pub fn preadWindows(fd: fd_t, data: []u8, offset: u64) !usize { - var resume_node = Loop.ResumeNode.Basic{ - .base = Loop.ResumeNode{ - .id = Loop.ResumeNode.Id.Basic, - .handle = @frame(), - .overlapped = windows.OVERLAPPED{ - .Internal = 0, - .InternalHigh = 0, - .Offset = @truncate(u32, offset), - .OffsetHigh = @truncate(u32, offset >> 32), - .hEvent = null, - }, - }, - }; - // TODO only call create io completion port once per fd - _ = windows.CreateIoCompletionPort(fd, global_event_loop.os_data.io_port, undefined, undefined) catch undefined; - global_event_loop.beginOneEvent(); - errdefer global_event_loop.finishOneEvent(); - - errdefer { - _ = windows.kernel32.CancelIoEx(fd, &resume_node.base.overlapped); - } - suspend { - _ = windows.kernel32.ReadFile(fd, data.ptr, @intCast(windows.DWORD, data.len), null, &resume_node.base.overlapped); - } - var bytes_transferred: windows.DWORD = undefined; - if (windows.kernel32.GetOverlappedResult(fd, &resume_node.base.overlapped, &bytes_transferred, windows.FALSE) == 0) { - switch (windows.kernel32.GetLastError()) { - .IO_PENDING => unreachable, - .OPERATION_ABORTED => return error.OperationAborted, - .BROKEN_PIPE => return error.BrokenPipe, - .HANDLE_EOF => return @as(usize, bytes_transferred), - else => |err| return windows.unexpectedError(err), - } - } - return @as(usize, bytes_transferred); -} - -/// iovecs must live until preadv frame completes -pub fn preadvPosix(fd: fd_t, iovecs: []const os.iovec, offset: usize) os.ReadError!usize { - var req_node = RequestNode{ - .prev = null, - .next = null, - .data = Request{ - .msg = Request.Msg{ - .PReadV = Request.Msg.PReadV{ - .fd = fd, - .iov = iovecs, - .offset = offset, - .result = undefined, - }, - }, - .finish = Request.Finish{ - .TickNode = Loop.NextTickNode{ - .prev = null, - .next = null, - .data = @frame(), - }, - }, - }, - }; - - errdefer global_event_loop.posixFsCancel(&req_node); - - suspend { - global_event_loop.posixFsRequest(&req_node); - } - - return req_node.data.msg.PReadV.result; -} - -pub fn openPosix(path: []const u8, flags: u32, mode: File.Mode) File.OpenError!fd_t { - const path_c = try std.os.toPosixPath(path); - - var req_node = RequestNode{ - .prev = null, - .next = null, - .data = Request{ - .msg = Request.Msg{ - .Open = Request.Msg.Open{ - .path = path_c[0..path.len], - .flags = flags, - .mode = mode, - .result = undefined, - }, - }, - .finish = Request.Finish{ - .TickNode = Loop.NextTickNode{ - .prev = null, - .next = null, - .data = @frame(), - }, - }, - }, - }; - - errdefer global_event_loop.posixFsCancel(&req_node); - - suspend { - global_event_loop.posixFsRequest(&req_node); - } - - return req_node.data.msg.Open.result; -} - -pub fn openRead(path: []const u8) File.OpenError!fd_t { - switch (builtin.os) { - .macosx, .linux, .freebsd, .netbsd, .dragonfly => { - const O_LARGEFILE = if (@hasDecl(os, "O_LARGEFILE")) os.O_LARGEFILE else 0; - const flags = O_LARGEFILE | os.O_RDONLY | os.O_CLOEXEC; - return openPosix(path, flags, File.default_mode); - }, - - .windows => return windows.CreateFile( - path, - windows.GENERIC_READ, - windows.FILE_SHARE_READ, - null, - windows.OPEN_EXISTING, - windows.FILE_ATTRIBUTE_NORMAL | windows.FILE_FLAG_OVERLAPPED, - null, - ), - - else => @compileError("Unsupported OS"), - } -} - -/// Creates if does not exist. Truncates the file if it exists. -/// Uses the default mode. -pub fn openWrite(path: []const u8) File.OpenError!fd_t { - return openWriteMode(path, File.default_mode); -} - -/// Creates if does not exist. Truncates the file if it exists. -pub fn openWriteMode(path: []const u8, mode: File.Mode) File.OpenError!fd_t { - switch (builtin.os) { - .macosx, - .linux, - .freebsd, - .netbsd, - .dragonfly, - => { - const O_LARGEFILE = if (@hasDecl(os, "O_LARGEFILE")) os.O_LARGEFILE else 0; - const flags = O_LARGEFILE | os.O_WRONLY | os.O_CREAT | os.O_CLOEXEC | os.O_TRUNC; - return openPosix(path, flags, File.default_mode); - }, - .windows => return windows.CreateFile( - path, - windows.GENERIC_WRITE, - windows.FILE_SHARE_WRITE | windows.FILE_SHARE_READ | windows.FILE_SHARE_DELETE, - null, - windows.CREATE_ALWAYS, - windows.FILE_ATTRIBUTE_NORMAL | windows.FILE_FLAG_OVERLAPPED, - null, - ), - else => @compileError("Unsupported OS"), - } -} - -/// Creates if does not exist. Does not truncate. -pub fn openReadWrite(path: []const u8, mode: File.Mode) File.OpenError!fd_t { - switch (builtin.os) { - .macosx, .linux, .freebsd, .netbsd, .dragonfly => { - const O_LARGEFILE = if (@hasDecl(os, "O_LARGEFILE")) os.O_LARGEFILE else 0; - const flags = O_LARGEFILE | os.O_RDWR | os.O_CREAT | os.O_CLOEXEC; - return openPosix(path, flags, mode); - }, - - .windows => return windows.CreateFile( - path, - windows.GENERIC_WRITE | windows.GENERIC_READ, - windows.FILE_SHARE_WRITE | windows.FILE_SHARE_READ | windows.FILE_SHARE_DELETE, - null, - windows.OPEN_ALWAYS, - windows.FILE_ATTRIBUTE_NORMAL | windows.FILE_FLAG_OVERLAPPED, - null, - ), - - else => @compileError("Unsupported OS"), - } -} - -/// This abstraction helps to close file handles in defer expressions -/// without the possibility of failure and without the use of suspend points. -/// Start a `CloseOperation` before opening a file, so that you can defer -/// `CloseOperation.finish`. -/// If you call `setHandle` then finishing will close the fd; otherwise finishing -/// will deallocate the `CloseOperation`. -pub const CloseOperation = struct { - allocator: *Allocator, - os_data: OsData, - - const OsData = switch (builtin.os) { - .linux, .macosx, .freebsd, .netbsd, .dragonfly => OsDataPosix, - - .windows => struct { - handle: ?fd_t, - }, - - else => @compileError("Unsupported OS"), - }; - - const OsDataPosix = struct { - have_fd: bool, - close_req_node: RequestNode, - }; - - pub fn start(allocator: *Allocator) (error{OutOfMemory}!*CloseOperation) { - const self = try allocator.create(CloseOperation); - self.* = CloseOperation{ - .allocator = allocator, - .os_data = switch (builtin.os) { - .linux, .macosx, .freebsd, .netbsd, .dragonfly => initOsDataPosix(self), - .windows => OsData{ .handle = null }, - else => @compileError("Unsupported OS"), - }, - }; - return self; - } - - fn initOsDataPosix(self: *CloseOperation) OsData { - return OsData{ - .have_fd = false, - .close_req_node = RequestNode{ - .prev = null, - .next = null, - .data = Request{ - .msg = Request.Msg{ - .Close = Request.Msg.Close{ .fd = undefined }, - }, - .finish = Request.Finish{ .DeallocCloseOperation = self }, - }, - }, - }; - } - - /// Defer this after creating. - pub fn finish(self: *CloseOperation) void { - switch (builtin.os) { - .linux, - .macosx, - .freebsd, - .netbsd, - .dragonfly, - => { - if (self.os_data.have_fd) { - global_event_loop.posixFsRequest(&self.os_data.close_req_node); - } else { - self.allocator.destroy(self); - } - }, - .windows => { - if (self.os_data.handle) |handle| { - os.close(handle); - } - self.allocator.destroy(self); - }, - else => @compileError("Unsupported OS"), - } - } - - pub fn setHandle(self: *CloseOperation, handle: fd_t) void { - switch (builtin.os) { - .linux, - .macosx, - .freebsd, - .netbsd, - .dragonfly, - => { - self.os_data.close_req_node.data.msg.Close.fd = handle; - self.os_data.have_fd = true; - }, - .windows => { - self.os_data.handle = handle; - }, - else => @compileError("Unsupported OS"), - } - } - - /// Undo a `setHandle`. - pub fn clearHandle(self: *CloseOperation) void { - switch (builtin.os) { - .linux, - .macosx, - .freebsd, - .netbsd, - .dragonfly, - => { - self.os_data.have_fd = false; - }, - .windows => { - self.os_data.handle = null; - }, - else => @compileError("Unsupported OS"), - } - } - - pub fn getHandle(self: *CloseOperation) fd_t { - switch (builtin.os) { - .linux, - .macosx, - .freebsd, - .netbsd, - .dragonfly, - => { - assert(self.os_data.have_fd); - return self.os_data.close_req_node.data.msg.Close.fd; - }, - .windows => { - return self.os_data.handle.?; - }, - else => @compileError("Unsupported OS"), - } - } -}; - -/// contents must remain alive until writeFile completes. -/// TODO make this atomic or provide writeFileAtomic and rename this one to writeFileTruncate -pub fn writeFile(allocator: *Allocator, path: []const u8, contents: []const u8) !void { - return writeFileMode(allocator, path, contents, File.default_mode); -} - -/// contents must remain alive until writeFile completes. -pub fn writeFileMode(allocator: *Allocator, path: []const u8, contents: []const u8, mode: File.Mode) !void { - switch (builtin.os) { - .linux, - .macosx, - .freebsd, - .netbsd, - .dragonfly, - => return writeFileModeThread(allocator, path, contents, mode), - .windows => return writeFileWindows(path, contents), - else => @compileError("Unsupported OS"), - } -} - -fn writeFileWindows(path: []const u8, contents: []const u8) !void { - const handle = try windows.CreateFile( - path, - windows.GENERIC_WRITE, - windows.FILE_SHARE_WRITE | windows.FILE_SHARE_READ | windows.FILE_SHARE_DELETE, - null, - windows.CREATE_ALWAYS, - windows.FILE_ATTRIBUTE_NORMAL | windows.FILE_FLAG_OVERLAPPED, - null, - ); - defer os.close(handle); - - try pwriteWindows(handle, contents, 0); -} - -fn writeFileModeThread(allocator: *Allocator, path: []const u8, contents: []const u8, mode: File.Mode) !void { - const path_with_null = try std.cstr.addNullByte(allocator, path); - defer allocator.free(path_with_null); - - var req_node = RequestNode{ - .prev = null, - .next = null, - .data = Request{ - .msg = Request.Msg{ - .WriteFile = Request.Msg.WriteFile{ - .path = path_with_null[0..path.len], - .contents = contents, - .mode = mode, - .result = undefined, - }, - }, - .finish = Request.Finish{ - .TickNode = Loop.NextTickNode{ - .prev = null, - .next = null, - .data = @frame(), - }, - }, - }, - }; - - errdefer global_event_loop.posixFsCancel(&req_node); - - suspend { - global_event_loop.posixFsRequest(&req_node); - } - - return req_node.data.msg.WriteFile.result; -} - -/// The frame resumes when the last data has been confirmed written, but before the file handle -/// is closed. -/// Caller owns returned memory. -pub fn readFile(allocator: *Allocator, file_path: []const u8, max_size: usize) ![]u8 { - var close_op = try CloseOperation.start(allocator); - defer close_op.finish(); - - const fd = try openRead(file_path); - close_op.setHandle(fd); - - var list = std.ArrayList(u8).init(allocator); - defer list.deinit(); - - while (true) { - try list.ensureCapacity(list.len + mem.page_size); - const buf = list.items[list.len..]; - const buf_array = [_][]u8{buf}; - const amt = try preadv(allocator, fd, &buf_array, list.len); - list.len += amt; - if (list.len > max_size) { - return error.FileTooBig; - } - if (amt < buf.len) { - return list.toOwnedSlice(); - } - } -} - -pub const WatchEventId = enum { +const WatchEventId = enum { CloseWrite, Delete, }; @@ -721,7 +26,7 @@ fn hashString(s: []const u16) u32 { return @truncate(u32, std.hash.Wyhash.hash(0, @sliceToBytes(s))); } -pub const WatchEventError = error{ +const WatchEventError = error{ UserResourceLimitReached, SystemResources, AccessDenied, @@ -1366,53 +671,3 @@ fn testFsWatch(allocator: *Allocator) !void { // TODO test deleting the file and then re-adding it. we should get events for both } - -pub const OutStream = struct { - fd: fd_t, - stream: Stream, - allocator: *Allocator, - offset: usize, - - pub const Error = File.WriteError; - pub const Stream = event.io.OutStream(Error); - - pub fn init(allocator: *Allocator, fd: fd_t, offset: usize) OutStream { - return OutStream{ - .fd = fd, - .offset = offset, - .stream = Stream{ .writeFn = writeFn }, - }; - } - - fn writeFn(out_stream: *Stream, bytes: []const u8) Error!void { - const self = @fieldParentPtr(OutStream, "stream", out_stream); - const offset = self.offset; - self.offset += bytes.len; - return pwritev(self.allocator, self.fd, [_][]const u8{bytes}, offset); - } -}; - -pub const InStream = struct { - fd: fd_t, - stream: Stream, - allocator: *Allocator, - offset: usize, - - pub const Error = PReadVError; // TODO make this not have OutOfMemory - pub const Stream = event.io.InStream(Error); - - pub fn init(allocator: *Allocator, fd: fd_t, offset: usize) InStream { - return InStream{ - .fd = fd, - .offset = offset, - .stream = Stream{ .readFn = readFn }, - }; - } - - fn readFn(in_stream: *Stream, bytes: []u8) Error!usize { - const self = @fieldParentPtr(InStream, "stream", in_stream); - const amt = try preadv(self.allocator, self.fd, [_][]u8{bytes}, self.offset); - self.offset += amt; - return amt; - } -}; diff --git a/lib/std/io.zig b/lib/std/io.zig index a0e58c373d..341b73e33c 100644 --- a/lib/std/io.zig +++ b/lib/std/io.zig @@ -47,7 +47,10 @@ fn getStdOutHandle() os.fd_t { } pub fn getStdOut() File { - return File.openHandle(getStdOutHandle()); + return File{ + .handle = getStdOutHandle(), + .io_mode = .blocking, + }; } fn getStdErrHandle() os.fd_t { @@ -63,7 +66,11 @@ fn getStdErrHandle() os.fd_t { } pub fn getStdErr() File { - return File.openHandle(getStdErrHandle()); + return File{ + .handle = getStdErrHandle(), + .io_mode = .blocking, + .async_block_allowed = File.async_block_allowed_yes, + }; } fn getStdInHandle() os.fd_t { @@ -79,7 +86,10 @@ fn getStdInHandle() os.fd_t { } pub fn getStdIn() File { - return File.openHandle(getStdInHandle()); + return File{ + .handle = getStdInHandle(), + .io_mode = .blocking, + }; } pub const SeekableStream = @import("io/seekable_stream.zig").SeekableStream; diff --git a/lib/std/io/out_stream.zig b/lib/std/io/out_stream.zig index 56d3324e6f..7f534865f5 100644 --- a/lib/std/io/out_stream.zig +++ b/lib/std/io/out_stream.zig @@ -9,14 +9,11 @@ pub const stack_size: usize = if (@hasDecl(root, "stack_size_std_io_OutStream")) else default_stack_size; -/// TODO this is not integrated with evented I/O yet. -/// https://github.com/ziglang/zig/issues/3557 pub fn OutStream(comptime WriteError: type) type { return struct { const Self = @This(); pub const Error = WriteError; - // TODO https://github.com/ziglang/zig/issues/3557 - pub const WriteFn = if (std.io.is_async and false) + pub const WriteFn = if (std.io.is_async) async fn (self: *Self, bytes: []const u8) Error!void else fn (self: *Self, bytes: []const u8) Error!void; @@ -24,8 +21,7 @@ pub fn OutStream(comptime WriteError: type) type { writeFn: WriteFn, pub fn write(self: *Self, bytes: []const u8) Error!void { - // TODO https://github.com/ziglang/zig/issues/3557 - if (std.io.is_async and false) { + if (std.io.is_async) { // Let's not be writing 0xaa in safe modes for upwards of 4 MiB for every stream write. @setRuntimeSafety(false); var stack_frame: [stack_size]u8 align(std.Target.stack_align) = undefined; @@ -40,8 +36,8 @@ pub fn OutStream(comptime WriteError: type) type { } pub fn writeByte(self: *Self, byte: u8) Error!void { - const slice = @as(*const [1]u8, &byte)[0..]; - return self.writeFn(self, slice); + const array = [1]u8{byte}; + return self.write(&array); } pub fn writeByteNTimes(self: *Self, byte: u8, n: usize) Error!void { @@ -51,7 +47,7 @@ pub fn OutStream(comptime WriteError: type) type { var remaining: usize = n; while (remaining > 0) { const to_write = std.math.min(remaining, bytes.len); - try self.writeFn(self, bytes[0..to_write]); + try self.write(bytes[0..to_write]); remaining -= to_write; } } @@ -60,32 +56,32 @@ pub fn OutStream(comptime WriteError: type) type { pub fn writeIntNative(self: *Self, comptime T: type, value: T) Error!void { var bytes: [(T.bit_count + 7) / 8]u8 = undefined; mem.writeIntNative(T, &bytes, value); - return self.writeFn(self, &bytes); + return self.write(&bytes); } /// Write a foreign-endian integer. pub fn writeIntForeign(self: *Self, comptime T: type, value: T) Error!void { var bytes: [(T.bit_count + 7) / 8]u8 = undefined; mem.writeIntForeign(T, &bytes, value); - return self.writeFn(self, &bytes); + return self.write(&bytes); } pub fn writeIntLittle(self: *Self, comptime T: type, value: T) Error!void { var bytes: [(T.bit_count + 7) / 8]u8 = undefined; mem.writeIntLittle(T, &bytes, value); - return self.writeFn(self, &bytes); + return self.write(&bytes); } pub fn writeIntBig(self: *Self, comptime T: type, value: T) Error!void { var bytes: [(T.bit_count + 7) / 8]u8 = undefined; mem.writeIntBig(T, &bytes, value); - return self.writeFn(self, &bytes); + return self.write(&bytes); } pub fn writeInt(self: *Self, comptime T: type, value: T, endian: builtin.Endian) Error!void { var bytes: [(T.bit_count + 7) / 8]u8 = undefined; mem.writeInt(T, &bytes, value, endian); - return self.writeFn(self, &bytes); + return self.write(&bytes); } }; } diff --git a/lib/std/linked_list.zig b/lib/std/linked_list.zig index a21c9a83eb..23201dbf94 100644 --- a/lib/std/linked_list.zig +++ b/lib/std/linked_list.zig @@ -18,12 +18,11 @@ pub fn SinglyLinkedList(comptime T: type) type { /// Node inside the linked list wrapping the actual data. pub const Node = struct { - next: ?*Node, + next: ?*Node = null, data: T, pub fn init(data: T) Node { return Node{ - .next = null, .data = data, }; } @@ -196,14 +195,12 @@ pub fn TailQueue(comptime T: type) type { /// Node inside the linked list wrapping the actual data. pub const Node = struct { - prev: ?*Node, - next: ?*Node, + prev: ?*Node = null, + next: ?*Node = null, data: T, pub fn init(data: T) Node { return Node{ - .prev = null, - .next = null, .data = data, }; } diff --git a/lib/std/os.zig b/lib/std/os.zig index 302d8bfce5..94e2ca3869 100644 --- a/lib/std/os.zig +++ b/lib/std/os.zig @@ -169,7 +169,12 @@ fn getRandomBytesDevURandom(buf: []u8) !void { return error.NoDevice; } - const stream = &std.fs.File.openHandle(fd).inStream().stream; + const file = std.fs.File{ + .handle = fd, + .io_mode = .blocking, + .async_block_allowed = std.fs.File.async_block_allowed_yes, + }; + const stream = &file.inStream().stream; stream.readNoEof(buf) catch return error.Unexpected; } @@ -293,7 +298,7 @@ pub const ReadError = error{ /// via the event loop. Otherwise EAGAIN results in error.WouldBlock. pub fn read(fd: fd_t, buf: []u8) ReadError!usize { if (builtin.os == .windows) { - return windows.ReadFile(fd, buf); + return windows.ReadFile(fd, buf, null); } if (builtin.os == .wasi and !builtin.link_libc) { @@ -335,9 +340,37 @@ pub fn read(fd: fd_t, buf: []u8) ReadError!usize { } /// Number of bytes read is returned. Upon reading end-of-file, zero is returned. -/// If the application has a global event loop enabled, EAGAIN is handled -/// via the event loop. Otherwise EAGAIN results in error.WouldBlock. +/// +/// For POSIX systems, if the application has a global event loop enabled, EAGAIN is handled +/// via the event loop. Otherwise EAGAIN results in `error.WouldBlock`. +/// On Windows, if the application has a global event loop enabled, I/O Completion Ports are +/// used to perform the I/O. `error.WouldBlock` is not possible on Windows. +/// +/// This operation is non-atomic on the following systems: +/// * Windows +/// On these systems, the read races with concurrent writes to the same file descriptor. pub fn readv(fd: fd_t, iov: []const iovec) ReadError!usize { + if (builtin.os == .windows) { + // TODO batch these into parallel requests + var off: usize = 0; + var iov_i: usize = 0; + var inner_off: usize = 0; + while (true) { + const v = iov[iov_i]; + const amt_read = try read(fd, v.iov_base[inner_off .. v.iov_len - inner_off]); + off += amt_read; + inner_off += amt_read; + if (inner_off == v.len) { + iov_i += 1; + inner_off = 0; + if (iov_i == iov.len) { + return off; + } + } + if (amt_read == 0) return off; // EOF + } else unreachable; // TODO https://github.com/ziglang/zig/issues/707 + } + while (true) { // TODO handle the case when iov_len is too large and get rid of this @intCast const rc = system.readv(fd, iov.ptr, @intCast(u32, iov.len)); @@ -363,8 +396,56 @@ pub fn readv(fd: fd_t, iov: []const iovec) ReadError!usize { } /// Number of bytes read is returned. Upon reading end-of-file, zero is returned. -/// If the application has a global event loop enabled, EAGAIN is handled -/// via the event loop. Otherwise EAGAIN results in error.WouldBlock. +/// +/// Retries when interrupted by a signal. +/// +/// For POSIX systems, if the application has a global event loop enabled, EAGAIN is handled +/// via the event loop. Otherwise EAGAIN results in `error.WouldBlock`. +/// On Windows, if the application has a global event loop enabled, I/O Completion Ports are +/// used to perform the I/O. `error.WouldBlock` is not possible on Windows. +pub fn pread(fd: fd_t, buf: []u8, offset: u64) ReadError!usize { + if (builtin.os == .windows) { + return windows.ReadFile(fd, buf, offset); + } + + while (true) { + const rc = system.pread(fd, buf.ptr, buf.len, offset); + switch (errno(rc)) { + 0 => return @intCast(usize, rc), + EINTR => continue, + EINVAL => unreachable, + EFAULT => unreachable, + EAGAIN => if (std.event.Loop.instance) |loop| { + loop.waitUntilFdReadable(fd); + continue; + } else { + return error.WouldBlock; + }, + EBADF => unreachable, // Always a race condition. + EIO => return error.InputOutput, + EISDIR => return error.IsDir, + ENOBUFS => return error.SystemResources, + ENOMEM => return error.SystemResources, + ECONNRESET => return error.ConnectionResetByPeer, + else => |err| return unexpectedErrno(err), + } + } + return index; +} + +/// Number of bytes read is returned. Upon reading end-of-file, zero is returned. +/// +/// Retries when interrupted by a signal. +/// +/// For POSIX systems, if the application has a global event loop enabled, EAGAIN is handled +/// via the event loop. Otherwise EAGAIN results in `error.WouldBlock`. +/// On Windows, if the application has a global event loop enabled, I/O Completion Ports are +/// used to perform the I/O. `error.WouldBlock` is not possible on Windows. +/// +/// This operation is non-atomic on the following systems: +/// * Darwin +/// * Windows +/// On these systems, the read races with concurrent writes to the same file descriptor. pub fn preadv(fd: fd_t, iov: []const iovec, offset: u64) ReadError!usize { if (comptime std.Target.current.isDarwin()) { // Darwin does not have preadv but it does have pread. @@ -409,6 +490,28 @@ pub fn preadv(fd: fd_t, iov: []const iovec, offset: u64) ReadError!usize { } } } + + if (builtin.os == .windows) { + // TODO batch these into parallel requests + var off: usize = 0; + var iov_i: usize = 0; + var inner_off: usize = 0; + while (true) { + const v = iov[iov_i]; + const amt_read = try pread(fd, v.iov_base[inner_off .. v.iov_len - inner_off], offset + off); + off += amt_read; + inner_off += amt_read; + if (inner_off == v.len) { + iov_i += 1; + inner_off = 0; + if (iov_i == iov.len) { + return off; + } + } + if (amt_read == 0) return off; // EOF + } else unreachable; // TODO https://github.com/ziglang/zig/issues/707 + } + while (true) { // TODO handle the case when iov_len is too large and get rid of this @intCast const rc = system.preadv(fd, iov.ptr, @intCast(u32, iov.len), offset); @@ -451,11 +554,9 @@ pub const WriteError = error{ /// Write to a file descriptor. Keeps trying if it gets interrupted. /// If the application has a global event loop enabled, EAGAIN is handled /// via the event loop. Otherwise EAGAIN results in error.WouldBlock. -/// TODO evented I/O integration is disabled until -/// https://github.com/ziglang/zig/issues/3557 is solved. pub fn write(fd: fd_t, bytes: []const u8) WriteError!void { if (builtin.os == .windows) { - return windows.WriteFile(fd, bytes); + return windows.WriteFile(fd, bytes, null); } if (builtin.os == .wasi and !builtin.link_libc) { @@ -488,14 +589,12 @@ pub fn write(fd: fd_t, bytes: []const u8) WriteError!void { EINTR => continue, EINVAL => unreachable, EFAULT => unreachable, - // TODO https://github.com/ziglang/zig/issues/3557 - EAGAIN => return error.WouldBlock, - //EAGAIN => if (std.event.Loop.instance) |loop| { - // loop.waitUntilFdWritable(fd); - // continue; - //} else { - // return error.WouldBlock; - //}, + EAGAIN => if (std.event.Loop.instance) |loop| { + loop.waitUntilFdWritable(fd); + continue; + } else { + return error.WouldBlock; + }, EBADF => unreachable, // Always a race condition. EDESTADDRREQ => unreachable, // `connect` was never called. EDQUOT => return error.DiskQuota, @@ -540,8 +639,57 @@ pub fn writev(fd: fd_t, iov: []const iovec_const) WriteError!void { } } +/// Write to a file descriptor, with a position offset. +/// +/// Retries when interrupted by a signal. +/// +/// For POSIX systems, if the application has a global event loop enabled, EAGAIN is handled +/// via the event loop. Otherwise EAGAIN results in `error.WouldBlock`. +/// On Windows, if the application has a global event loop enabled, I/O Completion Ports are +/// used to perform the I/O. `error.WouldBlock` is not possible on Windows. +pub fn pwrite(fd: fd_t, bytes: []const u8, offset: u64) WriteError!void { + if (comptime std.Target.current.isWindows()) { + return windows.WriteFile(fd, bytes, offset); + } + + while (true) { + const rc = system.pwrite(fd, bytes.ptr, bytes.len, offset); + switch (errno(rc)) { + 0 => return, + EINTR => continue, + EINVAL => unreachable, + EFAULT => unreachable, + EAGAIN => if (std.event.Loop.instance) |loop| { + loop.waitUntilFdWritable(fd); + continue; + } else { + return error.WouldBlock; + }, + EBADF => unreachable, // Always a race condition. + EDESTADDRREQ => unreachable, // `connect` was never called. + EDQUOT => return error.DiskQuota, + EFBIG => return error.FileTooBig, + EIO => return error.InputOutput, + ENOSPC => return error.NoSpaceLeft, + EPERM => return error.AccessDenied, + EPIPE => return error.BrokenPipe, + else => |err| return unexpectedErrno(err), + } + } +} + /// Write multiple buffers to a file descriptor, with a position offset. -/// Keeps trying if it gets interrupted. +/// +/// Retries when interrupted by a signal. +/// +/// If the application has a global event loop enabled, EAGAIN is handled +/// via the event loop. Otherwise EAGAIN results in `error.WouldBlock`. +/// +/// This operation is non-atomic on the following systems: +/// * Darwin +/// * Windows +/// On these systems, the write races with concurrent writes to the same file descriptor, and +/// the file can be in a partially written state when an error occurs. pub fn pwritev(fd: fd_t, iov: []const iovec_const, offset: u64) WriteError!void { if (comptime std.Target.current.isDarwin()) { // Darwin does not have pwritev but it does have pwrite. @@ -589,6 +737,15 @@ pub fn pwritev(fd: fd_t, iov: []const iovec_const, offset: u64) WriteError!void } } + if (comptime std.Target.current.isWindows()) { + var off = offset; + for (iov) |item| { + try pwrite(fd, item.iov_base[0..item.iov_len], off); + off += buf.len; + } + return; + } + while (true) { // TODO handle the case when iov_len is too large and get rid of this @intCast const rc = system.pwritev(fd, iov.ptr, @intCast(u32, iov.len), offset); @@ -694,7 +851,7 @@ pub fn openC(file_path: [*:0]const u8, flags: u32, perm: usize) OpenError!fd_t { /// Open and possibly create a file. Keeps trying if it gets interrupted. /// `file_path` is relative to the open directory handle `dir_fd`. /// See also `openatC`. -pub fn openat(dir_fd: fd_t, file_path: []const u8, flags: u32, mode: usize) OpenError!fd_t { +pub fn openat(dir_fd: fd_t, file_path: []const u8, flags: u32, mode: mode_t) OpenError!fd_t { const file_path_c = try toPosixPath(file_path); return openatC(dir_fd, &file_path_c, flags, mode); } @@ -702,7 +859,7 @@ pub fn openat(dir_fd: fd_t, file_path: []const u8, flags: u32, mode: usize) Open /// Open and possibly create a file. Keeps trying if it gets interrupted. /// `file_path` is relative to the open directory handle `dir_fd`. /// See also `openat`. -pub fn openatC(dir_fd: fd_t, file_path: [*:0]const u8, flags: u32, mode: usize) OpenError!fd_t { +pub fn openatC(dir_fd: fd_t, file_path: [*:0]const u8, flags: u32, mode: mode_t) OpenError!fd_t { while (true) { const rc = system.openat(dir_fd, file_path, flags, mode); switch (errno(rc)) { @@ -3328,9 +3485,7 @@ pub fn getrusage(who: i32) rusage { } } -pub const TermiosGetError = error{ - NotATerminal, -} || UnexpectedError; +pub const TermiosGetError = error{NotATerminal} || UnexpectedError; pub fn tcgetattr(handle: fd_t) TermiosGetError!termios { var term: termios = undefined; @@ -3342,9 +3497,7 @@ pub fn tcgetattr(handle: fd_t) TermiosGetError!termios { } } -pub const TermiosSetError = TermiosGetError || error{ - ProcessOrphaned, -}; +pub const TermiosSetError = TermiosGetError || error{ProcessOrphaned}; pub fn tcsetattr(handle: fd_t, optional_action: TCSA, termios_p: termios) TermiosSetError!void { while (true) { diff --git a/lib/std/os/bits/darwin.zig b/lib/std/os/bits/darwin.zig index d4340443e5..1070f68839 100644 --- a/lib/std/os/bits/darwin.zig +++ b/lib/std/os/bits/darwin.zig @@ -4,6 +4,7 @@ const maxInt = std.math.maxInt; pub const fd_t = c_int; pub const pid_t = c_int; +pub const mode_t = c_uint; pub const in_port_t = u16; pub const sa_family_t = u8; diff --git a/lib/std/os/bits/dragonfly.zig b/lib/std/os/bits/dragonfly.zig index 750ca9dff0..c6c23affa7 100644 --- a/lib/std/os/bits/dragonfly.zig +++ b/lib/std/os/bits/dragonfly.zig @@ -7,6 +7,7 @@ pub fn S_ISCHR(m: u32) bool { pub const fd_t = c_int; pub const pid_t = c_int; pub const off_t = c_long; +pub const mode_t = c_uint; pub const ENOTSUP = EOPNOTSUPP; pub const EWOULDBLOCK = EAGAIN; diff --git a/lib/std/os/bits/freebsd.zig b/lib/std/os/bits/freebsd.zig index 66540433e4..b7d14934f5 100644 --- a/lib/std/os/bits/freebsd.zig +++ b/lib/std/os/bits/freebsd.zig @@ -3,6 +3,7 @@ const maxInt = std.math.maxInt; pub const fd_t = c_int; pub const pid_t = c_int; +pub const mode_t = c_uint; pub const socklen_t = u32; diff --git a/lib/std/os/bits/linux/x86_64.zig b/lib/std/os/bits/linux/x86_64.zig index da3caf2c88..608f74e2d3 100644 --- a/lib/std/os/bits/linux/x86_64.zig +++ b/lib/std/os/bits/linux/x86_64.zig @@ -12,6 +12,8 @@ const socklen_t = linux.socklen_t; const iovec = linux.iovec; const iovec_const = linux.iovec_const; +pub const mode_t = usize; + pub const SYS_read = 0; pub const SYS_write = 1; pub const SYS_open = 2; diff --git a/lib/std/os/bits/netbsd.zig b/lib/std/os/bits/netbsd.zig index 2f2494d4ce..89e0998d6d 100644 --- a/lib/std/os/bits/netbsd.zig +++ b/lib/std/os/bits/netbsd.zig @@ -3,6 +3,7 @@ const maxInt = std.math.maxInt; pub const fd_t = c_int; pub const pid_t = c_int; +pub const mode_t = c_uint; /// Renamed from `kevent` to `Kevent` to avoid conflict with function name. pub const Kevent = extern struct { diff --git a/lib/std/os/bits/wasi.zig b/lib/std/os/bits/wasi.zig index 139418ded2..f56e53504e 100644 --- a/lib/std/os/bits/wasi.zig +++ b/lib/std/os/bits/wasi.zig @@ -130,6 +130,7 @@ pub const EVENTTYPE_FD_WRITE: eventtype_t = 2; pub const exitcode_t = u32; pub const fd_t = u32; +pub const mode_t = u32; pub const fdflags_t = u16; pub const FDFLAG_APPEND: fdflags_t = 0x0001; diff --git a/lib/std/os/bits/windows.zig b/lib/std/os/bits/windows.zig index 2998049577..ba2725e0a9 100644 --- a/lib/std/os/bits/windows.zig +++ b/lib/std/os/bits/windows.zig @@ -5,6 +5,7 @@ const ws2_32 = @import("../windows/ws2_32.zig"); pub const fd_t = HANDLE; pub const pid_t = HANDLE; +pub const mode_t = u0; pub const PATH_MAX = 260; diff --git a/lib/std/os/windows.zig b/lib/std/os/windows.zig index b48d0e50b8..cc0d446b12 100644 --- a/lib/std/os/windows.zig +++ b/lib/std/os/windows.zig @@ -344,24 +344,77 @@ pub fn FindClose(hFindFile: HANDLE) void { assert(kernel32.FindClose(hFindFile) != 0); } -pub const ReadFileError = error{Unexpected}; +pub const ReadFileError = error{ + OperationAborted, + BrokenPipe, + Unexpected, +}; -pub fn ReadFile(in_hFile: HANDLE, buffer: []u8) ReadFileError!usize { - var index: usize = 0; - while (index < buffer.len) { - const want_read_count = @intCast(DWORD, math.min(@as(DWORD, maxInt(DWORD)), buffer.len - index)); - var amt_read: DWORD = undefined; - if (kernel32.ReadFile(in_hFile, buffer.ptr + index, want_read_count, &amt_read, null) == 0) { - switch (kernel32.GetLastError()) { - .OPERATION_ABORTED => continue, - .BROKEN_PIPE => return index, - else => |err| return unexpectedError(err), +/// If buffer's length exceeds what a Windows DWORD integer can hold, it will be broken into +/// multiple non-atomic reads. +pub fn ReadFile(in_hFile: HANDLE, buffer: []u8, offset: ?u64) ReadFileError!usize { + if (std.event.Loop.instance) |loop| { + // TODO support async ReadFile with no offset + const off = offset.?; + var resume_node = std.event.Loop.ResumeNode.Basic{ + .base = .{ + .id = .Basic, + .handle = @frame(), + .overlapped = OVERLAPPED{ + .Internal = 0, + .InternalHigh = 0, + .Offset = @truncate(u32, off), + .OffsetHigh = @truncate(u32, off >> 32), + .hEvent = null, + }, + }, + }; + // TODO only call create io completion port once per fd + _ = windows.CreateIoCompletionPort(fd, loop.os_data.io_port, undefined, undefined) catch undefined; + loop.beginOneEvent(); + suspend { + // TODO handle buffer bigger than DWORD can hold + _ = windows.kernel32.ReadFile(fd, buffer.ptr, @intCast(windows.DWORD, buffer.len), null, &resume_node.base.overlapped); + } + var bytes_transferred: windows.DWORD = undefined; + if (windows.kernel32.GetOverlappedResult(fd, &resume_node.base.overlapped, &bytes_transferred, windows.FALSE) == 0) { + switch (windows.kernel32.GetLastError()) { + .IO_PENDING => unreachable, + .OPERATION_ABORTED => return error.OperationAborted, + .BROKEN_PIPE => return error.BrokenPipe, + .HANDLE_EOF => return @as(usize, bytes_transferred), + else => |err| return windows.unexpectedError(err), } } - if (amt_read == 0) return index; - index += amt_read; + return @as(usize, bytes_transferred); + } else { + var index: usize = 0; + while (index < buffer.len) { + const want_read_count = @intCast(DWORD, math.min(@as(DWORD, maxInt(DWORD)), buffer.len - index)); + var amt_read: DWORD = undefined; + var overlapped_data: OVERLAPPED = undefined; + const overlapped: ?*OVERLAPPED = if (offset) |off| blk: { + overlapped_data = .{ + .Internal = 0, + .InternalHigh = 0, + .Offset = @truncate(u32, off + index), + .OffsetHigh = @truncate(u32, (off + index) >> 32), + .hEvent = null, + }; + break :blk &overlapped_data; + } else null; + if (kernel32.ReadFile(in_hFile, buffer.ptr + index, want_read_count, &amt_read, overlapped) == 0) { + switch (kernel32.GetLastError()) { + .OPERATION_ABORTED => continue, + .BROKEN_PIPE => return index, + else => |err| return unexpectedError(err), + } + } + if (amt_read == 0) return index; + index += amt_read; + } + return index; } - return index; } pub const WriteFileError = error{ @@ -371,20 +424,66 @@ pub const WriteFileError = error{ Unexpected, }; -/// This function is for blocking file descriptors only. For non-blocking, see -/// `WriteFileAsync`. -pub fn WriteFile(handle: HANDLE, bytes: []const u8) WriteFileError!void { - var bytes_written: DWORD = undefined; - // TODO replace this @intCast with a loop that writes all the bytes - if (kernel32.WriteFile(handle, bytes.ptr, @intCast(u32, bytes.len), &bytes_written, null) == 0) { - switch (kernel32.GetLastError()) { - .INVALID_USER_BUFFER => return error.SystemResources, - .NOT_ENOUGH_MEMORY => return error.SystemResources, - .OPERATION_ABORTED => return error.OperationAborted, - .NOT_ENOUGH_QUOTA => return error.SystemResources, - .IO_PENDING => unreachable, // this function is for blocking files only - .BROKEN_PIPE => return error.BrokenPipe, - else => |err| return unexpectedError(err), +pub fn WriteFile(handle: HANDLE, bytes: []const u8, offset: ?u64) WriteFileError!void { + if (std.event.Loop.instance) |loop| { + // TODO support async WriteFile with no offset + const off = offset.?; + var resume_node = std.event.Loop.ResumeNode.Basic{ + .base = .{ + .id = .Basic, + .handle = @frame(), + .overlapped = OVERLAPPED{ + .Internal = 0, + .InternalHigh = 0, + .Offset = @truncate(u32, off), + .OffsetHigh = @truncate(u32, off >> 32), + .hEvent = null, + }, + }, + }; + // TODO only call create io completion port once per fd + _ = CreateIoCompletionPort(fd, loop.os_data.io_port, undefined, undefined); + loop.beginOneEvent(); + suspend { + // TODO replace this @intCast with a loop that writes all the bytes + _ = kernel32.WriteFile(fd, bytes.ptr, @intCast(windows.DWORD, bytes.len), null, &resume_node.base.overlapped); + } + var bytes_transferred: windows.DWORD = undefined; + if (kernel32.GetOverlappedResult(fd, &resume_node.base.overlapped, &bytes_transferred, FALSE) == 0) { + switch (kernel32.GetLastError()) { + .IO_PENDING => unreachable, + .INVALID_USER_BUFFER => return error.SystemResources, + .NOT_ENOUGH_MEMORY => return error.SystemResources, + .OPERATION_ABORTED => return error.OperationAborted, + .NOT_ENOUGH_QUOTA => return error.SystemResources, + .BROKEN_PIPE => return error.BrokenPipe, + else => |err| return windows.unexpectedError(err), + } + } + } else { + var bytes_written: DWORD = undefined; + var overlapped_data: OVERLAPPED = undefined; + const overlapped: ?*OVERLAPPED = if (offset) |off| blk: { + overlapped_data = .{ + .Internal = 0, + .InternalHigh = 0, + .Offset = @truncate(u32, off), + .OffsetHigh = @truncate(u32, off >> 32), + .hEvent = null, + }; + break :blk &overlapped_data; + } else null; + // TODO replace this @intCast with a loop that writes all the bytes + if (kernel32.WriteFile(handle, bytes.ptr, @intCast(u32, bytes.len), &bytes_written, overlapped) == 0) { + switch (kernel32.GetLastError()) { + .INVALID_USER_BUFFER => return error.SystemResources, + .NOT_ENOUGH_MEMORY => return error.SystemResources, + .OPERATION_ABORTED => return error.OperationAborted, + .NOT_ENOUGH_QUOTA => return error.SystemResources, + .IO_PENDING => unreachable, // this function is for blocking files only + .BROKEN_PIPE => return error.BrokenPipe, + else => |err| return unexpectedError(err), + } } } } diff --git a/lib/std/special/test_runner.zig b/lib/std/special/test_runner.zig index 4ba2e3aa7e..6dd208e3b4 100644 --- a/lib/std/special/test_runner.zig +++ b/lib/std/special/test_runner.zig @@ -2,7 +2,7 @@ const std = @import("std"); const io = std.io; const builtin = @import("builtin"); -pub const io_mode = builtin.test_io_mode; +pub const io_mode: io.Mode = builtin.test_io_mode; pub fn main() anyerror!void { const test_fn_list = builtin.test_functions; @@ -14,6 +14,11 @@ pub fn main() anyerror!void { error.TimerUnsupported => @panic("timer unsupported"), }; + var async_frame_buffer: []align(std.Target.stack_align) u8 = undefined; + // TODO this is on the next line (using `undefined` above) because otherwise zig incorrectly + // ignores the alignment of the slice. + async_frame_buffer = &[_]u8{}; + for (test_fn_list) |test_fn, i| { std.testing.base_allocator_instance.reset(); @@ -23,7 +28,24 @@ pub fn main() anyerror!void { if (progress.terminal == null) { std.debug.warn("{}/{} {}...", .{ i + 1, test_fn_list.len, test_fn.name }); } - if (test_fn.func()) |_| { + const result = if (test_fn.async_frame_size) |size| switch (io_mode) { + .evented => blk: { + if (async_frame_buffer.len < size) { + std.heap.page_allocator.free(async_frame_buffer); + async_frame_buffer = try std.heap.page_allocator.alignedAlloc(u8, std.Target.stack_align, size); + } + const casted_fn = @ptrCast(async fn () anyerror!void, test_fn.func); + break :blk await @asyncCall(async_frame_buffer, {}, casted_fn); + }, + .blocking => { + skip_count += 1; + test_node.end(); + progress.log("{}...SKIP (async test)\n", .{test_fn.name}); + if (progress.terminal == null) std.debug.warn("SKIP (async test)\n", .{}); + continue; + }, + } else test_fn.func(); + if (result) |_| { ok_count += 1; test_node.end(); std.testing.allocator_instance.validate() catch |err| switch (err) { diff --git a/src/analyze.cpp b/src/analyze.cpp index c15b6b3d57..155f017da9 100644 --- a/src/analyze.cpp +++ b/src/analyze.cpp @@ -6144,6 +6144,15 @@ static bool scope_needs_spill(Scope *scope) { zig_unreachable(); } +static ZigType *resolve_type_isf(ZigType *ty) { + if (ty->id != ZigTypeIdPointer) return ty; + InferredStructField *isf = ty->data.pointer.inferred_struct_field; + if (isf == nullptr) return ty; + TypeStructField *field = find_struct_type_field(isf->inferred_struct_type, isf->field_name); + assert(field != nullptr); + return field->type_entry; +} + static Error resolve_async_frame(CodeGen *g, ZigType *frame_type) { Error err; @@ -6245,6 +6254,9 @@ static Error resolve_async_frame(CodeGen *g, ZigType *frame_type) { } ZigFn *callee = call->fn_entry; if (callee == nullptr) { + if (call->fn_ref->value->type->data.fn.fn_type_id.cc != CallingConventionAsync) { + continue; + } add_node_error(g, call->base.base.source_node, buf_sprintf("function is not comptime-known; @asyncCall required")); return ErrorSemanticAnalyzeFail; @@ -6402,7 +6414,7 @@ static Error resolve_async_frame(CodeGen *g, ZigType *frame_type) { } else { param_name = buf_sprintf("@arg%" ZIG_PRI_usize, arg_i); } - ZigType *param_type = param_info->type; + ZigType *param_type = resolve_type_isf(param_info->type); if ((err = type_resolve(g, param_type, ResolveStatusSizeKnown))) { return err; } @@ -6421,7 +6433,7 @@ static Error resolve_async_frame(CodeGen *g, ZigType *frame_type) { instruction->field_index = SIZE_MAX; ZigType *ptr_type = instruction->base.value->type; assert(ptr_type->id == ZigTypeIdPointer); - ZigType *child_type = ptr_type->data.pointer.child_type; + ZigType *child_type = resolve_type_isf(ptr_type->data.pointer.child_type); if (!type_has_bits(child_type)) continue; if (instruction->base.base.ref_count == 0) @@ -6448,8 +6460,6 @@ static Error resolve_async_frame(CodeGen *g, ZigType *frame_type) { } instruction->field_index = fields.length; - src_assert(child_type->id != ZigTypeIdPointer || child_type->data.pointer.inferred_struct_field == nullptr, - instruction->base.base.source_node); fields.append({name, child_type, instruction->align}); } @@ -8251,6 +8261,8 @@ static void resolve_llvm_types_struct(CodeGen *g, ZigType *struct_type, ResolveS size_t debug_field_index = 0; for (size_t i = 0; i < field_count; i += 1) { TypeStructField *field = struct_type->data.structure.fields[i]; + //fprintf(stderr, "%s at gen index %zu\n", buf_ptr(field->name), field->gen_index); + size_t gen_field_index = field->gen_index; if (gen_field_index == SIZE_MAX) { continue; diff --git a/src/codegen.cpp b/src/codegen.cpp index f71617901b..573aade2f0 100644 --- a/src/codegen.cpp +++ b/src/codegen.cpp @@ -362,14 +362,16 @@ static uint32_t frame_index_arg(CodeGen *g, ZigType *return_type) { } // label (grep this): [fn_frame_struct_layout] -static uint32_t frame_index_trace_stack(CodeGen *g, FnTypeId *fn_type_id) { - uint32_t result = frame_index_arg(g, fn_type_id->return_type); - for (size_t i = 0; i < fn_type_id->param_count; i += 1) { - if (type_has_bits(fn_type_id->param_info->type)) { - result += 1; - } +static uint32_t frame_index_trace_stack(CodeGen *g, ZigFn *fn) { + size_t field_index = 6; + bool have_stack_trace = codegen_fn_has_err_ret_tracing_arg(g, fn->type_entry->data.fn.fn_type_id.return_type); + if (have_stack_trace) { + field_index += 2; } - return result; + field_index += fn->type_entry->data.fn.fn_type_id.param_count; + ZigType *locals_struct = fn->frame_type->data.frame.locals_struct; + TypeStructField *field = locals_struct->data.structure.fields[field_index]; + return field->gen_index; } @@ -7742,7 +7744,7 @@ static void do_code_gen(CodeGen *g) { } uint32_t trace_field_index_stack = UINT32_MAX; if (codegen_fn_has_err_ret_tracing_stack(g, fn_table_entry, true)) { - trace_field_index_stack = frame_index_trace_stack(g, fn_type_id); + trace_field_index_stack = frame_index_trace_stack(g, fn_table_entry); g->cur_err_ret_trace_val_stack = LLVMBuildStructGEP(g->builder, g->cur_frame_ptr, trace_field_index_stack, ""); } @@ -9396,22 +9398,13 @@ static void update_test_functions_builtin_decl(CodeGen *g) { for (size_t i = 0; i < g->test_fns.length; i += 1) { ZigFn *test_fn_entry = g->test_fns.at(i); - if (fn_is_async(test_fn_entry)) { - ErrorMsg *msg = add_node_error(g, test_fn_entry->proto_node, - buf_create_from_str("test functions cannot be async")); - add_error_note(g, msg, test_fn_entry->proto_node, - buf_sprintf("this restriction may be lifted in the future. See https://github.com/ziglang/zig/issues/3117 for more details")); - add_async_error_notes(g, msg, test_fn_entry); - continue; - } - ZigValue *this_val = &test_fn_array->data.x_array.data.s_none.elements[i]; this_val->special = ConstValSpecialStatic; this_val->type = struct_type; this_val->parent.id = ConstParentIdArray; this_val->parent.data.p_array.array_val = test_fn_array; this_val->parent.data.p_array.elem_index = i; - this_val->data.x_struct.fields = alloc_const_vals_ptrs(2); + this_val->data.x_struct.fields = alloc_const_vals_ptrs(3); ZigValue *name_field = this_val->data.x_struct.fields[0]; ZigValue *name_array_val = create_const_str_lit(g, &test_fn_entry->symbol_name)->data.x_ptr.data.ref.pointee; @@ -9423,6 +9416,19 @@ static void update_test_functions_builtin_decl(CodeGen *g) { fn_field->data.x_ptr.special = ConstPtrSpecialFunction; fn_field->data.x_ptr.mut = ConstPtrMutComptimeConst; fn_field->data.x_ptr.data.fn.fn_entry = test_fn_entry; + + ZigValue *frame_size_field = this_val->data.x_struct.fields[2]; + frame_size_field->type = get_optional_type(g, g->builtin_types.entry_usize); + frame_size_field->special = ConstValSpecialStatic; + frame_size_field->data.x_optional = nullptr; + + if (fn_is_async(test_fn_entry)) { + frame_size_field->data.x_optional = create_const_vals(1); + frame_size_field->data.x_optional->special = ConstValSpecialStatic; + frame_size_field->data.x_optional->type = g->builtin_types.entry_usize; + bigint_init_unsigned(&frame_size_field->data.x_optional->data.x_bigint, + test_fn_entry->frame_type->abi_size); + } } report_errors_and_maybe_exit(g);