From 8fba0a6ae862993afa2aeca774347adc399b3605 Mon Sep 17 00:00:00 2001 From: Andrew Kelley Date: Tue, 10 Jul 2018 15:17:01 -0400 Subject: [PATCH] introduce std.event.Group for making parallel async calls --- CMakeLists.txt | 1 + src-self-hosted/module.zig | 36 +++++++-- std/event.zig | 2 + std/event/group.zig | 158 +++++++++++++++++++++++++++++++++++++ 4 files changed, 189 insertions(+), 8 deletions(-) create mode 100644 std/event/group.zig diff --git a/CMakeLists.txt b/CMakeLists.txt index fdedcd5eec..eeb0ec2058 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -459,6 +459,7 @@ set(ZIG_STD_FILES "empty.zig" "event.zig" "event/channel.zig" + "event/group.zig" "event/lock.zig" "event/locked.zig" "event/loop.zig" diff --git a/src-self-hosted/module.zig b/src-self-hosted/module.zig index 5ce1a7965a..24be228eb8 100644 --- a/src-self-hosted/module.zig +++ b/src-self-hosted/module.zig @@ -85,6 +85,17 @@ pub const Module = struct { exported_symbol_names: event.Locked(Decl.Table), + /// Before code generation starts, must wait on this group to make sure + /// the build is complete. + build_group: event.Group(BuildError!void), + + const BuildErrorsList = std.SegmentedList(BuildErrorDesc, 1); + + pub const BuildErrorDesc = struct { + code: BuildError, + text: []const u8, + }; + // TODO handle some of these earlier and report them in a way other than error codes pub const BuildError = error{ OutOfMemory, @@ -237,6 +248,7 @@ pub const Module = struct { .emit_file_type = Emit.Binary, .link_out_file = null, .exported_symbol_names = event.Locked(Decl.Table).init(loop, Decl.Table.init(loop.allocator)), + .build_group = event.Group(BuildError!void).init(loop), }); } @@ -310,6 +322,9 @@ pub const Module = struct { const decls = try Scope.Decls.create(self.a(), null); errdefer decls.destroy(); + var decl_group = event.Group(BuildError!void).init(self.loop); + errdefer decl_group.cancelAll(); + var it = tree.root_node.decls.iterator(0); while (it.next()) |decl_ptr| { const decl = decl_ptr.*; @@ -342,25 +357,30 @@ pub const Module = struct { }); errdefer self.a().destroy(fn_decl); - // TODO make this parallel - try await try async self.addTopLevelDecl(tree, &fn_decl.base); + try decl_group.call(addTopLevelDecl, self, tree, &fn_decl.base); }, ast.Node.Id.TestDecl => @panic("TODO"), else => unreachable, } } + try await (async decl_group.wait() catch unreachable); + try await (async self.build_group.wait() catch unreachable); } async fn addTopLevelDecl(self: *Module, tree: *ast.Tree, decl: *Decl) !void { const is_export = decl.isExported(tree); - { - const exported_symbol_names = await try async self.exported_symbol_names.acquire(); - defer exported_symbol_names.release(); + if (is_export) { + try self.build_group.call(verifyUniqueSymbol, self, decl); + } + } - if (try exported_symbol_names.value.put(decl.name, decl)) |other_decl| { - @panic("TODO report compile error"); - } + async fn verifyUniqueSymbol(self: *Module, decl: *Decl) !void { + const exported_symbol_names = await (async self.exported_symbol_names.acquire() catch unreachable); + defer exported_symbol_names.release(); + + if (try exported_symbol_names.value.put(decl.name, decl)) |other_decl| { + @panic("TODO report compile error"); } } diff --git a/std/event.zig b/std/event.zig index 7e9928b3d7..516defebf8 100644 --- a/std/event.zig +++ b/std/event.zig @@ -3,6 +3,7 @@ pub const Loop = @import("event/loop.zig").Loop; pub const Lock = @import("event/lock.zig").Lock; pub const tcp = @import("event/tcp.zig"); pub const Channel = @import("event/channel.zig").Channel; +pub const Group = @import("event/group.zig").Group; test "import event tests" { _ = @import("event/locked.zig"); @@ -10,4 +11,5 @@ test "import event tests" { _ = @import("event/lock.zig"); _ = @import("event/tcp.zig"); _ = @import("event/channel.zig"); + _ = @import("event/group.zig"); } diff --git a/std/event/group.zig b/std/event/group.zig new file mode 100644 index 0000000000..c286803b53 --- /dev/null +++ b/std/event/group.zig @@ -0,0 +1,158 @@ +const std = @import("../index.zig"); +const builtin = @import("builtin"); +const Lock = std.event.Lock; +const Loop = std.event.Loop; +const AtomicRmwOp = builtin.AtomicRmwOp; +const AtomicOrder = builtin.AtomicOrder; +const assert = std.debug.assert; + +/// ReturnType should be `void` or `E!void` +pub fn Group(comptime ReturnType: type) type { + return struct { + coro_stack: Stack, + alloc_stack: Stack, + lock: Lock, + + const Self = this; + + const Error = switch (@typeInfo(ReturnType)) { + builtin.TypeId.ErrorUnion => |payload| payload.error_set, + else => void, + }; + const Stack = std.atomic.Stack(promise->ReturnType); + + pub fn init(loop: *Loop) Self { + return Self{ + .coro_stack = Stack.init(), + .alloc_stack = Stack.init(), + .lock = Lock.init(loop), + }; + } + + /// Add a promise to the group. Thread-safe. + pub fn add(self: *Self, handle: promise->ReturnType) (error{OutOfMemory}!void) { + const node = try self.lock.loop.allocator.create(Stack.Node{ + .next = undefined, + .data = handle, + }); + self.alloc_stack.push(node); + } + + /// This is equivalent to an async call, but the async function is added to the group, instead + /// of returning a promise. func must be async and have return type void. + /// Thread-safe. + pub fn call(self: *Self, comptime func: var, args: ...) (error{OutOfMemory}!void) { + const S = struct { + async fn asyncFunc(node: **Stack.Node, args2: ...) ReturnType { + // TODO this is a hack to make the memory following be inside the coro frame + suspend |p| { + var my_node: Stack.Node = undefined; + node.* = &my_node; + resume p; + } + + // TODO this allocation elision should be guaranteed because we await it in + // this coro frame + return await (async func(args2) catch unreachable); + } + }; + var node: *Stack.Node = undefined; + const handle = try async S.asyncFunc(&node, args); + node.* = Stack.Node{ + .next = undefined, + .data = handle, + }; + self.coro_stack.push(node); + } + + /// Wait for all the calls and promises of the group to complete. + /// Thread-safe. + pub async fn wait(self: *Self) ReturnType { + // TODO catch unreachable because the allocation can be grouped with + // the coro frame allocation + const held = await (async self.lock.acquire() catch unreachable); + defer held.release(); + + while (self.coro_stack.pop()) |node| { + if (Error == void) { + await node.data; + } else { + (await node.data) catch |err| { + self.cancelAll(); + return err; + }; + } + } + while (self.alloc_stack.pop()) |node| { + const handle = node.data; + self.lock.loop.allocator.destroy(node); + if (Error == void) { + await handle; + } else { + (await handle) catch |err| { + self.cancelAll(); + return err; + }; + } + } + } + + /// Cancel all the outstanding promises. May only be called if wait was never called. + pub fn cancelAll(self: *Self) void { + while (self.coro_stack.pop()) |node| { + cancel node.data; + } + while (self.alloc_stack.pop()) |node| { + cancel node.data; + self.lock.loop.allocator.destroy(node); + } + } + }; +} + +test "std.event.Group" { + var da = std.heap.DirectAllocator.init(); + defer da.deinit(); + + const allocator = &da.allocator; + + var loop: Loop = undefined; + try loop.initMultiThreaded(allocator); + defer loop.deinit(); + + const handle = try async testGroup(&loop); + defer cancel handle; + + loop.run(); +} + +async fn testGroup(loop: *Loop) void { + var count: usize = 0; + var group = Group(void).init(loop); + group.add(async sleepALittle(&count) catch @panic("memory")) catch @panic("memory"); + group.call(increaseByTen, &count) catch @panic("memory"); + await (async group.wait() catch @panic("memory")); + assert(count == 11); + + var another = Group(error!void).init(loop); + another.add(async somethingElse() catch @panic("memory")) catch @panic("memory"); + another.call(doSomethingThatFails) catch @panic("memory"); + std.debug.assertError(await (async another.wait() catch @panic("memory")), error.ItBroke); +} + +async fn sleepALittle(count: *usize) void { + std.os.time.sleep(0, 1000000); + _ = @atomicRmw(usize, count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst); +} + +async fn increaseByTen(count: *usize) void { + var i: usize = 0; + while (i < 10) : (i += 1) { + _ = @atomicRmw(usize, count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst); + } +} + +async fn doSomethingThatFails() error!void {} +async fn somethingElse() error!void { + return error.ItBroke; +}