std.Thread: Mutex and Condition improvements (#11497)

* Thread: minor cleanups

* Thread: rewrite Mutex

* Thread: introduce Futex.Deadline

* Thread: Condition rewrite + cleanup

* Mutex: optimize lock fast path

* Condition: more docs

* Thread: more mutex + condition docs

* Thread: remove broken Condition test

* Thread: zig fmt

* address review comments + fix Thread.DummyMutex in GPA

* Atomic: disable bitRmw x86 inline asm for stage2

* GPA: typo mutex_init

* Thread: remove noalias on stuff

* Thread: comment typos + clarifications
protty 2022-04-23 19:35:56 -05:00 committed by GitHub
parent daef82d06f
commit 963ac60918
7 changed files with 873 additions and 703 deletions


@@ -459,9 +459,8 @@ const UnsupportedImpl = struct {
}
fn unsupported(unusued: anytype) noreturn {
@compileLog("Unsupported operating system", target.os.tag);
_ = unusued;
unreachable;
@compileError("Unsupported operating system " ++ @tagName(target.os.tag));
}
};
@@ -1188,27 +1187,3 @@ test "Thread.detach" {
event.wait();
try std.testing.expectEqual(value, 1);
}
fn testWaitForSignal(mutex: *Mutex, cond: *Condition) void {
mutex.lock();
defer mutex.unlock();
cond.signal();
cond.wait(mutex);
}
test "Condition.signal" {
if (builtin.single_threaded) return error.SkipZigTest;
var mutex = Mutex{};
var cond = Condition{};
var thread: Thread = undefined;
{
mutex.lock();
defer mutex.unlock();
thread = try Thread.spawn(.{}, testWaitForSignal, .{ &mutex, &cond });
cond.wait(&mutex);
cond.signal();
}
thread.join();
}


@@ -1,411 +1,538 @@
//! A condition provides a way for a kernel thread to block until it is signaled
//! to wake up. Spurious wakeups are possible.
//! This API supports static initialization and does not require deinitialization.
impl: Impl = .{},
//! Condition variables are used with a Mutex to efficiently wait for an arbitrary condition to occur.
//! This is done by atomically unlocking the mutex, blocking the thread until notified, and finally re-locking the mutex.
//! Condition can be statically initialized and is at most `@sizeOf(u64)` large.
//!
//! Example:
//! ```
//! var m = Mutex{};
//! var c = Condition{};
//! var predicate = false;
//!
//! fn consumer() void {
//! m.lock();
//! defer m.unlock();
//!
//! while (!predicate) {
//! c.wait(&m);
//! }
//! }
//!
//! fn producer() void {
//! m.lock();
//! defer m.unlock();
//!
//! predicate = true;
//! c.signal();
//! }
//!
//! const thread = try std.Thread.spawn(.{}, producer, .{});
//! consumer();
//! thread.join();
//! ```
//!
//! Note that condition variables can only reliably unblock threads that are sequenced before them using the same Mutex.
//! This means that the following is allowed to deadlock:
//! ```
//! thread-1: mutex.lock()
//! thread-1: condition.wait(&mutex)
//!
//! thread-2: // mutex.lock() (without this, the following signal may not see the waiting thread-1)
//! thread-2: // mutex.unlock() (this is optional for correctness once locked above, as signal can be called without holding the mutex)
//! thread-2: condition.signal()
//! ```
const std = @import("../std.zig");
const builtin = @import("builtin");
const Condition = @This();
const windows = std.os.windows;
const linux = std.os.linux;
const Mutex = std.Thread.Mutex;
const os = std.os;
const assert = std.debug.assert;
const testing = std.testing;
const Atomic = std.atomic.Atomic;
const Futex = std.Thread.Futex;
pub fn wait(cond: *Condition, mutex: *Mutex) void {
cond.impl.wait(mutex);
impl: Impl = .{},
/// Atomically releases the Mutex, blocks the caller thread, then re-acquires the Mutex on return.
/// "Atomically" here refers to accesses done on the Condition after acquiring the Mutex.
///
/// The Mutex must be locked by the caller's thread when this function is called.
/// A Mutex can have multiple Conditions waiting with it concurrently, but not the opposite.
/// It is undefined behavior for multiple threads to wait with different mutexes using the same Condition concurrently.
/// Once threads have finished waiting with one Mutex, the Condition can be used to wait with another Mutex.
///
/// A blocking call to wait() is unblocked by one of the following conditions:
/// - a spurious ("at random") wake up occurs
/// - a future call to `signal()` or `broadcast()` which has acquired the Mutex and is sequenced after this `wait()`.
///
/// Given wait() can be interrupted spuriously, the blocking condition should be checked continuously
/// irrespective of any notifications from `signal()` or `broadcast()`.
pub fn wait(self: *Condition, mutex: *Mutex) void {
self.impl.wait(mutex, null) catch |err| switch (err) {
error.Timeout => unreachable, // no timeout provided so we shouldn't have timed-out
};
}
pub fn timedWait(cond: *Condition, mutex: *Mutex, timeout_ns: u64) error{TimedOut}!void {
try cond.impl.timedWait(mutex, timeout_ns);
/// Atomically releases the Mutex, blocks the caller thread, then re-acquires the Mutex on return.
/// "Atomically" here refers to accesses done on the Condition after acquiring the Mutex.
///
/// The Mutex must be locked by the caller's thread when this function is called.
/// A Mutex can have multiple Conditions waiting with it concurrently, but not the opposite.
/// It is undefined behavior for multiple threads to wait with different mutexes using the same Condition concurrently.
/// Once threads have finished waiting with one Mutex, the Condition can be used to wait with another Mutex.
///
/// A blocking call to `timedWait()` is unblocked by one of the following conditions:
/// - a spurious ("at random") wake occurs
/// - the caller was blocked for around `timeout_ns` nanoseconds, in which case `error.Timeout` is returned.
/// - a future call to `signal()` or `broadcast()` which has acquired the Mutex and is sequenced after this `timedWait()`.
///
/// Given `timedWait()` can be interrupted spuriously, the blocking condition should be checked continuously
/// irrespective of any notifications from `signal()` or `broadcast()`.
pub fn timedWait(self: *Condition, mutex: *Mutex, timeout_ns: u64) error{Timeout}!void {
return self.impl.wait(mutex, timeout_ns);
}
pub fn signal(cond: *Condition) void {
cond.impl.signal();
/// Unblocks at least one thread blocked in a call to `wait()` or `timedWait()` with a given Mutex.
/// The blocked thread must be sequenced before this call with respect to acquiring the same Mutex in order to be observable for unblocking.
/// `signal()` can be called with or without the relevant Mutex being acquired, and has no effect if there are no observable blocked threads.
pub fn signal(self: *Condition) void {
self.impl.wake(.one);
}
pub fn broadcast(cond: *Condition) void {
cond.impl.broadcast();
/// Unblocks all threads currently blocked in a call to `wait()` or `timedWait()` with a given Mutex.
/// The blocked threads must be sequenced before this call with respect to acquiring the same Mutex in order to be observable for unblocking.
/// `broadcast()` can be called with or without the relevant Mutex being acquired, and has no effect if there are no observable blocked threads.
pub fn broadcast(self: *Condition) void {
self.impl.wake(.all);
}
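As a concrete illustration of the predicate-loop contract documented above, here is a minimal usage sketch; the `queue_nonempty` predicate and the 1s budget are hypothetical:

```
const std = @import("std");

var mutex = std.Thread.Mutex{};
var cond = std.Thread.Condition{};
var queue_nonempty = false; // the predicate; guarded by `mutex`

fn consumeOne() error{Timeout}!void {
    mutex.lock();
    defer mutex.unlock();
    // Spurious wakeups are possible, so the predicate must be re-checked in a loop.
    while (!queue_nonempty) {
        try cond.timedWait(&mutex, 1 * std.time.ns_per_s);
    }
    queue_nonempty = false;
}
```

Note that the 1s budget restarts after every spurious wakeup here; the `Futex.Deadline` introduced by this commit exists to avoid exactly that kind of over-waiting.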
const Impl = if (builtin.single_threaded)
SingleThreadedCondition
SingleThreadedImpl
else if (builtin.os.tag == .windows)
WindowsCondition
else if (std.Thread.use_pthreads)
PthreadCondition
WindowsImpl
else
AtomicCondition;
FutexImpl;
pub const SingleThreadedCondition = struct {
pub fn wait(cond: *SingleThreadedCondition, mutex: *Mutex) void {
_ = cond;
_ = mutex;
unreachable; // deadlock detected
}
const Notify = enum {
one, // wake up only one thread
all, // wake up all threads
};
pub fn timedWait(cond: *SingleThreadedCondition, mutex: *Mutex, timeout_ns: u64) error{TimedOut}!void {
_ = cond;
const SingleThreadedImpl = struct {
fn wait(self: *Impl, mutex: *Mutex, timeout: ?u64) error{Timeout}!void {
_ = self;
_ = mutex;
_ = timeout_ns;
// There are no other threads to wake us up.
// So if we wait without a timeout we would never wake up.
const timeout_ns = timeout orelse {
unreachable; // deadlock detected
};
std.time.sleep(timeout_ns);
return error.TimedOut;
return error.Timeout;
}
pub fn signal(cond: *SingleThreadedCondition) void {
_ = cond;
}
pub fn broadcast(cond: *SingleThreadedCondition) void {
_ = cond;
fn wake(self: *Impl, comptime notify: Notify) void {
// There are no other threads to wake up.
_ = self;
_ = notify;
}
};
pub const WindowsCondition = struct {
cond: windows.CONDITION_VARIABLE = windows.CONDITION_VARIABLE_INIT,
const WindowsImpl = struct {
condition: os.windows.CONDITION_VARIABLE = .{},
pub fn wait(cond: *WindowsCondition, mutex: *Mutex) void {
const rc = windows.kernel32.SleepConditionVariableSRW(
&cond.cond,
fn wait(self: *Impl, mutex: *Mutex, timeout: ?u64) error{Timeout}!void {
var timeout_overflowed = false;
var timeout_ms: os.windows.DWORD = os.windows.INFINITE;
if (timeout) |timeout_ns| {
// Round the nanoseconds to the nearest millisecond,
// then saturating-cast it to a Windows DWORD for use in the kernel32 call.
const ms = (timeout_ns +| (std.time.ns_per_ms / 2)) / std.time.ns_per_ms;
timeout_ms = std.math.cast(os.windows.DWORD, ms) catch std.math.maxInt(os.windows.DWORD);
// Track if the timeout overflowed into INFINITE and make sure not to wait forever.
if (timeout_ms == os.windows.INFINITE) {
timeout_overflowed = true;
timeout_ms -= 1;
}
}
const rc = os.windows.kernel32.SleepConditionVariableSRW(
&self.condition,
&mutex.impl.srwlock,
windows.INFINITE,
@as(windows.ULONG, 0),
timeout_ms,
0, // the srwlock is assumed to be acquired in exclusive mode, not shared
);
assert(rc != windows.FALSE);
// Return error.Timeout if we know the timeout elapsed correctly.
if (rc == os.windows.FALSE) {
assert(os.windows.kernel32.GetLastError() == .TIMEOUT);
if (!timeout_overflowed) return error.Timeout;
}
}
pub fn timedWait(cond: *WindowsCondition, mutex: *Mutex, timeout_ns: u64) error{TimedOut}!void {
var timeout_checked = std.math.cast(windows.DWORD, timeout_ns / std.time.ns_per_ms) catch overflow: {
break :overflow std.math.maxInt(windows.DWORD);
};
// Handle the case where timeout is INFINITE, otherwise SleepConditionVariableSRW's time-out never elapses
const timeout_overflowed = timeout_checked == windows.INFINITE;
timeout_checked -= @boolToInt(timeout_overflowed);
const rc = windows.kernel32.SleepConditionVariableSRW(
&cond.cond,
&mutex.impl.srwlock,
timeout_checked,
@as(windows.ULONG, 0),
);
if (rc == windows.FALSE and windows.kernel32.GetLastError() == windows.Win32Error.TIMEOUT) return error.TimedOut;
assert(rc != windows.FALSE);
}
pub fn signal(cond: *WindowsCondition) void {
windows.kernel32.WakeConditionVariable(&cond.cond);
}
pub fn broadcast(cond: *WindowsCondition) void {
windows.kernel32.WakeAllConditionVariable(&cond.cond);
fn wake(self: *Impl, comptime notify: Notify) void {
switch (notify) {
.one => os.windows.kernel32.WakeConditionVariable(&self.condition),
.all => os.windows.kernel32.WakeAllConditionVariable(&self.condition),
}
}
};
pub const PthreadCondition = struct {
cond: std.c.pthread_cond_t = .{},
const FutexImpl = struct {
state: Atomic(u32) = Atomic(u32).init(0),
epoch: Atomic(u32) = Atomic(u32).init(0),
pub fn wait(cond: *PthreadCondition, mutex: *Mutex) void {
const rc = std.c.pthread_cond_wait(&cond.cond, &mutex.impl.pthread_mutex);
assert(rc == .SUCCESS);
}
const one_waiter = 1;
const waiter_mask = 0xffff;
pub fn timedWait(cond: *PthreadCondition, mutex: *Mutex, timeout_ns: u64) error{TimedOut}!void {
var ts: std.os.timespec = undefined;
std.os.clock_gettime(std.os.CLOCK.REALTIME, &ts) catch unreachable;
ts.tv_sec += @intCast(@TypeOf(ts.tv_sec), timeout_ns / std.time.ns_per_s);
ts.tv_nsec += @intCast(@TypeOf(ts.tv_nsec), timeout_ns % std.time.ns_per_s);
if (ts.tv_nsec >= std.time.ns_per_s) {
ts.tv_sec += 1;
ts.tv_nsec -= std.time.ns_per_s;
}
const one_signal = 1 << 16;
const signal_mask = 0xffff << 16;
const rc = std.c.pthread_cond_timedwait(&cond.cond, &mutex.impl.pthread_mutex, &ts);
return switch (rc) {
.SUCCESS => {},
.TIMEDOUT => error.TimedOut,
else => unreachable,
};
}
fn wait(self: *Impl, mutex: *Mutex, timeout: ?u64) error{Timeout}!void {
// Register that we're waiting on the state by incrementing the wait count.
// This assumes that there can be at most ((1<<16)-1) or 65,535 threads concurrently waiting on the same Condvar.
// If this is hit in practice, then this condvar not working is the least of your concerns.
var state = self.state.fetchAdd(one_waiter, .Monotonic);
assert(state & waiter_mask != waiter_mask);
state += one_waiter;
pub fn signal(cond: *PthreadCondition) void {
const rc = std.c.pthread_cond_signal(&cond.cond);
assert(rc == .SUCCESS);
}
pub fn broadcast(cond: *PthreadCondition) void {
const rc = std.c.pthread_cond_broadcast(&cond.cond);
assert(rc == .SUCCESS);
}
};
pub const AtomicCondition = struct {
pending: bool = false,
queue_mutex: Mutex = .{},
queue_list: QueueList = .{},
pub const QueueList = std.SinglyLinkedList(QueueItem);
pub const QueueItem = struct {
futex: i32 = 0,
dequeued: bool = false,
fn wait(cond: *@This()) void {
while (@atomicLoad(i32, &cond.futex, .Acquire) == 0) {
switch (builtin.os.tag) {
.linux => {
switch (linux.getErrno(linux.futex_wait(
&cond.futex,
linux.FUTEX.PRIVATE_FLAG | linux.FUTEX.WAIT,
0,
null,
))) {
.SUCCESS => {},
.INTR => {},
.AGAIN => {},
else => unreachable,
}
},
else => std.atomic.spinLoopHint(),
}
}
}
pub fn timedWait(cond: *@This(), timeout_ns: u64) error{TimedOut}!void {
const start_time = std.time.nanoTimestamp();
while (@atomicLoad(i32, &cond.futex, .Acquire) == 0) {
switch (builtin.os.tag) {
.linux => {
var ts: std.os.timespec = undefined;
ts.tv_sec = @intCast(@TypeOf(ts.tv_sec), timeout_ns / std.time.ns_per_s);
ts.tv_nsec = @intCast(@TypeOf(ts.tv_nsec), timeout_ns % std.time.ns_per_s);
switch (linux.getErrno(linux.futex_wait(
&cond.futex,
linux.FUTEX.PRIVATE_FLAG | linux.FUTEX.WAIT,
0,
&ts,
))) {
.SUCCESS => {},
.INTR => {},
.AGAIN => {},
.TIMEDOUT => return error.TimedOut,
.INVAL => {}, // possibly timeout overflow
.FAULT => unreachable,
else => unreachable,
}
},
else => {
if (std.time.nanoTimestamp() - start_time >= timeout_ns) {
return error.TimedOut;
}
std.atomic.spinLoopHint();
},
}
}
}
fn notify(cond: *@This()) void {
@atomicStore(i32, &cond.futex, 1, .Release);
switch (builtin.os.tag) {
.linux => {
switch (linux.getErrno(linux.futex_wake(
&cond.futex,
linux.FUTEX.PRIVATE_FLAG | linux.FUTEX.WAKE,
1,
))) {
.SUCCESS => {},
.FAULT => {},
else => unreachable,
}
},
else => {},
}
}
};
pub fn wait(cond: *AtomicCondition, mutex: *Mutex) void {
var waiter = QueueList.Node{ .data = .{} };
{
cond.queue_mutex.lock();
defer cond.queue_mutex.unlock();
cond.queue_list.prepend(&waiter);
@atomicStore(bool, &cond.pending, true, .SeqCst);
}
mutex.unlock();
waiter.data.wait();
mutex.lock();
}
pub fn timedWait(cond: *AtomicCondition, mutex: *Mutex, timeout_ns: u64) error{TimedOut}!void {
var waiter = QueueList.Node{ .data = .{} };
{
cond.queue_mutex.lock();
defer cond.queue_mutex.unlock();
cond.queue_list.prepend(&waiter);
@atomicStore(bool, &cond.pending, true, .SeqCst);
}
var timed_out = false;
// Temporarily release the mutex in order to block on the condition variable.
mutex.unlock();
defer mutex.lock();
waiter.data.timedWait(timeout_ns) catch |err| switch (err) {
error.TimedOut => {
defer if (!timed_out) {
waiter.data.wait();
};
cond.queue_mutex.lock();
defer cond.queue_mutex.unlock();
if (!waiter.data.dequeued) {
timed_out = true;
cond.queue_list.remove(&waiter);
}
},
else => unreachable,
};
var futex_deadline = Futex.Deadline.init(timeout);
while (true) {
// Try to wake up by consuming a signal and decrementing the waiter count we added previously.
// Acquire barrier ensures code before the wake() which added the signal happens before we decrement it and return.
while (state & signal_mask != 0) {
const new_state = state - one_waiter - one_signal;
state = self.state.tryCompareAndSwap(state, new_state, .Acquire, .Monotonic) orelse return;
}
if (timed_out) {
return error.TimedOut;
// Observe the epoch, then check the state again to see if we should wake up.
// The epoch must be observed before we check the state or we could potentially miss a wake() and deadlock:
//
// - T1: s = LOAD(&state)
// - T2: UPDATE(&s, signal)
// - T2: UPDATE(&epoch, 1) + FUTEX_WAKE(&epoch)
// - T1: e = LOAD(&epoch) (was reordered after the state load)
// - T1: s & signals == 0 -> FUTEX_WAIT(&epoch, e) (missed the state update + the epoch change)
//
// Acquire barrier to ensure the epoch load happens before the state load.
const epoch = self.epoch.load(.Acquire);
state = self.state.load(.Monotonic);
if (state & signal_mask != 0) {
continue;
}
futex_deadline.wait(&self.epoch, epoch) catch |err| switch (err) {
// On timeout, we must decrement the waiter we added above.
error.Timeout => {
while (true) {
// If there's a signal when we're timing out, consume it and report being woken up instead.
// Acquire barrier ensures code before the wake() which added the signal happens before we decrement it and return.
while (state & signal_mask != 0) {
const new_state = state - one_waiter - one_signal;
state = self.state.tryCompareAndSwap(state, new_state, .Acquire, .Monotonic) orelse return;
}
// Remove the waiter we added and officially return timed out.
const new_state = state - one_waiter;
state = self.state.tryCompareAndSwap(state, new_state, .Monotonic, .Monotonic) orelse return err;
}
},
};
}
}
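For reference, a small sketch of the 16-bit/16-bit packing `state` uses above; the counts are hypothetical and the constants are the ones defined earlier in this struct:

```
test "waiter/signal packing sketch" {
    const state: u32 = (3 * one_signal) | (5 * one_waiter); // 5 waiters, 3 pending signals
    try std.testing.expectEqual(@as(u32, 5), (state & waiter_mask) / one_waiter);
    try std.testing.expectEqual(@as(u32, 3), (state & signal_mask) / one_signal);
}
```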
pub fn signal(cond: *AtomicCondition) void {
if (@atomicLoad(bool, &cond.pending, .SeqCst) == false)
return;
fn wake(self: *Impl, comptime notify: Notify) void {
var state = self.state.load(.Monotonic);
while (true) {
const waiters = (state & waiter_mask) / one_waiter;
const signals = (state & signal_mask) / one_signal;
const maybe_waiter = blk: {
cond.queue_mutex.lock();
defer cond.queue_mutex.unlock();
const maybe_waiter = cond.queue_list.popFirst();
if (maybe_waiter) |waiter| {
waiter.data.dequeued = true;
}
@atomicStore(bool, &cond.pending, cond.queue_list.first != null, .SeqCst);
break :blk maybe_waiter;
};
if (maybe_waiter) |waiter| {
waiter.data.notify();
}
}
pub fn broadcast(cond: *AtomicCondition) void {
if (@atomicLoad(bool, &cond.pending, .SeqCst) == false)
return;
@atomicStore(bool, &cond.pending, false, .SeqCst);
var waiters = blk: {
cond.queue_mutex.lock();
defer cond.queue_mutex.unlock();
const waiters = cond.queue_list;
var it = waiters.first;
while (it) |node| : (it = node.next) {
node.data.dequeued = true;
// Reserves which waiters to wake up by incrementing the signals count.
// Therefore, the signals count is always less than or equal to the waiters count.
// We don't need to Futex.wake if there's nothing to wake up or if other wake() threads have reserved to wake up the current waiters.
const wakeable = waiters - signals;
if (wakeable == 0) {
return;
}
cond.queue_list = .{};
break :blk waiters;
};
const to_wake = switch (notify) {
.one => 1,
.all => wakeable,
};
while (waiters.popFirst()) |waiter| {
waiter.data.notify();
// Reserve the number of waiters to wake by incrementing the signals count.
// Release barrier ensures code before the wake() happens before the posted signal is consumed by the wait() threads.
const new_state = state + (one_signal * to_wake);
state = self.state.tryCompareAndSwap(state, new_state, .Release, .Monotonic) orelse {
// Wake up the waiting threads we reserved above by changing the epoch value.
// NOTE: a waiting thread could miss a wake up if *exactly* ((1<<32)-1) wake()s happen between it observing the epoch and sleeping on it.
// This is very unlikely given how many precisely timed Futex.wake() calls would have to occur during the waiting thread's potential preemption.
//
// Release barrier ensures the signal being added to the state happens before the epoch is changed.
// If not, the waiting thread could potentially deadlock from missing both the state and epoch change:
//
// - T2: UPDATE(&epoch, 1) (reordered before the state change)
// - T1: e = LOAD(&epoch)
// - T1: s = LOAD(&state)
// - T2: UPDATE(&state, signal) + FUTEX_WAKE(&epoch)
// - T1: s & signals == 0 -> FUTEX_WAIT(&epoch, e) (missed both epoch change and state change)
_ = self.epoch.fetchAdd(1, .Release);
Futex.wake(&self.epoch, to_wake);
return;
};
}
}
};
test "Thread.Condition" {
test "Condition - smoke test" {
var mutex = Mutex{};
var cond = Condition{};
// Try to wake outside the mutex
defer cond.signal();
defer cond.broadcast();
mutex.lock();
defer mutex.unlock();
// Try to wait with a timeout (should not deadlock)
try testing.expectError(error.Timeout, cond.timedWait(&mutex, 0));
try testing.expectError(error.Timeout, cond.timedWait(&mutex, std.time.ns_per_ms));
// Try to wake inside the mutex.
cond.signal();
cond.broadcast();
}
// Inspired by: https://github.com/Amanieu/parking_lot/pull/129
test "Condition - wait and signal" {
// This test requires spawning threads
if (builtin.single_threaded) {
return error.SkipZigTest;
}
const TestContext = struct {
cond: *Condition,
cond_main: *Condition,
mutex: *Mutex,
n: *i32,
fn worker(ctx: *@This()) void {
ctx.mutex.lock();
ctx.n.* += 1;
ctx.cond_main.signal();
ctx.cond.wait(ctx.mutex);
ctx.n.* -= 1;
ctx.cond_main.signal();
ctx.mutex.unlock();
const num_threads = 4;
const MultiWait = struct {
mutex: Mutex = .{},
cond: Condition = .{},
threads: [num_threads]std.Thread = undefined,
fn run(self: *@This()) void {
self.mutex.lock();
defer self.mutex.unlock();
self.cond.wait(&self.mutex);
self.cond.timedWait(&self.mutex, std.time.ns_per_ms) catch {};
self.cond.signal();
}
};
const num_threads = 3;
var threads: [num_threads]std.Thread = undefined;
var cond = Condition{};
var cond_main = Condition{};
var mut = Mutex{};
var n: i32 = 0;
var ctx = TestContext{ .cond = &cond, .cond_main = &cond_main, .mutex = &mut, .n = &n };
mut.lock();
for (threads) |*t| t.* = try std.Thread.spawn(.{}, TestContext.worker, .{&ctx});
cond_main.wait(&mut);
while (n < num_threads) cond_main.wait(&mut);
var multi_wait = MultiWait{};
for (multi_wait.threads) |*t| {
t.* = try std.Thread.spawn(.{}, MultiWait.run, .{&multi_wait});
}
cond.signal();
cond_main.wait(&mut);
try testing.expect(n == (num_threads - 1));
std.time.sleep(100 * std.time.ns_per_ms);
cond.broadcast();
while (n > 0) cond_main.wait(&mut);
try testing.expect(n == 0);
for (threads) |t| t.join();
multi_wait.cond.signal();
for (multi_wait.threads) |t| {
t.join();
}
}
test "Thread.Condition.timedWait" {
test "Condition - signal" {
// This test requires spawning threads
if (builtin.single_threaded) {
return error.SkipZigTest;
}
var cond = Condition{};
var mut = Mutex{};
const num_threads = 4;
// Expect a timeout, as the condition variable is never signaled
{
mut.lock();
defer mut.unlock();
try testing.expectError(error.TimedOut, cond.timedWait(&mut, 10 * std.time.ns_per_ms));
const SignalTest = struct {
mutex: Mutex = .{},
cond: Condition = .{},
notified: bool = false,
threads: [num_threads]std.Thread = undefined,
fn run(self: *@This()) void {
self.mutex.lock();
defer self.mutex.unlock();
// Use timedWait() a few times before using wait()
// to test multiple threads timing out frequently.
var i: usize = 0;
while (!self.notified) : (i +%= 1) {
if (i < 5) {
self.cond.timedWait(&self.mutex, 1) catch {};
} else {
self.cond.wait(&self.mutex);
}
}
// Once we've received the signal, notify another thread (inside the lock).
assert(self.notified);
self.cond.signal();
}
};
var signal_test = SignalTest{};
for (signal_test.threads) |*t| {
t.* = try std.Thread.spawn(.{}, SignalTest.run, .{&signal_test});
}
// Expect a signal before timeout
{
const TestContext = struct {
cond: *Condition,
mutex: *Mutex,
n: *u32,
fn worker(ctx: *@This()) void {
ctx.mutex.lock();
defer ctx.mutex.unlock();
ctx.n.* = 1;
ctx.cond.signal();
}
};
// Wait for a bit in hopes that the spawned threads start queuing up on the condvar
std.time.sleep(10 * std.time.ns_per_ms);
var n: u32 = 0;
// Wake up one of them (outside the lock) after setting notified=true.
defer signal_test.cond.signal();
var ctx = TestContext{ .cond = &cond, .mutex = &mut, .n = &n };
mut.lock();
var thread = try std.Thread.spawn(.{}, TestContext.worker, .{&ctx});
// Looped check to handle spurious wakeups
while (n != 1) try cond.timedWait(&mut, 500 * std.time.ns_per_ms);
mut.unlock();
try testing.expect(n == 1);
thread.join();
signal_test.mutex.lock();
defer signal_test.mutex.unlock();
try testing.expect(!signal_test.notified);
signal_test.notified = true;
}
for (signal_test.threads) |t| {
t.join();
}
}
test "Condition - multi signal" {
// This test requires spawning threads
if (builtin.single_threaded) {
return error.SkipZigTest;
}
const num_threads = 4;
const num_iterations = 4;
const Paddle = struct {
mutex: Mutex = .{},
cond: Condition = .{},
value: u32 = 0,
fn hit(self: *@This()) void {
defer self.cond.signal();
self.mutex.lock();
defer self.mutex.unlock();
self.value += 1;
}
fn run(self: *@This(), hit_to: *@This()) !void {
self.mutex.lock();
defer self.mutex.unlock();
var current: u32 = 0;
while (current < num_iterations) : (current += 1) {
// Wait for the value to change from hit()
while (self.value == current) {
self.cond.wait(&self.mutex);
}
// hit the next paddle
try testing.expectEqual(self.value, current + 1);
hit_to.hit();
}
}
};
var paddles = [_]Paddle{.{}} ** num_threads;
var threads = [_]std.Thread{undefined} ** num_threads;
// Create a circle of paddles which hit each other
for (threads) |*t, i| {
const paddle = &paddles[i];
const hit_to = &paddles[(i + 1) % paddles.len];
t.* = try std.Thread.spawn(.{}, Paddle.run, .{ paddle, hit_to });
}
// Hit the first paddle and wait for them all to complete by hitting each other for num_iterations.
paddles[0].hit();
for (threads) |t| t.join();
// The first paddle will be hit one last time by the last paddle.
for (paddles) |p, i| {
const expected = @as(u32, num_iterations) + @boolToInt(i == 0);
try testing.expectEqual(p.value, expected);
}
}
test "Condition - broadcasting" {
// This test requires spawning threads
if (builtin.single_threaded) {
return error.SkipZigTest;
}
const num_threads = 10;
const BroadcastTest = struct {
mutex: Mutex = .{},
cond: Condition = .{},
completed: Condition = .{},
count: usize = 0,
threads: [num_threads]std.Thread = undefined,
fn run(self: *@This()) void {
self.mutex.lock();
defer self.mutex.unlock();
// The last broadcast thread to start tells the main test thread it's completed.
self.count += 1;
if (self.count == num_threads) {
self.completed.signal();
}
// Waits for the count to reach zero after the main test thread observes it at num_threads.
// Tries to use timedWait() a bit before falling back to wait() to test multiple threads timing out.
var i: usize = 0;
while (self.count != 0) : (i +%= 1) {
if (i < 10) {
self.cond.timedWait(&self.mutex, 1) catch {};
} else {
self.cond.wait(&self.mutex);
}
}
}
};
var broadcast_test = BroadcastTest{};
for (broadcast_test.threads) |*t| {
t.* = try std.Thread.spawn(.{}, BroadcastTest.run, .{&broadcast_test});
}
{
broadcast_test.mutex.lock();
defer broadcast_test.mutex.unlock();
// Wait for all the broadcast threads to spawn.
// timedWait() to detect any potential deadlocks.
while (broadcast_test.count != num_threads) {
try broadcast_test.completed.timedWait(
&broadcast_test.mutex,
1 * std.time.ns_per_s,
);
}
// Reset the counter and wake all the threads to exit.
broadcast_test.count = 0;
broadcast_test.cond.broadcast();
}
for (broadcast_test.threads) |t| {
t.join();
}
}


@@ -10,14 +10,12 @@ const Futex = @This();
const os = std.os;
const assert = std.debug.assert;
const testing = std.testing;
const Atomic = std.atomic.Atomic;
const spinLoopHint = std.atomic.spinLoopHint;
/// Checks if `ptr` still contains the value `expect` and, if so, blocks the caller until either:
/// - The value at `ptr` is no longer equal to `expect`.
/// - The caller is unblocked by a matching `wake()`.
/// - The caller is unblocked spuriously by an arbitrary internal signal.
/// - The caller is unblocked spuriously ("at random").
///
/// The checking of `ptr` and `expect`, along with blocking the caller, is done atomically
/// and totally ordered (sequentially consistent) with respect to other wait()/wake() calls on the same `ptr`.
@@ -32,7 +30,7 @@ pub fn wait(ptr: *const Atomic(u32), expect: u32) void {
/// Checks if `ptr` still contains the value `expect` and, if so, blocks the caller until either:
/// - The value at `ptr` is no longer equal to `expect`.
/// - The caller is unblocked by a matching `wake()`.
/// - The caller is unblocked spuriously by an arbitrary internal signal.
/// - The caller is unblocked spuriously ("at random").
/// - The caller blocks for longer than the given timeout. In which case, `error.Timeout` is returned.
///
/// The checking of `ptr` and `expect`, along with blocking the caller, is done atomically
@@ -62,7 +60,7 @@ pub fn wake(ptr: *const Atomic(u32), max_waiters: u32) void {
}
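A minimal sketch of the wait()/wake() contract documented above, assuming some second thread eventually runs `waker()`:

```
var flag = Atomic(u32).init(0);

fn waiter() void {
    // wait() returns on a value change, a wake(), or spuriously: always re-check.
    while (flag.load(.Acquire) == 0) {
        Futex.wait(&flag, 0);
    }
}

fn waker() void {
    flag.store(1, .Release);
    Futex.wake(&flag, 1); // wake at most one waiter
}
```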
const Impl = if (builtin.single_threaded)
SerialImpl
SingleThreadedImpl
else if (builtin.os.tag == .windows)
WindowsImpl
else if (builtin.os.tag.isDarwin())
@@ -97,7 +95,7 @@ const UnsupportedImpl = struct {
}
};
const SerialImpl = struct {
const SingleThreadedImpl = struct {
fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{Timeout}!void {
if (ptr.loadUnchecked() != expect) {
return;
@@ -804,7 +802,7 @@ const PosixImpl = struct {
//
// What we really want here is a Release load, but that doesn't exist under the C11 memory model.
// We could instead do `bucket.pending.fetchAdd(0, Release) == 0` which achieves effectively the same thing,
// but the RMW operation unconditionally stores which invalidates the cache-line for others causing unnecessary contention.
// but the RMW operation unconditionally marks the cache-line as modified for others causing unnecessary fetching/contention.
//
// Instead we opt to do a full-fence + load instead which avoids taking ownership of the cache-line.
// fence(SeqCst) effectively converts the ptr update to SeqCst and the pending load to SeqCst: creating a Store-Load barrier.
@@ -962,3 +960,60 @@ test "Futex - broadcasting" {
for (broadcast.threads) |*t| t.* = try std.Thread.spawn(.{}, Broadcast.run, .{&broadcast});
for (broadcast.threads) |t| t.join();
}
/// Deadline is used to wait efficiently for a pointer's value to change using Futex and a fixed timeout.
///
/// Futex's timedWait() API takes a relative duration, which suffers from over-waiting
/// when used in a loop; such loops are often required due to the possibility of spurious wakeups.
///
/// Deadline instead converts the relative timeout to an absolute one, so that multiple calls
/// to Futex's timedWait() can block for, and report, more accurate error.Timeouts.
/// For example, given a 100ms deadline, if the first wait consumes 60ms before a spurious wakeup,
/// the next wait blocks for at most the remaining 40ms rather than a fresh 100ms.
pub const Deadline = struct {
timeout: ?u64,
started: std.time.Timer,
/// Create the deadline to expire after the given amount of time in nanoseconds passes.
/// Pass in `null` to have the deadline call `Futex.wait()` and never expire.
pub fn init(expires_in_ns: ?u64) Deadline {
var deadline: Deadline = undefined;
deadline.timeout = expires_in_ns;
// std.time.Timer must be supported by the platform for somewhat accurate reporting of error.Timeout.
if (deadline.timeout != null) {
deadline.started = std.time.Timer.start() catch unreachable;
}
return deadline;
}
/// Wait until either:
/// - the `ptr`'s value changes from `expect`.
/// - `Futex.wake()` is called on the `ptr`.
/// - A spurious wake occurs.
/// - The deadline expires, in which case `error.Timeout` is returned.
pub fn wait(self: *Deadline, ptr: *const Atomic(u32), expect: u32) error{Timeout}!void {
@setCold(true);
// Check if we actually have a timeout to wait until.
// If not, just wait "forever".
const timeout_ns = self.timeout orelse {
return Futex.wait(ptr, expect);
};
// Get how much time has passed since we started waiting
// then subtract that from the init() timeout to get how much longer to wait.
// The checked subtraction (clamped to 0 on underflow) detects when we've already waited longer than the init() timeout.
const elapsed_ns = self.started.read();
const until_timeout_ns = std.math.sub(u64, timeout_ns, elapsed_ns) catch 0;
return Futex.timedWait(ptr, expect, until_timeout_ns);
}
};
test "Futex - Deadline" {
var deadline = Deadline.init(100 * std.time.ns_per_ms);
var futex_word = Atomic(u32).init(0);
while (true) {
deadline.wait(&futex_word, 0) catch break;
}
}
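A sketch of the over-waiting that Deadline avoids, assuming a hypothetical `ready` flag that another thread eventually sets and wakes:

```
var ready = Atomic(u32).init(0);

fn waitUntilReady() error{Timeout}!void {
    // The 200ms budget is shared across all retries: if a spurious wakeup burns
    // 150ms, the next wait() blocks for at most the remaining 50ms.
    var deadline = Futex.Deadline.init(200 * std.time.ns_per_ms);
    while (ready.load(.Acquire) == 0) {
        try deadline.wait(&ready, 0);
    }
}
```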


@@ -1,288 +1,285 @@
//! Lock may be held only once. If the same thread tries to acquire
//! the same mutex twice, it deadlocks. This type supports static
//! initialization and is at most `@sizeOf(usize)` in size. When an
//! application is built in single threaded release mode, all the
//! functions are no-ops. In single threaded debug mode, there is
//! deadlock detection.
//! Mutex is a synchronization primitive which enforces atomic access to a shared region of code known as the "critical section".
//! It does this by ensuring only one thread is in the critical section at any given point in time, blocking the others.
//! Mutex can be statically initialized and is at most `@sizeOf(u64)` large.
//! Use `lock()` or `tryLock()` to enter the critical section and `unlock()` to leave it.
//!
//! Example usage:
//! Example:
//! ```
//! var m = Mutex{};
//!
//! m.lock();
//! defer m.release();
//! ... critical code
//! {
//! m.lock();
//! defer m.unlock();
//! // ... critical section code
//! }
//!
//! Non-blocking:
//! if (m.tryLock()) {
//! defer m.unlock();
//! // ... critical section
//! } else {
//! // ... lock not acquired
//! // ... critical section code
//! }
//! ```
const std = @import("../std.zig");
const builtin = @import("builtin");
const Mutex = @This();
const os = std.os;
const assert = std.debug.assert;
const testing = std.testing;
const Atomic = std.atomic.Atomic;
const Futex = std.Thread.Futex;
impl: Impl = .{},
const Mutex = @This();
const std = @import("../std.zig");
const builtin = @import("builtin");
const os = std.os;
const assert = std.debug.assert;
const windows = os.windows;
const linux = os.linux;
const testing = std.testing;
const StaticResetEvent = std.thread.StaticResetEvent;
/// Try to acquire the mutex without blocking. Returns `false` if the mutex is
/// unavailable. Otherwise returns `true`. Call `unlock` on the mutex to release.
pub fn tryLock(m: *Mutex) bool {
return m.impl.tryLock();
/// Tries to acquire the mutex without blocking the caller's thread.
/// Returns `false` if the calling thread would have to block to acquire it.
/// Otherwise, returns `true` and the caller should `unlock()` the Mutex to release it.
pub fn tryLock(self: *Mutex) bool {
return self.impl.tryLock();
}
/// Acquire the mutex. Deadlocks if the mutex is already
/// held by the calling thread.
pub fn lock(m: *Mutex) void {
m.impl.lock();
/// Acquires the mutex, blocking the caller's thread until it can.
/// It is undefined behavior if the mutex is already held by the caller's thread.
/// Once acquired, call `unlock()` on the Mutex to release it.
pub fn lock(self: *Mutex) void {
self.impl.lock();
}
pub fn unlock(m: *Mutex) void {
m.impl.unlock();
/// Releases the mutex which was previously acquired with `lock()` or `tryLock()`.
/// It is undefined behavior if the mutex is unlocked from a different thread than the one it was locked from.
pub fn unlock(self: *Mutex) void {
self.impl.unlock();
}
const Impl = if (builtin.single_threaded)
Dummy
SingleThreadedImpl
else if (builtin.os.tag == .windows)
WindowsMutex
else if (std.Thread.use_pthreads)
PthreadMutex
WindowsImpl
else if (builtin.os.tag.isDarwin())
DarwinImpl
else
AtomicMutex;
FutexImpl;
pub const AtomicMutex = struct {
state: State = .unlocked,
const SingleThreadedImpl = struct {
is_locked: bool = false,
const State = enum(i32) {
unlocked,
locked,
waiting,
};
pub fn tryLock(m: *AtomicMutex) bool {
return @cmpxchgStrong(
State,
&m.state,
.unlocked,
.locked,
.Acquire,
.Monotonic,
) == null;
}
pub fn lock(m: *AtomicMutex) void {
switch (@atomicRmw(State, &m.state, .Xchg, .locked, .Acquire)) {
.unlocked => {},
else => |s| m.lockSlow(s),
}
}
pub fn unlock(m: *AtomicMutex) void {
switch (@atomicRmw(State, &m.state, .Xchg, .unlocked, .Release)) {
.unlocked => unreachable,
.locked => {},
.waiting => m.unlockSlow(),
}
}
fn lockSlow(m: *AtomicMutex, current_state: State) void {
@setCold(true);
var new_state = current_state;
var spin: u8 = 0;
while (spin < 100) : (spin += 1) {
const state = @cmpxchgWeak(
State,
&m.state,
.unlocked,
new_state,
.Acquire,
.Monotonic,
) orelse return;
switch (state) {
.unlocked => {},
.locked => {},
.waiting => break,
}
var iter = std.math.min(32, spin + 1);
while (iter > 0) : (iter -= 1)
std.atomic.spinLoopHint();
}
new_state = .waiting;
while (true) {
switch (@atomicRmw(State, &m.state, .Xchg, new_state, .Acquire)) {
.unlocked => return,
else => {},
}
switch (builtin.os.tag) {
.linux => {
switch (linux.getErrno(linux.futex_wait(
@ptrCast(*const i32, &m.state),
linux.FUTEX.PRIVATE_FLAG | linux.FUTEX.WAIT,
@enumToInt(new_state),
null,
))) {
.SUCCESS => {},
.INTR => {},
.AGAIN => {},
else => unreachable,
}
},
else => std.atomic.spinLoopHint(),
}
}
}
fn unlockSlow(m: *AtomicMutex) void {
@setCold(true);
switch (builtin.os.tag) {
.linux => {
switch (linux.getErrno(linux.futex_wake(
@ptrCast(*const i32, &m.state),
linux.FUTEX.PRIVATE_FLAG | linux.FUTEX.WAKE,
1,
))) {
.SUCCESS => {},
.FAULT => unreachable, // invalid pointer passed to futex_wake
else => unreachable,
}
},
else => {},
}
}
};
pub const PthreadMutex = struct {
pthread_mutex: std.c.pthread_mutex_t = .{},
/// Try to acquire the mutex without blocking. Returns true if
/// the mutex is unavailable. Otherwise returns false. Call
/// release when done.
pub fn tryLock(m: *PthreadMutex) bool {
return std.c.pthread_mutex_trylock(&m.pthread_mutex) == .SUCCESS;
}
/// Acquire the mutex. Will deadlock if the mutex is already
/// held by the calling thread.
pub fn lock(m: *PthreadMutex) void {
switch (std.c.pthread_mutex_lock(&m.pthread_mutex)) {
.SUCCESS => {},
.INVAL => unreachable,
.BUSY => unreachable,
.AGAIN => unreachable,
.DEADLK => unreachable,
.PERM => unreachable,
else => unreachable,
}
}
pub fn unlock(m: *PthreadMutex) void {
switch (std.c.pthread_mutex_unlock(&m.pthread_mutex)) {
.SUCCESS => return,
.INVAL => unreachable,
.AGAIN => unreachable,
.PERM => unreachable,
else => unreachable,
}
}
};
/// This has the sematics as `Mutex`, however it does not actually do any
/// synchronization. Operations are safety-checked no-ops.
pub const Dummy = struct {
locked: @TypeOf(lock_init) = lock_init,
const lock_init = if (std.debug.runtime_safety) false else {};
/// Try to acquire the mutex without blocking. Returns false if
/// the mutex is unavailable. Otherwise returns true.
pub fn tryLock(m: *Dummy) bool {
if (std.debug.runtime_safety) {
if (m.locked) return false;
m.locked = true;
}
fn tryLock(self: *Impl) bool {
if (self.is_locked) return false;
self.is_locked = true;
return true;
}
/// Acquire the mutex. Will deadlock if the mutex is already
/// held by the calling thread.
pub fn lock(m: *Dummy) void {
if (!m.tryLock()) {
@panic("deadlock detected");
fn lock(self: *Impl) void {
if (!self.tryLock()) {
unreachable; // deadlock detected
}
}
pub fn unlock(m: *Dummy) void {
if (std.debug.runtime_safety) {
m.locked = false;
fn unlock(self: *Impl) void {
assert(self.is_locked);
self.is_locked = false;
}
};
// SRWLOCK on windows is almost always faster than the Futex solution.
// It also implements an efficient Condition with requeue support for us.
const WindowsImpl = struct {
srwlock: os.windows.SRWLOCK = .{},
fn tryLock(self: *Impl) bool {
return os.windows.kernel32.TryAcquireSRWLockExclusive(&self.srwlock) != os.windows.FALSE;
}
fn lock(self: *Impl) void {
os.windows.kernel32.AcquireSRWLockExclusive(&self.srwlock);
}
fn unlock(self: *Impl) void {
os.windows.kernel32.ReleaseSRWLockExclusive(&self.srwlock);
}
};
// os_unfair_lock on darwin supports priority inheritance and is generally faster than Futex solutions.
const DarwinImpl = struct {
oul: os.darwin.os_unfair_lock = .{},
fn tryLock(self: *Impl) bool {
return os.darwin.os_unfair_lock_trylock(&self.oul);
}
fn lock(self: *Impl) void {
os.darwin.os_unfair_lock_lock(&self.oul);
}
fn unlock(self: *Impl) void {
os.darwin.os_unfair_lock_unlock(&self.oul);
}
};
const FutexImpl = struct {
state: Atomic(u32) = Atomic(u32).init(unlocked),
const unlocked = 0b00;
const locked = 0b01;
const contended = 0b11; // must contain the `locked` bit for x86 optimization below
fn tryLock(self: *Impl) bool {
// Lock with compareAndSwap instead of tryCompareAndSwap to avoid reporting spurious CAS failure.
return self.lockFast("compareAndSwap");
}
fn lock(self: *Impl) void {
// Lock with tryCompareAndSwap instead of compareAndSwap due to being more inline-able on LL/SC archs like ARM.
if (!self.lockFast("tryCompareAndSwap")) {
self.lockSlow();
}
}
inline fn lockFast(self: *Impl, comptime casFn: []const u8) bool {
// On x86, use `lock bts` instead of `lock cmpxchg` as:
// - they both seem to mark the cache-line as modified regardless: https://stackoverflow.com/a/63350048
// - `lock bts` is smaller instruction-wise which makes it better for inlining
if (comptime builtin.target.cpu.arch.isX86()) {
const locked_bit = @ctz(u32, @as(u32, locked));
return self.state.bitSet(locked_bit, .Acquire) == 0;
}
// Acquire barrier ensures grabbing the lock happens before the critical section
// and that the previous lock holder's critical section happens before we grab the lock.
return @field(self.state, casFn)(unlocked, locked, .Acquire, .Monotonic) == null;
}
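To make the `lock bts` fast path concrete: `locked == 0b01`, so `locked_bit` is 0 and the previous value of that bit reports whether the lock was already held. A small sketch using the constants above:

```
test "bitSet as lock attempt sketch" {
    var state = Atomic(u32).init(unlocked);
    // The bit was clear: this call acquires the lock and returns 0.
    try std.testing.expectEqual(@as(u1, 0), state.bitSet(0, .Acquire));
    // The bit was already set: the second attempt fails and returns 1.
    try std.testing.expectEqual(@as(u1, 1), state.bitSet(0, .Acquire));
}
```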
fn lockSlow(self: *Impl) void {
@setCold(true);
// Avoid doing an atomic swap below if we already know the state is contended.
// An atomic swap unconditionally stores which marks the cache-line as modified unnecessarily.
if (self.state.load(.Monotonic) == contended) {
Futex.wait(&self.state, contended);
}
// Try to acquire the lock while also telling the existing lock holder that there are threads waiting.
//
// Once we sleep on the Futex, we must acquire the mutex using `contended` rather than `locked`.
// If not, threads sleeping on the Futex wouldn't see the state change in unlock and potentially deadlock.
// The downside is that the last mutex unlocker will see `contended` and do an unnecessary Futex wake
// but this is better than having to wake all waiting threads on mutex unlock.
//
// Acquire barrier ensures grabbing the lock happens before the critical section
// and that the previous lock holder's critical section happens before we grab the lock.
while (self.state.swap(contended, .Acquire) != unlocked) {
Futex.wait(&self.state, contended);
}
}
fn unlock(self: *Impl) void {
// Unlock the mutex and wake up a waiting thread if any.
//
// A waiting thread will acquire with `contended` instead of `locked`
// which ensures that it wakes up another thread on the next unlock().
//
// Release barrier ensures the critical section happens before we let go of the lock
// and that our critical section happens before the next lock holder grabs the lock.
const state = self.state.swap(unlocked, .Release);
assert(state != unlocked);
if (state == contended) {
Futex.wake(&self.state, 1);
}
}
};
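To summarize the protocol above, a rough two-thread trace (a sketch of the intended transitions, not additional implementation):

```
// T1 lock():   CAS unlocked(0b00) -> locked(0b01) succeeds; T1 owns the lock.
// T2 lock():   lockFast fails; lockSlow swaps in contended(0b11), sees locked, sleeps on the Futex.
// T1 unlock(): swap to unlocked returns contended -> Futex.wake(&state, 1).
// T2 wakes:    swap(contended) returns unlocked -> T2 owns the lock; state stays contended,
//              so T2's eventual unlock() performs one extra, harmless wake.
```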
pub const WindowsMutex = struct {
srwlock: windows.SRWLOCK = windows.SRWLOCK_INIT,
pub fn tryLock(m: *WindowsMutex) bool {
return windows.kernel32.TryAcquireSRWLockExclusive(&m.srwlock) != windows.FALSE;
}
pub fn lock(m: *WindowsMutex) void {
windows.kernel32.AcquireSRWLockExclusive(&m.srwlock);
}
pub fn unlock(m: *WindowsMutex) void {
windows.kernel32.ReleaseSRWLockExclusive(&m.srwlock);
}
};
const TestContext = struct {
mutex: *Mutex,
data: i128,
const incr_count = 10000;
};
test "basic usage" {
test "Mutex - smoke test" {
var mutex = Mutex{};
var context = TestContext{
.mutex = &mutex,
.data = 0,
try testing.expect(mutex.tryLock());
try testing.expect(!mutex.tryLock());
mutex.unlock();
mutex.lock();
try testing.expect(!mutex.tryLock());
mutex.unlock();
}
// A counter which is incremented without atomic instructions
const NonAtomicCounter = struct {
// a direct u128 could be lowered to xmm ops on x86, which are atomic
value: [2]u64 = [_]u64{ 0, 0 },
fn get(self: NonAtomicCounter) u128 {
return @bitCast(u128, self.value);
}
fn inc(self: *NonAtomicCounter) void {
for (@bitCast([2]u64, self.get() + 1)) |v, i| {
@ptrCast(*volatile u64, &self.value[i]).* = v;
}
}
};
test "Mutex - many uncontended" {
// This test requires spawning threads.
if (builtin.single_threaded) {
return error.SkipZigTest;
}
const num_threads = 4;
const num_increments = 1000;
const Runner = struct {
mutex: Mutex = .{},
thread: std.Thread = undefined,
counter: NonAtomicCounter = .{},
fn run(self: *@This()) void {
var i: usize = num_increments;
while (i > 0) : (i -= 1) {
self.mutex.lock();
defer self.mutex.unlock();
self.counter.inc();
}
}
};
var runners = [_]Runner{.{}} ** num_threads;
for (runners) |*r| r.thread = try std.Thread.spawn(.{}, Runner.run, .{r});
for (runners) |r| r.thread.join();
for (runners) |r| try testing.expectEqual(r.counter.get(), num_increments);
}
test "Mutex - many contended" {
// This test requires spawning threads.
if (builtin.single_threaded) {
worker(&context);
try testing.expect(context.data == TestContext.incr_count);
} else {
const thread_count = 10;
var threads: [thread_count]std.Thread = undefined;
for (threads) |*t| {
t.* = try std.Thread.spawn(.{}, worker, .{&context});
return error.SkipZigTest;
}
const num_threads = 4;
const num_increments = 1000;
const Runner = struct {
mutex: Mutex = .{},
counter: NonAtomicCounter = .{},
fn run(self: *@This()) void {
var i: usize = num_increments;
while (i > 0) : (i -= 1) {
// Occasionally hint to let another thread run.
defer if (i % 100 == 0) std.Thread.yield() catch {};
self.mutex.lock();
defer self.mutex.unlock();
self.counter.inc();
}
}
for (threads) |t|
t.join();
};
try testing.expect(context.data == thread_count * TestContext.incr_count);
}
}
fn worker(ctx: *TestContext) void {
var i: usize = 0;
while (i != TestContext.incr_count) : (i += 1) {
ctx.mutex.lock();
defer ctx.mutex.unlock();
ctx.data += 1;
}
var runner = Runner{};
var threads: [num_threads]std.Thread = undefined;
for (threads) |*t| t.* = try std.Thread.spawn(.{}, Runner.run, .{&runner});
for (threads) |t| t.join();
try testing.expectEqual(runner.counter.get(), num_increments * num_threads);
}


@@ -1,7 +1,7 @@
const std = @import("../std.zig");
const builtin = @import("builtin");
const testing = std.testing;
const target = @import("builtin").target;
const Ordering = std.atomic.Ordering;
pub fn Atomic(comptime T: type) type {
@@ -164,87 +164,13 @@ pub fn Atomic(comptime T: type) type {
return bitRmw(self, .Toggle, bit, ordering);
}
inline fn bitRmw(
self: *Self,
comptime op: BitRmwOp,
bit: Bit,
comptime ordering: Ordering,
) u1 {
inline fn bitRmw(self: *Self, comptime op: BitRmwOp, bit: Bit, comptime ordering: Ordering) u1 {
// x86 supports dedicated bitwise instructions
if (comptime target.cpu.arch.isX86() and @sizeOf(T) >= 2 and @sizeOf(T) <= 8) {
const old_bit: u8 = switch (@sizeOf(T)) {
2 => switch (op) {
.Set => asm volatile ("lock btsw %[bit], %[ptr]"
// LLVM doesn't support u1 flag register return values
: [result] "={@ccc}" (-> u8),
: [ptr] "*p" (&self.value),
[bit] "X" (@as(T, bit)),
: "cc", "memory"
),
.Reset => asm volatile ("lock btrw %[bit], %[ptr]"
// LLVM doesn't support u1 flag register return values
: [result] "={@ccc}" (-> u8),
: [ptr] "*p" (&self.value),
[bit] "X" (@as(T, bit)),
: "cc", "memory"
),
.Toggle => asm volatile ("lock btcw %[bit], %[ptr]"
// LLVM doesn't support u1 flag register return values
: [result] "={@ccc}" (-> u8),
: [ptr] "*p" (&self.value),
[bit] "X" (@as(T, bit)),
: "cc", "memory"
),
},
4 => switch (op) {
.Set => asm volatile ("lock btsl %[bit], %[ptr]"
// LLVM doesn't support u1 flag register return values
: [result] "={@ccc}" (-> u8),
: [ptr] "*p" (&self.value),
[bit] "X" (@as(T, bit)),
: "cc", "memory"
),
.Reset => asm volatile ("lock btrl %[bit], %[ptr]"
// LLVM doesn't support u1 flag register return values
: [result] "={@ccc}" (-> u8),
: [ptr] "*p" (&self.value),
[bit] "X" (@as(T, bit)),
: "cc", "memory"
),
.Toggle => asm volatile ("lock btcl %[bit], %[ptr]"
// LLVM doesn't support u1 flag register return values
: [result] "={@ccc}" (-> u8),
: [ptr] "*p" (&self.value),
[bit] "X" (@as(T, bit)),
: "cc", "memory"
),
},
8 => switch (op) {
.Set => asm volatile ("lock btsq %[bit], %[ptr]"
// LLVM doesn't support u1 flag register return values
: [result] "={@ccc}" (-> u8),
: [ptr] "*p" (&self.value),
[bit] "X" (@as(T, bit)),
: "cc", "memory"
),
.Reset => asm volatile ("lock btrq %[bit], %[ptr]"
// LLVM doesn't support u1 flag register return values
: [result] "={@ccc}" (-> u8),
: [ptr] "*p" (&self.value),
[bit] "X" (@as(T, bit)),
: "cc", "memory"
),
.Toggle => asm volatile ("lock btcq %[bit], %[ptr]"
// LLVM doesn't support u1 flag register return values
: [result] "={@ccc}" (-> u8),
: [ptr] "*p" (&self.value),
[bit] "X" (@as(T, bit)),
: "cc", "memory"
),
},
else => @compileError("Invalid atomic type " ++ @typeName(T)),
};
return @intCast(u1, old_bit);
if (comptime builtin.target.cpu.arch.isX86() and @sizeOf(T) >= 2 and @sizeOf(T) <= 8) {
// TODO: stage2 currently doesn't like the inline asm this function emits.
if (builtin.zig_backend == .stage1) {
return x86BitRmw(self, op, bit, ordering);
}
}
const mask = @as(T, 1) << bit;
@@ -256,6 +182,86 @@ pub fn Atomic(comptime T: type) type {
return @boolToInt(value & mask != 0);
}
inline fn x86BitRmw(self: *Self, comptime op: BitRmwOp, bit: Bit, comptime ordering: Ordering) u1 {
const old_bit: u8 = switch (@sizeOf(T)) {
2 => switch (op) {
.Set => asm volatile ("lock btsw %[bit], %[ptr]"
// LLVM doesn't support u1 flag register return values
: [result] "={@ccc}" (-> u8),
: [ptr] "*p" (&self.value),
[bit] "X" (@as(T, bit)),
: "cc", "memory"
),
.Reset => asm volatile ("lock btrw %[bit], %[ptr]"
// LLVM doesn't support u1 flag register return values
: [result] "={@ccc}" (-> u8),
: [ptr] "*p" (&self.value),
[bit] "X" (@as(T, bit)),
: "cc", "memory"
),
.Toggle => asm volatile ("lock btcw %[bit], %[ptr]"
// LLVM doesn't support u1 flag register return values
: [result] "={@ccc}" (-> u8),
: [ptr] "*p" (&self.value),
[bit] "X" (@as(T, bit)),
: "cc", "memory"
),
},
4 => switch (op) {
.Set => asm volatile ("lock btsl %[bit], %[ptr]"
// LLVM doesn't support u1 flag register return values
: [result] "={@ccc}" (-> u8),
: [ptr] "*p" (&self.value),
[bit] "X" (@as(T, bit)),
: "cc", "memory"
),
.Reset => asm volatile ("lock btrl %[bit], %[ptr]"
// LLVM doesn't support u1 flag register return values
: [result] "={@ccc}" (-> u8),
: [ptr] "*p" (&self.value),
[bit] "X" (@as(T, bit)),
: "cc", "memory"
),
.Toggle => asm volatile ("lock btcl %[bit], %[ptr]"
// LLVM doesn't support u1 flag register return values
: [result] "={@ccc}" (-> u8),
: [ptr] "*p" (&self.value),
[bit] "X" (@as(T, bit)),
: "cc", "memory"
),
},
8 => switch (op) {
.Set => asm volatile ("lock btsq %[bit], %[ptr]"
// LLVM doesn't support u1 flag register return values
: [result] "={@ccc}" (-> u8),
: [ptr] "*p" (&self.value),
[bit] "X" (@as(T, bit)),
: "cc", "memory"
),
.Reset => asm volatile ("lock btrq %[bit], %[ptr]"
// LLVM doesn't support u1 flag register return values
: [result] "={@ccc}" (-> u8),
: [ptr] "*p" (&self.value),
[bit] "X" (@as(T, bit)),
: "cc", "memory"
),
.Toggle => asm volatile ("lock btcq %[bit], %[ptr]"
// LLVM doesn't support u1 flag register return values
: [result] "={@ccc}" (-> u8),
: [ptr] "*p" (&self.value),
[bit] "X" (@as(T, bit)),
: "cc", "memory"
),
},
else => @compileError("Invalid atomic type " ++ @typeName(T)),
};
// TODO: emit appropriate tsan fence if compiling with tsan
_ = ordering;
return @intCast(u1, old_bit);
}
});
};
}


@@ -151,12 +151,12 @@ pub const Config = struct {
/// What type of mutex you'd like to use, for thread safety.
/// when specified, the mutex type must have the same shape as `std.Thread.Mutex` and
/// `std.Thread.Mutex.Dummy`, and have no required fields. Specifying this field causes
/// `DummyMutex`, and have no required fields. Specifying this field causes
/// the `thread_safe` field to be ignored.
///
/// when null (default):
/// * the mutex type defaults to `std.Thread.Mutex` when thread_safe is enabled.
/// * the mutex type defaults to `std.Thread.Mutex.Dummy` otherwise.
/// * the mutex type defaults to `DummyMutex` otherwise.
MutexType: ?type = null,
/// This is a temporary debugging trick you can use to turn segfaults into more helpful
@@ -198,7 +198,12 @@ pub fn GeneralPurposeAllocator(comptime config: Config) type {
else if (config.thread_safe)
std.Thread.Mutex{}
else
std.Thread.Mutex.Dummy{};
DummyMutex{};
const DummyMutex = struct {
fn lock(_: *DummyMutex) void {}
fn unlock(_: *DummyMutex) void {}
};
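For illustration, a hedged sketch of supplying a custom `MutexType` through `Config` (here simply `std.Thread.Mutex`; any type exposing `lock()`/`unlock()` like the `DummyMutex` above would work):

```
const std = @import("std");

const Gpa = std.heap.GeneralPurposeAllocator(.{ .MutexType = std.Thread.Mutex });

test "custom MutexType sketch" {
    var gpa = Gpa{};
    defer _ = gpa.deinit();
    const bytes = try gpa.allocator().alloc(u8, 16);
    gpa.allocator().free(bytes);
}
```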
const stack_n = config.stack_trace_frames;
const one_trace_size = @sizeOf(usize) * stack_n;

View File

@@ -3680,10 +3680,15 @@ pub const OBJECT_NAME_INFORMATION = extern struct {
Name: UNICODE_STRING,
};
pub const SRWLOCK = usize;
pub const SRWLOCK_INIT: SRWLOCK = 0;
pub const CONDITION_VARIABLE = usize;
pub const CONDITION_VARIABLE_INIT: CONDITION_VARIABLE = 0;
pub const SRWLOCK_INIT = SRWLOCK{};
pub const SRWLOCK = extern struct {
Ptr: ?PVOID = null,
};
pub const CONDITION_VARIABLE_INIT = CONDITION_VARIABLE{};
pub const CONDITION_VARIABLE = extern struct {
Ptr: ?PVOID = null,
};
pub const FILE_SKIP_COMPLETION_PORT_ON_SUCCESS = 0x1;
pub const FILE_SKIP_SET_EVENT_ON_HANDLE = 0x2;