std.io.Reader: extract PositionalReader to separate interface

This commit is contained in:
Andrew Kelley 2025-04-10 23:03:20 -07:00
parent 60854795b8
commit 4ee2534566
5 changed files with 114 additions and 124 deletions

View File

@ -17,6 +17,8 @@ const Alignment = std.mem.Alignment;
pub const Reader = @import("io/Reader.zig");
pub const Writer = @import("io/Writer.zig");
pub const PositionalReader = @import("io/PositionalReader.zig");
pub const BufferedReader = @import("io/BufferedReader.zig");
pub const BufferedWriter = @import("io/BufferedWriter.zig");
pub const AllocatingWriter = @import("io/AllocatingWriter.zig");
@ -451,6 +453,7 @@ test {
_ = BufferedReader;
_ = Reader;
_ = Writer;
_ = PositionalReader;
_ = AllocatingWriter;
_ = @import("io/bit_reader.zig");
_ = @import("io/bit_writer.zig");

View File

@ -28,10 +28,8 @@ const eof_writer: std.io.Writer.VTable = .{
.writeFile = eof_writeFile,
};
const eof_reader: std.io.Reader.VTable = .{
.posRead = eof_posRead,
.posReadVec = eof_posReadVec,
.streamRead = eof_streamRead,
.streamReadVec = eof_streamReadVec,
.read = eof_read,
.readv = eof_readv,
};
fn eof_writeSplat(context: ?*anyopaque, data: []const []const u8, splat: usize) anyerror!usize {
@ -58,29 +56,14 @@ fn eof_writeFile(
return error.NoSpaceLeft;
}
/// Positional-read stub for the EOF reader: every request, regardless of
/// offset or limit, reports end of stream without writing anything.
fn eof_posRead(ctx: ?*anyopaque, bw: *std.io.BufferedWriter, limit: Reader.Limit, offset: u64) anyerror!Reader.Status {
    _ = .{ ctx, bw, limit, offset };
    return error.EndOfStream;
}
/// Vectored positional-read stub for the EOF reader: always reports end of
/// stream without filling any of the supplied buffers.
fn eof_posReadVec(ctx: ?*anyopaque, data: []const []u8, offset: u64) anyerror!Reader.Status {
    _ = .{ ctx, data, offset };
    return error.EndOfStream;
}
fn eof_streamRead(ctx: ?*anyopaque, bw: *std.io.BufferedWriter, limit: Reader.Limit) anyerror!Reader.Status {
/// Streaming-read stub for the EOF reader: always reports end of stream
/// without writing anything to `bw`.
fn eof_read(ctx: ?*anyopaque, bw: *std.io.BufferedWriter, limit: Reader.Limit) anyerror!Reader.Status {
    _ = .{ ctx, bw, limit };
    return error.EndOfStream;
}
fn eof_streamReadVec(ctx: ?*anyopaque, data: []const []u8) anyerror!Reader.Status {
fn eof_readv(ctx: ?*anyopaque, data: []const []u8) anyerror!Reader.Status {
_ = ctx;
_ = data;
return error.EndOfStream;
@ -117,15 +100,13 @@ pub fn reader(br: *BufferedReader) Reader {
return .{
.context = br,
.vtable = &.{
.streamRead = passthru_streamRead,
.streamReadVec = passthru_streamReadVec,
.posRead = passthru_posRead,
.posReadVec = passthru_posReadVec,
.read = passthru_read,
.readv = passthru_readv,
},
};
}
fn passthru_streamRead(ctx: ?*anyopaque, bw: *BufferedWriter, limit: Reader.Limit) anyerror!Reader.RwResult {
fn passthru_read(ctx: ?*anyopaque, bw: *BufferedWriter, limit: Reader.Limit) anyerror!Reader.RwResult {
const br: *BufferedReader = @alignCast(@ptrCast(ctx));
const buffer = br.storage.buffer.items;
const buffered = buffer[br.seek..];
@ -139,34 +120,16 @@ fn passthru_streamRead(ctx: ?*anyopaque, bw: *BufferedWriter, limit: Reader.Limi
.write_end = result.end,
};
}
return br.unbuffered_reader.streamRead(bw, limit);
return br.unbuffered_reader.read(bw, limit);
}
fn passthru_streamReadVec(ctx: ?*anyopaque, data: []const []u8) anyerror!Reader.Status {
/// Vectored streaming read through the buffered reader. Not yet implemented.
fn passthru_readv(ctx: ?*anyopaque, data: []const []u8) anyerror!Reader.Status {
    const br: *BufferedReader = @alignCast(@ptrCast(ctx));
    _ = .{ br, data };
    @panic("TODO");
}
/// Positional read that serves from the in-memory buffer when `off` falls
/// inside it, and otherwise delegates to the unbuffered reader with the
/// offset rebased past the buffered region.
fn passthru_posRead(ctx: ?*anyopaque, bw: *BufferedWriter, limit: Reader.Limit, off: u64) anyerror!Reader.Status {
    const br: *BufferedReader = @alignCast(@ptrCast(ctx));
    const buffer = br.storage.buffer.items;
    if (off < buffer.len) {
        // Serve directly from the buffered bytes.
        // NOTE(review): `limit.min(buffer.len)` does not account for `off`;
        // if `off` exceeds that value the slice bounds invert — confirm
        // `Limit.min` semantics guarantee this cannot happen.
        const send = buffer[off..limit.min(buffer.len)];
        return bw.writeSplat(send, 1);
    }
    // Offset is beyond the buffer: forward to the underlying reader,
    // translating `off` into the unbuffered stream's coordinate space.
    return br.unbuffered_reader.posRead(bw, limit, off - buffer.len);
}
/// Vectored positional read through the buffered reader. Not yet implemented.
fn passthru_posReadVec(ctx: ?*anyopaque, data: []const []u8, off: u64) anyerror!Reader.Status {
    const br: *BufferedReader = @alignCast(@ptrCast(ctx));
    _ = .{ br, data, off };
    @panic("TODO");
}
/// Relative seek: negative values move the read position backward,
/// non-negative values move it forward.
pub fn seekBy(br: *BufferedReader, seek_by: i64) anyerror!void {
    const magnitude = @abs(seek_by);
    if (seek_by < 0) {
        try br.seekBackwardBy(magnitude);
    } else {
        try br.seekForwardBy(magnitude);
    }
}
@ -293,7 +256,7 @@ pub fn discardUpTo(br: *BufferedReader, n: usize) anyerror!usize {
remaining -= (list.items.len - br.seek);
list.items.len = 0;
br.seek = 0;
const result = try br.unbuffered_reader.streamRead(&br.storage, .none);
const result = try br.unbuffered_reader.read(&br.storage, .none);
result.write_err catch unreachable;
try result.read_err;
assert(result.len == list.items.len);
@ -337,7 +300,7 @@ pub fn read(br: *BufferedReader, buffer: []u8) anyerror!void {
br.seek = 0;
var i: usize = in_buffer.len;
while (true) {
const status = try br.unbuffered_reader.streamRead(&br.storage, .none);
const status = try br.unbuffered_reader.read(&br.storage, .none);
const next_i = i + list.items.len;
if (next_i >= buffer.len) {
const remaining = buffer[i..];
@ -397,7 +360,7 @@ pub fn peekDelimiterInclusive(br: *BufferedReader, delimiter: u8) anyerror![]u8
list.items.len = i;
br.seek = 0;
while (i < list.capacity) {
const status = try br.unbuffered_reader.streamRead(&br.storage, .none);
const status = try br.unbuffered_reader.read(&br.storage, .none);
if (std.mem.indexOfScalarPos(u8, list.items, i, delimiter)) |end| {
return list.items[0 .. end + 1];
}
@ -442,7 +405,7 @@ pub fn peekDelimiterConclusive(br: *BufferedReader, delimiter: u8) anyerror![]u8
list.items.len = i;
br.seek = 0;
while (i < list.capacity) {
const status = try br.unbuffered_reader.streamRead(&br.storage, .none);
const status = try br.unbuffered_reader.read(&br.storage, .none);
if (std.mem.indexOfScalarPos(u8, list.items, i, delimiter)) |end| {
return list.items[0 .. end + 1];
}
@ -540,7 +503,7 @@ pub fn fill(br: *BufferedReader, n: usize) anyerror!void {
list.items.len = remainder.len;
br.seek = 0;
while (true) {
const status = try br.unbuffered_reader.streamRead(&br.storage, .none);
const status = try br.unbuffered_reader.read(&br.storage, .none);
if (n <= list.items.len) return;
if (status.end) return error.EndOfStream;
}

View File

@ -0,0 +1,64 @@
const std = @import("../std.zig");
const PositionalReader = @This();
const assert = std.debug.assert;
context: ?*anyopaque,
vtable: *const VTable,
/// Function table implemented by resources that support reading at an
/// arbitrary byte offset without disturbing any internal stream position.
pub const VTable = struct {
    /// Writes bytes starting from `offset` to `bw`.
    ///
    /// Returns the number of bytes written, which will be at minimum `0` and
    /// at most `limit`. The number of bytes written, including zero, does not
    /// indicate end of stream.
    ///
    /// If the resource represented by the reader has an internal seek
    /// position, it is not mutated.
    ///
    /// The implementation should do a maximum of one underlying read call.
    ///
    /// If `error.Unseekable` is returned, the resource cannot be used via a
    /// positional reading interface.
    read: *const fn (ctx: ?*anyopaque, bw: *std.io.BufferedWriter, limit: Limit, offset: u64) anyerror!Status,
    /// Reads bytes starting from `offset` into the buffers of `data`.
    ///
    /// Returns the number of bytes read, which will be at minimum `0` and
    /// at most the total capacity of `data`. The number of bytes read,
    /// including zero, does not indicate end of stream.
    ///
    /// If the resource represented by the reader has an internal seek
    /// position, it is not mutated.
    ///
    /// The implementation should do a maximum of one underlying read call.
    ///
    /// If `error.Unseekable` is returned, the resource cannot be used via a
    /// positional reading interface.
    readv: *const fn (ctx: ?*anyopaque, data: []const []u8, offset: u64) anyerror!Status,
};
pub const Len = std.io.Reader.Len;
pub const Status = std.io.Reader.Status;
pub const Limit = std.io.Reader.Limit;
/// Performs a positional read of at most `limit` bytes at `offset`,
/// writing the result into `bw`. Thin dispatch through the vtable.
pub fn read(pr: PositionalReader, bw: *std.io.BufferedWriter, limit: Limit, offset: u64) anyerror!Status {
    const readFn = pr.vtable.read;
    return readFn(pr.context, bw, limit, offset);
}
/// Performs a vectored positional read at `offset`, filling the buffers of
/// `data`. Thin dispatch through the vtable.
pub fn readv(pr: PositionalReader, data: []const []u8, offset: u64) anyerror!Status {
    // Bug fix: previously dispatched through `pr.vtable.read`, whose
    // signature is (ctx, bw, limit, offset) and does not accept these
    // arguments. The vectored entry point must call `pr.vtable.readv`.
    return pr.vtable.readv(pr.context, data, offset);
}
/// Returns total number of bytes written to `w`.
///
/// May return `error.Unseekable`, indicating this function cannot be used to
/// read from the reader.
/// Returns total number of bytes written to `w`.
///
/// May return `error.Unseekable`, indicating this function cannot be used to
/// read from the reader.
pub fn readAll(pr: PositionalReader, w: *std.io.BufferedWriter, start_offset: u64) anyerror!usize {
    var cursor = start_offset;
    while (true) {
        // Keep issuing unlimited positional reads, advancing the cursor by
        // however many bytes each call produced, until end of stream.
        const status = try pr.vtable.read(pr.context, w, .none, cursor);
        cursor += status.len;
        if (status.end) return @intCast(cursor - start_offset);
    }
}

View File

@ -6,39 +6,35 @@ context: ?*anyopaque,
vtable: *const VTable,
pub const VTable = struct {
/// Writes bytes starting from `offset` to `bw`, or returns
/// `error.Unseekable`, indicating `streamRead` should be used instead.
/// Writes bytes from the internally tracked stream position to `bw`.
///
/// Returns the number of bytes written, which will be at minimum `0` and at
/// most `limit`. The number of bytes read, including zero, does not
/// indicate end of stream.
///
/// If the reader has an internal seek position, it is not mutated.
/// If the reader has an internal seek position, it moves forward in
/// accordance with the number of bytes return from this function.
///
/// The implementation should do a maximum of one underlying read call.
///
/// If this is `null` it is equivalent to always returning
/// `error.Unseekable`.
posRead: ?*const fn (ctx: ?*anyopaque, bw: *std.io.BufferedWriter, limit: Limit, offset: u64) anyerror!Status,
posReadVec: ?*const fn (ctx: ?*anyopaque, data: []const []u8, offset: u64) anyerror!Status,
/// If `error.Unstreamable` is returned, the resource cannot be used via a
/// streaming reading interface.
read: *const fn (ctx: ?*anyopaque, bw: *std.io.BufferedWriter, limit: Limit) anyerror!Status,
/// Writes bytes from the internally tracked stream position to `bw`, or
/// returns `error.Unstreamable`, indicating `posRead` should be used
/// instead.
/// Writes bytes from the internally tracked stream position to `data`.
///
/// Returns the number of bytes written, which will be at minimum `0` and at
/// most `limit`. The number of bytes read, including zero, does not
/// indicate end of stream.
///
/// If the reader has an internal seek position, it moves forward in accordance
/// with the number of bytes return from this function.
/// If the reader has an internal seek position, it moves forward in
/// accordance with the number of bytes return from this function.
///
/// The implementation should do a maximum of one underlying read call.
///
/// If this is `null` it is equivalent to always returning
/// `error.Unstreamable`.
streamRead: ?*const fn (ctx: ?*anyopaque, bw: *std.io.BufferedWriter, limit: Limit) anyerror!Status,
streamReadVec: ?*const fn (ctx: ?*anyopaque, data: []const []u8) anyerror!Status,
/// If `error.Unstreamable` is returned, the resource cannot be used via a
/// streaming reading interface.
readv: *const fn (ctx: ?*anyopaque, data: []const []u8) anyerror!Status,
};
pub const Len = @Type(.{ .int = .{ .signedness = .unsigned, .bits = @bitSizeOf(usize) - 1 } });
@ -60,42 +56,20 @@ pub const Limit = enum(usize) {
}
};
/// Streams at most `limit` bytes from the reader into `w`.
/// Thin dispatch through the vtable.
pub fn read(r: Reader, w: *std.io.BufferedWriter, limit: Limit) anyerror!Status {
    const readFn = r.vtable.read;
    return readFn(r.context, w, limit);
}
/// Streams bytes from the reader into the buffers of `data`.
/// Thin dispatch through the vtable.
pub fn readv(r: Reader, data: []const []u8) anyerror!Status {
    const readvFn = r.vtable.readv;
    return readvFn(r.context, data);
}
/// Returns total number of bytes written to `w`.
pub fn readAll(r: Reader, w: *std.io.BufferedWriter) anyerror!usize {
if (r.vtable.pread != null) {
return posReadAll(r, w) catch |err| switch (err) {
error.Unseekable => {},
else => return err,
};
}
return streamReadAll(r, w);
}
/// Returns total number of bytes written to `w`.
///
/// May return `error.Unseekable`, indicating this function cannot be used to
/// read from the reader.
pub fn posReadAll(r: Reader, w: *std.io.BufferedWriter, start_offset: u64) anyerror!usize {
const vtable_posRead = r.vtable.posRead.?;
var offset: u64 = start_offset;
while (true) {
const status = try vtable_posRead(r.context, w, .none, offset);
offset += status.len;
if (status.end) return @intCast(offset - start_offset);
}
}
/// Returns total number of bytes written to `w`.
pub fn streamRead(r: Reader, w: *std.io.BufferedWriter, limit: Limit) anyerror!Status {
return r.vtable.streamRead.?(r.context, w, limit);
}
/// Returns total number of bytes written to `w`.
pub fn streamReadAll(r: Reader, w: *std.io.BufferedWriter) anyerror!usize {
const vtable_streamRead = r.vtable.streamRead.?;
const readFn = r.vtable.read;
var offset: usize = 0;
while (true) {
const status = try vtable_streamRead(r.context, w, .none);
const status = try readFn(r.context, w, .none);
offset += status.len;
if (status.end) return offset;
}
@ -107,42 +81,28 @@ pub fn streamReadAll(r: Reader, w: *std.io.BufferedWriter) anyerror!usize {
/// Caller owns returned memory.
///
/// If this function returns an error, the contents from the stream read so far are lost.
pub fn streamReadAlloc(r: Reader, gpa: std.mem.Allocator, max_size: usize) anyerror![]u8 {
const vtable_streamRead = r.vtable.streamRead.?;
var bw: std.io.BufferedWriter = .{
.buffer = .empty,
.mode = .{ .allocator = gpa },
};
const list = &bw.buffer;
defer list.deinit(gpa);
pub fn readAlloc(r: Reader, gpa: std.mem.Allocator, max_size: usize) anyerror![]u8 {
const readFn = r.vtable.read;
var aw: std.io.AllocatingWriter = undefined;
errdefer aw.deinit();
const bw = aw.init(gpa);
var remaining = max_size;
while (remaining > 0) {
const status = try vtable_streamRead(r.context, &bw, .init(remaining));
if (status.end) return list.toOwnedSlice(gpa);
const status = try readFn(r.context, bw, .init(remaining));
if (status.end) break;
remaining -= status.len;
}
return aw.toOwnedSlice(gpa);
}
/// Reads the stream until the end, ignoring all the data.
/// Returns the number of bytes discarded.
/// Reads the stream until the end, ignoring all the data.
/// Returns the number of bytes discarded.
pub fn discardUntilEnd(r: Reader) anyerror!usize {
    // Fix: the scraped diff left both the old (`streamReadAll`) and new
    // (`readAll`) return statements in place, making the second unreachable
    // and referencing a function this commit removes. Keep only the
    // post-rename call.
    var bw = std.io.null_writer.unbuffered();
    return readAll(r, &bw);
}
/// Wraps `r` in a `BufferedReader` whose backing buffer starts empty and
/// grows on demand using `gpa`.
/// NOTE(review): the returned reader owns allocator-backed storage —
/// confirm the caller is responsible for deinitializing it.
pub fn allocating(r: Reader, gpa: std.mem.Allocator) std.io.BufferedReader {
    return .{
        .reader = r,
        .buffered_writer = .{
            // Start with no storage; allocation happens lazily as data arrives.
            .buffer = .empty,
            .mode = .{ .allocator = gpa },
        },
    };
}
test "when the backing reader provides one byte at a time" {
test "readAlloc when the backing reader provides one byte at a time" {
const OneByteReader = struct {
str: []const u8,
curr: usize,

View File

@ -434,7 +434,7 @@ pub const SerializeContainerOptions = struct {
/// * `beginStruct`
/// * `beginTuple`
pub const Serializer = struct {
options: Options,
options: Options = .{},
indent_level: u8 = 0,
writer: *std.io.BufferedWriter,