Mirror of https://github.com/ziglang/zig.git, synced 2025-12-24 15:13:08 +00:00
compiler_rt_lib and compiler_rt_obj are extracted from the generic JobQueue into simple boolean flags and then handled explicitly inside performAllTheWork(). This commit also introduces generic handling of allocation failure and makes setMiscFailure no longer return a possible error.

Building the compiler-rt static library now takes advantage of Compilation's ThreadPool. This introduced a problem, however: each of compiler-rt's object files now performs AstGen for the full standard library plus the compiler-rt files. Even though all of these runs end up as cache hits except for the first ones, it is wasteful: O(N*M), where N is the number of compilation units inside compiler-rt and M is the number of .zig files in the standard library and compiler-rt combined. More importantly, it causes a deadlock, because each thread takes a file system lock while doing AstGen on files, and the threads end up waiting for each other. This will need to be handled with a process-level file caching system, or some other creative solution.
156 lines
4.1 KiB
Zig
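To give the commit message above some concrete shape, here is a minimal sketch of how per-object compiler-rt jobs might be queued onto this pool and then drained with waitAndWork. The buildCompilerRtObject and buildAllObjects names are hypothetical stand-ins, and the WaitGroup start()/finish() calls are assumed from WaitGroup.zig; the real Compilation code is considerably more involved.

const std = @import("std");
const ThreadPool = @import("ThreadPool.zig");
const WaitGroup = @import("WaitGroup.zig");

// Hypothetical per-object build step, standing in for whatever Compilation
// actually does for each compiler-rt compilation unit.
fn buildCompilerRtObject(name: []const u8, wait_group: *WaitGroup) void {
    defer wait_group.finish();
    std.log.info("building {s}", .{name});
}

// Hypothetical driver: one pool job per compilation unit, then wait on all of them.
fn buildAllObjects(pool: *ThreadPool, names: []const []const u8) !void {
    var wait_group: WaitGroup = .{}; // assumes WaitGroup supports default initialization
    for (names) |name| {
        wait_group.start();
        try pool.spawn(buildCompilerRtObject, .{ name, &wait_group });
    }
    // The calling thread helps run queued jobs while it waits.
    pool.waitAndWork(&wait_group);
}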
const std = @import("std");
const builtin = @import("builtin");
const ThreadPool = @This();
const WaitGroup = @import("WaitGroup.zig");

mutex: std.Thread.Mutex = .{},
cond: std.Thread.Condition = .{},
run_queue: RunQueue = .{},
is_running: bool = true,
allocator: std.mem.Allocator,
threads: []std.Thread,

const RunQueue = std.SinglyLinkedList(Runnable);
const Runnable = struct {
    runFn: RunProto,
};

// stage1 spells function pointers as bare `fn` types; the self-hosted
// backends require an explicit `*const fn` pointer type.
const RunProto = switch (builtin.zig_backend) {
    .stage1 => fn (*Runnable) void,
    else => *const fn (*Runnable) void,
};

pub fn init(pool: *ThreadPool, allocator: std.mem.Allocator) !void {
    pool.* = .{
        .allocator = allocator,
        .threads = &[_]std.Thread{},
    };

    if (builtin.single_threaded) {
        return;
    }

    const thread_count = std.math.max(1, std.Thread.getCpuCount() catch 1);
    pool.threads = try allocator.alloc(std.Thread, thread_count);
    errdefer allocator.free(pool.threads);

    // kill and join any threads we spawned previously on error.
    var spawned: usize = 0;
    errdefer pool.join(spawned);

    for (pool.threads) |*thread| {
        thread.* = try std.Thread.spawn(.{}, worker, .{pool});
        spawned += 1;
    }
}

pub fn deinit(pool: *ThreadPool) void {
    pool.join(pool.threads.len); // kill and join all threads.
    pool.* = undefined;
}

fn join(pool: *ThreadPool, spawned: usize) void {
    if (builtin.single_threaded) {
        return;
    }

    {
        pool.mutex.lock();
        defer pool.mutex.unlock();

        // ensure future worker threads exit the dequeue loop
        pool.is_running = false;
    }

    // wake up any sleeping threads (this can be done outside the mutex)
    // then wait for all the threads we know are spawned to complete.
    pool.cond.broadcast();
    for (pool.threads[0..spawned]) |thread| {
        thread.join();
    }

    pool.allocator.free(pool.threads);
}

/// Queues `func` to run asynchronously on one of the pool's worker threads.
/// In single-threaded builds the function is instead called directly on the
/// calling thread.
pub fn spawn(pool: *ThreadPool, comptime func: anytype, args: anytype) !void {
    if (builtin.single_threaded) {
        @call(.{}, func, args);
        return;
    }

    const Args = @TypeOf(args);
    const Closure = struct {
        arguments: Args,
        pool: *ThreadPool,
        run_node: RunQueue.Node = .{ .data = .{ .runFn = runFn } },

        fn runFn(runnable: *Runnable) void {
            const run_node = @fieldParentPtr(RunQueue.Node, "data", runnable);
            const closure = @fieldParentPtr(@This(), "run_node", run_node);
            @call(.{}, func, closure.arguments);

            // The thread pool's allocator is protected by the mutex.
            const mutex = &closure.pool.mutex;
            mutex.lock();
            defer mutex.unlock();

            closure.pool.allocator.destroy(closure);
        }
    };

    {
        pool.mutex.lock();
        defer pool.mutex.unlock();

        const closure = try pool.allocator.create(Closure);
        closure.* = .{
            .arguments = args,
            .pool = pool,
        };

        pool.run_queue.prepend(&closure.run_node);
    }

    // Notify waiting threads outside the lock to try and keep the critical section small.
    pool.cond.signal();
}

fn worker(pool: *ThreadPool) void {
    pool.mutex.lock();
    defer pool.mutex.unlock();

    while (true) {
        while (pool.run_queue.popFirst()) |run_node| {
            // Temporarily unlock the mutex in order to execute the run_node
            pool.mutex.unlock();
            defer pool.mutex.lock();

            const runFn = run_node.data.runFn;
            runFn(&run_node.data);
        }

        // Stop executing instead of waiting if the thread pool is no longer running.
        if (pool.is_running) {
            pool.cond.wait(&pool.mutex);
        } else {
            break;
        }
    }
}

/// Blocks until every task tracked by `wait_group` has finished. While
/// waiting, the calling thread also pops and runs queued tasks, so progress
/// is made even when all worker threads are busy.
pub fn waitAndWork(pool: *ThreadPool, wait_group: *WaitGroup) void {
    while (!wait_group.isDone()) {
        if (blk: {
            pool.mutex.lock();
            defer pool.mutex.unlock();
            break :blk pool.run_queue.popFirst();
        }) |run_node| {
            run_node.data.runFn(&run_node.data);
            continue;
        }

        wait_group.wait();
        return;
    }
}