diff --git a/lib/std/Io.zig b/lib/std/Io.zig index 42b80bb38d..b11ee66cee 100644 --- a/lib/std/Io.zig +++ b/lib/std/Io.zig @@ -559,6 +559,7 @@ const Io = @This(); pub const EventLoop = @import("Io/EventLoop.zig"); pub const ThreadPool = @import("Io/ThreadPool.zig"); +pub const net = @import("Io/net.zig"); userdata: ?*anyopaque, vtable: *const VTable, @@ -656,6 +657,12 @@ pub const VTable = struct { now: *const fn (?*anyopaque, clockid: std.posix.clockid_t) ClockGetTimeError!Timestamp, sleep: *const fn (?*anyopaque, clockid: std.posix.clockid_t, deadline: Deadline) SleepError!void, + + listen: *const fn (?*anyopaque, address: net.IpAddress, options: net.ListenOptions) net.ListenError!net.Server, + accept: *const fn (?*anyopaque, server: *net.Server) net.Server.AcceptError!net.Server.Connection, + netRead: *const fn (?*anyopaque, src: net.Stream, dest: *Io.Writer, limit: Io.Limit) net.Stream.Reader.Error!usize, + netWrite: *const fn (?*anyopaque, dest: net.Stream, header: []const u8, data: []const []const u8, splat: usize) net.Stream.Writer.Error!usize, + netClose: *const fn (?*anyopaque, stream: net.Stream) void, }; pub const Cancelable = error{ diff --git a/lib/std/Io/ThreadPool.zig b/lib/std/Io/ThreadPool.zig index 0006840047..13eb2e7695 100644 --- a/lib/std/Io/ThreadPool.zig +++ b/lib/std/Io/ThreadPool.zig @@ -3,6 +3,7 @@ const std = @import("../std.zig"); const Allocator = std.mem.Allocator; const assert = std.debug.assert; const WaitGroup = std.Thread.WaitGroup; +const posix = std.posix; const Io = std.Io; const Pool = @This(); @@ -19,6 +20,9 @@ parallel_count: usize, threadlocal var current_closure: ?*AsyncClosure = null; +const max_iovecs_len = 8; +const splat_buffer_size = 64; + pub const Runnable = struct { start: Start, node: std.SinglyLinkedList.Node = .{}, @@ -107,6 +111,18 @@ pub fn io(pool: *Pool) Io { .now = now, .sleep = sleep, + + .listen = listen, + .accept = accept, + .netRead = switch (builtin.os.tag) { + .windows => @panic("TODO"), + else => netReadPosix, + }, + .netWrite = switch (builtin.os.tag) { + .windows => @panic("TODO"), + else => netWritePosix, + }, + .netClose = netClose, }, }; } @@ -461,7 +477,7 @@ fn cancel( .linux => _ = std.os.linux.tgkill( std.os.linux.getpid(), @bitCast(cancel_tid), - std.posix.SIG.IO, + posix.SIG.IO, ), else => {}, }, @@ -635,7 +651,7 @@ fn closeFile(userdata: ?*anyopaque, file: Io.File) void { return fs_file.close(); } -fn pread(userdata: ?*anyopaque, file: Io.File, buffer: []u8, offset: std.posix.off_t) Io.File.PReadError!usize { +fn pread(userdata: ?*anyopaque, file: Io.File, buffer: []u8, offset: posix.off_t) Io.File.PReadError!usize { const pool: *Pool = @alignCast(@ptrCast(userdata)); try pool.checkCancel(); const fs_file: std.fs.File = .{ .handle = file.handle }; @@ -645,7 +661,7 @@ fn pread(userdata: ?*anyopaque, file: Io.File, buffer: []u8, offset: std.posix.o }; } -fn pwrite(userdata: ?*anyopaque, file: Io.File, buffer: []const u8, offset: std.posix.off_t) Io.File.PWriteError!usize { +fn pwrite(userdata: ?*anyopaque, file: Io.File, buffer: []const u8, offset: posix.off_t) Io.File.PWriteError!usize { const pool: *Pool = @alignCast(@ptrCast(userdata)); try pool.checkCancel(); const fs_file: std.fs.File = .{ .handle = file.handle }; @@ -655,20 +671,20 @@ fn pwrite(userdata: ?*anyopaque, file: Io.File, buffer: []const u8, offset: std. }; } -fn now(userdata: ?*anyopaque, clockid: std.posix.clockid_t) Io.ClockGetTimeError!Io.Timestamp { +fn now(userdata: ?*anyopaque, clockid: posix.clockid_t) Io.ClockGetTimeError!Io.Timestamp { const pool: *Pool = @alignCast(@ptrCast(userdata)); try pool.checkCancel(); - const timespec = try std.posix.clock_gettime(clockid); + const timespec = try posix.clock_gettime(clockid); return @enumFromInt(@as(i128, timespec.sec) * std.time.ns_per_s + timespec.nsec); } -fn sleep(userdata: ?*anyopaque, clockid: std.posix.clockid_t, deadline: Io.Deadline) Io.SleepError!void { +fn sleep(userdata: ?*anyopaque, clockid: posix.clockid_t, deadline: Io.Deadline) Io.SleepError!void { const pool: *Pool = @alignCast(@ptrCast(userdata)); const deadline_nanoseconds: i96 = switch (deadline) { .duration => |duration| duration.nanoseconds, .timestamp => |timestamp| @intFromEnum(timestamp), }; - var timespec: std.posix.timespec = .{ + var timespec: posix.timespec = .{ .sec = @intCast(@divFloor(deadline_nanoseconds, std.time.ns_per_s)), .nsec = @intCast(@mod(deadline_nanoseconds, std.time.ns_per_s)), }; @@ -682,7 +698,7 @@ fn sleep(userdata: ?*anyopaque, clockid: std.posix.clockid_t, deadline: Io.Deadl .FAULT => unreachable, .INTR => {}, .INVAL => return error.UnsupportedClock, - else => |err| return std.posix.unexpectedErrno(err), + else => |err| return posix.unexpectedErrno(err), } } } @@ -718,3 +734,206 @@ fn select(userdata: ?*anyopaque, futures: []const *Io.AnyFuture) usize { } return result.?; } + +fn listen(userdata: ?*anyopaque, address: Io.net.IpAddress, options: Io.net.ListenOptions) Io.net.ListenError!Io.net.Server { + const pool: *Pool = @alignCast(@ptrCast(userdata)); + try pool.checkCancel(); + + const nonblock: u32 = if (options.force_nonblocking) posix.SOCK.NONBLOCK else 0; + const sock_flags = posix.SOCK.STREAM | posix.SOCK.CLOEXEC | nonblock; + const proto: u32 = posix.IPPROTO.TCP; + const family = posixAddressFamily(address); + const sockfd = try posix.socket(family, sock_flags, proto); + const stream: std.net.Stream = .{ .handle = sockfd }; + errdefer stream.close(); + + if (options.reuse_address) { + try posix.setsockopt( + sockfd, + posix.SOL.SOCKET, + posix.SO.REUSEADDR, + &std.mem.toBytes(@as(c_int, 1)), + ); + if (@hasDecl(posix.SO, "REUSEPORT") and family != posix.AF.UNIX) { + try posix.setsockopt( + sockfd, + posix.SOL.SOCKET, + posix.SO.REUSEPORT, + &std.mem.toBytes(@as(c_int, 1)), + ); + } + } + + var storage: PosixAddress = undefined; + var socklen = addressToPosix(address, &storage); + try posix.bind(sockfd, &storage.any, socklen); + try posix.listen(sockfd, options.kernel_backlog); + try posix.getsockname(sockfd, &storage.any, &socklen); + return .{ + .listen_address = addressFromPosix(&storage), + .stream = .{ .handle = stream.handle }, + }; +} + +fn accept(userdata: ?*anyopaque, server: *Io.net.Server) Io.net.Server.AcceptError!Io.net.Server.Connection { + const pool: *Pool = @alignCast(@ptrCast(userdata)); + try pool.checkCancel(); + + var storage: PosixAddress = undefined; + var addr_len: posix.socklen_t = @sizeOf(PosixAddress); + const fd = try posix.accept(server.stream.handle, &storage.any, &addr_len, posix.SOCK.CLOEXEC); + return .{ + .stream = .{ .handle = fd }, + .address = addressFromPosix(&storage), + }; +} + +fn netReadPosix( + userdata: ?*anyopaque, + stream: Io.net.Stream, + w: *Io.Writer, + limit: Io.Limit, +) Io.net.Stream.Reader.Error!usize { + const pool: *Pool = @alignCast(@ptrCast(userdata)); + try pool.checkCancel(); + + var iovecs_buffer: [max_iovecs_len]posix.iovec = undefined; + const dest = try w.writableVectorPosix(&iovecs_buffer, limit); + assert(dest[0].len > 0); + const n = try posix.readv(stream.handle, dest); + if (n == 0) return error.EndOfStream; + return n; +} + +fn netWritePosix( + userdata: ?*anyopaque, + stream: Io.net.Stream, + header: []const u8, + data: []const []const u8, + splat: usize, +) Io.net.Stream.Writer.Error!usize { + const pool: *Pool = @alignCast(@ptrCast(userdata)); + try pool.checkCancel(); + + var iovecs: [max_iovecs_len]posix.iovec_const = undefined; + var msg: posix.msghdr_const = .{ + .name = null, + .namelen = 0, + .iov = &iovecs, + .iovlen = 0, + .control = null, + .controllen = 0, + .flags = 0, + }; + addBuf(&iovecs, &msg.iovlen, header); + for (data[0 .. data.len - 1]) |bytes| addBuf(&iovecs, &msg.iovlen, bytes); + const pattern = data[data.len - 1]; + if (iovecs.len - msg.iovlen != 0) switch (splat) { + 0 => {}, + 1 => addBuf(&iovecs, &msg.iovlen, pattern), + else => switch (pattern.len) { + 0 => {}, + 1 => { + var backup_buffer: [splat_buffer_size]u8 = undefined; + const splat_buffer = &backup_buffer; + const memset_len = @min(splat_buffer.len, splat); + const buf = splat_buffer[0..memset_len]; + @memset(buf, pattern[0]); + addBuf(&iovecs, &msg.iovlen, buf); + var remaining_splat = splat - buf.len; + while (remaining_splat > splat_buffer.len and iovecs.len - msg.iovlen != 0) { + assert(buf.len == splat_buffer.len); + addBuf(&iovecs, &msg.iovlen, splat_buffer); + remaining_splat -= splat_buffer.len; + } + addBuf(&iovecs, &msg.iovlen, splat_buffer[0..remaining_splat]); + }, + else => for (0..@min(splat, iovecs.len - msg.iovlen)) |_| { + addBuf(&iovecs, &msg.iovlen, pattern); + }, + }, + }; + const flags = posix.MSG.NOSIGNAL; + return posix.sendmsg(stream.handle, &msg, flags); +} + +fn addBuf(v: []posix.iovec_const, i: *@FieldType(posix.msghdr_const, "iovlen"), bytes: []const u8) void { + // OS checks ptr addr before length so zero length vectors must be omitted. + if (bytes.len == 0) return; + if (v.len - i.* == 0) return; + v[i.*] = .{ .base = bytes.ptr, .len = bytes.len }; + i.* += 1; +} + +fn netClose(userdata: ?*anyopaque, stream: Io.net.Stream) void { + const pool: *Pool = @alignCast(@ptrCast(userdata)); + _ = pool; + const net_stream: std.net.Stream = .{ .handle = stream.handle }; + return net_stream.close(); +} + +const PosixAddress = extern union { + any: posix.sockaddr, + in: posix.sockaddr.in, + in6: posix.sockaddr.in6, +}; + +fn posixAddressFamily(a: Io.net.IpAddress) posix.sa_family_t { + return switch (a) { + .ip4 => posix.AF.INET, + .ip6 => posix.AF.INET6, + }; +} + +fn addressFromPosix(posix_address: *PosixAddress) Io.net.IpAddress { + return switch (posix_address.any.family) { + posix.AF.INET => .{ .ip4 = address4FromPosix(&posix_address.in) }, + posix.AF.INET6 => .{ .ip6 = address6FromPosix(&posix_address.in6) }, + else => unreachable, + }; +} + +fn addressToPosix(a: Io.net.IpAddress, storage: *PosixAddress) posix.socklen_t { + return switch (a) { + .ip4 => |ip4| { + storage.in = address4ToPosix(ip4); + return @sizeOf(posix.sockaddr.in); + }, + .ip6 => |ip6| { + storage.in6 = address6ToPosix(ip6); + return @sizeOf(posix.sockaddr.in6); + }, + }; +} + +fn address4FromPosix(in: *posix.sockaddr.in) Io.net.Ip4Address { + return .{ + .port = std.mem.bigToNative(u16, in.port), + .bytes = @bitCast(in.addr), + }; +} + +fn address6FromPosix(in6: *posix.sockaddr.in6) Io.net.Ip6Address { + return .{ + .port = std.mem.bigToNative(u16, in6.port), + .bytes = in6.addr, + .flowinfo = in6.flowinfo, + .scope_id = in6.scope_id, + }; +} + +fn address4ToPosix(a: Io.net.Ip4Address) posix.sockaddr.in { + return .{ + .port = std.mem.nativeToBig(u16, a.port), + .addr = @bitCast(a.bytes), + }; +} + +fn address6ToPosix(a: Io.net.Ip6Address) posix.sockaddr.in6 { + return .{ + .port = std.mem.nativeToBig(u16, a.port), + .flowinfo = a.flowinfo, + .addr = a.bytes, + .scope_id = a.scope_id, + }; +} diff --git a/lib/std/Io/net.zig b/lib/std/Io/net.zig new file mode 100644 index 0000000000..24c310edde --- /dev/null +++ b/lib/std/Io/net.zig @@ -0,0 +1,450 @@ +const builtin = @import("builtin"); +const native_os = builtin.os.tag; +const std = @import("../std.zig"); +const Io = std.Io; + +pub const ListenError = std.net.Address.ListenError || Io.Cancelable; + +pub const ListenOptions = struct { + /// How many connections the kernel will accept on the application's behalf. + /// If more than this many connections pool in the kernel, clients will start + /// seeing "Connection refused". + kernel_backlog: u31 = 128, + /// Sets SO_REUSEADDR and SO_REUSEPORT on POSIX. + /// Sets SO_REUSEADDR on Windows, which is roughly equivalent. + reuse_address: bool = false, + force_nonblocking: bool = false, +}; + +pub const IpAddress = union(enum) { + ip4: Ip4Address, + ip6: Ip6Address, + + /// Parse the given IP address string into an `IpAddress` value. + pub fn parse(name: []const u8, port: u16) !IpAddress { + if (parseIp4(name, port)) |ip4| return ip4 else |err| switch (err) { + error.Overflow, + error.InvalidEnd, + error.InvalidCharacter, + error.Incomplete, + error.NonCanonical, + => {}, + } + + if (parseIp6(name, port)) |ip6| return ip6 else |err| switch (err) { + error.Overflow, + error.InvalidEnd, + error.InvalidCharacter, + error.Incomplete, + error.InvalidIpv4Mapping, + => {}, + } + + return error.InvalidIpAddressFormat; + } + + pub fn parseIp6(buffer: []const u8, port: u16) Ip6Address.ParseError!IpAddress { + return .{ .ip6 = try Ip6Address.parse(buffer, port) }; + } + + pub fn parseIp4(buffer: []const u8, port: u16) Ip4Address.ParseError!IpAddress { + return .{ .ip4 = try Ip4Address.parse(buffer, port) }; + } + + /// Returns the port in native endian. + pub fn getPort(a: IpAddress) u16 { + return switch (a) { + inline .ip4, .ip6 => |x| x.port, + }; + } + + /// `port` is native-endian. + pub fn setPort(a: *IpAddress, port: u16) void { + switch (a) { + inline .ip4, .ip6 => |*x| x.port = port, + } + } + + pub fn format(a: IpAddress, w: *std.io.Writer) std.io.Writer.Error!void { + switch (a) { + .ip4, .ip6 => |x| return x.format(w), + } + } + + pub fn eql(a: IpAddress, b: IpAddress) bool { + return switch (a) { + .ip4 => |a_ip4| switch (b) { + .ip4 => |b_ip4| a_ip4.eql(b_ip4), + else => false, + }, + .ip6 => |a_ip6| switch (b) { + .ip6 => |b_ip6| a_ip6.eql(b_ip6), + else => false, + }, + }; + } + + /// The returned `Server` has an open `stream`. + pub fn listen(address: IpAddress, io: Io, options: ListenOptions) ListenError!Server { + return io.vtable.listen(io.userdata, address, options); + } +}; + +pub const Ip4Address = struct { + bytes: [4]u8, + port: u16, + + pub const ParseError = error{ + Overflow, + InvalidEnd, + InvalidCharacter, + Incomplete, + NonCanonical, + }; + + pub fn parse(buffer: []const u8, port: u16) ParseError!Ip4Address { + var bytes: [4]u8 = @splat(0); + var index: u8 = 0; + var saw_any_digits = false; + var has_zero_prefix = false; + for (buffer) |c| switch (c) { + '.' => { + if (!saw_any_digits) return error.InvalidCharacter; + if (index == 3) return error.InvalidEnd; + index += 1; + saw_any_digits = false; + has_zero_prefix = false; + }, + '0'...'9' => { + if (c == '0' and !saw_any_digits) { + has_zero_prefix = true; + } else if (has_zero_prefix) { + return error.NonCanonical; + } + saw_any_digits = true; + bytes[index] = try std.math.mul(u8, bytes[index], 10); + bytes[index] = try std.math.add(u8, bytes[index], c - '0'); + }, + else => return error.InvalidCharacter, + }; + if (index == 3 and saw_any_digits) return .{ + .bytes = bytes, + .port = port, + }; + return error.Incomplete; + } + + pub fn format(a: Ip4Address, w: *std.io.Writer) std.io.Writer.Error!void { + const bytes = &a.bytes; + try w.print("{d}.{d}.{d}.{d}:{d}", .{ bytes[0], bytes[1], bytes[2], bytes[3], a.port }); + } + + pub fn eql(a: Ip4Address, b: Ip4Address) bool { + const a_int: u32 = @bitCast(a.bytes); + const b_int: u32 = @bitCast(b.bytes); + return a.port == b.port and a_int == b_int; + } +}; + +pub const Ip6Address = struct { + /// Native endian + port: u16, + /// Big endian + bytes: [16]u8, + flowinfo: u32 = 0, + scope_id: u32 = 0, + + pub const ParseError = error{ + Overflow, + InvalidCharacter, + InvalidEnd, + InvalidIpv4Mapping, + Incomplete, + }; + + pub fn parse(buffer: []const u8, port: u16) ParseError!Ip6Address { + var result: Ip6Address = .{ + .port = port, + .bytes = undefined, + }; + var ip_slice: *[16]u8 = &result.bytes; + + var tail: [16]u8 = undefined; + + var x: u16 = 0; + var saw_any_digits = false; + var index: u8 = 0; + var scope_id = false; + var abbrv = false; + for (buffer, 0..) |c, i| { + if (scope_id) { + if (c >= '0' and c <= '9') { + const digit = c - '0'; + { + const ov = @mulWithOverflow(result.scope_id, 10); + if (ov[1] != 0) return error.Overflow; + result.scope_id = ov[0]; + } + { + const ov = @addWithOverflow(result.scope_id, digit); + if (ov[1] != 0) return error.Overflow; + result.scope_id = ov[0]; + } + } else { + return error.InvalidCharacter; + } + } else if (c == ':') { + if (!saw_any_digits) { + if (abbrv) return error.InvalidCharacter; // ':::' + if (i != 0) abbrv = true; + @memset(ip_slice[index..], 0); + ip_slice = tail[0..]; + index = 0; + continue; + } + if (index == 14) { + return error.InvalidEnd; + } + ip_slice[index] = @as(u8, @truncate(x >> 8)); + index += 1; + ip_slice[index] = @as(u8, @truncate(x)); + index += 1; + + x = 0; + saw_any_digits = false; + } else if (c == '%') { + if (!saw_any_digits) { + return error.InvalidCharacter; + } + scope_id = true; + saw_any_digits = false; + } else if (c == '.') { + if (!abbrv or ip_slice[0] != 0xff or ip_slice[1] != 0xff) { + // must start with '::ffff:' + return error.InvalidIpv4Mapping; + } + const start_index = std.mem.lastIndexOfScalar(u8, buffer[0..i], ':').? + 1; + const addr = (Ip4Address.parse(buffer[start_index..], 0) catch { + return error.InvalidIpv4Mapping; + }).bytes; + ip_slice = result.bytes[0..]; + ip_slice[10] = 0xff; + ip_slice[11] = 0xff; + + ip_slice[12] = addr[0]; + ip_slice[13] = addr[1]; + ip_slice[14] = addr[2]; + ip_slice[15] = addr[3]; + return result; + } else { + const digit = try std.fmt.charToDigit(c, 16); + { + const ov = @mulWithOverflow(x, 16); + if (ov[1] != 0) return error.Overflow; + x = ov[0]; + } + { + const ov = @addWithOverflow(x, digit); + if (ov[1] != 0) return error.Overflow; + x = ov[0]; + } + saw_any_digits = true; + } + } + + if (!saw_any_digits and !abbrv) { + return error.Incomplete; + } + if (!abbrv and index < 14) { + return error.Incomplete; + } + + if (index == 14) { + ip_slice[14] = @as(u8, @truncate(x >> 8)); + ip_slice[15] = @as(u8, @truncate(x)); + return result; + } else { + ip_slice[index] = @as(u8, @truncate(x >> 8)); + index += 1; + ip_slice[index] = @as(u8, @truncate(x)); + index += 1; + @memcpy(result.bytes[16 - index ..][0..index], ip_slice[0..index]); + return result; + } + } + + pub fn format(a: Ip6Address, w: *std.io.Writer) std.io.Writer.Error!void { + const bytes = &a.bytes; + if (std.mem.eql(u8, bytes[0..12], &[_]u8{ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff })) { + try w.print("[::ffff:{d}.{d}.{d}.{d}]:{d}", .{ + bytes[12], bytes[13], bytes[14], bytes[15], a.port, + }); + return; + } + const parts: [8]u16 = .{ + std.mem.readInt(u16, bytes[0..2], .big), + std.mem.readInt(u16, bytes[2..4], .big), + std.mem.readInt(u16, bytes[4..6], .big), + std.mem.readInt(u16, bytes[6..8], .big), + std.mem.readInt(u16, bytes[8..10], .big), + std.mem.readInt(u16, bytes[10..12], .big), + std.mem.readInt(u16, bytes[12..14], .big), + std.mem.readInt(u16, bytes[14..16], .big), + }; + + // Find the longest zero run + var longest_start: usize = 8; + var longest_len: usize = 0; + var current_start: usize = 0; + var current_len: usize = 0; + + for (parts, 0..) |part, i| { + if (part == 0) { + if (current_len == 0) { + current_start = i; + } + current_len += 1; + if (current_len > longest_len) { + longest_start = current_start; + longest_len = current_len; + } + } else { + current_len = 0; + } + } + + // Only compress if the longest zero run is 2 or more + if (longest_len < 2) { + longest_start = 8; + longest_len = 0; + } + + try w.writeAll("["); + var i: usize = 0; + var abbrv = false; + while (i < parts.len) : (i += 1) { + if (i == longest_start) { + // Emit "::" for the longest zero run + if (!abbrv) { + try w.writeAll(if (i == 0) "::" else ":"); + abbrv = true; + } + i += longest_len - 1; // Skip the compressed range + continue; + } + if (abbrv) { + abbrv = false; + } + try w.print("{x}", .{parts[i]}); + if (i != parts.len - 1) { + try w.writeAll(":"); + } + } + try w.print("]:{d}", .{a.port}); + } + + pub fn eql(a: Ip6Address, b: Ip6Address) bool { + return a.port == b.port and std.mem.eql(u8, &a.bytes, &b.bytes); + } +}; + +pub const Stream = struct { + /// Underlying platform-defined type which may or may not be + /// interchangeable with a file system file descriptor. + handle: Handle, + + pub const Handle = switch (native_os) { + .windows => std.windows.ws2_32.SOCKET, + else => std.posix.fd_t, + }; + + pub fn close(s: Stream, io: Io) void { + return io.vtable.close(io.userdata, s); + } + + pub const Reader = struct { + io: Io, + interface: Io.Reader, + stream: Stream, + err: ?Error, + + pub const Error = std.net.Stream.ReadError || Io.Cancelable || Io.Writer.Error || error{EndOfStream}; + + pub fn init(stream: Stream, buffer: []u8) Reader { + return .{ + .interface = .{ + .vtable = &.{ .stream = streamImpl }, + .buffer = buffer, + .seek = 0, + .end = 0, + }, + .stream = stream, + .err = null, + }; + } + + fn streamImpl(io_r: *Io.Reader, io_w: *Io.Writer, limit: Io.Limit) Io.Reader.StreamError!usize { + const r: *Reader = @alignCast(@fieldParentPtr("interface", io_r)); + const io = r.io; + return io.vtable.netRead(io.vtable.userdata, r.stream, io_w, limit); + } + }; + + pub const Writer = struct { + io: Io, + interface: Io.Writer, + stream: Stream, + err: ?Error = null, + + pub const Error = std.net.Stream.WriteError || Io.Cancelable; + + pub fn init(stream: Stream, buffer: []u8) Writer { + return .{ + .stream = stream, + .interface = .{ + .vtable = &.{ .drain = drain }, + .buffer = buffer, + }, + }; + } + + fn drain(io_w: *Io.Writer, data: []const []const u8, splat: usize) Io.Writer.Error!usize { + const w: *Writer = @alignCast(@fieldParentPtr("interface", io_w)); + const io = w.io; + const buffered = io_w.buffered(); + const n = try io.vtable.netWrite(io.vtable.userdata, w.stream, buffered, data, splat); + return io_w.consume(n); + } + }; + + pub fn reader(stream: Stream, buffer: []u8) Reader { + return .init(stream, buffer); + } + + pub fn writer(stream: Stream, buffer: []u8) Writer { + return .init(stream, buffer); + } +}; + +pub const Server = struct { + listen_address: IpAddress, + stream: Stream, + + pub const Connection = struct { + stream: Stream, + address: IpAddress, + }; + + pub fn deinit(s: *Server, io: Io) void { + s.stream.close(io); + s.* = undefined; + } + + pub const AcceptError = std.posix.AcceptError || Io.Cancelable; + + /// Blocks until a client connects to the server. The returned `Connection` has + /// an open stream. + pub fn accept(s: *Server, io: Io) AcceptError!Connection { + return io.vtable.accept(io, s); + } +};