io_uring: add zero-copy send operation

`send_zc` tries to avoid making intermediate copies of data. Zero-copy execution
is not guaranteed, and the kernel may fall back to copying.

The flags field of the first struct io_uring_cqe will likely contain
IORING_CQE_F_MORE, which means that there will be a second completion event /
notification for the request, with the user_data field set to the same value.
The user must not modify the data buffer until the notification is posted. The
first cqe follows the usual rules, so its res field will contain the number of
bytes sent or a negative error code. The notification's res field will be set
to zero and its flags field will contain IORING_CQE_F_NOTIF. The two-step model
is needed because the kernel may hold on to buffers for a long time, e.g.
waiting for a TCP ACK, and having a separate cqe for request completions allows
userspace to push more data without extra delays. Note that notifications only
control the lifetime of the buffers and say nothing about whether the data has
actually been sent out or received by the other end. Even errored requests may
generate a notification, so the user must check for IORING_CQE_F_MORE rather
than relying on the result.

Available since kernel 6.0 (IORING_OP_SEND_ZC); IORING_OP_SENDMSG_ZC requires
kernel 6.1.
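
As a rough illustration of the lifecycle described above, here is a minimal
sketch of driving one zero-copy send through the wrappers added in this commit.
The function name, the user_data value, and the error handling are hypothetical,
not part of this commit:

```zig
const std = @import("std");
const linux = std.os.linux;

/// Hedged sketch: send `buffer` with zero-copy and block until the kernel
/// has released the buffer.
fn sendZcAndWait(ring: *linux.IO_Uring, fd: std.os.fd_t, buffer: []const u8) !usize {
    _ = try ring.send_zc(0x1, fd, buffer, 0, 0); // user_data 0x1 is arbitrary
    _ = try ring.submit();

    // First cqe: the usual completion; res is bytes sent or a negative errno.
    const first = try ring.copy_cqe();

    // Check F_MORE rather than the result: even a failed request may still
    // post a notification, and it must be reaped before reusing the buffer.
    if (first.flags & linux.IORING_CQE_F_MORE != 0) {
        const notif = try ring.copy_cqe();
        std.debug.assert(notif.flags & linux.IORING_CQE_F_NOTIF != 0);
    }
    // Only now may `buffer` be modified or freed.
    if (first.res < 0) return error.SendFailed;
    return @as(usize, @intCast(first.res));
}
```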

References:
https://man7.org/linux/man-pages/man3/io_uring_prep_send_zc.3.html
https://man7.org/linux/man-pages/man2/io_uring_enter.2.html
Igor Anić 2023-11-10 16:44:18 +01:00
parent 6b9f7e26c9
commit 087ee497d4


@@ -606,6 +606,39 @@ pub const IO_Uring = struct {
        return sqe;
    }

    /// Queues (but does not submit) an SQE to perform an async zerocopy `send(2)`.
    /// Returns a pointer to the SQE.
    pub fn send_zc(
        self: *IO_Uring,
        user_data: u64,
        fd: os.fd_t,
        buffer: []const u8,
        send_flags: u32,
        zc_flags: u16,
    ) !*linux.io_uring_sqe {
        const sqe = try self.get_sqe();
        io_uring_prep_send_zc(sqe, fd, buffer, send_flags, zc_flags);
        sqe.user_data = user_data;
        return sqe;
    }

    /// Queues (but does not submit) an SQE to perform an async zerocopy `send(2)`
    /// using a registered (fixed) buffer selected by `buf_index`.
    /// Returns a pointer to the SQE.
    pub fn send_zc_fixed(
        self: *IO_Uring,
        user_data: u64,
        fd: os.fd_t,
        buffer: []const u8,
        send_flags: u32,
        zc_flags: u16,
        buf_index: u16,
    ) !*linux.io_uring_sqe {
        const sqe = try self.get_sqe();
        io_uring_prep_send_zc_fixed(sqe, fd, buffer, send_flags, zc_flags, buf_index);
        sqe.user_data = user_data;
        return sqe;
    }

    /// Queues (but does not submit) an SQE to perform a `recvmsg(2)`.
    /// Returns a pointer to the SQE.
    pub fn recvmsg(
@@ -636,6 +669,21 @@ pub const IO_Uring = struct {
        return sqe;
    }

    /// Queues (but does not submit) an SQE to perform an async zerocopy `sendmsg(2)`.
    /// Returns a pointer to the SQE.
    pub fn sendmsg_zc(
        self: *IO_Uring,
        user_data: u64,
        fd: os.fd_t,
        msg: *const os.msghdr_const,
        flags: u32,
    ) !*linux.io_uring_sqe {
        const sqe = try self.get_sqe();
        io_uring_prep_sendmsg_zc(sqe, fd, msg, flags);
        sqe.user_data = user_data;
        return sqe;
    }

    /// Queues (but does not submit) an SQE to perform an `openat(2)`.
    /// Returns a pointer to the SQE.
    pub fn openat(
@@ -1373,6 +1421,28 @@ pub fn io_uring_prep_send(sqe: *linux.io_uring_sqe, fd: os.fd_t, buffer: []const
    sqe.rw_flags = flags;
}

pub fn io_uring_prep_send_zc(sqe: *linux.io_uring_sqe, fd: os.fd_t, buffer: []const u8, flags: u32, zc_flags: u16) void {
    io_uring_prep_rw(.SEND_ZC, sqe, fd, @intFromPtr(buffer.ptr), buffer.len, 0);
    sqe.rw_flags = flags;
    // SEND_ZC repurposes the ioprio field to carry zero-copy flags.
    sqe.ioprio = zc_flags;
}

pub fn io_uring_prep_send_zc_fixed(sqe: *linux.io_uring_sqe, fd: os.fd_t, buffer: []const u8, flags: u32, zc_flags: u16, buf_index: u16) void {
    io_uring_prep_send_zc(sqe, fd, buffer, flags, zc_flags);
    // Mark the source as a registered buffer and select it by index.
    sqe.ioprio |= linux.IORING_RECVSEND_FIXED_BUF;
    sqe.buf_index = buf_index;
}

pub fn io_uring_prep_sendmsg_zc(
    sqe: *linux.io_uring_sqe,
    fd: os.fd_t,
    msg: *const os.msghdr_const,
    flags: u32,
) void {
    // Identical to a plain sendmsg prep, but with the zero-copy opcode.
    io_uring_prep_sendmsg(sqe, fd, msg, flags);
    sqe.opcode = .SENDMSG_ZC;
}

pub fn io_uring_prep_recvmsg(
    sqe: *linux.io_uring_sqe,
    fd: os.fd_t,
@@ -3491,3 +3561,59 @@ test "accept multishot" {
        os.closeSocket(client);
    }
}
test "accept/connect/send_zc/recv" {
if (builtin.os.tag != .linux) return error.SkipZigTest;
var ring = IO_Uring.init(16, 0) catch |err| switch (err) {
error.SystemOutdated => return error.SkipZigTest,
error.PermissionDenied => return error.SkipZigTest,
else => return err,
};
defer ring.deinit();
const socket_test_harness = try createSocketTestHarness(&ring);
defer socket_test_harness.close();
const buffer_send = [_]u8{ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0xa, 0xb, 0xc, 0xd, 0xe };
var buffer_recv = [_]u8{0} ** 10;
// zero-copy send
const send = try ring.send_zc(0xeeeeeeee, socket_test_harness.client, buffer_send[0..], 0, 0);
send.flags |= linux.IOSQE_IO_LINK;
_ = try ring.recv(0xffffffff, socket_test_harness.server, .{ .buffer = buffer_recv[0..] }, 0);
try testing.expectEqual(@as(u32, 2), try ring.submit());
// First completion of zero-copy send.
// IORING_CQE_F_MORE, means that there
// will be a second completion event / notification for the
// request, with the user_data field set to the same value.
// buffer_send must be keep alive until second cqe.
var cqe_send = try ring.copy_cqe();
if (cqe_send.err() == .INVAL) return error.SkipZigTest;
try testing.expectEqual(linux.io_uring_cqe{
.user_data = 0xeeeeeeee,
.res = buffer_send.len,
.flags = linux.IORING_CQE_F_MORE,
}, cqe_send);
const cqe_recv = try ring.copy_cqe();
if (cqe_recv.err() == .INVAL) return error.SkipZigTest;
try testing.expectEqual(linux.io_uring_cqe{
.user_data = 0xffffffff,
.res = buffer_recv.len,
.flags = cqe_recv.flags & linux.IORING_CQE_F_SOCK_NONEMPTY,
}, cqe_recv);
try testing.expectEqualSlices(u8, buffer_send[0..buffer_recv.len], buffer_recv[0..]);
// Second completion of zero-copy send.
// IORING_CQE_F_NOTIF in flags signals that kernel is done with send_buffer
cqe_send = try ring.copy_cqe();
if (cqe_send.err() == .INVAL) return error.SkipZigTest;
try testing.expectEqual(linux.io_uring_cqe{
.user_data = 0xeeeeeeee,
.res = 0,
.flags = linux.IORING_CQE_F_NOTIF,
}, cqe_send);
}
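
The test above exercises `send_zc` only. For completeness, here is a hedged
sketch of how `send_zc_fixed` might pair with the existing `register_buffers`
helper; it assumes the same file scope as the test (`ring`, `fd`, `os`), and
the buffer size and indices are illustrative assumptions, not part of this
commit:

```zig
// Hedged sketch: zero-copy send from a registered (fixed) buffer, which
// lets the kernel skip per-request page pinning.
var buf: [4096]u8 = undefined;
var iovecs = [_]os.iovec{.{ .iov_base = &buf, .iov_len = buf.len }};
try ring.register_buffers(&iovecs);

// buf_index = 0 selects iovecs[0]; the wrapper sets
// IORING_RECVSEND_FIXED_BUF in sqe.ioprio internally.
_ = try ring.send_zc_fixed(0x2, fd, buf[0..], 0, 0, 0);
_ = try ring.submit();
// Reap the completion and the IORING_CQE_F_NOTIF cqe exactly as in the
// send_zc test above.
```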