diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig index 11f9b149ca..efd994bc42 100644 --- a/lib/std/Io/Threaded.zig +++ b/lib/std/Io/Threaded.zig @@ -24,8 +24,10 @@ run_queue: std.SinglyLinkedList = .{}, join_requested: bool = false, threads: std.ArrayListUnmanaged(std.Thread), stack_size: usize, -cpu_count: std.Thread.CpuCountError!usize, -concurrent_count: usize, +cpu_count: usize, // 0 means no limit +concurrency_limit: usize, // 0 means no limit +available_thread_count: usize = 0, +one_shot_thread_count: usize = 0, wsa: if (is_windows) Wsa else struct {} = .{}, @@ -70,8 +72,6 @@ const Closure = struct { start: Start, node: std.SinglyLinkedList.Node = .{}, cancel_tid: CancelId, - /// Whether this task bumps minimum number of threads in the pool. - is_concurrent: bool, const Start = *const fn (*Closure) void; @@ -103,20 +103,20 @@ pub fn init( /// here. gpa: Allocator, ) Threaded { + assert(!builtin.single_threaded); // use 'init_single_threaded' instead + var t: Threaded = .{ .allocator = gpa, .threads = .empty, .stack_size = std.Thread.SpawnConfig.default_stack_size, - .cpu_count = std.Thread.getCpuCount(), - .concurrent_count = 0, + .cpu_count = std.Thread.getCpuCount() catch 0, + .concurrency_limit = 0, .old_sig_io = undefined, .old_sig_pipe = undefined, .have_signal_handler = false, }; - if (t.cpu_count) |n| { - t.threads.ensureTotalCapacityPrecise(gpa, n - 1) catch {}; - } else |_| {} + t.threads.ensureTotalCapacity(gpa, t.cpu_count) catch {}; if (posix.Sigaction != void) { // This causes sending `posix.SIG.IO` to thread to interrupt blocking @@ -145,7 +145,7 @@ pub const init_single_threaded: Threaded = .{ .threads = .empty, .stack_size = std.Thread.SpawnConfig.default_stack_size, .cpu_count = 1, - .concurrent_count = 0, + .concurrency_limit = 0, .old_sig_io = undefined, .old_sig_pipe = undefined, .have_signal_handler = false, @@ -184,18 +184,22 @@ fn worker(t: *Threaded) void { while (t.run_queue.popFirst()) |closure_node| { t.mutex.unlock(); const closure: *Closure = @fieldParentPtr("node", closure_node); - const is_concurrent = closure.is_concurrent; closure.start(closure); t.mutex.lock(); - if (is_concurrent) { - t.concurrent_count -= 1; - } + t.available_thread_count += 1; } if (t.join_requested) break; t.cond.wait(&t.mutex); } } +fn oneShotWorker(t: *Threaded, closure: *Closure) void { + closure.start(closure); + t.mutex.lock(); + defer t.mutex.unlock(); + t.one_shot_thread_count -= 1; +} + pub fn io(t: *Threaded) Io { return .{ .userdata = t, @@ -448,22 +452,22 @@ fn async( context_alignment: std.mem.Alignment, start: *const fn (context: *const anyopaque, result: *anyopaque) void, ) ?*Io.AnyFuture { - if (builtin.single_threaded) { + const t: *Threaded = @ptrCast(@alignCast(userdata)); + if (t.cpu_count == 1) { start(context.ptr, result.ptr); return null; } - const t: *Threaded = @ptrCast(@alignCast(userdata)); - const cpu_count = t.cpu_count catch { - return concurrent(userdata, result.len, result_alignment, context, context_alignment, start) catch { - start(context.ptr, result.ptr); - return null; - }; - }; + const gpa = t.allocator; const context_offset = context_alignment.forward(@sizeOf(AsyncClosure)); const result_offset = result_alignment.forward(context_offset + context.len); const n = result_offset + result.len; - const ac: *AsyncClosure = @ptrCast(@alignCast(gpa.alignedAlloc(u8, .of(AsyncClosure), n) catch { + + const ac: *AsyncClosure = @ptrCast(@alignCast(gpa.alignedAlloc( + u8, + .of(AsyncClosure), + n, + ) catch { start(context.ptr, result.ptr); return null; })); @@ -472,7 +476,6 @@ fn async( .closure = .{ .cancel_tid = .none, .start = AsyncClosure.start, - .is_concurrent = false, }, .func = start, .context_alignment = context_alignment, @@ -485,34 +488,38 @@ fn async( t.mutex.lock(); - const thread_capacity = cpu_count - 1 + t.concurrent_count; - - t.threads.ensureTotalCapacityPrecise(gpa, thread_capacity) catch { - t.mutex.unlock(); - ac.free(gpa, result.len); - start(context.ptr, result.ptr); - return null; - }; - - t.run_queue.prepend(&ac.closure.node); - - if (t.threads.items.len < thread_capacity) { - const thread = std.Thread.spawn(.{ .stack_size = t.stack_size }, worker, .{t}) catch { - if (t.threads.items.len == 0) { - assert(t.run_queue.popFirst() == &ac.closure.node); - t.mutex.unlock(); - ac.free(gpa, result.len); - start(context.ptr, result.ptr); - return null; - } - // Rely on other workers to do it. + if (t.available_thread_count == 0) { + if (t.cpu_count != 0 and t.threads.items.len >= t.cpu_count) { t.mutex.unlock(); - t.cond.signal(); - return @ptrCast(ac); + ac.free(gpa, result.len); + start(context.ptr, result.ptr); + return null; + } + + t.threads.ensureUnusedCapacity(gpa, 1) catch { + t.mutex.unlock(); + ac.free(gpa, result.len); + start(context.ptr, result.ptr); + return null; }; + + const thread = std.Thread.spawn( + .{ .stack_size = t.stack_size }, + worker, + .{t}, + ) catch { + t.mutex.unlock(); + ac.free(gpa, result.len); + start(context.ptr, result.ptr); + return null; + }; + t.threads.appendAssumeCapacity(thread); + } else { + t.available_thread_count -= 1; } + t.run_queue.prepend(&ac.closure.node); t.mutex.unlock(); t.cond.signal(); return @ptrCast(ac); @@ -529,20 +536,21 @@ fn concurrent( if (builtin.single_threaded) return error.ConcurrencyUnavailable; const t: *Threaded = @ptrCast(@alignCast(userdata)); - const cpu_count = t.cpu_count catch 1; const gpa = t.allocator; const context_offset = context_alignment.forward(@sizeOf(AsyncClosure)); const result_offset = result_alignment.forward(context_offset + context.len); const n = result_offset + result_len; - const ac_bytes = gpa.alignedAlloc(u8, .of(AsyncClosure), n) catch - return error.ConcurrencyUnavailable; - const ac: *AsyncClosure = @ptrCast(@alignCast(ac_bytes)); + const ac_bytes = gpa.alignedAlloc(u8, .of(AsyncClosure), n) catch { + return error.ConcurrencyUnavailable; + }; + errdefer gpa.free(ac_bytes); + + const ac: *AsyncClosure = @ptrCast(@alignCast(ac_bytes)); ac.* = .{ .closure = .{ .cancel_tid = .none, .start = AsyncClosure.start, - .is_concurrent = true, }, .func = start, .context_alignment = context_alignment, @@ -550,33 +558,52 @@ fn concurrent( .reset_event = .unset, .select_condition = null, }; + @memcpy(ac.contextPointer()[0..context.len], context); t.mutex.lock(); + defer t.mutex.unlock(); - t.concurrent_count += 1; - const thread_capacity = cpu_count - 1 + t.concurrent_count; - - t.threads.ensureTotalCapacity(gpa, thread_capacity) catch { - t.mutex.unlock(); - ac.free(gpa, result_len); - return error.ConcurrencyUnavailable; - }; - - t.run_queue.prepend(&ac.closure.node); - - if (t.threads.items.len < thread_capacity) { - const thread = std.Thread.spawn(.{ .stack_size = t.stack_size }, worker, .{t}) catch { - assert(t.run_queue.popFirst() == &ac.closure.node); - t.mutex.unlock(); - ac.free(gpa, result_len); - return error.ConcurrencyUnavailable; - }; - t.threads.appendAssumeCapacity(thread); + // If there's an avilable thread, use it. + if (t.available_thread_count > 0) { + t.available_thread_count -= 1; + t.run_queue.prepend(&ac.closure.node); + t.cond.signal(); + return @ptrCast(ac); } - t.mutex.unlock(); - t.cond.signal(); + // If we can spawn a normal worker, spawn it and use it. + if (t.cpu_count == 0 or t.threads.items.len < t.cpu_count) { + t.threads.ensureUnusedCapacity(gpa, 1) catch return error.ConcurrencyUnavailable; + + const thread = std.Thread.spawn( + .{ .stack_size = t.stack_size }, + worker, + .{t}, + ) catch return error.ConcurrencyUnavailable; + + t.threads.appendAssumeCapacity(thread); + t.run_queue.prepend(&ac.closure.node); + t.cond.signal(); + return @ptrCast(ac); + } + + // If we have a concurrencty limit and we havent' hit it yet, + // spawn a new one-shot thread. + if (t.concurrency_limit != 0 and t.one_shot_thread_count >= t.concurrency_limit) { + return error.ConcurrencyUnavailable; + } + + t.one_shot_thread_count += 1; + errdefer t.one_shot_thread_count -= 1; + + const thread = std.Thread.spawn( + .{ .stack_size = t.stack_size }, + oneShotWorker, + .{ t, &ac.closure }, + ) catch return error.ConcurrencyUnavailable; + thread.detach(); + return @ptrCast(ac); } @@ -647,7 +674,6 @@ fn groupAsync( ) void { if (builtin.single_threaded) return start(group, context.ptr); const t: *Threaded = @ptrCast(@alignCast(userdata)); - const cpu_count = t.cpu_count catch 1; const gpa = t.allocator; const n = GroupClosure.contextEnd(context_alignment, context.len); const gc: *GroupClosure = @ptrCast(@alignCast(gpa.alignedAlloc(u8, .of(GroupClosure), n) catch { @@ -657,7 +683,6 @@ fn groupAsync( .closure = .{ .cancel_tid = .none, .start = GroupClosure.start, - .is_concurrent = false, }, .t = t, .group = group, @@ -674,26 +699,36 @@ fn groupAsync( gc.node = .{ .next = @ptrCast(@alignCast(group.token)) }; group.token = &gc.node; - const thread_capacity = cpu_count - 1 + t.concurrent_count; + if (t.available_thread_count == 0) { + if (t.cpu_count != 0 and t.threads.items.len >= t.cpu_count) { + t.mutex.unlock(); + gc.free(gpa); + return start(group, context.ptr); + } - t.threads.ensureTotalCapacityPrecise(gpa, thread_capacity) catch { - t.mutex.unlock(); - gc.free(gpa); - return start(group, context.ptr); - }; - - t.run_queue.prepend(&gc.closure.node); - - if (t.threads.items.len < thread_capacity) { - const thread = std.Thread.spawn(.{ .stack_size = t.stack_size }, worker, .{t}) catch { - assert(t.run_queue.popFirst() == &gc.closure.node); + t.threads.ensureUnusedCapacity(gpa, 1) catch { t.mutex.unlock(); gc.free(gpa); return start(group, context.ptr); }; + + const thread = std.Thread.spawn( + .{ .stack_size = t.stack_size }, + worker, + .{t}, + ) catch { + t.mutex.unlock(); + gc.free(gpa); + return start(group, context.ptr); + }; + t.threads.appendAssumeCapacity(thread); + } else { + t.available_thread_count -= 1; } + t.run_queue.prepend(&gc.closure.node); + // This needs to be done before unlocking the mutex to avoid a race with // the associated task finishing. const group_state: *std.atomic.Value(usize) = @ptrCast(&group.state);