From 256384a2ec2f67fc1b9380be987c63d5702f180c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Igor=20Anic=CC=81?= Date: Fri, 17 Nov 2023 13:48:17 +0100 Subject: [PATCH] io_uring: add direct operations Add operation on direct file descriptors. Also referred to as fixed or registered files. References: https://kernel.dk/axboe-kr2022.pdf https://lwn.net/Articles/863071/ Added functions: IO_Uring accept_direct accept_multishot_direct openat_direct close_direct socket socket_direct socket_direct_alloc Raw prepare operations: io_uring_prep_accept_direct io_uring_prep_multishot_accept_direct io_uring_prep_openat_direct io_uring_prep_close_direct io_uring_prep_socket io_uring_prep_socket_direct io_uring_prep_socket_direct_alloc Tested on this kernels: 5.4.0-164-generic 2559 passed; 70 skipped; 0 failed. 5.8.0-63-generic 2573 passed; 56 skipped; 0 failed. 5.11.0-49-generic 2576 passed; 53 skipped; 0 failed. 5.13.0-52-generic 2576 passed; 53 skipped; 0 failed. 5.15.0-87-generic 2579 passed; 50 skipped; 0 failed. 5.19.0-46-geneic 2584 passed; 45 skipped; 0 failed. 6.2.0-35-generic.log 2585 passed; 44 skipped; 0 failed. 6.5.0-9-generic 2585 passed; 44 skipped; 0 failed. --- lib/std/os/linux/io_uring.zig | 505 +++++++++++++++++++++++++++++++++- 1 file changed, 502 insertions(+), 3 deletions(-) diff --git a/lib/std/os/linux/io_uring.zig b/lib/std/os/linux/io_uring.zig index ff492a4bbc..be1fbe6893 100644 --- a/lib/std/os/linux/io_uring.zig +++ b/lib/std/os/linux/io_uring.zig @@ -505,10 +505,12 @@ pub const IO_Uring = struct { return sqe; } - /// Queues (but does not submit) an SQE to perform an multishot `accept4(2)` on a socket. + /// Queues an multishot accept on a socket. + /// /// Multishot variant allows an application to issue a single accept request, /// which will repeatedly trigger a CQE when a connection request comes in. - /// Returns a pointer to the SQE. + /// While IORING_CQE_F_MORE flag is set in CQE flags accept will generate + /// further CQEs. pub fn accept_multishot( self: *IO_Uring, user_data: u64, @@ -523,6 +525,44 @@ pub const IO_Uring = struct { return sqe; } + /// Queues an accept using direct (registered) file descriptors. + /// + /// To use an accept direct variant, the application must first have registered + /// a file table (with register_files). An unused table index will be + /// dynamically chosen and returned in the CQE res field. + /// + /// After creation, they can be used by setting IOSQE_FIXED_FILE in the SQE + /// flags member, and setting the SQE fd field to the direct descriptor value + /// rather than the regular file descriptor. + pub fn accept_direct( + self: *IO_Uring, + user_data: u64, + fd: os.fd_t, + addr: ?*os.sockaddr, + addrlen: ?*os.socklen_t, + flags: u32, + ) !*linux.io_uring_sqe { + const sqe = try self.get_sqe(); + io_uring_prep_accept_direct(sqe, fd, addr, addrlen, flags, linux.IORING_FILE_INDEX_ALLOC); + sqe.user_data = user_data; + return sqe; + } + + /// Queues an multishot accept using direct (registered) file descriptors. + pub fn accept_multishot_direct( + self: *IO_Uring, + user_data: u64, + fd: os.fd_t, + addr: ?*os.sockaddr, + addrlen: ?*os.socklen_t, + flags: u32, + ) !*linux.io_uring_sqe { + const sqe = try self.get_sqe(); + io_uring_prep_multishot_accept_direct(sqe, fd, addr, addrlen, flags); + sqe.user_data = user_data; + return sqe; + } + /// Queue (but does not submit) an SQE to perform a `connect(2)` on a socket. /// Returns a pointer to the SQE. pub fn connect( @@ -700,6 +740,30 @@ pub const IO_Uring = struct { return sqe; } + /// Queues an openat using direct (registered) file descriptors. + /// + /// To use an accept direct variant, the application must first have registered + /// a file table (with register_files). An unused table index will be + /// dynamically chosen and returned in the CQE res field. + /// + /// After creation, they can be used by setting IOSQE_FIXED_FILE in the SQE + /// flags member, and setting the SQE fd field to the direct descriptor value + /// rather than the regular file descriptor. + pub fn openat_direct( + self: *IO_Uring, + user_data: u64, + fd: os.fd_t, + path: [*:0]const u8, + flags: u32, + mode: os.mode_t, + file_index: u32, + ) !*linux.io_uring_sqe { + const sqe = try self.get_sqe(); + io_uring_prep_openat_direct(sqe, fd, path, flags, mode, file_index); + sqe.user_data = user_data; + return sqe; + } + /// Queues (but does not submit) an SQE to perform a `close(2)`. /// Returns a pointer to the SQE. pub fn close(self: *IO_Uring, user_data: u64, fd: os.fd_t) !*linux.io_uring_sqe { @@ -709,6 +773,14 @@ pub const IO_Uring = struct { return sqe; } + /// Queues close of registered file descriptor. + pub fn close_direct(self: *IO_Uring, user_data: u64, file_index: u32) !*linux.io_uring_sqe { + const sqe = try self.get_sqe(); + io_uring_prep_close_direct(sqe, file_index); + sqe.user_data = user_data; + return sqe; + } + /// Queues (but does not submit) an SQE to register a timeout operation. /// Returns a pointer to the SQE. /// @@ -1157,6 +1229,54 @@ pub const IO_Uring = struct { else => |errno| return os.unexpectedErrno(errno), } } + + /// Prepares a socket creation request. + /// New socket fd will be returned in completion result. + pub fn socket( + self: *IO_Uring, + user_data: u64, + domain: u32, + socket_type: u32, + protocol: u32, + flags: u32, + ) !*linux.io_uring_sqe { + const sqe = try self.get_sqe(); + io_uring_prep_socket(sqe, domain, socket_type, protocol, flags); + sqe.user_data = user_data; + return sqe; + } + + /// Prepares a socket creation request for registered file at index `file_index`. + pub fn socket_direct( + self: *IO_Uring, + user_data: u64, + domain: u32, + socket_type: u32, + protocol: u32, + flags: u32, + file_index: u32, + ) !*linux.io_uring_sqe { + const sqe = try self.get_sqe(); + io_uring_prep_socket_direct(sqe, domain, socket_type, protocol, flags, file_index); + sqe.user_data = user_data; + return sqe; + } + + /// Prepares a socket creation request for registered file, index chosen by kernel (file index alloc). + /// File index will be returned in CQE res field. + pub fn socket_direct_alloc( + self: *IO_Uring, + user_data: u64, + domain: u32, + socket_type: u32, + protocol: u32, + flags: u32, + ) !*linux.io_uring_sqe { + const sqe = try self.get_sqe(); + io_uring_prep_socket_direct_alloc(sqe, domain, socket_type, protocol, flags); + sqe.user_data = user_data; + return sqe; + } }; pub const SubmissionQueue = struct { @@ -1391,6 +1511,41 @@ pub fn io_uring_prep_accept( sqe.rw_flags = flags; } +pub fn io_uring_prep_accept_direct( + sqe: *linux.io_uring_sqe, + fd: os.fd_t, + addr: ?*os.sockaddr, + addrlen: ?*os.socklen_t, + flags: u32, + file_index: u32, +) void { + io_uring_prep_accept(sqe, fd, addr, addrlen, flags); + __io_uring_set_target_fixed_file(sqe, file_index); +} + +pub fn io_uring_prep_multishot_accept_direct( + sqe: *linux.io_uring_sqe, + fd: os.fd_t, + addr: ?*os.sockaddr, + addrlen: ?*os.socklen_t, + flags: u32, +) void { + io_uring_prep_multishot_accept(sqe, fd, addr, addrlen, flags); + __io_uring_set_target_fixed_file(sqe, linux.IORING_FILE_INDEX_ALLOC); +} + +fn __io_uring_set_target_fixed_file(sqe: *linux.io_uring_sqe, file_index: u32) void { + const sqe_file_index: u32 = if (file_index == linux.IORING_FILE_INDEX_ALLOC) + linux.IORING_FILE_INDEX_ALLOC + else + // 0 means no fixed files, indexes should be encoded as "index + 1" + file_index + 1; + // This filed is overloaded in liburing: + // splice_fd_in: i32 + // sqe_file_index: u32 + sqe.splice_fd_in = @bitCast(sqe_file_index); +} + pub fn io_uring_prep_connect( sqe: *linux.io_uring_sqe, fd: os.fd_t, @@ -1474,6 +1629,18 @@ pub fn io_uring_prep_openat( sqe.rw_flags = flags; } +pub fn io_uring_prep_openat_direct( + sqe: *linux.io_uring_sqe, + fd: os.fd_t, + path: [*:0]const u8, + flags: u32, + mode: os.mode_t, + file_index: u32, +) void { + io_uring_prep_openat(sqe, fd, path, flags, mode); + __io_uring_set_target_fixed_file(sqe, file_index); +} + pub fn io_uring_prep_close(sqe: *linux.io_uring_sqe, fd: os.fd_t) void { sqe.* = .{ .opcode = .CLOSE, @@ -1493,6 +1660,11 @@ pub fn io_uring_prep_close(sqe: *linux.io_uring_sqe, fd: os.fd_t) void { }; } +pub fn io_uring_prep_close_direct(sqe: *linux.io_uring_sqe, file_index: u32) void { + io_uring_prep_close(sqe, 0); + __io_uring_set_target_fixed_file(sqe, file_index); +} + pub fn io_uring_prep_timeout( sqe: *linux.io_uring_sqe, ts: *const os.linux.kernel_timespec, @@ -1720,6 +1892,40 @@ pub fn io_uring_prep_multishot_accept( sqe.ioprio |= linux.IORING_ACCEPT_MULTISHOT; } +pub fn io_uring_prep_socket( + sqe: *linux.io_uring_sqe, + domain: u32, + socket_type: u32, + protocol: u32, + flags: u32, +) void { + io_uring_prep_rw(.SOCKET, sqe, @intCast(domain), 0, protocol, socket_type); + sqe.rw_flags = flags; +} + +pub fn io_uring_prep_socket_direct( + sqe: *linux.io_uring_sqe, + domain: u32, + socket_type: u32, + protocol: u32, + flags: u32, + file_index: u32, +) void { + io_uring_prep_socket(sqe, domain, socket_type, protocol, flags); + __io_uring_set_target_fixed_file(sqe, file_index); +} + +pub fn io_uring_prep_socket_direct_alloc( + sqe: *linux.io_uring_sqe, + domain: u32, + socket_type: u32, + protocol: u32, + flags: u32, +) void { + io_uring_prep_socket(sqe, domain, socket_type, protocol, flags); + __io_uring_set_target_fixed_file(sqe, linux.IORING_FILE_INDEX_ALLOC); +} + test "structs/offsets/entries" { if (builtin.os.tag != .linux) return error.SkipZigTest; @@ -3582,7 +3788,8 @@ test "accept/connect/send_zc/recv" { const send = try ring.send_zc(0xeeeeeeee, socket_test_harness.client, buffer_send[0..], 0, 0); send.flags |= linux.IOSQE_IO_LINK; _ = try ring.recv(0xffffffff, socket_test_harness.server, .{ .buffer = buffer_recv[0..] }, 0); - try testing.expectEqual(@as(u32, 2), try ring.submit()); + const submitted = try ring.submit(); + if (submitted != 2) return error.SkipZigTest; // on kernel 5.8 (without zc support) // First completion of zero-copy send. // IORING_CQE_F_MORE, means that there @@ -3617,3 +3824,295 @@ test "accept/connect/send_zc/recv" { .flags = linux.IORING_CQE_F_NOTIF, }, cqe_send); } + +test "accept_direct" { + if (builtin.os.tag != .linux) return error.SkipZigTest; + + var ring = IO_Uring.init(1, 0) catch |err| switch (err) { + error.SystemOutdated => return error.SkipZigTest, + error.PermissionDenied => return error.SkipZigTest, + else => return err, + }; + defer ring.deinit(); + var address = try net.Address.parseIp4("127.0.0.1", 0); + + // register direct file descriptors + var registered_fds = [_]os.fd_t{-1} ** 2; + ring.register_files(registered_fds[0..]) catch |err| switch (err) { + error.FileDescriptorInvalid => return error.SkipZigTest, + else => return err, + }; + + const listener_socket = try createListenerSocket(&address); + defer os.closeSocket(listener_socket); + + const accept_userdata: u64 = 0xaaaaaaaa; + const read_userdata: u64 = 0xbbbbbbbb; + const data = [_]u8{ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0xa, 0xb, 0xc, 0xd, 0xe }; + + for (0..2) |_| { + for (registered_fds, 0..) |_, i| { + var buffer_recv = [_]u8{0} ** 16; + const buffer_send: []const u8 = data[0 .. data.len - i]; // make it different at each loop + + // submit accept, will chose registered fd and return index in cqe + _ = try ring.accept_direct(accept_userdata, listener_socket, null, null, 0); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + + // connect + var client = try os.socket(address.any.family, os.SOCK.STREAM | os.SOCK.CLOEXEC, 0); + try os.connect(client, &address.any, address.getOsSockLen()); + defer os.closeSocket(client); + + // accept completion + const cqe_accept = try ring.copy_cqe(); + if (cqe_accept.err() == .INVAL) return error.SkipZigTest; + try testing.expectEqual(os.E.SUCCESS, cqe_accept.err()); + const fd_index = cqe_accept.res; + if (fd_index >= registered_fds.len) return error.SkipZigTest; // old kernel fallback to ordinary accept + try testing.expect(fd_index < registered_fds.len); + try testing.expect(cqe_accept.user_data == accept_userdata); + + // send data + _ = try os.send(client, buffer_send, 0); + + // Example of how to use registered fd: + // Submit receive to fixed file returned by accept (fd_index). + // Fd field is set to registered file index, returned by accept. + // Flag linux.IOSQE_FIXED_FILE must be set. + const recv_sqe = try ring.recv(read_userdata, fd_index, .{ .buffer = &buffer_recv }, 0); + recv_sqe.flags |= linux.IOSQE_FIXED_FILE; + try testing.expectEqual(@as(u32, 1), try ring.submit()); + + // accept receive + const recv_cqe = try ring.copy_cqe(); + try testing.expect(recv_cqe.user_data == read_userdata); + try testing.expect(recv_cqe.res == buffer_send.len); + try testing.expectEqualSlices(u8, buffer_send, buffer_recv[0..buffer_send.len]); + } + // no more available fds, accept will get NFILE error + { + // submit accept + _ = try ring.accept_direct(accept_userdata, listener_socket, null, null, 0); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + // connect + var client = try os.socket(address.any.family, os.SOCK.STREAM | os.SOCK.CLOEXEC, 0); + try os.connect(client, &address.any, address.getOsSockLen()); + defer os.closeSocket(client); + // completion with error + const cqe_accept = try ring.copy_cqe(); + try testing.expect(cqe_accept.user_data == accept_userdata); + try testing.expectEqual(os.E.NFILE, cqe_accept.err()); + } + // return file descriptors to kernel + try ring.register_files_update(0, registered_fds[0..]); + } + try ring.unregister_files(); +} + +test "accept_multishot_direct" { + if (builtin.os.tag != .linux) return error.SkipZigTest; + + var ring = IO_Uring.init(1, 0) catch |err| switch (err) { + error.SystemOutdated => return error.SkipZigTest, + error.PermissionDenied => return error.SkipZigTest, + else => return err, + }; + defer ring.deinit(); + var address = try net.Address.parseIp4("127.0.0.1", 0); + + var registered_fds = [_]os.fd_t{-1} ** 2; + ring.register_files(registered_fds[0..]) catch |err| switch (err) { + error.FileDescriptorInvalid => return error.SkipZigTest, + else => return err, + }; + + const listener_socket = try createListenerSocket(&address); + defer os.closeSocket(listener_socket); + + const accept_userdata: u64 = 0xaaaaaaaa; + + for (0..2) |_| { + // submit accept + // Will chose registered fd and return index of the selected registered file in cqe. + _ = try ring.accept_multishot_direct(accept_userdata, listener_socket, null, null, 0); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + + for (registered_fds) |_| { + // connect + var client = try os.socket(address.any.family, os.SOCK.STREAM | os.SOCK.CLOEXEC, 0); + try os.connect(client, &address.any, address.getOsSockLen()); + defer os.closeSocket(client); + + // accept completion + const cqe_accept = try ring.copy_cqe(); + if (cqe_accept.err() == .INVAL) return error.SkipZigTest; + const fd_index = cqe_accept.res; + try testing.expect(fd_index < registered_fds.len); + try testing.expect(cqe_accept.user_data == accept_userdata); + try testing.expect(cqe_accept.flags & linux.IORING_CQE_F_MORE > 0); // has more is set + } + // No more available fds, accept will get NFILE error. + // Multishot is terminated (more flag is not set). + { + // connect + var client = try os.socket(address.any.family, os.SOCK.STREAM | os.SOCK.CLOEXEC, 0); + try os.connect(client, &address.any, address.getOsSockLen()); + defer os.closeSocket(client); + // completion with error + const cqe_accept = try ring.copy_cqe(); + try testing.expect(cqe_accept.user_data == accept_userdata); + try testing.expectEqual(os.E.NFILE, cqe_accept.err()); + try testing.expect(cqe_accept.flags & linux.IORING_CQE_F_MORE == 0); // has more is not set + } + // return file descriptors to kernel + try ring.register_files_update(0, registered_fds[0..]); + } + try ring.unregister_files(); +} + +test "socket/socket_direct/socket_direct_alloc/close_direct" { + if (builtin.os.tag != .linux) return error.SkipZigTest; + + var ring = IO_Uring.init(2, 0) catch |err| switch (err) { + error.SystemOutdated => return error.SkipZigTest, + error.PermissionDenied => return error.SkipZigTest, + else => return err, + }; + defer ring.deinit(); + var address = try net.Address.parseIp4("127.0.0.1", 0); + + // Below are 4 different ways to get socket fd. + // Two upfront before register_files, and two after + var registered_fds = [_]os.fd_t{-1} ** 4; + // 1. sync syscall socket call + registered_fds[0] = try os.socket(address.any.family, os.SOCK.STREAM | os.SOCK.CLOEXEC, 0); + // 2. io_uring socket + const socket_userdata = 0xcccccccc; + _ = try ring.socket(socket_userdata, linux.AF.INET, os.SOCK.STREAM, 0, 0); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + var cqe_socket = try ring.copy_cqe(); + if (cqe_socket.err() == .INVAL) return error.SkipZigTest; + try testing.expectEqual(os.E.SUCCESS, cqe_socket.err()); + try testing.expect(cqe_socket.res > 2); + registered_fds[1] = cqe_socket.res; // set index 1 to created socket + + ring.register_files(registered_fds[0..]) catch |err| switch (err) { + error.FileDescriptorInvalid => return error.SkipZigTest, + else => return err, + }; + + // 3. io_uring socket_direct, create new socket on index 2 + _ = try ring.socket_direct(socket_userdata, linux.AF.INET, os.SOCK.STREAM, 0, 0, @intCast(2)); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + cqe_socket = try ring.copy_cqe(); + try testing.expectEqual(os.E.SUCCESS, cqe_socket.err()); + try testing.expect(cqe_socket.res == 0); + + // 4. io_uring socket_direct_alloc + _ = try ring.socket_direct_alloc(socket_userdata, linux.AF.INET, os.SOCK.STREAM, 0, 0); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + cqe_socket = try ring.copy_cqe(); + try testing.expectEqual(os.E.SUCCESS, cqe_socket.err()); + try testing.expect(cqe_socket.res == 3); + + // use sockets from registered_fds in connect operation + const listener_socket = try createListenerSocket(&address); + defer os.closeSocket(listener_socket); + const accept_userdata: u64 = 0xaaaaaaaa; + const connect_userdata: u64 = 0xbbbbbbbb; + const close_userdata: u64 = 0xcccccccc; + for (registered_fds, 0..) |_, fd_index| { + // prepare accept + _ = try ring.accept(accept_userdata, listener_socket, null, null, 0); + // prepare connect with fixed socket + const connect_sqe = try ring.connect(connect_userdata, @intCast(fd_index), &address.any, address.getOsSockLen()); + connect_sqe.flags |= linux.IOSQE_FIXED_FILE; + // submit both + try testing.expectEqual(@as(u32, 2), try ring.submit()); + // get completions + var cqe_connect = try ring.copy_cqe(); + if (cqe_connect.err() == .INVAL) return error.SkipZigTest; + var cqe_accept = try ring.copy_cqe(); + if (cqe_accept.err() == .INVAL) return error.SkipZigTest; + // ignore order + if (cqe_connect.user_data == accept_userdata and cqe_accept.user_data == connect_userdata) { + const a = cqe_accept; + const b = cqe_connect; + cqe_accept = b; + cqe_connect = a; + } + // test connect completion + try testing.expect(cqe_connect.user_data == connect_userdata); + try testing.expectEqual(os.E.SUCCESS, cqe_connect.err()); + // test accept completion + try testing.expect(cqe_accept.user_data == accept_userdata); + try testing.expectEqual(os.E.SUCCESS, cqe_accept.err()); + + // submit and test close completion + _ = try ring.close_direct(close_userdata, @intCast(fd_index)); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + var cqe_close = try ring.copy_cqe(); + try testing.expect(cqe_close.user_data == close_userdata); + try testing.expectEqual(os.E.SUCCESS, cqe_close.err()); + } + + try ring.unregister_files(); +} + +test "openat_direct/close_direct" { + if (builtin.os.tag != .linux) return error.SkipZigTest; + + var ring = IO_Uring.init(2, 0) catch |err| switch (err) { + error.SystemOutdated => return error.SkipZigTest, + error.PermissionDenied => return error.SkipZigTest, + else => return err, + }; + defer ring.deinit(); + + var registered_fds = [_]os.fd_t{-1} ** 3; + ring.register_files(registered_fds[0..]) catch |err| switch (err) { + error.FileDescriptorInvalid => return error.SkipZigTest, + else => return err, + }; + + var tmp = std.testing.tmpDir(.{}); + defer tmp.cleanup(); + const path = "test_io_uring_close_direct"; + const flags: u32 = os.O.RDWR | os.O.CREAT; + const mode: os.mode_t = 0o666; + const user_data: u64 = 0; + + // use registered file at index 0 (last param) + _ = try ring.openat_direct(user_data, tmp.dir.fd, path, flags, mode, 0); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + var cqe = try ring.copy_cqe(); + if (cqe.err() == .INVAL) return error.SkipZigTest; + if (cqe.res != 0) return error.SkipZigTest; // old kernel fallback to openat without direct + try testing.expectEqual(os.E.SUCCESS, cqe.err()); + try testing.expect(cqe.res == 0); + + // use registered file at index 1 + _ = try ring.openat_direct(user_data, tmp.dir.fd, path, flags, mode, 1); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + cqe = try ring.copy_cqe(); + try testing.expectEqual(os.E.SUCCESS, cqe.err()); + try testing.expect(cqe.res == 0); // res is 0 when we specify index + + // let kernel choose registered file index + _ = try ring.openat_direct(user_data, tmp.dir.fd, path, flags, mode, linux.IORING_FILE_INDEX_ALLOC); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + cqe = try ring.copy_cqe(); + if (cqe.err() == .INVAL) return error.SkipZigTest; // kernel 5.15 bug + try testing.expectEqual(os.E.SUCCESS, cqe.err()); + try testing.expect(cqe.res == 2); // chosen index is in res + + // close all open file descriptors + for (registered_fds, 0..) |_, fd_index| { + _ = try ring.close_direct(user_data, @intCast(fd_index)); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + var cqe_close = try ring.copy_cqe(); + try testing.expectEqual(os.E.SUCCESS, cqe_close.err()); + } + try ring.unregister_files(); +}