From da56959a9a7dd7b83a8d2bc6b1454ae546a48be6 Mon Sep 17 00:00:00 2001 From: Andrew Kelley Date: Thu, 8 Aug 2019 16:41:38 -0400 Subject: [PATCH] closer to std lib event stuff working --- std/event/channel.zig | 46 +- std/event/fs.zig | 1184 ++++++++++++++++++++--------------------- std/event/loop.zig | 66 ++- 3 files changed, 642 insertions(+), 654 deletions(-) diff --git a/std/event/channel.zig b/std/event/channel.zig index bb2fbbf126..c9686e37e9 100644 --- a/std/event/channel.zig +++ b/std/event/channel.zig @@ -89,12 +89,7 @@ pub fn Channel(comptime T: type) type { /// puts a data item in the channel. The promise completes when the value has been added to the /// buffer, or in the case of a zero size buffer, when the item has been retrieved by a getter. pub async fn put(self: *SelfChannel, data: T) void { - // TODO fix this workaround - suspend { - resume @handle(); - } - - var my_tick_node = Loop.NextTickNode.init(@handle()); + var my_tick_node = Loop.NextTickNode.init(@frame()); var queue_node = std.atomic.Queue(PutNode).Node.init(PutNode{ .tick_node = &my_tick_node, .data = data, @@ -122,15 +117,10 @@ pub fn Channel(comptime T: type) type { /// await this function to get an item from the channel. If the buffer is empty, the promise will /// complete when the next item is put in the channel. pub async fn get(self: *SelfChannel) T { - // TODO fix this workaround - suspend { - resume @handle(); - } - // TODO integrate this function with named return values // so we can get rid of this extra result copy var result: T = undefined; - var my_tick_node = Loop.NextTickNode.init(@handle()); + var my_tick_node = Loop.NextTickNode.init(@frame()); var queue_node = std.atomic.Queue(GetNode).Node.init(GetNode{ .tick_node = &my_tick_node, .data = GetNode.Data{ @@ -173,15 +163,10 @@ pub fn Channel(comptime T: type) type { /// Await is necessary for locking purposes. The function will be resumed after checking the channel /// for data and will not wait for data to be available. pub async fn getOrNull(self: *SelfChannel) ?T { - // TODO fix this workaround - suspend { - resume @handle(); - } - // TODO integrate this function with named return values // so we can get rid of this extra result copy var result: ?T = null; - var my_tick_node = Loop.NextTickNode.init(@handle()); + var my_tick_node = Loop.NextTickNode.init(@frame()); var or_null_node = std.atomic.Queue(*std.atomic.Queue(GetNode).Node).Node.init(undefined); var queue_node = std.atomic.Queue(GetNode).Node.init(GetNode{ .tick_node = &my_tick_node, @@ -334,41 +319,36 @@ test "std.event.Channel" { const channel = try Channel(i32).create(&loop, 0); defer channel.destroy(); - const handle = try async testChannelGetter(&loop, channel); - defer cancel handle; - - const putter = try async testChannelPutter(channel); - defer cancel putter; + const handle = async testChannelGetter(&loop, channel); + const putter = async testChannelPutter(channel); loop.run(); } async fn testChannelGetter(loop: *Loop, channel: *Channel(i32)) void { - errdefer @panic("test failed"); - - const value1_promise = try async channel.get(); + const value1_promise = async channel.get(); const value1 = await value1_promise; testing.expect(value1 == 1234); - const value2_promise = try async channel.get(); + const value2_promise = async channel.get(); const value2 = await value2_promise; testing.expect(value2 == 4567); - const value3_promise = try async channel.getOrNull(); + const value3_promise = async channel.getOrNull(); const value3 = await value3_promise; testing.expect(value3 == null); - const last_put = try async testPut(channel, 4444); - const value4 = await try async channel.getOrNull(); + const last_put = async testPut(channel, 4444); + const value4 = channel.getOrNull(); testing.expect(value4.? == 4444); await last_put; } async fn testChannelPutter(channel: *Channel(i32)) void { - await (async channel.put(1234) catch @panic("out of memory")); - await (async channel.put(4567) catch @panic("out of memory")); + channel.put(1234); + channel.put(4567); } async fn testPut(channel: *Channel(i32), value: i32) void { - await (async channel.put(value) catch @panic("out of memory")); + channel.put(value); } diff --git a/std/event/fs.zig b/std/event/fs.zig index 3ead77e949..22e9fc38c9 100644 --- a/std/event/fs.zig +++ b/std/event/fs.zig @@ -715,594 +715,594 @@ pub const WatchEventId = enum { Delete, }; -pub const WatchEventError = error{ - UserResourceLimitReached, - SystemResources, - AccessDenied, - Unexpected, // TODO remove this possibility -}; - -pub fn Watch(comptime V: type) type { - return struct { - channel: *event.Channel(Event.Error!Event), - os_data: OsData, - - const OsData = switch (builtin.os) { - .macosx, .freebsd, .netbsd => struct { - file_table: FileTable, - table_lock: event.Lock, - - const FileTable = std.AutoHashMap([]const u8, *Put); - const Put = struct { - putter: promise, - value_ptr: *V, - }; - }, - - .linux => LinuxOsData, - .windows => WindowsOsData, - - else => @compileError("Unsupported OS"), - }; - - const WindowsOsData = struct { - table_lock: event.Lock, - dir_table: DirTable, - all_putters: std.atomic.Queue(promise), - ref_count: std.atomic.Int(usize), - - const DirTable = std.AutoHashMap([]const u8, *Dir); - const FileTable = std.AutoHashMap([]const u16, V); - - const Dir = struct { - putter: promise, - file_table: FileTable, - table_lock: event.Lock, - }; - }; - - const LinuxOsData = struct { - putter: promise, - inotify_fd: i32, - wd_table: WdTable, - table_lock: event.Lock, - - const WdTable = std.AutoHashMap(i32, Dir); - const FileTable = std.AutoHashMap([]const u8, V); - - const Dir = struct { - dirname: []const u8, - file_table: FileTable, - }; - }; - - const FileToHandle = std.AutoHashMap([]const u8, promise); - - const Self = @This(); - - pub const Event = struct { - id: Id, - data: V, - - pub const Id = WatchEventId; - pub const Error = WatchEventError; - }; - - pub fn create(loop: *Loop, event_buf_count: usize) !*Self { - const channel = try event.Channel(Self.Event.Error!Self.Event).create(loop, event_buf_count); - errdefer channel.destroy(); - - switch (builtin.os) { - .linux => { - const inotify_fd = try os.inotify_init1(os.linux.IN_NONBLOCK | os.linux.IN_CLOEXEC); - errdefer os.close(inotify_fd); - - var result: *Self = undefined; - _ = try async linuxEventPutter(inotify_fd, channel, &result); - return result; - }, - - .windows => { - const self = try loop.allocator.create(Self); - errdefer loop.allocator.destroy(self); - self.* = Self{ - .channel = channel, - .os_data = OsData{ - .table_lock = event.Lock.init(loop), - .dir_table = OsData.DirTable.init(loop.allocator), - .ref_count = std.atomic.Int(usize).init(1), - .all_putters = std.atomic.Queue(promise).init(), - }, - }; - return self; - }, - - .macosx, .freebsd, .netbsd => { - const self = try loop.allocator.create(Self); - errdefer loop.allocator.destroy(self); - - self.* = Self{ - .channel = channel, - .os_data = OsData{ - .table_lock = event.Lock.init(loop), - .file_table = OsData.FileTable.init(loop.allocator), - }, - }; - return self; - }, - else => @compileError("Unsupported OS"), - } - } - - /// All addFile calls and removeFile calls must have completed. - pub fn destroy(self: *Self) void { - switch (builtin.os) { - .macosx, .freebsd, .netbsd => { - // TODO we need to cancel the coroutines before destroying the lock - self.os_data.table_lock.deinit(); - var it = self.os_data.file_table.iterator(); - while (it.next()) |entry| { - cancel entry.value.putter; - self.channel.loop.allocator.free(entry.key); - } - self.channel.destroy(); - }, - .linux => cancel self.os_data.putter, - .windows => { - while (self.os_data.all_putters.get()) |putter_node| { - cancel putter_node.data; - } - self.deref(); - }, - else => @compileError("Unsupported OS"), - } - } - - fn ref(self: *Self) void { - _ = self.os_data.ref_count.incr(); - } - - fn deref(self: *Self) void { - if (self.os_data.ref_count.decr() == 1) { - const allocator = self.channel.loop.allocator; - self.os_data.table_lock.deinit(); - var it = self.os_data.dir_table.iterator(); - while (it.next()) |entry| { - allocator.free(entry.key); - allocator.destroy(entry.value); - } - self.os_data.dir_table.deinit(); - self.channel.destroy(); - allocator.destroy(self); - } - } - - pub async fn addFile(self: *Self, file_path: []const u8, value: V) !?V { - switch (builtin.os) { - .macosx, .freebsd, .netbsd => return await (async addFileKEvent(self, file_path, value) catch unreachable), - .linux => return await (async addFileLinux(self, file_path, value) catch unreachable), - .windows => return await (async addFileWindows(self, file_path, value) catch unreachable), - else => @compileError("Unsupported OS"), - } - } - - async fn addFileKEvent(self: *Self, file_path: []const u8, value: V) !?V { - const resolved_path = try std.fs.path.resolve(self.channel.loop.allocator, [_][]const u8{file_path}); - var resolved_path_consumed = false; - defer if (!resolved_path_consumed) self.channel.loop.allocator.free(resolved_path); - - var close_op = try CloseOperation.start(self.channel.loop); - var close_op_consumed = false; - defer if (!close_op_consumed) close_op.finish(); - - const flags = if (os.darwin.is_the_target) os.O_SYMLINK | os.O_EVTONLY else 0; - const mode = 0; - const fd = try await (async openPosix(self.channel.loop, resolved_path, flags, mode) catch unreachable); - close_op.setHandle(fd); - - var put_data: *OsData.Put = undefined; - const putter = try async self.kqPutEvents(close_op, value, &put_data); - close_op_consumed = true; - errdefer cancel putter; - - const result = blk: { - const held = await (async self.os_data.table_lock.acquire() catch unreachable); - defer held.release(); - - const gop = try self.os_data.file_table.getOrPut(resolved_path); - if (gop.found_existing) { - const prev_value = gop.kv.value.value_ptr.*; - cancel gop.kv.value.putter; - gop.kv.value = put_data; - break :blk prev_value; - } else { - resolved_path_consumed = true; - gop.kv.value = put_data; - break :blk null; - } - }; - - return result; - } - - async fn kqPutEvents(self: *Self, close_op: *CloseOperation, value: V, out_put: **OsData.Put) void { - // TODO https://github.com/ziglang/zig/issues/1194 - suspend { - resume @handle(); - } - - var value_copy = value; - var put = OsData.Put{ - .putter = @handle(), - .value_ptr = &value_copy, - }; - out_put.* = &put; - self.channel.loop.beginOneEvent(); - - defer { - close_op.finish(); - self.channel.loop.finishOneEvent(); - } - - while (true) { - if (await (async self.channel.loop.bsdWaitKev( - @intCast(usize, close_op.getHandle()), - os.EVFILT_VNODE, - os.NOTE_WRITE | os.NOTE_DELETE, - ) catch unreachable)) |kev| { - // TODO handle EV_ERROR - if (kev.fflags & os.NOTE_DELETE != 0) { - await (async self.channel.put(Self.Event{ - .id = Event.Id.Delete, - .data = value_copy, - }) catch unreachable); - } else if (kev.fflags & os.NOTE_WRITE != 0) { - await (async self.channel.put(Self.Event{ - .id = Event.Id.CloseWrite, - .data = value_copy, - }) catch unreachable); - } - } else |err| switch (err) { - error.EventNotFound => unreachable, - error.ProcessNotFound => unreachable, - error.Overflow => unreachable, - error.AccessDenied, error.SystemResources => |casted_err| { - await (async self.channel.put(casted_err) catch unreachable); - }, - } - } - } - - async fn addFileLinux(self: *Self, file_path: []const u8, value: V) !?V { - const value_copy = value; - - const dirname = std.fs.path.dirname(file_path) orelse "."; - const dirname_with_null = try std.cstr.addNullByte(self.channel.loop.allocator, dirname); - var dirname_with_null_consumed = false; - defer if (!dirname_with_null_consumed) self.channel.loop.allocator.free(dirname_with_null); - - const basename = std.fs.path.basename(file_path); - const basename_with_null = try std.cstr.addNullByte(self.channel.loop.allocator, basename); - var basename_with_null_consumed = false; - defer if (!basename_with_null_consumed) self.channel.loop.allocator.free(basename_with_null); - - const wd = try os.inotify_add_watchC( - self.os_data.inotify_fd, - dirname_with_null.ptr, - os.linux.IN_CLOSE_WRITE | os.linux.IN_ONLYDIR | os.linux.IN_EXCL_UNLINK, - ); - // wd is either a newly created watch or an existing one. - - const held = await (async self.os_data.table_lock.acquire() catch unreachable); - defer held.release(); - - const gop = try self.os_data.wd_table.getOrPut(wd); - if (!gop.found_existing) { - gop.kv.value = OsData.Dir{ - .dirname = dirname_with_null, - .file_table = OsData.FileTable.init(self.channel.loop.allocator), - }; - dirname_with_null_consumed = true; - } - const dir = &gop.kv.value; - - const file_table_gop = try dir.file_table.getOrPut(basename_with_null); - if (file_table_gop.found_existing) { - const prev_value = file_table_gop.kv.value; - file_table_gop.kv.value = value_copy; - return prev_value; - } else { - file_table_gop.kv.value = value_copy; - basename_with_null_consumed = true; - return null; - } - } - - async fn addFileWindows(self: *Self, file_path: []const u8, value: V) !?V { - const value_copy = value; - // TODO we might need to convert dirname and basename to canonical file paths ("short"?) - - const dirname = try std.mem.dupe(self.channel.loop.allocator, u8, std.fs.path.dirname(file_path) orelse "."); - var dirname_consumed = false; - defer if (!dirname_consumed) self.channel.loop.allocator.free(dirname); - - const dirname_utf16le = try std.unicode.utf8ToUtf16LeWithNull(self.channel.loop.allocator, dirname); - defer self.channel.loop.allocator.free(dirname_utf16le); - - // TODO https://github.com/ziglang/zig/issues/265 - const basename = std.fs.path.basename(file_path); - const basename_utf16le_null = try std.unicode.utf8ToUtf16LeWithNull(self.channel.loop.allocator, basename); - var basename_utf16le_null_consumed = false; - defer if (!basename_utf16le_null_consumed) self.channel.loop.allocator.free(basename_utf16le_null); - const basename_utf16le_no_null = basename_utf16le_null[0 .. basename_utf16le_null.len - 1]; - - const dir_handle = try windows.CreateFileW( - dirname_utf16le.ptr, - windows.FILE_LIST_DIRECTORY, - windows.FILE_SHARE_READ | windows.FILE_SHARE_DELETE | windows.FILE_SHARE_WRITE, - null, - windows.OPEN_EXISTING, - windows.FILE_FLAG_BACKUP_SEMANTICS | windows.FILE_FLAG_OVERLAPPED, - null, - ); - var dir_handle_consumed = false; - defer if (!dir_handle_consumed) windows.CloseHandle(dir_handle); - - const held = await (async self.os_data.table_lock.acquire() catch unreachable); - defer held.release(); - - const gop = try self.os_data.dir_table.getOrPut(dirname); - if (gop.found_existing) { - const dir = gop.kv.value; - const held_dir_lock = await (async dir.table_lock.acquire() catch unreachable); - defer held_dir_lock.release(); - - const file_gop = try dir.file_table.getOrPut(basename_utf16le_no_null); - if (file_gop.found_existing) { - const prev_value = file_gop.kv.value; - file_gop.kv.value = value_copy; - return prev_value; - } else { - file_gop.kv.value = value_copy; - basename_utf16le_null_consumed = true; - return null; - } - } else { - errdefer _ = self.os_data.dir_table.remove(dirname); - const dir = try self.channel.loop.allocator.create(OsData.Dir); - errdefer self.channel.loop.allocator.destroy(dir); - - dir.* = OsData.Dir{ - .file_table = OsData.FileTable.init(self.channel.loop.allocator), - .table_lock = event.Lock.init(self.channel.loop), - .putter = undefined, - }; - gop.kv.value = dir; - assert((try dir.file_table.put(basename_utf16le_no_null, value_copy)) == null); - basename_utf16le_null_consumed = true; - - dir.putter = try async self.windowsDirReader(dir_handle, dir); - dir_handle_consumed = true; - - dirname_consumed = true; - - return null; - } - } - - async fn windowsDirReader(self: *Self, dir_handle: windows.HANDLE, dir: *OsData.Dir) void { - // TODO https://github.com/ziglang/zig/issues/1194 - suspend { - resume @handle(); - } - - self.ref(); - defer self.deref(); - - defer os.close(dir_handle); - - var putter_node = std.atomic.Queue(promise).Node{ - .data = @handle(), - .prev = null, - .next = null, - }; - self.os_data.all_putters.put(&putter_node); - defer _ = self.os_data.all_putters.remove(&putter_node); - - var resume_node = Loop.ResumeNode.Basic{ - .base = Loop.ResumeNode{ - .id = Loop.ResumeNode.Id.Basic, - .handle = @handle(), - .overlapped = windows.OVERLAPPED{ - .Internal = 0, - .InternalHigh = 0, - .Offset = 0, - .OffsetHigh = 0, - .hEvent = null, - }, - }, - }; - var event_buf: [4096]u8 align(@alignOf(windows.FILE_NOTIFY_INFORMATION)) = undefined; - - // TODO handle this error not in the channel but in the setup - _ = windows.CreateIoCompletionPort( - dir_handle, - self.channel.loop.os_data.io_port, - undefined, - undefined, - ) catch |err| { - await (async self.channel.put(err) catch unreachable); - return; - }; - - while (true) { - { - // TODO only 1 beginOneEvent for the whole coroutine - self.channel.loop.beginOneEvent(); - errdefer self.channel.loop.finishOneEvent(); - errdefer { - _ = windows.kernel32.CancelIoEx(dir_handle, &resume_node.base.overlapped); - } - suspend { - _ = windows.kernel32.ReadDirectoryChangesW( - dir_handle, - &event_buf, - @intCast(windows.DWORD, event_buf.len), - windows.FALSE, // watch subtree - windows.FILE_NOTIFY_CHANGE_FILE_NAME | windows.FILE_NOTIFY_CHANGE_DIR_NAME | - windows.FILE_NOTIFY_CHANGE_ATTRIBUTES | windows.FILE_NOTIFY_CHANGE_SIZE | - windows.FILE_NOTIFY_CHANGE_LAST_WRITE | windows.FILE_NOTIFY_CHANGE_LAST_ACCESS | - windows.FILE_NOTIFY_CHANGE_CREATION | windows.FILE_NOTIFY_CHANGE_SECURITY, - null, // number of bytes transferred (unused for async) - &resume_node.base.overlapped, - null, // completion routine - unused because we use IOCP - ); - } - } - var bytes_transferred: windows.DWORD = undefined; - if (windows.kernel32.GetOverlappedResult(dir_handle, &resume_node.base.overlapped, &bytes_transferred, windows.FALSE) == 0) { - const err = switch (windows.kernel32.GetLastError()) { - else => |err| windows.unexpectedError(err), - }; - await (async self.channel.put(err) catch unreachable); - } else { - // can't use @bytesToSlice because of the special variable length name field - var ptr = event_buf[0..].ptr; - const end_ptr = ptr + bytes_transferred; - var ev: *windows.FILE_NOTIFY_INFORMATION = undefined; - while (@ptrToInt(ptr) < @ptrToInt(end_ptr)) : (ptr += ev.NextEntryOffset) { - ev = @ptrCast(*windows.FILE_NOTIFY_INFORMATION, ptr); - const emit = switch (ev.Action) { - windows.FILE_ACTION_REMOVED => WatchEventId.Delete, - windows.FILE_ACTION_MODIFIED => WatchEventId.CloseWrite, - else => null, - }; - if (emit) |id| { - const basename_utf16le = ([*]u16)(&ev.FileName)[0 .. ev.FileNameLength / 2]; - const user_value = blk: { - const held = await (async dir.table_lock.acquire() catch unreachable); - defer held.release(); - - if (dir.file_table.get(basename_utf16le)) |entry| { - break :blk entry.value; - } else { - break :blk null; - } - }; - if (user_value) |v| { - await (async self.channel.put(Event{ - .id = id, - .data = v, - }) catch unreachable); - } - } - if (ev.NextEntryOffset == 0) break; - } - } - } - } - - pub async fn removeFile(self: *Self, file_path: []const u8) ?V { - @panic("TODO"); - } - - async fn linuxEventPutter(inotify_fd: i32, channel: *event.Channel(Event.Error!Event), out_watch: **Self) void { - // TODO https://github.com/ziglang/zig/issues/1194 - suspend { - resume @handle(); - } - - const loop = channel.loop; - - var watch = Self{ - .channel = channel, - .os_data = OsData{ - .putter = @handle(), - .inotify_fd = inotify_fd, - .wd_table = OsData.WdTable.init(loop.allocator), - .table_lock = event.Lock.init(loop), - }, - }; - out_watch.* = &watch; - - loop.beginOneEvent(); - - defer { - watch.os_data.table_lock.deinit(); - var wd_it = watch.os_data.wd_table.iterator(); - while (wd_it.next()) |wd_entry| { - var file_it = wd_entry.value.file_table.iterator(); - while (file_it.next()) |file_entry| { - loop.allocator.free(file_entry.key); - } - loop.allocator.free(wd_entry.value.dirname); - } - loop.finishOneEvent(); - os.close(inotify_fd); - channel.destroy(); - } - - var event_buf: [4096]u8 align(@alignOf(os.linux.inotify_event)) = undefined; - - while (true) { - const rc = os.linux.read(inotify_fd, &event_buf, event_buf.len); - const errno = os.linux.getErrno(rc); - switch (errno) { - 0 => { - // can't use @bytesToSlice because of the special variable length name field - var ptr = event_buf[0..].ptr; - const end_ptr = ptr + event_buf.len; - var ev: *os.linux.inotify_event = undefined; - while (@ptrToInt(ptr) < @ptrToInt(end_ptr)) : (ptr += @sizeOf(os.linux.inotify_event) + ev.len) { - ev = @ptrCast(*os.linux.inotify_event, ptr); - if (ev.mask & os.linux.IN_CLOSE_WRITE == os.linux.IN_CLOSE_WRITE) { - const basename_ptr = ptr + @sizeOf(os.linux.inotify_event); - const basename_with_null = basename_ptr[0 .. std.mem.len(u8, basename_ptr) + 1]; - const user_value = blk: { - const held = await (async watch.os_data.table_lock.acquire() catch unreachable); - defer held.release(); - - const dir = &watch.os_data.wd_table.get(ev.wd).?.value; - if (dir.file_table.get(basename_with_null)) |entry| { - break :blk entry.value; - } else { - break :blk null; - } - }; - if (user_value) |v| { - await (async channel.put(Event{ - .id = WatchEventId.CloseWrite, - .data = v, - }) catch unreachable); - } - } - } - }, - os.linux.EINTR => continue, - os.linux.EINVAL => unreachable, - os.linux.EFAULT => unreachable, - os.linux.EAGAIN => { - (await (async loop.linuxWaitFd( - inotify_fd, - os.linux.EPOLLET | os.linux.EPOLLIN, - ) catch unreachable)) catch |err| { - const transformed_err = switch (err) { - error.FileDescriptorAlreadyPresentInSet => unreachable, - error.OperationCausesCircularLoop => unreachable, - error.FileDescriptorNotRegistered => unreachable, - error.FileDescriptorIncompatibleWithEpoll => unreachable, - error.Unexpected => unreachable, - else => |e| e, - }; - await (async channel.put(transformed_err) catch unreachable); - }; - }, - else => unreachable, - } - } - } - }; -} +//pub const WatchEventError = error{ +// UserResourceLimitReached, +// SystemResources, +// AccessDenied, +// Unexpected, // TODO remove this possibility +//}; +// +//pub fn Watch(comptime V: type) type { +// return struct { +// channel: *event.Channel(Event.Error!Event), +// os_data: OsData, +// +// const OsData = switch (builtin.os) { +// .macosx, .freebsd, .netbsd => struct { +// file_table: FileTable, +// table_lock: event.Lock, +// +// const FileTable = std.AutoHashMap([]const u8, *Put); +// const Put = struct { +// putter: promise, +// value_ptr: *V, +// }; +// }, +// +// .linux => LinuxOsData, +// .windows => WindowsOsData, +// +// else => @compileError("Unsupported OS"), +// }; +// +// const WindowsOsData = struct { +// table_lock: event.Lock, +// dir_table: DirTable, +// all_putters: std.atomic.Queue(promise), +// ref_count: std.atomic.Int(usize), +// +// const DirTable = std.AutoHashMap([]const u8, *Dir); +// const FileTable = std.AutoHashMap([]const u16, V); +// +// const Dir = struct { +// putter: promise, +// file_table: FileTable, +// table_lock: event.Lock, +// }; +// }; +// +// const LinuxOsData = struct { +// putter: promise, +// inotify_fd: i32, +// wd_table: WdTable, +// table_lock: event.Lock, +// +// const WdTable = std.AutoHashMap(i32, Dir); +// const FileTable = std.AutoHashMap([]const u8, V); +// +// const Dir = struct { +// dirname: []const u8, +// file_table: FileTable, +// }; +// }; +// +// const FileToHandle = std.AutoHashMap([]const u8, promise); +// +// const Self = @This(); +// +// pub const Event = struct { +// id: Id, +// data: V, +// +// pub const Id = WatchEventId; +// pub const Error = WatchEventError; +// }; +// +// pub fn create(loop: *Loop, event_buf_count: usize) !*Self { +// const channel = try event.Channel(Self.Event.Error!Self.Event).create(loop, event_buf_count); +// errdefer channel.destroy(); +// +// switch (builtin.os) { +// .linux => { +// const inotify_fd = try os.inotify_init1(os.linux.IN_NONBLOCK | os.linux.IN_CLOEXEC); +// errdefer os.close(inotify_fd); +// +// var result: *Self = undefined; +// _ = try async linuxEventPutter(inotify_fd, channel, &result); +// return result; +// }, +// +// .windows => { +// const self = try loop.allocator.create(Self); +// errdefer loop.allocator.destroy(self); +// self.* = Self{ +// .channel = channel, +// .os_data = OsData{ +// .table_lock = event.Lock.init(loop), +// .dir_table = OsData.DirTable.init(loop.allocator), +// .ref_count = std.atomic.Int(usize).init(1), +// .all_putters = std.atomic.Queue(promise).init(), +// }, +// }; +// return self; +// }, +// +// .macosx, .freebsd, .netbsd => { +// const self = try loop.allocator.create(Self); +// errdefer loop.allocator.destroy(self); +// +// self.* = Self{ +// .channel = channel, +// .os_data = OsData{ +// .table_lock = event.Lock.init(loop), +// .file_table = OsData.FileTable.init(loop.allocator), +// }, +// }; +// return self; +// }, +// else => @compileError("Unsupported OS"), +// } +// } +// +// /// All addFile calls and removeFile calls must have completed. +// pub fn destroy(self: *Self) void { +// switch (builtin.os) { +// .macosx, .freebsd, .netbsd => { +// // TODO we need to cancel the coroutines before destroying the lock +// self.os_data.table_lock.deinit(); +// var it = self.os_data.file_table.iterator(); +// while (it.next()) |entry| { +// cancel entry.value.putter; +// self.channel.loop.allocator.free(entry.key); +// } +// self.channel.destroy(); +// }, +// .linux => cancel self.os_data.putter, +// .windows => { +// while (self.os_data.all_putters.get()) |putter_node| { +// cancel putter_node.data; +// } +// self.deref(); +// }, +// else => @compileError("Unsupported OS"), +// } +// } +// +// fn ref(self: *Self) void { +// _ = self.os_data.ref_count.incr(); +// } +// +// fn deref(self: *Self) void { +// if (self.os_data.ref_count.decr() == 1) { +// const allocator = self.channel.loop.allocator; +// self.os_data.table_lock.deinit(); +// var it = self.os_data.dir_table.iterator(); +// while (it.next()) |entry| { +// allocator.free(entry.key); +// allocator.destroy(entry.value); +// } +// self.os_data.dir_table.deinit(); +// self.channel.destroy(); +// allocator.destroy(self); +// } +// } +// +// pub async fn addFile(self: *Self, file_path: []const u8, value: V) !?V { +// switch (builtin.os) { +// .macosx, .freebsd, .netbsd => return await (async addFileKEvent(self, file_path, value) catch unreachable), +// .linux => return await (async addFileLinux(self, file_path, value) catch unreachable), +// .windows => return await (async addFileWindows(self, file_path, value) catch unreachable), +// else => @compileError("Unsupported OS"), +// } +// } +// +// async fn addFileKEvent(self: *Self, file_path: []const u8, value: V) !?V { +// const resolved_path = try std.fs.path.resolve(self.channel.loop.allocator, [_][]const u8{file_path}); +// var resolved_path_consumed = false; +// defer if (!resolved_path_consumed) self.channel.loop.allocator.free(resolved_path); +// +// var close_op = try CloseOperation.start(self.channel.loop); +// var close_op_consumed = false; +// defer if (!close_op_consumed) close_op.finish(); +// +// const flags = if (os.darwin.is_the_target) os.O_SYMLINK | os.O_EVTONLY else 0; +// const mode = 0; +// const fd = try await (async openPosix(self.channel.loop, resolved_path, flags, mode) catch unreachable); +// close_op.setHandle(fd); +// +// var put_data: *OsData.Put = undefined; +// const putter = try async self.kqPutEvents(close_op, value, &put_data); +// close_op_consumed = true; +// errdefer cancel putter; +// +// const result = blk: { +// const held = await (async self.os_data.table_lock.acquire() catch unreachable); +// defer held.release(); +// +// const gop = try self.os_data.file_table.getOrPut(resolved_path); +// if (gop.found_existing) { +// const prev_value = gop.kv.value.value_ptr.*; +// cancel gop.kv.value.putter; +// gop.kv.value = put_data; +// break :blk prev_value; +// } else { +// resolved_path_consumed = true; +// gop.kv.value = put_data; +// break :blk null; +// } +// }; +// +// return result; +// } +// +// async fn kqPutEvents(self: *Self, close_op: *CloseOperation, value: V, out_put: **OsData.Put) void { +// // TODO https://github.com/ziglang/zig/issues/1194 +// suspend { +// resume @handle(); +// } +// +// var value_copy = value; +// var put = OsData.Put{ +// .putter = @handle(), +// .value_ptr = &value_copy, +// }; +// out_put.* = &put; +// self.channel.loop.beginOneEvent(); +// +// defer { +// close_op.finish(); +// self.channel.loop.finishOneEvent(); +// } +// +// while (true) { +// if (await (async self.channel.loop.bsdWaitKev( +// @intCast(usize, close_op.getHandle()), +// os.EVFILT_VNODE, +// os.NOTE_WRITE | os.NOTE_DELETE, +// ) catch unreachable)) |kev| { +// // TODO handle EV_ERROR +// if (kev.fflags & os.NOTE_DELETE != 0) { +// await (async self.channel.put(Self.Event{ +// .id = Event.Id.Delete, +// .data = value_copy, +// }) catch unreachable); +// } else if (kev.fflags & os.NOTE_WRITE != 0) { +// await (async self.channel.put(Self.Event{ +// .id = Event.Id.CloseWrite, +// .data = value_copy, +// }) catch unreachable); +// } +// } else |err| switch (err) { +// error.EventNotFound => unreachable, +// error.ProcessNotFound => unreachable, +// error.Overflow => unreachable, +// error.AccessDenied, error.SystemResources => |casted_err| { +// await (async self.channel.put(casted_err) catch unreachable); +// }, +// } +// } +// } +// +// async fn addFileLinux(self: *Self, file_path: []const u8, value: V) !?V { +// const value_copy = value; +// +// const dirname = std.fs.path.dirname(file_path) orelse "."; +// const dirname_with_null = try std.cstr.addNullByte(self.channel.loop.allocator, dirname); +// var dirname_with_null_consumed = false; +// defer if (!dirname_with_null_consumed) self.channel.loop.allocator.free(dirname_with_null); +// +// const basename = std.fs.path.basename(file_path); +// const basename_with_null = try std.cstr.addNullByte(self.channel.loop.allocator, basename); +// var basename_with_null_consumed = false; +// defer if (!basename_with_null_consumed) self.channel.loop.allocator.free(basename_with_null); +// +// const wd = try os.inotify_add_watchC( +// self.os_data.inotify_fd, +// dirname_with_null.ptr, +// os.linux.IN_CLOSE_WRITE | os.linux.IN_ONLYDIR | os.linux.IN_EXCL_UNLINK, +// ); +// // wd is either a newly created watch or an existing one. +// +// const held = await (async self.os_data.table_lock.acquire() catch unreachable); +// defer held.release(); +// +// const gop = try self.os_data.wd_table.getOrPut(wd); +// if (!gop.found_existing) { +// gop.kv.value = OsData.Dir{ +// .dirname = dirname_with_null, +// .file_table = OsData.FileTable.init(self.channel.loop.allocator), +// }; +// dirname_with_null_consumed = true; +// } +// const dir = &gop.kv.value; +// +// const file_table_gop = try dir.file_table.getOrPut(basename_with_null); +// if (file_table_gop.found_existing) { +// const prev_value = file_table_gop.kv.value; +// file_table_gop.kv.value = value_copy; +// return prev_value; +// } else { +// file_table_gop.kv.value = value_copy; +// basename_with_null_consumed = true; +// return null; +// } +// } +// +// async fn addFileWindows(self: *Self, file_path: []const u8, value: V) !?V { +// const value_copy = value; +// // TODO we might need to convert dirname and basename to canonical file paths ("short"?) +// +// const dirname = try std.mem.dupe(self.channel.loop.allocator, u8, std.fs.path.dirname(file_path) orelse "."); +// var dirname_consumed = false; +// defer if (!dirname_consumed) self.channel.loop.allocator.free(dirname); +// +// const dirname_utf16le = try std.unicode.utf8ToUtf16LeWithNull(self.channel.loop.allocator, dirname); +// defer self.channel.loop.allocator.free(dirname_utf16le); +// +// // TODO https://github.com/ziglang/zig/issues/265 +// const basename = std.fs.path.basename(file_path); +// const basename_utf16le_null = try std.unicode.utf8ToUtf16LeWithNull(self.channel.loop.allocator, basename); +// var basename_utf16le_null_consumed = false; +// defer if (!basename_utf16le_null_consumed) self.channel.loop.allocator.free(basename_utf16le_null); +// const basename_utf16le_no_null = basename_utf16le_null[0 .. basename_utf16le_null.len - 1]; +// +// const dir_handle = try windows.CreateFileW( +// dirname_utf16le.ptr, +// windows.FILE_LIST_DIRECTORY, +// windows.FILE_SHARE_READ | windows.FILE_SHARE_DELETE | windows.FILE_SHARE_WRITE, +// null, +// windows.OPEN_EXISTING, +// windows.FILE_FLAG_BACKUP_SEMANTICS | windows.FILE_FLAG_OVERLAPPED, +// null, +// ); +// var dir_handle_consumed = false; +// defer if (!dir_handle_consumed) windows.CloseHandle(dir_handle); +// +// const held = await (async self.os_data.table_lock.acquire() catch unreachable); +// defer held.release(); +// +// const gop = try self.os_data.dir_table.getOrPut(dirname); +// if (gop.found_existing) { +// const dir = gop.kv.value; +// const held_dir_lock = await (async dir.table_lock.acquire() catch unreachable); +// defer held_dir_lock.release(); +// +// const file_gop = try dir.file_table.getOrPut(basename_utf16le_no_null); +// if (file_gop.found_existing) { +// const prev_value = file_gop.kv.value; +// file_gop.kv.value = value_copy; +// return prev_value; +// } else { +// file_gop.kv.value = value_copy; +// basename_utf16le_null_consumed = true; +// return null; +// } +// } else { +// errdefer _ = self.os_data.dir_table.remove(dirname); +// const dir = try self.channel.loop.allocator.create(OsData.Dir); +// errdefer self.channel.loop.allocator.destroy(dir); +// +// dir.* = OsData.Dir{ +// .file_table = OsData.FileTable.init(self.channel.loop.allocator), +// .table_lock = event.Lock.init(self.channel.loop), +// .putter = undefined, +// }; +// gop.kv.value = dir; +// assert((try dir.file_table.put(basename_utf16le_no_null, value_copy)) == null); +// basename_utf16le_null_consumed = true; +// +// dir.putter = try async self.windowsDirReader(dir_handle, dir); +// dir_handle_consumed = true; +// +// dirname_consumed = true; +// +// return null; +// } +// } +// +// async fn windowsDirReader(self: *Self, dir_handle: windows.HANDLE, dir: *OsData.Dir) void { +// // TODO https://github.com/ziglang/zig/issues/1194 +// suspend { +// resume @handle(); +// } +// +// self.ref(); +// defer self.deref(); +// +// defer os.close(dir_handle); +// +// var putter_node = std.atomic.Queue(promise).Node{ +// .data = @handle(), +// .prev = null, +// .next = null, +// }; +// self.os_data.all_putters.put(&putter_node); +// defer _ = self.os_data.all_putters.remove(&putter_node); +// +// var resume_node = Loop.ResumeNode.Basic{ +// .base = Loop.ResumeNode{ +// .id = Loop.ResumeNode.Id.Basic, +// .handle = @handle(), +// .overlapped = windows.OVERLAPPED{ +// .Internal = 0, +// .InternalHigh = 0, +// .Offset = 0, +// .OffsetHigh = 0, +// .hEvent = null, +// }, +// }, +// }; +// var event_buf: [4096]u8 align(@alignOf(windows.FILE_NOTIFY_INFORMATION)) = undefined; +// +// // TODO handle this error not in the channel but in the setup +// _ = windows.CreateIoCompletionPort( +// dir_handle, +// self.channel.loop.os_data.io_port, +// undefined, +// undefined, +// ) catch |err| { +// await (async self.channel.put(err) catch unreachable); +// return; +// }; +// +// while (true) { +// { +// // TODO only 1 beginOneEvent for the whole coroutine +// self.channel.loop.beginOneEvent(); +// errdefer self.channel.loop.finishOneEvent(); +// errdefer { +// _ = windows.kernel32.CancelIoEx(dir_handle, &resume_node.base.overlapped); +// } +// suspend { +// _ = windows.kernel32.ReadDirectoryChangesW( +// dir_handle, +// &event_buf, +// @intCast(windows.DWORD, event_buf.len), +// windows.FALSE, // watch subtree +// windows.FILE_NOTIFY_CHANGE_FILE_NAME | windows.FILE_NOTIFY_CHANGE_DIR_NAME | +// windows.FILE_NOTIFY_CHANGE_ATTRIBUTES | windows.FILE_NOTIFY_CHANGE_SIZE | +// windows.FILE_NOTIFY_CHANGE_LAST_WRITE | windows.FILE_NOTIFY_CHANGE_LAST_ACCESS | +// windows.FILE_NOTIFY_CHANGE_CREATION | windows.FILE_NOTIFY_CHANGE_SECURITY, +// null, // number of bytes transferred (unused for async) +// &resume_node.base.overlapped, +// null, // completion routine - unused because we use IOCP +// ); +// } +// } +// var bytes_transferred: windows.DWORD = undefined; +// if (windows.kernel32.GetOverlappedResult(dir_handle, &resume_node.base.overlapped, &bytes_transferred, windows.FALSE) == 0) { +// const err = switch (windows.kernel32.GetLastError()) { +// else => |err| windows.unexpectedError(err), +// }; +// await (async self.channel.put(err) catch unreachable); +// } else { +// // can't use @bytesToSlice because of the special variable length name field +// var ptr = event_buf[0..].ptr; +// const end_ptr = ptr + bytes_transferred; +// var ev: *windows.FILE_NOTIFY_INFORMATION = undefined; +// while (@ptrToInt(ptr) < @ptrToInt(end_ptr)) : (ptr += ev.NextEntryOffset) { +// ev = @ptrCast(*windows.FILE_NOTIFY_INFORMATION, ptr); +// const emit = switch (ev.Action) { +// windows.FILE_ACTION_REMOVED => WatchEventId.Delete, +// windows.FILE_ACTION_MODIFIED => WatchEventId.CloseWrite, +// else => null, +// }; +// if (emit) |id| { +// const basename_utf16le = ([*]u16)(&ev.FileName)[0 .. ev.FileNameLength / 2]; +// const user_value = blk: { +// const held = await (async dir.table_lock.acquire() catch unreachable); +// defer held.release(); +// +// if (dir.file_table.get(basename_utf16le)) |entry| { +// break :blk entry.value; +// } else { +// break :blk null; +// } +// }; +// if (user_value) |v| { +// await (async self.channel.put(Event{ +// .id = id, +// .data = v, +// }) catch unreachable); +// } +// } +// if (ev.NextEntryOffset == 0) break; +// } +// } +// } +// } +// +// pub async fn removeFile(self: *Self, file_path: []const u8) ?V { +// @panic("TODO"); +// } +// +// async fn linuxEventPutter(inotify_fd: i32, channel: *event.Channel(Event.Error!Event), out_watch: **Self) void { +// // TODO https://github.com/ziglang/zig/issues/1194 +// suspend { +// resume @handle(); +// } +// +// const loop = channel.loop; +// +// var watch = Self{ +// .channel = channel, +// .os_data = OsData{ +// .putter = @handle(), +// .inotify_fd = inotify_fd, +// .wd_table = OsData.WdTable.init(loop.allocator), +// .table_lock = event.Lock.init(loop), +// }, +// }; +// out_watch.* = &watch; +// +// loop.beginOneEvent(); +// +// defer { +// watch.os_data.table_lock.deinit(); +// var wd_it = watch.os_data.wd_table.iterator(); +// while (wd_it.next()) |wd_entry| { +// var file_it = wd_entry.value.file_table.iterator(); +// while (file_it.next()) |file_entry| { +// loop.allocator.free(file_entry.key); +// } +// loop.allocator.free(wd_entry.value.dirname); +// } +// loop.finishOneEvent(); +// os.close(inotify_fd); +// channel.destroy(); +// } +// +// var event_buf: [4096]u8 align(@alignOf(os.linux.inotify_event)) = undefined; +// +// while (true) { +// const rc = os.linux.read(inotify_fd, &event_buf, event_buf.len); +// const errno = os.linux.getErrno(rc); +// switch (errno) { +// 0 => { +// // can't use @bytesToSlice because of the special variable length name field +// var ptr = event_buf[0..].ptr; +// const end_ptr = ptr + event_buf.len; +// var ev: *os.linux.inotify_event = undefined; +// while (@ptrToInt(ptr) < @ptrToInt(end_ptr)) : (ptr += @sizeOf(os.linux.inotify_event) + ev.len) { +// ev = @ptrCast(*os.linux.inotify_event, ptr); +// if (ev.mask & os.linux.IN_CLOSE_WRITE == os.linux.IN_CLOSE_WRITE) { +// const basename_ptr = ptr + @sizeOf(os.linux.inotify_event); +// const basename_with_null = basename_ptr[0 .. std.mem.len(u8, basename_ptr) + 1]; +// const user_value = blk: { +// const held = await (async watch.os_data.table_lock.acquire() catch unreachable); +// defer held.release(); +// +// const dir = &watch.os_data.wd_table.get(ev.wd).?.value; +// if (dir.file_table.get(basename_with_null)) |entry| { +// break :blk entry.value; +// } else { +// break :blk null; +// } +// }; +// if (user_value) |v| { +// await (async channel.put(Event{ +// .id = WatchEventId.CloseWrite, +// .data = v, +// }) catch unreachable); +// } +// } +// } +// }, +// os.linux.EINTR => continue, +// os.linux.EINVAL => unreachable, +// os.linux.EFAULT => unreachable, +// os.linux.EAGAIN => { +// (await (async loop.linuxWaitFd( +// inotify_fd, +// os.linux.EPOLLET | os.linux.EPOLLIN, +// ) catch unreachable)) catch |err| { +// const transformed_err = switch (err) { +// error.FileDescriptorAlreadyPresentInSet => unreachable, +// error.OperationCausesCircularLoop => unreachable, +// error.FileDescriptorNotRegistered => unreachable, +// error.FileDescriptorIncompatibleWithEpoll => unreachable, +// error.Unexpected => unreachable, +// else => |e| e, +// }; +// await (async channel.put(transformed_err) catch unreachable); +// }; +// }, +// else => unreachable, +// } +// } +// } +// }; +//} const test_tmp_dir = "std_event_fs_test"; @@ -1397,11 +1397,11 @@ pub const OutStream = struct { }; } - async<*mem.Allocator> fn writeFn(out_stream: *Stream, bytes: []const u8) Error!void { + fn writeFn(out_stream: *Stream, bytes: []const u8) Error!void { const self = @fieldParentPtr(OutStream, "stream", out_stream); const offset = self.offset; self.offset += bytes.len; - return await (async pwritev(self.loop, self.fd, [][]const u8{bytes}, offset) catch unreachable); + return pwritev(self.loop, self.fd, [][]const u8{bytes}, offset); } }; @@ -1423,9 +1423,9 @@ pub const InStream = struct { }; } - async<*mem.Allocator> fn readFn(in_stream: *Stream, bytes: []u8) Error!usize { + fn readFn(in_stream: *Stream, bytes: []u8) Error!usize { const self = @fieldParentPtr(InStream, "stream", in_stream); - const amt = try await (async preadv(self.loop, self.fd, [][]u8{bytes}, self.offset) catch unreachable); + const amt = try preadv(self.loop, self.fd, [][]u8{bytes}, self.offset); self.offset += amt; return amt; } diff --git a/std/event/loop.zig b/std/event/loop.zig index f0ae67a3d1..a4a60b5098 100644 --- a/std/event/loop.zig +++ b/std/event/loop.zig @@ -98,9 +98,21 @@ pub const Loop = struct { }; pub const instance: ?*Loop = if (@hasDecl(root, "event_loop")) root.event_loop else default_instance; + /// TODO copy elision / named return values so that the threads referencing *Loop + /// have the correct pointer value. + /// https://github.com/ziglang/zig/issues/2761 and https://github.com/ziglang/zig/issues/2765 + pub fn init(self: *Loop, allocator: *mem.Allocator) !void { + if (builtin.single_threaded) { + return self.initSingleThreaded(allocator); + } else { + return self.initMultiThreaded(allocator); + } + } + /// After initialization, call run(). /// TODO copy elision / named return values so that the threads referencing *Loop /// have the correct pointer value. + /// https://github.com/ziglang/zig/issues/2761 and https://github.com/ziglang/zig/issues/2765 pub fn initSingleThreaded(self: *Loop, allocator: *mem.Allocator) !void { return self.initInternal(allocator, 1); } @@ -110,6 +122,7 @@ pub const Loop = struct { /// After initialization, call run(). /// TODO copy elision / named return values so that the threads referencing *Loop /// have the correct pointer value. + /// https://github.com/ziglang/zig/issues/2761 and https://github.com/ziglang/zig/issues/2765 pub fn initMultiThreaded(self: *Loop, allocator: *mem.Allocator) !void { if (builtin.single_threaded) @compileError("initMultiThreaded unavailable when building in single-threaded mode"); const core_count = try Thread.cpuCount(); @@ -161,18 +174,18 @@ pub const Loop = struct { fn initOsData(self: *Loop, extra_thread_count: usize) InitOsDataError!void { switch (builtin.os) { .linux => { - // TODO self.os_data.fs_queue = std.atomic.Queue(fs.Request).init(); - // TODO self.os_data.fs_queue_item = 0; - // TODO // we need another thread for the file system because Linux does not have an async - // TODO // file system I/O API. - // TODO self.os_data.fs_end_request = fs.RequestNode{ - // TODO .prev = undefined, - // TODO .next = undefined, - // TODO .data = fs.Request{ - // TODO .msg = fs.Request.Msg.End, - // TODO .finish = fs.Request.Finish.NoAction, - // TODO }, - // TODO }; + self.os_data.fs_queue = std.atomic.Queue(fs.Request).init(); + self.os_data.fs_queue_item = 0; + // we need another thread for the file system because Linux does not have an async + // file system I/O API. + self.os_data.fs_end_request = fs.RequestNode{ + .prev = undefined, + .next = undefined, + .data = fs.Request{ + .msg = fs.Request.Msg.End, + .finish = fs.Request.Finish.NoAction, + }, + }; errdefer { while (self.available_eventfd_resume_nodes.pop()) |node| os.close(node.data.eventfd); @@ -210,10 +223,10 @@ pub const Loop = struct { &self.os_data.final_eventfd_event, ); - // TODO self.os_data.fs_thread = try Thread.spawn(self, posixFsRun); + self.os_data.fs_thread = try Thread.spawn(self, posixFsRun); errdefer { - // TODO self.posixFsRequest(&self.os_data.fs_end_request); - // TODO self.os_data.fs_thread.wait(); + self.posixFsRequest(&self.os_data.fs_end_request); + self.os_data.fs_thread.wait(); } if (builtin.single_threaded) { @@ -315,10 +328,10 @@ pub const Loop = struct { .udata = undefined, }; - // TODO self.os_data.fs_thread = try Thread.spawn(self, posixFsRun); + self.os_data.fs_thread = try Thread.spawn(self, posixFsRun); errdefer { - // TODO self.posixFsRequest(&self.os_data.fs_end_request); - // TODO self.os_data.fs_thread.wait(); + self.posixFsRequest(&self.os_data.fs_end_request); + self.os_data.fs_thread.wait(); } if (builtin.single_threaded) { @@ -441,7 +454,6 @@ pub const Loop = struct { pub async fn linuxWaitFd(self: *Loop, fd: i32, flags: u32) !void { defer self.linuxRemoveFd(fd); suspend { - // TODO explicitly put this memory in the coroutine frame #1194 var resume_node = ResumeNode.Basic{ .base = ResumeNode{ .id = ResumeNode.Id.Basic, @@ -454,10 +466,6 @@ pub const Loop = struct { } pub async fn bsdWaitKev(self: *Loop, ident: usize, filter: i16, fflags: u32) !os.Kevent { - // TODO #1194 - suspend { - resume @handle(); - } var resume_node = ResumeNode.Basic{ .base = ResumeNode{ .id = ResumeNode.Id.Basic, @@ -578,7 +586,7 @@ pub const Loop = struct { .macosx, .freebsd, .netbsd, - => {}, // TODO self.os_data.fs_thread.wait(), + => self.os_data.fs_thread.wait(), else => {}, } @@ -631,7 +639,7 @@ pub const Loop = struct { // cause all the threads to stop switch (builtin.os) { .linux => { - // TODO self.posixFsRequest(&self.os_data.fs_end_request); + self.posixFsRequest(&self.os_data.fs_end_request); // writing 8 bytes to an eventfd cannot fail os.write(self.os_data.final_eventfd, wakeup_bytes) catch unreachable; return; @@ -862,10 +870,10 @@ pub const Loop = struct { epollfd: i32, final_eventfd: i32, final_eventfd_event: os.linux.epoll_event, - // TODO fs_thread: *Thread, - // TODO fs_queue_item: i32, - // TODO fs_queue: std.atomic.Queue(fs.Request), - // TODO fs_end_request: fs.RequestNode, + fs_thread: *Thread, + fs_queue_item: i32, + fs_queue: std.atomic.Queue(fs.Request), + fs_end_request: fs.RequestNode, }; };