From 4530adbd3314c28e2cfe831e7a20b154614e2c9c Mon Sep 17 00:00:00 2001 From: Vexu <15308111+Vexu@users.noreply.github.com> Date: Wed, 6 Nov 2019 21:41:35 +0200 Subject: [PATCH] use global event loop in std.event types --- lib/std/event/future.zig | 17 +++++---------- lib/std/event/group.zig | 26 ++++++++++------------- lib/std/event/lock.zig | 31 ++++++++++++---------------- lib/std/event/locked.zig | 7 +++---- lib/std/event/rwlock.zig | 42 +++++++++++++++++--------------------- lib/std/event/rwlocked.zig | 9 ++++---- 6 files changed, 55 insertions(+), 77 deletions(-) diff --git a/lib/std/event/future.zig b/lib/std/event/future.zig index 3e5754982e..356d9cac79 100644 --- a/lib/std/event/future.zig +++ b/lib/std/event/future.zig @@ -3,7 +3,6 @@ const assert = std.debug.assert; const testing = std.testing; const builtin = @import("builtin"); const Lock = std.event.Lock; -const Loop = std.event.Loop; /// This is a value that starts out unavailable, until resolve() is called /// While it is unavailable, functions suspend when they try to get() it, @@ -23,9 +22,9 @@ pub fn Future(comptime T: type) type { const Self = @This(); const Queue = std.atomic.Queue(anyframe); - pub fn init(loop: *Loop) Self { + pub fn init() Self { return Self{ - .lock = Lock.initLocked(loop), + .lock = Lock.initLocked(), .available = 0, .data = undefined, }; @@ -90,17 +89,11 @@ test "std.event.Future" { // TODO provide a way to run tests in evented I/O mode if (!std.io.is_async) return error.SkipZigTest; - var loop: Loop = undefined; - try loop.initMultiThreaded(); - defer loop.deinit(); - - const handle = async testFuture(&loop); - - loop.run(); + const handle = async testFuture(); } -fn testFuture(loop: *Loop) void { - var future = Future(i32).init(loop); +fn testFuture() void { + var future = Future(i32).init(); var a = async waitOnFuture(&future); var b = async waitOnFuture(&future); diff --git a/lib/std/event/group.zig b/lib/std/event/group.zig index ee15c1d3c2..f073bb3df2 100644 --- a/lib/std/event/group.zig +++ b/lib/std/event/group.zig @@ -1,8 +1,8 @@ const std = @import("../std.zig"); const builtin = @import("builtin"); const Lock = std.event.Lock; -const Loop = std.event.Loop; const testing = std.testing; +const Allocator = std.mem.Allocator; /// ReturnType must be `void` or `E!void` pub fn Group(comptime ReturnType: type) type { @@ -10,6 +10,7 @@ pub fn Group(comptime ReturnType: type) type { frame_stack: Stack, alloc_stack: Stack, lock: Lock, + allocator: *Allocator, const Self = @This(); @@ -19,17 +20,18 @@ pub fn Group(comptime ReturnType: type) type { }; const Stack = std.atomic.Stack(anyframe->ReturnType); - pub fn init(loop: *Loop) Self { + pub fn init(allocator: *Allocator) Self { return Self{ .frame_stack = Stack.init(), .alloc_stack = Stack.init(), - .lock = Lock.init(loop), + .lock = Lock.init(), + .allocator = allocator, }; } /// Add a frame to the group. Thread-safe. pub fn add(self: *Self, handle: anyframe->ReturnType) (error{OutOfMemory}!void) { - const node = try self.lock.loop.allocator.create(Stack.Node); + const node = try self.allocator.create(Stack.Node); node.* = Stack.Node{ .next = undefined, .data = handle, @@ -66,7 +68,7 @@ pub fn Group(comptime ReturnType: type) type { } while (self.alloc_stack.pop()) |node| { const handle = node.data; - self.lock.loop.allocator.destroy(node); + self.allocator.destroy(node); if (Error == void) { await handle; } else { @@ -87,18 +89,12 @@ test "std.event.Group" { // TODO provide a way to run tests in evented I/O mode if (!std.io.is_async) return error.SkipZigTest; - var loop: Loop = undefined; - try loop.initMultiThreaded(); - defer loop.deinit(); - - const handle = async testGroup(&loop); - - loop.run(); + const handle = async testGroup(std.heap.direct_allocator); } -async fn testGroup(loop: *Loop) void { +async fn testGroup(allocator: *Allocator) void { var count: usize = 0; - var group = Group(void).init(loop); + var group = Group(void).init(allocator); var sleep_a_little_frame = async sleepALittle(&count); group.add(&sleep_a_little_frame) catch @panic("memory"); var increase_by_ten_frame = async increaseByTen(&count); @@ -106,7 +102,7 @@ async fn testGroup(loop: *Loop) void { group.wait(); testing.expect(count == 11); - var another = Group(anyerror!void).init(loop); + var another = Group(anyerror!void).init(allocator); var something_else_frame = async somethingElse(); another.add(&something_else_frame) catch @panic("memory"); var something_that_fails_frame = async doSomethingThatFails(); diff --git a/lib/std/event/lock.zig b/lib/std/event/lock.zig index da70b571df..576a09064f 100644 --- a/lib/std/event/lock.zig +++ b/lib/std/event/lock.zig @@ -11,20 +11,22 @@ const Loop = std.event.Loop; /// Allows only one actor to hold the lock. /// TODO: make this API also work in blocking I/O mode. pub const Lock = struct { - loop: *Loop, shared_bit: u8, // TODO make this a bool queue: Queue, queue_empty_bit: u8, // TODO make this a bool const Queue = std.atomic.Queue(anyframe); + const global_event_loop = Loop.instance orelse + @compileError("std.event.Lock currently only works with event-based I/O"); + pub const Held = struct { lock: *Lock, pub fn release(self: Held) void { // Resume the next item from the queue. if (self.lock.queue.get()) |node| { - self.lock.loop.onNextTick(node); + global_event_loop.onNextTick(node); return; } @@ -49,7 +51,7 @@ pub const Lock = struct { // Resume the next item from the queue. if (self.lock.queue.get()) |node| { - self.lock.loop.onNextTick(node); + global_event_loop.onNextTick(node); return; } @@ -65,18 +67,16 @@ pub const Lock = struct { } }; - pub fn init(loop: *Loop) Lock { + pub fn init() Lock { return Lock{ - .loop = loop, .shared_bit = 0, .queue = Queue.init(), .queue_empty_bit = 1, }; } - pub fn initLocked(loop: *Loop) Lock { + pub fn initLocked() Lock { return Lock{ - .loop = loop, .shared_bit = 1, .queue = Queue.init(), .queue_empty_bit = 1, @@ -126,27 +126,22 @@ test "std.event.Lock" { // TODO provide a way to run tests in evented I/O mode if (!std.io.is_async) return error.SkipZigTest; - var loop: Loop = undefined; - try loop.initMultiThreaded(); - defer loop.deinit(); - - var lock = Lock.init(&loop); + var lock = Lock.init(); defer lock.deinit(); - _ = async testLock(&loop, &lock); - loop.run(); + _ = async testLock(&lock); testing.expectEqualSlices(i32, [1]i32{3 * @intCast(i32, shared_test_data.len)} ** shared_test_data.len, shared_test_data); } -async fn testLock(loop: *Loop, lock: *Lock) void { +async fn testLock(lock: *Lock) void { var handle1 = async lockRunner(lock); var tick_node1 = Loop.NextTickNode{ .prev = undefined, .next = undefined, .data = &handle1, }; - loop.onNextTick(&tick_node1); + Loop.instance.?.onNextTick(&tick_node1); var handle2 = async lockRunner(lock); var tick_node2 = Loop.NextTickNode{ @@ -154,7 +149,7 @@ async fn testLock(loop: *Loop, lock: *Lock) void { .next = undefined, .data = &handle2, }; - loop.onNextTick(&tick_node2); + Loop.instance.?.onNextTick(&tick_node2); var handle3 = async lockRunner(lock); var tick_node3 = Loop.NextTickNode{ @@ -162,7 +157,7 @@ async fn testLock(loop: *Loop, lock: *Lock) void { .next = undefined, .data = &handle3, }; - loop.onNextTick(&tick_node3); + Loop.instance.?.onNextTick(&tick_node3); await handle1; await handle2; diff --git a/lib/std/event/locked.zig b/lib/std/event/locked.zig index aeedf3558a..5e9c0ea10e 100644 --- a/lib/std/event/locked.zig +++ b/lib/std/event/locked.zig @@ -1,6 +1,5 @@ const std = @import("../std.zig"); const Lock = std.event.Lock; -const Loop = std.event.Loop; /// Thread-safe async/await lock that protects one piece of data. /// Functions which are waiting for the lock are suspended, and @@ -21,9 +20,9 @@ pub fn Locked(comptime T: type) type { } }; - pub fn init(loop: *Loop, data: T) Self { + pub fn init(data: T) Self { return Self{ - .lock = Lock.init(loop), + .lock = Lock.init(), .private_data = data, }; } @@ -35,7 +34,7 @@ pub fn Locked(comptime T: type) type { pub async fn acquire(self: *Self) HeldLock { return HeldLock{ // TODO guaranteed allocation elision - .held = await (async self.lock.acquire() catch unreachable), + .held = self.lock.acquire(), .value = &self.private_data, }; } diff --git a/lib/std/event/rwlock.zig b/lib/std/event/rwlock.zig index 7f86b004be..c05e740b09 100644 --- a/lib/std/event/rwlock.zig +++ b/lib/std/event/rwlock.zig @@ -13,7 +13,6 @@ const Loop = std.event.Loop; /// When a write lock is held, it will not be released until the writer queue is empty. /// TODO: make this API also work in blocking I/O mode pub const RwLock = struct { - loop: *Loop, shared_state: u8, // TODO make this an enum writer_queue: Queue, reader_queue: Queue, @@ -29,6 +28,9 @@ pub const RwLock = struct { const Queue = std.atomic.Queue(anyframe); + const global_event_loop = Loop.instance orelse + @compileError("std.event.RwLock currently only works with event-based I/O"); + pub const HeldRead = struct { lock: *RwLock, @@ -55,7 +57,7 @@ pub const RwLock = struct { // See if we can leave it locked for writing, and pass the lock to the next writer // in the queue to grab the lock. if (self.lock.writer_queue.get()) |node| { - self.lock.loop.onNextTick(node); + global_event_loop.onNextTick(node); return; } @@ -64,7 +66,7 @@ pub const RwLock = struct { // Switch to a read lock. _ = @atomicRmw(u8, &self.lock.shared_state, .Xchg, State.ReadLock, .SeqCst); while (self.lock.reader_queue.get()) |node| { - self.lock.loop.onNextTick(node); + global_event_loop.onNextTick(node); } return; } @@ -76,9 +78,8 @@ pub const RwLock = struct { } }; - pub fn init(loop: *Loop) RwLock { + pub fn init() RwLock { return RwLock{ - .loop = loop, .shared_state = State.Unlocked, .writer_queue = Queue.init(), .writer_queue_empty_bit = 1, @@ -120,7 +121,7 @@ pub const RwLock = struct { // Give out all the read locks. if (self.reader_queue.get()) |first_node| { while (self.reader_queue.get()) |node| { - self.loop.onNextTick(node); + global_event_loop.onNextTick(node); } resume first_node.data; } @@ -171,7 +172,7 @@ pub const RwLock = struct { } // If there's an item in the writer queue, give them the lock, and we're done. if (self.writer_queue.get()) |node| { - self.loop.onNextTick(node); + global_event_loop.onNextTick(node); return; } // Release the lock again. @@ -187,9 +188,9 @@ pub const RwLock = struct { } // If there are any items in the reader queue, give out all the reader locks, and we're done. if (self.reader_queue.get()) |first_node| { - self.loop.onNextTick(first_node); + global_event_loop.onNextTick(first_node); while (self.reader_queue.get()) |node| { - self.loop.onNextTick(node); + global_event_loop.onNextTick(node); } return; } @@ -216,46 +217,41 @@ test "std.event.RwLock" { // TODO provide a way to run tests in evented I/O mode if (!std.io.is_async) return error.SkipZigTest; - var loop: Loop = undefined; - try loop.initMultiThreaded(); - defer loop.deinit(); - - var lock = RwLock.init(&loop); + var lock = RwLock.init(); defer lock.deinit(); - const handle = testLock(&loop, &lock); - loop.run(); + const handle = testLock(std.heap.direct_allocator, &lock); const expected_result = [1]i32{shared_it_count * @intCast(i32, shared_test_data.len)} ** shared_test_data.len; testing.expectEqualSlices(i32, expected_result, shared_test_data); } -async fn testLock(loop: *Loop, lock: *RwLock) void { +async fn testLock(allocator: *Allocator, lock: *RwLock) void { var read_nodes: [100]Loop.NextTickNode = undefined; for (read_nodes) |*read_node| { - const frame = loop.allocator.create(@Frame(readRunner)) catch @panic("memory"); + const frame = allocator.create(@Frame(readRunner)) catch @panic("memory"); read_node.data = frame; frame.* = async readRunner(lock); - loop.onNextTick(read_node); + Loop.instance.?.onNextTick(read_node); } var write_nodes: [shared_it_count]Loop.NextTickNode = undefined; for (write_nodes) |*write_node| { - const frame = loop.allocator.create(@Frame(writeRunner)) catch @panic("memory"); + const frame = allocator.create(@Frame(writeRunner)) catch @panic("memory"); write_node.data = frame; frame.* = async writeRunner(lock); - loop.onNextTick(write_node); + Loop.instance.?.onNextTick(write_node); } for (write_nodes) |*write_node| { const casted = @ptrCast(*const @Frame(writeRunner), write_node.data); await casted; - loop.allocator.destroy(casted); + allocator.destroy(casted); } for (read_nodes) |*read_node| { const casted = @ptrCast(*const @Frame(readRunner), read_node.data); await casted; - loop.allocator.destroy(casted); + allocator.destroy(casted); } } diff --git a/lib/std/event/rwlocked.zig b/lib/std/event/rwlocked.zig index 386aa08407..3f4c6ddbf8 100644 --- a/lib/std/event/rwlocked.zig +++ b/lib/std/event/rwlocked.zig @@ -1,6 +1,5 @@ const std = @import("../std.zig"); const RwLock = std.event.RwLock; -const Loop = std.event.Loop; /// Thread-safe async/await RW lock that protects one piece of data. /// Functions which are waiting for the lock are suspended, and @@ -30,9 +29,9 @@ pub fn RwLocked(comptime T: type) type { } }; - pub fn init(loop: *Loop, data: T) Self { + pub fn init(data: T) Self { return Self{ - .lock = RwLock.init(loop), + .lock = RwLock.init(), .locked_data = data, }; } @@ -43,14 +42,14 @@ pub fn RwLocked(comptime T: type) type { pub async fn acquireRead(self: *Self) HeldReadLock { return HeldReadLock{ - .held = await (async self.lock.acquireRead() catch unreachable), + .held = self.lock.acquireRead(), .value = &self.locked_data, }; } pub async fn acquireWrite(self: *Self) HeldWriteLock { return HeldWriteLock{ - .held = await (async self.lock.acquireWrite() catch unreachable), + .held = self.lock.acquireWrite(), .value = &self.locked_data, }; }