diff --git a/lib/std/Io.zig b/lib/std/Io.zig index d89f2d23af..45736d39c4 100644 --- a/lib/std/Io.zig +++ b/lib/std/Io.zig @@ -552,6 +552,8 @@ test { _ = Reader; _ = Writer; _ = tty; + _ = Evented; + _ = Threaded; _ = @import("Io/test.zig"); } @@ -596,7 +598,7 @@ pub const VTable = struct { context: []const u8, context_alignment: std.mem.Alignment, start: *const fn (context: *const anyopaque, result: *anyopaque) void, - ) error{OutOfMemory}!*AnyFuture, + ) ConcurrentError!*AnyFuture, /// This function is only called when `async` returns a non-null value. /// /// Thread-safe. @@ -1557,6 +1559,12 @@ pub fn async( return future; } +pub const ConcurrentError = error{ + /// May occur due to a temporary condition such as resource exhaustion, or + /// to the Io implementation not supporting concurrency. + ConcurrencyUnavailable, +}; + /// Calls `function` with `args`, such that the return value of the function is /// not guaranteed to be available until `await` is called, allowing the caller /// to progress while waiting for any `Io` operations. @@ -1568,7 +1576,7 @@ pub fn concurrent( io: Io, function: anytype, args: std.meta.ArgsTuple(@TypeOf(function)), -) error{OutOfMemory}!Future(@typeInfo(@TypeOf(function)).@"fn".return_type.?) { +) ConcurrentError!Future(@typeInfo(@TypeOf(function)).@"fn".return_type.?) { const Result = @typeInfo(@TypeOf(function)).@"fn".return_type.?; const Args = @TypeOf(args); const TypeErased = struct { diff --git a/lib/std/Io/IoUring.zig b/lib/std/Io/IoUring.zig index 9ec1dafb31..5561cdebd2 100644 --- a/lib/std/Io/IoUring.zig +++ b/lib/std/Io/IoUring.zig @@ -866,7 +866,7 @@ fn concurrent( context: []const u8, context_alignment: Alignment, start: *const fn (context: *const anyopaque, result: *anyopaque) void, -) error{OutOfMemory}!*std.Io.AnyFuture { +) Io.ConcurrentError!*std.Io.AnyFuture { assert(result_alignment.compare(.lte, Fiber.max_result_align)); // TODO assert(context_alignment.compare(.lte, Fiber.max_context_align)); // TODO assert(result_len <= Fiber.max_result_size); // TODO diff --git a/lib/std/Io/Kqueue.zig b/lib/std/Io/Kqueue.zig index b41a0260e0..fd5baaddde 100644 --- a/lib/std/Io/Kqueue.zig +++ b/lib/std/Io/Kqueue.zig @@ -933,14 +933,14 @@ fn concurrent( context: []const u8, context_alignment: Alignment, start: *const fn (context: *const anyopaque, result: *anyopaque) void, -) error{OutOfMemory}!*Io.AnyFuture { +) Io.ConcurrentError!*Io.AnyFuture { const k: *Kqueue = @ptrCast(@alignCast(userdata)); assert(result_alignment.compare(.lte, Fiber.max_result_align)); // TODO assert(context_alignment.compare(.lte, Fiber.max_context_align)); // TODO assert(result_len <= Fiber.max_result_size); // TODO assert(context.len <= Fiber.max_context_size); // TODO - const fiber = try Fiber.allocate(k); + const fiber = Fiber.allocate(k) catch return error.ConcurrencyUnavailable; std.log.debug("allocated {*}", .{fiber}); const closure: *AsyncClosure = .fromFiber(fiber); diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig index bbbb6a03eb..da8b7fb211 100644 --- a/lib/std/Io/Threaded.zig +++ b/lib/std/Io/Threaded.zig @@ -91,9 +91,9 @@ pub fn init( return t; } -/// Statically initialize such that any call to the following functions will -/// fail with `error.OutOfMemory`: -/// * `Io.VTable.concurrent` +/// Statically initialize such that calls to `Io.VTable.concurrent` will fail +/// with `error.ConcurrencyUnavailable`. +/// /// When initialized this way, `deinit` is safe, but unnecessary to call. pub const init_single_threaded: Threaded = .{ .allocator = .failing, @@ -481,8 +481,8 @@ fn concurrent( context: []const u8, context_alignment: std.mem.Alignment, start: *const fn (context: *const anyopaque, result: *anyopaque) void, -) error{OutOfMemory}!*Io.AnyFuture { - if (builtin.single_threaded) unreachable; +) Io.ConcurrentError!*Io.AnyFuture { + if (builtin.single_threaded) return error.ConcurrencyUnavailable; const t: *Threaded = @ptrCast(@alignCast(userdata)); const cpu_count = t.cpu_count catch 1; @@ -490,7 +490,9 @@ fn concurrent( const context_offset = context_alignment.forward(@sizeOf(AsyncClosure)); const result_offset = result_alignment.forward(context_offset + context.len); const n = result_offset + result_len; - const ac: *AsyncClosure = @ptrCast(@alignCast(try gpa.alignedAlloc(u8, .of(AsyncClosure), n))); + const ac_bytes = gpa.alignedAlloc(u8, .of(AsyncClosure), n) catch + return error.ConcurrencyUnavailable; + const ac: *AsyncClosure = @ptrCast(@alignCast(ac_bytes)); ac.* = .{ .closure = .{ @@ -515,7 +517,7 @@ fn concurrent( t.threads.ensureTotalCapacity(gpa, thread_capacity) catch { t.mutex.unlock(); ac.free(gpa, result_len); - return error.OutOfMemory; + return error.ConcurrencyUnavailable; }; t.run_queue.prepend(&ac.closure.node); @@ -525,7 +527,7 @@ fn concurrent( assert(t.run_queue.popFirst() == &ac.closure.node); t.mutex.unlock(); ac.free(gpa, result_len); - return error.OutOfMemory; + return error.ConcurrencyUnavailable; }; t.threads.appendAssumeCapacity(thread); }