diff --git a/lib/std/Io.zig b/lib/std/Io.zig index e63f5068e3..aae2c5c535 100644 --- a/lib/std/Io.zig +++ b/lib/std/Io.zig @@ -1,7 +1,5 @@ -const std = @import("std.zig"); const builtin = @import("builtin"); -const root = @import("root"); -const c = std.c; +const std = @import("std.zig"); const is_windows = builtin.os.tag == .windows; const windows = std.os.windows; const posix = std.posix; @@ -9,8 +7,6 @@ const math = std.math; const assert = std.debug.assert; const fs = std.fs; const mem = std.mem; -const meta = std.meta; -const File = std.fs.File; const Allocator = std.mem.Allocator; const Alignment = std.mem.Alignment; @@ -972,6 +968,12 @@ pub const VTable = struct { /// Thread-safe. cancelRequested: *const fn (?*anyopaque) bool, + mutexLock: *const fn (?*anyopaque, mutex: *Mutex) void, + mutexUnlock: *const fn (?*anyopaque, mutex: *Mutex) void, + + conditionWait: *const fn (?*anyopaque, cond: *Condition, mutex: *Mutex, timeout_ns: ?u64) Condition.WaitError!void, + conditionWake: *const fn (?*anyopaque, cond: *Condition, notify: Condition.Notify) void, + createFile: *const fn (?*anyopaque, dir: fs.Dir, sub_path: []const u8, flags: fs.File.CreateFlags) FileOpenError!fs.File, openFile: *const fn (?*anyopaque, dir: fs.Dir, sub_path: []const u8, flags: fs.File.OpenFlags) FileOpenError!fs.File, closeFile: *const fn (?*anyopaque, fs.File) void, @@ -985,11 +987,11 @@ pub const VTable = struct { pub const OpenFlags = fs.File.OpenFlags; pub const CreateFlags = fs.File.CreateFlags; -pub const FileOpenError = fs.File.OpenError || error{AsyncCancel}; -pub const FileReadError = fs.File.ReadError || error{AsyncCancel}; -pub const FilePReadError = fs.File.PReadError || error{AsyncCancel}; -pub const FileWriteError = fs.File.WriteError || error{AsyncCancel}; -pub const FilePWriteError = fs.File.PWriteError || error{AsyncCancel}; +pub const FileOpenError = fs.File.OpenError || error{Canceled}; +pub const FileReadError = fs.File.ReadError || error{Canceled}; +pub const FilePReadError = fs.File.PReadError || error{Canceled}; +pub const FileWriteError = fs.File.WriteError || error{Canceled}; +pub const FilePWriteError = fs.File.PWriteError || error{Canceled}; pub const Timestamp = enum(i96) { _, @@ -1006,8 +1008,8 @@ pub const Deadline = union(enum) { nanoseconds: i96, timestamp: Timestamp, }; -pub const ClockGetTimeError = std.posix.ClockGetTimeError || error{AsyncCancel}; -pub const SleepError = error{ UnsupportedClock, Unexpected, AsyncCancel }; +pub const ClockGetTimeError = std.posix.ClockGetTimeError || error{Canceled}; +pub const SleepError = error{ UnsupportedClock, Unexpected, Canceled }; pub const AnyFuture = opaque {}; @@ -1036,6 +1038,302 @@ pub fn Future(Result: type) type { }; } +pub const Mutex = struct { + state: std.atomic.Value(u32) = std.atomic.Value(u32).init(unlocked), + + pub const unlocked: u32 = 0b00; + pub const locked: u32 = 0b01; + pub const contended: u32 = 0b11; // must contain the `locked` bit for x86 optimization below + + pub fn tryLock(m: *Mutex) bool { + // On x86, use `lock bts` instead of `lock cmpxchg` as: + // - they both seem to mark the cache-line as modified regardless: https://stackoverflow.com/a/63350048 + // - `lock bts` is smaller instruction-wise which makes it better for inlining + if (builtin.target.cpu.arch.isX86()) { + const locked_bit = @ctz(locked); + return m.state.bitSet(locked_bit, .acquire) == 0; + } + + // Acquire barrier ensures grabbing the lock happens before the critical section + // and that the previous lock holder's critical section happens before we grab the lock. + return m.state.cmpxchgWeak(unlocked, locked, .acquire, .monotonic) == null; + } + + /// Avoids the vtable for uncontended locks. + pub fn lock(m: *Mutex, io: Io) void { + if (!m.tryLock()) { + @branchHint(.unlikely); + io.vtable.mutexLock(io.userdata, m); + } + } + + pub fn unlock(m: *Mutex, io: Io) void { + io.vtable.mutexUnlock(io.userdata, m); + } +}; + +pub const Condition = struct { + state: u64 = 0, + + pub const WaitError = error{ + Timeout, + Canceled, + }; + + /// How many waiters to wake up. + pub const Notify = enum { + one, + all, + }; + + pub fn wait(cond: *Condition, io: Io, mutex: *Mutex) void { + io.vtable.conditionWait(io.userdata, cond, mutex, null) catch |err| switch (err) { + error.Timeout => unreachable, // no timeout provided so we shouldn't have timed-out + error.Canceled => return, // handled as spurious wakeup + }; + } + + pub fn timedWait(cond: *Condition, io: Io, mutex: *Mutex, timeout_ns: u64) WaitError!void { + return io.vtable.conditionWait(io.userdata, cond, mutex, timeout_ns); + } + + pub fn signal(cond: *Condition, io: Io) void { + io.vtable.conditionWake(io.userdata, cond, .one); + } + + pub fn broadcast(cond: *Condition, io: Io) void { + io.vtable.conditionWake(io.userdata, cond, .all); + } +}; + +pub const TypeErasedQueue = struct { + mutex: Mutex, + + /// Ring buffer. This data is logically *after* queued getters. + buffer: []u8, + put_index: usize, + get_index: usize, + + putters: std.DoublyLinkedList(PutNode), + getters: std.DoublyLinkedList(GetNode), + + const PutNode = struct { + remaining: []const u8, + condition: Condition, + }; + + const GetNode = struct { + remaining: []u8, + condition: Condition, + }; + + pub fn init(buffer: []u8) TypeErasedQueue { + return .{ + .mutex = .{}, + .buffer = buffer, + .put_index = 0, + .get_index = 0, + .putters = .{}, + .getters = .{}, + }; + } + + pub fn put(q: *TypeErasedQueue, io: Io, elements: []const u8, min: usize) usize { + assert(elements.len >= min); + + q.mutex.lock(io); + defer q.mutex.unlock(io); + + // Getters have first priority on the data, and only when the getters + // queue is empty do we start populating the buffer. + + var remaining = elements; + while (true) { + const getter = q.getters.popFirst() orelse break; + const copy_len = @min(getter.data.remaining.len, remaining.len); + @memcpy(getter.data.remaining[0..copy_len], remaining[0..copy_len]); + remaining = remaining[copy_len..]; + getter.data.remaining = getter.data.remaining[copy_len..]; + if (getter.data.remaining.len == 0) { + getter.data.condition.signal(io); + continue; + } + q.getters.prepend(getter); + assert(remaining.len == 0); + return elements.len; + } + + while (true) { + { + const available = q.buffer[q.put_index..]; + const copy_len = @min(available.len, remaining.len); + @memcpy(available[0..copy_len], remaining[0..copy_len]); + remaining = remaining[copy_len..]; + q.put_index += copy_len; + if (remaining.len == 0) return elements.len; + } + { + const available = q.buffer[0..q.get_index]; + const copy_len = @min(available.len, remaining.len); + @memcpy(available[0..copy_len], remaining[0..copy_len]); + remaining = remaining[copy_len..]; + q.put_index = copy_len; + if (remaining.len == 0) return elements.len; + } + + const total_filled = elements.len - remaining.len; + if (total_filled >= min) return total_filled; + + var node: std.DoublyLinkedList(PutNode).Node = .{ + .data = .{ .remaining = remaining, .condition = .{} }, + }; + q.putters.append(&node); + node.data.condition.wait(io, &q.mutex); + remaining = node.data.remaining; + } + } + + pub fn get(q: *@This(), io: Io, buffer: []u8, min: usize) usize { + assert(buffer.len >= min); + + q.mutex.lock(io); + defer q.mutex.unlock(io); + + // The ring buffer gets first priority, then data should come from any + // queued putters, then finally the ring buffer should be filled with + // data from putters so they can be resumed. + + var remaining = buffer; + while (true) { + if (q.get_index <= q.put_index) { + const available = q.buffer[q.get_index..q.put_index]; + const copy_len = @min(available.len, remaining.len); + @memcpy(remaining[0..copy_len], available[0..copy_len]); + q.get_index += copy_len; + remaining = remaining[copy_len..]; + if (remaining.len == 0) return fillRingBufferFromPutters(q, io, buffer.len); + } else { + { + const available = q.buffer[q.get_index..]; + const copy_len = @min(available.len, remaining.len); + @memcpy(remaining[0..copy_len], available[0..copy_len]); + q.get_index += copy_len; + remaining = remaining[copy_len..]; + if (remaining.len == 0) return fillRingBufferFromPutters(q, io, buffer.len); + } + { + const available = q.buffer[0..q.put_index]; + const copy_len = @min(available.len, remaining.len); + @memcpy(remaining[0..copy_len], available[0..copy_len]); + q.get_index = copy_len; + remaining = remaining[copy_len..]; + if (remaining.len == 0) return fillRingBufferFromPutters(q, io, buffer.len); + } + } + // Copy directly from putters into buffer. + while (remaining.len > 0) { + const putter = q.putters.popFirst() orelse break; + const copy_len = @min(putter.data.remaining.len, remaining.len); + @memcpy(remaining[0..copy_len], putter.data.remaining[0..copy_len]); + putter.data.remaining = putter.data.remaining[copy_len..]; + remaining = remaining[copy_len..]; + if (putter.data.remaining.len == 0) { + putter.data.condition.signal(io); + } else { + assert(remaining.len == 0); + q.putters.prepend(putter); + return fillRingBufferFromPutters(q, io, buffer.len); + } + } + // Both ring buffer and putters queue is empty. + const total_filled = buffer.len - remaining.len; + if (total_filled >= min) return total_filled; + + var node: std.DoublyLinkedList(GetNode).Node = .{ + .data = .{ .remaining = remaining, .condition = .{} }, + }; + q.getters.append(&node); + node.data.condition.wait(io, &q.mutex); + remaining = node.data.remaining; + } + } + + /// Called when there is nonzero space available in the ring buffer and + /// potentially putters waiting. The mutex is already held and the task is + /// to copy putter data to the ring buffer and signal any putters whose + /// buffers been fully copied. + fn fillRingBufferFromPutters(q: *TypeErasedQueue, io: Io, len: usize) usize { + while (true) { + const putter = q.putters.popFirst() orelse return len; + const available = q.buffer[q.put_index..]; + const copy_len = @min(available.len, putter.data.remaining.len); + @memcpy(available[0..copy_len], putter.data.remaining[0..copy_len]); + putter.data.remaining = putter.data.remaining[copy_len..]; + q.put_index += copy_len; + if (putter.data.remaining.len == 0) { + putter.data.condition.signal(io); + continue; + } + const second_available = q.buffer[0..q.get_index]; + const second_copy_len = @min(second_available.len, putter.data.remaining.len); + @memcpy(second_available[0..second_copy_len], putter.data.remaining[0..second_copy_len]); + putter.data.remaining = putter.data.remaining[copy_len..]; + q.put_index = copy_len; + if (putter.data.remaining.len == 0) { + putter.data.condition.signal(io); + continue; + } + q.putters.prepend(putter); + return len; + } + } +}; + +/// Many producer, many consumer, thread-safe, runtime configurable buffer size. +/// When buffer is empty, consumers suspend and are resumed by producers. +/// When buffer is full, producers suspend and are resumed by consumers. +pub fn Queue(Elem: type) type { + return struct { + type_erased: TypeErasedQueue, + + pub fn init(buffer: []Elem) @This() { + return .{ .type_erased = .init(@ptrCast(buffer)) }; + } + + /// Appends elements to the end of the queue. The function returns when + /// at least `min` elements have been added to the buffer or sent + /// directly to a consumer. + /// + /// Returns how many elements have been added to the queue. + /// + /// Asserts that `elements.len >= min`. + pub fn put(q: *@This(), io: Io, elements: []const Elem, min: usize) usize { + return @divExact(q.type_erased.put(io, @ptrCast(elements), min * @sizeOf(Elem)), @sizeOf(Elem)); + } + + /// Receives elements from the beginning of the queue. The function + /// returns when at least `min` elements have been populated inside + /// `buffer`. + /// + /// Returns how many elements of `buffer` have been populated. + /// + /// Asserts that `buffer.len >= min`. + pub fn get(q: *@This(), io: Io, buffer: []Elem, min: usize) usize { + return @divExact(q.type_erased.get(io, @ptrCast(buffer), min * @sizeOf(Elem)), @sizeOf(Elem)); + } + + pub fn putOne(q: *@This(), io: Io, item: Elem) void { + assert(q.put(io, &.{item}, 1) == 1); + } + + pub fn getOne(q: *@This(), io: Io) Elem { + var buf: [1]Elem = undefined; + assert(q.get(io, &buf, 1) == 1); + return buf[0]; + } + }; +} + /// Calls `function` with `args`, such that the return value of the function is /// not guaranteed to be available until `await` is called. pub fn async(io: Io, function: anytype, args: anytype) Future(@typeInfo(@TypeOf(function)).@"fn".return_type.?) { diff --git a/lib/std/Io/EventLoop.zig b/lib/std/Io/EventLoop.zig index a24d5173e2..55ed05b146 100644 --- a/lib/std/Io/EventLoop.zig +++ b/lib/std/Io/EventLoop.zig @@ -102,7 +102,7 @@ const Fiber = struct { return @ptrFromInt(alignment.forward(@intFromPtr(f) + @sizeOf(Fiber))); } - fn enterCancelRegion(fiber: *Fiber, thread: *Thread) error{AsyncCancel}!void { + fn enterCancelRegion(fiber: *Fiber, thread: *Thread) error{Canceled}!void { if (@cmpxchgStrong( ?*Thread, &fiber.cancel_thread, @@ -112,7 +112,7 @@ const Fiber = struct { .acquire, )) |cancel_thread| { assert(cancel_thread == Thread.canceling); - return error.AsyncCancel; + return error.Canceled; } } @@ -746,7 +746,7 @@ pub fn createFile( switch (errno(completion.result)) { .SUCCESS => return .{ .handle = completion.result }, .INTR => unreachable, - .CANCELED => return error.AsyncCancel, + .CANCELED => return error.Canceled, .FAULT => unreachable, .INVAL => return error.BadPathName, @@ -854,7 +854,7 @@ pub fn openFile( switch (errno(completion.result)) { .SUCCESS => return .{ .handle = completion.result }, .INTR => unreachable, - .CANCELED => return error.AsyncCancel, + .CANCELED => return error.Canceled, .FAULT => unreachable, .INVAL => return error.BadPathName, @@ -950,7 +950,7 @@ pub fn pread(userdata: ?*anyopaque, file: std.fs.File, buffer: []u8, offset: std switch (errno(completion.result)) { .SUCCESS => return @as(u32, @bitCast(completion.result)), .INTR => unreachable, - .CANCELED => return error.AsyncCancel, + .CANCELED => return error.Canceled, .INVAL => unreachable, .FAULT => unreachable, @@ -1002,7 +1002,7 @@ pub fn pwrite(userdata: ?*anyopaque, file: std.fs.File, buffer: []const u8, offs switch (errno(completion.result)) { .SUCCESS => return @as(u32, @bitCast(completion.result)), .INTR => unreachable, - .CANCELED => return error.AsyncCancel, + .CANCELED => return error.Canceled, .INVAL => return error.InvalidArgument, .FAULT => unreachable, @@ -1080,7 +1080,7 @@ pub fn sleep(userdata: ?*anyopaque, clockid: std.posix.clockid_t, deadline: Io.D switch (errno(completion.result)) { .SUCCESS, .TIME => return, .INTR => unreachable, - .CANCELED => return error.AsyncCancel, + .CANCELED => return error.Canceled, else => |err| return std.posix.unexpectedErrno(err), } diff --git a/lib/std/Thread/Pool.zig b/lib/std/Thread/Pool.zig index 6ec6c89040..37018f2ab7 100644 --- a/lib/std/Thread/Pool.zig +++ b/lib/std/Thread/Pool.zig @@ -332,9 +332,12 @@ pub fn io(pool: *Pool) Io { .vtable = &.{ .@"async" = @"async", .@"await" = @"await", - .cancel = cancel, .cancelRequested = cancelRequested, + .mutexLock = mutexLock, + .mutexUnlock = mutexUnlock, + .conditionWait = conditionWait, + .conditionWake = conditionWake, .createFile = createFile, .openFile = openFile, @@ -517,11 +520,179 @@ fn cancelRequested(userdata: ?*anyopaque) bool { return @atomicLoad(std.Thread.Id, &closure.cancel_tid, .acquire) == AsyncClosure.canceling_tid; } -fn checkCancel(pool: *Pool) error{AsyncCancel}!void { - if (cancelRequested(pool)) return error.AsyncCancel; +fn checkCancel(pool: *Pool) error{Canceled}!void { + if (cancelRequested(pool)) return error.Canceled; } -pub fn createFile( +fn mutexLock(userdata: ?*anyopaque, m: *Io.Mutex) void { + @branchHint(.cold); + const pool: *std.Thread.Pool = @alignCast(@ptrCast(userdata)); + _ = pool; + + // Avoid doing an atomic swap below if we already know the state is contended. + // An atomic swap unconditionally stores which marks the cache-line as modified unnecessarily. + if (m.state.load(.monotonic) == Io.Mutex.contended) { + std.Thread.Futex.wait(&m.state, Io.Mutex.contended); + } + + // Try to acquire the lock while also telling the existing lock holder that there are threads waiting. + // + // Once we sleep on the Futex, we must acquire the mutex using `contended` rather than `locked`. + // If not, threads sleeping on the Futex wouldn't see the state change in unlock and potentially deadlock. + // The downside is that the last mutex unlocker will see `contended` and do an unnecessary Futex wake + // but this is better than having to wake all waiting threads on mutex unlock. + // + // Acquire barrier ensures grabbing the lock happens before the critical section + // and that the previous lock holder's critical section happens before we grab the lock. + while (m.state.swap(Io.Mutex.contended, .acquire) != Io.Mutex.unlocked) { + std.Thread.Futex.wait(&m.state, Io.Mutex.contended); + } +} + +fn mutexUnlock(userdata: ?*anyopaque, m: *Io.Mutex) void { + const pool: *std.Thread.Pool = @alignCast(@ptrCast(userdata)); + _ = pool; + // Needs to also wake up a waiting thread if any. + // + // A waiting thread will acquire with `contended` instead of `locked` + // which ensures that it wakes up another thread on the next unlock(). + // + // Release barrier ensures the critical section happens before we let go of the lock + // and that our critical section happens before the next lock holder grabs the lock. + const state = m.state.swap(Io.Mutex.unlocked, .release); + assert(state != Io.Mutex.unlocked); + + if (state == Io.Mutex.contended) { + std.Thread.Futex.wake(&m.state, 1); + } +} + +fn mutexLockInternal(pool: *std.Thread.Pool, m: *Io.Mutex) void { + if (!m.tryLock()) { + @branchHint(.unlikely); + mutexLock(pool, m); + } +} + +fn conditionWait( + userdata: ?*anyopaque, + cond: *Io.Condition, + mutex: *Io.Mutex, + timeout: ?u64, +) Io.Condition.WaitError!void { + const pool: *std.Thread.Pool = @alignCast(@ptrCast(userdata)); + comptime assert(@TypeOf(cond.state) == u64); + const ints: *[2]std.atomic.Value(u32) = @ptrCast(&cond.state); + const cond_state = &ints[0]; + const cond_epoch = &ints[1]; + const one_waiter = 1; + const waiter_mask = 0xffff; + const one_signal = 1 << 16; + const signal_mask = 0xffff << 16; + // Observe the epoch, then check the state again to see if we should wake up. + // The epoch must be observed before we check the state or we could potentially miss a wake() and deadlock: + // + // - T1: s = LOAD(&state) + // - T2: UPDATE(&s, signal) + // - T2: UPDATE(&epoch, 1) + FUTEX_WAKE(&epoch) + // - T1: e = LOAD(&epoch) (was reordered after the state load) + // - T1: s & signals == 0 -> FUTEX_WAIT(&epoch, e) (missed the state update + the epoch change) + // + // Acquire barrier to ensure the epoch load happens before the state load. + var epoch = cond_epoch.load(.acquire); + var state = cond_state.fetchAdd(one_waiter, .monotonic); + assert(state & waiter_mask != waiter_mask); + state += one_waiter; + + mutexUnlock(pool, mutex); + defer mutexLockInternal(pool, mutex); + + var futex_deadline = std.Thread.Futex.Deadline.init(timeout); + + while (true) { + futex_deadline.wait(cond_epoch, epoch) catch |err| switch (err) { + // On timeout, we must decrement the waiter we added above. + error.Timeout => { + while (true) { + // If there's a signal when we're timing out, consume it and report being woken up instead. + // Acquire barrier ensures code before the wake() which added the signal happens before we decrement it and return. + while (state & signal_mask != 0) { + const new_state = state - one_waiter - one_signal; + state = cond_state.cmpxchgWeak(state, new_state, .acquire, .monotonic) orelse return; + } + + // Remove the waiter we added and officially return timed out. + const new_state = state - one_waiter; + state = cond_state.cmpxchgWeak(state, new_state, .monotonic, .monotonic) orelse return err; + } + }, + }; + + epoch = cond_epoch.load(.acquire); + state = cond_state.load(.monotonic); + + // Try to wake up by consuming a signal and decremented the waiter we added previously. + // Acquire barrier ensures code before the wake() which added the signal happens before we decrement it and return. + while (state & signal_mask != 0) { + const new_state = state - one_waiter - one_signal; + state = cond_state.cmpxchgWeak(state, new_state, .acquire, .monotonic) orelse return; + } + } +} + +fn conditionWake(userdata: ?*anyopaque, cond: *Io.Condition, notify: Io.Condition.Notify) void { + const pool: *std.Thread.Pool = @alignCast(@ptrCast(userdata)); + _ = pool; + comptime assert(@TypeOf(cond.state) == u64); + const ints: *[2]std.atomic.Value(u32) = @ptrCast(&cond.state); + const cond_state = &ints[0]; + const cond_epoch = &ints[1]; + const one_waiter = 1; + const waiter_mask = 0xffff; + const one_signal = 1 << 16; + const signal_mask = 0xffff << 16; + var state = cond_state.load(.monotonic); + while (true) { + const waiters = (state & waiter_mask) / one_waiter; + const signals = (state & signal_mask) / one_signal; + + // Reserves which waiters to wake up by incrementing the signals count. + // Therefore, the signals count is always less than or equal to the waiters count. + // We don't need to Futex.wake if there's nothing to wake up or if other wake() threads have reserved to wake up the current waiters. + const wakeable = waiters - signals; + if (wakeable == 0) { + return; + } + + const to_wake = switch (notify) { + .one => 1, + .all => wakeable, + }; + + // Reserve the amount of waiters to wake by incrementing the signals count. + // Release barrier ensures code before the wake() happens before the signal it posted and consumed by the wait() threads. + const new_state = state + (one_signal * to_wake); + state = cond_state.cmpxchgWeak(state, new_state, .release, .monotonic) orelse { + // Wake up the waiting threads we reserved above by changing the epoch value. + // NOTE: a waiting thread could miss a wake up if *exactly* ((1<<32)-1) wake()s happen between it observing the epoch and sleeping on it. + // This is very unlikely due to how many precise amount of Futex.wake() calls that would be between the waiting thread's potential preemption. + // + // Release barrier ensures the signal being added to the state happens before the epoch is changed. + // If not, the waiting thread could potentially deadlock from missing both the state and epoch change: + // + // - T2: UPDATE(&epoch, 1) (reordered before the state change) + // - T1: e = LOAD(&epoch) + // - T1: s = LOAD(&state) + // - T2: UPDATE(&state, signal) + FUTEX_WAKE(&epoch) + // - T1: s & signals == 0 -> FUTEX_WAIT(&epoch, e) (missed both epoch change and state change) + _ = cond_epoch.fetchAdd(1, .release); + std.Thread.Futex.wake(cond_epoch, to_wake); + return; + }; + } +} + +fn createFile( userdata: ?*anyopaque, dir: std.fs.Dir, sub_path: []const u8, @@ -532,7 +703,7 @@ pub fn createFile( return dir.createFile(sub_path, flags); } -pub fn openFile( +fn openFile( userdata: ?*anyopaque, dir: std.fs.Dir, sub_path: []const u8, @@ -543,13 +714,13 @@ pub fn openFile( return dir.openFile(sub_path, flags); } -pub fn closeFile(userdata: ?*anyopaque, file: std.fs.File) void { +fn closeFile(userdata: ?*anyopaque, file: std.fs.File) void { const pool: *std.Thread.Pool = @alignCast(@ptrCast(userdata)); _ = pool; return file.close(); } -pub fn pread(userdata: ?*anyopaque, file: std.fs.File, buffer: []u8, offset: std.posix.off_t) Io.FilePReadError!usize { +fn pread(userdata: ?*anyopaque, file: std.fs.File, buffer: []u8, offset: std.posix.off_t) Io.FilePReadError!usize { const pool: *std.Thread.Pool = @alignCast(@ptrCast(userdata)); try pool.checkCancel(); return switch (offset) { @@ -558,7 +729,7 @@ pub fn pread(userdata: ?*anyopaque, file: std.fs.File, buffer: []u8, offset: std }; } -pub fn pwrite(userdata: ?*anyopaque, file: std.fs.File, buffer: []const u8, offset: std.posix.off_t) Io.FilePWriteError!usize { +fn pwrite(userdata: ?*anyopaque, file: std.fs.File, buffer: []const u8, offset: std.posix.off_t) Io.FilePWriteError!usize { const pool: *std.Thread.Pool = @alignCast(@ptrCast(userdata)); try pool.checkCancel(); return switch (offset) {