From cf7a28febbbe877003d8d4f9a13ceb94698c1e3e Mon Sep 17 00:00:00 2001 From: Andrew Kelley Date: Tue, 29 Jul 2025 23:11:10 -0700 Subject: [PATCH] std.Io.Reader: introduce readVec back into the VTable simplifies and fixes things addresses a subset of #24608 --- lib/std/Io/Reader.zig | 349 ++++++++++++++++----------- lib/std/Io/Writer.zig | 98 -------- lib/std/compress/zstd/Decompress.zig | 20 +- lib/std/fs/File.zig | 88 ++++--- lib/std/net.zig | 52 ++-- 5 files changed, 314 insertions(+), 293 deletions(-) diff --git a/lib/std/Io/Reader.zig b/lib/std/Io/Reader.zig index db188c87bc..2b3f4d9cd8 100644 --- a/lib/std/Io/Reader.zig +++ b/lib/std/Io/Reader.zig @@ -43,8 +43,8 @@ pub const VTable = struct { /// /// In addition to, or instead of writing to `w`, the implementation may /// choose to store data in `buffer`, modifying `seek` and `end` - /// accordingly. Stream implementations are encouraged to take advantage of - /// this if simplifies the logic. + /// accordingly. Implementations are encouraged to take advantage of + /// this if it simplifies the logic. stream: *const fn (r: *Reader, w: *Writer, limit: Limit) StreamError!usize, /// Consumes bytes from the internally tracked stream position without @@ -68,6 +68,21 @@ pub const VTable = struct { /// This function is only called when `buffer` is empty. discard: *const fn (r: *Reader, limit: Limit) Error!usize = defaultDiscard, + /// Returns number of bytes written to `data`. + /// + /// `data` must have nonzero length. + /// + /// `data` may not contain an alias to `Reader.buffer`. + /// + /// Implementations may ignore `data`, writing directly to `Reader.buffer`, + /// modifying `seek` and `end` accordingly, and returning 0 from this + /// function. Implementations are encouraged to take advantage of this if + /// it simplifies the logic. + /// + /// The default implementation calls `stream` with either `data[0]` or + /// `Reader.buffer`, whichever is bigger. 
+ readVec: *const fn (r: *Reader, data: []const []u8) Error!usize = defaultReadVec, + /// Ensures `capacity` more data can be buffered without rebasing. /// /// Asserts `capacity` is within buffer capacity, or that the stream ends @@ -138,6 +153,7 @@ pub fn fixed(buffer: []const u8) Reader { .vtable = &.{ .stream = endingStream, .discard = endingDiscard, + .readVec = endingReadVec, .rebase = endingRebase, }, // This cast is safe because all potential writes to it will instead @@ -170,18 +186,18 @@ pub fn discard(r: *Reader, limit: Limit) Error!usize { } break :l .limited(n - buffered_len); } else .unlimited; - r.seek = 0; - r.end = 0; + r.seek = r.end; const n = try r.vtable.discard(r, remaining); assert(n <= @intFromEnum(remaining)); return buffered_len + n; } pub fn defaultDiscard(r: *Reader, limit: Limit) Error!usize { - assert(r.seek == 0); - assert(r.end == 0); - var dw: Writer.Discarding = .init(r.buffer); - const n = r.stream(&dw.writer, limit) catch |err| switch (err) { + assert(r.seek == r.end); + r.seek = 0; + r.end = 0; + var d: Writer.Discarding = .init(r.buffer); + const n = r.stream(&d.writer, limit) catch |err| switch (err) { error.WriteFailed => unreachable, error.ReadFailed => return error.ReadFailed, error.EndOfStream => return error.EndOfStream, @@ -294,7 +310,8 @@ pub fn appendRemaining( list: *std.ArrayListAlignedUnmanaged(u8, alignment), limit: Limit, ) LimitedAllocError!void { - if (limit != .unlimited) assert(r.buffer.len != 0); // Needed to detect limit exceeded without losing data. + if (limit == .unlimited) return appendRemainingUnlimited(r, gpa, alignment, list, 1); + assert(r.buffer.len != 0); // Needed to detect limit exceeded without losing data. 
const buffer_contents = r.buffer[r.seek..r.end]; const copy_len = limit.minInt(buffer_contents.len); try list.appendSlice(gpa, r.buffer[0..copy_len]); @@ -303,32 +320,67 @@ pub fn appendRemaining( r.seek = 0; r.end = 0; var remaining = @intFromEnum(limit) - copy_len; + // From here, we leave `buffer` empty, appending directly to `list`. + var writer: Writer = .{ + .buffer = undefined, + .end = undefined, + .vtable = &.{ .drain = Writer.fixedDrain }, + }; while (true) { - try list.ensureUnusedCapacity(gpa, 1); + try list.ensureUnusedCapacity(gpa, 2); const cap = list.unusedCapacitySlice(); - const dest = cap[0..@min(cap.len, remaining)]; - if (remaining - dest.len == 0) { - // Additionally provides `buffer` to detect end. - const new_remaining = readVecInner(r, &.{}, dest, remaining) catch |err| switch (err) { - error.EndOfStream => { - if (r.bufferedLen() != 0) return error.StreamTooLong; - return; - }, - error.ReadFailed => return error.ReadFailed, - }; - list.items.len += remaining - new_remaining; - remaining = new_remaining; - } else { - // Leave `buffer` empty, appending directly to `list`. - var dest_w: Writer = .fixed(dest); - const n = r.vtable.stream(r, &dest_w, .limited(dest.len)) catch |err| switch (err) { - error.WriteFailed => unreachable, // Prevented by the limit. - error.EndOfStream => return, - error.ReadFailed => return error.ReadFailed, - }; - list.items.len += n; - remaining -= n; + const dest = cap[0..@min(cap.len, remaining + 1)]; + writer.buffer = list.allocatedSlice(); + writer.end = list.items.len; + const n = r.vtable.stream(r, &writer, .limited(dest.len)) catch |err| switch (err) { + error.WriteFailed => unreachable, // Prevented by the limit. + error.EndOfStream => return, + error.ReadFailed => return error.ReadFailed, + }; + list.items.len += n; + if (n > remaining) { + // Move the byte to `Reader.buffer` so it is not lost. 
+ assert(n - remaining == 1); + assert(r.end == 0); + r.buffer[0] = list.items[list.items.len - 1]; + list.items.len -= 1; + r.end = 1; + return; } + remaining -= n; + } +} + +pub const UnlimitedAllocError = Allocator.Error || ShortError; + +pub fn appendRemainingUnlimited( + r: *Reader, + gpa: Allocator, + comptime alignment: ?std.mem.Alignment, + list: *std.ArrayListAlignedUnmanaged(u8, alignment), + bump: usize, +) UnlimitedAllocError!void { + const buffer_contents = r.buffer[r.seek..r.end]; + try list.ensureUnusedCapacity(gpa, buffer_contents.len + bump); + list.appendSliceAssumeCapacity(buffer_contents); + r.seek = 0; + r.end = 0; + // From here, we leave `buffer` empty, appending directly to `list`. + var writer: Writer = .{ + .buffer = undefined, + .end = undefined, + .vtable = &.{ .drain = Writer.fixedDrain }, + }; + while (true) { + try list.ensureUnusedCapacity(gpa, bump); + writer.buffer = list.allocatedSlice(); + writer.end = list.items.len; + const n = r.vtable.stream(r, &writer, .limited(list.unusedCapacitySlice().len)) catch |err| switch (err) { + error.WriteFailed => unreachable, // Prevented by the limit. + error.EndOfStream => return, + error.ReadFailed => return error.ReadFailed, + }; + list.items.len += n; } } @@ -340,95 +392,64 @@ pub fn appendRemaining( /// /// The reader's internal logical seek position moves forward in accordance /// with the number of bytes returned from this function. -pub fn readVec(r: *Reader, data: []const []u8) Error!usize { - return readVecLimit(r, data, .unlimited); -} - -/// Equivalent to `readVec` but reads at most `limit` bytes. -/// -/// This ultimately will lower to a call to `stream`, but it must ensure -/// that the buffer used has at least as much capacity, in case that function -/// depends on a minimum buffer capacity. It also ensures that if the `stream` -/// implementation calls `Writer.writableVector`, it will get this data slice -/// along with the buffer at the end. 
-pub fn readVecLimit(r: *Reader, data: []const []u8, limit: Limit) Error!usize { - comptime assert(@intFromEnum(Limit.unlimited) == std.math.maxInt(usize)); - var remaining = @intFromEnum(limit); +pub fn readVec(r: *Reader, data: [][]u8) Error!usize { + var seek = r.seek; for (data, 0..) |buf, i| { - const buffer_contents = r.buffer[r.seek..r.end]; - const copy_len = @min(buffer_contents.len, buf.len, remaining); - @memcpy(buf[0..copy_len], buffer_contents[0..copy_len]); - r.seek += copy_len; - remaining -= copy_len; - if (remaining == 0) break; + const contents = r.buffer[seek..r.end]; + const copy_len = @min(contents.len, buf.len); + @memcpy(buf[0..copy_len], contents[0..copy_len]); + seek += copy_len; if (buf.len - copy_len == 0) continue; - // All of `buffer` has been copied to `data`. We now set up a structure - // that enables the `Writer.writableVector` API, while also ensuring - // API that directly operates on the `Writable.buffer` has its minimum - // buffer capacity requirements met. - r.seek = 0; - r.end = 0; - remaining = try readVecInner(r, data[i + 1 ..], buf[copy_len..], remaining); - break; + // All of `buffer` has been copied to `data`. + const n = seek - r.seek; + r.seek = seek; + data[i] = buf[copy_len..]; + defer data[i] = buf; + return n + try r.vtable.readVec(r, data[i..]); } - return @intFromEnum(limit) - remaining; + const n = seek - r.seek; + r.seek = seek; + return n; } -fn readVecInner(r: *Reader, middle: []const []u8, first: []u8, remaining: usize) Error!usize { - var wrapper: Writer.VectorWrapper = .{ - .it = .{ - .first = first, - .middle = middle, - .last = r.buffer, - }, - .writer = .{ - .buffer = if (first.len >= r.buffer.len) first else r.buffer, - .vtable = Writer.VectorWrapper.vtable, - }, +/// Writes to `Reader.buffer` or `data`, whichever has larger capacity. 
+pub fn defaultReadVec(r: *Reader, data: []const []u8) Error!usize { + assert(r.seek == r.end); + r.seek = 0; + r.end = 0; + const first = data[0]; + const direct = first.len >= r.buffer.len; + var writer: Writer = .{ + .buffer = if (direct) first else r.buffer, + .end = 0, + .vtable = &.{ .drain = Writer.fixedDrain }, }; - // If the limit may pass beyond user buffer into Reader buffer, use - // unlimited, allowing the Reader buffer to fill. - const limit: Limit = l: { - var n: usize = first.len; - for (middle) |m| n += m.len; - break :l if (remaining >= n) .unlimited else .limited(remaining); - }; - var n = r.vtable.stream(r, &wrapper.writer, limit) catch |err| switch (err) { - error.WriteFailed => { - assert(!wrapper.used); - if (wrapper.writer.buffer.ptr == first.ptr) { - return remaining - wrapper.writer.end; - } else { - assert(wrapper.writer.end <= r.buffer.len); - r.end = wrapper.writer.end; - return remaining; - } - }, + const limit: Limit = .limited(writer.buffer.len - writer.end); + const n = r.vtable.stream(r, &writer, limit) catch |err| switch (err) { + error.WriteFailed => unreachable, else => |e| return e, }; - if (!wrapper.used) { - if (wrapper.writer.buffer.ptr == first.ptr) { - return remaining - n; - } else { - assert(n <= r.buffer.len); - r.end = n; - return remaining; - } - } - if (n < first.len) return remaining - n; - var result = remaining - first.len; - n -= first.len; - for (middle) |mid| { - if (n < mid.len) { - return result - n; - } - result -= mid.len; - n -= mid.len; - } - assert(n <= r.buffer.len); - r.end = n; - return result; + if (direct) return n; + r.end += n; + return 0; +} + +/// Always writes to `Reader.buffer` and returns 0. 
+pub fn indirectReadVec(r: *Reader, data: []const []u8) Error!usize { + _ = data; + assert(r.seek == r.end); + var writer: Writer = .{ + .buffer = r.buffer, + .end = r.end, + .vtable = &.{ .drain = Writer.fixedDrain }, + }; + const limit: Limit = .limited(writer.buffer.len - writer.end); + r.end += r.vtable.stream(r, &writer, limit) catch |err| switch (err) { + error.WriteFailed => unreachable, + else => |e| return e, + }; + return 0; } pub fn buffered(r: *Reader) []u8 { @@ -642,29 +663,24 @@ pub fn readSliceAll(r: *Reader, buffer: []u8) Error!void { /// See also: /// * `readSliceAll` pub fn readSliceShort(r: *Reader, buffer: []u8) ShortError!usize { - var i: usize = 0; + const contents = r.buffer[r.seek..r.end]; + const copy_len = @min(buffer.len, contents.len); + @memcpy(buffer[0..copy_len], contents[0..copy_len]); + r.seek += copy_len; + if (buffer.len - copy_len == 0) { + @branchHint(.likely); + return buffer.len; + } + var i: usize = copy_len; + var data: [1][]u8 = undefined; while (true) { - const buffer_contents = r.buffer[r.seek..r.end]; - const dest = buffer[i..]; - const copy_len = @min(dest.len, buffer_contents.len); - @memcpy(dest[0..copy_len], buffer_contents[0..copy_len]); - if (dest.len - copy_len == 0) { - @branchHint(.likely); - r.seek += copy_len; - return buffer.len; - } - i += copy_len; - r.end = 0; - r.seek = 0; - const remaining = buffer[i..]; - const new_remaining_len = readVecInner(r, &.{}, remaining, remaining.len) catch |err| switch (err) { + data[0] = buffer[i..]; + i += readVec(r, &data) catch |err| switch (err) { error.EndOfStream => return i, error.ReadFailed => return error.ReadFailed, }; - if (new_remaining_len == 0) return buffer.len; - i += remaining.len - new_remaining_len; + if (buffer.len - i == 0) return buffer.len; } - return buffer.len; } /// Fill `buffer` with the next `buffer.len` bytes from the stream, advancing @@ -1632,19 +1648,6 @@ test readVec { try testing.expectEqualStrings(std.ascii.letters[26..], bufs[1]); } -test 
readVecLimit { - var r: Reader = .fixed(std.ascii.letters); - var flat_buffer: [52]u8 = undefined; - var bufs: [2][]u8 = .{ - flat_buffer[0..26], - flat_buffer[26..], - }; - // Short reads are possible with this function but not with fixed. - try testing.expectEqual(50, try r.readVecLimit(&bufs, .limited(50))); - try testing.expectEqualStrings(std.ascii.letters[0..26], bufs[0]); - try testing.expectEqualStrings(std.ascii.letters[26..50], bufs[1][0..24]); -} - test "expected error.EndOfStream" { // Unit test inspired by https://github.com/ziglang/zig/issues/17733 var buffer: [3]u8 = undefined; @@ -1661,6 +1664,12 @@ fn endingStream(r: *Reader, w: *Writer, limit: Limit) StreamError!usize { return error.EndOfStream; } +fn endingReadVec(r: *Reader, data: []const []u8) Error!usize { + _ = r; + _ = data; + return error.EndOfStream; +} + fn endingDiscard(r: *Reader, limit: Limit) Error!usize { _ = r; _ = limit; @@ -1797,3 +1806,57 @@ pub fn Hashed(comptime Hasher: type) type { } }; } + +pub fn writableVectorPosix(r: *Reader, buffer: []std.posix.iovec, data: []const []u8) Error!struct { usize, usize } { + var i: usize = 0; + var n: usize = 0; + for (data) |buf| { + if (buffer.len - i == 0) return .{ i, n }; + if (buf.len != 0) { + buffer[i] = .{ .base = buf.ptr, .len = buf.len }; + i += 1; + n += buf.len; + } + } + assert(r.seek == r.end); + const buf = r.buffer; + if (buf.len != 0) { + buffer[i] = .{ .base = buf.ptr, .len = buf.len }; + i += 1; + } + return .{ i, n }; +} + +pub fn writableVectorWsa( + r: *Reader, + buffer: []std.os.windows.ws2_32.WSABUF, + data: []const []u8, +) Error!struct { usize, usize } { + var i: usize = 0; + var n: usize = 0; + for (data) |buf| { + if (buffer.len - i == 0) return .{ i, n }; + if (buf.len == 0) continue; + if (std.math.cast(u32, buf.len)) |len| { + buffer[i] = .{ .buf = buf.ptr, .len = len }; + i += 1; + n += len; + continue; + } + buffer[i] = .{ .buf = buf.ptr, .len = std.math.maxInt(u32) }; + i += 1; + n += std.math.maxInt(u32); + 
return .{ i, n }; + } + assert(r.seek == r.end); + const buf = r.buffer; + if (buf.len != 0) { + if (std.math.cast(u32, buf.len)) |len| { + buffer[i] = .{ .buf = buf.ptr, .len = len }; + } else { + buffer[i] = .{ .buf = buf.ptr, .len = std.math.maxInt(u32) }; + } + i += 1; + } + return .{ i, n }; +} diff --git a/lib/std/Io/Writer.zig b/lib/std/Io/Writer.zig index 06a6534071..a177bda8ff 100644 --- a/lib/std/Io/Writer.zig +++ b/lib/std/Io/Writer.zig @@ -342,97 +342,6 @@ pub fn writableSlicePreserve(w: *Writer, preserve_len: usize, len: usize) Error! return big_slice[0..len]; } -pub const WritableVectorIterator = struct { - first: []u8, - middle: []const []u8 = &.{}, - last: []u8 = &.{}, - index: usize = 0, - - pub fn next(it: *WritableVectorIterator) ?[]u8 { - while (true) { - const i = it.index; - it.index += 1; - if (i == 0) { - if (it.first.len == 0) continue; - return it.first; - } - const middle_index = i - 1; - if (middle_index < it.middle.len) { - const middle = it.middle[middle_index]; - if (middle.len == 0) continue; - return middle; - } - if (middle_index == it.middle.len) { - if (it.last.len == 0) continue; - return it.last; - } - return null; - } - } -}; - -pub const VectorWrapper = struct { - writer: Writer, - it: WritableVectorIterator, - /// Tracks whether the "writable vector" API was used. - used: bool = false, - pub const vtable: *const VTable = &unique_vtable_allocation; - /// This is intended to be constant but it must be a unique address for - /// `@fieldParentPtr` to work. 
- var unique_vtable_allocation: VTable = .{ .drain = fixedDrain }; -}; - -pub fn writableVectorIterator(w: *Writer) Error!WritableVectorIterator { - if (w.vtable == VectorWrapper.vtable) { - const wrapper: *VectorWrapper = @fieldParentPtr("writer", w); - wrapper.used = true; - return wrapper.it; - } - return .{ .first = try writableSliceGreedy(w, 1) }; -} - -pub fn writableVectorPosix(w: *Writer, buffer: []std.posix.iovec, limit: Limit) Error![]std.posix.iovec { - var it = try writableVectorIterator(w); - var i: usize = 0; - var remaining = limit; - while (it.next()) |full_buffer| { - if (!remaining.nonzero()) break; - if (buffer.len - i == 0) break; - const buf = remaining.slice(full_buffer); - if (buf.len == 0) continue; - buffer[i] = .{ .base = buf.ptr, .len = buf.len }; - i += 1; - remaining = remaining.subtract(buf.len).?; - } - return buffer[0..i]; -} - -pub fn writableVectorWsa( - w: *Writer, - buffer: []std.os.windows.ws2_32.WSABUF, - limit: Limit, -) Error![]std.os.windows.ws2_32.WSABUF { - var it = try writableVectorIterator(w); - var i: usize = 0; - var remaining = limit; - while (it.next()) |full_buffer| { - if (!remaining.nonzero()) break; - if (buffer.len - i == 0) break; - const buf = remaining.slice(full_buffer); - if (buf.len == 0) continue; - if (std.math.cast(u32, buf.len)) |len| { - buffer[i] = .{ .buf = buf.ptr, .len = len }; - i += 1; - remaining = remaining.subtract(len).?; - continue; - } - buffer[i] = .{ .buf = buf.ptr, .len = std.math.maxInt(u32) }; - i += 1; - break; - } - return buffer[0..i]; -} - pub fn ensureUnusedCapacity(w: *Writer, n: usize) Error!void { _ = try writableSliceGreedy(w, n); } @@ -451,13 +360,6 @@ pub fn advance(w: *Writer, n: usize) void { w.end = new_end; } -/// After calling `writableVector`, this function tracks how many bytes were -/// written to it. 
-pub fn advanceVector(w: *Writer, n: usize) usize { - if (w.vtable != VectorWrapper.vtable) advance(w, n); - return n; -} - /// The `data` parameter is mutable because this function needs to mutate the /// fields in order to handle partial writes from `VTable.writeSplat`. pub fn writeVecAll(w: *Writer, data: [][]const u8) Error!void { diff --git a/lib/std/compress/zstd/Decompress.zig b/lib/std/compress/zstd/Decompress.zig index eb431e644c..db85474e00 100644 --- a/lib/std/compress/zstd/Decompress.zig +++ b/lib/std/compress/zstd/Decompress.zig @@ -88,6 +88,8 @@ pub fn init(input: *Reader, buffer: []u8, options: Options) Decompress { .vtable = &.{ .stream = stream, .rebase = rebase, + .discard = discard, + .readVec = Reader.indirectReadVec, }, .buffer = buffer, .seek = 0, @@ -100,11 +102,23 @@ fn rebase(r: *Reader, capacity: usize) Reader.RebaseError!void { const d: *Decompress = @alignCast(@fieldParentPtr("reader", r)); assert(capacity <= r.buffer.len - d.window_len); assert(r.end + capacity > r.buffer.len); - const discard = r.end - d.window_len; - const keep = r.buffer[discard..r.end]; + const discard_n = r.end - d.window_len; + const keep = r.buffer[discard_n..r.end]; @memmove(r.buffer[0..keep.len], keep); r.end = keep.len; - r.seek -= discard; + r.seek -= discard_n; +} + +fn discard(r: *Reader, limit: Limit) Reader.Error!usize { + r.rebase(zstd.block_size_max) catch unreachable; + var d: Writer.Discarding = .init(r.buffer); + const n = r.stream(&d.writer, limit) catch |err| switch (err) { + error.WriteFailed => unreachable, + error.ReadFailed => return error.ReadFailed, + error.EndOfStream => return error.EndOfStream, + }; + assert(n <= @intFromEnum(limit)); + return n; } fn stream(r: *Reader, w: *Writer, limit: Limit) Reader.StreamError!usize { diff --git a/lib/std/fs/File.zig b/lib/std/fs/File.zig index fd965babfc..eca2d6667f 100644 --- a/lib/std/fs/File.zig +++ b/lib/std/fs/File.zig @@ -1129,7 +1129,7 @@ pub fn seekableStream(file: File) SeekableStream { /// * 
Whether reading should be done via fd-to-fd syscalls (e.g. `sendfile`) /// versus plain variants (e.g. `read`). /// -/// Fulfills the `std.io.Reader` interface. +/// Fulfills the `std.Io.Reader` interface. pub const Reader = struct { file: File, err: ?ReadError = null, @@ -1140,7 +1140,7 @@ pub const Reader = struct { size: ?u64 = null, size_err: ?GetEndPosError = null, seek_err: ?Reader.SeekError = null, - interface: std.io.Reader, + interface: std.Io.Reader, pub const SeekError = File.SeekError || error{ /// Seeking fell back to reading, and reached the end before the requested seek position. @@ -1177,11 +1177,12 @@ pub const Reader = struct { } }; - pub fn initInterface(buffer: []u8) std.io.Reader { + pub fn initInterface(buffer: []u8) std.Io.Reader { return .{ .vtable = &.{ .stream = Reader.stream, .discard = Reader.discard, + .readVec = Reader.readVec, }, .buffer = buffer, .seek = 0, @@ -1294,7 +1295,7 @@ pub const Reader = struct { /// vectors through the underlying read calls as possible. 
const max_buffers_len = 16; - fn stream(io_reader: *std.io.Reader, w: *std.io.Writer, limit: std.io.Limit) std.io.Reader.StreamError!usize { + fn stream(io_reader: *std.Io.Reader, w: *std.Io.Writer, limit: std.Io.Limit) std.Io.Reader.StreamError!usize { const r: *Reader = @alignCast(@fieldParentPtr("interface", io_reader)); switch (r.mode) { .positional, .streaming => return w.sendFile(r, limit) catch |write_err| switch (write_err) { @@ -1305,16 +1306,33 @@ pub const Reader = struct { else => |e| return e, }, .positional_reading => { + const dest = limit.slice(try w.writableSliceGreedy(1)); + const n = try readPositional(r, dest); + w.advance(n); + return n; + }, + .streaming_reading => { + const dest = limit.slice(try w.writableSliceGreedy(1)); + const n = try readStreaming(r, dest); + w.advance(n); + return n; + }, + .failure => return error.ReadFailed, + } + } + + fn readVec(io_reader: *std.Io.Reader, data: []const []u8) std.Io.Reader.Error!usize { + const r: *Reader = @alignCast(@fieldParentPtr("interface", io_reader)); + switch (r.mode) { + .positional, .positional_reading => { if (is_windows) { // Unfortunately, `ReadFileScatter` cannot be used since it // requires page alignment. 
- const dest = limit.slice(try w.writableSliceGreedy(1)); - const n = try readPositional(r, dest); - w.advance(n); - return n; + return readPositional(r, data[0]); } var iovecs_buffer: [max_buffers_len]posix.iovec = undefined; - const dest = try w.writableVectorPosix(&iovecs_buffer, limit); + const dest_n, const data_size = try io_reader.writableVectorPosix(&iovecs_buffer, data); + const dest = iovecs_buffer[0..dest_n]; assert(dest[0].len > 0); const n = posix.preadv(r.file.handle, dest, r.pos) catch |err| switch (err) { error.Unseekable => { @@ -1339,19 +1357,22 @@ pub const Reader = struct { return error.EndOfStream; } r.pos += n; - return w.advanceVector(n); + if (n > data_size) { + io_reader.seek = 0; + io_reader.end = n - data_size; + return data_size; + } + return n; }, - .streaming_reading => { + .streaming, .streaming_reading => { if (is_windows) { // Unfortunately, `ReadFileScatter` cannot be used since it // requires page alignment. - const dest = limit.slice(try w.writableSliceGreedy(1)); - const n = try readStreaming(r, dest); - w.advance(n); - return n; + return readStreaming(r, data[0]); } var iovecs_buffer: [max_buffers_len]posix.iovec = undefined; - const dest = try w.writableVectorPosix(&iovecs_buffer, limit); + const dest_n, const data_size = try io_reader.writableVectorPosix(&iovecs_buffer, data); + const dest = iovecs_buffer[0..dest_n]; assert(dest[0].len > 0); const n = posix.readv(r.file.handle, dest) catch |err| { r.err = err; @@ -1362,13 +1383,18 @@ pub const Reader = struct { return error.EndOfStream; } r.pos += n; - return w.advanceVector(n); + if (n > data_size) { + io_reader.seek = 0; + io_reader.end = n - data_size; + return data_size; + } + return n; }, .failure => return error.ReadFailed, } } - fn discard(io_reader: *std.io.Reader, limit: std.io.Limit) std.io.Reader.Error!usize { + fn discard(io_reader: *std.Io.Reader, limit: std.Io.Limit) std.Io.Reader.Error!usize { const r: *Reader = @alignCast(@fieldParentPtr("interface", 
io_reader)); const file = r.file; const pos = r.pos; @@ -1447,7 +1473,7 @@ pub const Reader = struct { } } - pub fn readPositional(r: *Reader, dest: []u8) std.io.Reader.Error!usize { + pub fn readPositional(r: *Reader, dest: []u8) std.Io.Reader.Error!usize { const n = r.file.pread(dest, r.pos) catch |err| switch (err) { error.Unseekable => { r.mode = r.mode.toStreaming(); @@ -1474,7 +1500,7 @@ pub const Reader = struct { return n; } - pub fn readStreaming(r: *Reader, dest: []u8) std.io.Reader.Error!usize { + pub fn readStreaming(r: *Reader, dest: []u8) std.Io.Reader.Error!usize { const n = r.file.read(dest) catch |err| { r.err = err; return error.ReadFailed; @@ -1487,7 +1513,7 @@ pub const Reader = struct { return n; } - pub fn read(r: *Reader, dest: []u8) std.io.Reader.Error!usize { + pub fn read(r: *Reader, dest: []u8) std.Io.Reader.Error!usize { switch (r.mode) { .positional, .positional_reading => return readPositional(r, dest), .streaming, .streaming_reading => return readStreaming(r, dest), @@ -1513,7 +1539,7 @@ pub const Writer = struct { copy_file_range_err: ?CopyFileRangeError = null, fcopyfile_err: ?FcopyfileError = null, seek_err: ?SeekError = null, - interface: std.io.Writer, + interface: std.Io.Writer, pub const Mode = Reader.Mode; @@ -1550,13 +1576,13 @@ pub const Writer = struct { }; } - pub fn initInterface(buffer: []u8) std.io.Writer { + pub fn initInterface(buffer: []u8) std.Io.Writer { return .{ .vtable = &.{ .drain = drain, .sendFile = switch (builtin.zig_backend) { else => sendFile, - .stage2_aarch64 => std.io.Writer.unimplementedSendFile, + .stage2_aarch64 => std.Io.Writer.unimplementedSendFile, }, }, .buffer = buffer, @@ -1574,7 +1600,7 @@ pub const Writer = struct { }; } - pub fn drain(io_w: *std.io.Writer, data: []const []const u8, splat: usize) std.io.Writer.Error!usize { + pub fn drain(io_w: *std.Io.Writer, data: []const []const u8, splat: usize) std.Io.Writer.Error!usize { const w: *Writer = @alignCast(@fieldParentPtr("interface", 
io_w)); const handle = w.file.handle; const buffered = io_w.buffered(); @@ -1724,10 +1750,10 @@ pub const Writer = struct { } pub fn sendFile( - io_w: *std.io.Writer, + io_w: *std.Io.Writer, file_reader: *Reader, - limit: std.io.Limit, - ) std.io.Writer.FileError!usize { + limit: std.Io.Limit, + ) std.Io.Writer.FileError!usize { const reader_buffered = file_reader.interface.buffered(); if (reader_buffered.len >= @intFromEnum(limit)) return sendFileBuffered(io_w, file_reader, reader_buffered); @@ -1989,10 +2015,10 @@ pub const Writer = struct { } fn sendFileBuffered( - io_w: *std.io.Writer, + io_w: *std.Io.Writer, file_reader: *Reader, reader_buffered: []const u8, - ) std.io.Writer.FileError!usize { + ) std.Io.Writer.FileError!usize { const n = try drain(io_w, &.{reader_buffered}, 1); file_reader.seekTo(file_reader.pos + n) catch return error.ReadFailed; return n; @@ -2015,7 +2041,7 @@ pub const Writer = struct { } } - pub const EndError = SetEndPosError || std.io.Writer.Error; + pub const EndError = SetEndPosError || std.Io.Writer.Error; /// Flushes any buffered data and sets the end position of the file. 
/// diff --git a/lib/std/net.zig b/lib/std/net.zig index f43c2f9b53..d7387662c0 100644 --- a/lib/std/net.zig +++ b/lib/std/net.zig @@ -7,7 +7,7 @@ const net = @This(); const mem = std.mem; const posix = std.posix; const fs = std.fs; -const io = std.io; +const Io = std.Io; const native_endian = builtin.target.cpu.arch.endian(); const native_os = builtin.os.tag; const windows = std.os.windows; @@ -165,7 +165,7 @@ pub const Address = extern union { } } - pub fn format(self: Address, w: *std.io.Writer) std.io.Writer.Error!void { + pub fn format(self: Address, w: *Io.Writer) Io.Writer.Error!void { switch (self.any.family) { posix.AF.INET => try self.in.format(w), posix.AF.INET6 => try self.in6.format(w), @@ -342,7 +342,7 @@ pub const Ip4Address = extern struct { self.sa.port = mem.nativeToBig(u16, port); } - pub fn format(self: Ip4Address, w: *std.io.Writer) std.io.Writer.Error!void { + pub fn format(self: Ip4Address, w: *Io.Writer) Io.Writer.Error!void { const bytes: *const [4]u8 = @ptrCast(&self.sa.addr); try w.print("{d}.{d}.{d}.{d}:{d}", .{ bytes[0], bytes[1], bytes[2], bytes[3], self.getPort() }); } @@ -633,7 +633,7 @@ pub const Ip6Address = extern struct { self.sa.port = mem.nativeToBig(u16, port); } - pub fn format(self: Ip6Address, w: *std.io.Writer) std.io.Writer.Error!void { + pub fn format(self: Ip6Address, w: *Io.Writer) Io.Writer.Error!void { const port = mem.bigToNative(u16, self.sa.port); if (mem.eql(u8, self.sa.addr[0..12], &[_]u8{ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff })) { try w.print("[::ffff:{d}.{d}.{d}.{d}]:{d}", .{ @@ -1348,7 +1348,7 @@ fn parseHosts( name: []const u8, family: posix.sa_family_t, port: u16, - br: *io.Reader, + br: *Io.Reader, ) error{ OutOfMemory, ReadFailed }!void { while (true) { const line = br.takeDelimiterExclusive('\n') catch |err| switch (err) { @@ -1402,7 +1402,7 @@ test parseHosts { // TODO parsing addresses should not have OS dependencies return error.SkipZigTest; } - var reader: std.io.Reader = .fixed( + var reader: 
Io.Reader = .fixed( \\127.0.0.1 localhost \\::1 localhost \\127.0.0.2 abcd @@ -1583,7 +1583,7 @@ const ResolvConf = struct { const Directive = enum { options, nameserver, domain, search }; const Option = enum { ndots, attempts, timeout }; - fn parse(rc: *ResolvConf, reader: *io.Reader) !void { + fn parse(rc: *ResolvConf, reader: *Io.Reader) !void { const gpa = rc.gpa; while (reader.takeSentinel('\n')) |line_with_comment| { const line = line: { @@ -1894,7 +1894,7 @@ pub const Stream = struct { pub const Reader = switch (native_os) { .windows => struct { /// Use `interface` for portable code. - interface_state: io.Reader, + interface_state: Io.Reader, /// Use `getStream` for portable code. net_stream: Stream, /// Use `getError` for portable code. @@ -1910,14 +1910,17 @@ pub const Stream = struct { return r.error_state; } - pub fn interface(r: *Reader) *io.Reader { + pub fn interface(r: *Reader) *Io.Reader { return &r.interface_state; } pub fn init(net_stream: Stream, buffer: []u8) Reader { return .{ .interface_state = .{ - .vtable = &.{ .stream = stream }, + .vtable = &.{ + .stream = stream, + .readVec = readVec, + }, .buffer = buffer, .seek = 0, .end = 0, @@ -1927,16 +1930,29 @@ pub const Stream = struct { }; } - fn stream(io_r: *io.Reader, io_w: *io.Writer, limit: io.Limit) io.Reader.StreamError!usize { + fn stream(io_r: *Io.Reader, io_w: *Io.Writer, limit: Io.Limit) Io.Reader.StreamError!usize { + const dest = limit.slice(try io_w.writableSliceGreedy(1)); + const n = try readVec(io_r, &.{dest}); + io_w.advance(n); + return n; + } + + fn readVec(io_r: *std.Io.Reader, data: []const []u8) Io.Reader.Error!usize { const r: *Reader = @alignCast(@fieldParentPtr("interface_state", io_r)); var iovecs: [max_buffers_len]windows.ws2_32.WSABUF = undefined; - const bufs = try io_w.writableVectorWsa(&iovecs, limit); + const bufs_n, const data_size = try io_r.writableVectorWsa(&iovecs, data); + const bufs = iovecs[0..bufs_n]; assert(bufs[0].len != 0); const n = streamBufs(r, 
bufs) catch |err| { r.error_state = err; return error.ReadFailed; }; if (n == 0) return error.EndOfStream; + if (n > data_size) { + io_r.seek = 0; + io_r.end = n - data_size; + return data_size; + } return n; } @@ -1968,7 +1984,7 @@ pub const Stream = struct { pub const Error = ReadError; - pub fn interface(r: *Reader) *io.Reader { + pub fn interface(r: *Reader) *Io.Reader { return &r.file_reader.interface; } @@ -1996,7 +2012,7 @@ pub const Stream = struct { pub const Writer = switch (native_os) { .windows => struct { /// This field is present on all systems. - interface: io.Writer, + interface: Io.Writer, /// Use `getStream` for cross-platform support. stream: Stream, /// This field is present on all systems. @@ -2034,7 +2050,7 @@ pub const Stream = struct { } } - fn drain(io_w: *io.Writer, data: []const []const u8, splat: usize) io.Writer.Error!usize { + fn drain(io_w: *Io.Writer, data: []const []const u8, splat: usize) Io.Writer.Error!usize { const w: *Writer = @alignCast(@fieldParentPtr("interface", io_w)); const buffered = io_w.buffered(); comptime assert(native_os == .windows); @@ -2106,7 +2122,7 @@ pub const Stream = struct { }, else => struct { /// This field is present on all systems. 
- interface: io.Writer, + interface: Io.Writer, err: ?Error = null, file_writer: File.Writer, @@ -2138,7 +2154,7 @@ pub const Stream = struct { i.* += 1; } - fn drain(io_w: *io.Writer, data: []const []const u8, splat: usize) io.Writer.Error!usize { + fn drain(io_w: *Io.Writer, data: []const []const u8, splat: usize) Io.Writer.Error!usize { const w: *Writer = @alignCast(@fieldParentPtr("interface", io_w)); const buffered = io_w.buffered(); var iovecs: [max_buffers_len]posix.iovec_const = undefined; @@ -2190,7 +2206,7 @@ pub const Stream = struct { }); } - fn sendFile(io_w: *io.Writer, file_reader: *File.Reader, limit: io.Limit) io.Writer.FileError!usize { + fn sendFile(io_w: *Io.Writer, file_reader: *File.Reader, limit: Io.Limit) Io.Writer.FileError!usize { const w: *Writer = @alignCast(@fieldParentPtr("interface", io_w)); const n = try w.file_writer.interface.sendFileHeader(io_w.buffered(), file_reader, limit); return io_w.consume(n);