diff --git a/lib/std/Progress.zig b/lib/std/Progress.zig index d6b06e7be2..1765dc9825 100644 --- a/lib/std/Progress.zig +++ b/lib/std/Progress.zig @@ -324,7 +324,7 @@ var global_progress: Progress = .{ .node_end_index = 0, }; -const node_storage_buffer_len = 200; +const node_storage_buffer_len = 83; var node_parents_buffer: [node_storage_buffer_len]Node.Parent = undefined; var node_storage_buffer: [node_storage_buffer_len]Node.Storage = undefined; var node_freelist_buffer: [node_storage_buffer_len]Node.OptionalIndex = undefined; @@ -755,8 +755,10 @@ const Serialized = struct { parents_copy: [node_storage_buffer_len]Node.Parent, storage_copy: [node_storage_buffer_len]Node.Storage, + ipc_metadata_fds_copy: [node_storage_buffer_len]Fd, ipc_metadata_copy: [node_storage_buffer_len]SavedMetadata, + ipc_metadata_fds: [node_storage_buffer_len]Fd, ipc_metadata: [node_storage_buffer_len]SavedMetadata, }; }; @@ -810,36 +812,39 @@ fn serialize(serialized_buffer: *Serialized.Buffer) Serialized { } const SavedMetadata = struct { - ipc_fd: u16, + remaining_read_trash_bytes: u16, main_index: u8, start_index: u8, nodes_len: u8, +}; - fn getIpcFd(metadata: SavedMetadata) posix.fd_t { - return if (is_windows) - @ptrFromInt(@as(usize, metadata.ipc_fd) << 2) - else - metadata.ipc_fd; +const Fd = enum(i32) { + _, + + fn init(fd: posix.fd_t) Fd { + return @enumFromInt(if (is_windows) @as(isize, @bitCast(@intFromPtr(fd))) else fd); } - fn setIpcFd(fd: posix.fd_t) u16 { - return @intCast(if (is_windows) - @shrExact(@intFromPtr(fd), 2) + fn get(fd: Fd) posix.fd_t { + return if (is_windows) + @ptrFromInt(@as(usize, @bitCast(@as(isize, @intFromEnum(fd))))) else - fd); + @intFromEnum(fd); } }; var ipc_metadata_len: u8 = 0; -var remaining_read_trash_bytes: usize = 0; fn serializeIpc(start_serialized_len: usize, serialized_buffer: *Serialized.Buffer) usize { + const ipc_metadata_fds_copy = &serialized_buffer.ipc_metadata_fds_copy; const ipc_metadata_copy = &serialized_buffer.ipc_metadata_copy; + const ipc_metadata_fds = &serialized_buffer.ipc_metadata_fds; const ipc_metadata = &serialized_buffer.ipc_metadata; var serialized_len = start_serialized_len; - var pipe_buf: [2 * 4096]u8 align(4) = undefined; + var pipe_buf: [2 * 4096]u8 = undefined; + const old_ipc_metadata_fds = ipc_metadata_fds_copy[0..ipc_metadata_len]; const old_ipc_metadata = ipc_metadata_copy[0..ipc_metadata_len]; ipc_metadata_len = 0; @@ -850,6 +855,7 @@ fn serializeIpc(start_serialized_len: usize, serialized_buffer: *Serialized.Buff ) |main_parent, *main_storage, main_index| { if (main_parent == .unused) continue; const fd = main_storage.getIpcFd() orelse continue; + const opt_saved_metadata = findOld(fd, old_ipc_metadata_fds, old_ipc_metadata); var bytes_read: usize = 0; while (true) { const n = posix.read(fd, pipe_buf[bytes_read..]) catch |err| switch (err) { @@ -862,24 +868,26 @@ fn serializeIpc(start_serialized_len: usize, serialized_buffer: *Serialized.Buff }, }; if (n == 0) break; - if (remaining_read_trash_bytes > 0) { - assert(bytes_read == 0); - if (remaining_read_trash_bytes >= n) { - remaining_read_trash_bytes -= n; + if (opt_saved_metadata) |m| { + if (m.remaining_read_trash_bytes > 0) { + assert(bytes_read == 0); + if (m.remaining_read_trash_bytes >= n) { + m.remaining_read_trash_bytes = @intCast(m.remaining_read_trash_bytes - n); + continue; + } + const src = pipe_buf[m.remaining_read_trash_bytes..n]; + std.mem.copyForwards(u8, &pipe_buf, src); + m.remaining_read_trash_bytes = 0; + bytes_read = src.len; continue; } - const src = pipe_buf[remaining_read_trash_bytes..n]; - std.mem.copyForwards(u8, &pipe_buf, src); - remaining_read_trash_bytes = 0; - bytes_read = src.len; - continue; } bytes_read += n; } // Ignore all but the last message on the pipe. var input: []u8 = pipe_buf[0..bytes_read]; if (input.len == 0) { - serialized_len = useSavedIpcData(serialized_len, serialized_buffer, main_storage, main_index, old_ipc_metadata); + serialized_len = useSavedIpcData(serialized_len, serialized_buffer, main_storage, main_index, opt_saved_metadata, 0, fd); continue; } @@ -888,9 +896,8 @@ fn serializeIpc(start_serialized_len: usize, serialized_buffer: *Serialized.Buff const expected_bytes = 1 + subtree_len * (@sizeOf(Node.Storage) + @sizeOf(Node.Parent)); if (input.len < expected_bytes) { // Ignore short reads. We'll handle the next full message when it comes instead. - assert(remaining_read_trash_bytes == 0); - remaining_read_trash_bytes = expected_bytes - input.len; - serialized_len = useSavedIpcData(serialized_len, serialized_buffer, main_storage, main_index, old_ipc_metadata); + const remaining_read_trash_bytes: u16 = @intCast(expected_bytes - input.len); + serialized_len = useSavedIpcData(serialized_len, serialized_buffer, main_storage, main_index, opt_saved_metadata, remaining_read_trash_bytes, fd); continue :main_loop; } if (input.len > expected_bytes) { @@ -908,8 +915,9 @@ fn serializeIpc(start_serialized_len: usize, serialized_buffer: *Serialized.Buff const nodes_len: u8 = @intCast(@min(parents.len - 1, serialized_buffer.storage.len - serialized_len)); // Remember in case the pipe is empty on next update. + ipc_metadata_fds[ipc_metadata_len] = Fd.init(fd); ipc_metadata[ipc_metadata_len] = .{ - .ipc_fd = SavedMetadata.setIpcFd(fd), + .remaining_read_trash_bytes = 0, .start_index = @intCast(serialized_len), .nodes_len = nodes_len, .main_index = @intCast(main_index), @@ -950,6 +958,7 @@ fn serializeIpc(start_serialized_len: usize, serialized_buffer: *Serialized.Buff // Save a copy in case any pipes are empty on the next update. @memcpy(serialized_buffer.parents_copy[0..serialized_len], serialized_buffer.parents[0..serialized_len]); @memcpy(serialized_buffer.storage_copy[0..serialized_len], serialized_buffer.storage[0..serialized_len]); + @memcpy(ipc_metadata_fds_copy[0..ipc_metadata_len], ipc_metadata_fds[0..ipc_metadata_len]); @memcpy(ipc_metadata_copy[0..ipc_metadata_len], ipc_metadata[0..ipc_metadata_len]); return serialized_len; @@ -963,9 +972,13 @@ fn copyRoot(dest: *Node.Storage, src: *align(1) Node.Storage) void { }; } -fn findOld(ipc_fd: posix.fd_t, old_metadata: []const SavedMetadata) ?*const SavedMetadata { - for (old_metadata) |*m| { - if (m.getIpcFd() == ipc_fd) +fn findOld( + ipc_fd: posix.fd_t, + old_metadata_fds: []Fd, + old_metadata: []SavedMetadata, +) ?*SavedMetadata { + for (old_metadata_fds, old_metadata) |fd, *m| { + if (fd.get() == ipc_fd) return m; } return null; @@ -976,16 +989,28 @@ fn useSavedIpcData( serialized_buffer: *Serialized.Buffer, main_storage: *Node.Storage, main_index: usize, - old_metadata: []const SavedMetadata, + opt_saved_metadata: ?*SavedMetadata, + remaining_read_trash_bytes: u16, + fd: posix.fd_t, ) usize { const parents_copy = &serialized_buffer.parents_copy; const storage_copy = &serialized_buffer.storage_copy; + const ipc_metadata_fds = &serialized_buffer.ipc_metadata_fds; const ipc_metadata = &serialized_buffer.ipc_metadata; - const ipc_fd = main_storage.getIpcFd().?; - const saved_metadata = findOld(ipc_fd, old_metadata) orelse { + const saved_metadata = opt_saved_metadata orelse { main_storage.completed_count = 0; main_storage.estimated_total_count = 0; + if (remaining_read_trash_bytes > 0) { + ipc_metadata_fds[ipc_metadata_len] = Fd.init(fd); + ipc_metadata[ipc_metadata_len] = .{ + .remaining_read_trash_bytes = remaining_read_trash_bytes, + .start_index = @intCast(start_serialized_len), + .nodes_len = 0, + .main_index = @intCast(main_index), + }; + ipc_metadata_len += 1; + } return start_serialized_len; }; @@ -993,8 +1018,9 @@ fn useSavedIpcData( const nodes_len = @min(saved_metadata.nodes_len, serialized_buffer.storage.len - start_serialized_len); const old_main_index = saved_metadata.main_index; + ipc_metadata_fds[ipc_metadata_len] = Fd.init(fd); ipc_metadata[ipc_metadata_len] = .{ - .ipc_fd = SavedMetadata.setIpcFd(ipc_fd), + .remaining_read_trash_bytes = remaining_read_trash_bytes, .start_index = @intCast(start_serialized_len), .nodes_len = nodes_len, .main_index = @intCast(main_index), @@ -1209,6 +1235,11 @@ fn writeIpc(fd: posix.fd_t, serialized: Serialized) error{BrokenPipe}!void { .{ .base = parents.ptr, .len = parents.len }, }; + // Ensures the packet can fit in the pipe buffer. + const upper_bound_msg_len = 1 + node_storage_buffer_len * @sizeOf(Node.Storage) + + node_storage_buffer_len * @sizeOf(Node.OptionalIndex); + comptime assert(upper_bound_msg_len <= 4096); + while (remaining_write_trash_bytes > 0) { // We do this in a separate write call to give a better chance for the // writev below to be in a single packet. @@ -1228,7 +1259,7 @@ fn writeIpc(fd: posix.fd_t, serialized: Serialized) error{BrokenPipe}!void { // If this write would block we do not want to keep trying, but we need to // know if a partial message was written. - if (posix.writev(fd, &vecs)) |written| { + if (writevNonblock(fd, &vecs)) |written| { const total = header.len + storage.len + parents.len; if (written < total) { remaining_write_trash_bytes = total - written; @@ -1243,6 +1274,23 @@ fn writeIpc(fd: posix.fd_t, serialized: Serialized) error{BrokenPipe}!void { } } +fn writevNonblock(fd: posix.fd_t, iov: []posix.iovec_const) posix.WriteError!usize { + var iov_index: usize = 0; + var written: usize = 0; + var total_written: usize = 0; + while (true) { + while (if (iov_index < iov.len) + written >= iov[iov_index].len + else + return total_written) : (iov_index += 1) written -= iov[iov_index].len; + iov[iov_index].base += written; + iov[iov_index].len -= written; + written = try posix.writev(fd, iov[iov_index..]); + if (written == 0) return total_written; + total_written += written; + } +} + fn maybeUpdateSize(resize_flag: bool) void { if (!resize_flag) return;