From 9462852433a815496e0edf5d5b2e00726f5ea072 Mon Sep 17 00:00:00 2001 From: Andrew Kelley Date: Mon, 9 Jul 2018 16:49:46 -0400 Subject: [PATCH] std.event.Loop multithreading for windows using IOCP --- std/event.zig | 122 +++++++++++++++++++++++++++++++++++++-- std/heap.zig | 2 +- std/os/index.zig | 25 ++++++-- std/os/windows/index.zig | 7 +++ std/os/windows/util.zig | 47 +++++++++++++++ 5 files changed, 191 insertions(+), 12 deletions(-) diff --git a/std/event.zig b/std/event.zig index 589ab4cb5f..90d614d72e 100644 --- a/std/event.zig +++ b/std/event.zig @@ -4,6 +4,7 @@ const assert = std.debug.assert; const event = this; const mem = std.mem; const posix = std.os.posix; +const windows = std.os.windows; const AtomicRmwOp = builtin.AtomicRmwOp; const AtomicOrder = builtin.AtomicOrder; @@ -113,10 +114,10 @@ pub const Loop = struct { allocator: *mem.Allocator, next_tick_queue: std.atomic.QueueMpsc(promise), os_data: OsData, + final_resume_node: ResumeNode, dispatch_lock: u8, // TODO make this a bool pending_event_count: usize, extra_threads: []*std.os.Thread, - final_resume_node: ResumeNode, // pre-allocated eventfds. all permanently active. // this is how we send promises to be resumed on other threads. @@ -144,6 +145,7 @@ pub const Loop = struct { }, builtin.Os.windows => struct { base: ResumeNode, + completion_key: usize, }, else => @compileError("unsupported OS"), }; @@ -181,12 +183,12 @@ pub const Loop = struct { .next_tick_queue = std.atomic.QueueMpsc(promise).init(), .dispatch_lock = 1, // start locked so threads go directly into epoll wait .extra_threads = undefined, + .available_eventfd_resume_nodes = std.atomic.Stack(ResumeNode.EventFd).init(), + .eventfd_resume_nodes = undefined, .final_resume_node = ResumeNode{ .id = ResumeNode.Id.Stop, .handle = undefined, }, - .available_eventfd_resume_nodes = std.atomic.Stack(ResumeNode.EventFd).init(), - .eventfd_resume_nodes = undefined, }; const extra_thread_count = thread_count - 1; self.eventfd_resume_nodes = try self.allocator.alloc( @@ -209,7 +211,8 @@ pub const Loop = struct { } const InitOsDataError = std.os.LinuxEpollCreateError || mem.Allocator.Error || std.os.LinuxEventFdError || - std.os.SpawnThreadError || std.os.LinuxEpollCtlError || std.os.BsdKEventError; + std.os.SpawnThreadError || std.os.LinuxEpollCtlError || std.os.BsdKEventError || + std.os.WindowsCreateIoCompletionPortError; const wakeup_bytes = []u8{0x1} ** 8; @@ -335,6 +338,51 @@ pub const Loop = struct { self.extra_threads[extra_thread_index] = try std.os.spawnThread(self, workerRun); } }, + builtin.Os.windows => { + self.os_data.extra_thread_count = extra_thread_count; + + self.os_data.io_port = try std.os.windowsCreateIoCompletionPort( + windows.INVALID_HANDLE_VALUE, + null, + undefined, + undefined, + ); + errdefer std.os.close(self.os_data.io_port); + + for (self.eventfd_resume_nodes) |*eventfd_node, i| { + eventfd_node.* = std.atomic.Stack(ResumeNode.EventFd).Node{ + .data = ResumeNode.EventFd{ + .base = ResumeNode{ + .id = ResumeNode.Id.EventFd, + .handle = undefined, + }, + // this one is for sending events + .completion_key = @ptrToInt(&eventfd_node.data.base), + }, + .next = undefined, + }; + self.available_eventfd_resume_nodes.push(eventfd_node); + } + + var extra_thread_index: usize = 0; + errdefer { + var i: usize = 0; + while (i < extra_thread_index) : (i += 1) { + while (true) { + const overlapped = @intToPtr(?*windows.OVERLAPPED, 0x1); + std.os.windowsPostQueuedCompletionStatus(self.os_data.io_port, undefined, @ptrToInt(&self.final_resume_node), overlapped) catch continue; + break; + } + } + while (extra_thread_index != 0) { + extra_thread_index -= 1; + self.extra_threads[extra_thread_index].wait(); + } + } + while (extra_thread_index < extra_thread_count) : (extra_thread_index += 1) { + self.extra_threads[extra_thread_index] = try std.os.spawnThread(self, workerRun); + } + }, else => {}, } } @@ -349,6 +397,10 @@ pub const Loop = struct { }, builtin.Os.macosx => { self.allocator.free(self.os_data.kevents); + std.os.close(self.os_data.kqfd); + }, + builtin.Os.windows => { + std.os.close(self.os_data.io_port); }, else => {}, } @@ -434,7 +486,7 @@ pub const Loop = struct { builtin.Os.macosx => { const kevent_array = (*[1]posix.Kevent)(&eventfd_node.kevent); const eventlist = ([*]posix.Kevent)(undefined)[0..0]; - _ = std.os.bsdKEvent(self.os_data.kqfd, kevent_array, eventlist, null) catch |_| { + _ = std.os.bsdKEvent(self.os_data.kqfd, kevent_array, eventlist, null) catch { // fine, we didn't need it anyway _ = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst); self.available_eventfd_resume_nodes.push(resume_stack_node); @@ -446,7 +498,21 @@ pub const Loop = struct { builtin.Os.linux => { // the pending count is already accounted for const epoll_events = posix.EPOLLONESHOT | std.os.linux.EPOLLIN | std.os.linux.EPOLLOUT | std.os.linux.EPOLLET; - self.modFd(eventfd_node.eventfd, eventfd_node.epoll_op, epoll_events, &eventfd_node.base) catch |_| { + self.modFd(eventfd_node.eventfd, eventfd_node.epoll_op, epoll_events, &eventfd_node.base) catch { + // fine, we didn't need it anyway + _ = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst); + self.available_eventfd_resume_nodes.push(resume_stack_node); + resume handle; + _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst); + continue :start_over; + }; + }, + builtin.Os.windows => { + // this value is never dereferenced but we need it to be non-null so that + // the consumer code can decide whether to read the completion key. + // it has to do this for normal I/O, so we match that behavior here. + const overlapped = @intToPtr(?*windows.OVERLAPPED, 0x1); + std.os.windowsPostQueuedCompletionStatus(self.os_data.io_port, undefined, eventfd_node.completion_key, overlapped) catch { // fine, we didn't need it anyway _ = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst); self.available_eventfd_resume_nodes.push(resume_stack_node); @@ -482,6 +548,17 @@ pub const Loop = struct { _ = std.os.bsdKEvent(self.os_data.kqfd, final_kevent, eventlist, null) catch unreachable; return; }, + builtin.Os.windows => { + var i: usize = 0; + while (i < self.os_data.extra_thread_count) : (i += 1) { + while (true) { + const overlapped = @intToPtr(?*windows.OVERLAPPED, 0x1); + std.os.windowsPostQueuedCompletionStatus(self.os_data.io_port, undefined, @ptrToInt(&self.final_resume_node), overlapped) catch continue; + break; + } + } + return; + }, else => @compileError("unsupported OS"), } } @@ -536,6 +613,35 @@ pub const Loop = struct { } } }, + builtin.Os.windows => { + var completion_key: usize = undefined; + while (true) { + var nbytes: windows.DWORD = undefined; + var overlapped: ?*windows.OVERLAPPED = undefined; + switch (std.os.windowsGetQueuedCompletionStatus(self.os_data.io_port, &nbytes, &completion_key, + &overlapped, windows.INFINITE)) { + std.os.WindowsWaitResult.Aborted => return, + std.os.WindowsWaitResult.Normal => {}, + } + if (overlapped != null) break; + } + const resume_node = @intToPtr(*ResumeNode, completion_key); + const handle = resume_node.handle; + const resume_node_id = resume_node.id; + switch (resume_node_id) { + ResumeNode.Id.Basic => {}, + ResumeNode.Id.Stop => return, + ResumeNode.Id.EventFd => { + const event_fd_node = @fieldParentPtr(ResumeNode.EventFd, "base", resume_node); + const stack_node = @fieldParentPtr(std.atomic.Stack(ResumeNode.EventFd).Node, "data", event_fd_node); + self.available_eventfd_resume_nodes.push(stack_node); + }, + } + resume handle; + if (resume_node_id == ResumeNode.Id.EventFd) { + _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst); + } + }, else => @compileError("unsupported OS"), } } @@ -548,6 +654,10 @@ pub const Loop = struct { final_eventfd_event: std.os.linux.epoll_event, }, builtin.Os.macosx => MacOsData, + builtin.Os.windows => struct { + io_port: windows.HANDLE, + extra_thread_count: usize, + }, else => struct {}, }; diff --git a/std/heap.zig b/std/heap.zig index caf972e605..ef22c8d0c5 100644 --- a/std/heap.zig +++ b/std/heap.zig @@ -98,7 +98,7 @@ pub const DirectAllocator = struct { const amt = n + alignment + @sizeOf(usize); const optional_heap_handle = @atomicLoad(?HeapHandle, &self.heap_handle, builtin.AtomicOrder.SeqCst); const heap_handle = optional_heap_handle orelse blk: { - const hh = os.windows.HeapCreate(os.windows.HEAP_NO_SERIALIZE, amt, 0) orelse return error.OutOfMemory; + const hh = os.windows.HeapCreate(0, amt, 0) orelse return error.OutOfMemory; const other_hh = @cmpxchgStrong(?HeapHandle, &self.heap_handle, null, hh, builtin.AtomicOrder.SeqCst, builtin.AtomicOrder.SeqCst) orelse break :blk hh; _ = os.windows.HeapDestroy(hh); break :blk other_hh.?; // can't be null because of the cmpxchg diff --git a/std/os/index.zig b/std/os/index.zig index 94fdd9dc84..896d6b3df8 100644 --- a/std/os/index.zig +++ b/std/os/index.zig @@ -61,6 +61,15 @@ pub const windowsLoadDll = windows_util.windowsLoadDll; pub const windowsUnloadDll = windows_util.windowsUnloadDll; pub const createWindowsEnvBlock = windows_util.createWindowsEnvBlock; +pub const WindowsCreateIoCompletionPortError = windows_util.WindowsCreateIoCompletionPortError; +pub const windowsCreateIoCompletionPort = windows_util.windowsCreateIoCompletionPort; + +pub const WindowsPostQueuedCompletionStatusError = windows_util.WindowsPostQueuedCompletionStatusError; +pub const windowsPostQueuedCompletionStatus = windows_util.windowsPostQueuedCompletionStatus; + +pub const WindowsWaitResult = windows_util.WindowsWaitResult; +pub const windowsGetQueuedCompletionStatus = windows_util.windowsGetQueuedCompletionStatus; + pub const WindowsWaitError = windows_util.WaitError; pub const WindowsOpenError = windows_util.OpenError; pub const WindowsWriteError = windows_util.WriteError; @@ -2592,11 +2601,17 @@ pub fn spawnThread(context: var, comptime startFn: var) SpawnThreadError!*Thread thread: Thread, inner: Context, }; - extern fn threadMain(arg: windows.LPVOID) windows.DWORD { - if (@sizeOf(Context) == 0) { - return startFn({}); - } else { - return startFn(@ptrCast(*Context, @alignCast(@alignOf(Context), arg)).*); + extern fn threadMain(raw_arg: windows.LPVOID) windows.DWORD { + const arg = if (@sizeOf(Context) == 0) {} else @ptrCast(*Context, @alignCast(@alignOf(Context), raw_arg)).*; + switch (@typeId(@typeOf(startFn).ReturnType)) { + builtin.TypeId.Int => { + return startFn(arg); + }, + builtin.TypeId.Void => { + startFn(arg); + return 0; + }, + else => @compileError("expected return type of startFn to be 'u8', 'noreturn', 'void', or '!void'"), } } }; diff --git a/std/os/windows/index.zig b/std/os/windows/index.zig index 571ac97fac..f73b8ec261 100644 --- a/std/os/windows/index.zig +++ b/std/os/windows/index.zig @@ -59,6 +59,9 @@ pub extern "kernel32" stdcallcc fn CreateSymbolicLinkA( dwFlags: DWORD, ) BOOLEAN; + +pub extern "kernel32" stdcallcc fn CreateIoCompletionPort(FileHandle: HANDLE, ExistingCompletionPort: ?HANDLE, CompletionKey: ULONG_PTR, NumberOfConcurrentThreads: DWORD) ?HANDLE; + pub extern "kernel32" stdcallcc fn CreateThread(lpThreadAttributes: ?LPSECURITY_ATTRIBUTES, dwStackSize: SIZE_T, lpStartAddress: LPTHREAD_START_ROUTINE, lpParameter: ?LPVOID, dwCreationFlags: DWORD, lpThreadId: ?LPDWORD) ?HANDLE; pub extern "kernel32" stdcallcc fn DeleteFileA(lpFileName: LPCSTR) BOOL; @@ -106,6 +109,7 @@ pub extern "kernel32" stdcallcc fn GetFinalPathNameByHandleA( ) DWORD; pub extern "kernel32" stdcallcc fn GetProcessHeap() ?HANDLE; +pub extern "kernel32" stdcallcc fn GetQueuedCompletionStatus(CompletionPort: HANDLE, lpNumberOfBytesTransferred: LPDWORD, lpCompletionKey: *ULONG_PTR, lpOverlapped: *?*OVERLAPPED, dwMilliseconds: DWORD) BOOL; pub extern "kernel32" stdcallcc fn GetSystemInfo(lpSystemInfo: *SYSTEM_INFO) void; pub extern "kernel32" stdcallcc fn GetSystemTimeAsFileTime(*FILETIME) void; @@ -130,6 +134,9 @@ pub extern "kernel32" stdcallcc fn MoveFileExA( dwFlags: DWORD, ) BOOL; + +pub extern "kernel32" stdcallcc fn PostQueuedCompletionStatus(CompletionPort: HANDLE, dwNumberOfBytesTransferred: DWORD, dwCompletionKey: ULONG_PTR, lpOverlapped: ?*OVERLAPPED) BOOL; + pub extern "kernel32" stdcallcc fn QueryPerformanceCounter(lpPerformanceCount: *LARGE_INTEGER) BOOL; pub extern "kernel32" stdcallcc fn QueryPerformanceFrequency(lpFrequency: *LARGE_INTEGER) BOOL; diff --git a/std/os/windows/util.zig b/std/os/windows/util.zig index 45b205451d..b04e8efc4b 100644 --- a/std/os/windows/util.zig +++ b/std/os/windows/util.zig @@ -214,3 +214,50 @@ pub fn windowsFindNextFile(handle: windows.HANDLE, find_file_data: *windows.WIN3 } return true; } + + +pub const WindowsCreateIoCompletionPortError = error { + Unexpected, +}; + +pub fn windowsCreateIoCompletionPort(file_handle: windows.HANDLE, existing_completion_port: ?windows.HANDLE, completion_key: usize, concurrent_thread_count: windows.DWORD) !windows.HANDLE { + const handle = windows.CreateIoCompletionPort(file_handle, existing_completion_port, completion_key, concurrent_thread_count) orelse { + const err = windows.GetLastError(); + switch (err) { + else => return os.unexpectedErrorWindows(err), + } + }; + return handle; +} + +pub const WindowsPostQueuedCompletionStatusError = error { + Unexpected, +}; + +pub fn windowsPostQueuedCompletionStatus(completion_port: windows.HANDLE, bytes_transferred_count: windows.DWORD, completion_key: usize, lpOverlapped: ?*windows.OVERLAPPED) WindowsPostQueuedCompletionStatusError!void { + if (windows.PostQueuedCompletionStatus(completion_port, bytes_transferred_count, completion_key, lpOverlapped) == 0) { + const err = windows.GetLastError(); + switch (err) { + else => return os.unexpectedErrorWindows(err), + } + } +} + +pub const WindowsWaitResult = error { + Normal, + Aborted, +}; + +pub fn windowsGetQueuedCompletionStatus(completion_port: windows.HANDLE, bytes_transferred_count: *windows.DWORD, lpCompletionKey: *usize, lpOverlapped: *?*windows.OVERLAPPED, dwMilliseconds: windows.DWORD) WindowsWaitResult { + if (windows.GetQueuedCompletionStatus(completion_port, bytes_transferred_count, lpCompletionKey, lpOverlapped, dwMilliseconds) == windows.FALSE) { + if (std.debug.runtime_safety) { + const err = windows.GetLastError(); + if (err != windows.ERROR.ABANDONED_WAIT_0) { + std.debug.warn("err: {}\n", err); + } + assert(err == windows.ERROR.ABANDONED_WAIT_0); + } + return WindowsWaitResult.Aborted; + } + return WindowsWaitResult.Normal; +}