Mirror of https://github.com/ziglang/zig.git (synced 2026-02-13 12:59:04 +00:00)

implement Mutex, Condition, and Queue

parent 012ef81b8b
commit a7790bd32e

lib/std/Io.zig | 322
@@ -1,7 +1,5 @@
+const std = @import("std.zig");
const builtin = @import("builtin");
-const root = @import("root");
-const c = std.c;
-const std = @import("std.zig");
const is_windows = builtin.os.tag == .windows;
const windows = std.os.windows;
const posix = std.posix;
@@ -9,8 +7,6 @@ const math = std.math;
const assert = std.debug.assert;
const fs = std.fs;
const mem = std.mem;
const meta = std.meta;
const File = std.fs.File;
const Allocator = std.mem.Allocator;
const Alignment = std.mem.Alignment;
@@ -972,6 +968,12 @@ pub const VTable = struct {
    /// Thread-safe.
    cancelRequested: *const fn (?*anyopaque) bool,
+
+    mutexLock: *const fn (?*anyopaque, mutex: *Mutex) void,
+    mutexUnlock: *const fn (?*anyopaque, mutex: *Mutex) void,
+
+    conditionWait: *const fn (?*anyopaque, cond: *Condition, mutex: *Mutex, timeout_ns: ?u64) Condition.WaitError!void,
+    conditionWake: *const fn (?*anyopaque, cond: *Condition, notify: Condition.Notify) void,
+
    createFile: *const fn (?*anyopaque, dir: fs.Dir, sub_path: []const u8, flags: fs.File.CreateFlags) FileOpenError!fs.File,
    openFile: *const fn (?*anyopaque, dir: fs.Dir, sub_path: []const u8, flags: fs.File.OpenFlags) FileOpenError!fs.File,
    closeFile: *const fn (?*anyopaque, fs.File) void,
@@ -985,11 +987,11 @@
pub const OpenFlags = fs.File.OpenFlags;
pub const CreateFlags = fs.File.CreateFlags;

-pub const FileOpenError = fs.File.OpenError || error{AsyncCancel};
-pub const FileReadError = fs.File.ReadError || error{AsyncCancel};
-pub const FilePReadError = fs.File.PReadError || error{AsyncCancel};
-pub const FileWriteError = fs.File.WriteError || error{AsyncCancel};
-pub const FilePWriteError = fs.File.PWriteError || error{AsyncCancel};
+pub const FileOpenError = fs.File.OpenError || error{Canceled};
+pub const FileReadError = fs.File.ReadError || error{Canceled};
+pub const FilePReadError = fs.File.PReadError || error{Canceled};
+pub const FileWriteError = fs.File.WriteError || error{Canceled};
+pub const FilePWriteError = fs.File.PWriteError || error{Canceled};

pub const Timestamp = enum(i96) {
    _,
@@ -1006,8 +1008,8 @@ pub const Deadline = union(enum) {
    nanoseconds: i96,
    timestamp: Timestamp,
};
-pub const ClockGetTimeError = std.posix.ClockGetTimeError || error{AsyncCancel};
-pub const SleepError = error{ UnsupportedClock, Unexpected, AsyncCancel };
+pub const ClockGetTimeError = std.posix.ClockGetTimeError || error{Canceled};
+pub const SleepError = error{ UnsupportedClock, Unexpected, Canceled };

pub const AnyFuture = opaque {};
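Call sites that previously matched on error.AsyncCancel now match on error.Canceled. A minimal sketch of handling the renamed error against the vtable API above (illustrative only; the function and file name are invented, not part of the commit):

fn openOrGiveUp(io: Io, dir: fs.Dir) FileOpenError!fs.File {
    return io.vtable.openFile(io.userdata, dir, "data.txt", .{}) catch |err| switch (err) {
        // The surrounding async operation was canceled; propagate it.
        error.Canceled => return error.Canceled,
        else => |e| return e,
    };
}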

@@ -1036,6 +1038,302 @@ pub fn Future(Result: type) type {
    };
}

pub const Mutex = struct {
    state: std.atomic.Value(u32) = std.atomic.Value(u32).init(unlocked),

    pub const unlocked: u32 = 0b00;
    pub const locked: u32 = 0b01;
    pub const contended: u32 = 0b11; // must contain the `locked` bit for x86 optimization below

    pub fn tryLock(m: *Mutex) bool {
        // On x86, use `lock bts` instead of `lock cmpxchg` as:
        // - they both seem to mark the cache-line as modified regardless: https://stackoverflow.com/a/63350048
        // - `lock bts` is smaller instruction-wise which makes it better for inlining
        if (builtin.target.cpu.arch.isX86()) {
            const locked_bit = @ctz(locked);
            return m.state.bitSet(locked_bit, .acquire) == 0;
        }

        // Acquire barrier ensures grabbing the lock happens before the critical section
        // and that the previous lock holder's critical section happens before we grab the lock.
        return m.state.cmpxchgWeak(unlocked, locked, .acquire, .monotonic) == null;
    }

    /// Avoids the vtable for uncontended locks.
    pub fn lock(m: *Mutex, io: Io) void {
        if (!m.tryLock()) {
            @branchHint(.unlikely);
            io.vtable.mutexLock(io.userdata, m);
        }
    }

    pub fn unlock(m: *Mutex, io: Io) void {
        io.vtable.mutexUnlock(io.userdata, m);
    }
};
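A usage sketch (illustrative, not part of the commit): callers thread an `io` instance through lock/unlock so that a blocking implementation can park the caller. The `Counter` type is invented for the example.

const Counter = struct {
    mutex: Mutex = .{},
    value: u64 = 0,

    fn increment(c: *Counter, io: Io) void {
        c.mutex.lock(io); // fast path: tryLock() avoids the vtable call entirely
        defer c.mutex.unlock(io);
        c.value += 1;
    }
};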

pub const Condition = struct {
    state: u64 = 0,

    pub const WaitError = error{
        Timeout,
        Canceled,
    };

    /// How many waiters to wake up.
    pub const Notify = enum {
        one,
        all,
    };

    pub fn wait(cond: *Condition, io: Io, mutex: *Mutex) void {
        io.vtable.conditionWait(io.userdata, cond, mutex, null) catch |err| switch (err) {
            error.Timeout => unreachable, // no timeout provided so we shouldn't have timed out
            error.Canceled => return, // handled as a spurious wakeup
        };
    }

    pub fn timedWait(cond: *Condition, io: Io, mutex: *Mutex, timeout_ns: u64) WaitError!void {
        return io.vtable.conditionWait(io.userdata, cond, mutex, timeout_ns);
    }

    pub fn signal(cond: *Condition, io: Io) void {
        io.vtable.conditionWake(io.userdata, cond, .one);
    }

    pub fn broadcast(cond: *Condition, io: Io) void {
        io.vtable.conditionWake(io.userdata, cond, .all);
    }
};
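Because cancellation surfaces in `wait` as a spurious wakeup, callers are expected to re-check their predicate in a loop. An illustrative sketch (the `Flag` type is invented for the example):

const Flag = struct {
    mutex: Mutex = .{},
    cond: Condition = .{},
    ready: bool = false,

    fn waitUntilReady(f: *Flag, io: Io) void {
        f.mutex.lock(io);
        defer f.mutex.unlock(io);
        // Re-check the predicate: wakeups may be spurious, including cancellation.
        while (!f.ready) f.cond.wait(io, &f.mutex);
    }

    fn markReady(f: *Flag, io: Io) void {
        f.mutex.lock(io);
        defer f.mutex.unlock(io);
        f.ready = true;
        f.cond.signal(io);
    }
};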

pub const TypeErasedQueue = struct {
    mutex: Mutex,

    /// Ring buffer. This data is logically *after* queued getters.
    buffer: []u8,
    put_index: usize,
    get_index: usize,

    putters: std.DoublyLinkedList(PutNode),
    getters: std.DoublyLinkedList(GetNode),

    const PutNode = struct {
        remaining: []const u8,
        condition: Condition,
    };

    const GetNode = struct {
        remaining: []u8,
        condition: Condition,
    };

    pub fn init(buffer: []u8) TypeErasedQueue {
        return .{
            .mutex = .{},
            .buffer = buffer,
            .put_index = 0,
            .get_index = 0,
            .putters = .{},
            .getters = .{},
        };
    }

    pub fn put(q: *TypeErasedQueue, io: Io, elements: []const u8, min: usize) usize {
        assert(elements.len >= min);

        q.mutex.lock(io);
        defer q.mutex.unlock(io);

        // Getters have first priority on the data, and only when the getters
        // queue is empty do we start populating the buffer.

        var remaining = elements;
        while (true) {
            const getter = q.getters.popFirst() orelse break;
            const copy_len = @min(getter.data.remaining.len, remaining.len);
            @memcpy(getter.data.remaining[0..copy_len], remaining[0..copy_len]);
            remaining = remaining[copy_len..];
            getter.data.remaining = getter.data.remaining[copy_len..];
            if (getter.data.remaining.len == 0) {
                getter.data.condition.signal(io);
                continue;
            }
            q.getters.prepend(getter);
            assert(remaining.len == 0);
            return elements.len;
        }

        while (true) {
            {
                const available = q.buffer[q.put_index..];
                const copy_len = @min(available.len, remaining.len);
                @memcpy(available[0..copy_len], remaining[0..copy_len]);
                remaining = remaining[copy_len..];
                q.put_index += copy_len;
                if (remaining.len == 0) return elements.len;
            }
            {
                const available = q.buffer[0..q.get_index];
                const copy_len = @min(available.len, remaining.len);
                @memcpy(available[0..copy_len], remaining[0..copy_len]);
                remaining = remaining[copy_len..];
                q.put_index = copy_len;
                if (remaining.len == 0) return elements.len;
            }

            const total_filled = elements.len - remaining.len;
            if (total_filled >= min) return total_filled;

            var node: std.DoublyLinkedList(PutNode).Node = .{
                .data = .{ .remaining = remaining, .condition = .{} },
            };
            q.putters.append(&node);
            node.data.condition.wait(io, &q.mutex);
            remaining = node.data.remaining;
        }
    }

    pub fn get(q: *@This(), io: Io, buffer: []u8, min: usize) usize {
        assert(buffer.len >= min);

        q.mutex.lock(io);
        defer q.mutex.unlock(io);

        // The ring buffer gets first priority, then data should come from any
        // queued putters, then finally the ring buffer should be filled with
        // data from putters so they can be resumed.

        var remaining = buffer;
        while (true) {
            if (q.get_index <= q.put_index) {
                const available = q.buffer[q.get_index..q.put_index];
                const copy_len = @min(available.len, remaining.len);
                @memcpy(remaining[0..copy_len], available[0..copy_len]);
                q.get_index += copy_len;
                remaining = remaining[copy_len..];
                if (remaining.len == 0) return fillRingBufferFromPutters(q, io, buffer.len);
            } else {
                {
                    const available = q.buffer[q.get_index..];
                    const copy_len = @min(available.len, remaining.len);
                    @memcpy(remaining[0..copy_len], available[0..copy_len]);
                    q.get_index += copy_len;
                    remaining = remaining[copy_len..];
                    if (remaining.len == 0) return fillRingBufferFromPutters(q, io, buffer.len);
                }
                {
                    const available = q.buffer[0..q.put_index];
                    const copy_len = @min(available.len, remaining.len);
                    @memcpy(remaining[0..copy_len], available[0..copy_len]);
                    q.get_index = copy_len;
                    remaining = remaining[copy_len..];
                    if (remaining.len == 0) return fillRingBufferFromPutters(q, io, buffer.len);
                }
            }
            // Copy directly from putters into buffer.
            while (remaining.len > 0) {
                const putter = q.putters.popFirst() orelse break;
                const copy_len = @min(putter.data.remaining.len, remaining.len);
                @memcpy(remaining[0..copy_len], putter.data.remaining[0..copy_len]);
                putter.data.remaining = putter.data.remaining[copy_len..];
                remaining = remaining[copy_len..];
                if (putter.data.remaining.len == 0) {
                    putter.data.condition.signal(io);
                } else {
                    assert(remaining.len == 0);
                    q.putters.prepend(putter);
                    return fillRingBufferFromPutters(q, io, buffer.len);
                }
            }
            // Both the ring buffer and the putters queue are empty.
            const total_filled = buffer.len - remaining.len;
            if (total_filled >= min) return total_filled;

            var node: std.DoublyLinkedList(GetNode).Node = .{
                .data = .{ .remaining = remaining, .condition = .{} },
            };
            q.getters.append(&node);
            node.data.condition.wait(io, &q.mutex);
            remaining = node.data.remaining;
        }
    }

    /// Called when there is nonzero space available in the ring buffer and
    /// potentially putters waiting. The mutex is already held and the task is
    /// to copy putter data to the ring buffer and signal any putters whose
    /// buffers have been fully copied.
    fn fillRingBufferFromPutters(q: *TypeErasedQueue, io: Io, len: usize) usize {
        while (true) {
            const putter = q.putters.popFirst() orelse return len;
            const available = q.buffer[q.put_index..];
            const copy_len = @min(available.len, putter.data.remaining.len);
            @memcpy(available[0..copy_len], putter.data.remaining[0..copy_len]);
            putter.data.remaining = putter.data.remaining[copy_len..];
            q.put_index += copy_len;
            if (putter.data.remaining.len == 0) {
                putter.data.condition.signal(io);
                continue;
            }
            const second_available = q.buffer[0..q.get_index];
            const second_copy_len = @min(second_available.len, putter.data.remaining.len);
            @memcpy(second_available[0..second_copy_len], putter.data.remaining[0..second_copy_len]);
            // Advance past the bytes copied in this second, wrapped step.
            putter.data.remaining = putter.data.remaining[second_copy_len..];
            q.put_index = second_copy_len;
            if (putter.data.remaining.len == 0) {
                putter.data.condition.signal(io);
                continue;
            }
            q.putters.prepend(putter);
            return len;
        }
    }
};
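To make the two-step copies above concrete, here is an illustrative wraparound state (values invented for the example, not taken from the commit):

// buffer.len == 4, put_index == 1, get_index == 3:
//
//   index:    0     1     2     3
//   buffer: [ B ] [ -- ] [ -- ] [ A ]
//
// The queued bytes are A then B. Since get_index > put_index, `get` copies
// buffer[3..] first, then wraps and copies buffer[0..put_index], leaving
// get_index == 1. Symmetrically, `put` fills buffer[put_index..] before
// wrapping into buffer[0..get_index].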

/// Many producer, many consumer, thread-safe, runtime configurable buffer size.
/// When buffer is empty, consumers suspend and are resumed by producers.
/// When buffer is full, producers suspend and are resumed by consumers.
pub fn Queue(Elem: type) type {
    return struct {
        type_erased: TypeErasedQueue,

        pub fn init(buffer: []Elem) @This() {
            return .{ .type_erased = .init(@ptrCast(buffer)) };
        }

        /// Appends elements to the end of the queue. The function returns when
        /// at least `min` elements have been added to the buffer or sent
        /// directly to a consumer.
        ///
        /// Returns how many elements have been added to the queue.
        ///
        /// Asserts that `elements.len >= min`.
        pub fn put(q: *@This(), io: Io, elements: []const Elem, min: usize) usize {
            return @divExact(q.type_erased.put(io, @ptrCast(elements), min * @sizeOf(Elem)), @sizeOf(Elem));
        }

        /// Receives elements from the beginning of the queue. The function
        /// returns when at least `min` elements have been populated inside
        /// `buffer`.
        ///
        /// Returns how many elements of `buffer` have been populated.
        ///
        /// Asserts that `buffer.len >= min`.
        pub fn get(q: *@This(), io: Io, buffer: []Elem, min: usize) usize {
            return @divExact(q.type_erased.get(io, @ptrCast(buffer), min * @sizeOf(Elem)), @sizeOf(Elem));
        }

        pub fn putOne(q: *@This(), io: Io, item: Elem) void {
            assert(q.put(io, &.{item}, 1) == 1);
        }

        pub fn getOne(q: *@This(), io: Io) Elem {
            var buf: [1]Elem = undefined;
            assert(q.get(io, &buf, 1) == 1);
            return buf[0];
        }
    };
}
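An illustrative producer/consumer pairing (invented for the example; any conforming `Io` implementation would do):

var buffer: [16]u32 = undefined;
var queue: Queue(u32) = .init(&buffer);

fn producer(io: Io) void {
    var i: u32 = 0;
    while (i < 100) : (i += 1) queue.putOne(io, i); // suspends while the buffer is full
}

fn consumer(io: Io) u64 {
    var sum: u64 = 0;
    var i: u32 = 0;
    while (i < 100) : (i += 1) sum += queue.getOne(io); // suspends while the queue is empty
    return sum;
}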

/// Calls `function` with `args`, such that the return value of the function is
/// not guaranteed to be available until `await` is called.
pub fn async(io: Io, function: anytype, args: anytype) Future(@typeInfo(@TypeOf(function)).@"fn".return_type.?) {
@@ -102,7 +102,7 @@ const Fiber = struct {
        return @ptrFromInt(alignment.forward(@intFromPtr(f) + @sizeOf(Fiber)));
    }

-    fn enterCancelRegion(fiber: *Fiber, thread: *Thread) error{AsyncCancel}!void {
+    fn enterCancelRegion(fiber: *Fiber, thread: *Thread) error{Canceled}!void {
        if (@cmpxchgStrong(
            ?*Thread,
            &fiber.cancel_thread,
@@ -112,7 +112,7 @@ const Fiber = struct {
            .acquire,
        )) |cancel_thread| {
            assert(cancel_thread == Thread.canceling);
-            return error.AsyncCancel;
+            return error.Canceled;
        }
    }
@@ -746,7 +746,7 @@ pub fn createFile(
    switch (errno(completion.result)) {
        .SUCCESS => return .{ .handle = completion.result },
        .INTR => unreachable,
-        .CANCELED => return error.AsyncCancel,
+        .CANCELED => return error.Canceled,

        .FAULT => unreachable,
        .INVAL => return error.BadPathName,
@@ -854,7 +854,7 @@ pub fn openFile(
    switch (errno(completion.result)) {
        .SUCCESS => return .{ .handle = completion.result },
        .INTR => unreachable,
-        .CANCELED => return error.AsyncCancel,
+        .CANCELED => return error.Canceled,

        .FAULT => unreachable,
        .INVAL => return error.BadPathName,
@@ -950,7 +950,7 @@ pub fn pread(userdata: ?*anyopaque, file: std.fs.File, buffer: []u8, offset: std
    switch (errno(completion.result)) {
        .SUCCESS => return @as(u32, @bitCast(completion.result)),
        .INTR => unreachable,
-        .CANCELED => return error.AsyncCancel,
+        .CANCELED => return error.Canceled,

        .INVAL => unreachable,
        .FAULT => unreachable,
@@ -1002,7 +1002,7 @@ pub fn pwrite(userdata: ?*anyopaque, file: std.fs.File, buffer: []const u8, offs
    switch (errno(completion.result)) {
        .SUCCESS => return @as(u32, @bitCast(completion.result)),
        .INTR => unreachable,
-        .CANCELED => return error.AsyncCancel,
+        .CANCELED => return error.Canceled,

        .INVAL => return error.InvalidArgument,
        .FAULT => unreachable,
@@ -1080,7 +1080,7 @@ pub fn sleep(userdata: ?*anyopaque, clockid: std.posix.clockid_t, deadline: Io.D
    switch (errno(completion.result)) {
        .SUCCESS, .TIME => return,
        .INTR => unreachable,
-        .CANCELED => return error.AsyncCancel,
+        .CANCELED => return error.Canceled,

        else => |err| return std.posix.unexpectedErrno(err),
    }
@@ -332,9 +332,12 @@ pub fn io(pool: *Pool) Io {
        .vtable = &.{
            .@"async" = @"async",
            .@"await" = @"await",

            .cancel = cancel,
            .cancelRequested = cancelRequested,
+            .mutexLock = mutexLock,
+            .mutexUnlock = mutexUnlock,
+            .conditionWait = conditionWait,
+            .conditionWake = conditionWake,

            .createFile = createFile,
            .openFile = openFile,
@@ -517,11 +520,179 @@ fn cancelRequested(userdata: ?*anyopaque) bool {
    return @atomicLoad(std.Thread.Id, &closure.cancel_tid, .acquire) == AsyncClosure.canceling_tid;
}

-fn checkCancel(pool: *Pool) error{AsyncCancel}!void {
-    if (cancelRequested(pool)) return error.AsyncCancel;
+fn checkCancel(pool: *Pool) error{Canceled}!void {
+    if (cancelRequested(pool)) return error.Canceled;
}

fn mutexLock(userdata: ?*anyopaque, m: *Io.Mutex) void {
    @branchHint(.cold);
    const pool: *std.Thread.Pool = @alignCast(@ptrCast(userdata));
    _ = pool;

    // Avoid doing an atomic swap below if we already know the state is contended.
    // An atomic swap unconditionally stores, which marks the cache-line as modified unnecessarily.
    if (m.state.load(.monotonic) == Io.Mutex.contended) {
        std.Thread.Futex.wait(&m.state, Io.Mutex.contended);
    }

    // Try to acquire the lock while also telling the existing lock holder that there are threads waiting.
    //
    // Once we sleep on the Futex, we must acquire the mutex using `contended` rather than `locked`.
    // If not, threads sleeping on the Futex wouldn't see the state change in unlock and could deadlock.
    // The downside is that the last mutex unlocker will see `contended` and do an unnecessary Futex wake,
    // but this is better than having to wake all waiting threads on mutex unlock.
    //
    // Acquire barrier ensures grabbing the lock happens before the critical section
    // and that the previous lock holder's critical section happens before we grab the lock.
    while (m.state.swap(Io.Mutex.contended, .acquire) != Io.Mutex.unlocked) {
        std.Thread.Futex.wait(&m.state, Io.Mutex.contended);
    }
}

fn mutexUnlock(userdata: ?*anyopaque, m: *Io.Mutex) void {
    const pool: *std.Thread.Pool = @alignCast(@ptrCast(userdata));
    _ = pool;
    // Needs to also wake up a waiting thread if any.
    //
    // A waiting thread will acquire with `contended` instead of `locked`,
    // which ensures that it wakes up another thread on the next unlock().
    //
    // Release barrier ensures the critical section happens before we let go of the lock
    // and that our critical section happens before the next lock holder grabs the lock.
    const state = m.state.swap(Io.Mutex.unlocked, .release);
    assert(state != Io.Mutex.unlocked);

    if (state == Io.Mutex.contended) {
        std.Thread.Futex.wake(&m.state, 1);
    }
}
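The unlocked/locked/contended protocol can be exercised without any Io implementation, since `tryLock` works directly on the atomic state. An illustrative test (not part of the commit):

test "Io.Mutex tryLock transitions (illustrative)" {
    var m: Io.Mutex = .{};
    try std.testing.expectEqual(Io.Mutex.unlocked, m.state.load(.monotonic));

    try std.testing.expect(m.tryLock()); // unlocked -> locked
    try std.testing.expect(m.state.load(.monotonic) & Io.Mutex.locked != 0);

    try std.testing.expect(!m.tryLock()); // already held: acquisition fails
}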

fn mutexLockInternal(pool: *std.Thread.Pool, m: *Io.Mutex) void {
    if (!m.tryLock()) {
        @branchHint(.unlikely);
        mutexLock(pool, m);
    }
}

fn conditionWait(
    userdata: ?*anyopaque,
    cond: *Io.Condition,
    mutex: *Io.Mutex,
    timeout: ?u64,
) Io.Condition.WaitError!void {
    const pool: *std.Thread.Pool = @alignCast(@ptrCast(userdata));
    comptime assert(@TypeOf(cond.state) == u64);
    const ints: *[2]std.atomic.Value(u32) = @ptrCast(&cond.state);
    const cond_state = &ints[0];
    const cond_epoch = &ints[1];
    const one_waiter = 1;
    const waiter_mask = 0xffff;
    const one_signal = 1 << 16;
    const signal_mask = 0xffff << 16;
    // Observe the epoch, then check the state again to see if we should wake up.
    // The epoch must be observed before we check the state or we could potentially miss a wake() and deadlock:
    //
    // - T1: s = LOAD(&state)
    // - T2: UPDATE(&s, signal)
    // - T2: UPDATE(&epoch, 1) + FUTEX_WAKE(&epoch)
    // - T1: e = LOAD(&epoch) (was reordered after the state load)
    // - T1: s & signals == 0 -> FUTEX_WAIT(&epoch, e) (missed the state update + the epoch change)
    //
    // Acquire barrier to ensure the epoch load happens before the state load.
    var epoch = cond_epoch.load(.acquire);
    var state = cond_state.fetchAdd(one_waiter, .monotonic);
    assert(state & waiter_mask != waiter_mask);
    state += one_waiter;

    mutexUnlock(pool, mutex);
    defer mutexLockInternal(pool, mutex);

    var futex_deadline = std.Thread.Futex.Deadline.init(timeout);

    while (true) {
        futex_deadline.wait(cond_epoch, epoch) catch |err| switch (err) {
            // On timeout, we must decrement the waiter we added above.
            error.Timeout => {
                while (true) {
                    // If there's a signal when we're timing out, consume it and report being woken up instead.
                    // Acquire barrier ensures the code before the wake() that added the signal happens before we decrement it and return.
                    while (state & signal_mask != 0) {
                        const new_state = state - one_waiter - one_signal;
                        state = cond_state.cmpxchgWeak(state, new_state, .acquire, .monotonic) orelse return;
                    }

                    // Remove the waiter we added and officially return timed out.
                    const new_state = state - one_waiter;
                    state = cond_state.cmpxchgWeak(state, new_state, .monotonic, .monotonic) orelse return err;
                }
            },
        };

        epoch = cond_epoch.load(.acquire);
        state = cond_state.load(.monotonic);

        // Try to wake up by consuming a signal and decrementing the waiter we added previously.
        // Acquire barrier ensures the code before the wake() that added the signal happens before we decrement it and return.
        while (state & signal_mask != 0) {
            const new_state = state - one_waiter - one_signal;
            state = cond_state.cmpxchgWeak(state, new_state, .acquire, .monotonic) orelse return;
        }
    }
}
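The u64 `Condition.state` is viewed as two u32 words: a counter word packing the waiter count (low 16 bits) and the signal count (high 16 bits), plus an epoch word that waiters sleep on via the futex. An illustrative check of the packing arithmetic used above (not part of the commit):

test "Condition counter word packing (illustrative)" {
    const one_waiter: u32 = 1;
    const waiter_mask: u32 = 0xffff;
    const one_signal: u32 = 1 << 16;
    const signal_mask: u32 = 0xffff << 16;

    // Three waiters, one of which has a signal reserved by wake():
    const state: u32 = 3 * one_waiter + 1 * one_signal;
    try std.testing.expectEqual(@as(u32, 3), (state & waiter_mask) / one_waiter);
    try std.testing.expectEqual(@as(u32, 1), (state & signal_mask) / one_signal);
}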

fn conditionWake(userdata: ?*anyopaque, cond: *Io.Condition, notify: Io.Condition.Notify) void {
    const pool: *std.Thread.Pool = @alignCast(@ptrCast(userdata));
    _ = pool;
    comptime assert(@TypeOf(cond.state) == u64);
    const ints: *[2]std.atomic.Value(u32) = @ptrCast(&cond.state);
    const cond_state = &ints[0];
    const cond_epoch = &ints[1];
    const one_waiter = 1;
    const waiter_mask = 0xffff;
    const one_signal = 1 << 16;
    const signal_mask = 0xffff << 16;
    var state = cond_state.load(.monotonic);
    while (true) {
        const waiters = (state & waiter_mask) / one_waiter;
        const signals = (state & signal_mask) / one_signal;

        // Reserve which waiters to wake up by incrementing the signals count.
        // Therefore, the signals count is always less than or equal to the waiters count.
        // We don't need to Futex.wake if there's nothing to wake up, or if other wake() threads have already reserved wakeups for the current waiters.
        const wakeable = waiters - signals;
        if (wakeable == 0) {
            return;
        }

        const to_wake = switch (notify) {
            .one => 1,
            .all => wakeable,
        };

        // Reserve the number of waiters to wake by incrementing the signals count.
        // Release barrier ensures the code before the wake() happens before the signal is posted and consumed by the wait() threads.
        const new_state = state + (one_signal * to_wake);
        state = cond_state.cmpxchgWeak(state, new_state, .release, .monotonic) orelse {
            // Wake up the waiting threads we reserved above by changing the epoch value.
            // NOTE: a waiting thread could miss a wakeup if *exactly* ((1<<32)-1) wake()s happen between it observing the epoch and sleeping on it.
            // This is very unlikely given the precise number of Futex.wake() calls that would have to occur while the waiting thread is preempted.
            //
            // Release barrier ensures the signal being added to the state happens before the epoch is changed.
            // If not, the waiting thread could potentially deadlock from missing both the state and epoch change:
            //
            // - T2: UPDATE(&epoch, 1) (reordered before the state change)
            // - T1: e = LOAD(&epoch)
            // - T1: s = LOAD(&state)
            // - T2: UPDATE(&state, signal) + FUTEX_WAKE(&epoch)
            // - T1: s & signals == 0 -> FUTEX_WAIT(&epoch, e) (missed both epoch change and state change)
            _ = cond_epoch.fetchAdd(1, .release);
            std.Thread.Futex.wake(cond_epoch, to_wake);
            return;
        };
    }
}

-pub fn createFile(
+fn createFile(
    userdata: ?*anyopaque,
    dir: std.fs.Dir,
    sub_path: []const u8,
@@ -532,7 +703,7 @@ pub fn createFile(
    return dir.createFile(sub_path, flags);
}

-pub fn openFile(
+fn openFile(
    userdata: ?*anyopaque,
    dir: std.fs.Dir,
    sub_path: []const u8,
@@ -543,13 +714,13 @@ pub fn openFile(
    return dir.openFile(sub_path, flags);
}

-pub fn closeFile(userdata: ?*anyopaque, file: std.fs.File) void {
+fn closeFile(userdata: ?*anyopaque, file: std.fs.File) void {
    const pool: *std.Thread.Pool = @alignCast(@ptrCast(userdata));
    _ = pool;
    return file.close();
}

-pub fn pread(userdata: ?*anyopaque, file: std.fs.File, buffer: []u8, offset: std.posix.off_t) Io.FilePReadError!usize {
+fn pread(userdata: ?*anyopaque, file: std.fs.File, buffer: []u8, offset: std.posix.off_t) Io.FilePReadError!usize {
    const pool: *std.Thread.Pool = @alignCast(@ptrCast(userdata));
    try pool.checkCancel();
    return switch (offset) {
@@ -558,7 +729,7 @@ pub fn pread(userdata: ?*anyopaque, file: std.fs.File, buffer: []u8, offset: std
    };
}

-pub fn pwrite(userdata: ?*anyopaque, file: std.fs.File, buffer: []const u8, offset: std.posix.off_t) Io.FilePWriteError!usize {
+fn pwrite(userdata: ?*anyopaque, file: std.fs.File, buffer: []const u8, offset: std.posix.off_t) Io.FilePWriteError!usize {
    const pool: *std.Thread.Pool = @alignCast(@ptrCast(userdata));
    try pool.checkCancel();
    return switch (offset) {