diff --git a/lib/std/Io.zig b/lib/std/Io.zig index 40f213f3de..da06b19df2 100644 --- a/lib/std/Io.zig +++ b/lib/std/Io.zig @@ -633,11 +633,6 @@ pub const VTable = struct { result: []u8, result_alignment: std.mem.Alignment, ) void, - /// Returns whether the current thread of execution is known to have - /// been requested to cancel. - /// - /// Thread-safe. - cancelRequested: *const fn (?*anyopaque) bool, /// Executes `start` asynchronously in a manner such that it cleans itself /// up. This mode does not support results, await, or cancel. diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig index 2d46ff373e..8a5c0d5758 100644 --- a/lib/std/Io/Threaded.zig +++ b/lib/std/Io/Threaded.zig @@ -12,22 +12,16 @@ const Io = std.Io; const net = std.Io.net; const HostName = std.Io.net.HostName; const IpAddress = std.Io.net.IpAddress; -const Allocator = std.mem.Allocator; const Alignment = std.mem.Alignment; const assert = std.debug.assert; const posix = std.posix; -/// Thread-safe. -allocator: Allocator, -mutex: std.Thread.Mutex = .{}, -cond: std.Thread.Condition = .{}, -run_queue: std.SinglyLinkedList = .{}, -join_requested: bool = false, -threads: std.ArrayList(std.Thread), -stack_size: usize, -thread_capacity: std.atomic.Value(ThreadCapacity), -thread_capacity_error: ?std.Thread.CpuCountError, -concurrent_count: usize, +main_thread: Thread, +stack_size: usize = default_stack_size, +capacity: std.atomic.Value(Capacity), +capacity_error: ?std.Thread.CpuCountError, +concurrent_limit: Io.Limit = .unlimited, +pid: Pid = .unknown, wsa: if (is_windows) Wsa else struct {} = .{}, @@ -35,22 +29,626 @@ have_signal_handler: bool, old_sig_io: if (have_sig_io) posix.Sigaction else void, old_sig_pipe: if (have_sig_pipe) posix.Sigaction else void, -pub const ThreadCapacity = enum(usize) { +pub const Pid = enum(if (posix.pid_t == void) u0 else posix.pid_t) { unknown = 0, _, +}; - pub fn init(n: usize) ThreadCapacity { - assert(n != 0); +pub const Thread = struct { + /// The value that needs to be passed to pthread_kill or tgkill in order to + /// send a signal. + signal_id: SignalId, + /// Points to the next thread in the list. Singly-linked so that + /// it can be updated lock-free. + list_node: std.SinglyLinkedList.Node = .{}, + run_queue: std.SinglyLinkedList.Node = .{}, + current_closure: ?*Closure = null, + completion: Completion, + mutex: std.Thread.Mutex, + cond: std.Thread.Condition, + join_requested: bool, + + threadlocal var current: *Thread = undefined; + + const SignalId = if (use_pthreads) std.c.pthread_t else std.Thread.Id; + + const Completion = switch (native_os) { + .windows => @compileError("TODO"), + .linux => struct { + state: State = State.init(.running), + child_tid: std.atomic.Value(i32) = std.atomic.Value(i32).init(1), + parent_tid: i32 = undefined, + mapped: []align(std.heap.page_size_min) u8, + + /// State to synchronize detachment of spawner thread to spawned thread + const State = std.atomic.Value(enum(switch (builtin.zig_backend) { + .stage2_riscv64 => u32, + else => u8, + }) { + running, + detached, + completed, + }); + + + /// Calls `munmap(mapped.ptr, mapped.len)` then `exit(1)` without touching the stack (which lives in `mapped.ptr`). 
+ /// Ported over from musl libc's pthread detached implementation: + /// https://github.com/ifduyue/musl/search?q=__unmapself + fn freeAndExit(self: *Completion) noreturn { + switch (builtin.target.cpu.arch) { + .x86 => asm volatile ( + \\ movl $91, %%eax # SYS_munmap + \\ movl %[ptr], %%ebx + \\ movl %[len], %%ecx + \\ int $128 + \\ movl $1, %%eax # SYS_exit + \\ movl $0, %%ebx + \\ int $128 + : + : [ptr] "r" (@intFromPtr(self.mapped.ptr)), + [len] "r" (self.mapped.len), + : .{ .memory = true }), + .x86_64 => asm volatile (switch (builtin.target.abi) { + .gnux32, .muslx32 => + \\ movl $0x4000000b, %%eax # SYS_munmap + \\ syscall + \\ movl $0x4000003c, %%eax # SYS_exit + \\ xor %%rdi, %%rdi + \\ syscall + , + else => + \\ movl $11, %%eax # SYS_munmap + \\ syscall + \\ movl $60, %%eax # SYS_exit + \\ xor %%rdi, %%rdi + \\ syscall + , + } + : + : [ptr] "{rdi}" (@intFromPtr(self.mapped.ptr)), + [len] "{rsi}" (self.mapped.len), + ), + .arm, .armeb, .thumb, .thumbeb => asm volatile ( + \\ mov r7, #91 // SYS_munmap + \\ mov r0, %[ptr] + \\ mov r1, %[len] + \\ svc 0 + \\ mov r7, #1 // SYS_exit + \\ mov r0, #0 + \\ svc 0 + : + : [ptr] "r" (@intFromPtr(self.mapped.ptr)), + [len] "r" (self.mapped.len), + : .{ .memory = true }), + .aarch64, .aarch64_be => asm volatile ( + \\ mov x8, #215 // SYS_munmap + \\ mov x0, %[ptr] + \\ mov x1, %[len] + \\ svc 0 + \\ mov x8, #93 // SYS_exit + \\ mov x0, #0 + \\ svc 0 + : + : [ptr] "r" (@intFromPtr(self.mapped.ptr)), + [len] "r" (self.mapped.len), + : .{ .memory = true }), + .alpha => asm volatile ( + \\ ldi $0, 73 # SYS_munmap + \\ mov %[ptr], $16 + \\ mov %[len], $17 + \\ callsys + \\ ldi $0, 1 # SYS_exit + \\ ldi $16, 0 + \\ callsys + : + : [ptr] "r" (@intFromPtr(self.mapped.ptr)), + [len] "r" (self.mapped.len), + : .{ .memory = true }), + .hexagon => asm volatile ( + \\ r6 = #215 // SYS_munmap + \\ r0 = %[ptr] + \\ r1 = %[len] + \\ trap0(#1) + \\ r6 = #93 // SYS_exit + \\ r0 = #0 + \\ trap0(#1) + : + : [ptr] "r" (@intFromPtr(self.mapped.ptr)), + [len] "r" (self.mapped.len), + : .{ .memory = true }), + .hppa => asm volatile ( + \\ ldi 91, %%r20 /* SYS_munmap */ + \\ copy %[ptr], %%r26 + \\ copy %[len], %%r25 + \\ ble 0x100(%%sr2, %%r0) + \\ ldi 1, %%r20 /* SYS_exit */ + \\ ldi 0, %%r26 + \\ ble 0x100(%%sr2, %%r0) + : + : [ptr] "r" (@intFromPtr(self.mapped.ptr)), + [len] "r" (self.mapped.len), + : .{ .memory = true }), + .m68k => asm volatile ( + \\ move.l #91, %%d0 // SYS_munmap + \\ move.l %[ptr], %%d1 + \\ move.l %[len], %%d2 + \\ trap #0 + \\ move.l #1, %%d0 // SYS_exit + \\ move.l #0, %%d1 + \\ trap #0 + : + : [ptr] "r" (@intFromPtr(self.mapped.ptr)), + [len] "r" (self.mapped.len), + : .{ .memory = true }), + .microblaze, .microblazeel => asm volatile ( + \\ ori r12, r0, 91 # SYS_munmap + \\ ori r5, %[ptr], 0 + \\ ori r6, %[len], 0 + \\ brki r14, 0x8 + \\ ori r12, r0, 1 # SYS_exit + \\ or r5, r0, r0 + \\ brki r14, 0x8 + : + : [ptr] "r" (@intFromPtr(self.mapped.ptr)), + [len] "r" (self.mapped.len), + : .{ .memory = true }), + // We set `sp` to the address of the current function as a workaround for a Linux + // kernel bug that caused syscalls to return EFAULT if the stack pointer is invalid. + // The bug was introduced in 46e12c07b3b9603c60fc1d421ff18618241cb081 and fixed in + // 7928eb0370d1133d0d8cd2f5ddfca19c309079d5. 
+ .mips, .mipsel => asm volatile ( + \\ move $sp, $t9 + \\ li $v0, 4091 # SYS_munmap + \\ move $a0, %[ptr] + \\ move $a1, %[len] + \\ syscall + \\ li $v0, 4001 # SYS_exit + \\ li $a0, 0 + \\ syscall + : + : [ptr] "r" (@intFromPtr(self.mapped.ptr)), + [len] "r" (self.mapped.len), + : .{ .memory = true }), + .mips64, .mips64el => asm volatile (switch (builtin.target.abi) { + .gnuabin32, .muslabin32 => + \\ li $v0, 6011 # SYS_munmap + \\ move $a0, %[ptr] + \\ move $a1, %[len] + \\ syscall + \\ li $v0, 6058 # SYS_exit + \\ li $a0, 0 + \\ syscall + , + else => + \\ li $v0, 5011 # SYS_munmap + \\ move $a0, %[ptr] + \\ move $a1, %[len] + \\ syscall + \\ li $v0, 5058 # SYS_exit + \\ li $a0, 0 + \\ syscall + , + } + : + : [ptr] "r" (@intFromPtr(self.mapped.ptr)), + [len] "r" (self.mapped.len), + : .{ .memory = true }), + .or1k => asm volatile ( + \\ l.ori r11, r0, 215 # SYS_munmap + \\ l.ori r3, %[ptr] + \\ l.ori r4, %[len] + \\ l.sys 1 + \\ l.ori r11, r0, 93 # SYS_exit + \\ l.ori r3, r0, r0 + \\ l.sys 1 + : + : [ptr] "r" (@intFromPtr(self.mapped.ptr)), + [len] "r" (self.mapped.len), + : .{ .memory = true }), + .powerpc, .powerpcle, .powerpc64, .powerpc64le => asm volatile ( + \\ li 0, 91 # SYS_munmap + \\ mr 3, %[ptr] + \\ mr 4, %[len] + \\ sc + \\ li 0, 1 # SYS_exit + \\ li 3, 0 + \\ sc + \\ blr + : + : [ptr] "r" (@intFromPtr(self.mapped.ptr)), + [len] "r" (self.mapped.len), + : .{ .memory = true }), + .riscv32, .riscv64 => asm volatile ( + \\ li a7, 215 # SYS_munmap + \\ mv a0, %[ptr] + \\ mv a1, %[len] + \\ ecall + \\ li a7, 93 # SYS_exit + \\ mv a0, zero + \\ ecall + : + : [ptr] "r" (@intFromPtr(self.mapped.ptr)), + [len] "r" (self.mapped.len), + : .{ .memory = true }), + .s390x => asm volatile ( + \\ lgr %%r2, %[ptr] + \\ lgr %%r3, %[len] + \\ svc 91 # SYS_munmap + \\ lghi %%r2, 0 + \\ svc 1 # SYS_exit + : + : [ptr] "r" (@intFromPtr(self.mapped.ptr)), + [len] "r" (self.mapped.len), + : .{ .memory = true }), + .sh, .sheb => asm volatile ( + \\ mov #91, r3 ! SYS_munmap + \\ mov %[ptr], r4 + \\ mov %[len], r5 + \\ trapa #31 + \\ or r0, r0 + \\ or r0, r0 + \\ or r0, r0 + \\ or r0, r0 + \\ or r0, r0 + \\ mov #1, r3 ! SYS_exit + \\ mov #0, r4 + \\ trapa #31 + \\ or r0, r0 + \\ or r0, r0 + \\ or r0, r0 + \\ or r0, r0 + \\ or r0, r0 + : + : [ptr] "r" (@intFromPtr(self.mapped.ptr)), + [len] "r" (self.mapped.len), + : .{ .memory = true }), + .sparc => asm volatile ( + \\ # See sparc64 comments below. + \\ 1: + \\ cmp %%fp, 0 + \\ beq 2f + \\ nop + \\ ba 1b + \\ restore + \\ 2: + \\ mov 73, %%g1 // SYS_munmap + \\ mov %[ptr], %%o0 + \\ mov %[len], %%o1 + \\ t 0x3 # ST_FLUSH_WINDOWS + \\ t 0x10 + \\ mov 1, %%g1 // SYS_exit + \\ mov 0, %%o0 + \\ t 0x10 + : + : [ptr] "r" (@intFromPtr(self.mapped.ptr)), + [len] "r" (self.mapped.len), + : .{ .memory = true }), + .sparc64 => asm volatile ( + \\ # SPARCs really don't like it when active stack frames + \\ # is unmapped (it will result in a segfault), so we + \\ # force-deactivate it by running `restore` until + \\ # all frames are cleared. + \\ 1: + \\ cmp %%fp, 0 + \\ beq 2f + \\ nop + \\ ba 1b + \\ restore + \\ 2: + \\ mov 73, %%g1 // SYS_munmap + \\ mov %[ptr], %%o0 + \\ mov %[len], %%o1 + \\ # Flush register window contents to prevent background + \\ # memory access before unmapping the stack. 
+ \\ flushw + \\ t 0x6d + \\ mov 1, %%g1 // SYS_exit + \\ mov 0, %%o0 + \\ t 0x6d + : + : [ptr] "r" (@intFromPtr(self.mapped.ptr)), + [len] "r" (self.mapped.len), + : .{ .memory = true }), + .loongarch32, .loongarch64 => asm volatile ( + \\ or $a0, $zero, %[ptr] + \\ or $a1, $zero, %[len] + \\ ori $a7, $zero, 215 # SYS_munmap + \\ syscall 0 # call munmap + \\ ori $a0, $zero, 0 + \\ ori $a7, $zero, 93 # SYS_exit + \\ syscall 0 # call exit + : + : [ptr] "r" (@intFromPtr(self.mapped.ptr)), + [len] "r" (self.mapped.len), + : .{ .memory = true }), + else => |cpu_arch| @compileError("Unsupported linux arch: " ++ @tagName(cpu_arch)), + } + unreachable; + } + }, + else => void, + }; + + const AllocateError = error{OutOfMemory}; + + fn allocate(stack_size: usize) AllocateError!*Thread { + if (use_pthreads) { + @compileError("TODO"); + } else if (is_windows) { + @compileError("TODO"); + } else if (native_os == .linux) { + const linux = std.os.linux; + const page_size = std.heap.pageSize(); + + var guard_offset: usize = undefined; + var stack_offset: usize = undefined; + var tls_offset: usize = undefined; + var instance_offset: usize = undefined; + + const map_bytes = blk: { + var bytes: usize = page_size; + guard_offset = bytes; + + bytes += @max(page_size, stack_size); + bytes = std.mem.alignForward(usize, bytes, page_size); + stack_offset = bytes; + + bytes = std.mem.alignForward(usize, bytes, linux.tls.area_desc.alignment); + tls_offset = bytes; + bytes += linux.tls.area_desc.size; + + bytes = std.mem.alignForward(usize, bytes, @alignOf(Thread)); + instance_offset = bytes; + bytes += @sizeOf(Thread); + + bytes = std.mem.alignForward(usize, bytes, page_size); + break :blk bytes; + }; + + // Map all memory needed without read/write permissions to avoid + // committing the whole region right away. Anonymous mapping ensures + // file descriptor limits are not exceeded. + const mapped = posix.mmap( + null, + map_bytes, + posix.PROT.NONE, + .{ .TYPE = .PRIVATE, .ANONYMOUS = true }, + -1, + 0, + ) catch |err| switch (err) { + error.MemoryMappingNotSupported => unreachable, + error.AccessDenied => unreachable, + error.PermissionDenied => unreachable, + error.ProcessFdQuotaExceeded => unreachable, + error.SystemFdQuotaExceeded => unreachable, + error.MappingAlreadyExists => unreachable, + else => |e| return e, + }; + assert(mapped.len >= map_bytes); + errdefer posix.munmap(mapped); + + // map everything but the guard page as read/write + posix.mprotect( + @alignCast(mapped[guard_offset..]), + posix.PROT.READ | posix.PROT.WRITE, + ) catch |err| switch (err) { + error.AccessDenied => unreachable, + else => |e| return e, + }; + + // Prepare the TLS segment and prepare a user_desc struct when needed on x86 + var tls_ptr = linux.tls.prepareArea(mapped[tls_offset..]); + var user_desc: if (builtin.target.cpu.arch == .x86) linux.user_desc else void = undefined; + if (builtin.target.cpu.arch == .x86) { + defer tls_ptr = @intFromPtr(&user_desc); + user_desc = .{ + .entry_number = linux.tls.area_desc.gdt_entry_number, + .base_addr = tls_ptr, + .limit = 0xfffff, + .flags = .{ + .seg_32bit = 1, + .contents = 0, // Data + .read_exec_only = 0, + .limit_in_pages = 1, + .seg_not_present = 0, + .useable = 1, + }, + }; + } + + const instance: *Thread = @ptrCast(@alignCast(&mapped[instance_offset])); + instance.* = .{ + .signal_id = undefined, // Initialized on spawn. 
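+            // Ownership of `mapped` passes to the spawned thread: when
+            // detached, it unmaps the region itself via `Completion.freeAndExit`.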
+ .completion = .{ + .mapped = mapped, + .stack_offset = stack_offset, + }, + }; + return instance; + } else { + @compileError("unimplemented"); + } + } + + const SpawnError = error{ + ThreadQuotaExceeded, + SystemResources, + Unexpected, + }; + + fn spawn(thread: *Thread) SpawnError!void { + if (use_pthreads) { + const c = std.c; + const stack_size = {}; // TODO + + var attr: c.pthread_attr_t = undefined; + if (c.pthread_attr_init(&attr) != .SUCCESS) return error.SystemResources; + defer assert(c.pthread_attr_destroy(&attr) == .SUCCESS); + + assert(c.pthread_attr_setstacksize(&attr, stack_size) == .SUCCESS); + assert(c.pthread_attr_setguardsize(&attr, std.heap.pageSize()) == .SUCCESS); + + var handle: c.pthread_t = undefined; + switch (c.pthread_create( + &handle, + &attr, + posixStart, + @ptrCast(thread), + )) { + .SUCCESS => { + thread.signal_id = handle; + return; + }, + .AGAIN => return error.SystemResources, + .PERM => unreachable, + .INVAL => unreachable, + else => |err| return posix.unexpectedErrno(err), + } + @compileError("TODO"); + } else if (is_windows) { + @compileError("TODO"); + } else if (native_os == .linux) { + const linux = std.os.linux; + + const flags: u32 = linux.CLONE.THREAD | linux.CLONE.DETACHED | + linux.CLONE.VM | linux.CLONE.FS | linux.CLONE.FILES | + linux.CLONE.PARENT_SETTID | linux.CLONE.CHILD_CLEARTID | + linux.CLONE.SIGHAND | linux.CLONE.SYSVSEM | linux.CLONE.SETTLS; + + switch (linux.errno(linux.clone( + linuxStart, + @intFromPtr(&thread.completion.mapped[thread.completion.stack_offset]), + flags, + @intFromPtr(thread), + &thread.parent_tid, + thread.completion.tls_ptr, + &thread.child_tid.raw, + ))) { + .SUCCESS => return, + .AGAIN => return error.ThreadQuotaExceeded, + .INVAL => unreachable, + .NOMEM => return error.SystemResources, + .NOSPC => unreachable, + .PERM => unreachable, + .USERS => unreachable, + else => |err| return posix.unexpectedErrno(err), + } + } else { + @compileError("unimplemented"); + } + } + + fn linuxStart(raw_arg: usize) callconv(.c) u8 { + const t: *Thread = @ptrFromInt(raw_arg); + worker(t); + switch (t.completion.swap(.completed, .seq_cst)) { + .running => return 0, + .completed => unreachable, + .detached => t.completion.freeAndExit(), + } + unreachable; + } + + fn posixStart(raw_arg: ?*anyopaque) callconv(.c) ?*anyopaque { + const t: *Thread = @ptrCast(@alignCast(raw_arg)); + worker(t); + return null; + } + + fn worker(t: *Thread) void { + current = t; + + t.mutex.lock(); + + while (true) { + while (t.run_queue.popFirst()) |closure_node| { + t.mutex.unlock(); + const closure: *Closure = @fieldParentPtr("node", closure_node); + closure.start(closure); + t.mutex.lock(); + } + if (t.join_requested) break; + t.cond.wait(&t.mutex); + } + } + + fn checkCancel(thread: *Thread) error{Canceled}!void { + const closure = thread.current_closure orelse return; + switch (@cmpxchgStrong( + CancelStatus, + &closure.cancel_status, + .requested, + .acknowledged, + .acq_rel, + .acquire, + ) orelse return error.Canceled) { + .none => return, + .requested => unreachable, + .acknowledged => unreachable, + _ => return, + } + } + + fn beginSyscall(thread: *Thread) error{Canceled}!void { + const closure = thread.current_closure orelse return; + + switch (@cmpxchgStrong( + CancelStatus, + &closure.cancel_status, + .none, + thread.signal_id, + .acq_rel, + .acquire, + ) orelse return) { + .none => unreachable, + .requested => { + @atomicStore(CancelStatus, &closure.cancel_status, .acknowledged, .acquire); + return error.Canceled; + }, + .acknowledged => 
unreachable, + _ => unreachable, + } + } + + fn endSyscall(thread: *Thread) error{Canceled}!void { + const closure = thread.current_closure orelse return; + + switch (@cmpxchgStrong( + CancelStatus, + &closure.cancel_status, + thread.signal_id, + .none, + .acq_rel, + .release, + ) orelse return) { + .none => unreachable, + .requested => { + @atomicStore(CancelStatus, &closure.cancel_status, .acknowledged, .release); + return error.Canceled; + }, + .acknowledged => return, + _ => unreachable, + } + } +}; + +pub const Capacity = enum(isize) { + unknown = -30000, + _, + + pub fn init(n: isize) Capacity { + assert(n > 0); return @enumFromInt(n); } - pub fn get(tc: ThreadCapacity) ?usize { + pub fn get(tc: Capacity) ?usize { if (tc == .unknown) return null; return @intFromEnum(tc); } }; -threadlocal var current_closure: ?*Closure = null; +pub const default_stack_size = 16 * 1024 * 1024; +pub const use_pthreads = !is_windows and native_os != .wasi and builtin.link_libc; const max_iovecs_len = 8; const splat_buffer_size = 64; @@ -59,85 +657,108 @@ comptime { if (@TypeOf(posix.IOV_MAX) != void) assert(max_iovecs_len <= posix.IOV_MAX); } -const CancelId = enum(usize) { +const CancelStatus = enum(usize) { + /// Cancellation has neither been requested, nor checked. The async + /// operation will check status before entering a blocking syscall. + /// This is also the status used for uninteruptible tasks. none = 0, - canceling = std.math.maxInt(usize), + /// Cancellation has been requested and the status will be checked before + /// entering a blocking syscall. + requested = std.math.maxInt(usize) - 1, + /// Cancellation has been acknowledged and is in progress. Signals should + /// not be sent. + acknowledged = std.math.maxInt(usize), + /// Stores a `Thread.SignalId` and indicates that sending a signal to this thread + /// is needed in order to cancel. This state is set before going into + /// a blocking operation that needs to get unblocked via signal. _, - const ThreadId = if (std.Thread.use_pthreads) std.c.pthread_t else std.Thread.Id; + const Unpacked = union(enum) { + none, + requested, + acknowledeged, + signal_id: Thread.SignalId, + }; - fn currentThread() CancelId { - if (std.Thread.use_pthreads) { - return @enumFromInt(@intFromPtr(std.c.pthread_self())); - } else { - return @enumFromInt(std.Thread.getCurrentId()); - } - } - - fn toThreadId(cancel_id: CancelId) ThreadId { - if (std.Thread.use_pthreads) { - return @ptrFromInt(@intFromEnum(cancel_id)); - } else { - return @intCast(@intFromEnum(cancel_id)); - } + fn unpack(cs: CancelStatus) Unpacked { + return switch (cs) { + .none => .none, + .requested => .requested, + .acknowledged => .acknowledged, + _ => |signal_id| .{ .signal_id = signal_id }, + }; } }; const Closure = struct { start: Start, node: std.SinglyLinkedList.Node = .{}, - cancel_tid: CancelId, + cancel_status: CancelStatus, /// Whether this task bumps minimum number of threads in the pool. 
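+    /// Concurrent tasks each add one to the pool's thread capacity (see
+    /// `concurrent`), so they can make forward progress independently of
+    /// queued async work.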
is_concurrent: bool, const Start = *const fn (*Closure) void; - fn requestCancel(closure: *Closure) void { - switch (@atomicRmw(CancelId, &closure.cancel_tid, .Xchg, .canceling, .acq_rel)) { - .none, .canceling => {}, - else => |tid| { - if (std.Thread.use_pthreads) { - const rc = std.c.pthread_kill(tid.toThreadId(), .IO); - if (is_debug) assert(rc == 0); - } else if (native_os == .linux) { - _ = std.os.linux.tgkill(std.os.linux.getpid(), @bitCast(tid.toThreadId()), .IO); - } - }, + fn requestCancel(closure: *Closure, t: *Threaded) void { + var signal_id = switch (@atomicRmw(CancelStatus, &closure.cancel_status, .Xchg, .requested, .monotonic).unpack()) { + .none, .acknowledged, .requested => return, + else => |signal_id| signal_id, + }; + // The task will enter a blocking syscall before checking for cancellation again. + // We can send a signal to interrupt the syscall, but if it arrives before + // the syscall instruction, it will be missed. Therefore, this code tries + // again until the cancellation request is acknowledged. + const max_attempts = 3; + for (0..max_attempts) |_| { + if (use_pthreads) { + const rc = std.c.pthread_kill(signal_id.toThreadId(), .IO); + if (is_debug) assert(rc == 0); + } else if (native_os == .linux) { + const pid: posix.pid_t = p: { + const cached_pid = @atomicLoad(Pid, &t.pid, .monotonic); + if (cached_pid != .unknown) break :p @intFromEnum(cached_pid); + const pid = std.os.linux.getpid(); + @atomicStore(Pid, &t.pid, @enumFromInt(pid), .monotonic); + break :p pid; + }; + _ = std.os.linux.tgkill(pid, @bitCast(signal_id.toThreadId()), .IO); + } else { + return; + } + + // TODO make this a nanosleep with 1 << attempt duration + std.Thread.yield() catch {}; + + switch (@atomicRmw(CancelStatus, &closure.cancel_status, .Xchg, .requested, .monotonic).unpack()) { + .requested => continue, + .none, .acknowledged => return, + else => |new_signal_id| signal_id = new_signal_id, + } } } }; -pub const InitError = std.Thread.CpuCountError || Allocator.Error; +pub const CpuCountError = error{ + PermissionDenied, + SystemResources, + Unsupported, +} || Io.UnexpectedError; /// Related: /// * `init_single_threaded` -pub fn init( - /// Must be threadsafe. Only used for the following functions: - /// * `Io.VTable.async` - /// * `Io.VTable.concurrent` - /// * `Io.VTable.groupAsync` - /// If these functions are avoided, then `Allocator.failing` may be passed - /// here. - gpa: Allocator, -) Threaded { +pub fn init() Threaded { const cpu_count = std.Thread.getCpuCount(); var t: Threaded = .{ - .allocator = gpa, .threads = .empty, - .stack_size = std.Thread.SpawnConfig.default_stack_size, - .thread_capacity = .init(if (cpu_count) |n| .init(n) else |_| .unknown), - .thread_capacity_error = if (cpu_count) |_| null else |e| e, + .capacity = .init(if (cpu_count) |n| .init(n) else |_| .unknown), + .capacity_error = if (cpu_count) |_| null else |e| e, .concurrent_count = 0, .old_sig_io = undefined, .old_sig_pipe = undefined, .have_signal_handler = false, }; - if (cpu_count) |n| { - t.threads.ensureTotalCapacityPrecise(gpa, n - 1) catch {}; - } else |_| {} - if (posix.Sigaction != void) { // This causes sending `posix.SIG.IO` to thread to interrupt blocking // syscalls, returning `posix.E.INTR`. @@ -161,11 +782,9 @@ pub fn init( /// * cancel requests have no effect. /// * `deinit` is safe, but unnecessary to call. 
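+///
+/// Hypothetical usage sketch:
+///
+///     var t: Threaded = .init_single_threaded;
+///     const io = t.io();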
pub const init_single_threaded: Threaded = .{ - .allocator = .failing, .threads = .empty, - .stack_size = std.Thread.SpawnConfig.default_stack_size, - .thread_capacity = .init(.init(1)), - .thread_capacity_error = null, + .capacity = .init(.init(1)), + .capacity_error = null, .concurrent_count = 0, .old_sig_io = undefined, .old_sig_pipe = undefined, @@ -173,9 +792,7 @@ pub const init_single_threaded: Threaded = .{ }; pub fn deinit(t: *Threaded) void { - const gpa = t.allocator; - t.join(); - t.threads.deinit(gpa); + join(t); if (is_windows and t.wsa.status == .initialized) { if (ws2_32.WSACleanup() != 0) recoverableOsBugDetected(); } @@ -186,46 +803,29 @@ pub fn deinit(t: *Threaded) void { t.* = undefined; } -pub fn setThreadCapacity(t: *Threaded, n: usize) void { - t.thread_capacity.store(.init(n), .monotonic); +pub fn setCapacity(t: *Threaded, n: usize) void { + t.capacity.store(.init(n), .monotonic); } -pub fn getThreadCapacity(t: *Threaded) ?usize { - return t.thread_capacity.load(.monotonic).get(); -} - -pub fn getCurrentThreadId() usize { - @panic("TODO"); +pub fn getCapacity(t: *Threaded) ?usize { + return t.capacity.load(.monotonic).get(); } fn join(t: *Threaded) void { if (builtin.single_threaded) return; + { - t.mutex.lock(); - defer t.mutex.unlock(); - t.join_requested = true; - } - t.cond.broadcast(); - for (t.threads.items) |thread| thread.join(); -} - -fn worker(t: *Threaded) void { - t.mutex.lock(); - defer t.mutex.unlock(); - - while (true) { - while (t.run_queue.popFirst()) |closure_node| { - t.mutex.unlock(); - const closure: *Closure = @fieldParentPtr("node", closure_node); - const is_concurrent = closure.is_concurrent; - closure.start(closure); - t.mutex.lock(); - if (is_concurrent) { - t.concurrent_count -= 1; + var it: ?*const std.SinglyLinkedList.Node = &t.main_thread.list_node; + while (it) |n| : (it = n.next) { + const thread: *Thread = @fieldParentPtr("list_node", n); + { + thread.mutex.lock(); + defer thread.mutex.unlock(); + thread.join_requested = true; + thread.cond.signal(); } + thread.join(); } - if (t.join_requested) break; - t.cond.wait(&t.mutex); } } @@ -237,7 +837,6 @@ pub fn io(t: *Threaded) Io { .concurrent = concurrent, .await = await, .cancel = cancel, - .cancelRequested = cancelRequested, .select = select, .groupAsync = groupAsync, @@ -333,7 +932,6 @@ pub fn ioBasic(t: *Threaded) Io { .concurrent = concurrent, .await = await, .cancel = cancel, - .cancelRequested = cancelRequested, .select = select, .groupAsync = groupAsync, @@ -428,22 +1026,10 @@ const AsyncClosure = struct { fn start(closure: *Closure) void { const ac: *AsyncClosure = @alignCast(@fieldParentPtr("closure", closure)); - const tid: CancelId = .currentThread(); - if (@cmpxchgStrong(CancelId, &closure.cancel_tid, .none, tid, .acq_rel, .acquire)) |cancel_tid| { - assert(cancel_tid == .canceling); - // Even though we already know the task is canceled, we must still - // run the closure in order to make the return value valid and in - // case there are side effects. - } - current_closure = closure; + const current_thread = Thread.current; + current_thread.current_closure = closure; ac.func(ac.contextPointer(), ac.resultPointer()); - current_closure = null; - - // In case a cancel happens after successful task completion, prevents - // signal from being delivered to the thread in `requestCancel`. 
- if (@cmpxchgStrong(CancelId, &closure.cancel_tid, tid, .none, .acq_rel, .acquire)) |cancel_tid| { - assert(cancel_tid == .canceling); - } + current_thread.current_closure = null; if (@atomicRmw(?*ResetEvent, &ac.select_condition, .Xchg, done_reset_event, .release)) |select_reset| { assert(select_reset != done_reset_event); @@ -464,14 +1050,14 @@ const AsyncClosure = struct { } fn init( - gpa: Allocator, + ac: *AsyncClosure, mode: enum { async, concurrent }, result_len: usize, result_alignment: Alignment, context: []const u8, context_alignment: Alignment, func: *const fn (context: *const anyopaque, result: *anyopaque) void, - ) Allocator.Error!*AsyncClosure { + ) void { const max_context_misalignment = context_alignment.toByteUnits() -| @alignOf(AsyncClosure); const worst_case_context_offset = context_alignment.forward(@sizeOf(AsyncClosure) + max_context_misalignment); const worst_case_result_offset = result_alignment.forward(worst_case_context_offset + context.len); @@ -529,7 +1115,7 @@ fn async( } const t: *Threaded = @ptrCast(@alignCast(userdata)); - const cpu_count = t.getThreadCapacity() orelse { + const may_spawn = takeCapacity(t) catch { return concurrent(userdata, result.len, result_alignment, context, context_alignment, start) catch { start(context.ptr, result.ptr); return null; @@ -538,42 +1124,25 @@ fn async( const gpa = t.allocator; const ac = AsyncClosure.init(gpa, .async, result.len, result_alignment, context, context_alignment, start) catch { + returnCapacity(t); start(context.ptr, result.ptr); return null; }; - t.mutex.lock(); + @memcpy(ac.contextPointer()[0..context.len], context); - const thread_capacity = cpu_count - 1 + t.concurrent_count; + if (may_spawn) { + // TODO Allocate Thread - t.threads.ensureTotalCapacityPrecise(gpa, thread_capacity) catch { - t.mutex.unlock(); - ac.deinit(gpa); - start(context.ptr, result.ptr); - return null; - }; + thread.run_queue.prepend(&ac.closure.node); - t.run_queue.prepend(&ac.closure.node); + // TODO start thread - if (t.threads.items.len < thread_capacity) { - const thread = std.Thread.spawn(.{ .stack_size = t.stack_size }, worker, .{t}) catch { - if (t.threads.items.len == 0) { - assert(t.run_queue.popFirst() == &ac.closure.node); - t.mutex.unlock(); - ac.deinit(gpa); - start(context.ptr, result.ptr); - return null; - } - // Rely on other workers to do it. 
- t.mutex.unlock(); - t.cond.signal(); - return @ptrCast(ac); - }; - t.threads.appendAssumeCapacity(thread); + return @ptrCast(ac); } - t.mutex.unlock(); - t.cond.signal(); + const thread = Thread.current; + thread.run_queue.prepend(&ac.closure.node); return @ptrCast(ac); } @@ -588,7 +1157,7 @@ fn concurrent( if (builtin.single_threaded) return error.ConcurrencyUnavailable; const t: *Threaded = @ptrCast(@alignCast(userdata)); - const cpu_count = t.getThreadCapacity() orelse 1; + const cpu_count = t.getCapacity() orelse 1; const gpa = t.allocator; const ac = AsyncClosure.init(gpa, .concurrent, result_len, result_alignment, context, context_alignment, start) catch { @@ -598,9 +1167,9 @@ fn concurrent( t.mutex.lock(); t.concurrent_count += 1; - const thread_capacity = cpu_count - 1 + t.concurrent_count; + const capacity = cpu_count - 1 + t.concurrent_count; - t.threads.ensureTotalCapacity(gpa, thread_capacity) catch { + t.threads.ensureTotalCapacity(gpa, capacity) catch { t.mutex.unlock(); ac.deinit(gpa); return error.ConcurrencyUnavailable; @@ -608,7 +1177,7 @@ fn concurrent( t.run_queue.prepend(&ac.closure.node); - if (t.threads.items.len < thread_capacity) { + if (t.threads.items.len < capacity) { const thread = std.Thread.spawn(.{ .stack_size = t.stack_size }, worker, .{t}) catch { assert(t.run_queue.popFirst() == &ac.closure.node); t.mutex.unlock(); @@ -635,24 +1204,13 @@ const GroupClosure = struct { fn start(closure: *Closure) void { const gc: *GroupClosure = @alignCast(@fieldParentPtr("closure", closure)); - const tid: CancelId = .currentThread(); + const current_thread = Thread.current; const group = gc.group; const group_state: *std.atomic.Value(usize) = @ptrCast(&group.state); const reset_event: *ResetEvent = @ptrCast(&group.context); - if (@cmpxchgStrong(CancelId, &closure.cancel_tid, .none, tid, .acq_rel, .acquire)) |cancel_tid| { - assert(cancel_tid == .canceling); - // Even though we already know the task is canceled, we must still - // run the closure in case there are side effects. - } - current_closure = closure; + current_thread.current_closure = closure; gc.func(group, gc.contextPointer()); - current_closure = null; - - // In case a cancel happens after successful task completion, prevents - // signal from being delivered to the thread in `requestCancel`. 
- if (@cmpxchgStrong(CancelId, &closure.cancel_tid, tid, .none, .acq_rel, .acquire)) |cancel_tid| { - assert(cancel_tid == .canceling); - } + current_thread.current_closure = null; const prev_state = group_state.fetchSub(sync_one_pending, .acq_rel); assert((prev_state / sync_one_pending) > 0); @@ -717,7 +1275,7 @@ fn groupAsync( if (builtin.single_threaded) return start(group, context.ptr); const t: *Threaded = @ptrCast(@alignCast(userdata)); - const cpu_count = t.getThreadCapacity() orelse 1; + const cpu_count = t.getCapacity() orelse 1; const gpa = t.allocator; const gc = GroupClosure.init(gpa, t, group, context, context_alignment, start) catch { @@ -730,9 +1288,9 @@ fn groupAsync( gc.node = .{ .next = @ptrCast(@alignCast(group.token)) }; group.token = &gc.node; - const thread_capacity = cpu_count - 1 + t.concurrent_count; + const capacity = cpu_count - 1 + t.concurrent_count; - t.threads.ensureTotalCapacityPrecise(gpa, thread_capacity) catch { + t.threads.ensureTotalCapacityPrecise(gpa, capacity) catch { t.mutex.unlock(); gc.deinit(gpa); return start(group, context.ptr); @@ -740,7 +1298,7 @@ fn groupAsync( t.run_queue.prepend(&gc.closure.node); - if (t.threads.items.len < thread_capacity) { + if (t.threads.items.len < capacity) { const thread = std.Thread.spawn(.{ .stack_size = t.stack_size }, worker, .{t}) catch { assert(t.run_queue.popFirst() == &gc.closure.node); t.mutex.unlock(); @@ -775,7 +1333,7 @@ fn groupWait(userdata: ?*anyopaque, group: *Io.Group, token: *anyopaque) void { var node: *std.SinglyLinkedList.Node = @ptrCast(@alignCast(token)); while (true) { const gc: *GroupClosure = @fieldParentPtr("node", node); - gc.closure.requestCancel(); + gc.closure.requestCancel(t); node = node.next orelse break; } reset_event.waitUncancelable(); @@ -801,7 +1359,7 @@ fn groupCancel(userdata: ?*anyopaque, group: *Io.Group, token: *anyopaque) void var node: *std.SinglyLinkedList.Node = @ptrCast(@alignCast(token)); while (true) { const gc: *GroupClosure = @fieldParentPtr("node", node); - gc.closure.requestCancel(); + gc.closure.requestCancel(t); node = node.next orelse break; } } @@ -844,21 +1402,10 @@ fn cancel( _ = result_alignment; const t: *Threaded = @ptrCast(@alignCast(userdata)); const ac: *AsyncClosure = @ptrCast(@alignCast(any_future)); - ac.closure.requestCancel(); + ac.closure.requestCancel(t); ac.waitAndDeinit(t.allocator, result); } -fn cancelRequested(userdata: ?*anyopaque) bool { - const t: *Threaded = @ptrCast(@alignCast(userdata)); - _ = t; - const closure = current_closure orelse return false; - return @atomicLoad(CancelId, &closure.cancel_tid, .acquire) == .canceling; -} - -fn checkCancel(t: *Threaded) error{Canceled}!void { - if (cancelRequested(t)) return error.Canceled; -} - fn mutexLock(userdata: ?*anyopaque, prev_state: Io.Mutex.State, mutex: *Io.Mutex) Io.Cancelable!void { if (builtin.single_threaded) unreachable; // Interface should have prevented this. 
if (native_os == .netbsd) @panic("TODO"); @@ -1043,35 +1590,47 @@ const dirMake = switch (native_os) { fn dirMakePosix(userdata: ?*anyopaque, dir: Io.Dir, sub_path: []const u8, mode: Io.Dir.Mode) Io.Dir.MakeError!void { const t: *Threaded = @ptrCast(@alignCast(userdata)); + _ = t; + const current_thread = Thread.current; var path_buffer: [posix.PATH_MAX]u8 = undefined; const sub_path_posix = try pathToPosix(sub_path, &path_buffer); + try current_thread.beginSyscall(); while (true) { - try t.checkCancel(); switch (posix.errno(posix.system.mkdirat(dir.handle, sub_path_posix, mode))) { - .SUCCESS => return, - .INTR => continue, - .CANCELED => return error.Canceled, - - .ACCES => return error.AccessDenied, - .BADF => |err| return errnoBug(err), // File descriptor used after closed. - .PERM => return error.PermissionDenied, - .DQUOT => return error.DiskQuota, - .EXIST => return error.PathAlreadyExists, - .FAULT => |err| return errnoBug(err), - .LOOP => return error.SymLinkLoop, - .MLINK => return error.LinkQuotaExceeded, - .NAMETOOLONG => return error.NameTooLong, - .NOENT => return error.FileNotFound, - .NOMEM => return error.SystemResources, - .NOSPC => return error.NoSpaceLeft, - .NOTDIR => return error.NotDir, - .ROFS => return error.ReadOnlyFileSystem, - // dragonfly: when dir_fd is unlinked from filesystem - .NOTCONN => return error.FileNotFound, - .ILSEQ => return error.BadPathName, - else => |err| return posix.unexpectedErrno(err), + .SUCCESS => { + try current_thread.endSyscall(); + break; + }, + .INTR => { + try current_thread.checkCancel(); + continue; + }, + else => |e| { + try current_thread.endSyscall(); + switch (e) { + .CANCELED => return error.Canceled, + .ACCES => return error.AccessDenied, + .BADF => |err| return errnoBug(err), // File descriptor used after closed. + .PERM => return error.PermissionDenied, + .DQUOT => return error.DiskQuota, + .EXIST => return error.PathAlreadyExists, + .FAULT => |err| return errnoBug(err), + .LOOP => return error.SymLinkLoop, + .MLINK => return error.LinkQuotaExceeded, + .NAMETOOLONG => return error.NameTooLong, + .NOENT => return error.FileNotFound, + .NOMEM => return error.SystemResources, + .NOSPC => return error.NoSpaceLeft, + .NOTDIR => return error.NotDir, + .ROFS => return error.ReadOnlyFileSystem, + // dragonfly: when dir_fd is unlinked from filesystem + .NOTCONN => return error.FileNotFound, + .ILSEQ => return error.BadPathName, + else => |err| return posix.unexpectedErrno(err), + } + }, } } } @@ -1981,6 +2540,7 @@ fn dirOpenFilePosix( flags: Io.File.OpenFlags, ) Io.File.OpenError!Io.File { const t: *Threaded = @ptrCast(@alignCast(userdata)); + const current_thread = Thread.current; var path_buffer: [posix.PATH_MAX]u8 = undefined; const sub_path_posix = try pathToPosix(sub_path, &path_buffer); @@ -2017,40 +2577,52 @@ fn dirOpenFilePosix( }, }; - const fd: posix.fd_t = while (true) { - try t.checkCancel(); + try current_thread.beginSyscall(); + const fd = while (true) { const rc = openat_sym(dir.handle, sub_path_posix, os_flags, @as(posix.mode_t, 0)); switch (posix.errno(rc)) { - .SUCCESS => break @intCast(rc), - .INTR => continue, - .CANCELED => return error.Canceled, - - .FAULT => |err| return errnoBug(err), - .INVAL => return error.BadPathName, - .BADF => |err| return errnoBug(err), // File descriptor used after closed. 
- .ACCES => return error.AccessDenied, - .FBIG => return error.FileTooBig, - .OVERFLOW => return error.FileTooBig, - .ISDIR => return error.IsDir, - .LOOP => return error.SymLinkLoop, - .MFILE => return error.ProcessFdQuotaExceeded, - .NAMETOOLONG => return error.NameTooLong, - .NFILE => return error.SystemFdQuotaExceeded, - .NODEV => return error.NoDevice, - .NOENT => return error.FileNotFound, - .SRCH => return error.ProcessNotFound, - .NOMEM => return error.SystemResources, - .NOSPC => return error.NoSpaceLeft, - .NOTDIR => return error.NotDir, - .PERM => return error.PermissionDenied, - .EXIST => return error.PathAlreadyExists, - .BUSY => return error.DeviceBusy, - .OPNOTSUPP => return error.FileLocksNotSupported, - .AGAIN => return error.WouldBlock, - .TXTBSY => return error.FileBusy, - .NXIO => return error.NoDevice, - .ILSEQ => return error.BadPathName, - else => |err| return posix.unexpectedErrno(err), + .SUCCESS => { + const fd: posix.fd_t = @intCast(rc); + errdefer posix.close(fd); + try current_thread.endSyscall(); + break fd; + }, + .INTR => { + try current_thread.checkCancel(); + continue; + }, + else => |e| { + try current_thread.endSyscall(); + switch (e) { + .CANCELED => return error.Canceled, + .FAULT => |err| return errnoBug(err), + .INVAL => return error.BadPathName, + .BADF => |err| return errnoBug(err), // File descriptor used after closed. + .ACCES => return error.AccessDenied, + .FBIG => return error.FileTooBig, + .OVERFLOW => return error.FileTooBig, + .ISDIR => return error.IsDir, + .LOOP => return error.SymLinkLoop, + .MFILE => return error.ProcessFdQuotaExceeded, + .NAMETOOLONG => return error.NameTooLong, + .NFILE => return error.SystemFdQuotaExceeded, + .NODEV => return error.NoDevice, + .NOENT => return error.FileNotFound, + .SRCH => return error.ProcessNotFound, + .NOMEM => return error.SystemResources, + .NOSPC => return error.NoSpaceLeft, + .NOTDIR => return error.NotDir, + .PERM => return error.PermissionDenied, + .EXIST => return error.PathAlreadyExists, + .BUSY => return error.DeviceBusy, + .OPNOTSUPP => return error.FileLocksNotSupported, + .AGAIN => return error.WouldBlock, + .TXTBSY => return error.FileBusy, + .NXIO => return error.NoDevice, + .ILSEQ => return error.BadPathName, + else => |err| return posix.unexpectedErrno(err), + } + }, } }; errdefer posix.close(fd); @@ -6208,6 +6780,7 @@ fn initializeWsa(t: *Threaded) error{NetworkDown}!void { fn doNothingSignalHandler(_: posix.SIG) callconv(.c) void {} + test { _ = @import("Threaded/test.zig"); } diff --git a/lib/std/Thread.zig b/lib/std/Thread.zig index 07b3c9076b..b917924b61 100644 --- a/lib/std/Thread.zig +++ b/lib/std/Thread.zig @@ -1,6 +1,6 @@ -//! This struct represents a kernel thread, and acts as a namespace for concurrency -//! primitives that operate on kernel threads. For concurrency primitives that support -//! both evented I/O and async I/O, see the respective names in the top level std namespace. +//! This struct represents a kernel thread, and acts as a namespace for +//! concurrency primitives that operate on kernel threads. For concurrency +//! primitives that interact with the I/O interface, see `std.Io`. 
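+//!
+//! A minimal usage sketch (`workFn` and `arg` are hypothetical placeholders):
+//!
+//!     const handle = try std.Thread.spawn(.{}, workFn, .{arg});
+//!     handle.join();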
const std = @import("std.zig"); const builtin = @import("builtin"); @@ -20,7 +20,7 @@ pub const RwLock = @import("Thread/RwLock.zig"); pub const Pool = @import("Thread/Pool.zig"); pub const WaitGroup = @import("Thread/WaitGroup.zig"); -pub const use_pthreads = native_os != .windows and native_os != .wasi and builtin.link_libc; +pub const use_pthreads = std.Io.Threaded.use_pthreads; /// A thread-safe logical boolean value which can be `set` and `unset`. /// @@ -422,12 +422,7 @@ pub fn getCurrentId() Id { return Impl.getCurrentId(); } -pub const CpuCountError = error{ - PermissionDenied, - SystemResources, - Unsupported, - Unexpected, -}; +pub const CpuCountError = std.Io.Threaded.CpuCountError; /// Returns the platforms view on the number of logical CPU cores available. /// @@ -446,7 +441,7 @@ pub const SpawnConfig = struct { /// The allocator to be used to allocate memory for the to-be-spawned thread allocator: ?std.mem.Allocator = null, - pub const default_stack_size = 16 * 1024 * 1024; + pub const default_stack_size = std.Io.Threaded.default_stack_size; }; pub const SpawnError = error{ @@ -1215,308 +1210,6 @@ const LinuxThreadImpl = struct { thread: *ThreadCompletion, - const ThreadCompletion = struct { - completion: Completion = Completion.init(.running), - child_tid: std.atomic.Value(i32) = std.atomic.Value(i32).init(1), - parent_tid: i32 = undefined, - mapped: []align(std.heap.page_size_min) u8, - - /// Calls `munmap(mapped.ptr, mapped.len)` then `exit(1)` without touching the stack (which lives in `mapped.ptr`). - /// Ported over from musl libc's pthread detached implementation: - /// https://github.com/ifduyue/musl/search?q=__unmapself - fn freeAndExit(self: *ThreadCompletion) noreturn { - switch (target.cpu.arch) { - .x86 => asm volatile ( - \\ movl $91, %%eax # SYS_munmap - \\ movl %[ptr], %%ebx - \\ movl %[len], %%ecx - \\ int $128 - \\ movl $1, %%eax # SYS_exit - \\ movl $0, %%ebx - \\ int $128 - : - : [ptr] "r" (@intFromPtr(self.mapped.ptr)), - [len] "r" (self.mapped.len), - : .{ .memory = true }), - .x86_64 => asm volatile (switch (target.abi) { - .gnux32, .muslx32 => - \\ movl $0x4000000b, %%eax # SYS_munmap - \\ syscall - \\ movl $0x4000003c, %%eax # SYS_exit - \\ xor %%rdi, %%rdi - \\ syscall - , - else => - \\ movl $11, %%eax # SYS_munmap - \\ syscall - \\ movl $60, %%eax # SYS_exit - \\ xor %%rdi, %%rdi - \\ syscall - , - } - : - : [ptr] "{rdi}" (@intFromPtr(self.mapped.ptr)), - [len] "{rsi}" (self.mapped.len), - ), - .arm, .armeb, .thumb, .thumbeb => asm volatile ( - \\ mov r7, #91 // SYS_munmap - \\ mov r0, %[ptr] - \\ mov r1, %[len] - \\ svc 0 - \\ mov r7, #1 // SYS_exit - \\ mov r0, #0 - \\ svc 0 - : - : [ptr] "r" (@intFromPtr(self.mapped.ptr)), - [len] "r" (self.mapped.len), - : .{ .memory = true }), - .aarch64, .aarch64_be => asm volatile ( - \\ mov x8, #215 // SYS_munmap - \\ mov x0, %[ptr] - \\ mov x1, %[len] - \\ svc 0 - \\ mov x8, #93 // SYS_exit - \\ mov x0, #0 - \\ svc 0 - : - : [ptr] "r" (@intFromPtr(self.mapped.ptr)), - [len] "r" (self.mapped.len), - : .{ .memory = true }), - .alpha => asm volatile ( - \\ ldi $0, 73 # SYS_munmap - \\ mov %[ptr], $16 - \\ mov %[len], $17 - \\ callsys - \\ ldi $0, 1 # SYS_exit - \\ ldi $16, 0 - \\ callsys - : - : [ptr] "r" (@intFromPtr(self.mapped.ptr)), - [len] "r" (self.mapped.len), - : .{ .memory = true }), - .hexagon => asm volatile ( - \\ r6 = #215 // SYS_munmap - \\ r0 = %[ptr] - \\ r1 = %[len] - \\ trap0(#1) - \\ r6 = #93 // SYS_exit - \\ r0 = #0 - \\ trap0(#1) - : - : [ptr] "r" (@intFromPtr(self.mapped.ptr)), - [len] 
"r" (self.mapped.len), - : .{ .memory = true }), - .hppa => asm volatile ( - \\ ldi 91, %%r20 /* SYS_munmap */ - \\ copy %[ptr], %%r26 - \\ copy %[len], %%r25 - \\ ble 0x100(%%sr2, %%r0) - \\ ldi 1, %%r20 /* SYS_exit */ - \\ ldi 0, %%r26 - \\ ble 0x100(%%sr2, %%r0) - : - : [ptr] "r" (@intFromPtr(self.mapped.ptr)), - [len] "r" (self.mapped.len), - : .{ .memory = true }), - .m68k => asm volatile ( - \\ move.l #91, %%d0 // SYS_munmap - \\ move.l %[ptr], %%d1 - \\ move.l %[len], %%d2 - \\ trap #0 - \\ move.l #1, %%d0 // SYS_exit - \\ move.l #0, %%d1 - \\ trap #0 - : - : [ptr] "r" (@intFromPtr(self.mapped.ptr)), - [len] "r" (self.mapped.len), - : .{ .memory = true }), - .microblaze, .microblazeel => asm volatile ( - \\ ori r12, r0, 91 # SYS_munmap - \\ ori r5, %[ptr], 0 - \\ ori r6, %[len], 0 - \\ brki r14, 0x8 - \\ ori r12, r0, 1 # SYS_exit - \\ or r5, r0, r0 - \\ brki r14, 0x8 - : - : [ptr] "r" (@intFromPtr(self.mapped.ptr)), - [len] "r" (self.mapped.len), - : .{ .memory = true }), - // We set `sp` to the address of the current function as a workaround for a Linux - // kernel bug that caused syscalls to return EFAULT if the stack pointer is invalid. - // The bug was introduced in 46e12c07b3b9603c60fc1d421ff18618241cb081 and fixed in - // 7928eb0370d1133d0d8cd2f5ddfca19c309079d5. - .mips, .mipsel => asm volatile ( - \\ move $sp, $t9 - \\ li $v0, 4091 # SYS_munmap - \\ move $a0, %[ptr] - \\ move $a1, %[len] - \\ syscall - \\ li $v0, 4001 # SYS_exit - \\ li $a0, 0 - \\ syscall - : - : [ptr] "r" (@intFromPtr(self.mapped.ptr)), - [len] "r" (self.mapped.len), - : .{ .memory = true }), - .mips64, .mips64el => asm volatile (switch (target.abi) { - .gnuabin32, .muslabin32 => - \\ li $v0, 6011 # SYS_munmap - \\ move $a0, %[ptr] - \\ move $a1, %[len] - \\ syscall - \\ li $v0, 6058 # SYS_exit - \\ li $a0, 0 - \\ syscall - , - else => - \\ li $v0, 5011 # SYS_munmap - \\ move $a0, %[ptr] - \\ move $a1, %[len] - \\ syscall - \\ li $v0, 5058 # SYS_exit - \\ li $a0, 0 - \\ syscall - , - } - : - : [ptr] "r" (@intFromPtr(self.mapped.ptr)), - [len] "r" (self.mapped.len), - : .{ .memory = true }), - .or1k => asm volatile ( - \\ l.ori r11, r0, 215 # SYS_munmap - \\ l.ori r3, %[ptr] - \\ l.ori r4, %[len] - \\ l.sys 1 - \\ l.ori r11, r0, 93 # SYS_exit - \\ l.ori r3, r0, r0 - \\ l.sys 1 - : - : [ptr] "r" (@intFromPtr(self.mapped.ptr)), - [len] "r" (self.mapped.len), - : .{ .memory = true }), - .powerpc, .powerpcle, .powerpc64, .powerpc64le => asm volatile ( - \\ li 0, 91 # SYS_munmap - \\ mr 3, %[ptr] - \\ mr 4, %[len] - \\ sc - \\ li 0, 1 # SYS_exit - \\ li 3, 0 - \\ sc - \\ blr - : - : [ptr] "r" (@intFromPtr(self.mapped.ptr)), - [len] "r" (self.mapped.len), - : .{ .memory = true }), - .riscv32, .riscv64 => asm volatile ( - \\ li a7, 215 # SYS_munmap - \\ mv a0, %[ptr] - \\ mv a1, %[len] - \\ ecall - \\ li a7, 93 # SYS_exit - \\ mv a0, zero - \\ ecall - : - : [ptr] "r" (@intFromPtr(self.mapped.ptr)), - [len] "r" (self.mapped.len), - : .{ .memory = true }), - .s390x => asm volatile ( - \\ lgr %%r2, %[ptr] - \\ lgr %%r3, %[len] - \\ svc 91 # SYS_munmap - \\ lghi %%r2, 0 - \\ svc 1 # SYS_exit - : - : [ptr] "r" (@intFromPtr(self.mapped.ptr)), - [len] "r" (self.mapped.len), - : .{ .memory = true }), - .sh, .sheb => asm volatile ( - \\ mov #91, r3 ! SYS_munmap - \\ mov %[ptr], r4 - \\ mov %[len], r5 - \\ trapa #31 - \\ or r0, r0 - \\ or r0, r0 - \\ or r0, r0 - \\ or r0, r0 - \\ or r0, r0 - \\ mov #1, r3 ! 
SYS_exit - \\ mov #0, r4 - \\ trapa #31 - \\ or r0, r0 - \\ or r0, r0 - \\ or r0, r0 - \\ or r0, r0 - \\ or r0, r0 - : - : [ptr] "r" (@intFromPtr(self.mapped.ptr)), - [len] "r" (self.mapped.len), - : .{ .memory = true }), - .sparc => asm volatile ( - \\ # See sparc64 comments below. - \\ 1: - \\ cmp %%fp, 0 - \\ beq 2f - \\ nop - \\ ba 1b - \\ restore - \\ 2: - \\ mov 73, %%g1 // SYS_munmap - \\ mov %[ptr], %%o0 - \\ mov %[len], %%o1 - \\ t 0x3 # ST_FLUSH_WINDOWS - \\ t 0x10 - \\ mov 1, %%g1 // SYS_exit - \\ mov 0, %%o0 - \\ t 0x10 - : - : [ptr] "r" (@intFromPtr(self.mapped.ptr)), - [len] "r" (self.mapped.len), - : .{ .memory = true }), - .sparc64 => asm volatile ( - \\ # SPARCs really don't like it when active stack frames - \\ # is unmapped (it will result in a segfault), so we - \\ # force-deactivate it by running `restore` until - \\ # all frames are cleared. - \\ 1: - \\ cmp %%fp, 0 - \\ beq 2f - \\ nop - \\ ba 1b - \\ restore - \\ 2: - \\ mov 73, %%g1 // SYS_munmap - \\ mov %[ptr], %%o0 - \\ mov %[len], %%o1 - \\ # Flush register window contents to prevent background - \\ # memory access before unmapping the stack. - \\ flushw - \\ t 0x6d - \\ mov 1, %%g1 // SYS_exit - \\ mov 0, %%o0 - \\ t 0x6d - : - : [ptr] "r" (@intFromPtr(self.mapped.ptr)), - [len] "r" (self.mapped.len), - : .{ .memory = true }), - .loongarch32, .loongarch64 => asm volatile ( - \\ or $a0, $zero, %[ptr] - \\ or $a1, $zero, %[len] - \\ ori $a7, $zero, 215 # SYS_munmap - \\ syscall 0 # call munmap - \\ ori $a0, $zero, 0 - \\ ori $a7, $zero, 93 # SYS_exit - \\ syscall 0 # call exit - : - : [ptr] "r" (@intFromPtr(self.mapped.ptr)), - [len] "r" (self.mapped.len), - : .{ .memory = true }), - else => |cpu_arch| @compileError("Unsupported linux arch: " ++ @tagName(cpu_arch)), - } - unreachable; - } - }; - fn spawn(config: SpawnConfig, comptime f: anytype, args: anytype) !Impl { const page_size = std.heap.pageSize(); const Args = @TypeOf(args);
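
For reference, the cancellation handshake introduced in Threaded.zig can be modeled in isolation. The sketch below is hypothetical and simplified: a bare u64 stands in for `CancelStatus` (whose integer value doubles as a `Thread.SignalId`), `sendCancelSignal` is a placeholder for the `pthread_kill`/`tgkill` delivery of `SIG.IO`, and the memory orderings are illustrative rather than copied from the patch. Only the state transitions mirror the new code.

const std = @import("std");

// Hypothetical, simplified model of `CancelStatus`: 0 = none, the two top
// values are `requested`/`acknowledged`, anything else is a signal id.
const none: u64 = 0;
const requested: u64 = std.math.maxInt(u64) - 1;
const acknowledged: u64 = std.math.maxInt(u64);

var status: u64 = none;

fn sendCancelSignal(signal_id: u64) void {
    // Hypothetical stand-in: the real code sends SIG.IO with
    // pthread_kill (pthreads) or tgkill (bare Linux).
    _ = signal_id;
}

// Runner side: publish this thread's signal id before blocking. If a cancel
// request already raced in, acknowledge it and skip the syscall.
fn beginSyscall(signal_id: u64) error{Canceled}!void {
    if (@cmpxchgStrong(u64, &status, none, signal_id, .acq_rel, .acquire)) |seen| {
        std.debug.assert(seen == requested);
        @atomicStore(u64, &status, acknowledged, .release);
        return error.Canceled;
    }
}

// Runner side: retract the signal id after the syscall returns. Seeing
// `requested` means a cancel arrived mid-syscall; `acknowledged` means it
// was already observed, e.g. by `checkCancel` on an EINTR retry.
fn endSyscall(signal_id: u64) error{Canceled}!void {
    if (@cmpxchgStrong(u64, &status, signal_id, none, .acq_rel, .acquire)) |seen| {
        if (seen == requested) {
            @atomicStore(u64, &status, acknowledged, .release);
            return error.Canceled;
        }
        std.debug.assert(seen == acknowledged);
    }
}

// Canceler side: flip the status to `requested`; if a signal id was
// published, the target is in (or entering) a blocking syscall and must be
// signaled. A signal that lands before the syscall instruction is lost, so
// delivery is retried (bounded, like the patch's `max_attempts`).
fn requestCancel() void {
    var signal_id = switch (@atomicRmw(u64, &status, .Xchg, requested, .monotonic)) {
        none, requested, acknowledged => return,
        else => |id| id,
    };
    for (0..3) |_| {
        sendCancelSignal(signal_id);
        std.Thread.yield() catch {};
        switch (@atomicRmw(u64, &status, .Xchg, requested, .monotonic)) {
            requested => continue,
            none, acknowledged => return,
            else => |id| signal_id = id,
        }
    }
}

The exchange back to `requested` on every retry matters: if the runner re-published a fresh signal id in the meantime, the canceler both re-arms the request and learns the new id in one atomic step.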