From 71f876295981ecf5dfb8daadc05e7e9c1c7e1cbe Mon Sep 17 00:00:00 2001 From: jumpnbrownweasel <49791153+jumpnbrownweasel@users.noreply.github.com> Date: Mon, 17 Oct 2022 16:15:15 -0700 Subject: [PATCH] Fix for #13163: DefaultRwLock accumulates write-waiters, eventually fails to write lock (#13180) * Fix for: DefaultRwLock accumulates write-waiters, eventually fails to write lock #13163 * Comment out debug.print at the end of the last test. * Code formatting * - use equality test after lock/unlock rather than peeking into internals. however, this is still implementation specific and only done for DefaultRwLock. - add num_reads maximum to ensure that reader threads stop if writer threads are starved - use relaxed orderings for the read atomic counter - don't check at the end for non-zero read ops, since the reader threads may only run once if they are starved * More review changes - Monotonic is sufficient for incrementing the reads counter --- lib/std/Thread/RwLock.zig | 128 +++++++++++++++++++++++++++++++++++++- 1 file changed, 127 insertions(+), 1 deletion(-) diff --git a/lib/std/Thread/RwLock.zig b/lib/std/Thread/RwLock.zig index 46d46cdfa4..c4105817b3 100644 --- a/lib/std/Thread/RwLock.zig +++ b/lib/std/Thread/RwLock.zig @@ -9,6 +9,7 @@ const RwLock = @This(); const std = @import("../std.zig"); const builtin = @import("builtin"); const assert = std.debug.assert; +const testing = std.testing; pub const Impl = if (builtin.single_threaded) SingleThreadedRwLock @@ -190,7 +191,7 @@ pub const DefaultRwLock = struct { _ = @atomicRmw(usize, &rwl.state, .Add, WRITER, .SeqCst); rwl.mutex.lock(); - const state = @atomicRmw(usize, &rwl.state, .Or, IS_WRITING, .SeqCst); + const state = @atomicRmw(usize, &rwl.state, .Add, IS_WRITING -% WRITER, .SeqCst); if (state & READER_MASK != 0) rwl.semaphore.wait(); } @@ -247,3 +248,128 @@ pub const DefaultRwLock = struct { rwl.semaphore.post(); } }; + +test "DefaultRwLock - internal state" { + var rwl = DefaultRwLock{}; + + // The following failed prior to the fix for Issue #13163, + // where the WRITER flag was subtracted by the lock method. + + rwl.lock(); + rwl.unlock(); + try testing.expectEqual(rwl, DefaultRwLock{}); +} + +test "RwLock - smoke test" { + var rwl = RwLock{}; + + rwl.lock(); + try testing.expect(!rwl.tryLock()); + try testing.expect(!rwl.tryLockShared()); + rwl.unlock(); + + try testing.expect(rwl.tryLock()); + try testing.expect(!rwl.tryLock()); + try testing.expect(!rwl.tryLockShared()); + rwl.unlock(); + + rwl.lockShared(); + try testing.expect(!rwl.tryLock()); + try testing.expect(rwl.tryLockShared()); + rwl.unlockShared(); + rwl.unlockShared(); + + try testing.expect(rwl.tryLockShared()); + try testing.expect(!rwl.tryLock()); + try testing.expect(rwl.tryLockShared()); + rwl.unlockShared(); + rwl.unlockShared(); + + rwl.lock(); + rwl.unlock(); +} + +test "RwLock - concurrent access" { + if (builtin.single_threaded) + return; + + const num_writers: usize = 2; + const num_readers: usize = 4; + const num_writes: usize = 10000; + const num_reads: usize = num_writes * 2; + + const Runner = struct { + const Self = @This(); + + rwl: RwLock = .{}, + writes: usize = 0, + reads: std.atomic.Atomic(usize) = std.atomic.Atomic(usize).init(0), + + term1: usize = 0, + term2: usize = 0, + term_sum: usize = 0, + + fn reader(self: *Self) !void { + while (true) { + self.rwl.lockShared(); + defer self.rwl.unlockShared(); + + if (self.writes >= num_writes or self.reads.load(.Unordered) >= num_reads) + break; + + try self.check(); + + _ = self.reads.fetchAdd(1, .Monotonic); + } + } + + fn writer(self: *Self, thread_idx: usize) !void { + var prng = std.rand.DefaultPrng.init(thread_idx); + var rnd = prng.random(); + + while (true) { + self.rwl.lock(); + defer self.rwl.unlock(); + + if (self.writes >= num_writes) + break; + + try self.check(); + + const term1 = rnd.int(usize); + self.term1 = term1; + try std.Thread.yield(); + + const term2 = rnd.int(usize); + self.term2 = term2; + try std.Thread.yield(); + + self.term_sum = term1 +% term2; + self.writes += 1; + } + } + + fn check(self: *const Self) !void { + const term_sum = self.term_sum; + try std.Thread.yield(); + + const term2 = self.term2; + try std.Thread.yield(); + + const term1 = self.term1; + try testing.expectEqual(term_sum, term1 +% term2); + } + }; + + var runner = Runner{}; + var threads: [num_writers + num_readers]std.Thread = undefined; + + for (threads[0..num_writers]) |*t, i| t.* = try std.Thread.spawn(.{}, Runner.writer, .{ &runner, i }); + for (threads[num_writers..]) |*t| t.* = try std.Thread.spawn(.{}, Runner.reader, .{&runner}); + + for (threads) |t| t.join(); + + try testing.expectEqual(num_writes, runner.writes); + + //std.debug.print("reads={}\n", .{ runner.reads.load(.Unordered)}); +}