From 55d6341eabf002017587863c92e6d5860527a72a Mon Sep 17 00:00:00 2001 From: Andrew Kelley Date: Fri, 27 Jun 2025 12:53:52 -0700 Subject: [PATCH] std.io.Writer: introduce flush into the vtable also break `drainTo` and `sendFileTo` into lower level primitives `writeSplatHeader` and `sendFileHeader` respectively. these are easier to reason about in drain implementations. --- lib/std/http.zig | 39 ++++---- lib/std/io/Writer.zig | 227 +++++++++++++++++++----------------------- lib/std/net.zig | 115 ++++++++++++--------- 3 files changed, 188 insertions(+), 193 deletions(-) diff --git a/lib/std/http.zig b/lib/std/http.zig index d3f09449d7..cc424f5c8d 100644 --- a/lib/std/http.zig +++ b/lib/std/http.zig @@ -922,37 +922,41 @@ pub const BodyWriter = struct { const bw: *BodyWriter = @fieldParentPtr("writer", w); assert(!bw.isEliding()); const out = bw.http_protocol_output; - const n = try w.drainTo(out, data, splat); + const n = try out.writeSplatHeader(w.buffered(), data, splat); bw.state.content_length -= n; - return n; + return w.consume(n); } pub fn noneDrain(w: *Writer, data: []const []const u8, splat: usize) Error!usize { const bw: *BodyWriter = @fieldParentPtr("writer", w); assert(!bw.isEliding()); const out = bw.http_protocol_output; - return try w.drainTo(out, data, splat); + const n = try out.writeSplatHeader(w.buffered(), data, splat); + return w.consume(n); } /// Returns `null` if size cannot be computed without making any syscalls. pub fn noneSendFile(w: *Writer, file_reader: *File.Reader, limit: std.io.Limit) Writer.FileError!usize { const bw: *BodyWriter = @fieldParentPtr("writer", w); assert(!bw.isEliding()); - return w.sendFileTo(bw.http_protocol_output, file_reader, limit); + const out = bw.http_protocol_output; + const n = try out.sendFileHeader(w.buffered(), file_reader, limit); + return w.consume(n); } pub fn contentLengthSendFile(w: *Writer, file_reader: *File.Reader, limit: std.io.Limit) Writer.FileError!usize { const bw: *BodyWriter = @fieldParentPtr("writer", w); assert(!bw.isEliding()); - const n = try w.sendFileTo(bw.http_protocol_output, file_reader, limit); + const out = bw.http_protocol_output; + const n = try out.sendFileHeader(w.buffered(), file_reader, limit); bw.state.content_length -= n; - return n; + return w.consume(n); } pub fn chunkedSendFile(w: *Writer, file_reader: *File.Reader, limit: std.io.Limit) Writer.FileError!usize { const bw: *BodyWriter = @fieldParentPtr("writer", w); assert(!bw.isEliding()); - const data_len = if (file_reader.getSize()) |x| w.end + x else |_| { + const data_len = Writer.countSendFileLowerBound(w.end, file_reader, limit) orelse { // If the file size is unknown, we cannot lower to a `sendFile` since we would // have to flush the chunk header before knowing the chunk length. return error.Unimplemented; @@ -965,9 +969,9 @@ pub const BodyWriter = struct { const buffered_len = out.end - off - chunk_header_template.len; const chunk_len = data_len + buffered_len; writeHex(out.buffer[off..][0..chunk_len_digits], chunk_len); - const n = try w.sendFileTo(out, file_reader, limit); + const n = try out.sendFileHeader(w.buffered(), file_reader, limit); chunked.* = .{ .chunk_len = data_len + 2 - n }; - return n; + return w.consume(n); }, .chunk_len => |chunk_len| l: switch (chunk_len) { 0 => { @@ -989,9 +993,9 @@ pub const BodyWriter = struct { }, else => { const new_limit = limit.min(.limited(chunk_len - 2)); - const n = try w.sendFileTo(out, file_reader, new_limit); + const n = try out.sendFileHeader(w.buffered(), file_reader, new_limit); chunked.chunk_len = chunk_len - n; - return n; + return w.consume(n); }, }, } @@ -1001,20 +1005,19 @@ pub const BodyWriter = struct { const bw: *BodyWriter = @fieldParentPtr("writer", w); assert(!bw.isEliding()); const out = bw.http_protocol_output; - const data_len = Writer.countSplat(w.end, data, splat); + const data_len = w.end + Writer.countSplat(data, splat); const chunked = &bw.state.chunked; state: switch (chunked.*) { .offset => |offset| { if (out.unusedCapacityLen() >= data_len) { - assert(data_len == (w.drainTo(out, data, splat) catch unreachable)); - return data_len; + return w.consume(out.writeSplatHeader(w.buffered(), data, splat) catch unreachable); } const buffered_len = out.end - offset - chunk_header_template.len; const chunk_len = data_len + buffered_len; writeHex(out.buffer[offset..][0..chunk_len_digits], chunk_len); - const n = try w.drainTo(out, data, splat); + const n = try out.writeSplatHeader(w.buffered(), data, splat); chunked.* = .{ .chunk_len = data_len + 2 - n }; - return n; + return w.consume(n); }, .chunk_len => |chunk_len| l: switch (chunk_len) { 0 => { @@ -1035,9 +1038,9 @@ pub const BodyWriter = struct { continue :l 1; }, else => { - const n = try w.drainToLimit(out, data, splat, .limited(chunk_len - 2)); + const n = try out.writeSplatHeaderLimit(w.buffered(), data, splat, .limited(chunk_len - 2)); chunked.chunk_len = chunk_len - n; - return n; + return w.consume(n); }, }, } diff --git a/lib/std/io/Writer.zig b/lib/std/io/Writer.zig index 7a7fe2de7d..c1371d3434 100644 --- a/lib/std/io/Writer.zig +++ b/lib/std/io/Writer.zig @@ -23,29 +23,26 @@ count: usize = 0, pub const VTable = struct { /// Sends bytes to the logical sink. A write will only be sent here if it - /// could not fit into `buffer`. + /// could not fit into `buffer`, or during a `flush` operation. /// /// `buffer[0..end]` is consumed first, followed by each slice of `data` in /// order. Elements of `data` may alias each other but may not alias /// `buffer`. /// - /// This function modifies `Writer.end` and `Writer.buffer`. + /// This function modifies `Writer.end` and `Writer.buffer` in an + /// implementation-defined manner. /// - /// If `data.len` is zero, it indicates this is a "flush" operation; all - /// remaining buffered data must be logically consumed. Generally, this - /// means that `end` will be set to zero before returning, however, it is - /// legal for implementations to manage that data differently. There may be - /// subsequent calls to `drain` and `sendFile` after a flush operation. + /// `data.len` must be nonzero. /// - /// The last element of `data` is special. It is repeated as necessary so - /// that it is written `splat` number of times, which may be zero. + /// The last element of `data` is repeated as necessary so that it is + /// written `splat` number of times, which may be zero. /// - /// Number of bytes actually written is returned, excluding bytes from - /// `buffer`. Bytes from `buffer` are tracked by modifying `end`. + /// Number of bytes consumed from `data` is returned, excluding bytes from + /// `buffer`. /// - /// Number of bytes returned may be zero, which does not mean - /// end-of-stream. A subsequent call may return nonzero, or signal end of - /// stream via `error.WriteFailed`. + /// Number of bytes returned may be zero, which does not indicate stream + /// end. A subsequent call may return nonzero, or signal end of stream via + /// `error.WriteFailed`. drain: *const fn (w: *Writer, data: []const []const u8, splat: usize) Error!usize, /// Copies contents from an open file to the logical sink. `buffer[0..end]` @@ -55,9 +52,9 @@ pub const VTable = struct { /// `buffer` because they have already been logically written. Number of /// bytes consumed from `buffer` are tracked by modifying `end`. /// - /// Number of bytes returned may be zero, which does not necessarily mean - /// end-of-stream. A subsequent call may return nonzero, or signal end of - /// stream via `error.WriteFailed`. Caller must check `file_reader` state + /// Number of bytes returned may be zero, which does not indicate stream + /// end. A subsequent call may return nonzero, or signal end of stream via + /// `error.WriteFailed`. Caller may check `file_reader` state /// (`File.Reader.atEnd`) to disambiguate between a zero-length read or /// write, and whether the file reached the end. /// @@ -71,6 +68,16 @@ pub const VTable = struct { /// `buffer` does not count towards this limit. limit: Limit, ) FileError!usize = unimplementedSendFile, + + /// Consumes all remaining buffer. + /// + /// The default flush implementation calls drain repeatedly until `end` is + /// zero, however it is legal for implementations to manage `end` + /// differently. For instance, `Allocating` flush is a no-op. + /// + /// There may be subsequent calls to `drain` and `sendFile` after a `flush` + /// operation. + flush: *const fn (w: *Writer) Error!void = defaultFlush, }; pub const Error = error{ @@ -141,16 +148,15 @@ pub fn buffered(w: *const Writer) []u8 { return w.buffer[0..w.end]; } -pub fn countSplat(n: usize, data: []const []const u8, splat: usize) usize { - assert(data.len > 0); - var total: usize = n; +pub fn countSplat(data: []const []const u8, splat: usize) usize { + var total: usize = 0; for (data[0 .. data.len - 1]) |buf| total += buf.len; total += data[data.len - 1].len * splat; return total; } -pub fn countSendFileUpperBound(n: usize, file_reader: *File.Reader, limit: Limit) ?usize { - const total: u64 = @min(@intFromEnum(limit), file_reader.getSize() orelse return null); +pub fn countSendFileLowerBound(n: usize, file_reader: *File.Reader, limit: Limit) ?usize { + const total: u64 = @min(@intFromEnum(limit), file_reader.getSize() catch return null); return std.math.lossyCast(usize, total + n); } @@ -167,7 +173,7 @@ pub fn writeVec(w: *Writer, data: []const []const u8) Error!usize { pub fn writeSplat(w: *Writer, data: []const []const u8, splat: usize) Error!usize { assert(data.len > 0); const buffer = w.buffer; - const count = countSplat(0, data, splat); + const count = countSplat(data, splat); if (w.end + count > buffer.len) { const n = try w.vtable.drain(w, data, splat); w.count += n; @@ -215,13 +221,65 @@ pub fn writeSplatLimit( @panic("TODO"); } +/// Returns how many bytes were consumed from `header` and `data`. +pub fn writeSplatHeader( + w: *Writer, + header: []const u8, + data: []const []const u8, + splat: usize, +) Error!usize { + const new_end = w.end + header.len; + if (new_end <= w.buffer.len) { + @memcpy(w.buffer[w.end..][0..header.len], header); + w.end = new_end; + w.count += header.len; + return header.len + try writeSplat(w, data, splat); + } + var vecs: [8][]const u8 = undefined; // Arbitrarily chosen size. + var i: usize = 1; + vecs[0] = header; + for (data) |buf| { + if (buf.len == 0) continue; + vecs[i] = buf; + i += 1; + if (vecs.len - i == 0) break; + } + const new_splat = if (vecs[i - 1].ptr == data[data.len - 1].ptr) splat else 1; + const n = try w.vtable.drain(w, vecs[0..i], new_splat); + w.count += n; + return n; +} + +/// Equivalent to `writeSplatHeader` but writes at most `limit` bytes. +pub fn writeSplatHeaderLimit( + w: *Writer, + header: []const u8, + data: []const []const u8, + splat: usize, + limit: Limit, +) Error!usize { + _ = w; + _ = header; + _ = data; + _ = splat; + _ = limit; + @panic("TODO"); +} + /// Drains all remaining buffered data. -/// -/// It is legal for `VTable.drain` implementations to refrain from modifying -/// `end`. pub fn flush(w: *Writer) Error!void { - assert(0 == try w.vtable.drain(w, &.{}, 0)); - if (w.end != 0) assert(w.vtable.drain == &fixedDrain); + return w.vtable.flush(w); +} + +/// Repeatedly calls `VTable.drain` until `end` is zero. +pub fn defaultFlush(w: *Writer) Error!void { + const drainFn = w.vtable.drain; + while (w.end != 0) _ = try drainFn(w, &.{""}, 1); +} + +/// Does nothing. +pub fn noopFlush(w: *Writer) Error!void { + _ = w; } /// Calls `VTable.drain` but hides the last `preserve_length` bytes from the @@ -236,67 +294,6 @@ pub fn drainPreserve(w: *Writer, preserve_length: usize) Error!void { @memmove(w.buffer[w.end..][0..preserved.len], preserved); } -/// Forwards a `drain` to a second `Writer` instance. `w` is only used for its -/// buffer, but it has its `end` and `count` adjusted accordingly depending on -/// how much was consumed. -/// -/// Returns how many bytes from `data` were consumed. -pub fn drainTo(noalias w: *Writer, noalias other: *Writer, data: []const []const u8, splat: usize) Error!usize { - assert(w != other); - const header = w.buffered(); - const new_end = other.end + header.len; - if (new_end <= other.buffer.len) { - @memcpy(other.buffer[other.end..][0..header.len], header); - other.end = new_end; - other.count += header.len; - w.end = 0; - const n = try other.vtable.drain(other, data, splat); - other.count += n; - return n; - } - if (other.vtable == &VectorWrapper.vtable) { - const wrapper: *VectorWrapper = @fieldParentPtr("writer", w); - while (wrapper.it.next()) |dest| { - _ = dest; - @panic("TODO"); - } - } - var vecs: [8][]const u8 = undefined; // Arbitrarily chosen size. - var i: usize = 1; - vecs[0] = header; - for (data) |buf| { - if (buf.len == 0) continue; - vecs[i] = buf; - i += 1; - if (vecs.len - i == 0) break; - } - const new_splat = if (vecs[i - 1].ptr == data[data.len - 1].ptr) splat else 1; - const n = try other.vtable.drain(other, vecs[0..i], new_splat); - other.count += n; - if (n < header.len) { - const remaining = w.buffer[n..w.end]; - @memmove(w.buffer[0..remaining.len], remaining); - w.end = remaining.len; - return 0; - } - defer w.end = 0; - return n - header.len; -} - -pub fn drainToLimit( - noalias w: *Writer, - noalias other: *Writer, - data: []const []const u8, - splat: usize, - limit: Limit, -) Error!usize { - assert(w != other); - _ = data; - _ = splat; - _ = limit; - @panic("TODO"); -} - pub fn unusedCapacitySlice(w: *const Writer) []u8 { return w.buffer[w.end..]; } @@ -672,47 +669,25 @@ pub fn sendFile(w: *Writer, file_reader: *File.Reader, limit: Limit) FileError!u return w.vtable.sendFile(w, file_reader, limit); } -/// Forwards a `sendFile` to a second `Writer` instance. `w` is only used for -/// its buffer, but it has its `end` and `count` adjusted accordingly depending -/// on how much was consumed. -/// -/// Returns how many bytes from `file_reader` were consumed. -pub fn sendFileTo( - noalias w: *Writer, - noalias other: *Writer, +/// Returns how many bytes from `header` and `file_reader` were consumed. +pub fn sendFileHeader( + w: *Writer, + header: []const u8, file_reader: *File.Reader, limit: Limit, ) FileError!usize { - assert(w != other); - const header = w.buffered(); - const new_end = other.end + header.len; - if (new_end <= other.buffer.len) { - @memcpy(other.buffer[other.end..][0..header.len], header); - other.end = new_end; - other.count += header.len; - w.end = 0; - return other.vtable.sendFile(other, file_reader, limit); + const new_end = w.end + header.len; + if (new_end <= w.buffer.len) { + @memcpy(w.buffer[w.end..][0..header.len], header); + w.end = new_end; + w.count += header.len; + return header.len + try w.vtable.sendFile(w, file_reader, limit); } - assert(header.len > 0); - var vec_buf: [2][]const u8 = .{ header, undefined }; - var vec_i: usize = 1; const buffered_contents = limit.slice(file_reader.interface.buffered()); - if (buffered_contents.len > 0) { - vec_buf[vec_i] = buffered_contents; - vec_i += 1; - } - const n = try other.vtable.drain(other, vec_buf[0..vec_i], 1); - other.count += n; - if (n < header.len) { - const remaining = w.buffer[n..w.end]; - @memmove(w.buffer[0..remaining.len], remaining); - w.end = remaining.len; - return 0; - } - w.end = 0; - const tossed = n - header.len; - file_reader.interface.toss(tossed); - return tossed; + const n = try w.vtable.drain(w, &.{ header, buffered_contents }, 1); + w.count += n; + file_reader.interface.toss(n - header.len); + return n; } /// Asserts nonzero buffer capacity. @@ -2126,6 +2101,7 @@ pub const Allocating = struct { const vtable: VTable = .{ .drain = Allocating.drain, .sendFile = Allocating.sendFile, + .flush = noopFlush, }; pub fn deinit(a: *Allocating) void { @@ -2172,7 +2148,6 @@ pub const Allocating = struct { } fn drain(w: *Writer, data: []const []const u8, splat: usize) Error!usize { - if (data.len == 0) return 0; // flush const a: *Allocating = @fieldParentPtr("interface", w); const gpa = a.allocator; const pattern = data[data.len - 1]; diff --git a/lib/std/net.zig b/lib/std/net.zig index ee2f8f1d4a..f616a52d3e 100644 --- a/lib/std/net.zig +++ b/lib/std/net.zig @@ -1999,44 +1999,56 @@ pub const Stream = struct { const w: *Writer = @fieldParentPtr("interface", io_w); const buffered = io_w.buffered(); comptime assert(native_os == .windows); - var splat_buffer: [splat_buffer_len]u8 = undefined; var iovecs: [max_buffers_len]windows.WSABUF = undefined; var len: u32 = 0; if (buffered.len != 0) { iovecs[len] = .{ - .base = buffered.ptr, + .buf = buffered.ptr, .len = buffered.len, }; len += 1; } - for (data[0..data.len]) |bytes| { + for (data) |bytes| { if (bytes.len == 0) continue; iovecs[len] = .{ .buf = bytes.ptr, .len = bytes.len, }; len += 1; + if (iovecs.len - len == 0) break; } + if (len == 0) return 0; const pattern = data[data.len - 1]; switch (splat) { - 0 => len -= 1, + 0 => if (iovecs[len - 1].buf == data[data.len - 1].ptr) { + len -= 1; + }, 1 => {}, else => switch (pattern.len) { 0 => {}, - 1 => { + 1 => memset: { // Replace the 1-byte buffer with a bigger one. + if (iovecs[len - 1].buf == data[data.len - 1].ptr) len -= 1; + if (iovecs.len - len == 0) break :memset; + const splat_buffer_candidate = io_w.buffer[io_w.end..]; + var backup_buffer: [32]u8 = undefined; + const splat_buffer = if (splat_buffer_candidate.len >= backup_buffer.len) + splat_buffer_candidate + else + &backup_buffer; const memset_len = @min(splat_buffer.len, splat); const buf = splat_buffer[0..memset_len]; @memset(buf, pattern[0]); - iovecs[len - 1] = .{ .buf = buf.ptr, .len = buf.len }; + iovecs[len] = .{ .buf = buf.ptr, .len = buf.len }; + len += 1; var remaining_splat = splat - buf.len; while (remaining_splat > splat_buffer.len and len < iovecs.len) { - iovecs[len] = .{ .buf = &splat_buffer, .len = splat_buffer.len }; + iovecs[len] = .{ .buf = splat_buffer.ptr, .len = splat_buffer.len }; remaining_splat -= splat_buffer.len; len += 1; } - if (remaining_splat > 0 and len < iovecs.len) { - iovecs[len] = .{ .buf = &splat_buffer, .len = remaining_splat }; + if (remaining_splat > 0 and iovecs.len - len != 0) { + iovecs[len] = .{ .buf = splat_buffer.ptr, .len = remaining_splat }; len += 1; } }, @@ -2134,48 +2146,52 @@ pub const Stream = struct { .flags = 0, }; }; - if (data.len != 0) { - const pattern = data[data.len - 1]; - switch (splat) { - 0 => if (msg.iovlen != 0 and iovecs[msg.iovlen - 1].base == data[data.len - 1].ptr) { - msg.iovlen -= 1; - }, - 1 => {}, - else => switch (pattern.len) { - 0 => {}, - 1 => memset: { - // Replace the 1-byte buffer with a bigger one. - if (msg.iovlen != 0 and iovecs[msg.iovlen - 1].base == data[data.len - 1].ptr) - msg.iovlen -= 1; - if (iovecs.len - msg.iovlen == 0) break :memset; - const splat_buffer = io_w.buffer[io_w.end..]; - const memset_len = @min(splat_buffer.len, splat); - const buf = splat_buffer[0..memset_len]; - @memset(buf, pattern[0]); - iovecs[msg.iovlen] = .{ .base = buf.ptr, .len = buf.len }; + if (msg.iovlen == 0) return 0; + const pattern = data[data.len - 1]; + switch (splat) { + 0 => if (iovecs[msg.iovlen - 1].base == data[data.len - 1].ptr) { + msg.iovlen -= 1; + }, + 1 => {}, + else => switch (pattern.len) { + 0 => {}, + 1 => memset: { + // Replace the 1-byte buffer with a bigger one. + if (iovecs[msg.iovlen - 1].base == data[data.len - 1].ptr) msg.iovlen -= 1; + if (iovecs.len - msg.iovlen == 0) break :memset; + const splat_buffer_candidate = io_w.buffer[io_w.end..]; + var backup_buffer: [32]u8 = undefined; + const splat_buffer = if (splat_buffer_candidate.len >= backup_buffer.len) + splat_buffer_candidate + else + &backup_buffer; + if (splat_buffer.len == 0) break :memset; + const memset_len = @min(splat_buffer.len, splat); + const buf = splat_buffer[0..memset_len]; + @memset(buf, pattern[0]); + iovecs[msg.iovlen] = .{ .base = buf.ptr, .len = buf.len }; + msg.iovlen += 1; + var remaining_splat = splat - buf.len; + while (remaining_splat > splat_buffer.len and iovecs.len - msg.iovlen != 0) { + assert(buf.len == splat_buffer.len); + iovecs[msg.iovlen] = .{ .base = splat_buffer.ptr, .len = splat_buffer.len }; msg.iovlen += 1; - var remaining_splat = splat - buf.len; - while (remaining_splat > splat_buffer.len and iovecs.len - msg.iovlen != 0) { - assert(buf.len == splat_buffer.len); - iovecs[msg.iovlen] = .{ .base = splat_buffer.ptr, .len = splat_buffer.len }; - msg.iovlen += 1; - remaining_splat -= splat_buffer.len; - } - if (remaining_splat > 0 and iovecs.len - msg.iovlen != 0) { - iovecs[msg.iovlen] = .{ .base = splat_buffer.ptr, .len = remaining_splat }; - msg.iovlen += 1; - } - }, - else => for (0..splat - 1) |_| { - if (iovecs.len - msg.iovlen == 0) break; - iovecs[msg.iovlen] = .{ - .base = pattern.ptr, - .len = pattern.len, - }; + remaining_splat -= splat_buffer.len; + } + if (remaining_splat > 0 and iovecs.len - msg.iovlen != 0) { + iovecs[msg.iovlen] = .{ .base = splat_buffer.ptr, .len = remaining_splat }; msg.iovlen += 1; - }, + } }, - } + else => for (0..splat - 1) |_| { + if (iovecs.len - msg.iovlen == 0) break; + iovecs[msg.iovlen] = .{ + .base = pattern.ptr, + .len = pattern.len, + }; + msg.iovlen += 1; + }, + }, } const flags = posix.MSG.NOSIGNAL; return io_w.consume(std.posix.sendmsg(w.file_writer.file.handle, &msg, flags) catch |err| { @@ -2186,7 +2202,8 @@ pub const Stream = struct { fn sendFile(io_w: *io.Writer, file_reader: *File.Reader, limit: io.Limit) io.Writer.FileError!usize { const w: *Writer = @fieldParentPtr("interface", io_w); - return io_w.sendFileTo(&w.file_writer.interface, file_reader, limit); + const n = try w.file_writer.interface.sendFileHeader(io_w.buffered(), file_reader, limit); + return io_w.consume(n); } }, };