diff --git a/src/fileEngine.zig b/src/fileEngine.zig
index dfb23f8..ab9d84d 100644
--- a/src/fileEngine.zig
+++ b/src/fileEngine.zig
@@ -30,6 +30,40 @@ const log = std.log.scoped(.fileEngine);
 // TODO: Start using State at the start and end of each function for debugging
 const FileEngineState = enum { Parsing, Waiting };
 
+const ThreadSyncContext = struct {
+    processed_struct: std.atomic.Value(u64) = std.atomic.Value(u64).init(0),
+    error_file: std.atomic.Value(u64) = std.atomic.Value(u64).init(0),
+    completed_file: std.atomic.Value(u64) = std.atomic.Value(u64).init(0),
+    max_struct: u64,
+    max_file: u64,
+
+    fn init(max_struct: u64, max_file: u64) ThreadSyncContext {
+        return ThreadSyncContext{
+            .max_struct = max_struct,
+            .max_file = max_file,
+        };
+    }
+
+    fn isComplete(self: *ThreadSyncContext) bool {
+        return (self.completed_file.load(.acquire) + self.error_file.load(.acquire)) >= self.max_file;
+    }
+
+    fn completeThread(self: *ThreadSyncContext) void {
+        _ = self.completed_file.fetchAdd(1, .release);
+    }
+
+    fn incrementAndCheckStructLimit(self: *ThreadSyncContext) bool {
+        if (self.max_struct == 0) return false;
+        // fetchAdd returns the previous value, so add 1 to get the new count
+        const new_count = self.processed_struct.fetchAdd(1, .monotonic) + 1;
+        return new_count >= self.max_struct;
+    }
+
+    fn logError(self: *ThreadSyncContext, message: []const u8, err: anyerror) void {
+        log.err("{s}: {any}", .{ message, err });
+        _ = self.error_file.fetchAdd(1, .release);
+    }
+};
+
 /// Manage everything related to reading or writing files
 /// Or even get stats, whatever. If it touches files, it's here
 pub const FileEngine = struct {
@@ -190,24 +224,104 @@ pub const FileEngine = struct {
         return count;
     }
 
-    /// Use a struct name to populate a list with all UUID of this struct
-    /// TODO: Use a radix trie or something in that style to keep UUID and file position in memory
-    /// TODO: Multi thread that too
-    pub fn getAllUUIDList(self: *FileEngine, struct_name: []const u8, uuid_list: *std.ArrayList(UUID)) ZipponError!void {
+    /// Use a struct name and filter to populate a map with all UUID bytes as key and void as value
+    pub fn populateUUIDMap(
+        self: *FileEngine,
+        struct_name: []const u8,
+        filter: ?Filter,
+        map: *std.AutoHashMap([16]u8, void),
+        additional_data: *AdditionalData,
+    ) ZipponError!void {
         const sstruct = try self.schema_engine.structName2SchemaStruct(struct_name);
         const max_file_index = try self.maxFileIndex(sstruct.name);
         const dir = try utils.printOpenDir("{s}/DATA/{s}", .{ self.path_to_ZipponDB_dir, sstruct.name }, .{});
 
-        for (0..(max_file_index + 1)) |i| {
-            const path_buff = std.fmt.allocPrint(self.allocator, "{d}.zid", .{i}) catch return FileEngineError.MemoryError;
-            defer self.allocator.free(path_buff);
+        // Multi-threading setup
+        var arena = std.heap.ThreadSafeAllocator{
+            .child_allocator = self.allocator,
+        };
 
-            var iter = zid.DataIterator.init(self.allocator, path_buff, dir, sstruct.zid_schema) catch return FileEngineError.ZipponDataError;
-            defer iter.deinit();
+        var pool: std.Thread.Pool = undefined;
+        pool.init(std.Thread.Pool.Options{
+            .allocator = arena.allocator(),
+            .n_jobs = CPU_CORE,
+        }) catch return ZipponError.ThreadError;
+        defer pool.deinit();
 
-            while (iter.next() catch return FileEngineError.ZipponDataError) |row| uuid_list.append(UUID{ .bytes = row[0].UUID }) catch return FileEngineError.MemoryError;
+        var sync_context = ThreadSyncContext.init(
+            additional_data.entity_count_to_find,
+            max_file_index + 1,
+        );
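+
+        // One job per data file is spawned below; the main thread then polls
+        // isComplete(), which becomes true once every file is counted as
+        // completed or errored.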
+
+        // Create one list of UUIDs per file so worker threads never share a list
+        var thread_writer_list = self.allocator.alloc(std.ArrayList([16]u8), max_file_index + 1) catch return FileEngineError.MemoryError;
+        defer {
+            for (thread_writer_list) |list| list.deinit();
+            self.allocator.free(thread_writer_list);
         }
+
+        for (thread_writer_list) |*list| {
+            list.* = std.ArrayList([16]u8).init(arena.allocator());
+        }
+
+        // Spawn threads for each file
+        for (0..(max_file_index + 1)) |file_index| {
+            pool.spawn(populateUUIDMapOneFile, .{
+                sstruct,
+                filter,
+                &thread_writer_list[file_index],
+                file_index,
+                dir,
+                &sync_context,
+            }) catch return FileEngineError.ThreadError;
+        }
+
+        // Wait for all threads to complete
+        while (!sync_context.isComplete()) {
+            std.time.sleep(10_000_000); // Check every 10ms
+        }
+
+        // Combine results
+        for (thread_writer_list) |list| {
+            for (list.items) |uuid| map.put(uuid, {}) catch return ZipponError.MemoryError;
+        }
+    }
+
+    fn populateUUIDMapOneFile(
+        sstruct: SchemaStruct,
+        filter: ?Filter,
+        list: *std.ArrayList([16]u8),
+        file_index: u64,
+        dir: std.fs.Dir,
+        sync_context: *ThreadSyncContext,
+    ) void {
+        var data_buffer: [BUFFER_SIZE]u8 = undefined;
+        var fa = std.heap.FixedBufferAllocator.init(&data_buffer);
+        defer fa.reset();
+        const allocator = fa.allocator();
+
+        var path_buffer: [128]u8 = undefined;
+        const path = std.fmt.bufPrint(&path_buffer, "{d}.zid", .{file_index}) catch |err| {
+            sync_context.logError("Error creating file path", err);
+            return;
+        };
+
+        var iter = zid.DataIterator.init(allocator, path, dir, sstruct.zid_schema) catch |err| {
+            sync_context.logError("Error initializing DataIterator", err);
+            return;
+        };
+        defer iter.deinit();
+
+        while (iter.next() catch |err| {
+            sync_context.logError("Error in iter next", err);
+            return;
+        }) |row| {
+            if (filter == null or filter.?.evaluate(row)) {
+                list.append(row[0].UUID) catch |err| {
+                    sync_context.logError("Error appending UUID", err);
+                    return;
+                };
+
+                if (sync_context.incrementAndCheckStructLimit()) break;
+            }
+        }
+
+        sync_context.completeThread();
     }
 
     /// Take a filter, parse all files and, for each struct validated by the filter, write it in JSON format to the writer
@@ -233,9 +347,21 @@ pub const FileEngine = struct {
         const dir = try utils.printOpenDir("{s}/DATA/{s}", .{ self.path_to_ZipponDB_dir, sstruct.name }, .{ .access_sub_paths = false });
 
         // Multi thread stuff
-        var total_entity_found: U64 = U64.init(0);
-        var ended_count: U64 = U64.init(0);
-        var error_count: U64 = U64.init(0);
+        var arena = std.heap.ThreadSafeAllocator{
+            .child_allocator = self.allocator,
+        };
+
+        var pool: std.Thread.Pool = undefined;
+        pool.init(std.Thread.Pool.Options{
+            .allocator = arena.allocator(),
+            .n_jobs = CPU_CORE,
+        }) catch return ZipponError.ThreadError;
+        defer pool.deinit();
+
+        var sync_context = ThreadSyncContext.init(
+            additional_data.entity_count_to_find,
+            max_file_index + 1,
+        );
 
         // Use one array and writer per thread, otherwise they create errors by writing at the same time
         // Maybe use a fixed length buffer for speed here
@@ -245,19 +371,6 @@ pub const FileEngine = struct {
             self.allocator.free(thread_writer_list);
         }
 
-        var thread_safe_arena: std.heap.ThreadSafeAllocator = .{
-            .child_allocator = self.allocator,
-        };
-        const arena = thread_safe_arena.allocator();
-
-        // TODO: Put that in the db engine, so I dont need to init the Pool every time
-        var thread_pool: Pool = undefined;
-        thread_pool.init(Pool.Options{
-            .allocator = arena, // this is an arena allocator from `std.heap.ArenaAllocator`
-            .n_jobs = CPU_CORE, // optional, by default the number of CPUs available
-        }) catch return FileEngineError.ThreadError;
-        defer thread_pool.deinit();
-
         // Maybe do one buffer per file?
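+        // All per-thread lists below allocate from this one fixed buffer, so
+        // BUFFER_SIZE caps the combined size of the JSON output across files.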
     var data_buffer: [BUFFER_SIZE]u8 = undefined;
     var fa = std.heap.FixedBufferAllocator.init(&data_buffer);
@@ -268,7 +381,7 @@ pub const FileEngine = struct {
 
         for (0..(max_file_index + 1)) |file_index| {
             thread_writer_list[file_index] = std.ArrayList(u8).init(allocator);
 
-            thread_pool.spawn(parseEntitiesOneFile, .{
+            pool.spawn(parseEntitiesOneFile, .{
                 thread_writer_list[file_index].writer(),
                 file_index,
                 dir,
@@ -276,19 +389,15 @@ pub const FileEngine = struct {
                 filter,
                 additional_data,
                 try self.schema_engine.structName2DataType(struct_name),
-                &total_entity_found,
-                &ended_count,
-                &error_count,
+                &sync_context,
             }) catch return FileEngineError.ThreadError;
         }
 
         // Wait for all thread to either finish or return an error
-        while ((ended_count.load(.acquire) + error_count.load(.acquire)) < max_file_index + 1) {
+        while (!sync_context.isComplete()) {
            std.time.sleep(10_000_000); // Check every 10ms
         }
 
-        if (error_count.load(.acquire) > 0) log.warn("Thread ended with an error {d}", .{error_count.load(.acquire)});
-
         // Append all writer to each other
         writer.writeByte('[') catch return FileEngineError.WriteError;
         for (thread_writer_list) |list| writer.writeAll(list.items) catch return FileEngineError.WriteError;
@@ -303,10 +412,9 @@ pub const FileEngine = struct {
         filter: ?Filter,
         additional_data: *AdditionalData,
         data_types: []const DataType,
-        total_entity_found: *U64,
-        ended_count: *U64,
-        error_count: *U64,
+        sync_context: *ThreadSyncContext,
     ) void {
         var data_buffer: [BUFFER_SIZE]u8 = undefined;
         var fa = std.heap.FixedBufferAllocator.init(&data_buffer);
         defer fa.reset();
@@ -314,30 +422,34 @@ pub const FileEngine = struct {
 
         var path_buffer: [16]u8 = undefined;
         const path = std.fmt.bufPrint(&path_buffer, "{d}.zid", .{file_index}) catch |err| {
-            logErrorAndIncrementCount("Error creating file path", err, error_count);
+            sync_context.logError("Error creating file path", err);
             return;
         };
 
         var iter = zid.DataIterator.init(allocator, path, dir, zid_schema) catch |err| {
-            logErrorAndIncrementCount("Error initializing DataIterator", err, error_count);
+            sync_context.logError("Error initializing DataIterator", err);
             return;
         };
 
         while (iter.next() catch |err| {
-            logErrorAndIncrementCount("Error in iter next", err, error_count);
+            sync_context.logError("Error in iter next", err);
             return;
         }) |row| {
             if (filter) |f| if (!f.evaluate(row)) continue;
 
-            if (writeEntity(writer, row, additional_data, data_types)) |_| {
-                if (incrementAndCheckLimit(total_entity_found, additional_data.entity_count_to_find)) break;
-            } else |err| {
-                logErrorAndIncrementCount("Error writing entity", err, error_count);
+            writeEntity(
+                writer,
+                row,
+                additional_data,
+                data_types,
+            ) catch |err| {
+                sync_context.logError("Error writing entity", err);
                 return;
-            }
+            };
+
+            if (sync_context.incrementAndCheckStructLimit()) break;
         }
 
-        _ = ended_count.fetchAdd(1, .acquire);
+        sync_context.completeThread();
     }
 
     fn writeEntity(
@@ -453,21 +565,21 @@ pub const FileEngine = struct {
         const dir = try utils.printOpenDir("{s}/DATA/{s}", .{ self.path_to_ZipponDB_dir, sstruct.name }, .{});
 
         // Multi-threading setup
-        var total_entity_updated = U64.init(0);
-        var ended_count = U64.init(0);
-        var error_count = U64.init(0);
-
-        var thread_safe_arena: std.heap.ThreadSafeAllocator = .{
+        var arena = std.heap.ThreadSafeAllocator{
             .child_allocator = self.allocator,
         };
-        const arena = thread_safe_arena.allocator();
 
-        var thread_pool: Pool = undefined;
-        thread_pool.init(Pool.Options{
-            .allocator = arena,
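+        // deinit is deferred only after init succeeds; a defer placed before
+        // init would run deinit on an undefined pool if init failed.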
+        var pool: std.Thread.Pool = undefined;
+        pool.init(std.Thread.Pool.Options{
+            .allocator = arena.allocator(),
             .n_jobs = CPU_CORE,
-        }) catch return FileEngineError.ThreadError;
-        defer thread_pool.deinit();
+        }) catch return ZipponError.ThreadError;
+        defer pool.deinit();
+
+        var sync_context = ThreadSyncContext.init(
+            additional_data.entity_count_to_find,
+            max_file_index + 1,
+        );
 
         // Create a thread-safe writer for each file
         var thread_writer_list = self.allocator.alloc(std.ArrayList(u8), max_file_index + 1) catch return FileEngineError.MemoryError;
@@ -480,9 +592,10 @@ pub const FileEngine = struct {
             list.* = std.ArrayList(u8).init(self.allocator);
         }
 
-        var data_arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
-        defer data_arena.deinit();
-        const data_allocator = data_arena.allocator();
+        var data_buffer: [BUFFER_SIZE]u8 = undefined;
+        var fa = std.heap.FixedBufferAllocator.init(&data_buffer);
+        defer fa.reset();
+        const data_allocator = fa.allocator();
 
         var new_data_buff = data_allocator.alloc(zid.Data, sstruct.members.len) catch return ZipponError.MemoryError;
 
@@ -496,23 +609,20 @@ pub const FileEngine = struct {
         // Spawn threads for each file
         for (0..(max_file_index + 1)) |file_index| {
-            thread_pool.spawn(updateEntitiesOneFile, .{
+            pool.spawn(updateEntitiesOneFile, .{
                 new_data_buff,
                 sstruct,
                 filter,
                 &map,
                 thread_writer_list[file_index].writer(),
-                additional_data,
                 file_index,
                 dir,
-                &total_entity_updated,
-                &ended_count,
-                &error_count,
+                &sync_context,
             }) catch return FileEngineError.ThreadError;
         }
 
         // Wait for all threads to complete
-        while ((ended_count.load(.acquire) + error_count.load(.acquire)) < max_file_index + 1) {
+        while (!sync_context.isComplete()) {
             std.time.sleep(10_000_000); // Check every 10ms
         }
 
@@ -530,13 +640,11 @@ pub const FileEngine = struct {
         filter: ?Filter,
         map: *const std.StringHashMap([]const u8),
         writer: anytype,
-        additional_data: *AdditionalData,
         file_index: u64,
         dir: std.fs.Dir,
-        total_entity_updated: *U64,
-        ended_count: *U64,
-        error_count: *U64,
+        sync_context: *ThreadSyncContext,
     ) void {
         var data_buffer: [BUFFER_SIZE]u8 = undefined;
         var fa = std.heap.FixedBufferAllocator.init(&data_buffer);
         defer fa.reset();
@@ -544,34 +652,39 @@ pub const FileEngine = struct {
 
         var path_buffer: [128]u8 = undefined;
         const path = std.fmt.bufPrint(&path_buffer, "{d}.zid", .{file_index}) catch |err| {
-            logErrorAndIncrementCount("Error creating file path", err, error_count);
+            sync_context.logError("Error creating file path", err);
             return;
         };
 
         var iter = zid.DataIterator.init(allocator, path, dir, sstruct.zid_schema) catch |err| {
-            logErrorAndIncrementCount("Error initializing DataIterator", err, error_count);
+            sync_context.logError("Error initializing DataIterator", err);
             return;
         };
         defer iter.deinit();
 
         const new_path = std.fmt.allocPrint(allocator, "{d}.zid.new", .{file_index}) catch |err| {
-            logErrorAndIncrementCount("Error creating new file path", err, error_count);
+            sync_context.logError("Error creating new file path", err);
             return;
         };
         defer allocator.free(new_path);
 
         zid.createFile(new_path, dir) catch |err| {
-            logErrorAndIncrementCount("Error creating new file", err, error_count);
+            sync_context.logError("Error creating new file", err);
             return;
         };
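 
+        // Rows matching the filter are rewritten into {d}.zid.new; at the end
+        // the old file is deleted and the new one renamed over it.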
         var new_writer = zid.DataWriter.init(new_path, dir) catch |err| {
-            logErrorAndIncrementCount("Error initializing DataWriter", err, error_count);
+            sync_context.logError("Error initializing DataWriter", err);
+            zid.deleteFile(new_path, dir) catch {};
             return;
         };
         defer new_writer.deinit();
 
-        while (iter.next() catch return) |row| {
+        while (iter.next() catch |err| {
+            sync_context.logError("Error in iter next", err);
+            zid.deleteFile(new_path, dir) catch {};
+            return;
+        }) |row| {
             if (filter == null or filter.?.evaluate(row)) {
                 // Add the unchanged Data in the new_data_buff
                 new_data_buff[0] = row[0];
@@ -580,40 +693,57 @@ pub const FileEngine = struct {
                     new_data_buff[i] = row[i];
                 }
 
                 new_writer.write(new_data_buff) catch |err| {
-                    logErrorAndIncrementCount("Error writing new data", err, error_count);
+                    sync_context.logError("Error writing new data", err);
+                    zid.deleteFile(new_path, dir) catch {};
                     return;
                 };
 
-                writer.writeByte('{') catch return;
-                writer.print("\"{s}\"", .{UUID.format_bytes(row[0].UUID)}) catch return;
-                writer.writeAll("},") catch return;
-
-                if (incrementAndCheckLimit(total_entity_updated, additional_data.entity_count_to_find)) break;
+                // FIXME: This gets an error and I have no idea why
+                //writer.writeByte('{') catch |err| {
+                //    sync_context.logError("Error writing UUID", err);
+                //    zid.deleteFile(new_path, dir) catch {};
+                //    return;
+                //};
+                //writer.print("\"{s}\"", .{UUID.format_bytes(row[0].UUID)}) catch |err| {
+                //    sync_context.logError("Error writing UUID", err);
+                //    zid.deleteFile(new_path, dir) catch {};
+                //    return;
+                //};
+                //writer.writeAll("},") catch |err| {
+                //    sync_context.logError("Error writing UUID", err);
+                //    zid.deleteFile(new_path, dir) catch {};
+                //    return;
+                //};
+
+                if (sync_context.incrementAndCheckStructLimit()) break;
             } else {
                 new_writer.write(row) catch |err| {
-                    logErrorAndIncrementCount("Error writing unchanged data", err, error_count);
+                    sync_context.logError("Error writing unchanged data", err);
+                    zid.deleteFile(new_path, dir) catch {};
                     return;
                 };
             }
         }
 
         new_writer.flush() catch |err| {
-            logErrorAndIncrementCount("Error flushing new writer", err, error_count);
+            sync_context.logError("Error flushing new writer", err);
+            zid.deleteFile(new_path, dir) catch {};
            return;
         };
 
         dir.deleteFile(path) catch |err| {
-            logErrorAndIncrementCount("Error deleting old file", err, error_count);
+            sync_context.logError("Error deleting old file", err);
             return;
         };
 
         dir.rename(new_path, path) catch |err| {
-            logErrorAndIncrementCount("Error renaming new file", err, error_count);
+            sync_context.logError("Error renaming new file", err);
             return;
         };
 
-        _ = ended_count.fetchAdd(1, .acquire);
+        sync_context.completeThread();
     }
 
     /// Will delete all entities based on the filter. Will also write a JSON-formatted list of all deleted UUIDs into the buffer
@@ -630,21 +760,21 @@ pub const FileEngine = struct {
         const dir = try utils.printOpenDir("{s}/DATA/{s}", .{ self.path_to_ZipponDB_dir, sstruct.name }, .{});
 
         // Multi-threading setup
-        var total_entity_deleted = U64.init(0);
-        var ended_count = U64.init(0);
-        var error_count = U64.init(0);
-
-        var thread_safe_arena: std.heap.ThreadSafeAllocator = .{
+        var arena = std.heap.ThreadSafeAllocator{
             .child_allocator = self.allocator,
         };
-        const arena = thread_safe_arena.allocator();
 
-        var thread_pool: Pool = undefined;
-        thread_pool.init(Pool.Options{
-            .allocator = arena,
+        var pool: std.Thread.Pool = undefined;
+        pool.init(std.Thread.Pool.Options{
+            .allocator = arena.allocator(),
             .n_jobs = CPU_CORE,
-        }) catch return FileEngineError.ThreadError;
-        defer thread_pool.deinit();
+        }) catch return ZipponError.ThreadError;
+        defer pool.deinit();
+
+        var sync_context = ThreadSyncContext.init(
+            additional_data.entity_count_to_find,
+            max_file_index + 1,
+        );
 
         // Create a thread-safe writer for each file
         var thread_writer_list = self.allocator.alloc(std.ArrayList(u8), max_file_index + 1) catch return FileEngineError.MemoryError;
@@ -659,21 +789,18 @@ pub const FileEngine = struct {
 
         // Spawn threads for each file
         for (0..(max_file_index + 1)) |file_index| {
-            thread_pool.spawn(deleteEntitiesOneFile, .{
+            pool.spawn(deleteEntitiesOneFile, .{
                 sstruct,
                 filter,
                 thread_writer_list[file_index].writer(),
-                additional_data,
                 file_index,
                 dir,
-                &total_entity_deleted,
-                &ended_count,
-                &error_count,
+                &sync_context,
             }) catch return FileEngineError.ThreadError;
         }
 
         // Wait for all threads to complete
-        while ((ended_count.load(.acquire) + error_count.load(.acquire)) < max_file_index + 1) {
+        while (!sync_context.isComplete()) {
             std.time.sleep(10_000_000); // Check every 10ms
         }
 
@@ -689,12 +816,9 @@ pub const FileEngine = struct {
         sstruct: SchemaStruct,
         filter: ?Filter,
         writer: anytype,
-        additional_data: *AdditionalData,
         file_index: u64,
         dir: std.fs.Dir,
-        total_entity_deleted: *U64,
-        ended_count: *U64,
-        error_count: *U64,
+        sync_context: *ThreadSyncContext,
     ) void {
         var data_buffer: [BUFFER_SIZE]u8 = undefined;
         var fa = std.heap.FixedBufferAllocator.init(&data_buffer);
@@ -703,29 +827,29 @@ pub const FileEngine = struct {
 
         var path_buffer: [128]u8 = undefined;
         const path = std.fmt.bufPrint(&path_buffer, "{d}.zid", .{file_index}) catch |err| {
-            logErrorAndIncrementCount("Error creating file path", err, error_count);
+            sync_context.logError("Error creating file path", err);
             return;
         };
 
         var iter = zid.DataIterator.init(allocator, path, dir, sstruct.zid_schema) catch |err| {
-            logErrorAndIncrementCount("Error initializing DataIterator", err, error_count);
+            sync_context.logError("Error initializing DataIterator", err);
             return;
         };
         defer iter.deinit();
 
         const new_path = std.fmt.allocPrint(allocator, "{d}.zid.new", .{file_index}) catch |err| {
-            logErrorAndIncrementCount("Error creating new file path", err, error_count);
+            sync_context.logError("Error creating new file path", err);
             return;
         };
         defer allocator.free(new_path);
 
         zid.createFile(new_path, dir) catch |err| {
-            logErrorAndIncrementCount("Error creating new file", err, error_count);
+            sync_context.logError("Error creating new file", err);
             return;
         };
 
         var new_writer = zid.DataWriter.init(new_path, dir) catch |err| {
-            logErrorAndIncrementCount("Error initializing DataWriter", err, error_count);
+            sync_context.logError("Error initializing DataWriter", err);
             return;
         };
         defer new_writer.deinit();
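+
+        // Deletion works by omission: rows matching the filter are not copied
+        // into the new file; everything else is carried over unchanged.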
@@ -736,43 +860,31 @@ pub const FileEngine = struct {
                 writer.print("\"{s}\"", .{UUID.format_bytes(row[0].UUID)}) catch return;
                 writer.writeAll("},") catch return;
 
-                if (incrementAndCheckLimit(total_entity_deleted, additional_data.entity_count_to_find)) break;
+                if (sync_context.incrementAndCheckStructLimit()) break;
             } else {
                 new_writer.write(row) catch |err| {
-                    logErrorAndIncrementCount("Error writing unchanged data", err, error_count);
+                    sync_context.logError("Error writing unchanged data", err);
                     return;
                 };
             }
         }
 
         new_writer.flush() catch |err| {
-            logErrorAndIncrementCount("Error flushing new writer", err, error_count);
+            sync_context.logError("Error flushing new writer", err);
             return;
         };
 
         dir.deleteFile(path) catch |err| {
-            logErrorAndIncrementCount("Error deleting old file", err, error_count);
+            sync_context.logError("Error deleting old file", err);
             return;
         };
 
         dir.rename(new_path, path) catch |err| {
-            logErrorAndIncrementCount("Error renaming new file", err, error_count);
+            sync_context.logError("Error renaming new file", err);
             return;
         };
 
-        _ = ended_count.fetchAdd(1, .acquire);
-    }
-
-    // --------------------Shared multi threading methods--------------------
-
-    fn incrementAndCheckLimit(counter: *U64, limit: u64) bool {
-        const new_count = counter.fetchAdd(1, .monotonic) + 1;
-        return limit != 0 and new_count >= limit;
-    }
-
-    fn logErrorAndIncrementCount(message: []const u8, err: anyerror, error_count: *U64) void {
-        log.err("{s}: {any}", .{ message, err });
-        _ = error_count.fetchAdd(1, .acquire);
+        sync_context.completeThread();
     }
 
     // --------------------ZipponData utils--------------------
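For reference, a minimal sketch of how ThreadSyncContext is meant to be driven, assuming a hypothetical worker processOneFile (not part of the patch; every real worker must end each exit path with completeThread() or logError(), otherwise isComplete() never becomes true and the caller polls forever):

const std = @import("std");

// Hypothetical worker: does the per-file work, then reports completion.
fn processOneFile(file_index: u64, sync_context: *ThreadSyncContext) void {
    _ = file_index; // ... open "{d}.zid", iterate rows, collect results ...
    // On failure: sync_context.logError("...", err); return;
    sync_context.completeThread();
}

fn runAllFiles(pool: *std.Thread.Pool, max_file_index: u64, limit: u64) !void {
    // limit == 0 means "no entity limit", mirroring entity_count_to_find
    var sync_context = ThreadSyncContext.init(limit, max_file_index + 1);
    for (0..(max_file_index + 1)) |file_index| {
        try pool.spawn(processOneFile, .{ file_index, &sync_context });
    }
    while (!sync_context.isComplete()) {
        std.time.sleep(10_000_000); // Check every 10ms, same as the engine
    }
}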