mirror of
https://github.com/ziglang/zig.git
synced 2026-02-15 13:58:27 +00:00
Merge pull request #16207 from Luukdegram/wasi-threads
WASI: Implement experimental threading support
This commit is contained in:
commit
622c5f3200
@ -28,6 +28,8 @@ else if (use_pthreads)
|
||||
PosixThreadImpl
|
||||
else if (target.os.tag == .linux)
|
||||
LinuxThreadImpl
|
||||
else if (target.os.tag == .wasi)
|
||||
WasiThreadImpl
|
||||
else
|
||||
UnsupportedImpl;
|
||||
|
||||
@ -266,6 +268,7 @@ pub const Id = switch (target.os.tag) {
|
||||
.freebsd,
|
||||
.openbsd,
|
||||
.haiku,
|
||||
.wasi,
|
||||
=> u32,
|
||||
.macos, .ios, .watchos, .tvos => u64,
|
||||
.windows => os.windows.DWORD,
|
||||
@ -296,6 +299,8 @@ pub const SpawnConfig = struct {
|
||||
|
||||
/// Size in bytes of the Thread's stack
|
||||
stack_size: usize = 16 * 1024 * 1024,
|
||||
/// The allocator to be used to allocate memory for the to-be-spawned thread
|
||||
allocator: ?std.mem.Allocator = null,
|
||||
};
|
||||
|
||||
pub const SpawnError = error{
|
||||
@ -733,6 +738,291 @@ const PosixThreadImpl = struct {
|
||||
}
|
||||
};
|
||||
|
||||
const WasiThreadImpl = struct {
|
||||
thread: *WasiThread,
|
||||
|
||||
pub const ThreadHandle = i32;
|
||||
threadlocal var tls_thread_id: Id = 0;
|
||||
|
||||
const WasiThread = struct {
|
||||
/// Thread ID
|
||||
tid: Atomic(i32) = Atomic(i32).init(0),
|
||||
/// Contains all memory which was allocated to bootstrap this thread, including:
|
||||
/// - Guard page
|
||||
/// - Stack
|
||||
/// - TLS segment
|
||||
/// - `Instance`
|
||||
/// All memory is freed upon call to `join`
|
||||
memory: []u8,
|
||||
/// The allocator used to allocate the thread's memory,
|
||||
/// which is also used during `join` to ensure clean-up.
|
||||
allocator: std.mem.Allocator,
|
||||
/// The current state of the thread.
|
||||
state: State = State.init(.running),
|
||||
};
|
||||
|
||||
/// A meta-data structure used to bootstrap a thread
|
||||
const Instance = struct {
|
||||
thread: WasiThread,
|
||||
/// Contains the offset to the new __tls_base.
|
||||
/// The offset starting from the memory's base.
|
||||
tls_offset: usize,
|
||||
/// Contains the offset to the stack for the newly spawned thread.
|
||||
/// The offset is calculated starting from the memory's base.
|
||||
stack_offset: usize,
|
||||
/// Contains the raw pointer value to the wrapper which holds all arguments
|
||||
/// for the callback.
|
||||
raw_ptr: usize,
|
||||
/// Function pointer to a wrapping function which will call the user's
|
||||
/// function upon thread spawn. The above mentioned pointer will be passed
|
||||
/// to this function pointer as its argument.
|
||||
call_back: *const fn (usize) void,
|
||||
/// When a thread is in `detached` state, we must free all of its memory
|
||||
/// upon thread completion. However, as this is done while still within
|
||||
/// the thread, we must first jump back to the main thread's stack or else
|
||||
/// we end up freeing the stack that we're currently using.
|
||||
original_stack_pointer: [*]u8,
|
||||
};
|
||||
|
||||
const State = Atomic(enum(u8) { running, completed, detached });
|
||||
|
||||
fn getCurrentId() Id {
|
||||
return tls_thread_id;
|
||||
}
|
||||
|
||||
fn getHandle(self: Impl) ThreadHandle {
|
||||
return self.thread.tid.load(.SeqCst);
|
||||
}
|
||||
|
||||
fn detach(self: Impl) void {
|
||||
switch (self.thread.state.swap(.detached, .SeqCst)) {
|
||||
.running => {},
|
||||
.completed => self.join(),
|
||||
.detached => unreachable,
|
||||
}
|
||||
}
|
||||
|
||||
fn join(self: Impl) void {
|
||||
defer {
|
||||
// Create a copy of the allocator so we do not free the reference to the
|
||||
// original allocator while freeing the memory.
|
||||
var allocator = self.thread.allocator;
|
||||
allocator.free(self.thread.memory);
|
||||
}
|
||||
|
||||
var spin: u8 = 10;
|
||||
while (true) {
|
||||
const tid = self.thread.tid.load(.SeqCst);
|
||||
if (tid == 0) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (spin > 0) {
|
||||
spin -= 1;
|
||||
std.atomic.spinLoopHint();
|
||||
continue;
|
||||
}
|
||||
|
||||
const result = asm (
|
||||
\\ local.get %[ptr]
|
||||
\\ local.get %[expected]
|
||||
\\ i64.const -1 # infinite
|
||||
\\ memory.atomic.wait32 0
|
||||
\\ local.set %[ret]
|
||||
: [ret] "=r" (-> u32),
|
||||
: [ptr] "r" (&self.thread.tid.value),
|
||||
[expected] "r" (tid),
|
||||
);
|
||||
switch (result) {
|
||||
0 => continue, // ok
|
||||
1 => continue, // expected =! loaded
|
||||
2 => unreachable, // timeout (infinite)
|
||||
else => unreachable,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn spawn(config: std.Thread.SpawnConfig, comptime f: anytype, args: anytype) !WasiThreadImpl {
|
||||
if (config.allocator == null) return error.OutOfMemory; // an allocator is required to spawn a WASI-thread
|
||||
|
||||
// Wrapping struct required to hold the user-provided function arguments.
|
||||
const Wrapper = struct {
|
||||
args: @TypeOf(args),
|
||||
fn entry(ptr: usize) void {
|
||||
const w: *@This() = @ptrFromInt(ptr);
|
||||
@call(.auto, f, w.args);
|
||||
}
|
||||
};
|
||||
|
||||
var stack_offset: usize = undefined;
|
||||
var tls_offset: usize = undefined;
|
||||
var wrapper_offset: usize = undefined;
|
||||
var instance_offset: usize = undefined;
|
||||
|
||||
// Calculate the bytes we have to allocate to store all thread information, including:
|
||||
// - The actual stack for the thread
|
||||
// - The TLS segment
|
||||
// - `Instance` - containing information about how to call the user's function.
|
||||
const map_bytes = blk: {
|
||||
// start with atleast a single page, which is used as a guard to prevent
|
||||
// other threads clobbering our new thread.
|
||||
// Unfortunately, WebAssembly has no notion of read-only segments, so this
|
||||
// is only a best effort.
|
||||
var bytes: usize = std.wasm.page_size;
|
||||
|
||||
bytes = std.mem.alignForward(usize, bytes, 16); // align stack to 16 bytes
|
||||
stack_offset = bytes;
|
||||
bytes += @max(std.wasm.page_size, config.stack_size);
|
||||
|
||||
bytes = std.mem.alignForward(usize, bytes, __tls_align());
|
||||
tls_offset = bytes;
|
||||
bytes += __tls_size();
|
||||
|
||||
bytes = std.mem.alignForward(usize, bytes, @alignOf(Wrapper));
|
||||
wrapper_offset = bytes;
|
||||
bytes += @sizeOf(Wrapper);
|
||||
|
||||
bytes = std.mem.alignForward(usize, bytes, @alignOf(Instance));
|
||||
instance_offset = bytes;
|
||||
bytes += @sizeOf(Instance);
|
||||
|
||||
bytes = std.mem.alignForward(usize, bytes, std.wasm.page_size);
|
||||
break :blk bytes;
|
||||
};
|
||||
|
||||
// Allocate the amount of memory required for all meta data.
|
||||
const allocated_memory = try config.allocator.?.alloc(u8, map_bytes);
|
||||
|
||||
const wrapper: *Wrapper = @ptrCast(@alignCast(&allocated_memory[wrapper_offset]));
|
||||
wrapper.* = .{ .args = args };
|
||||
|
||||
const instance: *Instance = @ptrCast(@alignCast(&allocated_memory[instance_offset]));
|
||||
instance.* = .{
|
||||
.thread = .{ .memory = allocated_memory, .allocator = config.allocator.? },
|
||||
.tls_offset = tls_offset,
|
||||
.stack_offset = stack_offset,
|
||||
.raw_ptr = @intFromPtr(wrapper),
|
||||
.call_back = &Wrapper.entry,
|
||||
.original_stack_pointer = __get_stack_pointer(),
|
||||
};
|
||||
|
||||
const tid = spawnWasiThread(instance);
|
||||
// The specification says any value lower than 0 indicates an error.
|
||||
// The values of such error are unspecified. WASI-Libc treats it as EAGAIN.
|
||||
if (tid < 0) {
|
||||
return error.SystemResources;
|
||||
}
|
||||
instance.thread.tid.store(tid, .SeqCst);
|
||||
|
||||
return .{ .thread = &instance.thread };
|
||||
}
|
||||
|
||||
/// Bootstrap procedure, called by the host environment after thread creation.
|
||||
export fn wasi_thread_start(tid: i32, arg: *Instance) void {
|
||||
if (builtin.single_threaded) {
|
||||
// ensure function is not analyzed in single-threaded mode
|
||||
return;
|
||||
}
|
||||
__set_stack_pointer(arg.thread.memory.ptr + arg.stack_offset);
|
||||
__wasm_init_tls(arg.thread.memory.ptr + arg.tls_offset);
|
||||
@atomicStore(u32, &WasiThreadImpl.tls_thread_id, @intCast(tid), .SeqCst);
|
||||
|
||||
// Finished bootstrapping, call user's procedure.
|
||||
arg.call_back(arg.raw_ptr);
|
||||
|
||||
switch (arg.thread.state.swap(.completed, .SeqCst)) {
|
||||
.running => {
|
||||
// reset the Thread ID
|
||||
asm volatile (
|
||||
\\ local.get %[ptr]
|
||||
\\ i32.const 0
|
||||
\\ i32.atomic.store 0
|
||||
:
|
||||
: [ptr] "r" (&arg.thread.tid.value),
|
||||
);
|
||||
|
||||
// Wake the main thread listening to this thread
|
||||
asm volatile (
|
||||
\\ local.get %[ptr]
|
||||
\\ i32.const 1 # waiters
|
||||
\\ memory.atomic.notify 0
|
||||
\\ drop # no need to know the waiters
|
||||
:
|
||||
: [ptr] "r" (&arg.thread.tid.value),
|
||||
);
|
||||
},
|
||||
.completed => unreachable,
|
||||
.detached => {
|
||||
// restore the original stack pointer so we can free the memory
|
||||
// without having to worry about freeing the stack
|
||||
__set_stack_pointer(arg.original_stack_pointer);
|
||||
// Ensure a copy so we don't free the allocator reference itself
|
||||
var allocator = arg.thread.allocator;
|
||||
allocator.free(arg.thread.memory);
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
/// Asks the host to create a new thread for us.
|
||||
/// Newly created thread will call `wasi_tread_start` with the thread ID as well
|
||||
/// as the input `arg` that was provided to `spawnWasiThread`
|
||||
const spawnWasiThread = @"thread-spawn";
|
||||
extern "wasi" fn @"thread-spawn"(arg: *Instance) i32;
|
||||
|
||||
/// Initializes the TLS data segment starting at `memory`.
|
||||
/// This is a synthetic function, generated by the linker.
|
||||
extern fn __wasm_init_tls(memory: [*]u8) void;
|
||||
|
||||
/// Returns a pointer to the base of the TLS data segment for the current thread
|
||||
inline fn __tls_base() [*]u8 {
|
||||
return asm (
|
||||
\\ .globaltype __tls_base, i32
|
||||
\\ global.get __tls_base
|
||||
\\ local.set %[ret]
|
||||
: [ret] "=r" (-> [*]u8),
|
||||
);
|
||||
}
|
||||
|
||||
/// Returns the size of the TLS segment
|
||||
inline fn __tls_size() u32 {
|
||||
return asm volatile (
|
||||
\\ .globaltype __tls_size, i32, immutable
|
||||
\\ global.get __tls_size
|
||||
\\ local.set %[ret]
|
||||
: [ret] "=r" (-> u32),
|
||||
);
|
||||
}
|
||||
|
||||
/// Returns the alignment of the TLS segment
|
||||
inline fn __tls_align() u32 {
|
||||
return asm (
|
||||
\\ .globaltype __tls_align, i32, immutable
|
||||
\\ global.get __tls_align
|
||||
\\ local.set %[ret]
|
||||
: [ret] "=r" (-> u32),
|
||||
);
|
||||
}
|
||||
|
||||
/// Allows for setting the stack pointer in the WebAssembly module.
|
||||
inline fn __set_stack_pointer(addr: [*]u8) void {
|
||||
asm volatile (
|
||||
\\ local.get %[ptr]
|
||||
\\ global.set __stack_pointer
|
||||
:
|
||||
: [ptr] "r" (addr),
|
||||
);
|
||||
}
|
||||
|
||||
/// Returns the current value of the stack pointer
|
||||
inline fn __get_stack_pointer() [*]u8 {
|
||||
return asm (
|
||||
\\ global.get __stack_pointer
|
||||
\\ local.set %[stack_ptr]
|
||||
: [stack_ptr] "=r" (-> [*]u8),
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
const LinuxThreadImpl = struct {
|
||||
const linux = os.linux;
|
||||
|
||||
|
||||
@ -73,6 +73,8 @@ else if (builtin.os.tag == .openbsd)
|
||||
OpenbsdImpl
|
||||
else if (builtin.os.tag == .dragonfly)
|
||||
DragonflyImpl
|
||||
else if (builtin.target.isWasm())
|
||||
WasmImpl
|
||||
else if (std.Thread.use_pthreads)
|
||||
PosixImpl
|
||||
else
|
||||
@ -446,6 +448,49 @@ const DragonflyImpl = struct {
|
||||
}
|
||||
};
|
||||
|
||||
const WasmImpl = struct {
|
||||
fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{Timeout}!void {
|
||||
if (!comptime std.Target.wasm.featureSetHas(builtin.target.cpu.features, .atomics)) {
|
||||
@compileError("WASI target missing cpu feature 'atomics'");
|
||||
}
|
||||
const to: i64 = if (timeout) |to| @intCast(to) else -1;
|
||||
const result = asm (
|
||||
\\local.get %[ptr]
|
||||
\\local.get %[expected]
|
||||
\\local.get %[timeout]
|
||||
\\memory.atomic.wait32 0
|
||||
\\local.set %[ret]
|
||||
: [ret] "=r" (-> u32),
|
||||
: [ptr] "r" (&ptr.value),
|
||||
[expected] "r" (@as(i32, @bitCast(expect))),
|
||||
[timeout] "r" (to),
|
||||
);
|
||||
switch (result) {
|
||||
0 => {}, // ok
|
||||
1 => {}, // expected =! loaded
|
||||
2 => return error.Timeout,
|
||||
else => unreachable,
|
||||
}
|
||||
}
|
||||
|
||||
fn wake(ptr: *const Atomic(u32), max_waiters: u32) void {
|
||||
if (!comptime std.Target.wasm.featureSetHas(builtin.target.cpu.features, .atomics)) {
|
||||
@compileError("WASI target missing cpu feature 'atomics'");
|
||||
}
|
||||
assert(max_waiters != 0);
|
||||
const woken_count = asm (
|
||||
\\local.get %[ptr]
|
||||
\\local.get %[waiters]
|
||||
\\memory.atomic.notify 0
|
||||
\\local.set %[ret]
|
||||
: [ret] "=r" (-> u32),
|
||||
: [ptr] "r" (&ptr.value),
|
||||
[waiters] "r" (max_waiters),
|
||||
);
|
||||
_ = woken_count; // can be 0 when linker flag 'shared-memory' is not enabled
|
||||
}
|
||||
};
|
||||
|
||||
/// Modified version of linux's futex and Go's sema to implement userspace wait queues with pthread:
|
||||
/// https://code.woboq.org/linux/linux/kernel/futex.c.html
|
||||
/// https://go.dev/src/runtime/sema.go
|
||||
|
||||
@ -557,6 +557,7 @@ pub const InitOptions = struct {
|
||||
linker_allow_shlib_undefined: ?bool = null,
|
||||
linker_bind_global_refs_locally: ?bool = null,
|
||||
linker_import_memory: ?bool = null,
|
||||
linker_export_memory: ?bool = null,
|
||||
linker_import_symbols: bool = false,
|
||||
linker_import_table: bool = false,
|
||||
linker_export_table: bool = false,
|
||||
@ -1463,6 +1464,7 @@ pub fn create(gpa: Allocator, options: InitOptions) !*Compilation {
|
||||
.module_definition_file = options.linker_module_definition_file,
|
||||
.sort_section = options.linker_sort_section,
|
||||
.import_memory = options.linker_import_memory orelse false,
|
||||
.export_memory = options.linker_export_memory orelse !(options.linker_import_memory orelse false),
|
||||
.import_symbols = options.linker_import_symbols,
|
||||
.import_table = options.linker_import_table,
|
||||
.export_table = options.linker_export_table,
|
||||
@ -2324,6 +2326,7 @@ fn addNonIncrementalStuffToCacheManifest(comp: *Compilation, man: *Cache.Manifes
|
||||
|
||||
// WASM specific stuff
|
||||
man.hash.add(comp.bin_file.options.import_memory);
|
||||
man.hash.add(comp.bin_file.options.export_memory);
|
||||
man.hash.addOptional(comp.bin_file.options.initial_memory);
|
||||
man.hash.addOptional(comp.bin_file.options.max_memory);
|
||||
man.hash.add(comp.bin_file.options.shared_memory);
|
||||
|
||||
@ -133,6 +133,7 @@ pub const Options = struct {
|
||||
compress_debug_sections: CompressDebugSections,
|
||||
bind_global_refs_locally: bool,
|
||||
import_memory: bool,
|
||||
export_memory: bool,
|
||||
import_symbols: bool,
|
||||
import_table: bool,
|
||||
export_table: bool,
|
||||
|
||||
@ -4251,6 +4251,7 @@ fn linkWithLLD(wasm: *Wasm, comp: *Compilation, prog_node: *std.Progress.Node) !
|
||||
man.hash.addOptional(wasm.base.options.stack_size_override);
|
||||
man.hash.add(wasm.base.options.build_id);
|
||||
man.hash.add(wasm.base.options.import_memory);
|
||||
man.hash.add(wasm.base.options.export_memory);
|
||||
man.hash.add(wasm.base.options.import_table);
|
||||
man.hash.add(wasm.base.options.export_table);
|
||||
man.hash.addOptional(wasm.base.options.initial_memory);
|
||||
@ -4338,6 +4339,10 @@ fn linkWithLLD(wasm: *Wasm, comp: *Compilation, prog_node: *std.Progress.Node) !
|
||||
try argv.append("--import-memory");
|
||||
}
|
||||
|
||||
if (wasm.base.options.export_memory) {
|
||||
try argv.append("--export-memory");
|
||||
}
|
||||
|
||||
if (wasm.base.options.import_table) {
|
||||
assert(!wasm.base.options.export_table);
|
||||
try argv.append("--import-table");
|
||||
|
||||
34
src/main.zig
34
src/main.zig
@ -544,6 +544,7 @@ const usage_build_generic =
|
||||
\\ -dead_strip (Darwin) remove functions and data that are unreachable by the entry point or exported symbols
|
||||
\\ -dead_strip_dylibs (Darwin) remove dylibs that are unreachable by the entry point or exported symbols
|
||||
\\ --import-memory (WebAssembly) import memory from the environment
|
||||
\\ --export-memory (WebAssembly) export memory to the host (Default unless --import-memory used)
|
||||
\\ --import-symbols (WebAssembly) import missing symbols from the host environment
|
||||
\\ --import-table (WebAssembly) import function table from the host environment
|
||||
\\ --export-table (WebAssembly) export function table to the host environment
|
||||
@ -787,6 +788,7 @@ fn buildOutputType(
|
||||
var linker_allow_shlib_undefined: ?bool = null;
|
||||
var linker_bind_global_refs_locally: ?bool = null;
|
||||
var linker_import_memory: ?bool = null;
|
||||
var linker_export_memory: ?bool = null;
|
||||
var linker_import_symbols: bool = false;
|
||||
var linker_import_table: bool = false;
|
||||
var linker_export_table: bool = false;
|
||||
@ -1419,6 +1421,8 @@ fn buildOutputType(
|
||||
}
|
||||
} else if (mem.eql(u8, arg, "--import-memory")) {
|
||||
linker_import_memory = true;
|
||||
} else if (mem.eql(u8, arg, "--export-memory")) {
|
||||
linker_export_memory = true;
|
||||
} else if (mem.eql(u8, arg, "--import-symbols")) {
|
||||
linker_import_symbols = true;
|
||||
} else if (mem.eql(u8, arg, "--import-table")) {
|
||||
@ -1982,6 +1986,8 @@ fn buildOutputType(
|
||||
linker_bind_global_refs_locally = true;
|
||||
} else if (mem.eql(u8, arg, "--import-memory")) {
|
||||
linker_import_memory = true;
|
||||
} else if (mem.eql(u8, arg, "--export-memory")) {
|
||||
linker_export_memory = true;
|
||||
} else if (mem.eql(u8, arg, "--import-symbols")) {
|
||||
linker_import_symbols = true;
|
||||
} else if (mem.eql(u8, arg, "--import-table")) {
|
||||
@ -2422,15 +2428,28 @@ fn buildOutputType(
|
||||
link_libcpp = true;
|
||||
}
|
||||
|
||||
if (target_info.target.cpu.arch.isWasm() and linker_shared_memory) {
|
||||
if (output_mode == .Obj) {
|
||||
fatal("shared memory is not allowed in object files", .{});
|
||||
if (target_info.target.cpu.arch.isWasm()) blk: {
|
||||
if (single_threaded == null) {
|
||||
single_threaded = true;
|
||||
}
|
||||
if (linker_shared_memory) {
|
||||
if (output_mode == .Obj) {
|
||||
fatal("shared memory is not allowed in object files", .{});
|
||||
}
|
||||
|
||||
if (!target_info.target.cpu.features.isEnabled(@intFromEnum(std.Target.wasm.Feature.atomics)) or
|
||||
!target_info.target.cpu.features.isEnabled(@intFromEnum(std.Target.wasm.Feature.bulk_memory)))
|
||||
{
|
||||
fatal("'atomics' and 'bulk-memory' features must be enabled to use shared memory", .{});
|
||||
}
|
||||
break :blk;
|
||||
}
|
||||
|
||||
if (!target_info.target.cpu.features.isEnabled(@intFromEnum(std.Target.wasm.Feature.atomics)) or
|
||||
!target_info.target.cpu.features.isEnabled(@intFromEnum(std.Target.wasm.Feature.bulk_memory)))
|
||||
{
|
||||
fatal("'atomics' and 'bulk-memory' features must be enabled to use shared memory", .{});
|
||||
// Single-threaded is the default for WebAssembly, so only when the user specified `-fno_single-threaded`
|
||||
// can they enable multithreaded WebAssembly builds.
|
||||
const is_single_threaded = single_threaded.?;
|
||||
if (!is_single_threaded) {
|
||||
fatal("'-fno-single-threaded' requires the linker feature shared-memory to be enabled using '--shared-memory'", .{});
|
||||
}
|
||||
}
|
||||
|
||||
@ -3113,6 +3132,7 @@ fn buildOutputType(
|
||||
.linker_allow_shlib_undefined = linker_allow_shlib_undefined,
|
||||
.linker_bind_global_refs_locally = linker_bind_global_refs_locally,
|
||||
.linker_import_memory = linker_import_memory,
|
||||
.linker_export_memory = linker_export_memory,
|
||||
.linker_import_symbols = linker_import_symbols,
|
||||
.linker_import_table = linker_import_table,
|
||||
.linker_export_table = linker_export_table,
|
||||
|
||||
@ -208,7 +208,8 @@ pub fn supports_fpic(target: std.Target) bool {
|
||||
}
|
||||
|
||||
pub fn isSingleThreaded(target: std.Target) bool {
|
||||
return target.isWasm();
|
||||
_ = target;
|
||||
return false;
|
||||
}
|
||||
|
||||
/// Valgrind supports more, but Zig does not support them yet.
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user