diff --git a/lib/std/Io/net/HostName.zig b/lib/std/Io/net/HostName.zig index 291b2745f4..75e69a6a76 100644 --- a/lib/std/Io/net/HostName.zig +++ b/lib/std/Io/net/HostName.zig @@ -206,34 +206,16 @@ pub fn connect( port: u16, options: IpAddress.ConnectOptions, ) ConnectError!Stream { - var canonical_name_buffer: [max_len]u8 = undefined; - var results_buffer: [32]HostName.LookupResult = undefined; - var results: Io.Queue(LookupResult) = .init(&results_buffer); + var connect_many_buffer: [32]ConnectManyResult = undefined; + var connect_many_queue: Io.Queue(ConnectManyResult) = .init(&connect_many_buffer); - var lookup_task = io.async(HostName.lookup, .{ host_name, io, &results, .{ - .port = port, - .canonical_name_buffer = &canonical_name_buffer, - } }); - defer lookup_task.cancel(io); - - const Result = union(enum) { connect_result: IpAddress.ConnectError!Stream }; - var finished_task_buffer: [results_buffer.len]Result = undefined; - var select: Io.Select(Result) = .init(io, &finished_task_buffer); - defer select.cancel(); - - while (results.getOne(io)) |result| switch (result) { - .address => |address| select.async(.connect_result, IpAddress.connect, .{ address, io, options }), - .canonical_name => continue, - .end => |lookup_result| { - try lookup_result; - break; - }, - } else |err| return err; + var connect_many = io.async(connectMany, .{ host_name, io, port, &connect_many_queue, options }); + defer connect_many.cancel(io); var aggregate_error: ConnectError = error.UnknownHostName; - while (select.outstanding != 0) switch (try select.wait()) { - .connect_result => |connect_result| if (connect_result) |stream| return stream else |err| switch (err) { + while (connect_many_queue.getOne(io)) |result| switch (result) { + .connection => |connection| if (connection) |stream| return stream else |err| switch (err) { error.SystemResources => |e| return e, error.OptionUnsupported => |e| return e, error.ProcessFdQuotaExceeded => |e| return e, @@ -242,9 +224,59 @@ pub fn connect( error.WouldBlock => return error.Unexpected, else => |e| aggregate_error = e, }, - }; + .end => |end| { + try end; + return aggregate_error; + }, + } else |err| return err; +} - return aggregate_error; +pub const ConnectManyResult = union(enum) { + connection: IpAddress.ConnectError!Stream, + end: ConnectError!void, +}; + +/// Asynchronously establishes a connection to all IP addresses associated with +/// a host name, adding them to a results queue upon completion. +pub fn connectMany( + host_name: HostName, + io: Io, + port: u16, + results: *Io.Queue(ConnectManyResult), + options: IpAddress.ConnectOptions, +) void { + var canonical_name_buffer: [max_len]u8 = undefined; + var lookup_buffer: [32]HostName.LookupResult = undefined; + var lookup_queue: Io.Queue(LookupResult) = .init(&lookup_buffer); + + host_name.lookup(io, &lookup_queue, .{ + .port = port, + .canonical_name_buffer = &canonical_name_buffer, + }); + + var group: Io.Group = .init; + defer group.cancel(io); + + while (lookup_queue.getOne(io)) |dns_result| switch (dns_result) { + .address => |address| group.async(io, enqueueConnection, .{ address, io, results, options }), + .canonical_name => continue, + .end => |lookup_result| { + group.wait(io); + results.putOneUncancelable(io, .{ .end = lookup_result }); + return; + }, + } else |err| switch (err) { + error.Canceled => |e| results.putOneUncancelable(io, .{ .end = e }), + } +} + +fn enqueueConnection( + address: IpAddress, + io: Io, + queue: *Io.Queue(ConnectManyResult), + options: IpAddress.ConnectOptions, +) void { + queue.putOneUncancelable(io, .{ .connection = address.connect(io, options) }); } pub const ResolvConf = struct {