M:N threading

 * add std.atomic.QueueMpsc.isEmpty
 * make std.debug.global_allocator thread-safe
 * std.event.Loop: now you have to choose between
   - initSingleThreaded
   - initMultiThreaded
   (see the usage sketch below)
 * std.event.Loop multiplexes coroutines onto kernel threads
 * Remove std.event.Loop.stop. Instead the event loop run() function
   returns once there are no pending coroutines.
 * fix crash in ir.cpp for calling methods under some conditions
 * small progress on the self-hosted compiler: analyzing top-level declarations
 * Introduce std.event.Lock for synchronizing coroutines
 * introduce std.event.Locked(T) for data that only one coroutine
   should modify at a time.
 * make the self-hosted compiler use the multi-threaded event loop
 * make std.heap.DirectAllocator thread-safe

See #174
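A rough usage sketch of the new event loop API (a minimal sketch based on the tests in this commit; assumes allocator is thread-safe, e.g. std.heap.DirectAllocator):

    var loop: std.event.Loop = undefined;
    try loop.initMultiThreaded(allocator); // or initSingleThreaded
    defer loop.deinit();
    // spawn coroutines onto the loop, then:
    loop.run(); // returns once there are no pending coroutines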

TODO:
 * call sched_getaffinity instead of hard-coding the thread pool size to 4
 * support for Windows and macOS
 * #1194
 * #1197
Andrew Kelley 2018-07-05 15:09:02 -04:00
parent d8295c1889
commit eb326e1553
10 changed files with 833 additions and 114 deletions

View File

@@ -384,7 +384,8 @@ fn buildOutputType(allocator: *Allocator, args: []const []const u8, out_type: Mo
const zig_lib_dir = introspect.resolveZigLibDir(allocator) catch os.exit(1);
defer allocator.free(zig_lib_dir);
var loop = try event.Loop.init(allocator);
var loop: event.Loop = undefined;
try loop.initMultiThreaded(allocator);
var module = try Module.create(
&loop,
@@ -493,8 +494,6 @@ async fn processBuildEvents(module: *Module, watch: bool) void {
switch (build_event) {
Module.Event.Ok => {
std.debug.warn("Build succeeded\n");
// for now we stop after 1
module.loop.stop();
return;
},
Module.Event.Error => |err| {

View File

@@ -2,6 +2,7 @@ const std = @import("std");
const os = std.os;
const io = std.io;
const mem = std.mem;
const Allocator = mem.Allocator;
const Buffer = std.Buffer;
const llvm = @import("llvm.zig");
const c = @import("c.zig");
@@ -13,6 +14,7 @@ const ArrayList = std.ArrayList;
const errmsg = @import("errmsg.zig");
const ast = std.zig.ast;
const event = std.event;
const assert = std.debug.assert;
pub const Module = struct {
loop: *event.Loop,
@@ -81,6 +83,8 @@ pub const Module = struct {
link_out_file: ?[]const u8,
events: *event.Channel(Event),
exported_symbol_names: event.Locked(Decl.Table),
// TODO handle some of these earlier and report them in a way other than error codes
pub const BuildError = error{
OutOfMemory,
@@ -232,6 +236,7 @@ pub const Module = struct {
.test_name_prefix = null,
.emit_file_type = Emit.Binary,
.link_out_file = null,
.exported_symbol_names = event.Locked(Decl.Table).init(loop, Decl.Table.init(loop.allocator)),
});
}
@@ -272,38 +277,91 @@ pub const Module = struct {
return;
};
await (async self.events.put(Event.Ok) catch unreachable);
// for now we stop after 1
return;
}
}
async fn addRootSrc(self: *Module) !void {
const root_src_path = self.root_src_path orelse @panic("TODO handle null root src path");
// TODO async/await os.path.real
const root_src_real_path = os.path.real(self.a(), root_src_path) catch |err| {
try printError("unable to get real path '{}': {}", root_src_path, err);
return err;
};
errdefer self.a().free(root_src_real_path);
// TODO async/await readFileAlloc()
const source_code = io.readFileAlloc(self.a(), root_src_real_path) catch |err| {
try printError("unable to open '{}': {}", root_src_real_path, err);
return err;
};
errdefer self.a().free(source_code);
var tree = try std.zig.parse(self.a(), source_code);
defer tree.deinit();
var parsed_file = ParsedFile{
.tree = try std.zig.parse(self.a(), source_code),
.realpath = root_src_real_path,
};
errdefer parsed_file.tree.deinit();
//var it = tree.root_node.decls.iterator();
//while (it.next()) |decl_ptr| {
// const decl = decl_ptr.*;
// switch (decl.id) {
// ast.Node.Comptime => @panic("TODO"),
// ast.Node.VarDecl => @panic("TODO"),
// ast.Node.UseDecl => @panic("TODO"),
// ast.Node.FnDef => @panic("TODO"),
// ast.Node.TestDecl => @panic("TODO"),
// else => unreachable,
// }
//}
const tree = &parsed_file.tree;
// create empty struct for it
const decls = try Scope.Decls.create(self.a(), null);
errdefer decls.destroy();
var it = tree.root_node.decls.iterator(0);
while (it.next()) |decl_ptr| {
const decl = decl_ptr.*;
switch (decl.id) {
ast.Node.Id.Comptime => @panic("TODO"),
ast.Node.Id.VarDecl => @panic("TODO"),
ast.Node.Id.FnProto => {
const fn_proto = @fieldParentPtr(ast.Node.FnProto, "base", decl);
const name = if (fn_proto.name_token) |name_token| tree.tokenSlice(name_token) else {
@panic("TODO add compile error");
//try self.addCompileError(
// &parsed_file,
// fn_proto.fn_token,
// fn_proto.fn_token + 1,
// "missing function name",
//);
continue;
};
const fn_decl = try self.a().create(Decl.Fn{
.base = Decl{
.id = Decl.Id.Fn,
.name = name,
.visib = parseVisibToken(tree, fn_proto.visib_token),
.resolution = Decl.Resolution.Unresolved,
},
.value = Decl.Fn.Val{ .Unresolved = {} },
.fn_proto = fn_proto,
});
errdefer self.a().destroy(fn_decl);
// TODO make this parallel
try await try async self.addTopLevelDecl(tree, &fn_decl.base);
},
ast.Node.Id.TestDecl => @panic("TODO"),
else => unreachable,
}
}
}
async fn addTopLevelDecl(self: *Module, tree: *ast.Tree, decl: *Decl) !void {
const is_export = decl.isExported(tree);
{
const exported_symbol_names = await try async self.exported_symbol_names.acquire();
defer exported_symbol_names.release();
if (try exported_symbol_names.value.put(decl.name, decl)) |other_decl| {
@panic("TODO report compile error");
}
}
}
pub fn link(self: *Module, out_file: ?[]const u8) !void {
@@ -350,3 +408,172 @@ fn printError(comptime format: []const u8, args: ...) !void {
const out_stream = &stderr_file_out_stream.stream;
try out_stream.print(format, args);
}
fn parseVisibToken(tree: *ast.Tree, optional_token_index: ?ast.TokenIndex) Visib {
if (optional_token_index) |token_index| {
const token = tree.tokens.at(token_index);
assert(token.id == Token.Id.Keyword_pub);
return Visib.Pub;
} else {
return Visib.Private;
}
}
pub const Scope = struct {
id: Id,
parent: ?*Scope,
pub const Id = enum {
Decls,
Block,
};
pub const Decls = struct {
base: Scope,
table: Decl.Table,
pub fn create(a: *Allocator, parent: ?*Scope) !*Decls {
const self = try a.create(Decls{
.base = Scope{
.id = Id.Decls,
.parent = parent,
},
.table = undefined,
});
errdefer a.destroy(self);
self.table = Decl.Table.init(a);
errdefer self.table.deinit();
return self;
}
pub fn destroy(self: *Decls) void {
self.table.deinit();
self.table.allocator.destroy(self);
self.* = undefined;
}
};
pub const Block = struct {
base: Scope,
};
};
pub const Visib = enum {
Private,
Pub,
};
pub const Decl = struct {
id: Id,
name: []const u8,
visib: Visib,
resolution: Resolution,
pub const Table = std.HashMap([]const u8, *Decl, mem.hash_slice_u8, mem.eql_slice_u8);
pub fn isExported(base: *const Decl, tree: *ast.Tree) bool {
switch (base.id) {
Id.Fn => {
const fn_decl = @fieldParentPtr(Fn, "base", base);
return fn_decl.isExported(tree);
},
else => return false,
}
}
pub const Resolution = enum {
Unresolved,
InProgress,
Invalid,
Ok,
};
pub const Id = enum {
Var,
Fn,
CompTime,
};
pub const Var = struct {
base: Decl,
};
pub const Fn = struct {
base: Decl,
value: Val,
fn_proto: *const ast.Node.FnProto,
// TODO https://github.com/ziglang/zig/issues/683 and then make this anonymous
pub const Val = union {
Unresolved: void,
Ok: *Value.Fn,
};
pub fn externLibName(self: Fn, tree: *ast.Tree) ?[]const u8 {
return if (self.fn_proto.extern_export_inline_token) |tok_index| x: {
const token = tree.tokens.at(tok_index);
break :x switch (token.id) {
Token.Id.Extern => tree.tokenSlicePtr(token),
else => null,
};
} else null;
}
pub fn isExported(self: Fn, tree: *ast.Tree) bool {
if (self.fn_proto.extern_export_inline_token) |tok_index| {
const token = tree.tokens.at(tok_index);
return token.id == Token.Id.Keyword_export;
} else {
return false;
}
}
};
pub const CompTime = struct {
base: Decl,
};
};
pub const Value = struct {
pub const Fn = struct {};
};
pub const Type = struct {
id: Id,
pub const Id = enum {
Type,
Void,
Bool,
NoReturn,
Int,
Float,
Pointer,
Array,
Struct,
ComptimeFloat,
ComptimeInt,
Undefined,
Null,
Optional,
ErrorUnion,
ErrorSet,
Enum,
Union,
Fn,
Opaque,
Promise,
};
pub const Struct = struct {
base: Type,
decls: *Scope.Decls,
};
};
pub const ParsedFile = struct {
tree: ast.Tree,
realpath: []const u8,
};
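Note on the duplicate-export check in addTopLevelDecl above: std.HashMap.put returns the previously stored value when the key already exists, which is what detects two exported declarations with the same name. A minimal sketch of that semantic (hypothetical decl_a/decl_b values):

    var table = Decl.Table.init(allocator);
    defer table.deinit();
    _ = try table.put(decl_a.name, decl_a); // no prior entry for this name
    if (try table.put(decl_a.name, decl_b)) |other_decl| {
        // a Decl with the same name was already in the table:
        // this is the collision case that should become a compile error
    }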

View File

@@ -13278,7 +13278,7 @@ static TypeTableEntry *ir_analyze_instruction_call(IrAnalyze *ira, IrInstruction
FnTableEntry *fn_table_entry = fn_ref->value.data.x_bound_fn.fn;
IrInstruction *first_arg_ptr = fn_ref->value.data.x_bound_fn.first_arg;
return ir_analyze_fn_call(ira, call_instruction, fn_table_entry, fn_table_entry->type_entry,
nullptr, first_arg_ptr, is_comptime, call_instruction->fn_inline);
fn_ref, first_arg_ptr, is_comptime, call_instruction->fn_inline);
} else {
ir_add_error_node(ira, fn_ref->source_node,
buf_sprintf("type '%s' not a function", buf_ptr(&fn_ref->value.type->name)));

View File

@@ -15,6 +15,8 @@ pub fn QueueMpsc(comptime T: type) type {
pub const Node = std.atomic.Stack(T).Node;
/// Not thread-safe. The call to init() must complete before any other functions are called.
/// No deinitialization required.
pub fn init() Self {
return Self{
.inboxes = []std.atomic.Stack(T){
@@ -26,12 +28,15 @@ pub fn QueueMpsc(comptime T: type) type {
};
}
/// Fully thread-safe. put() may be called from any thread at any time.
pub fn put(self: *Self, node: *Node) void {
const inbox_index = @atomicLoad(usize, &self.inbox_index, AtomicOrder.SeqCst);
const inbox = &self.inboxes[inbox_index];
inbox.push(node);
}
/// Must be called by only 1 consumer at a time. Every call to get() and isEmpty() must complete before
/// the next call to get().
pub fn get(self: *Self) ?*Node {
if (self.outbox.pop()) |node| {
return node;
@@ -43,6 +48,18 @@ pub fn QueueMpsc(comptime T: type) type {
}
return self.outbox.pop();
}
/// Must be called by only 1 consumer at a time. Every call to get() and isEmpty() must complete before
/// the next call to isEmpty().
pub fn isEmpty(self: *Self) bool {
if (!self.outbox.isEmpty()) return false;
const prev_inbox_index = @atomicRmw(usize, &self.inbox_index, AtomicRmwOp.Xor, 0x1, AtomicOrder.SeqCst);
const prev_inbox = &self.inboxes[prev_inbox_index];
while (prev_inbox.pop()) |node| {
self.outbox.push(node);
}
return self.outbox.isEmpty();
}
};
}
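A minimal single-consumer sketch of the contract documented above (assumes this era's std; node memory is owned by the caller):

    var queue = std.atomic.QueueMpsc(i32).init();
    var node = std.atomic.QueueMpsc(i32).Node{
        .next = undefined,
        .data = 42,
    };
    queue.put(&node); // fully thread-safe: any thread, any time
    std.debug.assert(!queue.isEmpty()); // single consumer only
    const popped = queue.get() orelse unreachable; // single consumer only
    std.debug.assert(popped.data == 42);
    std.debug.assert(queue.isEmpty());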

View File

@@ -11,6 +11,11 @@ const builtin = @import("builtin");
pub const FailingAllocator = @import("failing_allocator.zig").FailingAllocator;
pub const runtime_safety = switch (builtin.mode) {
builtin.Mode.Debug, builtin.Mode.ReleaseSafe => true,
builtin.Mode.ReleaseFast, builtin.Mode.ReleaseSmall => false,
};
/// Tries to write to stderr, unbuffered, and ignores any error returned.
/// Does not append a newline.
/// TODO atomic/multithread support
@@ -1098,7 +1103,7 @@ fn readILeb128(in_stream: var) !i64 {
/// This should only be used in temporary test programs.
pub const global_allocator = &global_fixed_allocator.allocator;
var global_fixed_allocator = std.heap.FixedBufferAllocator.init(global_allocator_mem[0..]);
var global_fixed_allocator = std.heap.ThreadSafeFixedBufferAllocator.init(global_allocator_mem[0..]);
var global_allocator_mem: [100 * 1024]u8 = undefined;
// TODO make thread safe

View File

@@ -11,53 +11,69 @@ pub const TcpServer = struct {
handleRequestFn: async<*mem.Allocator> fn (*TcpServer, *const std.net.Address, *const std.os.File) void,
loop: *Loop,
sockfd: i32,
sockfd: ?i32,
accept_coro: ?promise,
listen_address: std.net.Address,
waiting_for_emfile_node: PromiseNode,
listen_resume_node: event.Loop.ResumeNode,
const PromiseNode = std.LinkedList(promise).Node;
pub fn init(loop: *Loop) !TcpServer {
const sockfd = try std.os.posixSocket(posix.AF_INET, posix.SOCK_STREAM | posix.SOCK_CLOEXEC | posix.SOCK_NONBLOCK, posix.PROTO_tcp);
errdefer std.os.close(sockfd);
pub fn init(loop: *Loop) TcpServer {
// TODO can't initialize handler coroutine here because we need well defined copy elision
return TcpServer{
.loop = loop,
.sockfd = sockfd,
.sockfd = null,
.accept_coro = null,
.handleRequestFn = undefined,
.waiting_for_emfile_node = undefined,
.listen_address = undefined,
.listen_resume_node = event.Loop.ResumeNode{
.id = event.Loop.ResumeNode.Id.Basic,
.handle = undefined,
},
};
}
pub fn listen(self: *TcpServer, address: *const std.net.Address, handleRequestFn: async<*mem.Allocator> fn (*TcpServer, *const std.net.Address, *const std.os.File) void) !void {
pub fn listen(
self: *TcpServer,
address: *const std.net.Address,
handleRequestFn: async<*mem.Allocator> fn (*TcpServer, *const std.net.Address, *const std.os.File) void,
) !void {
self.handleRequestFn = handleRequestFn;
try std.os.posixBind(self.sockfd, &address.os_addr);
try std.os.posixListen(self.sockfd, posix.SOMAXCONN);
self.listen_address = std.net.Address.initPosix(try std.os.posixGetSockName(self.sockfd));
const sockfd = try std.os.posixSocket(posix.AF_INET, posix.SOCK_STREAM | posix.SOCK_CLOEXEC | posix.SOCK_NONBLOCK, posix.PROTO_tcp);
errdefer std.os.close(sockfd);
self.sockfd = sockfd;
try std.os.posixBind(sockfd, &address.os_addr);
try std.os.posixListen(sockfd, posix.SOMAXCONN);
self.listen_address = std.net.Address.initPosix(try std.os.posixGetSockName(sockfd));
self.accept_coro = try async<self.loop.allocator> TcpServer.handler(self);
errdefer cancel self.accept_coro.?;
try self.loop.addFd(self.sockfd, self.accept_coro.?);
errdefer self.loop.removeFd(self.sockfd);
self.listen_resume_node.handle = self.accept_coro.?;
try self.loop.addFd(sockfd, &self.listen_resume_node);
errdefer self.loop.removeFd(sockfd);
}
/// Stop listening
pub fn close(self: *TcpServer) void {
self.loop.removeFd(self.sockfd.?);
std.os.close(self.sockfd.?);
}
pub fn deinit(self: *TcpServer) void {
self.loop.removeFd(self.sockfd);
if (self.accept_coro) |accept_coro| cancel accept_coro;
std.os.close(self.sockfd);
if (self.sockfd) |sockfd| std.os.close(sockfd);
}
pub async fn handler(self: *TcpServer) void {
while (true) {
var accepted_addr: std.net.Address = undefined;
if (std.os.posixAccept(self.sockfd, &accepted_addr.os_addr, posix.SOCK_NONBLOCK | posix.SOCK_CLOEXEC)) |accepted_fd| {
if (std.os.posixAccept(self.sockfd.?, &accepted_addr.os_addr, posix.SOCK_NONBLOCK | posix.SOCK_CLOEXEC)) |accepted_fd| {
var socket = std.os.File.openHandle(accepted_fd);
_ = async<self.loop.allocator> self.handleRequestFn(self, accepted_addr, socket) catch |err| switch (err) {
error.OutOfMemory => {
@@ -95,32 +111,65 @@ pub const TcpServer = struct {
pub const Loop = struct {
allocator: *mem.Allocator,
keep_running: bool,
next_tick_queue: std.atomic.QueueMpsc(promise),
os_data: OsData,
const OsData = switch (builtin.os) {
builtin.Os.linux => struct {
epollfd: i32,
},
else => struct {},
};
dispatch_lock: u8, // TODO make this a bool
pending_event_count: usize,
extra_threads: []*std.os.Thread,
final_resume_node: ResumeNode,
pub const NextTickNode = std.atomic.QueueMpsc(promise).Node;
pub const ResumeNode = struct {
id: Id,
handle: promise,
pub const Id = enum {
Basic,
Stop,
EventFd,
};
pub const EventFd = struct {
base: ResumeNode,
eventfd: i32,
};
};
/// After initialization, call run().
/// TODO copy elision / named return values so that the threads referencing *Loop
/// have the correct pointer value.
pub fn initSingleThreaded(self: *Loop, allocator: *mem.Allocator) !void {
return self.initInternal(allocator, 1);
}
/// The allocator must be thread-safe because we use it for multiplexing
/// coroutines onto kernel threads.
pub fn init(allocator: *mem.Allocator) !Loop {
var self = Loop{
.keep_running = true,
/// After initialization, call run().
/// TODO copy elision / named return values so that the threads referencing *Loop
/// have the correct pointer value.
pub fn initMultiThreaded(self: *Loop, allocator: *mem.Allocator) !void {
// TODO check the actual cpu core count
return self.initInternal(allocator, 4);
}
/// thread_count is the total thread count, including the current thread.
/// The thread pool size will be max(thread_count - 1, 0).
fn initInternal(self: *Loop, allocator: *mem.Allocator, thread_count: usize) !void {
self.* = Loop{
.pending_event_count = 0,
.allocator = allocator,
.os_data = undefined,
.next_tick_queue = std.atomic.QueueMpsc(promise).init(),
.dispatch_lock = 1, // start locked so threads go directly into epoll wait
.extra_threads = undefined,
.final_resume_node = ResumeNode{
.id = ResumeNode.Id.Stop,
.handle = undefined,
},
};
try self.initOsData();
try self.initOsData(thread_count);
errdefer self.deinitOsData();
return self;
}
/// must call stop before deinit
@@ -128,13 +177,70 @@ pub const Loop = struct {
self.deinitOsData();
}
const InitOsDataError = std.os.LinuxEpollCreateError;
const InitOsDataError = std.os.LinuxEpollCreateError || mem.Allocator.Error || std.os.LinuxEventFdError ||
std.os.SpawnThreadError || std.os.LinuxEpollCtlError;
fn initOsData(self: *Loop) InitOsDataError!void {
const wakeup_bytes = []u8{0x1} ** 8;
fn initOsData(self: *Loop, thread_count: usize) InitOsDataError!void {
switch (builtin.os) {
builtin.Os.linux => {
self.os_data.epollfd = try std.os.linuxEpollCreate(std.os.linux.EPOLL_CLOEXEC);
const extra_thread_count = thread_count - 1;
self.os_data.available_eventfd_resume_nodes = std.atomic.Stack(ResumeNode.EventFd).init();
self.os_data.eventfd_resume_nodes = try self.allocator.alloc(
std.atomic.Stack(ResumeNode.EventFd).Node,
extra_thread_count,
);
errdefer self.allocator.free(self.os_data.eventfd_resume_nodes);
errdefer {
while (self.os_data.available_eventfd_resume_nodes.pop()) |node| std.os.close(node.data.eventfd);
}
for (self.os_data.eventfd_resume_nodes) |*eventfd_node| {
eventfd_node.* = std.atomic.Stack(ResumeNode.EventFd).Node{
.data = ResumeNode.EventFd{
.base = ResumeNode{
.id = ResumeNode.Id.EventFd,
.handle = undefined,
},
.eventfd = try std.os.linuxEventFd(1, posix.EFD_CLOEXEC | posix.EFD_NONBLOCK),
},
.next = undefined,
};
self.os_data.available_eventfd_resume_nodes.push(eventfd_node);
}
self.os_data.epollfd = try std.os.linuxEpollCreate(posix.EPOLL_CLOEXEC);
errdefer std.os.close(self.os_data.epollfd);
self.os_data.final_eventfd = try std.os.linuxEventFd(0, posix.EFD_CLOEXEC | posix.EFD_NONBLOCK);
errdefer std.os.close(self.os_data.final_eventfd);
self.os_data.final_eventfd_event = posix.epoll_event{
.events = posix.EPOLLIN,
.data = posix.epoll_data{ .ptr = @ptrToInt(&self.final_resume_node) },
};
try std.os.linuxEpollCtl(
self.os_data.epollfd,
posix.EPOLL_CTL_ADD,
self.os_data.final_eventfd,
&self.os_data.final_eventfd_event,
);
self.extra_threads = try self.allocator.alloc(*std.os.Thread, extra_thread_count);
errdefer self.allocator.free(self.extra_threads);
var extra_thread_index: usize = 0;
errdefer {
while (extra_thread_index != 0) {
extra_thread_index -= 1;
// writing 8 bytes to an eventfd cannot fail
std.os.posixWrite(self.os_data.final_eventfd, wakeup_bytes) catch unreachable;
self.extra_threads[extra_thread_index].wait();
}
}
while (extra_thread_index < extra_thread_count) : (extra_thread_index += 1) {
self.extra_threads[extra_thread_index] = try std.os.spawnThread(self, workerRun);
}
},
else => {},
}
@@ -142,65 +248,154 @@ pub const Loop = struct {
fn deinitOsData(self: *Loop) void {
switch (builtin.os) {
builtin.Os.linux => std.os.close(self.os_data.epollfd),
builtin.Os.linux => {
std.os.close(self.os_data.final_eventfd);
while (self.os_data.available_eventfd_resume_nodes.pop()) |node| std.os.close(node.data.eventfd);
std.os.close(self.os_data.epollfd);
self.allocator.free(self.os_data.eventfd_resume_nodes);
self.allocator.free(self.extra_threads);
},
else => {},
}
}
pub fn addFd(self: *Loop, fd: i32, prom: promise) !void {
/// resume_node must live longer than the promise that it holds a reference to.
pub fn addFd(self: *Loop, fd: i32, resume_node: *ResumeNode) !void {
_ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);
errdefer {
_ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
}
try self.addFdNoCounter(fd, resume_node);
}
fn addFdNoCounter(self: *Loop, fd: i32, resume_node: *ResumeNode) !void {
var ev = std.os.linux.epoll_event{
.events = std.os.linux.EPOLLIN | std.os.linux.EPOLLOUT | std.os.linux.EPOLLET,
.data = std.os.linux.epoll_data{ .ptr = @ptrToInt(prom) },
.data = std.os.linux.epoll_data{ .ptr = @ptrToInt(resume_node) },
};
try std.os.linuxEpollCtl(self.os_data.epollfd, std.os.linux.EPOLL_CTL_ADD, fd, &ev);
}
pub fn removeFd(self: *Loop, fd: i32) void {
self.removeFdNoCounter(fd);
_ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
}
fn removeFdNoCounter(self: *Loop, fd: i32) void {
std.os.linuxEpollCtl(self.os_data.epollfd, std.os.linux.EPOLL_CTL_DEL, fd, undefined) catch {};
}
async fn waitFd(self: *Loop, fd: i32) !void {
pub async fn waitFd(self: *Loop, fd: i32) !void {
defer self.removeFd(fd);
var resume_node = ResumeNode{
.id = ResumeNode.Id.Basic,
.handle = undefined,
};
suspend |p| {
try self.addFd(fd, p);
resume_node.handle = p;
try self.addFd(fd, &resume_node);
}
var a = &resume_node; // TODO better way to explicitly put memory in coro frame
}
pub fn stop(self: *Loop) void {
// TODO make atomic
self.keep_running = false;
// TODO activate an fd in the epoll set which should cancel all the promises
}
/// bring your own linked list node. this means it can't fail.
/// Bring your own linked list node. This means it can't fail.
pub fn onNextTick(self: *Loop, node: *NextTickNode) void {
_ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);
self.next_tick_queue.put(node);
}
pub fn run(self: *Loop) void {
while (self.keep_running) {
// TODO multiplex the next tick queue and the epoll event results onto a thread pool
while (self.next_tick_queue.get()) |node| {
resume node.data;
}
if (!self.keep_running) break;
self.dispatchOsEvents();
_ = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
self.workerRun();
for (self.extra_threads) |extra_thread| {
extra_thread.wait();
}
}
fn dispatchOsEvents(self: *Loop) void {
switch (builtin.os) {
builtin.Os.linux => {
var events: [16]std.os.linux.epoll_event = undefined;
const count = std.os.linuxEpollWait(self.os_data.epollfd, events[0..], -1);
for (events[0..count]) |ev| {
const p = @intToPtr(promise, ev.data.ptr);
resume p;
fn workerRun(self: *Loop) void {
start_over: while (true) {
if (@atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst) == 0) {
while (self.next_tick_queue.get()) |next_tick_node| {
const handle = next_tick_node.data;
if (self.next_tick_queue.isEmpty()) {
// last node, just resume it
_ = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
resume handle;
_ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
continue :start_over;
}
// non-last node, stick it in the epoll set so that
// other threads can get to it
if (self.os_data.available_eventfd_resume_nodes.pop()) |resume_stack_node| {
const eventfd_node = &resume_stack_node.data;
eventfd_node.base.handle = handle;
// the pending count is already accounted for
self.addFdNoCounter(eventfd_node.eventfd, &eventfd_node.base) catch |_| {
// fine, we didn't need it anyway
_ = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
self.os_data.available_eventfd_resume_nodes.push(resume_stack_node);
resume handle;
_ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
continue :start_over;
};
} else {
// threads are too busy, can't add another eventfd to wake one up
_ = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
resume handle;
_ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
continue :start_over;
}
}
},
else => {},
const pending_event_count = @atomicLoad(usize, &self.pending_event_count, AtomicOrder.SeqCst);
if (pending_event_count == 0) {
// cause all the threads to stop
// writing 8 bytes to an eventfd cannot fail
std.os.posixWrite(self.os_data.final_eventfd, wakeup_bytes) catch unreachable;
return;
}
_ = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
}
// only process 1 event so we don't steal from other threads
var events: [1]std.os.linux.epoll_event = undefined;
const count = std.os.linuxEpollWait(self.os_data.epollfd, events[0..], -1);
for (events[0..count]) |ev| {
const resume_node = @intToPtr(*ResumeNode, ev.data.ptr);
const handle = resume_node.handle;
const resume_node_id = resume_node.id;
switch (resume_node_id) {
ResumeNode.Id.Basic => {},
ResumeNode.Id.Stop => return,
ResumeNode.Id.EventFd => {
const event_fd_node = @fieldParentPtr(ResumeNode.EventFd, "base", resume_node);
self.removeFdNoCounter(event_fd_node.eventfd);
const stack_node = @fieldParentPtr(std.atomic.Stack(ResumeNode.EventFd).Node, "data", event_fd_node);
self.os_data.available_eventfd_resume_nodes.push(stack_node);
},
}
resume handle;
if (resume_node_id == ResumeNode.Id.EventFd) {
_ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
}
}
}
}
const OsData = switch (builtin.os) {
builtin.Os.linux => struct {
epollfd: i32,
// pre-allocated eventfds. all permanently active.
// this is how we send promises to be resumed on other threads.
available_eventfd_resume_nodes: std.atomic.Stack(ResumeNode.EventFd),
eventfd_resume_nodes: []std.atomic.Stack(ResumeNode.EventFd).Node,
final_eventfd: i32,
final_eventfd_event: posix.epoll_event,
},
else => struct {},
};
};
/// many producer, many consumer, thread-safe, lock-free, runtime configurable buffer size
@@ -304,9 +499,7 @@ pub fn Channel(comptime T: type) type {
// TODO integrate this function with named return values
// so we can get rid of this extra result copy
var result: T = undefined;
var debug_handle: usize = undefined;
suspend |handle| {
debug_handle = @ptrToInt(handle);
var my_tick_node = Loop.NextTickNode{
.next = undefined,
.data = handle,
@@ -438,9 +631,8 @@ test "listen on a port, send bytes, receive bytes" {
const self = @fieldParentPtr(Self, "tcp_server", tcp_server);
var socket = _socket.*; // TODO https://github.com/ziglang/zig/issues/733
defer socket.close();
const next_handler = async errorableHandler(self, _addr, socket) catch |err| switch (err) {
error.OutOfMemory => @panic("unable to handle connection: out of memory"),
};
// TODO guarantee elision of this allocation
const next_handler = async errorableHandler(self, _addr, socket) catch unreachable;
(await next_handler) catch |err| {
std.debug.panic("unable to handle connection: {}\n", err);
};
@@ -461,17 +653,18 @@ test "listen on a port, send bytes, receive bytes" {
const ip4addr = std.net.parseIp4("127.0.0.1") catch unreachable;
const addr = std.net.Address.initIp4(ip4addr, 0);
var loop = try Loop.init(std.debug.global_allocator);
var server = MyServer{ .tcp_server = try TcpServer.init(&loop) };
var loop: Loop = undefined;
try loop.initSingleThreaded(std.debug.global_allocator);
var server = MyServer{ .tcp_server = TcpServer.init(&loop) };
defer server.tcp_server.deinit();
try server.tcp_server.listen(addr, MyServer.handler);
const p = try async<std.debug.global_allocator> doAsyncTest(&loop, server.tcp_server.listen_address);
const p = try async<std.debug.global_allocator> doAsyncTest(&loop, server.tcp_server.listen_address, &server.tcp_server);
defer cancel p;
loop.run();
}
async fn doAsyncTest(loop: *Loop, address: *const std.net.Address) void {
async fn doAsyncTest(loop: *Loop, address: *const std.net.Address, server: *TcpServer) void {
errdefer @panic("test failure");
var socket_file = try await try async event.connect(loop, address);
@@ -481,7 +674,7 @@ async fn doAsyncTest(loop: *Loop, address: *const std.net.Address) void {
const amt_read = try socket_file.read(buf[0..]);
const msg = buf[0..amt_read];
assert(mem.eql(u8, msg, "hello from server\n"));
loop.stop();
server.close();
}
test "std.event.Channel" {
@@ -490,7 +683,9 @@ test "std.event.Channel" {
const allocator = &da.allocator;
var loop = try Loop.init(allocator);
var loop: Loop = undefined;
// TODO make a multi threaded test
try loop.initSingleThreaded(allocator);
defer loop.deinit();
const channel = try Channel(i32).create(&loop, 0);
@@ -515,11 +710,248 @@ async fn testChannelGetter(loop: *Loop, channel: *Channel(i32)) void {
const value2_promise = try async channel.get();
const value2 = await value2_promise;
assert(value2 == 4567);
loop.stop();
}
async fn testChannelPutter(channel: *Channel(i32)) void {
await (async channel.put(1234) catch @panic("out of memory"));
await (async channel.put(4567) catch @panic("out of memory"));
}
/// Thread-safe async/await lock.
/// Does not make any syscalls - coroutines which are waiting for the lock are suspended, and
/// are resumed when the lock is released, in order.
pub const Lock = struct {
loop: *Loop,
shared_bit: u8, // TODO make this a bool
queue: Queue,
queue_empty_bit: u8, // TODO make this a bool
const Queue = std.atomic.QueueMpsc(promise);
pub const Held = struct {
lock: *Lock,
pub fn release(self: Held) void {
// Resume the next item from the queue.
if (self.lock.queue.get()) |node| {
self.lock.loop.onNextTick(node);
return;
}
// We need to release the lock.
_ = @atomicRmw(u8, &self.lock.queue_empty_bit, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst);
_ = @atomicRmw(u8, &self.lock.shared_bit, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
// There might be a queue item. If we know the queue is empty, we can be done,
// because the other actor will try to obtain the lock.
// But if there's a queue item, we are the actor which must loop and attempt
// to grab the lock again.
if (@atomicLoad(u8, &self.lock.queue_empty_bit, AtomicOrder.SeqCst) == 1) {
return;
}
while (true) {
const old_bit = @atomicRmw(u8, &self.lock.shared_bit, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst);
if (old_bit != 0) {
// We did not obtain the lock. Great, the queue is someone else's problem.
return;
}
// Resume the next item from the queue.
if (self.lock.queue.get()) |node| {
self.lock.loop.onNextTick(node);
return;
}
// Release the lock again.
_ = @atomicRmw(u8, &self.lock.queue_empty_bit, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst);
_ = @atomicRmw(u8, &self.lock.shared_bit, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
// Find out if we can be done.
if (@atomicLoad(u8, &self.lock.queue_empty_bit, AtomicOrder.SeqCst) == 1) {
return;
}
}
}
};
pub fn init(loop: *Loop) Lock {
return Lock{
.loop = loop,
.shared_bit = 0,
.queue = Queue.init(),
.queue_empty_bit = 1,
};
}
/// Must be called when not locked. Not thread safe.
/// All calls to acquire() and release() must complete before calling deinit().
pub fn deinit(self: *Lock) void {
assert(self.shared_bit == 0);
while (self.queue.get()) |node| cancel node.data;
}
pub async fn acquire(self: *Lock) Held {
var my_tick_node: Loop.NextTickNode = undefined;
s: suspend |handle| {
my_tick_node.data = handle;
self.queue.put(&my_tick_node);
// At this point, we are in the queue, so we might have already been resumed and this coroutine
// frame might be destroyed. For the rest of the suspend block we cannot access the coroutine frame.
// We set this bit so that later we can rely on the fact that if queue_empty_bit is 1, some actor
// will attempt to grab the lock.
_ = @atomicRmw(u8, &self.queue_empty_bit, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
while (true) {
const old_bit = @atomicRmw(u8, &self.shared_bit, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst);
if (old_bit != 0) {
// We did not obtain the lock. Trust that our queue entry will resume us, and allow
// suspend to complete.
break;
}
// We got the lock. However we might have already been resumed from the queue.
if (self.queue.get()) |node| {
// Whether this node is us or someone else, we tail resume it.
resume node.data;
break;
} else {
// We already got resumed, and there are none left in the queue, which means that
// we aren't even supposed to hold the lock right now.
_ = @atomicRmw(u8, &self.queue_empty_bit, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst);
_ = @atomicRmw(u8, &self.shared_bit, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
// There might be a queue item. If we know the queue is empty, we can be done,
// because the other actor will try to obtain the lock.
// But if there's a queue item, we are the actor which must loop and attempt
// to grab the lock again.
if (@atomicLoad(u8, &self.queue_empty_bit, AtomicOrder.SeqCst) == 1) {
break;
} else {
continue;
}
}
unreachable;
}
}
// TODO this workaround to force my_tick_node to be in the coroutine frame should
// not be necessary
var trash1 = &my_tick_node;
return Held{ .lock = self };
}
};
/// Thread-safe async/await lock that protects one piece of data.
/// Does not make any syscalls - coroutines which are waiting for the lock are suspended, and
/// are resumed when the lock is released, in order.
pub fn Locked(comptime T: type) type {
return struct {
lock: Lock,
private_data: T,
const Self = this;
pub const HeldLock = struct {
value: *T,
held: Lock.Held,
pub fn release(self: HeldLock) void {
self.held.release();
}
};
pub fn init(loop: *Loop, data: T) Self {
return Self{
.lock = Lock.init(loop),
.private_data = data,
};
}
pub fn deinit(self: *Self) void {
self.lock.deinit();
}
pub async fn acquire(self: *Self) HeldLock {
return HeldLock{
// TODO guaranteed allocation elision
.held = await (async self.lock.acquire() catch unreachable),
.value = &self.private_data,
};
}
};
}
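A hedged sketch of how Locked(T) is intended to be used from a coroutine (hypothetical shared counter; assumes a running loop):

    async fn bumpCounter(counter: *Locked(usize)) void {
        const held = await (async counter.acquire() catch unreachable);
        defer held.release();
        held.value.* += 1; // only one coroutine can touch the data at a time
    }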
test "std.event.Lock" {
var da = std.heap.DirectAllocator.init();
defer da.deinit();
const allocator = &da.allocator;
var loop: Loop = undefined;
try loop.initMultiThreaded(allocator);
defer loop.deinit();
var lock = Lock.init(&loop);
defer lock.deinit();
const handle = try async<allocator> testLock(&loop, &lock);
defer cancel handle;
loop.run();
assert(mem.eql(i32, shared_test_data, [1]i32{3 * 10} ** 10));
}
async fn testLock(loop: *Loop, lock: *Lock) void {
const handle1 = async lockRunner(lock) catch @panic("out of memory");
var tick_node1 = Loop.NextTickNode{
.next = undefined,
.data = handle1,
};
loop.onNextTick(&tick_node1);
const handle2 = async lockRunner(lock) catch @panic("out of memory");
var tick_node2 = Loop.NextTickNode{
.next = undefined,
.data = handle2,
};
loop.onNextTick(&tick_node2);
const handle3 = async lockRunner(lock) catch @panic("out of memory");
var tick_node3 = Loop.NextTickNode{
.next = undefined,
.data = handle3,
};
loop.onNextTick(&tick_node3);
await handle1;
await handle2;
await handle3;
// TODO this is to force tick node memory to be in the coro frame
// there should be a way to make it explicit where the memory is
var a = &tick_node1;
var b = &tick_node2;
var c = &tick_node3;
}
var shared_test_data = [1]i32{0} ** 10;
var shared_test_index: usize = 0;
async fn lockRunner(lock: *Lock) void {
suspend; // resumed by onNextTick
var i: usize = 0;
while (i < 10) : (i += 1) {
const handle = await (async lock.acquire() catch @panic("out of memory"));
defer handle.release();
shared_test_index = 0;
while (shared_test_index < shared_test_data.len) : (shared_test_index += 1) {
shared_test_data[shared_test_index] = shared_test_data[shared_test_index] + 1;
}
}
}

View File

@@ -38,7 +38,7 @@ fn cFree(self: *Allocator, old_mem: []u8) void {
}
/// This allocator makes a syscall directly for every allocation and free.
/// TODO make this thread-safe. The windows implementation will need some atomics.
/// Thread-safe and lock-free.
pub const DirectAllocator = struct {
allocator: Allocator,
heap_handle: ?HeapHandle,
@@ -74,34 +74,34 @@ pub const DirectAllocator = struct {
const alloc_size = if (alignment <= os.page_size) n else n + alignment;
const addr = p.mmap(null, alloc_size, p.PROT_READ | p.PROT_WRITE, p.MAP_PRIVATE | p.MAP_ANONYMOUS, -1, 0);
if (addr == p.MAP_FAILED) return error.OutOfMemory;
if (alloc_size == n) return @intToPtr([*]u8, addr)[0..n];
var aligned_addr = addr & ~usize(alignment - 1);
aligned_addr += alignment;
const aligned_addr = (addr & ~usize(alignment - 1)) + alignment;
//We can unmap the unused portions of our mmap, but we must only
// pass munmap bytes that exist outside our allocated pages or it
// will happily eat us too
// We can unmap the unused portions of our mmap, but we must only
// pass munmap bytes that exist outside our allocated pages or it
// will happily eat us too.
//Since alignment > page_size, we are by definition on a page boundry
// Since alignment > page_size, we are by definition on a page boundary.
const unused_start = addr;
const unused_len = aligned_addr - 1 - unused_start;
var err = p.munmap(unused_start, unused_len);
debug.assert(p.getErrno(err) == 0);
const err = p.munmap(unused_start, unused_len);
assert(p.getErrno(err) == 0);
//It is impossible that there is an unoccupied page at the top of our
// mmap.
// It is impossible that there is an unoccupied page at the top of our
// mmap.
return @intToPtr([*]u8, aligned_addr)[0..n];
},
Os.windows => {
const amt = n + alignment + @sizeOf(usize);
const heap_handle = self.heap_handle orelse blk: {
const optional_heap_handle = @atomicLoad(?HeapHandle, &self.heap_handle, builtin.AtomicOrder.SeqCst);
const heap_handle = optional_heap_handle orelse blk: {
const hh = os.windows.HeapCreate(os.windows.HEAP_NO_SERIALIZE, amt, 0) orelse return error.OutOfMemory;
self.heap_handle = hh;
break :blk hh;
const other_hh = @cmpxchgStrong(?HeapHandle, &self.heap_handle, null, hh, builtin.AtomicOrder.SeqCst, builtin.AtomicOrder.SeqCst) orelse break :blk hh;
_ = os.windows.HeapDestroy(hh);
break :blk other_hh;
};
const ptr = os.windows.HeapAlloc(heap_handle, 0, amt) orelse return error.OutOfMemory;
const root_addr = @ptrToInt(ptr);
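The Windows path above now initializes heap_handle lazily and lock-free: every racing thread may create a heap, but only the first @cmpxchgStrong publishes one; losers destroy their heap and adopt the winner's. A standalone sketch of the same pattern (hypothetical names, not part of this commit):

    const Thing = struct { id: usize };
    var cached: ?*Thing = null;

    fn getOrCreate(allocator: *Allocator) !*Thing {
        const existing = @atomicLoad(?*Thing, &cached, builtin.AtomicOrder.SeqCst);
        if (existing) |thing| return thing;
        const candidate = try allocator.create(Thing{ .id = 1 });
        // @cmpxchgStrong returns null on success, or the already-published value
        const other = @cmpxchgStrong(?*Thing, &cached, null, candidate,
            builtin.AtomicOrder.SeqCst, builtin.AtomicOrder.SeqCst) orelse return candidate;
        allocator.destroy(candidate); // lost the race
        return other.?;
    }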

View File

@@ -6,7 +6,7 @@ const builtin = @import("builtin");
const mem = this;
pub const Allocator = struct {
const Error = error{OutOfMemory};
pub const Error = error{OutOfMemory};
/// Allocate byte_count bytes and return them in a slice, with the
/// slice's pointer aligned at least to alignment bytes.

View File

@@ -2309,6 +2309,30 @@ pub fn linuxEpollWait(epfd: i32, events: []linux.epoll_event, timeout: i32) usiz
}
}
pub const LinuxEventFdError = error{
InvalidFlagValue,
SystemResources,
ProcessFdQuotaExceeded,
SystemFdQuotaExceeded,
Unexpected,
};
pub fn linuxEventFd(initval: u32, flags: u32) LinuxEventFdError!i32 {
const rc = posix.eventfd(initval, flags);
const err = posix.getErrno(rc);
switch (err) {
0 => return @intCast(i32, rc),
else => return unexpectedErrorPosix(err),
posix.EINVAL => return LinuxEventFdError.InvalidFlagValue,
posix.EMFILE => return LinuxEventFdError.ProcessFdQuotaExceeded,
posix.ENFILE => return LinuxEventFdError.SystemFdQuotaExceeded,
posix.ENODEV => return LinuxEventFdError.SystemResources,
posix.ENOMEM => return LinuxEventFdError.SystemResources,
}
}
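The event loop uses eventfds created this way as cross-thread wakeups: once registered in the epoll set, the fd becomes readable as soon as 8 bytes are written to it, from any thread. A minimal sketch:

    const efd = try std.os.linuxEventFd(0, posix.EFD_CLOEXEC | posix.EFD_NONBLOCK);
    defer std.os.close(efd);
    // register efd with epoll, then from any thread:
    const wakeup_bytes = []u8{0x1} ** 8;
    std.os.posixWrite(efd, wakeup_bytes) catch unreachable; // 8-byte eventfd writes cannot fail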
pub const PosixGetSockNameError = error{
/// Insufficient resources were available in the system to perform the operation.
SystemResources,
@@ -2605,10 +2629,17 @@ pub fn spawnThread(context: var, comptime startFn: var) SpawnThreadError!*Thread
const MainFuncs = struct {
extern fn linuxThreadMain(ctx_addr: usize) u8 {
if (@sizeOf(Context) == 0) {
return startFn({});
} else {
return startFn(@intToPtr(*const Context, ctx_addr).*);
const arg = if (@sizeOf(Context) == 0) {} else @intToPtr(*const Context, ctx_addr).*;
switch (@typeId(@typeOf(startFn).ReturnType)) {
builtin.TypeId.Int => {
return startFn(arg);
},
builtin.TypeId.Void => {
startFn(arg);
return 0;
},
else => @compileError("expected return type of startFn to be 'u8', 'noreturn', 'void', or '!void'"),
}
}
extern fn posixThreadMain(ctx: ?*c_void) ?*c_void {

View File

@@ -523,6 +523,10 @@ pub const CLONE_NEWPID = 0x20000000;
pub const CLONE_NEWNET = 0x40000000;
pub const CLONE_IO = 0x80000000;
pub const EFD_SEMAPHORE = 1;
pub const EFD_CLOEXEC = O_CLOEXEC;
pub const EFD_NONBLOCK = O_NONBLOCK;
pub const MS_RDONLY = 1;
pub const MS_NOSUID = 2;
pub const MS_NODEV = 4;
@@ -1221,6 +1225,10 @@ pub fn epoll_wait(epoll_fd: i32, events: [*]epoll_event, maxevents: u32, timeout
return syscall4(SYS_epoll_wait, @intCast(usize, epoll_fd), @ptrToInt(events), @intCast(usize, maxevents), @intCast(usize, timeout));
}
pub fn eventfd(count: u32, flags: u32) usize {
return syscall2(SYS_eventfd2, count, flags);
}
pub fn timerfd_create(clockid: i32, flags: u32) usize {
return syscall2(SYS_timerfd_create, @intCast(usize, clockid), @intCast(usize, flags));
}