From f05696a924ab7789bf19d6b642e166ad66c70285 Mon Sep 17 00:00:00 2001
From: MrBounty
Date: Wed, 22 Jan 2025 18:24:02 +0100
Subject: [PATCH] Removed max_file from SyncContext to use wait group

Instead of creating a context with the number of files to parse and
checking every x milliseconds whether it is complete, I now use a wait
group. For each file, I spawn a task in the group and then call
wg.wait().
---
 src/file/core.zig      |   1 -
 src/file/read.zig      |  95 +++++++++++--------------------
 src/file/write.zig     | 105 +++++++++++++----------------------
 src/thread/context.zig |   8 +---
 src/thread/engine.zig  |   7 +--
 5 files changed, 65 insertions(+), 151 deletions(-)

diff --git a/src/file/core.zig b/src/file/core.zig
index 20d0e66..2889531 100644
--- a/src/file/core.zig
+++ b/src/file/core.zig
@@ -7,7 +7,6 @@ const ZipponError = @import("error").ZipponError;
 const log = std.log.scoped(.fileEngine);
 
 var path_to_ZipponDB_dir_buffer: [1024]u8 = undefined;
-pub var data_buffer: [config.BUFFER_SIZE]u8 = undefined;
 
 /// Manage everything that is relate to read or write in files
 /// Or even get stats, whatever. If it touch files, it's here
diff --git a/src/file/read.zig b/src/file/read.zig
index 239d590..f10b739 100644
--- a/src/file/read.zig
+++ b/src/file/read.zig
@@ -23,8 +23,6 @@ const log = std.log.scoped(.fileEngine);
 
 const Self = @import("core.zig").Self;
 
-var path_buffer: [1024]u8 = undefined;
-
 /// Use a struct name to populate a list with all UUID of this struct
 /// TODO: Multi thread that too
 pub fn getNumberOfEntityAndFile(self: *Self, struct_name: []const u8) ZipponError!struct { entity: usize, file: usize } {
@@ -49,12 +47,6 @@ pub fn populateFileIndexUUIDMap(
     const dir = try self.printOpenDir("{s}/DATA/{s}", .{ self.path_to_ZipponDB_dir, sstruct.name }, .{});
     const to_parse = try self.allFileIndex(allocator, sstruct.name);
 
-    // Multi-threading setup
-    var sync_context = ThreadSyncContext.init(
-        0,
-        to_parse.len,
-    );
-
     // Create a thread-safe writer for each file
     var thread_writer_list = allocator.alloc(std.ArrayList(UUID), to_parse.len) catch return ZipponError.MemoryError;
     defer {
@@ -67,18 +59,14 @@ pub fn populateFileIndexUUIDMap(
     }
 
     // Spawn threads for each file
-    for (to_parse, 0..) |file_index, i| {
-        self.thread_pool.spawn(populateFileIndexUUIDMapOneFile, .{
-            sstruct,
-            &thread_writer_list[i],
-            file_index,
-            dir,
-            &sync_context,
-        }) catch return ZipponError.ThreadError;
-    }
-
-    // Wait for all threads to complete
-    while (!sync_context.isComplete()) std.time.sleep(10_000_000);
+    var wg: std.Thread.WaitGroup = .{};
+    for (to_parse, 0..) |file_index, i| self.thread_pool.spawnWg(&wg, populateFileIndexUUIDMapOneFile, .{
+        sstruct,
+        &thread_writer_list[i],
+        file_index,
+        dir,
+    });
+    wg.wait();
 
     // Combine results
     for (thread_writer_list, 0..) |list, file_index| {
@@ -91,35 +79,20 @@ fn populateFileIndexUUIDMapOneFile(
     list: *std.ArrayList(UUID),
     file_index: u64,
     dir: std.fs.Dir,
-    sync_context: *ThreadSyncContext,
 ) void {
+    var path_buffer: [1024 * 10]u8 = undefined;
     var data_buffer: [config.BUFFER_SIZE]u8 = undefined;
     var fa = std.heap.FixedBufferAllocator.init(&data_buffer);
     defer fa.reset();
     const allocator = fa.allocator();
 
-    const path = std.fmt.bufPrint(&path_buffer, "{d}.zid", .{file_index}) catch |err| {
-        sync_context.logError("Error creating file path", err);
-        return;
-    };
-
-    var iter = zid.DataIterator.init(allocator, path, dir, sstruct.zid_schema) catch |err| {
-        sync_context.logError("Error initializing DataIterator", err);
-        return;
-    };
+    const path = std.fmt.bufPrint(&path_buffer, "{d}.zid", .{file_index}) catch return;
+    var iter = zid.DataIterator.init(allocator, path, dir, sstruct.zid_schema) catch return;
     defer iter.deinit();
 
-    while (iter.next() catch |err| {
-        sync_context.logError("Error initializing DataIterator", err);
-        return;
-    }) |row| {
-        list.*.append(UUID{ .bytes = row[0].UUID }) catch |err| {
-            sync_context.logError("Error initializing DataIterator", err);
-            return;
-        };
+    while (iter.next() catch return) |row| {
+        list.*.append(UUID{ .bytes = row[0].UUID }) catch return;
     }
-
-    _ = sync_context.completeThread();
 }
 
 /// Use a struct name and filter to populate a map with all UUID bytes as key and void as value
@@ -141,10 +114,7 @@ pub fn populateVoidUUIDMap(
     const to_parse = try self.allFileIndex(allocator, sstruct.name);
 
     // Multi-threading setup
-    var sync_context = ThreadSyncContext.init(
-        additional_data.limit,
-        to_parse.len,
-    );
+    var sync_context = ThreadSyncContext.init(additional_data.limit);
 
     // Create a thread-safe writer for each file
     var thread_writer_list = allocator.alloc(std.ArrayList(UUID), to_parse.len + 1) catch return ZipponError.MemoryError;
@@ -154,19 +124,20 @@ pub fn populateVoidUUIDMap(
     }
 
     // Spawn threads for each file
-    for (to_parse, 0..) |file_index, i| {
-        self.thread_pool.spawn(populateVoidUUIDMapOneFile, .{
+    var wg: std.Thread.WaitGroup = .{};
+    for (to_parse, 0..) |file_index, i| self.thread_pool.spawnWg(
+        &wg,
+        populateVoidUUIDMapOneFile,
+        .{
             sstruct,
             filter,
             &thread_writer_list[i],
             file_index,
             dir,
             &sync_context,
-        }) catch return ZipponError.ThreadError;
-    }
-
-    // Wait for all threads to complete
-    while (!sync_context.isComplete()) std.time.sleep(10_000_000);
+        },
+    );
+    wg.wait();
 
     // Combine results
     for (thread_writer_list) |list| {
@@ -192,6 +163,7 @@ fn populateVoidUUIDMapOneFile(
     dir: std.fs.Dir,
     sync_context: *ThreadSyncContext,
 ) void {
+    var path_buffer: [1024 * 10]u8 = undefined;
     var data_buffer: [config.BUFFER_SIZE]u8 = undefined;
     var fa = std.heap.FixedBufferAllocator.init(&data_buffer);
     defer fa.reset();
@@ -222,8 +194,6 @@ fn populateVoidUUIDMapOneFile(
             if (sync_context.incrementAndCheckStructLimit()) break;
         }
     }
-
-    _ = sync_context.completeThread();
 }
 
 /// Take a filter, parse all file and if one struct if validate by the filter, write it in a JSON format to the writer
@@ -257,23 +227,16 @@ pub fn parseEntities(
     const dir = try self.printOpenDir("{s}/DATA/{s}", .{ self.path_to_ZipponDB_dir, sstruct.name }, .{ .access_sub_paths = false });
 
     // Multi thread stuffs
-    var sync_context = ThreadSyncContext.init(
-        additional_data.limit,
-        to_parse.len,
-    );
+    var sync_context = ThreadSyncContext.init(additional_data.limit);
 
     // Do an array of writer for each thread
-    // Could I create just the number of max cpu ? Because if I have 1000 files, I do 1000 list
-    // But at the end, only the number of use CPU/Thread will use list simultanously
-    // So I could pass list from a thread to another technicly
     var thread_writer_list = allocator.alloc(std.ArrayList(u8), to_parse.len) catch return ZipponError.MemoryError;
     const data_types = try self.schema_engine.structName2DataType(struct_name);
 
     // Start parsing all file in multiple thread
     var wg: std.Thread.WaitGroup = .{};
     for (to_parse, 0..) |file_index, i| {
-        thread_writer_list[file_index] = std.ArrayList(u8).init(allocator);
-
+        thread_writer_list[i] = std.ArrayList(u8).init(allocator);
         self.thread_pool.spawnWg(
             &wg,
             parseEntitiesOneFile,
@@ -320,6 +283,7 @@ fn parseEntitiesOneFile(
     data_types: []const DataType,
     sync_context: *ThreadSyncContext,
 ) void {
+    var path_buffer: [1024 * 10]u8 = undefined;
     var data_buffer: [config.BUFFER_SIZE]u8 = undefined;
     var fa = std.heap.FixedBufferAllocator.init(&data_buffer);
     defer fa.reset();
@@ -389,10 +353,7 @@ pub fn parseEntitiesRelationMap(
     );
 
     // Multi thread stuffs
-    var sync_context = ThreadSyncContext.init(
-        relation_map.additional_data.limit,
-        to_parse.len,
-    );
+    var sync_context = ThreadSyncContext.init(relation_map.additional_data.limit);
 
     // Do one writer for each thread otherwise it create error by writing at the same time
     var thread_map_list = allocator.alloc(
@@ -404,7 +365,6 @@ pub fn parseEntitiesRelationMap(
     var wg: std.Thread.WaitGroup = .{};
     for (to_parse, 0..) |file_index, i| {
         thread_map_list[i] = relation_map.map.cloneWithAllocator(allocator) catch return ZipponError.MemoryError;
-
         self.thread_pool.spawnWg(
             &wg,
             parseEntitiesRelationMapOneFile,
@@ -455,6 +415,7 @@ fn parseEntitiesRelationMapOneFile(
     data_types: []const DataType,
     sync_context: *ThreadSyncContext,
 ) void {
+    var path_buffer: [1024 * 10]u8 = undefined;
     var data_buffer: [config.BUFFER_SIZE]u8 = undefined;
     var fa = std.heap.FixedBufferAllocator.init(&data_buffer);
     defer fa.reset();
diff --git a/src/file/write.zig b/src/file/write.zig
index a5e0b12..bc683cf 100644
--- a/src/file/write.zig
+++ b/src/file/write.zig
@@ -84,10 +84,7 @@ pub fn updateEntities(
     const to_parse = try self.allFileIndex(allocator, struct_name);
 
     // Multi-threading setup
-    var sync_context = ThreadSyncContext.init(
-        additional_data.limit,
-        to_parse.len,
-    );
+    var sync_context = ThreadSyncContext.init(additional_data.limit);
 
     // Create a thread-safe writer for each file
     var thread_writer_list = allocator.alloc(std.ArrayList(u8), to_parse.len) catch return ZipponError.MemoryError;
@@ -115,8 +112,11 @@ pub fn updateEntities(
     }
 
     // Spawn threads for each file
-    for (to_parse, 0..) |file_index, i| {
-        self.thread_pool.spawn(updateEntitiesOneFile, .{
+    var wg: std.Thread.WaitGroup = .{};
+    for (to_parse, 0..) |file_index, i| self.thread_pool.spawnWg(
+        &wg,
+        updateEntitiesOneFile,
+        .{
             sstruct,
             filter,
             map,
@@ -125,11 +125,9 @@ pub fn updateEntities(
             file_index,
             dir,
             &sync_context,
-        }) catch return ZipponError.ThreadError;
-    }
-
-    // Wait for all threads to complete
-    while (!sync_context.isComplete()) std.time.sleep(100_000); // Check every 0.1ms
+        },
+    );
+    wg.wait();
 
     // Combine results
     writer.writeByte('[') catch return ZipponError.WriteError;
@@ -162,55 +160,35 @@ fn updateEntitiesOneFile(
 
     // First I fill the one that are updated by a const
     for (index_switch, 0..) |is, i| switch (is) {
-        .fix => new_data_buff[i] = @import("utils.zig").string2Data(allocator, map.get(sstruct.members[i]).?.value) catch |err| {
-            sync_context.logError("Writting data", err);
-            return;
-        },
+        .fix => new_data_buff[i] = @import("utils.zig").string2Data(allocator, map.get(sstruct.members[i]).?.value) catch return,
         else => {},
     };
 
-    const path = std.fmt.bufPrint(&path_buffer, "{d}.zid", .{file_index}) catch |err| {
-        sync_context.logError("Error creating file path", err);
-        return;
-    };
+    const path = std.fmt.bufPrint(&path_buffer, "{d}.zid", .{file_index}) catch return;
 
-    var iter = zid.DataIterator.init(allocator, path, dir, sstruct.zid_schema) catch |err| {
-        sync_context.logError("Error initializing DataIterator", err);
-        return;
-    };
+    var iter = zid.DataIterator.init(allocator, path, dir, sstruct.zid_schema) catch return;
     defer iter.deinit();
 
-    const new_path = std.fmt.allocPrint(allocator, "{d}.zid.new", .{file_index}) catch |err| {
-        sync_context.logError("Error creating new file path", err);
-        return;
-    };
+    const new_path = std.fmt.allocPrint(allocator, "{d}.zid.new", .{file_index}) catch return;
     defer allocator.free(new_path);
 
-    zid.createFile(new_path, dir) catch |err| {
-        sync_context.logError("Error creating new file", err);
-        return;
-    };
+    zid.createFile(new_path, dir) catch return;
 
-    var new_writer = zid.DataWriter.init(new_path, dir) catch |err| {
-        sync_context.logError("Error initializing DataWriter", err);
+    var new_writer = zid.DataWriter.init(new_path, dir) catch {
         zid.deleteFile(new_path, dir) catch {};
         return;
     };
     defer new_writer.deinit();
 
     var finish_writing = false;
-    while (iter.next() catch |err| {
-        sync_context.logError("Parsing files", err);
-        return;
-    }) |row| {
+    while (iter.next() catch return) |row| {
         if (!finish_writing and (filter == null or filter.?.evaluate(row))) {
             // Add the unchanged Data in the new_data_buff
             for (index_switch, 0..) |is, i| switch (is) {
                 .stay => new_data_buff[i] = row[i],
                 .vari => {
                     const x = map.get(sstruct.members[i]).?.array;
-                    updateData(allocator, x.condition, &row[i], x.data) catch |err| {
-                        sync_context.logError("Error updating data", err);
+                    updateData(allocator, x.condition, &row[i], x.data) catch {
                         zid.deleteFile(new_path, dir) catch {};
                         return;
                     };
@@ -220,45 +198,28 @@ fn updateEntitiesOneFile(
 
             log.debug("{d} {any}\n\n", .{ new_data_buff.len, new_data_buff });
 
-            new_writer.write(new_data_buff) catch |err| {
-                sync_context.logError("Error initializing DataWriter", err);
+            new_writer.write(new_data_buff) catch {
                 zid.deleteFile(new_path, dir) catch {};
                 return;
             };
 
-            writer.print("\"{s}\",", .{UUID.format_bytes(row[0].UUID)}) catch |err| {
-                sync_context.logError("Error initializing DataWriter", err);
+            writer.print("\"{s}\",", .{UUID.format_bytes(row[0].UUID)}) catch {
                 zid.deleteFile(new_path, dir) catch {};
                 return;
             };
 
             finish_writing = sync_context.incrementAndCheckStructLimit();
         } else {
-            new_writer.write(row) catch |err| {
-                sync_context.logError("Error initializing DataWriter", err);
+            new_writer.write(row) catch {
                 zid.deleteFile(new_path, dir) catch {};
                 return;
            };
         }
     }
 
-    new_writer.flush() catch |err| {
-        sync_context.logError("Error initializing DataWriter", err);
-        zid.deleteFile(new_path, dir) catch {};
-        return;
-    };
-
-    dir.deleteFile(path) catch |err| {
-        sync_context.logError("Error deleting old file", err);
-        return;
-    };
-
-    dir.rename(new_path, path) catch |err| {
-        sync_context.logError("Error initializing DataWriter", err);
-        return;
-    };
-
-    _ = sync_context.completeThread();
+    new_writer.flush() catch return;
+    dir.deleteFile(path) catch return;
+    dir.rename(new_path, path) catch return;
 }
 
 /// Delete all entity based on the filter. Will also write a JSON format list of all UUID deleted into the buffer
@@ -279,10 +240,7 @@ pub fn deleteEntities(
     const to_parse = try self.allFileIndex(allocator, struct_name);
 
     // Multi-threading setup
-    var sync_context = ThreadSyncContext.init(
-        additional_data.limit,
-        to_parse.len,
-    );
+    var sync_context = ThreadSyncContext.init(additional_data.limit);
 
     // Create a thread-safe writer for each file
     var thread_writer_list = allocator.alloc(std.ArrayList(u8), to_parse.len) catch return ZipponError.MemoryError;
@@ -291,19 +249,20 @@ pub fn deleteEntities(
     }
 
     // Spawn threads for each file
-    for (to_parse, 0..) |file_index, i| {
-        self.thread_pool.spawn(deleteEntitiesOneFile, .{
+    var wg: std.Thread.WaitGroup = .{};
+    for (to_parse, 0..) |file_index, i| self.thread_pool.spawnWg(
+        &wg,
+        deleteEntitiesOneFile,
+        .{
             sstruct,
             filter,
             thread_writer_list[i].writer(),
             file_index,
             dir,
             &sync_context,
-        }) catch return ZipponError.ThreadError;
-    }
-
-    // Wait for all threads to complete
-    while (!sync_context.isComplete()) std.time.sleep(100_000); // Check every 0.1ms
+        },
+    );
+    wg.wait();
 
     // Combine results
     writer.writeByte('[') catch return ZipponError.WriteError;
diff --git a/src/thread/context.zig b/src/thread/context.zig
index b43b204..62e5c7b 100644
--- a/src/thread/context.zig
+++ b/src/thread/context.zig
@@ -10,19 +10,13 @@ processed_struct: U64 = U64.init(0),
 error_file: U64 = U64.init(0),
 completed_file: U64 = U64.init(0),
 max_struct: u64,
-max_file: u64,
 
-pub fn init(max_struct: u64, max_file: u64) Self {
+pub fn init(max_struct: u64) Self {
     return Self{
         .max_struct = max_struct,
-        .max_file = max_file,
     };
 }
 
-pub fn isComplete(self: *Self) bool {
-    return (self.completed_file.load(.acquire) + self.error_file.load(.acquire)) >= self.max_file;
-}
-
 pub fn completeThread(self: *Self) void {
     _ = self.completed_file.fetchAdd(1, .release);
 }
diff --git a/src/thread/engine.zig b/src/thread/engine.zig
index 9f1b96c..802dfba 100644
--- a/src/thread/engine.zig
+++ b/src/thread/engine.zig
@@ -17,14 +17,15 @@ pub fn init(allocator: std.mem.Allocator) !ThreadEngine {
         .child_allocator = allocator,
     };
 
-    const cpu_core = if (CPU_CORE == 0) try std.Thread.getCpuCount() else CPU_CORE;
+    const cpu_core = if (CPU_CORE == 0) std.Thread.getCpuCount() catch 1 else CPU_CORE;
     log.info("Using {d} cpu core", .{cpu_core});
+    log.info("Using {d}Mb stack size", .{std.Thread.SpawnConfig.default_stack_size / 1024 / 1024});
 
     const thread_pool = try allocator.create(std.Thread.Pool);
-    thread_pool.init(std.Thread.Pool.Options{
+    try thread_pool.init(std.Thread.Pool.Options{
        .allocator = thread_arena.allocator(),
        .n_jobs = cpu_core,
-    }) catch @panic("=(");
+    });
 
     return ThreadEngine{
         .thread_pool = thread_pool,
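
For readers unfamiliar with the pattern this patch switches to, below is a minimal, self-contained sketch of the std.Thread.Pool.spawnWg / WaitGroup.wait usage shown in the hunks above. The worker name parseOneFile, the list element type, and the sizes are hypothetical stand-ins (the real workers are e.g. populateFileIndexUUIDMapOneFile); this is an illustration of the pattern, not code from the repository.

const std = @import("std");

// Hypothetical stand-in for the per-file workers in the patch:
// errors are swallowed with `catch return`, as the patch does.
fn parseOneFile(file_index: u64, list: *std.ArrayList(u64)) void {
    list.append(file_index) catch return;
}

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();
    const allocator = gpa.allocator();

    var pool: std.Thread.Pool = undefined;
    try pool.init(.{ .allocator = allocator, .n_jobs = 4 });
    defer pool.deinit();

    // One result list per file, so workers never write to a shared list.
    var lists: [8]std.ArrayList(u64) = undefined;
    for (&lists) |*l| l.* = std.ArrayList(u64).init(allocator);
    defer {
        for (&lists) |*l| l.deinit();
    }

    // Spawn every task into the wait group, then block once on wait()
    // instead of polling a completion counter in a sleep loop.
    var wg: std.Thread.WaitGroup = .{};
    for (&lists, 0..) |*l, i| pool.spawnWg(&wg, parseOneFile, .{ i, l });
    wg.wait();

    // Combine results, as the callers in the patch do after wg.wait().
    var total: usize = 0;
    for (lists) |l| total += l.items.len;
    std.debug.print("collected {d} entries\n", .{total});
}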