diff --git a/lib/std/http/Client.zig b/lib/std/http/Client.zig index b0c25d738a..010c557f87 100644 --- a/lib/std/http/Client.zig +++ b/lib/std/http/Client.zig @@ -15,8 +15,6 @@ const proto = @import("protocol.zig"); pub const default_connection_pool_size = 32; pub const connection_pool_size = std.options.http_connection_pool_size; -/// Used for tcpConnectToHost and storing HTTP headers when an externally -/// managed buffer is not provided. allocator: Allocator, ca_bundle: std.crypto.Certificate.Bundle = .{}, ca_bundle_mutex: std.Thread.Mutex = .{}, @@ -24,8 +22,10 @@ ca_bundle_mutex: std.Thread.Mutex = .{}, /// it will first rescan the system for root certificates. next_https_rescan_certs: bool = true, +/// The pool of connections that can be reused (and currently in use). connection_pool: ConnectionPool = .{}, +/// The last error that occurred on this client. This is not threadsafe, do not expect it to be completely accurate. last_error: ?ExtraError = null, pub const ExtraError = union(enum) { @@ -68,7 +68,9 @@ pub const ExtraError = union(enum) { decompress: DecompressError, // error.ReadFailed }; +/// A set of linked lists of connections that can be reused. pub const ConnectionPool = struct { + /// The criteria for a connection to be considered a match. pub const Criteria = struct { host: []const u8, port: u16, @@ -92,7 +94,9 @@ pub const ConnectionPool = struct { pub const Node = Queue.Node; mutex: std.Thread.Mutex = .{}, + /// Open connections that are currently in use. used: Queue = .{}, + /// Open connections that are not currently in use. free: Queue = .{}, free_len: usize = 0, free_size: usize = connection_pool_size, @@ -189,6 +193,7 @@ pub const ConnectionPool = struct { } }; +/// An interface to either a plain or TLS connection. pub const Connection = struct { stream: net.Stream, /// undefined unless protocol is tls. @@ -261,6 +266,7 @@ pub const Connection = struct { } }; +/// A buffered (and peekable) Connection. pub const BufferedConnection = struct { pub const buffer_size = 0x2000; @@ -344,12 +350,14 @@ pub const BufferedConnection = struct { } }; +/// The mode of transport for requests. pub const RequestTransfer = union(enum) { content_length: u64, chunked: void, none: void, }; +/// The decompressor for response messages. pub const Compression = union(enum) { pub const DeflateDecompressor = std.compress.zlib.ZlibStream(Request.TransferReader); pub const GzipDecompressor = std.compress.gzip.Decompress(Request.TransferReader); @@ -361,6 +369,7 @@ pub const Compression = union(enum) { none: void, }; +/// A HTTP response originating from a server. pub const Response = struct { pub const Headers = struct { status: http.Status, @@ -501,14 +510,9 @@ pub const Response = struct { skip: bool = false, }; -/// A HTTP request. +/// A HTTP request that has been sent. /// -/// Order of operations: -/// - request -/// - write -/// - finish -/// - do -/// - read +/// Order of operations: request[ -> write -> finish] -> do -> read pub const Request = struct { pub const Headers = struct { version: http.Version = .@"HTTP/1.1", @@ -862,6 +866,8 @@ pub const Request = struct { } }; +/// Release all associated resources with the client. +/// TODO: currently leaks all request allocated data pub fn deinit(client: *Client) void { client.connection_pool.deinit(client); @@ -871,6 +877,8 @@ pub fn deinit(client: *Client) void { pub const ConnectError = Allocator.Error || error{ ConnectionFailed, TlsInitializationFailed }; +/// Connect to `host:port` using the specified protocol. This will reuse a connection if one is already open. +/// This function is threadsafe. pub fn connect(client: *Client, host: []const u8, port: u16, protocol: Connection.Protocol) ConnectError!*ConnectionPool.Node { if (client.connection_pool.findConnection(.{ .host = host, @@ -955,6 +963,8 @@ pub const protocol_map = std.ComptimeStringMap(Connection.Protocol, .{ .{ "wss", .tls }, }); +/// Form and send a http request to a server. +/// This function is threadsafe. pub fn request(client: *Client, uri: Uri, headers: Request.Headers, options: Options) RequestError!Request { const protocol = protocol_map.get(uri.scheme) orelse return error.UnsupportedUrlScheme; diff --git a/lib/std/http/Server.zig b/lib/std/http/Server.zig index b870d267f5..85fbf25265 100644 --- a/lib/std/http/Server.zig +++ b/lib/std/http/Server.zig @@ -14,10 +14,7 @@ allocator: Allocator, socket: net.StreamServer, -pub const DeflateDecompressor = std.compress.zlib.ZlibStream(Response.TransferReader); -pub const GzipDecompressor = std.compress.gzip.Decompress(Response.TransferReader); -pub const ZstdDecompressor = std.compress.zstd.DecompressStream(Response.TransferReader, .{}); - +/// An interface to either a plain or TLS connection. pub const Connection = struct { stream: net.Stream, protocol: Protocol, @@ -74,6 +71,7 @@ pub const Connection = struct { } }; +/// A buffered (and peekable) Connection. pub const BufferedConnection = struct { pub const buffer_size = 0x2000; @@ -157,6 +155,7 @@ pub const BufferedConnection = struct { } }; +/// A HTTP request originating from a client. pub const Request = struct { pub const Headers = struct { method: http.Method, @@ -290,6 +289,11 @@ pub const Request = struct { compression: Compression = .none, }; +/// A HTTP response waiting to be sent. +/// +/// [/ <----------------------------------- \] +/// Order of operations: accept -> wait -> do [ -> write -> finish][ -> reset /] +/// \ -> read / pub const Response = struct { pub const Headers = struct { version: http.Version = .@"HTTP/1.1", @@ -310,6 +314,7 @@ pub const Response = struct { headers: Headers = .{}, request: Request, + /// Reset this response to its initial state. This must be called before handling a second request on the same connection. pub fn reset(res: *Response) void { switch (res.request.compression) { .none => {}, @@ -336,7 +341,8 @@ pub const Response = struct { } } - pub fn sendResponseHead(res: *Response) !void { + /// Send the response headers. + pub fn do(res: *Response) !void { var buffered = std.io.bufferedWriter(res.connection.writer()); const w = buffered.writer(); @@ -402,7 +408,8 @@ pub const Response = struct { pub const WaitForCompleteHeadError = BufferedConnection.ReadError || proto.HeadersParser.WaitForCompleteHeadError || Request.Headers.ParseError || error{ BadHeader, InvalidCompression, StreamTooLong, InvalidWindowSize } || error{CompressionNotSupported}; - pub fn waitForCompleteHead(res: *Response) !void { + /// Wait for the client to send a complete request head. + pub fn wait(res: *Response) !void { while (true) { try res.connection.fill(); @@ -451,7 +458,7 @@ pub const Response = struct { } } - pub const ReadError = DeflateDecompressor.Error || GzipDecompressor.Error || ZstdDecompressor.Error || WaitForCompleteHeadError; + pub const ReadError = Compression.DeflateDecompressor.Error || Compression.GzipDecompressor.Error || Compression.ZstdDecompressor.Error || WaitForCompleteHeadError; pub const Reader = std.io.Reader(*Response, ReadError, read); @@ -517,13 +524,19 @@ pub const Response = struct { } }; +/// The mode of transport for responses. pub const RequestTransfer = union(enum) { content_length: u64, chunked: void, none: void, }; +/// The decompressor for request messages. pub const Compression = union(enum) { + pub const DeflateDecompressor = std.compress.zlib.ZlibStream(Response.TransferReader); + pub const GzipDecompressor = std.compress.gzip.Decompress(Response.TransferReader); + pub const ZstdDecompressor = std.compress.zstd.DecompressStream(Response.TransferReader, .{}); + deflate: DeflateDecompressor, gzip: GzipDecompressor, zstd: ZstdDecompressor, @@ -543,6 +556,7 @@ pub fn deinit(server: *Server) void { pub const ListenError = std.os.SocketError || std.os.BindError || std.os.ListenError || std.os.SetSockOptError || std.os.GetSockNameError; +/// Start the HTTP server listening on the given address. pub fn listen(server: *Server, address: net.Address) !void { try server.socket.listen(address); } @@ -562,6 +576,7 @@ pub const HeaderStrategy = union(enum) { static: []u8, }; +/// Accept a new connection and allocate a Response for it. pub fn accept(server: *Server, options: HeaderStrategy) AcceptError!*Response { const in = try server.socket.accept(); diff --git a/lib/std/http/protocol.zig b/lib/std/http/protocol.zig index 2425c621cf..784b693064 100644 --- a/lib/std/http/protocol.zig +++ b/lib/std/http/protocol.zig @@ -21,6 +21,7 @@ pub const State = enum { chunk_data_suffix, chunk_data_suffix_r, + /// Returns true if the parser is in a content state (ie. not waiting for more headers). pub fn isContent(self: State) bool { return switch (self) { .invalid, .start, .seen_n, .seen_r, .seen_rn, .seen_rnr => false, @@ -31,7 +32,7 @@ pub const State = enum { pub const HeadersParser = struct { state: State = .start, - /// Wether or not `header_bytes` is allocated or was provided as a fixed buffer. + /// Whether or not `header_bytes` is allocated or was provided as a fixed buffer. header_bytes_owned: bool, /// Either a fixed buffer of len `max_header_bytes` or a dynamic buffer that can grow up to `max_header_bytes`. /// Pointers into this buffer are not stable until after a message is complete. @@ -39,10 +40,11 @@ pub const HeadersParser = struct { /// The maximum allowed size of `header_bytes`. max_header_bytes: usize, next_chunk_length: u64 = 0, - /// Wether this parser is done parsing a complete message. - /// A message is only done when the entire payload has been read + /// Whether this parser is done parsing a complete message. + /// A message is only done when the entire payload has been read. done: bool = false, + /// Initializes the parser with a dynamically growing header buffer of up to `max` bytes. pub fn initDynamic(max: usize) HeadersParser { return .{ .header_bytes = .{}, @@ -51,6 +53,7 @@ pub const HeadersParser = struct { }; } + /// Initializes the parser with a provided buffer `buf`. pub fn initStatic(buf: []u8) HeadersParser { return .{ .header_bytes = .{ .items = buf[0..0], .capacity = buf.len }, @@ -59,7 +62,11 @@ pub const HeadersParser = struct { }; } + /// Completely resets the parser to it's initial state. + /// This must be called after a message is complete. pub fn reset(r: *HeadersParser) void { + assert(r.done); // The message must be completely read before reset, otherwise the parser is in an invalid state. + r.header_bytes.clearRetainingCapacity(); r.* = .{ @@ -69,13 +76,14 @@ pub const HeadersParser = struct { }; } - /// Returns how many bytes are part of HTTP headers. Always less than or - /// equal to bytes.len. If the amount returned is less than bytes.len, it - /// means the headers ended and the first byte after the double \r\n\r\n is - /// located at `bytes[result]`. + /// Returns the number of bytes consumed by headers. This is always less than or equal to `bytes.len`. + /// You should check `r.state.isContent()` after this to check if the headers are done. + /// + /// If the amount returned is less than `bytes.len`, you may assume that the parser is in a content state and the + /// first byte of content is located at `bytes[result]`. pub fn findHeadersEnd(r: *HeadersParser, bytes: []const u8) u32 { - const vector_len = 16; - const len = @truncate(u32, bytes.len); + const vector_len: comptime_int = comptime std.simd.suggestVectorSize(u8) orelse 8; + const len = @intCast(u32, bytes.len); var index: u32 = 0; while (true) { @@ -390,8 +398,13 @@ pub const HeadersParser = struct { } } + /// Returns the number of bytes consumed by the chunk size. This is always less than or equal to `bytes.len`. + /// You should check `r.state == .chunk_data` after this to check if the chunk size has been fully parsed. + /// + /// If the amount returned is less than `bytes.len`, you may assume that the parser is in the `chunk_data` state + /// and that the first byte of the chunk is at `bytes[result]`. pub fn findChunkedLen(r: *HeadersParser, bytes: []const u8) u32 { - const len = @truncate(u32, bytes.len); + const len = @intCast(u32, bytes.len); for (bytes[0..], 0..) |c, i| { const index = @intCast(u32, i); @@ -471,8 +484,10 @@ pub const HeadersParser = struct { pub const CheckCompleteHeadError = mem.Allocator.Error || error{HttpHeadersExceededSizeLimit}; - /// Pumps `in` bytes into the parser. Returns the number of bytes consumed. This function will return 0 if the parser - /// is not in a state to parse more headers. + /// Pushes `in` into the parser. Returns the number of bytes consumed by the header. Any header bytes are appended + /// to the `header_bytes` buffer. + /// + /// This function only uses `allocator` if `r.header_bytes_owned` is true, and may be undefined otherwise. pub fn checkCompleteHead(r: *HeadersParser, allocator: std.mem.Allocator, in: []const u8) CheckCompleteHeadError!u32 { if (r.state.isContent()) return 0; @@ -493,8 +508,11 @@ pub const HeadersParser = struct { HttpChunkInvalid, }; - /// Reads the body of the message into `buffer`. If `skip` is true, the buffer will be unused and the body will be - /// skipped. Returns the number of bytes placed in the buffer. + /// Reads the body of the message into `buffer`. Returns the number of bytes placed in the buffer. + /// + /// If `skip` is true, the buffer will be unused and the body will be skipped. + /// + /// See `std.http.Client.BufferedConnection for an example of `bconn`. pub fn read(r: *HeadersParser, bconn: anytype, buffer: []u8, skip: bool) !usize { assert(r.state.isContent()); if (r.done) return 0;