std.Io.Threaded.netReceive: recvmsg first, then poll

Calling recvmsg first means no poll syscall needed when messages are
already in the operating system queue. Empirically, this happens when
repeating a DNS query that has been already been made recently. In such
case, poll() is never called!
This commit is contained in:
Andrew Kelley 2025-10-02 16:59:22 -07:00
parent a6347a68a9
commit 62d0dd0d36

View File

@ -1431,8 +1431,8 @@ fn netReceive(
// the split vectors though because reducing the buffer size might make
// some messages unreceivable.
// So the strategy instead is to use poll with timeout and then non-blocking
// recvmsg calls.
// So the strategy instead is to use non-blocking recvmsg calls, calling
// poll() with timeout if the first one returns EAGAIN.
const posix_flags: u32 =
@as(u32, if (flags.oob) posix.MSG.OOB else 0) |
@as(u32, if (flags.peek) posix.MSG.PEEK else 0) |
@ -1449,93 +1449,92 @@ fn netReceive(
var message_i: usize = 0;
var data_i: usize = 0;
// TODO: recvmsg first, then poll if EAGAIN. saves syscall in case the messages are already queued.
const deadline = timeout.toDeadline(pool.io()) catch |err| return .{ err, message_i };
poll: while (true) {
recv: while (true) {
pool.checkCancel() catch |err| return .{ err, message_i };
if (message_i > 0 or message_buffer.len - message_i == 0) return .{ null, message_i };
if (message_buffer.len - message_i == 0) return .{ null, message_i };
const message = &message_buffer[message_i];
const remaining_data_buffer = data_buffer[data_i..];
var storage: PosixAddress = undefined;
var iov: posix.iovec = .{ .base = remaining_data_buffer.ptr, .len = remaining_data_buffer.len };
var msg: posix.msghdr = .{
.name = &storage.any,
.namelen = @sizeOf(PosixAddress),
.iov = (&iov)[0..1],
.iovlen = 1,
.control = message.control.ptr,
.controllen = message.control.len,
.flags = undefined,
};
const max_poll_ms = std.math.maxInt(u31);
const timeout_ms: u31 = if (deadline) |d| t: {
const duration = d.durationFromNow(pool.io()) catch |err| return .{ err, message_i };
if (duration.nanoseconds <= 0) return .{ error.Timeout, message_i };
break :t @intCast(@min(max_poll_ms, duration.toMilliseconds()));
} else max_poll_ms;
const poll_rc = posix.system.poll(&poll_fds, poll_fds.len, timeout_ms);
switch (posix.errno(poll_rc)) {
const recv_rc = posix.system.recvmsg(handle, &msg, posix_flags);
switch (posix.errno(recv_rc)) {
.SUCCESS => {
if (poll_rc == 0) {
// Possibly spurious timeout.
if (deadline == null) continue;
return .{ error.Timeout, message_i };
}
const data = remaining_data_buffer[0..@intCast(recv_rc)];
data_i += data.len;
message.* = .{
.from = addressFromPosix(&storage),
.data = data,
.control = if (msg.control) |ptr| @as([*]u8, @ptrCast(ptr))[0..msg.controllen] else message.control,
.flags = .{
.eor = (msg.flags & posix.MSG.EOR) != 0,
.trunc = (msg.flags & posix.MSG.TRUNC) != 0,
.ctrunc = (msg.flags & posix.MSG.CTRUNC) != 0,
.oob = (msg.flags & posix.MSG.OOB) != 0,
.errqueue = (msg.flags & posix.MSG.ERRQUEUE) != 0,
},
};
message_i += 1;
continue;
},
.AGAIN => while (true) {
pool.checkCancel() catch |err| return .{ err, message_i };
if (message_i != 0) return .{ null, message_i };
// Proceed to recvmsg.
while (true) {
pool.checkCancel() catch |err| return .{ err, message_i };
const max_poll_ms = std.math.maxInt(u31);
const timeout_ms: u31 = if (deadline) |d| t: {
const duration = d.durationFromNow(pool.io()) catch |err| return .{ err, message_i };
if (duration.nanoseconds <= 0) return .{ error.Timeout, message_i };
break :t @intCast(@min(max_poll_ms, duration.toMilliseconds()));
} else max_poll_ms;
const message = &message_buffer[message_i];
const remaining_data_buffer = data_buffer[data_i..];
var storage: PosixAddress = undefined;
var iov: posix.iovec = .{ .base = remaining_data_buffer.ptr, .len = remaining_data_buffer.len };
var msg: posix.msghdr = .{
.name = &storage.any,
.namelen = @sizeOf(PosixAddress),
.iov = (&iov)[0..1],
.iovlen = 1,
.control = message.control.ptr,
.controllen = message.control.len,
.flags = undefined,
};
const poll_rc = posix.system.poll(&poll_fds, poll_fds.len, timeout_ms);
switch (posix.errno(poll_rc)) {
.SUCCESS => {
if (poll_rc == 0) {
// Although spurious timeouts are OK, when no deadline
// is passed we must not return `error.Timeout`.
if (deadline == null) continue;
return .{ error.Timeout, message_i };
}
continue :recv;
},
.INTR => continue,
const rc = posix.system.recvmsg(handle, &msg, posix_flags);
switch (posix.errno(rc)) {
.SUCCESS => {
const data = remaining_data_buffer[0..@intCast(rc)];
data_i += data.len;
message.* = .{
.from = addressFromPosix(&storage),
.data = data,
.control = if (msg.control) |ptr| @as([*]u8, @ptrCast(ptr))[0..msg.controllen] else message.control,
.flags = .{
.eor = (msg.flags & posix.MSG.EOR) != 0,
.trunc = (msg.flags & posix.MSG.TRUNC) != 0,
.ctrunc = (msg.flags & posix.MSG.CTRUNC) != 0,
.oob = (msg.flags & posix.MSG.OOB) != 0,
.errqueue = (msg.flags & posix.MSG.ERRQUEUE) != 0,
},
};
message_i += 1;
continue;
},
.AGAIN => continue :poll,
.BADF => |err| return .{ errnoBug(err), message_i },
.NFILE => return .{ error.SystemFdQuotaExceeded, message_i },
.MFILE => return .{ error.ProcessFdQuotaExceeded, message_i },
.INTR => continue,
.FAULT => |err| return .{ errnoBug(err), message_i },
.INVAL => |err| return .{ errnoBug(err), message_i },
.NOBUFS => return .{ error.SystemResources, message_i },
.NOMEM => return .{ error.SystemResources, message_i },
.NOTCONN => return .{ error.SocketUnconnected, message_i },
.NOTSOCK => |err| return .{ errnoBug(err), message_i },
.MSGSIZE => return .{ error.MessageOversize, message_i },
.PIPE => return .{ error.SocketUnconnected, message_i },
.OPNOTSUPP => |err| return .{ errnoBug(err), message_i },
.CONNRESET => return .{ error.ConnectionResetByPeer, message_i },
.NETDOWN => return .{ error.NetworkDown, message_i },
else => |err| return .{ posix.unexpectedErrno(err), message_i },
}
.FAULT => |err| return .{ errnoBug(err), message_i },
.INVAL => |err| return .{ errnoBug(err), message_i },
.NOMEM => return .{ error.SystemResources, message_i },
else => |err| return .{ posix.unexpectedErrno(err), message_i },
}
},
.INTR => continue,
.BADF => |err| return .{ errnoBug(err), message_i },
.NFILE => return .{ error.SystemFdQuotaExceeded, message_i },
.MFILE => return .{ error.ProcessFdQuotaExceeded, message_i },
.FAULT => |err| return .{ errnoBug(err), message_i },
.INVAL => |err| return .{ errnoBug(err), message_i },
.NOBUFS => return .{ error.SystemResources, message_i },
.NOMEM => return .{ error.SystemResources, message_i },
.NOTCONN => return .{ error.SocketUnconnected, message_i },
.NOTSOCK => |err| return .{ errnoBug(err), message_i },
.MSGSIZE => return .{ error.MessageOversize, message_i },
.PIPE => return .{ error.SocketUnconnected, message_i },
.OPNOTSUPP => |err| return .{ errnoBug(err), message_i },
.CONNRESET => return .{ error.ConnectionResetByPeer, message_i },
.NETDOWN => return .{ error.NetworkDown, message_i },
else => |err| return .{ posix.unexpectedErrno(err), message_i },
}
}