From 5c8d2b6dc7abac80689711dc5207fc1259b6a199 Mon Sep 17 00:00:00 2001
From: MrBounty
Date: Sun, 17 Nov 2024 15:25:54 +0100
Subject: [PATCH] Create ThreadEngine

The DBEngine now has a ThreadEngine that holds the thread pool. This way it is
always the same pool, and I don't create a new one every time I run one of the
parsing methods of the FileEngine.
---
 src/fileEngine.zig   | 104 ++++---------------------------------
 src/main.zig         |  17 +++++--
 src/threadEngine.zig |  76 +++++++++++++++++++++++++++++++
 3 files changed, 98 insertions(+), 99 deletions(-)
 create mode 100644 src/threadEngine.zig

diff --git a/src/fileEngine.zig b/src/fileEngine.zig
index c2bfdee..3c68b60 100644
--- a/src/fileEngine.zig
+++ b/src/fileEngine.zig
@@ -6,6 +6,7 @@ const Pool = std.Thread.Pool;
 const Allocator = std.mem.Allocator;
 const SchemaEngine = @import("schemaEngine.zig").SchemaEngine;
 const SchemaStruct = @import("schemaParser.zig").Parser.SchemaStruct;
+const ThreadSyncContext = @import("threadEngine.zig").ThreadSyncContext;
 
 const dtype = @import("dtype");
 const s2t = dtype.s2t;
@@ -30,40 +31,6 @@ const log = std.log.scoped(.fileEngine);
 // TODO: Start using State at the start and end of each function for debugging
 const FileEngineState = enum { Parsing, Waiting };
 
-const ThreadSyncContext = struct {
-    processed_struct: std.atomic.Value(u64) = std.atomic.Value(u64).init(0),
-    error_file: std.atomic.Value(u64) = std.atomic.Value(u64).init(0),
-    completed_file: std.atomic.Value(u64) = std.atomic.Value(u64).init(0),
-    max_struct: u64,
-    max_file: u64,
-
-    fn init(max_struct: u64, max_file: u64) ThreadSyncContext {
-        return ThreadSyncContext{
-            .max_struct = max_struct,
-            .max_file = max_file,
-        };
-    }
-
-    fn isComplete(self: *ThreadSyncContext) bool {
-        return (self.completed_file.load(.acquire) + self.error_file.load(.acquire)) >= self.max_file;
-    }
-
-    fn completeThread(self: *ThreadSyncContext) void {
-        _ = self.completed_file.fetchAdd(1, .release);
-    }
-
-    fn incrementAndCheckStructLimit(self: *ThreadSyncContext) bool {
-        if (self.max_struct == 0) return false;
-        const new_count = self.processed_struct.fetchAdd(1, .monotonic);
-        return new_count >= self.max_struct;
-    }
-
-    fn logError(self: *ThreadSyncContext, message: []const u8, err: anyerror) void {
-        log.err("{s}: {any}", .{ message, err });
-        _ = self.error_file.fetchAdd(1, .acquire);
-    }
-};
-
 /// Manage everything that is relate to read or write in files
 /// Or even get stats, whatever. If it touch files, it's here
 pub const FileEngine = struct {
@@ -71,12 +38,14 @@
     state: FileEngineState,
     path_to_ZipponDB_dir: []const u8,
     schema_engine: SchemaEngine = undefined, // I dont really like that here
+    thread_pool: *Pool = undefined,
 
-    pub fn init(allocator: Allocator, path: []const u8) ZipponError!FileEngine {
+    pub fn init(allocator: Allocator, path: []const u8, thread_pool: *Pool) ZipponError!FileEngine {
         return FileEngine{
             .allocator = allocator,
             .path_to_ZipponDB_dir = allocator.dupe(u8, path) catch return ZipponError.MemoryError,
             .state = .Waiting,
+            .thread_pool = thread_pool,
         };
     }
 
@@ -236,17 +205,6 @@
         const dir = try utils.printOpenDir("{s}/DATA/{s}", .{ self.path_to_ZipponDB_dir, sstruct.name }, .{});
         // Multi-threading setup
-        var arena = std.heap.ThreadSafeAllocator{
-            .child_allocator = self.allocator,
-        };
-
-        var pool: std.Thread.Pool = undefined;
-        defer pool.deinit();
-        pool.init(std.Thread.Pool.Options{
-            .allocator = arena.allocator(),
-            .n_jobs = CPU_CORE,
-        }) catch return ZipponError.ThreadError;
-
         var sync_context = ThreadSyncContext.init(
             0,
             max_file_index + 1,
         );
@@ -265,7 +223,7 @@
 
         // Spawn threads for each file
         for (0..(max_file_index + 1)) |file_index| {
-            pool.spawn(populateFileIndexUUIDMapOneFile, .{
+            self.thread_pool.spawn(populateFileIndexUUIDMapOneFile, .{
                 sstruct,
                 &thread_writer_list[file_index],
                 file_index,
@@ -334,17 +292,6 @@
         const dir = try utils.printOpenDir("{s}/DATA/{s}", .{ self.path_to_ZipponDB_dir, sstruct.name }, .{});
         // Multi-threading setup
-        var arena = std.heap.ThreadSafeAllocator{
-            .child_allocator = self.allocator,
-        };
-
-        var pool: std.Thread.Pool = undefined;
-        defer pool.deinit();
-        pool.init(std.Thread.Pool.Options{
-            .allocator = arena.allocator(),
-            .n_jobs = CPU_CORE,
-        }) catch return ZipponError.ThreadError;
-
         var sync_context = ThreadSyncContext.init(
             additional_data.entity_count_to_find,
             max_file_index + 1,
         );
@@ -363,7 +310,7 @@
 
         // Spawn threads for each file
         for (0..(max_file_index + 1)) |file_index| {
-            pool.spawn(populateVoidUUIDMapOneFile, .{
+            self.thread_pool.spawn(populateVoidUUIDMapOneFile, .{
                 sstruct,
                 filter,
                 &thread_writer_list[file_index],
@@ -449,17 +396,6 @@
         const dir = try utils.printOpenDir("{s}/DATA/{s}", .{ self.path_to_ZipponDB_dir, sstruct.name }, .{ .access_sub_paths = false });
         // Multi thread stuffs
-        var arena = std.heap.ThreadSafeAllocator{
-            .child_allocator = self.allocator,
-        };
-
-        var pool: std.Thread.Pool = undefined;
-        defer pool.deinit();
-        pool.init(std.Thread.Pool.Options{
-            .allocator = arena.allocator(),
-            .n_jobs = CPU_CORE,
-        }) catch return ZipponError.ThreadError;
-
         var sync_context = ThreadSyncContext.init(
             additional_data.entity_count_to_find,
             max_file_index + 1,
         );
@@ -483,7 +419,7 @@
 
         for (0..(max_file_index + 1)) |file_index| {
             thread_writer_list[file_index] = std.ArrayList(u8).init(allocator);
-            pool.spawn(parseEntitiesOneFile, .{
+            self.thread_pool.spawn(parseEntitiesOneFile, .{
                 thread_writer_list[file_index].writer(),
                 file_index,
                 dir,
@@ -674,17 +610,6 @@
         const dir = try utils.printOpenDir("{s}/DATA/{s}", .{ self.path_to_ZipponDB_dir, sstruct.name }, .{});
         // Multi-threading setup
-        var arena = std.heap.ThreadSafeAllocator{
-            .child_allocator = self.allocator,
-        };
-
-        var pool: std.Thread.Pool = undefined;
-        defer pool.deinit();
-        pool.init(std.Thread.Pool.Options{
-            .allocator = arena.allocator(),
-            .n_jobs = CPU_CORE,
-        }) catch return ZipponError.ThreadError;
-
         var sync_context = ThreadSyncContext.init(
             additional_data.entity_count_to_find,
             max_file_index + 1,
         );
@@ -718,7 +643,7 @@
 
         // Spawn threads for each file
         for (0..(max_file_index + 1)) |file_index| {
-            pool.spawn(updateEntitiesOneFile, .{
+            self.thread_pool.spawn(updateEntitiesOneFile, .{
                 new_data_buff,
                 sstruct,
                 filter,
@@ -858,17 +783,6 @@
         const dir = try utils.printOpenDir("{s}/DATA/{s}", .{ self.path_to_ZipponDB_dir, sstruct.name }, .{});
         // Multi-threading setup
-        var arena = std.heap.ThreadSafeAllocator{
-            .child_allocator = self.allocator,
-        };
-
-        var pool: std.Thread.Pool = undefined;
-        defer pool.deinit();
-        pool.init(std.Thread.Pool.Options{
-            .allocator = arena.allocator(),
-            .n_jobs = CPU_CORE,
-        }) catch return ZipponError.ThreadError;
-
         var sync_context = ThreadSyncContext.init(
             additional_data.entity_count_to_find,
             max_file_index + 1,
         );
@@ -887,7 +801,7 @@
 
         // Spawn threads for each file
        for (0..(max_file_index + 1)) |file_index| {
-            pool.spawn(deleteEntitiesOneFile, .{
+            self.thread_pool.spawn(deleteEntitiesOneFile, .{
                 sstruct,
                 filter,
                 thread_writer_list[file_index].writer(),
diff --git a/src/main.zig b/src/main.zig
index 2051f84..6ee6521 100644
--- a/src/main.zig
+++ b/src/main.zig
@@ -2,9 +2,11 @@ const std = @import("std");
 const utils = @import("stuffs/utils.zig");
 const send = utils.send;
 const Allocator = std.mem.Allocator;
+const Pool = std.Thread.Pool;
 
 const FileEngine = @import("fileEngine.zig").FileEngine;
 const SchemaEngine = @import("schemaEngine.zig").SchemaEngine;
+const ThreadEngine = @import("threadEngine.zig").ThreadEngine;
 
 const cliTokenizer = @import("tokenizers/cli.zig").Tokenizer;
 const cliToken = @import("tokenizers/cli.zig").Token;
@@ -15,8 +17,10 @@ const ziqlParser = @import("ziqlParser.zig").Parser;
 
 const ZipponError = @import("stuffs/errors.zig").ZipponError;
 
-const BUFFER_SIZE = @import("config.zig").BUFFER_SIZE;
-const HELP_MESSAGE = @import("config.zig").HELP_MESSAGE;
+const config = @import("config.zig");
+const BUFFER_SIZE = config.BUFFER_SIZE;
+const CPU_CORE = config.CPU_CORE;
+const HELP_MESSAGE = config.HELP_MESSAGE;
 
 const State = enum {
     expect_main_command,
@@ -45,19 +49,22 @@ pub const DBEngine = struct {
     state: DBEngineState = .Init,
     file_engine: FileEngine = undefined,
     schema_engine: SchemaEngine = undefined,
+    thread_engine: ThreadEngine = undefined,
 
     pub fn init(allocator: std.mem.Allocator, potential_main_path: ?[]const u8, potential_schema_path: ?[]const u8) DBEngine {
         var self = DBEngine{ .allocator = allocator };
+
+        self.thread_engine = ThreadEngine.init(allocator);
+
         const potential_main_path_or_environment_variable = potential_main_path orelse utils.getEnvVariable(allocator, "ZIPPONDB_PATH");
         defer {
-            log.debug("{s} {any}\n", .{ potential_main_path_or_environment_variable.?, potential_schema_path });
             if (potential_main_path_or_environment_variable != null and potential_main_path == null) allocator.free(potential_main_path_or_environment_variable.?);
         }
 
         if (potential_main_path_or_environment_variable) |main_path| {
             log_path = std.fmt.bufPrint(&log_buff, "{s}/LOG/log", .{main_path}) catch "";
             log.info("Found ZIPPONDB_PATH: {s}.", .{main_path});
-            self.file_engine = FileEngine.init(self.allocator, main_path) catch {
+            self.file_engine = FileEngine.init(self.allocator, main_path, self.thread_engine.thread_pool) catch {
                 log.err("Error when init FileEngine", .{});
                 self.state = .MissingFileEngine;
                 return self;
@@ -123,12 +130,14 @@
         } else {
             log.info(HELP_MESSAGE.no_schema, .{self.file_engine.path_to_ZipponDB_dir});
         }
+        return self;
     }
 
     pub fn deinit(self: *DBEngine) void {
         if (self.state == .Ok or self.state == .MissingSchemaEngine) self.file_engine.deinit();
         // Pretty sure I can use like state > 2 because enum of just number
         if (self.state == .Ok) self.schema_engine.deinit();
+        self.thread_engine.deinit();
     }
 
     pub fn runQuery(self: *DBEngine, null_term_query_str: [:0]const u8) void {
diff --git a/src/threadEngine.zig b/src/threadEngine.zig
new file mode 100644
index 0000000..0c9efdb
--- /dev/null
+++ b/src/threadEngine.zig
@@ -0,0 +1,76 @@
+// TODO: Put the ThreadSync stuff and create a ThreadEngine with the arena, pool, and some methods
+
+const std = @import("std");
+const U64 = std.atomic.Value(u64);
+const Pool = std.Thread.Pool;
+const Allocator = std.mem.Allocator;
+
+const ZipponError = @import("stuffs/errors.zig").ZipponError;
+const CPU_CORE = @import("config.zig").CPU_CORE;
+const log = std.log.scoped(.thread);
+
+pub const ThreadSyncContext = struct {
+    processed_struct: std.atomic.Value(u64) = std.atomic.Value(u64).init(0),
+    error_file: std.atomic.Value(u64) = std.atomic.Value(u64).init(0),
+    completed_file: std.atomic.Value(u64) = std.atomic.Value(u64).init(0),
+    max_struct: u64,
+    max_file: u64,
+
+    pub fn init(max_struct: u64, max_file: u64) ThreadSyncContext {
+        return ThreadSyncContext{
+            .max_struct = max_struct,
+            .max_file = max_file,
+        };
+    }
+
+    pub fn isComplete(self: *ThreadSyncContext) bool {
+        return (self.completed_file.load(.acquire) + self.error_file.load(.acquire)) >= self.max_file;
+    }
+
+    pub fn completeThread(self: *ThreadSyncContext) void {
+        _ = self.completed_file.fetchAdd(1, .release);
+    }
+
+    pub fn incrementAndCheckStructLimit(self: *ThreadSyncContext) bool {
+        if (self.max_struct == 0) return false;
+        const new_count = self.processed_struct.fetchAdd(1, .monotonic);
+        return new_count >= self.max_struct;
+    }
+
+    pub fn logError(self: *ThreadSyncContext, message: []const u8, err: anyerror) void {
+        log.err("{s}: {any}", .{ message, err });
+        _ = self.error_file.fetchAdd(1, .acquire);
+    }
+};
+
+pub const ThreadEngine = struct {
+    allocator: Allocator,
+    thread_arena: *std.heap.ThreadSafeAllocator = undefined,
+    thread_pool: *Pool = undefined,
+
+    // TODO: Make better error handling
+    pub fn init(allocator: Allocator) ThreadEngine {
+        const thread_arena = allocator.create(std.heap.ThreadSafeAllocator) catch @panic("=(");
+        thread_arena.* = std.heap.ThreadSafeAllocator{
+            .child_allocator = allocator,
+        };
+
+        const thread_pool = allocator.create(Pool) catch @panic("=(");
+        thread_pool.*.init(std.Thread.Pool.Options{
+            .allocator = thread_arena.allocator(),
+            .n_jobs = CPU_CORE,
+        }) catch @panic("=(");
+
+        return ThreadEngine{
+            .allocator = allocator,
+            .thread_pool = thread_pool,
+            .thread_arena = thread_arena,
+        };
+    }
+
+    pub fn deinit(self: *ThreadEngine) void {
+        self.thread_pool.deinit();
+        self.allocator.destroy(self.thread_pool);
+        self.allocator.destroy(self.thread_arena);
+    }
+};
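
For reference, a minimal usage sketch of the wiring introduced by this patch, not part of the patch itself. DBEngine and runQuery come from the code above; the allocator setup and the query string are illustrative assumptions.

    // Sketch only: DBEngine.init creates one ThreadEngine, FileEngine borrows its *Pool,
    // and every parsing method spawns onto that same pool instead of building a new one per call.
    const std = @import("std");
    const DBEngine = @import("main.zig").DBEngine;

    pub fn main() void {
        var gpa = std.heap.GeneralPurposeAllocator(.{}){};
        defer _ = gpa.deinit();

        // One thread pool lives for the whole lifetime of the DBEngine.
        var db = DBEngine.init(gpa.allocator(), null, null);
        defer db.deinit();

        // Hypothetical query: any file parsing it triggers reuses db.thread_engine.thread_pool.
        db.runQuery("GRAB User {}");
    }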