diff --git a/lib/std/Build.zig b/lib/std/Build.zig index 1af096207a..9dea47c81c 100644 --- a/lib/std/Build.zig +++ b/lib/std/Build.zig @@ -2061,7 +2061,7 @@ pub fn runAllowFail( try child.spawn(); var file_reader = child.stdout.?.readerStreaming(); - const stdout = try file_reader.interface().readRemainingAlloc(b.allocator, .limited(max_output_size)); + const stdout = try file_reader.interface().allocRemaining(b.allocator, .limited(max_output_size)); errdefer b.allocator.free(stdout); const term = try child.wait(); diff --git a/lib/std/Build/Cache.zig b/lib/std/Build/Cache.zig index 61c57344e5..9609f93df4 100644 --- a/lib/std/Build/Cache.zig +++ b/lib/std/Build/Cache.zig @@ -663,7 +663,7 @@ pub const Manifest = struct { const input_file_count = self.files.entries.len; var manifest_reader = self.manifest_file.?.reader(); // Reads positionally from zero. const limit: std.io.Limit = .limited(manifest_file_size_max); - const file_contents = manifest_reader.interface().readRemainingAlloc(gpa, limit) catch |err| switch (err) { + const file_contents = manifest_reader.interface().allocRemaining(gpa, limit) catch |err| switch (err) { error.OutOfMemory => return error.OutOfMemory, error.StreamTooLong => return error.OutOfMemory, error.ReadFailed => { diff --git a/lib/std/Build/Step/Run.zig b/lib/std/Build/Step/Run.zig index b62606f29e..8439807b2a 100644 --- a/lib/std/Build/Step/Run.zig +++ b/lib/std/Build/Step/Run.zig @@ -1788,7 +1788,7 @@ fn evalGeneric(run: *Run, child: *std.process.Child) !StdIoResult { stderr_bytes = poller.reader(.stderr).buffered(); } else { var fr = stdout.readerStreaming(); - stdout_bytes = fr.interface().readRemainingAlloc(arena, run.stdio_limit) catch |err| switch (err) { + stdout_bytes = fr.interface().allocRemaining(arena, run.stdio_limit) catch |err| switch (err) { error.OutOfMemory => return error.OutOfMemory, error.ReadFailed => return fr.err.?, error.StreamTooLong => return error.StdoutStreamTooLong, @@ -1796,7 +1796,7 @@ fn evalGeneric(run: *Run, child: *std.process.Child) !StdIoResult { } } else if (child.stderr) |stderr| { var fr = stderr.readerStreaming(); - stderr_bytes = fr.interface().readRemainingAlloc(arena, run.stdio_limit) catch |err| switch (err) { + stderr_bytes = fr.interface().allocRemaining(arena, run.stdio_limit) catch |err| switch (err) { error.OutOfMemory => return error.OutOfMemory, error.ReadFailed => return fr.err.?, error.StreamTooLong => return error.StderrStreamTooLong, diff --git a/lib/std/Uri.zig b/lib/std/Uri.zig index b3b82fbab1..add94a47a0 100644 --- a/lib/std/Uri.zig +++ b/lib/std/Uri.zig @@ -465,11 +465,11 @@ fn merge_paths(base: Component, new: []u8, aux_buf: *[]u8) error{NoSpaceLeft}!Co var aux: Writer = .fixed(aux_buf.*); if (!base.isEmpty()) { aux.print("{fpath}", .{base}) catch return error.NoSpaceLeft; - aux.end = std.mem.lastIndexOfScalar(u8, aux.getWritten(), '/') orelse + aux.end = std.mem.lastIndexOfScalar(u8, aux.buffered(), '/') orelse return remove_dot_segments(new); } aux.print("/{s}", .{new}) catch return error.NoSpaceLeft; - const merged_path = remove_dot_segments(aux.getWritten()); + const merged_path = remove_dot_segments(aux.buffered()); aux_buf.* = aux_buf.*[merged_path.percent_encoded.len..]; return merged_path; } diff --git a/lib/std/compress/flate/Decompress.zig b/lib/std/compress/flate/Decompress.zig index 626fd38590..846470b4c3 100644 --- a/lib/std/compress/flate/Decompress.zig +++ b/lib/std/compress/flate/Decompress.zig @@ -8,7 +8,7 @@ const Writer = std.io.Writer; const Reader = std.io.Reader; input: *Reader, -interface: Reader, +reader: Reader, /// Hashes, produces checksum, of uncompressed data for gzip/zlib footer. hasher: Container.Hasher, @@ -51,7 +51,7 @@ pub const Error = Container.Error || error{ pub fn init(input: *Reader, container: Container, buffer: []u8) Decompress { return .{ - .interface = .{ + .reader = .{ // TODO populate discard so that when an amount is discarded that // includes an entire frame, skip decoding that frame. .vtable = &.{ .stream = stream }, @@ -130,7 +130,7 @@ fn decodeSymbol(self: *Decompress, decoder: anytype) !Symbol { } pub fn stream(r: *Reader, w: *Writer, limit: std.io.Limit) Reader.StreamError!usize { - const d: *Decompress = @alignCast(@fieldParentPtr("interface", r)); + const d: *Decompress = @alignCast(@fieldParentPtr("reader", r)); return readInner(d, w, limit) catch |err| switch (err) { error.EndOfStream => return error.EndOfStream, error.WriteFailed => return error.WriteFailed, @@ -247,7 +247,7 @@ fn readInner(d: *Decompress, w: *Writer, limit: std.io.Limit) (Error || Reader.S } }, .stored_block => |remaining_len| { - const out = try w.writableSliceGreedyPreserving(flate.history_len, 1); + const out = try w.writableSliceGreedyPreserve(flate.history_len, 1); const limited_out = limit.min(.limited(remaining_len)).slice(out); const n = try d.input.readVec(&.{limited_out}); if (remaining_len - n == 0) { @@ -263,7 +263,7 @@ fn readInner(d: *Decompress, w: *Writer, limit: std.io.Limit) (Error || Reader.S while (@intFromEnum(limit) > w.count - start) { const code = try d.readFixedCode(); switch (code) { - 0...255 => try w.writeBytePreserving(flate.history_len, @intCast(code)), + 0...255 => try w.writeBytePreserve(flate.history_len, @intCast(code)), 256 => { d.state = if (d.final_block) .protocol_footer else .block_header; return w.count - start; @@ -289,7 +289,7 @@ fn readInner(d: *Decompress, w: *Writer, limit: std.io.Limit) (Error || Reader.S const sym = try d.decodeSymbol(&d.lit_dec); switch (sym.kind) { - .literal => try w.writeBytePreserving(flate.history_len, sym.symbol), + .literal => try w.writeBytePreserve(flate.history_len, sym.symbol), .match => { // Decode match backreference const length = try d.decodeLength(sym.symbol); diff --git a/lib/std/compress/zstd/Decompress.zig b/lib/std/compress/zstd/Decompress.zig index 63ccd1c367..86100b7efb 100644 --- a/lib/std/compress/zstd/Decompress.zig +++ b/lib/std/compress/zstd/Decompress.zig @@ -7,7 +7,7 @@ const zstd = @import("../zstd.zig"); const Writer = std.io.Writer; input: *Reader, -interface: Reader, +reader: Reader, state: State, verify_checksum: bool, err: ?Error = null, @@ -68,7 +68,7 @@ pub fn init(input: *Reader, buffer: []u8, options: Options) Decompress { .input = input, .state = .new_frame, .verify_checksum = options.verify_checksum, - .interface = .{ + .reader = .{ .vtable = &.{ .stream = stream }, .buffer = buffer, .seek = 0, @@ -78,7 +78,7 @@ pub fn init(input: *Reader, buffer: []u8, options: Options) Decompress { } fn stream(r: *Reader, w: *Writer, limit: Limit) Reader.StreamError!usize { - const d: *Decompress = @alignCast(@fieldParentPtr("interface", r)); + const d: *Decompress = @alignCast(@fieldParentPtr("reader", r)); const in = d.input; switch (d.state) { diff --git a/lib/std/debug/Dwarf.zig b/lib/std/debug/Dwarf.zig index fa6ed51ca6..ff07698936 100644 --- a/lib/std/debug/Dwarf.zig +++ b/lib/std/debug/Dwarf.zig @@ -2242,7 +2242,7 @@ pub const ElfModule = struct { var zlib_stream: std.compress.flate.Decompress = .init(§ion_reader, .zlib, &.{}); - const decompressed_section = zlib_stream.interface.allocRemaining(gpa, .limited(ch_size)) catch + const decompressed_section = zlib_stream.reader.allocRemaining(gpa, .limited(ch_size)) catch continue; if (decompressed_section.len != ch_size) { gpa.free(decompressed_section); diff --git a/lib/std/http.zig b/lib/std/http.zig index 949f8e9619..d3f09449d7 100644 --- a/lib/std/http.zig +++ b/lib/std/http.zig @@ -328,6 +328,9 @@ pub const Header = struct { pub const Reader = struct { in: *std.io.Reader, + /// This is preallocated memory that might be used by `bodyReader`. That + /// function might return a pointer to this field, or a different + /// `*std.io.Reader`. Advisable to not access this field directly. interface: std.io.Reader, /// Keeps track of whether the stream is ready to accept a new request, /// making invalid API usage cause assertion failures rather than HTTP @@ -489,31 +492,31 @@ pub const Reader = struct { content_encoding: ContentEncoding, decompressor: *Decompressor, decompression_buffer: []u8, - ) std.io.Reader { + ) *std.io.Reader { if (transfer_encoding == .none and content_length == null) { assert(reader.state == .received_head); reader.state = .body_none; switch (content_encoding) { .identity => { - return reader.in.reader(); + return reader.in; }, .deflate => { - decompressor.compression = .{ .deflate = .init(reader.in) }; - return decompressor.compression.deflate.reader(); + decompressor.* = .{ .flate = .init(reader.in, .raw, decompression_buffer) }; + return &decompressor.flate.reader; }, .gzip => { - decompressor.compression = .{ .gzip = .init(reader.in) }; - return decompressor.compression.gzip.reader(); + decompressor.* = .{ .flate = .init(reader.in, .gzip, decompression_buffer) }; + return &decompressor.flate.reader; }, .zstd => { - decompressor.compression = .{ .zstd = .init(reader.in, .{ .verify_checksum = false }) }; - return decompressor.compression.zstd.reader(); + decompressor.* = .{ .zstd = .init(reader.in, decompression_buffer, .{ .verify_checksum = false }) }; + return &decompressor.zstd.reader; }, .compress => unreachable, } } - const transfer_reader = bodyReader(reader, transfer_encoding, content_length); - return decompressor.reader(transfer_reader, decompression_buffer, content_encoding); + const transfer_reader = bodyReader(reader, &.{}, transfer_encoding, content_length); + return decompressor.init(transfer_reader, decompression_buffer, content_encoding); } fn contentLengthStream( @@ -711,42 +714,33 @@ pub const Reader = struct { } }; -pub const Decompressor = struct { - compression: Compression, - buffered_reader: std.io.Reader, +pub const Decompressor = union(enum) { + flate: std.compress.flate.Decompress, + zstd: std.compress.zstd.Decompress, + none: *std.io.Reader, - pub const Compression = union(enum) { - deflate: std.compress.flate.Decompressor, - gzip: std.compress.flate.Decompressor, - zstd: std.compress.zstd.Decompress, - none: void, - }; - - pub fn reader( + pub fn init( decompressor: *Decompressor, - transfer_reader: std.io.Reader, + transfer_reader: *std.io.Reader, buffer: []u8, content_encoding: ContentEncoding, - ) std.io.Reader { + ) *std.io.Reader { switch (content_encoding) { .identity => { - decompressor.compression = .none; + decompressor.* = .{ .none = transfer_reader }; return transfer_reader; }, .deflate => { - decompressor.buffered_reader = transfer_reader.buffered(buffer); - decompressor.compression = .{ .deflate = .init(&decompressor.buffered_reader) }; - return decompressor.compression.deflate.reader(); + decompressor.* = .{ .flate = .init(transfer_reader, .raw, buffer) }; + return &decompressor.flate.reader; }, .gzip => { - decompressor.buffered_reader = transfer_reader.buffered(buffer); - decompressor.compression = .{ .gzip = .init(&decompressor.buffered_reader) }; - return decompressor.compression.gzip.reader(); + decompressor.* = .{ .flate = .init(transfer_reader, .gzip, buffer) }; + return &decompressor.flate.reader; }, .zstd => { - decompressor.buffered_reader = transfer_reader.buffered(buffer); - decompressor.compression = .{ .zstd = .init(&decompressor.buffered_reader, .{}) }; - return decompressor.compression.gzip.reader(); + decompressor.* = .{ .zstd = .init(transfer_reader, buffer, .{ .verify_checksum = false }) }; + return &decompressor.zstd.reader; }, .compress => unreachable, } @@ -759,7 +753,7 @@ pub const BodyWriter = struct { /// state of this other than via methods of `BodyWriter`. http_protocol_output: *Writer, state: State, - interface: Writer, + writer: Writer, pub const Error = Writer.Error; @@ -797,7 +791,7 @@ pub const BodyWriter = struct { }; pub fn isEliding(w: *const BodyWriter) bool { - return w.interface.vtable.drain == Writer.discardingDrain; + return w.writer.vtable.drain == Writer.discardingDrain; } /// Sends all buffered data across `BodyWriter.http_protocol_output`. @@ -924,42 +918,42 @@ pub const BodyWriter = struct { w.state = .end; } - fn contentLengthDrain(w: *Writer, data: []const []const u8, splat: usize) Error!usize { - const bw: *BodyWriter = @fieldParentPtr("interface", w); + pub fn contentLengthDrain(w: *Writer, data: []const []const u8, splat: usize) Error!usize { + const bw: *BodyWriter = @fieldParentPtr("writer", w); assert(!bw.isEliding()); - const out = w.http_protocol_output; + const out = bw.http_protocol_output; const n = try w.drainTo(out, data, splat); - w.state.content_length -= n; + bw.state.content_length -= n; return n; } - fn noneDrain(w: *Writer, data: []const []const u8, splat: usize) Error!usize { - const bw: *BodyWriter = @fieldParentPtr("interface", w); + pub fn noneDrain(w: *Writer, data: []const []const u8, splat: usize) Error!usize { + const bw: *BodyWriter = @fieldParentPtr("writer", w); assert(!bw.isEliding()); - const out = w.http_protocol_output; + const out = bw.http_protocol_output; return try w.drainTo(out, data, splat); } /// Returns `null` if size cannot be computed without making any syscalls. - fn noneSendFile(w: *Writer, file_reader: *File.Reader, limit: std.io.Limit) Writer.FileError!usize { - const bw: *BodyWriter = @fieldParentPtr("interface", w); + pub fn noneSendFile(w: *Writer, file_reader: *File.Reader, limit: std.io.Limit) Writer.FileError!usize { + const bw: *BodyWriter = @fieldParentPtr("writer", w); assert(!bw.isEliding()); return w.sendFileTo(bw.http_protocol_output, file_reader, limit); } - fn contentLengthSendFile(w: *Writer, file_reader: *File.Reader, limit: std.io.Limit) Writer.FileError!usize { - const bw: *BodyWriter = @fieldParentPtr("interface", w); + pub fn contentLengthSendFile(w: *Writer, file_reader: *File.Reader, limit: std.io.Limit) Writer.FileError!usize { + const bw: *BodyWriter = @fieldParentPtr("writer", w); assert(!bw.isEliding()); const n = try w.sendFileTo(bw.http_protocol_output, file_reader, limit); bw.state.content_length -= n; return n; } - fn chunkedSendFile(w: *Writer, file_reader: *File.Reader, limit: std.io.Limit) Writer.FileError!usize { - const bw: *BodyWriter = @fieldParentPtr("interface", w); + pub fn chunkedSendFile(w: *Writer, file_reader: *File.Reader, limit: std.io.Limit) Writer.FileError!usize { + const bw: *BodyWriter = @fieldParentPtr("writer", w); assert(!bw.isEliding()); - const data_len = w.countSendFileUpperBound(file_reader, limit) orelse { - // If the file size is unknown, we cannot lower to a `writeFile` since we would + const data_len = if (file_reader.getSize()) |x| w.end + x else |_| { + // If the file size is unknown, we cannot lower to a `sendFile` since we would // have to flush the chunk header before knowing the chunk length. return error.Unimplemented; }; @@ -1003,10 +997,10 @@ pub const BodyWriter = struct { } } - fn chunkedDrain(w: *Writer, data: []const []const u8, splat: usize) Error!usize { - const bw: *BodyWriter = @fieldParentPtr("interface", w); + pub fn chunkedDrain(w: *Writer, data: []const []const u8, splat: usize) Error!usize { + const bw: *BodyWriter = @fieldParentPtr("writer", w); assert(!bw.isEliding()); - const out = w.http_protocol_output; + const out = bw.http_protocol_output; const data_len = Writer.countSplat(w.end, data, splat); const chunked = &bw.state.chunked; state: switch (chunked.*) { @@ -1018,7 +1012,7 @@ pub const BodyWriter = struct { const buffered_len = out.end - offset - chunk_header_template.len; const chunk_len = data_len + buffered_len; writeHex(out.buffer[offset..][0..chunk_len_digits], chunk_len); - const n = try w.drainTo(w, data, splat); + const n = try w.drainTo(out, data, splat); chunked.* = .{ .chunk_len = data_len + 2 - n }; return n; }, @@ -1041,7 +1035,7 @@ pub const BodyWriter = struct { continue :l 1; }, else => { - const n = try w.drainToLimit(data, splat, .limited(chunk_len - 2)); + const n = try w.drainToLimit(out, data, splat, .limited(chunk_len - 2)); chunked.chunk_len = chunk_len - n; return n; }, diff --git a/lib/std/http/Client.zig b/lib/std/http/Client.zig index 32b5853c68..6ef90641b1 100644 --- a/lib/std/http/Client.zig +++ b/lib/std/http/Client.zig @@ -408,7 +408,7 @@ pub const Connection = struct { /// HTTP protocol from server to client. /// This either comes directly from `stream_reader`, or from a TLS client. - pub fn reader(c: *const Connection) *Reader { + pub fn reader(c: *Connection) *Reader { return switch (c.protocol) { .tls => { if (disable_tls) unreachable; @@ -682,7 +682,7 @@ pub const Response = struct { /// /// See also: /// * `readerDecompressing` - pub fn reader(response: *Response, buffer: []u8) Reader { + pub fn reader(response: *Response, buffer: []u8) *Reader { const req = response.request; if (!req.method.responseHasBody()) return .ending; const head = &response.head; @@ -702,7 +702,7 @@ pub const Response = struct { response: *Response, decompressor: *http.Decompressor, decompression_buffer: []u8, - ) Reader { + ) *Reader { const head = &response.head; return response.request.reader.bodyReaderDecompressing( head.transfer_encoding, @@ -864,14 +864,14 @@ pub const Request = struct { pub fn sendBodyUnflushed(r: *Request, buffer: []u8) Writer.Error!http.BodyWriter { assert(r.method.requestHasBody()); try sendHead(r); - const http_protocol_output = &r.connection.?.writer; + const http_protocol_output = r.connection.?.writer(); return switch (r.transfer_encoding) { .chunked => .{ .http_protocol_output = http_protocol_output, .state = .{ .chunked = .init }, - .interface = .{ + .writer = .{ .buffer = buffer, - .interface = &.{ + .vtable = &.{ .drain = http.BodyWriter.chunkedDrain, .sendFile = http.BodyWriter.chunkedSendFile, }, @@ -880,9 +880,9 @@ pub const Request = struct { .content_length => |len| .{ .http_protocol_output = http_protocol_output, .state = .{ .content_length = len }, - .interface = .{ + .writer = .{ .buffer = buffer, - .interface = &.{ + .vtable = &.{ .drain = http.BodyWriter.contentLengthDrain, .sendFile = http.BodyWriter.contentLengthSendFile, }, @@ -891,9 +891,9 @@ pub const Request = struct { .none => .{ .http_protocol_output = http_protocol_output, .state = .none, - .interface = .{ + .writer = .{ .buffer = buffer, - .interface = &.{ + .vtable = &.{ .drain = http.BodyWriter.noneDrain, .sendFile = http.BodyWriter.noneSendFile, }, @@ -906,7 +906,7 @@ pub const Request = struct { fn sendHead(r: *Request) Writer.Error!void { const uri = r.uri; const connection = r.connection.?; - const w = &connection.writer; + const w = connection.writer(); try r.method.write(w); try w.writeByte(' '); @@ -1085,7 +1085,7 @@ pub const Request = struct { if (head.status.class() == .redirect and r.redirect_behavior != .unhandled) { if (r.redirect_behavior == .not_allowed) { // Connection can still be reused by skipping the body. - var reader = r.reader.bodyReader(head.transfer_encoding, head.content_length); + const reader = r.reader.bodyReader(&.{}, head.transfer_encoding, head.content_length); _ = reader.discardRemaining() catch |err| switch (err) { error.ReadFailed => connection.closing = true, }; @@ -1117,7 +1117,7 @@ pub const Request = struct { { // Skip the body of the redirect response to leave the connection in // the correct state. This causes `new_location` to be invalidated. - var reader = r.reader.bodyReader(head.transfer_encoding, head.content_length); + const reader = r.reader.bodyReader(&.{}, head.transfer_encoding, head.content_length); _ = reader.discardRemaining() catch |err| switch (err) { error.ReadFailed => return r.reader.body_err.?, }; @@ -1169,8 +1169,10 @@ pub const Request = struct { r.uri = new_uri; r.connection = new_connection; r.reader = .{ - .in = &new_connection.reader, + .in = new_connection.reader(), .state = .ready, + // Populated when `http.Reader.bodyReader` is called. + .interface = undefined, }; r.redirect_behavior.subtractOne(); } @@ -1292,12 +1294,12 @@ pub const basic_authorization = struct { pub fn write(uri: Uri, out: *Writer) Writer.Error!void { var buf: [max_user_len + ":".len + max_password_len]u8 = undefined; - var bw: Writer = .fixed(&buf); - bw.print("{fuser}:{fpassword}", .{ + var w: Writer = .fixed(&buf); + w.print("{fuser}:{fpassword}", .{ uri.user orelse Uri.Component.empty, uri.password orelse Uri.Component.empty, }) catch unreachable; - try out.print("Basic {b64}", .{bw.getWritten()}); + try out.print("Basic {b64}", .{w.buffered()}); } }; @@ -1622,8 +1624,10 @@ pub fn request( .client = client, .connection = connection, .reader = .{ - .in = &connection.reader, + .in = connection.reader(), .state = .ready, + // Populated when `http.Reader.bodyReader` is called. + .interface = undefined, }, .keep_alive = options.keep_alive, .method = method, @@ -1711,9 +1715,8 @@ pub fn fetch(client: *Client, options: FetchOptions) FetchError!FetchResult { if (options.payload) |payload| { req.transfer_encoding = .{ .content_length = payload.len }; - var body = try req.sendBody(); - var bw = body.writer().unbuffered(); - try bw.writeAll(payload); + var body = try req.sendBody(&.{}); + try body.writer.writeAll(payload); try body.end(); } else { try req.sendBodiless(); @@ -1726,7 +1729,7 @@ pub fn fetch(client: *Client, options: FetchOptions) FetchError!FetchResult { var response = try req.receiveHead(redirect_buffer); const storage = options.response_storage orelse { - var reader = response.reader(); + const reader = response.reader(&.{}); _ = reader.discardRemaining() catch |err| switch (err) { error.ReadFailed => return response.bodyErr().?, }; @@ -1741,18 +1744,17 @@ pub fn fetch(client: *Client, options: FetchOptions) FetchError!FetchResult { defer if (options.decompress_buffer == null) client.allocator.free(decompress_buffer); var decompressor: http.Decompressor = undefined; - var reader = response.readerDecompressing(&decompressor, decompress_buffer); + const reader = response.readerDecompressing(&decompressor, decompress_buffer); const list = storage.list; if (storage.allocator) |allocator| { - reader.readRemainingArrayList(allocator, null, list, storage.append_limit, 128) catch |err| switch (err) { + reader.appendRemaining(allocator, null, list, storage.append_limit) catch |err| switch (err) { error.ReadFailed => return response.bodyErr().?, else => |e| return e, }; } else { - var br = reader.unbuffered(); const buf = storage.append_limit.slice(list.unusedCapacitySlice()); - list.items.len += br.readSliceShort(buf) catch |err| switch (err) { + list.items.len += reader.readSliceShort(buf) catch |err| switch (err) { error.ReadFailed => return response.bodyErr().?, }; } diff --git a/lib/std/http/Server.zig b/lib/std/http/Server.zig index d3077842c8..004741d1ae 100644 --- a/lib/std/http/Server.zig +++ b/lib/std/http/Server.zig @@ -26,6 +26,8 @@ pub fn init(in: *std.io.Reader, out: *Writer) Server { .reader = .{ .in = in, .state = .ready, + // Populated when `http.Reader.bodyReader` is called. + .interface = undefined, }, .out = out, }; @@ -58,7 +60,7 @@ pub const Request = struct { /// Pointers in this struct are invalidated with the next call to /// `receiveHead`. head: Head, - respond_err: ?RespondError, + respond_err: ?RespondError = null, pub const RespondError = error{ /// The request contained an `expect` header with an unrecognized value. @@ -243,6 +245,7 @@ pub const Request = struct { .in = undefined, .state = .received_head, .head_buffer = @constCast(request_bytes), + .interface = undefined, }, .out = undefined, }; @@ -381,8 +384,6 @@ pub const Request = struct { content_length: ?u64 = null, /// Options that are shared with the `respond` method. respond_options: RespondOptions = .{}, - /// Used by `http.BodyWriter`. - buffer: []u8, }; /// The header is not guaranteed to be sent until `BodyWriter.flush` or @@ -400,7 +401,11 @@ pub const Request = struct { /// be done to satisfy the request. /// /// Asserts status is not `continue`. - pub fn respondStreaming(request: *Request, options: RespondStreamingOptions) Writer.Error!http.BodyWriter { + pub fn respondStreaming( + request: *Request, + buffer: []u8, + options: RespondStreamingOptions, + ) ExpectContinueError!http.BodyWriter { try writeExpectContinue(request); const o = options.respond_options; assert(o.status != .@"continue"); @@ -448,12 +453,12 @@ pub const Request = struct { return if (elide_body) .{ .http_protocol_output = request.server.out, .state = state, - .interface = .discarding(options.buffer), + .writer = .discarding(buffer), } else .{ .http_protocol_output = request.server.out, .state = state, - .interface = .{ - .buffer = options.buffer, + .writer = .{ + .buffer = buffer, .vtable = switch (state) { .none => &.{ .drain = http.BodyWriter.noneDrain, @@ -559,11 +564,11 @@ pub const Request = struct { /// /// See `readerExpectNone` for an infallible alternative that cannot write /// to the server output stream. - pub fn readerExpectContinue(request: *Request) ExpectContinueError!std.io.Reader { + pub fn readerExpectContinue(request: *Request, buffer: []u8) ExpectContinueError!*std.io.Reader { const flush = request.head.expect != null; try writeExpectContinue(request); if (flush) try request.server.out.flush(); - return readerExpectNone(request); + return readerExpectNone(request, buffer); } /// Asserts the expect header is `null`. The caller must handle the @@ -571,11 +576,11 @@ pub const Request = struct { /// this function. /// /// Asserts that this function is only called once. - pub fn readerExpectNone(request: *Request) std.io.Reader { + pub fn readerExpectNone(request: *Request, buffer: []u8) *std.io.Reader { assert(request.server.reader.state == .received_head); assert(request.head.expect == null); if (!request.head.method.requestHasBody()) return .ending; - return request.server.reader.bodyReader(request.head.transfer_encoding, request.head.content_length); + return request.server.reader.bodyReader(buffer, request.head.transfer_encoding, request.head.content_length); } pub const ExpectContinueError = error{ @@ -611,7 +616,7 @@ pub const Request = struct { .received_head => { if (request.head.method.requestHasBody()) { assert(request.head.transfer_encoding != .none or request.head.content_length != null); - const reader_interface = request.reader() catch return false; + const reader_interface = request.readerExpectContinue(&.{}) catch return false; _ = reader_interface.discardRemaining() catch return false; assert(r.state == .ready); } else { diff --git a/lib/std/http/test.zig b/lib/std/http/test.zig index 36a9f25f51..914491f9d1 100644 --- a/lib/std/http/test.zig +++ b/lib/std/http/test.zig @@ -21,7 +21,7 @@ test "trailers" { var connection_br = connection.stream.reader(&recv_buffer); var connection_bw = connection.stream.writer(&send_buffer); - var server = http.Server.init(&connection_br, &connection_bw); + var server = http.Server.init(connection_br.interface(), &connection_bw.interface); try expectEqual(.ready, server.reader.state); var request = try server.receiveHead(); @@ -33,11 +33,10 @@ test "trailers" { fn serve(request: *http.Server.Request) !void { try expectEqualStrings(request.head.target, "/trailer"); - var response = try request.respondStreaming(.{}); - var bw = response.writer().unbuffered(); - try bw.writeAll("Hello, "); + var response = try request.respondStreaming(&.{}, .{}); + try response.writer.writeAll("Hello, "); try response.flush(); - try bw.writeAll("World!\n"); + try response.writer.writeAll("World!\n"); try response.flush(); try response.endChunked(.{ .trailers = &.{ @@ -66,7 +65,7 @@ test "trailers" { try req.sendBodiless(); var response = try req.receiveHead(&.{}); - const body = try response.reader().readRemainingAlloc(gpa, .limited(8192)); + const body = try response.reader(&.{}).allocRemaining(gpa, .limited(8192)); defer gpa.free(body); try expectEqualStrings("Hello, World!\n", body); @@ -104,13 +103,13 @@ test "HTTP server handles a chunked transfer coding request" { var connection_br = connection.stream.reader(&recv_buffer); var connection_bw = connection.stream.writer(&send_buffer); - var server = http.Server.init(&connection_br, &connection_bw); + var server = http.Server.init(connection_br.interface(), &connection_bw.interface); var request = try server.receiveHead(); try expect(request.head.transfer_encoding == .chunked); var buf: [128]u8 = undefined; - var br = (try request.reader()).unbuffered(); + var br = try request.readerExpectContinue(&.{}); const n = try br.readSliceShort(&buf); try expectEqualStrings("ABCD", buf[0..n]); @@ -141,9 +140,8 @@ test "HTTP server handles a chunked transfer coding request" { const gpa = std.testing.allocator; const stream = try std.net.tcpConnectToHost(gpa, "127.0.0.1", test_server.port()); defer stream.close(); - var stream_writer = stream.writer(); - var writer = stream_writer.interface().unbuffered(); - try writer.writeAll(request_bytes); + var stream_writer = stream.writer(&.{}); + try stream_writer.interface.writeAll(request_bytes); const expected_response = "HTTP/1.1 200 OK\r\n" ++ @@ -152,8 +150,8 @@ test "HTTP server handles a chunked transfer coding request" { "content-type: text/plain\r\n" ++ "\r\n" ++ "message from server!\n"; - var stream_reader = stream.reader(); - const response = try stream_reader.interface().readRemainingAlloc(gpa, .limited(expected_response.len)); + var stream_reader = stream.reader(&.{}); + const response = try stream_reader.interface().allocRemaining(gpa, .limited(expected_response.len)); defer gpa.free(response); try expectEqualStrings(expected_response, response); } @@ -169,11 +167,9 @@ test "echo content server" { const connection = try net_server.accept(); defer connection.stream.close(); - var stream_reader = connection.stream.reader(); - var stream_writer = connection.stream.writer(); - var connection_br = stream_reader.interface().buffered(&recv_buffer); - var connection_bw = stream_writer.interface().buffered(&send_buffer); - var http_server = http.Server.init(&connection_br, &connection_bw); + var connection_br = connection.stream.reader(&recv_buffer); + var connection_bw = connection.stream.writer(&send_buffer); + var http_server = http.Server.init(connection_br.interface(), &connection_bw.interface); while (http_server.reader.state == .ready) { var request = http_server.receiveHead() catch |err| switch (err) { @@ -185,7 +181,7 @@ test "echo content server" { } if (request.head.expect) |expect_header_value| { if (mem.eql(u8, expect_header_value, "garbage")) { - try expectError(error.HttpExpectationFailed, request.reader()); + try expectError(error.HttpExpectationFailed, request.readerExpectContinue(&.{})); try request.respond("", .{ .keep_alive = false }); continue; } @@ -207,14 +203,14 @@ test "echo content server" { // request.head.target, //}); - const body = try (try request.reader()).readRemainingAlloc(std.testing.allocator, .limited(8192)); + const body = try (try request.readerExpectContinue(&.{})).allocRemaining(std.testing.allocator, .limited(8192)); defer std.testing.allocator.free(body); try expect(mem.startsWith(u8, request.head.target, "/echo-content")); try expectEqualStrings("Hello, World!\n", body); try expectEqualStrings("text/plain", request.head.content_type.?); - var response = try request.respondStreaming(.{ + var response = try request.respondStreaming(&.{}, .{ .content_length = switch (request.head.transfer_encoding) { .chunked => null, .none => len: { @@ -224,9 +220,9 @@ test "echo content server" { }, }); try response.flush(); // Test an early flush to send the HTTP headers before the body. - var bw = response.writer().unbuffered(); - try bw.writeAll("Hello, "); - try bw.writeAll("World!\n"); + const w = &response.writer; + try w.writeAll("Hello, "); + try w.writeAll("World!\n"); try response.end(); //std.debug.print(" server finished responding\n", .{}); } @@ -259,27 +255,25 @@ test "Server.Request.respondStreaming non-chunked, unknown content-length" { const connection = try net_server.accept(); defer connection.stream.close(); - var stream_reader = connection.stream.reader(); - var stream_writer = connection.stream.writer(); - var connection_br = stream_reader.interface().buffered(&recv_buffer); - var connection_bw = stream_writer.interface().buffered(&send_buffer); - var server = http.Server.init(&connection_br, &connection_bw); + var connection_br = connection.stream.reader(&recv_buffer); + var connection_bw = connection.stream.writer(&send_buffer); + var server = http.Server.init(connection_br.interface(), &connection_bw.interface); try expectEqual(.ready, server.reader.state); var request = try server.receiveHead(); try expectEqualStrings(request.head.target, "/foo"); - var response = try request.respondStreaming(.{ + var buf: [30]u8 = undefined; + var response = try request.respondStreaming(&buf, .{ .respond_options = .{ .transfer_encoding = .none, }, }); - var buf: [30]u8 = undefined; - var bw = response.writer().buffered(&buf); + const w = &response.writer; for (0..500) |i| { - try bw.print("{d}, ah ha ha!\n", .{i}); + try w.print("{d}, ah ha ha!\n", .{i}); } - try expectEqual(7390, bw.count); - try bw.flush(); + try expectEqual(7390, w.count); + try w.flush(); try response.end(); try expectEqual(.closing, server.reader.state); } @@ -291,12 +285,11 @@ test "Server.Request.respondStreaming non-chunked, unknown content-length" { const gpa = std.testing.allocator; const stream = try std.net.tcpConnectToHost(gpa, "127.0.0.1", test_server.port()); defer stream.close(); - var stream_writer = stream.writer(); - var writer = stream_writer.interface().unbuffered(); - try writer.writeAll(request_bytes); + var stream_writer = stream.writer(&.{}); + try stream_writer.interface.writeAll(request_bytes); - var stream_reader = stream.reader(); - const response = try stream_reader.interface().readRemainingAlloc(gpa, .limited(8192)); + var stream_reader = stream.reader(&.{}); + const response = try stream_reader.interface().allocRemaining(gpa, .limited(8192)); defer gpa.free(response); var expected_response = std.ArrayList(u8).init(gpa); @@ -329,11 +322,9 @@ test "receiving arbitrary http headers from the client" { const connection = try net_server.accept(); defer connection.stream.close(); - var stream_reader = connection.stream.reader(); - var stream_writer = connection.stream.writer(); - var connection_br = stream_reader.interface().buffered(&recv_buffer); - var connection_bw = stream_writer.interface().buffered(&send_buffer); - var server = http.Server.init(&connection_br, &connection_bw); + var connection_br = connection.stream.reader(&recv_buffer); + var connection_bw = connection.stream.writer(&send_buffer); + var server = http.Server.init(connection_br.interface(), &connection_bw.interface); try expectEqual(.ready, server.reader.state); var request = try server.receiveHead(); @@ -364,12 +355,11 @@ test "receiving arbitrary http headers from the client" { const gpa = std.testing.allocator; const stream = try std.net.tcpConnectToHost(gpa, "127.0.0.1", test_server.port()); defer stream.close(); - var stream_writer = stream.writer(); - var writer = stream_writer.interface().unbuffered(); - try writer.writeAll(request_bytes); + var stream_writer = stream.writer(&.{}); + try stream_writer.interface.writeAll(request_bytes); - var stream_reader = stream.reader(); - const response = try stream_reader.interface().readRemainingAlloc(gpa, .limited(8192)); + var stream_reader = stream.reader(&.{}); + const response = try stream_reader.interface().allocRemaining(gpa, .limited(8192)); defer gpa.free(response); var expected_response = std.ArrayList(u8).init(gpa); @@ -397,11 +387,9 @@ test "general client/server API coverage" { var connection = try net_server.accept(); defer connection.stream.close(); - var stream_reader = connection.stream.reader(); - var stream_writer = connection.stream.writer(); - var connection_br = stream_reader.interface().buffered(&recv_buffer); - var connection_bw = stream_writer.interface().buffered(&send_buffer); - var http_server = http.Server.init(&connection_br, &connection_bw); + var connection_br = connection.stream.reader(&recv_buffer); + var connection_bw = connection.stream.writer(&send_buffer); + var http_server = http.Server.init(connection_br.interface(), &connection_bw.interface); while (http_server.reader.state == .ready) { var request = http_server.receiveHead() catch |err| switch (err) { @@ -424,11 +412,11 @@ test "general client/server API coverage" { }); const gpa = std.testing.allocator; - const body = try (try request.reader()).readRemainingAlloc(gpa, .limited(8192)); + const body = try (try request.readerExpectContinue(&.{})).allocRemaining(gpa, .limited(8192)); defer gpa.free(body); if (mem.startsWith(u8, request.head.target, "/get")) { - var response = try request.respondStreaming(.{ + var response = try request.respondStreaming(&.{}, .{ .content_length = if (mem.indexOf(u8, request.head.target, "?chunked") == null) 14 else @@ -439,35 +427,35 @@ test "general client/server API coverage" { }, }, }); - var bw = response.writer().unbuffered(); - try bw.writeAll("Hello, "); - try bw.writeAll("World!\n"); + const w = &response.writer; + try w.writeAll("Hello, "); + try w.writeAll("World!\n"); try response.end(); // Writing again would cause an assertion failure. } else if (mem.startsWith(u8, request.head.target, "/large")) { - var response = try request.respondStreaming(.{ + var response = try request.respondStreaming(&.{}, .{ .content_length = 14 * 1024 + 14 * 10, }); try response.flush(); // Test an early flush to send the HTTP headers before the body. - var bw = response.writer().unbuffered(); + const w = &response.writer; var i: u32 = 0; while (i < 5) : (i += 1) { - try bw.writeAll("Hello, World!\n"); + try w.writeAll("Hello, World!\n"); } - try bw.writeAll("Hello, World!\n" ** 1024); + try w.writeAll("Hello, World!\n" ** 1024); i = 0; while (i < 5) : (i += 1) { - try bw.writeAll("Hello, World!\n"); + try w.writeAll("Hello, World!\n"); } try response.end(); } else if (mem.eql(u8, request.head.target, "/redirect/1")) { - var response = try request.respondStreaming(.{ + var response = try request.respondStreaming(&.{}, .{ .respond_options = .{ .status = .found, .extra_headers = &.{ @@ -476,9 +464,9 @@ test "general client/server API coverage" { }, }); - var bw = response.writer().unbuffered(); - try bw.writeAll("Hello, "); - try bw.writeAll("Redirected!\n"); + const w = &response.writer; + try w.writeAll("Hello, "); + try w.writeAll("Redirected!\n"); try response.end(); } else if (mem.eql(u8, request.head.target, "/redirect/2")) { try request.respond("Hello, Redirected!\n", .{ @@ -567,7 +555,7 @@ test "general client/server API coverage" { try req.sendBodiless(); var response = try req.receiveHead(&redirect_buffer); - const body = try response.reader().readRemainingAlloc(gpa, .limited(8192)); + const body = try response.reader(&.{}).allocRemaining(gpa, .limited(8192)); defer gpa.free(body); try expectEqualStrings("Hello, World!\n", body); @@ -590,7 +578,7 @@ test "general client/server API coverage" { try req.sendBodiless(); var response = try req.receiveHead(&redirect_buffer); - const body = try response.reader().readRemainingAlloc(gpa, .limited(8192 * 1024)); + const body = try response.reader(&.{}).allocRemaining(gpa, .limited(8192 * 1024)); defer gpa.free(body); try expectEqual(@as(usize, 14 * 1024 + 14 * 10), body.len); @@ -612,7 +600,7 @@ test "general client/server API coverage" { try req.sendBodiless(); var response = try req.receiveHead(&redirect_buffer); - const body = try response.reader().readRemainingAlloc(gpa, .limited(8192)); + const body = try response.reader(&.{}).allocRemaining(gpa, .limited(8192)); defer gpa.free(body); try expectEqualStrings("", body); @@ -636,7 +624,7 @@ test "general client/server API coverage" { try req.sendBodiless(); var response = try req.receiveHead(&redirect_buffer); - const body = try response.reader().readRemainingAlloc(gpa, .limited(8192)); + const body = try response.reader(&.{}).allocRemaining(gpa, .limited(8192)); defer gpa.free(body); try expectEqualStrings("Hello, World!\n", body); @@ -659,7 +647,7 @@ test "general client/server API coverage" { try req.sendBodiless(); var response = try req.receiveHead(&redirect_buffer); - const body = try response.reader().readRemainingAlloc(gpa, .limited(8192)); + const body = try response.reader(&.{}).allocRemaining(gpa, .limited(8192)); defer gpa.free(body); try expectEqualStrings("", body); @@ -685,7 +673,7 @@ test "general client/server API coverage" { try req.sendBodiless(); var response = try req.receiveHead(&redirect_buffer); - const body = try response.reader().readRemainingAlloc(gpa, .limited(8192)); + const body = try response.reader(&.{}).allocRemaining(gpa, .limited(8192)); defer gpa.free(body); try expectEqualStrings("Hello, World!\n", body); @@ -714,7 +702,7 @@ test "general client/server API coverage" { try std.testing.expectEqual(.ok, response.head.status); - const body = try response.reader().readRemainingAlloc(gpa, .limited(8192)); + const body = try response.reader(&.{}).allocRemaining(gpa, .limited(8192)); defer gpa.free(body); try expectEqualStrings("", body); @@ -751,7 +739,7 @@ test "general client/server API coverage" { try req.sendBodiless(); var response = try req.receiveHead(&redirect_buffer); - const body = try response.reader().readRemainingAlloc(gpa, .limited(8192)); + const body = try response.reader(&.{}).allocRemaining(gpa, .limited(8192)); defer gpa.free(body); try expectEqualStrings("Hello, World!\n", body); @@ -773,7 +761,7 @@ test "general client/server API coverage" { try req.sendBodiless(); var response = try req.receiveHead(&redirect_buffer); - const body = try response.reader().readRemainingAlloc(gpa, .limited(8192)); + const body = try response.reader(&.{}).allocRemaining(gpa, .limited(8192)); defer gpa.free(body); try expectEqualStrings("Hello, World!\n", body); @@ -795,7 +783,7 @@ test "general client/server API coverage" { try req.sendBodiless(); var response = try req.receiveHead(&redirect_buffer); - const body = try response.reader().readRemainingAlloc(gpa, .limited(8192)); + const body = try response.reader(&.{}).allocRemaining(gpa, .limited(8192)); defer gpa.free(body); try expectEqualStrings("Hello, World!\n", body); @@ -836,7 +824,7 @@ test "general client/server API coverage" { try req.sendBodiless(); var response = try req.receiveHead(&redirect_buffer); - const body = try response.reader().readRemainingAlloc(gpa, .limited(8192)); + const body = try response.reader(&.{}).allocRemaining(gpa, .limited(8192)); defer gpa.free(body); try expectEqualStrings("Encoded redirect successful!\n", body); @@ -878,20 +866,18 @@ test "Server streams both reading and writing" { const connection = try net_server.accept(); defer connection.stream.close(); - var stream_reader = connection.stream.reader(); - var stream_writer = connection.stream.writer(); - var connection_br = stream_reader.interface().buffered(&recv_buffer); - var connection_bw = stream_writer.interface().buffered(&send_buffer); - var server = http.Server.init(&connection_br, &connection_bw); + var connection_br = connection.stream.reader(&recv_buffer); + var connection_bw = connection.stream.writer(&send_buffer); + var server = http.Server.init(connection_br.interface(), &connection_bw.interface); var request = try server.receiveHead(); var read_buffer: [100]u8 = undefined; - var br = (try request.reader()).buffered(&read_buffer); - var response = try request.respondStreaming(.{ + var br = try request.readerExpectContinue(&read_buffer); + var response = try request.respondStreaming(&.{}, .{ .respond_options = .{ .transfer_encoding = .none, // Causes keep_alive=false }, }); - var bw = response.writer().unbuffered(); + const w = &response.writer; while (true) { try response.flush(); @@ -901,7 +887,7 @@ test "Server streams both reading and writing" { }; br.toss(buf.len); for (buf) |*b| b.* = std.ascii.toUpper(b.*); - try bw.writeAll(buf); + try w.writeAll(buf); } try response.end(); } @@ -921,15 +907,14 @@ test "Server streams both reading and writing" { defer req.deinit(); req.transfer_encoding = .chunked; - var body_writer = try req.sendBody(); + var body_writer = try req.sendBody(&.{}); var response = try req.receiveHead(&redirect_buffer); - var w = body_writer.writer().unbuffered(); - try w.writeAll("one "); - try w.writeAll("fish"); + try body_writer.writer.writeAll("one "); + try body_writer.writer.writeAll("fish"); try body_writer.end(); - const body = try response.reader().readRemainingAlloc(std.testing.allocator, .limited(8192)); + const body = try response.reader(&.{}).allocRemaining(std.testing.allocator, .limited(8192)); defer std.testing.allocator.free(body); try expectEqualStrings("ONE FISH", body); @@ -954,15 +939,14 @@ fn echoTests(client: *http.Client, port: u16) !void { req.transfer_encoding = .{ .content_length = 14 }; - var body_writer = try req.sendBody(); - var w = body_writer.writer().unbuffered(); - try w.writeAll("Hello, "); - try w.writeAll("World!\n"); + var body_writer = try req.sendBody(&.{}); + try body_writer.writer.writeAll("Hello, "); + try body_writer.writer.writeAll("World!\n"); try body_writer.end(); var response = try req.receiveHead(&redirect_buffer); - const body = try response.reader().readRemainingAlloc(gpa, .limited(8192)); + const body = try response.reader(&.{}).allocRemaining(gpa, .limited(8192)); defer gpa.free(body); try expectEqualStrings("Hello, World!\n", body); @@ -988,15 +972,14 @@ fn echoTests(client: *http.Client, port: u16) !void { req.transfer_encoding = .chunked; - var body_writer = try req.sendBody(); - var w = body_writer.writer().unbuffered(); - try w.writeAll("Hello, "); - try w.writeAll("World!\n"); + var body_writer = try req.sendBody(&.{}); + try body_writer.writer.writeAll("Hello, "); + try body_writer.writer.writeAll("World!\n"); try body_writer.end(); var response = try req.receiveHead(&redirect_buffer); - const body = try response.reader().readRemainingAlloc(gpa, .limited(8192)); + const body = try response.reader(&.{}).allocRemaining(gpa, .limited(8192)); defer gpa.free(body); try expectEqualStrings("Hello, World!\n", body); @@ -1042,16 +1025,15 @@ fn echoTests(client: *http.Client, port: u16) !void { req.transfer_encoding = .chunked; - var body_writer = try req.sendBody(); - var w = body_writer.writer().unbuffered(); - try w.writeAll("Hello, "); - try w.writeAll("World!\n"); + var body_writer = try req.sendBody(&.{}); + try body_writer.writer.writeAll("Hello, "); + try body_writer.writer.writeAll("World!\n"); try body_writer.end(); var response = try req.receiveHead(&redirect_buffer); try expectEqual(.ok, response.head.status); - const body = try response.reader().readRemainingAlloc(gpa, .limited(8192)); + const body = try response.reader(&.{}).allocRemaining(gpa, .limited(8192)); defer gpa.free(body); try expectEqualStrings("Hello, World!\n", body); @@ -1073,11 +1055,11 @@ fn echoTests(client: *http.Client, port: u16) !void { req.transfer_encoding = .chunked; - var body_writer = try req.sendBody(); + var body_writer = try req.sendBody(&.{}); try body_writer.flush(); var response = try req.receiveHead(&redirect_buffer); try expectEqual(.expectation_failed, response.head.status); - _ = try response.reader().discardRemaining(); + _ = try response.reader(&.{}).discardRemaining(); } } @@ -1128,11 +1110,9 @@ test "redirect to different connection" { const connection = try net_server.accept(); defer connection.stream.close(); - var stream_reader = connection.stream.reader(); - var stream_writer = connection.stream.writer(); - var connection_br = stream_reader.interface().buffered(&recv_buffer); - var connection_bw = stream_writer.interface().buffered(&send_buffer); - var server = http.Server.init(&connection_br, &connection_bw); + var connection_br = connection.stream.reader(&recv_buffer); + var connection_bw = connection.stream.writer(&send_buffer); + var server = http.Server.init(connection_br.interface(), &connection_bw.interface); var request = try server.receiveHead(); try expectEqualStrings(request.head.target, "/ok"); try request.respond("good job, you pass", .{}); @@ -1161,7 +1141,7 @@ test "redirect to different connection" { var connection_br = connection.stream.reader(&recv_buffer); var connection_bw = connection.stream.writer(&send_buffer); - var server = http.Server.init(&connection_br, &connection_bw); + var server = http.Server.init(connection_br.interface(), &connection_bw.interface); var request = try server.receiveHead(); try expectEqualStrings(request.head.target, "/help"); try request.respond("", .{ diff --git a/lib/std/io/Reader.zig b/lib/std/io/Reader.zig index fc0e57e5bc..a52d934844 100644 --- a/lib/std/io/Reader.zig +++ b/lib/std/io/Reader.zig @@ -96,7 +96,10 @@ pub const failing: Reader = .{ .end = 0, }; -pub const ending: Reader = .fixed(&.{}); +/// This is generally safe to `@constCast` because it has an empty buffer, so +/// there is not really a way to accidentally attempt mutation of these fields. +const ending_state: Reader = .fixed(&.{}); +pub const ending: *Reader = @constCast(&ending_state); pub fn limited(r: *Reader, limit: Limit, buffer: []u8) Limited { return Limited.init(r, limit, buffer); diff --git a/lib/std/io/Writer.zig b/lib/std/io/Writer.zig index 58c497c4da..80cef5eb85 100644 --- a/lib/std/io/Writer.zig +++ b/lib/std/io/Writer.zig @@ -221,11 +221,12 @@ pub fn writeSplatLimit( /// `end`. pub fn flush(w: *Writer) Error!void { assert(0 == try w.vtable.drain(w, &.{}, 0)); + if (w.end != 0) assert(w.vtable.drain == &fixedDrain); } /// Calls `VTable.drain` but hides the last `preserve_length` bytes from the /// implementation, keeping them buffered. -pub fn drainLimited(w: *Writer, preserve_length: usize) Error!void { +pub fn drainPreserve(w: *Writer, preserve_length: usize) Error!void { const temp_end = w.end -| preserve_length; const preserved = w.buffer[temp_end..w.end]; w.end = temp_end; @@ -235,6 +236,67 @@ pub fn drainLimited(w: *Writer, preserve_length: usize) Error!void { @memmove(w.buffer[w.end..][0..preserved.len], preserved); } +/// Forwards a `drain` to a second `Writer` instance. `w` is only used for its +/// buffer, but it has its `end` and `count` adjusted accordingly depending on +/// how much was consumed. +/// +/// Returns how many bytes from `data` were consumed. +pub fn drainTo(noalias w: *Writer, noalias other: *Writer, data: []const []const u8, splat: usize) Error!usize { + assert(w != other); + const header = w.buffered(); + const new_end = other.end + header.len; + if (new_end <= other.buffer.len) { + @memcpy(other.buffer[other.end..][0..header.len], header); + other.end = new_end; + other.count += header.len; + w.end = 0; + const n = try other.vtable.drain(other, data, splat); + other.count += n; + return n; + } + if (other.vtable == &VectorWrapper.vtable) { + const wrapper: *VectorWrapper = @fieldParentPtr("writer", w); + while (wrapper.it.next()) |dest| { + _ = dest; + @panic("TODO"); + } + } + var vecs: [8][]const u8 = undefined; // Arbitrarily chosen size. + var i: usize = 1; + vecs[0] = header; + for (data) |buf| { + if (buf.len == 0) continue; + vecs[i] = buf; + i += 1; + if (vecs.len - i == 0) break; + } + const new_splat = if (vecs[i - 1].ptr == data[data.len - 1].ptr) splat else 1; + const n = try other.vtable.drain(other, vecs[0..i], new_splat); + other.count += n; + if (n < header.len) { + const remaining = w.buffer[n..w.end]; + @memmove(w.buffer[0..remaining.len], remaining); + w.end = remaining.len; + return 0; + } + defer w.end = 0; + return n - header.len; +} + +pub fn drainToLimit( + noalias w: *Writer, + noalias other: *Writer, + data: []const []const u8, + splat: usize, + limit: Limit, +) Error!usize { + assert(w != other); + _ = data; + _ = splat; + _ = limit; + @panic("TODO"); +} + pub fn unusedCapacitySlice(w: *const Writer) []u8 { return w.buffer[w.end..]; } @@ -285,10 +347,10 @@ pub fn writableSliceGreedy(w: *Writer, minimum_length: usize) Error![]u8 { /// remain buffered. /// /// If `preserve_length` is zero, this is equivalent to `writableSliceGreedy`. -pub fn writableSliceGreedyPreserving(w: *Writer, preserve_length: usize, minimum_length: usize) Error![]u8 { +pub fn writableSliceGreedyPreserve(w: *Writer, preserve_length: usize, minimum_length: usize) Error![]u8 { assert(w.buffer.len >= preserve_length + minimum_length); while (w.buffer.len - w.end < minimum_length) { - try drainLimited(w, preserve_length); + try drainPreserve(w, preserve_length); } else { @branchHint(.likely); return w.buffer[w.end..]; @@ -444,7 +506,7 @@ pub fn write(w: *Writer, bytes: []const u8) Error!usize { } /// Asserts `buffer` capacity exceeds `preserve_length`. -pub fn writePreserving(w: *Writer, preserve_length: usize, bytes: []const u8) Error!usize { +pub fn writePreserve(w: *Writer, preserve_length: usize, bytes: []const u8) Error!usize { assert(preserve_length <= w.buffer.len); if (w.end + bytes.len <= w.buffer.len) { @branchHint(.likely); @@ -478,9 +540,9 @@ pub fn writeAll(w: *Writer, bytes: []const u8) Error!void { /// remain buffered. /// /// Asserts `buffer` capacity exceeds `preserve_length`. -pub fn writeAllPreserving(w: *Writer, preserve_length: usize, bytes: []const u8) Error!void { +pub fn writeAllPreserve(w: *Writer, preserve_length: usize, bytes: []const u8) Error!void { var index: usize = 0; - while (index < bytes.len) index += try w.writePreserving(preserve_length, bytes[index..]); + while (index < bytes.len) index += try w.writePreserve(preserve_length, bytes[index..]); } pub fn print(w: *Writer, comptime format: []const u8, args: anytype) Error!void { @@ -505,9 +567,9 @@ pub fn writeByte(w: *Writer, byte: u8) Error!void { /// When draining the buffer, ensures that at least `preserve_length` bytes /// remain buffered. -pub fn writeBytePreserving(w: *Writer, preserve_length: usize, byte: u8) Error!void { +pub fn writeBytePreserve(w: *Writer, preserve_length: usize, byte: u8) Error!void { while (w.buffer.len - w.end == 0) { - try drainLimited(w, preserve_length); + try drainPreserve(w, preserve_length); } else { @branchHint(.likely); w.buffer[w.end] = byte; @@ -615,19 +677,26 @@ pub fn sendFile(w: *Writer, file_reader: *File.Reader, limit: Limit) FileError!u /// on how much was consumed. /// /// Returns how many bytes from `file_reader` were consumed. -pub fn sendFileTo(w: *Writer, other: *Writer, file_reader: *File.Reader, limit: Limit) FileError!usize { +pub fn sendFileTo( + noalias w: *Writer, + noalias other: *Writer, + file_reader: *File.Reader, + limit: Limit, +) FileError!usize { + assert(w != other); const header = w.buffered(); const new_end = other.end + header.len; if (new_end <= other.buffer.len) { @memcpy(other.buffer[other.end..][0..header.len], header); other.end = new_end; + other.count += header.len; w.end = 0; return other.vtable.sendFile(other, file_reader, limit); } assert(header.len > 0); var vec_buf: [2][]const u8 = .{ header, undefined }; var vec_i: usize = 1; - const buffered_contents = limit.slice(file_reader.buffered()); + const buffered_contents = limit.slice(file_reader.interface.buffered()); if (buffered_contents.len > 0) { vec_buf[vec_i] = buffered_contents; vec_i += 1;