std.io.poll: update for BufferedReader

Only POSIX is updated so far.

Also implement `BufferedReader.readVec`.
Andrew Kelley 2025-04-19 21:53:52 -07:00
parent 9fe0ce377c
commit 24441b184f
4 changed files with 277 additions and 100 deletions
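For orientation, a hedged caller-side sketch of what this change means for users of `std.io.poll`: each stream is now exposed as a `*std.io.BufferedReader` instead of a `LinearFifo`, and `error.ReadFailed` is the cue to call `poll()` again. The `StreamId` enum, the child-process setup, and the `parseOneMessage` helper are illustrative only; the `poll`/`reader`/`deinit` calls are the API touched by this commit.

const std = @import("std");

const StreamId = enum { stdout, stderr };

fn drainChild(gpa: std.mem.Allocator, child: *std.process.Child) !void {
    var poller = std.io.poll(gpa, StreamId, .{
        .stdout = child.stdout.?,
        .stderr = child.stderr.?,
    });
    defer poller.deinit();

    // Parse straight from the buffered reader, and poll again whenever a
    // read would have blocked (surfaced as error.ReadFailed).
    while (true) {
        parseOneMessage(poller.reader(.stdout)) catch |err| switch (err) {
            error.EndOfStream => break,
            error.ReadFailed => if (!(try poller.poll())) break,
            else => |e| return e,
        };
    }
}

fn parseOneMessage(br: *std.io.BufferedReader) error{ EndOfStream, ReadFailed, OutOfMemory }!void {
    // Stand-in for a real parser such as receiveWasmMessage below.
    _ = br;
}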


@@ -320,69 +320,19 @@ fn buildWasmBinary(
try sendMessage(child.stdin.?, .update);
try sendMessage(child.stdin.?, .exit);
const Header = std.zig.Server.Message.Header;
var result: ?Cache.Path = null;
var result_error_bundle = std.zig.ErrorBundle.empty;
const stdout = poller.fifo(.stdout);
poll: while (true) {
while (stdout.readableLength() < @sizeOf(Header)) {
if (!(try poller.poll())) break :poll;
}
const header = stdout.reader().readStruct(Header) catch unreachable;
while (stdout.readableLength() < header.bytes_len) {
if (!(try poller.poll())) break :poll;
}
const body = stdout.readableSliceOfLen(header.bytes_len);
switch (header.tag) {
.zig_version => {
if (!std.mem.eql(u8, builtin.zig_version_string, body)) {
return error.ZigProtocolVersionMismatch;
}
},
.error_bundle => {
const EbHdr = std.zig.Server.Message.ErrorBundle;
const eb_hdr = @as(*align(1) const EbHdr, @ptrCast(body));
const extra_bytes =
body[@sizeOf(EbHdr)..][0 .. @sizeOf(u32) * eb_hdr.extra_len];
const string_bytes =
body[@sizeOf(EbHdr) + extra_bytes.len ..][0..eb_hdr.string_bytes_len];
// TODO: use @ptrCast when the compiler supports it
const unaligned_extra = std.mem.bytesAsSlice(u32, extra_bytes);
const extra_array = try arena.alloc(u32, unaligned_extra.len);
@memcpy(extra_array, unaligned_extra);
result_error_bundle = .{
.string_bytes = try arena.dupe(u8, string_bytes),
.extra = extra_array,
};
},
.emit_digest => {
const EmitDigest = std.zig.Server.Message.EmitDigest;
const emit_digest = @as(*align(1) const EmitDigest, @ptrCast(body));
if (!emit_digest.flags.cache_hit) {
std.log.info("source changes detected; rebuilt wasm component", .{});
}
const digest = body[@sizeOf(EmitDigest)..][0..Cache.bin_digest_len];
result = .{
.root_dir = Cache.Directory.cwd(),
.sub_path = try std.fs.path.join(arena, &.{
context.global_cache_path, "o" ++ std.fs.path.sep_str ++ Cache.binToHex(digest.*),
}),
};
},
else => {}, // ignore other messages
}
stdout.discard(body.len);
while (true) {
receiveWasmMessage(arena, context, poller.reader(.stdout), &result, &result_error_bundle) catch |err| switch (err) {
error.EndOfStream => break,
error.ReadFailed => if (!(try poller.poll())) break,
else => |e| return e,
};
}
const stderr = poller.fifo(.stderr);
if (stderr.readableLength() > 0) {
const owned_stderr = try stderr.toOwnedSlice();
defer gpa.free(owned_stderr);
std.debug.print("{s}", .{owned_stderr});
if (poller.reader(.stderr).buffer.len > 0) {
std.debug.print("{s}", .{poller.reader(.stderr).bufferContents()});
}
// Send EOF to stdin.
@@ -426,6 +376,53 @@ fn buildWasmBinary(
};
}
fn receiveWasmMessage(
arena: Allocator,
context: *Context,
br: *std.io.BufferedReader,
result: *?Cache.Path,
result_error_bundle: *std.zig.ErrorBundle,
) !void {
// Ensure that we will be able to read the entire message without blocking.
const header = try br.peekStructEndian(std.zig.Server.Message.Header, .little);
try br.fill(@sizeOf(std.zig.Server.Message.Header) + header.bytes_len);
br.toss(@sizeOf(std.zig.Server.Message.Header));
switch (header.tag) {
.zig_version => {
const body = try br.take(header.bytes_len);
if (!std.mem.eql(u8, builtin.zig_version_string, body)) {
return error.ZigProtocolVersionMismatch;
}
},
.error_bundle => {
const eb_hdr = try br.takeStructEndian(std.zig.Server.Message.ErrorBundle, .little);
const extra_array = try br.readArrayEndianAlloc(arena, u32, eb_hdr.extra_len, .little);
const string_bytes = try br.readAlloc(arena, eb_hdr.string_bytes_len);
result_error_bundle.* = .{
.string_bytes = string_bytes,
.extra = extra_array,
};
},
.emit_digest => {
const emit_digest = try br.takeStructEndian(std.zig.Server.Message.EmitDigest, .little);
if (!emit_digest.flags.cache_hit) {
std.log.info("source changes detected; rebuilt wasm component", .{});
}
const digest = try br.takeArray(Cache.bin_digest_len);
result.* = .{
.root_dir = Cache.Directory.cwd(),
.sub_path = try std.fs.path.join(arena, &.{
context.global_cache_path, "o" ++ std.fs.path.sep_str ++ Cache.binToHex(digest.*),
}),
};
},
else => {
// Ignore other messages.
try br.discard(header.bytes_len);
},
}
}
fn sendMessage(file: std.fs.File, tag: std.zig.Client.Message.Tag) !void {
const header: std.zig.Client.Message.Header = .{
.tag = tag,

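The key idea in `receiveWasmMessage` above is BufferedReader's peek/fill/toss combination: nothing is consumed until the whole message is known to be buffered, so the caller can safely retry after `poller.poll()`. A condensed sketch of that idiom, reusing `std.zig.Server.Message.Header` from the code above (the `takeOneBody` wrapper is illustrative only):

const std = @import("std");

fn takeOneBody(br: *std.io.BufferedReader) ![]u8 {
    const Header = std.zig.Server.Message.Header;
    // Peek rather than take: if fill() fails with error.ReadFailed before
    // the full frame has arrived, no bytes have been consumed yet, so the
    // same call can simply be repeated after the next successful poll().
    const header = try br.peekStructEndian(Header, .little);
    try br.fill(@sizeOf(Header) + header.bytes_len);
    // From here on nothing can block: drop the header, hand out the body.
    br.toss(@sizeOf(Header));
    return br.take(header.bytes_len);
}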

@@ -1341,14 +1341,43 @@ pub const Writer = struct {
}
};
/// Defaults to positional reading; falls back to streaming.
///
/// Positional is more threadsafe, since the global seek position is not
/// affected.
pub fn reader(file: File) Reader {
return .{ .file = file };
}
/// Positional is more threadsafe, since the global seek position is not
/// affected, but when such syscalls are not available, preemptively choosing
/// `Reader.Mode.streaming` will skip a failed syscall.
pub fn readerStreaming(file: File) Reader {
return .{
.file = file,
.mode = .streaming,
.seek_err = error.Unseekable,
};
}
/// Defaults to positional reading; falls back to streaming.
///
/// Positional is more threadsafe, since the global seek position is not
/// affected.
pub fn writer(file: File) Writer {
return .{ .file = file };
}
/// Positional is more threadsafe, since the global seek position is not
/// affected, but when such syscalls are not available, preemptively choosing
/// `Writer.Mode.streaming` will skip a failed syscall.
pub fn writerStreaming(file: File) Writer {
return .{
.file = file,
.mode = .streaming,
};
}
const range_off: windows.LARGE_INTEGER = 0;
const range_len: windows.LARGE_INTEGER = 1;

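The new streaming constructors matter for pipes and other unseekable files. A minimal sketch (the `pipe_in`/`log_out` handles and the `openStreaming` wrapper are hypothetical):

const std = @import("std");

fn openStreaming(pipe_in: std.fs.File, log_out: std.fs.File) void {
    // An unseekable file would fail the positional path's first syscall and
    // only then fall back to streaming; constructing in streaming mode up
    // front skips that round trip.
    const in = pipe_in.readerStreaming();
    const out = log_out.writerStreaming();
    _ = .{ in, out };
}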

@@ -46,54 +46,57 @@ pub const BufferedAtomicFile = @import("io/buffered_atomic_file.zig").BufferedAt
pub const tty = @import("io/tty.zig");
pub fn poll(
allocator: Allocator,
gpa: Allocator,
comptime StreamEnum: type,
files: PollFiles(StreamEnum),
) Poller(StreamEnum) {
const enum_fields = @typeInfo(StreamEnum).@"enum".fields;
var result: Poller(StreamEnum) = undefined;
if (is_windows) result.windows = .{
.first_read_done = false,
.overlapped = [1]windows.OVERLAPPED{
mem.zeroes(windows.OVERLAPPED),
} ** enum_fields.len,
.small_bufs = undefined,
.active = .{
.count = 0,
.handles_buf = undefined,
.stream_map = undefined,
},
var result: Poller(StreamEnum) = .{
.gpa = gpa,
.readers = undefined,
.poll_fds = undefined,
.windows = if (is_windows) .{
.first_read_done = false,
.overlapped = [1]windows.OVERLAPPED{
mem.zeroes(windows.OVERLAPPED),
} ** enum_fields.len,
.small_bufs = undefined,
.active = .{
.count = 0,
.handles_buf = undefined,
.stream_map = undefined,
},
} else {},
};
inline for (0..enum_fields.len) |i| {
result.fifos[i] = .{
.allocator = allocator,
.buf = &.{},
.head = 0,
.count = 0,
inline for (enum_fields, 0..) |field, i| {
result.readers[i] = .{
.unbuffered_reader = .failing,
.buffer = &.{},
.end = 0,
.seek = 0,
};
if (is_windows) {
result.windows.active.handles_buf[i] = @field(files, enum_fields[i].name).handle;
result.windows.active.handles_buf[i] = @field(files, field.name).handle;
} else {
result.poll_fds[i] = .{
.fd = @field(files, enum_fields[i].name).handle,
.fd = @field(files, field.name).handle,
.events = posix.POLL.IN,
.revents = undefined,
};
}
}
return result;
}
pub const PollFifo = std.fifo.LinearFifo(u8, .Dynamic);
pub fn Poller(comptime StreamEnum: type) type {
return struct {
const enum_fields = @typeInfo(StreamEnum).@"enum".fields;
const PollFd = if (is_windows) void else posix.pollfd;
fifos: [enum_fields.len]PollFifo,
gpa: Allocator,
readers: [enum_fields.len]BufferedReader,
poll_fds: [enum_fields.len]PollFd,
windows: if (is_windows) struct {
first_read_done: bool,
@@ -105,7 +108,7 @@ pub fn Poller(comptime StreamEnum: type) type {
stream_map: [enum_fields.len]StreamEnum,
pub fn removeAt(self: *@This(), index: u32) void {
std.debug.assert(index < self.count);
assert(index < self.count);
for (index + 1..self.count) |i| {
self.handles_buf[i - 1] = self.handles_buf[i];
self.stream_map[i - 1] = self.stream_map[i];
@@ -118,13 +121,14 @@ pub fn Poller(comptime StreamEnum: type) type {
const Self = @This();
pub fn deinit(self: *Self) void {
const gpa = self.gpa;
if (is_windows) {
// cancel any pending IO to prevent clobbering OVERLAPPED value
for (self.windows.active.handles_buf[0..self.windows.active.count]) |h| {
_ = windows.kernel32.CancelIo(h);
}
}
inline for (&self.fifos) |*q| q.deinit();
inline for (&self.readers) |*br| gpa.free(br.buffer);
self.* = undefined;
}
@@ -144,8 +148,8 @@ pub fn Poller(comptime StreamEnum: type) type {
}
}
pub inline fn fifo(self: *Self, comptime which: StreamEnum) *PollFifo {
return &self.fifos[@intFromEnum(which)];
pub inline fn reader(self: *Self, comptime which: StreamEnum) *BufferedReader {
return &self.readers[@intFromEnum(which)];
}
fn pollWindows(self: *Self, nanoseconds: ?u64) !bool {
@@ -236,6 +240,7 @@ pub fn Poller(comptime StreamEnum: type) type {
}
fn pollPosix(self: *Self, nanoseconds: ?u64) !bool {
const gpa = self.gpa;
// We ask for ensureUnusedCapacity with this much extra space. This
// has more of an effect on small reads because once the reads
// start to get larger the amount of space an ArrayList will
@@ -255,18 +260,18 @@ pub fn Poller(comptime StreamEnum: type) type {
}
var keep_polling = false;
inline for (&self.poll_fds, &self.fifos) |*poll_fd, *q| {
inline for (&self.poll_fds, &self.readers) |*poll_fd, *br| {
// Try reading whatever is available before checking the error
// conditions.
// It's still possible to read after a POLL.HUP is received,
// always check if there's some data waiting to be read first.
if (poll_fd.revents & posix.POLL.IN != 0) {
const buf = try q.writableWithSize(bump_amt);
const buf = try br.writableSliceGreedyAlloc(gpa, bump_amt);
const amt = posix.read(poll_fd.fd, buf) catch |err| switch (err) {
error.BrokenPipe => 0, // Handle the same as EOF.
else => |e| return e,
};
q.update(amt);
br.advanceBufferEnd(amt);
if (amt == 0) {
// Remove the fd when the EOF condition is met.
poll_fd.fd = -1;
@@ -297,14 +302,14 @@ var win_dummy_bytes_read: u32 = undefined;
fn windowsAsyncReadToFifoAndQueueSmallRead(
handle: windows.HANDLE,
overlapped: *windows.OVERLAPPED,
fifo: *PollFifo,
br: *BufferedReader,
small_buf: *[128]u8,
bump_amt: usize,
) !enum { empty, populated, closed_populated, closed } {
var read_any_data = false;
while (true) {
const fifo_read_pending = while (true) {
const buf = try fifo.writableWithSize(bump_amt);
const buf = try br.writableWithSize(bump_amt);
const buf_len = math.cast(u32, buf.len) orelse math.maxInt(u32);
if (0 == windows.kernel32.ReadFile(
@@ -326,7 +331,7 @@ fn windowsAsyncReadToFifoAndQueueSmallRead(
};
read_any_data = true;
fifo.update(num_bytes_read);
br.update(num_bytes_read);
if (num_bytes_read == buf_len) {
// We filled the buffer, so there's probably more data available.
@@ -356,7 +361,7 @@ fn windowsAsyncReadToFifoAndQueueSmallRead(
.aborted => break :cancel_read,
};
read_any_data = true;
fifo.update(num_bytes_read);
br.update(num_bytes_read);
}
// Try to queue the 1-byte read.
@@ -381,7 +386,7 @@ fn windowsAsyncReadToFifoAndQueueSmallRead(
.closed => return if (read_any_data) .closed_populated else .closed,
.aborted => unreachable,
};
try fifo.write(small_buf[0..num_bytes_read]);
try br.write(small_buf[0..num_bytes_read]);
read_any_data = true;
}
}

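For the POSIX path, the producer side of the new flow is worth spelling out: `pollPosix` grows the reader's buffer, reads into the spare capacity, then publishes the bytes, using the `writableSliceGreedyAlloc`/`advanceBufferEnd` pair introduced further down in BufferedReader.zig. A minimal sketch of that pattern (the `fillFromFd` name and the 512-byte bump amount are illustrative only):

const std = @import("std");

fn fillFromFd(gpa: std.mem.Allocator, br: *std.io.BufferedReader, fd: std.posix.fd_t) !bool {
    // Expose (and if necessary allocate) spare capacity in br.buffer.
    const dest = try br.writableSliceGreedyAlloc(gpa, 512);
    const amt = std.posix.read(fd, dest) catch |err| switch (err) {
        error.BrokenPipe => 0, // handled the same as EOF, as in pollPosix
        else => |e| return e,
    };
    // Make the freshly read bytes visible to take/peek consumers.
    br.advanceBufferEnd(amt);
    return amt != 0; // false means EOF; pollPosix stops polling that fd
}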

@@ -6,6 +6,7 @@ const assert = std.debug.assert;
const testing = std.testing;
const BufferedWriter = std.io.BufferedWriter;
const Reader = std.io.Reader;
const Allocator = std.mem.Allocator;
const BufferedReader = @This();
@@ -46,13 +47,17 @@ pub fn reader(br: *BufferedReader) Reader {
};
}
pub fn readVec(br: *BufferedReader, data: []const []u8) Reader.Error!usize {
return passthruReadVec(br, data);
}
fn passthruRead(ctx: ?*anyopaque, bw: *BufferedWriter, limit: Reader.Limit) Reader.RwError!usize {
const br: *BufferedReader = @alignCast(@ptrCast(ctx));
const buffer = br.buffer[0..br.end];
const buffered = buffer[br.seek..];
const limited = buffered[0..limit.min(buffered.len)];
if (limited.len > 0) {
const n = try bw.writeSplat(limited, 1);
const n = try bw.write(limited);
br.seek += n;
return n;
}
@@ -61,9 +66,44 @@ fn passthruRead(ctx: ?*anyopaque, bw: *BufferedWriter, limit: Reader.Limit) Read
fn passthruReadVec(ctx: ?*anyopaque, data: []const []u8) Reader.Error!usize {
const br: *BufferedReader = @alignCast(@ptrCast(ctx));
_ = br;
_ = data;
@panic("TODO");
var total: usize = 0;
for (data, 0..) |buf, i| {
const buffered = br.buffer[br.seek..br.end];
const copy_len = @min(buffered.len, buf.len);
@memcpy(buf[0..copy_len], buffered[0..copy_len]);
total += copy_len;
br.seek += copy_len;
if (copy_len < buf.len) {
br.seek = 0;
br.end = 0;
var vecs: [8][]u8 = undefined; // Arbitrarily chosen value.
vecs[0] = buf[copy_len..];
const vecs_len: usize = @min(vecs.len, data.len - i);
var vec_data_len: usize = vecs[0].len;
for (&vecs[1..vecs_len], data[i + 1 ..][0 .. vecs_len - 1]) |*v, d| {
vec_data_len += d.len;
v.* = d;
}
if (vecs_len < vecs.len) {
vecs[vecs_len] = br.buffer;
const n = try br.unbuffered_reader.readVec(vecs[0 .. vecs_len + 1]);
total += @min(n, vec_data_len);
br.end = n -| vec_data_len;
return total;
}
if (vecs[vecs.len - 1].len >= br.buffer.len) {
total += try br.unbuffered_reader.readVec(&vecs);
return total;
}
vec_data_len -= vecs[vecs.len - 1].len;
vecs[vecs.len - 1] = br.buffer;
const n = try br.unbuffered_reader.readVec(&vecs);
total += @min(n, vec_data_len);
br.end = n -| vec_data_len;
return total;
}
}
return total;
}
pub fn seekBy(br: *BufferedReader, seek_by: i64) !void {
@@ -147,7 +187,7 @@ pub fn take(br: *BufferedReader, n: usize) Reader.Error![]u8 {
}
/// Returns the next `n` bytes from `unbuffered_reader` as an array, filling
/// the buffer as necessary.
/// the buffer as necessary and advancing the seek position `n` bytes.
///
/// Asserts that the `BufferedReader` was initialized with a buffer capacity at
/// least as big as `n`.
@@ -161,6 +201,22 @@ pub fn takeArray(br: *BufferedReader, comptime n: usize) Reader.Error!*[n]u8 {
return (try br.take(n))[0..n];
}
/// Returns the next `n` bytes from `unbuffered_reader` as an array, filling
/// the buffer as necessary, without advancing the seek position.
///
/// Asserts that the `BufferedReader` was initialized with a buffer capacity at
/// least as big as `n`.
///
/// If there are fewer than `n` bytes left in the stream, `error.EndOfStream`
/// is returned instead.
///
/// See also:
/// * `peek`
/// * `takeArray`
pub fn peekArray(br: *BufferedReader, comptime n: usize) Reader.Error!*[n]u8 {
return (try br.peek(n))[0..n];
}
/// Skips the next `n` bytes from the stream, advancing the seek position.
///
/// Unlike `toss` which is infallible, in this function `n` can be any amount.
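A tiny sketch of the take/peek distinction, as a standalone test using the same fixed-buffer initialization as the tests below (`initFixed`); it is not part of this commit:

const std = @import("std");

test "peekArray leaves the seek position alone" {
    var br: std.io.BufferedReader = undefined;
    br.initFixed("abcdef");
    try std.testing.expectEqualStrings("ab", try br.peekArray(2)); // peek: not consumed
    try std.testing.expectEqualStrings("ab", try br.takeArray(2)); // take: consumed now
    try std.testing.expectEqualStrings("cd", try br.takeArray(2));
}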
@@ -255,6 +311,31 @@ pub fn readShort(br: *BufferedReader, buffer: []u8) Reader.ShortError!usize {
@panic("TODO");
}
/// The function is inline to avoid the dead code in case `endian` is
/// comptime-known and matches host endianness.
pub inline fn readArrayEndianAlloc(
br: *BufferedReader,
allocator: Allocator,
Elem: type,
len: usize,
endian: std.builtin.Endian,
) ReadAllocError![]Elem {
const dest = try allocator.alloc(Elem, len);
errdefer allocator.free(dest);
try read(br, @ptrCast(dest));
if (native_endian != endian) std.mem.byteSwapAllFields(Elem, dest);
return dest;
}
pub const ReadAllocError = Reader.Error || Allocator.Error;
pub fn readAlloc(br: *BufferedReader, allocator: Allocator, len: usize) ReadAllocError![]u8 {
const dest = try allocator.alloc(u8, len);
errdefer allocator.free(dest);
try read(br, dest);
return dest;
}
pub const DelimiterInclusiveError = error{
/// See the `Reader` implementation for detailed diagnostics.
ReadFailed,
@@ -498,12 +579,29 @@ pub fn takeVarInt(br: *BufferedReader, comptime Int: type, endian: std.builtin.E
}
/// Asserts the buffer was initialized with a capacity at least `@sizeOf(T)`.
///
/// Advances the seek position.
///
/// See also:
/// * `peekStruct`
pub fn takeStruct(br: *BufferedReader, comptime T: type) Reader.Error!*align(1) T {
// Only extern and packed structs have defined in-memory layout.
comptime assert(@typeInfo(T).@"struct".layout != .auto);
return @ptrCast(try br.takeArray(@sizeOf(T)));
}
/// Asserts the buffer was initialized with a capacity at least `@sizeOf(T)`.
///
/// Does not advance the seek position.
///
/// See also:
/// * `takeStruct`
pub fn peekStruct(br: *BufferedReader, comptime T: type) Reader.Error!*align(1) T {
// Only extern and packed structs have defined in-memory layout.
comptime assert(@typeInfo(T).@"struct".layout != .auto);
return @ptrCast(try br.peekArray(@sizeOf(T)));
}
/// Asserts the buffer was initialized with a capacity at least `@sizeOf(T)`.
///
/// This function is inline to avoid referencing `std.mem.byteSwapAllFields`
@@ -514,6 +612,16 @@ pub inline fn takeStructEndian(br: *BufferedReader, comptime T: type, endian: st
return res;
}
/// Asserts the buffer was initialized with a capacity at least `@sizeOf(T)`.
///
/// This function is inline to avoid referencing `std.mem.byteSwapAllFields`
/// when `endian` is comptime-known and matches the host endianness.
pub inline fn peekStructEndian(br: *BufferedReader, comptime T: type, endian: std.builtin.Endian) Reader.Error!T {
var res = (try br.peekStruct(T)).*;
if (native_endian != endian) std.mem.byteSwapAllFields(T, &res);
return res;
}
/// Reads an integer with the same size as the given enum's tag type. If the
/// integer matches an enum tag, casts the integer to the enum tag and returns
/// it. Otherwise, returns `error.InvalidEnumTag`.
@@ -536,6 +644,28 @@ pub fn takeLeb128(br: *BufferedReader, comptime Result: type) TakeLeb128Error!Re
} }))) orelse error.Overflow;
}
/// Returns a slice into the unused capacity of `buffer` with at least
/// `min_len` bytes, extending `buffer` by resizing it with `gpa` as necessary.
///
/// After calling this function, typically the caller will follow up with a
/// call to `advanceBufferEnd` to report the actual number of bytes buffered.
pub fn writableSliceGreedyAlloc(
br: *BufferedReader,
allocator: Allocator,
min_len: usize,
) error{OutOfMemory}![]u8 {
_ = br;
_ = allocator;
_ = min_len;
@panic("TODO");
}
/// After writing directly into the unused capacity of `buffer`, this function
/// updates `end` so that users of `BufferedReader` can receive the data.
pub fn advanceBufferEnd(br: *BufferedReader, n: usize) void {
br.end += n;
}
fn takeMultipleOf7Leb128(br: *BufferedReader, comptime Result: type) TakeLeb128Error!Result {
const result_info = @typeInfo(Result).int;
comptime assert(result_info.bits % 7 == 0);
@@ -599,6 +729,10 @@ test takeArray {
return error.Unimplemented;
}
test peekArray {
return error.Unimplemented;
}
test discard {
var br: BufferedReader = undefined;
br.initFixed("foobar");
@@ -684,10 +818,18 @@ test takeStruct {
return error.Unimplemented;
}
test peekStruct {
return error.Unimplemented;
}
test takeStructEndian {
return error.Unimplemented;
}
test peekStructEndian {
return error.Unimplemented;
}
test takeEnum {
return error.Unimplemented;
}
@@ -699,3 +841,7 @@ test takeLeb128 {
test readShort {
return error.Unimplemented;
}
test readVec {
return error.Unimplemented;
}