From 4dc3a9444cd94d2d10b427d8dd7a3515f9dc96ea Mon Sep 17 00:00:00 2001 From: Andrew Kelley Date: Mon, 21 Apr 2025 18:48:00 -0700 Subject: [PATCH] std: reader/writer fixes --- lib/std/fs/File.zig | 159 +++++++++++++++++++--------------- lib/std/io/BufferedReader.zig | 2 +- lib/std/io/BufferedWriter.zig | 81 ++++++++++++++++- lib/std/io/Writer.zig | 22 ++++- lib/std/os/linux.zig | 65 +++++++++++++- lib/std/posix.zig | 2 +- 6 files changed, 250 insertions(+), 81 deletions(-) diff --git a/lib/std/fs/File.zig b/lib/std/fs/File.zig index 7dd3d2d6af..5b429e5896 100644 --- a/lib/std/fs/File.zig +++ b/lib/std/fs/File.zig @@ -931,7 +931,8 @@ pub const WriteFileError = PReadError || WriteError; pub fn writeFileAll(self: File, in_file: File, options: BufferedWriter.WriteFileOptions) WriteFileError!void { var file_writer = self.writer(); - var bw = file_writer.interface().buffered(&.{}); + var buffer: [2000]u8 = undefined; + var bw = file_writer.interface().buffered(&buffer); bw.writeFileAll(in_file, options) catch |err| switch (err) { error.WriteFailed => if (file_writer.err) |_| unreachable else |e| return e, else => |e| return e, @@ -940,14 +941,27 @@ pub fn writeFileAll(self: File, in_file: File, options: BufferedWriter.WriteFile pub const Reader = struct { file: File, - err: ReadError!void = {}, + err: ?ReadError = null, mode: Reader.Mode = .positional, pos: u64 = 0, size: ?u64 = null, - size_err: GetEndPosError!void = {}, - seek_err: SeekError!void = {}, + size_err: ?GetEndPosError = null, + seek_err: ?SeekError = null, - pub const Mode = enum { streaming, positional }; + pub const Mode = enum { + streaming, + positional, + streaming_reading, + positional_reading, + + pub fn toStreaming(m: @This()) @This() { + return switch (m) { + .positional => .streaming, + .positional_reading => .streaming_reading, + else => unreachable, + }; + } + }; pub fn interface(r: *Reader) std.io.Reader { return .{ @@ -975,7 +989,7 @@ pub const Reader = struct { switch (r.mode) { .positional => { const size = r.size orelse { - if (r.file.getEndPos()) |size| { + if (file.getEndPos()) |size| { r.size = size; } else |err| { r.size_err = err; @@ -991,6 +1005,10 @@ pub const Reader = struct { assert(pos == 0); return 0; }, + error.Unimplemented => { + r.mode = .positional_reading; + return 0; + }, else => |e| { r.err = e; return error.ReadFailed; @@ -1003,6 +1021,10 @@ pub const Reader = struct { const n = bw.writeFile(file, .none, limit, &.{}, 0) catch |err| switch (err) { error.WriteFailed => return error.WriteFailed, error.Unseekable => unreachable, // Passing `Offset.none`. + error.Unimplemented => { + r.mode = .streaming_reading; + return 0; + }, else => |e| { r.err = e; return error.ReadFailed; @@ -1011,6 +1033,35 @@ pub const Reader = struct { r.pos = pos + n; return n; }, + .positional_reading => { + const dest = limit.slice(try bw.writableSliceGreedy(1)); + const n = file.pread(dest, pos) catch |err| switch (err) { + error.Unseekable => { + r.mode = .streaming_reading; + assert(pos == 0); + return 0; + }, + else => |e| { + r.err = e; + return error.ReadFailed; + }, + }; + if (n == 0) return error.EndOfStream; + r.pos = pos + n; + bw.advance(n); + return n; + }, + .streaming_reading => { + const dest = limit.slice(try bw.writableSliceGreedy(1)); + const n = file.read(dest) catch |err| { + r.err = err; + return error.ReadFailed; + }; + if (n == 0) return error.EndOfStream; + r.pos = pos + n; + bw.advance(n); + return n; + }, } } @@ -1020,7 +1071,7 @@ pub const Reader = struct { const pos = r.pos; switch (r.mode) { - .positional => { + .positional, .positional_reading => { if (is_windows) { // Unfortunately, `ReadFileScatter` cannot be used since it requires // page alignment, so we are stuck using only the first slice. @@ -1053,7 +1104,7 @@ pub const Reader = struct { if (send_vecs.len == 0) return 0; // Prevent false positive end detection on empty `data`. const n = posix.preadv(handle, send_vecs, pos) catch |err| switch (err) { error.Unseekable => { - r.mode = .streaming; + r.mode = r.mode.toStreaming(); assert(pos == 0); return 0; }, @@ -1066,7 +1117,7 @@ pub const Reader = struct { r.pos = pos + n; return n; }, - .streaming => { + .streaming, .streaming_reading => { if (is_windows) { // Unfortunately, `ReadFileScatter` cannot be used since it requires // page alignment, so we are stuck using only the first slice. @@ -1113,13 +1164,13 @@ pub const Reader = struct { const file = r.file; const pos = r.pos; switch (r.mode) { - .positional => { + .positional, .positional_reading => { const size = r.size orelse { if (file.getEndPos()) |size| { r.size = size; } else |err| { r.size_err = err; - r.mode = .streaming; + r.mode = r.mode.toStreaming(); } return 0; }; @@ -1127,17 +1178,13 @@ pub const Reader = struct { r.pos = pos + delta; return delta; }, - .streaming => { + .streaming, .streaming_reading => { // Unfortunately we can't seek forward without knowing the // size because the seek syscalls provided to us will not // return the true end position if a seek would exceed the // end. fallback: { - if (r.size_err) |_| { - if (r.seek_err) |_| { - break :fallback; - } else |_| {} - } else |_| {} + if (r.size_err == null and r.seek_err == null) break :fallback; var trash_buffer: [std.atomic.cache_line]u8 = undefined; const trash = &trash_buffer; if (is_windows) { @@ -1188,9 +1235,16 @@ pub const Writer = struct { err: WriteError!void = {}, mode: Writer.Mode = .positional, pos: u64 = 0, + sendfile_err: ?SendfileError = null, + read_err: ?ReadError = null, pub const Mode = Reader.Mode; + pub const SendfileError = error{ + UnsupportedOperation, + Unexpected, + }; + /// Number of slices to store on the stack, when trying to send as many byte /// vectors through the underlying write calls as possible. const max_buffers_len = 16; @@ -1269,75 +1323,38 @@ pub const Writer = struct { const w: *Writer = @ptrCast(@alignCast(context)); const out_fd = w.file.handle; const in_fd = in_file.handle; - const len_int = switch (in_limit) { - .nothing => return writeSplat(context, headers_and_trailers, 1), - .unlimited => 0, - else => in_limit.toInt().?, - }; - // TODO try using copy_file_range on linux - // TODO try using copy_file_range on freebsd - if (native_os == .linux) sf: { + // TODO try using copy_file_range on Linux + // TODO try using copy_file_range on FreeBSD + // TODO try using sendfile on macOS + // TODO try using sendfile on FreeBSD + if (native_os == .linux and w.mode == .streaming) sf: { + // Try using sendfile on Linux. + if (w.sendfile_err != null) break :sf; // Linux sendfile does not support headers or trailers but it does // support a streaming read from in_file. if (headers_len > 0) return writeSplat(context, headers_and_trailers[0..headers_len], 1); const max_count = 0x7ffff000; // Avoid EINVAL. - const smaller_len = if (len_int == 0) max_count else @min(len_int, max_count); + const smaller_len = in_limit.minInt(max_count); var off: std.os.linux.off_t = undefined; const off_ptr: ?*std.os.linux.off_t = if (in_offset.toInt()) |offset| b: { off = std.math.cast(std.os.linux.off_t, offset) orelse return writeSplat(context, headers_and_trailers, 1); break :b &off; } else null; - if (true) @panic("TODO"); const n = std.os.linux.wrapped.sendfile(out_fd, in_fd, off_ptr, smaller_len) catch |err| switch (err) { - error.UnsupportedOperation => break :sf, - error.Unseekable => break :sf, - error.Unexpected => break :sf, + // Errors that imply sendfile should be avoided on the next write. + error.UnsupportedOperation, + error.Unexpected, + => |e| { + w.sendfile_err = e; + break :sf; + }, else => |e| return e, }; - if (in_offset.toInt()) |offset| { - assert(n == off - offset); - } else if (n == 0 and len_int == 0) { - // The caller wouldn't be able to tell that the file transfer is - // done and would incorrectly repeat the same call. - return writeSplat(context, headers_and_trailers, 1); - } + w.pos += n; return n; } - var iovecs_buffer: [max_buffers_len]std.posix.iovec_const = undefined; - const iovecs = iovecs_buffer[0..@min(iovecs_buffer.len, headers_and_trailers.len)]; - for (iovecs, headers_and_trailers[0..iovecs.len]) |*v, d| v.* = .{ .base = d.ptr, .len = d.len }; - const headers = iovecs[0..@min(headers_len, iovecs.len)]; - const trailers = iovecs[headers.len..]; - const flags = 0; - return posix.sendfile(out_fd, in_fd, in_offset, len_int, headers, trailers, flags) catch |err| switch (err) { - error.Unseekable, - error.FastOpenAlreadyInProgress, - error.MessageTooBig, - error.FileDescriptorNotASocket, - error.NetworkUnreachable, - error.NetworkSubsystemFailed, - => return writeFileUnseekable(out_fd, in_fd, in_offset, in_limit, headers_and_trailers, headers_len), - - else => |e| return e, - }; - } - - fn writeFileUnseekable( - out_fd: Handle, - in_fd: Handle, - in_offset: u64, - in_limit: std.io.Writer.Limit, - headers_and_trailers: []const []const u8, - headers_len: usize, - ) std.io.Writer.FileError!usize { - _ = out_fd; - _ = in_fd; - _ = in_offset; - _ = in_limit; - _ = headers_and_trailers; - _ = headers_len; - @panic("TODO writeFileUnseekable"); + return error.Unimplemented; } }; diff --git a/lib/std/io/BufferedReader.zig b/lib/std/io/BufferedReader.zig index de911d7f03..b2eb68a332 100644 --- a/lib/std/io/BufferedReader.zig +++ b/lib/std/io/BufferedReader.zig @@ -99,7 +99,7 @@ pub fn readVecLimit(br: *BufferedReader, data: []const []u8, limit: Reader.Limit fn passthruRead(context: ?*anyopaque, bw: *BufferedWriter, limit: Reader.Limit) Reader.RwError!usize { const br: *BufferedReader = @alignCast(@ptrCast(context)); - const buffer = limit.slice(br.buffer[br.end..br.seek]); + const buffer = limit.slice(br.buffer[br.seek..br.end]); if (buffer.len > 0) { const n = try bw.write(buffer); br.seek += n; diff --git a/lib/std/io/BufferedWriter.zig b/lib/std/io/BufferedWriter.zig index bea74b743d..dbf7670ba7 100644 --- a/lib/std/io/BufferedWriter.zig +++ b/lib/std/io/BufferedWriter.zig @@ -480,6 +480,14 @@ pub fn writeSliceSwap(bw: *BufferedWriter, Elem: type, slice: []const Elem) Writ /// Unlike `writeSplat` and `writeVec`, this function will call into the /// underlying writer even if there is enough buffer capacity for the file /// contents. +/// +/// Although it would be possible to eliminate `error.Unimplemented` from +/// the error set by reading directly into the buffer in such case, +/// this is not done because it is more efficient to do it in `writeFileAll` +/// so that the error does not occur with each write. +/// +/// See `writeFileReading` for an alternative that does not have +/// `error.Unimplemented` in the error set. pub fn writeFile( bw: *BufferedWriter, file: std.fs.File, @@ -491,6 +499,23 @@ pub fn writeFile( return passthruWriteFile(bw, file, offset, limit, headers_and_trailers, headers_len); } +pub const WriteFileReadingError = std.fs.File.PReadError || Writer.Error; + +/// Returning zero bytes means end of stream. +/// +/// Asserts nonzero buffer capacity. +pub fn writeFileReading( + bw: *BufferedWriter, + file: std.fs.File, + offset: Writer.Offset, + limit: Writer.Limit, +) WriteFileReadingError!usize { + const dest = limit.slice(try bw.writableSliceGreedy(1)); + const n = if (offset.toInt()) |pos| try file.pread(dest, pos) else try file.read(dest); + bw.advance(n); + return n; +} + fn passthruWriteFile( context: ?*anyopaque, file: std.fs.File, @@ -588,7 +613,7 @@ pub const WriteFileOptions = struct { headers_len: usize = 0, }; -pub fn writeFileAll(bw: *BufferedWriter, file: std.fs.File, options: WriteFileOptions) Writer.FileError!void { +pub fn writeFileAll(bw: *BufferedWriter, file: std.fs.File, options: WriteFileOptions) WriteFileReadingError!void { const headers_and_trailers = options.headers_and_trailers; const headers = headers_and_trailers[0..options.headers_len]; switch (options.limit) { @@ -601,7 +626,15 @@ pub fn writeFileAll(bw: *BufferedWriter, file: std.fs.File, options: WriteFileOp var i: usize = 0; var offset = options.offset; while (true) { - var n = try bw.writeFile(file, offset, .unlimited, headers[i..], headers.len - i); + var n = bw.writeFile(file, offset, .unlimited, headers[i..], headers.len - i) catch |err| switch (err) { + error.Unimplemented => { + try bw.writeVecAll(headers[i..]); + try bw.writeFileReadingAll(file, offset, .unlimited); + try bw.writeVecAll(headers_and_trailers[headers.len..]); + return; + }, + else => |e| return e, + }; while (i < headers.len and n >= headers[i].len) { n -= headers[i].len; i += 1; @@ -619,7 +652,15 @@ pub fn writeFileAll(bw: *BufferedWriter, file: std.fs.File, options: WriteFileOp var i: usize = 0; var offset = options.offset; while (true) { - var n = try bw.writeFile(file, offset, .limited(len), headers_and_trailers[i..], headers.len - i); + var n = bw.writeFile(file, offset, .limited(len), headers_and_trailers[i..], headers.len - i) catch |err| switch (err) { + error.Unimplemented => { + try bw.writeVecAll(headers[i..]); + try bw.writeFileReadingAll(file, offset, .limited(len)); + try bw.writeVecAll(headers_and_trailers[headers.len..]); + return; + }, + else => |e| return e, + }; while (i < headers.len and n >= headers[i].len) { n -= headers[i].len; i += 1; @@ -646,6 +687,40 @@ pub fn writeFileAll(bw: *BufferedWriter, file: std.fs.File, options: WriteFileOp } } +/// Equivalent to `writeFileAll` but uses direct `pread` and `read` calls on +/// `file` rather than `Writer.writeFile`. This is generally used as a fallback +/// when the underlying implementation returns `error.Unimplemented`, which is +/// why that error code does not appear in this function's error set. +/// +/// Asserts nonzero buffer capacity. +pub fn writeFileReadingAll( + bw: *BufferedWriter, + file: std.fs.File, + offset: Writer.Offset, + limit: Writer.Limit, +) WriteFileReadingError!void { + if (offset.toInt()) |start_pos| { + var remaining = limit; + var pos = start_pos; + while (remaining.nonzero()) { + const dest = remaining.slice(try bw.writableSliceGreedy(1)); + const n = try file.pread(dest, pos); + if (n == 0) return; + bw.advance(n); + pos += n; + remaining = remaining.subtract(n).?; + } + } + var remaining = limit; + while (remaining.nonzero()) { + const dest = remaining.slice(try bw.writableSliceGreedy(1)); + const n = try file.read(dest); + if (n == 0) return; + bw.advance(n); + remaining = remaining.subtract(n).?; + } +} + pub fn alignBuffer( bw: *BufferedWriter, buffer: []const u8, diff --git a/lib/std/io/Writer.zig b/lib/std/io/Writer.zig index 27cb17ddcc..5c3020b54e 100644 --- a/lib/std/io/Writer.zig +++ b/lib/std/io/Writer.zig @@ -30,12 +30,20 @@ pub const VTable = struct { /// Number of bytes returned may be zero, which does not mean /// end-of-stream. A subsequent call may return nonzero, or may signal end /// of stream via `error.WriteFailed`. + /// + /// If `error.Unimplemented` is returned, the caller should do its own + /// reads from the file. The callee indicates it cannot offer a more + /// efficient implementation. writeFile: *const fn ( ctx: ?*anyopaque, file: std.fs.File, - /// If this is `none`, `file` will be streamed, affecting the seek - /// position. Otherwise, it will be read positionally without affecting - /// the seek position. + /// If this is `Offset.none`, `file` will be streamed, affecting the + /// seek position. Otherwise, it will be read positionally without + /// affecting the seek position. `error.Unseekable` is only possible + /// when reading positionally. + /// + /// An offset past the end of the file is treated the same as an offset + /// equal to the end of the file. offset: Offset, /// Maximum amount of bytes to read from the file. Implementations may /// assume that the file size does not exceed this amount. @@ -52,7 +60,13 @@ pub const Error = error{ WriteFailed, }; -pub const FileError = Error || std.fs.File.PReadError; +pub const FileError = std.fs.File.PReadError || error{ + /// See the `Writer` implementation for detailed diagnostics. + WriteFailed, + /// Indicates the caller should do its own file reading; the callee cannot + /// offer a more efficient implementation. + Unimplemented, +}; pub const Limit = std.io.Reader.Limit; diff --git a/lib/std/os/linux.zig b/lib/std/os/linux.zig index 3b2f51464e..7296cfb873 100644 --- a/lib/std/os/linux.zig +++ b/lib/std/os/linux.zig @@ -9420,4 +9420,67 @@ pub const msghdr_const = extern struct { control: ?*const anyopaque, controllen: usize, flags: u32, -}; \ No newline at end of file +}; + +/// The syscalls, but with Zig error sets, going through libc if linking libc, +/// and with some footguns eliminated. +pub const wrapped = struct { + pub const lfs64_abi = builtin.link_libc and (builtin.abi.isGnu() or builtin.abi.isAndroid()); + const system = if (builtin.link_libc) std.c else std.os.linux; + + pub const SendfileError = std.posix.UnexpectedError || error{ + /// `out_fd` is an unconnected socket, or out_fd closed its read end. + BrokenPipe, + /// Descriptor is not valid or locked, or an mmap(2)-like operation is not available for in_fd. + UnsupportedOperation, + /// Nonblocking I/O has been selected but the write would block. + WouldBlock, + /// Unspecified error while reading from in_fd. + InputOutput, + /// Insufficient kernel memory to read from in_fd. + SystemResources, + /// `offset` is not `null` but the input file is not seekable. + Unseekable, + }; + + pub fn sendfile( + out_fd: fd_t, + in_fd: fd_t, + in_offset: ?*off_t, + in_len: usize, + ) SendfileError!usize { + const adjusted_len = @min(in_len, 0x7ffff000); // Prevents EOVERFLOW. + const sendfileSymbol = if (lfs64_abi) system.sendfile64 else system.sendfile; + const rc = sendfileSymbol(out_fd, in_fd, in_offset, adjusted_len); + switch (errno(rc)) { + .SUCCESS => return @bitCast(rc), + .BADF => return invalidApiUsage(), // Always a race condition. + .FAULT => return invalidApiUsage(), // Segmentation fault. + .OVERFLOW => return unexpectedErrno(.OVERFLOW), // We avoid passing too large of a `count`. + .NOTCONN => return error.BrokenPipe, // `out_fd` is an unconnected socket + .INVAL => return error.UnsupportedOperation, + .AGAIN => return error.WouldBlock, + .IO => return error.InputOutput, + .PIPE => return error.BrokenPipe, + .NOMEM => return error.SystemResources, + .NXIO => return error.Unseekable, + .SPIPE => return error.Unseekable, + else => |err| return unexpectedErrno(err), + } + } + + const unexpectedErrno = std.posix.unexpectedErrno; + + fn invalidApiUsage() error{Unexpected} { + if (builtin.mode == .Debug) @panic("invalid API usage"); + return error.Unexpected; + } + + fn errno(rc: anytype) E { + if (builtin.link_libc) { + return if (rc == -1) @enumFromInt(std.c._errno().*) else .SUCCESS; + } else { + return errnoFromSyscall(rc); + } + } +}; diff --git a/lib/std/posix.zig b/lib/std/posix.zig index 414a9289b3..36f9f12c23 100644 --- a/lib/std/posix.zig +++ b/lib/std/posix.zig @@ -7553,7 +7553,7 @@ pub fn ioctl_SIOCGIFINDEX(fd: fd_t, ifr: *ifreq) IoCtl_SIOCGIFINDEX_Error!void { } } -const lfs64_abi = native_os == .linux and builtin.link_libc and (builtin.abi.isGnu() or builtin.abi.isAndroid()); +const lfs64_abi = native_os == .linux and linux.wrapped.lfs64_abi; /// Whether or not `error.Unexpected` will print its value and a stack trace. ///