From 01d33855c736b04160d9616f138442aa4e41a738 Mon Sep 17 00:00:00 2001 From: Andrew Kelley Date: Tue, 15 Dec 2020 16:55:56 -0700 Subject: [PATCH 01/10] stage2: protect mutable state from data races in updateCObject --- src/Compilation.zig | 81 +++++++++++++++++++++++++++++---------------- 1 file changed, 53 insertions(+), 28 deletions(-) diff --git a/src/Compilation.zig b/src/Compilation.zig index 5e95575642..a7fd75fe56 100644 --- a/src/Compilation.zig +++ b/src/Compilation.zig @@ -41,7 +41,12 @@ link_error_flags: link.File.ErrorFlags = .{}, work_queue: std.fifo.LinearFifo(Job, .Dynamic), +/// These jobs are to invoke the Clang compiler to create an object file, which +/// gets linked with the Compilation. +c_object_work_queue: std.fifo.LinearFifo(*CObject, .Dynamic), + /// The ErrorMsg memory is owned by the `CObject`, using Compilation's general purpose allocator. +/// This data is accessed by multiple threads and is protected by `mutex`. failed_c_objects: std.AutoArrayHashMapUnmanaged(*CObject, *ErrorMsg) = .{}, keep_source_files_loaded: bool, @@ -111,6 +116,9 @@ owned_link_dir: ?std.fs.Dir, /// Don't use this for anything other than stage1 compatibility. color: @import("main.zig").Color = .auto, +/// This mutex guards all `Compilation` mutable state. +mutex: std.Mutex = .{}, + test_filter: ?[]const u8, test_name_prefix: ?[]const u8, test_evented_io: bool, @@ -150,9 +158,6 @@ const Job = union(enum) { /// The source file containing the Decl has been updated, and so the /// Decl may need its line number information updated in the debug info. update_line_number: *Module.Decl, - /// Invoke the Clang compiler to create an object file, which gets linked - /// with the Compilation. 
- c_object: *CObject, /// one of the glibc static objects glibc_crt_file: glibc.CRTFile, @@ -971,6 +976,7 @@ pub fn create(gpa: *Allocator, options: InitOptions) !*Compilation { .emit_analysis = options.emit_analysis, .emit_docs = options.emit_docs, .work_queue = std.fifo.LinearFifo(Job, .Dynamic).init(gpa), + .c_object_work_queue = std.fifo.LinearFifo(*CObject, .Dynamic).init(gpa), .keep_source_files_loaded = options.keep_source_files_loaded, .use_clang = use_clang, .clang_argv = options.clang_argv, @@ -1190,11 +1196,13 @@ pub fn update(self: *Compilation) !void { const tracy = trace(@src()); defer tracy.end(); + self.c_object_cache_digest_set.clearRetainingCapacity(); + // For compiling C objects, we rely on the cache hash system to avoid duplicating work. // Add a Job for each C object. - try self.work_queue.ensureUnusedCapacity(self.c_object_table.items().len); + try self.c_object_work_queue.ensureUnusedCapacity(self.c_object_table.items().len); for (self.c_object_table.items()) |entry| { - self.work_queue.writeItemAssumeCapacity(.{ .c_object = entry.key }); + self.c_object_work_queue.writeItemAssumeCapacity(entry.key); } const use_stage1 = build_options.is_stage1 and self.bin_file.options.use_llvm; @@ -1372,6 +1380,26 @@ pub fn performAllTheWork(self: *Compilation) error{ TimerUnsupported, OutOfMemor var c_comp_progress_node = main_progress_node.start("Compile C Objects", self.c_source_files.len); defer c_comp_progress_node.end(); + while (self.c_object_work_queue.readItem()) |c_object| { + self.updateCObject(c_object, &c_comp_progress_node) catch |err| switch (err) { + error.AnalysisFail => continue, + else => { + { + var lock = self.mutex.acquire(); + defer lock.release(); + try self.failed_c_objects.ensureCapacity(self.gpa, self.failed_c_objects.items().len + 1); + self.failed_c_objects.putAssumeCapacityNoClobber(c_object, try ErrorMsg.create( + self.gpa, + 0, + "unable to build C object: {s}", + .{@errorName(err)}, + )); + } + c_object.status = .{ .failure 
= {} }; + }, + }; + } + while (self.work_queue.readItem()) |work_item| switch (work_item) { .codegen_decl => |decl| switch (decl.analysis) { .unreferenced => unreachable, @@ -1447,21 +1475,6 @@ pub fn performAllTheWork(self: *Compilation) error{ TimerUnsupported, OutOfMemor decl.analysis = .codegen_failure_retryable; }; }, - .c_object => |c_object| { - self.updateCObject(c_object, &c_comp_progress_node) catch |err| switch (err) { - error.AnalysisFail => continue, - else => { - try self.failed_c_objects.ensureCapacity(self.gpa, self.failed_c_objects.items().len + 1); - self.failed_c_objects.putAssumeCapacityNoClobber(c_object, try ErrorMsg.create( - self.gpa, - 0, - "unable to build C object: {s}", - .{@errorName(err)}, - )); - c_object.status = .{ .failure = {} }; - }, - }; - }, .glibc_crt_file => |crt_file| { glibc.buildCRTFile(self, crt_file) catch |err| { // TODO Expose this as a normal compile error rather than crashing here. @@ -1553,7 +1566,7 @@ pub fn performAllTheWork(self: *Compilation) error{ TimerUnsupported, OutOfMemor }; } -pub fn obtainCObjectCacheManifest(comp: *Compilation) Cache.Manifest { +pub fn obtainCObjectCacheManifest(comp: *const Compilation) Cache.Manifest { var man = comp.cache_parent.obtain(); // Only things that need to be added on top of the base hash, and only things @@ -1720,6 +1733,8 @@ fn updateCObject(comp: *Compilation, c_object: *CObject, c_comp_progress_node: * if (c_object.clearStatus(comp.gpa)) { // There was previous failure. 
+ var lock = comp.mutex.acquire(); + defer lock.release(); comp.failed_c_objects.removeAssertDiscard(c_object); } @@ -1747,8 +1762,14 @@ fn updateCObject(comp: *Compilation, c_object: *CObject, c_comp_progress_node: * } { - const gop = try comp.c_object_cache_digest_set.getOrPut(comp.gpa, man.hash.peekBin()); - if (gop.found_existing) { + const is_collision = blk: { + var lock = comp.mutex.acquire(); + defer lock.release(); + + const gop = try comp.c_object_cache_digest_set.getOrPut(comp.gpa, man.hash.peekBin()); + break :blk gop.found_existing; + }; + if (is_collision) { return comp.failCObj( c_object, "the same source file was already added to the same compilation with the same flags", @@ -1929,7 +1950,7 @@ pub fn addTranslateCCArgs( /// Add common C compiler args between translate-c and C object compilation. pub fn addCCArgs( - comp: *Compilation, + comp: *const Compilation, arena: *Allocator, argv: *std.ArrayList([]const u8), ext: FileExt, @@ -2164,10 +2185,14 @@ fn failCObj(comp: *Compilation, c_object: *CObject, comptime format: []const u8, fn failCObjWithOwnedErrorMsg(comp: *Compilation, c_object: *CObject, err_msg: *ErrorMsg) InnerError { { - errdefer err_msg.destroy(comp.gpa); - try comp.failed_c_objects.ensureCapacity(comp.gpa, comp.failed_c_objects.items().len + 1); + var lock = comp.mutex.acquire(); + defer lock.release(); + { + errdefer err_msg.destroy(comp.gpa); + try comp.failed_c_objects.ensureCapacity(comp.gpa, comp.failed_c_objects.items().len + 1); + } + comp.failed_c_objects.putAssumeCapacityNoClobber(c_object, err_msg); } - comp.failed_c_objects.putAssumeCapacityNoClobber(c_object, err_msg); c_object.status = .failure; return error.AnalysisFail; } @@ -2324,7 +2349,7 @@ test "classifyFileExt" { std.testing.expectEqual(FileExt.zir, classifyFileExt("foo.zir")); } -fn haveFramePointer(comp: *Compilation) bool { +fn haveFramePointer(comp: *const Compilation) bool { // If you complicate this logic make sure you update the parent cache hash. 
// Right now it's not in the cache hash because the value depends on optimize_mode // and strip which are both already part of the hash. From 0d1cd0d4822628c104890af4c31cdf38c6f96d35 Mon Sep 17 00:00:00 2001 From: Andrew Kelley Date: Tue, 15 Dec 2020 18:30:29 -0700 Subject: [PATCH 02/10] use kprotty's ThreadPool implementation (v5) --- src/Compilation.zig | 62 ++++++++++++++++------- src/ThreadPool.zig | 116 ++++++++++++++++++++++++++++++++++++++++++++ src/WaitGroup.zig | 22 +++++++++ src/glibc.zig | 1 + src/libcxx.zig | 2 + src/libunwind.zig | 1 + src/main.zig | 10 ++++ src/musl.zig | 1 + src/test.zig | 15 +++++- 9 files changed, 212 insertions(+), 18 deletions(-) create mode 100644 src/ThreadPool.zig create mode 100644 src/WaitGroup.zig diff --git a/src/Compilation.zig b/src/Compilation.zig index a7fd75fe56..ef07de7b17 100644 --- a/src/Compilation.zig +++ b/src/Compilation.zig @@ -26,6 +26,8 @@ const Module = @import("Module.zig"); const Cache = @import("Cache.zig"); const stage1 = @import("stage1.zig"); const translate_c = @import("translate_c.zig"); +const ThreadPool = @import("ThreadPool.zig"); +const WaitGroup = @import("WaitGroup.zig"); /// General-purpose allocator. Used for both temporary and long-term storage. gpa: *Allocator, @@ -79,6 +81,7 @@ zig_lib_directory: Directory, local_cache_directory: Directory, global_cache_directory: Directory, libc_include_dir_list: []const []const u8, +thread_pool: *ThreadPool, /// Populated when we build the libc++ static library. A Job to build this is placed in the queue /// and resolved before calling linker.flush(). @@ -335,6 +338,7 @@ pub const InitOptions = struct { root_name: []const u8, root_pkg: ?*Package, output_mode: std.builtin.OutputMode, + thread_pool: *ThreadPool, dynamic_linker: ?[]const u8 = null, /// `null` means to not emit a binary file. 
emit_bin: ?EmitLoc, @@ -985,6 +989,7 @@ pub fn create(gpa: *Allocator, options: InitOptions) !*Compilation { .self_exe_path = options.self_exe_path, .libc_include_dir_list = libc_dirs.libc_include_dir_list, .sanitize_c = sanitize_c, + .thread_pool = options.thread_pool, .clang_passthrough_mode = options.clang_passthrough_mode, .clang_preprocessor_mode = options.clang_preprocessor_mode, .verbose_cc = options.verbose_cc, @@ -1380,24 +1385,14 @@ pub fn performAllTheWork(self: *Compilation) error{ TimerUnsupported, OutOfMemor var c_comp_progress_node = main_progress_node.start("Compile C Objects", self.c_source_files.len); defer c_comp_progress_node.end(); + var wg = WaitGroup{}; + defer wg.wait(); + while (self.c_object_work_queue.readItem()) |c_object| { - self.updateCObject(c_object, &c_comp_progress_node) catch |err| switch (err) { - error.AnalysisFail => continue, - else => { - { - var lock = self.mutex.acquire(); - defer lock.release(); - try self.failed_c_objects.ensureCapacity(self.gpa, self.failed_c_objects.items().len + 1); - self.failed_c_objects.putAssumeCapacityNoClobber(c_object, try ErrorMsg.create( - self.gpa, - 0, - "unable to build C object: {s}", - .{@errorName(err)}, - )); - } - c_object.status = .{ .failure = {} }; - }, - }; + wg.start(); + try self.thread_pool.spawn(workerUpdateCObject, .{ + self, c_object, &c_comp_progress_node, &wg, + }); } while (self.work_queue.readItem()) |work_item| switch (work_item) { @@ -1721,6 +1716,37 @@ pub fn cImport(comp: *Compilation, c_src: []const u8) !CImportResult { }; } +fn workerUpdateCObject( + comp: *Compilation, + c_object: *CObject, + progress_node: *std.Progress.Node, + wg: *WaitGroup, +) void { + defer wg.stop(); + + comp.updateCObject(c_object, progress_node) catch |err| switch (err) { + error.AnalysisFail => return, + else => { + { + var lock = comp.mutex.acquire(); + defer lock.release(); + comp.failed_c_objects.ensureCapacity(comp.gpa, comp.failed_c_objects.items().len + 1) catch { + fatal("TODO 
handle this by setting c_object.status = oom failure", .{}); + }; + comp.failed_c_objects.putAssumeCapacityNoClobber(c_object, ErrorMsg.create( + comp.gpa, + 0, + "unable to build C object: {s}", + .{@errorName(err)}, + ) catch { + fatal("TODO handle this by setting c_object.status = oom failure", .{}); + }); + } + c_object.status = .{ .failure = {} }; + }, + }; +} + fn updateCObject(comp: *Compilation, c_object: *CObject, c_comp_progress_node: *std.Progress.Node) !void { if (!build_options.have_llvm) { return comp.failCObj(c_object, "clang not available: compiler built without LLVM extensions", .{}); @@ -2800,6 +2826,7 @@ fn buildOutputFromZig( .root_name = root_name, .root_pkg = &root_pkg, .output_mode = fixed_output_mode, + .thread_pool = comp.thread_pool, .libc_installation = comp.bin_file.options.libc_installation, .emit_bin = emit_bin, .optimize_mode = optimize_mode, @@ -3173,6 +3200,7 @@ pub fn build_crt_file( .root_name = root_name, .root_pkg = null, .output_mode = output_mode, + .thread_pool = comp.thread_pool, .libc_installation = comp.bin_file.options.libc_installation, .emit_bin = emit_bin, .optimize_mode = comp.bin_file.options.optimize_mode, diff --git a/src/ThreadPool.zig b/src/ThreadPool.zig new file mode 100644 index 0000000000..a5f59a30e7 --- /dev/null +++ b/src/ThreadPool.zig @@ -0,0 +1,116 @@ +const std = @import("std"); +const ThreadPool = @This(); + +lock: std.Mutex = .{}, +is_running: bool = true, +allocator: *std.mem.Allocator, +running: usize = 0, +threads: []*std.Thread, +run_queue: RunQueue = .{}, +idle_queue: IdleQueue = .{}, + +const IdleQueue = std.SinglyLinkedList(std.AutoResetEvent); +const RunQueue = std.SinglyLinkedList(Runnable); +const Runnable = struct { + runFn: fn (*Runnable) void, +}; + +pub fn init(self: *ThreadPool, allocator: *std.mem.Allocator) !void { + self.* = .{ + .allocator = allocator, + .threads = &[_]*std.Thread{}, + }; + + errdefer self.deinit(); + + var num_threads = std.Thread.cpuCount() catch 1; + if 
(num_threads > 0) + self.threads = try allocator.alloc(*std.Thread, num_threads); + + while (num_threads > 0) : (num_threads -= 1) { + const thread = try std.Thread.spawn(self, runWorker); + self.threads[self.running] = thread; + self.running += 1; + } +} + +pub fn deinit(self: *ThreadPool) void { + self.shutdown(); + + std.debug.assert(!self.is_running); + for (self.threads[0..self.running]) |thread| + thread.wait(); + + defer self.threads = &[_]*std.Thread{}; + if (self.running > 0) + self.allocator.free(self.threads); +} + +pub fn shutdown(self: *ThreadPool) void { + const held = self.lock.acquire(); + + if (!self.is_running) + return held.release(); + + var idle_queue = self.idle_queue; + self.idle_queue = .{}; + self.is_running = false; + held.release(); + + while (idle_queue.popFirst()) |idle_node| + idle_node.data.set(); +} + +pub fn spawn(self: *ThreadPool, comptime func: anytype, args: anytype) !void { + const Args = @TypeOf(args); + const Closure = struct { + arguments: Args, + pool: *ThreadPool, + run_node: RunQueue.Node = .{ .data = .{ .runFn = runFn } }, + + fn runFn(runnable: *Runnable) void { + const run_node = @fieldParentPtr(RunQueue.Node, "data", runnable); + const closure = @fieldParentPtr(@This(), "run_node", run_node); + const result = @call(.{}, func, closure.arguments); + closure.pool.allocator.destroy(closure); + } + }; + + const closure = try self.allocator.create(Closure); + errdefer self.allocator.destroy(closure); + closure.* = .{ + .arguments = args, + .pool = self, + }; + + const held = self.lock.acquire(); + self.run_queue.prepend(&closure.run_node); + + const idle_node = self.idle_queue.popFirst(); + held.release(); + + if (idle_node) |node| + node.data.set(); +} + +fn runWorker(self: *ThreadPool) void { + while (true) { + const held = self.lock.acquire(); + + if (self.run_queue.popFirst()) |run_node| { + held.release(); + (run_node.data.runFn)(&run_node.data); + continue; + } + + if (!self.is_running) { + held.release(); + return; + 
} + + var idle_node = IdleQueue.Node{ .data = .{} }; + self.idle_queue.prepend(&idle_node); + held.release(); + idle_node.data.wait(); + } +} diff --git a/src/WaitGroup.zig b/src/WaitGroup.zig new file mode 100644 index 0000000000..295dfd39dc --- /dev/null +++ b/src/WaitGroup.zig @@ -0,0 +1,22 @@ +const std = @import("std"); +const WaitGroup = @This(); + +counter: usize = 0, +event: ?*std.AutoResetEvent = null, + +pub fn start(self: *WaitGroup) void { + _ = @atomicRmw(usize, &self.counter, .Add, 1, .SeqCst); +} + +pub fn stop(self: *WaitGroup) void { + if (@atomicRmw(usize, &self.counter, .Sub, 1, .SeqCst) == 1) + if (@atomicRmw(?*std.AutoResetEvent, &self.event, .Xchg, null, .SeqCst)) |event| + event.set(); +} + +pub fn wait(self: *WaitGroup) void { + var event = std.AutoResetEvent{}; + @atomicStore(?*std.AutoResetEvent, &self.event, &event, .SeqCst); + if (@atomicLoad(usize, &self.counter, .SeqCst) != 0) + event.wait(); +} diff --git a/src/glibc.zig b/src/glibc.zig index 15c9d743f9..413e38637b 100644 --- a/src/glibc.zig +++ b/src/glibc.zig @@ -936,6 +936,7 @@ fn buildSharedLib( .root_pkg = null, .output_mode = .Lib, .link_mode = .Dynamic, + .thread_pool = comp.thread_pool, .libc_installation = comp.bin_file.options.libc_installation, .emit_bin = emit_bin, .optimize_mode = comp.bin_file.options.optimize_mode, diff --git a/src/libcxx.zig b/src/libcxx.zig index 8de45a49b2..142d014b2f 100644 --- a/src/libcxx.zig +++ b/src/libcxx.zig @@ -162,6 +162,7 @@ pub fn buildLibCXX(comp: *Compilation) !void { .root_name = root_name, .root_pkg = null, .output_mode = output_mode, + .thread_pool = comp.thread_pool, .libc_installation = comp.bin_file.options.libc_installation, .emit_bin = emit_bin, .optimize_mode = comp.bin_file.options.optimize_mode, @@ -280,6 +281,7 @@ pub fn buildLibCXXABI(comp: *Compilation) !void { .root_name = root_name, .root_pkg = null, .output_mode = output_mode, + .thread_pool = comp.thread_pool, .libc_installation = 
comp.bin_file.options.libc_installation, .emit_bin = emit_bin, .optimize_mode = comp.bin_file.options.optimize_mode, diff --git a/src/libunwind.zig b/src/libunwind.zig index 9822016ae1..13a4fdf7c7 100644 --- a/src/libunwind.zig +++ b/src/libunwind.zig @@ -95,6 +95,7 @@ pub fn buildStaticLib(comp: *Compilation) !void { .root_name = root_name, .root_pkg = null, .output_mode = output_mode, + .thread_pool = comp.thread_pool, .libc_installation = comp.bin_file.options.libc_installation, .emit_bin = emit_bin, .optimize_mode = comp.bin_file.options.optimize_mode, diff --git a/src/main.zig b/src/main.zig index c54cc6515b..c25bc20d85 100644 --- a/src/main.zig +++ b/src/main.zig @@ -19,6 +19,7 @@ const LibCInstallation = @import("libc_installation.zig").LibCInstallation; const translate_c = @import("translate_c.zig"); const Cache = @import("Cache.zig"); const target_util = @import("target.zig"); +const ThreadPool = @import("ThreadPool.zig"); pub fn fatal(comptime format: []const u8, args: anytype) noreturn { std.log.emerg(format, args); @@ -1632,6 +1633,10 @@ fn buildOutputType( }; defer zig_lib_directory.handle.close(); + var thread_pool: ThreadPool = undefined; + try thread_pool.init(gpa); + defer thread_pool.deinit(); + var libc_installation: ?LibCInstallation = null; defer if (libc_installation) |*l| l.deinit(gpa); @@ -1747,6 +1752,7 @@ fn buildOutputType( .single_threaded = single_threaded, .function_sections = function_sections, .self_exe_path = self_exe_path, + .thread_pool = &thread_pool, .clang_passthrough_mode = arg_mode != .build, .clang_preprocessor_mode = clang_preprocessor_mode, .version = optional_version, @@ -2412,6 +2418,9 @@ pub fn cmdBuild(gpa: *Allocator, arena: *Allocator, args: []const []const u8) !v .directory = null, // Use the local zig-cache. 
.basename = exe_basename, }; + var thread_pool: ThreadPool = undefined; + try thread_pool.init(gpa); + defer thread_pool.deinit(); const comp = Compilation.create(gpa, .{ .zig_lib_directory = zig_lib_directory, .local_cache_directory = local_cache_directory, @@ -2427,6 +2436,7 @@ pub fn cmdBuild(gpa: *Allocator, arena: *Allocator, args: []const []const u8) !v .emit_h = null, .optimize_mode = .Debug, .self_exe_path = self_exe_path, + .thread_pool = &thread_pool, }) catch |err| { fatal("unable to create compilation: {}", .{@errorName(err)}); }; diff --git a/src/musl.zig b/src/musl.zig index 1a30a6e2b9..80943ab1f7 100644 --- a/src/musl.zig +++ b/src/musl.zig @@ -200,6 +200,7 @@ pub fn buildCRTFile(comp: *Compilation, crt_file: CRTFile) !void { .root_pkg = null, .output_mode = .Lib, .link_mode = .Dynamic, + .thread_pool = comp.thread_pool, .libc_installation = comp.bin_file.options.libc_installation, .emit_bin = Compilation.EmitLoc{ .directory = null, .basename = "libc.so" }, .optimize_mode = comp.bin_file.options.optimize_mode, diff --git a/src/test.zig b/src/test.zig index e0497a8cd5..795d4f0d65 100644 --- a/src/test.zig +++ b/src/test.zig @@ -10,6 +10,7 @@ const enable_qemu: bool = build_options.enable_qemu; const enable_wine: bool = build_options.enable_wine; const enable_wasmtime: bool = build_options.enable_wasmtime; const glibc_multi_install_dir: ?[]const u8 = build_options.glibc_multi_install_dir; +const ThreadPool = @import("ThreadPool.zig"); const cheader = @embedFile("link/cbe.h"); @@ -467,6 +468,10 @@ pub const TestContext = struct { defer zig_lib_directory.handle.close(); defer std.testing.allocator.free(zig_lib_directory.path.?); + var thread_pool: ThreadPool = undefined; + try thread_pool.init(std.testing.allocator); + defer thread_pool.deinit(); + for (self.cases.items) |case| { if (build_options.skip_non_native and case.target.getCpuArch() != std.Target.current.cpu.arch) continue; @@ -480,7 +485,13 @@ pub const TestContext = struct { 
progress.initial_delay_ns = 0; progress.refresh_rate_ns = 0; - try self.runOneCase(std.testing.allocator, &prg_node, case, zig_lib_directory); + try self.runOneCase( + std.testing.allocator, + &prg_node, + case, + zig_lib_directory, + &thread_pool, + ); } } @@ -490,6 +501,7 @@ pub const TestContext = struct { root_node: *std.Progress.Node, case: Case, zig_lib_directory: Compilation.Directory, + thread_pool: *ThreadPool, ) !void { const target_info = try std.zig.system.NativeTargetInfo.detect(allocator, case.target); const target = target_info.target; @@ -539,6 +551,7 @@ pub const TestContext = struct { .local_cache_directory = zig_cache_directory, .global_cache_directory = zig_cache_directory, .zig_lib_directory = zig_lib_directory, + .thread_pool = thread_pool, .root_name = "test_case", .target = target, // TODO: support tests for object file building, and library builds From 32fd637e57c3b4391b3f2f4499c803e3d4e8f615 Mon Sep 17 00:00:00 2001 From: Andrew Kelley Date: Fri, 18 Dec 2020 16:14:46 -0700 Subject: [PATCH 03/10] stage2: replace WaitGroup with a trivially auditable one --- src/WaitGroup.zig | 30 +++++++++++++++++++++--------- 1 file changed, 21 insertions(+), 9 deletions(-) diff --git a/src/WaitGroup.zig b/src/WaitGroup.zig index 295dfd39dc..c33d084c28 100644 --- a/src/WaitGroup.zig +++ b/src/WaitGroup.zig @@ -1,22 +1,34 @@ const std = @import("std"); const WaitGroup = @This(); +lock: std.Mutex = .{}, counter: usize = 0, -event: ?*std.AutoResetEvent = null, +event: std.AutoResetEvent = .{}, pub fn start(self: *WaitGroup) void { - _ = @atomicRmw(usize, &self.counter, .Add, 1, .SeqCst); + const held = self.lock.acquire(); + defer held.release(); + + self.counter += 1; } pub fn stop(self: *WaitGroup) void { - if (@atomicRmw(usize, &self.counter, .Sub, 1, .SeqCst) == 1) - if (@atomicRmw(?*std.AutoResetEvent, &self.event, .Xchg, null, .SeqCst)) |event| - event.set(); + const held = self.lock.acquire(); + defer held.release(); + + self.counter -= 1; + if 
(self.counter == 0) + self.event.set(); } pub fn wait(self: *WaitGroup) void { - var event = std.AutoResetEvent{}; - @atomicStore(?*std.AutoResetEvent, &self.event, &event, .SeqCst); - if (@atomicLoad(usize, &self.counter, .SeqCst) != 0) - event.wait(); + { + const held = self.lock.acquire(); + defer held.release(); + + if (self.counter == 0) + return; + } + + self.event.wait(); } From b2f8631a3c9b2cc04a4c78f38d164130be2fb1ae Mon Sep 17 00:00:00 2001 From: Andrew Kelley Date: Fri, 18 Dec 2020 21:50:44 -0700 Subject: [PATCH 04/10] ThreadPool: delete dead code If this errdefer did get run it would constitute a race condition. So I deleted the dead code for clarity. --- lib/std/{progress.zig => Progress.zig} | 0 src/ThreadPool.zig | 1 - 2 files changed, 1 deletion(-) rename lib/std/{progress.zig => Progress.zig} (100%) diff --git a/lib/std/progress.zig b/lib/std/Progress.zig similarity index 100% rename from lib/std/progress.zig rename to lib/std/Progress.zig diff --git a/src/ThreadPool.zig b/src/ThreadPool.zig index a5f59a30e7..6a59b684be 100644 --- a/src/ThreadPool.zig +++ b/src/ThreadPool.zig @@ -77,7 +77,6 @@ pub fn spawn(self: *ThreadPool, comptime func: anytype, args: anytype) !void { }; const closure = try self.allocator.create(Closure); - errdefer self.allocator.destroy(closure); closure.* = .{ .arguments = args, .pool = self, From aa6ef10cc657e2bbe59c362f27d7a557c43d7fae Mon Sep 17 00:00:00 2001 From: Andrew Kelley Date: Fri, 18 Dec 2020 21:51:18 -0700 Subject: [PATCH 05/10] std.Progress: make the API thread-safe We generally get away with atomic primitives, however a lock is required around the refresh function since it traverses the Node graph, and we need to be sure no references to Nodes remain after end() is called. 
--- CMakeLists.txt | 2 +- lib/std/Progress.zig | 487 +++++++++++++++++--------------- lib/std/special/test_runner.zig | 2 +- lib/std/std.zig | 2 +- src/Compilation.zig | 4 +- src/stage1.zig | 8 +- 6 files changed, 270 insertions(+), 235 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index ea61fd95c0..2580c06066 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -410,7 +410,7 @@ set(ZIG_STAGE2_SOURCES "${CMAKE_SOURCE_DIR}/lib/std/os/windows/win32error.zig" "${CMAKE_SOURCE_DIR}/lib/std/pdb.zig" "${CMAKE_SOURCE_DIR}/lib/std/process.zig" - "${CMAKE_SOURCE_DIR}/lib/std/progress.zig" + "${CMAKE_SOURCE_DIR}/lib/std/Progress.zig" "${CMAKE_SOURCE_DIR}/lib/std/rand.zig" "${CMAKE_SOURCE_DIR}/lib/std/reset_event.zig" "${CMAKE_SOURCE_DIR}/lib/std/sort.zig" diff --git a/lib/std/Progress.zig b/lib/std/Progress.zig index 82f2801fa1..cd5d5ea79b 100644 --- a/lib/std/Progress.zig +++ b/lib/std/Progress.zig @@ -3,263 +3,298 @@ // This file is part of [zig](https://ziglang.org/), which is MIT licensed. // The MIT license requires this copyright notice to be included in all copies // and substantial portions of the software. + +//! This API is non-allocating, non-fallible, and thread-safe. +//! The tradeoff is that users of this API must provide the storage +//! for each `Progress.Node`. +//! +//! Initialize the struct directly, overriding these fields as desired: +//! * `refresh_rate_ns` +//! * `initial_delay_ns` + const std = @import("std"); const windows = std.os.windows; const testing = std.testing; const assert = std.debug.assert; +const Progress = @This(); -/// This API is non-allocating and non-fallible. The tradeoff is that users of -/// this API must provide the storage for each `Progress.Node`.
-/// Initialize the struct directly, overriding these fields as desired: -/// * `refresh_rate_ms` -/// * `initial_delay_ms` -pub const Progress = struct { - /// `null` if the current node (and its children) should - /// not print on update() - terminal: ?std.fs.File = undefined, +/// `null` if the current node (and its children) should +/// not print on update() +terminal: ?std.fs.File = undefined, - /// Whether the terminal supports ANSI escape codes. - supports_ansi_escape_codes: bool = false, +/// Whether the terminal supports ANSI escape codes. +supports_ansi_escape_codes: bool = false, - root: Node = undefined, +root: Node = undefined, - /// Keeps track of how much time has passed since the beginning. - /// Used to compare with `initial_delay_ms` and `refresh_rate_ms`. - timer: std.time.Timer = undefined, +/// Keeps track of how much time has passed since the beginning. +/// Used to compare with `initial_delay_ns` and `refresh_rate_ns`. +timer: std.time.Timer = undefined, - /// When the previous refresh was written to the terminal. - /// Used to compare with `refresh_rate_ms`. - prev_refresh_timestamp: u64 = undefined, +/// When the previous refresh was written to the terminal. +/// Used to compare with `refresh_rate_ns`. +prev_refresh_timestamp: u64 = undefined, - /// This buffer represents the maximum number of bytes written to the terminal - /// with each refresh. - output_buffer: [100]u8 = undefined, +/// This buffer represents the maximum number of bytes written to the terminal +/// with each refresh. +output_buffer: [100]u8 = undefined, - /// How many nanoseconds between writing updates to the terminal.
+refresh_rate_ns: u64 = 50 * std.time.ns_per_ms, - /// How many nanoseconds to keep the output hidden - initial_delay_ns: u64 = 500 * std.time.ns_per_ms, +/// How many nanoseconds to keep the output hidden +initial_delay_ns: u64 = 500 * std.time.ns_per_ms, - done: bool = true, +done: bool = true, - /// Keeps track of how many columns in the terminal have been output, so that - /// we can move the cursor back later. - columns_written: usize = undefined, +/// Protects the `refresh` function, as well as `node.recently_updated_child`. +/// Without this, callsites would call `Node.end` and then free `Node` memory +/// while it was still being accessed by the `refresh` function. +update_lock: std.Mutex = .{}, - /// Represents one unit of progress. Each node can have children nodes, or - /// one can use integers with `update`. - pub const Node = struct { - context: *Progress, - parent: ?*Node, - completed_items: usize, - name: []const u8, - recently_updated_child: ?*Node = null, +/// Keeps track of how many columns in the terminal have been output, so that +/// we can move the cursor back later. +columns_written: usize = undefined, - /// This field may be updated freely. - estimated_total_items: ?usize, +/// Represents one unit of progress. Each node can have children nodes, or +/// one can use integers with `update`. +pub const Node = struct { + context: *Progress, + parent: ?*Node, + name: []const u8, + /// Must be handled atomically to be thread-safe. + recently_updated_child: ?*Node = null, + /// Must be handled atomically to be thread-safe. 0 means null. + unprotected_estimated_total_items: usize, + /// Must be handled atomically to be thread-safe. + unprotected_completed_items: usize, - /// Create a new child progress node. - /// Call `Node.end` when done. - /// TODO solve https://github.com/ziglang/zig/issues/2765 and then change this - /// API to set `self.parent.recently_updated_child` with the return value. 
- /// Until that is fixed you probably want to call `activate` on the return value. - pub fn start(self: *Node, name: []const u8, estimated_total_items: ?usize) Node { - return Node{ - .context = self.context, - .parent = self, - .completed_items = 0, - .name = name, - .estimated_total_items = estimated_total_items, - }; - } - - /// This is the same as calling `start` and then `end` on the returned `Node`. - pub fn completeOne(self: *Node) void { - if (self.parent) |parent| parent.recently_updated_child = self; - self.completed_items += 1; - self.context.maybeRefresh(); - } - - pub fn end(self: *Node) void { - self.context.maybeRefresh(); - if (self.parent) |parent| { - if (parent.recently_updated_child) |parent_child| { - if (parent_child == self) { - parent.recently_updated_child = null; - } - } - parent.completeOne(); - } else { - self.context.done = true; - self.context.refresh(); - } - } - - /// Tell the parent node that this node is actively being worked on. - pub fn activate(self: *Node) void { - if (self.parent) |parent| parent.recently_updated_child = self; - } - }; - - /// Create a new progress node. + /// Create a new child progress node. Thread-safe. /// Call `Node.end` when done. /// TODO solve https://github.com/ziglang/zig/issues/2765 and then change this - /// API to return Progress rather than accept it as a parameter. - pub fn start(self: *Progress, name: []const u8, estimated_total_items: ?usize) !*Node { - const stderr = std.io.getStdErr(); - self.terminal = null; - if (stderr.supportsAnsiEscapeCodes()) { - self.terminal = stderr; - self.supports_ansi_escape_codes = true; - } else if (std.builtin.os.tag == .windows and stderr.isTty()) { - self.terminal = stderr; - } - self.root = Node{ - .context = self, - .parent = null, - .completed_items = 0, + /// API to set `self.parent.recently_updated_child` with the return value. + /// Until that is fixed you probably want to call `activate` on the return value. 
+ /// Passing 0 for `estimated_total_items` means unknown. + pub fn start(self: *Node, name: []const u8, estimated_total_items: usize) Node { + return Node{ + .context = self.context, + .parent = self, .name = name, - .estimated_total_items = estimated_total_items, + .unprotected_estimated_total_items = estimated_total_items, + .unprotected_completed_items = 0, }; - self.columns_written = 0; - self.prev_refresh_timestamp = 0; - self.timer = try std.time.Timer.start(); - self.done = false; - return &self.root; } - /// Updates the terminal if enough time has passed since last update. - pub fn maybeRefresh(self: *Progress) void { - const now = self.timer.read(); - if (now < self.initial_delay_ns) return; - if (now - self.prev_refresh_timestamp < self.refresh_rate_ns) return; - self.refresh(); + /// This is the same as calling `start` and then `end` on the returned `Node`. Thread-safe. + pub fn completeOne(self: *Node) void { + self.activate(); + _ = @atomicRmw(usize, &self.unprotected_completed_items, .Add, 1, .Monotonic); + self.context.maybeRefresh(); } - /// Updates the terminal and resets `self.next_refresh_timestamp`. 
- pub fn refresh(self: *Progress) void { - const file = self.terminal orelse return; - - const prev_columns_written = self.columns_written; - var end: usize = 0; - if (self.columns_written > 0) { - // restore the cursor position by moving the cursor - // `columns_written` cells to the left, then clear the rest of the - // line - if (self.supports_ansi_escape_codes) { - end += (std.fmt.bufPrint(self.output_buffer[end..], "\x1b[{}D", .{self.columns_written}) catch unreachable).len; - end += (std.fmt.bufPrint(self.output_buffer[end..], "\x1b[0K", .{}) catch unreachable).len; - } else if (std.builtin.os.tag == .windows) winapi: { - var info: windows.CONSOLE_SCREEN_BUFFER_INFO = undefined; - if (windows.kernel32.GetConsoleScreenBufferInfo(file.handle, &info) != windows.TRUE) - unreachable; - - var cursor_pos = windows.COORD{ - .X = info.dwCursorPosition.X - @intCast(windows.SHORT, self.columns_written), - .Y = info.dwCursorPosition.Y, - }; - - if (cursor_pos.X < 0) - cursor_pos.X = 0; - - const fill_chars = @intCast(windows.DWORD, info.dwSize.X - cursor_pos.X); - - var written: windows.DWORD = undefined; - if (windows.kernel32.FillConsoleOutputAttribute( - file.handle, - info.wAttributes, - fill_chars, - cursor_pos, - &written, - ) != windows.TRUE) { - // Stop trying to write to this file. - self.terminal = null; - break :winapi; - } - if (windows.kernel32.FillConsoleOutputCharacterA( - file.handle, - ' ', - fill_chars, - cursor_pos, - &written, - ) != windows.TRUE) unreachable; - - if (windows.kernel32.SetConsoleCursorPosition(file.handle, cursor_pos) != windows.TRUE) - unreachable; - } else unreachable; - - self.columns_written = 0; - } - - if (!self.done) { - var need_ellipse = false; - var maybe_node: ?*Node = &self.root; - while (maybe_node) |node| { - if (need_ellipse) { - self.bufWrite(&end, "... 
", .{}); - } - need_ellipse = false; - if (node.name.len != 0 or node.estimated_total_items != null) { - if (node.name.len != 0) { - self.bufWrite(&end, "{}", .{node.name}); - need_ellipse = true; - } - if (node.estimated_total_items) |total| { - if (need_ellipse) self.bufWrite(&end, " ", .{}); - self.bufWrite(&end, "[{}/{}] ", .{ node.completed_items + 1, total }); - need_ellipse = false; - } else if (node.completed_items != 0) { - if (need_ellipse) self.bufWrite(&end, " ", .{}); - self.bufWrite(&end, "[{}] ", .{node.completed_items + 1}); - need_ellipse = false; - } - } - maybe_node = node.recently_updated_child; + /// Finish a started `Node`. Thread-safe. + pub fn end(self: *Node) void { + self.context.maybeRefresh(); + if (self.parent) |parent| { + { + const held = self.context.update_lock.acquire(); + defer held.release(); + _ = @cmpxchgStrong(?*Node, &parent.recently_updated_child, self, null, .Monotonic, .Monotonic); } + parent.completeOne(); + } else { + self.context.done = true; + self.context.refresh(); + } + } + + /// Tell the parent node that this node is actively being worked on. Thread-safe. + pub fn activate(self: *Node) void { + if (self.parent) |parent| { + @atomicStore(?*Node, &parent.recently_updated_child, self, .Monotonic); + } + } + + /// Thread-safe. 0 means unknown. + pub fn setEstimatedTotalItems(self: *Node, count: usize) void { + @atomicStore(usize, &self.unprotected_estimated_total_items, count, .Monotonic); + } + + /// Thread-safe. + pub fn setCompletedItems(self: *Node, completed_items: usize) void { + @atomicStore(usize, &self.unprotected_completed_items, completed_items, .Monotonic); + } +}; + +/// Create a new progress node. +/// Call `Node.end` when done. +/// TODO solve https://github.com/ziglang/zig/issues/2765 and then change this +/// API to return Progress rather than accept it as a parameter. +/// `estimated_total_items` value of 0 means unknown. 
+pub fn start(self: *Progress, name: []const u8, estimated_total_items: usize) !*Node { + const stderr = std.io.getStdErr(); + self.terminal = null; + if (stderr.supportsAnsiEscapeCodes()) { + self.terminal = stderr; + self.supports_ansi_escape_codes = true; + } else if (std.builtin.os.tag == .windows and stderr.isTty()) { + self.terminal = stderr; + } + self.root = Node{ + .context = self, + .parent = null, + .name = name, + .unprotected_estimated_total_items = estimated_total_items, + .unprotected_completed_items = 0, + }; + self.columns_written = 0; + self.prev_refresh_timestamp = 0; + self.timer = try std.time.Timer.start(); + self.done = false; + return &self.root; +} + +/// Updates the terminal if enough time has passed since last update. Thread-safe. +pub fn maybeRefresh(self: *Progress) void { + const now = self.timer.read(); + if (now < self.initial_delay_ns) return; + const held = self.update_lock.tryAcquire() orelse return; + defer held.release(); + if (now - self.prev_refresh_timestamp < self.refresh_rate_ns) return; + return self.refreshWithHeldLock(); +} + +/// Updates the terminal and resets `self.prev_refresh_timestamp`. Thread-safe. 
+pub fn refresh(self: *Progress) void { + const held = self.update_lock.tryAcquire() orelse return; + defer held.release(); + + return self.refreshWithHeldLock(); +} + +fn refreshWithHeldLock(self: *Progress) void { + const file = self.terminal orelse return; + + const prev_columns_written = self.columns_written; + var end: usize = 0; + if (self.columns_written > 0) { + // restore the cursor position by moving the cursor + // `columns_written` cells to the left, then clear the rest of the + // line + if (self.supports_ansi_escape_codes) { + end += (std.fmt.bufPrint(self.output_buffer[end..], "\x1b[{d}D", .{self.columns_written}) catch unreachable).len; + end += (std.fmt.bufPrint(self.output_buffer[end..], "\x1b[0K", .{}) catch unreachable).len; + } else if (std.builtin.os.tag == .windows) winapi: { + var info: windows.CONSOLE_SCREEN_BUFFER_INFO = undefined; + if (windows.kernel32.GetConsoleScreenBufferInfo(file.handle, &info) != windows.TRUE) + unreachable; + + var cursor_pos = windows.COORD{ + .X = info.dwCursorPosition.X - @intCast(windows.SHORT, self.columns_written), + .Y = info.dwCursorPosition.Y, + }; + + if (cursor_pos.X < 0) + cursor_pos.X = 0; + + const fill_chars = @intCast(windows.DWORD, info.dwSize.X - cursor_pos.X); + + var written: windows.DWORD = undefined; + if (windows.kernel32.FillConsoleOutputAttribute( + file.handle, + info.wAttributes, + fill_chars, + cursor_pos, + &written, + ) != windows.TRUE) { + // Stop trying to write to this file. 
+ self.terminal = null; + break :winapi; + } + if (windows.kernel32.FillConsoleOutputCharacterA( + file.handle, + ' ', + fill_chars, + cursor_pos, + &written, + ) != windows.TRUE) unreachable; + + if (windows.kernel32.SetConsoleCursorPosition(file.handle, cursor_pos) != windows.TRUE) + unreachable; + } else unreachable; + + self.columns_written = 0; + } + + if (!self.done) { + var need_ellipse = false; + var maybe_node: ?*Node = &self.root; + while (maybe_node) |node| { if (need_ellipse) { self.bufWrite(&end, "... ", .{}); } + need_ellipse = false; + const eti = @atomicLoad(usize, &node.unprotected_estimated_total_items, .Monotonic); + const completed_items = @atomicLoad(usize, &node.unprotected_completed_items, .Monotonic); + if (node.name.len != 0 or eti > 0) { + if (node.name.len != 0) { + self.bufWrite(&end, "{s}", .{node.name}); + need_ellipse = true; + } + if (eti > 0) { + if (need_ellipse) self.bufWrite(&end, " ", .{}); + self.bufWrite(&end, "[{d}/{d}] ", .{ completed_items + 1, eti }); + need_ellipse = false; + } else if (completed_items != 0) { + if (need_ellipse) self.bufWrite(&end, " ", .{}); + self.bufWrite(&end, "[{d}] ", .{completed_items + 1}); + need_ellipse = false; + } + } + maybe_node = @atomicLoad(?*Node, &node.recently_updated_child, .Monotonic); } - - _ = file.write(self.output_buffer[0..end]) catch |e| { - // Stop trying to write to this file once it errors. 
- self.terminal = null; - }; - self.prev_refresh_timestamp = self.timer.read(); - } - - pub fn log(self: *Progress, comptime format: []const u8, args: anytype) void { - const file = self.terminal orelse return; - self.refresh(); - file.outStream().print(format, args) catch { - self.terminal = null; - return; - }; - self.columns_written = 0; - } - - fn bufWrite(self: *Progress, end: *usize, comptime format: []const u8, args: anytype) void { - if (std.fmt.bufPrint(self.output_buffer[end.*..], format, args)) |written| { - const amt = written.len; - end.* += amt; - self.columns_written += amt; - } else |err| switch (err) { - error.NoSpaceLeft => { - self.columns_written += self.output_buffer.len - end.*; - end.* = self.output_buffer.len; - }, - } - const bytes_needed_for_esc_codes_at_end = if (std.builtin.os.tag == .windows) 0 else 11; - const max_end = self.output_buffer.len - bytes_needed_for_esc_codes_at_end; - if (end.* > max_end) { - const suffix = "... "; - self.columns_written = self.columns_written - (end.* - max_end) + suffix.len; - std.mem.copy(u8, self.output_buffer[max_end..], suffix); - end.* = max_end + suffix.len; + if (need_ellipse) { + self.bufWrite(&end, "... ", .{}); } } -}; + + _ = file.write(self.output_buffer[0..end]) catch |e| { + // Stop trying to write to this file once it errors. 
+ self.terminal = null; + }; + self.prev_refresh_timestamp = self.timer.read(); +} + +pub fn log(self: *Progress, comptime format: []const u8, args: anytype) void { + const file = self.terminal orelse return; + self.refresh(); + file.outStream().print(format, args) catch { + self.terminal = null; + return; + }; + self.columns_written = 0; +} + +fn bufWrite(self: *Progress, end: *usize, comptime format: []const u8, args: anytype) void { + if (std.fmt.bufPrint(self.output_buffer[end.*..], format, args)) |written| { + const amt = written.len; + end.* += amt; + self.columns_written += amt; + } else |err| switch (err) { + error.NoSpaceLeft => { + self.columns_written += self.output_buffer.len - end.*; + end.* = self.output_buffer.len; + }, + } + const bytes_needed_for_esc_codes_at_end = if (std.builtin.os.tag == .windows) 0 else 11; + const max_end = self.output_buffer.len - bytes_needed_for_esc_codes_at_end; + if (end.* > max_end) { + const suffix = "... "; + self.columns_written = self.columns_written - (end.* - max_end) + suffix.len; + std.mem.copy(u8, self.output_buffer[max_end..], suffix); + end.* = max_end + suffix.len; + } +} test "basic functionality" { var disable = true; @@ -300,7 +335,7 @@ test "basic functionality" { std.time.sleep(5 * std.time.ns_per_ms); } { - var node = root_node.start("this is a really long name designed to activate the truncation code. let's find out if it works", null); + var node = root_node.start("this is a really long name designed to activate the truncation code. 
let's find out if it works", 0); node.activate(); std.time.sleep(10 * std.time.ns_per_ms); progress.refresh(); diff --git a/lib/std/special/test_runner.zig b/lib/std/special/test_runner.zig index 2b2fe78262..4bb9202858 100644 --- a/lib/std/special/test_runner.zig +++ b/lib/std/special/test_runner.zig @@ -36,7 +36,7 @@ pub fn main() anyerror!void { } std.testing.log_level = .warn; - var test_node = root_node.start(test_fn.name, null); + var test_node = root_node.start(test_fn.name, 0); test_node.activate(); progress.refresh(); if (progress.terminal == null) { diff --git a/lib/std/std.zig b/lib/std/std.zig index f6da7afc55..5fbf2662b9 100644 --- a/lib/std/std.zig +++ b/lib/std/std.zig @@ -29,7 +29,7 @@ pub const PackedIntArrayEndian = @import("packed_int_array.zig").PackedIntArrayE pub const PackedIntSlice = @import("packed_int_array.zig").PackedIntSlice; pub const PackedIntSliceEndian = @import("packed_int_array.zig").PackedIntSliceEndian; pub const PriorityQueue = @import("priority_queue.zig").PriorityQueue; -pub const Progress = @import("progress.zig").Progress; +pub const Progress = @import("Progress.zig"); pub const ResetEvent = @import("reset_event.zig").ResetEvent; pub const SemanticVersion = @import("SemanticVersion.zig"); pub const SinglyLinkedList = @import("linked_list.zig").SinglyLinkedList; diff --git a/src/Compilation.zig b/src/Compilation.zig index ef07de7b17..1c27a589ee 100644 --- a/src/Compilation.zig +++ b/src/Compilation.zig @@ -1378,7 +1378,7 @@ pub fn getAllErrorsAlloc(self: *Compilation) !AllErrors { pub fn performAllTheWork(self: *Compilation) error{ TimerUnsupported, OutOfMemory }!void { var progress: std.Progress = .{}; - var main_progress_node = try progress.start("", null); + var main_progress_node = try progress.start("", 0); defer main_progress_node.end(); if (self.color == .off) progress.terminal = null; @@ -1811,7 +1811,7 @@ fn updateCObject(comp: *Compilation, c_object: *CObject, c_comp_progress_node: * const c_source_basename = 
std.fs.path.basename(c_object.src.src_path); c_comp_progress_node.activate(); - var child_progress_node = c_comp_progress_node.start(c_source_basename, null); + var child_progress_node = c_comp_progress_node.start(c_source_basename, 0); child_progress_node.activate(); defer child_progress_node.end(); diff --git a/src/stage1.zig b/src/stage1.zig index 8576459aa8..7f2d3d732d 100644 --- a/src/stage1.zig +++ b/src/stage1.zig @@ -293,7 +293,7 @@ export fn stage2_progress_start_root( ) *std.Progress.Node { return progress.start( name_ptr[0..name_len], - if (estimated_total_items == 0) null else estimated_total_items, + estimated_total_items, ) catch @panic("timer unsupported"); } @@ -312,7 +312,7 @@ export fn stage2_progress_start( const child_node = std.heap.c_allocator.create(std.Progress.Node) catch @panic("out of memory"); child_node.* = node.start( name_ptr[0..name_len], - if (estimated_total_items == 0) null else estimated_total_items, + estimated_total_items, ); child_node.activate(); return child_node; @@ -333,8 +333,8 @@ export fn stage2_progress_complete_one(node: *std.Progress.Node) void { // ABI warning export fn stage2_progress_update_node(node: *std.Progress.Node, done_count: usize, total_count: usize) void { - node.completed_items = done_count; - node.estimated_total_items = total_count; + node.setCompletedItems(done_count); + node.setEstimatedTotalItems(total_count); node.activate(); node.context.maybeRefresh(); } From e00b6db2aa2e3aa7e61c4b1ccebf4ebdb6e2d45a Mon Sep 17 00:00:00 2001 From: Andrew Kelley Date: Sat, 19 Dec 2020 14:52:12 -0700 Subject: [PATCH 06/10] update stage2 test harness to new std.Progress API --- src/test.zig | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/src/test.zig b/src/test.zig index 795d4f0d65..bc141c94f0 100644 --- a/src/test.zig +++ b/src/test.zig @@ -578,12 +578,12 @@ pub const TestContext = struct { update_node.activate(); defer update_node.end(); - var sync_node = 
update_node.start("write", null); + var sync_node = update_node.start("write", 0); sync_node.activate(); try tmp.dir.writeFile(tmp_src_path, update.src); sync_node.end(); - var module_node = update_node.start("parse/analysis/codegen", null); + var module_node = update_node.start("parse/analysis/codegen", 0); module_node.activate(); try comp.makeBinFileWritable(); try comp.update(); @@ -635,21 +635,21 @@ pub const TestContext = struct { } } } else { - update_node.estimated_total_items = 5; - var emit_node = update_node.start("emit", null); + update_node.setEstimatedTotalItems(5); + var emit_node = update_node.start("emit", 0); emit_node.activate(); var new_zir_module = try zir.emit(allocator, comp.bin_file.options.module.?); defer new_zir_module.deinit(allocator); emit_node.end(); - var write_node = update_node.start("write", null); + var write_node = update_node.start("write", 0); write_node.activate(); var out_zir = std.ArrayList(u8).init(allocator); defer out_zir.deinit(); try new_zir_module.writeToStream(allocator, out_zir.outStream()); write_node.end(); - var test_node = update_node.start("assert", null); + var test_node = update_node.start("assert", 0); test_node.activate(); defer test_node.end(); @@ -666,7 +666,7 @@ pub const TestContext = struct { } }, .Error => |e| { - var test_node = update_node.start("assert", null); + var test_node = update_node.start("assert", 0); test_node.activate(); defer test_node.end(); var handled_errors = try arena.alloc(bool, e.len); @@ -723,9 +723,9 @@ pub const TestContext = struct { .Execution => |expected_stdout| { std.debug.assert(!case.cbe); - update_node.estimated_total_items = 4; + update_node.setEstimatedTotalItems(4); var exec_result = x: { - var exec_node = update_node.start("execute", null); + var exec_node = update_node.start("execute", 0); exec_node.activate(); defer exec_node.end(); @@ -788,7 +788,7 @@ pub const TestContext = struct { .cwd_dir = tmp.dir, }); }; - var test_node = update_node.start("test", null); + 
var test_node = update_node.start("test", 0); test_node.activate(); defer test_node.end(); defer allocator.free(exec_result.stdout); @@ -867,7 +867,7 @@ pub const TestContext = struct { }; { - var load_node = update_node.start("load", null); + var load_node = update_node.start("load", 0); load_node.activate(); defer load_node.end(); @@ -905,7 +905,7 @@ pub const TestContext = struct { } } - var exec_node = update_node.start("execute", null); + var exec_node = update_node.start("execute", 0); exec_node.activate(); defer exec_node.end(); From 4e621d4260ed752995dedc50a240931fc0e0941f Mon Sep 17 00:00:00 2001 From: Andrew Kelley Date: Sat, 19 Dec 2020 15:03:03 -0700 Subject: [PATCH 07/10] workaround for std lib AutoResetEvent bug --- lib/std/auto_reset_event.zig | 44 ++++++++++++++++++------------------ src/Event.zig | 43 +++++++++++++++++++++++++++++++++++ src/ThreadPool.zig | 5 ++++ src/WaitGroup.zig | 24 +++++++++++++------- 4 files changed, 86 insertions(+), 30 deletions(-) create mode 100644 src/Event.zig diff --git a/lib/std/auto_reset_event.zig b/lib/std/auto_reset_event.zig index 7e13dc1aba..3c7e65e362 100644 --- a/lib/std/auto_reset_event.zig +++ b/lib/std/auto_reset_event.zig @@ -11,33 +11,33 @@ const assert = std.debug.assert; /// Similar to std.ResetEvent but on `set()` it also (atomically) does `reset()`. /// Unlike std.ResetEvent, `wait()` can only be called by one thread (MPSC-like). pub const AutoResetEvent = struct { - // AutoResetEvent has 3 possible states: - // - UNSET: the AutoResetEvent is currently unset - // - SET: the AutoResetEvent was notified before a wait() was called - // - : there is an active waiter waiting for a notification. - // - // When attempting to wait: - // if the event is unset, it registers a ResetEvent pointer to be notified when the event is set - // if the event is already set, then it consumes the notification and resets the event. 
- // - // When attempting to notify: - // if the event is unset, then we set the event - // if theres a waiting ResetEvent, then we unset the event and notify the ResetEvent - // - // This ensures that the event is automatically reset after a wait() has been issued - // and avoids the race condition when using std.ResetEvent in the following scenario: - // thread 1 | thread 2 - // std.ResetEvent.wait() | - // | std.ResetEvent.set() - // | std.ResetEvent.set() - // std.ResetEvent.reset() | - // std.ResetEvent.wait() | (missed the second .set() notification above) + /// AutoResetEvent has 3 possible states: + /// - UNSET: the AutoResetEvent is currently unset + /// - SET: the AutoResetEvent was notified before a wait() was called + /// - a `*std.ResetEvent` pointer: there is an active waiter waiting for a notification. + /// + /// When attempting to wait: + /// if the event is unset, it registers a ResetEvent pointer to be notified when the event is set + /// if the event is already set, then it consumes the notification and resets the event. 
+ /// + /// When attempting to notify: + /// if the event is unset, then we set the event + /// if there's a waiting ResetEvent, then we unset the event and notify the ResetEvent + /// + /// This ensures that the event is automatically reset after a wait() has been issued + /// and avoids the race condition when using std.ResetEvent in the following scenario: + /// thread 1 | thread 2 + /// std.ResetEvent.wait() | + /// | std.ResetEvent.set() + /// | std.ResetEvent.set() + /// std.ResetEvent.reset() | + /// std.ResetEvent.wait() | (missed the second .set() notification above) state: usize = UNSET, const UNSET = 0; const SET = 1; - // the minimum alignment for the `*std.ResetEvent` created by wait*() + /// the minimum alignment for the `*std.ResetEvent` created by wait*() const event_align = std.math.max(@alignOf(std.ResetEvent), 2); pub fn wait(self: *AutoResetEvent) void { diff --git a/src/Event.zig b/src/Event.zig new file mode 100644 index 0000000000..2b8d7be998 --- /dev/null +++ b/src/Event.zig @@ -0,0 +1,43 @@ +// SPDX-License-Identifier: MIT +// Copyright (c) 2015-2020 Zig Contributors +// This file is part of [zig](https://ziglang.org/), which is MIT licensed. +// The MIT license requires this copyright notice to be included in all copies +// and substantial portions of the software. 
+const std = @import("std"); +const Event = @This(); + +lock: std.Mutex = .{}, +event: std.ResetEvent = undefined, +state: enum { empty, waiting, notified } = .empty, + +pub fn wait(self: *Event) void { + const held = self.lock.acquire(); + + switch (self.state) { + .empty => { + self.state = .waiting; + self.event = @TypeOf(self.event).init(); + held.release(); + self.event.wait(); + self.event.deinit(); + }, + .waiting => unreachable, + .notified => held.release(), + } +} + +pub fn set(self: *Event) void { + const held = self.lock.acquire(); + + switch (self.state) { + .empty => { + self.state = .notified; + held.release(); + }, + .waiting => { + held.release(); + self.event.set(); + }, + .notified => unreachable, + } +} diff --git a/src/ThreadPool.zig b/src/ThreadPool.zig index 6a59b684be..7d6af3d24c 100644 --- a/src/ThreadPool.zig +++ b/src/ThreadPool.zig @@ -1,3 +1,8 @@ +// SPDX-License-Identifier: MIT +// Copyright (c) 2015-2020 Zig Contributors +// This file is part of [zig](https://ziglang.org/), which is MIT licensed. +// The MIT license requires this copyright notice to be included in all copies +// and substantial portions of the software. const std = @import("std"); const ThreadPool = @This(); diff --git a/src/WaitGroup.zig b/src/WaitGroup.zig index c33d084c28..f17ab580d3 100644 --- a/src/WaitGroup.zig +++ b/src/WaitGroup.zig @@ -1,9 +1,15 @@ +// SPDX-License-Identifier: MIT +// Copyright (c) 2015-2020 Zig Contributors +// This file is part of [zig](https://ziglang.org/), which is MIT licensed. +// The MIT license requires this copyright notice to be included in all copies +// and substantial portions of the software. 
const std = @import("std"); const WaitGroup = @This(); +const Event = @import("Event.zig"); lock: std.Mutex = .{}, counter: usize = 0, -event: std.AutoResetEvent = .{}, +event: Event = .{}, pub fn start(self: *WaitGroup) void { const held = self.lock.acquire(); @@ -22,13 +28,15 @@ pub fn stop(self: *WaitGroup) void { } pub fn wait(self: *WaitGroup) void { - { - const held = self.lock.acquire(); - defer held.release(); + while (true) { + { + const held = self.lock.acquire(); + defer held.release(); - if (self.counter == 0) - return; + if (self.counter == 0) + return; + } + + self.event.wait(); } - - self.event.wait(); } From fbcffe9d5de64c792add092be2af317345a151f6 Mon Sep 17 00:00:00 2001 From: Andrew Kelley Date: Sat, 19 Dec 2020 16:12:33 -0700 Subject: [PATCH 08/10] std.Progress: fix atomic ordering semantics thx king protty --- lib/std/Progress.zig | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/std/Progress.zig b/lib/std/Progress.zig index cd5d5ea79b..ae9b1783be 100644 --- a/lib/std/Progress.zig +++ b/lib/std/Progress.zig @@ -111,7 +111,7 @@ pub const Node = struct { /// Tell the parent node that this node is actively being worked on. Thread-safe. pub fn activate(self: *Node) void { if (self.parent) |parent| { - @atomicStore(?*Node, &parent.recently_updated_child, self, .Monotonic); + @atomicStore(?*Node, &parent.recently_updated_child, self, .Release); } } @@ -251,7 +251,7 @@ fn refreshWithHeldLock(self: *Progress) void { need_ellipse = false; } } - maybe_node = @atomicLoad(?*Node, &node.recently_updated_child, .Monotonic); + maybe_node = @atomicLoad(?*Node, &node.recently_updated_child, .Acquire); } if (need_ellipse) { self.bufWrite(&end, "... 
", .{}); From 10d30838d1887a8585e79ad7a680d3067674c9d9 Mon Sep 17 00:00:00 2001 From: Andrew Kelley Date: Sat, 19 Dec 2020 16:13:28 -0700 Subject: [PATCH 09/10] update WaitGroup to yet another version --- src/WaitGroup.zig | 26 +++++++++++++++----------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/src/WaitGroup.zig b/src/WaitGroup.zig index f17ab580d3..2c1b49224b 100644 --- a/src/WaitGroup.zig +++ b/src/WaitGroup.zig @@ -9,7 +9,7 @@ const Event = @import("Event.zig"); lock: std.Mutex = .{}, counter: usize = 0, -event: Event = .{}, +event: ?*Event = null, pub fn start(self: *WaitGroup) void { const held = self.lock.acquire(); @@ -19,24 +19,28 @@ pub fn start(self: *WaitGroup) void { } pub fn stop(self: *WaitGroup) void { + var event: ?*Event = null; + defer if (event) |waiter| + waiter.set(); + const held = self.lock.acquire(); defer held.release(); self.counter -= 1; if (self.counter == 0) - self.event.set(); + std.mem.swap(?*Event, &self.event, &event); } pub fn wait(self: *WaitGroup) void { - while (true) { - { - const held = self.lock.acquire(); - defer held.release(); + var event = Event{}; + var has_event = false; + defer if (has_event) + event.wait(); - if (self.counter == 0) - return; - } + const held = self.lock.acquire(); + defer held.release(); - self.event.wait(); - } + has_event = self.counter != 0; + if (has_event) + self.event = &event; } From 1d94a6893689ad1fb8e06308ae51603a6c8708a8 Mon Sep 17 00:00:00 2001 From: Andrew Kelley Date: Sun, 20 Dec 2020 15:37:58 -0700 Subject: [PATCH 10/10] add an option to compile zig in single-threaded mode And enable it for Drone CI. I hate to do this, but I need to make progress on other fronts. 
--- CMakeLists.txt | 10 ++++++++++ ci/drone/linux_script | 3 ++- src/Compilation.zig | 12 +++++++----- src/ThreadPool.zig | 6 ++++++ src/stage1/zig0.cpp | 4 ++++ 5 files changed, 29 insertions(+), 6 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 2580c06066..e3c4e67c5a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -75,6 +75,7 @@ set(ZIG_TARGET_TRIPLE "native" CACHE STRING "arch-os-abi to output binaries for" set(ZIG_TARGET_MCPU "baseline" CACHE STRING "-mcpu parameter to output binaries for") set(ZIG_EXECUTABLE "" CACHE STRING "(when cross compiling) path to already-built zig binary") set(ZIG_PREFER_LLVM_CONFIG off CACHE BOOL "(when cross compiling) use llvm-config to find target llvm dependencies if needed") +set(ZIG_SINGLE_THREADED off CACHE BOOL "limit the zig compiler to use only 1 thread") find_package(llvm) find_package(clang) @@ -510,10 +511,13 @@ set(ZIG_STAGE2_SOURCES "${CMAKE_SOURCE_DIR}/src/Cache.zig" "${CMAKE_SOURCE_DIR}/src/Compilation.zig" "${CMAKE_SOURCE_DIR}/src/DepTokenizer.zig" + "${CMAKE_SOURCE_DIR}/src/Event.zig" "${CMAKE_SOURCE_DIR}/src/Module.zig" "${CMAKE_SOURCE_DIR}/src/Package.zig" "${CMAKE_SOURCE_DIR}/src/RangeSet.zig" + "${CMAKE_SOURCE_DIR}/src/ThreadPool.zig" "${CMAKE_SOURCE_DIR}/src/TypedValue.zig" + "${CMAKE_SOURCE_DIR}/src/WaitGroup.zig" "${CMAKE_SOURCE_DIR}/src/astgen.zig" "${CMAKE_SOURCE_DIR}/src/clang.zig" "${CMAKE_SOURCE_DIR}/src/clang_options.zig" @@ -713,6 +717,11 @@ if("${CMAKE_BUILD_TYPE}" STREQUAL "Debug") else() set(ZIG1_RELEASE_ARG -OReleaseFast --strip) endif() +if(ZIG_SINGLE_THREADED) + set(ZIG1_SINGLE_THREADED_ARG "--single-threaded") +else() + set(ZIG1_SINGLE_THREADED_ARG "") +endif() set(BUILD_ZIG1_ARGS "src/stage1.zig" @@ -722,6 +731,7 @@ set(BUILD_ZIG1_ARGS --override-lib-dir "${CMAKE_SOURCE_DIR}/lib" "-femit-bin=${ZIG1_OBJECT}" "${ZIG1_RELEASE_ARG}" + "${ZIG1_SINGLE_THREADED_ARG}" -lc --pkg-begin build_options "${ZIG_CONFIG_ZIG_OUT}" --pkg-end diff --git a/ci/drone/linux_script 
b/ci/drone/linux_script index fdc1704fb7..8c5dc1be2a 100755 --- a/ci/drone/linux_script +++ b/ci/drone/linux_script @@ -17,7 +17,8 @@ git config core.abbrev 9 mkdir build cd build -cmake .. -DCMAKE_BUILD_TYPE=Release "-DCMAKE_INSTALL_PREFIX=$DISTDIR" -DZIG_STATIC=ON -DCMAKE_PREFIX_PATH=/deps/local -GNinja +# TODO figure out why Drone CI is deadlocking and stop passing -DZIG_SINGLE_THREADED=ON +cmake .. -DCMAKE_BUILD_TYPE=Release "-DCMAKE_INSTALL_PREFIX=$DISTDIR" -DZIG_STATIC=ON -DCMAKE_PREFIX_PATH=/deps/local -GNinja -DZIG_SINGLE_THREADED=ON samu install ./zig build test -Dskip-release -Dskip-non-native diff --git a/src/Compilation.zig b/src/Compilation.zig index 1c27a589ee..23f67f5b37 100644 --- a/src/Compilation.zig +++ b/src/Compilation.zig @@ -1728,7 +1728,7 @@ fn workerUpdateCObject( error.AnalysisFail => return, else => { { - var lock = comp.mutex.acquire(); + const lock = comp.mutex.acquire(); defer lock.release(); comp.failed_c_objects.ensureCapacity(comp.gpa, comp.failed_c_objects.items().len + 1) catch { fatal("TODO handle this by setting c_object.status = oom failure", .{}); @@ -1759,7 +1759,7 @@ fn updateCObject(comp: *Compilation, c_object: *CObject, c_comp_progress_node: * if (c_object.clearStatus(comp.gpa)) { // There was previous failure. 
- var lock = comp.mutex.acquire(); + const lock = comp.mutex.acquire(); defer lock.release(); comp.failed_c_objects.removeAssertDiscard(c_object); } @@ -1789,10 +1789,12 @@ fn updateCObject(comp: *Compilation, c_object: *CObject, c_comp_progress_node: * { const is_collision = blk: { - var lock = comp.mutex.acquire(); + const bin_digest = man.hash.peekBin(); + + const lock = comp.mutex.acquire(); defer lock.release(); - const gop = try comp.c_object_cache_digest_set.getOrPut(comp.gpa, man.hash.peekBin()); + const gop = try comp.c_object_cache_digest_set.getOrPut(comp.gpa, bin_digest); break :blk gop.found_existing; }; if (is_collision) { @@ -2211,7 +2213,7 @@ fn failCObj(comp: *Compilation, c_object: *CObject, comptime format: []const u8, fn failCObjWithOwnedErrorMsg(comp: *Compilation, c_object: *CObject, err_msg: *ErrorMsg) InnerError { { - var lock = comp.mutex.acquire(); + const lock = comp.mutex.acquire(); defer lock.release(); { errdefer err_msg.destroy(comp.gpa); diff --git a/src/ThreadPool.zig b/src/ThreadPool.zig index 7d6af3d24c..00cb26772a 100644 --- a/src/ThreadPool.zig +++ b/src/ThreadPool.zig @@ -25,6 +25,8 @@ pub fn init(self: *ThreadPool, allocator: *std.mem.Allocator) !void { .allocator = allocator, .threads = &[_]*std.Thread{}, }; + if (std.builtin.single_threaded) + return; errdefer self.deinit(); @@ -67,6 +69,10 @@ pub fn shutdown(self: *ThreadPool) void { } pub fn spawn(self: *ThreadPool, comptime func: anytype, args: anytype) !void { + if (std.builtin.single_threaded) { + @call(.{}, func, args); + return; + } const Args = @TypeOf(args); const Closure = struct { arguments: Args, diff --git a/src/stage1/zig0.cpp b/src/stage1/zig0.cpp index 73f5b4f685..bcc9dbc00a 100644 --- a/src/stage1/zig0.cpp +++ b/src/stage1/zig0.cpp @@ -266,6 +266,7 @@ int main(int argc, char **argv) { TargetSubsystem subsystem = TargetSubsystemAuto; const char *override_lib_dir = nullptr; const char *mcpu = nullptr; + bool single_threaded = false; for (int i = 1; i < argc; i 
+= 1) { char *arg = argv[i]; @@ -281,6 +282,8 @@ int main(int argc, char **argv) { optimize_mode = BuildModeSafeRelease; } else if (strcmp(arg, "-OReleaseSmall") == 0) { optimize_mode = BuildModeSmallRelease; + } else if (strcmp(arg, "--single-threaded") == 0) { + single_threaded = true; } else if (strcmp(arg, "--help") == 0) { return print_full_usage(arg0, stdout, EXIT_SUCCESS); } else if (strcmp(arg, "--strip") == 0) { @@ -469,6 +472,7 @@ int main(int argc, char **argv) { stage1->link_libcpp = link_libcpp; stage1->subsystem = subsystem; stage1->pic = true; + stage1->is_single_threaded = single_threaded; zig_stage1_build_object(stage1);