diff --git a/lib/std/event.zig b/lib/std/event.zig index 56c5223ba3..2c72c22588 100644 --- a/lib/std/event.zig +++ b/lib/std/event.zig @@ -7,7 +7,6 @@ pub const RwLock = @import("event/rwlock.zig").RwLock; pub const RwLocked = @import("event/rwlocked.zig").RwLocked; pub const Loop = @import("event/loop.zig").Loop; pub const fs = @import("event/fs.zig"); -pub const net = @import("event/net.zig"); test "import event tests" { _ = @import("event/channel.zig"); @@ -19,5 +18,4 @@ test "import event tests" { _ = @import("event/rwlock.zig"); _ = @import("event/rwlocked.zig"); _ = @import("event/loop.zig"); - _ = @import("event/net.zig"); } diff --git a/lib/std/event/channel.zig b/lib/std/event/channel.zig index 1092f2204a..c894e6c0e8 100644 --- a/lib/std/event/channel.zig +++ b/lib/std/event/channel.zig @@ -4,9 +4,11 @@ const assert = std.debug.assert; const testing = std.testing; const Loop = std.event.Loop; -/// many producer, many consumer, thread-safe, runtime configurable buffer size -/// when buffer is empty, consumers suspend and are resumed by producers -/// when buffer is full, producers suspend and are resumed by consumers +/// Many producer, many consumer, thread-safe, runtime configurable buffer size. +/// When buffer is empty, consumers suspend and are resumed by producers. +/// When buffer is full, producers suspend and are resumed by consumers. +/// TODO now that async function rewrite has landed, this API should be adjusted +/// to not use the event loop's allocator, and to not require allocation. pub fn Channel(comptime T: type) type { return struct { loop: *Loop, @@ -48,7 +50,7 @@ pub fn Channel(comptime T: type) type { tick_node: *Loop.NextTickNode, }; - /// call destroy when done + /// Call `destroy` when done. pub fn create(loop: *Loop, capacity: usize) !*SelfChannel { const buffer_nodes = try loop.allocator.alloc(T, capacity); errdefer loop.allocator.free(buffer_nodes); diff --git a/lib/std/event/loop.zig b/lib/std/event/loop.zig index cec722985b..b0b42fe268 100644 --- a/lib/std/event/loop.zig +++ b/lib/std/event/loop.zig @@ -448,26 +448,67 @@ pub const Loop = struct { self.finishOneEvent(); } - pub fn linuxWaitFd(self: *Loop, fd: i32, flags: u32) !void { - defer self.linuxRemoveFd(fd); + pub fn linuxWaitFd(self: *Loop, fd: i32, flags: u32) void { + assert(flags & os.EPOLLET == os.EPOLLET); + assert(flags & os.EPOLLONESHOT == os.EPOLLONESHOT); + var resume_node = ResumeNode.Basic{ + .base = ResumeNode{ + .id = .Basic, + .handle = @frame(), + .overlapped = ResumeNode.overlapped_init, + }, + }; + var need_to_delete = false; + defer if (need_to_delete) self.linuxRemoveFd(fd); + suspend { - var resume_node = ResumeNode.Basic{ - .base = ResumeNode{ - .id = .Basic, - .handle = @frame(), - .overlapped = ResumeNode.overlapped_init, + if (self.linuxAddFd(fd, &resume_node.base, flags)) |_| { + need_to_delete = true; + } else |err| switch (err) { + error.FileDescriptorNotRegistered => unreachable, + error.OperationCausesCircularLoop => unreachable, + error.FileDescriptorIncompatibleWithEpoll => unreachable, + error.FileDescriptorAlreadyPresentInSet => unreachable, // evented writes to the same fd is not thread-safe + + error.SystemResources, + error.UserResourceLimitReached, + error.Unexpected, + => { + // Fall back to a blocking poll(). Ideally this codepath is never hit, since + // epoll should be just fine. But this is better than incorrect behavior. + var poll_flags: i16 = 0; + if ((flags & os.EPOLLIN) != 0) poll_flags |= os.POLLIN; + if ((flags & os.EPOLLOUT) != 0) poll_flags |= os.POLLOUT; + var pfd = [1]os.pollfd{os.pollfd{ + .fd = fd, + .events = poll_flags, + .revents = undefined, + }}; + _ = os.poll(&pfd, -1) catch |poll_err| switch (poll_err) { + error.SystemResources, + error.Unexpected, + => { + // Even poll() didn't work. The best we can do now is sleep for a + // small duration and then hope that something changed. + std.time.sleep(1 * std.time.millisecond); + }, + }; + resume @frame(); }, - }; - try self.linuxAddFd(fd, &resume_node.base, flags); + } } } - pub fn waitUntilFdReadable(self: *Loop, fd: os.fd_t) !void { - return self.linuxWaitFd(fd, os.EPOLLET | os.EPOLLIN); + pub fn waitUntilFdReadable(self: *Loop, fd: os.fd_t) void { + return self.linuxWaitFd(fd, os.EPOLLET | os.EPOLLONESHOT | os.EPOLLIN); } - pub fn waitUntilFdWritable(self: *Loop, fd: os.fd_t) !void { - return self.linuxWaitFd(fd, os.EPOLLET | os.EPOLLOUT); + pub fn waitUntilFdWritable(self: *Loop, fd: os.fd_t) void { + return self.linuxWaitFd(fd, os.EPOLLET | os.EPOLLONESHOT | os.EPOLLOUT); + } + + pub fn waitUntilFdWritableOrReadable(self: *Loop, fd: os.fd_t) void { + return self.linuxWaitFd(fd, os.EPOLLET | os.EPOLLONESHOT | os.EPOLLOUT | os.EPOLLIN); } pub async fn bsdWaitKev(self: *Loop, ident: usize, filter: i16, fflags: u32) !os.Kevent { @@ -645,7 +686,7 @@ pub const Loop = struct { .linux => { self.posixFsRequest(&self.os_data.fs_end_request); // writing 8 bytes to an eventfd cannot fail - os.write(self.os_data.final_eventfd, wakeup_bytes) catch unreachable; + noasync os.write(self.os_data.final_eventfd, wakeup_bytes) catch unreachable; return; }, .macosx, .freebsd, .netbsd => { @@ -793,6 +834,8 @@ pub const Loop = struct { } } + // TODO make this whole function noasync + // https://github.com/ziglang/zig/issues/3157 fn posixFsRun(self: *Loop) void { while (true) { if (builtin.os == .linux) { @@ -802,27 +845,27 @@ pub const Loop = struct { switch (node.data.msg) { .End => return, .WriteV => |*msg| { - msg.result = os.writev(msg.fd, msg.iov); + msg.result = noasync os.writev(msg.fd, msg.iov); }, .PWriteV => |*msg| { - msg.result = os.pwritev(msg.fd, msg.iov, msg.offset); + msg.result = noasync os.pwritev(msg.fd, msg.iov, msg.offset); }, .PReadV => |*msg| { - msg.result = os.preadv(msg.fd, msg.iov, msg.offset); + msg.result = noasync os.preadv(msg.fd, msg.iov, msg.offset); }, .Open => |*msg| { - msg.result = os.openC(msg.path.ptr, msg.flags, msg.mode); + msg.result = noasync os.openC(msg.path.ptr, msg.flags, msg.mode); }, - .Close => |*msg| os.close(msg.fd), + .Close => |*msg| noasync os.close(msg.fd), .WriteFile => |*msg| blk: { const flags = os.O_LARGEFILE | os.O_WRONLY | os.O_CREAT | os.O_CLOEXEC | os.O_TRUNC; - const fd = os.openC(msg.path.ptr, flags, msg.mode) catch |err| { + const fd = noasync os.openC(msg.path.ptr, flags, msg.mode) catch |err| { msg.result = err; break :blk; }; - defer os.close(fd); - msg.result = os.write(fd, msg.contents); + defer noasync os.close(fd); + msg.result = noasync os.write(fd, msg.contents); }, } switch (node.data.finish) { diff --git a/lib/std/event/net.zig b/lib/std/event/net.zig deleted file mode 100644 index bed665dcdc..0000000000 --- a/lib/std/event/net.zig +++ /dev/null @@ -1,358 +0,0 @@ -const std = @import("../std.zig"); -const builtin = @import("builtin"); -const testing = std.testing; -const event = std.event; -const mem = std.mem; -const os = std.os; -const Loop = std.event.Loop; -const File = std.fs.File; -const fd_t = os.fd_t; - -pub const Server = struct { - handleRequestFn: async fn (*Server, *const std.net.Address, File) void, - - loop: *Loop, - sockfd: ?i32, - accept_frame: ?anyframe, - listen_address: std.net.Address, - - waiting_for_emfile_node: PromiseNode, - listen_resume_node: event.Loop.ResumeNode, - - const PromiseNode = std.TailQueue(anyframe).Node; - - pub fn init(loop: *Loop) Server { - // TODO can't initialize handler here because we need well defined copy elision - return Server{ - .loop = loop, - .sockfd = null, - .accept_frame = null, - .handleRequestFn = undefined, - .waiting_for_emfile_node = undefined, - .listen_address = undefined, - .listen_resume_node = event.Loop.ResumeNode{ - .id = event.Loop.ResumeNode.Id.Basic, - .handle = undefined, - .overlapped = event.Loop.ResumeNode.overlapped_init, - }, - }; - } - - pub fn listen( - self: *Server, - address: *const std.net.Address, - handleRequestFn: async fn (*Server, *const std.net.Address, File) void, - ) !void { - self.handleRequestFn = handleRequestFn; - - const sockfd = try os.socket(os.AF_INET, os.SOCK_STREAM | os.SOCK_CLOEXEC | os.SOCK_NONBLOCK, os.PROTO_tcp); - errdefer os.close(sockfd); - self.sockfd = sockfd; - - try os.bind(sockfd, &address.os_addr); - try os.listen(sockfd, os.SOMAXCONN); - self.listen_address = std.net.Address.initPosix(try os.getsockname(sockfd)); - - self.accept_frame = async Server.handler(self); - errdefer await self.accept_frame.?; - - self.listen_resume_node.handle = self.accept_frame.?; - try self.loop.linuxAddFd(sockfd, &self.listen_resume_node, os.EPOLLIN | os.EPOLLOUT | os.EPOLLET); - errdefer self.loop.removeFd(sockfd); - } - - /// Stop listening - pub fn close(self: *Server) void { - self.loop.linuxRemoveFd(self.sockfd.?); - if (self.sockfd) |fd| { - os.close(fd); - self.sockfd = null; - } - } - - pub fn deinit(self: *Server) void { - if (self.accept_frame) |accept_frame| await accept_frame; - if (self.sockfd) |sockfd| os.close(sockfd); - } - - pub async fn handler(self: *Server) void { - while (true) { - var accepted_addr: std.net.Address = undefined; - // TODO just inline the following function here and don't expose it as posixAsyncAccept - if (os.accept4_async(self.sockfd.?, &accepted_addr.os_addr, os.SOCK_NONBLOCK | os.SOCK_CLOEXEC)) |accepted_fd| { - if (accepted_fd == -1) { - // would block - suspend; // we will get resumed by epoll_wait in the event loop - continue; - } - var socket = File.openHandle(accepted_fd); - self.handleRequestFn(self, &accepted_addr, socket); - } else |err| switch (err) { - error.ProcessFdQuotaExceeded => @panic("TODO handle this error"), - error.ConnectionAborted => continue, - - error.FileDescriptorNotASocket => unreachable, - error.OperationNotSupported => unreachable, - - error.SystemFdQuotaExceeded, error.SystemResources, error.ProtocolFailure, error.BlockedByFirewall, error.Unexpected => { - @panic("TODO handle this error"); - }, - } - } - } -}; - -pub async fn connectUnixSocket(loop: *Loop, path: []const u8) !i32 { - const sockfd = try os.socket( - os.AF_UNIX, - os.SOCK_STREAM | os.SOCK_CLOEXEC | os.SOCK_NONBLOCK, - 0, - ); - errdefer os.close(sockfd); - - var sock_addr = os.sockaddr_un{ - .family = os.AF_UNIX, - .path = undefined, - }; - - if (path.len > @typeOf(sock_addr.path).len) return error.NameTooLong; - mem.copy(u8, sock_addr.path[0..], path); - const size = @intCast(u32, @sizeOf(os.sa_family_t) + path.len); - try os.connect_async(sockfd, &sock_addr, size); - try loop.linuxWaitFd(sockfd, os.EPOLLIN | os.EPOLLOUT | os.EPOLLET); - try os.getsockoptError(sockfd); - - return sockfd; -} - -pub const ReadError = error{ - SystemResources, - Unexpected, - UserResourceLimitReached, - InputOutput, - - FileDescriptorNotRegistered, // TODO remove this possibility - OperationCausesCircularLoop, // TODO remove this possibility - FileDescriptorAlreadyPresentInSet, // TODO remove this possibility - FileDescriptorIncompatibleWithEpoll, // TODO remove this possibility -}; - -/// returns number of bytes read. 0 means EOF. -pub async fn read(loop: *std.event.Loop, fd: fd_t, buffer: []u8) ReadError!usize { - const iov = os.iovec{ - .iov_base = buffer.ptr, - .iov_len = buffer.len, - }; - const iovs: *const [1]os.iovec = &iov; - return readvPosix(loop, fd, iovs, 1); -} - -pub const WriteError = error{}; - -pub async fn write(loop: *std.event.Loop, fd: fd_t, buffer: []const u8) WriteError!void { - const iov = os.iovec_const{ - .iov_base = buffer.ptr, - .iov_len = buffer.len, - }; - const iovs: *const [1]os.iovec_const = &iov; - return writevPosix(loop, fd, iovs, 1); -} - -pub async fn writevPosix(loop: *Loop, fd: i32, iov: [*]const os.iovec_const, count: usize) !void { - while (true) { - switch (builtin.os) { - .macosx, .linux => { - switch (os.errno(os.system.writev(fd, iov, count))) { - 0 => return, - os.EINTR => continue, - os.ESPIPE => unreachable, - os.EINVAL => unreachable, - os.EFAULT => unreachable, - os.EAGAIN => { - try loop.linuxWaitFd(fd, os.EPOLLET | os.EPOLLOUT); - continue; - }, - os.EBADF => unreachable, // always a race condition - os.EDESTADDRREQ => unreachable, // connect was never called - os.EDQUOT => unreachable, - os.EFBIG => unreachable, - os.EIO => return error.InputOutput, - os.ENOSPC => unreachable, - os.EPERM => return error.AccessDenied, - os.EPIPE => unreachable, - else => |err| return os.unexpectedErrno(err), - } - }, - else => @compileError("Unsupported OS"), - } - } -} - -/// returns number of bytes read. 0 means EOF. -pub async fn readvPosix(loop: *std.event.Loop, fd: i32, iov: [*]os.iovec, count: usize) !usize { - while (true) { - switch (builtin.os) { - builtin.Os.linux, builtin.Os.freebsd, builtin.Os.macosx => { - const rc = os.system.readv(fd, iov, count); - switch (os.errno(rc)) { - 0 => return rc, - os.EINTR => continue, - os.EINVAL => unreachable, - os.EFAULT => unreachable, - os.EAGAIN => { - try loop.linuxWaitFd(fd, os.EPOLLET | os.EPOLLIN); - continue; - }, - os.EBADF => unreachable, // always a race condition - os.EIO => return error.InputOutput, - os.EISDIR => unreachable, - os.ENOBUFS => return error.SystemResources, - os.ENOMEM => return error.SystemResources, - else => |err| return os.unexpectedErrno(err), - } - }, - else => @compileError("Unsupported OS"), - } - } -} - -pub async fn writev(loop: *Loop, fd: fd_t, data: []const []const u8) !void { - const iovecs = try loop.allocator.alloc(os.iovec_const, data.len); - defer loop.allocator.free(iovecs); - - for (data) |buf, i| { - iovecs[i] = os.iovec_const{ - .iov_base = buf.ptr, - .iov_len = buf.len, - }; - } - - return writevPosix(loop, fd, iovecs.ptr, data.len); -} - -pub async fn readv(loop: *Loop, fd: fd_t, data: []const []u8) !usize { - const iovecs = try loop.allocator.alloc(os.iovec, data.len); - defer loop.allocator.free(iovecs); - - for (data) |buf, i| { - iovecs[i] = os.iovec{ - .iov_base = buf.ptr, - .iov_len = buf.len, - }; - } - - return readvPosix(loop, fd, iovecs.ptr, data.len); -} - -pub async fn connect(loop: *Loop, _address: *const std.net.Address) !File { - var address = _address.*; // TODO https://github.com/ziglang/zig/issues/1592 - - const sockfd = try os.socket(os.AF_INET, os.SOCK_STREAM | os.SOCK_CLOEXEC | os.SOCK_NONBLOCK, os.PROTO_tcp); - errdefer os.close(sockfd); - - try os.connect_async(sockfd, &address.os_addr, @sizeOf(os.sockaddr_in)); - try loop.linuxWaitFd(sockfd, os.EPOLLIN | os.EPOLLOUT | os.EPOLLET); - try os.getsockoptError(sockfd); - - return File.openHandle(sockfd); -} - -test "listen on a port, send bytes, receive bytes" { - // https://github.com/ziglang/zig/issues/2377 - if (true) return error.SkipZigTest; - - if (builtin.os != builtin.Os.linux) { - // TODO build abstractions for other operating systems - return error.SkipZigTest; - } - - const MyServer = struct { - tcp_server: Server, - - const Self = @This(); - async fn handler(tcp_server: *Server, _addr: *const std.net.Address, _socket: File) void { - const self = @fieldParentPtr(Self, "tcp_server", tcp_server); - var socket = _socket; // TODO https://github.com/ziglang/zig/issues/1592 - defer socket.close(); - const next_handler = errorableHandler(self, _addr, socket) catch |err| { - std.debug.panic("unable to handle connection: {}\n", err); - }; - } - async fn errorableHandler(self: *Self, _addr: *const std.net.Address, _socket: File) !void { - const addr = _addr.*; // TODO https://github.com/ziglang/zig/issues/1592 - var socket = _socket; // TODO https://github.com/ziglang/zig/issues/1592 - - const stream = &socket.outStream().stream; - try stream.print("hello from server\n"); - } - }; - - const ip4addr = std.net.parseIp4("127.0.0.1") catch unreachable; - const addr = std.net.Address.initIp4(ip4addr, 0); - - var loop: Loop = undefined; - try loop.initSingleThreaded(std.debug.global_allocator); - var server = MyServer{ .tcp_server = Server.init(&loop) }; - defer server.tcp_server.deinit(); - try server.tcp_server.listen(&addr, MyServer.handler); - - _ = async doAsyncTest(&loop, &server.tcp_server.listen_address, &server.tcp_server); - loop.run(); -} - -async fn doAsyncTest(loop: *Loop, address: *const std.net.Address, server: *Server) void { - errdefer @panic("test failure"); - - var socket_file = try connect(loop, address); - defer socket_file.close(); - - var buf: [512]u8 = undefined; - const amt_read = try socket_file.read(buf[0..]); - const msg = buf[0..amt_read]; - testing.expect(mem.eql(u8, msg, "hello from server\n")); - server.close(); -} - -pub const OutStream = struct { - fd: fd_t, - stream: Stream, - loop: *Loop, - - pub const Error = WriteError; - pub const Stream = event.io.OutStream(Error); - - pub fn init(loop: *Loop, fd: fd_t) OutStream { - return OutStream{ - .fd = fd, - .loop = loop, - .stream = Stream{ .writeFn = writeFn }, - }; - } - - async fn writeFn(out_stream: *Stream, bytes: []const u8) Error!void { - const self = @fieldParentPtr(OutStream, "stream", out_stream); - return write(self.loop, self.fd, bytes); - } -}; - -pub const InStream = struct { - fd: fd_t, - stream: Stream, - loop: *Loop, - - pub const Error = ReadError; - pub const Stream = event.io.InStream(Error); - - pub fn init(loop: *Loop, fd: fd_t) InStream { - return InStream{ - .fd = fd, - .loop = loop, - .stream = Stream{ .readFn = readFn }, - }; - } - - async fn readFn(in_stream: *Stream, bytes: []u8) Error!usize { - const self = @fieldParentPtr(InStream, "stream", in_stream); - return read(self.loop, self.fd, bytes); - } -}; diff --git a/lib/std/fmt.zig b/lib/std/fmt.zig index db30fa916d..d3d795bf9d 100644 --- a/lib/std/fmt.zig +++ b/lib/std/fmt.zig @@ -53,7 +53,7 @@ fn peekIsAlign(comptime fmt: []const u8) bool { /// The format string must be comptime known and may contain placeholders following /// this format: /// `{[position][specifier]:[fill][alignment][width].[precision]}` -/// +/// /// Each word between `[` and `]` is a parameter you have to replace with something: /// /// - *position* is the index of the argument that should be inserted @@ -78,7 +78,7 @@ fn peekIsAlign(comptime fmt: []const u8) bool { /// - `d`: output numeric value in decimal notation /// - `b`: output integer value in binary notation /// - `c`: output integer as an ASCII character. Integer type must have 8 bits at max. -/// - `*`: output the address of the value instead of the value itself. +/// - `*`: output the address of the value instead of the value itself. /// /// If a formatted user type contains a function of the type /// ``` diff --git a/lib/std/io.zig b/lib/std/io.zig index 39c3a7cf9b..95280b888f 100644 --- a/lib/std/io.zig +++ b/lib/std/io.zig @@ -64,68 +64,7 @@ pub const SeekableStream = @import("io/seekable_stream.zig").SeekableStream; pub const SliceSeekableInStream = @import("io/seekable_stream.zig").SliceSeekableInStream; pub const COutStream = @import("io/c_out_stream.zig").COutStream; pub const InStream = @import("io/in_stream.zig").InStream; - -pub fn OutStream(comptime WriteError: type) type { - return struct { - const Self = @This(); - pub const Error = WriteError; - - writeFn: fn (self: *Self, bytes: []const u8) Error!void, - - pub fn print(self: *Self, comptime format: []const u8, args: ...) Error!void { - return std.fmt.format(self, Error, self.writeFn, format, args); - } - - pub fn write(self: *Self, bytes: []const u8) Error!void { - return self.writeFn(self, bytes); - } - - pub fn writeByte(self: *Self, byte: u8) Error!void { - const slice = (*const [1]u8)(&byte)[0..]; - return self.writeFn(self, slice); - } - - pub fn writeByteNTimes(self: *Self, byte: u8, n: usize) Error!void { - const slice = (*const [1]u8)(&byte)[0..]; - var i: usize = 0; - while (i < n) : (i += 1) { - try self.writeFn(self, slice); - } - } - - /// Write a native-endian integer. - pub fn writeIntNative(self: *Self, comptime T: type, value: T) Error!void { - var bytes: [(T.bit_count + 7) / 8]u8 = undefined; - mem.writeIntNative(T, &bytes, value); - return self.writeFn(self, bytes); - } - - /// Write a foreign-endian integer. - pub fn writeIntForeign(self: *Self, comptime T: type, value: T) Error!void { - var bytes: [(T.bit_count + 7) / 8]u8 = undefined; - mem.writeIntForeign(T, &bytes, value); - return self.writeFn(self, bytes); - } - - pub fn writeIntLittle(self: *Self, comptime T: type, value: T) Error!void { - var bytes: [(T.bit_count + 7) / 8]u8 = undefined; - mem.writeIntLittle(T, &bytes, value); - return self.writeFn(self, bytes); - } - - pub fn writeIntBig(self: *Self, comptime T: type, value: T) Error!void { - var bytes: [(T.bit_count + 7) / 8]u8 = undefined; - mem.writeIntBig(T, &bytes, value); - return self.writeFn(self, bytes); - } - - pub fn writeInt(self: *Self, comptime T: type, value: T, endian: builtin.Endian) Error!void { - var bytes: [(T.bit_count + 7) / 8]u8 = undefined; - mem.writeInt(T, &bytes, value, endian); - return self.writeFn(self, bytes); - } - }; -} +pub const OutStream = @import("io/out_stream.zig").OutStream; /// TODO move this to `std.fs` and add a version to `std.fs.Dir`. pub fn writeFile(path: []const u8, data: []const u8) !void { diff --git a/lib/std/io/in_stream.zig b/lib/std/io/in_stream.zig index 6efeaa0f16..9854b90794 100644 --- a/lib/std/io/in_stream.zig +++ b/lib/std/io/in_stream.zig @@ -11,7 +11,6 @@ pub const stack_size: usize = if (@hasDecl(root, "stack_size_std_io_InStream")) root.stack_size_std_io_InStream else default_stack_size; -pub const stack_align = 16; pub fn InStream(comptime ReadError: type) type { return struct { @@ -34,7 +33,7 @@ pub fn InStream(comptime ReadError: type) type { if (std.io.is_async) { // Let's not be writing 0xaa in safe modes for upwards of 4 MiB for every stream read. @setRuntimeSafety(false); - var stack_frame: [stack_size]u8 align(stack_align) = undefined; + var stack_frame: [stack_size]u8 align(std.Target.stack_align) = undefined; return await @asyncCall(&stack_frame, {}, self.readFn, self, buffer); } else { return self.readFn(self, buffer); diff --git a/lib/std/io/out_stream.zig b/lib/std/io/out_stream.zig new file mode 100644 index 0000000000..42c40337a8 --- /dev/null +++ b/lib/std/io/out_stream.zig @@ -0,0 +1,87 @@ +const std = @import("../std.zig"); +const builtin = @import("builtin"); +const root = @import("root"); +const mem = std.mem; + +pub const default_stack_size = 1 * 1024 * 1024; +pub const stack_size: usize = if (@hasDecl(root, "stack_size_std_io_OutStream")) + root.stack_size_std_io_OutStream +else + default_stack_size; + +/// TODO this is not integrated with evented I/O yet. +/// https://github.com/ziglang/zig/issues/3557 +pub fn OutStream(comptime WriteError: type) type { + return struct { + const Self = @This(); + pub const Error = WriteError; + // TODO https://github.com/ziglang/zig/issues/3557 + pub const WriteFn = if (std.io.is_async and false) + async fn (self: *Self, bytes: []const u8) Error!void + else + fn (self: *Self, bytes: []const u8) Error!void; + + writeFn: WriteFn, + + pub fn write(self: *Self, bytes: []const u8) Error!void { + // TODO https://github.com/ziglang/zig/issues/3557 + if (std.io.is_async and false) { + // Let's not be writing 0xaa in safe modes for upwards of 4 MiB for every stream write. + @setRuntimeSafety(false); + var stack_frame: [stack_size]u8 align(std.Target.stack_align) = undefined; + return await @asyncCall(&stack_frame, {}, self.writeFn, self, bytes); + } else { + return self.writeFn(self, bytes); + } + } + + pub fn print(self: *Self, comptime format: []const u8, args: ...) Error!void { + return std.fmt.format(self, Error, self.writeFn, format, args); + } + + pub fn writeByte(self: *Self, byte: u8) Error!void { + const slice = (*const [1]u8)(&byte)[0..]; + return self.writeFn(self, slice); + } + + pub fn writeByteNTimes(self: *Self, byte: u8, n: usize) Error!void { + const slice = (*const [1]u8)(&byte)[0..]; + var i: usize = 0; + while (i < n) : (i += 1) { + try self.writeFn(self, slice); + } + } + + /// Write a native-endian integer. + pub fn writeIntNative(self: *Self, comptime T: type, value: T) Error!void { + var bytes: [(T.bit_count + 7) / 8]u8 = undefined; + mem.writeIntNative(T, &bytes, value); + return self.writeFn(self, bytes); + } + + /// Write a foreign-endian integer. + pub fn writeIntForeign(self: *Self, comptime T: type, value: T) Error!void { + var bytes: [(T.bit_count + 7) / 8]u8 = undefined; + mem.writeIntForeign(T, &bytes, value); + return self.writeFn(self, bytes); + } + + pub fn writeIntLittle(self: *Self, comptime T: type, value: T) Error!void { + var bytes: [(T.bit_count + 7) / 8]u8 = undefined; + mem.writeIntLittle(T, &bytes, value); + return self.writeFn(self, bytes); + } + + pub fn writeIntBig(self: *Self, comptime T: type, value: T) Error!void { + var bytes: [(T.bit_count + 7) / 8]u8 = undefined; + mem.writeIntBig(T, &bytes, value); + return self.writeFn(self, bytes); + } + + pub fn writeInt(self: *Self, comptime T: type, value: T, endian: builtin.Endian) Error!void { + var bytes: [(T.bit_count + 7) / 8]u8 = undefined; + mem.writeInt(T, &bytes, value, endian); + return self.writeFn(self, bytes); + } + }; +} diff --git a/lib/std/net.zig b/lib/std/net.zig index c0bba79a7b..c860dedef5 100644 --- a/lib/std/net.zig +++ b/lib/std/net.zig @@ -6,6 +6,10 @@ const mem = std.mem; const os = std.os; const fs = std.fs; +test "" { + _ = @import("net/test.zig"); +} + pub const TmpWinAddr = struct { family: u8, data: [14]u8, @@ -21,6 +25,9 @@ pub const OsAddress = switch (builtin.os) { pub const Address = struct { os_addr: OsAddress, + // TODO this crashed the compiler + //pub const localhost = initIp4(parseIp4("127.0.0.1") catch unreachable, 0); + pub fn initIp4(ip4: u32, _port: u16) Address { return Address{ .os_addr = os.sockaddr{ @@ -141,6 +148,14 @@ pub const Address = struct { else => return output(context, "(unrecognized address family)"), } } + + fn getOsSockLen(self: Address) os.socklen_t { + switch (self.os_addr.un.family) { + os.AF_INET => return @sizeOf(os.sockaddr_in), + os.AF_INET6 => return @sizeOf(os.sockaddr_in6), + else => unreachable, + } + } }; pub fn parseIp4(buf: []const u8) !u32 { @@ -260,34 +275,8 @@ pub fn parseIp6(buf: []const u8) !Ip6Addr { return error.Incomplete; } -test "std.net.parseIp4" { - assert((try parseIp4("127.0.0.1")) == mem.bigToNative(u32, 0x7f000001)); - - testParseIp4Fail("256.0.0.1", error.Overflow); - testParseIp4Fail("x.0.0.1", error.InvalidCharacter); - testParseIp4Fail("127.0.0.1.1", error.InvalidEnd); - testParseIp4Fail("127.0.0.", error.Incomplete); - testParseIp4Fail("100..0.1", error.InvalidCharacter); -} - -fn testParseIp4Fail(buf: []const u8, expected_err: anyerror) void { - if (parseIp4(buf)) |_| { - @panic("expected error"); - } else |e| { - assert(e == expected_err); - } -} - -test "std.net.parseIp6" { - const ip6 = try parseIp6("FF01:0:0:0:0:0:0:FB"); - const addr = Address.initIp6(ip6, 80); - var buf: [100]u8 = undefined; - const printed = try std.fmt.bufPrint(&buf, "{}", addr); - std.testing.expect(mem.eql(u8, "[ff01::fb]:80", printed)); -} - pub fn connectUnixSocket(path: []const u8) !fs.File { - const opt_non_block = if (std.event.Loop.instance != null) os.SOCK_NONBLOCK else 0; + const opt_non_block = if (std.io.mode == .evented) os.SOCK_NONBLOCK else 0; const sockfd = try os.socket( os.AF_UNIX, os.SOCK_STREAM | os.SOCK_CLOEXEC | opt_non_block, @@ -305,13 +294,7 @@ pub fn connectUnixSocket(path: []const u8) !fs.File { if (path.len > @typeOf(sock_addr.un.path).len) return error.NameTooLong; mem.copy(u8, sock_addr.un.path[0..], path); const size = @intCast(u32, @sizeOf(os.sa_family_t) + path.len); - if (std.event.Loop.instance) |loop| { - try os.connect_async(sockfd, &sock_addr, size); - try loop.linuxWaitFd(sockfd, os.EPOLLIN | os.EPOLLOUT | os.EPOLLET); - try os.getsockoptError(sockfd); - } else { - try os.connect(sockfd, &sock_addr, size); - } + try os.connect(sockfd, &sock_addr, size); return fs.File.openHandle(sockfd); } @@ -330,6 +313,27 @@ pub const AddressList = struct { } }; +/// All memory allocated with `allocator` will be freed before this function returns. +pub fn tcpConnectToHost(allocator: *mem.Allocator, name: []const u8, port: u16) !fs.File { + const list = getAddressList(allocator, name, port); + defer list.deinit(); + + const addrs = list.addrs.toSliceConst(); + if (addrs.len == 0) return error.UnknownHostName; + + return tcpConnectToAddress(addrs[0], port); +} + +pub fn tcpConnectToAddress(address: Address) !fs.File { + const nonblock = if (std.io.is_async) os.SOCK_NONBLOCK else 0; + const sock_flags = os.SOCK_STREAM | os.SOCK_CLOEXEC | nonblock; + const sockfd = try os.socket(address.os_addr.un.family, sock_flags, os.IPPROTO_TCP); + errdefer os.close(sockfd); + try os.connect(sockfd, address.os_addr, address.getOsSockLen()); + + return fs.File{ .handle = sockfd }; +} + /// Call `AddressList.deinit` on the result. pub fn getAddressList(allocator: *mem.Allocator, name: []const u8, port: u16) !*AddressList { const result = blk: { @@ -375,7 +379,7 @@ pub fn getAddressList(allocator: *mem.Allocator, name: []const u8, port: u16) !* c.EAI_FAMILY => return error.AddressFamilyNotSupported, c.EAI_MEMORY => return error.OutOfMemory, c.EAI_NODATA => return error.HostLacksNetworkAddresses, - c.EAI_NONAME => return error.UnknownName, + c.EAI_NONAME => return error.UnknownHostName, c.EAI_SERVICE => return error.ServiceUnavailable, c.EAI_SOCKTYPE => unreachable, // Invalid socket type requested in hints c.EAI_SYSTEM => switch (os.errno(-1)) { @@ -493,7 +497,7 @@ fn linuxLookupName( try canon.resize(0); try linuxLookupNameFromNull(addrs, family, flags); } - if (addrs.len == 0) return error.UnknownName; + if (addrs.len == 0) return error.UnknownHostName; // No further processing is needed if there are fewer than 2 // results or if there are only IPv4 results. @@ -858,7 +862,7 @@ fn linuxLookupNameFromDnsSearch( // Strip final dot for canon, fail if multiple trailing dots. if (mem.endsWith(u8, canon_name, ".")) canon_name.len -= 1; - if (mem.endsWith(u8, canon_name, ".")) return error.UnknownName; + if (mem.endsWith(u8, canon_name, ".")) return error.UnknownHostName; // Name with search domain appended is setup in canon[]. This both // provides the desired default canonical name (if the requested @@ -928,7 +932,7 @@ fn linuxLookupNameFromDns( if (addrs.len != 0) return; if (ap[0].len < 4 or (ap[0][3] & 15) == 2) return error.TemporaryNameServerFailure; - if ((ap[0][3] & 15) == 0) return error.UnknownName; + if ((ap[0][3] & 15) == 0) return error.UnknownHostName; if ((ap[0][3] & 15) == 3) return; return error.NameServerFailure; } @@ -1247,3 +1251,121 @@ fn dnsParseCallback(ctx: dpc_ctx, rr: u8, data: []const u8, packet: []const u8) else => return, } } + +/// This API only works when `std.io.mode` is `std.io.Mode.evented`. +/// This struct is immovable after calling `listen`. +pub const Server = struct { + /// This field is meant to be accessed directly. + /// Call `connections.get` to accept a connection. + connections: *ConnectionChannel, + + /// Copied from `Options` on `init`. + kernel_backlog: u32, + + /// `undefined` until `listen` returns successfully. + listen_address: Address, + + sockfd: ?os.fd_t, + accept_frame: @Frame(acceptConnections), + + pub const ConnectionChannel = std.event.Channel(AcceptError!fs.File); + + pub const AcceptError = error{ + ConnectionAborted, + + /// The per-process limit on the number of open file descriptors has been reached. + ProcessFdQuotaExceeded, + + /// The system-wide limit on the total number of open files has been reached. + SystemFdQuotaExceeded, + + /// Not enough free memory. This often means that the memory allocation is limited + /// by the socket buffer limits, not by the system memory. + SystemResources, + + ProtocolFailure, + + /// Firewall rules forbid connection. + BlockedByFirewall, + } || os.UnexpectedError; + + pub const Options = struct { + /// How many connections the kernel will accept on the application's behalf. + /// If more than this many connections pool in the kernel, clients will start + /// seeing "Connection refused". + kernel_backlog: u32 = 128, + + /// How many connections this `Server` will accept from the kernel even before + /// they are requested from the `connections` channel. + eager_connections: usize = 16, + }; + + /// After this call succeeds, resources have been acquired and must + /// be released with `deinit`. + pub fn init(options: Options) !Server { + const loop = std.event.Loop.instance orelse + @compileError("std.net.Server only works in evented I/O mode"); + return Server{ + .connections = try ConnectionChannel.create(loop, options.eager_connections), + .sockfd = null, + .kernel_backlog = options.kernel_backlog, + .listen_address = undefined, + .accept_frame = undefined, + }; + } + + /// After calling this function, one must call `init` to do anything else with this `Server`. + pub fn deinit(self: *Server) void { + self.close(); + self.connections.destroy(); + self.* = undefined; + } + + pub fn listen(self: *Server, address: Address) !void { + const sock_flags = os.SOCK_STREAM | os.SOCK_CLOEXEC | os.SOCK_NONBLOCK; + const sockfd = try os.socket(os.AF_INET, sock_flags, os.PROTO_tcp); + self.sockfd = sockfd; + errdefer { + os.close(sockfd); + self.sockfd = null; + } + + var socklen = address.getOsSockLen(); + try os.bind(sockfd, &address.os_addr, socklen); + try os.listen(sockfd, self.kernel_backlog); + try os.getsockname(sockfd, &self.listen_address.os_addr, &socklen); + + // acceptConnections loops, calling os.accept(). + self.accept_frame = async self.acceptConnections(); + errdefer await self.accept_frame; + } + + /// Stop listening. It is still necessary to call `deinit` after stopping listening. + /// Calling `deinit` will automatically call `close`. It is safe to call `close` when + /// not listening. + pub fn close(self: *Server) void { + if (self.sockfd) |fd| { + os.close(fd); + self.sockfd = null; + await self.accept_frame; + self.accept_frame = undefined; + self.listen_address = undefined; + } + } + + fn acceptConnections(self: *Server) void { + const sockfd = self.sockfd.?; + const accept_flags = os.SOCK_NONBLOCK | os.SOCK_CLOEXEC; + while (true) { + var accepted_addr: Address = undefined; + var addr_len: os.socklen_t = @sizeOf(os.sockaddr); + const conn = if (os.accept4(sockfd, &accepted_addr.os_addr, &addr_len, accept_flags)) |fd| + fs.File.openHandle(fd) + else |err| switch (err) { + error.WouldBlock => unreachable, // we asserted earlier about non-blocking I/O mode + else => |e| e, + }; + self.connections.put(conn); + } + } +}; diff --git a/lib/std/net/test.zig b/lib/std/net/test.zig new file mode 100644 index 0000000000..45e8663e6a --- /dev/null +++ b/lib/std/net/test.zig @@ -0,0 +1,71 @@ +const std = @import("../std.zig"); +const net = std.net; +const mem = std.mem; +const testing = std.testing; + +test "std.net.parseIp4" { + assert((try parseIp4("127.0.0.1")) == mem.bigToNative(u32, 0x7f000001)); + + testParseIp4Fail("256.0.0.1", error.Overflow); + testParseIp4Fail("x.0.0.1", error.InvalidCharacter); + testParseIp4Fail("127.0.0.1.1", error.InvalidEnd); + testParseIp4Fail("127.0.0.", error.Incomplete); + testParseIp4Fail("100..0.1", error.InvalidCharacter); +} + +fn testParseIp4Fail(buf: []const u8, expected_err: anyerror) void { + if (parseIp4(buf)) |_| { + @panic("expected error"); + } else |e| { + assert(e == expected_err); + } +} + +test "std.net.parseIp6" { + const ip6 = try parseIp6("FF01:0:0:0:0:0:0:FB"); + const addr = Address.initIp6(ip6, 80); + var buf: [100]u8 = undefined; + const printed = try std.fmt.bufPrint(&buf, "{}", addr); + std.testing.expect(mem.eql(u8, "[ff01::fb]:80", printed)); +} + +test "listen on a port, send bytes, receive bytes" { + if (std.builtin.os != .linux) { + // TODO build abstractions for other operating systems + return error.SkipZigTest; + } + if (std.io.mode != .evented) { + // TODO add ability to run tests in non-blocking I/O mode + return error.SkipZigTest; + } + + // TODO doing this at comptime crashed the compiler + const localhost = net.Address.initIp4(net.parseIp4("127.0.0.1") catch unreachable, 0); + + var server = try net.Server.init(net.Server.Options{}); + defer server.deinit(); + try server.listen(localhost); + + var server_frame = async testServer(&server); + var client_frame = async testClient(server.listen_address); + + try await server_frame; + try await client_frame; +} + +fn testClient(addr: net.Address) anyerror!void { + const socket_file = try net.tcpConnectToAddress(addr); + defer socket_file.close(); + + var buf: [100]u8 = undefined; + const len = try socket_file.read(&buf); + const msg = buf[0..len]; + testing.expect(mem.eql(u8, msg, "hello from server\n")); +} + +fn testServer(server: *net.Server) anyerror!void { + var client_file = try server.connections.get(); + + const stream = &client_file.outStream().stream; + try stream.print("hello from server\n"); +} diff --git a/lib/std/os.zig b/lib/std/os.zig index e19aaa08d4..4a99db5497 100644 --- a/lib/std/os.zig +++ b/lib/std/os.zig @@ -308,7 +308,7 @@ pub fn read(fd: fd_t, buf: []u8) ReadError!usize { EINVAL => unreachable, EFAULT => unreachable, EAGAIN => if (std.event.Loop.instance) |loop| { - loop.waitUntilFdReadable(fd) catch return error.WouldBlock; + loop.waitUntilFdReadable(fd); continue; } else { return error.WouldBlock; @@ -325,7 +325,36 @@ pub fn read(fd: fd_t, buf: []u8) ReadError!usize { } /// Number of bytes read is returned. Upon reading end-of-file, zero is returned. -/// This function is for blocking file descriptors only. +/// If the application has a global event loop enabled, EAGAIN is handled +/// via the event loop. Otherwise EAGAIN results in error.WouldBlock. +pub fn readv(fd: fd_t, iov: []const iovec) ReadError!usize { + while (true) { + // TODO handle the case when iov_len is too large and get rid of this @intCast + const rc = system.readv(fd, iov.ptr, @intCast(u32, iov.len)); + switch (errno(rc)) { + 0 => return @bitCast(usize, rc), + EINTR => continue, + EINVAL => unreachable, + EFAULT => unreachable, + EAGAIN => if (std.event.Loop.instance) |loop| { + loop.waitUntilFdReadable(fd); + continue; + } else { + return error.WouldBlock; + }, + EBADF => unreachable, // always a race condition + EIO => return error.InputOutput, + EISDIR => return error.IsDir, + ENOBUFS => return error.SystemResources, + ENOMEM => return error.SystemResources, + else => |err| return unexpectedErrno(err), + } + } +} + +/// Number of bytes read is returned. Upon reading end-of-file, zero is returned. +/// If the application has a global event loop enabled, EAGAIN is handled +/// via the event loop. Otherwise EAGAIN results in error.WouldBlock. pub fn preadv(fd: fd_t, iov: []const iovec, offset: u64) ReadError!usize { if (comptime std.Target.current.isDarwin()) { // Darwin does not have preadv but it does have pread. @@ -355,7 +384,12 @@ pub fn preadv(fd: fd_t, iov: []const iovec, offset: u64) ReadError!usize { EINVAL => unreachable, EFAULT => unreachable, ESPIPE => unreachable, // fd is not seekable - EAGAIN => unreachable, // This function is for blocking reads. + EAGAIN => if (std.event.Loop.instance) |loop| { + loop.waitUntilFdReadable(fd); + continue; + } else { + return error.WouldBlock; + }, EBADF => unreachable, // always a race condition EIO => return error.InputOutput, EISDIR => return error.IsDir, @@ -373,7 +407,12 @@ pub fn preadv(fd: fd_t, iov: []const iovec, offset: u64) ReadError!usize { EINTR => continue, EINVAL => unreachable, EFAULT => unreachable, - EAGAIN => unreachable, // This function is for blocking reads. + EAGAIN => if (std.event.Loop.instance) |loop| { + loop.waitUntilFdReadable(fd); + continue; + } else { + return error.WouldBlock; + }, EBADF => unreachable, // always a race condition EIO => return error.InputOutput, EISDIR => return error.IsDir, @@ -393,10 +432,17 @@ pub const WriteError = error{ BrokenPipe, SystemResources, OperationAborted, + + /// This error occurs when no global event loop is configured, + /// and reading from the file descriptor would block. + WouldBlock, } || UnexpectedError; /// Write to a file descriptor. Keeps trying if it gets interrupted. -/// This function is for blocking file descriptors only. +/// If the application has a global event loop enabled, EAGAIN is handled +/// via the event loop. Otherwise EAGAIN results in error.WouldBlock. +/// TODO evented I/O integration is disabled until +/// https://github.com/ziglang/zig/issues/3557 is solved. pub fn write(fd: fd_t, bytes: []const u8) WriteError!void { if (builtin.os == .windows) { return windows.WriteFile(fd, bytes); @@ -432,7 +478,14 @@ pub fn write(fd: fd_t, bytes: []const u8) WriteError!void { EINTR => continue, EINVAL => unreachable, EFAULT => unreachable, - EAGAIN => unreachable, // This function is for blocking writes. + // TODO https://github.com/ziglang/zig/issues/3557 + EAGAIN => return error.WouldBlock, + //EAGAIN => if (std.event.Loop.instance) |loop| { + // loop.waitUntilFdWritable(fd); + // continue; + //} else { + // return error.WouldBlock; + //}, EBADF => unreachable, // Always a race condition. EDESTADDRREQ => unreachable, // `connect` was never called. EDQUOT => return error.DiskQuota, @@ -446,9 +499,9 @@ pub fn write(fd: fd_t, bytes: []const u8) WriteError!void { } } -/// Write multiple buffers to a file descriptor. Keeps trying if it gets interrupted. -/// This function is for blocking file descriptors only. For non-blocking, see -/// `writevAsync`. +/// Write multiple buffers to a file descriptor. +/// If the application has a global event loop enabled, EAGAIN is handled +/// via the event loop. Otherwise EAGAIN results in error.WouldBlock. pub fn writev(fd: fd_t, iov: []const iovec_const) WriteError!void { while (true) { // TODO handle the case when iov_len is too large and get rid of this @intCast @@ -458,7 +511,12 @@ pub fn writev(fd: fd_t, iov: []const iovec_const) WriteError!void { EINTR => continue, EINVAL => unreachable, EFAULT => unreachable, - EAGAIN => unreachable, // This function is for blocking writes. + EAGAIN => if (std.event.Loop.instance) |loop| { + loop.waitUntilFdWritable(fd); + continue; + } else { + return error.WouldBlock; + }, EBADF => unreachable, // Always a race condition. EDESTADDRREQ => unreachable, // `connect` was never called. EDQUOT => return error.DiskQuota, @@ -474,8 +532,6 @@ pub fn writev(fd: fd_t, iov: []const iovec_const) WriteError!void { /// Write multiple buffers to a file descriptor, with a position offset. /// Keeps trying if it gets interrupted. -/// This function is for blocking file descriptors only. For non-blocking, see -/// `pwritevAsync`. pub fn pwritev(fd: fd_t, iov: []const iovec_const, offset: u64) WriteError!void { if (comptime std.Target.current.isDarwin()) { // Darwin does not have pwritev but it does have pwrite. @@ -504,7 +560,12 @@ pub fn pwritev(fd: fd_t, iov: []const iovec_const, offset: u64) WriteError!void ESPIPE => unreachable, // `fd` is not seekable. EINVAL => unreachable, EFAULT => unreachable, - EAGAIN => unreachable, // This function is for blocking writes. + EAGAIN => if (std.event.Loop.instance) |loop| { + loop.waitUntilFdWritable(fd); + continue; + } else { + return error.WouldBlock; + }, EBADF => unreachable, // Always a race condition. EDESTADDRREQ => unreachable, // `connect` was never called. EDQUOT => return error.DiskQuota, @@ -526,7 +587,12 @@ pub fn pwritev(fd: fd_t, iov: []const iovec_const, offset: u64) WriteError!void EINTR => continue, EINVAL => unreachable, EFAULT => unreachable, - EAGAIN => unreachable, // This function is for blocking writes. + EAGAIN => if (std.event.Loop.instance) |loop| { + loop.waitUntilFdWritable(fd); + continue; + } else { + return error.WouldBlock; + }, EBADF => unreachable, // Always a race condition. EDESTADDRREQ => unreachable, // `connect` was never called. EDQUOT => return error.DiskQuota, @@ -1621,12 +1687,6 @@ pub const AcceptError = error{ /// by the socket buffer limits, not by the system memory. SystemResources, - /// The file descriptor sockfd does not refer to a socket. - FileDescriptorNotASocket, - - /// The referenced socket is not of type SOCK_STREAM. - OperationNotSupported, - ProtocolFailure, /// Firewall rules forbid connection. @@ -1643,7 +1703,7 @@ pub const AcceptError = error{ pub fn accept4( /// This argument is a socket that has been created with `socket`, bound to a local address /// with `bind`, and is listening for connections after a `listen`. - sockfd: i32, + sockfd: fd_t, /// This argument is a pointer to a sockaddr structure. This structure is filled in with the /// address of the peer socket, as known to the communications layer. The exact format of the /// address returned addr is determined by the socket's address family (see `socket` and the @@ -1664,15 +1724,15 @@ pub fn accept4( /// * `SOCK_CLOEXEC` - Set the close-on-exec (`FD_CLOEXEC`) flag on the new file descriptor. See the /// description of the `O_CLOEXEC` flag in `open` for reasons why this may be useful. flags: u32, -) AcceptError!i32 { +) AcceptError!fd_t { while (true) { const rc = system.accept4(sockfd, addr, addr_size, flags); switch (errno(rc)) { - 0 => return @intCast(i32, rc), + 0 => return @intCast(fd_t, rc), EINTR => continue, EAGAIN => if (std.event.Loop.instance) |loop| { - loop.waitUntilFdReadable(sockfd) catch return error.WouldBlock; + loop.waitUntilFdReadable(sockfd); continue; } else { return error.WouldBlock; @@ -1681,12 +1741,12 @@ pub fn accept4( ECONNABORTED => return error.ConnectionAborted, EFAULT => unreachable, EINVAL => unreachable, + ENOTSOCK => unreachable, EMFILE => return error.ProcessFdQuotaExceeded, ENFILE => return error.SystemFdQuotaExceeded, ENOBUFS => return error.SystemResources, ENOMEM => return error.SystemResources, - ENOTSOCK => return error.FileDescriptorNotASocket, - EOPNOTSUPP => return error.OperationNotSupported, + EOPNOTSUPP => unreachable, EPROTO => return error.ProtocolFailure, EPERM => return error.BlockedByFirewall, @@ -1853,26 +1913,31 @@ pub const ConnectError = error{ /// Timeout while attempting connection. The server may be too busy to accept new connections. Note /// that for IP sockets the timeout may be very long when syncookies are enabled on the server. ConnectionTimedOut, + + /// This error occurs when no global event loop is configured, + /// and connecting to the socket would block. + WouldBlock, } || UnexpectedError; /// Initiate a connection on a socket. -/// This is for blocking file descriptors only. -/// For non-blocking, see `connect_async`. -pub fn connect(sockfd: i32, sock_addr: *sockaddr, len: socklen_t) ConnectError!void { +pub fn connect(sockfd: fd_t, sock_addr: sockaddr, len: socklen_t) ConnectError!void { while (true) { - switch (errno(system.connect(sockfd, sock_addr, len))) { + switch (errno(system.connect(sockfd, &sock_addr, len))) { 0 => return, EACCES => return error.PermissionDenied, EPERM => return error.PermissionDenied, EADDRINUSE => return error.AddressInUse, EADDRNOTAVAIL => return error.AddressNotAvailable, EAFNOSUPPORT => return error.AddressFamilyNotSupported, - EAGAIN => return error.SystemResources, + EAGAIN, EINPROGRESS => { + const loop = std.event.Loop.instance orelse return error.WouldBlock; + loop.waitUntilFdWritableOrReadable(sockfd); + return getsockoptError(sockfd); + }, EALREADY => unreachable, // The socket is nonblocking and a previous connection attempt has not yet been completed. EBADF => unreachable, // sockfd is not a valid open file descriptor. ECONNREFUSED => return error.ConnectionRefused, EFAULT => unreachable, // The socket structure address is outside the user's address space. - EINPROGRESS => unreachable, // The socket is nonblocking and the connection cannot be completed immediately. EINTR => continue, EISCONN => unreachable, // The socket is already connected. ENETUNREACH => return error.NetworkUnreachable, @@ -1884,34 +1949,6 @@ pub fn connect(sockfd: i32, sock_addr: *sockaddr, len: socklen_t) ConnectError!v } } -/// Same as `connect` except it is for non-blocking socket file descriptors. -/// It expects to receive EINPROGRESS`. -pub fn connect_async(sockfd: i32, sock_addr: *sockaddr, len: socklen_t) ConnectError!void { - while (true) { - switch (errno(system.connect(sockfd, sock_addr, len))) { - EINVAL => unreachable, - EINTR => continue, - 0, EINPROGRESS => return, - EACCES => return error.PermissionDenied, - EPERM => return error.PermissionDenied, - EADDRINUSE => return error.AddressInUse, - EADDRNOTAVAIL => return error.AddressNotAvailable, - EAFNOSUPPORT => return error.AddressFamilyNotSupported, - EAGAIN => return error.SystemResources, - EALREADY => unreachable, // The socket is nonblocking and a previous connection attempt has not yet been completed. - EBADF => unreachable, // sockfd is not a valid open file descriptor. - ECONNREFUSED => return error.ConnectionRefused, - EFAULT => unreachable, // The socket structure address is outside the user's address space. - EISCONN => unreachable, // The socket is already connected. - ENETUNREACH => return error.NetworkUnreachable, - ENOTSOCK => unreachable, // The file descriptor sockfd does not refer to a socket. - EPROTOTYPE => unreachable, // The socket type does not support the requested communications protocol. - ETIMEDOUT => return error.ConnectionTimedOut, - else => |err| return unexpectedErrno(err), - } - } -} - pub fn getsockoptError(sockfd: i32) ConnectError!void { var err_code: u32 = undefined; var size: u32 = @sizeOf(u32); @@ -2962,7 +2999,7 @@ pub fn sendto( EACCES => return error.AccessDenied, EAGAIN => if (std.event.Loop.instance) |loop| { - loop.waitUntilFdWritable(sockfd) catch return error.WouldBlock; + loop.waitUntilFdWritable(sockfd); continue; } else { return error.WouldBlock; @@ -3065,7 +3102,7 @@ pub fn recvfrom( ENOTSOCK => unreachable, EINTR => continue, EAGAIN => if (std.event.Loop.instance) |loop| { - loop.waitUntilFdReadable(sockfd) catch return error.WouldBlock; + loop.waitUntilFdReadable(sockfd); continue; } else { return error.WouldBlock; diff --git a/lib/std/target.zig b/lib/std/target.zig index 095ba3c8e1..d297312b4e 100644 --- a/lib/std/target.zig +++ b/lib/std/target.zig @@ -205,6 +205,8 @@ pub const Target = union(enum) { }, }; + pub const stack_align = 16; + pub fn zigTriple(self: Target, allocator: *mem.Allocator) ![]u8 { return std.fmt.allocPrint( allocator, diff --git a/src-self-hosted/stage1.zig b/src-self-hosted/stage1.zig index 619fd32dc1..3524c4f5b5 100644 --- a/src-self-hosted/stage1.zig +++ b/src-self-hosted/stage1.zig @@ -128,6 +128,7 @@ export fn stage2_free_clang_errors(errors_ptr: [*]translate_c.ClangErrMsg, error export fn stage2_render_ast(tree: *ast.Tree, output_file: *FILE) Error { const c_out_stream = &std.io.COutStream.init(output_file).stream; _ = std.zig.render(std.heap.c_allocator, c_out_stream, tree) catch |e| switch (e) { + error.WouldBlock => unreachable, // stage1 opens stuff in exclusively blocking mode error.SystemResources => return Error.SystemResources, error.OperationAborted => return Error.OperationAborted, error.BrokenPipe => return Error.BrokenPipe, diff --git a/src/analyze.cpp b/src/analyze.cpp index e9e03466c5..141bd8d191 100644 --- a/src/analyze.cpp +++ b/src/analyze.cpp @@ -4381,6 +4381,10 @@ static Error analyze_callee_async(CodeGen *g, ZigFn *fn, ZigFn *callee, AstNode if (callee->anal_state == FnAnalStateComplete) { analyze_fn_async(g, callee, true); if (callee->anal_state == FnAnalStateInvalid) { + if (g->trace_err != nullptr) { + g->trace_err = add_error_note(g, g->trace_err, call_node, + buf_sprintf("while checking if '%s' is async", buf_ptr(&fn->symbol_name))); + } return ErrorSemanticAnalyzeFail; } callee_is_async = fn_is_async(callee); @@ -7538,7 +7542,9 @@ bool type_is_c_abi_int(CodeGen *g, ZigType *ty) { uint32_t get_host_int_bytes(CodeGen *g, ZigType *struct_type, TypeStructField *field) { assert(struct_type->id == ZigTypeIdStruct); - assert(type_is_resolved(struct_type, ResolveStatusSizeKnown)); + if (struct_type->data.structure.layout != ContainerLayoutAuto) { + assert(type_is_resolved(struct_type, ResolveStatusSizeKnown)); + } if (struct_type->data.structure.host_int_bytes == nullptr) return 0; return struct_type->data.structure.host_int_bytes[field->gen_index]; diff --git a/src/ir.cpp b/src/ir.cpp index 3b3814b3ad..a53367a324 100644 --- a/src/ir.cpp +++ b/src/ir.cpp @@ -17692,7 +17692,8 @@ static IrInstruction *ir_analyze_instruction_elem_ptr(IrAnalyze *ira, IrInstruct { size_t offset = ptr_field->data.x_ptr.data.base_array.elem_index; uint64_t new_index = offset + index; - assert(new_index < ptr_field->data.x_ptr.data.base_array.array_val->type->data.array.len); + ir_assert(new_index < ptr_field->data.x_ptr.data.base_array.array_val->type->data.array.len, + &elem_ptr_instruction->base); out_val->data.x_ptr.special = ConstPtrSpecialBaseArray; out_val->data.x_ptr.data.base_array.array_val = ptr_field->data.x_ptr.data.base_array.array_val; @@ -17854,7 +17855,10 @@ static IrInstruction *ir_analyze_struct_field_ptr(IrAnalyze *ira, IrInstruction case OnePossibleValueNo: break; } - if ((err = type_resolve(ira->codegen, struct_type, ResolveStatusAlignmentKnown))) + ResolveStatus needed_resolve_status = + (struct_type->data.structure.layout == ContainerLayoutAuto) ? + ResolveStatusZeroBitsKnown : ResolveStatusSizeKnown; + if ((err = type_resolve(ira->codegen, struct_type, needed_resolve_status))) return ira->codegen->invalid_instruction; assert(struct_ptr->value.type->id == ZigTypeIdPointer); uint32_t ptr_bit_offset = struct_ptr->value.type->data.pointer.bit_offset_in_host; @@ -17873,6 +17877,9 @@ static IrInstruction *ir_analyze_struct_field_ptr(IrAnalyze *ira, IrInstruction return ira->codegen->invalid_instruction; if (ptr_val->data.x_ptr.special != ConstPtrSpecialHardCodedAddr) { + if ((err = type_resolve(ira->codegen, struct_type, ResolveStatusSizeKnown))) + return ira->codegen->invalid_instruction; + ConstExprValue *struct_val = const_ptr_pointee(ira, ira->codegen, ptr_val, source_instr->source_node); if (struct_val == nullptr) return ira->codegen->invalid_instruction; @@ -17919,7 +17926,7 @@ static IrInstruction *ir_analyze_container_field_ptr(IrAnalyze *ira, Buf *field_ Error err; ZigType *bare_type = container_ref_type(container_type); - if ((err = type_resolve(ira->codegen, bare_type, ResolveStatusSizeKnown))) + if ((err = type_resolve(ira->codegen, bare_type, ResolveStatusZeroBitsKnown))) return ira->codegen->invalid_instruction; assert(container_ptr->value.type->id == ZigTypeIdPointer);