mirror of
https://github.com/ziglang/zig.git
synced 2026-01-20 14:25:16 +00:00
io_uring: add bind and listen
This commit is contained in:
parent
79460d4a3e
commit
2da8eff9d6
@ -5806,6 +5806,9 @@ pub const IORING_OP = enum(u8) {
|
||||
FUTEX_WAITV,
|
||||
FIXED_FD_INSTALL,
|
||||
FTRUNCATE,
|
||||
BIND,
|
||||
LISTEN,
|
||||
RECV_ZC,
|
||||
|
||||
_,
|
||||
};
|
||||
@ -6190,6 +6193,13 @@ pub const IORING_RESTRICTION = enum(u16) {
|
||||
_,
|
||||
};
|
||||
|
||||
pub const IO_URING_SOCKET_OP = enum(u16) {
|
||||
SIOCIN = 0,
|
||||
SIOCOUTQ = 1,
|
||||
GETSOCKOPT = 2,
|
||||
SETSOCKOPT = 3,
|
||||
};
|
||||
|
||||
pub const io_uring_buf = extern struct {
|
||||
addr: u64,
|
||||
len: u32,
|
||||
|
||||
@ -1356,6 +1356,55 @@ pub fn socket_direct_alloc(
|
||||
return sqe;
|
||||
}
|
||||
|
||||
/// Queues (but does not submit) an SQE to perform an `bind(2)` on a socket.
|
||||
/// Returns a pointer to the SQE.
|
||||
/// Available since 6.11
|
||||
pub fn bind(
|
||||
self: *IoUring,
|
||||
user_data: u64,
|
||||
fd: posix.fd_t,
|
||||
addr: *const posix.sockaddr,
|
||||
addrlen: posix.socklen_t,
|
||||
flags: u32,
|
||||
) !*linux.io_uring_sqe {
|
||||
const sqe = try self.get_sqe();
|
||||
sqe.prep_bind(fd, addr, addrlen, flags);
|
||||
sqe.user_data = user_data;
|
||||
return sqe;
|
||||
}
|
||||
|
||||
/// Queues (but does not submit) an SQE to perform an `listen(2)` on a socket.
|
||||
/// Returns a pointer to the SQE.
|
||||
/// Available since 6.11
|
||||
pub fn listen(
|
||||
self: *IoUring,
|
||||
user_data: u64,
|
||||
fd: posix.fd_t,
|
||||
backlog: usize,
|
||||
flags: u32,
|
||||
) !*linux.io_uring_sqe {
|
||||
const sqe = try self.get_sqe();
|
||||
sqe.prep_listen(fd, backlog, flags);
|
||||
sqe.user_data = user_data;
|
||||
return sqe;
|
||||
}
|
||||
|
||||
fn cmd_sock(
|
||||
self: *IoUring,
|
||||
user_data: u64,
|
||||
cmd_op: linux.IO_URING_SOCKET_OP,
|
||||
fd: linux.fd_t,
|
||||
level: u32,
|
||||
optname: u32,
|
||||
optval: u64,
|
||||
optlen: u32,
|
||||
) !*linux.io_uring_sqe {
|
||||
const sqe = try self.get_sqe();
|
||||
sqe.prep_cmd_sock(cmd_op, fd, level, optname, optval, optlen);
|
||||
sqe.user_data = user_data;
|
||||
return sqe;
|
||||
}
|
||||
|
||||
pub const SubmissionQueue = struct {
|
||||
head: *u32,
|
||||
tail: *u32,
|
||||
@ -4305,3 +4354,140 @@ test "copy_cqes with wrapping sq.cqes buffer" {
|
||||
try testing.expectEqual(2 + 4 * i, ring.cq.head.*);
|
||||
}
|
||||
}
|
||||
|
||||
test "bind" {
|
||||
try skipKernelLessThan(.{ .major = 6, .minor = 11, .patch = 0 });
|
||||
|
||||
var ring = IoUring.init(4, 0) catch |err| switch (err) {
|
||||
error.SystemOutdated => return error.SkipZigTest,
|
||||
error.PermissionDenied => return error.SkipZigTest,
|
||||
else => return err,
|
||||
};
|
||||
defer ring.deinit();
|
||||
|
||||
var addr = net.Address.initIp4([4]u8{ 127, 0, 0, 1 }, 0);
|
||||
const proto: u32 = if (addr.any.family == linux.AF.UNIX) 0 else linux.IPPROTO.TCP;
|
||||
|
||||
const listen_fd = brk: {
|
||||
// Create socket
|
||||
_ = try ring.socket(1, addr.any.family, linux.SOCK.STREAM | linux.SOCK.CLOEXEC, proto, 0);
|
||||
try testing.expectEqual(1, try ring.submit());
|
||||
var cqe = try ring.copy_cqe();
|
||||
try testing.expectEqual(1, cqe.user_data);
|
||||
try testing.expectEqual(posix.E.SUCCESS, cqe.err());
|
||||
const listen_fd: posix.fd_t = @intCast(cqe.res);
|
||||
try testing.expect(listen_fd > 2);
|
||||
|
||||
// Prepare: set socket option * 2, bind, listen
|
||||
var sock_opt: u32 = 1;
|
||||
var sqe = try ring.cmd_sock(2, .SETSOCKOPT, listen_fd, linux.SOL.SOCKET, linux.SO.REUSEADDR, @intFromPtr(&sock_opt), @sizeOf(u32));
|
||||
sqe.flags |= linux.IOSQE_IO_LINK;
|
||||
sqe = try ring.cmd_sock(3, .SETSOCKOPT, listen_fd, linux.SOL.SOCKET, linux.SO.REUSEPORT, @intFromPtr(&sock_opt), @sizeOf(u32));
|
||||
sqe.flags |= linux.IOSQE_IO_LINK;
|
||||
sqe = try ring.bind(4, listen_fd, &addr.any, addr.getOsSockLen(), 0);
|
||||
sqe.flags |= linux.IOSQE_IO_LINK;
|
||||
_ = try ring.listen(5, listen_fd, 1, 0);
|
||||
// Submit 4 operations
|
||||
try testing.expectEqual(4, try ring.submit());
|
||||
// Expect all to succeed
|
||||
for (2..6) |user_data| {
|
||||
cqe = try ring.copy_cqe();
|
||||
try testing.expectEqual(user_data, cqe.user_data);
|
||||
try testing.expectEqual(posix.E.SUCCESS, cqe.err());
|
||||
}
|
||||
|
||||
// Check that socket option is set
|
||||
sock_opt = 0xff;
|
||||
_ = try ring.cmd_sock(5, .GETSOCKOPT, listen_fd, linux.SOL.SOCKET, linux.SO.REUSEADDR, @intFromPtr(&sock_opt), @sizeOf(u32));
|
||||
try testing.expectEqual(1, try ring.submit());
|
||||
cqe = try ring.copy_cqe();
|
||||
try testing.expectEqual(5, cqe.user_data);
|
||||
try testing.expectEqual(posix.E.SUCCESS, cqe.err());
|
||||
try testing.expectEqual(1, sock_opt);
|
||||
|
||||
// Read system assigned port into addr
|
||||
var addr_len: posix.socklen_t = addr.getOsSockLen();
|
||||
try posix.getsockname(listen_fd, &addr.any, &addr_len);
|
||||
|
||||
break :brk listen_fd;
|
||||
};
|
||||
|
||||
const connect_fd = brk: {
|
||||
// Create connect socket
|
||||
_ = try ring.socket(6, addr.any.family, linux.SOCK.STREAM | linux.SOCK.CLOEXEC, proto, 0);
|
||||
try testing.expectEqual(1, try ring.submit());
|
||||
const cqe = try ring.copy_cqe();
|
||||
try testing.expectEqual(6, cqe.user_data);
|
||||
try testing.expectEqual(posix.E.SUCCESS, cqe.err());
|
||||
// Get connect socket fd
|
||||
const connect_fd: posix.fd_t = @intCast(cqe.res);
|
||||
try testing.expect(connect_fd > 2 and connect_fd != listen_fd);
|
||||
break :brk connect_fd;
|
||||
};
|
||||
|
||||
// Prepare accept/connect operations
|
||||
_ = try ring.accept(7, listen_fd, null, null, 0);
|
||||
_ = try ring.connect(8, connect_fd, &addr.any, addr.getOsSockLen());
|
||||
try testing.expectEqual(2, try ring.submit());
|
||||
// Get listener accepted socket
|
||||
var accept_fd: posix.socket_t = 0;
|
||||
for (0..2) |_| {
|
||||
const cqe = try ring.copy_cqe();
|
||||
try testing.expectEqual(posix.E.SUCCESS, cqe.err());
|
||||
if (cqe.user_data == 7) {
|
||||
accept_fd = @intCast(cqe.res);
|
||||
} else {
|
||||
try testing.expectEqual(8, cqe.user_data);
|
||||
}
|
||||
}
|
||||
try testing.expect(accept_fd > 2 and accept_fd != listen_fd and accept_fd != connect_fd);
|
||||
|
||||
// Communicate
|
||||
try testSendRecv(&ring, connect_fd, accept_fd);
|
||||
try testSendRecv(&ring, accept_fd, connect_fd);
|
||||
|
||||
// Shutdown and close all sockets
|
||||
for ([_]posix.socket_t{ connect_fd, accept_fd, listen_fd }) |fd| {
|
||||
var sqe = try ring.shutdown(9, fd, posix.SHUT.RDWR);
|
||||
sqe.flags |= linux.IOSQE_IO_LINK;
|
||||
_ = try ring.close(10, fd);
|
||||
try testing.expectEqual(2, try ring.submit());
|
||||
for (0..2) |i| {
|
||||
const cqe = try ring.copy_cqe();
|
||||
try testing.expectEqual(posix.E.SUCCESS, cqe.err());
|
||||
try testing.expectEqual(9 + i, cqe.user_data);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn testSendRecv(ring: *IoUring, send_fd: posix.socket_t, recv_fd: posix.socket_t) !void {
|
||||
const buffer_send = "0123456789abcdf" ** 10;
|
||||
var buffer_recv: [buffer_send.len * 2]u8 = undefined;
|
||||
|
||||
// 2 sends
|
||||
var sqe = try ring.send(1, send_fd, buffer_send, linux.MSG.WAITALL);
|
||||
sqe.flags |= linux.IOSQE_IO_LINK;
|
||||
_ = try ring.send(2, send_fd, buffer_send, linux.MSG.WAITALL);
|
||||
try testing.expectEqual(2, try ring.submit());
|
||||
for (0..2) |i| {
|
||||
const cqe = try ring.copy_cqe();
|
||||
try testing.expectEqual(1 + i, cqe.user_data);
|
||||
try testing.expectEqual(posix.E.SUCCESS, cqe.err());
|
||||
try testing.expectEqual(buffer_send.len, @as(usize, @intCast(cqe.res)));
|
||||
}
|
||||
|
||||
// receive
|
||||
var recv_len: usize = 0;
|
||||
while (recv_len < buffer_send.len * 2) {
|
||||
_ = try ring.recv(3, recv_fd, .{ .buffer = buffer_recv[recv_len..] }, 0);
|
||||
try testing.expectEqual(1, try ring.submit());
|
||||
const cqe = try ring.copy_cqe();
|
||||
try testing.expectEqual(3, cqe.user_data);
|
||||
try testing.expectEqual(posix.E.SUCCESS, cqe.err());
|
||||
recv_len += @intCast(cqe.res);
|
||||
}
|
||||
|
||||
// inspect recv buffer
|
||||
try testing.expectEqualSlices(u8, buffer_send, buffer_recv[0..buffer_send.len]);
|
||||
try testing.expectEqualSlices(u8, buffer_send, buffer_recv[buffer_send.len..]);
|
||||
}
|
||||
|
||||
@ -619,4 +619,51 @@ pub const io_uring_sqe = extern struct {
|
||||
sqe.rw_flags = flags;
|
||||
sqe.splice_fd_in = @bitCast(options);
|
||||
}
|
||||
|
||||
pub fn prep_bind(
|
||||
sqe: *linux.io_uring_sqe,
|
||||
fd: linux.fd_t,
|
||||
addr: *const linux.sockaddr,
|
||||
addrlen: linux.socklen_t,
|
||||
flags: u32,
|
||||
) void {
|
||||
sqe.prep_rw(.BIND, fd, @intFromPtr(addr), 0, addrlen);
|
||||
sqe.rw_flags = flags;
|
||||
}
|
||||
|
||||
pub fn prep_listen(
|
||||
sqe: *linux.io_uring_sqe,
|
||||
fd: linux.fd_t,
|
||||
backlog: usize,
|
||||
flags: u32,
|
||||
) void {
|
||||
sqe.prep_rw(.LISTEN, fd, 0, backlog, 0);
|
||||
sqe.rw_flags = flags;
|
||||
}
|
||||
|
||||
pub fn prep_cmd_sock(
|
||||
sqe: *linux.io_uring_sqe,
|
||||
cmd_op: linux.IO_URING_SOCKET_OP,
|
||||
fd: linux.fd_t,
|
||||
level: u32,
|
||||
optname: u32,
|
||||
optval: u64,
|
||||
optlen: u32,
|
||||
) void {
|
||||
sqe.prep_rw(.URING_CMD, fd, 0, 0, 0);
|
||||
// off is overloaded with cmd_op, https://github.com/axboe/liburing/blob/e1003e496e66f9b0ae06674869795edf772d5500/src/include/liburing/io_uring.h#L39
|
||||
sqe.off = @intFromEnum(cmd_op);
|
||||
// addr is overloaded, https://github.com/axboe/liburing/blob/e1003e496e66f9b0ae06674869795edf772d5500/src/include/liburing/io_uring.h#L46
|
||||
sqe.addr = @bitCast(packed struct {
|
||||
level: u32,
|
||||
optname: u32,
|
||||
}{
|
||||
.level = level,
|
||||
.optname = optname,
|
||||
});
|
||||
// splice_fd_in if overloaded u32 -> i32
|
||||
sqe.splice_fd_in = @bitCast(optlen);
|
||||
// addr3 is overloaded, https://github.com/axboe/liburing/blob/e1003e496e66f9b0ae06674869795edf772d5500/src/include/liburing/io_uring.h#L102
|
||||
sqe.addr3 = optval;
|
||||
}
|
||||
};
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user