diff --git a/lib/std/mutex.zig b/lib/std/mutex.zig
index 5f3b9272d9..26f0142c7c 100644
--- a/lib/std/mutex.zig
+++ b/lib/std/mutex.zig
@@ -1,19 +1,13 @@
 const std = @import("std.zig");
 const builtin = @import("builtin");
-const AtomicOrder = builtin.AtomicOrder;
-const AtomicRmwOp = builtin.AtomicRmwOp;
 const testing = std.testing;
 const SpinLock = std.SpinLock;
-const linux = std.os.linux;
-const windows = std.os.windows;
+const ThreadParker = std.ThreadParker;
 
 /// Lock may be held only once. If the same thread
 /// tries to acquire the same mutex twice, it deadlocks.
-/// This type must be initialized at runtime, and then deinitialized when no
-/// longer needed, to free resources.
-/// If you need static initialization, use std.StaticallyInitializedMutex.
-/// The Linux implementation is based on mutex3 from
-/// https://www.akkadia.org/drepper/futex.pdf
+/// This type supports static initialization and is based off of Golang 1.13 runtime.lock_futex:
+/// https://github.com/golang/go/blob/master/src/runtime/lock_futex.go
 /// When an application is built in single threaded release mode, all the functions are
 /// no-ops. In single threaded debug mode, there is deadlock detection.
 pub const Mutex = if (builtin.single_threaded)
@@ -43,83 +37,78 @@ pub const Mutex = if (builtin.single_threaded)
             return Held{ .mutex = self };
         }
     }
-else switch (builtin.os) {
-    builtin.Os.linux => struct {
-        /// 0: unlocked
-        /// 1: locked, no waiters
-        /// 2: locked, one or more waiters
-        lock: i32,
+else struct {
+    state: u32, // TODO: make this an enum
+    parker: ThreadParker,
 
-        pub const Held = struct {
-            mutex: *Mutex,
+    const Unlocked = 0;
+    const Sleeping = 1;
+    const Locked = 2;
 
-            pub fn release(self: Held) void {
-                const c = @atomicRmw(i32, &self.mutex.lock, AtomicRmwOp.Sub, 1, AtomicOrder.Release);
-                if (c != 1) {
-                    _ = @atomicRmw(i32, &self.mutex.lock, AtomicRmwOp.Xchg, 0, AtomicOrder.Release);
-                    const rc = linux.futex_wake(&self.mutex.lock, linux.FUTEX_WAKE | linux.FUTEX_PRIVATE_FLAG, 1);
-                    switch (linux.getErrno(rc)) {
-                        0 => {},
-                        linux.EINVAL => unreachable,
-                        else => unreachable,
-                    }
-                }
-            }
+    /// number of iterations to spin yielding the cpu
+    const SpinCpu = 4;
+    /// number of iterations to perform in the cpu yield loop
+    const SpinCpuCount = 30;
+    /// number of iterations to spin yielding the thread
+    const SpinThread = 1;
+
+    pub fn init() Mutex {
+        return Mutex{
+            .state = Unlocked,
+            .parker = ThreadParker.init(),
         };
+    }
 
-        pub fn init() Mutex {
-            return Mutex{ .lock = 0 };
+    pub fn deinit(self: *Mutex) void {
+        self.parker.deinit();
+    }
+
+    pub const Held = struct {
+        mutex: *Mutex,
+
+        pub fn release(self: Held) void {
+            switch (@atomicRmw(u32, &self.mutex.state, .Xchg, Unlocked, .Release)) {
+                Locked => {},
+                Sleeping => self.mutex.parker.unpark(&self.mutex.state),
+                Unlocked => unreachable, // unlocking an unlocked mutex
+                else => unreachable, // should never be anything else
+            }
         }
+    };
 
-        pub fn deinit(self: *Mutex) void {}
+    pub fn acquire(self: *Mutex) Held {
+        // Try and speculatively grab the lock.
+        // If it fails, the state is either Locked or Sleeping
+        // depending on if there's a thread stuck sleeping below.
+        var state = @atomicRmw(u32, &self.state, .Xchg, Locked, .Acquire);
+        if (state == Unlocked)
+            return Held{ .mutex = self };
 
-        pub fn acquire(self: *Mutex) Held {
-            var c = @cmpxchgWeak(i32, &self.lock, 0, 1, AtomicOrder.Acquire, AtomicOrder.Monotonic) orelse
+        while (true) {
+            // try and acquire the lock using cpu spinning on failure
+            for (([SpinCpu]void)(undefined)) |_| {
+                var value = @atomicLoad(u32, &self.state, .Monotonic);
+                while (value == Unlocked)
+                    value = @cmpxchgWeak(u32, &self.state, Unlocked, state, .Acquire, .Monotonic) orelse
                 return Held{ .mutex = self };
+                for (([SpinCpuCount]void)(undefined)) |_|
+                    SpinLock.yieldCpu();
+            }
+
+            // try and acquire the lock using thread rescheduling on failure
+            for (([SpinThread]void)(undefined)) |_| {
+                var value = @atomicLoad(u32, &self.state, .Monotonic);
+                while (value == Unlocked)
+                    value = @cmpxchgWeak(u32, &self.state, Unlocked, state, .Acquire, .Monotonic) orelse return Held{ .mutex = self };
+                SpinLock.yieldThread();
+            }
+
+            // failed to acquire the lock, go to sleep until woken up by `Held.release()`
+            if (@atomicRmw(u32, &self.state, .Xchg, Sleeping, .Acquire) == Unlocked)
                 return Held{ .mutex = self };
-            if (c != 2)
-                c = @atomicRmw(i32, &self.lock, AtomicRmwOp.Xchg, 2, AtomicOrder.Acquire);
-            while (c != 0) {
-                const rc = linux.futex_wait(&self.lock, linux.FUTEX_WAIT | linux.FUTEX_PRIVATE_FLAG, 2, null);
-                switch (linux.getErrno(rc)) {
-                    0, linux.EINTR, linux.EAGAIN => {},
-                    linux.EINVAL => unreachable,
-                    else => unreachable,
-                }
-                c = @atomicRmw(i32, &self.lock, AtomicRmwOp.Xchg, 2, AtomicOrder.Acquire);
-            }
-            return Held{ .mutex = self };
+            state = Sleeping;
+            self.parker.park(&self.state, Sleeping);
         }
-    },
-    // TODO once https://github.com/ziglang/zig/issues/287 (copy elision) is solved, we can make a
-    // better implementation of this. The problem is we need the init() function to have access to
-    // the address of the CRITICAL_SECTION, and then have it not move.
-    builtin.Os.windows => std.StaticallyInitializedMutex,
-    else => struct {
-        /// TODO better implementation than spin lock.
-        /// When changing this, one must also change the corresponding
-        /// std.StaticallyInitializedMutex code, since it aliases this type,
-        /// under the assumption that it works both statically and at runtime.
-        lock: SpinLock,
-
-        pub const Held = struct {
-            mutex: *Mutex,
-
-            pub fn release(self: Held) void {
-                SpinLock.Held.release(SpinLock.Held{ .spinlock = &self.mutex.lock });
-            }
-        };
-
-        pub fn init() Mutex {
-            return Mutex{ .lock = SpinLock.init() };
-        }
-
-        pub fn deinit(self: *Mutex) void {}
-
-        pub fn acquire(self: *Mutex) Held {
-            _ = self.lock.acquire();
-            return Held{ .mutex = self };
-        }
-    },
+    }
 };
 
 const TestContext = struct {
diff --git a/lib/std/parker.zig b/lib/std/parker.zig
index b6945fa000..8571c89423 100644
--- a/lib/std/parker.zig
+++ b/lib/std/parker.zig
@@ -159,7 +159,7 @@ const WindowsParker = struct {
         const key = @ptrCast(*const c_void, ptr);
         var waiting = @atomicLoad(u32, waiters, .Acquire);
         while (waiting != 0) {
-            waiting = @cmpxchgWeak(u32, waiters, waiting, waiting - 1, .AcqRel, .Monotonic) orelse {
+            waiting = @cmpxchgWeak(u32, waiters, waiting, waiting - 1, .Acquire, .Monotonic) orelse {
                 const rc = windows.ntdll.NtReleaseKeyedEvent(self.handle, key, windows.FALSE, null);
                 assert(rc == 0);
                 return;
@@ -338,3 +338,39 @@ const PosixParker = struct {
         else => unreachable,
     };
 };
+
+test "std.ThreadParker" {
+    const Context = struct {
+        parker: ThreadParker,
+        data: u32,
+
+        fn receiver(self: *@This()) void {
+            self.parker.park(&self.data, 0); // receives 1
+            assert(@atomicRmw(u32, &self.data, .Xchg, 2, .SeqCst) == 1); // sends 2
+            self.parker.unpark(&self.data); // wakes up waiters on 2
+            self.parker.park(&self.data, 2); // receives 3
+            assert(@atomicRmw(u32, &self.data, .Xchg, 4, .SeqCst) == 3); // sends 4
+            self.parker.unpark(&self.data); // wakes up waiters on 4
+        }
+
+        fn sender(self: *@This()) void {
+            assert(@atomicRmw(u32, &self.data, .Xchg, 1, .SeqCst) == 0); // sends 1
+            self.parker.unpark(&self.data); // wakes up waiters on 1
+            self.parker.park(&self.data, 1); // receives 2
+            assert(@atomicRmw(u32, &self.data, .Xchg, 3, .SeqCst) == 2); // sends 3
+            self.parker.unpark(&self.data); // wakes up waiters on 3
+            self.parker.park(&self.data, 3); // receives 4
+        }
+    };
+
+    var context = Context{
+        .parker = ThreadParker.init(),
+        .data = 0,
+    };
+    defer context.parker.deinit();
+
+    var receiver = try std.Thread.spawn(&context, Context.receiver);
+    defer receiver.wait();
+
+    context.sender();
+}
\ No newline at end of file
diff --git a/lib/std/spinlock.zig b/lib/std/spinlock.zig
index c45778cec1..3fd73ab8a3 100644
--- a/lib/std/spinlock.zig
+++ b/lib/std/spinlock.zig
@@ -28,7 +28,7 @@ pub const SpinLock = struct {
         return Held{ .spinlock = self };
     }
 
-    fn yieldCpu() void {
+    pub fn yieldCpu() void {
         switch (builtin.arch) {
             .i386, .x86_64 => asm volatile("pause" ::: "memory"),
             .arm, .aarch64 => asm volatile("yield"),
@@ -36,7 +36,7 @@ pub const SpinLock = struct {
         }
     }
 
-    fn yieldThread() void {
+    pub fn yieldThread() void {
         switch (builtin.os) {
             .linux => assert(linux.syscall0(linux.SYS_sched_yield) == 0),
             .windows => _ = windows.kernel32.SwitchToThread(),