From e3cbea934ee196b3833f4c4269bc3b2796c11928 Mon Sep 17 00:00:00 2001 From: protty <45520026+kprotty@users.noreply.github.com> Date: Tue, 19 Apr 2022 19:42:15 -0500 Subject: [PATCH] std.Thread.Futex improvements (#11464) * atomic: cache_line * Thread: Futex rewrite + more native platform support * Futex: tests compile * Futex: compiles and runs test * Futex: broadcast test * Futex: fix PosixImpl for tests * Futex: fix compile errors for bsd platforms * Futex: review changes + fix timeout=0 + more comments --- lib/std/Thread/Futex.zig | 1192 +++++++++++++++++++++++++------------- lib/std/atomic.zig | 44 +- lib/std/c/dragonfly.zig | 3 + lib/std/c/freebsd.zig | 40 ++ lib/std/c/openbsd.zig | 7 + 5 files changed, 882 insertions(+), 404 deletions(-) diff --git a/lib/std/Thread/Futex.zig b/lib/std/Thread/Futex.zig index d256fb5a43..a1c8ca71e4 100644 --- a/lib/std/Thread/Futex.zig +++ b/lib/std/Thread/Futex.zig @@ -7,9 +7,7 @@ const std = @import("../std.zig"); const builtin = @import("builtin"); const Futex = @This(); -const target = builtin.target; -const single_threaded = builtin.single_threaded; - +const os = std.os; const assert = std.debug.assert; const testing = std.testing; @@ -21,160 +19,152 @@ const spinLoopHint = std.atomic.spinLoopHint; /// - The caller is unblocked by a matching `wake()`. /// - The caller is unblocked spuriously by an arbitrary internal signal. /// -/// If `timeout` is provided, and the caller is blocked for longer than `timeout` nanoseconds`, `error.TimedOut` is returned. +/// The checking of `ptr` and `expect`, along with blocking the caller, is done atomically +/// and totally ordered (sequentially consistent) with respect to other wait()/wake() calls on the same `ptr`. +pub fn wait(ptr: *const Atomic(u32), expect: u32) void { + @setCold(true); + + Impl.wait(ptr, expect, null) catch |err| switch (err) { + error.Timeout => unreachable, // null timeout meant to wait forever + }; +} + +/// Checks if `ptr` still contains the value `expect` and, if so, blocks the caller until either: +/// - The value at `ptr` is no longer equal to `expect`. +/// - The caller is unblocked by a matching `wake()`. +/// - The caller is unblocked spuriously by an arbitrary internal signal. +/// - The caller blocks for longer than the given timeout. In which case, `error.Timeout` is returned. /// /// The checking of `ptr` and `expect`, along with blocking the caller, is done atomically /// and totally ordered (sequentially consistent) with respect to other wait()/wake() calls on the same `ptr`. -pub fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{TimedOut}!void { - if (single_threaded) { - // check whether the caller should block +pub fn timedWait(ptr: *const Atomic(u32), expect: u32, timeout_ns: u64) error{Timeout}!void { + @setCold(true); + + // Avoid calling into the OS for no-op timeouts. + if (timeout_ns == 0) { + if (ptr.load(.SeqCst) != expect) return; + return error.Timeout; + } + + return Impl.wait(ptr, expect, timeout_ns); +} + +/// Unblocks at most `max_waiters` callers blocked in a `wait()` call on `ptr`. +pub fn wake(ptr: *const Atomic(u32), max_waiters: u32) void { + @setCold(true); + + // Avoid calling into the OS if there's nothing to wake up. 
+ if (max_waiters == 0) { + return; + } + + Impl.wake(ptr, max_waiters); +} + +const Impl = if (builtin.single_threaded) + SerialImpl +else if (builtin.os.tag == .windows) + WindowsImpl +else if (builtin.os.tag.isDarwin()) + DarwinImpl +else if (builtin.os.tag == .linux) + LinuxImpl +else if (builtin.os.tag == .freebsd) + FreebsdImpl +else if (builtin.os.tag == .openbsd) + OpenbsdImpl +else if (builtin.os.tag == .dragonfly) + DragonflyImpl +else if (std.Thread.use_pthreads) + PosixImpl +else + UnsupportedImpl; + +/// We can't do @compileError() in the `Impl` switch statement above as its eagerly evaluated. +/// So instead, we @compileError() on the methods themselves for platforms which don't support futex. +const UnsupportedImpl = struct { + fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{Timeout}!void { + return unsupported(.{ ptr, expect, timeout }); + } + + fn wake(ptr: *const Atomic(u32), max_waiters: u32) void { + return unsupported(.{ ptr, max_waiters }); + } + + fn unsupported(unused: anytype) noreturn { + _ = unused; + @compileError("Unsupported operating system " ++ @tagName(builtin.target.os.tag)); + } +}; + +const SerialImpl = struct { + fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{Timeout}!void { if (ptr.loadUnchecked() != expect) { return; } - // There are no other threads which could notify the caller on single_threaded. - // Therefor a wait() without a timeout would block indefinitely. - const timeout_ns = timeout orelse { - @panic("deadlock"); + // There are no threads to wake us up. + // So if we wait without a timeout we would never wake up. + const delay = timeout orelse { + unreachable; // deadlock detected }; - // Simulate blocking with the timeout knowing that: - // - no other thread can change the ptr value - // - no other thread could unblock us if we waiting on the ptr - std.time.sleep(timeout_ns); - return error.TimedOut; + std.time.sleep(delay); + return error.Timeout; } - // Avoid calling into the OS for no-op waits() - if (timeout) |timeout_ns| { - if (timeout_ns == 0) { - if (ptr.load(.SeqCst) != expect) return; - return error.TimedOut; - } - } - - return OsFutex.wait(ptr, expect, timeout); -} - -/// Unblocks at most `num_waiters` callers blocked in a `wait()` call on `ptr`. -/// `num_waiters` of 1 unblocks at most one `wait(ptr, ...)` and `maxInt(u32)` unblocks effectively all `wait(ptr, ...)`. 
-pub fn wake(ptr: *const Atomic(u32), num_waiters: u32) void { - if (single_threaded) return; - if (num_waiters == 0) return; - - return OsFutex.wake(ptr, num_waiters); -} - -const OsFutex = if (target.os.tag == .windows) - WindowsFutex -else if (target.os.tag == .linux) - LinuxFutex -else if (target.isDarwin()) - DarwinFutex -else if (builtin.link_libc) - PosixFutex -else - UnsupportedFutex; - -const UnsupportedFutex = struct { - fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{TimedOut}!void { - return unsupported(.{ ptr, expect, timeout }); - } - - fn wake(ptr: *const Atomic(u32), num_waiters: u32) void { - return unsupported(.{ ptr, num_waiters }); - } - - fn unsupported(unused: anytype) noreturn { - @compileLog("Unsupported operating system", target.os.tag); - _ = unused; - unreachable; + fn wake(ptr: *const Atomic(u32), max_waiters: u32) void { + // There are no other threads to possibly wake up + _ = ptr; + _ = max_waiters; } }; -const WindowsFutex = struct { - const windows = std.os.windows; - - fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{TimedOut}!void { - var timeout_value: windows.LARGE_INTEGER = undefined; - var timeout_ptr: ?*const windows.LARGE_INTEGER = null; +// We use WaitOnAddress through NtDll instead of API-MS-Win-Core-Synch-l1-2-0.dll +// as it's generally already a linked target and is autoloaded into all processes anyway. +const WindowsImpl = struct { + fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{Timeout}!void { + var timeout_value: os.windows.LARGE_INTEGER = undefined; + var timeout_ptr: ?*const os.windows.LARGE_INTEGER = null; // NTDLL functions work with time in units of 100 nanoseconds. - // Positive values for timeouts are absolute time while negative is relative. - if (timeout) |timeout_ns| { + // Positive values are absolute deadlines while negative values are relative durations. + if (timeout) |delay| { + timeout_value = @intCast(os.windows.LARGE_INTEGER, delay / 100); + timeout_value = -timeout_value; timeout_ptr = &timeout_value; - timeout_value = -@intCast(windows.LARGE_INTEGER, timeout_ns / 100); } - switch (windows.ntdll.RtlWaitOnAddress( + const rc = os.windows.ntdll.RtlWaitOnAddress( @ptrCast(?*const anyopaque, ptr), @ptrCast(?*const anyopaque, &expect), @sizeOf(@TypeOf(expect)), timeout_ptr, - )) { + ); + + switch (rc) { .SUCCESS => {}, - .TIMEOUT => return error.TimedOut, + .TIMEOUT => { + assert(timeout != null); + return error.Timeout; + }, else => unreachable, } } - fn wake(ptr: *const Atomic(u32), num_waiters: u32) void { + fn wake(ptr: *const Atomic(u32), max_waiters: u32) void { const address = @ptrCast(?*const anyopaque, ptr); - switch (num_waiters) { - 1 => windows.ntdll.RtlWakeAddressSingle(address), - else => windows.ntdll.RtlWakeAddressAll(address), + assert(max_waiters != 0); + + switch (max_waiters) { + 1 => os.windows.ntdll.RtlWakeAddressSingle(address), + else => os.windows.ntdll.RtlWakeAddressAll(address), } } }; -const LinuxFutex = struct { - const linux = std.os.linux; - - fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{TimedOut}!void { - var ts: std.os.timespec = undefined; - var ts_ptr: ?*std.os.timespec = null; - - // Futex timespec timeout is already in relative time. 
- if (timeout) |timeout_ns| { - ts_ptr = &ts; - ts.tv_sec = @intCast(@TypeOf(ts.tv_sec), timeout_ns / std.time.ns_per_s); - ts.tv_nsec = @intCast(@TypeOf(ts.tv_nsec), timeout_ns % std.time.ns_per_s); - } - - switch (linux.getErrno(linux.futex_wait( - @ptrCast(*const i32, ptr), - linux.FUTEX.PRIVATE_FLAG | linux.FUTEX.WAIT, - @bitCast(i32, expect), - ts_ptr, - ))) { - .SUCCESS => {}, // notified by `wake()` - .INTR => {}, // spurious wakeup - .AGAIN => {}, // ptr.* != expect - .TIMEDOUT => return error.TimedOut, - .INVAL => {}, // possibly timeout overflow - .FAULT => unreachable, - else => unreachable, - } - } - - fn wake(ptr: *const Atomic(u32), num_waiters: u32) void { - switch (linux.getErrno(linux.futex_wake( - @ptrCast(*const i32, ptr), - linux.FUTEX.PRIVATE_FLAG | linux.FUTEX.WAKE, - std.math.cast(i32, num_waiters) catch std.math.maxInt(i32), - ))) { - .SUCCESS => {}, // successful wake up - .INVAL => {}, // invalid futex_wait() on ptr done elsewhere - .FAULT => {}, // pointer became invalid while doing the wake - else => unreachable, - } - } -}; - -const DarwinFutex = struct { - const darwin = std.os.darwin; - - fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{TimedOut}!void { +const DarwinImpl = struct { + fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{Timeout}!void { // Darwin XNU 7195.50.7.100.1 introduced __ulock_wait2 and migrated code paths (notably pthread_cond_t) towards it: // https://github.com/apple/darwin-xnu/commit/d4061fb0260b3ed486147341b72468f836ed6c8f#diff-08f993cc40af475663274687b7c326cc6c3031e0db3ac8de7b24624610616be6 // @@ -183,58 +173,67 @@ const DarwinFutex = struct { // // ulock_wait() uses 32-bit micro-second timeouts where 0 = INFINITE or no-timeout // ulock_wait2() uses 64-bit nano-second timeouts (with the same convention) + const supports_ulock_wait2 = builtin.target.os.version_range.semver.min.major >= 11; + var timeout_ns: u64 = 0; - if (timeout) |timeout_value| { - // This should be checked by the caller. - assert(timeout_value != 0); - timeout_ns = timeout_value; + if (timeout) |delay| { + assert(delay != 0); // handled by timedWait() + timeout_ns = delay; } - const addr = @ptrCast(*const anyopaque, ptr); - const flags = darwin.UL_COMPARE_AND_WAIT | darwin.ULF_NO_ERRNO; + // If we're using `__ulock_wait` and `timeout` is too big to fit inside a `u32` count of // micro-seconds (around 70min), we'll request a shorter timeout. This is fine (users // should handle spurious wakeups), but we need to remember that we did so, so that - // we don't return `TimedOut` incorrectly. If that happens, we set this variable to + // we don't return `Timeout` incorrectly. If that happens, we set this variable to // true so that we we know to ignore the ETIMEDOUT result. 
var timeout_overflowed = false; + + const addr = @ptrCast(*const anyopaque, ptr); + const flags = os.darwin.UL_COMPARE_AND_WAIT | os.darwin.ULF_NO_ERRNO; const status = blk: { - if (target.os.version_range.semver.min.major >= 11) { - break :blk darwin.__ulock_wait2(flags, addr, expect, timeout_ns, 0); - } else { - const timeout_us = std.math.cast(u32, timeout_ns / std.time.ns_per_us) catch overflow: { - timeout_overflowed = true; - break :overflow std.math.maxInt(u32); - }; - break :blk darwin.__ulock_wait(flags, addr, expect, timeout_us); + if (supports_ulock_wait2) { + break :blk os.darwin.__ulock_wait2(flags, addr, expect, timeout_ns, 0); } + + const timeout_us = std.math.cast(u32, timeout_ns / std.time.ns_per_us) catch overflow: { + timeout_overflowed = true; + break :overflow std.math.maxInt(u32); + }; + + break :blk os.darwin.__ulock_wait(flags, addr, expect, timeout_us); }; if (status >= 0) return; switch (@intToEnum(std.os.E, -status)) { + // Wait was interrupted by the OS or other spurious signalling. .INTR => {}, - // Address of the futex is paged out. This is unlikely, but possible in theory, and + // Address of the futex was paged out. This is unlikely, but possible in theory, and // pthread/libdispatch on darwin bother to handle it. In this case we'll return // without waiting, but the caller should retry anyway. .FAULT => {}, - .TIMEDOUT => if (!timeout_overflowed) return error.TimedOut, + // Only report Timeout if we didn't have to cap the timeout + .TIMEDOUT => { + assert(timeout != null); + if (!timeout_overflowed) return error.Timeout; + }, else => unreachable, } } - fn wake(ptr: *const Atomic(u32), num_waiters: u32) void { - var flags: u32 = darwin.UL_COMPARE_AND_WAIT | darwin.ULF_NO_ERRNO; - if (num_waiters > 1) { - flags |= darwin.ULF_WAKE_ALL; + fn wake(ptr: *const Atomic(u32), max_waiters: u32) void { + var flags: u32 = os.darwin.UL_COMPARE_AND_WAIT | os.darwin.ULF_NO_ERRNO; + if (max_waiters > 1) { + flags |= os.darwin.ULF_WAKE_ALL; } while (true) { const addr = @ptrCast(*const anyopaque, ptr); - const status = darwin.__ulock_wake(flags, addr, 0); + const status = os.darwin.__ulock_wake(flags, addr, 0); if (status >= 0) return; switch (@intToEnum(std.os.E, -status)) { .INTR => continue, // spurious wake() - .FAULT => continue, // address of the lock was paged out + .FAULT => unreachable, // __ulock_wake doesn't generate EFAULT according to darwin pthread_cond_t .NOENT => return, // nothing was woken up .ALREADY => unreachable, // only for ULF_WAKE_THREAD else => unreachable, @@ -243,332 +242,723 @@ const DarwinFutex = struct { } }; -const PosixFutex = struct { - fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{TimedOut}!void { - const address = @ptrToInt(ptr); - const bucket = Bucket.from(address); - var waiter: List.Node = undefined; - - { - assert(std.c.pthread_mutex_lock(&bucket.mutex) == .SUCCESS); - defer assert(std.c.pthread_mutex_unlock(&bucket.mutex) == .SUCCESS); - - if (ptr.load(.SeqCst) != expect) { - return; - } - - waiter.data = .{ .address = address }; - bucket.list.prepend(&waiter); +// https://man7.org/linux/man-pages/man2/futex.2.html +const LinuxImpl = struct { + fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{Timeout}!void { + var ts: os.timespec = undefined; + if (timeout) |timeout_ns| { + ts.tv_sec = @intCast(@TypeOf(ts.tv_sec), timeout_ns / std.time.ns_per_s); + ts.tv_nsec = @intCast(@TypeOf(ts.tv_nsec), timeout_ns % std.time.ns_per_s); } - var timed_out = false; - waiter.data.wait(timeout) catch { - defer 
if (!timed_out) { - waiter.data.wait(null) catch unreachable; + const rc = os.linux.futex_wait( + @ptrCast(*const i32, &ptr.value), + os.linux.FUTEX.PRIVATE_FLAG | os.linux.FUTEX.WAIT, + @bitCast(i32, expect), + if (timeout != null) &ts else null, + ); + + switch (os.linux.getErrno(rc)) { + .SUCCESS => {}, // notified by `wake()` + .INTR => {}, // spurious wakeup + .AGAIN => {}, // ptr.* != expect + .TIMEDOUT => { + assert(timeout != null); + return error.Timeout; + }, + .INVAL => {}, // possibly timeout overflow + .FAULT => unreachable, // ptr was invalid + else => unreachable, + } + } + + fn wake(ptr: *const Atomic(u32), max_waiters: u32) void { + const rc = os.linux.futex_wake( + @ptrCast(*const i32, &ptr.value), + os.linux.FUTEX.PRIVATE_FLAG | os.linux.FUTEX.WAKE, + std.math.cast(i32, max_waiters) catch std.math.maxInt(i32), + ); + + switch (os.linux.getErrno(rc)) { + .SUCCESS => {}, // successful wake up + .INVAL => {}, // invalid futex_wait() on ptr done elsewhere + .FAULT => {}, // pointer became invalid while doing the wake + else => unreachable, + } + } +}; + +// https://www.freebsd.org/cgi/man.cgi?query=_umtx_op&sektion=2&n=1 +const FreebsdImpl = struct { + fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{Timeout}!void { + var tm_size: usize = 0; + var tm: os.freebsd._umtx_time = undefined; + var tm_ptr: ?*const os.freebsd._umtx_time = null; + + if (timeout) |timeout_ns| { + tm_ptr = &tm; + tm_size = @sizeOf(@TypeOf(tm)); + + tm._flags = 0; // use relative time not UMTX_ABSTIME + tm._clockid = os.CLOCK.MONOTONIC; + tm._timeout.tv_sec = @intCast(@TypeOf(tm._timeout.tv_sec), timeout_ns / std.time.ns_per_s); + tm._timeout.tv_nsec = @intCast(@TypeOf(tm._timeout.tv_nsec), timeout_ns % std.time.ns_per_s); + } + + const rc = os.freebsd._umtx_op( + @ptrToInt(&ptr.value), + @enumToInt(os.freebsd.UMTX_OP.WAIT_UINT_PRIVATE), + @as(c_ulong, expect), + tm_size, + @ptrToInt(tm_ptr), + ); + + switch (os.errno(rc)) { + .SUCCESS => {}, + .FAULT => unreachable, // one of the args points to invalid memory + .INVAL => unreachable, // arguments should be correct + .TIMEDOUT => { + assert(timeout != null); + return error.Timeout; + }, + .INTR => {}, // spurious wake + else => unreachable, + } + } + + fn wake(ptr: *const Atomic(u32), max_waiters: u32) void { + const rc = os.freebsd._umtx_op( + @ptrToInt(&ptr.value), + @enumToInt(os.freebsd.UMTX_OP.WAKE_PRIVATE), + @as(c_ulong, max_waiters), + 0, // there is no timeout struct + 0, // there is no timeout struct pointer + ); + + switch (os.errno(rc)) { + .SUCCESS => {}, + .FAULT => {}, // it's ok if the ptr doesn't point to valid memory + .INVAL => unreachable, // arguments should be correct + else => unreachable, + } + } +}; + +// https://man.openbsd.org/futex.2 +const OpenbsdImpl = struct { + fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{Timeout}!void { + var ts: os.timespec = undefined; + if (timeout) |timeout_ns| { + ts.tv_sec = @intCast(@TypeOf(ts.tv_sec), timeout_ns / std.time.ns_per_s); + ts.tv_nsec = @intCast(@TypeOf(ts.tv_nsec), timeout_ns % std.time.ns_per_s); + } + + const rc = os.openbsd.futex( + @ptrCast(*const volatile u32, &ptr.value), + os.openbsd.FUTEX_WAIT | os.openbsd.FUTEX_PRIVATE_FLAG, + @bitCast(c_int, expect), + if (timeout != null) &ts else null, + null, // FUTEX_WAIT takes no requeue address + ); + + switch (os.errno(rc)) { + .SUCCESS => {}, // woken up by wake + .NOSYS => unreachable, // the futex operation shouldn't be invalid + .FAULT => unreachable, // ptr was invalid + .AGAIN => {}, // 
ptr != expect + .INVAL => unreachable, // invalid timeout + .TIMEDOUT => { + assert(timeout != null); + return error.Timeout; + }, + .INTR => {}, // spurious wake from signal + .CANCELED => {}, // spurious wake from signal with SA_RESTART + else => unreachable, + } + } + + fn wake(ptr: *const Atomic(u32), max_waiters: u32) void { + const rc = os.openbsd.futex( + @ptrCast(*const volatile u32, &ptr.value), + os.openbsd.FUTEX_WAKE | os.openbsd.FUTEX_PRIVATE_FLAG, + std.math.cast(c_int, max_waiters) catch std.math.maxInt(c_int), + null, // FUTEX_WAKE takes no timeout ptr + null, // FUTEX_WAKE takes no requeue address + ); + + // returns number of threads woken up. + assert(rc >= 0); + } +}; + +// https://man.dragonflybsd.org/?command=umtx§ion=2 +const DragonflyImpl = struct { + fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{Timeout}!void { + // Dragonfly uses a scheme where 0 timeout means wait until signaled or spurious wake. + // It's reporting of timeout's is also unrealiable so we use an external timing source (Timer) instead. + var timeout_us: c_int = 0; + var timeout_overflowed = false; + var sleep_timer: std.time.Timer = undefined; + + if (timeout) |delay| { + assert(delay != 0); // handled by timedWait(). + timeout_us = std.math.cast(c_int, delay / std.time.ns_per_us) catch blk: { + timeout_overflowed = true; + break :blk std.math.maxInt(c_int); }; - assert(std.c.pthread_mutex_lock(&bucket.mutex) == .SUCCESS); - defer assert(std.c.pthread_mutex_unlock(&bucket.mutex) == .SUCCESS); - - if (waiter.data.address == address) { - timed_out = true; - bucket.list.remove(&waiter); + // Only need to record the start time if we can provide somewhat accurate error.Timeout's + if (!timeout_overflowed) { + sleep_timer = std.time.Timer.start() catch unreachable; } - }; + } - waiter.data.deinit(); - if (timed_out) { - return error.TimedOut; + const value = @bitCast(c_int, expect); + const addr = @ptrCast(*const volatile c_int, &ptr.value); + const rc = os.dragonfly.umtx_sleep(addr, value, timeout_us); + + switch (os.errno(rc)) { + .SUCCESS => {}, + .BUSY => {}, // ptr != expect + .AGAIN => { // maybe timed out, or paged out, or hit 2s kernel refresh + if (timeout) |timeout_ns| { + // Report error.Timeout only if we know the timeout duration has passed. + // If not, there's not much choice other than treating it as a spurious wake. + if (!timeout_overflowed and sleep_timer.read() >= timeout_ns) { + return error.Timeout; + } + } + }, + .INTR => {}, // spurious wake + .INVAL => unreachable, // invalid timeout + else => unreachable, } } - fn wake(ptr: *const Atomic(u32), num_waiters: u32) void { - const address = @ptrToInt(ptr); - const bucket = Bucket.from(address); - var can_notify = num_waiters; + fn wake(ptr: *const Atomic(u32), max_waiters: u32) void { + // A count of zero means wake all waiters. 
+ assert(max_waiters != 0); + const to_wake = std.math.cast(c_int, max_waiters) catch 0; - var notified = List{}; - defer while (notified.popFirst()) |waiter| { - waiter.data.notify(); - }; - - assert(std.c.pthread_mutex_lock(&bucket.mutex) == .SUCCESS); - defer assert(std.c.pthread_mutex_unlock(&bucket.mutex) == .SUCCESS); - - var waiters = bucket.list.first; - while (waiters) |waiter| { - assert(waiter.data.address != null); - waiters = waiter.next; - - if (waiter.data.address != address) continue; - if (can_notify == 0) break; - can_notify -= 1; - - bucket.list.remove(waiter); - waiter.data.address = null; - notified.prepend(waiter); - } + // https://man.dragonflybsd.org/?command=umtx§ion=2 + // > umtx_wakeup() will generally return 0 unless the address is bad. + // We are fine with the address being bad (e.g. for Semaphore.post() where Semaphore.wait() frees the Semaphore) + const addr = @ptrCast(*const volatile c_int, &ptr.value); + _ = os.dragonfly.umtx_wakeup(addr, to_wake); } +}; - const Bucket = struct { - mutex: std.c.pthread_mutex_t = .{}, - list: List = .{}, +/// Modified version of linux's futex and Go's sema to implement userspace wait queues with pthread: +/// https://code.woboq.org/linux/linux/kernel/futex.c.html +/// https://go.dev/src/runtime/sema.go +const PosixImpl = struct { + const Event = struct { + cond: std.c.pthread_cond_t, + mutex: std.c.pthread_mutex_t, + state: enum { empty, waiting, notified }, - var buckets = [_]Bucket{.{}} ** 64; - - fn from(address: usize) *Bucket { - return &buckets[address % buckets.len]; - } - }; - - const List = std.TailQueue(struct { - address: ?usize, - state: State = .empty, - cond: std.c.pthread_cond_t = .{}, - mutex: std.c.pthread_mutex_t = .{}, - - const Self = @This(); - const State = enum { - empty, - waiting, - notified, - }; - - fn deinit(self: *Self) void { - _ = std.c.pthread_cond_destroy(&self.cond); - _ = std.c.pthread_mutex_destroy(&self.mutex); + fn init(self: *Event) void { + // Use static init instead of pthread_cond/mutex_init() since this is generally faster. + self.cond = .{}; + self.mutex = .{}; + self.state = .empty; } - fn wait(self: *Self, timeout: ?u64) error{TimedOut}!void { + fn deinit(self: *Event) void { + // Some platforms reportedly give EINVAL for statically initialized pthread types. + const rc = std.c.pthread_cond_destroy(&self.cond); + assert(rc == .SUCCESS or rc == .INVAL); + + const rm = std.c.pthread_mutex_destroy(&self.mutex); + assert(rm == .SUCCESS or rm == .INVAL); + + self.* = undefined; + } + + fn wait(self: *Event, timeout: ?u64) error{Timeout}!void { assert(std.c.pthread_mutex_lock(&self.mutex) == .SUCCESS); defer assert(std.c.pthread_mutex_unlock(&self.mutex) == .SUCCESS); - switch (self.state) { - .empty => self.state = .waiting, - .waiting => unreachable, - .notified => return, + // Early return if the event was already set. + if (self.state == .notified) { + return; } - var ts: std.os.timespec = undefined; - var ts_ptr: ?*const std.os.timespec = null; + // Compute the absolute timeout if one was specified. + // POSIX requires that REALTIME is used by default for the pthread timedwait functions. + // This can be changed with pthread_condattr_setclock, but it's an extension and may not be available everywhere. 
+ var ts: os.timespec = undefined; if (timeout) |timeout_ns| { - ts_ptr = &ts; - std.os.clock_gettime(std.os.CLOCK.REALTIME, &ts) catch unreachable; - ts.tv_sec += @intCast(@TypeOf(ts.tv_sec), timeout_ns / std.time.ns_per_s); + os.clock_gettime(os.CLOCK.REALTIME, &ts) catch unreachable; + ts.tv_sec +|= @intCast(@TypeOf(ts.tv_sec), timeout_ns / std.time.ns_per_s); ts.tv_nsec += @intCast(@TypeOf(ts.tv_nsec), timeout_ns % std.time.ns_per_s); + if (ts.tv_nsec >= std.time.ns_per_s) { - ts.tv_sec += 1; + ts.tv_sec +|= 1; ts.tv_nsec -= std.time.ns_per_s; } } - while (true) { - switch (self.state) { - .empty => unreachable, - .waiting => {}, - .notified => return, - } + // Start waiting on the event - there can be only one thread waiting. + assert(self.state == .empty); + self.state = .waiting; - const ts_ref = ts_ptr orelse { - assert(std.c.pthread_cond_wait(&self.cond, &self.mutex) == .SUCCESS); - continue; + while (true) { + // Block using either pthread_cond_wait or pthread_cond_timewait if there's an absolute timeout. + const rc = blk: { + if (timeout == null) break :blk std.c.pthread_cond_wait(&self.cond, &self.mutex); + break :blk std.c.pthread_cond_timedwait(&self.cond, &self.mutex, &ts); }; - const rc = std.c.pthread_cond_timedwait(&self.cond, &self.mutex, ts_ref); + // After waking up, check if the event was set. + if (self.state == .notified) { + return; + } + + assert(self.state == .waiting); switch (rc) { .SUCCESS => {}, .TIMEDOUT => { + // If timed out, reset the event to avoid the set() thread doing an unnecessary signal(). self.state = .empty; - return error.TimedOut; + return error.Timeout; }, + .INVAL => unreachable, // cond, mutex, and potentially ts should all be valid + .PERM => unreachable, // mutex is locked when cond_*wait() functions are called else => unreachable, } } } - fn notify(self: *Self) void { + fn set(self: *Event) void { assert(std.c.pthread_mutex_lock(&self.mutex) == .SUCCESS); defer assert(std.c.pthread_mutex_unlock(&self.mutex) == .SUCCESS); - switch (self.state) { - .empty => self.state = .notified, - .waiting => { - self.state = .notified; - assert(std.c.pthread_cond_signal(&self.cond) == .SUCCESS); - }, - .notified => unreachable, + // Make sure that multiple calls to set() were not done on the same Event. + const old_state = self.state; + assert(old_state != .notified); + + // Mark the event as set and wake up the waiting thread if there was one. + // This must be done while the mutex as the wait() thread could deallocate + // the condition variable once it observes the new state, potentially causing a UAF if done unlocked. + self.state = .notified; + if (old_state == .waiting) { + assert(std.c.pthread_cond_signal(&self.cond) == .SUCCESS); } } - }); + }; + + const Treap = std.Treap(usize, std.math.order); + const Waiter = struct { + node: Treap.Node, + prev: ?*Waiter, + next: ?*Waiter, + tail: ?*Waiter, + is_queued: bool, + event: Event, + }; + + // An unordered set of Waiters + const WaitList = struct { + top: ?*Waiter = null, + len: usize = 0, + + fn push(self: *WaitList, waiter: *Waiter) void { + waiter.next = self.top; + self.top = waiter; + self.len += 1; + } + + fn pop(self: *WaitList) ?*Waiter { + const waiter = self.top orelse return null; + self.top = waiter.next; + self.len -= 1; + return waiter; + } + }; + + const WaitQueue = struct { + fn insert(treap: *Treap, address: usize, waiter: *Waiter) void { + // prepare the waiter to be inserted. 
+ waiter.next = null; + waiter.is_queued = true; + + // Find the wait queue entry associated with the address. + // If there isn't a wait queue on the address, this waiter creates the queue. + var entry = treap.getEntryFor(address); + const entry_node = entry.node orelse { + waiter.prev = null; + waiter.tail = waiter; + entry.set(&waiter.node); + return; + }; + + // There's a wait queue on the address; get the queue head and tail. + const head = @fieldParentPtr(Waiter, "node", entry_node); + const tail = head.tail orelse unreachable; + + // Push the waiter to the tail by replacing it and linking to the previous tail. + head.tail = waiter; + tail.next = waiter; + waiter.prev = tail; + } + + fn remove(treap: *Treap, address: usize, max_waiters: usize) WaitList { + // Find the wait queue associated with this address and get the head/tail if any. + var entry = treap.getEntryFor(address); + var queue_head = if (entry.node) |node| @fieldParentPtr(Waiter, "node", node) else null; + const queue_tail = if (queue_head) |head| head.tail else null; + + // Once we're done updating the head, fix it's tail pointer and update the treap's queue head as well. + defer entry.set(blk: { + const new_head = queue_head orelse break :blk null; + new_head.tail = queue_tail; + break :blk &new_head.node; + }); + + var removed = WaitList{}; + while (removed.len < max_waiters) { + // dequeue and collect waiters from their wait queue. + const waiter = queue_head orelse break; + queue_head = waiter.next; + removed.push(waiter); + + // When dequeueing, we must mark is_queued as false. + // This ensures that a waiter which calls tryRemove() returns false. + assert(waiter.is_queued); + waiter.is_queued = false; + } + + return removed; + } + + fn tryRemove(treap: *Treap, address: usize, waiter: *Waiter) bool { + if (!waiter.is_queued) { + return false; + } + + queue_remove: { + // Find the wait queue associated with the address. + var entry = blk: { + // A waiter without a previous link means it's the queue head that's in the treap so we can avoid lookup. + if (waiter.prev == null) { + assert(waiter.node.key == address); + break :blk treap.getEntryForExisting(&waiter.node); + } + break :blk treap.getEntryFor(address); + }; + + // The queue head and tail must exist if we're removing a queued waiter. + const head = @fieldParentPtr(Waiter, "node", entry.node orelse unreachable); + const tail = head.tail orelse unreachable; + + // A waiter with a previous link is never the head of the queue. + if (waiter.prev) |prev| { + assert(waiter != head); + prev.next = waiter.next; + + // A waiter with both a previous and next link is in the middle. + // We only need to update the surrounding waiter's links to remove it. + if (waiter.next) |next| { + assert(waiter != tail); + next.prev = waiter.prev; + break :queue_remove; + } + + // A waiter with a previous but no next link means it's the tail of the queue. + // In that case, we need to update the head's tail reference. + assert(waiter == tail); + head.tail = waiter.prev; + break :queue_remove; + } + + // A waiter with no previous link means it's the queue head of queue. + // We must replace (or remove) the head waiter reference in the treap. + assert(waiter == head); + entry.set(blk: { + const new_head = waiter.next orelse break :blk null; + new_head.tail = head.tail; + break :blk &new_head.node; + }); + } + + // Mark the waiter as successfully removed. 
+ waiter.is_queued = false; + return true; + } + }; + + const Bucket = struct { + mutex: std.c.pthread_mutex_t align(std.atomic.cache_line) = .{}, + pending: Atomic(usize) = Atomic(usize).init(0), + treap: Treap = .{}, + + // Global array of buckets that addresses map to. + // Bucket array size is pretty much arbitrary here, but it must be a power of two for fibonacci hashing. + var buckets = [_]Bucket{.{}} ** @bitSizeOf(usize); + + // https://github.com/Amanieu/parking_lot/blob/1cf12744d097233316afa6c8b7d37389e4211756/core/src/parking_lot.rs#L343-L353 + fn from(address: usize) *Bucket { + // The upper `@bitSizeOf(usize)` bits of the fibonacci golden ratio. + // Hashing this via (h * k) >> (64 - b) where k=golden-ration and b=bitsize-of-array + // evenly lays out h=hash values over the bit range even when the hash has poor entropy (identity-hash for pointers). + const max_multiplier_bits = @bitSizeOf(usize); + const fibonacci_multiplier = 0x9E3779B97F4A7C15 >> (64 - max_multiplier_bits); + + const max_bucket_bits = @ctz(usize, buckets.len); + comptime assert(std.math.isPowerOfTwo(buckets.len)); + + const index = (address *% fibonacci_multiplier) >> (max_multiplier_bits - max_bucket_bits); + return &buckets[index]; + } + }; + + const Address = struct { + fn from(ptr: *const Atomic(u32)) usize { + // Get the alignment of the pointer. + const alignment = @alignOf(Atomic(u32)); + comptime assert(std.math.isPowerOfTwo(alignment)); + + // Make sure the pointer is aligned, + // then cut off the zero bits from the alignment to get the unique address. + const addr = @ptrToInt(ptr); + assert(addr & (alignment - 1) == 0); + return addr >> @ctz(usize, alignment); + } + }; + + fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{Timeout}!void { + const address = Address.from(ptr); + const bucket = Bucket.from(address); + + // Announce that there's a waiter in the bucket before checking the ptr/expect condition. + // If the announcement is reordered after the ptr check, the waiter could deadlock: + // + // - T1: checks ptr == expect which is true + // - T2: updates ptr to != expect + // - T2: does Futex.wake(), sees no pending waiters, exits + // - T1: bumps pending waiters (was reordered after the ptr == expect check) + // - T1: goes to sleep and misses both the ptr change and T2's wake up + // + // SeqCst as Acquire barrier to ensure the announcement happens before the ptr check below. + // SeqCst as shared modification order to form a happens-before edge with the fence(.SeqCst)+load() in wake(). + var pending = bucket.pending.fetchAdd(1, .SeqCst); + assert(pending < std.math.maxInt(usize)); + + // If the wait gets cancelled, remove the pending count we previously added. + // This is done outside the mutex lock to keep the critical section short in case of contention. + var cancelled = false; + defer if (cancelled) { + pending = bucket.pending.fetchSub(1, .Monotonic); + assert(pending > 0); + }; + + var waiter: Waiter = undefined; + { + assert(std.c.pthread_mutex_lock(&bucket.mutex) == .SUCCESS); + defer assert(std.c.pthread_mutex_unlock(&bucket.mutex) == .SUCCESS); + + cancelled = ptr.load(.Monotonic) != expect; + if (cancelled) { + return; + } + + waiter.event.init(); + WaitQueue.insert(&bucket.treap, address, &waiter); + } + + defer { + assert(!waiter.is_queued); + waiter.event.deinit(); + } + + waiter.event.wait(timeout) catch { + // If we fail to cancel after a timeout, it means a wake() thread dequeued us and will wake us up. 
+ // We must wait until the event is set as that's a signal that the wake() thread wont access the waiter memory anymore. + // If we return early without waiting, the waiter on the stack would be invalidated and the wake() thread risks a UAF. + defer if (!cancelled) waiter.event.wait(null) catch unreachable; + + assert(std.c.pthread_mutex_lock(&bucket.mutex) == .SUCCESS); + defer assert(std.c.pthread_mutex_unlock(&bucket.mutex) == .SUCCESS); + + cancelled = WaitQueue.tryRemove(&bucket.treap, address, &waiter); + if (cancelled) { + return error.Timeout; + } + }; + } + + fn wake(ptr: *const Atomic(u32), max_waiters: u32) void { + const address = Address.from(ptr); + const bucket = Bucket.from(address); + + // Quick check if there's even anything to wake up. + // The change to the ptr's value must happen before we check for pending waiters. + // If not, the wake() thread could miss a sleeping waiter and have it deadlock: + // + // - T2: p = has pending waiters (reordered before the ptr update) + // - T1: bump pending waiters + // - T1: if ptr == expected: sleep() + // - T2: update ptr != expected + // - T2: p is false from earlier so doesn't wake (T1 missed ptr update and T2 missed T1 sleeping) + // + // What we really want here is a Release load, but that doesn't exist under the C11 memory model. + // We could instead do `bucket.pending.fetchAdd(0, Release) == 0` which achieves effectively the same thing, + // but the RMW operation unconditionally stores which invalidates the cache-line for others causing unnecessary contention. + // + // Instead we opt to do a full-fence + load instead which avoids taking ownership of the cache-line. + // fence(SeqCst) effectively converts the ptr update to SeqCst and the pending load to SeqCst: creating a Store-Load barrier. + // + // The pending count increment in wait() must also now use SeqCst for the update + this pending load + // to be in the same modification order as our load isn't using Release/Acquire to guarantee it. + std.atomic.fence(.SeqCst); + if (bucket.pending.load(.Monotonic) == 0) { + return; + } + + // Keep a list of all the waiters notified and wake then up outside the mutex critical section. + var notified = WaitList{}; + defer if (notified.len > 0) { + const pending = bucket.pending.fetchSub(notified.len, .Monotonic); + assert(pending >= notified.len); + + while (notified.pop()) |waiter| { + assert(!waiter.is_queued); + waiter.event.set(); + } + }; + + assert(std.c.pthread_mutex_lock(&bucket.mutex) == .SUCCESS); + defer assert(std.c.pthread_mutex_unlock(&bucket.mutex) == .SUCCESS); + + // Another pending check again to avoid the WaitQueue lookup if not necessary. + if (bucket.pending.load(.Monotonic) > 0) { + notified = WaitQueue.remove(&bucket.treap, address, max_waiters); + } + } }; -test "Futex - wait/wake" { +test "Futex - smoke test" { var value = Atomic(u32).init(0); - Futex.wait(&value, 1, null) catch unreachable; - const wait_noop_result = Futex.wait(&value, 0, 0); - try testing.expectError(error.TimedOut, wait_noop_result); + // Try waits with invalid values. + Futex.wait(&value, 0xdeadbeef); + Futex.timedWait(&value, 0xdeadbeef, 0) catch {}; - const wait_longer_result = Futex.wait(&value, 0, std.time.ns_per_ms); - try testing.expectError(error.TimedOut, wait_longer_result); + // Try timeout waits. 
+ try testing.expectError(error.Timeout, Futex.timedWait(&value, 0, 0)); + try testing.expectError(error.Timeout, Futex.timedWait(&value, 0, std.time.ns_per_ms)); + // Try wakes Futex.wake(&value, 0); Futex.wake(&value, 1); Futex.wake(&value, std.math.maxInt(u32)); } -test "Futex - Signal" { - if (single_threaded) { +test "Futex - signaling" { + // This test requires spawning threads + if (builtin.single_threaded) { return error.SkipZigTest; } + const num_threads = 4; + const num_iterations = 4; + const Paddle = struct { value: Atomic(u32) = Atomic(u32).init(0), current: u32 = 0, - fn run(self: *@This(), hit_to: *@This()) !void { - var iterations: usize = 4; - while (iterations > 0) : (iterations -= 1) { - var value: u32 = undefined; - while (true) { - value = self.value.load(.Acquire); - if (value != self.current) break; - Futex.wait(&self.value, self.current, null) catch unreachable; - } - - try testing.expectEqual(value, self.current + 1); - self.current = value; - - _ = hit_to.value.fetchAdd(1, .Release); - Futex.wake(&hit_to.value, 1); - } - } - }; - - var ping = Paddle{}; - var pong = Paddle{}; - - const t1 = try std.Thread.spawn(.{}, Paddle.run, .{ &ping, &pong }); - defer t1.join(); - - const t2 = try std.Thread.spawn(.{}, Paddle.run, .{ &pong, &ping }); - defer t2.join(); - - _ = ping.value.fetchAdd(1, .Release); - Futex.wake(&ping.value, 1); -} - -test "Futex - Broadcast" { - if (single_threaded) { - return error.SkipZigTest; - } - - const Context = struct { - threads: [4]std.Thread = undefined, - broadcast: Atomic(u32) = Atomic(u32).init(0), - notified: Atomic(usize) = Atomic(usize).init(0), - - const BROADCAST_EMPTY = 0; - const BROADCAST_SENT = 1; - const BROADCAST_RECEIVED = 2; - - fn runSender(self: *@This()) !void { - self.broadcast.store(BROADCAST_SENT, .Monotonic); - Futex.wake(&self.broadcast, @intCast(u32, self.threads.len)); - - while (true) { - const broadcast = self.broadcast.load(.Acquire); - if (broadcast == BROADCAST_RECEIVED) break; - try testing.expectEqual(broadcast, BROADCAST_SENT); - Futex.wait(&self.broadcast, broadcast, null) catch unreachable; - } - } - - fn runReceiver(self: *@This()) void { - while (true) { - const broadcast = self.broadcast.load(.Acquire); - if (broadcast == BROADCAST_SENT) break; - assert(broadcast == BROADCAST_EMPTY); - Futex.wait(&self.broadcast, broadcast, null) catch unreachable; - } - - const notified = self.notified.fetchAdd(1, .Monotonic); - if (notified + 1 == self.threads.len) { - self.broadcast.store(BROADCAST_RECEIVED, .Release); - Futex.wake(&self.broadcast, 1); - } - } - }; - - var ctx = Context{}; - for (ctx.threads) |*thread| - thread.* = try std.Thread.spawn(.{}, Context.runReceiver, .{&ctx}); - defer for (ctx.threads) |thread| - thread.join(); - - // Try to wait for the threads to start before running runSender(). - // NOTE: not actually needed for correctness. 
- std.time.sleep(16 * std.time.ns_per_ms); - try ctx.runSender(); - - const notified = ctx.notified.load(.Monotonic); - try testing.expectEqual(notified, ctx.threads.len); -} - -test "Futex - Chain" { - if (single_threaded) { - return error.SkipZigTest; - } - - const Signal = struct { - value: Atomic(u32) = Atomic(u32).init(0), - - fn wait(self: *@This()) void { - while (true) { - const value = self.value.load(.Acquire); - if (value == 1) break; - assert(value == 0); - Futex.wait(&self.value, 0, null) catch unreachable; - } - } - - fn notify(self: *@This()) void { - assert(self.value.load(.Unordered) == 0); - self.value.store(1, .Release); + fn hit(self: *@This()) void { + _ = self.value.fetchAdd(1, .Release); Futex.wake(&self.value, 1); } - }; - const Context = struct { - completed: Signal = .{}, - threads: [4]struct { - thread: std.Thread, - signal: Signal, - } = undefined, + fn run(self: *@This(), hit_to: *@This()) !void { + while (self.current < num_iterations) { + // Wait for the value to change from hit() + var new_value: u32 = undefined; + while (true) { + new_value = self.value.load(.Acquire); + if (new_value != self.current) break; + Futex.wait(&self.value, self.current); + } - fn run(self: *@This(), index: usize) void { - const this_signal = &self.threads[index].signal; + // change the internal "current" value + try testing.expectEqual(new_value, self.current + 1); + self.current = new_value; - var next_signal = &self.completed; - if (index + 1 < self.threads.len) { - next_signal = &self.threads[index + 1].signal; + // hit the next paddle + hit_to.hit(); } - - this_signal.wait(); - next_signal.notify(); } }; - var ctx = Context{}; - for (ctx.threads) |*entry, index| { - entry.signal = .{}; - entry.thread = try std.Thread.spawn(.{}, Context.run, .{ &ctx, index }); + var paddles = [_]Paddle{.{}} ** num_threads; + var threads = [_]std.Thread{undefined} ** num_threads; + + // Create a circle of paddles which hit each other + for (threads) |*t, i| { + const paddle = &paddles[i]; + const hit_to = &paddles[(i + 1) % paddles.len]; + t.* = try std.Thread.spawn(.{}, Paddle.run, .{ paddle, hit_to }); } - ctx.threads[0].signal.notify(); - ctx.completed.wait(); - - for (ctx.threads) |entry| { - entry.thread.join(); - } + // Hit the first paddle and wait for them all to complete by hitting each other for num_iterations. + paddles[0].hit(); + for (threads) |t| t.join(); + for (paddles) |p| try testing.expectEqual(p.current, num_iterations); +} + +test "Futex - broadcasting" { + // This test requires spawning threads + if (builtin.single_threaded) { + return error.SkipZigTest; + } + + const num_threads = 4; + const num_iterations = 4; + + const Barrier = struct { + count: Atomic(u32) = Atomic(u32).init(num_threads), + futex: Atomic(u32) = Atomic(u32).init(0), + + fn wait(self: *@This()) !void { + // Decrement the counter. + // Release ensures stuff before this barrier.wait() happens before the last one. + const count = self.count.fetchSub(1, .Release); + try testing.expect(count <= num_threads); + try testing.expect(count > 0); + + // First counter to reach zero wakes all other threads. + // Acquire for the last counter ensures stuff before previous barrier.wait()s happened before it. + // Release on futex update ensures stuff before all barrier.wait()'s happens before they all return. 
+ if (count - 1 == 0) { + _ = self.count.load(.Acquire); // TODO: could be fence(Acquire) if not for TSAN + self.futex.store(1, .Release); + Futex.wake(&self.futex, num_threads - 1); + return; + } + + // Other threads wait until last counter wakes them up. + // Acquire on futex synchronizes with last barrier count to ensure stuff before all barrier.wait()'s happen before us. + while (self.futex.load(.Acquire) == 0) { + Futex.wait(&self.futex, 0); + } + } + }; + + const Broadcast = struct { + barriers: [num_iterations]Barrier = [_]Barrier{.{}} ** num_iterations, + threads: [num_threads]std.Thread = undefined, + + fn run(self: *@This()) !void { + for (self.barriers) |*barrier| { + try barrier.wait(); + } + } + }; + + var broadcast = Broadcast{}; + for (broadcast.threads) |*t| t.* = try std.Thread.spawn(.{}, Broadcast.run, .{&broadcast}); + for (broadcast.threads) |t| t.join(); } diff --git a/lib/std/atomic.zig b/lib/std/atomic.zig index b1b5789b02..ef1cce1774 100644 --- a/lib/std/atomic.zig +++ b/lib/std/atomic.zig @@ -1,5 +1,5 @@ const std = @import("std.zig"); -const target = @import("builtin").target; +const builtin = @import("builtin"); pub const Ordering = std.builtin.AtomicOrder; @@ -40,7 +40,7 @@ test "fence/compilerFence" { /// Signals to the processor that the caller is inside a busy-wait spin-loop. pub inline fn spinLoopHint() void { - switch (target.cpu.arch) { + switch (builtin.target.cpu.arch) { // No-op instruction that can hint to save (or share with a hardware-thread) // pipelining/power resources // https://software.intel.com/content/www/us/en/develop/articles/benefitting-power-and-performance-sleep-loops.html @@ -59,7 +59,7 @@ pub inline fn spinLoopHint() void { // `yield` was introduced in v6k but is also available on v6m. // https://www.keil.com/support/man/docs/armasm/armasm_dom1361289926796.htm .arm, .armeb, .thumb, .thumbeb => { - const can_yield = comptime std.Target.arm.featureSetHasAny(target.cpu.features, .{ + const can_yield = comptime std.Target.arm.featureSetHasAny(builtin.target.cpu.features, .{ .has_v6k, .has_v6m, }); if (can_yield) { @@ -80,3 +80,41 @@ test "spinLoopHint" { spinLoopHint(); } } + +/// The estimated size of the CPU's cache line when atomically updating memory. +/// Add this much padding or align to this boundary to avoid atomically-updated +/// memory from forcing cache invalidations on near, but non-atomic, memory. +/// +// https://en.wikipedia.org/wiki/False_sharing +// https://github.com/golang/go/search?q=CacheLinePadSize +pub const cache_line = switch (builtin.cpu.arch) { + // x86_64: Starting from Intel's Sandy Bridge, the spatial prefetcher pulls in pairs of 64-byte cache lines at a time. 
+ // - https://www.intel.com/content/dam/www/public/us/en/documents/manuals/64-ia-32-architectures-optimization-manual.pdf + // - https://github.com/facebook/folly/blob/1b5288e6eea6df074758f877c849b6e73bbb9fbb/folly/lang/Align.h#L107 + // + // aarch64: Some big.LITTLE ARM archs have "big" cores with 128-byte cache lines: + // - https://www.mono-project.com/news/2016/09/12/arm64-icache/ + // - https://cpufun.substack.com/p/more-m1-fun-hardware-information + // + // powerpc64: PPC has 128-byte cache lines + // - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_ppc64x.go#L9 + .x86_64, .aarch64, .powerpc64 => 128, + + // These platforms reportedly have 32-byte cache lines + // - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_arm.go#L7 + // - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mips.go#L7 + // - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mipsle.go#L7 + // - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mips64x.go#L9 + // - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_riscv64.go#L7 + .arm, .mips, .mips64, .riscv64 => 32, + + // This platform reportedly has 256-byte cache lines + // - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_s390x.go#L7 + .s390x => 256, + + // Other x86 and WASM platforms have 64-byte cache lines. + // The rest of the architectures are assumed to be similar. + // - https://github.com/golang/go/blob/dda2991c2ea0c5914714469c4defc2562a907230/src/internal/cpu/cpu_x86.go#L9 + // - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_wasm.go#L7 + else => 64, +}; diff --git a/lib/std/c/dragonfly.zig b/lib/std/c/dragonfly.zig index 186525a9ce..2e388d4c80 100644 --- a/lib/std/c/dragonfly.zig +++ b/lib/std/c/dragonfly.zig @@ -36,6 +36,9 @@ pub const sem_t = ?*opaque {}; pub extern "c" fn pthread_setname_np(thread: std.c.pthread_t, name: [*:0]const u8) E; pub extern "c" fn pthread_getname_np(thread: std.c.pthread_t, name: [*:0]u8, len: usize) E; +pub extern "c" fn umtx_sleep(ptr: *const volatile c_int, value: c_int, timeout: c_int) c_int; +pub extern "c" fn umtx_wakeup(ptr: *const volatile c_int, count: c_int) c_int; + // See: // - https://gitweb.dragonflybsd.org/dragonfly.git/blob/HEAD:/include/unistd.h // - https://gitweb.dragonflybsd.org/dragonfly.git/blob/HEAD:/sys/sys/types.h diff --git a/lib/std/c/freebsd.zig b/lib/std/c/freebsd.zig index 8972b6d6dc..e40d7acd8d 100644 --- a/lib/std/c/freebsd.zig +++ b/lib/std/c/freebsd.zig @@ -62,6 +62,46 @@ pub const sem_t = extern struct { _padding: u32, }; +// https://github.com/freebsd/freebsd-src/blob/main/sys/sys/umtx.h +pub const UMTX_OP = enum(c_int) { + LOCK = 0, + UNLOCK = 1, + WAIT = 2, + WAKE = 3, + MUTEX_TRYLOCK = 4, + MUTEX_LOCK = 5, + MUTEX_UNLOCK = 6, + SET_CEILING = 7, + CV_WAIT = 8, + CV_SIGNAL = 9, + CV_BROADCAST = 10, + WAIT_UINT = 11, + RW_RDLOCK = 12, + RW_WRLOCK = 13, + RW_UNLOCK = 14, + WAIT_UINT_PRIVATE = 15, + WAKE_PRIVATE = 16, + MUTEX_WAIT = 17, + MUTEX_WAKE = 18, // deprecated + SEM_WAIT = 19, // deprecated + SEM_WAKE = 20, // deprecated + NWAKE_PRIVATE = 31, + MUTEX_WAKE2 = 22, + SEM2_WAIT = 23, + SEM2_WAKE = 24, + SHM = 25, + ROBUST_LISTS = 26, +}; + +pub const UMTX_ABSTIME = 0x01; +pub const _umtx_time = extern struct { 
+ _timeout: timespec, + _flags: u32, + _clockid: u32, +}; + +pub extern "c" fn _umtx_op(obj: usize, op: c_int, val: c_ulong, uaddr: usize, uaddr2: usize) c_int; + pub const EAI = enum(c_int) { /// address family for hostname not supported ADDRFAMILY = 1, diff --git a/lib/std/c/openbsd.zig b/lib/std/c/openbsd.zig index 56807c7d68..0aa90c741a 100644 --- a/lib/std/c/openbsd.zig +++ b/lib/std/c/openbsd.zig @@ -45,6 +45,13 @@ pub extern "c" fn unveil(path: ?[*:0]const u8, permissions: ?[*:0]const u8) c_in pub extern "c" fn pthread_set_name_np(thread: std.c.pthread_t, name: [*:0]const u8) void; pub extern "c" fn pthread_get_name_np(thread: std.c.pthread_t, name: [*:0]u8, len: usize) void; +// https://github.com/openbsd/src/blob/2207c4325726fdc5c4bcd0011af0fdf7d3dab137/sys/sys/futex.h +pub const FUTEX_WAIT = 1; +pub const FUTEX_WAKE = 2; +pub const FUTEX_REQUEUE = 3; +pub const FUTEX_PRIVATE_FLAG = 128; +pub extern "c" fn futex(uaddr: ?*const volatile u32, op: c_int, val: c_int, timeout: ?*const timespec, uaddr2: ?*const volatile u32) c_int; + pub const login_cap_t = extern struct { lc_class: ?[*:0]const u8, lc_cap: ?[*:0]const u8,
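
A minimal usage sketch of the reworked API described in this patch: a hypothetical OneShotEvent built on `Futex.wait()` and `Futex.wake()`, following the same pattern as the Signal helper removed from the old Chain test. It assumes only the public interface added above (`wait`, `timedWait`, `wake` on a `*const Atomic(u32)`); spurious wakeups are handled by re-checking the futex word in a loop, and `Futex.timedWait()` could be substituted where a bounded wait is needed. The names `OneShotEvent`, `wait`, and `set` are illustrative, not part of the patch.

const std = @import("std");
const Atomic = std.atomic.Atomic;
const Futex = std.Thread.Futex;

const OneShotEvent = struct {
    // 0 = unset, 1 = set. This is the futex word that waiters block on.
    state: Atomic(u32) = Atomic(u32).init(0),

    fn wait(self: *OneShotEvent) void {
        // Futex.wait() may return spuriously or due to an unrelated wake(),
        // so always re-check the state before going back to sleep.
        while (self.state.load(.Acquire) == 0) {
            Futex.wait(&self.state, 0);
        }
    }

    fn set(self: *OneShotEvent) void {
        self.state.store(1, .Release);
        // Wake every waiter currently blocked on the state word.
        Futex.wake(&self.state, std.math.maxInt(u32));
    }
};

test "OneShotEvent basic usage" {
    var event = OneShotEvent{};
    event.set();
    event.wait(); // returns immediately since the event is already set
}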