From e8cea8accb7f289fd00ee82063df32882748ac48 Mon Sep 17 00:00:00 2001 From: Andrew Kelley Date: Mon, 13 Oct 2025 20:17:51 -0700 Subject: [PATCH] std.Io.Threaded: implement netListenUnix --- BRANCH_TODO | 1 + lib/std/Io.zig | 6 +-- lib/std/Io/Threaded.zig | 98 ++++++++++++++++++++++++++++++++++++++--- lib/std/Io/net.zig | 40 ++++++++++++++--- lib/std/Io/net/test.zig | 2 +- 5 files changed, 130 insertions(+), 17 deletions(-) diff --git a/BRANCH_TODO b/BRANCH_TODO index 21a23fd74d..afb955968d 100644 --- a/BRANCH_TODO +++ b/BRANCH_TODO @@ -10,5 +10,6 @@ * address the cancelation race condition (signal received between checkCancel and syscall) * update signal values to be an enum * move fs.File.Writer to Io +* add non-blocking flag to network operations, handle EAGAIN * finish moving std.fs to Io * finish moving all of std.posix into Threaded diff --git a/lib/std/Io.zig b/lib/std/Io.zig index 8d6be76173..749e2b3ae8 100644 --- a/lib/std/Io.zig +++ b/lib/std/Io.zig @@ -672,12 +672,12 @@ pub const VTable = struct { now: *const fn (?*anyopaque, Clock) Clock.Error!Timestamp, sleep: *const fn (?*anyopaque, Timeout) SleepError!void, - netListenIp: *const fn (?*anyopaque, address: net.IpAddress, options: net.IpAddress.ListenOptions) net.IpAddress.ListenError!net.Server, + netListenIp: *const fn (?*anyopaque, address: net.IpAddress, net.IpAddress.ListenOptions) net.IpAddress.ListenError!net.Server, netAccept: *const fn (?*anyopaque, server: net.Socket.Handle) net.Server.AcceptError!net.Stream, netBindIp: *const fn (?*anyopaque, address: *const net.IpAddress, options: net.IpAddress.BindOptions) net.IpAddress.BindError!net.Socket, netConnectIp: *const fn (?*anyopaque, address: *const net.IpAddress, options: net.IpAddress.ConnectOptions) net.IpAddress.ConnectError!net.Stream, - netListenUnix: *const fn (?*anyopaque, net.UnixAddress) net.UnixAddress.ListenError!net.Socket.Handle, - netConnectUnix: *const fn (?*anyopaque, net.UnixAddress) net.UnixAddress.ConnectError!net.Socket.Handle, + netListenUnix: *const fn (?*anyopaque, *const net.UnixAddress, net.UnixAddress.ListenOptions) net.UnixAddress.ListenError!net.Socket.Handle, + netConnectUnix: *const fn (?*anyopaque, *const net.UnixAddress) net.UnixAddress.ConnectError!net.Socket.Handle, netSend: *const fn (?*anyopaque, net.Socket.Handle, []net.OutgoingMessage, net.SendFlags) struct { ?net.Socket.SendError, usize }, netReceive: *const fn (?*anyopaque, net.Socket.Handle, message_buffer: []net.IncomingMessage, data_buffer: []u8, net.ReceiveFlags, Timeout) struct { ?net.Socket.ReceiveTimeoutError, usize }, /// Returns 0 on end of stream. diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig index a200632764..6051c99555 100644 --- a/lib/std/Io/Threaded.zig +++ b/lib/std/Io/Threaded.zig @@ -1756,11 +1756,80 @@ fn netListenIpPosix( }; } -fn netListenUnix(userdata: ?*anyopaque, address: Io.net.UnixAddress) Io.net.UnixAddress.ListenError!Io.net.Socket.Handle { +fn netListenUnix( + userdata: ?*anyopaque, + address: *const Io.net.UnixAddress, + options: Io.net.UnixAddress.ListenOptions, +) Io.net.UnixAddress.ListenError!Io.net.Socket.Handle { + if (!Io.net.has_unix_sockets) return error.AddressFamilyUnsupported; const pool: *Pool = @ptrCast(@alignCast(userdata)); - _ = pool; - _ = address; - @panic("TODO"); + const protocol: u32 = 0; + const socket_fd = while (true) { + try pool.checkCancel(); + const flags: u32 = posix.SOCK.STREAM | if (socket_flags_unsupported) 0 else posix.SOCK.CLOEXEC; + const socket_rc = posix.system.socket(posix.AF.UNIX, flags, protocol); + switch (posix.errno(socket_rc)) { + .SUCCESS => { + const fd: posix.fd_t = @intCast(socket_rc); + errdefer posix.close(fd); + if (socket_flags_unsupported) while (true) { + try pool.checkCancel(); + switch (posix.errno(posix.system.fcntl(fd, posix.F.SETFD, @as(usize, posix.FD_CLOEXEC)))) { + .SUCCESS => break, + .INTR => continue, + else => |err| return posix.unexpectedErrno(err), + } + }; + break fd; + }, + .INTR => continue, + .AFNOSUPPORT => return error.AddressFamilyUnsupported, + .MFILE => return error.ProcessFdQuotaExceeded, + .NFILE => return error.SystemFdQuotaExceeded, + .NOBUFS => return error.SystemResources, + .NOMEM => return error.SystemResources, + else => |err| return posix.unexpectedErrno(err), + } + }; + errdefer posix.close(socket_fd); + + var storage: UnixAddress = undefined; + const addr_len = addressUnixToPosix(address, &storage); + while (true) { + try pool.checkCancel(); + switch (posix.errno(posix.system.bind(socket_fd, &storage.any, addr_len))) { + .SUCCESS => break, + .INTR => continue, + .ACCES => return error.AccessDenied, + .PERM => return error.PermissionDenied, + .ADDRINUSE => return error.AddressInUse, + .AFNOSUPPORT => return error.AddressFamilyUnsupported, + .ADDRNOTAVAIL => return error.AddressUnavailable, + .NOMEM => return error.SystemResources, + .LOOP => return error.SymLinkLoop, + .NOENT => return error.FileNotFound, + .NOTDIR => return error.NotDir, + .ROFS => return error.ReadOnlyFileSystem, + .BADF => |err| return errnoBug(err), // always a race condition if this error is returned + .INVAL => |err| return errnoBug(err), // invalid parameters + .NOTSOCK => |err| return errnoBug(err), // invalid `sockfd` + .FAULT => |err| return errnoBug(err), // invalid `addr` pointer + .NAMETOOLONG => |err| return errnoBug(err), + else => |err| return posix.unexpectedErrno(err), + } + } + + while (true) { + try pool.checkCancel(); + switch (posix.errno(posix.system.listen(socket_fd, options.kernel_backlog))) { + .SUCCESS => break, + .ADDRINUSE => return error.AddressInUse, + .BADF => |err| return errnoBug(err), + else => |err| return posix.unexpectedErrno(err), + } + } + + return socket_fd; } fn posixBind(pool: *Pool, socket_fd: posix.socket_t, addr: *const posix.sockaddr, addr_len: posix.socklen_t) !void { @@ -1791,7 +1860,7 @@ fn posixConnect(pool: *Pool, socket_fd: posix.socket_t, addr: *const posix.socka .ADDRINUSE => return error.AddressInUse, .ADDRNOTAVAIL => return error.AddressUnavailable, .AFNOSUPPORT => return error.AddressFamilyUnsupported, - .AGAIN, .INPROGRESS => |err| return errnoBug(err), + .AGAIN, .INPROGRESS => return error.WouldBlock, .ALREADY => return error.ConnectionPending, .BADF => |err| return errnoBug(err), .CONNREFUSED => return error.ConnectionRefused, @@ -1804,8 +1873,8 @@ fn posixConnect(pool: *Pool, socket_fd: posix.socket_t, addr: *const posix.socka .PROTOTYPE => |err| return errnoBug(err), .TIMEDOUT => return error.ConnectionTimedOut, .CONNABORTED => |err| return errnoBug(err), + .ACCES => return error.AccessDenied, // UNIX socket error codes: - .ACCES => |err| return errnoBug(err), .PERM => |err| return errnoBug(err), .NOENT => |err| return errnoBug(err), else => |err| return posix.unexpectedErrno(err), @@ -1867,7 +1936,10 @@ fn netConnectIpPosix( } }; } -fn netConnectUnix(userdata: ?*anyopaque, address: Io.net.UnixAddress) Io.net.UnixAddress.ConnectError!Io.net.Socket.Handle { +fn netConnectUnix( + userdata: ?*anyopaque, + address: *const Io.net.UnixAddress, +) Io.net.UnixAddress.ConnectError!Io.net.Socket.Handle { const pool: *Pool = @ptrCast(@alignCast(userdata)); _ = pool; _ = address; @@ -2503,6 +2575,11 @@ const PosixAddress = extern union { in6: posix.sockaddr.in6, }; +const UnixAddress = extern union { + any: posix.sockaddr, + un: posix.sockaddr.un, +}; + fn posixAddressFamily(a: *const Io.net.IpAddress) posix.sa_family_t { return switch (a.*) { .ip4 => posix.AF.INET, @@ -2531,6 +2608,13 @@ fn addressToPosix(a: *const Io.net.IpAddress, storage: *PosixAddress) posix.sock }; } +fn addressUnixToPosix(a: *const Io.net.UnixAddress, storage: *UnixAddress) posix.socklen_t { + @memcpy(storage.un.path[0..a.path.len], a.path); + storage.un.family = posix.AF.UNIX; + storage.un.path[a.path.len] = 0; + return @sizeOf(posix.sockaddr.un); +} + fn address4FromPosix(in: *posix.sockaddr.in) Io.net.Ip4Address { return .{ .port = std.mem.bigToNative(u16, in.port), diff --git a/lib/std/Io/net.zig b/lib/std/Io/net.zig index 4c0c9405a7..c555a1d502 100644 --- a/lib/std/Io/net.zig +++ b/lib/std/Io/net.zig @@ -51,6 +51,8 @@ pub const has_unix_sockets = switch (native_os) { else => true, }; +pub const default_kernel_backlog = 128; + pub const IpAddress = union(enum) { ip4: Ip4Address, ip6: Ip6Address, @@ -210,7 +212,7 @@ pub const IpAddress = union(enum) { /// How many connections the kernel will accept on the application's behalf. /// If more than this many connections pool in the kernel, clients will start /// seeing "Connection refused". - kernel_backlog: u31 = 128, + kernel_backlog: u31 = default_kernel_backlog, /// Sets SO_REUSEADDR and SO_REUSEPORT on POSIX. /// Sets SO_REUSEADDR on Windows, which is roughly equivalent. reuse_address: bool = false, @@ -288,6 +290,11 @@ pub const IpAddress = union(enum) { ProtocolUnsupportedBySystem, ProtocolUnsupportedByAddressFamily, SocketModeUnsupported, + /// The user tried to connect to a broadcast address without having the socket broadcast flag enabled or + /// the connection request failed because of a local firewall rule. + AccessDenied, + /// Non-blocking was requested and the operation cannot return immediately. + WouldBlock, } || Io.Timeout.Error || Io.UnexpectedError || Io.Cancelable; pub const ConnectOptions = struct { @@ -804,19 +811,40 @@ pub const UnixAddress = struct { return .{ .path = p }; } - pub const ListenError = error{}; + pub const ListenError = error{ + AddressFamilyUnsupported, + AddressInUse, + NetworkDown, + SystemResources, + SymLinkLoop, + FileNotFound, + NotDir, + ReadOnlyFileSystem, + ProcessFdQuotaExceeded, + SystemFdQuotaExceeded, + AccessDenied, + PermissionDenied, + AddressUnavailable, + } || Io.Cancelable || Io.UnexpectedError; - pub fn listen(ua: UnixAddress, io: Io) ListenError!Server { + pub const ListenOptions = struct { + /// How many connections the kernel will accept on the application's behalf. + /// If more than this many connections pool in the kernel, clients will start + /// seeing "Connection refused". + kernel_backlog: u31 = default_kernel_backlog, + }; + + pub fn listen(ua: *const UnixAddress, io: Io, options: ListenOptions) ListenError!Server { assert(ua.path.len <= max_len); return .{ .socket = .{ - .handle = try io.vtable.netListenUnix(io.userdata, ua), + .handle = try io.vtable.netListenUnix(io.userdata, ua, options), .address = .{ .ip4 = .loopback(0) }, } }; } - pub const ConnectError = error{}; + pub const ConnectError = error{} || Io.Cancelable || Io.UnexpectedError; - pub fn connect(ua: UnixAddress, io: Io) ConnectError!Stream { + pub fn connect(ua: *const UnixAddress, io: Io) ConnectError!Stream { assert(ua.path.len <= max_len); return .{ .socket = .{ .handle = try io.vtable.netConnectUnix(io.userdata, ua), diff --git a/lib/std/Io/net/test.zig b/lib/std/Io/net/test.zig index 9bd4618429..edac076a6a 100644 --- a/lib/std/Io/net/test.zig +++ b/lib/std/Io/net/test.zig @@ -273,7 +273,7 @@ test "listen on a unix socket, send bytes, receive bytes" { const socket_addr = try net.UnixAddress.init(socket_path); defer std.fs.cwd().deleteFile(socket_path) catch {}; - var server = try socket_addr.listen(io); + var server = try socket_addr.listen(io, .{}); defer server.socket.close(io); const S = struct {