diff --git a/lib/std/Io.zig b/lib/std/Io.zig
index 256e113ba0..219c0daee8 100644
--- a/lib/std/Io.zig
+++ b/lib/std/Io.zig
@@ -579,7 +579,6 @@ pub const VTable = struct {
         context_alignment: std.mem.Alignment,
         start: *const fn (context: *const anyopaque, result: *anyopaque) void,
     ) ?*AnyFuture,
-
     /// This function is only called when `async` returns a non-null value.
     ///
     /// Thread-safe.
@@ -609,7 +608,6 @@ pub const VTable = struct {
         result: []u8,
         result_alignment: std.mem.Alignment,
     ) void,
-
     /// Returns whether the current thread of execution is known to have
     /// been requested to cancel.
     ///
@@ -619,8 +617,11 @@ pub const VTable = struct {
    createFile: *const fn (?*anyopaque, dir: fs.Dir, sub_path: []const u8, flags: fs.File.CreateFlags) FileOpenError!fs.File,
    openFile: *const fn (?*anyopaque, dir: fs.Dir, sub_path: []const u8, flags: fs.File.OpenFlags) FileOpenError!fs.File,
    closeFile: *const fn (?*anyopaque, fs.File) void,
-    read: *const fn (?*anyopaque, file: fs.File, buffer: []u8) FileReadError!usize,
-    write: *const fn (?*anyopaque, file: fs.File, buffer: []const u8) FileWriteError!usize,
+    pread: *const fn (?*anyopaque, file: fs.File, buffer: []u8, offset: std.posix.off_t) FilePReadError!usize,
+    pwrite: *const fn (?*anyopaque, file: fs.File, buffer: []const u8, offset: std.posix.off_t) FilePWriteError!usize,
+
+    now: *const fn (?*anyopaque, clockid: std.posix.clockid_t) ClockGetTimeError!Timestamp,
+    sleep: *const fn (?*anyopaque, clockid: std.posix.clockid_t, deadline: Deadline) SleepError!void,
 };

 pub const OpenFlags = fs.File.OpenFlags;
@@ -628,7 +629,27 @@ pub const CreateFlags = fs.File.CreateFlags;

 pub const FileOpenError = fs.File.OpenError || error{AsyncCancel};
 pub const FileReadError = fs.File.ReadError || error{AsyncCancel};
+pub const FilePReadError = fs.File.PReadError || error{AsyncCancel};
 pub const FileWriteError = fs.File.WriteError || error{AsyncCancel};
+pub const FilePWriteError = fs.File.PWriteError || error{AsyncCancel};
+
+pub const Timestamp = enum(i96) {
+    _,
+
+    pub fn durationTo(from: Timestamp, to: Timestamp) i96 {
+        return @intFromEnum(to) - @intFromEnum(from);
+    }
+
+    pub fn addDuration(from: Timestamp, duration: i96) Timestamp {
+        return @enumFromInt(@intFromEnum(from) + duration);
+    }
+};
+pub const Deadline = union(enum) {
+    nanoseconds: i96,
+    timestamp: Timestamp,
+};
+pub const ClockGetTimeError = std.posix.ClockGetTimeError || error{AsyncCancel};
+pub const SleepError = error{ UnsupportedClock, Unexpected, AsyncCancel };

 pub const AnyFuture = opaque {};

@@ -694,11 +715,19 @@ pub fn closeFile(io: Io, file: fs.File) void {
 }

 pub fn read(io: Io, file: fs.File, buffer: []u8) FileReadError!usize {
-    return io.vtable.read(io.userdata, file, buffer);
+    return @errorCast(io.pread(file, buffer, -1));
+}
+
+pub fn pread(io: Io, file: fs.File, buffer: []u8, offset: std.posix.off_t) FilePReadError!usize {
+    return io.vtable.pread(io.userdata, file, buffer, offset);
 }

 pub fn write(io: Io, file: fs.File, buffer: []const u8) FileWriteError!usize {
-    return io.vtable.write(io.userdata, file, buffer);
+    return @errorCast(io.pwrite(file, buffer, -1));
+}
+
+pub fn pwrite(io: Io, file: fs.File, buffer: []const u8, offset: std.posix.off_t) FilePWriteError!usize {
+    return io.vtable.pwrite(io.userdata, file, buffer, offset);
 }

 pub fn writeAll(io: Io, file: fs.File, bytes: []const u8) FileWriteError!void {
@@ -717,3 +746,11 @@ pub fn readAll(io: Io, file: fs.File, buffer: []u8) FileReadError!usize {
     }
     return index;
 }
+
+pub fn now(io: Io, clockid: std.posix.clockid_t) ClockGetTimeError!Timestamp {
+    return io.vtable.now(io.userdata, clockid);
+}
+
+pub fn sleep(io: Io, clockid: std.posix.clockid_t, deadline: Deadline) SleepError!void {
+    return io.vtable.sleep(io.userdata, clockid, deadline);
+}
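
Note: `Io.read`/`Io.write` are now thin wrappers that forward to `pread`/`pwrite` with an offset of -1, which implementations interpret as "use the file's current position". A rough caller-side sketch (the `io` and `file` values are hypothetical, assumed to come from an `Io` implementation and an earlier `openFile`):

    var buf: [4096]u8 = undefined;
    // Sequential read: offset -1 under the hood, advances the file position.
    const n = try io.read(file, &buf);
    // Positional read of the same bytes: leaves the file position untouched.
    _ = try io.pread(file, buf[0..n], 0);
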
diff --git a/lib/std/Io/EventLoop.zig b/lib/std/Io/EventLoop.zig
index 5de161d9f9..a24d5173e2 100644
--- a/lib/std/Io/EventLoop.zig
+++ b/lib/std/Io/EventLoop.zig
@@ -31,10 +31,12 @@ const Thread = struct {
     idle_search_index: u32,
     steal_ready_search_index: u32,

-    threadlocal var index: u32 = undefined;
+    const canceling: ?*Thread = @ptrFromInt(@alignOf(Thread));

-    fn current(el: *EventLoop) *Thread {
-        return &el.threads.allocated[index];
+    threadlocal var self: *Thread = undefined;
+
+    fn current() *Thread {
+        return self;
     }

     fn currentFiber(thread: *Thread) *Fiber {
@@ -52,10 +54,9 @@ const Fiber = struct {
     context: Context,
     awaiter: ?*Fiber,
     queue_next: ?*Fiber,
-    can_cancel: bool,
-    canceled: bool,
+    cancel_thread: ?*Thread,

-    const finished: ?*Fiber = @ptrFromInt(std.mem.alignBackward(usize, std.math.maxInt(usize), @alignOf(Fiber)));
+    const finished: ?*Fiber = @ptrFromInt(@alignOf(Thread));

     const max_result_align: Alignment = .@"16";
     const max_result_size = max_result_align.forward(64);
@@ -75,7 +76,7 @@
     );

     fn allocate(el: *EventLoop) error{OutOfMemory}!*Fiber {
-        const thread: *Thread = .current(el);
+        const thread: *Thread = .current();
         if (thread.free_queue) |free_fiber| {
             thread.free_queue = free_fiber.queue_next;
             free_fiber.queue_next = null;
@@ -101,6 +102,40 @@
         return @ptrFromInt(alignment.forward(@intFromPtr(f) + @sizeOf(Fiber)));
     }

+    fn enterCancelRegion(fiber: *Fiber, thread: *Thread) error{AsyncCancel}!void {
+        if (@cmpxchgStrong(
+            ?*Thread,
+            &fiber.cancel_thread,
+            null,
+            thread,
+            .acq_rel,
+            .acquire,
+        )) |cancel_thread| {
+            assert(cancel_thread == Thread.canceling);
+            return error.AsyncCancel;
+        }
+    }
+
+    fn exitCancelRegion(fiber: *Fiber, thread: *Thread) void {
+        if (@cmpxchgStrong(
+            ?*Thread,
+            &fiber.cancel_thread,
+            thread,
+            null,
+            .acq_rel,
+            .acquire,
+        )) |cancel_thread| assert(cancel_thread == Thread.canceling);
+    }
+
+    fn recycle(fiber: *Fiber) void {
+        const thread: *Thread = .current();
+        std.log.debug("recycling {*}", .{fiber});
+        assert(fiber.queue_next == null);
+        @memset(fiber.allocatedSlice(), undefined);
+        fiber.queue_next = thread.free_queue;
+        thread.free_queue = fiber;
+    }
+
     const Queue = struct { head: *Fiber, tail: *Fiber };
 };
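
Note: `cancel_thread` collapses the old `can_cancel`/`canceled` pair into a single atomically updated word with three states: `null` (not blocked in a cancelable operation), a real `*Thread` (blocked in io_uring on that thread), or the sentinel `Thread.canceling`. A simplified model of the transitions, using a plain enum in place of the pointer values (an illustrative sketch only, not the actual code):

    const std = @import("std");
    const assert = std.debug.assert;

    // Stands in for Fiber.cancel_thread: null / *Thread / Thread.canceling.
    const State = enum(u8) { idle, blocked, canceling };
    var cancel_state = std.atomic.Value(State).init(.idle);

    // enterCancelRegion: idle -> blocked, unless a cancel already landed.
    fn enter() error{AsyncCancel}!void {
        if (cancel_state.cmpxchgStrong(.idle, .blocked, .acq_rel, .acquire)) |seen| {
            assert(seen == .canceling);
            return error.AsyncCancel;
        }
    }

    // exitCancelRegion: blocked -> idle; a racing cancel leaves .canceling behind.
    fn exit() void {
        if (cancel_state.cmpxchgStrong(.blocked, .idle, .acq_rel, .acquire)) |seen| {
            assert(seen == .canceling);
        }
    }
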
@@ -110,13 +145,18 @@ pub fn io(el: *EventLoop) Io {
         .vtable = &.{
             .@"async" = @"async",
             .@"await" = @"await",
+            .cancel = cancel,
             .cancelRequested = cancelRequested,
+
             .createFile = createFile,
             .openFile = openFile,
             .closeFile = closeFile,
-            .read = read,
-            .write = write,
+            .pread = pread,
+            .pwrite = pwrite,
+
+            .now = now,
+            .sleep = sleep,
         },
     };
 }
@@ -133,8 +173,7 @@ pub fn init(el: *EventLoop, gpa: Allocator) !void {
             .context = undefined,
             .awaiter = null,
             .queue_next = null,
-            .can_cancel = false,
-            .canceled = false,
+            .cancel_thread = null,
         },
         .threads = .{
             .allocated = @ptrCast(allocated_slice[0..threads_size]),
@@ -142,8 +181,8 @@
             .active = 1,
         },
     };
-    Thread.index = 0;
     const main_thread = &el.threads.allocated[0];
+    Thread.self = main_thread;
     const idle_stack_end: [*]usize = @alignCast(@ptrCast(allocated_slice[idle_stack_end_offset..].ptr));
     (idle_stack_end - 1)[0..1].* = .{@intFromPtr(el)};
     main_thread.* = .{
@@ -168,24 +207,22 @@ pub fn init(el: *EventLoop, gpa: Allocator) !void {
 pub fn deinit(el: *EventLoop) void {
     const active_threads = @atomicLoad(u32, &el.threads.active, .acquire);
     for (el.threads.allocated[0..active_threads]) |*thread|
-        assert(@atomicLoad(?*Fiber, &thread.ready_queue, .unordered) == null); // pending async
+        assert(@atomicLoad(?*Fiber, &thread.ready_queue, .acquire) == null); // pending async
     el.yield(null, .exit);
+    for (el.threads.allocated[0..active_threads]) |*thread| while (thread.free_queue) |free_fiber| {
+        thread.free_queue = free_fiber.queue_next;
+        free_fiber.queue_next = null;
+        el.gpa.free(free_fiber.allocatedSlice());
+    };
     const allocated_ptr: [*]align(@alignOf(Thread)) u8 = @alignCast(@ptrCast(el.threads.allocated.ptr));
     const idle_stack_end_offset = std.mem.alignForward(usize, el.threads.allocated.len * @sizeOf(Thread) + idle_stack_size, std.heap.page_size_max);
-    for (el.threads.allocated[1..active_threads]) |*thread| {
-        thread.thread.join();
-        while (thread.free_queue) |free_fiber| {
-            thread.free_queue = free_fiber.queue_next;
-            free_fiber.queue_next = null;
-            el.gpa.free(free_fiber.allocatedSlice());
-        }
-    }
+    for (el.threads.allocated[1..active_threads]) |thread| thread.thread.join();
     el.gpa.free(allocated_ptr[0..idle_stack_end_offset]);
     el.* = undefined;
 }

 fn yield(el: *EventLoop, maybe_ready_fiber: ?*Fiber, pending_task: SwitchMessage.PendingTask) void {
-    const thread: *Thread = .current(el);
+    const thread: *Thread = .current();
     const ready_context: *Context = if (maybe_ready_fiber) |ready_fiber|
         &ready_fiber.context
     else if (thread.ready_queue) |ready_fiber| ready_context: {
@@ -198,6 +235,7 @@
             defer thread.steal_ready_search_index += 1;
             if (thread.steal_ready_search_index == ready_threads) thread.steal_ready_search_index = 0;
             const steal_ready_search_thread = &el.threads.allocated[thread.steal_ready_search_index];
+            if (steal_ready_search_thread == thread) continue;
             const ready_fiber = @atomicLoad(?*Fiber, &steal_ready_search_thread.ready_queue, .acquire) orelse continue;
             if (@cmpxchgWeak(
                 ?*Fiber,
@@ -236,6 +274,7 @@ fn schedule(el: *EventLoop, thread: *Thread, ready_queue: Fiber.Queue) void {
         defer thread.idle_search_index += 1;
         if (thread.idle_search_index == new_thread_index) thread.idle_search_index = 0;
         const idle_search_thread = &el.threads.allocated[thread.idle_search_index];
+        if (idle_search_thread == thread) continue;
         if (@cmpxchgWeak(
             ?*Fiber,
             &idle_search_thread.ready_queue,
@@ -249,11 +288,11 @@ fn schedule(el: *EventLoop, thread: *Thread, ready_queue: Fiber.Queue) void {
                 .flags = std.os.linux.IOSQE_CQE_SKIP_SUCCESS,
                 .ioprio = 0,
                 .fd = idle_search_thread.io_uring.fd,
-                .off = @intFromEnum(Completion.Key.wakeup),
+                .off = @intFromEnum(Completion.UserData.wakeup),
                 .addr = 0,
                 .len = 0,
                 .rw_flags = 0,
-                .user_data = @intFromEnum(Completion.Key.wakeup),
+                .user_data = @intFromEnum(Completion.UserData.wakeup),
                 .buf_index = 0,
                 .personality = 0,
                 .splice_fd_in = 0,
@@ -314,15 +353,6 @@ fn schedule(el: *EventLoop, thread: *Thread, ready_queue: Fiber.Queue) void {
     )) |old_head| ready_queue.tail.queue_next = old_head;
 }

-fn recycle(el: *EventLoop, fiber: *Fiber) void {
-    const thread: *Thread = .current(el);
-    std.log.debug("recyling {*}", .{fiber});
-    assert(fiber.queue_next == null);
-    @memset(fiber.allocatedSlice(), undefined);
-    fiber.queue_next = thread.free_queue;
-    thread.free_queue = fiber;
-}
-
 fn mainIdle(el: *EventLoop, message: *const SwitchMessage) callconv(.withStackAlign(.c, @max(@alignOf(Thread), @alignOf(Context)))) noreturn {
     message.handle(el);
     const thread: *Thread = &el.threads.allocated[0];
@@ -332,17 +362,16 @@ fn mainIdle(el: *EventLoop, message: *const SwitchMessage) callconv(.withStackAl
 }

 fn threadEntry(el: *EventLoop, index: u32) void {
-    Thread.index = index;
     const thread: *Thread = &el.threads.allocated[index];
+    Thread.self = thread;
     std.log.debug("created thread idle {*}", .{&thread.idle_context});
     el.idle(thread);
 }

 const Completion = struct {
-    const Key = enum(usize) {
+    const UserData = enum(usize) {
         unused,
         wakeup,
-        cancel,
         cleanup,
         exit,
         /// *Fiber
@@ -369,26 +398,43 @@ fn idle(el: *EventLoop, thread: *Thread) void {
                 break :cqes_len 0;
             },
             else => |e| @panic(@errorName(e)),
-        }]) |cqe| switch (@as(Completion.Key, @enumFromInt(cqe.user_data))) {
+        }]) |cqe| switch (@as(Completion.UserData, @enumFromInt(cqe.user_data))) {
             .unused => unreachable, // bad submission queued?
             .wakeup => {},
-            .cancel => {},
             .cleanup => @panic("failed to notify other threads that we are exiting"),
             .exit => {
                 assert(maybe_ready_fiber == null and maybe_ready_queue == null); // pending async
                 return;
             },
-            _ => {
-                const fiber: *Fiber = @ptrFromInt(cqe.user_data);
-                assert(fiber.queue_next == null);
-                fiber.resultPointer(Completion).* = .{
-                    .result = cqe.res,
-                    .flags = cqe.flags,
-                };
-                if (maybe_ready_fiber == null) maybe_ready_fiber = fiber else if (maybe_ready_queue) |*ready_queue| {
-                    ready_queue.tail.queue_next = fiber;
-                    ready_queue.tail = fiber;
-                } else maybe_ready_queue = .{ .head = fiber, .tail = fiber };
+            _ => switch (errno(cqe.res)) {
+                .INTR => getSqe(&thread.io_uring).* = .{
+                    .opcode = .ASYNC_CANCEL,
+                    .flags = std.os.linux.IOSQE_CQE_SKIP_SUCCESS,
+                    .ioprio = 0,
+                    .fd = 0,
+                    .off = 0,
+                    .addr = cqe.user_data,
+                    .len = 0,
+                    .rw_flags = 0,
+                    .user_data = @intFromEnum(Completion.UserData.wakeup),
+                    .buf_index = 0,
+                    .personality = 0,
+                    .splice_fd_in = 0,
+                    .addr3 = 0,
+                    .resv = 0,
+                },
+                else => {
+                    const fiber: *Fiber = @ptrFromInt(cqe.user_data);
+                    assert(fiber.queue_next == null);
+                    fiber.resultPointer(Completion).* = .{
+                        .result = cqe.res,
+                        .flags = cqe.flags,
+                    };
+                    if (maybe_ready_fiber == null) maybe_ready_fiber = fiber else if (maybe_ready_queue) |*ready_queue| {
+                        ready_queue.tail.queue_next = fiber;
+                        ready_queue.tail = fiber;
+                    } else maybe_ready_queue = .{ .head = fiber, .tail = fiber };
+                },
             },
         };
         if (maybe_ready_queue) |ready_queue| el.schedule(thread, ready_queue);
@@ -409,7 +455,7 @@ const SwitchMessage = struct {
     };

     fn handle(message: *const SwitchMessage, el: *EventLoop) void {
-        const thread: *Thread = .current(el);
+        const thread: *Thread = .current();
         thread.current_context = message.contexts.ready;
         switch (message.pending_task) {
             .nothing => {},
@@ -429,11 +475,11 @@ const SwitchMessage = struct {
                     .flags = std.os.linux.IOSQE_CQE_SKIP_SUCCESS,
                     .ioprio = 0,
                     .fd = each_thread.io_uring.fd,
-                    .off = @intFromEnum(Completion.Key.exit),
+                    .off = @intFromEnum(Completion.UserData.exit),
                     .addr = 0,
                     .len = 0,
                     .rw_flags = 0,
-                    .user_data = @intFromEnum(Completion.Key.cleanup),
+                    .user_data = @intFromEnum(Completion.UserData.cleanup),
                     .buf_index = 0,
                     .personality = 0,
                     .splice_fd_in = 0,
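
Note: the `Key` -> `UserData` rename matches how the field is actually used: a CQE's `user_data` is either one of the small named control values or a `*Fiber` address. Fibers are heap-allocated and aligned, so their addresses can never collide with the low named values, which is what the non-exhaustive `_` arm relies on. A standalone sketch of the same tagging trick (hypothetical `Fiber` type, for illustration only):

    const std = @import("std");

    const Fiber = struct { payload: u64 };

    const UserData = enum(usize) {
        unused,
        wakeup,
        cleanup,
        exit,
        _, // any other value is a *Fiber in disguise

        fn fromFiber(f: *Fiber) UserData {
            return @enumFromInt(@intFromPtr(f));
        }

        fn toFiber(ud: UserData) *Fiber {
            return @ptrFromInt(@intFromEnum(ud));
        }
    };

    test "user_data round-trips a fiber pointer" {
        var f: Fiber = .{ .payload = 42 };
        const ud = UserData.fromFiber(&f);
        try std.testing.expect(ud.toFiber() == &f);
    }
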
@@ -544,6 +590,7 @@ fn @"async"(
         start(context.ptr, result.ptr);
         return null;
     };
+    errdefer fiber.recycle();
     std.log.debug("allocated {*}", .{fiber});

     const closure: *AsyncClosure = @ptrFromInt(Fiber.max_context_align.max(.of(AsyncClosure)).backward(
@@ -560,8 +607,7 @@ fn @"async"(
         },
         .awaiter = null,
         .queue_next = null,
-        .can_cancel = false,
-        .canceled = false,
+        .cancel_thread = null,
     };
     closure.* = .{
         .event_loop = event_loop,
@@ -571,7 +617,7 @@ fn @"async"(
     };
     @memcpy(closure.contextPointer(), context);

-    event_loop.schedule(.current(event_loop), .{ .head = fiber, .tail = fiber });
+    event_loop.schedule(.current(), .{ .head = fiber, .tail = fiber });
     return @ptrCast(fiber);
 }

@@ -585,7 +631,7 @@ fn @"await"(
     const future_fiber: *Fiber = @alignCast(@ptrCast(any_future));
     if (@atomicLoad(?*Fiber, &future_fiber.awaiter, .acquire) != Fiber.finished) event_loop.yield(null, .{ .register_awaiter = &future_fiber.awaiter });
     @memcpy(result, future_fiber.resultBytes(result_alignment));
-    event_loop.recycle(future_fiber);
+    future_fiber.recycle();
 }

 fn cancel(
@@ -594,35 +640,37 @@
     result: []u8,
     result_alignment: Alignment,
 ) void {
-    const event_loop: *EventLoop = @alignCast(@ptrCast(userdata));
     const future_fiber: *Fiber = @alignCast(@ptrCast(any_future));
-    @atomicStore(bool, &future_fiber.canceled, true, .release);
-    if (@atomicLoad(bool, &future_fiber.can_cancel, .acquire)) {
-        const thread: *Thread = .current(event_loop);
-        getSqe(&thread.io_uring).* = .{
-            .opcode = .ASYNC_CANCEL,
+    if (@atomicRmw(
+        ?*Thread,
+        &future_fiber.cancel_thread,
+        .Xchg,
+        Thread.canceling,
+        .acq_rel,
+    )) |cancel_thread| if (cancel_thread != Thread.canceling) {
+        getSqe(&Thread.current().io_uring).* = .{
+            .opcode = .MSG_RING,
             .flags = std.os.linux.IOSQE_CQE_SKIP_SUCCESS,
             .ioprio = 0,
-            .fd = 0,
-            .off = 0,
-            .addr = @intFromPtr(future_fiber),
-            .len = 0,
+            .fd = cancel_thread.io_uring.fd,
+            .off = @intFromPtr(future_fiber),
+            .addr = 0,
+            .len = @bitCast(-@as(i32, @intFromEnum(std.os.linux.E.INTR))),
             .rw_flags = 0,
-            .user_data = @intFromEnum(Completion.Key.cancel),
+            .user_data = @intFromEnum(Completion.UserData.cleanup),
             .buf_index = 0,
             .personality = 0,
             .splice_fd_in = 0,
             .addr3 = 0,
             .resv = 0,
         };
-    }
+    };
     @"await"(userdata, any_future, result, result_alignment);
 }

 fn cancelRequested(userdata: ?*anyopaque) bool {
-    const event_loop: *EventLoop = @alignCast(@ptrCast(userdata));
-    const thread: *Thread = .current(event_loop);
-    return thread.currentFiber().canceled;
+    _ = userdata;
+    return @atomicLoad(?*Thread, &Thread.current().currentFiber().cancel_thread, .acquire) == Thread.canceling;
 }

 pub fn createFile(
@@ -632,6 +680,10 @@
     flags: Io.CreateFlags,
 ) Io.FileOpenError!std.fs.File {
     const el: *EventLoop = @alignCast(@ptrCast(userdata));
+    const thread: *Thread = .current();
+    const iou = &thread.io_uring;
+    const fiber = thread.currentFiber();
+    try fiber.enterCancelRegion(thread);

     const posix = std.posix;
     const sub_path_c = try posix.toPosixPath(sub_path);
@@ -670,23 +722,30 @@
         @panic("TODO");
     }

-    const thread: *Thread = .current(el);
-    const iou = &thread.io_uring;
-    const fiber = thread.currentFiber();
-    if (@atomicLoad(bool, &fiber.canceled, .acquire)) return error.AsyncCancel;
+    getSqe(iou).* = .{
+        .opcode = .OPENAT,
+        .flags = 0,
+        .ioprio = 0,
+        .fd = dir.fd,
+        .off = 0,
+        .addr = @intFromPtr(&sub_path_c),
+        .len = @intCast(flags.mode),
+        .rw_flags = @bitCast(os_flags),
+        .user_data = @intFromPtr(fiber),
+        .buf_index = 0,
+        .personality = 0,
+        .splice_fd_in = 0,
+        .addr3 = 0,
+        .resv = 0,
+    };

-    const sqe = getSqe(iou);
-    sqe.prep_openat(dir.fd, &sub_path_c, os_flags, flags.mode);
-    sqe.user_data = @intFromPtr(fiber);
-
-    @atomicStore(bool, &fiber.can_cancel, true, .release);
     el.yield(null, .nothing);
-    @atomicStore(bool, &fiber.can_cancel, false, .release);
+    fiber.exitCancelRegion(thread);

     const completion = fiber.resultPointer(Completion);
     switch (errno(completion.result)) {
         .SUCCESS => return .{ .handle = completion.result },
-        .INTR => @panic("TODO is this reachable?"),
+        .INTR => unreachable,

         .CANCELED => return error.AsyncCancel,
         .FAULT => unreachable,
@@ -723,10 +782,10 @@ pub fn openFile(
     flags: Io.OpenFlags,
 ) Io.FileOpenError!std.fs.File {
     const el: *EventLoop = @alignCast(@ptrCast(userdata));
-    const thread: *Thread = .current(el);
+    const thread: *Thread = .current();
     const iou = &thread.io_uring;
     const fiber = thread.currentFiber();
-    if (@atomicLoad(bool, &fiber.canceled, .acquire)) return error.AsyncCancel;
+    try fiber.enterCancelRegion(thread);

     const posix = std.posix;
     const sub_path_c = try posix.toPosixPath(sub_path);
@@ -771,18 +830,30 @@ pub fn openFile(
         @panic("TODO");
     }

-    const sqe = getSqe(iou);
-    sqe.prep_openat(dir.fd, &sub_path_c, os_flags, 0);
-    sqe.user_data = @intFromPtr(fiber);
+    getSqe(iou).* = .{
+        .opcode = .OPENAT,
+        .flags = 0,
+        .ioprio = 0,
+        .fd = dir.fd,
+        .off = 0,
+        .addr = @intFromPtr(&sub_path_c),
+        .len = 0,
+        .rw_flags = @bitCast(os_flags),
+        .user_data = @intFromPtr(fiber),
+        .buf_index = 0,
+        .personality = 0,
+        .splice_fd_in = 0,
+        .addr3 = 0,
+        .resv = 0,
+    };

-    @atomicStore(bool, &fiber.can_cancel, true, .release);
     el.yield(null, .nothing);
-    @atomicStore(bool, &fiber.can_cancel, false, .release);
+    fiber.exitCancelRegion(thread);

     const completion = fiber.resultPointer(Completion);
     switch (errno(completion.result)) {
         .SUCCESS => return .{ .handle = completion.result },
-        .INTR => @panic("TODO is this reachable?"),
+        .INTR => unreachable,

         .CANCELED => return error.AsyncCancel,
         .FAULT => unreachable,
@@ -814,20 +885,33 @@ pub fn openFile(

 pub fn closeFile(userdata: ?*anyopaque, file: std.fs.File) void {
     const el: *EventLoop = @alignCast(@ptrCast(userdata));
-    const thread: *Thread = .current(el);
+    const thread: *Thread = .current();
     const iou = &thread.io_uring;
     const fiber = thread.currentFiber();

-    const sqe = getSqe(iou);
-    sqe.prep_close(file.handle);
-    sqe.user_data = @intFromPtr(fiber);
+    getSqe(iou).* = .{
+        .opcode = .CLOSE,
+        .flags = 0,
+        .ioprio = 0,
+        .fd = file.handle,
+        .off = 0,
+        .addr = 0,
+        .len = 0,
+        .rw_flags = 0,
+        .user_data = @intFromPtr(fiber),
+        .buf_index = 0,
+        .personality = 0,
+        .splice_fd_in = 0,
+        .addr3 = 0,
+        .resv = 0,
+    };

     el.yield(null, .nothing);

     const completion = fiber.resultPointer(Completion);
     switch (errno(completion.result)) {
         .SUCCESS => return,
-        .INTR => @panic("TODO is this reachable?"),
+        .INTR => unreachable,

         .CANCELED => return,
         .BADF => unreachable, // Always a race condition.
@@ -835,25 +919,37 @@ pub fn closeFile(userdata: ?*anyopaque, file: std.fs.File) void {
     }
 }

-pub fn read(userdata: ?*anyopaque, file: std.fs.File, buffer: []u8) Io.FileReadError!usize {
+pub fn pread(userdata: ?*anyopaque, file: std.fs.File, buffer: []u8, offset: std.posix.off_t) Io.FilePReadError!usize {
     const el: *EventLoop = @alignCast(@ptrCast(userdata));
-    const thread: *Thread = .current(el);
+    const thread: *Thread = .current();
     const iou = &thread.io_uring;
     const fiber = thread.currentFiber();
-    if (@atomicLoad(bool, &fiber.canceled, .acquire)) return error.AsyncCancel;
+    try fiber.enterCancelRegion(thread);

-    const sqe = getSqe(iou);
-    sqe.prep_read(file.handle, buffer, std.math.maxInt(u64));
-    sqe.user_data = @intFromPtr(fiber);
+    getSqe(iou).* = .{
+        .opcode = .READ,
+        .flags = 0,
+        .ioprio = 0,
+        .fd = file.handle,
+        .off = @bitCast(offset),
+        .addr = @intFromPtr(buffer.ptr),
+        .len = @min(buffer.len, 0x7ffff000),
+        .rw_flags = 0,
+        .user_data = @intFromPtr(fiber),
+        .buf_index = 0,
+        .personality = 0,
+        .splice_fd_in = 0,
+        .addr3 = 0,
+        .resv = 0,
+    };

-    @atomicStore(bool, &fiber.can_cancel, true, .release);
     el.yield(null, .nothing);
-    @atomicStore(bool, &fiber.can_cancel, false, .release);
+    fiber.exitCancelRegion(thread);

     const completion = fiber.resultPointer(Completion);
     switch (errno(completion.result)) {
         .SUCCESS => return @as(u32, @bitCast(completion.result)),
-        .INTR => @panic("TODO is this reachable?"),
+        .INTR => unreachable,

         .CANCELED => return error.AsyncCancel,
         .INVAL => unreachable,
@@ -868,30 +964,44 @@ pub fn read(userdata: ?*anyopaque, file: std.fs.File, buffer: []u8) Io.FileReadE
         .NOTCONN => return error.SocketNotConnected,
         .CONNRESET => return error.ConnectionResetByPeer,
         .TIMEDOUT => return error.ConnectionTimedOut,
+        .NXIO => return error.Unseekable,
+        .SPIPE => return error.Unseekable,
+        .OVERFLOW => return error.Unseekable,

         else => |err| return std.posix.unexpectedErrno(err),
     }
 }

-pub fn write(userdata: ?*anyopaque, file: std.fs.File, buffer: []const u8) Io.FileWriteError!usize {
+pub fn pwrite(userdata: ?*anyopaque, file: std.fs.File, buffer: []const u8, offset: std.posix.off_t) Io.FilePWriteError!usize {
     const el: *EventLoop = @alignCast(@ptrCast(userdata));
-
-    const thread: *Thread = .current(el);
+    const thread: *Thread = .current();
     const iou = &thread.io_uring;
     const fiber = thread.currentFiber();
-    if (@atomicLoad(bool, &fiber.canceled, .acquire)) return error.AsyncCancel;
+    try fiber.enterCancelRegion(thread);

-    const sqe = getSqe(iou);
-    sqe.prep_write(file.handle, buffer, std.math.maxInt(u64));
-    sqe.user_data = @intFromPtr(fiber);
+    getSqe(iou).* = .{
+        .opcode = .WRITE,
+        .flags = 0,
+        .ioprio = 0,
+        .fd = file.handle,
+        .off = @bitCast(offset),
+        .addr = @intFromPtr(buffer.ptr),
+        .len = @min(buffer.len, 0x7ffff000),
+        .rw_flags = 0,
+        .user_data = @intFromPtr(fiber),
+        .buf_index = 0,
+        .personality = 0,
+        .splice_fd_in = 0,
+        .addr3 = 0,
+        .resv = 0,
+    };

-    @atomicStore(bool, &fiber.can_cancel, true, .release);
     el.yield(null, .nothing);
-    @atomicStore(bool, &fiber.can_cancel, false, .release);
+    fiber.exitCancelRegion(thread);

     const completion = fiber.resultPointer(Completion);
     switch (errno(completion.result)) {
         .SUCCESS => return @as(u32, @bitCast(completion.result)),
-        .INTR => @panic("TODO is this reachable?"),
+        .INTR => unreachable,

         .CANCELED => return error.AsyncCancel,
         .INVAL => return error.InvalidArgument,
@@ -907,17 +1017,77 @@ pub fn write(userdata: ?*anyopaque, file: std.fs.File, buffer: []const u8) Io.Fi
         .ACCES => return error.AccessDenied,
         .PERM => return error.PermissionDenied,
         .PIPE => return error.BrokenPipe,
-        .CONNRESET => return error.ConnectionResetByPeer,
+        .NXIO => return error.Unseekable,
+        .SPIPE => return error.Unseekable,
+        .OVERFLOW => return error.Unseekable,
         .BUSY => return error.DeviceBusy,
-        .NXIO => return error.NoDevice,
+        .CONNRESET => return error.ConnectionResetByPeer,
         .MSGSIZE => return error.MessageTooBig,

         else => |err| return std.posix.unexpectedErrno(err),
     }
 }

-fn errno(signed: i32) std.posix.E {
-    const int = if (signed > -4096 and signed < 0) -signed else 0;
-    return @enumFromInt(int);
+pub fn now(userdata: ?*anyopaque, clockid: std.posix.clockid_t) Io.ClockGetTimeError!Io.Timestamp {
+    _ = userdata;
+    const timespec = try std.posix.clock_gettime(clockid);
+    return @enumFromInt(@as(i128, timespec.sec) * std.time.ns_per_s + timespec.nsec);
+}
+
+pub fn sleep(userdata: ?*anyopaque, clockid: std.posix.clockid_t, deadline: Io.Deadline) Io.SleepError!void {
+    const el: *EventLoop = @alignCast(@ptrCast(userdata));
+    const thread: *Thread = .current();
+    const iou = &thread.io_uring;
+    const fiber = thread.currentFiber();
+    try fiber.enterCancelRegion(thread);
+
+    const deadline_nanoseconds: i96 = switch (deadline) {
+        .nanoseconds => |nanoseconds| nanoseconds,
+        .timestamp => |timestamp| @intFromEnum(timestamp),
+    };
+    const timespec: std.os.linux.kernel_timespec = .{
+        .sec = @intCast(@divFloor(deadline_nanoseconds, std.time.ns_per_s)),
+        .nsec = @intCast(@mod(deadline_nanoseconds, std.time.ns_per_s)),
+    };
+    getSqe(iou).* = .{
+        .opcode = .TIMEOUT,
+        .flags = 0,
+        .ioprio = 0,
+        .fd = 0,
+        .off = 0,
+        .addr = @intFromPtr(&timespec),
+        .len = 1,
+        .rw_flags = @as(u32, switch (deadline) {
+            .nanoseconds => 0,
+            .timestamp => std.os.linux.IORING_TIMEOUT_ABS,
+        }) | @as(u32, switch (clockid) {
+            .REALTIME => std.os.linux.IORING_TIMEOUT_REALTIME,
+            .MONOTONIC => 0,
+            .BOOTTIME => std.os.linux.IORING_TIMEOUT_BOOTTIME,
+            else => return error.UnsupportedClock,
+        }),
+        .user_data = @intFromPtr(fiber),
+        .buf_index = 0,
+        .personality = 0,
+        .splice_fd_in = 0,
+        .addr3 = 0,
+        .resv = 0,
+    };
+
+    el.yield(null, .nothing);
+    fiber.exitCancelRegion(thread);
+
+    const completion = fiber.resultPointer(Completion);
+    switch (errno(completion.result)) {
+        .SUCCESS, .TIME => return,
+        .INTR => unreachable,
+        .CANCELED => return error.AsyncCancel,
+
+        else => |err| return std.posix.unexpectedErrno(err),
+    }
+}
+
+fn errno(signed: i32) std.os.linux.E {
+    return .init(@bitCast(@as(isize, signed)));
 }

 fn getSqe(iou: *IoUring) *std.os.linux.io_uring_sqe {
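
Note: the event-loop `sleep` is a single io_uring TIMEOUT submission; `Deadline.nanoseconds` uses the default relative timeout, `Deadline.timestamp` adds `IORING_TIMEOUT_ABS`, and the clock is selected via the `REALTIME`/`BOOTTIME` flags (`MONOTONIC` is io_uring's default). That makes drift-free absolute sleeps cheap from the caller's side; a sketch, assuming a hypothetical `io` backed by this event loop:

    // Wake 50 ms after t0 on the monotonic clock, regardless of how much
    // time the code between now() and sleep() consumes.
    const t0 = try io.now(.MONOTONIC);
    try io.sleep(.MONOTONIC, .{ .timestamp = t0.addDuration(50 * std.time.ns_per_ms) });
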
diff --git a/lib/std/Thread/Pool.zig b/lib/std/Thread/Pool.zig
index 0cddadf40d..6ec6c89040 100644
--- a/lib/std/Thread/Pool.zig
+++ b/lib/std/Thread/Pool.zig
@@ -332,13 +332,18 @@ pub fn io(pool: *Pool) Io {
         .vtable = &.{
             .@"async" = @"async",
             .@"await" = @"await",
+            .cancel = cancel,
             .cancelRequested = cancelRequested,
+
             .createFile = createFile,
             .openFile = openFile,
             .closeFile = closeFile,
-            .read = read,
-            .write = write,
+            .pread = pread,
+            .pwrite = pwrite,
+
+            .now = now,
+            .sleep = sleep,
         },
     };
 }
@@ -347,15 +352,44 @@ const AsyncClosure = struct {
     func: *const fn (context: *anyopaque, result: *anyopaque) void,
     runnable: Runnable = .{ .runFn = runFn },
     reset_event: std.Thread.ResetEvent,
-    cancel_flag: bool,
+    cancel_tid: std.Thread.Id,
     context_offset: usize,
     result_offset: usize,

+    const canceling_tid: std.Thread.Id = switch (@typeInfo(std.Thread.Id)) {
+        .int => |int_info| switch (int_info.signedness) {
+            .signed => -1,
+            .unsigned => std.math.maxInt(std.Thread.Id),
+        },
+        .pointer => @ptrFromInt(std.math.maxInt(usize)),
+        else => @compileError("unsupported std.Thread.Id: " ++ @typeName(std.Thread.Id)),
+    };
+
     fn runFn(runnable: *std.Thread.Pool.Runnable, _: ?usize) void {
         const closure: *AsyncClosure = @alignCast(@fieldParentPtr("runnable", runnable));
+        const tid = std.Thread.getCurrentId();
+        if (@cmpxchgStrong(
+            std.Thread.Id,
+            &closure.cancel_tid,
+            0,
+            tid,
+            .acq_rel,
+            .acquire,
+        )) |cancel_tid| {
+            assert(cancel_tid == canceling_tid);
+            return;
+        }
         current_closure = closure;
         closure.func(closure.contextPointer(), closure.resultPointer());
         current_closure = null;
+        if (@cmpxchgStrong(
+            std.Thread.Id,
+            &closure.cancel_tid,
+            tid,
+            0,
+            .acq_rel,
+            .acquire,
+        )) |cancel_tid| assert(cancel_tid == canceling_tid);
         closure.reset_event.set();
     }
@@ -414,7 +448,7 @@ fn @"async"(
         .context_offset = context_offset,
         .result_offset = result_offset,
         .reset_event = .{},
-        .cancel_flag = false,
+        .cancel_tid = 0,
     };
     @memcpy(closure.contextPointer()[0..context.len], context);
     pool.run_queue.prepend(&closure.runnable.node);
@@ -456,7 +490,23 @@
     _ = result_alignment;
     const pool: *std.Thread.Pool = @alignCast(@ptrCast(userdata));
     const closure: *AsyncClosure = @ptrCast(@alignCast(any_future));
-    @atomicStore(bool, &closure.cancel_flag, true, .seq_cst);
+    switch (@atomicRmw(
+        std.Thread.Id,
+        &closure.cancel_tid,
+        .Xchg,
+        AsyncClosure.canceling_tid,
+        .acq_rel,
+    )) {
+        0, AsyncClosure.canceling_tid => {},
+        else => |cancel_tid| switch (builtin.os.tag) {
+            .linux => _ = std.os.linux.tgkill(
+                std.os.linux.getpid(),
+                @bitCast(cancel_tid),
+                std.posix.SIG.IO,
+            ),
+            else => {},
+        },
+    }
     closure.waitAndFree(pool.allocator, result);
 }

@@ -464,7 +514,7 @@ fn cancelRequested(userdata: ?*anyopaque) bool {
     const pool: *std.Thread.Pool = @alignCast(@ptrCast(userdata));
     _ = pool;
     const closure = current_closure orelse return false;
-    return @atomicLoad(bool, &closure.cancel_flag, .unordered);
+    return @atomicLoad(std.Thread.Id, &closure.cancel_tid, .acquire) == AsyncClosure.canceling_tid;
 }

 fn checkCancel(pool: *Pool) error{AsyncCancel}!void {
@@ -499,14 +549,52 @@ pub fn closeFile(userdata: ?*anyopaque, file: std.fs.File) void {
     return file.close();
 }

-pub fn read(userdata: ?*anyopaque, file: std.fs.File, buffer: []u8) Io.FileReadError!usize {
+pub fn pread(userdata: ?*anyopaque, file: std.fs.File, buffer: []u8, offset: std.posix.off_t) Io.FilePReadError!usize {
     const pool: *std.Thread.Pool = @alignCast(@ptrCast(userdata));
     try pool.checkCancel();
-    return file.read(buffer);
+    return switch (offset) {
+        -1 => file.read(buffer),
+        else => file.pread(buffer, @bitCast(offset)),
+    };
 }

-pub fn write(userdata: ?*anyopaque, file: std.fs.File, buffer: []const u8) Io.FileWriteError!usize {
+pub fn pwrite(userdata: ?*anyopaque, file: std.fs.File, buffer: []const u8, offset: std.posix.off_t) Io.FilePWriteError!usize {
     const pool: *std.Thread.Pool = @alignCast(@ptrCast(userdata));
     try pool.checkCancel();
-    return file.write(buffer);
+    return switch (offset) {
+        -1 => file.write(buffer),
+        else => file.pwrite(buffer, @bitCast(offset)),
+    };
+}
+
+pub fn now(userdata: ?*anyopaque, clockid: std.posix.clockid_t) Io.ClockGetTimeError!Io.Timestamp {
+    const pool: *std.Thread.Pool = @alignCast(@ptrCast(userdata));
+    try pool.checkCancel();
+    const timespec = try std.posix.clock_gettime(clockid);
+    return @enumFromInt(@as(i128, timespec.sec) * std.time.ns_per_s + timespec.nsec);
+}
+
+pub fn sleep(userdata: ?*anyopaque, clockid: std.posix.clockid_t, deadline: Io.Deadline) Io.SleepError!void {
+    const pool: *std.Thread.Pool = @alignCast(@ptrCast(userdata));
+    const deadline_nanoseconds: i96 = switch (deadline) {
+        .nanoseconds => |nanoseconds| nanoseconds,
+        .timestamp => |timestamp| @intFromEnum(timestamp),
+    };
+    var timespec: std.posix.timespec = .{
+        .sec = @intCast(@divFloor(deadline_nanoseconds, std.time.ns_per_s)),
+        .nsec = @intCast(@mod(deadline_nanoseconds, std.time.ns_per_s)),
+    };
+    while (true) {
+        try pool.checkCancel();
+        switch (std.os.linux.E.init(std.os.linux.clock_nanosleep(clockid, .{ .ABSTIME = switch (deadline) {
+            .nanoseconds => false,
+            .timestamp => true,
+        } }, &timespec, &timespec))) {
+            .SUCCESS => return,
+            .FAULT => unreachable,
+            .INTR => {},
+            .INVAL => return error.UnsupportedClock,
+            else => |err| return std.posix.unexpectedErrno(err),
+        }
+    }
+}
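
Note: `cancel_tid` plays the same role in the pool that `cancel_thread` plays in the event loop: 0 means the closure has not started, a live thread id means it is running, and `canceling_tid` is the cancel sentinel. When `cancel` swaps in the sentinel and finds a live tid, the tgkill with SIG.IO exists only to interrupt a blocking syscall with EINTR; `sleep` then re-checks `checkCancel` before retrying `clock_nanosleep`. A self-contained model of that retry loop (not the Pool code itself; an atomic flag stands in for the tid state):

    const std = @import("std");

    fn interruptibleSleep(canceled: *const std.atomic.Value(bool), ns: u64) error{AsyncCancel}!void {
        var ts: std.posix.timespec = .{
            .sec = @intCast(ns / std.time.ns_per_s),
            .nsec = @intCast(ns % std.time.ns_per_s),
        };
        while (true) {
            if (canceled.load(.acquire)) return error.AsyncCancel;
            switch (std.os.linux.E.init(std.os.linux.clock_nanosleep(.MONOTONIC, .{ .ABSTIME = false }, &ts, &ts))) {
                .SUCCESS => return,
                // A signal (e.g. the cancel tgkill) interrupted the sleep;
                // loop to observe the flag, then resume with the remainder.
                .INTR => {},
                else => unreachable,
            }
        }
    }
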
diff --git a/lib/std/start.zig b/lib/std/start.zig
index 60ddae43ac..0678f6e1ce 100644
--- a/lib/std/start.zig
+++ b/lib/std/start.zig
@@ -581,8 +581,8 @@ inline fn callMainWithArgs(argc: usize, argv: [*][*:0]u8, envp: [][*:0]u8) u8 {
     std.os.argv = argv[0..argc];
     std.os.environ = envp;

+    maybeIgnoreSignals();
     std.debug.maybeEnableSegfaultHandler();
-    maybeIgnoreSigpipe();

     return callMain();
 }
@@ -687,8 +687,8 @@ pub fn call_wWinMain() std.os.windows.INT {
     return root.wWinMain(hInstance, null, lpCmdLine, nCmdShow);
 }

-fn maybeIgnoreSigpipe() void {
-    const have_sigpipe_support = switch (builtin.os.tag) {
+fn maybeIgnoreSignals() void {
+    switch (builtin.os.tag) {
         .linux,
         .plan9,
         .solaris,
@@ -703,22 +703,20 @@ fn maybeIgnoreSigpipe() void {
         .dragonfly,
         .freebsd,
         .serenity,
-        => true,
-
-        else => false,
-    };
-
-    if (have_sigpipe_support and !std.options.keep_sigpipe) {
-        const posix = std.posix;
-        const act: posix.Sigaction = .{
-            // Set handler to a noop function instead of `SIG.IGN` to prevent
-            // leaking signal disposition to a child process.
-            .handler = .{ .handler = noopSigHandler },
-            .mask = posix.sigemptyset(),
-            .flags = 0,
-        };
-        posix.sigaction(posix.SIG.PIPE, &act, null);
+        => {},
+        else => return,
     }
+    const posix = std.posix;
+    const act: posix.Sigaction = .{
+        // Set handler to a noop function instead of `SIG.IGN` to prevent
+        // leaking signal disposition to a child process.
+        .handler = .{ .handler = noopSigHandler },
+        .mask = posix.sigemptyset(),
+        .flags = 0,
+    };
+    if (!std.options.keep_sigpoll) posix.sigaction(posix.SIG.POLL, &act, null);
+    if (@hasDecl(posix.SIG, "IO") and posix.SIG.IO != posix.SIG.POLL and !std.options.keep_sigio) posix.sigaction(posix.SIG.IO, &act, null);
+    if (!std.options.keep_sigpipe) posix.sigaction(posix.SIG.PIPE, &act, null);
 }

 fn noopSigHandler(_: i32) callconv(.c) void {}
diff --git a/lib/std/std.zig b/lib/std/std.zig
index 4e68d1d611..6853109f26 100644
--- a/lib/std/std.zig
+++ b/lib/std/std.zig
@@ -145,6 +145,9 @@ pub const Options = struct {

     crypto_fork_safety: bool = true,

+    keep_sigpoll: bool = false,
+    keep_sigio: bool = false,
+
     /// By default Zig disables SIGPIPE by setting a "no-op" handler for it. Set this option
     /// to `true` to prevent that.
     ///
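
Note: the two new options mirror `keep_sigpipe`. Since the pool's cancel path now delivers SIGIO via tgkill, start code must install a no-op handler for SIGIO/SIGPOLL by default, or the signal's default disposition would terminate the process. Opting out works the usual way from the root module; a sketch, assuming the standard `std_options` override:

    // main.zig
    const std = @import("std");

    pub const std_options: std.Options = .{
        // Keep the OS default SIGIO/SIGPOLL dispositions instead of the
        // no-op handler installed by std.start.
        .keep_sigpoll = true,
        .keep_sigio = true,
    };
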