From a1c1d06b19986cb9a585993ec2bb33a3d5302aa7 Mon Sep 17 00:00:00 2001 From: Andrew Kelley Date: Mon, 31 Mar 2025 02:10:50 -0700 Subject: [PATCH] std.Io: add detached async --- lib/std/Io.zig | 47 ++++++++++++++++++++++++--- lib/std/Thread/Pool.zig | 70 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 113 insertions(+), 4 deletions(-) diff --git a/lib/std/Io.zig b/lib/std/Io.zig index 9444d45414..be7f905ece 100644 --- a/lib/std/Io.zig +++ b/lib/std/Io.zig @@ -580,6 +580,18 @@ pub const VTable = struct { context_alignment: std.mem.Alignment, start: *const fn (context: *const anyopaque, result: *anyopaque) void, ) ?*AnyFuture, + /// Executes `start` asynchronously in a manner such that it cleans itself + /// up. This mode does not support results, await, or cancel. + /// + /// Thread-safe. + go: *const fn ( + /// Corresponds to `Io.userdata`. + userdata: ?*anyopaque, + /// Copied and then passed to `start`. + context: []const u8, + context_alignment: std.mem.Alignment, + start: *const fn (context: *const anyopaque) void, + ) void, /// This function is only called when `async` returns a non-null value. /// /// Thread-safe. @@ -593,7 +605,6 @@ pub const VTable = struct { result: []u8, result_alignment: std.mem.Alignment, ) void, - /// Equivalent to `await` but initiates cancel request. /// /// This function is only called when `async` returns a non-null value. @@ -671,14 +682,24 @@ pub fn Future(Result: type) type { /// Idempotent. pub fn cancel(f: *@This(), io: Io) Result { const any_future = f.any_future orelse return f.result; - io.vtable.cancel(io.userdata, any_future, @ptrCast((&f.result)[0..1]), .of(Result)); + io.vtable.cancel( + io.userdata, + any_future, + if (@sizeOf(Result) == 0) &.{} else @ptrCast((&f.result)[0..1]), // work around compiler bug + .of(Result), + ); f.any_future = null; return f.result; } pub fn await(f: *@This(), io: Io) Result { const any_future = f.any_future orelse return f.result; - io.vtable.await(io.userdata, any_future, @ptrCast((&f.result)[0..1]), .of(Result)); + io.vtable.await( + io.userdata, + any_future, + if (@sizeOf(Result) == 0) &.{} else @ptrCast((&f.result)[0..1]), // work around compiler bug + .of(Result), + ); f.any_future = null; return f.result; } @@ -996,7 +1017,7 @@ pub fn async(io: Io, function: anytype, args: anytype) Future(@typeInfo(@TypeOf( var future: Future(Result) = undefined; future.any_future = io.vtable.async( io.userdata, - @ptrCast((&future.result)[0..1]), + if (@sizeOf(Result) == 0) &.{} else @ptrCast((&future.result)[0..1]), // work around compiler bug .of(Result), if (@sizeOf(Args) == 0) &.{} else @ptrCast((&args)[0..1]), // work around compiler bug .of(Args), @@ -1005,6 +1026,24 @@ pub fn async(io: Io, function: anytype, args: anytype) Future(@typeInfo(@TypeOf( return future; } +/// Calls `function` with `args` asynchronously. The resource cleans itself up +/// when the function returns. Does not support await, cancel, or a return value. +pub fn go(io: Io, function: anytype, args: anytype) void { + const Args = @TypeOf(args); + const TypeErased = struct { + fn start(context: *const anyopaque) void { + const args_casted: *const Args = @alignCast(@ptrCast(context)); + @call(.auto, function, args_casted.*); + } + }; + io.vtable.go( + io.userdata, + if (@sizeOf(Args) == 0) &.{} else @ptrCast((&args)[0..1]), // work around compiler bug + .of(Args), + TypeErased.start, + ); +} + pub fn openFile(io: Io, dir: fs.Dir, sub_path: []const u8, flags: fs.File.OpenFlags) FileOpenError!fs.File { return io.vtable.openFile(io.userdata, dir, sub_path, flags); } diff --git a/lib/std/Thread/Pool.zig b/lib/std/Thread/Pool.zig index 37018f2ab7..f46c6f6802 100644 --- a/lib/std/Thread/Pool.zig +++ b/lib/std/Thread/Pool.zig @@ -332,6 +332,7 @@ pub fn io(pool: *Pool) Io { .vtable = &.{ .@"async" = @"async", .@"await" = @"await", + .go = go, .cancel = cancel, .cancelRequested = cancelRequested, .mutexLock = mutexLock, @@ -472,6 +473,75 @@ fn @"async"( return @ptrCast(closure); } +const DetachedClosure = struct { + pool: *Pool, + func: *const fn (context: *anyopaque) void, + run_node: std.Thread.Pool.RunQueue.Node = .{ .data = .{ .runFn = runFn } }, + context_alignment: std.mem.Alignment, + context_len: usize, + + fn runFn(runnable: *std.Thread.Pool.Runnable, _: ?usize) void { + const run_node: *std.Thread.Pool.RunQueue.Node = @fieldParentPtr("data", runnable); + const closure: *DetachedClosure = @alignCast(@fieldParentPtr("run_node", run_node)); + closure.func(closure.contextPointer()); + const gpa = closure.pool.allocator; + const base: [*]align(@alignOf(DetachedClosure)) u8 = @ptrCast(closure); + gpa.free(base[0..contextEnd(closure.context_alignment, closure.context_len)]); + } + + fn contextOffset(context_alignment: std.mem.Alignment) usize { + return context_alignment.forward(@sizeOf(DetachedClosure)); + } + + fn contextEnd(context_alignment: std.mem.Alignment, context_len: usize) usize { + return contextOffset(context_alignment) + context_len; + } + + fn contextPointer(closure: *DetachedClosure) [*]u8 { + const base: [*]u8 = @ptrCast(closure); + return base + contextOffset(closure.context_alignment); + } +}; + +fn go( + userdata: ?*anyopaque, + context: []const u8, + context_alignment: std.mem.Alignment, + start: *const fn (context: *const anyopaque) void, +) void { + const pool: *std.Thread.Pool = @alignCast(@ptrCast(userdata)); + pool.mutex.lock(); + + const gpa = pool.allocator; + const n = DetachedClosure.contextEnd(context_alignment, context.len); + const closure: *DetachedClosure = @alignCast(@ptrCast(gpa.alignedAlloc(u8, @alignOf(DetachedClosure), n) catch { + pool.mutex.unlock(); + start(context.ptr); + return; + })); + closure.* = .{ + .pool = pool, + .func = start, + .context_alignment = context_alignment, + .context_len = context.len, + }; + @memcpy(closure.contextPointer()[0..context.len], context); + pool.run_queue.prepend(&closure.run_node); + + if (pool.threads.items.len < pool.threads.capacity) { + pool.threads.addOneAssumeCapacity().* = std.Thread.spawn(.{ + .stack_size = pool.stack_size, + .allocator = gpa, + }, worker, .{pool}) catch t: { + pool.threads.items.len -= 1; + break :t undefined; + }; + } + + pool.mutex.unlock(); + pool.cond.signal(); +} + fn @"await"( userdata: ?*anyopaque, any_future: *std.Io.AnyFuture,