From 2a54c2ff199f9f93dc50d9d1d98e32a9bb8423e3 Mon Sep 17 00:00:00 2001 From: Hiroaki Nakamura Date: Tue, 2 Nov 2021 00:20:55 +0900 Subject: [PATCH 1/2] std.os.linux: Add cancel and io_uring_prep_cancel Signed-off-by: Hiroaki Nakamura --- lib/std/os/linux/io_uring.zig | 120 ++++++++++++++++++++++++++++++++++ 1 file changed, 120 insertions(+) diff --git a/lib/std/os/linux/io_uring.zig b/lib/std/os/linux/io_uring.zig index 71e4458579..a8c8778584 100644 --- a/lib/std/os/linux/io_uring.zig +++ b/lib/std/os/linux/io_uring.zig @@ -667,6 +667,26 @@ pub const IO_Uring = struct { return sqe; } + /// Queues (but does not submit) an SQE to remove an existing operation. + /// Returns a pointer to the SQE. + /// + /// The operation is identified by its `user_data`. + /// + /// The completion event result will be `0` if the operation was found and cancelled successfully, + /// `-EALREADY` if the operation was found but was already in progress, or + /// `-ENOENT` if the operation was not found. + pub fn cancel( + self: *IO_Uring, + user_data: u64, + cancel_user_data: u64, + flags: u32, + ) !*io_uring_sqe { + const sqe = try self.get_sqe(); + io_uring_prep_cancel(sqe, cancel_user_data, flags); + sqe.user_data = user_data; + return sqe; + } + /// Registers an array of file descriptors. /// Every time a file descriptor is put in an SQE and submitted to the kernel, the kernel must /// retrieve a reference to the file, and once I/O has completed the file reference must be @@ -1145,6 +1165,28 @@ pub fn io_uring_prep_statx( sqe.rw_flags = flags; } +pub fn io_uring_prep_cancel( + sqe: *io_uring_sqe, + cancel_user_data: u64, + flags: u32, +) void { + sqe.* = .{ + .opcode = .ASYNC_CANCEL, + .flags = 0, + .ioprio = 0, + .fd = -1, + .off = 0, + .addr = cancel_user_data, + .len = 0, + .rw_flags = flags, + .user_data = 0, + .buf_index = 0, + .personality = 0, + .splice_fd_in = 0, + .__pad2 = [2]u64{ 0, 0 }, + }; +} + test "structs/offsets/entries" { if (builtin.os.tag != .linux) return error.SkipZigTest; @@ -1805,3 +1847,81 @@ test "statx" { try testing.expect(buf.mask & os.linux.STATX_SIZE == os.linux.STATX_SIZE); try testing.expectEqual(@as(u64, 6), buf.size); } + +test "accept/connect/recv/cancel" { + if (builtin.os.tag != .linux) return error.SkipZigTest; + + var ring = IO_Uring.init(16, 0) catch |err| switch (err) { + error.SystemOutdated => return error.SkipZigTest, + error.PermissionDenied => return error.SkipZigTest, + else => return err, + }; + defer ring.deinit(); + + const address = try net.Address.parseIp4("127.0.0.1", 3131); + const kernel_backlog = 1; + const server = try os.socket(address.any.family, os.SOCK.STREAM | os.SOCK.CLOEXEC, 0); + defer os.close(server); + try os.setsockopt(server, os.SOL.SOCKET, os.SO.REUSEADDR, &mem.toBytes(@as(c_int, 1))); + try os.bind(server, &address.any, address.getOsSockLen()); + try os.listen(server, kernel_backlog); + + var buffer_recv = [_]u8{ 0, 1, 0, 1, 0 }; + + var accept_addr: os.sockaddr = undefined; + var accept_addr_len: os.socklen_t = @sizeOf(@TypeOf(accept_addr)); + _ = try ring.accept(0xaaaaaaaa, server, &accept_addr, &accept_addr_len, 0); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + + const client = try os.socket(address.any.family, os.SOCK.STREAM | os.SOCK.CLOEXEC, 0); + defer os.close(client); + _ = try ring.connect(0xcccccccc, client, &address.any, address.getOsSockLen()); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + + var cqe_accept = try ring.copy_cqe(); + if (cqe_accept.err() == .INVAL) return error.SkipZigTest; + var cqe_connect = try ring.copy_cqe(); + if (cqe_connect.err() == .INVAL) return error.SkipZigTest; + + // The accept/connect CQEs may arrive in any order, the connect CQE will sometimes come first: + if (cqe_accept.user_data == 0xcccccccc and cqe_connect.user_data == 0xaaaaaaaa) { + const a = cqe_accept; + const b = cqe_connect; + cqe_accept = b; + cqe_connect = a; + } + + try testing.expectEqual(@as(u64, 0xaaaaaaaa), cqe_accept.user_data); + if (cqe_accept.res <= 0) std.debug.print("\ncqe_accept.res={}\n", .{cqe_accept.res}); + try testing.expect(cqe_accept.res > 0); + try testing.expectEqual(@as(u32, 0), cqe_accept.flags); + try testing.expectEqual(linux.io_uring_cqe{ + .user_data = 0xcccccccc, + .res = 0, + .flags = 0, + }, cqe_connect); + + _ = try ring.recv(0xffffffff, cqe_accept.res, buffer_recv[0..], 0); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + + const sqe_cancel = try ring.cancel(0x99999999, 0xffffffff, 0); + try testing.expectEqual(linux.IORING_OP.ASYNC_CANCEL, sqe_cancel.opcode); + try testing.expectEqual(@as(u64, 0xffffffff), sqe_cancel.addr); + try testing.expectEqual(@as(u64, 0x99999999), sqe_cancel.user_data); + try testing.expectEqual(@as(u32, 1), try ring.submit()); + + const cqe_recv = try ring.copy_cqe(); + if (cqe_recv.err() == .INVAL) return error.SkipZigTest; + try testing.expectEqual(linux.io_uring_cqe{ + .user_data = 0xffffffff, + .res = -@as(i32, @enumToInt(linux.E.CANCELED)), + .flags = 0, + }, cqe_recv); + + const cqe_cancel = try ring.copy_cqe(); + try testing.expectEqual(linux.io_uring_cqe{ + .user_data = 0x99999999, + .res = 0, + .flags = 0, + }, cqe_cancel); +} From 77d1d5839aaa24564d10f334f50bb3995fa7c83c Mon Sep 17 00:00:00 2001 From: Hiroaki Nakamura Date: Tue, 9 Nov 2021 00:52:02 +0900 Subject: [PATCH 2/2] Use io_uring_prep_rw in io_uring_prep_cancel follow liburing's API as closely as possible. Signed-off-by: Hiroaki Nakamura --- lib/std/os/linux/io_uring.zig | 17 ++--------------- 1 file changed, 2 insertions(+), 15 deletions(-) diff --git a/lib/std/os/linux/io_uring.zig b/lib/std/os/linux/io_uring.zig index a8c8778584..bcc30cb764 100644 --- a/lib/std/os/linux/io_uring.zig +++ b/lib/std/os/linux/io_uring.zig @@ -1170,21 +1170,8 @@ pub fn io_uring_prep_cancel( cancel_user_data: u64, flags: u32, ) void { - sqe.* = .{ - .opcode = .ASYNC_CANCEL, - .flags = 0, - .ioprio = 0, - .fd = -1, - .off = 0, - .addr = cancel_user_data, - .len = 0, - .rw_flags = flags, - .user_data = 0, - .buf_index = 0, - .personality = 0, - .splice_fd_in = 0, - .__pad2 = [2]u64{ 0, 0 }, - }; + io_uring_prep_rw(.ASYNC_CANCEL, sqe, -1, cancel_user_data, 0, 0); + sqe.rw_flags = flags; } test "structs/offsets/entries" {