std.Io.Threaded: fix netWrite cancellation

Move std.posix logic over rather than calling into it.
This commit is contained in:
Andrew Kelley 2025-10-16 23:40:32 -07:00
parent d4215ffaa0
commit cf6fa219fd
2 changed files with 87 additions and 32 deletions

View File

@ -250,6 +250,19 @@ pub fn io(t: *Threaded) Io {
};
}
const socket_flags_unsupported = builtin.os.tag.isDarwin() or native_os == .haiku; // 💩💩
const have_accept4 = !socket_flags_unsupported;
const have_flock_open_flags = @hasField(posix.O, "EXLOCK");
const have_networking = builtin.os.tag != .wasi;
const have_flock = @TypeOf(posix.system.flock) != void;
const have_sendmmsg = builtin.os.tag == .linux;
const openat_sym = if (posix.lfs64_abi) posix.system.openat64 else posix.system.openat;
const fstat_sym = if (posix.lfs64_abi) posix.system.fstat64 else posix.system.fstat;
const fstatat_sym = if (posix.lfs64_abi) posix.system.fstatat64 else posix.system.fstatat;
const lseek_sym = if (posix.lfs64_abi) posix.system.lseek64 else posix.system.lseek;
const preadv_sym = if (posix.lfs64_abi) posix.system.preadv64 else posix.system.preadv;
/// Trailing data:
/// 1. context
/// 2. result
@ -1143,13 +1156,6 @@ fn fileStatWasi(userdata: ?*anyopaque, file: Io.File) Io.File.StatError!Io.File.
}
}
const have_flock = @TypeOf(posix.system.flock) != void;
const openat_sym = if (posix.lfs64_abi) posix.system.openat64 else posix.system.openat;
const fstat_sym = if (posix.lfs64_abi) posix.system.fstat64 else posix.system.fstat;
const fstatat_sym = if (posix.lfs64_abi) posix.system.fstatat64 else posix.system.fstatat;
const lseek_sym = if (posix.lfs64_abi) posix.system.lseek64 else posix.system.lseek;
const preadv_sym = if (posix.lfs64_abi) posix.system.preadv64 else posix.system.preadv;
fn dirAccessPosix(
userdata: ?*anyopaque,
dir: Io.Dir,
@ -1277,8 +1283,7 @@ fn dirCreateFilePosix(
// Use the O locking flags if the os supports them to acquire the lock
// atomically. Note that the NONBLOCK flag is removed after the openat()
// call is successful.
const has_flock_open_flags = @hasField(posix.O, "EXLOCK");
if (has_flock_open_flags) switch (flags.lock) {
if (have_flock_open_flags) switch (flags.lock) {
.none => {},
.shared => {
os_flags.SHLOCK = true;
@ -1328,7 +1333,7 @@ fn dirCreateFilePosix(
};
errdefer posix.close(fd);
if (have_flock and !has_flock_open_flags and flags.lock != .none) {
if (have_flock and !have_flock_open_flags and flags.lock != .none) {
const lock_nonblocking: i32 = if (flags.lock_nonblocking) posix.LOCK.NB else 0;
const lock_flags = switch (flags.lock) {
.none => unreachable,
@ -1352,7 +1357,7 @@ fn dirCreateFilePosix(
}
}
if (has_flock_open_flags and flags.lock_nonblocking) {
if (have_flock_open_flags and flags.lock_nonblocking) {
var fl_flags: usize = while (true) {
try t.checkCancel();
const rc = posix.system.fcntl(fd, posix.F.GETFL, @as(usize, 0));
@ -1476,8 +1481,7 @@ fn dirOpenFile(
// Use the O locking flags if the os supports them to acquire the lock
// atomically.
const has_flock_open_flags = @hasField(posix.O, "EXLOCK");
if (has_flock_open_flags) {
if (have_flock_open_flags) {
// Note that the NONBLOCK flag is removed after the openat() call
// is successful.
switch (flags.lock) {
@ -1530,7 +1534,7 @@ fn dirOpenFile(
};
errdefer posix.close(fd);
if (have_flock and !has_flock_open_flags and flags.lock != .none) {
if (have_flock and !have_flock_open_flags and flags.lock != .none) {
const lock_nonblocking: i32 = if (flags.lock_nonblocking) posix.LOCK.NB else 0;
const lock_flags = switch (flags.lock) {
.none => unreachable,
@ -1554,7 +1558,7 @@ fn dirOpenFile(
}
}
if (has_flock_open_flags and flags.lock_nonblocking) {
if (have_flock_open_flags and flags.lock_nonblocking) {
var fl_flags: usize = while (true) {
try t.checkCancel();
const rc = posix.system.fcntl(fd, posix.F.GETFL, @as(usize, 0));
@ -1954,7 +1958,7 @@ fn nowWasi(userdata: ?*anyopaque, clock: Io.Clock) Io.Clock.Error!Io.Timestamp {
var ns: std.os.wasi.timestamp_t = undefined;
const err = std.os.wasi.clock_time_get(clockToWasi(clock), 1, &ns);
if (err != .SUCCESS) return error.Unexpected;
return ns;
return .fromNanoseconds(ns);
}
fn sleepLinux(userdata: ?*anyopaque, timeout: Io.Timeout) Io.SleepError!void {
@ -2004,7 +2008,7 @@ fn sleepWasi(userdata: ?*anyopaque, timeout: Io.Timeout) Io.SleepError!void {
const clock: w.subscription_clock_t = if (try timeout.toDurationFromNow(t.io())) |d| .{
.id = clockToWasi(d.clock),
.timeout = std.math.lossyCast(u64, d.duration.nanoseconds),
.timeout = std.math.lossyCast(u64, d.raw.nanoseconds),
.precision = 0,
.flags = 0,
} else .{
@ -2083,6 +2087,7 @@ fn netListenIpPosix(
address: IpAddress,
options: IpAddress.ListenOptions,
) IpAddress.ListenError!net.Server {
if (!have_networking) return error.NetworkDown;
const t: *Threaded = @ptrCast(@alignCast(userdata));
const family = posixAddressFamily(&address);
const socket_fd = try openSocketPosix(t, family, .{
@ -2230,6 +2235,7 @@ fn posixConnect(t: *Threaded, socket_fd: posix.socket_t, addr: *const posix.sock
.ACCES => return error.AccessDenied,
.PERM => |err| return errnoBug(err),
.NOENT => |err| return errnoBug(err),
.NETDOWN => return error.NetworkDown,
else => |err| return posix.unexpectedErrno(err),
}
}
@ -2306,6 +2312,7 @@ fn netConnectIpPosix(
address: *const IpAddress,
options: IpAddress.ConnectOptions,
) IpAddress.ConnectError!net.Stream {
if (!have_networking) return error.NetworkDown;
if (options.timeout != .none) @panic("TODO");
const t: *Threaded = @ptrCast(@alignCast(userdata));
const family = posixAddressFamily(address);
@ -2346,6 +2353,7 @@ fn netBindIpPosix(
address: *const IpAddress,
options: IpAddress.BindOptions,
) IpAddress.BindError!net.Socket {
if (!have_networking) return error.NetworkDown;
const t: *Threaded = @ptrCast(@alignCast(userdata));
const family = posixAddressFamily(address);
const socket_fd = try openSocketPosix(t, family, options);
@ -2421,9 +2429,6 @@ fn openSocketPosix(
return socket_fd;
}
const socket_flags_unsupported = builtin.os.tag.isDarwin() or native_os == .haiku; // 💩💩
const have_accept4 = !socket_flags_unsupported;
fn netAcceptPosix(userdata: ?*anyopaque, listen_fd: net.Socket.Handle) net.Server.AcceptError!net.Stream {
const t: *Threaded = @ptrCast(@alignCast(userdata));
var storage: PosixAddress = undefined;
@ -2534,14 +2539,13 @@ fn netReadPosix(userdata: ?*anyopaque, fd: net.Socket.Handle, data: [][]u8) net.
}
}
const have_sendmmsg = builtin.os.tag == .linux;
fn netSend(
userdata: ?*anyopaque,
handle: net.Socket.Handle,
messages: []net.OutgoingMessage,
flags: net.SendFlags,
) struct { ?net.Socket.SendError, usize } {
if (!have_networking) return .{ error.NetworkDown, 0 };
const t: *Threaded = @ptrCast(@alignCast(userdata));
const posix_flags: u32 =
@ -2703,7 +2707,7 @@ fn netSendMany(
.OPNOTSUPP => |err| return errnoBug(err), // Some bit in the flags argument is inappropriate for the socket type.
.PIPE => return error.SocketUnconnected,
.AFNOSUPPORT => return error.AddressFamilyUnsupported,
.HOSTUNREACH => return error.NetworkUnreachable,
.HOSTUNREACH => return error.HostUnreachable,
.NETUNREACH => return error.NetworkUnreachable,
.NOTCONN => return error.SocketUnconnected,
.NETDOWN => return error.NetworkDown,
@ -2720,6 +2724,7 @@ fn netReceive(
flags: net.ReceiveFlags,
timeout: Io.Timeout,
) struct { ?net.Socket.ReceiveTimeoutError, usize } {
if (!have_networking) return .{ error.NetworkDown, 0 };
const t: *Threaded = @ptrCast(@alignCast(userdata));
// recvmmsg is useless, here's why:
@ -2847,8 +2852,8 @@ fn netWritePosix(
data: []const []const u8,
splat: usize,
) net.Stream.Writer.Error!usize {
if (!have_networking) return error.NetworkDown;
const t: *Threaded = @ptrCast(@alignCast(userdata));
try t.checkCancel();
var iovecs: [max_iovecs_len]posix.iovec_const = undefined;
var msg: posix.msghdr_const = .{
@ -2889,7 +2894,37 @@ fn netWritePosix(
},
};
const flags = posix.MSG.NOSIGNAL;
return posix.sendmsg(fd, &msg, flags);
while (true) {
try t.checkCancel();
const rc = posix.system.sendmsg(fd, &msg, flags);
switch (posix.errno(rc)) {
.SUCCESS => return @intCast(rc),
.INTR => continue,
.CANCELED => return error.Canceled,
.ACCES => |err| return errnoBug(err),
.AGAIN => |err| return errnoBug(err),
.ALREADY => return error.FastOpenAlreadyInProgress,
.BADF => |err| return errnoBug(err), // always a race condition
.CONNRESET => return error.ConnectionResetByPeer,
.DESTADDRREQ => |err| return errnoBug(err), // The socket is not connection-mode, and no peer address is set.
.FAULT => |err| return errnoBug(err), // An invalid user space address was specified for an argument.
.INVAL => |err| return errnoBug(err), // Invalid argument passed.
.ISCONN => |err| return errnoBug(err), // connection-mode socket was connected already but a recipient was specified
.MSGSIZE => |err| return errnoBug(err),
.NOBUFS => return error.SystemResources,
.NOMEM => return error.SystemResources,
.NOTSOCK => |err| return errnoBug(err), // The file descriptor sockfd does not refer to a socket.
.OPNOTSUPP => |err| return errnoBug(err), // Some bit in the flags argument is inappropriate for the socket type.
.PIPE => return error.SocketUnconnected,
.AFNOSUPPORT => return error.AddressFamilyUnsupported,
.HOSTUNREACH => return error.HostUnreachable,
.NETUNREACH => return error.NetworkUnreachable,
.NOTCONN => return error.SocketUnconnected,
.NETDOWN => return error.NetworkDown,
else => |err| return posix.unexpectedErrno(err),
}
}
}
fn addBuf(v: []posix.iovec_const, i: *@FieldType(posix.msghdr_const, "iovlen"), bytes: []const u8) void {
@ -2913,6 +2948,7 @@ fn netInterfaceNameResolve(
userdata: ?*anyopaque,
name: *const net.Interface.Name,
) net.Interface.Name.ResolveError!net.Interface {
if (!have_networking) return error.InterfaceNotFound;
const t: *Threaded = @ptrCast(@alignCast(userdata));
if (native_os == .linux) {

View File

@ -309,6 +309,7 @@ pub const IpAddress = union(enum) {
AccessDenied,
/// Non-blocking was requested and the operation cannot return immediately.
WouldBlock,
NetworkDown,
} || Io.Timeout.Error || Io.UnexpectedError || Io.Cancelable;
pub const ConnectOptions = struct {
@ -1062,7 +1063,7 @@ pub const Socket = struct {
AddressFamilyUnsupported,
/// Another TCP Fast Open is already in progress.
FastOpenAlreadyInProgress,
/// Network connection was unexpectedly closed by recipient.
/// Network session was unexpectedly closed by recipient.
ConnectionResetByPeer,
/// Local end has been shut down on a connection-oriented socket, or
/// the socket was never connected.
@ -1242,15 +1243,33 @@ pub const Stream = struct {
stream: Stream,
err: ?Error = null,
pub const Error = std.posix.SendMsgError || error{
pub const Error = error{
/// Another TCP Fast Open is already in progress.
FastOpenAlreadyInProgress,
/// Network session was unexpectedly closed by recipient.
ConnectionResetByPeer,
SocketNotBound,
MessageOversize,
NetworkDown,
/// The output queue for a network interface was full. This generally indicates that the
/// interface has stopped sending, but may be caused by transient congestion. (Normally,
/// this does not occur in Linux. Packets are just silently dropped when a device queue
/// overflows.)
///
/// This is also caused when there is not enough kernel memory available.
SystemResources,
/// No route to network.
NetworkUnreachable,
/// Network reached but no route to host.
HostUnreachable,
/// The local network interface used to reach the destination is down.
NetworkDown,
/// The destination address is not listening.
ConnectionRefused,
/// The passed address didn't have the correct address family in its sa_family field.
AddressFamilyUnsupported,
/// Local end has been shut down on a connection-oriented socket, or
/// the socket was never connected.
SocketUnconnected,
Unexpected,
} || Io.Cancelable;
SocketNotBound,
} || Io.UnexpectedError || Io.Cancelable;
pub fn init(stream: Stream, io: Io, buffer: []u8) Writer {
return .{