std.Io.Queue: add "uncancelable" variants to "get"

useful for resource management
This commit is contained in:
Andrew Kelley 2025-10-15 11:00:00 -07:00
parent adaef433d2
commit 80069c1e69

View File

@ -1315,10 +1315,23 @@ pub const TypeErasedQueue = struct {
pub fn get(q: *@This(), io: Io, buffer: []u8, min: usize) Cancelable!usize {
assert(buffer.len >= min);
if (buffer.len == 0) return 0;
try q.mutex.lock(io);
defer q.mutex.unlock(io);
return getLocked(q, io, buffer, min, false);
}
pub fn getUncancelable(q: *@This(), io: Io, buffer: []u8, min: usize) usize {
assert(buffer.len >= min);
if (buffer.len == 0) return 0;
q.mutex.lockUncancelable(io);
defer q.mutex.unlock(io);
return getLocked(q, io, buffer, min, true) catch |err| switch (err) {
error.Canceled => unreachable,
};
}
pub fn getLocked(q: *@This(), io: Io, buffer: []u8, min: usize, uncancelable: bool) Cancelable!usize {
// The ring buffer gets first priority, then data should come from any
// queued putters, then finally the ring buffer should be filled with
// data from putters so they can be resumed.
@ -1371,7 +1384,10 @@ pub const TypeErasedQueue = struct {
var pending: Get = .{ .remaining = remaining, .condition = .{}, .node = .{} };
q.getters.append(&pending.node);
try pending.condition.wait(io, &q.mutex);
if (uncancelable)
pending.condition.waitUncancelable(io, &q.mutex)
else
try pending.condition.wait(io, &q.mutex);
remaining = pending.remaining;
}
}
@ -1439,6 +1455,14 @@ pub fn Queue(Elem: type) type {
return @divExact(q.type_erased.putUncancelable(io, @ptrCast(elements), min * @sizeOf(Elem)), @sizeOf(Elem));
}
pub fn putOne(q: *@This(), io: Io, item: Elem) Cancelable!void {
assert(try q.put(io, &.{item}, 1) == 1);
}
pub fn putOneUncancelable(q: *@This(), io: Io, item: Elem) void {
assert(q.putUncancelable(io, &.{item}, 1) == 1);
}
/// Receives elements from the beginning of the queue. The function
/// returns when at least `min` elements have been populated inside
/// `buffer`.
@ -1450,12 +1474,8 @@ pub fn Queue(Elem: type) type {
return @divExact(try q.type_erased.get(io, @ptrCast(buffer), min * @sizeOf(Elem)), @sizeOf(Elem));
}
pub fn putOne(q: *@This(), io: Io, item: Elem) Cancelable!void {
assert(try q.put(io, &.{item}, 1) == 1);
}
pub fn putOneUncancelable(q: *@This(), io: Io, item: Elem) void {
assert(q.putUncancelable(io, &.{item}, 1) == 1);
pub fn getUncancelable(q: *@This(), io: Io, buffer: []Elem, min: usize) usize {
return @divExact(q.type_erased.getUncancelable(io, @ptrCast(buffer), min * @sizeOf(Elem)), @sizeOf(Elem));
}
pub fn getOne(q: *@This(), io: Io) Cancelable!Elem {
@ -1464,6 +1484,12 @@ pub fn Queue(Elem: type) type {
return buf[0];
}
pub fn getOneUncancelable(q: *@This(), io: Io) Elem {
var buf: [1]Elem = undefined;
assert(q.getUncancelable(io, &buf, 1) == 1);
return buf[0];
}
/// Returns buffer length in `Elem` units.
pub fn capacity(q: *const @This()) usize {
return @divExact(q.type_erased.buffer.len, @sizeOf(Elem));