diff --git a/lib/std/Io.zig b/lib/std/Io.zig index 7db4943a84..363aaeb787 100644 --- a/lib/std/Io.zig +++ b/lib/std/Io.zig @@ -593,18 +593,6 @@ pub const VTable = struct { context_alignment: std.mem.Alignment, start: *const fn (context: *const anyopaque, result: *anyopaque) void, ) error{OutOfMemory}!*AnyFuture, - /// Executes `start` asynchronously in a manner such that it cleans itself - /// up. This mode does not support results, await, or cancel. - /// - /// Thread-safe. - asyncDetached: *const fn ( - /// Corresponds to `Io.userdata`. - userdata: ?*anyopaque, - /// Copied and then passed to `start`. - context: []const u8, - context_alignment: std.mem.Alignment, - start: *const fn (context: *const anyopaque) void, - ) void, /// This function is only called when `async` returns a non-null value. /// /// Thread-safe. @@ -639,6 +627,23 @@ pub const VTable = struct { /// Thread-safe. cancelRequested: *const fn (?*anyopaque) bool, + /// Executes `start` asynchronously in a manner such that it cleans itself + /// up. This mode does not support results, await, or cancel. + /// + /// Thread-safe. + groupAsync: *const fn ( + /// Corresponds to `Io.userdata`. + userdata: ?*anyopaque, + /// Owner of the spawned async task. + group: *Group, + /// Copied and then passed to `start`. + context: []const u8, + context_alignment: std.mem.Alignment, + start: *const fn (context: *const anyopaque) void, + ) void, + groupWait: *const fn (?*anyopaque, *Group) void, + groupCancel: *const fn (?*anyopaque, *Group) void, + /// Blocks until one of the futures from the list has a result ready, such /// that awaiting it will not block. Returns that index. select: *const fn (?*anyopaque, futures: []const *AnyFuture) usize, @@ -751,6 +756,45 @@ pub fn Future(Result: type) type { }; } +pub const Group = struct { + state: usize, + context: ?*anyopaque, + + pub const init: Group = .{ .state = 0, .context = null }; + + /// Calls `function` with `args` asynchronously. The resource spawned is + /// owned by the group. + /// + /// `function` *may* be called immediately, before `async` returns. + /// + /// After this is called, `wait` must be called before the group is + /// deinitialized. + /// + /// See also: + /// * `async` + /// * `concurrent` + pub fn async(g: *Group, io: Io, function: anytype, args: std.meta.ArgsTuple(@TypeOf(function))) void { + const Args = @TypeOf(args); + const TypeErased = struct { + fn start(context: *const anyopaque) void { + const args_casted: *const Args = @ptrCast(@alignCast(context)); + @call(.auto, function, args_casted.*); + } + }; + io.vtable.groupAsync(io.userdata, g, @ptrCast((&args)[0..1]), .of(Args), TypeErased.start); + } + + /// Idempotent. + pub fn wait(g: *Group, io: Io) void { + io.vtable.groupWait(io.userdata, g); + } + + /// Idempotent. + pub fn cancel(g: *Group, io: Io) void { + io.vtable.groupCancel(io.userdata, g); + } +}; + pub const Mutex = if (true) struct { state: State, @@ -1099,7 +1143,7 @@ pub fn Queue(Elem: type) type { /// reusable. /// /// See also: -/// * `asyncDetached` +/// * `Group` pub fn async( io: Io, function: anytype, @@ -1159,25 +1203,6 @@ pub fn concurrent( return future; } -/// Calls `function` with `args` asynchronously. The resource cleans itself up -/// when the function returns. Does not support await, cancel, or a return value. -/// -/// `function` *may* be called immediately, before `async` returns. -/// -/// See also: -/// * `async` -/// * `concurrent` -pub fn asyncDetached(io: Io, function: anytype, args: std.meta.ArgsTuple(@TypeOf(function))) void { - const Args = @TypeOf(args); - const TypeErased = struct { - fn start(context: *const anyopaque) void { - const args_casted: *const Args = @ptrCast(@alignCast(context)); - @call(.auto, function, args_casted.*); - } - }; - io.vtable.asyncDetached(io.userdata, @ptrCast((&args)[0..1]), .of(Args), TypeErased.start); -} - pub fn cancelRequested(io: Io) bool { return io.vtable.cancelRequested(io.userdata); } diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig index 762eb81060..1459f2efba 100644 --- a/lib/std/Io/Threaded.zig +++ b/lib/std/Io/Threaded.zig @@ -8,7 +8,6 @@ const windows = std.os.windows; const std = @import("../std.zig"); const Allocator = std.mem.Allocator; const assert = std.debug.assert; -const WaitGroup = std.Thread.WaitGroup; const posix = std.posix; const Io = std.Io; @@ -101,10 +100,12 @@ pub fn io(pool: *Pool) Io { .async = async, .concurrent = concurrent, .await = await, - .asyncDetached = asyncDetached, .cancel = cancel, .cancelRequested = cancelRequested, .select = select, + .groupAsync = groupAsync, + .groupWait = groupWait, + .groupCancel = groupCancel, .mutexLock = mutexLock, .mutexUnlock = mutexUnlock, @@ -279,7 +280,7 @@ fn async( .func = start, .context_offset = context_offset, .result_offset = result_offset, - .reset_event = .{}, + .reset_event = .unset, .cancel_tid = 0, .select_condition = null, .runnable = .{ @@ -347,7 +348,7 @@ fn concurrent( .func = start, .context_offset = context_offset, .result_offset = result_offset, - .reset_event = .{}, + .reset_event = .unset, .cancel_tid = 0, .select_condition = null, .runnable = .{ @@ -385,41 +386,47 @@ fn concurrent( return @ptrCast(closure); } -const DetachedClosure = struct { +const GroupClosure = struct { pool: *Pool, + group: *Io.Group, func: *const fn (context: *anyopaque) void, runnable: Runnable, context_alignment: std.mem.Alignment, context_len: usize, fn start(runnable: *Runnable) void { - const closure: *DetachedClosure = @alignCast(@fieldParentPtr("runnable", runnable)); + const closure: *GroupClosure = @alignCast(@fieldParentPtr("runnable", runnable)); closure.func(closure.contextPointer()); + const group = closure.group; const gpa = closure.pool.allocator; free(closure, gpa); + const group_state: *std.atomic.Value(usize) = @ptrCast(&group.state); + const reset_event: *std.Thread.ResetEvent = @ptrCast(&group.context); + std.Thread.WaitGroup.finishStateless(group_state, reset_event); } - fn free(closure: *DetachedClosure, gpa: Allocator) void { - const base: [*]align(@alignOf(DetachedClosure)) u8 = @ptrCast(closure); + fn free(closure: *GroupClosure, gpa: Allocator) void { + const base: [*]align(@alignOf(GroupClosure)) u8 = @ptrCast(closure); gpa.free(base[0..contextEnd(closure.context_alignment, closure.context_len)]); } fn contextOffset(context_alignment: std.mem.Alignment) usize { - return context_alignment.forward(@sizeOf(DetachedClosure)); + return context_alignment.forward(@sizeOf(GroupClosure)); } fn contextEnd(context_alignment: std.mem.Alignment, context_len: usize) usize { return contextOffset(context_alignment) + context_len; } - fn contextPointer(closure: *DetachedClosure) [*]u8 { + fn contextPointer(closure: *GroupClosure) [*]u8 { const base: [*]u8 = @ptrCast(closure); return base + contextOffset(closure.context_alignment); } }; -fn asyncDetached( +fn groupAsync( userdata: ?*anyopaque, + group: *Io.Group, context: []const u8, context_alignment: std.mem.Alignment, start: *const fn (context: *const anyopaque) void, @@ -428,17 +435,18 @@ fn asyncDetached( const pool: *Pool = @ptrCast(@alignCast(userdata)); const cpu_count = pool.cpu_count catch 1; const gpa = pool.allocator; - const n = DetachedClosure.contextEnd(context_alignment, context.len); - const closure: *DetachedClosure = @ptrCast(@alignCast(gpa.alignedAlloc(u8, .of(DetachedClosure), n) catch { + const n = GroupClosure.contextEnd(context_alignment, context.len); + const closure: *GroupClosure = @ptrCast(@alignCast(gpa.alignedAlloc(u8, .of(GroupClosure), n) catch { return start(context.ptr); })); closure.* = .{ .pool = pool, + .group = group, .func = start, .context_alignment = context_alignment, .context_len = context.len, .runnable = .{ - .start = DetachedClosure.start, + .start = GroupClosure.start, .is_parallel = false, }, }; @@ -466,10 +474,30 @@ fn asyncDetached( pool.threads.appendAssumeCapacity(thread); } + const group_state: *std.atomic.Value(usize) = @ptrCast(&group.state); + std.Thread.WaitGroup.startStateless(group_state); + pool.mutex.unlock(); pool.cond.signal(); } +fn groupWait(userdata: ?*anyopaque, group: *Io.Group) void { + if (builtin.single_threaded) return; + const pool: *Pool = @ptrCast(@alignCast(userdata)); + _ = pool; + const group_state: *std.atomic.Value(usize) = @ptrCast(&group.state); + const reset_event: *std.Thread.ResetEvent = @ptrCast(&group.context); + std.Thread.WaitGroup.waitStateless(group_state, reset_event); +} + +fn groupCancel(userdata: ?*anyopaque, group: *Io.Group) void { + if (builtin.single_threaded) return; + const pool: *Pool = @ptrCast(@alignCast(userdata)); + _ = pool; + _ = group; + @panic("TODO threaded group cancel"); +} + fn await( userdata: ?*anyopaque, any_future: *Io.AnyFuture, @@ -968,7 +996,7 @@ fn select(userdata: ?*anyopaque, futures: []const *Io.AnyFuture) usize { const pool: *Pool = @ptrCast(@alignCast(userdata)); _ = pool; - var reset_event: std.Thread.ResetEvent = .{}; + var reset_event: std.Thread.ResetEvent = .unset; for (futures, 0..) |future, i| { const closure: *AsyncClosure = @ptrCast(@alignCast(future)); diff --git a/lib/std/Io/net/HostName.zig b/lib/std/Io/net/HostName.zig index 091c6e2b96..907f5bca1b 100644 --- a/lib/std/Io/net/HostName.zig +++ b/lib/std/Io/net/HostName.zig @@ -653,7 +653,7 @@ pub const ResolvConf = struct { const mapped_nameservers = if (any_ip6) ip4_mapped[0..rc.nameservers_len] else rc.nameservers(); - var group: Io.Group = .{}; + var group: Io.Group = .init; defer group.cancel(); for (queries) |query| { @@ -702,7 +702,7 @@ test ResolvConf { .search_buffer = undefined, .search_len = 0, .ndots = 1, - .timeout = 5, + .timeout = .seconds(5), .attempts = 2, };