diff --git a/lib/std/Io.zig b/lib/std/Io.zig index 677d197422..8d6be76173 100644 --- a/lib/std/Io.zig +++ b/lib/std/Io.zig @@ -680,8 +680,9 @@ pub const VTable = struct { netConnectUnix: *const fn (?*anyopaque, net.UnixAddress) net.UnixAddress.ConnectError!net.Socket.Handle, netSend: *const fn (?*anyopaque, net.Socket.Handle, []net.OutgoingMessage, net.SendFlags) struct { ?net.Socket.SendError, usize }, netReceive: *const fn (?*anyopaque, net.Socket.Handle, message_buffer: []net.IncomingMessage, data_buffer: []u8, net.ReceiveFlags, Timeout) struct { ?net.Socket.ReceiveTimeoutError, usize }, - netRead: *const fn (?*anyopaque, src: net.Stream, data: [][]u8) net.Stream.Reader.Error!usize, - netWrite: *const fn (?*anyopaque, dest: net.Stream, header: []const u8, data: []const []const u8, splat: usize) net.Stream.Writer.Error!usize, + /// Returns 0 on end of stream. + netRead: *const fn (?*anyopaque, src: net.Socket.Handle, data: [][]u8) net.Stream.Reader.Error!usize, + netWrite: *const fn (?*anyopaque, dest: net.Socket.Handle, header: []const u8, data: []const []const u8, splat: usize) net.Stream.Writer.Error!usize, netClose: *const fn (?*anyopaque, handle: net.Socket.Handle) void, netInterfaceNameResolve: *const fn (?*anyopaque, *const net.Interface.Name) net.Interface.Name.ResolveError!net.Interface, netInterfaceName: *const fn (?*anyopaque, net.Interface) net.Interface.NameError!net.Interface.Name, diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig index fa3c63d609..2c05ff279c 100644 --- a/lib/std/Io/Threaded.zig +++ b/lib/std/Io/Threaded.zig @@ -1986,9 +1986,8 @@ fn netAcceptPosix(userdata: ?*anyopaque, listen_fd: Io.net.Socket.Handle) Io.net } }; } -fn netReadPosix(userdata: ?*anyopaque, stream: Io.net.Stream, data: [][]u8) Io.net.Stream.Reader.Error!usize { +fn netReadPosix(userdata: ?*anyopaque, fd: Io.net.Socket.Handle, data: [][]u8) Io.net.Stream.Reader.Error!usize { const pool: *Pool = @ptrCast(@alignCast(userdata)); - const fd = stream.socket.handle; var iovecs_buffer: [max_iovecs_len]posix.iovec = undefined; var i: usize = 0; @@ -2006,11 +2005,9 @@ fn netReadPosix(userdata: ?*anyopaque, stream: Io.net.Stream, data: [][]u8) Io.n try pool.checkCancel(); var n: usize = undefined; switch (std.os.wasi.fd_read(fd, dest.ptr, dest.len, &n)) { - .SUCCESS => { - if (n == 0) return error.EndOfStream; - return n; - }, + .SUCCESS => return n, .INTR => continue, + .INVAL => |err| return errnoBug(err), .FAULT => |err| return errnoBug(err), .AGAIN => |err| return errnoBug(err), @@ -2029,12 +2026,9 @@ fn netReadPosix(userdata: ?*anyopaque, stream: Io.net.Stream, data: [][]u8) Io.n try pool.checkCancel(); const rc = posix.system.readv(fd, dest.ptr, @intCast(dest.len)); switch (posix.errno(rc)) { - .SUCCESS => { - const n: usize = @intCast(rc); - if (n == 0) return error.EndOfStream; - return n; - }, + .SUCCESS => return @intCast(rc), .INTR => continue, + .INVAL => |err| return errnoBug(err), .FAULT => |err| return errnoBug(err), .AGAIN => |err| return errnoBug(err), @@ -2359,7 +2353,7 @@ fn netReceive( fn netWritePosix( userdata: ?*anyopaque, - stream: Io.net.Stream, + fd: Io.net.Socket.Handle, header: []const u8, data: []const []const u8, splat: usize, @@ -2406,7 +2400,7 @@ fn netWritePosix( }, }; const flags = posix.MSG.NOSIGNAL; - return posix.sendmsg(stream.socket.handle, &msg, flags); + return posix.sendmsg(fd, &msg, flags); } fn addBuf(v: []posix.iovec_const, i: *@FieldType(posix.msghdr_const, "iovlen"), bytes: []const u8) void { diff --git a/lib/std/Io/net.zig b/lib/std/Io/net.zig index d76a7a9b34..dca4dda0a5 100644 --- a/lib/std/Io/net.zig +++ b/lib/std/Io/net.zig @@ -1076,6 +1076,8 @@ pub const Socket = struct { pub const Stream = struct { socket: Socket, + const max_iovecs_len = 8; + pub fn close(s: *Stream, io: Io) void { io.vtable.netClose(io.userdata, s.socket.handle); s.* = undefined; @@ -1097,7 +1099,6 @@ pub const Stream = struct { /// from it. AccessDenied, NetworkDown, - EndOfStream, } || Io.Cancelable || Io.UnexpectedError; pub fn init(stream: Stream, io: Io, buffer: []u8) Reader { @@ -1128,10 +1129,22 @@ pub const Stream = struct { fn readVec(io_r: *Io.Reader, data: [][]u8) Io.Reader.Error!usize { const r: *Reader = @alignCast(@fieldParentPtr("interface", io_r)); const io = r.io; - return io.vtable.netRead(io.userdata, r.stream, data) catch |err| { + var iovecs_buffer: [max_iovecs_len][]u8 = undefined; + const dest_n, const data_size = try io_r.writableVector(&iovecs_buffer, data); + const dest = iovecs_buffer[0..dest_n]; + assert(dest[0].len > 0); + const n = io.vtable.netRead(io.userdata, r.stream.socket.handle, dest) catch |err| { r.err = err; return error.ReadFailed; }; + if (n == 0) { + return error.EndOfStream; + } + if (n > data_size) { + r.interface.end += n - data_size; + return data_size; + } + return n; } }; @@ -1166,7 +1179,8 @@ pub const Stream = struct { const w: *Writer = @alignCast(@fieldParentPtr("interface", io_w)); const io = w.io; const buffered = io_w.buffered(); - const n = io.vtable.netWrite(io.userdata, w.stream, buffered, data, splat) catch |err| { + const handle = w.stream.socket.handle; + const n = io.vtable.netWrite(io.userdata, handle, buffered, data, splat) catch |err| { w.err = err; return error.WriteFailed; };