mirror of https://github.com/ziglang/zig.git
synced 2025-12-06 06:13:07 +00:00

Merge pull request #23062 from ianic/io_uring_bind

io_uring: Update to kernel changes in 6.11 and 6.12

commit b350049f51
@@ -5806,6 +5806,9 @@ pub const IORING_OP = enum(u8) {
     FUTEX_WAITV,
     FIXED_FD_INSTALL,
     FTRUNCATE,
+    BIND,
+    LISTEN,
+    RECV_ZC,

     _,
 };
@@ -5930,6 +5933,8 @@ pub const IORING_CQE_F_MORE = 1 << 1;
 pub const IORING_CQE_F_SOCK_NONEMPTY = 1 << 2;
 /// Set for notification CQEs. Can be used to distinguish them from sends.
 pub const IORING_CQE_F_NOTIF = 1 << 3;
+/// If set, the buffer ID set in the completion will get more completions.
+pub const IORING_CQE_F_BUF_MORE = 1 << 4;

 pub const IORING_CQE_BUFFER_SHIFT = 16;
@@ -6135,26 +6140,32 @@ pub const IO_URING_OP_SUPPORTED = 1 << 0;

 pub const io_uring_probe_op = extern struct {
     op: IORING_OP,

     resv: u8,

     /// IO_URING_OP_* flags
     flags: u16,

     resv2: u32,

+    pub fn is_supported(self: @This()) bool {
+        return self.flags & IO_URING_OP_SUPPORTED != 0;
+    }
 };

 pub const io_uring_probe = extern struct {
-    /// last opcode supported
+    /// Last opcode supported
     last_op: IORING_OP,

-    /// Number of io_uring_probe_op following
+    /// Length of ops[] array below
     ops_len: u8,

     resv: u16,
     resv2: [3]u32,
+    ops: [256]io_uring_probe_op,

-    // Followed by up to `ops_len` io_uring_probe_op structures
+    /// Is the operation supported on the running kernel.
+    pub fn is_supported(self: @This(), op: IORING_OP) bool {
+        const i = @intFromEnum(op);
+        if (i > @intFromEnum(self.last_op) or i >= self.ops_len)
+            return false;
+        return self.ops[i].is_supported();
+    }
 };

 pub const io_uring_restriction = extern struct {
@@ -6190,6 +6201,13 @@ pub const IORING_RESTRICTION = enum(u16) {
     _,
 };

+pub const IO_URING_SOCKET_OP = enum(u16) {
+    SIOCINQ = 0,
+    SIOCOUTQ = 1,
+    GETSOCKOPT = 2,
+    SETSOCKOPT = 3,
+};
+
 pub const io_uring_buf = extern struct {
     addr: u64,
     len: u32,
@@ -6209,8 +6227,15 @@ pub const io_uring_buf_reg = extern struct {
     ring_addr: u64,
     ring_entries: u32,
     bgid: u16,
-    pad: u16,
+    flags: Flags,
     resv: [3]u64,

+    pub const Flags = packed struct {
+        _0: u1 = 0,
+        /// Incremental buffer consumption.
+        inc: bool,
+        _: u14 = 0,
+    };
 };

 pub const io_uring_getevents_arg = extern struct {
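An aside on the new Flags field: a minimal sanity-check sketch (not part of the commit), assuming the declarations above are reachable as std.os.linux. With the packed layout shown, `inc` should occupy bit 1, which matches IOU_PBUF_RING_INC in the kernel UAPI (bit 0, reserved here as `_0`, is IOU_PBUF_RING_MMAP):

const std = @import("std");
const linux = std.os.linux;

test "io_uring_buf_reg.Flags bit layout" {
    // The first field of a Zig packed struct lands in the least significant bits.
    const flags: linux.io_uring_buf_reg.Flags = .{ .inc = true };
    try std.testing.expectEqual(@as(u16, 1 << 1), @as(u16, @bitCast(flags)));
}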
@@ -1272,6 +1272,16 @@ pub fn unregister_buffers(self: *IoUring) !void {
     }
 }

+/// Returns an io_uring_probe which is used to probe the capabilities of the
+/// io_uring subsystem of the running kernel. The io_uring_probe contains the
+/// list of supported operations.
+pub fn get_probe(self: *IoUring) !linux.io_uring_probe {
+    var probe = mem.zeroInit(linux.io_uring_probe, .{});
+    const res = linux.io_uring_register(self.fd, .REGISTER_PROBE, &probe, probe.ops.len);
+    try handle_register_buf_ring_result(res);
+    return probe;
+}
+
 fn handle_registration_result(res: usize) !void {
     switch (linux.E.init(res)) {
         .SUCCESS => {},
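A minimal usage sketch for the new probing API (not from the commit): initialize a ring, fetch the probe once, and gate optional features on it. It assumes Linux with io_uring enabled; all names come from the diff above.

const std = @import("std");
const linux = std.os.linux;
const IoUring = linux.IoUring;

pub fn main() !void {
    var ring = try IoUring.init(8, 0);
    defer ring.deinit();

    const probe = try ring.get_probe();
    // BIND/LISTEN only show up as supported on kernels >= 6.11.
    std.debug.print("BIND: {}, LISTEN: {}\n", .{
        probe.is_supported(.BIND),
        probe.is_supported(.LISTEN),
    });
}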
@@ -1356,6 +1366,102 @@ pub fn socket_direct_alloc(
     return sqe;
 }

+/// Queues (but does not submit) an SQE to perform a `bind(2)` on a socket.
+/// Returns a pointer to the SQE.
+/// Available since 6.11.
+pub fn bind(
+    self: *IoUring,
+    user_data: u64,
+    fd: posix.fd_t,
+    addr: *const posix.sockaddr,
+    addrlen: posix.socklen_t,
+    flags: u32,
+) !*linux.io_uring_sqe {
+    const sqe = try self.get_sqe();
+    sqe.prep_bind(fd, addr, addrlen, flags);
+    sqe.user_data = user_data;
+    return sqe;
+}
+
+/// Queues (but does not submit) an SQE to perform a `listen(2)` on a socket.
+/// Returns a pointer to the SQE.
+/// Available since 6.11.
+pub fn listen(
+    self: *IoUring,
+    user_data: u64,
+    fd: posix.fd_t,
+    backlog: usize,
+    flags: u32,
+) !*linux.io_uring_sqe {
+    const sqe = try self.get_sqe();
+    sqe.prep_listen(fd, backlog, flags);
+    sqe.user_data = user_data;
+    return sqe;
+}
+
+/// Prepares a cmd request for a socket.
+/// See: https://man7.org/linux/man-pages/man3/io_uring_prep_cmd.3.html
+/// Available since 6.7.
+pub fn cmd_sock(
+    self: *IoUring,
+    user_data: u64,
+    cmd_op: linux.IO_URING_SOCKET_OP,
+    fd: linux.fd_t,
+    level: u32, // linux.SOL
+    optname: u32, // linux.SO
+    optval: u64, // pointer to the option value
+    optlen: u32, // size of the option value
+) !*linux.io_uring_sqe {
+    const sqe = try self.get_sqe();
+    sqe.prep_cmd_sock(cmd_op, fd, level, optname, optval, optlen);
+    sqe.user_data = user_data;
+    return sqe;
+}
+
+/// Prepares a set socket option command for the optname argument, at the
+/// protocol level specified by the level argument.
+/// Available since 6.7.
+pub fn setsockopt(
+    self: *IoUring,
+    user_data: u64,
+    fd: linux.fd_t,
+    level: u32, // linux.SOL
+    optname: u32, // linux.SO
+    opt: []const u8,
+) !*linux.io_uring_sqe {
+    return try self.cmd_sock(
+        user_data,
+        .SETSOCKOPT,
+        fd,
+        level,
+        optname,
+        @intFromPtr(opt.ptr),
+        @intCast(opt.len),
+    );
+}
+
+/// Prepares a get socket option command to retrieve the value for the option
+/// specified by the optname argument for the socket specified by the fd
+/// argument.
+/// Available since 6.7.
+pub fn getsockopt(
+    self: *IoUring,
+    user_data: u64,
+    fd: linux.fd_t,
+    level: u32, // linux.SOL
+    optname: u32, // linux.SO
+    opt: []u8,
+) !*linux.io_uring_sqe {
+    return try self.cmd_sock(
+        user_data,
+        .GETSOCKOPT,
+        fd,
+        level,
+        optname,
+        @intFromPtr(opt.ptr),
+        @intCast(opt.len),
+    );
+}
+
 pub const SubmissionQueue = struct {
     head: *u32,
     tail: *u32,
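To make the call pattern concrete, here is a hedged sketch (not from the commit) of setting a socket option through the ring. It assumes `ring` is an initialized IoUring and `fd` a TCP socket; the user_data value 0x42 and the helper name are illustrative only. The full socket/bind/listen flow is exercised in the "bind/listen/connect" test near the end of this diff.

fn setNoDelay(ring: *IoUring, fd: posix.fd_t) !void {
    var one: u32 = 1;
    // SETSOCKOPT goes out as a URING_CMD on the socket (kernel >= 6.7).
    _ = try ring.setsockopt(0x42, fd, linux.IPPROTO.TCP, linux.TCP.NODELAY, std.mem.asBytes(&one));
    _ = try ring.submit();
    const cqe = try ring.copy_cqe();
    // Kernels older than 6.7 fail the socket command (typically with INVAL).
    if (cqe.err() != .SUCCESS) return error.SetsockoptFailed;
}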
@@ -1488,28 +1594,34 @@ pub const BufferGroup = struct {
     buffers: []u8,
     /// Size of each buffer in buffers.
     buffer_size: u32,
-    // Number of buffers in `buffers`, number of `io_uring_buf structures` in br.
+    /// Number of buffers in `buffers`, number of `io_uring_buf` structures in br.
     buffers_count: u16,
+    /// Head of unconsumed part of each buffer, if incremental consumption is enabled.
+    heads: []u32,
     /// ID of this group, must be unique in ring.
     group_id: u16,

     pub fn init(
         ring: *IoUring,
+        allocator: mem.Allocator,
         group_id: u16,
-        buffers: []u8,
         buffer_size: u32,
         buffers_count: u16,
     ) !BufferGroup {
-        assert(buffers.len == buffers_count * buffer_size);
+        const buffers = try allocator.alloc(u8, buffer_size * buffers_count);
+        errdefer allocator.free(buffers);
+        const heads = try allocator.alloc(u32, buffers_count);
+        errdefer allocator.free(heads);

-        const br = try setup_buf_ring(ring.fd, buffers_count, group_id);
+        const br = try setup_buf_ring(ring.fd, buffers_count, group_id, .{ .inc = true });
         buf_ring_init(br);

         const mask = buf_ring_mask(buffers_count);
         var i: u16 = 0;
         while (i < buffers_count) : (i += 1) {
-            const start = buffer_size * i;
-            const buf = buffers[start .. start + buffer_size];
+            const pos = buffer_size * i;
+            const buf = buffers[pos .. pos + buffer_size];
+            heads[i] = 0;
             buf_ring_add(br, buf, i, mask, i);
         }
         buf_ring_advance(br, buffers_count);
@@ -1519,11 +1631,18 @@ pub const BufferGroup = struct {
             .group_id = group_id,
             .br = br,
             .buffers = buffers,
+            .heads = heads,
             .buffer_size = buffer_size,
             .buffers_count = buffers_count,
         };
     }

+    pub fn deinit(self: *BufferGroup, allocator: mem.Allocator) void {
+        free_buf_ring(self.ring.fd, self.br, self.buffers_count, self.group_id);
+        allocator.free(self.buffers);
+        allocator.free(self.heads);
+    }
+
     // Prepare recv operation which will select buffer from this group.
     pub fn recv(self: *BufferGroup, user_data: u64, fd: posix.fd_t, flags: u32) !*linux.io_uring_sqe {
         var sqe = try self.ring.get_sqe();
@@ -1543,33 +1662,34 @@ pub const BufferGroup = struct {
     }

-    // Get buffer by id.
-    pub fn get(self: *BufferGroup, buffer_id: u16) []u8 {
-        const head = self.buffer_size * buffer_id;
-        return self.buffers[head .. head + self.buffer_size];
+    fn get_by_id(self: *BufferGroup, buffer_id: u16) []u8 {
+        const pos = self.buffer_size * buffer_id;
+        return self.buffers[pos .. pos + self.buffer_size][self.heads[buffer_id]..];
     }

     // Get buffer by CQE.
-    pub fn get_cqe(self: *BufferGroup, cqe: linux.io_uring_cqe) ![]u8 {
+    pub fn get(self: *BufferGroup, cqe: linux.io_uring_cqe) ![]u8 {
         const buffer_id = try cqe.buffer_id();
         const used_len = @as(usize, @intCast(cqe.res));
-        return self.get(buffer_id)[0..used_len];
+        return self.get_by_id(buffer_id)[0..used_len];
     }

-    // Release buffer to the kernel.
-    pub fn put(self: *BufferGroup, buffer_id: u16) void {
-        const mask = buf_ring_mask(self.buffers_count);
-        const buffer = self.get(buffer_id);
-        buf_ring_add(self.br, buffer, buffer_id, mask, 0);
-        buf_ring_advance(self.br, 1);
-    }
-
-    // Release buffer from CQE to the kernel.
-    pub fn put_cqe(self: *BufferGroup, cqe: linux.io_uring_cqe) !void {
-        self.put(try cqe.buffer_id());
-    }
-
-    pub fn deinit(self: *BufferGroup) void {
-        free_buf_ring(self.ring.fd, self.br, self.buffers_count, self.group_id);
-    }
+    // Release buffer to the kernel.
+    pub fn put(self: *BufferGroup, cqe: linux.io_uring_cqe) !void {
+        const buffer_id = try cqe.buffer_id();
+        if (cqe.flags & linux.IORING_CQE_F_BUF_MORE == linux.IORING_CQE_F_BUF_MORE) {
+            // Incremental consumption active, kernel will write to this buffer again.
+            const used_len = @as(u32, @intCast(cqe.res));
+            // Track what part of the buffer is used.
+            self.heads[buffer_id] += used_len;
+            return;
+        }
+        self.heads[buffer_id] = 0;
+        const mask = buf_ring_mask(self.buffers_count);
+        buf_ring_add(self.br, self.get_by_id(buffer_id), buffer_id, mask, 0);
+        buf_ring_advance(self.br, 1);
+    }
 };
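A sketch of the get/put contract with incremental consumption (not from the commit; the helper name is illustrative, and `ring`, `buf_grp`, `fd` mirror the setup above):

fn recvOnce(ring: *IoUring, buf_grp: *BufferGroup, fd: posix.fd_t) !void {
    _ = try buf_grp.recv(1, fd, 0);
    _ = try ring.submit();
    const cqe = try ring.copy_cqe();
    // get() returns only this completion's bytes: it slices from the buffer's
    // current head and is cqe.res bytes long.
    const bytes = try buf_grp.get(cqe);
    std.debug.print("received {d} bytes\n", .{bytes.len});
    // put() advances the head while IORING_CQE_F_BUF_MORE is set (the kernel
    // keeps appending into the same buffer) and only recycles the buffer onto
    // the ring once the flag is clear.
    try buf_grp.put(cqe);
}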
@@ -1578,7 +1698,12 @@ pub const BufferGroup = struct {
 /// `fd` is IO_Uring.fd for which the provided buffer ring is being registered.
 /// `entries` is the number of entries requested in the buffer ring, must be a power of 2.
 /// `group_id` is the chosen buffer group ID, unique in IO_Uring.
-pub fn setup_buf_ring(fd: posix.fd_t, entries: u16, group_id: u16) !*align(page_size_min) linux.io_uring_buf_ring {
+pub fn setup_buf_ring(
+    fd: posix.fd_t,
+    entries: u16,
+    group_id: u16,
+    flags: linux.io_uring_buf_reg.Flags,
+) !*align(page_size_min) linux.io_uring_buf_ring {
     if (entries == 0 or entries > 1 << 15) return error.EntriesNotInRange;
     if (!std.math.isPowerOfTwo(entries)) return error.EntriesNotPowerOfTwo;
@@ -1595,22 +1720,30 @@ pub fn setup_buf_ring(fd: posix.fd_t, entries: u16, group_id: u16) !*align(page_
     assert(mmap.len == mmap_size);

     const br: *align(page_size_min) linux.io_uring_buf_ring = @ptrCast(mmap.ptr);
-    try register_buf_ring(fd, @intFromPtr(br), entries, group_id);
+    try register_buf_ring(fd, @intFromPtr(br), entries, group_id, flags);
     return br;
 }

-fn register_buf_ring(fd: posix.fd_t, addr: u64, entries: u32, group_id: u16) !void {
+fn register_buf_ring(
+    fd: posix.fd_t,
+    addr: u64,
+    entries: u32,
+    group_id: u16,
+    flags: linux.io_uring_buf_reg.Flags,
+) !void {
     var reg = mem.zeroInit(linux.io_uring_buf_reg, .{
         .ring_addr = addr,
         .ring_entries = entries,
         .bgid = group_id,
+        .flags = flags,
     });
-    const res = linux.io_uring_register(
-        fd,
-        .REGISTER_PBUF_RING,
-        @as(*const anyopaque, @ptrCast(&reg)),
-        1,
-    );
+    var res = linux.io_uring_register(fd, .REGISTER_PBUF_RING, @as(*const anyopaque, @ptrCast(&reg)), 1);
+    if (linux.E.init(res) == .INVAL and reg.flags.inc) {
+        // Retry without incremental buffer consumption; it is available since
+        // kernel 6.12 and returns INVAL on older kernels.
+        reg.flags.inc = false;
+        res = linux.io_uring_register(fd, .REGISTER_PBUF_RING, @as(*const anyopaque, @ptrCast(&reg)), 1);
+    }
     try handle_register_buf_ring_result(res);
 }
@@ -3054,7 +3187,7 @@ test "provide_buffers: read" {
     const cqe = try ring.copy_cqe();
     switch (cqe.err()) {
         // Happens when the kernel is < 5.7
-        .INVAL => return error.SkipZigTest,
+        .INVAL, .BADF => return error.SkipZigTest,
         .SUCCESS => {},
         else => |errno| std.debug.panic("unhandled errno: {}", .{errno}),
     }
@@ -3181,7 +3314,7 @@ test "remove_buffers" {

     const cqe = try ring.copy_cqe();
     switch (cqe.err()) {
-        .INVAL => return error.SkipZigTest,
+        .INVAL, .BADF => return error.SkipZigTest,
         .SUCCESS => {},
         else => |errno| std.debug.panic("unhandled errno: {}", .{errno}),
     }
@@ -3935,12 +4068,10 @@ test BufferGroup {
     const group_id: u16 = 1; // buffers group id
     const buffers_count: u16 = 1; // number of buffers in buffer group
     const buffer_size: usize = 128; // size of each buffer in group
-    const buffers = try testing.allocator.alloc(u8, buffers_count * buffer_size);
-    defer testing.allocator.free(buffers);
     var buf_grp = BufferGroup.init(
         &ring,
+        testing.allocator,
         group_id,
-        buffers,
         buffer_size,
         buffers_count,
     ) catch |err| switch (err) {
@@ -3948,7 +4079,7 @@ test BufferGroup {
         error.ArgumentsInvalid => return error.SkipZigTest,
         else => return err,
     };
-    defer buf_grp.deinit();
+    defer buf_grp.deinit(testing.allocator);

     // Create client/server fds
     const fds = try createSocketTestHarness(&ring);
@@ -3979,14 +4110,11 @@ test BufferGroup {
         try testing.expectEqual(posix.E.SUCCESS, cqe.err());
         try testing.expectEqual(data.len, @as(usize, @intCast(cqe.res))); // cqe.res holds received data len

-        // Read buffer_id and used buffer len from cqe
-        const buffer_id = try cqe.buffer_id();
-        const len: usize = @intCast(cqe.res);
         // Get buffer from pool
-        const buf = buf_grp.get(buffer_id)[0..len];
+        const buf = try buf_grp.get(cqe);
         try testing.expectEqualSlices(u8, &data, buf);
         // Release buffer to the kernel when application is done with it
-        buf_grp.put(buffer_id);
+        try buf_grp.put(cqe);
     }
 }
@@ -4004,12 +4132,10 @@ test "ring mapped buffers recv" {
     const group_id: u16 = 1; // buffers group id
     const buffers_count: u16 = 2; // number of buffers in buffer group
     const buffer_size: usize = 4; // size of each buffer in group
-    const buffers = try testing.allocator.alloc(u8, buffers_count * buffer_size);
-    defer testing.allocator.free(buffers);
     var buf_grp = BufferGroup.init(
         &ring,
+        testing.allocator,
         group_id,
-        buffers,
         buffer_size,
         buffers_count,
     ) catch |err| switch (err) {
@@ -4017,7 +4143,7 @@ test "ring mapped buffers recv" {
         error.ArgumentsInvalid => return error.SkipZigTest,
         else => return err,
     };
-    defer buf_grp.deinit();
+    defer buf_grp.deinit(testing.allocator);

     // create client/server fds
     const fds = try createSocketTestHarness(&ring);
@@ -4039,14 +4165,18 @@ test "ring mapped buffers recv" {
         if (cqe_send.err() == .INVAL) return error.SkipZigTest;
         try testing.expectEqual(linux.io_uring_cqe{ .user_data = user_data, .res = data.len, .flags = 0 }, cqe_send);
     }
+    var pos: usize = 0;

     // server reads data into provided buffers
     // there are 2 buffers of size 4, so each read gets only a chunk of the data
     // we read four chunks of 4, 4, 4, 3 bytes each
-    var chunk: []const u8 = data[0..buffer_size]; // first chunk
-    const id1 = try expect_buf_grp_recv(&ring, &buf_grp, fds.server, rnd.int(u64), chunk);
-    chunk = data[buffer_size .. buffer_size * 2]; // second chunk
-    const id2 = try expect_buf_grp_recv(&ring, &buf_grp, fds.server, rnd.int(u64), chunk);
+    // read first chunk
+    const cqe1 = try buf_grp_recv_submit_get_cqe(&ring, &buf_grp, fds.server, rnd.int(u64));
+    var buf = try buf_grp.get(cqe1);
+    try testing.expectEqualSlices(u8, data[pos..][0..buf.len], buf);
+    pos += buf.len;
+    // second chunk
+    const cqe2 = try buf_grp_recv_submit_get_cqe(&ring, &buf_grp, fds.server, rnd.int(u64));
+    buf = try buf_grp.get(cqe2);
+    try testing.expectEqualSlices(u8, data[pos..][0..buf.len], buf);
+    pos += buf.len;

     // both buffers provided to the kernel are used so we get error
     // 'no more buffers', until we put buffers to the kernel
@@ -4063,16 +4193,17 @@ test "ring mapped buffers recv" {
     }

     // put buffers back to the kernel
-    buf_grp.put(id1);
-    buf_grp.put(id2);
+    try buf_grp.put(cqe1);
+    try buf_grp.put(cqe2);

-    chunk = data[buffer_size * 2 .. buffer_size * 3]; // third chunk
-    const id3 = try expect_buf_grp_recv(&ring, &buf_grp, fds.server, rnd.int(u64), chunk);
-    buf_grp.put(id3);
-
-    chunk = data[buffer_size * 3 ..]; // last chunk
-    const id4 = try expect_buf_grp_recv(&ring, &buf_grp, fds.server, rnd.int(u64), chunk);
-    buf_grp.put(id4);
+    // read remaining data
+    while (pos < data.len) {
+        const cqe = try buf_grp_recv_submit_get_cqe(&ring, &buf_grp, fds.server, rnd.int(u64));
+        buf = try buf_grp.get(cqe);
+        try testing.expectEqualSlices(u8, data[pos..][0..buf.len], buf);
+        pos += buf.len;
+        try buf_grp.put(cqe);
+    }
     }
 }
@@ -4090,12 +4221,10 @@ test "ring mapped buffers multishot recv" {
     const group_id: u16 = 1; // buffers group id
     const buffers_count: u16 = 2; // number of buffers in buffer group
     const buffer_size: usize = 4; // size of each buffer in group
-    const buffers = try testing.allocator.alloc(u8, buffers_count * buffer_size);
-    defer testing.allocator.free(buffers);
     var buf_grp = BufferGroup.init(
         &ring,
+        testing.allocator,
         group_id,
-        buffers,
         buffer_size,
         buffers_count,
     ) catch |err| switch (err) {
@@ -4103,7 +4232,7 @@ test "ring mapped buffers multishot recv" {
         error.ArgumentsInvalid => return error.SkipZigTest,
         else => return err,
     };
-    defer buf_grp.deinit();
+    defer buf_grp.deinit(testing.allocator);

     // create client/server fds
     const fds = try createSocketTestHarness(&ring);
@@ -4116,7 +4245,7 @@ test "ring mapped buffers multishot recv" {
     var round: usize = 4; // repeat send/recv cycle round times
     while (round > 0) : (round -= 1) {
         // client sends data
-        const data = [_]u8{ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0xa, 0xb, 0xc, 0xd, 0xe };
+        const data = [_]u8{ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0xa, 0xb, 0xc, 0xd, 0xe, 0xf };
         {
             const user_data = rnd.int(u64);
             _ = try ring.send(user_data, fds.client, data[0..], 0);
@@ -4133,7 +4262,7 @@ test "ring mapped buffers multishot recv" {

         // server reads data into provided buffers
         // there are 2 buffers of size 4, so each read gets only a chunk of the data
-        // we read four chunks of 4, 4, 4, 3 bytes each
+        // we read four chunks of 4, 4, 4, 4 bytes each
         var chunk: []const u8 = data[0..buffer_size]; // first chunk
         const cqe1 = try expect_buf_grp_cqe(&ring, &buf_grp, recv_user_data, chunk);
         try testing.expect(cqe1.flags & linux.IORING_CQE_F_MORE > 0);
@@ -4157,8 +4286,8 @@ test "ring mapped buffers multishot recv" {
         }

         // put buffers back to the kernel
-        buf_grp.put(try cqe1.buffer_id());
-        buf_grp.put(try cqe2.buffer_id());
+        try buf_grp.put(cqe1);
+        try buf_grp.put(cqe2);

         // restart multishot
         recv_user_data = rnd.int(u64);
@@ -4168,12 +4297,12 @@ test "ring mapped buffers multishot recv" {
         chunk = data[buffer_size * 2 .. buffer_size * 3]; // third chunk
         const cqe3 = try expect_buf_grp_cqe(&ring, &buf_grp, recv_user_data, chunk);
         try testing.expect(cqe3.flags & linux.IORING_CQE_F_MORE > 0);
-        buf_grp.put(try cqe3.buffer_id());
+        try buf_grp.put(cqe3);

         chunk = data[buffer_size * 3 ..]; // last chunk
         const cqe4 = try expect_buf_grp_cqe(&ring, &buf_grp, recv_user_data, chunk);
         try testing.expect(cqe4.flags & linux.IORING_CQE_F_MORE > 0);
-        buf_grp.put(try cqe4.buffer_id());
+        try buf_grp.put(cqe4);

         // cancel pending multishot recv operation
         {
@@ -4217,23 +4346,26 @@ test "ring mapped buffers multishot recv" {
     }
 }

-// Prepare and submit recv using buffer group.
-// Test that buffer from group, pointed by cqe, matches expected.
-fn expect_buf_grp_recv(
+// Prepare, submit recv and get cqe using buffer group.
+fn buf_grp_recv_submit_get_cqe(
     ring: *IoUring,
     buf_grp: *BufferGroup,
     fd: posix.fd_t,
     user_data: u64,
-    expected: []const u8,
-) !u16 {
-    // prepare and submit read
+) !linux.io_uring_cqe {
+    // prepare and submit recv
     const sqe = try buf_grp.recv(user_data, fd, 0);
     try testing.expect(sqe.flags & linux.IOSQE_BUFFER_SELECT == linux.IOSQE_BUFFER_SELECT);
     try testing.expect(sqe.buf_index == buf_grp.group_id);
     try testing.expectEqual(@as(u32, 1), try ring.submit()); // submit
+    // get cqe, expect success
+    const cqe = try ring.copy_cqe();
+    try testing.expectEqual(user_data, cqe.user_data);
+    try testing.expect(cqe.res >= 0); // success
+    try testing.expectEqual(posix.E.SUCCESS, cqe.err());
+    try testing.expect(cqe.flags & linux.IORING_CQE_F_BUFFER == linux.IORING_CQE_F_BUFFER); // IORING_CQE_F_BUFFER flag is set

-    const cqe = try expect_buf_grp_cqe(ring, buf_grp, user_data, expected);
-    return try cqe.buffer_id();
+    return cqe;
 }

 fn expect_buf_grp_cqe(
@@ -4253,7 +4385,7 @@ fn expect_buf_grp_cqe(
     // get buffer from pool
     const buffer_id = try cqe.buffer_id();
     const len = @as(usize, @intCast(cqe.res));
-    const buf = buf_grp.get(buffer_id)[0..len];
+    const buf = buf_grp.get_by_id(buffer_id)[0..len];
     try testing.expectEqualSlices(u8, expected, buf);

     return cqe;
@@ -4305,3 +4437,137 @@ test "copy_cqes with wrapping sq.cqes buffer" {
         try testing.expectEqual(2 + 4 * i, ring.cq.head.*);
     }
 }
+
+test "bind/listen/connect" {
+    var ring = IoUring.init(4, 0) catch |err| switch (err) {
+        error.SystemOutdated => return error.SkipZigTest,
+        error.PermissionDenied => return error.SkipZigTest,
+        else => return err,
+    };
+    defer ring.deinit();
+
+    const probe = ring.get_probe() catch return error.SkipZigTest;
+    // LISTEN is the highest opcode this test requires.
+    if (!probe.is_supported(.LISTEN)) return error.SkipZigTest;
+
+    var addr = net.Address.initIp4([4]u8{ 127, 0, 0, 1 }, 0);
+    const proto: u32 = if (addr.any.family == linux.AF.UNIX) 0 else linux.IPPROTO.TCP;
+
+    const listen_fd = brk: {
+        // Create socket
+        _ = try ring.socket(1, addr.any.family, linux.SOCK.STREAM | linux.SOCK.CLOEXEC, proto, 0);
+        try testing.expectEqual(1, try ring.submit());
+        var cqe = try ring.copy_cqe();
+        try testing.expectEqual(1, cqe.user_data);
+        try testing.expectEqual(posix.E.SUCCESS, cqe.err());
+        const listen_fd: posix.fd_t = @intCast(cqe.res);
+        try testing.expect(listen_fd > 2);
+
+        // Prepare: setsockopt * 2, bind, listen
+        var optval: u32 = 1;
+        (try ring.setsockopt(2, listen_fd, linux.SOL.SOCKET, linux.SO.REUSEADDR, mem.asBytes(&optval))).link_next();
+        (try ring.setsockopt(3, listen_fd, linux.SOL.SOCKET, linux.SO.REUSEPORT, mem.asBytes(&optval))).link_next();
+        (try ring.bind(4, listen_fd, &addr.any, addr.getOsSockLen(), 0)).link_next();
+        _ = try ring.listen(5, listen_fd, 1, 0);
+        // Submit 4 operations
+        try testing.expectEqual(4, try ring.submit());
+        // Expect all to succeed
+        for (2..6) |user_data| {
+            cqe = try ring.copy_cqe();
+            try testing.expectEqual(user_data, cqe.user_data);
+            try testing.expectEqual(posix.E.SUCCESS, cqe.err());
+        }
+
+        // Check that socket option is set
+        optval = 0;
+        _ = try ring.getsockopt(5, listen_fd, linux.SOL.SOCKET, linux.SO.REUSEADDR, mem.asBytes(&optval));
+        try testing.expectEqual(1, try ring.submit());
+        cqe = try ring.copy_cqe();
+        try testing.expectEqual(5, cqe.user_data);
+        try testing.expectEqual(posix.E.SUCCESS, cqe.err());
+        try testing.expectEqual(1, optval);
+
+        // Read system assigned port into addr
+        var addr_len: posix.socklen_t = addr.getOsSockLen();
+        try posix.getsockname(listen_fd, &addr.any, &addr_len);
+
+        break :brk listen_fd;
+    };
+
+    const connect_fd = brk: {
+        // Create connect socket
+        _ = try ring.socket(6, addr.any.family, linux.SOCK.STREAM | linux.SOCK.CLOEXEC, proto, 0);
+        try testing.expectEqual(1, try ring.submit());
+        const cqe = try ring.copy_cqe();
+        try testing.expectEqual(6, cqe.user_data);
+        try testing.expectEqual(posix.E.SUCCESS, cqe.err());
+        // Get connect socket fd
+        const connect_fd: posix.fd_t = @intCast(cqe.res);
+        try testing.expect(connect_fd > 2 and connect_fd != listen_fd);
+        break :brk connect_fd;
+    };
+
+    // Prepare accept/connect operations
+    _ = try ring.accept(7, listen_fd, null, null, 0);
+    _ = try ring.connect(8, connect_fd, &addr.any, addr.getOsSockLen());
+    try testing.expectEqual(2, try ring.submit());
+    // Get listener accepted socket
+    var accept_fd: posix.socket_t = 0;
+    for (0..2) |_| {
+        const cqe = try ring.copy_cqe();
+        try testing.expectEqual(posix.E.SUCCESS, cqe.err());
+        if (cqe.user_data == 7) {
+            accept_fd = @intCast(cqe.res);
+        } else {
+            try testing.expectEqual(8, cqe.user_data);
+        }
+    }
+    try testing.expect(accept_fd > 2 and accept_fd != listen_fd and accept_fd != connect_fd);
+
+    // Communicate
+    try testSendRecv(&ring, connect_fd, accept_fd);
+    try testSendRecv(&ring, accept_fd, connect_fd);
+
+    // Shutdown and close all sockets
+    for ([_]posix.socket_t{ connect_fd, accept_fd, listen_fd }) |fd| {
+        (try ring.shutdown(9, fd, posix.SHUT.RDWR)).link_next();
+        _ = try ring.close(10, fd);
+        try testing.expectEqual(2, try ring.submit());
+        for (0..2) |i| {
+            const cqe = try ring.copy_cqe();
+            try testing.expectEqual(posix.E.SUCCESS, cqe.err());
+            try testing.expectEqual(9 + i, cqe.user_data);
+        }
+    }
+}
+
+fn testSendRecv(ring: *IoUring, send_fd: posix.socket_t, recv_fd: posix.socket_t) !void {
+    const buffer_send = "0123456789abcdf" ** 10;
+    var buffer_recv: [buffer_send.len * 2]u8 = undefined;
+
+    // 2 sends
+    _ = try ring.send(1, send_fd, buffer_send, linux.MSG.WAITALL);
+    _ = try ring.send(2, send_fd, buffer_send, linux.MSG.WAITALL);
+    try testing.expectEqual(2, try ring.submit());
+    for (0..2) |i| {
+        const cqe = try ring.copy_cqe();
+        try testing.expectEqual(1 + i, cqe.user_data);
+        try testing.expectEqual(posix.E.SUCCESS, cqe.err());
+        try testing.expectEqual(buffer_send.len, @as(usize, @intCast(cqe.res)));
+    }
+
+    // receive
+    var recv_len: usize = 0;
+    while (recv_len < buffer_send.len * 2) {
+        _ = try ring.recv(3, recv_fd, .{ .buffer = buffer_recv[recv_len..] }, 0);
+        try testing.expectEqual(1, try ring.submit());
+        const cqe = try ring.copy_cqe();
+        try testing.expectEqual(3, cqe.user_data);
+        try testing.expectEqual(posix.E.SUCCESS, cqe.err());
+        recv_len += @intCast(cqe.res);
+    }
+
+    // inspect recv buffer
+    try testing.expectEqualSlices(u8, buffer_send, buffer_recv[0..buffer_send.len]);
+    try testing.expectEqualSlices(u8, buffer_send, buffer_recv[buffer_send.len..]);
+}
@@ -619,4 +619,61 @@ pub const io_uring_sqe = extern struct {
         sqe.rw_flags = flags;
         sqe.splice_fd_in = @bitCast(options);
     }

+    pub fn prep_bind(
+        sqe: *linux.io_uring_sqe,
+        fd: linux.fd_t,
+        addr: *const linux.sockaddr,
+        addrlen: linux.socklen_t,
+        flags: u32,
+    ) void {
+        sqe.prep_rw(.BIND, fd, @intFromPtr(addr), 0, addrlen);
+        sqe.rw_flags = flags;
+    }
+
+    pub fn prep_listen(
+        sqe: *linux.io_uring_sqe,
+        fd: linux.fd_t,
+        backlog: usize,
+        flags: u32,
+    ) void {
+        sqe.prep_rw(.LISTEN, fd, 0, backlog, 0);
+        sqe.rw_flags = flags;
+    }
+
+    pub fn prep_cmd_sock(
+        sqe: *linux.io_uring_sqe,
+        cmd_op: linux.IO_URING_SOCKET_OP,
+        fd: linux.fd_t,
+        level: u32,
+        optname: u32,
+        optval: u64,
+        optlen: u32,
+    ) void {
+        sqe.prep_rw(.URING_CMD, fd, 0, 0, 0);
+        // off is overloaded with cmd_op, https://github.com/axboe/liburing/blob/e1003e496e66f9b0ae06674869795edf772d5500/src/include/liburing/io_uring.h#L39
+        sqe.off = @intFromEnum(cmd_op);
+        // addr is overloaded, https://github.com/axboe/liburing/blob/e1003e496e66f9b0ae06674869795edf772d5500/src/include/liburing/io_uring.h#L46
+        sqe.addr = @bitCast(packed struct {
+            level: u32,
+            optname: u32,
+        }{
+            .level = level,
+            .optname = optname,
+        });
+        // splice_fd_in is overloaded, u32 -> i32
+        sqe.splice_fd_in = @bitCast(optlen);
+        // addr3 is overloaded, https://github.com/axboe/liburing/blob/e1003e496e66f9b0ae06674869795edf772d5500/src/include/liburing/io_uring.h#L102
+        sqe.addr3 = optval;
+    }
+
+    pub fn set_flags(sqe: *linux.io_uring_sqe, flags: u8) void {
+        sqe.flags |= flags;
+    }
+
+    /// This SQE forms a link with the next SQE in the submission ring. The next
+    /// SQE will not be started before this one completes. Forms a chain of SQEs.
+    pub fn link_next(sqe: *linux.io_uring_sqe) void {
+        sqe.flags |= linux.IOSQE_IO_LINK;
+    }
 };
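A small standalone check (not from the commit) of the field packing that prep_cmd_sock relies on: level lands in the low 32 bits of the 64-bit addr field and optname in the high 32 bits, mirroring the liburing layout linked above. The example values are arbitrary.

const std = @import("std");

test "level/optname packing in sqe.addr" {
    const Packed = packed struct { level: u32, optname: u32 };
    // 6 = IPPROTO.TCP, 1 = TCP_NODELAY, chosen only as example values.
    const addr: u64 = @bitCast(Packed{ .level = 6, .optname = 1 });
    try std.testing.expectEqual(@as(u64, 6) | (@as(u64, 1) << 32), addr);
}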