diff --git a/lib/std/os/linux/io_uring.zig b/lib/std/os/linux/io_uring.zig index be8b442611..31c416c8a1 100644 --- a/lib/std/os/linux/io_uring.zig +++ b/lib/std/os/linux/io_uring.zig @@ -358,17 +358,46 @@ pub const IO_Uring = struct { return sqe; } - /// Queues (but does not submit) an SQE to perform a `read(2)`. + /// Used to select how the read should be handled. + pub const ReadBuffer = union(enum) { + /// io_uring will read directly into this buffer + buffer: []u8, + + /// io_uring will read directly into these buffers using readv. + iovecs: []const os.iovec, + + /// io_uring will select a buffer that has previously been provided with `provide_buffers`. + /// The buffer group reference by `group_id` must contain at least one buffer for the read to work. + /// `len` controls the number of bytes to read into the selected buffer. + buffer_selection: struct { + group_id: u16, + len: usize, + }, + }; + + /// Queues (but does not submit) an SQE to perform a `read(2)` or `preadv` depending on the buffer type. + /// * Reading into a `ReadBuffer.buffer` uses `read(2)` + /// * Reading into a `ReadBuffer.iovecs` uses `preadv(2)` + /// If you want to do a `preadv2()` then set `rw_flags` on the returned SQE. See https://linux.die.net/man/2/preadv. + /// /// Returns a pointer to the SQE. pub fn read( self: *IO_Uring, user_data: u64, fd: os.fd_t, - buffer: []u8, + buffer: ReadBuffer, offset: u64, ) !*io_uring_sqe { const sqe = try self.get_sqe(); - io_uring_prep_read(sqe, fd, buffer, offset); + switch (buffer) { + .buffer => |slice| io_uring_prep_read(sqe, fd, slice, offset), + .iovecs => |vecs| io_uring_prep_readv(sqe, fd, vecs, offset), + .buffer_selection => |selection| { + io_uring_prep_rw(.READ, sqe, fd, 0, selection.len, offset); + sqe.flags |= linux.IOSQE_BUFFER_SELECT; + sqe.buf_index = selection.group_id; + }, + } sqe.user_data = user_data; return sqe; } @@ -388,23 +417,6 @@ pub const IO_Uring = struct { return sqe; } - /// Queues (but does not submit) an SQE to perform a `preadv()`. - /// Returns a pointer to the SQE so that you can further modify the SQE for advanced use cases. - /// For example, if you want to do a `preadv2()` then set `rw_flags` on the returned SQE. - /// See https://linux.die.net/man/2/preadv. - pub fn readv( - self: *IO_Uring, - user_data: u64, - fd: os.fd_t, - iovecs: []const os.iovec, - offset: u64, - ) !*io_uring_sqe { - const sqe = try self.get_sqe(); - io_uring_prep_readv(sqe, fd, iovecs, offset); - sqe.user_data = user_data; - return sqe; - } - /// Queues (but does not submit) an SQE to perform a IORING_OP_READ_FIXED. /// The `buffer` provided must be registered with the kernel by calling `register_buffers` first. /// The `buffer_index` must be the same as its index in the array provided to `register_buffers`. @@ -507,17 +519,39 @@ pub const IO_Uring = struct { return sqe; } + /// Used to select how the recv call should be handled. + pub const RecvBuffer = union(enum) { + /// io_uring will recv directly into this buffer + buffer: []u8, + + /// io_uring will select a buffer that has previously been provided with `provide_buffers`. + /// The buffer group referenced by `group_id` must contain at least one buffer for the recv call to work. + /// `len` controls the number of bytes to read into the selected buffer. + buffer_selection: struct { + group_id: u16, + len: usize, + }, + }; + /// Queues (but does not submit) an SQE to perform a `recv(2)`. /// Returns a pointer to the SQE. pub fn recv( self: *IO_Uring, user_data: u64, fd: os.fd_t, - buffer: []u8, + buffer: RecvBuffer, flags: u32, ) !*io_uring_sqe { const sqe = try self.get_sqe(); - io_uring_prep_recv(sqe, fd, buffer, flags); + switch (buffer) { + .buffer => |slice| io_uring_prep_recv(sqe, fd, slice, flags), + .buffer_selection => |selection| { + io_uring_prep_rw(.RECV, sqe, fd, 0, selection.len, 0); + sqe.rw_flags = flags; + sqe.flags |= linux.IOSQE_BUFFER_SELECT; + sqe.buf_index = selection.group_id; + }, + } sqe.user_data = user_data; return sqe; } @@ -857,6 +891,41 @@ pub const IO_Uring = struct { return sqe; } + /// Queues (but does not submit) an SQE to provide a group of buffers used for commands that read/receive data. + /// Returns a pointer to the SQE. + /// + /// Provided buffers can be used in `read`, `recv` or `recvmsg` commands via .buffer_selection. + /// + /// The kernel expects a contiguous block of memory of size (buffers_count * buffer_size). + pub fn provide_buffers( + self: *IO_Uring, + user_data: u64, + buffers: [*]u8, + buffers_count: usize, + buffer_size: usize, + group_id: usize, + buffer_id: usize, + ) !*io_uring_sqe { + const sqe = try self.get_sqe(); + io_uring_prep_provide_buffers(sqe, buffers, buffers_count, buffer_size, group_id, buffer_id); + sqe.user_data = user_data; + return sqe; + } + + /// Queues (but does not submit) an SQE to remove a group of provided buffers. + /// Returns a pointer to the SQE. + pub fn remove_buffers( + self: *IO_Uring, + user_data: u64, + buffers_count: usize, + group_id: usize, + ) !*io_uring_sqe { + const sqe = try self.get_sqe(); + io_uring_prep_remove_buffers(sqe, buffers_count, group_id); + sqe.user_data = user_data; + return sqe; + } + /// Registers an array of file descriptors. /// Every time a file descriptor is put in an SQE and submitted to the kernel, the kernel must /// retrieve a reference to the file, and once I/O has completed the file reference must be @@ -1508,6 +1577,28 @@ pub fn io_uring_prep_linkat( sqe.rw_flags = flags; } +pub fn io_uring_prep_provide_buffers( + sqe: *io_uring_sqe, + buffers: [*]u8, + num: usize, + buffer_len: usize, + group_id: usize, + buffer_id: usize, +) void { + const ptr = @ptrToInt(buffers); + io_uring_prep_rw(.PROVIDE_BUFFERS, sqe, @intCast(i32, num), ptr, buffer_len, buffer_id); + sqe.buf_index = @intCast(u16, group_id); +} + +pub fn io_uring_prep_remove_buffers( + sqe: *io_uring_sqe, + num: usize, + group_id: usize, +) void { + io_uring_prep_rw(.REMOVE_BUFFERS, sqe, @intCast(i32, num), 0, 0, 0); + sqe.buf_index = @intCast(u16, group_id); +} + test "structs/offsets/entries" { if (builtin.os.tag != .linux) return error.SkipZigTest; @@ -1615,7 +1706,7 @@ test "readv" { var buffer = [_]u8{42} ** 128; var iovecs = [_]os.iovec{os.iovec{ .iov_base = &buffer, .iov_len = buffer.len }}; - const sqe = try ring.readv(0xcccccccc, fd_index, iovecs[0..], 0); + const sqe = try ring.read(0xcccccccc, fd_index, .{ .iovecs = iovecs[0..] }, 0); try testing.expectEqual(linux.IORING_OP.READV, sqe.opcode); sqe.flags |= linux.IOSQE_FIXED_FILE; @@ -1666,7 +1757,7 @@ test "writev/fsync/readv" { try testing.expectEqual(fd, sqe_fsync.fd); sqe_fsync.flags |= linux.IOSQE_IO_LINK; - const sqe_readv = try ring.readv(0xffffffff, fd, iovecs_read[0..], 17); + const sqe_readv = try ring.read(0xffffffff, fd, .{ .iovecs = iovecs_read[0..] }, 17); try testing.expectEqual(linux.IORING_OP.READV, sqe_readv.opcode); try testing.expectEqual(@as(u64, 17), sqe_readv.off); @@ -1721,7 +1812,7 @@ test "write/read" { try testing.expectEqual(linux.IORING_OP.WRITE, sqe_write.opcode); try testing.expectEqual(@as(u64, 10), sqe_write.off); sqe_write.flags |= linux.IOSQE_IO_LINK; - const sqe_read = try ring.read(0x22222222, fd, buffer_read[0..], 10); + const sqe_read = try ring.read(0x22222222, fd, .{ .buffer = buffer_read[0..] }, 10); try testing.expectEqual(linux.IORING_OP.READ, sqe_read.opcode); try testing.expectEqual(@as(u64, 10), sqe_read.off); try testing.expectEqual(@as(u32, 2), try ring.submit()); @@ -1890,53 +1981,15 @@ test "accept/connect/send/recv" { }; defer ring.deinit(); - const address = try net.Address.parseIp4("127.0.0.1", 3131); - const kernel_backlog = 1; - const server = try os.socket(address.any.family, os.SOCK.STREAM | os.SOCK.CLOEXEC, 0); - defer os.close(server); - try os.setsockopt(server, os.SOL.SOCKET, os.SO.REUSEADDR, &mem.toBytes(@as(c_int, 1))); - try os.bind(server, &address.any, address.getOsSockLen()); - try os.listen(server, kernel_backlog); + const socket_test_harness = try createSocketTestHarness(&ring); + defer socket_test_harness.close(); const buffer_send = [_]u8{ 1, 0, 1, 0, 1, 0, 1, 0, 1, 0 }; var buffer_recv = [_]u8{ 0, 1, 0, 1, 0 }; - var accept_addr: os.sockaddr = undefined; - var accept_addr_len: os.socklen_t = @sizeOf(@TypeOf(accept_addr)); - _ = try ring.accept(0xaaaaaaaa, server, &accept_addr, &accept_addr_len, 0); - try testing.expectEqual(@as(u32, 1), try ring.submit()); - - const client = try os.socket(address.any.family, os.SOCK.STREAM | os.SOCK.CLOEXEC, 0); - defer os.close(client); - _ = try ring.connect(0xcccccccc, client, &address.any, address.getOsSockLen()); - try testing.expectEqual(@as(u32, 1), try ring.submit()); - - var cqe_accept = try ring.copy_cqe(); - if (cqe_accept.err() == .INVAL) return error.SkipZigTest; - var cqe_connect = try ring.copy_cqe(); - if (cqe_connect.err() == .INVAL) return error.SkipZigTest; - - // The accept/connect CQEs may arrive in any order, the connect CQE will sometimes come first: - if (cqe_accept.user_data == 0xcccccccc and cqe_connect.user_data == 0xaaaaaaaa) { - const a = cqe_accept; - const b = cqe_connect; - cqe_accept = b; - cqe_connect = a; - } - - try testing.expectEqual(@as(u64, 0xaaaaaaaa), cqe_accept.user_data); - if (cqe_accept.res <= 0) std.debug.print("\ncqe_accept.res={}\n", .{cqe_accept.res}); - try testing.expect(cqe_accept.res > 0); - try testing.expectEqual(@as(u32, 0), cqe_accept.flags); - try testing.expectEqual(linux.io_uring_cqe{ - .user_data = 0xcccccccc, - .res = 0, - .flags = 0, - }, cqe_connect); - - const send = try ring.send(0xeeeeeeee, client, buffer_send[0..], 0); + const send = try ring.send(0xeeeeeeee, socket_test_harness.client, buffer_send[0..], 0); send.flags |= linux.IOSQE_IO_LINK; - _ = try ring.recv(0xffffffff, cqe_accept.res, buffer_recv[0..], 0); + _ = try ring.recv(0xffffffff, socket_test_harness.server, .{ .buffer = buffer_recv[0..] }, 0); try testing.expectEqual(@as(u32, 2), try ring.submit()); const cqe_send = try ring.copy_cqe(); @@ -2161,50 +2214,12 @@ test "accept/connect/recv/link_timeout" { }; defer ring.deinit(); - const address = try net.Address.parseIp4("127.0.0.1", 3131); - const kernel_backlog = 1; - const server = try os.socket(address.any.family, os.SOCK.STREAM | os.SOCK.CLOEXEC, 0); - defer os.close(server); - try os.setsockopt(server, os.SOL.SOCKET, os.SO.REUSEADDR, &mem.toBytes(@as(c_int, 1))); - try os.bind(server, &address.any, address.getOsSockLen()); - try os.listen(server, kernel_backlog); + const socket_test_harness = try createSocketTestHarness(&ring); + defer socket_test_harness.close(); var buffer_recv = [_]u8{ 0, 1, 0, 1, 0 }; - var accept_addr: os.sockaddr = undefined; - var accept_addr_len: os.socklen_t = @sizeOf(@TypeOf(accept_addr)); - _ = try ring.accept(0xaaaaaaaa, server, &accept_addr, &accept_addr_len, 0); - try testing.expectEqual(@as(u32, 1), try ring.submit()); - - const client = try os.socket(address.any.family, os.SOCK.STREAM | os.SOCK.CLOEXEC, 0); - defer os.close(client); - _ = try ring.connect(0xcccccccc, client, &address.any, address.getOsSockLen()); - try testing.expectEqual(@as(u32, 1), try ring.submit()); - - var cqe_accept = try ring.copy_cqe(); - if (cqe_accept.err() == .INVAL) return error.SkipZigTest; - var cqe_connect = try ring.copy_cqe(); - if (cqe_connect.err() == .INVAL) return error.SkipZigTest; - - // The accept/connect CQEs may arrive in any order, the connect CQE will sometimes come first: - if (cqe_accept.user_data == 0xcccccccc and cqe_connect.user_data == 0xaaaaaaaa) { - const a = cqe_accept; - const b = cqe_connect; - cqe_accept = b; - cqe_connect = a; - } - - try testing.expectEqual(@as(u64, 0xaaaaaaaa), cqe_accept.user_data); - if (cqe_accept.res <= 0) std.debug.print("\ncqe_accept.res={}\n", .{cqe_accept.res}); - try testing.expect(cqe_accept.res > 0); - try testing.expectEqual(@as(u32, 0), cqe_accept.flags); - try testing.expectEqual(linux.io_uring_cqe{ - .user_data = 0xcccccccc, - .res = 0, - .flags = 0, - }, cqe_connect); - - const sqe_recv = try ring.recv(0xffffffff, cqe_accept.res, buffer_recv[0..], 0); + const sqe_recv = try ring.recv(0xffffffff, socket_test_harness.server, .{ .buffer = buffer_recv[0..] }, 0); sqe_recv.flags |= linux.IOSQE_IO_LINK; const ts = os.linux.kernel_timespec{ .tv_sec = 0, .tv_nsec = 1000000 }; @@ -2348,50 +2363,12 @@ test "accept/connect/recv/cancel" { }; defer ring.deinit(); - const address = try net.Address.parseIp4("127.0.0.1", 3131); - const kernel_backlog = 1; - const server = try os.socket(address.any.family, os.SOCK.STREAM | os.SOCK.CLOEXEC, 0); - defer os.close(server); - try os.setsockopt(server, os.SOL.SOCKET, os.SO.REUSEADDR, &mem.toBytes(@as(c_int, 1))); - try os.bind(server, &address.any, address.getOsSockLen()); - try os.listen(server, kernel_backlog); + const socket_test_harness = try createSocketTestHarness(&ring); + defer socket_test_harness.close(); var buffer_recv = [_]u8{ 0, 1, 0, 1, 0 }; - var accept_addr: os.sockaddr = undefined; - var accept_addr_len: os.socklen_t = @sizeOf(@TypeOf(accept_addr)); - _ = try ring.accept(0xaaaaaaaa, server, &accept_addr, &accept_addr_len, 0); - try testing.expectEqual(@as(u32, 1), try ring.submit()); - - const client = try os.socket(address.any.family, os.SOCK.STREAM | os.SOCK.CLOEXEC, 0); - defer os.close(client); - _ = try ring.connect(0xcccccccc, client, &address.any, address.getOsSockLen()); - try testing.expectEqual(@as(u32, 1), try ring.submit()); - - var cqe_accept = try ring.copy_cqe(); - if (cqe_accept.err() == .INVAL) return error.SkipZigTest; - var cqe_connect = try ring.copy_cqe(); - if (cqe_connect.err() == .INVAL) return error.SkipZigTest; - - // The accept/connect CQEs may arrive in any order, the connect CQE will sometimes come first: - if (cqe_accept.user_data == 0xcccccccc and cqe_connect.user_data == 0xaaaaaaaa) { - const a = cqe_accept; - const b = cqe_connect; - cqe_accept = b; - cqe_connect = a; - } - - try testing.expectEqual(@as(u64, 0xaaaaaaaa), cqe_accept.user_data); - if (cqe_accept.res <= 0) std.debug.print("\ncqe_accept.res={}\n", .{cqe_accept.res}); - try testing.expect(cqe_accept.res > 0); - try testing.expectEqual(@as(u32, 0), cqe_accept.flags); - try testing.expectEqual(linux.io_uring_cqe{ - .user_data = 0xcccccccc, - .res = 0, - .flags = 0, - }, cqe_connect); - - _ = try ring.recv(0xffffffff, cqe_accept.res, buffer_recv[0..], 0); + _ = try ring.recv(0xffffffff, socket_test_harness.server, .{ .buffer = buffer_recv[0..] }, 0); try testing.expectEqual(@as(u32, 1), try ring.submit()); const sqe_cancel = try ring.cancel(0x99999999, 0xffffffff, 0); @@ -2463,7 +2440,7 @@ test "register_files_update" { var buffer = [_]u8{42} ** 128; { - const sqe = try ring.read(0xcccccccc, fd_index, &buffer, 0); + const sqe = try ring.read(0xcccccccc, fd_index, .{ .buffer = &buffer }, 0); try testing.expectEqual(linux.IORING_OP.READ, sqe.opcode); sqe.flags |= linux.IOSQE_FIXED_FILE; @@ -2484,7 +2461,7 @@ test "register_files_update" { { // Next read should still work since fd_index in the registered file descriptors hasn't been updated yet. - const sqe = try ring.read(0xcccccccc, fd_index, &buffer, 0); + const sqe = try ring.read(0xcccccccc, fd_index, .{ .buffer = &buffer }, 0); try testing.expectEqual(linux.IORING_OP.READ, sqe.opcode); sqe.flags |= linux.IOSQE_FIXED_FILE; @@ -2501,7 +2478,7 @@ test "register_files_update" { { // Now this should fail since both fds are sparse (-1) - const sqe = try ring.read(0xcccccccc, fd_index, &buffer, 0); + const sqe = try ring.read(0xcccccccc, fd_index, .{ .buffer = &buffer }, 0); try testing.expectEqual(linux.IORING_OP.READ, sqe.opcode); sqe.flags |= linux.IOSQE_FIXED_FILE; @@ -2843,3 +2820,455 @@ test "linkat" { const read = try second_file.readAll(&second_file_data); try testing.expectEqualStrings("hello", second_file_data[0..read]); } + +test "provide_buffers: read" { + if (builtin.os.tag != .linux) return error.SkipZigTest; + + var ring = IO_Uring.init(1, 0) catch |err| switch (err) { + error.SystemOutdated => return error.SkipZigTest, + error.PermissionDenied => return error.SkipZigTest, + else => return err, + }; + defer ring.deinit(); + + const fd = try os.openZ("/dev/zero", os.O.RDONLY | os.O.CLOEXEC, 0); + defer os.close(fd); + + const group_id = 1337; + const buffer_id = 0; + + const buffer_len = 128; + + var buffers: [4][buffer_len]u8 = undefined; + + // Provide 4 buffers + + { + const sqe = try ring.provide_buffers(0xcccccccc, @ptrCast([*]u8, &buffers), buffers.len, buffer_len, group_id, buffer_id); + try testing.expectEqual(linux.IORING_OP.PROVIDE_BUFFERS, sqe.opcode); + try testing.expectEqual(@as(i32, buffers.len), sqe.fd); + try testing.expectEqual(@as(u32, buffers[0].len), sqe.len); + try testing.expectEqual(@as(u16, group_id), sqe.buf_index); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + + const cqe = try ring.copy_cqe(); + switch (cqe.err()) { + // Happens when the kernel is < 5.7 + .INVAL => return error.SkipZigTest, + .SUCCESS => {}, + else => |errno| std.debug.panic("unhandled errno: {}", .{errno}), + } + try testing.expectEqual(@as(u64, 0xcccccccc), cqe.user_data); + } + + // Do 4 reads which should consume all buffers + + var i: usize = 0; + while (i < buffers.len) : (i += 1) { + var sqe = try ring.read(0xdededede, fd, .{ .buffer_selection = .{ .group_id = group_id, .len = buffer_len } }, 0); + try testing.expectEqual(linux.IORING_OP.READ, sqe.opcode); + try testing.expectEqual(@as(i32, fd), sqe.fd); + try testing.expectEqual(@as(u64, 0), sqe.addr); + try testing.expectEqual(@as(u32, buffer_len), sqe.len); + try testing.expectEqual(@as(u16, group_id), sqe.buf_index); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + + const cqe = try ring.copy_cqe(); + switch (cqe.err()) { + .SUCCESS => {}, + else => |errno| std.debug.panic("unhandled errno: {}", .{errno}), + } + + try testing.expect(cqe.flags & linux.IORING_CQE_F_BUFFER == linux.IORING_CQE_F_BUFFER); + const used_buffer_id = cqe.flags >> 16; + try testing.expect(used_buffer_id >= 0 and used_buffer_id <= 3); + try testing.expectEqual(@as(i32, buffer_len), cqe.res); + + try testing.expectEqual(@as(u64, 0xdededede), cqe.user_data); + try testing.expectEqualSlices(u8, &([_]u8{0} ** buffer_len), buffers[used_buffer_id][0..@intCast(usize, cqe.res)]); + } + + // This read should fail + + { + var sqe = try ring.read(0xdfdfdfdf, fd, .{ .buffer_selection = .{ .group_id = group_id, .len = buffer_len } }, 0); + try testing.expectEqual(linux.IORING_OP.READ, sqe.opcode); + try testing.expectEqual(@as(i32, fd), sqe.fd); + try testing.expectEqual(@as(u64, 0), sqe.addr); + try testing.expectEqual(@as(u32, buffer_len), sqe.len); + try testing.expectEqual(@as(u16, group_id), sqe.buf_index); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + + const cqe = try ring.copy_cqe(); + switch (cqe.err()) { + // Expected + .NOBUFS => {}, + .SUCCESS => std.debug.panic("unexpected success", .{}), + else => |errno| std.debug.panic("unhandled errno: {}", .{errno}), + } + try testing.expectEqual(@as(u64, 0xdfdfdfdf), cqe.user_data); + } + + // Provide 1 buffer again + + // Deliberately put something we don't expect in the buffers + mem.set(u8, mem.sliceAsBytes(&buffers), 42); + + const reprovided_buffer_id = 2; + + { + _ = try ring.provide_buffers(0xabababab, @ptrCast([*]u8, &buffers[reprovided_buffer_id]), 1, buffer_len, group_id, reprovided_buffer_id); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + + const cqe = try ring.copy_cqe(); + switch (cqe.err()) { + .SUCCESS => {}, + else => |errno| std.debug.panic("unhandled errno: {}", .{errno}), + } + } + + // Final read which should work + + { + var sqe = try ring.read(0xdfdfdfdf, fd, .{ .buffer_selection = .{ .group_id = group_id, .len = buffer_len } }, 0); + try testing.expectEqual(linux.IORING_OP.READ, sqe.opcode); + try testing.expectEqual(@as(i32, fd), sqe.fd); + try testing.expectEqual(@as(u64, 0), sqe.addr); + try testing.expectEqual(@as(u32, buffer_len), sqe.len); + try testing.expectEqual(@as(u16, group_id), sqe.buf_index); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + + const cqe = try ring.copy_cqe(); + switch (cqe.err()) { + .SUCCESS => {}, + else => |errno| std.debug.panic("unhandled errno: {}", .{errno}), + } + + try testing.expect(cqe.flags & linux.IORING_CQE_F_BUFFER == linux.IORING_CQE_F_BUFFER); + const used_buffer_id = cqe.flags >> 16; + try testing.expectEqual(used_buffer_id, reprovided_buffer_id); + try testing.expectEqual(@as(i32, buffer_len), cqe.res); + try testing.expectEqual(@as(u64, 0xdfdfdfdf), cqe.user_data); + try testing.expectEqualSlices(u8, &([_]u8{0} ** buffer_len), buffers[used_buffer_id][0..@intCast(usize, cqe.res)]); + } +} + +test "remove_buffers" { + if (builtin.os.tag != .linux) return error.SkipZigTest; + + var ring = IO_Uring.init(1, 0) catch |err| switch (err) { + error.SystemOutdated => return error.SkipZigTest, + error.PermissionDenied => return error.SkipZigTest, + else => return err, + }; + defer ring.deinit(); + + const fd = try os.openZ("/dev/zero", os.O.RDONLY | os.O.CLOEXEC, 0); + defer os.close(fd); + + const group_id = 1337; + const buffer_id = 0; + + const buffer_len = 128; + + var buffers: [4][buffer_len]u8 = undefined; + + // Provide 4 buffers + + { + _ = try ring.provide_buffers(0xcccccccc, @ptrCast([*]u8, &buffers), buffers.len, buffer_len, group_id, buffer_id); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + + const cqe = try ring.copy_cqe(); + switch (cqe.err()) { + .SUCCESS => {}, + else => |errno| std.debug.panic("unhandled errno: {}", .{errno}), + } + try testing.expectEqual(@as(u64, 0xcccccccc), cqe.user_data); + } + + // Remove the first 3 buffers + + { + var sqe = try ring.remove_buffers(0xbababababa, 3, group_id); + try testing.expectEqual(linux.IORING_OP.REMOVE_BUFFERS, sqe.opcode); + try testing.expectEqual(@as(i32, 3), sqe.fd); + try testing.expectEqual(@as(u64, 0), sqe.addr); + try testing.expectEqual(@as(u16, group_id), sqe.buf_index); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + + const cqe = try ring.copy_cqe(); + switch (cqe.err()) { + .SUCCESS => {}, + else => |errno| std.debug.panic("unhandled errno: {}", .{errno}), + } + try testing.expectEqual(@as(u64, 0xbababababa), cqe.user_data); + } + + // This read should work + + { + _ = try ring.read(0xdfdfdfdf, fd, .{ .buffer_selection = .{ .group_id = group_id, .len = buffer_len } }, 0); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + + const cqe = try ring.copy_cqe(); + switch (cqe.err()) { + .SUCCESS => {}, + else => |errno| std.debug.panic("unhandled errno: {}", .{errno}), + } + + try testing.expect(cqe.flags & linux.IORING_CQE_F_BUFFER == linux.IORING_CQE_F_BUFFER); + const used_buffer_id = cqe.flags >> 16; + try testing.expectEqual(used_buffer_id, 0); + try testing.expectEqual(@as(i32, buffer_len), cqe.res); + try testing.expectEqual(@as(u64, 0xdfdfdfdf), cqe.user_data); + try testing.expectEqualSlices(u8, &([_]u8{0} ** buffer_len), buffers[used_buffer_id][0..@intCast(usize, cqe.res)]); + } + + // Final read should _not_ work + + { + _ = try ring.read(0xdfdfdfdf, fd, .{ .buffer_selection = .{ .group_id = group_id, .len = buffer_len } }, 0); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + + const cqe = try ring.copy_cqe(); + switch (cqe.err()) { + // Expected + .NOBUFS => {}, + .SUCCESS => std.debug.panic("unexpected success", .{}), + else => |errno| std.debug.panic("unhandled errno: {}", .{errno}), + } + } +} + +test "provide_buffers: accept/connect/send/recv" { + if (builtin.os.tag != .linux) return error.SkipZigTest; + + var ring = IO_Uring.init(16, 0) catch |err| switch (err) { + error.SystemOutdated => return error.SkipZigTest, + error.PermissionDenied => return error.SkipZigTest, + else => return err, + }; + defer ring.deinit(); + + const group_id = 1337; + const buffer_id = 0; + + const buffer_len = 128; + var buffers: [4][buffer_len]u8 = undefined; + + // Provide 4 buffers + + { + const sqe = try ring.provide_buffers(0xcccccccc, @ptrCast([*]u8, &buffers), buffers.len, buffer_len, group_id, buffer_id); + try testing.expectEqual(linux.IORING_OP.PROVIDE_BUFFERS, sqe.opcode); + try testing.expectEqual(@as(i32, buffers.len), sqe.fd); + try testing.expectEqual(@as(u32, buffer_len), sqe.len); + try testing.expectEqual(@as(u16, group_id), sqe.buf_index); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + + const cqe = try ring.copy_cqe(); + switch (cqe.err()) { + // Happens when the kernel is < 5.7 + .INVAL => return error.SkipZigTest, + .SUCCESS => {}, + else => |errno| std.debug.panic("unhandled errno: {}", .{errno}), + } + try testing.expectEqual(@as(u64, 0xcccccccc), cqe.user_data); + } + + const socket_test_harness = try createSocketTestHarness(&ring); + defer socket_test_harness.close(); + + // Do 4 send on the socket + + { + var i: usize = 0; + while (i < buffers.len) : (i += 1) { + _ = try ring.send(0xdeaddead, socket_test_harness.server, &([_]u8{'z'} ** buffer_len), 0); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + } + + var cqes: [4]linux.io_uring_cqe = undefined; + try testing.expectEqual(@as(u32, 4), try ring.copy_cqes(&cqes, 4)); + } + + // Do 4 recv which should consume all buffers + + // Deliberately put something we don't expect in the buffers + mem.set(u8, mem.sliceAsBytes(&buffers), 1); + + var i: usize = 0; + while (i < buffers.len) : (i += 1) { + var sqe = try ring.recv(0xdededede, socket_test_harness.client, .{ .buffer_selection = .{ .group_id = group_id, .len = buffer_len } }, 0); + try testing.expectEqual(linux.IORING_OP.RECV, sqe.opcode); + try testing.expectEqual(@as(i32, socket_test_harness.client), sqe.fd); + try testing.expectEqual(@as(u64, 0), sqe.addr); + try testing.expectEqual(@as(u32, buffer_len), sqe.len); + try testing.expectEqual(@as(u16, group_id), sqe.buf_index); + try testing.expectEqual(@as(u32, 0), sqe.rw_flags); + try testing.expectEqual(@as(u32, linux.IOSQE_BUFFER_SELECT), sqe.flags); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + + const cqe = try ring.copy_cqe(); + switch (cqe.err()) { + .SUCCESS => {}, + else => |errno| std.debug.panic("unhandled errno: {}", .{errno}), + } + + try testing.expect(cqe.flags & linux.IORING_CQE_F_BUFFER == linux.IORING_CQE_F_BUFFER); + const used_buffer_id = cqe.flags >> 16; + try testing.expect(used_buffer_id >= 0 and used_buffer_id <= 3); + try testing.expectEqual(@as(i32, buffer_len), cqe.res); + + try testing.expectEqual(@as(u64, 0xdededede), cqe.user_data); + const buffer = buffers[used_buffer_id][0..@intCast(usize, cqe.res)]; + try testing.expectEqualSlices(u8, &([_]u8{'z'} ** buffer_len), buffer); + } + + // This recv should fail + + { + var sqe = try ring.recv(0xdfdfdfdf, socket_test_harness.client, .{ .buffer_selection = .{ .group_id = group_id, .len = buffer_len } }, 0); + try testing.expectEqual(linux.IORING_OP.RECV, sqe.opcode); + try testing.expectEqual(@as(i32, socket_test_harness.client), sqe.fd); + try testing.expectEqual(@as(u64, 0), sqe.addr); + try testing.expectEqual(@as(u32, buffer_len), sqe.len); + try testing.expectEqual(@as(u16, group_id), sqe.buf_index); + try testing.expectEqual(@as(u32, 0), sqe.rw_flags); + try testing.expectEqual(@as(u32, linux.IOSQE_BUFFER_SELECT), sqe.flags); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + + const cqe = try ring.copy_cqe(); + switch (cqe.err()) { + // Expected + .NOBUFS => {}, + .SUCCESS => std.debug.panic("unexpected success", .{}), + else => |errno| std.debug.panic("unhandled errno: {}", .{errno}), + } + try testing.expectEqual(@as(u64, 0xdfdfdfdf), cqe.user_data); + } + + // Provide 1 buffer again + + const reprovided_buffer_id = 2; + + { + _ = try ring.provide_buffers(0xabababab, @ptrCast([*]u8, &buffers[reprovided_buffer_id]), 1, buffer_len, group_id, reprovided_buffer_id); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + + const cqe = try ring.copy_cqe(); + switch (cqe.err()) { + .SUCCESS => {}, + else => |errno| std.debug.panic("unhandled errno: {}", .{errno}), + } + } + + // Redo 1 send on the server socket + + { + _ = try ring.send(0xdeaddead, socket_test_harness.server, &([_]u8{'w'} ** buffer_len), 0); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + + _ = try ring.copy_cqe(); + } + + // Final recv which should work + + // Deliberately put something we don't expect in the buffers + mem.set(u8, mem.sliceAsBytes(&buffers), 1); + + { + var sqe = try ring.recv(0xdfdfdfdf, socket_test_harness.client, .{ .buffer_selection = .{ .group_id = group_id, .len = buffer_len } }, 0); + try testing.expectEqual(linux.IORING_OP.RECV, sqe.opcode); + try testing.expectEqual(@as(i32, socket_test_harness.client), sqe.fd); + try testing.expectEqual(@as(u64, 0), sqe.addr); + try testing.expectEqual(@as(u32, buffer_len), sqe.len); + try testing.expectEqual(@as(u16, group_id), sqe.buf_index); + try testing.expectEqual(@as(u32, 0), sqe.rw_flags); + try testing.expectEqual(@as(u32, linux.IOSQE_BUFFER_SELECT), sqe.flags); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + + const cqe = try ring.copy_cqe(); + switch (cqe.err()) { + .SUCCESS => {}, + else => |errno| std.debug.panic("unhandled errno: {}", .{errno}), + } + + try testing.expect(cqe.flags & linux.IORING_CQE_F_BUFFER == linux.IORING_CQE_F_BUFFER); + const used_buffer_id = cqe.flags >> 16; + try testing.expectEqual(used_buffer_id, reprovided_buffer_id); + try testing.expectEqual(@as(i32, buffer_len), cqe.res); + try testing.expectEqual(@as(u64, 0xdfdfdfdf), cqe.user_data); + const buffer = buffers[used_buffer_id][0..@intCast(usize, cqe.res)]; + try testing.expectEqualSlices(u8, &([_]u8{'w'} ** buffer_len), buffer); + } +} + +/// Used for testing server/client interactions. +const SocketTestHarness = struct { + listener: os.socket_t, + server: os.socket_t, + client: os.socket_t, + + fn close(self: SocketTestHarness) void { + os.closeSocket(self.client); + os.closeSocket(self.listener); + } +}; + +fn createSocketTestHarness(ring: *IO_Uring) !SocketTestHarness { + // Create a TCP server socket + + const address = try net.Address.parseIp4("127.0.0.1", 3131); + const kernel_backlog = 1; + const listener_socket = try os.socket(address.any.family, os.SOCK.STREAM | os.SOCK.CLOEXEC, 0); + errdefer os.closeSocket(listener_socket); + + try os.setsockopt(listener_socket, os.SOL.SOCKET, os.SO.REUSEADDR, &mem.toBytes(@as(c_int, 1))); + try os.bind(listener_socket, &address.any, address.getOsSockLen()); + try os.listen(listener_socket, kernel_backlog); + + // Submit 1 accept + var accept_addr: os.sockaddr = undefined; + var accept_addr_len: os.socklen_t = @sizeOf(@TypeOf(accept_addr)); + _ = try ring.accept(0xaaaaaaaa, listener_socket, &accept_addr, &accept_addr_len, 0); + + // Create a TCP client socket + const client = try os.socket(address.any.family, os.SOCK.STREAM | os.SOCK.CLOEXEC, 0); + errdefer os.closeSocket(client); + _ = try ring.connect(0xcccccccc, client, &address.any, address.getOsSockLen()); + + try testing.expectEqual(@as(u32, 2), try ring.submit()); + + var cqe_accept = try ring.copy_cqe(); + if (cqe_accept.err() == .INVAL) return error.SkipZigTest; + var cqe_connect = try ring.copy_cqe(); + if (cqe_connect.err() == .INVAL) return error.SkipZigTest; + + // The accept/connect CQEs may arrive in any order, the connect CQE will sometimes come first: + if (cqe_accept.user_data == 0xcccccccc and cqe_connect.user_data == 0xaaaaaaaa) { + const a = cqe_accept; + const b = cqe_connect; + cqe_accept = b; + cqe_connect = a; + } + + try testing.expectEqual(@as(u64, 0xaaaaaaaa), cqe_accept.user_data); + if (cqe_accept.res <= 0) std.debug.print("\ncqe_accept.res={}\n", .{cqe_accept.res}); + try testing.expect(cqe_accept.res > 0); + try testing.expectEqual(@as(u32, 0), cqe_accept.flags); + try testing.expectEqual(linux.io_uring_cqe{ + .user_data = 0xcccccccc, + .res = 0, + .flags = 0, + }, cqe_connect); + + // All good + + return SocketTestHarness{ + .listener = listener_socket, + .server = cqe_accept.res, + .client = client, + }; +}