std.Io.Reader: introduce readVec back into the VTable

simplifies and fixes things

addresses a subset of #24608
Andrew Kelley 2025-07-29 23:11:10 -07:00
parent a9773944dc
commit cf7a28febb
5 changed files with 314 additions and 293 deletions
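For orientation, the caller-facing `std.Io.Reader.readVec` fills the supplied slices from the internal buffer first and only falls through to the new vtable hook once that buffer is drained. A minimal usage sketch, mirroring the `test readVec` that appears further down (illustrative only, not part of this commit):

const std = @import("std");

test "readVec usage sketch" {
    // A fixed reader serves its bytes from a constant slice.
    var r: std.Io.Reader = .fixed(std.ascii.letters);
    var flat: [52]u8 = undefined;
    var bufs: [2][]u8 = .{ flat[0..26], flat[26..] };
    // Both slices are filled from the reader's buffered contents;
    // no vtable call is needed for a fixed reader here.
    try std.testing.expectEqual(52, try r.readVec(&bufs));
    try std.testing.expectEqualStrings(std.ascii.letters[0..26], bufs[0]);
    try std.testing.expectEqualStrings(std.ascii.letters[26..], bufs[1]);
}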

View File

@ -43,8 +43,8 @@ pub const VTable = struct {
///
/// In addition to, or instead of writing to `w`, the implementation may
/// choose to store data in `buffer`, modifying `seek` and `end`
/// accordingly. Stream implementations are encouraged to take advantage of
/// this if simplifies the logic.
/// accordingly. Implementations are encouraged to take advantage of
/// this if it simplifies the logic.
stream: *const fn (r: *Reader, w: *Writer, limit: Limit) StreamError!usize,
/// Consumes bytes from the internally tracked stream position without
@ -68,6 +68,21 @@ pub const VTable = struct {
/// This function is only called when `buffer` is empty.
discard: *const fn (r: *Reader, limit: Limit) Error!usize = defaultDiscard,
/// Returns the number of bytes written to `data`.
///
/// `data` may not have zero length.
///
/// `data` may not contain an alias to `Reader.buffer`.
///
/// Implementations may ignore `data`, writing directly to `Reader.buffer`,
/// modifying `seek` and `end` accordingly, and returning 0 from this
/// function. Implementations are encouraged to take advantage of this if
/// it simplifies the logic.
///
/// The default implementation calls `stream` with either `data[0]` or
/// `Reader.buffer`, whichever is bigger.
readVec: *const fn (r: *Reader, data: []const []u8) Error!usize = defaultReadVec,
/// Ensures `capacity` more data can be buffered without rebasing.
///
/// Asserts `capacity` is within buffer capacity, or that the stream ends
@ -138,6 +153,7 @@ pub fn fixed(buffer: []const u8) Reader {
.vtable = &.{
.stream = endingStream,
.discard = endingDiscard,
.readVec = endingReadVec,
.rebase = endingRebase,
},
// This cast is safe because all potential writes to it will instead
@ -170,18 +186,18 @@ pub fn discard(r: *Reader, limit: Limit) Error!usize {
}
break :l .limited(n - buffered_len);
} else .unlimited;
r.seek = 0;
r.end = 0;
r.seek = r.end;
const n = try r.vtable.discard(r, remaining);
assert(n <= @intFromEnum(remaining));
return buffered_len + n;
}
pub fn defaultDiscard(r: *Reader, limit: Limit) Error!usize {
assert(r.seek == 0);
assert(r.end == 0);
var dw: Writer.Discarding = .init(r.buffer);
const n = r.stream(&dw.writer, limit) catch |err| switch (err) {
assert(r.seek == r.end);
r.seek = 0;
r.end = 0;
var d: Writer.Discarding = .init(r.buffer);
const n = r.stream(&d.writer, limit) catch |err| switch (err) {
error.WriteFailed => unreachable,
error.ReadFailed => return error.ReadFailed,
error.EndOfStream => return error.EndOfStream,
@ -294,7 +310,8 @@ pub fn appendRemaining(
list: *std.ArrayListAlignedUnmanaged(u8, alignment),
limit: Limit,
) LimitedAllocError!void {
if (limit != .unlimited) assert(r.buffer.len != 0); // Needed to detect limit exceeded without losing data.
if (limit == .unlimited) return appendRemainingUnlimited(r, gpa, alignment, list, 1);
assert(r.buffer.len != 0); // Needed to detect limit exceeded without losing data.
const buffer_contents = r.buffer[r.seek..r.end];
const copy_len = limit.minInt(buffer_contents.len);
try list.appendSlice(gpa, r.buffer[0..copy_len]);
@ -303,32 +320,67 @@ pub fn appendRemaining(
r.seek = 0;
r.end = 0;
var remaining = @intFromEnum(limit) - copy_len;
// From here, we leave `buffer` empty, appending directly to `list`.
var writer: Writer = .{
.buffer = undefined,
.end = undefined,
.vtable = &.{ .drain = Writer.fixedDrain },
};
while (true) {
try list.ensureUnusedCapacity(gpa, 1);
try list.ensureUnusedCapacity(gpa, 2);
const cap = list.unusedCapacitySlice();
const dest = cap[0..@min(cap.len, remaining)];
if (remaining - dest.len == 0) {
// Additionally provides `buffer` to detect end.
const new_remaining = readVecInner(r, &.{}, dest, remaining) catch |err| switch (err) {
error.EndOfStream => {
if (r.bufferedLen() != 0) return error.StreamTooLong;
return;
},
error.ReadFailed => return error.ReadFailed,
};
list.items.len += remaining - new_remaining;
remaining = new_remaining;
} else {
// Leave `buffer` empty, appending directly to `list`.
var dest_w: Writer = .fixed(dest);
const n = r.vtable.stream(r, &dest_w, .limited(dest.len)) catch |err| switch (err) {
error.WriteFailed => unreachable, // Prevented by the limit.
error.EndOfStream => return,
error.ReadFailed => return error.ReadFailed,
};
list.items.len += n;
remaining -= n;
const dest = cap[0..@min(cap.len, remaining + 1)];
writer.buffer = list.allocatedSlice();
writer.end = list.items.len;
const n = r.vtable.stream(r, &writer, .limited(dest.len)) catch |err| switch (err) {
error.WriteFailed => unreachable, // Prevented by the limit.
error.EndOfStream => return,
error.ReadFailed => return error.ReadFailed,
};
list.items.len += n;
if (n > remaining) {
// Move the byte to `Reader.buffer` so it is not lost.
assert(n - remaining == 1);
assert(r.end == 0);
r.buffer[0] = list.items[list.items.len - 1];
list.items.len -= 1;
r.end = 1;
return;
}
remaining -= n;
}
}
pub const UnlimitedAllocError = Allocator.Error || ShortError;
pub fn appendRemainingUnlimited(
r: *Reader,
gpa: Allocator,
comptime alignment: ?std.mem.Alignment,
list: *std.ArrayListAlignedUnmanaged(u8, alignment),
bump: usize,
) UnlimitedAllocError!void {
const buffer_contents = r.buffer[r.seek..r.end];
try list.ensureUnusedCapacity(gpa, buffer_contents.len + bump);
list.appendSliceAssumeCapacity(buffer_contents);
r.seek = 0;
r.end = 0;
// From here, we leave `buffer` empty, appending directly to `list`.
var writer: Writer = .{
.buffer = undefined,
.end = undefined,
.vtable = &.{ .drain = Writer.fixedDrain },
};
while (true) {
try list.ensureUnusedCapacity(gpa, bump);
writer.buffer = list.allocatedSlice();
writer.end = list.items.len;
const n = r.vtable.stream(r, &writer, .limited(list.unusedCapacitySlice().len)) catch |err| switch (err) {
error.WriteFailed => unreachable, // Prevented by the limit.
error.EndOfStream => return,
error.ReadFailed => return error.ReadFailed,
};
list.items.len += n;
}
}
@ -340,95 +392,64 @@ pub fn appendRemaining(
///
/// The reader's internal logical seek position moves forward in accordance
/// with the number of bytes returned from this function.
pub fn readVec(r: *Reader, data: []const []u8) Error!usize {
return readVecLimit(r, data, .unlimited);
}
/// Equivalent to `readVec` but reads at most `limit` bytes.
///
/// This ultimately will lower to a call to `stream`, but it must ensure
/// that the buffer used has at least as much capacity, in case that function
/// depends on a minimum buffer capacity. It also ensures that if the `stream`
/// implementation calls `Writer.writableVector`, it will get this data slice
/// along with the buffer at the end.
pub fn readVecLimit(r: *Reader, data: []const []u8, limit: Limit) Error!usize {
comptime assert(@intFromEnum(Limit.unlimited) == std.math.maxInt(usize));
var remaining = @intFromEnum(limit);
pub fn readVec(r: *Reader, data: [][]u8) Error!usize {
var seek = r.seek;
for (data, 0..) |buf, i| {
const buffer_contents = r.buffer[r.seek..r.end];
const copy_len = @min(buffer_contents.len, buf.len, remaining);
@memcpy(buf[0..copy_len], buffer_contents[0..copy_len]);
r.seek += copy_len;
remaining -= copy_len;
if (remaining == 0) break;
const contents = r.buffer[seek..r.end];
const copy_len = @min(contents.len, buf.len);
@memcpy(buf[0..copy_len], contents[0..copy_len]);
seek += copy_len;
if (buf.len - copy_len == 0) continue;
// All of `buffer` has been copied to `data`. We now set up a structure
// that enables the `Writer.writableVector` API, while also ensuring
// API that directly operates on the `Writable.buffer` has its minimum
// buffer capacity requirements met.
r.seek = 0;
r.end = 0;
remaining = try readVecInner(r, data[i + 1 ..], buf[copy_len..], remaining);
break;
// All of `buffer` has been copied to `data`.
const n = seek - r.seek;
r.seek = seek;
data[i] = buf[copy_len..];
defer data[i] = buf;
return n + try r.vtable.readVec(r, data[i..]);
}
return @intFromEnum(limit) - remaining;
const n = seek - r.seek;
r.seek = seek;
return n;
}
fn readVecInner(r: *Reader, middle: []const []u8, first: []u8, remaining: usize) Error!usize {
var wrapper: Writer.VectorWrapper = .{
.it = .{
.first = first,
.middle = middle,
.last = r.buffer,
},
.writer = .{
.buffer = if (first.len >= r.buffer.len) first else r.buffer,
.vtable = Writer.VectorWrapper.vtable,
},
/// Writes to `Reader.buffer` or `data`, whichever has larger capacity.
pub fn defaultReadVec(r: *Reader, data: []const []u8) Error!usize {
assert(r.seek == r.end);
r.seek = 0;
r.end = 0;
const first = data[0];
const direct = first.len >= r.buffer.len;
var writer: Writer = .{
.buffer = if (direct) first else r.buffer,
.end = 0,
.vtable = &.{ .drain = Writer.fixedDrain },
};
// If the limit may pass beyond user buffer into Reader buffer, use
// unlimited, allowing the Reader buffer to fill.
const limit: Limit = l: {
var n: usize = first.len;
for (middle) |m| n += m.len;
break :l if (remaining >= n) .unlimited else .limited(remaining);
};
var n = r.vtable.stream(r, &wrapper.writer, limit) catch |err| switch (err) {
error.WriteFailed => {
assert(!wrapper.used);
if (wrapper.writer.buffer.ptr == first.ptr) {
return remaining - wrapper.writer.end;
} else {
assert(wrapper.writer.end <= r.buffer.len);
r.end = wrapper.writer.end;
return remaining;
}
},
const limit: Limit = .limited(writer.buffer.len - writer.end);
const n = r.vtable.stream(r, &writer, limit) catch |err| switch (err) {
error.WriteFailed => unreachable,
else => |e| return e,
};
if (!wrapper.used) {
if (wrapper.writer.buffer.ptr == first.ptr) {
return remaining - n;
} else {
assert(n <= r.buffer.len);
r.end = n;
return remaining;
}
}
if (n < first.len) return remaining - n;
var result = remaining - first.len;
n -= first.len;
for (middle) |mid| {
if (n < mid.len) {
return result - n;
}
result -= mid.len;
n -= mid.len;
}
assert(n <= r.buffer.len);
r.end = n;
return result;
if (direct) return n;
r.end += n;
return 0;
}
/// Always writes to `Reader.buffer` and returns 0.
pub fn indirectReadVec(r: *Reader, data: []const []u8) Error!usize {
_ = data;
assert(r.seek == r.end);
var writer: Writer = .{
.buffer = r.buffer,
.end = r.end,
.vtable = &.{ .drain = Writer.fixedDrain },
};
const limit: Limit = .limited(writer.buffer.len - writer.end);
r.end += r.vtable.stream(r, &writer, limit) catch |err| switch (err) {
error.WriteFailed => unreachable,
else => |e| return e,
};
return 0;
}
pub fn buffered(r: *Reader) []u8 {
@ -642,29 +663,24 @@ pub fn readSliceAll(r: *Reader, buffer: []u8) Error!void {
/// See also:
/// * `readSliceAll`
pub fn readSliceShort(r: *Reader, buffer: []u8) ShortError!usize {
var i: usize = 0;
const contents = r.buffer[r.seek..r.end];
const copy_len = @min(buffer.len, contents.len);
@memcpy(buffer[0..copy_len], contents[0..copy_len]);
r.seek += copy_len;
if (buffer.len - copy_len == 0) {
@branchHint(.likely);
return buffer.len;
}
var i: usize = copy_len;
var data: [1][]u8 = undefined;
while (true) {
const buffer_contents = r.buffer[r.seek..r.end];
const dest = buffer[i..];
const copy_len = @min(dest.len, buffer_contents.len);
@memcpy(dest[0..copy_len], buffer_contents[0..copy_len]);
if (dest.len - copy_len == 0) {
@branchHint(.likely);
r.seek += copy_len;
return buffer.len;
}
i += copy_len;
r.end = 0;
r.seek = 0;
const remaining = buffer[i..];
const new_remaining_len = readVecInner(r, &.{}, remaining, remaining.len) catch |err| switch (err) {
data[0] = buffer[i..];
i += readVec(r, &data) catch |err| switch (err) {
error.EndOfStream => return i,
error.ReadFailed => return error.ReadFailed,
};
if (new_remaining_len == 0) return buffer.len;
i += remaining.len - new_remaining_len;
if (buffer.len - i == 0) return buffer.len;
}
return buffer.len;
}
/// Fill `buffer` with the next `buffer.len` bytes from the stream, advancing
@ -1632,19 +1648,6 @@ test readVec {
try testing.expectEqualStrings(std.ascii.letters[26..], bufs[1]);
}
test readVecLimit {
var r: Reader = .fixed(std.ascii.letters);
var flat_buffer: [52]u8 = undefined;
var bufs: [2][]u8 = .{
flat_buffer[0..26],
flat_buffer[26..],
};
// Short reads are possible with this function but not with fixed.
try testing.expectEqual(50, try r.readVecLimit(&bufs, .limited(50)));
try testing.expectEqualStrings(std.ascii.letters[0..26], bufs[0]);
try testing.expectEqualStrings(std.ascii.letters[26..50], bufs[1][0..24]);
}
test "expected error.EndOfStream" {
// Unit test inspired by https://github.com/ziglang/zig/issues/17733
var buffer: [3]u8 = undefined;
@ -1661,6 +1664,12 @@ fn endingStream(r: *Reader, w: *Writer, limit: Limit) StreamError!usize {
return error.EndOfStream;
}
fn endingReadVec(r: *Reader, data: []const []u8) Error!usize {
_ = r;
_ = data;
return error.EndOfStream;
}
fn endingDiscard(r: *Reader, limit: Limit) Error!usize {
_ = r;
_ = limit;
@ -1797,3 +1806,57 @@ pub fn Hashed(comptime Hasher: type) type {
}
};
}
pub fn writableVectorPosix(r: *Reader, buffer: []std.posix.iovec, data: []const []u8) Error!struct { usize, usize } {
var i: usize = 0;
var n: usize = 0;
for (data) |buf| {
if (buffer.len - i == 0) return .{ i, n };
if (buf.len != 0) {
buffer[i] = .{ .base = buf.ptr, .len = buf.len };
i += 1;
n += buf.len;
}
}
assert(r.seek == r.end);
const buf = r.buffer;
if (buf.len != 0) {
buffer[i] = .{ .base = buf.ptr, .len = buf.len };
i += 1;
}
return .{ i, n };
}
pub fn writableVectorWsa(
r: *Reader,
buffer: []std.os.windows.ws2_32.WSABUF,
data: []const []u8,
) Error!struct { usize, usize } {
var i: usize = 0;
var n: usize = 0;
for (data) |buf| {
if (buffer.len - i == 0) return .{ i, n };
if (buf.len == 0) continue;
if (std.math.cast(u32, buf.len)) |len| {
buffer[i] = .{ .buf = buf.ptr, .len = len };
i += 1;
n += len;
continue;
}
buffer[i] = .{ .buf = buf.ptr, .len = std.math.maxInt(u32) };
i += 1;
n += std.math.maxInt(u32);
return .{ i, n };
}
assert(r.seek == r.end);
const buf = r.buffer;
if (buf.len != 0) {
if (std.math.cast(u32, buf.len)) |len| {
buffer[i] = .{ .buf = buf.ptr, .len = len };
} else {
buffer[i] = .{ .buf = buf.ptr, .len = std.math.maxInt(u32) };
}
i += 1;
}
return .{ i, n };
}
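The two helpers above are the building blocks that backend `readVec` implementations are expected to use. A rough sketch of the pattern (illustrative only; `FdReader` is a made-up type and only the vtable hook body is shown, but the flow mirrors the `std.fs.File.Reader.readVec` change later in this commit): gather the caller's `data` plus `Reader.buffer` into iovecs, issue one vectored read, and credit any bytes that land past `data` to `Reader.buffer`.

const std = @import("std");
const posix = std.posix;

/// Hypothetical file-descriptor-backed stream, for illustration only.
const FdReader = struct {
    fd: posix.fd_t,
    interface: std.Io.Reader,

    fn readVec(io_reader: *std.Io.Reader, data: []const []u8) std.Io.Reader.Error!usize {
        const self: *FdReader = @alignCast(@fieldParentPtr("interface", io_reader));
        var iovecs: [8]posix.iovec = undefined;
        // `data_size` counts only the bytes that map onto the caller's `data`;
        // one trailing iovec (if it fits) points at `Reader.buffer`.
        const iovecs_n, const data_size = try io_reader.writableVectorPosix(&iovecs, data);
        const n = posix.readv(self.fd, iovecs[0..iovecs_n]) catch return error.ReadFailed;
        if (n == 0) return error.EndOfStream;
        if (n > data_size) {
            // The tail of the read spilled into `Reader.buffer`; account for it
            // so the bytes are not lost.
            io_reader.seek = 0;
            io_reader.end = n - data_size;
            return data_size;
        }
        return n;
    }
};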

View File

@ -342,97 +342,6 @@ pub fn writableSlicePreserve(w: *Writer, preserve_len: usize, len: usize) Error!
return big_slice[0..len];
}
pub const WritableVectorIterator = struct {
first: []u8,
middle: []const []u8 = &.{},
last: []u8 = &.{},
index: usize = 0,
pub fn next(it: *WritableVectorIterator) ?[]u8 {
while (true) {
const i = it.index;
it.index += 1;
if (i == 0) {
if (it.first.len == 0) continue;
return it.first;
}
const middle_index = i - 1;
if (middle_index < it.middle.len) {
const middle = it.middle[middle_index];
if (middle.len == 0) continue;
return middle;
}
if (middle_index == it.middle.len) {
if (it.last.len == 0) continue;
return it.last;
}
return null;
}
}
};
pub const VectorWrapper = struct {
writer: Writer,
it: WritableVectorIterator,
/// Tracks whether the "writable vector" API was used.
used: bool = false,
pub const vtable: *const VTable = &unique_vtable_allocation;
/// This is intended to be constant but it must be a unique address for
/// `@fieldParentPtr` to work.
var unique_vtable_allocation: VTable = .{ .drain = fixedDrain };
};
pub fn writableVectorIterator(w: *Writer) Error!WritableVectorIterator {
if (w.vtable == VectorWrapper.vtable) {
const wrapper: *VectorWrapper = @fieldParentPtr("writer", w);
wrapper.used = true;
return wrapper.it;
}
return .{ .first = try writableSliceGreedy(w, 1) };
}
pub fn writableVectorPosix(w: *Writer, buffer: []std.posix.iovec, limit: Limit) Error![]std.posix.iovec {
var it = try writableVectorIterator(w);
var i: usize = 0;
var remaining = limit;
while (it.next()) |full_buffer| {
if (!remaining.nonzero()) break;
if (buffer.len - i == 0) break;
const buf = remaining.slice(full_buffer);
if (buf.len == 0) continue;
buffer[i] = .{ .base = buf.ptr, .len = buf.len };
i += 1;
remaining = remaining.subtract(buf.len).?;
}
return buffer[0..i];
}
pub fn writableVectorWsa(
w: *Writer,
buffer: []std.os.windows.ws2_32.WSABUF,
limit: Limit,
) Error![]std.os.windows.ws2_32.WSABUF {
var it = try writableVectorIterator(w);
var i: usize = 0;
var remaining = limit;
while (it.next()) |full_buffer| {
if (!remaining.nonzero()) break;
if (buffer.len - i == 0) break;
const buf = remaining.slice(full_buffer);
if (buf.len == 0) continue;
if (std.math.cast(u32, buf.len)) |len| {
buffer[i] = .{ .buf = buf.ptr, .len = len };
i += 1;
remaining = remaining.subtract(len).?;
continue;
}
buffer[i] = .{ .buf = buf.ptr, .len = std.math.maxInt(u32) };
i += 1;
break;
}
return buffer[0..i];
}
pub fn ensureUnusedCapacity(w: *Writer, n: usize) Error!void {
_ = try writableSliceGreedy(w, n);
}
@ -451,13 +360,6 @@ pub fn advance(w: *Writer, n: usize) void {
w.end = new_end;
}
/// After calling `writableVector`, this function tracks how many bytes were
/// written to it.
pub fn advanceVector(w: *Writer, n: usize) usize {
if (w.vtable != VectorWrapper.vtable) advance(w, n);
return n;
}
/// The `data` parameter is mutable because this function needs to mutate the
/// fields in order to handle partial writes from `VTable.writeSplat`.
pub fn writeVecAll(w: *Writer, data: [][]const u8) Error!void {

View File

@ -88,6 +88,8 @@ pub fn init(input: *Reader, buffer: []u8, options: Options) Decompress {
.vtable = &.{
.stream = stream,
.rebase = rebase,
.discard = discard,
.readVec = Reader.indirectReadVec,
},
.buffer = buffer,
.seek = 0,
@ -100,11 +102,23 @@ fn rebase(r: *Reader, capacity: usize) Reader.RebaseError!void {
const d: *Decompress = @alignCast(@fieldParentPtr("reader", r));
assert(capacity <= r.buffer.len - d.window_len);
assert(r.end + capacity > r.buffer.len);
const discard = r.end - d.window_len;
const keep = r.buffer[discard..r.end];
const discard_n = r.end - d.window_len;
const keep = r.buffer[discard_n..r.end];
@memmove(r.buffer[0..keep.len], keep);
r.end = keep.len;
r.seek -= discard;
r.seek -= discard_n;
}
fn discard(r: *Reader, limit: Limit) Reader.Error!usize {
r.rebase(zstd.block_size_max) catch unreachable;
var d: Writer.Discarding = .init(r.buffer);
const n = r.stream(&d.writer, limit) catch |err| switch (err) {
error.WriteFailed => unreachable,
error.ReadFailed => return error.ReadFailed,
error.EndOfStream => return error.EndOfStream,
};
assert(n <= @intFromEnum(limit));
return n;
}
fn stream(r: *Reader, w: *Writer, limit: Limit) Reader.StreamError!usize {

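Not every stream wants to fill caller-provided memory at all. The zstd `Decompress` above registers `Reader.indirectReadVec`, apparently so its decompressed output stays contiguous in `Reader.buffer`, where `rebase` maintains the back-reference window. A minimal illustrative sketch of that pattern, using a made-up `ZFill` stream (not part of this commit):

const std = @import("std");
const Reader = std.Io.Reader;
const Writer = std.Io.Writer;
const Limit = std.Io.Limit;

/// Hypothetical stream producing an endless run of 'z' bytes. Registering
/// `Reader.indirectReadVec` means every read lands in `Reader.buffer`
/// rather than in caller-provided `data`.
const ZFill = struct {
    interface: Reader,

    fn init(buffer: []u8) ZFill {
        return .{ .interface = .{
            .vtable = &.{
                .stream = stream,
                .readVec = Reader.indirectReadVec,
            },
            .buffer = buffer,
            .seek = 0,
            .end = 0,
        } };
    }

    fn stream(r: *Reader, w: *Writer, limit: Limit) Reader.StreamError!usize {
        _ = r;
        const dest = limit.slice(try w.writableSliceGreedy(1));
        @memset(dest, 'z');
        w.advance(dest.len);
        return dest.len;
    }
};

test ZFill {
    var buffer: [16]u8 = undefined;
    var z: ZFill = .init(&buffer);
    var out: [4]u8 = undefined;
    var bufs: [1][]u8 = .{&out};
    // `indirectReadVec` stores into `Reader.buffer` and reports 0 bytes
    // written to `data`; the bytes are buffered for subsequent reads.
    try std.testing.expectEqual(0, try z.interface.readVec(&bufs));
    try std.testing.expectEqualStrings("z" ** 16, z.interface.buffered());
}

Callers keep using `readVec`, `readSliceAll`, and the other helpers as usual; the only observable difference is that the data arrives by way of the reader's buffer.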
View File

@ -1129,7 +1129,7 @@ pub fn seekableStream(file: File) SeekableStream {
/// * Whether reading should be done via fd-to-fd syscalls (e.g. `sendfile`)
/// versus plain variants (e.g. `read`).
///
/// Fulfills the `std.io.Reader` interface.
/// Fulfills the `std.Io.Reader` interface.
pub const Reader = struct {
file: File,
err: ?ReadError = null,
@ -1140,7 +1140,7 @@ pub const Reader = struct {
size: ?u64 = null,
size_err: ?GetEndPosError = null,
seek_err: ?Reader.SeekError = null,
interface: std.io.Reader,
interface: std.Io.Reader,
pub const SeekError = File.SeekError || error{
/// Seeking fell back to reading, and reached the end before the requested seek position.
@ -1177,11 +1177,12 @@ pub const Reader = struct {
}
};
pub fn initInterface(buffer: []u8) std.io.Reader {
pub fn initInterface(buffer: []u8) std.Io.Reader {
return .{
.vtable = &.{
.stream = Reader.stream,
.discard = Reader.discard,
.readVec = Reader.readVec,
},
.buffer = buffer,
.seek = 0,
@ -1294,7 +1295,7 @@ pub const Reader = struct {
/// vectors through the underlying read calls as possible.
const max_buffers_len = 16;
fn stream(io_reader: *std.io.Reader, w: *std.io.Writer, limit: std.io.Limit) std.io.Reader.StreamError!usize {
fn stream(io_reader: *std.Io.Reader, w: *std.Io.Writer, limit: std.Io.Limit) std.Io.Reader.StreamError!usize {
const r: *Reader = @alignCast(@fieldParentPtr("interface", io_reader));
switch (r.mode) {
.positional, .streaming => return w.sendFile(r, limit) catch |write_err| switch (write_err) {
@ -1305,16 +1306,33 @@ pub const Reader = struct {
else => |e| return e,
},
.positional_reading => {
const dest = limit.slice(try w.writableSliceGreedy(1));
const n = try readPositional(r, dest);
w.advance(n);
return n;
},
.streaming_reading => {
const dest = limit.slice(try w.writableSliceGreedy(1));
const n = try readStreaming(r, dest);
w.advance(n);
return n;
},
.failure => return error.ReadFailed,
}
}
fn readVec(io_reader: *std.Io.Reader, data: []const []u8) std.Io.Reader.Error!usize {
const r: *Reader = @alignCast(@fieldParentPtr("interface", io_reader));
switch (r.mode) {
.positional, .positional_reading => {
if (is_windows) {
// Unfortunately, `ReadFileScatter` cannot be used since it
// requires page alignment.
const dest = limit.slice(try w.writableSliceGreedy(1));
const n = try readPositional(r, dest);
w.advance(n);
return n;
return readPositional(r, data[0]);
}
var iovecs_buffer: [max_buffers_len]posix.iovec = undefined;
const dest = try w.writableVectorPosix(&iovecs_buffer, limit);
const dest_n, const data_size = try io_reader.writableVectorPosix(&iovecs_buffer, data);
const dest = iovecs_buffer[0..dest_n];
assert(dest[0].len > 0);
const n = posix.preadv(r.file.handle, dest, r.pos) catch |err| switch (err) {
error.Unseekable => {
@ -1339,19 +1357,22 @@ pub const Reader = struct {
return error.EndOfStream;
}
r.pos += n;
return w.advanceVector(n);
if (n > data_size) {
io_reader.seek = 0;
io_reader.end = n - data_size;
return data_size;
}
return n;
},
.streaming_reading => {
.streaming, .streaming_reading => {
if (is_windows) {
// Unfortunately, `ReadFileScatter` cannot be used since it
// requires page alignment.
const dest = limit.slice(try w.writableSliceGreedy(1));
const n = try readStreaming(r, dest);
w.advance(n);
return n;
return readStreaming(r, data[0]);
}
var iovecs_buffer: [max_buffers_len]posix.iovec = undefined;
const dest = try w.writableVectorPosix(&iovecs_buffer, limit);
const dest_n, const data_size = try io_reader.writableVectorPosix(&iovecs_buffer, data);
const dest = iovecs_buffer[0..dest_n];
assert(dest[0].len > 0);
const n = posix.readv(r.file.handle, dest) catch |err| {
r.err = err;
@ -1362,13 +1383,18 @@ pub const Reader = struct {
return error.EndOfStream;
}
r.pos += n;
return w.advanceVector(n);
if (n > data_size) {
io_reader.seek = 0;
io_reader.end = n - data_size;
return data_size;
}
return n;
},
.failure => return error.ReadFailed,
}
}
fn discard(io_reader: *std.io.Reader, limit: std.io.Limit) std.io.Reader.Error!usize {
fn discard(io_reader: *std.Io.Reader, limit: std.Io.Limit) std.Io.Reader.Error!usize {
const r: *Reader = @alignCast(@fieldParentPtr("interface", io_reader));
const file = r.file;
const pos = r.pos;
@ -1447,7 +1473,7 @@ pub const Reader = struct {
}
}
pub fn readPositional(r: *Reader, dest: []u8) std.io.Reader.Error!usize {
pub fn readPositional(r: *Reader, dest: []u8) std.Io.Reader.Error!usize {
const n = r.file.pread(dest, r.pos) catch |err| switch (err) {
error.Unseekable => {
r.mode = r.mode.toStreaming();
@ -1474,7 +1500,7 @@ pub const Reader = struct {
return n;
}
pub fn readStreaming(r: *Reader, dest: []u8) std.io.Reader.Error!usize {
pub fn readStreaming(r: *Reader, dest: []u8) std.Io.Reader.Error!usize {
const n = r.file.read(dest) catch |err| {
r.err = err;
return error.ReadFailed;
@ -1487,7 +1513,7 @@ pub const Reader = struct {
return n;
}
pub fn read(r: *Reader, dest: []u8) std.io.Reader.Error!usize {
pub fn read(r: *Reader, dest: []u8) std.Io.Reader.Error!usize {
switch (r.mode) {
.positional, .positional_reading => return readPositional(r, dest),
.streaming, .streaming_reading => return readStreaming(r, dest),
@ -1513,7 +1539,7 @@ pub const Writer = struct {
copy_file_range_err: ?CopyFileRangeError = null,
fcopyfile_err: ?FcopyfileError = null,
seek_err: ?SeekError = null,
interface: std.io.Writer,
interface: std.Io.Writer,
pub const Mode = Reader.Mode;
@ -1550,13 +1576,13 @@ pub const Writer = struct {
};
}
pub fn initInterface(buffer: []u8) std.io.Writer {
pub fn initInterface(buffer: []u8) std.Io.Writer {
return .{
.vtable = &.{
.drain = drain,
.sendFile = switch (builtin.zig_backend) {
else => sendFile,
.stage2_aarch64 => std.io.Writer.unimplementedSendFile,
.stage2_aarch64 => std.Io.Writer.unimplementedSendFile,
},
},
.buffer = buffer,
@ -1574,7 +1600,7 @@ pub const Writer = struct {
};
}
pub fn drain(io_w: *std.io.Writer, data: []const []const u8, splat: usize) std.io.Writer.Error!usize {
pub fn drain(io_w: *std.Io.Writer, data: []const []const u8, splat: usize) std.Io.Writer.Error!usize {
const w: *Writer = @alignCast(@fieldParentPtr("interface", io_w));
const handle = w.file.handle;
const buffered = io_w.buffered();
@ -1724,10 +1750,10 @@ pub const Writer = struct {
}
pub fn sendFile(
io_w: *std.io.Writer,
io_w: *std.Io.Writer,
file_reader: *Reader,
limit: std.io.Limit,
) std.io.Writer.FileError!usize {
limit: std.Io.Limit,
) std.Io.Writer.FileError!usize {
const reader_buffered = file_reader.interface.buffered();
if (reader_buffered.len >= @intFromEnum(limit))
return sendFileBuffered(io_w, file_reader, reader_buffered);
@ -1989,10 +2015,10 @@ pub const Writer = struct {
}
fn sendFileBuffered(
io_w: *std.io.Writer,
io_w: *std.Io.Writer,
file_reader: *Reader,
reader_buffered: []const u8,
) std.io.Writer.FileError!usize {
) std.Io.Writer.FileError!usize {
const n = try drain(io_w, &.{reader_buffered}, 1);
file_reader.seekTo(file_reader.pos + n) catch return error.ReadFailed;
return n;
@ -2015,7 +2041,7 @@ pub const Writer = struct {
}
}
pub const EndError = SetEndPosError || std.io.Writer.Error;
pub const EndError = SetEndPosError || std.Io.Writer.Error;
/// Flushes any buffered data and sets the end position of the file.
///

View File

@ -7,7 +7,7 @@ const net = @This();
const mem = std.mem;
const posix = std.posix;
const fs = std.fs;
const io = std.io;
const Io = std.Io;
const native_endian = builtin.target.cpu.arch.endian();
const native_os = builtin.os.tag;
const windows = std.os.windows;
@ -165,7 +165,7 @@ pub const Address = extern union {
}
}
pub fn format(self: Address, w: *std.io.Writer) std.io.Writer.Error!void {
pub fn format(self: Address, w: *Io.Writer) Io.Writer.Error!void {
switch (self.any.family) {
posix.AF.INET => try self.in.format(w),
posix.AF.INET6 => try self.in6.format(w),
@ -342,7 +342,7 @@ pub const Ip4Address = extern struct {
self.sa.port = mem.nativeToBig(u16, port);
}
pub fn format(self: Ip4Address, w: *std.io.Writer) std.io.Writer.Error!void {
pub fn format(self: Ip4Address, w: *Io.Writer) Io.Writer.Error!void {
const bytes: *const [4]u8 = @ptrCast(&self.sa.addr);
try w.print("{d}.{d}.{d}.{d}:{d}", .{ bytes[0], bytes[1], bytes[2], bytes[3], self.getPort() });
}
@ -633,7 +633,7 @@ pub const Ip6Address = extern struct {
self.sa.port = mem.nativeToBig(u16, port);
}
pub fn format(self: Ip6Address, w: *std.io.Writer) std.io.Writer.Error!void {
pub fn format(self: Ip6Address, w: *Io.Writer) Io.Writer.Error!void {
const port = mem.bigToNative(u16, self.sa.port);
if (mem.eql(u8, self.sa.addr[0..12], &[_]u8{ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff })) {
try w.print("[::ffff:{d}.{d}.{d}.{d}]:{d}", .{
@ -1348,7 +1348,7 @@ fn parseHosts(
name: []const u8,
family: posix.sa_family_t,
port: u16,
br: *io.Reader,
br: *Io.Reader,
) error{ OutOfMemory, ReadFailed }!void {
while (true) {
const line = br.takeDelimiterExclusive('\n') catch |err| switch (err) {
@ -1402,7 +1402,7 @@ test parseHosts {
// TODO parsing addresses should not have OS dependencies
return error.SkipZigTest;
}
var reader: std.io.Reader = .fixed(
var reader: Io.Reader = .fixed(
\\127.0.0.1 localhost
\\::1 localhost
\\127.0.0.2 abcd
@ -1583,7 +1583,7 @@ const ResolvConf = struct {
const Directive = enum { options, nameserver, domain, search };
const Option = enum { ndots, attempts, timeout };
fn parse(rc: *ResolvConf, reader: *io.Reader) !void {
fn parse(rc: *ResolvConf, reader: *Io.Reader) !void {
const gpa = rc.gpa;
while (reader.takeSentinel('\n')) |line_with_comment| {
const line = line: {
@ -1894,7 +1894,7 @@ pub const Stream = struct {
pub const Reader = switch (native_os) {
.windows => struct {
/// Use `interface` for portable code.
interface_state: io.Reader,
interface_state: Io.Reader,
/// Use `getStream` for portable code.
net_stream: Stream,
/// Use `getError` for portable code.
@ -1910,14 +1910,17 @@ pub const Stream = struct {
return r.error_state;
}
pub fn interface(r: *Reader) *io.Reader {
pub fn interface(r: *Reader) *Io.Reader {
return &r.interface_state;
}
pub fn init(net_stream: Stream, buffer: []u8) Reader {
return .{
.interface_state = .{
.vtable = &.{ .stream = stream },
.vtable = &.{
.stream = stream,
.readVec = readVec,
},
.buffer = buffer,
.seek = 0,
.end = 0,
@ -1927,16 +1930,29 @@ pub const Stream = struct {
};
}
fn stream(io_r: *io.Reader, io_w: *io.Writer, limit: io.Limit) io.Reader.StreamError!usize {
fn stream(io_r: *Io.Reader, io_w: *Io.Writer, limit: Io.Limit) Io.Reader.StreamError!usize {
const dest = limit.slice(try io_w.writableSliceGreedy(1));
const n = try readVec(io_r, &.{dest});
io_w.advance(n);
return n;
}
fn readVec(io_r: *std.Io.Reader, data: []const []u8) Io.Reader.Error!usize {
const r: *Reader = @alignCast(@fieldParentPtr("interface_state", io_r));
var iovecs: [max_buffers_len]windows.ws2_32.WSABUF = undefined;
const bufs = try io_w.writableVectorWsa(&iovecs, limit);
const bufs_n, const data_size = try io_r.writableVectorWsa(&iovecs, data);
const bufs = iovecs[0..bufs_n];
assert(bufs[0].len != 0);
const n = streamBufs(r, bufs) catch |err| {
r.error_state = err;
return error.ReadFailed;
};
if (n == 0) return error.EndOfStream;
if (n > data_size) {
io_r.seek = 0;
io_r.end = n - data_size;
return data_size;
}
return n;
}
@ -1968,7 +1984,7 @@ pub const Stream = struct {
pub const Error = ReadError;
pub fn interface(r: *Reader) *io.Reader {
pub fn interface(r: *Reader) *Io.Reader {
return &r.file_reader.interface;
}
@ -1996,7 +2012,7 @@ pub const Stream = struct {
pub const Writer = switch (native_os) {
.windows => struct {
/// This field is present on all systems.
interface: io.Writer,
interface: Io.Writer,
/// Use `getStream` for cross-platform support.
stream: Stream,
/// This field is present on all systems.
@ -2034,7 +2050,7 @@ pub const Stream = struct {
}
}
fn drain(io_w: *io.Writer, data: []const []const u8, splat: usize) io.Writer.Error!usize {
fn drain(io_w: *Io.Writer, data: []const []const u8, splat: usize) Io.Writer.Error!usize {
const w: *Writer = @alignCast(@fieldParentPtr("interface", io_w));
const buffered = io_w.buffered();
comptime assert(native_os == .windows);
@ -2106,7 +2122,7 @@ pub const Stream = struct {
},
else => struct {
/// This field is present on all systems.
interface: io.Writer,
interface: Io.Writer,
err: ?Error = null,
file_writer: File.Writer,
@ -2138,7 +2154,7 @@ pub const Stream = struct {
i.* += 1;
}
fn drain(io_w: *io.Writer, data: []const []const u8, splat: usize) io.Writer.Error!usize {
fn drain(io_w: *Io.Writer, data: []const []const u8, splat: usize) Io.Writer.Error!usize {
const w: *Writer = @alignCast(@fieldParentPtr("interface", io_w));
const buffered = io_w.buffered();
var iovecs: [max_buffers_len]posix.iovec_const = undefined;
@ -2190,7 +2206,7 @@ pub const Stream = struct {
});
}
fn sendFile(io_w: *io.Writer, file_reader: *File.Reader, limit: io.Limit) io.Writer.FileError!usize {
fn sendFile(io_w: *Io.Writer, file_reader: *File.Reader, limit: Io.Limit) Io.Writer.FileError!usize {
const w: *Writer = @alignCast(@fieldParentPtr("interface", io_w));
const n = try w.file_writer.interface.sendFileHeader(io_w.buffered(), file_reader, limit);
return io_w.consume(n);