std.Io.Threaded: implement cancellation for pthreads

not to be confused with pthread_cancel, which is a useless API.
Andrew Kelley 2025-10-24 13:05:13 -07:00
parent 85e159e652
commit a8f95e5176
2 changed files with 47 additions and 28 deletions
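The mechanism, in short: each running task publishes its thread identity into an atomic cancel_tid field (a pthread_t under pthreads, otherwise a std.Thread.Id). requestCancel atomically swaps in a canceling sentinel and, if the task is currently running, interrupts it with SIG.IO (tgkill on Linux, pthread_kill on other pthread targets) so that blocking syscalls return early. The task observes the request through cancelRequested, which loads cancel_tid and compares it against the sentinel. Below is a minimal sketch of that handshake, assuming pthreads; the Task type and function names are illustrative stand-ins, not the actual std.Io.Threaded API, and a no-op SIG.IO handler (installed without SA_RESTART) is assumed to exist elsewhere, which this diff does not show.

const std = @import("std");
const posix = std.posix;
const assert = std.debug.assert;

const CancelId = enum(usize) {
    none = 0,
    canceling = std.math.maxInt(usize),
    _,
};

/// Illustrative stand-in for a queued task; not a std.Io.Threaded type.
const Task = struct {
    cancel_tid: CancelId = .none,

    /// Worker thread: publish our pthread_t before doing cancelable work.
    /// The pointer/integer conversions mirror the CancelId helpers in the diff.
    fn publishSelf(task: *Task) void {
        const self: CancelId = @enumFromInt(@intFromPtr(std.c.pthread_self()));
        if (@cmpxchgStrong(CancelId, &task.cancel_tid, .none, self, .acq_rel, .acquire)) |prev| {
            // Only a cancel request can have gotten here first.
            assert(prev == .canceling);
        }
    }

    /// Worker thread: retract the published id once the work is done, so a
    /// late cancel request does not signal a thread that has moved on.
    fn unpublishSelf(task: *Task) void {
        const self: CancelId = @enumFromInt(@intFromPtr(std.c.pthread_self()));
        if (@cmpxchgStrong(CancelId, &task.cancel_tid, self, .none, .acq_rel, .acquire)) |prev| {
            assert(prev == .canceling);
        }
    }

    /// Any other thread: mark the task canceled and, if it is currently
    /// running, poke it with SIG.IO so blocking syscalls return early.
    fn requestCancel(task: *Task) void {
        switch (@atomicRmw(CancelId, &task.cancel_tid, .Xchg, .canceling, .acq_rel)) {
            .none, .canceling => {}, // not started yet, or already done/canceled
            else => |tid| {
                const thread: std.c.pthread_t = @ptrFromInt(@intFromEnum(tid));
                _ = std.c.pthread_kill(thread, posix.SIG.IO);
            },
        }
    }

    /// The running task polls this (directly, or after a blocking call fails
    /// with EINTR) to find out whether it should bail out.
    fn cancelRequested(task: *Task) bool {
        return @atomicLoad(CancelId, &task.cancel_tid, .acquire) == .canceling;
    }
};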


@@ -36,30 +36,47 @@ comptime {
if (@TypeOf(posix.IOV_MAX) != void) assert(max_iovecs_len <= posix.IOV_MAX);
}
+const CancelId = enum(usize) {
+none = 0,
+canceling = std.math.maxInt(usize),
+_,
+const ThreadId = if (std.Thread.use_pthreads) std.c.pthread_t else std.Thread.Id;
+fn currentThread() CancelId {
+if (std.Thread.use_pthreads) {
+return @enumFromInt(@intFromPtr(std.c.pthread_self()));
+} else {
+return @enumFromInt(std.Thread.getCurrentId());
+}
+}
+fn toThreadId(cancel_id: CancelId) ThreadId {
+if (std.Thread.use_pthreads) {
+return @ptrFromInt(@intFromEnum(cancel_id));
+} else {
+return @intCast(@intFromEnum(cancel_id));
+}
+}
+};
const Closure = struct {
start: Start,
node: std.SinglyLinkedList.Node = .{},
-cancel_tid: std.Thread.Id,
+cancel_tid: CancelId,
/// Whether this task bumps minimum number of threads in the pool.
is_concurrent: bool,
const Start = *const fn (*Closure) void;
-const canceling_tid: std.Thread.Id = switch (@typeInfo(std.Thread.Id)) {
-.int => |int_info| switch (int_info.signedness) {
-.signed => -1,
-.unsigned => std.math.maxInt(std.Thread.Id),
-},
-.pointer => @ptrFromInt(std.math.maxInt(usize)),
-else => @compileError("unsupported std.Thread.Id: " ++ @typeName(std.Thread.Id)),
-};
fn requestCancel(closure: *Closure) void {
-switch (@atomicRmw(std.Thread.Id, &closure.cancel_tid, .Xchg, canceling_tid, .acq_rel)) {
-0, canceling_tid => {},
+switch (@atomicRmw(CancelId, &closure.cancel_tid, .Xchg, .canceling, .acq_rel)) {
+.none, .canceling => {},
else => |tid| switch (native_os) {
-.linux => _ = std.os.linux.tgkill(std.os.linux.getpid(), @bitCast(tid), posix.SIG.IO),
-else => {},
+.linux => _ = std.os.linux.tgkill(std.os.linux.getpid(), @bitCast(tid.toThreadId()), posix.SIG.IO),
+else => if (std.Thread.use_pthreads) {
+assert(std.c.pthread_kill(tid.toThreadId(), posix.SIG.IO) == 0);
+},
},
}
}
@@ -342,9 +359,9 @@ const AsyncClosure = struct {
fn start(closure: *Closure) void {
const ac: *AsyncClosure = @alignCast(@fieldParentPtr("closure", closure));
-const tid = std.Thread.getCurrentId();
-if (@cmpxchgStrong(std.Thread.Id, &closure.cancel_tid, 0, tid, .acq_rel, .acquire)) |cancel_tid| {
-assert(cancel_tid == Closure.canceling_tid);
+const tid: CancelId = .currentThread();
+if (@cmpxchgStrong(CancelId, &closure.cancel_tid, .none, tid, .acq_rel, .acquire)) |cancel_tid| {
+assert(cancel_tid == .canceling);
// Even though we already know the task is canceled, we must still
// run the closure in order to make the return value valid and in
// case there are side effects.
@@ -355,8 +372,8 @@ const AsyncClosure = struct {
// In case a cancel happens after successful task completion, prevents
// signal from being delivered to the thread in `requestCancel`.
-if (@cmpxchgStrong(std.Thread.Id, &closure.cancel_tid, tid, 0, .acq_rel, .acquire)) |cancel_tid| {
-assert(cancel_tid == Closure.canceling_tid);
+if (@cmpxchgStrong(CancelId, &closure.cancel_tid, tid, .none, .acq_rel, .acquire)) |cancel_tid| {
+assert(cancel_tid == .canceling);
}
if (@atomicRmw(?*ResetEvent, &ac.select_condition, .Xchg, done_reset_event, .release)) |select_reset| {
@@ -418,7 +435,7 @@ fn async(
ac.* = .{
.closure = .{
-.cancel_tid = 0,
+.cancel_tid = .none,
.start = AsyncClosure.start,
.is_concurrent = false,
},
@@ -488,7 +505,7 @@ fn concurrent(
ac.* = .{
.closure = .{
-.cancel_tid = 0,
+.cancel_tid = .none,
.start = AsyncClosure.start,
.is_concurrent = true,
},
@@ -540,12 +557,12 @@ const GroupClosure = struct {
fn start(closure: *Closure) void {
const gc: *GroupClosure = @alignCast(@fieldParentPtr("closure", closure));
-const tid = std.Thread.getCurrentId();
+const tid: CancelId = .currentThread();
const group = gc.group;
const group_state: *std.atomic.Value(usize) = @ptrCast(&group.state);
const reset_event: *ResetEvent = @ptrCast(&group.context);
-if (@cmpxchgStrong(std.Thread.Id, &closure.cancel_tid, 0, tid, .acq_rel, .acquire)) |cancel_tid| {
-assert(cancel_tid == Closure.canceling_tid);
+if (@cmpxchgStrong(CancelId, &closure.cancel_tid, .none, tid, .acq_rel, .acquire)) |cancel_tid| {
+assert(cancel_tid == .canceling);
// We already know the task is canceled before running the callback. Since all closures
// in a Group have void return type, we can return early.
syncFinish(group_state, reset_event);
@@ -557,8 +574,8 @@ const GroupClosure = struct {
// In case a cancel happens after successful task completion, prevents
// signal from being delivered to the thread in `requestCancel`.
-if (@cmpxchgStrong(std.Thread.Id, &closure.cancel_tid, tid, 0, .acq_rel, .acquire)) |cancel_tid| {
-assert(cancel_tid == Closure.canceling_tid);
+if (@cmpxchgStrong(CancelId, &closure.cancel_tid, tid, .none, .acq_rel, .acquire)) |cancel_tid| {
+assert(cancel_tid == .canceling);
}
syncFinish(group_state, reset_event);
@@ -626,7 +643,7 @@ fn groupAsync(
}));
gc.* = .{
.closure = .{
-.cancel_tid = 0,
+.cancel_tid = .none,
.start = GroupClosure.start,
.is_concurrent = false,
},
@@ -771,7 +788,7 @@ fn cancelRequested(userdata: ?*anyopaque) bool {
const t: *Threaded = @ptrCast(@alignCast(userdata));
_ = t;
const closure = current_closure orelse return false;
-return @atomicLoad(std.Thread.Id, &closure.cancel_tid, .acquire) == Closure.canceling_tid;
+return @atomicLoad(CancelId, &closure.cancel_tid, .acquire) == .canceling;
}
fn checkCancel(t: *Threaded) error{Canceled}!void {


@@ -10763,6 +10763,8 @@ pub const pthread_setname_np = switch (native_os) {
};
pub extern "c" fn pthread_getname_np(thread: pthread_t, name: [*:0]u8, len: usize) c_int;
pub extern "c" fn pthread_kill(pthread_t, signal: c_int) c_int;
pub const pthread_threadid_np = switch (native_os) {
.macos, .ios, .tvos, .watchos, .visionos => private.pthread_threadid_np,
else => {},