std.net.Stream: implement Reader

This commit is contained in:
Andrew Kelley 2025-04-13 20:57:09 -07:00
parent f3d0fc7a66
commit c872a9fd49
2 changed files with 57 additions and 43 deletions

View File

@ -1631,7 +1631,7 @@ fn posReadVec(context: *anyopaque, data: []const []u8, offset: u64) anyerror!std
};
}
fn streamRead(
pub fn streamRead(
context: ?*anyopaque,
bw: *std.io.BufferedWriter,
limit: std.io.Reader.Limit,
@ -1644,7 +1644,7 @@ fn streamRead(
};
}
fn streamReadVec(context: ?*anyopaque, data: []const []u8) anyerror!std.io.Reader.Status {
pub fn streamReadVec(context: ?*anyopaque, data: []const []u8) anyerror!std.io.Reader.Status {
const handle = opaqueToHandle(context);
if (is_windows) {

View File

@ -1837,10 +1837,20 @@ pub const Stream = struct {
Unexpected,
};
pub const Reader = io.Reader(Stream, ReadError, read);
pub fn reader(self: Stream) Reader {
return .{ .context = self };
pub fn reader(stream: Stream) std.io.Reader {
return .{
.context = handleToOpaque(stream.handle),
.vtable = switch (native_os) {
.windows => &.{
.read = windows_read,
.readv = windows_readv,
},
else => &.{
.read = std.fs.File.streamRead,
.readv = std.fs.File.streamReadVec,
},
},
};
}
pub fn writer(stream: Stream) std.io.Writer {
@ -1859,46 +1869,50 @@ pub const Stream = struct {
};
}
pub fn read(self: Stream, buffer: []u8) ReadError!usize {
if (native_os == .windows) {
return windows.ReadFile(self.handle, buffer, null);
}
return posix.read(self.handle, buffer);
fn windows_read(
context: ?*anyopaque,
bw: *std.io.BufferedWriter,
limit: std.io.Reader.Limit,
) anyerror!std.io.Reader.Status {
const buf = limit.slice(try bw.writableSlice(1));
const status = try windows_readv(context, &.{buf});
bw.advance(status.len);
return status;
}
pub fn readv(s: Stream, iovecs: []const posix.iovec) ReadError!usize {
if (native_os == .windows) {
// TODO improve this to use ReadFileScatter
if (iovecs.len == 0) return @as(usize, 0);
const first = iovecs[0];
return windows.ReadFile(s.handle, first.base[0..first.len], null);
fn windows_readv(context: ?*anyopaque, data: []const []u8) anyerror!std.io.Reader.Status {
var iovecs: [max_buffers_len]windows.WSABUF = undefined;
var iovecs_i: usize = 0;
for (data) |d| {
// In case Windows checks pointer address before length, we must omit
// length-zero vectors.
if (d.len == 0) continue;
iovecs[iovecs_i] = .{ .buf = d.ptr, .len = d.len };
iovecs_i += 1;
if (iovecs_i >= iovecs.len) break;
}
return posix.readv(s.handle, iovecs);
}
/// Returns the number of bytes read. If the number read is smaller than
/// `buffer.len`, it means the stream reached the end. Reaching the end of
/// a stream is not an error condition.
pub fn readAll(s: Stream, buffer: []u8) ReadError!usize {
return readAtLeast(s, buffer, buffer.len);
}
/// Returns the number of bytes read, calling the underlying read function
/// the minimal number of times until the buffer has at least `len` bytes
/// filled. If the number read is less than `len` it means the stream
/// reached the end. Reaching the end of the stream is not an error
/// condition.
pub fn readAtLeast(s: Stream, buffer: []u8, len: usize) ReadError!usize {
assert(len <= buffer.len);
var index: usize = 0;
while (index < len) {
const amt = try s.read(buffer[index..]);
if (amt == 0) break;
index += amt;
}
return index;
const bufs = iovecs[0..iovecs_i];
if (bufs.len == 0) return .{}; // Prevent false positive end detection on empty `data`.
const handle = opaqueToHandle(context);
var n: u32 = undefined;
var flags: u32 = 0;
const rc = windows.ws2_32.WSARecvFrom(handle, bufs.ptr, bufs.len, &n, &flags, null, null, null, null);
if (rc != 0) switch (windows.ws2_32.WSAGetLastError()) {
.WSAECONNRESET => return error.ConnectionResetByPeer,
.WSAEFAULT => unreachable, // a pointer is not completely contained in user address space.
.WSAEINPROGRESS, .WSAEINTR => unreachable, // deprecated and removed in WSA 2.2
.WSAEINVAL => return error.SocketNotBound,
.WSAEMSGSIZE => return error.MessageTooBig,
.WSAENETDOWN => return error.NetworkSubsystemFailed,
.WSAENETRESET => return error.ConnectionResetByPeer,
.WSAENOTCONN => return error.SocketNotConnected,
.WSAEWOULDBLOCK => return error.WouldBlock,
.WSANOTINITIALISED => unreachable, // WSAStartup must be called before this function
.WSA_IO_PENDING => unreachable, // not using overlapped I/O
.WSA_OPERATION_ABORTED => unreachable, // not using overlapped I/O
else => |err| return windows.unexpectedWSAError(err),
};
return .{ .len = n, .end = n == 0 };
}
fn windows_writeSplat(context: *anyopaque, data: []const []const u8, splat: usize) anyerror!usize {