Removed max_file from SyncContext to use a WaitGroup

Instead of creating a context with the number of files to parse and
checking every x milliseconds whether it is complete, I now use a
WaitGroup: for each file, I spawn a thread in the group and then call wg.wait()
Adrien Bouvais 2025-01-22 18:24:02 +01:00
parent 7c34431702
commit f05696a924
5 changed files with 65 additions and 151 deletions
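
As a reference for the pattern this commit adopts, here is a minimal, self-contained sketch of std.Thread.Pool.spawnWg combined with std.Thread.WaitGroup; parseOneFile, to_parse and results are illustrative names, not code from the repository.

const std = @import("std");

// Illustrative worker: each task writes into its own slot, so no locking is needed.
fn parseOneFile(file_index: usize, out: *usize) void {
    out.* = file_index * 2;
}

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();

    var pool: std.Thread.Pool = undefined;
    try pool.init(.{ .allocator = gpa.allocator() }); // n_jobs defaults to the CPU count
    defer pool.deinit();

    const to_parse = [_]usize{ 0, 1, 2, 3 };
    var results: [to_parse.len]usize = undefined;

    // Old approach: poll a shared counter of completed files in a sleep loop.
    // New approach: the WaitGroup tracks every spawned task and wait() blocks
    // until all of them have finished.
    var wg: std.Thread.WaitGroup = .{};
    for (to_parse, 0..) |file_index, i| pool.spawnWg(&wg, parseOneFile, .{ file_index, &results[i] });
    wg.wait();

    std.debug.print("results: {any}\n", .{results});
}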


@@ -7,7 +7,6 @@ const ZipponError = @import("error").ZipponError;
const log = std.log.scoped(.fileEngine);
var path_to_ZipponDB_dir_buffer: [1024]u8 = undefined;
pub var data_buffer: [config.BUFFER_SIZE]u8 = undefined;
/// Manage everything related to reading or writing files
/// Or even getting stats, whatever. If it touches files, it's here


@@ -23,8 +23,6 @@ const log = std.log.scoped(.fileEngine);
const Self = @import("core.zig").Self;
var path_buffer: [1024]u8 = undefined;
/// Use a struct name to populate a list with all UUID of this struct
/// TODO: Multi thread that too
pub fn getNumberOfEntityAndFile(self: *Self, struct_name: []const u8) ZipponError!struct { entity: usize, file: usize } {
@@ -49,12 +47,6 @@ pub fn populateFileIndexUUIDMap(
const dir = try self.printOpenDir("{s}/DATA/{s}", .{ self.path_to_ZipponDB_dir, sstruct.name }, .{});
const to_parse = try self.allFileIndex(allocator, sstruct.name);
// Multi-threading setup
var sync_context = ThreadSyncContext.init(
0,
to_parse.len,
);
// Create a thread-safe writer for each file
var thread_writer_list = allocator.alloc(std.ArrayList(UUID), to_parse.len) catch return ZipponError.MemoryError;
defer {
@@ -67,18 +59,14 @@ pub fn populateFileIndexUUIDMap(
}
// Spawn threads for each file
for (to_parse, 0..) |file_index, i| {
self.thread_pool.spawn(populateFileIndexUUIDMapOneFile, .{
sstruct,
&thread_writer_list[i],
file_index,
dir,
&sync_context,
}) catch return ZipponError.ThreadError;
}
// Wait for all threads to complete
while (!sync_context.isComplete()) std.time.sleep(10_000_000);
var wg: std.Thread.WaitGroup = .{};
for (to_parse, 0..) |file_index, i| self.thread_pool.spawnWg(&wg, populateFileIndexUUIDMapOneFile, .{
sstruct,
&thread_writer_list[i],
file_index,
dir,
});
wg.wait();
// Combine results
for (thread_writer_list, 0..) |list, file_index| {
@@ -91,35 +79,20 @@ fn populateFileIndexUUIDMapOneFile(
list: *std.ArrayList(UUID),
file_index: u64,
dir: std.fs.Dir,
sync_context: *ThreadSyncContext,
) void {
var path_buffer: [1024 * 10]u8 = undefined;
var data_buffer: [config.BUFFER_SIZE]u8 = undefined;
var fa = std.heap.FixedBufferAllocator.init(&data_buffer);
defer fa.reset();
const allocator = fa.allocator();
const path = std.fmt.bufPrint(&path_buffer, "{d}.zid", .{file_index}) catch |err| {
sync_context.logError("Error creating file path", err);
return;
};
var iter = zid.DataIterator.init(allocator, path, dir, sstruct.zid_schema) catch |err| {
sync_context.logError("Error initializing DataIterator", err);
return;
};
const path = std.fmt.bufPrint(&path_buffer, "{d}.zid", .{file_index}) catch return;
var iter = zid.DataIterator.init(allocator, path, dir, sstruct.zid_schema) catch return;
defer iter.deinit();
while (iter.next() catch |err| {
sync_context.logError("Error initializing DataIterator", err);
return;
}) |row| {
list.*.append(UUID{ .bytes = row[0].UUID }) catch |err| {
sync_context.logError("Error initializing DataIterator", err);
return;
};
while (iter.next() catch return) |row| {
list.*.append(UUID{ .bytes = row[0].UUID }) catch return;
}
_ = sync_context.completeThread();
}
/// Use a struct name and filter to populate a map with all UUID bytes as key and void as value
@@ -141,10 +114,7 @@ pub fn populateVoidUUIDMap(
const to_parse = try self.allFileIndex(allocator, sstruct.name);
// Multi-threading setup
var sync_context = ThreadSyncContext.init(
additional_data.limit,
to_parse.len,
);
var sync_context = ThreadSyncContext.init(additional_data.limit);
// Create a thread-safe writer for each file
var thread_writer_list = allocator.alloc(std.ArrayList(UUID), to_parse.len + 1) catch return ZipponError.MemoryError;
@@ -154,19 +124,20 @@ pub fn populateVoidUUIDMap(
}
// Spawn threads for each file
for (to_parse, 0..) |file_index, i| {
self.thread_pool.spawn(populateVoidUUIDMapOneFile, .{
var wg: std.Thread.WaitGroup = .{};
for (to_parse, 0..) |file_index, i| self.thread_pool.spawnWg(
&wg,
populateVoidUUIDMapOneFile,
.{
sstruct,
filter,
&thread_writer_list[i],
file_index,
dir,
&sync_context,
}) catch return ZipponError.ThreadError;
}
// Wait for all threads to complete
while (!sync_context.isComplete()) std.time.sleep(10_000_000);
},
);
wg.wait();
// Combine results
for (thread_writer_list) |list| {
@@ -192,6 +163,7 @@ fn populateVoidUUIDMapOneFile(
dir: std.fs.Dir,
sync_context: *ThreadSyncContext,
) void {
var path_buffer: [1024 * 10]u8 = undefined;
var data_buffer: [config.BUFFER_SIZE]u8 = undefined;
var fa = std.heap.FixedBufferAllocator.init(&data_buffer);
defer fa.reset();
@@ -222,8 +194,6 @@ fn populateVoidUUIDMapOneFile(
if (sync_context.incrementAndCheckStructLimit()) break;
}
}
_ = sync_context.completeThread();
}
/// Take a filter, parse all files and, if a struct is validated by the filter, write it in JSON format to the writer
@@ -257,23 +227,16 @@ pub fn parseEntities(
const dir = try self.printOpenDir("{s}/DATA/{s}", .{ self.path_to_ZipponDB_dir, sstruct.name }, .{ .access_sub_paths = false });
// Multi-threading setup
var sync_context = ThreadSyncContext.init(
additional_data.limit,
to_parse.len,
);
var sync_context = ThreadSyncContext.init(additional_data.limit);
// Create an array of writers, one per file
// Could I create just the max number of CPUs? Because if I have 1000 files, I create 1000 lists
// But in the end, only as many lists as there are CPUs/threads are in use simultaneously
// So technically I could pass a list from one thread to another
var thread_writer_list = allocator.alloc(std.ArrayList(u8), to_parse.len) catch return ZipponError.MemoryError;
const data_types = try self.schema_engine.structName2DataType(struct_name);
// Start parsing all files in multiple threads
var wg: std.Thread.WaitGroup = .{};
for (to_parse, 0..) |file_index, i| {
thread_writer_list[file_index] = std.ArrayList(u8).init(allocator);
thread_writer_list[i] = std.ArrayList(u8).init(allocator);
self.thread_pool.spawnWg(
&wg,
parseEntitiesOneFile,
@@ -320,6 +283,7 @@ fn parseEntitiesOneFile(
data_types: []const DataType,
sync_context: *ThreadSyncContext,
) void {
var path_buffer: [1024 * 10]u8 = undefined;
var data_buffer: [config.BUFFER_SIZE]u8 = undefined;
var fa = std.heap.FixedBufferAllocator.init(&data_buffer);
defer fa.reset();
@@ -389,10 +353,7 @@ pub fn parseEntitiesRelationMap(
);
// Multi-threading setup
var sync_context = ThreadSyncContext.init(
relation_map.additional_data.limit,
to_parse.len,
);
var sync_context = ThreadSyncContext.init(relation_map.additional_data.limit);
// Use one writer per thread, otherwise concurrent writes cause errors
var thread_map_list = allocator.alloc(
@@ -404,7 +365,6 @@ pub fn parseEntitiesRelationMap(
var wg: std.Thread.WaitGroup = .{};
for (to_parse, 0..) |file_index, i| {
thread_map_list[i] = relation_map.map.cloneWithAllocator(allocator) catch return ZipponError.MemoryError;
self.thread_pool.spawnWg(
&wg,
parseEntitiesRelationMapOneFile,
@@ -455,6 +415,7 @@ fn parseEntitiesRelationMapOneFile(
data_types: []const DataType,
sync_context: *ThreadSyncContext,
) void {
var path_buffer: [1024 * 10]u8 = undefined;
var data_buffer: [config.BUFFER_SIZE]u8 = undefined;
var fa = std.heap.FixedBufferAllocator.init(&data_buffer);
defer fa.reset();


@@ -84,10 +84,7 @@ pub fn updateEntities(
const to_parse = try self.allFileIndex(allocator, struct_name);
// Multi-threading setup
var sync_context = ThreadSyncContext.init(
additional_data.limit,
to_parse.len,
);
var sync_context = ThreadSyncContext.init(additional_data.limit);
// Create a thread-safe writer for each file
var thread_writer_list = allocator.alloc(std.ArrayList(u8), to_parse.len) catch return ZipponError.MemoryError;
@@ -115,8 +112,11 @@ pub fn updateEntities(
}
// Spawn threads for each file
for (to_parse, 0..) |file_index, i| {
self.thread_pool.spawn(updateEntitiesOneFile, .{
var wg: std.Thread.WaitGroup = .{};
for (to_parse, 0..) |file_index, i| self.thread_pool.spawnWg(
&wg,
updateEntitiesOneFile,
.{
sstruct,
filter,
map,
@@ -125,11 +125,9 @@ pub fn updateEntities(
file_index,
dir,
&sync_context,
}) catch return ZipponError.ThreadError;
}
// Wait for all threads to complete
while (!sync_context.isComplete()) std.time.sleep(100_000); // Check every 0.1ms
},
);
wg.wait();
// Combine results
writer.writeByte('[') catch return ZipponError.WriteError;
@@ -162,55 +160,35 @@ fn updateEntitiesOneFile(
// First fill the ones that are updated with a constant value
for (index_switch, 0..) |is, i| switch (is) {
.fix => new_data_buff[i] = @import("utils.zig").string2Data(allocator, map.get(sstruct.members[i]).?.value) catch |err| {
sync_context.logError("Writting data", err);
return;
},
.fix => new_data_buff[i] = @import("utils.zig").string2Data(allocator, map.get(sstruct.members[i]).?.value) catch return,
else => {},
};
const path = std.fmt.bufPrint(&path_buffer, "{d}.zid", .{file_index}) catch |err| {
sync_context.logError("Error creating file path", err);
return;
};
const path = std.fmt.bufPrint(&path_buffer, "{d}.zid", .{file_index}) catch return;
var iter = zid.DataIterator.init(allocator, path, dir, sstruct.zid_schema) catch |err| {
sync_context.logError("Error initializing DataIterator", err);
return;
};
var iter = zid.DataIterator.init(allocator, path, dir, sstruct.zid_schema) catch return;
defer iter.deinit();
const new_path = std.fmt.allocPrint(allocator, "{d}.zid.new", .{file_index}) catch |err| {
sync_context.logError("Error creating new file path", err);
return;
};
const new_path = std.fmt.allocPrint(allocator, "{d}.zid.new", .{file_index}) catch return;
defer allocator.free(new_path);
zid.createFile(new_path, dir) catch |err| {
sync_context.logError("Error creating new file", err);
return;
};
zid.createFile(new_path, dir) catch return;
var new_writer = zid.DataWriter.init(new_path, dir) catch |err| {
sync_context.logError("Error initializing DataWriter", err);
var new_writer = zid.DataWriter.init(new_path, dir) catch {
zid.deleteFile(new_path, dir) catch {};
return;
};
defer new_writer.deinit();
var finish_writing = false;
while (iter.next() catch |err| {
sync_context.logError("Parsing files", err);
return;
}) |row| {
while (iter.next() catch return) |row| {
if (!finish_writing and (filter == null or filter.?.evaluate(row))) {
// Add the unchanged Data in the new_data_buff
for (index_switch, 0..) |is, i| switch (is) {
.stay => new_data_buff[i] = row[i],
.vari => {
const x = map.get(sstruct.members[i]).?.array;
updateData(allocator, x.condition, &row[i], x.data) catch |err| {
sync_context.logError("Error updating data", err);
updateData(allocator, x.condition, &row[i], x.data) catch {
zid.deleteFile(new_path, dir) catch {};
return;
};
@@ -220,45 +198,28 @@ fn updateEntitiesOneFile(
log.debug("{d} {any}\n\n", .{ new_data_buff.len, new_data_buff });
new_writer.write(new_data_buff) catch |err| {
sync_context.logError("Error initializing DataWriter", err);
new_writer.write(new_data_buff) catch {
zid.deleteFile(new_path, dir) catch {};
return;
};
writer.print("\"{s}\",", .{UUID.format_bytes(row[0].UUID)}) catch |err| {
sync_context.logError("Error initializing DataWriter", err);
writer.print("\"{s}\",", .{UUID.format_bytes(row[0].UUID)}) catch {
zid.deleteFile(new_path, dir) catch {};
return;
};
finish_writing = sync_context.incrementAndCheckStructLimit();
} else {
new_writer.write(row) catch |err| {
sync_context.logError("Error initializing DataWriter", err);
new_writer.write(row) catch {
zid.deleteFile(new_path, dir) catch {};
return;
};
}
}
new_writer.flush() catch |err| {
sync_context.logError("Error initializing DataWriter", err);
zid.deleteFile(new_path, dir) catch {};
return;
};
dir.deleteFile(path) catch |err| {
sync_context.logError("Error deleting old file", err);
return;
};
dir.rename(new_path, path) catch |err| {
sync_context.logError("Error initializing DataWriter", err);
return;
};
_ = sync_context.completeThread();
new_writer.flush() catch return;
dir.deleteFile(path) catch return;
dir.rename(new_path, path) catch return;
}
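
Because the workers are now spawned with spawnWg and return void, there is no sync context to log errors into: every failure path simply drops the partial .zid.new file (when one was created) and returns. Below is a hedged, consolidated sketch of the resulting flow in updateEntitiesOneFile, with the row-copying loop elided; the function name is illustrative and the std and zid (ZipponData) imports at the top of this file are assumed.

fn rewriteOneFileSketch(allocator: std.mem.Allocator, dir: std.fs.Dir, file_index: u64) void {
    var path_buffer: [128]u8 = undefined;
    const path = std.fmt.bufPrint(&path_buffer, "{d}.zid", .{file_index}) catch return;

    const new_path = std.fmt.allocPrint(allocator, "{d}.zid.new", .{file_index}) catch return;
    defer allocator.free(new_path);

    zid.createFile(new_path, dir) catch return;
    var new_writer = zid.DataWriter.init(new_path, dir) catch {
        // Could not open the new file for writing: remove it and bail out.
        zid.deleteFile(new_path, dir) catch {};
        return;
    };
    defer new_writer.deinit();

    // ... iterate the old file here, writing updated or unchanged rows to
    //     new_writer and deleting new_path before returning on any error ...

    new_writer.flush() catch return;
    dir.deleteFile(path) catch return; // drop the old data file
    dir.rename(new_path, path) catch return; // promote the new file in its place
}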
/// Delete all entities matching the filter. Also writes a JSON-formatted list of all deleted UUIDs into the buffer
@@ -279,10 +240,7 @@ pub fn deleteEntities(
const to_parse = try self.allFileIndex(allocator, struct_name);
// Multi-threading setup
var sync_context = ThreadSyncContext.init(
additional_data.limit,
to_parse.len,
);
var sync_context = ThreadSyncContext.init(additional_data.limit);
// Create a thread-safe writer for each file
var thread_writer_list = allocator.alloc(std.ArrayList(u8), to_parse.len) catch return ZipponError.MemoryError;
@@ -291,19 +249,20 @@ pub fn deleteEntities(
}
// Spawn threads for each file
for (to_parse, 0..) |file_index, i| {
self.thread_pool.spawn(deleteEntitiesOneFile, .{
var wg: std.Thread.WaitGroup = .{};
for (to_parse, 0..) |file_index, i| self.thread_pool.spawnWg(
&wg,
deleteEntitiesOneFile,
.{
sstruct,
filter,
thread_writer_list[i].writer(),
file_index,
dir,
&sync_context,
}) catch return ZipponError.ThreadError;
}
// Wait for all threads to complete
while (!sync_context.isComplete()) std.time.sleep(100_000); // Check every 0.1ms
},
);
wg.wait();
// Combine results
writer.writeByte('[') catch return ZipponError.WriteError;


@@ -10,19 +10,13 @@ processed_struct: U64 = U64.init(0),
error_file: U64 = U64.init(0),
completed_file: U64 = U64.init(0),
max_struct: u64,
max_file: u64,
pub fn init(max_struct: u64, max_file: u64) Self {
pub fn init(max_struct: u64) Self {
return Self{
.max_struct = max_struct,
.max_file = max_file,
};
}
pub fn isComplete(self: *Self) bool {
return (self.completed_file.load(.acquire) + self.error_file.load(.acquire)) >= self.max_file;
}
pub fn completeThread(self: *Self) void {
_ = self.completed_file.fetchAdd(1, .release);
}
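
With completion now handled by the WaitGroup, the sync context only has to enforce the per-query limit. The following is a minimal sketch of what the slimmed-down context boils down to: the error_file/completed_file fields kept above are omitted, and the body of incrementAndCheckStructLimit is not part of this diff, so it is an assumption inferred from its call sites, with 0 taken to mean "no limit".

const std = @import("std");

const ThreadSyncContext = struct {
    processed_struct: std.atomic.Value(u64) = std.atomic.Value(u64).init(0),
    max_struct: u64,

    pub fn init(max_struct: u64) ThreadSyncContext {
        return .{ .max_struct = max_struct };
    }

    // Assumed behavior: count one matched struct and report whether the
    // configured limit has been reached.
    pub fn incrementAndCheckStructLimit(self: *ThreadSyncContext) bool {
        if (self.max_struct == 0) return false;
        const seen = self.processed_struct.fetchAdd(1, .monotonic) + 1;
        return seen >= self.max_struct;
    }
};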


@@ -17,14 +17,15 @@ pub fn init(allocator: std.mem.Allocator) !ThreadEngine {
.child_allocator = allocator,
};
const cpu_core = if (CPU_CORE == 0) try std.Thread.getCpuCount() else CPU_CORE;
const cpu_core = if (CPU_CORE == 0) std.Thread.getCpuCount() catch 1 else CPU_CORE;
log.info("Using {d} cpu core", .{cpu_core});
log.info("Using {d}Mb stack size", .{std.Thread.SpawnConfig.default_stack_size / 1024 / 1024});
const thread_pool = try allocator.create(std.Thread.Pool);
thread_pool.init(std.Thread.Pool.Options{
try thread_pool.init(std.Thread.Pool.Options{
.allocator = thread_arena.allocator(),
.n_jobs = cpu_core,
}) catch @panic("=(");
});
return ThreadEngine{
.thread_pool = thread_pool,