From a242292644a12b8ca0485759ba45c550265da5bd Mon Sep 17 00:00:00 2001 From: Andrew Kelley Date: Mon, 24 Nov 2025 07:37:05 -0800 Subject: [PATCH 1/4] build runner: update from std.Thread.Pool to std.Io --- lib/compiler/build_runner.zig | 50 +++++++++-------------------------- lib/std/Build/Fuzz.zig | 38 +++++++++++--------------- lib/std/Build/Step.zig | 1 - lib/std/Build/Step/Run.zig | 1 - lib/std/Build/WebServer.zig | 4 --- lib/std/Io/Threaded.zig | 14 +++++++--- 6 files changed, 40 insertions(+), 68 deletions(-) diff --git a/lib/compiler/build_runner.zig b/lib/compiler/build_runner.zig index e5eb5eec67..44885a5152 100644 --- a/lib/compiler/build_runner.zig +++ b/lib/compiler/build_runner.zig @@ -107,7 +107,6 @@ pub fn main() !void { var targets = std.array_list.Managed([]const u8).init(arena); var debug_log_scopes = std.array_list.Managed([]const u8).init(arena); - var thread_pool_options: std.Thread.Pool.Options = .{ .allocator = arena }; var install_prefix: ?[]const u8 = null; var dir_list = std.Build.DirList{}; @@ -413,19 +412,11 @@ pub fn main() !void { }; } else if (mem.eql(u8, arg, "-fno-reference-trace")) { builder.reference_trace = null; - } else if (mem.startsWith(u8, arg, "-j")) { - const num = arg["-j".len..]; - const n_jobs = std.fmt.parseUnsigned(u32, num, 10) catch |err| { - std.debug.print("unable to parse jobs count '{s}': {s}", .{ - num, @errorName(err), - }); - process.exit(1); - }; - if (n_jobs < 1) { - std.debug.print("number of jobs must be at least 1\n", .{}); - process.exit(1); - } - thread_pool_options.n_jobs = n_jobs; + } else if (mem.cutPrefix(u8, arg, "-j")) |text| { + const n = std.fmt.parseUnsigned(u32, text, 10) catch |err| + fatal("unable to parse jobs count '{s}': {t}", .{ text, err }); + if (n < 1) fatal("number of jobs must be at least 1", .{}); + threaded.setAsyncLimit(.limited(n)); } else if (mem.eql(u8, arg, "--")) { builder.args = argsRest(args, arg_idx); break; @@ -516,7 +507,6 @@ pub fn main() !void { .error_style = error_style, .multiline_errors = multiline_errors, .summary = summary orelse if (watch or webui_listen != null) .line else .failures, - .thread_pool = undefined, .ttyconf = ttyconf, }; @@ -547,16 +537,12 @@ pub fn main() !void { break :w try .init(); }; - try run.thread_pool.init(thread_pool_options); - defer run.thread_pool.deinit(); - const now = Io.Clock.Timestamp.now(io, .awake) catch |err| fatal("failed to collect timestamp: {t}", .{err}); run.web_server = if (webui_listen) |listen_address| ws: { if (builtin.single_threaded) unreachable; // `fatal` above break :ws .init(.{ .gpa = gpa, - .thread_pool = &run.thread_pool, .ttyconf = ttyconf, .graph = &graph, .all_steps = run.step_stack.keys(), @@ -675,7 +661,6 @@ const Run = struct { memory_blocked_steps: std.ArrayList(*Step), /// Allocated into `gpa`. step_stack: std.AutoArrayHashMapUnmanaged(*Step, void), - thread_pool: std.Thread.Pool, /// Similar to the `tty.Config` returned by `std.debug.lockStderrWriter`, /// but also respects the '--color' flag. ttyconf: tty.Config, @@ -754,14 +739,13 @@ fn runStepNames( const gpa = run.gpa; const io = b.graph.io; const step_stack = &run.step_stack; - const thread_pool = &run.thread_pool; { const step_prog = parent_prog_node.start("steps", step_stack.count()); defer step_prog.end(); - var wait_group: std.Thread.WaitGroup = .{}; - defer wait_group.wait(); + var group: Io.Group = .init; + defer group.wait(io); // Here we spawn the initial set of tasks with a nice heuristic - // dependency order. 
Each worker when it finishes a step will then @@ -771,9 +755,7 @@ fn runStepNames( const step = steps_slice[steps_slice.len - i - 1]; if (step.state == .skipped_oom) continue; - thread_pool.spawnWg(&wait_group, workerMakeOneStep, .{ - &wait_group, b, step, step_prog, run, - }); + group.async(io, workerMakeOneStep, .{ &group, b, step, step_prog, run }); } } @@ -855,7 +837,6 @@ fn runStepNames( var f = std.Build.Fuzz.init( gpa, io, - thread_pool, run.ttyconf, step_stack.keys(), parent_prog_node, @@ -1318,14 +1299,12 @@ fn constructGraphAndCheckForDependencyLoop( } fn workerMakeOneStep( - wg: *std.Thread.WaitGroup, + group: *Io.Group, b: *std.Build, s: *Step, prog_node: std.Progress.Node, run: *Run, ) void { - const thread_pool = &run.thread_pool; - // First, check the conditions for running this step. If they are not met, // then we return without doing the step, relying on another worker to // queue this step up again when dependencies are met. @@ -1381,7 +1360,6 @@ fn workerMakeOneStep( const make_result = s.make(.{ .progress_node = sub_prog_node, - .thread_pool = thread_pool, .watch = run.watch, .web_server = if (run.web_server) |*ws| ws else null, .ttyconf = run.ttyconf, @@ -1400,6 +1378,8 @@ fn workerMakeOneStep( printErrorMessages(run.gpa, s, .{}, bw, ttyconf, run.error_style, run.multiline_errors) catch {}; } + const io = b.graph.io; + handle_result: { if (make_result) |_| { @atomicStore(Step.State, &s.state, .success, .seq_cst); @@ -1419,9 +1399,7 @@ fn workerMakeOneStep( // Successful completion of a step, so we queue up its dependants as well. for (s.dependants.items) |dep| { - thread_pool.spawnWg(wg, workerMakeOneStep, .{ - wg, b, dep, prog_node, run, - }); + group.async(io, workerMakeOneStep, .{ group, b, dep, prog_node, run }); } } @@ -1444,9 +1422,7 @@ fn workerMakeOneStep( if (dep.max_rss <= remaining) { remaining -= dep.max_rss; - thread_pool.spawnWg(wg, workerMakeOneStep, .{ - wg, b, dep, prog_node, run, - }); + group.async(io, workerMakeOneStep, .{ group, b, dep, prog_node, run }); } else { run.memory_blocked_steps.items[i] = dep; i += 1; diff --git a/lib/std/Build/Fuzz.zig b/lib/std/Build/Fuzz.zig index 37af72a6de..6c285ff313 100644 --- a/lib/std/Build/Fuzz.zig +++ b/lib/std/Build/Fuzz.zig @@ -22,10 +22,9 @@ mode: Mode, /// Allocated into `gpa`. run_steps: []const *Step.Run, -wait_group: std.Thread.WaitGroup, +group: Io.Group, root_prog_node: std.Progress.Node, prog_node: std.Progress.Node, -thread_pool: *std.Thread.Pool, /// Protects `coverage_files`. 
coverage_mutex: std.Thread.Mutex, @@ -78,7 +77,6 @@ const CoverageMap = struct { pub fn init( gpa: Allocator, io: Io, - thread_pool: *std.Thread.Pool, ttyconf: tty.Config, all_steps: []const *Build.Step, root_prog_node: std.Progress.Node, @@ -89,20 +87,22 @@ pub fn init( defer steps.deinit(gpa); const rebuild_node = root_prog_node.start("Rebuilding Unit Tests", 0); defer rebuild_node.end(); - var rebuild_wg: std.Thread.WaitGroup = .{}; - defer rebuild_wg.wait(); + var rebuild_group: Io.Group = .init; + defer rebuild_group.cancel(io); for (all_steps) |step| { const run = step.cast(Step.Run) orelse continue; if (run.producer == null) continue; if (run.fuzz_tests.items.len == 0) continue; try steps.append(gpa, run); - thread_pool.spawnWg(&rebuild_wg, rebuildTestsWorkerRun, .{ run, gpa, ttyconf, rebuild_node }); + rebuild_group.async(io, rebuildTestsWorkerRun, .{ run, gpa, ttyconf, rebuild_node }); } if (steps.items.len == 0) fatal("no fuzz tests found", .{}); rebuild_node.setEstimatedTotalItems(steps.items.len); - break :steps try gpa.dupe(*Step.Run, steps.items); + const run_steps = try gpa.dupe(*Step.Run, steps.items); + rebuild_group.wait(io); + break :steps run_steps; }; errdefer gpa.free(run_steps); @@ -118,8 +118,7 @@ pub fn init( .ttyconf = ttyconf, .mode = mode, .run_steps = run_steps, - .wait_group = .{}, - .thread_pool = thread_pool, + .group = .init, .root_prog_node = root_prog_node, .prog_node = .none, .coverage_files = .empty, @@ -131,29 +130,26 @@ pub fn init( } pub fn start(fuzz: *Fuzz) void { + const io = fuzz.io; fuzz.prog_node = fuzz.root_prog_node.start("Fuzzing", fuzz.run_steps.len); if (fuzz.mode == .forever) { // For polling messages and sending updates to subscribers. - fuzz.wait_group.start(); - _ = std.Thread.spawn(.{}, coverageRun, .{fuzz}) catch |err| { - fuzz.wait_group.finish(); - fatal("unable to spawn coverage thread: {s}", .{@errorName(err)}); - }; + fuzz.group.concurrent(io, coverageRun, .{fuzz}) catch |err| + fatal("unable to spawn coverage task: {t}", .{err}); } for (fuzz.run_steps) |run| { for (run.fuzz_tests.items) |unit_test_index| { assert(run.rebuilt_executable != null); - fuzz.thread_pool.spawnWg(&fuzz.wait_group, fuzzWorkerRun, .{ - fuzz, run, unit_test_index, - }); + fuzz.group.async(io, fuzzWorkerRun, .{ fuzz, run, unit_test_index }); } } } pub fn deinit(fuzz: *Fuzz) void { - if (!fuzz.wait_group.isDone()) @panic("TODO: terminate the fuzzer processes"); + const io = fuzz.io; + fuzz.group.cancel(io); fuzz.prog_node.end(); fuzz.gpa.free(fuzz.run_steps); } @@ -335,8 +331,6 @@ pub fn sendUpdate( } fn coverageRun(fuzz: *Fuzz) void { - defer fuzz.wait_group.finish(); - fuzz.queue_mutex.lock(); defer fuzz.queue_mutex.unlock(); @@ -511,8 +505,8 @@ pub fn waitAndPrintReport(fuzz: *Fuzz) void { assert(fuzz.mode == .limit); const io = fuzz.io; - fuzz.wait_group.wait(); - fuzz.wait_group.reset(); + fuzz.group.wait(io); + fuzz.group = .init; std.debug.print("======= FUZZING REPORT =======\n", .{}); for (fuzz.msg_queue.items) |msg| { diff --git a/lib/std/Build/Step.zig b/lib/std/Build/Step.zig index 8ee686e44e..478b600195 100644 --- a/lib/std/Build/Step.zig +++ b/lib/std/Build/Step.zig @@ -110,7 +110,6 @@ pub const TestResults = struct { pub const MakeOptions = struct { progress_node: std.Progress.Node, - thread_pool: *std.Thread.Pool, watch: bool, web_server: switch (builtin.target.cpu.arch) { else => ?*Build.WebServer, diff --git a/lib/std/Build/Step/Run.zig b/lib/std/Build/Step/Run.zig index 12e4e936f4..bcde251d48 100644 --- a/lib/std/Build/Step/Run.zig +++ 
b/lib/std/Build/Step/Run.zig @@ -1151,7 +1151,6 @@ pub fn rerunInFuzzMode( const tmp_dir_path = "tmp" ++ fs.path.sep_str ++ std.fmt.hex(rand_int); try runCommand(run, argv_list.items, has_side_effects, tmp_dir_path, .{ .progress_node = prog_node, - .thread_pool = undefined, // not used by `runCommand` .watch = undefined, // not used by `runCommand` .web_server = null, // only needed for time reports .ttyconf = fuzz.ttyconf, diff --git a/lib/std/Build/WebServer.zig b/lib/std/Build/WebServer.zig index ec15ab1dba..e28dd75b5e 100644 --- a/lib/std/Build/WebServer.zig +++ b/lib/std/Build/WebServer.zig @@ -1,5 +1,4 @@ gpa: Allocator, -thread_pool: *std.Thread.Pool, graph: *const Build.Graph, all_steps: []const *Build.Step, listen_address: net.IpAddress, @@ -53,7 +52,6 @@ pub fn notifyUpdate(ws: *WebServer) void { pub const Options = struct { gpa: Allocator, - thread_pool: *std.Thread.Pool, ttyconf: Io.tty.Config, graph: *const std.Build.Graph, all_steps: []const *Build.Step, @@ -100,7 +98,6 @@ pub fn init(opts: Options) WebServer { return .{ .gpa = opts.gpa, - .thread_pool = opts.thread_pool, .ttyconf = opts.ttyconf, .graph = opts.graph, .all_steps = all_steps, @@ -235,7 +232,6 @@ pub fn finishBuild(ws: *WebServer, opts: struct { ws.fuzz = Fuzz.init( ws.gpa, ws.graph.io, - ws.thread_pool, ws.ttyconf, ws.all_steps, ws.root_prog_node, diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig index 57c1bf0c71..3b858767c8 100644 --- a/lib/std/Io/Threaded.zig +++ b/lib/std/Io/Threaded.zig @@ -33,6 +33,9 @@ wait_group: std.Thread.WaitGroup = .{}, /// immediately. /// /// Defaults to a number equal to logical CPU cores. +/// +/// Protected by `mutex` once the I/O instance is already in use. See +/// `setAsyncLimit`. async_limit: Io.Limit, /// Maximum thread pool size (excluding main thread) for dispatching concurrent /// tasks. 
Until this limit, calls to `Io.concurrent` will increase the thread @@ -168,6 +171,12 @@ pub const init_single_threaded: Threaded = .{ .have_signal_handler = false, }; +pub fn setAsyncLimit(t: *Threaded, new_limit: Io.Limit) void { + t.mutex.lock(); + defer t.mutex.unlock(); + t.async_limit = new_limit; +} + pub fn deinit(t: *Threaded) void { t.join(); if (is_windows and t.wsa.status == .initialized) { @@ -507,7 +516,7 @@ fn async( start: *const fn (context: *const anyopaque, result: *anyopaque) void, ) ?*Io.AnyFuture { const t: *Threaded = @ptrCast(@alignCast(userdata)); - if (builtin.single_threaded or t.async_limit == .nothing) { + if (builtin.single_threaded) { start(context.ptr, result.ptr); return null; } @@ -684,8 +693,7 @@ fn groupAsync( start: *const fn (*Io.Group, context: *const anyopaque) void, ) void { const t: *Threaded = @ptrCast(@alignCast(userdata)); - if (builtin.single_threaded or t.async_limit == .nothing) - return start(group, context.ptr); + if (builtin.single_threaded) return start(group, context.ptr); const gpa = t.allocator; const gc = GroupClosure.init(gpa, t, group, context, context_alignment, start) catch From 3f34f5e43349214c862882a83bd951701a6c735f Mon Sep 17 00:00:00 2001 From: Andrew Kelley Date: Mon, 24 Nov 2025 11:18:35 -0800 Subject: [PATCH 2/4] build runner: update Mutex and Condition usage to std.Io --- lib/compiler/build_runner.zig | 18 ++++---- lib/std/Build/Cache.zig | 31 +++++++++---- lib/std/Build/Fuzz.zig | 85 ++++++++++++++++++++--------------- lib/std/Build/Step.zig | 37 +++++++-------- lib/std/Build/Step/Run.zig | 13 +++--- lib/std/Build/WebServer.zig | 77 +++++++++++++++++-------------- 6 files changed, 149 insertions(+), 112 deletions(-) diff --git a/lib/compiler/build_runner.zig b/lib/compiler/build_runner.zig index 44885a5152..fb2385b7e0 100644 --- a/lib/compiler/build_runner.zig +++ b/lib/compiler/build_runner.zig @@ -494,7 +494,7 @@ pub fn main() !void { .max_rss = max_rss, .max_rss_is_default = false, - .max_rss_mutex = .{}, + .max_rss_mutex = .init, .skip_oom_steps = skip_oom_steps, .unit_test_timeout_ns = test_timeout_ns, @@ -583,7 +583,7 @@ pub fn main() !void { if (run.web_server) |*ws| { assert(!watch); // fatal error after CLI parsing - while (true) switch (ws.wait()) { + while (true) switch (try ws.wait()) { .rebuild => { for (run.step_stack.keys()) |step| { step.state = .precheck_done; @@ -652,7 +652,7 @@ const Run = struct { gpa: Allocator, max_rss: u64, max_rss_is_default: bool, - max_rss_mutex: std.Thread.Mutex, + max_rss_mutex: Io.Mutex, skip_oom_steps: bool, unit_test_timeout_ns: ?u64, watch: bool, @@ -1305,6 +1305,8 @@ fn workerMakeOneStep( prog_node: std.Progress.Node, run: *Run, ) void { + const io = b.graph.io; + // First, check the conditions for running this step. If they are not met, // then we return without doing the step, relying on another worker to // queue this step up again when dependencies are met. @@ -1326,8 +1328,8 @@ fn workerMakeOneStep( } if (s.max_rss != 0) { - run.max_rss_mutex.lock(); - defer run.max_rss_mutex.unlock(); + run.max_rss_mutex.lockUncancelable(io); + defer run.max_rss_mutex.unlock(io); // Avoid running steps twice. 
if (s.state != .precheck_done) { @@ -1378,8 +1380,6 @@ fn workerMakeOneStep( printErrorMessages(run.gpa, s, .{}, bw, ttyconf, run.error_style, run.multiline_errors) catch {}; } - const io = b.graph.io; - handle_result: { if (make_result) |_| { @atomicStore(Step.State, &s.state, .success, .seq_cst); @@ -1406,8 +1406,8 @@ fn workerMakeOneStep( // If this is a step that claims resources, we must now queue up other // steps that are waiting for resources. if (s.max_rss != 0) { - run.max_rss_mutex.lock(); - defer run.max_rss_mutex.unlock(); + run.max_rss_mutex.lockUncancelable(io); + defer run.max_rss_mutex.unlock(io); // Give the memory back to the scheduler. run.claimed_rss -= s.max_rss; diff --git a/lib/std/Build/Cache.zig b/lib/std/Build/Cache.zig index 263d990e6d..5d5b9df413 100644 --- a/lib/std/Build/Cache.zig +++ b/lib/std/Build/Cache.zig @@ -22,7 +22,7 @@ manifest_dir: fs.Dir, hash: HashHelper = .{}, /// This value is accessed from multiple threads, protected by mutex. recent_problematic_timestamp: Io.Timestamp = .zero, -mutex: std.Thread.Mutex = .{}, +mutex: Io.Mutex = .init, /// A set of strings such as the zig library directory or project source root, which /// are stripped from the file paths before putting into the cache. They @@ -474,6 +474,7 @@ pub const Manifest = struct { /// A cache manifest file exists however it could not be parsed. InvalidFormat, OutOfMemory, + Canceled, }; /// Check the cache to see if the input exists in it. If it exists, returns `true`. @@ -559,12 +560,14 @@ pub const Manifest = struct { self.diagnostic = .{ .manifest_create = error.FileNotFound }; return error.CacheCheckFailed; }, + error.Canceled => return error.Canceled, else => |e| { self.diagnostic = .{ .manifest_create = e }; return error.CacheCheckFailed; }, } }, + error.Canceled => return error.Canceled, else => |e| { self.diagnostic = .{ .manifest_create = e }; return error.CacheCheckFailed; @@ -762,6 +765,7 @@ pub const Manifest = struct { // Every digest before this one has been populated successfully. return .{ .miss = .{ .file_digests_populated = idx } }; }, + error.Canceled => return error.Canceled, else => |e| { self.diagnostic = .{ .file_open = .{ .file_index = idx, @@ -790,7 +794,7 @@ pub const Manifest = struct { .inode = actual_stat.inode, }; - if (self.isProblematicTimestamp(cache_hash_file.stat.mtime)) { + if (try self.isProblematicTimestamp(cache_hash_file.stat.mtime)) { // The actual file has an unreliable timestamp, force it to be hashed cache_hash_file.stat.mtime = .zero; cache_hash_file.stat.inode = 0; @@ -848,7 +852,9 @@ pub const Manifest = struct { } } - fn isProblematicTimestamp(man: *Manifest, timestamp: Io.Timestamp) bool { + fn isProblematicTimestamp(man: *Manifest, timestamp: Io.Timestamp) error{Canceled}!bool { + const io = man.cache.io; + // If the file_time is prior to the most recent problematic timestamp // then we don't need to access the filesystem. if (timestamp.nanoseconds < man.recent_problematic_timestamp.nanoseconds) @@ -856,8 +862,8 @@ pub const Manifest = struct { // Next we will check the globally shared Cache timestamp, which is accessed // from multiple threads. - man.cache.mutex.lock(); - defer man.cache.mutex.unlock(); + try man.cache.mutex.lock(io); + defer man.cache.mutex.unlock(io); // Save the global one to our local one to avoid locking next time. 
man.recent_problematic_timestamp = man.cache.recent_problematic_timestamp; @@ -871,11 +877,18 @@ pub const Manifest = struct { var file = man.cache.manifest_dir.createFile("timestamp", .{ .read = true, .truncate = true, - }) catch return true; + }) catch |err| switch (err) { + error.Canceled => return error.Canceled, + else => return true, + }; defer file.close(); // Save locally and also save globally (we still hold the global lock). - man.recent_problematic_timestamp = (file.stat() catch return true).mtime; + const stat = file.stat() catch |err| switch (err) { + error.Canceled => return error.Canceled, + else => return true, + }; + man.recent_problematic_timestamp = stat.mtime; man.cache.recent_problematic_timestamp = man.recent_problematic_timestamp; } @@ -902,7 +915,7 @@ pub const Manifest = struct { .inode = actual_stat.inode, }; - if (self.isProblematicTimestamp(ch_file.stat.mtime)) { + if (try self.isProblematicTimestamp(ch_file.stat.mtime)) { // The actual file has an unreliable timestamp, force it to be hashed ch_file.stat.mtime = .zero; ch_file.stat.inode = 0; @@ -1038,7 +1051,7 @@ pub const Manifest = struct { .contents = null, }; - if (self.isProblematicTimestamp(new_file.stat.mtime)) { + if (try self.isProblematicTimestamp(new_file.stat.mtime)) { // The actual file has an unreliable timestamp, force it to be hashed new_file.stat.mtime = .zero; new_file.stat.inode = 0; diff --git a/lib/std/Build/Fuzz.zig b/lib/std/Build/Fuzz.zig index 6c285ff313..bb4a960a73 100644 --- a/lib/std/Build/Fuzz.zig +++ b/lib/std/Build/Fuzz.zig @@ -27,11 +27,11 @@ root_prog_node: std.Progress.Node, prog_node: std.Progress.Node, /// Protects `coverage_files`. -coverage_mutex: std.Thread.Mutex, +coverage_mutex: Io.Mutex, coverage_files: std.AutoArrayHashMapUnmanaged(u64, CoverageMap), -queue_mutex: std.Thread.Mutex, -queue_cond: std.Thread.Condition, +queue_mutex: Io.Mutex, +queue_cond: Io.Condition, msg_queue: std.ArrayList(Msg), pub const Mode = union(enum) { @@ -122,8 +122,8 @@ pub fn init( .root_prog_node = root_prog_node, .prog_node = .none, .coverage_files = .empty, - .coverage_mutex = .{}, - .queue_mutex = .{}, + .coverage_mutex = .init, + .queue_mutex = .init, .queue_cond = .{}, .msg_queue = .empty, }; @@ -157,9 +157,7 @@ pub fn deinit(fuzz: *Fuzz) void { fn rebuildTestsWorkerRun(run: *Step.Run, gpa: Allocator, ttyconf: tty.Config, parent_prog_node: std.Progress.Node) void { rebuildTestsWorkerRunFallible(run, gpa, ttyconf, parent_prog_node) catch |err| { const compile = run.producer.?; - log.err("step '{s}': failed to rebuild in fuzz mode: {s}", .{ - compile.step.name, @errorName(err), - }); + log.err("step '{s}': failed to rebuild in fuzz mode: {t}", .{ compile.step.name, err }); }; } @@ -208,9 +206,7 @@ fn fuzzWorkerRun( return; }, else => { - log.err("step '{s}': failed to rerun '{s}' in fuzz mode: {s}", .{ - run.step.name, test_name, @errorName(err), - }); + log.err("step '{s}': failed to rerun '{s}' in fuzz mode: {t}", .{ run.step.name, test_name, err }); return; }, }; @@ -269,8 +265,10 @@ pub fn sendUpdate( socket: *std.http.Server.WebSocket, prev: *Previous, ) !void { - fuzz.coverage_mutex.lock(); - defer fuzz.coverage_mutex.unlock(); + const io = fuzz.io; + + try fuzz.coverage_mutex.lock(io); + defer fuzz.coverage_mutex.unlock(io); const coverage_maps = fuzz.coverage_files.values(); if (coverage_maps.len == 0) return; @@ -331,30 +329,41 @@ pub fn sendUpdate( } fn coverageRun(fuzz: *Fuzz) void { - fuzz.queue_mutex.lock(); - defer fuzz.queue_mutex.unlock(); + coverageRunCancelable(fuzz) 
catch |err| switch (err) { + error.Canceled => return, + }; +} + +fn coverageRunCancelable(fuzz: *Fuzz) Io.Cancelable!void { + const io = fuzz.io; + + try fuzz.queue_mutex.lock(io); + defer fuzz.queue_mutex.unlock(io); while (true) { - fuzz.queue_cond.wait(&fuzz.queue_mutex); + try fuzz.queue_cond.wait(io, &fuzz.queue_mutex); for (fuzz.msg_queue.items) |msg| switch (msg) { .coverage => |coverage| prepareTables(fuzz, coverage.run, coverage.id) catch |err| switch (err) { error.AlreadyReported => continue, - else => |e| log.err("failed to prepare code coverage tables: {s}", .{@errorName(e)}), + error.Canceled => return, + else => |e| log.err("failed to prepare code coverage tables: {t}", .{e}), }, .entry_point => |entry_point| addEntryPoint(fuzz, entry_point.coverage_id, entry_point.addr) catch |err| switch (err) { error.AlreadyReported => continue, - else => |e| log.err("failed to prepare code coverage tables: {s}", .{@errorName(e)}), + error.Canceled => return, + else => |e| log.err("failed to prepare code coverage tables: {t}", .{e}), }, }; fuzz.msg_queue.clearRetainingCapacity(); } } -fn prepareTables(fuzz: *Fuzz, run_step: *Step.Run, coverage_id: u64) error{ OutOfMemory, AlreadyReported }!void { +fn prepareTables(fuzz: *Fuzz, run_step: *Step.Run, coverage_id: u64) error{ OutOfMemory, AlreadyReported, Canceled }!void { assert(fuzz.mode == .forever); const ws = fuzz.mode.forever.ws; + const io = fuzz.io; - fuzz.coverage_mutex.lock(); - defer fuzz.coverage_mutex.unlock(); + try fuzz.coverage_mutex.lock(io); + defer fuzz.coverage_mutex.unlock(io); const gop = try fuzz.coverage_files.getOrPut(fuzz.gpa, coverage_id); if (gop.found_existing) { @@ -385,8 +394,8 @@ fn prepareTables(fuzz: *Fuzz, run_step: *Step.Run, coverage_id: u64) error{ OutO target.ofmt, target.cpu.arch, ) catch |err| { - log.err("step '{s}': failed to load debug information for '{f}': {s}", .{ - run_step.step.name, rebuilt_exe_path, @errorName(err), + log.err("step '{s}': failed to load debug information for '{f}': {t}", .{ + run_step.step.name, rebuilt_exe_path, err, }); return error.AlreadyReported; }; @@ -397,15 +406,15 @@ fn prepareTables(fuzz: *Fuzz, run_step: *Step.Run, coverage_id: u64) error{ OutO .sub_path = "v/" ++ std.fmt.hex(coverage_id), }; var coverage_file = coverage_file_path.root_dir.handle.openFile(coverage_file_path.sub_path, .{}) catch |err| { - log.err("step '{s}': failed to load coverage file '{f}': {s}", .{ - run_step.step.name, coverage_file_path, @errorName(err), + log.err("step '{s}': failed to load coverage file '{f}': {t}", .{ + run_step.step.name, coverage_file_path, err, }); return error.AlreadyReported; }; defer coverage_file.close(); const file_size = coverage_file.getEndPos() catch |err| { - log.err("unable to check len of coverage file '{f}': {s}", .{ coverage_file_path, @errorName(err) }); + log.err("unable to check len of coverage file '{f}': {t}", .{ coverage_file_path, err }); return error.AlreadyReported; }; @@ -417,7 +426,7 @@ fn prepareTables(fuzz: *Fuzz, run_step: *Step.Run, coverage_id: u64) error{ OutO coverage_file.handle, 0, ) catch |err| { - log.err("failed to map coverage file '{f}': {s}", .{ coverage_file_path, @errorName(err) }); + log.err("failed to map coverage file '{f}': {t}", .{ coverage_file_path, err }); return error.AlreadyReported; }; gop.value_ptr.mapped_memory = mapped_memory; @@ -443,7 +452,7 @@ fn prepareTables(fuzz: *Fuzz, run_step: *Step.Run, coverage_id: u64) error{ OutO }{ .addrs = sorted_pcs.items(.pc) }); debug_info.resolveAddresses(fuzz.gpa, 
sorted_pcs.items(.pc), sorted_pcs.items(.sl)) catch |err| { - log.err("failed to resolve addresses to source locations: {s}", .{@errorName(err)}); + log.err("failed to resolve addresses to source locations: {t}", .{err}); return error.AlreadyReported; }; @@ -453,9 +462,11 @@ fn prepareTables(fuzz: *Fuzz, run_step: *Step.Run, coverage_id: u64) error{ OutO ws.notifyUpdate(); } -fn addEntryPoint(fuzz: *Fuzz, coverage_id: u64, addr: u64) error{ AlreadyReported, OutOfMemory }!void { - fuzz.coverage_mutex.lock(); - defer fuzz.coverage_mutex.unlock(); +fn addEntryPoint(fuzz: *Fuzz, coverage_id: u64, addr: u64) error{ AlreadyReported, OutOfMemory, Canceled }!void { + const io = fuzz.io; + + try fuzz.coverage_mutex.lock(io); + defer fuzz.coverage_mutex.unlock(io); const coverage_map = fuzz.coverage_files.getPtr(coverage_id).?; const header: *const abi.SeenPcsHeader = @ptrCast(coverage_map.mapped_memory[0..@sizeOf(abi.SeenPcsHeader)]); @@ -518,8 +529,8 @@ pub fn waitAndPrintReport(fuzz: *Fuzz) void { .sub_path = "v/" ++ std.fmt.hex(cov.id), }; var coverage_file = coverage_file_path.root_dir.handle.openFile(coverage_file_path.sub_path, .{}) catch |err| { - fatal("step '{s}': failed to load coverage file '{f}': {s}", .{ - cov.run.step.name, coverage_file_path, @errorName(err), + fatal("step '{s}': failed to load coverage file '{f}': {t}", .{ + cov.run.step.name, coverage_file_path, err, }); }; defer coverage_file.close(); @@ -530,8 +541,8 @@ pub fn waitAndPrintReport(fuzz: *Fuzz) void { var header: fuzz_abi.SeenPcsHeader = undefined; r.interface.readSliceAll(std.mem.asBytes(&header)) catch |err| { - fatal("step '{s}': failed to read from coverage file '{f}': {s}", .{ - cov.run.step.name, coverage_file_path, @errorName(err), + fatal("step '{s}': failed to read from coverage file '{f}': {t}", .{ + cov.run.step.name, coverage_file_path, err, }); }; @@ -545,8 +556,8 @@ pub fn waitAndPrintReport(fuzz: *Fuzz) void { const chunk_count = fuzz_abi.SeenPcsHeader.seenElemsLen(header.pcs_len); for (0..chunk_count) |_| { const seen = r.interface.takeInt(usize, .little) catch |err| { - fatal("step '{s}': failed to read from coverage file '{f}': {s}", .{ - cov.run.step.name, coverage_file_path, @errorName(err), + fatal("step '{s}': failed to read from coverage file '{f}': {t}", .{ + cov.run.step.name, coverage_file_path, err, }); }; seen_count += @popCount(seen); diff --git a/lib/std/Build/Step.zig b/lib/std/Build/Step.zig index 478b600195..c247e69461 100644 --- a/lib/std/Build/Step.zig +++ b/lib/std/Build/Step.zig @@ -362,7 +362,7 @@ pub fn captureChildProcess( .allocator = arena, .argv = argv, .progress_node = progress_node, - }) catch |err| return s.fail("failed to run {s}: {s}", .{ argv[0], @errorName(err) }); + }) catch |err| return s.fail("failed to run {s}: {t}", .{ argv[0], err }); if (result.stderr.len > 0) { try s.result_error_msgs.append(arena, result.stderr); @@ -412,7 +412,7 @@ pub fn evalZigProcess( error.BrokenPipe => { // Process restart required. const term = zp.child.wait() catch |e| { - return s.fail("unable to wait for {s}: {s}", .{ argv[0], @errorName(e) }); + return s.fail("unable to wait for {s}: {t}", .{ argv[0], e }); }; _ = term; s.clearZigProcess(gpa); @@ -428,7 +428,7 @@ pub fn evalZigProcess( if (s.result_error_msgs.items.len > 0 and result == null) { // Crash detected. 
const term = zp.child.wait() catch |e| { - return s.fail("unable to wait for {s}: {s}", .{ argv[0], @errorName(e) }); + return s.fail("unable to wait for {s}: {t}", .{ argv[0], e }); }; s.result_peak_rss = zp.child.resource_usage_statistics.getMaxRss() orelse 0; s.clearZigProcess(gpa); @@ -453,9 +453,7 @@ pub fn evalZigProcess( child.request_resource_usage_statistics = true; child.progress_node = prog_node; - child.spawn() catch |err| return s.fail("failed to spawn zig compiler {s}: {s}", .{ - argv[0], @errorName(err), - }); + child.spawn() catch |err| return s.fail("failed to spawn zig compiler {s}: {t}", .{ argv[0], err }); const zp = try gpa.create(ZigProcess); zp.* = .{ @@ -480,7 +478,7 @@ pub fn evalZigProcess( zp.child.stdin = null; const term = zp.child.wait() catch |err| { - return s.fail("unable to wait for {s}: {s}", .{ argv[0], @errorName(err) }); + return s.fail("unable to wait for {s}: {t}", .{ argv[0], err }); }; s.result_peak_rss = zp.child.resource_usage_statistics.getMaxRss() orelse 0; @@ -513,8 +511,8 @@ pub fn installFile(s: *Step, src_lazy_path: Build.LazyPath, dest_path: []const u const src_path = src_lazy_path.getPath3(b, s); try handleVerbose(b, null, &.{ "install", "-C", b.fmt("{f}", .{src_path}), dest_path }); return Io.Dir.updateFile(src_path.root_dir.handle.adaptToNewApi(), io, src_path.sub_path, .cwd(), dest_path, .{}) catch |err| { - return s.fail("unable to update file from '{f}' to '{s}': {s}", .{ - src_path, dest_path, @errorName(err), + return s.fail("unable to update file from '{f}' to '{s}': {t}", .{ + src_path, dest_path, err, }); }; } @@ -524,9 +522,7 @@ pub fn installDir(s: *Step, dest_path: []const u8) !std.fs.Dir.MakePathStatus { const b = s.owner; try handleVerbose(b, null, &.{ "install", "-d", dest_path }); return std.fs.cwd().makePathStatus(dest_path) catch |err| { - return s.fail("unable to create dir '{s}': {s}", .{ - dest_path, @errorName(err), - }); + return s.fail("unable to create dir '{s}': {t}", .{ dest_path, err }); }; } @@ -825,22 +821,27 @@ pub fn cacheHitAndWatch(s: *Step, man: *Build.Cache.Manifest) !bool { return is_hit; } -fn failWithCacheError(s: *Step, man: *const Build.Cache.Manifest, err: Build.Cache.Manifest.HitError) error{ OutOfMemory, MakeFailed } { +fn failWithCacheError( + s: *Step, + man: *const Build.Cache.Manifest, + err: Build.Cache.Manifest.HitError, +) error{ OutOfMemory, Canceled, MakeFailed } { switch (err) { error.CacheCheckFailed => switch (man.diagnostic) { .none => unreachable, - .manifest_create, .manifest_read, .manifest_lock => |e| return s.fail("failed to check cache: {s} {s}", .{ - @tagName(man.diagnostic), @errorName(e), + .manifest_create, .manifest_read, .manifest_lock => |e| return s.fail("failed to check cache: {t} {t}", .{ + man.diagnostic, e, }), .file_open, .file_stat, .file_read, .file_hash => |op| { const pp = man.files.keys()[op.file_index].prefixed_path; const prefix = man.cache.prefixes()[pp.prefix].path orelse ""; - return s.fail("failed to check cache: '{s}{c}{s}' {s} {s}", .{ - prefix, std.fs.path.sep, pp.sub_path, @tagName(man.diagnostic), @errorName(op.err), + return s.fail("failed to check cache: '{s}{c}{s}' {t} {t}", .{ + prefix, std.fs.path.sep, pp.sub_path, man.diagnostic, op.err, }); }, }, error.OutOfMemory => return error.OutOfMemory, + error.Canceled => return error.Canceled, error.InvalidFormat => return s.fail("failed to check cache: invalid manifest file format", .{}), } } @@ -850,7 +851,7 @@ fn failWithCacheError(s: *Step, man: *const Build.Cache.Manifest, err: Build.Cac pub fn 
writeManifest(s: *Step, man: *Build.Cache.Manifest) !void { if (s.test_results.isSuccess()) { man.writeManifest() catch |err| { - try s.addError("unable to write cache manifest: {s}", .{@errorName(err)}); + try s.addError("unable to write cache manifest: {t}", .{err}); }; } } diff --git a/lib/std/Build/Step/Run.zig b/lib/std/Build/Step/Run.zig index bcde251d48..764d2830ff 100644 --- a/lib/std/Build/Step/Run.zig +++ b/lib/std/Build/Step/Run.zig @@ -1830,6 +1830,7 @@ fn pollZigTest( } { const gpa = run.step.owner.allocator; const arena = run.step.owner.allocator; + const io = run.step.owner.graph.io; var sub_prog_node: ?std.Progress.Node = null; defer if (sub_prog_node) |n| n.end(); @@ -2035,8 +2036,8 @@ fn pollZigTest( { const fuzz = fuzz_context.?.fuzz; - fuzz.queue_mutex.lock(); - defer fuzz.queue_mutex.unlock(); + fuzz.queue_mutex.lockUncancelable(io); + defer fuzz.queue_mutex.unlock(io); try fuzz.msg_queue.append(fuzz.gpa, .{ .coverage = .{ .id = coverage_id.?, .cumulative = .{ @@ -2046,20 +2047,20 @@ fn pollZigTest( }, .run = run, } }); - fuzz.queue_cond.signal(); + fuzz.queue_cond.signal(io); } }, .fuzz_start_addr => { const fuzz = fuzz_context.?.fuzz; const addr = body_r.takeInt(u64, .little) catch unreachable; { - fuzz.queue_mutex.lock(); - defer fuzz.queue_mutex.unlock(); + fuzz.queue_mutex.lockUncancelable(io); + defer fuzz.queue_mutex.unlock(io); try fuzz.msg_queue.append(fuzz.gpa, .{ .entry_point = .{ .addr = addr, .coverage_id = coverage_id.?, } }); - fuzz.queue_cond.signal(); + fuzz.queue_cond.signal(io); } }, else => {}, // ignore other messages diff --git a/lib/std/Build/WebServer.zig b/lib/std/Build/WebServer.zig index e28dd75b5e..4d649e6f9b 100644 --- a/lib/std/Build/WebServer.zig +++ b/lib/std/Build/WebServer.zig @@ -19,7 +19,7 @@ step_names_trailing: []u8, step_status_bits: []u8, fuzz: ?Fuzz, -time_report_mutex: std.Thread.Mutex, +time_report_mutex: Io.Mutex, time_report_msgs: [][]u8, time_report_update_times: []i64, @@ -33,9 +33,9 @@ build_status: std.atomic.Value(abi.BuildStatus), /// an unreasonable number of packets. 
update_id: std.atomic.Value(u32), -runner_request_mutex: std.Thread.Mutex, -runner_request_ready_cond: std.Thread.Condition, -runner_request_empty_cond: std.Thread.Condition, +runner_request_mutex: Io.Mutex, +runner_request_ready_cond: Io.Condition, +runner_request_empty_cond: Io.Condition, runner_request: ?RunnerRequest, /// If a client is not explicitly notified of changes with `notifyUpdate`, it will be sent updates @@ -114,14 +114,14 @@ pub fn init(opts: Options) WebServer { .step_status_bits = step_status_bits, .fuzz = null, - .time_report_mutex = .{}, + .time_report_mutex = .init, .time_report_msgs = time_report_msgs, .time_report_update_times = time_report_update_times, .build_status = .init(.idle), .update_id = .init(0), - .runner_request_mutex = .{}, + .runner_request_mutex = .init, .runner_request_ready_cond = .{}, .runner_request_empty_cond = .{}, .runner_request = null, @@ -296,6 +296,8 @@ fn accept(ws: *WebServer, stream: net.Stream) void { } fn serveWebSocket(ws: *WebServer, sock: *http.Server.WebSocket) !noreturn { + const io = ws.graph.io; + var prev_build_status = ws.build_status.load(.monotonic); const prev_step_status_bits = try ws.gpa.alloc(u8, ws.step_status_bits.len); @@ -331,8 +333,8 @@ fn serveWebSocket(ws: *WebServer, sock: *http.Server.WebSocket) !noreturn { } { - ws.time_report_mutex.lock(); - defer ws.time_report_mutex.unlock(); + try ws.time_report_mutex.lock(io); + defer ws.time_report_mutex.unlock(io); for (ws.time_report_msgs, ws.time_report_update_times) |msg, update_time| { if (update_time <= prev_time) continue; // We want to send `msg`, but shouldn't block `ws.time_report_mutex` while we do, so @@ -340,8 +342,8 @@ fn serveWebSocket(ws: *WebServer, sock: *http.Server.WebSocket) !noreturn { const owned_msg = try ws.gpa.dupe(u8, msg); defer ws.gpa.free(owned_msg); // Temporarily unlock, then re-lock after the message is sent. - ws.time_report_mutex.unlock(); - defer ws.time_report_mutex.lock(); + ws.time_report_mutex.unlock(io); + defer ws.time_report_mutex.lockUncancelable(io); try sock.writeMessage(owned_msg, .binary); } } @@ -382,6 +384,8 @@ fn serveWebSocket(ws: *WebServer, sock: *http.Server.WebSocket) !noreturn { } } fn recvWebSocketMessages(ws: *WebServer, sock: *http.Server.WebSocket) void { + const io = ws.graph.io; + while (true) { const msg = sock.readSmallMessage() catch return; if (msg.opcode != .binary) continue; @@ -390,14 +394,16 @@ fn recvWebSocketMessages(ws: *WebServer, sock: *http.Server.WebSocket) void { switch (tag) { _ => continue, .rebuild => while (true) { - ws.runner_request_mutex.lock(); - defer ws.runner_request_mutex.unlock(); + ws.runner_request_mutex.lock(io) catch |err| switch (err) { + error.Canceled => return, + }; + defer ws.runner_request_mutex.unlock(io); if (ws.runner_request == null) { ws.runner_request = .rebuild; - ws.runner_request_ready_cond.signal(); + ws.runner_request_ready_cond.signal(io); break; } - ws.runner_request_empty_cond.wait(&ws.runner_request_mutex); + ws.runner_request_empty_cond.wait(io, &ws.runner_request_mutex) catch return; }, } } @@ -691,14 +697,15 @@ pub fn updateTimeReportCompile(ws: *WebServer, opts: struct { trailing: []const u8, }) void { const gpa = ws.gpa; + const io = ws.graph.io; const step_idx: u32 = for (ws.all_steps, 0..) 
|s, i| { if (s == &opts.compile.step) break @intCast(i); } else unreachable; const old_buf = old: { - ws.time_report_mutex.lock(); - defer ws.time_report_mutex.unlock(); + ws.time_report_mutex.lock(io) catch return; + defer ws.time_report_mutex.unlock(io); const old = ws.time_report_msgs[step_idx]; ws.time_report_msgs[step_idx] = &.{}; break :old old; @@ -720,8 +727,8 @@ pub fn updateTimeReportCompile(ws: *WebServer, opts: struct { @memcpy(buf[@sizeOf(abi.time_report.CompileResult)..], opts.trailing); { - ws.time_report_mutex.lock(); - defer ws.time_report_mutex.unlock(); + ws.time_report_mutex.lock(io) catch return; + defer ws.time_report_mutex.unlock(io); assert(ws.time_report_msgs[step_idx].len == 0); ws.time_report_msgs[step_idx] = buf; ws.time_report_update_times[step_idx] = ws.now(); @@ -731,14 +738,15 @@ pub fn updateTimeReportCompile(ws: *WebServer, opts: struct { pub fn updateTimeReportGeneric(ws: *WebServer, step: *Build.Step, ns_total: u64) void { const gpa = ws.gpa; + const io = ws.graph.io; const step_idx: u32 = for (ws.all_steps, 0..) |s, i| { if (s == step) break @intCast(i); } else unreachable; const old_buf = old: { - ws.time_report_mutex.lock(); - defer ws.time_report_mutex.unlock(); + ws.time_report_mutex.lock(io) catch return; + defer ws.time_report_mutex.unlock(io); const old = ws.time_report_msgs[step_idx]; ws.time_report_msgs[step_idx] = &.{}; break :old old; @@ -750,8 +758,8 @@ pub fn updateTimeReportGeneric(ws: *WebServer, step: *Build.Step, ns_total: u64) .ns_total = ns_total, }; { - ws.time_report_mutex.lock(); - defer ws.time_report_mutex.unlock(); + ws.time_report_mutex.lock(io) catch return; + defer ws.time_report_mutex.unlock(io); assert(ws.time_report_msgs[step_idx].len == 0); ws.time_report_msgs[step_idx] = buf; ws.time_report_update_times[step_idx] = ws.now(); @@ -766,6 +774,7 @@ pub fn updateTimeReportRunTest( ns_per_test: []const u64, ) void { const gpa = ws.gpa; + const io = ws.graph.io; const step_idx: u32 = for (ws.all_steps, 0..) 
|s, i| { if (s == &run.step) break @intCast(i); @@ -782,8 +791,8 @@ pub fn updateTimeReportRunTest( break :len @sizeOf(abi.time_report.RunTestResult) + names_len + 8 * tests_len; }; const old_buf = old: { - ws.time_report_mutex.lock(); - defer ws.time_report_mutex.unlock(); + ws.time_report_mutex.lock(io) catch return; + defer ws.time_report_mutex.unlock(io); const old = ws.time_report_msgs[step_idx]; ws.time_report_msgs[step_idx] = &.{}; break :old old; @@ -808,8 +817,8 @@ pub fn updateTimeReportRunTest( assert(offset == buf.len); { - ws.time_report_mutex.lock(); - defer ws.time_report_mutex.unlock(); + ws.time_report_mutex.lock(io) catch return; + defer ws.time_report_mutex.unlock(io); assert(ws.time_report_msgs[step_idx].len == 0); ws.time_report_msgs[step_idx] = buf; ws.time_report_update_times[step_idx] = ws.now(); @@ -821,8 +830,9 @@ const RunnerRequest = union(enum) { rebuild, }; pub fn getRunnerRequest(ws: *WebServer) ?RunnerRequest { - ws.runner_request_mutex.lock(); - defer ws.runner_request_mutex.unlock(); + const io = ws.graph.io; + ws.runner_request_mutex.lock(io) catch return null; + defer ws.runner_request_mutex.unlock(io); if (ws.runner_request) |req| { ws.runner_request = null; - ws.runner_request_empty_cond.signal(); + ws.runner_request_empty_cond.signal(io); return req; } return null; } -pub fn wait(ws: *WebServer) RunnerRequest { - ws.runner_request_mutex.lock(); - defer ws.runner_request_mutex.unlock(); +pub fn wait(ws: *WebServer) Io.Cancelable!RunnerRequest { + const io = ws.graph.io; + try ws.runner_request_mutex.lock(io); + defer ws.runner_request_mutex.unlock(io); while (true) { if (ws.runner_request) |req| { ws.runner_request = null; - ws.runner_request_empty_cond.signal(); + ws.runner_request_empty_cond.signal(io); return req; } - ws.runner_request_ready_cond.wait(&ws.runner_request_mutex); + ws.runner_request_ready_cond.wait(io, &ws.runner_request_mutex); } } From ece62a0223cfad5e49a5c75a944a21e735a01a05 Mon Sep 17 00:00:00 2001 From: Andrew Kelley Date: Mon, 24 Nov 2025 11:58:24 -0800 Subject: [PATCH 3/4] frontend: introduce error.Canceled --- src/Compilation.zig | 30 +++++++++++++++--------------- src/Sema.zig | 14 ++++++++------ src/Type.zig | 3 ++- src/Value.zig | 10 +++++++--- src/Zcu.zig | 4 +++- src/Zcu/PerThread.zig | 27 ++++++++++++++++++--------- src/print_value.zig | 2 ++ 7 files changed, 55 insertions(+), 35 deletions(-) diff --git a/src/Compilation.zig b/src/Compilation.zig index c76bcc37ea..687525cbf3 100644 --- a/src/Compilation.zig +++ b/src/Compilation.zig @@ -2851,6 +2851,7 @@ fn cleanupAfterUpdate(comp: *Compilation, tmp_dir_rand_int: u64) void { pub const UpdateError = error{ OutOfMemory, + Canceled, Unexpected, CurrentWorkingDirectoryUnlinked, }; @@ -2930,6 +2931,7 @@ pub fn update(comp: *Compilation, main_progress_node: std.Progress.Node) UpdateE }, }, error.OutOfMemory => return error.OutOfMemory, + error.Canceled => return error.Canceled, error.InvalidFormat => return comp.setMiscFailure( .check_whole_cache, "failed to check cache: invalid manifest file format", @@ -5010,7 +5012,7 @@ fn performAllTheWork( } } -const JobError = Allocator.Error; +const JobError = Allocator.Error || Io.Cancelable; pub fn queueJob(comp: *Compilation, job: Job) !void { try comp.work_queues[Job.stage(job)].pushBack(comp.gpa, job); @@ -5117,6 +5119,7 @@ fn processOneJob(tid: usize, comp: *Compilation, job: Job) JobError!void { pt.ensureFuncBodyUpToDate(func) catch |err| switch (err) { error.OutOfMemory => |e| return e, + error.Canceled => 
|e| return e, error.AnalysisFail => return, }; }, @@ -5137,6 +5140,7 @@ fn processOneJob(tid: usize, comp: *Compilation, job: Job) JobError!void { }; maybe_err catch |err| switch (err) { error.OutOfMemory => |e| return e, + error.Canceled => |e| return e, error.AnalysisFail => return, }; @@ -5166,7 +5170,7 @@ fn processOneJob(tid: usize, comp: *Compilation, job: Job) JobError!void { const pt: Zcu.PerThread = .activate(comp.zcu.?, @enumFromInt(tid)); defer pt.deactivate(); Type.fromInterned(ty).resolveFully(pt) catch |err| switch (err) { - error.OutOfMemory => return error.OutOfMemory, + error.OutOfMemory, error.Canceled => |e| return e, error.AnalysisFail => return, }; }, @@ -5177,7 +5181,7 @@ fn processOneJob(tid: usize, comp: *Compilation, job: Job) JobError!void { const pt: Zcu.PerThread = .activate(comp.zcu.?, @enumFromInt(tid)); defer pt.deactivate(); pt.semaMod(mod) catch |err| switch (err) { - error.OutOfMemory => return error.OutOfMemory, + error.OutOfMemory, error.Canceled => |e| return e, error.AnalysisFail => return, }; }, @@ -5190,8 +5194,8 @@ fn processOneJob(tid: usize, comp: *Compilation, job: Job) JobError!void { // TODO Surface more error details. comp.lockAndSetMiscFailure( .windows_import_lib, - "unable to generate DLL import .lib file for {s}: {s}", - .{ link_lib, @errorName(err) }, + "unable to generate DLL import .lib file for {s}: {t}", + .{ link_lib, err }, ); }; }, @@ -6066,14 +6070,10 @@ fn buildLibZigC(comp: *Compilation, prog_node: std.Progress.Node) void { }; } -fn reportRetryableCObjectError( - comp: *Compilation, - c_object: *CObject, - err: anyerror, -) error{OutOfMemory}!void { +fn reportRetryableCObjectError(comp: *Compilation, c_object: *CObject, err: anyerror) error{OutOfMemory}!void { c_object.status = .failure_retryable; - switch (comp.failCObj(c_object, "{s}", .{@errorName(err)})) { + switch (comp.failCObj(c_object, "{t}", .{err})) { error.AnalysisFail => return, else => |e| return e, } @@ -7317,7 +7317,7 @@ fn failCObj( c_object: *CObject, comptime format: []const u8, args: anytype, -) SemaError { +) error{ OutOfMemory, AnalysisFail } { @branchHint(.cold); const diag_bundle = blk: { const diag_bundle = try comp.gpa.create(CObject.Diag.Bundle); @@ -7341,7 +7341,7 @@ fn failCObjWithOwnedDiagBundle( comp: *Compilation, c_object: *CObject, diag_bundle: *CObject.Diag.Bundle, -) SemaError { +) error{ OutOfMemory, AnalysisFail } { @branchHint(.cold); assert(diag_bundle.diags.len > 0); { @@ -7357,7 +7357,7 @@ fn failCObjWithOwnedDiagBundle( return error.AnalysisFail; } -fn failWin32Resource(comp: *Compilation, win32_resource: *Win32Resource, comptime format: []const u8, args: anytype) SemaError { +fn failWin32Resource(comp: *Compilation, win32_resource: *Win32Resource, comptime format: []const u8, args: anytype) error{ OutOfMemory, AnalysisFail } { @branchHint(.cold); var bundle: ErrorBundle.Wip = undefined; try bundle.init(comp.gpa); @@ -7384,7 +7384,7 @@ fn failWin32ResourceWithOwnedBundle( comp: *Compilation, win32_resource: *Win32Resource, err_bundle: ErrorBundle, -) SemaError { +) error{ OutOfMemory, AnalysisFail } { @branchHint(.cold); { comp.mutex.lock(); diff --git a/src/Sema.zig b/src/Sema.zig index 2ca75e1574..324f1b6867 100644 --- a/src/Sema.zig +++ b/src/Sema.zig @@ -6696,7 +6696,7 @@ pub fn analyzeSaveErrRetIndex(sema: *Sema, block: *Block) SemaError!Air.Inst.Ref const field_index = sema.structFieldIndex(block, stack_trace_ty, field_name, LazySrcLoc.unneeded) catch |err| switch (err) { error.AnalysisFail => @panic("std.builtin.StackTrace is 
corrupt"), error.ComptimeReturn, error.ComptimeBreak => unreachable, - error.OutOfMemory => |e| return e, + error.OutOfMemory, error.Canceled => |e| return e, }; return try block.addInst(.{ @@ -13924,6 +13924,7 @@ fn zirEmbedFile(sema: *Sema, block: *Block, inst: Zir.Inst.Index) CompileError!A return sema.fail(block, operand_src, "unable to resolve '{s}': working directory has been unlinked", .{name}); }, error.OutOfMemory => |e| return e, + error.Canceled => |e| return e, }; try sema.declareDependency(.{ .embed_file = ef_idx }); @@ -34345,7 +34346,7 @@ pub fn resolveStructLayout(sema: *Sema, ty: Type) SemaError!void { if (struct_type.layout == .@"packed") { sema.backingIntType(struct_type) catch |err| switch (err) { - error.OutOfMemory, error.AnalysisFail => |e| return e, + error.AnalysisFail, error.OutOfMemory, error.Canceled => |e| return e, error.ComptimeBreak, error.ComptimeReturn => unreachable, }; return; @@ -34893,7 +34894,7 @@ pub fn resolveStructFieldTypes( defer tracked_unit.end(zcu); sema.structFields(struct_type) catch |err| switch (err) { - error.AnalysisFail, error.OutOfMemory => |e| return e, + error.AnalysisFail, error.OutOfMemory, error.Canceled => |e| return e, error.ComptimeBreak, error.ComptimeReturn => unreachable, }; } @@ -34926,7 +34927,7 @@ pub fn resolveStructFieldInits(sema: *Sema, ty: Type) SemaError!void { defer tracked_unit.end(zcu); sema.structFieldInits(struct_type) catch |err| switch (err) { - error.AnalysisFail, error.OutOfMemory => |e| return e, + error.AnalysisFail, error.OutOfMemory, error.Canceled => |e| return e, error.ComptimeBreak, error.ComptimeReturn => unreachable, }; struct_type.setHaveFieldInits(ip); @@ -34960,7 +34961,7 @@ pub fn resolveUnionFieldTypes(sema: *Sema, ty: Type, union_type: InternPool.Load union_type.setStatus(ip, .field_types_wip); errdefer union_type.setStatus(ip, .none); sema.unionFields(ty.toIntern(), union_type) catch |err| switch (err) { - error.AnalysisFail, error.OutOfMemory => |e| return e, + error.AnalysisFail, error.OutOfMemory, error.Canceled => |e| return e, error.ComptimeBreak, error.ComptimeReturn => unreachable, }; union_type.setStatus(ip, .have_field_types); @@ -37027,6 +37028,7 @@ fn notePathToComptimeAllocPtr( const derivation = comptime_ptr.pointerDerivationAdvanced(arena, pt, false, sema) catch |err| switch (err) { error.OutOfMemory => |e| return e, + error.Canceled => @panic("TODO"), // pls don't be cancelable mlugg error.AnalysisFail => unreachable, }; @@ -37367,7 +37369,7 @@ pub fn resolveDeclaredEnum( ) catch |err| switch (err) { error.ComptimeBreak => unreachable, error.ComptimeReturn => unreachable, - error.OutOfMemory => |e| return e, + error.OutOfMemory, error.Canceled => |e| return e, error.AnalysisFail => { if (!zcu.failed_analysis.contains(sema.owner)) { try zcu.transitive_failed_analysis.put(gpa, sema.owner, {}); diff --git a/src/Type.zig b/src/Type.zig index 3d3b36640c..3c13f3db94 100644 --- a/src/Type.zig +++ b/src/Type.zig @@ -3837,7 +3837,7 @@ fn resolveStructInner( } return error.AnalysisFail; }, - error.OutOfMemory => |e| return e, + error.OutOfMemory, error.Canceled => |e| return e, }; } @@ -3896,6 +3896,7 @@ fn resolveUnionInner( return error.AnalysisFail; }, error.OutOfMemory => |e| return e, + error.Canceled => |e| return e, }; } diff --git a/src/Value.zig b/src/Value.zig index 930f94a5a3..e2699746fa 100644 --- a/src/Value.zig +++ b/src/Value.zig @@ -1,12 +1,15 @@ -const std = @import("std"); -const builtin = @import("builtin"); const build_options = @import("build_options"); -const Type 
= @import("Type.zig"); +const builtin = @import("builtin"); + +const std = @import("std"); +const Io = std.Io; const assert = std.debug.assert; const BigIntConst = std.math.big.int.Const; const BigIntMutable = std.math.big.int.Mutable; const Target = std.Target; const Allocator = std.mem.Allocator; + +const Type = @import("Type.zig"); const Zcu = @import("Zcu.zig"); const Sema = @import("Sema.zig"); const InternPool = @import("InternPool.zig"); @@ -2410,6 +2413,7 @@ pub const PointerDeriveStep = union(enum) { pub fn pointerDerivation(ptr_val: Value, arena: Allocator, pt: Zcu.PerThread) Allocator.Error!PointerDeriveStep { return ptr_val.pointerDerivationAdvanced(arena, pt, false, null) catch |err| switch (err) { error.OutOfMemory => |e| return e, + error.Canceled => @panic("TODO"), // pls remove from error set mlugg error.AnalysisFail => unreachable, }; } diff --git a/src/Zcu.zig b/src/Zcu.zig index c170ef6d71..6027ab07fa 100644 --- a/src/Zcu.zig +++ b/src/Zcu.zig @@ -2755,9 +2755,11 @@ pub const LazySrcLoc = struct { } }; -pub const SemaError = error{ OutOfMemory, AnalysisFail }; +pub const SemaError = error{ OutOfMemory, Canceled, AnalysisFail }; pub const CompileError = error{ OutOfMemory, + /// The compilation update is no longer desired. + Canceled, /// When this is returned, the compile error for the failure has already been recorded. AnalysisFail, /// In a comptime scope, a return instruction was encountered. This error is only seen when diff --git a/src/Zcu/PerThread.zig b/src/Zcu/PerThread.zig index 78a35fb124..702e30dab0 100644 --- a/src/Zcu/PerThread.zig +++ b/src/Zcu/PerThread.zig @@ -1,26 +1,31 @@ //! This type provides a wrapper around a `*Zcu` for uses which require a thread `Id`. //! Any operation which mutates `InternPool` state lives here rather than on `Zcu`. 
-const Air = @import("../Air.zig"); +const std = @import("std"); const Allocator = std.mem.Allocator; const assert = std.debug.assert; const Ast = std.zig.Ast; const AstGen = std.zig.AstGen; const BigIntConst = std.math.big.int.Const; const BigIntMutable = std.math.big.int.Mutable; +const Cache = std.Build.Cache; +const log = std.log.scoped(.zcu); +const mem = std.mem; +const Zir = std.zig.Zir; +const Zoir = std.zig.Zoir; +const ZonGen = std.zig.ZonGen; +const Io = std.Io; + +const Air = @import("../Air.zig"); const Builtin = @import("../Builtin.zig"); const build_options = @import("build_options"); const builtin = @import("builtin"); -const Cache = std.Build.Cache; const dev = @import("../dev.zig"); const InternPool = @import("../InternPool.zig"); const AnalUnit = InternPool.AnalUnit; const introspect = @import("../introspect.zig"); -const log = std.log.scoped(.zcu); const Module = @import("../Package.zig").Module; const Sema = @import("../Sema.zig"); -const std = @import("std"); -const mem = std.mem; const target_util = @import("../target.zig"); const trace = @import("../tracy.zig").trace; const Type = @import("../Type.zig"); @@ -29,9 +34,6 @@ const Zcu = @import("../Zcu.zig"); const Compilation = @import("../Compilation.zig"); const codegen = @import("../codegen.zig"); const crash_report = @import("../crash_report.zig"); -const Zir = std.zig.Zir; -const Zoir = std.zig.Zoir; -const ZonGen = std.zig.ZonGen; zcu: *Zcu, @@ -678,6 +680,7 @@ pub fn ensureMemoizedStateUpToDate(pt: Zcu.PerThread, stage: InternPool.Memoized // TODO: same as for `ensureComptimeUnitUpToDate` etc return error.OutOfMemory; }, + error.Canceled => |e| return e, error.ComptimeReturn => unreachable, error.ComptimeBreak => unreachable, }; @@ -842,6 +845,7 @@ pub fn ensureComptimeUnitUpToDate(pt: Zcu.PerThread, cu_id: InternPool.ComptimeU // for reporting OOM errors without allocating. return error.OutOfMemory; }, + error.Canceled => |e| return e, error.ComptimeReturn => unreachable, error.ComptimeBreak => unreachable, }; @@ -1030,6 +1034,7 @@ pub fn ensureNavValUpToDate(pt: Zcu.PerThread, nav_id: InternPool.Nav.Index) Zcu // for reporting OOM errors without allocating. return error.OutOfMemory; }, + error.Canceled => |e| return e, error.ComptimeReturn => unreachable, error.ComptimeBreak => unreachable, }; @@ -1443,6 +1448,7 @@ pub fn ensureNavTypeUpToDate(pt: Zcu.PerThread, nav_id: InternPool.Nav.Index) Zc // for reporting OOM errors without allocating. return error.OutOfMemory; }, + error.Canceled => |e| return e, error.ComptimeReturn => unreachable, error.ComptimeBreak => unreachable, }; @@ -1668,6 +1674,7 @@ pub fn ensureFuncBodyUpToDate(pt: Zcu.PerThread, func_index: InternPool.Index) Z // for reporting OOM errors without allocating. 
return error.OutOfMemory; }, + error.Canceled => |e| return e, }; if (was_outdated) { @@ -2360,6 +2367,7 @@ pub fn embedFile( import_string: []const u8, ) error{ OutOfMemory, + Canceled, ImportOutsideModulePath, CurrentWorkingDirectoryUnlinked, }!Zcu.EmbedFile.Index { @@ -4123,7 +4131,7 @@ fn recreateEnumType( pt: Zcu.PerThread, old_ty: InternPool.Index, key: InternPool.Key.NamespaceType.Declared, -) Allocator.Error!InternPool.Index { +) (Allocator.Error || Io.Cancelable)!InternPool.Index { const zcu = pt.zcu; const gpa = zcu.gpa; const ip = &zcu.intern_pool; @@ -4234,6 +4242,7 @@ fn recreateEnumType( body_end, ) catch |err| switch (err) { error.OutOfMemory => |e| return e, + error.Canceled => |e| return e, error.AnalysisFail => {}, // call sites are responsible for checking `[transitive_]failed_analysis` to detect this }; diff --git a/src/print_value.zig b/src/print_value.zig index 9a44e8d7a4..28c2595427 100644 --- a/src/print_value.zig +++ b/src/print_value.zig @@ -27,6 +27,7 @@ pub fn formatSema(ctx: FormatContext, writer: *Writer) Writer.Error!void { error.OutOfMemory => @panic("OOM"), // We're not allowed to return this from a format function error.ComptimeBreak, error.ComptimeReturn => unreachable, error.AnalysisFail => unreachable, // TODO: re-evaluate when we use `sema` more fully + error.Canceled => @panic("TODO"), // pls stop returning this error mlugg else => |e| return e, }; } @@ -36,6 +37,7 @@ pub fn format(ctx: FormatContext, writer: *Writer) Writer.Error!void { return print(ctx.val, writer, ctx.depth, ctx.pt, null) catch |err| switch (err) { error.OutOfMemory => @panic("OOM"), // We're not allowed to return this from a format function error.ComptimeBreak, error.ComptimeReturn, error.AnalysisFail => unreachable, + error.Canceled => @panic("TODO"), // pls stop returning this error mlugg else => |e| return e, }; } From 84353183c724298d2341b857bbd69975d315ec8a Mon Sep 17 00:00:00 2001 From: Andrew Kelley Date: Mon, 24 Nov 2025 14:33:59 -0800 Subject: [PATCH 4/4] build runner: fix recursive locking of max_rss_mutex --- lib/compiler/build_runner.zig | 52 ++++++++++++++++++++--------------- 1 file changed, 30 insertions(+), 22 deletions(-) diff --git a/lib/compiler/build_runner.zig b/lib/compiler/build_runner.zig index fb2385b7e0..272b1b8077 100644 --- a/lib/compiler/build_runner.zig +++ b/lib/compiler/build_runner.zig @@ -1306,6 +1306,7 @@ fn workerMakeOneStep( run: *Run, ) void { const io = b.graph.io; + const gpa = run.gpa; // First, check the conditions for running this step. If they are not met, // then we return without doing the step, relying on another worker to @@ -1341,7 +1342,7 @@ fn workerMakeOneStep( if (new_claimed_rss > run.max_rss) { // Running this step right now could possibly exceed the allotted RSS. // Add this step to the queue of memory-blocked steps. - run.memory_blocked_steps.append(run.gpa, s) catch @panic("OOM"); + run.memory_blocked_steps.append(gpa, s) catch @panic("OOM"); return; } @@ -1366,7 +1367,7 @@ fn workerMakeOneStep( .web_server = if (run.web_server) |*ws| ws else null, .ttyconf = run.ttyconf, .unit_test_timeout_ns = run.unit_test_timeout_ns, - .gpa = run.gpa, + .gpa = gpa, }); // No matter the result, we want to display error/warning messages. 
@@ -1377,7 +1378,7 @@ fn workerMakeOneStep( const bw, _ = std.debug.lockStderrWriter(&stdio_buffer_allocation); defer std.debug.unlockStderrWriter(); const ttyconf = run.ttyconf; - printErrorMessages(run.gpa, s, .{}, bw, ttyconf, run.error_style, run.multiline_errors) catch {}; + printErrorMessages(gpa, s, .{}, bw, ttyconf, run.error_style, run.multiline_errors) catch {}; } handle_result: { @@ -1406,29 +1407,36 @@ fn workerMakeOneStep( // If this is a step that claims resources, we must now queue up other // steps that are waiting for resources. if (s.max_rss != 0) { - run.max_rss_mutex.lockUncancelable(io); - defer run.max_rss_mutex.unlock(io); + var dispatch_deps: std.ArrayList(*Step) = .empty; + defer dispatch_deps.deinit(gpa); + dispatch_deps.ensureUnusedCapacity(gpa, run.memory_blocked_steps.items.len) catch @panic("OOM"); - // Give the memory back to the scheduler. - run.claimed_rss -= s.max_rss; - // Avoid kicking off too many tasks that we already know will not have - // enough resources. - var remaining = run.max_rss - run.claimed_rss; - var i: usize = 0; - var j: usize = 0; - while (j < run.memory_blocked_steps.items.len) : (j += 1) { - const dep = run.memory_blocked_steps.items[j]; - assert(dep.max_rss != 0); - if (dep.max_rss <= remaining) { - remaining -= dep.max_rss; + { + run.max_rss_mutex.lockUncancelable(io); + defer run.max_rss_mutex.unlock(io); - group.async(io, workerMakeOneStep, .{ group, b, dep, prog_node, run }); - } else { - run.memory_blocked_steps.items[i] = dep; - i += 1; + // Give the memory back to the scheduler. + run.claimed_rss -= s.max_rss; + // Avoid kicking off too many tasks that we already know will not have + // enough resources. + var remaining = run.max_rss - run.claimed_rss; + var i: usize = 0; + for (run.memory_blocked_steps.items) |dep| { + assert(dep.max_rss != 0); + if (dep.max_rss <= remaining) { + remaining -= dep.max_rss; + dispatch_deps.appendAssumeCapacity(dep); + } else { + run.memory_blocked_steps.items[i] = dep; + i += 1; + } } + run.memory_blocked_steps.shrinkRetainingCapacity(i); + } + for (dispatch_deps.items) |dep| { + // Must be called without max_rss_mutex held in case it executes recursively. + group.async(io, workerMakeOneStep, .{ group, b, dep, prog_node, run }); } - run.memory_blocked_steps.shrinkRetainingCapacity(i); } }
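A note for reviewers on the shape of the new API: the pattern this series applies throughout — std.Thread.Pool + WaitGroup becoming Io.Group, and std.Thread.Mutex / std.Thread.Condition becoming Io.Mutex / Io.Condition that take an explicit `io` and may fail with error.Canceled — can be sketched in isolation roughly as follows. This is a minimal sketch that only assumes the calls already visible in the diffs above (Io.Group `.init`/`async`/`wait`, Io.Mutex `.init`/`lockUncancelable`/`unlock`); `Shared`, `worker`, and `runAll` are illustrative names that do not appear in the patches.

const std = @import("std");
const Io = std.Io;

const Shared = struct {
    // Io.Mutex is default-initialized with `.init`, as in the Cache.zig hunk above.
    mutex: Io.Mutex = .init,
    completed: usize = 0,
};

// Each task locks the shared state through `io`, mirroring how
// workerMakeOneStep guards max_rss_mutex with lockUncancelable/unlock.
fn worker(io: Io, shared: *Shared, index: usize) void {
    _ = index; // a real task would do its work here
    shared.mutex.lockUncancelable(io);
    defer shared.mutex.unlock(io);
    shared.completed += 1;
}

// Io.Group plays the role the std.Thread.Pool + WaitGroup pair used to:
// tasks are spawned with group.async and joined with group.wait.
fn runAll(io: Io, task_count: usize) usize {
    var shared: Shared = .{};
    var group: Io.Group = .init;
    for (0..task_count) |i| {
        group.async(io, worker, .{ io, &shared, i });
    }
    group.wait(io);
    return shared.completed;
}

Cancelable call sites use `try mutex.lock(io)` instead and surface error.Canceled, which is the error that patch 3 threads through the compiler frontend.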