Create ThreadEngine

Now the DBEngine has a ThreadEngine that has a pool. That way it is always
the same pool and I don't create a new one every time I run a parsing
method of the FileEngine
Adrien Bouvais 2024-11-17 15:25:54 +01:00
parent 303dc81e2a
commit 5c8d2b6dc7
3 changed files with 98 additions and 99 deletions
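
In short, the pool now lives in one place: the DBEngine builds a ThreadEngine once and hands its pool to the FileEngine, instead of every parsing method building and tearing down its own std.Thread.Pool. A minimal sketch of the new wiring, using the names from the diff below (exampleWiring is not part of the commit, just an illustration):

const std = @import("std");
const ThreadEngine = @import("threadEngine.zig").ThreadEngine;
const FileEngine = @import("fileEngine.zig").FileEngine;

fn exampleWiring(allocator: std.mem.Allocator, path: []const u8) !void {
    // The ThreadEngine creates the one shared std.Thread.Pool.
    var thread_engine = ThreadEngine.init(allocator);
    defer thread_engine.deinit();

    // The FileEngine now receives that pool instead of creating its own per call.
    var file_engine = try FileEngine.init(allocator, path, thread_engine.thread_pool);
    defer file_engine.deinit();

    // Every parsing method then spawns its work on self.thread_pool,
    // so the same pool is reused across queries.
}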

src/fileEngine.zig

@ -6,6 +6,7 @@ const Pool = std.Thread.Pool;
const Allocator = std.mem.Allocator;
const SchemaEngine = @import("schemaEngine.zig").SchemaEngine;
const SchemaStruct = @import("schemaParser.zig").Parser.SchemaStruct;
const ThreadSyncContext = @import("threadEngine.zig").ThreadSyncContext;
const dtype = @import("dtype");
const s2t = dtype.s2t;
@ -30,40 +31,6 @@ const log = std.log.scoped(.fileEngine);
// TODO: Start using State at the start and end of each function for debugging
const FileEngineState = enum { Parsing, Waiting };
const ThreadSyncContext = struct {
processed_struct: std.atomic.Value(u64) = std.atomic.Value(u64).init(0),
error_file: std.atomic.Value(u64) = std.atomic.Value(u64).init(0),
completed_file: std.atomic.Value(u64) = std.atomic.Value(u64).init(0),
max_struct: u64,
max_file: u64,
fn init(max_struct: u64, max_file: u64) ThreadSyncContext {
return ThreadSyncContext{
.max_struct = max_struct,
.max_file = max_file,
};
}
fn isComplete(self: *ThreadSyncContext) bool {
return (self.completed_file.load(.acquire) + self.error_file.load(.acquire)) >= self.max_file;
}
fn completeThread(self: *ThreadSyncContext) void {
_ = self.completed_file.fetchAdd(1, .release);
}
fn incrementAndCheckStructLimit(self: *ThreadSyncContext) bool {
if (self.max_struct == 0) return false;
const new_count = self.processed_struct.fetchAdd(1, .monotonic);
return new_count >= self.max_struct;
}
fn logError(self: *ThreadSyncContext, message: []const u8, err: anyerror) void {
log.err("{s}: {any}", .{ message, err });
_ = self.error_file.fetchAdd(1, .acquire);
}
};
/// Manage everything that is related to reading or writing files
/// Or even get stats, whatever. If it touches files, it's here
pub const FileEngine = struct {
@ -71,12 +38,14 @@ pub const FileEngine = struct {
state: FileEngineState,
path_to_ZipponDB_dir: []const u8,
schema_engine: SchemaEngine = undefined, // I don't really like that here
thread_pool: *Pool = undefined,
pub fn init(allocator: Allocator, path: []const u8) ZipponError!FileEngine {
pub fn init(allocator: Allocator, path: []const u8, thread_pool: *Pool) ZipponError!FileEngine {
return FileEngine{
.allocator = allocator,
.path_to_ZipponDB_dir = allocator.dupe(u8, path) catch return ZipponError.MemoryError,
.state = .Waiting,
.thread_pool = thread_pool,
};
}
@ -236,17 +205,6 @@ pub const FileEngine = struct {
const dir = try utils.printOpenDir("{s}/DATA/{s}", .{ self.path_to_ZipponDB_dir, sstruct.name }, .{});
// Multi-threading setup
var arena = std.heap.ThreadSafeAllocator{
.child_allocator = self.allocator,
};
var pool: std.Thread.Pool = undefined;
defer pool.deinit();
pool.init(std.Thread.Pool.Options{
.allocator = arena.allocator(),
.n_jobs = CPU_CORE,
}) catch return ZipponError.ThreadError;
var sync_context = ThreadSyncContext.init(
0,
max_file_index + 1,
@ -265,7 +223,7 @@ pub const FileEngine = struct {
// Spawn threads for each file
for (0..(max_file_index + 1)) |file_index| {
pool.spawn(populateFileIndexUUIDMapOneFile, .{
self.thread_pool.spawn(populateFileIndexUUIDMapOneFile, .{
sstruct,
&thread_writer_list[file_index],
file_index,
@ -334,17 +292,6 @@ pub const FileEngine = struct {
const dir = try utils.printOpenDir("{s}/DATA/{s}", .{ self.path_to_ZipponDB_dir, sstruct.name }, .{});
// Multi-threading setup
var arena = std.heap.ThreadSafeAllocator{
.child_allocator = self.allocator,
};
var pool: std.Thread.Pool = undefined;
defer pool.deinit();
pool.init(std.Thread.Pool.Options{
.allocator = arena.allocator(),
.n_jobs = CPU_CORE,
}) catch return ZipponError.ThreadError;
var sync_context = ThreadSyncContext.init(
additional_data.entity_count_to_find,
max_file_index + 1,
@ -363,7 +310,7 @@ pub const FileEngine = struct {
// Spawn threads for each file
for (0..(max_file_index + 1)) |file_index| {
pool.spawn(populateVoidUUIDMapOneFile, .{
self.thread_pool.spawn(populateVoidUUIDMapOneFile, .{
sstruct,
filter,
&thread_writer_list[file_index],
@ -449,17 +396,6 @@ pub const FileEngine = struct {
const dir = try utils.printOpenDir("{s}/DATA/{s}", .{ self.path_to_ZipponDB_dir, sstruct.name }, .{ .access_sub_paths = false });
// Multi thread stuffs
var arena = std.heap.ThreadSafeAllocator{
.child_allocator = self.allocator,
};
var pool: std.Thread.Pool = undefined;
defer pool.deinit();
pool.init(std.Thread.Pool.Options{
.allocator = arena.allocator(),
.n_jobs = CPU_CORE,
}) catch return ZipponError.ThreadError;
var sync_context = ThreadSyncContext.init(
additional_data.entity_count_to_find,
max_file_index + 1,
@ -483,7 +419,7 @@ pub const FileEngine = struct {
for (0..(max_file_index + 1)) |file_index| {
thread_writer_list[file_index] = std.ArrayList(u8).init(allocator);
pool.spawn(parseEntitiesOneFile, .{
self.thread_pool.spawn(parseEntitiesOneFile, .{
thread_writer_list[file_index].writer(),
file_index,
dir,
@ -674,17 +610,6 @@ pub const FileEngine = struct {
const dir = try utils.printOpenDir("{s}/DATA/{s}", .{ self.path_to_ZipponDB_dir, sstruct.name }, .{});
// Multi-threading setup
var arena = std.heap.ThreadSafeAllocator{
.child_allocator = self.allocator,
};
var pool: std.Thread.Pool = undefined;
defer pool.deinit();
pool.init(std.Thread.Pool.Options{
.allocator = arena.allocator(),
.n_jobs = CPU_CORE,
}) catch return ZipponError.ThreadError;
var sync_context = ThreadSyncContext.init(
additional_data.entity_count_to_find,
max_file_index + 1,
@ -718,7 +643,7 @@ pub const FileEngine = struct {
// Spawn threads for each file
for (0..(max_file_index + 1)) |file_index| {
pool.spawn(updateEntitiesOneFile, .{
self.thread_pool.spawn(updateEntitiesOneFile, .{
new_data_buff,
sstruct,
filter,
@ -858,17 +783,6 @@ pub const FileEngine = struct {
const dir = try utils.printOpenDir("{s}/DATA/{s}", .{ self.path_to_ZipponDB_dir, sstruct.name }, .{});
// Multi-threading setup
var arena = std.heap.ThreadSafeAllocator{
.child_allocator = self.allocator,
};
var pool: std.Thread.Pool = undefined;
defer pool.deinit();
pool.init(std.Thread.Pool.Options{
.allocator = arena.allocator(),
.n_jobs = CPU_CORE,
}) catch return ZipponError.ThreadError;
var sync_context = ThreadSyncContext.init(
additional_data.entity_count_to_find,
max_file_index + 1,
@ -887,7 +801,7 @@ pub const FileEngine = struct {
// Spawn threads for each file
for (0..(max_file_index + 1)) |file_index| {
pool.spawn(deleteEntitiesOneFile, .{
self.thread_pool.spawn(deleteEntitiesOneFile, .{
sstruct,
filter,
thread_writer_list[file_index].writer(),

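The same change repeats in all five parsing paths of fileEngine.zig (UUID map population, entity parsing, update, and delete). Since this extracted diff carries no +/- markers, the recurring edit condenses to the before/after below; the argument lists are elided exactly as in the hunks above.

Before, each function set up and tore down its own pool:

    var arena = std.heap.ThreadSafeAllocator{ .child_allocator = self.allocator };
    var pool: std.Thread.Pool = undefined;
    defer pool.deinit();
    pool.init(std.Thread.Pool.Options{
        .allocator = arena.allocator(),
        .n_jobs = CPU_CORE,
    }) catch return ZipponError.ThreadError;
    ...
    pool.spawn(parseEntitiesOneFile, .{ ... });

After, only the spawn call remains, running on the long-lived pool owned by the ThreadEngine:

    self.thread_pool.spawn(parseEntitiesOneFile, .{ ... });
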

@ -2,9 +2,11 @@ const std = @import("std");
const utils = @import("stuffs/utils.zig");
const send = utils.send;
const Allocator = std.mem.Allocator;
const Pool = std.Thread.Pool;
const FileEngine = @import("fileEngine.zig").FileEngine;
const SchemaEngine = @import("schemaEngine.zig").SchemaEngine;
const ThreadEngine = @import("threadEngine.zig").ThreadEngine;
const cliTokenizer = @import("tokenizers/cli.zig").Tokenizer;
const cliToken = @import("tokenizers/cli.zig").Token;
@ -15,8 +17,10 @@ const ziqlParser = @import("ziqlParser.zig").Parser;
const ZipponError = @import("stuffs/errors.zig").ZipponError;
const BUFFER_SIZE = @import("config.zig").BUFFER_SIZE;
const HELP_MESSAGE = @import("config.zig").HELP_MESSAGE;
const config = @import("config.zig");
const BUFFER_SIZE = config.BUFFER_SIZE;
const CPU_CORE = config.CPU_CORE;
const HELP_MESSAGE = config.HELP_MESSAGE;
const State = enum {
expect_main_command,
@ -45,19 +49,22 @@ pub const DBEngine = struct {
state: DBEngineState = .Init,
file_engine: FileEngine = undefined,
schema_engine: SchemaEngine = undefined,
thread_engine: ThreadEngine = undefined,
pub fn init(allocator: std.mem.Allocator, potential_main_path: ?[]const u8, potential_schema_path: ?[]const u8) DBEngine {
var self = DBEngine{ .allocator = allocator };
self.thread_engine = ThreadEngine.init(allocator);
const potential_main_path_or_environment_variable = potential_main_path orelse utils.getEnvVariable(allocator, "ZIPPONDB_PATH");
defer {
log.debug("{s} {any}\n", .{ potential_main_path_or_environment_variable.?, potential_schema_path });
if (potential_main_path_or_environment_variable != null and potential_main_path == null) allocator.free(potential_main_path_or_environment_variable.?);
}
if (potential_main_path_or_environment_variable) |main_path| {
log_path = std.fmt.bufPrint(&log_buff, "{s}/LOG/log", .{main_path}) catch "";
log.info("Found ZIPPONDB_PATH: {s}.", .{main_path});
self.file_engine = FileEngine.init(self.allocator, main_path) catch {
self.file_engine = FileEngine.init(self.allocator, main_path, self.thread_engine.thread_pool) catch {
log.err("Error when init FileEngine", .{});
self.state = .MissingFileEngine;
return self;
@ -123,12 +130,14 @@ pub const DBEngine = struct {
} else {
log.info(HELP_MESSAGE.no_schema, .{self.file_engine.path_to_ZipponDB_dir});
}
return self;
}
pub fn deinit(self: *DBEngine) void {
if (self.state == .Ok or self.state == .MissingSchemaEngine) self.file_engine.deinit(); // Pretty sure I could use something like state > 2 since the enum is just numbers
if (self.state == .Ok) self.schema_engine.deinit();
self.thread_engine.deinit();
}
pub fn runQuery(self: *DBEngine, null_term_query_str: [:0]const u8) void {

src/threadEngine.zig (new file, 76 lines)

@ -0,0 +1,76 @@
// TODO: Put the ThreadSync stuff and create a ThreadEngine with the arena, pool, and some methods
const std = @import("std");
const U64 = std.atomic.Value(u64);
const Pool = std.Thread.Pool;
const Allocator = std.mem.Allocator;
const ZipponError = @import("stuffs/errors.zig").ZipponError;
const CPU_CORE = @import("config.zig").CPU_CORE;
const log = std.log.scoped(.thread);
pub const ThreadSyncContext = struct {
processed_struct: std.atomic.Value(u64) = std.atomic.Value(u64).init(0),
error_file: std.atomic.Value(u64) = std.atomic.Value(u64).init(0),
completed_file: std.atomic.Value(u64) = std.atomic.Value(u64).init(0),
max_struct: u64,
max_file: u64,
pub fn init(max_struct: u64, max_file: u64) ThreadSyncContext {
return ThreadSyncContext{
.max_struct = max_struct,
.max_file = max_file,
};
}
pub fn isComplete(self: *ThreadSyncContext) bool {
return (self.completed_file.load(.acquire) + self.error_file.load(.acquire)) >= self.max_file;
}
pub fn completeThread(self: *ThreadSyncContext) void {
_ = self.completed_file.fetchAdd(1, .release);
}
pub fn incrementAndCheckStructLimit(self: *ThreadSyncContext) bool {
if (self.max_struct == 0) return false;
const new_count = self.processed_struct.fetchAdd(1, .monotonic);
return new_count >= self.max_struct;
}
pub fn logError(self: *ThreadSyncContext, message: []const u8, err: anyerror) void {
log.err("{s}: {any}", .{ message, err });
_ = self.error_file.fetchAdd(1, .acquire);
}
};
pub const ThreadEngine = struct {
allocator: Allocator,
thread_arena: *std.heap.ThreadSafeAllocator = undefined,
thread_pool: *Pool = undefined,
// TODO: Make better error handling
pub fn init(allocator: Allocator) ThreadEngine {
const thread_arena = allocator.create(std.heap.ThreadSafeAllocator) catch @panic("=(");
thread_arena.* = std.heap.ThreadSafeAllocator{
.child_allocator = allocator,
};
const thread_pool = allocator.create(Pool) catch @panic("=(");
thread_pool.*.init(std.Thread.Pool.Options{
.allocator = thread_arena.allocator(),
.n_jobs = CPU_CORE,
}) catch @panic("=(");
return ThreadEngine{
.allocator = allocator,
.thread_pool = thread_pool,
.thread_arena = thread_arena,
};
}
pub fn deinit(self: *ThreadEngine) void {
self.thread_pool.deinit();
self.allocator.destroy(self.thread_pool);
self.allocator.destroy(self.thread_arena);
}
};
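
For context, the *OneFile worker functions spawned from fileEngine.zig are the consumers of ThreadSyncContext. A hypothetical, stripped-down worker showing the intended call pattern (the real populate*/parse* functions take more arguments and do the actual file I/O):

const ThreadSyncContext = @import("threadEngine.zig").ThreadSyncContext;

fn oneFileWorkerSketch(sync_context: *ThreadSyncContext) void {
    // ... open and parse one data file; on failure, report it and bail out:
    //     sync_context.logError("Error parsing file", err); return;

    // For each entity sent back, stop once the requested limit is hit:
    //     if (sync_context.incrementAndCheckStructLimit()) break;

    // Mark this file as done so isComplete() can unblock the caller.
    sync_context.completeThread();
}

A worker should report either completeThread or logError for a given file, not both: isComplete() just sums completed_file and error_file against max_file, so a file counted twice would let the caller move on before the other workers are done.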