std.http: mostly finish the rewrite

This commit is contained in:
Andrew Kelley 2025-04-28 19:19:41 -07:00
parent aef0434c01
commit c7040171fb
7 changed files with 434 additions and 306 deletions

View File

@ -476,7 +476,7 @@ fn serveSourcesTar(ws: *WebServer, request: *std.http.Server.Request) !void {
defer arena_instance.deinit();
const arena = arena_instance.allocator();
var body_writer = try request.respondStreaming(.{
var body = try request.respondStreaming(.{
.respond_options = .{
.extra_headers = &.{
.{ .name = "content-type", .value = "application/x-tar" },
@ -517,7 +517,7 @@ fn serveSourcesTar(ws: *WebServer, request: *std.http.Server.Request) !void {
var cwd_cache: ?[]const u8 = null;
var response_writer = body_writer.interface().unbuffered();
var response_writer = body.writer().unbuffered();
var archiver: std.tar.Writer = .{ .underlying_writer = &response_writer };
for (deduped_paths) |joined_path| {
@ -531,7 +531,7 @@ fn serveSourcesTar(ws: *WebServer, request: *std.http.Server.Request) !void {
try archiver.writeFile(joined_path.sub_path, file, try file.stat());
}
try body_writer.end();
try body.end();
}
fn memoizedCwd(arena: Allocator, opt_ptr: *?[]const u8) ![]const u8 {

View File

@ -295,13 +295,24 @@ pub const TransferEncoding = enum {
};
/// Content-coding values used in the Content-Encoding and Accept-Encoding
/// headers. The historical "x-gzip" and "x-compress" aliases are not enum
/// members; `fromString` folds them into their canonical tags.
pub const ContentEncoding = enum {
    zstd,
    gzip,
    deflate,
    compress,
    identity,

    /// Exact-match lookup of a content-coding token (caller trims
    /// surrounding whitespace first). Accepts the legacy "x-gzip" and
    /// "x-compress" spellings as aliases for `.gzip` / `.compress`.
    /// Returns null for unrecognized codings.
    pub fn fromString(s: []const u8) ?ContentEncoding {
        const map = std.StaticStringMap(ContentEncoding).initComptime(.{
            .{ "zstd", .zstd },
            .{ "gzip", .gzip },
            .{ "x-gzip", .gzip },
            .{ "deflate", .deflate },
            .{ "compress", .compress },
            .{ "x-compress", .compress },
            .{ "identity", .identity },
        });
        return map.get(s);
    }
};
pub const Connection = enum {
@ -331,18 +342,9 @@ pub const Reader = struct {
body_err: ?BodyError = null,
/// Stolen from `in`.
head_buffer: []u8 = &.{},
compression: Compression,
pub const max_chunk_header_len = 22;
pub const Compression = union(enum) {
deflate: std.compress.zlib.Decompressor,
gzip: std.compress.gzip.Decompressor,
// https://github.com/ziglang/zig/issues/18937
//zstd: std.compress.zstd.Decompressor,
none: void,
};
pub const RemainingChunkLen = enum(u64) {
head = 0,
n = 1,
@ -416,19 +418,19 @@ pub const Reader = struct {
}
}
/// If compressed body has been negotiated this will return compressed bytes.
///
/// Asserts only called once and after `receiveHead`.
pub fn interface(
reader: *Reader,
transfer_encoding: TransferEncoding,
content_length: ?u64,
content_encoding: ContentEncoding,
) std.io.Reader {
///
/// See also:
/// * `interfaceDecompressing`
pub fn bodyReader(reader: *Reader, transfer_encoding: TransferEncoding, content_length: ?u64) std.io.Reader {
assert(reader.state == .received_head);
reader.state = .receiving_body;
reader.transfer_br.unbuffered_reader = switch (transfer_encoding) {
.chunked => r: {
return switch (transfer_encoding) {
.chunked => {
reader.body_state = .{ .remaining_chunk_len = .head };
break :r .{
return .{
.context = reader,
.vtable = &.{
.read = &chunkedRead,
@ -437,10 +439,10 @@ pub const Reader = struct {
},
};
},
.none => r: {
.none => {
if (content_length) |len| {
reader.body_state = .{ .remaining_content_length = len };
break :r .{
return .{
.context = reader,
.vtable = &.{
.read = &contentLengthRead,
@ -448,40 +450,53 @@ pub const Reader = struct {
.discard = &contentLengthDiscard,
},
};
} else switch (content_encoding) {
.identity => {
reader.compression = .none;
return reader.in.reader();
},
.deflate => {
reader.compression = .{ .deflate = .init(reader.in) };
return reader.compression.deflate.reader();
},
.gzip, .@"x-gzip" => {
reader.compression = .{ .gzip = .init(reader.in) };
return reader.compression.gzip.reader();
},
.compress, .@"x-compress" => unreachable,
.zstd => unreachable, // https://github.com/ziglang/zig/issues/18937
} else {
return reader.in.reader();
}
},
};
switch (content_encoding) {
.identity => {
reader.compression = .none;
return reader.transfer_br.unbuffered_reader;
},
.deflate => {
reader.compression = .{ .deflate = .init(&reader.transfer_br) };
return reader.compression.deflate.reader();
},
.gzip, .@"x-gzip" => {
reader.compression = .{ .gzip = .init(&reader.transfer_br) };
return reader.compression.gzip.reader();
},
.compress, .@"x-compress" => unreachable,
.zstd => unreachable, // https://github.com/ziglang/zig/issues/18937
}
/// If compressed body has been negotiated this will return decompressed bytes.
///
/// Decompression state lives in `decompressor`, which must outlive the
/// returned reader. `decompression_buffer` backs the decompressor's
/// buffering; for zstd it is used as the decompression window.
///
/// Asserts only called once and after `receiveHead`.
///
/// See also:
/// * `bodyReader`
pub fn bodyReaderDecompressing(
    reader: *Reader,
    transfer_encoding: TransferEncoding,
    content_length: ?u64,
    content_encoding: ContentEncoding,
    decompressor: *Decompressor,
    decompression_buffer: []u8,
) std.io.Reader {
    // Fast path: no chunked framing and no content-length means the body is
    // simply everything remaining on the connection, so the decompression
    // stream can read straight from `reader.in` with no transfer-decoding
    // layer in between.
    if (transfer_encoding == .none and content_length == null) {
        assert(reader.state == .received_head);
        reader.state = .receiving_body;
        switch (content_encoding) {
            .identity => {
                return reader.in.reader();
            },
            .deflate => {
                decompressor.compression = .{ .deflate = .init(reader.in) };
                return decompressor.compression.deflate.reader();
            },
            .gzip => {
                decompressor.compression = .{ .gzip = .init(reader.in) };
                return decompressor.compression.gzip.reader();
            },
            .zstd => {
                decompressor.compression = .{ .zstd = .init(reader.in, .{
                    .window_buffer = decompression_buffer,
                }) };
                return decompressor.compression.zstd.reader();
            },
            // Unsupported coding; callers must reject `compress` before
            // requesting a body reader.
            .compress => unreachable,
        }
    }
    // Otherwise decode the transfer framing first, then layer decompression
    // on top of it via `Decompressor.reader`.
    const transfer_reader = bodyReader(reader, transfer_encoding, content_length);
    return decompressor.reader(transfer_reader, decompression_buffer, content_encoding);
}
fn contentLengthRead(
@ -720,6 +735,52 @@ pub const Reader = struct {
}
};
/// Layers a decompression stream on top of an HTTP transfer reader,
/// selected by the negotiated `ContentEncoding`.
pub const Decompressor = struct {
    compression: Compression,
    buffered_reader: std.io.BufferedReader,

    pub const Compression = union(enum) {
        deflate: std.compress.zlib.Decompressor,
        gzip: std.compress.gzip.Decompressor,
        zstd: std.compress.zstd.Decompressor,
        none: void,
    };

    /// Initializes `decompressor` for `content_encoding` and returns the
    /// reader yielding decompressed bytes.
    ///
    /// `buffer` backs the intermediate buffered reader; for `.zstd` it is
    /// split in half between read buffering and the decompression window.
    /// For `.identity` no state is set up, `buffer` is unused, and
    /// `transfer_reader` is returned unchanged.
    ///
    /// `.compress` is unsupported; callers must reject it before requesting
    /// a body reader.
    pub fn reader(
        decompressor: *Decompressor,
        transfer_reader: std.io.Reader,
        buffer: []u8,
        content_encoding: ContentEncoding,
    ) std.io.Reader {
        switch (content_encoding) {
            .identity => {
                decompressor.compression = .none;
                return transfer_reader;
            },
            .deflate => {
                decompressor.buffered_reader = transfer_reader.buffered(buffer);
                decompressor.compression = .{ .deflate = .init(&decompressor.buffered_reader) };
                return decompressor.compression.deflate.reader();
            },
            .gzip => {
                decompressor.buffered_reader = transfer_reader.buffered(buffer);
                decompressor.compression = .{ .gzip = .init(&decompressor.buffered_reader) };
                return decompressor.compression.gzip.reader();
            },
            .zstd => {
                const first_half = buffer[0 .. buffer.len / 2];
                const second_half = buffer[buffer.len / 2 ..];
                decompressor.buffered_reader = transfer_reader.buffered(first_half);
                decompressor.compression = .{ .zstd = .init(&decompressor.buffered_reader, .{
                    .window_buffer = second_half,
                }) };
                // Bug fix: previously returned `compression.gzip.reader()`
                // here, accessing the wrong union payload after setting
                // `.zstd`.
                return decompressor.compression.zstd.reader();
            },
            .compress => unreachable,
        }
    }
};
/// Request or response body.
pub const BodyWriter = struct {
/// Until the lifetime of `BodyWriter` ends, it is illegal to modify the

View File

@ -85,7 +85,7 @@ pub const ConnectionPool = struct {
if (connection.port != criteria.port) continue;
// Domain names are case-insensitive (RFC 5890, Section 2.3.2.4)
if (!std.ascii.eqlIgnoreCase(connection.host, criteria.host)) continue;
if (!std.ascii.eqlIgnoreCase(connection.host(), criteria.host)) continue;
pool.acquireUnsafe(connection);
return connection;
@ -227,7 +227,8 @@ pub const Protocol = enum {
pub const Connection = struct {
client: *Client,
stream: net.Stream,
stream_writer: net.Stream.Writer,
stream_reader: net.Stream.Reader,
/// HTTP protocol from client to server.
/// This either goes directly to `stream`, or to a TLS client.
writer: std.io.BufferedWriter,
@ -249,7 +250,7 @@ pub const Connection = struct {
remote_host: []const u8,
port: u16,
stream: net.Stream,
) error{OutOfMemory}!*Connection {
) error{OutOfMemory}!*Plain {
const gpa = client.allocator;
const alloc_len = allocLen(client, remote_host.len);
const base = try gpa.alignedAlloc(u8, .of(Plain), alloc_len);
@ -263,17 +264,19 @@ pub const Connection = struct {
plain.* = .{
.connection = .{
.client = client,
.stream = stream,
.writer = stream.writer().buffered(socket_write_buffer),
.stream_writer = stream.writer(),
.stream_reader = stream.reader(),
.writer = plain.connection.stream_writer.interface().buffered(socket_write_buffer),
.pool_node = .{},
.port = port,
.host_len = @intCast(remote_host.len),
.proxied = false,
.closing = false,
.protocol = .plain,
},
.reader = undefined,
.reader = plain.connection.stream_reader.interface().buffered(socket_read_buffer),
};
plain.reader.init(stream.reader(), socket_read_buffer);
return plain;
}
fn destroy(plain: *Plain) void {
@ -321,19 +324,20 @@ pub const Connection = struct {
tls.* = .{
.connection = .{
.client = client,
.stream = stream,
.stream_writer = stream.writer(),
.stream_reader = stream.reader(),
.writer = tls.client.writer().buffered(socket_write_buffer),
.pool_node = .{},
.port = port,
.host_len = @intCast(remote_host.len),
.proxied = false,
.closing = false,
.protocol = .tls,
},
.writer = stream.writer().buffered(tls_write_buffer),
.reader = undefined,
.writer = tls.connection.stream_writer.interface().buffered(tls_write_buffer),
.reader = tls.connection.stream_reader.interface().buffered(tls_read_buffer),
.client = undefined,
};
tls.reader.init(stream.reader(), tls_read_buffer);
// TODO data race here on ca_bundle if the user sets next_https_rescan_certs to true
tls.client.init(&tls.reader, &tls.writer, .{
.host = .{ .explicit = remote_host },
@ -364,6 +368,10 @@ pub const Connection = struct {
}
};
/// Returns the underlying `net.Stream` for this connection. The reader and
/// writer were both constructed from the same stream, so the reader side is
/// used as the single access point for the socket handle.
fn getStream(c: *Connection) net.Stream {
    return c.stream_reader.getStream();
}
fn host(c: *Connection) []u8 {
return switch (c.protocol) {
.tls => {
@ -396,7 +404,7 @@ pub const Connection = struct {
/// If this is called without calling `flush` or `end`, data will be
/// dropped unsent.
pub fn destroy(c: *Connection) void {
c.stream.close();
c.getStream().close();
switch (c.protocol) {
.tls => {
if (disable_tls) unreachable;
@ -457,12 +465,12 @@ pub const Response = struct {
content_encoding: http.ContentEncoding = .identity,
/// Errors that can occur while parsing a response head.
pub const ParseError = error{
    HttpConnectionHeaderUnsupported,
    HttpContentEncodingUnsupported,
    HttpHeaderContinuationsUnsupported,
    HttpHeadersInvalid,
    HttpTransferEncodingUnsupported,
    InvalidContentLength,
    CompressionUnsupported,
};
pub fn parse(bytes: []const u8) ParseError!Head {
@ -536,7 +544,7 @@ pub const Response = struct {
if (next) |second| {
const trimmed_second = mem.trim(u8, second, " ");
if (std.meta.stringToEnum(http.ContentEncoding, trimmed_second)) |transfer| {
if (http.ContentEncoding.fromString(trimmed_second)) |transfer| {
if (res.content_encoding != .identity) return error.HttpHeadersInvalid; // double compression is not supported
res.content_encoding = transfer;
} else {
@ -556,10 +564,10 @@ pub const Response = struct {
const trimmed = mem.trim(u8, header_value, " ");
if (std.meta.stringToEnum(http.ContentEncoding, trimmed)) |ce| {
if (http.ContentEncoding.fromString(trimmed)) |ce| {
res.content_encoding = ce;
} else {
return error.HttpTransferEncodingUnsupported;
return error.HttpContentEncodingUnsupported;
}
}
}
@ -664,10 +672,49 @@ pub const Response = struct {
}
};
/// If compressed body has been negotiated this will return compressed bytes.
///
/// If the returned `std.io.Reader` returns `error.ReadFailed` the error is
/// available via `bodyErr`.
///
/// Asserts that this function is only called once.
///
/// See also:
/// * `readerDecompressing`
pub fn reader(response: *Response) std.io.Reader {
const head = &response.head;
return response.request.reader.interface(head.transfer_encoding, head.content_length, head.content_encoding);
return response.request.reader.bodyReader(head.transfer_encoding, head.content_length);
}
/// If compressed body has been negotiated this will return decompressed bytes.
///
/// If the returned `std.io.Reader` returns `error.ReadFailed` the error is
/// available via `bodyErr`.
///
/// Asserts that this function is only called once.
///
/// See also:
/// * `reader`
pub fn readerDecompressing(
response: *Response,
decompressor: *http.Decompressor,
decompression_buffer: []u8,
) std.io.Reader {
const head = &response.head;
return response.request.reader.bodyReaderDecompressing(
head.transfer_encoding,
head.content_length,
head.content_encoding,
decompressor,
decompression_buffer,
);
}
/// After receiving `error.ReadFailed` from the `std.io.Reader` returned by
/// `reader` or `readerDecompressing`, this function accesses the
/// more specific error code.
pub fn bodyErr(response: *const Response) ?http.Reader.BodyError {
return response.request.reader.body_err;
}
};
@ -688,6 +735,7 @@ pub const Request = struct {
version: http.Version = .@"HTTP/1.1",
transfer_encoding: TransferEncoding,
redirect_behavior: RedirectBehavior,
accept_encoding: @TypeOf(default_accept_encoding) = default_accept_encoding,
/// Whether the request should handle a 100-continue response before sending the request body.
handle_continue: bool,
@ -705,6 +753,14 @@ pub const Request = struct {
/// Externally-owned; must outlive the Request.
privileged_headers: []const http.Header,
/// Content encodings advertised by default in the Accept-Encoding header,
/// indexed by `@intFromEnum(http.ContentEncoding)`: gzip, deflate, and
/// identity are enabled; everything else is disabled.
pub const default_accept_encoding: [@typeInfo(http.ContentEncoding).@"enum".fields.len]bool = blk: {
    var enabled: [@typeInfo(http.ContentEncoding).@"enum".fields.len]bool = @splat(false);
    for ([_]http.ContentEncoding{ .gzip, .deflate, .identity }) |ce| {
        enabled[@intFromEnum(ce)] = true;
    }
    break :blk enabled;
};
pub const TransferEncoding = union(enum) {
content_length: u64,
chunked: void,
@ -844,9 +900,18 @@ pub const Request = struct {
}
if (try emitOverridableHeader("accept-encoding: ", r.headers.accept_encoding, w)) {
// https://github.com/ziglang/zig/issues/18937
//try w.writeAll("accept-encoding: gzip, deflate, zstd\r\n");
try w.writeAll("accept-encoding: gzip, deflate\r\n");
try w.writeAll("accept-encoding: ");
for (r.accept_encoding, 0..) |enabled, i| {
if (!enabled) continue;
const tag: http.ContentEncoding = @enumFromInt(i);
if (tag == .identity) continue;
const tag_name = @tagName(tag);
try w.ensureUnusedCapacity(tag_name.len + 2);
try w.writeAll(tag_name);
try w.writeAll(", ");
}
w.undo(2);
try w.writeAll("\r\n");
}
switch (r.transfer_encoding) {
@ -884,7 +949,7 @@ pub const Request = struct {
try w.writeAll("\r\n");
}
pub const ReceiveHeadError = std.io.Writer.Error || http.Reader.HeadError || error{
pub const ReceiveHeadError = http.Reader.HeadError || ConnectError || error{
/// Server sent headers that did not conform to the HTTP protocol.
///
/// To find out more detailed diagnostics, `http.Reader.head_buffer` can be
@ -897,8 +962,14 @@ pub const Request = struct {
HttpRedirectLocationMissing,
HttpRedirectLocationOversize,
HttpRedirectLocationInvalid,
CompressionInitializationFailed,
CompressionUnsupported,
HttpContentEncodingUnsupported,
HttpChunkInvalid,
HttpHeadersOversize,
UnsupportedUriScheme,
/// Sending the request failed. Error code can be found on the
/// `Connection` object.
WriteFailed,
};
/// If handling redirects and the request has no payload, then this
@ -957,44 +1028,35 @@ pub const Request = struct {
if (head.status.class() == .redirect and r.redirect_behavior != .unhandled) {
if (r.redirect_behavior == .not_allowed) return error.TooManyHttpRedirects;
const location = head.location orelse return error.HttpRedirectLocationMissing;
try r.redirect(location, &aux_buf);
try r.redirect(head, &aux_buf);
try r.sendBodiless();
continue;
}
switch (head.content_encoding) {
.identity, .deflate, .gzip, .@"x-gzip" => {},
.compress, .@"x-compress" => return error.CompressionUnsupported,
// https://github.com/ziglang/zig/issues/18937
.zstd => return error.CompressionUnsupported,
}
if (!r.accept_encoding[@intFromEnum(head.content_encoding)])
return error.HttpContentEncodingUnsupported;
return response;
}
}
/// Errors that can occur while resolving a redirect Location into the
/// caller-provided auxiliary buffer.
pub const RedirectError = error{
    HttpRedirectLocationOversize,
    HttpRedirectLocationInvalid,
};
/// This function takes an auxiliary buffer to store the arbitrarily large
/// URI which may need to be merged with the previous URI, and that data
/// needs to survive across different connections, which is where the input
/// buffer lives.
///
/// `aux_buf` must outlive accesses to `Request.uri`.
fn redirect(r: *Request, new_location: []const u8, aux_buf: *[]u8) RedirectError!void {
fn redirect(r: *Request, head: *const Response.Head, aux_buf: *[]u8) !void {
const new_location = head.location orelse return error.HttpRedirectLocationMissing;
if (new_location.len > aux_buf.*.len) return error.HttpRedirectLocationOversize;
const location = aux_buf.*[0..new_location.len];
@memcpy(location, new_location);
{
// Skip the body of the redirect response to leave the connection in
// the correct state. This causes `new_location` to be invalidated.
var reader = r.reader.interface();
var reader = r.reader.bodyReader(head.transfer_encoding, head.content_length);
_ = reader.discardRemaining() catch |err| switch (err) {
error.ReadFailed => return r.reader.err.?,
error.ReadFailed => return r.reader.body_err.?,
};
}
const new_uri = r.uri.resolveInPlace(location.len, aux_buf) catch |err| switch (err) {
@ -1003,7 +1065,6 @@ pub const Request = struct {
error.InvalidPort => return error.HttpRedirectLocationInvalid,
error.NoSpaceLeft => return error.HttpRedirectLocationOversize,
};
const resolved_len = location.len + (aux_buf.*.ptr - location.ptr);
const protocol = Protocol.fromUri(new_uri) orelse return error.UnsupportedUriScheme;
const old_connection = r.connection.?;
@ -1022,7 +1083,7 @@ pub const Request = struct {
r.privileged_headers = &.{};
}
if (switch (r.response.status) {
if (switch (head.status) {
.see_other => true,
.moved_permanently, .found => r.method == .POST,
else => false,
@ -1042,7 +1103,6 @@ pub const Request = struct {
const new_connection = try r.client.connect(new_host, uriPort(new_uri, protocol), protocol);
r.uri = new_uri;
r.stolen_bytes_len = resolved_len;
r.connection = new_connection;
r.redirect_behavior.subtractOne();
}
@ -1054,9 +1114,8 @@ pub const Request = struct {
.default => return true,
.omit => return false,
.override => |x| {
try bw.writeAll(prefix);
try bw.writeAll(x);
try bw.writeAll("\r\n");
var vecs: [3][]const u8 = .{ prefix, x, "\r\n" };
try bw.writeVecAll(&vecs);
return false;
},
}
@ -1198,9 +1257,29 @@ pub fn connectTcp(
port: u16,
protocol: Protocol,
) ConnectTcpError!*Connection {
return connectTcpOptions(client, .{ .host = host, .port = port, .protocol = protocol });
}
/// Options for `connectTcpOptions`.
pub const ConnectTcpOptions = struct {
    /// Host to connect to directly (e.g. a proxy when tunneling).
    host: []const u8,
    port: u16,
    protocol: Protocol,
    /// Logical destination host used as the connection-pool key and stored
    /// on the created connection; defaults to `host` when null.
    proxied_host: ?[]const u8 = null,
    /// Logical destination port; defaults to `port` when null.
    proxied_port: ?u16 = null,
};
pub fn connectTcpOptions(client: *Client, options: ConnectTcpOptions) ConnectTcpError!*Connection {
const host = options.host;
const port = options.port;
const protocol = options.protocol;
const proxied_host = options.proxied_host orelse host;
const proxied_port = options.proxied_port orelse port;
if (client.connection_pool.findConnection(.{
.host = host,
.port = port,
.host = proxied_host,
.port = proxied_port,
.protocol = protocol,
})) |conn| return conn;
@ -1220,12 +1299,12 @@ pub fn connectTcp(
switch (protocol) {
.tls => {
if (disable_tls) return error.TlsInitializationFailed;
const tc = try Connection.Tls.create(client, host, port, stream);
const tc = try Connection.Tls.create(client, proxied_host, proxied_port, stream);
client.connection_pool.addUsed(&tc.connection);
return &tc.connection;
},
.plain => {
const pc = try Connection.Plain.create(client, host, port, stream);
const pc = try Connection.Plain.create(client, proxied_host, proxied_port, stream);
client.connection_pool.addUsed(&pc.connection);
return &pc.connection;
},
@ -1267,69 +1346,67 @@ pub fn connectUnix(client: *Client, path: []const u8) ConnectUnixError!*Connecti
return &conn.data;
}
/// Connect to `tunnel_host:tunnel_port` using the specified proxy with HTTP
/// Connect to `proxied_host:proxied_port` using the specified proxy with HTTP
/// CONNECT. This will reuse a connection if one is already open.
///
/// This function is threadsafe.
pub fn connectTunnel(
pub fn connectProxied(
client: *Client,
proxy: *Proxy,
tunnel_host: []const u8,
tunnel_port: u16,
proxied_host: []const u8,
proxied_port: u16,
) !*Connection {
if (!proxy.supports_connect) return error.TunnelNotSupported;
if (client.connection_pool.findConnection(.{
.host = tunnel_host,
.port = tunnel_port,
.host = proxied_host,
.port = proxied_port,
.protocol = proxy.protocol,
})) |node|
return node;
})) |node| return node;
var maybe_valid = false;
(tunnel: {
const conn = try client.connectTcp(proxy.host, proxy.port, proxy.protocol);
const connection = try client.connectTcpOptions(.{
.host = proxy.host,
.port = proxy.port,
.protocol = proxy.protocol,
.proxied_host = proxied_host,
.proxied_port = proxied_port,
});
errdefer {
conn.closing = true;
client.connection_pool.release(conn);
connection.closing = true;
client.connection_pool.release(connection);
}
var buffer: [8096]u8 = undefined;
var req = client.open(.CONNECT, .{
var req = client.request(.CONNECT, .{
.scheme = "http",
.host = .{ .raw = tunnel_host },
.port = tunnel_port,
.host = .{ .raw = proxied_host },
.port = proxied_port,
}, .{
.redirect_behavior = .unhandled,
.connection = conn,
.server_header_buffer = &buffer,
.connection = connection,
}) catch |err| {
std.log.debug("err {}", .{err});
break :tunnel err;
};
defer req.deinit();
req.send() catch |err| break :tunnel err;
req.wait() catch |err| break :tunnel err;
req.sendBodiless() catch |err| break :tunnel err;
const response = req.receiveHead(&.{}) catch |err| break :tunnel err;
if (req.response.status.class() == .server_error) {
if (response.head.status.class() == .server_error) {
maybe_valid = true;
break :tunnel error.ServerError;
}
if (req.response.status != .ok) break :tunnel error.ConnectionRefused;
if (response.head.status != .ok) break :tunnel error.ConnectionRefused;
// this connection is now a tunnel, so we can't use it for anything else, it will only be released when the client is de-initialized.
// this connection is now a tunnel, so we can't use it for anything
// else, it will only be released when the client is de-initialized.
req.connection = null;
client.allocator.free(conn.host);
conn.host = try client.allocator.dupe(u8, tunnel_host);
errdefer client.allocator.free(conn.host);
connection.closing = false;
conn.port = tunnel_port;
conn.closing = false;
return conn;
return connection;
}) catch {
// something went wrong with the tunnel
proxy.supports_connect = maybe_valid;
@ -1337,12 +1414,11 @@ pub fn connectTunnel(
};
}
// Prevents a dependency loop in open()
const ConnectErrorPartial = ConnectTcpError || error{ UnsupportedUriScheme, ConnectionRefused };
pub const ConnectError = ConnectErrorPartial || RequestError;
pub const ConnectError = ConnectTcpError || RequestError;
/// Connect to `host:port` using the specified protocol. This will reuse a
/// connection if one is already open.
///
/// If a proxy is configured for the client, then the proxy will be used to
/// connect to the host.
///
@ -1366,31 +1442,24 @@ pub fn connect(
}
if (proxy.supports_connect) tunnel: {
return connectTunnel(client, proxy, host, port) catch |err| switch (err) {
return connectProxied(client, proxy, host, port) catch |err| switch (err) {
error.TunnelNotSupported => break :tunnel,
else => |e| return e,
};
}
// fall back to using the proxy as a normal http proxy
const conn = try client.connectTcp(proxy.host, proxy.port, proxy.protocol);
errdefer {
conn.closing = true;
client.connection_pool.release(conn);
}
conn.proxied = true;
return conn;
const connection = try client.connectTcp(proxy.host, proxy.port, proxy.protocol);
connection.proxied = true;
return connection;
}
/// TODO collapse each error set into its own meta error code, and store
/// the underlying error code as a field on Request
pub const RequestError = ConnectTcpError || ConnectErrorPartial || std.io.Writer.Error || std.fmt.ParseIntError ||
error{
UnsupportedUriScheme,
UriMissingHost,
CertificateBundleLoadFailure,
};
pub const RequestError = ConnectTcpError || error{
UnsupportedUriScheme,
UriMissingHost,
UriHostTooLong,
CertificateBundleLoadFailure,
};
pub const RequestOptions = struct {
version: http.Version = .@"HTTP/1.1",
@ -1440,7 +1509,7 @@ fn uriPort(uri: Uri, protocol: Protocol) u16 {
/// This function is threadsafe.
///
/// Asserts that "\r\n" does not occur in any header name or value.
pub fn open(
pub fn request(
client: *Client,
method: http.Method,
uri: Uri,
@ -1486,6 +1555,11 @@ pub fn open(
.uri = uri,
.client = client,
.connection = connection,
.reader = .{
.in = connection.reader(),
.state = .ready,
.body_state = undefined,
},
.keep_alive = options.keep_alive,
.method = method,
.version = options.version,
@ -1499,13 +1573,13 @@ pub fn open(
}
pub const FetchOptions = struct {
server_header_buffer: ?[]u8 = null,
/// `null` means it will be heap-allocated.
redirect_buffer: ?[]u8 = null,
/// `null` means it will be heap-allocated.
decompress_buffer: ?[]u8 = null,
redirect_behavior: ?Request.RedirectBehavior = null,
/// If the server sends a body, it will be appended to this ArrayList.
/// `max_append_size` provides an upper limit for how much they can grow.
response_storage: ResponseStorage = .ignore,
max_append_size: ?usize = null,
/// If the server sends a body, it will be stored here.
response_storage: ?ResponseStorage = null,
location: Location,
method: ?http.Method = null,
@ -1529,11 +1603,11 @@ pub const FetchOptions = struct {
uri: Uri,
};
/// Where `fetch` stores a response body, if at all.
pub const ResponseStorage = struct {
    /// Body bytes are appended to this list.
    list: *std.ArrayListUnmanaged(u8),
    /// If null then only the existing capacity will be used.
    allocator: ?Allocator = null,
    /// Upper bound on how many body bytes will be appended to `list`.
    append_limit: std.io.Reader.Limit = .unlimited,
};
};
@ -1541,23 +1615,28 @@ pub const FetchResult = struct {
status: http.Status,
};
/// Errors that `fetch` can return, combining URI parsing, connection
/// establishment, and response-head reception failures.
pub const FetchError = Uri.ParseError || RequestError || Request.ReceiveHeadError || error{
    StreamTooLong,
    /// TODO provide optional diagnostics when this occurs or break into more error codes
    WriteFailed,
};
/// Perform a one-shot HTTP request with the provided options.
///
/// This function is threadsafe.
pub fn fetch(client: *Client, options: FetchOptions) !FetchResult {
pub fn fetch(client: *Client, options: FetchOptions) FetchError!FetchResult {
const uri = switch (options.location) {
.url => |u| try Uri.parse(u),
.uri => |u| u,
};
var server_header_buffer: [16 * 1024]u8 = undefined;
const method: http.Method = options.method orelse
if (options.payload != null) .POST else .GET;
var req = try open(client, method, uri, .{
.server_header_buffer = options.server_header_buffer orelse &server_header_buffer,
.redirect_behavior = options.redirect_behavior orelse
if (options.payload == null) @enumFromInt(3) else .unhandled,
const redirect_behavior: Request.RedirectBehavior = options.redirect_behavior orelse
if (options.payload == null) @enumFromInt(3) else .unhandled;
var req = try request(client, method, uri, .{
.redirect_behavior = redirect_behavior,
.headers = options.headers,
.extra_headers = options.extra_headers,
.privileged_headers = options.privileged_headers,
@ -1565,44 +1644,56 @@ pub fn fetch(client: *Client, options: FetchOptions) !FetchResult {
});
defer req.deinit();
if (options.payload) |payload| req.transfer_encoding = .{ .content_length = payload.len };
try req.send();
if (options.payload) |payload| {
var w = req.writer().unbuffered();
try w.writeAll(payload);
req.transfer_encoding = .{ .content_length = payload.len };
var body = try req.sendBody();
var bw = body.writer().unbuffered();
try bw.writeAll(payload);
try body.end();
} else {
try req.sendBodiless();
}
try req.finish();
try req.wait();
const redirect_buffer: []u8 = if (redirect_behavior == .unhandled) &.{} else options.redirect_buffer orelse
try client.allocator.alloc(u8, 8 * 1024);
defer if (options.redirect_buffer == null) client.allocator.free(redirect_buffer);
switch (options.response_storage) {
.ignore => {
// Take advantage of request internals to discard the response body
// and make the connection available for another request.
req.response.skip = true;
assert(try req.transferRead(&.{}) == 0); // No buffer is necessary when skipping.
},
.dynamic => |list| {
const max_append_size = options.max_append_size orelse 2 * 1024 * 1024;
try req.reader().readAllArrayList(list, max_append_size);
},
.static => |list| {
const buf = b: {
const buf = list.unusedCapacitySlice();
if (options.max_append_size) |len| {
if (len < buf.len) break :b buf[0..len];
}
break :b buf;
};
list.items.len += try req.reader().readAll(buf);
},
}
var response = try req.receiveHead(redirect_buffer);
return .{
.status = req.response.status,
const storage = options.response_storage orelse {
var reader = response.reader();
_ = reader.discardRemaining() catch |err| switch (err) {
error.ReadFailed => return response.bodyErr().?,
};
return .{ .status = response.head.status };
};
const decompress_buffer: []u8 = switch (response.head.content_encoding) {
.identity => &.{},
.zstd => options.decompress_buffer orelse
try client.allocator.alloc(u8, std.compress.zstd.Decompressor.Options.default_window_buffer_len * 2),
else => options.decompress_buffer orelse try client.allocator.alloc(u8, 8 * 1024),
};
defer if (options.decompress_buffer == null) client.allocator.free(decompress_buffer);
var decompressor: http.Decompressor = undefined;
var reader = response.readerDecompressing(&decompressor, decompress_buffer);
const list = storage.list;
if (storage.allocator) |allocator| {
reader.readRemainingArrayList(allocator, null, list, storage.append_limit) catch |err| switch (err) {
error.ReadFailed => return response.bodyErr().?,
else => |e| return e,
};
} else {
var br = reader.unbuffered();
const buf = storage.append_limit.slice(list.unusedCapacitySlice());
list.items.len += br.readSliceShort(buf) catch |err| switch (err) {
error.ReadFailed => return response.bodyErr().?,
};
}
return .{ .status = response.head.status };
}
pub fn sameParentDomain(parent_host: []const u8, child_host: []const u8) bool {

View File

@ -55,13 +55,6 @@ pub const Request = struct {
/// `receiveHead`.
head: Head,
pub const Compression = union(enum) {
deflate: std.compress.zlib.Decompressor,
gzip: std.compress.gzip.Decompressor,
zstd: std.compress.zstd.Decompressor,
none: void,
};
pub const Head = struct {
method: http.Method,
target: []const u8,
@ -72,7 +65,6 @@ pub const Request = struct {
transfer_encoding: http.TransferEncoding,
transfer_compression: http.ContentEncoding,
keep_alive: bool,
compression: Compression,
pub const ParseError = error{
UnknownHttpMethod,
@ -126,7 +118,6 @@ pub const Request = struct {
.@"HTTP/1.0" => false,
.@"HTTP/1.1" => true,
},
.compression = .none,
};
while (it.next()) |line| {
@ -156,7 +147,7 @@ pub const Request = struct {
const trimmed = mem.trim(u8, header_value, " ");
if (std.meta.stringToEnum(http.ContentEncoding, trimmed)) |ce| {
if (http.ContentEncoding.fromString(trimmed)) |ce| {
head.transfer_compression = ce;
} else {
return error.HttpTransferEncodingUnsupported;
@ -181,7 +172,7 @@ pub const Request = struct {
if (next) |second| {
const trimmed_second = mem.trim(u8, second, " ");
if (std.meta.stringToEnum(http.ContentEncoding, trimmed_second)) |transfer| {
if (http.ContentEncoding.fromString(trimmed_second)) |transfer| {
if (head.transfer_compression != .identity)
return error.HttpHeadersInvalid; // double compression is not supported
head.transfer_compression = transfer;
@ -236,10 +227,8 @@ pub const Request = struct {
"TRansfer-encoding:\tdeflate, chunked \r\n" ++
"connectioN:\t keep-alive \r\n\r\n";
var read_buffer: [500]u8 = undefined;
@memcpy(read_buffer[0..request_bytes.len], request_bytes);
var br: std.io.BufferedReader = undefined;
br.initFixed(&read_buffer);
br.initFixed(@constCast(request_bytes));
var server: Server = .{
.reader = .{
@ -252,7 +241,6 @@ pub const Request = struct {
var request: Request = .{
.server = &server,
.trailers_len = 0,
.head = undefined,
};
@ -529,7 +517,7 @@ pub const Request = struct {
return error.HttpExpectationFailed;
}
}
return request.server.reader.interface(request.head.transfer_encoding, request.head.content_length);
return request.server.reader.bodyReader(request.head.transfer_encoding, request.head.content_length);
}
/// Returns whether the connection should remain persistent.

View File

@ -236,7 +236,7 @@ pub fn writeMessagev(ws: *WebSocket, message: []const std.posix.iovec_const, opc
},
};
var bw = ws.body_writer.interface().unbuffered();
var bw = ws.body_writer.writer().unbuffered();
try bw.writeAll(header);
for (message) |iovec| try bw.writeAll(iovec.base[0..iovec.len]);
try bw.flush();

View File

@ -61,7 +61,7 @@ test "trailers" {
const uri = try std.Uri.parse(location);
{
var req = try client.open(.GET, uri, .{});
var req = try client.request(.GET, uri, .{});
defer req.deinit();
try req.sendBodiless();
@ -263,7 +263,7 @@ test "Server.Request.respondStreaming non-chunked, unknown content-length" {
var connection_bw = stream_writer.interface().buffered(&send_buffer);
var server = http.Server.init(&connection_br, &connection_bw);
try expectEqual(.ready, server.state);
try expectEqual(.ready, server.reader.state);
var request = try server.receiveHead();
try expectEqualStrings(request.head.target, "/foo");
var response = try request.respondStreaming(.{
@ -278,7 +278,7 @@ test "Server.Request.respondStreaming non-chunked, unknown content-length" {
}
try expectEqual(7390, bw.count);
try response.end();
try expectEqual(.closing, server.state);
try expectEqual(.closing, server.reader.state);
}
}
});
@ -331,7 +331,7 @@ test "receiving arbitrary http headers from the client" {
var connection_bw = stream_writer.interface().buffered(&send_buffer);
var server = http.Server.init(&connection_br, &connection_bw);
try expectEqual(.ready, server.state);
try expectEqual(.ready, server.reader.state);
var request = try server.receiveHead();
try expectEqualStrings("/bar", request.head.target);
var it = request.iterateHeaders();
@ -563,7 +563,7 @@ test "general client/server API coverage" {
log.info("{s}", .{location});
var redirect_buffer: [1024]u8 = undefined;
var req = try client.open(.GET, uri, .{});
var req = try client.request(.GET, uri, .{});
defer req.deinit();
try req.sendBodiless();
@ -586,7 +586,7 @@ test "general client/server API coverage" {
log.info("{s}", .{location});
var redirect_buffer: [1024]u8 = undefined;
var req = try client.open(.GET, uri, .{});
var req = try client.request(.GET, uri, .{});
defer req.deinit();
try req.sendBodiless();
@ -608,7 +608,7 @@ test "general client/server API coverage" {
log.info("{s}", .{location});
var redirect_buffer: [1024]u8 = undefined;
var req = try client.open(.HEAD, uri, .{});
var req = try client.request(.HEAD, uri, .{});
defer req.deinit();
try req.sendBodiless();
@ -632,7 +632,7 @@ test "general client/server API coverage" {
log.info("{s}", .{location});
var redirect_buffer: [1024]u8 = undefined;
var req = try client.open(.GET, uri, .{});
var req = try client.request(.GET, uri, .{});
defer req.deinit();
try req.sendBodiless();
@ -655,18 +655,18 @@ test "general client/server API coverage" {
log.info("{s}", .{location});
var redirect_buffer: [1024]u8 = undefined;
var req = try client.open(.HEAD, uri, .{});
var req = try client.request(.HEAD, uri, .{});
defer req.deinit();
try req.sendBodiless();
try req.receiveHead(&redirect_buffer);
var response = try req.receiveHead(&redirect_buffer);
const body = try req.reader().readRemainingAlloc(gpa, .limited(8192));
const body = try response.reader().readRemainingAlloc(gpa, .limited(8192));
defer gpa.free(body);
try expectEqualStrings("", body);
try expectEqualStrings("text/plain", req.response.content_type.?);
try expect(req.response.transfer_encoding == .chunked);
try expectEqualStrings("text/plain", response.head.content_type.?);
try expect(response.head.transfer_encoding == .chunked);
}
// connection has been kept alive
@ -679,19 +679,19 @@ test "general client/server API coverage" {
log.info("{s}", .{location});
var redirect_buffer: [1024]u8 = undefined;
var req = try client.open(.GET, uri, .{
var req = try client.request(.GET, uri, .{
.keep_alive = false,
});
defer req.deinit();
try req.sendBodiless();
try req.receiveHead(&redirect_buffer);
var response = try req.receiveHead(&redirect_buffer);
const body = try req.reader().readRemainingAlloc(gpa, .limited(8192));
const body = try response.reader().readRemainingAlloc(gpa, .limited(8192));
defer gpa.free(body);
try expectEqualStrings("Hello, World!\n", body);
try expectEqualStrings("text/plain", req.response.content_type.?);
try expectEqualStrings("text/plain", response.head.content_type.?);
}
// connection has been closed
@ -704,7 +704,7 @@ test "general client/server API coverage" {
log.info("{s}", .{location});
var redirect_buffer: [1024]u8 = undefined;
var req = try client.open(.GET, uri, .{
var req = try client.request(.GET, uri, .{
.extra_headers = &.{
.{ .name = "empty", .value = "" },
},
@ -712,16 +712,16 @@ test "general client/server API coverage" {
defer req.deinit();
try req.sendBodiless();
try req.receiveHead(&redirect_buffer);
var response = try req.receiveHead(&redirect_buffer);
try std.testing.expectEqual(.ok, req.response.status);
try std.testing.expectEqual(.ok, response.head.status);
const body = try req.reader().readRemainingAlloc(gpa, .limited(8192));
const body = try response.reader().readRemainingAlloc(gpa, .limited(8192));
defer gpa.free(body);
try expectEqualStrings("", body);
var it = req.response.iterateHeaders();
var it = response.head.iterateHeaders();
{
const header = it.next().?;
try expect(!it.is_trailer);
@ -747,13 +747,13 @@ test "general client/server API coverage" {
log.info("{s}", .{location});
var redirect_buffer: [1024]u8 = undefined;
var req = try client.open(.GET, uri, .{});
var req = try client.request(.GET, uri, .{});
defer req.deinit();
try req.sendBodiless();
try req.receiveHead(&redirect_buffer);
var response = try req.receiveHead(&redirect_buffer);
const body = try req.reader().readRemainingAlloc(gpa, .limited(8192));
const body = try response.reader().readRemainingAlloc(gpa, .limited(8192));
defer gpa.free(body);
try expectEqualStrings("Hello, World!\n", body);
@ -769,13 +769,13 @@ test "general client/server API coverage" {
log.info("{s}", .{location});
var redirect_buffer: [1024]u8 = undefined;
var req = try client.open(.GET, uri, .{});
var req = try client.request(.GET, uri, .{});
defer req.deinit();
try req.sendBodiless();
try req.receiveHead(&redirect_buffer);
var response = try req.receiveHead(&redirect_buffer);
const body = try req.reader().readRemainingAlloc(gpa, .limited(8192));
const body = try response.reader().readRemainingAlloc(gpa, .limited(8192));
defer gpa.free(body);
try expectEqualStrings("Hello, World!\n", body);
@ -791,13 +791,13 @@ test "general client/server API coverage" {
log.info("{s}", .{location});
var redirect_buffer: [1024]u8 = undefined;
var req = try client.open(.GET, uri, .{});
var req = try client.request(.GET, uri, .{});
defer req.deinit();
try req.sendBodiless();
try req.receiveHead(&redirect_buffer);
var response = try req.receiveHead(&redirect_buffer);
const body = try req.reader().readRemainingAlloc(gpa, .limited(8192));
const body = try response.reader().readRemainingAlloc(gpa, .limited(8192));
defer gpa.free(body);
try expectEqualStrings("Hello, World!\n", body);
@ -813,14 +813,16 @@ test "general client/server API coverage" {
log.info("{s}", .{location});
var redirect_buffer: [1024]u8 = undefined;
var req = try client.open(.GET, uri, .{});
var req = try client.request(.GET, uri, .{});
defer req.deinit();
try req.sendBodiless();
req.receiveHead(&redirect_buffer) catch |err| switch (err) {
if (req.receiveHead(&redirect_buffer)) |_| {
return error.TestFailed;
} else |err| switch (err) {
error.TooManyHttpRedirects => {},
else => return err,
};
}
}
{ // redirect to encoded url
@ -830,13 +832,13 @@ test "general client/server API coverage" {
log.info("{s}", .{location});
var redirect_buffer: [1024]u8 = undefined;
var req = try client.open(.GET, uri, .{});
var req = try client.request(.GET, uri, .{});
defer req.deinit();
try req.sendBodiless();
try req.receiveHead(&redirect_buffer);
var response = try req.receiveHead(&redirect_buffer);
const body = try req.reader().readRemainingAlloc(gpa, .limited(8192));
const body = try response.reader().readRemainingAlloc(gpa, .limited(8192));
defer gpa.free(body);
try expectEqualStrings("Encoded redirect successful!\n", body);
@ -852,7 +854,7 @@ test "general client/server API coverage" {
log.info("{s}", .{location});
var redirect_buffer: [1024]u8 = undefined;
var req = try client.open(.GET, uri, .{});
var req = try client.request(.GET, uri, .{});
defer req.deinit();
try req.sendBodiless();
@ -867,36 +869,6 @@ test "general client/server API coverage" {
// connection has been kept alive
try expect(client.http_proxy != null or client.connection_pool.free_len == 1);
{ // issue 16282 *** This test leaves the client in an invalid state, it must be last ***
const location = try std.fmt.allocPrint(gpa, "http://127.0.0.1:{d}/get", .{port});
defer gpa.free(location);
const uri = try std.Uri.parse(location);
const total_connections = client.connection_pool.free_size + 64;
var requests = try gpa.alloc(http.Client.Request, total_connections);
defer gpa.free(requests);
var header_bufs = std.ArrayList([]u8).init(gpa);
defer header_bufs.deinit();
defer for (header_bufs.items) |item| gpa.free(item);
for (0..total_connections) |i| {
const headers_buf = try gpa.alloc(u8, 1024);
try header_bufs.append(headers_buf);
var req = try client.open(.GET, uri, .{});
req.response.parser.done = true;
req.connection.?.closing = false;
requests[i] = req;
}
for (0..total_connections) |i| {
requests[i].deinit();
}
// free connections should be full now
try expect(client.connection_pool.free_len == client.connection_pool.free_size);
}
client.deinit();
{
@ -950,7 +922,7 @@ test "Server streams both reading and writing" {
defer client.deinit();
var redirect_buffer: [555]u8 = undefined;
var req = try client.open(.POST, .{
var req = try client.request(.POST, .{
.scheme = "http",
.host = .{ .raw = "127.0.0.1" },
.port = test_server.port(),
@ -983,7 +955,7 @@ fn echoTests(client: *http.Client, port: u16) !void {
const uri = try std.Uri.parse(location);
var redirect_buffer: [1024]u8 = undefined;
var req = try client.open(.POST, uri, .{
var req = try client.request(.POST, uri, .{
.extra_headers = &.{
.{ .name = "content-type", .value = "text/plain" },
},
@ -1017,7 +989,7 @@ fn echoTests(client: *http.Client, port: u16) !void {
));
var redirect_buffer: [1024]u8 = undefined;
var req = try client.open(.POST, uri, .{
var req = try client.request(.POST, uri, .{
.extra_headers = &.{
.{ .name = "content-type", .value = "text/plain" },
},
@ -1048,8 +1020,8 @@ fn echoTests(client: *http.Client, port: u16) !void {
const location = try std.fmt.allocPrint(gpa, "http://127.0.0.1:{d}/echo-content#fetch", .{port});
defer gpa.free(location);
var body = std.ArrayList(u8).init(gpa);
defer body.deinit();
var body: std.ArrayListUnmanaged(u8) = .empty;
defer body.deinit(gpa);
const res = try client.fetch(.{
.location = .{ .url = location },
@ -1058,7 +1030,7 @@ fn echoTests(client: *http.Client, port: u16) !void {
.extra_headers = &.{
.{ .name = "content-type", .value = "text/plain" },
},
.response_storage = .{ .dynamic = &body },
.response_storage = .{ .allocator = gpa, .list = &body },
});
try expectEqual(.ok, res.status);
try expectEqualStrings("Hello, World!\n", body.items);
@ -1070,7 +1042,7 @@ fn echoTests(client: *http.Client, port: u16) !void {
const uri = try std.Uri.parse(location);
var redirect_buffer: [1024]u8 = undefined;
var req = try client.open(.POST, uri, .{
var req = try client.request(.POST, uri, .{
.extra_headers = &.{
.{ .name = "expect", .value = "100-continue" },
.{ .name = "content-type", .value = "text/plain" },
@ -1101,7 +1073,7 @@ fn echoTests(client: *http.Client, port: u16) !void {
const uri = try std.Uri.parse(location);
var redirect_buffer: [1024]u8 = undefined;
var req = try client.open(.POST, uri, .{
var req = try client.request(.POST, uri, .{
.extra_headers = &.{
.{ .name = "content-type", .value = "text/plain" },
.{ .name = "expect", .value = "garbage" },
@ -1222,7 +1194,7 @@ test "redirect to different connection" {
{
var redirect_buffer: [666]u8 = undefined;
var req = try client.open(.GET, uri, .{});
var req = try client.request(.GET, uri, .{});
defer req.deinit();
try req.sendBodiless();

View File

@ -1898,6 +1898,10 @@ pub const Stream = struct {
pub const Error = ReadError;
pub fn getStream(r: *const Reader) Stream {
return r.stream;
}
pub fn interface(r: *Reader) std.io.Reader {
return .{
.context = r.stream.handle,
@ -1968,6 +1972,10 @@ pub const Stream = struct {
pub fn interface(r: *Reader) std.io.Reader {
return r.file_reader.interface();
}
pub fn getStream(r: *const Reader) Stream {
return .{ .handle = r.file_reader.file.handle };
}
},
};
@ -1987,6 +1995,10 @@ pub const Stream = struct {
};
}
pub fn getStream(w: *const Writer) Stream {
return w.stream;
}
fn writeSplat(context: ?*anyopaque, data: []const []const u8, splat: usize) std.io.Writer.Error!usize {
comptime assert(native_os == .windows);
if (data.len == 1 and splat == 0) return 0;
@ -2130,6 +2142,10 @@ pub const Stream = struct {
return error.WriteFailed;
};
}
pub fn getStream(w: *const Writer) Stream {
return .{ .handle = w.file_writer.file.handle };
}
},
};