diff --git a/lib/std/event/fs.zig b/lib/std/event/fs.zig index c46fd79038..1da77e4b57 100644 --- a/lib/std/event/fs.zig +++ b/lib/std/event/fs.zig @@ -9,6 +9,9 @@ const windows = os.windows; const Loop = event.Loop; const fd_t = os.fd_t; const File = std.fs.File; +const Allocator = mem.Allocator; + +//! TODO mege this with `std.fs` const global_event_loop = Loop.instance orelse @compileError("std.event.fs currently only works with event-based I/O"); @@ -681,7 +684,7 @@ fn writeFileModeThread(allocator: *Allocator, path: []const u8, contents: []cons /// is closed. /// Caller owns returned memory. pub fn readFile(allocator: *Allocator, file_path: []const u8, max_size: usize) ![]u8 { - var close_op = try CloseOperation.start(); + var close_op = try CloseOperation.start(allocator); defer close_op.finish(); const fd = try openRead(file_path); @@ -694,7 +697,7 @@ pub fn readFile(allocator: *Allocator, file_path: []const u8, max_size: usize) ! try list.ensureCapacity(list.len + mem.page_size); const buf = list.items[list.len..]; const buf_array = [_][]u8{buf}; - const amt = try preadv(fd, buf_array, list.len); + const amt = try preadv(allocator, fd, buf_array, list.len); list.len += amt; if (list.len > max_size) { return error.FileTooBig; @@ -731,16 +734,18 @@ pub fn Watch(comptime V: type) type { return struct { channel: *event.Channel(Event.Error!Event), os_data: OsData, + allocator: *Allocator, const OsData = switch (builtin.os) { .macosx, .freebsd, .netbsd, .dragonfly => struct { file_table: FileTable, table_lock: event.Lock, - const FileTable = std.StringHashmap(*Put); + const FileTable = std.StringHashMap(*Put); const Put = struct { - putter: anyframe, - value_ptr: *V, + putter_frame: @Frame(kqPutEvents), + cancelled: bool = false, + value: V, }; }, @@ -753,24 +758,30 @@ pub fn Watch(comptime V: type) type { const WindowsOsData = struct { table_lock: event.Lock, dir_table: DirTable, - all_putters: std.atomic.Queue(anyframe), + all_putters: std.atomic.Queue(Put), ref_count: std.atomic.Int(usize), + const Put = struct { + putter: anyframe, + cancelled: bool = false, + }; + const DirTable = std.StringHashMap(*Dir); const FileTable = std.HashMap([]const u16, V, hashString, eqlString); const Dir = struct { - putter: anyframe, + putter_frame: @Frame(windowsDirReader), file_table: FileTable, table_lock: event.Lock, }; }; const LinuxOsData = struct { - putter: anyframe, + putter_frame: @Frame(linuxEventPutter), inotify_fd: i32, wd_table: WdTable, table_lock: event.Lock, + cancelled: bool = false, const WdTable = std.AutoHashMap(i32, Dir); const FileTable = std.StringHashMap(V); @@ -781,8 +792,6 @@ pub fn Watch(comptime V: type) type { }; }; - const FileToHandle = std.StringHashMap(anyframe); - const Self = @This(); pub const Event = struct { @@ -793,28 +802,44 @@ pub fn Watch(comptime V: type) type { pub const Error = WatchEventError; }; - pub fn create(loop: *Loop, event_buf_count: usize) !*Self { - const channel = try event.Channel(Self.Event.Error!Self.Event).create(loop, event_buf_count); - errdefer channel.destroy(); + pub fn init(allocator: *Allocator, event_buf_count: usize) !*Self { + const channel = try allocator.create(event.Channel(Event.Error!Event)); + errdefer allocator.destroy(channel); + var buf = try allocator.alloc(Event.Error!Event, event_buf_count); + errdefer allocator.free(buf); + channel.init(buf); + errdefer channel.deinit(); + + const self = try allocator.create(Self); + errdefer allocator.destroy(self); switch (builtin.os) { .linux => { const inotify_fd = try os.inotify_init1(os.linux.IN_NONBLOCK | os.linux.IN_CLOEXEC); errdefer os.close(inotify_fd); - var result: *Self = undefined; -// _ = try async linuxEventPutter(inotify_fd, channel, &result); - return result; + self.* = Self{ + .allocator = allocator, + .channel = channel, + .os_data = OsData{ + .putter_frame = undefined, + .inotify_fd = inotify_fd, + .wd_table = OsData.WdTable.init(allocator), + .table_lock = event.Lock.init(), + }, + }; + + self.os_data.putter_frame = async self.linuxEventPutter(); + return self; }, .windows => { - const self = try loop.allocator.create(Self); - errdefer loop.allocator.destroy(self); self.* = Self{ + .allocator = allocator, .channel = channel, .os_data = OsData{ - .table_lock = event.Lock.init(loop), - .dir_table = OsData.DirTable.init(loop.allocator), + .table_lock = event.Lock.init(), + .dir_table = OsData.DirTable.init(allocator), .ref_count = std.atomic.Int(usize).init(1), .all_putters = std.atomic.Queue(anyframe).init(), }, @@ -823,14 +848,12 @@ pub fn Watch(comptime V: type) type { }, .macosx, .freebsd, .netbsd, .dragonfly => { - const self = try loop.allocator.create(Self); - errdefer loop.allocator.destroy(self); - self.* = Self{ + .allocator = allocator, .channel = channel, .os_data = OsData{ - .table_lock = event.Lock.init(loop), - .file_table = OsData.FileTable.init(loop.allocator), + .table_lock = event.Lock.init(), + .file_table = OsData.FileTable.init(allocator), }, }; return self; @@ -840,22 +863,31 @@ pub fn Watch(comptime V: type) type { } /// All addFile calls and removeFile calls must have completed. - pub fn destroy(self: *Self) void { + pub fn deinit(self: *Self) void { switch (builtin.os) { .macosx, .freebsd, .netbsd, .dragonfly => { // TODO we need to cancel the frames before destroying the lock self.os_data.table_lock.deinit(); var it = self.os_data.file_table.iterator(); while (it.next()) |entry| { -// cancel entry.value.putter; - self.channel.loop.allocator.free(entry.key); + entry.cancelled = true; + await entry.value.putter; + self.allocator.free(entry.key); + self.allocator.free(entry.value); } - self.channel.destroy(); + self.channel.deinit(); + self.allocator.destroy(self.channel.buffer_nodes); + self.allocator.destroy(self); + }, + .linux => { + self.os_data.cancelled = true; + await self.os_data.putter_frame; + self.allocator.destroy(self); }, -// .linux => cancel self.os_data.putter, .windows => { while (self.os_data.all_putters.get()) |putter_node| { -// cancel putter_node.data; + putter_node.cancelled = true; + await putter_node.frame; } self.deref(); }, @@ -869,60 +901,68 @@ pub fn Watch(comptime V: type) type { fn deref(self: *Self) void { if (self.os_data.ref_count.decr() == 1) { - const allocator = self.channel.loop.allocator; self.os_data.table_lock.deinit(); var it = self.os_data.dir_table.iterator(); while (it.next()) |entry| { - allocator.free(entry.key); - allocator.destroy(entry.value); + self.allocator.free(entry.key); + self.allocator.destroy(entry.value); } self.os_data.dir_table.deinit(); - self.channel.destroy(); - allocator.destroy(self); + self.channel.deinit(); + self.allocator.destroy(self.channel.buffer_nodes); + self.allocator.destroy(self); } } - pub async fn addFile(self: *Self, file_path: []const u8, value: V) !?V { + pub fn addFile(self: *Self, file_path: []const u8, value: V) !?V { switch (builtin.os) { - .macosx, .freebsd, .netbsd, .dragonfly => return await (async addFileKEvent(self, file_path, value) catch unreachable), - .linux => return await (async addFileLinux(self, file_path, value) catch unreachable), - .windows => return await (async addFileWindows(self, file_path, value) catch unreachable), + .macosx, .freebsd, .netbsd, .dragonfly => return addFileKEvent(self, file_path, value), + .linux => return addFileLinux(self, file_path, value), + .windows => return addFileWindows(self, file_path, value), else => @compileError("Unsupported OS"), } } - async fn addFileKEvent(self: *Self, file_path: []const u8, value: V) !?V { - const resolved_path = try std.fs.path.resolve(self.channel.loop.allocator, [_][]const u8{file_path}); + fn addFileKEvent(self: *Self, file_path: []const u8, value: V) !?V { + const resolved_path = try std.fs.path.resolve(self.allocator, [_][]const u8{file_path}); var resolved_path_consumed = false; - defer if (!resolved_path_consumed) self.channel.loop.allocator.free(resolved_path); + defer if (!resolved_path_consumed) self.allocator.free(resolved_path); - var close_op = try CloseOperation.start(self.channel.loop); + var close_op = try CloseOperation.start(self.allocator); var close_op_consumed = false; defer if (!close_op_consumed) close_op.finish(); const flags = if (comptime std.Target.current.isDarwin()) os.O_SYMLINK | os.O_EVTONLY else 0; const mode = 0; - const fd = try await (async openPosix(self.channel.loop, resolved_path, flags, mode) catch unreachable); + const fd = try openPosix(self.allocator, resolved_path, flags, mode); close_op.setHandle(fd); - var put_data: *OsData.Put = undefined; - const putter = try async self.kqPutEvents(close_op, value, &put_data); + var put = try self.allocator.create(OsData.Put); + errdefer self.allocator.destroy(put); + put.* = OsData.Put{ + .value = value, + .putter_frame = undefined, + }; + put.putter_frame = async self.kqPutEvents(close_op, put); close_op_consumed = true; -// errdefer cancel putter; + errdefer { + put.cancelled = true; + await put.putter_frame; + } const result = blk: { - const held = await (async self.os_data.table_lock.acquire() catch unreachable); + const held = self.os_data.table_lock.acquire(); defer held.release(); const gop = try self.os_data.file_table.getOrPut(resolved_path); if (gop.found_existing) { - const prev_value = gop.kv.value.value_ptr.*; -// cancel gop.kv.value.putter; - gop.kv.value = put_data; + const prev_value = gop.kv.value.value; + await gop.kv.value.putter_frame; + gop.kv.value = put; break :blk prev_value; } else { resolved_path_consumed = true; - gop.kv.value = put_data; + gop.kv.value = put; break :blk null; } }; @@ -930,61 +970,53 @@ pub fn Watch(comptime V: type) type { return result; } - async fn kqPutEvents(self: *Self, close_op: *CloseOperation, value: V, out_put: **OsData.Put) void { - var value_copy = value; - var put = OsData.Put{ - .putter = @frame(), - .value_ptr = &value_copy, - }; - out_put.* = &put; - self.channel.loop.beginOneEvent(); + fn kqPutEvents(self: *Self, close_op: *CloseOperation, put: *OsData.Put) void { + global_event_loop.beginOneEvent(); defer { close_op.finish(); - self.channel.loop.finishOneEvent(); + global_event_loop.finishOneEvent(); } - while (true) { - if (await (async self.channel.loop.bsdWaitKev( + while (!put.cancelled) { + if (global_event_loop.bsdWaitKev( @intCast(usize, close_op.getHandle()), os.EVFILT_VNODE, os.NOTE_WRITE | os.NOTE_DELETE, - ) catch unreachable)) |kev| { + )) |kev| { // TODO handle EV_ERROR if (kev.fflags & os.NOTE_DELETE != 0) { - await (async self.channel.put(Self.Event{ + self.channel.put(Self.Event{ .id = Event.Id.Delete, - .data = value_copy, - }) catch unreachable); + .data = put.value, + }); } else if (kev.fflags & os.NOTE_WRITE != 0) { - await (async self.channel.put(Self.Event{ + self.channel.put(Self.Event{ .id = Event.Id.CloseWrite, - .data = value_copy, - }) catch unreachable); + .data = put.value, + }); } } else |err| switch (err) { error.EventNotFound => unreachable, error.ProcessNotFound => unreachable, error.Overflow => unreachable, error.AccessDenied, error.SystemResources => |casted_err| { - await (async self.channel.put(casted_err) catch unreachable); + self.channel.put(casted_err); }, } } } - async fn addFileLinux(self: *Self, file_path: []const u8, value: V) !?V { - const value_copy = value; - + fn addFileLinux(self: *Self, file_path: []const u8, value: V) !?V { const dirname = std.fs.path.dirname(file_path) orelse "."; - const dirname_with_null = try std.cstr.addNullByte(self.channel.loop.allocator, dirname); + const dirname_with_null = try std.cstr.addNullByte(self.allocator, dirname); var dirname_with_null_consumed = false; - defer if (!dirname_with_null_consumed) self.channel.loop.allocator.free(dirname_with_null); + defer if (!dirname_with_null_consumed) self.channel.free(dirname_with_null); const basename = std.fs.path.basename(file_path); - const basename_with_null = try std.cstr.addNullByte(self.channel.loop.allocator, basename); + const basename_with_null = try std.cstr.addNullByte(self.allocator, basename); var basename_with_null_consumed = false; - defer if (!basename_with_null_consumed) self.channel.loop.allocator.free(basename_with_null); + defer if (!basename_with_null_consumed) self.allocator.free(basename_with_null); const wd = try os.inotify_add_watchC( self.os_data.inotify_fd, @@ -993,14 +1025,14 @@ pub fn Watch(comptime V: type) type { ); // wd is either a newly created watch or an existing one. - const held = await (async self.os_data.table_lock.acquire() catch unreachable); + const held = self.os_data.table_lock.acquire(); defer held.release(); const gop = try self.os_data.wd_table.getOrPut(wd); if (!gop.found_existing) { gop.kv.value = OsData.Dir{ .dirname = dirname_with_null, - .file_table = OsData.FileTable.init(self.channel.loop.allocator), + .file_table = OsData.FileTable.init(self.allocator), }; dirname_with_null_consumed = true; } @@ -1009,31 +1041,29 @@ pub fn Watch(comptime V: type) type { const file_table_gop = try dir.file_table.getOrPut(basename_with_null); if (file_table_gop.found_existing) { const prev_value = file_table_gop.kv.value; - file_table_gop.kv.value = value_copy; + file_table_gop.kv.value = value; return prev_value; } else { - file_table_gop.kv.value = value_copy; + file_table_gop.kv.value = value; basename_with_null_consumed = true; return null; } } - async fn addFileWindows(self: *Self, file_path: []const u8, value: V) !?V { - const value_copy = value; + fn addFileWindows(self: *Self, file_path: []const u8, value: V) !?V { // TODO we might need to convert dirname and basename to canonical file paths ("short"?) - - const dirname = try std.mem.dupe(self.channel.loop.allocator, u8, std.fs.path.dirname(file_path) orelse "."); + const dirname = try std.mem.dupe(self.allocator, u8, std.fs.path.dirname(file_path) orelse "."); var dirname_consumed = false; - defer if (!dirname_consumed) self.channel.loop.allocator.free(dirname); + defer if (!dirname_consumed) self.allocator.free(dirname); - const dirname_utf16le = try std.unicode.utf8ToUtf16LeWithNull(self.channel.loop.allocator, dirname); - defer self.channel.loop.allocator.free(dirname_utf16le); + const dirname_utf16le = try std.unicode.utf8ToUtf16LeWithNull(self.allocator, dirname); + defer self.allocator.free(dirname_utf16le); // TODO https://github.com/ziglang/zig/issues/265 const basename = std.fs.path.basename(file_path); - const basename_utf16le_null = try std.unicode.utf8ToUtf16LeWithNull(self.channel.loop.allocator, basename); + const basename_utf16le_null = try std.unicode.utf8ToUtf16LeWithNull(self.allocator, basename); var basename_utf16le_null_consumed = false; - defer if (!basename_utf16le_null_consumed) self.channel.loop.allocator.free(basename_utf16le_null); + defer if (!basename_utf16le_null_consumed) self.allocator.free(basename_utf16le_null); const basename_utf16le_no_null = basename_utf16le_null[0 .. basename_utf16le_null.len - 1]; const dir_handle = try windows.CreateFileW( @@ -1048,40 +1078,40 @@ pub fn Watch(comptime V: type) type { var dir_handle_consumed = false; defer if (!dir_handle_consumed) windows.CloseHandle(dir_handle); - const held = await (async self.os_data.table_lock.acquire() catch unreachable); + const held = self.os_data.table_lock.acquire(); defer held.release(); const gop = try self.os_data.dir_table.getOrPut(dirname); if (gop.found_existing) { const dir = gop.kv.value; - const held_dir_lock = await (async dir.table_lock.acquire() catch unreachable); + const held_dir_lock = dir.table_lock.acquire(); defer held_dir_lock.release(); const file_gop = try dir.file_table.getOrPut(basename_utf16le_no_null); if (file_gop.found_existing) { const prev_value = file_gop.kv.value; - file_gop.kv.value = value_copy; + file_gop.kv.value = value; return prev_value; } else { - file_gop.kv.value = value_copy; + file_gop.kv.value = value; basename_utf16le_null_consumed = true; return null; } } else { errdefer _ = self.os_data.dir_table.remove(dirname); - const dir = try self.channel.loop.allocator.create(OsData.Dir); - errdefer self.channel.loop.allocator.destroy(dir); + const dir = try self.allocator.create(OsData.Dir); + errdefer self.allocator.destroy(dir); dir.* = OsData.Dir{ - .file_table = OsData.FileTable.init(self.channel.loop.allocator), - .table_lock = event.Lock.init(self.channel.loop), - .putter = undefined, + .file_table = OsData.FileTable.init(self.allocator), + .table_lock = event.Lock.init(), + .putter_frame = undefined, }; gop.kv.value = dir; - assert((try dir.file_table.put(basename_utf16le_no_null, value_copy)) == null); + assert((try dir.file_table.put(basename_utf16le_no_null, value)) == null); basename_utf16le_null_consumed = true; - dir.putter = try async self.windowsDirReader(dir_handle, dir); + dir.putter_frame = async self.windowsDirReader(dir_handle, dir); dir_handle_consumed = true; dirname_consumed = true; @@ -1090,14 +1120,14 @@ pub fn Watch(comptime V: type) type { } } - async fn windowsDirReader(self: *Self, dir_handle: windows.HANDLE, dir: *OsData.Dir) void { + fn windowsDirReader(self: *Self, dir_handle: windows.HANDLE, dir: *OsData.Dir) void { self.ref(); defer self.deref(); defer os.close(dir_handle); var putter_node = std.atomic.Queue(anyframe).Node{ - .data = @frame(), + .data = .{ .putter = @frame() }, .prev = null, .next = null, }; @@ -1122,19 +1152,19 @@ pub fn Watch(comptime V: type) type { // TODO handle this error not in the channel but in the setup _ = windows.CreateIoCompletionPort( dir_handle, - self.channel.loop.os_data.io_port, + global_event_loop.os_data.io_port, undefined, undefined, ) catch |err| { - await (async self.channel.put(err) catch unreachable); + self.channel.put(err); return; }; - while (true) { + while (!putter_node.data.cancelled) { { // TODO only 1 beginOneEvent for the whole function - self.channel.loop.beginOneEvent(); - errdefer self.channel.loop.finishOneEvent(); + global_event_loop.beginOneEvent(); + errdefer global_event_loop.finishOneEvent(); errdefer { _ = windows.kernel32.CancelIoEx(dir_handle, &resume_node.base.overlapped); } @@ -1159,7 +1189,7 @@ pub fn Watch(comptime V: type) type { const err = switch (windows.kernel32.GetLastError()) { else => |err| windows.unexpectedError(err), }; - await (async self.channel.put(err) catch unreachable); + self.channel.put(err); } else { // can't use @bytesToSlice because of the special variable length name field var ptr = event_buf[0..].ptr; @@ -1175,7 +1205,7 @@ pub fn Watch(comptime V: type) type { if (emit) |id| { const basename_utf16le = ([*]u16)(&ev.FileName)[0 .. ev.FileNameLength / 2]; const user_value = blk: { - const held = await (async dir.table_lock.acquire() catch unreachable); + const held = dir.table_lock.acquire(); defer held.release(); if (dir.file_table.get(basename_utf16le)) |entry| { @@ -1185,10 +1215,10 @@ pub fn Watch(comptime V: type) type { } }; if (user_value) |v| { - await (async self.channel.put(Event{ + self.channel.put(Event{ .id = id, .data = v, - }) catch unreachable); + }); } } if (ev.NextEntryOffset == 0) break; @@ -1197,45 +1227,35 @@ pub fn Watch(comptime V: type) type { } } - pub async fn removeFile(self: *Self, file_path: []const u8) ?V { + pub fn removeFile(self: *Self, file_path: []const u8) ?V { @panic("TODO"); } - async fn linuxEventPutter(inotify_fd: i32, channel: *event.Channel(Event.Error!Event), out_watch: **Self) void { - const loop = channel.loop; - - var watch = Self{ - .channel = channel, - .os_data = OsData{ - .putter = @frame(), - .inotify_fd = inotify_fd, - .wd_table = OsData.WdTable.init(loop.allocator), - .table_lock = event.Lock.init(loop), - }, - }; - out_watch.* = &watch; - - loop.beginOneEvent(); + fn linuxEventPutter(self: *Self) void { + global_event_loop.beginOneEvent(); defer { - watch.os_data.table_lock.deinit(); - var wd_it = watch.os_data.wd_table.iterator(); + self.os_data.table_lock.deinit(); + var wd_it = self.os_data.wd_table.iterator(); while (wd_it.next()) |wd_entry| { var file_it = wd_entry.value.file_table.iterator(); while (file_it.next()) |file_entry| { - loop.allocator.free(file_entry.key); + self.allocator.free(file_entry.key); } - loop.allocator.free(wd_entry.value.dirname); + self.allocator.free(wd_entry.value.dirname); + wd_entry.value.file_table.deinit(); } - loop.finishOneEvent(); - os.close(inotify_fd); - channel.destroy(); + self.os_data.wd_table.deinit(); + global_event_loop.finishOneEvent(); + os.close(self.os_data.inotify_fd); + self.channel.deinit(); + self.allocator.free(self.channel.buffer_nodes); } var event_buf: [4096]u8 align(@alignOf(os.linux.inotify_event)) = undefined; - while (true) { - const rc = os.linux.read(inotify_fd, &event_buf, event_buf.len); + while (!self.os_data.cancelled) { + const rc = os.linux.read(self.os_data.inotify_fd, &event_buf, event_buf.len); const errno = os.linux.getErrno(rc); switch (errno) { 0 => { @@ -1247,12 +1267,13 @@ pub fn Watch(comptime V: type) type { ev = @ptrCast(*os.linux.inotify_event, ptr); if (ev.mask & os.linux.IN_CLOSE_WRITE == os.linux.IN_CLOSE_WRITE) { const basename_ptr = ptr + @sizeOf(os.linux.inotify_event); - const basename_with_null = basename_ptr[0 .. std.mem.len(u8, basename_ptr) + 1]; + // `ev.len` counts all bytes in `ev.name` including terminating null byte. + const basename_with_null = basename_ptr[0 .. ev.len]; const user_value = blk: { - const held = await (async watch.os_data.table_lock.acquire() catch unreachable); + const held = self.os_data.table_lock.acquire(); defer held.release(); - const dir = &watch.os_data.wd_table.get(ev.wd).?.value; + const dir = &self.os_data.wd_table.get(ev.wd).?.value; if (dir.file_table.get(basename_with_null)) |entry| { break :blk entry.value; } else { @@ -1260,10 +1281,10 @@ pub fn Watch(comptime V: type) type { } }; if (user_value) |v| { - await (async channel.put(Event{ + self.channel.put(Event{ .id = WatchEventId.CloseWrite, .data = v, - }) catch unreachable); + }); } } } @@ -1272,20 +1293,7 @@ pub fn Watch(comptime V: type) type { os.linux.EINVAL => unreachable, os.linux.EFAULT => unreachable, os.linux.EAGAIN => { - (await (async loop.linuxWaitFd( - inotify_fd, - os.linux.EPOLLET | os.linux.EPOLLIN, - ) catch unreachable)) catch |err| { - const transformed_err = switch (err) { - error.FileDescriptorAlreadyPresentInSet => unreachable, - error.OperationCausesCircularLoop => unreachable, - error.FileDescriptorNotRegistered => unreachable, - error.FileDescriptorIncompatibleWithEpoll => unreachable, - error.Unexpected => unreachable, - else => |e| e, - }; - await (async channel.put(transformed_err) catch unreachable); - }; + global_event_loop.linuxWaitFd(self.os_data.inotify_fd, os.linux.EPOLLET | os.linux.EPOLLIN); }, else => unreachable, } @@ -1296,34 +1304,22 @@ pub fn Watch(comptime V: type) type { const test_tmp_dir = "std_event_fs_test"; -// TODO this test is disabled until the async function rewrite is finished. test "write a file, watch it, write it again" { - return error.SkipZigTest; + // TODO provide a way to run tests in evented I/O mode + if (!std.io.is_async) return error.SkipZigTest; + const allocator = std.heap.direct_allocator; // TODO move this into event loop too try os.makePath(allocator, test_tmp_dir); defer os.deleteTree(test_tmp_dir) catch {}; - var loop: Loop = undefined; - try loop.initMultiThreaded(allocator); - defer loop.deinit(); - - var result: anyerror!void = error.ResultNeverWritten; -// const handle = try async testFsWatchCantFail(&loop, &result); -// defer cancel handle; - - loop.run(); - return result; + return testFsWatch(&allocator); } -fn testFsWatchCantFail(loop: *Loop, result: *(anyerror!void)) void { - result.* = testFsWatch(loop); -} - -fn testFsWatch(loop: *Loop) !void { - const file_path = try std.fs.path.join(loop.allocator, [][]const u8{ test_tmp_dir, "file.txt" }); - defer loop.allocator.free(file_path); +fn testFsWatch(allocator: *Allocator) !void { + const file_path = try std.fs.path.join(allocator, [_][]const u8{ test_tmp_dir, "file.txt" }); + defer allocator.free(file_path); const contents = \\line 1 @@ -1332,27 +1328,27 @@ fn testFsWatch(loop: *Loop) !void { const line2_offset = 7; // first just write then read the file - try writeFile(loop, file_path, contents); + try writeFile(allocator, file_path, contents); - const read_contents = try readFile(loop, file_path, 1024 * 1024); + const read_contents = try readFile(allocator, file_path, 1024 * 1024); testing.expectEqualSlices(u8, contents, read_contents); // now watch the file - var watch = try Watch(void).create(loop, 0); - defer watch.destroy(); + var watch = try Watch(void).init(allocator, 0); + defer watch.deinit(); testing.expect((try watch.addFile(file_path, {})) == null); - const ev = async watch.channel.get(); + const ev = watch.channel.get(); var ev_consumed = false; defer if (!ev_consumed) await ev; // overwrite line 2 - const fd = try await openReadWrite(loop, file_path, File.default_mode); + const fd = try await openReadWrite(file_path, File.default_mode); { defer os.close(fd); - try pwritev(loop, fd, []const []const u8{"lorem ipsum"}, line2_offset); + try pwritev(allocator, fd, []const []const u8{"lorem ipsum"}, line2_offset); } ev_consumed = true; @@ -1360,7 +1356,7 @@ fn testFsWatch(loop: *Loop) !void { WatchEventId.CloseWrite => {}, WatchEventId.Delete => @panic("wrong event"), } - const contents_updated = try readFile(loop, file_path, 1024 * 1024); + const contents_updated = try readFile(allocator, file_path, 1024 * 1024); testing.expectEqualSlices(u8, \\line 1 \\lorem ipsum