From 7b3e5ce0b3d3e6c8e24dd147d23eec3fe5088d74 Mon Sep 17 00:00:00 2001 From: Vincent Rischmann Date: Thu, 12 May 2022 13:28:34 +0200 Subject: [PATCH 1/7] io_uring: add provide_buffers and remove_buffers These functions are needed to implement automatic buffer selection. This maps to the IORING_OP_PROVIDE_BUFFERS and IORING_OP_PROVIDE_BUFFERS ops. --- lib/std/os/linux/io_uring.zig | 57 +++++++++++++++++++++++++++++++++++ 1 file changed, 57 insertions(+) diff --git a/lib/std/os/linux/io_uring.zig b/lib/std/os/linux/io_uring.zig index be8b442611..9646a923d7 100644 --- a/lib/std/os/linux/io_uring.zig +++ b/lib/std/os/linux/io_uring.zig @@ -857,6 +857,41 @@ pub const IO_Uring = struct { return sqe; } + /// Queues (but does not submit) an SQE to provide a group of buffers used for commands that read/receive data. + /// Returns a pointer to the SQE. + /// + /// Provided buffers can be used in `read`, `recv` or `recvmsg` commands via .buffer_selection. + /// + /// The kernel expects a contiguous block of memory of size (buffers_count * buffer_size). + pub fn provide_buffers( + self: *IO_Uring, + user_data: u64, + buffers: [*]u8, + buffers_count: usize, + buffer_size: usize, + group_id: usize, + buffer_id: usize, + ) !*io_uring_sqe { + const sqe = try self.get_sqe(); + io_uring_prep_provide_buffers(sqe, buffers, buffers_count, buffer_size, group_id, buffer_id); + sqe.user_data = user_data; + return sqe; + } + + /// Queues (but does not submit) an SQE to remove a group of provided buffers. + /// Returns a pointer to the SQE. + pub fn remove_buffers( + self: *IO_Uring, + user_data: u64, + buffers_count: usize, + group_id: usize, + ) !*io_uring_sqe { + const sqe = try self.get_sqe(); + io_uring_prep_remove_buffers(sqe, buffers_count, group_id); + sqe.user_data = user_data; + return sqe; + } + /// Registers an array of file descriptors. /// Every time a file descriptor is put in an SQE and submitted to the kernel, the kernel must /// retrieve a reference to the file, and once I/O has completed the file reference must be @@ -1508,6 +1543,28 @@ pub fn io_uring_prep_linkat( sqe.rw_flags = flags; } +pub fn io_uring_prep_provide_buffers( + sqe: *io_uring_sqe, + buffers: [*]u8, + num: usize, + buffer_len: usize, + group_id: usize, + buffer_id: usize, +) void { + const ptr = @ptrToInt(buffers); + io_uring_prep_rw(.PROVIDE_BUFFERS, sqe, @intCast(i32, num), ptr, buffer_len, buffer_id); + sqe.buf_index = @intCast(u16, group_id); +} + +pub fn io_uring_prep_remove_buffers( + sqe: *io_uring_sqe, + num: usize, + group_id: usize, +) void { + io_uring_prep_rw(.REMOVE_BUFFERS, sqe, @intCast(i32, num), 0, 0, 0); + sqe.buf_index = @intCast(u16, group_id); +} + test "structs/offsets/entries" { if (builtin.os.tag != .linux) return error.SkipZigTest; From 52dd468cc3e279e7d1a764c2c3f6fde13a404a14 Mon Sep 17 00:00:00 2001 From: Vincent Rischmann Date: Mon, 16 May 2022 19:02:22 +0200 Subject: [PATCH 2/7] io_uring: change read() to use a ReadBuffer instead Reads can be done in two ways with io_uring: * using a simple buffer * using a automatic buffer selection which requires the user to have provided a number of buffers before ReadBuffer let's the caller choose where the data should be read. --- lib/std/os/linux/io_uring.zig | 33 +++++++++++++++++++++++++++------ 1 file changed, 27 insertions(+), 6 deletions(-) diff --git a/lib/std/os/linux/io_uring.zig b/lib/std/os/linux/io_uring.zig index 9646a923d7..74e6416102 100644 --- a/lib/std/os/linux/io_uring.zig +++ b/lib/std/os/linux/io_uring.zig @@ -358,17 +358,38 @@ pub const IO_Uring = struct { return sqe; } + /// Used to select how the read should be handled. + pub const ReadBuffer = union(enum) { + /// io_uring will read directly into this buffer + buffer: []u8, + + /// io_uring will select a buffer that has previously been provided with `provide_buffers`. + /// The buffer group reference by `group_id` must contain at least one buffer for the read to work. + /// `len` controls the number of bytes to read into the selected buffer. + buffer_selection: struct { + group_id: u16, + len: usize, + }, + }; + /// Queues (but does not submit) an SQE to perform a `read(2)`. /// Returns a pointer to the SQE. pub fn read( self: *IO_Uring, user_data: u64, fd: os.fd_t, - buffer: []u8, + buffer: ReadBuffer, offset: u64, ) !*io_uring_sqe { const sqe = try self.get_sqe(); - io_uring_prep_read(sqe, fd, buffer, offset); + switch (buffer) { + .buffer => |slice| io_uring_prep_read(sqe, fd, slice, offset), + .buffer_selection => |selection| { + io_uring_prep_rw(.READ, sqe, fd, 0, selection.len, offset); + sqe.flags |= linux.IOSQE_BUFFER_SELECT; + sqe.buf_index = selection.group_id; + }, + } sqe.user_data = user_data; return sqe; } @@ -1778,7 +1799,7 @@ test "write/read" { try testing.expectEqual(linux.IORING_OP.WRITE, sqe_write.opcode); try testing.expectEqual(@as(u64, 10), sqe_write.off); sqe_write.flags |= linux.IOSQE_IO_LINK; - const sqe_read = try ring.read(0x22222222, fd, buffer_read[0..], 10); + const sqe_read = try ring.read(0x22222222, fd, .{ .buffer = buffer_read[0..] }, 10); try testing.expectEqual(linux.IORING_OP.READ, sqe_read.opcode); try testing.expectEqual(@as(u64, 10), sqe_read.off); try testing.expectEqual(@as(u32, 2), try ring.submit()); @@ -2520,7 +2541,7 @@ test "register_files_update" { var buffer = [_]u8{42} ** 128; { - const sqe = try ring.read(0xcccccccc, fd_index, &buffer, 0); + const sqe = try ring.read(0xcccccccc, fd_index, .{ .buffer = &buffer }, 0); try testing.expectEqual(linux.IORING_OP.READ, sqe.opcode); sqe.flags |= linux.IOSQE_FIXED_FILE; @@ -2541,7 +2562,7 @@ test "register_files_update" { { // Next read should still work since fd_index in the registered file descriptors hasn't been updated yet. - const sqe = try ring.read(0xcccccccc, fd_index, &buffer, 0); + const sqe = try ring.read(0xcccccccc, fd_index, .{ .buffer = &buffer }, 0); try testing.expectEqual(linux.IORING_OP.READ, sqe.opcode); sqe.flags |= linux.IOSQE_FIXED_FILE; @@ -2558,7 +2579,7 @@ test "register_files_update" { { // Now this should fail since both fds are sparse (-1) - const sqe = try ring.read(0xcccccccc, fd_index, &buffer, 0); + const sqe = try ring.read(0xcccccccc, fd_index, .{ .buffer = &buffer }, 0); try testing.expectEqual(linux.IORING_OP.READ, sqe.opcode); sqe.flags |= linux.IOSQE_FIXED_FILE; From 270a5039d4bc09bb5fbe017334d5d9ba6c4ac584 Mon Sep 17 00:00:00 2001 From: Vincent Rischmann Date: Mon, 16 May 2022 19:48:55 +0200 Subject: [PATCH 3/7] io_uring: change recv() to use a RecvBuffer instead RecvBuffer is equivalent to ReadBuffer but tailored for recv only. --- lib/std/os/linux/io_uring.zig | 32 +++++++++++++++++++++++++++----- 1 file changed, 27 insertions(+), 5 deletions(-) diff --git a/lib/std/os/linux/io_uring.zig b/lib/std/os/linux/io_uring.zig index 74e6416102..ad78f7bfab 100644 --- a/lib/std/os/linux/io_uring.zig +++ b/lib/std/os/linux/io_uring.zig @@ -528,17 +528,39 @@ pub const IO_Uring = struct { return sqe; } + /// Used to select how the recv call should be handled. + pub const RecvBuffer = union(enum) { + /// io_uring will recv directly into this buffer + buffer: []u8, + + /// io_uring will select a buffer that has previously been provided with `provide_buffers`. + /// The buffer group referenced by `group_id` must contain at least one buffer for the recv call to work. + /// `len` controls the number of bytes to read into the selected buffer. + buffer_selection: struct { + group_id: u16, + len: usize, + }, + }; + /// Queues (but does not submit) an SQE to perform a `recv(2)`. /// Returns a pointer to the SQE. pub fn recv( self: *IO_Uring, user_data: u64, fd: os.fd_t, - buffer: []u8, + buffer: RecvBuffer, flags: u32, ) !*io_uring_sqe { const sqe = try self.get_sqe(); - io_uring_prep_recv(sqe, fd, buffer, flags); + switch (buffer) { + .buffer => |slice| io_uring_prep_recv(sqe, fd, slice, flags), + .buffer_selection => |selection| { + io_uring_prep_rw(.RECV, sqe, fd, 0, selection.len, 0); + sqe.rw_flags = flags; + sqe.flags |= linux.IOSQE_BUFFER_SELECT; + sqe.buf_index = selection.group_id; + }, + } sqe.user_data = user_data; return sqe; } @@ -2014,7 +2036,7 @@ test "accept/connect/send/recv" { const send = try ring.send(0xeeeeeeee, client, buffer_send[0..], 0); send.flags |= linux.IOSQE_IO_LINK; - _ = try ring.recv(0xffffffff, cqe_accept.res, buffer_recv[0..], 0); + _ = try ring.recv(0xffffffff, cqe_accept.res, .{ .buffer = buffer_recv[0..] }, 0); try testing.expectEqual(@as(u32, 2), try ring.submit()); const cqe_send = try ring.copy_cqe(); @@ -2282,7 +2304,7 @@ test "accept/connect/recv/link_timeout" { .flags = 0, }, cqe_connect); - const sqe_recv = try ring.recv(0xffffffff, cqe_accept.res, buffer_recv[0..], 0); + const sqe_recv = try ring.recv(0xffffffff, cqe_accept.res, .{ .buffer = buffer_recv[0..] }, 0); sqe_recv.flags |= linux.IOSQE_IO_LINK; const ts = os.linux.kernel_timespec{ .tv_sec = 0, .tv_nsec = 1000000 }; @@ -2469,7 +2491,7 @@ test "accept/connect/recv/cancel" { .flags = 0, }, cqe_connect); - _ = try ring.recv(0xffffffff, cqe_accept.res, buffer_recv[0..], 0); + _ = try ring.recv(0xffffffff, cqe_accept.res, .{ .buffer = buffer_recv[0..] }, 0); try testing.expectEqual(@as(u32, 1), try ring.submit()); const sqe_cancel = try ring.cancel(0x99999999, 0xffffffff, 0); From 456716b30dffc343df0bed9a69cbce54d10787aa Mon Sep 17 00:00:00 2001 From: Vincent Rischmann Date: Mon, 16 May 2022 21:54:14 +0200 Subject: [PATCH 4/7] io_uring: add a test harness for server/client TCP socket tests --- lib/std/os/linux/io_uring.zig | 68 +++++++++++++++++++++++++++++++++++ 1 file changed, 68 insertions(+) diff --git a/lib/std/os/linux/io_uring.zig b/lib/std/os/linux/io_uring.zig index ad78f7bfab..ef669d2369 100644 --- a/lib/std/os/linux/io_uring.zig +++ b/lib/std/os/linux/io_uring.zig @@ -2943,3 +2943,71 @@ test "linkat" { const read = try second_file.readAll(&second_file_data); try testing.expectEqualStrings("hello", second_file_data[0..read]); } + +/// Used for testing server/client interactions. +const SocketTestHarness = struct { + listener: os.socket_t, + server: os.socket_t, + client: os.socket_t, + + fn close(self: SocketTestHarness) void { + os.closeSocket(self.client); + os.closeSocket(self.listener); + } +}; + +fn createSocketTestHarness(ring: *IO_Uring) !SocketTestHarness { + // Create a TCP server socket + + const address = try net.Address.parseIp4("127.0.0.1", 3131); + const kernel_backlog = 1; + const listener_socket = try os.socket(address.any.family, os.SOCK.STREAM | os.SOCK.CLOEXEC, 0); + errdefer os.closeSocket(listener_socket); + + try os.setsockopt(listener_socket, os.SOL.SOCKET, os.SO.REUSEADDR, &mem.toBytes(@as(c_int, 1))); + try os.bind(listener_socket, &address.any, address.getOsSockLen()); + try os.listen(listener_socket, kernel_backlog); + + // Submit 1 accept + var accept_addr: os.sockaddr = undefined; + var accept_addr_len: os.socklen_t = @sizeOf(@TypeOf(accept_addr)); + _ = try ring.accept(0xaaaaaaaa, listener_socket, &accept_addr, &accept_addr_len, 0); + + // Create a TCP client socket + const client = try os.socket(address.any.family, os.SOCK.STREAM | os.SOCK.CLOEXEC, 0); + errdefer os.closeSocket(client); + _ = try ring.connect(0xcccccccc, client, &address.any, address.getOsSockLen()); + + try testing.expectEqual(@as(u32, 2), try ring.submit()); + + var cqe_accept = try ring.copy_cqe(); + if (cqe_accept.err() == .INVAL) return error.SkipZigTest; + var cqe_connect = try ring.copy_cqe(); + if (cqe_connect.err() == .INVAL) return error.SkipZigTest; + + // The accept/connect CQEs may arrive in any order, the connect CQE will sometimes come first: + if (cqe_accept.user_data == 0xcccccccc and cqe_connect.user_data == 0xaaaaaaaa) { + const a = cqe_accept; + const b = cqe_connect; + cqe_accept = b; + cqe_connect = a; + } + + try testing.expectEqual(@as(u64, 0xaaaaaaaa), cqe_accept.user_data); + if (cqe_accept.res <= 0) std.debug.print("\ncqe_accept.res={}\n", .{cqe_accept.res}); + try testing.expect(cqe_accept.res > 0); + try testing.expectEqual(@as(u32, 0), cqe_accept.flags); + try testing.expectEqual(linux.io_uring_cqe{ + .user_data = 0xcccccccc, + .res = 0, + .flags = 0, + }, cqe_connect); + + // All good + + return SocketTestHarness{ + .listener = listener_socket, + .server = cqe_accept.res, + .client = client, + }; +} From d8798ef0cd22532d26157aa1939b96aabc02223c Mon Sep 17 00:00:00 2001 From: Vincent Rischmann Date: Mon, 16 May 2022 21:45:56 +0200 Subject: [PATCH 5/7] io_uring: use the socket test harness --- lib/std/os/linux/io_uring.zig | 134 +++------------------------------- 1 file changed, 10 insertions(+), 124 deletions(-) diff --git a/lib/std/os/linux/io_uring.zig b/lib/std/os/linux/io_uring.zig index ef669d2369..bb48c63719 100644 --- a/lib/std/os/linux/io_uring.zig +++ b/lib/std/os/linux/io_uring.zig @@ -1990,53 +1990,15 @@ test "accept/connect/send/recv" { }; defer ring.deinit(); - const address = try net.Address.parseIp4("127.0.0.1", 3131); - const kernel_backlog = 1; - const server = try os.socket(address.any.family, os.SOCK.STREAM | os.SOCK.CLOEXEC, 0); - defer os.close(server); - try os.setsockopt(server, os.SOL.SOCKET, os.SO.REUSEADDR, &mem.toBytes(@as(c_int, 1))); - try os.bind(server, &address.any, address.getOsSockLen()); - try os.listen(server, kernel_backlog); + const socket_test_harness = try createSocketTestHarness(&ring); + defer socket_test_harness.close(); const buffer_send = [_]u8{ 1, 0, 1, 0, 1, 0, 1, 0, 1, 0 }; var buffer_recv = [_]u8{ 0, 1, 0, 1, 0 }; - var accept_addr: os.sockaddr = undefined; - var accept_addr_len: os.socklen_t = @sizeOf(@TypeOf(accept_addr)); - _ = try ring.accept(0xaaaaaaaa, server, &accept_addr, &accept_addr_len, 0); - try testing.expectEqual(@as(u32, 1), try ring.submit()); - - const client = try os.socket(address.any.family, os.SOCK.STREAM | os.SOCK.CLOEXEC, 0); - defer os.close(client); - _ = try ring.connect(0xcccccccc, client, &address.any, address.getOsSockLen()); - try testing.expectEqual(@as(u32, 1), try ring.submit()); - - var cqe_accept = try ring.copy_cqe(); - if (cqe_accept.err() == .INVAL) return error.SkipZigTest; - var cqe_connect = try ring.copy_cqe(); - if (cqe_connect.err() == .INVAL) return error.SkipZigTest; - - // The accept/connect CQEs may arrive in any order, the connect CQE will sometimes come first: - if (cqe_accept.user_data == 0xcccccccc and cqe_connect.user_data == 0xaaaaaaaa) { - const a = cqe_accept; - const b = cqe_connect; - cqe_accept = b; - cqe_connect = a; - } - - try testing.expectEqual(@as(u64, 0xaaaaaaaa), cqe_accept.user_data); - if (cqe_accept.res <= 0) std.debug.print("\ncqe_accept.res={}\n", .{cqe_accept.res}); - try testing.expect(cqe_accept.res > 0); - try testing.expectEqual(@as(u32, 0), cqe_accept.flags); - try testing.expectEqual(linux.io_uring_cqe{ - .user_data = 0xcccccccc, - .res = 0, - .flags = 0, - }, cqe_connect); - - const send = try ring.send(0xeeeeeeee, client, buffer_send[0..], 0); + const send = try ring.send(0xeeeeeeee, socket_test_harness.client, buffer_send[0..], 0); send.flags |= linux.IOSQE_IO_LINK; - _ = try ring.recv(0xffffffff, cqe_accept.res, .{ .buffer = buffer_recv[0..] }, 0); + _ = try ring.recv(0xffffffff, socket_test_harness.server, .{ .buffer = buffer_recv[0..] }, 0); try testing.expectEqual(@as(u32, 2), try ring.submit()); const cqe_send = try ring.copy_cqe(); @@ -2261,50 +2223,12 @@ test "accept/connect/recv/link_timeout" { }; defer ring.deinit(); - const address = try net.Address.parseIp4("127.0.0.1", 3131); - const kernel_backlog = 1; - const server = try os.socket(address.any.family, os.SOCK.STREAM | os.SOCK.CLOEXEC, 0); - defer os.close(server); - try os.setsockopt(server, os.SOL.SOCKET, os.SO.REUSEADDR, &mem.toBytes(@as(c_int, 1))); - try os.bind(server, &address.any, address.getOsSockLen()); - try os.listen(server, kernel_backlog); + const socket_test_harness = try createSocketTestHarness(&ring); + defer socket_test_harness.close(); var buffer_recv = [_]u8{ 0, 1, 0, 1, 0 }; - var accept_addr: os.sockaddr = undefined; - var accept_addr_len: os.socklen_t = @sizeOf(@TypeOf(accept_addr)); - _ = try ring.accept(0xaaaaaaaa, server, &accept_addr, &accept_addr_len, 0); - try testing.expectEqual(@as(u32, 1), try ring.submit()); - - const client = try os.socket(address.any.family, os.SOCK.STREAM | os.SOCK.CLOEXEC, 0); - defer os.close(client); - _ = try ring.connect(0xcccccccc, client, &address.any, address.getOsSockLen()); - try testing.expectEqual(@as(u32, 1), try ring.submit()); - - var cqe_accept = try ring.copy_cqe(); - if (cqe_accept.err() == .INVAL) return error.SkipZigTest; - var cqe_connect = try ring.copy_cqe(); - if (cqe_connect.err() == .INVAL) return error.SkipZigTest; - - // The accept/connect CQEs may arrive in any order, the connect CQE will sometimes come first: - if (cqe_accept.user_data == 0xcccccccc and cqe_connect.user_data == 0xaaaaaaaa) { - const a = cqe_accept; - const b = cqe_connect; - cqe_accept = b; - cqe_connect = a; - } - - try testing.expectEqual(@as(u64, 0xaaaaaaaa), cqe_accept.user_data); - if (cqe_accept.res <= 0) std.debug.print("\ncqe_accept.res={}\n", .{cqe_accept.res}); - try testing.expect(cqe_accept.res > 0); - try testing.expectEqual(@as(u32, 0), cqe_accept.flags); - try testing.expectEqual(linux.io_uring_cqe{ - .user_data = 0xcccccccc, - .res = 0, - .flags = 0, - }, cqe_connect); - - const sqe_recv = try ring.recv(0xffffffff, cqe_accept.res, .{ .buffer = buffer_recv[0..] }, 0); + const sqe_recv = try ring.recv(0xffffffff, socket_test_harness.server, .{ .buffer = buffer_recv[0..] }, 0); sqe_recv.flags |= linux.IOSQE_IO_LINK; const ts = os.linux.kernel_timespec{ .tv_sec = 0, .tv_nsec = 1000000 }; @@ -2448,50 +2372,12 @@ test "accept/connect/recv/cancel" { }; defer ring.deinit(); - const address = try net.Address.parseIp4("127.0.0.1", 3131); - const kernel_backlog = 1; - const server = try os.socket(address.any.family, os.SOCK.STREAM | os.SOCK.CLOEXEC, 0); - defer os.close(server); - try os.setsockopt(server, os.SOL.SOCKET, os.SO.REUSEADDR, &mem.toBytes(@as(c_int, 1))); - try os.bind(server, &address.any, address.getOsSockLen()); - try os.listen(server, kernel_backlog); + const socket_test_harness = try createSocketTestHarness(&ring); + defer socket_test_harness.close(); var buffer_recv = [_]u8{ 0, 1, 0, 1, 0 }; - var accept_addr: os.sockaddr = undefined; - var accept_addr_len: os.socklen_t = @sizeOf(@TypeOf(accept_addr)); - _ = try ring.accept(0xaaaaaaaa, server, &accept_addr, &accept_addr_len, 0); - try testing.expectEqual(@as(u32, 1), try ring.submit()); - - const client = try os.socket(address.any.family, os.SOCK.STREAM | os.SOCK.CLOEXEC, 0); - defer os.close(client); - _ = try ring.connect(0xcccccccc, client, &address.any, address.getOsSockLen()); - try testing.expectEqual(@as(u32, 1), try ring.submit()); - - var cqe_accept = try ring.copy_cqe(); - if (cqe_accept.err() == .INVAL) return error.SkipZigTest; - var cqe_connect = try ring.copy_cqe(); - if (cqe_connect.err() == .INVAL) return error.SkipZigTest; - - // The accept/connect CQEs may arrive in any order, the connect CQE will sometimes come first: - if (cqe_accept.user_data == 0xcccccccc and cqe_connect.user_data == 0xaaaaaaaa) { - const a = cqe_accept; - const b = cqe_connect; - cqe_accept = b; - cqe_connect = a; - } - - try testing.expectEqual(@as(u64, 0xaaaaaaaa), cqe_accept.user_data); - if (cqe_accept.res <= 0) std.debug.print("\ncqe_accept.res={}\n", .{cqe_accept.res}); - try testing.expect(cqe_accept.res > 0); - try testing.expectEqual(@as(u32, 0), cqe_accept.flags); - try testing.expectEqual(linux.io_uring_cqe{ - .user_data = 0xcccccccc, - .res = 0, - .flags = 0, - }, cqe_connect); - - _ = try ring.recv(0xffffffff, cqe_accept.res, .{ .buffer = buffer_recv[0..] }, 0); + _ = try ring.recv(0xffffffff, socket_test_harness.server, .{ .buffer = buffer_recv[0..] }, 0); try testing.expectEqual(@as(u32, 1), try ring.submit()); const sqe_cancel = try ring.cancel(0x99999999, 0xffffffff, 0); From acb8af468f496dadbc7550196d7ba05fb8efd586 Mon Sep 17 00:00:00 2001 From: Vincent Rischmann Date: Mon, 16 May 2022 21:54:29 +0200 Subject: [PATCH 6/7] io_uring: add tests for automatic buffer selection --- lib/std/os/linux/io_uring.zig | 384 ++++++++++++++++++++++++++++++++++ 1 file changed, 384 insertions(+) diff --git a/lib/std/os/linux/io_uring.zig b/lib/std/os/linux/io_uring.zig index bb48c63719..a62362c963 100644 --- a/lib/std/os/linux/io_uring.zig +++ b/lib/std/os/linux/io_uring.zig @@ -2830,6 +2830,390 @@ test "linkat" { try testing.expectEqualStrings("hello", second_file_data[0..read]); } +test "provide_buffers: read" { + if (builtin.os.tag != .linux) return error.SkipZigTest; + + var ring = IO_Uring.init(1, 0) catch |err| switch (err) { + error.SystemOutdated => return error.SkipZigTest, + error.PermissionDenied => return error.SkipZigTest, + else => return err, + }; + defer ring.deinit(); + + const fd = try os.openZ("/dev/zero", os.O.RDONLY | os.O.CLOEXEC, 0); + defer os.close(fd); + + const group_id = 1337; + const buffer_id = 0; + + const buffer_len = 128; + + var buffers: [4][buffer_len]u8 = undefined; + + // Provide 4 buffers + + { + const sqe = try ring.provide_buffers(0xcccccccc, @ptrCast([*]u8, &buffers), buffers.len, buffer_len, group_id, buffer_id); + try testing.expectEqual(linux.IORING_OP.PROVIDE_BUFFERS, sqe.opcode); + try testing.expectEqual(@as(i32, buffers.len), sqe.fd); + try testing.expectEqual(@as(u32, buffers[0].len), sqe.len); + try testing.expectEqual(@as(u16, group_id), sqe.buf_index); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + + const cqe = try ring.copy_cqe(); + switch (cqe.err()) { + // Happens when the kernel is < 5.7 + .INVAL => return error.SkipZigTest, + .SUCCESS => {}, + else => |errno| std.debug.panic("unhandled errno: {}", .{errno}), + } + try testing.expectEqual(@as(u64, 0xcccccccc), cqe.user_data); + } + + // Do 4 reads which should consume all buffers + + var i: usize = 0; + while (i < buffers.len) : (i += 1) { + var sqe = try ring.read(0xdededede, fd, .{ .buffer_selection = .{ .group_id = group_id, .len = buffer_len } }, 0); + try testing.expectEqual(linux.IORING_OP.READ, sqe.opcode); + try testing.expectEqual(@as(i32, fd), sqe.fd); + try testing.expectEqual(@as(u64, 0), sqe.addr); + try testing.expectEqual(@as(u32, buffer_len), sqe.len); + try testing.expectEqual(@as(u16, group_id), sqe.buf_index); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + + const cqe = try ring.copy_cqe(); + switch (cqe.err()) { + .SUCCESS => {}, + else => |errno| std.debug.panic("unhandled errno: {}", .{errno}), + } + + try testing.expect(cqe.flags & linux.IORING_CQE_F_BUFFER == linux.IORING_CQE_F_BUFFER); + const used_buffer_id = cqe.flags >> 16; + try testing.expect(used_buffer_id >= 0 and used_buffer_id <= 3); + try testing.expectEqual(@as(i32, buffer_len), cqe.res); + + try testing.expectEqual(@as(u64, 0xdededede), cqe.user_data); + try testing.expectEqualSlices(u8, &([_]u8{0} ** buffer_len), buffers[used_buffer_id][0..@intCast(usize, cqe.res)]); + } + + // This read should fail + + { + var sqe = try ring.read(0xdfdfdfdf, fd, .{ .buffer_selection = .{ .group_id = group_id, .len = buffer_len } }, 0); + try testing.expectEqual(linux.IORING_OP.READ, sqe.opcode); + try testing.expectEqual(@as(i32, fd), sqe.fd); + try testing.expectEqual(@as(u64, 0), sqe.addr); + try testing.expectEqual(@as(u32, buffer_len), sqe.len); + try testing.expectEqual(@as(u16, group_id), sqe.buf_index); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + + const cqe = try ring.copy_cqe(); + switch (cqe.err()) { + // Expected + .NOBUFS => {}, + .SUCCESS => std.debug.panic("unexpected success", .{}), + else => |errno| std.debug.panic("unhandled errno: {}", .{errno}), + } + try testing.expectEqual(@as(u64, 0xdfdfdfdf), cqe.user_data); + } + + // Provide 1 buffer again + + // Deliberately put something we don't expect in the buffers + mem.set(u8, mem.sliceAsBytes(&buffers), 42); + + const reprovided_buffer_id = 2; + + { + _ = try ring.provide_buffers(0xabababab, @ptrCast([*]u8, &buffers[reprovided_buffer_id]), 1, buffer_len, group_id, reprovided_buffer_id); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + + const cqe = try ring.copy_cqe(); + switch (cqe.err()) { + .SUCCESS => {}, + else => |errno| std.debug.panic("unhandled errno: {}", .{errno}), + } + } + + // Final read which should work + + { + var sqe = try ring.read(0xdfdfdfdf, fd, .{ .buffer_selection = .{ .group_id = group_id, .len = buffer_len } }, 0); + try testing.expectEqual(linux.IORING_OP.READ, sqe.opcode); + try testing.expectEqual(@as(i32, fd), sqe.fd); + try testing.expectEqual(@as(u64, 0), sqe.addr); + try testing.expectEqual(@as(u32, buffer_len), sqe.len); + try testing.expectEqual(@as(u16, group_id), sqe.buf_index); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + + const cqe = try ring.copy_cqe(); + switch (cqe.err()) { + .SUCCESS => {}, + else => |errno| std.debug.panic("unhandled errno: {}", .{errno}), + } + + try testing.expect(cqe.flags & linux.IORING_CQE_F_BUFFER == linux.IORING_CQE_F_BUFFER); + const used_buffer_id = cqe.flags >> 16; + try testing.expectEqual(used_buffer_id, reprovided_buffer_id); + try testing.expectEqual(@as(i32, buffer_len), cqe.res); + try testing.expectEqual(@as(u64, 0xdfdfdfdf), cqe.user_data); + try testing.expectEqualSlices(u8, &([_]u8{0} ** buffer_len), buffers[used_buffer_id][0..@intCast(usize, cqe.res)]); + } +} + +test "remove_buffers" { + if (builtin.os.tag != .linux) return error.SkipZigTest; + + var ring = IO_Uring.init(1, 0) catch |err| switch (err) { + error.SystemOutdated => return error.SkipZigTest, + error.PermissionDenied => return error.SkipZigTest, + else => return err, + }; + defer ring.deinit(); + + const fd = try os.openZ("/dev/zero", os.O.RDONLY | os.O.CLOEXEC, 0); + defer os.close(fd); + + const group_id = 1337; + const buffer_id = 0; + + const buffer_len = 128; + + var buffers: [4][buffer_len]u8 = undefined; + + // Provide 4 buffers + + { + _ = try ring.provide_buffers(0xcccccccc, @ptrCast([*]u8, &buffers), buffers.len, buffer_len, group_id, buffer_id); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + + const cqe = try ring.copy_cqe(); + switch (cqe.err()) { + .SUCCESS => {}, + else => |errno| std.debug.panic("unhandled errno: {}", .{errno}), + } + try testing.expectEqual(@as(u64, 0xcccccccc), cqe.user_data); + } + + // Remove the first 3 buffers + + { + var sqe = try ring.remove_buffers(0xbababababa, 3, group_id); + try testing.expectEqual(linux.IORING_OP.REMOVE_BUFFERS, sqe.opcode); + try testing.expectEqual(@as(i32, 3), sqe.fd); + try testing.expectEqual(@as(u64, 0), sqe.addr); + try testing.expectEqual(@as(u16, group_id), sqe.buf_index); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + + const cqe = try ring.copy_cqe(); + switch (cqe.err()) { + .SUCCESS => {}, + else => |errno| std.debug.panic("unhandled errno: {}", .{errno}), + } + try testing.expectEqual(@as(u64, 0xbababababa), cqe.user_data); + } + + // This read should work + + { + _ = try ring.read(0xdfdfdfdf, fd, .{ .buffer_selection = .{ .group_id = group_id, .len = buffer_len } }, 0); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + + const cqe = try ring.copy_cqe(); + switch (cqe.err()) { + .SUCCESS => {}, + else => |errno| std.debug.panic("unhandled errno: {}", .{errno}), + } + + try testing.expect(cqe.flags & linux.IORING_CQE_F_BUFFER == linux.IORING_CQE_F_BUFFER); + const used_buffer_id = cqe.flags >> 16; + try testing.expectEqual(used_buffer_id, 0); + try testing.expectEqual(@as(i32, buffer_len), cqe.res); + try testing.expectEqual(@as(u64, 0xdfdfdfdf), cqe.user_data); + try testing.expectEqualSlices(u8, &([_]u8{0} ** buffer_len), buffers[used_buffer_id][0..@intCast(usize, cqe.res)]); + } + + // Final read should _not_ work + + { + _ = try ring.read(0xdfdfdfdf, fd, .{ .buffer_selection = .{ .group_id = group_id, .len = buffer_len } }, 0); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + + const cqe = try ring.copy_cqe(); + switch (cqe.err()) { + // Expected + .NOBUFS => {}, + .SUCCESS => std.debug.panic("unexpected success", .{}), + else => |errno| std.debug.panic("unhandled errno: {}", .{errno}), + } + } +} + +test "provide_buffers: accept/connect/send/recv" { + if (builtin.os.tag != .linux) return error.SkipZigTest; + + var ring = IO_Uring.init(16, 0) catch |err| switch (err) { + error.SystemOutdated => return error.SkipZigTest, + error.PermissionDenied => return error.SkipZigTest, + else => return err, + }; + defer ring.deinit(); + + const group_id = 1337; + const buffer_id = 0; + + const buffer_len = 128; + var buffers: [4][buffer_len]u8 = undefined; + + // Provide 4 buffers + + { + const sqe = try ring.provide_buffers(0xcccccccc, @ptrCast([*]u8, &buffers), buffers.len, buffer_len, group_id, buffer_id); + try testing.expectEqual(linux.IORING_OP.PROVIDE_BUFFERS, sqe.opcode); + try testing.expectEqual(@as(i32, buffers.len), sqe.fd); + try testing.expectEqual(@as(u32, buffer_len), sqe.len); + try testing.expectEqual(@as(u16, group_id), sqe.buf_index); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + + const cqe = try ring.copy_cqe(); + switch (cqe.err()) { + // Happens when the kernel is < 5.7 + .INVAL => return error.SkipZigTest, + .SUCCESS => {}, + else => |errno| std.debug.panic("unhandled errno: {}", .{errno}), + } + try testing.expectEqual(@as(u64, 0xcccccccc), cqe.user_data); + } + + const socket_test_harness = try createSocketTestHarness(&ring); + defer socket_test_harness.close(); + + // Do 4 send on the socket + + { + var i: usize = 0; + while (i < buffers.len) : (i += 1) { + _ = try ring.send(0xdeaddead, socket_test_harness.server, &([_]u8{'z'} ** buffer_len), 0); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + } + + var cqes: [4]linux.io_uring_cqe = undefined; + try testing.expectEqual(@as(u32, 4), try ring.copy_cqes(&cqes, 4)); + } + + // Do 4 recv which should consume all buffers + + // Deliberately put something we don't expect in the buffers + mem.set(u8, mem.sliceAsBytes(&buffers), 1); + + var i: usize = 0; + while (i < buffers.len) : (i += 1) { + var sqe = try ring.recv(0xdededede, socket_test_harness.client, .{ .buffer_selection = .{ .group_id = group_id, .len = buffer_len } }, 0); + try testing.expectEqual(linux.IORING_OP.RECV, sqe.opcode); + try testing.expectEqual(@as(i32, socket_test_harness.client), sqe.fd); + try testing.expectEqual(@as(u64, 0), sqe.addr); + try testing.expectEqual(@as(u32, buffer_len), sqe.len); + try testing.expectEqual(@as(u16, group_id), sqe.buf_index); + try testing.expectEqual(@as(u32, 0), sqe.rw_flags); + try testing.expectEqual(@as(u32, linux.IOSQE_BUFFER_SELECT), sqe.flags); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + + const cqe = try ring.copy_cqe(); + switch (cqe.err()) { + .SUCCESS => {}, + else => |errno| std.debug.panic("unhandled errno: {}", .{errno}), + } + + try testing.expect(cqe.flags & linux.IORING_CQE_F_BUFFER == linux.IORING_CQE_F_BUFFER); + const used_buffer_id = cqe.flags >> 16; + try testing.expect(used_buffer_id >= 0 and used_buffer_id <= 3); + try testing.expectEqual(@as(i32, buffer_len), cqe.res); + + try testing.expectEqual(@as(u64, 0xdededede), cqe.user_data); + const buffer = buffers[used_buffer_id][0..@intCast(usize, cqe.res)]; + try testing.expectEqualSlices(u8, &([_]u8{'z'} ** buffer_len), buffer); + } + + // This recv should fail + + { + var sqe = try ring.recv(0xdfdfdfdf, socket_test_harness.client, .{ .buffer_selection = .{ .group_id = group_id, .len = buffer_len } }, 0); + try testing.expectEqual(linux.IORING_OP.RECV, sqe.opcode); + try testing.expectEqual(@as(i32, socket_test_harness.client), sqe.fd); + try testing.expectEqual(@as(u64, 0), sqe.addr); + try testing.expectEqual(@as(u32, buffer_len), sqe.len); + try testing.expectEqual(@as(u16, group_id), sqe.buf_index); + try testing.expectEqual(@as(u32, 0), sqe.rw_flags); + try testing.expectEqual(@as(u32, linux.IOSQE_BUFFER_SELECT), sqe.flags); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + + const cqe = try ring.copy_cqe(); + switch (cqe.err()) { + // Expected + .NOBUFS => {}, + .SUCCESS => std.debug.panic("unexpected success", .{}), + else => |errno| std.debug.panic("unhandled errno: {}", .{errno}), + } + try testing.expectEqual(@as(u64, 0xdfdfdfdf), cqe.user_data); + } + + // Provide 1 buffer again + + const reprovided_buffer_id = 2; + + { + _ = try ring.provide_buffers(0xabababab, @ptrCast([*]u8, &buffers[reprovided_buffer_id]), 1, buffer_len, group_id, reprovided_buffer_id); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + + const cqe = try ring.copy_cqe(); + switch (cqe.err()) { + .SUCCESS => {}, + else => |errno| std.debug.panic("unhandled errno: {}", .{errno}), + } + } + + // Redo 1 send on the server socket + + { + _ = try ring.send(0xdeaddead, socket_test_harness.server, &([_]u8{'w'} ** buffer_len), 0); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + + _ = try ring.copy_cqe(); + } + + // Final recv which should work + + // Deliberately put something we don't expect in the buffers + mem.set(u8, mem.sliceAsBytes(&buffers), 1); + + { + var sqe = try ring.recv(0xdfdfdfdf, socket_test_harness.client, .{ .buffer_selection = .{ .group_id = group_id, .len = buffer_len } }, 0); + try testing.expectEqual(linux.IORING_OP.RECV, sqe.opcode); + try testing.expectEqual(@as(i32, socket_test_harness.client), sqe.fd); + try testing.expectEqual(@as(u64, 0), sqe.addr); + try testing.expectEqual(@as(u32, buffer_len), sqe.len); + try testing.expectEqual(@as(u16, group_id), sqe.buf_index); + try testing.expectEqual(@as(u32, 0), sqe.rw_flags); + try testing.expectEqual(@as(u32, linux.IOSQE_BUFFER_SELECT), sqe.flags); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + + const cqe = try ring.copy_cqe(); + switch (cqe.err()) { + .SUCCESS => {}, + else => |errno| std.debug.panic("unhandled errno: {}", .{errno}), + } + + try testing.expect(cqe.flags & linux.IORING_CQE_F_BUFFER == linux.IORING_CQE_F_BUFFER); + const used_buffer_id = cqe.flags >> 16; + try testing.expectEqual(used_buffer_id, reprovided_buffer_id); + try testing.expectEqual(@as(i32, buffer_len), cqe.res); + try testing.expectEqual(@as(u64, 0xdfdfdfdf), cqe.user_data); + const buffer = buffers[used_buffer_id][0..@intCast(usize, cqe.res)]; + try testing.expectEqualSlices(u8, &([_]u8{'w'} ** buffer_len), buffer); + } +} + /// Used for testing server/client interactions. const SocketTestHarness = struct { listener: os.socket_t, From 3c58d3e281fca421879fe1d2ae7943bd31dccc0e Mon Sep 17 00:00:00 2001 From: Vincent Rischmann Date: Fri, 20 May 2022 20:41:56 +0200 Subject: [PATCH 7/7] io_uring: replace the readv method with a read on a new ReadBuffer type readv() is essentially identical to read() except for the buffer type, this simplifies the API for the caller at the cost of not clearly mapping to the liburing C API. --- lib/std/os/linux/io_uring.zig | 31 +++++++++++-------------------- 1 file changed, 11 insertions(+), 20 deletions(-) diff --git a/lib/std/os/linux/io_uring.zig b/lib/std/os/linux/io_uring.zig index a62362c963..31c416c8a1 100644 --- a/lib/std/os/linux/io_uring.zig +++ b/lib/std/os/linux/io_uring.zig @@ -363,6 +363,9 @@ pub const IO_Uring = struct { /// io_uring will read directly into this buffer buffer: []u8, + /// io_uring will read directly into these buffers using readv. + iovecs: []const os.iovec, + /// io_uring will select a buffer that has previously been provided with `provide_buffers`. /// The buffer group reference by `group_id` must contain at least one buffer for the read to work. /// `len` controls the number of bytes to read into the selected buffer. @@ -372,7 +375,11 @@ pub const IO_Uring = struct { }, }; - /// Queues (but does not submit) an SQE to perform a `read(2)`. + /// Queues (but does not submit) an SQE to perform a `read(2)` or `preadv` depending on the buffer type. + /// * Reading into a `ReadBuffer.buffer` uses `read(2)` + /// * Reading into a `ReadBuffer.iovecs` uses `preadv(2)` + /// If you want to do a `preadv2()` then set `rw_flags` on the returned SQE. See https://linux.die.net/man/2/preadv. + /// /// Returns a pointer to the SQE. pub fn read( self: *IO_Uring, @@ -384,6 +391,7 @@ pub const IO_Uring = struct { const sqe = try self.get_sqe(); switch (buffer) { .buffer => |slice| io_uring_prep_read(sqe, fd, slice, offset), + .iovecs => |vecs| io_uring_prep_readv(sqe, fd, vecs, offset), .buffer_selection => |selection| { io_uring_prep_rw(.READ, sqe, fd, 0, selection.len, offset); sqe.flags |= linux.IOSQE_BUFFER_SELECT; @@ -409,23 +417,6 @@ pub const IO_Uring = struct { return sqe; } - /// Queues (but does not submit) an SQE to perform a `preadv()`. - /// Returns a pointer to the SQE so that you can further modify the SQE for advanced use cases. - /// For example, if you want to do a `preadv2()` then set `rw_flags` on the returned SQE. - /// See https://linux.die.net/man/2/preadv. - pub fn readv( - self: *IO_Uring, - user_data: u64, - fd: os.fd_t, - iovecs: []const os.iovec, - offset: u64, - ) !*io_uring_sqe { - const sqe = try self.get_sqe(); - io_uring_prep_readv(sqe, fd, iovecs, offset); - sqe.user_data = user_data; - return sqe; - } - /// Queues (but does not submit) an SQE to perform a IORING_OP_READ_FIXED. /// The `buffer` provided must be registered with the kernel by calling `register_buffers` first. /// The `buffer_index` must be the same as its index in the array provided to `register_buffers`. @@ -1715,7 +1706,7 @@ test "readv" { var buffer = [_]u8{42} ** 128; var iovecs = [_]os.iovec{os.iovec{ .iov_base = &buffer, .iov_len = buffer.len }}; - const sqe = try ring.readv(0xcccccccc, fd_index, iovecs[0..], 0); + const sqe = try ring.read(0xcccccccc, fd_index, .{ .iovecs = iovecs[0..] }, 0); try testing.expectEqual(linux.IORING_OP.READV, sqe.opcode); sqe.flags |= linux.IOSQE_FIXED_FILE; @@ -1766,7 +1757,7 @@ test "writev/fsync/readv" { try testing.expectEqual(fd, sqe_fsync.fd); sqe_fsync.flags |= linux.IOSQE_IO_LINK; - const sqe_readv = try ring.readv(0xffffffff, fd, iovecs_read[0..], 17); + const sqe_readv = try ring.read(0xffffffff, fd, .{ .iovecs = iovecs_read[0..] }, 17); try testing.expectEqual(linux.IORING_OP.READV, sqe_readv.opcode); try testing.expectEqual(@as(u64, 17), sqe_readv.off);