Created a ThreadSyncContext
This keeps the atomic values that are shared by threads.
parent 268010fe37
commit f5d93c94f6
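For context, here is a minimal sketch of how the new ThreadSyncContext is meant to be driven. The names doOneFile and runAll are hypothetical stand-ins for the per-file workers added below (populateUUIDMapOneFile, parseEntitiesOneFile, updateEntitiesOneFile, deleteEntitiesOneFile) and their calling code: one task is spawned per data file, and the spawner polls isComplete() until every file has reported either completion or an error.

const std = @import("std");

// Hypothetical worker, for illustration only; the real workers are the *OneFile
// functions in this commit. Each worker reports back through the shared context.
fn doOneFile(sync_context: *ThreadSyncContext) void {
    defer sync_context.completeThread(); // always count this file as done, even on early return
    var row: u64 = 0;
    while (row < 100) : (row += 1) { // stand-in for iterating over the rows of one .zid file
        if (sync_context.incrementAndCheckStructLimit()) break; // stop once enough entities were found
    }
}

// Hypothetical spawning side: one task per file, then wait for all of them.
fn runAll(allocator: std.mem.Allocator, file_count: usize, entity_limit: u64) !void {
    var pool: std.Thread.Pool = undefined;
    try pool.init(.{ .allocator = allocator });
    defer pool.deinit();

    var sync_context = ThreadSyncContext.init(entity_limit, file_count);
    for (0..file_count) |_| try pool.spawn(doOneFile, .{&sync_context});
    while (!sync_context.isComplete()) std.time.sleep(10_000_000); // check every 10ms
}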
@ -30,6 +30,40 @@ const log = std.log.scoped(.fileEngine);
// TODO: Start using State at the start and end of each function for debugging
const FileEngineState = enum { Parsing, Waiting };

const ThreadSyncContext = struct {
processed_struct: std.atomic.Value(u64) = std.atomic.Value(u64).init(0),
error_file: std.atomic.Value(u64) = std.atomic.Value(u64).init(0),
completed_file: std.atomic.Value(u64) = std.atomic.Value(u64).init(0),
max_struct: u64,
max_file: u64,

fn init(max_struct: u64, max_file: u64) ThreadSyncContext {
return ThreadSyncContext{
.max_struct = max_struct,
.max_file = max_file,
};
}

fn isComplete(self: *ThreadSyncContext) bool {
return (self.completed_file.load(.acquire) + self.error_file.load(.acquire)) >= self.max_file;
}

fn completeThread(self: *ThreadSyncContext) void {
_ = self.completed_file.fetchAdd(1, .release);
}

fn incrementAndCheckStructLimit(self: *ThreadSyncContext) bool {
if (self.max_struct == 0) return false;
const new_count = self.processed_struct.fetchAdd(1, .monotonic);
return new_count >= self.max_struct;
}

fn logError(self: *ThreadSyncContext, message: []const u8, err: anyerror) void {
log.err("{s}: {any}", .{ message, err });
_ = self.error_file.fetchAdd(1, .acquire);
}
};

/// Manage everything that is related to reading or writing files
/// Or even get stats, whatever. If it touches files, it's here
pub const FileEngine = struct {
@ -190,24 +224,104 @@ pub const FileEngine = struct {
return count;
}

/// Use a struct name to populate a list with all UUID of this struct
/// TODO: Use a radix trie or something in that style to keep UUID and file position in memory
/// TODO: Multi thread that too
pub fn getAllUUIDList(self: *FileEngine, struct_name: []const u8, uuid_list: *std.ArrayList(UUID)) ZipponError!void {
/// Use a struct name and filter to populate a map with all UUID bytes as key and void as value
pub fn populateUUIDMap(
self: *FileEngine,
struct_name: []const u8,
filter: ?Filter,
map: *std.AutoHashMap([16]u8, void),
additional_data: *AdditionalData,
) ZipponError!void {
const sstruct = try self.schema_engine.structName2SchemaStruct(struct_name);
const max_file_index = try self.maxFileIndex(sstruct.name);

const dir = try utils.printOpenDir("{s}/DATA/{s}", .{ self.path_to_ZipponDB_dir, sstruct.name }, .{});

for (0..(max_file_index + 1)) |i| {
const path_buff = std.fmt.allocPrint(self.allocator, "{d}.zid", .{i}) catch return FileEngineError.MemoryError;
defer self.allocator.free(path_buff);
// Multi-threading setup
var arena = std.heap.ThreadSafeAllocator{
.child_allocator = self.allocator,
};

var iter = zid.DataIterator.init(self.allocator, path_buff, dir, sstruct.zid_schema) catch return FileEngineError.ZipponDataError;
defer iter.deinit();
var pool: std.Thread.Pool = undefined;
defer pool.deinit();
pool.init(std.Thread.Pool.Options{
.allocator = arena.allocator(),
.n_jobs = CPU_CORE,
}) catch return ZipponError.ThreadError;

while (iter.next() catch return FileEngineError.ZipponDataError) |row| uuid_list.append(UUID{ .bytes = row[0].UUID }) catch return FileEngineError.MemoryError;
var sync_context = ThreadSyncContext.init(
additional_data.entity_count_to_find,
max_file_index + 1,
);

// Create a thread-safe writer for each file
var thread_writer_list = self.allocator.alloc(std.ArrayList([16]u8), max_file_index + 1) catch return FileEngineError.MemoryError;
defer {
for (thread_writer_list) |list| list.deinit();
self.allocator.free(thread_writer_list);
}

for (thread_writer_list) |*list| {
list.* = std.ArrayList([16]u8).init(self.allocator);
}

// Spawn threads for each file
for (0..(max_file_index + 1)) |file_index| {
pool.spawn(populateUUIDMapOneFile, .{
sstruct,
filter,
&thread_writer_list[file_index],
file_index,
dir,
&sync_context,
}) catch return FileEngineError.ThreadError;
}

// Wait for all threads to complete
while (!sync_context.isComplete()) {
std.time.sleep(10_000_000);
}

// Combine results
for (thread_writer_list) |list| {
for (list.items) |uuid| map.put(uuid, {}) catch return ZipponError.MemoryError;
}
}

fn populateUUIDMapOneFile(
sstruct: SchemaStruct,
filter: ?Filter,
list: *std.ArrayList([16]u8),
file_index: u64,
dir: std.fs.Dir,
sync_context: *ThreadSyncContext,
) void {
var data_buffer: [BUFFER_SIZE]u8 = undefined;
var fa = std.heap.FixedBufferAllocator.init(&data_buffer);
defer fa.reset();
const allocator = fa.allocator();

var path_buffer: [128]u8 = undefined;
const path = std.fmt.bufPrint(&path_buffer, "{d}.zid", .{file_index}) catch |err| {
sync_context.logError("Error creating file path", err);
return;
};

var iter = zid.DataIterator.init(allocator, path, dir, sstruct.zid_schema) catch |err| {
sync_context.logError("Error initializing DataIterator", err);
return;
};
defer iter.deinit();

while (iter.next() catch return) |row| {
if (filter == null or filter.?.evaluate(row)) {
list.*.append(row[0].UUID) catch return;

if (sync_context.incrementAndCheckStructLimit()) break;
}
}

_ = sync_context.completeThread();
}

/// Take a filter, parse all files and, if a struct is validated by the filter, write it in JSON format to the writer
@ -233,9 +347,21 @@ pub const FileEngine = struct {
const dir = try utils.printOpenDir("{s}/DATA/{s}", .{ self.path_to_ZipponDB_dir, sstruct.name }, .{ .access_sub_paths = false });

// Multi thread stuffs
var total_entity_found: U64 = U64.init(0);
var ended_count: U64 = U64.init(0);
var error_count: U64 = U64.init(0);
var arena = std.heap.ThreadSafeAllocator{
.child_allocator = self.allocator,
};

var pool: std.Thread.Pool = undefined;
defer pool.deinit();
pool.init(std.Thread.Pool.Options{
.allocator = arena.allocator(),
.n_jobs = CPU_CORE,
}) catch return ZipponError.ThreadError;

var sync_context = ThreadSyncContext.init(
additional_data.entity_count_to_find,
max_file_index + 1,
);

// Use one array and writer per thread, otherwise they create errors by writing at the same time
// Maybe use a fixed-length buffer for speed here
@ -245,19 +371,6 @@ pub const FileEngine = struct {
self.allocator.free(thread_writer_list);
}

var thread_safe_arena: std.heap.ThreadSafeAllocator = .{
.child_allocator = self.allocator,
};
const arena = thread_safe_arena.allocator();

// TODO: Put that in the db engine, so I dont need to init the Pool every time
var thread_pool: Pool = undefined;
thread_pool.init(Pool.Options{
.allocator = arena, // this is an arena allocator from `std.heap.ArenaAllocator`
.n_jobs = CPU_CORE, // optional, by default the number of CPUs available
}) catch return FileEngineError.ThreadError;
defer thread_pool.deinit();

// Maybe do one buffer per files ?
var data_buffer: [BUFFER_SIZE]u8 = undefined;
var fa = std.heap.FixedBufferAllocator.init(&data_buffer);
@ -268,7 +381,7 @@ pub const FileEngine = struct {
for (0..(max_file_index + 1)) |file_index| {
thread_writer_list[file_index] = std.ArrayList(u8).init(allocator);

thread_pool.spawn(parseEntitiesOneFile, .{
pool.spawn(parseEntitiesOneFile, .{
thread_writer_list[file_index].writer(),
file_index,
dir,
@ -276,19 +389,15 @@ pub const FileEngine = struct {
filter,
additional_data,
try self.schema_engine.structName2DataType(struct_name),
&total_entity_found,
&ended_count,
&error_count,
&sync_context,
}) catch return FileEngineError.ThreadError;
}

// Wait for all thread to either finish or return an error
while ((ended_count.load(.acquire) + error_count.load(.acquire)) < max_file_index + 1) {
while (!sync_context.isComplete()) {
std.time.sleep(10_000_000); // Check every 10ms
}

if (error_count.load(.acquire) > 0) log.warn("Thread ended with an error {d}", .{error_count.load(.acquire)});

// Append all writer to each other
writer.writeByte('[') catch return FileEngineError.WriteError;
for (thread_writer_list) |list| writer.writeAll(list.items) catch return FileEngineError.WriteError;
@ -303,10 +412,9 @@ pub const FileEngine = struct {
filter: ?Filter,
additional_data: *AdditionalData,
data_types: []const DataType,
total_entity_found: *U64,
ended_count: *U64,
error_count: *U64,
sync_context: *ThreadSyncContext,
) void {
log.debug("{any}\n", .{@TypeOf(writer)});
var data_buffer: [BUFFER_SIZE]u8 = undefined;
var fa = std.heap.FixedBufferAllocator.init(&data_buffer);
defer fa.reset();
@ -314,30 +422,34 @@ pub const FileEngine = struct {

var path_buffer: [16]u8 = undefined;
const path = std.fmt.bufPrint(&path_buffer, "{d}.zid", .{file_index}) catch |err| {
logErrorAndIncrementCount("Error creating file path", err, error_count);
sync_context.logError("Error creating file path", err);
return;
};

var iter = zid.DataIterator.init(allocator, path, dir, zid_schema) catch |err| {
logErrorAndIncrementCount("Error initializing DataIterator", err, error_count);
sync_context.logError("Error initializing DataIterator", err);
return;
};

while (iter.next() catch |err| {
logErrorAndIncrementCount("Error in iter next", err, error_count);
sync_context.logError("Error in iter next", err);
return;
}) |row| {
if (filter) |f| if (!f.evaluate(row)) continue;

if (writeEntity(writer, row, additional_data, data_types)) |_| {
if (incrementAndCheckLimit(total_entity_found, additional_data.entity_count_to_find)) break;
} else |err| {
logErrorAndIncrementCount("Error writing entity", err, error_count);
writeEntity(
writer,
row,
additional_data,
data_types,
) catch |err| {
sync_context.logError("Error writing entity", err);
return;
}
};
if (sync_context.incrementAndCheckStructLimit()) break;
}

_ = ended_count.fetchAdd(1, .acquire);
_ = sync_context.completeThread();
}

fn writeEntity(
@ -453,21 +565,21 @@ pub const FileEngine = struct {
const dir = try utils.printOpenDir("{s}/DATA/{s}", .{ self.path_to_ZipponDB_dir, sstruct.name }, .{});

// Multi-threading setup
var total_entity_updated = U64.init(0);
var ended_count = U64.init(0);
var error_count = U64.init(0);

var thread_safe_arena: std.heap.ThreadSafeAllocator = .{
var arena = std.heap.ThreadSafeAllocator{
.child_allocator = self.allocator,
};
const arena = thread_safe_arena.allocator();

var thread_pool: Pool = undefined;
thread_pool.init(Pool.Options{
.allocator = arena,
var pool: std.Thread.Pool = undefined;
defer pool.deinit();
pool.init(std.Thread.Pool.Options{
.allocator = arena.allocator(),
.n_jobs = CPU_CORE,
}) catch return FileEngineError.ThreadError;
defer thread_pool.deinit();
}) catch return ZipponError.ThreadError;

var sync_context = ThreadSyncContext.init(
additional_data.entity_count_to_find,
max_file_index + 1,
);

// Create a thread-safe writer for each file
var thread_writer_list = self.allocator.alloc(std.ArrayList(u8), max_file_index + 1) catch return FileEngineError.MemoryError;
@ -480,9 +592,10 @@ pub const FileEngine = struct {
list.* = std.ArrayList(u8).init(self.allocator);
}

var data_arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
defer data_arena.deinit();
const data_allocator = data_arena.allocator();
var data_buffer: [BUFFER_SIZE]u8 = undefined;
var fa = std.heap.FixedBufferAllocator.init(&data_buffer);
defer fa.reset();
const data_allocator = fa.allocator();

var new_data_buff = data_allocator.alloc(zid.Data, sstruct.members.len) catch return ZipponError.MemoryError;

@ -496,23 +609,20 @@ pub const FileEngine = struct {

// Spawn threads for each file
for (0..(max_file_index + 1)) |file_index| {
thread_pool.spawn(updateEntitiesOneFile, .{
pool.spawn(updateEntitiesOneFile, .{
new_data_buff,
sstruct,
filter,
&map,
thread_writer_list[file_index].writer(),
additional_data,
file_index,
dir,
&total_entity_updated,
&ended_count,
&error_count,
&sync_context,
}) catch return FileEngineError.ThreadError;
}

// Wait for all threads to complete
while ((ended_count.load(.acquire) + error_count.load(.acquire)) < max_file_index + 1) {
while (!sync_context.isComplete()) {
std.time.sleep(10_000_000); // Check every 10ms
}

@ -530,13 +640,11 @@ pub const FileEngine = struct {
filter: ?Filter,
map: *const std.StringHashMap([]const u8),
writer: anytype,
additional_data: *AdditionalData,
file_index: u64,
dir: std.fs.Dir,
total_entity_updated: *U64,
ended_count: *U64,
error_count: *U64,
sync_context: *ThreadSyncContext,
) void {
log.debug("{any}\n", .{@TypeOf(writer)});
var data_buffer: [BUFFER_SIZE]u8 = undefined;
var fa = std.heap.FixedBufferAllocator.init(&data_buffer);
defer fa.reset();
@ -544,34 +652,39 @@ pub const FileEngine = struct {

var path_buffer: [128]u8 = undefined;
const path = std.fmt.bufPrint(&path_buffer, "{d}.zid", .{file_index}) catch |err| {
logErrorAndIncrementCount("Error creating file path", err, error_count);
sync_context.logError("Error creating file path", err);
return;
};

var iter = zid.DataIterator.init(allocator, path, dir, sstruct.zid_schema) catch |err| {
logErrorAndIncrementCount("Error initializing DataIterator", err, error_count);
sync_context.logError("Error initializing DataIterator", err);
return;
};
defer iter.deinit();

const new_path = std.fmt.allocPrint(allocator, "{d}.zid.new", .{file_index}) catch |err| {
logErrorAndIncrementCount("Error creating new file path", err, error_count);
sync_context.logError("Error creating new file path", err);
return;
};
defer allocator.free(new_path);

zid.createFile(new_path, dir) catch |err| {
logErrorAndIncrementCount("Error creating new file", err, error_count);
sync_context.logError("Error creating new file", err);
return;
};

var new_writer = zid.DataWriter.init(new_path, dir) catch |err| {
logErrorAndIncrementCount("Error initializing DataWriter", err, error_count);
sync_context.logError("Error initializing DataWriter", err);
zid.deleteFile(new_path, dir) catch {};
return;
};
defer new_writer.deinit();

while (iter.next() catch return) |row| {
while (iter.next() catch |err| {
sync_context.logError("Error initializing DataWriter", err);
zid.deleteFile(new_path, dir) catch {};
return;
}) |row| {
if (filter == null or filter.?.evaluate(row)) {
// Add the unchanged Data in the new_data_buff
new_data_buff[0] = row[0];
@ -580,40 +693,57 @@ pub const FileEngine = struct {
new_data_buff[i] = row[i];
}

log.debug("{d} {any}\n\n", .{ new_data_buff.len, new_data_buff });

new_writer.write(new_data_buff) catch |err| {
logErrorAndIncrementCount("Error writing new data", err, error_count);
sync_context.logError("Error initializing DataWriter", err);
zid.deleteFile(new_path, dir) catch {};
return;
};

writer.writeByte('{') catch return;
writer.print("\"{s}\"", .{UUID.format_bytes(row[0].UUID)}) catch return;
writer.writeAll("},") catch return;

if (incrementAndCheckLimit(total_entity_updated, additional_data.entity_count_to_find)) break;
// FIXME: This get an error and I have no idea why
//writer.writeByte('{') catch |err| {
// sync_context.logError("Error initializing DataWriter", err);
// zid.deleteFile(new_path, dir) catch {};
// return;
//};
//writer.print("\"{s}\"", .{UUID.format_bytes(row[0].UUID)}) catch |err| {
// sync_context.logError("Error initializing DataWriter", err);
// zid.deleteFile(new_path, dir) catch {};
// return;
//};
//writer.writeAll("},") catch |err| {
// sync_context.logError("Error initializing DataWriter", err);
// zid.deleteFile(new_path, dir) catch {};
// return;
//};
if (sync_context.incrementAndCheckStructLimit()) break;
} else {
new_writer.write(row) catch |err| {
logErrorAndIncrementCount("Error writing unchanged data", err, error_count);
sync_context.logError("Error initializing DataWriter", err);
zid.deleteFile(new_path, dir) catch {};
return;
};
}
}

new_writer.flush() catch |err| {
logErrorAndIncrementCount("Error flushing new writer", err, error_count);
sync_context.logError("Error initializing DataWriter", err);
zid.deleteFile(new_path, dir) catch {};
return;
};

dir.deleteFile(path) catch |err| {
logErrorAndIncrementCount("Error deleting old file", err, error_count);
sync_context.logError("Error deleting old file", err);
return;
};

dir.rename(new_path, path) catch |err| {
logErrorAndIncrementCount("Error renaming new file", err, error_count);
sync_context.logError("Error initializing DataWriter", err);
return;
};

_ = ended_count.fetchAdd(1, .acquire);
_ = sync_context.completeThread();
}

/// Will delete all entities based on the filter. Will also write a JSON-formatted list of all deleted UUIDs into the buffer
@ -630,21 +760,21 @@ pub const FileEngine = struct {
const dir = try utils.printOpenDir("{s}/DATA/{s}", .{ self.path_to_ZipponDB_dir, sstruct.name }, .{});

// Multi-threading setup
var total_entity_deleted = U64.init(0);
var ended_count = U64.init(0);
var error_count = U64.init(0);

var thread_safe_arena: std.heap.ThreadSafeAllocator = .{
var arena = std.heap.ThreadSafeAllocator{
.child_allocator = self.allocator,
};
const arena = thread_safe_arena.allocator();

var thread_pool: Pool = undefined;
thread_pool.init(Pool.Options{
.allocator = arena,
var pool: std.Thread.Pool = undefined;
defer pool.deinit();
pool.init(std.Thread.Pool.Options{
.allocator = arena.allocator(),
.n_jobs = CPU_CORE,
}) catch return FileEngineError.ThreadError;
defer thread_pool.deinit();
}) catch return ZipponError.ThreadError;

var sync_context = ThreadSyncContext.init(
additional_data.entity_count_to_find,
max_file_index + 1,
);

// Create a thread-safe writer for each file
var thread_writer_list = self.allocator.alloc(std.ArrayList(u8), max_file_index + 1) catch return FileEngineError.MemoryError;
@ -659,21 +789,18 @@ pub const FileEngine = struct {

// Spawn threads for each file
for (0..(max_file_index + 1)) |file_index| {
thread_pool.spawn(deleteEntitiesOneFile, .{
pool.spawn(deleteEntitiesOneFile, .{
sstruct,
filter,
thread_writer_list[file_index].writer(),
additional_data,
file_index,
dir,
&total_entity_deleted,
&ended_count,
&error_count,
&sync_context,
}) catch return FileEngineError.ThreadError;
}

// Wait for all threads to complete
while ((ended_count.load(.acquire) + error_count.load(.acquire)) < max_file_index + 1) {
while (!sync_context.isComplete()) {
std.time.sleep(10_000_000); // Check every 10ms
}

@ -689,12 +816,9 @@ pub const FileEngine = struct {
sstruct: SchemaStruct,
filter: ?Filter,
writer: anytype,
additional_data: *AdditionalData,
file_index: u64,
dir: std.fs.Dir,
total_entity_deleted: *U64,
ended_count: *U64,
error_count: *U64,
sync_context: *ThreadSyncContext,
) void {
var data_buffer: [BUFFER_SIZE]u8 = undefined;
var fa = std.heap.FixedBufferAllocator.init(&data_buffer);
@ -703,29 +827,29 @@ pub const FileEngine = struct {

var path_buffer: [128]u8 = undefined;
const path = std.fmt.bufPrint(&path_buffer, "{d}.zid", .{file_index}) catch |err| {
logErrorAndIncrementCount("Error creating file path", err, error_count);
sync_context.logError("Error creating file path", err);
return;
};

var iter = zid.DataIterator.init(allocator, path, dir, sstruct.zid_schema) catch |err| {
logErrorAndIncrementCount("Error initializing DataIterator", err, error_count);
sync_context.logError("Error initializing DataIterator", err);
return;
};
defer iter.deinit();

const new_path = std.fmt.allocPrint(allocator, "{d}.zid.new", .{file_index}) catch |err| {
logErrorAndIncrementCount("Error creating new file path", err, error_count);
sync_context.logError("Error creating new file path", err);
return;
};
defer allocator.free(new_path);

zid.createFile(new_path, dir) catch |err| {
logErrorAndIncrementCount("Error creating new file", err, error_count);
sync_context.logError("Error creating new file", err);
return;
};

var new_writer = zid.DataWriter.init(new_path, dir) catch |err| {
logErrorAndIncrementCount("Error initializing DataWriter", err, error_count);
sync_context.logError("Error initializing DataWriter", err);
return;
};
defer new_writer.deinit();
@ -736,43 +860,31 @@ pub const FileEngine = struct {
writer.print("\"{s}\"", .{UUID.format_bytes(row[0].UUID)}) catch return;
writer.writeAll("},") catch return;

if (incrementAndCheckLimit(total_entity_deleted, additional_data.entity_count_to_find)) break;
if (sync_context.incrementAndCheckStructLimit()) break;
} else {
new_writer.write(row) catch |err| {
logErrorAndIncrementCount("Error writing unchanged data", err, error_count);
sync_context.logError("Error writing unchanged data", err);
return;
};
}
}

new_writer.flush() catch |err| {
logErrorAndIncrementCount("Error flushing new writer", err, error_count);
sync_context.logError("Error flushing new writer", err);
return;
};

dir.deleteFile(path) catch |err| {
logErrorAndIncrementCount("Error deleting old file", err, error_count);
sync_context.logError("Error deleting old file", err);
return;
};

dir.rename(new_path, path) catch |err| {
logErrorAndIncrementCount("Error renaming new file", err, error_count);
sync_context.logError("Error renaming new file", err);
return;
};

_ = ended_count.fetchAdd(1, .acquire);
}

// --------------------Shared multi threading methods--------------------

fn incrementAndCheckLimit(counter: *U64, limit: u64) bool {
const new_count = counter.fetchAdd(1, .monotonic) + 1;
return limit != 0 and new_count >= limit;
}

fn logErrorAndIncrementCount(message: []const u8, err: anyerror, error_count: *U64) void {
log.err("{s}: {any}", .{ message, err });
_ = error_count.fetchAdd(1, .acquire);
_ = sync_context.completeThread();
}

// --------------------ZipponData utils--------------------
@ -880,6 +992,7 @@ pub const FileEngine = struct {
const file_stat = member_dir.statFile(entry.name) catch return FileEngineError.FileStatError;
if (file_stat.size < MAX_FILE_SIZE) {
// Can't I just return i? Files are supposed to be ordered. I think I already checked and they are not
std.debug.print("{s}\n\n", .{entry.name[0..(entry.name.len - 4)]});
return std.fmt.parseInt(usize, entry.name[0..(entry.name.len - 4)], 10) catch return FileEngineError.InvalidFileIndex; // INFO: Hardcoded len of file extension
}
}