zig/lib/std/mutex.zig
Nathan Michaels 33b5dbb82c Document std.Mutex.
Not sure what the build platform is for the generated documentation,
and it's worth thinking about how best to deal with this pattern. It
might be worth figuring out how to rewrite this to have a single
definition of the public API with the implementation chosen at compile
time.
2019-12-19 23:42:27 -05:00

328 lines
12 KiB
Zig

const std = @import("std.zig");
const builtin = @import("builtin");
const os = std.os;
const testing = std.testing;
const SpinLock = std.SpinLock;
const ResetEvent = std.ResetEvent;
/// Lock may be held only once. If the same thread tries to acquire
/// the same mutex twice, it deadlocks. This type supports static
/// initialization and is at most `@sizeOf(usize)` in size. When an
/// application is built in single threaded release mode, all the
/// functions are no-ops. In single threaded debug mode, there is
/// deadlock detection.
///
/// Example usage:
/// var m = Mutex.init();
/// defer m.deinit();
///
/// const lock = m.acquire();
/// defer lock.release();
/// ... critical code
///
/// Non-blocking:
/// if (m.tryAcquire) |lock| {
/// defer lock.release();
/// // ... critical section
/// } else {
/// // ... lock not acquired
/// }
pub const Mutex = if (builtin.single_threaded)
struct {
lock: @TypeOf(lock_init),
const lock_init = if (std.debug.runtime_safety) false else {};
pub const Held = struct {
mutex: *Mutex,
pub fn release(self: Held) void {
if (std.debug.runtime_safety) {
self.mutex.lock = false;
}
}
};
/// Create a new mutex in unlocked state.
pub fn init() Mutex {
return Mutex{ .lock = lock_init };
}
/// Free a mutex created with init. Calling this while the
/// mutex is held may result in safety-checked undefined
/// behavior.
pub fn deinit(self: *Mutex) void {
self.* = undefined;
}
/// Try to acquire the mutex without blocking. Returns null if
/// the mutex is unavailable. Otherwise returns Held. Call
/// release on Held.
pub fn tryAcquire(self: *Mutex) ?Held {
if (std.debug.runtime_safety) {
if (self.lock) return null;
self.lock = true;
}
return Held{ .mutex = self };
}
/// Acquire the mutex. Will deadlock if the mutex is already
/// held by the calling thread.
pub fn acquire(self: *Mutex) Held {
return self.tryAcquire() orelse @panic("deadlock detected");
}
}
else if (builtin.os == .windows)
// https://locklessinc.com/articles/keyed_events/
extern union {
locked: u8,
waiters: u32,
const WAKE = 1 << 8;
const WAIT = 1 << 9;
pub fn init() Mutex {
return Mutex{ .waiters = 0 };
}
pub fn deinit(self: *Mutex) void {
self.* = undefined;
}
pub fn tryAcquire(self: *Mutex) ?Held {
if (@atomicRmw(u8, &self.locked, .Xchg, 1, .Acquire) != 0)
return null;
return Held{ .mutex = self };
}
pub fn acquire(self: *Mutex) Held {
return self.tryAcquire() orelse self.acquireSlow();
}
fn acquireSlow(self: *Mutex) Held {
@setCold(true);
while (true) : (SpinLock.loopHint(1)) {
const waiters = @atomicLoad(u32, &self.waiters, .Monotonic);
// try and take lock if unlocked
if ((waiters & 1) == 0) {
if (@atomicRmw(u8, &self.locked, .Xchg, 1, .Acquire) == 0)
return Held{ .mutex = self };
// otherwise, try and update the waiting count.
// then unset the WAKE bit so that another unlocker can wake up a thread.
} else if (@cmpxchgWeak(u32, &self.waiters, waiters, (waiters + WAIT) | 1, .Monotonic, .Monotonic) == null) {
ResetEvent.OsEvent.Futex.wait(@ptrCast(*i32, &self.waiters), undefined, null) catch unreachable;
_ = @atomicRmw(u32, &self.waiters, .Sub, WAKE, .Monotonic);
}
}
}
pub const Held = struct {
mutex: *Mutex,
pub fn release(self: Held) void {
// unlock without a rmw/cmpxchg instruction
@atomicStore(u8, @ptrCast(*u8, &self.mutex.locked), 0, .Release);
while (true) : (SpinLock.loopHint(1)) {
const waiters = @atomicLoad(u32, &self.mutex.waiters, .Monotonic);
// no one is waiting
if (waiters < WAIT) return;
// someone grabbed the lock and will do the wake instead
if (waiters & 1 != 0) return;
// someone else is currently waking up
if (waiters & WAKE != 0) return;
// try to decrease the waiter count & set the WAKE bit meaning a thread is waking up
if (@cmpxchgWeak(u32, &self.mutex.waiters, waiters, waiters - WAIT + WAKE, .Release, .Monotonic) == null)
return ResetEvent.OsEvent.Futex.wake(@ptrCast(*i32, &self.mutex.waiters));
}
}
};
}
else if (builtin.link_libc or builtin.os == .linux)
// stack-based version of https://github.com/Amanieu/parking_lot/blob/master/core/src/word_lock.rs
struct {
state: usize,
/// number of times to spin trying to acquire the lock.
/// https://webkit.org/blog/6161/locking-in-webkit/
const SPIN_COUNT = 40;
const MUTEX_LOCK: usize = 1 << 0;
const QUEUE_LOCK: usize = 1 << 1;
const QUEUE_MASK: usize = ~(MUTEX_LOCK | QUEUE_LOCK);
const Node = struct {
next: ?*Node,
event: ResetEvent,
};
pub fn init() Mutex {
return Mutex{ .state = 0 };
}
pub fn deinit(self: *Mutex) void {
self.* = undefined;
}
pub fn tryAcquire(self: *Mutex) ?Held {
if (@cmpxchgWeak(usize, &self.state, 0, MUTEX_LOCK, .Acquire, .Monotonic) != null)
return null;
return Held{ .mutex = self };
}
pub fn acquire(self: *Mutex) Held {
return self.tryAcquire() orelse {
self.acquireSlow();
return Held{ .mutex = self };
};
}
fn acquireSlow(self: *Mutex) void {
// inlining the fast path and hiding *Slow()
// calls behind a @setCold(true) appears to
// improve performance in release builds.
@setCold(true);
while (true) {
// try and spin for a bit to acquire the mutex if theres currently no queue
var spin_count: u32 = SPIN_COUNT;
var state = @atomicLoad(usize, &self.state, .Monotonic);
while (spin_count != 0) : (spin_count -= 1) {
if (state & MUTEX_LOCK == 0) {
_ = @cmpxchgWeak(usize, &self.state, state, state | MUTEX_LOCK, .Acquire, .Monotonic) orelse return;
} else if (state & QUEUE_MASK == 0) {
break;
}
SpinLock.yield();
state = @atomicLoad(usize, &self.state, .Monotonic);
}
// create the ResetEvent node on the stack
// (faster than threadlocal on platforms like OSX)
var node: Node = undefined;
node.event = ResetEvent.init();
defer node.event.deinit();
// we've spun too long, try and add our node to the LIFO queue.
// if the mutex becomes available in the process, try and grab it instead.
while (true) {
if (state & MUTEX_LOCK == 0) {
_ = @cmpxchgWeak(usize, &self.state, state, state | MUTEX_LOCK, .Acquire, .Monotonic) orelse return;
} else {
node.next = @intToPtr(?*Node, state & QUEUE_MASK);
const new_state = @ptrToInt(&node) | (state & ~QUEUE_MASK);
_ = @cmpxchgWeak(usize, &self.state, state, new_state, .Release, .Monotonic) orelse {
node.event.wait();
break;
};
}
SpinLock.yield();
state = @atomicLoad(usize, &self.state, .Monotonic);
}
}
}
/// Returned when the lock is acquired. Call release to
/// release.
pub const Held = struct {
mutex: *Mutex,
/// Release the held lock.
pub fn release(self: Held) void {
// first, remove the lock bit so another possibly parallel acquire() can succeed.
// use .Sub since it can be usually compiled down more efficiency
// (`lock sub` on x86) vs .And ~MUTEX_LOCK (`lock cmpxchg` loop on x86)
const state = @atomicRmw(usize, &self.mutex.state, .Sub, MUTEX_LOCK, .Release);
// if the LIFO queue isnt locked and it has a node, try and wake up the node.
if ((state & QUEUE_LOCK) == 0 and (state & QUEUE_MASK) != 0)
self.mutex.releaseSlow();
}
};
fn releaseSlow(self: *Mutex) void {
@setCold(true);
// try and lock the LFIO queue to pop a node off,
// stopping altogether if its already locked or the queue is empty
var state = @atomicLoad(usize, &self.state, .Monotonic);
while (true) : (SpinLock.loopHint(1)) {
if (state & QUEUE_LOCK != 0 or state & QUEUE_MASK == 0)
return;
state = @cmpxchgWeak(usize, &self.state, state, state | QUEUE_LOCK, .Acquire, .Monotonic) orelse break;
}
// acquired the QUEUE_LOCK, try and pop a node to wake it.
// if the mutex is locked, then unset QUEUE_LOCK and let
// the thread who holds the mutex do the wake-up on unlock()
while (true) : (SpinLock.loopHint(1)) {
if ((state & MUTEX_LOCK) != 0) {
state = @cmpxchgWeak(usize, &self.state, state, state & ~QUEUE_LOCK, .Release, .Acquire) orelse return;
} else {
const node = @intToPtr(*Node, state & QUEUE_MASK);
const new_state = @ptrToInt(node.next);
state = @cmpxchgWeak(usize, &self.state, state, new_state, .Release, .Acquire) orelse {
node.event.set();
return;
};
}
}
}
}
// for platforms without a known OS blocking
// primitive, default to SpinLock for correctness
else SpinLock;
const TestContext = struct {
mutex: *Mutex,
data: i128,
const incr_count = 10000;
};
test "std.Mutex" {
var plenty_of_memory = try std.heap.page_allocator.alloc(u8, 300 * 1024);
defer std.heap.page_allocator.free(plenty_of_memory);
var fixed_buffer_allocator = std.heap.ThreadSafeFixedBufferAllocator.init(plenty_of_memory);
var a = &fixed_buffer_allocator.allocator;
var mutex = Mutex.init();
defer mutex.deinit();
var context = TestContext{
.mutex = &mutex,
.data = 0,
};
if (builtin.single_threaded) {
worker(&context);
testing.expect(context.data == TestContext.incr_count);
} else {
const thread_count = 10;
var threads: [thread_count]*std.Thread = undefined;
for (threads) |*t| {
t.* = try std.Thread.spawn(&context, worker);
}
for (threads) |t|
t.wait();
testing.expect(context.data == thread_count * TestContext.incr_count);
}
}
fn worker(ctx: *TestContext) void {
var i: usize = 0;
while (i != TestContext.incr_count) : (i += 1) {
const held = ctx.mutex.acquire();
defer held.release();
ctx.data += 1;
}
}