From 12508025a4b3a841819b913e4ed88810ca04ba79 Mon Sep 17 00:00:00 2001 From: kprotty <45520026+kprotty@users.noreply.github.com> Date: Sun, 11 Oct 2020 19:16:07 -0500 Subject: [PATCH] Add more comments & cleanup AutoResetEvent --- lib/std/auto_reset_event.zig | 180 +++++++++++++++++++++-------------- lib/std/event/loop.zig | 3 + 2 files changed, 109 insertions(+), 74 deletions(-) diff --git a/lib/std/auto_reset_event.zig b/lib/std/auto_reset_event.zig index 729f5de1a2..df528e40a5 100644 --- a/lib/std/auto_reset_event.zig +++ b/lib/std/auto_reset_event.zig @@ -43,99 +43,132 @@ pub const AutoResetEvent = struct { const event_align = std.math.max(@alignOf(std.ResetEvent), 2); pub fn wait(self: *AutoResetEvent) void { - self.waitInner(null) catch unreachable; + self.waitFor(null) catch unreachable; } pub fn timedWait(self: *AutoResetEvent, timeout: u64) error{TimedOut}!void { - return self.waitInner(timeout); + return self.waitFor(timeout); } - fn waitInner(self: *AutoResetEvent, timeout: ?u64) error{TimedOut}!void { - // the local ResetEvent is lazily initialized - var has_reset_event = false; + fn waitFor(self: *AutoResetEvent, timeout: ?u64) error{TimedOut}!void { + // lazily initialized std.ResetEvent var reset_event: std.ResetEvent align(event_align) = undefined; + var has_reset_event = false; defer if (has_reset_event) { reset_event.deinit(); }; var state = @atomicLoad(usize, &self.state, .SeqCst); while (true) { - switch (state) { - UNSET => { - if (!has_reset_event) { - has_reset_event = true; - reset_event = std.ResetEvent.init(); - } - state = @cmpxchgWeak( - usize, - &self.state, - state, - @ptrToInt(&reset_event), - .SeqCst, - .SeqCst, - ) orelse { - if (timeout) |timeout_ns| { - reset_event.timedWait(timeout_ns) catch { - state = @cmpxchgStrong( - usize, - &self.state, - @ptrToInt(&reset_event), - UNSET, - .SeqCst, - .SeqCst, - ) orelse return error.TimedOut; - assert(state == SET); - reset_event.wait(); - }; - } else { - reset_event.wait(); - } - return; - }; - }, - SET => { - @atomicStore(usize, &self.state, UNSET, .SeqCst); - return; - }, - else => { - unreachable; // multiple waiters on the same Event - } + // consume a notification if there is any + if (state == SET) { + @atomicStore(usize, &self.state, UNSET, .SeqCst); + return; } + + // check if theres currently a pending ResetEvent pointer already registered + if (state != UNSET) { + unreachable; // multiple waiting threads on the same AutoResetEvent + } + + // lazily initialize the ResetEvent if it hasn't been already + if (!has_reset_event) { + has_reset_event = true; + reset_event = std.ResetEvent.init(); + } + + // Since the AutoResetEvent currently isnt set, + // try to register our ResetEvent on it to wait + // for a set() call from another thread. + if (@cmpxchgWeak( + usize, + &self.state, + UNSET, + @ptrToInt(&reset_event), + .SeqCst, + .SeqCst, + )) |new_state| { + state = new_state; + continue; + } + + // if no timeout was specified, then just wait forever + const timeout_ns = timeout orelse { + reset_event.wait(); + return; + }; + + // wait with a timeout and return if signalled via set() + if (reset_event.timedWait(timeout_ns)) |_| { + return; + } else |timed_out| {} + + // If we timed out, we need to transition the AutoResetEvent back to UNSET. + // If we don't, then when we return, a set() thread could observe a pointer to an invalid ResetEvent. + state = @cmpxchgStrong( + usize, + &self.state, + @ptrToInt(&reset_event), + UNSET, + .SeqCst, + .SeqCst, + ) orelse return error.TimedOut; + + // We didn't manage to unregister ourselves from the state. + if (state == SET) { + unreachable; // AutoResetEvent notified without waking up the waiting thread + } else if (state != UNSET) { + unreachable; // multiple waiting threads on the same AutoResetEvent observed when timing out + } + + // This menas a set() thread saw our ResetEvent pointer, acquired it, and is trying to wake it up. + // We need to wait for it to wake up our ResetEvent before we can return and invalidate it. + // We don't return error.TimedOut here as it technically notified us while we were "timing out". + reset_event.wait(); + return; } } pub fn set(self: *AutoResetEvent) void { var state = @atomicLoad(usize, &self.state, .SeqCst); while (true) { - switch (state) { - UNSET => { - state = @cmpxchgWeak( - usize, - &self.state, - state, - SET, - .SeqCst, - .SeqCst, - ) orelse return; - }, - SET => { - return; - }, - else => |reset_event_ptr| { - state = @cmpxchgWeak( - usize, - &self.state, - state, - UNSET, - .SeqCst, - .SeqCst, - ) orelse { - const reset_event = @intToPtr(*align(event_align) std.ResetEvent, reset_event_ptr); - reset_event.set(); - return; - }; - } + // If the AutoResetEvent is already set, there is nothing else left to do + if (state == SET) { + return; } + + // If the AutoResetEvent isn't set, + // then try to leave a notification for the wait() thread that we set() it. + if (state == UNSET) { + state = @cmpxchgWeak( + usize, + &self.state, + UNSET, + SET, + .SeqCst, + .SeqCst, + ) orelse return; + continue; + } + + // There is a ResetEvent pointer registered on the AutoResetEvent event thats waiting. + // Try to acquire ownership of it so that we can wake it up. + // This also resets the AutoResetEvent so that there is no race condition as defined above. + if (@cmpxchgWeak( + usize, + &self.state, + state, + UNSET, + .SeqCst, + .SeqCst, + )) |new_state| { + state = new_state; + continue; + } + + const reset_event = @intToPtr(*align(event_align) std.ResetEvent, state); + reset_event.set(); + return; } } }; @@ -161,7 +194,6 @@ test "std.AutoResetEvent" { const Self = @This(); fn sender(self: *Self) void { - std.debug.print("\n", .{}); testing.expect(self.value == 0); self.value = 1; self.out.set(); diff --git a/lib/std/event/loop.zig b/lib/std/event/loop.zig index c9911a174e..536aa6d7c2 100644 --- a/lib/std/event/loop.zig +++ b/lib/std/event/loop.zig @@ -862,8 +862,11 @@ pub const Loop = struct { const held = self.entries.mutex.acquire(); defer held.release(); + // starting from the head var head = self.entries.head orelse return null; + // traverse the list of waiting entires to + // find the Node with the smallest `expires` field var min = head; while (head.next) |node| { const minEntry = @fieldParentPtr(Entry, "node", min);