diff --git a/BRANCH_TODO b/BRANCH_TODO index afb955968d..65b7580f68 100644 --- a/BRANCH_TODO +++ b/BRANCH_TODO @@ -5,11 +5,12 @@ * fix Group.wait not handling cancelation (need to move impl of ResetEvent to Threaded) * implement cancelRequest for non-linux posix * finish converting all Threaded into directly calling system functions and handling EINTR +* audit the TODOs * move max_iovecs_len to std.Io * address the cancelation race condition (signal received between checkCancel and syscall) * update signal values to be an enum * move fs.File.Writer to Io -* add non-blocking flag to network operations, handle EAGAIN +* add non-blocking flag to net and fs operations, handle EAGAIN * finish moving std.fs to Io * finish moving all of std.posix into Threaded diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig index 6051c99555..a22a80c82e 100644 --- a/lib/std/Io/Threaded.zig +++ b/lib/std/Io/Threaded.zig @@ -1697,34 +1697,10 @@ fn netListenIpPosix( ) Io.net.IpAddress.ListenError!Io.net.Server { const pool: *Pool = @ptrCast(@alignCast(userdata)); const family = posixAddressFamily(&address); - const protocol: u32 = posix.IPPROTO.TCP; - const socket_fd = while (true) { - try pool.checkCancel(); - const flags: u32 = posix.SOCK.STREAM | if (socket_flags_unsupported) 0 else posix.SOCK.CLOEXEC; - const socket_rc = posix.system.socket(family, flags, protocol); - switch (posix.errno(socket_rc)) { - .SUCCESS => { - const fd: posix.fd_t = @intCast(socket_rc); - errdefer posix.close(fd); - if (socket_flags_unsupported) while (true) { - try pool.checkCancel(); - switch (posix.errno(posix.system.fcntl(fd, posix.F.SETFD, @as(usize, posix.FD_CLOEXEC)))) { - .SUCCESS => break, - .INTR => continue, - else => |err| return posix.unexpectedErrno(err), - } - }; - break fd; - }, - .INTR => continue, - .AFNOSUPPORT => return error.AddressFamilyUnsupported, - .MFILE => return error.ProcessFdQuotaExceeded, - .NFILE => return error.SystemFdQuotaExceeded, - .NOBUFS => return error.SystemResources, - .NOMEM => return error.SystemResources, - else => |err| return posix.unexpectedErrno(err), - } - }; + const socket_fd = try openSocketPosix(pool, family, .{ + .mode = options.mode, + .protocol = options.protocol, + }); errdefer posix.close(socket_fd); if (options.reuse_address) { @@ -1763,61 +1739,17 @@ fn netListenUnix( ) Io.net.UnixAddress.ListenError!Io.net.Socket.Handle { if (!Io.net.has_unix_sockets) return error.AddressFamilyUnsupported; const pool: *Pool = @ptrCast(@alignCast(userdata)); - const protocol: u32 = 0; - const socket_fd = while (true) { - try pool.checkCancel(); - const flags: u32 = posix.SOCK.STREAM | if (socket_flags_unsupported) 0 else posix.SOCK.CLOEXEC; - const socket_rc = posix.system.socket(posix.AF.UNIX, flags, protocol); - switch (posix.errno(socket_rc)) { - .SUCCESS => { - const fd: posix.fd_t = @intCast(socket_rc); - errdefer posix.close(fd); - if (socket_flags_unsupported) while (true) { - try pool.checkCancel(); - switch (posix.errno(posix.system.fcntl(fd, posix.F.SETFD, @as(usize, posix.FD_CLOEXEC)))) { - .SUCCESS => break, - .INTR => continue, - else => |err| return posix.unexpectedErrno(err), - } - }; - break fd; - }, - .INTR => continue, - .AFNOSUPPORT => return error.AddressFamilyUnsupported, - .MFILE => return error.ProcessFdQuotaExceeded, - .NFILE => return error.SystemFdQuotaExceeded, - .NOBUFS => return error.SystemResources, - .NOMEM => return error.SystemResources, - else => |err| return posix.unexpectedErrno(err), - } + const socket_fd = openSocketPosix(pool, posix.AF.UNIX, .{ .mode = .stream }) catch |err| switch (err) { + error.ProtocolUnsupportedBySystem => return error.AddressFamilyUnsupported, + error.ProtocolUnsupportedByAddressFamily => return error.AddressFamilyUnsupported, + error.SocketModeUnsupported => return error.AddressFamilyUnsupported, + else => |e| return e, }; errdefer posix.close(socket_fd); var storage: UnixAddress = undefined; const addr_len = addressUnixToPosix(address, &storage); - while (true) { - try pool.checkCancel(); - switch (posix.errno(posix.system.bind(socket_fd, &storage.any, addr_len))) { - .SUCCESS => break, - .INTR => continue, - .ACCES => return error.AccessDenied, - .PERM => return error.PermissionDenied, - .ADDRINUSE => return error.AddressInUse, - .AFNOSUPPORT => return error.AddressFamilyUnsupported, - .ADDRNOTAVAIL => return error.AddressUnavailable, - .NOMEM => return error.SystemResources, - .LOOP => return error.SymLinkLoop, - .NOENT => return error.FileNotFound, - .NOTDIR => return error.NotDir, - .ROFS => return error.ReadOnlyFileSystem, - .BADF => |err| return errnoBug(err), // always a race condition if this error is returned - .INVAL => |err| return errnoBug(err), // invalid parameters - .NOTSOCK => |err| return errnoBug(err), // invalid `sockfd` - .FAULT => |err| return errnoBug(err), // invalid `addr` pointer - .NAMETOOLONG => |err| return errnoBug(err), - else => |err| return posix.unexpectedErrno(err), - } - } + try posixBindUnix(pool, socket_fd, &storage.any, addr_len); while (true) { try pool.checkCancel(); @@ -1832,6 +1764,34 @@ fn netListenUnix( return socket_fd; } +fn posixBindUnix(pool: *Pool, fd: posix.socket_t, addr: *const posix.sockaddr, addr_len: posix.socklen_t) !void { + while (true) { + try pool.checkCancel(); + switch (posix.errno(posix.system.bind(fd, addr, addr_len))) { + .SUCCESS => break, + .INTR => continue, + .ACCES => return error.AccessDenied, + .ADDRINUSE => return error.AddressInUse, + .AFNOSUPPORT => return error.AddressFamilyUnsupported, + .ADDRNOTAVAIL => return error.AddressUnavailable, + .NOMEM => return error.SystemResources, + + .LOOP => return error.SymLinkLoop, + .NOENT => return error.FileNotFound, + .NOTDIR => return error.NotDir, + .ROFS => return error.ReadOnlyFileSystem, + .PERM => return error.PermissionDenied, + + .BADF => |err| return errnoBug(err), // always a race condition if this error is returned + .INVAL => |err| return errnoBug(err), // invalid parameters + .NOTSOCK => |err| return errnoBug(err), // invalid `sockfd` + .FAULT => |err| return errnoBug(err), // invalid `addr` pointer + .NAMETOOLONG => |err| return errnoBug(err), + else => |err| return posix.unexpectedErrno(err), + } + } +} + fn posixBind(pool: *Pool, socket_fd: posix.socket_t, addr: *const posix.sockaddr, addr_len: posix.socklen_t) !void { while (true) { try pool.checkCancel(); @@ -1857,7 +1817,6 @@ fn posixConnect(pool: *Pool, socket_fd: posix.socket_t, addr: *const posix.socka switch (posix.errno(posix.system.connect(socket_fd, addr, addr_len))) { .SUCCESS => return, .INTR => continue, - .ADDRINUSE => return error.AddressInUse, .ADDRNOTAVAIL => return error.AddressUnavailable, .AFNOSUPPORT => return error.AddressFamilyUnsupported, .AGAIN, .INPROGRESS => return error.WouldBlock, @@ -1866,7 +1825,7 @@ fn posixConnect(pool: *Pool, socket_fd: posix.socket_t, addr: *const posix.socka .CONNREFUSED => return error.ConnectionRefused, .CONNRESET => return error.ConnectionResetByPeer, .FAULT => |err| return errnoBug(err), - .ISCONN => return error.AlreadyConnected, + .ISCONN => |err| return errnoBug(err), .HOSTUNREACH => return error.HostUnreachable, .NETUNREACH => return error.NetworkUnreachable, .NOTSOCK => |err| return errnoBug(err), @@ -1874,7 +1833,6 @@ fn posixConnect(pool: *Pool, socket_fd: posix.socket_t, addr: *const posix.socka .TIMEDOUT => return error.ConnectionTimedOut, .CONNABORTED => |err| return errnoBug(err), .ACCES => return error.AccessDenied, - // UNIX socket error codes: .PERM => |err| return errnoBug(err), .NOENT => |err| return errnoBug(err), else => |err| return posix.unexpectedErrno(err), @@ -1882,6 +1840,35 @@ fn posixConnect(pool: *Pool, socket_fd: posix.socket_t, addr: *const posix.socka } } +fn posixConnectUnix(pool: *Pool, fd: posix.socket_t, addr: *const posix.sockaddr, addr_len: posix.socklen_t) !void { + while (true) { + try pool.checkCancel(); + switch (posix.errno(posix.system.connect(fd, addr, addr_len))) { + .SUCCESS => return, + .INTR => continue, + + .AFNOSUPPORT => return error.AddressFamilyUnsupported, + .AGAIN => return error.WouldBlock, + .INPROGRESS => return error.WouldBlock, + .ACCES => return error.AccessDenied, + + .LOOP => return error.SymLinkLoop, + .NOENT => return error.FileNotFound, + .NOTDIR => return error.NotDir, + .ROFS => return error.ReadOnlyFileSystem, + .PERM => return error.PermissionDenied, + + .BADF => |err| return errnoBug(err), + .CONNABORTED => |err| return errnoBug(err), + .FAULT => |err| return errnoBug(err), + .ISCONN => |err| return errnoBug(err), + .NOTSOCK => |err| return errnoBug(err), + .PROTOTYPE => |err| return errnoBug(err), + else => |err| return posix.unexpectedErrno(err), + } + } +} + fn posixGetSockName(pool: *Pool, socket_fd: posix.fd_t, addr: *posix.sockaddr, addr_len: *posix.socklen_t) !void { while (true) { try pool.checkCancel(); @@ -1926,6 +1913,7 @@ fn netConnectIpPosix( .mode = options.mode, .protocol = options.protocol, }); + errdefer posix.close(socket_fd); var storage: PosixAddress = undefined; var addr_len = addressToPosix(address, &storage); try posixConnect(pool, socket_fd, &storage.any, addr_len); @@ -1940,10 +1928,14 @@ fn netConnectUnix( userdata: ?*anyopaque, address: *const Io.net.UnixAddress, ) Io.net.UnixAddress.ConnectError!Io.net.Socket.Handle { + if (!Io.net.has_unix_sockets) return error.AddressFamilyUnsupported; const pool: *Pool = @ptrCast(@alignCast(userdata)); - _ = pool; - _ = address; - @panic("TODO"); + const socket_fd = try openSocketPosix(pool, posix.AF.UNIX, .{ .mode = .stream }); + errdefer posix.close(socket_fd); + var storage: UnixAddress = undefined; + const addr_len = addressUnixToPosix(address, &storage); + try posixConnectUnix(pool, socket_fd, &storage.any, addr_len); + return socket_fd; } fn netBindIpPosix( @@ -2497,18 +2489,16 @@ fn netInterfaceNameResolve( name: *const Io.net.Interface.Name, ) Io.net.Interface.Name.ResolveError!Io.net.Interface { const pool: *Pool = @ptrCast(@alignCast(userdata)); - try pool.checkCancel(); if (native_os == .linux) { - const rc = posix.system.socket(posix.AF.UNIX, posix.SOCK.DGRAM | posix.SOCK.CLOEXEC, 0); - const sock_fd: posix.fd_t = switch (posix.errno(rc)) { - .SUCCESS => @intCast(rc), - .ACCES => return error.AccessDenied, - .MFILE => return error.SystemResources, - .NFILE => return error.SystemResources, - .NOBUFS => return error.SystemResources, - .NOMEM => return error.SystemResources, - else => |err| return posix.unexpectedErrno(err), + const sock_fd = openSocketPosix(pool, posix.AF.UNIX, .{ .mode = .dgram }) catch |err| switch (err) { + error.ProcessFdQuotaExceeded => return error.SystemResources, + error.SystemFdQuotaExceeded => return error.SystemResources, + error.AddressFamilyUnsupported => return error.Unexpected, + error.ProtocolUnsupportedBySystem => return error.Unexpected, + error.ProtocolUnsupportedByAddressFamily => return error.Unexpected, + error.SocketModeUnsupported => return error.Unexpected, + else => |e| return e, }; defer posix.close(sock_fd); @@ -2521,12 +2511,12 @@ fn netInterfaceNameResolve( try pool.checkCancel(); switch (posix.errno(posix.system.ioctl(sock_fd, posix.SIOCGIFINDEX, @intFromPtr(&ifr)))) { .SUCCESS => return .{ .index = @bitCast(ifr.ifru.ivalue) }, + .INTR => continue, .INVAL => |err| return errnoBug(err), // Bad parameters. .NOTTY => |err| return errnoBug(err), .NXIO => |err| return errnoBug(err), .BADF => |err| return errnoBug(err), // Always a race condition. .FAULT => |err| return errnoBug(err), // Bad pointer parameter. - .INTR => continue, .IO => |err| return errnoBug(err), // sock_fd is not a file descriptor .NODEV => return error.InterfaceNotFound, else => |err| return posix.unexpectedErrno(err), @@ -2535,12 +2525,14 @@ fn netInterfaceNameResolve( } if (native_os == .windows) { + try pool.checkCancel(); const index = std.os.windows.ws2_32.if_nametoindex(&name.bytes); if (index == 0) return error.InterfaceNotFound; return .{ .index = index }; } if (builtin.link_libc) { + try pool.checkCancel(); const index = std.c.if_nametoindex(&name.bytes); if (index == 0) return error.InterfaceNotFound; return .{ .index = @bitCast(index) }; @@ -2591,7 +2583,7 @@ fn addressFromPosix(posix_address: *PosixAddress) Io.net.IpAddress { return switch (posix_address.any.family) { posix.AF.INET => .{ .ip4 = address4FromPosix(&posix_address.in) }, posix.AF.INET6 => .{ .ip6 = address6FromPosix(&posix_address.in6) }, - else => unreachable, + else => .{ .ip4 = .loopback(0) }, }; } diff --git a/lib/std/Io/net.zig b/lib/std/Io/net.zig index c555a1d502..e8aadde38c 100644 --- a/lib/std/Io/net.zig +++ b/lib/std/Io/net.zig @@ -206,6 +206,9 @@ pub const IpAddress = union(enum) { SystemFdQuotaExceeded, /// The requested address family (IPv4 or IPv6) is not supported by the operating system. AddressFamilyUnsupported, + ProtocolUnsupportedBySystem, + ProtocolUnsupportedByAddressFamily, + SocketModeUnsupported, } || Io.UnexpectedError || Io.Cancelable; pub const ListenOptions = struct { @@ -216,6 +219,16 @@ pub const IpAddress = union(enum) { /// Sets SO_REUSEADDR and SO_REUSEPORT on POSIX. /// Sets SO_REUSEADDR on Windows, which is roughly equivalent. reuse_address: bool = false, + /// Only connection-oriented modes may be used here, which includes: + /// * `Socket.Mode.stream` + /// * `Socket.Mode.seqpacket` + mode: Socket.Mode = .stream, + /// Only connection-oriented protocols may be used here, which includes: + /// * `Protocol.tcp` + /// * `Protocol.tp` + /// * `Protocol.dccp` + /// * `Protocol.sctp` + protocol: Protocol = .tcp, }; /// Waits for a TCP connection. When using this API, `bind` does not need @@ -276,7 +289,6 @@ pub const IpAddress = union(enum) { ConnectionPending, ConnectionRefused, ConnectionResetByPeer, - AlreadyConnected, HostUnreachable, NetworkUnreachable, ConnectionTimedOut, @@ -842,7 +854,22 @@ pub const UnixAddress = struct { } }; } - pub const ConnectError = error{} || Io.Cancelable || Io.UnexpectedError; + pub const ConnectError = error{ + SystemResources, + ProcessFdQuotaExceeded, + SystemFdQuotaExceeded, + AddressFamilyUnsupported, + ProtocolUnsupportedBySystem, + ProtocolUnsupportedByAddressFamily, + SocketModeUnsupported, + AccessDenied, + PermissionDenied, + SymLinkLoop, + FileNotFound, + NotDir, + ReadOnlyFileSystem, + WouldBlock, + } || Io.Cancelable || Io.UnexpectedError; pub fn connect(ua: *const UnixAddress, io: Io) ConnectError!Stream { assert(ua.path.len <= max_len);