diff --git a/BRANCH_TODO b/BRANCH_TODO index 37739e7d82..d09d2f9273 100644 --- a/BRANCH_TODO +++ b/BRANCH_TODO @@ -1,7 +1,7 @@ -* Threaded: rename Pool to Threaded * Threaded: finish linux impl (all tests passing) * Threaded: finish macos impl * Threaded: finish windows impl +* Threaded: glibc impl of netLookup * fix Group.wait not handling cancelation (need to move impl of ResetEvent to Threaded) * implement cancelRequest for non-linux posix diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig index f654687684..76d1e49daf 100644 --- a/lib/std/Io/Threaded.zig +++ b/lib/std/Io/Threaded.zig @@ -1,4 +1,4 @@ -const Pool = @This(); +const Threaded = @This(); const builtin = @import("builtin"); const native_os = builtin.os.tag; @@ -76,18 +76,18 @@ pub fn init( /// If these functions are avoided, then `Allocator.failing` may be passed /// here. gpa: Allocator, -) Pool { - var pool: Pool = .{ +) Threaded { + var t: Threaded = .{ .allocator = gpa, .threads = .empty, .stack_size = std.Thread.SpawnConfig.default_stack_size, .cpu_count = std.Thread.getCpuCount(), .concurrent_count = 0, }; - if (pool.cpu_count) |n| { - pool.threads.ensureTotalCapacityPrecise(gpa, n - 1) catch {}; + if (t.cpu_count) |n| { + t.threads.ensureTotalCapacityPrecise(gpa, n - 1) catch {}; } else |_| {} - return pool; + return t; } /// Statically initialize such that any call to the following functions will @@ -96,7 +96,7 @@ pub fn init( /// * `Io.VTable.concurrent` /// * `Io.VTable.groupAsync` /// When initialized this way, `deinit` is safe, but unnecessary to call. -pub const init_single_threaded: Pool = .{ +pub const init_single_threaded: Threaded = .{ .allocator = .failing, .threads = .empty, .stack_size = std.Thread.SpawnConfig.default_stack_size, @@ -104,48 +104,48 @@ pub const init_single_threaded: Pool = .{ .concurrent_count = 0, }; -pub fn deinit(pool: *Pool) void { - const gpa = pool.allocator; - pool.join(); - pool.threads.deinit(gpa); - pool.* = undefined; +pub fn deinit(t: *Threaded) void { + const gpa = t.allocator; + t.join(); + t.threads.deinit(gpa); + t.* = undefined; } -fn join(pool: *Pool) void { +fn join(t: *Threaded) void { if (builtin.single_threaded) return; { - pool.mutex.lock(); - defer pool.mutex.unlock(); - pool.join_requested = true; + t.mutex.lock(); + defer t.mutex.unlock(); + t.join_requested = true; } - pool.cond.broadcast(); - for (pool.threads.items) |thread| thread.join(); + t.cond.broadcast(); + for (t.threads.items) |thread| thread.join(); } -fn worker(pool: *Pool) void { - pool.mutex.lock(); - defer pool.mutex.unlock(); +fn worker(t: *Threaded) void { + t.mutex.lock(); + defer t.mutex.unlock(); while (true) { - while (pool.run_queue.popFirst()) |closure_node| { - pool.mutex.unlock(); + while (t.run_queue.popFirst()) |closure_node| { + t.mutex.unlock(); const closure: *Closure = @fieldParentPtr("node", closure_node); const is_concurrent = closure.is_concurrent; closure.start(closure); - pool.mutex.lock(); + t.mutex.lock(); if (is_concurrent) { // TODO also pop thread and join sometimes - pool.concurrent_count -= 1; + t.concurrent_count -= 1; } } - if (pool.join_requested) break; - pool.cond.wait(&pool.mutex); + if (t.join_requested) break; + t.cond.wait(&t.mutex); } } -pub fn io(pool: *Pool) Io { +pub fn io(t: *Threaded) Io { return .{ - .userdata = pool, + .userdata = t, .vtable = &.{ .async = async, .concurrent = concurrent, @@ -324,14 +324,14 @@ fn async( start(context.ptr, result.ptr); return null; } - const pool: *Pool = @ptrCast(@alignCast(userdata)); - const cpu_count = pool.cpu_count catch { + const t: *Threaded = @ptrCast(@alignCast(userdata)); + const cpu_count = t.cpu_count catch { return concurrent(userdata, result.len, result_alignment, context, context_alignment, start) catch { start(context.ptr, result.ptr); return null; }; }; - const gpa = pool.allocator; + const gpa = t.allocator; const context_offset = context_alignment.forward(@sizeOf(AsyncClosure)); const result_offset = result_alignment.forward(context_offset + context.len); const n = result_offset + result.len; @@ -356,38 +356,38 @@ fn async( @memcpy(ac.contextPointer()[0..context.len], context); - pool.mutex.lock(); + t.mutex.lock(); - const thread_capacity = cpu_count - 1 + pool.concurrent_count; + const thread_capacity = cpu_count - 1 + t.concurrent_count; - pool.threads.ensureTotalCapacityPrecise(gpa, thread_capacity) catch { - pool.mutex.unlock(); + t.threads.ensureTotalCapacityPrecise(gpa, thread_capacity) catch { + t.mutex.unlock(); ac.free(gpa, result.len); start(context.ptr, result.ptr); return null; }; - pool.run_queue.prepend(&ac.closure.node); + t.run_queue.prepend(&ac.closure.node); - if (pool.threads.items.len < thread_capacity) { - const thread = std.Thread.spawn(.{ .stack_size = pool.stack_size }, worker, .{pool}) catch { - if (pool.threads.items.len == 0) { - assert(pool.run_queue.popFirst() == &ac.closure.node); - pool.mutex.unlock(); + if (t.threads.items.len < thread_capacity) { + const thread = std.Thread.spawn(.{ .stack_size = t.stack_size }, worker, .{t}) catch { + if (t.threads.items.len == 0) { + assert(t.run_queue.popFirst() == &ac.closure.node); + t.mutex.unlock(); ac.free(gpa, result.len); start(context.ptr, result.ptr); return null; } // Rely on other workers to do it. - pool.mutex.unlock(); - pool.cond.signal(); + t.mutex.unlock(); + t.cond.signal(); return @ptrCast(ac); }; - pool.threads.appendAssumeCapacity(thread); + t.threads.appendAssumeCapacity(thread); } - pool.mutex.unlock(); - pool.cond.signal(); + t.mutex.unlock(); + t.cond.signal(); return @ptrCast(ac); } @@ -401,9 +401,9 @@ fn concurrent( ) error{OutOfMemory}!*Io.AnyFuture { if (builtin.single_threaded) unreachable; - const pool: *Pool = @ptrCast(@alignCast(userdata)); - const cpu_count = pool.cpu_count catch 1; - const gpa = pool.allocator; + const t: *Threaded = @ptrCast(@alignCast(userdata)); + const cpu_count = t.cpu_count catch 1; + const gpa = t.allocator; const context_offset = context_alignment.forward(@sizeOf(AsyncClosure)); const result_offset = result_alignment.forward(context_offset + context.len); const n = result_offset + result_len; @@ -424,37 +424,37 @@ fn concurrent( }; @memcpy(ac.contextPointer()[0..context.len], context); - pool.mutex.lock(); + t.mutex.lock(); - pool.concurrent_count += 1; - const thread_capacity = cpu_count - 1 + pool.concurrent_count; + t.concurrent_count += 1; + const thread_capacity = cpu_count - 1 + t.concurrent_count; - pool.threads.ensureTotalCapacity(gpa, thread_capacity) catch { - pool.mutex.unlock(); + t.threads.ensureTotalCapacity(gpa, thread_capacity) catch { + t.mutex.unlock(); ac.free(gpa, result_len); return error.OutOfMemory; }; - pool.run_queue.prepend(&ac.closure.node); + t.run_queue.prepend(&ac.closure.node); - if (pool.threads.items.len < thread_capacity) { - const thread = std.Thread.spawn(.{ .stack_size = pool.stack_size }, worker, .{pool}) catch { - assert(pool.run_queue.popFirst() == &ac.closure.node); - pool.mutex.unlock(); + if (t.threads.items.len < thread_capacity) { + const thread = std.Thread.spawn(.{ .stack_size = t.stack_size }, worker, .{t}) catch { + assert(t.run_queue.popFirst() == &ac.closure.node); + t.mutex.unlock(); ac.free(gpa, result_len); return error.OutOfMemory; }; - pool.threads.appendAssumeCapacity(thread); + t.threads.appendAssumeCapacity(thread); } - pool.mutex.unlock(); - pool.cond.signal(); + t.mutex.unlock(); + t.cond.signal(); return @ptrCast(ac); } const GroupClosure = struct { closure: Closure, - pool: *Pool, + t: *Threaded, group: *Io.Group, /// Points to sibling `GroupClosure`. Used for walking the group to cancel all. node: std.SinglyLinkedList.Node, @@ -515,9 +515,9 @@ fn groupAsync( start: *const fn (*Io.Group, context: *const anyopaque) void, ) void { if (builtin.single_threaded) return start(context.ptr); - const pool: *Pool = @ptrCast(@alignCast(userdata)); - const cpu_count = pool.cpu_count catch 1; - const gpa = pool.allocator; + const t: *Threaded = @ptrCast(@alignCast(userdata)); + const cpu_count = t.cpu_count catch 1; + const gpa = t.allocator; const n = GroupClosure.contextEnd(context_alignment, context.len); const gc: *GroupClosure = @ptrCast(@alignCast(gpa.alignedAlloc(u8, .of(GroupClosure), n) catch { return start(group, context.ptr); @@ -528,7 +528,7 @@ fn groupAsync( .start = GroupClosure.start, .is_concurrent = false, }, - .pool = pool, + .t = t, .group = group, .node = undefined, .func = start, @@ -537,30 +537,30 @@ fn groupAsync( }; @memcpy(gc.contextPointer()[0..context.len], context); - pool.mutex.lock(); + t.mutex.lock(); // Append to the group linked list inside the mutex to make `Io.Group.async` thread-safe. gc.node = .{ .next = @ptrCast(@alignCast(group.token)) }; group.token = &gc.node; - const thread_capacity = cpu_count - 1 + pool.concurrent_count; + const thread_capacity = cpu_count - 1 + t.concurrent_count; - pool.threads.ensureTotalCapacityPrecise(gpa, thread_capacity) catch { - pool.mutex.unlock(); + t.threads.ensureTotalCapacityPrecise(gpa, thread_capacity) catch { + t.mutex.unlock(); gc.free(gpa); return start(group, context.ptr); }; - pool.run_queue.prepend(&gc.closure.node); + t.run_queue.prepend(&gc.closure.node); - if (pool.threads.items.len < thread_capacity) { - const thread = std.Thread.spawn(.{ .stack_size = pool.stack_size }, worker, .{pool}) catch { - assert(pool.run_queue.popFirst() == &gc.closure.node); - pool.mutex.unlock(); + if (t.threads.items.len < thread_capacity) { + const thread = std.Thread.spawn(.{ .stack_size = t.stack_size }, worker, .{t}) catch { + assert(t.run_queue.popFirst() == &gc.closure.node); + t.mutex.unlock(); gc.free(gpa); return start(group, context.ptr); }; - pool.threads.appendAssumeCapacity(thread); + t.threads.appendAssumeCapacity(thread); } // This needs to be done before unlocking the mutex to avoid a race with @@ -568,13 +568,13 @@ fn groupAsync( const group_state: *std.atomic.Value(usize) = @ptrCast(&group.state); std.Thread.WaitGroup.startStateless(group_state); - pool.mutex.unlock(); - pool.cond.signal(); + t.mutex.unlock(); + t.cond.signal(); } fn groupWait(userdata: ?*anyopaque, group: *Io.Group, token: *anyopaque) void { - const pool: *Pool = @ptrCast(@alignCast(userdata)); - const gpa = pool.allocator; + const t: *Threaded = @ptrCast(@alignCast(userdata)); + const gpa = t.allocator; if (builtin.single_threaded) return; @@ -593,8 +593,8 @@ fn groupWait(userdata: ?*anyopaque, group: *Io.Group, token: *anyopaque) void { } fn groupCancel(userdata: ?*anyopaque, group: *Io.Group, token: *anyopaque) void { - const pool: *Pool = @ptrCast(@alignCast(userdata)); - const gpa = pool.allocator; + const t: *Threaded = @ptrCast(@alignCast(userdata)); + const gpa = t.allocator; if (builtin.single_threaded) return; @@ -629,9 +629,9 @@ fn await( result_alignment: std.mem.Alignment, ) void { _ = result_alignment; - const pool: *Pool = @ptrCast(@alignCast(userdata)); + const t: *Threaded = @ptrCast(@alignCast(userdata)); const closure: *AsyncClosure = @ptrCast(@alignCast(any_future)); - closure.waitAndFree(pool.allocator, result); + closure.waitAndFree(t.allocator, result); } fn cancel( @@ -641,31 +641,31 @@ fn cancel( result_alignment: std.mem.Alignment, ) void { _ = result_alignment; - const pool: *Pool = @ptrCast(@alignCast(userdata)); + const t: *Threaded = @ptrCast(@alignCast(userdata)); const ac: *AsyncClosure = @ptrCast(@alignCast(any_future)); ac.closure.requestCancel(); - ac.waitAndFree(pool.allocator, result); + ac.waitAndFree(t.allocator, result); } fn cancelRequested(userdata: ?*anyopaque) bool { - const pool: *Pool = @ptrCast(@alignCast(userdata)); - _ = pool; + const t: *Threaded = @ptrCast(@alignCast(userdata)); + _ = t; const closure = current_closure orelse return false; return @atomicLoad(std.Thread.Id, &closure.cancel_tid, .acquire) == Closure.canceling_tid; } -fn checkCancel(pool: *Pool) error{Canceled}!void { - if (cancelRequested(pool)) return error.Canceled; +fn checkCancel(t: *Threaded) error{Canceled}!void { + if (cancelRequested(t)) return error.Canceled; } fn mutexLock(userdata: ?*anyopaque, prev_state: Io.Mutex.State, mutex: *Io.Mutex) Io.Cancelable!void { - const pool: *Pool = @ptrCast(@alignCast(userdata)); + const t: *Threaded = @ptrCast(@alignCast(userdata)); if (prev_state == .contended) { - try pool.checkCancel(); + try t.checkCancel(); futexWait(@ptrCast(&mutex.state), @intFromEnum(Io.Mutex.State.contended)); } while (@atomicRmw(Io.Mutex.State, &mutex.state, .Xchg, .contended, .acquire) != .unlocked) { - try pool.checkCancel(); + try t.checkCancel(); futexWait(@ptrCast(&mutex.state), @intFromEnum(Io.Mutex.State.contended)); } } @@ -689,8 +689,8 @@ fn mutexUnlock(userdata: ?*anyopaque, prev_state: Io.Mutex.State, mutex: *Io.Mut } fn conditionWaitUncancelable(userdata: ?*anyopaque, cond: *Io.Condition, mutex: *Io.Mutex) void { - const pool: *Pool = @ptrCast(@alignCast(userdata)); - const pool_io = pool.io(); + const t: *Threaded = @ptrCast(@alignCast(userdata)); + const t_io = t.io(); comptime assert(@TypeOf(cond.state) == u64); const ints: *[2]std.atomic.Value(u32) = @ptrCast(&cond.state); const cond_state = &ints[0]; @@ -704,8 +704,8 @@ fn conditionWaitUncancelable(userdata: ?*anyopaque, cond: *Io.Condition, mutex: assert(state & waiter_mask != waiter_mask); state += one_waiter; - mutex.unlock(pool_io); - defer mutex.lockUncancelable(pool_io); + mutex.unlock(t_io); + defer mutex.lockUncancelable(t_io); while (true) { futexWait(cond_epoch, epoch); @@ -719,7 +719,7 @@ fn conditionWaitUncancelable(userdata: ?*anyopaque, cond: *Io.Condition, mutex: } fn conditionWait(userdata: ?*anyopaque, cond: *Io.Condition, mutex: *Io.Mutex) Io.Cancelable!void { - const pool: *Pool = @ptrCast(@alignCast(userdata)); + const t: *Threaded = @ptrCast(@alignCast(userdata)); comptime assert(@TypeOf(cond.state) == u64); const ints: *[2]std.atomic.Value(u32) = @ptrCast(&cond.state); const cond_state = &ints[0]; @@ -743,11 +743,11 @@ fn conditionWait(userdata: ?*anyopaque, cond: *Io.Condition, mutex: *Io.Mutex) I assert(state & waiter_mask != waiter_mask); state += one_waiter; - mutex.unlock(pool.io()); - defer mutex.lockUncancelable(pool.io()); + mutex.unlock(t.io()); + defer mutex.lockUncancelable(t.io()); while (true) { - try pool.checkCancel(); + try t.checkCancel(); futexWait(cond_epoch, epoch); epoch = cond_epoch.load(.acquire); @@ -764,8 +764,8 @@ fn conditionWait(userdata: ?*anyopaque, cond: *Io.Condition, mutex: *Io.Mutex) I } fn conditionWake(userdata: ?*anyopaque, cond: *Io.Condition, wake: Io.Condition.Wake) void { - const pool: *Pool = @ptrCast(@alignCast(userdata)); - _ = pool; + const t: *Threaded = @ptrCast(@alignCast(userdata)); + _ = t; comptime assert(@TypeOf(cond.state) == u64); const ints: *[2]std.atomic.Value(u32) = @ptrCast(&cond.state); const cond_state = &ints[0]; @@ -825,13 +825,13 @@ fn conditionWake(userdata: ?*anyopaque, cond: *Io.Condition, wake: Io.Condition. } fn dirMakePosix(userdata: ?*anyopaque, dir: Io.Dir, sub_path: []const u8, mode: Io.Dir.Mode) Io.Dir.MakeError!void { - const pool: *Pool = @ptrCast(@alignCast(userdata)); + const t: *Threaded = @ptrCast(@alignCast(userdata)); var path_buffer: [posix.PATH_MAX]u8 = undefined; const sub_path_posix = try pathToPosix(sub_path, &path_buffer); while (true) { - try pool.checkCancel(); + try t.checkCancel(); switch (posix.errno(posix.system.mkdirat(dir.handle, sub_path_posix, mode))) { .SUCCESS => return, .INTR => continue, @@ -858,8 +858,8 @@ fn dirMakePosix(userdata: ?*anyopaque, dir: Io.Dir, sub_path: []const u8, mode: } fn dirStat(userdata: ?*anyopaque, dir: Io.Dir) Io.Dir.StatError!Io.Dir.Stat { - const pool: *Pool = @ptrCast(@alignCast(userdata)); - try pool.checkCancel(); + const t: *Threaded = @ptrCast(@alignCast(userdata)); + try t.checkCancel(); _ = dir; @panic("TODO"); @@ -871,7 +871,7 @@ fn dirStatPathLinux( sub_path: []const u8, options: Io.Dir.StatPathOptions, ) Io.Dir.StatPathError!Io.File.Stat { - const pool: *Pool = @ptrCast(@alignCast(userdata)); + const t: *Threaded = @ptrCast(@alignCast(userdata)); const linux = std.os.linux; var path_buffer: [posix.PATH_MAX]u8 = undefined; @@ -881,7 +881,7 @@ fn dirStatPathLinux( @as(u32, if (!options.follow_symlinks) linux.AT.SYMLINK_NOFOLLOW else 0); while (true) { - try pool.checkCancel(); + try t.checkCancel(); var statx = std.mem.zeroes(linux.Statx); const rc = linux.statx( dir.handle, @@ -913,7 +913,7 @@ fn dirStatPathPosix( sub_path: []const u8, options: Io.Dir.StatPathOptions, ) Io.Dir.StatPathError!Io.File.Stat { - const pool: *Pool = @ptrCast(@alignCast(userdata)); + const t: *Threaded = @ptrCast(@alignCast(userdata)); var path_buffer: [posix.PATH_MAX]u8 = undefined; const sub_path_posix = try pathToPosix(sub_path, &path_buffer); @@ -921,7 +921,7 @@ fn dirStatPathPosix( const flags: u32 = if (!options.follow_symlinks) posix.AT.SYMLINK_NOFOLLOW else 0; while (true) { - try pool.checkCancel(); + try t.checkCancel(); var stat = std.mem.zeroes(posix.Stat); switch (posix.errno(fstatat_sym(dir.handle, sub_path_posix, &stat, flags))) { .SUCCESS => return statFromPosix(stat), @@ -943,12 +943,12 @@ fn dirStatPathPosix( } fn fileStatPosix(userdata: ?*anyopaque, file: Io.File) Io.File.StatError!Io.File.Stat { - const pool: *Pool = @ptrCast(@alignCast(userdata)); + const t: *Threaded = @ptrCast(@alignCast(userdata)); if (posix.Stat == void) return error.Streaming; while (true) { - try pool.checkCancel(); + try t.checkCancel(); var stat = std.mem.zeroes(posix.Stat); switch (posix.errno(fstat_sym(file.handle, &stat))) { .SUCCESS => return statFromPosix(&stat), @@ -963,10 +963,10 @@ fn fileStatPosix(userdata: ?*anyopaque, file: Io.File) Io.File.StatError!Io.File } fn fileStatLinux(userdata: ?*anyopaque, file: Io.File) Io.File.StatError!Io.File.Stat { - const pool: *Pool = @ptrCast(@alignCast(userdata)); + const t: *Threaded = @ptrCast(@alignCast(userdata)); const linux = std.os.linux; while (true) { - try pool.checkCancel(); + try t.checkCancel(); var statx = std.mem.zeroes(linux.Statx); const rc = linux.statx( file.handle, @@ -993,17 +993,17 @@ fn fileStatLinux(userdata: ?*anyopaque, file: Io.File) Io.File.StatError!Io.File } fn fileStatWindows(userdata: ?*anyopaque, file: Io.File) Io.File.StatError!Io.File.Stat { - const pool: *Pool = @ptrCast(@alignCast(userdata)); - try pool.checkCancel(); + const t: *Threaded = @ptrCast(@alignCast(userdata)); + try t.checkCancel(); _ = file; @panic("TODO"); } fn fileStatWasi(userdata: ?*anyopaque, file: Io.File) Io.File.StatError!Io.File.Stat { if (builtin.link_libc) return fileStatPosix(userdata, file); - const pool: *Pool = @ptrCast(@alignCast(userdata)); + const t: *Threaded = @ptrCast(@alignCast(userdata)); while (true) { - try pool.checkCancel(); + try t.checkCancel(); var stat: std.os.wasi.filestat_t = undefined; switch (std.os.wasi.fd_filestat_get(file.handle, &stat)) { .SUCCESS => return statFromWasi(&stat), @@ -1031,7 +1031,7 @@ fn dirCreateFilePosix( sub_path: []const u8, flags: Io.File.CreateFlags, ) Io.File.OpenError!Io.File { - const pool: *Pool = @ptrCast(@alignCast(userdata)); + const t: *Threaded = @ptrCast(@alignCast(userdata)); var path_buffer: [posix.PATH_MAX]u8 = undefined; const sub_path_posix = try pathToPosix(sub_path, &path_buffer); @@ -1062,7 +1062,7 @@ fn dirCreateFilePosix( }; const fd: posix.fd_t = while (true) { - try pool.checkCancel(); + try t.checkCancel(); const rc = openat_sym(dir.handle, sub_path_posix, os_flags, flags.mode); switch (posix.errno(rc)) { .SUCCESS => break @intCast(rc), @@ -1106,7 +1106,7 @@ fn dirCreateFilePosix( .exclusive => posix.LOCK.EX | lock_nonblocking, }; while (true) { - try pool.checkCancel(); + try t.checkCancel(); switch (posix.errno(posix.system.flock(fd, lock_flags))) { .SUCCESS => break, .INTR => continue, @@ -1123,7 +1123,7 @@ fn dirCreateFilePosix( if (has_flock_open_flags and flags.lock_nonblocking) { var fl_flags: usize = while (true) { - try pool.checkCancel(); + try t.checkCancel(); switch (posix.errno(posix.system.fcntl(fd, posix.F.GETFL, 0))) { .SUCCESS => break, .INTR => continue, @@ -1132,7 +1132,7 @@ fn dirCreateFilePosix( }; fl_flags &= ~@as(usize, 1 << @bitOffsetOf(posix.O, "NONBLOCK")); while (true) { - try pool.checkCancel(); + try t.checkCancel(); switch (posix.errno(posix.fcntl(fd, posix.F.SETFL, fl_flags))) { .SUCCESS => break, .INTR => continue, @@ -1150,7 +1150,7 @@ fn dirOpenFile( sub_path: []const u8, flags: Io.File.OpenFlags, ) Io.File.OpenError!Io.File { - const pool: *Pool = @ptrCast(@alignCast(userdata)); + const t: *Threaded = @ptrCast(@alignCast(userdata)); var path_buffer: [posix.PATH_MAX]u8 = undefined; const sub_path_posix = try pathToPosix(sub_path, &path_buffer); @@ -1191,7 +1191,7 @@ fn dirOpenFile( } } const fd: posix.fd_t = while (true) { - try pool.checkCancel(); + try t.checkCancel(); const rc = openat_sym(dir.handle, sub_path_posix, os_flags, @as(posix.mode_t, 0)); switch (posix.errno(rc)) { .SUCCESS => break @intCast(rc), @@ -1235,7 +1235,7 @@ fn dirOpenFile( .exclusive => posix.LOCK.EX | lock_nonblocking, }; while (true) { - try pool.checkCancel(); + try t.checkCancel(); switch (posix.errno(posix.system.flock(fd, lock_flags))) { .SUCCESS => break, .INTR => continue, @@ -1252,7 +1252,7 @@ fn dirOpenFile( if (has_flock_open_flags and flags.lock_nonblocking) { var fl_flags: usize = while (true) { - try pool.checkCancel(); + try t.checkCancel(); switch (posix.errno(posix.system.fcntl(fd, posix.F.GETFL, 0))) { .SUCCESS => break, .INTR => continue, @@ -1261,7 +1261,7 @@ fn dirOpenFile( }; fl_flags &= ~@as(usize, 1 << @bitOffsetOf(posix.O, "NONBLOCK")); while (true) { - try pool.checkCancel(); + try t.checkCancel(); switch (posix.errno(posix.fcntl(fd, posix.F.SETFL, fl_flags))) { .SUCCESS => break, .INTR => continue, @@ -1274,13 +1274,13 @@ fn dirOpenFile( } fn fileClose(userdata: ?*anyopaque, file: Io.File) void { - const pool: *Pool = @ptrCast(@alignCast(userdata)); - _ = pool; + const t: *Threaded = @ptrCast(@alignCast(userdata)); + _ = t; posix.close(file.handle); } fn fileReadStreaming(userdata: ?*anyopaque, file: Io.File, data: [][]u8) Io.File.ReadStreamingError!usize { - const pool: *Pool = @ptrCast(@alignCast(userdata)); + const t: *Threaded = @ptrCast(@alignCast(userdata)); if (is_windows) { const DWORD = windows.DWORD; @@ -1288,7 +1288,7 @@ fn fileReadStreaming(userdata: ?*anyopaque, file: Io.File, data: [][]u8) Io.File var truncate: usize = 0; var total: usize = 0; while (index < data.len) { - try pool.checkCancel(); + try t.checkCancel(); { const untruncated = data[index]; data[index] = untruncated[truncate..]; @@ -1333,7 +1333,7 @@ fn fileReadStreaming(userdata: ?*anyopaque, file: Io.File, data: [][]u8) Io.File assert(dest[0].len > 0); if (native_os == .wasi and !builtin.link_libc) while (true) { - try pool.checkCancel(); + try t.checkCancel(); var nread: usize = undefined; switch (std.os.wasi.fd_read(file.handle, dest.ptr, dest.len, &nread)) { .SUCCESS => return nread, @@ -1354,7 +1354,7 @@ fn fileReadStreaming(userdata: ?*anyopaque, file: Io.File, data: [][]u8) Io.File }; while (true) { - try pool.checkCancel(); + try t.checkCancel(); const rc = posix.system.readv(file.handle, dest.ptr, @intCast(dest.len)); switch (posix.errno(rc)) { .SUCCESS => return @intCast(rc), @@ -1377,7 +1377,7 @@ fn fileReadStreaming(userdata: ?*anyopaque, file: Io.File, data: [][]u8) Io.File } fn fileReadPositional(userdata: ?*anyopaque, file: Io.File, data: [][]u8, offset: u64) Io.File.ReadPositionalError!usize { - const pool: *Pool = @ptrCast(@alignCast(userdata)); + const t: *Threaded = @ptrCast(@alignCast(userdata)); if (is_windows) { const DWORD = windows.DWORD; @@ -1386,7 +1386,7 @@ fn fileReadPositional(userdata: ?*anyopaque, file: Io.File, data: [][]u8, offset var truncate: usize = 0; var total: usize = 0; while (true) { - try pool.checkCancel(); + try t.checkCancel(); { const untruncated = data[index]; data[index] = untruncated[truncate..]; @@ -1454,7 +1454,7 @@ fn fileReadPositional(userdata: ?*anyopaque, file: Io.File, data: [][]u8, offset assert(dest[0].len > 0); if (native_os == .wasi and !builtin.link_libc) while (true) { - try pool.checkCancel(); + try t.checkCancel(); var nread: usize = undefined; switch (std.os.wasi.fd_pread(file.handle, dest.ptr, dest.len, offset, &nread)) { .SUCCESS => return nread, @@ -1479,7 +1479,7 @@ fn fileReadPositional(userdata: ?*anyopaque, file: Io.File, data: [][]u8, offset }; while (true) { - try pool.checkCancel(); + try t.checkCancel(); const rc = preadv_sym(file.handle, dest.ptr, @intCast(dest.len), @bitCast(offset)); switch (posix.errno(rc)) { .SUCCESS => return @bitCast(rc), @@ -1505,8 +1505,8 @@ fn fileReadPositional(userdata: ?*anyopaque, file: Io.File, data: [][]u8, offset } fn fileSeekBy(userdata: ?*anyopaque, file: Io.File, offset: i64) Io.File.SeekError!void { - const pool: *Pool = @ptrCast(@alignCast(userdata)); - try pool.checkCancel(); + const t: *Threaded = @ptrCast(@alignCast(userdata)); + try t.checkCancel(); _ = file; _ = offset; @@ -1514,11 +1514,11 @@ fn fileSeekBy(userdata: ?*anyopaque, file: Io.File, offset: i64) Io.File.SeekErr } fn fileSeekTo(userdata: ?*anyopaque, file: Io.File, offset: u64) Io.File.SeekError!void { - const pool: *Pool = @ptrCast(@alignCast(userdata)); + const t: *Threaded = @ptrCast(@alignCast(userdata)); const fd = file.handle; if (native_os == .linux and !builtin.link_libc and @sizeOf(usize) == 4) while (true) { - try pool.checkCancel(); + try t.checkCancel(); var result: u64 = undefined; switch (posix.errno(posix.system.llseek(fd, offset, &result, posix.SEEK.SET))) { .SUCCESS => return, @@ -1533,12 +1533,12 @@ fn fileSeekTo(userdata: ?*anyopaque, file: Io.File, offset: u64) Io.File.SeekErr }; if (native_os == .windows) { - try pool.checkCancel(); + try t.checkCancel(); return windows.SetFilePointerEx_BEGIN(fd, offset); } if (native_os == .wasi and !builtin.link_libc) while (true) { - try pool.checkCancel(); + try t.checkCancel(); var new_offset: std.os.wasi.filesize_t = undefined; switch (std.os.wasi.fd_seek(fd, @bitCast(offset), .SET, &new_offset)) { .SUCCESS => return, @@ -1556,7 +1556,7 @@ fn fileSeekTo(userdata: ?*anyopaque, file: Io.File, offset: u64) Io.File.SeekErr if (posix.SEEK == void) return error.Unseekable; while (true) { - try pool.checkCancel(); + try t.checkCancel(); switch (posix.errno(lseek_sym(fd, @bitCast(offset), posix.SEEK.SET))) { .SUCCESS => return, .INTR => continue, @@ -1571,8 +1571,8 @@ fn fileSeekTo(userdata: ?*anyopaque, file: Io.File, offset: u64) Io.File.SeekErr } fn pwrite(userdata: ?*anyopaque, file: Io.File, buffer: []const u8, offset: posix.off_t) Io.File.PWriteError!usize { - const pool: *Pool = @ptrCast(@alignCast(userdata)); - try pool.checkCancel(); + const t: *Threaded = @ptrCast(@alignCast(userdata)); + try t.checkCancel(); const fs_file: std.fs.File = .{ .handle = file.handle }; return switch (offset) { -1 => fs_file.write(buffer), @@ -1581,8 +1581,8 @@ fn pwrite(userdata: ?*anyopaque, file: Io.File, buffer: []const u8, offset: posi } fn nowPosix(userdata: ?*anyopaque, clock: Io.Clock) Io.Clock.Error!Io.Timestamp { - const pool: *Pool = @ptrCast(@alignCast(userdata)); - _ = pool; + const t: *Threaded = @ptrCast(@alignCast(userdata)); + _ = t; const clock_id: posix.clockid_t = clockToPosix(clock); var tp: posix.timespec = undefined; switch (posix.errno(posix.system.clock_gettime(clock_id, &tp))) { @@ -1593,8 +1593,8 @@ fn nowPosix(userdata: ?*anyopaque, clock: Io.Clock) Io.Clock.Error!Io.Timestamp } fn nowWindows(userdata: ?*anyopaque, clock: Io.Clock) Io.Clock.Error!Io.Timestamp { - const pool: *Pool = @ptrCast(@alignCast(userdata)); - _ = pool; + const t: *Threaded = @ptrCast(@alignCast(userdata)); + _ = t; switch (clock) { .realtime => { // RtlGetSystemTimePrecise() has a granularity of 100 nanoseconds @@ -1612,8 +1612,8 @@ fn nowWindows(userdata: ?*anyopaque, clock: Io.Clock) Io.Clock.Error!Io.Timestam } fn nowWasi(userdata: ?*anyopaque, clock: Io.Clock) Io.Clock.Error!Io.Timestamp { - const pool: *Pool = @ptrCast(@alignCast(userdata)); - _ = pool; + const t: *Threaded = @ptrCast(@alignCast(userdata)); + _ = t; var ns: std.os.wasi.timestamp_t = undefined; const err = std.os.wasi.clock_time_get(clockToWasi(clock), 1, &ns); if (err != .SUCCESS) return error.Unexpected; @@ -1621,7 +1621,7 @@ fn nowWasi(userdata: ?*anyopaque, clock: Io.Clock) Io.Clock.Error!Io.Timestamp { } fn sleepLinux(userdata: ?*anyopaque, timeout: Io.Timeout) Io.SleepError!void { - const pool: *Pool = @ptrCast(@alignCast(userdata)); + const t: *Threaded = @ptrCast(@alignCast(userdata)); const clock_id: posix.clockid_t = clockToPosix(switch (timeout) { .none => .awake, .duration => |d| d.clock, @@ -1634,7 +1634,7 @@ fn sleepLinux(userdata: ?*anyopaque, timeout: Io.Timeout) Io.SleepError!void { }; var timespec: posix.timespec = timestampToPosix(deadline_nanoseconds); while (true) { - try pool.checkCancel(); + try t.checkCancel(); switch (std.os.linux.E.init(std.os.linux.clock_nanosleep(clock_id, .{ .ABSTIME = switch (timeout) { .none, .duration => false, .deadline => true, @@ -1648,10 +1648,10 @@ fn sleepLinux(userdata: ?*anyopaque, timeout: Io.Timeout) Io.SleepError!void { } fn sleepWindows(userdata: ?*anyopaque, timeout: Io.Timeout) Io.SleepError!void { - const pool: *Pool = @ptrCast(@alignCast(userdata)); - try pool.checkCancel(); + const t: *Threaded = @ptrCast(@alignCast(userdata)); + try t.checkCancel(); const ms = ms: { - const duration_and_clock = (try timeout.toDurationFromNow(pool.io())) orelse + const duration_and_clock = (try timeout.toDurationFromNow(t.io())) orelse break :ms std.math.maxInt(windows.DWORD); break :ms std.math.lossyCast(windows.DWORD, duration_and_clock.duration.toMilliseconds()); }; @@ -1659,12 +1659,12 @@ fn sleepWindows(userdata: ?*anyopaque, timeout: Io.Timeout) Io.SleepError!void { } fn sleepWasi(userdata: ?*anyopaque, timeout: Io.Timeout) Io.SleepError!void { - const pool: *Pool = @ptrCast(@alignCast(userdata)); - try pool.checkCancel(); + const t: *Threaded = @ptrCast(@alignCast(userdata)); + try t.checkCancel(); const w = std.os.wasi; - const clock: w.subscription_clock_t = if (try timeout.toDurationFromNow(pool.io())) |d| .{ + const clock: w.subscription_clock_t = if (try timeout.toDurationFromNow(t.io())) |d| .{ .id = clockToWasi(d.clock), .timeout = std.math.lossyCast(u64, d.duration.nanoseconds), .precision = 0, @@ -1688,19 +1688,19 @@ fn sleepWasi(userdata: ?*anyopaque, timeout: Io.Timeout) Io.SleepError!void { } fn sleepPosix(userdata: ?*anyopaque, timeout: Io.Timeout) Io.SleepError!void { - const pool: *Pool = @ptrCast(@alignCast(userdata)); + const t: *Threaded = @ptrCast(@alignCast(userdata)); const sec_type = @typeInfo(posix.timespec).@"struct".fields[0].type; const nsec_type = @typeInfo(posix.timespec).@"struct".fields[1].type; var timespec: posix.timespec = t: { - const d = (try timeout.toDurationFromNow(pool.io())) orelse break :t .{ + const d = (try timeout.toDurationFromNow(t.io())) orelse break :t .{ .sec = std.math.maxInt(sec_type), .nsec = std.math.maxInt(nsec_type), }; break :t timestampToPosix(d.duration.nanoseconds); }; while (true) { - try pool.checkCancel(); + try t.checkCancel(); switch (posix.errno(posix.system.nanosleep(×pec, ×pec))) { .INTR => continue, else => return, // This prong handles success as well as unexpected errors. @@ -1709,8 +1709,8 @@ fn sleepPosix(userdata: ?*anyopaque, timeout: Io.Timeout) Io.SleepError!void { } fn select(userdata: ?*anyopaque, futures: []const *Io.AnyFuture) usize { - const pool: *Pool = @ptrCast(@alignCast(userdata)); - _ = pool; + const t: *Threaded = @ptrCast(@alignCast(userdata)); + _ = t; var reset_event: ResetEvent = .unset; @@ -1745,26 +1745,26 @@ fn netListenIpPosix( address: IpAddress, options: IpAddress.ListenOptions, ) IpAddress.ListenError!net.Server { - const pool: *Pool = @ptrCast(@alignCast(userdata)); + const t: *Threaded = @ptrCast(@alignCast(userdata)); const family = posixAddressFamily(&address); - const socket_fd = try openSocketPosix(pool, family, .{ + const socket_fd = try openSocketPosix(t, family, .{ .mode = options.mode, .protocol = options.protocol, }); errdefer posix.close(socket_fd); if (options.reuse_address) { - try setSocketOption(pool, socket_fd, posix.SOL.SOCKET, posix.SO.REUSEADDR, 1); + try setSocketOption(t, socket_fd, posix.SOL.SOCKET, posix.SO.REUSEADDR, 1); if (@hasDecl(posix.SO, "REUSEPORT")) - try setSocketOption(pool, socket_fd, posix.SOL.SOCKET, posix.SO.REUSEPORT, 1); + try setSocketOption(t, socket_fd, posix.SOL.SOCKET, posix.SO.REUSEPORT, 1); } var storage: PosixAddress = undefined; var addr_len = addressToPosix(&address, &storage); - try posixBind(pool, socket_fd, &storage.any, addr_len); + try posixBind(t, socket_fd, &storage.any, addr_len); while (true) { - try pool.checkCancel(); + try t.checkCancel(); switch (posix.errno(posix.system.listen(socket_fd, options.kernel_backlog))) { .SUCCESS => break, .ADDRINUSE => return error.AddressInUse, @@ -1773,7 +1773,7 @@ fn netListenIpPosix( } } - try posixGetSockName(pool, socket_fd, &storage.any, &addr_len); + try posixGetSockName(t, socket_fd, &storage.any, &addr_len); return .{ .socket = .{ .handle = socket_fd, @@ -1788,8 +1788,8 @@ fn netListenUnix( options: net.UnixAddress.ListenOptions, ) net.UnixAddress.ListenError!net.Socket.Handle { if (!net.has_unix_sockets) return error.AddressFamilyUnsupported; - const pool: *Pool = @ptrCast(@alignCast(userdata)); - const socket_fd = openSocketPosix(pool, posix.AF.UNIX, .{ .mode = .stream }) catch |err| switch (err) { + const t: *Threaded = @ptrCast(@alignCast(userdata)); + const socket_fd = openSocketPosix(t, posix.AF.UNIX, .{ .mode = .stream }) catch |err| switch (err) { error.ProtocolUnsupportedBySystem => return error.AddressFamilyUnsupported, error.ProtocolUnsupportedByAddressFamily => return error.AddressFamilyUnsupported, error.SocketModeUnsupported => return error.AddressFamilyUnsupported, @@ -1799,10 +1799,10 @@ fn netListenUnix( var storage: UnixAddress = undefined; const addr_len = addressUnixToPosix(address, &storage); - try posixBindUnix(pool, socket_fd, &storage.any, addr_len); + try posixBindUnix(t, socket_fd, &storage.any, addr_len); while (true) { - try pool.checkCancel(); + try t.checkCancel(); switch (posix.errno(posix.system.listen(socket_fd, options.kernel_backlog))) { .SUCCESS => break, .ADDRINUSE => return error.AddressInUse, @@ -1814,9 +1814,9 @@ fn netListenUnix( return socket_fd; } -fn posixBindUnix(pool: *Pool, fd: posix.socket_t, addr: *const posix.sockaddr, addr_len: posix.socklen_t) !void { +fn posixBindUnix(t: *Threaded, fd: posix.socket_t, addr: *const posix.sockaddr, addr_len: posix.socklen_t) !void { while (true) { - try pool.checkCancel(); + try t.checkCancel(); switch (posix.errno(posix.system.bind(fd, addr, addr_len))) { .SUCCESS => break, .INTR => continue, @@ -1842,9 +1842,9 @@ fn posixBindUnix(pool: *Pool, fd: posix.socket_t, addr: *const posix.sockaddr, a } } -fn posixBind(pool: *Pool, socket_fd: posix.socket_t, addr: *const posix.sockaddr, addr_len: posix.socklen_t) !void { +fn posixBind(t: *Threaded, socket_fd: posix.socket_t, addr: *const posix.sockaddr, addr_len: posix.socklen_t) !void { while (true) { - try pool.checkCancel(); + try t.checkCancel(); switch (posix.errno(posix.system.bind(socket_fd, addr, addr_len))) { .SUCCESS => break, .INTR => continue, @@ -1861,9 +1861,9 @@ fn posixBind(pool: *Pool, socket_fd: posix.socket_t, addr: *const posix.sockaddr } } -fn posixConnect(pool: *Pool, socket_fd: posix.socket_t, addr: *const posix.sockaddr, addr_len: posix.socklen_t) !void { +fn posixConnect(t: *Threaded, socket_fd: posix.socket_t, addr: *const posix.sockaddr, addr_len: posix.socklen_t) !void { while (true) { - try pool.checkCancel(); + try t.checkCancel(); switch (posix.errno(posix.system.connect(socket_fd, addr, addr_len))) { .SUCCESS => return, .INTR => continue, @@ -1890,9 +1890,9 @@ fn posixConnect(pool: *Pool, socket_fd: posix.socket_t, addr: *const posix.socka } } -fn posixConnectUnix(pool: *Pool, fd: posix.socket_t, addr: *const posix.sockaddr, addr_len: posix.socklen_t) !void { +fn posixConnectUnix(t: *Threaded, fd: posix.socket_t, addr: *const posix.sockaddr, addr_len: posix.socklen_t) !void { while (true) { - try pool.checkCancel(); + try t.checkCancel(); switch (posix.errno(posix.system.connect(fd, addr, addr_len))) { .SUCCESS => return, .INTR => continue, @@ -1919,9 +1919,9 @@ fn posixConnectUnix(pool: *Pool, fd: posix.socket_t, addr: *const posix.sockaddr } } -fn posixGetSockName(pool: *Pool, socket_fd: posix.fd_t, addr: *posix.sockaddr, addr_len: *posix.socklen_t) !void { +fn posixGetSockName(t: *Threaded, socket_fd: posix.fd_t, addr: *posix.sockaddr, addr_len: *posix.socklen_t) !void { while (true) { - try pool.checkCancel(); + try t.checkCancel(); switch (posix.errno(posix.system.getsockname(socket_fd, addr, addr_len))) { .SUCCESS => break, .INTR => continue, @@ -1935,10 +1935,10 @@ fn posixGetSockName(pool: *Pool, socket_fd: posix.fd_t, addr: *posix.sockaddr, a } } -fn setSocketOption(pool: *Pool, fd: posix.fd_t, level: i32, opt_name: u32, option: u32) !void { +fn setSocketOption(t: *Threaded, fd: posix.fd_t, level: i32, opt_name: u32, option: u32) !void { const o: []const u8 = @ptrCast(&option); while (true) { - try pool.checkCancel(); + try t.checkCancel(); switch (posix.errno(posix.system.setsockopt(fd, level, opt_name, o.ptr, @intCast(o.len)))) { .SUCCESS => return, .INTR => continue, @@ -1957,17 +1957,17 @@ fn netConnectIpPosix( options: IpAddress.ConnectOptions, ) IpAddress.ConnectError!net.Stream { if (options.timeout != .none) @panic("TODO"); - const pool: *Pool = @ptrCast(@alignCast(userdata)); + const t: *Threaded = @ptrCast(@alignCast(userdata)); const family = posixAddressFamily(address); - const socket_fd = try openSocketPosix(pool, family, .{ + const socket_fd = try openSocketPosix(t, family, .{ .mode = options.mode, .protocol = options.protocol, }); errdefer posix.close(socket_fd); var storage: PosixAddress = undefined; var addr_len = addressToPosix(address, &storage); - try posixConnect(pool, socket_fd, &storage.any, addr_len); - try posixGetSockName(pool, socket_fd, &storage.any, &addr_len); + try posixConnect(t, socket_fd, &storage.any, addr_len); + try posixGetSockName(t, socket_fd, &storage.any, &addr_len); return .{ .socket = .{ .handle = socket_fd, .address = addressFromPosix(&storage), @@ -1979,12 +1979,12 @@ fn netConnectUnix( address: *const net.UnixAddress, ) net.UnixAddress.ConnectError!net.Socket.Handle { if (!net.has_unix_sockets) return error.AddressFamilyUnsupported; - const pool: *Pool = @ptrCast(@alignCast(userdata)); - const socket_fd = try openSocketPosix(pool, posix.AF.UNIX, .{ .mode = .stream }); + const t: *Threaded = @ptrCast(@alignCast(userdata)); + const socket_fd = try openSocketPosix(t, posix.AF.UNIX, .{ .mode = .stream }); errdefer posix.close(socket_fd); var storage: UnixAddress = undefined; const addr_len = addressUnixToPosix(address, &storage); - try posixConnectUnix(pool, socket_fd, &storage.any, addr_len); + try posixConnectUnix(t, socket_fd, &storage.any, addr_len); return socket_fd; } @@ -1993,25 +1993,25 @@ fn netBindIpPosix( address: *const IpAddress, options: IpAddress.BindOptions, ) IpAddress.BindError!net.Socket { - const pool: *Pool = @ptrCast(@alignCast(userdata)); + const t: *Threaded = @ptrCast(@alignCast(userdata)); const family = posixAddressFamily(address); - const socket_fd = try openSocketPosix(pool, family, options); + const socket_fd = try openSocketPosix(t, family, options); errdefer posix.close(socket_fd); var storage: PosixAddress = undefined; var addr_len = addressToPosix(address, &storage); - try posixBind(pool, socket_fd, &storage.any, addr_len); - try posixGetSockName(pool, socket_fd, &storage.any, &addr_len); + try posixBind(t, socket_fd, &storage.any, addr_len); + try posixGetSockName(t, socket_fd, &storage.any, &addr_len); return .{ .handle = socket_fd, .address = addressFromPosix(&storage), }; } -fn openSocketPosix(pool: *Pool, family: posix.sa_family_t, options: IpAddress.BindOptions) !posix.socket_t { +fn openSocketPosix(t: *Threaded, family: posix.sa_family_t, options: IpAddress.BindOptions) !posix.socket_t { const mode = posixSocketMode(options.mode); const protocol = posixProtocol(options.protocol); const socket_fd = while (true) { - try pool.checkCancel(); + try t.checkCancel(); const flags: u32 = mode | if (socket_flags_unsupported) 0 else posix.SOCK.CLOEXEC; const socket_rc = posix.system.socket(family, flags, protocol); switch (posix.errno(socket_rc)) { @@ -2019,7 +2019,7 @@ fn openSocketPosix(pool: *Pool, family: posix.sa_family_t, options: IpAddress.Bi const fd: posix.fd_t = @intCast(socket_rc); errdefer posix.close(fd); if (socket_flags_unsupported) while (true) { - try pool.checkCancel(); + try t.checkCancel(); switch (posix.errno(posix.system.fcntl(fd, posix.F.SETFD, @as(usize, posix.FD_CLOEXEC)))) { .SUCCESS => break, .INTR => continue, @@ -2044,7 +2044,7 @@ fn openSocketPosix(pool: *Pool, family: posix.sa_family_t, options: IpAddress.Bi if (options.ip6_only) { if (posix.IPV6 == void) return error.OptionUnsupported; - try setSocketOption(pool, socket_fd, posix.IPPROTO.IPV6, posix.IPV6.V6ONLY, 0); + try setSocketOption(t, socket_fd, posix.IPPROTO.IPV6, posix.IPV6.V6ONLY, 0); } return socket_fd; @@ -2054,11 +2054,11 @@ const socket_flags_unsupported = builtin.os.tag.isDarwin() or native_os == .haik const have_accept4 = !socket_flags_unsupported; fn netAcceptPosix(userdata: ?*anyopaque, listen_fd: net.Socket.Handle) net.Server.AcceptError!net.Stream { - const pool: *Pool = @ptrCast(@alignCast(userdata)); + const t: *Threaded = @ptrCast(@alignCast(userdata)); var storage: PosixAddress = undefined; var addr_len: posix.socklen_t = @sizeOf(PosixAddress); const fd = while (true) { - try pool.checkCancel(); + try t.checkCancel(); const rc = if (have_accept4) posix.system.accept4(listen_fd, &storage.any, &addr_len, posix.SOCK.CLOEXEC) else @@ -2068,7 +2068,7 @@ fn netAcceptPosix(userdata: ?*anyopaque, listen_fd: net.Socket.Handle) net.Serve const fd: posix.fd_t = @intCast(rc); errdefer posix.close(fd); if (!have_accept4) while (true) { - try pool.checkCancel(); + try t.checkCancel(); switch (posix.errno(posix.system.fcntl(fd, posix.F.SETFD, @as(usize, posix.FD_CLOEXEC)))) { .SUCCESS => break, .INTR => continue, @@ -2101,7 +2101,7 @@ fn netAcceptPosix(userdata: ?*anyopaque, listen_fd: net.Socket.Handle) net.Serve } fn netReadPosix(userdata: ?*anyopaque, fd: net.Socket.Handle, data: [][]u8) net.Stream.Reader.Error!usize { - const pool: *Pool = @ptrCast(@alignCast(userdata)); + const t: *Threaded = @ptrCast(@alignCast(userdata)); var iovecs_buffer: [max_iovecs_len]posix.iovec = undefined; var i: usize = 0; @@ -2116,7 +2116,7 @@ fn netReadPosix(userdata: ?*anyopaque, fd: net.Socket.Handle, data: [][]u8) net. assert(dest[0].len > 0); if (native_os == .wasi and !builtin.link_libc) while (true) { - try pool.checkCancel(); + try t.checkCancel(); var n: usize = undefined; switch (std.os.wasi.fd_read(fd, dest.ptr, dest.len, &n)) { .SUCCESS => return n, @@ -2137,7 +2137,7 @@ fn netReadPosix(userdata: ?*anyopaque, fd: net.Socket.Handle, data: [][]u8) net. }; while (true) { - try pool.checkCancel(); + try t.checkCancel(); const rc = posix.system.readv(fd, dest.ptr, @intCast(dest.len)); switch (posix.errno(rc)) { .SUCCESS => return @intCast(rc), @@ -2167,7 +2167,7 @@ fn netSend( messages: []net.OutgoingMessage, flags: net.SendFlags, ) struct { ?net.Socket.SendError, usize } { - const pool: *Pool = @ptrCast(@alignCast(userdata)); + const t: *Threaded = @ptrCast(@alignCast(userdata)); const posix_flags: u32 = @as(u32, if (@hasDecl(posix.MSG, "CONFIRM") and flags.confirm) posix.MSG.CONFIRM else 0) | @@ -2180,17 +2180,17 @@ fn netSend( var i: usize = 0; while (messages.len - i != 0) { if (have_sendmmsg) { - i += netSendMany(pool, handle, messages[i..], posix_flags) catch |err| return .{ err, i }; + i += netSendMany(t, handle, messages[i..], posix_flags) catch |err| return .{ err, i }; continue; } - netSendOne(pool, handle, &messages[i], posix_flags) catch |err| return .{ err, i }; + netSendOne(t, handle, &messages[i], posix_flags) catch |err| return .{ err, i }; i += 1; } return .{ null, i }; } fn netSendOne( - pool: *Pool, + t: *Threaded, handle: net.Socket.Handle, message: *net.OutgoingMessage, flags: u32, @@ -2207,7 +2207,7 @@ fn netSendOne( .flags = 0, }; while (true) { - try pool.checkCancel(); + try t.checkCancel(); const rc = posix.system.sendmsg(handle, msg, flags); if (is_windows) { if (rc == windows.ws2_32.SOCKET_ERROR) { @@ -2274,7 +2274,7 @@ fn netSendOne( } fn netSendMany( - pool: *Pool, + t: *Threaded, handle: net.Socket.Handle, messages: []net.OutgoingMessage, flags: u32, @@ -2305,7 +2305,7 @@ fn netSendMany( } while (true) { - try pool.checkCancel(); + try t.checkCancel(); const rc = posix.system.sendmmsg(handle, clamped_msgs.ptr, @intCast(clamped_msgs.len), flags); switch (posix.errno(rc)) { .SUCCESS => { @@ -2348,7 +2348,7 @@ fn netReceive( flags: net.ReceiveFlags, timeout: Io.Timeout, ) struct { ?net.Socket.ReceiveTimeoutError, usize } { - const pool: *Pool = @ptrCast(@alignCast(userdata)); + const t: *Threaded = @ptrCast(@alignCast(userdata)); // recvmmsg is useless, here's why: // * [timeout bug](https://bugzilla.kernel.org/show_bug.cgi?id=75371) @@ -2375,10 +2375,10 @@ fn netReceive( var message_i: usize = 0; var data_i: usize = 0; - const deadline = timeout.toDeadline(pool.io()) catch |err| return .{ err, message_i }; + const deadline = timeout.toDeadline(t.io()) catch |err| return .{ err, message_i }; recv: while (true) { - pool.checkCancel() catch |err| return .{ err, message_i }; + t.checkCancel() catch |err| return .{ err, message_i }; if (message_buffer.len - message_i == 0) return .{ null, message_i }; const message = &message_buffer[message_i]; @@ -2416,12 +2416,12 @@ fn netReceive( continue; }, .AGAIN => while (true) { - pool.checkCancel() catch |err| return .{ err, message_i }; + t.checkCancel() catch |err| return .{ err, message_i }; if (message_i != 0) return .{ null, message_i }; const max_poll_ms = std.math.maxInt(u31); const timeout_ms: u31 = if (deadline) |d| t: { - const duration = d.durationFromNow(pool.io()) catch |err| return .{ err, message_i }; + const duration = d.durationFromNow(t.io()) catch |err| return .{ err, message_i }; if (duration.raw.nanoseconds <= 0) return .{ error.Timeout, message_i }; break :t @intCast(@min(max_poll_ms, duration.raw.toMilliseconds())); } else max_poll_ms; @@ -2473,8 +2473,8 @@ fn netWritePosix( data: []const []const u8, splat: usize, ) net.Stream.Writer.Error!usize { - const pool: *Pool = @ptrCast(@alignCast(userdata)); - try pool.checkCancel(); + const t: *Threaded = @ptrCast(@alignCast(userdata)); + try t.checkCancel(); var iovecs: [max_iovecs_len]posix.iovec_const = undefined; var msg: posix.msghdr_const = .{ @@ -2527,8 +2527,8 @@ fn addBuf(v: []posix.iovec_const, i: *@FieldType(posix.msghdr_const, "iovlen"), } fn netClose(userdata: ?*anyopaque, handle: net.Socket.Handle) void { - const pool: *Pool = @ptrCast(@alignCast(userdata)); - _ = pool; + const t: *Threaded = @ptrCast(@alignCast(userdata)); + _ = t; switch (native_os) { .windows => windows.closesocket(handle) catch recoverableOsBugDetected(), else => posix.close(handle), @@ -2539,10 +2539,10 @@ fn netInterfaceNameResolve( userdata: ?*anyopaque, name: *const net.Interface.Name, ) net.Interface.Name.ResolveError!net.Interface { - const pool: *Pool = @ptrCast(@alignCast(userdata)); + const t: *Threaded = @ptrCast(@alignCast(userdata)); if (native_os == .linux) { - const sock_fd = openSocketPosix(pool, posix.AF.UNIX, .{ .mode = .dgram }) catch |err| switch (err) { + const sock_fd = openSocketPosix(t, posix.AF.UNIX, .{ .mode = .dgram }) catch |err| switch (err) { error.ProcessFdQuotaExceeded => return error.SystemResources, error.SystemFdQuotaExceeded => return error.SystemResources, error.AddressFamilyUnsupported => return error.Unexpected, @@ -2559,7 +2559,7 @@ fn netInterfaceNameResolve( }; while (true) { - try pool.checkCancel(); + try t.checkCancel(); switch (posix.errno(posix.system.ioctl(sock_fd, posix.SIOCGIFINDEX, @intFromPtr(&ifr)))) { .SUCCESS => return .{ .index = @bitCast(ifr.ifru.ivalue) }, .INTR => continue, @@ -2576,14 +2576,14 @@ fn netInterfaceNameResolve( } if (native_os == .windows) { - try pool.checkCancel(); + try t.checkCancel(); const index = std.os.windows.ws2_32.if_nametoindex(&name.bytes); if (index == 0) return error.InterfaceNotFound; return .{ .index = index }; } if (builtin.link_libc) { - try pool.checkCancel(); + try t.checkCancel(); const index = std.c.if_nametoindex(&name.bytes); if (index == 0) return error.InterfaceNotFound; return .{ .index = @bitCast(index) }; @@ -2593,8 +2593,8 @@ fn netInterfaceNameResolve( } fn netInterfaceName(userdata: ?*anyopaque, interface: net.Interface) net.Interface.NameError!net.Interface.Name { - const pool: *Pool = @ptrCast(@alignCast(userdata)); - try pool.checkCancel(); + const t: *Threaded = @ptrCast(@alignCast(userdata)); + try t.checkCancel(); if (native_os == .linux) { _ = interface; @@ -2618,18 +2618,18 @@ fn netLookup( resolved: *Io.Queue(HostName.LookupResult), options: HostName.LookupOptions, ) void { - const pool: *Pool = @ptrCast(@alignCast(userdata)); - const pool_io = pool.io(); - resolved.putOneUncancelable(pool_io, .{ .end = netLookupFallible(pool, host_name, resolved, options) }); + const t: *Threaded = @ptrCast(@alignCast(userdata)); + const t_io = t.io(); + resolved.putOneUncancelable(t_io, .{ .end = netLookupFallible(t, host_name, resolved, options) }); } fn netLookupFallible( - pool: *Pool, + t: *Threaded, host_name: HostName, resolved: *Io.Queue(HostName.LookupResult), options: HostName.LookupOptions, ) !void { - const pool_io = pool.io(); + const t_io = t.io(); const name = host_name.bytes; assert(name.len <= HostName.max_len); @@ -2648,7 +2648,7 @@ fn netLookupFallible( if (native_os == .linux) { if (options.family != .ip4) { if (IpAddress.parseIp6(name, options.port)) |addr| { - try resolved.putAll(pool_io, &.{ + try resolved.putAll(t_io, &.{ .{ .address = addr }, .{ .canonical_name = copyCanon(options.canonical_name_buffer, name) }, }); @@ -2658,7 +2658,7 @@ fn netLookupFallible( if (options.family != .ip6) { if (IpAddress.parseIp4(name, options.port)) |addr| { - try resolved.putAll(pool_io, &.{ + try resolved.putAll(t_io, &.{ .{ .address = addr }, .{ .canonical_name = copyCanon(options.canonical_name_buffer, name) }, }); @@ -2666,7 +2666,7 @@ fn netLookupFallible( } else |_| {} } - lookupHosts(pool, host_name, resolved, options) catch |err| switch (err) { + lookupHosts(t, host_name, resolved, options) catch |err| switch (err) { error.UnknownHostName => {}, else => |e| return e, }; @@ -2697,11 +2697,11 @@ fn netLookupFallible( canon_name_dest.* = canon_name.*; results_buffer[results_index] = .{ .canonical_name = .{ .bytes = canon_name_dest } }; results_index += 1; - try resolved.putAll(pool_io, results_buffer[0..results_index]); + try resolved.putAll(t_io, results_buffer[0..results_index]); return; } - return lookupDnsSearch(pool, host_name, resolved, options); + return lookupDnsSearch(t, host_name, resolved, options); } if (native_os == .openbsd) { @@ -2953,13 +2953,13 @@ fn pathToPosix(file_path: []const u8, buffer: *[posix.PATH_MAX]u8) Io.Dir.PathNa } fn lookupDnsSearch( - pool: *Pool, + t: *Threaded, host_name: HostName, resolved: *Io.Queue(HostName.LookupResult), options: HostName.LookupOptions, ) HostName.LookupError!void { - const pool_io = pool.io(); - const rc = HostName.ResolvConf.init(pool_io) catch return error.ResolvConfParseFailed; + const t_io = t.io(); + const rc = HostName.ResolvConf.init(t_io) catch return error.ResolvConfParseFailed; // Count dots, suppress search when >=ndots or name ends in // a dot, which is an explicit request for global scope. @@ -2983,7 +2983,7 @@ fn lookupDnsSearch( while (it.next()) |token| { @memcpy(options.canonical_name_buffer[canon_name.len + 1 ..][0..token.len], token); const lookup_canon_name = options.canonical_name_buffer[0 .. canon_name.len + 1 + token.len]; - if (lookupDns(pool, lookup_canon_name, &rc, resolved, options)) |result| { + if (lookupDns(t, lookup_canon_name, &rc, resolved, options)) |result| { return result; } else |err| switch (err) { error.UnknownHostName => continue, @@ -2992,17 +2992,17 @@ fn lookupDnsSearch( } const lookup_canon_name = options.canonical_name_buffer[0..canon_name.len]; - return lookupDns(pool, lookup_canon_name, &rc, resolved, options); + return lookupDns(t, lookup_canon_name, &rc, resolved, options); } fn lookupDns( - pool: *Pool, + t: *Threaded, lookup_canon_name: []const u8, rc: *const HostName.ResolvConf, resolved: *Io.Queue(HostName.LookupResult), options: HostName.LookupOptions, ) HostName.LookupError!void { - const pool_io = pool.io(); + const t_io = t.io(); const family_records: [2]struct { af: IpAddress.Family, rr: u8 } = .{ .{ .af = .ip6, .rr = std.posix.RR.A }, .{ .af = .ip4, .rr = std.posix.RR.AAAA }, @@ -3032,7 +3032,7 @@ fn lookupDns( var socket = s: { if (any_ip6) ip6: { const ip6_addr: IpAddress = .{ .ip6 = .unspecified(0) }; - const socket = ip6_addr.bind(pool_io, .{ .ip6_only = true, .mode = .dgram }) catch |err| switch (err) { + const socket = ip6_addr.bind(t_io, .{ .ip6_only = true, .mode = .dgram }) catch |err| switch (err) { error.AddressFamilyUnsupported => break :ip6, else => |e| return e, }; @@ -3040,10 +3040,10 @@ fn lookupDns( } any_ip6 = false; const ip4_addr: IpAddress = .{ .ip4 = .unspecified(0) }; - const socket = try ip4_addr.bind(pool_io, .{ .mode = .dgram }); + const socket = try ip4_addr.bind(t_io, .{ .mode = .dgram }); break :s socket; }; - defer socket.close(pool_io); + defer socket.close(t_io); const mapped_nameservers = if (any_ip6) ip4_mapped[0..rc.nameservers_len] else rc.nameservers(); const queries = queries_buffer[0..nq]; @@ -3054,13 +3054,13 @@ fn lookupDns( // boot clock is chosen because time the computer is suspended should count // against time spent waiting for external messages to arrive. const clock: Io.Clock = .boot; - var now_ts = try clock.now(pool_io); + var now_ts = try clock.now(t_io); const final_ts = now_ts.addDuration(.fromSeconds(rc.timeout_seconds)); const attempt_duration: Io.Duration = .{ .nanoseconds = std.time.ns_per_s * @as(usize, rc.timeout_seconds) / rc.attempts, }; - send: while (now_ts.nanoseconds < final_ts.nanoseconds) : (now_ts = try clock.now(pool_io)) { + send: while (now_ts.nanoseconds < final_ts.nanoseconds) : (now_ts = try clock.now(t_io)) { const max_messages = queries_buffer.len * HostName.ResolvConf.max_nameservers; { var message_buffer: [max_messages]Io.net.OutgoingMessage = undefined; @@ -3076,7 +3076,7 @@ fn lookupDns( message_i += 1; } } - _ = netSend(pool, socket.handle, message_buffer[0..message_i], .{}); + _ = netSend(t, socket.handle, message_buffer[0..message_i], .{}); } const timeout: Io.Timeout = .{ .deadline = .{ @@ -3087,7 +3087,7 @@ fn lookupDns( while (true) { var message_buffer: [max_messages]Io.net.IncomingMessage = undefined; const buf = answer_buffer[answer_buffer_i..]; - const recv_err, const recv_n = socket.receiveManyTimeout(pool_io, &message_buffer, buf, .{}, timeout); + const recv_err, const recv_n = socket.receiveManyTimeout(t_io, &message_buffer, buf, .{}, timeout); for (message_buffer[0..recv_n]) |*received_message| { const reply = received_message.data; // Ignore non-identifiable packets. @@ -3124,7 +3124,7 @@ fn lookupDns( .data_ptr = query.ptr, .data_len = query.len, }; - _ = netSend(pool, socket.handle, (&retry_message)[0..1], .{}); + _ = netSend(t, socket.handle, (&retry_message)[0..1], .{}); continue; }, else => continue, @@ -3155,7 +3155,7 @@ fn lookupDns( std.posix.RR.A => { const data = record.packet[record.data_off..][0..record.data_len]; if (data.len != 4) return error.InvalidDnsARecord; - try resolved.putOne(pool_io, .{ .address = .{ .ip4 = .{ + try resolved.putOne(t_io, .{ .address = .{ .ip4 = .{ .bytes = data[0..4].*, .port = options.port, } } }); @@ -3164,7 +3164,7 @@ fn lookupDns( std.posix.RR.AAAA => { const data = record.packet[record.data_off..][0..record.data_len]; if (data.len != 16) return error.InvalidDnsAAAARecord; - try resolved.putOne(pool_io, .{ .address = .{ .ip6 = .{ + try resolved.putOne(t_io, .{ .address = .{ .ip6 = .{ .bytes = data[0..16].*, .port = options.port, } } }); @@ -3178,18 +3178,18 @@ fn lookupDns( }; } - try resolved.putOne(pool_io, .{ .canonical_name = canonical_name orelse .{ .bytes = lookup_canon_name } }); + try resolved.putOne(t_io, .{ .canonical_name = canonical_name orelse .{ .bytes = lookup_canon_name } }); if (addresses_len == 0) return error.NameServerFailure; } fn lookupHosts( - pool: *Pool, + t: *Threaded, host_name: HostName, resolved: *Io.Queue(HostName.LookupResult), options: HostName.LookupOptions, ) !void { - const pool_io = pool.io(); - const file = Io.File.openAbsolute(pool_io, "/etc/hosts", .{}) catch |err| switch (err) { + const t_io = t.io(); + const file = Io.File.openAbsolute(t_io, "/etc/hosts", .{}) catch |err| switch (err) { error.FileNotFound, error.NotDir, error.AccessDenied, @@ -3202,11 +3202,11 @@ fn lookupHosts( return error.DetectingNetworkConfigurationFailed; }, }; - defer file.close(pool_io); + defer file.close(t_io); var line_buf: [512]u8 = undefined; - var file_reader = file.reader(pool_io, &line_buf); - return lookupHostsReader(pool, host_name, resolved, options, &file_reader.interface) catch |err| switch (err) { + var file_reader = file.reader(t_io, &line_buf); + return lookupHostsReader(t, host_name, resolved, options, &file_reader.interface) catch |err| switch (err) { error.ReadFailed => switch (file_reader.err.?) { error.Canceled => |e| return e, else => { @@ -3220,13 +3220,13 @@ fn lookupHosts( } fn lookupHostsReader( - pool: *Pool, + t: *Threaded, host_name: HostName, resolved: *Io.Queue(HostName.LookupResult), options: HostName.LookupOptions, reader: *Io.Reader, ) error{ ReadFailed, Canceled, UnknownHostName }!void { - const pool_io = pool.io(); + const t_io = t.io(); var addresses_len: usize = 0; var canonical_name: ?HostName = null; while (true) { @@ -3268,19 +3268,19 @@ fn lookupHostsReader( if (options.family != .ip6) { if (IpAddress.parseIp4(ip_text, options.port)) |addr| { - try resolved.putOne(pool_io, .{ .address = addr }); + try resolved.putOne(t_io, .{ .address = addr }); addresses_len += 1; } else |_| {} } if (options.family != .ip4) { if (IpAddress.parseIp6(ip_text, options.port)) |addr| { - try resolved.putOne(pool_io, .{ .address = addr }); + try resolved.putOne(t_io, .{ .address = addr }); addresses_len += 1; } else |_| {} } } - if (canonical_name) |canon_name| try resolved.putOne(pool_io, .{ .canonical_name = canon_name }); + if (canonical_name) |canon_name| try resolved.putOne(t_io, .{ .canonical_name = canon_name }); if (addresses_len == 0) return error.UnknownHostName; }