Merge pull request #6655 from kprotty/timers

Integrate std.time.sleep with the event loop
Andrew Kelley 2020-10-14 21:49:45 -04:00 committed by GitHub
commit 3b4432d9a6
4 changed files with 391 additions and 1 deletion

lib/std/auto_reset_event.zig

@@ -0,0 +1,229 @@
// SPDX-License-Identifier: MIT
// Copyright (c) 2015-2020 Zig Contributors
// This file is part of [zig](https://ziglang.org/), which is MIT licensed.
// The MIT license requires this copyright notice to be included in all copies
// and substantial portions of the software.
const std = @import("std.zig");
const builtin = @import("builtin");
const testing = std.testing;
const assert = std.debug.assert;
/// Similar to std.ResetEvent but on `set()` it also (atomically) does `reset()`.
/// Unlike std.ResetEvent, `wait()` can only be called by one thread (MPSC-like).
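///
/// Illustrative usage sketch (one consuming thread; any thread may set):
///
///     var event = AutoResetEvent{};
///     // waiter thread:   event.wait(); // blocks until set(), then auto-resets
///     // notifier thread: event.set();  // wakes the waiter, or stores one notification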
pub const AutoResetEvent = struct {
// AutoResetEvent has 3 possible states:
// - UNSET: the AutoResetEvent is currently unset
// - SET: the AutoResetEvent was notified before a wait() was called
// - <std.ResetEvent pointer>: there is an active waiter waiting for a notification.
//
// When attempting to wait:
// if the event is unset, it registers a ResetEvent pointer to be notified when the event is set
// if the event is already set, then it consumes the notification and resets the event.
//
// When attempting to notify:
// if the event is unset, then we set the event
// if there's a waiting ResetEvent, then we unset the event and notify the ResetEvent
//
// This ensures that the event is automatically reset after a wait() has been issued
// and avoids the race condition when using std.ResetEvent in the following scenario:
// thread 1 | thread 2
// std.ResetEvent.wait() |
// | std.ResetEvent.set()
// | std.ResetEvent.set()
// std.ResetEvent.reset() |
// std.ResetEvent.wait() | (missed the second .set() notification above)
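//
// With AutoResetEvent the same interleaving loses nothing (a sketch following
// the state machine above):
// thread 1 | thread 2
// AutoResetEvent.wait() |
// | AutoResetEvent.set() (wakes thread 1, state -> UNSET)
// | AutoResetEvent.set() (state -> SET, notification stored)
// AutoResetEvent.wait() | (consumes the stored SET immediately)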
state: usize = UNSET,
const UNSET = 0;
const SET = 1;
// the minimum alignment for the `*std.ResetEvent` created by wait*()
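// an alignment of at least 2 keeps the pointer's low bit clear, so a live
// pointer value can never collide with SET (1), and being non-null it can never be UNSET (0)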
const event_align = std.math.max(@alignOf(std.ResetEvent), 2);
pub fn wait(self: *AutoResetEvent) void {
self.waitFor(null) catch unreachable;
}
pub fn timedWait(self: *AutoResetEvent, timeout: u64) error{TimedOut}!void {
return self.waitFor(timeout);
}
fn waitFor(self: *AutoResetEvent, timeout: ?u64) error{TimedOut}!void {
// lazily initialized std.ResetEvent
var reset_event: std.ResetEvent align(event_align) = undefined;
var has_reset_event = false;
defer if (has_reset_event) {
reset_event.deinit();
};
var state = @atomicLoad(usize, &self.state, .SeqCst);
while (true) {
// consume a notification if there is any
if (state == SET) {
@atomicStore(usize, &self.state, UNSET, .SeqCst);
return;
}
// check if there's already a pending ResetEvent pointer registered
if (state != UNSET) {
unreachable; // multiple waiting threads on the same AutoResetEvent
}
// lazily initialize the ResetEvent if it hasn't been already
if (!has_reset_event) {
has_reset_event = true;
reset_event = std.ResetEvent.init();
}
// Since the AutoResetEvent currently isn't set,
// try to register our ResetEvent on it to wait
// for a set() call from another thread.
if (@cmpxchgWeak(
usize,
&self.state,
UNSET,
@ptrToInt(&reset_event),
.SeqCst,
.SeqCst,
)) |new_state| {
state = new_state;
continue;
}
// if no timeout was specified, then just wait forever
const timeout_ns = timeout orelse {
reset_event.wait();
return;
};
// wait with a timeout and return if signaled via set()
if (reset_event.timedWait(timeout_ns)) |_| {
return;
} else |_| {}
// If we timed out, we need to transition the AutoResetEvent back to UNSET.
// If we don't, then when we return, a set() thread could observe a pointer to an invalid ResetEvent.
state = @cmpxchgStrong(
usize,
&self.state,
@ptrToInt(&reset_event),
UNSET,
.SeqCst,
.SeqCst,
) orelse return error.TimedOut;
// We didn't manage to unregister ourselves from the state.
// A set() thread must have acquired our ResetEvent and stored UNSET;
// a second set() may have since stored a new SET notification on top of that.
// Any other value would mean another waiter, which is not allowed.
if (state != UNSET and state != SET) {
unreachable; // multiple waiting threads on the same AutoResetEvent observed when timing out
}
// This means a set() thread saw our ResetEvent pointer, acquired it, and is trying to wake it up.
// We need to wait for it to wake up our ResetEvent before we can return and invalidate it.
// We don't return error.TimedOut here as it technically notified us while we were "timing out".
reset_event.wait();
return;
}
}
pub fn set(self: *AutoResetEvent) void {
var state = @atomicLoad(usize, &self.state, .SeqCst);
while (true) {
// If the AutoResetEvent is already set, there is nothing else left to do
if (state == SET) {
return;
}
// If the AutoResetEvent isn't set,
// then try to leave a notification for the wait() thread that we set() it.
if (state == UNSET) {
state = @cmpxchgWeak(
usize,
&self.state,
UNSET,
SET,
.SeqCst,
.SeqCst,
) orelse return;
continue;
}
// There is a waiting ResetEvent pointer registered on the AutoResetEvent.
// Try to acquire ownership of it so that we can wake it up.
// This also resets the AutoResetEvent so that there is no race condition as defined above.
if (@cmpxchgWeak(
usize,
&self.state,
state,
UNSET,
.SeqCst,
.SeqCst,
)) |new_state| {
state = new_state;
continue;
}
const reset_event = @intToPtr(*align(event_align) std.ResetEvent, state);
reset_event.set();
return;
}
}
};
test "std.AutoResetEvent" {
// test local code paths
{
var event = AutoResetEvent{};
testing.expectError(error.TimedOut, event.timedWait(1));
event.set();
event.wait();
}
// test cross-thread signaling
if (builtin.single_threaded)
return;
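// Ping-pong handshake: `value` advances 0 -> 1 -> 2 -> 3 -> 4, with the `in` and
// `out` events alternating which thread is allowed to touch `value` next.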
const Context = struct {
value: u128 = 0,
in: AutoResetEvent = AutoResetEvent{},
out: AutoResetEvent = AutoResetEvent{},
const Self = @This();
fn sender(self: *Self) void {
testing.expect(self.value == 0);
self.value = 1;
self.out.set();
self.in.wait();
testing.expect(self.value == 2);
self.value = 3;
self.out.set();
self.in.wait();
testing.expect(self.value == 4);
}
fn receiver(self: *Self) void {
self.out.wait();
testing.expect(self.value == 1);
self.value = 2;
self.in.set();
self.out.wait();
testing.expect(self.value == 3);
self.value = 4;
self.in.set();
}
};
var context = Context{};
const send_thread = try std.Thread.spawn(&context, Context.sender);
const recv_thread = try std.Thread.spawn(&context, Context.receiver);
send_thread.wait();
recv_thread.wait();
}

lib/std/event/loop.zig

@@ -35,6 +35,9 @@ pub const Loop = struct {
/// This is only used by `Loop` for the thread pool and associated resources.
arena: std.heap.ArenaAllocator,
/// State which manages frames that are sleeping on timers
delay_queue: DelayQueue,
/// Pre-allocated eventfds. All permanently active.
/// This is how `Loop` sends promises to be resumed on other threads.
available_eventfd_resume_nodes: std.atomic.Stack(ResumeNode.EventFd),
@@ -162,6 +165,7 @@ pub const Loop = struct {
.fs_queue = std.atomic.Queue(Request).init(),
.fs_thread = undefined,
.fs_thread_wakeup = std.ResetEvent.init(),
.delay_queue = undefined,
};
errdefer self.fs_thread_wakeup.deinit();
errdefer self.arena.deinit();
@@ -186,6 +190,9 @@ pub const Loop = struct {
self.posixFsRequest(&self.fs_end_request);
self.fs_thread.wait();
};
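// The delay queue needs its own timer thread, so it is only started in
// multi-threaded builds; single-threaded builds hit the compile error in sleep() instead.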
if (!std.builtin.single_threaded)
try self.delay_queue.init();
}
pub fn deinit(self: *Loop) void {
@@ -645,6 +652,10 @@ pub const Loop = struct {
for (self.extra_threads) |extra_thread| {
extra_thread.wait();
}
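// Shut down the timer thread: clear its running flag, wake it from any
// (timed) wait, then join it.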
@atomicStore(bool, &self.delay_queue.is_running, false, .SeqCst);
self.delay_queue.event.set();
self.delay_queue.thread.wait();
}
/// Runs the provided function asynchronously. The function's frame is allocated
@@ -748,6 +759,128 @@ pub const Loop = struct {
}
}
pub fn sleep(self: *Loop, nanoseconds: u64) void {
if (std.builtin.single_threaded)
@compileError("TODO: integrate timers with epoll/kevent/iocp for single-threaded");
suspend {
const now = self.delay_queue.timer.read();
var entry: DelayQueue.Waiters.Entry = undefined;
entry.init(@frame(), now + nanoseconds);
self.delay_queue.waiters.insert(&entry);
// Speculatively wake up the timer thread when we add a new entry.
// If the timer thread is sleeping on a longer entry, we need to
// interrupt it so that our entry can be expired in time.
self.delay_queue.event.set();
}
}
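// A minimal caller sketch (hypothetical `heartbeat` function, assuming an
// event loop instance is running):
//
//     fn heartbeat() void {
//         while (true) {
//             Loop.instance.?.sleep(50 * std.time.ns_per_ms); // suspends this frame
//             // ... periodic work; the frame is resumed by the DelayQueue timer thread
//         }
//     }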
const DelayQueue = struct {
timer: std.time.Timer,
waiters: Waiters,
thread: *std.Thread,
event: std.AutoResetEvent,
is_running: bool,
/// Initialize the delay queue by spawning the timer thread
/// and starting any timer resources.
fn init(self: *DelayQueue) !void {
self.* = DelayQueue{
.timer = try std.time.Timer.start(),
.waiters = DelayQueue.Waiters{
.entries = std.atomic.Queue(anyframe).init(),
},
.thread = undefined,
.event = std.AutoResetEvent{},
.is_running = true,
};
// Spawn the timer thread only after every other field is initialized,
// so that run() never observes an undefined `event` or `is_running`.
self.thread = try std.Thread.spawn(self, DelayQueue.run);
}
/// Entry point for the timer thread
/// which waits for timer entries to expire and reschedules them.
fn run(self: *DelayQueue) void {
const loop = @fieldParentPtr(Loop, "delay_queue", self);
while (@atomicLoad(bool, &self.is_running, .SeqCst)) {
const now = self.timer.read();
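// reschedule one expired frame per iteration onto the loop's next-tick queue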
if (self.waiters.popExpired(now)) |entry| {
loop.onNextTick(&entry.node);
continue;
}
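// nothing is ready to wake: sleep until the nearest deadline, or indefinitely
// if the queue is empty; a set() from sleep() cuts the wait short so that a
// newly inserted, sooner deadline is not missed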
if (self.waiters.nextExpire()) |expires| {
if (now >= expires)
continue;
self.event.timedWait(expires - now) catch {};
} else {
self.event.wait();
}
}
}
// TODO: use a tickless hierarchical timer wheel:
// https://github.com/wahern/timeout/
const Waiters = struct {
entries: std.atomic.Queue(anyframe),
const Entry = struct {
node: NextTickNode,
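// absolute deadline (a `DelayQueue.timer` reading), not a relative duration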
expires: u64,
fn init(self: *Entry, frame: anyframe, expires: u64) void {
self.node.data = frame;
self.expires = expires;
}
};
/// Registers the entry into the queue of waiting frames
fn insert(self: *Waiters, entry: *Entry) void {
self.entries.put(&entry.node);
}
/// Dequeues one expired entry relative to `now`
fn popExpired(self: *Waiters, now: u64) ?*Entry {
const entry = self.peekExpiringEntry() orelse return null;
if (entry.expires > now)
return null;
assert(self.entries.remove(&entry.node));
return entry;
}
/// Returns the expiry time of the soonest-expiring waiting entry, if any.
/// This is only an estimate: a newly inserted entry may expire sooner.
fn nextExpire(self: *Waiters) ?u64 {
const entry = self.peekExpiringEntry() orelse return null;
return entry.expires;
}
fn peekExpiringEntry(self: *Waiters) ?*Entry {
const held = self.entries.mutex.acquire();
defer held.release();
// starting from the head
var head = self.entries.head orelse return null;
// traverse the list of waiting entries to
// find the Node with the smallest `expires` field
var min = head;
while (head.next) |node| {
const min_entry = @fieldParentPtr(Entry, "node", min);
const node_entry = @fieldParentPtr(Entry, "node", node);
if (node_entry.expires < min_entry.expires)
min = node;
head = node;
}
return @fieldParentPtr(Entry, "node", min);
}
};
};
/// ------- I/O APIs -------
pub fn accept(
self: *Loop,
@@ -1550,3 +1683,27 @@ test "std.event.Loop - runDetached" {
fn testRunDetached() void {
testRunDetachedData += 1;
}
test "std.event.Loop - sleep" {
// https://github.com/ziglang/zig/issues/1908
if (builtin.single_threaded) return error.SkipZigTest;
if (!std.io.is_async) return error.SkipZigTest;
const frames = try testing.allocator.alloc(@Frame(testSleep), 10);
defer testing.allocator.free(frames);
const wait_time = 100 * std.time.ns_per_ms;
var sleep_count: usize = 0;
for (frames) |*frame|
frame.* = async testSleep(wait_time, &sleep_count);
for (frames) |*frame|
await frame;
testing.expect(sleep_count == frames.len);
}
fn testSleep(wait_ns: u64, sleep_count: *usize) void {
Loop.instance.?.sleep(wait_ns);
_ = @atomicRmw(usize, sleep_count, .Add, 1, .SeqCst);
}

lib/std/std.zig

@@ -14,6 +14,7 @@ pub const AutoArrayHashMap = array_hash_map.AutoArrayHashMap;
pub const AutoArrayHashMapUnmanaged = array_hash_map.AutoArrayHashMapUnmanaged;
pub const AutoHashMap = hash_map.AutoHashMap;
pub const AutoHashMapUnmanaged = hash_map.AutoHashMapUnmanaged;
pub const AutoResetEvent = @import("auto_reset_event.zig").AutoResetEvent;
pub const BufMap = @import("buf_map.zig").BufMap;
pub const BufSet = @import("buf_set.zig").BufSet;
pub const ChildProcess = @import("child_process.zig").ChildProcess;

lib/std/time.zig

@@ -14,8 +14,11 @@ const is_windows = std.Target.current.os.tag == .windows;
pub const epoch = @import("time/epoch.zig");
/// Spurious wakeups are possible and no precision of timing is guaranteed.
/// TODO integrate with evented I/O
pub fn sleep(nanoseconds: u64) void {
// TODO: opting out of async sleeping?
if (std.io.is_async)
return std.event.Loop.instance.?.sleep(nanoseconds);
if (is_windows) {
const big_ms_from_ns = nanoseconds / ns_per_ms;
const ms = math.cast(os.windows.DWORD, big_ms_from_ns) catch math.maxInt(os.windows.DWORD);