std: reader/writer fixes

This commit is contained in:
Andrew Kelley 2025-04-21 18:48:00 -07:00
parent 19b82ca7ab
commit 4dc3a9444c
6 changed files with 250 additions and 81 deletions

View File

@ -931,7 +931,8 @@ pub const WriteFileError = PReadError || WriteError;
pub fn writeFileAll(self: File, in_file: File, options: BufferedWriter.WriteFileOptions) WriteFileError!void {
var file_writer = self.writer();
var bw = file_writer.interface().buffered(&.{});
var buffer: [2000]u8 = undefined;
var bw = file_writer.interface().buffered(&buffer);
bw.writeFileAll(in_file, options) catch |err| switch (err) {
error.WriteFailed => if (file_writer.err) |_| unreachable else |e| return e,
else => |e| return e,
@ -940,14 +941,27 @@ pub fn writeFileAll(self: File, in_file: File, options: BufferedWriter.WriteFile
pub const Reader = struct {
file: File,
err: ReadError!void = {},
err: ?ReadError = null,
mode: Reader.Mode = .positional,
pos: u64 = 0,
size: ?u64 = null,
size_err: GetEndPosError!void = {},
seek_err: SeekError!void = {},
size_err: ?GetEndPosError = null,
seek_err: ?SeekError = null,
pub const Mode = enum { streaming, positional };
/// How the reader pulls bytes from the file. The `*_reading` variants are
/// selected after a `writeFile` attempt reports `error.Unimplemented`.
pub const Mode = enum {
    streaming,
    positional,
    streaming_reading,
    positional_reading,

    /// Maps a positional mode to its streaming counterpart.
    ///
    /// Asserts that `m` is one of the positional modes.
    pub fn toStreaming(m: @This()) @This() {
        switch (m) {
            .positional => return .streaming,
            .positional_reading => return .streaming_reading,
            .streaming, .streaming_reading => unreachable,
        }
    }
};
pub fn interface(r: *Reader) std.io.Reader {
return .{
@ -975,7 +989,7 @@ pub const Reader = struct {
switch (r.mode) {
.positional => {
const size = r.size orelse {
if (r.file.getEndPos()) |size| {
if (file.getEndPos()) |size| {
r.size = size;
} else |err| {
r.size_err = err;
@ -991,6 +1005,10 @@ pub const Reader = struct {
assert(pos == 0);
return 0;
},
error.Unimplemented => {
r.mode = .positional_reading;
return 0;
},
else => |e| {
r.err = e;
return error.ReadFailed;
@ -1003,6 +1021,10 @@ pub const Reader = struct {
const n = bw.writeFile(file, .none, limit, &.{}, 0) catch |err| switch (err) {
error.WriteFailed => return error.WriteFailed,
error.Unseekable => unreachable, // Passing `Offset.none`.
error.Unimplemented => {
r.mode = .streaming_reading;
return 0;
},
else => |e| {
r.err = e;
return error.ReadFailed;
@ -1011,6 +1033,35 @@ pub const Reader = struct {
r.pos = pos + n;
return n;
},
.positional_reading => {
const dest = limit.slice(try bw.writableSliceGreedy(1));
const n = file.pread(dest, pos) catch |err| switch (err) {
error.Unseekable => {
r.mode = .streaming_reading;
assert(pos == 0);
return 0;
},
else => |e| {
r.err = e;
return error.ReadFailed;
},
};
if (n == 0) return error.EndOfStream;
r.pos = pos + n;
bw.advance(n);
return n;
},
.streaming_reading => {
const dest = limit.slice(try bw.writableSliceGreedy(1));
const n = file.read(dest) catch |err| {
r.err = err;
return error.ReadFailed;
};
if (n == 0) return error.EndOfStream;
r.pos = pos + n;
bw.advance(n);
return n;
},
}
}
@ -1020,7 +1071,7 @@ pub const Reader = struct {
const pos = r.pos;
switch (r.mode) {
.positional => {
.positional, .positional_reading => {
if (is_windows) {
// Unfortunately, `ReadFileScatter` cannot be used since it requires
// page alignment, so we are stuck using only the first slice.
@ -1053,7 +1104,7 @@ pub const Reader = struct {
if (send_vecs.len == 0) return 0; // Prevent false positive end detection on empty `data`.
const n = posix.preadv(handle, send_vecs, pos) catch |err| switch (err) {
error.Unseekable => {
r.mode = .streaming;
r.mode = r.mode.toStreaming();
assert(pos == 0);
return 0;
},
@ -1066,7 +1117,7 @@ pub const Reader = struct {
r.pos = pos + n;
return n;
},
.streaming => {
.streaming, .streaming_reading => {
if (is_windows) {
// Unfortunately, `ReadFileScatter` cannot be used since it requires
// page alignment, so we are stuck using only the first slice.
@ -1113,13 +1164,13 @@ pub const Reader = struct {
const file = r.file;
const pos = r.pos;
switch (r.mode) {
.positional => {
.positional, .positional_reading => {
const size = r.size orelse {
if (file.getEndPos()) |size| {
r.size = size;
} else |err| {
r.size_err = err;
r.mode = .streaming;
r.mode = r.mode.toStreaming();
}
return 0;
};
@ -1127,17 +1178,13 @@ pub const Reader = struct {
r.pos = pos + delta;
return delta;
},
.streaming => {
.streaming, .streaming_reading => {
// Unfortunately we can't seek forward without knowing the
// size because the seek syscalls provided to us will not
// return the true end position if a seek would exceed the
// end.
fallback: {
if (r.size_err) |_| {
if (r.seek_err) |_| {
break :fallback;
} else |_| {}
} else |_| {}
if (r.size_err == null and r.seek_err == null) break :fallback;
var trash_buffer: [std.atomic.cache_line]u8 = undefined;
const trash = &trash_buffer;
if (is_windows) {
@ -1188,9 +1235,16 @@ pub const Writer = struct {
err: WriteError!void = {},
mode: Writer.Mode = .positional,
pos: u64 = 0,
sendfile_err: ?SendfileError = null,
read_err: ?ReadError = null,
pub const Mode = Reader.Mode;
pub const SendfileError = error{
UnsupportedOperation,
Unexpected,
};
/// Number of slices to store on the stack, when trying to send as many byte
/// vectors through the underlying write calls as possible.
const max_buffers_len = 16;
@ -1269,75 +1323,38 @@ pub const Writer = struct {
const w: *Writer = @ptrCast(@alignCast(context));
const out_fd = w.file.handle;
const in_fd = in_file.handle;
const len_int = switch (in_limit) {
.nothing => return writeSplat(context, headers_and_trailers, 1),
.unlimited => 0,
else => in_limit.toInt().?,
};
// TODO try using copy_file_range on linux
// TODO try using copy_file_range on freebsd
if (native_os == .linux) sf: {
// TODO try using copy_file_range on Linux
// TODO try using copy_file_range on FreeBSD
// TODO try using sendfile on macOS
// TODO try using sendfile on FreeBSD
if (native_os == .linux and w.mode == .streaming) sf: {
// Try using sendfile on Linux.
if (w.sendfile_err != null) break :sf;
// Linux sendfile does not support headers or trailers but it does
// support a streaming read from in_file.
if (headers_len > 0) return writeSplat(context, headers_and_trailers[0..headers_len], 1);
const max_count = 0x7ffff000; // Avoid EINVAL.
const smaller_len = if (len_int == 0) max_count else @min(len_int, max_count);
const smaller_len = in_limit.minInt(max_count);
var off: std.os.linux.off_t = undefined;
const off_ptr: ?*std.os.linux.off_t = if (in_offset.toInt()) |offset| b: {
off = std.math.cast(std.os.linux.off_t, offset) orelse
return writeSplat(context, headers_and_trailers, 1);
break :b &off;
} else null;
if (true) @panic("TODO");
const n = std.os.linux.wrapped.sendfile(out_fd, in_fd, off_ptr, smaller_len) catch |err| switch (err) {
error.UnsupportedOperation => break :sf,
error.Unseekable => break :sf,
error.Unexpected => break :sf,
// Errors that imply sendfile should be avoided on the next write.
error.UnsupportedOperation,
error.Unexpected,
=> |e| {
w.sendfile_err = e;
break :sf;
},
else => |e| return e,
};
if (in_offset.toInt()) |offset| {
assert(n == off - offset);
} else if (n == 0 and len_int == 0) {
// The caller wouldn't be able to tell that the file transfer is
// done and would incorrectly repeat the same call.
return writeSplat(context, headers_and_trailers, 1);
}
w.pos += n;
return n;
}
var iovecs_buffer: [max_buffers_len]std.posix.iovec_const = undefined;
const iovecs = iovecs_buffer[0..@min(iovecs_buffer.len, headers_and_trailers.len)];
for (iovecs, headers_and_trailers[0..iovecs.len]) |*v, d| v.* = .{ .base = d.ptr, .len = d.len };
const headers = iovecs[0..@min(headers_len, iovecs.len)];
const trailers = iovecs[headers.len..];
const flags = 0;
return posix.sendfile(out_fd, in_fd, in_offset, len_int, headers, trailers, flags) catch |err| switch (err) {
error.Unseekable,
error.FastOpenAlreadyInProgress,
error.MessageTooBig,
error.FileDescriptorNotASocket,
error.NetworkUnreachable,
error.NetworkSubsystemFailed,
=> return writeFileUnseekable(out_fd, in_fd, in_offset, in_limit, headers_and_trailers, headers_len),
else => |e| return e,
};
}
fn writeFileUnseekable(
out_fd: Handle,
in_fd: Handle,
in_offset: u64,
in_limit: std.io.Writer.Limit,
headers_and_trailers: []const []const u8,
headers_len: usize,
) std.io.Writer.FileError!usize {
_ = out_fd;
_ = in_fd;
_ = in_offset;
_ = in_limit;
_ = headers_and_trailers;
_ = headers_len;
@panic("TODO writeFileUnseekable");
return error.Unimplemented;
}
};

View File

@ -99,7 +99,7 @@ pub fn readVecLimit(br: *BufferedReader, data: []const []u8, limit: Reader.Limit
fn passthruRead(context: ?*anyopaque, bw: *BufferedWriter, limit: Reader.Limit) Reader.RwError!usize {
const br: *BufferedReader = @alignCast(@ptrCast(context));
const buffer = limit.slice(br.buffer[br.end..br.seek]);
const buffer = limit.slice(br.buffer[br.seek..br.end]);
if (buffer.len > 0) {
const n = try bw.write(buffer);
br.seek += n;

View File

@ -480,6 +480,14 @@ pub fn writeSliceSwap(bw: *BufferedWriter, Elem: type, slice: []const Elem) Writ
/// Unlike `writeSplat` and `writeVec`, this function will call into the
/// underlying writer even if there is enough buffer capacity for the file
/// contents.
///
/// Although it would be possible to eliminate `error.Unimplemented` from
/// the error set by reading directly into the buffer when it occurs,
/// this is not done because it is more efficient to handle the fallback in
/// `writeFileAll`, so that the error does not recur with each write.
///
/// See `writeFileReading` for an alternative that does not have
/// `error.Unimplemented` in the error set.
pub fn writeFile(
bw: *BufferedWriter,
file: std.fs.File,
@ -491,6 +499,23 @@ pub fn writeFile(
return passthruWriteFile(bw, file, offset, limit, headers_and_trailers, headers_len);
}
pub const WriteFileReadingError = std.fs.File.PReadError || Writer.Error;
/// Performs a single read from `file` directly into the writer's buffer and
/// returns the number of bytes obtained.
///
/// When `offset` carries a position the read is a `pread` at that position;
/// otherwise it is a plain `read`. At most `limit` bytes are requested.
///
/// A return value of zero means end of stream.
///
/// Asserts nonzero buffer capacity.
pub fn writeFileReading(
    bw: *BufferedWriter,
    file: std.fs.File,
    offset: Writer.Offset,
    limit: Writer.Limit,
) WriteFileReadingError!usize {
    const writable = try bw.writableSliceGreedy(1);
    const dest = limit.slice(writable);
    const n = if (offset.toInt()) |pos|
        try file.pread(dest, pos)
    else
        try file.read(dest);
    // Commit the bytes that were just read into the buffer.
    bw.advance(n);
    return n;
}
fn passthruWriteFile(
context: ?*anyopaque,
file: std.fs.File,
@ -588,7 +613,7 @@ pub const WriteFileOptions = struct {
headers_len: usize = 0,
};
pub fn writeFileAll(bw: *BufferedWriter, file: std.fs.File, options: WriteFileOptions) Writer.FileError!void {
pub fn writeFileAll(bw: *BufferedWriter, file: std.fs.File, options: WriteFileOptions) WriteFileReadingError!void {
const headers_and_trailers = options.headers_and_trailers;
const headers = headers_and_trailers[0..options.headers_len];
switch (options.limit) {
@ -601,7 +626,15 @@ pub fn writeFileAll(bw: *BufferedWriter, file: std.fs.File, options: WriteFileOp
var i: usize = 0;
var offset = options.offset;
while (true) {
var n = try bw.writeFile(file, offset, .unlimited, headers[i..], headers.len - i);
var n = bw.writeFile(file, offset, .unlimited, headers[i..], headers.len - i) catch |err| switch (err) {
error.Unimplemented => {
try bw.writeVecAll(headers[i..]);
try bw.writeFileReadingAll(file, offset, .unlimited);
try bw.writeVecAll(headers_and_trailers[headers.len..]);
return;
},
else => |e| return e,
};
while (i < headers.len and n >= headers[i].len) {
n -= headers[i].len;
i += 1;
@ -619,7 +652,15 @@ pub fn writeFileAll(bw: *BufferedWriter, file: std.fs.File, options: WriteFileOp
var i: usize = 0;
var offset = options.offset;
while (true) {
var n = try bw.writeFile(file, offset, .limited(len), headers_and_trailers[i..], headers.len - i);
var n = bw.writeFile(file, offset, .limited(len), headers_and_trailers[i..], headers.len - i) catch |err| switch (err) {
error.Unimplemented => {
try bw.writeVecAll(headers[i..]);
try bw.writeFileReadingAll(file, offset, .limited(len));
try bw.writeVecAll(headers_and_trailers[headers.len..]);
return;
},
else => |e| return e,
};
while (i < headers.len and n >= headers[i].len) {
n -= headers[i].len;
i += 1;
@ -646,6 +687,40 @@ pub fn writeFileAll(bw: *BufferedWriter, file: std.fs.File, options: WriteFileOp
}
}
/// Equivalent to `writeFileAll` but uses direct `pread` and `read` calls on
/// `file` rather than `Writer.writeFile`. This is generally used as a fallback
/// when the underlying implementation returns `error.Unimplemented`, which is
/// why that error code does not appear in this function's error set.
///
/// When `offset` carries a position, the file is read with `pread` starting
/// at that position; otherwise it is read with `read`. Exactly one of the two
/// strategies runs. Copying stops when `limit` is exhausted or when a read
/// returns zero bytes.
///
/// Asserts nonzero buffer capacity.
pub fn writeFileReadingAll(
    bw: *BufferedWriter,
    file: std.fs.File,
    offset: Writer.Offset,
    limit: Writer.Limit,
) WriteFileReadingError!void {
    var remaining = limit;
    if (offset.toInt()) |start_pos| {
        var pos = start_pos;
        while (remaining.nonzero()) {
            const dest = remaining.slice(try bw.writableSliceGreedy(1));
            const n = try file.pread(dest, pos);
            if (n == 0) return;
            bw.advance(n);
            pos += n;
            remaining = remaining.subtract(n).?;
        }
        // The positional copy is complete. Without this return, control would
        // fall through to the streaming loop and copy up to `limit` more
        // bytes from the file's seek position.
        return;
    }
    while (remaining.nonzero()) {
        const dest = remaining.slice(try bw.writableSliceGreedy(1));
        const n = try file.read(dest);
        if (n == 0) return;
        bw.advance(n);
        remaining = remaining.subtract(n).?;
    }
}
pub fn alignBuffer(
bw: *BufferedWriter,
buffer: []const u8,

View File

@ -30,12 +30,20 @@ pub const VTable = struct {
/// Number of bytes returned may be zero, which does not mean
/// end-of-stream. A subsequent call may return nonzero, or may signal end
/// of stream via `error.WriteFailed`.
///
/// If `error.Unimplemented` is returned, the caller should do its own
/// reads from the file. The callee indicates it cannot offer a more
/// efficient implementation.
writeFile: *const fn (
ctx: ?*anyopaque,
file: std.fs.File,
/// If this is `none`, `file` will be streamed, affecting the seek
/// position. Otherwise, it will be read positionally without affecting
/// the seek position.
/// If this is `Offset.none`, `file` will be streamed, affecting the
/// seek position. Otherwise, it will be read positionally without
/// affecting the seek position. `error.Unseekable` is only possible
/// when reading positionally.
///
/// An offset past the end of the file is treated the same as an offset
/// equal to the end of the file.
offset: Offset,
/// Maximum amount of bytes to read from the file. Implementations may
/// assume that the file size does not exceed this amount.
@ -52,7 +60,13 @@ pub const Error = error{
WriteFailed,
};
pub const FileError = Error || std.fs.File.PReadError;
/// Union of `std.fs.File.PReadError` with the two codes declared below.
pub const FileError = std.fs.File.PReadError || error{
/// See the `Writer` implementation for detailed diagnostics.
WriteFailed,
/// Indicates the caller should do its own file reading; the callee cannot
/// offer a more efficient implementation.
Unimplemented,
};
pub const Limit = std.io.Reader.Limit;

View File

@ -9420,4 +9420,67 @@ pub const msghdr_const = extern struct {
control: ?*const anyopaque,
controllen: usize,
flags: u32,
};
};
/// Syscall wrappers that report failures through Zig error sets. Calls are
/// routed through libc when libc is linked, and some footguns are eliminated.
pub const wrapped = struct {
    /// When true, `sendfile` calls the `sendfile64` symbol.
    pub const lfs64_abi = builtin.link_libc and (builtin.abi.isGnu() or builtin.abi.isAndroid());

    const system = if (builtin.link_libc) std.c else std.os.linux;
    const unexpectedErrno = std.posix.unexpectedErrno;

    pub const SendfileError = std.posix.UnexpectedError || error{
        /// `out_fd` is an unconnected socket, or `out_fd` closed its read end.
        BrokenPipe,
        /// Descriptor is not valid or locked, or an mmap(2)-like operation
        /// is not available for `in_fd`.
        UnsupportedOperation,
        /// Nonblocking I/O has been selected but the write would block.
        WouldBlock,
        /// Unspecified error while reading from `in_fd`.
        InputOutput,
        /// Insufficient kernel memory to read from `in_fd`.
        SystemResources,
        /// `offset` is not `null` but the input file is not seekable.
        Unseekable,
    };

    /// Calls the system `sendfile` (or `sendfile64`, depending on
    /// `lfs64_abi`) and translates the resulting errno into `SendfileError`.
    ///
    /// `in_len` is clamped to `0x7ffff000` before the call. On success the
    /// raw return value is returned reinterpreted as `usize`.
    pub fn sendfile(
        out_fd: fd_t,
        in_fd: fd_t,
        in_offset: ?*off_t,
        in_len: usize,
    ) SendfileError!usize {
        const impl = if (lfs64_abi) system.sendfile64 else system.sendfile;
        const count = @min(in_len, 0x7ffff000); // Prevents EOVERFLOW.
        const rc = impl(out_fd, in_fd, in_offset, count);
        return switch (errno(rc)) {
            .SUCCESS => @bitCast(rc),
            // BADF is always a race condition; FAULT is a segmentation fault.
            .BADF, .FAULT => invalidApiUsage(),
            // We avoid passing too large of a `count`.
            .OVERFLOW => unexpectedErrno(.OVERFLOW),
            // NOTCONN: `out_fd` is an unconnected socket.
            .NOTCONN, .PIPE => error.BrokenPipe,
            .INVAL => error.UnsupportedOperation,
            .AGAIN => error.WouldBlock,
            .IO => error.InputOutput,
            .NOMEM => error.SystemResources,
            .NXIO, .SPIPE => error.Unseekable,
            else => |err| unexpectedErrno(err),
        };
    }

    /// Panics in Debug builds; otherwise yields `error.Unexpected`.
    fn invalidApiUsage() error{Unexpected} {
        switch (builtin.mode) {
            .Debug => @panic("invalid API usage"),
            else => return error.Unexpected,
        }
    }

    /// Extracts the errno for `rc`: via libc's `_errno()` when libc is
    /// linked (a return of -1 signals failure), otherwise via
    /// `errnoFromSyscall`.
    fn errno(rc: anytype) E {
        if (builtin.link_libc) {
            if (rc == -1) return @enumFromInt(std.c._errno().*);
            return .SUCCESS;
        } else {
            return errnoFromSyscall(rc);
        }
    }
};

View File

@ -7553,7 +7553,7 @@ pub fn ioctl_SIOCGIFINDEX(fd: fd_t, ifr: *ifreq) IoCtl_SIOCGIFINDEX_Error!void {
}
}
const lfs64_abi = native_os == .linux and builtin.link_libc and (builtin.abi.isGnu() or builtin.abi.isAndroid());
const lfs64_abi = native_os == .linux and linux.wrapped.lfs64_abi;
/// Whether or not `error.Unexpected` will print its value and a stack trace.
///