std.Thread: more cleanup & testing

This commit is contained in:
kprotty 2021-06-28 11:27:23 -05:00
parent 7b323f84ca
commit f0fa129e9b
5 changed files with 184 additions and 146 deletions

View File

@ -958,14 +958,14 @@ const assert = std.debug.assert;
threadlocal var x: i32 = 1234;
test "thread local storage" {
const thread1 = try std.Thread.spawn(testTls, {});
const thread2 = try std.Thread.spawn(testTls, {});
testTls({});
thread1.wait();
thread2.wait();
const thread1 = try std.Thread.spawn(.{}, testTls, .{});
const thread2 = try std.Thread.spawn(.{}, testTls, .{});
testTls();
thread1.join();
thread2.join();
}
fn testTls(_: void) void {
fn testTls() void {
assert(x == 1234);
x += 1;
assert(x == 1235);

View File

@ -24,17 +24,6 @@ pub const Condition = @import("Thread/Condition.zig");
pub const spinLoopHint = @compileError("deprecated: use std.atomic.spinLoopHint");
test "std.Thread" {
// Doesn't use testing.refAllDecls() since that would pull in the compileError spinLoopHint.
_ = AutoResetEvent;
_ = Futex;
_ = ResetEvent;
_ = StaticResetEvent;
_ = Mutex;
_ = Semaphore;
_ = Condition;
}
pub const use_pthreads = target.os.tag != .windows and std.builtin.link_libc;
const Thread = @This();
@ -50,7 +39,6 @@ else
impl: Impl,
/// Represents a unique ID per thread.
/// May be an integer or pointer depending on the platform.
pub const Id = u64;
/// Returns the platform ID of the callers thread.
@ -79,7 +67,7 @@ pub const SpawnConfig = struct {
stack_size: usize = 16 * 1024 * 1024,
};
pub const SpawnError = error {
pub const SpawnError = error{
/// A system-imposed limit on the number of threads was encountered.
/// There are a number of limits that may trigger this error:
/// * the RLIMIT_NPROC soft resource limit (set via setrlimit(2)),
@ -115,7 +103,7 @@ pub const SpawnError = error {
/// or call `detach()` to excuse the caller from calling `join()` and have the thread clean up its resources on completion`.
pub fn spawn(config: SpawnConfig, comptime function: anytype, args: anytype) SpawnError!Thread {
if (std.builtin.single_threaded) {
@compileError("cannot spawn thread when building in single-threaded mode");
@compileError("Cannot spawn thread when building in single-threaded mode");
}
const impl = try Impl.spawn(config, function, args);
@ -132,11 +120,13 @@ pub fn getHandle(self: Thread) Handle {
}
/// Release the obligation of the caller to call `join()` and have the thread clean up its own resources on completion.
/// Once called, this consumes the Thread object and invoking any other functions on it is considered undefined behavior.
pub fn detach(self: Thread) void {
return self.impl.detach();
}
/// Waits for the thread to complete, then deallocates any resources created on `spawn()`.
/// Once called, this consumes the Thread object and invoking any other functions on it is considered undefined behavior.
pub fn join(self: Thread) void {
return self.impl.join();
}
@ -200,6 +190,8 @@ fn callFn(comptime f: anytype, args: anytype) switch (Impl) {
}
}
/// We can't compile error in the `Impl` switch statement as its eagerly evaluated.
/// So instead, we compile-error on the methods themselves for platforms which don't support threads.
const UnsupportedImpl = struct {
pub const ThreadHandle = void;
@ -212,7 +204,7 @@ const UnsupportedImpl = struct {
}
fn spawn(config: SpawnConfig, comptime f: anytype, args: anytype) !Impl {
return unsupported(.{config, f, args});
return unsupported(.{ config, f, args });
}
fn getHandle(self: Impl) ThreadHandle {
@ -225,7 +217,7 @@ const UnsupportedImpl = struct {
fn join(self: Impl) void {
return unsupported(self);
}
}
fn unsupported(unusued: anytype) noreturn {
@compileLog("Unsupported operating system", target.os.tag);
@ -244,6 +236,7 @@ const WindowsThreadImpl = struct {
}
fn getCpuCount() !usize {
// Faster than calling into GetSystemInfo(), even if amortized.
return windows.peb().NumberOfProcessors;
}
@ -299,16 +292,17 @@ const WindowsThreadImpl = struct {
// Its also fine if the limit here is incorrect as stack size is only a hint.
var stack_size = std.math.cast(u32, config.stack_size) catch std.math.maxInt(u32);
stack_size = std.math.max(64 * 1024, stack_size);
instance.thread.thread_handle = windows.kernel32.CreateThread(
null,
stack_size,
Instance.entryFn,
@ptrCast(*c_void, instance),
0,
null,
stack_size,
Instance.entryFn,
@ptrCast(*c_void, instance),
0,
null,
) orelse {
return windows.unexpectedError(windows.kernel32.GetLastError());
const errno = windows.kernel32.GetLastError();
return windows.unexpectedError(errno);
};
return Impl{ .thread = &instance.thread };
@ -332,7 +326,7 @@ const WindowsThreadImpl = struct {
windows.CloseHandle(self.thread.thread_handle);
assert(self.thread.completion.load(.SeqCst) == .completed);
self.thread.free();
}
}
};
const PosixThreadImpl = struct {
@ -374,7 +368,9 @@ const PosixThreadImpl = struct {
fn getCpuCount() !usize {
switch (target.os.tag) {
.linux => return LinuxThreadImpl.getCpuCount(),
.linux => {
return LinuxThreadImpl.getCpuCount();
},
.openbsd => {
var count: c_int = undefined;
var count_size: usize = @sizeOf(c_int);
@ -413,6 +409,7 @@ const PosixThreadImpl = struct {
const Instance = struct {
fn entryFn(raw_arg: ?*c_void) callconv(.C) ?*c_void {
// @alignCast() below doesn't support zero-sized-types (ZST)
if (@sizeOf(Args) < 1) {
return callFn(f, @as(Args, undefined));
}
@ -457,8 +454,9 @@ const PosixThreadImpl = struct {
fn detach(self: Impl) void {
switch (c.pthread_detach(self.handle)) {
os.EINVAL => unreachable,
os.ESRCH => unreachable,
0 => {},
os.EINVAL => unreachable, // thread handle is not joinable
os.ESRCH => unreachable, // thread handle is invalid
else => unreachable,
}
}
@ -466,9 +464,9 @@ const PosixThreadImpl = struct {
fn join(self: Impl) void {
switch (c.pthread_join(self.handle, null)) {
0 => {},
os.EINVAL => unreachable,
os.ESRCH => unreachable,
os.EDEADLK => unreachable,
os.EINVAL => unreachable, // thread handle is not joinable (or another thread is already joining in)
os.ESRCH => unreachable, // thread handle is invalid
os.EDEADLK => unreachable, // two threads tried to join each other
else => unreachable,
}
}
@ -476,7 +474,7 @@ const PosixThreadImpl = struct {
const LinuxThreadImpl = struct {
const linux = os.linux;
pub const ThreadHandle = i32;
threadlocal var tls_thread_id: ?Id = null;
@ -491,7 +489,8 @@ const LinuxThreadImpl = struct {
fn getCpuCount() !usize {
const cpu_set = try os.sched_getaffinity(0);
return @as(usize, os.CPU_COUNT(cpu_set)); // TODO should not need this usize cast
// TODO: should not need this usize cast
return @as(usize, os.CPU_COUNT(cpu_set));
}
thread: *ThreadCompletion,
@ -547,7 +546,7 @@ const LinuxThreadImpl = struct {
bytes = std.mem.alignForward(bytes, std.mem.page_size);
break :blk bytes;
};
// map all memory needed without read/write permissions
// to avoid committing the whole region right away
const mapped = os.mmap(
@ -654,7 +653,7 @@ const LinuxThreadImpl = struct {
switch (linux.getErrno(linux.futex_wait(
&self.thread.child_tid.value,
linux.FUTEX_WAIT,
linux.FUTEX_WAIT,
tid,
null,
))) {
@ -671,98 +670,145 @@ const LinuxThreadImpl = struct {
extern fn __unmap_and_exit(ptr: usize, len: usize) callconv(.C) noreturn;
comptime {
if (target.os.tag == .linux) {
asm(switch (target.cpu.arch) {
.i386 => (
\\.text
\\.global __unmap_and_exit
\\.type __unmap_and_exit, @function
\\__unmap_and_exit:
\\ movl $91, %eax
\\ movl 4(%esp), %ebx
\\ movl 8(%esp), %ecx
\\ int $128
\\ xorl %ebx, %ebx
\\ movl $1, %eax
\\ int $128
),
.x86_64 => (
\\.text
\\.global __unmap_and_exit
\\.type __unmap_and_exit, @function
\\__unmap_and_exit:
\\ movl $11, %eax
\\ syscall
\\ xor %rdi, %rdi
\\ movl $60, %eax
\\ syscall
),
.arm, .armeb, .thumb, .thumbeb => (
\\.syntax unified
\\.text
\\.global __unmap_and_exit
\\.type __unmap_and_exit, %function
\\__unmap_and_exit:
\\ mov r7, #91
\\ svc 0
\\ mov r7, #1
\\ svc 0
),
.aarch64, .aarch64_be, .aarch64_32 => (
\\.global __unmap_and_exit
\\.type __unmap_and_exit, %function
\\__unmap_and_exit:
\\ mov x8, #215
\\ svc 0
\\ mov x8, #93
\\ svc 0
),
.mips, .mipsel, => (
\\.set noreorder
\\.global __unmap_and_exit
\\.type __unmap_and_exit,@function
\\__unmap_and_exit:
\\ move $sp, $25
\\ li $2, 4091
\\ syscall
\\ li $4, 0
\\ li $2, 4001
\\ syscall
),
.mips64, .mips64el => (
\\.set noreorder
\\.global __unmap_and_exit
\\.type __unmap_and_exit, @function
\\__unmap_and_exit:
\\ li $2, 4091
\\ syscall
\\ li $4, 0
\\ li $2, 4001
\\ syscall
),
.powerpc, .powerpc64, .powerpc64le => (
\\.text
\\.global __unmap_and_exit
\\.type __unmap_and_exit, %function
\\__unmap_and_exit:
\\ li 0, 91
\\ sc
\\ li 0, 1
\\ sc
\\ blr
),
.riscv64 => (
\\.global __unmap_and_exit
\\.type __unmap_and_exit, %function
\\__unmap_and_exit:
\\ li a7, 215
\\ ecall
\\ li a7, 93
\\ ecall
),
else => |cpu_arch| {
@compileLog("linux arch", cpu_arch, "is not supported");
},
});
asm (switch (target.cpu.arch) {
.i386 => (
\\.text
\\.global __unmap_and_exit
\\.type __unmap_and_exit, @function
\\__unmap_and_exit:
\\ movl $91, %eax
\\ movl 4(%esp), %ebx
\\ movl 8(%esp), %ecx
\\ int $128
\\ xorl %ebx, %ebx
\\ movl $1, %eax
\\ int $128
),
.x86_64 => (
\\.text
\\.global __unmap_and_exit
\\.type __unmap_and_exit, @function
\\__unmap_and_exit:
\\ movl $11, %eax
\\ syscall
\\ xor %rdi, %rdi
\\ movl $60, %eax
\\ syscall
),
.arm, .armeb, .thumb, .thumbeb => (
\\.syntax unified
\\.text
\\.global __unmap_and_exit
\\.type __unmap_and_exit, %function
\\__unmap_and_exit:
\\ mov r7, #91
\\ svc 0
\\ mov r7, #1
\\ svc 0
),
.aarch64, .aarch64_be, .aarch64_32 => (
\\.global __unmap_and_exit
\\.type __unmap_and_exit, %function
\\__unmap_and_exit:
\\ mov x8, #215
\\ svc 0
\\ mov x8, #93
\\ svc 0
),
.mips,
.mipsel,
=> (
\\.set noreorder
\\.global __unmap_and_exit
\\.type __unmap_and_exit,@function
\\__unmap_and_exit:
\\ move $sp, $25
\\ li $2, 4091
\\ syscall
\\ li $4, 0
\\ li $2, 4001
\\ syscall
),
.mips64, .mips64el => (
\\.set noreorder
\\.global __unmap_and_exit
\\.type __unmap_and_exit, @function
\\__unmap_and_exit:
\\ li $2, 4091
\\ syscall
\\ li $4, 0
\\ li $2, 4001
\\ syscall
),
.powerpc, .powerpc64, .powerpc64le => (
\\.text
\\.global __unmap_and_exit
\\.type __unmap_and_exit, %function
\\__unmap_and_exit:
\\ li 0, 91
\\ sc
\\ li 0, 1
\\ sc
\\ blr
),
.riscv64 => (
\\.global __unmap_and_exit
\\.type __unmap_and_exit, %function
\\__unmap_and_exit:
\\ li a7, 215
\\ ecall
\\ li a7, 93
\\ ecall
),
else => |cpu_arch| {
@compileLog("linux arch", cpu_arch, "is not supported");
},
});
}
}
};
};
test "std.Thread" {
// Doesn't use testing.refAllDecls() since that would pull in the compileError spinLoopHint.
_ = AutoResetEvent;
_ = Futex;
_ = ResetEvent;
_ = StaticResetEvent;
_ = Mutex;
_ = Semaphore;
_ = Condition;
}
fn testIncrementNotify(value: *usize, event: *ResetEvent) void {
value.* += 1;
event.set();
}
test "Thread.join" {
if (std.builtin.single_threaded) return error.SkipZigTest;
var value: usize = 0;
var event: ResetEvent = undefined;
try event.init();
defer event.deinit();
const thread = try Thread.spawn(.{}, testIncrementNotify, .{&value, &event});
thread.join();
try std.testing.expectEqual(value, 1);
}
test "Thread.detach" {
if (std.builtin.single_threaded) return error.SkipZigTest;
var value: usize = 0;
var event: ResetEvent = undefined;
try event.init();
defer event.deinit();
const thread = try Thread.spawn(.{}, testIncrementNotify, .{&value, &event});
thread.detach();
event.wait();
try std.testing.expectEqual(value, 1);
}

View File

@ -407,7 +407,7 @@ test "Futex - wait/wake" {
test "Futex - Signal" {
if (single_threaded) {
return;
return error.SkipZigTest;
}
const Paddle = struct {
@ -449,7 +449,7 @@ test "Futex - Signal" {
test "Futex - Broadcast" {
if (single_threaded) {
return;
return error.SkipZigTest;
}
const Context = struct {
@ -506,7 +506,7 @@ test "Futex - Broadcast" {
test "Futex - Chain" {
if (single_threaded) {
return;
return error.SkipZigTest;
}
const Signal = struct {

View File

@ -277,6 +277,7 @@ pub extern "c" fn pthread_attr_setguardsize(attr: *pthread_attr_t, guardsize: us
pub extern "c" fn pthread_attr_destroy(attr: *pthread_attr_t) c_int;
pub extern "c" fn pthread_self() pthread_t;
pub extern "c" fn pthread_join(thread: pthread_t, arg_return: ?*?*c_void) c_int;
pub extern "c" fn pthread_detach(thread: pthread_t) c_int;
pub extern "c" fn pthread_atfork(
prepare: ?fn () callconv(.C) void,
parent: ?fn () callconv(.C) void,

View File

@ -321,17 +321,8 @@ test "std.Thread.getCurrentId" {
var thread_current_id: Thread.Id = undefined;
const thread = try Thread.spawn(.{}, testThreadIdFn, .{&thread_current_id});
const thread_id = thread.getHandle();
thread.join();
if (Thread.use_pthreads) {
try expect(thread_current_id == thread_id);
} else if (native_os == .windows) {
try expect(Thread.getCurrentId() != thread_current_id);
} else {
// If the thread completes very quickly, then thread_id can be 0. See the
// documentation comments for `std.Thread.handle`.
try expect(thread_id == 0 or thread_current_id == thread_id);
}
try expect(Thread.getCurrentId() != thread_current_id);
}
test "spawn threads" {