std.Io.Threaded: import std.Io.net

This commit is contained in:
Andrew Kelley 2025-10-14 10:50:00 -07:00
parent 2bcdde2985
commit 1382e41226

View File

@ -6,10 +6,11 @@ const is_windows = native_os == .windows;
const windows = std.os.windows; const windows = std.os.windows;
const std = @import("../std.zig"); const std = @import("../std.zig");
const Io = std.Io;
const net = std.Io.net;
const Allocator = std.mem.Allocator; const Allocator = std.mem.Allocator;
const assert = std.debug.assert; const assert = std.debug.assert;
const posix = std.posix; const posix = std.posix;
const Io = std.Io;
const ResetEvent = std.Thread.ResetEvent; const ResetEvent = std.Thread.ResetEvent;
/// Thread-safe. /// Thread-safe.
@ -1692,9 +1693,9 @@ fn select(userdata: ?*anyopaque, futures: []const *Io.AnyFuture) usize {
fn netListenIpPosix( fn netListenIpPosix(
userdata: ?*anyopaque, userdata: ?*anyopaque,
address: Io.net.IpAddress, address: net.IpAddress,
options: Io.net.IpAddress.ListenOptions, options: net.IpAddress.ListenOptions,
) Io.net.IpAddress.ListenError!Io.net.Server { ) net.IpAddress.ListenError!net.Server {
const pool: *Pool = @ptrCast(@alignCast(userdata)); const pool: *Pool = @ptrCast(@alignCast(userdata));
const family = posixAddressFamily(&address); const family = posixAddressFamily(&address);
const socket_fd = try openSocketPosix(pool, family, .{ const socket_fd = try openSocketPosix(pool, family, .{
@ -1734,10 +1735,10 @@ fn netListenIpPosix(
fn netListenUnix( fn netListenUnix(
userdata: ?*anyopaque, userdata: ?*anyopaque,
address: *const Io.net.UnixAddress, address: *const net.UnixAddress,
options: Io.net.UnixAddress.ListenOptions, options: net.UnixAddress.ListenOptions,
) Io.net.UnixAddress.ListenError!Io.net.Socket.Handle { ) net.UnixAddress.ListenError!net.Socket.Handle {
if (!Io.net.has_unix_sockets) return error.AddressFamilyUnsupported; if (!net.has_unix_sockets) return error.AddressFamilyUnsupported;
const pool: *Pool = @ptrCast(@alignCast(userdata)); const pool: *Pool = @ptrCast(@alignCast(userdata));
const socket_fd = openSocketPosix(pool, posix.AF.UNIX, .{ .mode = .stream }) catch |err| switch (err) { const socket_fd = openSocketPosix(pool, posix.AF.UNIX, .{ .mode = .stream }) catch |err| switch (err) {
error.ProtocolUnsupportedBySystem => return error.AddressFamilyUnsupported, error.ProtocolUnsupportedBySystem => return error.AddressFamilyUnsupported,
@ -1903,9 +1904,9 @@ fn setSocketOption(pool: *Pool, fd: posix.fd_t, level: i32, opt_name: u32, optio
fn netConnectIpPosix( fn netConnectIpPosix(
userdata: ?*anyopaque, userdata: ?*anyopaque,
address: *const Io.net.IpAddress, address: *const net.IpAddress,
options: Io.net.IpAddress.ConnectOptions, options: net.IpAddress.ConnectOptions,
) Io.net.IpAddress.ConnectError!Io.net.Stream { ) net.IpAddress.ConnectError!net.Stream {
if (options.timeout != .none) @panic("TODO"); if (options.timeout != .none) @panic("TODO");
const pool: *Pool = @ptrCast(@alignCast(userdata)); const pool: *Pool = @ptrCast(@alignCast(userdata));
const family = posixAddressFamily(address); const family = posixAddressFamily(address);
@ -1926,9 +1927,9 @@ fn netConnectIpPosix(
fn netConnectUnix( fn netConnectUnix(
userdata: ?*anyopaque, userdata: ?*anyopaque,
address: *const Io.net.UnixAddress, address: *const net.UnixAddress,
) Io.net.UnixAddress.ConnectError!Io.net.Socket.Handle { ) net.UnixAddress.ConnectError!net.Socket.Handle {
if (!Io.net.has_unix_sockets) return error.AddressFamilyUnsupported; if (!net.has_unix_sockets) return error.AddressFamilyUnsupported;
const pool: *Pool = @ptrCast(@alignCast(userdata)); const pool: *Pool = @ptrCast(@alignCast(userdata));
const socket_fd = try openSocketPosix(pool, posix.AF.UNIX, .{ .mode = .stream }); const socket_fd = try openSocketPosix(pool, posix.AF.UNIX, .{ .mode = .stream });
errdefer posix.close(socket_fd); errdefer posix.close(socket_fd);
@ -1940,9 +1941,9 @@ fn netConnectUnix(
fn netBindIpPosix( fn netBindIpPosix(
userdata: ?*anyopaque, userdata: ?*anyopaque,
address: *const Io.net.IpAddress, address: *const net.IpAddress,
options: Io.net.IpAddress.BindOptions, options: net.IpAddress.BindOptions,
) Io.net.IpAddress.BindError!Io.net.Socket { ) net.IpAddress.BindError!net.Socket {
const pool: *Pool = @ptrCast(@alignCast(userdata)); const pool: *Pool = @ptrCast(@alignCast(userdata));
const family = posixAddressFamily(address); const family = posixAddressFamily(address);
const socket_fd = try openSocketPosix(pool, family, options); const socket_fd = try openSocketPosix(pool, family, options);
@ -1957,7 +1958,7 @@ fn netBindIpPosix(
}; };
} }
fn openSocketPosix(pool: *Pool, family: posix.sa_family_t, options: Io.net.IpAddress.BindOptions) !posix.socket_t { fn openSocketPosix(pool: *Pool, family: posix.sa_family_t, options: net.IpAddress.BindOptions) !posix.socket_t {
const mode = posixSocketMode(options.mode); const mode = posixSocketMode(options.mode);
const protocol = posixProtocol(options.protocol); const protocol = posixProtocol(options.protocol);
const socket_fd = while (true) { const socket_fd = while (true) {
@ -2003,7 +2004,7 @@ fn openSocketPosix(pool: *Pool, family: posix.sa_family_t, options: Io.net.IpAdd
const socket_flags_unsupported = builtin.os.tag.isDarwin() or native_os == .haiku; // 💩💩 const socket_flags_unsupported = builtin.os.tag.isDarwin() or native_os == .haiku; // 💩💩
const have_accept4 = !socket_flags_unsupported; const have_accept4 = !socket_flags_unsupported;
fn netAcceptPosix(userdata: ?*anyopaque, listen_fd: Io.net.Socket.Handle) Io.net.Server.AcceptError!Io.net.Stream { fn netAcceptPosix(userdata: ?*anyopaque, listen_fd: net.Socket.Handle) net.Server.AcceptError!net.Stream {
const pool: *Pool = @ptrCast(@alignCast(userdata)); const pool: *Pool = @ptrCast(@alignCast(userdata));
var storage: PosixAddress = undefined; var storage: PosixAddress = undefined;
var addr_len: posix.socklen_t = @sizeOf(PosixAddress); var addr_len: posix.socklen_t = @sizeOf(PosixAddress);
@ -2050,7 +2051,7 @@ fn netAcceptPosix(userdata: ?*anyopaque, listen_fd: Io.net.Socket.Handle) Io.net
} }; } };
} }
fn netReadPosix(userdata: ?*anyopaque, fd: Io.net.Socket.Handle, data: [][]u8) Io.net.Stream.Reader.Error!usize { fn netReadPosix(userdata: ?*anyopaque, fd: net.Socket.Handle, data: [][]u8) net.Stream.Reader.Error!usize {
const pool: *Pool = @ptrCast(@alignCast(userdata)); const pool: *Pool = @ptrCast(@alignCast(userdata));
var iovecs_buffer: [max_iovecs_len]posix.iovec = undefined; var iovecs_buffer: [max_iovecs_len]posix.iovec = undefined;
@ -2113,10 +2114,10 @@ const have_sendmmsg = builtin.os.tag == .linux;
fn netSend( fn netSend(
userdata: ?*anyopaque, userdata: ?*anyopaque,
handle: Io.net.Socket.Handle, handle: net.Socket.Handle,
messages: []Io.net.OutgoingMessage, messages: []net.OutgoingMessage,
flags: Io.net.SendFlags, flags: net.SendFlags,
) struct { ?Io.net.Socket.SendError, usize } { ) struct { ?net.Socket.SendError, usize } {
const pool: *Pool = @ptrCast(@alignCast(userdata)); const pool: *Pool = @ptrCast(@alignCast(userdata));
const posix_flags: u32 = const posix_flags: u32 =
@ -2141,10 +2142,10 @@ fn netSend(
fn netSendOne( fn netSendOne(
pool: *Pool, pool: *Pool,
handle: Io.net.Socket.Handle, handle: net.Socket.Handle,
message: *Io.net.OutgoingMessage, message: *net.OutgoingMessage,
flags: u32, flags: u32,
) Io.net.Socket.SendError!void { ) net.Socket.SendError!void {
var addr: PosixAddress = undefined; var addr: PosixAddress = undefined;
var iovec: posix.iovec = .{ .base = @constCast(message.data_ptr), .len = message.data_len }; var iovec: posix.iovec = .{ .base = @constCast(message.data_ptr), .len = message.data_len };
const msg: posix.msghdr = .{ const msg: posix.msghdr = .{
@ -2225,10 +2226,10 @@ fn netSendOne(
fn netSendMany( fn netSendMany(
pool: *Pool, pool: *Pool,
handle: Io.net.Socket.Handle, handle: net.Socket.Handle,
messages: []Io.net.OutgoingMessage, messages: []net.OutgoingMessage,
flags: u32, flags: u32,
) Io.net.Socket.SendError!usize { ) net.Socket.SendError!usize {
var msg_buffer: [64]std.os.linux.mmsghdr = undefined; var msg_buffer: [64]std.os.linux.mmsghdr = undefined;
var addr_buffer: [msg_buffer.len]PosixAddress = undefined; var addr_buffer: [msg_buffer.len]PosixAddress = undefined;
var iovecs_buffer: [msg_buffer.len]posix.iovec = undefined; var iovecs_buffer: [msg_buffer.len]posix.iovec = undefined;
@ -2292,12 +2293,12 @@ fn netSendMany(
fn netReceive( fn netReceive(
userdata: ?*anyopaque, userdata: ?*anyopaque,
handle: Io.net.Socket.Handle, handle: net.Socket.Handle,
message_buffer: []Io.net.IncomingMessage, message_buffer: []net.IncomingMessage,
data_buffer: []u8, data_buffer: []u8,
flags: Io.net.ReceiveFlags, flags: net.ReceiveFlags,
timeout: Io.Timeout, timeout: Io.Timeout,
) struct { ?Io.net.Socket.ReceiveTimeoutError, usize } { ) struct { ?net.Socket.ReceiveTimeoutError, usize } {
const pool: *Pool = @ptrCast(@alignCast(userdata)); const pool: *Pool = @ptrCast(@alignCast(userdata));
// recvmmsg is useless, here's why: // recvmmsg is useless, here's why:
@ -2418,11 +2419,11 @@ fn netReceive(
fn netWritePosix( fn netWritePosix(
userdata: ?*anyopaque, userdata: ?*anyopaque,
fd: Io.net.Socket.Handle, fd: net.Socket.Handle,
header: []const u8, header: []const u8,
data: []const []const u8, data: []const []const u8,
splat: usize, splat: usize,
) Io.net.Stream.Writer.Error!usize { ) net.Stream.Writer.Error!usize {
const pool: *Pool = @ptrCast(@alignCast(userdata)); const pool: *Pool = @ptrCast(@alignCast(userdata));
try pool.checkCancel(); try pool.checkCancel();
@ -2476,7 +2477,7 @@ fn addBuf(v: []posix.iovec_const, i: *@FieldType(posix.msghdr_const, "iovlen"),
i.* += 1; i.* += 1;
} }
fn netClose(userdata: ?*anyopaque, handle: Io.net.Socket.Handle) void { fn netClose(userdata: ?*anyopaque, handle: net.Socket.Handle) void {
const pool: *Pool = @ptrCast(@alignCast(userdata)); const pool: *Pool = @ptrCast(@alignCast(userdata));
_ = pool; _ = pool;
switch (native_os) { switch (native_os) {
@ -2487,8 +2488,8 @@ fn netClose(userdata: ?*anyopaque, handle: Io.net.Socket.Handle) void {
fn netInterfaceNameResolve( fn netInterfaceNameResolve(
userdata: ?*anyopaque, userdata: ?*anyopaque,
name: *const Io.net.Interface.Name, name: *const net.Interface.Name,
) Io.net.Interface.Name.ResolveError!Io.net.Interface { ) net.Interface.Name.ResolveError!net.Interface {
const pool: *Pool = @ptrCast(@alignCast(userdata)); const pool: *Pool = @ptrCast(@alignCast(userdata));
if (native_os == .linux) { if (native_os == .linux) {
@ -2542,7 +2543,7 @@ fn netInterfaceNameResolve(
@panic("unimplemented"); @panic("unimplemented");
} }
fn netInterfaceName(userdata: ?*anyopaque, interface: Io.net.Interface) Io.net.Interface.NameError!Io.net.Interface.Name { fn netInterfaceName(userdata: ?*anyopaque, interface: net.Interface) net.Interface.NameError!net.Interface.Name {
const pool: *Pool = @ptrCast(@alignCast(userdata)); const pool: *Pool = @ptrCast(@alignCast(userdata));
try pool.checkCancel(); try pool.checkCancel();
@ -2573,14 +2574,14 @@ const UnixAddress = extern union {
un: posix.sockaddr.un, un: posix.sockaddr.un,
}; };
fn posixAddressFamily(a: *const Io.net.IpAddress) posix.sa_family_t { fn posixAddressFamily(a: *const net.IpAddress) posix.sa_family_t {
return switch (a.*) { return switch (a.*) {
.ip4 => posix.AF.INET, .ip4 => posix.AF.INET,
.ip6 => posix.AF.INET6, .ip6 => posix.AF.INET6,
}; };
} }
fn addressFromPosix(posix_address: *PosixAddress) Io.net.IpAddress { fn addressFromPosix(posix_address: *PosixAddress) net.IpAddress {
return switch (posix_address.any.family) { return switch (posix_address.any.family) {
posix.AF.INET => .{ .ip4 = address4FromPosix(&posix_address.in) }, posix.AF.INET => .{ .ip4 = address4FromPosix(&posix_address.in) },
posix.AF.INET6 => .{ .ip6 = address6FromPosix(&posix_address.in6) }, posix.AF.INET6 => .{ .ip6 = address6FromPosix(&posix_address.in6) },
@ -2588,7 +2589,7 @@ fn addressFromPosix(posix_address: *PosixAddress) Io.net.IpAddress {
}; };
} }
fn addressToPosix(a: *const Io.net.IpAddress, storage: *PosixAddress) posix.socklen_t { fn addressToPosix(a: *const net.IpAddress, storage: *PosixAddress) posix.socklen_t {
return switch (a.*) { return switch (a.*) {
.ip4 => |ip4| { .ip4 => |ip4| {
storage.in = address4ToPosix(ip4); storage.in = address4ToPosix(ip4);
@ -2601,21 +2602,21 @@ fn addressToPosix(a: *const Io.net.IpAddress, storage: *PosixAddress) posix.sock
}; };
} }
fn addressUnixToPosix(a: *const Io.net.UnixAddress, storage: *UnixAddress) posix.socklen_t { fn addressUnixToPosix(a: *const net.UnixAddress, storage: *UnixAddress) posix.socklen_t {
@memcpy(storage.un.path[0..a.path.len], a.path); @memcpy(storage.un.path[0..a.path.len], a.path);
storage.un.family = posix.AF.UNIX; storage.un.family = posix.AF.UNIX;
storage.un.path[a.path.len] = 0; storage.un.path[a.path.len] = 0;
return @sizeOf(posix.sockaddr.un); return @sizeOf(posix.sockaddr.un);
} }
fn address4FromPosix(in: *posix.sockaddr.in) Io.net.Ip4Address { fn address4FromPosix(in: *posix.sockaddr.in) net.Ip4Address {
return .{ return .{
.port = std.mem.bigToNative(u16, in.port), .port = std.mem.bigToNative(u16, in.port),
.bytes = @bitCast(in.addr), .bytes = @bitCast(in.addr),
}; };
} }
fn address6FromPosix(in6: *posix.sockaddr.in6) Io.net.Ip6Address { fn address6FromPosix(in6: *posix.sockaddr.in6) net.Ip6Address {
return .{ return .{
.port = std.mem.bigToNative(u16, in6.port), .port = std.mem.bigToNative(u16, in6.port),
.bytes = in6.addr, .bytes = in6.addr,
@ -2624,14 +2625,14 @@ fn address6FromPosix(in6: *posix.sockaddr.in6) Io.net.Ip6Address {
}; };
} }
fn address4ToPosix(a: Io.net.Ip4Address) posix.sockaddr.in { fn address4ToPosix(a: net.Ip4Address) posix.sockaddr.in {
return .{ return .{
.port = std.mem.nativeToBig(u16, a.port), .port = std.mem.nativeToBig(u16, a.port),
.addr = @bitCast(a.bytes), .addr = @bitCast(a.bytes),
}; };
} }
fn address6ToPosix(a: *const Io.net.Ip6Address) posix.sockaddr.in6 { fn address6ToPosix(a: *const net.Ip6Address) posix.sockaddr.in6 {
return .{ return .{
.port = std.mem.nativeToBig(u16, a.port), .port = std.mem.nativeToBig(u16, a.port),
.flowinfo = a.flow, .flowinfo = a.flow,
@ -2647,7 +2648,7 @@ fn errnoBug(err: posix.E) Io.UnexpectedError {
} }
} }
fn posixSocketMode(mode: Io.net.Socket.Mode) u32 { fn posixSocketMode(mode: net.Socket.Mode) u32 {
return switch (mode) { return switch (mode) {
.stream => posix.SOCK.STREAM, .stream => posix.SOCK.STREAM,
.dgram => posix.SOCK.DGRAM, .dgram => posix.SOCK.DGRAM,
@ -2657,7 +2658,7 @@ fn posixSocketMode(mode: Io.net.Socket.Mode) u32 {
}; };
} }
fn posixProtocol(protocol: ?Io.net.Protocol) u32 { fn posixProtocol(protocol: ?net.Protocol) u32 {
return @intFromEnum(protocol orelse return 0); return @intFromEnum(protocol orelse return 0);
} }