From 96cf75977bd46ccf4d0626183bc1d9e3e5d80eac Mon Sep 17 00:00:00 2001 From: Andrew Kelley Date: Wed, 1 Oct 2025 16:07:50 -0700 Subject: [PATCH] std.Io: implement netSend --- lib/std/Io.zig | 2 +- lib/std/Io/Threaded.zig | 111 +++++++++++++++++++++++++++++++----- lib/std/Io/net.zig | 34 +++++++---- lib/std/Io/net/HostName.zig | 10 ++-- lib/std/os/linux.zig | 7 +-- 5 files changed, 127 insertions(+), 37 deletions(-) diff --git a/lib/std/Io.zig b/lib/std/Io.zig index d0a5c5df8b..ddfb8c2e01 100644 --- a/lib/std/Io.zig +++ b/lib/std/Io.zig @@ -671,7 +671,7 @@ pub const VTable = struct { listen: *const fn (?*anyopaque, address: net.IpAddress, options: net.IpAddress.ListenOptions) net.IpAddress.ListenError!net.Server, accept: *const fn (?*anyopaque, server: *net.Server) net.Server.AcceptError!net.Stream, ipBind: *const fn (?*anyopaque, address: net.IpAddress, options: net.IpAddress.BindOptions) net.IpAddress.BindError!net.Socket, - netSend: *const fn (?*anyopaque, net.Socket.Handle, []const net.OutgoingMessage, net.SendFlags) net.Socket.SendError!void, + netSend: *const fn (?*anyopaque, net.Socket.Handle, []net.OutgoingMessage, net.SendFlags) net.Socket.SendError!void, netReceive: *const fn (?*anyopaque, handle: net.Socket.Handle, buffer: []u8, timeout: Timeout) net.Socket.ReceiveTimeoutError!net.ReceivedMessage, netRead: *const fn (?*anyopaque, src: net.Stream, data: [][]u8) net.Stream.Reader.Error!usize, netWrite: *const fn (?*anyopaque, dest: net.Stream, header: []const u8, data: []const []const u8, splat: usize) net.Stream.Writer.Error!usize, diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig index 3e0c1380df..3b72f1ede1 100644 --- a/lib/std/Io/Threaded.zig +++ b/lib/std/Io/Threaded.zig @@ -1108,8 +1108,8 @@ fn listenPosix( } var storage: PosixAddress = undefined; - var socklen = addressToPosix(address, &storage); - try posixBind(pool, socket_fd, &storage.any, socklen); + var addr_len = addressToPosix(&address, &storage); + try posixBind(pool, socket_fd, &storage.any, addr_len); while (true) { try pool.checkCancel(); @@ -1121,7 +1121,7 @@ fn listenPosix( } } - try posixGetSockName(pool, socket_fd, &storage.any, &socklen); + try posixGetSockName(pool, socket_fd, &storage.any, &addr_len); return .{ .socket = .{ .handle = socket_fd, @@ -1226,9 +1226,9 @@ fn ipBindPosix( } var storage: PosixAddress = undefined; - var socklen = addressToPosix(address, &storage); - try posixBind(pool, socket_fd, &storage.any, socklen); - try posixGetSockName(pool, socket_fd, &storage.any, &socklen); + var addr_len = addressToPosix(&address, &storage); + try posixBind(pool, socket_fd, &storage.any, addr_len); + try posixGetSockName(pool, socket_fd, &storage.any, &addr_len); return .{ .handle = socket_fd, .address = addressFromPosix(&storage), @@ -1306,21 +1306,102 @@ fn netReadPosix(userdata: ?*anyopaque, stream: Io.net.Stream, data: [][]u8) Io.n return n; } +const have_sendmmsg = builtin.os.tag == .linux; + fn netSend( userdata: ?*anyopaque, handle: Io.net.Socket.Handle, - messages: []const Io.net.OutgoingMessage, + messages: []Io.net.OutgoingMessage, flags: Io.net.SendFlags, ) Io.net.Socket.SendError!void { const pool: *Pool = @ptrCast(@alignCast(userdata)); - try pool.checkCancel(); - _ = handle; - _ = messages; - _ = flags; + if (have_sendmmsg) { + var i: usize = 0; + while (messages.len - i != 0) { + i += try netSendMany(pool, handle, messages[i..], flags); + } + return; + } + + try pool.checkCancel(); @panic("TODO"); } +fn netSendMany( + pool: *Pool, + handle: Io.net.Socket.Handle, + messages: []Io.net.OutgoingMessage, + flags: Io.net.SendFlags, +) Io.net.Socket.SendError!usize { + var msg_buffer: [64]std.os.linux.mmsghdr = undefined; + var addr_buffer: [msg_buffer.len]PosixAddress = undefined; + var iovecs_buffer: [msg_buffer.len]posix.iovec = undefined; + const min_len: usize = @min(messages.len, msg_buffer.len); + const clamped_messages = messages[0..min_len]; + const clamped_msgs = (&msg_buffer)[0..min_len]; + const clamped_addrs = (&addr_buffer)[0..min_len]; + const clamped_iovecs = (&iovecs_buffer)[0..min_len]; + + for (clamped_messages, clamped_msgs, clamped_addrs, clamped_iovecs) |*message, *msg, *addr, *iovec| { + iovec.* = .{ .base = @constCast(message.data_ptr), .len = message.data_len }; + msg.* = .{ + .hdr = .{ + .name = &addr.any, + .namelen = addressToPosix(message.address, addr), + .iov = iovec[0..1], + .iovlen = 1, + .control = @constCast(message.control.ptr), + .controllen = message.control.len, + .flags = 0, + }, + .len = undefined, // Populated by calling sendmmsg below. + }; + } + + const posix_flags: u32 = + @as(u32, if (flags.confirm) posix.MSG.CONFIRM else 0) | + @as(u32, if (flags.dont_route) posix.MSG.DONTROUTE else 0) | + @as(u32, if (flags.eor) posix.MSG.EOR else 0) | + @as(u32, if (flags.oob) posix.MSG.OOB else 0) | + @as(u32, if (flags.fastopen) posix.MSG.FASTOPEN else 0) | + posix.MSG.NOSIGNAL; + + while (true) { + try pool.checkCancel(); + const rc = posix.system.sendmmsg(handle, clamped_msgs.ptr, @intCast(clamped_msgs.len), posix_flags); + switch (posix.errno(rc)) { + .SUCCESS => { + for (clamped_messages[0..rc], clamped_msgs[0..rc]) |*message, *msg| { + message.data_len = msg.len; + } + return rc; + }, + .AGAIN => |err| return errnoBug(err), + .ALREADY => return error.FastOpenAlreadyInProgress, + .BADF => |err| return errnoBug(err), // Always a race condition. + .CONNRESET => return error.ConnectionResetByPeer, + .DESTADDRREQ => |err| return errnoBug(err), // The socket is not connection-mode, and no peer address is set. + .FAULT => |err| return errnoBug(err), // An invalid user space address was specified for an argument. + .INTR => continue, + .INVAL => |err| return errnoBug(err), // Invalid argument passed. + .ISCONN => |err| return errnoBug(err), // connection-mode socket was connected already but a recipient was specified + .MSGSIZE => return error.MessageOversize, + .NOBUFS => return error.SystemResources, + .NOMEM => return error.SystemResources, + .NOTSOCK => |err| return errnoBug(err), // The file descriptor sockfd does not refer to a socket. + .OPNOTSUPP => |err| return errnoBug(err), // Some bit in the flags argument is inappropriate for the socket type. + .PIPE => return error.SocketNotConnected, + .AFNOSUPPORT => return error.AddressFamilyUnsupported, + .HOSTUNREACH => return error.NetworkUnreachable, + .NETUNREACH => return error.NetworkUnreachable, + .NOTCONN => return error.SocketNotConnected, + .NETDOWN => return error.NetworkDown, + else => |err| return posix.unexpectedErrno(err), + } + } +} + fn netReceive( userdata: ?*anyopaque, handle: Io.net.Socket.Handle, @@ -1503,13 +1584,13 @@ fn addressFromPosix(posix_address: *PosixAddress) Io.net.IpAddress { }; } -fn addressToPosix(a: Io.net.IpAddress, storage: *PosixAddress) posix.socklen_t { - return switch (a) { +fn addressToPosix(a: *const Io.net.IpAddress, storage: *PosixAddress) posix.socklen_t { + return switch (a.*) { .ip4 => |ip4| { storage.in = address4ToPosix(ip4); return @sizeOf(posix.sockaddr.in); }, - .ip6 => |ip6| { + .ip6 => |*ip6| { storage.in6 = address6ToPosix(ip6); return @sizeOf(posix.sockaddr.in6); }, @@ -1539,7 +1620,7 @@ fn address4ToPosix(a: Io.net.Ip4Address) posix.sockaddr.in { }; } -fn address6ToPosix(a: Io.net.Ip6Address) posix.sockaddr.in6 { +fn address6ToPosix(a: *const Io.net.Ip6Address) posix.sockaddr.in6 { return .{ .port = std.mem.nativeToBig(u16, a.port), .flowinfo = a.flow, diff --git a/lib/std/Io/net.zig b/lib/std/Io/net.zig index 02cec48b0c..f83dc7e97c 100644 --- a/lib/std/Io/net.zig +++ b/lib/std/Io/net.zig @@ -154,7 +154,7 @@ pub const IpAddress = union(enum) { /// A nonexistent interface was requested or the requested address was not local. AddressUnavailable, /// The local network interface used to reach the destination is offline. - NetworkSubsystemDown, + NetworkDown, /// Insufficient memory or other resource internal to the operating system. SystemResources, /// Per-process limit on the number of open file descriptors has been reached. @@ -192,7 +192,7 @@ pub const IpAddress = union(enum) { /// Insufficient memory or other resource internal to the operating system. SystemResources, /// The local network interface used to reach the destination is offline. - NetworkSubsystemDown, + NetworkDown, ProtocolUnsupportedBySystem, ProtocolUnsupportedByAddressFamily, /// Per-process limit on the number of open file descriptors has been reached. @@ -702,7 +702,10 @@ pub const ReceivedMessage = struct { pub const OutgoingMessage = struct { address: *const IpAddress, - data: []const u8, + data_ptr: [*]const u8, + /// Initialized with how many bytes of `data_ptr` to send. After sending + /// succeeds, replaced with how many bytes were actually sent. + data_len: usize, control: []const u8 = &.{}, }; @@ -808,9 +811,10 @@ pub const Socket = struct { } pub const SendError = error{ - /// The socket type requires that message be sent atomically, and the size of the message - /// to be sent made this impossible. The message is not transmitted. - MessageTooBig, + /// The socket type requires that message be sent atomically, and the + /// size of the message to be sent made this impossible. The message + /// was not transmitted, or was partially transmitted. + MessageOversize, /// The output queue for a network interface was full. This generally indicates that the /// interface has stopped sending, but may be caused by transient congestion. (Normally, /// this does not occur in Linux. Packets are just silently dropped when a device queue @@ -823,21 +827,29 @@ pub const Socket = struct { /// Network reached but no route to host. HostUnreachable, /// The local network interface used to reach the destination is offline. - NetworkSubsystemDown, + NetworkDown, /// The destination address is not listening. Can still occur for /// connectionless messages. ConnectionRefused, /// Operating system or protocol does not support the address family. AddressFamilyUnsupported, + /// Another TCP Fast Open is already in progress. + FastOpenAlreadyInProgress, + /// Network connection was unexpectedly closed by recipient. + ConnectionResetByPeer, + /// Local end has been shut down on a connection-oriented socket, or + /// the socket was never connected. + SocketNotConnected, } || Io.UnexpectedError || Io.Cancelable; - /// Transfers `data` to `dest`, connectionless. + /// Transfers `data` to `dest`, connectionless, in one packet. pub fn send(s: *const Socket, io: Io, dest: *const IpAddress, data: []const u8) SendError!void { - const message: OutgoingMessage = .{ .address = dest, .data = data }; - return io.vtable.netSend(io.userdata, s.handle, &.{message}, .{}); + var message: OutgoingMessage = .{ .address = dest, .data_ptr = data.ptr, .data_len = data.len }; + try io.vtable.netSend(io.userdata, s.handle, &message, .{}); + if (message.data_len != data.len) return error.MessageOversize; } - pub fn sendMany(s: *const Socket, io: Io, messages: []const OutgoingMessage, flags: SendFlags) SendError!void { + pub fn sendMany(s: *const Socket, io: Io, messages: []OutgoingMessage, flags: SendFlags) SendError!void { return io.vtable.netSend(io.userdata, s.handle, messages, flags); } diff --git a/lib/std/Io/net/HostName.zig b/lib/std/Io/net/HostName.zig index e77987aa1a..1a595a0b67 100644 --- a/lib/std/Io/net/HostName.zig +++ b/lib/std/Io/net/HostName.zig @@ -278,7 +278,8 @@ fn lookupDns(io: Io, lookup_canon_name: []const u8, rc: *const ResolvConf, optio for (mapped_nameservers) |*ns| { message_buffer[message_i] = .{ .address = ns, - .data = query, + .data_ptr = query.ptr, + .data_len = query.len, }; message_i += 1; } @@ -324,11 +325,12 @@ fn lookupDns(io: Io, lookup_canon_name: []const u8, rc: *const ResolvConf, optio if (next_answer_buffer == answers.len) break :send; }, 2 => { - const message: Io.net.OutgoingMessage = .{ + var message: Io.net.OutgoingMessage = .{ .address = ns, - .data = query, + .data_ptr = query.ptr, + .data_len = query.len, }; - io.vtable.netSend(io.userdata, socket.handle, &.{message}, .{}) catch {}; + io.vtable.netSend(io.userdata, socket.handle, (&message)[0..1], .{}) catch {}; continue; }, else => continue, diff --git a/lib/std/os/linux.zig b/lib/std/os/linux.zig index 2b792453a1..ed4b9f6ca2 100644 --- a/lib/std/os/linux.zig +++ b/lib/std/os/linux.zig @@ -2011,7 +2011,7 @@ pub fn sendmsg(fd: i32, msg: *const msghdr_const, flags: u32) usize { } } -pub fn sendmmsg(fd: i32, msgvec: [*]mmsghdr_const, vlen: u32, flags: u32) usize { +pub fn sendmmsg(fd: i32, msgvec: [*]mmsghdr, vlen: u32, flags: u32) usize { return syscall4(.sendmmsg, @as(usize, @bitCast(@as(isize, fd))), @intFromPtr(msgvec), vlen, flags); } @@ -5884,11 +5884,6 @@ pub const mmsghdr = extern struct { len: u32, }; -pub const mmsghdr_const = extern struct { - hdr: msghdr_const, - len: u32, -}; - pub const epoll_data = extern union { ptr: usize, fd: i32,