diff --git a/CMakeLists.txt b/CMakeLists.txt index 9a758008f6..a789f3c4df 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -412,8 +412,6 @@ set(ZIG_STAGE2_SOURCES lib/std/Thread.zig lib/std/Thread/Futex.zig lib/std/Thread/Mutex.zig - lib/std/Thread/Pool.zig - lib/std/Thread/WaitGroup.zig lib/std/array_hash_map.zig lib/std/array_list.zig lib/std/ascii.zig diff --git a/lib/compiler/build_runner.zig b/lib/compiler/build_runner.zig index 545cab6083..fc16aab980 100644 --- a/lib/compiler/build_runner.zig +++ b/lib/compiler/build_runner.zig @@ -107,7 +107,6 @@ pub fn main() !void { var targets = std.array_list.Managed([]const u8).init(arena); var debug_log_scopes = std.array_list.Managed([]const u8).init(arena); - var thread_pool_options: std.Thread.Pool.Options = .{ .allocator = arena }; var install_prefix: ?[]const u8 = null; var dir_list = std.Build.DirList{}; @@ -415,17 +414,10 @@ pub fn main() !void { builder.reference_trace = null; } else if (mem.startsWith(u8, arg, "-j")) { const num = arg["-j".len..]; - const n_jobs = std.fmt.parseUnsigned(u32, num, 10) catch |err| { - std.debug.print("unable to parse jobs count '{s}': {s}", .{ - num, @errorName(err), - }); - process.exit(1); - }; - if (n_jobs < 1) { - std.debug.print("number of jobs must be at least 1\n", .{}); - process.exit(1); - } - thread_pool_options.n_jobs = n_jobs; + const n_jobs = std.fmt.parseUnsigned(u32, num, 10) catch |err| + fatal("unable to parse jobs count '{s}': {t}", .{ num, err }); + if (n_jobs < 1) fatal("number of jobs must be at least 1", .{}); + threaded.setThreadCapacity(n_jobs); } else if (mem.eql(u8, arg, "--")) { builder.args = argsRest(args, arg_idx); break; @@ -524,7 +516,6 @@ pub fn main() !void { .summary = summary orelse if (watch or webui_listen != null) .line else .failures, .ttyconf = ttyconf, .stderr = stderr, - .thread_pool = undefined, }; defer { run.memory_blocked_steps.deinit(gpa); @@ -553,16 +544,12 @@ pub fn main() !void { break :w try .init(); }; - try run.thread_pool.init(thread_pool_options); - defer run.thread_pool.deinit(); - const now = Io.Clock.Timestamp.now(io, .awake) catch |err| fatal("failed to collect timestamp: {t}", .{err}); run.web_server = if (webui_listen) |listen_address| ws: { if (builtin.single_threaded) unreachable; // `fatal` above break :ws .init(.{ .gpa = gpa, - .thread_pool = &run.thread_pool, .graph = &graph, .all_steps = run.step_stack.keys(), .ttyconf = run.ttyconf, @@ -681,7 +668,6 @@ const Run = struct { memory_blocked_steps: std.ArrayListUnmanaged(*Step), /// Allocated into `gpa`. step_stack: std.AutoArrayHashMapUnmanaged(*Step, void), - thread_pool: std.Thread.Pool, claimed_rss: usize, error_style: ErrorStyle, @@ -759,14 +745,13 @@ fn runStepNames( const gpa = run.gpa; const io = b.graph.io; const step_stack = &run.step_stack; - const thread_pool = &run.thread_pool; { const step_prog = parent_prog_node.start("steps", step_stack.count()); defer step_prog.end(); - var wait_group: std.Thread.WaitGroup = .{}; - defer wait_group.wait(); + var wait_group: Io.Group = .init; + defer wait_group.wait(io); // Here we spawn the initial set of tasks with a nice heuristic - // dependency order. 
Each worker when it finishes a step will then @@ -776,9 +761,7 @@ fn runStepNames( const step = steps_slice[steps_slice.len - i - 1]; if (step.state == .skipped_oom) continue; - thread_pool.spawnWg(&wait_group, workerMakeOneStep, .{ - &wait_group, b, step, step_prog, run, - }); + wait_group.async(io, workerMakeOneStep, .{ &wait_group, b, step, step_prog, run }); } } @@ -862,12 +845,12 @@ fn runStepNames( var f = std.Build.Fuzz.init( gpa, io, - thread_pool, step_stack.keys(), parent_prog_node, ttyconf, mode, - ) catch |err| fatal("failed to start fuzzer: {s}", .{@errorName(err)}); + ) catch |err| + fatal("failed to start fuzzer: {t}", .{err}); defer f.deinit(); f.start(); @@ -1324,13 +1307,13 @@ fn constructGraphAndCheckForDependencyLoop( } fn workerMakeOneStep( - wg: *std.Thread.WaitGroup, + wg: *Io.Group, b: *std.Build, s: *Step, prog_node: std.Progress.Node, run: *Run, ) void { - const thread_pool = &run.thread_pool; + const io = b.graph.io; // First, check the conditions for running this step. If they are not met, // then we return without doing the step, relying on another worker to @@ -1387,7 +1370,6 @@ fn workerMakeOneStep( const make_result = s.make(.{ .progress_node = sub_prog_node, - .thread_pool = thread_pool, .watch = run.watch, .web_server = if (run.web_server) |*ws| ws else null, .unit_test_timeout_ns = run.unit_test_timeout_ns, @@ -1423,9 +1405,7 @@ fn workerMakeOneStep( // Successful completion of a step, so we queue up its dependants as well. for (s.dependants.items) |dep| { - thread_pool.spawnWg(wg, workerMakeOneStep, .{ - wg, b, dep, prog_node, run, - }); + wg.async(io, workerMakeOneStep, .{ wg, b, dep, prog_node, run }); } } @@ -1448,9 +1428,7 @@ fn workerMakeOneStep( if (dep.max_rss <= remaining) { remaining -= dep.max_rss; - thread_pool.spawnWg(wg, workerMakeOneStep, .{ - wg, b, dep, prog_node, run, - }); + wg.async(io, workerMakeOneStep, .{ wg, b, dep, prog_node, run }); } else { run.memory_blocked_steps.items[i] = dep; i += 1; diff --git a/lib/std/Build/Fuzz.zig b/lib/std/Build/Fuzz.zig index d342628871..c286324aa5 100644 --- a/lib/std/Build/Fuzz.zig +++ b/lib/std/Build/Fuzz.zig @@ -21,10 +21,9 @@ mode: Mode, /// Allocated into `gpa`. run_steps: []const *Step.Run, -wait_group: std.Thread.WaitGroup, +wait_group: Io.Group, root_prog_node: std.Progress.Node, prog_node: std.Progress.Node, -thread_pool: *std.Thread.Pool, ttyconf: tty.Config, /// Protects `coverage_files`. 
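The hunks above replace the std.Thread.Pool.spawnWg / std.Thread.WaitGroup pairing with std.Io.Group. Below is a minimal sketch of that pattern under assumed names (hashOne, runAll, and the slice are hypothetical); only Group's .init, async, and wait(io) come from this change, and, as in the diff, async falls back to running the task inline when no extra concurrency is available.

const std = @import("std");
const Io = std.Io;

fn hashOne(results: []u32, index: usize) void {
    // Placeholder work; real callers pass their own worker and arguments,
    // as build_runner.zig does with workerMakeOneStep above.
    results[index] = @intCast(index * 2);
}

pub fn runAll(io: Io, results: []u32) void {
    var group: Io.Group = .init;
    defer group.wait(io); // joins every task spawned into the group
    for (results, 0..) |_, i| {
        group.async(io, hashOne, .{ results, i });
    }
}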
@@ -78,7 +77,6 @@ const CoverageMap = struct { pub fn init( gpa: Allocator, io: Io, - thread_pool: *std.Thread.Pool, all_steps: []const *Build.Step, root_prog_node: std.Progress.Node, ttyconf: tty.Config, @@ -89,15 +87,15 @@ pub fn init( defer steps.deinit(gpa); const rebuild_node = root_prog_node.start("Rebuilding Unit Tests", 0); defer rebuild_node.end(); - var rebuild_wg: std.Thread.WaitGroup = .{}; - defer rebuild_wg.wait(); + var rebuild_wg: Io.Group = .init; + defer rebuild_wg.wait(io); for (all_steps) |step| { const run = step.cast(Step.Run) orelse continue; if (run.producer == null) continue; if (run.fuzz_tests.items.len == 0) continue; try steps.append(gpa, run); - thread_pool.spawnWg(&rebuild_wg, rebuildTestsWorkerRun, .{ run, gpa, ttyconf, rebuild_node }); + rebuild_wg.async(io, rebuildTestsWorkerRun, .{ run, gpa, ttyconf, rebuild_node }); } if (steps.items.len == 0) fatal("no fuzz tests found", .{}); @@ -117,8 +115,7 @@ pub fn init( .io = io, .mode = mode, .run_steps = run_steps, - .wait_group = .{}, - .thread_pool = thread_pool, + .wait_group = .init, .ttyconf = ttyconf, .root_prog_node = root_prog_node, .prog_node = .none, @@ -131,29 +128,26 @@ pub fn init( } pub fn start(fuzz: *Fuzz) void { + const io = fuzz.io; fuzz.prog_node = fuzz.root_prog_node.start("Fuzzing", fuzz.run_steps.len); if (fuzz.mode == .forever) { // For polling messages and sending updates to subscribers. - fuzz.wait_group.start(); - _ = std.Thread.spawn(.{}, coverageRun, .{fuzz}) catch |err| { - fuzz.wait_group.finish(); - fatal("unable to spawn coverage thread: {s}", .{@errorName(err)}); - }; + fuzz.wait_group.concurrent(io, coverageRun, .{fuzz}) catch |err| + fatal("unable to spawn coverage thread: {t}", .{err}); } for (fuzz.run_steps) |run| { for (run.fuzz_tests.items) |unit_test_index| { assert(run.rebuilt_executable != null); - fuzz.thread_pool.spawnWg(&fuzz.wait_group, fuzzWorkerRun, .{ - fuzz, run, unit_test_index, - }); + fuzz.wait_group.async(io, fuzzWorkerRun, .{ fuzz, run, unit_test_index }); } } } pub fn deinit(fuzz: *Fuzz) void { - if (!fuzz.wait_group.isDone()) @panic("TODO: terminate the fuzzer processes"); + const io = fuzz.io; + fuzz.wait_group.cancel(io); fuzz.prog_node.end(); fuzz.gpa.free(fuzz.run_steps); } @@ -490,8 +484,8 @@ pub fn waitAndPrintReport(fuzz: *Fuzz) void { assert(fuzz.mode == .limit); const io = fuzz.io; - fuzz.wait_group.wait(); - fuzz.wait_group.reset(); + fuzz.wait_group.wait(io); + fuzz.wait_group = .init; std.debug.print("======= FUZZING REPORT =======\n", .{}); for (fuzz.msg_queue.items) |msg| { diff --git a/lib/std/Build/Step.zig b/lib/std/Build/Step.zig index aa922ff37b..40a568d597 100644 --- a/lib/std/Build/Step.zig +++ b/lib/std/Build/Step.zig @@ -110,7 +110,6 @@ pub const TestResults = struct { pub const MakeOptions = struct { progress_node: std.Progress.Node, - thread_pool: *std.Thread.Pool, watch: bool, web_server: switch (builtin.target.cpu.arch) { else => ?*Build.WebServer, diff --git a/lib/std/Build/Step/Run.zig b/lib/std/Build/Step/Run.zig index 314862e201..fb4f3e21f4 100644 --- a/lib/std/Build/Step/Run.zig +++ b/lib/std/Build/Step/Run.zig @@ -1122,7 +1122,6 @@ pub fn rerunInFuzzMode( const tmp_dir_path = "tmp" ++ fs.path.sep_str ++ std.fmt.hex(rand_int); try runCommand(run, argv_list.items, has_side_effects, tmp_dir_path, .{ .progress_node = prog_node, - .thread_pool = undefined, // not used by `runCommand` .watch = undefined, // not used by `runCommand` .web_server = null, // only needed for time reports .unit_test_timeout_ns = null, // don't time out 
fuzz tests for now diff --git a/lib/std/Build/WebServer.zig b/lib/std/Build/WebServer.zig index 50e304c950..a7032bc9f8 100644 --- a/lib/std/Build/WebServer.zig +++ b/lib/std/Build/WebServer.zig @@ -1,5 +1,4 @@ gpa: Allocator, -thread_pool: *std.Thread.Pool, graph: *const Build.Graph, all_steps: []const *Build.Step, listen_address: net.IpAddress, @@ -53,7 +52,6 @@ pub fn notifyUpdate(ws: *WebServer) void { pub const Options = struct { gpa: Allocator, - thread_pool: *std.Thread.Pool, graph: *const std.Build.Graph, all_steps: []const *Build.Step, ttyconf: Io.tty.Config, @@ -100,7 +98,6 @@ pub fn init(opts: Options) WebServer { return .{ .gpa = opts.gpa, - .thread_pool = opts.thread_pool, .graph = opts.graph, .all_steps = all_steps, .listen_address = opts.listen_address, @@ -235,7 +232,6 @@ pub fn finishBuild(ws: *WebServer, opts: struct { ws.fuzz = Fuzz.init( ws.gpa, ws.graph.io, - ws.thread_pool, ws.all_steps, ws.root_prog_node, ws.ttyconf, diff --git a/lib/std/Io.zig b/lib/std/Io.zig index d6b6fb7979..53e5fc7f18 100644 --- a/lib/std/Io.zig +++ b/lib/std/Io.zig @@ -653,6 +653,16 @@ pub const VTable = struct { context_alignment: std.mem.Alignment, start: *const fn (*Group, context: *const anyopaque) void, ) void, + groupConcurrent: *const fn ( + /// Corresponds to `Io.userdata`. + userdata: ?*anyopaque, + /// Owner of the spawned async task. + group: *Group, + /// Copied and then passed to `start`. + context: []const u8, + context_alignment: std.mem.Alignment, + start: *const fn (*Group, context: *const anyopaque) void, + ) ConcurrentError!void, groupWait: *const fn (?*anyopaque, *Group, token: *anyopaque) void, groupCancel: *const fn (?*anyopaque, *Group, token: *anyopaque) void, @@ -1037,6 +1047,32 @@ pub const Group = struct { io.vtable.groupAsync(io.userdata, g, @ptrCast((&args)[0..1]), .of(Args), TypeErased.start); } + pub fn concurrent( + g: *Group, + io: Io, + function: anytype, + args: std.meta.ArgsTuple(@TypeOf(function)), + ) ConcurrentError!void { + const Args = @TypeOf(args); + const TypeErased = struct { + fn start(group: *Group, context: *const anyopaque) void { + _ = group; + const args_casted: *const Args = @ptrCast(@alignCast(context)); + @call(.auto, function, args_casted.*); + } + }; + return io.vtable.groupConcurrent(io.userdata, g, @ptrCast((&args)[0..1]), .of(Args), TypeErased.start); + } + + pub fn eager( + g: *Group, + io: Io, + function: anytype, + args: std.meta.ArgsTuple(@TypeOf(function)), + ) void { + return Group.concurrent(g, io, function, args) catch @call(.auto, function, args); + } + /// Blocks until all tasks of the group finish. During this time, /// cancellation requests propagate to all members of the group. 
/// diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig index 168395d335..5d1c95efa1 100644 --- a/lib/std/Io/Threaded.zig +++ b/lib/std/Io/Threaded.zig @@ -13,6 +13,7 @@ const net = std.Io.net; const HostName = std.Io.net.HostName; const IpAddress = std.Io.net.IpAddress; const Allocator = std.mem.Allocator; +const Alignment = std.mem.Alignment; const assert = std.debug.assert; const posix = std.posix; @@ -24,7 +25,8 @@ run_queue: std.SinglyLinkedList = .{}, join_requested: bool = false, threads: std.ArrayListUnmanaged(std.Thread), stack_size: usize, -cpu_count: std.Thread.CpuCountError!usize, +thread_capacity: std.atomic.Value(ThreadCapacity), +thread_capacity_error: ?std.Thread.CpuCountError, concurrent_count: usize, wsa: if (is_windows) Wsa else struct {} = .{}, @@ -33,6 +35,21 @@ have_signal_handler: bool, old_sig_io: if (have_sig_io) posix.Sigaction else void, old_sig_pipe: if (have_sig_pipe) posix.Sigaction else void, +pub const ThreadCapacity = enum(usize) { + unknown = 0, + _, + + pub fn init(n: usize) ThreadCapacity { + assert(n != 0); + return @enumFromInt(n); + } + + pub fn get(tc: ThreadCapacity) ?usize { + if (tc == .unknown) return null; + return @intFromEnum(tc); + } +}; + threadlocal var current_closure: ?*Closure = null; const max_iovecs_len = 8; @@ -103,18 +120,21 @@ pub fn init( /// here. gpa: Allocator, ) Threaded { + const cpu_count = std.Thread.getCpuCount(); + var t: Threaded = .{ .allocator = gpa, .threads = .empty, .stack_size = std.Thread.SpawnConfig.default_stack_size, - .cpu_count = std.Thread.getCpuCount(), + .thread_capacity = .init(if (cpu_count) |n| .init(n) else |_| .unknown), + .thread_capacity_error = if (cpu_count) |_| null else |e| e, .concurrent_count = 0, .old_sig_io = undefined, .old_sig_pipe = undefined, .have_signal_handler = false, }; - if (t.cpu_count) |n| { + if (cpu_count) |n| { t.threads.ensureTotalCapacityPrecise(gpa, n - 1) catch {}; } else |_| {} @@ -144,7 +164,8 @@ pub const init_single_threaded: Threaded = .{ .allocator = .failing, .threads = .empty, .stack_size = std.Thread.SpawnConfig.default_stack_size, - .cpu_count = 1, + .thread_capacity = .init(.init(1)), + .thread_capacity_error = null, .concurrent_count = 0, .old_sig_io = undefined, .old_sig_pipe = undefined, @@ -165,6 +186,18 @@ pub fn deinit(t: *Threaded) void { t.* = undefined; } +pub fn setThreadCapacity(t: *Threaded, n: usize) void { + t.thread_capacity.store(.init(n), .monotonic); +} + +pub fn getThreadCapacity(t: *Threaded) ?usize { + return t.thread_capacity.load(.monotonic).get(); +} + +pub fn getCurrentThreadId() usize { + @panic("TODO"); +} + fn join(t: *Threaded) void { if (builtin.single_threaded) return; { @@ -208,6 +241,7 @@ pub fn io(t: *Threaded) Io { .select = select, .groupAsync = groupAsync, + .groupConcurrent = groupConcurrent, .groupWait = groupWait, .groupCancel = groupCancel, @@ -304,6 +338,7 @@ pub fn ioBasic(t: *Threaded) Io { .select = select, .groupAsync = groupAsync, + .groupConcurrent = groupConcurrent, .groupWait = groupWait, .groupCancel = groupCancel, @@ -387,7 +422,7 @@ const AsyncClosure = struct { func: *const fn (context: *anyopaque, result: *anyopaque) void, reset_event: ResetEvent, select_condition: ?*ResetEvent, - context_alignment: std.mem.Alignment, + context_alignment: Alignment, result_offset: usize, const done_reset_event: *ResetEvent = @ptrFromInt(@alignOf(ResetEvent)); @@ -443,9 +478,9 @@ const AsyncClosure = struct { fn async( userdata: ?*anyopaque, result: []u8, - result_alignment: std.mem.Alignment, + result_alignment: 
Alignment, context: []const u8, - context_alignment: std.mem.Alignment, + context_alignment: Alignment, start: *const fn (context: *const anyopaque, result: *anyopaque) void, ) ?*Io.AnyFuture { if (builtin.single_threaded) { @@ -453,7 +488,7 @@ fn async( return null; } const t: *Threaded = @ptrCast(@alignCast(userdata)); - const cpu_count = t.cpu_count catch { + const cpu_count = t.getThreadCapacity() orelse { return concurrent(userdata, result.len, result_alignment, context, context_alignment, start) catch { start(context.ptr, result.ptr); return null; @@ -521,15 +556,15 @@ fn async( fn concurrent( userdata: ?*anyopaque, result_len: usize, - result_alignment: std.mem.Alignment, + result_alignment: Alignment, context: []const u8, - context_alignment: std.mem.Alignment, + context_alignment: Alignment, start: *const fn (context: *const anyopaque, result: *anyopaque) void, ) Io.ConcurrentError!*Io.AnyFuture { if (builtin.single_threaded) return error.ConcurrencyUnavailable; const t: *Threaded = @ptrCast(@alignCast(userdata)); - const cpu_count = t.cpu_count catch 1; + const cpu_count = t.getThreadCapacity() orelse 1; const gpa = t.allocator; const context_offset = context_alignment.forward(@sizeOf(AsyncClosure)); const result_offset = result_alignment.forward(context_offset + context.len); @@ -587,7 +622,7 @@ const GroupClosure = struct { /// Points to sibling `GroupClosure`. Used for walking the group to cancel all. node: std.SinglyLinkedList.Node, func: *const fn (*Io.Group, context: *anyopaque) void, - context_alignment: std.mem.Alignment, + context_alignment: Alignment, context_len: usize, fn start(closure: *Closure) void { @@ -621,11 +656,11 @@ const GroupClosure = struct { gpa.free(base[0..contextEnd(gc.context_alignment, gc.context_len)]); } - fn contextOffset(context_alignment: std.mem.Alignment) usize { + fn contextOffset(context_alignment: Alignment) usize { return context_alignment.forward(@sizeOf(GroupClosure)); } - fn contextEnd(context_alignment: std.mem.Alignment, context_len: usize) usize { + fn contextEnd(context_alignment: Alignment, context_len: usize) usize { return contextOffset(context_alignment) + context_len; } @@ -642,12 +677,12 @@ fn groupAsync( userdata: ?*anyopaque, group: *Io.Group, context: []const u8, - context_alignment: std.mem.Alignment, + context_alignment: Alignment, start: *const fn (*Io.Group, context: *const anyopaque) void, ) void { if (builtin.single_threaded) return start(group, context.ptr); const t: *Threaded = @ptrCast(@alignCast(userdata)); - const cpu_count = t.cpu_count catch 1; + const cpu_count = t.getThreadCapacity() orelse 1; const gpa = t.allocator; const n = GroupClosure.contextEnd(context_alignment, context.len); const gc: *GroupClosure = @ptrCast(@alignCast(gpa.alignedAlloc(u8, .of(GroupClosure), n) catch { @@ -704,6 +739,73 @@ fn groupAsync( t.cond.signal(); } +fn groupConcurrent( + userdata: ?*anyopaque, + group: *Io.Group, + context: []const u8, + context_alignment: Alignment, + start: *const fn (*Io.Group, context: *const anyopaque) void, +) Io.ConcurrentError!void { + if (builtin.single_threaded) return error.ConcurrencyUnavailable; + const t: *Threaded = @ptrCast(@alignCast(userdata)); + const cpu_count = t.getThreadCapacity() orelse 1; + const gpa = t.allocator; + const n = GroupClosure.contextEnd(context_alignment, context.len); + const gc_bytes = gpa.alignedAlloc(u8, .of(GroupClosure), n) catch + return error.ConcurrencyUnavailable; + const gc: *GroupClosure = @ptrCast(@alignCast(gc_bytes)); + gc.* = .{ + .closure = .{ + 
.cancel_tid = .none, + .start = GroupClosure.start, + .is_concurrent = false, + }, + .t = t, + .group = group, + .node = undefined, + .func = start, + .context_alignment = context_alignment, + .context_len = context.len, + }; + @memcpy(gc.contextPointer()[0..context.len], context); + + t.mutex.lock(); + + // Append to the group linked list inside the mutex to make `Io.Group.concurrent` thread-safe. + gc.node = .{ .next = @ptrCast(@alignCast(group.token)) }; + group.token = &gc.node; + + t.concurrent_count += 1; + const thread_capacity = cpu_count - 1 + t.concurrent_count; + + t.threads.ensureTotalCapacityPrecise(gpa, thread_capacity) catch { + t.mutex.unlock(); + gc.free(gpa); + return error.ConcurrencyUnavailable; + }; + + t.run_queue.prepend(&gc.closure.node); + + if (t.threads.items.len < thread_capacity) { + const thread = std.Thread.spawn(.{ .stack_size = t.stack_size }, worker, .{t}) catch { + assert(t.run_queue.popFirst() == &gc.closure.node); + t.mutex.unlock(); + gc.free(gpa); + return error.ConcurrencyUnavailable; + }; + t.threads.appendAssumeCapacity(thread); + } + + // This needs to be done before unlocking the mutex to avoid a race with + // the associated task finishing. + const group_state: *std.atomic.Value(usize) = @ptrCast(&group.state); + const prev_state = group_state.fetchAdd(GroupClosure.sync_one_pending, .monotonic); + assert((prev_state / GroupClosure.sync_one_pending) < (std.math.maxInt(usize) / GroupClosure.sync_one_pending)); + + t.mutex.unlock(); + t.cond.signal(); +} + fn groupWait(userdata: ?*anyopaque, group: *Io.Group, token: *anyopaque) void { const t: *Threaded = @ptrCast(@alignCast(userdata)); const gpa = t.allocator; @@ -771,7 +873,7 @@ fn await( userdata: ?*anyopaque, any_future: *Io.AnyFuture, result: []u8, - result_alignment: std.mem.Alignment, + result_alignment: Alignment, ) void { _ = result_alignment; const t: *Threaded = @ptrCast(@alignCast(userdata)); @@ -783,7 +885,7 @@ fn cancel( userdata: ?*anyopaque, any_future: *Io.AnyFuture, result: []u8, - result_alignment: std.mem.Alignment, + result_alignment: Alignment, ) void { _ = result_alignment; const t: *Threaded = @ptrCast(@alignCast(userdata)); diff --git a/lib/std/Thread.zig b/lib/std/Thread.zig index f3abe1e6cf..3dccecf009 100644 --- a/lib/std/Thread.zig +++ b/lib/std/Thread.zig @@ -17,8 +17,6 @@ pub const Mutex = @import("Thread/Mutex.zig"); pub const Semaphore = @import("Thread/Semaphore.zig"); pub const Condition = @import("Thread/Condition.zig"); pub const RwLock = @import("Thread/RwLock.zig"); -pub const Pool = @import("Thread/Pool.zig"); -pub const WaitGroup = @import("Thread/WaitGroup.zig"); pub const use_pthreads = native_os != .windows and native_os != .wasi and builtin.link_libc; @@ -1756,7 +1754,6 @@ test { _ = Semaphore; _ = Condition; _ = RwLock; - _ = Pool; } fn testIncrementNotify(value: *usize, event: *ResetEvent) void { diff --git a/lib/std/Thread/Pool.zig b/lib/std/Thread/Pool.zig deleted file mode 100644 index e836665d70..0000000000 --- a/lib/std/Thread/Pool.zig +++ /dev/null @@ -1,326 +0,0 @@ -const std = @import("std"); -const builtin = @import("builtin"); -const Pool = @This(); -const WaitGroup = @import("WaitGroup.zig"); - -mutex: std.Thread.Mutex = .{}, -cond: std.Thread.Condition = .{}, -run_queue: std.SinglyLinkedList = .{}, -is_running: bool = true, -allocator: std.mem.Allocator, -threads: if (builtin.single_threaded) [0]std.Thread else []std.Thread, -ids: if (builtin.single_threaded) struct { - inline fn deinit(_: @This(), _: std.mem.Allocator) void {} - fn 
getIndex(_: @This(), _: std.Thread.Id) usize { - return 0; - } -} else std.AutoArrayHashMapUnmanaged(std.Thread.Id, void), - -const Runnable = struct { - runFn: RunProto, - node: std.SinglyLinkedList.Node = .{}, -}; - -const RunProto = *const fn (*Runnable, id: ?usize) void; - -pub const Options = struct { - allocator: std.mem.Allocator, - n_jobs: ?usize = null, - track_ids: bool = false, - stack_size: usize = std.Thread.SpawnConfig.default_stack_size, -}; - -pub fn init(pool: *Pool, options: Options) !void { - const allocator = options.allocator; - - pool.* = .{ - .allocator = allocator, - .threads = if (builtin.single_threaded) .{} else &.{}, - .ids = .{}, - }; - - if (builtin.single_threaded) { - return; - } - - const thread_count = options.n_jobs orelse @max(1, std.Thread.getCpuCount() catch 1); - if (options.track_ids) { - try pool.ids.ensureTotalCapacity(allocator, 1 + thread_count); - pool.ids.putAssumeCapacityNoClobber(std.Thread.getCurrentId(), {}); - } - - // kill and join any threads we spawned and free memory on error. - pool.threads = try allocator.alloc(std.Thread, thread_count); - var spawned: usize = 0; - errdefer pool.join(spawned); - - for (pool.threads) |*thread| { - thread.* = try std.Thread.spawn(.{ - .stack_size = options.stack_size, - .allocator = allocator, - }, worker, .{pool}); - spawned += 1; - } -} - -pub fn deinit(pool: *Pool) void { - pool.join(pool.threads.len); // kill and join all threads. - pool.ids.deinit(pool.allocator); - pool.* = undefined; -} - -fn join(pool: *Pool, spawned: usize) void { - if (builtin.single_threaded) { - return; - } - - { - pool.mutex.lock(); - defer pool.mutex.unlock(); - - // ensure future worker threads exit the dequeue loop - pool.is_running = false; - } - - // wake up any sleeping threads (this can be done outside the mutex) - // then wait for all the threads we know are spawned to complete. - pool.cond.broadcast(); - for (pool.threads[0..spawned]) |thread| { - thread.join(); - } - - pool.allocator.free(pool.threads); -} - -/// Runs `func` in the thread pool, calling `WaitGroup.start` beforehand, and -/// `WaitGroup.finish` after it returns. -/// -/// In the case that queuing the function call fails to allocate memory, or the -/// target is single-threaded, the function is called directly. -pub fn spawnWg(pool: *Pool, wait_group: *WaitGroup, comptime func: anytype, args: anytype) void { - wait_group.start(); - - if (builtin.single_threaded) { - @call(.auto, func, args); - wait_group.finish(); - return; - } - - const Args = @TypeOf(args); - const Closure = struct { - arguments: Args, - pool: *Pool, - runnable: Runnable = .{ .runFn = runFn }, - wait_group: *WaitGroup, - - fn runFn(runnable: *Runnable, _: ?usize) void { - const closure: *@This() = @alignCast(@fieldParentPtr("runnable", runnable)); - @call(.auto, func, closure.arguments); - closure.wait_group.finish(); - - // The thread pool's allocator is protected by the mutex. - const mutex = &closure.pool.mutex; - mutex.lock(); - defer mutex.unlock(); - - closure.pool.allocator.destroy(closure); - } - }; - - { - pool.mutex.lock(); - - const closure = pool.allocator.create(Closure) catch { - pool.mutex.unlock(); - @call(.auto, func, args); - wait_group.finish(); - return; - }; - closure.* = .{ - .arguments = args, - .pool = pool, - .wait_group = wait_group, - }; - - pool.run_queue.prepend(&closure.runnable.node); - pool.mutex.unlock(); - } - - // Notify waiting threads outside the lock to try and keep the critical section small. 
- pool.cond.signal(); -} - -/// Runs `func` in the thread pool, calling `WaitGroup.start` beforehand, and -/// `WaitGroup.finish` after it returns. -/// -/// The first argument passed to `func` is a dense `usize` thread id, the rest -/// of the arguments are passed from `args`. Requires the pool to have been -/// initialized with `.track_ids = true`. -/// -/// In the case that queuing the function call fails to allocate memory, or the -/// target is single-threaded, the function is called directly. -pub fn spawnWgId(pool: *Pool, wait_group: *WaitGroup, comptime func: anytype, args: anytype) void { - wait_group.start(); - - if (builtin.single_threaded) { - @call(.auto, func, .{0} ++ args); - wait_group.finish(); - return; - } - - const Args = @TypeOf(args); - const Closure = struct { - arguments: Args, - pool: *Pool, - runnable: Runnable = .{ .runFn = runFn }, - wait_group: *WaitGroup, - - fn runFn(runnable: *Runnable, id: ?usize) void { - const closure: *@This() = @alignCast(@fieldParentPtr("runnable", runnable)); - @call(.auto, func, .{id.?} ++ closure.arguments); - closure.wait_group.finish(); - - // The thread pool's allocator is protected by the mutex. - const mutex = &closure.pool.mutex; - mutex.lock(); - defer mutex.unlock(); - - closure.pool.allocator.destroy(closure); - } - }; - - { - pool.mutex.lock(); - - const closure = pool.allocator.create(Closure) catch { - const id: ?usize = pool.ids.getIndex(std.Thread.getCurrentId()); - pool.mutex.unlock(); - @call(.auto, func, .{id.?} ++ args); - wait_group.finish(); - return; - }; - closure.* = .{ - .arguments = args, - .pool = pool, - .wait_group = wait_group, - }; - - pool.run_queue.prepend(&closure.runnable.node); - pool.mutex.unlock(); - } - - // Notify waiting threads outside the lock to try and keep the critical section small. - pool.cond.signal(); -} - -pub fn spawn(pool: *Pool, comptime func: anytype, args: anytype) !void { - if (builtin.single_threaded) { - @call(.auto, func, args); - return; - } - - const Args = @TypeOf(args); - const Closure = struct { - arguments: Args, - pool: *Pool, - runnable: Runnable = .{ .runFn = runFn }, - - fn runFn(runnable: *Runnable, _: ?usize) void { - const closure: *@This() = @alignCast(@fieldParentPtr("runnable", runnable)); - @call(.auto, func, closure.arguments); - - // The thread pool's allocator is protected by the mutex. - const mutex = &closure.pool.mutex; - mutex.lock(); - defer mutex.unlock(); - - closure.pool.allocator.destroy(closure); - } - }; - - { - pool.mutex.lock(); - defer pool.mutex.unlock(); - - const closure = try pool.allocator.create(Closure); - closure.* = .{ - .arguments = args, - .pool = pool, - }; - - pool.run_queue.prepend(&closure.runnable.node); - } - - // Notify waiting threads outside the lock to try and keep the critical section small. 
- pool.cond.signal(); -} - -test spawn { - const TestFn = struct { - fn checkRun(completed: *bool) void { - completed.* = true; - } - }; - - var completed: bool = false; - - { - var pool: Pool = undefined; - try pool.init(.{ - .allocator = std.testing.allocator, - }); - defer pool.deinit(); - try pool.spawn(TestFn.checkRun, .{&completed}); - } - - try std.testing.expectEqual(true, completed); -} - -fn worker(pool: *Pool) void { - pool.mutex.lock(); - defer pool.mutex.unlock(); - - const id: ?usize = if (pool.ids.count() > 0) @intCast(pool.ids.count()) else null; - if (id) |_| pool.ids.putAssumeCapacityNoClobber(std.Thread.getCurrentId(), {}); - - while (true) { - while (pool.run_queue.popFirst()) |run_node| { - // Temporarily unlock the mutex in order to execute the run_node - pool.mutex.unlock(); - defer pool.mutex.lock(); - - const runnable: *Runnable = @fieldParentPtr("node", run_node); - runnable.runFn(runnable, id); - } - - // Stop executing instead of waiting if the thread pool is no longer running. - if (pool.is_running) { - pool.cond.wait(&pool.mutex); - } else { - break; - } - } -} - -pub fn waitAndWork(pool: *Pool, wait_group: *WaitGroup) void { - var id: ?usize = null; - - while (!wait_group.isDone()) { - pool.mutex.lock(); - if (pool.run_queue.popFirst()) |run_node| { - id = id orelse pool.ids.getIndex(std.Thread.getCurrentId()); - pool.mutex.unlock(); - const runnable: *Runnable = @fieldParentPtr("node", run_node); - runnable.runFn(runnable, id); - continue; - } - - pool.mutex.unlock(); - wait_group.wait(); - return; - } -} - -pub fn getIdCount(pool: *Pool) usize { - return @intCast(1 + pool.threads.len); -} diff --git a/lib/std/Thread/WaitGroup.zig b/lib/std/Thread/WaitGroup.zig deleted file mode 100644 index a5970b7d69..0000000000 --- a/lib/std/Thread/WaitGroup.zig +++ /dev/null @@ -1,83 +0,0 @@ -const builtin = @import("builtin"); -const std = @import("std"); -const assert = std.debug.assert; -const WaitGroup = @This(); - -const is_waiting: usize = 1 << 0; -const one_pending: usize = 1 << 1; - -state: std.atomic.Value(usize) = std.atomic.Value(usize).init(0), -event: std.Thread.ResetEvent = .unset, - -pub fn start(self: *WaitGroup) void { - return startStateless(&self.state); -} - -pub fn startStateless(state: *std.atomic.Value(usize)) void { - const prev_state = state.fetchAdd(one_pending, .monotonic); - assert((prev_state / one_pending) < (std.math.maxInt(usize) / one_pending)); -} - -pub fn startMany(self: *WaitGroup, n: usize) void { - const state = self.state.fetchAdd(one_pending * n, .monotonic); - assert((state / one_pending) < (std.math.maxInt(usize) / one_pending)); -} - -pub fn finish(self: *WaitGroup) void { - const state = self.state.fetchSub(one_pending, .acq_rel); - assert((state / one_pending) > 0); - - if (state == (one_pending | is_waiting)) { - self.event.set(); - } -} - -pub fn finishStateless(state: *std.atomic.Value(usize), event: *std.Thread.ResetEvent) void { - const prev_state = state.fetchSub(one_pending, .acq_rel); - assert((prev_state / one_pending) > 0); - if (prev_state == (one_pending | is_waiting)) event.set(); -} - -pub fn wait(wg: *WaitGroup) void { - return waitStateless(&wg.state, &wg.event); -} - -pub fn waitStateless(state: *std.atomic.Value(usize), event: *std.Thread.ResetEvent) void { - const prev_state = state.fetchAdd(is_waiting, .acquire); - assert(prev_state & is_waiting == 0); - if ((prev_state / one_pending) > 0) event.wait(); -} - -pub fn reset(self: *WaitGroup) void { - self.state.store(0, .monotonic); - self.event.reset(); -} 
- -pub fn isDone(wg: *WaitGroup) bool { - const state = wg.state.load(.acquire); - assert(state & is_waiting == 0); - - return (state / one_pending) == 0; -} - -// Spawns a new thread for the task. This is appropriate when the callee -// delegates all work. -pub fn spawnManager( - wg: *WaitGroup, - comptime func: anytype, - args: anytype, -) void { - if (builtin.single_threaded) { - @call(.auto, func, args); - return; - } - const Manager = struct { - fn run(wg_inner: *WaitGroup, args_inner: @TypeOf(args)) void { - defer wg_inner.finish(); - @call(.auto, func, args_inner); - } - }; - wg.start(); - const t = std.Thread.spawn(.{}, Manager.run, .{ wg, args }) catch return Manager.run(wg, args); - t.detach(); -} diff --git a/src/Compilation.zig b/src/Compilation.zig index 3670bc51b5..d07b4a0cbb 100644 --- a/src/Compilation.zig +++ b/src/Compilation.zig @@ -10,8 +10,6 @@ const Allocator = std.mem.Allocator; const assert = std.debug.assert; const log = std.log.scoped(.compilation); const Target = std.Target; -const ThreadPool = std.Thread.Pool; -const WaitGroup = std.Thread.WaitGroup; const ErrorBundle = std.zig.ErrorBundle; const fatal = std.process.fatal; @@ -197,7 +195,6 @@ libc_include_dir_list: []const []const u8, libc_framework_dir_list: []const []const u8, rc_includes: RcIncludes, mingw_unicode_entry_point: bool, -thread_pool: *ThreadPool, /// Populated when we build the libc++ static library. A Job to build this is placed in the queue /// and resolved before calling linker.flush(). @@ -252,11 +249,11 @@ mutex: if (builtin.single_threaded) struct { pub inline fn tryLock(_: @This()) void {} pub inline fn lock(_: @This()) void {} pub inline fn unlock(_: @This()) void {} -} else std.Thread.Mutex = .{}, +} else Io.Mutex = .init, test_filters: []const []const u8, -link_task_wait_group: WaitGroup = .{}, +link_task_wait_group: Io.Group = .init, link_prog_node: std.Progress.Node = .none, link_const_prog_node: std.Progress.Node = .none, link_synth_prog_node: std.Progress.Node = .none, @@ -1579,7 +1576,7 @@ pub const CacheMode = enum { pub const ParentWholeCache = struct { manifest: *Cache.Manifest, - mutex: *std.Thread.Mutex, + mutex: *Io.Mutex, prefix_map: [4]u8, }; @@ -1607,7 +1604,7 @@ const CacheUse = union(CacheMode) { lf_open_opts: link.File.OpenOptions, /// This is a pointer to a local variable inside `update`. cache_manifest: ?*Cache.Manifest, - cache_manifest_mutex: std.Thread.Mutex, + cache_manifest_mutex: Io.Mutex, /// This is non-`null` for most of the body of `update`. It is the temporary directory which /// we initially emit our artifacts to. After the main part of the update is done, it will /// be closed and moved to its final location, and this field set to `null`. @@ -1647,7 +1644,6 @@ const CacheUse = union(CacheMode) { pub const CreateOptions = struct { dirs: Directories, - thread_pool: *ThreadPool, self_exe_path: ?[]const u8 = null, /// Options that have been resolved by calling `resolveDefaults`. 
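Compilation.zig's shared state moves from std.Thread.Mutex to Io.Mutex in the hunks around this point: fields default to .init and critical sections use lockUncancelable(io) / unlock(io). A minimal sketch of that locking pattern with a hypothetical Shared struct; only the mutex field type, .init, and the two calls are taken from this diff.

const std = @import("std");
const Io = std.Io;

const Shared = struct {
    mutex: Io.Mutex = .init,
    counter: usize = 0,

    fn bump(s: *Shared, io: Io) void {
        s.mutex.lockUncancelable(io); // lock without participating in cancellation
        defer s.mutex.unlock(io);
        s.counter += 1;
    }
};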
@@ -2223,7 +2219,7 @@ pub fn create(gpa: Allocator, arena: Allocator, io: Io, diag: *CreateDiagnostic, .analysis_roots_buffer = undefined, .analysis_roots_len = 0, }; - try zcu.init(options.thread_pool.getIdCount()); + try zcu.init((@import("main.zig").threaded_io.getThreadCapacity() orelse 1) + 1); break :blk zcu; } else blk: { if (options.emit_h != .no) return diag.fail(.emit_h_without_zcu); @@ -2252,7 +2248,6 @@ pub fn create(gpa: Allocator, arena: Allocator, io: Io, diag: *CreateDiagnostic, .libc_framework_dir_list = libc_dirs.libc_framework_dir_list, .rc_includes = options.rc_includes, .mingw_unicode_entry_point = options.mingw_unicode_entry_point, - .thread_pool = options.thread_pool, .clang_passthrough_mode = options.clang_passthrough_mode, .clang_preprocessor_mode = options.clang_preprocessor_mode, .verbose_cc = options.verbose_cc, @@ -2478,7 +2473,7 @@ pub fn create(gpa: Allocator, arena: Allocator, io: Io, diag: *CreateDiagnostic, whole.* = .{ .lf_open_opts = lf_open_opts, .cache_manifest = null, - .cache_manifest_mutex = .{}, + .cache_manifest_mutex = .init, .tmp_artifact_directory = null, .lock = null, }; @@ -2874,8 +2869,10 @@ pub fn update(comp: *Compilation, main_progress_node: std.Progress.Node) UpdateE const tracy_trace = trace(@src()); defer tracy_trace.end(); - // This arena is scoped to this one update. const gpa = comp.gpa; + const io = comp.io; + + // This arena is scoped to this one update. var arena_allocator = std.heap.ArenaAllocator.init(gpa); defer arena_allocator.deinit(); const arena = arena_allocator.allocator(); @@ -2954,8 +2951,8 @@ pub fn update(comp: *Compilation, main_progress_node: std.Progress.Node) UpdateE // In this case the cache hit contains the full set of file system inputs. Nice! if (comp.file_system_inputs) |buf| try man.populateFileSystemInputs(buf); if (comp.parent_whole_cache) |pwc| { - pwc.mutex.lock(); - defer pwc.mutex.unlock(); + pwc.mutex.lockUncancelable(io); + defer pwc.mutex.unlock(io); try man.populateOtherManifest(pwc.manifest, pwc.prefix_map); } @@ -3153,8 +3150,8 @@ pub fn update(comp: *Compilation, main_progress_node: std.Progress.Node) UpdateE .whole => |whole| { if (comp.file_system_inputs) |buf| try man.populateFileSystemInputs(buf); if (comp.parent_whole_cache) |pwc| { - pwc.mutex.lock(); - defer pwc.mutex.unlock(); + pwc.mutex.lockUncancelable(io); + defer pwc.mutex.unlock(io); try man.populateOtherManifest(pwc.manifest, pwc.prefix_map); } @@ -3321,6 +3318,7 @@ fn flush( arena: Allocator, tid: Zcu.PerThread.Id, ) Allocator.Error!void { + const io = comp.io; if (comp.zcu) |zcu| { if (zcu.llvm_object) |llvm_object| { const pt: Zcu.PerThread = .activate(zcu, tid); @@ -3333,8 +3331,8 @@ fn flush( var timer = comp.startTimer(); defer if (timer.finish()) |ns| { - comp.mutex.lock(); - defer comp.mutex.unlock(); + comp.mutex.lockUncancelable(io); + defer comp.mutex.unlock(io); comp.time_report.?.stats.real_ns_llvm_emit = ns; }; @@ -3378,8 +3376,8 @@ fn flush( if (comp.bin_file) |lf| { var timer = comp.startTimer(); defer if (timer.finish()) |ns| { - comp.mutex.lock(); - defer comp.mutex.unlock(); + comp.mutex.lockUncancelable(io); + defer comp.mutex.unlock(io); comp.time_report.?.stats.real_ns_link_flush = ns; }; // This is needed before reading the error flags. 
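The `-j` flag in the build runner now feeds std.Io.Threaded.setThreadCapacity, and consumers such as Compilation.create read it back with getThreadCapacity, treating null as an unknown CPU count. A rough usage sketch under those assumptions; the allocator parameter and the constant 4 are placeholders, not values from this diff.

const std = @import("std");

pub fn example(gpa: std.mem.Allocator) void {
    var threaded: std.Io.Threaded = .init(gpa);
    defer threaded.deinit();

    threaded.setThreadCapacity(4); // e.g. from a parsed `-j4` flag
    // getThreadCapacity returns null when the CPU count could not be detected
    // and no capacity was set explicitly.
    const per_thread_contexts = (threaded.getThreadCapacity() orelse 1) + 1;
    std.debug.print("allocating {d} per-thread contexts\n", .{per_thread_contexts});
}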
@@ -4587,10 +4585,8 @@ pub fn unableToLoadZcuFile( }); } -fn performAllTheWork( - comp: *Compilation, - main_progress_node: std.Progress.Node, -) JobError!void { +fn performAllTheWork(comp: *Compilation, main_progress_node: std.Progress.Node) JobError!void { + const io = comp.io; // Regardless of errors, `comp.zcu` needs to update its generation number. defer if (comp.zcu) |zcu| { zcu.generation += 1; @@ -4602,8 +4598,8 @@ fn performAllTheWork( defer commit_timer: { const t = &(decl_work_timer orelse break :commit_timer); const ns = t.finish() orelse break :commit_timer; - comp.mutex.lock(); - defer comp.mutex.unlock(); + comp.mutex.lockUncancelable(io); + defer comp.mutex.unlock(io); comp.time_report.?.stats.real_ns_decls = ns; } @@ -4612,11 +4608,11 @@ fn performAllTheWork( // (at least for now) single-threaded main work queue. However, C object compilation // only needs to be finished by the end of this function. - var work_queue_wait_group: WaitGroup = .{}; - defer work_queue_wait_group.wait(); + var work_queue_wait_group: Io.Group = .init; + defer work_queue_wait_group.wait(io); - comp.link_task_wait_group.reset(); - defer comp.link_task_wait_group.wait(); + comp.link_task_wait_group = .init; + defer comp.link_task_wait_group.wait(io); // Already-queued prelink tasks comp.link_prog_node.increaseEstimatedTotalItems(comp.link_task_queue.queued_prelink.items.len); @@ -4624,8 +4620,8 @@ fn performAllTheWork( if (comp.emit_docs != null) { dev.check(.docs_emit); - comp.thread_pool.spawnWg(&work_queue_wait_group, workerDocsCopy, .{comp}); - work_queue_wait_group.spawnManager(workerDocsWasm, .{ comp, main_progress_node }); + work_queue_wait_group.async(io, workerDocsCopy, .{comp}); + work_queue_wait_group.eager(io, workerDocsWasm, .{ comp, main_progress_node }); } // In case it failed last time, try again. 
`clearMiscFailures` was already @@ -4636,7 +4632,7 @@ fn performAllTheWork( // // https://github.com/llvm/llvm-project/issues/43698#issuecomment-2542660611 comp.link_task_queue.startPrelinkItem(); - comp.link_task_wait_group.spawnManager(buildRt, .{ + comp.link_task_wait_group.eager(io, buildRt, .{ comp, "compiler_rt.zig", "compiler_rt", @@ -4654,7 +4650,7 @@ fn performAllTheWork( if (comp.queued_jobs.compiler_rt_obj and comp.compiler_rt_obj == null) { comp.link_task_queue.startPrelinkItem(); - comp.link_task_wait_group.spawnManager(buildRt, .{ + comp.link_task_wait_group.eager(io, buildRt, .{ comp, "compiler_rt.zig", "compiler_rt", @@ -4673,7 +4669,7 @@ fn performAllTheWork( // hack for stage2_x86_64 + coff if (comp.queued_jobs.compiler_rt_dyn_lib and comp.compiler_rt_dyn_lib == null) { comp.link_task_queue.startPrelinkItem(); - comp.link_task_wait_group.spawnManager(buildRt, .{ + comp.link_task_wait_group.eager(io, buildRt, .{ comp, "compiler_rt.zig", "compiler_rt", @@ -4691,7 +4687,7 @@ fn performAllTheWork( if (comp.queued_jobs.fuzzer_lib and comp.fuzzer_lib == null) { comp.link_task_queue.startPrelinkItem(); - comp.link_task_wait_group.spawnManager(buildRt, .{ + comp.link_task_wait_group.eager(io, buildRt, .{ comp, "fuzzer.zig", "fuzzer", @@ -4706,7 +4702,7 @@ fn performAllTheWork( if (comp.queued_jobs.ubsan_rt_lib and comp.ubsan_rt_lib == null) { comp.link_task_queue.startPrelinkItem(); - comp.link_task_wait_group.spawnManager(buildRt, .{ + comp.link_task_wait_group.eager(io, buildRt, .{ comp, "ubsan_rt.zig", "ubsan_rt", @@ -4723,7 +4719,7 @@ fn performAllTheWork( if (comp.queued_jobs.ubsan_rt_obj and comp.ubsan_rt_obj == null) { comp.link_task_queue.startPrelinkItem(); - comp.link_task_wait_group.spawnManager(buildRt, .{ + comp.link_task_wait_group.eager(io, buildRt, .{ comp, "ubsan_rt.zig", "ubsan_rt", @@ -4740,49 +4736,49 @@ fn performAllTheWork( if (comp.queued_jobs.glibc_shared_objects) { comp.link_task_queue.startPrelinkItem(); - comp.link_task_wait_group.spawnManager(buildGlibcSharedObjects, .{ comp, main_progress_node }); + comp.link_task_wait_group.eager(io, buildGlibcSharedObjects, .{ comp, main_progress_node }); } if (comp.queued_jobs.freebsd_shared_objects) { comp.link_task_queue.startPrelinkItem(); - comp.link_task_wait_group.spawnManager(buildFreeBSDSharedObjects, .{ comp, main_progress_node }); + comp.link_task_wait_group.eager(io, buildFreeBSDSharedObjects, .{ comp, main_progress_node }); } if (comp.queued_jobs.netbsd_shared_objects) { comp.link_task_queue.startPrelinkItem(); - comp.link_task_wait_group.spawnManager(buildNetBSDSharedObjects, .{ comp, main_progress_node }); + comp.link_task_wait_group.eager(io, buildNetBSDSharedObjects, .{ comp, main_progress_node }); } if (comp.queued_jobs.libunwind) { comp.link_task_queue.startPrelinkItem(); - comp.link_task_wait_group.spawnManager(buildLibUnwind, .{ comp, main_progress_node }); + comp.link_task_wait_group.eager(io, buildLibUnwind, .{ comp, main_progress_node }); } if (comp.queued_jobs.libcxx) { comp.link_task_queue.startPrelinkItem(); - comp.link_task_wait_group.spawnManager(buildLibCxx, .{ comp, main_progress_node }); + comp.link_task_wait_group.eager(io, buildLibCxx, .{ comp, main_progress_node }); } if (comp.queued_jobs.libcxxabi) { comp.link_task_queue.startPrelinkItem(); - comp.link_task_wait_group.spawnManager(buildLibCxxAbi, .{ comp, main_progress_node }); + comp.link_task_wait_group.eager(io, buildLibCxxAbi, .{ comp, main_progress_node }); } if (comp.queued_jobs.libtsan) { 
comp.link_task_queue.startPrelinkItem(); - comp.link_task_wait_group.spawnManager(buildLibTsan, .{ comp, main_progress_node }); + comp.link_task_wait_group.eager(io, buildLibTsan, .{ comp, main_progress_node }); } if (comp.queued_jobs.zigc_lib and comp.zigc_static_lib == null) { comp.link_task_queue.startPrelinkItem(); - comp.link_task_wait_group.spawnManager(buildLibZigC, .{ comp, main_progress_node }); + comp.link_task_wait_group.eager(io, buildLibZigC, .{ comp, main_progress_node }); } for (0..@typeInfo(musl.CrtFile).@"enum".fields.len) |i| { if (comp.queued_jobs.musl_crt_file[i]) { const tag: musl.CrtFile = @enumFromInt(i); comp.link_task_queue.startPrelinkItem(); - comp.link_task_wait_group.spawnManager(buildMuslCrtFile, .{ comp, tag, main_progress_node }); + comp.link_task_wait_group.eager(io, buildMuslCrtFile, .{ comp, tag, main_progress_node }); } } @@ -4790,7 +4786,7 @@ fn performAllTheWork( if (comp.queued_jobs.glibc_crt_file[i]) { const tag: glibc.CrtFile = @enumFromInt(i); comp.link_task_queue.startPrelinkItem(); - comp.link_task_wait_group.spawnManager(buildGlibcCrtFile, .{ comp, tag, main_progress_node }); + comp.link_task_wait_group.eager(io, buildGlibcCrtFile, .{ comp, tag, main_progress_node }); } } @@ -4798,7 +4794,7 @@ fn performAllTheWork( if (comp.queued_jobs.freebsd_crt_file[i]) { const tag: freebsd.CrtFile = @enumFromInt(i); comp.link_task_queue.startPrelinkItem(); - comp.link_task_wait_group.spawnManager(buildFreeBSDCrtFile, .{ comp, tag, main_progress_node }); + comp.link_task_wait_group.eager(io, buildFreeBSDCrtFile, .{ comp, tag, main_progress_node }); } } @@ -4806,7 +4802,7 @@ fn performAllTheWork( if (comp.queued_jobs.netbsd_crt_file[i]) { const tag: netbsd.CrtFile = @enumFromInt(i); comp.link_task_queue.startPrelinkItem(); - comp.link_task_wait_group.spawnManager(buildNetBSDCrtFile, .{ comp, tag, main_progress_node }); + comp.link_task_wait_group.eager(io, buildNetBSDCrtFile, .{ comp, tag, main_progress_node }); } } @@ -4814,7 +4810,7 @@ fn performAllTheWork( if (comp.queued_jobs.wasi_libc_crt_file[i]) { const tag: wasi_libc.CrtFile = @enumFromInt(i); comp.link_task_queue.startPrelinkItem(); - comp.link_task_wait_group.spawnManager(buildWasiLibcCrtFile, .{ comp, tag, main_progress_node }); + comp.link_task_wait_group.eager(io, buildWasiLibcCrtFile, .{ comp, tag, main_progress_node }); } } @@ -4822,7 +4818,7 @@ fn performAllTheWork( if (comp.queued_jobs.mingw_crt_file[i]) { const tag: mingw.CrtFile = @enumFromInt(i); comp.link_task_queue.startPrelinkItem(); - comp.link_task_wait_group.spawnManager(buildMingwCrtFile, .{ comp, tag, main_progress_node }); + comp.link_task_wait_group.eager(io, buildMingwCrtFile, .{ comp, tag, main_progress_node }); } } @@ -4835,13 +4831,13 @@ fn performAllTheWork( var timer = comp.startTimer(); defer if (timer.finish()) |ns| { - comp.mutex.lock(); - defer comp.mutex.unlock(); + comp.mutex.lockUncancelable(io); + defer comp.mutex.unlock(io); comp.time_report.?.stats.real_ns_files = ns; }; - var astgen_wait_group: WaitGroup = .{}; - defer astgen_wait_group.wait(); + var astgen_wait_group: Io.Group = .init; + defer astgen_wait_group.wait(io); if (comp.zcu) |zcu| { const gpa = zcu.gpa; @@ -4865,7 +4861,7 @@ fn performAllTheWork( // sure the file contents are still correct on disk, since it can improve the // debugging experience better. That job only needs `file`, so we can kick it // off right now. 
- comp.thread_pool.spawnWg(&astgen_wait_group, workerUpdateBuiltinFile, .{ comp, file }); + astgen_wait_group.async(io, workerUpdateBuiltinFile, .{ comp, file }); continue; } astgen_work_items.appendAssumeCapacity(.{ @@ -4876,7 +4872,7 @@ fn performAllTheWork( // Now that we're not going to touch `zcu.import_table` again, we can spawn `workerUpdateFile` jobs. for (astgen_work_items.items(.file_index), astgen_work_items.items(.file)) |file_index, file| { - comp.thread_pool.spawnWgId(&astgen_wait_group, workerUpdateFile, .{ + astgen_wait_group.async(io, workerUpdateFile, .{ comp, file, file_index, zir_prog_node, &astgen_wait_group, }); } @@ -4886,22 +4882,20 @@ fn performAllTheWork( // `@embedFile` can't trigger analysis of a new `@embedFile`! for (0.., zcu.embed_table.keys()) |ef_index_usize, ef| { const ef_index: Zcu.EmbedFile.Index = @enumFromInt(ef_index_usize); - comp.thread_pool.spawnWgId(&astgen_wait_group, workerUpdateEmbedFile, .{ - comp, ef_index, ef, - }); + astgen_wait_group.async(io, workerUpdateEmbedFile, .{ comp, ef_index, ef }); } } while (comp.c_object_work_queue.popFront()) |c_object| { comp.link_task_queue.startPrelinkItem(); - comp.thread_pool.spawnWg(&comp.link_task_wait_group, workerUpdateCObject, .{ + comp.link_task_wait_group.async(io, workerUpdateCObject, .{ comp, c_object, main_progress_node, }); } while (comp.win32_resource_work_queue.popFront()) |win32_resource| { comp.link_task_queue.startPrelinkItem(); - comp.thread_pool.spawnWg(&comp.link_task_wait_group, workerUpdateWin32Resource, .{ + comp.link_task_wait_group.async(io, workerUpdateWin32Resource, .{ comp, win32_resource, main_progress_node, }); } @@ -4935,8 +4929,8 @@ fn performAllTheWork( defer gpa.free(path); const result = res: { - whole.cache_manifest_mutex.lock(); - defer whole.cache_manifest_mutex.unlock(); + whole.cache_manifest_mutex.lockUncancelable(io); + defer whole.cache_manifest_mutex.unlock(io); if (file.source) |source| { break :res man.addFilePostContents(path, source, file.stat); } else { @@ -5008,8 +5002,8 @@ fn performAllTheWork( if (!comp.separateCodegenThreadOk()) { // Waits until all input files have been parsed. - comp.link_task_wait_group.wait(); - comp.link_task_wait_group.reset(); + comp.link_task_wait_group.wait(io); + comp.link_task_wait_group = .init; std.log.scoped(.link).debug("finished waiting for link_task_wait_group", .{}); } @@ -5056,6 +5050,8 @@ pub fn queueJobs(comp: *Compilation, jobs: []const Job) !void { } fn processOneJob(tid: usize, comp: *Compilation, job: Job) JobError!void { + const io = comp.io; + switch (job) { .codegen_func => |func| { const zcu = comp.zcu.?; @@ -5087,7 +5083,7 @@ fn processOneJob(tid: usize, comp: *Compilation, job: Job) JobError!void { const air_bytes: u32 = @intCast(air.instructions.len * 5 + air.extra.items.len * 4); if (comp.separateCodegenThreadOk()) { // `workerZcuCodegen` takes ownership of `air`. 
- comp.thread_pool.spawnWgId(&comp.link_task_wait_group, workerZcuCodegen, .{ comp, func.func, air, shared_mir }); + comp.link_task_wait_group.async(io, workerZcuCodegen, .{ comp, func.func, air, shared_mir }); comp.dispatchZcuLinkTask(tid, .{ .link_func = .{ .func = func.func, .mir = shared_mir, @@ -5463,7 +5459,6 @@ fn workerDocsWasmFallible(comp: *Compilation, prog_node: std.Progress.Node) SubU .entry = .disabled, .cache_mode = .whole, .root_name = root_name, - .thread_pool = comp.thread_pool, .libc_installation = comp.libc_installation, .emit_bin = .yes_cache, .verbose_cc = comp.verbose_cc, @@ -5517,13 +5512,15 @@ fn workerDocsWasmFallible(comp: *Compilation, prog_node: std.Progress.Node) SubU } fn workerUpdateFile( - tid: usize, comp: *Compilation, file: *Zcu.File, file_index: Zcu.File.Index, prog_node: std.Progress.Node, - wg: *WaitGroup, + wg: *Io.Group, ) void { + const io = comp.io; + const tid: usize = std.Io.Threaded.getCurrentThreadId(); + const child_prog_node = prog_node.start(fs.path.basename(file.path.sub_path), 0); defer child_prog_node.end(); @@ -5532,8 +5529,8 @@ fn workerUpdateFile( pt.updateFile(file_index, file) catch |err| { pt.reportRetryableFileError(file_index, "unable to load '{s}': {s}", .{ fs.path.basename(file.path.sub_path), @errorName(err) }) catch |oom| switch (oom) { error.OutOfMemory => { - comp.mutex.lock(); - defer comp.mutex.unlock(); + comp.mutex.lockUncancelable(io); + defer comp.mutex.unlock(io); comp.setAllocFailure(); }, }; @@ -5563,14 +5560,14 @@ fn workerUpdateFile( if (pt.discoverImport(file.path, import_path)) |res| switch (res) { .module, .existing_file => {}, .new_file => |new| { - comp.thread_pool.spawnWgId(wg, workerUpdateFile, .{ + wg.async(io, workerUpdateFile, .{ comp, new.file, new.index, prog_node, wg, }); }, } else |err| switch (err) { error.OutOfMemory => { - comp.mutex.lock(); - defer comp.mutex.unlock(); + comp.mutex.lockUncancelable(io); + defer comp.mutex.unlock(io); comp.setAllocFailure(); }, } @@ -5586,17 +5583,20 @@ fn workerUpdateBuiltinFile(comp: *Compilation, file: *Zcu.File) void { ); } -fn workerUpdateEmbedFile(tid: usize, comp: *Compilation, ef_index: Zcu.EmbedFile.Index, ef: *Zcu.EmbedFile) void { +fn workerUpdateEmbedFile(comp: *Compilation, ef_index: Zcu.EmbedFile.Index, ef: *Zcu.EmbedFile) void { + const tid: usize = std.Io.Threaded.getCurrentThreadId(); + const io = comp.io; comp.detectEmbedFileUpdate(@enumFromInt(tid), ef_index, ef) catch |err| switch (err) { error.OutOfMemory => { - comp.mutex.lock(); - defer comp.mutex.unlock(); + comp.mutex.lockUncancelable(io); + defer comp.mutex.unlock(io); comp.setAllocFailure(); }, }; } fn detectEmbedFileUpdate(comp: *Compilation, tid: Zcu.PerThread.Id, ef_index: Zcu.EmbedFile.Index, ef: *Zcu.EmbedFile) !void { + const io = comp.io; const zcu = comp.zcu.?; const pt: Zcu.PerThread = .activate(zcu, tid); defer pt.deactivate(); @@ -5609,8 +5609,8 @@ fn detectEmbedFileUpdate(comp: *Compilation, tid: Zcu.PerThread.Id, ef_index: Zc if (ef.val != .none and ef.val == old_val) return; // success, value unchanged if (ef.val == .none and old_val == .none and ef.err == old_err) return; // failure, error unchanged - comp.mutex.lock(); - defer comp.mutex.unlock(); + comp.mutex.lockUncancelable(io); + defer comp.mutex.unlock(io); try zcu.markDependeeOutdated(.not_marked_po, .{ .embed_file = ef_index }); } @@ -5753,8 +5753,8 @@ pub fn translateC( switch (comp.cache_use) { .whole => |whole| if (whole.cache_manifest) |whole_cache_manifest| { - whole.cache_manifest_mutex.lock(); - defer 
whole.cache_manifest_mutex.unlock(); + whole.cache_manifest_mutex.lockUncancelable(io); + defer whole.cache_manifest_mutex.unlock(io); try whole_cache_manifest.addDepFilePost(cache_tmp_dir, dep_basename); }, .incremental, .none => {}, @@ -5892,12 +5892,12 @@ pub const RtOptions = struct { }; fn workerZcuCodegen( - tid: usize, comp: *Compilation, func_index: InternPool.Index, orig_air: Air, out: *link.ZcuTask.LinkFunc.SharedMir, ) void { + const tid: usize = std.Io.Threaded.getCurrentThreadId(); var air = orig_air; // We own `air` now, so we are responsbile for freeing it. defer air.deinit(comp.gpa); @@ -6119,6 +6119,7 @@ fn reportRetryableWin32ResourceError( win32_resource: *Win32Resource, err: anyerror, ) error{OutOfMemory}!void { + const io = comp.io; win32_resource.status = .failure_retryable; var bundle: ErrorBundle.Wip = undefined; @@ -6140,8 +6141,8 @@ fn reportRetryableWin32ResourceError( }); const finished_bundle = try bundle.toOwnedBundle(""); { - comp.mutex.lock(); - defer comp.mutex.unlock(); + comp.mutex.lockUncancelable(io); + defer comp.mutex.unlock(io); try comp.failed_win32_resources.putNoClobber(comp.gpa, win32_resource, finished_bundle); } } @@ -6166,8 +6167,8 @@ fn updateCObject(comp: *Compilation, c_object: *CObject, c_obj_prog_node: std.Pr if (c_object.clearStatus(gpa)) { // There was previous failure. - comp.mutex.lock(); - defer comp.mutex.unlock(); + comp.mutex.lockUncancelable(io); + defer comp.mutex.unlock(io); // If the failure was OOM, there will not be an entry here, so we do // not assert discard. _ = comp.failed_c_objects.swapRemove(c_object); @@ -6459,6 +6460,7 @@ fn updateCObject(comp: *Compilation, c_object: *CObject, c_obj_prog_node: std.Pr } fn updateWin32Resource(comp: *Compilation, win32_resource: *Win32Resource, win32_resource_prog_node: std.Progress.Node) !void { + const io = comp.io; if (!std.process.can_spawn) { return comp.failWin32Resource(win32_resource, "{s} does not support spawning a child process", .{@tagName(builtin.os.tag)}); } @@ -6483,8 +6485,8 @@ fn updateWin32Resource(comp: *Compilation, win32_resource: *Win32Resource, win32 if (win32_resource.clearStatus(comp.gpa)) { // There was previous failure. - comp.mutex.lock(); - defer comp.mutex.unlock(); + comp.mutex.lockUncancelable(io); + defer comp.mutex.unlock(io); // If the failure was OOM, there will not be an entry here, so we do // not assert discard. 
_ = comp.failed_win32_resources.swapRemove(win32_resource); @@ -6658,8 +6660,8 @@ fn updateWin32Resource(comp: *Compilation, win32_resource: *Win32Resource, win32 try man.addFilePost(dep_file_path); switch (comp.cache_use) { .whole => |whole| if (whole.cache_manifest) |whole_cache_manifest| { - whole.cache_manifest_mutex.lock(); - defer whole.cache_manifest_mutex.unlock(); + whole.cache_manifest_mutex.lockUncancelable(io); + defer whole.cache_manifest_mutex.unlock(io); try whole_cache_manifest.addFilePost(dep_file_path); }, .incremental, .none => {}, @@ -7375,10 +7377,11 @@ fn failCObjWithOwnedDiagBundle( diag_bundle: *CObject.Diag.Bundle, ) SemaError { @branchHint(.cold); + const io = comp.io; assert(diag_bundle.diags.len > 0); { - comp.mutex.lock(); - defer comp.mutex.unlock(); + comp.mutex.lockUncancelable(io); + defer comp.mutex.unlock(io); { errdefer diag_bundle.destroy(comp.gpa); try comp.failed_c_objects.ensureUnusedCapacity(comp.gpa, 1); @@ -7418,9 +7421,10 @@ fn failWin32ResourceWithOwnedBundle( err_bundle: ErrorBundle, ) SemaError { @branchHint(.cold); + const io = comp.io; { - comp.mutex.lock(); - defer comp.mutex.unlock(); + comp.mutex.lockUncancelable(io); + defer comp.mutex.unlock(io); try comp.failed_win32_resources.putNoClobber(comp.gpa, win32_resource, err_bundle); } win32_resource.status = .failure; @@ -7744,8 +7748,9 @@ pub fn lockAndSetMiscFailure( comptime format: []const u8, args: anytype, ) void { - comp.mutex.lock(); - defer comp.mutex.unlock(); + const io = comp.io; + comp.mutex.lockUncancelable(io); + defer comp.mutex.unlock(io); return setMiscFailure(comp, tag, format, args); } @@ -7775,6 +7780,7 @@ pub fn updateSubCompilation( misc_task: MiscTask, prog_node: std.Progress.Node, ) SubUpdateError!void { + const io = parent_comp.io; { const sub_node = prog_node.start(@tagName(misc_task), 0); defer sub_node.end(); @@ -7789,8 +7795,8 @@ pub fn updateSubCompilation( defer errors.deinit(gpa); if (errors.errorMessageCount() > 0) { - parent_comp.mutex.lock(); - defer parent_comp.mutex.unlock(); + parent_comp.mutex.lockUncancelable(io); + defer parent_comp.mutex.unlock(io); try parent_comp.misc_failures.ensureUnusedCapacity(gpa, 1); parent_comp.misc_failures.putAssumeCapacityNoClobber(misc_task, .{ .msg = try std.fmt.allocPrint(gpa, "sub-compilation of {t} failed", .{misc_task}), @@ -7898,7 +7904,6 @@ fn buildOutputFromZig( .config = config, .root_mod = root_mod, .root_name = root_name, - .thread_pool = comp.thread_pool, .libc_installation = comp.libc_installation, .emit_bin = .yes_cache, .function_sections = true, @@ -8034,7 +8039,6 @@ pub fn build_crt_file( .config = config, .root_mod = root_mod, .root_name = root_name, - .thread_pool = comp.thread_pool, .libc_installation = comp.libc_installation, .emit_bin = .yes_cache, .function_sections = options.function_sections orelse false, @@ -8066,8 +8070,8 @@ pub fn build_crt_file( comp.queuePrelinkTaskMode(crt_file.full_object_path, &config); { - comp.mutex.lock(); - defer comp.mutex.unlock(); + comp.mutex.lockUncancelable(io); + defer comp.mutex.unlock(io); try comp.crt_files.ensureUnusedCapacity(gpa, 1); comp.crt_files.putAssumeCapacityNoClobber(basename, crt_file); } diff --git a/src/Package/Fetch.zig b/src/Package/Fetch.zig index 46be6fa069..3b0852ccd2 100644 --- a/src/Package/Fetch.zig +++ b/src/Package/Fetch.zig @@ -38,15 +38,12 @@ const assert = std.debug.assert; const ascii = std.ascii; const Allocator = std.mem.Allocator; const Cache = std.Build.Cache; -const ThreadPool = std.Thread.Pool; -const WaitGroup = 
std.Thread.WaitGroup; const git = @import("Fetch/git.zig"); const Package = @import("../Package.zig"); const Manifest = Package.Manifest; const ErrorBundle = std.zig.ErrorBundle; arena: std.heap.ArenaAllocator, -io: Io, location: Location, location_tok: std.zig.Ast.TokenIndex, hash_tok: std.zig.Ast.OptionalTokenIndex, @@ -104,6 +101,7 @@ pub const LazyStatus = enum { /// Contains shared state among all `Fetch` tasks. pub const JobQueue = struct { + io: Io, mutex: std.Thread.Mutex = .{}, /// It's an array hash map so that it can be sorted before rendering the /// dependencies.zig source file. @@ -115,8 +113,7 @@ pub const JobQueue = struct { all_fetches: std.ArrayListUnmanaged(*Fetch) = .empty, http_client: *std.http.Client, - thread_pool: *ThreadPool, - wait_group: WaitGroup = .{}, + wait_group: Io.Group = .init, global_cache: Cache.Directory, /// If true then, no fetching occurs, and: /// * The `global_cache` directory is assumed to be the direct parent @@ -326,7 +323,7 @@ pub const RunError = error{ }; pub fn run(f: *Fetch) RunError!void { - const io = f.io; + const io = f.job_queue.io; const eb = &f.error_bundle; const arena = f.arena.allocator(); const gpa = f.arena.child_allocator; @@ -488,7 +485,7 @@ fn runResource( resource: *Resource, remote_hash: ?Package.Hash, ) RunError!void { - const io = f.io; + const io = f.job_queue.io; defer resource.deinit(io); const arena = f.arena.allocator(); const eb = &f.error_bundle; @@ -702,7 +699,7 @@ fn loadManifest(f: *Fetch, pkg_root: Cache.Path) RunError!void { } fn queueJobsForDeps(f: *Fetch) RunError!void { - const io = f.io; + const io = f.job_queue.io; assert(f.job_queue.recursive); // If the package does not have a build.zig.zon file then there are no dependencies. @@ -792,7 +789,6 @@ fn queueJobsForDeps(f: *Fetch) RunError!void { f.job_queue.all_fetches.appendAssumeCapacity(new_fetch); } new_fetch.* = .{ - .io = io, .arena = std.heap.ArenaAllocator.init(gpa), .location = location, .location_tok = dep.location_tok, @@ -831,10 +827,8 @@ fn queueJobsForDeps(f: *Fetch) RunError!void { }; // Now it's time to give tasks to the thread pool. - const thread_pool = f.job_queue.thread_pool; - for (new_fetches, prog_names) |*new_fetch, prog_name| { - thread_pool.spawnWg(&f.job_queue.wait_group, workerRun, .{ new_fetch, prog_name }); + f.job_queue.wait_group.async(io, workerRun, .{ new_fetch, prog_name }); } } @@ -992,7 +986,7 @@ const FileType = enum { const init_resource_buffer_size = git.Packet.max_data_length; fn initResource(f: *Fetch, uri: std.Uri, resource: *Resource, reader_buffer: []u8) RunError!void { - const io = f.io; + const io = f.job_queue.io; const arena = f.arena.allocator(); const eb = &f.error_bundle; @@ -1286,7 +1280,7 @@ fn unzip(f: *Fetch, out_dir: fs.Dir, reader: *Io.Reader) error{ ReadFailed, OutO // must be processed back to front and they could be too large to // load into memory. - const io = f.io; + const io = f.job_queue.io; const cache_root = f.job_queue.global_cache; const prefix = "tmp/"; const suffix = ".zip"; @@ -1348,7 +1342,7 @@ fn unzip(f: *Fetch, out_dir: fs.Dir, reader: *Io.Reader) error{ ReadFailed, OutO } fn unpackGitPack(f: *Fetch, out_dir: fs.Dir, resource: *Resource.Git) anyerror!UnpackResult { - const io = f.io; + const io = f.job_queue.io; const arena = f.arena.allocator(); // TODO don't try to get a gpa from an arena. 
expose this dependency higher up // because the backing of arena could be page allocator @@ -1486,11 +1480,11 @@ const ComputedHash = struct { /// hashed* and must not be present on the file system when calling this /// function. fn computeHash(f: *Fetch, pkg_path: Cache.Path, filter: Filter) RunError!ComputedHash { + const io = f.job_queue.io; // All the path name strings need to be in memory for sorting. const arena = f.arena.allocator(); const gpa = f.arena.child_allocator; const eb = &f.error_bundle; - const thread_pool = f.job_queue.thread_pool; const root_dir = pkg_path.root_dir.handle; // Collect all files, recursively, then sort. @@ -1514,15 +1508,15 @@ fn computeHash(f: *Fetch, pkg_path: Cache.Path, filter: Filter) RunError!Compute { // The final hash will be a hash of each file hashed independently. This // allows hashing in parallel. - var wait_group: WaitGroup = .{}; - // `computeHash` is called from a worker thread so there must not be + var wait_group: Io.Group = .init; + // TODO `computeHash` is called from a worker thread so there must not be // any waiting without working or a deadlock could occur. - defer thread_pool.waitAndWork(&wait_group); + defer wait_group.wait(io); while (walker.next() catch |err| { try eb.addRootErrorMessage(.{ .msg = try eb.printString( - "unable to walk temporary directory '{f}': {s}", - .{ pkg_path, @errorName(err) }, + "unable to walk temporary directory '{f}': {t}", + .{ pkg_path, err }, ) }); return error.FetchFailed; }) |entry| { @@ -1542,7 +1536,7 @@ fn computeHash(f: *Fetch, pkg_path: Cache.Path, filter: Filter) RunError!Compute .fs_path = fs_path, .failure = undefined, // to be populated by the worker }; - thread_pool.spawnWg(&wait_group, workerDeleteFile, .{ root_dir, deleted_file }); + wait_group.async(io, workerDeleteFile, .{ root_dir, deleted_file }); try deleted_files.append(deleted_file); continue; } @@ -1570,7 +1564,7 @@ fn computeHash(f: *Fetch, pkg_path: Cache.Path, filter: Filter) RunError!Compute .failure = undefined, // to be populated by the worker .size = undefined, // to be populated by the worker }; - thread_pool.spawnWg(&wait_group, workerHashFile, .{ root_dir, hashed_file }); + wait_group.async(io, workerHashFile, .{ root_dir, hashed_file }); try all_files.append(hashed_file); } } @@ -2241,7 +2235,6 @@ fn saveEmbedFile(comptime tarball_name: []const u8, dir: fs.Dir) !void { // Builds Fetch with required dependencies, clears dependencies on deinit(). 
const TestFetchBuilder = struct { - thread_pool: ThreadPool, http_client: std.http.Client, global_cache_directory: Cache.Directory, job_queue: Fetch.JobQueue, @@ -2256,13 +2249,11 @@ const TestFetchBuilder = struct { ) !*Fetch { const cache_dir = try cache_parent_dir.makeOpenPath("zig-global-cache", .{}); - try self.thread_pool.init(.{ .allocator = allocator }); self.http_client = .{ .allocator = allocator, .io = io }; self.global_cache_directory = .{ .handle = cache_dir, .path = null }; self.job_queue = .{ .http_client = &self.http_client, - .thread_pool = &self.thread_pool, .global_cache = self.global_cache_directory, .recursive = false, .read_only = false, @@ -2309,7 +2300,6 @@ const TestFetchBuilder = struct { self.fetch.prog_node.end(); self.global_cache_directory.handle.close(); self.http_client.deinit(); - self.thread_pool.deinit(); } fn packageDir(self: *TestFetchBuilder) !fs.Dir { diff --git a/src/Zcu.zig b/src/Zcu.zig index ed1ae0eece..a2d29e8d60 100644 --- a/src/Zcu.zig +++ b/src/Zcu.zig @@ -4566,9 +4566,10 @@ pub fn codegenFail( /// Takes ownership of `msg`, even on OOM. pub fn codegenFailMsg(zcu: *Zcu, nav_index: InternPool.Nav.Index, msg: *ErrorMsg) CodegenFailError { const gpa = zcu.gpa; + const io = zcu.comp.io; { - zcu.comp.mutex.lock(); - defer zcu.comp.mutex.unlock(); + zcu.comp.mutex.lockUncancelable(io); + defer zcu.comp.mutex.unlock(io); errdefer msg.deinit(gpa); try zcu.failed_codegen.putNoClobber(gpa, nav_index, msg); } @@ -4577,8 +4578,9 @@ pub fn codegenFailMsg(zcu: *Zcu, nav_index: InternPool.Nav.Index, msg: *ErrorMsg /// Asserts that `zcu.failed_codegen` contains the key `nav`, with the necessary lock held. pub fn assertCodegenFailed(zcu: *Zcu, nav: InternPool.Nav.Index) void { - zcu.comp.mutex.lock(); - defer zcu.comp.mutex.unlock(); + const io = zcu.comp.io; + zcu.comp.mutex.lockUncancelable(io); + defer zcu.comp.mutex.unlock(io); assert(zcu.failed_codegen.contains(nav)); } @@ -4729,6 +4731,7 @@ const TrackedUnitSema = struct { analysis_timer_decl: ?InternPool.TrackedInst.Index, pub fn end(tus: TrackedUnitSema, zcu: *Zcu) void { const comp = zcu.comp; + const io = comp.io; if (tus.old_name) |old_name| { zcu.sema_prog_node.completeOne(); // we're just renaming, but it's effectively completion zcu.cur_sema_prog_node.setName(&old_name); @@ -4739,8 +4742,8 @@ const TrackedUnitSema = struct { report_time: { const sema_ns = zcu.cur_analysis_timer.?.finish() orelse break :report_time; const zir_decl = tus.analysis_timer_decl orelse break :report_time; - comp.mutex.lock(); - defer comp.mutex.unlock(); + comp.mutex.lockUncancelable(io); + defer comp.mutex.unlock(io); comp.time_report.?.stats.cpu_ns_sema += sema_ns; const gop = comp.time_report.?.decl_sema_info.getOrPut(comp.gpa, zir_decl) catch |err| switch (err) { error.OutOfMemory => { diff --git a/src/Zcu/PerThread.zig b/src/Zcu/PerThread.zig index 474ccc710d..bb1d9f9a99 100644 --- a/src/Zcu/PerThread.zig +++ b/src/Zcu/PerThread.zig @@ -267,8 +267,8 @@ pub fn updateFile( // Any potential AST errors are converted to ZIR errors when we run AstGen/ZonGen. 
file.tree = try Ast.parse(gpa, source, file.getMode()); if (timer.finish()) |ns_parse| { - comp.mutex.lock(); - defer comp.mutex.unlock(); + comp.mutex.lockUncancelable(io); + defer comp.mutex.unlock(io); comp.time_report.?.stats.cpu_ns_parse += ns_parse; } @@ -293,8 +293,8 @@ pub fn updateFile( }, } if (timer.finish()) |ns_astgen| { - comp.mutex.lock(); - defer comp.mutex.unlock(); + comp.mutex.lockUncancelable(io); + defer comp.mutex.unlock(io); comp.time_report.?.stats.cpu_ns_astgen += ns_astgen; } @@ -313,8 +313,8 @@ pub fn updateFile( switch (file.getMode()) { .zig => { if (file.zir.?.hasCompileErrors()) { - comp.mutex.lock(); - defer comp.mutex.unlock(); + comp.mutex.lockUncancelable(io); + defer comp.mutex.unlock(io); try zcu.failed_files.putNoClobber(gpa, file_index, null); } if (file.zir.?.loweringFailed()) { @@ -326,8 +326,8 @@ pub fn updateFile( .zon => { if (file.zoir.?.hasCompileErrors()) { file.status = .astgen_failure; - comp.mutex.lock(); - defer comp.mutex.unlock(); + comp.mutex.lockUncancelable(io); + defer comp.mutex.unlock(io); try zcu.failed_files.putNoClobber(gpa, file_index, null); } else { file.status = .success; @@ -1910,6 +1910,7 @@ pub fn discoverImport( } { const zcu = pt.zcu; const gpa = zcu.gpa; + const io = zcu.comp.io; if (!mem.endsWith(u8, import_string, ".zig") and !mem.endsWith(u8, import_string, ".zon")) { return .module; @@ -1919,8 +1920,8 @@ pub fn discoverImport( errdefer new_path.deinit(gpa); // We're about to do a GOP on `import_table`, so we need the mutex. - zcu.comp.mutex.lock(); - defer zcu.comp.mutex.unlock(); + zcu.comp.mutex.lockUncancelable(io); + defer zcu.comp.mutex.unlock(io); const gop = try zcu.import_table.getOrPutAdapted(gpa, new_path, Zcu.ImportTableAdapter{ .zcu = zcu }); errdefer _ = zcu.import_table.pop(); @@ -2502,12 +2503,10 @@ fn updateEmbedFileInner( } /// Assumes that `path` is allocated into `gpa`. Takes ownership of `path` on success. -fn newEmbedFile( - pt: Zcu.PerThread, - path: Compilation.Path, -) !*Zcu.EmbedFile { +fn newEmbedFile(pt: Zcu.PerThread, path: Compilation.Path) !*Zcu.EmbedFile { const zcu = pt.zcu; const comp = zcu.comp; + const io = comp.io; const gpa = zcu.gpa; const ip = &zcu.intern_pool; @@ -2541,8 +2540,8 @@ fn newEmbedFile( const path_str = try path.toAbsolute(comp.dirs, gpa); defer gpa.free(path_str); - whole.cache_manifest_mutex.lock(); - defer whole.cache_manifest_mutex.unlock(); + whole.cache_manifest_mutex.lockUncancelable(io); + defer whole.cache_manifest_mutex.unlock(io); man.addFilePostContents(path_str, contents, new_file.stat) catch |err| switch (err) { error.Unexpected => unreachable, @@ -3049,6 +3048,8 @@ pub fn getErrorValueFromSlice(pt: Zcu.PerThread, name: []const u8) Allocator.Err /// Removes any entry from `Zcu.failed_files` associated with `file`. Acquires `Compilation.mutex` as needed. /// `file.zir` must be unchanged from the last update, as it is used to determine if there is such an entry. 
fn lockAndClearFileCompileError(pt: Zcu.PerThread, file_index: Zcu.File.Index, file: *Zcu.File) void { + const io = pt.zcu.comp.io; + const maybe_has_error = switch (file.status) { .never_loaded => false, .retryable_failure => true, @@ -3070,8 +3071,8 @@ fn lockAndClearFileCompileError(pt: Zcu.PerThread, file_index: Zcu.File.Index, f return; } - pt.zcu.comp.mutex.lock(); - defer pt.zcu.comp.mutex.unlock(); + pt.zcu.comp.mutex.lockUncancelable(io); + defer pt.zcu.comp.mutex.unlock(io); if (pt.zcu.failed_files.fetchSwapRemove(file_index)) |kv| { assert(maybe_has_error); // the runtime safety case above if (kv.value) |msg| pt.zcu.gpa.free(msg); // delete previous error message @@ -3400,6 +3401,7 @@ pub fn reportRetryableFileError( ) error{OutOfMemory}!void { const zcu = pt.zcu; const gpa = zcu.gpa; + const io = zcu.comp.io; const file = zcu.fileByIndex(file_index); @@ -3409,8 +3411,8 @@ pub fn reportRetryableFileError( errdefer gpa.free(msg); const old_msg: ?[]u8 = old_msg: { - zcu.comp.mutex.lock(); - defer zcu.comp.mutex.unlock(); + zcu.comp.mutex.lockUncancelable(io); + defer zcu.comp.mutex.unlock(io); const gop = try zcu.failed_files.getOrPut(gpa, file_index); const old: ?[]u8 = if (gop.found_existing) old: { @@ -4391,6 +4393,7 @@ pub fn addDependency(pt: Zcu.PerThread, unit: AnalUnit, dependee: InternPool.Dep /// codegen thread, depending on whether the backend supports `Zcu.Feature.separate_thread`. pub fn runCodegen(pt: Zcu.PerThread, func_index: InternPool.Index, air: *Air, out: *@import("../link.zig").ZcuTask.LinkFunc.SharedMir) void { const zcu = pt.zcu; + const io = zcu.comp.io; crash_report.CodegenFunc.start(zcu, func_index); defer crash_report.CodegenFunc.stop(func_index); @@ -4422,8 +4425,8 @@ pub fn runCodegen(pt: Zcu.PerThread, func_index: InternPool.Index, air: *Air, ou const ip = &zcu.intern_pool; const nav = ip.indexToKey(func_index).func.owner_nav; const zir_decl = ip.getNav(nav).srcInst(ip); - zcu.comp.mutex.lock(); - defer zcu.comp.mutex.unlock(); + zcu.comp.mutex.lockUncancelable(io); + defer zcu.comp.mutex.unlock(io); const tr = &zcu.comp.time_report.?; tr.stats.cpu_ns_codegen += ns_codegen; const gop = tr.decl_codegen_ns.getOrPut(zcu.gpa, zir_decl) catch |err| switch (err) { diff --git a/src/libs/mingw.zig b/src/libs/mingw.zig index 1773c321e1..115199e73a 100644 --- a/src/libs/mingw.zig +++ b/src/libs/mingw.zig @@ -281,8 +281,8 @@ pub fn buildImportLib(comp: *Compilation, lib_name: []const u8) !void { const sub_path = try std.fs.path.join(gpa, &.{ "o", &digest, final_lib_basename }); errdefer gpa.free(sub_path); - comp.mutex.lock(); - defer comp.mutex.unlock(); + comp.mutex.lockUncancelable(io); + defer comp.mutex.unlock(io); try comp.crt_files.ensureUnusedCapacity(gpa, 1); comp.crt_files.putAssumeCapacityNoClobber(final_lib_basename, .{ .full_object_path = .{ @@ -388,8 +388,8 @@ pub fn buildImportLib(comp: *Compilation, lib_name: []const u8) !void { log.warn("failed to write cache manifest for DLL import {s}.lib: {s}", .{ lib_name, @errorName(err) }); }; - comp.mutex.lock(); - defer comp.mutex.unlock(); + comp.mutex.lockUncancelable(io); + defer comp.mutex.unlock(io); try comp.crt_files.putNoClobber(gpa, final_lib_basename, .{ .full_object_path = .{ .root_dir = comp.dirs.global_cache, diff --git a/src/link.zig b/src/link.zig index 7cf8e5c1a6..112ae9fdb6 100644 --- a/src/link.zig +++ b/src/link.zig @@ -1344,6 +1344,7 @@ pub const ZcuTask = union(enum) { }; pub fn doPrelinkTask(comp: *Compilation, task: PrelinkTask) void { + const io = comp.io; const diags = 
&comp.link_diags; const base = comp.bin_file orelse { comp.link_prog_node.completeOne(); @@ -1352,8 +1353,8 @@ pub fn doPrelinkTask(comp: *Compilation, task: PrelinkTask) void { var timer = comp.startTimer(); defer if (timer.finish()) |ns| { - comp.mutex.lock(); - defer comp.mutex.unlock(); + comp.mutex.lockUncancelable(io); + defer comp.mutex.unlock(io); comp.time_report.?.stats.cpu_ns_link += ns; }; @@ -1478,6 +1479,7 @@ pub fn doPrelinkTask(comp: *Compilation, task: PrelinkTask) void { } } pub fn doZcuTask(comp: *Compilation, tid: usize, task: ZcuTask) void { + const io = comp.io; const diags = &comp.link_diags; const zcu = comp.zcu.?; const ip = &zcu.intern_pool; @@ -1567,8 +1569,8 @@ pub fn doZcuTask(comp: *Compilation, tid: usize, task: ZcuTask) void { .link_nav => |nav| ip.getNav(nav).srcInst(ip), .link_func => |f| ip.getNav(ip.indexToKey(f.func).func.owner_nav).srcInst(ip), }; - comp.mutex.lock(); - defer comp.mutex.unlock(); + comp.mutex.lockUncancelable(io); + defer comp.mutex.unlock(io); const tr = &zcu.comp.time_report.?; tr.stats.cpu_ns_link += ns_link; if (zir_decl) |inst| { diff --git a/src/link/Elf.zig b/src/link/Elf.zig index 2537332c28..8798fb009b 100644 --- a/src/link/Elf.zig +++ b/src/link/Elf.zig @@ -713,6 +713,7 @@ pub fn allocateChunk(self: *Elf, args: struct { pub fn loadInput(self: *Elf, input: link.Input) !void { const comp = self.base.comp; const gpa = comp.gpa; + const io = comp.io; const diags = &comp.link_diags; const target = self.getTarget(); const debug_fmt_strip = comp.config.debug_format == .strip; @@ -720,8 +721,8 @@ pub fn loadInput(self: *Elf, input: link.Input) !void { const is_static_lib = self.base.isStaticLib(); if (comp.verbose_link) { - comp.mutex.lock(); // protect comp.arena - defer comp.mutex.unlock(); + comp.mutex.lockUncancelable(io); // protect comp.arena + defer comp.mutex.unlock(io); const argv = &self.dump_argv_list; switch (input) { diff --git a/src/link/MachO/CodeSignature.zig b/src/link/MachO/CodeSignature.zig index 00b01a6eab..566524b2cc 100644 --- a/src/link/MachO/CodeSignature.zig +++ b/src/link/MachO/CodeSignature.zig @@ -267,6 +267,7 @@ pub fn writeAdhocSignature( defer tracy.end(); const allocator = macho_file.base.comp.gpa; + const io = macho_file.base.comp.io; var header: macho.SuperBlob = .{ .magic = macho.CSMAGIC_EMBEDDED_SIGNATURE, @@ -289,7 +290,7 @@ pub fn writeAdhocSignature( self.code_directory.inner.nCodeSlots = total_pages; // Calculate hash for each page (in file) and write it to the buffer - var hasher = Hasher(Sha256){ .allocator = allocator, .thread_pool = macho_file.base.comp.thread_pool }; + var hasher: Hasher(Sha256) = .{ .allocator = allocator, .io = io }; try hasher.hash(opts.file, self.code_directory.code_slots.items, .{ .chunk_size = self.page_size, .max_file_size = opts.file_size, diff --git a/src/link/MachO/hasher.zig b/src/link/MachO/hasher.zig index f10a2fe8cf..6931aeed39 100644 --- a/src/link/MachO/hasher.zig +++ b/src/link/MachO/hasher.zig @@ -3,7 +3,7 @@ pub fn ParallelHasher(comptime Hasher: type) type { return struct { allocator: Allocator, - thread_pool: *ThreadPool, + io: Io, pub fn hash(self: Self, file: fs.File, out: [][hash_size]u8, opts: struct { chunk_size: u64 = 0x4000, @@ -12,7 +12,8 @@ pub fn ParallelHasher(comptime Hasher: type) type { const tracy = trace(@src()); defer tracy.end(); - var wg: WaitGroup = .{}; + const io = self.io; + const gpa = self.allocator; const file_size = blk: { const file_size = opts.max_file_size orelse try file.getEndPos(); @@ -20,15 +21,15 @@ pub fn 
ParallelHasher(comptime Hasher: type) type { }; const chunk_size = std.math.cast(usize, opts.chunk_size) orelse return error.Overflow; - const buffer = try self.allocator.alloc(u8, chunk_size * out.len); - defer self.allocator.free(buffer); + const buffer = try gpa.alloc(u8, chunk_size * out.len); + defer gpa.free(buffer); - const results = try self.allocator.alloc(fs.File.PReadError!usize, out.len); - defer self.allocator.free(results); + const results = try gpa.alloc(fs.File.PReadError!usize, out.len); + defer gpa.free(results); { - wg.reset(); - defer wg.wait(); + var wg: Io.Group = .init; + defer wg.wait(io); for (out, results, 0..) |*out_buf, *result, i| { const fstart = i * chunk_size; @@ -36,7 +37,8 @@ pub fn ParallelHasher(comptime Hasher: type) type { file_size - fstart else chunk_size; - self.thread_pool.spawnWg(&wg, worker, .{ + + wg.async(io, worker, .{ file, fstart, buffer[fstart..][0..fsize], @@ -65,12 +67,11 @@ pub fn ParallelHasher(comptime Hasher: type) type { }; } +const std = @import("std"); +const Io = std.Io; const assert = std.debug.assert; const fs = std.fs; const mem = std.mem; -const std = @import("std"); -const trace = @import("../../tracy.zig").trace; - const Allocator = mem.Allocator; -const ThreadPool = std.Thread.Pool; -const WaitGroup = std.Thread.WaitGroup; + +const trace = @import("../../tracy.zig").trace; diff --git a/src/link/MachO/relocatable.zig b/src/link/MachO/relocatable.zig index 09807a2845..d2a6c2a3ab 100644 --- a/src/link/MachO/relocatable.zig +++ b/src/link/MachO/relocatable.zig @@ -773,7 +773,6 @@ fn writeHeader(macho_file: *MachO, ncmds: usize, sizeofcmds: usize) !void { const std = @import("std"); const Path = std.Build.Cache.Path; -const WaitGroup = std.Thread.WaitGroup; const assert = std.debug.assert; const log = std.log.scoped(.link); const macho = std.macho; diff --git a/src/link/MachO/uuid.zig b/src/link/MachO/uuid.zig index 565ae80b22..ed3babe344 100644 --- a/src/link/MachO/uuid.zig +++ b/src/link/MachO/uuid.zig @@ -15,7 +15,7 @@ pub fn calcUuid(comp: *const Compilation, file: fs.File, file_size: u64, out: *[ const hashes = try comp.gpa.alloc([Md5.digest_length]u8, actual_num_chunks); defer comp.gpa.free(hashes); - var hasher = Hasher(Md5){ .allocator = comp.gpa, .thread_pool = comp.thread_pool }; + var hasher: Hasher(Md5) = .{ .allocator = comp.gpa, .io = comp.io }; try hasher.hash(file, hashes, .{ .chunk_size = chunk_size, .max_file_size = file_size, @@ -46,4 +46,3 @@ const trace = @import("../../tracy.zig").trace; const Compilation = @import("../../Compilation.zig"); const Md5 = std.crypto.hash.Md5; const Hasher = @import("hasher.zig").ParallelHasher; -const ThreadPool = std.Thread.Pool; diff --git a/src/link/Queue.zig b/src/link/Queue.zig index 742b4664f1..0cd090af73 100644 --- a/src/link/Queue.zig +++ b/src/link/Queue.zig @@ -102,6 +102,7 @@ pub fn deinit(q: *Queue, comp: *Compilation) void { /// This is expected to be called exactly once, after which the caller must not directly access /// `queued_prelink` any longer. This will spawn the link thread if necessary. pub fn start(q: *Queue, comp: *Compilation) void { + const io = comp.io; assert(q.state == .finished); assert(q.queued_zcu.items.len == 0); // Reset this to 1. 
We can't init it to 1 in `empty`, because it would fall to 0 on successive @@ -109,7 +110,7 @@ pub fn start(q: *Queue, comp: *Compilation) void { q.prelink_wait_count = 1; if (q.queued_prelink.items.len != 0) { q.state = .running; - comp.thread_pool.spawnWgId(&comp.link_task_wait_group, flushTaskQueue, .{ q, comp }); + comp.link_task_wait_group.async(io, flushTaskQueue, .{ q, comp }); } } @@ -124,6 +125,7 @@ pub fn startPrelinkItem(q: *Queue) void { /// indicates that we have finished calling `startPrelinkItem`, so once all pending items finish, /// we are ready to move on to ZCU tasks. pub fn finishPrelinkItem(q: *Queue, comp: *Compilation) void { + const io = comp.io; { q.mutex.lock(); defer q.mutex.unlock(); @@ -140,12 +142,13 @@ pub fn finishPrelinkItem(q: *Queue, comp: *Compilation) void { // that `link.File.prelink()` is called. q.state = .running; } - comp.thread_pool.spawnWgId(&comp.link_task_wait_group, flushTaskQueue, .{ q, comp }); + comp.link_task_wait_group.async(io, flushTaskQueue, .{ q, comp }); } /// Called by codegen workers after they have populated a `ZcuTask.LinkFunc.SharedMir`. If the link /// thread was waiting for this MIR, it can resume. pub fn mirReady(q: *Queue, comp: *Compilation, func_index: InternPool.Index, mir: *ZcuTask.LinkFunc.SharedMir) void { + const io = comp.io; // We would like to assert that `mir` is not pending, but that would race with a worker thread // potentially freeing it. { @@ -159,12 +162,13 @@ pub fn mirReady(q: *Queue, comp: *Compilation, func_index: InternPool.Index, mir q.state = .running; } assert(mir.status.load(.acquire) != .pending); - comp.thread_pool.spawnWgId(&comp.link_task_wait_group, flushTaskQueue, .{ q, comp }); + comp.link_task_wait_group.async(io, flushTaskQueue, .{ q, comp }); } /// Enqueues all prelink tasks in `tasks`. Asserts that they were expected, i.e. that /// `prelink_wait_count` is not yet 0. Also asserts that `tasks.len` is not 0. pub fn enqueuePrelink(q: *Queue, comp: *Compilation, tasks: []const PrelinkTask) Allocator.Error!void { + const io = comp.io; { q.mutex.lock(); defer q.mutex.unlock(); @@ -178,10 +182,11 @@ pub fn enqueuePrelink(q: *Queue, comp: *Compilation, tasks: []const PrelinkTask) // Restart the linker thread, because it was waiting for a task q.state = .running; } - comp.thread_pool.spawnWgId(&comp.link_task_wait_group, flushTaskQueue, .{ q, comp }); + comp.link_task_wait_group.async(io, flushTaskQueue, .{ q, comp }); } pub fn enqueueZcu(q: *Queue, comp: *Compilation, task: ZcuTask) Allocator.Error!void { + const io = comp.io; assert(comp.separateCodegenThreadOk()); { q.mutex.lock(); @@ -208,10 +213,11 @@ pub fn enqueueZcu(q: *Queue, comp: *Compilation, task: ZcuTask) Allocator.Error! 
} q.state = .running; } - comp.thread_pool.spawnWgId(&comp.link_task_wait_group, flushTaskQueue, .{ q, comp }); + comp.link_task_wait_group.async(io, flushTaskQueue, .{ q, comp }); } -fn flushTaskQueue(tid: usize, q: *Queue, comp: *Compilation) void { +fn flushTaskQueue(q: *Queue, comp: *Compilation) void { + const tid: usize = std.Io.Threaded.getCurrentThreadId(); q.flush_safety.lock(); // every `return` site should unlock this before unlocking `q.mutex` if (std.debug.runtime_safety) { q.mutex.lock(); diff --git a/src/link/Wasm.zig b/src/link/Wasm.zig index f47f7fbe2a..f0b0912505 100644 --- a/src/link/Wasm.zig +++ b/src/link/Wasm.zig @@ -3393,10 +3393,11 @@ pub fn updateExports( pub fn loadInput(wasm: *Wasm, input: link.Input) !void { const comp = wasm.base.comp; const gpa = comp.gpa; + const io = comp.io; if (comp.verbose_link) { - comp.mutex.lock(); // protect comp.arena - defer comp.mutex.unlock(); + comp.mutex.lockUncancelable(io); // protect comp.arena + defer comp.mutex.unlock(io); const argv = &wasm.dump_argv_list; switch (input) { diff --git a/src/main.zig b/src/main.zig index 03d42ba898..326e1c0677 100644 --- a/src/main.zig +++ b/src/main.zig @@ -11,7 +11,6 @@ const Allocator = mem.Allocator; const Ast = std.zig.Ast; const Color = std.zig.Color; const warn = std.log.warn; -const ThreadPool = std.Thread.Pool; const cleanExit = std.process.cleanExit; const Cache = std.Build.Cache; const Path = std.Build.Cache.Path; @@ -166,6 +165,8 @@ var debug_allocator: std.heap.DebugAllocator(.{ .stack_trace_frames = build_options.mem_leak_frames, }) = .init; +pub var threaded_io: Io.Threaded = undefined; + pub fn main() anyerror!void { const gpa, const is_debug = gpa: { if (build_options.debug_gpa) break :gpa .{ debug_allocator.allocator(), true }; @@ -247,9 +248,13 @@ fn mainArgs(gpa: Allocator, arena: Allocator, args: []const []const u8) !void { } } - var threaded: Io.Threaded = .init(gpa); - defer threaded.deinit(); - const io = threaded.io(); + threaded_io = .init(gpa); + defer threaded_io.deinit(); + if (threaded_io.getThreadCapacity()) |n| { + threaded_io.setThreadCapacity(@min(n, std.math.maxInt(Zcu.PerThread.IdBacking))); + } + threaded_io.stack_size = thread_stack_size; + const io = threaded_io.io(); const cmd = args[1]; const cmd_args = args[2..]; @@ -1124,15 +1129,10 @@ fn buildOutputType( fatal("expected [auto|on|off] after --color, found '{s}'", .{next_arg}); }; } else if (mem.cutPrefix(u8, arg, "-j")) |str| { - const num = std.fmt.parseUnsigned(u32, str, 10) catch |err| { - fatal("unable to parse jobs count '{s}': {s}", .{ - str, @errorName(err), - }); - }; - if (num < 1) { - fatal("number of jobs must be at least 1\n", .{}); - } - n_jobs = num; + const n = std.fmt.parseUnsigned(u32, str, 10) catch |err| + fatal("unable to parse jobs count '{s}': {t}", .{ str, err }); + if (n < 1) fatal("number of jobs must be at least 1", .{}); + n_jobs = n; } else if (mem.eql(u8, arg, "--subsystem")) { subsystem = try parseSubSystem(args_iter.nextOrFatal()); } else if (mem.eql(u8, arg, "-O")) { @@ -3282,14 +3282,10 @@ fn buildOutputType( }, }; - var thread_pool: ThreadPool = undefined; - try thread_pool.init(.{ - .allocator = gpa, - .n_jobs = @min(@max(n_jobs orelse std.Thread.getCpuCount() catch 1, 1), std.math.maxInt(Zcu.PerThread.IdBacking)), - .track_ids = true, - .stack_size = thread_stack_size, - }); - defer thread_pool.deinit(); + if (n_jobs) |n| { + assert(n >= 1); + threaded_io.setThreadCapacity(@min(n, std.math.maxInt(Zcu.PerThread.IdBacking))); + } for 
(create_module.c_source_files.items) |*src| { dev.check(.c_compiler); @@ -3378,7 +3374,6 @@ fn buildOutputType( var create_diag: Compilation.CreateDiagnostic = undefined; const comp = Compilation.create(gpa, arena, io, &create_diag, .{ .dirs = dirs, - .thread_pool = &thread_pool, .self_exe_path = switch (native_os) { .wasi => null, else => self_exe_path, @@ -4956,15 +4951,10 @@ fn cmdBuild(gpa: Allocator, arena: Allocator, io: Io, args: []const []const u8) try child_argv.appendSlice(&.{ arg, args[i] }); continue; } else if (mem.cutPrefix(u8, arg, "-j")) |str| { - const num = std.fmt.parseUnsigned(u32, str, 10) catch |err| { - fatal("unable to parse jobs count '{s}': {s}", .{ - str, @errorName(err), - }); - }; - if (num < 1) { - fatal("number of jobs must be at least 1\n", .{}); - } - n_jobs = num; + const n = std.fmt.parseUnsigned(u32, str, 10) catch |err| + fatal("unable to parse jobs count '{s}': {t}", .{ str, err }); + if (n < 1) fatal("number of jobs must be at least 1", .{}); + n_jobs = n; } else if (mem.eql(u8, arg, "--seed")) { if (i + 1 >= args.len) fatal("expected argument after '{s}'", .{arg}); i += 1; @@ -5049,14 +5039,10 @@ fn cmdBuild(gpa: Allocator, arena: Allocator, io: Io, args: []const []const u8) child_argv.items[argv_index_global_cache_dir] = dirs.global_cache.path orelse cwd_path; child_argv.items[argv_index_cache_dir] = dirs.local_cache.path orelse cwd_path; - var thread_pool: ThreadPool = undefined; - try thread_pool.init(.{ - .allocator = gpa, - .n_jobs = @min(@max(n_jobs orelse std.Thread.getCpuCount() catch 1, 1), std.math.maxInt(Zcu.PerThread.IdBacking)), - .track_ids = true, - .stack_size = thread_stack_size, - }); - defer thread_pool.deinit(); + if (n_jobs) |n| { + assert(n >= 1); + threaded_io.setThreadCapacity(@min(n, std.math.maxInt(Zcu.PerThread.IdBacking))); + } // Dummy http client that is not actually used when fetch_command is unsupported. // Prevents bootstrap from depending on a bunch of unnecessary stuff. 
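Across these hunks the pattern is uniform: the per-command std.Thread.Pool and std.Thread.WaitGroup are dropped in favor of the single process-wide Io.Threaded owned by main.zig plus one Io.Group at each fan-out point. Below is a minimal stand-alone sketch of that shape, kept to the APIs this diff already uses (Io.Threaded init/setThreadCapacity/io, Io.Group init/async/wait); the worker function, allocator setup, and capacity value are made up for illustration and are not taken from the compiler:

    const std = @import("std");
    const Io = std.Io;

    // Illustrative worker; in the compiler these are functions like
    // workerRun or workerHashFile.
    fn addOne(x: *u32) void {
        x.* += 1;
    }

    pub fn main() void {
        var gpa_state: std.heap.DebugAllocator(.{}) = .init;
        defer _ = gpa_state.deinit();
        const gpa = gpa_state.allocator();

        // One process-wide Io.Threaded instead of a per-command std.Thread.Pool.
        var threaded: Io.Threaded = .init(gpa);
        defer threaded.deinit();
        threaded.setThreadCapacity(4); // e.g. the value parsed from -j (illustrative)
        const io = threaded.io();

        var results = [_]u32{ 0, 0, 0, 0 };

        // Io.Group takes over the fan-out/join role of std.Thread.WaitGroup:
        // spawn with async, join with wait.
        var group: Io.Group = .init;
        defer group.wait(io);
        for (0..results.len) |i| {
            group.async(io, addOne, .{&results[i]});
        }
    }

As in computeHash and ParallelHasher.hash above, the group is joined via a deferred wait(io), so every spawned task finishes before the scope that spawned it unwinds.
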
@@ -5122,8 +5108,8 @@ fn cmdBuild(gpa: Allocator, arena: Allocator, io: Io, args: []const []const u8) defer fetch_prog_node.end(); var job_queue: Package.Fetch.JobQueue = .{ + .io = io, .http_client = &http_client, - .thread_pool = &thread_pool, .global_cache = dirs.global_cache, .read_only = false, .recursive = true, @@ -5156,7 +5142,6 @@ fn cmdBuild(gpa: Allocator, arena: Allocator, io: Io, args: []const []const u8) var fetch: Package.Fetch = .{ .arena = std.heap.ArenaAllocator.init(gpa), - .io = io, .location = .{ .relative_path = phantom_package_root }, .location_tok = 0, .hash_tok = .none, @@ -5190,10 +5175,8 @@ fn cmdBuild(gpa: Allocator, arena: Allocator, io: Io, args: []const []const u8) &fetch, ); - job_queue.thread_pool.spawnWg(&job_queue.wait_group, Package.Fetch.workerRun, .{ - &fetch, "root", - }); - job_queue.wait_group.wait(); + job_queue.wait_group.async(io, Package.Fetch.workerRun, .{ &fetch, "root" }); + job_queue.wait_group.wait(io); try job_queue.consolidateErrors(); @@ -5288,7 +5271,6 @@ fn cmdBuild(gpa: Allocator, arena: Allocator, io: Io, args: []const []const u8) .main_mod = build_mod, .emit_bin = .yes_cache, .self_exe_path = self_exe_path, - .thread_pool = &thread_pool, .verbose_cc = verbose_cc, .verbose_link = verbose_link, .verbose_air = verbose_air, @@ -5460,15 +5442,6 @@ fn jitCmd( ); defer dirs.deinit(); - var thread_pool: ThreadPool = undefined; - try thread_pool.init(.{ - .allocator = gpa, - .n_jobs = @min(@max(std.Thread.getCpuCount() catch 1, 1), std.math.maxInt(Zcu.PerThread.IdBacking)), - .track_ids = true, - .stack_size = thread_stack_size, - }); - defer thread_pool.deinit(); - var child_argv: std.ArrayListUnmanaged([]const u8) = .empty; try child_argv.ensureUnusedCapacity(arena, args.len + 4); @@ -5531,7 +5504,6 @@ fn jitCmd( .main_mod = root_mod, .emit_bin = .yes_cache, .self_exe_path = self_exe_path, - .thread_pool = &thread_pool, .cache_mode = .whole, }) catch |err| switch (err) { error.CreateFail => fatal("failed to create compilation: {f}", .{create_diag}), @@ -6875,10 +6847,6 @@ fn cmdFetch( const path_or_url = opt_path_or_url orelse fatal("missing url or path parameter", .{}); - var thread_pool: ThreadPool = undefined; - try thread_pool.init(.{ .allocator = gpa }); - defer thread_pool.deinit(); - var http_client: std.http.Client = .{ .allocator = gpa, .io = io }; defer http_client.deinit(); @@ -6899,8 +6867,8 @@ fn cmdFetch( defer global_cache_directory.handle.close(); var job_queue: Package.Fetch.JobQueue = .{ + .io = io, .http_client = &http_client, - .thread_pool = &thread_pool, .global_cache = global_cache_directory, .recursive = false, .read_only = false, @@ -6912,7 +6880,6 @@ fn cmdFetch( var fetch: Package.Fetch = .{ .arena = std.heap.ArenaAllocator.init(gpa), - .io = io, .location = .{ .path_or_url = path_or_url }, .location_tok = 0, .hash_tok = .none,