diff --git a/lib/std/os/linux/io_uring.zig b/lib/std/os/linux/io_uring.zig index 50e3453bb8..ae8cb1518a 100644 --- a/lib/std/os/linux/io_uring.zig +++ b/lib/std/os/linux/io_uring.zig @@ -441,22 +441,75 @@ pub const IO_Uring = struct { sqe.user_data = user_data; return sqe; } - - /// Like `link_with_next_sqe()` but stronger. - /// For when you don't want the chain to fail in the event of a completion result error. - /// For example, you may know that some commands will fail and may want the chain to continue. - /// Hard links are resilient to completion results, but are not resilient to submission errors. - pub fn hardlink_with_next_sqe(self: *IO_Uring, sqe: *io_uring_sqe) void { - sqe.flags |= linux.IOSQE_IO_HARDLINK; + + /// Queue (but does not submit) an SQE to perform a `connect(2)` on a socket. + /// Returns a pointer to the SQE. + pub fn connect( + self: *IO_Uring, + user_data: u64, + fd: os.fd_t, + addr: *const os.sockaddr, + addrlen: os.socklen_t + ) !*io_uring_sqe { + const sqe = try self.get_sqe(); + io_uring_prep_connect(sqe, fd, addr, addrlen); + sqe.user_data = user_data; + return sqe; } - - /// This creates a full pipeline barrier in the submission queue. - /// This SQE will not be started until previous SQEs complete. - /// Subsequent SQEs will not be started until this SQE completes. - /// In other words, this stalls the entire submission queue. - /// You should first consider using link_with_next_sqe() for more granular SQE sequence control. - pub fn drain_previous_sqes(self: *IO_Uring, sqe: *io_uring_sqe) void { - sqe.flags |= linux.IOSQE_IO_DRAIN; + + /// Queues (but does not submit) an SQE to perform a `recv(2)`. + /// Returns a pointer to the SQE. + pub fn recv( + self: *IO_Uring, + user_data: u64, + fd: os.fd_t, + buffer: []u8, + flags: u32 + ) !*io_uring_sqe { + const sqe = try self.get_sqe(); + io_uring_prep_recv(sqe, fd, buffer, flags); + sqe.user_data = user_data; + return sqe; + } + + /// Queues (but does not submit) an SQE to perform a `send(2)`. + /// Returns a pointer to the SQE. + pub fn send( + self: *IO_Uring, + user_data: u64, + fd: os.fd_t, + buffer: []u8, + flags: u32 + ) !*io_uring_sqe { + const sqe = try self.get_sqe(); + io_uring_prep_send(sqe, fd, buffer, flags); + sqe.user_data = user_data; + return sqe; + } + + /// Queues (but does not submit) an SQE to perform an `openat(2)`. + /// Returns a pointer to the SQE. + pub fn openat( + self: *IO_Uring, + user_data: u64, + fd: os.fd_t, + path: [*:0]const u8, + flags: u32, + mode: os.mode_t + ) !*io_uring_sqe { + const sqe = try self.get_sqe(); + io_uring_prep_openat(sqe, fd, path, flags, mode); + sqe.user_data = user_data; + return sqe; + } + + /// Queues (but does not submit) an SQE to perform a `close(2)`. + /// Returns a pointer to the SQE. + pub fn close(self: *IO_Uring, user_data: u64, fd: os.fd_t) !*io_uring_sqe { + const sqe = try self.get_sqe(); + io_uring_prep_close(sqe, fd); + sqe.user_data = user_data; + return sqe; } /// Registers an array of file descriptors. @@ -1007,3 +1060,133 @@ test "write/read" { }, cqe_read); testing.expectEqualSlices(u8, buffer_write[0..], buffer_read[0..]); } + +test "openat/close" { + if (builtin.os.tag != .linux) return error.SkipZigTest; + + var ring = IO_Uring.init(1, 0) catch |err| switch (err) { + error.SystemOutdated => return error.SkipZigTest, + error.PermissionDenied => return error.SkipZigTest, + else => return err + }; + defer ring.deinit(); + + const path = "test_io_uring_openat_close"; + defer std.fs.cwd().deleteFile(path) catch {}; + + const flags: u32 = os.O_CLOEXEC | os.O_RDWR | os.O_CREAT; + const mode: os.mode_t = 0o666; + var sqe_openat = try ring.openat(789, linux.AT_FDCWD, path, flags, mode); + testing.expectEqual(io_uring_sqe { + .opcode = .OPENAT, + .flags = 0, + .ioprio = 0, + .fd = linux.AT_FDCWD, + .off = 0, + .addr = @ptrToInt(path), + .len = mode, + .rw_flags = flags, + .user_data = 789, + .buf_index = 0, + .personality = 0, + .splice_fd_in = 0, + .__pad2 = [2]u64{ 0, 0 } + }, sqe_openat.*); + testing.expectEqual(@as(u32, 1), try ring.submit()); + + var cqe_openat = try ring.copy_cqe(); + if (cqe_openat.res == -linux.EINVAL) return error.SkipZigTest; + testing.expectEqual(@as(u64, 789), cqe_openat.user_data); + testing.expect(cqe_openat.res > 0); + testing.expectEqual(@as(u32, 0), cqe_openat.flags); + + var sqe_close = try ring.close(1011, cqe_openat.res); + testing.expectEqual(linux.IORING_OP.CLOSE, sqe_close.opcode); + testing.expectEqual(cqe_openat.res, sqe_close.fd); + testing.expectEqual(@as(u32, 1), try ring.submit()); + + var cqe_close = try ring.copy_cqe(); + if (cqe_close.res == -linux.EINVAL) return error.SkipZigTest; + testing.expectEqual(linux.io_uring_cqe { + .user_data = 1011, + .res = 0, + .flags = 0, + }, cqe_close); +} + +test "accept/connect/send/recv" { + if (builtin.os.tag != .linux) return error.SkipZigTest; + + var ring = IO_Uring.init(16, 0) catch |err| switch (err) { + error.SystemOutdated => return error.SkipZigTest, + error.PermissionDenied => return error.SkipZigTest, + else => return err + }; + defer ring.deinit(); + + var address = try net.Address.parseIp4("127.0.0.1", 3131); + const kernel_backlog = 1; + const server = try os.socket(address.any.family, os.SOCK_STREAM | os.SOCK_CLOEXEC, 0); + defer os.close(server); + try os.setsockopt(server, os.SOL_SOCKET, os.SO_REUSEADDR, &mem.toBytes(@as(c_int, 1))); + try os.bind(server, &address.any, address.getOsSockLen()); + try os.listen(server, kernel_backlog); + + var buffer_send = [_]u8{1,0,1,0,1,0,1,0,1,0}; + var buffer_recv = [_]u8{0,1,0,1,0}; + + var accept_addr: os.sockaddr = undefined; + var accept_addr_len: os.socklen_t = @sizeOf(@TypeOf(accept_addr)); + var accept = try ring.accept(0xaaaaaaaa, server, &accept_addr, &accept_addr_len, 0); + testing.expectEqual(@as(u32, 1), try ring.submit()); + + const client = try os.socket(address.any.family, os.SOCK_STREAM | os.SOCK_CLOEXEC, 0); + defer os.close(client); + var connect = try ring.connect(0xcccccccc, client, &address.any, address.getOsSockLen()); + testing.expectEqual(@as(u32, 1), try ring.submit()); + + var cqe_accept = try ring.copy_cqe(); + if (cqe_accept.res == -linux.EINVAL) return error.SkipZigTest; + var cqe_connect = try ring.copy_cqe(); + if (cqe_connect.res == -linux.EINVAL) return error.SkipZigTest; + + // The accept/connect CQEs may arrive in any order, the connect CQE will sometimes come first: + if (cqe_accept.user_data == 0xcccccccc and cqe_connect.user_data == 0xaaaaaaaa) { + var a = cqe_accept; + var b = cqe_connect; + cqe_accept = b; + cqe_connect = a; + } + + testing.expectEqual(@as(u64, 0xaaaaaaaa), cqe_accept.user_data); + testing.expect(cqe_accept.res > 0); + testing.expectEqual(@as(u32, 0), cqe_accept.flags); + testing.expectEqual(linux.io_uring_cqe { + .user_data = 0xcccccccc, + .res = 0, + .flags = 0, + }, cqe_connect); + + var send = try ring.send(0xeeeeeeee, client, buffer_send[0..], 0); + send.flags |= linux.IOSQE_IO_LINK; + var recv = try ring.recv(0xffffffff, cqe_accept.res, buffer_recv[0..], 0); + testing.expectEqual(@as(u32, 2), try ring.submit()); + + var cqe_send = try ring.copy_cqe(); + if (cqe_send.res == -linux.EINVAL) return error.SkipZigTest; + testing.expectEqual(linux.io_uring_cqe { + .user_data = 0xeeeeeeee, + .res = buffer_send.len, + .flags = 0, + }, cqe_send); + + var cqe_recv = try ring.copy_cqe(); + if (cqe_recv.res == -linux.EINVAL) return error.SkipZigTest; + testing.expectEqual(linux.io_uring_cqe { + .user_data = 0xffffffff, + .res = buffer_recv.len, + .flags = 0, + }, cqe_recv); + + testing.expectEqualSlices(u8, buffer_send[0..buffer_recv.len], buffer_recv[0..]); +}