diff --git a/lib/std/Progress.zig b/lib/std/Progress.zig index 199a976b6d..76ef51943d 100644 --- a/lib/std/Progress.zig +++ b/lib/std/Progress.zig @@ -83,6 +83,22 @@ pub const Node = struct { /// Little endian. estimated_total_count: u32, name: [max_name_len]u8, + + fn getIpcFd(s: Storage) ?posix.fd_t { + if (s.estimated_total_count != std.math.maxInt(u32)) + return null; + + return @bitCast(s.completed_count); + } + + fn setIpcFd(s: *Storage, fd: posix.fd_t) void { + s.estimated_total_count = std.math.maxInt(u32); + s.completed_count = @bitCast(fd); + } + + comptime { + assert((@sizeOf(Storage) % 4) == 0); + } }; const Parent = enum(u16) { @@ -201,6 +217,13 @@ pub const Node = struct { } } + /// Posix-only. Used by `std.process.Child`. + pub fn setIpcFd(node: Node, fd: posix.fd_t) void { + const index = node.index.unwrap() orelse return; + assert(fd != -1); + storageByIndex(index).setIpcFd(fd); + } + fn storageByIndex(index: Node.Index) *Node.Storage { return &global_progress.node_storage[@intFromEnum(index)]; } @@ -475,14 +498,8 @@ fn serialize() Serialized { } } - // Now we can analyze our copy of the graph without atomics, reconstructing - // children lists which do not exist in the canonical data. These are - // needed for tree traversal below. - const serialized_node_parents = serialized_node_parents_buffer[0..serialized_len]; - const serialized_node_storage = serialized_node_storage_buffer[0..serialized_len]; - // Remap parents to point inside serialized arrays. - for (serialized_node_parents) |*parent| { + for (serialized_node_parents_buffer[0..serialized_len]) |*parent| { parent.* = switch (parent.*) { .unused => unreachable, .none => .none, @@ -490,15 +507,99 @@ fn serialize() Serialized { }; } + // Find nodes which correspond to child processes. + var pipe_buf: [4096]u8 align(4) = undefined; + + for ( + serialized_node_parents_buffer[0..serialized_len], + serialized_node_storage_buffer[0..serialized_len], + 0.., + ) |main_parent, *main_storage, main_index| { + if (main_parent == .unused) continue; + const fd = main_storage.getIpcFd() orelse continue; + var bytes_read: usize = 0; + while (true) { + bytes_read += posix.read(fd, pipe_buf[bytes_read..]) catch |err| switch (err) { + error.WouldBlock => break, + else => |e| { + std.log.warn("failed to read child progress data: {s}", .{@errorName(e)}); + main_storage.completed_count = 0; + main_storage.estimated_total_count = 0; + continue; + }, + }; + } + // Ignore all but the last message on the pipe. + var input: []align(2) u8 = pipe_buf[0..bytes_read]; + if (input.len == 0) { + main_storage.completed_count = 0; + main_storage.estimated_total_count = 0; + continue; + } + + const storage, const parents = while (true) { + if (input.len < 4) { + std.log.warn("short read: {d} out of 4 header bytes", .{input.len}); + main_storage.completed_count = 0; + main_storage.estimated_total_count = 0; + continue; + } + const subtree_len = std.mem.readInt(u32, input[0..4], .little); + const expected_bytes = 4 + subtree_len * (@sizeOf(Node.Storage) + @sizeOf(Node.Parent)); + if (input.len < expected_bytes) { + std.log.warn("short read: {d} out of {d} ({d} nodes)", .{ input.len, expected_bytes, subtree_len }); + main_storage.completed_count = 0; + main_storage.estimated_total_count = 0; + continue; + } + if (input.len > expected_bytes) { + input = @alignCast(input[expected_bytes..]); + continue; + } + const storage_bytes = input[4..][0 .. subtree_len * @sizeOf(Node.Storage)]; + const parents_bytes = input[4 + storage_bytes.len ..][0 .. subtree_len * @sizeOf(Node.Parent)]; + break .{ + std.mem.bytesAsSlice(Node.Storage, storage_bytes), + std.mem.bytesAsSlice(Node.Parent, parents_bytes), + }; + }; + + // Mount the root here. + main_storage.* = storage[0]; + + // Copy the rest of the tree to the end. + @memcpy(serialized_node_storage_buffer[serialized_len..][0 .. storage.len - 1], storage[1..]); + + // Patch up parent pointers taking into account how the subtree is mounted. + serialized_node_parents_buffer[serialized_len] = .none; + + for (serialized_node_parents_buffer[serialized_len..][0 .. parents.len - 1], parents[1..]) |*dest, p| { + dest.* = switch (p) { + // Fix bad data so the rest of the code does not see `unused`. + .none, .unused => .none, + // Root node is being mounted here. + @as(Node.Parent, @enumFromInt(0)) => @enumFromInt(main_index), + // Other nodes mounted at the end. + _ => |off| @enumFromInt(serialized_len + @intFromEnum(off) - 1), + }; + } + + serialized_len += storage.len - 1; + } + return .{ - .parents = serialized_node_parents, - .storage = serialized_node_storage, + .parents = serialized_node_parents_buffer[0..serialized_len], + .storage = serialized_node_storage_buffer[0..serialized_len], }; } fn computeRedraw() []u8 { const serialized = serialize(); + // Now we can analyze our copy of the graph without atomics, reconstructing + // children lists which do not exist in the canonical data. These are + // needed for tree traversal below. + var children_buffer: [default_node_storage_buffer_len]Children = undefined; const children = children_buffer[0..serialized.parents.len]; @@ -624,7 +725,8 @@ fn write(buf: []const u8) void { fn writeIpc(fd: posix.fd_t, serialized: Serialized) void { assert(serialized.parents.len == serialized.storage.len); - const header = std.mem.asBytes(&serialized.parents.len); + const serialized_len: u32 = @intCast(serialized.parents.len); + const header = std.mem.asBytes(&serialized_len); const storage = std.mem.sliceAsBytes(serialized.storage); const parents = std.mem.sliceAsBytes(serialized.parents); @@ -637,10 +739,17 @@ fn writeIpc(fd: posix.fd_t, serialized: Serialized) void { // TODO: if big endian, byteswap // this is needed because the parent or child process might be running in qemu - const file: std.fs.File = .{ .handle = fd }; - file.writevAll(&vecs) catch |err| { - std.log.warn("failed to send progress to parent process: {s}", .{@errorName(err)}); - }; + // If this write would block we do not want to keep trying, but we need to + // know if a partial message was written. + if (posix.writev(fd, &vecs)) |written| { + const total = header.len + storage.len + parents.len; + if (written < total) { + std.log.warn("short write: {d} out of {d}", .{ written, total }); + } + } else |err| switch (err) { + error.WouldBlock => {}, + else => |e| std.log.warn("failed to send progress to parent process: {s}", .{@errorName(e)}), + } } fn maybeUpdateSize(resize_flag: bool) void { diff --git a/lib/std/process/Child.zig b/lib/std/process/Child.zig index ef4a5c79c5..09664876dd 100644 --- a/lib/std/process/Child.zig +++ b/lib/std/process/Child.zig @@ -98,7 +98,10 @@ resource_usage_statistics: ResourceUsageStatistics = .{}, /// write end of the pipe will be specified in the `ZIG_PROGRESS` /// environment variable inside the child process. The progress reported by /// the child will be attached to this progress node in the parent process. -parent_progress_node: std.Progress.Node = .{ .index = .none }, +/// +/// The child's progress tree will be grafted into the parent's progress tree, +/// by substituting this node with the child's root node. +progress_node: std.Progress.Node = .{ .index = .none }, pub const ResourceUsageStatistics = struct { rusage: @TypeOf(rusage_init) = rusage_init, @@ -581,11 +584,11 @@ fn spawnPosix(self: *ChildProcess) SpawnError!void { } const prog_pipe: [2]posix.fd_t = p: { - if (self.parent_progress_node.index == .none) { + if (self.progress_node.index == .none) { break :p .{ -1, -1 }; } else { // No CLOEXEC because the child needs access to this file descriptor. - break :p try posix.pipe2(.{}); + break :p try posix.pipe2(.{ .NONBLOCK = true }); } }; errdefer destroyPipe(prog_pipe); @@ -685,18 +688,18 @@ fn spawnPosix(self: *ChildProcess) SpawnError!void { // we are the parent const pid: i32 = @intCast(pid_result); - if (self.stdin_behavior == StdIo.Pipe) { - self.stdin = File{ .handle = stdin_pipe[1] }; + if (self.stdin_behavior == .Pipe) { + self.stdin = .{ .handle = stdin_pipe[1] }; } else { self.stdin = null; } - if (self.stdout_behavior == StdIo.Pipe) { - self.stdout = File{ .handle = stdout_pipe[0] }; + if (self.stdout_behavior == .Pipe) { + self.stdout = .{ .handle = stdout_pipe[0] }; } else { self.stdout = null; } - if (self.stderr_behavior == StdIo.Pipe) { - self.stderr = File{ .handle = stderr_pipe[0] }; + if (self.stderr_behavior == .Pipe) { + self.stderr = .{ .handle = stderr_pipe[0] }; } else { self.stderr = null; } @@ -705,15 +708,17 @@ fn spawnPosix(self: *ChildProcess) SpawnError!void { self.err_pipe = err_pipe; self.term = null; - if (self.stdin_behavior == StdIo.Pipe) { + if (self.stdin_behavior == .Pipe) { posix.close(stdin_pipe[0]); } - if (self.stdout_behavior == StdIo.Pipe) { + if (self.stdout_behavior == .Pipe) { posix.close(stdout_pipe[1]); } - if (self.stderr_behavior == StdIo.Pipe) { + if (self.stderr_behavior == .Pipe) { posix.close(stderr_pipe[1]); } + + self.progress_node.setIpcFd(prog_pipe[0]); } fn spawnWindows(self: *ChildProcess) SpawnError!void {