diff --git a/std/event/fs.zig b/std/event/fs.zig index 0b164276f2..54a4cd1b50 100644 --- a/std/event/fs.zig +++ b/std/event/fs.zig @@ -5,6 +5,8 @@ const assert = std.debug.assert; const os = std.os; const mem = std.mem; const posix = os.posix; +const windows = os.windows; +const Loop = event.Loop; pub const RequestNode = std.atomic.Queue(Request).Node; @@ -13,7 +15,7 @@ pub const Request = struct { finish: Finish, pub const Finish = union(enum) { - TickNode: event.Loop.NextTickNode, + TickNode: Loop.NextTickNode, DeallocCloseOperation: *CloseOperation, NoAction, }; @@ -71,7 +73,77 @@ pub const Request = struct { }; /// data - just the inner references - must live until pwritev promise completes. -pub async fn pwritev(loop: *event.Loop, fd: os.FileHandle, data: []const []const u8, offset: usize) !void { +pub async fn pwritev(loop: *Loop, fd: os.FileHandle, data: []const []const u8, offset: usize) !void { + switch (builtin.os) { + builtin.Os.macosx, + builtin.Os.linux, + => return await (async pwritevPosix(loop, fd, data, offset) catch unreachable), + builtin.Os.windows, + => return await (async pwritevWindows(loop, fd, data, offset) catch unreachable), + else => @compileError("Unsupported OS"), + } +} + +/// data - just the inner references - must live until pwritev promise completes. +pub async fn pwritevWindows(loop: *Loop, fd: os.FileHandle, data: []const []const u8, offset: usize) !void { + if (data.len == 0) return; + if (data.len == 1) return await (async pwriteWindows(loop, fd, data[0], offset) catch unreachable); + + const data_copy = std.mem.dupe(loop.allocator, []const u8, data); + defer loop.allocator.free(data_copy); + + var off = offset; + for (data_copy) |buf| { + try await (async pwriteWindows(loop, fd, buf, off) catch unreachable); + off += buf.len; + } +} + +pub async fn pwriteWindows(loop: *Loop, fd: os.FileHandle, data: []const u8, offset: u64) os.WindowsWriteError!void { + // workaround for https://github.com/ziglang/zig/issues/1194 + suspend { + resume @handle(); + } + + var resume_node = Loop.ResumeNode.Basic{ + .base = Loop.ResumeNode{ + .id = Loop.ResumeNode.Id.Basic, + .handle = @handle(), + }, + }; + const completion_key = @ptrToInt(&resume_node.base); + _ = try os.windowsCreateIoCompletionPort(fd, loop.os_data.io_port, completion_key, undefined); + var overlapped = windows.OVERLAPPED{ + .Internal = 0, + .InternalHigh = 0, + .Offset = @truncate(u32, offset), + .OffsetHigh = @truncate(u32, offset >> 32), + .hEvent = null, + }; + errdefer { + _ = windows.CancelIoEx(fd, &overlapped); + } + suspend { + _ = windows.WriteFile(fd, data.ptr, @intCast(windows.DWORD, data.len), null, &overlapped); + } + var bytes_transferred: windows.DWORD = undefined; + if (windows.GetOverlappedResult(fd, &overlapped, &bytes_transferred, windows.FALSE) == 0) { + const err = windows.GetLastError(); + return switch (err) { + windows.ERROR.IO_PENDING => unreachable, + windows.ERROR.INVALID_USER_BUFFER => error.SystemResources, + windows.ERROR.NOT_ENOUGH_MEMORY => error.SystemResources, + windows.ERROR.OPERATION_ABORTED => error.OperationAborted, + windows.ERROR.NOT_ENOUGH_QUOTA => error.SystemResources, + windows.ERROR.BROKEN_PIPE => error.BrokenPipe, + else => os.unexpectedErrorWindows(err), + }; + } +} + + +/// data - just the inner references - must live until pwritev promise completes. +pub async fn pwritevPosix(loop: *Loop, fd: os.FileHandle, data: []const []const u8, offset: usize) !void { // workaround for https://github.com/ziglang/zig/issues/1194 suspend { resume @handle(); @@ -100,7 +172,7 @@ pub async fn pwritev(loop: *event.Loop, fd: os.FileHandle, data: []const []const }, }, .finish = Request.Finish{ - .TickNode = event.Loop.NextTickNode{ + .TickNode = Loop.NextTickNode{ .prev = null, .next = null, .data = @handle(), @@ -118,8 +190,8 @@ pub async fn pwritev(loop: *event.Loop, fd: os.FileHandle, data: []const []const return req_node.data.msg.PWriteV.result; } -/// data - just the inner references - must live until pwritev promise completes. -pub async fn preadv(loop: *event.Loop, fd: os.FileHandle, data: []const []u8, offset: usize) !usize { +/// data - just the inner references - must live until preadv promise completes. +pub async fn preadv(loop: *Loop, fd: os.FileHandle, data: []const []u8, offset: usize) !usize { //const data_dupe = try mem.dupe(loop.allocator, []const u8, data); //defer loop.allocator.free(data_dupe); @@ -151,7 +223,7 @@ pub async fn preadv(loop: *event.Loop, fd: os.FileHandle, data: []const []u8, of }, }, .finish = Request.Finish{ - .TickNode = event.Loop.NextTickNode{ + .TickNode = Loop.NextTickNode{ .prev = null, .next = null, .data = @handle(), @@ -169,8 +241,8 @@ pub async fn preadv(loop: *event.Loop, fd: os.FileHandle, data: []const []u8, of return req_node.data.msg.PReadV.result; } -pub async fn open( - loop: *event.Loop, +pub async fn openPosix( + loop: *Loop, path: []const u8, flags: u32, mode: os.File.Mode, @@ -196,7 +268,7 @@ pub async fn open( }, }, .finish = Request.Finish{ - .TickNode = event.Loop.NextTickNode{ + .TickNode = Loop.NextTickNode{ .prev = null, .next = null, .data = @handle(), @@ -214,19 +286,47 @@ pub async fn open( return req_node.data.msg.Open.result; } -pub async fn openRead(loop: *event.Loop, path: []const u8) os.File.OpenError!os.FileHandle { +pub async fn openRead(loop: *Loop, path: []const u8) os.File.OpenError!os.FileHandle { const flags = posix.O_LARGEFILE | posix.O_RDONLY | posix.O_CLOEXEC; - return await (async open(loop, path, flags, 0) catch unreachable); + return await (async openPosix(loop, path, flags, os.File.default_mode) catch unreachable); +} + +/// Creates if does not exist. Truncates the file if it exists. +/// Uses the default mode. +pub async fn openWrite(loop: *Loop, path: []const u8) os.File.OpenError!os.FileHandle { + return await (async openWriteMode(loop, path, os.File.default_mode) catch unreachable); +} + +/// Creates if does not exist. Truncates the file if it exists. +pub async fn openWriteMode(loop: *Loop, path: []const u8, mode: os.File.Mode) os.File.OpenError!os.FileHandle { + switch (builtin.os) { + builtin.Os.macosx, + builtin.Os.linux, + => { + const flags = posix.O_LARGEFILE | posix.O_WRONLY | posix.O_CREAT | posix.O_CLOEXEC | posix.O_TRUNC; + return await (async openPosix(loop, path, flags, os.File.default_mode) catch unreachable); + }, + builtin.Os.windows, + => return os.windowsOpen( + loop.allocator, + path, + windows.GENERIC_WRITE, + windows.FILE_SHARE_WRITE | windows.FILE_SHARE_READ | windows.FILE_SHARE_DELETE, + windows.CREATE_ALWAYS, + windows.FILE_ATTRIBUTE_NORMAL | windows.FILE_FLAG_OVERLAPPED, + ), + else => @compileError("Unsupported OS"), + } } /// Creates if does not exist. Does not truncate. pub async fn openReadWrite( - loop: *event.Loop, + loop: *Loop, path: []const u8, mode: os.File.Mode, ) os.File.OpenError!os.FileHandle { const flags = posix.O_LARGEFILE | posix.O_RDWR | posix.O_CREAT | posix.O_CLOEXEC; - return await (async open(loop, path, flags, mode) catch unreachable); + return await (async openPosix(loop, path, flags, mode) catch unreachable); } /// This abstraction helps to close file handles in defer expressions @@ -236,24 +336,46 @@ pub async fn openReadWrite( /// If you call `setHandle` then finishing will close the fd; otherwise finishing /// will deallocate the `CloseOperation`. pub const CloseOperation = struct { - loop: *event.Loop, - have_fd: bool, - close_req_node: RequestNode, + loop: *Loop, + os_data: OsData, - pub fn start(loop: *event.Loop) (error{OutOfMemory}!*CloseOperation) { + const OsData = switch (builtin.os) { + builtin.Os.linux, + builtin.Os.macosx, + => struct { + have_fd: bool, + close_req_node: RequestNode, + }, + builtin.Os.windows, + => struct { + handle: ?os.FileHandle, + }, + else => @compileError("Unsupported OS"), + }; + + pub fn start(loop: *Loop) (error{OutOfMemory}!*CloseOperation) { const self = try loop.allocator.createOne(CloseOperation); self.* = CloseOperation{ .loop = loop, - .have_fd = false, - .close_req_node = RequestNode{ - .prev = null, - .next = null, - .data = Request{ - .msg = Request.Msg{ - .Close = Request.Msg.Close{ .fd = undefined }, + .os_data = switch (builtin.os) { + builtin.Os.linux, + builtin.Os.macosx, + => OsData{ + .have_fd = false, + .close_req_node = RequestNode{ + .prev = null, + .next = null, + .data = Request{ + .msg = Request.Msg{ + .Close = Request.Msg.Close{ .fd = undefined }, + }, + .finish = Request.Finish{ .DeallocCloseOperation = self }, + }, }, - .finish = Request.Finish{ .DeallocCloseOperation = self }, }, + builtin.Os.windows, + => OsData{ .handle = null }, + else => @compileError("Unsupported OS"), }, }; return self; @@ -261,36 +383,109 @@ pub const CloseOperation = struct { /// Defer this after creating. pub fn finish(self: *CloseOperation) void { - if (self.have_fd) { - self.loop.posixFsRequest(&self.close_req_node); - } else { - self.loop.allocator.destroy(self); + switch (builtin.os) { + builtin.Os.linux, + builtin.Os.macosx, + => { + if (self.have_fd) { + self.loop.posixFsRequest(&self.close_req_node); + } else { + self.loop.allocator.destroy(self); + } + }, + builtin.Os.windows, + => { + if (self.handle) |handle| { + os.close(handle); + } + self.loop.allocator.destroy(self); + }, + else => @compileError("Unsupported OS"), } } pub fn setHandle(self: *CloseOperation, handle: os.FileHandle) void { - self.close_req_node.data.msg.Close.fd = handle; - self.have_fd = true; + switch (builtin.os) { + builtin.Os.linux, + builtin.Os.macosx, + => { + self.close_req_node.data.msg.Close.fd = handle; + self.have_fd = true; + }, + builtin.Os.windows, + => { + self.handle = handle; + }, + else => @compileError("Unsupported OS"), + } } /// Undo a `setHandle`. pub fn clearHandle(self: *CloseOperation) void { - self.have_fd = false; + switch (builtin.os) { + builtin.Os.linux, + builtin.Os.macosx, + => { + self.have_fd = false; + }, + builtin.Os.windows, + => { + self.handle = null; + }, + else => @compileError("Unsupported OS"), + } } pub fn getHandle(self: *CloseOperation) os.FileHandle { - assert(self.have_fd); - return self.close_req_node.data.msg.Close.fd; + switch (builtin.os) { + builtin.Os.linux, + builtin.Os.macosx, + => { + assert(self.have_fd); + return self.close_req_node.data.msg.Close.fd; + }, + builtin.Os.windows, + => { + return self.handle.?; + }, + else => @compileError("Unsupported OS"), + } } }; /// contents must remain alive until writeFile completes. -pub async fn writeFile(loop: *event.Loop, path: []const u8, contents: []const u8) !void { +/// TODO make this atomic or provide writeFileAtomic and rename this one to writeFileTruncate +pub async fn writeFile(loop: *Loop, path: []const u8, contents: []const u8) !void { return await (async writeFileMode(loop, path, contents, os.File.default_mode) catch unreachable); } /// contents must remain alive until writeFile completes. -pub async fn writeFileMode(loop: *event.Loop, path: []const u8, contents: []const u8, mode: os.File.Mode) !void { +pub async fn writeFileMode(loop: *Loop, path: []const u8, contents: []const u8, mode: os.File.Mode) !void { + switch (builtin.os) { + builtin.Os.linux, + builtin.Os.macosx, + => return await (async writeFileModeThread(loop, path, contents, mode) catch unreachable), + builtin.Os.windows, + => return await (async writeFileWindows(loop, path, contents) catch unreachable), + else => @compileError("Unsupported OS"), + } +} + +async fn writeFileWindows(loop: *Loop, path: []const u8, contents: []const u8) !void { + const handle = try os.windowsOpen( + loop.allocator, + path, + windows.GENERIC_WRITE, + windows.FILE_SHARE_WRITE | windows.FILE_SHARE_READ | windows.FILE_SHARE_DELETE, + windows.CREATE_ALWAYS, + windows.FILE_ATTRIBUTE_NORMAL | windows.FILE_FLAG_OVERLAPPED, + ); + defer os.close(handle); + + try await (async pwriteWindows(loop, handle, contents, 0) catch unreachable); +} + +async fn writeFileModeThread(loop: *Loop, path: []const u8, contents: []const u8, mode: os.File.Mode) !void { // workaround for https://github.com/ziglang/zig/issues/1194 suspend { resume @handle(); @@ -312,7 +507,7 @@ pub async fn writeFileMode(loop: *event.Loop, path: []const u8, contents: []cons }, }, .finish = Request.Finish{ - .TickNode = event.Loop.NextTickNode{ + .TickNode = Loop.NextTickNode{ .prev = null, .next = null, .data = @handle(), @@ -333,7 +528,7 @@ pub async fn writeFileMode(loop: *event.Loop, path: []const u8, contents: []cons /// The promise resumes when the last data has been confirmed written, but before the file handle /// is closed. /// Caller owns returned memory. -pub async fn readFile(loop: *event.Loop, file_path: []const u8, max_size: usize) ![]u8 { +pub async fn readFile(loop: *Loop, file_path: []const u8, max_size: usize) ![]u8 { var close_op = try CloseOperation.start(loop); defer close_op.finish(); @@ -417,7 +612,7 @@ pub fn Watch(comptime V: type) type { pub const Error = WatchEventError; }; - pub fn create(loop: *event.Loop, event_buf_count: usize) !*Self { + pub fn create(loop: *Loop, event_buf_count: usize) !*Self { const channel = try event.Channel(Self.Event.Error!Self.Event).create(loop, event_buf_count); errdefer channel.destroy(); @@ -482,7 +677,7 @@ pub fn Watch(comptime V: type) type { const flags = posix.O_SYMLINK | posix.O_EVTONLY; const mode = 0; - const fd = try await (async open(self.channel.loop, resolved_path, flags, mode) catch unreachable); + const fd = try await (async openPosix(self.channel.loop, resolved_path, flags, mode) catch unreachable); close_op.setHandle(fd); var put_data: *OsData.Put = undefined; @@ -722,7 +917,7 @@ test "write a file, watch it, write it again" { try os.makePath(allocator, test_tmp_dir); defer os.deleteTree(allocator, test_tmp_dir) catch {}; - var loop: event.Loop = undefined; + var loop: Loop = undefined; try loop.initMultiThreaded(allocator); defer loop.deinit(); @@ -734,11 +929,11 @@ test "write a file, watch it, write it again" { return result; } -async fn testFsWatchCantFail(loop: *event.Loop, result: *(error!void)) void { +async fn testFsWatchCantFail(loop: *Loop, result: *(error!void)) void { result.* = await async testFsWatch(loop) catch unreachable; } -async fn testFsWatch(loop: *event.Loop) !void { +async fn testFsWatch(loop: *Loop) !void { const file_path = try os.path.join(loop.allocator, test_tmp_dir, "file.txt"); defer loop.allocator.free(file_path); diff --git a/std/event/loop.zig b/std/event/loop.zig index 6893c2e253..2c6ea927ae 100644 --- a/std/event/loop.zig +++ b/std/event/loop.zig @@ -301,7 +301,7 @@ pub const Loop = struct { windows.INVALID_HANDLE_VALUE, null, undefined, - undefined, + @maxValue(windows.DWORD), ); errdefer os.close(self.os_data.io_port); @@ -315,7 +315,6 @@ pub const Loop = struct { // this one is for sending events .completion_key = @ptrToInt(&eventfd_node.data.base), }, - .prev = undefined, .next = undefined, }; self.available_eventfd_resume_nodes.push(eventfd_node); @@ -528,7 +527,12 @@ pub const Loop = struct { self.workerRun(); - self.os_data.fs_thread.wait(); + switch (builtin.os) { + builtin.Os.linux, + builtin.Os.macosx, + => self.os_data.fs_thread.wait(), + else => {}, + } for (self.extra_threads) |extra_thread| { extra_thread.wait(); @@ -794,15 +798,7 @@ pub const Loop = struct { } const OsData = switch (builtin.os) { - builtin.Os.linux => struct { - epollfd: i32, - final_eventfd: i32, - final_eventfd_event: os.linux.epoll_event, - fs_thread: *os.Thread, - fs_queue_item: u8, - fs_queue: std.atomic.Queue(fs.Request), - fs_end_request: fs.RequestNode, - }, + builtin.Os.linux => LinuxOsData, builtin.Os.macosx => MacOsData, builtin.Os.windows => struct { io_port: windows.HANDLE, @@ -821,6 +817,16 @@ pub const Loop = struct { fs_queue: std.atomic.Queue(fs.Request), fs_end_request: fs.RequestNode, }; + + const LinuxOsData = struct { + epollfd: i32, + final_eventfd: i32, + final_eventfd_event: os.linux.epoll_event, + fs_thread: *os.Thread, + fs_queue_item: u8, + fs_queue: std.atomic.Queue(fs.Request), + fs_end_request: fs.RequestNode, + }; }; test "std.event.Loop - basic" { diff --git a/std/mem.zig b/std/mem.zig index 43961a6d14..930c88141c 100644 --- a/std/mem.zig +++ b/std/mem.zig @@ -541,7 +541,7 @@ pub fn join(allocator: *Allocator, sep: u8, strings: ...) ![]u8 { } } - return buf[0..buf_index]; + return allocator.shrink(u8, buf, buf_index); } test "mem.join" { diff --git a/std/os/path.zig b/std/os/path.zig index d3ab0c519f..23c217b295 100644 --- a/std/os/path.zig +++ b/std/os/path.zig @@ -506,7 +506,7 @@ pub fn resolveWindows(allocator: *Allocator, paths: []const []const u8) ![]u8 { result_index += 1; } - return result[0..result_index]; + return allocator.shrink(u8, result, result_index); } /// This function is like a series of `cd` statements executed one after another. diff --git a/std/os/windows/index.zig b/std/os/windows/index.zig index 90ccfaf6c5..bb055468a5 100644 --- a/std/os/windows/index.zig +++ b/std/os/windows/index.zig @@ -67,8 +67,9 @@ pub const INVALID_FILE_ATTRIBUTES = DWORD(@maxValue(DWORD)); pub const OVERLAPPED = extern struct { Internal: ULONG_PTR, InternalHigh: ULONG_PTR, - Pointer: PVOID, - hEvent: HANDLE, + Offset: DWORD, + OffsetHigh: DWORD, + hEvent: ?HANDLE, }; pub const LPOVERLAPPED = *OVERLAPPED; @@ -350,3 +351,15 @@ pub const E_ACCESSDENIED = @bitCast(c_long, c_ulong(0x80070005)); pub const E_HANDLE = @bitCast(c_long, c_ulong(0x80070006)); pub const E_OUTOFMEMORY = @bitCast(c_long, c_ulong(0x8007000E)); pub const E_INVALIDARG = @bitCast(c_long, c_ulong(0x80070057)); + +pub const FILE_FLAG_BACKUP_SEMANTICS = 0x02000000; +pub const FILE_FLAG_DELETE_ON_CLOSE = 0x04000000; +pub const FILE_FLAG_NO_BUFFERING = 0x20000000; +pub const FILE_FLAG_OPEN_NO_RECALL = 0x00100000; +pub const FILE_FLAG_OPEN_REPARSE_POINT = 0x00200000; +pub const FILE_FLAG_OVERLAPPED = 0x40000000; +pub const FILE_FLAG_POSIX_SEMANTICS = 0x0100000; +pub const FILE_FLAG_RANDOM_ACCESS = 0x10000000; +pub const FILE_FLAG_SESSION_AWARE = 0x00800000; +pub const FILE_FLAG_SEQUENTIAL_SCAN = 0x08000000; +pub const FILE_FLAG_WRITE_THROUGH = 0x80000000; diff --git a/std/os/windows/kernel32.zig b/std/os/windows/kernel32.zig index fa3473ad05..6a4519c7b9 100644 --- a/std/os/windows/kernel32.zig +++ b/std/os/windows/kernel32.zig @@ -1,5 +1,8 @@ use @import("index.zig"); + +pub extern "kernel32" stdcallcc fn CancelIoEx(hFile: HANDLE, lpOverlapped: LPOVERLAPPED) BOOL; + pub extern "kernel32" stdcallcc fn CloseHandle(hObject: HANDLE) BOOL; pub extern "kernel32" stdcallcc fn CreateDirectoryA( @@ -91,6 +94,9 @@ pub extern "kernel32" stdcallcc fn GetFinalPathNameByHandleA( dwFlags: DWORD, ) DWORD; + +pub extern "kernel32" stdcallcc fn GetOverlappedResult(hFile: HANDLE, lpOverlapped: *OVERLAPPED, lpNumberOfBytesTransferred: *DWORD, bWait: BOOL) BOOL; + pub extern "kernel32" stdcallcc fn GetProcessHeap() ?HANDLE; pub extern "kernel32" stdcallcc fn GetQueuedCompletionStatus(CompletionPort: HANDLE, lpNumberOfBytesTransferred: LPDWORD, lpCompletionKey: *ULONG_PTR, lpOverlapped: *?*OVERLAPPED, dwMilliseconds: DWORD) BOOL; @@ -150,12 +156,14 @@ pub extern "kernel32" stdcallcc fn WaitForSingleObject(hHandle: HANDLE, dwMillis pub extern "kernel32" stdcallcc fn WriteFile( in_hFile: HANDLE, - in_lpBuffer: *const c_void, + in_lpBuffer: [*]const u8, in_nNumberOfBytesToWrite: DWORD, out_lpNumberOfBytesWritten: ?*DWORD, in_out_lpOverlapped: ?*OVERLAPPED, ) BOOL; +pub extern "kernel32" stdcallcc fn WriteFileEx(hFile: HANDLE, lpBuffer: [*]const u8, nNumberOfBytesToWrite: DWORD, lpOverlapped: LPOVERLAPPED, lpCompletionRoutine: LPOVERLAPPED_COMPLETION_ROUTINE) BOOL; + //TODO: call unicode versions instead of relying on ANSI code page pub extern "kernel32" stdcallcc fn LoadLibraryA(lpLibFileName: LPCSTR) ?HMODULE; diff --git a/std/os/windows/util.zig b/std/os/windows/util.zig index c9d2c3c3e6..ca5cdd4ac0 100644 --- a/std/os/windows/util.zig +++ b/std/os/windows/util.zig @@ -36,20 +36,19 @@ pub fn windowsClose(handle: windows.HANDLE) void { pub const WriteError = error{ SystemResources, OperationAborted, - IoPending, BrokenPipe, Unexpected, }; pub fn windowsWrite(handle: windows.HANDLE, bytes: []const u8) WriteError!void { - if (windows.WriteFile(handle, @ptrCast(*const c_void, bytes.ptr), @intCast(u32, bytes.len), null, null) == 0) { + if (windows.WriteFile(handle, bytes.ptr, @intCast(u32, bytes.len), null, null) == 0) { const err = windows.GetLastError(); return switch (err) { windows.ERROR.INVALID_USER_BUFFER => WriteError.SystemResources, windows.ERROR.NOT_ENOUGH_MEMORY => WriteError.SystemResources, windows.ERROR.OPERATION_ABORTED => WriteError.OperationAborted, windows.ERROR.NOT_ENOUGH_QUOTA => WriteError.SystemResources, - windows.ERROR.IO_PENDING => WriteError.IoPending, + windows.ERROR.IO_PENDING => unreachable, windows.ERROR.BROKEN_PIPE => WriteError.BrokenPipe, else => os.unexpectedErrorWindows(err), };