diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig index a7ab4f2f91..9aef333e8a 100644 --- a/lib/std/Io/Threaded.zig +++ b/lib/std/Io/Threaded.zig @@ -36,30 +36,47 @@ comptime { if (@TypeOf(posix.IOV_MAX) != void) assert(max_iovecs_len <= posix.IOV_MAX); } +const CancelId = enum(usize) { + none = 0, + canceling = std.math.maxInt(usize), + _, + + const ThreadId = if (std.Thread.use_pthreads) std.c.pthread_t else std.Thread.Id; + + fn currentThread() CancelId { + if (std.Thread.use_pthreads) { + return @enumFromInt(@intFromPtr(std.c.pthread_self())); + } else { + return @enumFromInt(std.Thread.getCurrentId()); + } + } + + fn toThreadId(cancel_id: CancelId) ThreadId { + if (std.Thread.use_pthreads) { + return @ptrFromInt(@intFromEnum(cancel_id)); + } else { + return @intCast(@intFromEnum(cancel_id)); + } + } +}; + const Closure = struct { start: Start, node: std.SinglyLinkedList.Node = .{}, - cancel_tid: std.Thread.Id, + cancel_tid: CancelId, /// Whether this task bumps minimum number of threads in the pool. is_concurrent: bool, const Start = *const fn (*Closure) void; - const canceling_tid: std.Thread.Id = switch (@typeInfo(std.Thread.Id)) { - .int => |int_info| switch (int_info.signedness) { - .signed => -1, - .unsigned => std.math.maxInt(std.Thread.Id), - }, - .pointer => @ptrFromInt(std.math.maxInt(usize)), - else => @compileError("unsupported std.Thread.Id: " ++ @typeName(std.Thread.Id)), - }; - fn requestCancel(closure: *Closure) void { - switch (@atomicRmw(std.Thread.Id, &closure.cancel_tid, .Xchg, canceling_tid, .acq_rel)) { - 0, canceling_tid => {}, + switch (@atomicRmw(CancelId, &closure.cancel_tid, .Xchg, .canceling, .acq_rel)) { + .none, .canceling => {}, else => |tid| switch (native_os) { - .linux => _ = std.os.linux.tgkill(std.os.linux.getpid(), @bitCast(tid), posix.SIG.IO), - else => {}, + .linux => _ = std.os.linux.tgkill(std.os.linux.getpid(), @bitCast(tid.toThreadId()), posix.SIG.IO), + else => if (std.Thread.use_pthreads) { + assert(std.c.pthread_kill(tid.toThreadId(), posix.SIG.IO) == 0); + }, }, } } @@ -342,9 +359,9 @@ const AsyncClosure = struct { fn start(closure: *Closure) void { const ac: *AsyncClosure = @alignCast(@fieldParentPtr("closure", closure)); - const tid = std.Thread.getCurrentId(); - if (@cmpxchgStrong(std.Thread.Id, &closure.cancel_tid, 0, tid, .acq_rel, .acquire)) |cancel_tid| { - assert(cancel_tid == Closure.canceling_tid); + const tid: CancelId = .currentThread(); + if (@cmpxchgStrong(CancelId, &closure.cancel_tid, .none, tid, .acq_rel, .acquire)) |cancel_tid| { + assert(cancel_tid == .canceling); // Even though we already know the task is canceled, we must still // run the closure in order to make the return value valid and in // case there are side effects. @@ -355,8 +372,8 @@ const AsyncClosure = struct { // In case a cancel happens after successful task completion, prevents // signal from being delivered to the thread in `requestCancel`. - if (@cmpxchgStrong(std.Thread.Id, &closure.cancel_tid, tid, 0, .acq_rel, .acquire)) |cancel_tid| { - assert(cancel_tid == Closure.canceling_tid); + if (@cmpxchgStrong(CancelId, &closure.cancel_tid, tid, .none, .acq_rel, .acquire)) |cancel_tid| { + assert(cancel_tid == .canceling); } if (@atomicRmw(?*ResetEvent, &ac.select_condition, .Xchg, done_reset_event, .release)) |select_reset| { @@ -418,7 +435,7 @@ fn async( ac.* = .{ .closure = .{ - .cancel_tid = 0, + .cancel_tid = .none, .start = AsyncClosure.start, .is_concurrent = false, }, @@ -488,7 +505,7 @@ fn concurrent( ac.* = .{ .closure = .{ - .cancel_tid = 0, + .cancel_tid = .none, .start = AsyncClosure.start, .is_concurrent = true, }, @@ -540,12 +557,12 @@ const GroupClosure = struct { fn start(closure: *Closure) void { const gc: *GroupClosure = @alignCast(@fieldParentPtr("closure", closure)); - const tid = std.Thread.getCurrentId(); + const tid: CancelId = .currentThread(); const group = gc.group; const group_state: *std.atomic.Value(usize) = @ptrCast(&group.state); const reset_event: *ResetEvent = @ptrCast(&group.context); - if (@cmpxchgStrong(std.Thread.Id, &closure.cancel_tid, 0, tid, .acq_rel, .acquire)) |cancel_tid| { - assert(cancel_tid == Closure.canceling_tid); + if (@cmpxchgStrong(CancelId, &closure.cancel_tid, .none, tid, .acq_rel, .acquire)) |cancel_tid| { + assert(cancel_tid == .canceling); // We already know the task is canceled before running the callback. Since all closures // in a Group have void return type, we can return early. syncFinish(group_state, reset_event); @@ -557,8 +574,8 @@ const GroupClosure = struct { // In case a cancel happens after successful task completion, prevents // signal from being delivered to the thread in `requestCancel`. - if (@cmpxchgStrong(std.Thread.Id, &closure.cancel_tid, tid, 0, .acq_rel, .acquire)) |cancel_tid| { - assert(cancel_tid == Closure.canceling_tid); + if (@cmpxchgStrong(CancelId, &closure.cancel_tid, tid, .none, .acq_rel, .acquire)) |cancel_tid| { + assert(cancel_tid == .canceling); } syncFinish(group_state, reset_event); @@ -626,7 +643,7 @@ fn groupAsync( })); gc.* = .{ .closure = .{ - .cancel_tid = 0, + .cancel_tid = .none, .start = GroupClosure.start, .is_concurrent = false, }, @@ -771,7 +788,7 @@ fn cancelRequested(userdata: ?*anyopaque) bool { const t: *Threaded = @ptrCast(@alignCast(userdata)); _ = t; const closure = current_closure orelse return false; - return @atomicLoad(std.Thread.Id, &closure.cancel_tid, .acquire) == Closure.canceling_tid; + return @atomicLoad(CancelId, &closure.cancel_tid, .acquire) == .canceling; } fn checkCancel(t: *Threaded) error{Canceled}!void { diff --git a/lib/std/c.zig b/lib/std/c.zig index 877b9ae4e3..024c76ddc4 100644 --- a/lib/std/c.zig +++ b/lib/std/c.zig @@ -10763,6 +10763,8 @@ pub const pthread_setname_np = switch (native_os) { }; pub extern "c" fn pthread_getname_np(thread: pthread_t, name: [*:0]u8, len: usize) c_int; +pub extern "c" fn pthread_kill(pthread_t, signal: c_int) c_int; + pub const pthread_threadid_np = switch (native_os) { .macos, .ios, .tvos, .watchos, .visionos => private.pthread_threadid_np, else => {},