diff --git a/lib/std/Io/Kqueue.zig b/lib/std/Io/Kqueue.zig index edfa76cd5e..eef959155a 100644 --- a/lib/std/Io/Kqueue.zig +++ b/lib/std/Io/Kqueue.zig @@ -9,6 +9,9 @@ const net = std.Io.net; const assert = std.debug.assert; const Allocator = std.mem.Allocator; const Alignment = std.mem.Alignment; +const posix = std.posix; +const IpAddress = std.Io.net.IpAddress; +const errnoBug = std.Io.Threaded.errnoBug; /// Must be a thread-safe allocator. gpa: Allocator, @@ -97,14 +100,10 @@ fn async( context_alignment: std.mem.Alignment, start: *const fn (context: *const anyopaque, result: *anyopaque) void, ) ?*Io.AnyFuture { - const k: *Kqueue = @ptrCast(@alignCast(userdata)); - _ = k; - _ = result; - _ = result_alignment; - _ = context; - _ = context_alignment; - _ = start; - @panic("TODO"); + return concurrent(userdata, result.len, result_alignment, context, context_alignment, start) catch { + start(context.ptr, result.ptr); + return null; + }; } fn concurrent( @@ -156,7 +155,7 @@ fn cancel( fn cancelRequested(userdata: ?*anyopaque) bool { const k: *Kqueue = @ptrCast(@alignCast(userdata)); _ = k; - @panic("TODO"); + return false; // TODO } fn groupAsync( @@ -419,12 +418,23 @@ fn netAccept(userdata: ?*anyopaque, server: net.Socket.Handle) net.Server.Accept _ = server; @panic("TODO"); } -fn netBindIp(userdata: ?*anyopaque, address: *const net.IpAddress, options: net.IpAddress.BindOptions) net.IpAddress.BindError!net.Socket { +fn netBindIp( + userdata: ?*anyopaque, + address: *const net.IpAddress, + options: net.IpAddress.BindOptions, +) net.IpAddress.BindError!net.Socket { const k: *Kqueue = @ptrCast(@alignCast(userdata)); - _ = k; - _ = address; - _ = options; - @panic("TODO"); + const family = Io.Threaded.posixAddressFamily(address); + const socket_fd = try openSocketPosix(k, family, options); + errdefer posix.close(socket_fd); + var storage: Io.Threaded.PosixAddress = undefined; + var addr_len = Io.Threaded.addressToPosix(address, &storage); + try posixBind(k, socket_fd, &storage.any, addr_len); + try posixGetSockName(k, socket_fd, &storage.any, &addr_len); + return .{ + .handle = socket_fd, + .address = Io.Threaded.addressFromPosix(&storage), + }; } fn netConnectIp(userdata: ?*anyopaque, address: *const net.IpAddress, options: net.IpAddress.ConnectOptions) net.IpAddress.ConnectError!net.Stream { const k: *Kqueue = @ptrCast(@alignCast(userdata)); @@ -453,6 +463,7 @@ fn netConnectUnix( _ = unix_address; @panic("TODO"); } + fn netSend( userdata: ?*anyopaque, handle: net.Socket.Handle, @@ -460,12 +471,22 @@ fn netSend( flags: net.SendFlags, ) struct { ?net.Socket.SendError, usize } { const k: *Kqueue = @ptrCast(@alignCast(userdata)); + + const posix_flags: u32 = + @as(u32, if (@hasDecl(posix.MSG, "CONFIRM") and flags.confirm) posix.MSG.CONFIRM else 0) | + @as(u32, if (@hasDecl(posix.MSG, "DONTROUTE") and flags.dont_route) posix.MSG.DONTROUTE else 0) | + @as(u32, if (@hasDecl(posix.MSG, "EOR") and flags.eor) posix.MSG.EOR else 0) | + @as(u32, if (@hasDecl(posix.MSG, "OOB") and flags.oob) posix.MSG.OOB else 0) | + @as(u32, if (@hasDecl(posix.MSG, "FASTOPEN") and flags.fastopen) posix.MSG.FASTOPEN else 0) | + posix.MSG.NOSIGNAL; + _ = k; + _ = posix_flags; _ = handle; _ = outgoing_messages; - _ = flags; @panic("TODO"); } + fn netReceive( userdata: ?*anyopaque, handle: net.Socket.Handle, @@ -533,3 +554,153 @@ fn netLookup( _ = options; @panic("TODO"); } + +fn openSocketPosix( + k: *Kqueue, + family: posix.sa_family_t, + options: IpAddress.BindOptions, +) error{ + AddressFamilyUnsupported, + ProtocolUnsupportedBySystem, + ProcessFdQuotaExceeded, + SystemFdQuotaExceeded, + SystemResources, + ProtocolUnsupportedByAddressFamily, + SocketModeUnsupported, + OptionUnsupported, + Unexpected, + Canceled, +}!posix.socket_t { + const mode = Io.Threaded.posixSocketMode(options.mode); + const protocol = Io.Threaded.posixProtocol(options.protocol); + const socket_fd = while (true) { + try k.checkCancel(); + const flags: u32 = mode | if (Io.Threaded.socket_flags_unsupported) 0 else posix.SOCK.CLOEXEC; + const socket_rc = posix.system.socket(family, flags, protocol); + switch (posix.errno(socket_rc)) { + .SUCCESS => { + const fd: posix.fd_t = @intCast(socket_rc); + errdefer posix.close(fd); + if (Io.Threaded.socket_flags_unsupported) { + while (true) { + try k.checkCancel(); + switch (posix.errno(posix.system.fcntl(fd, posix.F.SETFD, @as(usize, posix.FD_CLOEXEC)))) { + .SUCCESS => break, + .INTR => continue, + .CANCELED => return error.Canceled, + else => |err| return posix.unexpectedErrno(err), + } + } + + var fl_flags: usize = while (true) { + try k.checkCancel(); + const rc = posix.system.fcntl(fd, posix.F.GETFL, @as(usize, 0)); + switch (posix.errno(rc)) { + .SUCCESS => break @intCast(rc), + .INTR => continue, + .CANCELED => return error.Canceled, + else => |err| return posix.unexpectedErrno(err), + } + }; + fl_flags &= ~@as(usize, 1 << @bitOffsetOf(posix.O, "NONBLOCK")); + while (true) { + try k.checkCancel(); + switch (posix.errno(posix.system.fcntl(fd, posix.F.SETFL, fl_flags))) { + .SUCCESS => break, + .INTR => continue, + .CANCELED => return error.Canceled, + else => |err| return posix.unexpectedErrno(err), + } + } + } + break fd; + }, + .INTR => continue, + .CANCELED => return error.Canceled, + + .AFNOSUPPORT => return error.AddressFamilyUnsupported, + .INVAL => return error.ProtocolUnsupportedBySystem, + .MFILE => return error.ProcessFdQuotaExceeded, + .NFILE => return error.SystemFdQuotaExceeded, + .NOBUFS => return error.SystemResources, + .NOMEM => return error.SystemResources, + .PROTONOSUPPORT => return error.ProtocolUnsupportedByAddressFamily, + .PROTOTYPE => return error.SocketModeUnsupported, + else => |err| return posix.unexpectedErrno(err), + } + }; + errdefer posix.close(socket_fd); + + if (options.ip6_only) { + if (posix.IPV6 == void) return error.OptionUnsupported; + try setSocketOption(k, socket_fd, posix.IPPROTO.IPV6, posix.IPV6.V6ONLY, 0); + } + + return socket_fd; +} + +fn posixBind( + k: *Kqueue, + socket_fd: posix.socket_t, + addr: *const posix.sockaddr, + addr_len: posix.socklen_t, +) !void { + while (true) { + try k.checkCancel(); + switch (posix.errno(posix.system.bind(socket_fd, addr, addr_len))) { + .SUCCESS => break, + .INTR => continue, + .CANCELED => return error.Canceled, + + .ADDRINUSE => return error.AddressInUse, + .BADF => |err| return errnoBug(err), // File descriptor used after closed. + .INVAL => |err| return errnoBug(err), // invalid parameters + .NOTSOCK => |err| return errnoBug(err), // invalid `sockfd` + .AFNOSUPPORT => return error.AddressFamilyUnsupported, + .ADDRNOTAVAIL => return error.AddressUnavailable, + .FAULT => |err| return errnoBug(err), // invalid `addr` pointer + .NOMEM => return error.SystemResources, + else => |err| return posix.unexpectedErrno(err), + } + } +} + +fn posixGetSockName(k: *Kqueue, socket_fd: posix.fd_t, addr: *posix.sockaddr, addr_len: *posix.socklen_t) !void { + while (true) { + try k.checkCancel(); + switch (posix.errno(posix.system.getsockname(socket_fd, addr, addr_len))) { + .SUCCESS => break, + .INTR => continue, + .CANCELED => return error.Canceled, + + .BADF => |err| return errnoBug(err), // File descriptor used after closed. + .FAULT => |err| return errnoBug(err), + .INVAL => |err| return errnoBug(err), // invalid parameters + .NOTSOCK => |err| return errnoBug(err), // always a race condition + .NOBUFS => return error.SystemResources, + else => |err| return posix.unexpectedErrno(err), + } + } +} + +fn setSocketOption(k: *Kqueue, fd: posix.fd_t, level: i32, opt_name: u32, option: u32) !void { + const o: []const u8 = @ptrCast(&option); + while (true) { + try k.checkCancel(); + switch (posix.errno(posix.system.setsockopt(fd, level, opt_name, o.ptr, @intCast(o.len)))) { + .SUCCESS => return, + .INTR => continue, + .CANCELED => return error.Canceled, + + .BADF => |err| return errnoBug(err), // File descriptor used after closed. + .NOTSOCK => |err| return errnoBug(err), + .INVAL => |err| return errnoBug(err), + .FAULT => |err| return errnoBug(err), + else => |err| return posix.unexpectedErrno(err), + } + } +} + +fn checkCancel(k: *Kqueue) error{Canceled}!void { + if (cancelRequested(k)) return error.Canceled; +} diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig index 4a216c5601..c1365e0f44 100644 --- a/lib/std/Io/Threaded.zig +++ b/lib/std/Io/Threaded.zig @@ -285,7 +285,7 @@ pub fn io(t: *Threaded) Io { }; } -const socket_flags_unsupported = builtin.os.tag.isDarwin() or native_os == .haiku; // 💩💩 +pub const socket_flags_unsupported = builtin.os.tag.isDarwin() or native_os == .haiku; // 💩💩 const have_accept4 = !socket_flags_unsupported; const have_flock_open_flags = @hasField(posix.O, "EXLOCK"); const have_networking = builtin.os.tag != .wasi; @@ -4283,7 +4283,7 @@ fn netLookupFallible( return error.OptionUnsupported; } -const PosixAddress = extern union { +pub const PosixAddress = extern union { any: posix.sockaddr, in: posix.sockaddr.in, in6: posix.sockaddr.in6, @@ -4301,14 +4301,14 @@ const WsaAddress = extern union { un: ws2_32.sockaddr.un, }; -fn posixAddressFamily(a: *const IpAddress) posix.sa_family_t { +pub fn posixAddressFamily(a: *const IpAddress) posix.sa_family_t { return switch (a.*) { .ip4 => posix.AF.INET, .ip6 => posix.AF.INET6, }; } -fn addressFromPosix(posix_address: *const PosixAddress) IpAddress { +pub fn addressFromPosix(posix_address: *const PosixAddress) IpAddress { return switch (posix_address.any.family) { posix.AF.INET => .{ .ip4 = address4FromPosix(&posix_address.in) }, posix.AF.INET6 => .{ .ip6 = address6FromPosix(&posix_address.in6) }, @@ -4324,7 +4324,7 @@ fn addressFromWsa(wsa_address: *const WsaAddress) IpAddress { }; } -fn addressToPosix(a: *const IpAddress, storage: *PosixAddress) posix.socklen_t { +pub fn addressToPosix(a: *const IpAddress, storage: *PosixAddress) posix.socklen_t { return switch (a.*) { .ip4 => |ip4| { storage.in = address4ToPosix(ip4); @@ -4405,7 +4405,7 @@ fn address6ToPosix(a: *const net.Ip6Address) posix.sockaddr.in6 { }; } -fn errnoBug(err: posix.E) Io.UnexpectedError { +pub fn errnoBug(err: posix.E) Io.UnexpectedError { switch (builtin.mode) { .Debug => std.debug.panic("programmer bug caused syscall error: {t}", .{err}), else => return error.Unexpected, @@ -4419,7 +4419,7 @@ fn wsaErrorBug(err: ws2_32.WinsockError) Io.UnexpectedError { } } -fn posixSocketMode(mode: net.Socket.Mode) u32 { +pub fn posixSocketMode(mode: net.Socket.Mode) u32 { return switch (mode) { .stream => posix.SOCK.STREAM, .dgram => posix.SOCK.DGRAM, @@ -4429,7 +4429,7 @@ fn posixSocketMode(mode: net.Socket.Mode) u32 { }; } -fn posixProtocol(protocol: ?net.Protocol) u32 { +pub fn posixProtocol(protocol: ?net.Protocol) u32 { return @intFromEnum(protocol orelse return 0); }