Mirror of https://github.com/ziglang/zig.git, synced 2025-12-06 22:33:08 +00:00
While still preserving the guarantee that async() is assigned a unit of concurrency (or immediately runs the task), this change:

* retains the error from calling getCpuCount()
* spawns all threads in detached mode, using WaitGroup to join them (sketched below)
* treats all workers the same regardless of whether they are processing concurrent or async tasks: one thread pool does all the work, while respecting the async and concurrent limits.
88 lines
2.5 KiB
Zig
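A minimal sketch of the detach-and-join pattern described in the commit message (the worker function and the counter are invented for illustration; this is not the actual std.Thread.Pool code). Each spawned thread is detached immediately, so the WaitGroup is the only join point, and the getCpuCount() error is propagated rather than swallowed.

const std = @import("std");

// Hypothetical worker standing in for a pool task; only the WaitGroup calls
// mirror what the pool does.
fn worker(wg: *std.Thread.WaitGroup, counter: *std.atomic.Value(usize)) void {
    defer wg.finish();
    _ = counter.fetchAdd(1, .monotonic);
}

pub fn main() !void {
    // Keep (propagate) the error from getCpuCount() instead of swallowing it.
    const n_jobs = try std.Thread.getCpuCount();

    var wg: std.Thread.WaitGroup = .{};
    var counter = std.atomic.Value(usize).init(0);

    for (0..n_jobs) |_| {
        wg.start();
        const t = std.Thread.spawn(.{}, worker, .{ &wg, &counter }) catch {
            // If a thread cannot be spawned, run the task inline so the
            // pending count still reaches zero.
            worker(&wg, &counter);
            continue;
        };
        // Detached threads cannot be joined with Thread.join(); the
        // WaitGroup wait() below is the only join point.
        t.detach();
    }

    wg.wait();
    std.debug.print("{d} detached workers finished\n", .{counter.load(.monotonic)});
}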
const builtin = @import("builtin");
const std = @import("std");
const assert = std.debug.assert;
const WaitGroup = @This();

// Both values are packed into one atomic word: bit 0 records whether a
// waiter is blocked in wait(), and the remaining bits count pending tasks.
const is_waiting: usize = 1 << 0;
const one_pending: usize = 1 << 1;

state: std.atomic.Value(usize) = std.atomic.Value(usize).init(0),
event: std.Thread.ResetEvent = .unset,

pub fn start(self: *WaitGroup) void {
    return startStateless(&self.state);
}

pub fn startStateless(state: *std.atomic.Value(usize)) void {
    const prev_state = state.fetchAdd(one_pending, .monotonic);
    assert((prev_state / one_pending) < (std.math.maxInt(usize) / one_pending));
}

pub fn startMany(self: *WaitGroup, n: usize) void {
    const state = self.state.fetchAdd(one_pending * n, .monotonic);
    assert((state / one_pending) < (std.math.maxInt(usize) / one_pending));
}

pub fn finish(self: *WaitGroup) void {
    const state = self.state.fetchSub(one_pending, .acq_rel);
    assert((state / one_pending) > 0);

    // This was the last pending task and a waiter is blocked: wake it.
    if (state == (one_pending | is_waiting)) {
        self.event.set();
    }
}

pub fn finishStateless(state: *std.atomic.Value(usize), event: *std.Thread.ResetEvent) void {
    const prev_state = state.fetchSub(one_pending, .acq_rel);
    assert((prev_state / one_pending) > 0);
    if (prev_state == (one_pending | is_waiting)) event.set();
}

pub fn wait(wg: *WaitGroup) void {
    return waitStateless(&wg.state, &wg.event);
}

pub fn waitStateless(state: *std.atomic.Value(usize), event: *std.Thread.ResetEvent) void {
    // Publish the waiter flag; block only if tasks are still pending.
    const prev_state = state.fetchAdd(is_waiting, .acquire);
    assert(prev_state & is_waiting == 0);
    if ((prev_state / one_pending) > 0) event.wait();
}

pub fn reset(self: *WaitGroup) void {
    self.state.store(0, .monotonic);
    self.event.reset();
}

pub fn isDone(wg: *WaitGroup) bool {
    const state = wg.state.load(.acquire);
    assert(state & is_waiting == 0);

    return (state / one_pending) == 0;
}

pub fn value(wg: *WaitGroup) usize {
    return wg.state.load(.monotonic) / one_pending;
}

// Spawns a new thread for the task. This is appropriate when the callee
// delegates all work.
pub fn spawnManager(
    wg: *WaitGroup,
    comptime func: anytype,
    args: anytype,
) void {
    if (builtin.single_threaded) {
        @call(.auto, func, args);
        return;
    }
    const Manager = struct {
        fn run(wg_inner: *WaitGroup, args_inner: @TypeOf(args)) void {
            defer wg_inner.finish();
            @call(.auto, func, args_inner);
        }
    };

    wg.start();
    // If the thread cannot be spawned, run the task inline on the calling thread.
    const t = std.Thread.spawn(.{}, Manager.run, .{ wg, args }) catch return Manager.run(wg, args);
    t.detach();
}
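Below is a minimal sketch of how spawnManager might be used (the buildAll task and the done flag are hypothetical and not part of this file). spawnManager pairs start() with a deferred finish() around the call, spawns a detached thread when it can, and otherwise runs the task inline, so the caller only has to wait().

const std = @import("std");
const WaitGroup = std.Thread.WaitGroup;

// Hypothetical manager task: in the intended use it would mostly delegate
// work and block waiting for it, which is why it gets its own thread.
fn buildAll(done: *std.atomic.Value(bool)) void {
    // ... delegate work, wait for it, then report completion ...
    done.store(true, .release);
}

pub fn main() void {
    var wg: WaitGroup = .{};
    var done = std.atomic.Value(bool).init(false);

    wg.spawnManager(buildAll, .{&done});

    wg.wait(); // blocks until buildAll's deferred finish() runs
    std.debug.assert(done.load(.acquire));
}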