const builtin = @import("builtin");
const is_windows = builtin.os.tag == .windows;

const std = @import("std.zig");
const windows = std.os.windows;
const posix = std.posix;
const math = std.math;
const assert = std.debug.assert;
const Allocator = std.mem.Allocator;
const Alignment = std.mem.Alignment;

pub const Reader = @import("io/Reader.zig");
pub const Writer = @import("io/Writer.zig");

pub const BufferedReader = @import("io/BufferedReader.zig");
pub const BufferedWriter = @import("io/BufferedWriter.zig");
pub const AllocatingWriter = @import("io/AllocatingWriter.zig");

pub const ChangeDetectionStream = @import("io/change_detection_stream.zig").ChangeDetectionStream;
pub const changeDetectionStream = @import("io/change_detection_stream.zig").changeDetectionStream;

pub const tty = @import("io/tty.zig");

pub fn poll(
    gpa: Allocator,
    comptime StreamEnum: type,
    files: PollFiles(StreamEnum),
) Poller(StreamEnum) {
    const enum_fields = @typeInfo(StreamEnum).@"enum".fields;
    var result: Poller(StreamEnum) = .{
        .gpa = gpa,
        .readers = @splat(.{
            .unbuffered_reader = .failing,
            .buffer = &.{},
            .end = 0,
            .seek = 0,
        }),
        .poll_fds = undefined,
        .windows = if (is_windows) .{
            .first_read_done = false,
            .overlapped = [1]windows.OVERLAPPED{
                std.mem.zeroes(windows.OVERLAPPED),
            } ** enum_fields.len,
            .small_bufs = undefined,
            .active = .{
                .count = 0,
                .handles_buf = undefined,
                .stream_map = undefined,
            },
        } else {},
    };

    inline for (enum_fields, 0..) |field, i| {
        if (is_windows) {
            result.windows.active.handles_buf[i] = @field(files, field.name).handle;
        } else {
            result.poll_fds[i] = .{
                .fd = @field(files, field.name).handle,
                .events = posix.POLL.IN,
                .revents = undefined,
            };
        }
    }

    return result;
}
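
// Example: a minimal usage sketch, not part of this API. It assumes a
// hypothetical `child` of type `std.process.Child`, spawned with both
// `stdout_behavior` and `stderr_behavior` set to `.Pipe`, and a `gpa`
// allocator in scope.
//
//     var poller = std.io.poll(gpa, enum { stdout, stderr }, .{
//         .stdout = child.stdout.?,
//         .stderr = child.stderr.?,
//     });
//     defer poller.deinit();
//     while (try poller.poll()) {
//         // Any newly read bytes have been appended to the corresponding
//         // buffers; inspect them via poller.reader(.stdout) and
//         // poller.reader(.stderr).
//     }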

pub fn Poller(comptime StreamEnum: type) type {
    return struct {
        const enum_fields = @typeInfo(StreamEnum).@"enum".fields;
        const PollFd = if (is_windows) void else posix.pollfd;

        gpa: Allocator,
        readers: [enum_fields.len]BufferedReader,
        poll_fds: [enum_fields.len]PollFd,
        windows: if (is_windows) struct {
            first_read_done: bool,
            overlapped: [enum_fields.len]windows.OVERLAPPED,
            small_bufs: [enum_fields.len][128]u8,
            active: struct {
                count: math.IntFittingRange(0, enum_fields.len),
                handles_buf: [enum_fields.len]windows.HANDLE,
                stream_map: [enum_fields.len]StreamEnum,

                pub fn removeAt(self: *@This(), index: u32) void {
                    assert(index < self.count);
                    for (index + 1..self.count) |i| {
                        self.handles_buf[i - 1] = self.handles_buf[i];
                        self.stream_map[i - 1] = self.stream_map[i];
                    }
                    self.count -= 1;
                }
            },
        } else void,

        const Self = @This();

        pub fn deinit(self: *Self) void {
            const gpa = self.gpa;
            if (is_windows) {
                // cancel any pending IO to prevent clobbering OVERLAPPED value
                for (self.windows.active.handles_buf[0..self.windows.active.count]) |h| {
                    _ = windows.kernel32.CancelIo(h);
                }
            }
            inline for (&self.readers) |*br| gpa.free(br.buffer);
            self.* = undefined;
        }

        pub fn poll(self: *Self) !bool {
            if (is_windows) {
                return pollWindows(self, null);
            } else {
                return pollPosix(self, null);
            }
        }

        pub fn pollTimeout(self: *Self, nanoseconds: u64) !bool {
            if (is_windows) {
                return pollWindows(self, nanoseconds);
            } else {
                return pollPosix(self, nanoseconds);
            }
        }

        pub inline fn reader(self: *Self, comptime which: StreamEnum) *BufferedReader {
            return &self.readers[@intFromEnum(which)];
        }
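
        // Example: a minimal sketch of a bounded wait (names assumed, not
        // part of this file). Each iteration blocks for at most 50 ms, so
        // the caller can recheck some external condition between waits:
        //
        //     while (try poller.pollTimeout(50 * std.time.ns_per_ms)) {
        //         if (deadlineExpired()) break; // hypothetical helper
        //         // consume buffered bytes via poller.reader(...)
        //     }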

        fn pollWindows(self: *Self, nanoseconds: ?u64) !bool {
            const bump_amt = 512;

            if (!self.windows.first_read_done) {
                var already_read_data = false;
                for (0..enum_fields.len) |i| {
                    const handle = self.windows.active.handles_buf[i];
                    switch (try windowsAsyncReadToFifoAndQueueSmallRead(
                        handle,
                        &self.windows.overlapped[i],
                        &self.readers[i],
                        &self.windows.small_bufs[i],
                        bump_amt,
                    )) {
                        .populated, .empty => |state| {
                            if (state == .populated) already_read_data = true;
                            self.windows.active.handles_buf[self.windows.active.count] = handle;
                            self.windows.active.stream_map[self.windows.active.count] = @as(StreamEnum, @enumFromInt(i));
                            self.windows.active.count += 1;
                        },
                        .closed => {}, // don't add to the wait_objects list
                        .closed_populated => {
                            // don't add to the wait_objects list, but we did already get data
                            already_read_data = true;
                        },
                    }
                }
                self.windows.first_read_done = true;
                if (already_read_data) return true;
            }

            while (true) {
                if (self.windows.active.count == 0) return false;

                const status = windows.kernel32.WaitForMultipleObjects(
                    self.windows.active.count,
                    &self.windows.active.handles_buf,
                    0,
                    if (nanoseconds) |ns|
                        @min(std.math.cast(u32, ns / std.time.ns_per_ms) orelse (windows.INFINITE - 1), windows.INFINITE - 1)
                    else
                        windows.INFINITE,
                );
                if (status == windows.WAIT_FAILED)
                    return windows.unexpectedError(windows.GetLastError());
                if (status == windows.WAIT_TIMEOUT)
                    return true;

                if (status < windows.WAIT_OBJECT_0 or status > windows.WAIT_OBJECT_0 + enum_fields.len - 1)
                    unreachable;

                const active_idx = status - windows.WAIT_OBJECT_0;

                const stream_idx = @intFromEnum(self.windows.active.stream_map[active_idx]);
                const handle = self.windows.active.handles_buf[active_idx];

                const overlapped = &self.windows.overlapped[stream_idx];
                const stream_br = &self.readers[stream_idx];
                const small_buf = &self.windows.small_bufs[stream_idx];

                const num_bytes_read = switch (try windowsGetReadResult(handle, overlapped, false)) {
                    .success => |n| n,
                    .closed => {
                        self.windows.active.removeAt(active_idx);
                        continue;
                    },
                    .aborted => unreachable,
                };
                try stream_br.write(small_buf[0..num_bytes_read]);

                switch (try windowsAsyncReadToFifoAndQueueSmallRead(
                    handle,
                    overlapped,
                    stream_br,
                    small_buf,
                    bump_amt,
                )) {
                    .empty => {}, // irrelevant, we already got data from the small buffer
                    .populated => {},
                    .closed,
                    .closed_populated, // identical, since we already got data from the small buffer
                    => self.windows.active.removeAt(active_idx),
                }
                return true;
            }
        }

        fn pollPosix(self: *Self, nanoseconds: ?u64) !bool {
            const gpa = self.gpa;
            // We ask for this much extra capacity when growing the buffer.
            // This has more of an effect on small reads because once the
            // reads start to get larger, the buffer capacity grows
            // exponentially anyway.
            const bump_amt = 512;

            const err_mask = posix.POLL.ERR | posix.POLL.NVAL | posix.POLL.HUP;

            const events_len = try posix.poll(&self.poll_fds, if (nanoseconds) |ns|
                std.math.cast(i32, ns / std.time.ns_per_ms) orelse std.math.maxInt(i32)
            else
                -1);
            if (events_len == 0) {
                for (self.poll_fds) |poll_fd| {
                    if (poll_fd.fd != -1) return true;
                } else return false;
            }

            var keep_polling = false;
            inline for (&self.poll_fds, &self.readers) |*poll_fd, *br| {
                // Try reading whatever is available before checking the error
                // conditions.
                // It's still possible to read after a POLL.HUP is received,
                // always check if there's some data waiting to be read first.
                if (poll_fd.revents & posix.POLL.IN != 0) {
                    const buf = try br.writableSliceGreedyAlloc(gpa, bump_amt);
                    const amt = posix.read(poll_fd.fd, buf) catch |err| switch (err) {
                        error.BrokenPipe => 0, // Handle the same as EOF.
                        else => |e| return e,
                    };
                    br.advanceBufferEnd(amt);
                    if (amt == 0) {
                        // Remove the fd when the EOF condition is met.
                        poll_fd.fd = -1;
                    } else {
                        keep_polling = true;
                    }
                } else if (poll_fd.revents & err_mask != 0) {
                    // Exclude the fds that signaled an error.
                    poll_fd.fd = -1;
                } else if (poll_fd.fd != -1) {
                    keep_polling = true;
                }
            }
            return keep_polling;
        }
    };
}

/// The `ReadFile` documentation states that `lpNumberOfBytesRead` does not have a meaningful
/// result when using overlapped I/O, but also that it cannot be `null` on Windows 7. For
/// compatibility, we point it to this dummy variable, which we never otherwise access.
/// See: https://learn.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-readfile
var win_dummy_bytes_read: u32 = undefined;

/// Read as much data as possible from `handle` with `overlapped`, and write it to the FIFO. Before
/// returning, queue a read into `small_buf` so that `WaitForMultipleObjects` returns when more data
/// is available. `handle` must have no pending asynchronous operation.
fn windowsAsyncReadToFifoAndQueueSmallRead(
    handle: windows.HANDLE,
    overlapped: *windows.OVERLAPPED,
    br: *BufferedReader,
    small_buf: *[128]u8,
    bump_amt: usize,
) !enum { empty, populated, closed_populated, closed } {
    var read_any_data = false;
    while (true) {
        const fifo_read_pending = while (true) {
            const buf = try br.writableWithSize(bump_amt);
            const buf_len = math.cast(u32, buf.len) orelse math.maxInt(u32);

            if (0 == windows.kernel32.ReadFile(
                handle,
                buf.ptr,
                buf_len,
                &win_dummy_bytes_read,
                overlapped,
            )) switch (windows.GetLastError()) {
                .IO_PENDING => break true,
                .BROKEN_PIPE => return if (read_any_data) .closed_populated else .closed,
                else => |err| return windows.unexpectedError(err),
            };

            const num_bytes_read = switch (try windowsGetReadResult(handle, overlapped, false)) {
                .success => |n| n,
                .closed => return if (read_any_data) .closed_populated else .closed,
                .aborted => unreachable,
            };

            read_any_data = true;
            br.update(num_bytes_read);

            if (num_bytes_read == buf_len) {
                // We filled the buffer, so there's probably more data available.
                continue;
            } else {
                // We didn't fill the buffer, so assume we're out of data.
                // There is no pending read.
                break false;
            }
        };

        if (fifo_read_pending) cancel_read: {
            // Cancel the pending read into the FIFO.
            _ = windows.kernel32.CancelIo(handle);

            // We have to wait for the handle to be signalled, i.e. for the cancellation to complete.
            switch (windows.kernel32.WaitForSingleObject(handle, windows.INFINITE)) {
                windows.WAIT_OBJECT_0 => {},
                windows.WAIT_FAILED => return windows.unexpectedError(windows.GetLastError()),
                else => unreachable,
            }

            // If it completed before we canceled, make sure to tell the FIFO!
            const num_bytes_read = switch (try windowsGetReadResult(handle, overlapped, true)) {
                .success => |n| n,
                .closed => return if (read_any_data) .closed_populated else .closed,
                .aborted => break :cancel_read,
            };
            read_any_data = true;
            br.update(num_bytes_read);
        }

        // Try to queue the small read.
        if (0 == windows.kernel32.ReadFile(
            handle,
            small_buf,
            small_buf.len,
            &win_dummy_bytes_read,
            overlapped,
        )) switch (windows.GetLastError()) {
            .IO_PENDING => {
                // Small read pending as intended.
                return if (read_any_data) .populated else .empty;
            },
            .BROKEN_PIPE => return if (read_any_data) .closed_populated else .closed,
            else => |err| return windows.unexpectedError(err),
        };

        // We got data back this time. Write it to the FIFO and run the main loop again.
        const num_bytes_read = switch (try windowsGetReadResult(handle, overlapped, false)) {
            .success => |n| n,
            .closed => return if (read_any_data) .closed_populated else .closed,
            .aborted => unreachable,
        };
        try br.write(small_buf[0..num_bytes_read]);
        read_any_data = true;
    }
}

/// Simple wrapper around `GetOverlappedResult` to determine the result of a `ReadFile` operation.
/// If `!allow_aborted`, then `aborted` is never returned (`OPERATION_ABORTED` is considered unexpected).
///
/// The `ReadFile` documentation states that the number of bytes read by an overlapped `ReadFile`
/// must be determined using `GetOverlappedResult`, even if the operation immediately returns data:
/// "Use NULL for [lpNumberOfBytesRead] if this is an asynchronous operation to avoid potentially
/// erroneous results."
/// "If `hFile` was opened with `FILE_FLAG_OVERLAPPED`, the following conditions are in effect: [...]
/// The lpNumberOfBytesRead parameter should be set to NULL. Use the GetOverlappedResult function to
/// get the actual number of bytes read."
/// See: https://learn.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-readfile
fn windowsGetReadResult(
    handle: windows.HANDLE,
    overlapped: *windows.OVERLAPPED,
    allow_aborted: bool,
) !union(enum) {
    success: u32,
    closed,
    aborted,
} {
    var num_bytes_read: u32 = undefined;
    if (0 == windows.kernel32.GetOverlappedResult(
        handle,
        overlapped,
        &num_bytes_read,
        0,
    )) switch (windows.GetLastError()) {
        .BROKEN_PIPE => return .closed,
        .OPERATION_ABORTED => |err| if (allow_aborted) {
            return .aborted;
        } else {
            return windows.unexpectedError(err);
        },
        else => |err| return windows.unexpectedError(err),
    };
    return .{ .success = num_bytes_read };
}

/// Given an enum, returns a struct with fields of that enum, each field
/// representing an I/O stream for polling.
pub fn PollFiles(comptime StreamEnum: type) type {
    const enum_fields = @typeInfo(StreamEnum).@"enum".fields;
    var struct_fields: [enum_fields.len]std.builtin.Type.StructField = undefined;
    for (&struct_fields, enum_fields) |*struct_field, enum_field| {
        struct_field.* = .{
            .name = enum_field.name,
            .type = std.fs.File,
            .default_value_ptr = null,
            .is_comptime = false,
            .alignment = @alignOf(std.fs.File),
        };
    }
    return @Type(.{ .@"struct" = .{
        .layout = .auto,
        .fields = &struct_fields,
        .decls = &.{},
        .is_tuple = false,
    } });
}
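
// For illustration (a sketch, not generated code): `PollFiles(enum { stdout, stderr })`
// produces a struct equivalent to:
//
//     struct {
//         stdout: std.fs.File,
//         stderr: std.fs.File,
//     }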

test {
    _ = AllocatingWriter;
    _ = BufferedReader;
    _ = BufferedWriter;
    _ = Reader;
    _ = Writer;
    _ = @import("io/test.zig");
}