std.io.Writer: introduce flush

into the vtable

also break `drainTo` and `sendFileTo` into lower-level primitives
`writeSplatHeader` and `sendFileHeader`, respectively. These are easier
to reason about in drain implementations.
Andrew Kelley 2025-06-27 12:53:52 -07:00
parent 5b5243b5b7
commit 55d6341eab
3 changed files with 188 additions and 193 deletions
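The core refactor is visible in one line of the `BodyWriter` drains below: instead of forwarding through `drainTo` on the source writer, a drain implementation now calls `writeSplatHeader` on the destination, passing its own buffered bytes explicitly, and converts the result with `consume`. A hedged sketch of the new pattern (the `Forward` container and its `out` field are illustrative, not from this diff; `consume` semantics are inferred from its uses below):

```zig
const std = @import("std");
const Writer = std.io.Writer;

const Forward = struct {
    interface: Writer,
    out: *Writer,

    fn drain(w: *Writer, data: []const []const u8, splat: usize) Writer.Error!usize {
        const f: *Forward = @fieldParentPtr("interface", w);
        // Previously: `return w.drainTo(f.out, data, splat);`
        // Now buffered bytes travel explicitly as a header, and `consume`
        // turns the combined count back into "bytes consumed from `data`",
        // adjusting `w.end` for the portion that came from `buffer`.
        const n = try f.out.writeSplatHeader(w.buffered(), data, splat);
        return w.consume(n);
    }
};
```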


@@ -922,37 +922,41 @@ pub const BodyWriter = struct {
const bw: *BodyWriter = @fieldParentPtr("writer", w);
assert(!bw.isEliding());
const out = bw.http_protocol_output;
const n = try w.drainTo(out, data, splat);
const n = try out.writeSplatHeader(w.buffered(), data, splat);
bw.state.content_length -= n;
return n;
return w.consume(n);
}
pub fn noneDrain(w: *Writer, data: []const []const u8, splat: usize) Error!usize {
const bw: *BodyWriter = @fieldParentPtr("writer", w);
assert(!bw.isEliding());
const out = bw.http_protocol_output;
return try w.drainTo(out, data, splat);
const n = try out.writeSplatHeader(w.buffered(), data, splat);
return w.consume(n);
}
/// Returns `null` if size cannot be computed without making any syscalls.
pub fn noneSendFile(w: *Writer, file_reader: *File.Reader, limit: std.io.Limit) Writer.FileError!usize {
const bw: *BodyWriter = @fieldParentPtr("writer", w);
assert(!bw.isEliding());
return w.sendFileTo(bw.http_protocol_output, file_reader, limit);
const out = bw.http_protocol_output;
const n = try out.sendFileHeader(w.buffered(), file_reader, limit);
return w.consume(n);
}
pub fn contentLengthSendFile(w: *Writer, file_reader: *File.Reader, limit: std.io.Limit) Writer.FileError!usize {
const bw: *BodyWriter = @fieldParentPtr("writer", w);
assert(!bw.isEliding());
const n = try w.sendFileTo(bw.http_protocol_output, file_reader, limit);
const out = bw.http_protocol_output;
const n = try out.sendFileHeader(w.buffered(), file_reader, limit);
bw.state.content_length -= n;
return n;
return w.consume(n);
}
pub fn chunkedSendFile(w: *Writer, file_reader: *File.Reader, limit: std.io.Limit) Writer.FileError!usize {
const bw: *BodyWriter = @fieldParentPtr("writer", w);
assert(!bw.isEliding());
const data_len = if (file_reader.getSize()) |x| w.end + x else |_| {
const data_len = Writer.countSendFileLowerBound(w.end, file_reader, limit) orelse {
// If the file size is unknown, we cannot lower to a `sendFile` since we would
// have to flush the chunk header before knowing the chunk length.
return error.Unimplemented;
@@ -965,9 +969,9 @@ pub const BodyWriter = struct {
const buffered_len = out.end - off - chunk_header_template.len;
const chunk_len = data_len + buffered_len;
writeHex(out.buffer[off..][0..chunk_len_digits], chunk_len);
const n = try w.sendFileTo(out, file_reader, limit);
const n = try out.sendFileHeader(w.buffered(), file_reader, limit);
chunked.* = .{ .chunk_len = data_len + 2 - n };
return n;
return w.consume(n);
},
.chunk_len => |chunk_len| l: switch (chunk_len) {
0 => {
@@ -989,9 +993,9 @@ pub const BodyWriter = struct {
},
else => {
const new_limit = limit.min(.limited(chunk_len - 2));
const n = try w.sendFileTo(out, file_reader, new_limit);
const n = try out.sendFileHeader(w.buffered(), file_reader, new_limit);
chunked.chunk_len = chunk_len - n;
return n;
return w.consume(n);
},
},
}
@@ -1001,20 +1005,19 @@ pub const BodyWriter = struct {
const bw: *BodyWriter = @fieldParentPtr("writer", w);
assert(!bw.isEliding());
const out = bw.http_protocol_output;
const data_len = Writer.countSplat(w.end, data, splat);
const data_len = w.end + Writer.countSplat(data, splat);
const chunked = &bw.state.chunked;
state: switch (chunked.*) {
.offset => |offset| {
if (out.unusedCapacityLen() >= data_len) {
assert(data_len == (w.drainTo(out, data, splat) catch unreachable));
return data_len;
return w.consume(out.writeSplatHeader(w.buffered(), data, splat) catch unreachable);
}
const buffered_len = out.end - offset - chunk_header_template.len;
const chunk_len = data_len + buffered_len;
writeHex(out.buffer[offset..][0..chunk_len_digits], chunk_len);
const n = try w.drainTo(out, data, splat);
const n = try out.writeSplatHeader(w.buffered(), data, splat);
chunked.* = .{ .chunk_len = data_len + 2 - n };
return n;
return w.consume(n);
},
.chunk_len => |chunk_len| l: switch (chunk_len) {
0 => {
@@ -1035,9 +1038,9 @@ pub const BodyWriter = struct {
continue :l 1;
},
else => {
const n = try w.drainToLimit(out, data, splat, .limited(chunk_len - 2));
const n = try out.writeSplatHeaderLimit(w.buffered(), data, splat, .limited(chunk_len - 2));
chunked.chunk_len = chunk_len - n;
return n;
return w.consume(n);
},
},
}
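For readers unfamiliar with the framing these drains produce: every HTTP/1.1 chunk is a hex length line, the payload, and a trailing CRLF, which is where the recurring `data_len + 2` comes from. A small illustrative test, not part of the commit:

```zig
const std = @import("std");

test "the trailing CRLF behind `data_len + 2`" {
    // A 5-byte payload "hello" travels as:
    //   "5\r\n"   chunk-size in hex, then CRLF
    //   "hello"   chunk data
    //   "\r\n"    terminator -- the 2 bytes tracked in `chunk_len`
    const payload = "hello";
    const frame = "5\r\n" ++ payload ++ "\r\n";
    try std.testing.expectEqual(payload.len + 2, frame.len - "5\r\n".len);
}
```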


@@ -23,29 +23,26 @@ count: usize = 0,
pub const VTable = struct {
/// Sends bytes to the logical sink. A write will only be sent here if it
/// could not fit into `buffer`.
/// could not fit into `buffer`, or during a `flush` operation.
///
/// `buffer[0..end]` is consumed first, followed by each slice of `data` in
/// order. Elements of `data` may alias each other but may not alias
/// `buffer`.
///
/// This function modifies `Writer.end` and `Writer.buffer`.
/// This function modifies `Writer.end` and `Writer.buffer` in an
/// implementation-defined manner.
///
/// If `data.len` is zero, it indicates this is a "flush" operation; all
/// remaining buffered data must be logically consumed. Generally, this
/// means that `end` will be set to zero before returning, however, it is
/// legal for implementations to manage that data differently. There may be
/// subsequent calls to `drain` and `sendFile` after a flush operation.
/// `data.len` must be nonzero.
///
/// The last element of `data` is special. It is repeated as necessary so
/// that it is written `splat` number of times, which may be zero.
/// The last element of `data` is repeated as necessary so that it is
/// written `splat` number of times, which may be zero.
///
/// Number of bytes actually written is returned, excluding bytes from
/// `buffer`. Bytes from `buffer` are tracked by modifying `end`.
/// Number of bytes consumed from `data` is returned, excluding bytes from
/// `buffer`.
///
/// Number of bytes returned may be zero, which does not mean
/// end-of-stream. A subsequent call may return nonzero, or signal end of
/// stream via `error.WriteFailed`.
/// Number of bytes returned may be zero, which does not indicate stream
/// end. A subsequent call may return nonzero, or signal end of stream via
/// `error.WriteFailed`.
drain: *const fn (w: *Writer, data: []const []const u8, splat: usize) Error!usize,
/// Copies contents from an open file to the logical sink. `buffer[0..end]`
@@ -55,9 +52,9 @@ pub const VTable = struct {
/// `buffer` because they have already been logically written. Number of
/// bytes consumed from `buffer` are tracked by modifying `end`.
///
/// Number of bytes returned may be zero, which does not necessarily mean
/// end-of-stream. A subsequent call may return nonzero, or signal end of
/// stream via `error.WriteFailed`. Caller must check `file_reader` state
/// Number of bytes returned may be zero, which does not indicate stream
/// end. A subsequent call may return nonzero, or signal end of stream via
/// `error.WriteFailed`. Caller may check `file_reader` state
/// (`File.Reader.atEnd`) to disambiguate between a zero-length read or
/// write, and whether the file reached the end.
///
@@ -71,6 +68,16 @@ pub const VTable = struct {
/// `buffer` does not count towards this limit.
limit: Limit,
) FileError!usize = unimplementedSendFile,
/// Consumes all remaining buffer.
///
/// The default flush implementation calls drain repeatedly until `end` is
/// zero, however it is legal for implementations to manage `end`
/// differently. For instance, `Allocating` flush is a no-op.
///
/// There may be subsequent calls to `drain` and `sendFile` after a `flush`
/// operation.
flush: *const fn (w: *Writer) Error!void = defaultFlush,
};
pub const Error = error{
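To make the new field concrete, here is a sketch of two vtables choosing different flush strategies; `forwardDrain` is a hypothetical stub, and the no-op case mirrors how `Allocating` installs `noopFlush` further down:

```zig
const std = @import("std");
const Writer = std.io.Writer;

fn forwardDrain(w: *Writer, data: []const []const u8, splat: usize) Writer.Error!usize {
    _ = w;
    _ = data;
    _ = splat;
    @panic("illustrative stub");
}

// A writer backed by a real sink keeps the default: `flush` calls `drain`
// until `end == 0`.
const stream_vtable: Writer.VTable = .{
    .drain = forwardDrain,
    // .flush = defaultFlush, implied by the field default
};

// A writer whose buffer *is* the destination has nothing to forward.
const memory_vtable: Writer.VTable = .{
    .drain = forwardDrain, // would grow the buffer instead, like `Allocating.drain`
    .flush = Writer.noopFlush,
};
```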
@@ -141,16 +148,15 @@ pub fn buffered(w: *const Writer) []u8 {
return w.buffer[0..w.end];
}
pub fn countSplat(n: usize, data: []const []const u8, splat: usize) usize {
assert(data.len > 0);
var total: usize = n;
pub fn countSplat(data: []const []const u8, splat: usize) usize {
var total: usize = 0;
for (data[0 .. data.len - 1]) |buf| total += buf.len;
total += data[data.len - 1].len * splat;
return total;
}
pub fn countSendFileUpperBound(n: usize, file_reader: *File.Reader, limit: Limit) ?usize {
const total: u64 = @min(@intFromEnum(limit), file_reader.getSize() orelse return null);
pub fn countSendFileLowerBound(n: usize, file_reader: *File.Reader, limit: Limit) ?usize {
const total: u64 = @min(@intFromEnum(limit), file_reader.getSize() catch return null);
return std.math.lossyCast(usize, total + n);
}
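The signature change is easy to misread in the diff: the running total `n` is gone from `countSplat`, and callers now add `w.end` themselves, as `chunkedDrain` above does. An illustrative check of the new behavior:

```zig
const std = @import("std");
const Writer = std.io.Writer;

test "countSplat counts the final slice `splat` times" {
    const data: []const []const u8 = &.{ "ab", "c", "xy" };
    // "ab" and "c" count once; "xy" counts `splat` times: 2 + 1 + 2 * 3 = 9.
    try std.testing.expectEqual(@as(usize, 9), Writer.countSplat(data, 3));
    // A splat of zero drops the final slice entirely.
    try std.testing.expectEqual(@as(usize, 3), Writer.countSplat(data, 0));
}
```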
@@ -167,7 +173,7 @@ pub fn writeVec(w: *Writer, data: []const []const u8) Error!usize {
pub fn writeSplat(w: *Writer, data: []const []const u8, splat: usize) Error!usize {
assert(data.len > 0);
const buffer = w.buffer;
const count = countSplat(0, data, splat);
const count = countSplat(data, splat);
if (w.end + count > buffer.len) {
const n = try w.vtable.drain(w, data, splat);
w.count += n;
@@ -215,13 +221,65 @@ pub fn writeSplatLimit(
@panic("TODO");
}
/// Returns how many bytes were consumed from `header` and `data`.
pub fn writeSplatHeader(
w: *Writer,
header: []const u8,
data: []const []const u8,
splat: usize,
) Error!usize {
const new_end = w.end + header.len;
if (new_end <= w.buffer.len) {
@memcpy(w.buffer[w.end..][0..header.len], header);
w.end = new_end;
w.count += header.len;
return header.len + try writeSplat(w, data, splat);
}
var vecs: [8][]const u8 = undefined; // Arbitrarily chosen size.
var i: usize = 1;
vecs[0] = header;
for (data) |buf| {
if (buf.len == 0) continue;
vecs[i] = buf;
i += 1;
if (vecs.len - i == 0) break;
}
const new_splat = if (vecs[i - 1].ptr == data[data.len - 1].ptr) splat else 1;
const n = try w.vtable.drain(w, vecs[0..i], new_splat);
w.count += n;
return n;
}
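One subtlety in the scratch-array fallback deserves a note; the following restates the `new_splat` line with the reasoning spelled out (comments added here, not in the commit):

```zig
// `splat` only ever applies to the *last* vector handed to `drain`. If
// `data` held more non-empty slices than the 8-element scratch array can
// carry, the last scratch vector is some middle slice rather than the
// splat pattern, and forwarding the original `splat` would repeat the
// wrong bytes. Pointer identity with `data[data.len - 1]` detects that
// case and collapses the splat to a single write.
const new_splat = if (vecs[i - 1].ptr == data[data.len - 1].ptr) splat else 1;
```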
/// Equivalent to `writeSplatHeader` but writes at most `limit` bytes.
pub fn writeSplatHeaderLimit(
w: *Writer,
header: []const u8,
data: []const []const u8,
splat: usize,
limit: Limit,
) Error!usize {
_ = w;
_ = header;
_ = data;
_ = splat;
_ = limit;
@panic("TODO");
}
/// Drains all remaining buffered data.
///
/// It is legal for `VTable.drain` implementations to refrain from modifying
/// `end`.
pub fn flush(w: *Writer) Error!void {
assert(0 == try w.vtable.drain(w, &.{}, 0));
if (w.end != 0) assert(w.vtable.drain == &fixedDrain);
return w.vtable.flush(w);
}
/// Repeatedly calls `VTable.drain` until `end` is zero.
pub fn defaultFlush(w: *Writer) Error!void {
const drainFn = w.vtable.drain;
while (w.end != 0) _ = try drainFn(w, &.{""}, 1);
}
/// Does nothing.
pub fn noopFlush(w: *Writer) Error!void {
_ = w;
}
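From the caller's side the contract looks like this; a minimal sketch assuming the buffered `File.writer` interface that exists elsewhere on this branch (not part of this diff):

```zig
const std = @import("std");

fn example(file: std.fs.File) !void {
    var buffer: [4096]u8 = undefined;
    var file_writer = file.writer(&buffer);
    const w = &file_writer.interface;
    try w.writeAll("hello, ");
    try w.writeAll("world\n"); // likely still sitting in `buffer`
    try w.flush(); // defaultFlush: drain repeatedly until `end == 0`
}
```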
/// Calls `VTable.drain` but hides the last `preserve_length` bytes from the
@@ -236,67 +294,6 @@ pub fn drainPreserve(w: *Writer, preserve_length: usize) Error!void {
@memmove(w.buffer[w.end..][0..preserved.len], preserved);
}
/// Forwards a `drain` to a second `Writer` instance. `w` is only used for its
/// buffer, but it has its `end` and `count` adjusted accordingly depending on
/// how much was consumed.
///
/// Returns how many bytes from `data` were consumed.
pub fn drainTo(noalias w: *Writer, noalias other: *Writer, data: []const []const u8, splat: usize) Error!usize {
assert(w != other);
const header = w.buffered();
const new_end = other.end + header.len;
if (new_end <= other.buffer.len) {
@memcpy(other.buffer[other.end..][0..header.len], header);
other.end = new_end;
other.count += header.len;
w.end = 0;
const n = try other.vtable.drain(other, data, splat);
other.count += n;
return n;
}
if (other.vtable == &VectorWrapper.vtable) {
const wrapper: *VectorWrapper = @fieldParentPtr("writer", w);
while (wrapper.it.next()) |dest| {
_ = dest;
@panic("TODO");
}
}
var vecs: [8][]const u8 = undefined; // Arbitrarily chosen size.
var i: usize = 1;
vecs[0] = header;
for (data) |buf| {
if (buf.len == 0) continue;
vecs[i] = buf;
i += 1;
if (vecs.len - i == 0) break;
}
const new_splat = if (vecs[i - 1].ptr == data[data.len - 1].ptr) splat else 1;
const n = try other.vtable.drain(other, vecs[0..i], new_splat);
other.count += n;
if (n < header.len) {
const remaining = w.buffer[n..w.end];
@memmove(w.buffer[0..remaining.len], remaining);
w.end = remaining.len;
return 0;
}
defer w.end = 0;
return n - header.len;
}
pub fn drainToLimit(
noalias w: *Writer,
noalias other: *Writer,
data: []const []const u8,
splat: usize,
limit: Limit,
) Error!usize {
assert(w != other);
_ = data;
_ = splat;
_ = limit;
@panic("TODO");
}
pub fn unusedCapacitySlice(w: *const Writer) []u8 {
return w.buffer[w.end..];
}
@@ -672,47 +669,25 @@ pub fn sendFile(w: *Writer, file_reader: *File.Reader, limit: Limit) FileError!u
return w.vtable.sendFile(w, file_reader, limit);
}
/// Forwards a `sendFile` to a second `Writer` instance. `w` is only used for
/// its buffer, but it has its `end` and `count` adjusted accordingly depending
/// on how much was consumed.
///
/// Returns how many bytes from `file_reader` were consumed.
pub fn sendFileTo(
noalias w: *Writer,
noalias other: *Writer,
/// Returns how many bytes from `header` and `file_reader` were consumed.
pub fn sendFileHeader(
w: *Writer,
header: []const u8,
file_reader: *File.Reader,
limit: Limit,
) FileError!usize {
assert(w != other);
const header = w.buffered();
const new_end = other.end + header.len;
if (new_end <= other.buffer.len) {
@memcpy(other.buffer[other.end..][0..header.len], header);
other.end = new_end;
other.count += header.len;
w.end = 0;
return other.vtable.sendFile(other, file_reader, limit);
const new_end = w.end + header.len;
if (new_end <= w.buffer.len) {
@memcpy(w.buffer[w.end..][0..header.len], header);
w.end = new_end;
w.count += header.len;
return header.len + try w.vtable.sendFile(w, file_reader, limit);
}
assert(header.len > 0);
var vec_buf: [2][]const u8 = .{ header, undefined };
var vec_i: usize = 1;
const buffered_contents = limit.slice(file_reader.interface.buffered());
if (buffered_contents.len > 0) {
vec_buf[vec_i] = buffered_contents;
vec_i += 1;
}
const n = try other.vtable.drain(other, vec_buf[0..vec_i], 1);
other.count += n;
if (n < header.len) {
const remaining = w.buffer[n..w.end];
@memmove(w.buffer[0..remaining.len], remaining);
w.end = remaining.len;
return 0;
}
w.end = 0;
const tossed = n - header.len;
file_reader.interface.toss(tossed);
return tossed;
const n = try w.vtable.drain(w, &.{ header, buffered_contents }, 1);
w.count += n;
file_reader.interface.toss(n - header.len);
return n;
}
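Because the interleaved view above is hard to read, here is the post-commit `sendFileHeader` reassembled from the added lines, as a best-effort reconstruction (whether the old `assert(header.len > 0)` survived is ambiguous in this view, so it is omitted):

```zig
/// Returns how many bytes from `header` and `file_reader` were consumed.
pub fn sendFileHeader(
    w: *Writer,
    header: []const u8,
    file_reader: *File.Reader,
    limit: Limit,
) FileError!usize {
    const new_end = w.end + header.len;
    if (new_end <= w.buffer.len) {
        // Fast path: the header fits in `buffer`, so buffer it and lower
        // to a plain `sendFile` on the same writer.
        @memcpy(w.buffer[w.end..][0..header.len], header);
        w.end = new_end;
        w.count += header.len;
        return header.len + try w.vtable.sendFile(w, file_reader, limit);
    }
    // Otherwise push the header and the reader's already-buffered bytes
    // through one `drain` call, then advance the reader past whatever
    // portion of its buffer was consumed.
    const buffered_contents = limit.slice(file_reader.interface.buffered());
    const n = try w.vtable.drain(w, &.{ header, buffered_contents }, 1);
    w.count += n;
    file_reader.interface.toss(n - header.len);
    return n;
}
```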
/// Asserts nonzero buffer capacity.
@@ -2126,6 +2101,7 @@ pub const Allocating = struct {
const vtable: VTable = .{
.drain = Allocating.drain,
.sendFile = Allocating.sendFile,
.flush = noopFlush,
};
pub fn deinit(a: *Allocating) void {
@@ -2172,7 +2148,6 @@ pub const Allocating = struct {
}
fn drain(w: *Writer, data: []const []const u8, splat: usize) Error!usize {
if (data.len == 0) return 0; // flush
const a: *Allocating = @fieldParentPtr("interface", w);
const gpa = a.allocator;
const pattern = data[data.len - 1];


@@ -1999,44 +1999,56 @@ pub const Stream = struct {
const w: *Writer = @fieldParentPtr("interface", io_w);
const buffered = io_w.buffered();
comptime assert(native_os == .windows);
var splat_buffer: [splat_buffer_len]u8 = undefined;
var iovecs: [max_buffers_len]windows.WSABUF = undefined;
var len: u32 = 0;
if (buffered.len != 0) {
iovecs[len] = .{
.base = buffered.ptr,
.buf = buffered.ptr,
.len = buffered.len,
};
len += 1;
}
for (data[0..data.len]) |bytes| {
for (data) |bytes| {
if (bytes.len == 0) continue;
iovecs[len] = .{
.buf = bytes.ptr,
.len = bytes.len,
};
len += 1;
if (iovecs.len - len == 0) break;
}
if (len == 0) return 0;
const pattern = data[data.len - 1];
switch (splat) {
0 => len -= 1,
0 => if (iovecs[len - 1].buf == data[data.len - 1].ptr) {
len -= 1;
},
1 => {},
else => switch (pattern.len) {
0 => {},
1 => {
1 => memset: {
// Replace the 1-byte buffer with a bigger one.
if (iovecs[len - 1].buf == data[data.len - 1].ptr) len -= 1;
if (iovecs.len - len == 0) break :memset;
const splat_buffer_candidate = io_w.buffer[io_w.end..];
var backup_buffer: [32]u8 = undefined;
const splat_buffer = if (splat_buffer_candidate.len >= backup_buffer.len)
splat_buffer_candidate
else
&backup_buffer;
const memset_len = @min(splat_buffer.len, splat);
const buf = splat_buffer[0..memset_len];
@memset(buf, pattern[0]);
iovecs[len - 1] = .{ .buf = buf.ptr, .len = buf.len };
iovecs[len] = .{ .buf = buf.ptr, .len = buf.len };
len += 1;
var remaining_splat = splat - buf.len;
while (remaining_splat > splat_buffer.len and len < iovecs.len) {
iovecs[len] = .{ .buf = &splat_buffer, .len = splat_buffer.len };
iovecs[len] = .{ .buf = splat_buffer.ptr, .len = splat_buffer.len };
remaining_splat -= splat_buffer.len;
len += 1;
}
if (remaining_splat > 0 and len < iovecs.len) {
iovecs[len] = .{ .buf = &splat_buffer, .len = remaining_splat };
if (remaining_splat > 0 and iovecs.len - len != 0) {
iovecs[len] = .{ .buf = splat_buffer.ptr, .len = remaining_splat };
len += 1;
}
},
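The memset trick above turns a 1-byte pattern splatted many times into a handful of larger iovecs rather than `splat` single-byte entries. An illustrative check of the arithmetic with the 32-byte backup buffer (hypothetical numbers, not from the commit):

```zig
const std = @import("std");

test "splat expansion with a 32-byte scratch buffer" {
    // pattern "x", splat = 100: one 32-byte memset vector, reused twice
    // more, plus a final 4-byte tail -- 4 iovecs instead of 100.
    const splat: usize = 100;
    const scratch_len: usize = 32;
    const memset_len = @min(scratch_len, splat); // memset happens once
    var remaining: usize = splat - memset_len; // 68
    var vec_count: usize = 1;
    while (remaining > scratch_len) {
        remaining -= scratch_len; // reuse the same scratch bytes
        vec_count += 1;
    }
    if (remaining > 0) vec_count += 1; // the short tail vector
    try std.testing.expectEqual(@as(usize, 4), vec_count);
    try std.testing.expectEqual(@as(usize, 4), remaining);
}
```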
@@ -2134,48 +2146,52 @@ pub const Stream = struct {
.flags = 0,
};
};
if (data.len != 0) {
const pattern = data[data.len - 1];
switch (splat) {
0 => if (msg.iovlen != 0 and iovecs[msg.iovlen - 1].base == data[data.len - 1].ptr) {
msg.iovlen -= 1;
},
1 => {},
else => switch (pattern.len) {
0 => {},
1 => memset: {
// Replace the 1-byte buffer with a bigger one.
if (msg.iovlen != 0 and iovecs[msg.iovlen - 1].base == data[data.len - 1].ptr)
msg.iovlen -= 1;
if (iovecs.len - msg.iovlen == 0) break :memset;
const splat_buffer = io_w.buffer[io_w.end..];
const memset_len = @min(splat_buffer.len, splat);
const buf = splat_buffer[0..memset_len];
@memset(buf, pattern[0]);
iovecs[msg.iovlen] = .{ .base = buf.ptr, .len = buf.len };
if (msg.iovlen == 0) return 0;
const pattern = data[data.len - 1];
switch (splat) {
0 => if (iovecs[msg.iovlen - 1].base == data[data.len - 1].ptr) {
msg.iovlen -= 1;
},
1 => {},
else => switch (pattern.len) {
0 => {},
1 => memset: {
// Replace the 1-byte buffer with a bigger one.
if (iovecs[msg.iovlen - 1].base == data[data.len - 1].ptr) msg.iovlen -= 1;
if (iovecs.len - msg.iovlen == 0) break :memset;
const splat_buffer_candidate = io_w.buffer[io_w.end..];
var backup_buffer: [32]u8 = undefined;
const splat_buffer = if (splat_buffer_candidate.len >= backup_buffer.len)
splat_buffer_candidate
else
&backup_buffer;
if (splat_buffer.len == 0) break :memset;
const memset_len = @min(splat_buffer.len, splat);
const buf = splat_buffer[0..memset_len];
@memset(buf, pattern[0]);
iovecs[msg.iovlen] = .{ .base = buf.ptr, .len = buf.len };
msg.iovlen += 1;
var remaining_splat = splat - buf.len;
while (remaining_splat > splat_buffer.len and iovecs.len - msg.iovlen != 0) {
assert(buf.len == splat_buffer.len);
iovecs[msg.iovlen] = .{ .base = splat_buffer.ptr, .len = splat_buffer.len };
msg.iovlen += 1;
var remaining_splat = splat - buf.len;
while (remaining_splat > splat_buffer.len and iovecs.len - msg.iovlen != 0) {
assert(buf.len == splat_buffer.len);
iovecs[msg.iovlen] = .{ .base = splat_buffer.ptr, .len = splat_buffer.len };
msg.iovlen += 1;
remaining_splat -= splat_buffer.len;
}
if (remaining_splat > 0 and iovecs.len - msg.iovlen != 0) {
iovecs[msg.iovlen] = .{ .base = splat_buffer.ptr, .len = remaining_splat };
msg.iovlen += 1;
}
},
else => for (0..splat - 1) |_| {
if (iovecs.len - msg.iovlen == 0) break;
iovecs[msg.iovlen] = .{
.base = pattern.ptr,
.len = pattern.len,
};
remaining_splat -= splat_buffer.len;
}
if (remaining_splat > 0 and iovecs.len - msg.iovlen != 0) {
iovecs[msg.iovlen] = .{ .base = splat_buffer.ptr, .len = remaining_splat };
msg.iovlen += 1;
},
}
},
}
else => for (0..splat - 1) |_| {
if (iovecs.len - msg.iovlen == 0) break;
iovecs[msg.iovlen] = .{
.base = pattern.ptr,
.len = pattern.len,
};
msg.iovlen += 1;
},
},
}
const flags = posix.MSG.NOSIGNAL;
return io_w.consume(std.posix.sendmsg(w.file_writer.file.handle, &msg, flags) catch |err| {
@@ -2186,7 +2202,8 @@ pub const Stream = struct {
fn sendFile(io_w: *io.Writer, file_reader: *File.Reader, limit: io.Limit) io.Writer.FileError!usize {
const w: *Writer = @fieldParentPtr("interface", io_w);
return io_w.sendFileTo(&w.file_writer.interface, file_reader, limit);
const n = try w.file_writer.interface.sendFileHeader(io_w.buffered(), file_reader, limit);
return io_w.consume(n);
}
},
};