io_uring: ring mapped buffers (#17806)

* io_uring: ring mapped buffers

Ring mapped buffers are a newer implementation of provided buffers, supported
since kernel 5.19. They are best described in Jens Axboe's [post](https://github.com/axboe/liburing/wiki/io_uring-and-networking-in-2023#provided-buffers).

This commit implements the low level io_uring_*_buf_ring_* functions as a mostly
direct translation from liburing. It also adds a BufferGroup abstraction over those
low level functions.
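
For orientation, here is a minimal usage sketch of the new `BufferGroup` API (not part of the commit; the socket fd, allocator, group ID, and buffer sizes are illustrative, and error handling is reduced to `try`):

```zig
const std = @import("std");
const linux = std.os.linux;
const IoUring = linux.IoUring;

// Sketch: receive once using a kernel-selected buffer from a buffer group.
// `socket_fd` is assumed to be a connected socket owned by the caller.
fn recvOnce(allocator: std.mem.Allocator, socket_fd: std.os.fd_t) !void {
    var ring = try IoUring.init(16, 0);
    defer ring.deinit();

    // 16 buffers of 4096 bytes each, registered as buffer group 1.
    const buffers = try allocator.alloc(u8, 16 * 4096);
    defer allocator.free(buffers);
    var group = try IoUring.BufferGroup.init(&ring, 1, buffers, 4096, 16);
    defer group.deinit();

    // Submit a recv without supplying a buffer; the kernel picks one.
    _ = try group.recv(0xcafe, socket_fd, 0);
    _ = try ring.submit();

    const cqe = try ring.copy_cqe();
    if (cqe.err() != .SUCCESS) return error.RecvFailed;
    const data = try group.get_cqe(cqe); // received bytes, sliced to cqe.res
    std.debug.print("got {d} bytes\n", .{data.len});
    try group.put_cqe(cqe); // hand the buffer back to the kernel
}
```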

* io_uring: add multishot recv to BufferGroup

Once we have ring mapped provided buffers, it is possible to use the multishot
recv operation. Multishot receive is submitted once, and completions are posted
whenever data arrives on the socket. Received data is placed in a new buffer
from the buffer group.

Reference: [io_uring and networking in 2023](https://github.com/axboe/liburing/wiki/io_uring-and-networking-in-2023#multi-shot)
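
For a sense of how the multishot variant is driven, here is another sketch (again not part of the commit; it reuses the `std`/`linux`/`IoUring` imports from the sketch above, and the fd and user_data values are illustrative):

```zig
// Sketch: one multishot recv submission produces many completions, each with
// its own kernel-selected buffer, until no buffers remain or it is cancelled.
fn recvLoop(ring: *IoUring, group: *IoUring.BufferGroup, socket_fd: std.os.fd_t) !void {
    _ = try group.recv_multishot(0xbeef, socket_fd, 0);
    _ = try ring.submit();
    while (true) {
        const cqe = try ring.copy_cqe();
        if (cqe.err() != .SUCCESS) break; // e.g. NOBUFS when the group runs dry
        const data = try group.get_cqe(cqe); // received bytes for this completion
        std.debug.print("got {d} bytes\n", .{data.len}); // application processing goes here
        try group.put_cqe(cqe); // recycle the buffer so multishot can continue
        // If IORING_CQE_F_MORE is cleared the kernel will post no further
        // completions for this submission and the recv must be re-armed.
        if (cqe.flags & linux.IORING_CQE_F_MORE == 0) break;
    }
}
```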

Getting NOENT for the cancel completion result, meaning:
  -ENOENT
    The request identified by user_data could not be located.
    This could be because it completed before the cancelation
    request was issued, or if an invalid identifier is used.

https://man7.org/linux/man-pages/man3/io_uring_prep_cancel.3.html
https://github.com/ziglang/zig/actions/runs/6801394000/job/18492139893?pr=17806

Results in the cancel/recv CQEs differ depending on the kernel.
on older kernels (tested with v6.0.16, v6.1.57, v6.2.12, v6.4.16)
  cqe_cancel.err() == .NOENT
  cqe_recv.err() == .NOBUFS
on newer kernels (tested with v6.5.0, v6.5.7)
  cqe_cancel.err() == .SUCCESS
  cqe_recv.err() == .CANCELED
Igor Anić 2024-03-16 03:34:31 +01:00 committed by GitHub
parent ce4245f873
commit 3ea1276eeb
3 changed files with 610 additions and 14 deletions


@ -4387,6 +4387,16 @@ pub const io_uring_cqe = extern struct {
}
return .SUCCESS;
}
// On successful completion of the provided buffers IO request, the CQE flags field
// will have IORING_CQE_F_BUFFER set and the selected buffer ID will be indicated by
// the upper 16-bits of the flags field.
pub fn buffer_id(self: io_uring_cqe) !u16 {
if (self.flags & IORING_CQE_F_BUFFER != IORING_CQE_F_BUFFER) {
return error.NoBufferSelected;
}
return @as(u16, @intCast(self.flags >> IORING_CQE_BUFFER_SHIFT));
}
};
// io_uring_cqe.flags
@ -4667,8 +4677,12 @@ pub const io_uring_buf = extern struct {
resv: u16,
};
// Simplified translation of the kernel's io_uring_buf_ring: it's an io_uring_buf
// array with the resv field of the first item used as a "tail" field.
pub const io_uring_buf_ring = extern struct {
resv1: u64,
resv2: u32,
resv3: u16,
tail: u16,
};
/// argument for IORING_(UN)REGISTER_PBUF_RING
pub const io_uring_buf_reg = extern struct {


@ -1,5 +1,5 @@
const IoUring = @This();
const std = @import("../../std.zig");
const std = @import("std");
const builtin = @import("builtin");
const assert = std.debug.assert;
const mem = std.mem;
@ -1440,6 +1440,229 @@ pub const CompletionQueue = struct {
}
};
/// Group of application provided buffers. Uses the newer ring mapped buffers,
/// supported since kernel 5.19. Buffers are identified by a buffer group ID,
/// and within that group, a buffer ID. An IoUring can have multiple buffer
/// groups, each with a unique group ID.
///
/// In `init` the application provides a contiguous block of memory `buffers`
/// for `buffers_count` buffers of size `buffer_size`. The application can then
/// submit a `recv` operation without providing a buffer upfront. Once the
/// operation is ready to receive data, a buffer is picked automatically and the
/// resulting CQE will contain the buffer ID in `cqe.buffer_id()`. Use the `get`
/// method to obtain the buffer for the buffer ID identified by the CQE. Once
/// the application has processed the buffer, it may hand ownership back to the
/// kernel by calling `put`, allowing the cycle to repeat.
///
/// Depending on the rate of arrival of data, it is possible that a given buffer
/// group will run out of buffers before those in CQEs can be put back to the
/// kernel. If this happens, `cqe.err()` will return NOBUFS.
///
pub const BufferGroup = struct {
/// Parent ring for which this group is registered.
ring: *IoUring,
/// Pointer to the memory shared by the kernel.
/// `buffers_count` of `io_uring_buf` structures are shared by the kernel.
/// First `io_uring_buf` is overlaid by `io_uring_buf_ring` struct.
br: *align(mem.page_size) linux.io_uring_buf_ring,
/// Contiguous block of memory of size (buffers_count * buffer_size).
buffers: []u8,
/// Size of each buffer in buffers.
buffer_size: u32,
/// Number of buffers in `buffers`, number of `io_uring_buf` structures in `br`.
buffers_count: u16,
/// ID of this group, must be unique in ring.
group_id: u16,
pub fn init(
ring: *IoUring,
group_id: u16,
buffers: []u8,
buffer_size: u32,
buffers_count: u16,
) !BufferGroup {
assert(buffers.len == buffers_count * buffer_size);
const br = try setup_buf_ring(ring.fd, buffers_count, group_id);
buf_ring_init(br);
const mask = buf_ring_mask(buffers_count);
var i: u16 = 0;
while (i < buffers_count) : (i += 1) {
const start = buffer_size * i;
const buf = buffers[start .. start + buffer_size];
buf_ring_add(br, buf, i, mask, i);
}
buf_ring_advance(br, buffers_count);
return BufferGroup{
.ring = ring,
.group_id = group_id,
.br = br,
.buffers = buffers,
.buffer_size = buffer_size,
.buffers_count = buffers_count,
};
}
// Prepare a recv operation that will select a buffer from this group.
pub fn recv(self: *BufferGroup, user_data: u64, fd: os.fd_t, flags: u32) !*linux.io_uring_sqe {
var sqe = try self.ring.get_sqe();
sqe.prep_rw(.RECV, fd, 0, 0, 0);
sqe.rw_flags = flags;
sqe.flags |= linux.IOSQE_BUFFER_SELECT;
sqe.buf_index = self.group_id;
sqe.user_data = user_data;
return sqe;
}
// Prepare a multishot recv operation that will select a buffer from this group.
pub fn recv_multishot(self: *BufferGroup, user_data: u64, fd: os.fd_t, flags: u32) !*linux.io_uring_sqe {
var sqe = try self.recv(user_data, fd, flags);
sqe.ioprio |= linux.IORING_RECV_MULTISHOT;
return sqe;
}
// Get buffer by id.
pub fn get(self: *BufferGroup, buffer_id: u16) []u8 {
const head = self.buffer_size * buffer_id;
return self.buffers[head .. head + self.buffer_size];
}
// Get buffer by CQE.
pub fn get_cqe(self: *BufferGroup, cqe: linux.io_uring_cqe) ![]u8 {
const buffer_id = try cqe.buffer_id();
const used_len = @as(usize, @intCast(cqe.res));
return self.get(buffer_id)[0..used_len];
}
// Release buffer to the kernel.
pub fn put(self: *BufferGroup, buffer_id: u16) void {
const mask = buf_ring_mask(self.buffers_count);
const buffer = self.get(buffer_id);
buf_ring_add(self.br, buffer, buffer_id, mask, 0);
buf_ring_advance(self.br, 1);
}
// Release buffer from CQE to the kernel.
pub fn put_cqe(self: *BufferGroup, cqe: linux.io_uring_cqe) !void {
self.put(try cqe.buffer_id());
}
pub fn deinit(self: *BufferGroup) void {
free_buf_ring(self.ring.fd, self.br, self.buffers_count, self.group_id);
}
};
/// Registers a shared buffer ring to be used with provided buffers.
/// `entries` `io_uring_buf` structures are memory mapped and shared with the kernel.
/// `fd` is the IoUring fd for which the provided buffer ring is being registered.
/// `entries` is the number of entries requested in the buffer ring; must be a power of 2.
/// `group_id` is the chosen buffer group ID, unique within the ring.
pub fn setup_buf_ring(fd: os.fd_t, entries: u16, group_id: u16) !*align(mem.page_size) linux.io_uring_buf_ring {
if (entries == 0 or entries > 1 << 15) return error.EntriesNotInRange;
if (!std.math.isPowerOfTwo(entries)) return error.EntriesNotPowerOfTwo;
const mmap_size = entries * @sizeOf(linux.io_uring_buf);
const mmap = try os.mmap(
null,
mmap_size,
os.PROT.READ | os.PROT.WRITE,
.{ .TYPE = .PRIVATE, .ANONYMOUS = true },
-1,
0,
);
errdefer os.munmap(mmap);
assert(mmap.len == mmap_size);
const br: *align(mem.page_size) linux.io_uring_buf_ring = @ptrCast(mmap.ptr);
try register_buf_ring(fd, @intFromPtr(br), entries, group_id);
return br;
}
fn register_buf_ring(fd: os.fd_t, addr: u64, entries: u32, group_id: u16) !void {
var reg = mem.zeroInit(linux.io_uring_buf_reg, .{
.ring_addr = addr,
.ring_entries = entries,
.bgid = group_id,
});
const res = linux.io_uring_register(
fd,
.REGISTER_PBUF_RING,
@as(*const anyopaque, @ptrCast(&reg)),
1,
);
try handle_register_buf_ring_result(res);
}
fn unregister_buf_ring(fd: os.fd_t, group_id: u16) !void {
var reg = mem.zeroInit(linux.io_uring_buf_reg, .{
.bgid = group_id,
});
const res = linux.io_uring_register(
fd,
.UNREGISTER_PBUF_RING,
@as(*const anyopaque, @ptrCast(&reg)),
1,
);
try handle_register_buf_ring_result(res);
}
fn handle_register_buf_ring_result(res: usize) !void {
switch (linux.getErrno(res)) {
.SUCCESS => {},
.INVAL => return error.ArgumentsInvalid,
else => |errno| return os.unexpectedErrno(errno),
}
}
/// Unregisters and unmaps a previously registered shared buffer ring returned from `setup_buf_ring`.
pub fn free_buf_ring(fd: os.fd_t, br: *align(mem.page_size) linux.io_uring_buf_ring, entries: u32, group_id: u16) void {
unregister_buf_ring(fd, group_id) catch {};
var mmap: []align(mem.page_size) u8 = undefined;
mmap.ptr = @ptrCast(br);
mmap.len = entries * @sizeOf(linux.io_uring_buf);
os.munmap(mmap);
}
/// Initialises `br` so that it is ready to be used.
pub fn buf_ring_init(br: *linux.io_uring_buf_ring) void {
br.tail = 0;
}
/// Calculates the appropriate size mask for a buffer ring.
/// `entries` is the ring entries as specified in io_uring_register_buf_ring.
pub fn buf_ring_mask(entries: u16) u16 {
return entries - 1;
}
/// Assigns `buffer` to an entry in the `br` buffer ring.
/// `buffer_id` is the identifier which will be returned in the CQE.
/// `buffer_offset` is the offset to insert at from the current tail.
/// If just one buffer is provided before the ring tail is committed with advance then offset should be 0.
/// If buffers are provided in a loop before being committed, the offset must be incremented by one for each buffer added.
pub fn buf_ring_add(
br: *linux.io_uring_buf_ring,
buffer: []u8,
buffer_id: u16,
mask: u16,
buffer_offset: u16,
) void {
const bufs: [*]linux.io_uring_buf = @ptrCast(br);
const buf: *linux.io_uring_buf = &bufs[(br.tail +% buffer_offset) & mask];
buf.addr = @intFromPtr(buffer.ptr);
buf.len = @intCast(buffer.len);
buf.bid = buffer_id;
}
/// Make `count` new buffers visible to the kernel. Called after
/// `buf_ring_add` has been called `count` times to fill in new buffers.
pub fn buf_ring_advance(br: *linux.io_uring_buf_ring, count: u16) void {
const tail: u16 = br.tail +% count;
@atomicStore(u16, &br.tail, tail, .release);
}
test "structs/offsets/entries" {
if (builtin.os.tag != .linux) return error.SkipZigTest;
@ -3652,7 +3875,7 @@ test "waitid" {
try testing.expectEqual(7, siginfo.fields.common.second.sigchld.status);
}
-/// For use in tests. Returns SkipZigTest is kernel version is less than required.
+/// For use in tests. Returns SkipZigTest if kernel version is less than required.
inline fn skipKernelLessThan(required: std.SemanticVersion) !void {
if (builtin.os.tag != .linux) return error.SkipZigTest;
@ -3668,3 +3891,342 @@ inline fn skipKernelLessThan(required: std.SemanticVersion) !void {
current.pre = null; // don't check pre field
if (required.order(current) == .gt) return error.SkipZigTest;
}
test BufferGroup {
if (builtin.os.tag != .linux) return error.SkipZigTest;
// Init IoUring
var ring = IoUring.init(16, 0) catch |err| switch (err) {
error.SystemOutdated => return error.SkipZigTest,
error.PermissionDenied => return error.SkipZigTest,
else => return err,
};
defer ring.deinit();
// Init buffer group for ring
const group_id: u16 = 1; // buffers group id
const buffers_count: u16 = 1; // number of buffers in buffer group
const buffer_size: usize = 128; // size of each buffer in group
const buffers = try testing.allocator.alloc(u8, buffers_count * buffer_size);
defer testing.allocator.free(buffers);
var buf_grp = BufferGroup.init(
&ring,
group_id,
buffers,
buffer_size,
buffers_count,
) catch |err| switch (err) {
// kernel older than 5.19
error.ArgumentsInvalid => return error.SkipZigTest,
else => return err,
};
defer buf_grp.deinit();
// Create client/server fds
const fds = try createSocketTestHarness(&ring);
defer fds.close();
const data = [_]u8{ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0xa, 0xb, 0xc, 0xd, 0xe };
// Client sends data
{
_ = try ring.send(1, fds.client, data[0..], 0);
const submitted = try ring.submit();
try testing.expectEqual(1, submitted);
const cqe_send = try ring.copy_cqe();
if (cqe_send.err() == .INVAL) return error.SkipZigTest;
try testing.expectEqual(linux.io_uring_cqe{ .user_data = 1, .res = data.len, .flags = 0 }, cqe_send);
}
// Server uses buffer group receive
{
// Submit recv operation, buffer will be chosen from the buffer group
_ = try buf_grp.recv(2, fds.server, 0);
const submitted = try ring.submit();
try testing.expectEqual(1, submitted);
// ... when we have completion for recv operation
const cqe = try ring.copy_cqe();
try testing.expectEqual(2, cqe.user_data); // matches submitted user_data
try testing.expect(cqe.res >= 0); // success
try testing.expectEqual(os.E.SUCCESS, cqe.err());
try testing.expectEqual(data.len, @as(usize, @intCast(cqe.res))); // cqe.res holds received data len
// Read buffer_id and used buffer len from cqe
const buffer_id = try cqe.buffer_id();
const len: usize = @intCast(cqe.res);
// Get buffer from pool
const buf = buf_grp.get(buffer_id)[0..len];
try testing.expectEqualSlices(u8, &data, buf);
// Release buffer to the kernel when the application is done with it
buf_grp.put(buffer_id);
}
}
test "ring mapped buffers recv" {
if (builtin.os.tag != .linux) return error.SkipZigTest;
var ring = IoUring.init(16, 0) catch |err| switch (err) {
error.SystemOutdated => return error.SkipZigTest,
error.PermissionDenied => return error.SkipZigTest,
else => return err,
};
defer ring.deinit();
// init buffer group
const group_id: u16 = 1; // buffers group id
const buffers_count: u16 = 2; // number of buffers in buffer group
const buffer_size: usize = 4; // size of each buffer in group
const buffers = try testing.allocator.alloc(u8, buffers_count * buffer_size);
defer testing.allocator.free(buffers);
var buf_grp = BufferGroup.init(
&ring,
group_id,
buffers,
buffer_size,
buffers_count,
) catch |err| switch (err) {
// kernel older than 5.19
error.ArgumentsInvalid => return error.SkipZigTest,
else => return err,
};
defer buf_grp.deinit();
// create client/server fds
const fds = try createSocketTestHarness(&ring);
defer fds.close();
// for random user_data in sqe/cqe
var Rnd = std.rand.DefaultPrng.init(0);
var rnd = Rnd.random();
var round: usize = 4; // repeat send/recv cycle round times
while (round > 0) : (round -= 1) {
// client sends data
const data = [_]u8{ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0xa, 0xb, 0xc, 0xd, 0xe };
{
const user_data = rnd.int(u64);
_ = try ring.send(user_data, fds.client, data[0..], 0);
try testing.expectEqual(@as(u32, 1), try ring.submit());
const cqe_send = try ring.copy_cqe();
if (cqe_send.err() == .INVAL) return error.SkipZigTest;
try testing.expectEqual(linux.io_uring_cqe{ .user_data = user_data, .res = data.len, .flags = 0 }, cqe_send);
}
// server reads data into provided buffers
// there are 2 buffers of size 4, so each read gets only a chunk of the data
// we read four chunks of 4, 4, 4 and 3 bytes
var chunk: []const u8 = data[0..buffer_size]; // first chunk
const id1 = try expect_buf_grp_recv(&ring, &buf_grp, fds.server, rnd.int(u64), chunk);
chunk = data[buffer_size .. buffer_size * 2]; // second chunk
const id2 = try expect_buf_grp_recv(&ring, &buf_grp, fds.server, rnd.int(u64), chunk);
// both buffers provided to the kernel are used, so we get a 'no more
// buffers' error until we put buffers back to the kernel
{
const user_data = rnd.int(u64);
_ = try buf_grp.recv(user_data, fds.server, 0);
try testing.expectEqual(@as(u32, 1), try ring.submit());
const cqe = try ring.copy_cqe();
try testing.expectEqual(user_data, cqe.user_data);
try testing.expect(cqe.res < 0); // fail
try testing.expectEqual(os.E.NOBUFS, cqe.err());
try testing.expect(cqe.flags & linux.IORING_CQE_F_BUFFER == 0); // IORING_CQE_F_BUFFER flags is set on success only
try testing.expectError(error.NoBufferSelected, cqe.buffer_id());
}
// put buffers back to the kernel
buf_grp.put(id1);
buf_grp.put(id2);
chunk = data[buffer_size * 2 .. buffer_size * 3]; // third chunk
const id3 = try expect_buf_grp_recv(&ring, &buf_grp, fds.server, rnd.int(u64), chunk);
buf_grp.put(id3);
chunk = data[buffer_size * 3 ..]; // last chunk
const id4 = try expect_buf_grp_recv(&ring, &buf_grp, fds.server, rnd.int(u64), chunk);
buf_grp.put(id4);
}
}
test "ring mapped buffers multishot recv" {
if (builtin.os.tag != .linux) return error.SkipZigTest;
var ring = IoUring.init(16, 0) catch |err| switch (err) {
error.SystemOutdated => return error.SkipZigTest,
error.PermissionDenied => return error.SkipZigTest,
else => return err,
};
defer ring.deinit();
// init buffer group
const group_id: u16 = 1; // buffers group id
const buffers_count: u16 = 2; // number of buffers in buffer group
const buffer_size: usize = 4; // size of each buffer in group
const buffers = try testing.allocator.alloc(u8, buffers_count * buffer_size);
defer testing.allocator.free(buffers);
var buf_grp = BufferGroup.init(
&ring,
group_id,
buffers,
buffer_size,
buffers_count,
) catch |err| switch (err) {
// kernel older than 5.19
error.ArgumentsInvalid => return error.SkipZigTest,
else => return err,
};
defer buf_grp.deinit();
// create client/server fds
const fds = try createSocketTestHarness(&ring);
defer fds.close();
// for random user_data in sqe/cqe
var Rnd = std.rand.DefaultPrng.init(0);
var rnd = Rnd.random();
var round: usize = 4; // repeat send/recv cycle round times
while (round > 0) : (round -= 1) {
// client sends data
const data = [_]u8{ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0xa, 0xb, 0xc, 0xd, 0xe };
{
const user_data = rnd.int(u64);
_ = try ring.send(user_data, fds.client, data[0..], 0);
try testing.expectEqual(@as(u32, 1), try ring.submit());
const cqe_send = try ring.copy_cqe();
if (cqe_send.err() == .INVAL) return error.SkipZigTest;
try testing.expectEqual(linux.io_uring_cqe{ .user_data = user_data, .res = data.len, .flags = 0 }, cqe_send);
}
// start multishot recv
var recv_user_data = rnd.int(u64);
_ = try buf_grp.recv_multishot(recv_user_data, fds.server, 0);
try testing.expectEqual(@as(u32, 1), try ring.submit()); // submit
// server reads data into provided buffers
// there are 2 buffers of size 4, so each read gets only a chunk of the data
// we read four chunks of 4, 4, 4 and 3 bytes
var chunk: []const u8 = data[0..buffer_size]; // first chunk
const cqe1 = try expect_buf_grp_cqe(&ring, &buf_grp, recv_user_data, chunk);
try testing.expect(cqe1.flags & linux.IORING_CQE_F_MORE > 0);
chunk = data[buffer_size .. buffer_size * 2]; // second chunk
const cqe2 = try expect_buf_grp_cqe(&ring, &buf_grp, recv_user_data, chunk);
try testing.expect(cqe2.flags & linux.IORING_CQE_F_MORE > 0);
// both buffers provided to the kernel are used, so we get a 'no more
// buffers' error until we put buffers back to the kernel
{
const cqe = try ring.copy_cqe();
try testing.expectEqual(recv_user_data, cqe.user_data);
try testing.expect(cqe.res < 0); // fail
try testing.expectEqual(os.E.NOBUFS, cqe.err());
try testing.expect(cqe.flags & linux.IORING_CQE_F_BUFFER == 0); // IORING_CQE_F_BUFFER flags is set on success only
// has more is not set
// indicates that multishot is finished
try testing.expect(cqe.flags & linux.IORING_CQE_F_MORE == 0);
try testing.expectError(error.NoBufferSelected, cqe.buffer_id());
}
// put buffers back to the kernel
buf_grp.put(try cqe1.buffer_id());
buf_grp.put(try cqe2.buffer_id());
// restart multishot
recv_user_data = rnd.int(u64);
_ = try buf_grp.recv_multishot(recv_user_data, fds.server, 0);
try testing.expectEqual(@as(u32, 1), try ring.submit()); // submit
chunk = data[buffer_size * 2 .. buffer_size * 3]; // third chunk
const cqe3 = try expect_buf_grp_cqe(&ring, &buf_grp, recv_user_data, chunk);
try testing.expect(cqe3.flags & linux.IORING_CQE_F_MORE > 0);
buf_grp.put(try cqe3.buffer_id());
chunk = data[buffer_size * 3 ..]; // last chunk
const cqe4 = try expect_buf_grp_cqe(&ring, &buf_grp, recv_user_data, chunk);
try testing.expect(cqe4.flags & linux.IORING_CQE_F_MORE > 0);
buf_grp.put(try cqe4.buffer_id());
// cancel pending multishot recv operation
{
const cancel_user_data = rnd.int(u64);
_ = try ring.cancel(cancel_user_data, recv_user_data, 0);
try testing.expectEqual(@as(u32, 1), try ring.submit());
// expect completion of cancel operation and completion of recv operation
var cqe_cancel = try ring.copy_cqe();
if (cqe_cancel.err() == .INVAL) return error.SkipZigTest;
var cqe_recv = try ring.copy_cqe();
if (cqe_recv.err() == .INVAL) return error.SkipZigTest;
// don't depend on order of completions
if (cqe_cancel.user_data == recv_user_data and cqe_recv.user_data == cancel_user_data) {
const a = cqe_cancel;
const b = cqe_recv;
cqe_cancel = b;
cqe_recv = a;
}
// Note on different kernel results:
// on older kernels (tested with v6.0.16, v6.1.57, v6.2.12, v6.4.16)
// cqe_cancel.err() == .NOENT
// cqe_recv.err() == .NOBUFS
// on newer kernels (tested with v6.5.0, v6.5.7)
// cqe_cancel.err() == .SUCCESS
// cqe_recv.err() == .CANCELED
// Upstream reference: https://github.com/axboe/liburing/issues/984
// cancel operation succeeds (or returns NOENT on older kernels)
try testing.expectEqual(cancel_user_data, cqe_cancel.user_data);
try testing.expect(cqe_cancel.err() == .NOENT or cqe_cancel.err() == .SUCCESS);
// recv operation fails with CANCELED (or NOBUFS on older kernels)
try testing.expectEqual(recv_user_data, cqe_recv.user_data);
try testing.expect(cqe_recv.res < 0);
try testing.expect(cqe_recv.err() == .NOBUFS or cqe_recv.err() == .CANCELED);
try testing.expect(cqe_recv.flags & linux.IORING_CQE_F_MORE == 0);
}
}
}
// Prepare and submit a recv using the buffer group.
// Test that the buffer from the group, pointed to by the CQE, matches expected.
fn expect_buf_grp_recv(
ring: *IoUring,
buf_grp: *BufferGroup,
fd: os.fd_t,
user_data: u64,
expected: []const u8,
) !u16 {
// prepare and submit read
const sqe = try buf_grp.recv(user_data, fd, 0);
try testing.expect(sqe.flags & linux.IOSQE_BUFFER_SELECT == linux.IOSQE_BUFFER_SELECT);
try testing.expect(sqe.buf_index == buf_grp.group_id);
try testing.expectEqual(@as(u32, 1), try ring.submit()); // submit
const cqe = try expect_buf_grp_cqe(ring, buf_grp, user_data, expected);
return try cqe.buffer_id();
}
fn expect_buf_grp_cqe(
ring: *IoUring,
buf_grp: *BufferGroup,
user_data: u64,
expected: []const u8,
) !linux.io_uring_cqe {
// get cqe
const cqe = try ring.copy_cqe();
try testing.expectEqual(user_data, cqe.user_data);
try testing.expect(cqe.res >= 0); // success
try testing.expect(cqe.flags & linux.IORING_CQE_F_BUFFER == linux.IORING_CQE_F_BUFFER); // IORING_CQE_F_BUFFER flag is set
try testing.expectEqual(expected.len, @as(usize, @intCast(cqe.res)));
try testing.expectEqual(os.E.SUCCESS, cqe.err());
// get buffer from pool
const buffer_id = try cqe.buffer_id();
const len = @as(usize, @intCast(cqe.res));
const buf = buf_grp.get(buffer_id)[0..len];
try testing.expectEqualSlices(u8, expected, buf);
return cqe;
}


@ -200,6 +200,36 @@ pub const io_uring_sqe = extern struct {
sqe.rw_flags = flags;
}
pub fn prep_recv_multishot(
sqe: *linux.io_uring_sqe,
fd: os.fd_t,
buffer: []u8,
flags: u32,
) void {
sqe.prep_recv(fd, buffer, flags);
sqe.ioprio |= linux.IORING_RECV_MULTISHOT;
}
pub fn prep_recvmsg(
sqe: *linux.io_uring_sqe,
fd: os.fd_t,
msg: *os.msghdr,
flags: u32,
) void {
sqe.prep_rw(.RECVMSG, fd, @intFromPtr(msg), 1, 0);
sqe.rw_flags = flags;
}
pub fn prep_recvmsg_multishot(
sqe: *linux.io_uring_sqe,
fd: os.fd_t,
msg: *os.msghdr,
flags: u32,
) void {
sqe.prep_recvmsg(fd, msg, flags);
sqe.ioprio |= linux.IORING_RECV_MULTISHOT;
}
pub fn prep_send(sqe: *linux.io_uring_sqe, fd: os.fd_t, buffer: []const u8, flags: u32) void {
sqe.prep_rw(.SEND, fd, @intFromPtr(buffer.ptr), buffer.len, 0);
sqe.rw_flags = flags;
@ -227,16 +257,6 @@ pub const io_uring_sqe = extern struct {
sqe.opcode = .SENDMSG_ZC;
}
-pub fn prep_recvmsg(
-sqe: *linux.io_uring_sqe,
-fd: os.fd_t,
-msg: *os.msghdr,
-flags: u32,
-) void {
-sqe.prep_rw(.RECVMSG, fd, @intFromPtr(msg), 1, 0);
-sqe.rw_flags = flags;
-}
pub fn prep_sendmsg(
sqe: *linux.io_uring_sqe,
fd: os.fd_t,