From 2da8eff9d6b7f9d784a596836188cbede6cfb1d0 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Igor=20Anic=CC=81?=
Date: Thu, 27 Feb 2025 23:55:43 +0100
Subject: [PATCH 1/6] io_uring: add bind and listen

---
 lib/std/os/linux.zig              |  10 ++
 lib/std/os/linux/IoUring.zig      | 186 ++++++++++++++++++++++++++++++
 lib/std/os/linux/io_uring_sqe.zig |  47 ++++++++
 3 files changed, 243 insertions(+)

diff --git a/lib/std/os/linux.zig b/lib/std/os/linux.zig
index d224a45a8f..53424dc547 100644
--- a/lib/std/os/linux.zig
+++ b/lib/std/os/linux.zig
@@ -5806,6 +5806,9 @@ pub const IORING_OP = enum(u8) {
     FUTEX_WAITV,
     FIXED_FD_INSTALL,
     FTRUNCATE,
+    BIND,
+    LISTEN,
+    RECV_ZC,
     _,
 };
 
@@ -6190,6 +6193,13 @@ pub const IORING_RESTRICTION = enum(u16) {
     _,
 };
 
+pub const IO_URING_SOCKET_OP = enum(u16) {
+    SIOCINQ = 0,
+    SIOCOUTQ = 1,
+    GETSOCKOPT = 2,
+    SETSOCKOPT = 3,
+};
+
 pub const io_uring_buf = extern struct {
     addr: u64,
     len: u32,
diff --git a/lib/std/os/linux/IoUring.zig b/lib/std/os/linux/IoUring.zig
index f26b2ce8bf..7ce2acf49b 100644
--- a/lib/std/os/linux/IoUring.zig
+++ b/lib/std/os/linux/IoUring.zig
@@ -1356,6 +1356,55 @@ pub fn socket_direct_alloc(
     return sqe;
 }
 
+/// Queues (but does not submit) an SQE to perform a `bind(2)` on a socket.
+/// Returns a pointer to the SQE.
+/// Available since 6.11.
+pub fn bind(
+    self: *IoUring,
+    user_data: u64,
+    fd: posix.fd_t,
+    addr: *const posix.sockaddr,
+    addrlen: posix.socklen_t,
+    flags: u32,
+) !*linux.io_uring_sqe {
+    const sqe = try self.get_sqe();
+    sqe.prep_bind(fd, addr, addrlen, flags);
+    sqe.user_data = user_data;
+    return sqe;
+}
+
+/// Queues (but does not submit) an SQE to perform a `listen(2)` on a socket.
+/// Returns a pointer to the SQE.
+/// Available since 6.11.
+pub fn listen(
+    self: *IoUring,
+    user_data: u64,
+    fd: posix.fd_t,
+    backlog: usize,
+    flags: u32,
+) !*linux.io_uring_sqe {
+    const sqe = try self.get_sqe();
+    sqe.prep_listen(fd, backlog, flags);
+    sqe.user_data = user_data;
+    return sqe;
+}
+
+fn cmd_sock(
+    self: *IoUring,
+    user_data: u64,
+    cmd_op: linux.IO_URING_SOCKET_OP,
+    fd: linux.fd_t,
+    level: u32,
+    optname: u32,
+    optval: u64,
+    optlen: u32,
+) !*linux.io_uring_sqe {
+    const sqe = try self.get_sqe();
+    sqe.prep_cmd_sock(cmd_op, fd, level, optname, optval, optlen);
+    sqe.user_data = user_data;
+    return sqe;
+}
+
 pub const SubmissionQueue = struct {
     head: *u32,
     tail: *u32,
@@ -4305,3 +4354,140 @@ test "copy_cqes with wrapping sq.cqes buffer" {
         try testing.expectEqual(2 + 4 * i, ring.cq.head.*);
     }
 }
+
+test "bind" {
+    try skipKernelLessThan(.{ .major = 6, .minor = 11, .patch = 0 });
+
+    var ring = IoUring.init(4, 0) catch |err| switch (err) {
+        error.SystemOutdated => return error.SkipZigTest,
+        error.PermissionDenied => return error.SkipZigTest,
+        else => return err,
+    };
+    defer ring.deinit();
+
+    var addr = net.Address.initIp4([4]u8{ 127, 0, 0, 1 }, 0);
+    const proto: u32 = if (addr.any.family == linux.AF.UNIX) 0 else linux.IPPROTO.TCP;
+
+    const listen_fd = brk: {
+        // Create socket
+        _ = try ring.socket(1, addr.any.family, linux.SOCK.STREAM | linux.SOCK.CLOEXEC, proto, 0);
+        try testing.expectEqual(1, try ring.submit());
+        var cqe = try ring.copy_cqe();
+        try testing.expectEqual(1, cqe.user_data);
+        try testing.expectEqual(posix.E.SUCCESS, cqe.err());
+        const listen_fd: posix.fd_t = @intCast(cqe.res);
+        try testing.expect(listen_fd > 2);
+
+        // Prepare: set socket option * 2, bind, listen
+        var sock_opt: u32 = 1;
+        var sqe = try ring.cmd_sock(2, .SETSOCKOPT, listen_fd, linux.SOL.SOCKET, linux.SO.REUSEADDR, @intFromPtr(&sock_opt), @sizeOf(u32));
+        sqe.flags |= linux.IOSQE_IO_LINK;
+        sqe = try ring.cmd_sock(3, .SETSOCKOPT, listen_fd, linux.SOL.SOCKET, linux.SO.REUSEPORT, @intFromPtr(&sock_opt), @sizeOf(u32));
+        sqe.flags |= linux.IOSQE_IO_LINK;
+        sqe = try ring.bind(4, listen_fd, &addr.any, addr.getOsSockLen(), 0);
+        sqe.flags |= linux.IOSQE_IO_LINK;
+        _ = try ring.listen(5, listen_fd, 1, 0);
+        // Submit 4 operations
+        try testing.expectEqual(4, try ring.submit());
+        // Expect all to succeed
+        for (2..6) |user_data| {
+            cqe = try ring.copy_cqe();
+            try testing.expectEqual(user_data, cqe.user_data);
+            try testing.expectEqual(posix.E.SUCCESS, cqe.err());
+        }
+
+        // Check that socket option is set
+        sock_opt = 0xff;
+        _ = try ring.cmd_sock(5, .GETSOCKOPT, listen_fd, linux.SOL.SOCKET, linux.SO.REUSEADDR, @intFromPtr(&sock_opt), @sizeOf(u32));
+        try testing.expectEqual(1, try ring.submit());
+        cqe = try ring.copy_cqe();
+        try testing.expectEqual(5, cqe.user_data);
+        try testing.expectEqual(posix.E.SUCCESS, cqe.err());
+        try testing.expectEqual(1, sock_opt);
+
+        // Read system assigned port into addr
+        var addr_len: posix.socklen_t = addr.getOsSockLen();
+        try posix.getsockname(listen_fd, &addr.any, &addr_len);
+
+        break :brk listen_fd;
+    };
+
+    const connect_fd = brk: {
+        // Create connect socket
+        _ = try ring.socket(6, addr.any.family, linux.SOCK.STREAM | linux.SOCK.CLOEXEC, proto, 0);
+        try testing.expectEqual(1, try ring.submit());
+        const cqe = try ring.copy_cqe();
+        try testing.expectEqual(6, cqe.user_data);
+        try testing.expectEqual(posix.E.SUCCESS, cqe.err());
+        // Get connect socket fd
+        const connect_fd: posix.fd_t = @intCast(cqe.res);
+        try testing.expect(connect_fd > 2 and connect_fd != listen_fd);
+        break :brk connect_fd;
+    };
+
+    // Prepare accept/connect operations
+    _ = try ring.accept(7, listen_fd, null, null, 0);
+    _ = try ring.connect(8, connect_fd, &addr.any, addr.getOsSockLen());
+    try testing.expectEqual(2, try ring.submit());
+    // Get listener accepted socket
+    var accept_fd: posix.socket_t = 0;
+    for (0..2) |_| {
+        const cqe = try ring.copy_cqe();
+        try testing.expectEqual(posix.E.SUCCESS, cqe.err());
+        if (cqe.user_data == 7) {
+            accept_fd = @intCast(cqe.res);
+        } else {
+            try testing.expectEqual(8, cqe.user_data);
+        }
+    }
+    try testing.expect(accept_fd > 2 and accept_fd != listen_fd and accept_fd != connect_fd);
+
+    // Communicate
+    try testSendRecv(&ring, connect_fd, accept_fd);
+    try testSendRecv(&ring, accept_fd, connect_fd);
+
+    // Shutdown and close all sockets
+    for ([_]posix.socket_t{ connect_fd, accept_fd, listen_fd }) |fd| {
+        var sqe = try ring.shutdown(9, fd, posix.SHUT.RDWR);
+        sqe.flags |= linux.IOSQE_IO_LINK;
+        _ = try ring.close(10, fd);
+        try testing.expectEqual(2, try ring.submit());
+        for (0..2) |i| {
+            const cqe = try ring.copy_cqe();
+            try testing.expectEqual(posix.E.SUCCESS, cqe.err());
+            try testing.expectEqual(9 + i, cqe.user_data);
+        }
+    }
+}
+
+fn testSendRecv(ring: *IoUring, send_fd: posix.socket_t, recv_fd: posix.socket_t) !void {
+    const buffer_send = "0123456789abcdf" ** 10;
+    var buffer_recv: [buffer_send.len * 2]u8 = undefined;
+
+    // 2 sends
+    var sqe = try ring.send(1, send_fd, buffer_send, linux.MSG.WAITALL);
+    sqe.flags |= linux.IOSQE_IO_LINK;
+    _ = try ring.send(2, send_fd, buffer_send, linux.MSG.WAITALL);
+    try testing.expectEqual(2, try ring.submit());
+    for (0..2) |i| {
+        const cqe = try ring.copy_cqe();
+        try testing.expectEqual(1 + i, cqe.user_data);
+        try testing.expectEqual(posix.E.SUCCESS, cqe.err());
+        try testing.expectEqual(buffer_send.len, @as(usize, @intCast(cqe.res)));
+    }
+
+    // receive
+    var recv_len: usize = 0;
+    while (recv_len < buffer_send.len * 2) {
+        _ = try ring.recv(3, recv_fd, .{ .buffer = buffer_recv[recv_len..] }, 0);
+        try testing.expectEqual(1, try ring.submit());
+        const cqe = try ring.copy_cqe();
+        try testing.expectEqual(3, cqe.user_data);
+        try testing.expectEqual(posix.E.SUCCESS, cqe.err());
+        recv_len += @intCast(cqe.res);
+    }
+
+    // inspect recv buffer
+    try testing.expectEqualSlices(u8, buffer_send, buffer_recv[0..buffer_send.len]);
+    try testing.expectEqualSlices(u8, buffer_send, buffer_recv[buffer_send.len..]);
+}
diff --git a/lib/std/os/linux/io_uring_sqe.zig b/lib/std/os/linux/io_uring_sqe.zig
index 19dae3e8fa..0b7ed519cd 100644
--- a/lib/std/os/linux/io_uring_sqe.zig
+++ b/lib/std/os/linux/io_uring_sqe.zig
@@ -619,4 +619,51 @@ pub const io_uring_sqe = extern struct {
         sqe.rw_flags = flags;
         sqe.splice_fd_in = @bitCast(options);
     }
+
+    pub fn prep_bind(
+        sqe: *linux.io_uring_sqe,
+        fd: linux.fd_t,
+        addr: *const linux.sockaddr,
+        addrlen: linux.socklen_t,
+        flags: u32,
+    ) void {
+        sqe.prep_rw(.BIND, fd, @intFromPtr(addr), 0, addrlen);
+        sqe.rw_flags = flags;
+    }
+
+    pub fn prep_listen(
+        sqe: *linux.io_uring_sqe,
+        fd: linux.fd_t,
+        backlog: usize,
+        flags: u32,
+    ) void {
+        sqe.prep_rw(.LISTEN, fd, 0, backlog, 0);
+        sqe.rw_flags = flags;
+    }
+
+    pub fn prep_cmd_sock(
+        sqe: *linux.io_uring_sqe,
+        cmd_op: linux.IO_URING_SOCKET_OP,
+        fd: linux.fd_t,
+        level: u32,
+        optname: u32,
+        optval: u64,
+        optlen: u32,
+    ) void {
+        sqe.prep_rw(.URING_CMD, fd, 0, 0, 0);
+        // off is overloaded with cmd_op, https://github.com/axboe/liburing/blob/e1003e496e66f9b0ae06674869795edf772d5500/src/include/liburing/io_uring.h#L39
+        sqe.off = @intFromEnum(cmd_op);
+        // addr is overloaded, https://github.com/axboe/liburing/blob/e1003e496e66f9b0ae06674869795edf772d5500/src/include/liburing/io_uring.h#L46
+        sqe.addr = @bitCast(packed struct {
+            level: u32,
+            optname: u32,
+        }{
+            .level = level,
+            .optname = optname,
+        });
+        // splice_fd_in is overloaded, u32 -> i32
+        sqe.splice_fd_in = @bitCast(optlen);
+        // addr3 is overloaded, https://github.com/axboe/liburing/blob/e1003e496e66f9b0ae06674869795edf772d5500/src/include/liburing/io_uring.h#L102
+        sqe.addr3 = optval;
+    }
 };

From 85e20748785880555f9db3bb3061edbd1bae10e7 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Igor=20Anic=CC=81?=
Date: Fri, 28 Feb 2025 21:05:43 +0100
Subject: [PATCH 2/6] io_uring: fix tests on 5.4.0 kernel

Found the tests failing in a new way on that kernel.
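For reference, this is the skip pattern the affected tests use, sketched
outside the diff (assumes the test's `ring` and an already submitted
provide-buffers SQE):

    const cqe = try ring.copy_cqe();
    switch (cqe.err()) {
        // Kernels < 5.7 lack provided buffers; 5.4.0 reports BADF instead of INVAL.
        .INVAL, .BADF => return error.SkipZigTest,
        .SUCCESS => {},
        else => |errno| std.debug.panic("unhandled errno: {}", .{errno}),
    }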
---
 lib/std/os/linux/IoUring.zig | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/lib/std/os/linux/IoUring.zig b/lib/std/os/linux/IoUring.zig
index 7ce2acf49b..d100916e81 100644
--- a/lib/std/os/linux/IoUring.zig
+++ b/lib/std/os/linux/IoUring.zig
@@ -3103,7 +3103,7 @@ test "provide_buffers: read" {
     const cqe = try ring.copy_cqe();
     switch (cqe.err()) {
         // Happens when the kernel is < 5.7
-        .INVAL => return error.SkipZigTest,
+        .INVAL, .BADF => return error.SkipZigTest,
         .SUCCESS => {},
         else => |errno| std.debug.panic("unhandled errno: {}", .{errno}),
     }
@@ -3230,7 +3230,7 @@ test "remove_buffers" {
 
     const cqe = try ring.copy_cqe();
     switch (cqe.err()) {
-        .INVAL => return error.SkipZigTest,
+        .INVAL, .BADF => return error.SkipZigTest,
         .SUCCESS => {},
         else => |errno| std.debug.panic("unhandled errno: {}", .{errno}),
     }

From d98c0893b0a565507af955d4f47e63393ac8e464 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Igor=20Anic=CC=81?=
Date: Fri, 28 Feb 2025 21:36:02 +0100
Subject: [PATCH 3/6] io_uring: probe capabilities function

ring.get_probe returns an io_uring_probe which can be used to probe the
capabilities of the currently running kernel.

Ref:
https://unixism.net/loti/ref-liburing/supported_caps.html
https://github.com/axboe/liburing/blob/e1003e496e66f9b0ae06674869795edf772d5500/src/setup.c#L454
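A minimal usage sketch (assuming an already initialized `ring`; this mirrors
what the bind/listen test below does to skip on kernels without
IORING_OP_LISTEN):

    const probe = try ring.get_probe();
    if (!probe.is_supported(.LISTEN)) return error.SkipZigTest;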
---
 lib/std/os/linux.zig         | 26 ++++++++++++++------------
 lib/std/os/linux/IoUring.zig | 18 +++++++++++++++---
 2 files changed, 29 insertions(+), 15 deletions(-)

diff --git a/lib/std/os/linux.zig b/lib/std/os/linux.zig
index 53424dc547..aff6119cf1 100644
--- a/lib/std/os/linux.zig
+++ b/lib/std/os/linux.zig
@@ -6138,26 +6138,28 @@ pub const IO_URING_OP_SUPPORTED = 1 << 0;
 
 pub const io_uring_probe_op = extern struct {
     op: IORING_OP,
     resv: u8,
-
-    /// IO_URING_OP_* flags
-    flags: u16,
-
+    flags: u16, // IO_URING_OP_* flags
     resv2: u32,
+
+    pub fn is_supported(self: @This()) bool {
+        return self.flags & IO_URING_OP_SUPPORTED != 0;
+    }
 };
 
 pub const io_uring_probe = extern struct {
-    /// last opcode supported
-    last_op: IORING_OP,
-
-    /// Number of io_uring_probe_op following
-    ops_len: u8,
-
+    last_op: IORING_OP, // last opcode supported
+    ops_len: u8, // length of ops[] array below
     resv: u16,
     resv2: [3]u32,
+    ops: [256]io_uring_probe_op,
 
-    // Followed by up to `ops_len` io_uring_probe_op structures
+    pub fn is_supported(self: @This(), op: IORING_OP) bool {
+        const i = @intFromEnum(op);
+        if (i > @intFromEnum(self.last_op) or i >= self.ops_len)
+            return false;
+        return self.ops[i].is_supported();
+    }
 };
 
 pub const io_uring_restriction = extern struct {
diff --git a/lib/std/os/linux/IoUring.zig b/lib/std/os/linux/IoUring.zig
index d100916e81..4e000d6c82 100644
--- a/lib/std/os/linux/IoUring.zig
+++ b/lib/std/os/linux/IoUring.zig
@@ -1272,6 +1272,16 @@ pub fn unregister_buffers(self: *IoUring) !void {
     }
 }
 
+/// Returns an io_uring_probe which is used to probe the capabilities of the
+/// io_uring subsystem of the running kernel. The io_uring_probe contains the
+/// list of supported operations.
+pub fn get_probe(self: *IoUring) !linux.io_uring_probe {
+    var probe = mem.zeroInit(linux.io_uring_probe, .{});
+    const res = linux.io_uring_register(self.fd, .REGISTER_PROBE, &probe, probe.ops.len);
+    try handle_register_buf_ring_result(res);
+    return probe;
+}
+
 fn handle_registration_result(res: usize) !void {
     switch (linux.E.init(res)) {
         .SUCCESS => {},
@@ -4355,9 +4365,7 @@ test "copy_cqes with wrapping sq.cqes buffer" {
     }
 }
 
-test "bind" {
-    try skipKernelLessThan(.{ .major = 6, .minor = 11, .patch = 0 });
-
+test "bind/listen/connect" {
     var ring = IoUring.init(4, 0) catch |err| switch (err) {
         error.SystemOutdated => return error.SkipZigTest,
         error.PermissionDenied => return error.SkipZigTest,
@@ -4365,6 +4373,10 @@ test "bind/listen/connect" {
     };
     defer ring.deinit();
 
+    const probe = ring.get_probe() catch return error.SkipZigTest;
+    // LISTEN is the highest opcode required by this test.
+    if (!probe.is_supported(.LISTEN)) return error.SkipZigTest;
+
     var addr = net.Address.initIp4([4]u8{ 127, 0, 0, 1 }, 0);
     const proto: u32 = if (addr.any.family == linux.AF.UNIX) 0 else linux.IPPROTO.TCP;
 

From 4df039d235d5f77830fdc30a4c23121f6216364a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Igor=20Anic=CC=81?=
Date: Sun, 2 Mar 2025 13:11:45 +0100
Subject: [PATCH 4/6] io_uring: add setsockopt/getsockopt

ring.cmd_sock is a generic socket command operation. Its two most common
uses are setsockopt and getsockopt, so expose those; they provide the
same interface as the posix versions of these methods.

liburing also has a
[sqe_set_flags](https://man7.org/linux/man-pages/man3/io_uring_sqe_set_flags.3.html)
method; add that to our io_uring_sqe, along with an sqe.link_next method
for setting the most common flag.
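A minimal usage sketch (assuming an initialized `ring`, a socket `fd`, and
arbitrary user_data values 2 and 3; this mirrors the updated test below):

    var optval: u32 = 1;
    // Link the set to the following get, which reads the value back.
    (try ring.setsockopt(2, fd, linux.SOL.SOCKET, linux.SO.REUSEADDR, mem.asBytes(&optval))).link_next();
    _ = try ring.getsockopt(3, fd, linux.SOL.SOCKET, linux.SO.REUSEADDR, mem.asBytes(&optval));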
---
 lib/std/os/linux.zig              |  1 +
 lib/std/os/linux/IoUring.zig      | 80 +++++++++++++++++++++++--------
 lib/std/os/linux/io_uring_sqe.zig | 10 ++++
 3 files changed, 72 insertions(+), 19 deletions(-)

diff --git a/lib/std/os/linux.zig b/lib/std/os/linux.zig
index aff6119cf1..0cf2c60db1 100644
--- a/lib/std/os/linux.zig
+++ b/lib/std/os/linux.zig
@@ -6154,6 +6154,7 @@ pub const io_uring_probe = extern struct {
     resv2: [3]u32,
     ops: [256]io_uring_probe_op,
 
+    /// Is the operation supported on the running kernel.
     pub fn is_supported(self: @This(), op: IORING_OP) bool {
         const i = @intFromEnum(op);
         if (i > @intFromEnum(self.last_op) or i >= self.ops_len)
diff --git a/lib/std/os/linux/IoUring.zig b/lib/std/os/linux/IoUring.zig
index 4e000d6c82..323e36e6d6 100644
--- a/lib/std/os/linux/IoUring.zig
+++ b/lib/std/os/linux/IoUring.zig
@@ -1399,15 +1399,18 @@ pub fn listen(
     return sqe;
 }
 
-fn cmd_sock(
+/// Prepares a cmd request for a socket.
+/// See: https://man7.org/linux/man-pages/man3/io_uring_prep_cmd.3.html
+/// Available since 6.7.
+pub fn cmd_sock(
     self: *IoUring,
     user_data: u64,
     cmd_op: linux.IO_URING_SOCKET_OP,
     fd: linux.fd_t,
-    level: u32,
-    optname: u32,
-    optval: u64,
-    optlen: u32,
+    level: u32, // linux.SOL
+    optname: u32, // linux.SO
+    optval: u64, // pointer to the option value
+    optlen: u32, // size of the option value
 ) !*linux.io_uring_sqe {
     const sqe = try self.get_sqe();
     sqe.prep_cmd_sock(cmd_op, fd, level, optname, optval, optlen);
@@ -1415,6 +1418,50 @@ pub fn cmd_sock(
     return sqe;
 }
 
+/// Prepares a set socket option request for the optname argument, at the
+/// protocol level specified by the level argument.
+/// Available since 6.7.
+pub fn setsockopt(
+    self: *IoUring,
+    user_data: u64,
+    fd: linux.fd_t,
+    level: u32, // linux.SOL
+    optname: u32, // linux.SO
+    opt: []const u8,
+) !*linux.io_uring_sqe {
+    return try self.cmd_sock(
+        user_data,
+        .SETSOCKOPT,
+        fd,
+        level,
+        optname,
+        @intFromPtr(opt.ptr),
+        @intCast(opt.len),
+    );
+}
+
+/// Prepares a get socket option request to retrieve the value for the option
+/// specified by the optname argument for the socket specified by the fd
+/// argument.
+/// Available since 6.7.
+pub fn getsockopt(
+    self: *IoUring,
+    user_data: u64,
+    fd: linux.fd_t,
+    level: u32, // linux.SOL
+    optname: u32, // linux.SO
+    opt: []u8,
+) !*linux.io_uring_sqe {
+    return try self.cmd_sock(
+        user_data,
+        .GETSOCKOPT,
+        fd,
+        level,
+        optname,
+        @intFromPtr(opt.ptr),
+        @intCast(opt.len),
+    );
+}
+
 pub const SubmissionQueue = struct {
     head: *u32,
     tail: *u32,
@@ -4391,13 +4438,10 @@ test "bind/listen/connect" {
         try testing.expect(listen_fd > 2);
 
         // Prepare: set socket option * 2, bind, listen
-        var sock_opt: u32 = 1;
-        var sqe = try ring.cmd_sock(2, .SETSOCKOPT, listen_fd, linux.SOL.SOCKET, linux.SO.REUSEADDR, @intFromPtr(&sock_opt), @sizeOf(u32));
-        sqe.flags |= linux.IOSQE_IO_LINK;
-        sqe = try ring.cmd_sock(3, .SETSOCKOPT, listen_fd, linux.SOL.SOCKET, linux.SO.REUSEPORT, @intFromPtr(&sock_opt), @sizeOf(u32));
-        sqe.flags |= linux.IOSQE_IO_LINK;
-        sqe = try ring.bind(4, listen_fd, &addr.any, addr.getOsSockLen(), 0);
-        sqe.flags |= linux.IOSQE_IO_LINK;
+        var optval: u32 = 1;
+        (try ring.setsockopt(2, listen_fd, linux.SOL.SOCKET, linux.SO.REUSEADDR, mem.asBytes(&optval))).link_next();
+        (try ring.setsockopt(3, listen_fd, linux.SOL.SOCKET, linux.SO.REUSEPORT, mem.asBytes(&optval))).link_next();
+        (try ring.bind(4, listen_fd, &addr.any, addr.getOsSockLen(), 0)).link_next();
         _ = try ring.listen(5, listen_fd, 1, 0);
         // Submit 4 operations
         try testing.expectEqual(4, try ring.submit());
@@ -4409,13 +4453,13 @@ test "bind/listen/connect" {
         }
 
         // Check that socket option is set
-        sock_opt = 0xff;
-        _ = try ring.cmd_sock(5, .GETSOCKOPT, listen_fd, linux.SOL.SOCKET, linux.SO.REUSEADDR, @intFromPtr(&sock_opt), @sizeOf(u32));
+        optval = 0;
+        _ = try ring.getsockopt(5, listen_fd, linux.SOL.SOCKET, linux.SO.REUSEADDR, mem.asBytes(&optval));
         try testing.expectEqual(1, try ring.submit());
         cqe = try ring.copy_cqe();
         try testing.expectEqual(5, cqe.user_data);
         try testing.expectEqual(posix.E.SUCCESS, cqe.err());
-        try testing.expectEqual(1, sock_opt);
+        try testing.expectEqual(1, optval);
 
         // Read system assigned port into addr
         var addr_len: posix.socklen_t = addr.getOsSockLen();
@@ -4460,8 +4504,7 @@ test "bind/listen/connect" {
 
     // Shutdown and close all sockets
     for ([_]posix.socket_t{ connect_fd, accept_fd, listen_fd }) |fd| {
-        var sqe = try ring.shutdown(9, fd, posix.SHUT.RDWR);
-        sqe.flags |= linux.IOSQE_IO_LINK;
+        (try ring.shutdown(9, fd, posix.SHUT.RDWR)).link_next();
         _ = try ring.close(10, fd);
         try testing.expectEqual(2, try ring.submit());
         for (0..2) |i| {
@@ -4477,8 +4520,7 @@ fn testSendRecv(ring: *IoUring, send_fd: posix.socket_t, recv_fd: posix.socket_t
     var buffer_recv: [buffer_send.len * 2]u8 = undefined;
 
     // 2 sends
-    var sqe = try ring.send(1, send_fd, buffer_send, linux.MSG.WAITALL);
-    sqe.flags |= linux.IOSQE_IO_LINK;
+    _ = try ring.send(1, send_fd, buffer_send, linux.MSG.WAITALL);
     _ = try ring.send(2, send_fd, buffer_send, linux.MSG.WAITALL);
     try testing.expectEqual(2, try ring.submit());
     for (0..2) |i| {
diff --git a/lib/std/os/linux/io_uring_sqe.zig b/lib/std/os/linux/io_uring_sqe.zig
index 0b7ed519cd..5658206a66 100644
--- a/lib/std/os/linux/io_uring_sqe.zig
+++ b/lib/std/os/linux/io_uring_sqe.zig
@@ -666,4 +666,14 @@ pub const io_uring_sqe = extern struct {
         // addr3 is overloaded, https://github.com/axboe/liburing/blob/e1003e496e66f9b0ae06674869795edf772d5500/src/include/liburing/io_uring.h#L102
         sqe.addr3 = optval;
     }
+
+    pub fn set_flags(sqe: *linux.io_uring_sqe, flags: u8) void {
+        sqe.flags |= flags;
+    }
+
+    /// This SQE forms a link with the next SQE in the submission ring. The
+    /// next SQE will not be started before this one completes, forming a
+    /// chain of SQEs.
+    pub fn link_next(sqe: *linux.io_uring_sqe) void {
+        sqe.flags |= linux.IOSQE_IO_LINK;
+    }
 };

From c133171567fe3a81f817d0ea159bd9229d75291c Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Igor=20Anic=CC=81?=
Date: Mon, 3 Mar 2025 14:37:52 +0100
Subject: [PATCH 5/6] io_uring: incremental provided buffer consumption

[Incremental provided buffer
consumption](https://github.com/axboe/liburing/wiki/What's-new-with-io_uring-in-6.11-and-6.12#incremental-provided-buffer-consumption)
support was added in kernel 6.12. IoUring.BufferGroup will now use
incremental consumption whenever the kernel supports it.

Before, a provided buffer was wholly consumed when picked: each cqe
pointed to a different buffer. With this, a cqe points to a part of a
buffer, and multiple cqes can reuse the same buffer. Appropriate sizing
of buffers becomes less important.

There are slight changes in the BufferGroup interface (it now needs to
track the current receive position for each buffer). Init requires an
allocator instead of a buffers slice; it allocates the buffers slice and
the heads slice. Get and put now require the cqe, because it carries the
information about whether the buffer will be reused.
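A minimal sketch of the reworked interface (assuming an initialized `ring`,
an `allocator`, a readable socket `fd`, and suitable `group_id`,
`buffer_size` and `buffers_count` values):

    var buf_grp = try IoUring.BufferGroup.init(&ring, allocator, group_id, buffer_size, buffers_count);
    defer buf_grp.deinit(allocator);

    _ = try buf_grp.recv(user_data, fd, 0); // user_data: any u64 tag
    _ = try ring.submit();

    const cqe = try ring.copy_cqe();
    const buf = try buf_grp.get(cqe); // points into (part of) a provided buffer
    // ... process buf ...
    try buf_grp.put(cqe); // advances the buffer head, or returns the buffer to the kernel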
---
 lib/std/os/linux.zig         |   9 +-
 lib/std/os/linux/IoUring.zig | 177 +++++++++++++++++++----------------
 2 files changed, 104 insertions(+), 82 deletions(-)

diff --git a/lib/std/os/linux.zig b/lib/std/os/linux.zig
index 0cf2c60db1..ebdcd3f94a 100644
--- a/lib/std/os/linux.zig
+++ b/lib/std/os/linux.zig
@@ -5933,6 +5933,8 @@ pub const IORING_CQE_F_MORE = 1 << 1;
 pub const IORING_CQE_F_SOCK_NONEMPTY = 1 << 2;
 /// Set for notification CQEs. Can be used to distinct them from sends.
 pub const IORING_CQE_F_NOTIF = 1 << 3;
+/// If set, the buffer ID set in the completion will get more completions.
+pub const IORING_CQE_F_BUF_MORE = 1 << 4;
 
 pub const IORING_CQE_BUFFER_SHIFT = 16;
 
@@ -6222,8 +6224,13 @@ pub const io_uring_buf_reg = extern struct {
     ring_addr: u64,
     ring_entries: u32,
     bgid: u16,
-    pad: u16,
+    flags: u16,
     resv: [3]u64,
+
+    pub const FLAG = struct {
+        // Incremental buffer consumption.
+        pub const INC: u16 = 2;
+    };
 };
 
 pub const io_uring_getevents_arg = extern struct {
diff --git a/lib/std/os/linux/IoUring.zig b/lib/std/os/linux/IoUring.zig
index 323e36e6d6..368bd0fb59 100644
--- a/lib/std/os/linux/IoUring.zig
+++ b/lib/std/os/linux/IoUring.zig
@@ -1594,28 +1594,34 @@ pub const BufferGroup = struct {
     buffers: []u8,
     /// Size of each buffer in buffers.
     buffer_size: u32,
-    // Number of buffers in `buffers`, number of `io_uring_buf structures` in br.
+    /// Number of buffers in `buffers`, number of `io_uring_buf structures` in br.
     buffers_count: u16,
+    /// Head of unconsumed part of each buffer, if incremental consumption is enabled.
+    heads: []u32,
     /// ID of this group, must be unique in ring.
     group_id: u16,
 
     pub fn init(
         ring: *IoUring,
+        allocator: mem.Allocator,
         group_id: u16,
-        buffers: []u8,
         buffer_size: u32,
         buffers_count: u16,
     ) !BufferGroup {
-        assert(buffers.len == buffers_count * buffer_size);
+        const buffers = try allocator.alloc(u8, buffer_size * buffers_count);
+        errdefer allocator.free(buffers);
+        const heads = try allocator.alloc(u32, buffers_count);
+        errdefer allocator.free(heads);
 
-        const br = try setup_buf_ring(ring.fd, buffers_count, group_id);
+        const br = try setup_buf_ring(ring.fd, buffers_count, group_id, linux.io_uring_buf_reg.FLAG.INC);
         buf_ring_init(br);
 
         const mask = buf_ring_mask(buffers_count);
         var i: u16 = 0;
         while (i < buffers_count) : (i += 1) {
-            const start = buffer_size * i;
-            const buf = buffers[start .. start + buffer_size];
+            const pos = buffer_size * i;
+            const buf = buffers[pos .. pos + buffer_size];
+            heads[i] = 0;
             buf_ring_add(br, buf, i, mask, i);
         }
         buf_ring_advance(br, buffers_count);
@@ -1625,11 +1631,18 @@ pub const BufferGroup = struct {
             .group_id = group_id,
             .br = br,
             .buffers = buffers,
+            .heads = heads,
             .buffer_size = buffer_size,
             .buffers_count = buffers_count,
         };
     }
 
+    pub fn deinit(self: *BufferGroup, allocator: mem.Allocator) void {
+        free_buf_ring(self.ring.fd, self.br, self.buffers_count, self.group_id);
+        allocator.free(self.buffers);
+        allocator.free(self.heads);
+    }
+
     // Prepare recv operation which will select buffer from this group.
     pub fn recv(self: *BufferGroup, user_data: u64, fd: posix.fd_t, flags: u32) !*linux.io_uring_sqe {
         var sqe = try self.ring.get_sqe();
@@ -1649,33 +1662,34 @@ pub const BufferGroup = struct {
     }
 
     // Get buffer by id.
-    pub fn get(self: *BufferGroup, buffer_id: u16) []u8 {
-        const head = self.buffer_size * buffer_id;
-        return self.buffers[head .. head + self.buffer_size];
+    fn get_by_id(self: *BufferGroup, buffer_id: u16) []u8 {
+        const pos = self.buffer_size * buffer_id;
+        return self.buffers[pos .. pos + self.buffer_size][self.heads[buffer_id]..];
     }
 
     // Get buffer by CQE.
-    pub fn get_cqe(self: *BufferGroup, cqe: linux.io_uring_cqe) ![]u8 {
+    pub fn get(self: *BufferGroup, cqe: linux.io_uring_cqe) ![]u8 {
         const buffer_id = try cqe.buffer_id();
         const used_len = @as(usize, @intCast(cqe.res));
-        return self.get(buffer_id)[0..used_len];
-    }
-
-    // Release buffer to the kernel.
-    pub fn put(self: *BufferGroup, buffer_id: u16) void {
-        const mask = buf_ring_mask(self.buffers_count);
-        const buffer = self.get(buffer_id);
-        buf_ring_add(self.br, buffer, buffer_id, mask, 0);
-        buf_ring_advance(self.br, 1);
+        return self.get_by_id(buffer_id)[0..used_len];
     }
 
     // Release buffer from CQE to the kernel.
-    pub fn put_cqe(self: *BufferGroup, cqe: linux.io_uring_cqe) !void {
-        self.put(try cqe.buffer_id());
-    }
+    pub fn put(self: *BufferGroup, cqe: linux.io_uring_cqe) !void {
+        const buffer_id = try cqe.buffer_id();
+        if (cqe.flags & linux.IORING_CQE_F_BUF_MORE == linux.IORING_CQE_F_BUF_MORE) {
+            // Incremental consumption active, kernel will write to this buffer again
+            const used_len = @as(u32, @intCast(cqe.res));
+            // Track what part of the buffer is used
+            self.heads[buffer_id] += used_len;
+            return;
+        }
+        self.heads[buffer_id] = 0;
 
-    pub fn deinit(self: *BufferGroup) void {
-        free_buf_ring(self.ring.fd, self.br, self.buffers_count, self.group_id);
+        // Release buffer to the kernel.
+        const mask = buf_ring_mask(self.buffers_count);
+        buf_ring_add(self.br, self.get_by_id(buffer_id), buffer_id, mask, 0);
+        buf_ring_advance(self.br, 1);
     }
 };
 
@@ -1684,7 +1698,7 @@
 /// `fd` is IO_Uring.fd for which the provided buffer ring is being registered.
 /// `entries` is the number of entries requested in the buffer ring, must be power of 2.
 /// `group_id` is the chosen buffer group ID, unique in IO_Uring.
-pub fn setup_buf_ring(fd: posix.fd_t, entries: u16, group_id: u16) !*align(page_size_min) linux.io_uring_buf_ring {
+pub fn setup_buf_ring(fd: posix.fd_t, entries: u16, group_id: u16, flags: u16) !*align(page_size_min) linux.io_uring_buf_ring {
     if (entries == 0 or entries > 1 << 15) return error.EntriesNotInRange;
     if (!std.math.isPowerOfTwo(entries)) return error.EntriesNotPowerOfTwo;
 
@@ -1701,22 +1715,24 @@ pub fn setup_buf_ring(fd: posix.fd_t, entries: u16, group_id: u16) !*align(page_
     assert(mmap.len == mmap_size);
 
     const br: *align(page_size_min) linux.io_uring_buf_ring = @ptrCast(mmap.ptr);
-    try register_buf_ring(fd, @intFromPtr(br), entries, group_id);
+    try register_buf_ring(fd, @intFromPtr(br), entries, group_id, flags);
     return br;
 }
 
-fn register_buf_ring(fd: posix.fd_t, addr: u64, entries: u32, group_id: u16) !void {
+fn register_buf_ring(fd: posix.fd_t, addr: u64, entries: u32, group_id: u16, flags: u16) !void {
     var reg = mem.zeroInit(linux.io_uring_buf_reg, .{
         .ring_addr = addr,
        .ring_entries = entries,
         .bgid = group_id,
+        .flags = flags,
     });
-    const res = linux.io_uring_register(
-        fd,
-        .REGISTER_PBUF_RING,
-        @as(*const anyopaque, @ptrCast(&reg)),
-        1,
-    );
+    var res = linux.io_uring_register(fd, .REGISTER_PBUF_RING, @as(*const anyopaque, @ptrCast(&reg)), 1);
+    if (linux.E.init(res) == .INVAL and reg.flags & linux.io_uring_buf_reg.FLAG.INC > 0) {
+        // Retry without incremental buffer consumption.
+        // It is available since kernel 6.12; older kernels return INVAL.
+        reg.flags &= ~linux.io_uring_buf_reg.FLAG.INC;
+        res = linux.io_uring_register(fd, .REGISTER_PBUF_RING, @as(*const anyopaque, @ptrCast(&reg)), 1);
+    }
     try handle_register_buf_ring_result(res);
 }
 
@@ -4041,12 +4057,10 @@ test BufferGroup {
     const group_id: u16 = 1; // buffers group id
     const buffers_count: u16 = 1; // number of buffers in buffer group
     const buffer_size: usize = 128; // size of each buffer in group
-    const buffers = try testing.allocator.alloc(u8, buffers_count * buffer_size);
-    defer testing.allocator.free(buffers);
     var buf_grp = BufferGroup.init(
         &ring,
+        testing.allocator,
         group_id,
-        buffers,
         buffer_size,
         buffers_count,
     ) catch |err| switch (err) {
@@ -4054,7 +4068,7 @@ test BufferGroup {
         error.ArgumentsInvalid => return error.SkipZigTest,
         else => return err,
     };
-    defer buf_grp.deinit();
+    defer buf_grp.deinit(testing.allocator);
 
     // Create client/server fds
     const fds = try createSocketTestHarness(&ring);
@@ -4085,14 +4099,11 @@ test BufferGroup {
         try testing.expectEqual(posix.E.SUCCESS, cqe.err());
         try testing.expectEqual(data.len, @as(usize, @intCast(cqe.res))); // cqe.res holds received data len
 
-        // Read buffer_id and used buffer len from cqe
-        const buffer_id = try cqe.buffer_id();
-        const len: usize = @intCast(cqe.res);
         // Get buffer from pool
-        const buf = buf_grp.get(buffer_id)[0..len];
+        const buf = try buf_grp.get(cqe);
         try testing.expectEqualSlices(u8, &data, buf);
         // Release buffer to the kernel when application is done with it
-        buf_grp.put(buffer_id);
+        try buf_grp.put(cqe);
     }
 }
 
@@ -4110,12 +4121,10 @@ test "ring mapped buffers recv" {
     const group_id: u16 = 1; // buffers group id
     const buffers_count: u16 = 2; // number of buffers in buffer group
     const buffer_size: usize = 4; // size of each buffer in group
-    const buffers = try testing.allocator.alloc(u8, buffers_count * buffer_size);
-    defer testing.allocator.free(buffers);
     var buf_grp = BufferGroup.init(
         &ring,
+        testing.allocator,
         group_id,
-        buffers,
         buffer_size,
         buffers_count,
     ) catch |err| switch (err) {
@@ -4123,7 +4132,7 @@ test "ring mapped buffers recv" {
         error.ArgumentsInvalid => return error.SkipZigTest,
         else => return err,
     };
-    defer buf_grp.deinit();
+    defer buf_grp.deinit(testing.allocator);
 
     // create client/server fds
     const fds = try createSocketTestHarness(&ring);
@@ -4145,14 +4154,18 @@ test "ring mapped buffers recv" {
         if (cqe_send.err() == .INVAL) return error.SkipZigTest;
         try testing.expectEqual(linux.io_uring_cqe{ .user_data = user_data, .res = data.len, .flags = 0 }, cqe_send);
     }
+        var pos: usize = 0;
 
-        // server reads data into provided buffers
-        // there are 2 buffers of size 4, so each read gets only chunk of data
-        // we read four chunks of 4, 4, 4, 3 bytes each
-        var chunk: []const u8 = data[0..buffer_size]; // first chunk
-        const id1 = try expect_buf_grp_recv(&ring, &buf_grp, fds.server, rnd.int(u64), chunk);
-        chunk = data[buffer_size .. buffer_size * 2]; // second chunk
-        const id2 = try expect_buf_grp_recv(&ring, &buf_grp, fds.server, rnd.int(u64), chunk);
+        // read first chunk
+        const cqe1 = try buf_grp_recv_submit_get_cqe(&ring, &buf_grp, fds.server, rnd.int(u64));
+        var buf = try buf_grp.get(cqe1);
+        try testing.expectEqualSlices(u8, data[pos..][0..buf.len], buf);
+        pos += buf.len;
+        // second chunk
+        const cqe2 = try buf_grp_recv_submit_get_cqe(&ring, &buf_grp, fds.server, rnd.int(u64));
+        buf = try buf_grp.get(cqe2);
+        try testing.expectEqualSlices(u8, data[pos..][0..buf.len], buf);
+        pos += buf.len;
 
         // both buffers provided to the kernel are used so we get error
         // 'no more buffers', until we put buffers to the kernel
@@ -4169,16 +4182,17 @@ test "ring mapped buffers recv" {
         }
 
         // put buffers back to the kernel
-        buf_grp.put(id1);
-        buf_grp.put(id2);
+        try buf_grp.put(cqe1);
+        try buf_grp.put(cqe2);
 
-        chunk = data[buffer_size * 2 .. buffer_size * 3]; // third chunk
-        const id3 = try expect_buf_grp_recv(&ring, &buf_grp, fds.server, rnd.int(u64), chunk);
-        buf_grp.put(id3);
-
-        chunk = data[buffer_size * 3 ..]; // last chunk
-        const id4 = try expect_buf_grp_recv(&ring, &buf_grp, fds.server, rnd.int(u64), chunk);
-        buf_grp.put(id4);
+        // read remaining data
+        while (pos < data.len) {
+            const cqe = try buf_grp_recv_submit_get_cqe(&ring, &buf_grp, fds.server, rnd.int(u64));
+            buf = try buf_grp.get(cqe);
+            try testing.expectEqualSlices(u8, data[pos..][0..buf.len], buf);
+            pos += buf.len;
+            try buf_grp.put(cqe);
+        }
     }
 }
 
@@ -4196,12 +4210,10 @@ test "ring mapped buffers multishot recv" {
     const group_id: u16 = 1; // buffers group id
     const buffers_count: u16 = 2; // number of buffers in buffer group
     const buffer_size: usize = 4; // size of each buffer in group
-    const buffers = try testing.allocator.alloc(u8, buffers_count * buffer_size);
-    defer testing.allocator.free(buffers);
     var buf_grp = BufferGroup.init(
         &ring,
+        testing.allocator,
        group_id,
-        buffers,
         buffer_size,
         buffers_count,
     ) catch |err| switch (err) {
@@ -4209,7 +4221,7 @@ test "ring mapped buffers multishot recv" {
         error.ArgumentsInvalid => return error.SkipZigTest,
         else => return err,
     };
-    defer buf_grp.deinit();
+    defer buf_grp.deinit(testing.allocator);
 
     // create client/server fds
     const fds = try createSocketTestHarness(&ring);
@@ -4222,7 +4234,7 @@ test "ring mapped buffers multishot recv" {
     var round: usize = 4; // repeat send/recv cycle round times
     while (round > 0) : (round -= 1) {
         // client sends data
-        const data = [_]u8{ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0xa, 0xb, 0xc, 0xd, 0xe };
+        const data = [_]u8{ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0xa, 0xb, 0xc, 0xd, 0xe, 0xf };
         {
             const user_data = rnd.int(u64);
             _ = try ring.send(user_data, fds.client, data[0..], 0);
@@ -4239,7 +4251,7 @@ test "ring mapped buffers multishot recv" {
 
         // server reads data into provided buffers
         // there are 2 buffers of size 4, so each read gets only chunk of data
-        // we read four chunks of 4, 4, 4, 3 bytes each
+        // we read four chunks of 4, 4, 4, 4 bytes each
         var chunk: []const u8 = data[0..buffer_size]; // first chunk
         const cqe1 = try expect_buf_grp_cqe(&ring, &buf_grp, recv_user_data, chunk);
         try testing.expect(cqe1.flags & linux.IORING_CQE_F_MORE > 0);
@@ -4263,8 +4275,8 @@ test "ring mapped buffers multishot recv" {
         }
 
         // put buffers back to the kernel
-        buf_grp.put(try cqe1.buffer_id());
-        buf_grp.put(try cqe2.buffer_id());
+        try buf_grp.put(cqe1);
+        try buf_grp.put(cqe2);
 
         // restart multishot
         recv_user_data = rnd.int(u64);
@@ -4274,12 +4286,12 @@ test "ring mapped buffers multishot recv" {
recv" { chunk = data[buffer_size * 2 .. buffer_size * 3]; // third chunk const cqe3 = try expect_buf_grp_cqe(&ring, &buf_grp, recv_user_data, chunk); try testing.expect(cqe3.flags & linux.IORING_CQE_F_MORE > 0); - buf_grp.put(try cqe3.buffer_id()); + try buf_grp.put(cqe3); chunk = data[buffer_size * 3 ..]; // last chunk const cqe4 = try expect_buf_grp_cqe(&ring, &buf_grp, recv_user_data, chunk); try testing.expect(cqe4.flags & linux.IORING_CQE_F_MORE > 0); - buf_grp.put(try cqe4.buffer_id()); + try buf_grp.put(cqe4); // cancel pending multishot recv operation { @@ -4323,23 +4335,26 @@ test "ring mapped buffers multishot recv" { } } -// Prepare and submit recv using buffer group. -// Test that buffer from group, pointed by cqe, matches expected. -fn expect_buf_grp_recv( +// Prepare, submit recv and get cqe using buffer group. +fn buf_grp_recv_submit_get_cqe( ring: *IoUring, buf_grp: *BufferGroup, fd: posix.fd_t, user_data: u64, - expected: []const u8, -) !u16 { - // prepare and submit read +) !linux.io_uring_cqe { + // prepare and submit recv const sqe = try buf_grp.recv(user_data, fd, 0); try testing.expect(sqe.flags & linux.IOSQE_BUFFER_SELECT == linux.IOSQE_BUFFER_SELECT); try testing.expect(sqe.buf_index == buf_grp.group_id); try testing.expectEqual(@as(u32, 1), try ring.submit()); // submit + // get cqe, expect success + const cqe = try ring.copy_cqe(); + try testing.expectEqual(user_data, cqe.user_data); + try testing.expect(cqe.res >= 0); // success + try testing.expectEqual(posix.E.SUCCESS, cqe.err()); + try testing.expect(cqe.flags & linux.IORING_CQE_F_BUFFER == linux.IORING_CQE_F_BUFFER); // IORING_CQE_F_BUFFER flag is set - const cqe = try expect_buf_grp_cqe(ring, buf_grp, user_data, expected); - return try cqe.buffer_id(); + return cqe; } fn expect_buf_grp_cqe( @@ -4359,7 +4374,7 @@ fn expect_buf_grp_cqe( // get buffer from pool const buffer_id = try cqe.buffer_id(); const len = @as(usize, @intCast(cqe.res)); - const buf = buf_grp.get(buffer_id)[0..len]; + const buf = buf_grp.get_by_id(buffer_id)[0..len]; try testing.expectEqualSlices(u8, expected, buf); return cqe; From 94b36dbe50a90172a57e0ab828079fcb2b7cfcfc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Igor=20Anic=CC=81?= Date: Wed, 5 Mar 2025 13:32:52 +0100 Subject: [PATCH 6/6] io_uring: refactor buf_reg flags Use packed struct instead of or-ed integers. 
Thanks to @linsug for PR comments: https://github.com/ziglang/zig/pull/23062
---
 lib/std/os/linux.zig         | 19 ++++++++++++-------
 lib/std/os/linux/IoUring.zig | 21 ++++++++++++++++-----
 2 files changed, 28 insertions(+), 12 deletions(-)

diff --git a/lib/std/os/linux.zig b/lib/std/os/linux.zig
index ebdcd3f94a..b05ea412c5 100644
--- a/lib/std/os/linux.zig
+++ b/lib/std/os/linux.zig
@@ -6141,7 +6141,8 @@ pub const IO_URING_OP_SUPPORTED = 1 << 0;
 pub const io_uring_probe_op = extern struct {
     op: IORING_OP,
     resv: u8,
-    flags: u16, // IO_URING_OP_* flags
+    /// IO_URING_OP_* flags
+    flags: u16,
     resv2: u32,
 
     pub fn is_supported(self: @This()) bool {
@@ -6150,8 +6151,10 @@ pub const io_uring_probe_op = extern struct {
 };
 
 pub const io_uring_probe = extern struct {
-    last_op: IORING_OP, // last opcode supported
-    ops_len: u8, // length of ops[] array below
+    /// Last opcode supported
+    last_op: IORING_OP,
+    /// Length of ops[] array below
+    ops_len: u8,
     resv: u16,
     resv2: [3]u32,
     ops: [256]io_uring_probe_op,
@@ -6224,12 +6227,14 @@ pub const io_uring_buf_reg = extern struct {
     ring_addr: u64,
     ring_entries: u32,
     bgid: u16,
-    flags: u16,
+    flags: Flags,
     resv: [3]u64,
 
-    pub const FLAG = struct {
-        // Incremental buffer consumption.
-        pub const INC: u16 = 2;
+    pub const Flags = packed struct {
+        _0: u1 = 0,
+        /// Incremental buffer consumption.
+        inc: bool,
+        _: u14 = 0,
     };
 };
 
diff --git a/lib/std/os/linux/IoUring.zig b/lib/std/os/linux/IoUring.zig
index 368bd0fb59..2daf29fd05 100644
--- a/lib/std/os/linux/IoUring.zig
+++ b/lib/std/os/linux/IoUring.zig
@@ -1613,7 +1613,7 @@ pub const BufferGroup = struct {
         const heads = try allocator.alloc(u32, buffers_count);
         errdefer allocator.free(heads);
 
-        const br = try setup_buf_ring(ring.fd, buffers_count, group_id, linux.io_uring_buf_reg.FLAG.INC);
+        const br = try setup_buf_ring(ring.fd, buffers_count, group_id, .{ .inc = true });
         buf_ring_init(br);
 
         const mask = buf_ring_mask(buffers_count);
@@ -1698,7 +1698,12 @@ pub const BufferGroup = struct {
 /// `fd` is IO_Uring.fd for which the provided buffer ring is being registered.
 /// `entries` is the number of entries requested in the buffer ring, must be power of 2.
 /// `group_id` is the chosen buffer group ID, unique in IO_Uring.
-pub fn setup_buf_ring(fd: posix.fd_t, entries: u16, group_id: u16, flags: u16) !*align(page_size_min) linux.io_uring_buf_ring {
+pub fn setup_buf_ring(
+    fd: posix.fd_t,
+    entries: u16,
+    group_id: u16,
+    flags: linux.io_uring_buf_reg.Flags,
+) !*align(page_size_min) linux.io_uring_buf_ring {
     if (entries == 0 or entries > 1 << 15) return error.EntriesNotInRange;
     if (!std.math.isPowerOfTwo(entries)) return error.EntriesNotPowerOfTwo;
 
@@ -1719,7 +1724,13 @@ pub fn setup_buf_ring(fd: posix.fd_t, entries: u16, group_id: u16, flags: u16) !
     return br;
 }
 
-fn register_buf_ring(fd: posix.fd_t, addr: u64, entries: u32, group_id: u16, flags: u16) !void {
+fn register_buf_ring(
+    fd: posix.fd_t,
+    addr: u64,
+    entries: u32,
+    group_id: u16,
+    flags: linux.io_uring_buf_reg.Flags,
+) !void {
     var reg = mem.zeroInit(linux.io_uring_buf_reg, .{
         .ring_addr = addr,
         .ring_entries = entries,
@@ -1727,10 +1738,10 @@ fn register_buf_ring(fd: posix.fd_t, addr: u64, entries: u32, group_id: u16, fla
         .flags = flags,
     });
     var res = linux.io_uring_register(fd, .REGISTER_PBUF_RING, @as(*const anyopaque, @ptrCast(&reg)), 1);
-    if (linux.E.init(res) == .INVAL and reg.flags & linux.io_uring_buf_reg.FLAG.INC > 0) {
+    if (linux.E.init(res) == .INVAL and reg.flags.inc) {
         // Retry without incremental buffer consumption.
        // It is available since kernel 6.12; older kernels return INVAL.
-        reg.flags &= ~linux.io_uring_buf_reg.FLAG.INC;
+        reg.flags.inc = false;
         res = linux.io_uring_register(fd, .REGISTER_PBUF_RING, @as(*const anyopaque, @ptrCast(&reg)), 1);
     }
     try handle_register_buf_ring_result(res);