mirror of https://github.com/ziglang/zig.git
parent dff7ca6784
commit 6aecc268fd
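In short: `std.event.Loop` drops its `allocator: *mem.Allocator` field and instead owns a `std.heap.ArenaAllocator` for everything that lives exactly as long as the loop (the extra threads of the thread pool and the eventfd resume nodes). Cleanup on both the error path and in `deinit` then collapses into a single `arena.deinit()` call instead of one `free` per allocation.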
@@ -12,15 +12,18 @@ const maxInt = std.math.maxInt;
 const Thread = std.Thread;
 
 pub const Loop = struct {
-    allocator: *mem.Allocator,
     next_tick_queue: std.atomic.Queue(anyframe),
     os_data: OsData,
     final_resume_node: ResumeNode,
     pending_event_count: usize,
     extra_threads: []*Thread,
 
-    // pre-allocated eventfds. all permanently active.
-    // this is how we send promises to be resumed on other threads.
+    /// For resources that have the same lifetime as the `Loop`.
+    /// This is only used by `Loop` for the thread pool and associated resources.
+    arena: std.heap.ArenaAllocator,
+
+    /// Pre-allocated eventfds. All permanently active.
+    /// This is how `Loop` sends promises to be resumed on other threads.
     available_eventfd_resume_nodes: std.atomic.Stack(ResumeNode.EventFd),
     eventfd_resume_nodes: []std.atomic.Stack(ResumeNode.EventFd).Node,
 
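Context for the new `arena` field: `std.heap.ArenaAllocator` wraps a child allocator and frees everything allocated through it in one `deinit()` call, which is why `Loop` no longer needs to keep a `*mem.Allocator` around just to free these slices later. Below is a minimal sketch of the same ownership pattern, written against the allocator-as-field API of this era of the standard library (later Zig exposes it as `arena.allocator()` instead); the `Owner` type and its fields are illustrative, not from the commit:

    const std = @import("std");

    /// Illustrative type that owns all of its allocations through an arena,
    /// mirroring the pattern `Loop` adopts here.
    const Owner = struct {
        arena: std.heap.ArenaAllocator,
        data: []u8,

        fn init(self: *Owner) !void {
            self.* = Owner{
                .arena = std.heap.ArenaAllocator.init(std.heap.page_allocator),
                .data = undefined,
            };
            errdefer self.arena.deinit();
            self.data = try self.arena.allocator.alloc(u8, 128);
        }

        fn deinit(self: *Owner) void {
            // One call frees `data` and anything else the arena handed out.
            self.arena.deinit();
            self.* = undefined;
        }
    };

    test "arena owns the resources" {
        var owner: Owner = undefined;
        try owner.init();
        defer owner.deinit();
        std.debug.assert(owner.data.len == 128);
    }

The trade-off is that individual allocations can no longer be freed early; that fits here because every field above lives until the loop shuts down.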
@@ -127,11 +130,9 @@ pub const Loop = struct {
     /// Thread count is the total thread count. The thread pool size will be
     /// max(thread_count - 1, 0)
     pub fn initThreadPool(self: *Loop, thread_count: usize) !void {
-        // TODO: https://github.com/ziglang/zig/issues/3539
-        const allocator = std.heap.page_allocator;
         self.* = Loop{
+            .arena = std.heap.ArenaAllocator.init(std.heap.page_allocator),
             .pending_event_count = 1,
-            .allocator = allocator,
             .os_data = undefined,
             .next_tick_queue = std.atomic.Queue(anyframe).init(),
             .extra_threads = undefined,
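The TODO and the local `allocator` constant disappear along with the stored field. Note that `ArenaAllocator.init` only records the child allocator; it does not itself allocate, so constructing the arena inside the `Loop` struct literal is free, and memory is first requested from `std.heap.page_allocator` when something is allocated through the arena. A small sketch (test name illustrative, same era-style API as above):

    const std = @import("std");

    test "arena init allocates nothing by itself" {
        // init just stores the child allocator and an empty buffer list.
        var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
        defer arena.deinit();

        // The first allocation is what actually asks the page allocator
        // for memory.
        const ints = try arena.allocator.alloc(u32, 8);
        std.debug.assert(ints.len == 8);
    }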
@@ -143,17 +144,17 @@ pub const Loop = struct {
                 .overlapped = ResumeNode.overlapped_init,
             },
         };
+        errdefer self.arena.deinit();
 
         // We need at least one of these in case the fs thread wants to use onNextTick
         const extra_thread_count = thread_count - 1;
         const resume_node_count = std.math.max(extra_thread_count, 1);
-        self.eventfd_resume_nodes = try self.allocator.alloc(
+        self.eventfd_resume_nodes = try self.arena.allocator.alloc(
             std.atomic.Stack(ResumeNode.EventFd).Node,
             resume_node_count,
         );
-        errdefer self.allocator.free(self.eventfd_resume_nodes);
 
-        self.extra_threads = try self.allocator.alloc(*Thread, extra_thread_count);
-        errdefer self.allocator.free(self.extra_threads);
+        self.extra_threads = try self.arena.allocator.alloc(*Thread, extra_thread_count);
 
         try self.initOsData(extra_thread_count);
         errdefer self.deinitOsData();
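The single `errdefer self.arena.deinit();` placed right after the struct literal replaces the per-allocation `errdefer self.allocator.free(...)` lines: if any later initialization step fails, tearing down the arena reclaims every allocation made through it. A sketch of that error-path behavior; `initMany` and `error.SimulatedFailure` are illustrative:

    const std = @import("std");

    fn initMany(arena: *std.heap.ArenaAllocator, fail: bool) ![][]u8 {
        // No errdefer per allocation is needed: the caller's single
        // arena.deinit() reclaims everything if we return an error.
        const rows = try arena.allocator.alloc([]u8, 4);
        for (rows) |*row| {
            row.* = try arena.allocator.alloc(u8, 64);
        }
        if (fail) return error.SimulatedFailure;
        return rows;
    }

    test "one deinit covers every allocation on the error path" {
        var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
        // Frees all five allocations even though initMany failed.
        defer arena.deinit();

        if (initMany(&arena, true)) |_| {
            unreachable; // we asked initMany to fail
        } else |err| {
            std.debug.assert(err == error.SimulatedFailure);
        }
    }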
@@ -161,7 +162,8 @@ pub const Loop = struct {
 
     pub fn deinit(self: *Loop) void {
         self.deinitOsData();
-        self.allocator.free(self.extra_threads);
+        self.arena.deinit();
+        self.* = undefined;
     }
 
     const InitOsDataError = os.EpollCreateError || mem.Allocator.Error || os.EventFdError ||
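`deinit` now also poisons the struct. Setting `self.* = undefined;` is a defensive idiom independent of the allocator change: in Debug and other safety-checked builds Zig fills memory assigned `undefined` with 0xaa bytes, so accidental use of the `Loop` after `deinit` tends to fail loudly instead of reading stale but plausible-looking data. A tiny illustration with a hypothetical `Counter` type:

    const std = @import("std");

    const Counter = struct {
        count: usize,

        fn deinit(self: *Counter) void {
            // In safety-checked builds this overwrites the struct with
            // 0xaa bytes, turning use-after-deinit into a loud failure.
            self.* = undefined;
        }
    };

    test "poison on deinit" {
        var c = Counter{ .count = 3 };
        std.debug.assert(c.count == 3);
        c.deinit();
        // Reading c.count past this point would be undefined behavior; in
        // safety-checked builds it would likely observe the 0xaa pattern.
    }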
@@ -407,7 +409,6 @@ pub const Loop = struct {
             noasync os.close(self.os_data.final_eventfd);
             while (self.available_eventfd_resume_nodes.pop()) |node| noasync os.close(node.data.eventfd);
             noasync os.close(self.os_data.epollfd);
-            self.allocator.free(self.eventfd_resume_nodes);
         },
         .macosx, .freebsd, .netbsd, .dragonfly => {
             noasync os.close(self.os_data.kqfd);
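With the arena owning these buffers, the Linux teardown path no longer frees `eventfd_resume_nodes` by hand; like `extra_threads`, it is reclaimed by the single `self.arena.deinit()` call in `deinit`.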