std.Io: adjust concurrent error set

Now std.Io.Threaded can return error.ConcurrencyUnavailable rather than
asserting. This is handy for logic that wants to try a concurrent
implementation but then fall back to a synchronous one.
This commit is contained in:
Andrew Kelley 2025-10-23 20:55:46 -07:00
parent ecdc00466c
commit ae86c0f529
4 changed files with 23 additions and 13 deletions

View File

@ -552,6 +552,8 @@ test {
_ = Reader; _ = Reader;
_ = Writer; _ = Writer;
_ = tty; _ = tty;
_ = Evented;
_ = Threaded;
_ = @import("Io/test.zig"); _ = @import("Io/test.zig");
} }
@ -596,7 +598,7 @@ pub const VTable = struct {
context: []const u8, context: []const u8,
context_alignment: std.mem.Alignment, context_alignment: std.mem.Alignment,
start: *const fn (context: *const anyopaque, result: *anyopaque) void, start: *const fn (context: *const anyopaque, result: *anyopaque) void,
) error{OutOfMemory}!*AnyFuture, ) ConcurrentError!*AnyFuture,
/// This function is only called when `async` returns a non-null value. /// This function is only called when `async` returns a non-null value.
/// ///
/// Thread-safe. /// Thread-safe.
@ -1557,6 +1559,12 @@ pub fn async(
return future; return future;
} }
pub const ConcurrentError = error{
/// May occur due to a temporary condition such as resource exhaustion, or
/// to the Io implementation not supporting concurrency.
ConcurrencyUnavailable,
};
/// Calls `function` with `args`, such that the return value of the function is /// Calls `function` with `args`, such that the return value of the function is
/// not guaranteed to be available until `await` is called, allowing the caller /// not guaranteed to be available until `await` is called, allowing the caller
/// to progress while waiting for any `Io` operations. /// to progress while waiting for any `Io` operations.
@ -1568,7 +1576,7 @@ pub fn concurrent(
io: Io, io: Io,
function: anytype, function: anytype,
args: std.meta.ArgsTuple(@TypeOf(function)), args: std.meta.ArgsTuple(@TypeOf(function)),
) error{OutOfMemory}!Future(@typeInfo(@TypeOf(function)).@"fn".return_type.?) { ) ConcurrentError!Future(@typeInfo(@TypeOf(function)).@"fn".return_type.?) {
const Result = @typeInfo(@TypeOf(function)).@"fn".return_type.?; const Result = @typeInfo(@TypeOf(function)).@"fn".return_type.?;
const Args = @TypeOf(args); const Args = @TypeOf(args);
const TypeErased = struct { const TypeErased = struct {

View File

@ -866,7 +866,7 @@ fn concurrent(
context: []const u8, context: []const u8,
context_alignment: Alignment, context_alignment: Alignment,
start: *const fn (context: *const anyopaque, result: *anyopaque) void, start: *const fn (context: *const anyopaque, result: *anyopaque) void,
) error{OutOfMemory}!*std.Io.AnyFuture { ) Io.ConcurrentError!*std.Io.AnyFuture {
assert(result_alignment.compare(.lte, Fiber.max_result_align)); // TODO assert(result_alignment.compare(.lte, Fiber.max_result_align)); // TODO
assert(context_alignment.compare(.lte, Fiber.max_context_align)); // TODO assert(context_alignment.compare(.lte, Fiber.max_context_align)); // TODO
assert(result_len <= Fiber.max_result_size); // TODO assert(result_len <= Fiber.max_result_size); // TODO

View File

@ -933,14 +933,14 @@ fn concurrent(
context: []const u8, context: []const u8,
context_alignment: Alignment, context_alignment: Alignment,
start: *const fn (context: *const anyopaque, result: *anyopaque) void, start: *const fn (context: *const anyopaque, result: *anyopaque) void,
) error{OutOfMemory}!*Io.AnyFuture { ) Io.ConcurrentError!*Io.AnyFuture {
const k: *Kqueue = @ptrCast(@alignCast(userdata)); const k: *Kqueue = @ptrCast(@alignCast(userdata));
assert(result_alignment.compare(.lte, Fiber.max_result_align)); // TODO assert(result_alignment.compare(.lte, Fiber.max_result_align)); // TODO
assert(context_alignment.compare(.lte, Fiber.max_context_align)); // TODO assert(context_alignment.compare(.lte, Fiber.max_context_align)); // TODO
assert(result_len <= Fiber.max_result_size); // TODO assert(result_len <= Fiber.max_result_size); // TODO
assert(context.len <= Fiber.max_context_size); // TODO assert(context.len <= Fiber.max_context_size); // TODO
const fiber = try Fiber.allocate(k); const fiber = Fiber.allocate(k) catch return error.ConcurrencyUnavailable;
std.log.debug("allocated {*}", .{fiber}); std.log.debug("allocated {*}", .{fiber});
const closure: *AsyncClosure = .fromFiber(fiber); const closure: *AsyncClosure = .fromFiber(fiber);

View File

@ -91,9 +91,9 @@ pub fn init(
return t; return t;
} }
/// Statically initialize such that any call to the following functions will /// Statically initialize such that calls to `Io.VTable.concurrent` will fail
/// fail with `error.OutOfMemory`: /// with `error.ConcurrencyUnavailable`.
/// * `Io.VTable.concurrent` ///
/// When initialized this way, `deinit` is safe, but unnecessary to call. /// When initialized this way, `deinit` is safe, but unnecessary to call.
pub const init_single_threaded: Threaded = .{ pub const init_single_threaded: Threaded = .{
.allocator = .failing, .allocator = .failing,
@ -481,8 +481,8 @@ fn concurrent(
context: []const u8, context: []const u8,
context_alignment: std.mem.Alignment, context_alignment: std.mem.Alignment,
start: *const fn (context: *const anyopaque, result: *anyopaque) void, start: *const fn (context: *const anyopaque, result: *anyopaque) void,
) error{OutOfMemory}!*Io.AnyFuture { ) Io.ConcurrentError!*Io.AnyFuture {
if (builtin.single_threaded) unreachable; if (builtin.single_threaded) return error.ConcurrencyUnavailable;
const t: *Threaded = @ptrCast(@alignCast(userdata)); const t: *Threaded = @ptrCast(@alignCast(userdata));
const cpu_count = t.cpu_count catch 1; const cpu_count = t.cpu_count catch 1;
@ -490,7 +490,9 @@ fn concurrent(
const context_offset = context_alignment.forward(@sizeOf(AsyncClosure)); const context_offset = context_alignment.forward(@sizeOf(AsyncClosure));
const result_offset = result_alignment.forward(context_offset + context.len); const result_offset = result_alignment.forward(context_offset + context.len);
const n = result_offset + result_len; const n = result_offset + result_len;
const ac: *AsyncClosure = @ptrCast(@alignCast(try gpa.alignedAlloc(u8, .of(AsyncClosure), n))); const ac_bytes = gpa.alignedAlloc(u8, .of(AsyncClosure), n) catch
return error.ConcurrencyUnavailable;
const ac: *AsyncClosure = @ptrCast(@alignCast(ac_bytes));
ac.* = .{ ac.* = .{
.closure = .{ .closure = .{
@ -515,7 +517,7 @@ fn concurrent(
t.threads.ensureTotalCapacity(gpa, thread_capacity) catch { t.threads.ensureTotalCapacity(gpa, thread_capacity) catch {
t.mutex.unlock(); t.mutex.unlock();
ac.free(gpa, result_len); ac.free(gpa, result_len);
return error.OutOfMemory; return error.ConcurrencyUnavailable;
}; };
t.run_queue.prepend(&ac.closure.node); t.run_queue.prepend(&ac.closure.node);
@ -525,7 +527,7 @@ fn concurrent(
assert(t.run_queue.popFirst() == &ac.closure.node); assert(t.run_queue.popFirst() == &ac.closure.node);
t.mutex.unlock(); t.mutex.unlock();
ac.free(gpa, result_len); ac.free(gpa, result_len);
return error.OutOfMemory; return error.ConcurrencyUnavailable;
}; };
t.threads.appendAssumeCapacity(thread); t.threads.appendAssumeCapacity(thread);
} }