diff --git a/lib/std/http/Client.zig b/lib/std/http/Client.zig
index 6660761930..5b817af467 100644
--- a/lib/std/http/Client.zig
+++ b/lib/std/http/Client.zig
@@ -682,7 +682,7 @@ pub const Response = struct {
     ///
     /// See also:
    /// * `readerDecompressing`
-    pub fn reader(response: *Response, buffer: []u8) *Reader {
+    pub fn reader(response: *const Response, buffer: []u8) *Reader {
         const req = response.request;
         if (!req.method.responseHasBody()) return .ending;
         const head = &response.head;
@@ -805,6 +805,11 @@ pub const Request = struct {
         unhandled = std.math.maxInt(u16),
         _,
 
+        pub fn init(n: u16) RedirectBehavior {
+            assert(n != std.math.maxInt(u16));
+            return @enumFromInt(n);
+        }
+
         pub fn subtractOne(rb: *RedirectBehavior) void {
             switch (rb.*) {
                 .not_allowed => unreachable,
@@ -855,6 +860,14 @@ pub const Request = struct {
         return result;
     }
 
+    /// Transfers the HTTP head and body over the connection and flushes.
+    pub fn sendBodyComplete(r: *Request, body: []u8) Writer.Error!void {
+        r.transfer_encoding = .{ .content_length = body.len };
+        var bw = try sendBodyUnflushed(r, body);
+        bw.writer.end = body.len;
+        try bw.end();
+        try r.connection.?.flush();
+    }
+
     /// Transfers the HTTP head over the connection, which is not flushed until
     /// `BodyWriter.flush` or `BodyWriter.end` is called.
     ///
@@ -1296,7 +1309,7 @@ pub const basic_authorization = struct {
     pub fn value(uri: Uri, out: []u8) []u8 {
         var bw: Writer = .fixed(out);
         write(uri, &bw) catch unreachable;
-        return bw.getWritten();
+        return bw.buffered();
     }
 
     pub fn write(uri: Uri, out: *Writer) Writer.Error!void {
diff --git a/src/Package/Fetch.zig b/src/Package/Fetch.zig
index 836435a86b..b6d22a37ac 100644
--- a/src/Package/Fetch.zig
+++ b/src/Package/Fetch.zig
@@ -385,20 +385,21 @@ pub fn run(f: *Fetch) RunError!void {
         var resource: Resource = .{ .dir = dir };
         return f.runResource(path_or_url, &resource, null);
     } else |dir_err| {
+        var server_header_buffer: [init_resource_buffer_size]u8 = undefined;
+
         const file_err = if (dir_err == error.NotDir) e: {
             if (fs.cwd().openFile(path_or_url, .{})) |file| {
-                var resource: Resource = .{ .file = file };
+                var resource: Resource = .{ .file = file.reader(&server_header_buffer) };
                 return f.runResource(path_or_url, &resource, null);
             } else |err| break :e err;
         } else dir_err;
 
         const uri = std.Uri.parse(path_or_url) catch |uri_err| {
             return f.fail(0, try eb.printString(
-                "'{s}' could not be recognized as a file path ({s}) or an URL ({s})",
-                .{ path_or_url, @errorName(file_err), @errorName(uri_err) },
+                "'{s}' could not be recognized as a file path ({t}) or a URL ({t})",
+                .{ path_or_url, file_err, uri_err },
             ));
         };
-        var server_header_buffer: [header_buffer_size]u8 = undefined;
         var resource = try f.initResource(uri, &server_header_buffer);
         return f.runResource(try uri.path.toRawMaybeAlloc(arena), &resource, null);
     }
 }
@@ -464,8 +465,8 @@ pub fn run(f: *Fetch) RunError!void {
             f.location_tok,
             try eb.printString("invalid URI: {s}", .{@errorName(err)}),
         );
-        var server_header_buffer: [header_buffer_size]u8 = undefined;
-        var resource = try f.initResource(uri, &server_header_buffer);
+        var buffer: [init_resource_buffer_size]u8 = undefined;
+        var resource = try f.initResource(uri, &buffer);
         return f.runResource(try uri.path.toRawMaybeAlloc(arena), &resource, remote.hash);
     }
 
@@ -866,8 +867,8 @@ fn fail(f: *Fetch, msg_tok: std.zig.Ast.TokenIndex, msg_str: u32) RunError {
 }
 
 const Resource = union(enum) {
-    file: fs.File,
-    http_request: std.http.Client.Request,
+    file: fs.File.Reader,
+    http_request: HttpRequest,
     git: Git,
     dir: fs.Dir,
 
@@ -877,10 +878,16 @@ const Resource = union(enum) {
         want_oid: git.Oid,
     };
 
+    const HttpRequest = struct {
+        request: std.http.Client.Request,
+        head: std.http.Client.Response.Head,
+        buffer: []u8,
+    };
+
     fn deinit(resource: *Resource) void {
         switch (resource.*) {
-            .file => |*file| file.close(),
-            .http_request => |*req| req.deinit(),
+            .file => |*file_reader| file_reader.file.close(),
+            .http_request => |*http_request| http_request.request.deinit(),
             .git => |*git_resource| {
                 git_resource.fetch_stream.deinit();
                 git_resource.session.deinit();
@@ -890,21 +897,19 @@ const Resource = union(enum) {
         resource.* = undefined;
     }
 
-    fn reader(resource: *Resource) std.io.AnyReader {
-        return .{
-            .context = resource,
-            .readFn = read,
-        };
-    }
-
-    fn read(context: *const anyopaque, buffer: []u8) anyerror!usize {
-        const resource: *Resource = @constCast(@ptrCast(@alignCast(context)));
-        switch (resource.*) {
-            .file => |*f| return f.read(buffer),
-            .http_request => |*r| return r.read(buffer),
-            .git => |*g| return g.fetch_stream.read(buffer),
+    fn reader(resource: *Resource) *std.Io.Reader {
+        return switch (resource.*) {
+            .file => |*file_reader| return &file_reader.interface,
+            .http_request => |*http_request| {
+                const response: std.http.Client.Response = .{
+                    .request = &http_request.request,
+                    .head = http_request.head,
+                };
+                return response.reader(http_request.buffer);
+            },
+            .git => |*g| return &g.fetch_stream.reader,
             .dir => unreachable,
-        }
+        };
     }
 };
 
@@ -967,20 +972,21 @@ const FileType = enum {
     }
 };
 
-const header_buffer_size = 16 * 1024;
+const init_resource_buffer_size = git.Packet.max_data_length;
 
-fn initResource(f: *Fetch, uri: std.Uri, server_header_buffer: []u8) RunError!Resource {
+fn initResource(f: *Fetch, uri: std.Uri, reader_buffer: []u8) RunError!Resource {
     const gpa = f.arena.child_allocator;
     const arena = f.arena.allocator();
     const eb = &f.error_bundle;
 
     if (ascii.eqlIgnoreCase(uri.scheme, "file")) {
         const path = try uri.path.toRawMaybeAlloc(arena);
-        return .{ .file = f.parent_package_root.openFile(path, .{}) catch |err| {
-            return f.fail(f.location_tok, try eb.printString("unable to open '{f}{s}': {s}", .{
-                f.parent_package_root, path, @errorName(err),
+        const file = f.parent_package_root.openFile(path, .{}) catch |err| {
+            return f.fail(f.location_tok, try eb.printString("unable to open '{f}{s}': {t}", .{
+                f.parent_package_root, path, err,
             }));
-        } };
+        };
+        return .{ .file = file.reader(reader_buffer) };
     }
 
     const http_client = f.job_queue.http_client;
 
@@ -988,37 +994,27 @@ fn initResource(f: *Fetch, uri: std.Uri, server_header_buffer: []u8) RunError!Re
     if (ascii.eqlIgnoreCase(uri.scheme, "http") or
         ascii.eqlIgnoreCase(uri.scheme, "https"))
     {
-        var req = http_client.open(.GET, uri, .{
-            .server_header_buffer = server_header_buffer,
-        }) catch |err| {
-            return f.fail(f.location_tok, try eb.printString(
-                "unable to connect to server: {s}",
-                .{@errorName(err)},
-            ));
-        };
-        errdefer req.deinit(); // releases more than memory
+        var request = http_client.request(.GET, uri, .{}) catch |err|
+            return f.fail(f.location_tok, try eb.printString("unable to connect to server: {t}", .{err}));
+        errdefer request.deinit();
 
-        req.send() catch |err| {
-            return f.fail(f.location_tok, try eb.printString(
-                "HTTP request failed: {s}",
-                .{@errorName(err)},
-            ));
-        };
-        req.wait() catch |err| {
-            return f.fail(f.location_tok, try eb.printString(
-                "invalid HTTP response: {s}",
-                .{@errorName(err)},
-            ));
-        };
+        request.sendBodiless() catch |err|
eb.printString("HTTP request failed: {t}", .{err})); - if (req.response.status != .ok) { - return f.fail(f.location_tok, try eb.printString( - "bad HTTP response code: '{d} {s}'", - .{ @intFromEnum(req.response.status), req.response.status.phrase() orelse "" }, - )); - } + var redirect_buffer: [1024]u8 = undefined; + const response = request.receiveHead(&redirect_buffer) catch |err| + return f.fail(f.location_tok, try eb.printString("invalid HTTP response: {t}", .{err})); - return .{ .http_request = req }; + if (response.head.status != .ok) return f.fail(f.location_tok, try eb.printString( + "bad HTTP response code: '{d} {s}'", + .{ response.head.status, response.head.status.phrase() orelse "" }, + )); + + return .{ .http_request = .{ + .request = request, + .head = response.head, + .buffer = reader_buffer, + } }; } if (ascii.eqlIgnoreCase(uri.scheme, "git+http") or @@ -1026,7 +1022,7 @@ fn initResource(f: *Fetch, uri: std.Uri, server_header_buffer: []u8) RunError!Re { var transport_uri = uri; transport_uri.scheme = uri.scheme["git+".len..]; - var session = git.Session.init(gpa, http_client, transport_uri, server_header_buffer) catch |err| { + var session = git.Session.init(gpa, http_client, transport_uri, reader_buffer) catch |err| { return f.fail(f.location_tok, try eb.printString( "unable to discover remote git server capabilities: {s}", .{@errorName(err)}, @@ -1042,16 +1038,12 @@ fn initResource(f: *Fetch, uri: std.Uri, server_header_buffer: []u8) RunError!Re const want_ref_head = try std.fmt.allocPrint(arena, "refs/heads/{s}", .{want_ref}); const want_ref_tag = try std.fmt.allocPrint(arena, "refs/tags/{s}", .{want_ref}); - var ref_iterator = session.listRefs(.{ + var ref_iterator: git.Session.RefIterator = undefined; + session.listRefs(&ref_iterator, .{ .ref_prefixes = &.{ want_ref, want_ref_head, want_ref_tag }, .include_peeled = true, - .server_header_buffer = server_header_buffer, - }) catch |err| { - return f.fail(f.location_tok, try eb.printString( - "unable to list refs: {s}", - .{@errorName(err)}, - )); - }; + .buffer = reader_buffer, + }) catch |err| return f.fail(f.location_tok, try eb.printString("unable to list refs: {t}", .{err})); defer ref_iterator.deinit(); while (ref_iterator.next() catch |err| { return f.fail(f.location_tok, try eb.printString( @@ -1089,14 +1081,14 @@ fn initResource(f: *Fetch, uri: std.Uri, server_header_buffer: []u8) RunError!Re var want_oid_buf: [git.Oid.max_formatted_length]u8 = undefined; _ = std.fmt.bufPrint(&want_oid_buf, "{f}", .{want_oid}) catch unreachable; - var fetch_stream = session.fetch(&.{&want_oid_buf}, server_header_buffer) catch |err| { - return f.fail(f.location_tok, try eb.printString( - "unable to create fetch stream: {s}", - .{@errorName(err)}, - )); + var fetch_stream: git.Session.FetchStream = undefined; + session.fetch(&fetch_stream, &.{&want_oid_buf}, reader_buffer) catch |err| { + return f.fail(f.location_tok, try eb.printString("unable to create fetch stream: {t}", .{err})); }; errdefer fetch_stream.deinit(); + if (true) @panic("TODO this moves fetch_stream, invalidating its reader"); + return .{ .git = .{ .session = session, .fetch_stream = fetch_stream, @@ -1104,10 +1096,7 @@ fn initResource(f: *Fetch, uri: std.Uri, server_header_buffer: []u8) RunError!Re } }; } - return f.fail(f.location_tok, try eb.printString( - "unsupported URL scheme: {s}", - .{uri.scheme}, - )); + return f.fail(f.location_tok, try eb.printString("unsupported URL scheme: {s}", .{uri.scheme})); } fn unpackResource( @@ -1121,9 +1110,11 @@ fn 
         .file => FileType.fromPath(uri_path) orelse
             return f.fail(f.location_tok, try eb.printString("unknown file type: '{s}'", .{uri_path})),
 
-        .http_request => |req| ft: {
+        .http_request => |*http_request| ft: {
+            const head = &http_request.head;
+
             // Content-Type takes first precedence.
-            const content_type = req.response.content_type orelse
+            const content_type = head.content_type orelse
                 return f.fail(f.location_tok, try eb.addString("missing 'Content-Type' header"));
 
             // Extract the MIME type, ignoring charset and boundary directives
@@ -1165,7 +1156,7 @@ fn unpackResource(
             }
 
             // Next, the filename from 'content-disposition: attachment' takes precedence.
-            if (req.response.content_disposition) |cd_header| {
+            if (head.content_disposition) |cd_header| {
                 break :ft FileType.fromContentDisposition(cd_header) orelse {
                     return f.fail(f.location_tok, try eb.printString(
                         "unsupported Content-Disposition header value: '{s}' for Content-Type=application/octet-stream",
@@ -1176,10 +1167,7 @@ fn unpackResource(
 
             // Finally, the path from the URI is used.
             break :ft FileType.fromPath(uri_path) orelse {
-                return f.fail(f.location_tok, try eb.printString(
-                    "unknown file type: '{s}'",
-                    .{uri_path},
-                ));
+                return f.fail(f.location_tok, try eb.printString("unknown file type: '{s}'", .{uri_path}));
             };
         },
 
@@ -1187,10 +1175,9 @@ fn unpackResource(
 
         .dir => |dir| {
             f.recursiveDirectoryCopy(dir, tmp_directory.handle) catch |err| {
-                return f.fail(f.location_tok, try eb.printString(
-                    "unable to copy directory '{s}': {s}",
-                    .{ uri_path, @errorName(err) },
-                ));
+                return f.fail(f.location_tok, try eb.printString("unable to copy directory '{s}': {t}", .{
+                    uri_path, err,
+                }));
             };
             return .{};
         },
@@ -1198,15 +1185,11 @@ fn unpackResource(
 
     switch (file_type) {
         .tar => {
-            var adapter_buffer: [1024]u8 = undefined;
-            var adapter = resource.reader().adaptToNewApi(&adapter_buffer);
-            return unpackTarball(f, tmp_directory.handle, &adapter.new_interface);
+            return unpackTarball(f, tmp_directory.handle, resource.reader());
         },
         .@"tar.gz" => {
-            var adapter_buffer: [std.crypto.tls.max_ciphertext_record_len]u8 = undefined;
-            var adapter = resource.reader().adaptToNewApi(&adapter_buffer);
             var flate_buffer: [std.compress.flate.max_window_len]u8 = undefined;
-            var decompress: std.compress.flate.Decompress = .init(&adapter.new_interface, .gzip, &flate_buffer);
+            var decompress: std.compress.flate.Decompress = .init(resource.reader(), .gzip, &flate_buffer);
             return try unpackTarball(f, tmp_directory.handle, &decompress.reader);
         },
         .@"tar.xz" => {
@@ -1227,9 +1210,7 @@ fn unpackResource(
         .@"tar.zst" => {
             const window_size = std.compress.zstd.default_window_len;
             const window_buffer = try f.arena.allocator().create([window_size]u8);
-            var adapter_buffer: [std.crypto.tls.max_ciphertext_record_len]u8 = undefined;
-            var adapter = resource.reader().adaptToNewApi(&adapter_buffer);
-            var decompress: std.compress.zstd.Decompress = .init(&adapter.new_interface, window_buffer, .{
+            var decompress: std.compress.zstd.Decompress = .init(resource.reader(), window_buffer, .{
                 .verify_checksum = false,
             });
             return try unpackTarball(f, tmp_directory.handle, &decompress.reader);
@@ -1237,12 +1218,15 @@ fn unpackResource(
         .git_pack => return unpackGitPack(f, tmp_directory.handle, &resource.git) catch |err| switch (err) {
             error.FetchFailed => return error.FetchFailed,
             error.OutOfMemory => return error.OutOfMemory,
-            else => |e| return f.fail(f.location_tok, try eb.printString(
-                "unable to unpack git files: {s}",
-                .{@errorName(e)},
-            )),
+            else => |e| return f.fail(f.location_tok, try eb.printString("unable to unpack git files: {t}", .{e})),
+        },
+        .zip => return unzip(f, tmp_directory.handle, resource.reader()) catch |err| switch (err) {
+            error.ReadFailed => return f.fail(f.location_tok, try eb.printString(
+                "failed reading resource: {t}",
+                .{err},
+            )),
+            else => |e| return e,
         },
-        .zip => return try unzip(f, tmp_directory.handle, resource.reader()),
     }
 }
 
@@ -1277,99 +1261,69 @@ fn unpackTarball(f: *Fetch, out_dir: fs.Dir, reader: *std.Io.Reader) RunError!Un
     return res;
 }
 
-fn unzip(f: *Fetch, out_dir: fs.Dir, reader: anytype) RunError!UnpackResult {
+fn unzip(f: *Fetch, out_dir: fs.Dir, reader: *std.Io.Reader) error{ ReadFailed, OutOfMemory, FetchFailed }!UnpackResult {
     // We write the entire contents to a file first because zip files
     // must be processed back to front and they could be too large to
     // load into memory.
     const cache_root = f.job_queue.global_cache;
-
-    // TODO: the downside of this solution is if we get a failure/crash/oom/power out
-    // during this process, we leave behind a zip file that would be
-    // difficult to know if/when it can be cleaned up.
-    // Might be worth it to use a mechanism that enables other processes
-    // to see if the owning process of a file is still alive (on linux this
-    // can be done with file locks).
-    // Coupled with this mechansism, we could also use slots (i.e. zig-cache/tmp/0,
-    // zig-cache/tmp/1, etc) which would mean that subsequent runs would
-    // automatically clean up old dead files.
-    // This could all be done with a simple TmpFile abstraction.
     const prefix = "tmp/";
     const suffix = ".zip";
-
-    const random_bytes_count = 20;
-    const random_path_len = comptime std.fs.base64_encoder.calcSize(random_bytes_count);
-    var zip_path: [prefix.len + random_path_len + suffix.len]u8 = undefined;
-    @memcpy(zip_path[0..prefix.len], prefix);
-    @memcpy(zip_path[prefix.len + random_path_len ..], suffix);
-    {
-        var random_bytes: [random_bytes_count]u8 = undefined;
-        std.crypto.random.bytes(&random_bytes);
-        _ = std.fs.base64_encoder.encode(
-            zip_path[prefix.len..][0..random_path_len],
-            &random_bytes,
-        );
-    }
-
-    defer cache_root.handle.deleteFile(&zip_path) catch {};
-
     const eb = &f.error_bundle;
+    const random_len = @sizeOf(u64) * 2;
 
-    {
-        var zip_file = cache_root.handle.createFile(
-            &zip_path,
-            .{},
-        ) catch |err| return f.fail(f.location_tok, try eb.printString(
-            "failed to create tmp zip file: {s}",
-            .{@errorName(err)},
-        ));
-        defer zip_file.close();
-        var buf: [4096]u8 = undefined;
-        while (true) {
-            const len = reader.readAll(&buf) catch |err| return f.fail(f.location_tok, try eb.printString(
-                "read zip stream failed: {s}",
-                .{@errorName(err)},
-            ));
-            if (len == 0) break;
-            zip_file.deprecatedWriter().writeAll(buf[0..len]) catch |err| return f.fail(f.location_tok, try eb.printString(
-                "write temporary zip file failed: {s}",
-                .{@errorName(err)},
-            ));
-        }
-    }
+    var zip_path: [prefix.len + random_len + suffix.len]u8 = undefined;
+    zip_path[0..prefix.len].* = prefix.*;
+    zip_path[prefix.len + random_len ..].* = suffix.*;
+
+    var zip_file = while (true) {
+        const random_integer = std.crypto.random.int(u64);
+        zip_path[prefix.len..][0..random_len].* = std.fmt.hex(random_integer);
+
+        break cache_root.handle.createFile(&zip_path, .{
+            .exclusive = true,
+            .read = true,
+        }) catch |err| switch (err) {
+            error.PathAlreadyExists => continue,
+            else => |e| return f.fail(
+                f.location_tok,
+                try eb.printString("failed to create temporary zip file: {t}", .{e}),
+            ),
+        };
+    };
+    defer zip_file.close();
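+
+    // Stream the resource to disk through the writer interface, then reuse
+    // the same file handle and buffer (via moveToReader) to read the archive
+    // back, since zip files must be processed back to front.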
+    var zip_file_buffer: [4096]u8 = undefined;
+    var zip_file_reader = b: {
+        var zip_file_writer = zip_file.writer(&zip_file_buffer);
+
+        _ = reader.streamRemaining(&zip_file_writer.interface) catch |err| switch (err) {
+            error.ReadFailed => return error.ReadFailed,
+            error.WriteFailed => return f.fail(
+                f.location_tok,
+                try eb.printString("failed writing temporary zip file: {t}", .{err}),
+            ),
+        };
+        zip_file_writer.interface.flush() catch |err| return f.fail(
+            f.location_tok,
+            try eb.printString("failed writing temporary zip file: {t}", .{err}),
+        );
+        break :b zip_file_writer.moveToReader();
+    };
 
     var diagnostics: std.zip.Diagnostics = .{ .allocator = f.arena.allocator() };
     // no need to deinit since we are using an arena allocator
-    {
-        var zip_file = cache_root.handle.openFile(
-            &zip_path,
-            .{},
-        ) catch |err| return f.fail(f.location_tok, try eb.printString(
-            "failed to open temporary zip file: {s}",
-            .{@errorName(err)},
-        ));
-        defer zip_file.close();
+    zip_file_reader.seekTo(0) catch |err|
+        return f.fail(f.location_tok, try eb.printString("failed to seek temporary zip file: {t}", .{err}));
+    std.zip.extract(out_dir, &zip_file_reader, .{
+        .allow_backslashes = true,
+        .diagnostics = &diagnostics,
+    }) catch |err| return f.fail(f.location_tok, try eb.printString("zip extract failed: {t}", .{err}));
 
-        var zip_file_buffer: [1024]u8 = undefined;
-        var zip_file_reader = zip_file.reader(&zip_file_buffer);
+    cache_root.handle.deleteFile(&zip_path) catch |err|
+        return f.fail(f.location_tok, try eb.printString("delete temporary zip failed: {t}", .{err}));
 
-        std.zip.extract(out_dir, &zip_file_reader, .{
-            .allow_backslashes = true,
-            .diagnostics = &diagnostics,
-        }) catch |err| return f.fail(f.location_tok, try eb.printString(
-            "zip extract failed: {s}",
-            .{@errorName(err)},
-        ));
-    }
-
-    cache_root.handle.deleteFile(&zip_path) catch |err| return f.fail(f.location_tok, try eb.printString(
-        "delete temporary zip failed: {s}",
-        .{@errorName(err)},
-    ));
-
-    const res: UnpackResult = .{ .root_dir = diagnostics.root_dir };
-    return res;
+    return .{ .root_dir = diagnostics.root_dir };
 }
 
 fn unpackGitPack(f: *Fetch, out_dir: fs.Dir, resource: *Resource.Git) anyerror!UnpackResult {
@@ -1387,10 +1341,13 @@ fn unpackGitPack(f: *Fetch, out_dir: fs.Dir, resource: *Resource.Git) anyerror!U
     var pack_file = try pack_dir.createFile("pkg.pack", .{ .read = true });
     defer pack_file.close();
     var pack_file_buffer: [4096]u8 = undefined;
-    var fifo = std.fifo.LinearFifo(u8, .{ .Slice = {} }).init(&pack_file_buffer);
-    try fifo.pump(resource.fetch_stream.reader(), pack_file.deprecatedWriter());
-
-    var pack_file_reader = pack_file.reader(&pack_file_buffer);
+    var pack_file_reader = b: {
+        var pack_file_writer = pack_file.writer(&pack_file_buffer);
+        const fetch_reader = &resource.fetch_stream.reader;
+        _ = try fetch_reader.streamRemaining(&pack_file_writer.interface);
+        try pack_file_writer.interface.flush();
+        break :b pack_file_writer.moveToReader();
+    };
 
     var index_file = try pack_dir.createFile("pkg.idx", .{ .read = true });
     defer index_file.close();
diff --git a/src/Package/Fetch/git.zig b/src/Package/Fetch/git.zig
index 6ff951014b..88652343f5 100644
--- a/src/Package/Fetch/git.zig
+++ b/src/Package/Fetch/git.zig
@@ -585,17 +585,17 @@ const ObjectCache = struct {
 /// [protocol-common](https://git-scm.com/docs/protocol-common). The special
 /// meanings of the delimiter and response-end packets are documented in
 /// [protocol-v2](https://git-scm.com/docs/protocol-v2).
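 ///
 /// For example, the pkt-line encoding of the data "version 2\n" is
 /// "000eversion 2\n": the four hex digits give the total packet length,
 /// counting the 4-byte length prefix itself (4 + 10 = 14 = 0x000e). A
 /// flush packet is encoded as "0000" and carries no data.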
-const Packet = union(enum) {
+pub const Packet = union(enum) {
     flush,
     delimiter,
     response_end,
     data: []const u8,
 
-    const max_data_length = 65516;
+    pub const max_data_length = 65516;
 
     /// Reads a packet in pkt-line format.
-    fn read(reader: anytype, buf: *[max_data_length]u8) !Packet {
-        const length = std.fmt.parseUnsigned(u16, &try reader.readBytesNoEof(4), 16) catch return error.InvalidPacket;
+    fn read(reader: *std.Io.Reader) !Packet {
+        const length = std.fmt.parseUnsigned(u16, try reader.take(4), 16) catch return error.InvalidPacket;
         switch (length) {
             0 => return .flush,
             1 => return .delimiter,
@@ -603,13 +603,11 @@ const Packet = union(enum) {
             3 => return error.InvalidPacket,
             else => if (length - 4 > max_data_length) return error.InvalidPacket,
         }
-        const data = buf[0 .. length - 4];
-        try reader.readNoEof(data);
-        return .{ .data = data };
+        return .{ .data = try reader.take(length - 4) };
     }
 
     /// Writes a packet in pkt-line format.
-    fn write(packet: Packet, writer: anytype) !void {
+    fn write(packet: Packet, writer: *std.Io.Writer) !void {
         switch (packet) {
             .flush => try writer.writeAll("0000"),
             .delimiter => try writer.writeAll("0001"),
@@ -657,8 +655,10 @@ pub const Session = struct {
         allocator: Allocator,
         transport: *std.http.Client,
         uri: std.Uri,
-        http_headers_buffer: []u8,
+        /// Asserted to be at least `Packet.max_data_length`.
+        response_buffer: []u8,
     ) !Session {
+        assert(response_buffer.len >= Packet.max_data_length);
         var session: Session = .{
             .transport = transport,
             .location = try .init(allocator, uri),
@@ -668,7 +668,8 @@ pub const Session = struct {
             .allocator = allocator,
         };
         errdefer session.deinit();
-        var capability_iterator = try session.getCapabilities(http_headers_buffer);
+        var capability_iterator: CapabilityIterator = undefined;
+        try session.getCapabilities(&capability_iterator, response_buffer);
         defer capability_iterator.deinit();
         while (try capability_iterator.next()) |capability| {
             if (mem.eql(u8, capability.key, "agent")) {
@@ -743,7 +744,8 @@ pub const Session = struct {
     ///
     /// The `session.location` is updated if the server returns a redirect, so
     /// that subsequent session functions do not need to handle redirects.
-    fn getCapabilities(session: *Session, http_headers_buffer: []u8) !CapabilityIterator {
+    fn getCapabilities(session: *Session, it: *CapabilityIterator, response_buffer: []u8) !void {
+        assert(response_buffer.len >= Packet.max_data_length);
         var info_refs_uri = session.location.uri;
         {
             const session_uri_path = try std.fmt.allocPrint(session.allocator, "{f}", .{
@@ -757,19 +759,22 @@ pub const Session = struct {
         info_refs_uri.fragment = null;
 
         const max_redirects = 3;
-        var request = try session.transport.open(.GET, info_refs_uri, .{
-            .redirect_behavior = @enumFromInt(max_redirects),
-            .server_header_buffer = http_headers_buffer,
-            .extra_headers = &.{
-                .{ .name = "Git-Protocol", .value = "version=2" },
-            },
-        });
-        errdefer request.deinit();
-        try request.send();
-        try request.finish();
+        it.* = .{
+            .request = try session.transport.request(.GET, info_refs_uri, .{
+                .redirect_behavior = .init(max_redirects),
+                .extra_headers = &.{
+                    .{ .name = "Git-Protocol", .value = "version=2" },
+                },
+            }),
+            .reader = undefined,
+        };
+        errdefer it.deinit();
+        const request = &it.request;
+        try request.sendBodiless();
 
-        try request.wait();
-        if (request.response.status != .ok) return error.ProtocolError;
+        var redirect_buffer: [1024]u8 = undefined;
+        const response = try request.receiveHead(&redirect_buffer);
+        if (response.head.status != .ok) return error.ProtocolError;
 
         const any_redirects_occurred = request.redirect_behavior.remaining() < max_redirects;
         if (any_redirects_occurred) {
             const request_uri_path = try std.fmt.allocPrint(session.allocator, "{f}", .{
@@ -784,8 +789,7 @@ pub const Session = struct {
             session.location = new_location;
         }
 
-        const reader = request.reader();
-        var buf: [Packet.max_data_length]u8 = undefined;
+        it.reader = response.reader(response_buffer);
         var state: enum { response_start, response_content } = .response_start;
         while (true) {
             // Some Git servers (at least GitHub) include an additional
@@ -795,15 +799,15 @@ pub const Session = struct {
             // Thus, we need to skip any such useless additional responses
             // before we get the one we're actually looking for. The responses
             // will be delimited by flush packets.
-            const packet = Packet.read(reader, &buf) catch |e| switch (e) {
+            const packet = Packet.read(it.reader) catch |err| switch (err) {
                 error.EndOfStream => return error.UnsupportedProtocol, // 'version 2' packet not found
-                else => |other| return other,
+                else => |e| return e,
             };
             switch (packet) {
                 .flush => state = .response_start,
                 .data => |data| switch (state) {
                     .response_start => if (mem.eql(u8, Packet.normalizeText(data), "version 2")) {
-                        return .{ .request = request };
+                        return;
                     } else {
                         state = .response_content;
                     },
@@ -816,7 +820,7 @@ pub const Session = struct {
 
     const CapabilityIterator = struct {
         request: std.http.Client.Request,
-        buf: [Packet.max_data_length]u8 = undefined,
+        reader: *std.Io.Reader,
 
         const Capability = struct {
             key: []const u8,
@@ -830,13 +834,13 @@ pub const Session = struct {
             }
         };
 
-        fn deinit(iterator: *CapabilityIterator) void {
-            iterator.request.deinit();
-            iterator.* = undefined;
+        fn deinit(it: *CapabilityIterator) void {
+            it.request.deinit();
+            it.* = undefined;
         }
 
-        fn next(iterator: *CapabilityIterator) !?Capability {
-            switch (try Packet.read(iterator.request.reader(), &iterator.buf)) {
+        fn next(it: *CapabilityIterator) !?Capability {
+            switch (try Packet.read(it.reader)) {
                 .flush => return null,
                 .data => |data| return Capability.parse(Packet.normalizeText(data)),
                 else => return error.UnexpectedPacket,
@@ -854,11 +858,13 @@ pub const Session = struct {
         include_symrefs: bool = false,
         /// Whether to include the peeled object ID for returned tag refs.
         include_peeled: bool = false,
-        server_header_buffer: []u8,
+        /// Asserted to be at least `Packet.max_data_length`.
+        buffer: []u8,
     };
 
     /// Returns an iterator over refs known to the server.
-    pub fn listRefs(session: Session, options: ListRefsOptions) !RefIterator {
+    pub fn listRefs(session: Session, it: *RefIterator, options: ListRefsOptions) !void {
+        assert(options.buffer.len >= Packet.max_data_length);
         var upload_pack_uri = session.location.uri;
         {
             const session_uri_path = try std.fmt.allocPrint(session.allocator, "{f}", .{
@@ -871,59 +877,56 @@ pub const Session = struct {
         upload_pack_uri.query = null;
         upload_pack_uri.fragment = null;
 
-        var body: std.ArrayListUnmanaged(u8) = .empty;
-        defer body.deinit(session.allocator);
-        const body_writer = body.writer(session.allocator);
-        try Packet.write(.{ .data = "command=ls-refs\n" }, body_writer);
+        var body: std.Io.Writer = .fixed(options.buffer);
+        try Packet.write(.{ .data = "command=ls-refs\n" }, &body);
         if (session.supports_agent) {
-            try Packet.write(.{ .data = agent_capability }, body_writer);
+            try Packet.write(.{ .data = agent_capability }, &body);
         }
         {
-            const object_format_packet = try std.fmt.allocPrint(session.allocator, "object-format={s}\n", .{@tagName(session.object_format)});
+            const object_format_packet = try std.fmt.allocPrint(session.allocator, "object-format={t}\n", .{
+                session.object_format,
+            });
             defer session.allocator.free(object_format_packet);
-            try Packet.write(.{ .data = object_format_packet }, body_writer);
+            try Packet.write(.{ .data = object_format_packet }, &body);
         }
-        try Packet.write(.delimiter, body_writer);
+        try Packet.write(.delimiter, &body);
         for (options.ref_prefixes) |ref_prefix| {
             const ref_prefix_packet = try std.fmt.allocPrint(session.allocator, "ref-prefix {s}\n", .{ref_prefix});
             defer session.allocator.free(ref_prefix_packet);
-            try Packet.write(.{ .data = ref_prefix_packet }, body_writer);
+            try Packet.write(.{ .data = ref_prefix_packet }, &body);
         }
         if (options.include_symrefs) {
"symrefs\n" }, body_writer); + try Packet.write(.{ .data = "symrefs\n" }, &body); } if (options.include_peeled) { - try Packet.write(.{ .data = "peel\n" }, body_writer); + try Packet.write(.{ .data = "peel\n" }, &body); } - try Packet.write(.flush, body_writer); + try Packet.write(.flush, &body); - var request = try session.transport.open(.POST, upload_pack_uri, .{ - .redirect_behavior = .unhandled, - .server_header_buffer = options.server_header_buffer, - .extra_headers = &.{ - .{ .name = "Content-Type", .value = "application/x-git-upload-pack-request" }, - .{ .name = "Git-Protocol", .value = "version=2" }, - }, - }); - errdefer request.deinit(); - request.transfer_encoding = .{ .content_length = body.items.len }; - try request.send(); - try request.writeAll(body.items); - try request.finish(); - - try request.wait(); - if (request.response.status != .ok) return error.ProtocolError; - - return .{ + it.* = .{ + .request = try session.transport.request(.POST, upload_pack_uri, .{ + .redirect_behavior = .unhandled, + .extra_headers = &.{ + .{ .name = "Content-Type", .value = "application/x-git-upload-pack-request" }, + .{ .name = "Git-Protocol", .value = "version=2" }, + }, + }), + .reader = undefined, .format = session.object_format, - .request = request, }; + const request = &it.request; + errdefer request.deinit(); + try request.sendBodyComplete(body.buffered()); + + const response = try request.receiveHead(options.buffer); + if (response.head.status != .ok) return error.ProtocolError; + it.reader = response.reader(options.buffer); } pub const RefIterator = struct { format: Oid.Format, request: std.http.Client.Request, - buf: [Packet.max_data_length]u8 = undefined, + reader: *std.Io.Reader, pub const Ref = struct { oid: Oid, @@ -937,13 +940,13 @@ pub const Session = struct { iterator.* = undefined; } - pub fn next(iterator: *RefIterator) !?Ref { - switch (try Packet.read(iterator.request.reader(), &iterator.buf)) { + pub fn next(it: *RefIterator) !?Ref { + switch (try Packet.read(it.reader)) { .flush => return null, .data => |data| { const ref_data = Packet.normalizeText(data); const oid_sep_pos = mem.indexOfScalar(u8, ref_data, ' ') orelse return error.InvalidRefPacket; - const oid = Oid.parse(iterator.format, data[0..oid_sep_pos]) catch return error.InvalidRefPacket; + const oid = Oid.parse(it.format, data[0..oid_sep_pos]) catch return error.InvalidRefPacket; const name_sep_pos = mem.indexOfScalarPos(u8, ref_data, oid_sep_pos + 1, ' ') orelse ref_data.len; const name = ref_data[oid_sep_pos + 1 .. name_sep_pos]; @@ -957,7 +960,7 @@ pub const Session = struct { if (mem.startsWith(u8, attribute, "symref-target:")) { symref_target = attribute["symref-target:".len..]; } else if (mem.startsWith(u8, attribute, "peeled:")) { - peeled = Oid.parse(iterator.format, attribute["peeled:".len..]) catch return error.InvalidRefPacket; + peeled = Oid.parse(it.format, attribute["peeled:".len..]) catch return error.InvalidRefPacket; } last_sep_pos = next_sep_pos; } @@ -973,9 +976,12 @@ pub const Session = struct { /// performed if the server supports it. pub fn fetch( session: Session, + fs: *FetchStream, wants: []const []const u8, - http_headers_buffer: []u8, - ) !FetchStream { + /// Asserted to be at least `Packet.max_data_length`. 
+        response_buffer: []u8,
+    ) !void {
+        assert(response_buffer.len >= Packet.max_data_length);
         var upload_pack_uri = session.location.uri;
         {
             const session_uri_path = try std.fmt.allocPrint(session.allocator, "{f}", .{
@@ -988,63 +994,71 @@ pub const Session = struct {
         upload_pack_uri.query = null;
         upload_pack_uri.fragment = null;
 
-        var body: std.ArrayListUnmanaged(u8) = .empty;
-        defer body.deinit(session.allocator);
-        const body_writer = body.writer(session.allocator);
-        try Packet.write(.{ .data = "command=fetch\n" }, body_writer);
+        var body: std.Io.Writer = .fixed(response_buffer);
+        try Packet.write(.{ .data = "command=fetch\n" }, &body);
         if (session.supports_agent) {
-            try Packet.write(.{ .data = agent_capability }, body_writer);
+            try Packet.write(.{ .data = agent_capability }, &body);
         }
         {
             const object_format_packet = try std.fmt.allocPrint(session.allocator, "object-format={s}\n", .{@tagName(session.object_format)});
             defer session.allocator.free(object_format_packet);
-            try Packet.write(.{ .data = object_format_packet }, body_writer);
+            try Packet.write(.{ .data = object_format_packet }, &body);
         }
-        try Packet.write(.delimiter, body_writer);
+        try Packet.write(.delimiter, &body);
         // Our packfile parser supports the OFS_DELTA object type
-        try Packet.write(.{ .data = "ofs-delta\n" }, body_writer);
+        try Packet.write(.{ .data = "ofs-delta\n" }, &body);
         // We do not currently convey server progress information to the user
-        try Packet.write(.{ .data = "no-progress\n" }, body_writer);
+        try Packet.write(.{ .data = "no-progress\n" }, &body);
         if (session.supports_shallow) {
-            try Packet.write(.{ .data = "deepen 1\n" }, body_writer);
+            try Packet.write(.{ .data = "deepen 1\n" }, &body);
         }
         for (wants) |want| {
             var buf: [Packet.max_data_length]u8 = undefined;
             const arg = std.fmt.bufPrint(&buf, "want {s}\n", .{want}) catch unreachable;
-            try Packet.write(.{ .data = arg }, body_writer);
+            try Packet.write(.{ .data = arg }, &body);
         }
-        try Packet.write(.{ .data = "done\n" }, body_writer);
-        try Packet.write(.flush, body_writer);
+        try Packet.write(.{ .data = "done\n" }, &body);
+        try Packet.write(.flush, &body);
 
-        var request = try session.transport.open(.POST, upload_pack_uri, .{
-            .redirect_behavior = .not_allowed,
-            .server_header_buffer = http_headers_buffer,
-            .extra_headers = &.{
-                .{ .name = "Content-Type", .value = "application/x-git-upload-pack-request" },
-                .{ .name = "Git-Protocol", .value = "version=2" },
-            },
-        });
+        fs.* = .{
+            .request = try session.transport.request(.POST, upload_pack_uri, .{
+                .redirect_behavior = .not_allowed,
+                .extra_headers = &.{
+                    .{ .name = "Content-Type", .value = "application/x-git-upload-pack-request" },
+                    .{ .name = "Git-Protocol", .value = "version=2" },
+                },
+            }),
+            .input = undefined,
+            .reader = undefined,
+            .remaining_len = undefined,
+        };
+        const request = &fs.request;
         errdefer request.deinit();
-        request.transfer_encoding = .{ .content_length = body.items.len };
-        try request.send();
-        try request.writeAll(body.items);
-        try request.finish();
-        try request.wait();
-        if (request.response.status != .ok) return error.ProtocolError;
+        try request.sendBodyComplete(body.buffered());
 
-        const reader = request.reader();
+        const response = try request.receiveHead(&.{});
+        if (response.head.status != .ok) return error.ProtocolError;
+
+        const reader = response.reader(response_buffer);
 
         // We are not interested in any of the sections of the returned fetch
         // data other than the packfile section, since we aren't doing anything
         // complex like ref negotiation (this is a fresh clone).
         var state: enum { section_start, section_content } = .section_start;
         while (true) {
-            var buf: [Packet.max_data_length]u8 = undefined;
-            const packet = try Packet.read(reader, &buf);
+            const packet = try Packet.read(reader);
             switch (state) {
                 .section_start => switch (packet) {
                     .data => |data| if (mem.eql(u8, Packet.normalizeText(data), "packfile")) {
-                        return .{ .request = request };
+                        fs.input = reader;
+                        fs.reader = .{
+                            .buffer = &.{},
+                            .vtable = &.{ .stream = FetchStream.stream },
+                            .seek = 0,
+                            .end = 0,
+                        };
+                        fs.remaining_len = 0;
+                        return;
                     } else {
                         state = .section_content;
                     },
@@ -1061,20 +1075,23 @@ pub const Session = struct {
 
     pub const FetchStream = struct {
         request: std.http.Client.Request,
-        buf: [Packet.max_data_length]u8 = undefined,
-        pos: usize = 0,
-        len: usize = 0,
+        input: *std.Io.Reader,
+        reader: std.Io.Reader,
+        err: ?Error = null,
+        remaining_len: usize,
 
-        pub fn deinit(stream: *FetchStream) void {
-            stream.request.deinit();
+        pub fn deinit(fs: *FetchStream) void {
+            fs.request.deinit();
         }
 
-        pub const ReadError = std.http.Client.Request.ReadError || error{
+        pub const Error = error{
             InvalidPacket,
             ProtocolError,
             UnexpectedPacket,
+            WriteFailed,
+            ReadFailed,
+            EndOfStream,
         };
-        pub const Reader = std.io.GenericReader(*FetchStream, ReadError, read);
 
         const StreamCode = enum(u8) {
             pack_data = 1,
@@ -1083,33 +1100,41 @@ pub const Session = struct {
             _,
         };
 
-        pub fn reader(stream: *FetchStream) Reader {
-            return .{ .context = stream };
-        }
-
-        pub fn read(stream: *FetchStream, buf: []u8) !usize {
-            if (stream.pos == stream.len) {
+        pub fn stream(r: *std.Io.Reader, w: *std.Io.Writer, limit: std.Io.Limit) std.Io.Reader.StreamError!usize {
+            const fs: *FetchStream = @alignCast(@fieldParentPtr("reader", r));
+            const input = fs.input;
+            if (fs.remaining_len == 0) {
                 while (true) {
-                    switch (try Packet.read(stream.request.reader(), &stream.buf)) {
-                        .flush => return 0,
+                    switch (Packet.read(input) catch |err| {
+                        fs.err = err;
+                        return error.ReadFailed;
+                    }) {
+                        .flush => return error.EndOfStream,
                         .data => |data| if (data.len > 1) switch (@as(StreamCode, @enumFromInt(data[0]))) {
                             .pack_data => {
-                                stream.pos = 1;
-                                stream.len = data.len;
+                                // `Packet.read` already consumed this packet from
+                                // `input`; rewind so the payload after the stream
+                                // code byte is buffered again for the copy below.
+                                fs.remaining_len = data.len - 1;
+                                input.seek -= fs.remaining_len;
                                 break;
                             },
-                            .fatal_error => return error.ProtocolError,
+                            .fatal_error => {
+                                fs.err = error.ProtocolError;
+                                return error.ReadFailed;
+                            },
                             else => {},
                         },
-                        else => return error.UnexpectedPacket,
+                        else => {
+                            fs.err = error.UnexpectedPacket;
+                            return error.ReadFailed;
+                        },
                     }
                 }
            }
-
-            const size = @min(buf.len, stream.len - stream.pos);
-            @memcpy(buf[0..size], stream.buf[stream.pos .. stream.pos + size]);
-            stream.pos += size;
-            return size;
+            const buf = limit.slice(try w.writableSliceGreedy(1));
+            const n = @min(buf.len, fs.remaining_len);
+            @memcpy(buf[0..n], input.buffered()[0..n]);
+            w.advance(n);
+            input.toss(n);
+            fs.remaining_len -= n;
+            return n;
         }
     };
 };
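
Note for reviewers: `FetchStream` above implements the new `std.Io.Reader` vtable contract. A minimal sketch of the same pattern follows (a hypothetical `CountingReader`, using only the `std.Io` APIs this diff already relies on; it is not part of the patch). A `stream` implementation must produce at most `limit` bytes into `w`, advance the writer by exactly the number of bytes it returns, and signal exhaustion with `error.EndOfStream` rather than a zero-length read:

    const std = @import("std");

    /// Hypothetical reader that yields `remaining` copies of the byte 'x'.
    const CountingReader = struct {
        remaining: usize,
        reader: std.Io.Reader,

        fn init(n: usize) CountingReader {
            return .{
                .remaining = n,
                .reader = .{
                    .buffer = &.{}, // unbuffered: every read goes through `stream`
                    .vtable = &.{ .stream = stream },
                    .seek = 0,
                    .end = 0,
                },
            };
        }

        fn stream(r: *std.Io.Reader, w: *std.Io.Writer, limit: std.Io.Limit) std.Io.Reader.StreamError!usize {
            // Recover the implementation from the embedded interface field,
            // exactly as FetchStream.stream does above.
            const cr: *CountingReader = @alignCast(@fieldParentPtr("reader", r));
            if (cr.remaining == 0) return error.EndOfStream;
            // Ask the writer for space, bounded by `limit`, then commit the
            // produced bytes with `advance`.
            const dest = limit.slice(try w.writableSliceGreedy(1));
            const n = @min(dest.len, cr.remaining);
            @memset(dest[0..n], 'x');
            w.advance(n);
            cr.remaining -= n;
            return n;
        }
    };

Because `@fieldParentPtr` ties the interface to the address of the embedding struct, such a value must not be moved once `&x.reader` has been handed out — the same pinning constraint that the `if (true) @panic("TODO ...")` in `initResource` guards against for `FetchStream`.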