From edb2f72988cd180c5d87b03481fa1c20b3325968 Mon Sep 17 00:00:00 2001 From: Andrew Kelley Date: Wed, 22 Nov 2023 16:39:37 -0700 Subject: [PATCH] std: remove atomic.Queue This was originally supposed to be a lock-free queue, but I gave up on that and made it be a thread-safe queue instead. Putting the mutex directly inside the queue data structure makes it non-composeable. Instead, the recommendation is to use a normal queue protected by an external mutex. --- CMakeLists.txt | 1 - lib/std/atomic.zig | 2 - lib/std/atomic/queue.zig | 413 --------------------------------------- 3 files changed, 416 deletions(-) delete mode 100644 lib/std/atomic/queue.zig diff --git a/CMakeLists.txt b/CMakeLists.txt index 2aceab91f2..f2002987b9 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -210,7 +210,6 @@ set(ZIG_STAGE2_SOURCES "${CMAKE_SOURCE_DIR}/lib/std/ascii.zig" "${CMAKE_SOURCE_DIR}/lib/std/atomic.zig" "${CMAKE_SOURCE_DIR}/lib/std/atomic/Atomic.zig" - "${CMAKE_SOURCE_DIR}/lib/std/atomic/queue.zig" "${CMAKE_SOURCE_DIR}/lib/std/base64.zig" "${CMAKE_SOURCE_DIR}/lib/std/BitStack.zig" "${CMAKE_SOURCE_DIR}/lib/std/buf_map.zig" diff --git a/lib/std/atomic.zig b/lib/std/atomic.zig index c631232463..2b3c543cf8 100644 --- a/lib/std/atomic.zig +++ b/lib/std/atomic.zig @@ -1,7 +1,6 @@ const std = @import("std.zig"); const builtin = @import("builtin"); -pub const Queue = @import("atomic/queue.zig").Queue; pub const Atomic = @import("atomic/Atomic.zig").Atomic; /// Signals to the processor that the caller is inside a busy-wait spin-loop. @@ -85,6 +84,5 @@ pub const cache_line = switch (builtin.cpu.arch) { }; test { - _ = Queue; _ = Atomic; } diff --git a/lib/std/atomic/queue.zig b/lib/std/atomic/queue.zig deleted file mode 100644 index 552f2f9510..0000000000 --- a/lib/std/atomic/queue.zig +++ /dev/null @@ -1,413 +0,0 @@ -const std = @import("../std.zig"); -const builtin = @import("builtin"); -const assert = std.debug.assert; -const expect = std.testing.expect; - -/// Many producer, many consumer, non-allocating, thread-safe. -/// Uses a mutex to protect access. -/// The queue does not manage ownership and the user is responsible to -/// manage the storage of the nodes. -pub fn Queue(comptime T: type) type { - return struct { - head: ?*Node, - tail: ?*Node, - mutex: std.Thread.Mutex, - - pub const Self = @This(); - pub const Node = std.DoublyLinkedList(T).Node; - - /// Initializes a new queue. The queue does not provide a `deinit()` - /// function, so the user must take care of cleaning up the queue elements. - pub fn init() Self { - return Self{ - .head = null, - .tail = null, - .mutex = std.Thread.Mutex{}, - }; - } - - /// Appends `node` to the queue. - /// The lifetime of `node` must be longer than the lifetime of the queue. - pub fn put(self: *Self, node: *Node) void { - node.next = null; - - self.mutex.lock(); - defer self.mutex.unlock(); - - node.prev = self.tail; - self.tail = node; - if (node.prev) |prev_tail| { - prev_tail.next = node; - } else { - assert(self.head == null); - self.head = node; - } - } - - /// Gets a previously inserted node or returns `null` if there is none. - /// It is safe to `get()` a node from the queue while another thread tries - /// to `remove()` the same node at the same time. - pub fn get(self: *Self) ?*Node { - self.mutex.lock(); - defer self.mutex.unlock(); - - const head = self.head orelse return null; - self.head = head.next; - if (head.next) |new_head| { - new_head.prev = null; - } else { - self.tail = null; - } - // This way, a get() and a remove() are thread-safe with each other. - head.prev = null; - head.next = null; - return head; - } - - /// Prepends `node` to the front of the queue. - /// The lifetime of `node` must be longer than the lifetime of the queue. - pub fn unget(self: *Self, node: *Node) void { - node.prev = null; - - self.mutex.lock(); - defer self.mutex.unlock(); - - const opt_head = self.head; - self.head = node; - if (opt_head) |old_head| { - node.next = old_head; - } else { - assert(self.tail == null); - self.tail = node; - } - } - - /// Removes a node from the queue, returns whether node was actually removed. - /// It is safe to `remove()` a node from the queue while another thread tries - /// to `get()` the same node at the same time. - pub fn remove(self: *Self, node: *Node) bool { - self.mutex.lock(); - defer self.mutex.unlock(); - - if (node.prev == null and node.next == null and self.head != node) { - return false; - } - - if (node.prev) |prev| { - prev.next = node.next; - } else { - self.head = node.next; - } - if (node.next) |next| { - next.prev = node.prev; - } else { - self.tail = node.prev; - } - node.prev = null; - node.next = null; - return true; - } - - /// Returns `true` if the queue is currently empty. - /// Note that in a multi-consumer environment a return value of `false` - /// does not mean that `get` will yield a non-`null` value! - pub fn isEmpty(self: *Self) bool { - self.mutex.lock(); - defer self.mutex.unlock(); - return self.head == null; - } - - /// Dumps the contents of the queue to `stderr`. - pub fn dump(self: *Self) void { - self.dumpToStream(std.io.getStdErr().writer()) catch return; - } - - /// Dumps the contents of the queue to `stream`. - /// Up to 4 elements from the head are dumped and the tail of the queue is - /// dumped as well. - pub fn dumpToStream(self: *Self, stream: anytype) !void { - const S = struct { - fn dumpRecursive( - s: anytype, - optional_node: ?*Node, - indent: usize, - comptime depth: comptime_int, - ) !void { - try s.writeByteNTimes(' ', indent); - if (optional_node) |node| { - try s.print("0x{x}={}\n", .{ @intFromPtr(node), node.data }); - if (depth == 0) { - try s.print("(max depth)\n", .{}); - return; - } - try dumpRecursive(s, node.next, indent + 1, depth - 1); - } else { - try s.print("(null)\n", .{}); - } - } - }; - self.mutex.lock(); - defer self.mutex.unlock(); - - try stream.print("head: ", .{}); - try S.dumpRecursive(stream, self.head, 0, 4); - try stream.print("tail: ", .{}); - try S.dumpRecursive(stream, self.tail, 0, 4); - } - }; -} - -const Context = struct { - allocator: std.mem.Allocator, - queue: *Queue(i32), - put_sum: isize, - get_sum: isize, - get_count: usize, - puts_done: bool, -}; - -// TODO add lazy evaluated build options and then put puts_per_thread behind -// some option such as: "AggressiveMultithreadedFuzzTest". In the AppVeyor -// CI we would use a less aggressive setting since at 1 core, while we still -// want this test to pass, we need a smaller value since there is so much thrashing -// we would also use a less aggressive setting when running in valgrind -const puts_per_thread = 500; -const put_thread_count = 3; - -test "std.atomic.Queue" { - const plenty_of_memory = try std.heap.page_allocator.alloc(u8, 300 * 1024); - defer std.heap.page_allocator.free(plenty_of_memory); - - var fixed_buffer_allocator = std.heap.FixedBufferAllocator.init(plenty_of_memory); - const a = fixed_buffer_allocator.threadSafeAllocator(); - - var queue = Queue(i32).init(); - var context = Context{ - .allocator = a, - .queue = &queue, - .put_sum = 0, - .get_sum = 0, - .puts_done = false, - .get_count = 0, - }; - - if (builtin.single_threaded) { - try expect(context.queue.isEmpty()); - { - var i: usize = 0; - while (i < put_thread_count) : (i += 1) { - try expect(startPuts(&context) == 0); - } - } - try expect(!context.queue.isEmpty()); - context.puts_done = true; - { - var i: usize = 0; - while (i < put_thread_count) : (i += 1) { - try expect(startGets(&context) == 0); - } - } - try expect(context.queue.isEmpty()); - } else { - try expect(context.queue.isEmpty()); - - var putters: [put_thread_count]std.Thread = undefined; - for (&putters) |*t| { - t.* = try std.Thread.spawn(.{}, startPuts, .{&context}); - } - var getters: [put_thread_count]std.Thread = undefined; - for (&getters) |*t| { - t.* = try std.Thread.spawn(.{}, startGets, .{&context}); - } - - for (putters) |t| - t.join(); - @atomicStore(bool, &context.puts_done, true, .SeqCst); - for (getters) |t| - t.join(); - - try expect(context.queue.isEmpty()); - } - - if (context.put_sum != context.get_sum) { - std.debug.panic("failure\nput_sum:{} != get_sum:{}", .{ context.put_sum, context.get_sum }); - } - - if (context.get_count != puts_per_thread * put_thread_count) { - std.debug.panic("failure\nget_count:{} != puts_per_thread:{} * put_thread_count:{}", .{ - context.get_count, - @as(u32, puts_per_thread), - @as(u32, put_thread_count), - }); - } -} - -fn startPuts(ctx: *Context) u8 { - var put_count: usize = puts_per_thread; - var prng = std.rand.DefaultPrng.init(0xdeadbeef); - const random = prng.random(); - while (put_count != 0) : (put_count -= 1) { - std.time.sleep(1); // let the os scheduler be our fuzz - const x = @as(i32, @bitCast(random.int(u32))); - const node = ctx.allocator.create(Queue(i32).Node) catch unreachable; - node.* = .{ - .prev = undefined, - .next = undefined, - .data = x, - }; - ctx.queue.put(node); - _ = @atomicRmw(isize, &ctx.put_sum, .Add, x, .SeqCst); - } - return 0; -} - -fn startGets(ctx: *Context) u8 { - while (true) { - const last = @atomicLoad(bool, &ctx.puts_done, .SeqCst); - - while (ctx.queue.get()) |node| { - std.time.sleep(1); // let the os scheduler be our fuzz - _ = @atomicRmw(isize, &ctx.get_sum, .Add, node.data, .SeqCst); - _ = @atomicRmw(usize, &ctx.get_count, .Add, 1, .SeqCst); - } - - if (last) return 0; - } -} - -test "std.atomic.Queue single-threaded" { - var queue = Queue(i32).init(); - try expect(queue.isEmpty()); - - var node_0 = Queue(i32).Node{ - .data = 0, - .next = undefined, - .prev = undefined, - }; - queue.put(&node_0); - try expect(!queue.isEmpty()); - - var node_1 = Queue(i32).Node{ - .data = 1, - .next = undefined, - .prev = undefined, - }; - queue.put(&node_1); - try expect(!queue.isEmpty()); - - try expect(queue.get().?.data == 0); - try expect(!queue.isEmpty()); - - var node_2 = Queue(i32).Node{ - .data = 2, - .next = undefined, - .prev = undefined, - }; - queue.put(&node_2); - try expect(!queue.isEmpty()); - - var node_3 = Queue(i32).Node{ - .data = 3, - .next = undefined, - .prev = undefined, - }; - queue.put(&node_3); - try expect(!queue.isEmpty()); - - try expect(queue.get().?.data == 1); - try expect(!queue.isEmpty()); - - try expect(queue.get().?.data == 2); - try expect(!queue.isEmpty()); - - var node_4 = Queue(i32).Node{ - .data = 4, - .next = undefined, - .prev = undefined, - }; - queue.put(&node_4); - try expect(!queue.isEmpty()); - - try expect(queue.get().?.data == 3); - node_3.next = null; - try expect(!queue.isEmpty()); - - queue.unget(&node_3); - try expect(queue.get().?.data == 3); - try expect(!queue.isEmpty()); - - try expect(queue.get().?.data == 4); - try expect(queue.isEmpty()); - - try expect(queue.get() == null); - try expect(queue.isEmpty()); - - // unget an empty queue - queue.unget(&node_4); - try expect(queue.tail == &node_4); - try expect(queue.head == &node_4); - - try expect(queue.get().?.data == 4); - - try expect(queue.get() == null); - try expect(queue.isEmpty()); -} - -test "std.atomic.Queue dump" { - const mem = std.mem; - var buffer: [1024]u8 = undefined; - var expected_buffer: [1024]u8 = undefined; - var fbs = std.io.fixedBufferStream(&buffer); - - var queue = Queue(i32).init(); - - // Test empty stream - fbs.reset(); - try queue.dumpToStream(fbs.writer()); - try expect(mem.eql(u8, buffer[0..fbs.pos], - \\head: (null) - \\tail: (null) - \\ - )); - - // Test a stream with one element - var node_0 = Queue(i32).Node{ - .data = 1, - .next = undefined, - .prev = undefined, - }; - queue.put(&node_0); - - fbs.reset(); - try queue.dumpToStream(fbs.writer()); - - var expected = try std.fmt.bufPrint(expected_buffer[0..], - \\head: 0x{x}=1 - \\ (null) - \\tail: 0x{x}=1 - \\ (null) - \\ - , .{ @intFromPtr(queue.head), @intFromPtr(queue.tail) }); - try expect(mem.eql(u8, buffer[0..fbs.pos], expected)); - - // Test a stream with two elements - var node_1 = Queue(i32).Node{ - .data = 2, - .next = undefined, - .prev = undefined, - }; - queue.put(&node_1); - - fbs.reset(); - try queue.dumpToStream(fbs.writer()); - - expected = try std.fmt.bufPrint(expected_buffer[0..], - \\head: 0x{x}=1 - \\ 0x{x}=2 - \\ (null) - \\tail: 0x{x}=2 - \\ (null) - \\ - , .{ @intFromPtr(queue.head), @intFromPtr(queue.head.?.next), @intFromPtr(queue.tail) }); - try expect(mem.eql(u8, buffer[0..fbs.pos], expected)); -}