Merge pull request #9175 from kprotty/thread

std.Thread enhancements
Andrew Kelley 2021-07-04 22:31:02 -04:00 committed by GitHub
commit b7da1b2d45
19 changed files with 915 additions and 699 deletions


@@ -958,14 +958,14 @@ const assert = std.debug.assert;
 threadlocal var x: i32 = 1234;
 
 test "thread local storage" {
-    const thread1 = try std.Thread.spawn(testTls, {});
-    const thread2 = try std.Thread.spawn(testTls, {});
-    testTls({});
-    thread1.wait();
-    thread2.wait();
+    const thread1 = try std.Thread.spawn(.{}, testTls, .{});
+    const thread2 = try std.Thread.spawn(.{}, testTls, .{});
+    testTls();
+    thread1.join();
+    thread2.join();
 }
 
-fn testTls(_: void) void {
+fn testTls() void {
     assert(x == 1234);
     x += 1;
     assert(x == 1235);
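The diffs below all follow the same shape: std.Thread.spawn now takes a SpawnConfig struct first, then the function, then a tuple of arguments (instead of a single context parameter), and wait() is renamed join(). A minimal sketch of the new call shape (assuming stack_size as the SpawnConfig field for the custom option):

const std = @import("std");

fn worker(value: i32) void {
    std.debug.print("worker got {}\n", .{value});
}

pub fn main() !void {
    // `.{}` takes the default SpawnConfig; a custom stack size can be requested.
    const thread = try std.Thread.spawn(.{ .stack_size = 1024 * 1024 }, worker, .{@as(i32, 42)});
    thread.join(); // join() replaces the old wait()
}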

File diff suppressed because it is too large.


@@ -220,9 +220,9 @@ test "basic usage" {
     };
     var context = Context{};
 
-    const send_thread = try std.Thread.spawn(Context.sender, &context);
-    const recv_thread = try std.Thread.spawn(Context.receiver, &context);
+    const send_thread = try std.Thread.spawn(.{}, Context.sender, .{&context});
+    const recv_thread = try std.Thread.spawn(.{}, Context.receiver, .{&context});
 
-    send_thread.wait();
-    recv_thread.wait();
+    send_thread.join();
+    recv_thread.join();
 }


@@ -64,9 +64,8 @@ pub fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{TimedOut}!void {
 /// Unblocks at most `num_waiters` callers blocked in a `wait()` call on `ptr`.
 /// `num_waiters` of 1 unblocks at most one `wait(ptr, ...)` and `maxInt(u32)` unblocks effectively all `wait(ptr, ...)`.
 pub fn wake(ptr: *const Atomic(u32), num_waiters: u32) void {
-    if (num_waiters == 0 or single_threaded) {
-        return;
-    }
+    if (single_threaded) return;
+    if (num_waiters == 0) return;
 
     return OsFutex.wake(ptr, num_waiters);
 }
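wait() blocks only while the pointed-to value still equals the expected value, so callers pair it with a re-check loop; the rewritten tests below use exactly this pattern. A condensed sketch, assuming this file is exposed as std.Thread.Futex:

const std = @import("std");
const Atomic = std.atomic.Atomic;
const Futex = std.Thread.Futex;

var state = Atomic(u32).init(0);

fn waiter() void {
    // Loop because wake-ups may be spurious; wait() returns immediately
    // if the value no longer matches the expected 0.
    while (state.load(.Acquire) == 0) {
        Futex.wait(&state, 0, null) catch unreachable; // null timeout: TimedOut is impossible
    }
}

pub fn main() !void {
    const t = try std.Thread.spawn(.{}, waiter, .{});
    state.store(1, .Release);
    Futex.wake(&state, 1); // unblock at most one waiter
    t.join();
}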
@@ -80,7 +79,23 @@ else if (target.isDarwin())
 else if (std.builtin.link_libc)
     PosixFutex
 else
-    @compileError("Operating System unsupported");
+    UnsupportedFutex;
+
+const UnsupportedFutex = struct {
+    fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{TimedOut}!void {
+        return unsupported(.{ ptr, expect, timeout });
+    }
+
+    fn wake(ptr: *const Atomic(u32), num_waiters: u32) void {
+        return unsupported(.{ ptr, num_waiters });
+    }
+
+    fn unsupported(unused: anytype) noreturn {
+        @compileLog("Unsupported operating system", target.os.tag);
+        _ = unused;
+        unreachable;
+    }
+};
 
 const WindowsFutex = struct {
     const windows = std.os.windows;
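Replacing the bare @compileError with a stub type appears to change when the error fires: the message is now attached to the wait/wake functions themselves, so with Zig's lazy analysis a build should only fail once one of them is actually referenced on an unsupported OS, rather than as soon as the OsFutex declaration is resolved. A self-contained miniature of that pattern, with hypothetical names:

const std = @import("std");

const is_supported = false; // stand-in for the real OS check

const Impl = if (is_supported) SupportedImpl else UnsupportedImpl;

const SupportedImpl = struct {
    fn ping() void {
        std.debug.print("ping\n", .{});
    }
};

const UnsupportedImpl = struct {
    fn ping() void {
        // Analyzed lazily: this only errors if some code calls Impl.ping().
        @compileError("unsupported operating system");
    }
};

pub fn main() void {
    // Uncommenting the next line would trigger the @compileError:
    // Impl.ping();
}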
@@ -391,75 +406,73 @@ test "Futex - wait/wake" {
 }
 
 test "Futex - Signal" {
-    if (!single_threaded) {
-        return;
+    if (single_threaded) {
+        return error.SkipZigTest;
     }
 
-    try (struct {
+    const Paddle = struct {
         value: Atomic(u32) = Atomic(u32).init(0),
+        current: u32 = 0,
 
-        const Self = @This();
-
-        fn send(self: *Self, value: u32) void {
-            self.value.store(value, .Release);
-            Futex.wake(&self.value, 1);
-        }
-
-        fn recv(self: *Self, expected: u32) void {
-            while (true) {
-                const value = self.value.load(.Acquire);
-                if (value == expected) break;
-                Futex.wait(&self.value, value, null) catch unreachable;
-            }
-        }
-
-        const Thread = struct {
-            tx: *Self,
-            rx: *Self,
-
-            const start_value = 1;
-
-            fn run(self: Thread) void {
-                var iterations: u32 = start_value;
-                while (iterations < 10) : (iterations += 1) {
-                    self.rx.recv(iterations);
-                    self.tx.send(iterations);
-                }
-            }
-        };
-
-        fn run() !void {
-            var ping = Self{};
-            var pong = Self{};
-
-            const t1 = try std.Thread.spawn(Thread.run, .{ .rx = &ping, .tx = &pong });
-            defer t1.wait();
-            const t2 = try std.Thread.spawn(Thread.run, .{ .rx = &pong, .tx = &ping });
-            defer t2.wait();
-
-            ping.send(Thread.start_value);
-        }
-    }).run();
+        fn run(self: *@This(), hit_to: *@This()) !void {
+            var iterations: usize = 4;
+            while (iterations > 0) : (iterations -= 1) {
+                var value: u32 = undefined;
+                while (true) {
+                    value = self.value.load(.Acquire);
+                    if (value != self.current) break;
+                    Futex.wait(&self.value, self.current, null) catch unreachable;
+                }
+
+                try testing.expectEqual(value, self.current + 1);
+                self.current = value;
+
+                _ = hit_to.value.fetchAdd(1, .Release);
+                Futex.wake(&hit_to.value, 1);
+            }
+        }
+    };
+
+    var ping = Paddle{};
+    var pong = Paddle{};
+
+    const t1 = try std.Thread.spawn(.{}, Paddle.run, .{ &ping, &pong });
+    defer t1.join();
+    const t2 = try std.Thread.spawn(.{}, Paddle.run, .{ &pong, &ping });
+    defer t2.join();
+
+    _ = ping.value.fetchAdd(1, .Release);
+    Futex.wake(&ping.value, 1);
 }
 
 test "Futex - Broadcast" {
-    if (!single_threaded) {
-        return;
+    if (single_threaded) {
+        return error.SkipZigTest;
     }
 
-    try (struct {
-        threads: [10]*std.Thread = undefined,
+    const Context = struct {
+        threads: [4]std.Thread = undefined,
         broadcast: Atomic(u32) = Atomic(u32).init(0),
         notified: Atomic(usize) = Atomic(usize).init(0),
 
-        const Self = @This();
         const BROADCAST_EMPTY = 0;
         const BROADCAST_SENT = 1;
         const BROADCAST_RECEIVED = 2;
 
-        fn runReceiver(self: *Self) void {
+        fn runSender(self: *@This()) !void {
+            self.broadcast.store(BROADCAST_SENT, .Monotonic);
+            Futex.wake(&self.broadcast, @intCast(u32, self.threads.len));
+
+            while (true) {
+                const broadcast = self.broadcast.load(.Acquire);
+                if (broadcast == BROADCAST_RECEIVED) break;
+                try testing.expectEqual(broadcast, BROADCAST_SENT);
+                Futex.wait(&self.broadcast, broadcast, null) catch unreachable;
+            }
+        }
+
+        fn runReceiver(self: *@This()) void {
             while (true) {
                 const broadcast = self.broadcast.load(.Acquire);
                 if (broadcast == BROADCAST_SENT) break;
@@ -473,98 +486,77 @@ test "Futex - Broadcast" {
                 Futex.wake(&self.broadcast, 1);
             }
         }
+    };
 
-        fn run() !void {
-            var self = Self{};
+    var ctx = Context{};
+    for (ctx.threads) |*thread|
+        thread.* = try std.Thread.spawn(.{}, Context.runReceiver, .{&ctx});
+    defer for (ctx.threads) |thread|
+        thread.join();
 
-            for (self.threads) |*thread|
-                thread.* = try std.Thread.spawn(runReceiver, &self);
-            defer for (self.threads) |thread|
-                thread.wait();
+    // Try to wait for the threads to start before running runSender().
+    // NOTE: not actually needed for correctness.
+    std.time.sleep(16 * std.time.ns_per_ms);
+    try ctx.runSender();
 
-            std.time.sleep(16 * std.time.ns_per_ms);
-
-            self.broadcast.store(BROADCAST_SENT, .Monotonic);
-            Futex.wake(&self.broadcast, @intCast(u32, self.threads.len));
-
-            while (true) {
-                const broadcast = self.broadcast.load(.Acquire);
-                if (broadcast == BROADCAST_RECEIVED) break;
-                try testing.expectEqual(broadcast, BROADCAST_SENT);
-                Futex.wait(&self.broadcast, broadcast, null) catch unreachable;
-            }
-
-            const notified = self.notified.load(.Monotonic);
-            try testing.expectEqual(notified, self.threads.len);
-        }
-    }).run();
+    const notified = ctx.notified.load(.Monotonic);
+    try testing.expectEqual(notified, ctx.threads.len);
 }
 
 test "Futex - Chain" {
-    if (!single_threaded) {
-        return;
+    if (single_threaded) {
+        return error.SkipZigTest;
     }
 
-    try (struct {
+    const Signal = struct {
+        value: Atomic(u32) = Atomic(u32).init(0),
+
+        fn wait(self: *@This()) void {
+            while (true) {
+                const value = self.value.load(.Acquire);
+                if (value == 1) break;
+                assert(value == 0);
+                Futex.wait(&self.value, 0, null) catch unreachable;
+            }
+        }
+
+        fn notify(self: *@This()) void {
+            assert(self.value.load(.Unordered) == 0);
+            self.value.store(1, .Release);
+            Futex.wake(&self.value, 1);
+        }
+    };
+
+    const Context = struct {
         completed: Signal = .{},
-        threads: [10]struct {
-            thread: *std.Thread,
+        threads: [4]struct {
+            thread: std.Thread,
             signal: Signal,
         } = undefined,
 
-        const Signal = struct {
-            state: Atomic(u32) = Atomic(u32).init(0),
+        fn run(self: *@This(), index: usize) void {
+            const this_signal = &self.threads[index].signal;
 
-            fn wait(self: *Signal) void {
-                while (true) {
-                    const value = self.value.load(.Acquire);
-                    if (value == 1) break;
-                    assert(value == 0);
-                    Futex.wait(&self.value, 0, null) catch unreachable;
-                }
-            }
+            var next_signal = &self.completed;
+            if (index + 1 < self.threads.len) {
+                next_signal = &self.threads[index + 1].signal;
+            }
 
-            fn notify(self: *Signal) void {
-                assert(self.value.load(.Unordered) == 0);
-                self.value.store(1, .Release);
-                Futex.wake(&self.value, 1);
-            }
-        };
-
-        const Self = @This();
-        const Chain = struct {
-            self: *Self,
-            index: usize,
-
-            fn run(chain: Chain) void {
-                const this_signal = &chain.self.threads[chain.index].signal;
-
-                var next_signal = &chain.self.completed;
-                if (chain.index + 1 < chain.self.threads.len) {
-                    next_signal = &chain.self.threads[chain.index + 1].signal;
-                }
-
-                this_signal.wait();
-                next_signal.notify();
-            }
-        };
-
-        fn run() !void {
-            var self = Self{};
-
-            for (self.threads) |*entry, index| {
-                entry.signal = .{};
-                entry.thread = try std.Thread.spawn(Chain.run, .{
-                    .self = &self,
-                    .index = index,
-                });
-            }
-
-            self.threads[0].signal.notify();
-            self.completed.wait();
-
-            for (self.threads) |entry| {
-                entry.thread.wait();
-            }
-        }
-    }).run();
+            this_signal.wait();
+            next_signal.notify();
+        }
+    };
+
+    var ctx = Context{};
+    for (ctx.threads) |*entry, index| {
+        entry.signal = .{};
+        entry.thread = try std.Thread.spawn(.{}, Context.run, .{ &ctx, index });
+    }
+
+    ctx.threads[0].signal.notify();
+    ctx.completed.wait();
+
+    for (ctx.threads) |entry| {
+        entry.thread.join();
+    }
 }


@@ -297,12 +297,12 @@ test "basic usage" {
         try testing.expect(context.data == TestContext.incr_count);
     } else {
         const thread_count = 10;
-        var threads: [thread_count]*std.Thread = undefined;
+        var threads: [thread_count]std.Thread = undefined;
         for (threads) |*t| {
-            t.* = try std.Thread.spawn(worker, &context);
+            t.* = try std.Thread.spawn(.{}, worker, .{&context});
         }
         for (threads) |t|
-            t.wait();
+            t.join();
 
         try testing.expect(context.data == thread_count * TestContext.incr_count);
     }


@@ -281,8 +281,8 @@ test "basic usage" {
     var context: Context = undefined;
     try context.init();
     defer context.deinit();
-    const receiver = try std.Thread.spawn(Context.receiver, &context);
-    defer receiver.wait();
+    const receiver = try std.Thread.spawn(.{}, Context.receiver, .{&context});
+    defer receiver.join();
     try context.sender();
 
     if (false) {
@@ -290,8 +290,8 @@ test "basic usage" {
         // https://github.com/ziglang/zig/issues/7009
         var timed = Context.init();
        defer timed.deinit();
-        const sleeper = try std.Thread.spawn(Context.sleeper, &timed);
-        defer sleeper.wait();
+        const sleeper = try std.Thread.spawn(.{}, Context.sleeper, .{&timed});
+        defer sleeper.join();
         try timed.timedWaiter();
     }
 }


@@ -384,8 +384,8 @@ test "basic usage" {
     };
     var context = Context{};
 
-    const receiver = try std.Thread.spawn(Context.receiver, &context);
-    defer receiver.wait();
+    const receiver = try std.Thread.spawn(.{}, Context.receiver, .{&context});
+    defer receiver.join();
     try context.sender();
 
     if (false) {
@@ -393,8 +393,8 @@ test "basic usage" {
         // https://github.com/ziglang/zig/issues/7009
         var timed = Context.init();
         defer timed.deinit();
-        const sleeper = try std.Thread.spawn(Context.sleeper, &timed);
-        defer sleeper.wait();
+        const sleeper = try std.Thread.spawn(.{}, Context.sleeper, .{&timed});
+        defer sleeper.join();
         try timed.timedWaiter();
     }
 }


@@ -214,20 +214,20 @@ test "std.atomic.Queue" {
     } else {
         try expect(context.queue.isEmpty());
 
-        var putters: [put_thread_count]*std.Thread = undefined;
+        var putters: [put_thread_count]std.Thread = undefined;
         for (putters) |*t| {
-            t.* = try std.Thread.spawn(startPuts, &context);
+            t.* = try std.Thread.spawn(.{}, startPuts, .{&context});
         }
-        var getters: [put_thread_count]*std.Thread = undefined;
+        var getters: [put_thread_count]std.Thread = undefined;
         for (getters) |*t| {
-            t.* = try std.Thread.spawn(startGets, &context);
+            t.* = try std.Thread.spawn(.{}, startGets, .{&context});
         }
 
         for (putters) |t|
-            t.wait();
+            t.join();
         @atomicStore(bool, &context.puts_done, true, .SeqCst);
         for (getters) |t|
-            t.wait();
+            t.join();
 
         try expect(context.queue.isEmpty());
     }


@@ -121,20 +121,20 @@ test "std.atomic.stack" {
             }
         }
     } else {
-        var putters: [put_thread_count]*std.Thread = undefined;
+        var putters: [put_thread_count]std.Thread = undefined;
         for (putters) |*t| {
-            t.* = try std.Thread.spawn(startPuts, &context);
+            t.* = try std.Thread.spawn(.{}, startPuts, .{&context});
         }
-        var getters: [put_thread_count]*std.Thread = undefined;
+        var getters: [put_thread_count]std.Thread = undefined;
         for (getters) |*t| {
-            t.* = try std.Thread.spawn(startGets, &context);
+            t.* = try std.Thread.spawn(.{}, startGets, .{&context});
         }
 
         for (putters) |t|
-            t.wait();
+            t.join();
         @atomicStore(bool, &context.puts_done, true, .SeqCst);
         for (getters) |t|
-            t.wait();
+            t.join();
     }
 
     if (context.put_sum != context.get_sum) {


@@ -277,6 +277,7 @@ pub extern "c" fn pthread_attr_setguardsize(attr: *pthread_attr_t, guardsize: usize) c_int;
 pub extern "c" fn pthread_attr_destroy(attr: *pthread_attr_t) c_int;
 pub extern "c" fn pthread_self() pthread_t;
 pub extern "c" fn pthread_join(thread: pthread_t, arg_return: ?*?*c_void) c_int;
+pub extern "c" fn pthread_detach(thread: pthread_t) c_int;
 pub extern "c" fn pthread_atfork(
     prepare: ?fn () callconv(.C) void,
     parent: ?fn () callconv(.C) void,
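This import backs the Thread.detach() half of the new spawn/join/detach model: a spawned thread is now consumed by exactly one of join() or detach(). A short sketch of the detach path, assuming the detach() method introduced alongside join() in this PR:

const std = @import("std");

fn background() void {
    std.time.sleep(10 * std.time.ns_per_ms);
}

pub fn main() !void {
    const thread = try std.Thread.spawn(.{}, background, .{});
    // After detach() the handle is consumed: the thread releases its own
    // resources when it finishes, and join() must not be called.
    thread.detach();
}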


@@ -273,8 +273,8 @@ pub fn panicExtra(trace: ?*const builtin.StackTrace, first_trace_addr: ?usize, comptime format: []const u8, args: anytype) noreturn {
     if (builtin.single_threaded) {
         stderr.print("panic: ", .{}) catch os.abort();
     } else {
-        const current_thread_id = std.Thread.getCurrentThreadId();
-        stderr.print("thread {d} panic: ", .{current_thread_id}) catch os.abort();
+        const current_thread_id = std.Thread.getCurrentId();
+        stderr.print("thread {} panic: ", .{current_thread_id}) catch os.abort();
     }
     stderr.print(format ++ "\n", args) catch os.abort();
     if (trace) |t| {
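Note the format specifier loosening from {d} to {}: getCurrentId() returns an OS-dependent integer type, and the generic specifier formats any of them. Typical usage stays a one-liner:

const std = @import("std");

pub fn main() void {
    std.debug.print("running on thread {}\n", .{std.Thread.getCurrentId()});
}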


@@ -21,12 +21,12 @@ pub const Loop = struct {
     os_data: OsData,
     final_resume_node: ResumeNode,
     pending_event_count: usize,
-    extra_threads: []*Thread,
+    extra_threads: []Thread,
 
     /// TODO change this to a pool of configurable number of threads
     /// and rename it to be not file-system-specific. it will become
     /// a thread pool for turning non-CPU-bound blocking things into
     /// async things. A fallback for any missing OS-specific API.
-    fs_thread: *Thread,
+    fs_thread: Thread,
     fs_queue: std.atomic.Queue(Request),
     fs_end_request: Request.Node,
     fs_thread_wakeup: std.Thread.ResetEvent,
@@ -137,7 +137,7 @@ pub const Loop = struct {
     }
 
     /// After initialization, call run().
-    /// This is the same as `initThreadPool` using `Thread.cpuCount` to determine the thread
+    /// This is the same as `initThreadPool` using `Thread.getCpuCount` to determine the thread
     /// pool size.
     /// TODO copy elision / named return values so that the threads referencing *Loop
     /// have the correct pointer value.
@@ -145,7 +145,7 @@ pub const Loop = struct {
     pub fn initMultiThreaded(self: *Loop) !void {
         if (builtin.single_threaded)
             @compileError("initMultiThreaded unavailable when building in single-threaded mode");
-        const core_count = try Thread.cpuCount();
+        const core_count = try Thread.getCpuCount();
         return self.initThreadPool(core_count);
     }
 
@@ -183,17 +183,17 @@ pub const Loop = struct {
             resume_node_count,
         );
-        self.extra_threads = try self.arena.allocator.alloc(*Thread, extra_thread_count);
+        self.extra_threads = try self.arena.allocator.alloc(Thread, extra_thread_count);
 
         try self.initOsData(extra_thread_count);
         errdefer self.deinitOsData();
 
         if (!builtin.single_threaded) {
-            self.fs_thread = try Thread.spawn(posixFsRun, self);
+            self.fs_thread = try Thread.spawn(.{}, posixFsRun, .{self});
         }
         errdefer if (!builtin.single_threaded) {
             self.posixFsRequest(&self.fs_end_request);
-            self.fs_thread.wait();
+            self.fs_thread.join();
         };
 
         if (!std.builtin.single_threaded)
@@ -264,11 +264,11 @@ pub const Loop = struct {
                     assert(amt == wakeup_bytes.len);
                     while (extra_thread_index != 0) {
                         extra_thread_index -= 1;
-                        self.extra_threads[extra_thread_index].wait();
+                        self.extra_threads[extra_thread_index].join();
                     }
                 }
                 while (extra_thread_index < extra_thread_count) : (extra_thread_index += 1) {
-                    self.extra_threads[extra_thread_index] = try Thread.spawn(workerRun, self);
+                    self.extra_threads[extra_thread_index] = try Thread.spawn(.{}, workerRun, .{self});
                 }
             },
             .macos, .freebsd, .netbsd, .dragonfly, .openbsd => {
@@ -329,11 +329,11 @@ pub const Loop = struct {
                     _ = os.kevent(self.os_data.kqfd, final_kev_arr, empty_kevs, null) catch unreachable;
                     while (extra_thread_index != 0) {
                         extra_thread_index -= 1;
-                        self.extra_threads[extra_thread_index].wait();
+                        self.extra_threads[extra_thread_index].join();
                     }
                 }
                 while (extra_thread_index < extra_thread_count) : (extra_thread_index += 1) {
-                    self.extra_threads[extra_thread_index] = try Thread.spawn(workerRun, self);
+                    self.extra_threads[extra_thread_index] = try Thread.spawn(.{}, workerRun, .{self});
                 }
             },
             .windows => {
@@ -378,11 +378,11 @@ pub const Loop = struct {
                 }
                 while (extra_thread_index != 0) {
                     extra_thread_index -= 1;
-                    self.extra_threads[extra_thread_index].wait();
+                    self.extra_threads[extra_thread_index].join();
                 }
             }
             while (extra_thread_index < extra_thread_count) : (extra_thread_index += 1) {
-                self.extra_threads[extra_thread_index] = try Thread.spawn(workerRun, self);
+                self.extra_threads[extra_thread_index] = try Thread.spawn(.{}, workerRun, .{self});
             }
         },
         else => {},
@@ -651,18 +651,18 @@ pub const Loop = struct {
                 .netbsd,
                 .dragonfly,
                 .openbsd,
-                => self.fs_thread.wait(),
+                => self.fs_thread.join(),
                 else => {},
             }
         }
 
         for (self.extra_threads) |extra_thread| {
-            extra_thread.wait();
+            extra_thread.join();
         }
 
         @atomicStore(bool, &self.delay_queue.is_running, false, .SeqCst);
         self.delay_queue.event.set();
-        self.delay_queue.thread.wait();
+        self.delay_queue.thread.join();
     }
 
     /// Runs the provided function asynchronously. The function's frame is allocated
@@ -787,7 +787,7 @@ pub const Loop = struct {
     const DelayQueue = struct {
         timer: std.time.Timer,
         waiters: Waiters,
-        thread: *std.Thread,
+        thread: std.Thread,
         event: std.Thread.AutoResetEvent,
         is_running: bool,
 
@@ -802,7 +802,7 @@ pub const Loop = struct {
             .event = std.Thread.AutoResetEvent{},
             .is_running = true,
             // Must be last so that it can read the other state, such as `is_running`.
-            .thread = try std.Thread.spawn(DelayQueue.run, self),
+            .thread = try std.Thread.spawn(.{}, DelayQueue.run, .{self}),
         };
     }
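The cpuCount to getCpuCount rename is mechanical, and the usual call-site pattern for sizing a pool (also used by the ThreadPool change further down) is:

const std = @import("std");

pub fn main() void {
    // Fall back to a single worker if the CPU count cannot be determined.
    const worker_count = std.math.max(1, std.Thread.getCpuCount() catch 1);
    std.debug.print("using {} workers\n", .{worker_count});
}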


@@ -862,11 +862,10 @@ test "open file with exclusive lock twice, make sure it waits" {
     errdefer file.close();
 
     const S = struct {
-        const C = struct { dir: *fs.Dir, evt: *std.Thread.ResetEvent };
-        fn checkFn(ctx: C) !void {
-            const file1 = try ctx.dir.createFile(filename, .{ .lock = .Exclusive });
+        fn checkFn(dir: *fs.Dir, evt: *std.Thread.ResetEvent) !void {
+            const file1 = try dir.createFile(filename, .{ .lock = .Exclusive });
             defer file1.close();
-            ctx.evt.set();
+            evt.set();
         }
     };
@@ -874,8 +873,8 @@ test "open file with exclusive lock twice, make sure it waits" {
     try evt.init();
     defer evt.deinit();
 
-    const t = try std.Thread.spawn(S.checkFn, S.C{ .dir = &tmp.dir, .evt = &evt });
-    defer t.wait();
+    const t = try std.Thread.spawn(.{}, S.checkFn, .{ &tmp.dir, &evt });
+    defer t.join();
 
     const SLEEP_TIMEOUT_NS = 10 * std.time.ns_per_ms;
     // Make sure we've slept enough.


@@ -161,8 +161,8 @@ test "listen on a port, send bytes, receive bytes" {
         }
     };
 
-    const t = try std.Thread.spawn(S.clientFn, server.listen_address);
-    defer t.wait();
+    const t = try std.Thread.spawn(.{}, S.clientFn, .{server.listen_address});
+    defer t.join();
 
     var client = try server.accept();
     defer client.stream.close();
@@ -277,7 +277,7 @@ test "listen on a unix socket, send bytes, receive bytes" {
     try server.listen(socket_addr);
 
     const S = struct {
-        fn clientFn(_: void) !void {
+        fn clientFn() !void {
             const socket = try net.connectUnixSocket(socket_path);
             defer socket.close();
 
@@ -285,8 +285,8 @@ test "listen on a unix socket, send bytes, receive bytes" {
         }
     };
 
-    const t = try std.Thread.spawn(S.clientFn, {});
-    defer t.wait();
+    const t = try std.Thread.spawn(.{}, S.clientFn, .{});
+    defer t.join();
 
     var client = try server.accept();
     defer client.stream.close();


@@ -55,16 +55,16 @@ test "Once executes its function just once" {
         global_once.call();
         global_once.call();
     } else {
-        var threads: [10]*std.Thread = undefined;
-        defer for (threads) |handle| handle.wait();
+        var threads: [10]std.Thread = undefined;
+        defer for (threads) |handle| handle.join();
 
         for (threads) |*handle| {
-            handle.* = try std.Thread.spawn(struct {
+            handle.* = try std.Thread.spawn(.{}, struct {
                 fn thread_fn(x: u8) void {
                     _ = x;
                     global_once.call();
                 }
-            }.thread_fn, 0);
+            }.thread_fn, .{0});
         }
     }


@@ -320,18 +320,9 @@ test "std.Thread.getCurrentId" {
     if (builtin.single_threaded) return error.SkipZigTest;
 
     var thread_current_id: Thread.Id = undefined;
-    const thread = try Thread.spawn(testThreadIdFn, &thread_current_id);
-    const thread_id = thread.handle();
-    thread.wait();
-    if (Thread.use_pthreads) {
-        try expect(thread_current_id == thread_id);
-    } else if (native_os == .windows) {
-        try expect(Thread.getCurrentId() != thread_current_id);
-    } else {
-        // If the thread completes very quickly, then thread_id can be 0. See the
-        // documentation comments for `std.Thread.handle`.
-        try expect(thread_id == 0 or thread_current_id == thread_id);
-    }
+    const thread = try Thread.spawn(.{}, testThreadIdFn, .{&thread_current_id});
+    thread.join();
+    try expect(Thread.getCurrentId() != thread_current_id);
 }
 
 test "spawn threads" {
@@ -339,21 +330,20 @@ test "spawn threads" {
     var shared_ctx: i32 = 1;
 
-    const thread1 = try Thread.spawn(start1, {});
-    const thread2 = try Thread.spawn(start2, &shared_ctx);
-    const thread3 = try Thread.spawn(start2, &shared_ctx);
-    const thread4 = try Thread.spawn(start2, &shared_ctx);
+    const thread1 = try Thread.spawn(.{}, start1, .{});
+    const thread2 = try Thread.spawn(.{}, start2, .{&shared_ctx});
+    const thread3 = try Thread.spawn(.{}, start2, .{&shared_ctx});
+    const thread4 = try Thread.spawn(.{}, start2, .{&shared_ctx});
 
-    thread1.wait();
-    thread2.wait();
-    thread3.wait();
-    thread4.wait();
+    thread1.join();
+    thread2.join();
+    thread3.join();
+    thread4.join();
 
     try expect(shared_ctx == 4);
 }
 
-fn start1(ctx: void) u8 {
-    _ = ctx;
+fn start1() u8 {
     return 0;
 }
 
@@ -365,22 +355,21 @@ fn start2(ctx: *i32) u8 {
 test "cpu count" {
     if (native_os == .wasi) return error.SkipZigTest;
 
-    const cpu_count = try Thread.cpuCount();
+    const cpu_count = try Thread.getCpuCount();
     try expect(cpu_count >= 1);
 }
 
 test "thread local storage" {
     if (builtin.single_threaded) return error.SkipZigTest;
 
-    const thread1 = try Thread.spawn(testTls, {});
-    const thread2 = try Thread.spawn(testTls, {});
-    try testTls({});
-    thread1.wait();
-    thread2.wait();
+    const thread1 = try Thread.spawn(.{}, testTls, .{});
+    const thread2 = try Thread.spawn(.{}, testTls, .{});
+    try testTls();
+    thread1.join();
+    thread2.join();
 }
 
 threadlocal var x: i32 = 1234;
 
-fn testTls(context: void) !void {
-    _ = context;
+fn testTls() !void {
     if (x != 1234) return error.TlsBadStartValue;
     x += 1;
     if (x != 1235) return error.TlsBadEndValue;


@@ -69,6 +69,13 @@ pub const Target = struct {
                 };
             }
 
+            pub fn isBSD(tag: Tag) bool {
+                return tag.isDarwin() or switch (tag) {
+                    .kfreebsd, .freebsd, .openbsd, .netbsd, .dragonfly => true,
+                    else => false,
+                };
+            }
+
             pub fn dynamicLibSuffix(tag: Tag) [:0]const u8 {
                 if (tag.isDarwin()) {
                     return ".dylib";
@@ -787,6 +794,13 @@ pub const Target = struct {
             };
         }
 
+        pub fn isAARCH64(arch: Arch) bool {
+            return switch (arch) {
+                .aarch64, .aarch64_be, .aarch64_32 => true,
+                else => false,
+            };
+        }
+
         pub fn isThumb(arch: Arch) bool {
             return switch (arch) {
                 .thumb, .thumbeb => true,
@@ -1365,10 +1379,7 @@ pub const Target = struct {
     }
 
     pub fn isAndroid(self: Target) bool {
-        return switch (self.abi) {
-            .android => true,
-            else => false,
-        };
+        return self.abi == .android;
     }
 
     pub fn isWasm(self: Target) bool {
@@ -1379,6 +1390,10 @@ pub const Target = struct {
         return self.os.tag.isDarwin();
     }
 
+    pub fn isBSD(self: Target) bool {
+        return self.os.tag.isBSD();
+    }
+
     pub fn isGnuLibC_os_tag_abi(os_tag: Os.Tag, abi: Abi) bool {
         return os_tag == .linux and abi.isGnu();
     }


@@ -21,7 +21,7 @@ const Runnable = struct {
 const Worker = struct {
     pool: *ThreadPool,
-    thread: *std.Thread,
+    thread: std.Thread,
     /// The node is for this worker only and must have an already initialized event
     /// when the thread is spawned.
     idle_node: IdleQueue.Node,
@@ -60,7 +60,7 @@ pub fn init(self: *ThreadPool, allocator: *std.mem.Allocator) !void {
     if (std.builtin.single_threaded)
         return;
 
-    const worker_count = std.math.max(1, std.Thread.cpuCount() catch 1);
+    const worker_count = std.math.max(1, std.Thread.getCpuCount() catch 1);
     self.workers = try allocator.alloc(Worker, worker_count);
     errdefer allocator.free(self.workers);
 
@@ -74,13 +74,13 @@ pub fn init(self: *ThreadPool, allocator: *std.mem.Allocator) !void {
         try worker.idle_node.data.init();
         errdefer worker.idle_node.data.deinit();
 
-        worker.thread = try std.Thread.spawn(Worker.run, worker);
+        worker.thread = try std.Thread.spawn(.{}, Worker.run, .{worker});
     }
 }
 
 fn destroyWorkers(self: *ThreadPool, spawned: usize) void {
     for (self.workers[0..spawned]) |*worker| {
-        worker.thread.wait();
+        worker.thread.join();
         worker.idle_node.data.deinit();
     }
 }


@@ -816,18 +816,20 @@ pub fn main() anyerror!void {
             });
         }
     } else {
-        var threads = try arena.alloc(*std.Thread, llvm_targets.len);
+        var threads = try arena.alloc(std.Thread, llvm_targets.len);
         for (llvm_targets) |llvm_target, i| {
-            threads[i] = try std.Thread.spawn(processOneTarget, .{
-                .llvm_tblgen_exe = llvm_tblgen_exe,
-                .llvm_src_root = llvm_src_root,
-                .zig_src_dir = zig_src_dir,
-                .root_progress = root_progress,
-                .llvm_target = llvm_target,
+            threads[i] = try std.Thread.spawn(.{}, processOneTarget, .{
+                Job{
+                    .llvm_tblgen_exe = llvm_tblgen_exe,
+                    .llvm_src_root = llvm_src_root,
+                    .zig_src_dir = zig_src_dir,
+                    .root_progress = root_progress,
+                    .llvm_target = llvm_target,
+                },
             });
         }
         for (threads) |thread| {
-            thread.wait();
+            thread.join();
         }
     }
 }
}