From fd4a607bb2f1a1cbf8b8c1fd5d35f5f775e79114 Mon Sep 17 00:00:00 2001 From: kprotty Date: Sat, 26 Jun 2021 09:03:53 -0500 Subject: [PATCH] std.Thread: fix futex test + thread errors --- lib/std/Thread.zig | 2 + lib/std/Thread/Futex.zig | 186 ++++++++++++++++++--------------------- 2 files changed, 90 insertions(+), 98 deletions(-) diff --git a/lib/std/Thread.zig b/lib/std/Thread.zig index 04bd94729d..19987d47a1 100644 --- a/lib/std/Thread.zig +++ b/lib/std/Thread.zig @@ -385,6 +385,7 @@ const PosixThreadImpl = struct { }; const args_ptr = try allocator.create(Args); + args_ptr.* = args; errdefer allocator.destroy(args_ptr); var attr: c.pthread_attr_t = undefined; @@ -523,6 +524,7 @@ const LinuxThreadImpl = struct { error.PermissionDenied => unreachable, else => |e| return e, }; + assert(mapped.len >= map_bytes); errdefer os.munmap(mapped); // map everything but the guard page as read/write diff --git a/lib/std/Thread/Futex.zig b/lib/std/Thread/Futex.zig index 26ebe23364..b1b2128caa 100644 --- a/lib/std/Thread/Futex.zig +++ b/lib/std/Thread/Futex.zig @@ -391,70 +391,74 @@ test "Futex - wait/wake" { } test "Futex - Signal" { - if (!single_threaded) { + if (single_threaded) { return; } - try (struct { + const Paddle = struct { value: Atomic(u32) = Atomic(u32).init(0), + current: u32 = 0, - const Self = @This(); + fn run(self: *@This(), hit_to: *@This()) !void { + var iterations: usize = 4; + while (iterations > 0) : (iterations -= 1) { - fn send(self: *Self, value: u32) void { - self.value.store(value, .Release); - Futex.wake(&self.value, 1); - } + var value: u32 = undefined; + while (true) { + value = self.value.load(.Acquire); + if (value != self.current) break; + Futex.wait(&self.value, self.current, null) catch unreachable; + } - fn recv(self: *Self, expected: u32) void { - while (true) { - const value = self.value.load(.Acquire); - if (value == expected) break; - Futex.wait(&self.value, value, null) catch unreachable; + try testing.expectEqual(value, self.current + 1); + self.current = value; + + _ = hit_to.value.fetchAdd(1, .Release); + Futex.wake(&hit_to.value, 1); } } + }; - const start_value = 1; + var ping = Paddle{}; + var pong = Paddle{}; - fn runThread(rx: *Self, tx: *Self) void { - var iterations: u32 = start_value; - while (iterations < 10) : (iterations += 1) { - rx.recv(iterations); - tx.send(iterations); - } - } + const t1 = try std.Thread.spawn(.{}, Paddle.run, .{&ping, &pong}); + defer t1.join(); - fn run() !void { - var ping = Self{}; - var pong = Self{}; + const t2 = try std.Thread.spawn(.{}, Paddle.run, .{&pong, &ping}); + defer t2.join(); - const t1 = try std.Thread.spawn(.{}, runThread, .{ &ping, &pong }); - defer t1.join(); - - const t2 = try std.Thread.spawn(.{}, runThread, .{ &pong, &ping }); - defer t2.join(); - - ping.send(start_value); - } - }).run(); + _ = ping.value.fetchAdd(1, .Release); + Futex.wake(&ping.value, 1); } test "Futex - Broadcast" { - if (!single_threaded) { + if (single_threaded) { return; } - try (struct { - threads: [10]std.Thread = undefined, + const Context = struct { + threads: [4]std.Thread = undefined, broadcast: Atomic(u32) = Atomic(u32).init(0), notified: Atomic(usize) = Atomic(usize).init(0), - const Self = @This(); - const BROADCAST_EMPTY = 0; const BROADCAST_SENT = 1; const BROADCAST_RECEIVED = 2; - fn runReceiver(self: *Self) void { + fn runSender(self: *@This()) !void { + self.broadcast.store(BROADCAST_SENT, .Monotonic); + Futex.wake(&self.broadcast, @intCast(u32, self.threads.len)); + + while (true) { + const broadcast = self.broadcast.load(.Acquire); + if (broadcast == BROADCAST_RECEIVED) break; + try testing.expectEqual(broadcast, BROADCAST_SENT); + Futex.wait(&self.broadcast, broadcast, null) catch unreachable; + } + } + + fn runReceiver(self: *@This()) void { while (true) { const broadcast = self.broadcast.load(.Acquire); if (broadcast == BROADCAST_SENT) break; @@ -468,66 +472,55 @@ test "Futex - Broadcast" { Futex.wake(&self.broadcast, 1); } } + }; - fn run() !void { - var self = Self{}; + var ctx = Context{}; + for (ctx.threads) |*thread| + thread.* = try std.Thread.spawn(.{}, Context.runReceiver, .{&ctx}); + defer for (ctx.threads) |thread| + thread.join(); - for (self.threads) |*thread| - thread.* = try std.Thread.spawn(runReceiver, &self); - defer for (self.threads) |thread| - thread.join(); + // Try to wait for the threads to start before running runSender(). + // NOTE: not actually needed for correctness. + std.time.sleep(16 * std.time.ns_per_ms); + try ctx.runSender(); - std.time.sleep(16 * std.time.ns_per_ms); - self.broadcast.store(BROADCAST_SENT, .Monotonic); - Futex.wake(&self.broadcast, @intCast(u32, self.threads.len)); - - while (true) { - const broadcast = self.broadcast.load(.Acquire); - if (broadcast == BROADCAST_RECEIVED) break; - try testing.expectEqual(broadcast, BROADCAST_SENT); - Futex.wait(&self.broadcast, broadcast, null) catch unreachable; - } - - const notified = self.notified.load(.Monotonic); - try testing.expectEqual(notified, self.threads.len); - } - }).run(); + const notified = ctx.notified.load(.Monotonic); + try testing.expectEqual(notified, ctx.threads.len); } test "Futex - Chain" { - if (!single_threaded) { + if (single_threaded) { return; } - try (struct { + const Signal = struct { + value: Atomic(u32) = Atomic(u32).init(0), + + fn wait(self: *@This()) void { + while (true) { + const value = self.value.load(.Acquire); + if (value == 1) break; + assert(value == 0); + Futex.wait(&self.value, 0, null) catch unreachable; + } + } + + fn notify(self: *@This()) void { + assert(self.value.load(.Unordered) == 0); + self.value.store(1, .Release); + Futex.wake(&self.value, 1); + } + }; + + const Context = struct { completed: Signal = .{}, - threads: [10]struct { + threads: [4]struct { thread: std.Thread, signal: Signal, } = undefined, - const Signal = struct { - state: Atomic(u32) = Atomic(u32).init(0), - - fn wait(self: *Signal) void { - while (true) { - const value = self.value.load(.Acquire); - if (value == 1) break; - assert(value == 0); - Futex.wait(&self.value, 0, null) catch unreachable; - } - } - - fn notify(self: *Signal) void { - assert(self.value.load(.Unordered) == 0); - self.value.store(1, .Release); - Futex.wake(&self.value, 1); - } - }; - - const Self = @This(); - - fn runThread(self: *Self, index: usize) void { + fn run(self: *@This(), index: usize) void { const this_signal = &self.threads[index].signal; var next_signal = &self.completed; @@ -538,21 +531,18 @@ test "Futex - Chain" { this_signal.wait(); next_signal.notify(); } + }; - fn run() !void { - var self = Self{}; + var ctx = Context{}; + for (ctx.threads) |*entry, index| { + entry.signal = .{}; + entry.thread = try std.Thread.spawn(.{}, Context.run, .{&ctx, index}); + } - for (self.threads) |*entry, index| { - entry.signal = .{}; - entry.thread = try std.Thread.spawn(.{}, runThread, .{&self, index}); - } + ctx.threads[0].signal.notify(); + ctx.completed.wait(); - self.threads[0].signal.notify(); - self.completed.wait(); - - for (self.threads) |entry| { - entry.thread.join(); - } - } - }).run(); + for (ctx.threads) |entry| { + entry.thread.join(); + } }