mirror of
https://github.com/ziglang/zig.git
synced 2025-12-16 11:13:08 +00:00
std.event.Loop: fix not waking up after file system I/O
for single threaded event loops
This commit is contained in:
parent
3dce41b61a
commit
e24cc2e77b
@ -69,6 +69,7 @@ pub extern "c" fn raise(sig: c_int) c_int;
|
|||||||
pub extern "c" fn read(fd: fd_t, buf: [*]u8, nbyte: usize) isize;
|
pub extern "c" fn read(fd: fd_t, buf: [*]u8, nbyte: usize) isize;
|
||||||
pub extern "c" fn pread(fd: fd_t, buf: [*]u8, nbyte: usize, offset: u64) isize;
|
pub extern "c" fn pread(fd: fd_t, buf: [*]u8, nbyte: usize, offset: u64) isize;
|
||||||
pub extern "c" fn preadv(fd: c_int, iov: [*]const iovec, iovcnt: c_uint, offset: usize) isize;
|
pub extern "c" fn preadv(fd: c_int, iov: [*]const iovec, iovcnt: c_uint, offset: usize) isize;
|
||||||
|
pub extern "c" fn writev(fd: c_int, iov: [*]const iovec_const, iovcnt: c_uint) isize;
|
||||||
pub extern "c" fn pwritev(fd: c_int, iov: [*]const iovec_const, iovcnt: c_uint, offset: usize) isize;
|
pub extern "c" fn pwritev(fd: c_int, iov: [*]const iovec_const, iovcnt: c_uint, offset: usize) isize;
|
||||||
pub extern "c" fn stat(noalias path: [*]const u8, noalias buf: *Stat) c_int;
|
pub extern "c" fn stat(noalias path: [*]const u8, noalias buf: *Stat) c_int;
|
||||||
pub extern "c" fn write(fd: fd_t, buf: [*]const u8, nbyte: usize) isize;
|
pub extern "c" fn write(fd: fd_t, buf: [*]const u8, nbyte: usize) isize;
|
||||||
|
|||||||
@ -149,13 +149,14 @@ pub const Loop = struct {
|
|||||||
.overlapped = ResumeNode.overlapped_init,
|
.overlapped = ResumeNode.overlapped_init,
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
const extra_thread_count = thread_count - 1;
|
// We need an extra one of these in case the fs thread wants to use onNextTick
|
||||||
self.eventfd_resume_nodes = try self.allocator.alloc(
|
self.eventfd_resume_nodes = try self.allocator.alloc(
|
||||||
std.atomic.Stack(ResumeNode.EventFd).Node,
|
std.atomic.Stack(ResumeNode.EventFd).Node,
|
||||||
extra_thread_count,
|
thread_count,
|
||||||
);
|
);
|
||||||
errdefer self.allocator.free(self.eventfd_resume_nodes);
|
errdefer self.allocator.free(self.eventfd_resume_nodes);
|
||||||
|
|
||||||
|
const extra_thread_count = thread_count - 1;
|
||||||
self.extra_threads = try self.allocator.alloc(*Thread, extra_thread_count);
|
self.extra_threads = try self.allocator.alloc(*Thread, extra_thread_count);
|
||||||
errdefer self.allocator.free(self.extra_threads);
|
errdefer self.allocator.free(self.extra_threads);
|
||||||
|
|
||||||
@ -197,7 +198,7 @@ pub const Loop = struct {
|
|||||||
eventfd_node.* = std.atomic.Stack(ResumeNode.EventFd).Node{
|
eventfd_node.* = std.atomic.Stack(ResumeNode.EventFd).Node{
|
||||||
.data = ResumeNode.EventFd{
|
.data = ResumeNode.EventFd{
|
||||||
.base = ResumeNode{
|
.base = ResumeNode{
|
||||||
.id = ResumeNode.Id.EventFd,
|
.id = .EventFd,
|
||||||
.handle = undefined,
|
.handle = undefined,
|
||||||
.overlapped = ResumeNode.overlapped_init,
|
.overlapped = ResumeNode.overlapped_init,
|
||||||
},
|
},
|
||||||
@ -454,12 +455,12 @@ pub const Loop = struct {
|
|||||||
self.finishOneEvent();
|
self.finishOneEvent();
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn linuxWaitFd(self: *Loop, fd: i32, flags: u32) !void {
|
pub fn linuxWaitFd(self: *Loop, fd: i32, flags: u32) !void {
|
||||||
defer self.linuxRemoveFd(fd);
|
defer self.linuxRemoveFd(fd);
|
||||||
suspend {
|
suspend {
|
||||||
var resume_node = ResumeNode.Basic{
|
var resume_node = ResumeNode.Basic{
|
||||||
.base = ResumeNode{
|
.base = ResumeNode{
|
||||||
.id = ResumeNode.Id.Basic,
|
.id = .Basic,
|
||||||
.handle = @frame(),
|
.handle = @frame(),
|
||||||
.overlapped = ResumeNode.overlapped_init,
|
.overlapped = ResumeNode.overlapped_init,
|
||||||
},
|
},
|
||||||
@ -793,8 +794,8 @@ pub const Loop = struct {
|
|||||||
|
|
||||||
fn posixFsRun(self: *Loop) void {
|
fn posixFsRun(self: *Loop) void {
|
||||||
while (true) {
|
while (true) {
|
||||||
if (builtin.os == builtin.Os.linux) {
|
if (builtin.os == .linux) {
|
||||||
_ = @atomicRmw(i32, &self.os_data.fs_queue_item, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
|
_ = @atomicRmw(i32, &self.os_data.fs_queue_item, .Xchg, 0, .SeqCst);
|
||||||
}
|
}
|
||||||
while (self.os_data.fs_queue.get()) |node| {
|
while (self.os_data.fs_queue.get()) |node| {
|
||||||
switch (node.data.msg) {
|
switch (node.data.msg) {
|
||||||
@ -833,14 +834,14 @@ pub const Loop = struct {
|
|||||||
self.finishOneEvent();
|
self.finishOneEvent();
|
||||||
}
|
}
|
||||||
switch (builtin.os) {
|
switch (builtin.os) {
|
||||||
builtin.Os.linux => {
|
.linux => {
|
||||||
const rc = os.linux.futex_wait(&self.os_data.fs_queue_item, os.linux.FUTEX_WAIT, 0, null);
|
const rc = os.linux.futex_wait(&self.os_data.fs_queue_item, os.linux.FUTEX_WAIT, 0, null);
|
||||||
switch (os.linux.getErrno(rc)) {
|
switch (os.linux.getErrno(rc)) {
|
||||||
0, os.EINTR, os.EAGAIN => continue,
|
0, os.EINTR, os.EAGAIN => continue,
|
||||||
else => unreachable,
|
else => unreachable,
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
builtin.Os.macosx, builtin.Os.freebsd, builtin.Os.netbsd => {
|
.macosx, .freebsd, .netbsd => {
|
||||||
const fs_kevs = (*const [1]os.Kevent)(&self.os_data.fs_kevent_wait);
|
const fs_kevs = (*const [1]os.Kevent)(&self.os_data.fs_kevent_wait);
|
||||||
var out_kevs: [1]os.Kevent = undefined;
|
var out_kevs: [1]os.Kevent = undefined;
|
||||||
_ = os.kevent(self.os_data.fs_kqfd, fs_kevs, out_kevs[0..], null) catch unreachable;
|
_ = os.kevent(self.os_data.fs_kqfd, fs_kevs, out_kevs[0..], null) catch unreachable;
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user