std.event.fs.pwritev windows implementation

also fix 2 bugs where the function didn't call allocator.shrink:
 * std.mem.join
 * std.os.path.resolve
This commit is contained in:
Andrew Kelley 2018-08-08 15:05:57 -04:00
parent ac12f0df71
commit 8b456927be
7 changed files with 284 additions and 63 deletions

View File

@ -5,6 +5,8 @@ const assert = std.debug.assert;
const os = std.os;
const mem = std.mem;
const posix = os.posix;
const windows = os.windows;
const Loop = event.Loop;
pub const RequestNode = std.atomic.Queue(Request).Node;
@ -13,7 +15,7 @@ pub const Request = struct {
finish: Finish,
pub const Finish = union(enum) {
TickNode: event.Loop.NextTickNode,
TickNode: Loop.NextTickNode,
DeallocCloseOperation: *CloseOperation,
NoAction,
};
@ -71,7 +73,77 @@ pub const Request = struct {
};
/// data - just the inner references - must live until pwritev promise completes.
pub async fn pwritev(loop: *event.Loop, fd: os.FileHandle, data: []const []const u8, offset: usize) !void {
pub async fn pwritev(loop: *Loop, fd: os.FileHandle, data: []const []const u8, offset: usize) !void {
switch (builtin.os) {
builtin.Os.macosx,
builtin.Os.linux,
=> return await (async pwritevPosix(loop, fd, data, offset) catch unreachable),
builtin.Os.windows,
=> return await (async pwritevWindows(loop, fd, data, offset) catch unreachable),
else => @compileError("Unsupported OS"),
}
}
/// data - just the inner references - must live until pwritev promise completes.
pub async fn pwritevWindows(loop: *Loop, fd: os.FileHandle, data: []const []const u8, offset: usize) !void {
if (data.len == 0) return;
if (data.len == 1) return await (async pwriteWindows(loop, fd, data[0], offset) catch unreachable);
const data_copy = std.mem.dupe(loop.allocator, []const u8, data);
defer loop.allocator.free(data_copy);
var off = offset;
for (data_copy) |buf| {
try await (async pwriteWindows(loop, fd, buf, off) catch unreachable);
off += buf.len;
}
}
pub async fn pwriteWindows(loop: *Loop, fd: os.FileHandle, data: []const u8, offset: u64) os.WindowsWriteError!void {
// workaround for https://github.com/ziglang/zig/issues/1194
suspend {
resume @handle();
}
var resume_node = Loop.ResumeNode.Basic{
.base = Loop.ResumeNode{
.id = Loop.ResumeNode.Id.Basic,
.handle = @handle(),
},
};
const completion_key = @ptrToInt(&resume_node.base);
_ = try os.windowsCreateIoCompletionPort(fd, loop.os_data.io_port, completion_key, undefined);
var overlapped = windows.OVERLAPPED{
.Internal = 0,
.InternalHigh = 0,
.Offset = @truncate(u32, offset),
.OffsetHigh = @truncate(u32, offset >> 32),
.hEvent = null,
};
errdefer {
_ = windows.CancelIoEx(fd, &overlapped);
}
suspend {
_ = windows.WriteFile(fd, data.ptr, @intCast(windows.DWORD, data.len), null, &overlapped);
}
var bytes_transferred: windows.DWORD = undefined;
if (windows.GetOverlappedResult(fd, &overlapped, &bytes_transferred, windows.FALSE) == 0) {
const err = windows.GetLastError();
return switch (err) {
windows.ERROR.IO_PENDING => unreachable,
windows.ERROR.INVALID_USER_BUFFER => error.SystemResources,
windows.ERROR.NOT_ENOUGH_MEMORY => error.SystemResources,
windows.ERROR.OPERATION_ABORTED => error.OperationAborted,
windows.ERROR.NOT_ENOUGH_QUOTA => error.SystemResources,
windows.ERROR.BROKEN_PIPE => error.BrokenPipe,
else => os.unexpectedErrorWindows(err),
};
}
}
/// data - just the inner references - must live until pwritev promise completes.
pub async fn pwritevPosix(loop: *Loop, fd: os.FileHandle, data: []const []const u8, offset: usize) !void {
// workaround for https://github.com/ziglang/zig/issues/1194
suspend {
resume @handle();
@ -100,7 +172,7 @@ pub async fn pwritev(loop: *event.Loop, fd: os.FileHandle, data: []const []const
},
},
.finish = Request.Finish{
.TickNode = event.Loop.NextTickNode{
.TickNode = Loop.NextTickNode{
.prev = null,
.next = null,
.data = @handle(),
@ -118,8 +190,8 @@ pub async fn pwritev(loop: *event.Loop, fd: os.FileHandle, data: []const []const
return req_node.data.msg.PWriteV.result;
}
/// data - just the inner references - must live until pwritev promise completes.
pub async fn preadv(loop: *event.Loop, fd: os.FileHandle, data: []const []u8, offset: usize) !usize {
/// data - just the inner references - must live until preadv promise completes.
pub async fn preadv(loop: *Loop, fd: os.FileHandle, data: []const []u8, offset: usize) !usize {
//const data_dupe = try mem.dupe(loop.allocator, []const u8, data);
//defer loop.allocator.free(data_dupe);
@ -151,7 +223,7 @@ pub async fn preadv(loop: *event.Loop, fd: os.FileHandle, data: []const []u8, of
},
},
.finish = Request.Finish{
.TickNode = event.Loop.NextTickNode{
.TickNode = Loop.NextTickNode{
.prev = null,
.next = null,
.data = @handle(),
@ -169,8 +241,8 @@ pub async fn preadv(loop: *event.Loop, fd: os.FileHandle, data: []const []u8, of
return req_node.data.msg.PReadV.result;
}
pub async fn open(
loop: *event.Loop,
pub async fn openPosix(
loop: *Loop,
path: []const u8,
flags: u32,
mode: os.File.Mode,
@ -196,7 +268,7 @@ pub async fn open(
},
},
.finish = Request.Finish{
.TickNode = event.Loop.NextTickNode{
.TickNode = Loop.NextTickNode{
.prev = null,
.next = null,
.data = @handle(),
@ -214,19 +286,47 @@ pub async fn open(
return req_node.data.msg.Open.result;
}
pub async fn openRead(loop: *event.Loop, path: []const u8) os.File.OpenError!os.FileHandle {
pub async fn openRead(loop: *Loop, path: []const u8) os.File.OpenError!os.FileHandle {
const flags = posix.O_LARGEFILE | posix.O_RDONLY | posix.O_CLOEXEC;
return await (async open(loop, path, flags, 0) catch unreachable);
return await (async openPosix(loop, path, flags, os.File.default_mode) catch unreachable);
}
/// Creates if does not exist. Truncates the file if it exists.
/// Uses the default mode.
pub async fn openWrite(loop: *Loop, path: []const u8) os.File.OpenError!os.FileHandle {
return await (async openWriteMode(loop, path, os.File.default_mode) catch unreachable);
}
/// Creates if does not exist. Truncates the file if it exists.
pub async fn openWriteMode(loop: *Loop, path: []const u8, mode: os.File.Mode) os.File.OpenError!os.FileHandle {
switch (builtin.os) {
builtin.Os.macosx,
builtin.Os.linux,
=> {
const flags = posix.O_LARGEFILE | posix.O_WRONLY | posix.O_CREAT | posix.O_CLOEXEC | posix.O_TRUNC;
return await (async openPosix(loop, path, flags, os.File.default_mode) catch unreachable);
},
builtin.Os.windows,
=> return os.windowsOpen(
loop.allocator,
path,
windows.GENERIC_WRITE,
windows.FILE_SHARE_WRITE | windows.FILE_SHARE_READ | windows.FILE_SHARE_DELETE,
windows.CREATE_ALWAYS,
windows.FILE_ATTRIBUTE_NORMAL | windows.FILE_FLAG_OVERLAPPED,
),
else => @compileError("Unsupported OS"),
}
}
/// Creates if does not exist. Does not truncate.
pub async fn openReadWrite(
loop: *event.Loop,
loop: *Loop,
path: []const u8,
mode: os.File.Mode,
) os.File.OpenError!os.FileHandle {
const flags = posix.O_LARGEFILE | posix.O_RDWR | posix.O_CREAT | posix.O_CLOEXEC;
return await (async open(loop, path, flags, mode) catch unreachable);
return await (async openPosix(loop, path, flags, mode) catch unreachable);
}
/// This abstraction helps to close file handles in defer expressions
@ -236,24 +336,46 @@ pub async fn openReadWrite(
/// If you call `setHandle` then finishing will close the fd; otherwise finishing
/// will deallocate the `CloseOperation`.
pub const CloseOperation = struct {
loop: *event.Loop,
have_fd: bool,
close_req_node: RequestNode,
loop: *Loop,
os_data: OsData,
pub fn start(loop: *event.Loop) (error{OutOfMemory}!*CloseOperation) {
const OsData = switch (builtin.os) {
builtin.Os.linux,
builtin.Os.macosx,
=> struct {
have_fd: bool,
close_req_node: RequestNode,
},
builtin.Os.windows,
=> struct {
handle: ?os.FileHandle,
},
else => @compileError("Unsupported OS"),
};
pub fn start(loop: *Loop) (error{OutOfMemory}!*CloseOperation) {
const self = try loop.allocator.createOne(CloseOperation);
self.* = CloseOperation{
.loop = loop,
.have_fd = false,
.close_req_node = RequestNode{
.prev = null,
.next = null,
.data = Request{
.msg = Request.Msg{
.Close = Request.Msg.Close{ .fd = undefined },
.os_data = switch (builtin.os) {
builtin.Os.linux,
builtin.Os.macosx,
=> OsData{
.have_fd = false,
.close_req_node = RequestNode{
.prev = null,
.next = null,
.data = Request{
.msg = Request.Msg{
.Close = Request.Msg.Close{ .fd = undefined },
},
.finish = Request.Finish{ .DeallocCloseOperation = self },
},
},
.finish = Request.Finish{ .DeallocCloseOperation = self },
},
builtin.Os.windows,
=> OsData{ .handle = null },
else => @compileError("Unsupported OS"),
},
};
return self;
@ -261,36 +383,109 @@ pub const CloseOperation = struct {
/// Defer this after creating.
pub fn finish(self: *CloseOperation) void {
if (self.have_fd) {
self.loop.posixFsRequest(&self.close_req_node);
} else {
self.loop.allocator.destroy(self);
switch (builtin.os) {
builtin.Os.linux,
builtin.Os.macosx,
=> {
if (self.have_fd) {
self.loop.posixFsRequest(&self.close_req_node);
} else {
self.loop.allocator.destroy(self);
}
},
builtin.Os.windows,
=> {
if (self.handle) |handle| {
os.close(handle);
}
self.loop.allocator.destroy(self);
},
else => @compileError("Unsupported OS"),
}
}
pub fn setHandle(self: *CloseOperation, handle: os.FileHandle) void {
self.close_req_node.data.msg.Close.fd = handle;
self.have_fd = true;
switch (builtin.os) {
builtin.Os.linux,
builtin.Os.macosx,
=> {
self.close_req_node.data.msg.Close.fd = handle;
self.have_fd = true;
},
builtin.Os.windows,
=> {
self.handle = handle;
},
else => @compileError("Unsupported OS"),
}
}
/// Undo a `setHandle`.
pub fn clearHandle(self: *CloseOperation) void {
self.have_fd = false;
switch (builtin.os) {
builtin.Os.linux,
builtin.Os.macosx,
=> {
self.have_fd = false;
},
builtin.Os.windows,
=> {
self.handle = null;
},
else => @compileError("Unsupported OS"),
}
}
pub fn getHandle(self: *CloseOperation) os.FileHandle {
assert(self.have_fd);
return self.close_req_node.data.msg.Close.fd;
switch (builtin.os) {
builtin.Os.linux,
builtin.Os.macosx,
=> {
assert(self.have_fd);
return self.close_req_node.data.msg.Close.fd;
},
builtin.Os.windows,
=> {
return self.handle.?;
},
else => @compileError("Unsupported OS"),
}
}
};
/// contents must remain alive until writeFile completes.
pub async fn writeFile(loop: *event.Loop, path: []const u8, contents: []const u8) !void {
/// TODO make this atomic or provide writeFileAtomic and rename this one to writeFileTruncate
pub async fn writeFile(loop: *Loop, path: []const u8, contents: []const u8) !void {
return await (async writeFileMode(loop, path, contents, os.File.default_mode) catch unreachable);
}
/// contents must remain alive until writeFile completes.
pub async fn writeFileMode(loop: *event.Loop, path: []const u8, contents: []const u8, mode: os.File.Mode) !void {
pub async fn writeFileMode(loop: *Loop, path: []const u8, contents: []const u8, mode: os.File.Mode) !void {
switch (builtin.os) {
builtin.Os.linux,
builtin.Os.macosx,
=> return await (async writeFileModeThread(loop, path, contents, mode) catch unreachable),
builtin.Os.windows,
=> return await (async writeFileWindows(loop, path, contents) catch unreachable),
else => @compileError("Unsupported OS"),
}
}
async fn writeFileWindows(loop: *Loop, path: []const u8, contents: []const u8) !void {
const handle = try os.windowsOpen(
loop.allocator,
path,
windows.GENERIC_WRITE,
windows.FILE_SHARE_WRITE | windows.FILE_SHARE_READ | windows.FILE_SHARE_DELETE,
windows.CREATE_ALWAYS,
windows.FILE_ATTRIBUTE_NORMAL | windows.FILE_FLAG_OVERLAPPED,
);
defer os.close(handle);
try await (async pwriteWindows(loop, handle, contents, 0) catch unreachable);
}
async fn writeFileModeThread(loop: *Loop, path: []const u8, contents: []const u8, mode: os.File.Mode) !void {
// workaround for https://github.com/ziglang/zig/issues/1194
suspend {
resume @handle();
@ -312,7 +507,7 @@ pub async fn writeFileMode(loop: *event.Loop, path: []const u8, contents: []cons
},
},
.finish = Request.Finish{
.TickNode = event.Loop.NextTickNode{
.TickNode = Loop.NextTickNode{
.prev = null,
.next = null,
.data = @handle(),
@ -333,7 +528,7 @@ pub async fn writeFileMode(loop: *event.Loop, path: []const u8, contents: []cons
/// The promise resumes when the last data has been confirmed written, but before the file handle
/// is closed.
/// Caller owns returned memory.
pub async fn readFile(loop: *event.Loop, file_path: []const u8, max_size: usize) ![]u8 {
pub async fn readFile(loop: *Loop, file_path: []const u8, max_size: usize) ![]u8 {
var close_op = try CloseOperation.start(loop);
defer close_op.finish();
@ -417,7 +612,7 @@ pub fn Watch(comptime V: type) type {
pub const Error = WatchEventError;
};
pub fn create(loop: *event.Loop, event_buf_count: usize) !*Self {
pub fn create(loop: *Loop, event_buf_count: usize) !*Self {
const channel = try event.Channel(Self.Event.Error!Self.Event).create(loop, event_buf_count);
errdefer channel.destroy();
@ -482,7 +677,7 @@ pub fn Watch(comptime V: type) type {
const flags = posix.O_SYMLINK | posix.O_EVTONLY;
const mode = 0;
const fd = try await (async open(self.channel.loop, resolved_path, flags, mode) catch unreachable);
const fd = try await (async openPosix(self.channel.loop, resolved_path, flags, mode) catch unreachable);
close_op.setHandle(fd);
var put_data: *OsData.Put = undefined;
@ -722,7 +917,7 @@ test "write a file, watch it, write it again" {
try os.makePath(allocator, test_tmp_dir);
defer os.deleteTree(allocator, test_tmp_dir) catch {};
var loop: event.Loop = undefined;
var loop: Loop = undefined;
try loop.initMultiThreaded(allocator);
defer loop.deinit();
@ -734,11 +929,11 @@ test "write a file, watch it, write it again" {
return result;
}
async fn testFsWatchCantFail(loop: *event.Loop, result: *(error!void)) void {
async fn testFsWatchCantFail(loop: *Loop, result: *(error!void)) void {
result.* = await async testFsWatch(loop) catch unreachable;
}
async fn testFsWatch(loop: *event.Loop) !void {
async fn testFsWatch(loop: *Loop) !void {
const file_path = try os.path.join(loop.allocator, test_tmp_dir, "file.txt");
defer loop.allocator.free(file_path);

View File

@ -301,7 +301,7 @@ pub const Loop = struct {
windows.INVALID_HANDLE_VALUE,
null,
undefined,
undefined,
@maxValue(windows.DWORD),
);
errdefer os.close(self.os_data.io_port);
@ -315,7 +315,6 @@ pub const Loop = struct {
// this one is for sending events
.completion_key = @ptrToInt(&eventfd_node.data.base),
},
.prev = undefined,
.next = undefined,
};
self.available_eventfd_resume_nodes.push(eventfd_node);
@ -528,7 +527,12 @@ pub const Loop = struct {
self.workerRun();
self.os_data.fs_thread.wait();
switch (builtin.os) {
builtin.Os.linux,
builtin.Os.macosx,
=> self.os_data.fs_thread.wait(),
else => {},
}
for (self.extra_threads) |extra_thread| {
extra_thread.wait();
@ -794,15 +798,7 @@ pub const Loop = struct {
}
const OsData = switch (builtin.os) {
builtin.Os.linux => struct {
epollfd: i32,
final_eventfd: i32,
final_eventfd_event: os.linux.epoll_event,
fs_thread: *os.Thread,
fs_queue_item: u8,
fs_queue: std.atomic.Queue(fs.Request),
fs_end_request: fs.RequestNode,
},
builtin.Os.linux => LinuxOsData,
builtin.Os.macosx => MacOsData,
builtin.Os.windows => struct {
io_port: windows.HANDLE,
@ -821,6 +817,16 @@ pub const Loop = struct {
fs_queue: std.atomic.Queue(fs.Request),
fs_end_request: fs.RequestNode,
};
const LinuxOsData = struct {
epollfd: i32,
final_eventfd: i32,
final_eventfd_event: os.linux.epoll_event,
fs_thread: *os.Thread,
fs_queue_item: u8,
fs_queue: std.atomic.Queue(fs.Request),
fs_end_request: fs.RequestNode,
};
};
test "std.event.Loop - basic" {

View File

@ -541,7 +541,7 @@ pub fn join(allocator: *Allocator, sep: u8, strings: ...) ![]u8 {
}
}
return buf[0..buf_index];
return allocator.shrink(u8, buf, buf_index);
}
test "mem.join" {

View File

@ -506,7 +506,7 @@ pub fn resolveWindows(allocator: *Allocator, paths: []const []const u8) ![]u8 {
result_index += 1;
}
return result[0..result_index];
return allocator.shrink(u8, result, result_index);
}
/// This function is like a series of `cd` statements executed one after another.

View File

@ -67,8 +67,9 @@ pub const INVALID_FILE_ATTRIBUTES = DWORD(@maxValue(DWORD));
pub const OVERLAPPED = extern struct {
Internal: ULONG_PTR,
InternalHigh: ULONG_PTR,
Pointer: PVOID,
hEvent: HANDLE,
Offset: DWORD,
OffsetHigh: DWORD,
hEvent: ?HANDLE,
};
pub const LPOVERLAPPED = *OVERLAPPED;
@ -350,3 +351,15 @@ pub const E_ACCESSDENIED = @bitCast(c_long, c_ulong(0x80070005));
pub const E_HANDLE = @bitCast(c_long, c_ulong(0x80070006));
pub const E_OUTOFMEMORY = @bitCast(c_long, c_ulong(0x8007000E));
pub const E_INVALIDARG = @bitCast(c_long, c_ulong(0x80070057));
pub const FILE_FLAG_BACKUP_SEMANTICS = 0x02000000;
pub const FILE_FLAG_DELETE_ON_CLOSE = 0x04000000;
pub const FILE_FLAG_NO_BUFFERING = 0x20000000;
pub const FILE_FLAG_OPEN_NO_RECALL = 0x00100000;
pub const FILE_FLAG_OPEN_REPARSE_POINT = 0x00200000;
pub const FILE_FLAG_OVERLAPPED = 0x40000000;
pub const FILE_FLAG_POSIX_SEMANTICS = 0x0100000;
pub const FILE_FLAG_RANDOM_ACCESS = 0x10000000;
pub const FILE_FLAG_SESSION_AWARE = 0x00800000;
pub const FILE_FLAG_SEQUENTIAL_SCAN = 0x08000000;
pub const FILE_FLAG_WRITE_THROUGH = 0x80000000;

View File

@ -1,5 +1,8 @@
use @import("index.zig");
pub extern "kernel32" stdcallcc fn CancelIoEx(hFile: HANDLE, lpOverlapped: LPOVERLAPPED) BOOL;
pub extern "kernel32" stdcallcc fn CloseHandle(hObject: HANDLE) BOOL;
pub extern "kernel32" stdcallcc fn CreateDirectoryA(
@ -91,6 +94,9 @@ pub extern "kernel32" stdcallcc fn GetFinalPathNameByHandleA(
dwFlags: DWORD,
) DWORD;
pub extern "kernel32" stdcallcc fn GetOverlappedResult(hFile: HANDLE, lpOverlapped: *OVERLAPPED, lpNumberOfBytesTransferred: *DWORD, bWait: BOOL) BOOL;
pub extern "kernel32" stdcallcc fn GetProcessHeap() ?HANDLE;
pub extern "kernel32" stdcallcc fn GetQueuedCompletionStatus(CompletionPort: HANDLE, lpNumberOfBytesTransferred: LPDWORD, lpCompletionKey: *ULONG_PTR, lpOverlapped: *?*OVERLAPPED, dwMilliseconds: DWORD) BOOL;
@ -150,12 +156,14 @@ pub extern "kernel32" stdcallcc fn WaitForSingleObject(hHandle: HANDLE, dwMillis
pub extern "kernel32" stdcallcc fn WriteFile(
in_hFile: HANDLE,
in_lpBuffer: *const c_void,
in_lpBuffer: [*]const u8,
in_nNumberOfBytesToWrite: DWORD,
out_lpNumberOfBytesWritten: ?*DWORD,
in_out_lpOverlapped: ?*OVERLAPPED,
) BOOL;
pub extern "kernel32" stdcallcc fn WriteFileEx(hFile: HANDLE, lpBuffer: [*]const u8, nNumberOfBytesToWrite: DWORD, lpOverlapped: LPOVERLAPPED, lpCompletionRoutine: LPOVERLAPPED_COMPLETION_ROUTINE) BOOL;
//TODO: call unicode versions instead of relying on ANSI code page
pub extern "kernel32" stdcallcc fn LoadLibraryA(lpLibFileName: LPCSTR) ?HMODULE;

View File

@ -36,20 +36,19 @@ pub fn windowsClose(handle: windows.HANDLE) void {
pub const WriteError = error{
SystemResources,
OperationAborted,
IoPending,
BrokenPipe,
Unexpected,
};
pub fn windowsWrite(handle: windows.HANDLE, bytes: []const u8) WriteError!void {
if (windows.WriteFile(handle, @ptrCast(*const c_void, bytes.ptr), @intCast(u32, bytes.len), null, null) == 0) {
if (windows.WriteFile(handle, bytes.ptr, @intCast(u32, bytes.len), null, null) == 0) {
const err = windows.GetLastError();
return switch (err) {
windows.ERROR.INVALID_USER_BUFFER => WriteError.SystemResources,
windows.ERROR.NOT_ENOUGH_MEMORY => WriteError.SystemResources,
windows.ERROR.OPERATION_ABORTED => WriteError.OperationAborted,
windows.ERROR.NOT_ENOUGH_QUOTA => WriteError.SystemResources,
windows.ERROR.IO_PENDING => WriteError.IoPending,
windows.ERROR.IO_PENDING => unreachable,
windows.ERROR.BROKEN_PIPE => WriteError.BrokenPipe,
else => os.unexpectedErrorWindows(err),
};