zig/lib/std/Thread/Mutex.zig
Andrew Kelley 008b0ec5e5 std.Thread.Mutex: change API to lock() and unlock()
This is a breaking change. Before, usage looked like this:

```zig
const held = mutex.acquire();
defer held.release();
```

Now it looks like this:

```zig
mutex.lock();
defer mutex.unlock();
```

The `Held` type was an idea to make mutexes slightly safer by making it
more difficult to forget to release an aquired lock. However, this
ultimately caused more problems than it solved, when any data structures
needed to store a held mutex. Simplify everything by reducing the API
down to the primitives: lock() and unlock().

Closes #8051
Closes #8246
Closes #10105
2021-11-09 18:31:03 -07:00

289 lines
7.7 KiB
Zig

//! Lock may be held only once. If the same thread tries to acquire
//! the same mutex twice, it deadlocks. This type supports static
//! initialization and is at most `@sizeOf(usize)` in size. When an
//! application is built in single threaded release mode, all the
//! functions are no-ops. In single threaded debug mode, there is
//! deadlock detection.
//!
//! Example usage:
//! var m = Mutex{};
//!
//! m.lock();
//! defer m.release();
//! ... critical code
//!
//! Non-blocking:
//! if (m.tryLock()) {
//! defer m.unlock();
//! // ... critical section
//! } else {
//! // ... lock not acquired
//! }
impl: Impl = .{},
const Mutex = @This();
const std = @import("../std.zig");
const builtin = @import("builtin");
const os = std.os;
const assert = std.debug.assert;
const windows = os.windows;
const linux = os.linux;
const testing = std.testing;
const StaticResetEvent = std.thread.StaticResetEvent;
/// Try to acquire the mutex without blocking. Returns `false` if the mutex is
/// unavailable. Otherwise returns `true`. Call `unlock` on the mutex to release.
pub fn tryLock(m: *Mutex) bool {
return m.impl.tryLock();
}
/// Acquire the mutex. Deadlocks if the mutex is already
/// held by the calling thread.
pub fn lock(m: *Mutex) void {
m.impl.lock();
}
pub fn unlock(m: *Mutex) void {
m.impl.unlock();
}
const Impl = if (builtin.single_threaded)
Dummy
else if (builtin.os.tag == .windows)
WindowsMutex
else if (std.Thread.use_pthreads)
PthreadMutex
else
AtomicMutex;
pub const AtomicMutex = struct {
state: State = .unlocked,
const State = enum(i32) {
unlocked,
locked,
waiting,
};
pub fn tryLock(m: *AtomicMutex) bool {
return @cmpxchgStrong(
State,
&m.state,
.unlocked,
.locked,
.Acquire,
.Monotonic,
) == null;
}
pub fn lock(m: *AtomicMutex) void {
switch (@atomicRmw(State, &m.state, .Xchg, .locked, .Acquire)) {
.unlocked => {},
else => |s| m.lockSlow(s),
}
}
pub fn unlock(m: *AtomicMutex) void {
switch (@atomicRmw(State, &m.state, .Xchg, .unlocked, .Release)) {
.unlocked => unreachable,
.locked => {},
.waiting => m.unlockSlow(),
}
}
fn lockSlow(m: *AtomicMutex, current_state: State) void {
@setCold(true);
var new_state = current_state;
var spin: u8 = 0;
while (spin < 100) : (spin += 1) {
const state = @cmpxchgWeak(
State,
&m.state,
.unlocked,
new_state,
.Acquire,
.Monotonic,
) orelse return;
switch (state) {
.unlocked => {},
.locked => {},
.waiting => break,
}
var iter = std.math.min(32, spin + 1);
while (iter > 0) : (iter -= 1)
std.atomic.spinLoopHint();
}
new_state = .waiting;
while (true) {
switch (@atomicRmw(State, &m.state, .Xchg, new_state, .Acquire)) {
.unlocked => return,
else => {},
}
switch (builtin.os.tag) {
.linux => {
switch (linux.getErrno(linux.futex_wait(
@ptrCast(*const i32, &m.state),
linux.FUTEX.PRIVATE_FLAG | linux.FUTEX.WAIT,
@enumToInt(new_state),
null,
))) {
.SUCCESS => {},
.INTR => {},
.AGAIN => {},
else => unreachable,
}
},
else => std.atomic.spinLoopHint(),
}
}
}
fn unlockSlow(m: *AtomicMutex) void {
@setCold(true);
switch (builtin.os.tag) {
.linux => {
switch (linux.getErrno(linux.futex_wake(
@ptrCast(*const i32, &m.state),
linux.FUTEX.PRIVATE_FLAG | linux.FUTEX.WAKE,
1,
))) {
.SUCCESS => {},
.FAULT => unreachable, // invalid pointer passed to futex_wake
else => unreachable,
}
},
else => {},
}
}
};
pub const PthreadMutex = struct {
pthread_mutex: std.c.pthread_mutex_t = .{},
/// Try to acquire the mutex without blocking. Returns true if
/// the mutex is unavailable. Otherwise returns false. Call
/// release when done.
pub fn tryLock(m: *PthreadMutex) bool {
return std.c.pthread_mutex_trylock(&m.pthread_mutex) == .SUCCESS;
}
/// Acquire the mutex. Will deadlock if the mutex is already
/// held by the calling thread.
pub fn lock(m: *PthreadMutex) void {
switch (std.c.pthread_mutex_lock(&m.pthread_mutex)) {
.SUCCESS => {},
.INVAL => unreachable,
.BUSY => unreachable,
.AGAIN => unreachable,
.DEADLK => unreachable,
.PERM => unreachable,
else => unreachable,
}
}
pub fn unlock(m: *PthreadMutex) void {
switch (std.c.pthread_mutex_unlock(&m.pthread_mutex)) {
.SUCCESS => return,
.INVAL => unreachable,
.AGAIN => unreachable,
.PERM => unreachable,
else => unreachable,
}
}
};
/// This has the sematics as `Mutex`, however it does not actually do any
/// synchronization. Operations are safety-checked no-ops.
pub const Dummy = struct {
locked: @TypeOf(lock_init) = lock_init,
const lock_init = if (std.debug.runtime_safety) false else {};
/// Try to acquire the mutex without blocking. Returns false if
/// the mutex is unavailable. Otherwise returns true.
pub fn tryLock(m: *Dummy) bool {
if (std.debug.runtime_safety) {
if (m.locked) return false;
m.locked = true;
}
return true;
}
/// Acquire the mutex. Will deadlock if the mutex is already
/// held by the calling thread.
pub fn lock(m: *Dummy) void {
if (!m.tryLock()) {
@panic("deadlock detected");
}
}
pub fn unlock(m: *Dummy) void {
if (std.debug.runtime_safety) {
m.locked = false;
}
}
};
pub const WindowsMutex = struct {
srwlock: windows.SRWLOCK = windows.SRWLOCK_INIT,
pub fn tryLock(m: *WindowsMutex) bool {
return windows.kernel32.TryAcquireSRWLockExclusive(&m.srwlock) != windows.FALSE;
}
pub fn lock(m: *WindowsMutex) void {
windows.kernel32.AcquireSRWLockExclusive(&m.srwlock);
}
pub fn unlock(m: *WindowsMutex) void {
windows.kernel32.ReleaseSRWLockExclusive(&m.srwlock);
}
};
const TestContext = struct {
mutex: *Mutex,
data: i128,
const incr_count = 10000;
};
test "basic usage" {
var mutex = Mutex{};
var context = TestContext{
.mutex = &mutex,
.data = 0,
};
if (builtin.single_threaded) {
worker(&context);
try testing.expect(context.data == TestContext.incr_count);
} else {
const thread_count = 10;
var threads: [thread_count]std.Thread = undefined;
for (threads) |*t| {
t.* = try std.Thread.spawn(.{}, worker, .{&context});
}
for (threads) |t|
t.join();
try testing.expect(context.data == thread_count * TestContext.incr_count);
}
}
fn worker(ctx: *TestContext) void {
var i: usize = 0;
while (i != TestContext.incr_count) : (i += 1) {
ctx.mutex.lock();
defer ctx.mutex.unlock();
ctx.data += 1;
}
}