Merge branch 'emekoi-windows-mutex'

This commit is contained in:
Andrew Kelley 2019-02-01 12:22:55 -05:00
commit 8d32d25619
7 changed files with 241 additions and 60 deletions

View File

@ -454,10 +454,10 @@ set(ZIG_STD_FILES
"crypto/hmac.zig"
"crypto/index.zig"
"crypto/md5.zig"
"crypto/poly1305.zig"
"crypto/sha1.zig"
"crypto/sha2.zig"
"crypto/sha3.zig"
"crypto/poly1305.zig"
"crypto/x25519.zig"
"cstr.zig"
"debug/failing_allocator.zig"
@ -566,9 +566,9 @@ set(ZIG_STD_FILES
"math/tan.zig"
"math/tanh.zig"
"math/trunc.zig"
"mem.zig"
"meta/index.zig"
"meta/trait.zig"
"mem.zig"
"mutex.zig"
"net.zig"
"os/child_process.zig"
@ -576,16 +576,16 @@ set(ZIG_STD_FILES
"os/darwin/errno.zig"
"os/epoch.zig"
"os/file.zig"
"os/freebsd/errno.zig"
"os/freebsd/index.zig"
"os/get_app_data_dir.zig"
"os/get_user_id.zig"
"os/index.zig"
"os/linux/arm64.zig"
"os/linux/errno.zig"
"os/linux/index.zig"
"os/linux/vdso.zig"
"os/linux/x86_64.zig"
"os/linux/arm64.zig"
"os/freebsd/errno.zig"
"os/freebsd/index.zig"
"os/path.zig"
"os/time.zig"
"os/uefi.zig"
@ -612,6 +612,16 @@ set(ZIG_STD_FILES
"special/compiler_rt/comparetf2.zig"
"special/compiler_rt/divti3.zig"
"special/compiler_rt/extendXfYf2.zig"
"special/compiler_rt/fixdfdi.zig"
"special/compiler_rt/fixdfsi.zig"
"special/compiler_rt/fixdfti.zig"
"special/compiler_rt/fixint.zig"
"special/compiler_rt/fixsfdi.zig"
"special/compiler_rt/fixsfsi.zig"
"special/compiler_rt/fixsfti.zig"
"special/compiler_rt/fixtfdi.zig"
"special/compiler_rt/fixtfsi.zig"
"special/compiler_rt/fixtfti.zig"
"special/compiler_rt/fixuint.zig"
"special/compiler_rt/fixunsdfdi.zig"
"special/compiler_rt/fixunsdfsi.zig"
@ -622,16 +632,6 @@ set(ZIG_STD_FILES
"special/compiler_rt/fixunstfdi.zig"
"special/compiler_rt/fixunstfsi.zig"
"special/compiler_rt/fixunstfti.zig"
"special/compiler_rt/fixint.zig"
"special/compiler_rt/fixdfdi.zig"
"special/compiler_rt/fixdfsi.zig"
"special/compiler_rt/fixdfti.zig"
"special/compiler_rt/fixsfdi.zig"
"special/compiler_rt/fixsfsi.zig"
"special/compiler_rt/fixsfti.zig"
"special/compiler_rt/fixtfdi.zig"
"special/compiler_rt/fixtfsi.zig"
"special/compiler_rt/fixtfti.zig"
"special/compiler_rt/floattidf.zig"
"special/compiler_rt/floattisf.zig"
"special/compiler_rt/floattitf.zig"
@ -656,6 +656,7 @@ set(ZIG_STD_FILES
"special/panic.zig"
"special/test_runner.zig"
"spinlock.zig"
"statically_initialized_mutex.zig"
"unicode.zig"
"zig/ast.zig"
"zig/index.zig"

View File

@ -982,13 +982,11 @@ test "fmt.format" {
context = BufPrintContext{ .remaining = buf1[0..] };
try formatType('a', "c", &context, error{BufferTooSmall}, bufPrintWrite);
res = buf1[0 .. buf1.len - context.remaining.len];
debug.warn("{}\n", res);
assert(mem.eql(u8, res, "a"));
context = BufPrintContext{ .remaining = buf1[0..] };
try formatType(0b1100, "b", &context, error{BufferTooSmall}, bufPrintWrite);
res = buf1[0 .. buf1.len - context.remaining.len];
debug.warn("{}\n", res);
assert(mem.eql(u8, res, "1100"));
}
{

View File

@ -9,6 +9,7 @@ pub const DynLib = @import("dynamic_library.zig").DynLib;
pub const HashMap = @import("hash_map.zig").HashMap;
pub const LinkedList = @import("linked_list.zig").LinkedList;
pub const Mutex = @import("mutex.zig").Mutex;
pub const StaticallyInitializedMutex = @import("statically_initialized_mutex.zig").StaticallyInitializedMutex;
pub const SegmentedList = @import("segmented_list.zig").SegmentedList;
pub const SpinLock = @import("spinlock.zig").SpinLock;
@ -55,6 +56,7 @@ test "std" {
_ = @import("hash_map.zig");
_ = @import("linked_list.zig");
_ = @import("mutex.zig");
_ = @import("statically_initialized_mutex.zig");
_ = @import("segmented_list.zig");
_ = @import("spinlock.zig");

View File

@ -5,74 +5,99 @@ const AtomicRmwOp = builtin.AtomicRmwOp;
const assert = std.debug.assert;
const SpinLock = std.SpinLock;
const linux = std.os.linux;
const windows = std.os.windows;
/// Lock may be held only once. If the same thread
/// tries to acquire the same mutex twice, it deadlocks.
/// This type must be initialized at runtime, and then deinitialized when no
/// longer needed, to free resources.
/// If you need static initialization, use std.StaticallyInitializedMutex.
/// The Linux implementation is based on mutex3 from
/// https://www.akkadia.org/drepper/futex.pdf
pub const Mutex = struct {
/// 0: unlocked
/// 1: locked, no waiters
/// 2: locked, one or more waiters
linux_lock: @typeOf(linux_lock_init),
pub const Mutex = switch(builtin.os) {
builtin.Os.linux => struct {
/// 0: unlocked
/// 1: locked, no waiters
/// 2: locked, one or more waiters
lock: i32,
/// TODO better implementation than spin lock
spin_lock: @typeOf(spin_lock_init),
pub const Held = struct {
mutex: *Mutex,
const linux_lock_init = if (builtin.os == builtin.Os.linux) i32(0) else {};
const spin_lock_init = if (builtin.os != builtin.Os.linux) SpinLock.init() else {};
pub const Held = struct {
mutex: *Mutex,
pub fn release(self: Held) void {
if (builtin.os == builtin.Os.linux) {
const c = @atomicRmw(i32, &self.mutex.linux_lock, AtomicRmwOp.Sub, 1, AtomicOrder.Release);
pub fn release(self: Held) void {
const c = @atomicRmw(i32, &self.mutex.lock, AtomicRmwOp.Sub, 1, AtomicOrder.Release);
if (c != 1) {
_ = @atomicRmw(i32, &self.mutex.linux_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.Release);
const rc = linux.futex_wake(&self.mutex.linux_lock, linux.FUTEX_WAKE | linux.FUTEX_PRIVATE_FLAG, 1);
_ = @atomicRmw(i32, &self.mutex.lock, AtomicRmwOp.Xchg, 0, AtomicOrder.Release);
const rc = linux.futex_wake(&self.mutex.lock, linux.FUTEX_WAKE | linux.FUTEX_PRIVATE_FLAG, 1);
switch (linux.getErrno(rc)) {
0 => {},
linux.EINVAL => unreachable,
else => unreachable,
}
}
} else {
SpinLock.Held.release(SpinLock.Held{ .spinlock = &self.mutex.spin_lock });
}
}
};
pub fn init() Mutex {
return Mutex{
.linux_lock = linux_lock_init,
.spin_lock = spin_lock_init,
};
}
pub fn acquire(self: *Mutex) Held {
if (builtin.os == builtin.Os.linux) {
var c = @cmpxchgWeak(i32, &self.linux_lock, 0, 1, AtomicOrder.Acquire, AtomicOrder.Monotonic) orelse
pub fn init() Mutex {
return Mutex {
.lock = 0,
};
}
pub fn deinit(self: *Mutex) void {}
pub fn acquire(self: *Mutex) Held {
var c = @cmpxchgWeak(i32, &self.lock, 0, 1, AtomicOrder.Acquire, AtomicOrder.Monotonic) orelse
return Held{ .mutex = self };
if (c != 2)
c = @atomicRmw(i32, &self.linux_lock, AtomicRmwOp.Xchg, 2, AtomicOrder.Acquire);
c = @atomicRmw(i32, &self.lock, AtomicRmwOp.Xchg, 2, AtomicOrder.Acquire);
while (c != 0) {
const rc = linux.futex_wait(&self.linux_lock, linux.FUTEX_WAIT | linux.FUTEX_PRIVATE_FLAG, 2, null);
const rc = linux.futex_wait(&self.lock, linux.FUTEX_WAIT | linux.FUTEX_PRIVATE_FLAG, 2, null);
switch (linux.getErrno(rc)) {
0, linux.EINTR, linux.EAGAIN => {},
linux.EINVAL => unreachable,
else => unreachable,
}
c = @atomicRmw(i32, &self.linux_lock, AtomicRmwOp.Xchg, 2, AtomicOrder.Acquire);
c = @atomicRmw(i32, &self.lock, AtomicRmwOp.Xchg, 2, AtomicOrder.Acquire);
}
} else {
_ = self.spin_lock.acquire();
return Held { .mutex = self };
}
return Held{ .mutex = self };
}
},
// TODO once https://github.com/ziglang/zig/issues/287 (copy elision) is solved, we can make a
// better implementation of this. The problem is we need the init() function to have access to
// the address of the CRITICAL_SECTION, and then have it not move.
builtin.Os.windows => std.StaticallyInitializedMutex,
else => struct {
/// TODO better implementation than spin lock.
/// When changing this, one must also change the corresponding
/// std.StaticallyInitializedMutex code, since it aliases this type,
/// under the assumption that it works both statically and at runtime.
lock: SpinLock,
pub const Held = struct {
mutex: *Mutex,
pub fn release(self: Held) void {
SpinLock.Held.release(SpinLock.Held { .spinlock = &self.mutex.lock });
}
};
pub fn init() Mutex {
return Mutex {
.lock = SpinLock.init(),
};
}
pub fn deinit(self: *Mutex) void {}
pub fn acquire(self: *Mutex) Held {
_ = self.lock.acquire();
return Held { .mutex = self };
}
},
};
const Context = struct {
const TestContext = struct {
mutex: *Mutex,
data: i128,
@ -90,7 +115,9 @@ test "std.Mutex" {
var a = &fixed_buffer_allocator.allocator;
var mutex = Mutex.init();
var context = Context{
defer mutex.deinit();
var context = TestContext{
.mutex = &mutex,
.data = 0,
};
@ -103,12 +130,12 @@ test "std.Mutex" {
for (threads) |t|
t.wait();
std.debug.assertOrPanic(context.data == thread_count * Context.incr_count);
std.debug.assertOrPanic(context.data == thread_count * TestContext.incr_count);
}
fn worker(ctx: *Context) void {
fn worker(ctx: *TestContext) void {
var i: usize = 0;
while (i != Context.incr_count) : (i += 1) {
while (i != TestContext.incr_count) : (i += 1) {
const held = ctx.mutex.acquire();
defer held.release();

View File

@ -49,6 +49,7 @@ pub const UNICODE = false;
pub const WCHAR = u16;
pub const WORD = u16;
pub const LARGE_INTEGER = i64;
pub const LONG = c_long;
pub const TRUE = 1;
pub const FALSE = 0;

View File

@ -220,3 +220,50 @@ pub const FOREGROUND_BLUE = 1;
pub const FOREGROUND_GREEN = 2;
pub const FOREGROUND_RED = 4;
pub const FOREGROUND_INTENSITY = 8;
pub extern "kernel32" stdcallcc fn InitializeCriticalSection(lpCriticalSection: *CRITICAL_SECTION) void;
pub extern "kernel32" stdcallcc fn EnterCriticalSection(lpCriticalSection: *CRITICAL_SECTION) void;
pub extern "kernel32" stdcallcc fn LeaveCriticalSection(lpCriticalSection: *CRITICAL_SECTION) void;
pub extern "kernel32" stdcallcc fn DeleteCriticalSection(lpCriticalSection: *CRITICAL_SECTION) void;
pub const LIST_ENTRY = extern struct {
Flink: *LIST_ENTRY,
Blink: *LIST_ENTRY,
};
pub const RTL_CRITICAL_SECTION_DEBUG = extern struct {
Type: WORD,
CreatorBackTraceIndex: WORD,
CriticalSection: *RTL_CRITICAL_SECTION,
ProcessLocksList: LIST_ENTRY,
EntryCount: DWORD,
ContentionCount: DWORD,
Flags: DWORD,
CreatorBackTraceIndexHigh: WORD,
SpareWORD: WORD,
};
pub const RTL_CRITICAL_SECTION = extern struct {
DebugInfo: *RTL_CRITICAL_SECTION_DEBUG,
LockCount: LONG,
RecursionCount: LONG,
OwningThread: HANDLE,
LockSemaphore: HANDLE,
SpinCount: ULONG_PTR,
};
pub const CRITICAL_SECTION = RTL_CRITICAL_SECTION;
pub const INIT_ONCE = RTL_RUN_ONCE;
pub const INIT_ONCE_STATIC_INIT = RTL_RUN_ONCE_INIT;
pub extern "kernel32" stdcallcc fn InitOnceExecuteOnce(InitOnce: *INIT_ONCE, InitFn: INIT_ONCE_FN, Parameter: ?*c_void, Context: ?*c_void) BOOL;
pub const INIT_ONCE_FN = extern fn(InitOnce: *INIT_ONCE, Parameter: ?*c_void, Context: ?*c_void) BOOL;
pub const RTL_RUN_ONCE = extern struct {
Ptr: ?*c_void,
};
pub const RTL_RUN_ONCE_INIT = RTL_RUN_ONCE {
.Ptr = null,
};

View File

@ -0,0 +1,105 @@
const std = @import("index.zig");
const builtin = @import("builtin");
const AtomicOrder = builtin.AtomicOrder;
const AtomicRmwOp = builtin.AtomicRmwOp;
const assert = std.debug.assert;
const windows = std.os.windows;
/// Lock may be held only once. If the same thread
/// tries to acquire the same mutex twice, it deadlocks.
/// This type is intended to be initialized statically. If you don't
/// require static initialization, use std.Mutex.
/// On Windows, this mutex allocates resources when it is
/// first used, and the resources cannot be freed.
/// On Linux, this is an alias of std.Mutex.
pub const StaticallyInitializedMutex = switch(builtin.os) {
builtin.Os.linux => std.Mutex,
builtin.Os.windows => struct {
lock: windows.CRITICAL_SECTION,
init_once: windows.RTL_RUN_ONCE,
pub const Held = struct {
mutex: *StaticallyInitializedMutex,
pub fn release(self: Held) void {
windows.LeaveCriticalSection(&self.mutex.lock);
}
};
pub fn init() StaticallyInitializedMutex {
return StaticallyInitializedMutex {
.lock = undefined,
.init_once = windows.INIT_ONCE_STATIC_INIT,
};
}
extern fn initCriticalSection(
InitOnce: *windows.RTL_RUN_ONCE,
Parameter: ?*c_void,
Context: ?*c_void,
) windows.BOOL {
const lock = @ptrCast(*windows.CRITICAL_SECTION, @alignCast(@alignOf(windows.CRITICAL_SECTION), Parameter));
windows.InitializeCriticalSection(lock);
return windows.TRUE;
}
/// TODO: once https://github.com/ziglang/zig/issues/287 is solved and std.Mutex has a better
/// implementation of a runtime initialized mutex, remove this function.
pub fn deinit(self: *StaticallyInitializedMutex) void {
assert(windows.InitOnceExecuteOnce(&self.init_once, initCriticalSection, &self.lock, null) != 0);
windows.DeleteCriticalSection(&self.lock);
}
pub fn acquire(self: *StaticallyInitializedMutex) Held {
assert(windows.InitOnceExecuteOnce(&self.init_once, initCriticalSection, &self.lock, null) != 0);
windows.EnterCriticalSection(&self.lock);
return Held { .mutex = self };
}
},
else => std.Mutex,
};
test "std.StaticallyInitializedMutex" {
const TestContext = struct {
data: i128,
const TestContext = @This();
const incr_count = 10000;
var mutex = StaticallyInitializedMutex.init();
fn worker(ctx: *TestContext) void {
var i: usize = 0;
while (i != TestContext.incr_count) : (i += 1) {
const held = mutex.acquire();
defer held.release();
ctx.data += 1;
}
}
};
var direct_allocator = std.heap.DirectAllocator.init();
defer direct_allocator.deinit();
var plenty_of_memory = try direct_allocator.allocator.alloc(u8, 300 * 1024);
defer direct_allocator.allocator.free(plenty_of_memory);
var fixed_buffer_allocator = std.heap.ThreadSafeFixedBufferAllocator.init(plenty_of_memory);
var a = &fixed_buffer_allocator.allocator;
var context = TestContext{
.data = 0,
};
const thread_count = 10;
var threads: [thread_count]*std.os.Thread = undefined;
for (threads) |*t| {
t.* = try std.os.spawnThread(&context, TestContext.worker);
}
for (threads) |t|
t.wait();
std.debug.assertOrPanic(context.data == thread_count * TestContext.incr_count);
}