diff --git a/BRANCH_TODO b/BRANCH_TODO index c270307737..53b157f41e 100644 --- a/BRANCH_TODO +++ b/BRANCH_TODO @@ -1,3 +1,4 @@ +* Threaded: rename Pool to Threaded * Threaded: finish linux impl (all tests passing) * Threaded: finish macos impl * Threaded: finish windows impl @@ -14,4 +15,6 @@ * move fs.File.Writer to Io * add non-blocking flag to net and fs operations, handle EAGAIN * finish moving std.fs to Io +* migrate child process into std.Io +* eliminate std.Io.poll (it should be replaced by "select" functionality) * finish moving all of std.posix into Threaded diff --git a/lib/std/Io.zig b/lib/std/Io.zig index 4a8f65060e..41aebcb712 100644 --- a/lib/std/Io.zig +++ b/lib/std/Io.zig @@ -649,9 +649,11 @@ pub const VTable = struct { select: *const fn (?*anyopaque, futures: []const *AnyFuture) usize, mutexLock: *const fn (?*anyopaque, prev_state: Mutex.State, mutex: *Mutex) Cancelable!void, + mutexLockUncancelable: *const fn (?*anyopaque, prev_state: Mutex.State, mutex: *Mutex) void, mutexUnlock: *const fn (?*anyopaque, prev_state: Mutex.State, mutex: *Mutex) void, conditionWait: *const fn (?*anyopaque, cond: *Condition, mutex: *Mutex) Cancelable!void, + conditionWaitUncancelable: *const fn (?*anyopaque, cond: *Condition, mutex: *Mutex) void, conditionWake: *const fn (?*anyopaque, cond: *Condition, wake: Condition.Wake) void, dirMake: *const fn (?*anyopaque, Dir, sub_path: []const u8, mode: Dir.Mode) Dir.MakeError!void, @@ -686,6 +688,7 @@ pub const VTable = struct { netClose: *const fn (?*anyopaque, handle: net.Socket.Handle) void, netInterfaceNameResolve: *const fn (?*anyopaque, *const net.Interface.Name) net.Interface.Name.ResolveError!net.Interface, netInterfaceName: *const fn (?*anyopaque, net.Interface) net.Interface.NameError!net.Interface.Name, + netLookup: *const fn (?*anyopaque, net.HostName, *Queue(net.HostName.LookupResult), net.HostName.LookupOptions) void, }; pub const Cancelable = error{ @@ -1030,7 +1033,7 @@ pub const Group = struct { } }; -pub const Mutex = if (true) struct { +pub const Mutex = struct { state: State, pub const State = enum(usize) { @@ -1073,54 +1076,32 @@ pub const Mutex = if (true) struct { return io.vtable.mutexLock(io.userdata, prev_state, mutex); } + /// Same as `lock` but cannot be canceled. + pub fn lockUncancelable(mutex: *Mutex, io: std.Io) void { + const prev_state: State = @enumFromInt(@atomicRmw( + usize, + @as(*usize, @ptrCast(&mutex.state)), + .And, + ~@intFromEnum(State.unlocked), + .acquire, + )); + if (prev_state.isUnlocked()) { + @branchHint(.likely); + return; + } + return io.vtable.mutexLockUncancelable(io.userdata, prev_state, mutex); + } + pub fn unlock(mutex: *Mutex, io: std.Io) void { const prev_state = @cmpxchgWeak(State, &mutex.state, .locked_once, .unlocked, .release, .acquire) orelse { @branchHint(.likely); return; }; - std.debug.assert(prev_state != .unlocked); // mutex not locked + assert(prev_state != .unlocked); // mutex not locked return io.vtable.mutexUnlock(io.userdata, prev_state, mutex); } -} else struct { - state: std.atomic.Value(u32), - - pub const State = void; - - pub const init: Mutex = .{ .state = .init(unlocked) }; - - pub const unlocked: u32 = 0b00; - pub const locked: u32 = 0b01; - pub const contended: u32 = 0b11; // must contain the `locked` bit for x86 optimization below - - pub fn tryLock(m: *Mutex) bool { - // On x86, use `lock bts` instead of `lock cmpxchg` as: - // - they both seem to mark the cache-line as modified regardless: https://stackoverflow.com/a/63350048 - // - `lock bts` is smaller instruction-wise which makes it better for inlining - if (builtin.target.cpu.arch.isX86()) { - const locked_bit = @ctz(locked); - return m.state.bitSet(locked_bit, .acquire) == 0; - } - - // Acquire barrier ensures grabbing the lock happens before the critical section - // and that the previous lock holder's critical section happens before we grab the lock. - return m.state.cmpxchgWeak(unlocked, locked, .acquire, .monotonic) == null; - } - - /// Avoids the vtable for uncontended locks. - pub fn lock(m: *Mutex, io: Io) Cancelable!void { - if (!m.tryLock()) { - @branchHint(.unlikely); - try io.vtable.mutexLock(io.userdata, {}, m); - } - } - - pub fn unlock(m: *Mutex, io: Io) void { - io.vtable.mutexUnlock(io.userdata, {}, m); - } }; -/// Supports exactly 1 waiter. More than 1 simultaneous wait on the same -/// condition is illegal. pub const Condition = struct { state: u64 = 0, @@ -1128,6 +1109,10 @@ pub const Condition = struct { return io.vtable.conditionWait(io.userdata, cond, mutex); } + pub fn waitUncancelable(cond: *Condition, io: Io, mutex: *Mutex) void { + return io.vtable.conditionWaitUncancelable(io.userdata, cond, mutex); + } + pub fn signal(cond: *Condition, io: Io) void { io.vtable.conditionWake(io.userdata, cond, .one); } @@ -1137,9 +1122,9 @@ pub const Condition = struct { } pub const Wake = enum { - /// wake up only one thread + /// Wake up only one thread. one, - /// wake up all thread + /// Wake up all threads. all, }; }; @@ -1180,10 +1165,24 @@ pub const TypeErasedQueue = struct { pub fn put(q: *TypeErasedQueue, io: Io, elements: []const u8, min: usize) Cancelable!usize { assert(elements.len >= min); - + if (elements.len == 0) return 0; try q.mutex.lock(io); defer q.mutex.unlock(io); + return putLocked(q, io, elements, min, false); + } + /// Same as `put` but cannot be canceled. + pub fn putUncancelable(q: *TypeErasedQueue, io: Io, elements: []const u8, min: usize) usize { + assert(elements.len >= min); + if (elements.len == 0) return 0; + q.mutex.lockUncancelable(io); + defer q.mutex.unlock(io); + return putLocked(q, io, elements, min, true) catch |err| switch (err) { + error.Canceled => unreachable, + }; + } + + fn putLocked(q: *TypeErasedQueue, io: Io, elements: []const u8, min: usize, uncancelable: bool) Cancelable!usize { // Getters have first priority on the data, and only when the getters // queue is empty do we start populating the buffer. @@ -1226,7 +1225,10 @@ pub const TypeErasedQueue = struct { var pending: Put = .{ .remaining = remaining, .condition = .{}, .node = .{} }; q.putters.append(&pending.node); - try pending.condition.wait(io, &q.mutex); + if (uncancelable) + pending.condition.waitUncancelable(io, &q.mutex) + else + try pending.condition.wait(io, &q.mutex); remaining = pending.remaining; } } @@ -1347,6 +1349,16 @@ pub fn Queue(Elem: type) type { return @divExact(try q.type_erased.put(io, @ptrCast(elements), min * @sizeOf(Elem)), @sizeOf(Elem)); } + /// Same as `put` but blocks until all elements have been added to the queue. + pub fn putAll(q: *@This(), io: Io, elements: []const Elem) Cancelable!void { + assert(try q.put(io, elements, elements.len) == elements.len); + } + + /// Same as `put` but cannot be interrupted. + pub fn putUncancelable(q: *@This(), io: Io, elements: []const Elem, min: usize) usize { + return @divExact(q.type_erased.putUncancelable(io, @ptrCast(elements), min * @sizeOf(Elem)), @sizeOf(Elem)); + } + /// Receives elements from the beginning of the queue. The function /// returns when at least `min` elements have been populated inside /// `buffer`. @@ -1362,11 +1374,20 @@ pub fn Queue(Elem: type) type { assert(try q.put(io, &.{item}, 1) == 1); } + pub fn putOneUncancelable(q: *@This(), io: Io, item: Elem) void { + assert(q.putUncancelable(io, &.{item}, 1) == 1); + } + pub fn getOne(q: *@This(), io: Io) Cancelable!Elem { var buf: [1]Elem = undefined; assert(try q.get(io, &buf, 1) == 1); return buf[0]; } + + /// Returns buffer length in `Elem` units. + pub fn capacity(q: *const @This()) usize { + return @divExact(q.type_erased.buffer.len, @sizeOf(Elem)); + } }; } diff --git a/lib/std/Io/EventLoop.zig b/lib/std/Io/EventLoop.zig index 4cb5745e31..84ea0035ae 100644 --- a/lib/std/Io/EventLoop.zig +++ b/lib/std/Io/EventLoop.zig @@ -1410,7 +1410,7 @@ fn pread(userdata: ?*anyopaque, file: Io.File, buffer: []u8, offset: std.posix.o .NOMEM => return error.SystemResources, .NOTCONN => return error.SocketUnconnected, .CONNRESET => return error.ConnectionResetByPeer, - .TIMEDOUT => return error.ConnectionTimedOut, + .TIMEDOUT => return error.Timeout, .NXIO => return error.Unseekable, .SPIPE => return error.Unseekable, .OVERFLOW => return error.Unseekable, diff --git a/lib/std/Io/File.zig b/lib/std/Io/File.zig index c2ceaa95b7..7bd3669e2e 100644 --- a/lib/std/Io/File.zig +++ b/lib/std/Io/File.zig @@ -153,7 +153,7 @@ pub const ReadStreamingError = error{ IsDir, BrokenPipe, ConnectionResetByPeer, - ConnectionTimedOut, + Timeout, NotOpenForReading, SocketUnconnected, /// This error occurs when no global event loop is configured, diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig index 4ecc6e7b31..f0d7f3ea4b 100644 --- a/lib/std/Io/Threaded.zig +++ b/lib/std/Io/Threaded.zig @@ -8,6 +8,8 @@ const windows = std.os.windows; const std = @import("../std.zig"); const Io = std.Io; const net = std.Io.net; +const HostName = std.Io.net.HostName; +const IpAddress = std.Io.net.IpAddress; const Allocator = std.mem.Allocator; const assert = std.debug.assert; const posix = std.posix; @@ -156,9 +158,11 @@ pub fn io(pool: *Pool) Io { .groupCancel = groupCancel, .mutexLock = mutexLock, + .mutexLockUncancelable = mutexLockUncancelable, .mutexUnlock = mutexUnlock, .conditionWait = conditionWait, + .conditionWaitUncancelable = conditionWaitUncancelable, .conditionWake = conditionWake, .dirMake = switch (builtin.os.tag) { @@ -235,6 +239,7 @@ pub fn io(pool: *Pool) Io { .netReceive = netReceive, .netInterfaceNameResolve = netInterfaceNameResolve, .netInterfaceName = netInterfaceName, + .netLookup = netLookup, }, }; } @@ -653,26 +658,63 @@ fn checkCancel(pool: *Pool) error{Canceled}!void { if (cancelRequested(pool)) return error.Canceled; } -fn mutexLock(userdata: ?*anyopaque, prev_state: Io.Mutex.State, mutex: *Io.Mutex) error{Canceled}!void { - _ = userdata; +fn mutexLock(userdata: ?*anyopaque, prev_state: Io.Mutex.State, mutex: *Io.Mutex) Io.Cancelable!void { + const pool: *Pool = @ptrCast(@alignCast(userdata)); if (prev_state == .contended) { - std.Thread.Futex.wait(@ptrCast(&mutex.state), @intFromEnum(Io.Mutex.State.contended)); + try pool.checkCancel(); + futexWait(@ptrCast(&mutex.state), @intFromEnum(Io.Mutex.State.contended)); } - while (@atomicRmw( - Io.Mutex.State, - &mutex.state, - .Xchg, - .contended, - .acquire, - ) != .unlocked) { - std.Thread.Futex.wait(@ptrCast(&mutex.state), @intFromEnum(Io.Mutex.State.contended)); + while (@atomicRmw(Io.Mutex.State, &mutex.state, .Xchg, .contended, .acquire) != .unlocked) { + try pool.checkCancel(); + futexWait(@ptrCast(&mutex.state), @intFromEnum(Io.Mutex.State.contended)); } } + +fn mutexLockUncancelable(userdata: ?*anyopaque, prev_state: Io.Mutex.State, mutex: *Io.Mutex) void { + _ = userdata; + if (prev_state == .contended) { + futexWait(@ptrCast(&mutex.state), @intFromEnum(Io.Mutex.State.contended)); + } + while (@atomicRmw(Io.Mutex.State, &mutex.state, .Xchg, .contended, .acquire) != .unlocked) { + futexWait(@ptrCast(&mutex.state), @intFromEnum(Io.Mutex.State.contended)); + } +} + fn mutexUnlock(userdata: ?*anyopaque, prev_state: Io.Mutex.State, mutex: *Io.Mutex) void { _ = userdata; _ = prev_state; if (@atomicRmw(Io.Mutex.State, &mutex.state, .Xchg, .unlocked, .release) == .contended) { - std.Thread.Futex.wake(@ptrCast(&mutex.state), 1); + futexWake(@ptrCast(&mutex.state), 1); + } +} + +fn conditionWaitUncancelable(userdata: ?*anyopaque, cond: *Io.Condition, mutex: *Io.Mutex) void { + const pool: *Pool = @ptrCast(@alignCast(userdata)); + const pool_io = pool.io(); + comptime assert(@TypeOf(cond.state) == u64); + const ints: *[2]std.atomic.Value(u32) = @ptrCast(&cond.state); + const cond_state = &ints[0]; + const cond_epoch = &ints[1]; + const one_waiter = 1; + const waiter_mask = 0xffff; + const one_signal = 1 << 16; + const signal_mask = 0xffff << 16; + var epoch = cond_epoch.load(.acquire); + var state = cond_state.fetchAdd(one_waiter, .monotonic); + assert(state & waiter_mask != waiter_mask); + state += one_waiter; + + mutex.unlock(pool_io); + defer mutex.lockUncancelable(pool_io); + + while (true) { + futexWait(cond_epoch, epoch); + epoch = cond_epoch.load(.acquire); + state = cond_state.load(.monotonic); + while (state & signal_mask != 0) { + const new_state = state - one_waiter - one_signal; + state = cond_state.cmpxchgWeak(state, new_state, .acquire, .monotonic) orelse return; + } } } @@ -702,20 +744,18 @@ fn conditionWait(userdata: ?*anyopaque, cond: *Io.Condition, mutex: *Io.Mutex) I state += one_waiter; mutex.unlock(pool.io()); - defer mutex.lock(pool.io()) catch @panic("TODO"); - - var futex_deadline = std.Thread.Futex.Deadline.init(null); + defer mutex.lockUncancelable(pool.io()); while (true) { - futex_deadline.wait(cond_epoch, epoch) catch |err| switch (err) { - error.Timeout => unreachable, - }; + try pool.checkCancel(); + futexWait(cond_epoch, epoch); epoch = cond_epoch.load(.acquire); state = cond_state.load(.monotonic); - // Try to wake up by consuming a signal and decremented the waiter we added previously. - // Acquire barrier ensures code before the wake() which added the signal happens before we decrement it and return. + // Try to wake up by consuming a signal and decremented the waiter we + // added previously. Acquire barrier ensures code before the wake() + // which added the signal happens before we decrement it and return. while (state & signal_mask != 0) { const new_state = state - one_waiter - one_signal; state = cond_state.cmpxchgWeak(state, new_state, .acquire, .monotonic) orelse return; @@ -740,8 +780,10 @@ fn conditionWake(userdata: ?*anyopaque, cond: *Io.Condition, wake: Io.Condition. const signals = (state & signal_mask) / one_signal; // Reserves which waiters to wake up by incrementing the signals count. - // Therefore, the signals count is always less than or equal to the waiters count. - // We don't need to Futex.wake if there's nothing to wake up or if other wake() threads have reserved to wake up the current waiters. + // Therefore, the signals count is always less than or equal to the + // waiters count. We don't need to Futex.wake if there's nothing to + // wake up or if other wake() threads have reserved to wake up the + // current waiters. const wakeable = waiters - signals; if (wakeable == 0) { return; @@ -752,16 +794,23 @@ fn conditionWake(userdata: ?*anyopaque, cond: *Io.Condition, wake: Io.Condition. .all => wakeable, }; - // Reserve the amount of waiters to wake by incrementing the signals count. - // Release barrier ensures code before the wake() happens before the signal it posted and consumed by the wait() threads. + // Reserve the amount of waiters to wake by incrementing the signals + // count. Release barrier ensures code before the wake() happens before + // the signal it posted and consumed by the wait() threads. const new_state = state + (one_signal * to_wake); state = cond_state.cmpxchgWeak(state, new_state, .release, .monotonic) orelse { // Wake up the waiting threads we reserved above by changing the epoch value. - // NOTE: a waiting thread could miss a wake up if *exactly* ((1<<32)-1) wake()s happen between it observing the epoch and sleeping on it. - // This is very unlikely due to how many precise amount of Futex.wake() calls that would be between the waiting thread's potential preemption. // - // Release barrier ensures the signal being added to the state happens before the epoch is changed. - // If not, the waiting thread could potentially deadlock from missing both the state and epoch change: + // A waiting thread could miss a wake up if *exactly* ((1<<32)-1) + // wake()s happen between it observing the epoch and sleeping on + // it. This is very unlikely due to how many precise amount of + // Futex.wake() calls that would be between the waiting thread's + // potential preemption. + // + // Release barrier ensures the signal being added to the state + // happens before the epoch is changed. If not, the waiting thread + // could potentially deadlock from missing both the state and epoch + // change: // // - T2: UPDATE(&epoch, 1) (reordered before the state change) // - T1: e = LOAD(&epoch) @@ -769,7 +818,7 @@ fn conditionWake(userdata: ?*anyopaque, cond: *Io.Condition, wake: Io.Condition. // - T2: UPDATE(&state, signal) + FUTEX_WAKE(&epoch) // - T1: s & signals == 0 -> FUTEX_WAIT(&epoch, e) (missed both epoch change and state change) _ = cond_epoch.fetchAdd(1, .release); - std.Thread.Futex.wake(cond_epoch, to_wake); + futexWake(cond_epoch, to_wake); return; }; } @@ -1298,7 +1347,7 @@ fn fileReadStreaming(userdata: ?*anyopaque, file: Io.File, data: [][]u8) Io.File .NOMEM => return error.SystemResources, .NOTCONN => return error.SocketUnconnected, .CONNRESET => return error.ConnectionResetByPeer, - .TIMEDOUT => return error.ConnectionTimedOut, + .TIMEDOUT => return error.Timeout, .NOTCAPABLE => return error.AccessDenied, else => |err| return posix.unexpectedErrno(err), } @@ -1321,7 +1370,7 @@ fn fileReadStreaming(userdata: ?*anyopaque, file: Io.File, data: [][]u8) Io.File .NOMEM => return error.SystemResources, .NOTCONN => return error.SocketUnconnected, .CONNRESET => return error.ConnectionResetByPeer, - .TIMEDOUT => return error.ConnectionTimedOut, + .TIMEDOUT => return error.Timeout, else => |err| return posix.unexpectedErrno(err), } } @@ -1420,7 +1469,7 @@ fn fileReadPositional(userdata: ?*anyopaque, file: Io.File, data: [][]u8, offset .NOMEM => return error.SystemResources, .NOTCONN => return error.SocketUnconnected, .CONNRESET => return error.ConnectionResetByPeer, - .TIMEDOUT => return error.ConnectionTimedOut, + .TIMEDOUT => return error.Timeout, .NXIO => return error.Unseekable, .SPIPE => return error.Unseekable, .OVERFLOW => return error.Unseekable, @@ -1446,7 +1495,7 @@ fn fileReadPositional(userdata: ?*anyopaque, file: Io.File, data: [][]u8, offset .NOMEM => return error.SystemResources, .NOTCONN => return error.SocketUnconnected, .CONNRESET => return error.ConnectionResetByPeer, - .TIMEDOUT => return error.ConnectionTimedOut, + .TIMEDOUT => return error.Timeout, .NXIO => return error.Unseekable, .SPIPE => return error.Unseekable, .OVERFLOW => return error.Unseekable, @@ -1693,9 +1742,9 @@ fn select(userdata: ?*anyopaque, futures: []const *Io.AnyFuture) usize { fn netListenIpPosix( userdata: ?*anyopaque, - address: net.IpAddress, - options: net.IpAddress.ListenOptions, -) net.IpAddress.ListenError!net.Server { + address: IpAddress, + options: IpAddress.ListenOptions, +) IpAddress.ListenError!net.Server { const pool: *Pool = @ptrCast(@alignCast(userdata)); const family = posixAddressFamily(&address); const socket_fd = try openSocketPosix(pool, family, .{ @@ -1831,7 +1880,7 @@ fn posixConnect(pool: *Pool, socket_fd: posix.socket_t, addr: *const posix.socka .NETUNREACH => return error.NetworkUnreachable, .NOTSOCK => |err| return errnoBug(err), .PROTOTYPE => |err| return errnoBug(err), - .TIMEDOUT => return error.ConnectionTimedOut, + .TIMEDOUT => return error.Timeout, .CONNABORTED => |err| return errnoBug(err), .ACCES => return error.AccessDenied, .PERM => |err| return errnoBug(err), @@ -1904,9 +1953,9 @@ fn setSocketOption(pool: *Pool, fd: posix.fd_t, level: i32, opt_name: u32, optio fn netConnectIpPosix( userdata: ?*anyopaque, - address: *const net.IpAddress, - options: net.IpAddress.ConnectOptions, -) net.IpAddress.ConnectError!net.Stream { + address: *const IpAddress, + options: IpAddress.ConnectOptions, +) IpAddress.ConnectError!net.Stream { if (options.timeout != .none) @panic("TODO"); const pool: *Pool = @ptrCast(@alignCast(userdata)); const family = posixAddressFamily(address); @@ -1941,9 +1990,9 @@ fn netConnectUnix( fn netBindIpPosix( userdata: ?*anyopaque, - address: *const net.IpAddress, - options: net.IpAddress.BindOptions, -) net.IpAddress.BindError!net.Socket { + address: *const IpAddress, + options: IpAddress.BindOptions, +) IpAddress.BindError!net.Socket { const pool: *Pool = @ptrCast(@alignCast(userdata)); const family = posixAddressFamily(address); const socket_fd = try openSocketPosix(pool, family, options); @@ -1958,7 +2007,7 @@ fn netBindIpPosix( }; } -fn openSocketPosix(pool: *Pool, family: posix.sa_family_t, options: net.IpAddress.BindOptions) !posix.socket_t { +fn openSocketPosix(pool: *Pool, family: posix.sa_family_t, options: IpAddress.BindOptions) !posix.socket_t { const mode = posixSocketMode(options.mode); const protocol = posixProtocol(options.protocol); const socket_fd = while (true) { @@ -2081,7 +2130,7 @@ fn netReadPosix(userdata: ?*anyopaque, fd: net.Socket.Handle, data: [][]u8) net. .NOMEM => return error.SystemResources, .NOTCONN => return error.SocketUnconnected, .CONNRESET => return error.ConnectionResetByPeer, - .TIMEDOUT => return error.ConnectionTimedOut, + .TIMEDOUT => return error.Timeout, .NOTCAPABLE => return error.AccessDenied, else => |err| return posix.unexpectedErrno(err), } @@ -2102,7 +2151,7 @@ fn netReadPosix(userdata: ?*anyopaque, fd: net.Socket.Handle, data: [][]u8) net. .NOMEM => return error.SystemResources, .NOTCONN => return error.SocketUnconnected, .CONNRESET => return error.ConnectionResetByPeer, - .TIMEDOUT => return error.ConnectionTimedOut, + .TIMEDOUT => return error.Timeout, .PIPE => return error.BrokenPipe, .NETDOWN => return error.NetworkDown, else => |err| return posix.unexpectedErrno(err), @@ -2563,6 +2612,118 @@ fn netInterfaceName(userdata: ?*anyopaque, interface: net.Interface) net.Interfa @panic("unimplemented"); } +fn netLookup( + userdata: ?*anyopaque, + host_name: HostName, + resolved: *Io.Queue(HostName.LookupResult), + options: HostName.LookupOptions, +) void { + const pool: *Pool = @ptrCast(@alignCast(userdata)); + const pool_io = pool.io(); + resolved.putOneUncancelable(pool_io, .{ .end = netLookupFallible(pool, host_name, resolved, options) }); +} + +fn netLookupFallible( + pool: *Pool, + host_name: HostName, + resolved: *Io.Queue(HostName.LookupResult), + options: HostName.LookupOptions, +) !void { + const pool_io = pool.io(); + const name = host_name.bytes; + assert(name.len <= HostName.max_len); + + if (is_windows) { + // TODO use GetAddrInfoExW / GetAddrInfoExCancel + @compileError("TODO"); + } + + // On Linux, glibc provides getaddrinfo_a which is capable of supporting our semantics. + // However, musl's POSIX-compliant getaddrinfo is not, so we bypass it. + + if (builtin.target.isGnuLibC()) { + // TODO use getaddrinfo_a / gai_cancel + } + + if (native_os == .linux) { + if (options.family != .ip4) { + if (IpAddress.parseIp6(name, options.port)) |addr| { + try resolved.putAll(pool_io, &.{ + .{ .address = addr }, + .{ .canonical_name = copyCanon(options.canonical_name_buffer, name) }, + }); + return; + } else |_| {} + } + + if (options.family != .ip6) { + if (IpAddress.parseIp4(name, options.port)) |addr| { + try resolved.putAll(pool_io, &.{ + .{ .address = addr }, + .{ .canonical_name = copyCanon(options.canonical_name_buffer, name) }, + }); + } else |_| {} + } + + lookupHosts(pool, host_name, resolved, options) catch |err| switch (err) { + error.UnknownHostName => {}, + else => |e| return e, + }; + + // RFC 6761 Section 6.3.3 + // Name resolution APIs and libraries SHOULD recognize + // localhost names as special and SHOULD always return the IP + // loopback address for address queries and negative responses + // for all other query types. + + // Check for equal to "localhost(.)" or ends in ".localhost(.)" + const localhost = if (name[name.len - 1] == '.') "localhost." else "localhost"; + if (std.mem.endsWith(u8, name, localhost) and + (name.len == localhost.len or name[name.len - localhost.len] == '.')) + { + var results_buffer: [3]HostName.LookupResult = undefined; + var results_index: usize = 0; + if (options.family != .ip4) { + results_buffer[results_index] = .{ .address = .{ .ip6 = .loopback(options.port) } }; + results_index += 1; + } + if (options.family != .ip6) { + results_buffer[results_index] = .{ .address = .{ .ip4 = .loopback(options.port) } }; + results_index += 1; + } + const canon_name = "localhost"; + const canon_name_dest = options.canonical_name_buffer[0..canon_name.len]; + canon_name_dest.* = canon_name.*; + results_buffer[results_index] = .{ .canonical_name = .{ .bytes = canon_name_dest } }; + results_index += 1; + try resolved.putAll(pool_io, results_buffer[0..results_index]); + return; + } + + return lookupDnsSearch(pool, host_name, resolved, options); + } + + if (native_os == .openbsd) { + // TODO use getaddrinfo_async / asr_abort + } + + if (native_os == .freebsd) { + // TODO use dnsres_getaddrinfo + } + + if (native_os.isDarwin()) { + // TODO use CFHostStartInfoResolution / CFHostCancelInfoResolution + } + + if (builtin.link_libc) { + // This operating system lacks a way to resolve asynchronously. We are + // stuck with getaddrinfo. + @compileError("TODO"); + } + + return error.OptionUnsupported; +} + const PosixAddress = extern union { any: posix.sockaddr, in: posix.sockaddr.in, @@ -2574,14 +2735,14 @@ const UnixAddress = extern union { un: posix.sockaddr.un, }; -fn posixAddressFamily(a: *const net.IpAddress) posix.sa_family_t { +fn posixAddressFamily(a: *const IpAddress) posix.sa_family_t { return switch (a.*) { .ip4 => posix.AF.INET, .ip6 => posix.AF.INET6, }; } -fn addressFromPosix(posix_address: *PosixAddress) net.IpAddress { +fn addressFromPosix(posix_address: *PosixAddress) IpAddress { return switch (posix_address.any.family) { posix.AF.INET => .{ .ip4 = address4FromPosix(&posix_address.in) }, posix.AF.INET6 => .{ .ip6 = address6FromPosix(&posix_address.in6) }, @@ -2589,7 +2750,7 @@ fn addressFromPosix(posix_address: *PosixAddress) net.IpAddress { }; } -fn addressToPosix(a: *const net.IpAddress, storage: *PosixAddress) posix.socklen_t { +fn addressToPosix(a: *const IpAddress, storage: *PosixAddress) posix.socklen_t { return switch (a.*) { .ip4 => |ip4| { storage.in = address4ToPosix(ip4); @@ -2789,3 +2950,436 @@ fn pathToPosix(file_path: []const u8, buffer: *[posix.PATH_MAX]u8) Io.Dir.PathNa buffer[file_path.len] = 0; return buffer[0..file_path.len :0]; } + +fn lookupDnsSearch( + pool: *Pool, + host_name: HostName, + resolved: *Io.Queue(HostName.LookupResult), + options: HostName.LookupOptions, +) HostName.LookupError!void { + const pool_io = pool.io(); + const rc = HostName.ResolvConf.init(pool_io) catch return error.ResolvConfParseFailed; + + // Count dots, suppress search when >=ndots or name ends in + // a dot, which is an explicit request for global scope. + const dots = std.mem.countScalar(u8, host_name.bytes, '.'); + const search_len = if (dots >= rc.ndots or std.mem.endsWith(u8, host_name.bytes, ".")) 0 else rc.search_len; + const search = rc.search_buffer[0..search_len]; + + var canon_name = host_name.bytes; + + // Strip final dot for canon, fail if multiple trailing dots. + if (std.mem.endsWith(u8, canon_name, ".")) canon_name.len -= 1; + if (std.mem.endsWith(u8, canon_name, ".")) return error.UnknownHostName; + + // Name with search domain appended is set up in `canon_name`. This + // both provides the desired default canonical name (if the requested + // name is not a CNAME record) and serves as a buffer for passing the + // full requested name to `lookupDns`. + @memcpy(options.canonical_name_buffer[0..canon_name.len], canon_name); + options.canonical_name_buffer[canon_name.len] = '.'; + var it = std.mem.tokenizeAny(u8, search, " \t"); + while (it.next()) |token| { + @memcpy(options.canonical_name_buffer[canon_name.len + 1 ..][0..token.len], token); + const lookup_canon_name = options.canonical_name_buffer[0 .. canon_name.len + 1 + token.len]; + if (lookupDns(pool, lookup_canon_name, &rc, resolved, options)) |result| { + return result; + } else |err| switch (err) { + error.UnknownHostName => continue, + else => |e| return e, + } + } + + const lookup_canon_name = options.canonical_name_buffer[0..canon_name.len]; + return lookupDns(pool, lookup_canon_name, &rc, resolved, options); +} + +fn lookupDns( + pool: *Pool, + lookup_canon_name: []const u8, + rc: *const HostName.ResolvConf, + resolved: *Io.Queue(HostName.LookupResult), + options: HostName.LookupOptions, +) HostName.LookupError!void { + const pool_io = pool.io(); + const family_records: [2]struct { af: IpAddress.Family, rr: u8 } = .{ + .{ .af = .ip6, .rr = std.posix.RR.A }, + .{ .af = .ip4, .rr = std.posix.RR.AAAA }, + }; + var query_buffers: [2][280]u8 = undefined; + var answer_buffer: [2 * 512]u8 = undefined; + var queries_buffer: [2][]const u8 = undefined; + var answers_buffer: [2][]const u8 = undefined; + var nq: usize = 0; + var answer_buffer_i: usize = 0; + + for (family_records) |fr| { + if (options.family != fr.af) { + const entropy = std.crypto.random.array(u8, 2); + const len = writeResolutionQuery(&query_buffers[nq], 0, lookup_canon_name, 1, fr.rr, entropy); + queries_buffer[nq] = query_buffers[nq][0..len]; + nq += 1; + } + } + + var ip4_mapped: [HostName.ResolvConf.max_nameservers]IpAddress = undefined; + var any_ip6 = false; + for (rc.nameservers(), &ip4_mapped) |*ns, *m| { + m.* = .{ .ip6 = .fromAny(ns.*) }; + any_ip6 = any_ip6 or ns.* == .ip6; + } + var socket = s: { + if (any_ip6) ip6: { + const ip6_addr: IpAddress = .{ .ip6 = .unspecified(0) }; + const socket = ip6_addr.bind(pool_io, .{ .ip6_only = true, .mode = .dgram }) catch |err| switch (err) { + error.AddressFamilyUnsupported => break :ip6, + else => |e| return e, + }; + break :s socket; + } + any_ip6 = false; + const ip4_addr: IpAddress = .{ .ip4 = .unspecified(0) }; + const socket = try ip4_addr.bind(pool_io, .{ .mode = .dgram }); + break :s socket; + }; + defer socket.close(pool_io); + + const mapped_nameservers = if (any_ip6) ip4_mapped[0..rc.nameservers_len] else rc.nameservers(); + const queries = queries_buffer[0..nq]; + const answers = answers_buffer[0..queries.len]; + var answers_remaining = answers.len; + for (answers) |*answer| answer.len = 0; + + // boot clock is chosen because time the computer is suspended should count + // against time spent waiting for external messages to arrive. + const clock: Io.Clock = .boot; + var now_ts = try clock.now(pool_io); + const final_ts = now_ts.addDuration(.fromSeconds(rc.timeout_seconds)); + const attempt_duration: Io.Duration = .{ + .nanoseconds = std.time.ns_per_s * @as(usize, rc.timeout_seconds) / rc.attempts, + }; + + send: while (now_ts.nanoseconds < final_ts.nanoseconds) : (now_ts = try clock.now(pool_io)) { + const max_messages = queries_buffer.len * HostName.ResolvConf.max_nameservers; + { + var message_buffer: [max_messages]Io.net.OutgoingMessage = undefined; + var message_i: usize = 0; + for (queries, answers) |query, *answer| { + if (answer.len != 0) continue; + for (mapped_nameservers) |*ns| { + message_buffer[message_i] = .{ + .address = ns, + .data_ptr = query.ptr, + .data_len = query.len, + }; + message_i += 1; + } + } + _ = netSend(pool, socket.handle, message_buffer[0..message_i], .{}); + } + + const timeout: Io.Timeout = .{ .deadline = .{ + .raw = now_ts.addDuration(attempt_duration), + .clock = clock, + } }; + + while (true) { + var message_buffer: [max_messages]Io.net.IncomingMessage = undefined; + const buf = answer_buffer[answer_buffer_i..]; + const recv_err, const recv_n = socket.receiveManyTimeout(pool_io, &message_buffer, buf, .{}, timeout); + for (message_buffer[0..recv_n]) |*received_message| { + const reply = received_message.data; + // Ignore non-identifiable packets. + if (reply.len < 4) continue; + + // Ignore replies from addresses we didn't send to. + const ns = for (mapped_nameservers) |*ns| { + if (received_message.from.eql(ns)) break ns; + } else { + continue; + }; + + // Find which query this answer goes with, if any. + const query, const answer = for (queries, answers) |query, *answer| { + if (reply[0] == query[0] and reply[1] == query[1]) break .{ query, answer }; + } else { + continue; + }; + if (answer.len != 0) continue; + + // Only accept positive or negative responses; retry immediately on + // server failure, and ignore all other codes such as refusal. + switch (reply[3] & 15) { + 0, 3 => { + answer.* = reply; + answer_buffer_i += reply.len; + answers_remaining -= 1; + if (answer_buffer.len - answer_buffer_i == 0) break :send; + if (answers_remaining == 0) break :send; + }, + 2 => { + var retry_message: Io.net.OutgoingMessage = .{ + .address = ns, + .data_ptr = query.ptr, + .data_len = query.len, + }; + _ = netSend(pool, socket.handle, (&retry_message)[0..1], .{}); + continue; + }, + else => continue, + } + } + if (recv_err) |err| switch (err) { + error.Canceled => return error.Canceled, + error.Timeout => continue :send, + else => continue, + }; + } + } else { + return error.NameServerFailure; + } + + var addresses_len: usize = 0; + var canonical_name: ?HostName = null; + + for (answers) |answer| { + var it = HostName.DnsResponse.init(answer) catch { + // TODO accept a diagnostics struct and append warnings + continue; + }; + while (it.next() catch { + // TODO accept a diagnostics struct and append warnings + continue; + }) |record| switch (record.rr) { + std.posix.RR.A => { + const data = record.packet[record.data_off..][0..record.data_len]; + if (data.len != 4) return error.InvalidDnsARecord; + try resolved.putOne(pool_io, .{ .address = .{ .ip4 = .{ + .bytes = data[0..4].*, + .port = options.port, + } } }); + addresses_len += 1; + }, + std.posix.RR.AAAA => { + const data = record.packet[record.data_off..][0..record.data_len]; + if (data.len != 16) return error.InvalidDnsAAAARecord; + try resolved.putOne(pool_io, .{ .address = .{ .ip6 = .{ + .bytes = data[0..16].*, + .port = options.port, + } } }); + addresses_len += 1; + }, + std.posix.RR.CNAME => { + _, canonical_name = HostName.expand(record.packet, record.data_off, options.canonical_name_buffer) catch + return error.InvalidDnsCnameRecord; + }, + else => continue, + }; + } + + try resolved.putOne(pool_io, .{ .canonical_name = canonical_name orelse .{ .bytes = lookup_canon_name } }); + if (addresses_len == 0) return error.NameServerFailure; +} + +fn lookupHosts( + pool: *Pool, + host_name: HostName, + resolved: *Io.Queue(HostName.LookupResult), + options: HostName.LookupOptions, +) !void { + const pool_io = pool.io(); + const file = Io.File.openAbsolute(pool_io, "/etc/hosts", .{}) catch |err| switch (err) { + error.FileNotFound, + error.NotDir, + error.AccessDenied, + => return error.UnknownHostName, + + error.Canceled => |e| return e, + + else => { + // TODO populate optional diagnostic struct + return error.DetectingNetworkConfigurationFailed; + }, + }; + defer file.close(pool_io); + + var line_buf: [512]u8 = undefined; + var file_reader = file.reader(pool_io, &line_buf); + return lookupHostsReader(pool, host_name, resolved, options, &file_reader.interface) catch |err| switch (err) { + error.ReadFailed => switch (file_reader.err.?) { + error.Canceled => |e| return e, + else => { + // TODO populate optional diagnostic struct + return error.DetectingNetworkConfigurationFailed; + }, + }, + error.Canceled => |e| return e, + error.UnknownHostName => |e| return e, + }; +} + +fn lookupHostsReader( + pool: *Pool, + host_name: HostName, + resolved: *Io.Queue(HostName.LookupResult), + options: HostName.LookupOptions, + reader: *Io.Reader, +) error{ ReadFailed, Canceled, UnknownHostName }!void { + const pool_io = pool.io(); + var addresses_len: usize = 0; + var canonical_name: ?HostName = null; + while (true) { + const line = reader.takeDelimiterExclusive('\n') catch |err| switch (err) { + error.StreamTooLong => { + // Skip lines that are too long. + _ = reader.discardDelimiterInclusive('\n') catch |e| switch (e) { + error.EndOfStream => break, + error.ReadFailed => return error.ReadFailed, + }; + continue; + }, + error.ReadFailed => return error.ReadFailed, + error.EndOfStream => break, + }; + reader.toss(1); + var split_it = std.mem.splitScalar(u8, line, '#'); + const no_comment_line = split_it.first(); + + var line_it = std.mem.tokenizeAny(u8, no_comment_line, " \t"); + const ip_text = line_it.next() orelse continue; + var first_name_text: ?[]const u8 = null; + while (line_it.next()) |name_text| { + if (std.mem.eql(u8, name_text, host_name.bytes)) { + if (first_name_text == null) first_name_text = name_text; + break; + } + } else continue; + + if (canonical_name == null) { + if (HostName.init(first_name_text.?)) |name_text| { + if (name_text.bytes.len <= options.canonical_name_buffer.len) { + const canonical_name_dest = options.canonical_name_buffer[0..name_text.bytes.len]; + @memcpy(canonical_name_dest, name_text.bytes); + canonical_name = .{ .bytes = canonical_name_dest }; + } + } else |_| {} + } + + if (options.family != .ip6) { + if (IpAddress.parseIp4(ip_text, options.port)) |addr| { + try resolved.putOne(pool_io, .{ .address = addr }); + addresses_len += 1; + } else |_| {} + } + if (options.family != .ip4) { + if (IpAddress.parseIp6(ip_text, options.port)) |addr| { + try resolved.putOne(pool_io, .{ .address = addr }); + addresses_len += 1; + } else |_| {} + } + } + + if (canonical_name) |canon_name| try resolved.putOne(pool_io, .{ .canonical_name = canon_name }); + if (addresses_len == 0) return error.UnknownHostName; +} + +/// Writes DNS resolution query packet data to `w`; at most 280 bytes. +fn writeResolutionQuery(q: *[280]u8, op: u4, dname: []const u8, class: u8, ty: u8, entropy: [2]u8) usize { + // This implementation is ported from musl libc. + // A more idiomatic "ziggy" implementation would be welcome. + var name = dname; + if (std.mem.endsWith(u8, name, ".")) name.len -= 1; + assert(name.len <= 253); + const n = 17 + name.len + @intFromBool(name.len != 0); + + // Construct query template - ID will be filled later + q[0..2].* = entropy; + @memset(q[2..n], 0); + q[2] = @as(u8, op) * 8 + 1; + q[5] = 1; + @memcpy(q[13..][0..name.len], name); + var i: usize = 13; + var j: usize = undefined; + while (q[i] != 0) : (i = j + 1) { + j = i; + while (q[j] != 0 and q[j] != '.') : (j += 1) {} + // TODO determine the circumstances for this and whether or + // not this should be an error. + if (j - i - 1 > 62) unreachable; + q[i - 1] = @intCast(j - i); + } + q[i + 1] = ty; + q[i + 3] = class; + return n; +} + +fn copyCanon(canonical_name_buffer: *[HostName.max_len]u8, name: []const u8) HostName { + const dest = canonical_name_buffer[0..name.len]; + @memcpy(dest, name); + return .{ .bytes = dest }; +} + +pub fn futexWait(ptr: *const std.atomic.Value(u32), expect: u32) void { + @branchHint(.cold); + + if (native_os == .linux) { + const linux = std.os.linux; + const rc = linux.futex_4arg(ptr, .{ .cmd = .WAIT, .private = true }, expect, null); + if (builtin.mode == .Debug) switch (linux.E.init(rc)) { + .SUCCESS => {}, // notified by `wake()` + .INTR => {}, // gives caller a chance to check cancellation + .AGAIN => {}, // ptr.* != expect + .INVAL => {}, // possibly timeout overflow + .TIMEDOUT => unreachable, + .FAULT => unreachable, // ptr was invalid + else => unreachable, + }; + return; + } + + @compileError("TODO"); +} + +pub fn futexWaitDuration(ptr: *const std.atomic.Value(u32), expect: u32, timeout: Io.Duration) void { + @branchHint(.cold); + + if (native_os == .linux) { + const linux = std.os.linux; + var ts = timestampToPosix(timeout.toNanoseconds()); + const rc = linux.futex_4arg(ptr, .{ .cmd = .WAIT, .private = true }, expect, &ts); + if (builtin.mode == .Debug) switch (linux.E.init(rc)) { + .SUCCESS => {}, // notified by `wake()` + .INTR => {}, // gives caller a chance to check cancellation + .AGAIN => {}, // ptr.* != expect + .TIMEDOUT => {}, + .INVAL => {}, // possibly timeout overflow + .FAULT => unreachable, // ptr was invalid + else => unreachable, + }; + return; + } + + @compileError("TODO"); +} + +pub fn futexWake(ptr: *const std.atomic.Value(u32), max_waiters: u32) void { + @branchHint(.cold); + + if (native_os == .linux) { + const linux = std.os.linux; + const rc = linux.futex_3arg( + &ptr.raw, + .{ .cmd = .WAKE, .private = true }, + @min(max_waiters, std.math.maxInt(i32)), + ); + if (builtin.mode == .Debug) switch (linux.E.init(rc)) { + .SUCCESS => {}, // successful wake up + .INVAL => {}, // invalid futex_wait() on ptr done elsewhere + .FAULT => {}, // pointer became invalid while doing the wake + else => unreachable, + }; + return; + } + + @compileError("TODO"); +} diff --git a/lib/std/Io/net.zig b/lib/std/Io/net.zig index e8aadde38c..53cfee60c5 100644 --- a/lib/std/Io/net.zig +++ b/lib/std/Io/net.zig @@ -281,7 +281,6 @@ pub const IpAddress = union(enum) { } pub const ConnectError = error{ - AddressInUse, AddressUnavailable, AddressFamilyUnsupported, /// Insufficient memory or other resource internal to the operating system. @@ -291,7 +290,7 @@ pub const IpAddress = union(enum) { ConnectionResetByPeer, HostUnreachable, NetworkUnreachable, - ConnectionTimedOut, + Timeout, /// One of the `ConnectOptions` is not supported by the Io /// implementation. OptionUnsupported, @@ -1165,7 +1164,7 @@ pub const Stream = struct { SystemResources, BrokenPipe, ConnectionResetByPeer, - ConnectionTimedOut, + Timeout, SocketUnconnected, /// The file descriptor does not hold the required rights to read /// from it. diff --git a/lib/std/Io/net/HostName.zig b/lib/std/Io/net/HostName.zig index 4d0df744c4..788c40dd0c 100644 --- a/lib/std/Io/net/HostName.zig +++ b/lib/std/Io/net/HostName.zig @@ -63,8 +63,6 @@ pub fn eql(a: HostName, b: HostName) bool { pub const LookupOptions = struct { port: u16, - /// Must have at least length 2. - addresses_buffer: []IpAddress, canonical_name_buffer: *[max_len]u8, /// `null` means either. family: ?IpAddress.Family = null, @@ -81,487 +79,23 @@ pub const LookupError = error{ DetectingNetworkConfigurationFailed, } || Io.Clock.Error || IpAddress.BindError || Io.Cancelable; -pub const LookupResult = struct { - /// How many `LookupOptions.addresses_buffer` elements are populated. - addresses_len: usize, +pub const LookupResult = union(enum) { + address: IpAddress, canonical_name: HostName, - - pub const empty: LookupResult = .{ - .addresses_len = 0, - .canonical_name = undefined, - }; + end: LookupError!void, }; -pub fn lookup(host_name: HostName, io: Io, options: LookupOptions) LookupError!LookupResult { - const name = host_name.bytes; - assert(name.len <= max_len); - assert(options.addresses_buffer.len >= 2); - - if (native_os == .windows) @compileError("TODO"); - if (builtin.link_libc) @compileError("TODO"); - if (native_os == .linux) { - if (options.family != .ip6) { - if (IpAddress.parseIp4(name, options.port)) |addr| { - options.addresses_buffer[0] = addr; - return .{ .addresses_len = 1, .canonical_name = copyCanon(options.canonical_name_buffer, name) }; - } else |_| {} - } - if (options.family != .ip4) { - if (IpAddress.parseIp6(name, options.port)) |addr| { - options.addresses_buffer[0] = addr; - return .{ .addresses_len = 1, .canonical_name = copyCanon(options.canonical_name_buffer, name) }; - } else |_| {} - } - { - const result = try lookupHosts(host_name, io, options); - if (result.addresses_len > 0) return sortLookupResults(options, result); - } - { - // RFC 6761 Section 6.3.3 - // Name resolution APIs and libraries SHOULD recognize - // localhost names as special and SHOULD always return the IP - // loopback address for address queries and negative responses - // for all other query types. - - // Check for equal to "localhost(.)" or ends in ".localhost(.)" - const localhost = if (name[name.len - 1] == '.') "localhost." else "localhost"; - if (std.mem.endsWith(u8, name, localhost) and - (name.len == localhost.len or name[name.len - localhost.len] == '.')) - { - var i: usize = 0; - if (options.family != .ip6) { - options.addresses_buffer[i] = .{ .ip4 = .loopback(options.port) }; - i += 1; - } - if (options.family != .ip4) { - options.addresses_buffer[i] = .{ .ip6 = .loopback(options.port) }; - i += 1; - } - const canon_name = "localhost"; - const canon_name_dest = options.canonical_name_buffer[0..canon_name.len]; - canon_name_dest.* = canon_name.*; - return sortLookupResults(options, .{ - .addresses_len = i, - .canonical_name = .{ .bytes = canon_name_dest }, - }); - } - } - { - const result = try lookupDnsSearch(host_name, io, options); - if (result.addresses_len > 0) return sortLookupResults(options, result); - } - return error.UnknownHostName; - } - @compileError("unimplemented"); -} - -fn sortLookupResults(options: LookupOptions, result: LookupResult) !LookupResult { - const addresses = options.addresses_buffer[0..result.addresses_len]; - // No further processing is needed if there are fewer than 2 results or - // if there are only IPv4 results. - if (addresses.len < 2) return result; - const all_ip4 = for (addresses) |a| switch (a) { - .ip4 => continue, - .ip6 => break false, - } else true; - if (all_ip4) return result; - - // RFC 3484/6724 describes how destination address selection is - // supposed to work. However, to implement it requires making a bunch - // of networking syscalls, which is unnecessarily high latency, - // especially if implemented serially. Furthermore, rules 3, 4, and 7 - // have excessive runtime and code size cost and dubious benefit. - // - // Therefore, this logic sorts only using values available without - // doing any syscalls, relying on the calling code to have a - // meta-strategy such as attempting connection to multiple results at - // once and keeping the fastest response while canceling the others. - - const S = struct { - pub fn lessThan(s: @This(), lhs: IpAddress, rhs: IpAddress) bool { - return sortKey(s, lhs) < sortKey(s, rhs); - } - - fn sortKey(s: @This(), a: IpAddress) i32 { - _ = s; - var da6: Ip6Address = .{ - .port = 65535, - .bytes = undefined, - }; - switch (a) { - .ip6 => |ip6| { - da6.bytes = ip6.bytes; - da6.interface = ip6.interface; - }, - .ip4 => |ip4| { - da6.bytes[0..12].* = "\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff".*; - da6.bytes[12..].* = ip4.bytes; - }, - } - const da6_scope: i32 = da6.scope(); - const da6_prec: i32 = da6.policy().prec; - var key: i32 = 0; - key |= da6_prec << 20; - key |= (15 - da6_scope) << 16; - return key; - } - }; - std.mem.sort(IpAddress, addresses, @as(S, .{}), S.lessThan); - return result; -} - -fn lookupDnsSearch(host_name: HostName, io: Io, options: LookupOptions) LookupError!LookupResult { - const rc = ResolvConf.init(io) catch return error.ResolvConfParseFailed; - - // Count dots, suppress search when >=ndots or name ends in - // a dot, which is an explicit request for global scope. - const dots = std.mem.countScalar(u8, host_name.bytes, '.'); - const search_len = if (dots >= rc.ndots or std.mem.endsWith(u8, host_name.bytes, ".")) 0 else rc.search_len; - const search = rc.search_buffer[0..search_len]; - - var canon_name = host_name.bytes; - - // Strip final dot for canon, fail if multiple trailing dots. - if (std.mem.endsWith(u8, canon_name, ".")) canon_name.len -= 1; - if (std.mem.endsWith(u8, canon_name, ".")) return error.UnknownHostName; - - // Name with search domain appended is set up in `canon_name`. This - // both provides the desired default canonical name (if the requested - // name is not a CNAME record) and serves as a buffer for passing the - // full requested name to `lookupDns`. - @memcpy(options.canonical_name_buffer[0..canon_name.len], canon_name); - options.canonical_name_buffer[canon_name.len] = '.'; - var it = std.mem.tokenizeAny(u8, search, " \t"); - while (it.next()) |token| { - @memcpy(options.canonical_name_buffer[canon_name.len + 1 ..][0..token.len], token); - const lookup_canon_name = options.canonical_name_buffer[0 .. canon_name.len + 1 + token.len]; - const result = try lookupDns(io, lookup_canon_name, &rc, options); - if (result.addresses_len > 0) return sortLookupResults(options, result); - } - - const lookup_canon_name = options.canonical_name_buffer[0..canon_name.len]; - return lookupDns(io, lookup_canon_name, &rc, options); -} - -fn lookupDns(io: Io, lookup_canon_name: []const u8, rc: *const ResolvConf, options: LookupOptions) LookupError!LookupResult { - const family_records: [2]struct { af: IpAddress.Family, rr: u8 } = .{ - .{ .af = .ip6, .rr = std.posix.RR.A }, - .{ .af = .ip4, .rr = std.posix.RR.AAAA }, - }; - var query_buffers: [2][280]u8 = undefined; - var answer_buffer: [2 * 512]u8 = undefined; - var queries_buffer: [2][]const u8 = undefined; - var answers_buffer: [2][]const u8 = undefined; - var nq: usize = 0; - var answer_buffer_i: usize = 0; - - for (family_records) |fr| { - if (options.family != fr.af) { - const entropy = std.crypto.random.array(u8, 2); - const len = writeResolutionQuery(&query_buffers[nq], 0, lookup_canon_name, 1, fr.rr, entropy); - queries_buffer[nq] = query_buffers[nq][0..len]; - nq += 1; - } - } - - var ip4_mapped: [ResolvConf.max_nameservers]IpAddress = undefined; - var any_ip6 = false; - for (rc.nameservers(), &ip4_mapped) |*ns, *m| { - m.* = .{ .ip6 = .fromAny(ns.*) }; - any_ip6 = any_ip6 or ns.* == .ip6; - } - var socket = s: { - if (any_ip6) ip6: { - const ip6_addr: IpAddress = .{ .ip6 = .unspecified(0) }; - const socket = ip6_addr.bind(io, .{ .ip6_only = true, .mode = .dgram }) catch |err| switch (err) { - error.AddressFamilyUnsupported => break :ip6, - else => |e| return e, - }; - break :s socket; - } - any_ip6 = false; - const ip4_addr: IpAddress = .{ .ip4 = .unspecified(0) }; - const socket = try ip4_addr.bind(io, .{ .mode = .dgram }); - break :s socket; - }; - defer socket.close(io); - - const mapped_nameservers = if (any_ip6) ip4_mapped[0..rc.nameservers_len] else rc.nameservers(); - const queries = queries_buffer[0..nq]; - const answers = answers_buffer[0..queries.len]; - var answers_remaining = answers.len; - for (answers) |*answer| answer.len = 0; - - // boot clock is chosen because time the computer is suspended should count - // against time spent waiting for external messages to arrive. - const clock: Io.Clock = .boot; - var now_ts = try clock.now(io); - const final_ts = now_ts.addDuration(.fromSeconds(rc.timeout_seconds)); - const attempt_duration: Io.Duration = .{ - .nanoseconds = std.time.ns_per_s * @as(usize, rc.timeout_seconds) / rc.attempts, - }; - - send: while (now_ts.nanoseconds < final_ts.nanoseconds) : (now_ts = try clock.now(io)) { - const max_messages = queries_buffer.len * ResolvConf.max_nameservers; - { - var message_buffer: [max_messages]Io.net.OutgoingMessage = undefined; - var message_i: usize = 0; - for (queries, answers) |query, *answer| { - if (answer.len != 0) continue; - for (mapped_nameservers) |*ns| { - message_buffer[message_i] = .{ - .address = ns, - .data_ptr = query.ptr, - .data_len = query.len, - }; - message_i += 1; - } - } - _ = io.vtable.netSend(io.userdata, socket.handle, message_buffer[0..message_i], .{}); - } - - const timeout: Io.Timeout = .{ .deadline = .{ - .raw = now_ts.addDuration(attempt_duration), - .clock = clock, - } }; - - while (true) { - var message_buffer: [max_messages]Io.net.IncomingMessage = undefined; - const buf = answer_buffer[answer_buffer_i..]; - const recv_err, const recv_n = socket.receiveManyTimeout(io, &message_buffer, buf, .{}, timeout); - for (message_buffer[0..recv_n]) |*received_message| { - const reply = received_message.data; - // Ignore non-identifiable packets. - if (reply.len < 4) continue; - - // Ignore replies from addresses we didn't send to. - const ns = for (mapped_nameservers) |*ns| { - if (received_message.from.eql(ns)) break ns; - } else { - continue; - }; - - // Find which query this answer goes with, if any. - const query, const answer = for (queries, answers) |query, *answer| { - if (reply[0] == query[0] and reply[1] == query[1]) break .{ query, answer }; - } else { - continue; - }; - if (answer.len != 0) continue; - - // Only accept positive or negative responses; retry immediately on - // server failure, and ignore all other codes such as refusal. - switch (reply[3] & 15) { - 0, 3 => { - answer.* = reply; - answer_buffer_i += reply.len; - answers_remaining -= 1; - if (answer_buffer.len - answer_buffer_i == 0) break :send; - if (answers_remaining == 0) break :send; - }, - 2 => { - var retry_message: Io.net.OutgoingMessage = .{ - .address = ns, - .data_ptr = query.ptr, - .data_len = query.len, - }; - _ = io.vtable.netSend(io.userdata, socket.handle, (&retry_message)[0..1], .{}); - continue; - }, - else => continue, - } - } - if (recv_err) |err| switch (err) { - error.Canceled => return error.Canceled, - error.Timeout => continue :send, - else => continue, - }; - } - } else { - return error.NameServerFailure; - } - - var addresses_len: usize = 0; - var canonical_name: ?HostName = null; - - for (answers) |answer| { - var it = DnsResponse.init(answer) catch { - // TODO accept a diagnostics struct and append warnings - continue; - }; - while (it.next() catch { - // TODO accept a diagnostics struct and append warnings - continue; - }) |record| switch (record.rr) { - std.posix.RR.A => { - const data = record.packet[record.data_off..][0..record.data_len]; - if (data.len != 4) return error.InvalidDnsARecord; - if (addresses_len < options.addresses_buffer.len) { - options.addresses_buffer[addresses_len] = .{ .ip4 = .{ - .bytes = data[0..4].*, - .port = options.port, - } }; - addresses_len += 1; - } - }, - std.posix.RR.AAAA => { - const data = record.packet[record.data_off..][0..record.data_len]; - if (data.len != 16) return error.InvalidDnsAAAARecord; - if (addresses_len < options.addresses_buffer.len) { - options.addresses_buffer[addresses_len] = .{ .ip6 = .{ - .bytes = data[0..16].*, - .port = options.port, - } }; - addresses_len += 1; - } - }, - std.posix.RR.CNAME => { - _, canonical_name = expand(record.packet, record.data_off, options.canonical_name_buffer) catch - return error.InvalidDnsCnameRecord; - }, - else => continue, - }; - } - - if (addresses_len != 0) return .{ - .addresses_len = addresses_len, - .canonical_name = canonical_name orelse .{ .bytes = lookup_canon_name }, - }; - - return error.NameServerFailure; -} - -fn lookupHosts(host_name: HostName, io: Io, options: LookupOptions) !LookupResult { - const file = Io.File.openAbsolute(io, "/etc/hosts", .{}) catch |err| switch (err) { - error.FileNotFound, - error.NotDir, - error.AccessDenied, - => return .empty, - - error.Canceled => |e| return e, - - else => { - // TODO populate optional diagnostic struct - return error.DetectingNetworkConfigurationFailed; - }, - }; - defer file.close(io); - - var line_buf: [512]u8 = undefined; - var file_reader = file.reader(io, &line_buf); - return lookupHostsReader(host_name, options, &file_reader.interface) catch |err| switch (err) { - error.ReadFailed => switch (file_reader.err.?) { - error.Canceled => |e| return e, - else => { - // TODO populate optional diagnostic struct - return error.DetectingNetworkConfigurationFailed; - }, - }, - }; -} - -fn lookupHostsReader(host_name: HostName, options: LookupOptions, reader: *Io.Reader) error{ReadFailed}!LookupResult { - var addresses_len: usize = 0; - var canonical_name: ?HostName = null; - while (true) { - const line = reader.takeDelimiterExclusive('\n') catch |err| switch (err) { - error.StreamTooLong => { - // Skip lines that are too long. - _ = reader.discardDelimiterInclusive('\n') catch |e| switch (e) { - error.EndOfStream => break, - error.ReadFailed => return error.ReadFailed, - }; - continue; - }, - error.ReadFailed => return error.ReadFailed, - error.EndOfStream => break, - }; - reader.toss(1); - var split_it = std.mem.splitScalar(u8, line, '#'); - const no_comment_line = split_it.first(); - - var line_it = std.mem.tokenizeAny(u8, no_comment_line, " \t"); - const ip_text = line_it.next() orelse continue; - var first_name_text: ?[]const u8 = null; - while (line_it.next()) |name_text| { - if (std.mem.eql(u8, name_text, host_name.bytes)) { - if (first_name_text == null) first_name_text = name_text; - break; - } - } else continue; - - if (canonical_name == null) { - if (HostName.init(first_name_text.?)) |name_text| { - if (name_text.bytes.len <= options.canonical_name_buffer.len) { - const canonical_name_dest = options.canonical_name_buffer[0..name_text.bytes.len]; - @memcpy(canonical_name_dest, name_text.bytes); - canonical_name = .{ .bytes = canonical_name_dest }; - } - } else |_| {} - } - - if (options.family != .ip6) { - if (IpAddress.parseIp4(ip_text, options.port)) |addr| { - options.addresses_buffer[addresses_len] = addr; - addresses_len += 1; - if (options.addresses_buffer.len - addresses_len == 0) return .{ - .addresses_len = addresses_len, - .canonical_name = canonical_name orelse copyCanon(options.canonical_name_buffer, ip_text), - }; - } else |_| {} - } - if (options.family != .ip4) { - if (IpAddress.parseIp6(ip_text, options.port)) |addr| { - options.addresses_buffer[addresses_len] = addr; - addresses_len += 1; - if (options.addresses_buffer.len - addresses_len == 0) return .{ - .addresses_len = addresses_len, - .canonical_name = canonical_name orelse copyCanon(options.canonical_name_buffer, ip_text), - }; - } else |_| {} - } - } - if (canonical_name == null) assert(addresses_len == 0); - return .{ - .addresses_len = addresses_len, - .canonical_name = canonical_name orelse undefined, - }; -} - -fn copyCanon(canonical_name_buffer: *[max_len]u8, name: []const u8) HostName { - const dest = canonical_name_buffer[0..name.len]; - @memcpy(dest, name); - return .{ .bytes = dest }; -} - -/// Writes DNS resolution query packet data to `w`; at most 280 bytes. -fn writeResolutionQuery(q: *[280]u8, op: u4, dname: []const u8, class: u8, ty: u8, entropy: [2]u8) usize { - // This implementation is ported from musl libc. - // A more idiomatic "ziggy" implementation would be welcome. - var name = dname; - if (std.mem.endsWith(u8, name, ".")) name.len -= 1; - assert(name.len <= 253); - const n = 17 + name.len + @intFromBool(name.len != 0); - - // Construct query template - ID will be filled later - q[0..2].* = entropy; - @memset(q[2..n], 0); - q[2] = @as(u8, op) * 8 + 1; - q[5] = 1; - @memcpy(q[13..][0..name.len], name); - var i: usize = 13; - var j: usize = undefined; - while (q[i] != 0) : (i = j + 1) { - j = i; - while (q[j] != 0 and q[j] != '.') : (j += 1) {} - // TODO determine the circumstances for this and whether or - // not this should be an error. - if (j - i - 1 > 62) unreachable; - q[i - 1] = @intCast(j - i); - } - q[i + 1] = ty; - q[i + 3] = class; - return n; +/// Adds any number of `IpAddress` into resolved, exactly one canonical_name, +/// and then always finishes by adding one `LookupResult.end` entry. +/// +/// Guaranteed not to block if provided queue has capacity at least 8. +pub fn lookup( + host_name: HostName, + io: Io, + resolved: *Io.Queue(LookupResult), + options: LookupOptions, +) void { + return io.vtable.netLookup(io.userdata, host_name, resolved, options); } pub const ExpandError = error{InvalidDnsPacket} || ValidateError; @@ -672,33 +206,43 @@ pub fn connect( port: u16, options: IpAddress.ConnectOptions, ) ConnectError!Stream { - var addresses_buffer: [32]IpAddress = undefined; - var canonical_name_buffer: [HostName.max_len]u8 = undefined; + var canonical_name_buffer: [max_len]u8 = undefined; + var results_buffer: [32]HostName.LookupResult = undefined; + var results: Io.Queue(LookupResult) = .init(&results_buffer); - const results = try lookup(host_name, io, .{ + var lookup_task = io.async(HostName.lookup, .{ host_name, io, &results, .{ .port = port, - .addresses_buffer = &addresses_buffer, .canonical_name_buffer = &canonical_name_buffer, - }); - const addresses = addresses_buffer[0..results.addresses_len]; + } }); + defer lookup_task.cancel(io); - if (addresses.len == 0) return error.UnknownHostName; + var select: Io.Select(union(enum) { ip_connect: IpAddress.ConnectError!Stream }) = .init; + defer select.cancel(io); - // TODO instead of serially, use a Select API to send out - // the connections simultaneously and then keep the first - // successful one, canceling the rest. + while (results.getOne(io)) |result| switch (result) { + .address => |address| select.async(io, .ip_connect, IpAddress.connect, .{ address, io, options }), + .canonical_name => continue, + .end => |lookup_result| { + try lookup_result; + break; + }, + } else |err| return err; - // TODO On Linux this should additionally use an Io.Queue based - // DNS resolution API in order to send out a connection after - // each DNS response before waiting for the rest of them. + var aggregate_error: ConnectError = error.UnknownHostName; - for (addresses) |*addr| { - return addr.connect(io, options) catch |err| switch (err) { - error.ConnectionRefused => continue, - else => |e| return e, - }; - } - return error.ConnectionRefused; + while (select.remaining != 0) switch (select.wait(io)) { + .ip_connect => |ip_connect| if (ip_connect) |stream| return stream else |err| switch (err) { + error.SystemResources => |e| return e, + error.OptionUnsupported => |e| return e, + error.ProcessFdQuotaExceeded => |e| return e, + error.SystemFdQuotaExceeded => |e| return e, + error.Canceled => |e| return e, + error.WouldBlock => return error.Unexpected, + else => |e| aggregate_error = e, + }, + }; + + return aggregate_error; } pub const ResolvConf = struct { @@ -713,7 +257,7 @@ pub const ResolvConf = struct { pub const max_nameservers = 3; /// Returns `error.StreamTooLong` if a line is longer than 512 bytes. - fn init(io: Io) !ResolvConf { + pub fn init(io: Io) !ResolvConf { var rc: ResolvConf = .{ .nameservers_buffer = undefined, .nameservers_len = 0, @@ -749,7 +293,7 @@ pub const ResolvConf = struct { const Directive = enum { options, nameserver, domain, search }; const Option = enum { ndots, attempts, timeout }; - fn parse(rc: *ResolvConf, io: Io, reader: *Io.Reader) !void { + pub fn parse(rc: *ResolvConf, io: Io, reader: *Io.Reader) !void { while (reader.takeSentinel('\n')) |line_with_comment| { const line = line: { var split = std.mem.splitScalar(u8, line_with_comment, '#'); @@ -799,7 +343,7 @@ pub const ResolvConf = struct { rc.nameservers_len += 1; } - fn nameservers(rc: *const ResolvConf) []const IpAddress { + pub fn nameservers(rc: *const ResolvConf) []const IpAddress { return rc.nameservers_buffer[0..rc.nameservers_len]; } }; diff --git a/lib/std/posix.zig b/lib/std/posix.zig index bd5f04232d..8844a8249e 100644 --- a/lib/std/posix.zig +++ b/lib/std/posix.zig @@ -845,7 +845,7 @@ pub fn read(fd: fd_t, buf: []u8) ReadError!usize { .NOMEM => return error.SystemResources, .NOTCONN => return error.SocketUnconnected, .CONNRESET => return error.ConnectionResetByPeer, - .TIMEDOUT => return error.ConnectionTimedOut, + .TIMEDOUT => return error.Timeout, .NOTCAPABLE => return error.AccessDenied, else => |err| return unexpectedErrno(err), } @@ -874,7 +874,7 @@ pub fn read(fd: fd_t, buf: []u8) ReadError!usize { .NOMEM => return error.SystemResources, .NOTCONN => return error.SocketUnconnected, .CONNRESET => return error.ConnectionResetByPeer, - .TIMEDOUT => return error.ConnectionTimedOut, + .TIMEDOUT => return error.Timeout, else => |err| return unexpectedErrno(err), } } @@ -914,7 +914,7 @@ pub fn readv(fd: fd_t, iov: []const iovec) ReadError!usize { .NOMEM => return error.SystemResources, .NOTCONN => return error.SocketUnconnected, .CONNRESET => return error.ConnectionResetByPeer, - .TIMEDOUT => return error.ConnectionTimedOut, + .TIMEDOUT => return error.Timeout, .NOTCAPABLE => return error.AccessDenied, else => |err| return unexpectedErrno(err), } @@ -936,7 +936,7 @@ pub fn readv(fd: fd_t, iov: []const iovec) ReadError!usize { .NOMEM => return error.SystemResources, .NOTCONN => return error.SocketUnconnected, .CONNRESET => return error.ConnectionResetByPeer, - .TIMEDOUT => return error.ConnectionTimedOut, + .TIMEDOUT => return error.Timeout, else => |err| return unexpectedErrno(err), } } @@ -983,7 +983,7 @@ pub fn pread(fd: fd_t, buf: []u8, offset: u64) PReadError!usize { .NOMEM => return error.SystemResources, .NOTCONN => return error.SocketUnconnected, .CONNRESET => return error.ConnectionResetByPeer, - .TIMEDOUT => return error.ConnectionTimedOut, + .TIMEDOUT => return error.Timeout, .NXIO => return error.Unseekable, .SPIPE => return error.Unseekable, .OVERFLOW => return error.Unseekable, @@ -1016,7 +1016,7 @@ pub fn pread(fd: fd_t, buf: []u8, offset: u64) PReadError!usize { .NOMEM => return error.SystemResources, .NOTCONN => return error.SocketUnconnected, .CONNRESET => return error.ConnectionResetByPeer, - .TIMEDOUT => return error.ConnectionTimedOut, + .TIMEDOUT => return error.Timeout, .NXIO => return error.Unseekable, .SPIPE => return error.Unseekable, .OVERFLOW => return error.Unseekable, @@ -1134,7 +1134,7 @@ pub fn preadv(fd: fd_t, iov: []const iovec, offset: u64) PReadError!usize { .NOMEM => return error.SystemResources, .NOTCONN => return error.SocketUnconnected, .CONNRESET => return error.ConnectionResetByPeer, - .TIMEDOUT => return error.ConnectionTimedOut, + .TIMEDOUT => return error.Timeout, .NXIO => return error.Unseekable, .SPIPE => return error.Unseekable, .OVERFLOW => return error.Unseekable, @@ -1160,7 +1160,7 @@ pub fn preadv(fd: fd_t, iov: []const iovec, offset: u64) PReadError!usize { .NOMEM => return error.SystemResources, .NOTCONN => return error.SocketUnconnected, .CONNRESET => return error.ConnectionResetByPeer, - .TIMEDOUT => return error.ConnectionTimedOut, + .TIMEDOUT => return error.Timeout, .NXIO => return error.Unseekable, .SPIPE => return error.Unseekable, .OVERFLOW => return error.Unseekable, @@ -4205,7 +4205,7 @@ pub const ConnectError = error{ /// Timeout while attempting connection. The server may be too busy to accept new connections. Note /// that for IP sockets the timeout may be very long when syncookies are enabled on the server. - ConnectionTimedOut, + Timeout, /// This error occurs when no global event loop is configured, /// and connecting to the socket would block. @@ -4236,7 +4236,7 @@ pub fn connect(sock: socket_t, sock_addr: *const sockaddr, len: socklen_t) Conne .WSAEADDRNOTAVAIL => return error.AddressNotAvailable, .WSAECONNREFUSED => return error.ConnectionRefused, .WSAECONNRESET => return error.ConnectionResetByPeer, - .WSAETIMEDOUT => return error.ConnectionTimedOut, + .WSAETIMEDOUT => return error.Timeout, .WSAEHOSTUNREACH, // TODO: should we return NetworkUnreachable in this case as well? .WSAENETUNREACH, => return error.NetworkUnreachable, @@ -4273,7 +4273,7 @@ pub fn connect(sock: socket_t, sock_addr: *const sockaddr, len: socklen_t) Conne .NETUNREACH => return error.NetworkUnreachable, .NOTSOCK => unreachable, // The file descriptor sockfd does not refer to a socket. .PROTOTYPE => unreachable, // The socket type does not support the requested communications protocol. - .TIMEDOUT => return error.ConnectionTimedOut, + .TIMEDOUT => return error.Timeout, .NOENT => return error.FileNotFound, // Returned when socket is AF.UNIX and the given path does not exist. .CONNABORTED => unreachable, // Tried to reuse socket that previously received error.ConnectionRefused. else => |err| return unexpectedErrno(err), @@ -4333,7 +4333,7 @@ pub fn getsockoptError(sockfd: fd_t) ConnectError!void { .NETUNREACH => return error.NetworkUnreachable, .NOTSOCK => unreachable, // The file descriptor sockfd does not refer to a socket. .PROTOTYPE => unreachable, // The socket type does not support the requested communications protocol. - .TIMEDOUT => return error.ConnectionTimedOut, + .TIMEDOUT => return error.Timeout, .CONNRESET => return error.ConnectionResetByPeer, else => |err| return unexpectedErrno(err), }, @@ -6465,7 +6465,7 @@ pub const RecvFromError = error{ SystemResources, ConnectionResetByPeer, - ConnectionTimedOut, + Timeout, /// The socket has not been bound. SocketNotBound, @@ -6508,7 +6508,7 @@ pub fn recvfrom( .WSAENETDOWN => return error.NetworkDown, .WSAENOTCONN => return error.SocketUnconnected, .WSAEWOULDBLOCK => return error.WouldBlock, - .WSAETIMEDOUT => return error.ConnectionTimedOut, + .WSAETIMEDOUT => return error.Timeout, // TODO: handle more errors else => |err| return windows.unexpectedWSAError(err), } @@ -6528,7 +6528,7 @@ pub fn recvfrom( .NOMEM => return error.SystemResources, .CONNREFUSED => return error.ConnectionRefused, .CONNRESET => return error.ConnectionResetByPeer, - .TIMEDOUT => return error.ConnectionTimedOut, + .TIMEDOUT => return error.Timeout, .PIPE => return error.BrokenPipe, else => |err| return unexpectedErrno(err), } diff --git a/lib/std/zig/system.zig b/lib/std/zig/system.zig index 7bf836b583..2b11e971ca 100644 --- a/lib/std/zig/system.zig +++ b/lib/std/zig/system.zig @@ -428,7 +428,7 @@ pub fn resolveTargetQuery(io: Io, query: Target.Query) DetectError!Target { error.WouldBlock => return error.Unexpected, error.BrokenPipe => return error.Unexpected, error.ConnectionResetByPeer => return error.Unexpected, - error.ConnectionTimedOut => return error.Unexpected, + error.Timeout => return error.Unexpected, error.NotOpenForReading => return error.Unexpected, error.SocketUnconnected => return error.Unexpected,