From a28d3059e60ccbed2b2b159c41899488330bc671 Mon Sep 17 00:00:00 2001 From: Andrew Kelley Date: Mon, 27 Oct 2025 21:17:44 -0700 Subject: [PATCH] std.Io.Threaded: implement ResetEvent in terms of pthreads needed for NetBSD --- lib/std/Io/Threaded.zig | 153 +++++++++++++++++++++++++++++++--------- lib/std/c.zig | 4 +- 2 files changed, 123 insertions(+), 34 deletions(-) diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig index 9becb59a7a..35fecbedd9 100644 --- a/lib/std/Io/Threaded.zig +++ b/lib/std/Io/Threaded.zig @@ -5787,7 +5787,13 @@ pub fn futexWake(ptr: *const std.atomic.Value(u32), max_waiters: u32) void { /// /// It can also block threads until the value is set with cancelation via timed /// waits. Statically initializable; four bytes on all targets. -pub const ResetEvent = enum(u32) { +pub const ResetEvent = switch (native_os) { + .netbsd => ResetEventPosix, + else => ResetEventFutex, +}; + +/// A `ResetEvent` implementation based on futexes. +const ResetEventFutex = enum(u32) { unset = 0, waiting = 1, is_set = 2, @@ -5798,15 +5804,15 @@ pub const ResetEvent = enum(u32) { /// /// The memory accesses before the `set` can be said to happen before /// `isSet` returns true. - pub fn isSet(re: *const ResetEvent) bool { - if (builtin.single_threaded) return switch (re.*) { + pub fn isSet(ref: *const ResetEventFutex) bool { + if (builtin.single_threaded) return switch (ref.*) { .unset => false, .waiting => unreachable, .is_set => true, }; // Acquire barrier ensures memory accesses before `set` happen before // returning true. - return @atomicLoad(ResetEvent, re, .acquire) == .is_set; + return @atomicLoad(ResetEventFutex, ref, .acquire) == .is_set; } /// Blocks the calling thread until `set` is called. @@ -5814,51 +5820,51 @@ pub const ResetEvent = enum(u32) { /// This is effectively a more efficient version of `while (!isSet()) {}`. /// /// The memory accesses before the `set` can be said to happen before `wait` returns. - pub fn wait(re: *ResetEvent, t: *Threaded) Io.Cancelable!void { - if (builtin.single_threaded) switch (re.*) { + pub fn wait(ref: *ResetEventFutex, t: *Threaded) Io.Cancelable!void { + if (builtin.single_threaded) switch (ref.*) { .unset => unreachable, // Deadlock, no other threads to wake us up. .waiting => unreachable, // Invalid state. .is_set => return, }; - if (re.isSet()) { + // Try to set the state from `unset` to `waiting` to indicate to the + // `set` thread that others are blocked on the ResetEventFutex. Avoid using + // any strict barriers until we know the ResetEventFutex is set. + var state = @atomicLoad(ResetEventFutex, ref, .acquire); + if (state == .is_set) { @branchHint(.likely); return; } - // Try to set the state from `unset` to `waiting` to indicate to the - // `set` thread that others are blocked on the ResetEvent. Avoid using - // any strict barriers until we know the ResetEvent is set. - var state = @atomicLoad(ResetEvent, re, .acquire); if (state == .unset) { - state = @cmpxchgStrong(ResetEvent, re, state, .waiting, .acquire, .acquire) orelse .waiting; + state = @cmpxchgStrong(ResetEventFutex, ref, state, .waiting, .acquire, .acquire) orelse .waiting; } while (state == .waiting) { - try futexWait(t, @ptrCast(re), @intFromEnum(ResetEvent.waiting)); - state = @atomicLoad(ResetEvent, re, .acquire); + try futexWait(t, @ptrCast(ref), @intFromEnum(ResetEventFutex.waiting)); + state = @atomicLoad(ResetEventFutex, ref, .acquire); } assert(state == .is_set); } /// Same as `wait` except uninterruptible. - pub fn waitUncancelable(re: *ResetEvent) void { - if (builtin.single_threaded) switch (re.*) { + pub fn waitUncancelable(ref: *ResetEventFutex) void { + if (builtin.single_threaded) switch (ref.*) { .unset => unreachable, // Deadlock, no other threads to wake us up. .waiting => unreachable, // Invalid state. .is_set => return, }; - if (re.isSet()) { + // Try to set the state from `unset` to `waiting` to indicate to the + // `set` thread that others are blocked on the ResetEventFutex. Avoid using + // any strict barriers until we know the ResetEventFutex is set. + var state = @atomicLoad(ResetEventFutex, ref, .acquire); + if (state == .is_set) { @branchHint(.likely); return; } - // Try to set the state from `unset` to `waiting` to indicate to the - // `set` thread that others are blocked on the ResetEvent. Avoid using - // any strict barriers until we know the ResetEvent is set. - var state = @atomicLoad(ResetEvent, re, .acquire); if (state == .unset) { - state = @cmpxchgStrong(ResetEvent, re, state, .waiting, .acquire, .acquire) orelse .waiting; + state = @cmpxchgStrong(ResetEventFutex, ref, state, .waiting, .acquire, .acquire) orelse .waiting; } while (state == .waiting) { - futexWaitUncancelable(@ptrCast(re), @intFromEnum(ResetEvent.waiting)); - state = @atomicLoad(ResetEvent, re, .acquire); + futexWaitUncancelable(@ptrCast(ref), @intFromEnum(ResetEventFutex.waiting)); + state = @atomicLoad(ResetEventFutex, ref, .acquire); } assert(state == .is_set); } @@ -5871,26 +5877,109 @@ pub const ResetEvent = enum(u32) { /// /// The memory accesses before `set` can be said to happen before `isSet` /// returns true or `wait`/`timedWait` return successfully. - pub fn set(re: *ResetEvent) void { + pub fn set(ref: *ResetEventFutex) void { if (builtin.single_threaded) { - re.* = .is_set; + ref.* = .is_set; return; } - if (@atomicRmw(ResetEvent, re, .Xchg, .is_set, .release) == .waiting) { - futexWake(@ptrCast(re), std.math.maxInt(u32)); + if (@atomicRmw(ResetEventFutex, ref, .Xchg, .is_set, .release) == .waiting) { + futexWake(@ptrCast(ref), std.math.maxInt(u32)); } } - /// Unmarks the ResetEvent as if `set` was never called. + /// Unmarks the ResetEventFutex as if `set` was never called. /// /// Assumes no threads are blocked in `wait` or `timedWait`. Concurrent /// calls to `set`, `isSet` and `reset` are allowed. - pub fn reset(re: *ResetEvent) void { + pub fn reset(ref: *ResetEventFutex) void { if (builtin.single_threaded) { - re.* = .unset; + ref.* = .unset; return; } - @atomicStore(ResetEvent, re, .unset, .monotonic); + @atomicStore(ResetEventFutex, ref, .unset, .monotonic); + } +}; + +/// A `ResetEvent` implementation based on pthreads API. +const ResetEventPosix = struct { + cond: std.c.pthread_cond_t, + mutex: std.c.pthread_mutex_t, + state: ResetEventFutex, + + pub const unset: ResetEventPosix = .{ + .cond = std.c.PTHREAD_COND_INITIALIZER, + .mutex = std.c.PTHREAD_MUTEX_INITIALIZER, + .state = .unset, + }; + + pub fn isSet(rep: *const ResetEventPosix) bool { + if (builtin.single_threaded) return switch (rep.state) { + .unset => false, + .waiting => unreachable, + .is_set => true, + }; + return @atomicLoad(ResetEventFutex, &rep.state, .acquire) == .is_set; + } + + pub fn wait(rep: *ResetEventPosix, t: *Threaded) Io.Cancelable!void { + if (builtin.single_threaded) switch (rep.*) { + .unset => unreachable, // Deadlock, no other threads to wake us up. + .waiting => unreachable, // Invalid state. + .is_set => return, + }; + assert(std.c.pthread_mutex_lock(&rep.mutex) == .SUCCESS); + defer assert(std.c.pthread_mutex_unlock(&rep.mutex) == .SUCCESS); + sw: switch (rep.state) { + .unset => { + rep.state = .waiting; + continue :sw .waiting; + }, + .waiting => { + try t.checkCancel(); + assert(std.c.pthread_cond_wait(&rep.cond, &rep.mutex) == .SUCCESS); + continue :sw rep.state; + }, + .is_set => return, + } + } + + pub fn waitUncancelable(rep: *ResetEventPosix) void { + if (builtin.single_threaded) switch (rep.*) { + .unset => unreachable, // Deadlock, no other threads to wake us up. + .waiting => unreachable, // Invalid state. + .is_set => return, + }; + assert(std.c.pthread_mutex_lock(&rep.mutex) == .SUCCESS); + defer assert(std.c.pthread_mutex_unlock(&rep.mutex) == .SUCCESS); + sw: switch (rep.state) { + .unset => { + rep.state = .waiting; + continue :sw .waiting; + }, + .waiting => { + assert(std.c.pthread_cond_wait(&rep.cond, &rep.mutex) == .SUCCESS); + continue :sw rep.state; + }, + .is_set => return, + } + } + + pub fn set(rep: *ResetEventPosix) void { + if (builtin.single_threaded) { + rep.* = .is_set; + return; + } + if (@atomicRmw(ResetEventFutex, &rep.state, .Xchg, .is_set, .release) == .waiting) { + assert(std.c.pthread_cond_broadcast(&rep.cond) == .SUCCESS); + } + } + + pub fn reset(rep: *ResetEventPosix) void { + if (builtin.single_threaded) { + rep.* = .unset; + return; + } + @atomicStore(ResetEventFutex, &rep.state, .unset, .monotonic); } }; diff --git a/lib/std/c.zig b/lib/std/c.zig index 89b28105ae..3eb2d76b3f 100644 --- a/lib/std/c.zig +++ b/lib/std/c.zig @@ -10874,13 +10874,13 @@ pub extern "c" fn dn_expand( length: c_int, ) c_int; -pub const PTHREAD_MUTEX_INITIALIZER = pthread_mutex_t{}; +pub const PTHREAD_MUTEX_INITIALIZER: pthread_mutex_t = .{}; pub extern "c" fn pthread_mutex_lock(mutex: *pthread_mutex_t) E; pub extern "c" fn pthread_mutex_unlock(mutex: *pthread_mutex_t) E; pub extern "c" fn pthread_mutex_trylock(mutex: *pthread_mutex_t) E; pub extern "c" fn pthread_mutex_destroy(mutex: *pthread_mutex_t) E; -pub const PTHREAD_COND_INITIALIZER = pthread_cond_t{}; +pub const PTHREAD_COND_INITIALIZER: pthread_cond_t = .{}; pub extern "c" fn pthread_cond_wait(noalias cond: *pthread_cond_t, noalias mutex: *pthread_mutex_t) E; pub extern "c" fn pthread_cond_timedwait(noalias cond: *pthread_cond_t, noalias mutex: *pthread_mutex_t, noalias abstime: *const timespec) E; pub extern "c" fn pthread_cond_signal(cond: *pthread_cond_t) E;