std.http compiling again

This commit is contained in:
Andrew Kelley 2025-06-26 13:09:08 -07:00
parent d4545f216a
commit 9f8486170c
13 changed files with 290 additions and 237 deletions

View File

@ -2061,7 +2061,7 @@ pub fn runAllowFail(
try child.spawn();
var file_reader = child.stdout.?.readerStreaming();
const stdout = try file_reader.interface().readRemainingAlloc(b.allocator, .limited(max_output_size));
const stdout = try file_reader.interface().allocRemaining(b.allocator, .limited(max_output_size));
errdefer b.allocator.free(stdout);
const term = try child.wait();

View File

@ -663,7 +663,7 @@ pub const Manifest = struct {
const input_file_count = self.files.entries.len;
var manifest_reader = self.manifest_file.?.reader(); // Reads positionally from zero.
const limit: std.io.Limit = .limited(manifest_file_size_max);
const file_contents = manifest_reader.interface().readRemainingAlloc(gpa, limit) catch |err| switch (err) {
const file_contents = manifest_reader.interface().allocRemaining(gpa, limit) catch |err| switch (err) {
error.OutOfMemory => return error.OutOfMemory,
error.StreamTooLong => return error.OutOfMemory,
error.ReadFailed => {

View File

@ -1788,7 +1788,7 @@ fn evalGeneric(run: *Run, child: *std.process.Child) !StdIoResult {
stderr_bytes = poller.reader(.stderr).buffered();
} else {
var fr = stdout.readerStreaming();
stdout_bytes = fr.interface().readRemainingAlloc(arena, run.stdio_limit) catch |err| switch (err) {
stdout_bytes = fr.interface().allocRemaining(arena, run.stdio_limit) catch |err| switch (err) {
error.OutOfMemory => return error.OutOfMemory,
error.ReadFailed => return fr.err.?,
error.StreamTooLong => return error.StdoutStreamTooLong,
@ -1796,7 +1796,7 @@ fn evalGeneric(run: *Run, child: *std.process.Child) !StdIoResult {
}
} else if (child.stderr) |stderr| {
var fr = stderr.readerStreaming();
stderr_bytes = fr.interface().readRemainingAlloc(arena, run.stdio_limit) catch |err| switch (err) {
stderr_bytes = fr.interface().allocRemaining(arena, run.stdio_limit) catch |err| switch (err) {
error.OutOfMemory => return error.OutOfMemory,
error.ReadFailed => return fr.err.?,
error.StreamTooLong => return error.StderrStreamTooLong,

View File

@ -465,11 +465,11 @@ fn merge_paths(base: Component, new: []u8, aux_buf: *[]u8) error{NoSpaceLeft}!Co
var aux: Writer = .fixed(aux_buf.*);
if (!base.isEmpty()) {
aux.print("{fpath}", .{base}) catch return error.NoSpaceLeft;
aux.end = std.mem.lastIndexOfScalar(u8, aux.getWritten(), '/') orelse
aux.end = std.mem.lastIndexOfScalar(u8, aux.buffered(), '/') orelse
return remove_dot_segments(new);
}
aux.print("/{s}", .{new}) catch return error.NoSpaceLeft;
const merged_path = remove_dot_segments(aux.getWritten());
const merged_path = remove_dot_segments(aux.buffered());
aux_buf.* = aux_buf.*[merged_path.percent_encoded.len..];
return merged_path;
}

View File

@ -8,7 +8,7 @@ const Writer = std.io.Writer;
const Reader = std.io.Reader;
input: *Reader,
interface: Reader,
reader: Reader,
/// Hashes, produces checksum, of uncompressed data for gzip/zlib footer.
hasher: Container.Hasher,
@ -51,7 +51,7 @@ pub const Error = Container.Error || error{
pub fn init(input: *Reader, container: Container, buffer: []u8) Decompress {
return .{
.interface = .{
.reader = .{
// TODO populate discard so that when an amount is discarded that
// includes an entire frame, skip decoding that frame.
.vtable = &.{ .stream = stream },
@ -130,7 +130,7 @@ fn decodeSymbol(self: *Decompress, decoder: anytype) !Symbol {
}
pub fn stream(r: *Reader, w: *Writer, limit: std.io.Limit) Reader.StreamError!usize {
const d: *Decompress = @alignCast(@fieldParentPtr("interface", r));
const d: *Decompress = @alignCast(@fieldParentPtr("reader", r));
return readInner(d, w, limit) catch |err| switch (err) {
error.EndOfStream => return error.EndOfStream,
error.WriteFailed => return error.WriteFailed,
@ -247,7 +247,7 @@ fn readInner(d: *Decompress, w: *Writer, limit: std.io.Limit) (Error || Reader.S
}
},
.stored_block => |remaining_len| {
const out = try w.writableSliceGreedyPreserving(flate.history_len, 1);
const out = try w.writableSliceGreedyPreserve(flate.history_len, 1);
const limited_out = limit.min(.limited(remaining_len)).slice(out);
const n = try d.input.readVec(&.{limited_out});
if (remaining_len - n == 0) {
@ -263,7 +263,7 @@ fn readInner(d: *Decompress, w: *Writer, limit: std.io.Limit) (Error || Reader.S
while (@intFromEnum(limit) > w.count - start) {
const code = try d.readFixedCode();
switch (code) {
0...255 => try w.writeBytePreserving(flate.history_len, @intCast(code)),
0...255 => try w.writeBytePreserve(flate.history_len, @intCast(code)),
256 => {
d.state = if (d.final_block) .protocol_footer else .block_header;
return w.count - start;
@ -289,7 +289,7 @@ fn readInner(d: *Decompress, w: *Writer, limit: std.io.Limit) (Error || Reader.S
const sym = try d.decodeSymbol(&d.lit_dec);
switch (sym.kind) {
.literal => try w.writeBytePreserving(flate.history_len, sym.symbol),
.literal => try w.writeBytePreserve(flate.history_len, sym.symbol),
.match => {
// Decode match backreference <length, distance>
const length = try d.decodeLength(sym.symbol);

View File

@ -7,7 +7,7 @@ const zstd = @import("../zstd.zig");
const Writer = std.io.Writer;
input: *Reader,
interface: Reader,
reader: Reader,
state: State,
verify_checksum: bool,
err: ?Error = null,
@ -68,7 +68,7 @@ pub fn init(input: *Reader, buffer: []u8, options: Options) Decompress {
.input = input,
.state = .new_frame,
.verify_checksum = options.verify_checksum,
.interface = .{
.reader = .{
.vtable = &.{ .stream = stream },
.buffer = buffer,
.seek = 0,
@ -78,7 +78,7 @@ pub fn init(input: *Reader, buffer: []u8, options: Options) Decompress {
}
fn stream(r: *Reader, w: *Writer, limit: Limit) Reader.StreamError!usize {
const d: *Decompress = @alignCast(@fieldParentPtr("interface", r));
const d: *Decompress = @alignCast(@fieldParentPtr("reader", r));
const in = d.input;
switch (d.state) {

View File

@ -2242,7 +2242,7 @@ pub const ElfModule = struct {
var zlib_stream: std.compress.flate.Decompress = .init(&section_reader, .zlib, &.{});
const decompressed_section = zlib_stream.interface.allocRemaining(gpa, .limited(ch_size)) catch
const decompressed_section = zlib_stream.reader.allocRemaining(gpa, .limited(ch_size)) catch
continue;
if (decompressed_section.len != ch_size) {
gpa.free(decompressed_section);

View File

@ -328,6 +328,9 @@ pub const Header = struct {
pub const Reader = struct {
in: *std.io.Reader,
/// This is preallocated memory that might be used by `bodyReader`. That
/// function might return a pointer to this field, or a different
/// `*std.io.Reader`. Advisable to not access this field directly.
interface: std.io.Reader,
/// Keeps track of whether the stream is ready to accept a new request,
/// making invalid API usage cause assertion failures rather than HTTP
@ -489,31 +492,31 @@ pub const Reader = struct {
content_encoding: ContentEncoding,
decompressor: *Decompressor,
decompression_buffer: []u8,
) std.io.Reader {
) *std.io.Reader {
if (transfer_encoding == .none and content_length == null) {
assert(reader.state == .received_head);
reader.state = .body_none;
switch (content_encoding) {
.identity => {
return reader.in.reader();
return reader.in;
},
.deflate => {
decompressor.compression = .{ .deflate = .init(reader.in) };
return decompressor.compression.deflate.reader();
decompressor.* = .{ .flate = .init(reader.in, .raw, decompression_buffer) };
return &decompressor.flate.reader;
},
.gzip => {
decompressor.compression = .{ .gzip = .init(reader.in) };
return decompressor.compression.gzip.reader();
decompressor.* = .{ .flate = .init(reader.in, .gzip, decompression_buffer) };
return &decompressor.flate.reader;
},
.zstd => {
decompressor.compression = .{ .zstd = .init(reader.in, .{ .verify_checksum = false }) };
return decompressor.compression.zstd.reader();
decompressor.* = .{ .zstd = .init(reader.in, decompression_buffer, .{ .verify_checksum = false }) };
return &decompressor.zstd.reader;
},
.compress => unreachable,
}
}
const transfer_reader = bodyReader(reader, transfer_encoding, content_length);
return decompressor.reader(transfer_reader, decompression_buffer, content_encoding);
const transfer_reader = bodyReader(reader, &.{}, transfer_encoding, content_length);
return decompressor.init(transfer_reader, decompression_buffer, content_encoding);
}
fn contentLengthStream(
@ -711,42 +714,33 @@ pub const Reader = struct {
}
};
pub const Decompressor = struct {
compression: Compression,
buffered_reader: std.io.Reader,
pub const Decompressor = union(enum) {
flate: std.compress.flate.Decompress,
zstd: std.compress.zstd.Decompress,
none: *std.io.Reader,
pub const Compression = union(enum) {
deflate: std.compress.flate.Decompressor,
gzip: std.compress.flate.Decompressor,
zstd: std.compress.zstd.Decompress,
none: void,
};
pub fn reader(
pub fn init(
decompressor: *Decompressor,
transfer_reader: std.io.Reader,
transfer_reader: *std.io.Reader,
buffer: []u8,
content_encoding: ContentEncoding,
) std.io.Reader {
) *std.io.Reader {
switch (content_encoding) {
.identity => {
decompressor.compression = .none;
decompressor.* = .{ .none = transfer_reader };
return transfer_reader;
},
.deflate => {
decompressor.buffered_reader = transfer_reader.buffered(buffer);
decompressor.compression = .{ .deflate = .init(&decompressor.buffered_reader) };
return decompressor.compression.deflate.reader();
decompressor.* = .{ .flate = .init(transfer_reader, .raw, buffer) };
return &decompressor.flate.reader;
},
.gzip => {
decompressor.buffered_reader = transfer_reader.buffered(buffer);
decompressor.compression = .{ .gzip = .init(&decompressor.buffered_reader) };
return decompressor.compression.gzip.reader();
decompressor.* = .{ .flate = .init(transfer_reader, .gzip, buffer) };
return &decompressor.flate.reader;
},
.zstd => {
decompressor.buffered_reader = transfer_reader.buffered(buffer);
decompressor.compression = .{ .zstd = .init(&decompressor.buffered_reader, .{}) };
return decompressor.compression.gzip.reader();
decompressor.* = .{ .zstd = .init(transfer_reader, buffer, .{ .verify_checksum = false }) };
return &decompressor.zstd.reader;
},
.compress => unreachable,
}
@ -759,7 +753,7 @@ pub const BodyWriter = struct {
/// state of this other than via methods of `BodyWriter`.
http_protocol_output: *Writer,
state: State,
interface: Writer,
writer: Writer,
pub const Error = Writer.Error;
@ -797,7 +791,7 @@ pub const BodyWriter = struct {
};
pub fn isEliding(w: *const BodyWriter) bool {
return w.interface.vtable.drain == Writer.discardingDrain;
return w.writer.vtable.drain == Writer.discardingDrain;
}
/// Sends all buffered data across `BodyWriter.http_protocol_output`.
@ -924,42 +918,42 @@ pub const BodyWriter = struct {
w.state = .end;
}
fn contentLengthDrain(w: *Writer, data: []const []const u8, splat: usize) Error!usize {
const bw: *BodyWriter = @fieldParentPtr("interface", w);
pub fn contentLengthDrain(w: *Writer, data: []const []const u8, splat: usize) Error!usize {
const bw: *BodyWriter = @fieldParentPtr("writer", w);
assert(!bw.isEliding());
const out = w.http_protocol_output;
const out = bw.http_protocol_output;
const n = try w.drainTo(out, data, splat);
w.state.content_length -= n;
bw.state.content_length -= n;
return n;
}
fn noneDrain(w: *Writer, data: []const []const u8, splat: usize) Error!usize {
const bw: *BodyWriter = @fieldParentPtr("interface", w);
pub fn noneDrain(w: *Writer, data: []const []const u8, splat: usize) Error!usize {
const bw: *BodyWriter = @fieldParentPtr("writer", w);
assert(!bw.isEliding());
const out = w.http_protocol_output;
const out = bw.http_protocol_output;
return try w.drainTo(out, data, splat);
}
/// Returns `null` if size cannot be computed without making any syscalls.
fn noneSendFile(w: *Writer, file_reader: *File.Reader, limit: std.io.Limit) Writer.FileError!usize {
const bw: *BodyWriter = @fieldParentPtr("interface", w);
pub fn noneSendFile(w: *Writer, file_reader: *File.Reader, limit: std.io.Limit) Writer.FileError!usize {
const bw: *BodyWriter = @fieldParentPtr("writer", w);
assert(!bw.isEliding());
return w.sendFileTo(bw.http_protocol_output, file_reader, limit);
}
fn contentLengthSendFile(w: *Writer, file_reader: *File.Reader, limit: std.io.Limit) Writer.FileError!usize {
const bw: *BodyWriter = @fieldParentPtr("interface", w);
pub fn contentLengthSendFile(w: *Writer, file_reader: *File.Reader, limit: std.io.Limit) Writer.FileError!usize {
const bw: *BodyWriter = @fieldParentPtr("writer", w);
assert(!bw.isEliding());
const n = try w.sendFileTo(bw.http_protocol_output, file_reader, limit);
bw.state.content_length -= n;
return n;
}
fn chunkedSendFile(w: *Writer, file_reader: *File.Reader, limit: std.io.Limit) Writer.FileError!usize {
const bw: *BodyWriter = @fieldParentPtr("interface", w);
pub fn chunkedSendFile(w: *Writer, file_reader: *File.Reader, limit: std.io.Limit) Writer.FileError!usize {
const bw: *BodyWriter = @fieldParentPtr("writer", w);
assert(!bw.isEliding());
const data_len = w.countSendFileUpperBound(file_reader, limit) orelse {
// If the file size is unknown, we cannot lower to a `writeFile` since we would
const data_len = if (file_reader.getSize()) |x| w.end + x else |_| {
// If the file size is unknown, we cannot lower to a `sendFile` since we would
// have to flush the chunk header before knowing the chunk length.
return error.Unimplemented;
};
@ -1003,10 +997,10 @@ pub const BodyWriter = struct {
}
}
fn chunkedDrain(w: *Writer, data: []const []const u8, splat: usize) Error!usize {
const bw: *BodyWriter = @fieldParentPtr("interface", w);
pub fn chunkedDrain(w: *Writer, data: []const []const u8, splat: usize) Error!usize {
const bw: *BodyWriter = @fieldParentPtr("writer", w);
assert(!bw.isEliding());
const out = w.http_protocol_output;
const out = bw.http_protocol_output;
const data_len = Writer.countSplat(w.end, data, splat);
const chunked = &bw.state.chunked;
state: switch (chunked.*) {
@ -1018,7 +1012,7 @@ pub const BodyWriter = struct {
const buffered_len = out.end - offset - chunk_header_template.len;
const chunk_len = data_len + buffered_len;
writeHex(out.buffer[offset..][0..chunk_len_digits], chunk_len);
const n = try w.drainTo(w, data, splat);
const n = try w.drainTo(out, data, splat);
chunked.* = .{ .chunk_len = data_len + 2 - n };
return n;
},
@ -1041,7 +1035,7 @@ pub const BodyWriter = struct {
continue :l 1;
},
else => {
const n = try w.drainToLimit(data, splat, .limited(chunk_len - 2));
const n = try w.drainToLimit(out, data, splat, .limited(chunk_len - 2));
chunked.chunk_len = chunk_len - n;
return n;
},

View File

@ -408,7 +408,7 @@ pub const Connection = struct {
/// HTTP protocol from server to client.
/// This either comes directly from `stream_reader`, or from a TLS client.
pub fn reader(c: *const Connection) *Reader {
pub fn reader(c: *Connection) *Reader {
return switch (c.protocol) {
.tls => {
if (disable_tls) unreachable;
@ -682,7 +682,7 @@ pub const Response = struct {
///
/// See also:
/// * `readerDecompressing`
pub fn reader(response: *Response, buffer: []u8) Reader {
pub fn reader(response: *Response, buffer: []u8) *Reader {
const req = response.request;
if (!req.method.responseHasBody()) return .ending;
const head = &response.head;
@ -702,7 +702,7 @@ pub const Response = struct {
response: *Response,
decompressor: *http.Decompressor,
decompression_buffer: []u8,
) Reader {
) *Reader {
const head = &response.head;
return response.request.reader.bodyReaderDecompressing(
head.transfer_encoding,
@ -864,14 +864,14 @@ pub const Request = struct {
pub fn sendBodyUnflushed(r: *Request, buffer: []u8) Writer.Error!http.BodyWriter {
assert(r.method.requestHasBody());
try sendHead(r);
const http_protocol_output = &r.connection.?.writer;
const http_protocol_output = r.connection.?.writer();
return switch (r.transfer_encoding) {
.chunked => .{
.http_protocol_output = http_protocol_output,
.state = .{ .chunked = .init },
.interface = .{
.writer = .{
.buffer = buffer,
.interface = &.{
.vtable = &.{
.drain = http.BodyWriter.chunkedDrain,
.sendFile = http.BodyWriter.chunkedSendFile,
},
@ -880,9 +880,9 @@ pub const Request = struct {
.content_length => |len| .{
.http_protocol_output = http_protocol_output,
.state = .{ .content_length = len },
.interface = .{
.writer = .{
.buffer = buffer,
.interface = &.{
.vtable = &.{
.drain = http.BodyWriter.contentLengthDrain,
.sendFile = http.BodyWriter.contentLengthSendFile,
},
@ -891,9 +891,9 @@ pub const Request = struct {
.none => .{
.http_protocol_output = http_protocol_output,
.state = .none,
.interface = .{
.writer = .{
.buffer = buffer,
.interface = &.{
.vtable = &.{
.drain = http.BodyWriter.noneDrain,
.sendFile = http.BodyWriter.noneSendFile,
},
@ -906,7 +906,7 @@ pub const Request = struct {
fn sendHead(r: *Request) Writer.Error!void {
const uri = r.uri;
const connection = r.connection.?;
const w = &connection.writer;
const w = connection.writer();
try r.method.write(w);
try w.writeByte(' ');
@ -1085,7 +1085,7 @@ pub const Request = struct {
if (head.status.class() == .redirect and r.redirect_behavior != .unhandled) {
if (r.redirect_behavior == .not_allowed) {
// Connection can still be reused by skipping the body.
var reader = r.reader.bodyReader(head.transfer_encoding, head.content_length);
const reader = r.reader.bodyReader(&.{}, head.transfer_encoding, head.content_length);
_ = reader.discardRemaining() catch |err| switch (err) {
error.ReadFailed => connection.closing = true,
};
@ -1117,7 +1117,7 @@ pub const Request = struct {
{
// Skip the body of the redirect response to leave the connection in
// the correct state. This causes `new_location` to be invalidated.
var reader = r.reader.bodyReader(head.transfer_encoding, head.content_length);
const reader = r.reader.bodyReader(&.{}, head.transfer_encoding, head.content_length);
_ = reader.discardRemaining() catch |err| switch (err) {
error.ReadFailed => return r.reader.body_err.?,
};
@ -1169,8 +1169,10 @@ pub const Request = struct {
r.uri = new_uri;
r.connection = new_connection;
r.reader = .{
.in = &new_connection.reader,
.in = new_connection.reader(),
.state = .ready,
// Populated when `http.Reader.bodyReader` is called.
.interface = undefined,
};
r.redirect_behavior.subtractOne();
}
@ -1292,12 +1294,12 @@ pub const basic_authorization = struct {
pub fn write(uri: Uri, out: *Writer) Writer.Error!void {
var buf: [max_user_len + ":".len + max_password_len]u8 = undefined;
var bw: Writer = .fixed(&buf);
bw.print("{fuser}:{fpassword}", .{
var w: Writer = .fixed(&buf);
w.print("{fuser}:{fpassword}", .{
uri.user orelse Uri.Component.empty,
uri.password orelse Uri.Component.empty,
}) catch unreachable;
try out.print("Basic {b64}", .{bw.getWritten()});
try out.print("Basic {b64}", .{w.buffered()});
}
};
@ -1622,8 +1624,10 @@ pub fn request(
.client = client,
.connection = connection,
.reader = .{
.in = &connection.reader,
.in = connection.reader(),
.state = .ready,
// Populated when `http.Reader.bodyReader` is called.
.interface = undefined,
},
.keep_alive = options.keep_alive,
.method = method,
@ -1711,9 +1715,8 @@ pub fn fetch(client: *Client, options: FetchOptions) FetchError!FetchResult {
if (options.payload) |payload| {
req.transfer_encoding = .{ .content_length = payload.len };
var body = try req.sendBody();
var bw = body.writer().unbuffered();
try bw.writeAll(payload);
var body = try req.sendBody(&.{});
try body.writer.writeAll(payload);
try body.end();
} else {
try req.sendBodiless();
@ -1726,7 +1729,7 @@ pub fn fetch(client: *Client, options: FetchOptions) FetchError!FetchResult {
var response = try req.receiveHead(redirect_buffer);
const storage = options.response_storage orelse {
var reader = response.reader();
const reader = response.reader(&.{});
_ = reader.discardRemaining() catch |err| switch (err) {
error.ReadFailed => return response.bodyErr().?,
};
@ -1741,18 +1744,17 @@ pub fn fetch(client: *Client, options: FetchOptions) FetchError!FetchResult {
defer if (options.decompress_buffer == null) client.allocator.free(decompress_buffer);
var decompressor: http.Decompressor = undefined;
var reader = response.readerDecompressing(&decompressor, decompress_buffer);
const reader = response.readerDecompressing(&decompressor, decompress_buffer);
const list = storage.list;
if (storage.allocator) |allocator| {
reader.readRemainingArrayList(allocator, null, list, storage.append_limit, 128) catch |err| switch (err) {
reader.appendRemaining(allocator, null, list, storage.append_limit) catch |err| switch (err) {
error.ReadFailed => return response.bodyErr().?,
else => |e| return e,
};
} else {
var br = reader.unbuffered();
const buf = storage.append_limit.slice(list.unusedCapacitySlice());
list.items.len += br.readSliceShort(buf) catch |err| switch (err) {
list.items.len += reader.readSliceShort(buf) catch |err| switch (err) {
error.ReadFailed => return response.bodyErr().?,
};
}

View File

@ -26,6 +26,8 @@ pub fn init(in: *std.io.Reader, out: *Writer) Server {
.reader = .{
.in = in,
.state = .ready,
// Populated when `http.Reader.bodyReader` is called.
.interface = undefined,
},
.out = out,
};
@ -58,7 +60,7 @@ pub const Request = struct {
/// Pointers in this struct are invalidated with the next call to
/// `receiveHead`.
head: Head,
respond_err: ?RespondError,
respond_err: ?RespondError = null,
pub const RespondError = error{
/// The request contained an `expect` header with an unrecognized value.
@ -243,6 +245,7 @@ pub const Request = struct {
.in = undefined,
.state = .received_head,
.head_buffer = @constCast(request_bytes),
.interface = undefined,
},
.out = undefined,
};
@ -381,8 +384,6 @@ pub const Request = struct {
content_length: ?u64 = null,
/// Options that are shared with the `respond` method.
respond_options: RespondOptions = .{},
/// Used by `http.BodyWriter`.
buffer: []u8,
};
/// The header is not guaranteed to be sent until `BodyWriter.flush` or
@ -400,7 +401,11 @@ pub const Request = struct {
/// be done to satisfy the request.
///
/// Asserts status is not `continue`.
pub fn respondStreaming(request: *Request, options: RespondStreamingOptions) Writer.Error!http.BodyWriter {
pub fn respondStreaming(
request: *Request,
buffer: []u8,
options: RespondStreamingOptions,
) ExpectContinueError!http.BodyWriter {
try writeExpectContinue(request);
const o = options.respond_options;
assert(o.status != .@"continue");
@ -448,12 +453,12 @@ pub const Request = struct {
return if (elide_body) .{
.http_protocol_output = request.server.out,
.state = state,
.interface = .discarding(options.buffer),
.writer = .discarding(buffer),
} else .{
.http_protocol_output = request.server.out,
.state = state,
.interface = .{
.buffer = options.buffer,
.writer = .{
.buffer = buffer,
.vtable = switch (state) {
.none => &.{
.drain = http.BodyWriter.noneDrain,
@ -559,11 +564,11 @@ pub const Request = struct {
///
/// See `readerExpectNone` for an infallible alternative that cannot write
/// to the server output stream.
pub fn readerExpectContinue(request: *Request) ExpectContinueError!std.io.Reader {
pub fn readerExpectContinue(request: *Request, buffer: []u8) ExpectContinueError!*std.io.Reader {
const flush = request.head.expect != null;
try writeExpectContinue(request);
if (flush) try request.server.out.flush();
return readerExpectNone(request);
return readerExpectNone(request, buffer);
}
/// Asserts the expect header is `null`. The caller must handle the
@ -571,11 +576,11 @@ pub const Request = struct {
/// this function.
///
/// Asserts that this function is only called once.
pub fn readerExpectNone(request: *Request) std.io.Reader {
pub fn readerExpectNone(request: *Request, buffer: []u8) *std.io.Reader {
assert(request.server.reader.state == .received_head);
assert(request.head.expect == null);
if (!request.head.method.requestHasBody()) return .ending;
return request.server.reader.bodyReader(request.head.transfer_encoding, request.head.content_length);
return request.server.reader.bodyReader(buffer, request.head.transfer_encoding, request.head.content_length);
}
pub const ExpectContinueError = error{
@ -611,7 +616,7 @@ pub const Request = struct {
.received_head => {
if (request.head.method.requestHasBody()) {
assert(request.head.transfer_encoding != .none or request.head.content_length != null);
const reader_interface = request.reader() catch return false;
const reader_interface = request.readerExpectContinue(&.{}) catch return false;
_ = reader_interface.discardRemaining() catch return false;
assert(r.state == .ready);
} else {

View File

@ -21,7 +21,7 @@ test "trailers" {
var connection_br = connection.stream.reader(&recv_buffer);
var connection_bw = connection.stream.writer(&send_buffer);
var server = http.Server.init(&connection_br, &connection_bw);
var server = http.Server.init(connection_br.interface(), &connection_bw.interface);
try expectEqual(.ready, server.reader.state);
var request = try server.receiveHead();
@ -33,11 +33,10 @@ test "trailers" {
fn serve(request: *http.Server.Request) !void {
try expectEqualStrings(request.head.target, "/trailer");
var response = try request.respondStreaming(.{});
var bw = response.writer().unbuffered();
try bw.writeAll("Hello, ");
var response = try request.respondStreaming(&.{}, .{});
try response.writer.writeAll("Hello, ");
try response.flush();
try bw.writeAll("World!\n");
try response.writer.writeAll("World!\n");
try response.flush();
try response.endChunked(.{
.trailers = &.{
@ -66,7 +65,7 @@ test "trailers" {
try req.sendBodiless();
var response = try req.receiveHead(&.{});
const body = try response.reader().readRemainingAlloc(gpa, .limited(8192));
const body = try response.reader(&.{}).allocRemaining(gpa, .limited(8192));
defer gpa.free(body);
try expectEqualStrings("Hello, World!\n", body);
@ -104,13 +103,13 @@ test "HTTP server handles a chunked transfer coding request" {
var connection_br = connection.stream.reader(&recv_buffer);
var connection_bw = connection.stream.writer(&send_buffer);
var server = http.Server.init(&connection_br, &connection_bw);
var server = http.Server.init(connection_br.interface(), &connection_bw.interface);
var request = try server.receiveHead();
try expect(request.head.transfer_encoding == .chunked);
var buf: [128]u8 = undefined;
var br = (try request.reader()).unbuffered();
var br = try request.readerExpectContinue(&.{});
const n = try br.readSliceShort(&buf);
try expectEqualStrings("ABCD", buf[0..n]);
@ -141,9 +140,8 @@ test "HTTP server handles a chunked transfer coding request" {
const gpa = std.testing.allocator;
const stream = try std.net.tcpConnectToHost(gpa, "127.0.0.1", test_server.port());
defer stream.close();
var stream_writer = stream.writer();
var writer = stream_writer.interface().unbuffered();
try writer.writeAll(request_bytes);
var stream_writer = stream.writer(&.{});
try stream_writer.interface.writeAll(request_bytes);
const expected_response =
"HTTP/1.1 200 OK\r\n" ++
@ -152,8 +150,8 @@ test "HTTP server handles a chunked transfer coding request" {
"content-type: text/plain\r\n" ++
"\r\n" ++
"message from server!\n";
var stream_reader = stream.reader();
const response = try stream_reader.interface().readRemainingAlloc(gpa, .limited(expected_response.len));
var stream_reader = stream.reader(&.{});
const response = try stream_reader.interface().allocRemaining(gpa, .limited(expected_response.len));
defer gpa.free(response);
try expectEqualStrings(expected_response, response);
}
@ -169,11 +167,9 @@ test "echo content server" {
const connection = try net_server.accept();
defer connection.stream.close();
var stream_reader = connection.stream.reader();
var stream_writer = connection.stream.writer();
var connection_br = stream_reader.interface().buffered(&recv_buffer);
var connection_bw = stream_writer.interface().buffered(&send_buffer);
var http_server = http.Server.init(&connection_br, &connection_bw);
var connection_br = connection.stream.reader(&recv_buffer);
var connection_bw = connection.stream.writer(&send_buffer);
var http_server = http.Server.init(connection_br.interface(), &connection_bw.interface);
while (http_server.reader.state == .ready) {
var request = http_server.receiveHead() catch |err| switch (err) {
@ -185,7 +181,7 @@ test "echo content server" {
}
if (request.head.expect) |expect_header_value| {
if (mem.eql(u8, expect_header_value, "garbage")) {
try expectError(error.HttpExpectationFailed, request.reader());
try expectError(error.HttpExpectationFailed, request.readerExpectContinue(&.{}));
try request.respond("", .{ .keep_alive = false });
continue;
}
@ -207,14 +203,14 @@ test "echo content server" {
// request.head.target,
//});
const body = try (try request.reader()).readRemainingAlloc(std.testing.allocator, .limited(8192));
const body = try (try request.readerExpectContinue(&.{})).allocRemaining(std.testing.allocator, .limited(8192));
defer std.testing.allocator.free(body);
try expect(mem.startsWith(u8, request.head.target, "/echo-content"));
try expectEqualStrings("Hello, World!\n", body);
try expectEqualStrings("text/plain", request.head.content_type.?);
var response = try request.respondStreaming(.{
var response = try request.respondStreaming(&.{}, .{
.content_length = switch (request.head.transfer_encoding) {
.chunked => null,
.none => len: {
@ -224,9 +220,9 @@ test "echo content server" {
},
});
try response.flush(); // Test an early flush to send the HTTP headers before the body.
var bw = response.writer().unbuffered();
try bw.writeAll("Hello, ");
try bw.writeAll("World!\n");
const w = &response.writer;
try w.writeAll("Hello, ");
try w.writeAll("World!\n");
try response.end();
//std.debug.print(" server finished responding\n", .{});
}
@ -259,27 +255,25 @@ test "Server.Request.respondStreaming non-chunked, unknown content-length" {
const connection = try net_server.accept();
defer connection.stream.close();
var stream_reader = connection.stream.reader();
var stream_writer = connection.stream.writer();
var connection_br = stream_reader.interface().buffered(&recv_buffer);
var connection_bw = stream_writer.interface().buffered(&send_buffer);
var server = http.Server.init(&connection_br, &connection_bw);
var connection_br = connection.stream.reader(&recv_buffer);
var connection_bw = connection.stream.writer(&send_buffer);
var server = http.Server.init(connection_br.interface(), &connection_bw.interface);
try expectEqual(.ready, server.reader.state);
var request = try server.receiveHead();
try expectEqualStrings(request.head.target, "/foo");
var response = try request.respondStreaming(.{
var buf: [30]u8 = undefined;
var response = try request.respondStreaming(&buf, .{
.respond_options = .{
.transfer_encoding = .none,
},
});
var buf: [30]u8 = undefined;
var bw = response.writer().buffered(&buf);
const w = &response.writer;
for (0..500) |i| {
try bw.print("{d}, ah ha ha!\n", .{i});
try w.print("{d}, ah ha ha!\n", .{i});
}
try expectEqual(7390, bw.count);
try bw.flush();
try expectEqual(7390, w.count);
try w.flush();
try response.end();
try expectEqual(.closing, server.reader.state);
}
@ -291,12 +285,11 @@ test "Server.Request.respondStreaming non-chunked, unknown content-length" {
const gpa = std.testing.allocator;
const stream = try std.net.tcpConnectToHost(gpa, "127.0.0.1", test_server.port());
defer stream.close();
var stream_writer = stream.writer();
var writer = stream_writer.interface().unbuffered();
try writer.writeAll(request_bytes);
var stream_writer = stream.writer(&.{});
try stream_writer.interface.writeAll(request_bytes);
var stream_reader = stream.reader();
const response = try stream_reader.interface().readRemainingAlloc(gpa, .limited(8192));
var stream_reader = stream.reader(&.{});
const response = try stream_reader.interface().allocRemaining(gpa, .limited(8192));
defer gpa.free(response);
var expected_response = std.ArrayList(u8).init(gpa);
@ -329,11 +322,9 @@ test "receiving arbitrary http headers from the client" {
const connection = try net_server.accept();
defer connection.stream.close();
var stream_reader = connection.stream.reader();
var stream_writer = connection.stream.writer();
var connection_br = stream_reader.interface().buffered(&recv_buffer);
var connection_bw = stream_writer.interface().buffered(&send_buffer);
var server = http.Server.init(&connection_br, &connection_bw);
var connection_br = connection.stream.reader(&recv_buffer);
var connection_bw = connection.stream.writer(&send_buffer);
var server = http.Server.init(connection_br.interface(), &connection_bw.interface);
try expectEqual(.ready, server.reader.state);
var request = try server.receiveHead();
@ -364,12 +355,11 @@ test "receiving arbitrary http headers from the client" {
const gpa = std.testing.allocator;
const stream = try std.net.tcpConnectToHost(gpa, "127.0.0.1", test_server.port());
defer stream.close();
var stream_writer = stream.writer();
var writer = stream_writer.interface().unbuffered();
try writer.writeAll(request_bytes);
var stream_writer = stream.writer(&.{});
try stream_writer.interface.writeAll(request_bytes);
var stream_reader = stream.reader();
const response = try stream_reader.interface().readRemainingAlloc(gpa, .limited(8192));
var stream_reader = stream.reader(&.{});
const response = try stream_reader.interface().allocRemaining(gpa, .limited(8192));
defer gpa.free(response);
var expected_response = std.ArrayList(u8).init(gpa);
@ -397,11 +387,9 @@ test "general client/server API coverage" {
var connection = try net_server.accept();
defer connection.stream.close();
var stream_reader = connection.stream.reader();
var stream_writer = connection.stream.writer();
var connection_br = stream_reader.interface().buffered(&recv_buffer);
var connection_bw = stream_writer.interface().buffered(&send_buffer);
var http_server = http.Server.init(&connection_br, &connection_bw);
var connection_br = connection.stream.reader(&recv_buffer);
var connection_bw = connection.stream.writer(&send_buffer);
var http_server = http.Server.init(connection_br.interface(), &connection_bw.interface);
while (http_server.reader.state == .ready) {
var request = http_server.receiveHead() catch |err| switch (err) {
@ -424,11 +412,11 @@ test "general client/server API coverage" {
});
const gpa = std.testing.allocator;
const body = try (try request.reader()).readRemainingAlloc(gpa, .limited(8192));
const body = try (try request.readerExpectContinue(&.{})).allocRemaining(gpa, .limited(8192));
defer gpa.free(body);
if (mem.startsWith(u8, request.head.target, "/get")) {
var response = try request.respondStreaming(.{
var response = try request.respondStreaming(&.{}, .{
.content_length = if (mem.indexOf(u8, request.head.target, "?chunked") == null)
14
else
@ -439,35 +427,35 @@ test "general client/server API coverage" {
},
},
});
var bw = response.writer().unbuffered();
try bw.writeAll("Hello, ");
try bw.writeAll("World!\n");
const w = &response.writer;
try w.writeAll("Hello, ");
try w.writeAll("World!\n");
try response.end();
// Writing again would cause an assertion failure.
} else if (mem.startsWith(u8, request.head.target, "/large")) {
var response = try request.respondStreaming(.{
var response = try request.respondStreaming(&.{}, .{
.content_length = 14 * 1024 + 14 * 10,
});
try response.flush(); // Test an early flush to send the HTTP headers before the body.
var bw = response.writer().unbuffered();
const w = &response.writer;
var i: u32 = 0;
while (i < 5) : (i += 1) {
try bw.writeAll("Hello, World!\n");
try w.writeAll("Hello, World!\n");
}
try bw.writeAll("Hello, World!\n" ** 1024);
try w.writeAll("Hello, World!\n" ** 1024);
i = 0;
while (i < 5) : (i += 1) {
try bw.writeAll("Hello, World!\n");
try w.writeAll("Hello, World!\n");
}
try response.end();
} else if (mem.eql(u8, request.head.target, "/redirect/1")) {
var response = try request.respondStreaming(.{
var response = try request.respondStreaming(&.{}, .{
.respond_options = .{
.status = .found,
.extra_headers = &.{
@ -476,9 +464,9 @@ test "general client/server API coverage" {
},
});
var bw = response.writer().unbuffered();
try bw.writeAll("Hello, ");
try bw.writeAll("Redirected!\n");
const w = &response.writer;
try w.writeAll("Hello, ");
try w.writeAll("Redirected!\n");
try response.end();
} else if (mem.eql(u8, request.head.target, "/redirect/2")) {
try request.respond("Hello, Redirected!\n", .{
@ -567,7 +555,7 @@ test "general client/server API coverage" {
try req.sendBodiless();
var response = try req.receiveHead(&redirect_buffer);
const body = try response.reader().readRemainingAlloc(gpa, .limited(8192));
const body = try response.reader(&.{}).allocRemaining(gpa, .limited(8192));
defer gpa.free(body);
try expectEqualStrings("Hello, World!\n", body);
@ -590,7 +578,7 @@ test "general client/server API coverage" {
try req.sendBodiless();
var response = try req.receiveHead(&redirect_buffer);
const body = try response.reader().readRemainingAlloc(gpa, .limited(8192 * 1024));
const body = try response.reader(&.{}).allocRemaining(gpa, .limited(8192 * 1024));
defer gpa.free(body);
try expectEqual(@as(usize, 14 * 1024 + 14 * 10), body.len);
@ -612,7 +600,7 @@ test "general client/server API coverage" {
try req.sendBodiless();
var response = try req.receiveHead(&redirect_buffer);
const body = try response.reader().readRemainingAlloc(gpa, .limited(8192));
const body = try response.reader(&.{}).allocRemaining(gpa, .limited(8192));
defer gpa.free(body);
try expectEqualStrings("", body);
@ -636,7 +624,7 @@ test "general client/server API coverage" {
try req.sendBodiless();
var response = try req.receiveHead(&redirect_buffer);
const body = try response.reader().readRemainingAlloc(gpa, .limited(8192));
const body = try response.reader(&.{}).allocRemaining(gpa, .limited(8192));
defer gpa.free(body);
try expectEqualStrings("Hello, World!\n", body);
@ -659,7 +647,7 @@ test "general client/server API coverage" {
try req.sendBodiless();
var response = try req.receiveHead(&redirect_buffer);
const body = try response.reader().readRemainingAlloc(gpa, .limited(8192));
const body = try response.reader(&.{}).allocRemaining(gpa, .limited(8192));
defer gpa.free(body);
try expectEqualStrings("", body);
@ -685,7 +673,7 @@ test "general client/server API coverage" {
try req.sendBodiless();
var response = try req.receiveHead(&redirect_buffer);
const body = try response.reader().readRemainingAlloc(gpa, .limited(8192));
const body = try response.reader(&.{}).allocRemaining(gpa, .limited(8192));
defer gpa.free(body);
try expectEqualStrings("Hello, World!\n", body);
@ -714,7 +702,7 @@ test "general client/server API coverage" {
try std.testing.expectEqual(.ok, response.head.status);
const body = try response.reader().readRemainingAlloc(gpa, .limited(8192));
const body = try response.reader(&.{}).allocRemaining(gpa, .limited(8192));
defer gpa.free(body);
try expectEqualStrings("", body);
@ -751,7 +739,7 @@ test "general client/server API coverage" {
try req.sendBodiless();
var response = try req.receiveHead(&redirect_buffer);
const body = try response.reader().readRemainingAlloc(gpa, .limited(8192));
const body = try response.reader(&.{}).allocRemaining(gpa, .limited(8192));
defer gpa.free(body);
try expectEqualStrings("Hello, World!\n", body);
@ -773,7 +761,7 @@ test "general client/server API coverage" {
try req.sendBodiless();
var response = try req.receiveHead(&redirect_buffer);
const body = try response.reader().readRemainingAlloc(gpa, .limited(8192));
const body = try response.reader(&.{}).allocRemaining(gpa, .limited(8192));
defer gpa.free(body);
try expectEqualStrings("Hello, World!\n", body);
@ -795,7 +783,7 @@ test "general client/server API coverage" {
try req.sendBodiless();
var response = try req.receiveHead(&redirect_buffer);
const body = try response.reader().readRemainingAlloc(gpa, .limited(8192));
const body = try response.reader(&.{}).allocRemaining(gpa, .limited(8192));
defer gpa.free(body);
try expectEqualStrings("Hello, World!\n", body);
@ -836,7 +824,7 @@ test "general client/server API coverage" {
try req.sendBodiless();
var response = try req.receiveHead(&redirect_buffer);
const body = try response.reader().readRemainingAlloc(gpa, .limited(8192));
const body = try response.reader(&.{}).allocRemaining(gpa, .limited(8192));
defer gpa.free(body);
try expectEqualStrings("Encoded redirect successful!\n", body);
@ -878,20 +866,18 @@ test "Server streams both reading and writing" {
const connection = try net_server.accept();
defer connection.stream.close();
var stream_reader = connection.stream.reader();
var stream_writer = connection.stream.writer();
var connection_br = stream_reader.interface().buffered(&recv_buffer);
var connection_bw = stream_writer.interface().buffered(&send_buffer);
var server = http.Server.init(&connection_br, &connection_bw);
var connection_br = connection.stream.reader(&recv_buffer);
var connection_bw = connection.stream.writer(&send_buffer);
var server = http.Server.init(connection_br.interface(), &connection_bw.interface);
var request = try server.receiveHead();
var read_buffer: [100]u8 = undefined;
var br = (try request.reader()).buffered(&read_buffer);
var response = try request.respondStreaming(.{
var br = try request.readerExpectContinue(&read_buffer);
var response = try request.respondStreaming(&.{}, .{
.respond_options = .{
.transfer_encoding = .none, // Causes keep_alive=false
},
});
var bw = response.writer().unbuffered();
const w = &response.writer;
while (true) {
try response.flush();
@ -901,7 +887,7 @@ test "Server streams both reading and writing" {
};
br.toss(buf.len);
for (buf) |*b| b.* = std.ascii.toUpper(b.*);
try bw.writeAll(buf);
try w.writeAll(buf);
}
try response.end();
}
@ -921,15 +907,14 @@ test "Server streams both reading and writing" {
defer req.deinit();
req.transfer_encoding = .chunked;
var body_writer = try req.sendBody();
var body_writer = try req.sendBody(&.{});
var response = try req.receiveHead(&redirect_buffer);
var w = body_writer.writer().unbuffered();
try w.writeAll("one ");
try w.writeAll("fish");
try body_writer.writer.writeAll("one ");
try body_writer.writer.writeAll("fish");
try body_writer.end();
const body = try response.reader().readRemainingAlloc(std.testing.allocator, .limited(8192));
const body = try response.reader(&.{}).allocRemaining(std.testing.allocator, .limited(8192));
defer std.testing.allocator.free(body);
try expectEqualStrings("ONE FISH", body);
@ -954,15 +939,14 @@ fn echoTests(client: *http.Client, port: u16) !void {
req.transfer_encoding = .{ .content_length = 14 };
var body_writer = try req.sendBody();
var w = body_writer.writer().unbuffered();
try w.writeAll("Hello, ");
try w.writeAll("World!\n");
var body_writer = try req.sendBody(&.{});
try body_writer.writer.writeAll("Hello, ");
try body_writer.writer.writeAll("World!\n");
try body_writer.end();
var response = try req.receiveHead(&redirect_buffer);
const body = try response.reader().readRemainingAlloc(gpa, .limited(8192));
const body = try response.reader(&.{}).allocRemaining(gpa, .limited(8192));
defer gpa.free(body);
try expectEqualStrings("Hello, World!\n", body);
@ -988,15 +972,14 @@ fn echoTests(client: *http.Client, port: u16) !void {
req.transfer_encoding = .chunked;
var body_writer = try req.sendBody();
var w = body_writer.writer().unbuffered();
try w.writeAll("Hello, ");
try w.writeAll("World!\n");
var body_writer = try req.sendBody(&.{});
try body_writer.writer.writeAll("Hello, ");
try body_writer.writer.writeAll("World!\n");
try body_writer.end();
var response = try req.receiveHead(&redirect_buffer);
const body = try response.reader().readRemainingAlloc(gpa, .limited(8192));
const body = try response.reader(&.{}).allocRemaining(gpa, .limited(8192));
defer gpa.free(body);
try expectEqualStrings("Hello, World!\n", body);
@ -1042,16 +1025,15 @@ fn echoTests(client: *http.Client, port: u16) !void {
req.transfer_encoding = .chunked;
var body_writer = try req.sendBody();
var w = body_writer.writer().unbuffered();
try w.writeAll("Hello, ");
try w.writeAll("World!\n");
var body_writer = try req.sendBody(&.{});
try body_writer.writer.writeAll("Hello, ");
try body_writer.writer.writeAll("World!\n");
try body_writer.end();
var response = try req.receiveHead(&redirect_buffer);
try expectEqual(.ok, response.head.status);
const body = try response.reader().readRemainingAlloc(gpa, .limited(8192));
const body = try response.reader(&.{}).allocRemaining(gpa, .limited(8192));
defer gpa.free(body);
try expectEqualStrings("Hello, World!\n", body);
@ -1073,11 +1055,11 @@ fn echoTests(client: *http.Client, port: u16) !void {
req.transfer_encoding = .chunked;
var body_writer = try req.sendBody();
var body_writer = try req.sendBody(&.{});
try body_writer.flush();
var response = try req.receiveHead(&redirect_buffer);
try expectEqual(.expectation_failed, response.head.status);
_ = try response.reader().discardRemaining();
_ = try response.reader(&.{}).discardRemaining();
}
}
@ -1128,11 +1110,9 @@ test "redirect to different connection" {
const connection = try net_server.accept();
defer connection.stream.close();
var stream_reader = connection.stream.reader();
var stream_writer = connection.stream.writer();
var connection_br = stream_reader.interface().buffered(&recv_buffer);
var connection_bw = stream_writer.interface().buffered(&send_buffer);
var server = http.Server.init(&connection_br, &connection_bw);
var connection_br = connection.stream.reader(&recv_buffer);
var connection_bw = connection.stream.writer(&send_buffer);
var server = http.Server.init(connection_br.interface(), &connection_bw.interface);
var request = try server.receiveHead();
try expectEqualStrings(request.head.target, "/ok");
try request.respond("good job, you pass", .{});
@ -1161,7 +1141,7 @@ test "redirect to different connection" {
var connection_br = connection.stream.reader(&recv_buffer);
var connection_bw = connection.stream.writer(&send_buffer);
var server = http.Server.init(&connection_br, &connection_bw);
var server = http.Server.init(connection_br.interface(), &connection_bw.interface);
var request = try server.receiveHead();
try expectEqualStrings(request.head.target, "/help");
try request.respond("", .{

View File

@ -96,7 +96,10 @@ pub const failing: Reader = .{
.end = 0,
};
pub const ending: Reader = .fixed(&.{});
/// This is generally safe to `@constCast` because it has an empty buffer, so
/// there is not really a way to accidentally attempt mutation of these fields.
const ending_state: Reader = .fixed(&.{});
pub const ending: *Reader = @constCast(&ending_state);
pub fn limited(r: *Reader, limit: Limit, buffer: []u8) Limited {
return Limited.init(r, limit, buffer);

View File

@ -221,11 +221,12 @@ pub fn writeSplatLimit(
/// `end`.
pub fn flush(w: *Writer) Error!void {
assert(0 == try w.vtable.drain(w, &.{}, 0));
if (w.end != 0) assert(w.vtable.drain == &fixedDrain);
}
/// Calls `VTable.drain` but hides the last `preserve_length` bytes from the
/// implementation, keeping them buffered.
pub fn drainLimited(w: *Writer, preserve_length: usize) Error!void {
pub fn drainPreserve(w: *Writer, preserve_length: usize) Error!void {
const temp_end = w.end -| preserve_length;
const preserved = w.buffer[temp_end..w.end];
w.end = temp_end;
@ -235,6 +236,67 @@ pub fn drainLimited(w: *Writer, preserve_length: usize) Error!void {
@memmove(w.buffer[w.end..][0..preserved.len], preserved);
}
/// Forwards a `drain` to a second `Writer` instance. `w` is only used for its
/// buffer, but it has its `end` and `count` adjusted accordingly depending on
/// how much was consumed.
///
/// Returns how many bytes from `data` were consumed.
pub fn drainTo(noalias w: *Writer, noalias other: *Writer, data: []const []const u8, splat: usize) Error!usize {
assert(w != other);
const header = w.buffered();
const new_end = other.end + header.len;
if (new_end <= other.buffer.len) {
@memcpy(other.buffer[other.end..][0..header.len], header);
other.end = new_end;
other.count += header.len;
w.end = 0;
const n = try other.vtable.drain(other, data, splat);
other.count += n;
return n;
}
if (other.vtable == &VectorWrapper.vtable) {
const wrapper: *VectorWrapper = @fieldParentPtr("writer", w);
while (wrapper.it.next()) |dest| {
_ = dest;
@panic("TODO");
}
}
var vecs: [8][]const u8 = undefined; // Arbitrarily chosen size.
var i: usize = 1;
vecs[0] = header;
for (data) |buf| {
if (buf.len == 0) continue;
vecs[i] = buf;
i += 1;
if (vecs.len - i == 0) break;
}
const new_splat = if (vecs[i - 1].ptr == data[data.len - 1].ptr) splat else 1;
const n = try other.vtable.drain(other, vecs[0..i], new_splat);
other.count += n;
if (n < header.len) {
const remaining = w.buffer[n..w.end];
@memmove(w.buffer[0..remaining.len], remaining);
w.end = remaining.len;
return 0;
}
defer w.end = 0;
return n - header.len;
}
pub fn drainToLimit(
noalias w: *Writer,
noalias other: *Writer,
data: []const []const u8,
splat: usize,
limit: Limit,
) Error!usize {
assert(w != other);
_ = data;
_ = splat;
_ = limit;
@panic("TODO");
}
pub fn unusedCapacitySlice(w: *const Writer) []u8 {
return w.buffer[w.end..];
}
@ -285,10 +347,10 @@ pub fn writableSliceGreedy(w: *Writer, minimum_length: usize) Error![]u8 {
/// remain buffered.
///
/// If `preserve_length` is zero, this is equivalent to `writableSliceGreedy`.
pub fn writableSliceGreedyPreserving(w: *Writer, preserve_length: usize, minimum_length: usize) Error![]u8 {
pub fn writableSliceGreedyPreserve(w: *Writer, preserve_length: usize, minimum_length: usize) Error![]u8 {
assert(w.buffer.len >= preserve_length + minimum_length);
while (w.buffer.len - w.end < minimum_length) {
try drainLimited(w, preserve_length);
try drainPreserve(w, preserve_length);
} else {
@branchHint(.likely);
return w.buffer[w.end..];
@ -444,7 +506,7 @@ pub fn write(w: *Writer, bytes: []const u8) Error!usize {
}
/// Asserts `buffer` capacity exceeds `preserve_length`.
pub fn writePreserving(w: *Writer, preserve_length: usize, bytes: []const u8) Error!usize {
pub fn writePreserve(w: *Writer, preserve_length: usize, bytes: []const u8) Error!usize {
assert(preserve_length <= w.buffer.len);
if (w.end + bytes.len <= w.buffer.len) {
@branchHint(.likely);
@ -478,9 +540,9 @@ pub fn writeAll(w: *Writer, bytes: []const u8) Error!void {
/// remain buffered.
///
/// Asserts `buffer` capacity exceeds `preserve_length`.
pub fn writeAllPreserving(w: *Writer, preserve_length: usize, bytes: []const u8) Error!void {
pub fn writeAllPreserve(w: *Writer, preserve_length: usize, bytes: []const u8) Error!void {
var index: usize = 0;
while (index < bytes.len) index += try w.writePreserving(preserve_length, bytes[index..]);
while (index < bytes.len) index += try w.writePreserve(preserve_length, bytes[index..]);
}
pub fn print(w: *Writer, comptime format: []const u8, args: anytype) Error!void {
@ -505,9 +567,9 @@ pub fn writeByte(w: *Writer, byte: u8) Error!void {
/// When draining the buffer, ensures that at least `preserve_length` bytes
/// remain buffered.
pub fn writeBytePreserving(w: *Writer, preserve_length: usize, byte: u8) Error!void {
pub fn writeBytePreserve(w: *Writer, preserve_length: usize, byte: u8) Error!void {
while (w.buffer.len - w.end == 0) {
try drainLimited(w, preserve_length);
try drainPreserve(w, preserve_length);
} else {
@branchHint(.likely);
w.buffer[w.end] = byte;
@ -615,19 +677,26 @@ pub fn sendFile(w: *Writer, file_reader: *File.Reader, limit: Limit) FileError!u
/// on how much was consumed.
///
/// Returns how many bytes from `file_reader` were consumed.
pub fn sendFileTo(w: *Writer, other: *Writer, file_reader: *File.Reader, limit: Limit) FileError!usize {
pub fn sendFileTo(
noalias w: *Writer,
noalias other: *Writer,
file_reader: *File.Reader,
limit: Limit,
) FileError!usize {
assert(w != other);
const header = w.buffered();
const new_end = other.end + header.len;
if (new_end <= other.buffer.len) {
@memcpy(other.buffer[other.end..][0..header.len], header);
other.end = new_end;
other.count += header.len;
w.end = 0;
return other.vtable.sendFile(other, file_reader, limit);
}
assert(header.len > 0);
var vec_buf: [2][]const u8 = .{ header, undefined };
var vec_i: usize = 1;
const buffered_contents = limit.slice(file_reader.buffered());
const buffered_contents = limit.slice(file_reader.interface.buffered());
if (buffered_contents.len > 0) {
vec_buf[vec_i] = buffered_contents;
vec_i += 1;