Allow for advanced non-sequential SQE allocation schemes

Decouples the SQE queueing and SQE prepping methods to allow for non-sequential
SQE allocation schemes, as suggested by @daurnimator.

Adds essential SQE prepping methods from liburing to reduce boilerplate.

Removes non-essential .link_with_next_sqe() and .use_registered_fd().
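
With queueing and prepping decoupled, an SQE can be allocated in one place and
prepped in another, or prepped out of order. A minimal sketch of the new flow,
assuming an initialized `ring` and a valid `fd` (the `user_data` value is arbitrary):

    // Allocate a vacant SQE slot from the ring...
    const sqe = try ring.get_sqe();
    // ...then prep it in place with a liburing-style helper, no `queue_` wrapper needed.
    io_uring_prep_fsync(sqe, fd, 0);
    sqe.user_data = 42;
    _ = try ring.submit();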
Joran Dirk Greef 2020-10-04 13:01:41 +02:00
parent e32c7d06e5
commit 69a55fc560


@@ -135,13 +135,13 @@ pub const IO_Uring = struct {
self.fd = -1;
}
/// Returns a pointer to a zeroed SQE, or an error if the submission queue is full.
/// Returns a pointer to a vacant SQE, or an error if the submission queue is full.
/// We follow the implementation (and atomics) of liburing's `io_uring_get_sqe()` exactly.
/// However, instead of a null we return an error to force safe handling.
/// Any situation where the submission queue is full tends more towards a control flow error,
/// and the null return in liburing is more a C idiom than anything else, for lack of a better
/// alternative. In Zig, we have first-class error handling... so let's use it.
/// Matches the implementation of io_uring_get_sqe() in liburing, except zeroes for safety.
/// Matches the implementation of io_uring_get_sqe() in liburing.
pub fn get_sqe(self: *IO_Uring) !*io_uring_sqe {
const head = @atomicLoad(u32, self.sq.head, .Acquire);
// Remember that these head and tail offsets wrap around every four billion operations.
@@ -150,8 +150,6 @@ pub const IO_Uring = struct {
if (next -% head > self.sq.sqes.len) return error.SubmissionQueueFull;
var sqe = &self.sq.sqes[self.sq.sqe_tail & self.sq.mask];
self.sq.sqe_tail = next;
// We zero the SQE slot here in a single place, rather than in many `queue_` methods.
@memset(@ptrCast([*]u8, sqe), 0, @sizeOf(io_uring_sqe));
return sqe;
}
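For example, because a full queue surfaces as `error.SubmissionQueueFull` rather
than a null, a caller can recover inline. A sketch, assuming a single `submit()`
frees up room in the submission queue:

    const sqe = ring.get_sqe() catch |err| switch (err) {
        // Flush pending SQEs to the kernel and retry once, rather than dropping the op.
        error.SubmissionQueueFull => blk: {
            _ = try ring.submit();
            break :blk try ring.get_sqe();
        },
    };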
@@ -336,29 +334,6 @@ pub const IO_Uring = struct {
}
}
/// Queues (but does not submit) an SQE to perform an `accept4(2)` on a socket.
/// Returns a pointer to the SQE.
pub fn queue_accept(
self: *IO_Uring,
user_data: u64,
fd: os.fd_t,
addr: *os.sockaddr,
addrlen: *os.socklen_t,
accept_flags: u32
) !*io_uring_sqe {
// "sqe->fd is the file descriptor, sqe->addr holds a pointer to struct sockaddr,
// sqe->addr2 holds a pointer to socklen_t, and finally sqe->accept_flags holds the flags
// for accept(4)." - https://lwn.net/ml/linux-block/20191025173037.13486-1-axboe@kernel.dk/
const sqe = try self.get_sqe();
sqe.opcode = .ACCEPT;
sqe.fd = fd;
sqe.off = @ptrToInt(addrlen); // `addr2` is a newer union member that maps to `off`.
sqe.addr = @ptrToInt(addr);
sqe.user_data = user_data;
sqe.rw_flags = accept_flags;
return sqe;
}
/// Queues (but does not submit) an SQE to perform an `fsync(2)`.
/// Returns a pointer to the SQE so that you can further modify the SQE for advanced use cases.
/// For example, for `fdatasync()` you can set `IORING_FSYNC_DATASYNC` in the SQE's `rw_flags`.
@@ -368,11 +343,9 @@ pub const IO_Uring = struct {
/// apply to the write, since the fsync may complete before the write is issued to the disk.
/// You should preferably set `IOSQE_IO_LINK` on a write's SQE to link it with an fsync,
/// or else insert a full write barrier using `drain_previous_sqes()` when queueing an fsync.
pub fn queue_fsync(self: *IO_Uring, user_data: u64, fd: os.fd_t, flags: u32) !*io_uring_sqe {
pub fn fsync(self: *IO_Uring, user_data: u64, fd: os.fd_t, flags: u32) !*io_uring_sqe {
const sqe = try self.get_sqe();
sqe.opcode = .FSYNC;
sqe.fd = fd;
sqe.rw_flags = flags;
io_uring_prep_fsync(sqe, fd, flags);
sqe.user_data = user_data;
return sqe;
}
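For example, a write linked to an fsync, so that the fsync is only started once
the write completes. A sketch, assuming `fd` and `buffer` exist:

    const sqe_write = try ring.write(0x11111111, fd, buffer[0..], 0);
    // IOSQE_IO_LINK orders the fsync strictly after this write in the chain.
    sqe_write.flags |= linux.IOSQE_IO_LINK;
    _ = try ring.fsync(0x22222222, fd, 0);
    _ = try ring.submit();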
@@ -382,16 +355,16 @@ pub const IO_Uring = struct {
/// A no-op is more useful than it may appear at first glance.
/// For example, you could call `drain_previous_sqes()` on the returned SQE, to use the no-op to
/// know when the ring is idle before acting on a kill signal.
pub fn queue_nop(self: *IO_Uring, user_data: u64) !*io_uring_sqe {
pub fn nop(self: *IO_Uring, user_data: u64) !*io_uring_sqe {
const sqe = try self.get_sqe();
sqe.opcode = .NOP;
io_uring_prep_nop(sqe);
sqe.user_data = user_data;
return sqe;
}
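For example, a drained no-op that completes only after all previously queued SQEs
complete. A sketch (0xbb is an arbitrary `user_data` value):

    var sqe = try ring.nop(0xbb);
    // IOSQE_IO_DRAIN: wait for all prior SQEs to complete before starting this one.
    sqe.flags |= linux.IOSQE_IO_DRAIN;
    _ = try ring.submit();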
/// Queues (but does not submit) an SQE to perform a `read(2)`.
/// Returns a pointer to the SQE.
pub fn queue_read(
pub fn read(
self: *IO_Uring,
user_data: u64,
fd: os.fd_t,
@@ -399,18 +372,14 @@ pub const IO_Uring = struct {
offset: u64
) !*io_uring_sqe {
const sqe = try self.get_sqe();
sqe.opcode = .READ;
sqe.fd = fd;
sqe.off = offset;
sqe.addr = @ptrToInt(buffer.ptr);
sqe.len = @intCast(u32, buffer.len);
io_uring_prep_read(sqe, fd, buffer, offset);
sqe.user_data = user_data;
return sqe;
}
/// Queues (but does not submit) an SQE to perform a `write(2)`.
/// Returns a pointer to the SQE.
pub fn queue_write(
pub fn write(
self: *IO_Uring,
user_data: u64,
fd: os.fd_t,
@@ -418,11 +387,7 @@ pub const IO_Uring = struct {
offset: u64
) !*io_uring_sqe {
const sqe = try self.get_sqe();
sqe.opcode = .WRITE;
sqe.fd = fd;
sqe.off = offset;
sqe.addr = @ptrToInt(buffer.ptr);
sqe.len = @intCast(u32, buffer.len);
io_uring_prep_write(sqe, fd, buffer, offset);
sqe.user_data = user_data;
return sqe;
}
@@ -431,7 +396,7 @@ pub const IO_Uring = struct {
/// Returns a pointer to the SQE so that you can further modify the SQE for advanced use cases.
/// For example, if you want to do a `preadv2()` then set `rw_flags` on the returned SQE.
/// See https://linux.die.net/man/2/preadv.
pub fn queue_readv(
pub fn readv(
self: *IO_Uring,
user_data: u64,
fd: os.fd_t,
@@ -439,11 +404,7 @@ pub const IO_Uring = struct {
offset: u64
) !*io_uring_sqe {
const sqe = try self.get_sqe();
sqe.opcode = .READV;
sqe.fd = fd;
sqe.off = offset;
sqe.addr = @ptrToInt(iovecs.ptr);
sqe.len = @intCast(u32, iovecs.len);
io_uring_prep_readv(sqe, fd, iovecs, offset);
sqe.user_data = user_data;
return sqe;
}
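For example, to get `preadv2()` semantics with `RWF_NOWAIT`. A sketch: 0x00000008
is the uapi value of RWF_NOWAIT, hard-coded here in case the constant is not
exposed by the standard library:

    var sqe = try ring.readv(0x33333333, fd, iovecs[0..], 0);
    // RWF_NOWAIT: fail with EAGAIN rather than block if the data is not in cache.
    sqe.rw_flags = 0x00000008;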
@@ -452,7 +413,7 @@ pub const IO_Uring = struct {
/// Returns a pointer to the SQE so that you can further modify the SQE for advanced use cases.
/// For example, if you want to do a `pwritev2()` then set `rw_flags` on the returned SQE.
/// See https://linux.die.net/man/2/pwritev.
pub fn queue_writev(
pub fn writev(
self: *IO_Uring,
user_data: u64,
fd: os.fd_t,
@@ -460,25 +421,25 @@ pub const IO_Uring = struct {
offset: u64
) !*io_uring_sqe {
const sqe = try self.get_sqe();
sqe.opcode = .WRITEV;
sqe.fd = fd;
sqe.off = offset;
sqe.addr = @ptrToInt(iovecs.ptr);
sqe.len = @intCast(u32, iovecs.len);
io_uring_prep_writev(sqe, fd, iovecs, offset);
sqe.user_data = user_data;
return sqe;
}
/// The next SQE will not be started until this one completes.
/// This can be used to chain causally dependent SQEs, and the chain can be arbitrarily long.
/// The tail of the chain is denoted by the first SQE that does not have this flag set.
/// This flag has no effect on previous SQEs, nor does it impact SQEs outside the chain.
/// This means that multiple chains can be executing in parallel, along with individual SQEs.
/// Only members inside the chain are serialized.
/// A chain will be broken if any SQE in the chain ends in error, where any unexpected result is
/// considered an error. For example, a short read will terminate the remainder of the chain.
pub fn link_with_next_sqe(self: *IO_Uring, sqe: *io_uring_sqe) void {
sqe.flags |= linux.IOSQE_IO_LINK;
/// Queues (but does not submit) an SQE to perform an `accept4(2)` on a socket.
/// Returns a pointer to the SQE.
pub fn accept(
self: *IO_Uring,
user_data: u64,
fd: os.fd_t,
addr: *os.sockaddr,
addrlen: *os.socklen_t,
flags: u32
) !*io_uring_sqe {
const sqe = try self.get_sqe();
io_uring_prep_accept(sqe, fd, addr, addrlen, flags);
sqe.user_data = user_data;
return sqe;
}
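A usage sketch, assuming `server_fd` is a listening socket; the kernel fills in
`addr` and `addrlen` on completion and returns the accepted socket in the CQE's `res`:

    var addr: os.sockaddr = undefined;
    var addrlen: os.socklen_t = @sizeOf(os.sockaddr);
    _ = try ring.accept(0x44444444, server_fd, &addr, &addrlen, 0);
    _ = try ring.submit();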
/// Like linking with `IOSQE_IO_LINK`, but stronger.
@@ -538,11 +499,6 @@ pub const IO_Uring = struct {
}
}
/// Changes the semantics of the SQE's `fd` to refer to a pre-registered file descriptor.
pub fn use_registered_fd(self: *IO_Uring, sqe: *io_uring_sqe) void {
sqe.flags |= linux.IOSQE_FIXED_FILE;
}
/// Unregisters all registered file descriptors previously associated with the ring.
pub fn unregister_files(self: *IO_Uring) !void {
assert(self.fd >= 0);
@@ -563,8 +519,8 @@ pub const SubmissionQueue = struct {
dropped: *u32,
array: []u32,
sqes: []io_uring_sqe,
mmap: []align(std.mem.page_size) u8,
mmap_sqes: []align(std.mem.page_size) u8,
mmap: []align(mem.page_size) u8,
mmap_sqes: []align(mem.page_size) u8,
// We use `sqe_head` and `sqe_tail` in the same way as liburing:
// We increment `sqe_tail` (but not `tail`) for each call to `get_sqe()`.
@@ -666,7 +622,156 @@ pub const CompletionQueue = struct {
}
};
test "structs and offsets" {
pub fn io_uring_prep_nop(sqe: *io_uring_sqe) void {
sqe.* = .{
.opcode = .NOP,
.flags = 0,
.ioprio = 0,
.fd = 0,
.off = 0,
.addr = 0,
.len = 0,
.rw_flags = 0,
.user_data = 0,
.buf_index = 0,
.personality = 0,
.splice_fd_in = 0,
.__pad2 = [2]u64{ 0, 0 }
};
}
pub fn io_uring_prep_fsync(sqe: *io_uring_sqe, fd: os.fd_t, flags: u32) void {
sqe.* = .{
.opcode = .FSYNC,
.flags = 0,
.ioprio = 0,
.fd = fd,
.off = 0,
.addr = 0,
.len = 0,
.rw_flags = flags,
.user_data = 0,
.buf_index = 0,
.personality = 0,
.splice_fd_in = 0,
.__pad2 = [2]u64{ 0, 0 }
};
}
pub fn io_uring_prep_rw(
op: linux.IORING_OP,
sqe: *io_uring_sqe,
fd: os.fd_t,
addr: anytype,
len: usize,
offset: u64
) void {
sqe.* = .{
.opcode = op,
.flags = 0,
.ioprio = 0,
.fd = fd,
.off = offset,
.addr = @ptrToInt(addr),
.len = @intCast(u32, len),
.rw_flags = 0,
.user_data = 0,
.buf_index = 0,
.personality = 0,
.splice_fd_in = 0,
.__pad2 = [2]u64{ 0, 0 }
};
}
pub fn io_uring_prep_read(sqe: *io_uring_sqe, fd: os.fd_t, buffer: []u8, offset: u64) void {
io_uring_prep_rw(.READ, sqe, fd, buffer.ptr, buffer.len, offset);
}
pub fn io_uring_prep_write(sqe: *io_uring_sqe, fd: os.fd_t, buffer: []const u8, offset: u64) void {
io_uring_prep_rw(.WRITE, sqe, fd, buffer.ptr, buffer.len, offset);
}
pub fn io_uring_prep_readv(
sqe: *io_uring_sqe,
fd: os.fd_t,
iovecs: []const os.iovec,
offset: u64
) void {
io_uring_prep_rw(.READV, sqe, fd, iovecs.ptr, iovecs.len, offset);
}
pub fn io_uring_prep_writev(
sqe: *io_uring_sqe,
fd: os.fd_t,
iovecs: []const os.iovec_const,
offset: u64
) void {
io_uring_prep_rw(.WRITEV, sqe, fd, iovecs.ptr, iovecs.len, offset);
}
pub fn io_uring_prep_accept(
sqe: *io_uring_sqe,
fd: os.fd_t,
addr: *os.sockaddr,
addrlen: *os.socklen_t,
flags: u32
) void {
// `addr` holds a pointer to `sockaddr`, and `addr2` holds a pointer to `socklen_t`.
// `addr2` maps to `sqe.off` (u64) instead of `sqe.len` (which is only a u32).
io_uring_prep_rw(.ACCEPT, sqe, fd, addr, 0, @ptrToInt(addrlen));
sqe.rw_flags = flags;
}
pub fn io_uring_prep_connect(
sqe: *io_uring_sqe,
fd: os.fd_t,
addr: *const os.sockaddr,
addrlen: os.socklen_t
) void {
// `addrlen` maps to `sqe.off` (u64) instead of `sqe.len` (which is only a u32).
io_uring_prep_rw(.CONNECT, sqe, fd, addr, 0, addrlen);
}
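This commit adds no matching `connect()` method on the ring itself, so the prep
can be applied to a raw SQE. A sketch, assuming `addr: os.sockaddr` holds the
peer address and `addrlen` its length:

    const sqe = try ring.get_sqe();
    io_uring_prep_connect(sqe, fd, &addr, addrlen);
    sqe.user_data = 0x55555555;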
pub fn io_uring_prep_recv(sqe: *io_uring_sqe, fd: os.fd_t, buffer: []u8, flags: u32) void {
io_uring_prep_rw(.RECV, sqe, fd, buffer.ptr, buffer.len, 0);
sqe.rw_flags = flags;
}
pub fn io_uring_prep_send(sqe: *io_uring_sqe, fd: os.fd_t, buffer: []const u8, flags: u32) void {
io_uring_prep_rw(.SEND, sqe, fd, buffer.ptr, buffer.len, 0);
sqe.rw_flags = flags;
}
pub fn io_uring_prep_openat(
sqe: *io_uring_sqe,
fd: os.fd_t,
path: [*:0]const u8,
flags: u32,
mode: os.mode_t
) void {
io_uring_prep_rw(.OPENAT, sqe, fd, path, mode, 0);
sqe.rw_flags = flags;
}
pub fn io_uring_prep_close(sqe: *io_uring_sqe, fd: os.fd_t) void {
sqe.* = .{
.opcode = .CLOSE,
.flags = 0,
.ioprio = 0,
.fd = fd,
.off = 0,
.addr = 0,
.len = 0,
.rw_flags = 0,
.user_data = 0,
.buf_index = 0,
.personality = 0,
.splice_fd_in = 0,
.__pad2 = [2]u64{ 0, 0 }
};
}
test "structs/offsets/entries" {
if (builtin.os.tag != .linux) return error.SkipZigTest;
testing.expectEqual(@as(usize, 120), @sizeOf(io_uring_params));
@@ -681,7 +786,7 @@ test "structs and offsets" {
testing.expectError(error.EntriesNotPowerOfTwo, IO_Uring.init(3, 0));
}
test "queue_nop" {
test "nop" {
if (builtin.os.tag != .linux) return error.SkipZigTest;
var ring = IO_Uring.init(1, 0) catch |err| switch (err) {
@@ -694,7 +799,7 @@ test "queue_nop" {
testing.expectEqual(@as(os.fd_t, -1), ring.fd);
}
var sqe = try ring.queue_nop(@intCast(u64, 0xaaaaaaaa));
var sqe = try ring.nop(0xaaaaaaaa);
testing.expectEqual(io_uring_sqe {
.opcode = .NOP,
.flags = 0,
@@ -704,7 +809,7 @@ test "queue_nop" {
.addr = 0,
.len = 0,
.rw_flags = 0,
.user_data = @intCast(u64, 0xaaaaaaaa),
.user_data = 0xaaaaaaaa,
.buf_index = 0,
.personality = 0,
.splice_fd_in = 0,
@@ -733,9 +838,8 @@ test "queue_nop" {
testing.expectEqual(@as(u32, 1), ring.cq.head.*);
testing.expectEqual(@as(u32, 0), ring.cq_ready());
var sqe_barrier = try ring.queue_nop(@intCast(u64, 0xbbbbbbbb));
ring.drain_previous_sqes(sqe_barrier);
testing.expectEqual(@as(u8, linux.IOSQE_IO_DRAIN), sqe_barrier.flags);
var sqe_barrier = try ring.nop(0xbbbbbbbb);
sqe_barrier.flags |= linux.IOSQE_IO_DRAIN;
testing.expectEqual(@as(u32, 1), try ring.submit());
testing.expectEqual(io_uring_cqe {
.user_data = 0xbbbbbbbb,
@@ -748,7 +852,7 @@ test "queue_nop" {
testing.expectEqual(@as(u32, 2), ring.cq.head.*);
}
test "queue_readv" {
test "readv" {
if (builtin.os.tag != .linux) return error.SkipZigTest;
var ring = IO_Uring.init(1, 0) catch |err| switch (err) {
@@ -774,11 +878,11 @@ test "queue_readv" {
var buffer = [_]u8{42} ** 128;
var iovecs = [_]os.iovec{ os.iovec { .iov_base = &buffer, .iov_len = buffer.len } };
var sqe = try ring.queue_readv(0xcccccccc, fd_index, iovecs[0..], 0);
ring.use_registered_fd(sqe);
testing.expectEqual(@as(u8, linux.IOSQE_FIXED_FILE), sqe.flags);
var sqe = try ring.readv(0xcccccccc, fd_index, iovecs[0..], 0);
testing.expectEqual(linux.IORING_OP.READV, sqe.opcode);
sqe.flags |= linux.IOSQE_FIXED_FILE;
testing.expectError(error.SubmissionQueueFull, ring.queue_nop(0));
testing.expectError(error.SubmissionQueueFull, ring.nop(0));
testing.expectEqual(@as(u32, 1), try ring.submit());
testing.expectEqual(linux.io_uring_cqe {
.user_data = 0xcccccccc,
@@ -790,52 +894,75 @@ test "queue_readv" {
try ring.unregister_files();
}
test "queue_writev/queue_fsync" {
test "writev/fsync/readv" {
if (builtin.os.tag != .linux) return error.SkipZigTest;
var ring = IO_Uring.init(2, 0) catch |err| switch (err) {
var ring = IO_Uring.init(4, 0) catch |err| switch (err) {
error.SystemOutdated => return error.SkipZigTest,
error.PermissionDenied => return error.SkipZigTest,
else => return err
};
defer ring.deinit();
const path = "test_io_uring_queue_writev";
const file = try std.fs.cwd().createFile(path, .{ .truncate = true });
const path = "test_io_uring_writev_fsync_readv";
const file = try std.fs.cwd().createFile(path, .{ .read = true, .truncate = true });
defer file.close();
defer std.fs.cwd().deleteFile(path) catch {};
const fd = file.handle;
var buffer = [_]u8{42} ** 128;
var iovecs = [_]os.iovec_const {
os.iovec_const { .iov_base = &buffer, .iov_len = buffer.len }
var buffer_write = [_]u8{42} ** 128;
var iovecs_write = [_]os.iovec_const {
os.iovec_const { .iov_base = &buffer_write, .iov_len = buffer_write.len }
};
var buffer_read = [_]u8{0} ** 128;
var iovecs_read = [_]os.iovec {
os.iovec { .iov_base = &buffer_read, .iov_len = buffer_read.len }
};
var sqe_writev = try ring.queue_writev(0xdddddddd, fd, iovecs[0..], 0);
ring.link_with_next_sqe(sqe_writev);
testing.expectEqual(@as(u8, linux.IOSQE_IO_LINK), sqe_writev.flags);
var sqe_fsync = try ring.queue_fsync(0xeeeeeeee, fd, 0);
testing.expectEqual(fd, sqe_fsync.fd);
testing.expectEqual(@as(u32, 2), ring.sq_ready());
testing.expectEqual(@as(u32, 2), try ring.submit_and_wait(2));
var sqe_writev = try ring.writev(0xdddddddd, fd, iovecs_write[0..], 17);
testing.expectEqual(linux.IORING_OP.WRITEV, sqe_writev.opcode);
testing.expectEqual(@as(u64, 17), sqe_writev.off);
sqe_writev.flags |= linux.IOSQE_IO_LINK;
var sqe_fsync = try ring.fsync(0xeeeeeeee, fd, 0);
testing.expectEqual(linux.IORING_OP.FSYNC, sqe_fsync.opcode);
testing.expectEqual(fd, sqe_fsync.fd);
sqe_fsync.flags |= linux.IOSQE_IO_LINK;
var sqe_readv = try ring.readv(0xffffffff, fd, iovecs_read[0..], 17);
testing.expectEqual(linux.IORING_OP.READV, sqe_readv.opcode);
testing.expectEqual(@as(u64, 17), sqe_readv.off);
testing.expectEqual(@as(u32, 3), ring.sq_ready());
testing.expectEqual(@as(u32, 3), try ring.submit_and_wait(3));
testing.expectEqual(@as(u32, 0), ring.sq_ready());
testing.expectEqual(@as(u32, 2), ring.cq_ready());
testing.expectEqual(@as(u32, 3), ring.cq_ready());
testing.expectEqual(linux.io_uring_cqe {
.user_data = 0xdddddddd,
.res = buffer.len,
.res = buffer_write.len,
.flags = 0,
}, try ring.copy_cqe());
testing.expectEqual(@as(u32, 1), ring.cq_ready());
testing.expectEqual(@as(u32, 2), ring.cq_ready());
testing.expectEqual(linux.io_uring_cqe {
.user_data = 0xeeeeeeee,
.res = 0,
.flags = 0,
}, try ring.copy_cqe());
testing.expectEqual(@as(u32, 1), ring.cq_ready());
testing.expectEqual(linux.io_uring_cqe {
.user_data = 0xffffffff,
.res = buffer_read.len,
.flags = 0,
}, try ring.copy_cqe());
testing.expectEqual(@as(u32, 0), ring.cq_ready());
testing.expectEqualSlices(u8, buffer_write[0..], buffer_read[0..]);
}
test "queue_write/queue_read" {
test "write/read" {
if (builtin.os.tag != .linux) return error.SkipZigTest;
var ring = IO_Uring.init(2, 0) catch |err| switch (err) {
@@ -845,7 +972,7 @@ test "queue_write/queue_read" {
};
defer ring.deinit();
const path = "test_io_uring_queue_write";
const path = "test_io_uring_write_read";
const file = try std.fs.cwd().createFile(path, .{ .read = true, .truncate = true });
defer file.close();
defer std.fs.cwd().deleteFile(path) catch {};
@@ -853,26 +980,30 @@ test "queue_write/queue_read" {
var buffer_write = [_]u8{97} ** 20;
var buffer_read = [_]u8{98} ** 20;
var sqe_write = try ring.queue_write(123, fd, buffer_write[0..], 10);
ring.link_with_next_sqe(sqe_write);
var sqe_read = try ring.queue_read(456, fd, buffer_read[0..], 10);
var sqe_write = try ring.write(123, fd, buffer_write[0..], 10);
testing.expectEqual(linux.IORING_OP.WRITE, sqe_write.opcode);
testing.expectEqual(@as(u64, 10), sqe_write.off);
sqe_write.flags |= linux.IOSQE_IO_LINK;
var sqe_read = try ring.read(456, fd, buffer_read[0..], 10);
testing.expectEqual(linux.IORING_OP.READ, sqe_read.opcode);
testing.expectEqual(@as(u64, 10), sqe_read.off);
testing.expectEqual(@as(u32, 2), try ring.submit());
var cqe1 = try ring.copy_cqe();
var cqe2 = try ring.copy_cqe();
var cqe_write = try ring.copy_cqe();
var cqe_read = try ring.copy_cqe();
// Prior to Linux Kernel 5.6 this is the only way to test for read/write support:
// https://lwn.net/Articles/809820/
if (cqe1.res == -linux.EINVAL) return error.SkipZigTest;
if (cqe2.res == -linux.EINVAL) return error.SkipZigTest;
if (cqe_write.res == -linux.EINVAL) return error.SkipZigTest;
if (cqe_read.res == -linux.EINVAL) return error.SkipZigTest;
testing.expectEqual(linux.io_uring_cqe {
.user_data = 123,
.res = buffer_write.len,
.flags = 0,
}, cqe1);
}, cqe_write);
testing.expectEqual(linux.io_uring_cqe {
.user_data = 456,
.res = buffer_read.len,
.flags = 0,
}, cqe2);
}, cqe_read);
testing.expectEqualSlices(u8, buffer_write[0..], buffer_read[0..]);
}