Added multi-threading to delete

Adrien Bouvais 2024-11-06 16:04:07 +01:00
parent 65be757440
commit e5d0a122e1


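For context, the pattern the new code follows is: one thread pool, a pair of atomic completion/error counters, and one private output buffer per data file, with the coordinator polling the counters and then concatenating the buffers in file order. Below is a minimal, self-contained sketch of that coordination pattern, assuming `U64` is an alias for `std.atomic.Value(u64)` and `Pool` for `std.Thread.Pool` (names mirrored from the diff); the `worker` function and its buffer contents are placeholders for illustration, not ZipponDB code.

const std = @import("std");

// Assumed aliases, mirroring the identifiers used in the diff.
const U64 = std.atomic.Value(u64);
const Pool = std.Thread.Pool;

// Hypothetical worker: writes only into its own buffer, then publishes completion.
fn worker(id: usize, out: *std.ArrayList(u8), ended_count: *U64) void {
    out.writer().print("job {d} done,", .{id}) catch {};
    _ = ended_count.fetchAdd(1, .release); // make the buffer writes visible to the coordinator
}

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();
    const allocator = gpa.allocator();

    const n_jobs: u32 = 4;
    var ended_count = U64.init(0);

    // One private output buffer per job, so workers never share a writer.
    var buffers = try allocator.alloc(std.ArrayList(u8), n_jobs);
    defer {
        for (buffers) |*b| b.deinit();
        allocator.free(buffers);
    }
    for (buffers) |*b| b.* = std.ArrayList(u8).init(allocator);

    var pool: Pool = undefined;
    try pool.init(.{ .allocator = allocator, .n_jobs = n_jobs });
    defer pool.deinit();

    for (0..n_jobs) |i| try pool.spawn(worker, .{ i, &buffers[i], &ended_count });

    // Poll until every worker has reported in, like the 10ms sleep loop in deleteEntities.
    while (ended_count.load(.acquire) < n_jobs) std.time.sleep(10 * std.time.ns_per_ms);

    // Combine the per-job buffers in deterministic (index) order.
    for (buffers) |b| std.debug.print("{s}", .{b.items});
    std.debug.print("\n", .{});
}
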
@@ -718,45 +718,149 @@ pub const FileEngine = struct {
) ZipponError!void {
const sstruct = try self.structName2SchemaStruct(struct_name);
const max_file_index = try self.maxFileIndex(sstruct.name);
var total_currently_found: usize = 0;
const dir = try utils.printOpenDir("{s}/DATA/{s}", .{ self.path_to_ZipponDB_dir, sstruct.name }, .{});
writer.writeByte('[') catch return FileEngineError.WriteError;
for (0..(max_file_index + 1)) |file_index| { // TODO: Multi thread that
const path_buff = std.fmt.allocPrint(self.allocator, "{d}.zid", .{file_index}) catch return FileEngineError.MemoryError;
defer self.allocator.free(path_buff);
// Multi-threading setup
var total_entity_deleted = U64.init(0);
var ended_count = U64.init(0);
var error_count = U64.init(0);
var iter = zid.DataIterator.init(self.allocator, path_buff, dir, sstruct.zid_schema) catch return FileEngineError.ZipponDataError;
defer iter.deinit();
var thread_safe_arena: std.heap.ThreadSafeAllocator = .{
.child_allocator = self.allocator,
};
const arena = thread_safe_arena.allocator();
const new_path_buff = std.fmt.allocPrint(self.allocator, "{d}.zid.new", .{file_index}) catch return FileEngineError.MemoryError;
defer self.allocator.free(new_path_buff);
var thread_pool: Pool = undefined;
thread_pool.init(Pool.Options{
.allocator = arena,
.n_jobs = CPU_CORE,
}) catch return FileEngineError.ThreadError;
defer thread_pool.deinit();
zid.createFile(new_path_buff, dir) catch return FileEngineError.ZipponDataError;
var new_writer = zid.DataWriter.init(new_path_buff, dir) catch return FileEngineError.ZipponDataError;
defer new_writer.deinit();
blk: while (iter.next() catch return FileEngineError.ZipponDataError) |row| {
if (filter != null) if (filter.?.evaluate(row)) {
writer.writeByte('{') catch return FileEngineError.WriteError;
writer.print("\"{s}\"", .{UUID.format_bytes(row[0].UUID)}) catch return FileEngineError.WriteError;
writer.writeAll("},") catch return FileEngineError.WriteError;
total_currently_found += 1;
if (additional_data.entity_count_to_find != 0 and total_currently_found >= additional_data.entity_count_to_find) break :blk;
} else {
new_writer.write(row) catch return FileEngineError.WriteError;
};
}
new_writer.flush() catch return FileEngineError.ZipponDataError;
dir.deleteFile(path_buff) catch return FileEngineError.DeleteFileError;
dir.rename(new_path_buff, path_buff) catch return FileEngineError.RenameFileError;
// Create a thread-safe writer for each file
var thread_writer_list = self.allocator.alloc(std.ArrayList(u8), max_file_index + 1) catch return FileEngineError.MemoryError;
defer {
for (thread_writer_list) |list| list.deinit();
self.allocator.free(thread_writer_list);
}
for (thread_writer_list) |*list| {
list.* = std.ArrayList(u8).init(self.allocator);
}
// Spawn threads for each file
for (0..(max_file_index + 1)) |file_index| {
thread_pool.spawn(deleteEntitiesOneFile, .{
self,
struct_name,
filter,
thread_writer_list[file_index].writer(),
additional_data,
file_index,
dir,
&total_entity_deleted,
&ended_count,
&error_count,
}) catch return FileEngineError.ThreadError;
}
// Wait for all threads to complete
while ((ended_count.load(.acquire) + error_count.load(.acquire)) < max_file_index + 1) {
std.time.sleep(10_000_000); // Check every 10ms
}
// Combine results
writer.writeByte('[') catch return FileEngineError.WriteError;
for (thread_writer_list) |list| {
writer.writeAll(list.items) catch return FileEngineError.WriteError;
}
writer.writeByte(']') catch return FileEngineError.WriteError;
}
fn deleteEntitiesOneFile(
file_engine: *FileEngine,
struct_name: []const u8,
filter: ?Filter,
writer: anytype,
additional_data: *AdditionalData,
file_index: u64,
dir: std.fs.Dir,
total_entity_deleted: *U64,
ended_count: *U64,
error_count: *U64,
) void {
var data_buffer: [BUFFER_SIZE]u8 = undefined;
var fa = std.heap.FixedBufferAllocator.init(&data_buffer);
defer fa.reset();
const allocator = fa.allocator();
const sstruct = file_engine.structName2SchemaStruct(struct_name) catch |err| {
logErrorAndIncrementCount("Error getting schema struct", err, error_count);
return;
};
var path_buffer: [128]u8 = undefined;
const path = std.fmt.bufPrint(&path_buffer, "{d}.zid", .{file_index}) catch |err| {
logErrorAndIncrementCount("Error creating file path", err, error_count);
return;
};
var iter = zid.DataIterator.init(allocator, path, dir, sstruct.zid_schema) catch |err| {
logErrorAndIncrementCount("Error initializing DataIterator", err, error_count);
return;
};
defer iter.deinit();
const new_path = std.fmt.allocPrint(allocator, "{d}.zid.new", .{file_index}) catch |err| {
logErrorAndIncrementCount("Error creating new file path", err, error_count);
return;
};
defer allocator.free(new_path);
zid.createFile(new_path, dir) catch |err| {
logErrorAndIncrementCount("Error creating new file", err, error_count);
return;
};
var new_writer = zid.DataWriter.init(new_path, dir) catch |err| {
logErrorAndIncrementCount("Error initializing DataWriter", err, error_count);
return;
};
defer new_writer.deinit();
while (iter.next() catch return) |row| {
if (filter == null or filter.?.evaluate(row)) {
writer.writeByte('{') catch return;
writer.print("\"{s}\"", .{UUID.format_bytes(row[0].UUID)}) catch return;
writer.writeAll("},") catch return;
if (incrementAndCheckLimit(total_entity_deleted, additional_data.entity_count_to_find)) break;
} else {
new_writer.write(row) catch |err| {
logErrorAndIncrementCount("Error writing unchanged data", err, error_count);
return;
};
}
}
new_writer.flush() catch |err| {
logErrorAndIncrementCount("Error flushing new writer", err, error_count);
return;
};
dir.deleteFile(path) catch |err| {
logErrorAndIncrementCount("Error deleting old file", err, error_count);
return;
};
dir.rename(new_path, path) catch |err| {
logErrorAndIncrementCount("Error renaming new file", err, error_count);
return;
};
_ = ended_count.fetchAdd(1, .acquire);
}
// --------------------ZipponData utils--------------------
fn string2Data(allocator: Allocator, dt: DataType, value: []const u8) ZipponError!zid.Data {