From 24441b184f989dc889c33b6422ec5ddd8f385c5e Mon Sep 17 00:00:00 2001 From: Andrew Kelley Date: Sat, 19 Apr 2025 21:53:52 -0700 Subject: [PATCH] std.io.poll: update for BufferedReader only posix is updated so far also implement `BufferedReader.readVec` --- lib/compiler/std-docs.zig | 113 ++++++++++++------------ lib/std/fs/File.zig | 29 +++++++ lib/std/io.zig | 79 +++++++++-------- lib/std/io/BufferedReader.zig | 156 ++++++++++++++++++++++++++++++++-- 4 files changed, 277 insertions(+), 100 deletions(-) diff --git a/lib/compiler/std-docs.zig b/lib/compiler/std-docs.zig index 9d6c030f6f..ae1fcd6f96 100644 --- a/lib/compiler/std-docs.zig +++ b/lib/compiler/std-docs.zig @@ -320,69 +320,19 @@ fn buildWasmBinary( try sendMessage(child.stdin.?, .update); try sendMessage(child.stdin.?, .exit); - const Header = std.zig.Server.Message.Header; var result: ?Cache.Path = null; var result_error_bundle = std.zig.ErrorBundle.empty; - const stdout = poller.fifo(.stdout); - - poll: while (true) { - while (stdout.readableLength() < @sizeOf(Header)) { - if (!(try poller.poll())) break :poll; - } - const header = stdout.reader().readStruct(Header) catch unreachable; - while (stdout.readableLength() < header.bytes_len) { - if (!(try poller.poll())) break :poll; - } - const body = stdout.readableSliceOfLen(header.bytes_len); - - switch (header.tag) { - .zig_version => { - if (!std.mem.eql(u8, builtin.zig_version_string, body)) { - return error.ZigProtocolVersionMismatch; - } - }, - .error_bundle => { - const EbHdr = std.zig.Server.Message.ErrorBundle; - const eb_hdr = @as(*align(1) const EbHdr, @ptrCast(body)); - const extra_bytes = - body[@sizeOf(EbHdr)..][0 .. @sizeOf(u32) * eb_hdr.extra_len]; - const string_bytes = - body[@sizeOf(EbHdr) + extra_bytes.len ..][0..eb_hdr.string_bytes_len]; - // TODO: use @ptrCast when the compiler supports it - const unaligned_extra = std.mem.bytesAsSlice(u32, extra_bytes); - const extra_array = try arena.alloc(u32, unaligned_extra.len); - @memcpy(extra_array, unaligned_extra); - result_error_bundle = .{ - .string_bytes = try arena.dupe(u8, string_bytes), - .extra = extra_array, - }; - }, - .emit_digest => { - const EmitDigest = std.zig.Server.Message.EmitDigest; - const emit_digest = @as(*align(1) const EmitDigest, @ptrCast(body)); - if (!emit_digest.flags.cache_hit) { - std.log.info("source changes detected; rebuilt wasm component", .{}); - } - const digest = body[@sizeOf(EmitDigest)..][0..Cache.bin_digest_len]; - result = .{ - .root_dir = Cache.Directory.cwd(), - .sub_path = try std.fs.path.join(arena, &.{ - context.global_cache_path, "o" ++ std.fs.path.sep_str ++ Cache.binToHex(digest.*), - }), - }; - }, - else => {}, // ignore other messages - } - - stdout.discard(body.len); + while (true) { + receiveWasmMessage(arena, context, poller.reader(.stdout), &result, &result_error_bundle) catch |err| switch (err) { + error.EndOfStream => break, + error.ReadFailed => if (!(try poller.poll())) break, + else => |e| return e, + }; } - const stderr = poller.fifo(.stderr); - if (stderr.readableLength() > 0) { - const owned_stderr = try stderr.toOwnedSlice(); - defer gpa.free(owned_stderr); - std.debug.print("{s}", .{owned_stderr}); + if (poller.reader(.stderr).buffer.len > 0) { + std.debug.print("{s}", .{poller.reader(.stderr).bufferContents()}); } // Send EOF to stdin. @@ -426,6 +376,53 @@ fn buildWasmBinary( }; } +fn receiveWasmMessage( + arena: Allocator, + context: *Context, + br: *std.io.BufferedReader, + result: *?Cache.Path, + result_error_bundle: *std.zig.ErrorBundle, +) !void { + // Ensure that we will be able to read the entire message without blocking. + const header = try br.peekStructEndian(std.zig.Server.Message.Header, .little); + try br.fill(@sizeOf(std.zig.Server.Message.Header) + header.bytes_len); + br.toss(@sizeOf(std.zig.Server.Message.Header)); + switch (header.tag) { + .zig_version => { + const body = try br.take(header.bytes_len); + if (!std.mem.eql(u8, builtin.zig_version_string, body)) { + return error.ZigProtocolVersionMismatch; + } + }, + .error_bundle => { + const eb_hdr = try br.takeStructEndian(std.zig.Server.Message.ErrorBundle, .little); + const extra_array = try br.readArrayEndianAlloc(arena, u32, eb_hdr.extra_len, .little); + const string_bytes = try br.readAlloc(arena, eb_hdr.string_bytes_len); + result_error_bundle.* = .{ + .string_bytes = string_bytes, + .extra = extra_array, + }; + }, + .emit_digest => { + const emit_digest = try br.takeStructEndian(std.zig.Server.Message.EmitDigest, .little); + if (!emit_digest.flags.cache_hit) { + std.log.info("source changes detected; rebuilt wasm component", .{}); + } + const digest = try br.takeArray(Cache.bin_digest_len); + result.* = .{ + .root_dir = Cache.Directory.cwd(), + .sub_path = try std.fs.path.join(arena, &.{ + context.global_cache_path, "o" ++ std.fs.path.sep_str ++ Cache.binToHex(digest.*), + }), + }; + }, + else => { + // Ignore other messages. + try br.discard(header.bytes_len); + }, + } +} + fn sendMessage(file: std.fs.File, tag: std.zig.Client.Message.Tag) !void { const header: std.zig.Client.Message.Header = .{ .tag = tag, diff --git a/lib/std/fs/File.zig b/lib/std/fs/File.zig index 7ff544aa70..7dd3d2d6af 100644 --- a/lib/std/fs/File.zig +++ b/lib/std/fs/File.zig @@ -1341,14 +1341,43 @@ pub const Writer = struct { } }; +/// Defaults to positional reading; falls back to streaming. +/// +/// Positional is more threadsafe, since the global seek position is not +/// affected. pub fn reader(file: File) Reader { return .{ .file = file }; } +/// Positional is more threadsafe, since the global seek position is not +/// affected, but when such syscalls are not available, preemptively choosing +/// `Reader.Mode.streaming` will skip a failed syscall. +pub fn readerStreaming(file: File) Reader { + return .{ + .file = file, + .mode = .streaming, + .seek_err = error.Unseekable, + }; +} + +/// Defaults to positional reading; falls back to streaming. +/// +/// Positional is more threadsafe, since the global seek position is not +/// affected. pub fn writer(file: File) Writer { return .{ .file = file }; } +/// Positional is more threadsafe, since the global seek position is not +/// affected, but when such syscalls are not available, preemptively choosing +/// `Writer.Mode.streaming` will skip a failed syscall. +pub fn writerStreaming(file: File) Writer { + return .{ + .file = file, + .mode = .streaming, + }; +} + const range_off: windows.LARGE_INTEGER = 0; const range_len: windows.LARGE_INTEGER = 1; diff --git a/lib/std/io.zig b/lib/std/io.zig index ab0dcb1a47..7469bb221f 100644 --- a/lib/std/io.zig +++ b/lib/std/io.zig @@ -46,54 +46,57 @@ pub const BufferedAtomicFile = @import("io/buffered_atomic_file.zig").BufferedAt pub const tty = @import("io/tty.zig"); pub fn poll( - allocator: Allocator, + gpa: Allocator, comptime StreamEnum: type, files: PollFiles(StreamEnum), ) Poller(StreamEnum) { const enum_fields = @typeInfo(StreamEnum).@"enum".fields; - var result: Poller(StreamEnum) = undefined; - - if (is_windows) result.windows = .{ - .first_read_done = false, - .overlapped = [1]windows.OVERLAPPED{ - mem.zeroes(windows.OVERLAPPED), - } ** enum_fields.len, - .small_bufs = undefined, - .active = .{ - .count = 0, - .handles_buf = undefined, - .stream_map = undefined, - }, + var result: Poller(StreamEnum) = .{ + .gpa = gpa, + .readers = undefined, + .poll_fds = undefined, + .windows = if (is_windows) .{ + .first_read_done = false, + .overlapped = [1]windows.OVERLAPPED{ + mem.zeroes(windows.OVERLAPPED), + } ** enum_fields.len, + .small_bufs = undefined, + .active = .{ + .count = 0, + .handles_buf = undefined, + .stream_map = undefined, + }, + } else {}, }; - inline for (0..enum_fields.len) |i| { - result.fifos[i] = .{ - .allocator = allocator, - .buf = &.{}, - .head = 0, - .count = 0, + inline for (enum_fields, 0..) |field, i| { + result.readers[i] = .{ + .unbuffered_reader = .failing, + .buffer = &.{}, + .end = 0, + .seek = 0, }; if (is_windows) { - result.windows.active.handles_buf[i] = @field(files, enum_fields[i].name).handle; + result.windows.active.handles_buf[i] = @field(files, field.name).handle; } else { result.poll_fds[i] = .{ - .fd = @field(files, enum_fields[i].name).handle, + .fd = @field(files, field.name).handle, .events = posix.POLL.IN, .revents = undefined, }; } } + return result; } -pub const PollFifo = std.fifo.LinearFifo(u8, .Dynamic); - pub fn Poller(comptime StreamEnum: type) type { return struct { const enum_fields = @typeInfo(StreamEnum).@"enum".fields; const PollFd = if (is_windows) void else posix.pollfd; - fifos: [enum_fields.len]PollFifo, + gpa: Allocator, + readers: [enum_fields.len]BufferedReader, poll_fds: [enum_fields.len]PollFd, windows: if (is_windows) struct { first_read_done: bool, @@ -105,7 +108,7 @@ pub fn Poller(comptime StreamEnum: type) type { stream_map: [enum_fields.len]StreamEnum, pub fn removeAt(self: *@This(), index: u32) void { - std.debug.assert(index < self.count); + assert(index < self.count); for (index + 1..self.count) |i| { self.handles_buf[i - 1] = self.handles_buf[i]; self.stream_map[i - 1] = self.stream_map[i]; @@ -118,13 +121,14 @@ pub fn Poller(comptime StreamEnum: type) type { const Self = @This(); pub fn deinit(self: *Self) void { + const gpa = self.gpa; if (is_windows) { // cancel any pending IO to prevent clobbering OVERLAPPED value for (self.windows.active.handles_buf[0..self.windows.active.count]) |h| { _ = windows.kernel32.CancelIo(h); } } - inline for (&self.fifos) |*q| q.deinit(); + inline for (&self.readers) |*br| gpa.free(br.buffer); self.* = undefined; } @@ -144,8 +148,8 @@ pub fn Poller(comptime StreamEnum: type) type { } } - pub inline fn fifo(self: *Self, comptime which: StreamEnum) *PollFifo { - return &self.fifos[@intFromEnum(which)]; + pub inline fn reader(self: *Self, comptime which: StreamEnum) *BufferedReader { + return &self.readers[@intFromEnum(which)]; } fn pollWindows(self: *Self, nanoseconds: ?u64) !bool { @@ -236,6 +240,7 @@ pub fn Poller(comptime StreamEnum: type) type { } fn pollPosix(self: *Self, nanoseconds: ?u64) !bool { + const gpa = self.gpa; // We ask for ensureUnusedCapacity with this much extra space. This // has more of an effect on small reads because once the reads // start to get larger the amount of space an ArrayList will @@ -255,18 +260,18 @@ pub fn Poller(comptime StreamEnum: type) type { } var keep_polling = false; - inline for (&self.poll_fds, &self.fifos) |*poll_fd, *q| { + inline for (&self.poll_fds, &self.readers) |*poll_fd, *br| { // Try reading whatever is available before checking the error // conditions. // It's still possible to read after a POLL.HUP is received, // always check if there's some data waiting to be read first. if (poll_fd.revents & posix.POLL.IN != 0) { - const buf = try q.writableWithSize(bump_amt); + const buf = try br.writableSliceGreedyAlloc(gpa, bump_amt); const amt = posix.read(poll_fd.fd, buf) catch |err| switch (err) { error.BrokenPipe => 0, // Handle the same as EOF. else => |e| return e, }; - q.update(amt); + br.advanceBufferEnd(amt); if (amt == 0) { // Remove the fd when the EOF condition is met. poll_fd.fd = -1; @@ -297,14 +302,14 @@ var win_dummy_bytes_read: u32 = undefined; fn windowsAsyncReadToFifoAndQueueSmallRead( handle: windows.HANDLE, overlapped: *windows.OVERLAPPED, - fifo: *PollFifo, + br: *BufferedReader, small_buf: *[128]u8, bump_amt: usize, ) !enum { empty, populated, closed_populated, closed } { var read_any_data = false; while (true) { const fifo_read_pending = while (true) { - const buf = try fifo.writableWithSize(bump_amt); + const buf = try br.writableWithSize(bump_amt); const buf_len = math.cast(u32, buf.len) orelse math.maxInt(u32); if (0 == windows.kernel32.ReadFile( @@ -326,7 +331,7 @@ fn windowsAsyncReadToFifoAndQueueSmallRead( }; read_any_data = true; - fifo.update(num_bytes_read); + br.update(num_bytes_read); if (num_bytes_read == buf_len) { // We filled the buffer, so there's probably more data available. @@ -356,7 +361,7 @@ fn windowsAsyncReadToFifoAndQueueSmallRead( .aborted => break :cancel_read, }; read_any_data = true; - fifo.update(num_bytes_read); + br.update(num_bytes_read); } // Try to queue the 1-byte read. @@ -381,7 +386,7 @@ fn windowsAsyncReadToFifoAndQueueSmallRead( .closed => return if (read_any_data) .closed_populated else .closed, .aborted => unreachable, }; - try fifo.write(small_buf[0..num_bytes_read]); + try br.write(small_buf[0..num_bytes_read]); read_any_data = true; } } diff --git a/lib/std/io/BufferedReader.zig b/lib/std/io/BufferedReader.zig index b8dea9ea5d..6155a474c9 100644 --- a/lib/std/io/BufferedReader.zig +++ b/lib/std/io/BufferedReader.zig @@ -6,6 +6,7 @@ const assert = std.debug.assert; const testing = std.testing; const BufferedWriter = std.io.BufferedWriter; const Reader = std.io.Reader; +const Allocator = std.mem.Allocator; const BufferedReader = @This(); @@ -46,13 +47,17 @@ pub fn reader(br: *BufferedReader) Reader { }; } +pub fn readVec(br: *BufferedReader, data: []const []u8) Reader.Error!usize { + return passthruReadVec(br, data); +} + fn passthruRead(ctx: ?*anyopaque, bw: *BufferedWriter, limit: Reader.Limit) Reader.RwError!usize { const br: *BufferedReader = @alignCast(@ptrCast(ctx)); const buffer = br.buffer[0..br.end]; const buffered = buffer[br.seek..]; const limited = buffered[0..limit.min(buffered.len)]; if (limited.len > 0) { - const n = try bw.writeSplat(limited, 1); + const n = try bw.write(limited); br.seek += n; return n; } @@ -61,9 +66,44 @@ fn passthruRead(ctx: ?*anyopaque, bw: *BufferedWriter, limit: Reader.Limit) Read fn passthruReadVec(ctx: ?*anyopaque, data: []const []u8) Reader.Error!usize { const br: *BufferedReader = @alignCast(@ptrCast(ctx)); - _ = br; - _ = data; - @panic("TODO"); + var total: usize = 0; + for (data, 0..) |buf, i| { + const buffered = br.buffer[br.seek..br.end]; + const copy_len = @min(buffered.len, buf.len); + @memcpy(buf[0..copy_len], buffered[0..copy_len]); + total += copy_len; + br.seek += copy_len; + if (copy_len < buf.len) { + br.seek = 0; + br.end = 0; + var vecs: [8][]u8 = undefined; // Arbitrarily chosen value. + vecs[0] = buf[copy_len..]; + const vecs_len: usize = @min(vecs.len, data.len - i); + var vec_data_len: usize = vecs[0].len; + for (&vecs[1..vecs_len], data[i + 1 ..][0 .. vecs_len - 1]) |*v, d| { + vec_data_len += d.len; + v.* = d; + } + if (vecs_len < vecs.len) { + vecs[vecs_len] = br.buffer; + const n = try br.unbuffered_reader.readVec(vecs[0 .. vecs_len + 1]); + total += @min(n, vec_data_len); + br.end = n -| vec_data_len; + return total; + } + if (vecs[vecs.len - 1].len >= br.buffer.len) { + total += try br.unbuffered_reader.readVec(&vecs); + return total; + } + vec_data_len -= vecs[vecs.len - 1].len; + vecs[vecs.len - 1] = br.buffer; + const n = try br.unbuffered_reader.readVec(&vecs); + total += @min(n, vec_data_len); + br.end = n -| vec_data_len; + return total; + } + } + return total; } pub fn seekBy(br: *BufferedReader, seek_by: i64) !void { @@ -147,7 +187,7 @@ pub fn take(br: *BufferedReader, n: usize) Reader.Error![]u8 { } /// Returns the next `n` bytes from `unbuffered_reader` as an array, filling -/// the buffer as necessary. +/// the buffer as necessary and advancing the seek position `n` bytes. /// /// Asserts that the `BufferedReader` was initialized with a buffer capacity at /// least as big as `n`. @@ -161,6 +201,22 @@ pub fn takeArray(br: *BufferedReader, comptime n: usize) Reader.Error!*[n]u8 { return (try br.take(n))[0..n]; } +/// Returns the next `n` bytes from `unbuffered_reader` as an array, filling +/// the buffer as necessary, without advancing the seek position. +/// +/// Asserts that the `BufferedReader` was initialized with a buffer capacity at +/// least as big as `n`. +/// +/// If there are fewer than `n` bytes left in the stream, `error.EndOfStream` +/// is returned instead. +/// +/// See also: +/// * `peek` +/// * `takeArray` +pub fn peekArray(br: *BufferedReader, comptime n: usize) Reader.Error!*[n]u8 { + return (try br.peek(n))[0..n]; +} + /// Skips the next `n` bytes from the stream, advancing the seek position. /// /// Unlike `toss` which is infallible, in this function `n` can be any amount. @@ -255,6 +311,31 @@ pub fn readShort(br: *BufferedReader, buffer: []u8) Reader.ShortError!usize { @panic("TODO"); } +/// The function is inline to avoid the dead code in case `endian` is +/// comptime-known and matches host endianness. +pub inline fn readArrayEndianAlloc( + br: *BufferedReader, + allocator: Allocator, + Elem: type, + len: usize, + endian: std.builtin.Endian, +) ReadAllocError![]Elem { + const dest = try allocator.alloc(Elem, len); + errdefer allocator.free(dest); + try read(br, @ptrCast(dest)); + if (native_endian != endian) std.mem.byteSwapAllFields(Elem, dest); + return dest; +} + +pub const ReadAllocError = Reader.Error || Allocator.Error; + +pub fn readAlloc(br: *BufferedReader, allocator: Allocator, len: usize) ReadAllocError![]u8 { + const dest = try allocator.alloc(u8, len); + errdefer allocator.free(dest); + try read(br, dest); + return dest; +} + pub const DelimiterInclusiveError = error{ /// See the `Reader` implementation for detailed diagnostics. ReadFailed, @@ -498,12 +579,29 @@ pub fn takeVarInt(br: *BufferedReader, comptime Int: type, endian: std.builtin.E } /// Asserts the buffer was initialized with a capacity at least `@sizeOf(T)`. +/// +/// Advances the seek position. +/// +/// See also: +/// * `peekStruct` pub fn takeStruct(br: *BufferedReader, comptime T: type) Reader.Error!*align(1) T { // Only extern and packed structs have defined in-memory layout. comptime assert(@typeInfo(T).@"struct".layout != .auto); return @ptrCast(try br.takeArray(@sizeOf(T))); } +/// Asserts the buffer was initialized with a capacity at least `@sizeOf(T)`. +/// +/// Does not advance the seek position. +/// +/// See also: +/// * `takeStruct` +pub fn peekStruct(br: *BufferedReader, comptime T: type) Reader.Error!*align(1) T { + // Only extern and packed structs have defined in-memory layout. + comptime assert(@typeInfo(T).@"struct".layout != .auto); + return @ptrCast(try br.peekArray(@sizeOf(T))); +} + /// Asserts the buffer was initialized with a capacity at least `@sizeOf(T)`. /// /// This function is inline to avoid referencing `std.mem.byteSwapAllFields` @@ -514,6 +612,16 @@ pub inline fn takeStructEndian(br: *BufferedReader, comptime T: type, endian: st return res; } +/// Asserts the buffer was initialized with a capacity at least `@sizeOf(T)`. +/// +/// This function is inline to avoid referencing `std.mem.byteSwapAllFields` +/// when `endian` is comptime-known and matches the host endianness. +pub inline fn peekStructEndian(br: *BufferedReader, comptime T: type, endian: std.builtin.Endian) Reader.Error!T { + var res = (try br.peekStruct(T)).*; + if (native_endian != endian) std.mem.byteSwapAllFields(T, &res); + return res; +} + /// Reads an integer with the same size as the given enum's tag type. If the /// integer matches an enum tag, casts the integer to the enum tag and returns /// it. Otherwise, returns `error.InvalidEnumTag`. @@ -536,6 +644,28 @@ pub fn takeLeb128(br: *BufferedReader, comptime Result: type) TakeLeb128Error!Re } }))) orelse error.Overflow; } +/// Returns a slice into the unused capacity of `buffer` with at least +/// `min_len` bytes, extending `buffer` by resizing it with `gpa` as necessary. +/// +/// After calling this function, typically the caller will follow up with a +/// call to `advanceBufferEnd` to report the actual number of bytes buffered. +pub fn writableSliceGreedyAlloc( + br: *BufferedReader, + allocator: Allocator, + min_len: usize, +) error{OutOfMemory}![]u8 { + _ = br; + _ = allocator; + _ = min_len; + @panic("TODO"); +} + +/// After writing directly into the unused capacity of `buffer`, this function +/// updates `end` so that users of `BufferedReader` can receive the data. +pub fn advanceBufferEnd(br: *BufferedReader, n: usize) void { + br.end += n; +} + fn takeMultipleOf7Leb128(br: *BufferedReader, comptime Result: type) TakeLeb128Error!Result { const result_info = @typeInfo(Result).int; comptime assert(result_info.bits % 7 == 0); @@ -599,6 +729,10 @@ test takeArray { return error.Unimplemented; } +test peekArray { + return error.Unimplemented; +} + test discard { var br: BufferedReader = undefined; br.initFixed("foobar"); @@ -684,10 +818,18 @@ test takeStruct { return error.Unimplemented; } +test peekStruct { + return error.Unimplemented; +} + test takeStructEndian { return error.Unimplemented; } +test peekStructEndian { + return error.Unimplemented; +} + test takeEnum { return error.Unimplemented; } @@ -699,3 +841,7 @@ test takeLeb128 { test readShort { return error.Unimplemented; } + +test readVec { + return error.Unimplemented; +}