std.Io.Threaded: implement ResetEvent in terms of pthreads

needed for NetBSD
This commit is contained in:
Andrew Kelley 2025-10-27 21:17:44 -07:00
parent 30448d92af
commit a28d3059e6
2 changed files with 123 additions and 34 deletions

View File

@ -5787,7 +5787,13 @@ pub fn futexWake(ptr: *const std.atomic.Value(u32), max_waiters: u32) void {
///
/// It can also block threads until the value is set with cancelation via timed
/// waits. Statically initializable; four bytes on all targets.
pub const ResetEvent = enum(u32) {
pub const ResetEvent = switch (native_os) {
.netbsd => ResetEventPosix,
else => ResetEventFutex,
};
/// A `ResetEvent` implementation based on futexes.
const ResetEventFutex = enum(u32) {
unset = 0,
waiting = 1,
is_set = 2,
@ -5798,15 +5804,15 @@ pub const ResetEvent = enum(u32) {
///
/// The memory accesses before the `set` can be said to happen before
/// `isSet` returns true.
pub fn isSet(re: *const ResetEvent) bool {
if (builtin.single_threaded) return switch (re.*) {
pub fn isSet(ref: *const ResetEventFutex) bool {
if (builtin.single_threaded) return switch (ref.*) {
.unset => false,
.waiting => unreachable,
.is_set => true,
};
// Acquire barrier ensures memory accesses before `set` happen before
// returning true.
return @atomicLoad(ResetEvent, re, .acquire) == .is_set;
return @atomicLoad(ResetEventFutex, ref, .acquire) == .is_set;
}
/// Blocks the calling thread until `set` is called.
@ -5814,51 +5820,51 @@ pub const ResetEvent = enum(u32) {
/// This is effectively a more efficient version of `while (!isSet()) {}`.
///
/// The memory accesses before the `set` can be said to happen before `wait` returns.
pub fn wait(re: *ResetEvent, t: *Threaded) Io.Cancelable!void {
if (builtin.single_threaded) switch (re.*) {
pub fn wait(ref: *ResetEventFutex, t: *Threaded) Io.Cancelable!void {
if (builtin.single_threaded) switch (ref.*) {
.unset => unreachable, // Deadlock, no other threads to wake us up.
.waiting => unreachable, // Invalid state.
.is_set => return,
};
if (re.isSet()) {
// Try to set the state from `unset` to `waiting` to indicate to the
// `set` thread that others are blocked on the ResetEventFutex. Avoid using
// any strict barriers until we know the ResetEventFutex is set.
var state = @atomicLoad(ResetEventFutex, ref, .acquire);
if (state == .is_set) {
@branchHint(.likely);
return;
}
// Try to set the state from `unset` to `waiting` to indicate to the
// `set` thread that others are blocked on the ResetEvent. Avoid using
// any strict barriers until we know the ResetEvent is set.
var state = @atomicLoad(ResetEvent, re, .acquire);
if (state == .unset) {
state = @cmpxchgStrong(ResetEvent, re, state, .waiting, .acquire, .acquire) orelse .waiting;
state = @cmpxchgStrong(ResetEventFutex, ref, state, .waiting, .acquire, .acquire) orelse .waiting;
}
while (state == .waiting) {
try futexWait(t, @ptrCast(re), @intFromEnum(ResetEvent.waiting));
state = @atomicLoad(ResetEvent, re, .acquire);
try futexWait(t, @ptrCast(ref), @intFromEnum(ResetEventFutex.waiting));
state = @atomicLoad(ResetEventFutex, ref, .acquire);
}
assert(state == .is_set);
}
/// Same as `wait` except uninterruptible.
pub fn waitUncancelable(re: *ResetEvent) void {
if (builtin.single_threaded) switch (re.*) {
pub fn waitUncancelable(ref: *ResetEventFutex) void {
if (builtin.single_threaded) switch (ref.*) {
.unset => unreachable, // Deadlock, no other threads to wake us up.
.waiting => unreachable, // Invalid state.
.is_set => return,
};
if (re.isSet()) {
// Try to set the state from `unset` to `waiting` to indicate to the
// `set` thread that others are blocked on the ResetEventFutex. Avoid using
// any strict barriers until we know the ResetEventFutex is set.
var state = @atomicLoad(ResetEventFutex, ref, .acquire);
if (state == .is_set) {
@branchHint(.likely);
return;
}
// Try to set the state from `unset` to `waiting` to indicate to the
// `set` thread that others are blocked on the ResetEvent. Avoid using
// any strict barriers until we know the ResetEvent is set.
var state = @atomicLoad(ResetEvent, re, .acquire);
if (state == .unset) {
state = @cmpxchgStrong(ResetEvent, re, state, .waiting, .acquire, .acquire) orelse .waiting;
state = @cmpxchgStrong(ResetEventFutex, ref, state, .waiting, .acquire, .acquire) orelse .waiting;
}
while (state == .waiting) {
futexWaitUncancelable(@ptrCast(re), @intFromEnum(ResetEvent.waiting));
state = @atomicLoad(ResetEvent, re, .acquire);
futexWaitUncancelable(@ptrCast(ref), @intFromEnum(ResetEventFutex.waiting));
state = @atomicLoad(ResetEventFutex, ref, .acquire);
}
assert(state == .is_set);
}
@ -5871,26 +5877,109 @@ pub const ResetEvent = enum(u32) {
///
/// The memory accesses before `set` can be said to happen before `isSet`
/// returns true or `wait`/`timedWait` return successfully.
pub fn set(re: *ResetEvent) void {
pub fn set(ref: *ResetEventFutex) void {
if (builtin.single_threaded) {
re.* = .is_set;
ref.* = .is_set;
return;
}
if (@atomicRmw(ResetEvent, re, .Xchg, .is_set, .release) == .waiting) {
futexWake(@ptrCast(re), std.math.maxInt(u32));
if (@atomicRmw(ResetEventFutex, ref, .Xchg, .is_set, .release) == .waiting) {
futexWake(@ptrCast(ref), std.math.maxInt(u32));
}
}
/// Unmarks the ResetEvent as if `set` was never called.
/// Unmarks the ResetEventFutex as if `set` was never called.
///
/// Assumes no threads are blocked in `wait` or `timedWait`. Concurrent
/// calls to `set`, `isSet` and `reset` are allowed.
pub fn reset(re: *ResetEvent) void {
pub fn reset(ref: *ResetEventFutex) void {
if (builtin.single_threaded) {
re.* = .unset;
ref.* = .unset;
return;
}
@atomicStore(ResetEvent, re, .unset, .monotonic);
@atomicStore(ResetEventFutex, ref, .unset, .monotonic);
}
};
/// A `ResetEvent` implementation based on pthreads API.
const ResetEventPosix = struct {
cond: std.c.pthread_cond_t,
mutex: std.c.pthread_mutex_t,
state: ResetEventFutex,
pub const unset: ResetEventPosix = .{
.cond = std.c.PTHREAD_COND_INITIALIZER,
.mutex = std.c.PTHREAD_MUTEX_INITIALIZER,
.state = .unset,
};
pub fn isSet(rep: *const ResetEventPosix) bool {
if (builtin.single_threaded) return switch (rep.state) {
.unset => false,
.waiting => unreachable,
.is_set => true,
};
return @atomicLoad(ResetEventFutex, &rep.state, .acquire) == .is_set;
}
pub fn wait(rep: *ResetEventPosix, t: *Threaded) Io.Cancelable!void {
if (builtin.single_threaded) switch (rep.*) {
.unset => unreachable, // Deadlock, no other threads to wake us up.
.waiting => unreachable, // Invalid state.
.is_set => return,
};
assert(std.c.pthread_mutex_lock(&rep.mutex) == .SUCCESS);
defer assert(std.c.pthread_mutex_unlock(&rep.mutex) == .SUCCESS);
sw: switch (rep.state) {
.unset => {
rep.state = .waiting;
continue :sw .waiting;
},
.waiting => {
try t.checkCancel();
assert(std.c.pthread_cond_wait(&rep.cond, &rep.mutex) == .SUCCESS);
continue :sw rep.state;
},
.is_set => return,
}
}
pub fn waitUncancelable(rep: *ResetEventPosix) void {
if (builtin.single_threaded) switch (rep.*) {
.unset => unreachable, // Deadlock, no other threads to wake us up.
.waiting => unreachable, // Invalid state.
.is_set => return,
};
assert(std.c.pthread_mutex_lock(&rep.mutex) == .SUCCESS);
defer assert(std.c.pthread_mutex_unlock(&rep.mutex) == .SUCCESS);
sw: switch (rep.state) {
.unset => {
rep.state = .waiting;
continue :sw .waiting;
},
.waiting => {
assert(std.c.pthread_cond_wait(&rep.cond, &rep.mutex) == .SUCCESS);
continue :sw rep.state;
},
.is_set => return,
}
}
pub fn set(rep: *ResetEventPosix) void {
if (builtin.single_threaded) {
rep.* = .is_set;
return;
}
if (@atomicRmw(ResetEventFutex, &rep.state, .Xchg, .is_set, .release) == .waiting) {
assert(std.c.pthread_cond_broadcast(&rep.cond) == .SUCCESS);
}
}
pub fn reset(rep: *ResetEventPosix) void {
if (builtin.single_threaded) {
rep.* = .unset;
return;
}
@atomicStore(ResetEventFutex, &rep.state, .unset, .monotonic);
}
};

View File

@ -10874,13 +10874,13 @@ pub extern "c" fn dn_expand(
length: c_int,
) c_int;
pub const PTHREAD_MUTEX_INITIALIZER = pthread_mutex_t{};
pub const PTHREAD_MUTEX_INITIALIZER: pthread_mutex_t = .{};
pub extern "c" fn pthread_mutex_lock(mutex: *pthread_mutex_t) E;
pub extern "c" fn pthread_mutex_unlock(mutex: *pthread_mutex_t) E;
pub extern "c" fn pthread_mutex_trylock(mutex: *pthread_mutex_t) E;
pub extern "c" fn pthread_mutex_destroy(mutex: *pthread_mutex_t) E;
pub const PTHREAD_COND_INITIALIZER = pthread_cond_t{};
pub const PTHREAD_COND_INITIALIZER: pthread_cond_t = .{};
pub extern "c" fn pthread_cond_wait(noalias cond: *pthread_cond_t, noalias mutex: *pthread_mutex_t) E;
pub extern "c" fn pthread_cond_timedwait(noalias cond: *pthread_cond_t, noalias mutex: *pthread_mutex_t, noalias abstime: *const timespec) E;
pub extern "c" fn pthread_cond_signal(cond: *pthread_cond_t) E;