diff --git a/lib/std/Io.zig b/lib/std/Io.zig index ff6966d7f7..eee4c55734 100644 --- a/lib/std/Io.zig +++ b/lib/std/Io.zig @@ -911,3 +911,72 @@ test { _ = @import("Io/stream_source.zig"); _ = @import("Io/test.zig"); } + +const Io = @This(); + +userdata: ?*anyopaque, +vtable: *const VTable, + +pub const VTable = struct { + /// If it returns `null` it means `result` has been already populated and + /// `await` will be a no-op. + async: *const fn ( + /// Corresponds to `Io.userdata`. + userdata: ?*anyopaque, + /// The pointer of this slice is an "eager" result value. + /// The length is the size in bytes of the result type. + eager_result: []u8, + /// Passed to `start`. + context: ?*anyopaque, + start: *const fn (context: ?*anyopaque, result: *anyopaque) void, + ) ?*AnyFuture, + + /// This function is only called when `async` returns a non-null value. + await: *const fn ( + /// Corresponds to `Io.userdata`. + userdata: ?*anyopaque, + /// The same value that was returned from `async`. + any_future: *AnyFuture, + /// Points to a buffer where the result is written. + /// The length is equal to size in bytes of result type. + result: []u8, + ) void, +}; + +pub const AnyFuture = opaque {}; + +pub fn Future(Result: type) type { + return struct { + any_future: ?*AnyFuture, + result: Result, + + pub fn await(f: *@This(), io: Io) Result { + const any_future = f.any_future orelse return f.result; + io.vtable.await(io.userdata, any_future, @ptrCast((&f.result)[0..1])); + f.any_future = null; + return f.result; + } + }; +} + +/// `s` is a struct instance that contains a function like this: +/// ``` +/// struct { +/// pub fn start(s: S) Result { ... } +/// } +/// ``` +/// where `Result` is any type. +pub fn async(io: Io, s: anytype) Future(@typeInfo(@TypeOf(@TypeOf(s).start)).@"fn".return_type.?) { + const S = @TypeOf(s); + const Result = @typeInfo(@TypeOf(S.start)).@"fn".return_type.?; + const TypeErased = struct { + fn start(context: ?*anyopaque, result: *anyopaque) void { + const context_casted: *const S = @alignCast(@ptrCast(context)); + const result_casted: *Result = @ptrCast(@alignCast(result)); + result_casted.* = S.start(context_casted.*); + } + }; + var future: Future(Result) = undefined; + future.any_future = io.vtable.async(io.userdata, @ptrCast((&future.result)[0..1]), @constCast(&s), TypeErased.start); + return future; +} diff --git a/lib/std/Thread/Pool.zig b/lib/std/Thread/Pool.zig index e836665d70..86e8e87056 100644 --- a/lib/std/Thread/Pool.zig +++ b/lib/std/Thread/Pool.zig @@ -7,6 +7,7 @@ mutex: std.Thread.Mutex = .{}, cond: std.Thread.Condition = .{}, run_queue: std.SinglyLinkedList = .{}, is_running: bool = true, +/// Must be a thread-safe allocator. allocator: std.mem.Allocator, threads: if (builtin.single_threaded) [0]std.Thread else []std.Thread, ids: if (builtin.single_threaded) struct { @@ -16,12 +17,12 @@ ids: if (builtin.single_threaded) struct { } } else std.AutoArrayHashMapUnmanaged(std.Thread.Id, void), -const Runnable = struct { +pub const Runnable = struct { runFn: RunProto, node: std.SinglyLinkedList.Node = .{}, }; -const RunProto = *const fn (*Runnable, id: ?usize) void; +pub const RunProto = *const fn (*Runnable, id: ?usize) void; pub const Options = struct { allocator: std.mem.Allocator, @@ -117,12 +118,6 @@ pub fn spawnWg(pool: *Pool, wait_group: *WaitGroup, comptime func: anytype, args const closure: *@This() = @alignCast(@fieldParentPtr("runnable", runnable)); @call(.auto, func, closure.arguments); closure.wait_group.finish(); - - // The thread pool's allocator is protected by the mutex. - const mutex = &closure.pool.mutex; - mutex.lock(); - defer mutex.unlock(); - closure.pool.allocator.destroy(closure); } }; @@ -179,12 +174,6 @@ pub fn spawnWgId(pool: *Pool, wait_group: *WaitGroup, comptime func: anytype, ar const closure: *@This() = @alignCast(@fieldParentPtr("runnable", runnable)); @call(.auto, func, .{id.?} ++ closure.arguments); closure.wait_group.finish(); - - // The thread pool's allocator is protected by the mutex. - const mutex = &closure.pool.mutex; - mutex.lock(); - defer mutex.unlock(); - closure.pool.allocator.destroy(closure); } }; @@ -228,12 +217,6 @@ pub fn spawn(pool: *Pool, comptime func: anytype, args: anytype) !void { fn runFn(runnable: *Runnable, _: ?usize) void { const closure: *@This() = @alignCast(@fieldParentPtr("runnable", runnable)); @call(.auto, func, closure.arguments); - - // The thread pool's allocator is protected by the mutex. - const mutex = &closure.pool.mutex; - mutex.lock(); - defer mutex.unlock(); - closure.pool.allocator.destroy(closure); } };