diff --git a/lib/std/Build/Fuzz/WebServer.zig b/lib/std/Build/Fuzz/WebServer.zig index 0df43408f1..18582a60ef 100644 --- a/lib/std/Build/Fuzz/WebServer.zig +++ b/lib/std/Build/Fuzz/WebServer.zig @@ -273,21 +273,17 @@ fn buildWasmBinary( try sendMessage(child.stdin.?, .update); try sendMessage(child.stdin.?, .exit); - const Header = std.zig.Server.Message.Header; var result: ?Path = null; var result_error_bundle = std.zig.ErrorBundle.empty; - const stdout = poller.fifo(.stdout); + const stdout = poller.reader(.stdout); poll: while (true) { - while (stdout.readableLength() < @sizeOf(Header)) { - if (!(try poller.poll())) break :poll; - } - const header = stdout.reader().readStruct(Header) catch unreachable; - while (stdout.readableLength() < header.bytes_len) { - if (!(try poller.poll())) break :poll; - } - const body = stdout.readableSliceOfLen(header.bytes_len); + const Header = std.zig.Server.Message.Header; + while (stdout.buffered().len < @sizeOf(Header)) if (!try poller.poll()) break :poll; + const header = stdout.takeStruct(Header, .little) catch unreachable; + while (stdout.buffered().len < header.bytes_len) if (!try poller.poll()) break :poll; + const body = stdout.take(header.bytes_len) catch unreachable; switch (header.tag) { .zig_version => { @@ -325,15 +321,11 @@ fn buildWasmBinary( }, else => {}, // ignore other messages } - - stdout.discard(body.len); } - const stderr = poller.fifo(.stderr); - if (stderr.readableLength() > 0) { - const owned_stderr = try stderr.toOwnedSlice(); - defer gpa.free(owned_stderr); - std.debug.print("{s}", .{owned_stderr}); + const stderr_contents = try poller.toOwnedSlice(.stderr); + if (stderr_contents.len > 0) { + std.debug.print("{s}", .{stderr_contents}); } // Send EOF to stdin. diff --git a/lib/std/Build/Step.zig b/lib/std/Build/Step.zig index 5192249f12..8583427aad 100644 --- a/lib/std/Build/Step.zig +++ b/lib/std/Build/Step.zig @@ -286,7 +286,7 @@ pub fn cast(step: *Step, comptime T: type) ?*T { } /// For debugging purposes, prints identifying information about this Step. -pub fn dump(step: *Step, w: *std.io.Writer, tty_config: std.io.tty.Config) void { +pub fn dump(step: *Step, w: *std.Io.Writer, tty_config: std.Io.tty.Config) void { const debug_info = std.debug.getSelfDebugInfo() catch |err| { w.print("Unable to dump stack trace: Unable to open debug info: {s}\n", .{ @errorName(err), @@ -359,7 +359,7 @@ pub fn addError(step: *Step, comptime fmt: []const u8, args: anytype) error{OutO pub const ZigProcess = struct { child: std.process.Child, - poller: std.io.Poller(StreamEnum), + poller: std.Io.Poller(StreamEnum), progress_ipc_fd: if (std.Progress.have_ipc) ?std.posix.fd_t else void, pub const StreamEnum = enum { stdout, stderr }; @@ -428,7 +428,7 @@ pub fn evalZigProcess( const zp = try gpa.create(ZigProcess); zp.* = .{ .child = child, - .poller = std.io.poll(gpa, ZigProcess.StreamEnum, .{ + .poller = std.Io.poll(gpa, ZigProcess.StreamEnum, .{ .stdout = child.stdout.?, .stderr = child.stderr.?, }), @@ -508,20 +508,16 @@ fn zigProcessUpdate(s: *Step, zp: *ZigProcess, watch: bool) !?Path { try sendMessage(zp.child.stdin.?, .update); if (!watch) try sendMessage(zp.child.stdin.?, .exit); - const Header = std.zig.Server.Message.Header; var result: ?Path = null; - const stdout = zp.poller.fifo(.stdout); + const stdout = zp.poller.reader(.stdout); poll: while (true) { - while (stdout.readableLength() < @sizeOf(Header)) { - if (!(try zp.poller.poll())) break :poll; - } - const header = stdout.reader().readStruct(Header) catch unreachable; - while (stdout.readableLength() < header.bytes_len) { - if (!(try zp.poller.poll())) break :poll; - } - const body = stdout.readableSliceOfLen(header.bytes_len); + const Header = std.zig.Server.Message.Header; + while (stdout.buffered().len < @sizeOf(Header)) if (!try zp.poller.poll()) break :poll; + const header = stdout.takeStruct(Header, .little) catch unreachable; + while (stdout.buffered().len < header.bytes_len) if (!try zp.poller.poll()) break :poll; + const body = stdout.take(header.bytes_len) catch unreachable; switch (header.tag) { .zig_version => { @@ -547,11 +543,8 @@ fn zigProcessUpdate(s: *Step, zp: *ZigProcess, watch: bool) !?Path { .string_bytes = try arena.dupe(u8, string_bytes), .extra = extra_array, }; - if (watch) { - // This message indicates the end of the update. - stdout.discard(body.len); - break; - } + // This message indicates the end of the update. + if (watch) break :poll; }, .emit_digest => { const EmitDigest = std.zig.Server.Message.EmitDigest; @@ -611,15 +604,13 @@ fn zigProcessUpdate(s: *Step, zp: *ZigProcess, watch: bool) !?Path { }, else => {}, // ignore other messages } - - stdout.discard(body.len); } s.result_duration_ns = timer.read(); - const stderr = zp.poller.fifo(.stderr); - if (stderr.readableLength() > 0) { - try s.result_error_msgs.append(arena, try stderr.toOwnedSlice()); + const stderr_contents = try zp.poller.toOwnedSlice(.stderr); + if (stderr_contents.len > 0) { + try s.result_error_msgs.append(arena, try arena.dupe(u8, stderr_contents)); } return result; @@ -736,7 +727,7 @@ pub fn allocPrintCmd2( argv: []const []const u8, ) Allocator.Error![]u8 { const shell = struct { - fn escape(writer: anytype, string: []const u8, is_argv0: bool) !void { + fn escape(writer: *std.Io.Writer, string: []const u8, is_argv0: bool) !void { for (string) |c| { if (switch (c) { else => true, @@ -770,9 +761,9 @@ pub fn allocPrintCmd2( } }; - var buf: std.ArrayListUnmanaged(u8) = .empty; - const writer = buf.writer(arena); - if (opt_cwd) |cwd| try writer.print("cd {s} && ", .{cwd}); + var aw: std.Io.Writer.Allocating = .init(arena); + const writer = &aw.writer; + if (opt_cwd) |cwd| writer.print("cd {s} && ", .{cwd}) catch return error.OutOfMemory; if (opt_env) |env| { const process_env_map = std.process.getEnvMap(arena) catch std.process.EnvMap.init(arena); var it = env.iterator(); @@ -782,17 +773,17 @@ pub fn allocPrintCmd2( if (process_env_map.get(key)) |process_value| { if (std.mem.eql(u8, value, process_value)) continue; } - try writer.print("{s}=", .{key}); - try shell.escape(writer, value, false); - try writer.writeByte(' '); + writer.print("{s}=", .{key}) catch return error.OutOfMemory; + shell.escape(writer, value, false) catch return error.OutOfMemory; + writer.writeByte(' ') catch return error.OutOfMemory; } } - try shell.escape(writer, argv[0], true); + shell.escape(writer, argv[0], true) catch return error.OutOfMemory; for (argv[1..]) |arg| { - try writer.writeByte(' '); - try shell.escape(writer, arg, false); + writer.writeByte(' ') catch return error.OutOfMemory; + shell.escape(writer, arg, false) catch return error.OutOfMemory; } - return buf.toOwnedSlice(arena); + return aw.toOwnedSlice(); } /// Prefer `cacheHitAndWatch` unless you already added watch inputs diff --git a/lib/std/Build/Step/Run.zig b/lib/std/Build/Step/Run.zig index 414f7ccff2..57f5d73f0c 100644 --- a/lib/std/Build/Step/Run.zig +++ b/lib/std/Build/Step/Run.zig @@ -73,9 +73,12 @@ skip_foreign_checks: bool, /// external executor (such as qemu) but not fail if the executor is unavailable. failing_to_execute_foreign_is_an_error: bool, +/// Deprecated in favor of `stdio_limit`. +max_stdio_size: usize, + /// If stderr or stdout exceeds this amount, the child process is killed and /// the step fails. -max_stdio_size: usize, +stdio_limit: std.Io.Limit, captured_stdout: ?*Output, captured_stderr: ?*Output, @@ -186,6 +189,7 @@ pub fn create(owner: *std.Build, name: []const u8) *Run { .skip_foreign_checks = false, .failing_to_execute_foreign_is_an_error = true, .max_stdio_size = 10 * 1024 * 1024, + .stdio_limit = .unlimited, .captured_stdout = null, .captured_stderr = null, .dep_output_file = null, @@ -1011,7 +1015,7 @@ fn populateGeneratedPaths( } } -fn formatTerm(term: ?std.process.Child.Term, w: *std.io.Writer) std.io.Writer.Error!void { +fn formatTerm(term: ?std.process.Child.Term, w: *std.Io.Writer) std.Io.Writer.Error!void { if (term) |t| switch (t) { .Exited => |code| try w.print("exited with code {d}", .{code}), .Signal => |sig| try w.print("terminated with signal {d}", .{sig}), @@ -1500,7 +1504,7 @@ fn evalZigTest( const gpa = run.step.owner.allocator; const arena = run.step.owner.allocator; - var poller = std.io.poll(gpa, enum { stdout, stderr }, .{ + var poller = std.Io.poll(gpa, enum { stdout, stderr }, .{ .stdout = child.stdout.?, .stderr = child.stderr.?, }); @@ -1524,11 +1528,6 @@ fn evalZigTest( break :failed false; }; - const Header = std.zig.Server.Message.Header; - - const stdout = poller.fifo(.stdout); - const stderr = poller.fifo(.stderr); - var fail_count: u32 = 0; var skip_count: u32 = 0; var leak_count: u32 = 0; @@ -1541,16 +1540,14 @@ fn evalZigTest( var sub_prog_node: ?std.Progress.Node = null; defer if (sub_prog_node) |n| n.end(); + const stdout = poller.reader(.stdout); + const stderr = poller.reader(.stderr); const any_write_failed = first_write_failed or poll: while (true) { - while (stdout.readableLength() < @sizeOf(Header)) { - if (!(try poller.poll())) break :poll false; - } - const header = stdout.reader().readStruct(Header) catch unreachable; - while (stdout.readableLength() < header.bytes_len) { - if (!(try poller.poll())) break :poll false; - } - const body = stdout.readableSliceOfLen(header.bytes_len); - + const Header = std.zig.Server.Message.Header; + while (stdout.buffered().len < @sizeOf(Header)) if (!try poller.poll()) break :poll false; + const header = stdout.takeStruct(Header, .little) catch unreachable; + while (stdout.buffered().len < header.bytes_len) if (!try poller.poll()) break :poll false; + const body = stdout.take(header.bytes_len) catch unreachable; switch (header.tag) { .zig_version => { if (!std.mem.eql(u8, builtin.zig_version_string, body)) { @@ -1607,9 +1604,9 @@ fn evalZigTest( if (tr_hdr.flags.fail or tr_hdr.flags.leak or tr_hdr.flags.log_err_count > 0) { const name = std.mem.sliceTo(md.string_bytes[md.names[tr_hdr.index]..], 0); - const orig_msg = stderr.readableSlice(0); - defer stderr.discard(orig_msg.len); - const msg = std.mem.trim(u8, orig_msg, "\n"); + const stderr_contents = stderr.buffered(); + stderr.toss(stderr_contents.len); + const msg = std.mem.trim(u8, stderr_contents, "\n"); const label = if (tr_hdr.flags.fail) "failed" else if (tr_hdr.flags.leak) @@ -1660,8 +1657,6 @@ fn evalZigTest( }, else => {}, // ignore other messages } - - stdout.discard(body.len); }; if (any_write_failed) { @@ -1670,9 +1665,9 @@ fn evalZigTest( while (try poller.poll()) {} } - if (stderr.readableLength() > 0) { - const msg = std.mem.trim(u8, try stderr.toOwnedSlice(), "\n"); - if (msg.len > 0) run.step.result_stderr = msg; + const stderr_contents = std.mem.trim(u8, stderr.buffered(), "\n"); + if (stderr_contents.len > 0) { + run.step.result_stderr = try arena.dupe(u8, stderr_contents); } // Send EOF to stdin. @@ -1795,28 +1790,43 @@ fn evalGeneric(run: *Run, child: *std.process.Child) !StdIoResult { var stdout_bytes: ?[]const u8 = null; var stderr_bytes: ?[]const u8 = null; + run.stdio_limit = run.stdio_limit.min(.limited(run.max_stdio_size)); if (child.stdout) |stdout| { if (child.stderr) |stderr| { - var poller = std.io.poll(arena, enum { stdout, stderr }, .{ + var poller = std.Io.poll(arena, enum { stdout, stderr }, .{ .stdout = stdout, .stderr = stderr, }); defer poller.deinit(); while (try poller.poll()) { - if (poller.fifo(.stdout).count > run.max_stdio_size) - return error.StdoutStreamTooLong; - if (poller.fifo(.stderr).count > run.max_stdio_size) - return error.StderrStreamTooLong; + if (run.stdio_limit.toInt()) |limit| { + if (poller.reader(.stderr).buffered().len > limit) + return error.StdoutStreamTooLong; + if (poller.reader(.stderr).buffered().len > limit) + return error.StderrStreamTooLong; + } } - stdout_bytes = try poller.fifo(.stdout).toOwnedSlice(); - stderr_bytes = try poller.fifo(.stderr).toOwnedSlice(); + stdout_bytes = try poller.toOwnedSlice(.stdout); + stderr_bytes = try poller.toOwnedSlice(.stderr); } else { - stdout_bytes = try stdout.deprecatedReader().readAllAlloc(arena, run.max_stdio_size); + var small_buffer: [1]u8 = undefined; + var stdout_reader = stdout.readerStreaming(&small_buffer); + stdout_bytes = stdout_reader.interface.allocRemaining(arena, run.stdio_limit) catch |err| switch (err) { + error.OutOfMemory => return error.OutOfMemory, + error.ReadFailed => return stdout_reader.err.?, + error.StreamTooLong => return error.StdoutStreamTooLong, + }; } } else if (child.stderr) |stderr| { - stderr_bytes = try stderr.deprecatedReader().readAllAlloc(arena, run.max_stdio_size); + var small_buffer: [1]u8 = undefined; + var stderr_reader = stderr.readerStreaming(&small_buffer); + stderr_bytes = stderr_reader.interface.allocRemaining(arena, run.stdio_limit) catch |err| switch (err) { + error.OutOfMemory => return error.OutOfMemory, + error.ReadFailed => return stderr_reader.err.?, + error.StreamTooLong => return error.StderrStreamTooLong, + }; } if (stderr_bytes) |bytes| if (bytes.len > 0) { diff --git a/lib/std/Io.zig b/lib/std/Io.zig index a93c31954b..1ab5d13cab 100644 --- a/lib/std/Io.zig +++ b/lib/std/Io.zig @@ -1,16 +1,11 @@ -const std = @import("std.zig"); const builtin = @import("builtin"); -const root = @import("root"); -const c = std.c; const is_windows = builtin.os.tag == .windows; + +const std = @import("std.zig"); const windows = std.os.windows; const posix = std.posix; const math = std.math; const assert = std.debug.assert; -const fs = std.fs; -const mem = std.mem; -const meta = std.meta; -const File = std.fs.File; const Allocator = std.mem.Allocator; const Alignment = std.mem.Alignment; @@ -493,54 +488,51 @@ test null_writer { } pub fn poll( - allocator: Allocator, + gpa: Allocator, comptime StreamEnum: type, files: PollFiles(StreamEnum), ) Poller(StreamEnum) { const enum_fields = @typeInfo(StreamEnum).@"enum".fields; - var result: Poller(StreamEnum) = undefined; - - if (is_windows) result.windows = .{ - .first_read_done = false, - .overlapped = [1]windows.OVERLAPPED{ - mem.zeroes(windows.OVERLAPPED), - } ** enum_fields.len, - .small_bufs = undefined, - .active = .{ - .count = 0, - .handles_buf = undefined, - .stream_map = undefined, - }, + var result: Poller(StreamEnum) = .{ + .gpa = gpa, + .readers = @splat(.failing), + .poll_fds = undefined, + .windows = if (is_windows) .{ + .first_read_done = false, + .overlapped = [1]windows.OVERLAPPED{ + std.mem.zeroes(windows.OVERLAPPED), + } ** enum_fields.len, + .small_bufs = undefined, + .active = .{ + .count = 0, + .handles_buf = undefined, + .stream_map = undefined, + }, + } else {}, }; - inline for (0..enum_fields.len) |i| { - result.fifos[i] = .{ - .allocator = allocator, - .buf = &.{}, - .head = 0, - .count = 0, - }; + inline for (enum_fields, 0..) |field, i| { if (is_windows) { - result.windows.active.handles_buf[i] = @field(files, enum_fields[i].name).handle; + result.windows.active.handles_buf[i] = @field(files, field.name).handle; } else { result.poll_fds[i] = .{ - .fd = @field(files, enum_fields[i].name).handle, + .fd = @field(files, field.name).handle, .events = posix.POLL.IN, .revents = undefined, }; } } + return result; } -pub const PollFifo = std.fifo.LinearFifo(u8, .Dynamic); - pub fn Poller(comptime StreamEnum: type) type { return struct { const enum_fields = @typeInfo(StreamEnum).@"enum".fields; const PollFd = if (is_windows) void else posix.pollfd; - fifos: [enum_fields.len]PollFifo, + gpa: Allocator, + readers: [enum_fields.len]Reader, poll_fds: [enum_fields.len]PollFd, windows: if (is_windows) struct { first_read_done: bool, @@ -552,7 +544,7 @@ pub fn Poller(comptime StreamEnum: type) type { stream_map: [enum_fields.len]StreamEnum, pub fn removeAt(self: *@This(), index: u32) void { - std.debug.assert(index < self.count); + assert(index < self.count); for (index + 1..self.count) |i| { self.handles_buf[i - 1] = self.handles_buf[i]; self.stream_map[i - 1] = self.stream_map[i]; @@ -565,13 +557,14 @@ pub fn Poller(comptime StreamEnum: type) type { const Self = @This(); pub fn deinit(self: *Self) void { + const gpa = self.gpa; if (is_windows) { // cancel any pending IO to prevent clobbering OVERLAPPED value for (self.windows.active.handles_buf[0..self.windows.active.count]) |h| { _ = windows.kernel32.CancelIo(h); } } - inline for (&self.fifos) |*q| q.deinit(); + inline for (&self.readers) |*r| gpa.free(r.buffer); self.* = undefined; } @@ -591,21 +584,40 @@ pub fn Poller(comptime StreamEnum: type) type { } } - pub inline fn fifo(self: *Self, comptime which: StreamEnum) *PollFifo { - return &self.fifos[@intFromEnum(which)]; + pub fn reader(self: *Self, which: StreamEnum) *Reader { + return &self.readers[@intFromEnum(which)]; + } + + pub fn toOwnedSlice(self: *Self, which: StreamEnum) error{OutOfMemory}![]u8 { + const gpa = self.gpa; + const r = reader(self, which); + if (r.seek == 0) { + const new = try gpa.realloc(r.buffer, r.end); + r.buffer = &.{}; + r.end = 0; + return new; + } + const new = try gpa.dupe(u8, r.buffered()); + gpa.free(r.buffer); + r.buffer = &.{}; + r.seek = 0; + r.end = 0; + return new; } fn pollWindows(self: *Self, nanoseconds: ?u64) !bool { const bump_amt = 512; + const gpa = self.gpa; if (!self.windows.first_read_done) { var already_read_data = false; for (0..enum_fields.len) |i| { const handle = self.windows.active.handles_buf[i]; switch (try windowsAsyncReadToFifoAndQueueSmallRead( + gpa, handle, &self.windows.overlapped[i], - &self.fifos[i], + &self.readers[i], &self.windows.small_bufs[i], bump_amt, )) { @@ -652,7 +664,7 @@ pub fn Poller(comptime StreamEnum: type) type { const handle = self.windows.active.handles_buf[active_idx]; const overlapped = &self.windows.overlapped[stream_idx]; - const stream_fifo = &self.fifos[stream_idx]; + const stream_reader = &self.readers[stream_idx]; const small_buf = &self.windows.small_bufs[stream_idx]; const num_bytes_read = switch (try windowsGetReadResult(handle, overlapped, false)) { @@ -663,12 +675,16 @@ pub fn Poller(comptime StreamEnum: type) type { }, .aborted => unreachable, }; - try stream_fifo.write(small_buf[0..num_bytes_read]); + const buf = small_buf[0..num_bytes_read]; + const dest = try writableSliceGreedyAlloc(stream_reader, gpa, buf.len); + @memcpy(dest[0..buf.len], buf); + advanceBufferEnd(stream_reader, buf.len); switch (try windowsAsyncReadToFifoAndQueueSmallRead( + gpa, handle, overlapped, - stream_fifo, + stream_reader, small_buf, bump_amt, )) { @@ -683,6 +699,7 @@ pub fn Poller(comptime StreamEnum: type) type { } fn pollPosix(self: *Self, nanoseconds: ?u64) !bool { + const gpa = self.gpa; // We ask for ensureUnusedCapacity with this much extra space. This // has more of an effect on small reads because once the reads // start to get larger the amount of space an ArrayList will @@ -702,18 +719,18 @@ pub fn Poller(comptime StreamEnum: type) type { } var keep_polling = false; - inline for (&self.poll_fds, &self.fifos) |*poll_fd, *q| { + for (&self.poll_fds, &self.readers) |*poll_fd, *r| { // Try reading whatever is available before checking the error // conditions. // It's still possible to read after a POLL.HUP is received, // always check if there's some data waiting to be read first. if (poll_fd.revents & posix.POLL.IN != 0) { - const buf = try q.writableWithSize(bump_amt); + const buf = try writableSliceGreedyAlloc(r, gpa, bump_amt); const amt = posix.read(poll_fd.fd, buf) catch |err| switch (err) { error.BrokenPipe => 0, // Handle the same as EOF. else => |e| return e, }; - q.update(amt); + advanceBufferEnd(r, amt); if (amt == 0) { // Remove the fd when the EOF condition is met. poll_fd.fd = -1; @@ -729,146 +746,181 @@ pub fn Poller(comptime StreamEnum: type) type { } return keep_polling; } - }; -} -/// The `ReadFile` docuementation states that `lpNumberOfBytesRead` does not have a meaningful -/// result when using overlapped I/O, but also that it cannot be `null` on Windows 7. For -/// compatibility, we point it to this dummy variables, which we never otherwise access. -/// See: https://learn.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-readfile -var win_dummy_bytes_read: u32 = undefined; - -/// Read as much data as possible from `handle` with `overlapped`, and write it to the FIFO. Before -/// returning, queue a read into `small_buf` so that `WaitForMultipleObjects` returns when more data -/// is available. `handle` must have no pending asynchronous operation. -fn windowsAsyncReadToFifoAndQueueSmallRead( - handle: windows.HANDLE, - overlapped: *windows.OVERLAPPED, - fifo: *PollFifo, - small_buf: *[128]u8, - bump_amt: usize, -) !enum { empty, populated, closed_populated, closed } { - var read_any_data = false; - while (true) { - const fifo_read_pending = while (true) { - const buf = try fifo.writableWithSize(bump_amt); - const buf_len = math.cast(u32, buf.len) orelse math.maxInt(u32); - - if (0 == windows.kernel32.ReadFile( - handle, - buf.ptr, - buf_len, - &win_dummy_bytes_read, - overlapped, - )) switch (windows.GetLastError()) { - .IO_PENDING => break true, - .BROKEN_PIPE => return if (read_any_data) .closed_populated else .closed, - else => |err| return windows.unexpectedError(err), - }; - - const num_bytes_read = switch (try windowsGetReadResult(handle, overlapped, false)) { - .success => |n| n, - .closed => return if (read_any_data) .closed_populated else .closed, - .aborted => unreachable, - }; - - read_any_data = true; - fifo.update(num_bytes_read); - - if (num_bytes_read == buf_len) { - // We filled the buffer, so there's probably more data available. - continue; - } else { - // We didn't fill the buffer, so assume we're out of data. - // There is no pending read. - break false; + /// Returns a slice into the unused capacity of `buffer` with at least + /// `min_len` bytes, extending `buffer` by resizing it with `gpa` as necessary. + /// + /// After calling this function, typically the caller will follow up with a + /// call to `advanceBufferEnd` to report the actual number of bytes buffered. + fn writableSliceGreedyAlloc(r: *Reader, allocator: Allocator, min_len: usize) Allocator.Error![]u8 { + { + const unused = r.buffer[r.end..]; + if (unused.len >= min_len) return unused; } - }; - - if (fifo_read_pending) cancel_read: { - // Cancel the pending read into the FIFO. - _ = windows.kernel32.CancelIo(handle); - - // We have to wait for the handle to be signalled, i.e. for the cancellation to complete. - switch (windows.kernel32.WaitForSingleObject(handle, windows.INFINITE)) { - windows.WAIT_OBJECT_0 => {}, - windows.WAIT_FAILED => return windows.unexpectedError(windows.GetLastError()), - else => unreachable, + if (r.seek > 0) r.rebase(); + { + var list: std.ArrayListUnmanaged(u8) = .{ + .items = r.buffer[0..r.end], + .capacity = r.buffer.len, + }; + defer r.buffer = list.allocatedSlice(); + try list.ensureUnusedCapacity(allocator, min_len); } - - // If it completed before we canceled, make sure to tell the FIFO! - const num_bytes_read = switch (try windowsGetReadResult(handle, overlapped, true)) { - .success => |n| n, - .closed => return if (read_any_data) .closed_populated else .closed, - .aborted => break :cancel_read, - }; - read_any_data = true; - fifo.update(num_bytes_read); + const unused = r.buffer[r.end..]; + assert(unused.len >= min_len); + return unused; } - // Try to queue the 1-byte read. - if (0 == windows.kernel32.ReadFile( - handle, - small_buf, - small_buf.len, - &win_dummy_bytes_read, - overlapped, - )) switch (windows.GetLastError()) { - .IO_PENDING => { - // 1-byte read pending as intended - return if (read_any_data) .populated else .empty; - }, - .BROKEN_PIPE => return if (read_any_data) .closed_populated else .closed, - else => |err| return windows.unexpectedError(err), - }; + /// After writing directly into the unused capacity of `buffer`, this function + /// updates `end` so that users of `Reader` can receive the data. + fn advanceBufferEnd(r: *Reader, n: usize) void { + assert(n <= r.buffer.len - r.end); + r.end += n; + } - // We got data back this time. Write it to the FIFO and run the main loop again. - const num_bytes_read = switch (try windowsGetReadResult(handle, overlapped, false)) { - .success => |n| n, - .closed => return if (read_any_data) .closed_populated else .closed, - .aborted => unreachable, - }; - try fifo.write(small_buf[0..num_bytes_read]); - read_any_data = true; - } -} + /// The `ReadFile` docuementation states that `lpNumberOfBytesRead` does not have a meaningful + /// result when using overlapped I/O, but also that it cannot be `null` on Windows 7. For + /// compatibility, we point it to this dummy variables, which we never otherwise access. + /// See: https://learn.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-readfile + var win_dummy_bytes_read: u32 = undefined; -/// Simple wrapper around `GetOverlappedResult` to determine the result of a `ReadFile` operation. -/// If `!allow_aborted`, then `aborted` is never returned (`OPERATION_ABORTED` is considered unexpected). -/// -/// The `ReadFile` documentation states that the number of bytes read by an overlapped `ReadFile` must be determined using `GetOverlappedResult`, even if the -/// operation immediately returns data: -/// "Use NULL for [lpNumberOfBytesRead] if this is an asynchronous operation to avoid potentially -/// erroneous results." -/// "If `hFile` was opened with `FILE_FLAG_OVERLAPPED`, the following conditions are in effect: [...] -/// The lpNumberOfBytesRead parameter should be set to NULL. Use the GetOverlappedResult function to -/// get the actual number of bytes read." -/// See: https://learn.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-readfile -fn windowsGetReadResult( - handle: windows.HANDLE, - overlapped: *windows.OVERLAPPED, - allow_aborted: bool, -) !union(enum) { - success: u32, - closed, - aborted, -} { - var num_bytes_read: u32 = undefined; - if (0 == windows.kernel32.GetOverlappedResult( - handle, - overlapped, - &num_bytes_read, - 0, - )) switch (windows.GetLastError()) { - .BROKEN_PIPE => return .closed, - .OPERATION_ABORTED => |err| if (allow_aborted) { - return .aborted; - } else { - return windows.unexpectedError(err); - }, - else => |err| return windows.unexpectedError(err), + /// Read as much data as possible from `handle` with `overlapped`, and write it to the FIFO. Before + /// returning, queue a read into `small_buf` so that `WaitForMultipleObjects` returns when more data + /// is available. `handle` must have no pending asynchronous operation. + fn windowsAsyncReadToFifoAndQueueSmallRead( + gpa: Allocator, + handle: windows.HANDLE, + overlapped: *windows.OVERLAPPED, + r: *Reader, + small_buf: *[128]u8, + bump_amt: usize, + ) !enum { empty, populated, closed_populated, closed } { + var read_any_data = false; + while (true) { + const fifo_read_pending = while (true) { + const buf = try writableSliceGreedyAlloc(r, gpa, bump_amt); + const buf_len = math.cast(u32, buf.len) orelse math.maxInt(u32); + + if (0 == windows.kernel32.ReadFile( + handle, + buf.ptr, + buf_len, + &win_dummy_bytes_read, + overlapped, + )) switch (windows.GetLastError()) { + .IO_PENDING => break true, + .BROKEN_PIPE => return if (read_any_data) .closed_populated else .closed, + else => |err| return windows.unexpectedError(err), + }; + + const num_bytes_read = switch (try windowsGetReadResult(handle, overlapped, false)) { + .success => |n| n, + .closed => return if (read_any_data) .closed_populated else .closed, + .aborted => unreachable, + }; + + read_any_data = true; + advanceBufferEnd(r, num_bytes_read); + + if (num_bytes_read == buf_len) { + // We filled the buffer, so there's probably more data available. + continue; + } else { + // We didn't fill the buffer, so assume we're out of data. + // There is no pending read. + break false; + } + }; + + if (fifo_read_pending) cancel_read: { + // Cancel the pending read into the FIFO. + _ = windows.kernel32.CancelIo(handle); + + // We have to wait for the handle to be signalled, i.e. for the cancellation to complete. + switch (windows.kernel32.WaitForSingleObject(handle, windows.INFINITE)) { + windows.WAIT_OBJECT_0 => {}, + windows.WAIT_FAILED => return windows.unexpectedError(windows.GetLastError()), + else => unreachable, + } + + // If it completed before we canceled, make sure to tell the FIFO! + const num_bytes_read = switch (try windowsGetReadResult(handle, overlapped, true)) { + .success => |n| n, + .closed => return if (read_any_data) .closed_populated else .closed, + .aborted => break :cancel_read, + }; + read_any_data = true; + advanceBufferEnd(r, num_bytes_read); + } + + // Try to queue the 1-byte read. + if (0 == windows.kernel32.ReadFile( + handle, + small_buf, + small_buf.len, + &win_dummy_bytes_read, + overlapped, + )) switch (windows.GetLastError()) { + .IO_PENDING => { + // 1-byte read pending as intended + return if (read_any_data) .populated else .empty; + }, + .BROKEN_PIPE => return if (read_any_data) .closed_populated else .closed, + else => |err| return windows.unexpectedError(err), + }; + + // We got data back this time. Write it to the FIFO and run the main loop again. + const num_bytes_read = switch (try windowsGetReadResult(handle, overlapped, false)) { + .success => |n| n, + .closed => return if (read_any_data) .closed_populated else .closed, + .aborted => unreachable, + }; + const buf = small_buf[0..num_bytes_read]; + const dest = try writableSliceGreedyAlloc(r, gpa, buf.len); + @memcpy(dest[0..buf.len], buf); + advanceBufferEnd(r, buf.len); + read_any_data = true; + } + } + + /// Simple wrapper around `GetOverlappedResult` to determine the result of a `ReadFile` operation. + /// If `!allow_aborted`, then `aborted` is never returned (`OPERATION_ABORTED` is considered unexpected). + /// + /// The `ReadFile` documentation states that the number of bytes read by an overlapped `ReadFile` must be determined using `GetOverlappedResult`, even if the + /// operation immediately returns data: + /// "Use NULL for [lpNumberOfBytesRead] if this is an asynchronous operation to avoid potentially + /// erroneous results." + /// "If `hFile` was opened with `FILE_FLAG_OVERLAPPED`, the following conditions are in effect: [...] + /// The lpNumberOfBytesRead parameter should be set to NULL. Use the GetOverlappedResult function to + /// get the actual number of bytes read." + /// See: https://learn.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-readfile + fn windowsGetReadResult( + handle: windows.HANDLE, + overlapped: *windows.OVERLAPPED, + allow_aborted: bool, + ) !union(enum) { + success: u32, + closed, + aborted, + } { + var num_bytes_read: u32 = undefined; + if (0 == windows.kernel32.GetOverlappedResult( + handle, + overlapped, + &num_bytes_read, + 0, + )) switch (windows.GetLastError()) { + .BROKEN_PIPE => return .closed, + .OPERATION_ABORTED => |err| if (allow_aborted) { + return .aborted; + } else { + return windows.unexpectedError(err); + }, + else => |err| return windows.unexpectedError(err), + }; + return .{ .success = num_bytes_read }; + } }; - return .{ .success = num_bytes_read }; } /// Given an enum, returns a struct with fields of that enum, each field @@ -879,10 +931,10 @@ pub fn PollFiles(comptime StreamEnum: type) type { for (&struct_fields, enum_fields) |*struct_field, enum_field| { struct_field.* = .{ .name = enum_field.name, - .type = fs.File, + .type = std.fs.File, .default_value_ptr = null, .is_comptime = false, - .alignment = @alignOf(fs.File), + .alignment = @alignOf(std.fs.File), }; } return @Type(.{ .@"struct" = .{ diff --git a/lib/std/Io/Reader.zig b/lib/std/Io/Reader.zig index 5c7fdafe76..f03a774e0c 100644 --- a/lib/std/Io/Reader.zig +++ b/lib/std/Io/Reader.zig @@ -1241,37 +1241,6 @@ pub fn fillAlloc(r: *Reader, allocator: Allocator, n: usize) FillAllocError!void return fill(r, n); } -/// Returns a slice into the unused capacity of `buffer` with at least -/// `min_len` bytes, extending `buffer` by resizing it with `gpa` as necessary. -/// -/// After calling this function, typically the caller will follow up with a -/// call to `advanceBufferEnd` to report the actual number of bytes buffered. -pub fn writableSliceGreedyAlloc(r: *Reader, allocator: Allocator, min_len: usize) Allocator.Error![]u8 { - { - const unused = r.buffer[r.end..]; - if (unused.len >= min_len) return unused; - } - if (r.seek > 0) rebase(r); - { - var list: ArrayList(u8) = .{ - .items = r.buffer[0..r.end], - .capacity = r.buffer.len, - }; - defer r.buffer = list.allocatedSlice(); - try list.ensureUnusedCapacity(allocator, min_len); - } - const unused = r.buffer[r.end..]; - assert(unused.len >= min_len); - return unused; -} - -/// After writing directly into the unused capacity of `buffer`, this function -/// updates `end` so that users of `Reader` can receive the data. -pub fn advanceBufferEnd(r: *Reader, n: usize) void { - assert(n <= r.buffer.len - r.end); - r.end += n; -} - fn takeMultipleOf7Leb128(r: *Reader, comptime Result: type) TakeLeb128Error!Result { const result_info = @typeInfo(Result).int; comptime assert(result_info.bits % 7 == 0); diff --git a/lib/std/process/Child.zig b/lib/std/process/Child.zig index c2effb523a..21cc545f12 100644 --- a/lib/std/process/Child.zig +++ b/lib/std/process/Child.zig @@ -14,6 +14,7 @@ const assert = std.debug.assert; const native_os = builtin.os.tag; const Allocator = std.mem.Allocator; const ChildProcess = @This(); +const ArrayList = std.ArrayListUnmanaged; pub const Id = switch (native_os) { .windows => windows.HANDLE, @@ -348,19 +349,6 @@ pub const RunResult = struct { stderr: []u8, }; -fn writeFifoDataToArrayList(allocator: Allocator, list: *std.ArrayListUnmanaged(u8), fifo: *std.io.PollFifo) !void { - if (fifo.head != 0) fifo.realign(); - if (list.capacity == 0) { - list.* = .{ - .items = fifo.buf[0..fifo.count], - .capacity = fifo.buf.len, - }; - fifo.* = std.io.PollFifo.init(fifo.allocator); - } else { - try list.appendSlice(allocator, fifo.buf[0..fifo.count]); - } -} - /// Collect the output from the process's stdout and stderr. Will return once all output /// has been collected. This does not mean that the process has ended. `wait` should still /// be called to wait for and clean up the process. @@ -370,28 +358,48 @@ pub fn collectOutput( child: ChildProcess, /// Used for `stdout` and `stderr`. allocator: Allocator, - stdout: *std.ArrayListUnmanaged(u8), - stderr: *std.ArrayListUnmanaged(u8), + stdout: *ArrayList(u8), + stderr: *ArrayList(u8), max_output_bytes: usize, ) !void { assert(child.stdout_behavior == .Pipe); assert(child.stderr_behavior == .Pipe); - var poller = std.io.poll(allocator, enum { stdout, stderr }, .{ + var poller = std.Io.poll(allocator, enum { stdout, stderr }, .{ .stdout = child.stdout.?, .stderr = child.stderr.?, }); defer poller.deinit(); - while (try poller.poll()) { - if (poller.fifo(.stdout).count > max_output_bytes) - return error.StdoutStreamTooLong; - if (poller.fifo(.stderr).count > max_output_bytes) - return error.StderrStreamTooLong; + const stdout_r = poller.reader(.stdout); + stdout_r.buffer = stdout.allocatedSlice(); + stdout_r.seek = 0; + stdout_r.end = stdout.items.len; + + const stderr_r = poller.reader(.stderr); + stderr_r.buffer = stderr.allocatedSlice(); + stderr_r.seek = 0; + stderr_r.end = stderr.items.len; + + defer { + stdout.* = .{ + .items = stdout_r.buffer[0..stdout_r.end], + .capacity = stdout_r.buffer.len, + }; + stderr.* = .{ + .items = stderr_r.buffer[0..stderr_r.end], + .capacity = stderr_r.buffer.len, + }; + stdout_r.buffer = &.{}; + stderr_r.buffer = &.{}; } - try writeFifoDataToArrayList(allocator, stdout, poller.fifo(.stdout)); - try writeFifoDataToArrayList(allocator, stderr, poller.fifo(.stderr)); + while (try poller.poll()) { + if (stdout_r.bufferedLen() > max_output_bytes) + return error.StdoutStreamTooLong; + if (stderr_r.bufferedLen() > max_output_bytes) + return error.StderrStreamTooLong; + } } pub const RunError = posix.GetCwdError || posix.ReadError || SpawnError || posix.PollError || error{ @@ -421,10 +429,10 @@ pub fn run(args: struct { child.expand_arg0 = args.expand_arg0; child.progress_node = args.progress_node; - var stdout: std.ArrayListUnmanaged(u8) = .empty; - errdefer stdout.deinit(args.allocator); - var stderr: std.ArrayListUnmanaged(u8) = .empty; - errdefer stderr.deinit(args.allocator); + var stdout: ArrayList(u8) = .empty; + defer stdout.deinit(args.allocator); + var stderr: ArrayList(u8) = .empty; + defer stderr.deinit(args.allocator); try child.spawn(); errdefer { @@ -432,7 +440,7 @@ pub fn run(args: struct { } try child.collectOutput(args.allocator, &stdout, &stderr, args.max_output_bytes); - return RunResult{ + return .{ .stdout = try stdout.toOwnedSlice(args.allocator), .stderr = try stderr.toOwnedSlice(args.allocator), .term = try child.wait(), @@ -878,12 +886,12 @@ fn spawnWindows(self: *ChildProcess) SpawnError!void { var cmd_line_cache = WindowsCommandLineCache.init(self.allocator, self.argv); defer cmd_line_cache.deinit(); - var app_buf: std.ArrayListUnmanaged(u16) = .empty; + var app_buf: ArrayList(u16) = .empty; defer app_buf.deinit(self.allocator); try app_buf.appendSlice(self.allocator, app_name_w); - var dir_buf: std.ArrayListUnmanaged(u16) = .empty; + var dir_buf: ArrayList(u16) = .empty; defer dir_buf.deinit(self.allocator); if (cwd_path_w.len > 0) { @@ -1003,13 +1011,16 @@ fn forkChildErrReport(fd: i32, err: ChildProcess.SpawnError) noreturn { } fn writeIntFd(fd: i32, value: ErrInt) !void { - const file: File = .{ .handle = fd }; - file.deprecatedWriter().writeInt(u64, @intCast(value), .little) catch return error.SystemResources; + var buffer: [8]u8 = undefined; + var fw: std.fs.File.Writer = .initMode(.{ .handle = fd }, &buffer, .streaming); + fw.interface.writeInt(u64, value, .little) catch unreachable; + fw.interface.flush() catch return error.SystemResources; } fn readIntFd(fd: i32) !ErrInt { - const file: File = .{ .handle = fd }; - return @intCast(file.deprecatedReader().readInt(u64, .little) catch return error.SystemResources); + var buffer: [8]u8 = undefined; + var fr: std.fs.File.Reader = .initMode(.{ .handle = fd }, &buffer, .streaming); + return @intCast(fr.interface.takeInt(u64, .little) catch return error.SystemResources); } const ErrInt = std.meta.Int(.unsigned, @sizeOf(anyerror) * 8); @@ -1020,8 +1031,8 @@ const ErrInt = std.meta.Int(.unsigned, @sizeOf(anyerror) * 8); /// Note: If the dir is the cwd, dir_buf should be empty (len = 0). fn windowsCreateProcessPathExt( allocator: mem.Allocator, - dir_buf: *std.ArrayListUnmanaged(u16), - app_buf: *std.ArrayListUnmanaged(u16), + dir_buf: *ArrayList(u16), + app_buf: *ArrayList(u16), pathext: [:0]const u16, cmd_line_cache: *WindowsCommandLineCache, envp_ptr: ?[*]u16, @@ -1504,7 +1515,7 @@ const WindowsCommandLineCache = struct { /// Returns the absolute path of `cmd.exe` within the Windows system directory. /// The caller owns the returned slice. fn windowsCmdExePath(allocator: mem.Allocator) error{ OutOfMemory, Unexpected }![:0]u16 { - var buf = try std.ArrayListUnmanaged(u16).initCapacity(allocator, 128); + var buf = try ArrayList(u16).initCapacity(allocator, 128); errdefer buf.deinit(allocator); while (true) { const unused_slice = buf.unusedCapacitySlice(); diff --git a/src/Compilation.zig b/src/Compilation.zig index 2b661c04d6..3cbddee64b 100644 --- a/src/Compilation.zig +++ b/src/Compilation.zig @@ -6215,19 +6215,20 @@ fn spawnZigRc( return comp.failWin32Resource(win32_resource, "unable to spawn {s} rc: {s}", .{ argv[0], @errorName(err) }); }; - var poller = std.io.poll(comp.gpa, enum { stdout }, .{ + var poller = std.Io.poll(comp.gpa, enum { stdout, stderr }, .{ .stdout = child.stdout.?, + .stderr = child.stderr.?, }); defer poller.deinit(); - const stdout = poller.fifo(.stdout); + const stdout = poller.reader(.stdout); poll: while (true) { - while (stdout.readableLength() < @sizeOf(std.zig.Server.Message.Header)) if (!try poller.poll()) break :poll; - var header: std.zig.Server.Message.Header = undefined; - assert(stdout.read(std.mem.asBytes(&header)) == @sizeOf(std.zig.Server.Message.Header)); - while (stdout.readableLength() < header.bytes_len) if (!try poller.poll()) break :poll; - const body = stdout.readableSliceOfLen(header.bytes_len); + const MessageHeader = std.zig.Server.Message.Header; + while (stdout.buffered().len < @sizeOf(MessageHeader)) if (!try poller.poll()) break :poll; + const header = stdout.takeStruct(MessageHeader, .little) catch unreachable; + while (stdout.buffered().len < header.bytes_len) if (!try poller.poll()) break :poll; + const body = stdout.take(header.bytes_len) catch unreachable; switch (header.tag) { // We expect exactly one ErrorBundle, and if any error_bundle header is @@ -6250,13 +6251,10 @@ fn spawnZigRc( }, else => {}, // ignore other messages } - - stdout.discard(body.len); } // Just in case there's a failure that didn't send an ErrorBundle (e.g. an error return trace) - const stderr_reader = child.stderr.?.deprecatedReader(); - const stderr = try stderr_reader.readAllAlloc(arena, 10 * 1024 * 1024); + const stderr = poller.reader(.stderr); const term = child.wait() catch |err| { return comp.failWin32Resource(win32_resource, "unable to wait for {s} rc: {s}", .{ argv[0], @errorName(err) }); @@ -6265,12 +6263,12 @@ fn spawnZigRc( switch (term) { .Exited => |code| { if (code != 0) { - log.err("zig rc failed with stderr:\n{s}", .{stderr}); + log.err("zig rc failed with stderr:\n{s}", .{stderr.buffered()}); return comp.failWin32Resource(win32_resource, "zig rc exited with code {d}", .{code}); } }, else => { - log.err("zig rc terminated with stderr:\n{s}", .{stderr}); + log.err("zig rc terminated with stderr:\n{s}", .{stderr.buffered()}); return comp.failWin32Resource(win32_resource, "zig rc terminated unexpectedly", .{}); }, } diff --git a/tools/docgen.zig b/tools/docgen.zig index 9f98968c9e..3de9da8c88 100644 --- a/tools/docgen.zig +++ b/tools/docgen.zig @@ -3,7 +3,6 @@ const builtin = @import("builtin"); const io = std.io; const fs = std.fs; const process = std.process; -const ChildProcess = std.process.Child; const Progress = std.Progress; const print = std.debug.print; const mem = std.mem; diff --git a/tools/incr-check.zig b/tools/incr-check.zig index 08b8a21e3b..c187c84ae5 100644 --- a/tools/incr-check.zig +++ b/tools/incr-check.zig @@ -186,7 +186,7 @@ pub fn main() !void { try child.spawn(); - var poller = std.io.poll(arena, Eval.StreamEnum, .{ + var poller = std.Io.poll(arena, Eval.StreamEnum, .{ .stdout = child.stdout.?, .stderr = child.stderr.?, }); @@ -247,19 +247,15 @@ const Eval = struct { fn check(eval: *Eval, poller: *Poller, update: Case.Update, prog_node: std.Progress.Node) !void { const arena = eval.arena; - const Header = std.zig.Server.Message.Header; - const stdout = poller.fifo(.stdout); - const stderr = poller.fifo(.stderr); + const stdout = poller.reader(.stdout); + const stderr = poller.reader(.stderr); poll: while (true) { - while (stdout.readableLength() < @sizeOf(Header)) { - if (!(try poller.poll())) break :poll; - } - const header = stdout.reader().readStruct(Header) catch unreachable; - while (stdout.readableLength() < header.bytes_len) { - if (!(try poller.poll())) break :poll; - } - const body = stdout.readableSliceOfLen(header.bytes_len); + const Header = std.zig.Server.Message.Header; + while (stdout.buffered().len < @sizeOf(Header)) if (!try poller.poll()) break :poll; + const header = stdout.takeStruct(Header, .little) catch unreachable; + while (stdout.buffered().len < header.bytes_len) if (!try poller.poll()) break :poll; + const body = stdout.take(header.bytes_len) catch unreachable; switch (header.tag) { .error_bundle => { @@ -277,8 +273,8 @@ const Eval = struct { .string_bytes = try arena.dupe(u8, string_bytes), .extra = extra_array, }; - if (stderr.readableLength() > 0) { - const stderr_data = try stderr.toOwnedSlice(); + if (stderr.bufferedLen() > 0) { + const stderr_data = try poller.toOwnedSlice(.stderr); if (eval.allow_stderr) { std.log.info("error_bundle included stderr:\n{s}", .{stderr_data}); } else { @@ -289,15 +285,14 @@ const Eval = struct { try eval.checkErrorOutcome(update, result_error_bundle); } // This message indicates the end of the update. - stdout.discard(body.len); return; }, .emit_digest => { const EbpHdr = std.zig.Server.Message.EmitDigest; const ebp_hdr = @as(*align(1) const EbpHdr, @ptrCast(body)); _ = ebp_hdr; - if (stderr.readableLength() > 0) { - const stderr_data = try stderr.toOwnedSlice(); + if (stderr.bufferedLen() > 0) { + const stderr_data = try poller.toOwnedSlice(.stderr); if (eval.allow_stderr) { std.log.info("emit_digest included stderr:\n{s}", .{stderr_data}); } else { @@ -308,7 +303,6 @@ const Eval = struct { if (eval.target.backend == .sema) { try eval.checkSuccessOutcome(update, null, prog_node); // This message indicates the end of the update. - stdout.discard(body.len); } const digest = body[@sizeOf(EbpHdr)..][0..Cache.bin_digest_len]; @@ -323,21 +317,18 @@ const Eval = struct { try eval.checkSuccessOutcome(update, bin_path, prog_node); // This message indicates the end of the update. - stdout.discard(body.len); }, else => { // Ignore other messages. - stdout.discard(body.len); }, } } - if (stderr.readableLength() > 0) { - const stderr_data = try stderr.toOwnedSlice(); + if (stderr.bufferedLen() > 0) { if (eval.allow_stderr) { - std.log.info("update '{s}' included stderr:\n{s}", .{ update.name, stderr_data }); + std.log.info("update '{s}' included stderr:\n{s}", .{ update.name, stderr.buffered() }); } else { - eval.fatal("update '{s}' failed:\n{s}", .{ update.name, stderr_data }); + eval.fatal("update '{s}' failed:\n{s}", .{ update.name, stderr.buffered() }); } } @@ -537,25 +528,19 @@ const Eval = struct { fn end(eval: *Eval, poller: *Poller) !void { requestExit(eval.child, eval); - const Header = std.zig.Server.Message.Header; - const stdout = poller.fifo(.stdout); - const stderr = poller.fifo(.stderr); + const stdout = poller.reader(.stdout); + const stderr = poller.reader(.stderr); poll: while (true) { - while (stdout.readableLength() < @sizeOf(Header)) { - if (!(try poller.poll())) break :poll; - } - const header = stdout.reader().readStruct(Header) catch unreachable; - while (stdout.readableLength() < header.bytes_len) { - if (!(try poller.poll())) break :poll; - } - const body = stdout.readableSliceOfLen(header.bytes_len); - stdout.discard(body.len); + const Header = std.zig.Server.Message.Header; + while (stdout.buffered().len < @sizeOf(Header)) if (!try poller.poll()) break :poll; + const header = stdout.takeStruct(Header, .little) catch unreachable; + while (stdout.buffered().len < header.bytes_len) if (!try poller.poll()) break :poll; + stdout.toss(header.bytes_len); } - if (stderr.readableLength() > 0) { - const stderr_data = try stderr.toOwnedSlice(); - eval.fatal("unexpected stderr:\n{s}", .{stderr_data}); + if (stderr.bufferedLen() > 0) { + eval.fatal("unexpected stderr:\n{s}", .{stderr.buffered()}); } }