std.io.Reader: implement readVec

and specify that data.len == 0 means "flush"
This commit is contained in:
Andrew Kelley 2025-06-05 19:01:46 -07:00
parent b393112674
commit 084e45fd86
6 changed files with 205 additions and 161 deletions

View File

@ -1081,11 +1081,17 @@ const CorrespondingLines = struct {
last_byte: u8 = 0, last_byte: u8 = 0,
at_eof: bool = false, at_eof: bool = false,
span: SourceMappings.CorrespondingSpan, span: SourceMappings.CorrespondingSpan,
file: std.fs.File, file_reader: *std.fs.File.Reader,
buffered_reader: *std.io.Reader,
code_page: SupportedCodePage, code_page: SupportedCodePage,
pub fn init(cwd: std.fs.Dir, err_details: ErrorDetails, line_for_comparison: []const u8, corresponding_span: SourceMappings.CorrespondingSpan, corresponding_file: []const u8) !CorrespondingLines { pub fn init(
cwd: std.fs.Dir,
err_details: ErrorDetails,
line_for_comparison: []const u8,
corresponding_span: SourceMappings.CorrespondingSpan,
corresponding_file: []const u8,
file_read_buffer: []u8,
) !CorrespondingLines {
// We don't do line comparison for this error, so don't print the note if the line // We don't do line comparison for this error, so don't print the note if the line
// number is different // number is different
if (err_details.err == .string_literal_too_long and err_details.token.line_number != corresponding_span.start_line) { if (err_details.err == .string_literal_too_long and err_details.token.line_number != corresponding_span.start_line) {
@ -1097,15 +1103,12 @@ const CorrespondingLines = struct {
return error.NotWorthPrintingLines; return error.NotWorthPrintingLines;
} }
var corresponding_lines = CorrespondingLines{ var file = try utils.openFileNotDir(cwd, corresponding_file, .{});
var corresponding_lines: CorrespondingLines = .{
.span = corresponding_span, .span = corresponding_span,
.file = try utils.openFileNotDir(cwd, corresponding_file, .{}), .file_reader = file.reader(&file_read_buffer),
.buffered_reader = undefined,
.code_page = err_details.code_page, .code_page = err_details.code_page,
}; };
corresponding_lines.buffered_reader = .{
.unbuffered_reader = corresponding_lines.file.reader(),
};
errdefer corresponding_lines.deinit(); errdefer corresponding_lines.deinit();
var fbs = std.io.fixedBufferStream(&corresponding_lines.line_buf); var fbs = std.io.fixedBufferStream(&corresponding_lines.line_buf);
@ -1215,7 +1218,7 @@ const CorrespondingLines = struct {
} }
pub fn deinit(self: *CorrespondingLines) void { pub fn deinit(self: *CorrespondingLines) void {
self.file.close(); self.file_reader.file.close();
} }
}; };

View File

@ -1027,14 +1027,10 @@ pub const Reader = struct {
/// vectors through the underlying read calls as possible. /// vectors through the underlying read calls as possible.
const max_buffers_len = 16; const max_buffers_len = 16;
fn stream( fn stream(io_reader: *std.io.Reader, w: *std.io.Writer, limit: std.io.Limit) std.io.Reader.StreamError!usize {
io_reader: *std.io.Reader,
bw: *std.io.Writer,
limit: std.io.Limit,
) std.io.Reader.StreamError!usize {
const r: *Reader = @fieldParentPtr("interface", io_reader); const r: *Reader = @fieldParentPtr("interface", io_reader);
switch (r.mode) { switch (r.mode) {
.positional, .streaming => return bw.writeFile(r, limit, &.{}, 0) catch |write_err| switch (write_err) { .positional, .streaming => return w.writeFile(r, limit, &.{}, 0) catch |write_err| switch (write_err) {
error.ReadFailed => return error.ReadFailed, error.ReadFailed => return error.ReadFailed,
error.WriteFailed => return error.WriteFailed, error.WriteFailed => return error.WriteFailed,
error.Unimplemented => { error.Unimplemented => {
@ -1043,16 +1039,60 @@ pub const Reader = struct {
}, },
}, },
.positional_reading => { .positional_reading => {
const dest = limit.slice(try bw.writableSliceGreedy(1)); if (is_windows) {
const n = try readPositional(r, dest); // Unfortunately, `ReadFileScatter` cannot be used since it
bw.advance(n); // requires page alignment.
return n; const dest = limit.slice(try w.writableSliceGreedy(1));
const n = try readPositional(r, dest);
w.advance(n);
return n;
}
var iovecs_buffer: [max_buffers_len]posix.iovec = undefined;
const dest = w.writableVectorPosix(&iovecs_buffer, limit);
assert(dest[0].len > 0);
const n = posix.preadv(r.file.handle, dest, r.pos) catch |err| switch (err) {
error.Unseekable => {
r.mode = r.mode.toStreaming();
if (r.pos != 0) r.seekBy(@intCast(r.pos)) catch {
r.mode = .failure;
return error.ReadFailed;
};
return 0;
},
else => |e| {
r.err = e;
return error.ReadFailed;
},
};
if (n == 0) {
r.size = r.pos;
return error.EndOfStream;
}
r.pos += n;
return w.advanceVector(n);
}, },
.streaming_reading => { .streaming_reading => {
const dest = limit.slice(try bw.writableSliceGreedy(1)); if (is_windows) {
const n = try readStreaming(r, dest); // Unfortunately, `ReadFileScatter` cannot be used since it
bw.advance(n); // requires page alignment.
return n; const dest = limit.slice(try w.writableSliceGreedy(1));
const n = try readStreaming(r, dest);
w.advance(n);
return n;
}
var iovecs_buffer: [max_buffers_len]posix.iovec = undefined;
const dest = w.writableVectorPosix(&iovecs_buffer, limit);
assert(dest[0].len > 0);
const n = posix.pread(r.file.handle, dest) catch |err| {
r.err = err;
return error.ReadFailed;
};
if (n == 0) {
r.size = r.pos;
return error.EndOfStream;
}
r.pos += n;
return w.advanceVector(n);
}, },
.failure => return error.ReadFailed, .failure => return error.ReadFailed,
} }

View File

@ -22,12 +22,12 @@ seek: usize,
end: usize, end: usize,
pub const VTable = struct { pub const VTable = struct {
/// Writes bytes from the internally tracked logical position to `bw`. /// Writes bytes from the internally tracked logical position to `w`.
/// ///
/// Returns the number of bytes written, which will be at minimum `0` and /// Returns the number of bytes written, which will be at minimum `0` and
/// at most `limit`. The number returned, including zero, does not indicate /// at most `limit`. The number returned, including zero, does not indicate
/// end of stream. `limit` is guaranteed to be at least as large as the /// end of stream. `limit` is guaranteed to be at least as large as the
/// buffer capacity of `bw`. /// buffer capacity of `w`.
/// ///
/// The reader's internal logical seek position moves forward in accordance /// The reader's internal logical seek position moves forward in accordance
/// with the number of bytes returned from this function. /// with the number of bytes returned from this function.
@ -35,6 +35,8 @@ pub const VTable = struct {
/// Implementations are encouraged to utilize mandatory minimum buffer /// Implementations are encouraged to utilize mandatory minimum buffer
/// sizes combined with short reads (returning a value less than `limit`) /// sizes combined with short reads (returning a value less than `limit`)
/// in order to minimize complexity. /// in order to minimize complexity.
///
/// This function is always called when `buffer` is empty.
stream: *const fn (r: *Reader, w: *Writer, limit: Limit) StreamError!usize, stream: *const fn (r: *Reader, w: *Writer, limit: Limit) StreamError!usize,
/// Consumes bytes from the internally tracked stream position without /// Consumes bytes from the internally tracked stream position without
@ -51,7 +53,7 @@ pub const VTable = struct {
/// sizes combined with short reads (returning a value less than `limit`) /// sizes combined with short reads (returning a value less than `limit`)
/// in order to minimize complexity. /// in order to minimize complexity.
/// ///
/// The default implementation is based on calling `read`, borrowing /// The default implementation is based on calling `stream`, borrowing
/// `buffer` to construct a temporary `Writer` and ignoring the written /// `buffer` to construct a temporary `Writer` and ignoring the written
/// data. /// data.
discard: *const fn (r: *Reader, limit: Limit) Error!usize = defaultDiscard, discard: *const fn (r: *Reader, limit: Limit) Error!usize = defaultDiscard,
@ -88,7 +90,7 @@ pub const ShortError = error{
pub const failing: Reader = .{ pub const failing: Reader = .{
.context = undefined, .context = undefined,
.vtable = &.{ .vtable = &.{
.read = failingRead, .read = failingStream,
.discard = failingDiscard, .discard = failingDiscard,
}, },
.buffer = &.{}, .buffer = &.{},
@ -107,7 +109,7 @@ pub fn fixed(buffer: []const u8) Reader {
return .{ return .{
.context = undefined, .context = undefined,
.vtable = &.{ .vtable = &.{
.read = endingRead, .stream = endingStream,
.discard = endingDiscard, .discard = endingDiscard,
}, },
// This cast is safe because all potential writes to it will instead // This cast is safe because all potential writes to it will instead
@ -274,9 +276,17 @@ pub fn appendRemaining(
/// The reader's internal logical seek position moves forward in accordance /// The reader's internal logical seek position moves forward in accordance
/// with the number of bytes returned from this function. /// with the number of bytes returned from this function.
pub fn readVec(r: *Reader, data: []const []u8) Error!usize { pub fn readVec(r: *Reader, data: []const []u8) Error!usize {
return readVecLimit(r, data, .unlimited); return readVec(r, data, .unlimited);
} }
const VectorWrapped = struct {
writer: Writer,
first: []u8,
middle: []const []u8,
last: []u8,
var unique_address: u8 = undefined;
};
/// Equivalent to `readVec` but reads at most `limit` bytes. /// Equivalent to `readVec` but reads at most `limit` bytes.
/// ///
/// This ultimately will lower to a call to `stream`, but it must ensure /// This ultimately will lower to a call to `stream`, but it must ensure
@ -296,42 +306,55 @@ pub fn readVecLimit(r: *Reader, data: []const []u8, limit: Limit) Error!usize {
if (remaining == 0) break; if (remaining == 0) break;
if (buf.len - copy_len == 0) continue; if (buf.len - copy_len == 0) continue;
// All of `buffer` has been copied to `data`. We now set up a structure
// that enables the `Writer.writableVector` API, while also ensuring
// API that directly operates on the `Writable.buffer` has its minimum
// buffer capacity requirements met.
r.seek = 0; r.seek = 0;
r.end = 0; r.end = 0;
var vecs: [8][]u8 = undefined; // Arbitrarily chosen value. const first = buf[copy_len..];
const available_remaining_buf = buf[copy_len..]; var wrapped: VectorWrapped = .{
vecs[0] = available_remaining_buf[0..@min(available_remaining_buf.len, remaining)]; .first = first,
const vec_start_remaining = remaining; .middle = data[i + 1 ..],
remaining -= vecs[0].len; .last = r.buffer,
var vecs_i: usize = 1; .writer = .{
var data_i: usize = i + 1; .context = &VectorWrapped.unique_address,
while (true) { .buffer = if (first.len >= r.buffer.len) first else r.buffer,
if (vecs.len - vecs_i == 0) { .vtable = &.{ .drain = Writer.fixedDrain },
const n = try r.unbuffered_reader.readVec(&vecs); },
return @intFromEnum(limit) - vec_start_remaining + n; };
} var n = r.vtable.stream(r, &wrapped.writer, .limited(remaining)) catch |err| switch (err) {
if (remaining == 0 or data.len - data_i == 0) { error.WriteFailed => {
vecs[vecs_i] = r.buffer; if (wrapped.writer.buffer.ptr == first.ptr) {
vecs_i += 1; remaining -= wrapped.writer.end;
const n = try r.unbuffered_reader.readVec(vecs[0..vecs_i]);
const cutoff = vec_start_remaining - remaining;
if (n > cutoff) {
r.end = n - cutoff;
return @intFromEnum(limit) - remaining;
} else { } else {
return @intFromEnum(limit) - vec_start_remaining + n; r.end = wrapped.writer.end;
} }
} break;
if (data[data_i].len == 0) { },
data_i += 1; else => |e| return e,
continue; };
} assert(n == wrapped.writer.end);
const data_elem = data[data_i]; if (wrapped.writer.buffer.ptr != first.ptr) {
vecs[vecs_i] = data_elem[0..@min(data_elem.len, remaining)]; r.end = n;
remaining -= vecs[vecs_i].len; break;
vecs_i += 1;
data_i += 1;
} }
if (n < first.len) {
remaining -= n;
break;
}
remaining -= first.len;
n -= first.len;
for (wrapped.middle) |middle| {
if (n < middle.len) {
remaining -= n;
break;
}
remaining -= middle.len;
n -= middle.len;
}
r.end = n;
break;
} }
return @intFromEnum(limit) - remaining; return @intFromEnum(limit) - remaining;
} }
@ -366,10 +389,10 @@ pub fn readVecAll(r: *Reader, data: [][]u8) Error!void {
} }
/// "Pump" data from the reader to the writer. /// "Pump" data from the reader to the writer.
pub fn readAll(r: *Reader, bw: *Writer, limit: Limit) StreamError!void { pub fn readAll(r: *Reader, w: *Writer, limit: Limit) StreamError!void {
var remaining = limit; var remaining = limit;
while (remaining.nonzero()) { while (remaining.nonzero()) {
const n = try r.read(bw, remaining); const n = try r.read(w, remaining);
remaining = remaining.subtract(n).?; remaining = remaining.subtract(n).?;
} }
} }
@ -774,12 +797,12 @@ pub fn peekDelimiterExclusive(r: *Reader, delimiter: u8) DelimiterError![]u8 {
return result[0 .. result.len - 1]; return result[0 .. result.len - 1];
} }
/// Appends to `bw` contents by reading from the stream until `delimiter` is /// Appends to `w` contents by reading from the stream until `delimiter` is
/// found. Does not write the delimiter itself. /// found. Does not write the delimiter itself.
/// ///
/// Returns number of bytes streamed. /// Returns number of bytes streamed.
pub fn readDelimiter(r: *Reader, bw: *Writer, delimiter: u8) StreamError!usize { pub fn readDelimiter(r: *Reader, w: *Writer, delimiter: u8) StreamError!usize {
const amount, const to = try r.readAny(bw, delimiter, .unlimited); const amount, const to = try r.readAny(w, delimiter, .unlimited);
return switch (to) { return switch (to) {
.delimiter => amount, .delimiter => amount,
.limit => unreachable, .limit => unreachable,
@ -787,7 +810,7 @@ pub fn readDelimiter(r: *Reader, bw: *Writer, delimiter: u8) StreamError!usize {
}; };
} }
/// Appends to `bw` contents by reading from the stream until `delimiter` is found. /// Appends to `w` contents by reading from the stream until `delimiter` is found.
/// Does not write the delimiter itself. /// Does not write the delimiter itself.
/// ///
/// Succeeds if stream ends before delimiter found. /// Succeeds if stream ends before delimiter found.
@ -795,10 +818,10 @@ pub fn readDelimiter(r: *Reader, bw: *Writer, delimiter: u8) StreamError!usize {
/// Returns number of bytes streamed. The end is not signaled to the writer. /// Returns number of bytes streamed. The end is not signaled to the writer.
pub fn readDelimiterEnding( pub fn readDelimiterEnding(
r: *Reader, r: *Reader,
bw: *Writer, w: *Writer,
delimiter: u8, delimiter: u8,
) StreamRemainingError!usize { ) StreamRemainingError!usize {
const amount, const to = try r.readAny(bw, delimiter, .unlimited); const amount, const to = try r.readAny(w, delimiter, .unlimited);
return switch (to) { return switch (to) {
.delimiter, .end => amount, .delimiter, .end => amount,
.limit => unreachable, .limit => unreachable,
@ -812,17 +835,17 @@ pub const StreamDelimiterLimitedError = StreamRemainingError || error{
StreamTooLong, StreamTooLong,
}; };
/// Appends to `bw` contents by reading from the stream until `delimiter` is found. /// Appends to `w` contents by reading from the stream until `delimiter` is found.
/// Does not write the delimiter itself. /// Does not write the delimiter itself.
/// ///
/// Returns number of bytes streamed. /// Returns number of bytes streamed.
pub fn readDelimiterLimit( pub fn readDelimiterLimit(
r: *Reader, r: *Reader,
bw: *Writer, w: *Writer,
delimiter: u8, delimiter: u8,
limit: Limit, limit: Limit,
) StreamDelimiterLimitedError!usize { ) StreamDelimiterLimitedError!usize {
const amount, const to = try r.readAny(bw, delimiter, limit); const amount, const to = try r.readAny(w, delimiter, limit);
return switch (to) { return switch (to) {
.delimiter => amount, .delimiter => amount,
.limit => error.StreamTooLong, .limit => error.StreamTooLong,
@ -832,7 +855,7 @@ pub fn readDelimiterLimit(
fn readAny( fn readAny(
r: *Reader, r: *Reader,
bw: *Writer, w: *Writer,
delimiter: ?u8, delimiter: ?u8,
limit: Limit, limit: Limit,
) StreamRemainingError!struct { usize, enum { delimiter, limit, end } } { ) StreamRemainingError!struct { usize, enum { delimiter, limit, end } } {
@ -844,11 +867,11 @@ fn readAny(
error.EndOfStream => return .{ amount, .end }, error.EndOfStream => return .{ amount, .end },
}); });
if (delimiter) |d| if (std.mem.indexOfScalar(u8, available, d)) |delimiter_index| { if (delimiter) |d| if (std.mem.indexOfScalar(u8, available, d)) |delimiter_index| {
try bw.writeAll(available[0..delimiter_index]); try w.writeAll(available[0..delimiter_index]);
r.toss(delimiter_index + 1); r.toss(delimiter_index + 1);
return .{ amount + delimiter_index, .delimiter }; return .{ amount + delimiter_index, .delimiter };
}; };
try bw.writeAll(available); try w.writeAll(available);
r.toss(available.len); r.toss(available.len);
amount += available.len; amount += available.len;
remaining = remaining.subtract(available.len).?; remaining = remaining.subtract(available.len).?;
@ -1303,7 +1326,7 @@ test "expected error.EndOfStream" {
try std.testing.expectError(error.EndOfStream, r.isBytes("foo")); try std.testing.expectError(error.EndOfStream, r.isBytes("foo"));
} }
fn endingRead(r: *Reader, w: *Writer, limit: Limit) StreamError!usize { fn endingStream(r: *Reader, w: *Writer, limit: Limit) StreamError!usize {
_ = r; _ = r;
_ = w; _ = w;
_ = limit; _ = limit;
@ -1316,7 +1339,7 @@ fn endingDiscard(r: *Reader, limit: Limit) Error!usize {
return error.EndOfStream; return error.EndOfStream;
} }
fn failingRead(r: *Reader, w: *Writer, limit: Limit) StreamError!usize { fn failingStream(r: *Reader, w: *Writer, limit: Limit) StreamError!usize {
_ = r; _ = r;
_ = w; _ = w;
_ = limit; _ = limit;
@ -1409,8 +1432,8 @@ pub fn Hashed(comptime Hasher: type) type {
fn discard(r: *Reader, limit: Limit) Error!usize { fn discard(r: *Reader, limit: Limit) Error!usize {
const this: *@This() = @alignCast(@fieldParentPtr("interface", r)); const this: *@This() = @alignCast(@fieldParentPtr("interface", r));
var bw = this.hasher.writable(&.{}); var w = this.hasher.writable(&.{});
const n = this.in.read(&bw, limit) catch |err| switch (err) { const n = this.in.read(&w, limit) catch |err| switch (err) {
error.WriteFailed => unreachable, error.WriteFailed => unreachable,
else => |e| return e, else => |e| return e,
}; };

View File

@ -9,7 +9,12 @@ const File = std.fs.File;
const testing = std.testing; const testing = std.testing;
const Allocator = std.mem.Allocator; const Allocator = std.mem.Allocator;
context: ?*anyopaque, /// There are two strategies for obtaining context; one can use this field, or
/// embed the `Writer` and use `@fieldParentPtr`. This field must be either set
/// to a valid pointer or left as `null` because the interface will sometimes
/// check if this pointer value is a known special value, for example to make
/// `writableVector` work.
context: ?*anyopaque = null,
vtable: *const VTable, vtable: *const VTable,
/// If this has length zero, the writer is unbuffered, and `flush` is a no-op. /// If this has length zero, the writer is unbuffered, and `flush` is a no-op.
buffer: []u8, buffer: []u8,
@ -30,11 +35,16 @@ pub const VTable = struct {
/// order. Elements of `data` may alias each other but may not alias /// order. Elements of `data` may alias each other but may not alias
/// `buffer`. /// `buffer`.
/// ///
/// This function modifies `Writer` state. /// This function modifies `Writer.end` and `Writer.buffer`.
/// ///
/// `data.len` must be greater than zero, and the last element of `data` is /// If `data.len` is zero, it indicates this is a "flush" operation; all
/// special. It is repeated as necessary so that it is written `splat` /// remaining buffered data must be logically consumed. Generally, this
/// number of times, which may be zero. /// means that `end` will be set to zero before returning, however, it is
/// legal for implementations to manage that data differently. There may be
/// subsequent calls to `drain` and `sendFile` after a flush operation.
///
/// The last element of `data` is special. It is repeated as necessary so
/// that it is written `splat` number of times, which may be zero.
/// ///
/// Number of bytes actually written is returned, excluding bytes from /// Number of bytes actually written is returned, excluding bytes from
/// `buffer`. Bytes from `buffer` are tracked by modifying `end`. /// `buffer`. Bytes from `buffer` are tracked by modifying `end`.
@ -94,7 +104,6 @@ pub const FileError = error{
/// modified externally, `count` will always equal `end`. /// modified externally, `count` will always equal `end`.
pub fn fixed(buffer: []u8) Writer { pub fn fixed(buffer: []u8) Writer {
return .{ return .{
.context = undefined,
.vtable = &.{ .drain = fixedDrain }, .vtable = &.{ .drain = fixedDrain },
.buffer = buffer, .buffer = buffer,
}; };
@ -105,7 +114,6 @@ pub fn hashed(w: *Writer, hasher: anytype) Hashed(@TypeOf(hasher)) {
} }
pub const failing: Writer = .{ pub const failing: Writer = .{
.context = undefined,
.vtable = &.{ .vtable = &.{
.drain = failingDrain, .drain = failingDrain,
.sendFile = failingSendFile, .sendFile = failingSendFile,
@ -114,7 +122,6 @@ pub const failing: Writer = .{
pub fn discarding(buffer: []u8) Writer { pub fn discarding(buffer: []u8) Writer {
return .{ return .{
.context = undefined,
.vtable = &.{ .vtable = &.{
.drain = discardingDrain, .drain = discardingDrain,
.sendFile = discardingSendFile, .sendFile = discardingSendFile,
@ -199,12 +206,11 @@ pub fn writeSplatLimit(
} }
/// Drains all remaining buffered data. /// Drains all remaining buffered data.
///
/// It is legal for `VTable.drain` implementations to refrain from modifying
/// `end`.
pub fn flush(w: *Writer) Error!void { pub fn flush(w: *Writer) Error!void {
const drainFn = w.vtable.drain; assert(0 == try w.vtable.drain(w, &.{}, 0));
// This implementation allows for drain functions that do not modify `end`,
// such as `fixedDrain`.
var remaining = w.end;
while (remaining != 0) remaining -= try drainFn(w, &.{""}, 1);
} }
pub fn unusedCapacitySlice(w: *const Writer) []u8 { pub fn unusedCapacitySlice(w: *const Writer) []u8 {
@ -321,14 +327,13 @@ pub fn writeSplatAll(w: *Writer, data: [][]const u8, splat: usize) Error!void {
pub fn write(w: *Writer, bytes: []const u8) Error!usize { pub fn write(w: *Writer, bytes: []const u8) Error!usize {
if (w.end + bytes.len <= w.buffer.len) { if (w.end + bytes.len <= w.buffer.len) {
@branchHint(.likely);
@memcpy(w.buffer[w.end..][0..bytes.len], bytes); @memcpy(w.buffer[w.end..][0..bytes.len], bytes);
w.end += bytes.len; w.end += bytes.len;
w.count += bytes.len; w.count += bytes.len;
return bytes.len; return bytes.len;
} }
const end = w.end; return w.vtable.drain(w, &.{bytes}, 1);
const n = try w.vtable.drain(w, &.{bytes}, 1);
return n -| end;
} }
/// Calls `write` as many times as necessary such that all of `bytes` are /// Calls `write` as many times as necessary such that all of `bytes` are
@ -1675,6 +1680,7 @@ pub fn unimplementedSendFile(w: *Writer, file_reader: *File.Reader, limit: Limit
/// available buffer has been filled. Also, it may be called from `flush` in /// available buffer has been filled. Also, it may be called from `flush` in
/// which case it should return successfully. /// which case it should return successfully.
fn fixedDrain(w: *Writer, data: []const []const u8, splat: usize) Error!usize { fn fixedDrain(w: *Writer, data: []const []const u8, splat: usize) Error!usize {
if (data.len == 0) return 0;
for (data[0 .. data.len - 1]) |bytes| { for (data[0 .. data.len - 1]) |bytes| {
const dest = w.buffer[w.end..]; const dest = w.buffer[w.end..];
const len = @min(bytes.len, dest.len); const len = @min(bytes.len, dest.len);
@ -1726,7 +1732,6 @@ pub fn Hashed(comptime Hasher: type) type {
.out = out, .out = out,
.hasher = .{}, .hasher = .{},
.interface = .{ .interface = .{
.context = undefined,
.vtable = &.{@This().drain}, .vtable = &.{@This().drain},
}, },
}; };
@ -1734,6 +1739,13 @@ pub fn Hashed(comptime Hasher: type) type {
fn drain(w: *Writer, data: []const []const u8, splat: usize) Error!usize { fn drain(w: *Writer, data: []const []const u8, splat: usize) Error!usize {
const this: *@This() = @alignCast(@fieldParentPtr("interface", w)); const this: *@This() = @alignCast(@fieldParentPtr("interface", w));
if (data.len == 0) {
const buf = w.buffered();
try this.out.writeAll(buf);
this.hasher.update(buf);
w.end = 0;
return buf.len;
}
const aux_n = try this.out.writeSplatAux(w.buffered(), data, splat); const aux_n = try this.out.writeSplatAux(w.buffered(), data, splat);
if (aux_n < w.end) { if (aux_n < w.end) {
this.hasher.update(w.buffer[0..aux_n]); this.hasher.update(w.buffer[0..aux_n]);
@ -1839,7 +1851,6 @@ pub const Allocating = struct {
const init_interface: Writer = .{ const init_interface: Writer = .{
.interface = .{ .interface = .{
.context = undefined,
.vtable = &.{ .vtable = &.{
.drain = Allocating.drain, .drain = Allocating.drain,
.sendFile = Allocating.sendFile, .sendFile = Allocating.sendFile,

View File

@ -1912,10 +1912,7 @@ pub const Stream = struct {
return .{ return .{
.interface_state = .{ .interface_state = .{
.context = undefined, .context = undefined,
.vtable = &.{ .vtable = &.{ .stream = stream },
.stream = stream,
.discard = discard,
},
.buffer = buffer, .buffer = buffer,
}, },
.net_stream = net_stream, .net_stream = net_stream,
@ -1924,19 +1921,9 @@ pub const Stream = struct {
fn stream(io_r: *io.Reader, io_w: *io.Writer, limit: io.Limit) io.Reader.StreamError!usize { fn stream(io_r: *io.Reader, io_w: *io.Writer, limit: io.Limit) io.Reader.StreamError!usize {
const r: *Reader = @fieldParentPtr("interface", io_r); const r: *Reader = @fieldParentPtr("interface", io_r);
const data = io_w.writableVector(limit);
var iovecs: [max_buffers_len]windows.WSABUF = undefined; var iovecs: [max_buffers_len]windows.WSABUF = undefined;
var iovecs_i: usize = 0; const bufs = io_w.writableVectorWsa(&iovecs, limit);
for (data) |d| { assert(bufs[0].len > 0);
// In case Windows checks pointer address before length, we must omit
// length-zero vectors.
if (d.len == 0) continue;
iovecs[iovecs_i] = .{ .buf = d.ptr, .len = d.len };
iovecs_i += 1;
if (iovecs_i >= iovecs.len) break;
}
const bufs = iovecs[0..iovecs_i];
if (bufs.len == 0) return .{}; // Prevent false positive end detection on empty `data`.
var n: u32 = undefined; var n: u32 = undefined;
var flags: u32 = 0; var flags: u32 = 0;
const rc = windows.ws2_32.WSARecvFrom(r.net_stream.handle, bufs.ptr, bufs.len, &n, &flags, null, null, null, null); const rc = windows.ws2_32.WSARecvFrom(r.net_stream.handle, bufs.ptr, bufs.len, &n, &flags, null, null, null, null);
@ -1958,13 +1945,6 @@ pub const Stream = struct {
if (n == 0) return error.EndOfStream; if (n == 0) return error.EndOfStream;
return io_w.advanceVector(n); return io_w.advanceVector(n);
} }
fn discard(io_r: *io.Reader, limit: io.Limit) io.Reader.Error!usize {
const r: *Reader = @fieldParentPtr("interface", io_r);
_ = r;
_ = limit;
@panic("TODO");
}
}, },
else => struct { else => struct {
file_reader: File.Reader, file_reader: File.Reader,

View File

@ -8,6 +8,7 @@ const std = @import("std");
const File = std.fs.File; const File = std.fs.File;
const is_le = builtin.target.cpu.arch.endian() == .little; const is_le = builtin.target.cpu.arch.endian() == .little;
const Writer = std.io.Writer; const Writer = std.io.Writer;
const Reader = std.io.Reader;
pub const CompressionMethod = enum(u16) { pub const CompressionMethod = enum(u16) {
store = 0, store = 0,
@ -160,61 +161,47 @@ pub const EndRecord = extern struct {
} }
}; };
pub const Decompress = union { pub const Decompress = struct {
inflate: std.compress.flate.Decompress, interface: Reader,
store: *std.io.Reader, state: union {
inflate: std.compress.flate.Decompress,
store: *Reader,
},
fn readable( pub fn init(reader: *Reader, method: CompressionMethod, buffer: []u8) Reader {
d: *Decompress, return switch (method) {
reader: *std.io.Reader, .store => .{
method: CompressionMethod, .state = .{ .store = reader },
buffer: []u8, .interface = .{
) std.io.Reader { .context = undefined,
switch (method) { .vtable = &.{ .stream = streamStore },
.store => {
d.* = .{ .store = reader };
return .{
.unbuffered_reader = .{
.context = d,
.vtable = &.{ .read = readStore },
},
.buffer = buffer, .buffer = buffer,
.end = 0, .end = 0,
.seek = 0, .seek = 0,
}; },
}, },
.deflate => { .deflate => .{
d.* = .{ .inflate = .init(reader, .raw) }; .state = .{ .inflate = .init(reader, .raw) },
return .{ .interface = .{
.unbuffered_reader = .{ .context = undefined,
.context = d, .vtable = &.{ .stream = streamDeflate },
.vtable = &.{ .read = readDeflate },
},
.buffer = buffer, .buffer = buffer,
.end = 0, .end = 0,
.seek = 0, .seek = 0,
}; },
}, },
else => unreachable, else => unreachable,
} };
} }
fn readStore( fn streamStore(r: *Reader, w: *Writer, limit: std.io.Limit) Reader.StreamError!usize {
context: ?*anyopaque, const d: *Decompress = @fieldParentPtr("interface", r);
writer: *Writer, return d.store.read(w, limit);
limit: std.io.Limit,
) std.io.Reader.StreamError!usize {
const d: *Decompress = @ptrCast(@alignCast(context));
return d.store.read(writer, limit);
} }
fn readDeflate( fn streamDeflate(r: *Reader, w: *Writer, limit: std.io.Limit) Reader.StreamError!usize {
context: ?*anyopaque, const d: *Decompress = @fieldParentPtr("interface", r);
writer: *Writer, return std.compress.flate.Decompress.read(&d.inflate, w, limit);
limit: std.io.Limit,
) std.io.Reader.StreamError!usize {
const d: *Decompress = @ptrCast(@alignCast(context));
return std.compress.flate.Decompress.read(&d.inflate, writer, limit);
} }
}; };