From 2e1ab5d3f791c596a663a2c83cd751e3729e7a44 Mon Sep 17 00:00:00 2001 From: Andrew Kelley Date: Mon, 29 Sep 2025 14:04:07 -0700 Subject: [PATCH] std.Io.Threaded: implement Group.cancel --- lib/std/Io.zig | 22 ++- lib/std/Io/Threaded.zig | 349 +++++++++++++++++++++------------------- 2 files changed, 194 insertions(+), 177 deletions(-) diff --git a/lib/std/Io.zig b/lib/std/Io.zig index 363aaeb787..5514cff6f9 100644 --- a/lib/std/Io.zig +++ b/lib/std/Io.zig @@ -736,10 +736,9 @@ pub fn Future(Result: type) type { any_future: ?*AnyFuture, result: Result, - /// Equivalent to `await` but sets a flag observable to application - /// code that cancellation has been requested. + /// Equivalent to `await` but places a cancellation request. /// - /// Idempotent. + /// Idempotent. Not threadsafe. pub fn cancel(f: *@This(), io: Io) Result { const any_future = f.any_future orelse return f.result; io.vtable.cancel(io.userdata, any_future, @ptrCast((&f.result)[0..1]), .of(Result)); @@ -747,6 +746,7 @@ pub fn Future(Result: type) type { return f.result; } + /// Idempotent. Not threadsafe. pub fn await(f: *@This(), io: Io) Result { const any_future = f.any_future orelse return f.result; io.vtable.await(io.userdata, any_future, @ptrCast((&f.result)[0..1]), .of(Result)); @@ -759,8 +759,9 @@ pub fn Future(Result: type) type { pub const Group = struct { state: usize, context: ?*anyopaque, + token: ?*anyopaque, - pub const init: Group = .{ .state = 0, .context = null }; + pub const init: Group = .{ .state = 0, .context = null, .token = null }; /// Calls `function` with `args` asynchronously. The resource spawned is /// owned by the group. @@ -771,7 +772,7 @@ pub const Group = struct { /// deinitialized. /// /// See also: - /// * `async` + /// * `Io.async` /// * `concurrent` pub fn async(g: *Group, io: Io, function: anytype, args: std.meta.ArgsTuple(@TypeOf(function))) void { const Args = @TypeOf(args); @@ -784,14 +785,21 @@ pub const Group = struct { io.vtable.groupAsync(io.userdata, g, @ptrCast((&args)[0..1]), .of(Args), TypeErased.start); } - /// Idempotent. + /// Blocks until all tasks of the group finish. + /// + /// Idempotent. Not threadsafe. pub fn wait(g: *Group, io: Io) void { io.vtable.groupWait(io.userdata, g); } - /// Idempotent. + /// Equivalent to `wait` but requests cancellation on all tasks owned by + /// the group. + /// + /// Idempotent. Not threadsafe. pub fn cancel(g: *Group, io: Io) void { + if (g.token == null) return; io.vtable.groupCancel(io.userdata, g); + assert(g.token == null); } }; diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig index 1459f2efba..db0dad5669 100644 --- a/lib/std/Io/Threaded.zig +++ b/lib/std/Io/Threaded.zig @@ -10,6 +10,7 @@ const Allocator = std.mem.Allocator; const assert = std.debug.assert; const posix = std.posix; const Io = std.Io; +const ResetEvent = std.Thread.ResetEvent; /// Thread-safe. allocator: Allocator, @@ -20,9 +21,9 @@ join_requested: bool = false, threads: std.ArrayListUnmanaged(std.Thread), stack_size: usize, cpu_count: std.Thread.CpuCountError!usize, -parallel_count: usize, +concurrent_count: usize, -threadlocal var current_closure: ?*AsyncClosure = null; +threadlocal var current_closure: ?*Closure = null; const max_iovecs_len = 8; const splat_buffer_size = 64; @@ -31,12 +32,33 @@ comptime { assert(max_iovecs_len <= posix.IOV_MAX); } -pub const Runnable = struct { +const Closure = struct { start: Start, node: std.SinglyLinkedList.Node = .{}, - is_parallel: bool, + cancel_tid: std.Thread.Id, + /// Whether this task bumps minimum number of threads in the pool. + is_concurrent: bool, - pub const Start = *const fn (*Runnable) void; + const Start = *const fn (*Closure) void; + + const canceling_tid: std.Thread.Id = switch (@typeInfo(std.Thread.Id)) { + .int => |int_info| switch (int_info.signedness) { + .signed => -1, + .unsigned => std.math.maxInt(std.Thread.Id), + }, + .pointer => @ptrFromInt(std.math.maxInt(usize)), + else => @compileError("unsupported std.Thread.Id: " ++ @typeName(std.Thread.Id)), + }; + + fn requestCancel(closure: *Closure) void { + switch (@atomicRmw(std.Thread.Id, &closure.cancel_tid, .Xchg, canceling_tid, .acq_rel)) { + 0, canceling_tid => {}, + else => |tid| switch (builtin.os.tag) { + .linux => _ = std.os.linux.tgkill(std.os.linux.getpid(), @bitCast(tid), posix.SIG.IO), + else => {}, + }, + } + } }; pub const InitError = std.Thread.CpuCountError || Allocator.Error; @@ -47,7 +69,7 @@ pub fn init(gpa: Allocator) Pool { .threads = .empty, .stack_size = std.Thread.SpawnConfig.default_stack_size, .cpu_count = std.Thread.getCpuCount(), - .parallel_count = 0, + .concurrent_count = 0, }; if (pool.cpu_count) |n| { pool.threads.ensureTotalCapacityPrecise(gpa, n - 1) catch {}; @@ -78,14 +100,15 @@ fn worker(pool: *Pool) void { defer pool.mutex.unlock(); while (true) { - while (pool.run_queue.popFirst()) |run_node| { + while (pool.run_queue.popFirst()) |closure_node| { pool.mutex.unlock(); - const runnable: *Runnable = @fieldParentPtr("node", run_node); - runnable.start(runnable); + const closure: *Closure = @fieldParentPtr("node", closure_node); + const is_concurrent = closure.is_concurrent; + closure.start(closure); pool.mutex.lock(); - if (runnable.is_parallel) { + if (is_concurrent) { // TODO also pop thread and join sometimes - pool.parallel_count -= 1; + pool.concurrent_count -= 1; } } if (pool.join_requested) break; @@ -154,97 +177,71 @@ pub fn io(pool: *Pool) Io { }; } +/// Trailing data: +/// 1. context +/// 2. result const AsyncClosure = struct { + closure: Closure, func: *const fn (context: *anyopaque, result: *anyopaque) void, - runnable: Runnable, - reset_event: std.Thread.ResetEvent, - select_condition: ?*std.Thread.ResetEvent, - cancel_tid: std.Thread.Id, - context_offset: usize, + reset_event: ResetEvent, + select_condition: ?*ResetEvent, + context_alignment: std.mem.Alignment, result_offset: usize, + /// Whether the task has a return type with nonzero bits. + has_result: bool, - const done_reset_event: *std.Thread.ResetEvent = @ptrFromInt(@alignOf(std.Thread.ResetEvent)); + const done_reset_event: *ResetEvent = @ptrFromInt(@alignOf(ResetEvent)); - const canceling_tid: std.Thread.Id = switch (@typeInfo(std.Thread.Id)) { - .int => |int_info| switch (int_info.signedness) { - .signed => -1, - .unsigned => std.math.maxInt(std.Thread.Id), - }, - .pointer => @ptrFromInt(std.math.maxInt(usize)), - else => @compileError("unsupported std.Thread.Id: " ++ @typeName(std.Thread.Id)), - }; - - fn start(runnable: *Runnable) void { - const closure: *AsyncClosure = @alignCast(@fieldParentPtr("runnable", runnable)); + fn start(closure: *Closure) void { + const ac: *AsyncClosure = @alignCast(@fieldParentPtr("closure", closure)); const tid = std.Thread.getCurrentId(); - if (@cmpxchgStrong( - std.Thread.Id, - &closure.cancel_tid, - 0, - tid, - .acq_rel, - .acquire, - )) |cancel_tid| { - assert(cancel_tid == canceling_tid); - closure.reset_event.set(); - return; + if (@cmpxchgStrong(std.Thread.Id, &closure.cancel_tid, 0, tid, .acq_rel, .acquire)) |cancel_tid| { + assert(cancel_tid == Closure.canceling_tid); + // Even though we already know the task is canceled, we must still + // run the closure in order to make the return value valid - that + // is, unless the result is zero bytes! + if (!ac.has_result) { + ac.reset_event.set(); + return; + } } current_closure = closure; - closure.func(closure.contextPointer(), closure.resultPointer()); + ac.func(ac.contextPointer(), ac.resultPointer()); current_closure = null; - if (@cmpxchgStrong( - std.Thread.Id, - &closure.cancel_tid, - tid, - 0, - .acq_rel, - .acquire, - )) |cancel_tid| assert(cancel_tid == canceling_tid); - if (@atomicRmw( - ?*std.Thread.ResetEvent, - &closure.select_condition, - .Xchg, - done_reset_event, - .release, - )) |select_reset| { + // In case a cancel happens after successful task completion, prevents + // signal from being delivered to the thread in `requestCancel`. + if (@cmpxchgStrong(std.Thread.Id, &closure.cancel_tid, tid, 0, .acq_rel, .acquire)) |cancel_tid| { + assert(cancel_tid == Closure.canceling_tid); + } + + if (@atomicRmw(?*ResetEvent, &ac.select_condition, .Xchg, done_reset_event, .release)) |select_reset| { assert(select_reset != done_reset_event); select_reset.set(); } - closure.reset_event.set(); + ac.reset_event.set(); } - fn contextOffset(context_alignment: std.mem.Alignment) usize { - return context_alignment.forward(@sizeOf(AsyncClosure)); + fn resultPointer(ac: *AsyncClosure) [*]u8 { + const base: [*]u8 = @ptrCast(ac); + return base + ac.result_offset; } - fn resultOffset( - context_alignment: std.mem.Alignment, - context_len: usize, - result_alignment: std.mem.Alignment, - ) usize { - return result_alignment.forward(contextOffset(context_alignment) + context_len); + fn contextPointer(ac: *AsyncClosure) [*]u8 { + const base: [*]u8 = @ptrCast(ac); + return base + ac.context_alignment.forward(@sizeOf(AsyncClosure)); } - fn resultPointer(closure: *AsyncClosure) [*]u8 { - const base: [*]u8 = @ptrCast(closure); - return base + closure.result_offset; + fn waitAndFree(ac: *AsyncClosure, gpa: Allocator, result: []u8) void { + ac.reset_event.wait(); + @memcpy(result, ac.resultPointer()[0..result.len]); + free(ac, gpa, result.len); } - fn contextPointer(closure: *AsyncClosure) [*]u8 { - const base: [*]u8 = @ptrCast(closure); - return base + closure.context_offset; - } - - fn waitAndFree(closure: *AsyncClosure, gpa: Allocator, result: []u8) void { - closure.reset_event.wait(); - @memcpy(result, closure.resultPointer()[0..result.len]); - free(closure, gpa, result.len); - } - - fn free(closure: *AsyncClosure, gpa: Allocator, result_len: usize) void { - const base: [*]align(@alignOf(AsyncClosure)) u8 = @ptrCast(closure); - gpa.free(base[0 .. closure.result_offset + result_len]); + fn free(ac: *AsyncClosure, gpa: Allocator, result_len: usize) void { + if (!ac.has_result) assert(result_len == 0); + const base: [*]align(@alignOf(AsyncClosure)) u8 = @ptrCast(ac); + gpa.free(base[0 .. ac.result_offset + result_len]); } }; @@ -271,59 +268,60 @@ fn async( const context_offset = context_alignment.forward(@sizeOf(AsyncClosure)); const result_offset = result_alignment.forward(context_offset + context.len); const n = result_offset + result.len; - const closure: *AsyncClosure = @ptrCast(@alignCast(gpa.alignedAlloc(u8, .of(AsyncClosure), n) catch { + const ac: *AsyncClosure = @ptrCast(@alignCast(gpa.alignedAlloc(u8, .of(AsyncClosure), n) catch { start(context.ptr, result.ptr); return null; })); - closure.* = .{ - .func = start, - .context_offset = context_offset, - .result_offset = result_offset, - .reset_event = .unset, - .cancel_tid = 0, - .select_condition = null, - .runnable = .{ + ac.* = .{ + .closure = .{ + .cancel_tid = 0, .start = AsyncClosure.start, - .is_parallel = false, + .is_concurrent = false, }, + .func = start, + .context_alignment = context_alignment, + .result_offset = result_offset, + .has_result = result.len != 0, + .reset_event = .unset, + .select_condition = null, }; - @memcpy(closure.contextPointer()[0..context.len], context); + @memcpy(ac.contextPointer()[0..context.len], context); pool.mutex.lock(); - const thread_capacity = cpu_count - 1 + pool.parallel_count; + const thread_capacity = cpu_count - 1 + pool.concurrent_count; pool.threads.ensureTotalCapacityPrecise(gpa, thread_capacity) catch { pool.mutex.unlock(); - closure.free(gpa, result.len); + ac.free(gpa, result.len); start(context.ptr, result.ptr); return null; }; - pool.run_queue.prepend(&closure.runnable.node); + pool.run_queue.prepend(&ac.closure.node); if (pool.threads.items.len < thread_capacity) { const thread = std.Thread.spawn(.{ .stack_size = pool.stack_size }, worker, .{pool}) catch { if (pool.threads.items.len == 0) { - assert(pool.run_queue.popFirst() == &closure.runnable.node); + assert(pool.run_queue.popFirst() == &ac.closure.node); pool.mutex.unlock(); - closure.free(gpa, result.len); + ac.free(gpa, result.len); start(context.ptr, result.ptr); return null; } // Rely on other workers to do it. pool.mutex.unlock(); pool.cond.signal(); - return @ptrCast(closure); + return @ptrCast(ac); }; pool.threads.appendAssumeCapacity(thread); } pool.mutex.unlock(); pool.cond.signal(); - return @ptrCast(closure); + return @ptrCast(ac); } fn concurrent( @@ -342,40 +340,41 @@ fn concurrent( const context_offset = context_alignment.forward(@sizeOf(AsyncClosure)); const result_offset = result_alignment.forward(context_offset + context.len); const n = result_offset + result_len; - const closure: *AsyncClosure = @ptrCast(@alignCast(try gpa.alignedAlloc(u8, .of(AsyncClosure), n))); + const ac: *AsyncClosure = @ptrCast(@alignCast(try gpa.alignedAlloc(u8, .of(AsyncClosure), n))); - closure.* = .{ - .func = start, - .context_offset = context_offset, - .result_offset = result_offset, - .reset_event = .unset, - .cancel_tid = 0, - .select_condition = null, - .runnable = .{ + ac.* = .{ + .closure = .{ + .cancel_tid = 0, .start = AsyncClosure.start, - .is_parallel = true, + .is_concurrent = true, }, + .func = start, + .context_alignment = context_alignment, + .result_offset = result_offset, + .has_result = result_len != 0, + .reset_event = .unset, + .select_condition = null, }; - @memcpy(closure.contextPointer()[0..context.len], context); + @memcpy(ac.contextPointer()[0..context.len], context); pool.mutex.lock(); - pool.parallel_count += 1; - const thread_capacity = cpu_count - 1 + pool.parallel_count; + pool.concurrent_count += 1; + const thread_capacity = cpu_count - 1 + pool.concurrent_count; pool.threads.ensureTotalCapacity(gpa, thread_capacity) catch { pool.mutex.unlock(); - closure.free(gpa, result_len); + ac.free(gpa, result_len); return error.OutOfMemory; }; - pool.run_queue.prepend(&closure.runnable.node); + pool.run_queue.prepend(&ac.closure.node); if (pool.threads.items.len < thread_capacity) { const thread = std.Thread.spawn(.{ .stack_size = pool.stack_size }, worker, .{pool}) catch { - assert(pool.run_queue.popFirst() == &closure.runnable.node); + assert(pool.run_queue.popFirst() == &ac.closure.node); pool.mutex.unlock(); - closure.free(gpa, result_len); + ac.free(gpa, result_len); return error.OutOfMemory; }; pool.threads.appendAssumeCapacity(thread); @@ -383,31 +382,48 @@ fn concurrent( pool.mutex.unlock(); pool.cond.signal(); - return @ptrCast(closure); + return @ptrCast(ac); } const GroupClosure = struct { + closure: Closure, pool: *Pool, group: *Io.Group, + /// Points to sibling `GroupClosure`. Used for walking the group to cancel all. + node: std.SinglyLinkedList.Node, func: *const fn (context: *anyopaque) void, - runnable: Runnable, context_alignment: std.mem.Alignment, context_len: usize, - fn start(runnable: *Runnable) void { - const closure: *GroupClosure = @alignCast(@fieldParentPtr("runnable", runnable)); - closure.func(closure.contextPointer()); - const group = closure.group; - const gpa = closure.pool.allocator; - free(closure, gpa); + fn start(closure: *Closure) void { + const gc: *GroupClosure = @alignCast(@fieldParentPtr("closure", closure)); + const tid = std.Thread.getCurrentId(); + const group = gc.group; const group_state: *std.atomic.Value(usize) = @ptrCast(&group.state); - const reset_event: *std.Thread.ResetEvent = @ptrCast(&group.context); + const reset_event: *ResetEvent = @ptrCast(&group.context); + if (@cmpxchgStrong(std.Thread.Id, &closure.cancel_tid, 0, tid, .acq_rel, .acquire)) |cancel_tid| { + assert(cancel_tid == Closure.canceling_tid); + // We already know the task is canceled before running the callback. Since all closures + // in a Group have void return type, we can return early. + std.Thread.WaitGroup.finishStateless(group_state, reset_event); + return; + } + current_closure = closure; + gc.func(gc.contextPointer()); + current_closure = null; + + // In case a cancel happens after successful task completion, prevents + // signal from being delivered to the thread in `requestCancel`. + if (@cmpxchgStrong(std.Thread.Id, &closure.cancel_tid, tid, 0, .acq_rel, .acquire)) |cancel_tid| { + assert(cancel_tid == Closure.canceling_tid); + } + std.Thread.WaitGroup.finishStateless(group_state, reset_event); } - fn free(closure: *GroupClosure, gpa: Allocator) void { - const base: [*]align(@alignOf(GroupClosure)) u8 = @ptrCast(closure); - gpa.free(base[0..contextEnd(closure.context_alignment, closure.context_len)]); + fn free(gc: *GroupClosure, gpa: Allocator) void { + const base: [*]align(@alignOf(GroupClosure)) u8 = @ptrCast(gc); + gpa.free(base[0..contextEnd(gc.context_alignment, gc.context_len)]); } fn contextOffset(context_alignment: std.mem.Alignment) usize { @@ -418,9 +434,9 @@ const GroupClosure = struct { return contextOffset(context_alignment) + context_len; } - fn contextPointer(closure: *GroupClosure) [*]u8 { - const base: [*]u8 = @ptrCast(closure); - return base + contextOffset(closure.context_alignment); + fn contextPointer(gc: *GroupClosure) [*]u8 { + const base: [*]u8 = @ptrCast(gc); + return base + contextOffset(gc.context_alignment); } }; @@ -436,39 +452,42 @@ fn groupAsync( const cpu_count = pool.cpu_count catch 1; const gpa = pool.allocator; const n = GroupClosure.contextEnd(context_alignment, context.len); - const closure: *GroupClosure = @ptrCast(@alignCast(gpa.alignedAlloc(u8, .of(GroupClosure), n) catch { + const gc: *GroupClosure = @ptrCast(@alignCast(gpa.alignedAlloc(u8, .of(GroupClosure), n) catch { return start(context.ptr); })); - closure.* = .{ + gc.* = .{ + .closure = .{ + .cancel_tid = 0, + .start = GroupClosure.start, + .is_concurrent = false, + }, .pool = pool, .group = group, + .node = .{ .next = @ptrCast(@alignCast(group.token)) }, .func = start, .context_alignment = context_alignment, .context_len = context.len, - .runnable = .{ - .start = GroupClosure.start, - .is_parallel = false, - }, }; - @memcpy(closure.contextPointer()[0..context.len], context); + group.token = &gc.node; + @memcpy(gc.contextPointer()[0..context.len], context); pool.mutex.lock(); - const thread_capacity = cpu_count - 1 + pool.parallel_count; + const thread_capacity = cpu_count - 1 + pool.concurrent_count; pool.threads.ensureTotalCapacityPrecise(gpa, thread_capacity) catch { pool.mutex.unlock(); - closure.free(gpa); + gc.free(gpa); return start(context.ptr); }; - pool.run_queue.prepend(&closure.runnable.node); + pool.run_queue.prepend(&gc.closure.node); if (pool.threads.items.len < thread_capacity) { const thread = std.Thread.spawn(.{ .stack_size = pool.stack_size }, worker, .{pool}) catch { - assert(pool.run_queue.popFirst() == &closure.runnable.node); + assert(pool.run_queue.popFirst() == &gc.closure.node); pool.mutex.unlock(); - closure.free(gpa); + gc.free(gpa); return start(context.ptr); }; pool.threads.appendAssumeCapacity(thread); @@ -486,7 +505,7 @@ fn groupWait(userdata: ?*anyopaque, group: *Io.Group) void { const pool: *Pool = @ptrCast(@alignCast(userdata)); _ = pool; const group_state: *std.atomic.Value(usize) = @ptrCast(&group.state); - const reset_event: *std.Thread.ResetEvent = @ptrCast(&group.context); + const reset_event: *ResetEvent = @ptrCast(&group.context); std.Thread.WaitGroup.waitStateless(group_state, reset_event); } @@ -494,8 +513,14 @@ fn groupCancel(userdata: ?*anyopaque, group: *Io.Group) void { if (builtin.single_threaded) return; const pool: *Pool = @ptrCast(@alignCast(userdata)); _ = pool; - _ = group; - @panic("TODO threaded group cancel"); + const token = group.token.?; + group.token = null; + var node: *std.SinglyLinkedList.Node = @ptrCast(@alignCast(token)); + while (true) { + const gc: *GroupClosure = @fieldParentPtr("node", node); + gc.closure.requestCancel(); + node = node.next orelse break; + } } fn await( @@ -518,32 +543,16 @@ fn cancel( ) void { _ = result_alignment; const pool: *Pool = @ptrCast(@alignCast(userdata)); - const closure: *AsyncClosure = @ptrCast(@alignCast(any_future)); - switch (@atomicRmw( - std.Thread.Id, - &closure.cancel_tid, - .Xchg, - AsyncClosure.canceling_tid, - .acq_rel, - )) { - 0, AsyncClosure.canceling_tid => {}, - else => |cancel_tid| switch (builtin.os.tag) { - .linux => _ = std.os.linux.tgkill( - std.os.linux.getpid(), - @bitCast(cancel_tid), - posix.SIG.IO, - ), - else => {}, - }, - } - closure.waitAndFree(pool.allocator, result); + const ac: *AsyncClosure = @ptrCast(@alignCast(any_future)); + ac.closure.requestCancel(); + ac.waitAndFree(pool.allocator, result); } fn cancelRequested(userdata: ?*anyopaque) bool { const pool: *Pool = @ptrCast(@alignCast(userdata)); _ = pool; const closure = current_closure orelse return false; - return @atomicLoad(std.Thread.Id, &closure.cancel_tid, .acquire) == AsyncClosure.canceling_tid; + return @atomicLoad(std.Thread.Id, &closure.cancel_tid, .acquire) == Closure.canceling_tid; } fn checkCancel(pool: *Pool) error{Canceled}!void { @@ -996,14 +1005,14 @@ fn select(userdata: ?*anyopaque, futures: []const *Io.AnyFuture) usize { const pool: *Pool = @ptrCast(@alignCast(userdata)); _ = pool; - var reset_event: std.Thread.ResetEvent = .unset; + var reset_event: ResetEvent = .unset; for (futures, 0..) |future, i| { const closure: *AsyncClosure = @ptrCast(@alignCast(future)); - if (@atomicRmw(?*std.Thread.ResetEvent, &closure.select_condition, .Xchg, &reset_event, .seq_cst) == AsyncClosure.done_reset_event) { + if (@atomicRmw(?*ResetEvent, &closure.select_condition, .Xchg, &reset_event, .seq_cst) == AsyncClosure.done_reset_event) { for (futures[0..i]) |cleanup_future| { const cleanup_closure: *AsyncClosure = @ptrCast(@alignCast(cleanup_future)); - if (@atomicRmw(?*std.Thread.ResetEvent, &cleanup_closure.select_condition, .Xchg, null, .seq_cst) == AsyncClosure.done_reset_event) { + if (@atomicRmw(?*ResetEvent, &cleanup_closure.select_condition, .Xchg, null, .seq_cst) == AsyncClosure.done_reset_event) { cleanup_closure.reset_event.wait(); // Ensure no reference to our stack-allocated reset_event. } } @@ -1016,7 +1025,7 @@ fn select(userdata: ?*anyopaque, futures: []const *Io.AnyFuture) usize { var result: ?usize = null; for (futures, 0..) |future, i| { const closure: *AsyncClosure = @ptrCast(@alignCast(future)); - if (@atomicRmw(?*std.Thread.ResetEvent, &closure.select_condition, .Xchg, null, .seq_cst) == AsyncClosure.done_reset_event) { + if (@atomicRmw(?*ResetEvent, &closure.select_condition, .Xchg, null, .seq_cst) == AsyncClosure.done_reset_event) { closure.reset_event.wait(); // Ensure no reference to our stack-allocated reset_event. if (result == null) result = i; // In case multiple are ready, return first. }