From a242292644a12b8ca0485759ba45c550265da5bd Mon Sep 17 00:00:00 2001 From: Andrew Kelley Date: Mon, 24 Nov 2025 07:37:05 -0800 Subject: [PATCH] build runner: update from std.Thread.Pool to std.Io --- lib/compiler/build_runner.zig | 50 +++++++++-------------------------- lib/std/Build/Fuzz.zig | 38 +++++++++++--------------- lib/std/Build/Step.zig | 1 - lib/std/Build/Step/Run.zig | 1 - lib/std/Build/WebServer.zig | 4 --- lib/std/Io/Threaded.zig | 14 +++++++--- 6 files changed, 40 insertions(+), 68 deletions(-) diff --git a/lib/compiler/build_runner.zig b/lib/compiler/build_runner.zig index e5eb5eec67..44885a5152 100644 --- a/lib/compiler/build_runner.zig +++ b/lib/compiler/build_runner.zig @@ -107,7 +107,6 @@ pub fn main() !void { var targets = std.array_list.Managed([]const u8).init(arena); var debug_log_scopes = std.array_list.Managed([]const u8).init(arena); - var thread_pool_options: std.Thread.Pool.Options = .{ .allocator = arena }; var install_prefix: ?[]const u8 = null; var dir_list = std.Build.DirList{}; @@ -413,19 +412,11 @@ pub fn main() !void { }; } else if (mem.eql(u8, arg, "-fno-reference-trace")) { builder.reference_trace = null; - } else if (mem.startsWith(u8, arg, "-j")) { - const num = arg["-j".len..]; - const n_jobs = std.fmt.parseUnsigned(u32, num, 10) catch |err| { - std.debug.print("unable to parse jobs count '{s}': {s}", .{ - num, @errorName(err), - }); - process.exit(1); - }; - if (n_jobs < 1) { - std.debug.print("number of jobs must be at least 1\n", .{}); - process.exit(1); - } - thread_pool_options.n_jobs = n_jobs; + } else if (mem.cutPrefix(u8, arg, "-j")) |text| { + const n = std.fmt.parseUnsigned(u32, text, 10) catch |err| + fatal("unable to parse jobs count '{s}': {t}", .{ text, err }); + if (n < 1) fatal("number of jobs must be at least 1", .{}); + threaded.setAsyncLimit(.limited(n)); } else if (mem.eql(u8, arg, "--")) { builder.args = argsRest(args, arg_idx); break; @@ -516,7 +507,6 @@ pub fn main() !void { .error_style = error_style, .multiline_errors = multiline_errors, .summary = summary orelse if (watch or webui_listen != null) .line else .failures, - .thread_pool = undefined, .ttyconf = ttyconf, }; @@ -547,16 +537,12 @@ pub fn main() !void { break :w try .init(); }; - try run.thread_pool.init(thread_pool_options); - defer run.thread_pool.deinit(); - const now = Io.Clock.Timestamp.now(io, .awake) catch |err| fatal("failed to collect timestamp: {t}", .{err}); run.web_server = if (webui_listen) |listen_address| ws: { if (builtin.single_threaded) unreachable; // `fatal` above break :ws .init(.{ .gpa = gpa, - .thread_pool = &run.thread_pool, .ttyconf = ttyconf, .graph = &graph, .all_steps = run.step_stack.keys(), @@ -675,7 +661,6 @@ const Run = struct { memory_blocked_steps: std.ArrayList(*Step), /// Allocated into `gpa`. step_stack: std.AutoArrayHashMapUnmanaged(*Step, void), - thread_pool: std.Thread.Pool, /// Similar to the `tty.Config` returned by `std.debug.lockStderrWriter`, /// but also respects the '--color' flag. ttyconf: tty.Config, @@ -754,14 +739,13 @@ fn runStepNames( const gpa = run.gpa; const io = b.graph.io; const step_stack = &run.step_stack; - const thread_pool = &run.thread_pool; { const step_prog = parent_prog_node.start("steps", step_stack.count()); defer step_prog.end(); - var wait_group: std.Thread.WaitGroup = .{}; - defer wait_group.wait(); + var group: Io.Group = .init; + defer group.wait(io); // Here we spawn the initial set of tasks with a nice heuristic - // dependency order. Each worker when it finishes a step will then @@ -771,9 +755,7 @@ fn runStepNames( const step = steps_slice[steps_slice.len - i - 1]; if (step.state == .skipped_oom) continue; - thread_pool.spawnWg(&wait_group, workerMakeOneStep, .{ - &wait_group, b, step, step_prog, run, - }); + group.async(io, workerMakeOneStep, .{ &group, b, step, step_prog, run }); } } @@ -855,7 +837,6 @@ fn runStepNames( var f = std.Build.Fuzz.init( gpa, io, - thread_pool, run.ttyconf, step_stack.keys(), parent_prog_node, @@ -1318,14 +1299,12 @@ fn constructGraphAndCheckForDependencyLoop( } fn workerMakeOneStep( - wg: *std.Thread.WaitGroup, + group: *Io.Group, b: *std.Build, s: *Step, prog_node: std.Progress.Node, run: *Run, ) void { - const thread_pool = &run.thread_pool; - // First, check the conditions for running this step. If they are not met, // then we return without doing the step, relying on another worker to // queue this step up again when dependencies are met. @@ -1381,7 +1360,6 @@ fn workerMakeOneStep( const make_result = s.make(.{ .progress_node = sub_prog_node, - .thread_pool = thread_pool, .watch = run.watch, .web_server = if (run.web_server) |*ws| ws else null, .ttyconf = run.ttyconf, @@ -1400,6 +1378,8 @@ fn workerMakeOneStep( printErrorMessages(run.gpa, s, .{}, bw, ttyconf, run.error_style, run.multiline_errors) catch {}; } + const io = b.graph.io; + handle_result: { if (make_result) |_| { @atomicStore(Step.State, &s.state, .success, .seq_cst); @@ -1419,9 +1399,7 @@ fn workerMakeOneStep( // Successful completion of a step, so we queue up its dependants as well. for (s.dependants.items) |dep| { - thread_pool.spawnWg(wg, workerMakeOneStep, .{ - wg, b, dep, prog_node, run, - }); + group.async(io, workerMakeOneStep, .{ group, b, dep, prog_node, run }); } } @@ -1444,9 +1422,7 @@ fn workerMakeOneStep( if (dep.max_rss <= remaining) { remaining -= dep.max_rss; - thread_pool.spawnWg(wg, workerMakeOneStep, .{ - wg, b, dep, prog_node, run, - }); + group.async(io, workerMakeOneStep, .{ group, b, dep, prog_node, run }); } else { run.memory_blocked_steps.items[i] = dep; i += 1; diff --git a/lib/std/Build/Fuzz.zig b/lib/std/Build/Fuzz.zig index 37af72a6de..6c285ff313 100644 --- a/lib/std/Build/Fuzz.zig +++ b/lib/std/Build/Fuzz.zig @@ -22,10 +22,9 @@ mode: Mode, /// Allocated into `gpa`. run_steps: []const *Step.Run, -wait_group: std.Thread.WaitGroup, +group: Io.Group, root_prog_node: std.Progress.Node, prog_node: std.Progress.Node, -thread_pool: *std.Thread.Pool, /// Protects `coverage_files`. coverage_mutex: std.Thread.Mutex, @@ -78,7 +77,6 @@ const CoverageMap = struct { pub fn init( gpa: Allocator, io: Io, - thread_pool: *std.Thread.Pool, ttyconf: tty.Config, all_steps: []const *Build.Step, root_prog_node: std.Progress.Node, @@ -89,20 +87,22 @@ pub fn init( defer steps.deinit(gpa); const rebuild_node = root_prog_node.start("Rebuilding Unit Tests", 0); defer rebuild_node.end(); - var rebuild_wg: std.Thread.WaitGroup = .{}; - defer rebuild_wg.wait(); + var rebuild_group: Io.Group = .init; + defer rebuild_group.cancel(io); for (all_steps) |step| { const run = step.cast(Step.Run) orelse continue; if (run.producer == null) continue; if (run.fuzz_tests.items.len == 0) continue; try steps.append(gpa, run); - thread_pool.spawnWg(&rebuild_wg, rebuildTestsWorkerRun, .{ run, gpa, ttyconf, rebuild_node }); + rebuild_group.async(io, rebuildTestsWorkerRun, .{ run, gpa, ttyconf, rebuild_node }); } if (steps.items.len == 0) fatal("no fuzz tests found", .{}); rebuild_node.setEstimatedTotalItems(steps.items.len); - break :steps try gpa.dupe(*Step.Run, steps.items); + const run_steps = try gpa.dupe(*Step.Run, steps.items); + rebuild_group.wait(io); + break :steps run_steps; }; errdefer gpa.free(run_steps); @@ -118,8 +118,7 @@ pub fn init( .ttyconf = ttyconf, .mode = mode, .run_steps = run_steps, - .wait_group = .{}, - .thread_pool = thread_pool, + .group = .init, .root_prog_node = root_prog_node, .prog_node = .none, .coverage_files = .empty, @@ -131,29 +130,26 @@ pub fn init( } pub fn start(fuzz: *Fuzz) void { + const io = fuzz.io; fuzz.prog_node = fuzz.root_prog_node.start("Fuzzing", fuzz.run_steps.len); if (fuzz.mode == .forever) { // For polling messages and sending updates to subscribers. - fuzz.wait_group.start(); - _ = std.Thread.spawn(.{}, coverageRun, .{fuzz}) catch |err| { - fuzz.wait_group.finish(); - fatal("unable to spawn coverage thread: {s}", .{@errorName(err)}); - }; + fuzz.group.concurrent(io, coverageRun, .{fuzz}) catch |err| + fatal("unable to spawn coverage task: {t}", .{err}); } for (fuzz.run_steps) |run| { for (run.fuzz_tests.items) |unit_test_index| { assert(run.rebuilt_executable != null); - fuzz.thread_pool.spawnWg(&fuzz.wait_group, fuzzWorkerRun, .{ - fuzz, run, unit_test_index, - }); + fuzz.group.async(io, fuzzWorkerRun, .{ fuzz, run, unit_test_index }); } } } pub fn deinit(fuzz: *Fuzz) void { - if (!fuzz.wait_group.isDone()) @panic("TODO: terminate the fuzzer processes"); + const io = fuzz.io; + fuzz.group.cancel(io); fuzz.prog_node.end(); fuzz.gpa.free(fuzz.run_steps); } @@ -335,8 +331,6 @@ pub fn sendUpdate( } fn coverageRun(fuzz: *Fuzz) void { - defer fuzz.wait_group.finish(); - fuzz.queue_mutex.lock(); defer fuzz.queue_mutex.unlock(); @@ -511,8 +505,8 @@ pub fn waitAndPrintReport(fuzz: *Fuzz) void { assert(fuzz.mode == .limit); const io = fuzz.io; - fuzz.wait_group.wait(); - fuzz.wait_group.reset(); + fuzz.group.wait(io); + fuzz.group = .init; std.debug.print("======= FUZZING REPORT =======\n", .{}); for (fuzz.msg_queue.items) |msg| { diff --git a/lib/std/Build/Step.zig b/lib/std/Build/Step.zig index 8ee686e44e..478b600195 100644 --- a/lib/std/Build/Step.zig +++ b/lib/std/Build/Step.zig @@ -110,7 +110,6 @@ pub const TestResults = struct { pub const MakeOptions = struct { progress_node: std.Progress.Node, - thread_pool: *std.Thread.Pool, watch: bool, web_server: switch (builtin.target.cpu.arch) { else => ?*Build.WebServer, diff --git a/lib/std/Build/Step/Run.zig b/lib/std/Build/Step/Run.zig index 12e4e936f4..bcde251d48 100644 --- a/lib/std/Build/Step/Run.zig +++ b/lib/std/Build/Step/Run.zig @@ -1151,7 +1151,6 @@ pub fn rerunInFuzzMode( const tmp_dir_path = "tmp" ++ fs.path.sep_str ++ std.fmt.hex(rand_int); try runCommand(run, argv_list.items, has_side_effects, tmp_dir_path, .{ .progress_node = prog_node, - .thread_pool = undefined, // not used by `runCommand` .watch = undefined, // not used by `runCommand` .web_server = null, // only needed for time reports .ttyconf = fuzz.ttyconf, diff --git a/lib/std/Build/WebServer.zig b/lib/std/Build/WebServer.zig index ec15ab1dba..e28dd75b5e 100644 --- a/lib/std/Build/WebServer.zig +++ b/lib/std/Build/WebServer.zig @@ -1,5 +1,4 @@ gpa: Allocator, -thread_pool: *std.Thread.Pool, graph: *const Build.Graph, all_steps: []const *Build.Step, listen_address: net.IpAddress, @@ -53,7 +52,6 @@ pub fn notifyUpdate(ws: *WebServer) void { pub const Options = struct { gpa: Allocator, - thread_pool: *std.Thread.Pool, ttyconf: Io.tty.Config, graph: *const std.Build.Graph, all_steps: []const *Build.Step, @@ -100,7 +98,6 @@ pub fn init(opts: Options) WebServer { return .{ .gpa = opts.gpa, - .thread_pool = opts.thread_pool, .ttyconf = opts.ttyconf, .graph = opts.graph, .all_steps = all_steps, @@ -235,7 +232,6 @@ pub fn finishBuild(ws: *WebServer, opts: struct { ws.fuzz = Fuzz.init( ws.gpa, ws.graph.io, - ws.thread_pool, ws.ttyconf, ws.all_steps, ws.root_prog_node, diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig index 57c1bf0c71..3b858767c8 100644 --- a/lib/std/Io/Threaded.zig +++ b/lib/std/Io/Threaded.zig @@ -33,6 +33,9 @@ wait_group: std.Thread.WaitGroup = .{}, /// immediately. /// /// Defaults to a number equal to logical CPU cores. +/// +/// Protected by `mutex` once the I/O instance is already in use. See +/// `setAsyncLimit`. async_limit: Io.Limit, /// Maximum thread pool size (excluding main thread) for dispatching concurrent /// tasks. Until this limit, calls to `Io.concurrent` will increase the thread @@ -168,6 +171,12 @@ pub const init_single_threaded: Threaded = .{ .have_signal_handler = false, }; +pub fn setAsyncLimit(t: *Threaded, new_limit: Io.Limit) void { + t.mutex.lock(); + defer t.mutex.unlock(); + t.async_limit = new_limit; +} + pub fn deinit(t: *Threaded) void { t.join(); if (is_windows and t.wsa.status == .initialized) { @@ -507,7 +516,7 @@ fn async( start: *const fn (context: *const anyopaque, result: *anyopaque) void, ) ?*Io.AnyFuture { const t: *Threaded = @ptrCast(@alignCast(userdata)); - if (builtin.single_threaded or t.async_limit == .nothing) { + if (builtin.single_threaded) { start(context.ptr, result.ptr); return null; } @@ -684,8 +693,7 @@ fn groupAsync( start: *const fn (*Io.Group, context: *const anyopaque) void, ) void { const t: *Threaded = @ptrCast(@alignCast(userdata)); - if (builtin.single_threaded or t.async_limit == .nothing) - return start(group, context.ptr); + if (builtin.single_threaded) return start(group, context.ptr); const gpa = t.allocator; const gc = GroupClosure.init(gpa, t, group, context, context_alignment, start) catch