Reader.peekDelimiterInclusive: Fix handling of stream implementations that return 0

Previously, when the delimiter was not found in the existing buffer, the logic in peekDelimiterInclusive used the `n` returned from `r.vtable.stream` as the length of the slice to search. However, it is valid for `vtable.stream` implementations to return 0 if they wrote directly to the Reader's buffer instead of to `w`. In that scenario, `indexOfScalarPos` was given a 0-length slice, so it could never find the delimiter.
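
For illustration only (this is not part of the commit's diff): a minimal sketch of such a stream implementation, using the `std.Io.Reader` vtable signature visible in the diff below. It buffers data directly into `r.buffer` and returns 0, which is the same pattern the `ReaderIndirect` helper added by this commit uses; the data source here is a hypothetical fixed string:

    const std = @import("std");

    // Hypothetical sketch: a `vtable.stream` implementation that appends to the
    // Reader's own buffer instead of writing through the writer argument, and
    // therefore returns 0.
    fn stream(r: *std.Io.Reader, _: *std.Io.Writer, _: std.Io.Limit) std.Io.Reader.StreamError!usize {
        const src = "a line of data\n"; // stand-in for a real data source
        const dest = r.buffer[r.end..];
        const n = @min(dest.len, src.len);
        @memcpy(dest[0..n], src[0..n]);
        r.end += n;
        return 0; // valid: the bytes went into r.buffer, not into the writer
    }

Under the old logic, the subsequent search covered `end_cap[0..n]`, a zero-length slice, so the newly buffered delimiter was never seen.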

This commit changes the logic to assume that `r.vtable.stream` can both:
- return 0, and
- modify seek/end (i.e. it's also valid for a `vtable.stream` implementation to rebase; a sketch of such an implementation follows this list)
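
A sketch of the second case, again purely for illustration and not taken from the commit: a `vtable.stream` implementation that first rebases (slides the unread bytes to the front of `r.buffer`, which moves both `seek` and `end`) and then buffers more data from a hypothetical fixed source, returning 0 as before:

    const std = @import("std");

    // Hypothetical sketch: a rebasing `vtable.stream` implementation.
    fn stream(r: *std.Io.Reader, _: *std.Io.Writer, _: std.Io.Limit) std.Io.Reader.StreamError!usize {
        // Rebase: move the unread bytes to the start of the buffer,
        // adjusting r.seek and r.end in the process.
        const unread_len = r.end - r.seek;
        std.mem.copyForwards(u8, r.buffer[0..unread_len], r.buffer[r.seek..r.end]);
        r.seek = 0;
        r.end = unread_len;
        // Buffer more data directly, as in the previous sketch.
        const src = "more data\n"; // stand-in for a real data source
        const n = @min(r.buffer.len - r.end, src.len);
        @memcpy(r.buffer[r.end..][0..n], src[0..n]);
        r.end += n;
        return 0;
    }

Because `seek`/`end` may have moved across the call, the new logic captures `existing_buffered_len` (the already-searched length, `r.end - r.seek`) before calling `stream`, and afterwards searches `r.buffer[0..r.end]` starting at `r.seek + existing_buffered_len` rather than relying on the pre-call `end_cap` slice.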

Also introduces `std.testing.ReaderIndirect`, which makes it possible to test against Reader implementations that return 0 from `stream`/`readVec`.

Fixes #25428
commit acd6ffdf69 (parent ff16a7c3fa)
Author: Ryan Liptak, 2025-10-03 01:18:53 -07:00
Committed by: Andrew Kelley
2 changed files with 97 additions and 4 deletions

Changed file 1 of 2:

@@ -763,13 +763,14 @@ pub fn takeDelimiterInclusive(r: *Reader, delimiter: u8) DelimiterError![]u8 {
 pub fn peekDelimiterInclusive(r: *Reader, delimiter: u8) DelimiterError![]u8 {
     const buffer = r.buffer[0..r.end];
     const seek = r.seek;
-    if (std.mem.indexOfScalarPos(u8, buffer, seek, delimiter)) |end| {
+    if (std.mem.indexOfScalarPos(u8, buffer, seek, delimiter)) |delimiter_index| {
         @branchHint(.likely);
-        return buffer[seek .. end + 1];
+        return buffer[seek .. delimiter_index + 1];
     }
     // TODO take a parameter for max search length rather than relying on buffer capacity
     try rebase(r, r.buffer.len);
     while (r.buffer.len - r.end != 0) {
+        const existing_buffered_len = r.end - r.seek;
         const end_cap = r.buffer[r.end..];
         var writer: Writer = .fixed(end_cap);
         const n = r.vtable.stream(r, &writer, .limited(end_cap.len)) catch |err| switch (err) {
@@ -777,8 +778,8 @@ pub fn peekDelimiterInclusive(r: *Reader, delimiter: u8) DelimiterError![]u8 {
             else => |e| return e,
         };
         r.end += n;
-        if (std.mem.indexOfScalarPos(u8, end_cap[0..n], 0, delimiter)) |end| {
-            return r.buffer[0 .. r.end - n + end + 1];
+        if (std.mem.indexOfScalarPos(u8, r.buffer[0..r.end], r.seek + existing_buffered_len, delimiter)) |delimiter_index| {
+            return r.buffer[r.seek .. delimiter_index + 1];
         }
     }
     return error.StreamTooLong;
@@ -1534,6 +1535,18 @@ test "readSliceShort with smaller buffer than Reader" {
     try testing.expectEqualStrings(str, &buf);
 }
 
+test "readSliceShort with indirect reader" {
+    var r: Reader = .fixed("HelloFren");
+    var ri_buf: [3]u8 = undefined;
+    var ri: std.testing.ReaderIndirect = .init(&r, &ri_buf);
+    var buf: [5]u8 = undefined;
+    try testing.expectEqual(5, try ri.interface.readSliceShort(&buf));
+    try testing.expectEqualStrings("Hello", buf[0..5]);
+    try testing.expectEqual(4, try ri.interface.readSliceShort(&buf));
+    try testing.expectEqualStrings("Fren", buf[0..4]);
+    try testing.expectEqual(0, try ri.interface.readSliceShort(&buf));
+}
+
 test readVec {
     var r: Reader = .fixed(std.ascii.letters);
     var flat_buffer: [52]u8 = undefined;
@@ -1643,6 +1656,26 @@ test "takeDelimiterInclusive when it rebases" {
     }
 }
 
+test "takeDelimiterInclusive on an indirect reader when it rebases" {
+    const written_line = "ABCDEFGHIJKLMNOPQRSTUVWXYZ\n";
+    var buffer: [128]u8 = undefined;
+    var tr: std.testing.Reader = .init(&buffer, &.{
+        .{ .buffer = written_line[0..4] },
+        .{ .buffer = written_line[4..] },
+        .{ .buffer = written_line },
+        .{ .buffer = written_line },
+        .{ .buffer = written_line },
+        .{ .buffer = written_line },
+        .{ .buffer = written_line },
+    });
+    var indirect_buffer: [128]u8 = undefined;
+    var tri: std.testing.ReaderIndirect = .init(&tr.interface, &indirect_buffer);
+    const r = &tri.interface;
+    for (0..6) |_| {
+        try std.testing.expectEqualStrings(written_line, try r.takeDelimiterInclusive('\n'));
+    }
+}
+
 test "takeStruct and peekStruct packed" {
     var r: Reader = .fixed(&.{ 0b11110000, 0b00110011 });
     const S = packed struct(u16) { a: u2, b: u6, c: u7, d: u1 };

Changed file 2 of 2:

@@ -1249,3 +1249,63 @@ pub const Reader = struct {
         return n;
     }
 };
+
+/// A `std.Io.Reader` that gets its data from another `std.Io.Reader`, and always
+/// writes to its own buffer (and returns 0) during `stream` and `readVec`.
+pub const ReaderIndirect = struct {
+    in: *std.Io.Reader,
+    interface: std.Io.Reader,
+
+    pub fn init(in: *std.Io.Reader, buffer: []u8) ReaderIndirect {
+        return .{
+            .in = in,
+            .interface = .{
+                .vtable = &.{
+                    .stream = stream,
+                    .readVec = readVec,
+                },
+                .buffer = buffer,
+                .seek = 0,
+                .end = 0,
+            },
+        };
+    }
+
+    fn readVec(r: *std.Io.Reader, _: [][]u8) std.Io.Reader.Error!usize {
+        try streamInner(r);
+        return 0;
+    }
+
+    fn stream(r: *std.Io.Reader, _: *std.Io.Writer, _: std.Io.Limit) std.Io.Reader.StreamError!usize {
+        try streamInner(r);
+        return 0;
+    }
+
+    fn streamInner(r: *std.Io.Reader) std.Io.Reader.Error!void {
+        const r_indirect: *ReaderIndirect = @alignCast(@fieldParentPtr("interface", r));
+        // If there's no room remaining in the buffer at all, make room.
+        if (r.buffer.len == r.end) {
+            try r.rebase(r.buffer.len);
+        }
+
+        var writer: std.Io.Writer = .{
+            .buffer = r.buffer,
+            .end = r.end,
+            .vtable = &.{
+                .drain = std.Io.Writer.unreachableDrain,
+                .rebase = std.Io.Writer.unreachableRebase,
+            },
+        };
+        defer r.end = writer.end;
+        r_indirect.in.streamExact(&writer, r.buffer.len - r.end) catch |err| switch (err) {
+            // Only forward EndOfStream if no new bytes were written to the buffer
+            error.EndOfStream => |e| if (r.end == writer.end) {
+                return e;
+            },
+            error.WriteFailed => unreachable,
+            else => |e| return e,
+        };
+    }
+};