diff --git a/lib/std/Io.zig b/lib/std/Io.zig index d6b6fb7979..7a8e09c77f 100644 --- a/lib/std/Io.zig +++ b/lib/std/Io.zig @@ -1272,7 +1272,7 @@ pub const TypeErasedQueue = struct { if (elements.len == 0) return 0; try q.mutex.lock(io); defer q.mutex.unlock(io); - return putLocked(q, io, elements, min, false); + return q.putLocked(io, elements, min, false); } /// Same as `put` but cannot be canceled. @@ -1281,7 +1281,7 @@ pub const TypeErasedQueue = struct { if (elements.len == 0) return 0; q.mutex.lockUncancelable(io); defer q.mutex.unlock(io); - return putLocked(q, io, elements, min, true) catch |err| switch (err) { + return q.putLocked(io, elements, min, true) catch |err| switch (err) { error.Canceled => unreachable, }; } @@ -1291,50 +1291,49 @@ pub const TypeErasedQueue = struct { // queue is empty do we start populating the buffer. var remaining = elements; - while (true) { - const getter: *Get = @alignCast(@fieldParentPtr("node", q.getters.popFirst() orelse break)); + while (q.getters.popFirst()) |getter_node| { + const getter: *Get = @alignCast(@fieldParentPtr("node", getter_node)); const copy_len = @min(getter.remaining.len, remaining.len); + assert(copy_len > 0); @memcpy(getter.remaining[0..copy_len], remaining[0..copy_len]); remaining = remaining[copy_len..]; getter.remaining = getter.remaining[copy_len..]; if (getter.remaining.len == 0) { getter.condition.signal(io); - continue; - } - q.getters.prepend(&getter.node); + if (remaining.len > 0) continue; + } else q.getters.prepend(getter_node); assert(remaining.len == 0); return elements.len; } - while (true) { - { - const available = q.buffer[q.put_index..]; - const copy_len = @min(available.len, remaining.len); - @memcpy(available[0..copy_len], remaining[0..copy_len]); - remaining = remaining[copy_len..]; - q.put_index += copy_len; - if (remaining.len == 0) return elements.len; - } - { - const available = q.buffer[0..q.get_index]; - const copy_len = @min(available.len, remaining.len); - @memcpy(available[0..copy_len], remaining[0..copy_len]); - remaining = remaining[copy_len..]; - q.put_index = copy_len; - if (remaining.len == 0) return elements.len; - } - - const total_filled = elements.len - remaining.len; - if (total_filled >= min) return total_filled; - - var pending: Put = .{ .remaining = remaining, .condition = .{}, .node = .{} }; - q.putters.append(&pending.node); - if (uncancelable) - pending.condition.waitUncancelable(io, &q.mutex) - else - try pending.condition.wait(io, &q.mutex); - remaining = pending.remaining; + { + const available = q.buffer[q.put_index..]; + const copy_len = @min(available.len, remaining.len); + @memcpy(available[0..copy_len], remaining[0..copy_len]); + remaining = remaining[copy_len..]; + q.put_index += copy_len; + if (remaining.len == 0) return elements.len; } + { + const available = q.buffer[0..q.get_index]; + const copy_len = @min(available.len, remaining.len); + @memcpy(available[0..copy_len], remaining[0..copy_len]); + remaining = remaining[copy_len..]; + q.put_index = copy_len; + if (remaining.len == 0) return elements.len; + } + + const total_filled = elements.len - remaining.len; + if (total_filled >= min) return total_filled; + + var pending: Put = .{ .remaining = remaining, .condition = .{}, .node = .{} }; + q.putters.append(&pending.node); + defer if (pending.remaining.len > 0) q.putters.remove(&pending.node); + while (pending.remaining.len > 0) if (uncancelable) + pending.condition.waitUncancelable(io, &q.mutex) + else + try pending.condition.wait(io, &q.mutex); + return elements.len; } pub fn get(q: *@This(), io: Io, buffer: []u8, min: usize) Cancelable!usize { @@ -1342,7 +1341,7 @@ pub const TypeErasedQueue = struct { if (buffer.len == 0) return 0; try q.mutex.lock(io); defer q.mutex.unlock(io); - return getLocked(q, io, buffer, min, false); + return q.getLocked(io, buffer, min, false); } pub fn getUncancelable(q: *@This(), io: Io, buffer: []u8, min: usize) usize { @@ -1350,99 +1349,113 @@ pub const TypeErasedQueue = struct { if (buffer.len == 0) return 0; q.mutex.lockUncancelable(io); defer q.mutex.unlock(io); - return getLocked(q, io, buffer, min, true) catch |err| switch (err) { + return q.getLocked(io, buffer, min, true) catch |err| switch (err) { error.Canceled => unreachable, }; } - pub fn getLocked(q: *@This(), io: Io, buffer: []u8, min: usize, uncancelable: bool) Cancelable!usize { + fn getLocked(q: *@This(), io: Io, buffer: []u8, min: usize, uncancelable: bool) Cancelable!usize { // The ring buffer gets first priority, then data should come from any // queued putters, then finally the ring buffer should be filled with // data from putters so they can be resumed. var remaining = buffer; - while (true) { - if (q.get_index <= q.put_index) { - const available = q.buffer[q.get_index..q.put_index]; + if (q.get_index <= q.put_index) { + const available = q.buffer[q.get_index..q.put_index]; + const copy_len = @min(available.len, remaining.len); + @memcpy(remaining[0..copy_len], available[0..copy_len]); + q.get_index += copy_len; + remaining = remaining[copy_len..]; + if (remaining.len == 0) { + q.fillRingBufferFromPutters(io); + return buffer.len; + } + } else { + { + const available = q.buffer[q.get_index..]; const copy_len = @min(available.len, remaining.len); @memcpy(remaining[0..copy_len], available[0..copy_len]); q.get_index += copy_len; remaining = remaining[copy_len..]; - if (remaining.len == 0) return fillRingBufferFromPutters(q, io, buffer.len); - } else { - { - const available = q.buffer[q.get_index..]; - const copy_len = @min(available.len, remaining.len); - @memcpy(remaining[0..copy_len], available[0..copy_len]); - q.get_index += copy_len; - remaining = remaining[copy_len..]; - if (remaining.len == 0) return fillRingBufferFromPutters(q, io, buffer.len); - } - { - const available = q.buffer[0..q.put_index]; - const copy_len = @min(available.len, remaining.len); - @memcpy(remaining[0..copy_len], available[0..copy_len]); - q.get_index = copy_len; - remaining = remaining[copy_len..]; - if (remaining.len == 0) return fillRingBufferFromPutters(q, io, buffer.len); + if (remaining.len == 0) { + q.fillRingBufferFromPutters(io); + return buffer.len; } } - // Copy directly from putters into buffer. - while (remaining.len > 0) { - const putter: *Put = @alignCast(@fieldParentPtr("node", q.putters.popFirst() orelse break)); - const copy_len = @min(putter.remaining.len, remaining.len); - @memcpy(remaining[0..copy_len], putter.remaining[0..copy_len]); - putter.remaining = putter.remaining[copy_len..]; + { + const available = q.buffer[0..q.put_index]; + const copy_len = @min(available.len, remaining.len); + @memcpy(remaining[0..copy_len], available[0..copy_len]); + q.get_index = copy_len; remaining = remaining[copy_len..]; - if (putter.remaining.len == 0) { - putter.condition.signal(io); - } else { - assert(remaining.len == 0); - q.putters.prepend(&putter.node); - return fillRingBufferFromPutters(q, io, buffer.len); + if (remaining.len == 0) { + q.fillRingBufferFromPutters(io); + return buffer.len; } } - // Both ring buffer and putters queue is empty. - const total_filled = buffer.len - remaining.len; - if (total_filled >= min) return total_filled; - - var pending: Get = .{ .remaining = remaining, .condition = .{}, .node = .{} }; - q.getters.append(&pending.node); - if (uncancelable) - pending.condition.waitUncancelable(io, &q.mutex) - else - try pending.condition.wait(io, &q.mutex); - remaining = pending.remaining; } + // Copy directly from putters into buffer. + while (q.putters.popFirst()) |putter_node| { + const putter: *Put = @alignCast(@fieldParentPtr("node", putter_node)); + const copy_len = @min(putter.remaining.len, remaining.len); + assert(copy_len > 0); + @memcpy(remaining[0..copy_len], putter.remaining[0..copy_len]); + putter.remaining = putter.remaining[copy_len..]; + remaining = remaining[copy_len..]; + if (putter.remaining.len == 0) { + putter.condition.signal(io); + if (remaining.len > 0) continue; + } else q.putters.prepend(putter_node); + assert(remaining.len == 0); + q.fillRingBufferFromPutters(io); + return buffer.len; + } + // Both ring buffer and putters queue is empty. + const total_filled = buffer.len - remaining.len; + if (total_filled >= min) return total_filled; + + var pending: Get = .{ .remaining = remaining, .condition = .{}, .node = .{} }; + q.getters.append(&pending.node); + defer if (pending.remaining.len > 0) q.getters.remove(&pending.node); + while (pending.remaining.len > 0) if (uncancelable) + pending.condition.waitUncancelable(io, &q.mutex) + else + try pending.condition.wait(io, &q.mutex); + q.fillRingBufferFromPutters(io); + return buffer.len; } /// Called when there is nonzero space available in the ring buffer and /// potentially putters waiting. The mutex is already held and the task is /// to copy putter data to the ring buffer and signal any putters whose /// buffers been fully copied. - fn fillRingBufferFromPutters(q: *TypeErasedQueue, io: Io, len: usize) usize { - while (true) { - const putter: *Put = @alignCast(@fieldParentPtr("node", q.putters.popFirst() orelse return len)); - const available = q.buffer[q.put_index..]; - const copy_len = @min(available.len, putter.remaining.len); - @memcpy(available[0..copy_len], putter.remaining[0..copy_len]); - putter.remaining = putter.remaining[copy_len..]; - q.put_index += copy_len; - if (putter.remaining.len == 0) { - putter.condition.signal(io); - continue; + fn fillRingBufferFromPutters(q: *TypeErasedQueue, io: Io) void { + while (q.putters.popFirst()) |putter_node| { + const putter: *Put = @alignCast(@fieldParentPtr("node", putter_node)); + { + const available = q.buffer[q.put_index..]; + const copy_len = @min(available.len, putter.remaining.len); + @memcpy(available[0..copy_len], putter.remaining[0..copy_len]); + putter.remaining = putter.remaining[copy_len..]; + q.put_index += copy_len; + if (putter.remaining.len == 0) { + putter.condition.signal(io); + continue; + } } - const second_available = q.buffer[0..q.get_index]; - const second_copy_len = @min(second_available.len, putter.remaining.len); - @memcpy(second_available[0..second_copy_len], putter.remaining[0..second_copy_len]); - putter.remaining = putter.remaining[copy_len..]; - q.put_index = copy_len; - if (putter.remaining.len == 0) { - putter.condition.signal(io); - continue; + { + const available = q.buffer[0..q.get_index]; + const copy_len = @min(available.len, putter.remaining.len); + @memcpy(available[0..copy_len], putter.remaining[0..copy_len]); + putter.remaining = putter.remaining[copy_len..]; + q.put_index = copy_len; + if (putter.remaining.len == 0) { + putter.condition.signal(io); + continue; + } } - q.putters.prepend(&putter.node); - return len; + q.putters.prepend(putter_node); + break; } } };