From e16d3d162a85a822e16ae181ecc6ddc507278126 Mon Sep 17 00:00:00 2001 From: kprotty Date: Sat, 19 Jun 2021 17:08:56 -0500 Subject: [PATCH 01/24] std.Thread: rewrite + extensions --- lib/std/Thread.zig | 959 ++++++++++++++++++++++++--------------------- 1 file changed, 513 insertions(+), 446 deletions(-) diff --git a/lib/std/Thread.zig b/lib/std/Thread.zig index 06fe2a84dc..53be5d6d83 100644 --- a/lib/std/Thread.zig +++ b/lib/std/Thread.zig @@ -8,7 +8,10 @@ //! primitives that operate on kernel threads. For concurrency primitives that support //! both evented I/O and async I/O, see the respective names in the top level std namespace. -data: Data, +const std = @import("std.zig"); +const os = std.os; +const target = std.Target.current; +const Atomic = std.atomic.Atomic; pub const AutoResetEvent = @import("Thread/AutoResetEvent.zig"); pub const Futex = @import("Thread/Futex.zig"); @@ -18,118 +21,59 @@ pub const Mutex = @import("Thread/Mutex.zig"); pub const Semaphore = @import("Thread/Semaphore.zig"); pub const Condition = @import("Thread/Condition.zig"); -pub const use_pthreads = std.Target.current.os.tag != .windows and builtin.link_libc; +pub const spinLoopHint = @compileError("deprecated: use std.atomic.spinLoopHint"); -const Thread = @This(); -const std = @import("std.zig"); -const builtin = std.builtin; -const os = std.os; -const mem = std.mem; -const windows = std.os.windows; -const c = std.c; -const assert = std.debug.assert; +pub const use_pthreads = target.os.tag != .windows and std.builtin.link_libc; + +const Impl = if (target.os.tag == .windows) + WindowsThreadImpl +else if (use_pthreads) + PosixThreadImpl +else if (target.os.tag == .linux) + LinuxThreadImpl +else + @compileLog("Unsupported operating system", target.os.tag); + +impl: Impl, -const bad_startfn_ret = "expected return type of startFn to be 'u8', 'noreturn', 'void', or '!void'"; /// Represents a kernel thread handle. /// May be an integer or a pointer depending on the platform. 
/// On Linux and POSIX, this is the same as Id. -pub const Handle = if (use_pthreads) - c.pthread_t -else switch (std.Target.current.os.tag) { - .linux => i32, - .windows => windows.HANDLE, - else => void, -}; +pub const Handle = Impl.ThreadHandle; /// Represents a unique ID per thread. /// May be an integer or pointer depending on the platform. /// On Linux and POSIX, this is the same as Handle. -pub const Id = switch (std.Target.current.os.tag) { - .windows => windows.DWORD, - else => Handle, -}; +pub const Id = Impl.ThreadId; -pub const Data = if (use_pthreads) - struct { - handle: Thread.Handle, - memory: []u8, - } -else switch (std.Target.current.os.tag) { - .linux => struct { - handle: Thread.Handle, - memory: []align(mem.page_size) u8, - }, - .windows => struct { - handle: Thread.Handle, - alloc_start: *c_void, - heap_handle: windows.HANDLE, - }, - else => struct {}, -}; - -pub const spinLoopHint = @compileError("deprecated: use std.atomic.spinLoopHint"); - -/// Returns the ID of the calling thread. -/// Makes a syscall every time the function is called. -/// On Linux and POSIX, this Id is the same as a Handle. +/// Returns the platform ID of the callers thread. +/// Attempts to use thread locals and avoid syscalls when possible. pub fn getCurrentId() Id { - if (use_pthreads) { - return c.pthread_self(); - } else return switch (std.Target.current.os.tag) { - .linux => os.linux.gettid(), - .windows => windows.kernel32.GetCurrentThreadId(), - else => @compileError("Unsupported OS"), - }; + return Impl.getCurrentId(); } -/// Returns the handle of this thread. -/// On Linux and POSIX, this is the same as Id. -/// On Linux, it is possible that the thread spawned with `spawn` -/// finishes executing entirely before the clone syscall completes. In this -/// case, this function will return 0 rather than the no-longer-existing thread's -/// pid. 
-pub fn handle(self: Thread) Handle { - return self.data.handle; +pub const CpuCountError = error{ + PermissionDenied, + SystemResources, + Unexpected, +}; + +/// Returns the platforms view on the number of logical CPU cores available. +pub fn getCpuCount() CpuCountError!usize { + return Impl.getCpuCount(); } -pub fn wait(self: *Thread) void { - if (use_pthreads) { - const err = c.pthread_join(self.data.handle, null); - switch (err) { - 0 => {}, - os.EINVAL => unreachable, - os.ESRCH => unreachable, - os.EDEADLK => unreachable, - else => unreachable, - } - std.heap.c_allocator.free(self.data.memory); - std.heap.c_allocator.destroy(self); - } else switch (std.Target.current.os.tag) { - .linux => { - while (true) { - const pid_value = @atomicLoad(i32, &self.data.handle, .SeqCst); - if (pid_value == 0) break; - const rc = os.linux.futex_wait(&self.data.handle, os.linux.FUTEX_WAIT, pid_value, null); - switch (os.linux.getErrno(rc)) { - 0 => continue, - os.EINTR => continue, - os.EAGAIN => continue, - else => unreachable, - } - } - os.munmap(self.data.memory); - }, - .windows => { - windows.WaitForSingleObjectEx(self.data.handle, windows.INFINITE, false) catch unreachable; - windows.CloseHandle(self.data.handle); - windows.HeapFree(self.data.heap_handle, 0, self.data.alloc_start); - }, - else => @compileError("Unsupported OS"), - } -} +/// Configuration options for hints on how to spawn threads. +pub const SpawnConfig = struct { + // TODO compile-time call graph analysis to determine stack upper bound + // https://github.com/ziglang/zig/issues/157 -pub const SpawnError = error{ + /// Size in bytes of the Thread's stack + stack_size: usize = 16 * 1024 * 1024, +}; + +pub const SpawnError = error { /// A system-imposed limit on the number of threads was encountered. 
/// There are a number of limits that may trigger this error: /// * the RLIMIT_NPROC soft resource limit (set via setrlimit(2)), @@ -159,248 +103,376 @@ pub const SpawnError = error{ Unexpected, }; -// Given `T`, the type of the thread startFn, extract the expected type for the -// context parameter. -fn SpawnContextType(comptime T: type) type { - const TI = @typeInfo(T); - if (TI != .Fn) - @compileError("expected function type, found " ++ @typeName(T)); +/// Spawns a new thread which executes `function` using `args` and returns a handle the spawned thread. +/// `config` can be used as hints to the platform for now to spawn and execute the `function`. +/// The caller must eventually either call `join()` to wait for the thread to finish and free its resources +/// or call `detach()` to excuse the caller from calling `join()` and have the thread clean up its resources on completion`. +pub fn spawn( + config: SpawnConfig, + comptime function: anytype, + args: std.meta.ArgsTuple(function), +) SpawnError!Thread { + if (std.builtin.single_threaded) { + @compileError("cannot spawn thread when building in single-threaded mode"); + } - if (TI.Fn.args.len != 1) - @compileError("expected function with single argument, found " ++ @typeName(T)); - - return TI.Fn.args[0].arg_type orelse - @compileError("cannot use a generic function as thread startFn"); + const impl = try Thread.spawn(config, function, args); + return .{ .impl = impl }; } -/// Spawns a new thread executing startFn, returning an handle for it. -/// Caller must call wait on the returned thread. -/// The `startFn` function must take a single argument of type T and return a -/// value of type u8, noreturn, void or !void. -/// The `context` parameter is of type T and is passed to the spawned thread. 
-pub fn spawn(comptime startFn: anytype, context: SpawnContextType(@TypeOf(startFn))) SpawnError!*Thread { - if (builtin.single_threaded) @compileError("cannot spawn thread when building in single-threaded mode"); - // TODO compile-time call graph analysis to determine stack upper bound - // https://github.com/ziglang/zig/issues/157 - const default_stack_size = 16 * 1024 * 1024; +/// Used by the Thread implementations to call the spawned function with the arguments. +fn callFn(comptime f: anytype, args: anytype) switch (Impl) { + WindowsThreadImpl => windows.DWORD, + LinuxThreadImpl => u8, + PosixThreadImpl => ?*c_void, + else => unreachable, +} { + const default_value = if (Impl == PosixThreadImpl) null else 0; + const bad_fn_ret = "expected return type of startFn to be 'u8', 'noreturn', 'void', or '!void'"; - const Context = @TypeOf(context); + switch (@typeInfo(@typeInfo(@TypeOf(f)).Fn.return_type.?)) { + .NoReturn => { + @call(.{}, f, args); + }, + .Void => { + @call(.{}, f, args); + return default_value; + }, + .Int => |info| { + if (info.bits != 8) { + @compileError(bad_fn_ret); + } - if (std.Target.current.os.tag == .windows) { - const WinThread = struct { - const OuterContext = struct { - thread: Thread, - inner: Context, - }; - fn threadMain(raw_arg: windows.LPVOID) callconv(.C) windows.DWORD { - const arg = if (@sizeOf(Context) == 0) undefined // - else @ptrCast(*Context, @alignCast(@alignOf(Context), raw_arg)).*; + const status = @call(.{}, f, args); + if (Impl != PosixThreadImpl) { + return status; + } - switch (@typeInfo(@typeInfo(@TypeOf(startFn)).Fn.return_type.?)) { - .NoReturn => { - startFn(arg); - }, - .Void => { - startFn(arg); - return 0; - }, - .Int => |info| { - if (info.bits != 8) { - @compileError(bad_startfn_ret); - } - return startFn(arg); - }, - .ErrorUnion => |info| { - if (info.payload != void) { - @compileError(bad_startfn_ret); - } - startFn(arg) catch |err| { - std.debug.warn("error: {s}\n", .{@errorName(err)}); - if 
(@errorReturnTrace()) |trace| { - std.debug.dumpStackTrace(trace.*); - } - }; - return 0; - }, - else => @compileError(bad_startfn_ret), + // pthreads don't support exit status, ignore value + _ = status; + return default_value; + }, + .ErrorUnion => |info| { + if (info.payload != void) { + @compileError(bad_fn_ret); + } + + @call(.{}, f, args) catch |err| { + std.debug.warn("error: {s}\n", .{@errorName(err)}); + if (@errorReturnTrace()) |trace| { + std.debug.dumpStackTrace(trace.*); } + }; + + return default_value; + }, + else => { + @compileError(bad_fn_ret); + }, + } +} + +/// Retrns the handle of this thread +/// On Linux and POSIX, this is the same as Id. +pub fn getHandle(self: Thread) Handle { + return self.impl.getHandle(); +} + +/// Release the obligation of the caller to call `join()` and have the thread clean up its own resources on completion. +pub fn detach(self: Thread) void { + return self.impl.detach(); +} + +/// Waits for the thread to complete, then deallocates any resources created on `spawn()`. 
+pub fn join(self: Thread) void { + return self.impl.join(); +} + +/// State to synchronize detachment of spawner thread to spawned thread +const Completion = Atomic(enum { + running, + detached, + completed, +}); + +const WindowsThreadImpl = struct { + const windows = os.windows; + + pub const ThreadHandle = windows.HANDLE; + pub const ThreadId = windows.DWORD; + + fn getCurrentId() ThreadId { + return windows.kernel.GetCurrentThreadId(); + } + + fn getCpuCount() !usize { + return windows.peb().NumberOfProcessors; + } + + thread: *ThreadCompletion, + + const ThreadCompletion = struct { + completion: Completion, + heap_ptr: windows.PVOID, + heap_handle: windows.HANDLE, + thread_handle: windows.HANDLE = undefined, + + fn free(self: ThreadCompletion) void { + const status = windows.kernel32.HeapFree(self.heap_handle, 0, self.heap_ptr); + assert(status == 0); + } + }; + + fn spawn(config: SpawnConfig, comptime f: anytype, args: anytype) !Impl { + const Args = @TypeOf(args); + const Instance = struct { + fn_args: Args, + thread: ThreadCompletion, + + fn entryFn(raw_ptr: *windows.PVOID) callconv(.C) windows.DWORD { + const self = @ptrCast(*@This(), @alignCast(@alignOf(@This()), raw_ptr)); + defer switch (self.thread.completion.swap(.completed, .Acquire)) { + .running => {}, + .completed => unreachable, + .detached => self.thread.free(), + }; + return callFn(f, self.fn_args); } }; const heap_handle = windows.kernel32.GetProcessHeap() orelse return error.OutOfMemory; - const byte_count = @alignOf(WinThread.OuterContext) + @sizeOf(WinThread.OuterContext); - const bytes_ptr = windows.kernel32.HeapAlloc(heap_handle, 0, byte_count) orelse return error.OutOfMemory; - errdefer assert(windows.kernel32.HeapFree(heap_handle, 0, bytes_ptr) != 0); - const bytes = @ptrCast([*]u8, bytes_ptr)[0..byte_count]; - const outer_context = std.heap.FixedBufferAllocator.init(bytes).allocator.create(WinThread.OuterContext) catch unreachable; - outer_context.* = WinThread.OuterContext{ - .thread 
= Thread{ - .data = Thread.Data{ - .heap_handle = heap_handle, - .alloc_start = bytes_ptr, - .handle = undefined, - }, + const alloc_bytes = @alignOf(Instance) + @sizeOf(Instance); + const alloc_ptr = windows.kernel32.HeapAlloc(heap_handle, 0, alloc_bytes) orelse return error.OutOfMemory; + errdefer assert(windows.kernel32.HeapFree(heap_handle, 0, alloc_ptr) != 0); + + const instance_bytes = @ptrCast([*]u8, alloc_ptr)[0..alloc_bytes]; + const instance = std.heap.FixedBufferAllocator.init(instance_bytes).allocator.create(Instance) catch unreachable; + instance.* = .{ + .fn_args = args, + .thread = .{ + .completion = Completion.init(.running), + .heap_ptr = alloc_ptr, + .heap_handle = heap_handle, }, - .inner = context, }; - const parameter = if (@sizeOf(Context) == 0) null else @ptrCast(*c_void, &outer_context.inner); - outer_context.thread.data.handle = windows.kernel32.CreateThread(null, default_stack_size, WinThread.threadMain, parameter, 0, null) orelse { - switch (windows.kernel32.GetLastError()) { - else => |err| return windows.unexpectedError(err), - } + const stack_size = std.math.min(64 * 1024, std.math.cast(u32, config.stack_size) catch std.math.maxInt(u32)); + + const parameter = @ptrCast(*c_void, impl); + + instance.thread.thread_handle = windows.CreateThread(null, stack_size, Impl.entry, parameter, 0, null) orelse { + return windows.unexpectedError(windows.kernel32.GetLastError()); }; - return &outer_context.thread; + + return .{ .thread = &instance.thread }; } - const MainFuncs = struct { - fn linuxThreadMain(ctx_addr: usize) callconv(.C) u8 { - const arg = if (@sizeOf(Context) == 0) undefined // - else @intToPtr(*Context, ctx_addr).*; + fn getHandle(self: Impl) ThreadHandle { + return self.thread.thread_handle; + } - switch (@typeInfo(@typeInfo(@TypeOf(startFn)).Fn.return_type.?)) { - .NoReturn => { - startFn(arg); - }, - .Void => { - startFn(arg); - return 0; - }, - .Int => |info| { - if (info.bits != 8) { - @compileError(bad_startfn_ret); - } - 
return startFn(arg); - }, - .ErrorUnion => |info| { - if (info.payload != void) { - @compileError(bad_startfn_ret); - } - startFn(arg) catch |err| { - std.debug.warn("error: {s}\n", .{@errorName(err)}); - if (@errorReturnTrace()) |trace| { - std.debug.dumpStackTrace(trace.*); - } - }; - return 0; - }, - else => @compileError(bad_startfn_ret), - } + fn detach(self: Impl) void { + windows.CloseHandle(self.thread.thread_handle); + switch (self.thread.completion.swap(.detached, .AcqRel)) { + .running => {}, + .completed => self.thread.free(), + .detached => unreachable, } - fn posixThreadMain(ctx: ?*c_void) callconv(.C) ?*c_void { - const arg = if (@sizeOf(Context) == 0) undefined // - else @ptrCast(*Context, @alignCast(@alignOf(Context), ctx)).*; + } - switch (@typeInfo(@typeInfo(@TypeOf(startFn)).Fn.return_type.?)) { - .NoReturn => { - startFn(arg); - }, - .Void => { - startFn(arg); - return null; - }, - .Int => |info| { - if (info.bits != 8) { - @compileError(bad_startfn_ret); - } - // pthreads don't support exit status, ignore value - _ = startFn(arg); - return null; - }, - .ErrorUnion => |info| { - if (info.payload != void) { - @compileError(bad_startfn_ret); - } - startFn(arg) catch |err| { - std.debug.warn("error: {s}\n", .{@errorName(err)}); - if (@errorReturnTrace()) |trace| { - std.debug.dumpStackTrace(trace.*); - } - }; - return null; - }, - else => @compileError(bad_startfn_ret), - } + fn join(self: Impl) void { + windows.WaitForSingleObjectEx(self.thread.thread_handle, windows.INFINITE, false) catch unreachable; + windows.CloseHandle(self.thread.thread_handle); + self.thread.free(); + } +}; + +const PosixThreadImpl = struct { + const c = std.c; + + pub const ThreadHandle = c.pthread_t; + pub const ThreadId = ThreadHandle; + + fn getCurrentId() ThreadId { + return c.pthread_self(); + } + + fn getCpuCount() !usize { + switch (target.os.tag) { + .linux => return LinuxThreadImpl.getCpuCount(), + .openbsd => { + var count: c_int = undefined; + var count_size: 
usize = @sizeOf(c_int); + const mib = [_]c_int{ os.CTL_HW, os.HW_NCPUONLINE }; + os.sysctl(&mib, &count, &count_size, null, 0) catch |err| switch (err) { + error.NameTooLong, error.UnknownName => unreachable, + else => |e| return e, + }; + return @intCast(usize, count); + }, + .haiku => { + var count: u32 = undefined; + var system_info: os.system_info = undefined; + _ = os.system.get_system_info(&system_info); // always returns B_OK + count = system_info.cpu_count; + return @intCast(usize, count); + }, + else => { + var count: c_int = undefined; + var count_len: usize = @sizeOf(c_int); + const name = if (comptime target.isDarwin()) "hw.logicalcpu" else "hw.ncpu"; + os.sysctlbynameZ(name, &count, &count_len, null, 0) catch |err| switch (err) { + error.NameTooLong, error.UnknownName => unreachable, + else => |e| return e, + }; + return @intCast(usize, count); + }, } - }; + } + + handle: ThreadHandle, + + fn spawn(config: SpawnConfig, comptime f: anytype, args: anytype) !Impl { + const Args = @TypeOf(args); + const allocator = std.heap.c_allocator; + const Instance = struct { + fn entryFn(raw_arg: ?*c_void) callconv(.C) ?*c_void { + const args_ptr = @ptrCast(*Args, @alignCast(@alignOf(Args), raw_arg orelse unreachable)); + defer allocator.destroy(args_ptr); + return callFn(f, args_ptr.*); + } + }; + + const args_ptr = try allocator.create(Args); + errdefer allocator.destroy(args_ptr); - if (Thread.use_pthreads) { var attr: c.pthread_attr_t = undefined; if (c.pthread_attr_init(&attr) != 0) return error.SystemResources; defer assert(c.pthread_attr_destroy(&attr) == 0); - const thread_obj = try std.heap.c_allocator.create(Thread); - errdefer std.heap.c_allocator.destroy(thread_obj); - if (@sizeOf(Context) > 0) { - thread_obj.data.memory = try std.heap.c_allocator.allocAdvanced( - u8, - @alignOf(Context), - @sizeOf(Context), - .at_least, - ); - errdefer std.heap.c_allocator.free(thread_obj.data.memory); - mem.copy(u8, thread_obj.data.memory, mem.asBytes(&context)); - } 
else { - thread_obj.data.memory = @as([*]u8, undefined)[0..0]; - } - // Use the same set of parameters used by the libc-less impl. - assert(c.pthread_attr_setstacksize(&attr, default_stack_size) == 0); - assert(c.pthread_attr_setguardsize(&attr, mem.page_size) == 0); + const stack_size = std.math.max(config.stack_size, 16 * 1024); + assert(c.pthread_attr_setstacksize(&attr, stack_size) == 0); + assert(c.pthread_attr_setguardsize(&attr, std.mem.page_size) == 0); - const err = c.pthread_create( - &thread_obj.data.handle, + var handle: c.pthread_t = undefined; + return switch (c.pthread_create( + &handle, &attr, - MainFuncs.posixThreadMain, - thread_obj.data.memory.ptr, - ); - switch (err) { - 0 => return thread_obj, - os.EAGAIN => return error.SystemResources, + Instance.entryFn, + @ptrCast(*c_void, args_ptr), + )) { + 0 => .{ .handle = handle }, + os.EAGAIN => error.SystemResources, os.EPERM => unreachable, os.EINVAL => unreachable, - else => return os.unexpectedErrno(err), - } - - return thread_obj; + else => os.unexpectedErrno(err), + }; } - var guard_end_offset: usize = undefined; - var stack_end_offset: usize = undefined; - var thread_start_offset: usize = undefined; - var context_start_offset: usize = undefined; - var tls_start_offset: usize = undefined; - const mmap_len = blk: { - var l: usize = mem.page_size; - // Allocate a guard page right after the end of the stack region - guard_end_offset = l; - // The stack itself, which grows downwards. - l = mem.alignForward(l + default_stack_size, mem.page_size); - stack_end_offset = l; - // Above the stack, so that it can be in the same mmap call, put the Thread object. - l = mem.alignForward(l, @alignOf(Thread)); - thread_start_offset = l; - l += @sizeOf(Thread); - // Next, the Context object. 
- if (@sizeOf(Context) != 0) { - l = mem.alignForward(l, @alignOf(Context)); - context_start_offset = l; - l += @sizeOf(Context); + fn getHandle(self: Impl) ThreadHandle { + return self.handle; + } + + fn detach(self: Impl) void { + switch (c.pthread_detach(self.handle)) { + os.EINVAL => unreachable, + os.ESRCH => unreachable, + else => unreachable, } - // Finally, the Thread Local Storage, if any. - l = mem.alignForward(l, os.linux.tls.tls_image.alloc_align); - tls_start_offset = l; - l += os.linux.tls.tls_image.alloc_size; - // Round the size to the page size. - break :blk mem.alignForward(l, mem.page_size); + } + + fn join(self: Impl) void { + switch (c.pthread_join(self.handle, null)) { + 0 => {}, + os.EINVAL => unreachable, + os.ESRCH => unreachable, + os.EDEADLK => unreachable, + else => unreachable, + } + } +}; + +const LinuxThreadImpl = struct { + const linux = os.linux; + + pub const ThreadHandle = i32; + pub const ThreadId = ThreadHandle; + + threadlocal var tls_thread_id: ?ThreadId = null; + + fn getCurrentId() ThreadId { + return tls_thread_id orelse { + const tid = linux.gettid(); + tls_thread_id = tid; + return tid; + }; + } + + fn getCpuCount() !usize { + const cpu_set = try os.sched_getaffinity(0); + return @as(usize, os.CPU_COUNT(cpu_set)); // TODO should not need this usize cast + } + + thread: *ThreadCompletion, + + const ThreadCompletion = struct { + completion: Completion = Completion.init(.running), + child_tid: Atomic(i32) = Atomic(i32).init(0), + parent_tid: i32 = undefined, + mapped: []align(std.mem.page_size) u8, }; - const mmap_slice = mem: { - // Map the whole stack with no rw permissions to avoid - // committing the whole region right away - const mmap_slice = os.mmap( + fn spawn(config: SpawnConfig, comptime f: anytype, args: anytype) !Impl { + const Args = @TypeOf(args); + const Instance = struct { + fn_args: Args, + thread: ThreadCompletion, + + fn entryFn(raw_arg: usize) callconv(.C) u8 { + const self = @intToPtr(*@This(), raw_arg); 
+ defer switch (self.thread.completion.swap(.completed, .Acquire)) { + .running => {}, + .completed => unreachable, + .detached => { + const memory = self.thread.mapped; + __unmap_and_exit(@ptrToInt(memory.ptr), memory.len); + }, + }; + return callFn(f, self.fn_args); + } + }; + + var guard_offset: usize = undefined; + var stack_offset: usize = undefined; + var tls_offset: usize = undefined; + var instance_offset: usize = undefined; + + const map_bytes = blk: { + var bytes: usize = std.mem.page_size; + guard_offset = bytes; + + bytes += std.math.max(std.mem.page_size, config.stack_size); + bytes = std.mem.alignForward(bytes, std.mem.page_size); + stack_offset = bytes; + + bytes = std.mem.alignForward(bytes, linux.tls.tls_image.alloc_align); + tls_offset = bytes; + bytes += linux.tls.tls_image.alloc_size; + + bytes = std.mem.alignForward(bytes, @alignOf(Instance)); + instance_offset = bytes; + bytes += @sizeOf(Instance); + + bytes = std.mem.alignForward(bytes, std.mem.page_size); + break :blk bytes; + }; + + // map all memory needed without read/write permissions + // to avoid committing the whole region right away + const mapped = os.mmap( null, - mmap_len, + map_bytes, os.PROT_NONE, os.MAP_PRIVATE | os.MAP_ANONYMOUS, -1, @@ -411,175 +483,170 @@ pub fn spawn(comptime startFn: anytype, context: SpawnContextType(@TypeOf(startF error.PermissionDenied => unreachable, else => |e| return e, }; - errdefer os.munmap(mmap_slice); + errdefer os.munmap(mapped); - // Map everything but the guard page as rw + // map everything but the guard page as read/write os.mprotect( - mmap_slice[guard_end_offset..], + mapped[guard_offset..], os.PROT_READ | os.PROT_WRITE, ) catch |err| switch (err) { error.AccessDenied => unreachable, else => |e| return e, }; - break :mem mmap_slice; - }; + // Prepare the TLS segment and prepare a user_desc struct when needed on i386 + var tls_ptr = os.linux.tls.prepareTLS(mapped[tls_offset..]); + var user_desc: if (target.cpu.arch == .i386) 
os.linux.user_desc else void = undefined; + if (target.cpu.arch == .i386) { + defer tls_ptr = @ptrToInt(&user_desc); + user_desc = .{ + .entry_number = os.linux.tls.tls_image.gdt_entry_number, + .base_addr = tks_ptr, + .limit = 0xfffff, + .seg_32bit = 1, + .contents = 0, // Data + .read_exec_only = 0, + .limit_in_pages = 1, + .seg_not_present = 0, + .useable = 1, + }; + } - const mmap_addr = @ptrToInt(mmap_slice.ptr); + const instance = @ptrCast(*Instance, @alignCast(@alignOf(Instance), &mapped[instance_offset])); + instance.* = .{ + .fn_args = args, + .thread = .{ .mapped = .mapped }, + }; - const thread_ptr = @alignCast(@alignOf(Thread), @intToPtr(*Thread, mmap_addr + thread_start_offset)); - thread_ptr.data.memory = mmap_slice; - - var arg: usize = undefined; - if (@sizeOf(Context) != 0) { - arg = mmap_addr + context_start_offset; - const context_ptr = @alignCast(@alignOf(Context), @intToPtr(*Context, arg)); - context_ptr.* = context; - } - - if (std.Target.current.os.tag == .linux) { const flags: u32 = os.CLONE_VM | os.CLONE_FS | os.CLONE_FILES | os.CLONE_SIGHAND | os.CLONE_THREAD | os.CLONE_SYSVSEM | os.CLONE_PARENT_SETTID | os.CLONE_CHILD_CLEARTID | os.CLONE_DETACHED | os.CLONE_SETTLS; - // This structure is only needed when targeting i386 - var user_desc: if (std.Target.current.cpu.arch == .i386) os.linux.user_desc else void = undefined; - const tls_area = mmap_slice[tls_start_offset..]; - const tp_value = os.linux.tls.prepareTLS(tls_area); - - const newtls = blk: { - if (std.Target.current.cpu.arch == .i386) { - user_desc = os.linux.user_desc{ - .entry_number = os.linux.tls.tls_image.gdt_entry_number, - .base_addr = tp_value, - .limit = 0xfffff, - .seg_32bit = 1, - .contents = 0, // Data - .read_exec_only = 0, - .limit_in_pages = 1, - .seg_not_present = 0, - .useable = 1, - }; - break :blk @ptrToInt(&user_desc); - } else { - break :blk tp_value; - } - }; - - const rc = os.linux.clone( - MainFuncs.linuxThreadMain, - mmap_addr + stack_end_offset, + return 
switch (linux.getErrno(linux.clone( + Instance.entryFn, + @ptrToInt(&mapped[stack_offset]), flags, - arg, - &thread_ptr.data.handle, - newtls, - &thread_ptr.data.handle, - ); - switch (os.errno(rc)) { - 0 => return thread_ptr, - os.EAGAIN => return error.ThreadQuotaExceeded, + @ptrToInt(instance), + &instance.thread.parent_tid, + tls_ptr, + &instance.thread.child_tid.value, + ))) { + 0 => .{ .thread = &instance.thread }, + os.EAGAIN => error.ThreadQuotaExceeded, os.EINVAL => unreachable, - os.ENOMEM => return error.SystemResources, + os.ENOMEM => error.SystemResources, os.ENOSPC => unreachable, os.EPERM => unreachable, os.EUSERS => unreachable, - else => |err| return os.unexpectedErrno(err), + else => |err| os.unexpectedErrno(err), + }; + } + + fn getHandle(self: Impl) ThreadHandle { + return self.thread.parent_tid; + } + + fn detach(self: Impl) void { + switch (self.thread.completion.swap(.detached, .AcqRel)) { + .running => {}, + .completed => self.join(), + .detached => unreachable, } - } else { - @compileError("Unsupported OS"); } -} -pub const CpuCountError = error{ - PermissionDenied, - SystemResources, - Unexpected, -}; + fn join(self: Impl) void { + defer self.thread.free(); -pub fn cpuCount() CpuCountError!usize { - switch (std.Target.current.os.tag) { - .linux => { - const cpu_set = try os.sched_getaffinity(0); - return @as(usize, os.CPU_COUNT(cpu_set)); // TODO should not need this usize cast - }, - .windows => { - return os.windows.peb().NumberOfProcessors; - }, - .openbsd => { - var count: c_int = undefined; - var count_size: usize = @sizeOf(c_int); - const mib = [_]c_int{ os.CTL_HW, os.HW_NCPUONLINE }; - os.sysctl(&mib, &count, &count_size, null, 0) catch |err| switch (err) { - error.NameTooLong, error.UnknownName => unreachable, - else => |e| return e, - }; - return @intCast(usize, count); - }, - .haiku => { - var count: u32 = undefined; - // var system_info: os.system_info = undefined; - // const rc = os.system.get_system_info(&system_info); - count 
= system_info.cpu_count; - return @intCast(usize, count); - }, - else => { - var count: c_int = undefined; - var count_len: usize = @sizeOf(c_int); - const name = if (comptime std.Target.current.isDarwin()) "hw.logicalcpu" else "hw.ncpu"; - os.sysctlbynameZ(name, &count, &count_len, null, 0) catch |err| switch (err) { - error.NameTooLong, error.UnknownName => unreachable, - else => |e| return e, - }; - return @intCast(usize, count); - }, + var spin: u8 = 10; + while (true) { + const tid = self.thread.child_tid.load(.Acquire); + if (tid == 0) { + break; + } + + if (spin > 0) { + spin -= 1; + std.atomic.spinLoopHint(); + continue; + } + + switch (linux.getErrno(linux.futex_wait( + &self.thread.child_tid.value, + linux.FUTEX_WAIT, + tid, + null, + ))) { + 0 => continue, + os.EINTR => continue, + os.EAGAIN => continue, + else => unreachable, + } + } } -} -pub fn getCurrentThreadId() u64 { - switch (std.Target.current.os.tag) { - .linux => { - // Use the syscall directly as musl doesn't provide a wrapper. - return @bitCast(u32, os.linux.gettid()); - }, - .windows => { - return os.windows.kernel32.GetCurrentThreadId(); - }, - .macos, .ios, .watchos, .tvos => { - var thread_id: u64 = undefined; - // Pass thread=null to get the current thread ID. - assert(c.pthread_threadid_np(null, &thread_id) == 0); - return thread_id; - }, - .dragonfly => { - return @bitCast(u32, c.lwp_gettid()); - }, - .netbsd => { - return @bitCast(u32, c._lwp_self()); - }, - .freebsd => { - return @bitCast(u32, c.pthread_getthreadid_np()); - }, - .openbsd => { - return @bitCast(u32, c.getthrid()); - }, - .haiku => { - return @bitCast(u32, c.find_thread(null)); - }, - else => { - @compileError("getCurrentThreadId not implemented for this platform"); - }, + // Calls `munmap(ptr, len)` then `exit(1)` without touching the stack (which lives in `ptr`). + // Ported over from musl libc's pthread detached implementation. 
+ extern fn __unmap_and_exit(ptr: usize, len: usize) callconv(.C) noreturn; + comptime { + asm(switch (target.cpu.arch) { + .i386 => ( + \\.text + \\.global __unmap_and_exit + \\.type __unmap_and_exit, @function + \\__unmap_and_exit: + \\ movl $91, %eax + \\ movl 4(%esp), %ebx + \\ movl 8(%esp), %ecx + \\ int $128 + \\ xorl %ebx, %ebx + \\ movl $1, %eax + \\ int $128 + ), + .x86_64 => ( + \\.text + \\.global __unmap_and_exit + \\.type __unmap_and_exit, @function + \\__unmap_and_exit: + \\ movl $11, %eax + \\ syscall + \\ xor %rdi, %rdi + \\ movl $60, %eax + \\ syscall + ), + .arm, .armeb, .aarch64, .aarch64_be, .aarch64_32 => ( + \\.text + \\.global __unmap_and_exit + \\.type __unmap_and_exit, @function + \\__unmap_and_exit: + \\ mov r7, #91 + \\ svc 0 + \\ mov r7, #1 + \\ svc 0 + ), + .mips, .mipsel, .mips64, .mips64el => ( + \\.set noreorder + \\.global __unmap_and_exit + \\.type __unmap_and_exit, @function + \\__unmap_and_exit: + \\ li $2, 4091 + \\ syscall + \\ li $4, 0 + \\ li $2, 4001 + \\ syscall + ), + .powerpc, .powerpc64, .powerpc64le => ( + \\.text + \\.global __unmap_and_exit + \\.type __unmap_and_exit, @function + \\__unmap_and_exit: + \\ li 0, 91 + \\ sc + \\ li 0, 1 + \\ sc + \\ blr + ), + else => @compileError("Platform not supported"), + }); } -} - -test "std.Thread" { - if (!builtin.single_threaded) { - _ = AutoResetEvent; - _ = Futex; - _ = ResetEvent; - _ = StaticResetEvent; - _ = Mutex; - _ = Semaphore; - _ = Condition; - } -} +}; \ No newline at end of file From 0a1def7833882249563358f262e2210beb77492a Mon Sep 17 00:00:00 2001 From: kprotty Date: Sat, 19 Jun 2021 21:31:43 -0500 Subject: [PATCH 02/24] changes to accomodate std.Thread update --- lib/std/Thread/AutoResetEvent.zig | 8 ++-- lib/std/Thread/Futex.zig | 60 ++++++++++++----------------- lib/std/Thread/Mutex.zig | 6 +-- lib/std/Thread/ResetEvent.zig | 8 ++-- lib/std/Thread/StaticResetEvent.zig | 8 ++-- lib/std/atomic/queue.zig | 12 +++--- lib/std/atomic/stack.zig | 12 +++--- 
lib/std/debug.zig | 4 +- lib/std/event/loop.zig | 30 +++++++-------- lib/std/fs/test.zig | 11 +++--- lib/std/net/test.zig | 10 ++--- lib/std/once.zig | 8 ++-- lib/std/os/test.zig | 36 ++++++++--------- lib/std/target.zig | 23 +++++++++-- src/ThreadPool.zig | 4 +- tools/update_cpu_features.zig | 18 +++++---- 16 files changed, 130 insertions(+), 128 deletions(-) diff --git a/lib/std/Thread/AutoResetEvent.zig b/lib/std/Thread/AutoResetEvent.zig index 7372a8d8d9..13e404d602 100644 --- a/lib/std/Thread/AutoResetEvent.zig +++ b/lib/std/Thread/AutoResetEvent.zig @@ -220,9 +220,9 @@ test "basic usage" { }; var context = Context{}; - const send_thread = try std.Thread.spawn(Context.sender, &context); - const recv_thread = try std.Thread.spawn(Context.receiver, &context); + const send_thread = try std.Thread.spawn(.{}, Context.sender, .{&context}); + const recv_thread = try std.Thread.spawn(.{}, Context.receiver, .{&context}); - send_thread.wait(); - recv_thread.wait(); + send_thread.join(); + recv_thread.join(); } diff --git a/lib/std/Thread/Futex.zig b/lib/std/Thread/Futex.zig index 356a4e2046..4153f7b7c0 100644 --- a/lib/std/Thread/Futex.zig +++ b/lib/std/Thread/Futex.zig @@ -413,32 +413,27 @@ test "Futex - Signal" { } } - const Thread = struct { - tx: *Self, - rx: *Self, + const start_value = 1; - const start_value = 1; - - fn run(self: Thread) void { - var iterations: u32 = start_value; - while (iterations < 10) : (iterations += 1) { - self.rx.recv(iterations); - self.tx.send(iterations); - } + fn runThread(rx: *Self, tx: *Self) void { + var iterations: u32 = start_value; + while (iterations < 10) : (iterations += 1) { + self.rx.recv(iterations); + self.tx.send(iterations); } - }; + } fn run() !void { var ping = Self{}; var pong = Self{}; - const t1 = try std.Thread.spawn(Thread.run, .{ .rx = &ping, .tx = &pong }); - defer t1.wait(); + const t1 = try std.Thread.spawn(.{}, runThread, .{ &ping, &pong }); + defer t1.join(); - const t2 = try std.Thread.spawn(Thread.run, .{ 
.rx = &pong, .tx = &ping }); - defer t2.wait(); + const t2 = try std.Thread.spawn(.{}, runThread, .{ &pong, &ping }); + defer t2.join(); - ping.send(Thread.start_value); + ping.send(start_value); } }).run(); } @@ -507,7 +502,7 @@ test "Futex - Chain" { try (struct { completed: Signal = .{}, threads: [10]struct { - thread: *std.Thread, + thread: std.Thread, signal: Signal, } = undefined, @@ -531,39 +526,32 @@ test "Futex - Chain" { }; const Self = @This(); - const Chain = struct { - self: *Self, - index: usize, - fn run(chain: Chain) void { - const this_signal = &chain.self.threads[chain.index].signal; + fn runThread(self: *Self, index: usize) void { + const this_signal = &chain.self.threads[chain.index].signal; - var next_signal = &chain.self.completed; - if (chain.index + 1 < chain.self.threads.len) { - next_signal = &chain.self.threads[chain.index + 1].signal; - } - - this_signal.wait(); - next_signal.notify(); + var next_signal = &chain.self.completed; + if (chain.index + 1 < chain.self.threads.len) { + next_signal = &chain.self.threads[chain.index + 1].signal; } - }; + + this_signal.wait(); + next_signal.notify(); + } fn run() !void { var self = Self{}; for (self.threads) |*entry, index| { entry.signal = .{}; - entry.thread = try std.Thread.spawn(Chain.run, .{ - .self = &self, - .index = index, - }); + entry.thread = try std.Thread.spawn(.{}, runThread .{&self, index}); } self.threads[0].signal.notify(); self.completed.wait(); for (self.threads) |entry| { - entry.thread.wait(); + entry.thread.join(); } } }).run(); diff --git a/lib/std/Thread/Mutex.zig b/lib/std/Thread/Mutex.zig index 49f138079d..35095b2a3c 100644 --- a/lib/std/Thread/Mutex.zig +++ b/lib/std/Thread/Mutex.zig @@ -297,12 +297,12 @@ test "basic usage" { try testing.expect(context.data == TestContext.incr_count); } else { const thread_count = 10; - var threads: [thread_count]*std.Thread = undefined; + var threads: [thread_count]std.Thread = undefined; for (threads) |*t| { - t.* = try 
std.Thread.spawn(worker, &context); + t.* = try std.Thread.spawn(.{}, worker, .{&context}); } for (threads) |t| - t.wait(); + t.join(); try testing.expect(context.data == thread_count * TestContext.incr_count); } diff --git a/lib/std/Thread/ResetEvent.zig b/lib/std/Thread/ResetEvent.zig index f161b46aa0..356b8eb78d 100644 --- a/lib/std/Thread/ResetEvent.zig +++ b/lib/std/Thread/ResetEvent.zig @@ -281,8 +281,8 @@ test "basic usage" { var context: Context = undefined; try context.init(); defer context.deinit(); - const receiver = try std.Thread.spawn(Context.receiver, &context); - defer receiver.wait(); + const receiver = try std.Thread.spawn(.{}, Context.receiver, .{&context}); + defer receiver.join(); try context.sender(); if (false) { @@ -290,8 +290,8 @@ test "basic usage" { // https://github.com/ziglang/zig/issues/7009 var timed = Context.init(); defer timed.deinit(); - const sleeper = try std.Thread.spawn(Context.sleeper, &timed); - defer sleeper.wait(); + const sleeper = try std.Thread.spawn(.{}, Context.sleeper, .{&timed}); + defer sleeper.join(); try timed.timedWaiter(); } } diff --git a/lib/std/Thread/StaticResetEvent.zig b/lib/std/Thread/StaticResetEvent.zig index 6f869e0d89..40974938d0 100644 --- a/lib/std/Thread/StaticResetEvent.zig +++ b/lib/std/Thread/StaticResetEvent.zig @@ -384,8 +384,8 @@ test "basic usage" { }; var context = Context{}; - const receiver = try std.Thread.spawn(Context.receiver, &context); - defer receiver.wait(); + const receiver = try std.Thread.spawn(.{}, Context.receiver, .{&context}); + defer receiver.join(); try context.sender(); if (false) { @@ -393,8 +393,8 @@ test "basic usage" { // https://github.com/ziglang/zig/issues/7009 var timed = Context.init(); defer timed.deinit(); - const sleeper = try std.Thread.spawn(Context.sleeper, &timed); - defer sleeper.wait(); + const sleeper = try std.Thread.spawn(.{}, Context.sleeper, .{&timed}); + defer sleeper.join(); try timed.timedWaiter(); } } diff --git a/lib/std/atomic/queue.zig 
b/lib/std/atomic/queue.zig index cc5b20488b..cee431951c 100644 --- a/lib/std/atomic/queue.zig +++ b/lib/std/atomic/queue.zig @@ -214,20 +214,20 @@ test "std.atomic.Queue" { } else { try expect(context.queue.isEmpty()); - var putters: [put_thread_count]*std.Thread = undefined; + var putters: [put_thread_count]std.Thread = undefined; for (putters) |*t| { - t.* = try std.Thread.spawn(startPuts, &context); + t.* = try std.Thread.spawn(.{}, startPuts, .{&context}); } - var getters: [put_thread_count]*std.Thread = undefined; + var getters: [put_thread_count]std.Thread = undefined; for (getters) |*t| { - t.* = try std.Thread.spawn(startGets, &context); + t.* = try std.Thread.spawn(.{}, startGets, .{&context}); } for (putters) |t| - t.wait(); + t.join(); @atomicStore(bool, &context.puts_done, true, .SeqCst); for (getters) |t| - t.wait(); + t.join(); try expect(context.queue.isEmpty()); } diff --git a/lib/std/atomic/stack.zig b/lib/std/atomic/stack.zig index 9472df7347..cfdf40fa60 100644 --- a/lib/std/atomic/stack.zig +++ b/lib/std/atomic/stack.zig @@ -121,20 +121,20 @@ test "std.atomic.stack" { } } } else { - var putters: [put_thread_count]*std.Thread = undefined; + var putters: [put_thread_count]std.Thread = undefined; for (putters) |*t| { - t.* = try std.Thread.spawn(startPuts, &context); + t.* = try std.Thread.spawn(.{}, startPuts, .{&context}); } - var getters: [put_thread_count]*std.Thread = undefined; + var getters: [put_thread_count]std.Thread = undefined; for (getters) |*t| { - t.* = try std.Thread.spawn(startGets, &context); + t.* = try std.Thread.spawn(.{}, startGets, .{&context}); } for (putters) |t| - t.wait(); + t.join(); @atomicStore(bool, &context.puts_done, true, .SeqCst); for (getters) |t| - t.wait(); + t.join(); } if (context.put_sum != context.get_sum) { diff --git a/lib/std/debug.zig b/lib/std/debug.zig index fa87e5898c..57445d3cb1 100644 --- a/lib/std/debug.zig +++ b/lib/std/debug.zig @@ -273,8 +273,8 @@ pub fn panicExtra(trace: ?*const 
builtin.StackTrace, first_trace_addr: ?usize, c if (builtin.single_threaded) { stderr.print("panic: ", .{}) catch os.abort(); } else { - const current_thread_id = std.Thread.getCurrentThreadId(); - stderr.print("thread {d} panic: ", .{current_thread_id}) catch os.abort(); + const current_thread_id = std.Thread.getCurrentId(); + stderr.print("thread {} panic: ", .{current_thread_id}) catch os.abort(); } stderr.print(format ++ "\n", args) catch os.abort(); if (trace) |t| { diff --git a/lib/std/event/loop.zig b/lib/std/event/loop.zig index 5353341363..87d84acba6 100644 --- a/lib/std/event/loop.zig +++ b/lib/std/event/loop.zig @@ -21,12 +21,12 @@ pub const Loop = struct { os_data: OsData, final_resume_node: ResumeNode, pending_event_count: usize, - extra_threads: []*Thread, + extra_threads: []Thread, /// TODO change this to a pool of configurable number of threads /// and rename it to be not file-system-specific. it will become /// a thread pool for turning non-CPU-bound blocking things into /// async things. A fallback for any missing OS-specific API. 
- fs_thread: *Thread, + fs_thread: Thread, fs_queue: std.atomic.Queue(Request), fs_end_request: Request.Node, fs_thread_wakeup: std.Thread.ResetEvent, @@ -189,11 +189,11 @@ pub const Loop = struct { errdefer self.deinitOsData(); if (!builtin.single_threaded) { - self.fs_thread = try Thread.spawn(posixFsRun, self); + self.fs_thread = try Thread.spawn(.{}, posixFsRun, .{self}); } errdefer if (!builtin.single_threaded) { self.posixFsRequest(&self.fs_end_request); - self.fs_thread.wait(); + self.fs_thread.join(); }; if (!std.builtin.single_threaded) @@ -264,11 +264,11 @@ pub const Loop = struct { assert(amt == wakeup_bytes.len); while (extra_thread_index != 0) { extra_thread_index -= 1; - self.extra_threads[extra_thread_index].wait(); + self.extra_threads[extra_thread_index].join(); } } while (extra_thread_index < extra_thread_count) : (extra_thread_index += 1) { - self.extra_threads[extra_thread_index] = try Thread.spawn(workerRun, self); + self.extra_threads[extra_thread_index] = try Thread.spawn(.{}, workerRun, .{self}); } }, .macos, .freebsd, .netbsd, .dragonfly, .openbsd => { @@ -329,11 +329,11 @@ pub const Loop = struct { _ = os.kevent(self.os_data.kqfd, final_kev_arr, empty_kevs, null) catch unreachable; while (extra_thread_index != 0) { extra_thread_index -= 1; - self.extra_threads[extra_thread_index].wait(); + self.extra_threads[extra_thread_index].join(); } } while (extra_thread_index < extra_thread_count) : (extra_thread_index += 1) { - self.extra_threads[extra_thread_index] = try Thread.spawn(workerRun, self); + self.extra_threads[extra_thread_index] = try Thread.spawn(.{}, workerRun, .{self}); } }, .windows => { @@ -378,11 +378,11 @@ pub const Loop = struct { } while (extra_thread_index != 0) { extra_thread_index -= 1; - self.extra_threads[extra_thread_index].wait(); + self.extra_threads[extra_thread_index].join(); } } while (extra_thread_index < extra_thread_count) : (extra_thread_index += 1) { - self.extra_threads[extra_thread_index] = try 
Thread.spawn(workerRun, self); + self.extra_threads[extra_thread_index] = try Thread.spawn(.{}, workerRun, .{self}); } }, else => {}, @@ -651,18 +651,18 @@ pub const Loop = struct { .netbsd, .dragonfly, .openbsd, - => self.fs_thread.wait(), + => self.fs_thread.join(), else => {}, } } for (self.extra_threads) |extra_thread| { - extra_thread.wait(); + extra_thread.join(); } @atomicStore(bool, &self.delay_queue.is_running, false, .SeqCst); self.delay_queue.event.set(); - self.delay_queue.thread.wait(); + self.delay_queue.thread.join(); } /// Runs the provided function asynchronously. The function's frame is allocated @@ -787,7 +787,7 @@ pub const Loop = struct { const DelayQueue = struct { timer: std.time.Timer, waiters: Waiters, - thread: *std.Thread, + thread: std.Thread, event: std.Thread.AutoResetEvent, is_running: bool, @@ -802,7 +802,7 @@ pub const Loop = struct { .event = std.Thread.AutoResetEvent{}, .is_running = true, // Must be last so that it can read the other state, such as `is_running`. 
- .thread = try std.Thread.spawn(DelayQueue.run, self), + .thread = try std.Thread.spawn(.{}, DelayQueue.run, .{self}), }; } diff --git a/lib/std/fs/test.zig b/lib/std/fs/test.zig index 3c7a95432c..9464959cdb 100644 --- a/lib/std/fs/test.zig +++ b/lib/std/fs/test.zig @@ -862,11 +862,10 @@ test "open file with exclusive lock twice, make sure it waits" { errdefer file.close(); const S = struct { - const C = struct { dir: *fs.Dir, evt: *std.Thread.ResetEvent }; - fn checkFn(ctx: C) !void { - const file1 = try ctx.dir.createFile(filename, .{ .lock = .Exclusive }); + fn checkFn(dir: *fs.Dir, evt: *std.Thread.ResetEvent) !void { + const file1 = try dir.createFile(filename, .{ .lock = .Exclusive }); defer file1.close(); - ctx.evt.set(); + evt.set(); } }; @@ -874,8 +873,8 @@ test "open file with exclusive lock twice, make sure it waits" { try evt.init(); defer evt.deinit(); - const t = try std.Thread.spawn(S.checkFn, S.C{ .dir = &tmp.dir, .evt = &evt }); - defer t.wait(); + const t = try std.Thread.spawn(.{}, S.checkFn, .{ &tmp.dir, &evt }); + defer t.join(); const SLEEP_TIMEOUT_NS = 10 * std.time.ns_per_ms; // Make sure we've slept enough. 
diff --git a/lib/std/net/test.zig b/lib/std/net/test.zig index c2596b733c..08722c9e31 100644 --- a/lib/std/net/test.zig +++ b/lib/std/net/test.zig @@ -161,8 +161,8 @@ test "listen on a port, send bytes, receive bytes" { } }; - const t = try std.Thread.spawn(S.clientFn, server.listen_address); - defer t.wait(); + const t = try std.Thread.spawn(.{}, S.clientFn, .{server.listen_address}); + defer t.join(); var client = try server.accept(); defer client.stream.close(); @@ -277,7 +277,7 @@ test "listen on a unix socket, send bytes, receive bytes" { try server.listen(socket_addr); const S = struct { - fn clientFn(_: void) !void { + fn clientFn() !void { const socket = try net.connectUnixSocket(socket_path); defer socket.close(); @@ -285,8 +285,8 @@ test "listen on a unix socket, send bytes, receive bytes" { } }; - const t = try std.Thread.spawn(S.clientFn, {}); - defer t.wait(); + const t = try std.Thread.spawn(.{}, S.clientFn, .{}); + defer t.join(); var client = try server.accept(); defer client.stream.close(); diff --git a/lib/std/once.zig b/lib/std/once.zig index 05f003c796..a557f4aac9 100644 --- a/lib/std/once.zig +++ b/lib/std/once.zig @@ -55,16 +55,16 @@ test "Once executes its function just once" { global_once.call(); global_once.call(); } else { - var threads: [10]*std.Thread = undefined; - defer for (threads) |handle| handle.wait(); + var threads: [10]std.Thread = undefined; + defer for (threads) |handle| handle.join(); for (threads) |*handle| { - handle.* = try std.Thread.spawn(struct { + handle.* = try std.Thread.spawn(.{}, struct { fn thread_fn(x: u8) void { _ = x; global_once.call(); } - }.thread_fn, 0); + }.thread_fn, .{0}); } } diff --git a/lib/std/os/test.zig b/lib/std/os/test.zig index 7a88ecd7ca..0bfe29d64b 100644 --- a/lib/std/os/test.zig +++ b/lib/std/os/test.zig @@ -320,9 +320,9 @@ test "std.Thread.getCurrentId" { if (builtin.single_threaded) return error.SkipZigTest; var thread_current_id: Thread.Id = undefined; - const thread = try 
Thread.spawn(testThreadIdFn, &thread_current_id); - const thread_id = thread.handle(); - thread.wait(); + const thread = try Thread.spawn(.{}, testThreadIdFn, .{&thread_current_id}); + const thread_id = thread.getHandle(); + thread.join(); if (Thread.use_pthreads) { try expect(thread_current_id == thread_id); } else if (native_os == .windows) { @@ -339,21 +339,20 @@ test "spawn threads" { var shared_ctx: i32 = 1; - const thread1 = try Thread.spawn(start1, {}); - const thread2 = try Thread.spawn(start2, &shared_ctx); - const thread3 = try Thread.spawn(start2, &shared_ctx); - const thread4 = try Thread.spawn(start2, &shared_ctx); + const thread1 = try Thread.spawn(.{}, start1, .{}); + const thread2 = try Thread.spawn(.{}, start2, .{&shared_ctx}); + const thread3 = try Thread.spawn(.{}, start2, .{&shared_ctx}); + const thread4 = try Thread.spawn(.{}, start2, .{&shared_ctx}); - thread1.wait(); - thread2.wait(); - thread3.wait(); - thread4.wait(); + thread1.join(); + thread2.join(); + thread3.join(); + thread4.join(); try expect(shared_ctx == 4); } -fn start1(ctx: void) u8 { - _ = ctx; +fn start1() u8 { return 0; } @@ -371,16 +370,15 @@ test "cpu count" { test "thread local storage" { if (builtin.single_threaded) return error.SkipZigTest; - const thread1 = try Thread.spawn(testTls, {}); - const thread2 = try Thread.spawn(testTls, {}); + const thread1 = try Thread.spawn(.{}, testTls, .{}); + const thread2 = try Thread.spawn(.{}, testTls, .{}); try testTls({}); - thread1.wait(); - thread2.wait(); + thread1.join(); + thread2.join(); } threadlocal var x: i32 = 1234; -fn testTls(context: void) !void { - _ = context; +fn testTls() !void { if (x != 1234) return error.TlsBadStartValue; x += 1; if (x != 1235) return error.TlsBadEndValue; diff --git a/lib/std/target.zig b/lib/std/target.zig index 02ce44d477..507f30c2e0 100644 --- a/lib/std/target.zig +++ b/lib/std/target.zig @@ -69,6 +69,13 @@ pub const Target = struct { }; } + pub fn isBSD(tag: Tag) bool { + return 
tag.isDarwin() or switch (tag) { + .kfreebsd, .freebsd, .openbsd, .netbsd, .dragonfly => true, + else => false, + }; + } + pub fn dynamicLibSuffix(tag: Tag) [:0]const u8 { if (tag.isDarwin()) { return ".dylib"; @@ -787,6 +794,13 @@ pub const Target = struct { }; } + pub fn isAARCH64(arch: Arch) bool { + return switch (arch) { + .aarch64, .aarch64_be, .aarch64_32 => true, + else => false, + }; + } + pub fn isThumb(arch: Arch) bool { return switch (arch) { .thumb, .thumbeb => true, @@ -1365,10 +1379,7 @@ pub const Target = struct { } pub fn isAndroid(self: Target) bool { - return switch (self.abi) { - .android => true, - else => false, - }; + return self.abi == .android; } pub fn isWasm(self: Target) bool { @@ -1379,6 +1390,10 @@ pub const Target = struct { return self.os.tag.isDarwin(); } + pub fn isBSD(self: Target) bool { + return self.os.tag.isBSD(); + } + pub fn isGnuLibC_os_tag_abi(os_tag: Os.Tag, abi: Abi) bool { return os_tag == .linux and abi.isGnu(); } diff --git a/src/ThreadPool.zig b/src/ThreadPool.zig index 4a7fa8cb9b..230608df4b 100644 --- a/src/ThreadPool.zig +++ b/src/ThreadPool.zig @@ -74,13 +74,13 @@ pub fn init(self: *ThreadPool, allocator: *std.mem.Allocator) !void { try worker.idle_node.data.init(); errdefer worker.idle_node.data.deinit(); - worker.thread = try std.Thread.spawn(Worker.run, worker); + worker.thread = try std.Thread.spawn(.{}, Worker.run, .{worker}); } } fn destroyWorkers(self: *ThreadPool, spawned: usize) void { for (self.workers[0..spawned]) |*worker| { - worker.thread.wait(); + worker.thread.join(); worker.idle_node.data.deinit(); } } diff --git a/tools/update_cpu_features.zig b/tools/update_cpu_features.zig index 68d9b233a7..4694e3de25 100644 --- a/tools/update_cpu_features.zig +++ b/tools/update_cpu_features.zig @@ -816,18 +816,20 @@ pub fn main() anyerror!void { }); } } else { - var threads = try arena.alloc(*std.Thread, llvm_targets.len); + var threads = try arena.alloc(std.Thread, llvm_targets.len); for (llvm_targets) 
|llvm_target, i| { - threads[i] = try std.Thread.spawn(processOneTarget, .{ - .llvm_tblgen_exe = llvm_tblgen_exe, - .llvm_src_root = llvm_src_root, - .zig_src_dir = zig_src_dir, - .root_progress = root_progress, - .llvm_target = llvm_target, + threads[i] = try std.Thread.spawn(.{}, processOneTarget, .{ + Job{ + .llvm_tblgen_exe = llvm_tblgen_exe, + .llvm_src_root = llvm_src_root, + .zig_src_dir = zig_src_dir, + .root_progress = root_progress, + .llvm_target = llvm_target, + }, }); } for (threads) |thread| { - thread.wait(); + thread.join(); } } } From 3a276be1355fe304c177bcc13a7896122801e5f2 Mon Sep 17 00:00:00 2001 From: kprotty Date: Sat, 19 Jun 2021 21:50:34 -0500 Subject: [PATCH 03/24] std.Thread.getCpuCount(): fix usages --- lib/std/event/loop.zig | 4 ++-- lib/std/os/test.zig | 2 +- src/ThreadPool.zig | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/std/event/loop.zig b/lib/std/event/loop.zig index 87d84acba6..26cbf3b988 100644 --- a/lib/std/event/loop.zig +++ b/lib/std/event/loop.zig @@ -137,7 +137,7 @@ pub const Loop = struct { } /// After initialization, call run(). - /// This is the same as `initThreadPool` using `Thread.cpuCount` to determine the thread + /// This is the same as `initThreadPool` using `Thread.getCpuCount` to determine the thread /// pool size. /// TODO copy elision / named return values so that the threads referencing *Loop /// have the correct pointer value. 
@@ -145,7 +145,7 @@ pub const Loop = struct { pub fn initMultiThreaded(self: *Loop) !void { if (builtin.single_threaded) @compileError("initMultiThreaded unavailable when building in single-threaded mode"); - const core_count = try Thread.cpuCount(); + const core_count = try Thread.getCpuCount(); return self.initThreadPool(core_count); } diff --git a/lib/std/os/test.zig b/lib/std/os/test.zig index 0bfe29d64b..472da73235 100644 --- a/lib/std/os/test.zig +++ b/lib/std/os/test.zig @@ -364,7 +364,7 @@ fn start2(ctx: *i32) u8 { test "cpu count" { if (native_os == .wasi) return error.SkipZigTest; - const cpu_count = try Thread.cpuCount(); + const cpu_count = try Thread.getCpuCount(); try expect(cpu_count >= 1); } diff --git a/src/ThreadPool.zig b/src/ThreadPool.zig index 230608df4b..18818c7d9d 100644 --- a/src/ThreadPool.zig +++ b/src/ThreadPool.zig @@ -60,7 +60,7 @@ pub fn init(self: *ThreadPool, allocator: *std.mem.Allocator) !void { if (std.builtin.single_threaded) return; - const worker_count = std.math.max(1, std.Thread.cpuCount() catch 1); + const worker_count = std.math.max(1, std.Thread.getCpuCount() catch 1); self.workers = try allocator.alloc(Worker, worker_count); errdefer allocator.free(self.workers); From 235fcc5ba632ced7e972e1c8b133418c17cd7ab7 Mon Sep 17 00:00:00 2001 From: kprotty Date: Sat, 19 Jun 2021 22:01:54 -0500 Subject: [PATCH 04/24] std.Thread: another typo fix --- lib/std/Thread.zig | 48 +++++++++++++++++++++++----------------------- 1 file changed, 24 insertions(+), 24 deletions(-) diff --git a/lib/std/Thread.zig b/lib/std/Thread.zig index 53be5d6d83..8b9cfab237 100644 --- a/lib/std/Thread.zig +++ b/lib/std/Thread.zig @@ -25,6 +25,7 @@ pub const spinLoopHint = @compileError("deprecated: use std.atomic.spinLoopHint" pub const use_pthreads = target.os.tag != .windows and std.builtin.link_libc; +const Thread = @This(); const Impl = if (target.os.tag == .windows) WindowsThreadImpl else if (use_pthreads) @@ -36,7 +37,6 @@ else impl: Impl, - /// 
Represents a kernel thread handle. /// May be an integer or a pointer depending on the platform. /// On Linux and POSIX, this is the same as Id. @@ -120,6 +120,29 @@ pub fn spawn( return .{ .impl = impl }; } +/// Retrns the handle of this thread +/// On Linux and POSIX, this is the same as Id. +pub fn getHandle(self: Thread) Handle { + return self.impl.getHandle(); +} + +/// Release the obligation of the caller to call `join()` and have the thread clean up its own resources on completion. +pub fn detach(self: Thread) void { + return self.impl.detach(); +} + +/// Waits for the thread to complete, then deallocates any resources created on `spawn()`. +pub fn join(self: Thread) void { + return self.impl.join(); +} + +/// State to synchronize detachment of spawner thread to spawned thread +const Completion = Atomic(enum { + running, + detached, + completed, +}); + /// Used by the Thread implementations to call the spawned function with the arguments. fn callFn(comptime f: anytype, args: anytype) switch (Impl) { WindowsThreadImpl => windows.DWORD, @@ -172,29 +195,6 @@ fn callFn(comptime f: anytype, args: anytype) switch (Impl) { } } -/// Retrns the handle of this thread -/// On Linux and POSIX, this is the same as Id. -pub fn getHandle(self: Thread) Handle { - return self.impl.getHandle(); -} - -/// Release the obligation of the caller to call `join()` and have the thread clean up its own resources on completion. -pub fn detach(self: Thread) void { - return self.impl.detach(); -} - -/// Waits for the thread to complete, then deallocates any resources created on `spawn()`. 
-pub fn join(self: Thread) void { - return self.impl.join(); -} - -/// State to synchronize detachment of spawner thread to spawned thread -const Completion = Atomic(enum { - running, - detached, - completed, -}); - const WindowsThreadImpl = struct { const windows = os.windows; From 6ff64895cf0c8f331959d34dec5f4fa84e7c6365 Mon Sep 17 00:00:00 2001 From: kprotty Date: Sun, 20 Jun 2021 07:38:26 -0500 Subject: [PATCH 05/24] std.Thread: add tests + getCurrentId() returns ints when possible --- lib/std/Thread.zig | 69 ++++++++++++++++++++++++++++++++++------------ 1 file changed, 52 insertions(+), 17 deletions(-) diff --git a/lib/std/Thread.zig b/lib/std/Thread.zig index 8b9cfab237..552961988b 100644 --- a/lib/std/Thread.zig +++ b/lib/std/Thread.zig @@ -23,6 +23,19 @@ pub const Condition = @import("Thread/Condition.zig"); pub const spinLoopHint = @compileError("deprecated: use std.atomic.spinLoopHint"); +test "std.Thread" { + if (!builtin.single_threaded) { + // Doesn't use testing.refAllDecls() since that would pull in the compileError spinLoopHint. + _ = AutoResetEvent; + _ = Futex; + _ = ResetEvent; + _ = StaticResetEvent; + _ = Mutex; + _ = Semaphore; + _ = Condition; + } +} + pub const use_pthreads = target.os.tag != .windows and std.builtin.link_libc; const Thread = @This(); @@ -37,15 +50,9 @@ else impl: Impl, -/// Represents a kernel thread handle. -/// May be an integer or a pointer depending on the platform. -/// On Linux and POSIX, this is the same as Id. -pub const Handle = Impl.ThreadHandle; - /// Represents a unique ID per thread. /// May be an integer or pointer depending on the platform. -/// On Linux and POSIX, this is the same as Handle. -pub const Id = Impl.ThreadId; +pub const Id = u64; /// Returns the platform ID of the callers thread. /// Attempts to use thread locals and avoid syscalls when possible. @@ -120,8 +127,11 @@ pub fn spawn( return .{ .impl = impl }; } +/// Represents a kernel thread handle. 
+/// May be an integer or a pointer depending on the platform. +pub const Handle = Impl.ThreadHandle; + /// Retrns the handle of this thread -/// On Linux and POSIX, this is the same as Id. pub fn getHandle(self: Thread) Handle { return self.impl.getHandle(); } @@ -137,7 +147,7 @@ pub fn join(self: Thread) void { } /// State to synchronize detachment of spawner thread to spawned thread -const Completion = Atomic(enum { +const Completion = Atomic(enum(u8) { running, detached, completed, @@ -199,9 +209,8 @@ const WindowsThreadImpl = struct { const windows = os.windows; pub const ThreadHandle = windows.HANDLE; - pub const ThreadId = windows.DWORD; - fn getCurrentId() ThreadId { + fn getCurrentId() u64 { return windows.kernel.GetCurrentThreadId(); } @@ -291,10 +300,37 @@ const PosixThreadImpl = struct { const c = std.c; pub const ThreadHandle = c.pthread_t; - pub const ThreadId = ThreadHandle; - fn getCurrentId() ThreadId { - return c.pthread_self(); + fn getCurrentId() Id { + switch (target.os.tag) { + .linux => { + return LinuxThreadImpl.getCurrentId(); + }, + .macos, .ios, .watchos, .tvos => { + var thread_id: u64 = undefined; + // Pass thread=null to get the current thread ID. 
+ assert(c.pthread_threadid_np(null, &thread_id) == 0); + return thread_id; + }, + .dragonfly => { + return @bitCast(u32, c.lwp_gettid()); + }, + .netbsd => { + return @bitCast(u32, c._lwp_self()); + }, + .freebsd => { + return @bitCast(u32, c.pthread_getthreadid_np()); + }, + .openbsd => { + return @bitCast(u32, c.getthrid()); + }, + .haiku => { + return @bitCast(u32, c.find_thread(null)); + }, + else => { + return @ptrToInt(c.pthread_self()); + }, + } } fn getCpuCount() !usize { @@ -397,11 +433,10 @@ const LinuxThreadImpl = struct { const linux = os.linux; pub const ThreadHandle = i32; - pub const ThreadId = ThreadHandle; - threadlocal var tls_thread_id: ?ThreadId = null; + threadlocal var tls_thread_id: ?Id = null; - fn getCurrentId() ThreadId { + fn getCurrentId() Id { return tls_thread_id orelse { const tid = linux.gettid(); tls_thread_id = tid; From ca1e61b851351ec66ee3f1937586a2f9d02bbafc Mon Sep 17 00:00:00 2001 From: kprotty Date: Sun, 20 Jun 2021 09:56:30 -0500 Subject: [PATCH 06/24] std.Thread: fix some typos --- lib/std/Thread.zig | 34 ++++++++++++++-------------------- lib/std/Thread/Futex.zig | 2 +- 2 files changed, 15 insertions(+), 21 deletions(-) diff --git a/lib/std/Thread.zig b/lib/std/Thread.zig index 552961988b..22f12c3c95 100644 --- a/lib/std/Thread.zig +++ b/lib/std/Thread.zig @@ -24,16 +24,14 @@ pub const Condition = @import("Thread/Condition.zig"); pub const spinLoopHint = @compileError("deprecated: use std.atomic.spinLoopHint"); test "std.Thread" { - if (!builtin.single_threaded) { - // Doesn't use testing.refAllDecls() since that would pull in the compileError spinLoopHint. - _ = AutoResetEvent; - _ = Futex; - _ = ResetEvent; - _ = StaticResetEvent; - _ = Mutex; - _ = Semaphore; - _ = Condition; - } + // Doesn't use testing.refAllDecls() since that would pull in the compileError spinLoopHint. 
+ _ = AutoResetEvent; + _ = Futex; + _ = ResetEvent; + _ = StaticResetEvent; + _ = Mutex; + _ = Semaphore; + _ = Condition; } pub const use_pthreads = target.os.tag != .windows and std.builtin.link_libc; @@ -114,17 +112,13 @@ pub const SpawnError = error { /// `config` can be used as hints to the platform for now to spawn and execute the `function`. /// The caller must eventually either call `join()` to wait for the thread to finish and free its resources /// or call `detach()` to excuse the caller from calling `join()` and have the thread clean up its resources on completion`. -pub fn spawn( - config: SpawnConfig, - comptime function: anytype, - args: std.meta.ArgsTuple(function), -) SpawnError!Thread { +pub fn spawn(config: SpawnConfig, comptime function: anytype, args: anytype) SpawnError!Thread { if (std.builtin.single_threaded) { @compileError("cannot spawn thread when building in single-threaded mode"); } - const impl = try Thread.spawn(config, function, args); - return .{ .impl = impl }; + const impl = try Impl.spawn(config, function, args); + return Thread{ .impl = impl }; } /// Represents a kernel thread handle. 
@@ -438,7 +432,7 @@ const LinuxThreadImpl = struct { fn getCurrentId() Id { return tls_thread_id orelse { - const tid = linux.gettid(); + const tid = @bitCast(u32, linux.gettid()); tls_thread_id = tid; return tid; }; @@ -550,7 +544,7 @@ const LinuxThreadImpl = struct { const instance = @ptrCast(*Instance, @alignCast(@alignOf(Instance), &mapped[instance_offset])); instance.* = .{ .fn_args = args, - .thread = .{ .mapped = .mapped }, + .thread = .{ .mapped = mapped }, }; const flags: u32 = os.CLONE_VM | os.CLONE_FS | os.CLONE_FILES | @@ -591,7 +585,7 @@ const LinuxThreadImpl = struct { } fn join(self: Impl) void { - defer self.thread.free(); + defer os.munmap(self.thread.mapped); var spin: u8 = 10; while (true) { diff --git a/lib/std/Thread/Futex.zig b/lib/std/Thread/Futex.zig index 4153f7b7c0..971f1bd095 100644 --- a/lib/std/Thread/Futex.zig +++ b/lib/std/Thread/Futex.zig @@ -544,7 +544,7 @@ test "Futex - Chain" { for (self.threads) |*entry, index| { entry.signal = .{}; - entry.thread = try std.Thread.spawn(.{}, runThread .{&self, index}); + entry.thread = try std.Thread.spawn(.{}, runThread, .{&self, index}); } self.threads[0].signal.notify(); From 1ae969e5299ea71231e7045110332b23989e653c Mon Sep 17 00:00:00 2001 From: kprotty Date: Sun, 20 Jun 2021 10:37:40 -0500 Subject: [PATCH 07/24] std.Thread: even more typo fixes --- lib/std/Thread/Futex.zig | 4 ++-- lib/std/event/loop.zig | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/std/Thread/Futex.zig b/lib/std/Thread/Futex.zig index 971f1bd095..033ecf688d 100644 --- a/lib/std/Thread/Futex.zig +++ b/lib/std/Thread/Futex.zig @@ -444,7 +444,7 @@ test "Futex - Broadcast" { } try (struct { - threads: [10]*std.Thread = undefined, + threads: [10]std.Thread = undefined, broadcast: Atomic(u32) = Atomic(u32).init(0), notified: Atomic(usize) = Atomic(usize).init(0), @@ -475,7 +475,7 @@ test "Futex - Broadcast" { for (self.threads) |*thread| thread.* = try std.Thread.spawn(runReceiver, &self); defer for 
(self.threads) |thread| - thread.wait(); + thread.join(); std.time.sleep(16 * std.time.ns_per_ms); self.broadcast.store(BROADCAST_SENT, .Monotonic); diff --git a/lib/std/event/loop.zig b/lib/std/event/loop.zig index 26cbf3b988..9c8550d459 100644 --- a/lib/std/event/loop.zig +++ b/lib/std/event/loop.zig @@ -183,7 +183,7 @@ pub const Loop = struct { resume_node_count, ); - self.extra_threads = try self.arena.allocator.alloc(*Thread, extra_thread_count); + self.extra_threads = try self.arena.allocator.alloc(Thread, extra_thread_count); try self.initOsData(extra_thread_count); errdefer self.deinitOsData(); From 281a9a60f07afaf1b72bc66b2bb3c925b3c908f4 Mon Sep 17 00:00:00 2001 From: kprotty Date: Sun, 20 Jun 2021 13:45:16 -0500 Subject: [PATCH 08/24] std.Thread: fixup ThreadPool.zig --- src/ThreadPool.zig | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ThreadPool.zig b/src/ThreadPool.zig index 18818c7d9d..7386e426eb 100644 --- a/src/ThreadPool.zig +++ b/src/ThreadPool.zig @@ -21,7 +21,7 @@ const Runnable = struct { const Worker = struct { pool: *ThreadPool, - thread: *std.Thread, + thread: std.Thread, /// The node is for this worker only and must have an already initialized event /// when the thread is spawned. 
idle_node: IdleQueue.Node, From d016caaccba8ae9372c56d2a47e21f36cc2f4d83 Mon Sep 17 00:00:00 2001 From: kprotty Date: Sun, 20 Jun 2021 15:39:23 -0500 Subject: [PATCH 09/24] std.Thread: more compile error fixes --- lib/std/Thread.zig | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/lib/std/Thread.zig b/lib/std/Thread.zig index 22f12c3c95..c5cf49929c 100644 --- a/lib/std/Thread.zig +++ b/lib/std/Thread.zig @@ -10,6 +10,7 @@ const std = @import("std.zig"); const os = std.os; +const assert = std.debug.assert; const target = std.Target.current; const Atomic = std.atomic.Atomic; @@ -259,11 +260,20 @@ const WindowsThreadImpl = struct { }, }; - const stack_size = std.math.min(64 * 1024, std.math.cast(u32, config.stack_size) catch std.math.maxInt(u32)); - - const parameter = @ptrCast(*c_void, impl); + // Windows appears to only support SYSTEM_INFO.dwAllocationGranularity minimum stack size. + // Going lower makes it default to that specified in the executable (~1mb). + // Its also fine if the limit here is incorrect as stack size is only a hint. 
+ var stack_size = std.math.cast(u32, config.stack_size) catch std.math.maxInt(u32); + stack_size = std.math.max(64 * 1024, stack_size); - instance.thread.thread_handle = windows.CreateThread(null, stack_size, Impl.entry, parameter, 0, null) orelse { + instance.thread.thread_handle = windows.CreateThread( + null, + stack_size, + Instance.entry, + @ptrCast(*c_void, instance), + 0, + null, + ) orelse { return windows.unexpectedError(windows.kernel32.GetLastError()); }; From b1f37b4eade8bf5bc63ee048ebdb83c584dbe0ed Mon Sep 17 00:00:00 2001 From: kprotty Date: Sun, 20 Jun 2021 17:12:28 -0500 Subject: [PATCH 10/24] std.Thread: uh more typo fixes --- lib/std/Thread.zig | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/std/Thread.zig b/lib/std/Thread.zig index c5cf49929c..4644d5d831 100644 --- a/lib/std/Thread.zig +++ b/lib/std/Thread.zig @@ -206,7 +206,7 @@ const WindowsThreadImpl = struct { pub const ThreadHandle = windows.HANDLE; fn getCurrentId() u64 { - return windows.kernel.GetCurrentThreadId(); + return windows.kernel32.GetCurrentThreadId(); } fn getCpuCount() !usize { @@ -266,7 +266,7 @@ const WindowsThreadImpl = struct { var stack_size = std.math.cast(u32, config.stack_size) catch std.math.maxInt(u32); stack_size = std.math.max(64 * 1024, stack_size); - instance.thread.thread_handle = windows.CreateThread( + instance.thread.thread_handle = windows.kernel32.CreateThread( null, stack_size, Instance.entry, @@ -406,7 +406,7 @@ const PosixThreadImpl = struct { os.EAGAIN => error.SystemResources, os.EPERM => unreachable, os.EINVAL => unreachable, - else => os.unexpectedErrno(err), + else => |err| os.unexpectedErrno(err), }; } From 5f4a40e6aa7709fe708f9b20b32f63a50cc64736 Mon Sep 17 00:00:00 2001 From: kprotty Date: Tue, 22 Jun 2021 11:27:58 -0500 Subject: [PATCH 11/24] std.Thread: typo fixes 2 --- lib/std/Thread.zig | 168 +++++++++++++++++++++++++++------------------ 1 file changed, 101 insertions(+), 67 deletions(-) diff --git 
a/lib/std/Thread.zig b/lib/std/Thread.zig index 4644d5d831..a956f2d6c9 100644 --- a/lib/std/Thread.zig +++ b/lib/std/Thread.zig @@ -150,7 +150,7 @@ const Completion = Atomic(enum(u8) { /// Used by the Thread implementations to call the spawned function with the arguments. fn callFn(comptime f: anytype, args: anytype) switch (Impl) { - WindowsThreadImpl => windows.DWORD, + WindowsThreadImpl => std.os.windows.DWORD, LinuxThreadImpl => u8, PosixThreadImpl => ?*c_void, else => unreachable, @@ -223,7 +223,7 @@ const WindowsThreadImpl = struct { fn free(self: ThreadCompletion) void { const status = windows.kernel32.HeapFree(self.heap_handle, 0, self.heap_ptr); - assert(status == 0); + assert(status != 0); } }; @@ -233,9 +233,9 @@ const WindowsThreadImpl = struct { fn_args: Args, thread: ThreadCompletion, - fn entryFn(raw_ptr: *windows.PVOID) callconv(.C) windows.DWORD { + fn entryFn(raw_ptr: windows.PVOID) callconv(.C) windows.DWORD { const self = @ptrCast(*@This(), @alignCast(@alignOf(@This()), raw_ptr)); - defer switch (self.thread.completion.swap(.completed, .Acquire)) { + defer switch (self.thread.completion.swap(.completed, .SeqCst)) { .running => {}, .completed => unreachable, .detached => self.thread.free(), @@ -269,7 +269,7 @@ const WindowsThreadImpl = struct { instance.thread.thread_handle = windows.kernel32.CreateThread( null, stack_size, - Instance.entry, + Instance.entryFn, @ptrCast(*c_void, instance), 0, null, @@ -277,7 +277,7 @@ const WindowsThreadImpl = struct { return windows.unexpectedError(windows.kernel32.GetLastError()); }; - return .{ .thread = &instance.thread }; + return Impl{ .thread = &instance.thread }; } fn getHandle(self: Impl) ThreadHandle { @@ -286,7 +286,7 @@ const WindowsThreadImpl = struct { fn detach(self: Impl) void { windows.CloseHandle(self.thread.thread_handle); - switch (self.thread.completion.swap(.detached, .AcqRel)) { + switch (self.thread.completion.swap(.detached, .SeqCst)) { .running => {}, .completed => self.thread.free(), 
.detached => unreachable, @@ -296,6 +296,7 @@ const WindowsThreadImpl = struct { fn join(self: Impl) void { windows.WaitForSingleObjectEx(self.thread.thread_handle, windows.INFINITE, false) catch unreachable; windows.CloseHandle(self.thread.thread_handle); + assert(self.thread.completion.load(.SeqCst) == .completed); self.thread.free(); } }; @@ -625,67 +626,100 @@ const LinuxThreadImpl = struct { } // Calls `munmap(ptr, len)` then `exit(1)` without touching the stack (which lives in `ptr`). - // Ported over from musl libc's pthread detached implementation. + // Ported over from musl libc's pthread detached implementation (`__unmapself`). extern fn __unmap_and_exit(ptr: usize, len: usize) callconv(.C) noreturn; comptime { - asm(switch (target.cpu.arch) { - .i386 => ( - \\.text - \\.global __unmap_and_exit - \\.type __unmap_and_exit, @function - \\__unmap_and_exit: - \\ movl $91, %eax - \\ movl 4(%esp), %ebx - \\ movl 8(%esp), %ecx - \\ int $128 - \\ xorl %ebx, %ebx - \\ movl $1, %eax - \\ int $128 - ), - .x86_64 => ( - \\.text - \\.global __unmap_and_exit - \\.type __unmap_and_exit, @function - \\__unmap_and_exit: - \\ movl $11, %eax - \\ syscall - \\ xor %rdi, %rdi - \\ movl $60, %eax - \\ syscall - ), - .arm, .armeb, .aarch64, .aarch64_be, .aarch64_32 => ( - \\.text - \\.global __unmap_and_exit - \\.type __unmap_and_exit, @function - \\__unmap_and_exit: - \\ mov r7, #91 - \\ svc 0 - \\ mov r7, #1 - \\ svc 0 - ), - .mips, .mipsel, .mips64, .mips64el => ( - \\.set noreorder - \\.global __unmap_and_exit - \\.type __unmap_and_exit, @function - \\__unmap_and_exit: - \\ li $2, 4091 - \\ syscall - \\ li $4, 0 - \\ li $2, 4001 - \\ syscall - ), - .powerpc, .powerpc64, .powerpc64le => ( - \\.text - \\.global __unmap_and_exit - \\.type __unmap_and_exit, @function - \\__unmap_and_exit: - \\ li 0, 91 - \\ sc - \\ li 0, 1 - \\ sc - \\ blr - ), - else => @compileError("Platform not supported"), - }); + if (target.os.tag == .linux) { + asm(switch (target.cpu.arch) { + .i386 => ( 
+ \\.text + \\.global __unmap_and_exit + \\.type __unmap_and_exit, @function + \\__unmap_and_exit: + \\ movl $91, %eax + \\ movl 4(%esp), %ebx + \\ movl 8(%esp), %ecx + \\ int $128 + \\ xorl %ebx, %ebx + \\ movl $1, %eax + \\ int $128 + ), + .x86_64 => ( + \\.text + \\.global __unmap_and_exit + \\.type __unmap_and_exit, @function + \\__unmap_and_exit: + \\ movl $11, %eax + \\ syscall + \\ xor %rdi, %rdi + \\ movl $60, %eax + \\ syscall + ), + .arm, .armeb, .thumb, .thumb_eb => ( + \\.syntax unified + \\.text + \\.global __unmap_and_exit + \\.type __unmap_and_exit, %function + \\__unmap_and_exit: + \\ mov r7, #91 + \\ svc 0 + \\ mov r7, #1 + \\ svc 0 + ), + .aarch64, .aarch64_be, .aarch64_32 => ( + \\.global __unmap_and_exit + \\.type __unmap_and_exit, %function + \\__unmap_and_exit: + \\ mov x8, #215 + \\ svc 0 + \\ mov x8, #93 + \\ svc 0 + ), + .mips, .mipsel, => ( + \\.set noreorder + \\.global __unmap_and_exit + \\.type __unmap_and_exit,@function + \\__unmap_and_exit: + \\ move $sp, $25 + \\ li $2, 4091 + \\ syscall + \\ li $4, 0 + \\ li $2, 4001 + \\ syscall + ), + .mips64, .mips64el => ( + \\.set noreorder + \\.global __unmap_and_exit + \\.type __unmap_and_exit, @function + \\__unmap_and_exit: + \\ li $2, 4091 + \\ syscall + \\ li $4, 0 + \\ li $2, 4001 + \\ syscall + ), + .powerpc, .powerpc64, .powerpc64le => ( + \\.text + \\.global __unmap_and_exit + \\.type __unmap_and_exit, %function + \\__unmap_and_exit: + \\ li 0, 91 + \\ sc + \\ li 0, 1 + \\ sc + \\ blr + ), + .riscv64 => ( + \\.global __unmap_and_exit + \\.type __unmap_and_exit, %function + \\__unmap_and_exit: + \\ li a7, 215 + \\ ecall + \\ li a7, 93 + \\ ecall + ), + else => @compileError("Platform not supported"), + }); + } } }; \ No newline at end of file From 009c95b8ec2f19f6d0db25c9284a7f79a07c387f Mon Sep 17 00:00:00 2001 From: kprotty Date: Fri, 25 Jun 2021 12:43:03 -0500 Subject: [PATCH 12/24] std.Thread: more fixes --- lib/std/Thread.zig | 20 ++++++++++---------- lib/std/os/test.zig | 2 +- 2 
files changed, 11 insertions(+), 11 deletions(-) diff --git a/lib/std/Thread.zig b/lib/std/Thread.zig index a956f2d6c9..e7bb645d26 100644 --- a/lib/std/Thread.zig +++ b/lib/std/Thread.zig @@ -397,17 +397,17 @@ const PosixThreadImpl = struct { assert(c.pthread_attr_setguardsize(&attr, std.mem.page_size) == 0); var handle: c.pthread_t = undefined; - return switch (c.pthread_create( + switch (c.pthread_create( &handle, &attr, Instance.entryFn, @ptrCast(*c_void, args_ptr), )) { - 0 => .{ .handle = handle }, - os.EAGAIN => error.SystemResources, + 0 => return Impl{ .handle = handle }, + os.EAGAIN => return error.SystemResources, os.EPERM => unreachable, os.EINVAL => unreachable, - else => |err| os.unexpectedErrno(err), + else => |err| return os.unexpectedErrno(err), }; } @@ -563,7 +563,7 @@ const LinuxThreadImpl = struct { os.CLONE_PARENT_SETTID | os.CLONE_CHILD_CLEARTID | os.CLONE_DETACHED | os.CLONE_SETTLS; - return switch (linux.getErrno(linux.clone( + switch (linux.getErrno(linux.clone( Instance.entryFn, @ptrToInt(&mapped[stack_offset]), flags, @@ -572,14 +572,14 @@ const LinuxThreadImpl = struct { tls_ptr, &instance.thread.child_tid.value, ))) { - 0 => .{ .thread = &instance.thread }, - os.EAGAIN => error.ThreadQuotaExceeded, + 0 => return Impl{ .thread = &instance.thread }, + os.EAGAIN => return error.ThreadQuotaExceeded, os.EINVAL => unreachable, - os.ENOMEM => error.SystemResources, + os.ENOMEM => return error.SystemResources, os.ENOSPC => unreachable, os.EPERM => unreachable, os.EUSERS => unreachable, - else => |err| os.unexpectedErrno(err), + else => |err| return os.unexpectedErrno(err), }; } @@ -655,7 +655,7 @@ const LinuxThreadImpl = struct { \\ movl $60, %eax \\ syscall ), - .arm, .armeb, .thumb, .thumb_eb => ( + .arm, .armeb, .thumb, .thumbeb => ( \\.syntax unified \\.text \\.global __unmap_and_exit diff --git a/lib/std/os/test.zig b/lib/std/os/test.zig index 472da73235..750c63d447 100644 --- a/lib/std/os/test.zig +++ b/lib/std/os/test.zig @@ -372,7 +372,7 
@@ test "thread local storage" { if (builtin.single_threaded) return error.SkipZigTest; const thread1 = try Thread.spawn(.{}, testTls, .{}); const thread2 = try Thread.spawn(.{}, testTls, .{}); - try testTls({}); + try testTls(); thread1.join(); thread2.join(); } From 6a903fc4c0ebe67f953feef253cbdfd43893a1bd Mon Sep 17 00:00:00 2001 From: kprotty Date: Fri, 25 Jun 2021 12:59:28 -0500 Subject: [PATCH 13/24] std.Thread: more typo fixes --- lib/std/Thread.zig | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/std/Thread.zig b/lib/std/Thread.zig index e7bb645d26..04bd94729d 100644 --- a/lib/std/Thread.zig +++ b/lib/std/Thread.zig @@ -408,7 +408,7 @@ const PosixThreadImpl = struct { os.EPERM => unreachable, os.EINVAL => unreachable, else => |err| return os.unexpectedErrno(err), - }; + } } fn getHandle(self: Impl) ThreadHandle { @@ -580,7 +580,7 @@ const LinuxThreadImpl = struct { os.EPERM => unreachable, os.EUSERS => unreachable, else => |err| return os.unexpectedErrno(err), - }; + } } fn getHandle(self: Impl) ThreadHandle { From 18bcb2e990853f3e32cb0d72beb962390c9e8714 Mon Sep 17 00:00:00 2001 From: kprotty Date: Fri, 25 Jun 2021 14:04:11 -0500 Subject: [PATCH 14/24] std.Thread: fix futex thread spawning --- lib/std/Thread/Futex.zig | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/lib/std/Thread/Futex.zig b/lib/std/Thread/Futex.zig index 033ecf688d..26ebe23364 100644 --- a/lib/std/Thread/Futex.zig +++ b/lib/std/Thread/Futex.zig @@ -418,8 +418,8 @@ test "Futex - Signal" { fn runThread(rx: *Self, tx: *Self) void { var iterations: u32 = start_value; while (iterations < 10) : (iterations += 1) { - self.rx.recv(iterations); - self.tx.send(iterations); + rx.recv(iterations); + tx.send(iterations); } } @@ -528,11 +528,11 @@ test "Futex - Chain" { const Self = @This(); fn runThread(self: *Self, index: usize) void { - const this_signal = &chain.self.threads[chain.index].signal; + const this_signal = 
&self.threads[index].signal; - var next_signal = &chain.self.completed; - if (chain.index + 1 < chain.self.threads.len) { - next_signal = &chain.self.threads[chain.index + 1].signal; + var next_signal = &self.completed; + if (index + 1 < self.threads.len) { + next_signal = &self.threads[index + 1].signal; } this_signal.wait(); From fd4a607bb2f1a1cbf8b8c1fd5d35f5f775e79114 Mon Sep 17 00:00:00 2001 From: kprotty Date: Sat, 26 Jun 2021 09:03:53 -0500 Subject: [PATCH 15/24] std.Thread: fix futex test + thread errors --- lib/std/Thread.zig | 2 + lib/std/Thread/Futex.zig | 186 ++++++++++++++++++--------------------- 2 files changed, 90 insertions(+), 98 deletions(-) diff --git a/lib/std/Thread.zig b/lib/std/Thread.zig index 04bd94729d..19987d47a1 100644 --- a/lib/std/Thread.zig +++ b/lib/std/Thread.zig @@ -385,6 +385,7 @@ const PosixThreadImpl = struct { }; const args_ptr = try allocator.create(Args); + args_ptr.* = args; errdefer allocator.destroy(args_ptr); var attr: c.pthread_attr_t = undefined; @@ -523,6 +524,7 @@ const LinuxThreadImpl = struct { error.PermissionDenied => unreachable, else => |e| return e, }; + assert(mapped.len >= map_bytes); errdefer os.munmap(mapped); // map everything but the guard page as read/write diff --git a/lib/std/Thread/Futex.zig b/lib/std/Thread/Futex.zig index 26ebe23364..b1b2128caa 100644 --- a/lib/std/Thread/Futex.zig +++ b/lib/std/Thread/Futex.zig @@ -391,70 +391,74 @@ test "Futex - wait/wake" { } test "Futex - Signal" { - if (!single_threaded) { + if (single_threaded) { return; } - try (struct { + const Paddle = struct { value: Atomic(u32) = Atomic(u32).init(0), + current: u32 = 0, - const Self = @This(); + fn run(self: *@This(), hit_to: *@This()) !void { + var iterations: usize = 4; + while (iterations > 0) : (iterations -= 1) { - fn send(self: *Self, value: u32) void { - self.value.store(value, .Release); - Futex.wake(&self.value, 1); - } + var value: u32 = undefined; + while (true) { + value = self.value.load(.Acquire); + if 
(value != self.current) break; + Futex.wait(&self.value, self.current, null) catch unreachable; + } - fn recv(self: *Self, expected: u32) void { - while (true) { - const value = self.value.load(.Acquire); - if (value == expected) break; - Futex.wait(&self.value, value, null) catch unreachable; + try testing.expectEqual(value, self.current + 1); + self.current = value; + + _ = hit_to.value.fetchAdd(1, .Release); + Futex.wake(&hit_to.value, 1); } } + }; - const start_value = 1; + var ping = Paddle{}; + var pong = Paddle{}; - fn runThread(rx: *Self, tx: *Self) void { - var iterations: u32 = start_value; - while (iterations < 10) : (iterations += 1) { - rx.recv(iterations); - tx.send(iterations); - } - } + const t1 = try std.Thread.spawn(.{}, Paddle.run, .{&ping, &pong}); + defer t1.join(); - fn run() !void { - var ping = Self{}; - var pong = Self{}; + const t2 = try std.Thread.spawn(.{}, Paddle.run, .{&pong, &ping}); + defer t2.join(); - const t1 = try std.Thread.spawn(.{}, runThread, .{ &ping, &pong }); - defer t1.join(); - - const t2 = try std.Thread.spawn(.{}, runThread, .{ &pong, &ping }); - defer t2.join(); - - ping.send(start_value); - } - }).run(); + _ = ping.value.fetchAdd(1, .Release); + Futex.wake(&ping.value, 1); } test "Futex - Broadcast" { - if (!single_threaded) { + if (single_threaded) { return; } - try (struct { - threads: [10]std.Thread = undefined, + const Context = struct { + threads: [4]std.Thread = undefined, broadcast: Atomic(u32) = Atomic(u32).init(0), notified: Atomic(usize) = Atomic(usize).init(0), - const Self = @This(); - const BROADCAST_EMPTY = 0; const BROADCAST_SENT = 1; const BROADCAST_RECEIVED = 2; - fn runReceiver(self: *Self) void { + fn runSender(self: *@This()) !void { + self.broadcast.store(BROADCAST_SENT, .Monotonic); + Futex.wake(&self.broadcast, @intCast(u32, self.threads.len)); + + while (true) { + const broadcast = self.broadcast.load(.Acquire); + if (broadcast == BROADCAST_RECEIVED) break; + try testing.expectEqual(broadcast, 
BROADCAST_SENT); + Futex.wait(&self.broadcast, broadcast, null) catch unreachable; + } + } + + fn runReceiver(self: *@This()) void { while (true) { const broadcast = self.broadcast.load(.Acquire); if (broadcast == BROADCAST_SENT) break; @@ -468,66 +472,55 @@ test "Futex - Broadcast" { Futex.wake(&self.broadcast, 1); } } + }; - fn run() !void { - var self = Self{}; + var ctx = Context{}; + for (ctx.threads) |*thread| + thread.* = try std.Thread.spawn(.{}, Context.runReceiver, .{&ctx}); + defer for (ctx.threads) |thread| + thread.join(); - for (self.threads) |*thread| - thread.* = try std.Thread.spawn(runReceiver, &self); - defer for (self.threads) |thread| - thread.join(); + // Try to wait for the threads to start before running runSender(). + // NOTE: not actually needed for correctness. + std.time.sleep(16 * std.time.ns_per_ms); + try ctx.runSender(); - std.time.sleep(16 * std.time.ns_per_ms); - self.broadcast.store(BROADCAST_SENT, .Monotonic); - Futex.wake(&self.broadcast, @intCast(u32, self.threads.len)); - - while (true) { - const broadcast = self.broadcast.load(.Acquire); - if (broadcast == BROADCAST_RECEIVED) break; - try testing.expectEqual(broadcast, BROADCAST_SENT); - Futex.wait(&self.broadcast, broadcast, null) catch unreachable; - } - - const notified = self.notified.load(.Monotonic); - try testing.expectEqual(notified, self.threads.len); - } - }).run(); + const notified = ctx.notified.load(.Monotonic); + try testing.expectEqual(notified, ctx.threads.len); } test "Futex - Chain" { - if (!single_threaded) { + if (single_threaded) { return; } - try (struct { + const Signal = struct { + value: Atomic(u32) = Atomic(u32).init(0), + + fn wait(self: *@This()) void { + while (true) { + const value = self.value.load(.Acquire); + if (value == 1) break; + assert(value == 0); + Futex.wait(&self.value, 0, null) catch unreachable; + } + } + + fn notify(self: *@This()) void { + assert(self.value.load(.Unordered) == 0); + self.value.store(1, .Release); + 
Futex.wake(&self.value, 1); + } + }; + + const Context = struct { completed: Signal = .{}, - threads: [10]struct { + threads: [4]struct { thread: std.Thread, signal: Signal, } = undefined, - const Signal = struct { - state: Atomic(u32) = Atomic(u32).init(0), - - fn wait(self: *Signal) void { - while (true) { - const value = self.value.load(.Acquire); - if (value == 1) break; - assert(value == 0); - Futex.wait(&self.value, 0, null) catch unreachable; - } - } - - fn notify(self: *Signal) void { - assert(self.value.load(.Unordered) == 0); - self.value.store(1, .Release); - Futex.wake(&self.value, 1); - } - }; - - const Self = @This(); - - fn runThread(self: *Self, index: usize) void { + fn run(self: *@This(), index: usize) void { const this_signal = &self.threads[index].signal; var next_signal = &self.completed; @@ -538,21 +531,18 @@ test "Futex - Chain" { this_signal.wait(); next_signal.notify(); } + }; - fn run() !void { - var self = Self{}; + var ctx = Context{}; + for (ctx.threads) |*entry, index| { + entry.signal = .{}; + entry.thread = try std.Thread.spawn(.{}, Context.run, .{&ctx, index}); + } - for (self.threads) |*entry, index| { - entry.signal = .{}; - entry.thread = try std.Thread.spawn(.{}, runThread, .{&self, index}); - } + ctx.threads[0].signal.notify(); + ctx.completed.wait(); - self.threads[0].signal.notify(); - self.completed.wait(); - - for (self.threads) |entry| { - entry.thread.join(); - } - } - }).run(); + for (ctx.threads) |entry| { + entry.thread.join(); + } } From c6fb968a3d702fbf8067164052ccf562f5e362ef Mon Sep 17 00:00:00 2001 From: kprotty Date: Sat, 26 Jun 2021 10:52:34 -0500 Subject: [PATCH 16/24] std.Thread: fix posix --- lib/std/Thread.zig | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/lib/std/Thread.zig b/lib/std/Thread.zig index 19987d47a1..db049b5d21 100644 --- a/lib/std/Thread.zig +++ b/lib/std/Thread.zig @@ -376,11 +376,16 @@ const PosixThreadImpl = struct { fn spawn(config: SpawnConfig, comptime f: 
anytype, args: anytype) !Impl { const Args = @TypeOf(args); const allocator = std.heap.c_allocator; + const Instance = struct { fn entryFn(raw_arg: ?*c_void) callconv(.C) ?*c_void { - const args_ptr = @ptrCast(*Args, @alignCast(@alignOf(Args), raw_arg orelse unreachable)); - defer allocator.destroy(args_ptr); - return callFn(f, args_ptr.*); + if (@sizeOf(Args) < 1) { + return callFn(f, @as(Args, undefined)); + } + + const args_ptr = @ptrCast(*Args, @alignCast(@alignOf(Args), raw_arg)); + defer allocator.destroy(args_ptr); + return callFn(f, args_ptr.*); } }; @@ -402,7 +407,7 @@ const PosixThreadImpl = struct { &handle, &attr, Instance.entryFn, - @ptrCast(*c_void, args_ptr), + if (@sizeOf(Args) > 1) @ptrCast(*c_void, args_ptr) else undefined, )) { 0 => return Impl{ .handle = handle }, os.EAGAIN => return error.SystemResources, From 7b323f84ca876c86bbe06f132d5a5d3775def3a2 Mon Sep 17 00:00:00 2001 From: kprotty Date: Sat, 26 Jun 2021 13:00:54 -0500 Subject: [PATCH 17/24] std.Thread: more fixes --- lib/std/Thread.zig | 40 ++++++++++++++++++++++++++++++++++++++-- lib/std/Thread/Futex.zig | 25 ++++++++++++++++++++----- 2 files changed, 58 insertions(+), 7 deletions(-) diff --git a/lib/std/Thread.zig b/lib/std/Thread.zig index db049b5d21..cbdcf00e9f 100644 --- a/lib/std/Thread.zig +++ b/lib/std/Thread.zig @@ -45,7 +45,7 @@ else if (use_pthreads) else if (target.os.tag == .linux) LinuxThreadImpl else - @compileLog("Unsupported operating system", target.os.tag); + UnsupportedImpl; impl: Impl, @@ -200,6 +200,40 @@ fn callFn(comptime f: anytype, args: anytype) switch (Impl) { } } +const UnsupportedImpl = struct { + pub const ThreadHandle = void; + + fn getCurrentId() u64 { + return unsupported({}); + } + + fn getCpuCount() !usize { + return unsupported({}); + } + + fn spawn(config: SpawnConfig, comptime f: anytype, args: anytype) !Impl { + return unsupported(.{config, f, args}); + } + + fn getHandle(self: Impl) ThreadHandle { + return unsupported(self); + } + + fn 
detach(self: Impl) void { + return unsupported(self); + } + + fn join(self: Impl) void { + return unsupported(self); + } + + fn unsupported(unusued: anytype) noreturn { + @compileLog("Unsupported operating system", target.os.tag); + _ = unusued; + unreachable; + } +}; + const WindowsThreadImpl = struct { const windows = os.windows; @@ -725,7 +759,9 @@ const LinuxThreadImpl = struct { \\ li a7, 93 \\ ecall ), - else => @compileError("Platform not supported"), + else => |cpu_arch| { + @compileLog("linux arch", cpu_arch, "is not supported"); + }, }); } } diff --git a/lib/std/Thread/Futex.zig b/lib/std/Thread/Futex.zig index b1b2128caa..de7dd5e73b 100644 --- a/lib/std/Thread/Futex.zig +++ b/lib/std/Thread/Futex.zig @@ -64,10 +64,9 @@ pub fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{TimedOut} /// Unblocks at most `num_waiters` callers blocked in a `wait()` call on `ptr`. /// `num_waiters` of 1 unblocks at most one `wait(ptr, ...)` and `maxInt(u32)` unblocks effectively all `wait(ptr, ...)`. 
pub fn wake(ptr: *const Atomic(u32), num_waiters: u32) void { - if (num_waiters == 0 or single_threaded) { - return; - } - + if (single_threaded) return; + if (num_waiters == 0) return; + return OsFutex.wake(ptr, num_waiters); } @@ -80,7 +79,23 @@ else if (target.isDarwin()) else if (std.builtin.link_libc) PosixFutex else - @compileError("Operating System unsupported"); + UnsupportedFutex; + +const UnsupportedFutex = struct { + fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{TimedOut}!void { + return unsupported(.{ptr, expect, timeout}); + } + + fn wake(ptr: *const Atomic(u32), num_waiters: u32) void { + return unsupported(.{ptr, num_waiters}); + } + + fn unsupported(unused: anytype) noreturn { + @compileLog("Unsupported operating system", target.os.tag); + _ = unused; + unreachable; + } +}; const WindowsFutex = struct { const windows = std.os.windows; From f0fa129e9b1cdbd90b231da14c6cd99c9413aa98 Mon Sep 17 00:00:00 2001 From: kprotty Date: Mon, 28 Jun 2021 11:27:23 -0500 Subject: [PATCH 18/24] std.Thread: more cleanup & testing --- doc/langref.html.in | 12 +- lib/std/Thread.zig | 300 ++++++++++++++++++++++----------------- lib/std/Thread/Futex.zig | 6 +- lib/std/c.zig | 1 + lib/std/os/test.zig | 11 +- 5 files changed, 184 insertions(+), 146 deletions(-) diff --git a/doc/langref.html.in b/doc/langref.html.in index 667b4cd2a7..22965f667e 100644 --- a/doc/langref.html.in +++ b/doc/langref.html.in @@ -958,14 +958,14 @@ const assert = std.debug.assert; threadlocal var x: i32 = 1234; test "thread local storage" { - const thread1 = try std.Thread.spawn(testTls, {}); - const thread2 = try std.Thread.spawn(testTls, {}); - testTls({}); - thread1.wait(); - thread2.wait(); + const thread1 = try std.Thread.spawn(.{}, testTls, .{}); + const thread2 = try std.Thread.spawn(.{}, testTls, .{}); + testTls(); + thread1.join(); + thread2.join(); } -fn testTls(_: void) void { +fn testTls() void { assert(x == 1234); x += 1; assert(x == 1235); diff --git 
a/lib/std/Thread.zig b/lib/std/Thread.zig index cbdcf00e9f..73678714d0 100644 --- a/lib/std/Thread.zig +++ b/lib/std/Thread.zig @@ -24,17 +24,6 @@ pub const Condition = @import("Thread/Condition.zig"); pub const spinLoopHint = @compileError("deprecated: use std.atomic.spinLoopHint"); -test "std.Thread" { - // Doesn't use testing.refAllDecls() since that would pull in the compileError spinLoopHint. - _ = AutoResetEvent; - _ = Futex; - _ = ResetEvent; - _ = StaticResetEvent; - _ = Mutex; - _ = Semaphore; - _ = Condition; -} - pub const use_pthreads = target.os.tag != .windows and std.builtin.link_libc; const Thread = @This(); @@ -50,7 +39,6 @@ else impl: Impl, /// Represents a unique ID per thread. -/// May be an integer or pointer depending on the platform. pub const Id = u64; /// Returns the platform ID of the callers thread. @@ -79,7 +67,7 @@ pub const SpawnConfig = struct { stack_size: usize = 16 * 1024 * 1024, }; -pub const SpawnError = error { +pub const SpawnError = error{ /// A system-imposed limit on the number of threads was encountered. /// There are a number of limits that may trigger this error: /// * the RLIMIT_NPROC soft resource limit (set via setrlimit(2)), @@ -115,7 +103,7 @@ pub const SpawnError = error { /// or call `detach()` to excuse the caller from calling `join()` and have the thread clean up its resources on completion`. pub fn spawn(config: SpawnConfig, comptime function: anytype, args: anytype) SpawnError!Thread { if (std.builtin.single_threaded) { - @compileError("cannot spawn thread when building in single-threaded mode"); + @compileError("Cannot spawn thread when building in single-threaded mode"); } const impl = try Impl.spawn(config, function, args); @@ -132,11 +120,13 @@ pub fn getHandle(self: Thread) Handle { } /// Release the obligation of the caller to call `join()` and have the thread clean up its own resources on completion. 
+/// Once called, this consumes the Thread object and invoking any other functions on it is considered undefined behavior. pub fn detach(self: Thread) void { return self.impl.detach(); } /// Waits for the thread to complete, then deallocates any resources created on `spawn()`. +/// Once called, this consumes the Thread object and invoking any other functions on it is considered undefined behavior. pub fn join(self: Thread) void { return self.impl.join(); } @@ -200,6 +190,8 @@ fn callFn(comptime f: anytype, args: anytype) switch (Impl) { } } +/// We can't compile error in the `Impl` switch statement as its eagerly evaluated. +/// So instead, we compile-error on the methods themselves for platforms which don't support threads. const UnsupportedImpl = struct { pub const ThreadHandle = void; @@ -212,7 +204,7 @@ const UnsupportedImpl = struct { } fn spawn(config: SpawnConfig, comptime f: anytype, args: anytype) !Impl { - return unsupported(.{config, f, args}); + return unsupported(.{ config, f, args }); } fn getHandle(self: Impl) ThreadHandle { @@ -225,7 +217,7 @@ const UnsupportedImpl = struct { fn join(self: Impl) void { return unsupported(self); - } + } fn unsupported(unusued: anytype) noreturn { @compileLog("Unsupported operating system", target.os.tag); @@ -244,6 +236,7 @@ const WindowsThreadImpl = struct { } fn getCpuCount() !usize { + // Faster than calling into GetSystemInfo(), even if amortized. return windows.peb().NumberOfProcessors; } @@ -299,16 +292,17 @@ const WindowsThreadImpl = struct { // Its also fine if the limit here is incorrect as stack size is only a hint. 
var stack_size = std.math.cast(u32, config.stack_size) catch std.math.maxInt(u32); stack_size = std.math.max(64 * 1024, stack_size); - + instance.thread.thread_handle = windows.kernel32.CreateThread( - null, - stack_size, - Instance.entryFn, - @ptrCast(*c_void, instance), - 0, + null, + stack_size, + Instance.entryFn, + @ptrCast(*c_void, instance), + 0, null, ) orelse { - return windows.unexpectedError(windows.kernel32.GetLastError()); + const errno = windows.kernel32.GetLastError(); + return windows.unexpectedError(errno); }; return Impl{ .thread = &instance.thread }; @@ -332,7 +326,7 @@ const WindowsThreadImpl = struct { windows.CloseHandle(self.thread.thread_handle); assert(self.thread.completion.load(.SeqCst) == .completed); self.thread.free(); - } + } }; const PosixThreadImpl = struct { @@ -374,7 +368,9 @@ const PosixThreadImpl = struct { fn getCpuCount() !usize { switch (target.os.tag) { - .linux => return LinuxThreadImpl.getCpuCount(), + .linux => { + return LinuxThreadImpl.getCpuCount(); + }, .openbsd => { var count: c_int = undefined; var count_size: usize = @sizeOf(c_int); @@ -413,6 +409,7 @@ const PosixThreadImpl = struct { const Instance = struct { fn entryFn(raw_arg: ?*c_void) callconv(.C) ?*c_void { + // @alignCast() below doesn't support zero-sized-types (ZST) if (@sizeOf(Args) < 1) { return callFn(f, @as(Args, undefined)); } @@ -457,8 +454,9 @@ const PosixThreadImpl = struct { fn detach(self: Impl) void { switch (c.pthread_detach(self.handle)) { - os.EINVAL => unreachable, - os.ESRCH => unreachable, + 0 => {}, + os.EINVAL => unreachable, // thread handle is not joinable + os.ESRCH => unreachable, // thread handle is invalid else => unreachable, } } @@ -466,9 +464,9 @@ const PosixThreadImpl = struct { fn join(self: Impl) void { switch (c.pthread_join(self.handle, null)) { 0 => {}, - os.EINVAL => unreachable, - os.ESRCH => unreachable, - os.EDEADLK => unreachable, + os.EINVAL => unreachable, // thread handle is not joinable (or another thread is 
already joining in) + os.ESRCH => unreachable, // thread handle is invalid + os.EDEADLK => unreachable, // two threads tried to join each other else => unreachable, } } @@ -476,7 +474,7 @@ const PosixThreadImpl = struct { const LinuxThreadImpl = struct { const linux = os.linux; - + pub const ThreadHandle = i32; threadlocal var tls_thread_id: ?Id = null; @@ -491,7 +489,8 @@ const LinuxThreadImpl = struct { fn getCpuCount() !usize { const cpu_set = try os.sched_getaffinity(0); - return @as(usize, os.CPU_COUNT(cpu_set)); // TODO should not need this usize cast + // TODO: should not need this usize cast + return @as(usize, os.CPU_COUNT(cpu_set)); } thread: *ThreadCompletion, @@ -547,7 +546,7 @@ const LinuxThreadImpl = struct { bytes = std.mem.alignForward(bytes, std.mem.page_size); break :blk bytes; }; - + // map all memory needed without read/write permissions // to avoid committing the whole region right away const mapped = os.mmap( @@ -654,7 +653,7 @@ const LinuxThreadImpl = struct { switch (linux.getErrno(linux.futex_wait( &self.thread.child_tid.value, - linux.FUTEX_WAIT, + linux.FUTEX_WAIT, tid, null, ))) { @@ -671,98 +670,145 @@ const LinuxThreadImpl = struct { extern fn __unmap_and_exit(ptr: usize, len: usize) callconv(.C) noreturn; comptime { if (target.os.tag == .linux) { - asm(switch (target.cpu.arch) { - .i386 => ( - \\.text - \\.global __unmap_and_exit - \\.type __unmap_and_exit, @function - \\__unmap_and_exit: - \\ movl $91, %eax - \\ movl 4(%esp), %ebx - \\ movl 8(%esp), %ecx - \\ int $128 - \\ xorl %ebx, %ebx - \\ movl $1, %eax - \\ int $128 - ), - .x86_64 => ( - \\.text - \\.global __unmap_and_exit - \\.type __unmap_and_exit, @function - \\__unmap_and_exit: - \\ movl $11, %eax - \\ syscall - \\ xor %rdi, %rdi - \\ movl $60, %eax - \\ syscall - ), - .arm, .armeb, .thumb, .thumbeb => ( - \\.syntax unified - \\.text - \\.global __unmap_and_exit - \\.type __unmap_and_exit, %function - \\__unmap_and_exit: - \\ mov r7, #91 - \\ svc 0 - \\ mov r7, #1 - \\ svc 
0 - ), - .aarch64, .aarch64_be, .aarch64_32 => ( - \\.global __unmap_and_exit - \\.type __unmap_and_exit, %function - \\__unmap_and_exit: - \\ mov x8, #215 - \\ svc 0 - \\ mov x8, #93 - \\ svc 0 - ), - .mips, .mipsel, => ( - \\.set noreorder - \\.global __unmap_and_exit - \\.type __unmap_and_exit,@function - \\__unmap_and_exit: - \\ move $sp, $25 - \\ li $2, 4091 - \\ syscall - \\ li $4, 0 - \\ li $2, 4001 - \\ syscall - ), - .mips64, .mips64el => ( - \\.set noreorder - \\.global __unmap_and_exit - \\.type __unmap_and_exit, @function - \\__unmap_and_exit: - \\ li $2, 4091 - \\ syscall - \\ li $4, 0 - \\ li $2, 4001 - \\ syscall - ), - .powerpc, .powerpc64, .powerpc64le => ( - \\.text - \\.global __unmap_and_exit - \\.type __unmap_and_exit, %function - \\__unmap_and_exit: - \\ li 0, 91 - \\ sc - \\ li 0, 1 - \\ sc - \\ blr - ), - .riscv64 => ( - \\.global __unmap_and_exit - \\.type __unmap_and_exit, %function - \\__unmap_and_exit: - \\ li a7, 215 - \\ ecall - \\ li a7, 93 - \\ ecall - ), - else => |cpu_arch| { - @compileLog("linux arch", cpu_arch, "is not supported"); - }, - }); + asm (switch (target.cpu.arch) { + .i386 => ( + \\.text + \\.global __unmap_and_exit + \\.type __unmap_and_exit, @function + \\__unmap_and_exit: + \\ movl $91, %eax + \\ movl 4(%esp), %ebx + \\ movl 8(%esp), %ecx + \\ int $128 + \\ xorl %ebx, %ebx + \\ movl $1, %eax + \\ int $128 + ), + .x86_64 => ( + \\.text + \\.global __unmap_and_exit + \\.type __unmap_and_exit, @function + \\__unmap_and_exit: + \\ movl $11, %eax + \\ syscall + \\ xor %rdi, %rdi + \\ movl $60, %eax + \\ syscall + ), + .arm, .armeb, .thumb, .thumbeb => ( + \\.syntax unified + \\.text + \\.global __unmap_and_exit + \\.type __unmap_and_exit, %function + \\__unmap_and_exit: + \\ mov r7, #91 + \\ svc 0 + \\ mov r7, #1 + \\ svc 0 + ), + .aarch64, .aarch64_be, .aarch64_32 => ( + \\.global __unmap_and_exit + \\.type __unmap_and_exit, %function + \\__unmap_and_exit: + \\ mov x8, #215 + \\ svc 0 + \\ mov x8, #93 + \\ svc 0 + ), + 
.mips, + .mipsel, + => ( + \\.set noreorder + \\.global __unmap_and_exit + \\.type __unmap_and_exit,@function + \\__unmap_and_exit: + \\ move $sp, $25 + \\ li $2, 4091 + \\ syscall + \\ li $4, 0 + \\ li $2, 4001 + \\ syscall + ), + .mips64, .mips64el => ( + \\.set noreorder + \\.global __unmap_and_exit + \\.type __unmap_and_exit, @function + \\__unmap_and_exit: + \\ li $2, 4091 + \\ syscall + \\ li $4, 0 + \\ li $2, 4001 + \\ syscall + ), + .powerpc, .powerpc64, .powerpc64le => ( + \\.text + \\.global __unmap_and_exit + \\.type __unmap_and_exit, %function + \\__unmap_and_exit: + \\ li 0, 91 + \\ sc + \\ li 0, 1 + \\ sc + \\ blr + ), + .riscv64 => ( + \\.global __unmap_and_exit + \\.type __unmap_and_exit, %function + \\__unmap_and_exit: + \\ li a7, 215 + \\ ecall + \\ li a7, 93 + \\ ecall + ), + else => |cpu_arch| { + @compileLog("linux arch", cpu_arch, "is not supported"); + }, + }); } } -}; \ No newline at end of file +}; + +test "std.Thread" { + // Doesn't use testing.refAllDecls() since that would pull in the compileError spinLoopHint. 
+ _ = AutoResetEvent; + _ = Futex; + _ = ResetEvent; + _ = StaticResetEvent; + _ = Mutex; + _ = Semaphore; + _ = Condition; +} + +fn testIncrementNotify(value: *usize, event: *ResetEvent) void { + value.* += 1; + event.set(); +} + +test "Thread.join" { + if (std.builtin.single_threaded) return error.SkipZigTest; + + var value: usize = 0; + var event: ResetEvent = undefined; + try event.init(); + defer event.deinit(); + + const thread = try Thread.spawn(.{}, testIncrementNotify, .{&value, &event}); + thread.join(); + + try std.testing.expectEqual(value, 1); +} + +test "Thread.detach" { + if (std.builtin.single_threaded) return error.SkipZigTest; + + var value: usize = 0; + var event: ResetEvent = undefined; + try event.init(); + defer event.deinit(); + + const thread = try Thread.spawn(.{}, testIncrementNotify, .{&value, &event}); + thread.detach(); + + event.wait(); + try std.testing.expectEqual(value, 1); +} \ No newline at end of file diff --git a/lib/std/Thread/Futex.zig b/lib/std/Thread/Futex.zig index de7dd5e73b..4725deb2e3 100644 --- a/lib/std/Thread/Futex.zig +++ b/lib/std/Thread/Futex.zig @@ -407,7 +407,7 @@ test "Futex - wait/wake" { test "Futex - Signal" { if (single_threaded) { - return; + return error.SkipZigTest; } const Paddle = struct { @@ -449,7 +449,7 @@ test "Futex - Signal" { test "Futex - Broadcast" { if (single_threaded) { - return; + return error.SkipZigTest; } const Context = struct { @@ -506,7 +506,7 @@ test "Futex - Broadcast" { test "Futex - Chain" { if (single_threaded) { - return; + return error.SkipZigTest; } const Signal = struct { diff --git a/lib/std/c.zig b/lib/std/c.zig index 2a8915bd1b..a3fe814a71 100644 --- a/lib/std/c.zig +++ b/lib/std/c.zig @@ -277,6 +277,7 @@ pub extern "c" fn pthread_attr_setguardsize(attr: *pthread_attr_t, guardsize: us pub extern "c" fn pthread_attr_destroy(attr: *pthread_attr_t) c_int; pub extern "c" fn pthread_self() pthread_t; pub extern "c" fn pthread_join(thread: pthread_t, arg_return: ?*?*c_void) 
c_int; +pub extern "c" fn pthread_detach(thread: pthread_t) c_int; pub extern "c" fn pthread_atfork( prepare: ?fn () callconv(.C) void, parent: ?fn () callconv(.C) void, diff --git a/lib/std/os/test.zig b/lib/std/os/test.zig index 750c63d447..6184c97706 100644 --- a/lib/std/os/test.zig +++ b/lib/std/os/test.zig @@ -321,17 +321,8 @@ test "std.Thread.getCurrentId" { var thread_current_id: Thread.Id = undefined; const thread = try Thread.spawn(.{}, testThreadIdFn, .{&thread_current_id}); - const thread_id = thread.getHandle(); thread.join(); - if (Thread.use_pthreads) { - try expect(thread_current_id == thread_id); - } else if (native_os == .windows) { - try expect(Thread.getCurrentId() != thread_current_id); - } else { - // If the thread completes very quickly, then thread_id can be 0. See the - // documentation comments for `std.Thread.handle`. - try expect(thread_id == 0 or thread_current_id == thread_id); - } + try expect(Thread.getCurrentId() != thread_current_id); } test "spawn threads" { From 6dd6604638b6796a01951f83668283d9f3c4c5b8 Mon Sep 17 00:00:00 2001 From: kprotty Date: Mon, 28 Jun 2021 14:17:16 -0500 Subject: [PATCH 19/24] std.Thread: fix tls i386 linux typo --- lib/std/Thread.zig | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/std/Thread.zig b/lib/std/Thread.zig index 73678714d0..4038f755b7 100644 --- a/lib/std/Thread.zig +++ b/lib/std/Thread.zig @@ -581,7 +581,7 @@ const LinuxThreadImpl = struct { defer tls_ptr = @ptrToInt(&user_desc); user_desc = .{ .entry_number = os.linux.tls.tls_image.gdt_entry_number, - .base_addr = tks_ptr, + .base_addr = tls_ptr, .limit = 0xfffff, .seg_32bit = 1, .contents = 0, // Data From 2a6ba410209e867ced3b82cf7326960137995853 Mon Sep 17 00:00:00 2001 From: kprotty Date: Tue, 29 Jun 2021 11:51:50 -0500 Subject: [PATCH 20/24] std.Thread: add CLONE_CHILD_SETTID to fix join() --- lib/std/Thread.zig | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/std/Thread.zig 
b/lib/std/Thread.zig index 4038f755b7..ae1462eb29 100644 --- a/lib/std/Thread.zig +++ b/lib/std/Thread.zig @@ -598,10 +598,10 @@ const LinuxThreadImpl = struct { .thread = .{ .mapped = mapped }, }; - const flags: u32 = os.CLONE_VM | os.CLONE_FS | os.CLONE_FILES | - os.CLONE_SIGHAND | os.CLONE_THREAD | os.CLONE_SYSVSEM | - os.CLONE_PARENT_SETTID | os.CLONE_CHILD_CLEARTID | - os.CLONE_DETACHED | os.CLONE_SETTLS; + const flags: u32 = os.CLONE_THREAD | os.CLONE_DETACHED | + os.CLONE_VM | os.CLONE_FS | os.CLONE_FILES | + os.CLONE_SIGHAND | os.CLONE_SYSVSEM | os.CLONE_SETTLS | + os.CLONE_PARENT_SETTID | os.CLONE_CHILD_SETTID | os.CLONE_CHILD_CLEARTID; switch (linux.getErrno(linux.clone( Instance.entryFn, From 2309c81a7812fab071fd2994bb3074edf748de63 Mon Sep 17 00:00:00 2001 From: kprotty Date: Wed, 30 Jun 2021 11:55:52 -0500 Subject: [PATCH 21/24] std.Thread: non-zero child_tid to avoid racy join() --- lib/std/Thread.zig | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/lib/std/Thread.zig b/lib/std/Thread.zig index ae1462eb29..0e82158889 100644 --- a/lib/std/Thread.zig +++ b/lib/std/Thread.zig @@ -497,7 +497,7 @@ const LinuxThreadImpl = struct { const ThreadCompletion = struct { completion: Completion = Completion.init(.running), - child_tid: Atomic(i32) = Atomic(i32).init(0), + child_tid: Atomic(i32) = Atomic(i32).init(1), parent_tid: i32 = undefined, mapped: []align(std.mem.page_size) u8, }; @@ -510,7 +510,7 @@ const LinuxThreadImpl = struct { fn entryFn(raw_arg: usize) callconv(.C) u8 { const self = @intToPtr(*@This(), raw_arg); - defer switch (self.thread.completion.swap(.completed, .Acquire)) { + defer switch (self.thread.completion.swap(.completed, .SeqCst)) { .running => {}, .completed => unreachable, .detached => { @@ -600,8 +600,8 @@ const LinuxThreadImpl = struct { const flags: u32 = os.CLONE_THREAD | os.CLONE_DETACHED | os.CLONE_VM | os.CLONE_FS | os.CLONE_FILES | - os.CLONE_SIGHAND | os.CLONE_SYSVSEM | os.CLONE_SETTLS | - 
os.CLONE_PARENT_SETTID | os.CLONE_CHILD_SETTID | os.CLONE_CHILD_CLEARTID; + os.CLONE_PARENT_SETTID | os.CLONE_CHILD_CLEARTID | + os.CLONE_SIGHAND | os.CLONE_SYSVSEM | os.CLONE_SETTLS; switch (linux.getErrno(linux.clone( Instance.entryFn, @@ -628,7 +628,7 @@ const LinuxThreadImpl = struct { } fn detach(self: Impl) void { - switch (self.thread.completion.swap(.detached, .AcqRel)) { + switch (self.thread.completion.swap(.detached, .SeqCst)) { .running => {}, .completed => self.join(), .detached => unreachable, @@ -640,7 +640,7 @@ const LinuxThreadImpl = struct { var spin: u8 = 10; while (true) { - const tid = self.thread.child_tid.load(.Acquire); + const tid = self.thread.child_tid.load(.SeqCst); if (tid == 0) { break; } From 98106b09d5bf1bcc1be4fe09a7fa645b3b343732 Mon Sep 17 00:00:00 2001 From: kprotty Date: Wed, 30 Jun 2021 21:49:38 -0500 Subject: [PATCH 22/24] zig fmt --- lib/std/Thread.zig | 6 +++--- lib/std/Thread/Futex.zig | 17 ++++++++--------- tools/update_cpu_features.zig | 2 +- 3 files changed, 12 insertions(+), 13 deletions(-) diff --git a/lib/std/Thread.zig b/lib/std/Thread.zig index 0e82158889..8f245c257c 100644 --- a/lib/std/Thread.zig +++ b/lib/std/Thread.zig @@ -792,7 +792,7 @@ test "Thread.join" { try event.init(); defer event.deinit(); - const thread = try Thread.spawn(.{}, testIncrementNotify, .{&value, &event}); + const thread = try Thread.spawn(.{}, testIncrementNotify, .{ &value, &event }); thread.join(); try std.testing.expectEqual(value, 1); @@ -806,9 +806,9 @@ test "Thread.detach" { try event.init(); defer event.deinit(); - const thread = try Thread.spawn(.{}, testIncrementNotify, .{&value, &event}); + const thread = try Thread.spawn(.{}, testIncrementNotify, .{ &value, &event }); thread.detach(); event.wait(); try std.testing.expectEqual(value, 1); -} \ No newline at end of file +} diff --git a/lib/std/Thread/Futex.zig b/lib/std/Thread/Futex.zig index 4725deb2e3..2a18711231 100644 --- a/lib/std/Thread/Futex.zig +++ b/lib/std/Thread/Futex.zig 
@@ -66,7 +66,7 @@ pub fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{TimedOut} pub fn wake(ptr: *const Atomic(u32), num_waiters: u32) void { if (single_threaded) return; if (num_waiters == 0) return; - + return OsFutex.wake(ptr, num_waiters); } @@ -83,11 +83,11 @@ else const UnsupportedFutex = struct { fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{TimedOut}!void { - return unsupported(.{ptr, expect, timeout}); + return unsupported(.{ ptr, expect, timeout }); } fn wake(ptr: *const Atomic(u32), num_waiters: u32) void { - return unsupported(.{ptr, num_waiters}); + return unsupported(.{ ptr, num_waiters }); } fn unsupported(unused: anytype) noreturn { @@ -417,7 +417,6 @@ test "Futex - Signal" { fn run(self: *@This(), hit_to: *@This()) !void { var iterations: usize = 4; while (iterations > 0) : (iterations -= 1) { - var value: u32 = undefined; while (true) { value = self.value.load(.Acquire); @@ -427,7 +426,7 @@ test "Futex - Signal" { try testing.expectEqual(value, self.current + 1); self.current = value; - + _ = hit_to.value.fetchAdd(1, .Release); Futex.wake(&hit_to.value, 1); } @@ -437,10 +436,10 @@ test "Futex - Signal" { var ping = Paddle{}; var pong = Paddle{}; - const t1 = try std.Thread.spawn(.{}, Paddle.run, .{&ping, &pong}); + const t1 = try std.Thread.spawn(.{}, Paddle.run, .{ &ping, &pong }); defer t1.join(); - const t2 = try std.Thread.spawn(.{}, Paddle.run, .{&pong, &ping}); + const t2 = try std.Thread.spawn(.{}, Paddle.run, .{ &pong, &ping }); defer t2.join(); _ = ping.value.fetchAdd(1, .Release); @@ -497,7 +496,7 @@ test "Futex - Broadcast" { // Try to wait for the threads to start before running runSender(). // NOTE: not actually needed for correctness. 
- std.time.sleep(16 * std.time.ns_per_ms); + std.time.sleep(16 * std.time.ns_per_ms); try ctx.runSender(); const notified = ctx.notified.load(.Monotonic); @@ -551,7 +550,7 @@ test "Futex - Chain" { var ctx = Context{}; for (ctx.threads) |*entry, index| { entry.signal = .{}; - entry.thread = try std.Thread.spawn(.{}, Context.run, .{&ctx, index}); + entry.thread = try std.Thread.spawn(.{}, Context.run, .{ &ctx, index }); } ctx.threads[0].signal.notify(); diff --git a/tools/update_cpu_features.zig b/tools/update_cpu_features.zig index 4694e3de25..bdc82fa22e 100644 --- a/tools/update_cpu_features.zig +++ b/tools/update_cpu_features.zig @@ -819,7 +819,7 @@ pub fn main() anyerror!void { var threads = try arena.alloc(std.Thread, llvm_targets.len); for (llvm_targets) |llvm_target, i| { threads[i] = try std.Thread.spawn(.{}, processOneTarget, .{ - Job{ + Job{ .llvm_tblgen_exe = llvm_tblgen_exe, .llvm_src_root = llvm_src_root, .zig_src_dir = zig_src_dir, From 483eb8e05788f2ee258bc1719f210c51a2057dd6 Mon Sep 17 00:00:00 2001 From: kprotty Date: Thu, 1 Jul 2021 17:34:23 -0500 Subject: [PATCH 23/24] std.Thread: move linux detach code to inline asm --- lib/std/Thread.zig | 202 +++++++++++++++++++++------------------------ 1 file changed, 96 insertions(+), 106 deletions(-) diff --git a/lib/std/Thread.zig b/lib/std/Thread.zig index 8f245c257c..37b0a8751f 100644 --- a/lib/std/Thread.zig +++ b/lib/std/Thread.zig @@ -500,6 +500,101 @@ const LinuxThreadImpl = struct { child_tid: Atomic(i32) = Atomic(i32).init(1), parent_tid: i32 = undefined, mapped: []align(std.mem.page_size) u8, + + /// Calls `munmap(mapped.ptr, mapped.len)` then `exit(1)` without touching the stack (which lives in `mapped.ptr`). 
+ /// Ported over from musl libc's pthread detached implementation: + /// https://github.com/ifduyue/musl/search?q=__unmapself + fn freeAndExit(self: *ThreadCompletion) noreturn { + const unmap_and_exit: []const u8 = switch (target.cpu.arch) { + .i386 => ( + \\ movl $91, %%eax + \\ movl %[ptr], %%ebx + \\ movl %[len], %%ecx + \\ int $128 + \\ movl $1, %%eax + \\ movl $0, %%ebx + \\ int $128 + ), + .x86_64 => ( + \\ movq $11, %%rax + \\ movq %[ptr], %%rbx + \\ movq %[len], %%rcx + \\ syscall + \\ movq $60, %%rax + \\ movq $1, %%rdi + \\ syscall + ), + .arm, .armeb, .thumb, .thumbeb => ( + \\ mov r7, #91 + \\ mov r0, %[ptr] + \\ mov r1, %[len] + \\ svc 0 + \\ mov r7, #1 + \\ mov r0, #0 + \\ svc 0 + ), + .aarch64, .aarch64_be, .aarch64_32 => ( + \\ mov x8, #215 + \\ mov x0, %[ptr] + \\ mov x1, %[len] + \\ svc 0 + \\ mov x8, #93 + \\ mov x0, #0 + \\ svc 0 + ), + .mips, .mipsel => ( + \\ move $sp, $25 + \\ li $2, 4091 + \\ move $4, %[ptr] + \\ move $5, %[len] + \\ syscall + \\ li $2, 4001 + \\ li $4, 0 + \\ syscall + ), + .mips64, .mips64el => ( + \\ li $2, 4091 + \\ move $4, %[ptr] + \\ move $5, %[len] + \\ syscall + \\ li $2, 4001 + \\ li $4, 0 + \\ syscall + ), + .powerpc, .powerpcle, .powerpc64, .powerpc64le => ( + \\ li 0, 91 + \\ mr %[ptr], 3 + \\ mr %[len], 4 + \\ sc + \\ li 0, 1 + \\ li 3, 0 + \\ sc + \\ blr + ), + .riscv64 => ( + \\ li a7, 215 + \\ mv a0, %[ptr] + \\ mv a1, %[len] + \\ ecall + \\ li a7, 93 + \\ mv a0, zero + \\ ecall + ), + else => |cpu_arch| { + @compileLog("Unsupported linux arch ", cpu_arch); + }, + }; + + asm volatile ( + unmap_and_exit + : + : [ptr] "r" (@ptrToInt(self.mapped.ptr)), + [len] "r" (self.mapped.len) + : "memory" + ); + + unreachable; + } }; fn spawn(config: SpawnConfig, comptime f: anytype, args: anytype) !Impl { @@ -513,10 +608,7 @@ const LinuxThreadImpl = struct { defer switch (self.thread.completion.swap(.completed, .SeqCst)) { .running => {}, .completed => unreachable, - .detached => { - const memory = self.thread.mapped; 
- __unmap_and_exit(@ptrToInt(memory.ptr), memory.len); - }, + .detached => self.thread.freeAndExit(), }; return callFn(f, self.fn_args); } @@ -664,108 +756,6 @@ const LinuxThreadImpl = struct { } } } - - // Calls `munmap(ptr, len)` then `exit(1)` without touching the stack (which lives in `ptr`). - // Ported over from musl libc's pthread detached implementation (`__unmapself`). - extern fn __unmap_and_exit(ptr: usize, len: usize) callconv(.C) noreturn; - comptime { - if (target.os.tag == .linux) { - asm (switch (target.cpu.arch) { - .i386 => ( - \\.text - \\.global __unmap_and_exit - \\.type __unmap_and_exit, @function - \\__unmap_and_exit: - \\ movl $91, %eax - \\ movl 4(%esp), %ebx - \\ movl 8(%esp), %ecx - \\ int $128 - \\ xorl %ebx, %ebx - \\ movl $1, %eax - \\ int $128 - ), - .x86_64 => ( - \\.text - \\.global __unmap_and_exit - \\.type __unmap_and_exit, @function - \\__unmap_and_exit: - \\ movl $11, %eax - \\ syscall - \\ xor %rdi, %rdi - \\ movl $60, %eax - \\ syscall - ), - .arm, .armeb, .thumb, .thumbeb => ( - \\.syntax unified - \\.text - \\.global __unmap_and_exit - \\.type __unmap_and_exit, %function - \\__unmap_and_exit: - \\ mov r7, #91 - \\ svc 0 - \\ mov r7, #1 - \\ svc 0 - ), - .aarch64, .aarch64_be, .aarch64_32 => ( - \\.global __unmap_and_exit - \\.type __unmap_and_exit, %function - \\__unmap_and_exit: - \\ mov x8, #215 - \\ svc 0 - \\ mov x8, #93 - \\ svc 0 - ), - .mips, - .mipsel, - => ( - \\.set noreorder - \\.global __unmap_and_exit - \\.type __unmap_and_exit,@function - \\__unmap_and_exit: - \\ move $sp, $25 - \\ li $2, 4091 - \\ syscall - \\ li $4, 0 - \\ li $2, 4001 - \\ syscall - ), - .mips64, .mips64el => ( - \\.set noreorder - \\.global __unmap_and_exit - \\.type __unmap_and_exit, @function - \\__unmap_and_exit: - \\ li $2, 4091 - \\ syscall - \\ li $4, 0 - \\ li $2, 4001 - \\ syscall - ), - .powerpc, .powerpc64, .powerpc64le => ( - \\.text - \\.global __unmap_and_exit - \\.type __unmap_and_exit, %function - \\__unmap_and_exit: - \\ li 
0, 91 - \\ sc - \\ li 0, 1 - \\ sc - \\ blr - ), - .riscv64 => ( - \\.global __unmap_and_exit - \\.type __unmap_and_exit, %function - \\__unmap_and_exit: - \\ li a7, 215 - \\ ecall - \\ li a7, 93 - \\ ecall - ), - else => |cpu_arch| { - @compileLog("linux arch", cpu_arch, "is not supported"); - }, - }); - } - } }; test "std.Thread" { From c8f90a7e7e10be62634454bf124bef3c6130a0db Mon Sep 17 00:00:00 2001 From: kprotty Date: Sat, 3 Jul 2021 11:49:07 -0500 Subject: [PATCH 24/24] zig fmt --- lib/std/Thread.zig | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/lib/std/Thread.zig b/lib/std/Thread.zig index 37b0a8751f..91f7ff58c3 100644 --- a/lib/std/Thread.zig +++ b/lib/std/Thread.zig @@ -585,8 +585,7 @@ const LinuxThreadImpl = struct { }, }; - asm volatile ( - unmap_and_exit + asm volatile (unmap_and_exit : : [ptr] "r" (@ptrToInt(self.mapped.ptr)), [len] "r" (self.mapped.len)