From 98f0c69e618960ef7b35728d130312e4bbcc23e1 Mon Sep 17 00:00:00 2001 From: MrBounty Date: Wed, 22 Jan 2025 22:34:46 +0100 Subject: [PATCH] SOLVED THREAD BUG Needed a thread safe alloc. Will need to update other part as I just did parseEntities and deleteEntities --- benchmark.zig | 2 +- build.zig | 4 +- lib/config_benchmark.zig | 2 +- src/dataStructure/UUIDFileIndex.zig | 4 + src/dataStructure/additionalData.zig | 20 +++++ src/file/read.zig | 32 +++----- src/file/utils.zig | 1 + src/file/write.zig | 113 +++++++++------------------ src/thread/context.zig | 9 --- 9 files changed, 75 insertions(+), 112 deletions(-) diff --git a/benchmark.zig b/benchmark.zig index 96ebad5..ae76f38 100644 --- a/benchmark.zig +++ b/benchmark.zig @@ -67,7 +67,7 @@ test "benchmark" { // Maybe I can make it a test to use the testing alloc pub fn benchmark(allocator: std.mem.Allocator) !void { - const to_test = [_]usize{ 5, 50, 500, 5_000, 50_000, 500_000, 5_000_000 }; + const to_test = [_]usize{ 5, 50, 500, 5_000, 50_000, 500_000, 5_000_000, 10_000_000 }; var line_buffer: [1024 * 1024]u8 = undefined; for (to_test) |users_count| { var db_engine = DBEngine.init(allocator, "benchmarkDB", "schema/benchmark"); diff --git a/build.zig b/build.zig index b9c7bae..dd87f79 100644 --- a/build.zig +++ b/build.zig @@ -2,7 +2,7 @@ const std = @import("std"); pub fn build(b: *std.Build) void { const target = b.standardTargetOptions(.{}); - const optimize = b.standardOptimizeOption(.{ .preferred_optimize_mode = .ReleaseFast }); + const optimize = b.standardOptimizeOption(.{ .preferred_optimize_mode = .ReleaseSmall }); // Run // ----------------------------------------------- @@ -168,7 +168,7 @@ pub fn build(b: *std.Build) void { .name = exe_name, .root_source_file = b.path("src/main.zig"), .target = tar, - .optimize = .ReleaseFast, + .optimize = optimize, }); // Add the same imports as your main executable diff --git a/lib/config_benchmark.zig b/lib/config_benchmark.zig index 0c1270c..764bbda 100644 --- a/lib/config_benchmark.zig +++ b/lib/config_benchmark.zig @@ -1,6 +1,6 @@ pub const BUFFER_SIZE = 1024 * 1024; // Used a bit everywhere. The size for the schema for example. 10kB pub const MAX_FILE_SIZE = 1024 * 1024; // 1MB -pub const CPU_CORE = 1; +pub const CPU_CORE = 16; // Debug pub const PRINT_STATE = false; diff --git a/src/dataStructure/UUIDFileIndex.zig b/src/dataStructure/UUIDFileIndex.zig index c8ceaa8..f4eda8e 100644 --- a/src/dataStructure/UUIDFileIndex.zig +++ b/src/dataStructure/UUIDFileIndex.zig @@ -39,6 +39,10 @@ pub fn get(self: UUIDIndexMap, uuid: UUID) ?usize { return self.map.get(uuid); } +pub fn reset(self: *UUIDIndexMap) void { + _ = self.arena.reset(.free_all); +} + test "Create empty UUIDIndexMap" { const allocator = std.testing.allocator; diff --git a/src/dataStructure/additionalData.zig b/src/dataStructure/additionalData.zig index a97e0bd..1738cef 100644 --- a/src/dataStructure/additionalData.zig +++ b/src/dataStructure/additionalData.zig @@ -31,6 +31,18 @@ pub fn addMember(self: *AdditionalData, name: []const u8, index: usize) ZipponEr self.childrens.append(AdditionalDataMember.init(self.allocator, name, index)) catch return ZipponError.MemoryError; } +pub fn clone(self: AdditionalData, allocator: Allocator) ZipponError!AdditionalData { + var new_additional_data = AdditionalData.init(allocator); + + new_additional_data.limit = self.limit; + + for (self.childrens.items) |child| { + new_additional_data.childrens.append(child.clone(allocator) catch return ZipponError.MemoryError) catch return ZipponError.MemoryError; + } + + return new_additional_data; +} + // This is name in: [name] // There is an additional data because it can be [friend [1; name]] pub const AdditionalDataMember = struct { @@ -41,4 +53,12 @@ pub const AdditionalDataMember = struct { pub fn init(allocator: Allocator, name: []const u8, index: usize) AdditionalDataMember { return AdditionalDataMember{ .name = name, .index = index, .additional_data = AdditionalData.init(allocator) }; } + + pub fn clone(self: AdditionalDataMember, allocator: Allocator) ZipponError!AdditionalDataMember { + return AdditionalDataMember{ + .name = allocator.dupe(u8, self.name) catch return ZipponError.MemoryError, + .index = self.index, + .additional_data = self.additional_data.clone(allocator) catch return ZipponError.MemoryError, + }; + } }; diff --git a/src/file/read.zig b/src/file/read.zig index f10b739..2f08a4c 100644 --- a/src/file/read.zig +++ b/src/file/read.zig @@ -169,27 +169,15 @@ fn populateVoidUUIDMapOneFile( defer fa.reset(); const allocator = fa.allocator(); - const path = std.fmt.bufPrint(&path_buffer, "{d}.zid", .{file_index}) catch |err| { - sync_context.logError("Error creating file path", err); - return; - }; + const path = std.fmt.bufPrint(&path_buffer, "{d}.zid", .{file_index}) catch return; - var iter = zid.DataIterator.init(allocator, path, dir, sstruct.zid_schema) catch |err| { - sync_context.logError("Error initializing DataIterator", err); - return; - }; + var iter = zid.DataIterator.init(allocator, path, dir, sstruct.zid_schema) catch return; defer iter.deinit(); - while (iter.next() catch |err| { - sync_context.logError("Error in iter next", err); - return; - }) |row| { + while (iter.next() catch return) |row| { if (sync_context.checkStructLimit()) break; if (filter == null or filter.?.evaluate(row)) { - list.*.append(UUID{ .bytes = row[0].UUID }) catch |err| { - sync_context.logError("Error initializing DataIterator", err); - return; - }; + list.*.append(UUID{ .bytes = row[0].UUID }) catch return; if (sync_context.incrementAndCheckStructLimit()) break; } @@ -207,7 +195,8 @@ pub fn parseEntities( ) ZipponError![]const u8 { var arena = std.heap.ArenaAllocator.init(self.allocator); defer arena.deinit(); - const allocator = arena.allocator(); + var safe_allocator = std.heap.ThreadSafeAllocator{ .child_allocator = self.allocator }; + const allocator = safe_allocator.allocator(); var buff = std.ArrayList(u8).init(entry_allocator); const writer = buff.writer(); @@ -231,7 +220,6 @@ pub fn parseEntities( // Do an array of writer for each thread var thread_writer_list = allocator.alloc(std.ArrayList(u8), to_parse.len) catch return ZipponError.MemoryError; - const data_types = try self.schema_engine.structName2DataType(struct_name); // Start parsing all file in multiple thread var wg: std.Thread.WaitGroup = .{}; @@ -247,7 +235,7 @@ pub fn parseEntities( sstruct.zid_schema, filter, additional_data.*, - data_types, + sstruct.types, &sync_context, }, ); @@ -289,16 +277,17 @@ fn parseEntitiesOneFile( defer fa.reset(); const allocator = fa.allocator(); + var buffered_writer = std.io.bufferedWriter(writer); + const path = std.fmt.bufPrint(&path_buffer, "{d}.zid", .{file_index}) catch return; var iter = zid.DataIterator.init(allocator, path, dir, zid_schema) catch return; while (iter.next() catch return) |row| { - fa.reset(); if (sync_context.checkStructLimit()) return; if (filter) |f| if (!f.evaluate(row)) continue; EntityWriter.writeEntityJSON( - writer, + buffered_writer.writer(), row, additional_data, data_types, @@ -306,6 +295,7 @@ fn parseEntitiesOneFile( if (sync_context.incrementAndCheckStructLimit()) return; } + buffered_writer.flush() catch return; } // Receive a map of UUID -> empty JsonString diff --git a/src/file/utils.zig b/src/file/utils.zig index f5573a6..bcd2af4 100644 --- a/src/file/utils.zig +++ b/src/file/utils.zig @@ -164,6 +164,7 @@ pub fn allFileIndex(self: Self, allocator: Allocator, struct_name: []const u8) Z var iter = dir.iterate(); while (iter.next() catch return ZipponError.DirIterError) |entry| { if (entry.kind != .file) continue; + if (std.mem.eql(u8, entry.name[0..(entry.name.len - 4)], ".new")) continue; // TODO: Delete the file, shouldn't be here const index = std.fmt.parseInt(usize, entry.name[0..(entry.name.len - 4)], 10) catch return ZipponError.InvalidFileIndex; array.append(index) catch return ZipponError.MemoryError; } diff --git a/src/file/write.zig b/src/file/write.zig index bc683cf..9dd613d 100644 --- a/src/file/write.zig +++ b/src/file/write.zig @@ -153,10 +153,7 @@ fn updateEntitiesOneFile( defer fa.reset(); const allocator = fa.allocator(); - var new_data_buff = allocator.alloc(zid.Data, index_switch.len) catch |err| { - sync_context.logError("Cant init new data buff", err); - return; - }; + var new_data_buff = allocator.alloc(zid.Data, index_switch.len) catch return; // First I fill the one that are updated by a const for (index_switch, 0..) |is, i| switch (is) { @@ -232,7 +229,8 @@ pub fn deleteEntities( ) ZipponError!void { var arena = std.heap.ArenaAllocator.init(self.allocator); defer arena.deinit(); - const allocator = arena.allocator(); + var safe_allocator = std.heap.ThreadSafeAllocator{ .child_allocator = self.allocator }; + const allocator = safe_allocator.allocator(); const sstruct = try self.schema_engine.structName2SchemaStruct(struct_name); @@ -244,24 +242,24 @@ pub fn deleteEntities( // Create a thread-safe writer for each file var thread_writer_list = allocator.alloc(std.ArrayList(u8), to_parse.len) catch return ZipponError.MemoryError; - for (thread_writer_list) |*list| { - list.* = std.ArrayList(u8).init(allocator); - } // Spawn threads for each file var wg: std.Thread.WaitGroup = .{}; - for (to_parse, 0..) |file_index, i| self.thread_pool.spawnWg( - &wg, - deleteEntitiesOneFile, - .{ - sstruct, - filter, - thread_writer_list[i].writer(), - file_index, - dir, - &sync_context, - }, - ); + for (to_parse, 0..) |file_index, i| { + thread_writer_list[i] = std.ArrayList(u8).init(allocator); + self.thread_pool.spawnWg( + &wg, + deleteEntitiesOneFile, + .{ + thread_writer_list[i].writer(), + file_index, + dir, + sstruct.zid_schema, + filter, + &sync_context, + }, + ); + } wg.wait(); // Combine results @@ -274,16 +272,16 @@ pub fn deleteEntities( // FIXME: Stop doing that and just remove UUID from the map itself instead of reparsing everything at the end // It's just that I can't do it in deleteEntitiesOneFile itself sstruct.uuid_file_index.map.clearRetainingCapacity(); - _ = sstruct.uuid_file_index.arena.reset(.free_all); + _ = sstruct.uuid_file_index.reset(); try self.populateFileIndexUUIDMap(sstruct, sstruct.uuid_file_index); } fn deleteEntitiesOneFile( - sstruct: SchemaStruct, - filter: ?Filter, writer: anytype, file_index: u64, dir: std.fs.Dir, + zid_schema: []zid.DType, + filter: ?Filter, sync_context: *ThreadSyncContext, ) void { var data_buffer: [config.BUFFER_SIZE]u8 = undefined; @@ -291,77 +289,36 @@ fn deleteEntitiesOneFile( defer fa.reset(); const allocator = fa.allocator(); - const path = std.fmt.allocPrint(allocator, "{d}.zid", .{file_index}) catch |err| { - sync_context.logError("Error creating file path", err); - return; - }; + const path = std.fmt.allocPrint(allocator, "{d}.zid", .{file_index}) catch return; - var iter = zid.DataIterator.init(allocator, path, dir, sstruct.zid_schema) catch |err| { - sync_context.logError("Error initializing DataIterator", err); - return; - }; + var iter = zid.DataIterator.init(allocator, path, dir, zid_schema) catch return; defer iter.deinit(); - const new_path = std.fmt.allocPrint(allocator, "{d}.zid.new", .{file_index}) catch |err| { - sync_context.logError("Error creating file path", err); - return; - }; + const new_path = std.fmt.allocPrint(allocator, "{d}.zid.new", .{file_index}) catch return; + zid.createFile(new_path, dir) catch return; - zid.createFile(new_path, dir) catch |err| { - sync_context.logError("Error creating new file", err); - return; - }; - - var new_writer = zid.DataWriter.init(new_path, dir) catch |err| { - sync_context.logError("Error initializing DataWriter", err); - return; - }; + var new_writer = zid.DataWriter.init(new_path, dir) catch return; errdefer new_writer.deinit(); var finish_writing = false; - while (iter.next() catch |err| { - sync_context.logError("Error during iter", err); - return; - }) |row| { + while (iter.next() catch return) |row| { if (!finish_writing and (filter == null or filter.?.evaluate(row))) { - writer.print("{{\"{s}\"}},", .{UUID.format_bytes(row[0].UUID)}) catch |err| { - sync_context.logError("Error writting", err); - return; - }; - + writer.print("{{\"{s}\"}},", .{UUID.format_bytes(row[0].UUID)}) catch return; finish_writing = sync_context.incrementAndCheckStructLimit(); } else { - new_writer.write(row) catch |err| { - sync_context.logError("Error writing unchanged data", err); - return; - }; + new_writer.write(row) catch return; } } - new_writer.flush() catch |err| { - sync_context.logError("Error flushing new writer", err); - return; - }; + new_writer.flush() catch return; - dir.deleteFile(path) catch |err| { - sync_context.logError("Error deleting old file", err); - return; - }; + dir.deleteFile(path) catch return; - const file_stat = new_writer.fileStat() catch |err| { - sync_context.logError("Error getting new file stat", err); - return; - }; + const file_stat = new_writer.fileStat() catch return; new_writer.deinit(); - if (file_index != 0 and file_stat.size == 0) dir.deleteFile(new_path) catch |err| { - sync_context.logError("Error deleting empty new file", err); - return; + if (file_index != 0 and file_stat.size == 0) { + dir.deleteFile(new_path) catch return; } else { - dir.rename(new_path, path) catch |err| { - sync_context.logError("Error renaming new file", err); - return; - }; + dir.rename(new_path, path) catch return; } - - sync_context.completeThread(); } diff --git a/src/thread/context.zig b/src/thread/context.zig index 62e5c7b..ae922a8 100644 --- a/src/thread/context.zig +++ b/src/thread/context.zig @@ -17,10 +17,6 @@ pub fn init(max_struct: u64) Self { }; } -pub fn completeThread(self: *Self) void { - _ = self.completed_file.fetchAdd(1, .release); -} - pub fn incrementAndCheckStructLimit(self: *Self) bool { if (self.max_struct == 0) return false; const new_count = self.processed_struct.fetchAdd(1, .acquire); @@ -32,8 +28,3 @@ pub fn checkStructLimit(self: *Self) bool { const count = self.processed_struct.load(.acquire); return (count) >= self.max_struct; } - -pub fn logError(self: *Self, message: []const u8, err: anyerror) void { - log.err("{s}: {any}", .{ message, err }); - _ = self.error_file.fetchAdd(1, .acquire); -}