From 087ee497d430d30bd9c7a2cdfdfc3d2654d105d3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Igor=20Anic=CC=81?= Date: Fri, 10 Nov 2023 16:44:18 +0100 Subject: [PATCH 1/8] io_uring: add zero-copy send operation `send_zc` tries to avoid making intermediate copies of data. Zerocopy execution is not guaranteed and may fall back to copying. The flags field of the first struct io_uring_cqe may likely contain IORING_CQE_F_MORE, which means that there will be a second completion event / notification for the request, with the user_data field set to the same value. The user must not modify the data buffer until the notification is posted. The first cqe follows the usual rules and so its res field will contain the number of bytes sent or a negative error code. The notification's res field will be set to zero and the flags field will contain IORING_CQE_F_NOTIF. The two step model is needed because the kernel may hold on to buffers for a long time, e.g. waiting for a TCP ACK, and having a separate cqe for request completions allows userspace to push more data without extra delays. Note, notifications are only responsible for controlling the lifetime of the buffers, and as such don't mean anything about whether the data has actually been sent out or received by the other end. Even errored requests may generate a notification, and the user must check for IORING_CQE_F_MORE rather than relying on the result. Available since kernel 6.0. 
References: https://man7.org/linux/man-pages/man3/io_uring_prep_send_zc.3.html https://man7.org/linux/man-pages/man2/io_uring_enter.2.html --- lib/std/os/linux/io_uring.zig | 126 ++++++++++++++++++++++++++++++++++ 1 file changed, 126 insertions(+) diff --git a/lib/std/os/linux/io_uring.zig b/lib/std/os/linux/io_uring.zig index 3b282422ae..ff492a4bbc 100644 --- a/lib/std/os/linux/io_uring.zig +++ b/lib/std/os/linux/io_uring.zig @@ -606,6 +606,39 @@ pub const IO_Uring = struct { return sqe; } + /// Queues (but does not submit) an SQE to perform an async zerocopy `send(2)`. + /// Returns a pointer to the SQE. + pub fn send_zc( + self: *IO_Uring, + user_data: u64, + fd: os.fd_t, + buffer: []const u8, + send_flags: u32, + zc_flags: u16, + ) !*linux.io_uring_sqe { + const sqe = try self.get_sqe(); + io_uring_prep_send_zc(sqe, fd, buffer, send_flags, zc_flags); + sqe.user_data = user_data; + return sqe; + } + + /// Queues (but does not submit) an SQE to perform an async zerocopy `send(2)`. + /// Returns a pointer to the SQE. + pub fn send_zc_fixed( + self: *IO_Uring, + user_data: u64, + fd: os.fd_t, + buffer: []const u8, + send_flags: u32, + zc_flags: u16, + buf_index: u16, + ) !*linux.io_uring_sqe { + const sqe = try self.get_sqe(); + io_uring_prep_send_zc_fixed(sqe, fd, buffer, send_flags, zc_flags, buf_index); + sqe.user_data = user_data; + return sqe; + } + /// Queues (but does not submit) an SQE to perform a `recvmsg(2)`. /// Returns a pointer to the SQE. pub fn recvmsg( @@ -636,6 +669,21 @@ pub const IO_Uring = struct { return sqe; } + /// Queues (but does not submit) an SQE to perform an async zerocopy `sendmsg(2)`. + /// Returns a pointer to the SQE. 
+ pub fn sendmsg_zc( + self: *IO_Uring, + user_data: u64, + fd: os.fd_t, + msg: *const os.msghdr_const, + flags: u32, + ) !*linux.io_uring_sqe { + const sqe = try self.get_sqe(); + io_uring_prep_sendmsg_zc(sqe, fd, msg, flags); + sqe.user_data = user_data; + return sqe; + } + /// Queues (but does not submit) an SQE to perform an `openat(2)`. /// Returns a pointer to the SQE. pub fn openat( @@ -1373,6 +1421,28 @@ pub fn io_uring_prep_send(sqe: *linux.io_uring_sqe, fd: os.fd_t, buffer: []const sqe.rw_flags = flags; } +pub fn io_uring_prep_send_zc(sqe: *linux.io_uring_sqe, fd: os.fd_t, buffer: []const u8, flags: u32, zc_flags: u16) void { + io_uring_prep_rw(.SEND_ZC, sqe, fd, @intFromPtr(buffer.ptr), buffer.len, 0); + sqe.rw_flags = flags; + sqe.ioprio = zc_flags; +} + +pub fn io_uring_prep_send_zc_fixed(sqe: *linux.io_uring_sqe, fd: os.fd_t, buffer: []const u8, flags: u32, zc_flags: u16, buf_index: u16) void { + io_uring_prep_send_zc(sqe, fd, buffer, flags, zc_flags); + sqe.ioprio |= linux.IORING_RECVSEND_FIXED_BUF; + sqe.buf_index = buf_index; +} + +pub fn io_uring_prep_sendmsg_zc( + sqe: *linux.io_uring_sqe, + fd: os.fd_t, + msg: *const os.msghdr_const, + flags: u32, +) void { + io_uring_prep_sendmsg(sqe, fd, msg, flags); + sqe.opcode = .SENDMSG_ZC; +} + pub fn io_uring_prep_recvmsg( sqe: *linux.io_uring_sqe, fd: os.fd_t, @@ -3491,3 +3561,59 @@ test "accept multishot" { os.closeSocket(client); } } + +test "accept/connect/send_zc/recv" { + if (builtin.os.tag != .linux) return error.SkipZigTest; + + var ring = IO_Uring.init(16, 0) catch |err| switch (err) { + error.SystemOutdated => return error.SkipZigTest, + error.PermissionDenied => return error.SkipZigTest, + else => return err, + }; + defer ring.deinit(); + + const socket_test_harness = try createSocketTestHarness(&ring); + defer socket_test_harness.close(); + + const buffer_send = [_]u8{ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0xa, 0xb, 0xc, 0xd, 0xe }; + var buffer_recv = [_]u8{0} ** 10; + + // zero-copy send + const 
send = try ring.send_zc(0xeeeeeeee, socket_test_harness.client, buffer_send[0..], 0, 0); + send.flags |= linux.IOSQE_IO_LINK; + _ = try ring.recv(0xffffffff, socket_test_harness.server, .{ .buffer = buffer_recv[0..] }, 0); + try testing.expectEqual(@as(u32, 2), try ring.submit()); + + // First completion of zero-copy send. + // IORING_CQE_F_MORE, means that there + // will be a second completion event / notification for the + // request, with the user_data field set to the same value. + // buffer_send must be keep alive until second cqe. + var cqe_send = try ring.copy_cqe(); + if (cqe_send.err() == .INVAL) return error.SkipZigTest; + try testing.expectEqual(linux.io_uring_cqe{ + .user_data = 0xeeeeeeee, + .res = buffer_send.len, + .flags = linux.IORING_CQE_F_MORE, + }, cqe_send); + + const cqe_recv = try ring.copy_cqe(); + if (cqe_recv.err() == .INVAL) return error.SkipZigTest; + try testing.expectEqual(linux.io_uring_cqe{ + .user_data = 0xffffffff, + .res = buffer_recv.len, + .flags = cqe_recv.flags & linux.IORING_CQE_F_SOCK_NONEMPTY, + }, cqe_recv); + + try testing.expectEqualSlices(u8, buffer_send[0..buffer_recv.len], buffer_recv[0..]); + + // Second completion of zero-copy send. + // IORING_CQE_F_NOTIF in flags signals that kernel is done with send_buffer + cqe_send = try ring.copy_cqe(); + if (cqe_send.err() == .INVAL) return error.SkipZigTest; + try testing.expectEqual(linux.io_uring_cqe{ + .user_data = 0xeeeeeeee, + .res = 0, + .flags = linux.IORING_CQE_F_NOTIF, + }, cqe_send); +} From 256384a2ec2f67fc1b9380be987c63d5702f180c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Igor=20Anic=CC=81?= Date: Fri, 17 Nov 2023 13:48:17 +0100 Subject: [PATCH 2/8] io_uring: add direct operations Add operation on direct file descriptors. Also referred to as fixed or registered files. 
References: https://kernel.dk/axboe-kr2022.pdf https://lwn.net/Articles/863071/ Added functions: IO_Uring accept_direct accept_multishot_direct openat_direct close_direct socket socket_direct socket_direct_alloc Raw prepare operations: io_uring_prep_accept_direct io_uring_prep_multishot_accept_direct io_uring_prep_openat_direct io_uring_prep_close_direct io_uring_prep_socket io_uring_prep_socket_direct io_uring_prep_socket_direct_alloc Tested on these kernels: 5.4.0-164-generic 2559 passed; 70 skipped; 0 failed. 5.8.0-63-generic 2573 passed; 56 skipped; 0 failed. 5.11.0-49-generic 2576 passed; 53 skipped; 0 failed. 5.13.0-52-generic 2576 passed; 53 skipped; 0 failed. 5.15.0-87-generic 2579 passed; 50 skipped; 0 failed. 5.19.0-46-generic 2584 passed; 45 skipped; 0 failed. 6.2.0-35-generic 2585 passed; 44 skipped; 0 failed. 6.5.0-9-generic 2585 passed; 44 skipped; 0 failed. --- lib/std/os/linux/io_uring.zig | 505 +++++++++++++++++++++++++++++++++- 1 file changed, 502 insertions(+), 3 deletions(-) diff --git a/lib/std/os/linux/io_uring.zig b/lib/std/os/linux/io_uring.zig index ff492a4bbc..be1fbe6893 100644 --- a/lib/std/os/linux/io_uring.zig +++ b/lib/std/os/linux/io_uring.zig @@ -505,10 +505,12 @@ pub const IO_Uring = struct { return sqe; } - /// Queues (but does not submit) an SQE to perform an multishot `accept4(2)` on a socket. + /// Queues an multishot accept on a socket. + /// /// Multishot variant allows an application to issue a single accept request, /// which will repeatedly trigger a CQE when a connection request comes in. - /// Returns a pointer to the SQE. + /// While IORING_CQE_F_MORE flag is set in CQE flags accept will generate + /// further CQEs. pub fn accept_multishot( self: *IO_Uring, user_data: u64, @@ -523,6 +525,44 @@ pub const IO_Uring = struct { return sqe; } + /// Queues an accept using direct (registered) file descriptors. 
+ /// + /// To use an accept direct variant, the application must first have registered + /// a file table (with register_files). An unused table index will be + /// dynamically chosen and returned in the CQE res field. + /// + /// After creation, they can be used by setting IOSQE_FIXED_FILE in the SQE + /// flags member, and setting the SQE fd field to the direct descriptor value + /// rather than the regular file descriptor. + pub fn accept_direct( + self: *IO_Uring, + user_data: u64, + fd: os.fd_t, + addr: ?*os.sockaddr, + addrlen: ?*os.socklen_t, + flags: u32, + ) !*linux.io_uring_sqe { + const sqe = try self.get_sqe(); + io_uring_prep_accept_direct(sqe, fd, addr, addrlen, flags, linux.IORING_FILE_INDEX_ALLOC); + sqe.user_data = user_data; + return sqe; + } + + /// Queues an multishot accept using direct (registered) file descriptors. + pub fn accept_multishot_direct( + self: *IO_Uring, + user_data: u64, + fd: os.fd_t, + addr: ?*os.sockaddr, + addrlen: ?*os.socklen_t, + flags: u32, + ) !*linux.io_uring_sqe { + const sqe = try self.get_sqe(); + io_uring_prep_multishot_accept_direct(sqe, fd, addr, addrlen, flags); + sqe.user_data = user_data; + return sqe; + } + /// Queue (but does not submit) an SQE to perform a `connect(2)` on a socket. /// Returns a pointer to the SQE. pub fn connect( @@ -700,6 +740,30 @@ pub const IO_Uring = struct { return sqe; } + /// Queues an openat using direct (registered) file descriptors. + /// + /// To use an accept direct variant, the application must first have registered + /// a file table (with register_files). An unused table index will be + /// dynamically chosen and returned in the CQE res field. + /// + /// After creation, they can be used by setting IOSQE_FIXED_FILE in the SQE + /// flags member, and setting the SQE fd field to the direct descriptor value + /// rather than the regular file descriptor. 
+ pub fn openat_direct( + self: *IO_Uring, + user_data: u64, + fd: os.fd_t, + path: [*:0]const u8, + flags: u32, + mode: os.mode_t, + file_index: u32, + ) !*linux.io_uring_sqe { + const sqe = try self.get_sqe(); + io_uring_prep_openat_direct(sqe, fd, path, flags, mode, file_index); + sqe.user_data = user_data; + return sqe; + } + /// Queues (but does not submit) an SQE to perform a `close(2)`. /// Returns a pointer to the SQE. pub fn close(self: *IO_Uring, user_data: u64, fd: os.fd_t) !*linux.io_uring_sqe { @@ -709,6 +773,14 @@ pub const IO_Uring = struct { return sqe; } + /// Queues close of registered file descriptor. + pub fn close_direct(self: *IO_Uring, user_data: u64, file_index: u32) !*linux.io_uring_sqe { + const sqe = try self.get_sqe(); + io_uring_prep_close_direct(sqe, file_index); + sqe.user_data = user_data; + return sqe; + } + /// Queues (but does not submit) an SQE to register a timeout operation. /// Returns a pointer to the SQE. /// @@ -1157,6 +1229,54 @@ pub const IO_Uring = struct { else => |errno| return os.unexpectedErrno(errno), } } + + /// Prepares a socket creation request. + /// New socket fd will be returned in completion result. + pub fn socket( + self: *IO_Uring, + user_data: u64, + domain: u32, + socket_type: u32, + protocol: u32, + flags: u32, + ) !*linux.io_uring_sqe { + const sqe = try self.get_sqe(); + io_uring_prep_socket(sqe, domain, socket_type, protocol, flags); + sqe.user_data = user_data; + return sqe; + } + + /// Prepares a socket creation request for registered file at index `file_index`. 
+ pub fn socket_direct( + self: *IO_Uring, + user_data: u64, + domain: u32, + socket_type: u32, + protocol: u32, + flags: u32, + file_index: u32, + ) !*linux.io_uring_sqe { + const sqe = try self.get_sqe(); + io_uring_prep_socket_direct(sqe, domain, socket_type, protocol, flags, file_index); + sqe.user_data = user_data; + return sqe; + } + + /// Prepares a socket creation request for registered file, index chosen by kernel (file index alloc). + /// File index will be returned in CQE res field. + pub fn socket_direct_alloc( + self: *IO_Uring, + user_data: u64, + domain: u32, + socket_type: u32, + protocol: u32, + flags: u32, + ) !*linux.io_uring_sqe { + const sqe = try self.get_sqe(); + io_uring_prep_socket_direct_alloc(sqe, domain, socket_type, protocol, flags); + sqe.user_data = user_data; + return sqe; + } }; pub const SubmissionQueue = struct { @@ -1391,6 +1511,41 @@ pub fn io_uring_prep_accept( sqe.rw_flags = flags; } +pub fn io_uring_prep_accept_direct( + sqe: *linux.io_uring_sqe, + fd: os.fd_t, + addr: ?*os.sockaddr, + addrlen: ?*os.socklen_t, + flags: u32, + file_index: u32, +) void { + io_uring_prep_accept(sqe, fd, addr, addrlen, flags); + __io_uring_set_target_fixed_file(sqe, file_index); +} + +pub fn io_uring_prep_multishot_accept_direct( + sqe: *linux.io_uring_sqe, + fd: os.fd_t, + addr: ?*os.sockaddr, + addrlen: ?*os.socklen_t, + flags: u32, +) void { + io_uring_prep_multishot_accept(sqe, fd, addr, addrlen, flags); + __io_uring_set_target_fixed_file(sqe, linux.IORING_FILE_INDEX_ALLOC); +} + +fn __io_uring_set_target_fixed_file(sqe: *linux.io_uring_sqe, file_index: u32) void { + const sqe_file_index: u32 = if (file_index == linux.IORING_FILE_INDEX_ALLOC) + linux.IORING_FILE_INDEX_ALLOC + else + // 0 means no fixed files, indexes should be encoded as "index + 1" + file_index + 1; + // This filed is overloaded in liburing: + // splice_fd_in: i32 + // sqe_file_index: u32 + sqe.splice_fd_in = @bitCast(sqe_file_index); +} + pub fn io_uring_prep_connect( sqe: 
*linux.io_uring_sqe, fd: os.fd_t, @@ -1474,6 +1629,18 @@ pub fn io_uring_prep_openat( sqe.rw_flags = flags; } +pub fn io_uring_prep_openat_direct( + sqe: *linux.io_uring_sqe, + fd: os.fd_t, + path: [*:0]const u8, + flags: u32, + mode: os.mode_t, + file_index: u32, +) void { + io_uring_prep_openat(sqe, fd, path, flags, mode); + __io_uring_set_target_fixed_file(sqe, file_index); +} + pub fn io_uring_prep_close(sqe: *linux.io_uring_sqe, fd: os.fd_t) void { sqe.* = .{ .opcode = .CLOSE, @@ -1493,6 +1660,11 @@ pub fn io_uring_prep_close(sqe: *linux.io_uring_sqe, fd: os.fd_t) void { }; } +pub fn io_uring_prep_close_direct(sqe: *linux.io_uring_sqe, file_index: u32) void { + io_uring_prep_close(sqe, 0); + __io_uring_set_target_fixed_file(sqe, file_index); +} + pub fn io_uring_prep_timeout( sqe: *linux.io_uring_sqe, ts: *const os.linux.kernel_timespec, @@ -1720,6 +1892,40 @@ pub fn io_uring_prep_multishot_accept( sqe.ioprio |= linux.IORING_ACCEPT_MULTISHOT; } +pub fn io_uring_prep_socket( + sqe: *linux.io_uring_sqe, + domain: u32, + socket_type: u32, + protocol: u32, + flags: u32, +) void { + io_uring_prep_rw(.SOCKET, sqe, @intCast(domain), 0, protocol, socket_type); + sqe.rw_flags = flags; +} + +pub fn io_uring_prep_socket_direct( + sqe: *linux.io_uring_sqe, + domain: u32, + socket_type: u32, + protocol: u32, + flags: u32, + file_index: u32, +) void { + io_uring_prep_socket(sqe, domain, socket_type, protocol, flags); + __io_uring_set_target_fixed_file(sqe, file_index); +} + +pub fn io_uring_prep_socket_direct_alloc( + sqe: *linux.io_uring_sqe, + domain: u32, + socket_type: u32, + protocol: u32, + flags: u32, +) void { + io_uring_prep_socket(sqe, domain, socket_type, protocol, flags); + __io_uring_set_target_fixed_file(sqe, linux.IORING_FILE_INDEX_ALLOC); +} + test "structs/offsets/entries" { if (builtin.os.tag != .linux) return error.SkipZigTest; @@ -3582,7 +3788,8 @@ test "accept/connect/send_zc/recv" { const send = try ring.send_zc(0xeeeeeeee, socket_test_harness.client, 
buffer_send[0..], 0, 0); send.flags |= linux.IOSQE_IO_LINK; _ = try ring.recv(0xffffffff, socket_test_harness.server, .{ .buffer = buffer_recv[0..] }, 0); - try testing.expectEqual(@as(u32, 2), try ring.submit()); + const submitted = try ring.submit(); + if (submitted != 2) return error.SkipZigTest; // on kernel 5.8 (without zc support) // First completion of zero-copy send. // IORING_CQE_F_MORE, means that there @@ -3617,3 +3824,295 @@ test "accept/connect/send_zc/recv" { .flags = linux.IORING_CQE_F_NOTIF, }, cqe_send); } + +test "accept_direct" { + if (builtin.os.tag != .linux) return error.SkipZigTest; + + var ring = IO_Uring.init(1, 0) catch |err| switch (err) { + error.SystemOutdated => return error.SkipZigTest, + error.PermissionDenied => return error.SkipZigTest, + else => return err, + }; + defer ring.deinit(); + var address = try net.Address.parseIp4("127.0.0.1", 0); + + // register direct file descriptors + var registered_fds = [_]os.fd_t{-1} ** 2; + ring.register_files(registered_fds[0..]) catch |err| switch (err) { + error.FileDescriptorInvalid => return error.SkipZigTest, + else => return err, + }; + + const listener_socket = try createListenerSocket(&address); + defer os.closeSocket(listener_socket); + + const accept_userdata: u64 = 0xaaaaaaaa; + const read_userdata: u64 = 0xbbbbbbbb; + const data = [_]u8{ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0xa, 0xb, 0xc, 0xd, 0xe }; + + for (0..2) |_| { + for (registered_fds, 0..) |_, i| { + var buffer_recv = [_]u8{0} ** 16; + const buffer_send: []const u8 = data[0 .. 
data.len - i]; // make it different at each loop + + // submit accept, will chose registered fd and return index in cqe + _ = try ring.accept_direct(accept_userdata, listener_socket, null, null, 0); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + + // connect + var client = try os.socket(address.any.family, os.SOCK.STREAM | os.SOCK.CLOEXEC, 0); + try os.connect(client, &address.any, address.getOsSockLen()); + defer os.closeSocket(client); + + // accept completion + const cqe_accept = try ring.copy_cqe(); + if (cqe_accept.err() == .INVAL) return error.SkipZigTest; + try testing.expectEqual(os.E.SUCCESS, cqe_accept.err()); + const fd_index = cqe_accept.res; + if (fd_index >= registered_fds.len) return error.SkipZigTest; // old kernel fallback to ordinary accept + try testing.expect(fd_index < registered_fds.len); + try testing.expect(cqe_accept.user_data == accept_userdata); + + // send data + _ = try os.send(client, buffer_send, 0); + + // Example of how to use registered fd: + // Submit receive to fixed file returned by accept (fd_index). + // Fd field is set to registered file index, returned by accept. + // Flag linux.IOSQE_FIXED_FILE must be set. 
+ const recv_sqe = try ring.recv(read_userdata, fd_index, .{ .buffer = &buffer_recv }, 0); + recv_sqe.flags |= linux.IOSQE_FIXED_FILE; + try testing.expectEqual(@as(u32, 1), try ring.submit()); + + // accept receive + const recv_cqe = try ring.copy_cqe(); + try testing.expect(recv_cqe.user_data == read_userdata); + try testing.expect(recv_cqe.res == buffer_send.len); + try testing.expectEqualSlices(u8, buffer_send, buffer_recv[0..buffer_send.len]); + } + // no more available fds, accept will get NFILE error + { + // submit accept + _ = try ring.accept_direct(accept_userdata, listener_socket, null, null, 0); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + // connect + var client = try os.socket(address.any.family, os.SOCK.STREAM | os.SOCK.CLOEXEC, 0); + try os.connect(client, &address.any, address.getOsSockLen()); + defer os.closeSocket(client); + // completion with error + const cqe_accept = try ring.copy_cqe(); + try testing.expect(cqe_accept.user_data == accept_userdata); + try testing.expectEqual(os.E.NFILE, cqe_accept.err()); + } + // return file descriptors to kernel + try ring.register_files_update(0, registered_fds[0..]); + } + try ring.unregister_files(); +} + +test "accept_multishot_direct" { + if (builtin.os.tag != .linux) return error.SkipZigTest; + + var ring = IO_Uring.init(1, 0) catch |err| switch (err) { + error.SystemOutdated => return error.SkipZigTest, + error.PermissionDenied => return error.SkipZigTest, + else => return err, + }; + defer ring.deinit(); + var address = try net.Address.parseIp4("127.0.0.1", 0); + + var registered_fds = [_]os.fd_t{-1} ** 2; + ring.register_files(registered_fds[0..]) catch |err| switch (err) { + error.FileDescriptorInvalid => return error.SkipZigTest, + else => return err, + }; + + const listener_socket = try createListenerSocket(&address); + defer os.closeSocket(listener_socket); + + const accept_userdata: u64 = 0xaaaaaaaa; + + for (0..2) |_| { + // submit accept + // Will chose registered fd and 
return index of the selected registered file in cqe. + _ = try ring.accept_multishot_direct(accept_userdata, listener_socket, null, null, 0); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + + for (registered_fds) |_| { + // connect + var client = try os.socket(address.any.family, os.SOCK.STREAM | os.SOCK.CLOEXEC, 0); + try os.connect(client, &address.any, address.getOsSockLen()); + defer os.closeSocket(client); + + // accept completion + const cqe_accept = try ring.copy_cqe(); + if (cqe_accept.err() == .INVAL) return error.SkipZigTest; + const fd_index = cqe_accept.res; + try testing.expect(fd_index < registered_fds.len); + try testing.expect(cqe_accept.user_data == accept_userdata); + try testing.expect(cqe_accept.flags & linux.IORING_CQE_F_MORE > 0); // has more is set + } + // No more available fds, accept will get NFILE error. + // Multishot is terminated (more flag is not set). + { + // connect + var client = try os.socket(address.any.family, os.SOCK.STREAM | os.SOCK.CLOEXEC, 0); + try os.connect(client, &address.any, address.getOsSockLen()); + defer os.closeSocket(client); + // completion with error + const cqe_accept = try ring.copy_cqe(); + try testing.expect(cqe_accept.user_data == accept_userdata); + try testing.expectEqual(os.E.NFILE, cqe_accept.err()); + try testing.expect(cqe_accept.flags & linux.IORING_CQE_F_MORE == 0); // has more is not set + } + // return file descriptors to kernel + try ring.register_files_update(0, registered_fds[0..]); + } + try ring.unregister_files(); +} + +test "socket/socket_direct/socket_direct_alloc/close_direct" { + if (builtin.os.tag != .linux) return error.SkipZigTest; + + var ring = IO_Uring.init(2, 0) catch |err| switch (err) { + error.SystemOutdated => return error.SkipZigTest, + error.PermissionDenied => return error.SkipZigTest, + else => return err, + }; + defer ring.deinit(); + var address = try net.Address.parseIp4("127.0.0.1", 0); + + // Below are 4 different ways to get socket fd. 
+ // Two upfront before register_files, and two after + var registered_fds = [_]os.fd_t{-1} ** 4; + // 1. sync syscall socket call + registered_fds[0] = try os.socket(address.any.family, os.SOCK.STREAM | os.SOCK.CLOEXEC, 0); + // 2. io_uring socket + const socket_userdata = 0xcccccccc; + _ = try ring.socket(socket_userdata, linux.AF.INET, os.SOCK.STREAM, 0, 0); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + var cqe_socket = try ring.copy_cqe(); + if (cqe_socket.err() == .INVAL) return error.SkipZigTest; + try testing.expectEqual(os.E.SUCCESS, cqe_socket.err()); + try testing.expect(cqe_socket.res > 2); + registered_fds[1] = cqe_socket.res; // set index 1 to created socket + + ring.register_files(registered_fds[0..]) catch |err| switch (err) { + error.FileDescriptorInvalid => return error.SkipZigTest, + else => return err, + }; + + // 3. io_uring socket_direct, create new socket on index 2 + _ = try ring.socket_direct(socket_userdata, linux.AF.INET, os.SOCK.STREAM, 0, 0, @intCast(2)); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + cqe_socket = try ring.copy_cqe(); + try testing.expectEqual(os.E.SUCCESS, cqe_socket.err()); + try testing.expect(cqe_socket.res == 0); + + // 4. io_uring socket_direct_alloc + _ = try ring.socket_direct_alloc(socket_userdata, linux.AF.INET, os.SOCK.STREAM, 0, 0); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + cqe_socket = try ring.copy_cqe(); + try testing.expectEqual(os.E.SUCCESS, cqe_socket.err()); + try testing.expect(cqe_socket.res == 3); + + // use sockets from registered_fds in connect operation + const listener_socket = try createListenerSocket(&address); + defer os.closeSocket(listener_socket); + const accept_userdata: u64 = 0xaaaaaaaa; + const connect_userdata: u64 = 0xbbbbbbbb; + const close_userdata: u64 = 0xcccccccc; + for (registered_fds, 0..) 
|_, fd_index| { + // prepare accept + _ = try ring.accept(accept_userdata, listener_socket, null, null, 0); + // prepare connect with fixed socket + const connect_sqe = try ring.connect(connect_userdata, @intCast(fd_index), &address.any, address.getOsSockLen()); + connect_sqe.flags |= linux.IOSQE_FIXED_FILE; + // submit both + try testing.expectEqual(@as(u32, 2), try ring.submit()); + // get completions + var cqe_connect = try ring.copy_cqe(); + if (cqe_connect.err() == .INVAL) return error.SkipZigTest; + var cqe_accept = try ring.copy_cqe(); + if (cqe_accept.err() == .INVAL) return error.SkipZigTest; + // ignore order + if (cqe_connect.user_data == accept_userdata and cqe_accept.user_data == connect_userdata) { + const a = cqe_accept; + const b = cqe_connect; + cqe_accept = b; + cqe_connect = a; + } + // test connect completion + try testing.expect(cqe_connect.user_data == connect_userdata); + try testing.expectEqual(os.E.SUCCESS, cqe_connect.err()); + // test accept completion + try testing.expect(cqe_accept.user_data == accept_userdata); + try testing.expectEqual(os.E.SUCCESS, cqe_accept.err()); + + // submit and test close completion + _ = try ring.close_direct(close_userdata, @intCast(fd_index)); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + var cqe_close = try ring.copy_cqe(); + try testing.expect(cqe_close.user_data == close_userdata); + try testing.expectEqual(os.E.SUCCESS, cqe_close.err()); + } + + try ring.unregister_files(); +} + +test "openat_direct/close_direct" { + if (builtin.os.tag != .linux) return error.SkipZigTest; + + var ring = IO_Uring.init(2, 0) catch |err| switch (err) { + error.SystemOutdated => return error.SkipZigTest, + error.PermissionDenied => return error.SkipZigTest, + else => return err, + }; + defer ring.deinit(); + + var registered_fds = [_]os.fd_t{-1} ** 3; + ring.register_files(registered_fds[0..]) catch |err| switch (err) { + error.FileDescriptorInvalid => return error.SkipZigTest, + else => return err, + }; + + 
var tmp = std.testing.tmpDir(.{}); + defer tmp.cleanup(); + const path = "test_io_uring_close_direct"; + const flags: u32 = os.O.RDWR | os.O.CREAT; + const mode: os.mode_t = 0o666; + const user_data: u64 = 0; + + // use registered file at index 0 (last param) + _ = try ring.openat_direct(user_data, tmp.dir.fd, path, flags, mode, 0); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + var cqe = try ring.copy_cqe(); + if (cqe.err() == .INVAL) return error.SkipZigTest; + if (cqe.res != 0) return error.SkipZigTest; // old kernel fallback to openat without direct + try testing.expectEqual(os.E.SUCCESS, cqe.err()); + try testing.expect(cqe.res == 0); + + // use registered file at index 1 + _ = try ring.openat_direct(user_data, tmp.dir.fd, path, flags, mode, 1); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + cqe = try ring.copy_cqe(); + try testing.expectEqual(os.E.SUCCESS, cqe.err()); + try testing.expect(cqe.res == 0); // res is 0 when we specify index + + // let kernel choose registered file index + _ = try ring.openat_direct(user_data, tmp.dir.fd, path, flags, mode, linux.IORING_FILE_INDEX_ALLOC); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + cqe = try ring.copy_cqe(); + if (cqe.err() == .INVAL) return error.SkipZigTest; // kernel 5.15 bug + try testing.expectEqual(os.E.SUCCESS, cqe.err()); + try testing.expect(cqe.res == 2); // chosen index is in res + + // close all open file descriptors + for (registered_fds, 0..) 
|_, fd_index| { + _ = try ring.close_direct(user_data, @intCast(fd_index)); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + var cqe_close = try ring.copy_cqe(); + try testing.expectEqual(os.E.SUCCESS, cqe_close.err()); + } + try ring.unregister_files(); +} From 87dd8d56690b4e8187d9a2a9fda9c611199f430f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Igor=20Anic=CC=81?= Date: Fri, 17 Nov 2023 16:31:57 +0100 Subject: [PATCH 3/8] io_uring: simplify tests by checking kernel version So far we relied on getting EINVAL in the CQE for operations that the kernel doesn't support. The problem with that approach is that there are many other reasons (like wrong params) to get EINVAL. The other problem is when we have an operation that existed before and gets new behavior via different attributes, like accept and accept_direct. Older kernels can fall back to the non-direct operation even though we set attributes for the direct operation. The operation completes successfully in both cases but with different results. This commit introduces a kernel version check at the start of each test, freeing the test body from checks for various kernel version differences. Feature availability references: * https://manpages.debian.org/unstable/liburing-dev/io_uring_enter.2.en.html * https://kernel.dk/axboe-kr2022.pdf * https://github.com/Jiboo/zig/blob/5acf7969bc759ce51005924109924d9666f74429/lib/std/os/linux.zig#L3727 * https://github.com/Jiboo/zig/blob/5acf7969bc759ce51005924109924d9666f74429/lib/std/os/linux.zig#L3993 --- lib/std/os/linux/io_uring.zig | 157 ++++++++++++++++++---------------- 1 file changed, 83 insertions(+), 74 deletions(-) diff --git a/lib/std/os/linux/io_uring.zig b/lib/std/os/linux/io_uring.zig index be1fbe6893..c7865986df 100644 --- a/lib/std/os/linux/io_uring.zig +++ b/lib/std/os/linux/io_uring.zig @@ -491,6 +491,7 @@ pub const IO_Uring = struct { /// Queues (but does not submit) an SQE to perform an `accept4(2)` on a socket. /// Returns a pointer to the SQE. 
+ /// Available since 5.5 pub fn accept( self: *IO_Uring, user_data: u64, @@ -511,6 +512,8 @@ pub const IO_Uring = struct { /// which will repeatedly trigger a CQE when a connection request comes in. /// While IORING_CQE_F_MORE flag is set in CQE flags accept will generate /// further CQEs. + /// + /// Available since 5.19 pub fn accept_multishot( self: *IO_Uring, user_data: u64, @@ -534,6 +537,8 @@ pub const IO_Uring = struct { /// After creation, they can be used by setting IOSQE_FIXED_FILE in the SQE /// flags member, and setting the SQE fd field to the direct descriptor value /// rather than the regular file descriptor. + /// + /// Available since 5.19 pub fn accept_direct( self: *IO_Uring, user_data: u64, @@ -549,6 +554,7 @@ pub const IO_Uring = struct { } /// Queues an multishot accept using direct (registered) file descriptors. + /// Available since 5.19 pub fn accept_multishot_direct( self: *IO_Uring, user_data: u64, @@ -726,6 +732,7 @@ pub const IO_Uring = struct { /// Queues (but does not submit) an SQE to perform an `openat(2)`. /// Returns a pointer to the SQE. + /// Available since 5.6. pub fn openat( self: *IO_Uring, user_data: u64, @@ -749,6 +756,8 @@ pub const IO_Uring = struct { /// After creation, they can be used by setting IOSQE_FIXED_FILE in the SQE /// flags member, and setting the SQE fd field to the direct descriptor value /// rather than the regular file descriptor. + /// + /// Available since 5.15 pub fn openat_direct( self: *IO_Uring, user_data: u64, @@ -766,6 +775,7 @@ pub const IO_Uring = struct { /// Queues (but does not submit) an SQE to perform a `close(2)`. /// Returns a pointer to the SQE. + /// Available since 5.6. pub fn close(self: *IO_Uring, user_data: u64, fd: os.fd_t) !*linux.io_uring_sqe { const sqe = try self.get_sqe(); io_uring_prep_close(sqe, fd); @@ -774,6 +784,7 @@ pub const IO_Uring = struct { } /// Queues close of registered file descriptor. 
+ /// Available since 5.15 pub fn close_direct(self: *IO_Uring, user_data: u64, file_index: u32) !*linux.io_uring_sqe { const sqe = try self.get_sqe(); io_uring_prep_close_direct(sqe, file_index); @@ -1232,6 +1243,7 @@ pub const IO_Uring = struct { /// Prepares a socket creation request. /// New socket fd will be returned in completion result. + /// Available since 5.19 pub fn socket( self: *IO_Uring, user_data: u64, @@ -1247,6 +1259,7 @@ pub const IO_Uring = struct { } /// Prepares a socket creation request for registered file at index `file_index`. + /// Available since 5.19 pub fn socket_direct( self: *IO_Uring, user_data: u64, @@ -1264,6 +1277,7 @@ pub const IO_Uring = struct { /// Prepares a socket creation request for registered file, index chosen by kernel (file index alloc). /// File index will be returned in CQE res field. + /// Available since 5.19 pub fn socket_direct_alloc( self: *IO_Uring, user_data: u64, @@ -3826,22 +3840,15 @@ test "accept/connect/send_zc/recv" { } test "accept_direct" { - if (builtin.os.tag != .linux) return error.SkipZigTest; + try skipKernelLessThan(.{ .major = 5, .minor = 19, .patch = 0 }); - var ring = IO_Uring.init(1, 0) catch |err| switch (err) { - error.SystemOutdated => return error.SkipZigTest, - error.PermissionDenied => return error.SkipZigTest, - else => return err, - }; + var ring = try IO_Uring.init(1, 0); defer ring.deinit(); var address = try net.Address.parseIp4("127.0.0.1", 0); // register direct file descriptors var registered_fds = [_]os.fd_t{-1} ** 2; - ring.register_files(registered_fds[0..]) catch |err| switch (err) { - error.FileDescriptorInvalid => return error.SkipZigTest, - else => return err, - }; + try ring.register_files(registered_fds[0..]); const listener_socket = try createListenerSocket(&address); defer os.closeSocket(listener_socket); @@ -3866,10 +3873,8 @@ test "accept_direct" { // accept completion const cqe_accept = try ring.copy_cqe(); - if (cqe_accept.err() == .INVAL) return error.SkipZigTest; 
try testing.expectEqual(os.E.SUCCESS, cqe_accept.err()); const fd_index = cqe_accept.res; - if (fd_index >= registered_fds.len) return error.SkipZigTest; // old kernel fallback to ordinary accept try testing.expect(fd_index < registered_fds.len); try testing.expect(cqe_accept.user_data == accept_userdata); @@ -3911,21 +3916,15 @@ test "accept_direct" { } test "accept_multishot_direct" { - if (builtin.os.tag != .linux) return error.SkipZigTest; + try skipKernelLessThan(.{ .major = 5, .minor = 19, .patch = 0 }); - var ring = IO_Uring.init(1, 0) catch |err| switch (err) { - error.SystemOutdated => return error.SkipZigTest, - error.PermissionDenied => return error.SkipZigTest, - else => return err, - }; + var ring = try IO_Uring.init(1, 0); defer ring.deinit(); + var address = try net.Address.parseIp4("127.0.0.1", 0); var registered_fds = [_]os.fd_t{-1} ** 2; - ring.register_files(registered_fds[0..]) catch |err| switch (err) { - error.FileDescriptorInvalid => return error.SkipZigTest, - else => return err, - }; + try ring.register_files(registered_fds[0..]); const listener_socket = try createListenerSocket(&address); defer os.closeSocket(listener_socket); @@ -3933,7 +3932,7 @@ test "accept_multishot_direct" { const accept_userdata: u64 = 0xaaaaaaaa; for (0..2) |_| { - // submit accept + // submit multishot accept // Will chose registered fd and return index of the selected registered file in cqe. 
_ = try ring.accept_multishot_direct(accept_userdata, listener_socket, null, null, 0); try testing.expectEqual(@as(u32, 1), try ring.submit()); @@ -3946,7 +3945,6 @@ test "accept_multishot_direct" { // accept completion const cqe_accept = try ring.copy_cqe(); - if (cqe_accept.err() == .INVAL) return error.SkipZigTest; const fd_index = cqe_accept.res; try testing.expect(fd_index < registered_fds.len); try testing.expect(cqe_accept.user_data == accept_userdata); @@ -3971,52 +3969,58 @@ test "accept_multishot_direct" { try ring.unregister_files(); } -test "socket/socket_direct/socket_direct_alloc/close_direct" { - if (builtin.os.tag != .linux) return error.SkipZigTest; +test "socket" { + try skipKernelLessThan(.{ .major = 5, .minor = 19, .patch = 0 }); - var ring = IO_Uring.init(2, 0) catch |err| switch (err) { - error.SystemOutdated => return error.SkipZigTest, - error.PermissionDenied => return error.SkipZigTest, - else => return err, - }; + var ring = try IO_Uring.init(2, 0); defer ring.deinit(); - var address = try net.Address.parseIp4("127.0.0.1", 0); - // Below are 4 different ways to get socket fd. - // Two upfront before register_files, and two after - var registered_fds = [_]os.fd_t{-1} ** 4; - // 1. sync syscall socket call - registered_fds[0] = try os.socket(address.any.family, os.SOCK.STREAM | os.SOCK.CLOEXEC, 0); - // 2. 
io_uring socket - const socket_userdata = 0xcccccccc; - _ = try ring.socket(socket_userdata, linux.AF.INET, os.SOCK.STREAM, 0, 0); + // prepare, submit socket operation + _ = try ring.socket(0, linux.AF.INET, os.SOCK.STREAM, 0, 0); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + + // test completion + var cqe = try ring.copy_cqe(); + try testing.expectEqual(os.E.SUCCESS, cqe.err()); + const fd: os.fd_t = @intCast(cqe.res); + try testing.expect(fd > 2); + + os.close(fd); +} + +test "socket_direct/socket_direct_alloc/close_direct" { + try skipKernelLessThan(.{ .major = 5, .minor = 19, .patch = 0 }); + + var ring = try IO_Uring.init(2, 0); + defer ring.deinit(); + + var registered_fds = [_]os.fd_t{-1} ** 3; + try ring.register_files(registered_fds[0..]); + + // create socket in registered file descriptor at index 0 (last param) + _ = try ring.socket_direct(0, linux.AF.INET, os.SOCK.STREAM, 0, 0, 0); try testing.expectEqual(@as(u32, 1), try ring.submit()); var cqe_socket = try ring.copy_cqe(); - if (cqe_socket.err() == .INVAL) return error.SkipZigTest; - try testing.expectEqual(os.E.SUCCESS, cqe_socket.err()); - try testing.expect(cqe_socket.res > 2); - registered_fds[1] = cqe_socket.res; // set index 1 to created socket - - ring.register_files(registered_fds[0..]) catch |err| switch (err) { - error.FileDescriptorInvalid => return error.SkipZigTest, - else => return err, - }; - - // 3. io_uring socket_direct, create new socket on index 2 - _ = try ring.socket_direct(socket_userdata, linux.AF.INET, os.SOCK.STREAM, 0, 0, @intCast(2)); - try testing.expectEqual(@as(u32, 1), try ring.submit()); - cqe_socket = try ring.copy_cqe(); try testing.expectEqual(os.E.SUCCESS, cqe_socket.err()); try testing.expect(cqe_socket.res == 0); - // 4. 
io_uring socket_direct_alloc - _ = try ring.socket_direct_alloc(socket_userdata, linux.AF.INET, os.SOCK.STREAM, 0, 0); + // create socket in registered file descriptor at index 1 (last param) + _ = try ring.socket_direct(0, linux.AF.INET, os.SOCK.STREAM, 0, 0, 1); try testing.expectEqual(@as(u32, 1), try ring.submit()); cqe_socket = try ring.copy_cqe(); try testing.expectEqual(os.E.SUCCESS, cqe_socket.err()); - try testing.expect(cqe_socket.res == 3); + try testing.expect(cqe_socket.res == 0); // res is 0 when index is specified + + // create socket in kernel chosen file descriptor index (_alloc version) + // completion res has index from registered files + _ = try ring.socket_direct_alloc(0, linux.AF.INET, os.SOCK.STREAM, 0, 0); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + cqe_socket = try ring.copy_cqe(); + try testing.expectEqual(os.E.SUCCESS, cqe_socket.err()); + try testing.expect(cqe_socket.res == 2); // returns registered file index // use sockets from registered_fds in connect operation + var address = try net.Address.parseIp4("127.0.0.1", 0); const listener_socket = try createListenerSocket(&address); defer os.closeSocket(listener_socket); const accept_userdata: u64 = 0xaaaaaaaa; @@ -4027,14 +4031,12 @@ test "socket/socket_direct/socket_direct_alloc/close_direct" { _ = try ring.accept(accept_userdata, listener_socket, null, null, 0); // prepare connect with fixed socket const connect_sqe = try ring.connect(connect_userdata, @intCast(fd_index), &address.any, address.getOsSockLen()); - connect_sqe.flags |= linux.IOSQE_FIXED_FILE; + connect_sqe.flags |= linux.IOSQE_FIXED_FILE; // fd is fixed file index // submit both try testing.expectEqual(@as(u32, 2), try ring.submit()); // get completions var cqe_connect = try ring.copy_cqe(); - if (cqe_connect.err() == .INVAL) return error.SkipZigTest; var cqe_accept = try ring.copy_cqe(); - if (cqe_accept.err() == .INVAL) return error.SkipZigTest; // ignore order if (cqe_connect.user_data == 
accept_userdata and cqe_accept.user_data == connect_userdata) { const a = cqe_accept; @@ -4049,7 +4051,7 @@ test "socket/socket_direct/socket_direct_alloc/close_direct" { try testing.expect(cqe_accept.user_data == accept_userdata); try testing.expectEqual(os.E.SUCCESS, cqe_accept.err()); - // submit and test close completion + // submit and test close_direct _ = try ring.close_direct(close_userdata, @intCast(fd_index)); try testing.expectEqual(@as(u32, 1), try ring.submit()); var cqe_close = try ring.copy_cqe(); @@ -4061,20 +4063,13 @@ test "socket/socket_direct/socket_direct_alloc/close_direct" { } test "openat_direct/close_direct" { - if (builtin.os.tag != .linux) return error.SkipZigTest; + try skipKernelLessThan(.{ .major = 5, .minor = 19, .patch = 0 }); - var ring = IO_Uring.init(2, 0) catch |err| switch (err) { - error.SystemOutdated => return error.SkipZigTest, - error.PermissionDenied => return error.SkipZigTest, - else => return err, - }; + var ring = try IO_Uring.init(2, 0); defer ring.deinit(); var registered_fds = [_]os.fd_t{-1} ** 3; - ring.register_files(registered_fds[0..]) catch |err| switch (err) { - error.FileDescriptorInvalid => return error.SkipZigTest, - else => return err, - }; + try ring.register_files(registered_fds[0..]); var tmp = std.testing.tmpDir(.{}); defer tmp.cleanup(); @@ -4087,8 +4082,6 @@ test "openat_direct/close_direct" { _ = try ring.openat_direct(user_data, tmp.dir.fd, path, flags, mode, 0); try testing.expectEqual(@as(u32, 1), try ring.submit()); var cqe = try ring.copy_cqe(); - if (cqe.err() == .INVAL) return error.SkipZigTest; - if (cqe.res != 0) return error.SkipZigTest; // old kernel fallback to openat without direct try testing.expectEqual(os.E.SUCCESS, cqe.err()); try testing.expect(cqe.res == 0); @@ -4103,7 +4096,6 @@ test "openat_direct/close_direct" { _ = try ring.openat_direct(user_data, tmp.dir.fd, path, flags, mode, linux.IORING_FILE_INDEX_ALLOC); try testing.expectEqual(@as(u32, 1), try ring.submit()); cqe = try 
ring.copy_cqe(); - if (cqe.err() == .INVAL) return error.SkipZigTest; // kernel 5.15 bug try testing.expectEqual(os.E.SUCCESS, cqe.err()); try testing.expect(cqe.res == 2); // chosen index is in res @@ -4116,3 +4108,20 @@ test "openat_direct/close_direct" { } try ring.unregister_files(); } + +/// For use in tests. Returns SkipZigTest is kernel version is less than required. +fn skipKernelLessThan(required: std.SemanticVersion) !void { + if (builtin.os.tag != .linux) return error.SkipZigTest; + + var uts: linux.utsname = undefined; + const res = linux.uname(&uts); + switch (linux.getErrno(res)) { + .SUCCESS => {}, + else => |errno| return os.unexpectedErrno(errno), + } + + const release = mem.sliceTo(&uts.release, 0); + var current = try std.SemanticVersion.parse(release); + current.pre = null; // don't check pre field + if (required.order(current) == .gt) return error.SkipZigTest; +} From 257b8131ec3133d3d30cf26d0836545aa95da2d3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Igor=20Anic=CC=81?= Date: Fri, 17 Nov 2023 17:21:54 +0100 Subject: [PATCH 4/8] io_uring: use kernel version test in send_zc test --- lib/std/os/linux/io_uring.zig | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/lib/std/os/linux/io_uring.zig b/lib/std/os/linux/io_uring.zig index c7865986df..a6120c15fb 100644 --- a/lib/std/os/linux/io_uring.zig +++ b/lib/std/os/linux/io_uring.zig @@ -616,6 +616,7 @@ pub const IO_Uring = struct { /// Queues (but does not submit) an SQE to perform a `recv(2)`. /// Returns a pointer to the SQE. + /// Available since 5.6 pub fn recv( self: *IO_Uring, user_data: u64, @@ -639,6 +640,7 @@ pub const IO_Uring = struct { /// Queues (but does not submit) an SQE to perform a `send(2)`. /// Returns a pointer to the SQE. + /// Available since 5.6 pub fn send( self: *IO_Uring, user_data: u64, @@ -654,6 +656,7 @@ pub const IO_Uring = struct { /// Queues (but does not submit) an SQE to perform an async zerocopy `send(2)`. 
/// Returns a pointer to the SQE. + /// Available since 6.0 pub fn send_zc( self: *IO_Uring, user_data: u64, @@ -670,6 +673,7 @@ pub const IO_Uring = struct { /// Queues (but does not submit) an SQE to perform an async zerocopy `send(2)`. /// Returns a pointer to the SQE. + /// Available since 6.0 pub fn send_zc_fixed( self: *IO_Uring, user_data: u64, @@ -687,6 +691,7 @@ pub const IO_Uring = struct { /// Queues (but does not submit) an SQE to perform a `recvmsg(2)`. /// Returns a pointer to the SQE. + /// Available since 5.3 pub fn recvmsg( self: *IO_Uring, user_data: u64, @@ -702,6 +707,7 @@ pub const IO_Uring = struct { /// Queues (but does not submit) an SQE to perform a `sendmsg(2)`. /// Returns a pointer to the SQE. + /// Available since 5.3 pub fn sendmsg( self: *IO_Uring, user_data: u64, @@ -717,6 +723,7 @@ pub const IO_Uring = struct { /// Queues (but does not submit) an SQE to perform an async zerocopy `sendmsg(2)`. /// Returns a pointer to the SQE. + /// Available since 6.1 pub fn sendmsg_zc( self: *IO_Uring, user_data: u64, @@ -3783,13 +3790,9 @@ test "accept multishot" { } test "accept/connect/send_zc/recv" { - if (builtin.os.tag != .linux) return error.SkipZigTest; + try skipKernelLessThan(.{ .major = 6, .minor = 0, .patch = 0 }); - var ring = IO_Uring.init(16, 0) catch |err| switch (err) { - error.SystemOutdated => return error.SkipZigTest, - error.PermissionDenied => return error.SkipZigTest, - else => return err, - }; + var ring = try IO_Uring.init(16, 0); defer ring.deinit(); const socket_test_harness = try createSocketTestHarness(&ring); @@ -3811,7 +3814,6 @@ test "accept/connect/send_zc/recv" { // request, with the user_data field set to the same value. // buffer_send must be keep alive until second cqe. 
var cqe_send = try ring.copy_cqe(); - if (cqe_send.err() == .INVAL) return error.SkipZigTest; try testing.expectEqual(linux.io_uring_cqe{ .user_data = 0xeeeeeeee, .res = buffer_send.len, @@ -3819,7 +3821,6 @@ test "accept/connect/send_zc/recv" { }, cqe_send); const cqe_recv = try ring.copy_cqe(); - if (cqe_recv.err() == .INVAL) return error.SkipZigTest; try testing.expectEqual(linux.io_uring_cqe{ .user_data = 0xffffffff, .res = buffer_recv.len, @@ -3831,7 +3832,6 @@ test "accept/connect/send_zc/recv" { // Second completion of zero-copy send. // IORING_CQE_F_NOTIF in flags signals that kernel is done with send_buffer cqe_send = try ring.copy_cqe(); - if (cqe_send.err() == .INVAL) return error.SkipZigTest; try testing.expectEqual(linux.io_uring_cqe{ .user_data = 0xeeeeeeee, .res = 0, From a7001b86f1dccb3e10aca5666b8fd9aed042400f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Igor=20Anic=CC=81?= Date: Fri, 17 Nov 2023 19:58:23 +0100 Subject: [PATCH 5/8] io_uring: include review comments Thanks @rootbeer for review. This adds description to send_zc behavior. Cleans up tests. --- lib/std/os/linux/io_uring.zig | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/lib/std/os/linux/io_uring.zig b/lib/std/os/linux/io_uring.zig index a6120c15fb..d2be31d5af 100644 --- a/lib/std/os/linux/io_uring.zig +++ b/lib/std/os/linux/io_uring.zig @@ -655,7 +655,19 @@ pub const IO_Uring = struct { } /// Queues (but does not submit) an SQE to perform an async zerocopy `send(2)`. - /// Returns a pointer to the SQE. + /// + /// This operation will most likely produce two CQEs. The flags field of the + /// first cqe may likely contain IORING_CQE_F_MORE, which means that there will + /// be a second cqe with the user_data field set to the same value. The user + /// must not modify the data buffer until the notification is posted. 
The first + /// cqe follows the usual rules and so its res field will contain the number of + /// bytes sent or a negative error code. The notification's res field will be + /// set to zero and the flags field will contain IORING_CQE_F_NOTIF. The two + /// step model is needed because the kernel may hold on to buffers for a long + /// time, e.g. waiting for a TCP ACK. Notifications responsible for controlling + /// the lifetime of the buffers. Even errored requests may generate a + /// notification. + /// /// Available since 6.0 pub fn send_zc( self: *IO_Uring, @@ -3805,8 +3817,7 @@ test "accept/connect/send_zc/recv" { const send = try ring.send_zc(0xeeeeeeee, socket_test_harness.client, buffer_send[0..], 0, 0); send.flags |= linux.IOSQE_IO_LINK; _ = try ring.recv(0xffffffff, socket_test_harness.server, .{ .buffer = buffer_recv[0..] }, 0); - const submitted = try ring.submit(); - if (submitted != 2) return error.SkipZigTest; // on kernel 5.8 (without zc support) + try testing.expectEqual(@as(u32, 2), try ring.submit()); // First completion of zero-copy send. // IORING_CQE_F_MORE, means that there @@ -4112,16 +4123,5 @@ test "openat_direct/close_direct" { /// For use in tests. Returns SkipZigTest is kernel version is less than required. 
fn skipKernelLessThan(required: std.SemanticVersion) !void { if (builtin.os.tag != .linux) return error.SkipZigTest; - - var uts: linux.utsname = undefined; - const res = linux.uname(&uts); - switch (linux.getErrno(res)) { - .SUCCESS => {}, - else => |errno| return os.unexpectedErrno(errno), - } - - const release = mem.sliceTo(&uts.release, 0); - var current = try std.SemanticVersion.parse(release); - current.pre = null; // don't check pre field - if (required.order(current) == .gt) return error.SkipZigTest; + if (required.order(builtin.os.version_range.linux.range.max) == .gt) return error.SkipZigTest; } From ccf5a6cc5c5070a130f4879069390933f9c738f1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Igor=20Anic=CC=81?= Date: Fri, 17 Nov 2023 22:17:00 +0100 Subject: [PATCH 6/8] io_uring: make Linux version check runtime instead of comptime Reverting the previous change. I'm building the test binary and then running it in virtual machines with different kernels, so the Linux kernel check has to happen at runtime instead of comptime. --- lib/std/os/linux/io_uring.zig | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/lib/std/os/linux/io_uring.zig b/lib/std/os/linux/io_uring.zig index d2be31d5af..52bb74848c 100644 --- a/lib/std/os/linux/io_uring.zig +++ b/lib/std/os/linux/io_uring.zig @@ -4123,5 +4123,16 @@ test "openat_direct/close_direct" { /// For use in tests. Returns SkipZigTest is kernel version is less than required. 
fn skipKernelLessThan(required: std.SemanticVersion) !void { if (builtin.os.tag != .linux) return error.SkipZigTest; - if (required.order(builtin.os.version_range.linux.range.max) == .gt) return error.SkipZigTest; + + var uts: linux.utsname = undefined; + const res = linux.uname(&uts); + switch (linux.getErrno(res)) { + .SUCCESS => {}, + else => |errno| return os.unexpectedErrno(errno), + } + + const release = mem.sliceTo(&uts.release, 0); + var current = try std.SemanticVersion.parse(release); + current.pre = null; // don't check pre field + if (required.order(current) == .gt) return error.SkipZigTest; } From 7eee0d3353c669fdf01999b319ab37883a107754 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Igor=20Anic=CC=81?= Date: Fri, 17 Nov 2023 23:39:29 +0100 Subject: [PATCH 7/8] io_uring: try to pass windows/macos tests --- lib/std/os/linux/io_uring.zig | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/std/os/linux/io_uring.zig b/lib/std/os/linux/io_uring.zig index 52bb74848c..cc34ad0319 100644 --- a/lib/std/os/linux/io_uring.zig +++ b/lib/std/os/linux/io_uring.zig @@ -4121,7 +4121,7 @@ test "openat_direct/close_direct" { } /// For use in tests. Returns SkipZigTest is kernel version is less than required. 
-fn skipKernelLessThan(required: std.SemanticVersion) !void { +inline fn skipKernelLessThan(required: std.SemanticVersion) !void { if (builtin.os.tag != .linux) return error.SkipZigTest; var uts: linux.utsname = undefined; From be4a77d9aaf724bb11b6d55457b5be80b60b4aa8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Igor=20Anic=CC=81?= Date: Sat, 18 Nov 2023 05:40:24 +0100 Subject: [PATCH 8/8] io_uring: fix ci failing test Getting this error in ci: run test std-arm-linux-none-generic-Debug: error: 'test.accept/connect/send_zc/recv' failed: /home/ci/actions-runner1/_work/zig/zig/lib/std/os/linux/io_uring.zig:60:23: 0x70b06b in init_params (test) .NOSYS => return error.SystemOutdated, ^ /home/ci/actions-runner1/_work/zig/zig/lib/std/os/linux/io_uring.zig:27:16: 0x70b6b7 in init (test) return try IO_Uring.init_params(entries, &params); ^ /home/ci/actions-runner1/_work/zig/zig/lib/std/os/linux/io_uring.zig:3807:16: 0x72405b in test.accept/connect/send_zc/recv (test) var ring = try IO_Uring.init(16, 0); https://github.com/ziglang/zig/actions/runs/6909813408/job/18801841015?pr=18025 --- lib/std/os/linux/io_uring.zig | 36 +++++++++++++++++++++++++++++------ 1 file changed, 30 insertions(+), 6 deletions(-) diff --git a/lib/std/os/linux/io_uring.zig b/lib/std/os/linux/io_uring.zig index cc34ad0319..6da25cf628 100644 --- a/lib/std/os/linux/io_uring.zig +++ b/lib/std/os/linux/io_uring.zig @@ -3804,7 +3804,11 @@ test "accept multishot" { test "accept/connect/send_zc/recv" { try skipKernelLessThan(.{ .major = 6, .minor = 0, .patch = 0 }); - var ring = try IO_Uring.init(16, 0); + var ring = IO_Uring.init(16, 0) catch |err| switch (err) { + error.SystemOutdated => return error.SkipZigTest, + error.PermissionDenied => return error.SkipZigTest, + else => return err, + }; defer ring.deinit(); const socket_test_harness = try createSocketTestHarness(&ring); @@ -3853,7 +3857,11 @@ test "accept/connect/send_zc/recv" { test "accept_direct" { try skipKernelLessThan(.{ .major = 5, .minor = 19, .patch 
= 0 }); - var ring = try IO_Uring.init(1, 0); + var ring = IO_Uring.init(1, 0) catch |err| switch (err) { + error.SystemOutdated => return error.SkipZigTest, + error.PermissionDenied => return error.SkipZigTest, + else => return err, + }; defer ring.deinit(); var address = try net.Address.parseIp4("127.0.0.1", 0); @@ -3929,7 +3937,11 @@ test "accept_direct" { test "accept_multishot_direct" { try skipKernelLessThan(.{ .major = 5, .minor = 19, .patch = 0 }); - var ring = try IO_Uring.init(1, 0); + var ring = IO_Uring.init(1, 0) catch |err| switch (err) { + error.SystemOutdated => return error.SkipZigTest, + error.PermissionDenied => return error.SkipZigTest, + else => return err, + }; defer ring.deinit(); var address = try net.Address.parseIp4("127.0.0.1", 0); @@ -3983,7 +3995,11 @@ test "accept_multishot_direct" { test "socket" { try skipKernelLessThan(.{ .major = 5, .minor = 19, .patch = 0 }); - var ring = try IO_Uring.init(2, 0); + var ring = IO_Uring.init(1, 0) catch |err| switch (err) { + error.SystemOutdated => return error.SkipZigTest, + error.PermissionDenied => return error.SkipZigTest, + else => return err, + }; defer ring.deinit(); // prepare, submit socket operation @@ -4002,7 +4018,11 @@ test "socket" { test "socket_direct/socket_direct_alloc/close_direct" { try skipKernelLessThan(.{ .major = 5, .minor = 19, .patch = 0 }); - var ring = try IO_Uring.init(2, 0); + var ring = IO_Uring.init(2, 0) catch |err| switch (err) { + error.SystemOutdated => return error.SkipZigTest, + error.PermissionDenied => return error.SkipZigTest, + else => return err, + }; defer ring.deinit(); var registered_fds = [_]os.fd_t{-1} ** 3; @@ -4076,7 +4096,11 @@ test "socket_direct/socket_direct_alloc/close_direct" { test "openat_direct/close_direct" { try skipKernelLessThan(.{ .major = 5, .minor = 19, .patch = 0 }); - var ring = try IO_Uring.init(2, 0); + var ring = IO_Uring.init(2, 0) catch |err| switch (err) { + error.SystemOutdated => return error.SkipZigTest, + 
error.PermissionDenied => return error.SkipZigTest, + else => return err, + }; defer ring.deinit(); var registered_fds = [_]os.fd_t{-1} ** 3;