From ef72cd6698e808c731320617ea62c8a33a5ecd87 Mon Sep 17 00:00:00 2001 From: Jonathan Marler Date: Tue, 28 Feb 2023 12:28:53 -0700 Subject: [PATCH] std.io.poll initial windows implementation --- lib/std/io.zig | 154 +++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 143 insertions(+), 11 deletions(-) diff --git a/lib/std/io.zig b/lib/std/io.zig index 8563fd50d0..b125265449 100644 --- a/lib/std/io.zig +++ b/lib/std/io.zig @@ -175,6 +175,19 @@ pub fn poll( ) Poller(StreamEnum) { const enum_fields = @typeInfo(StreamEnum).Enum.fields; var result: Poller(StreamEnum) = undefined; + + if (builtin.os.tag == .windows) result.windows = .{ + .first_read_done = false, + .overlapped = [1]os.windows.OVERLAPPED { + mem.zeroes(os.windows.OVERLAPPED), + } ** enum_fields.len, + .active = .{ + .count = 0, + .handles_buf = undefined, + .stream_map = undefined, + }, + }; + inline for (0..enum_fields.len) |i| { result.fifos[i] = .{ .allocator = allocator, @@ -182,26 +195,56 @@ pub fn poll( .head = 0, .count = 0, }; - result.poll_fds[i] = .{ - .fd = @field(files, enum_fields[i].name).handle, - .events = os.POLL.IN, - .revents = undefined, - }; + if (builtin.os.tag == .windows) { + result.windows.active.handles_buf[i] = @field(files, enum_fields[i].name).handle; + } else { + result.poll_fds[i] = .{ + .fd = @field(files, enum_fields[i].name).handle, + .events = os.POLL.IN, + .revents = undefined, + }; + } } return result; } +pub const PollFifo = std.fifo.LinearFifo(u8, .Dynamic); + pub fn Poller(comptime StreamEnum: type) type { return struct { const enum_fields = @typeInfo(StreamEnum).Enum.fields; - const Fifo = std.fifo.LinearFifo(u8, .Dynamic); + const PollFd = if (builtin.os.tag == .windows) void else std.os.pollfd; - fifos: [enum_fields.len]Fifo, - poll_fds: [enum_fields.len]std.os.pollfd, + fifos: [enum_fields.len]PollFifo, + poll_fds: [enum_fields.len]PollFd, + windows: if (builtin.os.tag == .windows) struct { + first_read_done: bool, + overlapped: [enum_fields.len]os.windows.OVERLAPPED, + active: struct { + count: math.IntFittingRange(0, enum_fields.len), + handles_buf: [enum_fields.len]os.windows.HANDLE, + stream_map: [enum_fields.len]StreamEnum, + + pub fn removeAt(self: *@This(), index: u32) void { + std.debug.assert(index < self.count); + for (index + 1 .. self.count) |i| { + self.handles_buf[i - 1] = self.handles_buf[i]; + self.stream_map[i - 1] = self.stream_map[i]; + } + self.count -= 1; + } + }, + } else void, const Self = @This(); pub fn deinit(self: *Self) void { + if (builtin.os.tag == .windows) { + // cancel any pending IO to prevent clobbering OVERLAPPED value + for (self.windows.active.handles_buf[0 .. self.windows.active.count]) |h| { + _ = os.windows.kernel32.CancelIo(h); + } + } inline for (&self.fifos) |*q| q.deinit(); self.* = undefined; } @@ -214,19 +257,89 @@ pub fn Poller(comptime StreamEnum: type) type { } } - pub inline fn fifo(self: *Self, comptime which: StreamEnum) *Fifo { + pub inline fn fifo(self: *Self, comptime which: StreamEnum) *PollFifo { return &self.fifos[@enumToInt(which)]; } pub fn done(self: Self) bool { + if (builtin.os.tag == .windows) + return self.windows.first_read_done and self.windows.active.count == 0; + for (self.poll_fds) |poll_fd| { if (poll_fd.fd != -1) return false; } else return true; } fn pollWindows(self: *Self) !void { - _ = self; - @compileError("TODO"); + const bump_amt = 512; + + if (!self.windows.first_read_done) { + // Windows Async IO requires an initial call to ReadFile before waiting on the handle + for (0..enum_fields.len) |i| { + const handle = self.windows.active.handles_buf[i]; + switch (try windowsAsyncRead( + handle, + &self.windows.overlapped[i], + &self.fifos[i], + bump_amt, + )) { + .pending => { + self.windows.active.handles_buf[self.windows.active.count] = handle; + self.windows.active.stream_map[self.windows.active.count] = @intToEnum(StreamEnum, i); + self.windows.active.count += 1; + }, + .closed => {}, // don't add to the wait_objects list + } + } + self.windows.first_read_done = true; + } + + while (true) { + if (self.windows.active.count == 0) return; + + const status = os.windows.kernel32.WaitForMultipleObjects( + self.windows.active.count, + &self.windows.active.handles_buf, + 0, + os.windows.INFINITE, + ); + if (status == os.windows.WAIT_FAILED) + return os.windows.unexpectedError(os.windows.kernel32.GetLastError()); + + if (status < os.windows.WAIT_OBJECT_0 or status > os.windows.WAIT_OBJECT_0 + enum_fields.len - 1) + unreachable; + + const active_idx = status - os.windows.WAIT_OBJECT_0; + + const handle = self.windows.active.handles_buf[active_idx]; + const stream_idx = @enumToInt(self.windows.active.stream_map[active_idx]); + var read_bytes: u32 = undefined; + if (0 == os.windows.kernel32.GetOverlappedResult( + handle, + &self.windows.overlapped[stream_idx], + &read_bytes, + 0, + )) switch (os.windows.kernel32.GetLastError()) { + .BROKEN_PIPE => { + self.windows.active.removeAt(active_idx); + continue; + }, + else => |err| return os.windows.unexpectedError(err), + }; + + self.fifos[stream_idx].update(read_bytes); + + switch (try windowsAsyncRead( + handle, + &self.windows.overlapped[stream_idx], + &self.fifos[stream_idx], + bump_amt, + )) { + .pending => {}, + .closed => self.windows.active.removeAt(active_idx), + } + return; + } } fn pollPosix(self: *Self) !void { @@ -263,6 +376,25 @@ pub fn Poller(comptime StreamEnum: type) type { }; } +fn windowsAsyncRead( + handle: os.windows.HANDLE, + overlapped: *os.windows.OVERLAPPED, + fifo: *PollFifo, + bump_amt: usize, +) !enum{ pending, closed } { + while (true) { + const buf = try fifo.writableWithSize(bump_amt); + var read_bytes: u32 = undefined; + const read_result = os.windows.kernel32.ReadFile(handle, buf.ptr, math.cast(u32, buf.len) orelse math.maxInt(u32), &read_bytes, overlapped); + if (read_result == 0) return switch (os.windows.kernel32.GetLastError()) { + .IO_PENDING => .pending, + .BROKEN_PIPE => .closed, + else => |err| os.windows.unexpectedError(err), + }; + fifo.update(read_bytes); + } +} + /// Given an enum, returns a struct with fields of that enum, each field /// representing an I/O stream for polling. pub fn PollFiles(comptime StreamEnum: type) type {