From 4ac36d094c06b04c1c71d972d3f3e1187bccea95 Mon Sep 17 00:00:00 2001 From: Andrew Kelley Date: Fri, 27 Apr 2018 19:27:58 -0400 Subject: [PATCH 01/12] add std.atomic.Stack and std.atomic.Queue --- CMakeLists.txt | 3 +++ std/atomic/index.zig | 7 +++++++ std/atomic/queue.zig | 37 ++++++++++++++++++++++++++++++++++++ std/atomic/stack.zig | 45 ++++++++++++++++++++++++++++++++++++++++++++ std/index.zig | 2 ++ 5 files changed, 94 insertions(+) create mode 100644 std/atomic/index.zig create mode 100644 std/atomic/queue.zig create mode 100644 std/atomic/stack.zig diff --git a/CMakeLists.txt b/CMakeLists.txt index 9bf4bdd709..721690e9dc 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -415,6 +415,9 @@ set(ZIG_CPP_SOURCES set(ZIG_STD_FILES "array_list.zig" + "atomic/index.zig" + "atomic/stack.zig" + "atomic/queue.zig" "base64.zig" "buf_map.zig" "buf_set.zig" diff --git a/std/atomic/index.zig b/std/atomic/index.zig new file mode 100644 index 0000000000..9d556a6415 --- /dev/null +++ b/std/atomic/index.zig @@ -0,0 +1,7 @@ +pub const Stack = @import("stack.zig").Stack; +pub const Queue = @import("queue.zig").Queue; + +test "std.atomic" { + _ = @import("stack.zig").Stack; + _ = @import("queue.zig").Queue; +} diff --git a/std/atomic/queue.zig b/std/atomic/queue.zig new file mode 100644 index 0000000000..54981d4a61 --- /dev/null +++ b/std/atomic/queue.zig @@ -0,0 +1,37 @@ +/// Many reader, many writer, non-allocating, thread-safe, lock-free +pub fn Queue(comptime T: type) type { + return struct { + head: &Node, + tail: &Node, + root: Node, + + pub const Self = this; + + pub const Node = struct { + next: ?&Node, + data: T, + }; + + // TODO: well defined copy elision + pub fn init(self: &Self) void { + self.root.next = null; + self.head = &self.root; + self.tail = &self.root; + } + + pub fn put(self: &Self, node: &Node) void { + node.next = null; + + const tail = @atomicRmw(&Node, &self.tail, AtomicRmwOp.Xchg, node, AtomicOrder.SeqCst); + _ = @atomicRmw(?&Node, &tail.next, AtomicRmwOp.Xchg, node, AtomicOrder.SeqCst); + } + + pub fn get(self: &Self) ?&Node { + var head = @atomicLoad(&Node, &self.head, AtomicOrder.Acquire); + while (true) { + const node = head.next ?? return null; + head = @cmpxchgWeak(&Node, &self.head, head, node, AtomicOrder.Release, AtomicOrder.Acquire) ?? return node; + } + } + }; +} diff --git a/std/atomic/stack.zig b/std/atomic/stack.zig new file mode 100644 index 0000000000..4ceecb7b1d --- /dev/null +++ b/std/atomic/stack.zig @@ -0,0 +1,45 @@ +/// Many reader, many writer, non-allocating, thread-safe, lock-free +pub fn Stack(comptime T: type) type { + return struct { + root: ?&Node, + + pub const Self = this; + + pub const Node = struct { + next: ?&Node, + data: T, + }; + + pub fn init() Self { + return Self { + .root = null, + }; + } + + /// push operation, but only if you are the first item in the stack. if you did not succeed in + /// being the first item in the stack, returns the other item that was there. + pub fn pushFirst(self: &Self, node: &Node) ?&Node { + node.next = null; + return @cmpxchgStrong(?&Node, &self.root, null, node, AtomicOrder.AcqRel, AtomicOrder.AcqRel); + } + + pub fn push(self: &Self, node: &Node) void { + var root = @atomicLoad(?&Node, &self.root, AtomicOrder.Acquire); + while (true) { + node.next = root; + root = @cmpxchgWeak(?&Node, &self.root, root, node, AtomicOrder.Release, AtomicOrder.Acquire) ?? break; + } + } + + pub fn pop(self: &Self) ?&Node { + var root = @atomicLoad(?&Node, &self.root, AtomicOrder.Acquire); + while (true) { + root = @cmpxchgWeak(?&Node, &self.root, root, (root ?? return null).next, AtomicOrder.Release, AtomicOrder.Acquire) ?? return root; + } + } + + pub fn isEmpty(self: &Self) bool { + return @atomicLoad(?&Node, &self.root, AtomicOrder.Relaxed) == null; + } + }; +} diff --git a/std/index.zig b/std/index.zig index 07c4360aab..d6a1e3c94d 100644 --- a/std/index.zig +++ b/std/index.zig @@ -8,6 +8,7 @@ pub const HashMap = @import("hash_map.zig").HashMap; pub const LinkedList = @import("linked_list.zig").LinkedList; pub const IntrusiveLinkedList = @import("linked_list.zig").IntrusiveLinkedList; +pub const atomic = @import("atomic/index.zig"); pub const base64 = @import("base64.zig"); pub const build = @import("build.zig"); pub const c = @import("c/index.zig"); @@ -34,6 +35,7 @@ pub const zig = @import("zig/index.zig"); test "std" { // run tests from these + _ = @import("atomic/index.zig"); _ = @import("array_list.zig"); _ = @import("buf_map.zig"); _ = @import("buf_set.zig"); From 96ecb402590df7a02526009f1630f27e14a0e77c Mon Sep 17 00:00:00 2001 From: Andrew Kelley Date: Sat, 28 Apr 2018 17:53:06 -0400 Subject: [PATCH 02/12] add fuzz tests for std.atomic.Stack --- src/ir.cpp | 5 +++ std/atomic/stack.zig | 87 +++++++++++++++++++++++++++++++++++++++++--- std/heap.zig | 64 ++++++++++++++++++++++++++++---- 3 files changed, 143 insertions(+), 13 deletions(-) diff --git a/src/ir.cpp b/src/ir.cpp index 4bf8240472..469900bf07 100644 --- a/src/ir.cpp +++ b/src/ir.cpp @@ -18184,6 +18184,11 @@ static TypeTableEntry *ir_analyze_instruction_atomic_rmw(IrAnalyze *ira, IrInstr } else { if (!ir_resolve_atomic_order(ira, instruction->ordering->other, &ordering)) return ira->codegen->builtin_types.entry_invalid; + if (ordering == AtomicOrderUnordered) { + ir_add_error(ira, instruction->ordering, + buf_sprintf("@atomicRmw atomic ordering must not be Unordered")); + return ira->codegen->builtin_types.entry_invalid; + } } if (instr_is_comptime(casted_operand) && instr_is_comptime(casted_ptr) && casted_ptr->value.data.x_ptr.mut == ConstPtrMutComptimeVar) diff --git a/std/atomic/stack.zig b/std/atomic/stack.zig index 4ceecb7b1d..a1e686155c 100644 --- a/std/atomic/stack.zig +++ b/std/atomic/stack.zig @@ -1,3 +1,6 @@ +const builtin = @import("builtin"); +const AtomicOrder = builtin.AtomicOrder; + /// Many reader, many writer, non-allocating, thread-safe, lock-free pub fn Stack(comptime T: type) type { return struct { @@ -20,26 +23,100 @@ pub fn Stack(comptime T: type) type { /// being the first item in the stack, returns the other item that was there. pub fn pushFirst(self: &Self, node: &Node) ?&Node { node.next = null; - return @cmpxchgStrong(?&Node, &self.root, null, node, AtomicOrder.AcqRel, AtomicOrder.AcqRel); + return @cmpxchgStrong(?&Node, &self.root, null, node, AtomicOrder.SeqCst, AtomicOrder.SeqCst); } pub fn push(self: &Self, node: &Node) void { - var root = @atomicLoad(?&Node, &self.root, AtomicOrder.Acquire); + var root = @atomicLoad(?&Node, &self.root, AtomicOrder.SeqCst); while (true) { node.next = root; - root = @cmpxchgWeak(?&Node, &self.root, root, node, AtomicOrder.Release, AtomicOrder.Acquire) ?? break; + root = @cmpxchgWeak(?&Node, &self.root, root, node, AtomicOrder.SeqCst, AtomicOrder.SeqCst) ?? break; } } pub fn pop(self: &Self) ?&Node { var root = @atomicLoad(?&Node, &self.root, AtomicOrder.Acquire); while (true) { - root = @cmpxchgWeak(?&Node, &self.root, root, (root ?? return null).next, AtomicOrder.Release, AtomicOrder.Acquire) ?? return root; + root = @cmpxchgWeak(?&Node, &self.root, root, (root ?? return null).next, AtomicOrder.SeqCst, AtomicOrder.SeqCst) ?? return root; } } pub fn isEmpty(self: &Self) bool { - return @atomicLoad(?&Node, &self.root, AtomicOrder.Relaxed) == null; + return @atomicLoad(?&Node, &self.root, AtomicOrder.SeqCst) == null; } }; } + +const std = @import("std"); +const Context = struct { + allocator: &std.mem.Allocator, + stack: &Stack(i32), + put_sum: isize, + get_sum: isize, + puts_done: u8, // TODO make this a bool +}; +const puts_per_thread = 1000; +const put_thread_count = 3; + +test "std.atomic.stack" { + var direct_allocator = std.heap.DirectAllocator.init(); + defer direct_allocator.deinit(); + + var plenty_of_memory = try direct_allocator.allocator.alloc(u8, 64 * 1024 * 1024); + defer direct_allocator.allocator.free(plenty_of_memory); + + var fixed_buffer_allocator = std.heap.ThreadSafeFixedBufferAllocator.init(plenty_of_memory); + var a = &fixed_buffer_allocator.allocator; + + var stack = Stack(i32).init(); + var context = Context { + .allocator = a, + .stack = &stack, + .put_sum = 0, + .get_sum = 0, + .puts_done = 0, + }; + + var putters: [put_thread_count]&std.os.Thread = undefined; + for (putters) |*t| { + *t = try std.os.spawnThreadAllocator(a, &context, startPuts); + } + var getters: [put_thread_count]&std.os.Thread = undefined; + for (getters) |*t| { + *t = try std.os.spawnThreadAllocator(a, &context, startGets); + } + + for (putters) |t| t.wait(); + _ = @atomicRmw(u8, &context.puts_done, builtin.AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst); + for (getters) |t| t.wait(); + + std.debug.assert(context.put_sum == context.get_sum); +} + +fn startPuts(ctx: &Context) u8 { + var put_count: usize = puts_per_thread; + var r = std.rand.DefaultPrng.init(0xdeadbeef); + while (put_count != 0) : (put_count -= 1) { + std.os.time.sleep(0, 1); // let the os scheduler be our fuzz + const x = @bitCast(i32, r.random.scalar(u32)); + const node = ctx.allocator.create(Stack(i32).Node) catch unreachable; + node.data = x; + ctx.stack.push(node); + _ = @atomicRmw(isize, &ctx.put_sum, builtin.AtomicRmwOp.Add, x, AtomicOrder.SeqCst); + } + return 0; +} + +fn startGets(ctx: &Context) u8 { + while (true) { + while (ctx.stack.pop()) |node| { + std.os.time.sleep(0, 1); // let the os scheduler be our fuzz + _ = @atomicRmw(isize, &ctx.get_sum, builtin.AtomicRmwOp.Add, node.data, builtin.AtomicOrder.SeqCst); + } + + if (@atomicLoad(u8, &ctx.puts_done, builtin.AtomicOrder.SeqCst) == 1) { + break; + } + } + return 0; +} diff --git a/std/heap.zig b/std/heap.zig index b3a1e6bf27..d632b44cd1 100644 --- a/std/heap.zig +++ b/std/heap.zig @@ -47,13 +47,6 @@ pub const DirectAllocator = struct { const HeapHandle = if (builtin.os == Os.windows) os.windows.HANDLE else void; - //pub const canary_bytes = []u8 {48, 239, 128, 46, 18, 49, 147, 9, 195, 59, 203, 3, 245, 54, 9, 122}; - //pub const want_safety = switch (builtin.mode) { - // builtin.Mode.Debug => true, - // builtin.Mode.ReleaseSafe => true, - // else => false, - //}; - pub fn init() DirectAllocator { return DirectAllocator { .allocator = Allocator { @@ -298,7 +291,7 @@ pub const FixedBufferAllocator = struct { fn alloc(allocator: &Allocator, n: usize, alignment: u29) ![]u8 { const self = @fieldParentPtr(FixedBufferAllocator, "allocator", allocator); - const addr = @ptrToInt(&self.buffer[self.end_index]); + const addr = @ptrToInt(self.buffer.ptr) + self.end_index; const rem = @rem(addr, alignment); const march_forward_bytes = if (rem == 0) 0 else (alignment - rem); const adjusted_index = self.end_index + march_forward_bytes; @@ -325,6 +318,54 @@ pub const FixedBufferAllocator = struct { fn free(allocator: &Allocator, bytes: []u8) void { } }; +/// lock free +pub const ThreadSafeFixedBufferAllocator = struct { + allocator: Allocator, + end_index: usize, + buffer: []u8, + + pub fn init(buffer: []u8) ThreadSafeFixedBufferAllocator { + return ThreadSafeFixedBufferAllocator { + .allocator = Allocator { + .allocFn = alloc, + .reallocFn = realloc, + .freeFn = free, + }, + .buffer = buffer, + .end_index = 0, + }; + } + + fn alloc(allocator: &Allocator, n: usize, alignment: u29) ![]u8 { + const self = @fieldParentPtr(ThreadSafeFixedBufferAllocator, "allocator", allocator); + var end_index = @atomicLoad(usize, &self.end_index, builtin.AtomicOrder.SeqCst); + while (true) { + const addr = @ptrToInt(self.buffer.ptr) + end_index; + const rem = @rem(addr, alignment); + const march_forward_bytes = if (rem == 0) 0 else (alignment - rem); + const adjusted_index = end_index + march_forward_bytes; + const new_end_index = adjusted_index + n; + if (new_end_index > self.buffer.len) { + return error.OutOfMemory; + } + end_index = @cmpxchgWeak(usize, &self.end_index, end_index, new_end_index, + builtin.AtomicOrder.SeqCst, builtin.AtomicOrder.SeqCst) ?? return self.buffer[adjusted_index .. new_end_index]; + } + } + + fn realloc(allocator: &Allocator, old_mem: []u8, new_size: usize, alignment: u29) ![]u8 { + if (new_size <= old_mem.len) { + return old_mem[0..new_size]; + } else { + const result = try alloc(allocator, new_size, alignment); + mem.copy(u8, result, old_mem); + return result; + } + } + + fn free(allocator: &Allocator, bytes: []u8) void { } +}; + test "c_allocator" { @@ -363,6 +404,13 @@ test "FixedBufferAllocator" { try testAllocatorLargeAlignment(&fixed_buffer_allocator.allocator); } +test "ThreadSafeFixedBufferAllocator" { + var fixed_buffer_allocator = ThreadSafeFixedBufferAllocator.init(test_fixed_buffer_allocator_memory[0..]); + + try testAllocator(&fixed_buffer_allocator.allocator); + try testAllocatorLargeAlignment(&fixed_buffer_allocator.allocator); +} + fn testAllocator(allocator: &mem.Allocator) !void { var slice = try allocator.alloc(&i32, 100); From 5d6e44b3f2bf37deaf09ce4a473fa5b52a6fb760 Mon Sep 17 00:00:00 2001 From: Andrew Kelley Date: Sat, 28 Apr 2018 18:00:51 -0400 Subject: [PATCH 03/12] add tests for std.atomic Queue and Stack --- std/atomic/queue.zig | 85 +++++++++++++++++++++++++++++++++++++++++++- std/atomic/stack.zig | 4 +++ 2 files changed, 88 insertions(+), 1 deletion(-) diff --git a/std/atomic/queue.zig b/std/atomic/queue.zig index 54981d4a61..c7ce00d6cf 100644 --- a/std/atomic/queue.zig +++ b/std/atomic/queue.zig @@ -1,3 +1,7 @@ +const builtin = @import("builtin"); +const AtomicOrder = builtin.AtomicOrder; +const AtomicRmwOp = builtin.AtomicRmwOp; + /// Many reader, many writer, non-allocating, thread-safe, lock-free pub fn Queue(comptime T: type) type { return struct { @@ -12,7 +16,7 @@ pub fn Queue(comptime T: type) type { data: T, }; - // TODO: well defined copy elision + // TODO: well defined copy elision: https://github.com/zig-lang/zig/issues/287 pub fn init(self: &Self) void { self.root.next = null; self.head = &self.root; @@ -35,3 +39,82 @@ pub fn Queue(comptime T: type) type { } }; } + +const std = @import("std"); +const Context = struct { + allocator: &std.mem.Allocator, + queue: &Queue(i32), + put_sum: isize, + get_sum: isize, + get_count: usize, + puts_done: u8, // TODO make this a bool +}; +const puts_per_thread = 10000; +const put_thread_count = 3; + +test "std.atomic.queue" { + var direct_allocator = std.heap.DirectAllocator.init(); + defer direct_allocator.deinit(); + + var plenty_of_memory = try direct_allocator.allocator.alloc(u8, 64 * 1024 * 1024); + defer direct_allocator.allocator.free(plenty_of_memory); + + var fixed_buffer_allocator = std.heap.ThreadSafeFixedBufferAllocator.init(plenty_of_memory); + var a = &fixed_buffer_allocator.allocator; + + var queue: Queue(i32) = undefined; + queue.init(); + var context = Context { + .allocator = a, + .queue = &queue, + .put_sum = 0, + .get_sum = 0, + .puts_done = 0, + .get_count = 0, + }; + + var putters: [put_thread_count]&std.os.Thread = undefined; + for (putters) |*t| { + *t = try std.os.spawnThreadAllocator(a, &context, startPuts); + } + var getters: [put_thread_count]&std.os.Thread = undefined; + for (getters) |*t| { + *t = try std.os.spawnThreadAllocator(a, &context, startGets); + } + + for (putters) |t| t.wait(); + _ = @atomicRmw(u8, &context.puts_done, builtin.AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst); + for (getters) |t| t.wait(); + + std.debug.assert(context.put_sum == context.get_sum); + std.debug.assert(context.get_count == puts_per_thread * put_thread_count); +} + +fn startPuts(ctx: &Context) u8 { + var put_count: usize = puts_per_thread; + var r = std.rand.DefaultPrng.init(0xdeadbeef); + while (put_count != 0) : (put_count -= 1) { + std.os.time.sleep(0, 1); // let the os scheduler be our fuzz + const x = @bitCast(i32, r.random.scalar(u32)); + const node = ctx.allocator.create(Queue(i32).Node) catch unreachable; + node.data = x; + ctx.queue.put(node); + _ = @atomicRmw(isize, &ctx.put_sum, builtin.AtomicRmwOp.Add, x, AtomicOrder.SeqCst); + } + return 0; +} + +fn startGets(ctx: &Context) u8 { + while (true) { + while (ctx.queue.get()) |node| { + std.os.time.sleep(0, 1); // let the os scheduler be our fuzz + _ = @atomicRmw(isize, &ctx.get_sum, builtin.AtomicRmwOp.Add, node.data, builtin.AtomicOrder.SeqCst); + _ = @atomicRmw(usize, &ctx.get_count, builtin.AtomicRmwOp.Add, 1, builtin.AtomicOrder.SeqCst); + } + + if (@atomicLoad(u8, &ctx.puts_done, builtin.AtomicOrder.SeqCst) == 1) { + break; + } + } + return 0; +} diff --git a/std/atomic/stack.zig b/std/atomic/stack.zig index a1e686155c..a53f7682b0 100644 --- a/std/atomic/stack.zig +++ b/std/atomic/stack.zig @@ -53,6 +53,7 @@ const Context = struct { stack: &Stack(i32), put_sum: isize, get_sum: isize, + get_count: usize, puts_done: u8, // TODO make this a bool }; const puts_per_thread = 1000; @@ -75,6 +76,7 @@ test "std.atomic.stack" { .put_sum = 0, .get_sum = 0, .puts_done = 0, + .get_count = 0, }; var putters: [put_thread_count]&std.os.Thread = undefined; @@ -91,6 +93,7 @@ test "std.atomic.stack" { for (getters) |t| t.wait(); std.debug.assert(context.put_sum == context.get_sum); + std.debug.assert(context.get_count == puts_per_thread * put_thread_count); } fn startPuts(ctx: &Context) u8 { @@ -112,6 +115,7 @@ fn startGets(ctx: &Context) u8 { while (ctx.stack.pop()) |node| { std.os.time.sleep(0, 1); // let the os scheduler be our fuzz _ = @atomicRmw(isize, &ctx.get_sum, builtin.AtomicRmwOp.Add, node.data, builtin.AtomicOrder.SeqCst); + _ = @atomicRmw(usize, &ctx.get_count, builtin.AtomicRmwOp.Add, 1, builtin.AtomicOrder.SeqCst); } if (@atomicLoad(u8, &ctx.puts_done, builtin.AtomicOrder.SeqCst) == 1) { From a10351b439769c57454e19915365b21e43f408bc Mon Sep 17 00:00:00 2001 From: Andrew Kelley Date: Sat, 28 Apr 2018 18:19:00 -0400 Subject: [PATCH 04/12] disable atomic stack and queue tests for non-linux --- std/atomic/queue.zig | 4 ++++ std/atomic/stack.zig | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/std/atomic/queue.zig b/std/atomic/queue.zig index c7ce00d6cf..3866bad7ce 100644 --- a/std/atomic/queue.zig +++ b/std/atomic/queue.zig @@ -53,6 +53,10 @@ const puts_per_thread = 10000; const put_thread_count = 3; test "std.atomic.queue" { + if (builtin.os != builtin.Os.linux) { + // TODO implement kernel threads for windows and macos + return; + } var direct_allocator = std.heap.DirectAllocator.init(); defer direct_allocator.deinit(); diff --git a/std/atomic/stack.zig b/std/atomic/stack.zig index a53f7682b0..12de2edaaa 100644 --- a/std/atomic/stack.zig +++ b/std/atomic/stack.zig @@ -60,6 +60,10 @@ const puts_per_thread = 1000; const put_thread_count = 3; test "std.atomic.stack" { + if (builtin.os != builtin.Os.linux) { + // TODO implement kernel threads for windows and macos + return; + } var direct_allocator = std.heap.DirectAllocator.init(); defer direct_allocator.deinit(); From a344cb03bc3c48f3c7fec32dc19c1bcad0910941 Mon Sep 17 00:00:00 2001 From: Andrew Kelley Date: Sat, 28 Apr 2018 23:30:13 -0400 Subject: [PATCH 05/12] *WIP* use pthreads when linking libc --- std/c/darwin.zig | 5 +++ std/c/index.zig | 10 +++++ std/c/linux.zig | 5 +++ std/os/index.zig | 112 +++++++++++++++++++++++++++++++++++------------ std/os/test.zig | 4 +- 5 files changed, 107 insertions(+), 29 deletions(-) diff --git a/std/c/darwin.zig b/std/c/darwin.zig index b958055ae8..7ac57514c9 100644 --- a/std/c/darwin.zig +++ b/std/c/darwin.zig @@ -81,3 +81,8 @@ pub const sockaddr = extern struct { }; pub const sa_family_t = u8; + +pub const pthread_attr_t = extern struct { + __sig: c_long, + __opaque: [56]u8, +}; diff --git a/std/c/index.zig b/std/c/index.zig index cff86f4041..5ea7145cd3 100644 --- a/std/c/index.zig +++ b/std/c/index.zig @@ -53,3 +53,13 @@ pub extern "c" fn malloc(usize) ?&c_void; pub extern "c" fn realloc(&c_void, usize) ?&c_void; pub extern "c" fn free(&c_void) void; pub extern "c" fn posix_memalign(memptr: &&c_void, alignment: usize, size: usize) c_int; + +pub extern "c" fn pthread_create(noalias newthread: &pthread_t, + noalias attr: ?&const pthread_attr_t, start_routine: extern fn(?&c_void) ?&c_void, + noalias arg: ?&c_void) c_int; +pub extern "c" fn pthread_attr_init(attr: &pthread_attr_t) c_int; +pub extern "c" fn pthread_attr_setstack(attr: &pthread_attr_t, stackaddr: &c_void, stacksize: usize) c_int; +pub extern "c" fn pthread_attr_destroy(attr: &pthread_attr_t) c_int; +pub extern "c" fn pthread_join(thread: pthread_t, arg_return: ?&?&c_void) c_int; + +pub const pthread_t = &@OpaqueType(); diff --git a/std/c/linux.zig b/std/c/linux.zig index b2ac05eba5..7810fec130 100644 --- a/std/c/linux.zig +++ b/std/c/linux.zig @@ -3,3 +3,8 @@ pub use @import("../os/linux/errno.zig"); pub extern "c" fn getrandom(buf_ptr: &u8, buf_len: usize, flags: c_uint) c_int; extern "c" fn __errno_location() &c_int; pub const _errno = __errno_location; + +pub const pthread_attr_t = extern struct { + __size: [56]u8, + __align: c_long, +}; diff --git a/std/os/index.zig b/std/os/index.zig index 0639490725..3669dca198 100644 --- a/std/os/index.zig +++ b/std/os/index.zig @@ -2,6 +2,10 @@ const std = @import("../index.zig"); const builtin = @import("builtin"); const Os = builtin.Os; const is_windows = builtin.os == Os.windows; +const is_posix = switch (builtin.os) { + builtin.Os.linux, builtin.Os.macosx => true, + else => false, +}; const os = this; test "std.os" { @@ -2343,21 +2347,39 @@ pub fn posixGetSockOptConnectError(sockfd: i32) PosixConnectError!void { } pub const Thread = struct { - pid: i32, + pid: pid_t, allocator: ?&mem.Allocator, stack: []u8, + pthread_handle: pthread_t, + + pub const use_pthreads = is_posix and builtin.link_libc; + const pthread_t = if (use_pthreads) c.pthread_t else void; + const pid_t = if (!use_pthreads) i32 else void; pub fn wait(self: &const Thread) void { - while (true) { - const pid_value = @atomicLoad(i32, &self.pid, builtin.AtomicOrder.SeqCst); - if (pid_value == 0) break; - const rc = linux.futex_wait(@ptrToInt(&self.pid), linux.FUTEX_WAIT, pid_value, null); - switch (linux.getErrno(rc)) { - 0 => continue, - posix.EINTR => continue, - posix.EAGAIN => continue, + if (use_pthreads) { + const err = c.pthread_join(self.pthread_handle, null); + switch (err) { + 0 => {}, + posix.EINVAL => unreachable, + posix.ESRCH => unreachable, + posix.EDEADLK => unreachable, else => unreachable, } + } else if (builtin.os == builtin.Os.linux) { + while (true) { + const pid_value = @atomicLoad(i32, &self.pid, builtin.AtomicOrder.SeqCst); + if (pid_value == 0) break; + const rc = linux.futex_wait(@ptrToInt(&self.pid), linux.FUTEX_WAIT, pid_value, null); + switch (linux.getErrno(rc)) { + 0 => continue, + posix.EINTR => continue, + posix.EAGAIN => continue, + else => unreachable, + } + } + } else { + @compileError("Unsupported OS"); } if (self.allocator) |a| { a.free(self.stack); @@ -2429,31 +2451,67 @@ pub fn spawnThread(stack: []u8, context: var, comptime startFn: var) SpawnThread thread_ptr.stack = stack; thread_ptr.allocator = null; - const threadMain = struct { - extern fn threadMain(ctx_addr: usize) u8 { + const MainFuncs = struct { + extern fn linuxThreadMain(ctx_addr: usize) u8 { if (@sizeOf(Context) == 0) { return startFn({}); } else { return startFn(*@intToPtr(&const Context, ctx_addr)); } } - }.threadMain; + extern fn posixThreadMain(ctx: ?&c_void) ?&c_void { + if (@sizeOf(Context) == 0) { + _ = startFn({}); + return null; + } else { + _ = startFn(*@ptrCast(&const Context, @alignCast(@alignOf(Context), ctx))); + return null; + } + } + }; - const flags = posix.CLONE_VM | posix.CLONE_FS | posix.CLONE_FILES | posix.CLONE_SIGHAND - | posix.CLONE_THREAD | posix.CLONE_SYSVSEM // | posix.CLONE_SETTLS - | posix.CLONE_PARENT_SETTID | posix.CLONE_CHILD_CLEARTID | posix.CLONE_DETACHED; - const newtls: usize = 0; - const rc = posix.clone(threadMain, stack_end, flags, arg, &thread_ptr.pid, newtls, &thread_ptr.pid); - const err = posix.getErrno(rc); - switch (err) { - 0 => return thread_ptr, - posix.EAGAIN => return SpawnThreadError.ThreadQuotaExceeded, - posix.EINVAL => unreachable, - posix.ENOMEM => return SpawnThreadError.SystemResources, - posix.ENOSPC => unreachable, - posix.EPERM => unreachable, - posix.EUSERS => unreachable, - else => return unexpectedErrorPosix(err), + if (builtin.os == builtin.Os.windows) { + // use windows API directly + @compileError("TODO support spawnThread for Windows"); + } else if (Thread.use_pthreads) { + // use pthreads + var attr: c.pthread_attr_t = undefined; + if (c.pthread_attr_init(&attr) != 0) return SpawnThreadError.SystemResources; + defer assert(c.pthread_attr_destroy(&attr) == 0); + + const stack_size = stack_end - @ptrToInt(stack.ptr); + if (c.pthread_attr_setstack(&attr, @ptrCast(&c_void, stack.ptr), stack_size) != 0) { + return SpawnThreadError.SystemResources; + } + + const err = c.pthread_create(&thread_ptr.pthread_handle, &attr, MainFuncs.posixThreadMain, @intToPtr(&c_void, arg)); + switch (err) { + 0 => return thread_ptr, + posix.EAGAIN => return SpawnThreadError.SystemResources, + posix.EPERM => unreachable, + posix.EINVAL => unreachable, + else => return unexpectedErrorPosix(usize(err)), + } + } else if (builtin.os == builtin.Os.linux) { + // use linux API directly + const flags = posix.CLONE_VM | posix.CLONE_FS | posix.CLONE_FILES | posix.CLONE_SIGHAND + | posix.CLONE_THREAD | posix.CLONE_SYSVSEM // | posix.CLONE_SETTLS + | posix.CLONE_PARENT_SETTID | posix.CLONE_CHILD_CLEARTID | posix.CLONE_DETACHED; + const newtls: usize = 0; + const rc = posix.clone(MainFuncs.linuxThreadMain, stack_end, flags, arg, &thread_ptr.pid, newtls, &thread_ptr.pid); + const err = posix.getErrno(rc); + switch (err) { + 0 => return thread_ptr, + posix.EAGAIN => return SpawnThreadError.ThreadQuotaExceeded, + posix.EINVAL => unreachable, + posix.ENOMEM => return SpawnThreadError.SystemResources, + posix.ENOSPC => unreachable, + posix.EPERM => unreachable, + posix.EUSERS => unreachable, + else => return unexpectedErrorPosix(err), + } + } else { + @compileError("Unsupported OS"); } } diff --git a/std/os/test.zig b/std/os/test.zig index 41afee004a..9a155c027a 100644 --- a/std/os/test.zig +++ b/std/os/test.zig @@ -44,8 +44,8 @@ test "access file" { } test "spawn threads" { - if (builtin.os != builtin.Os.linux) { - // TODO implement threads on macos and windows + if (builtin.os == builtin.Os.windows) { + // TODO implement threads on windows return; } From 998e25a01e8b3ada235aee4a9f785a7454de4b3f Mon Sep 17 00:00:00 2001 From: Andrew Kelley Date: Sat, 28 Apr 2018 23:47:39 -0400 Subject: [PATCH 06/12] pthread support working --- src/all_types.hpp | 1 + src/analyze.cpp | 8 ++++++++ src/codegen.cpp | 2 ++ std/os/index.zig | 14 ++++++++------ std/os/test.zig | 4 ++-- 5 files changed, 21 insertions(+), 8 deletions(-) diff --git a/src/all_types.hpp b/src/all_types.hpp index d1b2ad61d2..f08b870b37 100644 --- a/src/all_types.hpp +++ b/src/all_types.hpp @@ -1486,6 +1486,7 @@ struct CodeGen { ZigList link_libs_list; LinkLib *libc_link_lib; + LinkLib *pthread_link_lib; // add -framework [name] args to linker ZigList darwin_frameworks; diff --git a/src/analyze.cpp b/src/analyze.cpp index 1ecfe32f4c..8a9d236790 100644 --- a/src/analyze.cpp +++ b/src/analyze.cpp @@ -6049,10 +6049,15 @@ LinkLib *create_link_lib(Buf *name) { LinkLib *add_link_lib(CodeGen *g, Buf *name) { bool is_libc = buf_eql_str(name, "c"); + bool is_pthread = buf_eql_str(name, "pthread"); if (is_libc && g->libc_link_lib != nullptr) return g->libc_link_lib; + if (is_pthread && g->pthread_link_lib != nullptr) { + return g->pthread_link_lib; + } + for (size_t i = 0; i < g->link_libs_list.length; i += 1) { LinkLib *existing_lib = g->link_libs_list.at(i); if (buf_eql_buf(existing_lib->name, name)) { @@ -6066,6 +6071,9 @@ LinkLib *add_link_lib(CodeGen *g, Buf *name) { if (is_libc) g->libc_link_lib = link_lib; + if (is_pthread) + g->pthread_link_lib = link_lib; + return link_lib; } diff --git a/src/codegen.cpp b/src/codegen.cpp index 2d8c385f44..9f064d5f19 100644 --- a/src/codegen.cpp +++ b/src/codegen.cpp @@ -145,6 +145,7 @@ CodeGen *codegen_create(Buf *root_src_path, const ZigTarget *target, OutType out { g->libc_link_lib = create_link_lib(buf_create_from_str("c")); g->link_libs_list.append(g->libc_link_lib); + g->pthread_link_lib = create_link_lib(buf_create_from_str("pthread")); } return g; @@ -6373,6 +6374,7 @@ static void define_builtin_compile_vars(CodeGen *g) { buf_appendf(contents, "pub const object_format = ObjectFormat.%s;\n", cur_obj_fmt); buf_appendf(contents, "pub const mode = %s;\n", build_mode_to_str(g->build_mode)); buf_appendf(contents, "pub const link_libc = %s;\n", bool_to_str(g->libc_link_lib != nullptr)); + buf_appendf(contents, "pub const link_pthread = %s;\n", bool_to_str(g->pthread_link_lib != nullptr)); buf_appendf(contents, "pub const have_error_return_tracing = %s;\n", bool_to_str(g->have_err_ret_tracing)); buf_appendf(contents, "pub const __zig_test_fn_slice = {}; // overwritten later\n"); diff --git a/std/os/index.zig b/std/os/index.zig index 3669dca198..fa1cc418a5 100644 --- a/std/os/index.zig +++ b/std/os/index.zig @@ -2352,12 +2352,11 @@ pub const Thread = struct { stack: []u8, pthread_handle: pthread_t, - pub const use_pthreads = is_posix and builtin.link_libc; - const pthread_t = if (use_pthreads) c.pthread_t else void; - const pid_t = if (!use_pthreads) i32 else void; + const pthread_t = if (builtin.link_pthread) c.pthread_t else void; + const pid_t = if (!builtin.link_pthread) i32 else void; pub fn wait(self: &const Thread) void { - if (use_pthreads) { + if (builtin.link_pthread) { const err = c.pthread_join(self.pthread_handle, null); switch (err) { 0 => {}, @@ -2407,6 +2406,9 @@ pub const SpawnThreadError = error { /// be copied. SystemResources, + /// pthreads requires at least 16384 bytes of stack space + StackTooSmall, + Unexpected, }; @@ -2473,7 +2475,7 @@ pub fn spawnThread(stack: []u8, context: var, comptime startFn: var) SpawnThread if (builtin.os == builtin.Os.windows) { // use windows API directly @compileError("TODO support spawnThread for Windows"); - } else if (Thread.use_pthreads) { + } else if (builtin.link_pthread) { // use pthreads var attr: c.pthread_attr_t = undefined; if (c.pthread_attr_init(&attr) != 0) return SpawnThreadError.SystemResources; @@ -2481,7 +2483,7 @@ pub fn spawnThread(stack: []u8, context: var, comptime startFn: var) SpawnThread const stack_size = stack_end - @ptrToInt(stack.ptr); if (c.pthread_attr_setstack(&attr, @ptrCast(&c_void, stack.ptr), stack_size) != 0) { - return SpawnThreadError.SystemResources; + return SpawnThreadError.StackTooSmall; // pthreads requires at least 16384 bytes } const err = c.pthread_create(&thread_ptr.pthread_handle, &attr, MainFuncs.posixThreadMain, @intToPtr(&c_void, arg)); diff --git a/std/os/test.zig b/std/os/test.zig index 9a155c027a..87486bde4f 100644 --- a/std/os/test.zig +++ b/std/os/test.zig @@ -57,8 +57,8 @@ test "spawn threads" { const thread1 = try std.os.spawnThreadAllocator(&direct_allocator.allocator, {}, start1); const thread4 = try std.os.spawnThreadAllocator(&direct_allocator.allocator, &shared_ctx, start2); - var stack1: [1024]u8 = undefined; - var stack2: [1024]u8 = undefined; + var stack1: [20 * 1024]u8 = undefined; + var stack2: [20 * 1024]u8 = undefined; const thread2 = try std.os.spawnThread(stack1[0..], &shared_ctx, start2); const thread3 = try std.os.spawnThread(stack2[0..], &shared_ctx, start2); From a42542099392cf189b96bdd77ecd88feadfb6382 Mon Sep 17 00:00:00 2001 From: Andrew Kelley Date: Sun, 29 Apr 2018 00:07:32 -0400 Subject: [PATCH 07/12] make pthreads threads work on darwin darwin pthreads adds a restriction that the stack start and end must be page aligned --- std/os/index.zig | 10 +++++++--- std/os/test.zig | 4 ++-- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/std/os/index.zig b/std/os/index.zig index fa1cc418a5..8681a018b9 100644 --- a/std/os/index.zig +++ b/std/os/index.zig @@ -2421,7 +2421,7 @@ pub fn spawnThreadAllocator(allocator: &mem.Allocator, context: var, comptime st // TODO compile-time call graph analysis to determine stack upper bound // https://github.com/zig-lang/zig/issues/157 const default_stack_size = 8 * 1024 * 1024; - const stack_bytes = try allocator.alloc(u8, default_stack_size); + const stack_bytes = try allocator.alignedAlloc(u8, os.page_size, default_stack_size); const thread = try spawnThread(stack_bytes, context, startFn); thread.allocator = allocator; return thread; @@ -2431,7 +2431,7 @@ pub fn spawnThreadAllocator(allocator: &mem.Allocator, context: var, comptime st /// fn startFn(@typeOf(context)) T /// where T is u8, noreturn, void, or !void /// caller must call wait on the returned thread -pub fn spawnThread(stack: []u8, context: var, comptime startFn: var) SpawnThreadError!&Thread { +pub fn spawnThread(stack: []align(os.page_size) u8, context: var, comptime startFn: var) SpawnThreadError!&Thread { const Context = @typeOf(context); comptime assert(@ArgType(@typeOf(startFn), 0) == Context); @@ -2481,8 +2481,12 @@ pub fn spawnThread(stack: []u8, context: var, comptime startFn: var) SpawnThread if (c.pthread_attr_init(&attr) != 0) return SpawnThreadError.SystemResources; defer assert(c.pthread_attr_destroy(&attr) == 0); + // align to page + stack_end -= stack_end % os.page_size; + const stack_size = stack_end - @ptrToInt(stack.ptr); - if (c.pthread_attr_setstack(&attr, @ptrCast(&c_void, stack.ptr), stack_size) != 0) { + const setstack_err = c.pthread_attr_setstack(&attr, @ptrCast(&c_void, stack.ptr), stack_size); + if (setstack_err != 0) { return SpawnThreadError.StackTooSmall; // pthreads requires at least 16384 bytes } diff --git a/std/os/test.zig b/std/os/test.zig index 87486bde4f..37e5bf4bb8 100644 --- a/std/os/test.zig +++ b/std/os/test.zig @@ -57,8 +57,8 @@ test "spawn threads" { const thread1 = try std.os.spawnThreadAllocator(&direct_allocator.allocator, {}, start1); const thread4 = try std.os.spawnThreadAllocator(&direct_allocator.allocator, &shared_ctx, start2); - var stack1: [20 * 1024]u8 = undefined; - var stack2: [20 * 1024]u8 = undefined; + var stack1: [20 * 1024]u8 align(os.page_size) = undefined; + var stack2: [20 * 1024]u8 align(os.page_size) = undefined; const thread2 = try std.os.spawnThread(stack1[0..], &shared_ctx, start2); const thread3 = try std.os.spawnThread(stack2[0..], &shared_ctx, start2); From abf90eaa674782e092e49bb23c4c7da0f581f604 Mon Sep 17 00:00:00 2001 From: Andrew Kelley Date: Sun, 29 Apr 2018 00:09:18 -0400 Subject: [PATCH 08/12] enable atomic queue and stack tests for macos --- std/atomic/queue.zig | 4 ++-- std/atomic/stack.zig | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/std/atomic/queue.zig b/std/atomic/queue.zig index 3866bad7ce..dd9b869f02 100644 --- a/std/atomic/queue.zig +++ b/std/atomic/queue.zig @@ -53,8 +53,8 @@ const puts_per_thread = 10000; const put_thread_count = 3; test "std.atomic.queue" { - if (builtin.os != builtin.Os.linux) { - // TODO implement kernel threads for windows and macos + if (builtin.os == builtin.Os.windows) { + // TODO implement kernel threads for windows return; } var direct_allocator = std.heap.DirectAllocator.init(); diff --git a/std/atomic/stack.zig b/std/atomic/stack.zig index 12de2edaaa..9f2ceacfa3 100644 --- a/std/atomic/stack.zig +++ b/std/atomic/stack.zig @@ -60,8 +60,8 @@ const puts_per_thread = 1000; const put_thread_count = 3; test "std.atomic.stack" { - if (builtin.os != builtin.Os.linux) { - // TODO implement kernel threads for windows and macos + if (builtin.os == builtin.Os.windows) { + // TODO implement kernel threads for windows return; } var direct_allocator = std.heap.DirectAllocator.init(); From bf8e419d2b7853f5cb5aba4dcba45ae28a3840aa Mon Sep 17 00:00:00 2001 From: Andrew Kelley Date: Sun, 29 Apr 2018 00:40:04 -0400 Subject: [PATCH 09/12] linux uses pthreads when linking against libc --- src/all_types.hpp | 1 - src/analyze.cpp | 8 -------- src/codegen.cpp | 2 -- std/c/index.zig | 10 +++++----- std/os/index.zig | 9 +++++---- 5 files changed, 10 insertions(+), 20 deletions(-) diff --git a/src/all_types.hpp b/src/all_types.hpp index f08b870b37..d1b2ad61d2 100644 --- a/src/all_types.hpp +++ b/src/all_types.hpp @@ -1486,7 +1486,6 @@ struct CodeGen { ZigList link_libs_list; LinkLib *libc_link_lib; - LinkLib *pthread_link_lib; // add -framework [name] args to linker ZigList darwin_frameworks; diff --git a/src/analyze.cpp b/src/analyze.cpp index 8a9d236790..1ecfe32f4c 100644 --- a/src/analyze.cpp +++ b/src/analyze.cpp @@ -6049,15 +6049,10 @@ LinkLib *create_link_lib(Buf *name) { LinkLib *add_link_lib(CodeGen *g, Buf *name) { bool is_libc = buf_eql_str(name, "c"); - bool is_pthread = buf_eql_str(name, "pthread"); if (is_libc && g->libc_link_lib != nullptr) return g->libc_link_lib; - if (is_pthread && g->pthread_link_lib != nullptr) { - return g->pthread_link_lib; - } - for (size_t i = 0; i < g->link_libs_list.length; i += 1) { LinkLib *existing_lib = g->link_libs_list.at(i); if (buf_eql_buf(existing_lib->name, name)) { @@ -6071,9 +6066,6 @@ LinkLib *add_link_lib(CodeGen *g, Buf *name) { if (is_libc) g->libc_link_lib = link_lib; - if (is_pthread) - g->pthread_link_lib = link_lib; - return link_lib; } diff --git a/src/codegen.cpp b/src/codegen.cpp index 9f064d5f19..2d8c385f44 100644 --- a/src/codegen.cpp +++ b/src/codegen.cpp @@ -145,7 +145,6 @@ CodeGen *codegen_create(Buf *root_src_path, const ZigTarget *target, OutType out { g->libc_link_lib = create_link_lib(buf_create_from_str("c")); g->link_libs_list.append(g->libc_link_lib); - g->pthread_link_lib = create_link_lib(buf_create_from_str("pthread")); } return g; @@ -6374,7 +6373,6 @@ static void define_builtin_compile_vars(CodeGen *g) { buf_appendf(contents, "pub const object_format = ObjectFormat.%s;\n", cur_obj_fmt); buf_appendf(contents, "pub const mode = %s;\n", build_mode_to_str(g->build_mode)); buf_appendf(contents, "pub const link_libc = %s;\n", bool_to_str(g->libc_link_lib != nullptr)); - buf_appendf(contents, "pub const link_pthread = %s;\n", bool_to_str(g->pthread_link_lib != nullptr)); buf_appendf(contents, "pub const have_error_return_tracing = %s;\n", bool_to_str(g->have_err_ret_tracing)); buf_appendf(contents, "pub const __zig_test_fn_slice = {}; // overwritten later\n"); diff --git a/std/c/index.zig b/std/c/index.zig index 5ea7145cd3..34269d2aa2 100644 --- a/std/c/index.zig +++ b/std/c/index.zig @@ -54,12 +54,12 @@ pub extern "c" fn realloc(&c_void, usize) ?&c_void; pub extern "c" fn free(&c_void) void; pub extern "c" fn posix_memalign(memptr: &&c_void, alignment: usize, size: usize) c_int; -pub extern "c" fn pthread_create(noalias newthread: &pthread_t, +pub extern "pthread" fn pthread_create(noalias newthread: &pthread_t, noalias attr: ?&const pthread_attr_t, start_routine: extern fn(?&c_void) ?&c_void, noalias arg: ?&c_void) c_int; -pub extern "c" fn pthread_attr_init(attr: &pthread_attr_t) c_int; -pub extern "c" fn pthread_attr_setstack(attr: &pthread_attr_t, stackaddr: &c_void, stacksize: usize) c_int; -pub extern "c" fn pthread_attr_destroy(attr: &pthread_attr_t) c_int; -pub extern "c" fn pthread_join(thread: pthread_t, arg_return: ?&?&c_void) c_int; +pub extern "pthread" fn pthread_attr_init(attr: &pthread_attr_t) c_int; +pub extern "pthread" fn pthread_attr_setstack(attr: &pthread_attr_t, stackaddr: &c_void, stacksize: usize) c_int; +pub extern "pthread" fn pthread_attr_destroy(attr: &pthread_attr_t) c_int; +pub extern "pthread" fn pthread_join(thread: pthread_t, arg_return: ?&?&c_void) c_int; pub const pthread_t = &@OpaqueType(); diff --git a/std/os/index.zig b/std/os/index.zig index 8681a018b9..85e46a1bf9 100644 --- a/std/os/index.zig +++ b/std/os/index.zig @@ -2352,11 +2352,12 @@ pub const Thread = struct { stack: []u8, pthread_handle: pthread_t, - const pthread_t = if (builtin.link_pthread) c.pthread_t else void; - const pid_t = if (!builtin.link_pthread) i32 else void; + pub const use_pthreads = is_posix and builtin.link_libc; + const pthread_t = if (use_pthreads) c.pthread_t else void; + const pid_t = if (!use_pthreads) i32 else void; pub fn wait(self: &const Thread) void { - if (builtin.link_pthread) { + if (use_pthreads) { const err = c.pthread_join(self.pthread_handle, null); switch (err) { 0 => {}, @@ -2475,7 +2476,7 @@ pub fn spawnThread(stack: []align(os.page_size) u8, context: var, comptime start if (builtin.os == builtin.Os.windows) { // use windows API directly @compileError("TODO support spawnThread for Windows"); - } else if (builtin.link_pthread) { + } else if (Thread.use_pthreads) { // use pthreads var attr: c.pthread_attr_t = undefined; if (c.pthread_attr_init(&attr) != 0) return SpawnThreadError.SystemResources; From 6376d96824c5205ecc02b2c621bcef5dc78f1a81 Mon Sep 17 00:00:00 2001 From: Andrew Kelley Date: Sun, 29 Apr 2018 02:40:22 -0400 Subject: [PATCH 10/12] support kernel threads for windows * remove std.os.spawnThreadAllocator - windows does not support an explicit stack, so using an allocator for a thread stack space does not work. * std.os.spawnThread - instead of accepting a stack argument, the implementation will directly allocate using OS-specific APIs. --- std/atomic/queue.zig | 8 +- std/atomic/stack.zig | 8 +- std/mem.zig | 1 + std/os/index.zig | 165 +++++++++++++++++++++++++-------------- std/os/test.zig | 20 +---- std/os/windows/index.zig | 6 ++ 6 files changed, 120 insertions(+), 88 deletions(-) diff --git a/std/atomic/queue.zig b/std/atomic/queue.zig index dd9b869f02..1acecbab2c 100644 --- a/std/atomic/queue.zig +++ b/std/atomic/queue.zig @@ -53,10 +53,6 @@ const puts_per_thread = 10000; const put_thread_count = 3; test "std.atomic.queue" { - if (builtin.os == builtin.Os.windows) { - // TODO implement kernel threads for windows - return; - } var direct_allocator = std.heap.DirectAllocator.init(); defer direct_allocator.deinit(); @@ -79,11 +75,11 @@ test "std.atomic.queue" { var putters: [put_thread_count]&std.os.Thread = undefined; for (putters) |*t| { - *t = try std.os.spawnThreadAllocator(a, &context, startPuts); + *t = try std.os.spawnThread(&context, startPuts); } var getters: [put_thread_count]&std.os.Thread = undefined; for (getters) |*t| { - *t = try std.os.spawnThreadAllocator(a, &context, startGets); + *t = try std.os.spawnThread(&context, startGets); } for (putters) |t| t.wait(); diff --git a/std/atomic/stack.zig b/std/atomic/stack.zig index 9f2ceacfa3..accbcc942a 100644 --- a/std/atomic/stack.zig +++ b/std/atomic/stack.zig @@ -60,10 +60,6 @@ const puts_per_thread = 1000; const put_thread_count = 3; test "std.atomic.stack" { - if (builtin.os == builtin.Os.windows) { - // TODO implement kernel threads for windows - return; - } var direct_allocator = std.heap.DirectAllocator.init(); defer direct_allocator.deinit(); @@ -85,11 +81,11 @@ test "std.atomic.stack" { var putters: [put_thread_count]&std.os.Thread = undefined; for (putters) |*t| { - *t = try std.os.spawnThreadAllocator(a, &context, startPuts); + *t = try std.os.spawnThread(&context, startPuts); } var getters: [put_thread_count]&std.os.Thread = undefined; for (getters) |*t| { - *t = try std.os.spawnThreadAllocator(a, &context, startGets); + *t = try std.os.spawnThread(&context, startGets); } for (putters) |t| t.wait(); diff --git a/std/mem.zig b/std/mem.zig index cc3161cddd..0f66f549cc 100644 --- a/std/mem.zig +++ b/std/mem.zig @@ -32,6 +32,7 @@ pub const Allocator = struct { freeFn: fn (self: &Allocator, old_mem: []u8) void, fn create(self: &Allocator, comptime T: type) !&T { + if (@sizeOf(T) == 0) return &{}; const slice = try self.alloc(T, 1); return &slice[0]; } diff --git a/std/os/index.zig b/std/os/index.zig index 85e46a1bf9..6842fd0fb3 100644 --- a/std/os/index.zig +++ b/std/os/index.zig @@ -2347,18 +2347,30 @@ pub fn posixGetSockOptConnectError(sockfd: i32) PosixConnectError!void { } pub const Thread = struct { - pid: pid_t, - allocator: ?&mem.Allocator, - stack: []u8, - pthread_handle: pthread_t, + data: Data, pub const use_pthreads = is_posix and builtin.link_libc; - const pthread_t = if (use_pthreads) c.pthread_t else void; - const pid_t = if (!use_pthreads) i32 else void; + const Data = if (use_pthreads) struct { + handle: c.pthread_t, + stack_addr: usize, + stack_len: usize, + } else switch (builtin.os) { + builtin.Os.linux => struct { + pid: i32, + stack_addr: usize, + stack_len: usize, + }, + builtin.Os.windows => struct { + handle: windows.HANDLE, + alloc_start: &c_void, + heap_handle: windows.HANDLE, + }, + else => @compileError("Unsupported OS"), + }; pub fn wait(self: &const Thread) void { if (use_pthreads) { - const err = c.pthread_join(self.pthread_handle, null); + const err = c.pthread_join(self.data.handle, null); switch (err) { 0 => {}, posix.EINVAL => unreachable, @@ -2366,23 +2378,27 @@ pub const Thread = struct { posix.EDEADLK => unreachable, else => unreachable, } - } else if (builtin.os == builtin.Os.linux) { - while (true) { - const pid_value = @atomicLoad(i32, &self.pid, builtin.AtomicOrder.SeqCst); - if (pid_value == 0) break; - const rc = linux.futex_wait(@ptrToInt(&self.pid), linux.FUTEX_WAIT, pid_value, null); - switch (linux.getErrno(rc)) { - 0 => continue, - posix.EINTR => continue, - posix.EAGAIN => continue, - else => unreachable, + assert(posix.munmap(self.data.stack_addr, self.data.stack_len) == 0); + } else switch (builtin.os) { + builtin.Os.linux => { + while (true) { + const pid_value = @atomicLoad(i32, &self.data.pid, builtin.AtomicOrder.SeqCst); + if (pid_value == 0) break; + const rc = linux.futex_wait(@ptrToInt(&self.data.pid), linux.FUTEX_WAIT, pid_value, null); + switch (linux.getErrno(rc)) { + 0 => continue, + posix.EINTR => continue, + posix.EAGAIN => continue, + else => unreachable, + } } - } - } else { - @compileError("Unsupported OS"); - } - if (self.allocator) |a| { - a.free(self.stack); + assert(posix.munmap(self.data.stack_addr, self.data.stack_len) == 0); + }, + builtin.Os.windows => { + assert(windows.WaitForSingleObject(self.data.handle, windows.INFINITE) == windows.WAIT_OBJECT_0); + assert(windows.HeapFree(self.data.heap_handle, 0, self.data.alloc_start) != 0); + }, + else => @compileError("Unsupported OS"), } } }; @@ -2407,52 +2423,60 @@ pub const SpawnThreadError = error { /// be copied. SystemResources, - /// pthreads requires at least 16384 bytes of stack space - StackTooSmall, + /// Not enough userland memory to spawn the thread. + OutOfMemory, Unexpected, }; -pub const SpawnThreadAllocatorError = SpawnThreadError || error{OutOfMemory}; - /// caller must call wait on the returned thread /// fn startFn(@typeOf(context)) T /// where T is u8, noreturn, void, or !void -pub fn spawnThreadAllocator(allocator: &mem.Allocator, context: var, comptime startFn: var) SpawnThreadAllocatorError!&Thread { +/// caller must call wait on the returned thread +pub fn spawnThread(context: var, comptime startFn: var) SpawnThreadError!&Thread { // TODO compile-time call graph analysis to determine stack upper bound // https://github.com/zig-lang/zig/issues/157 const default_stack_size = 8 * 1024 * 1024; - const stack_bytes = try allocator.alignedAlloc(u8, os.page_size, default_stack_size); - const thread = try spawnThread(stack_bytes, context, startFn); - thread.allocator = allocator; - return thread; -} -/// stack must be big enough to store one Thread and one @typeOf(context), each with default alignment, at the end -/// fn startFn(@typeOf(context)) T -/// where T is u8, noreturn, void, or !void -/// caller must call wait on the returned thread -pub fn spawnThread(stack: []align(os.page_size) u8, context: var, comptime startFn: var) SpawnThreadError!&Thread { const Context = @typeOf(context); comptime assert(@ArgType(@typeOf(startFn), 0) == Context); - var stack_end: usize = @ptrToInt(stack.ptr) + stack.len; - var arg: usize = undefined; - if (@sizeOf(Context) != 0) { - stack_end -= @sizeOf(Context); - stack_end -= stack_end % @alignOf(Context); - assert(stack_end >= @ptrToInt(stack.ptr)); - const context_ptr = @alignCast(@alignOf(Context), @intToPtr(&Context, stack_end)); - *context_ptr = context; - arg = stack_end; - } + if (builtin.os == builtin.Os.windows) { + const WinThread = struct { + const OuterContext = struct { + thread: Thread, + inner: Context, + }; + extern fn threadMain(arg: windows.LPVOID) windows.DWORD { + if (@sizeOf(Context) == 0) { + return startFn({}); + } else { + return startFn(*@ptrCast(&Context, @alignCast(@alignOf(Context), arg))); + } + } + }; - stack_end -= @sizeOf(Thread); - stack_end -= stack_end % @alignOf(Thread); - assert(stack_end >= @ptrToInt(stack.ptr)); - const thread_ptr = @alignCast(@alignOf(Thread), @intToPtr(&Thread, stack_end)); - thread_ptr.stack = stack; - thread_ptr.allocator = null; + const heap_handle = windows.GetProcessHeap() ?? return SpawnThreadError.OutOfMemory; + const byte_count = @alignOf(WinThread.OuterContext) + @sizeOf(WinThread.OuterContext); + const bytes_ptr = windows.HeapAlloc(heap_handle, 0, byte_count) ?? return SpawnThreadError.OutOfMemory; + errdefer assert(windows.HeapFree(heap_handle, 0, bytes_ptr) != 0); + const bytes = @ptrCast(&u8, bytes_ptr)[0..byte_count]; + const outer_context = std.heap.FixedBufferAllocator.init(bytes).allocator.create(WinThread.OuterContext) catch unreachable; + outer_context.inner = context; + outer_context.thread.data.heap_handle = heap_handle; + outer_context.thread.data.alloc_start = bytes_ptr; + + const parameter = if (@sizeOf(Context) == 0) null else @ptrCast(&c_void, &outer_context.inner); + outer_context.thread.data.handle = windows.CreateThread(null, default_stack_size, WinThread.threadMain, + parameter, 0, null) ?? + { + const err = windows.GetLastError(); + return switch (err) { + else => os.unexpectedErrorWindows(err), + }; + }; + return &outer_context.thread; + } const MainFuncs = struct { extern fn linuxThreadMain(ctx_addr: usize) u8 { @@ -2473,6 +2497,29 @@ pub fn spawnThread(stack: []align(os.page_size) u8, context: var, comptime start } }; + const stack_len = default_stack_size; + const stack_addr = posix.mmap(null, stack_len, posix.PROT_READ|posix.PROT_WRITE, + posix.MAP_PRIVATE|posix.MAP_ANONYMOUS|posix.MAP_GROWSDOWN, -1, 0); + if (stack_addr == posix.MAP_FAILED) return error.OutOfMemory; + errdefer _ = posix.munmap(stack_addr, stack_len); + + var stack_end: usize = stack_addr + stack_len; + var arg: usize = undefined; + if (@sizeOf(Context) != 0) { + stack_end -= @sizeOf(Context); + stack_end -= stack_end % @alignOf(Context); + assert(stack_end >= stack_addr); + const context_ptr = @alignCast(@alignOf(Context), @intToPtr(&Context, stack_end)); + *context_ptr = context; + arg = stack_end; + } + + stack_end -= @sizeOf(Thread); + stack_end -= stack_end % @alignOf(Thread); + assert(stack_end >= stack_addr); + const thread_ptr = @alignCast(@alignOf(Thread), @intToPtr(&Thread, stack_end)); + + if (builtin.os == builtin.Os.windows) { // use windows API directly @compileError("TODO support spawnThread for Windows"); @@ -2484,14 +2531,12 @@ pub fn spawnThread(stack: []align(os.page_size) u8, context: var, comptime start // align to page stack_end -= stack_end % os.page_size; + assert(c.pthread_attr_setstack(&attr, @intToPtr(&c_void, stack_addr), stack_len) == 0); - const stack_size = stack_end - @ptrToInt(stack.ptr); - const setstack_err = c.pthread_attr_setstack(&attr, @ptrCast(&c_void, stack.ptr), stack_size); - if (setstack_err != 0) { - return SpawnThreadError.StackTooSmall; // pthreads requires at least 16384 bytes - } + thread_ptr.data.stack_addr = stack_addr; + thread_ptr.data.stack_len = stack_len; - const err = c.pthread_create(&thread_ptr.pthread_handle, &attr, MainFuncs.posixThreadMain, @intToPtr(&c_void, arg)); + const err = c.pthread_create(&thread_ptr.data.handle, &attr, MainFuncs.posixThreadMain, @intToPtr(&c_void, arg)); switch (err) { 0 => return thread_ptr, posix.EAGAIN => return SpawnThreadError.SystemResources, diff --git a/std/os/test.zig b/std/os/test.zig index 37e5bf4bb8..56d6e8b309 100644 --- a/std/os/test.zig +++ b/std/os/test.zig @@ -44,24 +44,12 @@ test "access file" { } test "spawn threads" { - if (builtin.os == builtin.Os.windows) { - // TODO implement threads on windows - return; - } - - var direct_allocator = std.heap.DirectAllocator.init(); - defer direct_allocator.deinit(); - var shared_ctx: i32 = 1; - const thread1 = try std.os.spawnThreadAllocator(&direct_allocator.allocator, {}, start1); - const thread4 = try std.os.spawnThreadAllocator(&direct_allocator.allocator, &shared_ctx, start2); - - var stack1: [20 * 1024]u8 align(os.page_size) = undefined; - var stack2: [20 * 1024]u8 align(os.page_size) = undefined; - - const thread2 = try std.os.spawnThread(stack1[0..], &shared_ctx, start2); - const thread3 = try std.os.spawnThread(stack2[0..], &shared_ctx, start2); + const thread1 = try std.os.spawnThread({}, start1); + const thread2 = try std.os.spawnThread(&shared_ctx, start2); + const thread3 = try std.os.spawnThread(&shared_ctx, start2); + const thread4 = try std.os.spawnThread(&shared_ctx, start2); thread1.wait(); thread2.wait(); diff --git a/std/os/windows/index.zig b/std/os/windows/index.zig index d6ef7205e8..e13ed0f131 100644 --- a/std/os/windows/index.zig +++ b/std/os/windows/index.zig @@ -28,6 +28,9 @@ pub extern "kernel32" stdcallcc fn CreateProcessA(lpApplicationName: ?LPCSTR, lp pub extern "kernel32" stdcallcc fn CreateSymbolicLinkA(lpSymlinkFileName: LPCSTR, lpTargetFileName: LPCSTR, dwFlags: DWORD) BOOLEAN; + +pub extern "kernel32" stdcallcc fn CreateThread(lpThreadAttributes: ?LPSECURITY_ATTRIBUTES, dwStackSize: SIZE_T, lpStartAddress: LPTHREAD_START_ROUTINE, lpParameter: ?LPVOID, dwCreationFlags: DWORD, lpThreadId: ?LPDWORD) ?HANDLE; + pub extern "kernel32" stdcallcc fn DeleteFileA(lpFileName: LPCSTR) BOOL; pub extern "kernel32" stdcallcc fn ExitProcess(exit_code: UINT) noreturn; @@ -318,6 +321,9 @@ pub const HEAP_CREATE_ENABLE_EXECUTE = 0x00040000; pub const HEAP_GENERATE_EXCEPTIONS = 0x00000004; pub const HEAP_NO_SERIALIZE = 0x00000001; +pub const PTHREAD_START_ROUTINE = extern fn(LPVOID) DWORD; +pub const LPTHREAD_START_ROUTINE = PTHREAD_START_ROUTINE; + test "import" { _ = @import("util.zig"); } From b21bcbd7755d236a313c06e6ff167f5395ab8ed4 Mon Sep 17 00:00:00 2001 From: Andrew Kelley Date: Sun, 29 Apr 2018 02:52:04 -0400 Subject: [PATCH 11/12] fix std threads for macos --- std/heap.zig | 6 +++--- std/os/darwin.zig | 8 ++++---- std/os/index.zig | 6 ++++-- std/os/linux/index.zig | 6 +++--- 4 files changed, 14 insertions(+), 12 deletions(-) diff --git a/std/heap.zig b/std/heap.zig index d632b44cd1..bfdf62f658 100644 --- a/std/heap.zig +++ b/std/heap.zig @@ -91,7 +91,7 @@ pub const DirectAllocator = struct { const unused_start = addr; const unused_len = aligned_addr - 1 - unused_start; - var err = p.munmap(@intToPtr(&u8, unused_start), unused_len); + var err = p.munmap(unused_start, unused_len); debug.assert(p.getErrno(err) == 0); //It is impossible that there is an unoccupied page at the top of our @@ -132,7 +132,7 @@ pub const DirectAllocator = struct { const rem = @rem(new_addr_end, os.page_size); const new_addr_end_rounded = new_addr_end + if (rem == 0) 0 else (os.page_size - rem); if (old_addr_end > new_addr_end_rounded) { - _ = os.posix.munmap(@intToPtr(&u8, new_addr_end_rounded), old_addr_end - new_addr_end_rounded); + _ = os.posix.munmap(new_addr_end_rounded, old_addr_end - new_addr_end_rounded); } return old_mem[0..new_size]; } @@ -170,7 +170,7 @@ pub const DirectAllocator = struct { switch (builtin.os) { Os.linux, Os.macosx, Os.ios => { - _ = os.posix.munmap(bytes.ptr, bytes.len); + _ = os.posix.munmap(@ptrToInt(bytes.ptr), bytes.len); }, Os.windows => { const record_addr = @ptrToInt(bytes.ptr) + bytes.len; diff --git a/std/os/darwin.zig b/std/os/darwin.zig index 44418649ab..0a62b03ab2 100644 --- a/std/os/darwin.zig +++ b/std/os/darwin.zig @@ -184,7 +184,7 @@ pub fn write(fd: i32, buf: &const u8, nbyte: usize) usize { return errnoWrap(c.write(fd, @ptrCast(&const c_void, buf), nbyte)); } -pub fn mmap(address: ?&u8, length: usize, prot: usize, flags: usize, fd: i32, +pub fn mmap(address: ?&u8, length: usize, prot: usize, flags: u32, fd: i32, offset: isize) usize { const ptr_result = c.mmap(@ptrCast(&c_void, address), length, @@ -193,8 +193,8 @@ pub fn mmap(address: ?&u8, length: usize, prot: usize, flags: usize, fd: i32, return errnoWrap(isize_result); } -pub fn munmap(address: &u8, length: usize) usize { - return errnoWrap(c.munmap(@ptrCast(&c_void, address), length)); +pub fn munmap(address: usize, length: usize) usize { + return errnoWrap(c.munmap(@intToPtr(&c_void, address), length)); } pub fn unlink(path: &const u8) usize { @@ -341,4 +341,4 @@ pub const timeval = c.timeval; pub const mach_timebase_info_data = c.mach_timebase_info_data; pub const mach_absolute_time = c.mach_absolute_time; -pub const mach_timebase_info = c.mach_timebase_info; \ No newline at end of file +pub const mach_timebase_info = c.mach_timebase_info; diff --git a/std/os/index.zig b/std/os/index.zig index 6842fd0fb3..5053a1b016 100644 --- a/std/os/index.zig +++ b/std/os/index.zig @@ -2497,11 +2497,13 @@ pub fn spawnThread(context: var, comptime startFn: var) SpawnThreadError!&Thread } }; + const MAP_GROWSDOWN = if (builtin.os == builtin.Os.linux) linux.MAP_GROWSDOWN else 0; + const stack_len = default_stack_size; const stack_addr = posix.mmap(null, stack_len, posix.PROT_READ|posix.PROT_WRITE, - posix.MAP_PRIVATE|posix.MAP_ANONYMOUS|posix.MAP_GROWSDOWN, -1, 0); + posix.MAP_PRIVATE|posix.MAP_ANONYMOUS|MAP_GROWSDOWN, -1, 0); if (stack_addr == posix.MAP_FAILED) return error.OutOfMemory; - errdefer _ = posix.munmap(stack_addr, stack_len); + errdefer assert(posix.munmap(stack_addr, stack_len) == 0); var stack_end: usize = stack_addr + stack_len; var arg: usize = undefined; diff --git a/std/os/linux/index.zig b/std/os/linux/index.zig index dcd9532d1d..368f074b9b 100644 --- a/std/os/linux/index.zig +++ b/std/os/linux/index.zig @@ -706,13 +706,13 @@ pub fn umount2(special: &const u8, flags: u32) usize { return syscall2(SYS_umount2, @ptrToInt(special), flags); } -pub fn mmap(address: ?&u8, length: usize, prot: usize, flags: usize, fd: i32, offset: isize) usize { +pub fn mmap(address: ?&u8, length: usize, prot: usize, flags: u32, fd: i32, offset: isize) usize { return syscall6(SYS_mmap, @ptrToInt(address), length, prot, flags, usize(fd), @bitCast(usize, offset)); } -pub fn munmap(address: &u8, length: usize) usize { - return syscall2(SYS_munmap, @ptrToInt(address), length); +pub fn munmap(address: usize, length: usize) usize { + return syscall2(SYS_munmap, address, length); } pub fn read(fd: i32, buf: &u8, count: usize) usize { From c76b0a845fb4176479c8bbf915e57dbdfdb7a594 Mon Sep 17 00:00:00 2001 From: Andrew Kelley Date: Sun, 29 Apr 2018 02:56:59 -0400 Subject: [PATCH 12/12] fix std threads for linux --- std/os/index.zig | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/std/os/index.zig b/std/os/index.zig index 5053a1b016..ee9ff1516c 100644 --- a/std/os/index.zig +++ b/std/os/index.zig @@ -2499,13 +2499,13 @@ pub fn spawnThread(context: var, comptime startFn: var) SpawnThreadError!&Thread const MAP_GROWSDOWN = if (builtin.os == builtin.Os.linux) linux.MAP_GROWSDOWN else 0; - const stack_len = default_stack_size; - const stack_addr = posix.mmap(null, stack_len, posix.PROT_READ|posix.PROT_WRITE, + const mmap_len = default_stack_size; + const stack_addr = posix.mmap(null, mmap_len, posix.PROT_READ|posix.PROT_WRITE, posix.MAP_PRIVATE|posix.MAP_ANONYMOUS|MAP_GROWSDOWN, -1, 0); if (stack_addr == posix.MAP_FAILED) return error.OutOfMemory; - errdefer assert(posix.munmap(stack_addr, stack_len) == 0); + errdefer assert(posix.munmap(stack_addr, mmap_len) == 0); - var stack_end: usize = stack_addr + stack_len; + var stack_end: usize = stack_addr + mmap_len; var arg: usize = undefined; if (@sizeOf(Context) != 0) { stack_end -= @sizeOf(Context); @@ -2521,6 +2521,8 @@ pub fn spawnThread(context: var, comptime startFn: var) SpawnThreadError!&Thread assert(stack_end >= stack_addr); const thread_ptr = @alignCast(@alignOf(Thread), @intToPtr(&Thread, stack_end)); + thread_ptr.data.stack_addr = stack_addr; + thread_ptr.data.stack_len = mmap_len; if (builtin.os == builtin.Os.windows) { // use windows API directly @@ -2533,10 +2535,7 @@ pub fn spawnThread(context: var, comptime startFn: var) SpawnThreadError!&Thread // align to page stack_end -= stack_end % os.page_size; - assert(c.pthread_attr_setstack(&attr, @intToPtr(&c_void, stack_addr), stack_len) == 0); - - thread_ptr.data.stack_addr = stack_addr; - thread_ptr.data.stack_len = stack_len; + assert(c.pthread_attr_setstack(&attr, @intToPtr(&c_void, stack_addr), stack_end - stack_addr) == 0); const err = c.pthread_create(&thread_ptr.data.handle, &attr, MainFuncs.posixThreadMain, @intToPtr(&c_void, arg)); switch (err) { @@ -2552,7 +2551,7 @@ pub fn spawnThread(context: var, comptime startFn: var) SpawnThreadError!&Thread | posix.CLONE_THREAD | posix.CLONE_SYSVSEM // | posix.CLONE_SETTLS | posix.CLONE_PARENT_SETTID | posix.CLONE_CHILD_CLEARTID | posix.CLONE_DETACHED; const newtls: usize = 0; - const rc = posix.clone(MainFuncs.linuxThreadMain, stack_end, flags, arg, &thread_ptr.pid, newtls, &thread_ptr.pid); + const rc = posix.clone(MainFuncs.linuxThreadMain, stack_end, flags, arg, &thread_ptr.data.pid, newtls, &thread_ptr.data.pid); const err = posix.getErrno(rc); switch (err) { 0 => return thread_ptr,