diff --git a/src/fileEngine.zig b/src/fileEngine.zig index a2922f9..9eff8d8 100644 --- a/src/fileEngine.zig +++ b/src/fileEngine.zig @@ -718,45 +718,149 @@ pub const FileEngine = struct { ) ZipponError!void { const sstruct = try self.structName2SchemaStruct(struct_name); const max_file_index = try self.maxFileIndex(sstruct.name); - var total_currently_found: usize = 0; const dir = try utils.printOpenDir("{s}/DATA/{s}", .{ self.path_to_ZipponDB_dir, sstruct.name }, .{}); - writer.writeByte('[') catch return FileEngineError.WriteError; - for (0..(max_file_index + 1)) |file_index| { // TODO: Multi thread that - const path_buff = std.fmt.allocPrint(self.allocator, "{d}.zid", .{file_index}) catch return FileEngineError.MemoryError; - defer self.allocator.free(path_buff); + // Multi-threading setup + var total_entity_deleted = U64.init(0); + var ended_count = U64.init(0); + var error_count = U64.init(0); - var iter = zid.DataIterator.init(self.allocator, path_buff, dir, sstruct.zid_schema) catch return FileEngineError.ZipponDataError; - defer iter.deinit(); + var thread_safe_arena: std.heap.ThreadSafeAllocator = .{ + .child_allocator = self.allocator, + }; + const arena = thread_safe_arena.allocator(); - const new_path_buff = std.fmt.allocPrint(self.allocator, "{d}.zid.new", .{file_index}) catch return FileEngineError.MemoryError; - defer self.allocator.free(new_path_buff); + var thread_pool: Pool = undefined; + thread_pool.init(Pool.Options{ + .allocator = arena, + .n_jobs = CPU_CORE, + }) catch return FileEngineError.ThreadError; + defer thread_pool.deinit(); - zid.createFile(new_path_buff, dir) catch return FileEngineError.ZipponDataError; - var new_writer = zid.DataWriter.init(new_path_buff, dir) catch return FileEngineError.ZipponDataError; - defer new_writer.deinit(); - - blk: while (iter.next() catch return FileEngineError.ZipponDataError) |row| { - if (filter != null) if (filter.?.evaluate(row)) { - writer.writeByte('{') catch return FileEngineError.WriteError; - writer.print("\"{s}\"", .{UUID.format_bytes(row[0].UUID)}) catch return FileEngineError.WriteError; - writer.writeAll("},") catch return FileEngineError.WriteError; - total_currently_found += 1; - if (additional_data.entity_count_to_find != 0 and total_currently_found >= additional_data.entity_count_to_find) break :blk; - } else { - new_writer.write(row) catch return FileEngineError.WriteError; - }; - } - - new_writer.flush() catch return FileEngineError.ZipponDataError; - dir.deleteFile(path_buff) catch return FileEngineError.DeleteFileError; - dir.rename(new_path_buff, path_buff) catch return FileEngineError.RenameFileError; + // Create a thread-safe writer for each file + var thread_writer_list = self.allocator.alloc(std.ArrayList(u8), max_file_index + 1) catch return FileEngineError.MemoryError; + defer { + for (thread_writer_list) |list| list.deinit(); + self.allocator.free(thread_writer_list); } + for (thread_writer_list) |*list| { + list.* = std.ArrayList(u8).init(self.allocator); + } + + // Spawn threads for each file + for (0..(max_file_index + 1)) |file_index| { + thread_pool.spawn(deleteEntitiesOneFile, .{ + self, + struct_name, + filter, + thread_writer_list[file_index].writer(), + additional_data, + file_index, + dir, + &total_entity_deleted, + &ended_count, + &error_count, + }) catch return FileEngineError.ThreadError; + } + + // Wait for all threads to complete + while ((ended_count.load(.acquire) + error_count.load(.acquire)) < max_file_index + 1) { + std.time.sleep(10_000_000); // Check every 10ms + } + + // Combine results + writer.writeByte('[') catch return FileEngineError.WriteError; + for (thread_writer_list) |list| { + writer.writeAll(list.items) catch return FileEngineError.WriteError; + } writer.writeByte(']') catch return FileEngineError.WriteError; } + fn deleteEntitiesOneFile( + file_engine: *FileEngine, + struct_name: []const u8, + filter: ?Filter, + writer: anytype, + additional_data: *AdditionalData, + file_index: u64, + dir: std.fs.Dir, + total_entity_deleted: *U64, + ended_count: *U64, + error_count: *U64, + ) void { + var data_buffer: [BUFFER_SIZE]u8 = undefined; + var fa = std.heap.FixedBufferAllocator.init(&data_buffer); + defer fa.reset(); + const allocator = fa.allocator(); + + const sstruct = file_engine.structName2SchemaStruct(struct_name) catch |err| { + logErrorAndIncrementCount("Error getting schema struct", err, error_count); + return; + }; + + var path_buffer: [128]u8 = undefined; + const path = std.fmt.bufPrint(&path_buffer, "{d}.zid", .{file_index}) catch |err| { + logErrorAndIncrementCount("Error creating file path", err, error_count); + return; + }; + + var iter = zid.DataIterator.init(allocator, path, dir, sstruct.zid_schema) catch |err| { + logErrorAndIncrementCount("Error initializing DataIterator", err, error_count); + return; + }; + defer iter.deinit(); + + const new_path = std.fmt.allocPrint(allocator, "{d}.zid.new", .{file_index}) catch |err| { + logErrorAndIncrementCount("Error creating new file path", err, error_count); + return; + }; + defer allocator.free(new_path); + + zid.createFile(new_path, dir) catch |err| { + logErrorAndIncrementCount("Error creating new file", err, error_count); + return; + }; + + var new_writer = zid.DataWriter.init(new_path, dir) catch |err| { + logErrorAndIncrementCount("Error initializing DataWriter", err, error_count); + return; + }; + defer new_writer.deinit(); + + while (iter.next() catch return) |row| { + if (filter == null or filter.?.evaluate(row)) { + writer.writeByte('{') catch return; + writer.print("\"{s}\"", .{UUID.format_bytes(row[0].UUID)}) catch return; + writer.writeAll("},") catch return; + + if (incrementAndCheckLimit(total_entity_deleted, additional_data.entity_count_to_find)) break; + } else { + new_writer.write(row) catch |err| { + logErrorAndIncrementCount("Error writing unchanged data", err, error_count); + return; + }; + } + } + + new_writer.flush() catch |err| { + logErrorAndIncrementCount("Error flushing new writer", err, error_count); + return; + }; + + dir.deleteFile(path) catch |err| { + logErrorAndIncrementCount("Error deleting old file", err, error_count); + return; + }; + + dir.rename(new_path, path) catch |err| { + logErrorAndIncrementCount("Error renaming new file", err, error_count); + return; + }; + + _ = ended_count.fetchAdd(1, .acquire); + } // --------------------ZipponData utils-------------------- fn string2Data(allocator: Allocator, dt: DataType, value: []const u8) ZipponError!zid.Data {