From 69a55fc560dab477222f2ad104050b443647faa5 Mon Sep 17 00:00:00 2001 From: Joran Dirk Greef Date: Sun, 4 Oct 2020 13:01:41 +0200 Subject: [PATCH] Allow for advanced non-sequential SQE allocation schemes Decouples SQE queueing and SQE prepping methods to allow for non-sequential SQE allocation schemes as suggested by @daurnimator. Adds essential SQE prepping methods from liburing to reduce boilerplate. Removes non-essential .link_with_next_sqe() and .use_registered_fd(). --- lib/std/os/linux/io_uring.zig | 361 +++++++++++++++++++++++----------- 1 file changed, 246 insertions(+), 115 deletions(-) diff --git a/lib/std/os/linux/io_uring.zig b/lib/std/os/linux/io_uring.zig index 49a1eab556..c142fa3f73 100644 --- a/lib/std/os/linux/io_uring.zig +++ b/lib/std/os/linux/io_uring.zig @@ -135,13 +135,13 @@ pub const IO_Uring = struct { self.fd = -1; } - /// Returns a pointer to a zeroed SQE, or an error if the submission queue is full. + /// Returns a pointer to a vacant SQE, or an error if the submission queue is full. /// We follow the implementation (and atomics) of liburing's `io_uring_get_sqe()` exactly. /// However, instead of a null we return an error to force safe handling. /// Any situation where the submission queue is full tends more towards a control flow error, /// and the null return in liburing is more a C idiom than anything else, for lack of a better /// alternative. In Zig, we have first-class error handling... so let's use it. - /// Matches the implementation of io_uring_get_sqe() in liburing, except zeroes for safety. + /// Matches the implementation of io_uring_get_sqe() in liburing. pub fn get_sqe(self: *IO_Uring) !*io_uring_sqe { const head = @atomicLoad(u32, self.sq.head, .Acquire); // Remember that these head and tail offsets wrap around every four billion operations. @@ -150,8 +150,6 @@ pub const IO_Uring = struct { if (next -% head > self.sq.sqes.len) return error.SubmissionQueueFull; var sqe = &self.sq.sqes[self.sq.sqe_tail & self.sq.mask]; self.sq.sqe_tail = next; - // We zero the SQE slot here in a single place, rather than in many `queue_` methods. - @memset(@ptrCast([*]u8, sqe), 0, @sizeOf(io_uring_sqe)); return sqe; } @@ -336,29 +334,6 @@ pub const IO_Uring = struct { } } - /// Queues (but does not submit) an SQE to perform an `accept4(2)` on a socket. - /// Returns a pointer to the SQE. - pub fn queue_accept( - self: *IO_Uring, - user_data: u64, - fd: os.fd_t, - addr: *os.sockaddr, - addrlen: *os.socklen_t, - accept_flags: u32 - ) !*io_uring_sqe { - // "sqe->fd is the file descriptor, sqe->addr holds a pointer to struct sockaddr, - // sqe->addr2 holds a pointer to socklen_t, and finally sqe->accept_flags holds the flags - // for accept(4)." - https://lwn.net/ml/linux-block/20191025173037.13486-1-axboe@kernel.dk/ - const sqe = try self.get_sqe(); - sqe.opcode = .ACCEPT; - sqe.fd = fd; - sqe.off = @ptrToInt(addrlen); // `addr2` is a newer union member that maps to `off`. - sqe.addr = @ptrToInt(addr); - sqe.user_data = user_data; - sqe.rw_flags = accept_flags; - return sqe; - } - /// Queues (but does not submit) an SQE to perform an `fsync(2)`. /// Returns a pointer to the SQE so that you can further modify the SQE for advanced use cases. /// For example, for `fdatasync()` you can set `IORING_FSYNC_DATASYNC` in the SQE's `rw_flags`. @@ -368,11 +343,9 @@ pub const IO_Uring = struct { /// apply to the write, since the fsync may complete before the write is issued to the disk. /// You should preferably use `link_with_next_sqe()` on a write's SQE to link it with an fsync, /// or else insert a full write barrier using `drain_previous_sqes()` when queueing an fsync. - pub fn queue_fsync(self: *IO_Uring, user_data: u64, fd: os.fd_t, flags: u32) !*io_uring_sqe { + pub fn fsync(self: *IO_Uring, user_data: u64, fd: os.fd_t, flags: u32) !*io_uring_sqe { const sqe = try self.get_sqe(); - sqe.opcode = .FSYNC; - sqe.fd = fd; - sqe.rw_flags = flags; + io_uring_prep_fsync(sqe, fd, flags); sqe.user_data = user_data; return sqe; } @@ -382,16 +355,16 @@ pub const IO_Uring = struct { /// A no-op is more useful than may appear at first glance. /// For example, you could call `drain_previous_sqes()` on the returned SQE, to use the no-op to /// know when the ring is idle before acting on a kill signal. - pub fn queue_nop(self: *IO_Uring, user_data: u64) !*io_uring_sqe { + pub fn nop(self: *IO_Uring, user_data: u64) !*io_uring_sqe { const sqe = try self.get_sqe(); - sqe.opcode = .NOP; + io_uring_prep_nop(sqe); sqe.user_data = user_data; return sqe; } /// Queues (but does not submit) an SQE to perform a `read(2)`. /// Returns a pointer to the SQE. - pub fn queue_read( + pub fn read( self: *IO_Uring, user_data: u64, fd: os.fd_t, @@ -399,18 +372,14 @@ pub const IO_Uring = struct { offset: u64 ) !*io_uring_sqe { const sqe = try self.get_sqe(); - sqe.opcode = .READ; - sqe.fd = fd; - sqe.off = offset; - sqe.addr = @ptrToInt(buffer.ptr); - sqe.len = @intCast(u32, buffer.len); + io_uring_prep_read(sqe, fd, buffer, offset); sqe.user_data = user_data; return sqe; } /// Queues (but does not submit) an SQE to perform a `write(2)`. /// Returns a pointer to the SQE. - pub fn queue_write( + pub fn write( self: *IO_Uring, user_data: u64, fd: os.fd_t, @@ -418,11 +387,7 @@ pub const IO_Uring = struct { offset: u64 ) !*io_uring_sqe { const sqe = try self.get_sqe(); - sqe.opcode = .WRITE; - sqe.fd = fd; - sqe.off = offset; - sqe.addr = @ptrToInt(buffer.ptr); - sqe.len = @intCast(u32, buffer.len); + io_uring_prep_write(sqe, fd, buffer, offset); sqe.user_data = user_data; return sqe; } @@ -431,7 +396,7 @@ pub const IO_Uring = struct { /// Returns a pointer to the SQE so that you can further modify the SQE for advanced use cases. /// For example, if you want to do a `preadv2()` then set `rw_flags` on the returned SQE. /// See https://linux.die.net/man/2/preadv. - pub fn queue_readv( + pub fn readv( self: *IO_Uring, user_data: u64, fd: os.fd_t, @@ -439,11 +404,7 @@ pub const IO_Uring = struct { offset: u64 ) !*io_uring_sqe { const sqe = try self.get_sqe(); - sqe.opcode = .READV; - sqe.fd = fd; - sqe.off = offset; - sqe.addr = @ptrToInt(iovecs.ptr); - sqe.len = @intCast(u32, iovecs.len); + io_uring_prep_readv(sqe, fd, iovecs, offset); sqe.user_data = user_data; return sqe; } @@ -452,7 +413,7 @@ pub const IO_Uring = struct { /// Returns a pointer to the SQE so that you can further modify the SQE for advanced use cases. /// For example, if you want to do a `pwritev2()` then set `rw_flags` on the returned SQE. /// See https://linux.die.net/man/2/pwritev. - pub fn queue_writev( + pub fn writev( self: *IO_Uring, user_data: u64, fd: os.fd_t, @@ -460,25 +421,25 @@ pub const IO_Uring = struct { offset: u64 ) !*io_uring_sqe { const sqe = try self.get_sqe(); - sqe.opcode = .WRITEV; - sqe.fd = fd; - sqe.off = offset; - sqe.addr = @ptrToInt(iovecs.ptr); - sqe.len = @intCast(u32, iovecs.len); + io_uring_prep_writev(sqe, fd, iovecs, offset); sqe.user_data = user_data; return sqe; } - /// The next SQE will not be started until this one completes. - /// This can be used to chain causally dependent SQEs, and the chain can be arbitrarily long. - /// The tail of the chain is denoted by the first SQE that does not have this flag set. - /// This flag has no effect on previous SQEs, nor does it impact SQEs outside the chain. - /// This means that multiple chains can be executing in parallel, along with individual SQEs. - /// Only members inside the chain are serialized. - /// A chain will be broken if any SQE in the chain ends in error, where any unexpected result is - /// considered an error. For example, a short read will terminate the remainder of the chain. - pub fn link_with_next_sqe(self: *IO_Uring, sqe: *io_uring_sqe) void { - sqe.flags |= linux.IOSQE_IO_LINK; + /// Queues (but does not submit) an SQE to perform an `accept4(2)` on a socket. + /// Returns a pointer to the SQE. + pub fn accept( + self: *IO_Uring, + user_data: u64, + fd: os.fd_t, + addr: *os.sockaddr, + addrlen: *os.socklen_t, + flags: u32 + ) !*io_uring_sqe { + const sqe = try self.get_sqe(); + io_uring_prep_accept(sqe, fd, addr, addrlen, flags); + sqe.user_data = user_data; + return sqe; } /// Like `link_with_next_sqe()` but stronger. @@ -538,11 +499,6 @@ pub const IO_Uring = struct { } } - /// Changes the semantics of the SQE's `fd` to refer to a pre-registered file descriptor. - pub fn use_registered_fd(self: *IO_Uring, sqe: *io_uring_sqe) void { - sqe.flags |= linux.IOSQE_FIXED_FILE; - } - /// Unregisters all registered file descriptors previously associated with the ring. pub fn unregister_files(self: *IO_Uring) !void { assert(self.fd >= 0); @@ -563,8 +519,8 @@ pub const SubmissionQueue = struct { dropped: *u32, array: []u32, sqes: []io_uring_sqe, - mmap: []align(std.mem.page_size) u8, - mmap_sqes: []align(std.mem.page_size) u8, + mmap: []align(mem.page_size) u8, + mmap_sqes: []align(mem.page_size) u8, // We use `sqe_head` and `sqe_tail` in the same way as liburing: // We increment `sqe_tail` (but not `tail`) for each call to `get_sqe()`. @@ -666,7 +622,156 @@ pub const CompletionQueue = struct { } }; -test "structs and offsets" { +pub fn io_uring_prep_nop(sqe: *io_uring_sqe) void { + sqe.* = .{ + .opcode = .NOP, + .flags = 0, + .ioprio = 0, + .fd = 0, + .off = 0, + .addr = 0, + .len = 0, + .rw_flags = 0, + .user_data = 0, + .buf_index = 0, + .personality = 0, + .splice_fd_in = 0, + .__pad2 = [2]u64{ 0, 0 } + }; +} + +pub fn io_uring_prep_fsync(sqe: *io_uring_sqe, fd: os.fd_t, flags: u32) void { + sqe.* = .{ + .opcode = .FSYNC, + .flags = 0, + .ioprio = 0, + .fd = fd, + .off = 0, + .addr = 0, + .len = 0, + .rw_flags = flags, + .user_data = 0, + .buf_index = 0, + .personality = 0, + .splice_fd_in = 0, + .__pad2 = [2]u64{ 0, 0 } + }; +} + +pub fn io_uring_prep_rw( + op: linux.IORING_OP, + sqe: *io_uring_sqe, + fd: os.fd_t, + addr: anytype, + len: usize, + offset: u64 +) void { + sqe.* = .{ + .opcode = op, + .flags = 0, + .ioprio = 0, + .fd = fd, + .off = offset, + .addr = @ptrToInt(addr), + .len = @intCast(u32, len), + .rw_flags = 0, + .user_data = 0, + .buf_index = 0, + .personality = 0, + .splice_fd_in = 0, + .__pad2 = [2]u64{ 0, 0 } + }; +} + +pub fn io_uring_prep_read(sqe: *io_uring_sqe, fd: os.fd_t, buffer: []u8, offset: u64) void { + io_uring_prep_rw(.READ, sqe, fd, buffer.ptr, buffer.len, offset); +} + +pub fn io_uring_prep_write(sqe: *io_uring_sqe, fd: os.fd_t, buffer: []const u8, offset: u64) void { + io_uring_prep_rw(.WRITE, sqe, fd, buffer.ptr, buffer.len, offset); +} + +pub fn io_uring_prep_readv( + sqe: *io_uring_sqe, + fd: os.fd_t, + iovecs: []const os.iovec, + offset: u64 +) void { + io_uring_prep_rw(.READV, sqe, fd, iovecs.ptr, iovecs.len, offset); +} + +pub fn io_uring_prep_writev( + sqe: *io_uring_sqe, + fd: os.fd_t, + iovecs: []const os.iovec_const, + offset: u64 +) void { + io_uring_prep_rw(.WRITEV, sqe, fd, iovecs.ptr, iovecs.len, offset); +} + +pub fn io_uring_prep_accept( + sqe: *io_uring_sqe, + fd: os.fd_t, + addr: *os.sockaddr, + addrlen: *os.socklen_t, + flags: u32 +) void { + // `addr` holds a pointer to `sockaddr`, and `addr2` holds a pointer to socklen_t`. + // `addr2` maps to `sqe.off` (u64) instead of `sqe.len` (which is only a u32). + io_uring_prep_rw(.ACCEPT, sqe, fd, addr, 0, @ptrToInt(addrlen)); + sqe.rw_flags = flags; +} + +pub fn io_uring_prep_connect( + sqe: *io_uring_sqe, + fd: os.fd_t, + addr: *const os.sockaddr, + addrlen: os.socklen_t +) void { + // `addrlen` maps to `sqe.off` (u64) instead of `sqe.len` (which is only a u32). + io_uring_prep_rw(.CONNECT, sqe, fd, addr, 0, addrlen); +} + +pub fn io_uring_prep_recv(sqe: *io_uring_sqe, fd: os.fd_t, buffer: []u8, flags: u32) void { + io_uring_prep_rw(.RECV, sqe, fd, buffer.ptr, buffer.len, 0); + sqe.rw_flags = flags; +} + +pub fn io_uring_prep_send(sqe: *io_uring_sqe, fd: os.fd_t, buffer: []const u8, flags: u32) void { + io_uring_prep_rw(.SEND, sqe, fd, buffer.ptr, buffer.len, 0); + sqe.rw_flags = flags; +} + +pub fn io_uring_prep_openat( + sqe: *io_uring_sqe, + fd: os.fd_t, + path: [*:0]const u8, + flags: u32, + mode: os.mode_t +) void { + io_uring_prep_rw(.OPENAT, sqe, fd, path, mode, 0); + sqe.rw_flags = flags; +} + +pub fn io_uring_prep_close(sqe: *io_uring_sqe, fd: os.fd_t) void { + sqe.* = .{ + .opcode = .CLOSE, + .flags = 0, + .ioprio = 0, + .fd = fd, + .off = 0, + .addr = 0, + .len = 0, + .rw_flags = 0, + .user_data = 0, + .buf_index = 0, + .personality = 0, + .splice_fd_in = 0, + .__pad2 = [2]u64{ 0, 0 } + }; +} + +test "structs/offsets/entries" { if (builtin.os.tag != .linux) return error.SkipZigTest; testing.expectEqual(@as(usize, 120), @sizeOf(io_uring_params)); @@ -681,7 +786,7 @@ test "structs and offsets" { testing.expectError(error.EntriesNotPowerOfTwo, IO_Uring.init(3, 0)); } -test "queue_nop" { +test "nop" { if (builtin.os.tag != .linux) return error.SkipZigTest; var ring = IO_Uring.init(1, 0) catch |err| switch (err) { @@ -694,7 +799,7 @@ test "queue_nop" { testing.expectEqual(@as(os.fd_t, -1), ring.fd); } - var sqe = try ring.queue_nop(@intCast(u64, 0xaaaaaaaa)); + var sqe = try ring.nop(0xaaaaaaaa); testing.expectEqual(io_uring_sqe { .opcode = .NOP, .flags = 0, @@ -704,7 +809,7 @@ test "queue_nop" { .addr = 0, .len = 0, .rw_flags = 0, - .user_data = @intCast(u64, 0xaaaaaaaa), + .user_data = 0xaaaaaaaa, .buf_index = 0, .personality = 0, .splice_fd_in = 0, @@ -733,9 +838,8 @@ test "queue_nop" { testing.expectEqual(@as(u32, 1), ring.cq.head.*); testing.expectEqual(@as(u32, 0), ring.cq_ready()); - var sqe_barrier = try ring.queue_nop(@intCast(u64, 0xbbbbbbbb)); - ring.drain_previous_sqes(sqe_barrier); - testing.expectEqual(@as(u8, linux.IOSQE_IO_DRAIN), sqe_barrier.flags); + var sqe_barrier = try ring.nop(0xbbbbbbbb); + sqe_barrier.flags |= linux.IOSQE_IO_DRAIN; testing.expectEqual(@as(u32, 1), try ring.submit()); testing.expectEqual(io_uring_cqe { .user_data = 0xbbbbbbbb, @@ -748,7 +852,7 @@ test "queue_nop" { testing.expectEqual(@as(u32, 2), ring.cq.head.*); } -test "queue_readv" { +test "readv" { if (builtin.os.tag != .linux) return error.SkipZigTest; var ring = IO_Uring.init(1, 0) catch |err| switch (err) { @@ -774,11 +878,11 @@ test "queue_readv" { var buffer = [_]u8{42} ** 128; var iovecs = [_]os.iovec{ os.iovec { .iov_base = &buffer, .iov_len = buffer.len } }; - var sqe = try ring.queue_readv(0xcccccccc, fd_index, iovecs[0..], 0); - ring.use_registered_fd(sqe); - testing.expectEqual(@as(u8, linux.IOSQE_FIXED_FILE), sqe.flags); + var sqe = try ring.readv(0xcccccccc, fd_index, iovecs[0..], 0); + testing.expectEqual(linux.IORING_OP.READV, sqe.opcode); + sqe.flags |= linux.IOSQE_FIXED_FILE; - testing.expectError(error.SubmissionQueueFull, ring.queue_nop(0)); + testing.expectError(error.SubmissionQueueFull, ring.nop(0)); testing.expectEqual(@as(u32, 1), try ring.submit()); testing.expectEqual(linux.io_uring_cqe { .user_data = 0xcccccccc, @@ -790,52 +894,75 @@ test "queue_readv" { try ring.unregister_files(); } -test "queue_writev/queue_fsync" { +test "writev/fsync/readv" { if (builtin.os.tag != .linux) return error.SkipZigTest; - var ring = IO_Uring.init(2, 0) catch |err| switch (err) { + var ring = IO_Uring.init(4, 0) catch |err| switch (err) { error.SystemOutdated => return error.SkipZigTest, error.PermissionDenied => return error.SkipZigTest, else => return err }; defer ring.deinit(); - const path = "test_io_uring_queue_writev"; - const file = try std.fs.cwd().createFile(path, .{ .truncate = true }); + const path = "test_io_uring_writev_fsync_readv"; + const file = try std.fs.cwd().createFile(path, .{ .read = true, .truncate = true }); defer file.close(); defer std.fs.cwd().deleteFile(path) catch {}; const fd = file.handle; - var buffer = [_]u8{42} ** 128; - var iovecs = [_]os.iovec_const { - os.iovec_const { .iov_base = &buffer, .iov_len = buffer.len } + var buffer_write = [_]u8{42} ** 128; + var iovecs_write = [_]os.iovec_const { + os.iovec_const { .iov_base = &buffer_write, .iov_len = buffer_write.len } + }; + var buffer_read = [_]u8{0} ** 128; + var iovecs_read = [_]os.iovec { + os.iovec { .iov_base = &buffer_read, .iov_len = buffer_read.len } }; - var sqe_writev = try ring.queue_writev(0xdddddddd, fd, iovecs[0..], 0); - ring.link_with_next_sqe(sqe_writev); - testing.expectEqual(@as(u8, linux.IOSQE_IO_LINK), sqe_writev.flags); - - var sqe_fsync = try ring.queue_fsync(0xeeeeeeee, fd, 0); - testing.expectEqual(fd, sqe_fsync.fd); - testing.expectEqual(@as(u32, 2), ring.sq_ready()); - testing.expectEqual(@as(u32, 2), try ring.submit_and_wait(2)); + var sqe_writev = try ring.writev(0xdddddddd, fd, iovecs_write[0..], 17); + testing.expectEqual(linux.IORING_OP.WRITEV, sqe_writev.opcode); + testing.expectEqual(@as(u64, 17), sqe_writev.off); + sqe_writev.flags |= linux.IOSQE_IO_LINK; + + var sqe_fsync = try ring.fsync(0xeeeeeeee, fd, 0); + testing.expectEqual(linux.IORING_OP.FSYNC, sqe_fsync.opcode); + testing.expectEqual(fd, sqe_fsync.fd); + sqe_fsync.flags |= linux.IOSQE_IO_LINK; + + var sqe_readv = try ring.readv(0xffffffff, fd, iovecs_read[0..], 17); + testing.expectEqual(linux.IORING_OP.READV, sqe_readv.opcode); + testing.expectEqual(@as(u64, 17), sqe_readv.off); + + testing.expectEqual(@as(u32, 3), ring.sq_ready()); + testing.expectEqual(@as(u32, 3), try ring.submit_and_wait(3)); testing.expectEqual(@as(u32, 0), ring.sq_ready()); - testing.expectEqual(@as(u32, 2), ring.cq_ready()); + testing.expectEqual(@as(u32, 3), ring.cq_ready()); + testing.expectEqual(linux.io_uring_cqe { .user_data = 0xdddddddd, - .res = buffer.len, + .res = buffer_write.len, .flags = 0, }, try ring.copy_cqe()); - testing.expectEqual(@as(u32, 1), ring.cq_ready()); + testing.expectEqual(@as(u32, 2), ring.cq_ready()); + testing.expectEqual(linux.io_uring_cqe { .user_data = 0xeeeeeeee, .res = 0, .flags = 0, }, try ring.copy_cqe()); + testing.expectEqual(@as(u32, 1), ring.cq_ready()); + + testing.expectEqual(linux.io_uring_cqe { + .user_data = 0xffffffff, + .res = buffer_read.len, + .flags = 0, + }, try ring.copy_cqe()); testing.expectEqual(@as(u32, 0), ring.cq_ready()); + + testing.expectEqualSlices(u8, buffer_write[0..], buffer_read[0..]); } -test "queue_write/queue_read" { +test "write/read" { if (builtin.os.tag != .linux) return error.SkipZigTest; var ring = IO_Uring.init(2, 0) catch |err| switch (err) { @@ -845,7 +972,7 @@ test "queue_write/queue_read" { }; defer ring.deinit(); - const path = "test_io_uring_queue_write"; + const path = "test_io_uring_write_read"; const file = try std.fs.cwd().createFile(path, .{ .read = true, .truncate = true }); defer file.close(); defer std.fs.cwd().deleteFile(path) catch {}; @@ -853,26 +980,30 @@ test "queue_write/queue_read" { var buffer_write = [_]u8{97} ** 20; var buffer_read = [_]u8{98} ** 20; - var sqe_write = try ring.queue_write(123, fd, buffer_write[0..], 10); - ring.link_with_next_sqe(sqe_write); - var sqe_read = try ring.queue_read(456, fd, buffer_read[0..], 10); + var sqe_write = try ring.write(123, fd, buffer_write[0..], 10); + testing.expectEqual(linux.IORING_OP.WRITE, sqe_write.opcode); + testing.expectEqual(@as(u64, 10), sqe_write.off); + sqe_write.flags |= linux.IOSQE_IO_LINK; + var sqe_read = try ring.read(456, fd, buffer_read[0..], 10); + testing.expectEqual(linux.IORING_OP.READ, sqe_read.opcode); + testing.expectEqual(@as(u64, 10), sqe_read.off); testing.expectEqual(@as(u32, 2), try ring.submit()); - var cqe1 = try ring.copy_cqe(); - var cqe2 = try ring.copy_cqe(); + var cqe_write = try ring.copy_cqe(); + var cqe_read = try ring.copy_cqe(); // Prior to Linux Kernel 5.6 this is the only way to test for read/write support: // https://lwn.net/Articles/809820/ - if (cqe1.res == -linux.EINVAL) return error.SkipZigTest; - if (cqe2.res == -linux.EINVAL) return error.SkipZigTest; + if (cqe_write.res == -linux.EINVAL) return error.SkipZigTest; + if (cqe_read.res == -linux.EINVAL) return error.SkipZigTest; testing.expectEqual(linux.io_uring_cqe { .user_data = 123, .res = buffer_write.len, .flags = 0, - }, cqe1); + }, cqe_write); testing.expectEqual(linux.io_uring_cqe { .user_data = 456, .res = buffer_read.len, .flags = 0, - }, cqe2); + }, cqe_read); testing.expectEqualSlices(u8, buffer_write[0..], buffer_read[0..]); }