diff --git a/src/fileEngine.zig b/src/fileEngine.zig
index 135d9f9..a2922f9 100644
--- a/src/fileEngine.zig
+++ b/src/fileEngine.zig
@@ -38,6 +38,8 @@ const CPU_CORE = config.CPU_CORE;
 
 const log = std.log.scoped(.fileEngine);
 
+const FileEngineState = enum { MissingPath, MissingSchema, Ok };
+
 /// Manage everything that is relate to read or write in files
 /// Or even get stats, whatever. If it touch files, it's here
 /// TODO: Keep all struct dir in a haspmap so I dont need to use an allocPrint everytime
@@ -46,14 +48,13 @@ pub const FileEngine = struct {
     path_to_ZipponDB_dir: []const u8,
     null_terminated_schema_buff: [:0]u8,
     struct_array: []SchemaStruct,
+    state: FileEngineState,
 
     pub fn init(allocator: Allocator, path: []const u8) ZipponError!FileEngine {
-        const path_to_ZipponDB_dir = path;
-
        var schema_buf = allocator.alloc(u8, BUFFER_SIZE) catch return FileEngineError.MemoryError; // TODO: Use a list
        defer allocator.free(schema_buf);
 
-        const len: usize = FileEngine.readSchemaFile(path_to_ZipponDB_dir, schema_buf) catch 0;
+        const len: usize = FileEngine.readSchemaFile(path, schema_buf) catch 0;
        const null_terminated_schema_buff = allocator.dupeZ(u8, schema_buf[0..len]) catch return FileEngineError.MemoryError;
        errdefer allocator.free(null_terminated_schema_buff);
@@ -63,15 +64,17 @@ pub const FileEngine = struct {
        var struct_array = std.ArrayList(SchemaStruct).init(allocator);
        parser.parse(&struct_array) catch return FileEngineError.SchemaNotConform;
 
+        var state: FileEngineState = .Ok;
+        if (len == 0) state = .MissingSchema;
+        if (std.mem.eql(u8, "", path)) state = .MissingPath;
+
        return FileEngine{
            .allocator = allocator,
-            .path_to_ZipponDB_dir = path_to_ZipponDB_dir,
+            .path_to_ZipponDB_dir = path,
            .null_terminated_schema_buff = null_terminated_schema_buff,
            .struct_array = struct_array.toOwnedSlice() catch return FileEngineError.MemoryError,
+            .state = state,
        };
-
-        // try file_engine.populateAllUUIDToFileIndexMap();
-
    }
 
    pub fn deinit(self: *FileEngine) void {
@@ -82,34 +85,7 @@ pub const FileEngine = struct {
    }
 
    pub fn usable(self: FileEngine) bool {
-        return !std.mem.eql(u8, "", self.path_to_ZipponDB_dir);
-    }
-
-    // For all struct in shema, add the UUID/index_file into the map
-    pub fn populateAllUUIDToFileIndexMap(self: *FileEngine) ZipponError!void {
-        for (self.struct_array) |*sstruct| { // Stand for schema struct
-            const max_file_index = try self.maxFileIndex(sstruct.name);
-
-            var path_buff = std.fmt.allocPrint(
-                self.allocator,
-                "{s}/DATA/{s}",
-                .{ self.path_to_ZipponDB_dir, sstruct.name },
-            ) catch return FileEngineError.MemoryError;
-            defer self.allocator.free(path_buff);
-
-            const dir = std.fs.cwd().openDir(path_buff, .{}) catch return FileEngineError.CantOpenDir;
-
-            for (0..(max_file_index + 1)) |i| {
-                self.allocator.free(path_buff);
-                path_buff = std.fmt.allocPrint(self.allocator, "{d}.zid", .{i}) catch return FileEngineError.MemoryError;
-
-                var iter = zid.DataIterator.init(self.allocator, path_buff, dir, sstruct.zid_schema) catch return FileEngineError.ZipponDataError;
-                defer iter.deinit();
-                while (iter.next() catch return FileEngineError.ZipponDataError) |row| {
-                    sstruct.uuid_file_index.put(row[0].UUID, i) catch return FileEngineError.MemoryError;
-                }
-            }
-        }
+        return self.state == .Ok;
    }
 
    // --------------------Other--------------------
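// The `usable` rewrite above replaces a string comparison with a cheap enum check
// and, more importantly, records *why* the engine is unusable. A minimal,
// self-contained sketch of that derivation, runnable with `zig test`; the names
// `EngineState` and `deriveState` are illustrative, not ZipponDB identifiers:
const std = @import("std");

const EngineState = enum { MissingPath, MissingSchema, Ok };

fn deriveState(path: []const u8, schema_len: usize) EngineState {
    // Order mirrors the two `if`s in init(): an empty path is the more
    // fundamental failure, so it overrides a missing schema.
    if (path.len == 0) return .MissingPath;
    if (schema_len == 0) return .MissingSchema;
    return .Ok;
}

test "usable() only when both path and schema are present" {
    try std.testing.expectEqual(EngineState.MissingPath, deriveState("", 0));
    try std.testing.expectEqual(EngineState.MissingSchema, deriveState("/db", 0));
    try std.testing.expectEqual(EngineState.Ok, deriveState("/db", 42));
}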
@@ -341,6 +317,14 @@
        var ended_count: U64 = U64.init(0);
        var error_count: U64 = U64.init(0);
 
+        // One array and writer per file, otherwise the threads clash by writing at the same time
+        // Maybe use a fixed-length buffer for speed here
+        var thread_writer_list = self.allocator.alloc(std.ArrayList(u8), max_file_index + 1) catch return FileEngineError.MemoryError;
+        defer {
+            for (thread_writer_list) |list| list.deinit();
+            self.allocator.free(thread_writer_list);
+        }
+
        var thread_safe_arena: std.heap.ThreadSafeAllocator = .{
            .child_allocator = self.allocator,
        };
@@ -354,14 +338,6 @@
        }) catch return FileEngineError.ThreadError;
        defer thread_pool.deinit();
 
-        // Do one array and writer for each thread otherwise then create error by writing at the same time
-        // Maybe use fixed lenght buffer for speed here
-        var thread_writer_list = self.allocator.alloc(std.ArrayList(u8), max_file_index + 1) catch return FileEngineError.MemoryError;
-        defer {
-            for (thread_writer_list) |list| list.deinit();
-            self.allocator.free(thread_writer_list);
-        }
-
        // Maybe do one buffer per files ?
        var data_buffer: [BUFFER_SIZE]u8 = undefined;
        var fa = std.heap.FixedBufferAllocator.init(&data_buffer);
@@ -560,78 +536,176 @@ pub const FileEngine = struct {
    ) ZipponError!void {
        const sstruct = try self.structName2SchemaStruct(struct_name);
        const max_file_index = try self.maxFileIndex(sstruct.name);
-        var total_currently_found: usize = 0;
 
        const dir = try utils.printOpenDir("{s}/DATA/{s}", .{ self.path_to_ZipponDB_dir, sstruct.name }, .{});
 
-        var new_data_buff = self.allocator.alloc(zid.Data, try self.numberOfMemberInStruct(struct_name)) catch return FileEngineError.MemoryError;
-        defer self.allocator.free(new_data_buff);
+        // Multi-threading setup
+        var total_entity_updated = U64.init(0);
+        var ended_count = U64.init(0);
+        var error_count = U64.init(0);
+
+        var thread_safe_arena: std.heap.ThreadSafeAllocator = .{
+            .child_allocator = self.allocator,
+        };
+        const arena = thread_safe_arena.allocator();
+
+        var thread_pool: Pool = undefined;
+        thread_pool.init(Pool.Options{
+            .allocator = arena,
+            .n_jobs = CPU_CORE,
+        }) catch return FileEngineError.ThreadError;
+        defer thread_pool.deinit();
+
+        // Give each file its own output buffer so no two threads ever share a writer
+        var thread_writer_list = self.allocator.alloc(std.ArrayList(u8), max_file_index + 1) catch return FileEngineError.MemoryError;
+        defer {
+            for (thread_writer_list) |list| list.deinit();
+            self.allocator.free(thread_writer_list);
+        }
+
+        for (thread_writer_list) |*list| {
+            list.* = std.ArrayList(u8).init(self.allocator);
+        }
+
+        // Spawn one pool task per file
+        for (0..(max_file_index + 1)) |file_index| {
+            thread_pool.spawn(updateEntitiesOneFile, .{
+                self,
+                struct_name,
+                filter,
+                &map,
+                thread_writer_list[file_index].writer(),
+                additional_data,
+                file_index,
+                dir,
+                &total_entity_updated,
+                &ended_count,
+                &error_count,
+            }) catch return FileEngineError.ThreadError;
+        }
+
+        // Wait for all threads to complete
+        while ((ended_count.load(.acquire) + error_count.load(.acquire)) < max_file_index + 1) {
+            std.time.sleep(10_000_000); // Check every 10ms
+        }
+
+        // Combine results
+        writer.writeByte('[') catch return FileEngineError.WriteError;
+        for (thread_writer_list) |list| {
+            writer.writeAll(list.items) catch return FileEngineError.WriteError;
+        }
+        writer.writeByte(']') catch return FileEngineError.WriteError;
+    }
+
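// updateEntities above fans one pool task out per .zid file: each task owns a
// private std.ArrayList(u8), progress is reported through atomic counters, and
// the parent polls until ended + errored covers every file before concatenating
// the buffers. A runnable, self-contained sketch of that pattern, assuming
// Zig 0.13-era std APIs (std.Thread.Pool, std.atomic.Value); the names `worker`,
// `buffers`, and `done` are illustrative, not ZipponDB code.
const std = @import("std");

fn worker(out: *std.ArrayList(u8), index: usize, done: *std.atomic.Value(u64)) void {
    out.writer().print("{{\"file\":{d}}},", .{index}) catch {};
    // .release publishes the buffer writes to whoever observes the count.
    _ = done.fetchAdd(1, .release);
}

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();
    const allocator = gpa.allocator();

    const n_files = 4;
    var done = std.atomic.Value(u64).init(0);

    var pool: std.Thread.Pool = undefined;
    try pool.init(.{ .allocator = allocator, .n_jobs = 2 });
    defer pool.deinit();

    // One buffer per file, so tasks never write to the same list.
    var buffers = try allocator.alloc(std.ArrayList(u8), n_files);
    defer {
        for (buffers) |list| list.deinit();
        allocator.free(buffers);
    }
    for (buffers) |*list| list.* = std.ArrayList(u8).init(allocator);

    for (buffers, 0..) |*list, i| try pool.spawn(worker, .{ list, i, &done });

    // Same polling join as the diff; std.Thread.WaitGroup would avoid the sleep.
    while (done.load(.acquire) < n_files) std.time.sleep(10 * std.time.ns_per_ms);

    var stdout = std.io.getStdOut().writer();
    try stdout.writeByte('[');
    for (buffers) |list| try stdout.writeAll(list.items);
    try stdout.writeByte(']');
}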
+    fn updateEntitiesOneFile(
+        file_engine: *FileEngine,
+        struct_name: []const u8,
+        filter: ?Filter,
+        map: *const std.StringHashMap([]const u8),
+        writer: anytype,
+        additional_data: *AdditionalData,
+        file_index: u64,
+        dir: std.fs.Dir,
+        total_entity_updated: *U64,
+        ended_count: *U64,
+        error_count: *U64,
+    ) void {
+        var data_buffer: [BUFFER_SIZE]u8 = undefined;
+        var fa = std.heap.FixedBufferAllocator.init(&data_buffer);
+        defer fa.reset();
+        const allocator = fa.allocator();
+
+        const sstruct = file_engine.structName2SchemaStruct(struct_name) catch |err| {
+            logErrorAndIncrementCount("Error getting schema struct", err, error_count);
+            return;
+        };
+
+        var new_data_buff = allocator.alloc(zid.Data, file_engine.numberOfMemberInStruct(struct_name) catch return) catch {
+            logErrorAndIncrementCount("Memory allocation error", error.OutOfMemory, error_count);
+            return;
+        };
+        defer allocator.free(new_data_buff);
 
        // Add the new data
-        for (try self.structName2structMembers(struct_name), 0..) |member, i| {
+        for (file_engine.structName2structMembers(struct_name) catch return, 0..) |member, i| {
            if (!map.contains(member)) continue;
 
-            const dt = try self.memberName2DataType(struct_name, member);
-            new_data_buff[i] = try string2Data(self.allocator, dt, map.get(member).?);
+            const dt = file_engine.memberName2DataType(struct_name, member) catch return;
+            new_data_buff[i] = string2Data(allocator, dt, map.get(member).?) catch return;
        }
 
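// updateEntitiesOneFile gives each worker a stack-backed FixedBufferAllocator for
// its per-file scratch (paths, converted values), so workers never contend on the
// shared allocator and everything is reclaimed when the frame returns. The
// `defer fa.reset()` above is harmless but redundant: the backing buffer dies
// with the stack frame anyway. A minimal sketch of the pattern, assuming only
// the std library; `BUFFER_SIZE` and `scratchDemo` are stand-ins:
const std = @import("std");

const BUFFER_SIZE = 4096;

fn scratchDemo(file_index: u64) !void {
    var data_buffer: [BUFFER_SIZE]u8 = undefined;
    var fa = std.heap.FixedBufferAllocator.init(&data_buffer);
    const allocator = fa.allocator();

    // Allocations come out of the stack buffer; failure is OutOfMemory,
    // meaning the scratch space was exceeded — no heap, no locks.
    const path = try std.fmt.allocPrint(allocator, "{d}.zid.new", .{file_index});
    defer allocator.free(path);
    try std.testing.expectEqualStrings("7.zid.new", path);
}

test "per-thread scratch arena" {
    try scratchDemo(7);
}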
-        writer.writeByte('[') catch return FileEngineError.WriteError;
-        for (0..(max_file_index + 1)) |file_index| { // TODO: Multi thread that
-            if (additional_data.entity_count_to_find != 0 and total_currently_found >= additional_data.entity_count_to_find) break;
+        var path_buffer: [128]u8 = undefined;
+        const path = std.fmt.bufPrint(&path_buffer, "{d}.zid", .{file_index}) catch |err| {
+            logErrorAndIncrementCount("Error creating file path", err, error_count);
+            return;
+        };
 
-            const path_buff = std.fmt.allocPrint(self.allocator, "{d}.zid", .{file_index}) catch return FileEngineError.MemoryError;
-            defer self.allocator.free(path_buff);
+        var iter = zid.DataIterator.init(allocator, path, dir, sstruct.zid_schema) catch |err| {
+            logErrorAndIncrementCount("Error initializing DataIterator", err, error_count);
+            return;
+        };
+        defer iter.deinit();
 
-            var iter = zid.DataIterator.init(self.allocator, path_buff, dir, sstruct.zid_schema) catch return FileEngineError.ZipponDataError;
-            defer iter.deinit();
+        const new_path = std.fmt.allocPrint(allocator, "{d}.zid.new", .{file_index}) catch |err| {
+            logErrorAndIncrementCount("Error creating new file path", err, error_count);
+            return;
+        };
+        defer allocator.free(new_path);
 
-            const new_path_buff = std.fmt.allocPrint(self.allocator, "{d}.zid.new", .{file_index}) catch return FileEngineError.MemoryError;
-            defer self.allocator.free(new_path_buff);
+        zid.createFile(new_path, dir) catch |err| {
+            logErrorAndIncrementCount("Error creating new file", err, error_count);
+            return;
+        };
 
-            zid.createFile(new_path_buff, dir) catch return FileEngineError.ZipponDataError;
-            var new_writer = zid.DataWriter.init(new_path_buff, dir) catch return FileEngineError.ZipponDataError;
-            defer new_writer.deinit();
+        var new_writer = zid.DataWriter.init(new_path, dir) catch |err| {
+            logErrorAndIncrementCount("Error initializing DataWriter", err, error_count);
+            return;
+        };
+        defer new_writer.deinit();
 
-            while (iter.next() catch return FileEngineError.ZipponDataError) |row| {
-                if (filter == null or filter.?.evaluate(row)) {
-                    // Add the unchanged Data in the new_data_buff
-                    new_data_buff[0] = row[0];
-                    for (try self.structName2structMembers(struct_name), 0..) |member, i| {
-                        if (map.contains(member)) continue;
-                        new_data_buff[i] = row[i];
-                    }
-
-                    new_writer.write(new_data_buff) catch return FileEngineError.WriteError;
-                    writer.writeByte('{') catch return FileEngineError.WriteError;
-                    writer.print("\"{s}\"", .{UUID.format_bytes(row[0].UUID)}) catch return FileEngineError.WriteError;
-                    writer.writeAll("},") catch return FileEngineError.WriteError;
-                    total_currently_found += 1;
-                } else {
-                    new_writer.write(row) catch return FileEngineError.WriteError;
+        while (iter.next() catch return) |row| {
+            if (filter == null or filter.?.evaluate(row)) {
+                // Add the unchanged Data in the new_data_buff
+                new_data_buff[0] = row[0];
+                for (file_engine.structName2structMembers(struct_name) catch return, 0..) |member, i| {
+                    if (map.contains(member)) continue;
+                    new_data_buff[i] = row[i];
                }
 
-                if (additional_data.entity_count_to_find != 0 and total_currently_found >= additional_data.entity_count_to_find) break;
-            }
+                new_writer.write(new_data_buff) catch |err| {
+                    logErrorAndIncrementCount("Error writing new data", err, error_count);
+                    return;
+                };
 
-            writer.writeByte(']') catch return FileEngineError.WriteError;
-            new_writer.flush() catch return FileEngineError.ZipponDataError;
-            dir.deleteFile(path_buff) catch return FileEngineError.DeleteFileError;
-            dir.rename(new_path_buff, path_buff) catch return FileEngineError.RenameFileError;
-        }
+                writer.writeByte('{') catch return;
+                writer.print("\"{s}\"", .{UUID.format_bytes(row[0].UUID)}) catch return;
+                writer.writeAll("},") catch return;
 
-        for (try self.structName2structMembers(struct_name), 1..) |member, i| {
-            if (!map.contains(member)) continue;
-
-            switch (new_data_buff[i]) {
-                .IntArray => self.allocator.free(new_data_buff[i].IntArray),
-                .FloatArray => self.allocator.free(new_data_buff[i].FloatArray),
-                .UnixArray => self.allocator.free(new_data_buff[i].UnixArray),
-                .BoolArray => self.allocator.free(new_data_buff[i].BoolArray),
-                .StrArray => self.allocator.free(new_data_buff[i].StrArray),
-                .UUIDArray => self.allocator.free(new_data_buff[i].UUIDArray),
-                else => continue,
+                if (incrementAndCheckLimit(total_entity_updated, additional_data.entity_count_to_find)) break;
+            } else {
+                new_writer.write(row) catch |err| {
+                    logErrorAndIncrementCount("Error writing unchanged data", err, error_count);
+                    return;
+                };
            }
        }
+
+        new_writer.flush() catch |err| {
+            logErrorAndIncrementCount("Error flushing new writer", err, error_count);
+            return;
+        };
+
+        dir.deleteFile(path) catch |err| {
+            logErrorAndIncrementCount("Error deleting old file", err, error_count);
+            return;
+        };
+
+        dir.rename(new_path, path) catch |err| {
+            logErrorAndIncrementCount("Error renaming new file", err, error_count);
+            return;
+        };
+
+        _ = ended_count.fetchAdd(1, .release);
    }
 
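// The worker relies on two helpers whose bodies are not part of this diff:
// incrementAndCheckLimit and logErrorAndIncrementCount. The sketches below are
// guesses consistent with the call sites, not ZipponDB code. Note the ordering
// discipline: increments that must publish earlier writes use .release so they
// pair with the parent's .acquire loads (which is also why the worker's final
// fetchAdd above uses .release). Also note that the worker's bare `catch return`s
// bypass both counters, so the parent's polling loop would wait forever on such
// an exit; routing those returns through logErrorAndIncrementCount would make
// the join robust.
const std = @import("std");
const U64 = std.atomic.Value(u64);
const log = std.log.scoped(.fileEngine);

/// Bump the number of updated entities; return true once the optional
/// limit (0 meaning "no limit") has been reached, so the caller can break.
fn incrementAndCheckLimit(counter: *U64, limit: u64) bool {
    // A monotonic RMW is enough here: the tally is only read after the join.
    const new = counter.fetchAdd(1, .monotonic) + 1;
    return limit != 0 and new >= limit;
}

/// Log the failure and count it, so the parent's wait loop
/// (ended_count + error_count == file count) still terminates.
fn logErrorAndIncrementCount(message: []const u8, err: anyerror, count: *U64) void {
    log.err("{s}: {s}", .{ message, @errorName(err) });
    _ = count.fetchAdd(1, .release);
}

test "limit reached after N updates" {
    var c = U64.init(0);
    try std.testing.expect(!incrementAndCheckLimit(&c, 2)); // first update
    try std.testing.expect(incrementAndCheckLimit(&c, 2)); // second hits the limit
}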
    /// Will delete all entity based on the filter. Will also write a JSON format list of all UUID deleted into the buffer
@@ -664,7 +738,7 @@ pub const FileEngine = struct {
            defer new_writer.deinit();
 
            blk: while (iter.next() catch return FileEngineError.ZipponDataError) |row| {
-                if (filter != null) if (!filter.?.evaluate(row)) {
+                if (filter != null) if (filter.?.evaluate(row)) {
                    writer.writeByte('{') catch return FileEngineError.WriteError;
                    writer.print("\"{s}\"", .{UUID.format_bytes(row[0].UUID)}) catch return FileEngineError.WriteError;
                    writer.writeAll("},") catch return FileEngineError.WriteError;
diff --git a/src/ziqlParser.zig b/src/ziqlParser.zig
index e75d4a0..516dfc7 100644
--- a/src/ziqlParser.zig
+++ b/src/ziqlParser.zig
@@ -1012,10 +1012,6 @@ test "ADD" {
    try testParsing("ADD User (name = 'Bob', email='bob@email.com', age=-55, scores=[ 1 ], friends=[], bday=2000/01/01, a_time=12:04:54.8741, last_order=2000/01/01-12:45)");
 }
 
-test "UPDATE" {
-    try testParsing("UPDATE User {name = 'Bob'} TO (email='new@gmail.com')");
-}
-
 test "GRAB filter with string" {
    try testParsing("GRAB User {name = 'Bob'}");
    try testParsing("GRAB User {name != 'Brittany Rogers'}");
@@ -1027,6 +1023,10 @@ test "GRAB with additional data" {
    try testParsing("GRAB User [100; name] {age < 18}");
 }
 
+test "UPDATE" {
+    try testParsing("UPDATE User {name = 'Bob'} TO (email='new@gmail.com')");
+}
+
 test "GRAB filter with int" {
    try testParsing("GRAB User {age = 18}");
    try testParsing("GRAB User {age > -18}");
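// The one-character change in deleteEntities above fixes an inverted predicate:
// the old code reported the UUIDs of rows that did NOT match the filter (the
// survivors), while the matching rows were the ones actually deleted. A
// regression-style check of the corrected logic, using a stand-in Filter
// (ZipponDB's real Filter evaluates a parsed condition tree against a row):
const std = @import("std");

const Filter = struct {
    min_age: i64,
    fn evaluate(self: Filter, age: i64) bool {
        return age >= self.min_age;
    }
};

test "report exactly the rows the filter deletes" {
    const filter: ?Filter = .{ .min_age = 18 };
    const ages = [_]i64{ 12, 25, 40 };

    var reported: usize = 0;
    for (ages) |age| {
        if (filter) |f| {
            if (f.evaluate(age)) reported += 1; // deleted, so it belongs in the JSON output
        }
    }
    try std.testing.expectEqual(@as(usize, 2), reported);
}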