From a3f55aaf34f0a459c8aec4b35e55ad4534eaca30 Mon Sep 17 00:00:00 2001
From: Andrew Kelley
Date: Fri, 29 Jun 2018 15:39:55 -0400
Subject: [PATCH] add event loop Channel abstraction

This is akin to channels in Go, except:

 * implemented in userland
 * lock-free and thread-safe
 * integrated with the userland event loop

The self-hosted compiler is changed to use a channel for events and to
stay alive, watching files and performing builds when things change;
for now, however, main.zig exits after one build.

Note that nothing is actually built yet: the compiler just parses the
input and then declares that the build succeeded.

Next items to do:

 * add Windows and macOS support for std.event.Loop
 * improve the event loop stop() operation
 * make the event loop multiplex coroutines onto kernel threads
 * watch source files for updates, and provide AST diffs (at least list
   the top level declaration changes)
 * top level declaration analysis
---
 src-self-hosted/main.zig   |  37 ++++-
 src-self-hosted/module.zig | 135 +++++++++++++-----
 std/atomic/queue_mpsc.zig  |   2 +-
 std/event.zig              | 279 ++++++++++++++++++++++++++++++++++++-
 std/fmt/index.zig          |   3 +
 std/heap.zig               |   1 +
 6 files changed, 416 insertions(+), 41 deletions(-)

diff --git a/src-self-hosted/main.zig b/src-self-hosted/main.zig index 6dabddaefb..d17fc94c82 100644 --- a/src-self-hosted/main.zig +++ b/src-self-hosted/main.zig @@ -1,6 +1,7 @@ const std = @import("std"); const builtin = @import("builtin"); +const event = std.event; const os = std.os; const io = std.io; const mem = std.mem; @@ -43,6 +44,9 @@ const Command = struct { }; pub fn main() !void { + // This allocator needs to be thread-safe because we use it for the event.Loop + // which multiplexes coroutines onto kernel threads. + // libc allocator is guaranteed to have this property.
const allocator = std.heap.c_allocator; var stdout_file = try std.io.getStdOut(); @@ -380,8 +384,10 @@ fn buildOutputType(allocator: *Allocator, args: []const []const u8, out_type: Mo const zig_lib_dir = introspect.resolveZigLibDir(allocator) catch os.exit(1); defer allocator.free(zig_lib_dir); + var loop = try event.Loop.init(allocator); + var module = try Module.create( - allocator, + &loop, root_name, root_source_file, Target.Native, @@ -471,9 +477,35 @@ fn buildOutputType(allocator: *Allocator, args: []const []const u8, out_type: Mo module.emit_file_type = emit_type; module.link_objects = link_objects; module.assembly_files = assembly_files; + module.link_out_file = flags.single("out-file"); try module.build(); - try module.link(flags.single("out-file")); + const process_build_events_handle = try async processBuildEvents(module, true); + defer cancel process_build_events_handle; + loop.run(); +} + +async fn processBuildEvents(module: *Module, watch: bool) void { + while (watch) { + // TODO directly awaiting async should guarantee memory allocation elision + const build_event = await (async module.events.get() catch unreachable); + + switch (build_event) { + Module.Event.Ok => { + std.debug.warn("Build succeeded\n"); + // for now we stop after 1 + module.loop.stop(); + return; + }, + Module.Event.Error => |err| { + std.debug.warn("build failed: {}\n", @errorName(err)); + @panic("TODO error return trace"); + }, + Module.Event.Fail => |errs| { + @panic("TODO print compile error messages"); + }, + } + } } fn cmdBuildExe(allocator: *Allocator, args: []const []const u8) !void { @@ -780,4 +812,3 @@ const CliPkg = struct { self.children.deinit(); } }; - diff --git a/src-self-hosted/module.zig b/src-self-hosted/module.zig index 4da46cd38c..4fac760790 100644 --- a/src-self-hosted/module.zig +++ b/src-self-hosted/module.zig @@ -11,9 +11,11 @@ const warn = std.debug.warn; const Token = std.zig.Token; const ArrayList = std.ArrayList; const errmsg = @import("errmsg.zig"); +const ast = std.zig.ast; +const event = std.event; pub const Module = struct { - allocator: *mem.Allocator, + loop: *event.Loop, name: Buffer, root_src_path: ?[]const u8, module: llvm.ModuleRef, @@ -76,6 +78,50 @@ pub const Module = struct { kind: Kind, + link_out_file: ?[]const u8, + events: *event.Channel(Event), + + // TODO handle some of these earlier and report them in a way other than error codes + pub const BuildError = error{ + OutOfMemory, + EndOfStream, + BadFd, + Io, + IsDir, + Unexpected, + SystemResources, + SharingViolation, + PathAlreadyExists, + FileNotFound, + AccessDenied, + PipeBusy, + FileTooBig, + SymLinkLoop, + ProcessFdQuotaExceeded, + NameTooLong, + SystemFdQuotaExceeded, + NoDevice, + PathNotFound, + NoSpaceLeft, + NotDir, + FileSystem, + OperationAborted, + IoPending, + BrokenPipe, + WouldBlock, + FileClosed, + DestinationAddressRequired, + DiskQuota, + InputOutput, + NoStdHandles, + }; + + pub const Event = union(enum) { + Ok, + Fail: []errmsg.Msg, + Error: BuildError, + }; + pub const DarwinVersionMin = union(enum) { None, MacOS: []const u8, @@ -104,7 +150,7 @@ pub const Module = struct { }; pub fn create( - allocator: *mem.Allocator, + loop: *event.Loop, name: []const u8, root_src_path: ?[]const u8, target: *const Target, @@ -113,7 +159,7 @@ pub const Module = struct { zig_lib_dir: []const u8, cache_dir: []const u8, ) !*Module { - var name_buffer = try Buffer.init(allocator, name); + var name_buffer = try Buffer.init(loop.allocator, name); errdefer name_buffer.deinit(); const context = 
c.LLVMContextCreate() orelse return error.OutOfMemory; @@ -125,8 +171,12 @@ pub const Module = struct { const builder = c.LLVMCreateBuilderInContext(context) orelse return error.OutOfMemory; errdefer c.LLVMDisposeBuilder(builder); - const module_ptr = try allocator.create(Module{ - .allocator = allocator, + const events = try event.Channel(Event).create(loop, 0); + errdefer events.destroy(); + + return loop.allocator.create(Module{ + .loop = loop, + .events = events, .name = name_buffer, .root_src_path = root_src_path, .module = module, @@ -171,7 +221,7 @@ pub const Module = struct { .link_objects = [][]const u8{}, .windows_subsystem_windows = false, .windows_subsystem_console = false, - .link_libs_list = ArrayList(*LinkLib).init(allocator), + .link_libs_list = ArrayList(*LinkLib).init(loop.allocator), .libc_link_lib = null, .err_color = errmsg.Color.Auto, .darwin_frameworks = [][]const u8{}, @@ -179,9 +229,8 @@ pub const Module = struct { .test_filters = [][]const u8{}, .test_name_prefix = null, .emit_file_type = Emit.Binary, + .link_out_file = null, }); - errdefer allocator.destroy(module_ptr); - return module_ptr; } fn dump(self: *Module) void { @@ -189,58 +238,70 @@ pub const Module = struct { } pub fn destroy(self: *Module) void { + self.events.destroy(); c.LLVMDisposeBuilder(self.builder); c.LLVMDisposeModule(self.module); c.LLVMContextDispose(self.context); self.name.deinit(); - self.allocator.destroy(self); + self.a().destroy(self); } pub fn build(self: *Module) !void { if (self.llvm_argv.len != 0) { - var c_compatible_args = try std.cstr.NullTerminated2DArray.fromSlices(self.allocator, [][]const []const u8{ + var c_compatible_args = try std.cstr.NullTerminated2DArray.fromSlices(self.a(), [][]const []const u8{ [][]const u8{"zig (LLVM option parsing)"}, self.llvm_argv, }); defer c_compatible_args.deinit(); + // TODO this sets global state c.ZigLLVMParseCommandLineOptions(self.llvm_argv.len + 1, c_compatible_args.ptr); } + _ = try async self.buildAsync(); + } + + async fn buildAsync(self: *Module) void { + while (true) { + // TODO directly awaiting async should guarantee memory allocation elision + // TODO also async before suspending should guarantee memory allocation elision + (await (async self.addRootSrc() catch unreachable)) catch |err| { + await (async self.events.put(Event{ .Error = err }) catch unreachable); + return; + }; + await (async self.events.put(Event.Ok) catch unreachable); + } + } + + async fn addRootSrc(self: *Module) !void { const root_src_path = self.root_src_path orelse @panic("TODO handle null root src path"); - const root_src_real_path = os.path.real(self.allocator, root_src_path) catch |err| { + const root_src_real_path = os.path.real(self.a(), root_src_path) catch |err| { try printError("unable to get real path '{}': {}", root_src_path, err); return err; }; - errdefer self.allocator.free(root_src_real_path); + errdefer self.a().free(root_src_real_path); - const source_code = io.readFileAlloc(self.allocator, root_src_real_path) catch |err| { + const source_code = io.readFileAlloc(self.a(), root_src_real_path) catch |err| { try printError("unable to open '{}': {}", root_src_real_path, err); return err; }; - errdefer self.allocator.free(source_code); + errdefer self.a().free(source_code); - warn("====input:====\n"); - - warn("{}", source_code); - - warn("====parse:====\n"); - - var tree = try std.zig.parse(self.allocator, source_code); + var tree = try std.zig.parse(self.a(), source_code); defer tree.deinit(); - var stderr_file = try std.io.getStdErr(); - var 
stderr_file_out_stream = std.io.FileOutStream.init(&stderr_file); - const out_stream = &stderr_file_out_stream.stream; - - warn("====fmt:====\n"); - _ = try std.zig.render(self.allocator, out_stream, &tree); - - warn("====ir:====\n"); - warn("TODO\n\n"); - - warn("====llvm ir:====\n"); - self.dump(); + //var it = tree.root_node.decls.iterator(); + //while (it.next()) |decl_ptr| { + // const decl = decl_ptr.*; + // switch (decl.id) { + // ast.Node.Comptime => @panic("TODO"), + // ast.Node.VarDecl => @panic("TODO"), + // ast.Node.UseDecl => @panic("TODO"), + // ast.Node.FnDef => @panic("TODO"), + // ast.Node.TestDecl => @panic("TODO"), + // else => unreachable, + // } + //} } pub fn link(self: *Module, out_file: ?[]const u8) !void { @@ -263,11 +324,11 @@ pub const Module = struct { } } - const link_lib = try self.allocator.create(LinkLib{ + const link_lib = try self.a().create(LinkLib{ .name = name, .path = null, .provided_explicitly = provided_explicitly, - .symbols = ArrayList([]u8).init(self.allocator), + .symbols = ArrayList([]u8).init(self.a()), }); try self.link_libs_list.append(link_lib); if (is_libc) { @@ -275,6 +336,10 @@ pub const Module = struct { } return link_lib; } + + fn a(self: Module) *mem.Allocator { + return self.loop.allocator; + } }; fn printError(comptime format: []const u8, args: ...) !void { diff --git a/std/atomic/queue_mpsc.zig b/std/atomic/queue_mpsc.zig index 66eb4573df..8030565d7a 100644 --- a/std/atomic/queue_mpsc.zig +++ b/std/atomic/queue_mpsc.zig @@ -1,4 +1,4 @@ -const std = @import("std"); +const std = @import("../index.zig"); const assert = std.debug.assert; const builtin = @import("builtin"); const AtomicOrder = builtin.AtomicOrder; diff --git a/std/event.zig b/std/event.zig index 0821c789b7..7f823bc732 100644 --- a/std/event.zig +++ b/std/event.zig @@ -4,6 +4,8 @@ const assert = std.debug.assert; const event = this; const mem = std.mem; const posix = std.os.posix; +const AtomicRmwOp = builtin.AtomicRmwOp; +const AtomicOrder = builtin.AtomicOrder; pub const TcpServer = struct { handleRequestFn: async<*mem.Allocator> fn (*TcpServer, *const std.net.Address, *const std.os.File) void, @@ -95,16 +97,29 @@ pub const Loop = struct { allocator: *mem.Allocator, epollfd: i32, keep_running: bool, + next_tick_queue: std.atomic.QueueMpsc(promise), - fn init(allocator: *mem.Allocator) !Loop { + pub const NextTickNode = std.atomic.QueueMpsc(promise).Node; + + /// The allocator must be thread-safe because we use it for multiplexing + /// coroutines onto kernel threads. + pub fn init(allocator: *mem.Allocator) !Loop { const epollfd = try std.os.linuxEpollCreate(std.os.linux.EPOLL_CLOEXEC); + errdefer std.os.close(epollfd); + return Loop{ .keep_running = true, .allocator = allocator, .epollfd = epollfd, + .next_tick_queue = std.atomic.QueueMpsc(promise).init(), }; } + /// must call stop before deinit + pub fn deinit(self: *Loop) void { + std.os.close(self.epollfd); + } + pub fn addFd(self: *Loop, fd: i32, prom: promise) !void { var ev = std.os.linux.epoll_event{ .events = std.os.linux.EPOLLIN | std.os.linux.EPOLLOUT | std.os.linux.EPOLLET, @@ -126,11 +141,21 @@ pub const Loop = struct { pub fn stop(self: *Loop) void { // TODO make atomic self.keep_running = false; - // TODO activate an fd in the epoll set + // TODO activate an fd in the epoll set which should cancel all the promises + } + + /// bring your own linked list node. this means it can't fail. 
+ pub fn onNextTick(self: *Loop, node: *NextTickNode) void { + self.next_tick_queue.put(node); } pub fn run(self: *Loop) void { while (self.keep_running) { + // TODO multiplex the next tick queue and the epoll event results onto a thread pool + while (self.next_tick_queue.get()) |node| { + resume node.data; + } + if (!self.keep_running) break; var events: [16]std.os.linux.epoll_event = undefined; const count = std.os.linuxEpollWait(self.epollfd, events[0..], -1); for (events[0..count]) |ev| { @@ -141,6 +166,215 @@ pub const Loop = struct { } }; +/// many producer, many consumer, thread-safe, lock-free, runtime configurable buffer size +/// when buffer is empty, consumers suspend and are resumed by producers +/// when buffer is full, producers suspend and are resumed by consumers +pub fn Channel(comptime T: type) type { + return struct { + loop: *Loop, + + getters: std.atomic.QueueMpsc(GetNode), + putters: std.atomic.QueueMpsc(PutNode), + get_count: usize, + put_count: usize, + dispatch_lock: u8, // TODO make this a bool + need_dispatch: u8, // TODO make this a bool + + // simple fixed size ring buffer + buffer_nodes: []T, + buffer_index: usize, + buffer_len: usize, + + const SelfChannel = this; + const GetNode = struct { + ptr: *T, + tick_node: *Loop.NextTickNode, + }; + const PutNode = struct { + data: T, + tick_node: *Loop.NextTickNode, + }; + + /// call destroy when done + pub fn create(loop: *Loop, capacity: usize) !*SelfChannel { + const buffer_nodes = try loop.allocator.alloc(T, capacity); + errdefer loop.allocator.free(buffer_nodes); + + const self = try loop.allocator.create(SelfChannel{ + .loop = loop, + .buffer_len = 0, + .buffer_nodes = buffer_nodes, + .buffer_index = 0, + .dispatch_lock = 0, + .need_dispatch = 0, + .getters = std.atomic.QueueMpsc(GetNode).init(), + .putters = std.atomic.QueueMpsc(PutNode).init(), + .get_count = 0, + .put_count = 0, + }); + errdefer loop.allocator.destroy(self); + + return self; + } + + /// must be called when all calls to put and get have suspended and no more calls occur + pub fn destroy(self: *SelfChannel) void { + while (self.getters.get()) |get_node| { + cancel get_node.data.tick_node.data; + } + while (self.putters.get()) |put_node| { + cancel put_node.data.tick_node.data; + } + self.loop.allocator.free(self.buffer_nodes); + self.loop.allocator.destroy(self); + } + + /// puts a data item in the channel. The promise completes when the value has been added to the + /// buffer, or in the case of a zero size buffer, when the item has been retrieved by a getter. + pub async fn put(self: *SelfChannel, data: T) void { + // TODO should be able to group memory allocation failure before first suspend point + // so that the async invocation catches it + var dispatch_tick_node_ptr: *Loop.NextTickNode = undefined; + _ = async self.dispatch(&dispatch_tick_node_ptr) catch unreachable; + + suspend |handle| { + var my_tick_node = Loop.NextTickNode{ + .next = undefined, + .data = handle, + }; + var queue_node = std.atomic.QueueMpsc(PutNode).Node{ + .data = PutNode{ + .tick_node = &my_tick_node, + .data = data, + }, + .next = undefined, + }; + self.putters.put(&queue_node); + _ = @atomicRmw(usize, &self.put_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst); + + self.loop.onNextTick(dispatch_tick_node_ptr); + } + } + + /// await this function to get an item from the channel. If the buffer is empty, the promise will + /// complete when the next item is put in the channel. 
+ pub async fn get(self: *SelfChannel) T { + // TODO should be able to group memory allocation failure before first suspend point + // so that the async invocation catches it + var dispatch_tick_node_ptr: *Loop.NextTickNode = undefined; + _ = async self.dispatch(&dispatch_tick_node_ptr) catch unreachable; + + // TODO integrate this function with named return values + // so we can get rid of this extra result copy + var result: T = undefined; + var debug_handle: usize = undefined; + suspend |handle| { + debug_handle = @ptrToInt(handle); + var my_tick_node = Loop.NextTickNode{ + .next = undefined, + .data = handle, + }; + var queue_node = std.atomic.QueueMpsc(GetNode).Node{ + .data = GetNode{ + .ptr = &result, + .tick_node = &my_tick_node, + }, + .next = undefined, + }; + self.getters.put(&queue_node); + _ = @atomicRmw(usize, &self.get_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst); + + self.loop.onNextTick(dispatch_tick_node_ptr); + } + return result; + } + + async fn dispatch(self: *SelfChannel, tick_node_ptr: **Loop.NextTickNode) void { + // resumed by onNextTick + suspend |handle| { + var tick_node = Loop.NextTickNode{ + .data = handle, + .next = undefined, + }; + tick_node_ptr.* = &tick_node; + } + + // set the "need dispatch" flag + _ = @atomicRmw(u8, &self.need_dispatch, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst); + + lock: while (true) { + // set the lock flag + const prev_lock = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst); + if (prev_lock != 0) return; + + // clear the need_dispatch flag since we're about to do it + _ = @atomicRmw(u8, &self.need_dispatch, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst); + + while (true) { + one_dispatch: { + // later we correct these extra subtractions + var get_count = @atomicRmw(usize, &self.get_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst); + var put_count = @atomicRmw(usize, &self.put_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst); + + // transfer self.buffer to self.getters + while (self.buffer_len != 0) { + if (get_count == 0) break :one_dispatch; + + const get_node = &self.getters.get().?.data; + get_node.ptr.* = self.buffer_nodes[self.buffer_index -% self.buffer_len]; + self.loop.onNextTick(get_node.tick_node); + self.buffer_len -= 1; + + get_count = @atomicRmw(usize, &self.get_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst); + } + + // direct transfer self.putters to self.getters + while (get_count != 0 and put_count != 0) { + const get_node = &self.getters.get().?.data; + const put_node = &self.putters.get().?.data; + + get_node.ptr.* = put_node.data; + self.loop.onNextTick(get_node.tick_node); + self.loop.onNextTick(put_node.tick_node); + + get_count = @atomicRmw(usize, &self.get_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst); + put_count = @atomicRmw(usize, &self.put_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst); + } + + // transfer self.putters to self.buffer + while (self.buffer_len != self.buffer_nodes.len and put_count != 0) { + const put_node = &self.putters.get().?.data; + + self.buffer_nodes[self.buffer_index] = put_node.data; + self.loop.onNextTick(put_node.tick_node); + self.buffer_index +%= 1; + self.buffer_len += 1; + + put_count = @atomicRmw(usize, &self.put_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst); + } + } + + // undo the extra subtractions + _ = @atomicRmw(usize, &self.get_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst); + _ = @atomicRmw(usize, &self.put_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst); + + // clear need-dispatch flag + const need_dispatch = @atomicRmw(u8, &self.need_dispatch, 
AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst); + if (need_dispatch != 0) continue; + + const my_lock = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst); + assert(my_lock != 0); + + // we have to check again now that we unlocked + if (@atomicLoad(u8, &self.need_dispatch, AtomicOrder.SeqCst) != 0) continue :lock; + + return; + } + } + } + }; +} + pub async fn connect(loop: *Loop, _address: *const std.net.Address) !std.os.File { var address = _address.*; // TODO https://github.com/ziglang/zig/issues/733 @@ -199,6 +433,7 @@ test "listen on a port, send bytes, receive bytes" { defer cancel p; loop.run(); } + async fn doAsyncTest(loop: *Loop, address: *const std.net.Address) void { errdefer @panic("test failure"); @@ -211,3 +446,43 @@ async fn doAsyncTest(loop: *Loop, address: *const std.net.Address) void { assert(mem.eql(u8, msg, "hello from server\n")); loop.stop(); } + +test "std.event.Channel" { + var da = std.heap.DirectAllocator.init(); + defer da.deinit(); + + const allocator = &da.allocator; + + var loop = try Loop.init(allocator); + defer loop.deinit(); + + const channel = try Channel(i32).create(&loop, 0); + defer channel.destroy(); + + const handle = try async testChannelGetter(&loop, channel); + defer cancel handle; + + const putter = try async testChannelPutter(channel); + defer cancel putter; + + loop.run(); +} + +async fn testChannelGetter(loop: *Loop, channel: *Channel(i32)) void { + errdefer @panic("test failed"); + + const value1_promise = try async channel.get(); + const value1 = await value1_promise; + assert(value1 == 1234); + + const value2_promise = try async channel.get(); + const value2 = await value2_promise; + assert(value2 == 4567); + + loop.stop(); +} + +async fn testChannelPutter(channel: *Channel(i32)) void { + await (async channel.put(1234) catch @panic("out of memory")); + await (async channel.put(4567) catch @panic("out of memory")); +} diff --git a/std/fmt/index.zig b/std/fmt/index.zig index bf12e86fef..c3c17f5322 100644 --- a/std/fmt/index.zig +++ b/std/fmt/index.zig @@ -130,6 +130,9 @@ pub fn formatType( try output(context, "error."); return output(context, @errorName(value)); }, + builtin.TypeId.Promise => { + return format(context, Errors, output, "promise@{x}", @ptrToInt(value)); + }, builtin.TypeId.Pointer => |ptr_info| switch (ptr_info.size) { builtin.TypeInfo.Pointer.Size.One => switch (@typeInfo(ptr_info.child)) { builtin.TypeId.Array => |info| { diff --git a/std/heap.zig b/std/heap.zig index 41d7802fdd..2e02733da1 100644 --- a/std/heap.zig +++ b/std/heap.zig @@ -38,6 +38,7 @@ fn cFree(self: *Allocator, old_mem: []u8) void { } /// This allocator makes a syscall directly for every allocation and free. +/// TODO make this thread-safe. The windows implementation will need some atomics. pub const DirectAllocator = struct { allocator: Allocator, heap_handle: ?HeapHandle,
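
Note on the Channel.dispatch protocol added in std/event.zig above: put and get each enqueue their suspended coroutine, bump put_count/get_count, and schedule a pre-allocated dispatch tick; whichever dispatch coroutine then wins dispatch_lock moves data between the ring buffer, the getters queue, and the putters queue, resuming the affected coroutines via loop.onNextTick(). The subtle part is the interplay of dispatch_lock and need_dispatch, so here it is isolated from the channel bookkeeping. This is a minimal sketch in the same 2018-era Zig dialect as the rest of the patch, not code from the patch itself; Flags and doWork are hypothetical stand-ins for the Channel and its transfer loop:

const std = @import("std");
const builtin = @import("builtin");
const assert = std.debug.assert;
const AtomicRmwOp = builtin.AtomicRmwOp;
const AtomicOrder = builtin.AtomicOrder;

// Hypothetical stand-in for the Channel: just the two flags and a dummy
// doWork() in place of the buffer/getters/putters transfer loop.
const Flags = struct {
    dispatch_lock: u8, // 1 while some caller is dispatching
    need_dispatch: u8, // 1 if work was announced since dispatching began

    fn doWork() void {
        // in the real Channel this is where queued putters and getters
        // are matched up and resumed via loop.onNextTick()
    }

    fn dispatch(self: *Flags) void {
        // announce that there is work to do
        _ = @atomicRmw(u8, &self.need_dispatch, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst);

        lock: while (true) {
            // try to become the dispatcher; if somebody else already holds the
            // lock, they are guaranteed to observe need_dispatch before they
            // release it, so this caller can simply leave
            const prev_lock = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst);
            if (prev_lock != 0) return;

            // we hold the lock: consume the flag, since we are about to service it
            _ = @atomicRmw(u8, &self.need_dispatch, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);

            while (true) {
                doWork();

                // more work may have been announced while we were busy
                const need_dispatch = @atomicRmw(u8, &self.need_dispatch, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
                if (need_dispatch != 0) continue;

                // release the lock, then check one last time: another caller may
                // have set the flag after the check above and then lost the
                // try-lock race, which would otherwise strand its work
                const my_lock = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
                assert(my_lock != 0);
                if (@atomicLoad(u8, &self.need_dispatch, AtomicOrder.SeqCst) != 0) continue :lock;
                return;
            }
        }
    }
};

The get_count/put_count handling in the real dispatch follows a similar shape: each pass does one extra atomic subtraction up front (the fetch-and-sub doubles as the read of the counter), uses the returned values to decide how many waiting getters and putters can be serviced, and adds the extra one back before re-checking the flags, as the patch's own "undo the extra subtractions" comment describes.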