From 80069c1e69140ac240e5397270ea919ddbfce89b Mon Sep 17 00:00:00 2001 From: Andrew Kelley Date: Wed, 15 Oct 2025 11:00:00 -0700 Subject: [PATCH] std.Io.Queue: add "uncancelable" variants to "get" useful for resource management --- lib/std/Io.zig | 42 ++++++++++++++++++++++++++++++++++-------- 1 file changed, 34 insertions(+), 8 deletions(-) diff --git a/lib/std/Io.zig b/lib/std/Io.zig index 6da8187cd1..2e15c4b8cb 100644 --- a/lib/std/Io.zig +++ b/lib/std/Io.zig @@ -1315,10 +1315,23 @@ pub const TypeErasedQueue = struct { pub fn get(q: *@This(), io: Io, buffer: []u8, min: usize) Cancelable!usize { assert(buffer.len >= min); - + if (buffer.len == 0) return 0; try q.mutex.lock(io); defer q.mutex.unlock(io); + return getLocked(q, io, buffer, min, false); + } + pub fn getUncancelable(q: *@This(), io: Io, buffer: []u8, min: usize) usize { + assert(buffer.len >= min); + if (buffer.len == 0) return 0; + q.mutex.lockUncancelable(io); + defer q.mutex.unlock(io); + return getLocked(q, io, buffer, min, true) catch |err| switch (err) { + error.Canceled => unreachable, + }; + } + + pub fn getLocked(q: *@This(), io: Io, buffer: []u8, min: usize, uncancelable: bool) Cancelable!usize { // The ring buffer gets first priority, then data should come from any // queued putters, then finally the ring buffer should be filled with // data from putters so they can be resumed. @@ -1371,7 +1384,10 @@ pub const TypeErasedQueue = struct { var pending: Get = .{ .remaining = remaining, .condition = .{}, .node = .{} }; q.getters.append(&pending.node); - try pending.condition.wait(io, &q.mutex); + if (uncancelable) + pending.condition.waitUncancelable(io, &q.mutex) + else + try pending.condition.wait(io, &q.mutex); remaining = pending.remaining; } } @@ -1439,6 +1455,14 @@ pub fn Queue(Elem: type) type { return @divExact(q.type_erased.putUncancelable(io, @ptrCast(elements), min * @sizeOf(Elem)), @sizeOf(Elem)); } + pub fn putOne(q: *@This(), io: Io, item: Elem) Cancelable!void { + assert(try q.put(io, &.{item}, 1) == 1); + } + + pub fn putOneUncancelable(q: *@This(), io: Io, item: Elem) void { + assert(q.putUncancelable(io, &.{item}, 1) == 1); + } + /// Receives elements from the beginning of the queue. The function /// returns when at least `min` elements have been populated inside /// `buffer`. @@ -1450,12 +1474,8 @@ pub fn Queue(Elem: type) type { return @divExact(try q.type_erased.get(io, @ptrCast(buffer), min * @sizeOf(Elem)), @sizeOf(Elem)); } - pub fn putOne(q: *@This(), io: Io, item: Elem) Cancelable!void { - assert(try q.put(io, &.{item}, 1) == 1); - } - - pub fn putOneUncancelable(q: *@This(), io: Io, item: Elem) void { - assert(q.putUncancelable(io, &.{item}, 1) == 1); + pub fn getUncancelable(q: *@This(), io: Io, buffer: []Elem, min: usize) usize { + return @divExact(q.type_erased.getUncancelable(io, @ptrCast(buffer), min * @sizeOf(Elem)), @sizeOf(Elem)); } pub fn getOne(q: *@This(), io: Io) Cancelable!Elem { @@ -1464,6 +1484,12 @@ pub fn Queue(Elem: type) type { return buf[0]; } + pub fn getOneUncancelable(q: *@This(), io: Io) Elem { + var buf: [1]Elem = undefined; + assert(q.getUncancelable(io, &buf, 1) == 1); + return buf[0]; + } + /// Returns buffer length in `Elem` units. pub fn capacity(q: *const @This()) usize { return @divExact(q.type_erased.buffer.len, @sizeOf(Elem));