Merge pull request #18085 from ziglang/std-atomics

rework std.atomic
Andrew Kelley, 2023-11-23 04:55:28 -05:00 (committed by GitHub)
commit 2bffd81015
17 changed files with 441 additions and 1330 deletions
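In short, std.atomic.Atomic(T) becomes std.atomic.Value(T), its backing field is renamed from value to raw, and compareAndSwap/tryCompareAndSwap become cmpxchgStrong/cmpxchgWeak. A minimal illustrative sketch of the new surface (assuming Zig as of this commit; the test name is not from the diff):

const std = @import("std");

test "std.atomic.Value basics" {
    // Before this commit: std.atomic.Atomic(u32), field `.value`,
    // compareAndSwap/tryCompareAndSwap.
    // After this commit:  std.atomic.Value(u32), field `.raw`,
    // cmpxchgStrong/cmpxchgWeak.
    var state = std.atomic.Value(u32).init(0);
    try std.testing.expectEqual(@as(?u32, null), state.cmpxchgStrong(0, 1, .SeqCst, .Monotonic));
    // cmpxchgWeak may fail spuriously, so callers retry in a loop.
    while (state.cmpxchgWeak(1, 0, .SeqCst, .Monotonic)) |_| {}
    try std.testing.expectEqual(@as(u32, 0), state.load(.SeqCst));
    // `.raw` replaces `.value` for deliberately non-atomic access.
    try std.testing.expectEqual(@as(u32, 0), state.raw);
}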


@ -209,9 +209,6 @@ set(ZIG_STAGE2_SOURCES
"${CMAKE_SOURCE_DIR}/lib/std/array_list.zig"
"${CMAKE_SOURCE_DIR}/lib/std/ascii.zig"
"${CMAKE_SOURCE_DIR}/lib/std/atomic.zig"
"${CMAKE_SOURCE_DIR}/lib/std/atomic/Atomic.zig"
"${CMAKE_SOURCE_DIR}/lib/std/atomic/queue.zig"
"${CMAKE_SOURCE_DIR}/lib/std/atomic/stack.zig"
"${CMAKE_SOURCE_DIR}/lib/std/base64.zig"
"${CMAKE_SOURCE_DIR}/lib/std/BitStack.zig"
"${CMAKE_SOURCE_DIR}/lib/std/buf_map.zig"


@ -8,7 +8,6 @@ const math = std.math;
const os = std.os;
const assert = std.debug.assert;
const target = builtin.target;
const Atomic = std.atomic.Atomic;
pub const Futex = @import("Thread/Futex.zig");
pub const ResetEvent = @import("Thread/ResetEvent.zig");
@ -388,7 +387,7 @@ pub fn yield() YieldError!void {
}
/// State to synchronize detachment of spawner thread to spawned thread
const Completion = Atomic(enum(u8) {
const Completion = std.atomic.Value(enum(u8) {
running,
detached,
completed,
@ -746,7 +745,7 @@ const WasiThreadImpl = struct {
const WasiThread = struct {
/// Thread ID
tid: Atomic(i32) = Atomic(i32).init(0),
tid: std.atomic.Value(i32) = std.atomic.Value(i32).init(0),
/// Contains all memory which was allocated to bootstrap this thread, including:
/// - Guard page
/// - Stack
@ -784,7 +783,7 @@ const WasiThreadImpl = struct {
original_stack_pointer: [*]u8,
};
const State = Atomic(enum(u8) { running, completed, detached });
const State = std.atomic.Value(enum(u8) { running, completed, detached });
fn getCurrentId() Id {
return tls_thread_id;
@ -1048,7 +1047,7 @@ const LinuxThreadImpl = struct {
const ThreadCompletion = struct {
completion: Completion = Completion.init(.running),
child_tid: Atomic(i32) = Atomic(i32).init(1),
child_tid: std.atomic.Value(i32) = std.atomic.Value(i32).init(1),
parent_tid: i32 = undefined,
mapped: []align(std.mem.page_size) u8,
@ -1304,7 +1303,7 @@ const LinuxThreadImpl = struct {
@intFromPtr(instance),
&instance.thread.parent_tid,
tls_ptr,
&instance.thread.child_tid.value,
&instance.thread.child_tid.raw,
))) {
.SUCCESS => return Impl{ .thread = &instance.thread },
.AGAIN => return error.ThreadQuotaExceeded,
@ -1346,7 +1345,7 @@ const LinuxThreadImpl = struct {
}
switch (linux.getErrno(linux.futex_wait(
&self.thread.child_tid.value,
&self.thread.child_tid.raw,
linux.FUTEX.WAIT,
tid,
null,
@ -1387,7 +1386,7 @@ test "setName, getName" {
test_done_event: ResetEvent = .{},
thread_done_event: ResetEvent = .{},
done: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(false),
done: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),
thread: Thread = undefined,
pub fn run(ctx: *@This()) !void {


@ -50,7 +50,6 @@ const Mutex = std.Thread.Mutex;
const os = std.os;
const assert = std.debug.assert;
const testing = std.testing;
const Atomic = std.atomic.Atomic;
const Futex = std.Thread.Futex;
impl: Impl = .{},
@ -193,8 +192,8 @@ const WindowsImpl = struct {
};
const FutexImpl = struct {
state: Atomic(u32) = Atomic(u32).init(0),
epoch: Atomic(u32) = Atomic(u32).init(0),
state: std.atomic.Value(u32) = std.atomic.Value(u32).init(0),
epoch: std.atomic.Value(u32) = std.atomic.Value(u32).init(0),
const one_waiter = 1;
const waiter_mask = 0xffff;
@ -232,12 +231,12 @@ const FutexImpl = struct {
// Acquire barrier ensures code before the wake() which added the signal happens before we decrement it and return.
while (state & signal_mask != 0) {
const new_state = state - one_waiter - one_signal;
state = self.state.tryCompareAndSwap(state, new_state, .Acquire, .Monotonic) orelse return;
state = self.state.cmpxchgWeak(state, new_state, .Acquire, .Monotonic) orelse return;
}
// Remove the waiter we added and officially return timed out.
const new_state = state - one_waiter;
state = self.state.tryCompareAndSwap(state, new_state, .Monotonic, .Monotonic) orelse return err;
state = self.state.cmpxchgWeak(state, new_state, .Monotonic, .Monotonic) orelse return err;
}
},
};
@ -249,7 +248,7 @@ const FutexImpl = struct {
// Acquire barrier ensures code before the wake() which added the signal happens before we decrement it and return.
while (state & signal_mask != 0) {
const new_state = state - one_waiter - one_signal;
state = self.state.tryCompareAndSwap(state, new_state, .Acquire, .Monotonic) orelse return;
state = self.state.cmpxchgWeak(state, new_state, .Acquire, .Monotonic) orelse return;
}
}
}
@ -276,7 +275,7 @@ const FutexImpl = struct {
// Reserve the amount of waiters to wake by incrementing the signals count.
// Release barrier ensures code before the wake() happens before the signal it posted and consumed by the wait() threads.
const new_state = state + (one_signal * to_wake);
state = self.state.tryCompareAndSwap(state, new_state, .Release, .Monotonic) orelse {
state = self.state.cmpxchgWeak(state, new_state, .Release, .Monotonic) orelse {
// Wake up the waiting threads we reserved above by changing the epoch value.
// NOTE: a waiting thread could miss a wake up if *exactly* ((1<<32)-1) wake()s happen between it observing the epoch and sleeping on it.
// This is very unlikely given the sheer number of precisely-timed Futex.wake() calls that would have to occur while the waiting thread is preempted.


@ -10,7 +10,7 @@ const Futex = @This();
const os = std.os;
const assert = std.debug.assert;
const testing = std.testing;
const Atomic = std.atomic.Atomic;
const atomic = std.atomic;
/// Checks if `ptr` still contains the value `expect` and, if so, blocks the caller until either:
/// - The value at `ptr` is no longer equal to `expect`.
@ -19,7 +19,7 @@ const Atomic = std.atomic.Atomic;
///
/// The checking of `ptr` and `expect`, along with blocking the caller, is done atomically
/// and totally ordered (sequentially consistent) with respect to other wait()/wake() calls on the same `ptr`.
pub fn wait(ptr: *const Atomic(u32), expect: u32) void {
pub fn wait(ptr: *const atomic.Value(u32), expect: u32) void {
@setCold(true);
Impl.wait(ptr, expect, null) catch |err| switch (err) {
@ -35,7 +35,7 @@ pub fn wait(ptr: *const Atomic(u32), expect: u32) void {
///
/// The checking of `ptr` and `expect`, along with blocking the caller, is done atomically
/// and totally ordered (sequentially consistent) with respect to other wait()/wake() calls on the same `ptr`.
pub fn timedWait(ptr: *const Atomic(u32), expect: u32, timeout_ns: u64) error{Timeout}!void {
pub fn timedWait(ptr: *const atomic.Value(u32), expect: u32, timeout_ns: u64) error{Timeout}!void {
@setCold(true);
// Avoid calling into the OS for no-op timeouts.
@ -48,7 +48,7 @@ pub fn timedWait(ptr: *const Atomic(u32), expect: u32, timeout_ns: u64) error{Ti
}
/// Unblocks at most `max_waiters` callers blocked in a `wait()` call on `ptr`.
pub fn wake(ptr: *const Atomic(u32), max_waiters: u32) void {
pub fn wake(ptr: *const atomic.Value(u32), max_waiters: u32) void {
@setCold(true);
// Avoid calling into the OS if there's nothing to wake up.
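The only API change to Futex here is the pointer type: wait(), timedWait(), and wake() now take *const std.atomic.Value(u32) instead of *const Atomic(u32). A hedged usage sketch (illustrative test and helper names, assuming Zig as of this commit):

const std = @import("std");
const Futex = std.Thread.Futex;

// Illustrative helper: set the flag, then wake at most one waiter.
fn publish(flag: *std.atomic.Value(u32)) void {
    flag.store(1, .Release);
    Futex.wake(flag, 1);
}

test "Futex wait/wake sketch" {
    var flag = std.atomic.Value(u32).init(0);
    const t = try std.Thread.spawn(.{}, publish, .{&flag});
    // Re-check the value after every wake-up; wakes may be spurious.
    while (flag.load(.Acquire) == 0) Futex.wait(&flag, 0);
    t.join();
}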
@ -83,11 +83,11 @@ else
/// We can't do @compileError() in the `Impl` switch statement above as its eagerly evaluated.
/// So instead, we @compileError() on the methods themselves for platforms which don't support futex.
const UnsupportedImpl = struct {
fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{Timeout}!void {
fn wait(ptr: *const atomic.Value(u32), expect: u32, timeout: ?u64) error{Timeout}!void {
return unsupported(.{ ptr, expect, timeout });
}
fn wake(ptr: *const Atomic(u32), max_waiters: u32) void {
fn wake(ptr: *const atomic.Value(u32), max_waiters: u32) void {
return unsupported(.{ ptr, max_waiters });
}
@ -98,8 +98,8 @@ const UnsupportedImpl = struct {
};
const SingleThreadedImpl = struct {
fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{Timeout}!void {
if (ptr.loadUnchecked() != expect) {
fn wait(ptr: *const atomic.Value(u32), expect: u32, timeout: ?u64) error{Timeout}!void {
if (ptr.raw != expect) {
return;
}
@ -113,7 +113,7 @@ const SingleThreadedImpl = struct {
return error.Timeout;
}
fn wake(ptr: *const Atomic(u32), max_waiters: u32) void {
fn wake(ptr: *const atomic.Value(u32), max_waiters: u32) void {
// There are no other threads to possibly wake up
_ = ptr;
_ = max_waiters;
@ -123,7 +123,7 @@ const SingleThreadedImpl = struct {
// We use WaitOnAddress through NtDll instead of API-MS-Win-Core-Synch-l1-2-0.dll
// as it's generally already a linked target and is autoloaded into all processes anyway.
const WindowsImpl = struct {
fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{Timeout}!void {
fn wait(ptr: *const atomic.Value(u32), expect: u32, timeout: ?u64) error{Timeout}!void {
var timeout_value: os.windows.LARGE_INTEGER = undefined;
var timeout_ptr: ?*const os.windows.LARGE_INTEGER = null;
@ -152,7 +152,7 @@ const WindowsImpl = struct {
}
}
fn wake(ptr: *const Atomic(u32), max_waiters: u32) void {
fn wake(ptr: *const atomic.Value(u32), max_waiters: u32) void {
const address: ?*const anyopaque = ptr;
assert(max_waiters != 0);
@ -164,7 +164,7 @@ const WindowsImpl = struct {
};
const DarwinImpl = struct {
fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{Timeout}!void {
fn wait(ptr: *const atomic.Value(u32), expect: u32, timeout: ?u64) error{Timeout}!void {
// Darwin XNU 7195.50.7.100.1 introduced __ulock_wait2 and migrated code paths (notably pthread_cond_t) towards it:
// https://github.com/apple/darwin-xnu/commit/d4061fb0260b3ed486147341b72468f836ed6c8f#diff-08f993cc40af475663274687b7c326cc6c3031e0db3ac8de7b24624610616be6
//
@ -220,7 +220,7 @@ const DarwinImpl = struct {
}
}
fn wake(ptr: *const Atomic(u32), max_waiters: u32) void {
fn wake(ptr: *const atomic.Value(u32), max_waiters: u32) void {
var flags: u32 = os.darwin.UL_COMPARE_AND_WAIT | os.darwin.ULF_NO_ERRNO;
if (max_waiters > 1) {
flags |= os.darwin.ULF_WAKE_ALL;
@ -244,7 +244,7 @@ const DarwinImpl = struct {
// https://man7.org/linux/man-pages/man2/futex.2.html
const LinuxImpl = struct {
fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{Timeout}!void {
fn wait(ptr: *const atomic.Value(u32), expect: u32, timeout: ?u64) error{Timeout}!void {
var ts: os.timespec = undefined;
if (timeout) |timeout_ns| {
ts.tv_sec = @as(@TypeOf(ts.tv_sec), @intCast(timeout_ns / std.time.ns_per_s));
@ -252,7 +252,7 @@ const LinuxImpl = struct {
}
const rc = os.linux.futex_wait(
@as(*const i32, @ptrCast(&ptr.value)),
@as(*const i32, @ptrCast(&ptr.raw)),
os.linux.FUTEX.PRIVATE_FLAG | os.linux.FUTEX.WAIT,
@as(i32, @bitCast(expect)),
if (timeout != null) &ts else null,
@ -272,9 +272,9 @@ const LinuxImpl = struct {
}
}
fn wake(ptr: *const Atomic(u32), max_waiters: u32) void {
fn wake(ptr: *const atomic.Value(u32), max_waiters: u32) void {
const rc = os.linux.futex_wake(
@as(*const i32, @ptrCast(&ptr.value)),
@as(*const i32, @ptrCast(&ptr.raw)),
os.linux.FUTEX.PRIVATE_FLAG | os.linux.FUTEX.WAKE,
std.math.cast(i32, max_waiters) orelse std.math.maxInt(i32),
);
@ -290,7 +290,7 @@ const LinuxImpl = struct {
// https://www.freebsd.org/cgi/man.cgi?query=_umtx_op&sektion=2&n=1
const FreebsdImpl = struct {
fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{Timeout}!void {
fn wait(ptr: *const atomic.Value(u32), expect: u32, timeout: ?u64) error{Timeout}!void {
var tm_size: usize = 0;
var tm: os.freebsd._umtx_time = undefined;
var tm_ptr: ?*const os.freebsd._umtx_time = null;
@ -326,7 +326,7 @@ const FreebsdImpl = struct {
}
}
fn wake(ptr: *const Atomic(u32), max_waiters: u32) void {
fn wake(ptr: *const atomic.Value(u32), max_waiters: u32) void {
const rc = os.freebsd._umtx_op(
@intFromPtr(&ptr.value),
@intFromEnum(os.freebsd.UMTX_OP.WAKE_PRIVATE),
@ -346,7 +346,7 @@ const FreebsdImpl = struct {
// https://man.openbsd.org/futex.2
const OpenbsdImpl = struct {
fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{Timeout}!void {
fn wait(ptr: *const atomic.Value(u32), expect: u32, timeout: ?u64) error{Timeout}!void {
var ts: os.timespec = undefined;
if (timeout) |timeout_ns| {
ts.tv_sec = @as(@TypeOf(ts.tv_sec), @intCast(timeout_ns / std.time.ns_per_s));
@ -377,7 +377,7 @@ const OpenbsdImpl = struct {
}
}
fn wake(ptr: *const Atomic(u32), max_waiters: u32) void {
fn wake(ptr: *const atomic.Value(u32), max_waiters: u32) void {
const rc = os.openbsd.futex(
@as(*const volatile u32, @ptrCast(&ptr.value)),
os.openbsd.FUTEX_WAKE | os.openbsd.FUTEX_PRIVATE_FLAG,
@ -393,7 +393,7 @@ const OpenbsdImpl = struct {
// https://man.dragonflybsd.org/?command=umtx&section=2
const DragonflyImpl = struct {
fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{Timeout}!void {
fn wait(ptr: *const atomic.Value(u32), expect: u32, timeout: ?u64) error{Timeout}!void {
// Dragonfly uses a scheme where 0 timeout means wait until signaled or spurious wake.
// Its reporting of timeouts is also unreliable, so we use an external timing source (Timer) instead.
var timeout_us: c_int = 0;
@ -435,7 +435,7 @@ const DragonflyImpl = struct {
}
}
fn wake(ptr: *const Atomic(u32), max_waiters: u32) void {
fn wake(ptr: *const atomic.Value(u32), max_waiters: u32) void {
// A count of zero means wake all waiters.
assert(max_waiters != 0);
const to_wake = std.math.cast(c_int, max_waiters) orelse 0;
@ -449,7 +449,7 @@ const DragonflyImpl = struct {
};
const WasmImpl = struct {
fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{Timeout}!void {
fn wait(ptr: *const atomic.Value(u32), expect: u32, timeout: ?u64) error{Timeout}!void {
if (!comptime std.Target.wasm.featureSetHas(builtin.target.cpu.features, .atomics)) {
@compileError("WASI target missing cpu feature 'atomics'");
}
@ -473,7 +473,7 @@ const WasmImpl = struct {
}
}
fn wake(ptr: *const Atomic(u32), max_waiters: u32) void {
fn wake(ptr: *const atomic.Value(u32), max_waiters: u32) void {
if (!comptime std.Target.wasm.featureSetHas(builtin.target.cpu.features, .atomics)) {
@compileError("WASI target missing cpu feature 'atomics'");
}
@ -732,8 +732,8 @@ const PosixImpl = struct {
};
const Bucket = struct {
mutex: std.c.pthread_mutex_t align(std.atomic.cache_line) = .{},
pending: Atomic(usize) = Atomic(usize).init(0),
mutex: std.c.pthread_mutex_t align(atomic.cache_line) = .{},
pending: atomic.Value(usize) = atomic.Value(usize).init(0),
treap: Treap = .{},
// Global array of buckets that addresses map to.
@ -757,9 +757,9 @@ const PosixImpl = struct {
};
const Address = struct {
fn from(ptr: *const Atomic(u32)) usize {
fn from(ptr: *const atomic.Value(u32)) usize {
// Get the alignment of the pointer.
const alignment = @alignOf(Atomic(u32));
const alignment = @alignOf(atomic.Value(u32));
comptime assert(std.math.isPowerOfTwo(alignment));
// Make sure the pointer is aligned,
@ -770,7 +770,7 @@ const PosixImpl = struct {
}
};
fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{Timeout}!void {
fn wait(ptr: *const atomic.Value(u32), expect: u32, timeout: ?u64) error{Timeout}!void {
const address = Address.from(ptr);
const bucket = Bucket.from(address);
@ -831,7 +831,7 @@ const PosixImpl = struct {
};
}
fn wake(ptr: *const Atomic(u32), max_waiters: u32) void {
fn wake(ptr: *const atomic.Value(u32), max_waiters: u32) void {
const address = Address.from(ptr);
const bucket = Bucket.from(address);
@ -882,7 +882,7 @@ const PosixImpl = struct {
};
test "Futex - smoke test" {
var value = Atomic(u32).init(0);
var value = atomic.Value(u32).init(0);
// Try waits with invalid values.
Futex.wait(&value, 0xdeadbeef);
@ -908,7 +908,7 @@ test "Futex - signaling" {
const num_iterations = 4;
const Paddle = struct {
value: Atomic(u32) = Atomic(u32).init(0),
value: atomic.Value(u32) = atomic.Value(u32).init(0),
current: u32 = 0,
fn hit(self: *@This()) void {
@ -962,8 +962,8 @@ test "Futex - broadcasting" {
const num_iterations = 4;
const Barrier = struct {
count: Atomic(u32) = Atomic(u32).init(num_threads),
futex: Atomic(u32) = Atomic(u32).init(0),
count: atomic.Value(u32) = atomic.Value(u32).init(num_threads),
futex: atomic.Value(u32) = atomic.Value(u32).init(0),
fn wait(self: *@This()) !void {
// Decrement the counter.
@ -1036,7 +1036,7 @@ pub const Deadline = struct {
/// - `Futex.wake()` is called on the `ptr`.
/// - A spurious wake occurs.
/// - The deadline expires; In which case `error.Timeout` is returned.
pub fn wait(self: *Deadline, ptr: *const Atomic(u32), expect: u32) error{Timeout}!void {
pub fn wait(self: *Deadline, ptr: *const atomic.Value(u32), expect: u32) error{Timeout}!void {
@setCold(true);
// Check if we actually have a timeout to wait until.
@ -1056,7 +1056,7 @@ pub const Deadline = struct {
test "Futex - Deadline" {
var deadline = Deadline.init(100 * std.time.ns_per_ms);
var futex_word = Atomic(u32).init(0);
var futex_word = atomic.Value(u32).init(0);
while (true) {
deadline.wait(&futex_word, 0) catch break;


@ -26,7 +26,6 @@ const Mutex = @This();
const os = std.os;
const assert = std.debug.assert;
const testing = std.testing;
const Atomic = std.atomic.Atomic;
const Thread = std.Thread;
const Futex = Thread.Futex;
@ -67,7 +66,7 @@ else
FutexImpl;
const DebugImpl = struct {
locking_thread: Atomic(Thread.Id) = Atomic(Thread.Id).init(0), // 0 means it's not locked.
locking_thread: std.atomic.Value(Thread.Id) = std.atomic.Value(Thread.Id).init(0), // 0 means it's not locked.
impl: ReleaseImpl = .{},
inline fn tryLock(self: *@This()) bool {
@ -151,37 +150,29 @@ const DarwinImpl = struct {
};
const FutexImpl = struct {
state: Atomic(u32) = Atomic(u32).init(unlocked),
state: std.atomic.Value(u32) = std.atomic.Value(u32).init(unlocked),
const unlocked = 0b00;
const locked = 0b01;
const contended = 0b11; // must contain the `locked` bit for x86 optimization below
fn tryLock(self: *@This()) bool {
// Lock with compareAndSwap instead of tryCompareAndSwap to avoid reporting spurious CAS failure.
return self.lockFast("compareAndSwap");
}
const unlocked: u32 = 0b00;
const locked: u32 = 0b01;
const contended: u32 = 0b11; // must contain the `locked` bit for x86 optimization below
fn lock(self: *@This()) void {
// Lock with tryCompareAndSwap instead of compareAndSwap due to being more inline-able on LL/SC archs like ARM.
if (!self.lockFast("tryCompareAndSwap")) {
if (!self.tryLock())
self.lockSlow();
}
}
inline fn lockFast(self: *@This(), comptime cas_fn_name: []const u8) bool {
fn tryLock(self: *@This()) bool {
// On x86, use `lock bts` instead of `lock cmpxchg` as:
// - they both seem to mark the cache-line as modified regardless: https://stackoverflow.com/a/63350048
// - `lock bts` is smaller instruction-wise which makes it better for inlining
if (comptime builtin.target.cpu.arch.isX86()) {
const locked_bit = @ctz(@as(u32, locked));
const locked_bit = @ctz(locked);
return self.state.bitSet(locked_bit, .Acquire) == 0;
}
// Acquire barrier ensures grabbing the lock happens before the critical section
// and that the previous lock holder's critical section happens before we grab the lock.
const casFn = @field(@TypeOf(self.state), cas_fn_name);
return casFn(&self.state, unlocked, locked, .Acquire, .Monotonic) == null;
return self.state.cmpxchgWeak(unlocked, locked, .Acquire, .Monotonic) == null;
}
fn lockSlow(self: *@This()) void {


@ -9,7 +9,6 @@ const ResetEvent = @This();
const os = std.os;
const assert = std.debug.assert;
const testing = std.testing;
const Atomic = std.atomic.Atomic;
const Futex = std.Thread.Futex;
impl: Impl = .{},
@ -89,7 +88,7 @@ const SingleThreadedImpl = struct {
};
const FutexImpl = struct {
state: Atomic(u32) = Atomic(u32).init(unset),
state: std.atomic.Value(u32) = std.atomic.Value(u32).init(unset),
const unset = 0;
const waiting = 1;
@ -115,7 +114,7 @@ const FutexImpl = struct {
// We avoid using any strict barriers until the end when we know the ResetEvent is set.
var state = self.state.load(.Monotonic);
if (state == unset) {
state = self.state.compareAndSwap(state, waiting, .Monotonic, .Monotonic) orelse waiting;
state = self.state.cmpxchgStrong(state, waiting, .Monotonic, .Monotonic) orelse waiting;
}
// Wait until the ResetEvent is set since the state is waiting.
@ -252,7 +251,7 @@ test "ResetEvent - broadcast" {
const num_threads = 10;
const Barrier = struct {
event: ResetEvent = .{},
counter: Atomic(usize) = Atomic(usize).init(num_threads),
counter: std.atomic.Value(usize) = std.atomic.Value(usize).init(num_threads),
fn wait(self: *@This()) void {
if (self.counter.fetchSub(1, .AcqRel) == 1) {


@ -307,7 +307,7 @@ test "RwLock - concurrent access" {
rwl: RwLock = .{},
writes: usize = 0,
reads: std.atomic.Atomic(usize) = std.atomic.Atomic(usize).init(0),
reads: std.atomic.Value(usize) = std.atomic.Value(usize).init(0),
term1: usize = 0,
term2: usize = 0,


@ -1,12 +1,11 @@
const std = @import("std");
const Atomic = std.atomic.Atomic;
const assert = std.debug.assert;
const WaitGroup = @This();
const is_waiting: usize = 1 << 0;
const one_pending: usize = 1 << 1;
state: Atomic(usize) = Atomic(usize).init(0),
state: std.atomic.Value(usize) = std.atomic.Value(usize).init(0),
event: std.Thread.ResetEvent = .{},
pub fn start(self: *WaitGroup) void {


@ -1,40 +1,374 @@
const std = @import("std.zig");
const builtin = @import("builtin");
/// This is a thin wrapper around a primitive value to prevent accidental data races.
pub fn Value(comptime T: type) type {
return extern struct {
/// Care must be taken to avoid data races when interacting with this field directly.
raw: T,
pub const Ordering = std.builtin.AtomicOrder;
const Self = @This();
pub const Stack = @import("atomic/stack.zig").Stack;
pub const Queue = @import("atomic/queue.zig").Queue;
pub const Atomic = @import("atomic/Atomic.zig").Atomic;
pub fn init(value: T) Self {
return .{ .raw = value };
}
test {
_ = @import("atomic/stack.zig");
_ = @import("atomic/queue.zig");
_ = @import("atomic/Atomic.zig");
/// Perform an atomic fence which uses the atomic value as a hint for
/// the modification order. Use this when you want to imply a fence on
/// an atomic variable without necessarily performing a memory access.
pub inline fn fence(self: *Self, comptime order: AtomicOrder) void {
// LLVM's ThreadSanitizer doesn't support the normal fences so we specialize for it.
if (builtin.sanitize_thread) {
const tsan = struct {
extern "c" fn __tsan_acquire(addr: *anyopaque) void;
extern "c" fn __tsan_release(addr: *anyopaque) void;
};
const addr: *anyopaque = self;
return switch (order) {
.Unordered, .Monotonic => @compileError(@tagName(order) ++ " only applies to atomic loads and stores"),
.Acquire => tsan.__tsan_acquire(addr),
.Release => tsan.__tsan_release(addr),
.AcqRel, .SeqCst => {
tsan.__tsan_acquire(addr);
tsan.__tsan_release(addr);
},
};
}
return @fence(order);
}
pub inline fn load(self: *const Self, comptime order: AtomicOrder) T {
return @atomicLoad(T, &self.raw, order);
}
pub inline fn store(self: *Self, value: T, comptime order: AtomicOrder) void {
@atomicStore(T, &self.raw, value, order);
}
pub inline fn swap(self: *Self, operand: T, comptime order: AtomicOrder) T {
return @atomicRmw(T, &self.raw, .Xchg, operand, order);
}
pub inline fn cmpxchgWeak(
self: *Self,
expected_value: T,
new_value: T,
comptime success_order: AtomicOrder,
comptime fail_order: AtomicOrder,
) ?T {
return @cmpxchgWeak(T, &self.raw, expected_value, new_value, success_order, fail_order);
}
pub inline fn cmpxchgStrong(
self: *Self,
expected_value: T,
new_value: T,
comptime success_order: AtomicOrder,
comptime fail_order: AtomicOrder,
) ?T {
return @cmpxchgStrong(T, &self.raw, expected_value, new_value, success_order, fail_order);
}
pub inline fn fetchAdd(self: *Self, operand: T, comptime order: AtomicOrder) T {
return @atomicRmw(T, &self.raw, .Add, operand, order);
}
pub inline fn fetchSub(self: *Self, operand: T, comptime order: AtomicOrder) T {
return @atomicRmw(T, &self.raw, .Sub, operand, order);
}
pub inline fn fetchMin(self: *Self, operand: T, comptime order: AtomicOrder) T {
return @atomicRmw(T, &self.raw, .Min, operand, order);
}
pub inline fn fetchMax(self: *Self, operand: T, comptime order: AtomicOrder) T {
return @atomicRmw(T, &self.raw, .Max, operand, order);
}
pub inline fn fetchAnd(self: *Self, operand: T, comptime order: AtomicOrder) T {
return @atomicRmw(T, &self.raw, .And, operand, order);
}
pub inline fn fetchNand(self: *Self, operand: T, comptime order: AtomicOrder) T {
return @atomicRmw(T, &self.raw, .Nand, operand, order);
}
pub inline fn fetchXor(self: *Self, operand: T, comptime order: AtomicOrder) T {
return @atomicRmw(T, &self.raw, .Xor, operand, order);
}
pub inline fn fetchOr(self: *Self, operand: T, comptime order: AtomicOrder) T {
return @atomicRmw(T, &self.raw, .Or, operand, order);
}
pub inline fn rmw(
self: *Self,
comptime op: std.builtin.AtomicRmwOp,
operand: T,
comptime order: AtomicOrder,
) T {
return @atomicRmw(T, &self.raw, op, operand, order);
}
const Bit = std.math.Log2Int(T);
/// Marked `inline` so that if `bit` is comptime-known, the instruction
/// can be lowered to a more efficient machine code instruction if
/// possible.
pub inline fn bitSet(self: *Self, bit: Bit, comptime order: AtomicOrder) u1 {
const mask = @as(T, 1) << bit;
const value = self.fetchOr(mask, order);
return @intFromBool(value & mask != 0);
}
/// Marked `inline` so that if `bit` is comptime-known, the instruction
/// can be lowered to a more efficient machine code instruction if
/// possible.
pub inline fn bitReset(self: *Self, bit: Bit, comptime order: AtomicOrder) u1 {
const mask = @as(T, 1) << bit;
const value = self.fetchAnd(~mask, order);
return @intFromBool(value & mask != 0);
}
/// Marked `inline` so that if `bit` is comptime-known, the instruction
/// can be lowered to a more efficient machine code instruction if
/// possible.
pub inline fn bitToggle(self: *Self, bit: Bit, comptime order: AtomicOrder) u1 {
const mask = @as(T, 1) << bit;
const value = self.fetchXor(mask, order);
return @intFromBool(value & mask != 0);
}
};
}
pub inline fn fence(comptime ordering: Ordering) void {
switch (ordering) {
.Acquire, .Release, .AcqRel, .SeqCst => {
@fence(ordering);
},
else => {
@compileLog(ordering, " only applies to a given memory location");
},
test Value {
const RefCount = struct {
count: Value(usize),
dropFn: *const fn (*RefCount) void,
const RefCount = @This();
fn ref(rc: *RefCount) void {
// No ordering necessary; just updating a counter.
_ = rc.count.fetchAdd(1, .Monotonic);
}
fn unref(rc: *RefCount) void {
// Release ensures code before unref() happens-before the
// count is decremented as dropFn could be called by then.
if (rc.count.fetchSub(1, .Release) == 1) {
// Acquire ensures the count decrement and code before
// previous unref()s happen-before we call dropFn
// below.
// Another alternative is to use .AcqRel on the
// fetchSub count decrement, but that is an extra
// barrier on a possibly hot path.
rc.count.fence(.Acquire);
(rc.dropFn)(rc);
}
}
fn noop(rc: *RefCount) void {
_ = rc;
}
};
var ref_count: RefCount = .{
.count = Value(usize).init(0),
.dropFn = RefCount.noop,
};
ref_count.ref();
ref_count.unref();
}
test "Value.swap" {
var x = Value(usize).init(5);
try testing.expectEqual(@as(usize, 5), x.swap(10, .SeqCst));
try testing.expectEqual(@as(usize, 10), x.load(.SeqCst));
const E = enum(usize) { a, b, c };
var y = Value(E).init(.c);
try testing.expectEqual(E.c, y.swap(.a, .SeqCst));
try testing.expectEqual(E.a, y.load(.SeqCst));
var z = Value(f32).init(5.0);
try testing.expectEqual(@as(f32, 5.0), z.swap(10.0, .SeqCst));
try testing.expectEqual(@as(f32, 10.0), z.load(.SeqCst));
var a = Value(bool).init(false);
try testing.expectEqual(false, a.swap(true, .SeqCst));
try testing.expectEqual(true, a.load(.SeqCst));
var b = Value(?*u8).init(null);
try testing.expectEqual(@as(?*u8, null), b.swap(@as(?*u8, @ptrFromInt(@alignOf(u8))), .SeqCst));
try testing.expectEqual(@as(?*u8, @ptrFromInt(@alignOf(u8))), b.load(.SeqCst));
}
test "Value.store" {
var x = Value(usize).init(5);
x.store(10, .SeqCst);
try testing.expectEqual(@as(usize, 10), x.load(.SeqCst));
}
test "Value.cmpxchgWeak" {
var x = Value(usize).init(0);
try testing.expectEqual(@as(?usize, 0), x.cmpxchgWeak(1, 0, .SeqCst, .SeqCst));
try testing.expectEqual(@as(usize, 0), x.load(.SeqCst));
while (x.cmpxchgWeak(0, 1, .SeqCst, .SeqCst)) |_| {}
try testing.expectEqual(@as(usize, 1), x.load(.SeqCst));
while (x.cmpxchgWeak(1, 0, .SeqCst, .SeqCst)) |_| {}
try testing.expectEqual(@as(usize, 0), x.load(.SeqCst));
}
test "Value.cmpxchgStrong" {
var x = Value(usize).init(0);
try testing.expectEqual(@as(?usize, 0), x.cmpxchgStrong(1, 0, .SeqCst, .SeqCst));
try testing.expectEqual(@as(usize, 0), x.load(.SeqCst));
try testing.expectEqual(@as(?usize, null), x.cmpxchgStrong(0, 1, .SeqCst, .SeqCst));
try testing.expectEqual(@as(usize, 1), x.load(.SeqCst));
try testing.expectEqual(@as(?usize, null), x.cmpxchgStrong(1, 0, .SeqCst, .SeqCst));
try testing.expectEqual(@as(usize, 0), x.load(.SeqCst));
}
test "Value.fetchAdd" {
var x = Value(usize).init(5);
try testing.expectEqual(@as(usize, 5), x.fetchAdd(5, .SeqCst));
try testing.expectEqual(@as(usize, 10), x.load(.SeqCst));
try testing.expectEqual(@as(usize, 10), x.fetchAdd(std.math.maxInt(usize), .SeqCst));
try testing.expectEqual(@as(usize, 9), x.load(.SeqCst));
}
test "Value.fetchSub" {
var x = Value(usize).init(5);
try testing.expectEqual(@as(usize, 5), x.fetchSub(5, .SeqCst));
try testing.expectEqual(@as(usize, 0), x.load(.SeqCst));
try testing.expectEqual(@as(usize, 0), x.fetchSub(1, .SeqCst));
try testing.expectEqual(@as(usize, std.math.maxInt(usize)), x.load(.SeqCst));
}
test "Value.fetchMin" {
var x = Value(usize).init(5);
try testing.expectEqual(@as(usize, 5), x.fetchMin(0, .SeqCst));
try testing.expectEqual(@as(usize, 0), x.load(.SeqCst));
try testing.expectEqual(@as(usize, 0), x.fetchMin(10, .SeqCst));
try testing.expectEqual(@as(usize, 0), x.load(.SeqCst));
}
test "Value.fetchMax" {
var x = Value(usize).init(5);
try testing.expectEqual(@as(usize, 5), x.fetchMax(10, .SeqCst));
try testing.expectEqual(@as(usize, 10), x.load(.SeqCst));
try testing.expectEqual(@as(usize, 10), x.fetchMax(5, .SeqCst));
try testing.expectEqual(@as(usize, 10), x.load(.SeqCst));
}
test "Value.fetchAnd" {
var x = Value(usize).init(0b11);
try testing.expectEqual(@as(usize, 0b11), x.fetchAnd(0b10, .SeqCst));
try testing.expectEqual(@as(usize, 0b10), x.load(.SeqCst));
try testing.expectEqual(@as(usize, 0b10), x.fetchAnd(0b00, .SeqCst));
try testing.expectEqual(@as(usize, 0b00), x.load(.SeqCst));
}
test "Value.fetchNand" {
var x = Value(usize).init(0b11);
try testing.expectEqual(@as(usize, 0b11), x.fetchNand(0b10, .SeqCst));
try testing.expectEqual(~@as(usize, 0b10), x.load(.SeqCst));
try testing.expectEqual(~@as(usize, 0b10), x.fetchNand(0b00, .SeqCst));
try testing.expectEqual(~@as(usize, 0b00), x.load(.SeqCst));
}
test "Value.fetchOr" {
var x = Value(usize).init(0b11);
try testing.expectEqual(@as(usize, 0b11), x.fetchOr(0b100, .SeqCst));
try testing.expectEqual(@as(usize, 0b111), x.load(.SeqCst));
try testing.expectEqual(@as(usize, 0b111), x.fetchOr(0b010, .SeqCst));
try testing.expectEqual(@as(usize, 0b111), x.load(.SeqCst));
}
test "Value.fetchXor" {
var x = Value(usize).init(0b11);
try testing.expectEqual(@as(usize, 0b11), x.fetchXor(0b10, .SeqCst));
try testing.expectEqual(@as(usize, 0b01), x.load(.SeqCst));
try testing.expectEqual(@as(usize, 0b01), x.fetchXor(0b01, .SeqCst));
try testing.expectEqual(@as(usize, 0b00), x.load(.SeqCst));
}
test "Value.bitSet" {
var x = Value(usize).init(0);
for (0..@bitSizeOf(usize)) |bit_index| {
const bit = @as(std.math.Log2Int(usize), @intCast(bit_index));
const mask = @as(usize, 1) << bit;
// setting the bit should change the bit
try testing.expect(x.load(.SeqCst) & mask == 0);
try testing.expectEqual(@as(u1, 0), x.bitSet(bit, .SeqCst));
try testing.expect(x.load(.SeqCst) & mask != 0);
// setting it again shouldn't change the bit
try testing.expectEqual(@as(u1, 1), x.bitSet(bit, .SeqCst));
try testing.expect(x.load(.SeqCst) & mask != 0);
// all the previous bits should have not changed (still be set)
for (0..bit_index) |prev_bit_index| {
const prev_bit = @as(std.math.Log2Int(usize), @intCast(prev_bit_index));
const prev_mask = @as(usize, 1) << prev_bit;
try testing.expect(x.load(.SeqCst) & prev_mask != 0);
}
}
}
pub inline fn compilerFence(comptime ordering: Ordering) void {
switch (ordering) {
.Acquire, .Release, .AcqRel, .SeqCst => asm volatile ("" ::: "memory"),
else => @compileLog(ordering, " only applies to a given memory location"),
test "Value.bitReset" {
var x = Value(usize).init(0);
for (0..@bitSizeOf(usize)) |bit_index| {
const bit = @as(std.math.Log2Int(usize), @intCast(bit_index));
const mask = @as(usize, 1) << bit;
x.raw |= mask;
// unsetting the bit should change the bit
try testing.expect(x.load(.SeqCst) & mask != 0);
try testing.expectEqual(@as(u1, 1), x.bitReset(bit, .SeqCst));
try testing.expect(x.load(.SeqCst) & mask == 0);
// unsetting it again shouldn't change the bit
try testing.expectEqual(@as(u1, 0), x.bitReset(bit, .SeqCst));
try testing.expect(x.load(.SeqCst) & mask == 0);
// all the previous bits should have not changed (still be reset)
for (0..bit_index) |prev_bit_index| {
const prev_bit = @as(std.math.Log2Int(usize), @intCast(prev_bit_index));
const prev_mask = @as(usize, 1) << prev_bit;
try testing.expect(x.load(.SeqCst) & prev_mask == 0);
}
}
}
test "fence/compilerFence" {
inline for (.{ .Acquire, .Release, .AcqRel, .SeqCst }) |ordering| {
compilerFence(ordering);
fence(ordering);
test "Value.bitToggle" {
var x = Value(usize).init(0);
for (0..@bitSizeOf(usize)) |bit_index| {
const bit = @as(std.math.Log2Int(usize), @intCast(bit_index));
const mask = @as(usize, 1) << bit;
// toggling the bit should change the bit
try testing.expect(x.load(.SeqCst) & mask == 0);
try testing.expectEqual(@as(u1, 0), x.bitToggle(bit, .SeqCst));
try testing.expect(x.load(.SeqCst) & mask != 0);
// toggling it again *should* change the bit
try testing.expectEqual(@as(u1, 1), x.bitToggle(bit, .SeqCst));
try testing.expect(x.load(.SeqCst) & mask == 0);
// all the previous bits should have not changed (still be toggled back)
for (0..bit_index) |prev_bit_index| {
const prev_bit = @as(std.math.Log2Int(usize), @intCast(prev_bit_index));
const prev_mask = @as(usize, 1) << prev_bit;
try testing.expect(x.load(.SeqCst) & prev_mask == 0);
}
}
}
@ -74,9 +408,8 @@ pub inline fn spinLoopHint() void {
}
}
test "spinLoopHint" {
var i: usize = 10;
while (i > 0) : (i -= 1) {
test spinLoopHint {
for (0..10) |_| {
spinLoopHint();
}
}
@ -85,8 +418,8 @@ test "spinLoopHint" {
/// Add this much padding or align to this boundary to keep atomically-updated
/// memory from forcing cache invalidations on nearby, but non-atomic, memory.
///
// https://en.wikipedia.org/wiki/False_sharing
// https://github.com/golang/go/search?q=CacheLinePadSize
/// https://en.wikipedia.org/wiki/False_sharing
/// https://github.com/golang/go/search?q=CacheLinePadSize
pub const cache_line = switch (builtin.cpu.arch) {
// x86_64: Starting from Intel's Sandy Bridge, the spatial prefetcher pulls in pairs of 64-byte cache lines at a time.
// - https://www.intel.com/content/dam/www/public/us/en/documents/manuals/64-ia-32-architectures-optimization-manual.pdf
@ -118,3 +451,8 @@ pub const cache_line = switch (builtin.cpu.arch) {
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_wasm.go#L7
else => 64,
};
const std = @import("std.zig");
const builtin = @import("builtin");
const AtomicOrder = std.builtin.AtomicOrder;
const testing = std.testing;
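As the Bucket struct in Futex.zig above does for its pthread_mutex_t, cache_line is meant to be used as an alignment (or padding amount) so that independently-updated hot data does not share a cache line. A minimal sketch (illustrative struct and field names, not from the diff):

const std = @import("std");

// Two counters written by different threads; aligning each field to
// std.atomic.cache_line keeps them on separate cache lines, so updates
// to one do not invalidate the cache line holding the other.
const Counters = struct {
    produced: std.atomic.Value(usize) align(std.atomic.cache_line) = std.atomic.Value(usize).init(0),
    consumed: std.atomic.Value(usize) align(std.atomic.cache_line) = std.atomic.Value(usize).init(0),
};

test "cache_line padding sketch" {
    var counters: Counters = .{};
    _ = counters.produced.fetchAdd(1, .Monotonic);
    _ = counters.consumed.fetchAdd(1, .Monotonic);
    try std.testing.expectEqual(@as(usize, 1), counters.produced.load(.Monotonic));
    try std.testing.expectEqual(@as(usize, 1), counters.consumed.load(.Monotonic));
}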


@ -1,619 +0,0 @@
const std = @import("../std.zig");
const builtin = @import("builtin");
const testing = std.testing;
const Ordering = std.atomic.Ordering;
pub fn Atomic(comptime T: type) type {
return extern struct {
value: T,
const Self = @This();
pub fn init(value: T) Self {
return .{ .value = value };
}
/// Perform an atomic fence which uses the atomic value as a hint for the modification order.
/// Use this when you want to imply a fence on an atomic variable without necessarily performing a memory access.
///
/// Example:
/// ```
/// const RefCount = struct {
/// count: Atomic(usize),
/// dropFn: *const fn (*RefCount) void,
///
/// fn ref(self: *RefCount) void {
/// _ = self.count.fetchAdd(1, .Monotonic); // no ordering necessary, just updating a counter
/// }
///
/// fn unref(self: *RefCount) void {
/// // Release ensures code before unref() happens-before the count is decremented as dropFn could be called by then.
/// if (self.count.fetchSub(1, .Release)) {
/// // Acquire ensures count decrement and code before previous unrefs()s happens-before we call dropFn below.
/// // NOTE: another alternative is to use .AcqRel on the fetchSub count decrement but it's extra barrier in possibly hot path.
/// self.count.fence(.Acquire);
/// (self.dropFn)(self);
/// }
/// }
/// };
/// ```
pub inline fn fence(self: *Self, comptime ordering: Ordering) void {
// LLVM's ThreadSanitizer doesn't support the normal fences so we specialize for it.
if (builtin.sanitize_thread) {
const tsan = struct {
extern "c" fn __tsan_acquire(addr: *anyopaque) void;
extern "c" fn __tsan_release(addr: *anyopaque) void;
};
const addr: *anyopaque = self;
return switch (ordering) {
.Unordered, .Monotonic => @compileError(@tagName(ordering) ++ " only applies to atomic loads and stores"),
.Acquire => tsan.__tsan_acquire(addr),
.Release => tsan.__tsan_release(addr),
.AcqRel, .SeqCst => {
tsan.__tsan_acquire(addr);
tsan.__tsan_release(addr);
},
};
}
return std.atomic.fence(ordering);
}
/// Non-atomically load from the atomic value without synchronization.
/// Care must be taken to avoid data-races when interacting with other atomic operations.
pub inline fn loadUnchecked(self: Self) T {
return self.value;
}
/// Non-atomically store to the atomic value without synchronization.
/// Care must be taken to avoid data-races when interacting with other atomic operations.
pub inline fn storeUnchecked(self: *Self, value: T) void {
self.value = value;
}
pub inline fn load(self: *const Self, comptime ordering: Ordering) T {
return switch (ordering) {
.AcqRel => @compileError(@tagName(ordering) ++ " implies " ++ @tagName(Ordering.Release) ++ " which is only allowed on atomic stores"),
.Release => @compileError(@tagName(ordering) ++ " is only allowed on atomic stores"),
else => @atomicLoad(T, &self.value, ordering),
};
}
pub inline fn store(self: *Self, value: T, comptime ordering: Ordering) void {
switch (ordering) {
.AcqRel => @compileError(@tagName(ordering) ++ " implies " ++ @tagName(Ordering.Acquire) ++ " which is only allowed on atomic loads"),
.Acquire => @compileError(@tagName(ordering) ++ " is only allowed on atomic loads"),
else => @atomicStore(T, &self.value, value, ordering),
}
}
pub inline fn swap(self: *Self, value: T, comptime ordering: Ordering) T {
return self.rmw(.Xchg, value, ordering);
}
pub inline fn compareAndSwap(
self: *Self,
compare: T,
exchange: T,
comptime success: Ordering,
comptime failure: Ordering,
) ?T {
return self.cmpxchg(true, compare, exchange, success, failure);
}
pub inline fn tryCompareAndSwap(
self: *Self,
compare: T,
exchange: T,
comptime success: Ordering,
comptime failure: Ordering,
) ?T {
return self.cmpxchg(false, compare, exchange, success, failure);
}
inline fn cmpxchg(
self: *Self,
comptime is_strong: bool,
compare: T,
exchange: T,
comptime success: Ordering,
comptime failure: Ordering,
) ?T {
if (success == .Unordered or failure == .Unordered) {
@compileError(@tagName(Ordering.Unordered) ++ " is only allowed on atomic loads and stores");
}
const success_is_stronger = switch (failure) {
.SeqCst => success == .SeqCst,
.AcqRel => @compileError(@tagName(failure) ++ " implies " ++ @tagName(Ordering.Release) ++ " which is only allowed on success"),
.Acquire => success == .SeqCst or success == .AcqRel or success == .Acquire,
.Release => @compileError(@tagName(failure) ++ " is only allowed on success"),
.Monotonic => true,
.Unordered => unreachable,
};
if (!success_is_stronger) {
@compileError(@tagName(success) ++ " must be stronger than " ++ @tagName(failure));
}
return switch (is_strong) {
true => @cmpxchgStrong(T, &self.value, compare, exchange, success, failure),
false => @cmpxchgWeak(T, &self.value, compare, exchange, success, failure),
};
}
inline fn rmw(
self: *Self,
comptime op: std.builtin.AtomicRmwOp,
value: T,
comptime ordering: Ordering,
) T {
return @atomicRmw(T, &self.value, op, value, ordering);
}
pub inline fn fetchAdd(self: *Self, value: T, comptime ordering: Ordering) T {
return self.rmw(.Add, value, ordering);
}
pub inline fn fetchSub(self: *Self, value: T, comptime ordering: Ordering) T {
return self.rmw(.Sub, value, ordering);
}
pub inline fn fetchMin(self: *Self, value: T, comptime ordering: Ordering) T {
return self.rmw(.Min, value, ordering);
}
pub inline fn fetchMax(self: *Self, value: T, comptime ordering: Ordering) T {
return self.rmw(.Max, value, ordering);
}
pub inline fn fetchAnd(self: *Self, value: T, comptime ordering: Ordering) T {
return self.rmw(.And, value, ordering);
}
pub inline fn fetchNand(self: *Self, value: T, comptime ordering: Ordering) T {
return self.rmw(.Nand, value, ordering);
}
pub inline fn fetchOr(self: *Self, value: T, comptime ordering: Ordering) T {
return self.rmw(.Or, value, ordering);
}
pub inline fn fetchXor(self: *Self, value: T, comptime ordering: Ordering) T {
return self.rmw(.Xor, value, ordering);
}
const Bit = std.math.Log2Int(T);
const BitRmwOp = enum {
Set,
Reset,
Toggle,
};
pub inline fn bitSet(self: *Self, bit: Bit, comptime ordering: Ordering) u1 {
return bitRmw(self, .Set, bit, ordering);
}
pub inline fn bitReset(self: *Self, bit: Bit, comptime ordering: Ordering) u1 {
return bitRmw(self, .Reset, bit, ordering);
}
pub inline fn bitToggle(self: *Self, bit: Bit, comptime ordering: Ordering) u1 {
return bitRmw(self, .Toggle, bit, ordering);
}
inline fn bitRmw(self: *Self, comptime op: BitRmwOp, bit: Bit, comptime ordering: Ordering) u1 {
// x86 supports dedicated bitwise instructions
if (comptime builtin.target.cpu.arch.isX86() and @sizeOf(T) >= 2 and @sizeOf(T) <= 8) {
// TODO: this causes std lib test failures when enabled
if (false) {
return x86BitRmw(self, op, bit, ordering);
}
}
const mask = @as(T, 1) << bit;
const value = switch (op) {
.Set => self.fetchOr(mask, ordering),
.Reset => self.fetchAnd(~mask, ordering),
.Toggle => self.fetchXor(mask, ordering),
};
return @intFromBool(value & mask != 0);
}
inline fn x86BitRmw(self: *Self, comptime op: BitRmwOp, bit: Bit, comptime ordering: Ordering) u1 {
const old_bit: u8 = switch (@sizeOf(T)) {
2 => switch (op) {
.Set => asm volatile ("lock btsw %[bit], %[ptr]"
// LLVM doesn't support u1 flag register return values
: [result] "={@ccc}" (-> u8),
: [ptr] "*m" (&self.value),
[bit] "X" (@as(T, bit)),
: "cc", "memory"
),
.Reset => asm volatile ("lock btrw %[bit], %[ptr]"
// LLVM doesn't support u1 flag register return values
: [result] "={@ccc}" (-> u8),
: [ptr] "*m" (&self.value),
[bit] "X" (@as(T, bit)),
: "cc", "memory"
),
.Toggle => asm volatile ("lock btcw %[bit], %[ptr]"
// LLVM doesn't support u1 flag register return values
: [result] "={@ccc}" (-> u8),
: [ptr] "*m" (&self.value),
[bit] "X" (@as(T, bit)),
: "cc", "memory"
),
},
4 => switch (op) {
.Set => asm volatile ("lock btsl %[bit], %[ptr]"
// LLVM doesn't support u1 flag register return values
: [result] "={@ccc}" (-> u8),
: [ptr] "*m" (&self.value),
[bit] "X" (@as(T, bit)),
: "cc", "memory"
),
.Reset => asm volatile ("lock btrl %[bit], %[ptr]"
// LLVM doesn't support u1 flag register return values
: [result] "={@ccc}" (-> u8),
: [ptr] "*m" (&self.value),
[bit] "X" (@as(T, bit)),
: "cc", "memory"
),
.Toggle => asm volatile ("lock btcl %[bit], %[ptr]"
// LLVM doesn't support u1 flag register return values
: [result] "={@ccc}" (-> u8),
: [ptr] "*m" (&self.value),
[bit] "X" (@as(T, bit)),
: "cc", "memory"
),
},
8 => switch (op) {
.Set => asm volatile ("lock btsq %[bit], %[ptr]"
// LLVM doesn't support u1 flag register return values
: [result] "={@ccc}" (-> u8),
: [ptr] "*m" (&self.value),
[bit] "X" (@as(T, bit)),
: "cc", "memory"
),
.Reset => asm volatile ("lock btrq %[bit], %[ptr]"
// LLVM doesn't support u1 flag register return values
: [result] "={@ccc}" (-> u8),
: [ptr] "*m" (&self.value),
[bit] "X" (@as(T, bit)),
: "cc", "memory"
),
.Toggle => asm volatile ("lock btcq %[bit], %[ptr]"
// LLVM doesn't support u1 flag register return values
: [result] "={@ccc}" (-> u8),
: [ptr] "*m" (&self.value),
[bit] "X" (@as(T, bit)),
: "cc", "memory"
),
},
else => @compileError("Invalid atomic type " ++ @typeName(T)),
};
// TODO: emit appropriate tsan fence if compiling with tsan
_ = ordering;
return @intCast(old_bit);
}
};
}
test "Atomic.fence" {
inline for (.{ .Acquire, .Release, .AcqRel, .SeqCst }) |ordering| {
var x = Atomic(usize).init(0);
x.fence(ordering);
}
}
fn atomicIntTypes() []const type {
comptime var bytes = 1;
comptime var types: []const type = &[_]type{};
inline while (bytes <= @sizeOf(usize)) : (bytes *= 2) {
types = types ++ &[_]type{std.meta.Int(.unsigned, bytes * 8)};
}
return types;
}
test "Atomic.loadUnchecked" {
inline for (atomicIntTypes()) |Int| {
var x = Atomic(Int).init(5);
try testing.expectEqual(x.loadUnchecked(), 5);
}
}
test "Atomic.storeUnchecked" {
inline for (atomicIntTypes()) |Int| {
_ = Int;
var x = Atomic(usize).init(5);
x.storeUnchecked(10);
try testing.expectEqual(x.loadUnchecked(), 10);
}
}
test "Atomic.load" {
inline for (atomicIntTypes()) |Int| {
inline for (.{ .Unordered, .Monotonic, .Acquire, .SeqCst }) |ordering| {
var x = Atomic(Int).init(5);
try testing.expectEqual(x.load(ordering), 5);
}
}
}
test "Atomic.store" {
inline for (atomicIntTypes()) |Int| {
inline for (.{ .Unordered, .Monotonic, .Release, .SeqCst }) |ordering| {
_ = Int;
var x = Atomic(usize).init(5);
x.store(10, ordering);
try testing.expectEqual(x.load(.SeqCst), 10);
}
}
}
const atomic_rmw_orderings = [_]Ordering{
.Monotonic,
.Acquire,
.Release,
.AcqRel,
.SeqCst,
};
test "Atomic.swap" {
inline for (atomic_rmw_orderings) |ordering| {
var x = Atomic(usize).init(5);
try testing.expectEqual(x.swap(10, ordering), 5);
try testing.expectEqual(x.load(.SeqCst), 10);
var y = Atomic(enum(usize) { a, b, c }).init(.c);
try testing.expectEqual(y.swap(.a, ordering), .c);
try testing.expectEqual(y.load(.SeqCst), .a);
var z = Atomic(f32).init(5.0);
try testing.expectEqual(z.swap(10.0, ordering), 5.0);
try testing.expectEqual(z.load(.SeqCst), 10.0);
var a = Atomic(bool).init(false);
try testing.expectEqual(a.swap(true, ordering), false);
try testing.expectEqual(a.load(.SeqCst), true);
var b = Atomic(?*u8).init(null);
try testing.expectEqual(b.swap(@as(?*u8, @ptrFromInt(@alignOf(u8))), ordering), null);
try testing.expectEqual(b.load(.SeqCst), @as(?*u8, @ptrFromInt(@alignOf(u8))));
}
}
const atomic_cmpxchg_orderings = [_][2]Ordering{
.{ .Monotonic, .Monotonic },
.{ .Acquire, .Monotonic },
.{ .Acquire, .Acquire },
.{ .Release, .Monotonic },
// Although accepted by LLVM, acquire failure implies AcqRel success
// .{ .Release, .Acquire },
.{ .AcqRel, .Monotonic },
.{ .AcqRel, .Acquire },
.{ .SeqCst, .Monotonic },
.{ .SeqCst, .Acquire },
.{ .SeqCst, .SeqCst },
};
test "Atomic.compareAndSwap" {
inline for (atomicIntTypes()) |Int| {
inline for (atomic_cmpxchg_orderings) |ordering| {
var x = Atomic(Int).init(0);
try testing.expectEqual(x.compareAndSwap(1, 0, ordering[0], ordering[1]), 0);
try testing.expectEqual(x.load(.SeqCst), 0);
try testing.expectEqual(x.compareAndSwap(0, 1, ordering[0], ordering[1]), null);
try testing.expectEqual(x.load(.SeqCst), 1);
try testing.expectEqual(x.compareAndSwap(1, 0, ordering[0], ordering[1]), null);
try testing.expectEqual(x.load(.SeqCst), 0);
}
}
}
test "Atomic.tryCompareAndSwap" {
inline for (atomicIntTypes()) |Int| {
inline for (atomic_cmpxchg_orderings) |ordering| {
var x = Atomic(Int).init(0);
try testing.expectEqual(x.tryCompareAndSwap(1, 0, ordering[0], ordering[1]), 0);
try testing.expectEqual(x.load(.SeqCst), 0);
while (x.tryCompareAndSwap(0, 1, ordering[0], ordering[1])) |_| {}
try testing.expectEqual(x.load(.SeqCst), 1);
while (x.tryCompareAndSwap(1, 0, ordering[0], ordering[1])) |_| {}
try testing.expectEqual(x.load(.SeqCst), 0);
}
}
}
test "Atomic.fetchAdd" {
inline for (atomicIntTypes()) |Int| {
inline for (atomic_rmw_orderings) |ordering| {
var x = Atomic(Int).init(5);
try testing.expectEqual(x.fetchAdd(5, ordering), 5);
try testing.expectEqual(x.load(.SeqCst), 10);
try testing.expectEqual(x.fetchAdd(std.math.maxInt(Int), ordering), 10);
try testing.expectEqual(x.load(.SeqCst), 9);
}
}
}
test "Atomic.fetchSub" {
inline for (atomicIntTypes()) |Int| {
inline for (atomic_rmw_orderings) |ordering| {
var x = Atomic(Int).init(5);
try testing.expectEqual(x.fetchSub(5, ordering), 5);
try testing.expectEqual(x.load(.SeqCst), 0);
try testing.expectEqual(x.fetchSub(1, ordering), 0);
try testing.expectEqual(x.load(.SeqCst), std.math.maxInt(Int));
}
}
}
test "Atomic.fetchMin" {
inline for (atomicIntTypes()) |Int| {
inline for (atomic_rmw_orderings) |ordering| {
var x = Atomic(Int).init(5);
try testing.expectEqual(x.fetchMin(0, ordering), 5);
try testing.expectEqual(x.load(.SeqCst), 0);
try testing.expectEqual(x.fetchMin(10, ordering), 0);
try testing.expectEqual(x.load(.SeqCst), 0);
}
}
}
test "Atomic.fetchMax" {
inline for (atomicIntTypes()) |Int| {
inline for (atomic_rmw_orderings) |ordering| {
var x = Atomic(Int).init(5);
try testing.expectEqual(x.fetchMax(10, ordering), 5);
try testing.expectEqual(x.load(.SeqCst), 10);
try testing.expectEqual(x.fetchMax(5, ordering), 10);
try testing.expectEqual(x.load(.SeqCst), 10);
}
}
}
test "Atomic.fetchAnd" {
inline for (atomicIntTypes()) |Int| {
inline for (atomic_rmw_orderings) |ordering| {
var x = Atomic(Int).init(0b11);
try testing.expectEqual(x.fetchAnd(0b10, ordering), 0b11);
try testing.expectEqual(x.load(.SeqCst), 0b10);
try testing.expectEqual(x.fetchAnd(0b00, ordering), 0b10);
try testing.expectEqual(x.load(.SeqCst), 0b00);
}
}
}
test "Atomic.fetchNand" {
inline for (atomicIntTypes()) |Int| {
inline for (atomic_rmw_orderings) |ordering| {
var x = Atomic(Int).init(0b11);
try testing.expectEqual(x.fetchNand(0b10, ordering), 0b11);
try testing.expectEqual(x.load(.SeqCst), ~@as(Int, 0b10));
try testing.expectEqual(x.fetchNand(0b00, ordering), ~@as(Int, 0b10));
try testing.expectEqual(x.load(.SeqCst), ~@as(Int, 0b00));
}
}
}
test "Atomic.fetchOr" {
inline for (atomicIntTypes()) |Int| {
inline for (atomic_rmw_orderings) |ordering| {
var x = Atomic(Int).init(0b11);
try testing.expectEqual(x.fetchOr(0b100, ordering), 0b11);
try testing.expectEqual(x.load(.SeqCst), 0b111);
try testing.expectEqual(x.fetchOr(0b010, ordering), 0b111);
try testing.expectEqual(x.load(.SeqCst), 0b111);
}
}
}
test "Atomic.fetchXor" {
inline for (atomicIntTypes()) |Int| {
inline for (atomic_rmw_orderings) |ordering| {
var x = Atomic(Int).init(0b11);
try testing.expectEqual(x.fetchXor(0b10, ordering), 0b11);
try testing.expectEqual(x.load(.SeqCst), 0b01);
try testing.expectEqual(x.fetchXor(0b01, ordering), 0b01);
try testing.expectEqual(x.load(.SeqCst), 0b00);
}
}
}
test "Atomic.bitSet" {
inline for (atomicIntTypes()) |Int| {
inline for (atomic_rmw_orderings) |ordering| {
var x = Atomic(Int).init(0);
for (0..@bitSizeOf(Int)) |bit_index| {
const bit = @as(std.math.Log2Int(Int), @intCast(bit_index));
const mask = @as(Int, 1) << bit;
// setting the bit should change the bit
try testing.expect(x.load(.SeqCst) & mask == 0);
try testing.expectEqual(x.bitSet(bit, ordering), 0);
try testing.expect(x.load(.SeqCst) & mask != 0);
// setting it again shouldn't change the bit
try testing.expectEqual(x.bitSet(bit, ordering), 1);
try testing.expect(x.load(.SeqCst) & mask != 0);
// all the previous bits should have not changed (still be set)
for (0..bit_index) |prev_bit_index| {
const prev_bit = @as(std.math.Log2Int(Int), @intCast(prev_bit_index));
const prev_mask = @as(Int, 1) << prev_bit;
try testing.expect(x.load(.SeqCst) & prev_mask != 0);
}
}
}
}
}
test "Atomic.bitReset" {
inline for (atomicIntTypes()) |Int| {
inline for (atomic_rmw_orderings) |ordering| {
var x = Atomic(Int).init(0);
for (0..@bitSizeOf(Int)) |bit_index| {
const bit = @as(std.math.Log2Int(Int), @intCast(bit_index));
const mask = @as(Int, 1) << bit;
x.storeUnchecked(x.loadUnchecked() | mask);
// unsetting the bit should change the bit
try testing.expect(x.load(.SeqCst) & mask != 0);
try testing.expectEqual(x.bitReset(bit, ordering), 1);
try testing.expect(x.load(.SeqCst) & mask == 0);
// unsetting it again shouldn't change the bit
try testing.expectEqual(x.bitReset(bit, ordering), 0);
try testing.expect(x.load(.SeqCst) & mask == 0);
// all the previous bits should have not changed (still be reset)
for (0..bit_index) |prev_bit_index| {
const prev_bit = @as(std.math.Log2Int(Int), @intCast(prev_bit_index));
const prev_mask = @as(Int, 1) << prev_bit;
try testing.expect(x.load(.SeqCst) & prev_mask == 0);
}
}
}
}
}
test "Atomic.bitToggle" {
inline for (atomicIntTypes()) |Int| {
inline for (atomic_rmw_orderings) |ordering| {
var x = Atomic(Int).init(0);
for (0..@bitSizeOf(Int)) |bit_index| {
const bit = @as(std.math.Log2Int(Int), @intCast(bit_index));
const mask = @as(Int, 1) << bit;
// toggling the bit should change the bit
try testing.expect(x.load(.SeqCst) & mask == 0);
try testing.expectEqual(x.bitToggle(bit, ordering), 0);
try testing.expect(x.load(.SeqCst) & mask != 0);
// toggling it again *should* change the bit
try testing.expectEqual(x.bitToggle(bit, ordering), 1);
try testing.expect(x.load(.SeqCst) & mask == 0);
// all the previous bits should have not changed (still be toggled back)
for (0..bit_index) |prev_bit_index| {
const prev_bit = @as(std.math.Log2Int(Int), @intCast(prev_bit_index));
const prev_mask = @as(Int, 1) << prev_bit;
try testing.expect(x.load(.SeqCst) & prev_mask == 0);
}
}
}
}
}


@ -1,413 +0,0 @@
const std = @import("../std.zig");
const builtin = @import("builtin");
const assert = std.debug.assert;
const expect = std.testing.expect;
/// Many producer, many consumer, non-allocating, thread-safe.
/// Uses a mutex to protect access.
/// The queue does not manage ownership and the user is responsible to
/// manage the storage of the nodes.
pub fn Queue(comptime T: type) type {
return struct {
head: ?*Node,
tail: ?*Node,
mutex: std.Thread.Mutex,
pub const Self = @This();
pub const Node = std.DoublyLinkedList(T).Node;
/// Initializes a new queue. The queue does not provide a `deinit()`
/// function, so the user must take care of cleaning up the queue elements.
pub fn init() Self {
return Self{
.head = null,
.tail = null,
.mutex = std.Thread.Mutex{},
};
}
/// Appends `node` to the queue.
/// The lifetime of `node` must be longer than the lifetime of the queue.
pub fn put(self: *Self, node: *Node) void {
node.next = null;
self.mutex.lock();
defer self.mutex.unlock();
node.prev = self.tail;
self.tail = node;
if (node.prev) |prev_tail| {
prev_tail.next = node;
} else {
assert(self.head == null);
self.head = node;
}
}
/// Gets a previously inserted node or returns `null` if there is none.
/// It is safe to `get()` a node from the queue while another thread tries
/// to `remove()` the same node at the same time.
pub fn get(self: *Self) ?*Node {
self.mutex.lock();
defer self.mutex.unlock();
const head = self.head orelse return null;
self.head = head.next;
if (head.next) |new_head| {
new_head.prev = null;
} else {
self.tail = null;
}
// This way, a get() and a remove() are thread-safe with each other.
head.prev = null;
head.next = null;
return head;
}
/// Prepends `node` to the front of the queue.
/// The lifetime of `node` must be longer than the lifetime of the queue.
pub fn unget(self: *Self, node: *Node) void {
node.prev = null;
self.mutex.lock();
defer self.mutex.unlock();
const opt_head = self.head;
self.head = node;
if (opt_head) |old_head| {
node.next = old_head;
} else {
assert(self.tail == null);
self.tail = node;
}
}
/// Removes a node from the queue, returns whether node was actually removed.
/// It is safe to `remove()` a node from the queue while another thread tries
/// to `get()` the same node at the same time.
pub fn remove(self: *Self, node: *Node) bool {
self.mutex.lock();
defer self.mutex.unlock();
if (node.prev == null and node.next == null and self.head != node) {
return false;
}
if (node.prev) |prev| {
prev.next = node.next;
} else {
self.head = node.next;
}
if (node.next) |next| {
next.prev = node.prev;
} else {
self.tail = node.prev;
}
node.prev = null;
node.next = null;
return true;
}
/// Returns `true` if the queue is currently empty.
/// Note that in a multi-consumer environment a return value of `false`
/// does not mean that `get` will yield a non-`null` value!
pub fn isEmpty(self: *Self) bool {
self.mutex.lock();
defer self.mutex.unlock();
return self.head == null;
}
/// Dumps the contents of the queue to `stderr`.
pub fn dump(self: *Self) void {
self.dumpToStream(std.io.getStdErr().writer()) catch return;
}
/// Dumps the contents of the queue to `stream`.
/// Up to 4 elements from the head are dumped and the tail of the queue is
/// dumped as well.
pub fn dumpToStream(self: *Self, stream: anytype) !void {
const S = struct {
fn dumpRecursive(
s: anytype,
optional_node: ?*Node,
indent: usize,
comptime depth: comptime_int,
) !void {
try s.writeByteNTimes(' ', indent);
if (optional_node) |node| {
try s.print("0x{x}={}\n", .{ @intFromPtr(node), node.data });
if (depth == 0) {
try s.print("(max depth)\n", .{});
return;
}
try dumpRecursive(s, node.next, indent + 1, depth - 1);
} else {
try s.print("(null)\n", .{});
}
}
};
self.mutex.lock();
defer self.mutex.unlock();
try stream.print("head: ", .{});
try S.dumpRecursive(stream, self.head, 0, 4);
try stream.print("tail: ", .{});
try S.dumpRecursive(stream, self.tail, 0, 4);
}
};
}
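// A minimal single-threaded sketch of the ownership rules documented above: the
// caller provides the node storage, and `remove` reports whether the node was
// actually unlinked. The element type and values here are illustrative only.
test "std.atomic.Queue caller-owned nodes sketch" {
    var queue = Queue(u8).init();
    var node = Queue(u8).Node{ .data = 42, .next = undefined, .prev = undefined };
    queue.put(&node); // caller-owned storage; must stay valid while enqueued
    try expect(queue.get().?.data == 42);
    queue.put(&node); // a node may be reused once it is out of the queue
    try expect(queue.remove(&node)); // present: unlinked, returns true
    try expect(!queue.remove(&node)); // no longer present: returns false
    try expect(queue.isEmpty());
}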
const Context = struct {
allocator: std.mem.Allocator,
queue: *Queue(i32),
put_sum: isize,
get_sum: isize,
get_count: usize,
puts_done: bool,
};
// TODO add lazy evaluated build options and then put puts_per_thread behind
// some option such as: "AggressiveMultithreadedFuzzTest". In the AppVeyor CI
// we would use a less aggressive setting: at 1 core, while we still want this
// test to pass, we need a smaller value because there is so much thrashing.
// We would also use a less aggressive setting when running in valgrind.
const puts_per_thread = 500;
const put_thread_count = 3;
test "std.atomic.Queue" {
const plenty_of_memory = try std.heap.page_allocator.alloc(u8, 300 * 1024);
defer std.heap.page_allocator.free(plenty_of_memory);
var fixed_buffer_allocator = std.heap.FixedBufferAllocator.init(plenty_of_memory);
const a = fixed_buffer_allocator.threadSafeAllocator();
var queue = Queue(i32).init();
var context = Context{
.allocator = a,
.queue = &queue,
.put_sum = 0,
.get_sum = 0,
.puts_done = false,
.get_count = 0,
};
if (builtin.single_threaded) {
try expect(context.queue.isEmpty());
{
var i: usize = 0;
while (i < put_thread_count) : (i += 1) {
try expect(startPuts(&context) == 0);
}
}
try expect(!context.queue.isEmpty());
context.puts_done = true;
{
var i: usize = 0;
while (i < put_thread_count) : (i += 1) {
try expect(startGets(&context) == 0);
}
}
try expect(context.queue.isEmpty());
} else {
try expect(context.queue.isEmpty());
var putters: [put_thread_count]std.Thread = undefined;
for (&putters) |*t| {
t.* = try std.Thread.spawn(.{}, startPuts, .{&context});
}
var getters: [put_thread_count]std.Thread = undefined;
for (&getters) |*t| {
t.* = try std.Thread.spawn(.{}, startGets, .{&context});
}
for (putters) |t|
t.join();
@atomicStore(bool, &context.puts_done, true, .SeqCst);
for (getters) |t|
t.join();
try expect(context.queue.isEmpty());
}
if (context.put_sum != context.get_sum) {
std.debug.panic("failure\nput_sum:{} != get_sum:{}", .{ context.put_sum, context.get_sum });
}
if (context.get_count != puts_per_thread * put_thread_count) {
std.debug.panic("failure\nget_count:{} != puts_per_thread:{} * put_thread_count:{}", .{
context.get_count,
@as(u32, puts_per_thread),
@as(u32, put_thread_count),
});
}
}
fn startPuts(ctx: *Context) u8 {
var put_count: usize = puts_per_thread;
var prng = std.rand.DefaultPrng.init(0xdeadbeef);
const random = prng.random();
while (put_count != 0) : (put_count -= 1) {
std.time.sleep(1); // let the os scheduler be our fuzz
const x = @as(i32, @bitCast(random.int(u32)));
const node = ctx.allocator.create(Queue(i32).Node) catch unreachable;
node.* = .{
.prev = undefined,
.next = undefined,
.data = x,
};
ctx.queue.put(node);
_ = @atomicRmw(isize, &ctx.put_sum, .Add, x, .SeqCst);
}
return 0;
}
fn startGets(ctx: *Context) u8 {
while (true) {
const last = @atomicLoad(bool, &ctx.puts_done, .SeqCst);
while (ctx.queue.get()) |node| {
std.time.sleep(1); // let the os scheduler be our fuzz
_ = @atomicRmw(isize, &ctx.get_sum, .Add, node.data, .SeqCst);
_ = @atomicRmw(usize, &ctx.get_count, .Add, 1, .SeqCst);
}
if (last) return 0;
}
}
test "std.atomic.Queue single-threaded" {
var queue = Queue(i32).init();
try expect(queue.isEmpty());
var node_0 = Queue(i32).Node{
.data = 0,
.next = undefined,
.prev = undefined,
};
queue.put(&node_0);
try expect(!queue.isEmpty());
var node_1 = Queue(i32).Node{
.data = 1,
.next = undefined,
.prev = undefined,
};
queue.put(&node_1);
try expect(!queue.isEmpty());
try expect(queue.get().?.data == 0);
try expect(!queue.isEmpty());
var node_2 = Queue(i32).Node{
.data = 2,
.next = undefined,
.prev = undefined,
};
queue.put(&node_2);
try expect(!queue.isEmpty());
var node_3 = Queue(i32).Node{
.data = 3,
.next = undefined,
.prev = undefined,
};
queue.put(&node_3);
try expect(!queue.isEmpty());
try expect(queue.get().?.data == 1);
try expect(!queue.isEmpty());
try expect(queue.get().?.data == 2);
try expect(!queue.isEmpty());
var node_4 = Queue(i32).Node{
.data = 4,
.next = undefined,
.prev = undefined,
};
queue.put(&node_4);
try expect(!queue.isEmpty());
try expect(queue.get().?.data == 3);
node_3.next = null;
try expect(!queue.isEmpty());
queue.unget(&node_3);
try expect(queue.get().?.data == 3);
try expect(!queue.isEmpty());
try expect(queue.get().?.data == 4);
try expect(queue.isEmpty());
try expect(queue.get() == null);
try expect(queue.isEmpty());
// unget an empty queue
queue.unget(&node_4);
try expect(queue.tail == &node_4);
try expect(queue.head == &node_4);
try expect(queue.get().?.data == 4);
try expect(queue.get() == null);
try expect(queue.isEmpty());
}
test "std.atomic.Queue dump" {
const mem = std.mem;
var buffer: [1024]u8 = undefined;
var expected_buffer: [1024]u8 = undefined;
var fbs = std.io.fixedBufferStream(&buffer);
var queue = Queue(i32).init();
// Test empty stream
fbs.reset();
try queue.dumpToStream(fbs.writer());
try expect(mem.eql(u8, buffer[0..fbs.pos],
\\head: (null)
\\tail: (null)
\\
));
// Test a stream with one element
var node_0 = Queue(i32).Node{
.data = 1,
.next = undefined,
.prev = undefined,
};
queue.put(&node_0);
fbs.reset();
try queue.dumpToStream(fbs.writer());
var expected = try std.fmt.bufPrint(expected_buffer[0..],
\\head: 0x{x}=1
\\ (null)
\\tail: 0x{x}=1
\\ (null)
\\
, .{ @intFromPtr(queue.head), @intFromPtr(queue.tail) });
try expect(mem.eql(u8, buffer[0..fbs.pos], expected));
// Test a stream with two elements
var node_1 = Queue(i32).Node{
.data = 2,
.next = undefined,
.prev = undefined,
};
queue.put(&node_1);
fbs.reset();
try queue.dumpToStream(fbs.writer());
expected = try std.fmt.bufPrint(expected_buffer[0..],
\\head: 0x{x}=1
\\ 0x{x}=2
\\ (null)
\\tail: 0x{x}=2
\\ (null)
\\
, .{ @intFromPtr(queue.head), @intFromPtr(queue.head.?.next), @intFromPtr(queue.tail) });
try expect(mem.eql(u8, buffer[0..fbs.pos], expected));
}

View File

@ -1,178 +0,0 @@
const std = @import("../std.zig");
const builtin = @import("builtin");
const assert = std.debug.assert;
const expect = std.testing.expect;
/// Many reader, many writer, non-allocating, thread-safe.
/// Uses a spinlock to protect `push()` and `pop()`.
/// When building in single threaded mode, this is a simple linked list.
pub fn Stack(comptime T: type) type {
return struct {
root: ?*Node,
lock: @TypeOf(lock_init),
const lock_init = if (builtin.single_threaded) {} else false;
pub const Self = @This();
pub const Node = struct {
next: ?*Node,
data: T,
};
pub fn init() Self {
return Self{
.root = null,
.lock = lock_init,
};
}
/// Push operation that only succeeds if the stack is currently empty, i.e. if
/// `node` would become the first item. On success, `null` is returned; on
/// failure, nothing is pushed and the node already at the root is returned.
pub fn pushFirst(self: *Self, node: *Node) ?*Node {
node.next = null;
return @cmpxchgStrong(?*Node, &self.root, null, node, .SeqCst, .SeqCst);
}
pub fn push(self: *Self, node: *Node) void {
if (builtin.single_threaded) {
node.next = self.root;
self.root = node;
} else {
while (@atomicRmw(bool, &self.lock, .Xchg, true, .SeqCst)) {}
defer assert(@atomicRmw(bool, &self.lock, .Xchg, false, .SeqCst));
node.next = self.root;
self.root = node;
}
}
pub fn pop(self: *Self) ?*Node {
if (builtin.single_threaded) {
const root = self.root orelse return null;
self.root = root.next;
return root;
} else {
while (@atomicRmw(bool, &self.lock, .Xchg, true, .SeqCst)) {}
defer assert(@atomicRmw(bool, &self.lock, .Xchg, false, .SeqCst));
const root = self.root orelse return null;
self.root = root.next;
return root;
}
}
pub fn isEmpty(self: *Self) bool {
return @atomicLoad(?*Node, &self.root, .SeqCst) == null;
}
};
}
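// A minimal sketch of the `pushFirst` contract described above: it succeeds
// (returning null) only when the stack is empty; otherwise nothing is pushed
// and the existing root node is returned. Values here are illustrative only.
test "std.atomic.stack pushFirst sketch" {
    var stack = Stack(u8).init();
    var first = Stack(u8).Node{ .data = 1, .next = undefined };
    var second = Stack(u8).Node{ .data = 2, .next = undefined };
    try expect(stack.pushFirst(&first) == null); // empty: `first` becomes the root
    try expect(stack.pushFirst(&second).? == &first); // occupied: existing root returned
    try expect(stack.pop().?.data == 1);
    try expect(stack.pop() == null);
}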
const Context = struct {
allocator: std.mem.Allocator,
stack: *Stack(i32),
put_sum: isize,
get_sum: isize,
get_count: usize,
puts_done: bool,
};
// TODO add lazy evaluated build options and then put puts_per_thread behind
// some option such as: "AggressiveMultithreadedFuzzTest". In the AppVeyor CI
// we would use a less aggressive setting: at 1 core, while we still want this
// test to pass, we need a smaller value because there is so much thrashing.
// We would also use a less aggressive setting when running in valgrind.
const puts_per_thread = 500;
const put_thread_count = 3;
test "std.atomic.stack" {
const plenty_of_memory = try std.heap.page_allocator.alloc(u8, 300 * 1024);
defer std.heap.page_allocator.free(plenty_of_memory);
var fixed_buffer_allocator = std.heap.FixedBufferAllocator.init(plenty_of_memory);
const a = fixed_buffer_allocator.threadSafeAllocator();
var stack = Stack(i32).init();
var context = Context{
.allocator = a,
.stack = &stack,
.put_sum = 0,
.get_sum = 0,
.puts_done = false,
.get_count = 0,
};
if (builtin.single_threaded) {
{
var i: usize = 0;
while (i < put_thread_count) : (i += 1) {
try expect(startPuts(&context) == 0);
}
}
context.puts_done = true;
{
var i: usize = 0;
while (i < put_thread_count) : (i += 1) {
try expect(startGets(&context) == 0);
}
}
} else {
var putters: [put_thread_count]std.Thread = undefined;
for (&putters) |*t| {
t.* = try std.Thread.spawn(.{}, startPuts, .{&context});
}
var getters: [put_thread_count]std.Thread = undefined;
for (&getters) |*t| {
t.* = try std.Thread.spawn(.{}, startGets, .{&context});
}
for (putters) |t|
t.join();
@atomicStore(bool, &context.puts_done, true, .SeqCst);
for (getters) |t|
t.join();
}
if (context.put_sum != context.get_sum) {
std.debug.panic("failure\nput_sum:{} != get_sum:{}", .{ context.put_sum, context.get_sum });
}
if (context.get_count != puts_per_thread * put_thread_count) {
std.debug.panic("failure\nget_count:{} != puts_per_thread:{} * put_thread_count:{}", .{
context.get_count,
@as(u32, puts_per_thread),
@as(u32, put_thread_count),
});
}
}
fn startPuts(ctx: *Context) u8 {
var put_count: usize = puts_per_thread;
var prng = std.rand.DefaultPrng.init(0xdeadbeef);
const random = prng.random();
while (put_count != 0) : (put_count -= 1) {
std.time.sleep(1); // let the os scheduler be our fuzz
const x = @as(i32, @bitCast(random.int(u32)));
const node = ctx.allocator.create(Stack(i32).Node) catch unreachable;
node.* = Stack(i32).Node{
.next = undefined,
.data = x,
};
ctx.stack.push(node);
_ = @atomicRmw(isize, &ctx.put_sum, .Add, x, .SeqCst);
}
return 0;
}
fn startGets(ctx: *Context) u8 {
while (true) {
const last = @atomicLoad(bool, &ctx.puts_done, .SeqCst);
while (ctx.stack.pop()) |node| {
std.time.sleep(1); // let the os scheduler be our fuzz
_ = @atomicRmw(isize, &ctx.get_sum, .Add, node.data, .SeqCst);
_ = @atomicRmw(usize, &ctx.get_count, .Add, 1, .SeqCst);
}
if (last) return 0;
}
}

View File

@ -1286,7 +1286,7 @@ fn windowsMakePipeIn(rd: *?windows.HANDLE, wr: *?windows.HANDLE, sattr: *const w
wr.* = wr_h;
}
var pipe_name_counter = std.atomic.Atomic(u32).init(1);
var pipe_name_counter = std.atomic.Value(u32).init(1);
fn windowsMakeAsyncPipe(rd: *?windows.HANDLE, wr: *?windows.HANDLE, sattr: *const windows.SECURITY_ATTRIBUTES) !void {
var tmp_bufw: [128]u16 = undefined;
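// The rename above is mechanical: `std.atomic.Atomic(T)` is now spelled
// `std.atomic.Value(T)`, keeping the same `init`, `load`, `store`, and
// fetch-style methods. A hedged sketch of the pattern (the counter name and
// ordering below are illustrative, not taken from this file):
//
//     var counter = std.atomic.Value(u32).init(1);
//     const id = counter.fetchAdd(1, .SeqCst);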

View File

@ -375,7 +375,7 @@ pub fn panicExtra(
/// Non-zero whenever the program triggered a panic.
/// The counter is incremented/decremented atomically.
var panicking = std.atomic.Atomic(u8).init(0);
var panicking = std.atomic.Value(u8).init(0);
// Locked to avoid interleaving panic messages from multiple threads.
var panic_mutex = std.Thread.Mutex{};
@ -448,7 +448,7 @@ fn waitForOtherThreadToFinishPanicking() void {
if (builtin.single_threaded) unreachable;
// Sleep forever without hammering the CPU
var futex = std.atomic.Atomic(u32).init(0);
var futex = std.atomic.Value(u32).init(0);
while (true) std.Thread.Futex.wait(&futex, 0);
unreachable;
}

View File

@ -7,7 +7,6 @@ const os = std.os;
const windows = os.windows;
const maxInt = std.math.maxInt;
const Thread = std.Thread;
const Atomic = std.atomic.Atomic;
const is_windows = builtin.os.tag == .windows;
@ -854,7 +853,7 @@ pub const Loop = struct {
waiters: Waiters,
thread: std.Thread,
event: std.Thread.ResetEvent,
is_running: Atomic(bool),
is_running: std.atomic.Value(bool),
/// Initialize the delay queue by spawning the timer thread
/// and starting any timer resources.
@ -866,7 +865,7 @@ pub const Loop = struct {
},
.thread = undefined,
.event = .{},
.is_running = Atomic(bool).init(true),
.is_running = std.atomic.Value(bool).init(true),
};
// Must be after init so that it can read the other state, such as `is_running`.

View File

@ -6461,7 +6461,7 @@ pub const CopyFileRangeError = error{
CorruptedData,
} || PReadError || PWriteError || UnexpectedError;
var has_copy_file_range_syscall = std.atomic.Atomic(bool).init(true);
var has_copy_file_range_syscall = std.atomic.Value(bool).init(true);
/// Transfer data between file descriptors at specified offsets.
/// Returns the number of bytes written, which can be less than requested.

View File

@ -322,7 +322,7 @@ const PanicSwitch = struct {
/// Updated atomically before taking the panic_mutex.
/// In recoverable cases, the program will not abort
/// until all panicking threads have dumped their traces.
var panicking = std.atomic.Atomic(u8).init(0);
var panicking = std.atomic.Value(u8).init(0);
// Locked to avoid interleaving panic messages from multiple threads.
var panic_mutex = std.Thread.Mutex{};
@ -477,7 +477,7 @@ const PanicSwitch = struct {
// and call abort()
// Sleep forever without hammering the CPU
var futex = std.atomic.Atomic(u32).init(0);
var futex = std.atomic.Value(u32).init(0);
while (true) std.Thread.Futex.wait(&futex, 0);
// This should be unreachable, recurse into recoverAbort.