x/io, x/os: async i/o reactor, cross-platform socket syscalls and bits

Cross-platform versions of msghdr, sendmsg, recvmsg, linger, and iovec
were provided based on findings from glibc, musl, and Microsoft's
documentation.

Implemented initial Reactor interface for epoll (linux) which wraps
around I/O reactor subsystems such as epoll, kqueue, select, etc. across
different platforms. The Reactor interface allows for driving async I/O
in Zig applications.

A test was added for the Reactor interface to drive a TCP
client/listener socket pair.

A greatest-common-subset of possible socket initialization flags (close
socket on exec syscalls, initialize socket to be non-blocking) were
implemented.

A test was added for using sendmsg/recvmsg syscalls across different
platforms for a TCP client/listener socket pair.
This commit is contained in:
lithdew 2021-05-12 22:43:34 +09:00 committed by Kenta Iwasaki
parent d496400cff
commit 3600508fe1
21 changed files with 548 additions and 451 deletions

View File

@ -4994,7 +4994,7 @@ pub fn sendmsg(
/// The file descriptor of the sending socket.
sockfd: socket_t,
/// Message header and iovecs
msg: msghdr_const,
msg: std.x.os.Socket.Message,
flags: u32,
) SendMsgError!usize {
while (true) {

View File

@ -768,15 +768,6 @@ pub const dl_phdr_info = extern struct {
dlpi_phdr: [*]std.elf.Phdr,
dlpi_phnum: u16,
};
pub const msghdr = extern struct {
msg_name: ?*c_void,
msg_namelen: socklen_t,
msg_iov: [*c]iovec,
msg_iovlen: c_int,
msg_control: ?*c_void,
msg_controllen: socklen_t,
msg_flags: c_int,
};
pub const cmsghdr = extern struct {
cmsg_len: socklen_t,
cmsg_level: c_int,

View File

@ -82,52 +82,6 @@ pub const Flock = extern struct {
__unused: [4]u8,
};
pub const msghdr = extern struct {
/// optional address
msg_name: ?*sockaddr,
/// size of address
msg_namelen: socklen_t,
/// scatter/gather array
msg_iov: [*]iovec,
/// # elements in msg_iov
msg_iovlen: i32,
/// ancillary data
msg_control: ?*c_void,
/// ancillary data buffer len
msg_controllen: socklen_t,
/// flags on received message
msg_flags: i32,
};
pub const msghdr_const = extern struct {
/// optional address
msg_name: ?*const sockaddr,
/// size of address
msg_namelen: socklen_t,
/// scatter/gather array
msg_iov: [*]iovec_const,
/// # elements in msg_iov
msg_iovlen: i32,
/// ancillary data
msg_control: ?*c_void,
/// ancillary data buffer len
msg_controllen: socklen_t,
/// flags on received message
msg_flags: i32,
};
pub const libc_stat = extern struct {
dev: dev_t,
ino: ino_t,

View File

@ -73,52 +73,6 @@ pub const Flock = extern struct {
__unused: [4]u8,
};
pub const msghdr = extern struct {
/// optional address
msg_name: ?*sockaddr,
/// size of address
msg_namelen: socklen_t,
/// scatter/gather array
msg_iov: [*]iovec,
/// # elements in msg_iov
msg_iovlen: i32,
/// ancillary data
msg_control: ?*c_void,
/// ancillary data buffer len
msg_controllen: socklen_t,
/// flags on received message
msg_flags: i32,
};
pub const msghdr_const = extern struct {
/// optional address
msg_name: ?*const sockaddr,
/// size of address
msg_namelen: socklen_t,
/// scatter/gather array
msg_iov: [*]iovec_const,
/// # elements in msg_iov
msg_iovlen: i32,
/// ancillary data
msg_control: ?*c_void,
/// ancillary data buffer len
msg_controllen: socklen_t,
/// flags on received message
msg_flags: i32,
};
pub const off_t = i64;
pub const ino_t = u64;

View File

@ -1180,12 +1180,12 @@ pub const sockaddr_un = extern struct {
};
pub const mmsghdr = extern struct {
msg_hdr: msghdr,
msg_hdr: std.x.os.Socket.Message,
msg_len: u32,
};
pub const mmsghdr_const = extern struct {
msg_hdr: msghdr_const,
msg_hdr: std.x.os.Socket.Message,
msg_len: u32,
};

View File

@ -526,26 +526,6 @@ pub const Flock = extern struct {
__unused: [4]u8,
};
pub const msghdr = extern struct {
msg_name: ?*sockaddr,
msg_namelen: socklen_t,
msg_iov: [*]iovec,
msg_iovlen: i32,
msg_control: ?*c_void,
msg_controllen: socklen_t,
msg_flags: i32,
};
pub const msghdr_const = extern struct {
msg_name: ?*const sockaddr,
msg_namelen: socklen_t,
msg_iov: [*]iovec_const,
msg_iovlen: i32,
msg_control: ?*c_void,
msg_controllen: socklen_t,
msg_flags: i32,
};
pub const blksize_t = i32;
pub const nlink_t = u32;
pub const time_t = isize;

View File

@ -395,30 +395,6 @@ pub const Flock = extern struct {
__unused: [4]u8,
};
pub const msghdr = extern struct {
msg_name: ?*sockaddr,
msg_namelen: socklen_t,
msg_iov: [*]iovec,
msg_iovlen: i32,
__pad1: i32 = 0,
msg_control: ?*c_void,
msg_controllen: socklen_t,
__pad2: socklen_t = 0,
msg_flags: i32,
};
pub const msghdr_const = extern struct {
msg_name: ?*const sockaddr,
msg_namelen: socklen_t,
msg_iov: [*]iovec_const,
msg_iovlen: i32,
__pad1: i32 = 0,
msg_control: ?*c_void,
msg_controllen: socklen_t,
__pad2: socklen_t = 0,
msg_flags: i32,
};
pub const blksize_t = i32;
pub const nlink_t = u32;
pub const time_t = isize;

View File

@ -523,26 +523,6 @@ pub const Flock = extern struct {
l_pid: pid_t,
};
pub const msghdr = extern struct {
msg_name: ?*sockaddr,
msg_namelen: socklen_t,
msg_iov: [*]iovec,
msg_iovlen: i32,
msg_control: ?*c_void,
msg_controllen: socklen_t,
msg_flags: i32,
};
pub const msghdr_const = extern struct {
msg_name: ?*const sockaddr,
msg_namelen: socklen_t,
msg_iov: [*]iovec_const,
msg_iovlen: i32,
msg_control: ?*c_void,
msg_controllen: socklen_t,
msg_flags: i32,
};
pub const blksize_t = i32;
pub const nlink_t = u32;
pub const time_t = isize;

View File

@ -515,26 +515,6 @@ pub const Flock = extern struct {
l_pid: pid_t,
};
pub const msghdr = extern struct {
msg_name: ?*sockaddr,
msg_namelen: socklen_t,
msg_iov: [*]iovec,
msg_iovlen: usize,
msg_control: ?*c_void,
msg_controllen: socklen_t,
msg_flags: i32,
};
pub const msghdr_const = extern struct {
msg_name: ?*const sockaddr,
msg_namelen: socklen_t,
msg_iov: [*]iovec_const,
msg_iovlen: usize,
msg_control: ?*c_void,
msg_controllen: socklen_t,
msg_flags: i32,
};
pub const blksize_t = i32;
pub const nlink_t = u32;
pub const time_t = isize;

View File

@ -491,26 +491,6 @@ pub const Flock = extern struct {
__unused: [4]u8,
};
pub const msghdr = extern struct {
msg_name: ?*sockaddr,
msg_namelen: socklen_t,
msg_iov: [*]iovec,
msg_iovlen: usize,
msg_control: ?*c_void,
msg_controllen: usize,
msg_flags: i32,
};
pub const msghdr_const = extern struct {
msg_name: ?*const sockaddr,
msg_namelen: socklen_t,
msg_iov: [*]iovec_const,
msg_iovlen: usize,
msg_control: ?*c_void,
msg_controllen: usize,
msg_flags: i32,
};
pub const blksize_t = i64;
pub const nlink_t = u64;
pub const time_t = i64;

View File

@ -465,26 +465,6 @@ pub const Flock = extern struct {
l_pid: pid_t,
};
pub const msghdr = extern struct {
msg_name: ?*sockaddr,
msg_namelen: socklen_t,
msg_iov: [*]iovec,
msg_iovlen: u64,
msg_control: ?*c_void,
msg_controllen: u64,
msg_flags: i32,
};
pub const msghdr_const = extern struct {
msg_name: ?*const sockaddr,
msg_namelen: socklen_t,
msg_iov: [*]iovec_const,
msg_iovlen: u64,
msg_control: ?*c_void,
msg_controllen: u64,
msg_flags: i32,
};
pub const off_t = i64;
pub const ino_t = u64;
pub const mode_t = u32;

View File

@ -489,30 +489,6 @@ pub const Flock = extern struct {
l_pid: pid_t,
};
pub const msghdr = extern struct {
msg_name: ?*sockaddr,
msg_namelen: socklen_t,
msg_iov: [*]iovec,
msg_iovlen: i32,
__pad1: i32 = 0,
msg_control: ?*c_void,
msg_controllen: socklen_t,
__pad2: socklen_t = 0,
msg_flags: i32,
};
pub const msghdr_const = extern struct {
msg_name: ?*const sockaddr,
msg_namelen: socklen_t,
msg_iov: [*]iovec_const,
msg_iovlen: i32,
__pad1: i32 = 0,
msg_control: ?*c_void,
msg_controllen: socklen_t,
__pad2: socklen_t = 0,
msg_flags: i32,
};
pub const off_t = i64;
pub const ino_t = u64;
pub const dev_t = u64;

View File

@ -108,52 +108,6 @@ pub const EAI = enum(c_int) {
pub const EAI_MAX = 15;
pub const msghdr = extern struct {
/// optional address
msg_name: ?*sockaddr,
/// size of address
msg_namelen: socklen_t,
/// scatter/gather array
msg_iov: [*]iovec,
/// # elements in msg_iov
msg_iovlen: i32,
/// ancillary data
msg_control: ?*c_void,
/// ancillary data buffer len
msg_controllen: socklen_t,
/// flags on received message
msg_flags: i32,
};
pub const msghdr_const = extern struct {
/// optional address
msg_name: ?*const sockaddr,
/// size of address
msg_namelen: socklen_t,
/// scatter/gather array
msg_iov: [*]iovec_const,
/// # elements in msg_iov
msg_iovlen: i32,
/// ancillary data
msg_control: ?*c_void,
/// ancillary data buffer len
msg_controllen: socklen_t,
/// flags on received message
msg_flags: i32,
};
pub const libc_stat = extern struct {
dev: dev_t,
mode: mode_t,

View File

@ -124,52 +124,6 @@ pub const EAI = enum(c_int) {
pub const EAI_MAX = 15;
pub const msghdr = extern struct {
/// optional address
msg_name: ?*sockaddr,
/// size of address
msg_namelen: socklen_t,
/// scatter/gather array
msg_iov: [*]iovec,
/// # elements in msg_iov
msg_iovlen: c_uint,
/// ancillary data
msg_control: ?*c_void,
/// ancillary data buffer len
msg_controllen: socklen_t,
/// flags on received message
msg_flags: c_int,
};
pub const msghdr_const = extern struct {
/// optional address
msg_name: ?*const sockaddr,
/// size of address
msg_namelen: socklen_t,
/// scatter/gather array
msg_iov: [*]iovec_const,
/// # elements in msg_iov
msg_iovlen: c_uint,
/// ancillary data
msg_control: ?*c_void,
/// ancillary data buffer len
msg_controllen: socklen_t,
/// flags on received message
msg_flags: c_int,
};
pub const libc_stat = extern struct {
mode: mode_t,
dev: dev_t,

View File

@ -3,6 +3,7 @@
// This file is part of [zig](https://ziglang.org/), which is MIT licensed.
// The MIT license requires this copyright notice to be included in all copies
// and substantial portions of the software.
const std = @import("../../std.zig");
usingnamespace @import("bits.zig");
pub const SOCKET = *opaque {};
@ -1093,27 +1094,6 @@ pub const WSABUF = extern struct {
buf: [*]u8,
};
pub const msghdr = WSAMSG;
pub const msghdr_const = WSAMSG_const;
pub const WSAMSG_const = extern struct {
name: *const sockaddr,
namelen: INT,
lpBuffers: [*]WSABUF,
dwBufferCount: DWORD,
Control: WSABUF,
dwFlags: DWORD,
};
pub const WSAMSG = extern struct {
name: *sockaddr,
namelen: INT,
lpBuffers: [*]WSABUF,
dwBufferCount: DWORD,
Control: WSABUF,
dwFlags: DWORD,
};
pub const WSAPOLLFD = pollfd;
pub const pollfd = extern struct {
@ -1163,7 +1143,7 @@ pub const LPFN_GETACCEPTEXSOCKADDRS = fn (
pub const LPFN_WSASENDMSG = fn (
s: SOCKET,
lpMsg: *const WSAMSG_const,
lpMsg: *const std.x.os.Socket.Message,
dwFlags: u32,
lpNumberOfBytesSent: ?*u32,
lpOverlapped: ?*OVERLAPPED,
@ -1172,7 +1152,7 @@ pub const LPFN_WSASENDMSG = fn (
pub const LPFN_WSARECVMSG = fn (
s: SOCKET,
lpMsg: *WSAMSG,
lpMsg: *std.x.os.Socket.Message,
lpdwNumberOfBytesRecv: ?*u32,
lpOverlapped: ?*OVERLAPPED,
lpCompletionRoutine: ?LPWSAOVERLAPPED_COMPLETION_ROUTINE,
@ -2046,7 +2026,7 @@ pub extern "ws2_32" fn WSASend(
pub extern "ws2_32" fn WSASendMsg(
s: SOCKET,
lpMsg: *const WSAMSG_const,
lpMsg: *const std.x.os.Socket.Message,
dwFlags: u32,
lpNumberOfBytesSent: ?*u32,
lpOverlapped: ?*OVERLAPPED,
@ -2055,7 +2035,7 @@ pub extern "ws2_32" fn WSASendMsg(
pub extern "ws2_32" fn WSARecvMsg(
s: SOCKET,
lpMsg: *WSAMSG,
lpMsg: *std.x.os.Socket.Message,
lpdwNumberOfBytesRecv: ?*u32,
lpOverlapped: ?*OVERLAPPED,
lpCompletionRoutine: ?LPWSAOVERLAPPED_COMPLETION_ROUTINE,

View File

@ -8,6 +8,7 @@ const std = @import("std.zig");
pub const os = struct {
pub const Socket = @import("x/os/socket.zig").Socket;
pub usingnamespace @import("x/os/io.zig");
pub usingnamespace @import("x/os/net.zig");
};

View File

@ -18,6 +18,7 @@ const testing = std.testing;
const IPv4 = std.x.os.IPv4;
const IPv6 = std.x.os.IPv6;
const Socket = std.x.os.Socket;
const Buffer = std.x.os.Buffer;
/// A generic TCP socket abstraction.
const tcp = @This();
@ -82,12 +83,13 @@ pub const Client = struct {
};
/// Opens a new client.
pub fn init(domain: tcp.Domain, flags: u32) !Client {
pub fn init(domain: tcp.Domain, flags: std.enums.EnumFieldStruct(Socket.InitFlags, bool, false)) !Client {
return Client{
.socket = try Socket.init(
@enumToInt(domain),
os.SOCK_STREAM | flags,
os.SOCK_STREAM,
os.IPPROTO_TCP,
flags,
),
};
}
@ -143,14 +145,14 @@ pub const Client = struct {
/// Writes multiple I/O vectors with a prepended message header to the socket
/// with a set of flags specified. It returns the number of bytes that are
/// written to the socket.
pub fn writeVectorized(self: Client, msg: os.msghdr_const, flags: u32) !usize {
pub fn writeVectorized(self: Client, msg: Socket.Message, flags: u32) !usize {
return self.socket.writeVectorized(msg, flags);
}
/// Read multiple I/O vectors with a prepended message header from the socket
/// with a set of flags specified. It returns the number of bytes that were
/// read into the buffer provided.
pub fn readVectorized(self: Client, msg: *os.msghdr, flags: u32) !usize {
pub fn readVectorized(self: Client, msg: *Socket.Message, flags: u32) !usize {
return self.socket.readVectorized(msg, flags);
}
@ -244,12 +246,13 @@ pub const Listener = struct {
socket: Socket,
/// Opens a new listener.
pub fn init(domain: tcp.Domain, flags: u32) !Listener {
pub fn init(domain: tcp.Domain, flags: std.enums.EnumFieldStruct(Socket.InitFlags, bool, false)) !Listener {
return Listener{
.socket = try Socket.init(
@enumToInt(domain),
os.SOCK_STREAM | flags,
os.SOCK_STREAM,
os.IPPROTO_TCP,
flags,
),
};
}
@ -278,7 +281,7 @@ pub const Listener = struct {
/// Accept a pending incoming connection queued to the kernel backlog
/// of the listener's socket.
pub fn accept(self: Listener, flags: u32) !tcp.Connection {
pub fn accept(self: Listener, flags: std.enums.EnumFieldStruct(Socket.InitFlags, bool, false)) !tcp.Connection {
return tcp.Connection.from(try self.socket.accept(flags));
}
@ -324,7 +327,7 @@ pub const Listener = struct {
test "tcp: create client/listener pair" {
if (builtin.os.tag == .wasi) return error.SkipZigTest;
const listener = try tcp.Listener.init(.ip, os.SOCK_CLOEXEC);
const listener = try tcp.Listener.init(.ip, .{ .close_on_exec = true });
defer listener.deinit();
try listener.bind(ip.Address.initIPv4(IPv4.unspecified, 0));
@ -336,19 +339,19 @@ test "tcp: create client/listener pair" {
.ipv6 => |*ipv6| ipv6.host = IPv6.localhost,
}
const client = try tcp.Client.init(.ip, os.SOCK_CLOEXEC);
const client = try tcp.Client.init(.ip, .{ .close_on_exec = true });
defer client.deinit();
try client.connect(binded_address);
const conn = try listener.accept(os.SOCK_CLOEXEC);
const conn = try listener.accept(.{ .close_on_exec = true });
defer conn.deinit();
}
test "tcp/client: set read timeout of 1 millisecond on blocking client" {
test "tcp/client: 1ms read timeout" {
if (builtin.os.tag == .wasi) return error.SkipZigTest;
const listener = try tcp.Listener.init(.ip, os.SOCK_CLOEXEC);
const listener = try tcp.Listener.init(.ip, .{ .close_on_exec = true });
defer listener.deinit();
try listener.bind(ip.Address.initIPv4(IPv4.unspecified, 0));
@ -360,23 +363,62 @@ test "tcp/client: set read timeout of 1 millisecond on blocking client" {
.ipv6 => |*ipv6| ipv6.host = IPv6.localhost,
}
const client = try tcp.Client.init(.ip, os.SOCK_CLOEXEC);
const client = try tcp.Client.init(.ip, .{ .close_on_exec = true });
defer client.deinit();
try client.connect(binded_address);
try client.setReadTimeout(1);
const conn = try listener.accept(os.SOCK_CLOEXEC);
const conn = try listener.accept(.{ .close_on_exec = true });
defer conn.deinit();
var buf: [1]u8 = undefined;
try testing.expectError(error.WouldBlock, client.reader(0).read(&buf));
}
test "tcp/client: read and write multiple vectors" {
if (builtin.os.tag == .wasi) return error.SkipZigTest;
const listener = try tcp.Listener.init(.ip, .{ .close_on_exec = true });
defer listener.deinit();
try listener.bind(ip.Address.initIPv4(IPv4.unspecified, 0));
try listener.listen(128);
var binded_address = try listener.getLocalAddress();
switch (binded_address) {
.ipv4 => |*ipv4| ipv4.host = IPv4.localhost,
.ipv6 => |*ipv6| ipv6.host = IPv6.localhost,
}
const client = try tcp.Client.init(.ip, .{ .close_on_exec = true });
defer client.deinit();
try client.connect(binded_address);
const conn = try listener.accept(.{ .close_on_exec = true });
defer conn.deinit();
const message = "hello world";
_ = try conn.client.writeVectorized(Socket.Message.fromBuffers(&[_]Buffer{
Buffer.from(message[0 .. message.len / 2]),
Buffer.from(message[message.len / 2 ..]),
}), 0);
var buf: [message.len]u8 = undefined;
var msg = Socket.Message.fromBuffers(&[_]Buffer{
Buffer.from(buf[0 .. message.len / 2]),
Buffer.from(buf[message.len / 2 ..]),
});
_ = try client.readVectorized(&msg, 0);
try testing.expectEqualStrings(message, &buf);
}
test "tcp/listener: bind to unspecified ipv4 address" {
if (builtin.os.tag == .wasi) return error.SkipZigTest;
const listener = try tcp.Listener.init(.ip, os.SOCK_CLOEXEC);
const listener = try tcp.Listener.init(.ip, .{ .close_on_exec = true });
defer listener.deinit();
try listener.bind(ip.Address.initIPv4(IPv4.unspecified, 0));
@ -389,7 +431,7 @@ test "tcp/listener: bind to unspecified ipv4 address" {
test "tcp/listener: bind to unspecified ipv6 address" {
if (builtin.os.tag == .wasi) return error.SkipZigTest;
const listener = try tcp.Listener.init(.ipv6, os.SOCK_CLOEXEC);
const listener = try tcp.Listener.init(.ipv6, .{ .close_on_exec = true });
defer listener.deinit();
try listener.bind(ip.Address.initIPv6(IPv6.unspecified, 0));

205
lib/std/x/os/io.zig Normal file
View File

@ -0,0 +1,205 @@
const std = @import("../../std.zig");
const os = std.os;
const mem = std.mem;
const builtin = std.builtin;
const testing = std.testing;
/// POSIX `iovec`, or Windows `WSABUF`. The difference between the two are the ordering
/// of fields, alongside the length being represented as either a ULONG or a size_t.
pub const Buffer = if (builtin.os.tag == .windows)
extern struct {
len: c_ulong,
ptr: usize,
pub fn from(slice: []const u8) Buffer {
return .{ .len = @intCast(c_ulong, slice.len), .ptr = @ptrToInt(slice.ptr) };
}
pub fn into(self: Buffer) []const u8 {
return @intToPtr([*]const u8, self.ptr)[0..self.len];
}
pub fn intoMutable(self: Buffer) []u8 {
return @intToPtr([*]u8, self.ptr)[0..self.len];
}
}
else
extern struct {
ptr: usize,
len: usize,
pub fn from(slice: []const u8) Buffer {
return .{ .ptr = @ptrToInt(slice.ptr), .len = slice.len };
}
pub fn into(self: Buffer) []const u8 {
return @intToPtr([*]const u8, self.ptr)[0..self.len];
}
pub fn intoMutable(self: Buffer) []u8 {
return @intToptr([*]u8, self.ptr)[0..self.len];
}
};
pub const Reactor = struct {
pub const InitFlags = enum {
close_on_exec,
};
pub const Event = struct {
data: usize,
is_error: bool,
is_hup: bool,
is_readable: bool,
is_writable: bool,
};
pub const Interest = struct {
hup: bool = false,
oneshot: bool = false,
readable: bool = false,
writable: bool = false,
};
fd: os.fd_t,
pub fn init(flags: std.enums.EnumFieldStruct(Reactor.InitFlags, bool, false)) !Reactor {
var raw_flags: u32 = 0;
const set = std.EnumSet(Reactor.InitFlags).init(flags);
if (set.contains(.close_on_exec)) raw_flags |= os.EPOLL_CLOEXEC;
return Reactor{ .fd = try os.epoll_create1(raw_flags) };
}
pub fn deinit(self: Reactor) void {
os.close(self.fd);
}
pub fn update(self: Reactor, fd: os.fd_t, identifier: usize, interest: Reactor.Interest) !void {
var flags: u32 = 0;
flags |= if (interest.oneshot) os.EPOLLONESHOT else os.EPOLLET;
if (interest.hup) flags |= os.EPOLLRDHUP;
if (interest.readable) flags |= os.EPOLLIN;
if (interest.writable) flags |= os.EPOLLOUT;
const event = &os.epoll_event{
.events = flags,
.data = .{ .ptr = identifier },
};
os.epoll_ctl(self.fd, os.EPOLL_CTL_MOD, fd, event) catch |err| switch (err) {
error.FileDescriptorNotRegistered => try os.epoll_ctl(self.fd, os.EPOLL_CTL_ADD, fd, event),
else => return err,
};
}
pub fn poll(self: Reactor, comptime max_num_events: comptime_int, closure: anytype, timeout_milliseconds: ?u64) !void {
var events: [max_num_events]os.epoll_event = undefined;
const num_events = os.epoll_wait(self.fd, &events, if (timeout_milliseconds) |ms| @intCast(i32, ms) else -1);
for (events[0..num_events]) |ev| {
const is_error = ev.events & os.EPOLLERR != 0;
const is_hup = ev.events & (os.EPOLLHUP | os.EPOLLRDHUP) != 0;
const is_readable = ev.events & os.EPOLLIN != 0;
const is_writable = ev.events & os.EPOLLOUT != 0;
try closure.call(Reactor.Event{
.data = ev.data.ptr,
.is_error = is_error,
.is_hup = is_hup,
.is_readable = is_readable,
.is_writable = is_writable,
});
}
}
};
test "reactor/linux: drive async tcp client/listener pair" {
if (builtin.os.tag != .linux) return error.SkipZigTest;
const ip = std.x.net.ip;
const tcp = std.x.net.tcp;
const IPv4 = std.x.os.IPv4;
const IPv6 = std.x.os.IPv6;
const Socket = std.x.os.Socket;
const reactor = try Reactor.init(.{ .close_on_exec = true });
defer reactor.deinit();
const listener = try tcp.Listener.init(.ip, .{
.close_on_exec = true,
.nonblocking = true,
});
defer listener.deinit();
try reactor.update(listener.socket.fd, 0, .{ .readable = true });
try reactor.poll(1, struct {
fn call(event: Reactor.Event) !void {
try testing.expectEqual(Reactor.Event{
.data = 0,
.is_error = false,
.is_hup = true,
.is_readable = false,
.is_writable = false,
}, event);
}
}, null);
try listener.bind(ip.Address.initIPv4(IPv4.unspecified, 0));
try listener.listen(128);
var binded_address = try listener.getLocalAddress();
switch (binded_address) {
.ipv4 => |*ipv4| ipv4.host = IPv4.localhost,
.ipv6 => |*ipv6| ipv6.host = IPv6.localhost,
}
const client = try tcp.Client.init(.ip, .{
.close_on_exec = true,
.nonblocking = true,
});
defer client.deinit();
try reactor.update(client.socket.fd, 1, .{ .readable = true, .writable = true });
try reactor.poll(1, struct {
fn call(event: Reactor.Event) !void {
try testing.expectEqual(Reactor.Event{
.data = 1,
.is_error = false,
.is_hup = true,
.is_readable = false,
.is_writable = true,
}, event);
}
}, null);
client.connect(binded_address) catch |err| switch (err) {
error.WouldBlock => {},
else => return err,
};
try reactor.poll(1, struct {
fn call(event: Reactor.Event) !void {
try testing.expectEqual(Reactor.Event{
.data = 1,
.is_error = false,
.is_hup = false,
.is_readable = false,
.is_writable = true,
}, event);
}
}, null);
try reactor.poll(1, struct {
fn call(event: Reactor.Event) !void {
try testing.expectEqual(Reactor.Event{
.data = 0,
.is_error = false,
.is_hup = false,
.is_readable = true,
.is_writable = false,
}, event);
}
}, null);
}

View File

@ -11,8 +11,13 @@ const os = std.os;
const fmt = std.fmt;
const mem = std.mem;
const time = std.time;
const meta = std.meta;
const builtin = std.builtin;
const Buffer = std.x.os.Buffer;
const assert = std.debug.assert;
/// A generic, cross-platform socket abstraction.
pub const Socket = struct {
/// A socket-address pair.
@ -29,6 +34,32 @@ pub const Socket = struct {
/// A generic socket address abstraction. It is safe to directly access and modify
/// the fields of a `Socket.Address`.
pub const Address = union(enum) {
pub const Native = struct {
pub const requires_prepended_length = builtin.os.getVersionRange() == .semver;
pub const Length = if (requires_prepended_length) u8 else [0]u8;
pub const Family = if (requires_prepended_length) u8 else c_ushort;
/// POSIX `sockaddr_storage`. The expected size and alignment is specified in IETF RFC 2553.
pub const Storage = extern struct {
pub const expected_size = 128;
pub const expected_alignment = 8;
pub const padding_size = expected_size -
mem.alignForward(@sizeOf(Address.Native.Length), expected_alignment) -
mem.alignForward(@sizeOf(Address.Native.Family), expected_alignment);
len: Address.Native.Length align(expected_alignment) = undefined,
family: Address.Native.Family align(expected_alignment) = undefined,
padding: [padding_size]u8 align(expected_alignment) = undefined,
comptime {
assert(@sizeOf(Storage) == Storage.expected_size);
assert(@alignOf(Storage) == Storage.expected_alignment);
}
};
};
ipv4: net.IPv4.Address,
ipv6: net.IPv6.Address,
@ -107,6 +138,174 @@ pub const Socket = struct {
}
};
/// POSIX `msghdr`. Denotes a destination address, set of buffers, control data, and flags. Ported
/// directly from musl.
pub const Message = if (builtin.os.isAtLeast(.windows, .vista) != null and builtin.os.isAtLeast(.windows, .vista).?)
extern struct {
name: usize = @ptrToInt(@as(?[*]u8, null)),
name_len: c_int = 0,
buffers: usize = undefined,
buffers_len: c_ulong = undefined,
control: Buffer = .{
.ptr = @ptrToInt(@as(?[*]u8, null)),
.len = 0,
},
flags: c_ulong = 0,
pub usingnamespace MessageMixin(Message);
}
else if (builtin.os.tag == .windows)
extern struct {
name: usize = @ptrToInt(@as(?[*]u8, null)),
name_len: c_int = 0,
buffers: usize = undefined,
buffers_len: u32 = undefined,
control: Buffer = .{
.ptr = @ptrToInt(@as(?[*]u8, null)),
.len = 0,
},
flags: u32 = 0,
pub usingnamespace MessageMixin(Message);
}
else if (@sizeOf(usize) > 4 and builtin.endian == .Big)
extern struct {
name: usize = @ptrToInt(@as(?[*]u8, null)),
name_len: c_uint = 0,
buffers: usize = undefined,
_pad_1: c_int = 0,
buffers_len: c_int = undefined,
control: usize = @ptrToInt(@as(?[*]u8, null)),
_pad_2: c_int = 0,
control_len: c_uint = 0,
flags: c_int = 0,
pub usingnamespace MessageMixin(Message);
}
else if (@sizeOf(usize) > 4 and builtin.endian == .Little)
extern struct {
name: usize = @ptrToInt(@as(?[*]u8, null)),
name_len: c_uint = 0,
buffers: usize = undefined,
buffers_len: c_int = undefined,
_pad_1: c_int = 0,
control: usize = @ptrToInt(@as(?[*]u8, null)),
control_len: c_uint = 0,
_pad_2: c_int = 0,
flags: c_int = 0,
pub usingnamespace MessageMixin(Message);
}
else
extern struct {
name: usize = @ptrToInt(@as(?[*]u8, null)),
name_len: c_uint = 0,
buffers: usize = undefined,
buffers_len: c_int,
control: usize = null,
control_len: c_uint = 0,
flags: c_int = 0,
pub usingnamespace MessageMixin(Message);
};
fn MessageMixin(comptime Self: type) type {
return struct {
pub fn fromBuffers(buffers: []const Buffer) Self {
var self: Self = .{};
self.setBuffers(buffers);
return self;
}
pub fn setName(self: *Self, name: []const u8) void {
self.name = @ptrToInt(name.ptr);
self.name_len = @intCast(meta.fieldInfo(Self, .name_len).field_type, name.len);
}
pub fn setBuffers(self: *Self, buffers: []const Buffer) void {
self.buffers = @ptrToInt(buffers.ptr);
self.buffers_len = @intCast(meta.fieldInfo(Self, .buffers_len).field_type, buffers.len);
}
pub fn setControl(self: *Self, control: []const u8) void {
if (builtin.os.tag == .windows) {
self.control = Buffer.from(control);
} else {
self.control = @ptrToInt(control.ptr);
self.control_len = @intCast(meta.fieldInfo(Self, .control_len).field_type, control.len);
}
}
pub fn setFlags(self: *Self, flags: u32) void {
self.flags = @intCast(meta.fieldInfo(Self, .flags).field_type, flags);
}
pub fn getName(self: Self) []const u8 {
return @intToPtr([*]const u8, self.name)[0..@intCast(usize, self.name_len)];
}
pub fn getBuffers(self: Self) []const Buffer {
return @intToPtr([*]const Buffer, self.buffers)[0..@intCast(usize, self.buffers_len)];
}
pub fn getControl(self: Self) []const u8 {
if (builtin.os.tag == .windows) {
return self.control.into();
} else {
return @intToPtr([*]const u8, self.control)[0..@intCast(usize, self.control_len)];
}
}
pub fn getFlags(self: Self) u32 {
return @intCast(u32, self.flags);
}
};
}
/// POSIX `linger`, denoting the linger settings of a socket.
///
/// Microsoft's documentation and glibc denote the fields to be unsigned
/// short's on Windows, whereas glibc and musl denote the fields to be
/// int's on every other platform.
pub const Linger = extern struct {
pub const Field = switch (builtin.os.tag) {
.windows => c_ushort,
else => c_int,
};
enabled: Field,
timeout_seconds: Field,
pub fn init(timeout_seconds: ?u16) Socket.Linger {
return .{
.enabled = @intCast(Socket.Linger.Field, @boolToInt(timeout_seconds != null)),
.timeout_seconds = if (timeout_seconds) |seconds| @intCast(Socket.Linger.Field, seconds) else 0,
};
}
};
/// Possible set of flags to initialize a socket with.
pub const InitFlags = enum {
// Initialize a socket to be non-blocking.
nonblocking,
// Have a socket close itself on exec syscalls.
close_on_exec,
};
/// The underlying handle of a socket.
fd: os.socket_t,

View File

@ -13,8 +13,12 @@ const time = std.time;
pub fn Mixin(comptime Socket: type) type {
return struct {
/// Open a new socket.
pub fn init(domain: u32, socket_type: u32, protocol: u32) !Socket {
return Socket{ .fd = try os.socket(domain, socket_type, protocol) };
pub fn init(domain: u32, socket_type: u32, protocol: u32, flags: std.enums.EnumFieldStruct(Socket.InitFlags, bool, false)) !Socket {
var raw_flags: u32 = socket_type;
const set = std.EnumSet(Socket.InitFlags).init(flags);
if (set.contains(.close_on_exec)) raw_flags |= os.SOCK_CLOEXEC;
if (set.contains(.nonblocking)) raw_flags |= os.SOCK_NONBLOCK;
return Socket{ .fd = try os.socket(domain, raw_flags, protocol) };
}
/// Closes the socket.
@ -44,11 +48,16 @@ pub fn Mixin(comptime Socket: type) type {
/// Accept a pending incoming connection queued to the kernel backlog
/// of the socket.
pub fn accept(self: Socket, flags: u32) !Socket.Connection {
var address: os.sockaddr_storage = undefined;
var address_len: u32 = @sizeOf(os.sockaddr_storage);
pub fn accept(self: Socket, flags: std.enums.EnumFieldStruct(Socket.InitFlags, bool, false)) !Socket.Connection {
var address: Socket.Address.Native.Storage = undefined;
var address_len: u32 = @sizeOf(Socket.Address.Native.Storage);
const socket = Socket{ .fd = try os.accept(self.fd, @ptrCast(*os.sockaddr, &address), &address_len, flags) };
var raw_flags: u32 = 0;
const set = std.EnumSet(Socket.InitFlags).init(flags);
if (set.contains(.close_on_exec)) raw_flags |= os.SOCK_CLOEXEC;
if (set.contains(.nonblocking)) raw_flags |= os.SOCK_NONBLOCK;
const socket = Socket{ .fd = try os.accept(self.fd, @ptrCast(*os.sockaddr, &address), &address_len, raw_flags) };
const socket_address = Socket.Address.fromNative(@ptrCast(*os.sockaddr, &address));
return Socket.Connection.from(socket, socket_address);
@ -69,48 +78,45 @@ pub fn Mixin(comptime Socket: type) type {
/// Writes multiple I/O vectors with a prepended message header to the socket
/// with a set of flags specified. It returns the number of bytes that are
/// written to the socket.
pub fn writeVectorized(self: Socket, msg: os.msghdr_const, flags: u32) !usize {
pub fn writeVectorized(self: Socket, msg: Socket.Message, flags: u32) !usize {
return os.sendmsg(self.fd, msg, flags);
}
/// Read multiple I/O vectors with a prepended message header from the socket
/// with a set of flags specified. It returns the number of bytes that were
/// read into the buffer provided.
pub fn readVectorized(self: Socket, msg: *os.msghdr, flags: u32) !usize {
if (comptime @hasDecl(os.system, "recvmsg")) {
while (true) {
const rc = os.system.recvmsg(self.fd, msg, flags);
return switch (os.errno(rc)) {
0 => @intCast(usize, rc),
os.EBADF => unreachable, // always a race condition
os.EFAULT => unreachable,
os.EINVAL => unreachable,
os.ENOTCONN => unreachable,
os.ENOTSOCK => unreachable,
os.EINTR => continue,
os.EAGAIN => error.WouldBlock,
os.ENOMEM => error.SystemResources,
os.ECONNREFUSED => error.ConnectionRefused,
os.ECONNRESET => error.ConnectionResetByPeer,
else => |err| os.unexpectedErrno(err),
};
}
pub fn readVectorized(self: Socket, msg: *Socket.Message, flags: u32) !usize {
while (true) {
const rc = os.system.recvmsg(self.fd, msg, flags);
return switch (os.errno(rc)) {
0 => @intCast(usize, rc),
os.EBADF => unreachable, // always a race condition
os.EFAULT => unreachable,
os.EINVAL => unreachable,
os.ENOTCONN => unreachable,
os.ENOTSOCK => unreachable,
os.EINTR => continue,
os.EAGAIN => error.WouldBlock,
os.ENOMEM => error.SystemResources,
os.ECONNREFUSED => error.ConnectionRefused,
os.ECONNRESET => error.ConnectionResetByPeer,
else => |err| os.unexpectedErrno(err),
};
}
return error.NotSupported;
}
/// Query the address that the socket is locally bounded to.
pub fn getLocalAddress(self: Socket) !Socket.Address {
var address: os.sockaddr_storage = undefined;
var address_len: u32 = @sizeOf(os.sockaddr_storage);
var address: Socket.Address.Native.Storage = undefined;
var address_len: u32 = @sizeOf(Socket.Address.Native.Storage);
try os.getsockname(self.fd, @ptrCast(*os.sockaddr, &address), &address_len);
return Socket.Address.fromNative(@ptrCast(*os.sockaddr, &address));
}
/// Query the address that the socket is connected to.
pub fn getRemoteAddress(self: Socket) !Socket.Address {
var address: os.sockaddr_storage = undefined;
var address_len: u32 = @sizeOf(os.sockaddr_storage);
var address: Socket.Address.Native.Storage = undefined;
var address_len: u32 = @sizeOf(Socket.Address.Native.Storage);
try os.getpeername(self.fd, @ptrCast(*os.sockaddr, &address), &address_len);
return Socket.Address.fromNative(@ptrCast(*os.sockaddr, &address));
}
@ -165,14 +171,7 @@ pub fn Mixin(comptime Socket: type) type {
/// seconds.
pub fn setLinger(self: Socket, timeout_seconds: ?u16) !void {
if (comptime @hasDecl(os, "SO_LINGER")) {
const settings = extern struct {
l_onoff: c_int,
l_linger: c_int,
}{
.l_onoff = @intCast(c_int, @boolToInt(timeout_seconds != null)),
.l_linger = if (timeout_seconds) |seconds| @intCast(c_int, seconds) else 0,
};
const settings = Socket.Linger.init(timeout_seconds);
return self.setOption(os.SOL_SOCKET, os.SO_LINGER, mem.asBytes(&settings));
}

View File

@ -16,27 +16,24 @@ const ws2_32 = windows.ws2_32;
pub fn Mixin(comptime Socket: type) type {
return struct {
/// Open a new socket.
pub fn init(domain: u32, socket_type: u32, protocol: u32) !Socket {
var filtered_socket_type = socket_type & ~@as(u32, os.SOCK_CLOEXEC);
var filtered_flags: u32 = ws2_32.WSA_FLAG_OVERLAPPED;
if (socket_type & os.SOCK_CLOEXEC != 0) {
filtered_flags |= ws2_32.WSA_FLAG_NO_HANDLE_INHERIT;
}
pub fn init(domain: u32, socket_type: u32, protocol: u32, flags: std.enums.EnumFieldStruct(Socket.InitFlags, bool, false)) !Socket {
var raw_flags: u32 = 0;
const set = std.EnumSet(Socket.InitFlags).init(flags);
if (set.contains(.close_on_exec)) raw_flags |= ws2_32.WSA_FLAG_NO_HANDLE_INHERIT;
const fd = ws2_32.WSASocketW(
@intCast(i32, domain),
@intCast(i32, filtered_socket_type),
@intCast(i32, socket_type),
@intCast(i32, protocol),
null,
0,
filtered_flags,
raw_flags,
);
if (fd == ws2_32.INVALID_SOCKET) {
return switch (ws2_32.WSAGetLastError()) {
.WSANOTINITIALISED => {
_ = try windows.WSAStartup(2, 2);
return Socket.init(domain, socket_type, protocol);
return Socket.init(domain, socket_type, protocol, flags);
},
.WSAEAFNOSUPPORT => error.AddressFamilyNotSupported,
.WSAEMFILE => error.ProcessFdQuotaExceeded,
@ -46,6 +43,14 @@ pub fn Mixin(comptime Socket: type) type {
};
}
if (set.contains(.nonblocking)) {
var enabled: c_ulong = 1;
const rc = ws2_32.ioctlsocket(fd, ws2_32.FIONBIO, &enabled);
if (rc == ws2_32.SOCKET_ERROR) {
return windows.unexpectedWSAError(ws2_32.WSAGetLastError());
}
}
return Socket{ .fd = fd };
}
@ -138,12 +143,12 @@ pub fn Mixin(comptime Socket: type) type {
/// Accept a pending incoming connection queued to the kernel backlog
/// of the socket.
pub fn accept(self: Socket, flags: u32) !Socket.Connection {
var address: ws2_32.sockaddr_storage = undefined;
var address_len: c_int = @sizeOf(ws2_32.sockaddr_storage);
pub fn accept(self: Socket, flags: std.enums.EnumFieldStruct(Socket.InitFlags, bool, false)) !Socket.Connection {
var address: Socket.Address.Native.Storage = undefined;
var address_len: c_int = @sizeOf(Socket.Address.Native.Storage);
const rc = ws2_32.accept(self.fd, @ptrCast(*ws2_32.sockaddr, &address), &address_len);
if (rc == ws2_32.INVALID_SOCKET) {
const fd = ws2_32.accept(self.fd, @ptrCast(*ws2_32.sockaddr, &address), &address_len);
if (fd == ws2_32.INVALID_SOCKET) {
return switch (ws2_32.WSAGetLastError()) {
.WSANOTINITIALISED => unreachable,
.WSAECONNRESET => error.ConnectionResetByPeer,
@ -158,9 +163,20 @@ pub fn Mixin(comptime Socket: type) type {
};
}
const socket = Socket.from(rc);
const socket = Socket.from(fd);
errdefer socket.deinit();
const socket_address = Socket.Address.fromNative(@ptrCast(*ws2_32.sockaddr, &address));
const set = std.EnumSet(Socket.InitFlags).init(flags);
if (set.contains(.nonblocking)) {
var enabled: c_ulong = 1;
const rc = ws2_32.ioctlsocket(fd, ws2_32.FIONBIO, &enabled);
if (rc == ws2_32.SOCKET_ERROR) {
return windows.unexpectedWSAError(ws2_32.WSAGetLastError());
}
}
return Socket.Connection.from(socket, socket_address);
}
@ -238,7 +254,7 @@ pub fn Mixin(comptime Socket: type) type {
/// Writes multiple I/O vectors with a prepended message header to the socket
/// with a set of flags specified. It returns the number of bytes that are
/// written to the socket.
pub fn writeVectorized(self: Socket, msg: ws2_32.msghdr_const, flags: u32) !usize {
pub fn writeVectorized(self: Socket, msg: Socket.Message, flags: u32) !usize {
const call = try windows.loadWinsockExtensionFunction(ws2_32.LPFN_WSASENDMSG, self.fd, ws2_32.WSAID_WSASENDMSG);
var num_bytes: u32 = undefined;
@ -275,7 +291,7 @@ pub fn Mixin(comptime Socket: type) type {
/// Read multiple I/O vectors with a prepended message header from the socket
/// with a set of flags specified. It returns the number of bytes that were
/// read into the buffer provided.
pub fn readVectorized(self: Socket, msg: *ws2_32.msghdr, flags: u32) !usize {
pub fn readVectorized(self: Socket, msg: *Socket.Message, flags: u32) !usize {
const call = try windows.loadWinsockExtensionFunction(ws2_32.LPFN_WSARECVMSG, self.fd, ws2_32.WSAID_WSARECVMSG);
var num_bytes: u32 = undefined;
@ -311,8 +327,8 @@ pub fn Mixin(comptime Socket: type) type {
/// Query the address that the socket is locally bounded to.
pub fn getLocalAddress(self: Socket) !Socket.Address {
var address: ws2_32.sockaddr_storage = undefined;
var address_len: c_int = @sizeOf(ws2_32.sockaddr_storage);
var address: Socket.Address.Native.Storage = undefined;
var address_len: c_int = @sizeOf(Socket.Address.Native.Storage);
const rc = ws2_32.getsockname(self.fd, @ptrCast(*ws2_32.sockaddr, &address), &address_len);
if (rc == ws2_32.SOCKET_ERROR) {
@ -331,8 +347,8 @@ pub fn Mixin(comptime Socket: type) type {
/// Query the address that the socket is connected to.
pub fn getRemoteAddress(self: Socket) !Socket.Address {
var address: ws2_32.sockaddr_storage = undefined;
var address_len: c_int = @sizeOf(ws2_32.sockaddr_storage);
var address: Socket.Address.Native.Storage = undefined;
var address_len: c_int = @sizeOf(Socket.Address.Native.Storage);
const rc = ws2_32.getpeername(self.fd, @ptrCast(*ws2_32.sockaddr, &address), &address_len);
if (rc == ws2_32.SOCKET_ERROR) {
@ -384,11 +400,7 @@ pub fn Mixin(comptime Socket: type) type {
/// if the host does not support the option for a socket to linger around up until a timeout specified in
/// seconds.
pub fn setLinger(self: Socket, timeout_seconds: ?u16) !void {
const settings = ws2_32.linger{
.l_onoff = @as(u16, @boolToInt(timeout_seconds != null)),
.l_linger = if (timeout_seconds) |seconds| seconds else 0,
};
const settings = Socket.Linger.init(timeout_seconds);
return self.setOption(ws2_32.SOL_SOCKET, ws2_32.SO_LINGER, mem.asBytes(&settings));
}