std.http: more update

This commit is contained in:
Andrew Kelley 2025-04-27 21:04:15 -07:00
parent ab3a947bef
commit d8cea03245
4 changed files with 99 additions and 90 deletions

View File

@ -331,9 +331,18 @@ pub const Reader = struct {
body_err: ?BodyError = null,
/// Stolen from `in`.
head_buffer: []u8 = &.{},
compression: Compression,
pub const max_chunk_header_len = 22;
pub const Compression = union(enum) {
deflate: std.compress.zlib.Decompressor,
gzip: std.compress.gzip.Decompressor,
// https://github.com/ziglang/zig/issues/18937
//zstd: std.compress.zstd.Decompressor,
none: void,
};
pub const RemainingChunkLen = enum(u64) {
head = 0,
n = 1,
@ -408,13 +417,18 @@ pub const Reader = struct {
}
/// Asserts only called once and after `receiveHead`.
pub fn interface(reader: *Reader, transfer_encoding: TransferEncoding, content_length: ?u64) std.io.Reader {
pub fn interface(
reader: *Reader,
transfer_encoding: TransferEncoding,
content_length: ?u64,
content_encoding: ContentEncoding,
) std.io.Reader {
assert(reader.state == .received_head);
reader.state = .receiving_body;
switch (transfer_encoding) {
.chunked => {
reader.transfer_br.unbuffered_reader = switch (transfer_encoding) {
.chunked => r: {
reader.body_state = .{ .remaining_chunk_len = .head };
return .{
break :r .{
.context = reader,
.vtable = &.{
.read = &chunkedRead,
@ -423,10 +437,10 @@ pub const Reader = struct {
},
};
},
.none => {
.none => r: {
if (content_length) |len| {
reader.body_state = .{ .remaining_content_length = len };
return .{
break :r .{
.context = reader,
.vtable = &.{
.read = &contentLengthRead,
@ -434,10 +448,39 @@ pub const Reader = struct {
.discard = &contentLengthDiscard,
},
};
} else {
return reader.in.reader();
} else switch (content_encoding) {
.identity => {
reader.compression = .none;
return reader.in.reader();
},
.deflate => {
reader.compression = .{ .deflate = .init(reader.in) };
return reader.compression.deflate.reader();
},
.gzip, .@"x-gzip" => {
reader.compression = .{ .gzip = .init(reader.in) };
return reader.compression.gzip.reader();
},
.compress, .@"x-compress" => unreachable,
.zstd => unreachable, // https://github.com/ziglang/zig/issues/18937
}
},
};
switch (content_encoding) {
.identity => {
reader.compression = .none;
return reader.transfer_br.unbuffered_reader;
},
.deflate => {
reader.compression = .{ .deflate = .init(&reader.transfer_br) };
return reader.compression.deflate.reader();
},
.gzip, .@"x-gzip" => {
reader.compression = .{ .gzip = .init(&reader.transfer_br) };
return reader.compression.gzip.reader();
},
.compress, .@"x-compress" => unreachable,
.zstd => unreachable, // https://github.com/ziglang/zig/issues/18937
}
}
@ -731,7 +774,7 @@ pub const BodyWriter = struct {
/// BodyWriter is mid-chunk.
pub fn flush(w: *BodyWriter) WriteError!void {
switch (w.state) {
.none, .content_length => return w.http_protocol_output.flush(),
.end, .none, .content_length => return w.http_protocol_output.flush(),
.chunked => |*chunked| switch (chunked.*) {
.offset => |*offset| {
try w.http_protocol_output.flushLimit(.limited(w.http_protocol_output.end - offset.*));
@ -1018,7 +1061,7 @@ pub const BodyWriter = struct {
}
}
pub fn interface(w: *BodyWriter) std.io.Writer {
pub fn writer(w: *BodyWriter) std.io.Writer {
return .{
.context = w,
.vtable = switch (w.state) {

View File

@ -116,7 +116,7 @@ pub const ConnectionPool = struct {
/// `allocator` must be the same one used to create `connection`.
///
/// Threadsafe.
pub fn release(pool: *ConnectionPool, allocator: Allocator, connection: *Connection) void {
pub fn release(pool: *ConnectionPool, connection: *Connection) void {
if (connection.closing) return connection.destroy();
pool.mutex.lock();
@ -130,8 +130,7 @@ pub const ConnectionPool = struct {
const popped: *Connection = @fieldParentPtr("pool_node", pool.free.popFirst().?);
pool.free_len -= 1;
popped.close(allocator);
allocator.destroy(popped);
popped.destroy();
}
if (connection.proxied) {
@ -434,20 +433,6 @@ pub const Connection = struct {
}
};
/// The decompressor for response messages.
pub const Compression = union(enum) {
pub const DeflateDecompressor = std.compress.zlib.Decompressor;
pub const GzipDecompressor = std.compress.gzip.Decompressor;
// https://github.com/ziglang/zig/issues/18937
//pub const ZstdDecompressor = std.compress.zstd.DecompressStream(.{});
deflate: DeflateDecompressor,
gzip: GzipDecompressor,
// https://github.com/ziglang/zig/issues/18937
//zstd: ZstdDecompressor,
none: void,
};
pub const Response = struct {
request: *Request,
/// Pointers in this struct are invalidated with the next call to
@ -469,9 +454,7 @@ pub const Response = struct {
content_length: ?u64 = null,
transfer_encoding: http.TransferEncoding = .none,
transfer_compression: http.ContentEncoding = .identity,
compression: Compression = .none,
content_encoding: http.ContentEncoding = .identity,
pub const ParseError = error{
HttpHeadersInvalid,
@ -554,8 +537,8 @@ pub const Response = struct {
const trimmed_second = mem.trim(u8, second, " ");
if (std.meta.stringToEnum(http.ContentEncoding, trimmed_second)) |transfer| {
if (res.transfer_compression != .identity) return error.HttpHeadersInvalid; // double compression is not supported
res.transfer_compression = transfer;
if (res.content_encoding != .identity) return error.HttpHeadersInvalid; // double compression is not supported
res.content_encoding = transfer;
} else {
return error.HttpTransferEncodingUnsupported;
}
@ -569,12 +552,12 @@ pub const Response = struct {
res.content_length = content_length;
} else if (std.ascii.eqlIgnoreCase(header_name, "content-encoding")) {
if (res.transfer_compression != .identity) return error.HttpHeadersInvalid;
if (res.content_encoding != .identity) return error.HttpHeadersInvalid;
const trimmed = mem.trim(u8, header_value, " ");
if (std.meta.stringToEnum(http.ContentEncoding, trimmed)) |ce| {
res.transfer_compression = ce;
res.content_encoding = ce;
} else {
return error.HttpTransferEncodingUnsupported;
}
@ -592,7 +575,7 @@ pub const Response = struct {
"TRansfer-encoding:\tdeflate, chunked \r\n" ++
"connectioN:\t keep-alive \r\n\r\n";
const head = Head.parse(response_bytes);
const head = try Head.parse(response_bytes);
try testing.expectEqual(.@"HTTP/1.1", head.version);
try testing.expectEqualStrings("OK", head.reason);
@ -605,7 +588,7 @@ pub const Response = struct {
try testing.expectEqual(true, head.keep_alive);
try testing.expectEqual(10, head.content_length.?);
try testing.expectEqual(.chunked, head.transfer_encoding);
try testing.expectEqual(.deflate, head.transfer_compression);
try testing.expectEqual(.deflate, head.content_encoding);
}
pub fn iterateHeaders(h: Head) http.HeaderIterator {
@ -621,19 +604,8 @@ pub const Response = struct {
"TRansfer-encoding:\tdeflate, chunked \r\n" ++
"connectioN:\t keep-alive \r\n\r\n";
var header_buffer: [1024]u8 = undefined;
var res = Response{
.status = undefined,
.reason = undefined,
.version = undefined,
.keep_alive = false,
.parser = .init(&header_buffer),
};
@memcpy(header_buffer[0..response_bytes.len], response_bytes);
res.parser.header_bytes_len = response_bytes.len;
var it = res.iterateHeaders();
const head = try Head.parse(response_bytes);
var it = head.iterateHeaders();
{
const header = it.next().?;
try testing.expectEqualStrings("LOcation", header.name);
@ -695,7 +667,7 @@ pub const Response = struct {
/// Asserts that this function is only called once.
pub fn reader(response: *Response) std.io.Reader {
const head = &response.head;
return response.request.reader.interface(head.transfer_encoding, head.content_length);
return response.request.reader.interface(head.transfer_encoding, head.content_length, head.content_encoding);
}
};
@ -778,16 +750,16 @@ pub const Request = struct {
}
};
/// Frees all resources associated with the request.
pub fn deinit(req: *Request) void {
if (req.connection) |connection| {
if (!req.response.parser.done) {
// If the response wasn't fully read, then we need to close the connection.
/// Returns the request's `Connection` back to the pool of the `Client`.
pub fn deinit(r: *Request) void {
if (r.connection) |connection| {
if (r.reader.state != .ready) {
// Connection cannot be reused.
connection.closing = true;
}
req.client.connection_pool.release(req.client.allocator, connection);
r.client.connection_pool.release(connection);
}
req.* = undefined;
r.* = undefined;
}
/// Sends and flushes a complete request as only HTTP head, no body.
@ -810,12 +782,12 @@ pub const Request = struct {
try sendHead(r);
return .{
.http_protocol_output = &r.connection.?.writer,
.transfer_encoding = if (r.transfer_encoding) |te| switch (te) {
.state = switch (r.transfer_encoding) {
.chunked => .{ .chunked = .init },
.content_length => |len| .{ .content_length = len },
.none => .none,
} else .{ .chunked = .init },
.elide_body = false,
},
.elide = false,
};
}
@ -912,7 +884,7 @@ pub const Request = struct {
try w.writeAll("\r\n");
}
pub const ReceiveHeadError = http.Reader.HeadError || error{
pub const ReceiveHeadError = std.io.Writer.Error || http.Reader.HeadError || error{
/// Server sent headers that did not conform to the HTTP protocol.
///
/// To find out more detailed diagnostics, `http.Reader.head_buffer` can be
@ -956,7 +928,7 @@ pub const Request = struct {
if (head.status == .@"continue") {
if (r.handle_continue) continue;
return; // we're not handling the 100-continue
return response; // we're not handling the 100-continue
}
// This while loop is for handling redirects, which means the request's
@ -987,25 +959,17 @@ pub const Request = struct {
if (r.redirect_behavior == .not_allowed) return error.TooManyHttpRedirects;
const location = head.location orelse return error.HttpRedirectLocationMissing;
try r.redirect(location, &aux_buf);
try r.send();
try r.sendBodiless();
continue;
}
switch (head.transfer_compression) {
.identity => response.compression = .none,
switch (head.content_encoding) {
.identity, .deflate, .gzip, .@"x-gzip" => {},
.compress, .@"x-compress" => return error.CompressionUnsupported,
.deflate => response.compression = .{
.deflate = std.compress.zlib.decompressor(r.transferReader()),
},
.gzip, .@"x-gzip" => response.compression = .{
.gzip = std.compress.gzip.decompressor(r.transferReader()),
},
// https://github.com/ziglang/zig/issues/18937
//.zstd => response.compression = .{
// .zstd = std.compress.zstd.decompressStream(r.client.allocator, r.transferReader()),
//},
.zstd => return error.CompressionUnsupported,
}
return response;
}
}
@ -1050,7 +1014,7 @@ pub const Request = struct {
std.ascii.eqlIgnoreCase(r.uri.scheme, new_uri.scheme) and
sameParentDomain(old_host, new_host);
r.client.connection_pool.release(r.client.allocator, old_connection);
r.client.connection_pool.release(old_connection);
r.connection = null;
if (!keep_privileged_headers) {
@ -1327,7 +1291,7 @@ pub fn connectTunnel(
const conn = try client.connectTcp(proxy.host, proxy.port, proxy.protocol);
errdefer {
conn.closing = true;
client.connection_pool.release(client.allocator, conn);
client.connection_pool.release(conn);
}
var buffer: [8096]u8 = undefined;

View File

@ -242,9 +242,12 @@ pub const Request = struct {
br.initFixed(&read_buffer);
var server: Server = .{
.in = &br,
.reader = .{
.in = &br,
.state = .ready,
.body_state = undefined,
},
.out = undefined,
.state = .ready,
};
var request: Request = .{

View File

@ -24,10 +24,10 @@ test "trailers" {
var connection_bw = stream_writer.interface().buffered(&send_buffer);
var server = http.Server.init(&connection_br, &connection_bw);
try expectEqual(.ready, server.state);
try expectEqual(.ready, server.reader.state);
var request = try server.receiveHead();
try serve(&request);
try expectEqual(.ready, server.state);
try expectEqual(.ready, server.reader.state);
}
}
@ -72,7 +72,7 @@ test "trailers" {
try expectEqualStrings("Hello, World!\n", body);
var it = response.iterateHeaders();
var it = response.head.iterateHeaders();
{
const header = it.next().?;
try expect(!it.is_trailer);
@ -174,7 +174,7 @@ test "echo content server" {
var connection_bw = stream_writer.interface().buffered(&send_buffer);
var http_server = http.Server.init(&connection_br, &connection_bw);
while (http_server.state == .ready) {
while (http_server.reader.state == .ready) {
var request = http_server.receiveHead() catch |err| switch (err) {
error.HttpConnectionClosing => continue :accept,
else => |e| return e,
@ -401,7 +401,7 @@ test "general client/server API coverage" {
var connection_bw = stream_writer.interface().buffered(&send_buffer);
var http_server = http.Server.init(&connection_br, &connection_bw);
while (http_server.state == .ready) {
while (http_server.reader.state == .ready) {
var request = http_server.receiveHead() catch |err| switch (err) {
error.HttpConnectionClosing => continue :outer,
else => |e| return e,
@ -618,7 +618,7 @@ test "general client/server API coverage" {
defer gpa.free(body);
try expectEqualStrings("", body);
try expectEqualStrings("text/plain", response.content_type.?);
try expectEqualStrings("text/plain", response.head.content_type.?);
try expectEqual(14, response.head.content_length.?);
}
@ -962,11 +962,10 @@ test "Server streams both reading and writing" {
var body_writer = try req.sendBody();
var response = try req.receiveHead(&redirect_buffer);
var w = body_writer.interface().unbuffered();
var w = body_writer.writer().unbuffered();
try w.writeAll("one ");
try w.writeAll("fish");
try req.finish();
try body_writer.end();
const body = try response.reader().readRemainingAlloc(std.testing.allocator, .limited(8192));
defer std.testing.allocator.free(body);
@ -994,7 +993,7 @@ fn echoTests(client: *http.Client, port: u16) !void {
req.transfer_encoding = .{ .content_length = 14 };
var body_writer = try req.sendBody();
var w = body_writer.interface().unbuffered();
var w = body_writer.writer().unbuffered();
try w.writeAll("Hello, ");
try w.writeAll("World!\n");
try body_writer.end();
@ -1028,7 +1027,7 @@ fn echoTests(client: *http.Client, port: u16) !void {
req.transfer_encoding = .chunked;
var body_writer = try req.sendBody();
var w = body_writer.interface().unbuffered();
var w = body_writer.writer().unbuffered();
try w.writeAll("Hello, ");
try w.writeAll("World!\n");
try body_writer.end();
@ -1082,7 +1081,7 @@ fn echoTests(client: *http.Client, port: u16) !void {
req.transfer_encoding = .chunked;
var body_writer = try req.sendBody();
var w = body_writer.interface().unbuffered();
var w = body_writer.writer().unbuffered();
try w.writeAll("Hello, ");
try w.writeAll("World!\n");
try body_writer.end();