From 2da8eff9d6b7f9d784a596836188cbede6cfb1d0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Igor=20Anic=CC=81?= Date: Thu, 27 Feb 2025 23:55:43 +0100 Subject: [PATCH] io_uring: add bind and listen --- lib/std/os/linux.zig | 10 ++ lib/std/os/linux/IoUring.zig | 186 ++++++++++++++++++++++++++++++ lib/std/os/linux/io_uring_sqe.zig | 47 ++++++++ 3 files changed, 243 insertions(+) diff --git a/lib/std/os/linux.zig b/lib/std/os/linux.zig index d224a45a8f..53424dc547 100644 --- a/lib/std/os/linux.zig +++ b/lib/std/os/linux.zig @@ -5806,6 +5806,9 @@ pub const IORING_OP = enum(u8) { FUTEX_WAITV, FIXED_FD_INSTALL, FTRUNCATE, + BIND, + LISTEN, + RECV_ZC, _, }; @@ -6190,6 +6193,13 @@ pub const IORING_RESTRICTION = enum(u16) { _, }; +pub const IO_URING_SOCKET_OP = enum(u16) { + SIOCIN = 0, + SIOCOUTQ = 1, + GETSOCKOPT = 2, + SETSOCKOPT = 3, +}; + pub const io_uring_buf = extern struct { addr: u64, len: u32, diff --git a/lib/std/os/linux/IoUring.zig b/lib/std/os/linux/IoUring.zig index f26b2ce8bf..7ce2acf49b 100644 --- a/lib/std/os/linux/IoUring.zig +++ b/lib/std/os/linux/IoUring.zig @@ -1356,6 +1356,55 @@ pub fn socket_direct_alloc( return sqe; } +/// Queues (but does not submit) an SQE to perform a `bind(2)` on a socket. +/// Returns a pointer to the SQE. +/// Available since 6.11 +pub fn bind( + self: *IoUring, + user_data: u64, + fd: posix.fd_t, + addr: *const posix.sockaddr, + addrlen: posix.socklen_t, + flags: u32, +) !*linux.io_uring_sqe { + const sqe = try self.get_sqe(); + sqe.prep_bind(fd, addr, addrlen, flags); + sqe.user_data = user_data; + return sqe; +} + +/// Queues (but does not submit) an SQE to perform a `listen(2)` on a socket. +/// Returns a pointer to the SQE.
+/// Available since 6.11 +pub fn listen( + self: *IoUring, + user_data: u64, + fd: posix.fd_t, + backlog: usize, + flags: u32, +) !*linux.io_uring_sqe { + const sqe = try self.get_sqe(); + sqe.prep_listen(fd, backlog, flags); + sqe.user_data = user_data; + return sqe; +} + +fn cmd_sock( + self: *IoUring, + user_data: u64, + cmd_op: linux.IO_URING_SOCKET_OP, + fd: linux.fd_t, + level: u32, + optname: u32, + optval: u64, + optlen: u32, +) !*linux.io_uring_sqe { + const sqe = try self.get_sqe(); + sqe.prep_cmd_sock(cmd_op, fd, level, optname, optval, optlen); + sqe.user_data = user_data; + return sqe; +} + pub const SubmissionQueue = struct { head: *u32, tail: *u32, @@ -4305,3 +4354,140 @@ test "copy_cqes with wrapping sq.cqes buffer" { try testing.expectEqual(2 + 4 * i, ring.cq.head.*); } } + +test "bind" { + try skipKernelLessThan(.{ .major = 6, .minor = 11, .patch = 0 }); + + var ring = IoUring.init(4, 0) catch |err| switch (err) { + error.SystemOutdated => return error.SkipZigTest, + error.PermissionDenied => return error.SkipZigTest, + else => return err, + }; + defer ring.deinit(); + + var addr = net.Address.initIp4([4]u8{ 127, 0, 0, 1 }, 0); + const proto: u32 = if (addr.any.family == linux.AF.UNIX) 0 else linux.IPPROTO.TCP; + + const listen_fd = brk: { + // Create socket + _ = try ring.socket(1, addr.any.family, linux.SOCK.STREAM | linux.SOCK.CLOEXEC, proto, 0); + try testing.expectEqual(1, try ring.submit()); + var cqe = try ring.copy_cqe(); + try testing.expectEqual(1, cqe.user_data); + try testing.expectEqual(posix.E.SUCCESS, cqe.err()); + const listen_fd: posix.fd_t = @intCast(cqe.res); + try testing.expect(listen_fd > 2); + + // Prepare: set socket option * 2, bind, listen + var sock_opt: u32 = 1; + var sqe = try ring.cmd_sock(2, .SETSOCKOPT, listen_fd, linux.SOL.SOCKET, linux.SO.REUSEADDR, @intFromPtr(&sock_opt), @sizeOf(u32)); + sqe.flags |= linux.IOSQE_IO_LINK; + sqe = try ring.cmd_sock(3, .SETSOCKOPT, listen_fd, linux.SOL.SOCKET, linux.SO.REUSEPORT, 
@intFromPtr(&sock_opt), @sizeOf(u32)); + sqe.flags |= linux.IOSQE_IO_LINK; + sqe = try ring.bind(4, listen_fd, &addr.any, addr.getOsSockLen(), 0); + sqe.flags |= linux.IOSQE_IO_LINK; + _ = try ring.listen(5, listen_fd, 1, 0); + // Submit 4 operations + try testing.expectEqual(4, try ring.submit()); + // Expect all to succeed + for (2..6) |user_data| { + cqe = try ring.copy_cqe(); + try testing.expectEqual(user_data, cqe.user_data); + try testing.expectEqual(posix.E.SUCCESS, cqe.err()); + } + + // Check that socket option is set + sock_opt = 0xff; + _ = try ring.cmd_sock(5, .GETSOCKOPT, listen_fd, linux.SOL.SOCKET, linux.SO.REUSEADDR, @intFromPtr(&sock_opt), @sizeOf(u32)); + try testing.expectEqual(1, try ring.submit()); + cqe = try ring.copy_cqe(); + try testing.expectEqual(5, cqe.user_data); + try testing.expectEqual(posix.E.SUCCESS, cqe.err()); + try testing.expectEqual(1, sock_opt); + + // Read system assigned port into addr + var addr_len: posix.socklen_t = addr.getOsSockLen(); + try posix.getsockname(listen_fd, &addr.any, &addr_len); + + break :brk listen_fd; + }; + + const connect_fd = brk: { + // Create connect socket + _ = try ring.socket(6, addr.any.family, linux.SOCK.STREAM | linux.SOCK.CLOEXEC, proto, 0); + try testing.expectEqual(1, try ring.submit()); + const cqe = try ring.copy_cqe(); + try testing.expectEqual(6, cqe.user_data); + try testing.expectEqual(posix.E.SUCCESS, cqe.err()); + // Get connect socket fd + const connect_fd: posix.fd_t = @intCast(cqe.res); + try testing.expect(connect_fd > 2 and connect_fd != listen_fd); + break :brk connect_fd; + }; + + // Prepare accept/connect operations + _ = try ring.accept(7, listen_fd, null, null, 0); + _ = try ring.connect(8, connect_fd, &addr.any, addr.getOsSockLen()); + try testing.expectEqual(2, try ring.submit()); + // Get listener accepted socket + var accept_fd: posix.socket_t = 0; + for (0..2) |_| { + const cqe = try ring.copy_cqe(); + try testing.expectEqual(posix.E.SUCCESS, cqe.err()); + if 
(cqe.user_data == 7) { + accept_fd = @intCast(cqe.res); + } else { + try testing.expectEqual(8, cqe.user_data); + } + } + try testing.expect(accept_fd > 2 and accept_fd != listen_fd and accept_fd != connect_fd); + + // Communicate + try testSendRecv(&ring, connect_fd, accept_fd); + try testSendRecv(&ring, accept_fd, connect_fd); + + // Shutdown and close all sockets + for ([_]posix.socket_t{ connect_fd, accept_fd, listen_fd }) |fd| { + var sqe = try ring.shutdown(9, fd, posix.SHUT.RDWR); + sqe.flags |= linux.IOSQE_IO_LINK; + _ = try ring.close(10, fd); + try testing.expectEqual(2, try ring.submit()); + for (0..2) |i| { + const cqe = try ring.copy_cqe(); + try testing.expectEqual(posix.E.SUCCESS, cqe.err()); + try testing.expectEqual(9 + i, cqe.user_data); + } + } +} + +fn testSendRecv(ring: *IoUring, send_fd: posix.socket_t, recv_fd: posix.socket_t) !void { + const buffer_send = "0123456789abcdf" ** 10; + var buffer_recv: [buffer_send.len * 2]u8 = undefined; + + // 2 sends + var sqe = try ring.send(1, send_fd, buffer_send, linux.MSG.WAITALL); + sqe.flags |= linux.IOSQE_IO_LINK; + _ = try ring.send(2, send_fd, buffer_send, linux.MSG.WAITALL); + try testing.expectEqual(2, try ring.submit()); + for (0..2) |i| { + const cqe = try ring.copy_cqe(); + try testing.expectEqual(1 + i, cqe.user_data); + try testing.expectEqual(posix.E.SUCCESS, cqe.err()); + try testing.expectEqual(buffer_send.len, @as(usize, @intCast(cqe.res))); + } + + // receive + var recv_len: usize = 0; + while (recv_len < buffer_send.len * 2) { + _ = try ring.recv(3, recv_fd, .{ .buffer = buffer_recv[recv_len..] 
}, 0); + try testing.expectEqual(1, try ring.submit()); + const cqe = try ring.copy_cqe(); + try testing.expectEqual(3, cqe.user_data); + try testing.expectEqual(posix.E.SUCCESS, cqe.err()); + recv_len += @intCast(cqe.res); + } + + // inspect recv buffer + try testing.expectEqualSlices(u8, buffer_send, buffer_recv[0..buffer_send.len]); + try testing.expectEqualSlices(u8, buffer_send, buffer_recv[buffer_send.len..]); +} diff --git a/lib/std/os/linux/io_uring_sqe.zig b/lib/std/os/linux/io_uring_sqe.zig index 19dae3e8fa..0b7ed519cd 100644 --- a/lib/std/os/linux/io_uring_sqe.zig +++ b/lib/std/os/linux/io_uring_sqe.zig @@ -619,4 +619,51 @@ pub const io_uring_sqe = extern struct { sqe.rw_flags = flags; sqe.splice_fd_in = @bitCast(options); } + + pub fn prep_bind( + sqe: *linux.io_uring_sqe, + fd: linux.fd_t, + addr: *const linux.sockaddr, + addrlen: linux.socklen_t, + flags: u32, + ) void { + sqe.prep_rw(.BIND, fd, @intFromPtr(addr), 0, addrlen); + sqe.rw_flags = flags; + } + + pub fn prep_listen( + sqe: *linux.io_uring_sqe, + fd: linux.fd_t, + backlog: usize, + flags: u32, + ) void { + sqe.prep_rw(.LISTEN, fd, 0, backlog, 0); + sqe.rw_flags = flags; + } + + pub fn prep_cmd_sock( + sqe: *linux.io_uring_sqe, + cmd_op: linux.IO_URING_SOCKET_OP, + fd: linux.fd_t, + level: u32, + optname: u32, + optval: u64, + optlen: u32, + ) void { + sqe.prep_rw(.URING_CMD, fd, 0, 0, 0); + // off is overloaded with cmd_op, https://github.com/axboe/liburing/blob/e1003e496e66f9b0ae06674869795edf772d5500/src/include/liburing/io_uring.h#L39 + sqe.off = @intFromEnum(cmd_op); + // addr is overloaded, https://github.com/axboe/liburing/blob/e1003e496e66f9b0ae06674869795edf772d5500/src/include/liburing/io_uring.h#L46 + sqe.addr = @bitCast(packed struct { + level: u32, + optname: u32, + }{ + .level = level, + .optname = optname, + }); + // splice_fd_in is overloaded, u32 -> i32 + sqe.splice_fd_in = @bitCast(optlen); + // addr3 is overloaded,
https://github.com/axboe/liburing/blob/e1003e496e66f9b0ae06674869795edf772d5500/src/include/liburing/io_uring.h#L102 + sqe.addr3 = optval; + } };