std.Thread: ResetEvent improvements (#11523)
* std: start removing redundant ResetEvents
* src: fix other uses of std.Thread.ResetEvent
* src: add builtin.sanitize_thread for tsan detection
* atomic: add Atomic.fence for proper fencing with tsan
* Thread: remove the other ResetEvents and rewrite the current one
* Thread: ResetEvent docs
* zig fmt + WaitGroup.reset() fix
* src: fix build issues for ResetEvent + tsan
* Thread: ResetEvent tests
* Thread: ResetEvent module doc
* Atomic: replace llvm *p memory constraint with *m
* panicking: handle spurious wakeups in futex.wait() when waiting for abort()
* zig fmt
parent 50f1856476
commit 18f3034629
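For context, the commit replaces the old failable, runtime-initialized ResetEvent with a statically initializable one. A minimal sketch of the new API shape, assuming the `std.Thread.ResetEvent` shown in the diff below (the test itself is not part of the commit):

const std = @import("std");

fn producer(data: *u32, ready: *std.Thread.ResetEvent) void {
    data.* = 42; // happens-before the set() below
    ready.set(); // wakes any thread blocked in wait()
}

test "ResetEvent usage sketch" {
    var ready = std.Thread.ResetEvent{}; // static init; no init()/deinit() anymore
    var data: u32 = 0;
    const t = try std.Thread.spawn(.{}, producer, .{ &data, &ready });
    defer t.join();
    ready.wait(); // blocks until set(); writes before set() are visible after
    try std.testing.expectEqual(@as(u32, 42), data);
    ready.reset(); // back to unset for reuse
}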
@@ -533,11 +533,9 @@ set(ZIG_STAGE2_SOURCES
"${CMAKE_SOURCE_DIR}/lib/std/target/wasm.zig"
"${CMAKE_SOURCE_DIR}/lib/std/target/x86.zig"
"${CMAKE_SOURCE_DIR}/lib/std/Thread.zig"
"${CMAKE_SOURCE_DIR}/lib/std/Thread/AutoResetEvent.zig"
"${CMAKE_SOURCE_DIR}/lib/std/Thread/Futex.zig"
"${CMAKE_SOURCE_DIR}/lib/std/Thread/Mutex.zig"
"${CMAKE_SOURCE_DIR}/lib/std/Thread/ResetEvent.zig"
"${CMAKE_SOURCE_DIR}/lib/std/Thread/StaticResetEvent.zig"
"${CMAKE_SOURCE_DIR}/lib/std/time.zig"
"${CMAKE_SOURCE_DIR}/lib/std/treap.zig"
"${CMAKE_SOURCE_DIR}/lib/std/unicode.zig"
@@ -10,10 +10,8 @@ const assert = std.debug.assert;
const target = builtin.target;
const Atomic = std.atomic.Atomic;

pub const AutoResetEvent = @import("Thread/AutoResetEvent.zig");
pub const Futex = @import("Thread/Futex.zig");
pub const ResetEvent = @import("Thread/ResetEvent.zig");
pub const StaticResetEvent = @import("Thread/StaticResetEvent.zig");
pub const Mutex = @import("Thread/Mutex.zig");
pub const Semaphore = @import("Thread/Semaphore.zig");
pub const Condition = @import("Thread/Condition.zig");
@@ -1078,17 +1076,13 @@ test "setName, getName" {
if (builtin.single_threaded) return error.SkipZigTest;

const Context = struct {
start_wait_event: ResetEvent = undefined,
test_done_event: ResetEvent = undefined,
start_wait_event: ResetEvent = .{},
test_done_event: ResetEvent = .{},
thread_done_event: ResetEvent = .{},

done: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(false),
thread: Thread = undefined,

fn init(self: *@This()) !void {
try self.start_wait_event.init();
try self.test_done_event.init();
}

pub fn run(ctx: *@This()) !void {
// Wait for the main thread to have set the thread field in the context.
ctx.start_wait_event.wait();
@@ -1104,16 +1098,14 @@ test "setName, getName" {
// Signal our test is done
ctx.test_done_event.set();

while (!ctx.done.load(.SeqCst)) {
std.time.sleep(5 * std.time.ns_per_ms);
}
// wait for the thread to properly exit
ctx.thread_done_event.wait();
}
};

var context = Context{};
try context.init();

var thread = try spawn(.{}, Context.run, .{&context});

context.thread = thread;
context.start_wait_event.set();
context.test_done_event.wait();
@@ -1139,16 +1131,14 @@ test "setName, getName" {
},
}

context.done.store(true, .SeqCst);
context.thread_done_event.set();
thread.join();
}

test "std.Thread" {
// Doesn't use testing.refAllDecls() since that would pull in the compileError spinLoopHint.
_ = AutoResetEvent;
_ = Futex;
_ = ResetEvent;
_ = StaticResetEvent;
_ = Mutex;
_ = Semaphore;
_ = Condition;
@@ -1163,9 +1153,7 @@ test "Thread.join" {
if (builtin.single_threaded) return error.SkipZigTest;

var value: usize = 0;
var event: ResetEvent = undefined;
try event.init();
defer event.deinit();
var event = ResetEvent{};

const thread = try Thread.spawn(.{}, testIncrementNotify, .{ &value, &event });
thread.join();
@@ -1177,9 +1165,7 @@ test "Thread.detach" {
if (builtin.single_threaded) return error.SkipZigTest;

var value: usize = 0;
var event: ResetEvent = undefined;
try event.init();
defer event.deinit();
var event = ResetEvent{};

const thread = try Thread.spawn(.{}, testIncrementNotify, .{ &value, &event });
thread.detach();
@@ -1,222 +0,0 @@
//! Similar to `StaticResetEvent` but on `set()` it also (atomically) does `reset()`.
//! Unlike StaticResetEvent, `wait()` can only be called by one thread (MPSC-like).
//!
//! AutoResetEvent has 3 possible states:
//! - UNSET: the AutoResetEvent is currently unset
//! - SET: the AutoResetEvent was notified before a wait() was called
//! - <StaticResetEvent pointer>: there is an active waiter waiting for a notification.
//!
//! When attempting to wait:
//! if the event is unset, it registers a ResetEvent pointer to be notified when the event is set
//! if the event is already set, then it consumes the notification and resets the event.
//!
//! When attempting to notify:
//! if the event is unset, then we set the event
//! if there's a waiting ResetEvent, then we unset the event and notify the ResetEvent
//!
//! This ensures that the event is automatically reset after a wait() has been issued
//! and avoids the race condition when using StaticResetEvent in the following scenario:
//! thread 1 | thread 2
//! StaticResetEvent.wait() |
//! | StaticResetEvent.set()
//! | StaticResetEvent.set()
//! StaticResetEvent.reset() |
//! StaticResetEvent.wait() | (missed the second .set() notification above)

state: usize = UNSET,

const std = @import("../std.zig");
const builtin = @import("builtin");
const testing = std.testing;
const assert = std.debug.assert;
const StaticResetEvent = std.Thread.StaticResetEvent;
const AutoResetEvent = @This();

const UNSET = 0;
const SET = 1;

/// the minimum alignment for the `*StaticResetEvent` created by wait*()
const event_align = std.math.max(@alignOf(StaticResetEvent), 2);

pub fn wait(self: *AutoResetEvent) void {
self.waitFor(null) catch unreachable;
}

pub fn timedWait(self: *AutoResetEvent, timeout: u64) error{TimedOut}!void {
return self.waitFor(timeout);
}

fn waitFor(self: *AutoResetEvent, timeout: ?u64) error{TimedOut}!void {
// lazily initialized StaticResetEvent
var reset_event: StaticResetEvent align(event_align) = undefined;
var has_reset_event = false;

var state = @atomicLoad(usize, &self.state, .SeqCst);
while (true) {
// consume a notification if there is any
if (state == SET) {
@atomicStore(usize, &self.state, UNSET, .SeqCst);
return;
}

// check if there's currently a pending ResetEvent pointer already registered
if (state != UNSET) {
unreachable; // multiple waiting threads on the same AutoResetEvent
}

// lazily initialize the ResetEvent if it hasn't been already
if (!has_reset_event) {
has_reset_event = true;
reset_event = .{};
}

// Since the AutoResetEvent currently isn't set,
// try to register our ResetEvent on it to wait
// for a set() call from another thread.
if (@cmpxchgWeak(
usize,
&self.state,
UNSET,
@ptrToInt(&reset_event),
.SeqCst,
.SeqCst,
)) |new_state| {
state = new_state;
continue;
}

// if no timeout was specified, then just wait forever
const timeout_ns = timeout orelse {
reset_event.wait();
return;
};

// wait with a timeout and return if signalled via set()
switch (reset_event.timedWait(timeout_ns)) {
.event_set => return,
.timed_out => {},
}

// If we timed out, we need to transition the AutoResetEvent back to UNSET.
// If we don't, then when we return, a set() thread could observe a pointer to an invalid ResetEvent.
state = @cmpxchgStrong(
usize,
&self.state,
@ptrToInt(&reset_event),
UNSET,
.SeqCst,
.SeqCst,
) orelse return error.TimedOut;

// We didn't manage to unregister ourselves from the state.
if (state == SET) {
unreachable; // AutoResetEvent notified without waking up the waiting thread
} else if (state != UNSET) {
unreachable; // multiple waiting threads on the same AutoResetEvent observed when timing out
}

// This means a set() thread saw our ResetEvent pointer, acquired it, and is trying to wake it up.
// We need to wait for it to wake up our ResetEvent before we can return and invalidate it.
// We don't return error.TimedOut here as it technically notified us while we were "timing out".
reset_event.wait();
return;
}
}

pub fn set(self: *AutoResetEvent) void {
var state = @atomicLoad(usize, &self.state, .SeqCst);
while (true) {
// If the AutoResetEvent is already set, there is nothing else left to do
if (state == SET) {
return;
}

// If the AutoResetEvent isn't set,
// then try to leave a notification for the wait() thread that we set() it.
if (state == UNSET) {
state = @cmpxchgWeak(
usize,
&self.state,
UNSET,
SET,
.SeqCst,
.SeqCst,
) orelse return;
continue;
}

// There is a ResetEvent pointer registered on the AutoResetEvent that's waiting.
// Try to acquire ownership of it so that we can wake it up.
// This also resets the AutoResetEvent so that there is no race condition as defined above.
if (@cmpxchgWeak(
usize,
&self.state,
state,
UNSET,
.SeqCst,
.SeqCst,
)) |new_state| {
state = new_state;
continue;
}

const reset_event = @intToPtr(*align(event_align) StaticResetEvent, state);
reset_event.set();
return;
}
}

test "basic usage" {
// test local code paths
{
var event = AutoResetEvent{};
try testing.expectError(error.TimedOut, event.timedWait(1));
event.set();
event.wait();
}

// test cross-thread signaling
if (builtin.single_threaded)
return;

const Context = struct {
value: u128 = 0,
in: AutoResetEvent = AutoResetEvent{},
out: AutoResetEvent = AutoResetEvent{},

const Self = @This();

fn sender(self: *Self) !void {
try testing.expect(self.value == 0);
self.value = 1;
self.out.set();

self.in.wait();
try testing.expect(self.value == 2);
self.value = 3;
self.out.set();

self.in.wait();
try testing.expect(self.value == 4);
}

fn receiver(self: *Self) !void {
self.out.wait();
try testing.expect(self.value == 1);
self.value = 2;
self.in.set();

self.out.wait();
try testing.expect(self.value == 3);
self.value = 4;
self.in.set();
}
};

var context = Context{};
const send_thread = try std.Thread.spawn(.{}, Context.sender, .{&context});
const recv_thread = try std.Thread.spawn(.{}, Context.receiver, .{&context});

send_thread.join();
recv_thread.join();
}
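With AutoResetEvent deleted, a single consumer can recover the same consume-on-wait behavior from the rewritten ResetEvent shown later in this diff. A minimal sketch under that assumption (the shim type and its names are mine, not from the commit); note that reset() while other threads are blocked in wait() is undefined behavior, hence the single-waiter restriction:

const std = @import("std");

/// Auto-reset shim for exactly one waiting thread, built on the new
/// std.Thread.ResetEvent. Only one thread may ever call waitAndReset().
const AutoEvent = struct {
    event: std.Thread.ResetEvent = .{},

    fn set(self: *AutoEvent) void {
        self.event.set();
    }

    fn waitAndReset(self: *AutoEvent) void {
        self.event.wait(); // block until set()
        self.event.reset(); // consume the notification, like AutoResetEvent did
    }
};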
@@ -809,7 +809,7 @@ const PosixImpl = struct {
//
// The pending count increment in wait() must also now use SeqCst for the update + this pending load
// to be in the same modification order as our load isn't using Release/Acquire to guarantee it.
std.atomic.fence(.SeqCst);
bucket.pending.fence(.SeqCst);
if (bucket.pending.load(.Monotonic) == 0) {
return;
}
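The change above swaps a free-standing std.atomic.fence(.SeqCst) for the new Atomic.fence introduced later in this diff, which ties the ordering to a specific atomic so ThreadSanitizer can model it. A minimal sketch of the store-then-check pattern it protects (names are hypothetical, not from the commit):

const std = @import("std");
const Atomic = std.atomic.Atomic;

var flag = Atomic(u32).init(0);
var pending = Atomic(u32).init(0);

fn signalIfPending() void {
    flag.store(1, .Monotonic);
    // The SeqCst fence keeps the store above from being reordered past the
    // Monotonic load below (a store-load reordering that Release/Acquire
    // alone would not rule out), and is attributed to `flag` for tsan.
    flag.fence(.SeqCst);
    if (pending.load(.Monotonic) == 0) return;
    std.Thread.Futex.wake(&pending, 1);
}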
@@ -1,291 +1,281 @@
//! A thread-safe resource which supports blocking until signaled.
//! This API is for kernel threads, not evented I/O.
//! This API requires being initialized at runtime, and initialization
//! can fail. Once initialized, the core operations cannot fail.
//! If you need an abstraction that cannot fail to be initialized, see
//! `std.Thread.StaticResetEvent`. However if you can handle initialization failure,
//! it is preferred to use `ResetEvent`.
//! ResetEvent is a thread-safe bool which can be set to true/false ("set"/"unset").
//! It can also block threads until the "bool" is set with cancellation via timed waits.
//! ResetEvent can be statically initialized and is at most `@sizeOf(u64)` large.

const ResetEvent = @This();
const std = @import("../std.zig");
const builtin = @import("builtin");
const testing = std.testing;
const assert = std.debug.assert;
const c = std.c;
const ResetEvent = @This();

const os = std.os;
const time = std.time;
const assert = std.debug.assert;
const testing = std.testing;
const Atomic = std.atomic.Atomic;
const Futex = std.Thread.Futex;

impl: Impl,
impl: Impl = .{},
pub const Impl = if (builtin.single_threaded)
std.Thread.StaticResetEvent.DebugEvent
else if (builtin.target.isDarwin())
DarwinEvent
else if (std.Thread.use_pthreads)
PosixEvent
/// Returns whether the ResetEvent was set().
/// Once reset() is called, this returns false until the next set().
/// The memory accesses before the set() can be said to happen before isSet() returns true.
pub fn isSet(self: *const ResetEvent) bool {
return self.impl.isSet();
}

/// Blocks the caller's thread until the ResetEvent is set().
/// This is effectively a more efficient version of `while (!isSet()) {}`.
/// The memory accesses before the set() can be said to happen before wait() returns.
pub fn wait(self: *ResetEvent) void {
self.impl.wait(null) catch |err| switch (err) {
error.Timeout => unreachable, // no timeout provided so we shouldn't have timed-out
};
}

/// Blocks the caller's thread until the ResetEvent is set(), or until the corresponding timeout expires.
/// If the timeout expires before the ResetEvent is set, `error.Timeout` is returned.
/// This is effectively a more efficient version of `while (!isSet()) {}`.
/// The memory accesses before the set() can be said to happen before timedWait() returns without error.
pub fn timedWait(self: *ResetEvent, timeout_ns: u64) error{Timeout}!void {
return self.impl.wait(timeout_ns);
}

/// Marks the ResetEvent as "set" and unblocks any threads in `wait()` or `timedWait()` to observe the new state.
/// The ResetEvent stays "set" until reset() is called, making future set() calls do nothing semantically.
/// The memory accesses before set() can be said to happen before isSet() returns true or wait()/timedWait() return successfully.
pub fn set(self: *ResetEvent) void {
self.impl.set();
}

/// Unmarks the ResetEvent from its "set" state if set() was called previously.
/// It is undefined behavior if reset() is called while threads are blocked in wait() or timedWait().
/// Concurrent calls to set(), isSet() and reset() are allowed.
pub fn reset(self: *ResetEvent) void {
self.impl.reset();
}

const Impl = if (builtin.single_threaded)
SingleThreadedImpl
else
std.Thread.StaticResetEvent.AtomicEvent;
FutexImpl;
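The timedWait() cancellation mentioned in the docs above composes naturally with a shutdown flag. A small sketch of that pattern, assuming the API as documented (function and variable names are hypothetical):

const std = @import("std");

fn waitForWorkOrShutdown(work_ready: *std.Thread.ResetEvent, shutdown: *const std.atomic.Atomic(bool)) bool {
    // Re-arm a short timed wait so the shutdown flag is observed promptly
    // even if no work ever arrives.
    while (!shutdown.load(.SeqCst)) {
        work_ready.timedWait(100 * std.time.ns_per_ms) catch |err| switch (err) {
            error.Timeout => continue, // no work yet; re-check shutdown
        };
        return true; // the event was set: work is available
    }
    return false; // shutting down
}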
pub const InitError = error{SystemResources};
const SingleThreadedImpl = struct {
is_set: bool = false,

/// After `init`, it is legal to call any other function.
pub fn init(ev: *ResetEvent) InitError!void {
return ev.impl.init();
}
fn isSet(self: *const Impl) bool {
return self.is_set;
}

/// This function is not thread-safe.
/// After `deinit`, the only legal function to call is `init`.
pub fn deinit(ev: *ResetEvent) void {
return ev.impl.deinit();
}
fn wait(self: *Impl, timeout: ?u64) error{Timeout}!void {
if (self.isSet()) {
return;
}

/// Sets the event if not already set and wakes up all the threads waiting on
/// the event. It is safe to call `set` multiple times before calling `wait`.
/// However it is illegal to call `set` after `wait` is called until the event
/// is `reset`. This function is thread-safe.
pub fn set(ev: *ResetEvent) void {
return ev.impl.set();
}

/// Resets the event to its original, unset state.
/// This function is *not* thread-safe. It is equivalent to calling
/// `deinit` followed by `init` but without the possibility of failure.
pub fn reset(ev: *ResetEvent) void {
return ev.impl.reset();
}

/// Wait for the event to be set by blocking the current thread.
/// Thread-safe. No spurious wakeups.
/// Upon return from `wait`, the only functions available to be called
/// in `ResetEvent` are `reset` and `deinit`.
pub fn wait(ev: *ResetEvent) void {
return ev.impl.wait();
}

pub const TimedWaitResult = enum { event_set, timed_out };

/// Wait for the event to be set by blocking the current thread.
/// A timeout in nanoseconds can be provided as a hint for how
/// long the thread should block on the unset event before returning
/// `TimedWaitResult.timed_out`.
/// Thread-safe. No precision of timing is guaranteed.
/// Upon return from `wait`, the only functions available to be called
/// in `ResetEvent` are `reset` and `deinit`.
pub fn timedWait(ev: *ResetEvent, timeout_ns: u64) TimedWaitResult {
return ev.impl.timedWait(timeout_ns);
}

/// Apple has decided to not support POSIX semaphores, so we go with a
/// different approach using Grand Central Dispatch. This API is exposed
/// by libSystem so it is guaranteed to be available on all Darwin platforms.
pub const DarwinEvent = struct {
sem: c.dispatch_semaphore_t = undefined,

pub fn init(ev: *DarwinEvent) !void {
ev.* = .{
.sem = c.dispatch_semaphore_create(0) orelse return error.SystemResources,
// There are no other threads to wake us up.
// So if we wait without a timeout we would never wake up.
const timeout_ns = timeout orelse {
unreachable; // deadlock detected
};

std.time.sleep(timeout_ns);
return error.Timeout;
}

pub fn deinit(ev: *DarwinEvent) void {
c.dispatch_release(ev.sem);
ev.* = undefined;
fn set(self: *Impl) void {
self.is_set = true;
}

pub fn set(ev: *DarwinEvent) void {
// Empirically this returns the numerical value of the semaphore.
_ = c.dispatch_semaphore_signal(ev.sem);
}

pub fn wait(ev: *DarwinEvent) void {
assert(c.dispatch_semaphore_wait(ev.sem, c.DISPATCH_TIME_FOREVER) == 0);
}

pub fn timedWait(ev: *DarwinEvent, timeout_ns: u64) TimedWaitResult {
const t = c.dispatch_time(c.DISPATCH_TIME_NOW, @intCast(i64, timeout_ns));
if (c.dispatch_semaphore_wait(ev.sem, t) != 0) {
return .timed_out;
} else {
return .event_set;
}
}

pub fn reset(ev: *DarwinEvent) void {
// Keep calling until the semaphore goes back down to 0.
while (c.dispatch_semaphore_wait(ev.sem, c.DISPATCH_TIME_NOW) == 0) {}
fn reset(self: *Impl) void {
self.is_set = false;
}
};

/// POSIX semaphores must be initialized at runtime because they are allowed to
/// be implemented as file descriptors, in which case initialization would require
/// a syscall to open the fd.
pub const PosixEvent = struct {
sem: c.sem_t = undefined,
const FutexImpl = struct {
state: Atomic(u32) = Atomic(u32).init(unset),

pub fn init(ev: *PosixEvent) !void {
switch (c.getErrno(c.sem_init(&ev.sem, 0, 0))) {
.SUCCESS => return,
else => return error.SystemResources,
const unset = 0;
const waiting = 1;
const is_set = 2;

fn isSet(self: *const Impl) bool {
// Acquire barrier ensures memory accesses before set() happen before we return true.
return self.state.load(.Acquire) == is_set;
}

fn wait(self: *Impl, timeout: ?u64) error{Timeout}!void {
// Outline the slow path to allow isSet() to be inlined
if (!self.isSet()) {
return self.waitUntilSet(timeout);
}
}

pub fn deinit(ev: *PosixEvent) void {
assert(c.sem_destroy(&ev.sem) == 0);
ev.* = undefined;
}
fn waitUntilSet(self: *Impl, timeout: ?u64) error{Timeout}!void {
@setCold(true);

pub fn set(ev: *PosixEvent) void {
assert(c.sem_post(&ev.sem) == 0);
}
// Try to set the state from `unset` to `waiting` to indicate
// to the set() thread that others are blocked on the ResetEvent.
// We avoid using any strict barriers until the end when we know the ResetEvent is set.
var state = self.state.load(.Monotonic);
if (state == unset) {
state = self.state.compareAndSwap(state, waiting, .Monotonic, .Monotonic) orelse waiting;
}

pub fn wait(ev: *PosixEvent) void {
while (true) {
switch (c.getErrno(c.sem_wait(&ev.sem))) {
.SUCCESS => return,
.INTR => continue,
.INVAL => unreachable,
else => unreachable,
// Wait until the ResetEvent is set since the state is waiting.
if (state == waiting) {
var futex_deadline = Futex.Deadline.init(timeout);
while (true) {
const wait_result = futex_deadline.wait(&self.state, waiting);

// Check if the ResetEvent was set before possibly reporting error.Timeout below.
state = self.state.load(.Monotonic);
if (state != waiting) {
break;
}

try wait_result;
}
}

// Acquire barrier ensures memory accesses before set() happen before we return.
assert(state == is_set);
self.state.fence(.Acquire);
}

pub fn timedWait(ev: *PosixEvent, timeout_ns: u64) TimedWaitResult {
var ts: os.timespec = undefined;
var timeout_abs = timeout_ns;
os.clock_gettime(os.CLOCK.REALTIME, &ts) catch return .timed_out;
timeout_abs += @intCast(u64, ts.tv_sec) * time.ns_per_s;
timeout_abs += @intCast(u64, ts.tv_nsec);
ts.tv_sec = @intCast(@TypeOf(ts.tv_sec), @divFloor(timeout_abs, time.ns_per_s));
ts.tv_nsec = @intCast(@TypeOf(ts.tv_nsec), @mod(timeout_abs, time.ns_per_s));
while (true) {
switch (c.getErrno(c.sem_timedwait(&ev.sem, &ts))) {
.SUCCESS => return .event_set,
.INTR => continue,
.INVAL => unreachable,
.TIMEDOUT => return .timed_out,
else => unreachable,
}
fn set(self: *Impl) void {
// Quick check if the ResetEvent is already set before doing the atomic swap below.
// set() could be getting called quite often and multiple threads calling swap() increases contention unnecessarily.
if (self.state.load(.Monotonic) == is_set) {
return;
}

// Mark the ResetEvent as set and unblock all waiters waiting on it if any.
// Release barrier ensures memory accesses before set() happen before the ResetEvent is observed to be "set".
if (self.state.swap(is_set, .Release) == waiting) {
Futex.wake(&self.state, std.math.maxInt(u32));
}
}

pub fn reset(ev: *PosixEvent) void {
while (true) {
switch (c.getErrno(c.sem_trywait(&ev.sem))) {
.SUCCESS => continue, // Need to make it go to zero.
.INTR => continue,
.INVAL => unreachable,
.AGAIN => return, // The semaphore currently has the value zero.
else => unreachable,
}
}
fn reset(self: *Impl) void {
self.state.store(unset, .Monotonic);
}
};
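The FutexImpl above is a three-state machine. A compact model of its transitions, as I read the code (this restatement is not part of the commit):

const std = @import("std");

const State = enum(u32) { unset, waiting, is_set };

// set():   swap in `is_set` with Release; only if the old state was `waiting`
//          is anyone blocked, so only then is a Futex.wake() needed.
// wait():  CAS `unset` -> `waiting` (Monotonic), then Futex.wait() while the
//          state is still `waiting`; finish with an Acquire fence to pair
//          with the Release in set().
// reset(): plain Monotonic store of `unset`; callers must guarantee that no
//          thread is blocked in wait() at that point.
fn transition(current: State, event: enum { set, first_wait, reset }) State {
    return switch (event) {
        .set => .is_set,
        .first_wait => if (current == .unset) .waiting else current,
        .reset => .unset,
    };
}

test "ResetEvent state model" {
    try std.testing.expectEqual(State.waiting, transition(.unset, .first_wait));
    try std.testing.expectEqual(State.is_set, transition(.waiting, .set));
    try std.testing.expectEqual(State.unset, transition(.is_set, .reset));
}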
test "basic usage" {
|
||||
var event: ResetEvent = undefined;
|
||||
try event.init();
|
||||
defer event.deinit();
|
||||
test "ResetEvent - smoke test" {
|
||||
// make sure the event is unset
|
||||
var event = ResetEvent{};
|
||||
try testing.expectEqual(false, event.isSet());
|
||||
|
||||
// test event setting
|
||||
// make sure the event gets set
|
||||
event.set();
|
||||
try testing.expectEqual(true, event.isSet());
|
||||
|
||||
// test event resetting
|
||||
// make sure the event gets unset again
|
||||
event.reset();
|
||||
try testing.expectEqual(false, event.isSet());
|
||||
|
||||
// test event waiting (non-blocking)
|
||||
// waits should timeout as there's no other thread to set the event
|
||||
try testing.expectError(error.Timeout, event.timedWait(0));
|
||||
try testing.expectError(error.Timeout, event.timedWait(std.time.ns_per_ms));
|
||||
|
||||
// set the event again and make sure waits complete
|
||||
event.set();
|
||||
event.wait();
|
||||
event.reset();
|
||||
try event.timedWait(std.time.ns_per_ms);
|
||||
try testing.expectEqual(true, event.isSet());
|
||||
}
|
||||
|
||||
event.set();
|
||||
try testing.expectEqual(TimedWaitResult.event_set, event.timedWait(1));
|
||||
|
||||
// test cross-thread signaling
|
||||
if (builtin.single_threaded)
|
||||
return;
|
||||
test "ResetEvent - signaling" {
|
||||
// This test requires spawning threads
|
||||
if (builtin.single_threaded) {
|
||||
return error.SkipZigTest;
|
||||
}
|
||||
|
||||
const Context = struct {
|
||||
const Self = @This();
|
||||
in: ResetEvent = .{},
|
||||
out: ResetEvent = .{},
|
||||
value: usize = 0,
|
||||
|
||||
value: u128,
|
||||
in: ResetEvent,
|
||||
out: ResetEvent,
|
||||
|
||||
fn init(self: *Self) !void {
|
||||
self.* = .{
|
||||
.value = 0,
|
||||
.in = undefined,
|
||||
.out = undefined,
|
||||
};
|
||||
try self.in.init();
|
||||
try self.out.init();
|
||||
}
|
||||
|
||||
fn deinit(self: *Self) void {
|
||||
self.in.deinit();
|
||||
self.out.deinit();
|
||||
self.* = undefined;
|
||||
}
|
||||
|
||||
fn sender(self: *Self) !void {
|
||||
// update value and signal input
|
||||
try testing.expect(self.value == 0);
|
||||
self.value = 1;
|
||||
self.in.set();
|
||||
|
||||
// wait for receiver to update value and signal output
|
||||
self.out.wait();
|
||||
try testing.expect(self.value == 2);
|
||||
|
||||
// update value and signal final input
|
||||
self.value = 3;
|
||||
self.in.set();
|
||||
}
|
||||
|
||||
fn receiver(self: *Self) !void {
|
||||
// wait for sender to update value and signal input
|
||||
fn input(self: *@This()) !void {
|
||||
// wait for the value to become 1
|
||||
self.in.wait();
|
||||
try testing.expect(self.value == 1);
|
||||
|
||||
// update value and signal output
|
||||
self.in.reset();
|
||||
try testing.expectEqual(self.value, 1);
|
||||
|
||||
// bump the value and wake up output()
|
||||
self.value = 2;
|
||||
self.out.set();
|
||||
|
||||
// wait for sender to update value and signal final input
|
||||
// wait for output to receive 2, bump the value and wake us up with 3
|
||||
self.in.wait();
|
||||
try testing.expect(self.value == 3);
|
||||
}
|
||||
self.in.reset();
|
||||
try testing.expectEqual(self.value, 3);
|
||||
|
||||
fn sleeper(self: *Self) void {
|
||||
self.in.set();
|
||||
time.sleep(time.ns_per_ms * 2);
|
||||
self.value = 5;
|
||||
// bump the value and wake up output() for it to see 4
|
||||
self.value = 4;
|
||||
self.out.set();
|
||||
}
|
||||
|
||||
fn timedWaiter(self: *Self) !void {
|
||||
self.in.wait();
|
||||
try testing.expectEqual(TimedWaitResult.timed_out, self.out.timedWait(time.ns_per_us));
|
||||
try self.out.timedWait(time.ns_per_ms * 100);
|
||||
try testing.expect(self.value == 5);
|
||||
fn output(self: *@This()) !void {
|
||||
// start with 0 and bump the value for input to see 1
|
||||
try testing.expectEqual(self.value, 0);
|
||||
self.value = 1;
|
||||
self.in.set();
|
||||
|
||||
// wait for input to receive 1, bump the value to 2 and wake us up
|
||||
self.out.wait();
|
||||
self.out.reset();
|
||||
try testing.expectEqual(self.value, 2);
|
||||
|
||||
// bump the value to 3 for input to see (rhymes)
|
||||
self.value = 3;
|
||||
self.in.set();
|
||||
|
||||
// wait for input to bump the value to 4 and receive no more (rhymes)
|
||||
self.out.wait();
|
||||
self.out.reset();
|
||||
try testing.expectEqual(self.value, 4);
|
||||
}
|
||||
};
|
||||
|
||||
var context: Context = undefined;
|
||||
try context.init();
|
||||
defer context.deinit();
|
||||
const receiver = try std.Thread.spawn(.{}, Context.receiver, .{&context});
|
||||
defer receiver.join();
|
||||
try context.sender();
|
||||
var ctx = Context{};
|
||||
|
||||
if (false) {
|
||||
// I have now observed this fail on macOS, Windows, and Linux.
|
||||
// https://github.com/ziglang/zig/issues/7009
|
||||
var timed = Context.init();
|
||||
defer timed.deinit();
|
||||
const sleeper = try std.Thread.spawn(.{}, Context.sleeper, .{&timed});
|
||||
defer sleeper.join();
|
||||
try timed.timedWaiter();
|
||||
}
|
||||
const thread = try std.Thread.spawn(.{}, Context.output, .{&ctx});
|
||||
defer thread.join();
|
||||
|
||||
try ctx.input();
|
||||
}
|
||||
|
||||
test "ResetEvent - broadcast" {
|
||||
// This test requires spawning threads
|
||||
if (builtin.single_threaded) {
|
||||
return error.SkipZigTest;
|
||||
}
|
||||
|
||||
const num_threads = 10;
|
||||
const Barrier = struct {
|
||||
event: ResetEvent = .{},
|
||||
counter: Atomic(usize) = Atomic(usize).init(num_threads),
|
||||
|
||||
fn wait(self: *@This()) void {
|
||||
if (self.counter.fetchSub(1, .AcqRel) == 1) {
|
||||
self.event.set();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
const Context = struct {
|
||||
start_barrier: Barrier = .{},
|
||||
finish_barrier: Barrier = .{},
|
||||
|
||||
fn run(self: *@This()) void {
|
||||
self.start_barrier.wait();
|
||||
self.finish_barrier.wait();
|
||||
}
|
||||
};
|
||||
|
||||
var ctx = Context{};
|
||||
var threads: [num_threads - 1]std.Thread = undefined;
|
||||
|
||||
for (threads) |*t| t.* = try std.Thread.spawn(.{}, Context.run, .{&ctx});
|
||||
defer for (threads) |t| t.join();
|
||||
|
||||
ctx.run();
|
||||
}
|
||||
|
||||
@@ -1,395 +0,0 @@
//! A thread-safe resource which supports blocking until signaled.
//! This API is for kernel threads, not evented I/O.
//! This API is statically initializable. It cannot fail to be initialized
//! and it requires no deinitialization. The downside is that it may not
//! integrate as cleanly into other synchronization APIs, or, in a worst case,
//! may be forced to fall back on spin locking. As a rule of thumb, prefer
//! to use `std.Thread.ResetEvent` when possible, and use `StaticResetEvent` when
//! the logic needs stronger API guarantees.

const std = @import("../std.zig");
const builtin = @import("builtin");
const StaticResetEvent = @This();
const assert = std.debug.assert;
const os = std.os;
const time = std.time;
const linux = std.os.linux;
const windows = std.os.windows;
const testing = std.testing;

impl: Impl = .{},

pub const Impl = if (builtin.single_threaded)
DebugEvent
else
AtomicEvent;

/// Sets the event if not already set and wakes up all the threads waiting on
/// the event. It is safe to call `set` multiple times before calling `wait`.
/// However it is illegal to call `set` after `wait` is called until the event
/// is `reset`. This function is thread-safe.
pub fn set(ev: *StaticResetEvent) void {
return ev.impl.set();
}

/// Wait for the event to be set by blocking the current thread.
/// Thread-safe. No spurious wakeups.
/// Upon return from `wait`, the only function available to be called
/// in `StaticResetEvent` is `reset`.
pub fn wait(ev: *StaticResetEvent) void {
return ev.impl.wait();
}

/// Resets the event to its original, unset state.
/// This function is *not* thread-safe. It is equivalent to calling
/// `deinit` followed by `init` but without the possibility of failure.
pub fn reset(ev: *StaticResetEvent) void {
return ev.impl.reset();
}

pub const TimedWaitResult = std.Thread.ResetEvent.TimedWaitResult;

/// Wait for the event to be set by blocking the current thread.
/// A timeout in nanoseconds can be provided as a hint for how
/// long the thread should block on the unset event before returning
/// `TimedWaitResult.timed_out`.
/// Thread-safe. No precision of timing is guaranteed.
/// Upon return from `timedWait`, the only function available to be called
/// in `StaticResetEvent` is `reset`.
pub fn timedWait(ev: *StaticResetEvent, timeout_ns: u64) TimedWaitResult {
return ev.impl.timedWait(timeout_ns);
}

/// For single-threaded builds, we use this to detect deadlocks.
/// In unsafe modes this ends up being a no-op.
pub const DebugEvent = struct {
state: State = State.unset,

const State = enum {
unset,
set,
waited,
};

/// This function is provided so that this type can be re-used inside
/// `std.Thread.ResetEvent`.
pub fn init(ev: *DebugEvent) void {
ev.* = .{};
}

/// This function is provided so that this type can be re-used inside
/// `std.Thread.ResetEvent`.
pub fn deinit(ev: *DebugEvent) void {
ev.* = undefined;
}

pub fn set(ev: *DebugEvent) void {
switch (ev.state) {
.unset => ev.state = .set,
.set => {},
.waited => unreachable, // Not allowed to call `set` until `reset`.
}
}

pub fn wait(ev: *DebugEvent) void {
switch (ev.state) {
.unset => unreachable, // Deadlock detected.
.set => return,
.waited => unreachable, // Not allowed to call `wait` until `reset`.
}
}

pub fn timedWait(ev: *DebugEvent, timeout: u64) TimedWaitResult {
_ = timeout;
switch (ev.state) {
.unset => return .timed_out,
.set => return .event_set,
.waited => unreachable, // Not allowed to call `wait` until `reset`.
}
}

pub fn reset(ev: *DebugEvent) void {
ev.state = .unset;
}
};

pub const AtomicEvent = struct {
waiters: u32 = 0,

const WAKE = 1 << 0;
const WAIT = 1 << 1;

/// This function is provided so that this type can be re-used inside
/// `std.Thread.ResetEvent`.
pub fn init(ev: *AtomicEvent) void {
ev.* = .{};
}

/// This function is provided so that this type can be re-used inside
/// `std.Thread.ResetEvent`.
pub fn deinit(ev: *AtomicEvent) void {
ev.* = undefined;
}

pub fn set(ev: *AtomicEvent) void {
const waiters = @atomicRmw(u32, &ev.waiters, .Xchg, WAKE, .Release);
if (waiters >= WAIT) {
return Futex.wake(&ev.waiters, waiters >> 1);
}
}

pub fn wait(ev: *AtomicEvent) void {
switch (ev.timedWait(null)) {
.timed_out => unreachable,
.event_set => return,
}
}

pub fn timedWait(ev: *AtomicEvent, timeout: ?u64) TimedWaitResult {
var waiters = @atomicLoad(u32, &ev.waiters, .Acquire);
while (waiters != WAKE) {
waiters = @cmpxchgWeak(u32, &ev.waiters, waiters, waiters + WAIT, .Acquire, .Acquire) orelse {
if (Futex.wait(&ev.waiters, timeout)) |_| {
return .event_set;
} else |_| {
return .timed_out;
}
};
}
return .event_set;
}

pub fn reset(ev: *AtomicEvent) void {
@atomicStore(u32, &ev.waiters, 0, .Monotonic);
}

pub const Futex = switch (builtin.os.tag) {
.windows => WindowsFutex,
.linux => LinuxFutex,
else => SpinFutex,
};

pub const SpinFutex = struct {
fn wake(waiters: *u32, wake_count: u32) void {
_ = waiters;
_ = wake_count;
}

fn wait(waiters: *u32, timeout: ?u64) !void {
var timer: time.Timer = undefined;
if (timeout != null)
timer = time.Timer.start() catch return error.TimedOut;

while (@atomicLoad(u32, waiters, .Acquire) != WAKE) {
std.Thread.yield() catch std.atomic.spinLoopHint();
if (timeout) |timeout_ns| {
if (timer.read() >= timeout_ns)
return error.TimedOut;
}
}
}
};

pub const LinuxFutex = struct {
fn wake(waiters: *u32, wake_count: u32) void {
_ = wake_count;
const waiting = std.math.maxInt(i32); // wake_count
const ptr = @ptrCast(*const i32, waiters);
const rc = linux.futex_wake(ptr, linux.FUTEX.WAKE | linux.FUTEX.PRIVATE_FLAG, waiting);
assert(linux.getErrno(rc) == .SUCCESS);
}

fn wait(waiters: *u32, timeout: ?u64) !void {
var ts: linux.timespec = undefined;
var ts_ptr: ?*linux.timespec = null;
if (timeout) |timeout_ns| {
ts_ptr = &ts;
ts.tv_sec = @intCast(isize, timeout_ns / time.ns_per_s);
ts.tv_nsec = @intCast(isize, timeout_ns % time.ns_per_s);
}

while (true) {
const waiting = @atomicLoad(u32, waiters, .Acquire);
if (waiting == WAKE)
return;
const expected = @intCast(i32, waiting);
const ptr = @ptrCast(*const i32, waiters);
const rc = linux.futex_wait(ptr, linux.FUTEX.WAIT | linux.FUTEX.PRIVATE_FLAG, expected, ts_ptr);
switch (linux.getErrno(rc)) {
.SUCCESS => continue,
.TIMEDOUT => return error.TimedOut,
.INTR => continue,
.AGAIN => return,
else => unreachable,
}
}
}
};

pub const WindowsFutex = struct {
pub fn wake(waiters: *u32, wake_count: u32) void {
const handle = getEventHandle() orelse return SpinFutex.wake(waiters, wake_count);
const key = @ptrCast(*const anyopaque, waiters);

var waiting = wake_count;
while (waiting != 0) : (waiting -= 1) {
const rc = windows.ntdll.NtReleaseKeyedEvent(handle, key, windows.FALSE, null);
assert(rc == .SUCCESS);
}
}

pub fn wait(waiters: *u32, timeout: ?u64) !void {
const handle = getEventHandle() orelse return SpinFutex.wait(waiters, timeout);
const key = @ptrCast(*const anyopaque, waiters);

// NT uses timeouts in units of 100ns with negative value being relative
var timeout_ptr: ?*windows.LARGE_INTEGER = null;
var timeout_value: windows.LARGE_INTEGER = undefined;
if (timeout) |timeout_ns| {
timeout_ptr = &timeout_value;
timeout_value = -@intCast(windows.LARGE_INTEGER, timeout_ns / 100);
}

// NtWaitForKeyedEvent doesn't have spurious wake-ups
var rc = windows.ntdll.NtWaitForKeyedEvent(handle, key, windows.FALSE, timeout_ptr);
switch (rc) {
.TIMEOUT => {
// update the wait count to signal that we're not waiting anymore.
// if the .set() thread already observed that we are, perform a
// matching NtWaitForKeyedEvent so that the .set() thread doesn't
// deadlock trying to run NtReleaseKeyedEvent above.
var waiting = @atomicLoad(u32, waiters, .Monotonic);
while (true) {
if (waiting == WAKE) {
rc = windows.ntdll.NtWaitForKeyedEvent(handle, key, windows.FALSE, null);
assert(rc == windows.NTSTATUS.WAIT_0);
break;
} else {
waiting = @cmpxchgWeak(u32, waiters, waiting, waiting - WAIT, .Acquire, .Monotonic) orelse break;
continue;
}
}
return error.TimedOut;
},
windows.NTSTATUS.WAIT_0 => {},
else => unreachable,
}
}

var event_handle: usize = EMPTY;
const EMPTY = ~@as(usize, 0);
const LOADING = EMPTY - 1;

pub fn getEventHandle() ?windows.HANDLE {
var handle = @atomicLoad(usize, &event_handle, .Monotonic);
while (true) {
switch (handle) {
EMPTY => handle = @cmpxchgWeak(usize, &event_handle, EMPTY, LOADING, .Acquire, .Monotonic) orelse {
const handle_ptr = @ptrCast(*windows.HANDLE, &handle);
const access_mask = windows.GENERIC_READ | windows.GENERIC_WRITE;
if (windows.ntdll.NtCreateKeyedEvent(handle_ptr, access_mask, null, 0) != .SUCCESS)
handle = 0;
@atomicStore(usize, &event_handle, handle, .Monotonic);
return @intToPtr(?windows.HANDLE, handle);
},
LOADING => {
std.Thread.yield() catch std.atomic.spinLoopHint();
handle = @atomicLoad(usize, &event_handle, .Monotonic);
},
else => {
return @intToPtr(?windows.HANDLE, handle);
},
}
}
}
};
};

test "basic usage" {
var event = StaticResetEvent{};

// test event setting
event.set();

// test event resetting
event.reset();

// test event waiting (non-blocking)
event.set();
event.wait();
event.reset();

event.set();
try testing.expectEqual(TimedWaitResult.event_set, event.timedWait(1));

// test cross-thread signaling
if (builtin.single_threaded)
return;

const Context = struct {
const Self = @This();

value: u128 = 0,
in: StaticResetEvent = .{},
out: StaticResetEvent = .{},

fn sender(self: *Self) !void {
// update value and signal input
try testing.expect(self.value == 0);
self.value = 1;
self.in.set();

// wait for receiver to update value and signal output
self.out.wait();
try testing.expect(self.value == 2);

// update value and signal final input
self.value = 3;
self.in.set();
}

fn receiver(self: *Self) !void {
// wait for sender to update value and signal input
self.in.wait();
try testing.expect(self.value == 1);

// update value and signal output
self.in.reset();
self.value = 2;
self.out.set();

// wait for sender to update value and signal final input
self.in.wait();
try testing.expect(self.value == 3);
}

fn sleeper(self: *Self) void {
self.in.set();
time.sleep(time.ns_per_ms * 2);
self.value = 5;
self.out.set();
}

fn timedWaiter(self: *Self) !void {
self.in.wait();
try testing.expectEqual(TimedWaitResult.timed_out, self.out.timedWait(time.ns_per_us));
try self.out.timedWait(time.ns_per_ms * 100);
try testing.expect(self.value == 5);
}
};

var context = Context{};
const receiver = try std.Thread.spawn(.{}, Context.receiver, .{&context});
defer receiver.join();
try context.sender();

if (false) {
// I have now observed this fail on macOS, Windows, and Linux.
// https://github.com/ziglang/zig/issues/7009
var timed = Context.init();
defer timed.deinit();
const sleeper = try std.Thread.spawn(.{}, Context.sleeper, .{&timed});
defer sleeper.join();
try timed.timedWaiter();
}
}
@@ -14,19 +14,66 @@ pub fn Atomic(comptime T: type) type {
return .{ .value = value };
}

/// Perform an atomic fence which uses the atomic value as a hint for the modification order.
/// Use this when you want to imply a fence on an atomic variable without necessarily performing a memory access.
///
/// Example:
/// ```
/// const RefCount = struct {
///     count: Atomic(usize),
///     dropFn: *const fn(*RefCount) void,
///
///     fn ref(self: *RefCount) void {
///         _ = self.count.fetchAdd(1, .Monotonic); // no ordering necessary, just updating a counter
///     }
///
///     fn unref(self: *RefCount) void {
///         // Release ensures code before unref() happens-before the count is decremented as dropFn could be called by then.
///         if (self.count.fetchSub(1, .Release) == 1) {
///             // Acquire ensures count decrement and code before previous unrefs()s happens-before we call dropFn below.
///             // NOTE: another alternative is to use .AcqRel on the fetchSub count decrement but it's an extra barrier in a possibly hot path.
///             self.count.fence(.Acquire);
///             (self.dropFn)(self);
///         }
///     }
/// };
/// ```
pub inline fn fence(self: *Self, comptime ordering: Ordering) void {
// LLVM's ThreadSanitizer doesn't support the normal fences so we specialize for it.
if (builtin.sanitize_thread) {
const tsan = struct {
extern "c" fn __tsan_acquire(addr: *anyopaque) void;
extern "c" fn __tsan_release(addr: *anyopaque) void;
};

const addr = @ptrCast(*anyopaque, self);
return switch (ordering) {
.Unordered, .Monotonic => @compileError(@tagName(ordering) ++ " only applies to atomic loads and stores"),
.Acquire => tsan.__tsan_acquire(addr),
.Release => tsan.__tsan_release(addr),
.AcqRel, .SeqCst => {
tsan.__tsan_acquire(addr);
tsan.__tsan_release(addr);
},
};
}

return std.atomic.fence(ordering);
}
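The point of Atomic.fence is that the ordering is attributed to a concrete atomic variable, which is the only form ThreadSanitizer can model. A minimal sketch of using it in place of a bare std.atomic.fence (the names are hypothetical, not from the commit):

const std = @import("std");
const Atomic = std.atomic.Atomic;

var guard = Atomic(bool).init(false);
var payload: u64 = 0;

fn publish(value: u64) void {
    payload = value; // plain write...
    guard.store(true, .Release); // ...published by the Release store
}

fn consume() ?u64 {
    if (!guard.load(.Monotonic)) return null;
    // Tie the Acquire ordering to `guard` itself so tsan can pair it with
    // the Release store in publish(), instead of std.atomic.fence(.Acquire).
    guard.fence(.Acquire);
    return payload;
}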
/// Non-atomically load from the atomic value without synchronization.
/// Care must be taken to avoid data-races when interacting with other atomic operations.
pub fn loadUnchecked(self: Self) T {
pub inline fn loadUnchecked(self: Self) T {
return self.value;
}

/// Non-atomically store to the atomic value without synchronization.
/// Care must be taken to avoid data-races when interacting with other atomic operations.
pub fn storeUnchecked(self: *Self, value: T) void {
pub inline fn storeUnchecked(self: *Self, value: T) void {
self.value = value;
}

pub fn load(self: *const Self, comptime ordering: Ordering) T {
pub inline fn load(self: *const Self, comptime ordering: Ordering) T {
return switch (ordering) {
.AcqRel => @compileError(@tagName(ordering) ++ " implies " ++ @tagName(Ordering.Release) ++ " which is only allowed on atomic stores"),
.Release => @compileError(@tagName(ordering) ++ " is only allowed on atomic stores"),
@@ -34,7 +81,7 @@ pub fn Atomic(comptime T: type) type {
};
}

pub fn store(self: *Self, value: T, comptime ordering: Ordering) void {
pub inline fn store(self: *Self, value: T, comptime ordering: Ordering) void {
return switch (ordering) {
.AcqRel => @compileError(@tagName(ordering) ++ " implies " ++ @tagName(Ordering.Acquire) ++ " which is only allowed on atomic loads"),
.Acquire => @compileError(@tagName(ordering) ++ " is only allowed on atomic loads"),
@@ -189,21 +236,21 @@ pub fn Atomic(comptime T: type) type {
.Set => asm volatile ("lock btsw %[bit], %[ptr]"
// LLVM doesn't support u1 flag register return values
: [result] "={@ccc}" (-> u8),
: [ptr] "*p" (&self.value),
: [ptr] "*m" (&self.value),
[bit] "X" (@as(T, bit)),
: "cc", "memory"
),
.Reset => asm volatile ("lock btrw %[bit], %[ptr]"
// LLVM doesn't support u1 flag register return values
: [result] "={@ccc}" (-> u8),
: [ptr] "*p" (&self.value),
: [ptr] "*m" (&self.value),
[bit] "X" (@as(T, bit)),
: "cc", "memory"
),
.Toggle => asm volatile ("lock btcw %[bit], %[ptr]"
// LLVM doesn't support u1 flag register return values
: [result] "={@ccc}" (-> u8),
: [ptr] "*p" (&self.value),
: [ptr] "*m" (&self.value),
[bit] "X" (@as(T, bit)),
: "cc", "memory"
),
@@ -212,21 +259,21 @@ pub fn Atomic(comptime T: type) type {
.Set => asm volatile ("lock btsl %[bit], %[ptr]"
// LLVM doesn't support u1 flag register return values
: [result] "={@ccc}" (-> u8),
: [ptr] "*p" (&self.value),
: [ptr] "*m" (&self.value),
[bit] "X" (@as(T, bit)),
: "cc", "memory"
),
.Reset => asm volatile ("lock btrl %[bit], %[ptr]"
// LLVM doesn't support u1 flag register return values
: [result] "={@ccc}" (-> u8),
: [ptr] "*p" (&self.value),
: [ptr] "*m" (&self.value),
[bit] "X" (@as(T, bit)),
: "cc", "memory"
),
.Toggle => asm volatile ("lock btcl %[bit], %[ptr]"
// LLVM doesn't support u1 flag register return values
: [result] "={@ccc}" (-> u8),
: [ptr] "*p" (&self.value),
: [ptr] "*m" (&self.value),
[bit] "X" (@as(T, bit)),
: "cc", "memory"
),
@@ -235,21 +282,21 @@ pub fn Atomic(comptime T: type) type {
.Set => asm volatile ("lock btsq %[bit], %[ptr]"
// LLVM doesn't support u1 flag register return values
: [result] "={@ccc}" (-> u8),
: [ptr] "*p" (&self.value),
: [ptr] "*m" (&self.value),
[bit] "X" (@as(T, bit)),
: "cc", "memory"
),
.Reset => asm volatile ("lock btrq %[bit], %[ptr]"
// LLVM doesn't support u1 flag register return values
: [result] "={@ccc}" (-> u8),
: [ptr] "*p" (&self.value),
: [ptr] "*m" (&self.value),
[bit] "X" (@as(T, bit)),
: "cc", "memory"
),
.Toggle => asm volatile ("lock btcq %[bit], %[ptr]"
// LLVM doesn't support u1 flag register return values
: [result] "={@ccc}" (-> u8),
: [ptr] "*p" (&self.value),
: [ptr] "*m" (&self.value),
[bit] "X" (@as(T, bit)),
: "cc", "memory"
),
@@ -266,6 +313,13 @@ pub fn Atomic(comptime T: type) type {
};
}

test "Atomic.fence" {
inline for (.{ .Acquire, .Release, .AcqRel, .SeqCst }) |ordering| {
var x = Atomic(usize).init(0);
x.fence(ordering);
}
}

fn atomicIntTypes() []const type {
comptime var bytes = 1;
comptime var types: []const type = &[_]type{};
@@ -292,7 +292,7 @@ pub fn panicExtra(

/// Non-zero whenever the program triggered a panic.
/// The counter is incremented/decremented atomically.
var panicking: u8 = 0;
var panicking = std.atomic.Atomic(u8).init(0);

// Locked to avoid interleaving panic messages from multiple threads.
var panic_mutex = std.Thread.Mutex{};
@@ -316,7 +316,7 @@ pub fn panicImpl(trace: ?*const std.builtin.StackTrace, first_trace_addr: ?usize
0 => {
panic_stage = 1;

_ = @atomicRmw(u8, &panicking, .Add, 1, .SeqCst);
_ = panicking.fetchAdd(1, .SeqCst);

// Make sure to release the mutex when done
{
@@ -337,13 +337,13 @@ pub fn panicImpl(trace: ?*const std.builtin.StackTrace, first_trace_addr: ?usize
dumpCurrentStackTrace(first_trace_addr);
}

if (@atomicRmw(u8, &panicking, .Sub, 1, .SeqCst) != 1) {
if (panicking.fetchSub(1, .SeqCst) != 1) {
// Another thread is panicking, wait for the last one to finish
// and call abort()

// Sleep forever without hammering the CPU
var event: std.Thread.StaticResetEvent = .{};
event.wait();
var futex = std.atomic.Atomic(u32).init(0);
while (true) std.Thread.Futex.wait(&futex, 0);
unreachable;
}
},
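The `while (true)` loop above is the commit message's "handle spurious wakeups" fix: Futex.wait can return even though nobody stored to the word or called wake, so a thread that intends to sleep forever must re-wait in a loop. The general form of the pattern, as a sketch with hypothetical names:

const std = @import("std");

/// Park the calling thread until `flag` becomes nonzero, tolerating
/// spurious futex wakeups by re-checking the value on every iteration.
fn parkUntilNonzero(flag: *std.atomic.Atomic(u32)) void {
    while (flag.load(.Acquire) == 0) {
        // Returns when *flag != 0, on a wake(), or spuriously; the loop
        // condition makes all three cases safe.
        std.Thread.Futex.wait(flag, 0);
    }
}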
@@ -8,6 +8,7 @@ const os = std.os;
const windows = os.windows;
const maxInt = std.math.maxInt;
const Thread = std.Thread;
const Atomic = std.atomic.Atomic;

const is_windows = builtin.os.tag == .windows;

@@ -168,11 +169,9 @@ pub const Loop = struct {
.fs_end_request = .{ .data = .{ .msg = .end, .finish = .NoAction } },
.fs_queue = std.atomic.Queue(Request).init(),
.fs_thread = undefined,
.fs_thread_wakeup = undefined,
.fs_thread_wakeup = .{},
.delay_queue = undefined,
};
try self.fs_thread_wakeup.init();
errdefer self.fs_thread_wakeup.deinit();
errdefer self.arena.deinit();

// We need at least one of these in case the fs thread wants to use onNextTick
@@ -202,7 +201,6 @@ pub const Loop = struct {

pub fn deinit(self: *Loop) void {
self.deinitOsData();
self.fs_thread_wakeup.deinit();
self.arena.deinit();
self.* = undefined;
}
@@ -723,9 +721,7 @@ pub const Loop = struct {
extra_thread.join();
}

@atomicStore(bool, &self.delay_queue.is_running, false, .SeqCst);
self.delay_queue.event.set();
self.delay_queue.thread.join();
self.delay_queue.deinit();
}

/// Runs the provided function asynchronously. The function's frame is allocated
@@ -851,8 +847,8 @@ pub const Loop = struct {
timer: std.time.Timer,
waiters: Waiters,
thread: std.Thread,
event: std.Thread.AutoResetEvent,
is_running: bool,
event: std.Thread.ResetEvent,
is_running: Atomic(bool),

/// Initialize the delay queue by spawning the timer thread
/// and starting any timer resources.
@@ -862,11 +858,19 @@ pub const Loop = struct {
.waiters = DelayQueue.Waiters{
.entries = std.atomic.Queue(anyframe).init(),
},
.event = std.Thread.AutoResetEvent{},
.is_running = true,
// Must be last so that it can read the other state, such as `is_running`.
.thread = try std.Thread.spawn(.{}, DelayQueue.run, .{self}),
.thread = undefined,
.event = .{},
.is_running = Atomic(bool).init(true),
};

// Must be after init so that it can read the other state, such as `is_running`.
self.thread = try std.Thread.spawn(.{}, DelayQueue.run, .{self});
}

fn deinit(self: *DelayQueue) void {
self.is_running.store(false, .SeqCst);
self.event.set();
self.thread.join();
}

/// Entry point for the timer thread
@@ -874,7 +878,8 @@ pub const Loop = struct {
fn run(self: *DelayQueue) void {
const loop = @fieldParentPtr(Loop, "delay_queue", self);

while (@atomicLoad(bool, &self.is_running, .SeqCst)) {
while (self.is_running.load(.SeqCst)) {
self.event.reset();
const now = self.timer.read();

if (self.waiters.popExpired(now)) |entry| {
@ -917,7 +917,7 @@ test "open file with exclusive and shared nonblocking lock" {
|
||||
try testing.expectError(error.WouldBlock, file2);
|
||||
}
|
||||
|
||||
test "open file with exclusive lock twice, make sure it waits" {
|
||||
test "open file with exclusive lock twice, make sure second lock waits" {
|
||||
if (builtin.single_threaded) return error.SkipZigTest;
|
||||
|
||||
if (std.io.is_async) {
|
||||
@ -934,30 +934,33 @@ test "open file with exclusive lock twice, make sure it waits" {
    errdefer file.close();

    const S = struct {
        fn checkFn(dir: *fs.Dir, evt: *std.Thread.ResetEvent) !void {
        fn checkFn(dir: *fs.Dir, started: *std.Thread.ResetEvent, locked: *std.Thread.ResetEvent) !void {
            started.set();
            const file1 = try dir.createFile(filename, .{ .lock = .Exclusive });
            defer file1.close();
            evt.set();

            locked.set();
            file1.close();
        }
    };

    var evt: std.Thread.ResetEvent = undefined;
    try evt.init();
    defer evt.deinit();
    var started = std.Thread.ResetEvent{};
    var locked = std.Thread.ResetEvent{};

    const t = try std.Thread.spawn(.{}, S.checkFn, .{ &tmp.dir, &evt });
    const t = try std.Thread.spawn(.{}, S.checkFn, .{
        &tmp.dir,
        &started,
        &locked,
    });
    defer t.join();

    const SLEEP_TIMEOUT_NS = 10 * std.time.ns_per_ms;
    // Make sure we've slept enough.
    var timer = try std.time.Timer.start();
    while (true) {
        std.time.sleep(SLEEP_TIMEOUT_NS);
        if (timer.read() >= SLEEP_TIMEOUT_NS) break;
    }
    // Wait for the spawned thread to start trying to acquire the exclusive file lock.
    // Then wait a bit to make sure it can't acquire it, since we currently hold the file lock.
    started.wait();
    try testing.expectError(error.Timeout, locked.timedWait(10 * std.time.ns_per_ms));

    // Release the file lock, which should unblock the thread so it can take the lock and set the locked event.
    file.close();
    // No timeout to avoid failures on heavily loaded systems.
    evt.wait();
    locked.wait();
}

test "open file with exclusive nonblocking lock twice (absolute paths)" {
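For context, a sketch of the reworked std.Thread.ResetEvent API this test now exercises: unlike the old version there is no fallible init() or deinit(), and timedWait() reports expiry as error.Timeout. The test body below is illustrative, not from the diff:

const std = @import("std");

test "ResetEvent handshake (sketch)" {
    var ready = std.Thread.ResetEvent{}; // zero-initialized

    const t = try std.Thread.spawn(.{}, struct {
        fn run(event: *std.Thread.ResetEvent) void {
            event.set(); // wakes current and future waiters until reset()
        }
    }.run, .{&ready});
    defer t.join();

    ready.wait(); // blocks until set()
    ready.reset(); // re-arm for reuse; timedWait(ns) would instead return error.Timeout
}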
@ -163,8 +163,8 @@ emit_llvm_bc: ?EmitLoc,
emit_analysis: ?EmitLoc,
emit_docs: ?EmitLoc,

work_queue_wait_group: WaitGroup,
astgen_wait_group: WaitGroup,
work_queue_wait_group: WaitGroup = .{},
astgen_wait_group: WaitGroup = .{},

/// Exported symbol names. This is only for when the target is wasm.
/// TODO: Remove this when Stage2 becomes the default compiler as it will already have this information.
@ -1674,19 +1674,11 @@ pub fn create(gpa: Allocator, options: InitOptions) !*Compilation {
            .test_evented_io = options.test_evented_io,
            .debug_compiler_runtime_libs = options.debug_compiler_runtime_libs,
            .debug_compile_errors = options.debug_compile_errors,
            .work_queue_wait_group = undefined,
            .astgen_wait_group = undefined,
        };
        break :comp comp;
    };
    errdefer comp.destroy();

    try comp.work_queue_wait_group.init();
    errdefer comp.work_queue_wait_group.deinit();

    try comp.astgen_wait_group.init();
    errdefer comp.astgen_wait_group.deinit();

    // Add a `CObject` for each `c_source_files`.
    try comp.c_object_table.ensureTotalCapacity(gpa, options.c_source_files.len);
    for (options.c_source_files) |c_source_file| {
@ -1894,9 +1886,6 @@ pub fn destroy(self: *Compilation) void {
    self.cache_parent.manifest_dir.close();
    if (self.owned_link_dir) |*dir| dir.close();

    self.work_queue_wait_group.deinit();
    self.astgen_wait_group.deinit();

    for (self.export_symbol_names.items) |symbol_name| {
        gpa.free(symbol_name);
    }
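The two hunks above fall out of WaitGroup becoming zero-initializable: create() loses its try/errdefer plumbing and destroy() loses the matching deinit() calls. A stripped-down stand-in for the real Compilation struct (the import path and test are illustrative):

const WaitGroup = @import("WaitGroup.zig"); // illustrative path

const Comp = struct {
    work_queue_wait_group: WaitGroup = .{},
    astgen_wait_group: WaitGroup = .{},
};

test "default-initialized wait groups need no setup or teardown" {
    var comp = Comp{};
    comp.work_queue_wait_group.start();
    comp.work_queue_wait_group.finish();
    comp.work_queue_wait_group.wait(); // returns immediately: nothing pending
}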
@ -4701,6 +4690,7 @@ pub fn generateBuiltinZigSource(comp: *Compilation, allocator: Allocator) Alloca
        \\pub const link_libcpp = {};
        \\pub const have_error_return_tracing = {};
        \\pub const valgrind_support = {};
        \\pub const sanitize_thread = {};
        \\pub const position_independent_code = {};
        \\pub const position_independent_executable = {};
        \\pub const strip_debug_info = {};
@ -4713,6 +4703,7 @@ pub fn generateBuiltinZigSource(comp: *Compilation, allocator: Allocator) Alloca
        comp.bin_file.options.link_libcpp,
        comp.bin_file.options.error_return_tracing,
        comp.bin_file.options.valgrind,
        comp.bin_file.options.tsan,
        comp.bin_file.options.pic,
        comp.bin_file.options.pie,
        comp.bin_file.options.strip,
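These hunks expose the new builtin.sanitize_thread flag to user code. One intended consumer, per the commit message, is std.atomic.Atomic.fence: under tsan a plain fence instruction is invisible to the sanitizer runtime. The following is only a sketch of that idea, not the actual Atomic.fence implementation:

const std = @import("std");
const builtin = @import("builtin");

fn fence(comptime ordering: std.atomic.Ordering) void {
    if (builtin.sanitize_thread) {
        // Perform a real atomic RMW so the tsan runtime observes the ordering.
        var dummy = std.atomic.Atomic(u8).init(0);
        _ = dummy.fetchAdd(0, ordering);
    } else {
        @fence(ordering);
    }
}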
@ -3,13 +3,12 @@ const builtin = @import("builtin");
const ThreadPool = @This();

mutex: std.Thread.Mutex = .{},
cond: std.Thread.Condition = .{},
run_queue: RunQueue = .{},
is_running: bool = true,
allocator: std.mem.Allocator,
workers: []Worker,
run_queue: RunQueue = .{},
idle_queue: IdleQueue = .{},
threads: []std.Thread,

const IdleQueue = std.SinglyLinkedList(std.Thread.ResetEvent);
const RunQueue = std.SinglyLinkedList(Runnable);
const Runnable = struct {
    runFn: RunProto,
@ -20,89 +19,52 @@ const RunProto = switch (builtin.zig_backend) {
    else => *const fn (*Runnable) void,
};

const Worker = struct {
    pool: *ThreadPool,
    thread: std.Thread,
    /// The node is for this worker only and must have an already initialized event
    /// when the thread is spawned.
    idle_node: IdleQueue.Node,

    fn run(worker: *Worker) void {
        const pool = worker.pool;

        while (true) {
            pool.mutex.lock();

            if (pool.run_queue.popFirst()) |run_node| {
                pool.mutex.unlock();
                (run_node.data.runFn)(&run_node.data);
                continue;
            }

            if (pool.is_running) {
                worker.idle_node.data.reset();

                pool.idle_queue.prepend(&worker.idle_node);
                pool.mutex.unlock();

                worker.idle_node.data.wait();
                continue;
            }

            pool.mutex.unlock();
            return;
        }
    }
};

pub fn init(self: *ThreadPool, allocator: std.mem.Allocator) !void {
    self.* = .{
        .allocator = allocator,
        .workers = &[_]Worker{},
        .threads = &[_]std.Thread{},
    };
    if (builtin.single_threaded)

    if (builtin.single_threaded) {
        return;

    const worker_count = std.math.max(1, std.Thread.getCpuCount() catch 1);
    self.workers = try allocator.alloc(Worker, worker_count);
    errdefer allocator.free(self.workers);

    var worker_index: usize = 0;
    errdefer self.destroyWorkers(worker_index);
    while (worker_index < worker_count) : (worker_index += 1) {
        const worker = &self.workers[worker_index];
        worker.pool = self;

        // Each worker requires its ResetEvent to be pre-initialized.
        try worker.idle_node.data.init();
        errdefer worker.idle_node.data.deinit();

        worker.thread = try std.Thread.spawn(.{}, Worker.run, .{worker});
    }
}

fn destroyWorkers(self: *ThreadPool, spawned: usize) void {
    if (builtin.single_threaded)
        return;
    const thread_count = std.math.max(1, std.Thread.getCpuCount() catch 1);
    self.threads = try allocator.alloc(std.Thread, thread_count);
    errdefer allocator.free(self.threads);

    for (self.workers[0..spawned]) |*worker| {
        worker.thread.join();
        worker.idle_node.data.deinit();
    // kill and join any threads we spawned previously on error.
    var spawned: usize = 0;
    errdefer self.join(spawned);

    for (self.threads) |*thread| {
        thread.* = try std.Thread.spawn(.{}, worker, .{self});
        spawned += 1;
    }
}

pub fn deinit(self: *ThreadPool) void {
    self.join(self.threads.len); // kill and join all threads.
    self.* = undefined;
}

fn join(self: *ThreadPool, spawned: usize) void {
    {
        self.mutex.lock();
        defer self.mutex.unlock();

        // ensure future worker threads exit the dequeue loop
        self.is_running = false;
        while (self.idle_queue.popFirst()) |idle_node|
            idle_node.data.set();
    }

    self.destroyWorkers(self.workers.len);
    self.allocator.free(self.workers);
    // wake up any sleeping threads (this can be done outside the mutex)
    // then wait for all the threads we know are spawned to complete.
    self.cond.broadcast();
    for (self.threads[0..spawned]) |thread| {
        thread.join();
    }

    self.allocator.free(self.threads);
}

pub fn spawn(self: *ThreadPool, comptime func: anytype, args: anytype) !void {
@ -122,24 +84,51 @@ pub fn spawn(self: *ThreadPool, comptime func: anytype, args: anytype) !void {
            const closure = @fieldParentPtr(@This(), "run_node", run_node);
            @call(.{}, func, closure.arguments);

            // The thread pool's allocator is protected by the mutex.
            const mutex = &closure.pool.mutex;
            mutex.lock();
            defer mutex.unlock();

            closure.pool.allocator.destroy(closure);
        }
    };

    {
        self.mutex.lock();
        defer self.mutex.unlock();

        const closure = try self.allocator.create(Closure);
        closure.* = .{
            .arguments = args,
            .pool = self,
        };

        self.run_queue.prepend(&closure.run_node);
    }

    // Notify waiting threads outside the lock to try and keep the critical section small.
    self.cond.signal();
}

fn worker(self: *ThreadPool) void {
    self.mutex.lock();
    defer self.mutex.unlock();

    const closure = try self.allocator.create(Closure);
    closure.* = .{
        .arguments = args,
        .pool = self,
    };
    while (true) {
        while (self.run_queue.popFirst()) |run_node| {
            // Temporarily unlock the mutex in order to execute the run_node
            self.mutex.unlock();
            defer self.mutex.lock();

    self.run_queue.prepend(&closure.run_node);
            const runFn = run_node.data.runFn;
            runFn(&run_node.data);
        }

    if (self.idle_queue.popFirst()) |idle_node|
        idle_node.data.set();
        // Stop executing instead of waiting if the thread pool is no longer running.
        if (self.is_running) {
            self.cond.wait(&self.mutex);
        } else {
            break;
        }
    }
}
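The pool now parks idle workers on a single Condition instead of threading a per-worker ResetEvent node through an IdleQueue. A minimal usage sketch of the reworked API (the import path and task are illustrative):

const std = @import("std");
const ThreadPool = @import("ThreadPool.zig"); // illustrative path

fn printTask(n: usize) void {
    std.debug.print("task {}\n", .{n});
}

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();

    var pool: ThreadPool = undefined;
    try pool.init(gpa.allocator());
    // deinit() flips is_running under the mutex, broadcasts the condition,
    // then joins every spawned thread.
    defer pool.deinit();

    var i: usize = 0;
    while (i < 4) : (i += 1) {
        try pool.spawn(printTask, .{i});
    }
}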
@ -1,56 +1,39 @@
const std = @import("std");
const Atomic = std.atomic.Atomic;
const assert = std.debug.assert;
const WaitGroup = @This();

mutex: std.Thread.Mutex = .{},
counter: usize = 0,
event: std.Thread.ResetEvent,
const is_waiting: usize = 1 << 0;
const one_pending: usize = 1 << 1;

pub fn init(self: *WaitGroup) !void {
    self.* = .{
        .mutex = .{},
        .counter = 0,
        .event = undefined,
    };
    try self.event.init();
}

pub fn deinit(self: *WaitGroup) void {
    self.event.deinit();
    self.* = undefined;
}
state: Atomic(usize) = Atomic(usize).init(0),
event: std.Thread.ResetEvent = .{},

pub fn start(self: *WaitGroup) void {
    self.mutex.lock();
    defer self.mutex.unlock();

    self.counter += 1;
    const state = self.state.fetchAdd(one_pending, .Monotonic);
    assert((state / one_pending) < (std.math.maxInt(usize) / one_pending));
}

pub fn finish(self: *WaitGroup) void {
    self.mutex.lock();
    defer self.mutex.unlock();
    const state = self.state.fetchSub(one_pending, .Release);
    assert((state / one_pending) > 0);

    self.counter -= 1;

    if (self.counter == 0) {
    if (state == (one_pending | is_waiting)) {
        self.state.fence(.Acquire);
        self.event.set();
    }
}

pub fn wait(self: *WaitGroup) void {
    while (true) {
        self.mutex.lock();
    var state = self.state.fetchAdd(is_waiting, .Acquire);
    assert(state & is_waiting == 0);

        if (self.counter == 0) {
            self.mutex.unlock();
            return;
        }

        self.mutex.unlock();
    if ((state / one_pending) > 0) {
        self.event.wait();
    }
}

pub fn reset(self: *WaitGroup) void {
    self.state.store(0, .Monotonic);
    self.event.reset();
}
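The rewritten WaitGroup packs a waiter bit (bit 0) and a pending count (the remaining bits) into one atomic word, so start()/finish() no longer take a mutex and the struct is zero-initializable. Typical usage, sketched (work() and the thread count are illustrative):

const std = @import("std");
const WaitGroup = @import("WaitGroup.zig"); // illustrative path

fn work(wg: *WaitGroup) void {
    defer wg.finish(); // the last finish() with a waiter pending sets the event
    // ... actual work here ...
}

test "wait for a batch of threads" {
    var wg = WaitGroup{}; // no fallible init()/deinit() anymore

    var threads: [4]std.Thread = undefined;
    for (threads) |*t| {
        wg.start(); // bump the pending count before spawning
        t.* = try std.Thread.spawn(.{}, work, .{&wg});
    }

    wg.wait(); // blocks on the ResetEvent until the pending count reaches zero
    for (threads) |t| t.join();

    wg.reset(); // required before reusing the WaitGroup
}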
@ -362,7 +362,7 @@ const PanicSwitch = struct {
    /// Updated atomically before taking the panic_mutex.
    /// In recoverable cases, the program will not abort
    /// until all panicking threads have dumped their traces.
    var panicking: u8 = 0;
    var panicking = std.atomic.Atomic(u8).init(0);

    // Locked to avoid interleaving panic messages from multiple threads.
    var panic_mutex = std.Thread.Mutex{};
@ -430,7 +430,7 @@ const PanicSwitch = struct {
        };
        state.* = new_state;

        _ = @atomicRmw(u8, &panicking, .Add, 1, .SeqCst);
        _ = panicking.fetchAdd(1, .SeqCst);

        state.recover_stage = .release_ref_count;

@ -512,13 +512,14 @@ const PanicSwitch = struct {
    noinline fn releaseRefCount(state: *volatile PanicState) noreturn {
        state.recover_stage = .abort;

        if (@atomicRmw(u8, &panicking, .Sub, 1, .SeqCst) != 1) {
        if (panicking.fetchSub(1, .SeqCst) != 1) {
            // Another thread is panicking, wait for the last one to finish
            // and call abort()

            // Sleep forever without hammering the CPU
            var event: std.Thread.StaticResetEvent = .{};
            event.wait();
            var futex = std.atomic.Atomic(u32).init(0);
            while (true) std.Thread.Futex.wait(&futex, 0);

            // This should be unreachable, recurse into recoverAbort.
            @panic("event.wait() returned");
        }
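The loop around Futex.wait() is deliberate: per the commit message, futex waits can wake spuriously, so a thread that intends to sleep until abort() must re-issue the wait. The pattern in isolation (parkForever is an illustrative name):

const std = @import("std");

fn parkForever() noreturn {
    var futex = std.atomic.Atomic(u32).init(0);
    while (true) {
        // Returns when the word changes or on a spurious wakeup;
        // nothing ever stores to `futex` here, so just wait again.
        std.Thread.Futex.wait(&futex, 0);
    }
}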
@ -9993,6 +9993,7 @@ Buf *codegen_generate_builtin_source(CodeGen *g) {
    buf_appendf(contents, "pub const link_libcpp = %s;\n", bool_to_str(g->link_libcpp));
    buf_appendf(contents, "pub const have_error_return_tracing = %s;\n", bool_to_str(g->have_err_ret_tracing));
    buf_appendf(contents, "pub const valgrind_support = false;\n");
    buf_appendf(contents, "pub const sanitize_thread = false;\n");
    buf_appendf(contents, "pub const position_independent_code = %s;\n", bool_to_str(g->have_pic));
    buf_appendf(contents, "pub const position_independent_executable = %s;\n", bool_to_str(g->have_pie));
    buf_appendf(contents, "pub const strip_debug_info = %s;\n", bool_to_str(g->strip_debug_symbols));