std.http: fix chunked transfer flushing and end-of-stream handling

Andrew Kelley 2025-05-04 20:01:31 -07:00
parent 6fe9c8f036
commit 2abbcb6d2a
4 changed files with 119 additions and 53 deletions
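On the reading side, the user-visible change is that chunked trailers are now exposed as a byte slice (`Reader.trailers`) instead of a bare length, and `Response.iterateTrailers` walks them once the body has been drained. A rough sketch of what that looks like for a caller, assuming `response` is a completed `std.http.Client.Response` whose chunked body has already been read to the end (variable names are illustrative):

// Illustrative fragment: the reader must be back in the .ready state,
// i.e. the body has been fully consumed.
var it = response.iterateTrailers();
while (it.next()) |trailer| {
    // Name/value slices point into the reader's buffer and are invalidated
    // by the next receiveHead or any further read from the connection.
    std.debug.print("{s}: {s}\n", .{ trailer.name, trailer.value });
}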

View File

@@ -331,9 +331,12 @@ pub const Reader = struct {
/// making invalid API usage cause assertion failures rather than HTTP
/// protocol violations.
state: State,
/// Number of bytes of HTTP trailers. These are at the end of a
/// transfer-encoding: chunked message.
trailers_len: usize = 0,
/// HTTP trailer bytes. These are at the end of a transfer-encoding:
/// chunked message. This data is available only after calling one of the
/// "end" functions and points to data inside the buffer of `in`, and is
/// therefore invalidated on the next call to `receiveHead`, or any other
/// read from `in`.
trailers: []const u8 = &.{},
body_err: ?BodyError = null,
/// Stolen from `in`.
head_buffer: []u8 = &.{},
@@ -344,7 +347,6 @@ pub const Reader = struct {
head = 0,
n = 1,
rn = 2,
done = std.math.maxInt(u64),
_,
pub fn init(integer: u64) RemainingChunkLen {
@@ -371,6 +373,7 @@ pub const Reader = struct {
pub const BodyError = error{
HttpChunkInvalid,
HttpChunkTruncated,
HttpHeadersOversize,
};
@@ -393,6 +396,7 @@ pub const Reader = struct {
/// Buffers the entire head into `head_buffer`, invalidating the previous
/// `head_buffer`, if any.
pub fn receiveHead(reader: *Reader) HeadError!void {
reader.trailers = &.{};
const in = reader.in;
in.restitute(reader.head_buffer.len);
in.rebase();
@@ -544,7 +548,11 @@ pub const Reader = struct {
limit: std.io.Reader.Limit,
) std.io.Reader.RwError!usize {
const reader: *Reader = @alignCast(@ptrCast(ctx));
const chunk_len_ptr = &reader.state.body_remaining_chunk_len;
const chunk_len_ptr = switch (reader.state) {
.ready => return error.EndOfStream,
.body_remaining_chunk_len => |*x| x,
else => unreachable,
};
const in = reader.in;
len: switch (chunk_len_ptr.*) {
.head => {
@@ -557,7 +565,7 @@ pub const Reader = struct {
in.toss(i);
},
else => {
try in.fill(max_chunk_header_len);
try endless(reader, in.fill(max_chunk_header_len));
const next_i = cp.feed(in.bufferContents()[i..]);
if (cp.state != .data) return reader.failBody(error.HttpChunkInvalid);
const header_len = i + next_i;
@@ -566,7 +574,7 @@ pub const Reader = struct {
},
}
if (cp.chunk_len == 0) return parseTrailers(reader, 0);
const n = try in.read(bw, limit.min(.limited(cp.chunk_len)));
const n = try endless(reader, in.read(bw, limit.min(.limited(cp.chunk_len))));
chunk_len_ptr.* = .init(cp.chunk_len + 2 - n);
return n;
},
@@ -576,27 +584,31 @@ pub const Reader = struct {
continue :len .head;
},
.rn => {
const rn = try in.peekArray(2);
const rn = try endless(reader, in.peekArray(2));
if (rn[0] != '\r' or rn[1] != '\n') return reader.failBody(error.HttpChunkInvalid);
in.toss(2);
continue :len .head;
},
else => |remaining_chunk_len| {
const n = try in.read(bw, limit.min(.limited(@intFromEnum(remaining_chunk_len) - 2)));
const n = try endless(reader, in.read(bw, limit.min(.limited(@intFromEnum(remaining_chunk_len) - 2))));
chunk_len_ptr.* = .init(@intFromEnum(remaining_chunk_len) - n);
return n;
},
.done => return error.EndOfStream,
}
}
fn chunkedReadVec(ctx: ?*anyopaque, data: []const []u8) std.io.Reader.Error!usize {
const reader: *Reader = @alignCast(@ptrCast(ctx));
const chunk_len_ptr = &reader.state.body_remaining_chunk_len;
const chunk_len_ptr = switch (reader.state) {
.ready => return error.EndOfStream,
.body_remaining_chunk_len => |*x| x,
else => unreachable,
};
const in = reader.in;
var already_requested_more = false;
var amt_read: usize = 0;
data: for (data) |d| {
var d_i: usize = 0;
len: switch (chunk_len_ptr.*) {
.head => {
var cp: ChunkParser = .init;
@@ -609,7 +621,7 @@ pub const Reader = struct {
return amt_read;
}
already_requested_more = true;
try in.fill(max_chunk_header_len);
try endless(reader, in.fill(max_chunk_header_len));
const next_i = cp.feed(in.bufferContents()[i..]);
if (cp.state != .data) return reader.failBody(error.HttpChunkInvalid);
const header_len = i + next_i;
@@ -624,23 +636,24 @@ pub const Reader = struct {
},
.n => {
if (in.bufferContents().len < 1) already_requested_more = true;
if ((try in.takeByte()) != '\n') return reader.failBody(error.HttpChunkInvalid);
if ((try endless(reader, in.takeByte())) != '\n') return reader.failBody(error.HttpChunkInvalid);
continue :len .head;
},
.rn => {
if (in.bufferContents().len < 2) already_requested_more = true;
const rn = try in.takeArray(2);
const rn = try endless(reader, in.takeArray(2));
if (rn[0] != '\r' or rn[1] != '\n') return reader.failBody(error.HttpChunkInvalid);
continue :len .head;
},
else => |remaining_chunk_len| {
const available_buffer = in.bufferContents();
const copy_len = @min(available_buffer.len, d.len, remaining_chunk_len.int() - 2);
@memcpy(d[0..copy_len], available_buffer[0..copy_len]);
const copy_len = @min(available_buffer.len, d.len - d_i, remaining_chunk_len.int() - 2);
@memcpy(d[d_i..][0..copy_len], available_buffer[0..copy_len]);
d_i += copy_len;
amt_read += copy_len;
in.toss(copy_len);
const next_chunk_len: RemainingChunkLen = .init(remaining_chunk_len.int() - copy_len);
if (copy_len == d.len) {
if (d.len - d_i == 0) {
chunk_len_ptr.* = next_chunk_len;
continue :data;
}
@@ -649,10 +662,9 @@ pub const Reader = struct {
return amt_read;
}
already_requested_more = true;
try in.fill(3);
try endless(reader, in.fillMore());
continue :len next_chunk_len;
},
.done => return error.EndOfStream,
}
}
return amt_read;
@@ -660,7 +672,11 @@ pub const Reader = struct {
fn chunkedDiscard(ctx: ?*anyopaque, limit: std.io.Reader.Limit) std.io.Reader.Error!usize {
const reader: *Reader = @alignCast(@ptrCast(ctx));
const chunk_len_ptr = &reader.state.body_remaining_chunk_len;
const chunk_len_ptr = switch (reader.state) {
.ready => return error.EndOfStream,
.body_remaining_chunk_len => |*x| x,
else => unreachable,
};
const in = reader.in;
len: switch (chunk_len_ptr.*) {
.head => {
@@ -673,7 +689,7 @@ pub const Reader = struct {
in.toss(i);
},
else => {
try in.fill(max_chunk_header_len);
try endless(reader, in.fill(max_chunk_header_len));
const next_i = cp.feed(in.bufferContents()[i..]);
if (cp.state != .data) return reader.failBody(error.HttpChunkInvalid);
const header_len = i + next_i;
@@ -682,27 +698,26 @@ pub const Reader = struct {
},
}
if (cp.chunk_len == 0) return parseTrailers(reader, 0);
const n = try in.discard(limit.min(.limited(cp.chunk_len)));
const n = try endless(reader, in.discard(limit.min(.limited(cp.chunk_len))));
chunk_len_ptr.* = .init(cp.chunk_len + 2 - n);
return n;
},
.n => {
if ((try in.peekByte()) != '\n') return reader.failBody(error.HttpChunkInvalid);
if ((try endless(reader, in.peekByte())) != '\n') return reader.failBody(error.HttpChunkInvalid);
in.toss(1);
continue :len .head;
},
.rn => {
const rn = try in.peekArray(2);
const rn = try endless(reader, in.peekArray(2));
if (rn[0] != '\r' or rn[1] != '\n') return reader.failBody(error.HttpChunkInvalid);
in.toss(2);
continue :len .head;
},
else => |remaining_chunk_len| {
const n = try in.discard(limit.min(.limited(remaining_chunk_len.int() - 2)));
const n = try endless(reader, in.discard(limit.min(.limited(remaining_chunk_len.int() - 2))));
chunk_len_ptr.* = .init(remaining_chunk_len.int() - n);
return n;
},
.done => return error.EndOfStream,
}
}
@@ -717,9 +732,8 @@ pub const Reader = struct {
try in.fill(trailers_len + 1);
trailers_len += hp.feed(in.bufferContents()[trailers_len..]);
if (hp.state == .finished) {
reader.state.body_remaining_chunk_len = .done;
reader.state = .ready;
reader.trailers_len = trailers_len;
reader.trailers = in.bufferContents()[0..trailers_len];
return amt_read;
}
}
@@ -729,6 +743,13 @@ pub const Reader = struct {
r.body_err = err;
return error.ReadFailed;
}
fn endless(r: *Reader, x: anytype) @TypeOf(x) {
return x catch |err| switch (err) {
error.EndOfStream => return failBody(r, error.HttpChunkTruncated),
else => return err,
};
}
};
pub const Decompressor = struct {
@@ -823,18 +844,23 @@ pub const BodyWriter = struct {
};
/// Sends all buffered data across `BodyWriter.http_protocol_output`.
///
/// Some buffered data will remain if transfer-encoding is chunked and the
/// BodyWriter is mid-chunk.
pub fn flush(w: *BodyWriter) WriteError!void {
const out = w.http_protocol_output;
switch (w.state) {
.end, .none, .content_length => return w.http_protocol_output.flush(),
.end, .none, .content_length => return out.flush(),
.chunked => |*chunked| switch (chunked.*) {
.offset => |*offset| {
try w.http_protocol_output.flushLimit(.limited(offset.*));
offset.* = 0;
.offset => |offset| {
const chunk_len = out.end - offset - chunk_header_template.len;
if (chunk_len > 0) {
writeHex(out.buffer[offset..][0..chunk_len_digits], chunk_len);
chunked.* = .{ .chunk_len = 2 };
} else {
out.end = offset;
chunked.* = .{ .chunk_len = 0 };
}
try out.flush();
},
.chunk_len => return w.http_protocol_output.flush(),
.chunk_len => return out.flush(),
},
}
}
@@ -875,7 +901,7 @@ pub const BodyWriter = struct {
w.state = .end;
},
.none => {},
.chunked => return endChunked(w, .{}),
.chunked => return endChunkedUnflushed(w, .{}),
}
}
@@ -883,6 +909,21 @@ pub const BodyWriter = struct {
trailers: []const Header = &.{},
};
/// Writes the end-of-stream message and any optional trailers, flushing
/// the underlying stream.
///
/// Asserts that the BodyWriter is using transfer-encoding: chunked.
///
/// Respects the value of `elide` to omit all data after the headers.
///
/// See also:
/// * `endChunkedUnflushed`
/// * `end`
pub fn endChunked(w: *BodyWriter, options: EndChunkedOptions) WriteError!void {
try endChunkedUnflushed(w, options);
try w.http_protocol_output.flush();
}
/// Writes the end-of-stream message and any optional trailers.
///
/// Does not flush.
@@ -892,9 +933,10 @@ pub const BodyWriter = struct {
/// Respects the value of `elide` to omit all data after the headers.
///
/// See also:
/// * `end`
/// * `endChunked`
/// * `endUnflushed`
pub fn endChunked(w: *BodyWriter, options: EndChunkedOptions) WriteError!void {
/// * `end`
pub fn endChunkedUnflushed(w: *BodyWriter, options: EndChunkedOptions) WriteError!void {
const chunked = &w.state.chunked;
if (w.elide) {
w.state = .end;

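On the writing side, `flush` now finalizes whatever partial chunk is sitting in the buffer (patching the hex length header in place) before draining the stream, and ending a chunked body is split into a flushing `endChunked` and a non-flushing `endChunkedUnflushed`. A hedged sketch of the finishing sequence, assuming `body` is a `std.http.BodyWriter` in the chunked state whose payload bytes have already been written:

// Illustrative fragment: send the terminating zero-length chunk plus an
// optional trailer, then flush. Use endChunkedUnflushed to defer the flush.
try body.endChunked(.{
    .trailers = &.{
        .{ .name = "X-Checksum", .value = "aaaa" },
    },
});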
View File

@@ -703,6 +703,16 @@ pub const Response = struct {
pub fn bodyErr(response: *const Response) ?http.Reader.BodyError {
return response.request.reader.body_err;
}
pub fn iterateTrailers(response: *const Response) http.HeaderIterator {
const r = &response.request.reader;
assert(r.state == .ready);
return .{
.bytes = r.trailers,
.index = 0,
.is_trailer = true,
};
}
};
pub const Request = struct {
@@ -951,6 +961,7 @@ pub const Request = struct {
HttpRedirectLocationInvalid,
HttpContentEncodingUnsupported,
HttpChunkInvalid,
HttpChunkTruncated,
HttpHeadersOversize,
UnsupportedUriScheme,

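Because truncation is now reported as `HttpChunkTruncated` rather than a bare end of stream, a peer that drops the connection mid-chunk shows up as a body error. A rough sketch of checking for it on the client, assuming a body read on `response` just failed with `error.ReadFailed`:

// Illustrative fragment: bodyErr reports which HTTP-level problem caused
// the failed read, if any.
if (response.bodyErr()) |err| {
    switch (err) {
        error.HttpChunkTruncated => std.log.warn("chunked body truncated by peer", .{}),
        error.HttpChunkInvalid => std.log.warn("invalid chunk framing", .{}),
        error.HttpHeadersOversize => std.log.warn("trailers exceed the header buffer", .{}),
    }
}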
View File

@@ -72,20 +72,22 @@ test "trailers" {
try expectEqualStrings("Hello, World!\n", body);
var it = response.head.iterateHeaders();
{
var it = response.head.iterateHeaders();
const header = it.next().?;
try expect(!it.is_trailer);
try expectEqualStrings("transfer-encoding", header.name);
try expectEqualStrings("chunked", header.value);
try expectEqual(null, it.next());
}
{
var it = response.iterateTrailers();
const header = it.next().?;
try expect(it.is_trailer);
try expectEqualStrings("X-Checksum", header.name);
try expectEqualStrings("aaaa", header.value);
try expectEqual(null, it.next());
}
try expectEqual(null, it.next());
}
// connection has been kept alive

View File

@@ -357,37 +357,48 @@ pub fn discardRemaining(br: *BufferedReader) Reader.ShortError!usize {
///
/// See also:
/// * `peek`
/// * `readSliceShort`
pub fn readSlice(br: *BufferedReader, buffer: []u8) Reader.Error!void {
const n = try readSliceShort(br, buffer);
if (n != buffer.len) return error.EndOfStream;
}
/// Fill `buffer` with the next `buffer.len` bytes from the stream, advancing
/// the seek position.
///
/// Invalidates previously returned values from `peek`.
///
/// Returns the number of bytes read, which is less than `buffer.len` if and
/// only if the stream reached the end.
///
/// See also:
/// * `readSlice`
pub fn readSliceShort(br: *BufferedReader, buffer: []u8) Reader.ShortError!usize {
const in_buffer = br.buffer[br.seek..br.end];
const copy_len = @min(buffer.len, in_buffer.len);
@memcpy(buffer[0..copy_len], in_buffer[0..copy_len]);
if (copy_len == buffer.len) {
if (buffer.len - copy_len == 0) {
br.seek += copy_len;
return;
return buffer.len;
}
var i: usize = copy_len;
br.end = 0;
br.seek = 0;
while (true) {
const remaining = buffer[i..];
const n = try br.unbuffered_reader.readVec(&.{ remaining, br.buffer });
const n = br.unbuffered_reader.readVec(&.{ remaining, br.buffer }) catch |err| switch (err) {
error.EndOfStream => return i,
error.ReadFailed => return error.ReadFailed,
};
if (n < remaining.len) {
i += n;
continue;
}
br.end = n - remaining.len;
return;
return buffer.len;
}
}
/// Returns the number of bytes read, which is less than `buffer.len` if and
/// only if the stream reached the end.
pub fn readSliceShort(br: *BufferedReader, buffer: []u8) Reader.ShortError!usize {
_ = br;
_ = buffer;
@panic("TODO");
}
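The `BufferedReader` part of the change implements `readSliceShort` (previously a stub) and turns `readSlice` into a thin wrapper over it: `readSlice` fails with `error.EndOfStream` unless it delivers all `buffer.len` bytes, while `readSliceShort` returns how many bytes it actually copied, coming up short only when the stream ends. A rough usage sketch, assuming `br` is a `*BufferedReader` (buffer names are illustrative):

// Exact read: either fills all four bytes or fails with error.EndOfStream.
var magic: [4]u8 = undefined;
try br.readSlice(&magic);

// Best-effort read: n < buf.len only when the stream is exhausted.
var buf: [4096]u8 = undefined;
const n = try br.readSliceShort(&buf);
const bytes = buf[0..n];
_ = bytes; // use the prefix that was actually filled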
pub const ReadAllocError = Reader.Error || Allocator.Error;
/// The function is inline to avoid the dead code in case `endian` is