mirror of
https://github.com/ziglang/zig.git
synced 2025-12-06 06:13:07 +00:00
Io.Queue: fix empty and full states being indistinguishable.
This commit is contained in:
parent
c603d27f90
commit
ea76946d2a
107
lib/std/Io.zig
107
lib/std/Io.zig
@ -1238,8 +1238,8 @@ pub const TypeErasedQueue = struct {
|
||||
|
||||
/// Ring buffer. This data is logically *after* queued getters.
|
||||
buffer: []u8,
|
||||
put_index: usize,
|
||||
get_index: usize,
|
||||
start: usize,
|
||||
len: usize,
|
||||
|
||||
putters: std.DoublyLinkedList,
|
||||
getters: std.DoublyLinkedList,
|
||||
@ -1260,8 +1260,8 @@ pub const TypeErasedQueue = struct {
|
||||
return .{
|
||||
.mutex = .init,
|
||||
.buffer = buffer,
|
||||
.put_index = 0,
|
||||
.get_index = 0,
|
||||
.start = 0,
|
||||
.len = 0,
|
||||
.putters = .{},
|
||||
.getters = .{},
|
||||
};
|
||||
@ -1286,6 +1286,16 @@ pub const TypeErasedQueue = struct {
|
||||
};
|
||||
}
|
||||
|
||||
fn puttableSlice(q: *const TypeErasedQueue) ?[]u8 {
|
||||
const unwrapped_index = q.start + q.len;
|
||||
const wrapped_index, const overflow = @subWithOverflow(unwrapped_index, q.buffer.len);
|
||||
const slice = switch (overflow) {
|
||||
1 => q.buffer[unwrapped_index..],
|
||||
0 => q.buffer[wrapped_index..q.start],
|
||||
};
|
||||
return if (slice.len > 0) slice else null;
|
||||
}
|
||||
|
||||
fn putLocked(q: *TypeErasedQueue, io: Io, elements: []const u8, min: usize, uncancelable: bool) Cancelable!usize {
|
||||
// Getters have first priority on the data, and only when the getters
|
||||
// queue is empty do we start populating the buffer.
|
||||
@ -1306,20 +1316,12 @@ pub const TypeErasedQueue = struct {
|
||||
return elements.len;
|
||||
}
|
||||
|
||||
{
|
||||
const available = q.buffer[q.put_index..];
|
||||
const copy_len = @min(available.len, remaining.len);
|
||||
@memcpy(available[0..copy_len], remaining[0..copy_len]);
|
||||
while (q.puttableSlice()) |slice| {
|
||||
const copy_len = @min(slice.len, remaining.len);
|
||||
assert(copy_len > 0);
|
||||
@memcpy(slice[0..copy_len], remaining[0..copy_len]);
|
||||
q.len += copy_len;
|
||||
remaining = remaining[copy_len..];
|
||||
q.put_index += copy_len;
|
||||
if (remaining.len == 0) return elements.len;
|
||||
}
|
||||
{
|
||||
const available = q.buffer[0..q.get_index];
|
||||
const copy_len = @min(available.len, remaining.len);
|
||||
@memcpy(available[0..copy_len], remaining[0..copy_len]);
|
||||
remaining = remaining[copy_len..];
|
||||
q.put_index = copy_len;
|
||||
if (remaining.len == 0) return elements.len;
|
||||
}
|
||||
|
||||
@ -1354,46 +1356,32 @@ pub const TypeErasedQueue = struct {
|
||||
};
|
||||
}
|
||||
|
||||
fn gettableSlice(q: *const TypeErasedQueue) ?[]const u8 {
|
||||
const overlong_slice = q.buffer[q.start..];
|
||||
const slice = overlong_slice[0..@min(overlong_slice.len, q.len)];
|
||||
return if (slice.len > 0) slice else null;
|
||||
}
|
||||
|
||||
fn getLocked(q: *@This(), io: Io, buffer: []u8, min: usize, uncancelable: bool) Cancelable!usize {
|
||||
// The ring buffer gets first priority, then data should come from any
|
||||
// queued putters, then finally the ring buffer should be filled with
|
||||
// data from putters so they can be resumed.
|
||||
|
||||
var remaining = buffer;
|
||||
if (q.get_index <= q.put_index) {
|
||||
const available = q.buffer[q.get_index..q.put_index];
|
||||
const copy_len = @min(available.len, remaining.len);
|
||||
@memcpy(remaining[0..copy_len], available[0..copy_len]);
|
||||
q.get_index += copy_len;
|
||||
while (q.gettableSlice()) |slice| {
|
||||
const copy_len = @min(slice.len, remaining.len);
|
||||
assert(copy_len > 0);
|
||||
@memcpy(remaining[0..copy_len], slice[0..copy_len]);
|
||||
q.start += copy_len;
|
||||
if (q.buffer.len - q.start == 0) q.start = 0;
|
||||
q.len -= copy_len;
|
||||
remaining = remaining[copy_len..];
|
||||
if (remaining.len == 0) {
|
||||
q.fillRingBufferFromPutters(io);
|
||||
return buffer.len;
|
||||
}
|
||||
} else {
|
||||
{
|
||||
const available = q.buffer[q.get_index..];
|
||||
const copy_len = @min(available.len, remaining.len);
|
||||
@memcpy(remaining[0..copy_len], available[0..copy_len]);
|
||||
q.get_index += copy_len;
|
||||
remaining = remaining[copy_len..];
|
||||
if (remaining.len == 0) {
|
||||
q.fillRingBufferFromPutters(io);
|
||||
return buffer.len;
|
||||
}
|
||||
}
|
||||
{
|
||||
const available = q.buffer[0..q.put_index];
|
||||
const copy_len = @min(available.len, remaining.len);
|
||||
@memcpy(remaining[0..copy_len], available[0..copy_len]);
|
||||
q.get_index = copy_len;
|
||||
remaining = remaining[copy_len..];
|
||||
if (remaining.len == 0) {
|
||||
q.fillRingBufferFromPutters(io);
|
||||
return buffer.len;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Copy directly from putters into buffer.
|
||||
while (q.putters.popFirst()) |putter_node| {
|
||||
const putter: *Put = @alignCast(@fieldParentPtr("node", putter_node));
|
||||
@ -1410,6 +1398,7 @@ pub const TypeErasedQueue = struct {
|
||||
q.fillRingBufferFromPutters(io);
|
||||
return buffer.len;
|
||||
}
|
||||
|
||||
// Both ring buffer and putters queue is empty.
|
||||
const total_filled = buffer.len - remaining.len;
|
||||
if (total_filled >= min) return total_filled;
|
||||
@ -1432,30 +1421,20 @@ pub const TypeErasedQueue = struct {
|
||||
fn fillRingBufferFromPutters(q: *TypeErasedQueue, io: Io) void {
|
||||
while (q.putters.popFirst()) |putter_node| {
|
||||
const putter: *Put = @alignCast(@fieldParentPtr("node", putter_node));
|
||||
{
|
||||
const available = q.buffer[q.put_index..];
|
||||
const copy_len = @min(available.len, putter.remaining.len);
|
||||
@memcpy(available[0..copy_len], putter.remaining[0..copy_len]);
|
||||
while (q.puttableSlice()) |slice| {
|
||||
const copy_len = @min(slice.len, putter.remaining.len);
|
||||
assert(copy_len > 0);
|
||||
@memcpy(slice[0..copy_len], putter.remaining[0..copy_len]);
|
||||
q.len += copy_len;
|
||||
putter.remaining = putter.remaining[copy_len..];
|
||||
q.put_index += copy_len;
|
||||
if (putter.remaining.len == 0) {
|
||||
putter.condition.signal(io);
|
||||
continue;
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
q.putters.prepend(putter_node);
|
||||
break;
|
||||
}
|
||||
{
|
||||
const available = q.buffer[0..q.get_index];
|
||||
const copy_len = @min(available.len, putter.remaining.len);
|
||||
@memcpy(available[0..copy_len], putter.remaining[0..copy_len]);
|
||||
putter.remaining = putter.remaining[copy_len..];
|
||||
q.put_index = copy_len;
|
||||
if (putter.remaining.len == 0) {
|
||||
putter.condition.signal(io);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
q.putters.prepend(putter_node);
|
||||
break;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
@ -208,3 +208,24 @@ test "select" {
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn testQueue(comptime len: usize) !void {
|
||||
const io = testing.io;
|
||||
var buf: [len]usize = undefined;
|
||||
var queue: Io.Queue(usize) = .init(&buf);
|
||||
var begin: usize = 0;
|
||||
for (1..len + 1) |n| {
|
||||
const end = begin + n;
|
||||
for (begin..end) |i| try queue.putOne(io, i);
|
||||
for (begin..end) |i| try expect(try queue.getOne(io) == i);
|
||||
begin = end;
|
||||
}
|
||||
}
|
||||
|
||||
test "Queue" {
|
||||
try testQueue(1);
|
||||
try testQueue(2);
|
||||
try testQueue(3);
|
||||
try testQueue(4);
|
||||
try testQueue(5);
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user