From 4eb4d26fa14524652bed69325eb491f39701d995 Mon Sep 17 00:00:00 2001 From: Andrew Kelley Date: Mon, 21 Dec 2020 13:15:04 -0700 Subject: [PATCH 01/14] std.Mutex: integrate with pthreads When using pthreads for threading, std.Mutex uses pthread_mutex_t as the implementation. This integrates better with tooling. --- lib/std/c.zig | 1 + lib/std/mutex.zig | 48 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 49 insertions(+) diff --git a/lib/std/c.zig b/lib/std/c.zig index aae3f383d1..b428c0907e 100644 --- a/lib/std/c.zig +++ b/lib/std/c.zig @@ -316,6 +316,7 @@ pub extern "c" fn dn_expand( pub const PTHREAD_MUTEX_INITIALIZER = pthread_mutex_t{}; pub extern "c" fn pthread_mutex_lock(mutex: *pthread_mutex_t) c_int; pub extern "c" fn pthread_mutex_unlock(mutex: *pthread_mutex_t) c_int; +pub extern "c" fn pthread_mutex_trylock(mutex: *pthread_mutex_t) c_int; pub extern "c" fn pthread_mutex_destroy(mutex: *pthread_mutex_t) c_int; pub const PTHREAD_COND_INITIALIZER = pthread_cond_t{}; diff --git a/lib/std/mutex.zig b/lib/std/mutex.zig index fb54e04289..349a250fea 100644 --- a/lib/std/mutex.zig +++ b/lib/std/mutex.zig @@ -37,6 +37,8 @@ pub const Mutex = if (builtin.single_threaded) Dummy else if (builtin.os.tag == .windows) WindowsMutex +else if (std.Thread.use_pthreads) + PthreadMutex else if (builtin.link_libc or builtin.os.tag == .linux) // stack-based version of https://github.com/Amanieu/parking_lot/blob/master/core/src/word_lock.rs struct { @@ -166,6 +168,52 @@ else if (builtin.link_libc or builtin.os.tag == .linux) else SpinLock; +pub const PthreadMutex = struct { + pthread_mutex: std.c.pthread_mutex_t = init, + + pub const Held = struct { + mutex: *PthreadMutex, + + pub fn release(self: Held) void { + switch (std.c.pthread_mutex_unlock(&self.mutex.pthread_mutex)) { + 0 => return, + std.c.EINVAL => unreachable, + std.c.EAGAIN => unreachable, + std.c.EPERM => unreachable, + else => unreachable, + } + } + }; + + /// Create a new mutex in unlocked state. + pub const init = std.c.PTHREAD_MUTEX_INITIALIZER; + + /// Try to acquire the mutex without blocking. Returns null if + /// the mutex is unavailable. Otherwise returns Held. Call + /// release on Held. + pub fn tryAcquire(self: *PthreadMutex) ?Held { + if (std.c.pthread_mutex_trylock(&self.pthread_mutex) == 0) { + return Held{ .mutex = self }; + } else { + return null; + } + } + + /// Acquire the mutex. Will deadlock if the mutex is already + /// held by the calling thread. + pub fn acquire(self: *PthreadMutex) Held { + switch (std.c.pthread_mutex_lock(&self.pthread_mutex)) { + 0 => return Held{ .mutex = self }, + std.c.EINVAL => unreachable, + std.c.EBUSY => unreachable, + std.c.EAGAIN => unreachable, + std.c.EDEADLK => unreachable, + std.c.EPERM => unreachable, + else => unreachable, + } + } +}; + /// This has the sematics as `Mutex`, however it does not actually do any /// synchronization. Operations are safety-checked no-ops. pub const Dummy = struct { From 829c00a77fd2d6b7576c6d2b724f69ba9cfe10f2 Mon Sep 17 00:00:00 2001 From: Andrew Kelley Date: Mon, 21 Dec 2020 14:21:51 -0700 Subject: [PATCH 02/14] kprotty ThreadPool and WaitGroup patch --- CMakeLists.txt | 1 - ci/drone/linux_script | 3 +- src/Event.zig | 43 ------------------------ src/ThreadPool.zig | 76 ++++++++++++++++++++----------------------- src/WaitGroup.zig | 38 ++++++++++++---------- 5 files changed, 57 insertions(+), 104 deletions(-) delete mode 100644 src/Event.zig diff --git a/CMakeLists.txt b/CMakeLists.txt index 8b4c067ae6..272cdc6921 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -512,7 +512,6 @@ set(ZIG_STAGE2_SOURCES "${CMAKE_SOURCE_DIR}/src/Cache.zig" "${CMAKE_SOURCE_DIR}/src/Compilation.zig" "${CMAKE_SOURCE_DIR}/src/DepTokenizer.zig" - "${CMAKE_SOURCE_DIR}/src/Event.zig" "${CMAKE_SOURCE_DIR}/src/Module.zig" "${CMAKE_SOURCE_DIR}/src/Package.zig" "${CMAKE_SOURCE_DIR}/src/RangeSet.zig" diff --git a/ci/drone/linux_script b/ci/drone/linux_script index 8c5dc1be2a..fdc1704fb7 100755 --- a/ci/drone/linux_script +++ b/ci/drone/linux_script @@ -17,8 +17,7 @@ git config core.abbrev 9 mkdir build cd build -# TODO figure out why Drone CI is deadlocking and stop passing -DZIG_SINGLE_THREADED=ON -cmake .. -DCMAKE_BUILD_TYPE=Release "-DCMAKE_INSTALL_PREFIX=$DISTDIR" -DZIG_STATIC=ON -DCMAKE_PREFIX_PATH=/deps/local -GNinja -DZIG_SINGLE_THREADED=ON +cmake .. -DCMAKE_BUILD_TYPE=Release "-DCMAKE_INSTALL_PREFIX=$DISTDIR" -DZIG_STATIC=ON -DCMAKE_PREFIX_PATH=/deps/local -GNinja samu install ./zig build test -Dskip-release -Dskip-non-native diff --git a/src/Event.zig b/src/Event.zig deleted file mode 100644 index 2b8d7be998..0000000000 --- a/src/Event.zig +++ /dev/null @@ -1,43 +0,0 @@ -// SPDX-License-Identifier: MIT -// Copyright (c) 2015-2020 Zig Contributors -// This file is part of [zig](https://ziglang.org/), which is MIT licensed. -// The MIT license requires this copyright notice to be included in all copies -// and substantial portions of the software. -const std = @import("std"); -const Event = @This(); - -lock: std.Mutex = .{}, -event: std.ResetEvent = undefined, -state: enum { empty, waiting, notified } = .empty, - -pub fn wait(self: *Event) void { - const held = self.lock.acquire(); - - switch (self.state) { - .empty => { - self.state = .waiting; - self.event = @TypeOf(self.event).init(); - held.release(); - self.event.wait(); - self.event.deinit(); - }, - .waiting => unreachable, - .notified => held.release(), - } -} - -pub fn set(self: *Event) void { - const held = self.lock.acquire(); - - switch (self.state) { - .empty => { - self.state = .notified; - held.release(); - }, - .waiting => { - held.release(); - self.event.set(); - }, - .notified => unreachable, - } -} diff --git a/src/ThreadPool.zig b/src/ThreadPool.zig index 00cb26772a..71c72fb8da 100644 --- a/src/ThreadPool.zig +++ b/src/ThreadPool.zig @@ -9,12 +9,12 @@ const ThreadPool = @This(); lock: std.Mutex = .{}, is_running: bool = true, allocator: *std.mem.Allocator, -running: usize = 0, +spawned: usize = 0, threads: []*std.Thread, run_queue: RunQueue = .{}, idle_queue: IdleQueue = .{}, -const IdleQueue = std.SinglyLinkedList(std.AutoResetEvent); +const IdleQueue = std.SinglyLinkedList(std.ResetEvent); const RunQueue = std.SinglyLinkedList(Runnable); const Runnable = struct { runFn: fn (*Runnable) void, @@ -30,49 +30,37 @@ pub fn init(self: *ThreadPool, allocator: *std.mem.Allocator) !void { errdefer self.deinit(); - var num_threads = std.Thread.cpuCount() catch 1; - if (num_threads > 0) - self.threads = try allocator.alloc(*std.Thread, num_threads); + var num_threads = std.math.max(1, std.Thread.cpuCount() catch 1); + self.threads = try allocator.alloc(*std.Thread, num_threads); while (num_threads > 0) : (num_threads -= 1) { const thread = try std.Thread.spawn(self, runWorker); - self.threads[self.running] = thread; - self.running += 1; + self.threads[self.spawned] = thread; + self.spawned += 1; } } pub fn deinit(self: *ThreadPool) void { - self.shutdown(); + { + const held = self.lock.acquire(); + defer held.release(); - std.debug.assert(!self.is_running); - for (self.threads[0..self.running]) |thread| + self.is_running = false; + while (self.idle_queue.popFirst()) |idle_node| + idle_node.data.set(); + } + + defer self.allocator.free(self.threads); + for (self.threads[0..self.spawned]) |thread| thread.wait(); - - defer self.threads = &[_]*std.Thread{}; - if (self.running > 0) - self.allocator.free(self.threads); -} - -pub fn shutdown(self: *ThreadPool) void { - const held = self.lock.acquire(); - - if (!self.is_running) - return held.release(); - - var idle_queue = self.idle_queue; - self.idle_queue = .{}; - self.is_running = false; - held.release(); - - while (idle_queue.popFirst()) |idle_node| - idle_node.data.set(); } pub fn spawn(self: *ThreadPool, comptime func: anytype, args: anytype) !void { if (std.builtin.single_threaded) { - @call(.{}, func, args); + const result = @call(.{}, func, args); return; } + const Args = @TypeOf(args); const Closure = struct { arguments: Args, @@ -83,24 +71,26 @@ pub fn spawn(self: *ThreadPool, comptime func: anytype, args: anytype) !void { const run_node = @fieldParentPtr(RunQueue.Node, "data", runnable); const closure = @fieldParentPtr(@This(), "run_node", run_node); const result = @call(.{}, func, closure.arguments); + + const held = closure.pool.lock.acquire(); + defer held.release(); closure.pool.allocator.destroy(closure); } }; + const held = self.lock.acquire(); + defer held.release(); + const closure = try self.allocator.create(Closure); closure.* = .{ .arguments = args, .pool = self, }; - const held = self.lock.acquire(); self.run_queue.prepend(&closure.run_node); - const idle_node = self.idle_queue.popFirst(); - held.release(); - - if (idle_node) |node| - node.data.set(); + if (self.idle_queue.popFirst()) |idle_node| + idle_node.data.set(); } fn runWorker(self: *ThreadPool) void { @@ -113,14 +103,18 @@ fn runWorker(self: *ThreadPool) void { continue; } - if (!self.is_running) { + if (self.is_running) { + var idle_node = IdleQueue.Node{ .data = std.ResetEvent.init() }; + defer idle_node.data.deinit(); + + self.idle_queue.prepend(&idle_node); held.release(); - return; + + idle_node.data.wait(); + continue; } - var idle_node = IdleQueue.Node{ .data = .{} }; - self.idle_queue.prepend(&idle_node); held.release(); - idle_node.data.wait(); + return; } } diff --git a/src/WaitGroup.zig b/src/WaitGroup.zig index 2c1b49224b..e5d4e600e2 100644 --- a/src/WaitGroup.zig +++ b/src/WaitGroup.zig @@ -5,11 +5,10 @@ // and substantial portions of the software. const std = @import("std"); const WaitGroup = @This(); -const Event = @import("Event.zig"); lock: std.Mutex = .{}, counter: usize = 0, -event: ?*Event = null, +event: ?*std.ResetEvent = null, pub fn start(self: *WaitGroup) void { const held = self.lock.acquire(); @@ -19,28 +18,33 @@ pub fn start(self: *WaitGroup) void { } pub fn stop(self: *WaitGroup) void { - var event: ?*Event = null; - defer if (event) |waiter| - waiter.set(); - const held = self.lock.acquire(); defer held.release(); self.counter -= 1; - if (self.counter == 0) - std.mem.swap(?*Event, &self.event, &event); + + if (self.counter == 0) { + if (self.event) |event| { + self.event = null; + event.set(); + } + } } pub fn wait(self: *WaitGroup) void { - var event = Event{}; - var has_event = false; - defer if (has_event) - event.wait(); - const held = self.lock.acquire(); - defer held.release(); - has_event = self.counter != 0; - if (has_event) - self.event = &event; + if (self.counter == 0) { + held.release(); + return; + } + + var event = std.ResetEvent.init(); + defer event.deinit(); + + std.debug.assert(self.event == null); + self.event = &event; + + held.release(); + event.wait(); } From 028af97df46f1b856047cc146739cc5066e65bab Mon Sep 17 00:00:00 2001 From: Andrew Kelley Date: Mon, 21 Dec 2020 15:17:12 -0700 Subject: [PATCH 03/14] std.ResetEvent: use sem_t when linking against pthreads --- lib/std/c.zig | 6 ++ lib/std/c/linux.zig | 32 +++++++++ lib/std/reset_event.zig | 151 +++++++++++++++++----------------------- 3 files changed, 100 insertions(+), 89 deletions(-) diff --git a/lib/std/c.zig b/lib/std/c.zig index b428c0907e..2ba16b2cfc 100644 --- a/lib/std/c.zig +++ b/lib/std/c.zig @@ -270,6 +270,12 @@ pub extern "c" fn pthread_atfork( parent: ?fn () callconv(.C) void, child: ?fn () callconv(.C) void, ) c_int; +pub extern "c" fn sem_init(sem: *sem_t, pshared: c_int, value: c_uint) c_int; +pub extern "c" fn sem_destroy(sem: *sem_t) c_int; +pub extern "c" fn sem_post(sem: *sem_t) c_int; +pub extern "c" fn sem_wait(sem: *sem_t) c_int; +pub extern "c" fn sem_trywait(sem: *sem_t) c_int; +pub extern "c" fn sem_timedwait(sem: *sem_t, abs_timeout: *const timespec) c_int; pub extern "c" fn kqueue() c_int; pub extern "c" fn kevent( diff --git a/lib/std/c/linux.zig b/lib/std/c/linux.zig index 97a25617ef..76d0ddb9e0 100644 --- a/lib/std/c/linux.zig +++ b/lib/std/c/linux.zig @@ -135,6 +135,38 @@ const __SIZEOF_PTHREAD_MUTEX_T = if (builtin.os.tag == .fuchsia) 40 else switch else => unreachable, }; +pub const sem_t = switch (builtin.abi) { + .musl, .musleabi, .musleabihf => extern struct { + __val: [4 * @sizeOf(c_long) / @sizeOf(c_int)]c_int, + + pub fn init(pshared: c_int, value: c_uint) @This() { + var result: @This() = undefined; + result.__val[0] = @bitCast(c_int, value); + result.__val[1] = 0; + result.__val[2] = if (pshared != 0) 0 else 128; + return result; + } + }, + .gnu, .gnuabin32, .gnuabi64, .gnueabi, .gnueabihf, .gnux32 => extern struct { + __lock: c_int, + __queue: ?*pthread_t, + __pshared: c_int, + __value: c_int, + __data: ?*c_void, + + pub fn init(pshared: c_int, value: c_uint) @This() { + return .{ + .__lock = 0, + .__queue = null, + .__pshared = pshared, + .__value = @bitCast(c_int, value), + .__data = null, + }; + } + }, + else => unreachable, +}; + pub const RTLD_LAZY = 1; pub const RTLD_NOW = 2; pub const RTLD_NOLOAD = 4; diff --git a/lib/std/reset_event.zig b/lib/std/reset_event.zig index 5da53985c6..f0655e4e6a 100644 --- a/lib/std/reset_event.zig +++ b/lib/std/reset_event.zig @@ -21,7 +21,7 @@ pub const ResetEvent = struct { pub const OsEvent = if (builtin.single_threaded) DebugEvent - else if (builtin.link_libc and builtin.os.tag != .windows and builtin.os.tag != .linux) + else if (std.Thread.use_pthreads) PosixEvent else AtomicEvent; @@ -34,11 +34,6 @@ pub const ResetEvent = struct { self.os_event.deinit(); } - /// Returns whether or not the event is currenetly set - pub fn isSet(self: *ResetEvent) bool { - return self.os_event.isSet(); - } - /// Sets the event if not already set and /// wakes up all the threads waiting on the event. pub fn set(self: *ResetEvent) void { @@ -46,20 +41,28 @@ pub const ResetEvent = struct { } /// Resets the event to its original, unset state. + /// TODO improve these docs: + /// * under what circumstances does it make sense to call this function? pub fn reset(self: *ResetEvent) void { return self.os_event.reset(); } /// Wait for the event to be set by blocking the current thread. + /// TODO improve these docs: + /// * is the function thread-safe? + /// * does it have suprious wakeups? pub fn wait(self: *ResetEvent) void { - return self.os_event.wait(null) catch unreachable; + return self.os_event.wait(); } /// Wait for the event to be set by blocking the current thread. /// A timeout in nanoseconds can be provided as a hint for how /// long the thread should block on the unset event before throwing error.TimedOut. + /// TODO improve these docs: + /// * is the function thread-safe? + /// * does it have suprious wakeups? pub fn timedWait(self: *ResetEvent, timeout_ns: u64) !void { - return self.os_event.wait(timeout_ns); + return self.os_event.timedWait(timeout_ns); } }; @@ -74,10 +77,6 @@ const DebugEvent = struct { self.* = undefined; } - fn isSet(self: *DebugEvent) bool { - return self.is_set; - } - fn reset(self: *DebugEvent) void { self.is_set = false; } @@ -86,101 +85,75 @@ const DebugEvent = struct { self.is_set = true; } - fn wait(self: *DebugEvent, timeout: ?u64) !void { + fn wait(self: *DebugEvent) void { if (self.is_set) return; - if (timeout != null) - return error.TimedOut; + @panic("deadlock detected"); } + + fn timedWait(self: *DebugEvent, timeout: u64) !void { + if (self.is_set) + return; + + return error.TimedOut; + } }; const PosixEvent = struct { - is_set: bool, - cond: c.pthread_cond_t, - mutex: c.pthread_mutex_t, + sem: c.sem_t, fn init() PosixEvent { return PosixEvent{ - .is_set = false, - .cond = c.PTHREAD_COND_INITIALIZER, - .mutex = c.PTHREAD_MUTEX_INITIALIZER, + .sem = c.sem_t.init(0, 0), }; } fn deinit(self: *PosixEvent) void { - // on dragonfly or openbsd, *destroy() functions can return EINVAL - // for statically initialized pthread structures - const err = if (builtin.os.tag == .dragonfly or builtin.os.tag == .openbsd) - os.EINVAL - else - 0; - - const retm = c.pthread_mutex_destroy(&self.mutex); - assert(retm == 0 or retm == err); - const retc = c.pthread_cond_destroy(&self.cond); - assert(retc == 0 or retc == err); - } - - fn isSet(self: *PosixEvent) bool { - assert(c.pthread_mutex_lock(&self.mutex) == 0); - defer assert(c.pthread_mutex_unlock(&self.mutex) == 0); - - return self.is_set; + assert(c.sem_destroy(&self.sem) == 0); } fn reset(self: *PosixEvent) void { - assert(c.pthread_mutex_lock(&self.mutex) == 0); - defer assert(c.pthread_mutex_unlock(&self.mutex) == 0); - - self.is_set = false; + self.deinit(); + assert(c.sem_init(&self.sem, 0, 0) == 0); } fn set(self: *PosixEvent) void { - assert(c.pthread_mutex_lock(&self.mutex) == 0); - defer assert(c.pthread_mutex_unlock(&self.mutex) == 0); + assert(c.sem_post(&self.sem) == 0); + } - if (!self.is_set) { - self.is_set = true; - assert(c.pthread_cond_broadcast(&self.cond) == 0); + fn wait(self: *PosixEvent) void { + while (true) { + switch (c.getErrno(c.sem_wait(&self.sem))) { + 0 => return, + c.EINTR => continue, + c.EINVAL => unreachable, + else => unreachable, + } } } - fn wait(self: *PosixEvent, timeout: ?u64) !void { - assert(c.pthread_mutex_lock(&self.mutex) == 0); - defer assert(c.pthread_mutex_unlock(&self.mutex) == 0); - - // quick guard before possibly calling time syscalls below - if (self.is_set) - return; - + fn timedWait(self: *PosixEvent, timeout_ns: u64) !void { var ts: os.timespec = undefined; - if (timeout) |timeout_ns| { - var timeout_abs = timeout_ns; - if (comptime std.Target.current.isDarwin()) { - var tv: os.darwin.timeval = undefined; - assert(os.darwin.gettimeofday(&tv, null) == 0); - timeout_abs += @intCast(u64, tv.tv_sec) * time.ns_per_s; - timeout_abs += @intCast(u64, tv.tv_usec) * time.ns_per_us; - } else { - os.clock_gettime(os.CLOCK_REALTIME, &ts) catch unreachable; - timeout_abs += @intCast(u64, ts.tv_sec) * time.ns_per_s; - timeout_abs += @intCast(u64, ts.tv_nsec); - } - ts.tv_sec = @intCast(@TypeOf(ts.tv_sec), @divFloor(timeout_abs, time.ns_per_s)); - ts.tv_nsec = @intCast(@TypeOf(ts.tv_nsec), @mod(timeout_abs, time.ns_per_s)); + var timeout_abs = timeout_ns; + if (comptime std.Target.current.isDarwin()) { + var tv: os.darwin.timeval = undefined; + assert(os.darwin.gettimeofday(&tv, null) == 0); + timeout_abs += @intCast(u64, tv.tv_sec) * time.ns_per_s; + timeout_abs += @intCast(u64, tv.tv_usec) * time.ns_per_us; + } else { + os.clock_gettime(os.CLOCK_REALTIME, &ts) catch return error.TimedOut; + timeout_abs += @intCast(u64, ts.tv_sec) * time.ns_per_s; + timeout_abs += @intCast(u64, ts.tv_nsec); } - - while (!self.is_set) { - const rc = switch (timeout == null) { - true => c.pthread_cond_wait(&self.cond, &self.mutex), - else => c.pthread_cond_timedwait(&self.cond, &self.mutex, &ts), - }; - switch (rc) { - 0 => {}, - os.ETIMEDOUT => return error.TimedOut, - os.EINVAL => unreachable, - os.EPERM => unreachable, + ts.tv_sec = @intCast(@TypeOf(ts.tv_sec), @divFloor(timeout_abs, time.ns_per_s)); + ts.tv_nsec = @intCast(@TypeOf(ts.tv_nsec), @mod(timeout_abs, time.ns_per_s)); + while (true) { + switch (c.getErrno(c.sem_timedwait(&self.sem, &ts))) { + 0 => return, + c.EINTR => continue, + c.EINVAL => unreachable, + c.ETIMEDOUT => return error.TimedOut, else => unreachable, } } @@ -201,10 +174,6 @@ const AtomicEvent = struct { self.* = undefined; } - fn isSet(self: *const AtomicEvent) bool { - return @atomicLoad(u32, &self.waiters, .Acquire) == WAKE; - } - fn reset(self: *AtomicEvent) void { @atomicStore(u32, &self.waiters, 0, .Monotonic); } @@ -216,7 +185,11 @@ const AtomicEvent = struct { } } - fn wait(self: *AtomicEvent, timeout: ?u64) !void { + fn wait(self: *AtomicEvent) void { + return self.timedWait(null) catch unreachable; + } + + fn timedWait(self: *AtomicEvent, timeout: ?u64) !void { var waiters = @atomicLoad(u32, &self.waiters, .Acquire); while (waiters != WAKE) { waiters = @cmpxchgWeak(u32, &self.waiters, waiters, waiters + WAIT, .Acquire, .Acquire) orelse return Futex.wait(&self.waiters, timeout); @@ -367,17 +340,17 @@ test "ResetEvent" { defer event.deinit(); // test event setting - testing.expect(event.isSet() == false); event.set(); - testing.expect(event.isSet() == true); // test event resetting event.reset(); - testing.expect(event.isSet() == false); // test event waiting (non-blocking) event.set(); event.wait(); + event.reset(); + + event.set(); try event.timedWait(1); // test cross-thread signaling From 19459840fe0a1dcc39a4552a430a70fbf39b52ea Mon Sep 17 00:00:00 2001 From: Andrew Kelley Date: Mon, 21 Dec 2020 16:42:53 -0700 Subject: [PATCH 04/14] std.ResetEvent: pthreads sem_t cannot be statically initialized because it is allowed for the implementation to use a file descriptor, which would require making a syscall at runtime. --- lib/std/c/linux.zig | 37 ++++-------------------------- lib/std/reset_event.zig | 51 +++++++++++++++++++++++++++++++++-------- 2 files changed, 47 insertions(+), 41 deletions(-) diff --git a/lib/std/c/linux.zig b/lib/std/c/linux.zig index 76d0ddb9e0..8cc8a7cd72 100644 --- a/lib/std/c/linux.zig +++ b/lib/std/c/linux.zig @@ -123,6 +123,10 @@ pub const pthread_mutex_t = extern struct { pub const pthread_cond_t = extern struct { size: [__SIZEOF_PTHREAD_COND_T]u8 align(@alignOf(usize)) = [_]u8{0} ** __SIZEOF_PTHREAD_COND_T, }; +pub const sem_t = extern struct { + __size: [__SIZEOF_SEM_T]u8 align(@alignOf(usize)), +}; + const __SIZEOF_PTHREAD_COND_T = 48; const __SIZEOF_PTHREAD_MUTEX_T = if (builtin.os.tag == .fuchsia) 40 else switch (builtin.abi) { .musl, .musleabi, .musleabihf => if (@sizeOf(usize) == 8) 40 else 24, @@ -134,38 +138,7 @@ const __SIZEOF_PTHREAD_MUTEX_T = if (builtin.os.tag == .fuchsia) 40 else switch }, else => unreachable, }; - -pub const sem_t = switch (builtin.abi) { - .musl, .musleabi, .musleabihf => extern struct { - __val: [4 * @sizeOf(c_long) / @sizeOf(c_int)]c_int, - - pub fn init(pshared: c_int, value: c_uint) @This() { - var result: @This() = undefined; - result.__val[0] = @bitCast(c_int, value); - result.__val[1] = 0; - result.__val[2] = if (pshared != 0) 0 else 128; - return result; - } - }, - .gnu, .gnuabin32, .gnuabi64, .gnueabi, .gnueabihf, .gnux32 => extern struct { - __lock: c_int, - __queue: ?*pthread_t, - __pshared: c_int, - __value: c_int, - __data: ?*c_void, - - pub fn init(pshared: c_int, value: c_uint) @This() { - return .{ - .__lock = 0, - .__queue = null, - .__pshared = pshared, - .__value = @bitCast(c_int, value), - .__data = null, - }; - } - }, - else => unreachable, -}; +const __SIZEOF_SEM_T = 4 * @sizeOf(usize); pub const RTLD_LAZY = 1; pub const RTLD_NOW = 2; diff --git a/lib/std/reset_event.zig b/lib/std/reset_event.zig index f0655e4e6a..cdbad71c75 100644 --- a/lib/std/reset_event.zig +++ b/lib/std/reset_event.zig @@ -101,30 +101,48 @@ const DebugEvent = struct { }; const PosixEvent = struct { - sem: c.sem_t, + sem: c.sem_t = undefined, + /// Sadly this is needed because pthreads semaphore API does not + /// support static initialization. + init_mutex: std.mutex.PthreadMutex = .{}, + state: enum { uninit, init } = .uninit, fn init() PosixEvent { - return PosixEvent{ - .sem = c.sem_t.init(0, 0), - }; + return .{}; } + /// Not thread-safe. fn deinit(self: *PosixEvent) void { - assert(c.sem_destroy(&self.sem) == 0); + switch (self.state) { + .uninit => {}, + .init => { + assert(c.sem_destroy(&self.sem) == 0); + }, + } + self.* = undefined; } fn reset(self: *PosixEvent) void { - self.deinit(); - assert(c.sem_init(&self.sem, 0, 0) == 0); + const sem = self.getInitializedSem(); + while (true) { + switch (c.getErrno(c.sem_trywait(sem))) { + 0 => continue, // Need to make it go to zero. + c.EINTR => continue, + c.EINVAL => unreachable, + c.EAGAIN => return, // The semaphore currently has the value zero. + else => unreachable, + } + } } fn set(self: *PosixEvent) void { - assert(c.sem_post(&self.sem) == 0); + assert(c.sem_post(self.getInitializedSem()) == 0); } fn wait(self: *PosixEvent) void { + const sem = self.getInitializedSem(); while (true) { - switch (c.getErrno(c.sem_wait(&self.sem))) { + switch (c.getErrno(c.sem_wait(sem))) { 0 => return, c.EINTR => continue, c.EINVAL => unreachable, @@ -148,6 +166,7 @@ const PosixEvent = struct { } ts.tv_sec = @intCast(@TypeOf(ts.tv_sec), @divFloor(timeout_abs, time.ns_per_s)); ts.tv_nsec = @intCast(@TypeOf(ts.tv_nsec), @mod(timeout_abs, time.ns_per_s)); + const sem = self.getInitializedSem(); while (true) { switch (c.getErrno(c.sem_timedwait(&self.sem, &ts))) { 0 => return, @@ -158,6 +177,20 @@ const PosixEvent = struct { } } } + + fn getInitializedSem(self: *PosixEvent) *c.sem_t { + const held = self.init_mutex.acquire(); + defer held.release(); + + switch (self.state) { + .init => return &self.sem, + .uninit => { + self.state = .init; + assert(c.sem_init(&self.sem, 0, 0) == 0); + return &self.sem; + }, + } + } }; const AtomicEvent = struct { From 78ab1f609a4fb2514d8223e8ce6bf9195c01b5d6 Mon Sep 17 00:00:00 2001 From: Andrew Kelley Date: Mon, 21 Dec 2020 18:23:47 -0700 Subject: [PATCH 05/14] apply kprotty's WaitGroup fix --- src/WaitGroup.zig | 26 ++++++++++++++------------ 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/src/WaitGroup.zig b/src/WaitGroup.zig index e5d4e600e2..6a0b12d050 100644 --- a/src/WaitGroup.zig +++ b/src/WaitGroup.zig @@ -32,19 +32,21 @@ pub fn stop(self: *WaitGroup) void { } pub fn wait(self: *WaitGroup) void { - const held = self.lock.acquire(); + while (true) { + const held = self.lock.acquire(); + + if (self.counter == 0) { + held.release(); + return; + } + + var event = std.ResetEvent.init(); + defer event.deinit(); + + std.debug.assert(self.event == null); + self.event = &event; - if (self.counter == 0) { held.release(); - return; + event.wait(); } - - var event = std.ResetEvent.init(); - defer event.deinit(); - - std.debug.assert(self.event == null); - self.event = &event; - - held.release(); - event.wait(); } From f2ab9512af0154a291ce11ab0a1298fbf778d751 Mon Sep 17 00:00:00 2001 From: Andrew Kelley Date: Mon, 21 Dec 2020 18:24:11 -0700 Subject: [PATCH 06/14] std.valgrind: add helgrind functions --- lib/std/valgrind.zig | 76 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 76 insertions(+) diff --git a/lib/std/valgrind.zig b/lib/std/valgrind.zig index 5373a2d513..e1a99e3083 100644 --- a/lib/std/valgrind.zig +++ b/lib/std/valgrind.zig @@ -80,6 +80,70 @@ pub const ClientRequest = extern enum { ChangeErrDisablement = 6145, VexInitForIri = 6401, InnerThreads = 6402, + + CLEAN_MEMORY = 1212612608, + SET_MY_PTHREAD_T = 1212612864, + PTH_API_ERROR = 1212612865, + PTHREAD_JOIN_POST = 1212612866, + PTHREAD_MUTEX_INIT_POST = 1212612867, + PTHREAD_MUTEX_DESTROY_PRE = 1212612868, + PTHREAD_MUTEX_UNLOCK_PRE = 1212612869, + PTHREAD_MUTEX_UNLOCK_POST = 1212612870, + PTHREAD_MUTEX_ACQUIRE_PRE = 1212612871, + PTHREAD_MUTEX_ACQUIRE_POST = 1212612872, + PTHREAD_COND_SIGNAL_PRE = 1212612873, + PTHREAD_COND_BROADCAST_PRE = 1212612874, + PTHREAD_COND_WAIT_PRE = 1212612875, + PTHREAD_COND_WAIT_POST = 1212612876, + PTHREAD_COND_DESTROY_PRE = 1212612877, + PTHREAD_RWLOCK_INIT_POST = 1212612878, + PTHREAD_RWLOCK_DESTROY_PRE = 1212612879, + PTHREAD_RWLOCK_LOCK_PRE = 1212612880, + PTHREAD_RWLOCK_ACQUIRED = 1212612881, + PTHREAD_RWLOCK_RELEASED = 1212612882, + PTHREAD_RWLOCK_UNLOCK_POST = 1212612883, + POSIX_SEM_INIT_POST = 1212612884, + POSIX_SEM_DESTROY_PRE = 1212612885, + POSIX_SEM_RELEASED = 1212612886, + POSIX_SEM_ACQUIRED = 1212612887, + PTHREAD_BARRIER_INIT_PRE = 1212612888, + PTHREAD_BARRIER_WAIT_PRE = 1212612889, + PTHREAD_BARRIER_DESTROY_PRE = 1212612890, + PTHREAD_SPIN_INIT_OR_UNLOCK_PRE = 1212612891, + PTHREAD_SPIN_INIT_OR_UNLOCK_POST = 1212612892, + PTHREAD_SPIN_LOCK_PRE = 1212612893, + PTHREAD_SPIN_LOCK_POST = 1212612894, + PTHREAD_SPIN_DESTROY_PRE = 1212612895, + CLIENTREQ_UNIMP = 1212612896, + USERSO_SEND_PRE = 1212612897, + USERSO_RECV_POST = 1212612898, + USERSO_FORGET_ALL = 1212612899, + RESERVED2 = 1212612900, + RESERVED3 = 1212612901, + RESERVED4 = 1212612902, + ARANGE_MAKE_UNTRACKED = 1212612903, + ARANGE_MAKE_TRACKED = 1212612904, + PTHREAD_BARRIER_RESIZE_PRE = 1212612905, + CLEAN_MEMORY_HEAPBLOCK = 1212612906, + PTHREAD_COND_INIT_POST = 1212612907, + GNAT_MASTER_HOOK = 1212612908, + GNAT_MASTER_COMPLETED_HOOK = 1212612909, + GET_ABITS = 1212612910, + PTHREAD_CREATE_BEGIN = 1212612911, + PTHREAD_CREATE_END = 1212612912, + PTHREAD_MUTEX_LOCK_PRE = 1212612913, + PTHREAD_MUTEX_LOCK_POST = 1212612914, + PTHREAD_RWLOCK_LOCK_POST = 1212612915, + PTHREAD_RWLOCK_UNLOCK_PRE = 1212612916, + POSIX_SEM_POST_PRE = 1212612917, + POSIX_SEM_POST_POST = 1212612918, + POSIX_SEM_WAIT_PRE = 1212612919, + POSIX_SEM_WAIT_POST = 1212612920, + PTHREAD_COND_SIGNAL_POST = 1212612921, + PTHREAD_COND_BROADCAST_POST = 1212612922, + RTLD_BIND_GUARD = 1212612923, + RTLD_BIND_CLEAR = 1212612924, + GNAT_DEPENDENT_MASTER_JOIN = 1212612925, }; pub fn ToolBase(base: [2]u8) u32 { return (@as(u32, base[0] & 0xff) << 24) | (@as(u32, base[1] & 0xff) << 16); @@ -259,6 +323,18 @@ pub fn monitorCommand(command: [*]u8) bool { return doClientRequestExpr(0, .GdbMonitorCommand, @ptrToInt(command.ptr), 0, 0, 0, 0) != 0; } +pub fn annotateHappensBefore(obj: *c_void) void { + doClientRequestStmt(.USERSO_SEND_PRE, @ptrToInt(obj), 0, 0, 0, 0); +} + +pub fn annotateHappensAfter(obj: *c_void) void { + doClientRequestStmt(.USERSO_RECV_POST, @ptrToInt(obj), 0, 0, 0, 0); +} + +pub fn annotateHappensBeforeForgetAll(obj: *c_void) void { + doClientRequestStmt(.USERSO_FORGET_ALL, @ptrToInt(obj), 0, 0, 0, 0); +} + pub const memcheck = @import("valgrind/memcheck.zig"); pub const callgrind = @import("valgrind/callgrind.zig"); From b2e1bce2405cc4ff15d660f788db1aed35c890d5 Mon Sep 17 00:00:00 2001 From: Andrew Kelley Date: Mon, 21 Dec 2020 18:24:20 -0700 Subject: [PATCH 07/14] minor code readability changes --- lib/std/reset_event.zig | 3 ++- src/ThreadPool.zig | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/lib/std/reset_event.zig b/lib/std/reset_event.zig index cdbad71c75..7df797f955 100644 --- a/lib/std/reset_event.zig +++ b/lib/std/reset_event.zig @@ -136,7 +136,8 @@ const PosixEvent = struct { } fn set(self: *PosixEvent) void { - assert(c.sem_post(self.getInitializedSem()) == 0); + const sem = self.getInitializedSem(); + assert(c.sem_post(sem) == 0); } fn wait(self: *PosixEvent) void { diff --git a/src/ThreadPool.zig b/src/ThreadPool.zig index 71c72fb8da..cf9c02fa59 100644 --- a/src/ThreadPool.zig +++ b/src/ThreadPool.zig @@ -105,12 +105,12 @@ fn runWorker(self: *ThreadPool) void { if (self.is_running) { var idle_node = IdleQueue.Node{ .data = std.ResetEvent.init() }; - defer idle_node.data.deinit(); self.idle_queue.prepend(&idle_node); held.release(); idle_node.data.wait(); + idle_node.data.deinit(); continue; } From a368c0d099a4eb43e9794d3c458a051e89a6d82e Mon Sep 17 00:00:00 2001 From: Andrew Kelley Date: Mon, 21 Dec 2020 18:38:49 -0700 Subject: [PATCH 08/14] std: add Darwin and FreeBSD sem_t bits --- lib/std/c/darwin.zig | 1 + lib/std/c/freebsd.zig | 9 +++++++++ 2 files changed, 10 insertions(+) diff --git a/lib/std/c/darwin.zig b/lib/std/c/darwin.zig index 635e0f97d4..9c5056d9c4 100644 --- a/lib/std/c/darwin.zig +++ b/lib/std/c/darwin.zig @@ -177,6 +177,7 @@ pub const pthread_cond_t = extern struct { __sig: c_long = 0x3CB0B1BB, __opaque: [__PTHREAD_COND_SIZE__]u8 = [_]u8{0} ** __PTHREAD_COND_SIZE__, }; +pub const sem_t = c_int; const __PTHREAD_MUTEX_SIZE__ = if (@sizeOf(usize) == 8) 56 else 40; const __PTHREAD_COND_SIZE__ = if (@sizeOf(usize) == 8) 40 else 24; diff --git a/lib/std/c/freebsd.zig b/lib/std/c/freebsd.zig index 8fa78b0d6f..4ca8234110 100644 --- a/lib/std/c/freebsd.zig +++ b/lib/std/c/freebsd.zig @@ -47,6 +47,15 @@ pub const pthread_attr_t = extern struct { __align: c_long, }; +pub const sem_t = extern struct { + _magic: u32, + _kern: extern struct { + _count: u32, + _flags: u32, + }, + _padding: u32, +}; + pub const EAI = extern enum(c_int) { /// address family for hostname not supported ADDRFAMILY = 1, From 485ec0884ff81a23ca5460a6d07d4e2c2d6b81cb Mon Sep 17 00:00:00 2001 From: Andrew Kelley Date: Mon, 21 Dec 2020 19:44:16 -0700 Subject: [PATCH 09/14] restore std.ResetEvent.isSet functionality --- lib/std/c.zig | 1 + lib/std/reset_event.zig | 26 ++++++++++++++++++++++++++ 2 files changed, 27 insertions(+) diff --git a/lib/std/c.zig b/lib/std/c.zig index 2ba16b2cfc..9579662151 100644 --- a/lib/std/c.zig +++ b/lib/std/c.zig @@ -276,6 +276,7 @@ pub extern "c" fn sem_post(sem: *sem_t) c_int; pub extern "c" fn sem_wait(sem: *sem_t) c_int; pub extern "c" fn sem_trywait(sem: *sem_t) c_int; pub extern "c" fn sem_timedwait(sem: *sem_t, abs_timeout: *const timespec) c_int; +pub extern "c" fn sem_getvalue(sem: *sem_t, sval: *c_int) c_int; pub extern "c" fn kqueue() c_int; pub extern "c" fn kevent( diff --git a/lib/std/reset_event.zig b/lib/std/reset_event.zig index 7df797f955..58dc40994d 100644 --- a/lib/std/reset_event.zig +++ b/lib/std/reset_event.zig @@ -34,6 +34,14 @@ pub const ResetEvent = struct { self.os_event.deinit(); } + /// When `wait` would return without blocking, this returns `true`. + /// Note that the value may be immediately invalid upon this function's + /// return, because another thread may call `wait` in between, changing + /// the event's set/cleared status. + pub fn isSet(self: *ResetEvent) bool { + return self.os_event.isSet(); + } + /// Sets the event if not already set and /// wakes up all the threads waiting on the event. pub fn set(self: *ResetEvent) void { @@ -77,6 +85,10 @@ const DebugEvent = struct { self.* = undefined; } + fn isSet(self: *DebugEvent) bool { + return self.is_set; + } + fn reset(self: *DebugEvent) void { self.is_set = false; } @@ -122,6 +134,13 @@ const PosixEvent = struct { self.* = undefined; } + fn isSet(self: *PosixEvent) bool { + const sem = self.getInitializedSem(); + var val: c_int = undefined; + assert(c.sem_getvalue(sem, &val) == 0); + return val > 0; + } + fn reset(self: *PosixEvent) void { const sem = self.getInitializedSem(); while (true) { @@ -208,6 +227,10 @@ const AtomicEvent = struct { self.* = undefined; } + fn isSet(self: *const AtomicEvent) bool { + return @atomicLoad(u32, &self.waiters, .Acquire) == WAKE; + } + fn reset(self: *AtomicEvent) void { @atomicStore(u32, &self.waiters, 0, .Monotonic); } @@ -374,10 +397,13 @@ test "ResetEvent" { defer event.deinit(); // test event setting + testing.expect(!event.isSet()); event.set(); + testing.expect(event.isSet()); // test event resetting event.reset(); + testing.expect(!event.isSet()); // test event waiting (non-blocking) event.set(); From f4d82f0ad6b10881c0831595ad0e957094892a45 Mon Sep 17 00:00:00 2001 From: Andrew Kelley Date: Mon, 21 Dec 2020 20:44:36 -0700 Subject: [PATCH 10/14] std.Progress: work around time going backwards --- lib/std/Progress.zig | 3 +++ 1 file changed, 3 insertions(+) diff --git a/lib/std/Progress.zig b/lib/std/Progress.zig index ae9b1783be..6e683c673e 100644 --- a/lib/std/Progress.zig +++ b/lib/std/Progress.zig @@ -160,6 +160,9 @@ pub fn maybeRefresh(self: *Progress) void { if (now < self.initial_delay_ns) return; const held = self.update_lock.tryAcquire() orelse return; defer held.release(); + // TODO I have observed this to happen sometimes. I think we need to follow Rust's + // lead and guarantee monotonically increasing times in the std lib itself. + if (now < self.prev_refresh_timestamp) return; if (now - self.prev_refresh_timestamp < self.refresh_rate_ns) return; return self.refreshWithHeldLock(); } From 5377b7fb97311448daa3c29a8c8f100656d871ba Mon Sep 17 00:00:00 2001 From: Andrew Kelley Date: Mon, 21 Dec 2020 22:25:03 -0700 Subject: [PATCH 11/14] put the drone CI workaround back in this branch does not solve it --- ci/drone/linux_script | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/ci/drone/linux_script b/ci/drone/linux_script index fdc1704fb7..8c5dc1be2a 100755 --- a/ci/drone/linux_script +++ b/ci/drone/linux_script @@ -17,7 +17,8 @@ git config core.abbrev 9 mkdir build cd build -cmake .. -DCMAKE_BUILD_TYPE=Release "-DCMAKE_INSTALL_PREFIX=$DISTDIR" -DZIG_STATIC=ON -DCMAKE_PREFIX_PATH=/deps/local -GNinja +# TODO figure out why Drone CI is deadlocking and stop passing -DZIG_SINGLE_THREADED=ON +cmake .. -DCMAKE_BUILD_TYPE=Release "-DCMAKE_INSTALL_PREFIX=$DISTDIR" -DZIG_STATIC=ON -DCMAKE_PREFIX_PATH=/deps/local -GNinja -DZIG_SINGLE_THREADED=ON samu install ./zig build test -Dskip-release -Dskip-non-native From 177377b6e356b34bbed40cadca596658d158af6b Mon Sep 17 00:00:00 2001 From: Andrew Kelley Date: Wed, 23 Dec 2020 16:57:18 -0800 Subject: [PATCH 12/14] rework std.ResetEvent, improve std lib Darwin integration * split std.ResetEvent into: - ResetEvent - requires init() at runtime and it can fail. Also requires deinit(). - StaticResetEvent - can be statically initialized and requires no deinitialization. Initialization cannot fail. * the POSIX sem_t implementation can in fact fail on initialization because it is allowed to be implemented as a file descriptor. * Completely define, clarify, and explain in detail the semantics of these APIs. Remove the `isSet` function. * `ResetEvent.timedWait` returns an enum instead of a possible error. * `ResetEvent.init` takes a pointer to the ResetEvent instead of returning a copy. * On Darwin, `ResetEvent` is implemented using Grand Central Dispatch, which is exposed by libSystem. stage2 changes: * ThreadPool: use a single, pre-initialized `ResetEvent` per worker. * WaitGroup: now requires init() and deinit() and init() can fail. - Add a `reset` function. - Compilation initializes one for the work queue in creation and re-uses it for every update. - Rename `stop` to `finish`. - Simplify the implementation based on the usage pattern. --- CMakeLists.txt | 5 +- lib/std/ResetEvent.zig | 297 ++++++++++++++ .../{reset_event.zig => StaticResetEvent.zig} | 385 +++++++----------- lib/std/c/darwin.zig | 12 + lib/std/debug.zig | 3 +- lib/std/fs/test.zig | 2 - lib/std/mutex.zig | 4 +- lib/std/std.zig | 3 +- src/Compilation.zig | 18 +- src/ThreadPool.zig | 96 +++-- src/WaitGroup.zig | 35 +- 11 files changed, 549 insertions(+), 311 deletions(-) create mode 100644 lib/std/ResetEvent.zig rename lib/std/{reset_event.zig => StaticResetEvent.zig} (51%) diff --git a/CMakeLists.txt b/CMakeLists.txt index 272cdc6921..b9b9310946 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -410,11 +410,12 @@ set(ZIG_STAGE2_SOURCES "${CMAKE_SOURCE_DIR}/lib/std/os/windows/bits.zig" "${CMAKE_SOURCE_DIR}/lib/std/os/windows/ntstatus.zig" "${CMAKE_SOURCE_DIR}/lib/std/os/windows/win32error.zig" + "${CMAKE_SOURCE_DIR}/lib/std/Progress.zig" + "${CMAKE_SOURCE_DIR}/lib/std/ResetEvent.zig" + "${CMAKE_SOURCE_DIR}/lib/std/StaticResetEvent.zig" "${CMAKE_SOURCE_DIR}/lib/std/pdb.zig" "${CMAKE_SOURCE_DIR}/lib/std/process.zig" - "${CMAKE_SOURCE_DIR}/lib/std/Progress.zig" "${CMAKE_SOURCE_DIR}/lib/std/rand.zig" - "${CMAKE_SOURCE_DIR}/lib/std/reset_event.zig" "${CMAKE_SOURCE_DIR}/lib/std/sort.zig" "${CMAKE_SOURCE_DIR}/lib/std/special/compiler_rt.zig" "${CMAKE_SOURCE_DIR}/lib/std/special/compiler_rt/addXf3.zig" diff --git a/lib/std/ResetEvent.zig b/lib/std/ResetEvent.zig new file mode 100644 index 0000000000..cd62eb6e21 --- /dev/null +++ b/lib/std/ResetEvent.zig @@ -0,0 +1,297 @@ +// SPDX-License-Identifier: MIT +// Copyright (c) 2015-2020 Zig Contributors +// This file is part of [zig](https://ziglang.org/), which is MIT licensed. +// The MIT license requires this copyright notice to be included in all copies +// and substantial portions of the software. + +//! A thread-safe resource which supports blocking until signaled. +//! This API is for kernel threads, not evented I/O. +//! This API requires being initialized at runtime, and initialization +//! can fail. Once initialized, the core operations cannot fail. +//! If you need an abstraction that cannot fail to be initialized, see +//! `std.StaticResetEvent`. However if you can handle initialization failure, +//! it is preferred to use `ResetEvent`. + +const ResetEvent = @This(); +const std = @import("std.zig"); +const builtin = std.builtin; +const testing = std.testing; +const assert = std.debug.assert; +const c = std.c; +const os = std.os; +const time = std.time; + +impl: Impl, + +pub const Impl = if (builtin.single_threaded) + std.StaticResetEvent.DebugEvent +else if (std.Target.current.isDarwin()) + DarwinEvent +else if (std.Thread.use_pthreads) + PosixEvent +else + std.StaticResetEvent.AtomicEvent; + +pub const InitError = error{SystemResources}; + +/// After `init`, it is legal to call any other function. +pub fn init(ev: *ResetEvent) InitError!void { + return ev.impl.init(); +} + +/// This function is not thread-safe. +/// After `deinit`, the only legal function to call is `init`. +pub fn deinit(ev: *ResetEvent) void { + return ev.impl.deinit(); +} + +/// Sets the event if not already set and wakes up all the threads waiting on +/// the event. It is safe to call `set` multiple times before calling `wait`. +/// However it is illegal to call `set` after `wait` is called until the event +/// is `reset`. This function is thread-safe. +pub fn set(ev: *ResetEvent) void { + return ev.impl.set(); +} + +/// Resets the event to its original, unset state. +/// This function is *not* thread-safe. It is equivalent to calling +/// `deinit` followed by `init` but without the possibility of failure. +pub fn reset(ev: *ResetEvent) void { + return ev.impl.reset(); +} + +/// Wait for the event to be set by blocking the current thread. +/// Thread-safe. No spurious wakeups. +/// Upon return from `wait`, the only functions available to be called +/// in `ResetEvent` are `reset` and `deinit`. +pub fn wait(ev: *ResetEvent) void { + return ev.impl.wait(); +} + +pub const TimedWaitResult = enum { event_set, timed_out }; + +/// Wait for the event to be set by blocking the current thread. +/// A timeout in nanoseconds can be provided as a hint for how +/// long the thread should block on the unset event before returning +/// `TimedWaitResult.timed_out`. +/// Thread-safe. No precision of timing is guaranteed. +/// Upon return from `wait`, the only functions available to be called +/// in `ResetEvent` are `reset` and `deinit`. +pub fn timedWait(ev: *ResetEvent, timeout_ns: u64) TimedWaitResult { + return ev.impl.timedWait(timeout_ns); +} + +/// Apple has decided to not support POSIX semaphores, so we go with a +/// different approach using Grand Central Dispatch. This API is exposed +/// by libSystem so it is guaranteed to be available on all Darwin platforms. +pub const DarwinEvent = struct { + sem: c.dispatch_semaphore_t = undefined, + + pub fn init(ev: *DarwinEvent) !void { + ev.* = .{ + .sem = c.dispatch_semaphore_create(0) orelse return error.SystemResources, + }; + } + + pub fn deinit(ev: *DarwinEvent) void { + c.dispatch_release(ev.sem); + ev.* = undefined; + } + + pub fn set(ev: *DarwinEvent) void { + // Empirically this returns the numerical value of the semaphore. + _ = c.dispatch_semaphore_signal(ev.sem); + } + + pub fn wait(ev: *DarwinEvent) void { + assert(c.dispatch_semaphore_wait(ev.sem, c.DISPATCH_TIME_FOREVER) == 0); + } + + pub fn timedWait(ev: *DarwinEvent, timeout_ns: u64) TimedWaitResult { + const t = c.dispatch_time(c.DISPATCH_TIME_NOW, @intCast(i64, timeout_ns)); + if (c.dispatch_semaphore_wait(ev.sem, t) != 0) { + return .timed_out; + } else { + return .event_set; + } + } + + pub fn reset(ev: *DarwinEvent) void { + // Keep calling until the semaphore goes back down to 0. + while (c.dispatch_semaphore_wait(ev.sem, c.DISPATCH_TIME_NOW) == 0) {} + } +}; + +/// POSIX semaphores must be initialized at runtime because they are allowed to +/// be implemented as file descriptors, in which case initialization would require +/// a syscall to open the fd. +pub const PosixEvent = struct { + sem: c.sem_t = undefined, + + pub fn init(ev: *PosixEvent) !void { + switch (c.getErrno(c.sem_init(&ev.sem, 0, 0))) { + 0 => return, + else => return error.SystemResources, + } + } + + pub fn deinit(ev: *PosixEvent) void { + assert(c.sem_destroy(&ev.sem) == 0); + ev.* = undefined; + } + + pub fn set(ev: *PosixEvent) void { + assert(c.sem_post(&ev.sem) == 0); + } + + pub fn wait(ev: *PosixEvent) void { + while (true) { + switch (c.getErrno(c.sem_wait(&ev.sem))) { + 0 => return, + c.EINTR => continue, + c.EINVAL => unreachable, + else => unreachable, + } + } + } + + pub fn timedWait(ev: *PosixEvent, timeout_ns: u64) TimedWaitResult { + var ts: os.timespec = undefined; + var timeout_abs = timeout_ns; + os.clock_gettime(os.CLOCK_REALTIME, &ts) catch return .timed_out; + timeout_abs += @intCast(u64, ts.tv_sec) * time.ns_per_s; + timeout_abs += @intCast(u64, ts.tv_nsec); + ts.tv_sec = @intCast(@TypeOf(ts.tv_sec), @divFloor(timeout_abs, time.ns_per_s)); + ts.tv_nsec = @intCast(@TypeOf(ts.tv_nsec), @mod(timeout_abs, time.ns_per_s)); + while (true) { + switch (c.getErrno(c.sem_timedwait(&ev.sem, &ts))) { + 0 => return .event_set, + c.EINTR => continue, + c.EINVAL => unreachable, + c.ETIMEDOUT => return .timed_out, + else => unreachable, + } + } + } + + pub fn reset(ev: *PosixEvent) void { + while (true) { + switch (c.getErrno(c.sem_trywait(&ev.sem))) { + 0 => continue, // Need to make it go to zero. + c.EINTR => continue, + c.EINVAL => unreachable, + c.EAGAIN => return, // The semaphore currently has the value zero. + else => unreachable, + } + } + } +}; + +test "basic usage" { + var event: ResetEvent = undefined; + try event.init(); + defer event.deinit(); + + // test event setting + event.set(); + + // test event resetting + event.reset(); + + // test event waiting (non-blocking) + event.set(); + event.wait(); + event.reset(); + + event.set(); + testing.expectEqual(TimedWaitResult.event_set, event.timedWait(1)); + + // test cross-thread signaling + if (builtin.single_threaded) + return; + + const Context = struct { + const Self = @This(); + + value: u128, + in: ResetEvent, + out: ResetEvent, + + fn init(self: *Self) !void { + self.* = .{ + .value = 0, + .in = undefined, + .out = undefined, + }; + try self.in.init(); + try self.out.init(); + } + + fn deinit(self: *Self) void { + self.in.deinit(); + self.out.deinit(); + self.* = undefined; + } + + fn sender(self: *Self) void { + // update value and signal input + testing.expect(self.value == 0); + self.value = 1; + self.in.set(); + + // wait for receiver to update value and signal output + self.out.wait(); + testing.expect(self.value == 2); + + // update value and signal final input + self.value = 3; + self.in.set(); + } + + fn receiver(self: *Self) void { + // wait for sender to update value and signal input + self.in.wait(); + assert(self.value == 1); + + // update value and signal output + self.in.reset(); + self.value = 2; + self.out.set(); + + // wait for sender to update value and signal final input + self.in.wait(); + assert(self.value == 3); + } + + fn sleeper(self: *Self) void { + self.in.set(); + time.sleep(time.ns_per_ms * 2); + self.value = 5; + self.out.set(); + } + + fn timedWaiter(self: *Self) !void { + self.in.wait(); + testing.expectEqual(TimedWaitResult.timed_out, self.out.timedWait(time.ns_per_us)); + try self.out.timedWait(time.ns_per_ms * 100); + testing.expect(self.value == 5); + } + }; + + var context: Context = undefined; + try context.init(); + defer context.deinit(); + const receiver = try std.Thread.spawn(&context, Context.receiver); + defer receiver.wait(); + context.sender(); + + if (false) { + // I have now observed this fail on macOS, Windows, and Linux. + // https://github.com/ziglang/zig/issues/7009 + var timed = Context.init(); + defer timed.deinit(); + const sleeper = try std.Thread.spawn(&timed, Context.sleeper); + defer sleeper.wait(); + try timed.timedWaiter(); + } +} diff --git a/lib/std/reset_event.zig b/lib/std/StaticResetEvent.zig similarity index 51% rename from lib/std/reset_event.zig rename to lib/std/StaticResetEvent.zig index 58dc40994d..b41e7666ac 100644 --- a/lib/std/reset_event.zig +++ b/lib/std/StaticResetEvent.zig @@ -3,270 +3,184 @@ // This file is part of [zig](https://ziglang.org/), which is MIT licensed. // The MIT license requires this copyright notice to be included in all copies // and substantial portions of the software. + +//! A thread-safe resource which supports blocking until signaled. +//! This API is for kernel threads, not evented I/O. +//! This API is statically initializable. It cannot fail to be initialized +//! and it requires no deinitialization. The downside is that it may not +//! integrate as cleanly into other synchronization APIs, or, in a worst case, +//! may be forced to fall back on spin locking. As a rule of thumb, prefer +//! to use `std.ResetEvent` when possible, and use `StaticResetEvent` when +//! the logic needs stronger API guarantees. + const std = @import("std.zig"); -const builtin = @import("builtin"); -const testing = std.testing; +const StaticResetEvent = @This(); const SpinLock = std.SpinLock; const assert = std.debug.assert; -const c = std.c; const os = std.os; const time = std.time; -const linux = os.linux; -const windows = os.windows; +const linux = std.os.linux; +const windows = std.os.windows; +const testing = std.testing; -/// A resource object which supports blocking until signaled. -/// Once finished, the `deinit()` method should be called for correctness. -pub const ResetEvent = struct { - os_event: OsEvent, +impl: Impl = .{}, - pub const OsEvent = if (builtin.single_threaded) - DebugEvent - else if (std.Thread.use_pthreads) - PosixEvent - else - AtomicEvent; +pub const Impl = if (std.builtin.single_threaded) + DebugEvent +else + AtomicEvent; - pub fn init() ResetEvent { - return ResetEvent{ .os_event = OsEvent.init() }; +/// Sets the event if not already set and wakes up all the threads waiting on +/// the event. It is safe to call `set` multiple times before calling `wait`. +/// However it is illegal to call `set` after `wait` is called until the event +/// is `reset`. This function is thread-safe. +pub fn set(ev: *StaticResetEvent) void { + return ev.impl.set(); +} + +/// Wait for the event to be set by blocking the current thread. +/// Thread-safe. No spurious wakeups. +/// Upon return from `wait`, the only function available to be called +/// in `StaticResetEvent` is `reset`. +pub fn wait(ev: *StaticResetEvent) void { + return ev.impl.wait(); +} + +/// Resets the event to its original, unset state. +/// This function is *not* thread-safe. It is equivalent to calling +/// `deinit` followed by `init` but without the possibility of failure. +pub fn reset(ev: *StaticResetEvent) void { + return ev.impl.reset(); +} + +pub const TimedWaitResult = std.ResetEvent.TimedWaitResult; + +/// Wait for the event to be set by blocking the current thread. +/// A timeout in nanoseconds can be provided as a hint for how +/// long the thread should block on the unset event before returning +/// `TimedWaitResult.timed_out`. +/// Thread-safe. No precision of timing is guaranteed. +/// Upon return from `timedWait`, the only function available to be called +/// in `StaticResetEvent` is `reset`. +pub fn timedWait(ev: *StaticResetEvent, timeout_ns: u64) TimedWaitResult { + return ev.impl.timedWait(timeout_ns); +} + +/// For single-threaded builds, we use this to detect deadlocks. +/// In unsafe modes this ends up being no-ops. +pub const DebugEvent = struct { + state: State = State.unset, + + const State = enum { + unset, + set, + waited, + }; + + /// This function is provided so that this type can be re-used inside + /// `std.ResetEvent`. + pub fn init(ev: *DebugEvent) void { + ev.* = .{}; } - pub fn deinit(self: *ResetEvent) void { - self.os_event.deinit(); + /// This function is provided so that this type can be re-used inside + /// `std.ResetEvent`. + pub fn deinit(ev: *DebugEvent) void { + ev.* = undefined; } - /// When `wait` would return without blocking, this returns `true`. - /// Note that the value may be immediately invalid upon this function's - /// return, because another thread may call `wait` in between, changing - /// the event's set/cleared status. - pub fn isSet(self: *ResetEvent) bool { - return self.os_event.isSet(); + pub fn set(ev: *DebugEvent) void { + switch (ev.state) { + .unset => ev.state = .set, + .set => {}, + .waited => unreachable, // Not allowed to call `set` until `reset`. + } } - /// Sets the event if not already set and - /// wakes up all the threads waiting on the event. - pub fn set(self: *ResetEvent) void { - return self.os_event.set(); + pub fn wait(ev: *DebugEvent) void { + switch (ev.state) { + .unset => unreachable, // Deadlock detected. + .set => return, + .waited => unreachable, // Not allowed to call `wait` until `reset`. + } } - /// Resets the event to its original, unset state. - /// TODO improve these docs: - /// * under what circumstances does it make sense to call this function? - pub fn reset(self: *ResetEvent) void { - return self.os_event.reset(); + fn timedWait(ev: *DebugEvent, timeout: u64) TimedWaitResult { + switch (ev.state) { + .unset => return .timed_out, + .set => return .event_set, + .waited => unreachable, // Not allowed to call `wait` until `reset`. + } } - /// Wait for the event to be set by blocking the current thread. - /// TODO improve these docs: - /// * is the function thread-safe? - /// * does it have suprious wakeups? - pub fn wait(self: *ResetEvent) void { - return self.os_event.wait(); - } - - /// Wait for the event to be set by blocking the current thread. - /// A timeout in nanoseconds can be provided as a hint for how - /// long the thread should block on the unset event before throwing error.TimedOut. - /// TODO improve these docs: - /// * is the function thread-safe? - /// * does it have suprious wakeups? - pub fn timedWait(self: *ResetEvent, timeout_ns: u64) !void { - return self.os_event.timedWait(timeout_ns); + pub fn reset(ev: *DebugEvent) void { + ev.state = .unset; } }; -const DebugEvent = struct { - is_set: bool, - - fn init() DebugEvent { - return DebugEvent{ .is_set = false }; - } - - fn deinit(self: *DebugEvent) void { - self.* = undefined; - } - - fn isSet(self: *DebugEvent) bool { - return self.is_set; - } - - fn reset(self: *DebugEvent) void { - self.is_set = false; - } - - fn set(self: *DebugEvent) void { - self.is_set = true; - } - - fn wait(self: *DebugEvent) void { - if (self.is_set) - return; - - @panic("deadlock detected"); - } - - fn timedWait(self: *DebugEvent, timeout: u64) !void { - if (self.is_set) - return; - - return error.TimedOut; - } -}; - -const PosixEvent = struct { - sem: c.sem_t = undefined, - /// Sadly this is needed because pthreads semaphore API does not - /// support static initialization. - init_mutex: std.mutex.PthreadMutex = .{}, - state: enum { uninit, init } = .uninit, - - fn init() PosixEvent { - return .{}; - } - - /// Not thread-safe. - fn deinit(self: *PosixEvent) void { - switch (self.state) { - .uninit => {}, - .init => { - assert(c.sem_destroy(&self.sem) == 0); - }, - } - self.* = undefined; - } - - fn isSet(self: *PosixEvent) bool { - const sem = self.getInitializedSem(); - var val: c_int = undefined; - assert(c.sem_getvalue(sem, &val) == 0); - return val > 0; - } - - fn reset(self: *PosixEvent) void { - const sem = self.getInitializedSem(); - while (true) { - switch (c.getErrno(c.sem_trywait(sem))) { - 0 => continue, // Need to make it go to zero. - c.EINTR => continue, - c.EINVAL => unreachable, - c.EAGAIN => return, // The semaphore currently has the value zero. - else => unreachable, - } - } - } - - fn set(self: *PosixEvent) void { - const sem = self.getInitializedSem(); - assert(c.sem_post(sem) == 0); - } - - fn wait(self: *PosixEvent) void { - const sem = self.getInitializedSem(); - while (true) { - switch (c.getErrno(c.sem_wait(sem))) { - 0 => return, - c.EINTR => continue, - c.EINVAL => unreachable, - else => unreachable, - } - } - } - - fn timedWait(self: *PosixEvent, timeout_ns: u64) !void { - var ts: os.timespec = undefined; - var timeout_abs = timeout_ns; - if (comptime std.Target.current.isDarwin()) { - var tv: os.darwin.timeval = undefined; - assert(os.darwin.gettimeofday(&tv, null) == 0); - timeout_abs += @intCast(u64, tv.tv_sec) * time.ns_per_s; - timeout_abs += @intCast(u64, tv.tv_usec) * time.ns_per_us; - } else { - os.clock_gettime(os.CLOCK_REALTIME, &ts) catch return error.TimedOut; - timeout_abs += @intCast(u64, ts.tv_sec) * time.ns_per_s; - timeout_abs += @intCast(u64, ts.tv_nsec); - } - ts.tv_sec = @intCast(@TypeOf(ts.tv_sec), @divFloor(timeout_abs, time.ns_per_s)); - ts.tv_nsec = @intCast(@TypeOf(ts.tv_nsec), @mod(timeout_abs, time.ns_per_s)); - const sem = self.getInitializedSem(); - while (true) { - switch (c.getErrno(c.sem_timedwait(&self.sem, &ts))) { - 0 => return, - c.EINTR => continue, - c.EINVAL => unreachable, - c.ETIMEDOUT => return error.TimedOut, - else => unreachable, - } - } - } - - fn getInitializedSem(self: *PosixEvent) *c.sem_t { - const held = self.init_mutex.acquire(); - defer held.release(); - - switch (self.state) { - .init => return &self.sem, - .uninit => { - self.state = .init; - assert(c.sem_init(&self.sem, 0, 0) == 0); - return &self.sem; - }, - } - } -}; - -const AtomicEvent = struct { - waiters: u32, +pub const AtomicEvent = struct { + waiters: u32 = 0, const WAKE = 1 << 0; const WAIT = 1 << 1; - fn init() AtomicEvent { - return AtomicEvent{ .waiters = 0 }; + /// This function is provided so that this type can be re-used inside + /// `std.ResetEvent`. + pub fn init(ev: *AtomicEvent) void { + ev.* = .{}; } - fn deinit(self: *AtomicEvent) void { - self.* = undefined; + /// This function is provided so that this type can be re-used inside + /// `std.ResetEvent`. + pub fn deinit(ev: *AtomicEvent) void { + ev.* = undefined; } - fn isSet(self: *const AtomicEvent) bool { - return @atomicLoad(u32, &self.waiters, .Acquire) == WAKE; - } - - fn reset(self: *AtomicEvent) void { - @atomicStore(u32, &self.waiters, 0, .Monotonic); - } - - fn set(self: *AtomicEvent) void { - const waiters = @atomicRmw(u32, &self.waiters, .Xchg, WAKE, .Release); + pub fn set(ev: *AtomicEvent) void { + const waiters = @atomicRmw(u32, &ev.waiters, .Xchg, WAKE, .Release); if (waiters >= WAIT) { - return Futex.wake(&self.waiters, waiters >> 1); + return Futex.wake(&ev.waiters, waiters >> 1); } } - fn wait(self: *AtomicEvent) void { - return self.timedWait(null) catch unreachable; + pub fn wait(ev: *AtomicEvent) void { + switch (ev.timedWait(null)) { + .timed_out => unreachable, + .event_set => return, + } } - fn timedWait(self: *AtomicEvent, timeout: ?u64) !void { - var waiters = @atomicLoad(u32, &self.waiters, .Acquire); + pub fn timedWait(ev: *AtomicEvent, timeout: ?u64) TimedWaitResult { + var waiters = @atomicLoad(u32, &ev.waiters, .Acquire); while (waiters != WAKE) { - waiters = @cmpxchgWeak(u32, &self.waiters, waiters, waiters + WAIT, .Acquire, .Acquire) orelse return Futex.wait(&self.waiters, timeout); + waiters = @cmpxchgWeak(u32, &ev.waiters, waiters, waiters + WAIT, .Acquire, .Acquire) orelse { + if (Futex.wait(&ev.waiters, timeout)) |_| { + return .event_set; + } else |_| { + return .timed_out; + } + }; } + return .event_set; } - pub const Futex = switch (builtin.os.tag) { + pub fn reset(ev: *AtomicEvent) void { + @atomicStore(u32, &ev.waiters, 0, .Monotonic); + } + + pub const Futex = switch (std.Target.current.os.tag) { .windows => WindowsFutex, .linux => LinuxFutex, else => SpinFutex, }; - const SpinFutex = struct { + pub const SpinFutex = struct { fn wake(waiters: *u32, wake_count: u32) void {} fn wait(waiters: *u32, timeout: ?u64) !void { - // TODO: handle platforms where a monotonic timer isnt available var timer: time.Timer = undefined; if (timeout != null) - timer = time.Timer.start() catch unreachable; + timer = time.Timer.start() catch return error.TimedOut; while (@atomicLoad(u32, waiters, .Acquire) != WAKE) { SpinLock.yield(); @@ -278,7 +192,7 @@ const AtomicEvent = struct { } }; - const LinuxFutex = struct { + pub const LinuxFutex = struct { fn wake(waiters: *u32, wake_count: u32) void { const waiting = std.math.maxInt(i32); // wake_count const ptr = @ptrCast(*const i32, waiters); @@ -313,7 +227,7 @@ const AtomicEvent = struct { } }; - const WindowsFutex = struct { + pub const WindowsFutex = struct { pub fn wake(waiters: *u32, wake_count: u32) void { const handle = getEventHandle() orelse return SpinFutex.wake(waiters, wake_count); const key = @ptrCast(*const c_void, waiters); @@ -392,18 +306,14 @@ const AtomicEvent = struct { }; }; -test "ResetEvent" { - var event = ResetEvent.init(); - defer event.deinit(); +test "basic usage" { + var event = StaticResetEvent{}; // test event setting - testing.expect(!event.isSet()); event.set(); - testing.expect(event.isSet()); // test event resetting event.reset(); - testing.expect(!event.isSet()); // test event waiting (non-blocking) event.set(); @@ -411,32 +321,18 @@ test "ResetEvent" { event.reset(); event.set(); - try event.timedWait(1); + testing.expectEqual(TimedWaitResult.event_set, event.timedWait(1)); // test cross-thread signaling - if (builtin.single_threaded) + if (std.builtin.single_threaded) return; const Context = struct { const Self = @This(); - value: u128, - in: ResetEvent, - out: ResetEvent, - - fn init() Self { - return Self{ - .value = 0, - .in = ResetEvent.init(), - .out = ResetEvent.init(), - }; - } - - fn deinit(self: *Self) void { - self.in.deinit(); - self.out.deinit(); - self.* = undefined; - } + value: u128 = 0, + in: StaticResetEvent = .{}, + out: StaticResetEvent = .{}, fn sender(self: *Self) void { // update value and signal input @@ -477,14 +373,13 @@ test "ResetEvent" { fn timedWaiter(self: *Self) !void { self.in.wait(); - testing.expectError(error.TimedOut, self.out.timedWait(time.ns_per_us)); + testing.expectEqual(TimedWaitResult.timed_out, self.out.timedWait(time.ns_per_us)); try self.out.timedWait(time.ns_per_ms * 100); testing.expect(self.value == 5); } }; - var context = Context.init(); - defer context.deinit(); + var context = Context{}; const receiver = try std.Thread.spawn(&context, Context.receiver); defer receiver.wait(); context.sender(); diff --git a/lib/std/c/darwin.zig b/lib/std/c/darwin.zig index 9c5056d9c4..0850464567 100644 --- a/lib/std/c/darwin.zig +++ b/lib/std/c/darwin.zig @@ -187,3 +187,15 @@ pub const pthread_attr_t = extern struct { }; pub extern "c" fn arc4random_buf(buf: [*]u8, len: usize) void; + +// Grand Central Dispatch is exposed by libSystem. +pub const dispatch_semaphore_t = *opaque{}; +pub const dispatch_time_t = u64; +pub const DISPATCH_TIME_NOW = @as(dispatch_time_t, 0); +pub const DISPATCH_TIME_FOREVER = ~@as(dispatch_time_t, 0); +pub extern "c" fn dispatch_semaphore_create(value: isize) ?dispatch_semaphore_t; +pub extern "c" fn dispatch_semaphore_wait(dsema: dispatch_semaphore_t, timeout: dispatch_time_t) isize; +pub extern "c" fn dispatch_semaphore_signal(dsema: dispatch_semaphore_t) isize; + +pub extern "c" fn dispatch_release(object: *c_void) void; +pub extern "c" fn dispatch_time(when: dispatch_time_t, delta: i64) dispatch_time_t; diff --git a/lib/std/debug.zig b/lib/std/debug.zig index 7284237cb2..56428075bf 100644 --- a/lib/std/debug.zig +++ b/lib/std/debug.zig @@ -274,9 +274,8 @@ pub fn panicExtra(trace: ?*const builtin.StackTrace, first_trace_addr: ?usize, c // and call abort() // Sleep forever without hammering the CPU - var event = std.ResetEvent.init(); + var event: std.StaticResetEvent = .{}; event.wait(); - unreachable; } }, diff --git a/lib/std/fs/test.zig b/lib/std/fs/test.zig index 61df39be0c..06e5bfe66b 100644 --- a/lib/std/fs/test.zig +++ b/lib/std/fs/test.zig @@ -771,8 +771,6 @@ test "open file with exclusive lock twice, make sure it waits" { std.time.sleep(SLEEP_TIMEOUT_NS); if (timer.read() >= SLEEP_TIMEOUT_NS) break; } - // Check that createFile is still waiting for the lock to be released. - testing.expect(!evt.isSet()); file.close(); // No timeout to avoid failures on heavily loaded systems. evt.wait(); diff --git a/lib/std/mutex.zig b/lib/std/mutex.zig index 349a250fea..6fa8fb6a62 100644 --- a/lib/std/mutex.zig +++ b/lib/std/mutex.zig @@ -284,7 +284,7 @@ const WindowsMutex = struct { fn acquireSlow(self: *WindowsMutex) Held { // try to use NT keyed events for blocking, falling back to spinlock if unavailable @setCold(true); - const handle = ResetEvent.OsEvent.Futex.getEventHandle() orelse return self.acquireSpinning(); + const handle = ResetEvent.Impl.Futex.getEventHandle() orelse return self.acquireSpinning(); const key = @ptrCast(*const c_void, &self.state.waiters); while (true) : (SpinLock.loopHint(1)) { @@ -312,7 +312,7 @@ const WindowsMutex = struct { pub fn release(self: Held) void { // unlock without a rmw/cmpxchg instruction @atomicStore(u8, @ptrCast(*u8, &self.mutex.state.locked), 0, .Release); - const handle = ResetEvent.OsEvent.Futex.getEventHandle() orelse return; + const handle = ResetEvent.Impl.Futex.getEventHandle() orelse return; const key = @ptrCast(*const c_void, &self.mutex.state.waiters); while (true) : (SpinLock.loopHint(1)) { diff --git a/lib/std/std.zig b/lib/std/std.zig index 5fbf2662b9..69f4ea671b 100644 --- a/lib/std/std.zig +++ b/lib/std/std.zig @@ -30,10 +30,11 @@ pub const PackedIntSlice = @import("packed_int_array.zig").PackedIntSlice; pub const PackedIntSliceEndian = @import("packed_int_array.zig").PackedIntSliceEndian; pub const PriorityQueue = @import("priority_queue.zig").PriorityQueue; pub const Progress = @import("Progress.zig"); -pub const ResetEvent = @import("reset_event.zig").ResetEvent; +pub const ResetEvent = @import("ResetEvent.zig"); pub const SemanticVersion = @import("SemanticVersion.zig"); pub const SinglyLinkedList = @import("linked_list.zig").SinglyLinkedList; pub const SpinLock = @import("spinlock.zig").SpinLock; +pub const StaticResetEvent = @import("StaticResetEvent.zig"); pub const StringHashMap = hash_map.StringHashMap; pub const StringHashMapUnmanaged = hash_map.StringHashMapUnmanaged; pub const StringArrayHashMap = array_hash_map.StringArrayHashMap; diff --git a/src/Compilation.zig b/src/Compilation.zig index 11521e5a52..d172cbadcc 100644 --- a/src/Compilation.zig +++ b/src/Compilation.zig @@ -135,6 +135,8 @@ emit_docs: ?EmitLoc, c_header: ?c_link.Header, +work_queue_wait_group: WaitGroup, + pub const InnerError = Module.InnerError; pub const CRTFile = struct { @@ -1006,11 +1008,15 @@ pub fn create(gpa: *Allocator, options: InitOptions) !*Compilation { .test_filter = options.test_filter, .test_name_prefix = options.test_name_prefix, .test_evented_io = options.test_evented_io, + .work_queue_wait_group = undefined, }; break :comp comp; }; errdefer comp.destroy(); + try comp.work_queue_wait_group.init(); + errdefer comp.work_queue_wait_group.deinit(); + if (comp.bin_file.options.module) |mod| { try comp.work_queue.writeItem(.{ .generate_builtin_zig = {} }); } @@ -1191,6 +1197,8 @@ pub fn destroy(self: *Compilation) void { self.cache_parent.manifest_dir.close(); if (self.owned_link_dir) |*dir| dir.close(); + self.work_queue_wait_group.deinit(); + // This destroys `self`. self.arena_state.promote(gpa).deinit(); } @@ -1405,13 +1413,13 @@ pub fn performAllTheWork(self: *Compilation) error{ TimerUnsupported, OutOfMemor var arena = std.heap.ArenaAllocator.init(self.gpa); defer arena.deinit(); - var wg = WaitGroup{}; - defer wg.wait(); + self.work_queue_wait_group.reset(); + defer self.work_queue_wait_group.wait(); while (self.c_object_work_queue.readItem()) |c_object| { - wg.start(); + self.work_queue_wait_group.start(); try self.thread_pool.spawn(workerUpdateCObject, .{ - self, c_object, &c_comp_progress_node, &wg, + self, c_object, &c_comp_progress_node, &self.work_queue_wait_group, }); } @@ -1764,7 +1772,7 @@ fn workerUpdateCObject( progress_node: *std.Progress.Node, wg: *WaitGroup, ) void { - defer wg.stop(); + defer wg.finish(); comp.updateCObject(c_object, progress_node) catch |err| switch (err) { error.AnalysisFail => return, diff --git a/src/ThreadPool.zig b/src/ThreadPool.zig index cf9c02fa59..1e91d3f731 100644 --- a/src/ThreadPool.zig +++ b/src/ThreadPool.zig @@ -9,8 +9,7 @@ const ThreadPool = @This(); lock: std.Mutex = .{}, is_running: bool = true, allocator: *std.mem.Allocator, -spawned: usize = 0, -threads: []*std.Thread, +workers: []Worker, run_queue: RunQueue = .{}, idle_queue: IdleQueue = .{}, @@ -20,23 +19,69 @@ const Runnable = struct { runFn: fn (*Runnable) void, }; +const Worker = struct { + pool: *ThreadPool, + thread: *std.Thread, + /// The node is for this worker only and must have an already initialized event + /// when the thread is spawned. + idle_node: IdleQueue.Node, + + fn run(worker: *Worker) void { + while (true) { + const held = worker.pool.lock.acquire(); + + if (worker.pool.run_queue.popFirst()) |run_node| { + held.release(); + (run_node.data.runFn)(&run_node.data); + continue; + } + + if (worker.pool.is_running) { + worker.idle_node.data.reset(); + + worker.pool.idle_queue.prepend(&worker.idle_node); + held.release(); + + worker.idle_node.data.wait(); + continue; + } + + held.release(); + return; + } + } +}; + pub fn init(self: *ThreadPool, allocator: *std.mem.Allocator) !void { self.* = .{ .allocator = allocator, - .threads = &[_]*std.Thread{}, + .workers = &[_]Worker{}, }; if (std.builtin.single_threaded) return; - errdefer self.deinit(); + const worker_count = std.math.max(1, std.Thread.cpuCount() catch 1); + self.workers = try allocator.alloc(Worker, worker_count); + errdefer allocator.free(self.workers); - var num_threads = std.math.max(1, std.Thread.cpuCount() catch 1); - self.threads = try allocator.alloc(*std.Thread, num_threads); + var worker_index: usize = 0; + errdefer self.destroyWorkers(worker_index); + while (worker_index < worker_count) : (worker_index += 1) { + const worker = &self.workers[worker_index]; + worker.pool = self; - while (num_threads > 0) : (num_threads -= 1) { - const thread = try std.Thread.spawn(self, runWorker); - self.threads[self.spawned] = thread; - self.spawned += 1; + // Each worker requires its ResetEvent to be pre-initialized. + try worker.idle_node.data.init(); + errdefer worker.idle_node.data.deinit(); + + worker.thread = try std.Thread.spawn(worker, Worker.run); + } +} + +fn destroyWorkers(self: *ThreadPool, spawned: usize) void { + for (self.workers[0..spawned]) |*worker| { + worker.thread.wait(); + worker.idle_node.data.deinit(); } } @@ -50,9 +95,8 @@ pub fn deinit(self: *ThreadPool) void { idle_node.data.set(); } - defer self.allocator.free(self.threads); - for (self.threads[0..self.spawned]) |thread| - thread.wait(); + self.destroyWorkers(self.workers.len); + self.allocator.free(self.workers); } pub fn spawn(self: *ThreadPool, comptime func: anytype, args: anytype) !void { @@ -92,29 +136,3 @@ pub fn spawn(self: *ThreadPool, comptime func: anytype, args: anytype) !void { if (self.idle_queue.popFirst()) |idle_node| idle_node.data.set(); } - -fn runWorker(self: *ThreadPool) void { - while (true) { - const held = self.lock.acquire(); - - if (self.run_queue.popFirst()) |run_node| { - held.release(); - (run_node.data.runFn)(&run_node.data); - continue; - } - - if (self.is_running) { - var idle_node = IdleQueue.Node{ .data = std.ResetEvent.init() }; - - self.idle_queue.prepend(&idle_node); - held.release(); - - idle_node.data.wait(); - idle_node.data.deinit(); - continue; - } - - held.release(); - return; - } -} diff --git a/src/WaitGroup.zig b/src/WaitGroup.zig index 6a0b12d050..bd6274c10a 100644 --- a/src/WaitGroup.zig +++ b/src/WaitGroup.zig @@ -8,7 +8,21 @@ const WaitGroup = @This(); lock: std.Mutex = .{}, counter: usize = 0, -event: ?*std.ResetEvent = null, +event: std.ResetEvent, + +pub fn init(self: *WaitGroup) !void { + self.* = .{ + .lock = .{}, + .counter = 0, + .event = undefined, + }; + try self.event.init(); +} + +pub fn deinit(self: *WaitGroup) void { + self.event.deinit(); + self.* = undefined; +} pub fn start(self: *WaitGroup) void { const held = self.lock.acquire(); @@ -17,17 +31,14 @@ pub fn start(self: *WaitGroup) void { self.counter += 1; } -pub fn stop(self: *WaitGroup) void { +pub fn finish(self: *WaitGroup) void { const held = self.lock.acquire(); defer held.release(); self.counter -= 1; if (self.counter == 0) { - if (self.event) |event| { - self.event = null; - event.set(); - } + self.event.set(); } } @@ -40,13 +51,11 @@ pub fn wait(self: *WaitGroup) void { return; } - var event = std.ResetEvent.init(); - defer event.deinit(); - - std.debug.assert(self.event == null); - self.event = &event; - held.release(); - event.wait(); + self.event.wait(); } } + +pub fn reset(self: *WaitGroup) void { + self.event.reset(); +} From c2b1c8895334c4b597a665c2e8e29ce4a103f5b4 Mon Sep 17 00:00:00 2001 From: Andrew Kelley Date: Wed, 23 Dec 2020 20:35:53 -0700 Subject: [PATCH 13/14] std: fix compile errors introduced in previous commit --- lib/std/StaticResetEvent.zig | 2 +- lib/std/auto_reset_event.zig | 43 ++++++++++++++++++------------------ lib/std/fs/test.zig | 3 ++- lib/std/mutex.zig | 17 +++++++------- 4 files changed, 33 insertions(+), 32 deletions(-) diff --git a/lib/std/StaticResetEvent.zig b/lib/std/StaticResetEvent.zig index b41e7666ac..de1cb535a0 100644 --- a/lib/std/StaticResetEvent.zig +++ b/lib/std/StaticResetEvent.zig @@ -105,7 +105,7 @@ pub const DebugEvent = struct { } } - fn timedWait(ev: *DebugEvent, timeout: u64) TimedWaitResult { + pub fn timedWait(ev: *DebugEvent, timeout: u64) TimedWaitResult { switch (ev.state) { .unset => return .timed_out, .set => return .event_set, diff --git a/lib/std/auto_reset_event.zig b/lib/std/auto_reset_event.zig index 3c7e65e362..b50da9b8b2 100644 --- a/lib/std/auto_reset_event.zig +++ b/lib/std/auto_reset_event.zig @@ -7,14 +7,15 @@ const std = @import("std.zig"); const builtin = @import("builtin"); const testing = std.testing; const assert = std.debug.assert; +const StaticResetEvent = std.StaticResetEvent; -/// Similar to std.ResetEvent but on `set()` it also (atomically) does `reset()`. -/// Unlike std.ResetEvent, `wait()` can only be called by one thread (MPSC-like). +/// Similar to `StaticResetEvent` but on `set()` it also (atomically) does `reset()`. +/// Unlike StaticResetEvent, `wait()` can only be called by one thread (MPSC-like). pub const AutoResetEvent = struct { /// AutoResetEvent has 3 possible states: /// - UNSET: the AutoResetEvent is currently unset /// - SET: the AutoResetEvent was notified before a wait() was called - /// - : there is an active waiter waiting for a notification. + /// - : there is an active waiter waiting for a notification. /// /// When attempting to wait: /// if the event is unset, it registers a ResetEvent pointer to be notified when the event is set @@ -25,20 +26,20 @@ pub const AutoResetEvent = struct { /// if theres a waiting ResetEvent, then we unset the event and notify the ResetEvent /// /// This ensures that the event is automatically reset after a wait() has been issued - /// and avoids the race condition when using std.ResetEvent in the following scenario: - /// thread 1 | thread 2 - /// std.ResetEvent.wait() | - /// | std.ResetEvent.set() - /// | std.ResetEvent.set() - /// std.ResetEvent.reset() | - /// std.ResetEvent.wait() | (missed the second .set() notification above) + /// and avoids the race condition when using StaticResetEvent in the following scenario: + /// thread 1 | thread 2 + /// StaticResetEvent.wait() | + /// | StaticResetEvent.set() + /// | StaticResetEvent.set() + /// StaticResetEvent.reset() | + /// StaticResetEvent.wait() | (missed the second .set() notification above) state: usize = UNSET, const UNSET = 0; const SET = 1; - /// the minimum alignment for the `*std.ResetEvent` created by wait*() - const event_align = std.math.max(@alignOf(std.ResetEvent), 2); + /// the minimum alignment for the `*StaticResetEvent` created by wait*() + const event_align = std.math.max(@alignOf(StaticResetEvent), 2); pub fn wait(self: *AutoResetEvent) void { self.waitFor(null) catch unreachable; @@ -49,12 +50,9 @@ pub const AutoResetEvent = struct { } fn waitFor(self: *AutoResetEvent, timeout: ?u64) error{TimedOut}!void { - // lazily initialized std.ResetEvent - var reset_event: std.ResetEvent align(event_align) = undefined; + // lazily initialized StaticResetEvent + var reset_event: StaticResetEvent align(event_align) = undefined; var has_reset_event = false; - defer if (has_reset_event) { - reset_event.deinit(); - }; var state = @atomicLoad(usize, &self.state, .SeqCst); while (true) { @@ -72,7 +70,7 @@ pub const AutoResetEvent = struct { // lazily initialize the ResetEvent if it hasn't been already if (!has_reset_event) { has_reset_event = true; - reset_event = std.ResetEvent.init(); + reset_event = .{}; } // Since the AutoResetEvent currently isnt set, @@ -97,9 +95,10 @@ pub const AutoResetEvent = struct { }; // wait with a timeout and return if signalled via set() - if (reset_event.timedWait(timeout_ns)) |_| { - return; - } else |timed_out| {} + switch (reset_event.timedWait(timeout_ns)) { + .event_set => return, + .timed_out => {}, + } // If we timed out, we need to transition the AutoResetEvent back to UNSET. // If we don't, then when we return, a set() thread could observe a pointer to an invalid ResetEvent. @@ -164,7 +163,7 @@ pub const AutoResetEvent = struct { continue; } - const reset_event = @intToPtr(*align(event_align) std.ResetEvent, state); + const reset_event = @intToPtr(*align(event_align) StaticResetEvent, state); reset_event.set(); return; } diff --git a/lib/std/fs/test.zig b/lib/std/fs/test.zig index 06e5bfe66b..f4d50ca958 100644 --- a/lib/std/fs/test.zig +++ b/lib/std/fs/test.zig @@ -758,7 +758,8 @@ test "open file with exclusive lock twice, make sure it waits" { } }; - var evt = std.ResetEvent.init(); + var evt: std.ResetEvent = undefined; + try evt.init(); defer evt.deinit(); const t = try std.Thread.spawn(S.C{ .dir = &tmp.dir, .evt = &evt }, S.checkFn); diff --git a/lib/std/mutex.zig b/lib/std/mutex.zig index 6fa8fb6a62..777ba3d3c2 100644 --- a/lib/std/mutex.zig +++ b/lib/std/mutex.zig @@ -10,7 +10,7 @@ const assert = std.debug.assert; const windows = os.windows; const testing = std.testing; const SpinLock = std.SpinLock; -const ResetEvent = std.ResetEvent; +const StaticResetEvent = std.StaticResetEvent; /// Lock may be held only once. If the same thread tries to acquire /// the same mutex twice, it deadlocks. This type supports static @@ -54,7 +54,7 @@ else if (builtin.link_libc or builtin.os.tag == .linux) const Node = struct { next: ?*Node, - event: ResetEvent, + event: StaticResetEvent, }; pub fn tryAcquire(self: *Mutex) ?Held { @@ -90,11 +90,12 @@ else if (builtin.link_libc or builtin.os.tag == .linux) state = @atomicLoad(usize, &self.state, .Monotonic); } - // create the ResetEvent node on the stack + // create the StaticResetEvent node on the stack // (faster than threadlocal on platforms like OSX) - var node: Node = undefined; - node.event = ResetEvent.init(); - defer node.event.deinit(); + var node: Node = .{ + .next = undefined, + .event = .{}, + }; // we've spun too long, try and add our node to the LIFO queue. // if the mutex becomes available in the process, try and grab it instead. @@ -284,7 +285,7 @@ const WindowsMutex = struct { fn acquireSlow(self: *WindowsMutex) Held { // try to use NT keyed events for blocking, falling back to spinlock if unavailable @setCold(true); - const handle = ResetEvent.Impl.Futex.getEventHandle() orelse return self.acquireSpinning(); + const handle = StaticResetEvent.Impl.Futex.getEventHandle() orelse return self.acquireSpinning(); const key = @ptrCast(*const c_void, &self.state.waiters); while (true) : (SpinLock.loopHint(1)) { @@ -312,7 +313,7 @@ const WindowsMutex = struct { pub fn release(self: Held) void { // unlock without a rmw/cmpxchg instruction @atomicStore(u8, @ptrCast(*u8, &self.mutex.state.locked), 0, .Release); - const handle = ResetEvent.Impl.Futex.getEventHandle() orelse return; + const handle = StaticResetEvent.Impl.Futex.getEventHandle() orelse return; const key = @ptrCast(*const c_void, &self.mutex.state.waiters); while (true) : (SpinLock.loopHint(1)) { From 87e4f7376aa384183a793cb42498ed0ff06222d5 Mon Sep 17 00:00:00 2001 From: Andrew Kelley Date: Thu, 24 Dec 2020 01:14:41 -0700 Subject: [PATCH 14/14] Revert "std.valgrind: add helgrind functions" This reverts commit f2ab9512af0154a291ce11ab0a1298fbf778d751. I did it wrong and didn't end up using these. --- lib/std/valgrind.zig | 76 -------------------------------------------- 1 file changed, 76 deletions(-) diff --git a/lib/std/valgrind.zig b/lib/std/valgrind.zig index e1a99e3083..5373a2d513 100644 --- a/lib/std/valgrind.zig +++ b/lib/std/valgrind.zig @@ -80,70 +80,6 @@ pub const ClientRequest = extern enum { ChangeErrDisablement = 6145, VexInitForIri = 6401, InnerThreads = 6402, - - CLEAN_MEMORY = 1212612608, - SET_MY_PTHREAD_T = 1212612864, - PTH_API_ERROR = 1212612865, - PTHREAD_JOIN_POST = 1212612866, - PTHREAD_MUTEX_INIT_POST = 1212612867, - PTHREAD_MUTEX_DESTROY_PRE = 1212612868, - PTHREAD_MUTEX_UNLOCK_PRE = 1212612869, - PTHREAD_MUTEX_UNLOCK_POST = 1212612870, - PTHREAD_MUTEX_ACQUIRE_PRE = 1212612871, - PTHREAD_MUTEX_ACQUIRE_POST = 1212612872, - PTHREAD_COND_SIGNAL_PRE = 1212612873, - PTHREAD_COND_BROADCAST_PRE = 1212612874, - PTHREAD_COND_WAIT_PRE = 1212612875, - PTHREAD_COND_WAIT_POST = 1212612876, - PTHREAD_COND_DESTROY_PRE = 1212612877, - PTHREAD_RWLOCK_INIT_POST = 1212612878, - PTHREAD_RWLOCK_DESTROY_PRE = 1212612879, - PTHREAD_RWLOCK_LOCK_PRE = 1212612880, - PTHREAD_RWLOCK_ACQUIRED = 1212612881, - PTHREAD_RWLOCK_RELEASED = 1212612882, - PTHREAD_RWLOCK_UNLOCK_POST = 1212612883, - POSIX_SEM_INIT_POST = 1212612884, - POSIX_SEM_DESTROY_PRE = 1212612885, - POSIX_SEM_RELEASED = 1212612886, - POSIX_SEM_ACQUIRED = 1212612887, - PTHREAD_BARRIER_INIT_PRE = 1212612888, - PTHREAD_BARRIER_WAIT_PRE = 1212612889, - PTHREAD_BARRIER_DESTROY_PRE = 1212612890, - PTHREAD_SPIN_INIT_OR_UNLOCK_PRE = 1212612891, - PTHREAD_SPIN_INIT_OR_UNLOCK_POST = 1212612892, - PTHREAD_SPIN_LOCK_PRE = 1212612893, - PTHREAD_SPIN_LOCK_POST = 1212612894, - PTHREAD_SPIN_DESTROY_PRE = 1212612895, - CLIENTREQ_UNIMP = 1212612896, - USERSO_SEND_PRE = 1212612897, - USERSO_RECV_POST = 1212612898, - USERSO_FORGET_ALL = 1212612899, - RESERVED2 = 1212612900, - RESERVED3 = 1212612901, - RESERVED4 = 1212612902, - ARANGE_MAKE_UNTRACKED = 1212612903, - ARANGE_MAKE_TRACKED = 1212612904, - PTHREAD_BARRIER_RESIZE_PRE = 1212612905, - CLEAN_MEMORY_HEAPBLOCK = 1212612906, - PTHREAD_COND_INIT_POST = 1212612907, - GNAT_MASTER_HOOK = 1212612908, - GNAT_MASTER_COMPLETED_HOOK = 1212612909, - GET_ABITS = 1212612910, - PTHREAD_CREATE_BEGIN = 1212612911, - PTHREAD_CREATE_END = 1212612912, - PTHREAD_MUTEX_LOCK_PRE = 1212612913, - PTHREAD_MUTEX_LOCK_POST = 1212612914, - PTHREAD_RWLOCK_LOCK_POST = 1212612915, - PTHREAD_RWLOCK_UNLOCK_PRE = 1212612916, - POSIX_SEM_POST_PRE = 1212612917, - POSIX_SEM_POST_POST = 1212612918, - POSIX_SEM_WAIT_PRE = 1212612919, - POSIX_SEM_WAIT_POST = 1212612920, - PTHREAD_COND_SIGNAL_POST = 1212612921, - PTHREAD_COND_BROADCAST_POST = 1212612922, - RTLD_BIND_GUARD = 1212612923, - RTLD_BIND_CLEAR = 1212612924, - GNAT_DEPENDENT_MASTER_JOIN = 1212612925, }; pub fn ToolBase(base: [2]u8) u32 { return (@as(u32, base[0] & 0xff) << 24) | (@as(u32, base[1] & 0xff) << 16); @@ -323,18 +259,6 @@ pub fn monitorCommand(command: [*]u8) bool { return doClientRequestExpr(0, .GdbMonitorCommand, @ptrToInt(command.ptr), 0, 0, 0, 0) != 0; } -pub fn annotateHappensBefore(obj: *c_void) void { - doClientRequestStmt(.USERSO_SEND_PRE, @ptrToInt(obj), 0, 0, 0, 0); -} - -pub fn annotateHappensAfter(obj: *c_void) void { - doClientRequestStmt(.USERSO_RECV_POST, @ptrToInt(obj), 0, 0, 0, 0); -} - -pub fn annotateHappensBeforeForgetAll(obj: *c_void) void { - doClientRequestStmt(.USERSO_FORGET_ALL, @ptrToInt(obj), 0, 0, 0, 0); -} - pub const memcheck = @import("valgrind/memcheck.zig"); pub const callgrind = @import("valgrind/callgrind.zig");