From 963ac60918b39cafdda3cb99eff4cd9d20edd839 Mon Sep 17 00:00:00 2001
From: protty <45520026+kprotty@users.noreply.github.com>
Date: Sat, 23 Apr 2022 19:35:56 -0500
Subject: [PATCH] std.Thread: Mutex and Condition improvements (#11497)

* Thread: minor cleanups

* Thread: rewrite Mutex

* Thread: introduce Futex.Deadline

* Thread: Condition rewrite + cleanup

* Mutex: optimize lock fast path

* Condition: more docs

* Thread: more mutex + condition docs

* Thread: remove broken Condition test

* Thread: zig fmt

* address review comments + fix Thread.DummyMutex in GPA

* Atomic: disable bitRmw x86 inline asm for stage2

* GPA: typo mutex_init

* Thread: remove noalias on stuff

* Thread: comment typos + clarifications
---
 lib/std/Thread.zig                         |  27 +-
 lib/std/Thread/Condition.zig               | 793 ++++++++++++---------
 lib/std/Thread/Futex.zig                   |  69 +-
 lib/std/Thread/Mutex.zig                   | 495 +++++++------
 lib/std/atomic/Atomic.zig                  | 168 ++---
 lib/std/heap/general_purpose_allocator.zig |  11 +-
 lib/std/os/windows.zig                     |  13 +-
 7 files changed, 873 insertions(+), 703 deletions(-)

diff --git a/lib/std/Thread.zig b/lib/std/Thread.zig
index e28471d6b3..db7117fdd7 100644
--- a/lib/std/Thread.zig
+++ b/lib/std/Thread.zig
@@ -459,9 +459,8 @@ const UnsupportedImpl = struct {
     }
 
     fn unsupported(unusued: anytype) noreturn {
-        @compileLog("Unsupported operating system", target.os.tag);
         _ = unusued;
-        unreachable;
+        @compileError("Unsupported operating system " ++ @tagName(target.os.tag));
     }
 };
 
@@ -1188,27 +1187,3 @@ test "Thread.detach" {
     event.wait();
     try std.testing.expectEqual(value, 1);
 }
-
-fn testWaitForSignal(mutex: *Mutex, cond: *Condition) void {
-    mutex.lock();
-    defer mutex.unlock();
-    cond.signal();
-    cond.wait(mutex);
-}
-
-test "Condition.signal" {
-    if (builtin.single_threaded) return error.SkipZigTest;
-
-    var mutex = Mutex{};
-    var cond = Condition{};
-
-    var thread: Thread = undefined;
-    {
-        mutex.lock();
-        defer mutex.unlock();
-        thread = try Thread.spawn(.{}, testWaitForSignal, .{ &mutex, &cond });
-        cond.wait(&mutex);
-        cond.signal();
-    }
-    thread.join();
-}
diff --git a/lib/std/Thread/Condition.zig b/lib/std/Thread/Condition.zig
index fb48db8e53..d5855ec066 100644
--- a/lib/std/Thread/Condition.zig
+++ b/lib/std/Thread/Condition.zig
@@ -1,411 +1,538 @@
-//! A condition provides a way for a kernel thread to block until it is signaled
-//! to wake up. Spurious wakeups are possible.
-//! This API supports static initialization and does not require deinitialization.
-
-impl: Impl = .{},
+//! Condition variables are used with a Mutex to efficiently wait for an arbitrary condition to occur.
+//! It does this by atomically unlocking the mutex, blocking the thread until notified, and finally re-locking the mutex.
+//! Condition can be statically initialized and is at most `@sizeOf(u64)` large.
+//!
+//! Example:
+//! ```
+//! var m = Mutex{};
+//! var c = Condition{};
+//! var predicate = false;
+//!
+//! fn consumer() void {
+//!     m.lock();
+//!     defer m.unlock();
+//!
+//!     while (!predicate) {
+//!         c.wait(&m);
+//!     }
+//! }
+//!
+//! fn producer() void {
+//!     m.lock();
+//!     defer m.unlock();
+//!
+//!     predicate = true;
+//!     c.signal();
+//! }
+//!
+//! const thread = try std.Thread.spawn(.{}, producer, .{});
+//! consumer();
+//! thread.join();
+//! ```
+//!
+//! Note that condition variables can only reliably unblock threads that are sequenced before them using the same Mutex.
+//! This means that the following is allowed to deadlock:
+//! ```
+//! thread-1: mutex.lock()
+//! thread-1: condition.wait(&mutex)
+//!
+//! 
thread-2: // mutex.lock() (without this, the following signal may not see the waiting thread-1)
+//! thread-2: // mutex.unlock() (this is optional for correctness once locked above, as signal can be called without holding the mutex)
+//! thread-2: condition.signal()
+//! ```

 const std = @import("../std.zig");
 const builtin = @import("builtin");
 const Condition = @This();
-const windows = std.os.windows;
-const linux = std.os.linux;
 const Mutex = std.Thread.Mutex;
+
+const os = std.os;
 const assert = std.debug.assert;
 const testing = std.testing;
+const Atomic = std.atomic.Atomic;
+const Futex = std.Thread.Futex;
 
-pub fn wait(cond: *Condition, mutex: *Mutex) void {
-    cond.impl.wait(mutex);
+impl: Impl = .{},
+
+/// Atomically releases the Mutex, blocks the caller thread, then re-acquires the Mutex on return.
+/// "Atomically" here refers to accesses done on the Condition after acquiring the Mutex.
+///
+/// The Mutex must be locked by the caller's thread when this function is called.
+/// A Mutex can have multiple Conditions waiting with it concurrently, but not the opposite.
+/// It is undefined behavior for multiple threads to wait with different mutexes using the same Condition concurrently.
+/// Once threads have finished waiting with one Mutex, the Condition can be used to wait with another Mutex.
+///
+/// A blocking call to wait() is unblocked by one of the following conditions:
+/// - a spurious ("at random") wake up occurs
+/// - a future call to `signal()` or `broadcast()` which has acquired the Mutex and is sequenced after this `wait()`.
+///
+/// Given wait() can be interrupted spuriously, the blocking condition should be checked continuously
+/// irrespective of any notifications from `signal()` or `broadcast()`.
+pub fn wait(self: *Condition, mutex: *Mutex) void {
+    self.impl.wait(mutex, null) catch |err| switch (err) {
+        error.Timeout => unreachable, // no timeout provided so we shouldn't have timed-out
+    };
 }
 
-pub fn timedWait(cond: *Condition, mutex: *Mutex, timeout_ns: u64) error{TimedOut}!void {
-    try cond.impl.timedWait(mutex, timeout_ns);
+/// Atomically releases the Mutex, blocks the caller thread, then re-acquires the Mutex on return.
+/// "Atomically" here refers to accesses done on the Condition after acquiring the Mutex.
+///
+/// The Mutex must be locked by the caller's thread when this function is called.
+/// A Mutex can have multiple Conditions waiting with it concurrently, but not the opposite.
+/// It is undefined behavior for multiple threads to wait with different mutexes using the same Condition concurrently.
+/// Once threads have finished waiting with one Mutex, the Condition can be used to wait with another Mutex.
+///
+/// A blocking call to `timedWait()` is unblocked by one of the following conditions:
+/// - a spurious ("at random") wake occurs
+/// - the caller was blocked for around `timeout_ns` nanoseconds, in which case `error.Timeout` is returned.
+/// - a future call to `signal()` or `broadcast()` which has acquired the Mutex and is sequenced after this `timedWait()`.
+///
+/// Given `timedWait()` can be interrupted spuriously, the blocking condition should be checked continuously
+/// irrespective of any notifications from `signal()` or `broadcast()`.
+pub fn timedWait(self: *Condition, mutex: *Mutex, timeout_ns: u64) error{Timeout}!void {
+    return self.impl.wait(mutex, timeout_ns);
 }
 
-pub fn signal(cond: *Condition) void {
-    cond.impl.signal();
+/// Unblocks at least one thread blocked in a call to `wait()` or `timedWait()` with a given Mutex.
+/// The blocked thread must be sequenced before this call with respect to acquiring the same Mutex in order to be observable for unblocking.
+/// `signal()` can be called with or without the relevant Mutex being acquired, and it has no effect if there are no observable blocked threads.
+pub fn signal(self: *Condition) void {
+    self.impl.wake(.one);
 }
 
-pub fn broadcast(cond: *Condition) void {
-    cond.impl.broadcast();
+/// Unblocks all threads currently blocked in a call to `wait()` or `timedWait()` with a given Mutex.
+/// The blocked threads must be sequenced before this call with respect to acquiring the same Mutex in order to be observable for unblocking.
+/// `broadcast()` can be called with or without the relevant Mutex being acquired, and it has no effect if there are no observable blocked threads.
+pub fn broadcast(self: *Condition) void {
+    self.impl.wake(.all);
 }
 
 const Impl = if (builtin.single_threaded)
-    SingleThreadedCondition
+    SingleThreadedImpl
 else if (builtin.os.tag == .windows)
-    WindowsCondition
-else if (std.Thread.use_pthreads)
-    PthreadCondition
+    WindowsImpl
 else
-    AtomicCondition;
+    FutexImpl;
 
-pub const SingleThreadedCondition = struct {
-    pub fn wait(cond: *SingleThreadedCondition, mutex: *Mutex) void {
-        _ = cond;
-        _ = mutex;
-        unreachable; // deadlock detected
-    }
+const Notify = enum {
+    one, // wake up only one thread
+    all, // wake up all threads
+};
 
-    pub fn timedWait(cond: *SingleThreadedCondition, mutex: *Mutex, timeout_ns: u64) error{TimedOut}!void {
-        _ = cond;
+const SingleThreadedImpl = struct {
+    fn wait(self: *Impl, mutex: *Mutex, timeout: ?u64) error{Timeout}!void {
+        _ = self;
         _ = mutex;
-        _ = timeout_ns;
+
+        // There are no other threads to wake us up.
+        // So if we wait without a timeout we would never wake up.
+        const timeout_ns = timeout orelse {
+            unreachable; // deadlock detected
+        };
+
         std.time.sleep(timeout_ns);
-        return error.TimedOut;
+        return error.Timeout;
     }
 
-    pub fn signal(cond: *SingleThreadedCondition) void {
-        _ = cond;
-    }
-
-    pub fn broadcast(cond: *SingleThreadedCondition) void {
-        _ = cond;
+    fn wake(self: *Impl, comptime notify: Notify) void {
+        // There are no other threads to wake up.
+        _ = self;
+        _ = notify;
    }
 };
 
-pub const WindowsCondition = struct {
-    cond: windows.CONDITION_VARIABLE = windows.CONDITION_VARIABLE_INIT,
+const WindowsImpl = struct {
+    condition: os.windows.CONDITION_VARIABLE = .{},
 
-    pub fn wait(cond: *WindowsCondition, mutex: *Mutex) void {
-        const rc = windows.kernel32.SleepConditionVariableSRW(
-            &cond.cond,
+    fn wait(self: *Impl, mutex: *Mutex, timeout: ?u64) error{Timeout}!void {
+        var timeout_overflowed = false;
+        var timeout_ms: os.windows.DWORD = os.windows.INFINITE;
+
+        if (timeout) |timeout_ns| {
+            // Round the nanoseconds to the nearest millisecond,
+            // then saturating cast it to windows DWORD for use in kernel32 call.
+            const ms = (timeout_ns +| (std.time.ns_per_ms / 2)) / std.time.ns_per_ms;
+            timeout_ms = std.math.cast(os.windows.DWORD, ms) catch std.math.maxInt(os.windows.DWORD);
+
+            // Track if the timeout overflowed into INFINITE and make sure not to wait forever.
+            if (timeout_ms == os.windows.INFINITE) {
+                timeout_overflowed = true;
+                timeout_ms -= 1;
+            }
+        }
+
+        const rc = os.windows.kernel32.SleepConditionVariableSRW(
+            &self.condition,
             &mutex.impl.srwlock,
-            windows.INFINITE,
-            @as(windows.ULONG, 0),
+            timeout_ms,
+            0, // the srwlock is assumed to be acquired in exclusive mode, not shared
         );
-        assert(rc != windows.FALSE);
+
+        // Return error.Timeout if we know the timeout elapsed correctly.
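+        // (If the timeout overflowed into INFINITE above, we actually slept with a capped
+        // value, so a FALSE here is reported as a wake-up rather than error.Timeout.)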
+        if (rc == os.windows.FALSE) {
+            assert(os.windows.kernel32.GetLastError() == .TIMEOUT);
+            if (!timeout_overflowed) return error.Timeout;
+        }
     }
 
-    pub fn timedWait(cond: *WindowsCondition, mutex: *Mutex, timeout_ns: u64) error{TimedOut}!void {
-        var timeout_checked = std.math.cast(windows.DWORD, timeout_ns / std.time.ns_per_ms) catch overflow: {
-            break :overflow std.math.maxInt(windows.DWORD);
-        };
-
-        // Handle the case where timeout is INFINITE, otherwise SleepConditionVariableSRW's time-out never elapses
-        const timeout_overflowed = timeout_checked == windows.INFINITE;
-        timeout_checked -= @boolToInt(timeout_overflowed);
-
-        const rc = windows.kernel32.SleepConditionVariableSRW(
-            &cond.cond,
-            &mutex.impl.srwlock,
-            timeout_checked,
-            @as(windows.ULONG, 0),
-        );
-        if (rc == windows.FALSE and windows.kernel32.GetLastError() == windows.Win32Error.TIMEOUT) return error.TimedOut;
-        assert(rc != windows.FALSE);
-    }
-
-    pub fn signal(cond: *WindowsCondition) void {
-        windows.kernel32.WakeConditionVariable(&cond.cond);
-    }
-
-    pub fn broadcast(cond: *WindowsCondition) void {
-        windows.kernel32.WakeAllConditionVariable(&cond.cond);
+    fn wake(self: *Impl, comptime notify: Notify) void {
+        switch (notify) {
+            .one => os.windows.kernel32.WakeConditionVariable(&self.condition),
+            .all => os.windows.kernel32.WakeAllConditionVariable(&self.condition),
+        }
     }
 };
 
-pub const PthreadCondition = struct {
-    cond: std.c.pthread_cond_t = .{},
+const FutexImpl = struct {
+    state: Atomic(u32) = Atomic(u32).init(0),
+    epoch: Atomic(u32) = Atomic(u32).init(0),
 
-    pub fn wait(cond: *PthreadCondition, mutex: *Mutex) void {
-        const rc = std.c.pthread_cond_wait(&cond.cond, &mutex.impl.pthread_mutex);
-        assert(rc == .SUCCESS);
-    }
+    const one_waiter = 1;
+    const waiter_mask = 0xffff;
 
-    pub fn timedWait(cond: *PthreadCondition, mutex: *Mutex, timeout_ns: u64) error{TimedOut}!void {
-        var ts: std.os.timespec = undefined;
-        std.os.clock_gettime(std.os.CLOCK.REALTIME, &ts) catch unreachable;
-        ts.tv_sec += @intCast(@TypeOf(ts.tv_sec), timeout_ns / std.time.ns_per_s);
-        ts.tv_nsec += @intCast(@TypeOf(ts.tv_nsec), timeout_ns % std.time.ns_per_s);
-        if (ts.tv_nsec >= std.time.ns_per_s) {
-            ts.tv_sec += 1;
-            ts.tv_nsec -= std.time.ns_per_s;
-        }
+    const one_signal = 1 << 16;
+    const signal_mask = 0xffff << 16;
 
-        const rc = std.c.pthread_cond_timedwait(&cond.cond, &mutex.impl.pthread_mutex, &ts);
-        return switch (rc) {
-            .SUCCESS => {},
-            .TIMEDOUT => error.TimedOut,
-            else => unreachable,
-        };
-    }
+    fn wait(self: *Impl, mutex: *Mutex, timeout: ?u64) error{Timeout}!void {
+        // Register that we're waiting on the state by incrementing the wait count.
+        // This assumes that there can be at most ((1<<16)-1) or 65,535 threads concurrently waiting on the same Condvar.
+        // If this is hit in practice, then this condvar not working is the least of your concerns.
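+        //
+        // For reference, a sketch of the packed state derived from the masks above:
+        // waiters live in bits 0..15 and signals in bits 16..31 of the one u32 futex word,
+        // i.e. state = (signals * one_signal) + (waiters * one_waiter),
+        // so e.g. 3 waiters with 1 pending signal is 0x0001_0003.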
+        var state = self.state.fetchAdd(one_waiter, .Monotonic);
+        assert(state & waiter_mask != waiter_mask);
+        state += one_waiter;
 
-    pub fn signal(cond: *PthreadCondition) void {
-        const rc = std.c.pthread_cond_signal(&cond.cond);
-        assert(rc == .SUCCESS);
-    }
-
-    pub fn broadcast(cond: *PthreadCondition) void {
-        const rc = std.c.pthread_cond_broadcast(&cond.cond);
-        assert(rc == .SUCCESS);
-    }
-};
-
-pub const AtomicCondition = struct {
-    pending: bool = false,
-    queue_mutex: Mutex = .{},
-    queue_list: QueueList = .{},
-
-    pub const QueueList = std.SinglyLinkedList(QueueItem);
-
-    pub const QueueItem = struct {
-        futex: i32 = 0,
-        dequeued: bool = false,
-
-        fn wait(cond: *@This()) void {
-            while (@atomicLoad(i32, &cond.futex, .Acquire) == 0) {
-                switch (builtin.os.tag) {
-                    .linux => {
-                        switch (linux.getErrno(linux.futex_wait(
-                            &cond.futex,
-                            linux.FUTEX.PRIVATE_FLAG | linux.FUTEX.WAIT,
-                            0,
-                            null,
-                        ))) {
-                            .SUCCESS => {},
-                            .INTR => {},
-                            .AGAIN => {},
-                            else => unreachable,
-                        }
-                    },
-                    else => std.atomic.spinLoopHint(),
-                }
-            }
-        }
-
-        pub fn timedWait(cond: *@This(), timeout_ns: u64) error{TimedOut}!void {
-            const start_time = std.time.nanoTimestamp();
-            while (@atomicLoad(i32, &cond.futex, .Acquire) == 0) {
-                switch (builtin.os.tag) {
-                    .linux => {
-                        var ts: std.os.timespec = undefined;
-                        ts.tv_sec = @intCast(@TypeOf(ts.tv_sec), timeout_ns / std.time.ns_per_s);
-                        ts.tv_nsec = @intCast(@TypeOf(ts.tv_nsec), timeout_ns % std.time.ns_per_s);
-                        switch (linux.getErrno(linux.futex_wait(
-                            &cond.futex,
-                            linux.FUTEX.PRIVATE_FLAG | linux.FUTEX.WAIT,
-                            0,
-                            &ts,
-                        ))) {
-                            .SUCCESS => {},
-                            .INTR => {},
-                            .AGAIN => {},
-                            .TIMEDOUT => return error.TimedOut,
-                            .INVAL => {}, // possibly timeout overflow
-                            .FAULT => unreachable,
-                            else => unreachable,
-                        }
-                    },
-                    else => {
-                        if (std.time.nanoTimestamp() - start_time >= timeout_ns) {
-                            return error.TimedOut;
-                        }
-                        std.atomic.spinLoopHint();
-                    },
-                }
-            }
-        }
-
-        fn notify(cond: *@This()) void {
-            @atomicStore(i32, &cond.futex, 1, .Release);
-
-            switch (builtin.os.tag) {
-                .linux => {
-                    switch (linux.getErrno(linux.futex_wake(
-                        &cond.futex,
-                        linux.FUTEX.PRIVATE_FLAG | linux.FUTEX.WAKE,
-                        1,
-                    ))) {
-                        .SUCCESS => {},
-                        .FAULT => {},
-                        else => unreachable,
-                    }
-                },
-                else => {},
-            }
-        }
-    };
-
-    pub fn wait(cond: *AtomicCondition, mutex: *Mutex) void {
-        var waiter = QueueList.Node{ .data = .{} };
-
-        {
-            cond.queue_mutex.lock();
-            defer cond.queue_mutex.unlock();
-
-            cond.queue_list.prepend(&waiter);
-            @atomicStore(bool, &cond.pending, true, .SeqCst);
-        }
-
-        mutex.unlock();
-        waiter.data.wait();
-        mutex.lock();
-    }
-
-    pub fn timedWait(cond: *AtomicCondition, mutex: *Mutex, timeout_ns: u64) error{TimedOut}!void {
-        var waiter = QueueList.Node{ .data = .{} };
-
-        {
-            cond.queue_mutex.lock();
-            defer cond.queue_mutex.unlock();
-
-            cond.queue_list.prepend(&waiter);
-            @atomicStore(bool, &cond.pending, true, .SeqCst);
-        }
-
-        var timed_out = false;
+        // Temporarily release the mutex in order to block on the condition variable.
         mutex.unlock();
         defer mutex.lock();
 
-        waiter.data.timedWait(timeout_ns) catch |err| switch (err) {
-            error.TimedOut => {
-                defer if (!timed_out) {
-                    waiter.data.wait();
-                };
-                cond.queue_mutex.lock();
-                defer cond.queue_mutex.unlock();
-                if (!waiter.data.dequeued) {
-                    timed_out = true;
-                    cond.queue_list.remove(&waiter);
-                }
-            },
-            else => unreachable,
-        };
+        var futex_deadline = Futex.Deadline.init(timeout);
+        while (true) {
+            // Try to wake up by consuming a signal and decrementing the waiter we added previously.
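+            // (tryCompareAndSwap returns null on success and the observed value on failure,
+            // so the `orelse return` below means the CAS succeeded and we can exit.)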
+            // Acquire barrier ensures code before the wake() which added the signal happens before we decrement it and return.
+            while (state & signal_mask != 0) {
+                const new_state = state - one_waiter - one_signal;
+                state = self.state.tryCompareAndSwap(state, new_state, .Acquire, .Monotonic) orelse return;
+            }
 
-        if (timed_out) {
-            return error.TimedOut;
+            // Observe the epoch, then check the state again to see if we should wake up.
+            // The epoch must be observed before we check the state or we could potentially miss a wake() and deadlock:
+            //
+            // - T1: s = LOAD(&state)
+            // - T2: UPDATE(&s, signal)
+            // - T2: UPDATE(&epoch, 1) + FUTEX_WAKE(&epoch)
+            // - T1: e = LOAD(&epoch) (was reordered after the state load)
+            // - T1: s & signals == 0 -> FUTEX_WAIT(&epoch, e) (missed the state update + the epoch change)
+            //
+            // Acquire barrier to ensure the epoch load happens before the state load.
+            const epoch = self.epoch.load(.Acquire);
+            state = self.state.load(.Monotonic);
+            if (state & signal_mask != 0) {
+                continue;
+            }
+
+            futex_deadline.wait(&self.epoch, epoch) catch |err| switch (err) {
+                // On timeout, we must decrement the waiter we added above.
+                error.Timeout => {
+                    while (true) {
+                        // If there's a signal when we're timing out, consume it and report being woken up instead.
+                        // Acquire barrier ensures code before the wake() which added the signal happens before we decrement it and return.
+                        while (state & signal_mask != 0) {
+                            const new_state = state - one_waiter - one_signal;
+                            state = self.state.tryCompareAndSwap(state, new_state, .Acquire, .Monotonic) orelse return;
+                        }
+
+                        // Remove the waiter we added and officially return timed out.
+                        const new_state = state - one_waiter;
+                        state = self.state.tryCompareAndSwap(state, new_state, .Monotonic, .Monotonic) orelse return err;
+                    }
+                },
+            };
+        }
+    }
 
-    pub fn signal(cond: *AtomicCondition) void {
-        if (@atomicLoad(bool, &cond.pending, .SeqCst) == false)
-            return;
+    fn wake(self: *Impl, comptime notify: Notify) void {
+        var state = self.state.load(.Monotonic);
+        while (true) {
+            const waiters = (state & waiter_mask) / one_waiter;
+            const signals = (state & signal_mask) / one_signal;
 
-        const maybe_waiter = blk: {
-            cond.queue_mutex.lock();
-            defer cond.queue_mutex.unlock();
-
-            const maybe_waiter = cond.queue_list.popFirst();
-            if (maybe_waiter) |waiter| {
-                waiter.data.dequeued = true;
-            }
-            @atomicStore(bool, &cond.pending, cond.queue_list.first != null, .SeqCst);
-            break :blk maybe_waiter;
-        };
-
-        if (maybe_waiter) |waiter| {
-            waiter.data.notify();
-        }
-    }
-
-    pub fn broadcast(cond: *AtomicCondition) void {
-        if (@atomicLoad(bool, &cond.pending, .SeqCst) == false)
-            return;
-
-        @atomicStore(bool, &cond.pending, false, .SeqCst);
-
-        var waiters = blk: {
-            cond.queue_mutex.lock();
-            defer cond.queue_mutex.unlock();
-
-            const waiters = cond.queue_list;
-
-            var it = waiters.first;
-            while (it) |node| : (it = node.next) {
-                node.data.dequeued = true;
+            // Reserves which waiters to wake up by incrementing the signals count.
+            // Therefore, the signals count is always less than or equal to the waiters count.
+            // We don't need to Futex.wake if there's nothing to wake up or if other wake() threads have reserved to wake up the current waiters.
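+            //
+            // e.g. with waiters=3 and signals=1, wakeable below is 2:
+            // signal() would reserve one more wake-up while broadcast() reserves both.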
+            const wakeable = waiters - signals;
+            if (wakeable == 0) {
+                return;
             }
 
-            cond.queue_list = .{};
-            break :blk waiters;
-        };
+            const to_wake = switch (notify) {
+                .one => 1,
+                .all => wakeable,
+            };
 
-        while (waiters.popFirst()) |waiter| {
-            waiter.data.notify();
+            // Reserve the amount of waiters to wake by incrementing the signals count.
+            // Release barrier ensures code before the wake() happens before the signal it posted and consumed by the wait() threads.
+            const new_state = state + (one_signal * to_wake);
+            state = self.state.tryCompareAndSwap(state, new_state, .Release, .Monotonic) orelse {
+                // Wake up the waiting threads we reserved above by changing the epoch value.
+                // NOTE: a waiting thread could miss a wake up if *exactly* ((1<<32)-1) wake()s happen between it observing the epoch and sleeping on it.
+                // This is very unlikely given the sheer number of Futex.wake() calls that would have to happen within the waiting thread's potential preemption window.
+                //
+                // Release barrier ensures the signal being added to the state happens before the epoch is changed.
+                // If not, the waiting thread could potentially deadlock from missing both the state and epoch change:
+                //
+                // - T2: UPDATE(&epoch, 1) (reordered before the state change)
+                // - T1: e = LOAD(&epoch)
+                // - T1: s = LOAD(&state)
+                // - T2: UPDATE(&state, signal) + FUTEX_WAKE(&epoch)
+                // - T1: s & signals == 0 -> FUTEX_WAIT(&epoch, e) (missed both epoch change and state change)
+                _ = self.epoch.fetchAdd(1, .Release);
+                Futex.wake(&self.epoch, to_wake);
+                return;
+            };
+        }
+    }
+};
 
-test "Thread.Condition" {
+test "Condition - smoke test" {
+    var mutex = Mutex{};
+    var cond = Condition{};
+
+    // Try to wake outside the mutex
+    defer cond.signal();
+    defer cond.broadcast();
+
+    mutex.lock();
+    defer mutex.unlock();
+
+    // Try to wait with a timeout (should not deadlock)
+    try testing.expectError(error.Timeout, cond.timedWait(&mutex, 0));
+    try testing.expectError(error.Timeout, cond.timedWait(&mutex, std.time.ns_per_ms));
+
+    // Try to wake inside the mutex.
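+    // (Waking while holding the mutex is legal; with actual waiters it can just mean
+    // they wake up only to block again on the still-held mutex until it's released.)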
+ cond.signal(); + cond.broadcast(); +} + +// Inspired from: https://github.com/Amanieu/parking_lot/pull/129 +test "Condition - wait and signal" { + // This test requires spawning threads if (builtin.single_threaded) { return error.SkipZigTest; } - const TestContext = struct { - cond: *Condition, - cond_main: *Condition, - mutex: *Mutex, - n: *i32, - fn worker(ctx: *@This()) void { - ctx.mutex.lock(); - ctx.n.* += 1; - ctx.cond_main.signal(); - ctx.cond.wait(ctx.mutex); - ctx.n.* -= 1; - ctx.cond_main.signal(); - ctx.mutex.unlock(); + const num_threads = 4; + + const MultiWait = struct { + mutex: Mutex = .{}, + cond: Condition = .{}, + threads: [num_threads]std.Thread = undefined, + + fn run(self: *@This()) void { + self.mutex.lock(); + defer self.mutex.unlock(); + + self.cond.wait(&self.mutex); + self.cond.timedWait(&self.mutex, std.time.ns_per_ms) catch {}; + self.cond.signal(); } }; - const num_threads = 3; - var threads: [num_threads]std.Thread = undefined; - var cond = Condition{}; - var cond_main = Condition{}; - var mut = Mutex{}; - var n: i32 = 0; - var ctx = TestContext{ .cond = &cond, .cond_main = &cond_main, .mutex = &mut, .n = &n }; - mut.lock(); - for (threads) |*t| t.* = try std.Thread.spawn(.{}, TestContext.worker, .{&ctx}); - cond_main.wait(&mut); - while (n < num_threads) cond_main.wait(&mut); + var multi_wait = MultiWait{}; + for (multi_wait.threads) |*t| { + t.* = try std.Thread.spawn(.{}, MultiWait.run, .{&multi_wait}); + } - cond.signal(); - cond_main.wait(&mut); - try testing.expect(n == (num_threads - 1)); + std.time.sleep(100 * std.time.ns_per_ms); - cond.broadcast(); - while (n > 0) cond_main.wait(&mut); - try testing.expect(n == 0); - - for (threads) |t| t.join(); + multi_wait.cond.signal(); + for (multi_wait.threads) |t| { + t.join(); + } } -test "Thread.Condition.timedWait" { +test "Condition - signal" { + // This test requires spawning threads if (builtin.single_threaded) { return error.SkipZigTest; } - var cond = Condition{}; - var mut = Mutex{}; + const num_threads = 4; - // Expect a timeout, as the condition variable is never signaled - { - mut.lock(); - defer mut.unlock(); - try testing.expectError(error.TimedOut, cond.timedWait(&mut, 10 * std.time.ns_per_ms)); + const SignalTest = struct { + mutex: Mutex = .{}, + cond: Condition = .{}, + notified: bool = false, + threads: [num_threads]std.Thread = undefined, + + fn run(self: *@This()) void { + self.mutex.lock(); + defer self.mutex.unlock(); + + // Use timedWait() a few times before using wait() + // to test multiple threads timing out frequently. + var i: usize = 0; + while (!self.notified) : (i +%= 1) { + if (i < 5) { + self.cond.timedWait(&self.mutex, 1) catch {}; + } else { + self.cond.wait(&self.mutex); + } + } + + // Once we received the signal, notify another thread (inside the lock). + assert(self.notified); + self.cond.signal(); + } + }; + + var signal_test = SignalTest{}; + for (signal_test.threads) |*t| { + t.* = try std.Thread.spawn(.{}, SignalTest.run, .{&signal_test}); } - // Expect a signal before timeout { - const TestContext = struct { - cond: *Condition, - mutex: *Mutex, - n: *u32, - fn worker(ctx: *@This()) void { - ctx.mutex.lock(); - defer ctx.mutex.unlock(); - ctx.n.* = 1; - ctx.cond.signal(); - } - }; + // Wait for a bit in hopes that the spawned threads start queuing up on the condvar + std.time.sleep(10 * std.time.ns_per_ms); - var n: u32 = 0; + // Wake up one of them (outside the lock) after setting notified=true. 
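+        // (defers run in reverse order: the mutex unlock below fires before this signal,
+        // which is what keeps the wake-up outside the critical section.)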
+ defer signal_test.cond.signal(); - var ctx = TestContext{ .cond = &cond, .mutex = &mut, .n = &n }; - mut.lock(); - var thread = try std.Thread.spawn(.{}, TestContext.worker, .{&ctx}); - // Looped check to handle spurious wakeups - while (n != 1) try cond.timedWait(&mut, 500 * std.time.ns_per_ms); - mut.unlock(); - try testing.expect(n == 1); - thread.join(); + signal_test.mutex.lock(); + defer signal_test.mutex.unlock(); + + try testing.expect(!signal_test.notified); + signal_test.notified = true; + } + + for (signal_test.threads) |t| { + t.join(); + } +} + +test "Condition - multi signal" { + // This test requires spawning threads + if (builtin.single_threaded) { + return error.SkipZigTest; + } + + const num_threads = 4; + const num_iterations = 4; + + const Paddle = struct { + mutex: Mutex = .{}, + cond: Condition = .{}, + value: u32 = 0, + + fn hit(self: *@This()) void { + defer self.cond.signal(); + + self.mutex.lock(); + defer self.mutex.unlock(); + + self.value += 1; + } + + fn run(self: *@This(), hit_to: *@This()) !void { + self.mutex.lock(); + defer self.mutex.unlock(); + + var current: u32 = 0; + while (current < num_iterations) : (current += 1) { + // Wait for the value to change from hit() + while (self.value == current) { + self.cond.wait(&self.mutex); + } + + // hit the next paddle + try testing.expectEqual(self.value, current + 1); + hit_to.hit(); + } + } + }; + + var paddles = [_]Paddle{.{}} ** num_threads; + var threads = [_]std.Thread{undefined} ** num_threads; + + // Create a circle of paddles which hit each other + for (threads) |*t, i| { + const paddle = &paddles[i]; + const hit_to = &paddles[(i + 1) % paddles.len]; + t.* = try std.Thread.spawn(.{}, Paddle.run, .{ paddle, hit_to }); + } + + // Hit the first paddle and wait for them all to complete by hitting each other for num_iterations. + paddles[0].hit(); + for (threads) |t| t.join(); + + // The first paddle will be hit one last time by the last paddle. + for (paddles) |p, i| { + const expected = @as(u32, num_iterations) + @boolToInt(i == 0); + try testing.expectEqual(p.value, expected); + } +} + +test "Condition - broadcasting" { + // This test requires spawning threads + if (builtin.single_threaded) { + return error.SkipZigTest; + } + + const num_threads = 10; + + const BroadcastTest = struct { + mutex: Mutex = .{}, + cond: Condition = .{}, + completed: Condition = .{}, + count: usize = 0, + threads: [num_threads]std.Thread = undefined, + + fn run(self: *@This()) void { + self.mutex.lock(); + defer self.mutex.unlock(); + + // The last broadcast thread to start tells the main test thread it's completed. + self.count += 1; + if (self.count == num_threads) { + self.completed.signal(); + } + + // Waits for the count to reach zero after the main test thread observes it at num_threads. + // Tries to use timedWait() a bit before falling back to wait() to test multiple threads timing out. + var i: usize = 0; + while (self.count != 0) : (i +%= 1) { + if (i < 10) { + self.cond.timedWait(&self.mutex, 1) catch {}; + } else { + self.cond.wait(&self.mutex); + } + } + } + }; + + var broadcast_test = BroadcastTest{}; + for (broadcast_test.threads) |*t| { + t.* = try std.Thread.spawn(.{}, BroadcastTest.run, .{&broadcast_test}); + } + + { + broadcast_test.mutex.lock(); + defer broadcast_test.mutex.unlock(); + + // Wait for all the broadcast threads to spawn. + // timedWait() to detect any potential deadlocks. 
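+        // (a 1s timeout propagates up through `try` and fails the test instead of hanging.)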
+ while (broadcast_test.count != num_threads) { + try broadcast_test.completed.timedWait( + &broadcast_test.mutex, + 1 * std.time.ns_per_s, + ); + } + + // Reset the counter and wake all the threads to exit. + broadcast_test.count = 0; + broadcast_test.cond.broadcast(); + } + + for (broadcast_test.threads) |t| { + t.join(); } } diff --git a/lib/std/Thread/Futex.zig b/lib/std/Thread/Futex.zig index a1c8ca71e4..33eb30ba9d 100644 --- a/lib/std/Thread/Futex.zig +++ b/lib/std/Thread/Futex.zig @@ -10,14 +10,12 @@ const Futex = @This(); const os = std.os; const assert = std.debug.assert; const testing = std.testing; - const Atomic = std.atomic.Atomic; -const spinLoopHint = std.atomic.spinLoopHint; /// Checks if `ptr` still contains the value `expect` and, if so, blocks the caller until either: /// - The value at `ptr` is no longer equal to `expect`. /// - The caller is unblocked by a matching `wake()`. -/// - The caller is unblocked spuriously by an arbitrary internal signal. +/// - The caller is unblocked spuriously ("at random"). /// /// The checking of `ptr` and `expect`, along with blocking the caller, is done atomically /// and totally ordered (sequentially consistent) with respect to other wait()/wake() calls on the same `ptr`. @@ -32,7 +30,7 @@ pub fn wait(ptr: *const Atomic(u32), expect: u32) void { /// Checks if `ptr` still contains the value `expect` and, if so, blocks the caller until either: /// - The value at `ptr` is no longer equal to `expect`. /// - The caller is unblocked by a matching `wake()`. -/// - The caller is unblocked spuriously by an arbitrary internal signal. +/// - The caller is unblocked spuriously ("at random"). /// - The caller blocks for longer than the given timeout. In which case, `error.Timeout` is returned. /// /// The checking of `ptr` and `expect`, along with blocking the caller, is done atomically @@ -62,7 +60,7 @@ pub fn wake(ptr: *const Atomic(u32), max_waiters: u32) void { } const Impl = if (builtin.single_threaded) - SerialImpl + SingleThreadedImpl else if (builtin.os.tag == .windows) WindowsImpl else if (builtin.os.tag.isDarwin()) @@ -97,7 +95,7 @@ const UnsupportedImpl = struct { } }; -const SerialImpl = struct { +const SingleThreadedImpl = struct { fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{Timeout}!void { if (ptr.loadUnchecked() != expect) { return; @@ -804,7 +802,7 @@ const PosixImpl = struct { // // What we really want here is a Release load, but that doesn't exist under the C11 memory model. // We could instead do `bucket.pending.fetchAdd(0, Release) == 0` which achieves effectively the same thing, - // but the RMW operation unconditionally stores which invalidates the cache-line for others causing unnecessary contention. + // but the RMW operation unconditionally marks the cache-line as modified for others causing unnecessary fetching/contention. // // Instead we opt to do a full-fence + load instead which avoids taking ownership of the cache-line. // fence(SeqCst) effectively converts the ptr update to SeqCst and the pending load to SeqCst: creating a Store-Load barrier. @@ -962,3 +960,60 @@ test "Futex - broadcasting" { for (broadcast.threads) |*t| t.* = try std.Thread.spawn(.{}, Broadcast.run, .{&broadcast}); for (broadcast.threads) |t| t.join(); } + +/// Deadline is used to wait efficiently for a pointer's value to change using Futex and a fixed timeout. 
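+///
+/// A sketch of the intended usage, assuming a `futex_word: Atomic(u32)` and an `expected` value:
+/// ```
+/// var deadline = Futex.Deadline.init(200 * std.time.ns_per_ms);
+/// while (futex_word.load(.Monotonic) == expected) {
+///     deadline.wait(&futex_word, expected) catch break; // error.Timeout after ~200ms in total
+/// }
+/// ```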
+///
+/// Futex's timedWait() api uses a relative duration which suffers from over-waiting
+/// when used in a loop which is often required due to the possibility of spurious wakeups.
+///
+/// Deadline instead converts the relative timeout to an absolute one so that multiple calls
+/// to Futex timedWait() can block for, and report, more accurate error.Timeouts.
+pub const Deadline = struct {
+    timeout: ?u64,
+    started: std.time.Timer,
+
+    /// Create the deadline to expire after the given amount of time in nanoseconds passes.
+    /// Pass in `null` to have the deadline call `Futex.wait()` and never expire.
+    pub fn init(expires_in_ns: ?u64) Deadline {
+        var deadline: Deadline = undefined;
+        deadline.timeout = expires_in_ns;
+
+        // std.time.Timer is required to be supported for somewhat accurate reporting of error.Timeout.
+        if (deadline.timeout != null) {
+            deadline.started = std.time.Timer.start() catch unreachable;
+        }
+
+        return deadline;
+    }
+
+    /// Wait until either:
+    /// - the `ptr`'s value changes from `expect`.
+    /// - `Futex.wake()` is called on the `ptr`.
+    /// - A spurious wake occurs.
+    /// - The deadline expires, in which case `error.Timeout` is returned.
+    pub fn wait(self: *Deadline, ptr: *const Atomic(u32), expect: u32) error{Timeout}!void {
+        @setCold(true);
+
+        // Check if we actually have a timeout to wait until.
+        // If not just wait "forever".
+        const timeout_ns = self.timeout orelse {
+            return Futex.wait(ptr, expect);
+        };
+
+        // Get how much time has passed since we started waiting
+        // then subtract that from the init() timeout to get how much longer to wait.
+        // Use overflow to detect when we've been waiting longer than the init() timeout.
+        const elapsed_ns = self.started.read();
+        const until_timeout_ns = std.math.sub(u64, timeout_ns, elapsed_ns) catch 0;
+        return Futex.timedWait(ptr, expect, until_timeout_ns);
+    }
+};
+
+test "Futex - Deadline" {
+    var deadline = Deadline.init(100 * std.time.ns_per_ms);
+    var futex_word = Atomic(u32).init(0);
+
+    while (true) {
+        deadline.wait(&futex_word, 0) catch break;
+    }
+}
diff --git a/lib/std/Thread/Mutex.zig b/lib/std/Thread/Mutex.zig
index c8f5c0534d..35d754ad19 100644
--- a/lib/std/Thread/Mutex.zig
+++ b/lib/std/Thread/Mutex.zig
@@ -1,288 +1,285 @@
-//! Lock may be held only once. If the same thread tries to acquire
-//! the same mutex twice, it deadlocks. This type supports static
-//! initialization and is at most `@sizeOf(usize)` in size. When an
-//! application is built in single threaded release mode, all the
-//! functions are no-ops. In single threaded debug mode, there is
-//! deadlock detection.
+//! Mutex is a synchronization primitive which enforces atomic access to a shared region of code known as the "critical section".
+//! It does this by ensuring only one thread is in the critical section at any given point in time, blocking the others.
+//! Mutex can be statically initialized and is at most `@sizeOf(u64)` large.
+//! Use `lock()` or `tryLock()` to enter the critical section and `unlock()` to leave it.
 //!
-//! Example usage:
+//! Example:
+//! ```
 //! var m = Mutex{};
 //!
-//! m.lock();
-//! defer m.release();
-//! ... critical code
+//! {
+//!     m.lock();
+//!     defer m.unlock();
+//!     // ... critical section code
+//! }
 //!
-//! Non-blocking:
 //! if (m.tryLock()) {
 //!     defer m.unlock();
-//!     // ... critical section
-//! } else {
-//!     // ... lock not acquired
+//!     // ... critical section code
 //! }
+//! 
``` + +const std = @import("../std.zig"); +const builtin = @import("builtin"); +const Mutex = @This(); + +const os = std.os; +const assert = std.debug.assert; +const testing = std.testing; +const Atomic = std.atomic.Atomic; +const Futex = std.Thread.Futex; impl: Impl = .{}, -const Mutex = @This(); -const std = @import("../std.zig"); -const builtin = @import("builtin"); -const os = std.os; -const assert = std.debug.assert; -const windows = os.windows; -const linux = os.linux; -const testing = std.testing; -const StaticResetEvent = std.thread.StaticResetEvent; - -/// Try to acquire the mutex without blocking. Returns `false` if the mutex is -/// unavailable. Otherwise returns `true`. Call `unlock` on the mutex to release. -pub fn tryLock(m: *Mutex) bool { - return m.impl.tryLock(); +/// Tries to acquire the mutex without blocking the caller's thread. +/// Returns `false` if the calling thread would have to block to acquire it. +/// Otherwise, returns `true` and the caller should `unlock()` the Mutex to release it. +pub fn tryLock(self: *Mutex) bool { + return self.impl.tryLock(); } -/// Acquire the mutex. Deadlocks if the mutex is already -/// held by the calling thread. -pub fn lock(m: *Mutex) void { - m.impl.lock(); +/// Acquires the mutex, blocking the caller's thread until it can. +/// It is undefined behavior if the mutex is already held by the caller's thread. +/// Once acquired, call `unlock()` on the Mutex to release it. +pub fn lock(self: *Mutex) void { + self.impl.lock(); } -pub fn unlock(m: *Mutex) void { - m.impl.unlock(); +/// Releases the mutex which was previously acquired with `lock()` or `tryLock()`. +/// It is undefined behavior if the mutex is unlocked from a different thread that it was locked from. +pub fn unlock(self: *Mutex) void { + self.impl.unlock(); } const Impl = if (builtin.single_threaded) - Dummy + SingleThreadedImpl else if (builtin.os.tag == .windows) - WindowsMutex -else if (std.Thread.use_pthreads) - PthreadMutex + WindowsImpl +else if (builtin.os.tag.isDarwin()) + DarwinImpl else - AtomicMutex; + FutexImpl; -pub const AtomicMutex = struct { - state: State = .unlocked, +const SingleThreadedImpl = struct { + is_locked: bool = false, - const State = enum(i32) { - unlocked, - locked, - waiting, - }; - - pub fn tryLock(m: *AtomicMutex) bool { - return @cmpxchgStrong( - State, - &m.state, - .unlocked, - .locked, - .Acquire, - .Monotonic, - ) == null; - } - - pub fn lock(m: *AtomicMutex) void { - switch (@atomicRmw(State, &m.state, .Xchg, .locked, .Acquire)) { - .unlocked => {}, - else => |s| m.lockSlow(s), - } - } - - pub fn unlock(m: *AtomicMutex) void { - switch (@atomicRmw(State, &m.state, .Xchg, .unlocked, .Release)) { - .unlocked => unreachable, - .locked => {}, - .waiting => m.unlockSlow(), - } - } - - fn lockSlow(m: *AtomicMutex, current_state: State) void { - @setCold(true); - var new_state = current_state; - - var spin: u8 = 0; - while (spin < 100) : (spin += 1) { - const state = @cmpxchgWeak( - State, - &m.state, - .unlocked, - new_state, - .Acquire, - .Monotonic, - ) orelse return; - - switch (state) { - .unlocked => {}, - .locked => {}, - .waiting => break, - } - - var iter = std.math.min(32, spin + 1); - while (iter > 0) : (iter -= 1) - std.atomic.spinLoopHint(); - } - - new_state = .waiting; - while (true) { - switch (@atomicRmw(State, &m.state, .Xchg, new_state, .Acquire)) { - .unlocked => return, - else => {}, - } - switch (builtin.os.tag) { - .linux => { - switch (linux.getErrno(linux.futex_wait( - @ptrCast(*const i32, &m.state), - 
linux.FUTEX.PRIVATE_FLAG | linux.FUTEX.WAIT, - @enumToInt(new_state), - null, - ))) { - .SUCCESS => {}, - .INTR => {}, - .AGAIN => {}, - else => unreachable, - } - }, - else => std.atomic.spinLoopHint(), - } - } - } - - fn unlockSlow(m: *AtomicMutex) void { - @setCold(true); - - switch (builtin.os.tag) { - .linux => { - switch (linux.getErrno(linux.futex_wake( - @ptrCast(*const i32, &m.state), - linux.FUTEX.PRIVATE_FLAG | linux.FUTEX.WAKE, - 1, - ))) { - .SUCCESS => {}, - .FAULT => unreachable, // invalid pointer passed to futex_wake - else => unreachable, - } - }, - else => {}, - } - } -}; - -pub const PthreadMutex = struct { - pthread_mutex: std.c.pthread_mutex_t = .{}, - - /// Try to acquire the mutex without blocking. Returns true if - /// the mutex is unavailable. Otherwise returns false. Call - /// release when done. - pub fn tryLock(m: *PthreadMutex) bool { - return std.c.pthread_mutex_trylock(&m.pthread_mutex) == .SUCCESS; - } - - /// Acquire the mutex. Will deadlock if the mutex is already - /// held by the calling thread. - pub fn lock(m: *PthreadMutex) void { - switch (std.c.pthread_mutex_lock(&m.pthread_mutex)) { - .SUCCESS => {}, - .INVAL => unreachable, - .BUSY => unreachable, - .AGAIN => unreachable, - .DEADLK => unreachable, - .PERM => unreachable, - else => unreachable, - } - } - - pub fn unlock(m: *PthreadMutex) void { - switch (std.c.pthread_mutex_unlock(&m.pthread_mutex)) { - .SUCCESS => return, - .INVAL => unreachable, - .AGAIN => unreachable, - .PERM => unreachable, - else => unreachable, - } - } -}; - -/// This has the sematics as `Mutex`, however it does not actually do any -/// synchronization. Operations are safety-checked no-ops. -pub const Dummy = struct { - locked: @TypeOf(lock_init) = lock_init, - - const lock_init = if (std.debug.runtime_safety) false else {}; - - /// Try to acquire the mutex without blocking. Returns false if - /// the mutex is unavailable. Otherwise returns true. - pub fn tryLock(m: *Dummy) bool { - if (std.debug.runtime_safety) { - if (m.locked) return false; - m.locked = true; - } + fn tryLock(self: *Impl) bool { + if (self.is_locked) return false; + self.is_locked = true; return true; } - /// Acquire the mutex. Will deadlock if the mutex is already - /// held by the calling thread. - pub fn lock(m: *Dummy) void { - if (!m.tryLock()) { - @panic("deadlock detected"); + fn lock(self: *Impl) void { + if (!self.tryLock()) { + unreachable; // deadlock detected } } - pub fn unlock(m: *Dummy) void { - if (std.debug.runtime_safety) { - m.locked = false; + fn unlock(self: *Impl) void { + assert(self.is_locked); + self.is_locked = false; + } +}; + +// SRWLOCK on windows is almost always faster than Futex solution. +// It also implements an efficient Condition with requeue support for us. +const WindowsImpl = struct { + srwlock: os.windows.SRWLOCK = .{}, + + fn tryLock(self: *Impl) bool { + return os.windows.kernel32.TryAcquireSRWLockExclusive(&self.srwlock) != os.windows.FALSE; + } + + fn lock(self: *Impl) void { + os.windows.kernel32.AcquireSRWLockExclusive(&self.srwlock); + } + + fn unlock(self: *Impl) void { + os.windows.kernel32.ReleaseSRWLockExclusive(&self.srwlock); + } +}; + +// os_unfair_lock on darwin supports priority inheritance and is generally faster than Futex solutions. 
+const DarwinImpl = struct { + oul: os.darwin.os_unfair_lock = .{}, + + fn tryLock(self: *Impl) bool { + return os.darwin.os_unfair_lock_trylock(&self.oul); + } + + fn lock(self: *Impl) void { + os.darwin.os_unfair_lock_lock(&self.oul); + } + + fn unlock(self: *Impl) void { + os.darwin.os_unfair_lock_unlock(&self.oul); + } +}; + +const FutexImpl = struct { + state: Atomic(u32) = Atomic(u32).init(unlocked), + + const unlocked = 0b00; + const locked = 0b01; + const contended = 0b11; // must contain the `locked` bit for x86 optimization below + + fn tryLock(self: *Impl) bool { + // Lock with compareAndSwap instead of tryCompareAndSwap to avoid reporting spurious CAS failure. + return self.lockFast("compareAndSwap"); + } + + fn lock(self: *Impl) void { + // Lock with tryCompareAndSwap instead of compareAndSwap due to being more inline-able on LL/SC archs like ARM. + if (!self.lockFast("tryCompareAndSwap")) { + self.lockSlow(); + } + } + + inline fn lockFast(self: *Impl, comptime casFn: []const u8) bool { + // On x86, use `lock bts` instead of `lock cmpxchg` as: + // - they both seem to mark the cache-line as modified regardless: https://stackoverflow.com/a/63350048 + // - `lock bts` is smaller instruction-wise which makes it better for inlining + if (comptime builtin.target.cpu.arch.isX86()) { + const locked_bit = @ctz(u32, @as(u32, locked)); + return self.state.bitSet(locked_bit, .Acquire) == 0; + } + + // Acquire barrier ensures grabbing the lock happens before the critical section + // and that the previous lock holder's critical section happens before we grab the lock. + return @field(self.state, casFn)(unlocked, locked, .Acquire, .Monotonic) == null; + } + + fn lockSlow(self: *Impl) void { + @setCold(true); + + // Avoid doing an atomic swap below if we already know the state is contended. + // An atomic swap unconditionally stores which marks the cache-line as modified unnecessarily. + if (self.state.load(.Monotonic) == contended) { + Futex.wait(&self.state, contended); + } + + // Try to acquire the lock while also telling the existing lock holder that there are threads waiting. + // + // Once we sleep on the Futex, we must acquire the mutex using `contended` rather than `locked`. + // If not, threads sleeping on the Futex wouldn't see the state change in unlock and potentially deadlock. + // The downside is that the last mutex unlocker will see `contended` and do an unnecessary Futex wake + // but this is better than having to wake all waiting threads on mutex unlock. + // + // Acquire barrier ensures grabbing the lock happens before the critical section + // and that the previous lock holder's critical section happens before we grab the lock. + while (self.state.swap(contended, .Acquire) != unlocked) { + Futex.wait(&self.state, contended); + } + } + + fn unlock(self: *Impl) void { + // Unlock the mutex and wake up a waiting thread if any. + // + // A waiting thread will acquire with `contended` instead of `locked` + // which ensures that it wakes up another thread on the next unlock(). + // + // Release barrier ensures the critical section happens before we let go of the lock + // and that our critical section happens before the next lock holder grabs the lock. 
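+        //
+        // For reference, the swap below observes one of three prior states:
+        //   locked    -> nobody is sleeping on the futex, skip the wake.
+        //   contended -> at least one sleeper may exist, wake one of them.
+        //   unlocked  -> unlock() without a matching lock(), caught by the assert.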
+ const state = self.state.swap(unlocked, .Release); + assert(state != unlocked); + + if (state == contended) { + Futex.wake(&self.state, 1); } } }; -pub const WindowsMutex = struct { - srwlock: windows.SRWLOCK = windows.SRWLOCK_INIT, - - pub fn tryLock(m: *WindowsMutex) bool { - return windows.kernel32.TryAcquireSRWLockExclusive(&m.srwlock) != windows.FALSE; - } - - pub fn lock(m: *WindowsMutex) void { - windows.kernel32.AcquireSRWLockExclusive(&m.srwlock); - } - - pub fn unlock(m: *WindowsMutex) void { - windows.kernel32.ReleaseSRWLockExclusive(&m.srwlock); - } -}; - -const TestContext = struct { - mutex: *Mutex, - data: i128, - - const incr_count = 10000; -}; - -test "basic usage" { +test "Mutex - smoke test" { var mutex = Mutex{}; - var context = TestContext{ - .mutex = &mutex, - .data = 0, + try testing.expect(mutex.tryLock()); + try testing.expect(!mutex.tryLock()); + mutex.unlock(); + + mutex.lock(); + try testing.expect(!mutex.tryLock()); + mutex.unlock(); +} + +// A counter which is incremented without atomic instructions +const NonAtomicCounter = struct { + // direct u128 could maybe use xmm ops on x86 which are atomic + value: [2]u64 = [_]u64{ 0, 0 }, + + fn get(self: NonAtomicCounter) u128 { + return @bitCast(u128, self.value); + } + + fn inc(self: *NonAtomicCounter) void { + for (@bitCast([2]u64, self.get() + 1)) |v, i| { + @ptrCast(*volatile u64, &self.value[i]).* = v; + } + } +}; + +test "Mutex - many uncontended" { + // This test requires spawning threads. + if (builtin.single_threaded) { + return error.SkipZigTest; + } + + const num_threads = 4; + const num_increments = 1000; + + const Runner = struct { + mutex: Mutex = .{}, + thread: std.Thread = undefined, + counter: NonAtomicCounter = .{}, + + fn run(self: *@This()) void { + var i: usize = num_increments; + while (i > 0) : (i -= 1) { + self.mutex.lock(); + defer self.mutex.unlock(); + + self.counter.inc(); + } + } }; + var runners = [_]Runner{.{}} ** num_threads; + for (runners) |*r| r.thread = try std.Thread.spawn(.{}, Runner.run, .{r}); + for (runners) |r| r.thread.join(); + for (runners) |r| try testing.expectEqual(r.counter.get(), num_increments); +} + +test "Mutex - many contended" { + // This test requires spawning threads. if (builtin.single_threaded) { - worker(&context); - try testing.expect(context.data == TestContext.incr_count); - } else { - const thread_count = 10; - var threads: [thread_count]std.Thread = undefined; - for (threads) |*t| { - t.* = try std.Thread.spawn(.{}, worker, .{&context}); + return error.SkipZigTest; + } + + const num_threads = 4; + const num_increments = 1000; + + const Runner = struct { + mutex: Mutex = .{}, + counter: NonAtomicCounter = .{}, + + fn run(self: *@This()) void { + var i: usize = num_increments; + while (i > 0) : (i -= 1) { + // Occasionally hint to let another thread run. 
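+                // (this defer fires at the end of each iteration, after the unlock below,
+                // so the yield happens while the mutex is free for the other threads.)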
+ defer if (i % 100 == 0) std.Thread.yield() catch {}; + + self.mutex.lock(); + defer self.mutex.unlock(); + + self.counter.inc(); + } } - for (threads) |t| - t.join(); + }; - try testing.expect(context.data == thread_count * TestContext.incr_count); - } -} - -fn worker(ctx: *TestContext) void { - var i: usize = 0; - while (i != TestContext.incr_count) : (i += 1) { - ctx.mutex.lock(); - defer ctx.mutex.unlock(); - - ctx.data += 1; - } + var runner = Runner{}; + + var threads: [num_threads]std.Thread = undefined; + for (threads) |*t| t.* = try std.Thread.spawn(.{}, Runner.run, .{&runner}); + for (threads) |t| t.join(); + + try testing.expectEqual(runner.counter.get(), num_increments * num_threads); } diff --git a/lib/std/atomic/Atomic.zig b/lib/std/atomic/Atomic.zig index 336230333a..c396281e91 100644 --- a/lib/std/atomic/Atomic.zig +++ b/lib/std/atomic/Atomic.zig @@ -1,7 +1,7 @@ const std = @import("../std.zig"); +const builtin = @import("builtin"); const testing = std.testing; -const target = @import("builtin").target; const Ordering = std.atomic.Ordering; pub fn Atomic(comptime T: type) type { @@ -164,87 +164,13 @@ pub fn Atomic(comptime T: type) type { return bitRmw(self, .Toggle, bit, ordering); } - inline fn bitRmw( - self: *Self, - comptime op: BitRmwOp, - bit: Bit, - comptime ordering: Ordering, - ) u1 { + inline fn bitRmw(self: *Self, comptime op: BitRmwOp, bit: Bit, comptime ordering: Ordering) u1 { // x86 supports dedicated bitwise instructions - if (comptime target.cpu.arch.isX86() and @sizeOf(T) >= 2 and @sizeOf(T) <= 8) { - const old_bit: u8 = switch (@sizeOf(T)) { - 2 => switch (op) { - .Set => asm volatile ("lock btsw %[bit], %[ptr]" - // LLVM doesn't support u1 flag register return values - : [result] "={@ccc}" (-> u8), - : [ptr] "*p" (&self.value), - [bit] "X" (@as(T, bit)), - : "cc", "memory" - ), - .Reset => asm volatile ("lock btrw %[bit], %[ptr]" - // LLVM doesn't support u1 flag register return values - : [result] "={@ccc}" (-> u8), - : [ptr] "*p" (&self.value), - [bit] "X" (@as(T, bit)), - : "cc", "memory" - ), - .Toggle => asm volatile ("lock btcw %[bit], %[ptr]" - // LLVM doesn't support u1 flag register return values - : [result] "={@ccc}" (-> u8), - : [ptr] "*p" (&self.value), - [bit] "X" (@as(T, bit)), - : "cc", "memory" - ), - }, - 4 => switch (op) { - .Set => asm volatile ("lock btsl %[bit], %[ptr]" - // LLVM doesn't support u1 flag register return values - : [result] "={@ccc}" (-> u8), - : [ptr] "*p" (&self.value), - [bit] "X" (@as(T, bit)), - : "cc", "memory" - ), - .Reset => asm volatile ("lock btrl %[bit], %[ptr]" - // LLVM doesn't support u1 flag register return values - : [result] "={@ccc}" (-> u8), - : [ptr] "*p" (&self.value), - [bit] "X" (@as(T, bit)), - : "cc", "memory" - ), - .Toggle => asm volatile ("lock btcl %[bit], %[ptr]" - // LLVM doesn't support u1 flag register return values - : [result] "={@ccc}" (-> u8), - : [ptr] "*p" (&self.value), - [bit] "X" (@as(T, bit)), - : "cc", "memory" - ), - }, - 8 => switch (op) { - .Set => asm volatile ("lock btsq %[bit], %[ptr]" - // LLVM doesn't support u1 flag register return values - : [result] "={@ccc}" (-> u8), - : [ptr] "*p" (&self.value), - [bit] "X" (@as(T, bit)), - : "cc", "memory" - ), - .Reset => asm volatile ("lock btrq %[bit], %[ptr]" - // LLVM doesn't support u1 flag register return values - : [result] "={@ccc}" (-> u8), - : [ptr] "*p" (&self.value), - [bit] "X" (@as(T, bit)), - : "cc", "memory" - ), - .Toggle => asm volatile ("lock btcq %[bit], %[ptr]" - // LLVM doesn't support u1 flag register 
return values - : [result] "={@ccc}" (-> u8), - : [ptr] "*p" (&self.value), - [bit] "X" (@as(T, bit)), - : "cc", "memory" - ), - }, - else => @compileError("Invalid atomic type " ++ @typeName(T)), - }; - return @intCast(u1, old_bit); + if (comptime builtin.target.cpu.arch.isX86() and @sizeOf(T) >= 2 and @sizeOf(T) <= 8) { + // TODO: stage2 currently doesn't like the inline asm this function emits. + if (builtin.zig_backend == .stage1) { + return x86BitRmw(self, op, bit, ordering); + } } const mask = @as(T, 1) << bit; @@ -256,6 +182,86 @@ pub fn Atomic(comptime T: type) type { return @boolToInt(value & mask != 0); } + + inline fn x86BitRmw(self: *Self, comptime op: BitRmwOp, bit: Bit, comptime ordering: Ordering) u1 { + const old_bit: u8 = switch (@sizeOf(T)) { + 2 => switch (op) { + .Set => asm volatile ("lock btsw %[bit], %[ptr]" + // LLVM doesn't support u1 flag register return values + : [result] "={@ccc}" (-> u8), + : [ptr] "*p" (&self.value), + [bit] "X" (@as(T, bit)), + : "cc", "memory" + ), + .Reset => asm volatile ("lock btrw %[bit], %[ptr]" + // LLVM doesn't support u1 flag register return values + : [result] "={@ccc}" (-> u8), + : [ptr] "*p" (&self.value), + [bit] "X" (@as(T, bit)), + : "cc", "memory" + ), + .Toggle => asm volatile ("lock btcw %[bit], %[ptr]" + // LLVM doesn't support u1 flag register return values + : [result] "={@ccc}" (-> u8), + : [ptr] "*p" (&self.value), + [bit] "X" (@as(T, bit)), + : "cc", "memory" + ), + }, + 4 => switch (op) { + .Set => asm volatile ("lock btsl %[bit], %[ptr]" + // LLVM doesn't support u1 flag register return values + : [result] "={@ccc}" (-> u8), + : [ptr] "*p" (&self.value), + [bit] "X" (@as(T, bit)), + : "cc", "memory" + ), + .Reset => asm volatile ("lock btrl %[bit], %[ptr]" + // LLVM doesn't support u1 flag register return values + : [result] "={@ccc}" (-> u8), + : [ptr] "*p" (&self.value), + [bit] "X" (@as(T, bit)), + : "cc", "memory" + ), + .Toggle => asm volatile ("lock btcl %[bit], %[ptr]" + // LLVM doesn't support u1 flag register return values + : [result] "={@ccc}" (-> u8), + : [ptr] "*p" (&self.value), + [bit] "X" (@as(T, bit)), + : "cc", "memory" + ), + }, + 8 => switch (op) { + .Set => asm volatile ("lock btsq %[bit], %[ptr]" + // LLVM doesn't support u1 flag register return values + : [result] "={@ccc}" (-> u8), + : [ptr] "*p" (&self.value), + [bit] "X" (@as(T, bit)), + : "cc", "memory" + ), + .Reset => asm volatile ("lock btrq %[bit], %[ptr]" + // LLVM doesn't support u1 flag register return values + : [result] "={@ccc}" (-> u8), + : [ptr] "*p" (&self.value), + [bit] "X" (@as(T, bit)), + : "cc", "memory" + ), + .Toggle => asm volatile ("lock btcq %[bit], %[ptr]" + // LLVM doesn't support u1 flag register return values + : [result] "={@ccc}" (-> u8), + : [ptr] "*p" (&self.value), + [bit] "X" (@as(T, bit)), + : "cc", "memory" + ), + }, + else => @compileError("Invalid atomic type " ++ @typeName(T)), + }; + + // TODO: emit appropriate tsan fence if compiling with tsan + _ = ordering; + + return @intCast(u1, old_bit); + } }); }; } diff --git a/lib/std/heap/general_purpose_allocator.zig b/lib/std/heap/general_purpose_allocator.zig index 7359fe402f..11d897ac1b 100644 --- a/lib/std/heap/general_purpose_allocator.zig +++ b/lib/std/heap/general_purpose_allocator.zig @@ -151,12 +151,12 @@ pub const Config = struct { /// What type of mutex you'd like to use, for thread safety. /// when specfied, the mutex type must have the same shape as `std.Thread.Mutex` and - /// `std.Thread.Mutex.Dummy`, and have no required fields. 
Specifying this field causes + /// `DummyMutex`, and have no required fields. Specifying this field causes /// the `thread_safe` field to be ignored. /// /// when null (default): /// * the mutex type defaults to `std.Thread.Mutex` when thread_safe is enabled. - /// * the mutex type defaults to `std.Thread.Mutex.Dummy` otherwise. + /// * the mutex type defaults to `DummyMutex` otherwise. MutexType: ?type = null, /// This is a temporary debugging trick you can use to turn segfaults into more helpful @@ -198,7 +198,12 @@ pub fn GeneralPurposeAllocator(comptime config: Config) type { else if (config.thread_safe) std.Thread.Mutex{} else - std.Thread.Mutex.Dummy{}; + DummyMutex{}; + + const DummyMutex = struct { + fn lock(_: *DummyMutex) void {} + fn unlock(_: *DummyMutex) void {} + }; const stack_n = config.stack_trace_frames; const one_trace_size = @sizeOf(usize) * stack_n; diff --git a/lib/std/os/windows.zig b/lib/std/os/windows.zig index b1a2262083..800a16f660 100644 --- a/lib/std/os/windows.zig +++ b/lib/std/os/windows.zig @@ -3680,10 +3680,15 @@ pub const OBJECT_NAME_INFORMATION = extern struct { Name: UNICODE_STRING, }; -pub const SRWLOCK = usize; -pub const SRWLOCK_INIT: SRWLOCK = 0; -pub const CONDITION_VARIABLE = usize; -pub const CONDITION_VARIABLE_INIT: CONDITION_VARIABLE = 0; +pub const SRWLOCK_INIT = SRWLOCK{}; +pub const SRWLOCK = extern struct { + Ptr: ?PVOID = null, +}; + +pub const CONDITION_VARIABLE_INIT = CONDITION_VARIABLE{}; +pub const CONDITION_VARIABLE = extern struct { + Ptr: ?PVOID = null, +}; pub const FILE_SKIP_COMPLETION_PORT_ON_SUCCESS = 0x1; pub const FILE_SKIP_SET_EVENT_ON_HANDLE = 0x2;