Added multithreading to update

This commit is contained in:
Adrien Bouvais 2024-11-06 16:01:55 +01:00
parent fbbeb2f40d
commit 65be757440
2 changed files with 174 additions and 100 deletions

View File

@ -38,6 +38,8 @@ const CPU_CORE = config.CPU_CORE;
const log = std.log.scoped(.fileEngine);
const FileEngineState = enum { MissingPath, MissingSchema, Ok };
/// Manage everything that is relate to read or write in files
/// Or even get stats, whatever. If it touch files, it's here
/// TODO: Keep all struct dir in a haspmap so I dont need to use an allocPrint everytime
@ -46,14 +48,13 @@ pub const FileEngine = struct {
path_to_ZipponDB_dir: []const u8,
null_terminated_schema_buff: [:0]u8,
struct_array: []SchemaStruct,
state: FileEngineState,
pub fn init(allocator: Allocator, path: []const u8) ZipponError!FileEngine {
const path_to_ZipponDB_dir = path;
var schema_buf = allocator.alloc(u8, BUFFER_SIZE) catch return FileEngineError.MemoryError; // TODO: Use a list
defer allocator.free(schema_buf);
const len: usize = FileEngine.readSchemaFile(path_to_ZipponDB_dir, schema_buf) catch 0;
const len: usize = FileEngine.readSchemaFile(path, schema_buf) catch 0;
const null_terminated_schema_buff = allocator.dupeZ(u8, schema_buf[0..len]) catch return FileEngineError.MemoryError;
errdefer allocator.free(null_terminated_schema_buff);
@ -63,15 +64,17 @@ pub const FileEngine = struct {
var struct_array = std.ArrayList(SchemaStruct).init(allocator);
parser.parse(&struct_array) catch return FileEngineError.SchemaNotConform;
var state: FileEngineState = .Ok;
if (len == 0) state = .MissingSchema;
if (std.mem.eql(u8, "", path)) state = .MissingPath;
return FileEngine{
.allocator = allocator,
.path_to_ZipponDB_dir = path_to_ZipponDB_dir,
.path_to_ZipponDB_dir = path,
.null_terminated_schema_buff = null_terminated_schema_buff,
.struct_array = struct_array.toOwnedSlice() catch return FileEngineError.MemoryError,
.state = state,
};
// try file_engine.populateAllUUIDToFileIndexMap();
}
pub fn deinit(self: *FileEngine) void {
@ -82,34 +85,7 @@ pub const FileEngine = struct {
}
pub fn usable(self: FileEngine) bool {
return !std.mem.eql(u8, "", self.path_to_ZipponDB_dir);
}
// For all struct in shema, add the UUID/index_file into the map
pub fn populateAllUUIDToFileIndexMap(self: *FileEngine) ZipponError!void {
for (self.struct_array) |*sstruct| { // Stand for schema struct
const max_file_index = try self.maxFileIndex(sstruct.name);
var path_buff = std.fmt.allocPrint(
self.allocator,
"{s}/DATA/{s}",
.{ self.path_to_ZipponDB_dir, sstruct.name },
) catch return FileEngineError.MemoryError;
defer self.allocator.free(path_buff);
const dir = std.fs.cwd().openDir(path_buff, .{}) catch return FileEngineError.CantOpenDir;
for (0..(max_file_index + 1)) |i| {
self.allocator.free(path_buff);
path_buff = std.fmt.allocPrint(self.allocator, "{d}.zid", .{i}) catch return FileEngineError.MemoryError;
var iter = zid.DataIterator.init(self.allocator, path_buff, dir, sstruct.zid_schema) catch return FileEngineError.ZipponDataError;
defer iter.deinit();
while (iter.next() catch return FileEngineError.ZipponDataError) |row| {
sstruct.uuid_file_index.put(row[0].UUID, i) catch return FileEngineError.MemoryError;
}
}
}
return self.state == .Ok;
}
// --------------------Other--------------------
@ -341,6 +317,14 @@ pub const FileEngine = struct {
var ended_count: U64 = U64.init(0);
var error_count: U64 = U64.init(0);
// Do one array and writer for each thread otherwise then create error by writing at the same time
// Maybe use fixed lenght buffer for speed here
var thread_writer_list = self.allocator.alloc(std.ArrayList(u8), max_file_index + 1) catch return FileEngineError.MemoryError;
defer {
for (thread_writer_list) |list| list.deinit();
self.allocator.free(thread_writer_list);
}
var thread_safe_arena: std.heap.ThreadSafeAllocator = .{
.child_allocator = self.allocator,
};
@ -354,14 +338,6 @@ pub const FileEngine = struct {
}) catch return FileEngineError.ThreadError;
defer thread_pool.deinit();
// Do one array and writer for each thread otherwise then create error by writing at the same time
// Maybe use fixed lenght buffer for speed here
var thread_writer_list = self.allocator.alloc(std.ArrayList(u8), max_file_index + 1) catch return FileEngineError.MemoryError;
defer {
for (thread_writer_list) |list| list.deinit();
self.allocator.free(thread_writer_list);
}
// Maybe do one buffer per files ?
var data_buffer: [BUFFER_SIZE]u8 = undefined;
var fa = std.heap.FixedBufferAllocator.init(&data_buffer);
@ -560,78 +536,176 @@ pub const FileEngine = struct {
) ZipponError!void {
const sstruct = try self.structName2SchemaStruct(struct_name);
const max_file_index = try self.maxFileIndex(sstruct.name);
var total_currently_found: usize = 0;
const dir = try utils.printOpenDir("{s}/DATA/{s}", .{ self.path_to_ZipponDB_dir, sstruct.name }, .{});
var new_data_buff = self.allocator.alloc(zid.Data, try self.numberOfMemberInStruct(struct_name)) catch return FileEngineError.MemoryError;
defer self.allocator.free(new_data_buff);
// Multi-threading setup
var total_entity_updated = U64.init(0);
var ended_count = U64.init(0);
var error_count = U64.init(0);
// Add the new data
for (try self.structName2structMembers(struct_name), 0..) |member, i| {
if (!map.contains(member)) continue;
var thread_safe_arena: std.heap.ThreadSafeAllocator = .{
.child_allocator = self.allocator,
};
const arena = thread_safe_arena.allocator();
const dt = try self.memberName2DataType(struct_name, member);
new_data_buff[i] = try string2Data(self.allocator, dt, map.get(member).?);
var thread_pool: Pool = undefined;
thread_pool.init(Pool.Options{
.allocator = arena,
.n_jobs = CPU_CORE,
}) catch return FileEngineError.ThreadError;
defer thread_pool.deinit();
// Create a thread-safe writer for each file
var thread_writer_list = self.allocator.alloc(std.ArrayList(u8), max_file_index + 1) catch return FileEngineError.MemoryError;
defer {
for (thread_writer_list) |list| list.deinit();
self.allocator.free(thread_writer_list);
}
for (thread_writer_list) |*list| {
list.* = std.ArrayList(u8).init(self.allocator);
}
// Spawn threads for each file
for (0..(max_file_index + 1)) |file_index| {
thread_pool.spawn(updateEntitiesOneFile, .{
self,
struct_name,
filter,
&map,
thread_writer_list[file_index].writer(),
additional_data,
file_index,
dir,
&total_entity_updated,
&ended_count,
&error_count,
}) catch return FileEngineError.ThreadError;
}
// Wait for all threads to complete
while ((ended_count.load(.acquire) + error_count.load(.acquire)) < max_file_index + 1) {
std.time.sleep(10_000_000); // Check every 10ms
}
// Combine results
writer.writeByte('[') catch return FileEngineError.WriteError;
for (0..(max_file_index + 1)) |file_index| { // TODO: Multi thread that
if (additional_data.entity_count_to_find != 0 and total_currently_found >= additional_data.entity_count_to_find) break;
for (thread_writer_list) |list| {
writer.writeAll(list.items) catch return FileEngineError.WriteError;
}
writer.writeByte(']') catch return FileEngineError.WriteError;
}
const path_buff = std.fmt.allocPrint(self.allocator, "{d}.zid", .{file_index}) catch return FileEngineError.MemoryError;
defer self.allocator.free(path_buff);
fn updateEntitiesOneFile(
file_engine: *FileEngine,
struct_name: []const u8,
filter: ?Filter,
map: *const std.StringHashMap([]const u8),
writer: anytype,
additional_data: *AdditionalData,
file_index: u64,
dir: std.fs.Dir,
total_entity_updated: *U64,
ended_count: *U64,
error_count: *U64,
) void {
var data_buffer: [BUFFER_SIZE]u8 = undefined;
var fa = std.heap.FixedBufferAllocator.init(&data_buffer);
defer fa.reset();
const allocator = fa.allocator();
var iter = zid.DataIterator.init(self.allocator, path_buff, dir, sstruct.zid_schema) catch return FileEngineError.ZipponDataError;
const sstruct = file_engine.structName2SchemaStruct(struct_name) catch |err| {
logErrorAndIncrementCount("Error getting schema struct", err, error_count);
return;
};
var new_data_buff = allocator.alloc(zid.Data, file_engine.numberOfMemberInStruct(struct_name) catch return) catch {
logErrorAndIncrementCount("Memory allocation error", error.OutOfMemory, error_count);
return;
};
defer allocator.free(new_data_buff);
// Add the new data
for (file_engine.structName2structMembers(struct_name) catch return, 0..) |member, i| {
if (!map.contains(member)) continue;
const dt = file_engine.memberName2DataType(struct_name, member) catch return;
new_data_buff[i] = string2Data(allocator, dt, map.get(member).?) catch return;
}
var path_buffer: [128]u8 = undefined;
const path = std.fmt.bufPrint(&path_buffer, "{d}.zid", .{file_index}) catch |err| {
logErrorAndIncrementCount("Error creating file path", err, error_count);
return;
};
var iter = zid.DataIterator.init(allocator, path, dir, sstruct.zid_schema) catch |err| {
logErrorAndIncrementCount("Error initializing DataIterator", err, error_count);
return;
};
defer iter.deinit();
const new_path_buff = std.fmt.allocPrint(self.allocator, "{d}.zid.new", .{file_index}) catch return FileEngineError.MemoryError;
defer self.allocator.free(new_path_buff);
const new_path = std.fmt.allocPrint(allocator, "{d}.zid.new", .{file_index}) catch |err| {
logErrorAndIncrementCount("Error creating new file path", err, error_count);
return;
};
defer allocator.free(new_path);
zid.createFile(new_path_buff, dir) catch return FileEngineError.ZipponDataError;
var new_writer = zid.DataWriter.init(new_path_buff, dir) catch return FileEngineError.ZipponDataError;
zid.createFile(new_path, dir) catch |err| {
logErrorAndIncrementCount("Error creating new file", err, error_count);
return;
};
var new_writer = zid.DataWriter.init(new_path, dir) catch |err| {
logErrorAndIncrementCount("Error initializing DataWriter", err, error_count);
return;
};
defer new_writer.deinit();
while (iter.next() catch return FileEngineError.ZipponDataError) |row| {
while (iter.next() catch return) |row| {
if (filter == null or filter.?.evaluate(row)) {
// Add the unchanged Data in the new_data_buff
new_data_buff[0] = row[0];
for (try self.structName2structMembers(struct_name), 0..) |member, i| {
for (file_engine.structName2structMembers(struct_name) catch return, 0..) |member, i| {
if (map.contains(member)) continue;
new_data_buff[i] = row[i];
}
new_writer.write(new_data_buff) catch return FileEngineError.WriteError;
writer.writeByte('{') catch return FileEngineError.WriteError;
writer.print("\"{s}\"", .{UUID.format_bytes(row[0].UUID)}) catch return FileEngineError.WriteError;
writer.writeAll("},") catch return FileEngineError.WriteError;
total_currently_found += 1;
new_writer.write(new_data_buff) catch |err| {
logErrorAndIncrementCount("Error writing new data", err, error_count);
return;
};
writer.writeByte('{') catch return;
writer.print("\"{s}\"", .{UUID.format_bytes(row[0].UUID)}) catch return;
writer.writeAll("},") catch return;
if (incrementAndCheckLimit(total_entity_updated, additional_data.entity_count_to_find)) break;
} else {
new_writer.write(row) catch return FileEngineError.WriteError;
new_writer.write(row) catch |err| {
logErrorAndIncrementCount("Error writing unchanged data", err, error_count);
return;
};
}
}
if (additional_data.entity_count_to_find != 0 and total_currently_found >= additional_data.entity_count_to_find) break;
}
new_writer.flush() catch |err| {
logErrorAndIncrementCount("Error flushing new writer", err, error_count);
return;
};
writer.writeByte(']') catch return FileEngineError.WriteError;
new_writer.flush() catch return FileEngineError.ZipponDataError;
dir.deleteFile(path_buff) catch return FileEngineError.DeleteFileError;
dir.rename(new_path_buff, path_buff) catch return FileEngineError.RenameFileError;
}
dir.deleteFile(path) catch |err| {
logErrorAndIncrementCount("Error deleting old file", err, error_count);
return;
};
for (try self.structName2structMembers(struct_name), 1..) |member, i| {
if (!map.contains(member)) continue;
dir.rename(new_path, path) catch |err| {
logErrorAndIncrementCount("Error renaming new file", err, error_count);
return;
};
switch (new_data_buff[i]) {
.IntArray => self.allocator.free(new_data_buff[i].IntArray),
.FloatArray => self.allocator.free(new_data_buff[i].FloatArray),
.UnixArray => self.allocator.free(new_data_buff[i].UnixArray),
.BoolArray => self.allocator.free(new_data_buff[i].BoolArray),
.StrArray => self.allocator.free(new_data_buff[i].StrArray),
.UUIDArray => self.allocator.free(new_data_buff[i].UUIDArray),
else => continue,
}
}
_ = ended_count.fetchAdd(1, .acquire);
}
/// Will delete all entity based on the filter. Will also write a JSON format list of all UUID deleted into the buffer
@ -664,7 +738,7 @@ pub const FileEngine = struct {
defer new_writer.deinit();
blk: while (iter.next() catch return FileEngineError.ZipponDataError) |row| {
if (filter != null) if (!filter.?.evaluate(row)) {
if (filter != null) if (filter.?.evaluate(row)) {
writer.writeByte('{') catch return FileEngineError.WriteError;
writer.print("\"{s}\"", .{UUID.format_bytes(row[0].UUID)}) catch return FileEngineError.WriteError;
writer.writeAll("},") catch return FileEngineError.WriteError;

View File

@ -1012,10 +1012,6 @@ test "ADD" {
try testParsing("ADD User (name = 'Bob', email='bob@email.com', age=-55, scores=[ 1 ], friends=[], bday=2000/01/01, a_time=12:04:54.8741, last_order=2000/01/01-12:45)");
}
test "UPDATE" {
try testParsing("UPDATE User {name = 'Bob'} TO (email='new@gmail.com')");
}
test "GRAB filter with string" {
try testParsing("GRAB User {name = 'Bob'}");
try testParsing("GRAB User {name != 'Brittany Rogers'}");
@ -1027,6 +1023,10 @@ test "GRAB with additional data" {
try testParsing("GRAB User [100; name] {age < 18}");
}
test "UPDATE" {
try testParsing("UPDATE User {name = 'Bob'} TO (email='new@gmail.com')");
}
test "GRAB filter with int" {
try testParsing("GRAB User {age = 18}");
try testParsing("GRAB User {age > -18}");