diff --git a/lib/std/Io.zig b/lib/std/Io.zig index 21309135bf..d1e3947609 100644 --- a/lib/std/Io.zig +++ b/lib/std/Io.zig @@ -982,8 +982,8 @@ pub const VTable = struct { mutexLock: *const fn (?*anyopaque, prev_state: Mutex.State, mutex: *Mutex) Cancelable!void, mutexUnlock: *const fn (?*anyopaque, prev_state: Mutex.State, mutex: *Mutex) void, - conditionWait: *const fn (?*anyopaque, cond: *Condition, mutex: *Mutex, timeout_ns: ?u64) Condition.WaitError!void, - conditionWake: *const fn (?*anyopaque, cond: *Condition, notify: Condition.Notify) void, + conditionWait: *const fn (?*anyopaque, cond: *Condition, mutex: *Mutex) Cancelable!void, + conditionWake: *const fn (?*anyopaque, cond: *Condition) void, createFile: *const fn (?*anyopaque, dir: fs.Dir, sub_path: []const u8, flags: fs.File.CreateFlags) FileOpenError!fs.File, openFile: *const fn (?*anyopaque, dir: fs.Dir, sub_path: []const u8, flags: fs.File.OpenFlags) FileOpenError!fs.File, @@ -995,6 +995,11 @@ pub const VTable = struct { sleep: *const fn (?*anyopaque, clockid: std.posix.clockid_t, deadline: Deadline) SleepError!void, }; +pub const Cancelable = error{ + /// Caller has requested the async operation to stop. + Canceled, +}; + pub const OpenFlags = fs.File.OpenFlags; pub const CreateFlags = fs.File.CreateFlags; @@ -1148,43 +1153,18 @@ pub const Mutex = if (true) struct { } }; +/// Supports exactly 1 waiter. More than 1 simultaneous wait on the same +/// condition is illegal. pub const Condition = struct { state: u64 = 0, - pub const WaitError = error{ - Timeout, - Canceled, - }; - - /// How many waiters to wake up. - pub const Notify = enum { - one, - all, - }; - pub fn wait(cond: *Condition, io: Io, mutex: *Mutex) Cancelable!void { - io.vtable.conditionWait(io.userdata, cond, mutex, null) catch |err| switch (err) { - error.Timeout => unreachable, // no timeout provided so we shouldn't have timed-out - error.Canceled => return error.Canceled, - }; + return io.vtable.conditionWait(io.userdata, cond, mutex); } - pub fn timedWait(cond: *Condition, io: Io, mutex: *Mutex, timeout_ns: u64) WaitError!void { - return io.vtable.conditionWait(io.userdata, cond, mutex, timeout_ns); + pub fn wake(cond: *Condition, io: Io) void { + io.vtable.conditionWake(io.userdata, cond); } - - pub fn signal(cond: *Condition, io: Io) void { - io.vtable.conditionWake(io.userdata, cond, .one); - } - - pub fn broadcast(cond: *Condition, io: Io) void { - io.vtable.conditionWake(io.userdata, cond, .all); - } -}; - -pub const Cancelable = error{ - /// Caller has requested the async operation to stop. - Canceled, }; pub const TypeErasedQueue = struct { @@ -1236,7 +1216,7 @@ pub const TypeErasedQueue = struct { remaining = remaining[copy_len..]; getter.data.remaining = getter.data.remaining[copy_len..]; if (getter.data.remaining.len == 0) { - getter.data.condition.signal(io); + getter.data.condition.wake(io); continue; } q.getters.prepend(getter); @@ -1319,7 +1299,7 @@ pub const TypeErasedQueue = struct { putter.data.remaining = putter.data.remaining[copy_len..]; remaining = remaining[copy_len..]; if (putter.data.remaining.len == 0) { - putter.data.condition.signal(io); + putter.data.condition.wake(io); } else { assert(remaining.len == 0); q.putters.prepend(putter); @@ -1352,7 +1332,7 @@ pub const TypeErasedQueue = struct { putter.data.remaining = putter.data.remaining[copy_len..]; q.put_index += copy_len; if (putter.data.remaining.len == 0) { - putter.data.condition.signal(io); + putter.data.condition.wake(io); continue; } const second_available = q.buffer[0..q.get_index]; @@ -1361,7 +1341,7 @@ pub const TypeErasedQueue = struct { putter.data.remaining = putter.data.remaining[copy_len..]; q.put_index = copy_len; if (putter.data.remaining.len == 0) { - putter.data.condition.signal(io); + putter.data.condition.wake(io); continue; } q.putters.prepend(putter); diff --git a/lib/std/Io/EventLoop.zig b/lib/std/Io/EventLoop.zig index d5b91db476..a27e197b7d 100644 --- a/lib/std/Io/EventLoop.zig +++ b/lib/std/Io/EventLoop.zig @@ -781,7 +781,6 @@ fn go( event_loop.schedule(current_thread, .{ .head = fiber, .tail = fiber }); } - fn @"await"( userdata: ?*anyopaque, any_future: *std.Io.AnyFuture, @@ -1277,24 +1276,24 @@ fn mutexUnlock(userdata: ?*anyopaque, prev_state: Io.Mutex.State, mutex: *Io.Mut el.yield(maybe_waiting_fiber.?, .reschedule); } -fn conditionWait( - userdata: ?*anyopaque, - cond: *Io.Condition, - mutex: *Io.Mutex, - timeout: ?u64, -) Io.Condition.WaitError!void { - _ = userdata; - _ = cond; - _ = mutex; - _ = timeout; - @panic("TODO"); +fn conditionWait(userdata: ?*anyopaque, cond: *Io.Condition, mutex: *Io.Mutex) Io.Cancelable!void { + const el: *EventLoop = @alignCast(@ptrCast(userdata)); + const cond_state: *?*Fiber = @ptrCast(&cond.state); + const thread: *Thread = .current(); + const fiber = thread.currentFiber(); + const prev = @atomicRmw(?*Fiber, cond_state, .Xchg, fiber, .acquire); + assert(prev == null); // More than one wait on same Condition is illegal. + mutex.unlock(io(el)); + el.yield(null, .nothing); + try mutex.lock(io(el)); } -fn conditionWake(userdata: ?*anyopaque, cond: *Io.Condition, notify: Io.Condition.Notify) void { - _ = userdata; - _ = cond; - _ = notify; - @panic("TODO"); +fn conditionWake(userdata: ?*anyopaque, cond: *Io.Condition) void { + const el: *EventLoop = @alignCast(@ptrCast(userdata)); + const cond_state: *?*Fiber = @ptrCast(&cond.state); + if (@atomicRmw(?*Fiber, cond_state, .Xchg, null, .acquire)) |fiber| { + el.yield(fiber, .reschedule); + } } fn errno(signed: i32) std.os.linux.E { diff --git a/lib/std/Thread/Pool.zig b/lib/std/Thread/Pool.zig index bb577f401b..05bc4801b6 100644 --- a/lib/std/Thread/Pool.zig +++ b/lib/std/Thread/Pool.zig @@ -619,12 +619,7 @@ fn mutexUnlock(userdata: ?*anyopaque, prev_state: Io.Mutex.State, mutex: *Io.Mut } } -fn conditionWait( - userdata: ?*anyopaque, - cond: *Io.Condition, - mutex: *Io.Mutex, - timeout: ?u64, -) Io.Condition.WaitError!void { +fn conditionWait(userdata: ?*anyopaque, cond: *Io.Condition, mutex: *Io.Mutex) Io.Cancelable!void { const pool: *std.Thread.Pool = @alignCast(@ptrCast(userdata)); comptime assert(@TypeOf(cond.state) == u64); const ints: *[2]std.atomic.Value(u32) = @ptrCast(&cond.state); @@ -652,25 +647,11 @@ fn conditionWait( mutex.unlock(pool.io()); defer mutex.lock(pool.io()) catch @panic("TODO"); - var futex_deadline = std.Thread.Futex.Deadline.init(timeout); + var futex_deadline = std.Thread.Futex.Deadline.init(null); while (true) { futex_deadline.wait(cond_epoch, epoch) catch |err| switch (err) { - // On timeout, we must decrement the waiter we added above. - error.Timeout => { - while (true) { - // If there's a signal when we're timing out, consume it and report being woken up instead. - // Acquire barrier ensures code before the wake() which added the signal happens before we decrement it and return. - while (state & signal_mask != 0) { - const new_state = state - one_waiter - one_signal; - state = cond_state.cmpxchgWeak(state, new_state, .acquire, .monotonic) orelse return; - } - - // Remove the waiter we added and officially return timed out. - const new_state = state - one_waiter; - state = cond_state.cmpxchgWeak(state, new_state, .monotonic, .monotonic) orelse return err; - } - }, + error.Timeout => unreachable, }; epoch = cond_epoch.load(.acquire); @@ -685,7 +666,7 @@ fn conditionWait( } } -fn conditionWake(userdata: ?*anyopaque, cond: *Io.Condition, notify: Io.Condition.Notify) void { +fn conditionWake(userdata: ?*anyopaque, cond: *Io.Condition) void { const pool: *std.Thread.Pool = @alignCast(@ptrCast(userdata)); _ = pool; comptime assert(@TypeOf(cond.state) == u64); @@ -709,10 +690,7 @@ fn conditionWake(userdata: ?*anyopaque, cond: *Io.Condition, notify: Io.Conditio return; } - const to_wake = switch (notify) { - .one => 1, - .all => wakeable, - }; + const to_wake = 1; // Reserve the amount of waiters to wake by incrementing the signals count. // Release barrier ensures code before the wake() happens before the signal it posted and consumed by the wait() threads.