diff --git a/src/Package/Fetch.zig b/src/Package/Fetch.zig index fab9d74a91..9b7c83cc39 100644 --- a/src/Package/Fetch.zig +++ b/src/Package/Fetch.zig @@ -38,15 +38,12 @@ const assert = std.debug.assert; const ascii = std.ascii; const Allocator = std.mem.Allocator; const Cache = std.Build.Cache; -const ThreadPool = std.Thread.Pool; -const WaitGroup = std.Thread.WaitGroup; const git = @import("Fetch/git.zig"); const Package = @import("../Package.zig"); const Manifest = Package.Manifest; const ErrorBundle = std.zig.ErrorBundle; arena: std.heap.ArenaAllocator, -io: Io, location: Location, location_tok: std.zig.Ast.TokenIndex, hash_tok: std.zig.Ast.OptionalTokenIndex, @@ -104,7 +101,8 @@ pub const LazyStatus = enum { /// Contains shared state among all `Fetch` tasks. pub const JobQueue = struct { - mutex: std.Thread.Mutex = .{}, + io: Io, + mutex: Io.Mutex = .init, /// It's an array hash map so that it can be sorted before rendering the /// dependencies.zig source file. /// Protected by `mutex`. @@ -115,8 +113,7 @@ pub const JobQueue = struct { all_fetches: std.ArrayList(*Fetch) = .empty, http_client: *std.http.Client, - thread_pool: *ThreadPool, - wait_group: WaitGroup = .{}, + group: Io.Group = .init, global_cache: Cache.Directory, /// If true then, no fetching occurs, and: /// * The `global_cache` directory is assumed to be the direct parent @@ -320,13 +317,14 @@ pub const Location = union(enum) { pub const RunError = error{ OutOfMemory, + Canceled, /// This error code is intended to be handled by inspecting the /// `error_bundle` field. FetchFailed, }; pub fn run(f: *Fetch) RunError!void { - const io = f.io; + const io = f.job_queue.io; const eb = &f.error_bundle; const arena = f.arena.allocator(); const gpa = f.arena.child_allocator; @@ -488,7 +486,7 @@ fn runResource( resource: *Resource, remote_hash: ?Package.Hash, ) RunError!void { - const io = f.io; + const io = f.job_queue.io; defer resource.deinit(io); const arena = f.arena.allocator(); const eb = &f.error_bundle; @@ -702,7 +700,8 @@ fn loadManifest(f: *Fetch, pkg_root: Cache.Path) RunError!void { } fn queueJobsForDeps(f: *Fetch) RunError!void { - const io = f.io; + const io = f.job_queue.io; + assert(f.job_queue.recursive); // If the package does not have a build.zig.zon file then there are no dependencies. @@ -722,8 +721,8 @@ fn queueJobsForDeps(f: *Fetch) RunError!void { const prog_names = try parent_arena.alloc([]const u8, deps.len); var new_fetch_index: usize = 0; - f.job_queue.mutex.lock(); - defer f.job_queue.mutex.unlock(); + try f.job_queue.mutex.lock(io); + defer f.job_queue.mutex.unlock(io); try f.job_queue.all_fetches.ensureUnusedCapacity(gpa, new_fetches.len); try f.job_queue.table.ensureUnusedCapacity(gpa, @intCast(new_fetches.len)); @@ -792,7 +791,6 @@ fn queueJobsForDeps(f: *Fetch) RunError!void { f.job_queue.all_fetches.appendAssumeCapacity(new_fetch); } new_fetch.* = .{ - .io = io, .arena = std.heap.ArenaAllocator.init(gpa), .location = location, .location_tok = dep.location_tok, @@ -830,11 +828,9 @@ fn queueJobsForDeps(f: *Fetch) RunError!void { break :nf .{ new_fetches[0..new_fetch_index], prog_names[0..new_fetch_index] }; }; - // Now it's time to give tasks to the thread pool. - const thread_pool = f.job_queue.thread_pool; - + // Now it's time to dispatch tasks. for (new_fetches, prog_names) |*new_fetch, prog_name| { - thread_pool.spawnWg(&f.job_queue.wait_group, workerRun, .{ new_fetch, prog_name }); + f.job_queue.group.async(io, workerRun, .{ new_fetch, prog_name }); } } @@ -848,6 +844,7 @@ pub fn workerRun(f: *Fetch, prog_name: []const u8) void { run(f) catch |err| switch (err) { error.OutOfMemory => f.oom_flag = true, + error.Canceled => {}, error.FetchFailed => { // Nothing to do because the errors are already reported in `error_bundle`, // and a reference is kept to the `Fetch` task inside `all_fetches`. @@ -992,7 +989,7 @@ const FileType = enum { const init_resource_buffer_size = git.Packet.max_data_length; fn initResource(f: *Fetch, uri: std.Uri, resource: *Resource, reader_buffer: []u8) RunError!void { - const io = f.io; + const io = f.job_queue.io; const arena = f.arena.allocator(); const eb = &f.error_bundle; @@ -1281,12 +1278,16 @@ fn unpackTarball(f: *Fetch, out_dir: fs.Dir, reader: *Io.Reader) RunError!Unpack return res; } -fn unzip(f: *Fetch, out_dir: fs.Dir, reader: *Io.Reader) error{ ReadFailed, OutOfMemory, FetchFailed }!UnpackResult { +fn unzip( + f: *Fetch, + out_dir: fs.Dir, + reader: *Io.Reader, +) error{ ReadFailed, OutOfMemory, Canceled, FetchFailed }!UnpackResult { // We write the entire contents to a file first because zip files // must be processed back to front and they could be too large to // load into memory. - const io = f.io; + const io = f.job_queue.io; const cache_root = f.job_queue.global_cache; const prefix = "tmp/"; const suffix = ".zip"; @@ -1306,6 +1307,7 @@ fn unzip(f: *Fetch, out_dir: fs.Dir, reader: *Io.Reader) error{ ReadFailed, OutO .read = true, }) catch |err| switch (err) { error.PathAlreadyExists => continue, + error.Canceled => return error.Canceled, else => |e| return f.fail( f.location_tok, try eb.printString("failed to create temporary zip file: {t}", .{e}), @@ -1348,7 +1350,7 @@ fn unzip(f: *Fetch, out_dir: fs.Dir, reader: *Io.Reader) error{ ReadFailed, OutO } fn unpackGitPack(f: *Fetch, out_dir: fs.Dir, resource: *Resource.Git) anyerror!UnpackResult { - const io = f.io; + const io = f.job_queue.io; const arena = f.arena.allocator(); // TODO don't try to get a gpa from an arena. expose this dependency higher up // because the backing of arena could be page allocator @@ -1486,11 +1488,11 @@ const ComputedHash = struct { /// hashed* and must not be present on the file system when calling this /// function. fn computeHash(f: *Fetch, pkg_path: Cache.Path, filter: Filter) RunError!ComputedHash { + const io = f.job_queue.io; // All the path name strings need to be in memory for sorting. const arena = f.arena.allocator(); const gpa = f.arena.child_allocator; const eb = &f.error_bundle; - const thread_pool = f.job_queue.thread_pool; const root_dir = pkg_path.root_dir.handle; // Collect all files, recursively, then sort. @@ -1514,10 +1516,8 @@ fn computeHash(f: *Fetch, pkg_path: Cache.Path, filter: Filter) RunError!Compute { // The final hash will be a hash of each file hashed independently. This // allows hashing in parallel. - var wait_group: WaitGroup = .{}; - // `computeHash` is called from a worker thread so there must not be - // any waiting without working or a deadlock could occur. - defer thread_pool.waitAndWork(&wait_group); + var group: Io.Group = .init; + defer group.wait(io); while (walker.next() catch |err| { try eb.addRootErrorMessage(.{ .msg = try eb.printString( @@ -1542,7 +1542,7 @@ fn computeHash(f: *Fetch, pkg_path: Cache.Path, filter: Filter) RunError!Compute .fs_path = fs_path, .failure = undefined, // to be populated by the worker }; - thread_pool.spawnWg(&wait_group, workerDeleteFile, .{ root_dir, deleted_file }); + group.async(io, workerDeleteFile, .{ root_dir, deleted_file }); try deleted_files.append(deleted_file); continue; } @@ -1570,7 +1570,7 @@ fn computeHash(f: *Fetch, pkg_path: Cache.Path, filter: Filter) RunError!Compute .failure = undefined, // to be populated by the worker .size = undefined, // to be populated by the worker }; - thread_pool.spawnWg(&wait_group, workerHashFile, .{ root_dir, hashed_file }); + group.async(io, workerHashFile, .{ root_dir, hashed_file }); try all_files.append(hashed_file); } } @@ -2241,7 +2241,6 @@ fn saveEmbedFile(comptime tarball_name: []const u8, dir: fs.Dir) !void { // Builds Fetch with required dependencies, clears dependencies on deinit(). const TestFetchBuilder = struct { - thread_pool: ThreadPool, http_client: std.http.Client, global_cache_directory: Cache.Directory, job_queue: Fetch.JobQueue, @@ -2256,13 +2255,12 @@ const TestFetchBuilder = struct { ) !*Fetch { const cache_dir = try cache_parent_dir.makeOpenPath("zig-global-cache", .{}); - try self.thread_pool.init(.{ .allocator = allocator }); self.http_client = .{ .allocator = allocator, .io = io }; self.global_cache_directory = .{ .handle = cache_dir, .path = null }; self.job_queue = .{ + .io = io, .http_client = &self.http_client, - .thread_pool = &self.thread_pool, .global_cache = self.global_cache_directory, .recursive = false, .read_only = false, @@ -2273,7 +2271,6 @@ const TestFetchBuilder = struct { self.fetch = .{ .arena = std.heap.ArenaAllocator.init(allocator), - .io = io, .location = .{ .path_or_url = path_or_url }, .location_tok = 0, .hash_tok = .none, @@ -2309,7 +2306,6 @@ const TestFetchBuilder = struct { self.fetch.prog_node.end(); self.global_cache_directory.handle.close(); self.http_client.deinit(); - self.thread_pool.deinit(); } fn packageDir(self: *TestFetchBuilder) !fs.Dir { diff --git a/src/main.zig b/src/main.zig index c410100b08..c08e9da449 100644 --- a/src/main.zig +++ b/src/main.zig @@ -5139,8 +5139,8 @@ fn cmdBuild(gpa: Allocator, arena: Allocator, io: Io, args: []const []const u8) defer fetch_prog_node.end(); var job_queue: Package.Fetch.JobQueue = .{ + .io = io, .http_client = &http_client, - .thread_pool = &thread_pool, .global_cache = dirs.global_cache, .read_only = false, .recursive = true, @@ -5173,7 +5173,6 @@ fn cmdBuild(gpa: Allocator, arena: Allocator, io: Io, args: []const []const u8) var fetch: Package.Fetch = .{ .arena = std.heap.ArenaAllocator.init(gpa), - .io = io, .location = .{ .relative_path = phantom_package_root }, .location_tok = 0, .hash_tok = .none, @@ -5207,10 +5206,8 @@ fn cmdBuild(gpa: Allocator, arena: Allocator, io: Io, args: []const []const u8) &fetch, ); - job_queue.thread_pool.spawnWg(&job_queue.wait_group, Package.Fetch.workerRun, .{ - &fetch, "root", - }); - job_queue.wait_group.wait(); + job_queue.group.async(io, Package.Fetch.workerRun, .{ &fetch, "root" }); + job_queue.group.wait(io); try job_queue.consolidateErrors(); @@ -6899,8 +6896,8 @@ fn cmdFetch( defer global_cache_directory.handle.close(); var job_queue: Package.Fetch.JobQueue = .{ + .io = io, .http_client = &http_client, - .thread_pool = &thread_pool, .global_cache = global_cache_directory, .recursive = false, .read_only = false, @@ -6912,7 +6909,6 @@ fn cmdFetch( var fetch: Package.Fetch = .{ .arena = std.heap.ArenaAllocator.init(gpa), - .io = io, .location = .{ .path_or_url = path_or_url }, .location_tok = 0, .hash_tok = .none, @@ -6942,7 +6938,7 @@ fn cmdFetch( defer fetch.deinit(); fetch.run() catch |err| switch (err) { - error.OutOfMemory => fatal("out of memory", .{}), + error.OutOfMemory, error.Canceled => |e| return e, error.FetchFailed => {}, // error bundle checked below };