Merge pull request #8750 from lithdew/master

x/io, x/os: async i/o reactor, cross-platform socket syscalls and bits
This commit is contained in:
Andrew Kelley 2021-06-04 01:21:28 -04:00 committed by GitHub
commit 9c08a33b22
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 647 additions and 204 deletions

View File

@ -19,7 +19,7 @@ test "std.atomic" {
_ = @import("atomic/Atomic.zig");
}
pub fn fence(comptime ordering: Ordering) callconv(.Inline) void {
pub inline fn fence(comptime ordering: Ordering) void {
switch (ordering) {
.Acquire, .Release, .AcqRel, .SeqCst => {
@fence(ordering);
@ -30,7 +30,7 @@ pub fn fence(comptime ordering: Ordering) callconv(.Inline) void {
}
}
pub fn compilerFence(comptime ordering: Ordering) callconv(.Inline) void {
pub inline fn compilerFence(comptime ordering: Ordering) void {
switch (ordering) {
.Acquire, .Release, .AcqRel, .SeqCst => asm volatile ("" ::: "memory"),
else => @compileLog(ordering, " only applies to a given memory location"),
@ -45,7 +45,7 @@ test "fence/compilerFence" {
}
/// Signals to the processor that the caller is inside a busy-wait spin-loop.
pub fn spinLoopHint() callconv(.Inline) void {
pub inline fn spinLoopHint() void {
const hint_instruction = switch (target.cpu.arch) {
// No-op instruction that can hint to save (or share with a hardware-thread) pipelining/power resources
// https://software.intel.com/content/www/us/en/develop/articles/benefitting-power-and-performance-sleep-loops.html

View File

@ -48,38 +48,38 @@ pub fn Atomic(comptime T: type) type {
};
}
pub fn swap(self: *Self, value: T, comptime ordering: Ordering) callconv(.Inline) T {
pub inline fn swap(self: *Self, value: T, comptime ordering: Ordering) T {
return self.rmw(.Xchg, value, ordering);
}
pub fn compareAndSwap(
pub inline fn compareAndSwap(
self: *Self,
compare: T,
exchange: T,
comptime success: Ordering,
comptime failure: Ordering,
) callconv(.Inline) ?T {
) ?T {
return self.cmpxchg(true, compare, exchange, success, failure);
}
pub fn tryCompareAndSwap(
pub inline fn tryCompareAndSwap(
self: *Self,
compare: T,
exchange: T,
comptime success: Ordering,
comptime failure: Ordering,
) callconv(.Inline) ?T {
) ?T {
return self.cmpxchg(false, compare, exchange, success, failure);
}
fn cmpxchg(
inline fn cmpxchg(
self: *Self,
comptime is_strong: bool,
compare: T,
exchange: T,
comptime success: Ordering,
comptime failure: Ordering,
) callconv(.Inline) ?T {
) ?T {
if (success == .Unordered or failure == .Unordered) {
@compileError(@tagName(Ordering.Unordered) ++ " is only allowed on atomic loads and stores");
}
@ -103,12 +103,12 @@ pub fn Atomic(comptime T: type) type {
};
}
fn rmw(
inline fn rmw(
self: *Self,
comptime op: std.builtin.AtomicRmwOp,
value: T,
comptime ordering: Ordering,
) callconv(.Inline) T {
) T {
return @atomicRmw(T, &self.value, op, value, ordering);
}
@ -117,37 +117,37 @@ pub fn Atomic(comptime T: type) type {
}
pub usingnamespace exportWhen(std.meta.trait.isNumber(T), struct {
pub fn fetchAdd(self: *Self, value: T, comptime ordering: Ordering) callconv(.Inline) T {
pub inline fn fetchAdd(self: *Self, value: T, comptime ordering: Ordering) T {
return self.rmw(.Add, value, ordering);
}
pub fn fetchSub(self: *Self, value: T, comptime ordering: Ordering) callconv(.Inline) T {
pub inline fn fetchSub(self: *Self, value: T, comptime ordering: Ordering) T {
return self.rmw(.Sub, value, ordering);
}
pub fn fetchMin(self: *Self, value: T, comptime ordering: Ordering) callconv(.Inline) T {
pub inline fn fetchMin(self: *Self, value: T, comptime ordering: Ordering) T {
return self.rmw(.Min, value, ordering);
}
pub fn fetchMax(self: *Self, value: T, comptime ordering: Ordering) callconv(.Inline) T {
pub inline fn fetchMax(self: *Self, value: T, comptime ordering: Ordering) T {
return self.rmw(.Max, value, ordering);
}
});
pub usingnamespace exportWhen(std.meta.trait.isIntegral(T), struct {
pub fn fetchAnd(self: *Self, value: T, comptime ordering: Ordering) callconv(.Inline) T {
pub inline fn fetchAnd(self: *Self, value: T, comptime ordering: Ordering) T {
return self.rmw(.And, value, ordering);
}
pub fn fetchNand(self: *Self, value: T, comptime ordering: Ordering) callconv(.Inline) T {
pub inline fn fetchNand(self: *Self, value: T, comptime ordering: Ordering) T {
return self.rmw(.Nand, value, ordering);
}
pub fn fetchOr(self: *Self, value: T, comptime ordering: Ordering) callconv(.Inline) T {
pub inline fn fetchOr(self: *Self, value: T, comptime ordering: Ordering) T {
return self.rmw(.Or, value, ordering);
}
pub fn fetchXor(self: *Self, value: T, comptime ordering: Ordering) callconv(.Inline) T {
pub inline fn fetchXor(self: *Self, value: T, comptime ordering: Ordering) T {
return self.rmw(.Xor, value, ordering);
}
@ -158,24 +158,24 @@ pub fn Atomic(comptime T: type) type {
Toggle,
};
pub fn bitSet(self: *Self, bit: Bit, comptime ordering: Ordering) callconv(.Inline) u1 {
pub inline fn bitSet(self: *Self, bit: Bit, comptime ordering: Ordering) u1 {
return bitRmw(self, .Set, bit, ordering);
}
pub fn bitReset(self: *Self, bit: Bit, comptime ordering: Ordering) callconv(.Inline) u1 {
pub inline fn bitReset(self: *Self, bit: Bit, comptime ordering: Ordering) u1 {
return bitRmw(self, .Reset, bit, ordering);
}
pub fn bitToggle(self: *Self, bit: Bit, comptime ordering: Ordering) callconv(.Inline) u1 {
pub inline fn bitToggle(self: *Self, bit: Bit, comptime ordering: Ordering) u1 {
return bitRmw(self, .Toggle, bit, ordering);
}
fn bitRmw(
inline fn bitRmw(
self: *Self,
comptime op: BitRmwOp,
bit: Bit,
comptime ordering: Ordering,
) callconv(.Inline) u1 {
) u1 {
// x86 supports dedicated bitwise instructions
if (comptime target.cpu.arch.isX86() and @sizeOf(T) >= 2 and @sizeOf(T) <= 8) {
const instruction = switch (op) {

View File

@ -166,9 +166,10 @@ pub extern "c" fn sendto(
dest_addr: ?*const sockaddr,
addrlen: socklen_t,
) isize;
pub extern "c" fn sendmsg(sockfd: fd_t, msg: *const std.x.os.Socket.Message, flags: c_int) isize;
pub extern fn recv(sockfd: fd_t, arg1: ?*c_void, arg2: usize, arg3: c_int) isize;
pub extern fn recvfrom(
pub extern "c" fn recv(sockfd: fd_t, arg1: ?*c_void, arg2: usize, arg3: c_int) isize;
pub extern "c" fn recvfrom(
sockfd: fd_t,
noalias buf: *c_void,
len: usize,
@ -176,6 +177,7 @@ pub extern fn recvfrom(
noalias src_addr: ?*sockaddr,
noalias addrlen: ?*socklen_t,
) isize;
pub extern "c" fn recvmsg(sockfd: fd_t, msg: *std.x.os.Socket.Message, flags: c_int) isize;
pub usingnamespace switch (builtin.os.tag) {
.netbsd => struct {

View File

@ -2111,10 +2111,7 @@ test "parse into struct with duplicate field" {
const ballast = try testing.allocator.alloc(u64, 1);
defer testing.allocator.free(ballast);
const options_first = ParseOptions{
.allocator = testing.allocator,
.duplicate_field_behavior = .UseFirst,
};
const options_first = ParseOptions{ .allocator = testing.allocator, .duplicate_field_behavior = .UseFirst };
const options_last = ParseOptions{
.allocator = testing.allocator,

View File

@ -1171,7 +1171,7 @@ test "mem.indexOf" {
test "mem.indexOf multibyte" {
{
// make haystack and needle long enough to trigger boyer-moore-horspool algorithm
const haystack = [1]u16{0} ** 100 ++ [_]u16 { 0xbbaa, 0xccbb, 0xddcc, 0xeedd, 0xffee, 0x00ff };
const haystack = [1]u16{0} ** 100 ++ [_]u16{ 0xbbaa, 0xccbb, 0xddcc, 0xeedd, 0xffee, 0x00ff };
const needle = [_]u16{ 0xbbaa, 0xccbb, 0xddcc, 0xeedd, 0xffee };
try testing.expectEqual(indexOfPos(u16, &haystack, 0, &needle), 100);
@ -1184,7 +1184,7 @@ test "mem.indexOf multibyte" {
{
// make haystack and needle long enough to trigger boyer-moore-horspool algorithm
const haystack = [_]u16 { 0xbbaa, 0xccbb, 0xddcc, 0xeedd, 0xffee, 0x00ff } ++ [1]u16{0} ** 100;
const haystack = [_]u16{ 0xbbaa, 0xccbb, 0xddcc, 0xeedd, 0xffee, 0x00ff } ++ [1]u16{0} ** 100;
const needle = [_]u16{ 0xbbaa, 0xccbb, 0xddcc, 0xeedd, 0xffee };
try testing.expectEqual(lastIndexOf(u16, &haystack, &needle), 0);
@ -2201,7 +2201,7 @@ pub fn collapseRepeatsLen(comptime T: type, slice: []T, elem: T) usize {
/// Collapse consecutive duplicate elements into one entry.
pub fn collapseRepeats(comptime T: type, slice: []T, elem: T) []T {
return slice[0 .. collapseRepeatsLen(T, slice, elem)];
return slice[0..collapseRepeatsLen(T, slice, elem)];
}
fn testCollapseRepeats(str: []const u8, elem: u8, expected: []const u8) !void {

View File

@ -4998,7 +4998,7 @@ pub fn sendmsg(
flags: u32,
) SendMsgError!usize {
while (true) {
const rc = system.sendmsg(sockfd, &msg, flags);
const rc = system.sendmsg(sockfd, @ptrCast(*const std.x.os.Socket.Message, &msg), @intCast(c_int, flags));
if (builtin.os.tag == .windows) {
if (rc == windows.ws2_32.SOCKET_ERROR) {
switch (windows.ws2_32.WSAGetLastError()) {

View File

@ -23,13 +23,7 @@ pub const sockaddr = extern struct {
family: sa_family_t,
data: [14]u8,
};
pub const sockaddr_storage = extern struct {
len: u8,
family: sa_family_t,
__pad1: [5]u8,
__align: i64,
__pad2: [112]u8,
};
pub const sockaddr_storage = std.x.os.Socket.Address.Native.Storage;
pub const sockaddr_in = extern struct {
len: u8 = @sizeOf(sockaddr_in),
family: sa_family_t = AF_INET,

View File

@ -396,6 +396,8 @@ pub const sockaddr = extern struct {
data: [14]u8,
};
pub const sockaddr_storage = std.x.os.Socket.Address.Native.Storage;
pub const Kevent = extern struct {
ident: usize,
filter: c_short,
@ -694,14 +696,6 @@ pub const in_port_t = u16;
pub const sa_family_t = u8;
pub const socklen_t = u32;
pub const sockaddr_storage = extern struct {
ss_len: u8,
ss_family: sa_family_t,
__ss_pad1: [5]u8,
__ss_align: i64,
__ss_pad2: [112]u8,
};
pub const sockaddr_in = extern struct {
len: u8 = @sizeOf(sockaddr_in),
family: sa_family_t = AF_INET,
@ -768,6 +762,11 @@ pub const dl_phdr_info = extern struct {
dlpi_phdr: [*]std.elf.Phdr,
dlpi_phnum: u16,
};
pub const cmsghdr = extern struct {
cmsg_len: socklen_t,
cmsg_level: c_int,
cmsg_type: c_int,
};
pub const msghdr = extern struct {
msg_name: ?*c_void,
msg_namelen: socklen_t,
@ -777,11 +776,6 @@ pub const msghdr = extern struct {
msg_controllen: socklen_t,
msg_flags: c_int,
};
pub const cmsghdr = extern struct {
cmsg_len: socklen_t,
cmsg_level: c_int,
cmsg_type: c_int,
};
pub const cmsgcred = extern struct {
cmcred_pid: pid_t,
cmcred_uid: uid_t,

View File

@ -206,13 +206,7 @@ pub const sockaddr = extern struct {
data: [14]u8,
};
pub const sockaddr_storage = extern struct {
len: u8,
family: sa_family_t,
__pad1: [5]u8,
__align: i64,
__pad2: [112]u8,
};
pub const sockaddr_storage = std.x.os.Socket.Address.Native.Storage;
pub const sockaddr_in = extern struct {
len: u8 = @sizeOf(sockaddr_in),

View File

@ -239,13 +239,7 @@ pub const sockaddr = extern struct {
data: [14]u8,
};
pub const sockaddr_storage = extern struct {
len: u8,
family: sa_family_t,
__pad1: [5]u8,
__align: i64,
__pad2: [112]u8,
};
pub const sockaddr_storage = std.x.os.Socket.Address.Native.Storage;
pub const sockaddr_in = extern struct {
len: u8 = @sizeOf(sockaddr_in),

View File

@ -1149,12 +1149,7 @@ pub const sockaddr = extern struct {
data: [14]u8,
};
pub const sockaddr_storage = extern struct {
family: sa_family_t,
__pad1: [6]u8,
__align: i64,
__pad2: [112]u8,
};
pub const sockaddr_storage = std.x.os.Socket.Address.Native.Storage;
/// IPv4 socket address
pub const sockaddr_in = extern struct {

View File

@ -226,13 +226,7 @@ pub const sockaddr = extern struct {
data: [14]u8,
};
pub const sockaddr_storage = extern struct {
len: u8,
family: sa_family_t,
__pad1: [5]u8,
__align: i64,
__pad2: [112]u8,
};
pub const sockaddr_storage = std.x.os.Socket.Address.Native.Storage;
pub const sockaddr_in = extern struct {
len: u8 = @sizeOf(sockaddr_in),

View File

@ -246,13 +246,7 @@ pub const sockaddr = extern struct {
data: [14]u8,
};
pub const sockaddr_storage = extern struct {
len: u8,
family: sa_family_t,
__pad1: [5]u8,
__align: i64,
__pad2: [112]u8,
};
pub const sockaddr_storage = std.x.os.Socket.Address.Native.Storage;
pub const sockaddr_in = extern struct {
len: u8 = @sizeOf(sockaddr_in),

View File

@ -1000,11 +1000,11 @@ pub fn getsockopt(fd: i32, level: u32, optname: u32, noalias optval: [*]u8, noal
return syscall5(.getsockopt, @bitCast(usize, @as(isize, fd)), level, optname, @ptrToInt(optval), @ptrToInt(optlen));
}
pub fn sendmsg(fd: i32, msg: *const msghdr_const, flags: u32) usize {
pub fn sendmsg(fd: i32, msg: *const std.x.os.Socket.Message, flags: c_int) usize {
if (native_arch == .i386) {
return socketcall(SC_sendmsg, &[3]usize{ @bitCast(usize, @as(isize, fd)), @ptrToInt(msg), flags });
return socketcall(SC_sendmsg, &[3]usize{ @bitCast(usize, @as(isize, fd)), @ptrToInt(msg), @bitCast(usize, @as(isize, flags)) });
}
return syscall3(.sendmsg, @bitCast(usize, @as(isize, fd)), @ptrToInt(msg), flags);
return syscall3(.sendmsg, @bitCast(usize, @as(isize, fd)), @ptrToInt(msg), @bitCast(usize, @as(isize, flags)));
}
pub fn sendmmsg(fd: i32, msgvec: [*]mmsghdr_const, vlen: u32, flags: u32) usize {
@ -1054,11 +1054,11 @@ pub fn connect(fd: i32, addr: *const c_void, len: socklen_t) usize {
return syscall3(.connect, @bitCast(usize, @as(isize, fd)), @ptrToInt(addr), len);
}
pub fn recvmsg(fd: i32, msg: *msghdr, flags: u32) usize {
pub fn recvmsg(fd: i32, msg: *std.x.os.Socket.Message, flags: c_int) usize {
if (native_arch == .i386) {
return socketcall(SC_recvmsg, &[3]usize{ @bitCast(usize, @as(isize, fd)), @ptrToInt(msg), flags });
return socketcall(SC_recvmsg, &[3]usize{ @bitCast(usize, @as(isize, fd)), @ptrToInt(msg), @bitCast(usize, @as(isize, flags)) });
}
return syscall3(.recvmsg, @bitCast(usize, @as(isize, fd)), @ptrToInt(msg), flags);
return syscall3(.recvmsg, @bitCast(usize, @as(isize, fd)), @ptrToInt(msg), @bitCast(usize, @as(isize, flags)));
}
pub fn recvfrom(fd: i32, noalias buf: [*]u8, len: usize, flags: u32, noalias addr: ?*sockaddr, noalias alen: ?*socklen_t) usize {

View File

@ -1844,7 +1844,7 @@ pub fn sliceToPrefixedFileW(s: []const u8) !PathSpace {
}
fn getFullPathNameW(path: [*:0]const u16, out: []u16) !usize {
const result= kernel32.GetFullPathNameW(path, @intCast(u32, out.len), std.meta.assumeSentinel(out.ptr, 0), null);
const result = kernel32.GetFullPathNameW(path, @intCast(u32, out.len), std.meta.assumeSentinel(out.ptr, 0), null);
if (result == 0) {
switch (kernel32.GetLastError()) {
else => |err| return unexpectedError(err),

View File

@ -3,6 +3,7 @@
// This file is part of [zig](https://ziglang.org/), which is MIT licensed.
// The MIT license requires this copyright notice to be included in all copies
// and substantial portions of the software.
const std = @import("../../std.zig");
usingnamespace @import("bits.zig");
pub const SOCKET = *opaque {};
@ -1058,12 +1059,7 @@ pub const sockaddr = extern struct {
data: [14]u8,
};
pub const sockaddr_storage = extern struct {
family: ADDRESS_FAMILY,
__pad1: [6]u8,
__align: i64,
__pad2: [112]u8,
};
pub const sockaddr_storage = std.x.os.Socket.Address.Native.Storage;
/// IPv4 socket address
pub const sockaddr_in = extern struct {
@ -1163,7 +1159,7 @@ pub const LPFN_GETACCEPTEXSOCKADDRS = fn (
pub const LPFN_WSASENDMSG = fn (
s: SOCKET,
lpMsg: *const WSAMSG_const,
lpMsg: *const std.x.os.Socket.Message,
dwFlags: u32,
lpNumberOfBytesSent: ?*u32,
lpOverlapped: ?*OVERLAPPED,
@ -1172,7 +1168,7 @@ pub const LPFN_WSASENDMSG = fn (
pub const LPFN_WSARECVMSG = fn (
s: SOCKET,
lpMsg: *WSAMSG,
lpMsg: *std.x.os.Socket.Message,
lpdwNumberOfBytesRecv: ?*u32,
lpOverlapped: ?*OVERLAPPED,
lpCompletionRoutine: ?LPWSAOVERLAPPED_COMPLETION_ROUTINE,
@ -2046,7 +2042,7 @@ pub extern "ws2_32" fn WSASend(
pub extern "ws2_32" fn WSASendMsg(
s: SOCKET,
lpMsg: *const WSAMSG_const,
lpMsg: *const std.x.os.Socket.Message,
dwFlags: u32,
lpNumberOfBytesSent: ?*u32,
lpOverlapped: ?*OVERLAPPED,
@ -2055,7 +2051,7 @@ pub extern "ws2_32" fn WSASendMsg(
pub extern "ws2_32" fn WSARecvMsg(
s: SOCKET,
lpMsg: *WSAMSG,
lpMsg: *std.x.os.Socket.Message,
lpdwNumberOfBytesRecv: ?*u32,
lpOverlapped: ?*OVERLAPPED,
lpCompletionRoutine: ?LPWSAOVERLAPPED_COMPLETION_ROUTINE,

View File

@ -500,8 +500,7 @@ pub const Target = struct {
.haiku,
.windows,
=> return .gnu,
.uefi,
=> return .msvc,
.uefi => return .msvc,
.linux,
.wasi,
.emscripten,

View File

@ -8,6 +8,7 @@ const std = @import("std.zig");
pub const os = struct {
pub const Socket = @import("x/os/socket.zig").Socket;
pub usingnamespace @import("x/os/io.zig");
pub usingnamespace @import("x/os/net.zig");
};

View File

@ -12,12 +12,13 @@ const ip = std.x.net.ip;
const fmt = std.fmt;
const mem = std.mem;
const builtin = std.builtin;
const testing = std.testing;
const native_os = std.Target.current.os;
const IPv4 = std.x.os.IPv4;
const IPv6 = std.x.os.IPv6;
const Socket = std.x.os.Socket;
const Buffer = std.x.os.Buffer;
/// A generic TCP socket abstraction.
const tcp = @This();
@ -82,12 +83,13 @@ pub const Client = struct {
};
/// Opens a new client.
pub fn init(domain: tcp.Domain, flags: u32) !Client {
pub fn init(domain: tcp.Domain, flags: std.enums.EnumFieldStruct(Socket.InitFlags, bool, false)) !Client {
return Client{
.socket = try Socket.init(
@enumToInt(domain),
os.SOCK_STREAM | flags,
os.SOCK_STREAM,
os.IPPROTO_TCP,
flags,
),
};
}
@ -143,15 +145,15 @@ pub const Client = struct {
/// Writes multiple I/O vectors with a prepended message header to the socket
/// with a set of flags specified. It returns the number of bytes that are
/// written to the socket.
pub fn writeVectorized(self: Client, msg: os.msghdr_const, flags: u32) !usize {
return self.socket.writeVectorized(msg, flags);
pub fn writeMessage(self: Client, msg: Socket.Message, flags: u32) !usize {
return self.socket.writeMessage(msg, flags);
}
/// Read multiple I/O vectors with a prepended message header from the socket
/// with a set of flags specified. It returns the number of bytes that were
/// read into the buffer provided.
pub fn readVectorized(self: Client, msg: *os.msghdr, flags: u32) !usize {
return self.socket.readVectorized(msg, flags);
pub fn readMessage(self: Client, msg: *Socket.Message, flags: u32) !usize {
return self.socket.readMessage(msg, flags);
}
/// Query and return the latest cached error on the client's underlying socket.
@ -244,12 +246,13 @@ pub const Listener = struct {
socket: Socket,
/// Opens a new listener.
pub fn init(domain: tcp.Domain, flags: u32) !Listener {
pub fn init(domain: tcp.Domain, flags: std.enums.EnumFieldStruct(Socket.InitFlags, bool, false)) !Listener {
return Listener{
.socket = try Socket.init(
@enumToInt(domain),
os.SOCK_STREAM | flags,
os.SOCK_STREAM,
os.IPPROTO_TCP,
flags,
),
};
}
@ -278,7 +281,7 @@ pub const Listener = struct {
/// Accept a pending incoming connection queued to the kernel backlog
/// of the listener's socket.
pub fn accept(self: Listener, flags: u32) !tcp.Connection {
pub fn accept(self: Listener, flags: std.enums.EnumFieldStruct(Socket.InitFlags, bool, false)) !tcp.Connection {
return tcp.Connection.from(try self.socket.accept(flags));
}
@ -322,9 +325,9 @@ pub const Listener = struct {
};
test "tcp: create client/listener pair" {
if (builtin.os.tag == .wasi) return error.SkipZigTest;
if (native_os.tag == .wasi) return error.SkipZigTest;
const listener = try tcp.Listener.init(.ip, os.SOCK_CLOEXEC);
const listener = try tcp.Listener.init(.ip, .{ .close_on_exec = true });
defer listener.deinit();
try listener.bind(ip.Address.initIPv4(IPv4.unspecified, 0));
@ -336,19 +339,19 @@ test "tcp: create client/listener pair" {
.ipv6 => |*ipv6| ipv6.host = IPv6.localhost,
}
const client = try tcp.Client.init(.ip, os.SOCK_CLOEXEC);
const client = try tcp.Client.init(.ip, .{ .close_on_exec = true });
defer client.deinit();
try client.connect(binded_address);
const conn = try listener.accept(os.SOCK_CLOEXEC);
const conn = try listener.accept(.{ .close_on_exec = true });
defer conn.deinit();
}
test "tcp/client: set read timeout of 1 millisecond on blocking client" {
if (builtin.os.tag == .wasi) return error.SkipZigTest;
test "tcp/client: 1ms read timeout" {
if (native_os.tag == .wasi) return error.SkipZigTest;
const listener = try tcp.Listener.init(.ip, os.SOCK_CLOEXEC);
const listener = try tcp.Listener.init(.ip, .{ .close_on_exec = true });
defer listener.deinit();
try listener.bind(ip.Address.initIPv4(IPv4.unspecified, 0));
@ -360,23 +363,62 @@ test "tcp/client: set read timeout of 1 millisecond on blocking client" {
.ipv6 => |*ipv6| ipv6.host = IPv6.localhost,
}
const client = try tcp.Client.init(.ip, os.SOCK_CLOEXEC);
const client = try tcp.Client.init(.ip, .{ .close_on_exec = true });
defer client.deinit();
try client.connect(binded_address);
try client.setReadTimeout(1);
const conn = try listener.accept(os.SOCK_CLOEXEC);
const conn = try listener.accept(.{ .close_on_exec = true });
defer conn.deinit();
var buf: [1]u8 = undefined;
try testing.expectError(error.WouldBlock, client.reader(0).read(&buf));
}
test "tcp/listener: bind to unspecified ipv4 address" {
if (builtin.os.tag == .wasi) return error.SkipZigTest;
test "tcp/client: read and write multiple vectors" {
if (native_os.tag == .wasi) return error.SkipZigTest;
const listener = try tcp.Listener.init(.ip, os.SOCK_CLOEXEC);
const listener = try tcp.Listener.init(.ip, .{ .close_on_exec = true });
defer listener.deinit();
try listener.bind(ip.Address.initIPv4(IPv4.unspecified, 0));
try listener.listen(128);
var binded_address = try listener.getLocalAddress();
switch (binded_address) {
.ipv4 => |*ipv4| ipv4.host = IPv4.localhost,
.ipv6 => |*ipv6| ipv6.host = IPv6.localhost,
}
const client = try tcp.Client.init(.ip, .{ .close_on_exec = true });
defer client.deinit();
try client.connect(binded_address);
const conn = try listener.accept(.{ .close_on_exec = true });
defer conn.deinit();
const message = "hello world";
_ = try conn.client.writeMessage(Socket.Message.fromBuffers(&[_]Buffer{
Buffer.from(message[0 .. message.len / 2]),
Buffer.from(message[message.len / 2 ..]),
}), 0);
var buf: [message.len + 1]u8 = undefined;
var msg = Socket.Message.fromBuffers(&[_]Buffer{
Buffer.from(buf[0 .. message.len / 2]),
Buffer.from(buf[message.len / 2 ..]),
});
_ = try client.readMessage(&msg, 0);
try testing.expectEqualStrings(message, buf[0..message.len]);
}
test "tcp/listener: bind to unspecified ipv4 address" {
if (native_os.tag == .wasi) return error.SkipZigTest;
const listener = try tcp.Listener.init(.ip, .{ .close_on_exec = true });
defer listener.deinit();
try listener.bind(ip.Address.initIPv4(IPv4.unspecified, 0));
@ -387,9 +429,9 @@ test "tcp/listener: bind to unspecified ipv4 address" {
}
test "tcp/listener: bind to unspecified ipv6 address" {
if (builtin.os.tag == .wasi) return error.SkipZigTest;
if (native_os.tag == .wasi) return error.SkipZigTest;
const listener = try tcp.Listener.init(.ipv6, os.SOCK_CLOEXEC);
const listener = try tcp.Listener.init(.ipv6, .{ .close_on_exec = true });
defer listener.deinit();
try listener.bind(ip.Address.initIPv6(IPv6.unspecified, 0));

205
lib/std/x/os/io.zig Normal file
View File

@ -0,0 +1,205 @@
const std = @import("../../std.zig");
const os = std.os;
const mem = std.mem;
const testing = std.testing;
const native_os = std.Target.current.os;
/// POSIX `iovec`, or Windows `WSABUF`. The difference between the two are the ordering
/// of fields, alongside the length being represented as either a ULONG or a size_t.
pub const Buffer = if (native_os.tag == .windows)
extern struct {
len: c_ulong,
ptr: usize,
pub fn from(slice: []const u8) Buffer {
return .{ .len = @intCast(c_ulong, slice.len), .ptr = @ptrToInt(slice.ptr) };
}
pub fn into(self: Buffer) []const u8 {
return @intToPtr([*]const u8, self.ptr)[0..self.len];
}
pub fn intoMutable(self: Buffer) []u8 {
return @intToPtr([*]u8, self.ptr)[0..self.len];
}
}
else
extern struct {
ptr: usize,
len: usize,
pub fn from(slice: []const u8) Buffer {
return .{ .ptr = @ptrToInt(slice.ptr), .len = slice.len };
}
pub fn into(self: Buffer) []const u8 {
return @intToPtr([*]const u8, self.ptr)[0..self.len];
}
pub fn intoMutable(self: Buffer) []u8 {
return @intToPtr([*]u8, self.ptr)[0..self.len];
}
};
pub const Reactor = struct {
pub const InitFlags = enum {
close_on_exec,
};
pub const Event = struct {
data: usize,
is_error: bool,
is_hup: bool,
is_readable: bool,
is_writable: bool,
};
pub const Interest = struct {
hup: bool = false,
oneshot: bool = false,
readable: bool = false,
writable: bool = false,
};
fd: os.fd_t,
pub fn init(flags: std.enums.EnumFieldStruct(Reactor.InitFlags, bool, false)) !Reactor {
var raw_flags: u32 = 0;
const set = std.EnumSet(Reactor.InitFlags).init(flags);
if (set.contains(.close_on_exec)) raw_flags |= os.EPOLL_CLOEXEC;
return Reactor{ .fd = try os.epoll_create1(raw_flags) };
}
pub fn deinit(self: Reactor) void {
os.close(self.fd);
}
pub fn update(self: Reactor, fd: os.fd_t, identifier: usize, interest: Reactor.Interest) !void {
var flags: u32 = 0;
flags |= if (interest.oneshot) os.EPOLLONESHOT else os.EPOLLET;
if (interest.hup) flags |= os.EPOLLRDHUP;
if (interest.readable) flags |= os.EPOLLIN;
if (interest.writable) flags |= os.EPOLLOUT;
const event = &os.epoll_event{
.events = flags,
.data = .{ .ptr = identifier },
};
os.epoll_ctl(self.fd, os.EPOLL_CTL_MOD, fd, event) catch |err| switch (err) {
error.FileDescriptorNotRegistered => try os.epoll_ctl(self.fd, os.EPOLL_CTL_ADD, fd, event),
else => return err,
};
}
pub fn poll(self: Reactor, comptime max_num_events: comptime_int, closure: anytype, timeout_milliseconds: ?u64) !void {
var events: [max_num_events]os.epoll_event = undefined;
const num_events = os.epoll_wait(self.fd, &events, if (timeout_milliseconds) |ms| @intCast(i32, ms) else -1);
for (events[0..num_events]) |ev| {
const is_error = ev.events & os.EPOLLERR != 0;
const is_hup = ev.events & (os.EPOLLHUP | os.EPOLLRDHUP) != 0;
const is_readable = ev.events & os.EPOLLIN != 0;
const is_writable = ev.events & os.EPOLLOUT != 0;
try closure.call(Reactor.Event{
.data = ev.data.ptr,
.is_error = is_error,
.is_hup = is_hup,
.is_readable = is_readable,
.is_writable = is_writable,
});
}
}
};
test "reactor/linux: drive async tcp client/listener pair" {
if (native_os.tag != .linux) return error.SkipZigTest;
const ip = std.x.net.ip;
const tcp = std.x.net.tcp;
const IPv4 = std.x.os.IPv4;
const IPv6 = std.x.os.IPv6;
const Socket = std.x.os.Socket;
const reactor = try Reactor.init(.{ .close_on_exec = true });
defer reactor.deinit();
const listener = try tcp.Listener.init(.ip, .{
.close_on_exec = true,
.nonblocking = true,
});
defer listener.deinit();
try reactor.update(listener.socket.fd, 0, .{ .readable = true });
try reactor.poll(1, struct {
fn call(event: Reactor.Event) !void {
try testing.expectEqual(Reactor.Event{
.data = 0,
.is_error = false,
.is_hup = true,
.is_readable = false,
.is_writable = false,
}, event);
}
}, null);
try listener.bind(ip.Address.initIPv4(IPv4.unspecified, 0));
try listener.listen(128);
var binded_address = try listener.getLocalAddress();
switch (binded_address) {
.ipv4 => |*ipv4| ipv4.host = IPv4.localhost,
.ipv6 => |*ipv6| ipv6.host = IPv6.localhost,
}
const client = try tcp.Client.init(.ip, .{
.close_on_exec = true,
.nonblocking = true,
});
defer client.deinit();
try reactor.update(client.socket.fd, 1, .{ .readable = true, .writable = true });
try reactor.poll(1, struct {
fn call(event: Reactor.Event) !void {
try testing.expectEqual(Reactor.Event{
.data = 1,
.is_error = false,
.is_hup = true,
.is_readable = false,
.is_writable = true,
}, event);
}
}, null);
client.connect(binded_address) catch |err| switch (err) {
error.WouldBlock => {},
else => return err,
};
try reactor.poll(1, struct {
fn call(event: Reactor.Event) !void {
try testing.expectEqual(Reactor.Event{
.data = 1,
.is_error = false,
.is_hup = false,
.is_readable = false,
.is_writable = true,
}, event);
}
}, null);
try reactor.poll(1, struct {
fn call(event: Reactor.Event) !void {
try testing.expectEqual(Reactor.Event{
.data = 0,
.is_error = false,
.is_hup = false,
.is_readable = true,
.is_writable = false,
}, event);
}
}, null);
}

View File

@ -10,17 +10,17 @@ const os = std.os;
const fmt = std.fmt;
const mem = std.mem;
const math = std.math;
const builtin = std.builtin;
const testing = std.testing;
const native_os = std.Target.current.os;
/// Resolves a network interface name into a scope/zone ID. It returns
/// an error if either resolution fails, or if the interface name is
/// too long.
pub fn resolveScopeID(name: []const u8) !u32 {
if (comptime @hasDecl(os, "IFNAMESIZE")) {
if (@hasDecl(os, "IFNAMESIZE")) {
if (name.len >= os.IFNAMESIZE - 1) return error.NameTooLong;
if (comptime builtin.os.tag == .windows) {
if (native_os.tag == .windows) {
var interface_name: [os.IFNAMESIZE]u8 = undefined;
mem.copy(u8, &interface_name, name);
interface_name[name.len] = 0;

View File

@ -11,7 +11,13 @@ const os = std.os;
const fmt = std.fmt;
const mem = std.mem;
const time = std.time;
const builtin = std.builtin;
const meta = std.meta;
const native_os = std.Target.current.os;
const native_endian = std.Target.current.cpu.arch.endian();
const Buffer = std.x.os.Buffer;
const assert = std.debug.assert;
/// A generic, cross-platform socket abstraction.
pub const Socket = struct {
@ -29,6 +35,32 @@ pub const Socket = struct {
/// A generic socket address abstraction. It is safe to directly access and modify
/// the fields of a `Socket.Address`.
pub const Address = union(enum) {
pub const Native = struct {
pub const requires_prepended_length = native_os.getVersionRange() == .semver;
pub const Length = if (requires_prepended_length) u8 else [0]u8;
pub const Family = if (requires_prepended_length) u8 else c_ushort;
/// POSIX `sockaddr_storage`. The expected size and alignment is specified in IETF RFC 2553.
pub const Storage = extern struct {
pub const expected_size = 128;
pub const expected_alignment = 8;
pub const padding_size = expected_size -
mem.alignForward(@sizeOf(Address.Native.Length), expected_alignment) -
mem.alignForward(@sizeOf(Address.Native.Family), expected_alignment);
len: Address.Native.Length align(expected_alignment) = undefined,
family: Address.Native.Family align(expected_alignment) = undefined,
padding: [padding_size]u8 align(expected_alignment) = undefined,
comptime {
assert(@sizeOf(Storage) == Storage.expected_size);
assert(@alignOf(Storage) == Storage.expected_alignment);
}
};
};
ipv4: net.IPv4.Address,
ipv6: net.IPv6.Address,
@ -107,6 +139,174 @@ pub const Socket = struct {
}
};
/// POSIX `msghdr`. Denotes a destination address, set of buffers, control data, and flags. Ported
/// directly from musl.
pub const Message = if (native_os.isAtLeast(.windows, .vista) != null and native_os.isAtLeast(.windows, .vista).?)
extern struct {
name: usize = @ptrToInt(@as(?[*]u8, null)),
name_len: c_int = 0,
buffers: usize = undefined,
buffers_len: c_ulong = undefined,
control: Buffer = .{
.ptr = @ptrToInt(@as(?[*]u8, null)),
.len = 0,
},
flags: c_ulong = 0,
pub usingnamespace MessageMixin(Message);
}
else if (native_os.tag == .windows)
extern struct {
name: usize = @ptrToInt(@as(?[*]u8, null)),
name_len: c_int = 0,
buffers: usize = undefined,
buffers_len: u32 = undefined,
control: Buffer = .{
.ptr = @ptrToInt(@as(?[*]u8, null)),
.len = 0,
},
flags: u32 = 0,
pub usingnamespace MessageMixin(Message);
}
else if (@sizeOf(usize) > 4 and native_endian == .Big)
extern struct {
name: usize = @ptrToInt(@as(?[*]u8, null)),
name_len: c_uint = 0,
buffers: usize = undefined,
_pad_1: c_int = 0,
buffers_len: c_int = undefined,
control: usize = @ptrToInt(@as(?[*]u8, null)),
_pad_2: c_int = 0,
control_len: c_uint = 0,
flags: c_int = 0,
pub usingnamespace MessageMixin(Message);
}
else if (@sizeOf(usize) > 4 and native_endian == .Little)
extern struct {
name: usize = @ptrToInt(@as(?[*]u8, null)),
name_len: c_uint = 0,
buffers: usize = undefined,
buffers_len: c_int = undefined,
_pad_1: c_int = 0,
control: usize = @ptrToInt(@as(?[*]u8, null)),
control_len: c_uint = 0,
_pad_2: c_int = 0,
flags: c_int = 0,
pub usingnamespace MessageMixin(Message);
}
else
extern struct {
name: usize = @ptrToInt(@as(?[*]u8, null)),
name_len: c_uint = 0,
buffers: usize = undefined,
buffers_len: c_int = undefined,
control: usize = @ptrToInt(@as(?[*]u8, null)),
control_len: c_uint = 0,
flags: c_int = 0,
pub usingnamespace MessageMixin(Message);
};
fn MessageMixin(comptime Self: type) type {
return struct {
pub fn fromBuffers(buffers: []const Buffer) Self {
var self: Self = .{};
self.setBuffers(buffers);
return self;
}
pub fn setName(self: *Self, name: []const u8) void {
self.name = @ptrToInt(name.ptr);
self.name_len = @intCast(meta.fieldInfo(Self, .name_len).field_type, name.len);
}
pub fn setBuffers(self: *Self, buffers: []const Buffer) void {
self.buffers = @ptrToInt(buffers.ptr);
self.buffers_len = @intCast(meta.fieldInfo(Self, .buffers_len).field_type, buffers.len);
}
pub fn setControl(self: *Self, control: []const u8) void {
if (native_os.tag == .windows) {
self.control = Buffer.from(control);
} else {
self.control = @ptrToInt(control.ptr);
self.control_len = @intCast(meta.fieldInfo(Self, .control_len).field_type, control.len);
}
}
pub fn setFlags(self: *Self, flags: u32) void {
self.flags = @intCast(meta.fieldInfo(Self, .flags).field_type, flags);
}
pub fn getName(self: Self) []const u8 {
return @intToPtr([*]const u8, self.name)[0..@intCast(usize, self.name_len)];
}
pub fn getBuffers(self: Self) []const Buffer {
return @intToPtr([*]const Buffer, self.buffers)[0..@intCast(usize, self.buffers_len)];
}
pub fn getControl(self: Self) []const u8 {
if (native_os.tag == .windows) {
return self.control.into();
} else {
return @intToPtr([*]const u8, self.control)[0..@intCast(usize, self.control_len)];
}
}
pub fn getFlags(self: Self) u32 {
return @intCast(u32, self.flags);
}
};
}
/// POSIX `linger`, denoting the linger settings of a socket.
///
/// Microsoft's documentation and glibc denote the fields to be unsigned
/// short's on Windows, whereas glibc and musl denote the fields to be
/// int's on every other platform.
pub const Linger = extern struct {
pub const Field = switch (native_os.tag) {
.windows => c_ushort,
else => c_int,
};
enabled: Field,
timeout_seconds: Field,
pub fn init(timeout_seconds: ?u16) Socket.Linger {
return .{
.enabled = @intCast(Socket.Linger.Field, @boolToInt(timeout_seconds != null)),
.timeout_seconds = if (timeout_seconds) |seconds| @intCast(Socket.Linger.Field, seconds) else 0,
};
}
};
/// Possible set of flags to initialize a socket with.
pub const InitFlags = enum {
// Initialize a socket to be non-blocking.
nonblocking,
// Have a socket close itself on exec syscalls.
close_on_exec,
};
/// The underlying handle of a socket.
fd: os.socket_t,
@ -116,7 +316,7 @@ pub const Socket = struct {
}
/// Mix in socket syscalls depending on the platform we are compiling against.
pub usingnamespace switch (builtin.os.tag) {
pub usingnamespace switch (native_os.tag) {
.windows => @import("socket_windows.zig"),
else => @import("socket_posix.zig"),
}.Mixin(Socket);

View File

@ -13,8 +13,12 @@ const time = std.time;
pub fn Mixin(comptime Socket: type) type {
return struct {
/// Open a new socket.
pub fn init(domain: u32, socket_type: u32, protocol: u32) !Socket {
return Socket{ .fd = try os.socket(domain, socket_type, protocol) };
pub fn init(domain: u32, socket_type: u32, protocol: u32, flags: std.enums.EnumFieldStruct(Socket.InitFlags, bool, false)) !Socket {
var raw_flags: u32 = socket_type;
const set = std.EnumSet(Socket.InitFlags).init(flags);
if (set.contains(.close_on_exec)) raw_flags |= os.SOCK_CLOEXEC;
if (set.contains(.nonblocking)) raw_flags |= os.SOCK_NONBLOCK;
return Socket{ .fd = try os.socket(domain, raw_flags, protocol) };
}
/// Closes the socket.
@ -44,11 +48,16 @@ pub fn Mixin(comptime Socket: type) type {
/// Accept a pending incoming connection queued to the kernel backlog
/// of the socket.
pub fn accept(self: Socket, flags: u32) !Socket.Connection {
var address: os.sockaddr_storage = undefined;
var address_len: u32 = @sizeOf(os.sockaddr_storage);
pub fn accept(self: Socket, flags: std.enums.EnumFieldStruct(Socket.InitFlags, bool, false)) !Socket.Connection {
var address: Socket.Address.Native.Storage = undefined;
var address_len: u32 = @sizeOf(Socket.Address.Native.Storage);
const socket = Socket{ .fd = try os.accept(self.fd, @ptrCast(*os.sockaddr, &address), &address_len, flags) };
var raw_flags: u32 = 0;
const set = std.EnumSet(Socket.InitFlags).init(flags);
if (set.contains(.close_on_exec)) raw_flags |= os.SOCK_CLOEXEC;
if (set.contains(.nonblocking)) raw_flags |= os.SOCK_NONBLOCK;
const socket = Socket{ .fd = try os.accept(self.fd, @ptrCast(*os.sockaddr, &address), &address_len, raw_flags) };
const socket_address = Socket.Address.fromNative(@ptrCast(*os.sockaddr, &address));
return Socket.Connection.from(socket, socket_address);
@ -69,48 +78,76 @@ pub fn Mixin(comptime Socket: type) type {
/// Writes multiple I/O vectors with a prepended message header to the socket
/// with a set of flags specified. It returns the number of bytes that are
/// written to the socket.
pub fn writeVectorized(self: Socket, msg: os.msghdr_const, flags: u32) !usize {
return os.sendmsg(self.fd, msg, flags);
pub fn writeMessage(self: Socket, msg: Socket.Message, flags: u32) !usize {
while (true) {
const rc = os.system.sendmsg(self.fd, &msg, @intCast(c_int, flags));
return switch (os.errno(rc)) {
0 => return @intCast(usize, rc),
os.EACCES => error.AccessDenied,
os.EAGAIN => error.WouldBlock,
os.EALREADY => error.FastOpenAlreadyInProgress,
os.EBADF => unreachable, // always a race condition
os.ECONNRESET => error.ConnectionResetByPeer,
os.EDESTADDRREQ => unreachable, // The socket is not connection-mode, and no peer address is set.
os.EFAULT => unreachable, // An invalid user space address was specified for an argument.
os.EINTR => continue,
os.EINVAL => unreachable, // Invalid argument passed.
os.EISCONN => unreachable, // connection-mode socket was connected already but a recipient was specified
os.EMSGSIZE => error.MessageTooBig,
os.ENOBUFS => error.SystemResources,
os.ENOMEM => error.SystemResources,
os.ENOTSOCK => unreachable, // The file descriptor sockfd does not refer to a socket.
os.EOPNOTSUPP => unreachable, // Some bit in the flags argument is inappropriate for the socket type.
os.EPIPE => error.BrokenPipe,
os.EAFNOSUPPORT => error.AddressFamilyNotSupported,
os.ELOOP => error.SymLinkLoop,
os.ENAMETOOLONG => error.NameTooLong,
os.ENOENT => error.FileNotFound,
os.ENOTDIR => error.NotDir,
os.EHOSTUNREACH => error.NetworkUnreachable,
os.ENETUNREACH => error.NetworkUnreachable,
os.ENOTCONN => error.SocketNotConnected,
os.ENETDOWN => error.NetworkSubsystemFailed,
else => |err| os.unexpectedErrno(err),
};
}
}
/// Read multiple I/O vectors with a prepended message header from the socket
/// with a set of flags specified. It returns the number of bytes that were
/// read into the buffer provided.
pub fn readVectorized(self: Socket, msg: *os.msghdr, flags: u32) !usize {
if (comptime @hasDecl(os.system, "recvmsg")) {
while (true) {
const rc = os.system.recvmsg(self.fd, msg, flags);
return switch (os.errno(rc)) {
0 => @intCast(usize, rc),
os.EBADF => unreachable, // always a race condition
os.EFAULT => unreachable,
os.EINVAL => unreachable,
os.ENOTCONN => unreachable,
os.ENOTSOCK => unreachable,
os.EINTR => continue,
os.EAGAIN => error.WouldBlock,
os.ENOMEM => error.SystemResources,
os.ECONNREFUSED => error.ConnectionRefused,
os.ECONNRESET => error.ConnectionResetByPeer,
else => |err| os.unexpectedErrno(err),
};
}
pub fn readMessage(self: Socket, msg: *Socket.Message, flags: u32) !usize {
while (true) {
const rc = os.system.recvmsg(self.fd, msg, @intCast(c_int, flags));
return switch (os.errno(rc)) {
0 => @intCast(usize, rc),
os.EBADF => unreachable, // always a race condition
os.EFAULT => unreachable,
os.EINVAL => unreachable,
os.ENOTCONN => unreachable,
os.ENOTSOCK => unreachable,
os.EINTR => continue,
os.EAGAIN => error.WouldBlock,
os.ENOMEM => error.SystemResources,
os.ECONNREFUSED => error.ConnectionRefused,
os.ECONNRESET => error.ConnectionResetByPeer,
else => |err| os.unexpectedErrno(err),
};
}
return error.NotSupported;
}
/// Query the address that the socket is locally bounded to.
pub fn getLocalAddress(self: Socket) !Socket.Address {
var address: os.sockaddr_storage = undefined;
var address_len: u32 = @sizeOf(os.sockaddr_storage);
var address: Socket.Address.Native.Storage = undefined;
var address_len: u32 = @sizeOf(Socket.Address.Native.Storage);
try os.getsockname(self.fd, @ptrCast(*os.sockaddr, &address), &address_len);
return Socket.Address.fromNative(@ptrCast(*os.sockaddr, &address));
}
/// Query the address that the socket is connected to.
pub fn getRemoteAddress(self: Socket) !Socket.Address {
var address: os.sockaddr_storage = undefined;
var address_len: u32 = @sizeOf(os.sockaddr_storage);
var address: Socket.Address.Native.Storage = undefined;
var address_len: u32 = @sizeOf(Socket.Address.Native.Storage);
try os.getpeername(self.fd, @ptrCast(*os.sockaddr, &address), &address_len);
return Socket.Address.fromNative(@ptrCast(*os.sockaddr, &address));
}
@ -165,14 +202,7 @@ pub fn Mixin(comptime Socket: type) type {
/// seconds.
pub fn setLinger(self: Socket, timeout_seconds: ?u16) !void {
if (comptime @hasDecl(os, "SO_LINGER")) {
const settings = extern struct {
l_onoff: c_int,
l_linger: c_int,
}{
.l_onoff = @intCast(c_int, @boolToInt(timeout_seconds != null)),
.l_linger = if (timeout_seconds) |seconds| @intCast(c_int, seconds) else 0,
};
const settings = Socket.Linger.init(timeout_seconds);
return self.setOption(os.SOL_SOCKET, os.SO_LINGER, mem.asBytes(&settings));
}

View File

@ -16,27 +16,24 @@ const ws2_32 = windows.ws2_32;
pub fn Mixin(comptime Socket: type) type {
return struct {
/// Open a new socket.
pub fn init(domain: u32, socket_type: u32, protocol: u32) !Socket {
var filtered_socket_type = socket_type & ~@as(u32, os.SOCK_CLOEXEC);
var filtered_flags: u32 = ws2_32.WSA_FLAG_OVERLAPPED;
if (socket_type & os.SOCK_CLOEXEC != 0) {
filtered_flags |= ws2_32.WSA_FLAG_NO_HANDLE_INHERIT;
}
pub fn init(domain: u32, socket_type: u32, protocol: u32, flags: std.enums.EnumFieldStruct(Socket.InitFlags, bool, false)) !Socket {
var raw_flags: u32 = 0;
const set = std.EnumSet(Socket.InitFlags).init(flags);
if (set.contains(.close_on_exec)) raw_flags |= ws2_32.WSA_FLAG_NO_HANDLE_INHERIT;
const fd = ws2_32.WSASocketW(
@intCast(i32, domain),
@intCast(i32, filtered_socket_type),
@intCast(i32, socket_type),
@intCast(i32, protocol),
null,
0,
filtered_flags,
raw_flags,
);
if (fd == ws2_32.INVALID_SOCKET) {
return switch (ws2_32.WSAGetLastError()) {
.WSANOTINITIALISED => {
_ = try windows.WSAStartup(2, 2);
return Socket.init(domain, socket_type, protocol);
return Socket.init(domain, socket_type, protocol, flags);
},
.WSAEAFNOSUPPORT => error.AddressFamilyNotSupported,
.WSAEMFILE => error.ProcessFdQuotaExceeded,
@ -46,6 +43,14 @@ pub fn Mixin(comptime Socket: type) type {
};
}
if (set.contains(.nonblocking)) {
var enabled: c_ulong = 1;
const rc = ws2_32.ioctlsocket(fd, ws2_32.FIONBIO, &enabled);
if (rc == ws2_32.SOCKET_ERROR) {
return windows.unexpectedWSAError(ws2_32.WSAGetLastError());
}
}
return Socket{ .fd = fd };
}
@ -138,12 +143,12 @@ pub fn Mixin(comptime Socket: type) type {
/// Accept a pending incoming connection queued to the kernel backlog
/// of the socket.
pub fn accept(self: Socket, flags: u32) !Socket.Connection {
var address: ws2_32.sockaddr_storage = undefined;
var address_len: c_int = @sizeOf(ws2_32.sockaddr_storage);
pub fn accept(self: Socket, flags: std.enums.EnumFieldStruct(Socket.InitFlags, bool, false)) !Socket.Connection {
var address: Socket.Address.Native.Storage = undefined;
var address_len: c_int = @sizeOf(Socket.Address.Native.Storage);
const rc = ws2_32.accept(self.fd, @ptrCast(*ws2_32.sockaddr, &address), &address_len);
if (rc == ws2_32.INVALID_SOCKET) {
const fd = ws2_32.accept(self.fd, @ptrCast(*ws2_32.sockaddr, &address), &address_len);
if (fd == ws2_32.INVALID_SOCKET) {
return switch (ws2_32.WSAGetLastError()) {
.WSANOTINITIALISED => unreachable,
.WSAECONNRESET => error.ConnectionResetByPeer,
@ -158,9 +163,20 @@ pub fn Mixin(comptime Socket: type) type {
};
}
const socket = Socket.from(rc);
const socket = Socket.from(fd);
errdefer socket.deinit();
const socket_address = Socket.Address.fromNative(@ptrCast(*ws2_32.sockaddr, &address));
const set = std.EnumSet(Socket.InitFlags).init(flags);
if (set.contains(.nonblocking)) {
var enabled: c_ulong = 1;
const rc = ws2_32.ioctlsocket(fd, ws2_32.FIONBIO, &enabled);
if (rc == ws2_32.SOCKET_ERROR) {
return windows.unexpectedWSAError(ws2_32.WSAGetLastError());
}
}
return Socket.Connection.from(socket, socket_address);
}
@ -238,7 +254,7 @@ pub fn Mixin(comptime Socket: type) type {
/// Writes multiple I/O vectors with a prepended message header to the socket
/// with a set of flags specified. It returns the number of bytes that are
/// written to the socket.
pub fn writeVectorized(self: Socket, msg: ws2_32.msghdr_const, flags: u32) !usize {
pub fn writeMessage(self: Socket, msg: Socket.Message, flags: u32) !usize {
const call = try windows.loadWinsockExtensionFunction(ws2_32.LPFN_WSASENDMSG, self.fd, ws2_32.WSAID_WSASENDMSG);
var num_bytes: u32 = undefined;
@ -275,7 +291,7 @@ pub fn Mixin(comptime Socket: type) type {
/// Read multiple I/O vectors with a prepended message header from the socket
/// with a set of flags specified. It returns the number of bytes that were
/// read into the buffer provided.
pub fn readVectorized(self: Socket, msg: *ws2_32.msghdr, flags: u32) !usize {
pub fn readMessage(self: Socket, msg: *Socket.Message, flags: u32) !usize {
const call = try windows.loadWinsockExtensionFunction(ws2_32.LPFN_WSARECVMSG, self.fd, ws2_32.WSAID_WSARECVMSG);
var num_bytes: u32 = undefined;
@ -311,8 +327,8 @@ pub fn Mixin(comptime Socket: type) type {
/// Query the address that the socket is locally bounded to.
pub fn getLocalAddress(self: Socket) !Socket.Address {
var address: ws2_32.sockaddr_storage = undefined;
var address_len: c_int = @sizeOf(ws2_32.sockaddr_storage);
var address: Socket.Address.Native.Storage = undefined;
var address_len: c_int = @sizeOf(Socket.Address.Native.Storage);
const rc = ws2_32.getsockname(self.fd, @ptrCast(*ws2_32.sockaddr, &address), &address_len);
if (rc == ws2_32.SOCKET_ERROR) {
@ -331,8 +347,8 @@ pub fn Mixin(comptime Socket: type) type {
/// Query the address that the socket is connected to.
pub fn getRemoteAddress(self: Socket) !Socket.Address {
var address: ws2_32.sockaddr_storage = undefined;
var address_len: c_int = @sizeOf(ws2_32.sockaddr_storage);
var address: Socket.Address.Native.Storage = undefined;
var address_len: c_int = @sizeOf(Socket.Address.Native.Storage);
const rc = ws2_32.getpeername(self.fd, @ptrCast(*ws2_32.sockaddr, &address), &address_len);
if (rc == ws2_32.SOCKET_ERROR) {
@ -384,11 +400,7 @@ pub fn Mixin(comptime Socket: type) type {
/// if the host does not support the option for a socket to linger around up until a timeout specified in
/// seconds.
pub fn setLinger(self: Socket, timeout_seconds: ?u16) !void {
const settings = ws2_32.linger{
.l_onoff = @as(u16, @boolToInt(timeout_seconds != null)),
.l_linger = if (timeout_seconds) |seconds| seconds else 0,
};
const settings = Socket.Linger.init(timeout_seconds);
return self.setOption(ws2_32.SOL_SOCKET, ws2_32.SO_LINGER, mem.asBytes(&settings));
}