From 8eaebf5939491b005e392698ec6e890ebaf0f86b Mon Sep 17 00:00:00 2001
From: Loris Cro
Date: Tue, 4 Nov 2025 21:11:40 +0100
Subject: [PATCH] Io.Threaded PoC reimplementation

This is a reimplementation of Io.Threaded that fixes the issues
highlighted in the recent Zulip discussion. It's poorly tested, but it
does successfully run to completion the litmus test example that I
offered in the discussion.

This implementation has the following key design decisions:

- `t.cpu_count` is used as the threadpool size.
- `t.concurrency_limit` is used as the maximum number of "burst,
  one-shot" threads that can be spawned by `io.concurrent` past
  `t.cpu_count`.
- `t.available_thread_count` is the number of threads in the pool that
  are not currently busy with work (the bookkeeping happens in the
  worker function).
- `t.one_shot_thread_count` is the number of active threads that were
  spawned by `io.concurrent` past `t.cpu_count`.

In this implementation:

- `io.async` first tries to decrement `t.available_thread_count`. If
  there are no threads available, it tries to spawn a new one if
  possible; otherwise it runs the task immediately.
- `io.concurrent` first tries to use a thread in the pool, same as
  `io.async`, but on failure (no available threads and pool size limit
  reached) it tries to spawn a new one-shot thread.

One-shot threads run a different main function that just executes one
task, decrements the number of active one-shot threads, and then exits.

A relevant future improvement would be to have one-shot threads stay
alive for a few seconds (and potentially pick up a new task) to
amortize spawning costs.
---
 lib/std/Io/Threaded.zig | 198 +++++++++++++++++++++-------------
 1 file changed, 105 insertions(+), 93 deletions(-)

diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig
index 8dd9ae13fc..b6ba5f9a82 100644
--- a/lib/std/Io/Threaded.zig
+++ b/lib/std/Io/Threaded.zig
@@ -24,8 +24,10 @@ run_queue: std.SinglyLinkedList = .{},
 join_requested: bool = false,
 threads: std.ArrayList(std.Thread),
 stack_size: usize,
-cpu_count: std.Thread.CpuCountError!usize,
-concurrent_count: usize,
+cpu_count: usize, // 0 means no limit
+concurrency_limit: usize, // 0 means no limit
+available_thread_count: usize = 0,
+one_shot_thread_count: usize = 0,
 
 wsa: if (is_windows) Wsa else struct {} = .{},
 
@@ -70,8 +72,6 @@ const Closure = struct {
     start: Start,
     node: std.SinglyLinkedList.Node = .{},
     cancel_tid: CancelId,
-    /// Whether this task bumps minimum number of threads in the pool.
-    is_concurrent: bool,
 
     const Start = *const fn (*Closure) void;
 
@@ -103,20 +103,20 @@ pub fn init(
     /// here.
     gpa: Allocator,
 ) Threaded {
+    assert(!builtin.single_threaded); // use 'init_single_threaded' instead
+
     var t: Threaded = .{
         .allocator = gpa,
         .threads = .empty,
         .stack_size = std.Thread.SpawnConfig.default_stack_size,
-        .cpu_count = std.Thread.getCpuCount(),
-        .concurrent_count = 0,
+        .cpu_count = std.Thread.getCpuCount() catch 0,
+        .concurrency_limit = 0,
         .old_sig_io = undefined,
         .old_sig_pipe = undefined,
         .have_signal_handler = false,
     };
 
-    if (t.cpu_count) |n| {
-        t.threads.ensureTotalCapacityPrecise(gpa, n - 1) catch {};
-    } else |_| {}
+    t.threads.ensureTotalCapacity(gpa, t.cpu_count) catch {};
 
     if (posix.Sigaction != void) {
         // This causes sending `posix.SIG.IO` to thread to interrupt blocking
@@ -145,7 +145,7 @@ pub const init_single_threaded: Threaded = .{
     .threads = .empty,
     .stack_size = std.Thread.SpawnConfig.default_stack_size,
     .cpu_count = 1,
-    .concurrent_count = 0,
+    .concurrency_limit = 0,
     .old_sig_io = undefined,
     .old_sig_pipe = undefined,
     .have_signal_handler = false,
@@ -184,18 +184,22 @@ fn worker(t: *Threaded) void {
         while (t.run_queue.popFirst()) |closure_node| {
             t.mutex.unlock();
             const closure: *Closure = @fieldParentPtr("node", closure_node);
-            const is_concurrent = closure.is_concurrent;
             closure.start(closure);
             t.mutex.lock();
-            if (is_concurrent) {
-                t.concurrent_count -= 1;
-            }
+            t.available_thread_count += 1;
         }
         if (t.join_requested) break;
         t.cond.wait(&t.mutex);
     }
 }
 
+fn oneShotWorker(t: *Threaded, closure: *Closure) void {
+    closure.start(closure);
+    t.mutex.lock();
+    defer t.mutex.unlock();
+    t.one_shot_thread_count -= 1;
+}
+
 pub fn io(t: *Threaded) Io {
     return .{
         .userdata = t,
@@ -432,7 +436,6 @@ const AsyncClosure = struct {
 
     fn init(
         gpa: Allocator,
-        mode: enum { async, concurrent },
         result_len: usize,
         result_alignment: std.mem.Alignment,
         context: []const u8,
@@ -454,10 +457,6 @@
             .closure = .{
                 .cancel_tid = .none,
                 .start = start,
-                .is_concurrent = switch (mode) {
-                    .async => false,
-                    .concurrent => true,
-                },
             },
             .func = func,
             .context_alignment = context_alignment,
@@ -490,55 +489,51 @@ fn async(
     context_alignment: std.mem.Alignment,
     start: *const fn (context: *const anyopaque, result: *anyopaque) void,
 ) ?*Io.AnyFuture {
-    if (builtin.single_threaded) {
+    const t: *Threaded = @ptrCast(@alignCast(userdata));
+    if (t.cpu_count == 1) {
         start(context.ptr, result.ptr);
         return null;
     }
-
-    const t: *Threaded = @ptrCast(@alignCast(userdata));
-    const cpu_count = t.cpu_count catch {
-        return concurrent(userdata, result.len, result_alignment, context, context_alignment, start) catch {
-            start(context.ptr, result.ptr);
-            return null;
-        };
-    };
-
     const gpa = t.allocator;
-    const ac = AsyncClosure.init(gpa, .async, result.len, result_alignment, context, context_alignment, start) catch {
+    const ac = AsyncClosure.init(gpa, result.len, result_alignment, context, context_alignment, start) catch {
         start(context.ptr, result.ptr);
         return null;
    };
 
     t.mutex.lock();
 
-    const thread_capacity = cpu_count - 1 + t.concurrent_count;
-
-    t.threads.ensureTotalCapacityPrecise(gpa, thread_capacity) catch {
-        t.mutex.unlock();
-        ac.deinit(gpa);
-        start(context.ptr, result.ptr);
-        return null;
-    };
-
-    t.run_queue.prepend(&ac.closure.node);
-
-    if (t.threads.items.len < thread_capacity) {
-        const thread = std.Thread.spawn(.{ .stack_size = t.stack_size }, worker, .{t}) catch {
-            if (t.threads.items.len == 0) {
-                assert(t.run_queue.popFirst() == &ac.closure.node);
-                t.mutex.unlock();
-                ac.deinit(gpa);
-                start(context.ptr, result.ptr);
-                return null;
-            }
-            // Rely on other workers to do it.
+    if (t.available_thread_count == 0) {
+        if (t.cpu_count != 0 and t.threads.items.len >= t.cpu_count) {
             t.mutex.unlock();
-            t.cond.signal();
-            return @ptrCast(ac);
+            ac.deinit(gpa);
+            start(context.ptr, result.ptr);
+            return null;
+        }
+
+        t.threads.ensureUnusedCapacity(gpa, 1) catch {
+            t.mutex.unlock();
+            ac.deinit(gpa);
+            start(context.ptr, result.ptr);
+            return null;
         };
+
+        const thread = std.Thread.spawn(
+            .{ .stack_size = t.stack_size },
+            worker,
+            .{t},
+        ) catch {
+            t.mutex.unlock();
+            ac.deinit(gpa);
+            start(context.ptr, result.ptr);
+            return null;
+        };
+        t.threads.appendAssumeCapacity(thread);
+    } else {
+        t.available_thread_count -= 1;
     }
 
+    t.run_queue.prepend(&ac.closure.node);
     t.mutex.unlock();
     t.cond.signal();
     return @ptrCast(ac);
@@ -555,38 +550,49 @@ fn concurrent(
     if (builtin.single_threaded) return error.ConcurrencyUnavailable;
 
     const t: *Threaded = @ptrCast(@alignCast(userdata));
-    const cpu_count = t.cpu_count catch 1;
-
     const gpa = t.allocator;
-    const ac = AsyncClosure.init(gpa, .concurrent, result_len, result_alignment, context, context_alignment, start) catch {
+    const ac = AsyncClosure.init(gpa, result_len, result_alignment, context, context_alignment, start) catch {
         return error.ConcurrencyUnavailable;
     };
+    errdefer ac.deinit(gpa);
 
     t.mutex.lock();
+    defer t.mutex.unlock();
 
-    t.concurrent_count += 1;
-    const thread_capacity = cpu_count - 1 + t.concurrent_count;
-
-    t.threads.ensureTotalCapacity(gpa, thread_capacity) catch {
-        t.mutex.unlock();
-        ac.deinit(gpa);
-        return error.ConcurrencyUnavailable;
-    };
-
-    t.run_queue.prepend(&ac.closure.node);
-
-    if (t.threads.items.len < thread_capacity) {
-        const thread = std.Thread.spawn(.{ .stack_size = t.stack_size }, worker, .{t}) catch {
-            assert(t.run_queue.popFirst() == &ac.closure.node);
-            t.mutex.unlock();
-            ac.deinit(gpa);
-            return error.ConcurrencyUnavailable;
-        };
-        t.threads.appendAssumeCapacity(thread);
+    // If there's an available thread, use it.
+    if (t.available_thread_count > 0) {
+        t.available_thread_count -= 1;
+        t.run_queue.prepend(&ac.closure.node);
+        t.cond.signal();
+        return @ptrCast(ac);
     }
 
-    t.mutex.unlock();
-    t.cond.signal();
+    // If we can spawn a normal worker, spawn it and use it.
+    if (t.cpu_count == 0 or t.threads.items.len < t.cpu_count) {
+        t.threads.ensureUnusedCapacity(gpa, 1) catch return error.ConcurrencyUnavailable;
+
+        const thread = std.Thread.spawn(.{ .stack_size = t.stack_size }, worker, .{t}) catch
+            return error.ConcurrencyUnavailable;
+
+        t.threads.appendAssumeCapacity(thread);
+        t.run_queue.prepend(&ac.closure.node);
+        t.cond.signal();
+        return @ptrCast(ac);
+    }
+
+    // If we have a concurrency limit and we've already hit it, give up;
+    // otherwise spawn a new one-shot thread.
+    if (t.concurrency_limit != 0 and t.one_shot_thread_count >= t.concurrency_limit)
+        return error.ConcurrencyUnavailable;
+
+    t.one_shot_thread_count += 1;
+    errdefer t.one_shot_thread_count -= 1;
+
+    const thread = std.Thread.spawn(.{ .stack_size = t.stack_size }, oneShotWorker, .{ t, &ac.closure }) catch
+        return error.ConcurrencyUnavailable;
+    thread.detach();
+
     return @ptrCast(ac);
 }
@@ -652,7 +658,6 @@ const GroupClosure = struct {
             .closure = .{
                 .cancel_tid = .none,
                 .start = start,
-                .is_concurrent = false,
             },
             .t = t,
             .group = group,
@@ -684,12 +689,9 @@ fn groupAsync(
     if (builtin.single_threaded) return start(group, context.ptr);
 
     const t: *Threaded = @ptrCast(@alignCast(userdata));
-    const cpu_count = t.cpu_count catch 1;
-
     const gpa = t.allocator;
-    const gc = GroupClosure.init(gpa, t, group, context, context_alignment, start) catch {
+    const gc = GroupClosure.init(gpa, t, group, context, context_alignment, start) catch
         return start(group, context.ptr);
-    };
 
     t.mutex.lock();
 
@@ -697,26 +699,36 @@
     gc.node = .{ .next = @ptrCast(@alignCast(group.token)) };
     group.token = &gc.node;
 
-    const thread_capacity = cpu_count - 1 + t.concurrent_count;
+    if (t.available_thread_count == 0) {
+        if (t.cpu_count != 0 and t.threads.items.len >= t.cpu_count) {
+            t.mutex.unlock();
+            gc.deinit(gpa);
+            return start(group, context.ptr);
+        }
 
-    t.threads.ensureTotalCapacityPrecise(gpa, thread_capacity) catch {
-        t.mutex.unlock();
-        gc.deinit(gpa);
-        return start(group, context.ptr);
-    };
-
-    t.run_queue.prepend(&gc.closure.node);
-
-    if (t.threads.items.len < thread_capacity) {
-        const thread = std.Thread.spawn(.{ .stack_size = t.stack_size }, worker, .{t}) catch {
-            assert(t.run_queue.popFirst() == &gc.closure.node);
+        t.threads.ensureUnusedCapacity(gpa, 1) catch {
             t.mutex.unlock();
             gc.deinit(gpa);
             return start(group, context.ptr);
         };
+
+        const thread = std.Thread.spawn(
+            .{ .stack_size = t.stack_size },
+            worker,
+            .{t},
+        ) catch {
+            t.mutex.unlock();
+            gc.deinit(gpa);
+            return start(group, context.ptr);
+        };
+
         t.threads.appendAssumeCapacity(thread);
+    } else {
+        t.available_thread_count -= 1;
     }
 
+    t.run_queue.prepend(&gc.closure.node);
+
     // This needs to be done before unlocking the mutex to avoid a race with
     // the associated task finishing.
     const group_state: *std.atomic.Value(usize) = @ptrCast(&group.state);
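
For reference, a minimal setup sketch in the shape of the litmus test
(hypothetical usage, not part of the patch: the `deinit` call is assumed
to exist unchanged from the current Io.Threaded, the `concurrency_limit`
value of 8 is illustrative, and since the user-facing `io.async` /
`io.concurrent` call signatures aren't shown in this diff, task spawning
is left as a comment):

    const std = @import("std");

    pub fn main() void {
        const gpa = std.heap.page_allocator;

        // Pool size defaults to the detected CPU count
        // (0 would mean "no limit" under this patch).
        var t: std.Io.Threaded = .init(gpa);
        defer t.deinit(); // assumed teardown, untouched by this patch

        // Allow at most 8 one-shot "burst" threads for io.concurrent
        // past t.cpu_count (0 means no limit). Illustrative value.
        t.concurrency_limit = 8;

        const io = t.io();
        _ = io; // spawn tasks via io.async / io.concurrent here
    }

With this configuration, io.async falls back to running tasks inline
once all pool threads are busy and the pool is at t.cpu_count, while
io.concurrent can overflow into up to 8 one-shot threads before failing
with error.ConcurrencyUnavailable.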