I think I just added multithreading for GRAB :o

This looks like it works. Maybe I need to start using a single Pool for
everything, perhaps keeping it directly inside the FileEngine
This commit is contained in:
Adrien Bouvais 2024-11-02 23:47:41 +01:00
parent dba73ce113
commit 72b001b72c
3 changed files with 96 additions and 43 deletions
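A rough sketch of that single-Pool idea (hypothetical, not part of this commit): the FileEngine could own one Pool, initialized once and reused by every query:

const std = @import("std");
const Pool = std.Thread.Pool;

pub const FileEngine = struct {
    allocator: std.mem.Allocator,
    thread_pool: Pool, // one shared pool instead of one per parseEntities call

    // Initialize in place: the pool's worker threads keep pointers into it,
    // so the struct must not be moved or copied after init.
    pub fn init(self: *FileEngine, allocator: std.mem.Allocator, n_jobs: ?u32) !void {
        self.allocator = allocator;
        try self.thread_pool.init(.{ .allocator = allocator, .n_jobs = n_jobs });
    }

    pub fn deinit(self: *FileEngine) void {
        self.thread_pool.deinit();
    }
};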


@@ -1,6 +1,7 @@
pub const BUFFER_SIZE = 1024 * 64 * 64; // Buffer size used when parsing files and for other buffers
pub const MAX_FILE_SIZE = 5e+6; // 5 MB
pub const CSV_DELIMITER = ';'; // TODO: Delete
pub const CPU_CORE = 6;
// Testing
pub const TEST_DATA_DIR = "data"; // Maybe put that directly in the build
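CPU_CORE is hard-coded to 6 here; a runtime alternative (an assumption, not something this commit does) could ask the OS instead:

const std = @import("std");

// Sketch: use the actual core count, falling back to the configured default.
fn cpuCores() u32 {
    const n = std.Thread.getCpuCount() catch return 6;
    return @intCast(n);
}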


@@ -4,6 +4,8 @@ const dtype = @import("dtype");
const s2t = dtype.s2t;
const zid = @import("ZipponData");
const time = std.time;
const U64 = std.atomic.Value(u64);
const Pool = std.Thread.Pool;
const Allocator = std.mem.Allocator;
const UUID = dtype.UUID;
@@ -31,6 +33,7 @@ const BUFFER_SIZE = config.BUFFER_SIZE;
const MAX_FILE_SIZE = config.MAX_FILE_SIZE;
const CSV_DELIMITER = config.CSV_DELIMITER;
const RESET_LOG_AT_RESTART = config.RESET_LOG_AT_RESTART;
const CPU_CORE = config.CPU_CORE;
const log = std.log.scoped(.fileEngine);
@@ -303,7 +306,6 @@ pub const FileEngine = struct {
) FileEngineError!void {
const sstruct = try self.structName2SchemaStruct(struct_name);
const max_file_index = try self.maxFileIndex(sstruct.name);
var total_currently_found: usize = 0;
var path_buff = std.fmt.allocPrint(
self.allocator,
@@ -318,58 +320,107 @@ pub const FileEngine = struct {
additional_data.populateWithEverything(self.allocator, sstruct.members) catch return FileEngineError.MemoryError;
}
var data_buffer: [BUFFER_SIZE]u8 = undefined;
var fa = std.heap.FixedBufferAllocator.init(&data_buffer);
defer fa.reset();
const data_allocator = fa.allocator();
// Multithreading stuff
var total_entity_found: U64 = U64.init(0);
var finished_count: U64 = U64.init(0);
var single_threaded_arena = std.heap.ArenaAllocator.init(self.allocator);
defer single_threaded_arena.deinit();
var thread_safe_arena: std.heap.ThreadSafeAllocator = .{
.child_allocator = single_threaded_arena.allocator(),
};
const arena = thread_safe_arena.allocator();
// INFO: Pool jobs can't return errors so far :/ That is annoying. Maybe I can write something similar myself that handles errors
var thread_pool: Pool = undefined;
thread_pool.init(Pool.Options{
.allocator = arena, // thread-safe allocator wrapping the arena above
.n_jobs = CPU_CORE, // optional, by default the number of CPUs available
}) catch return FileEngineError.ThreadError;
defer thread_pool.deinit();
writer.writeAll("[") catch return FileEngineError.WriteError;
for (0..(max_file_index + 1)) |file_index| {
self.allocator.free(path_buff);
path_buff = std.fmt.allocPrint(self.allocator, "{d}.zid", .{file_index}) catch return FileEngineError.MemoryError;
fa.reset();
var iter = zid.DataIterator.init(data_allocator, path_buff, dir, sstruct.zid_schema) catch return FileEngineError.ZipponDataError;
defer iter.deinit();
blk: while (iter.next() catch return FileEngineError.ZipponDataError) |row| {
if (filter != null) if (!filter.?.evaluate(row)) continue;
writer.writeByte('{') catch return FileEngineError.WriteError;
for (additional_data.member_to_find.items) |member| {
// Write the member name and a colon separator
writer.print("{s}: ", .{member.name}) catch return FileEngineError.WriteError;
switch (row[member.index]) {
.Int => |v| writer.print("{d}", .{v}) catch return FileEngineError.WriteError,
.Float => |v| writer.print("{d}", .{v}) catch return FileEngineError.WriteError,
.Str => |v| writer.print("\"{s}\"", .{v}) catch return FileEngineError.WriteError,
.UUID => |v| writer.print("\"{s}\"", .{UUID.format_bytes(v)}) catch return FileEngineError.WriteError,
.Bool => |v| writer.print("{any}", .{v}) catch return FileEngineError.WriteError,
.Unix => |v| {
const datetime = DateTime.initUnix(v);
writer.writeByte('"') catch return FileEngineError.WriteError;
switch (try self.memberName2DataType(struct_name, member.name)) {
.date => datetime.format("YYYY/MM/DD", writer) catch return FileEngineError.WriteError,
.time => datetime.format("HH:mm:ss.SSSS", writer) catch return FileEngineError.WriteError,
.datetime => datetime.format("YYYY/MM/DD-HH:mm:ss.SSSS", writer) catch return FileEngineError.WriteError,
else => unreachable,
}
writer.writeByte('"') catch return FileEngineError.WriteError;
},
.IntArray, .FloatArray, .StrArray, .UUIDArray, .BoolArray => try writeArray(&row[member.index], writer, null),
.UnixArray => try writeArray(&row[member.index], writer, try self.memberName2DataType(struct_name, member.name)),
}
writer.writeAll(", ") catch return FileEngineError.WriteError;
}
writer.writeAll("}, ") catch return FileEngineError.WriteError;
total_currently_found += 1;
if (additional_data.entity_count_to_find != 0 and total_currently_found >= additional_data.entity_count_to_find) break :blk;
}
thread_pool.spawn(parseEntitiesOneFile, .{
writer,
path_buff,
dir,
sstruct.zid_schema,
filter,
additional_data,
try self.structName2DataType(struct_name),
&total_entity_found,
&finished_count,
}) catch return FileEngineError.ThreadError;
}
while (finished_count.load(.acquire) < max_file_index + 1) { // one job was spawned per file index, 0..max_file_index inclusive
std.time.sleep(10_000_000); // Check every 10ms
}
writer.writeAll("]") catch return FileEngineError.WriteError;
}
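As a hedged alternative to the sleep-polling above, std.Thread.WaitGroup could block until every job is done; a minimal sketch assuming the 0.13-era std API (not what this commit uses):

const std = @import("std");

// Sketch: wait for all spawned jobs without polling (hypothetical refactor).
fn waitForJobs(pool: *std.Thread.Pool, file_count: usize) void {
    var wg: std.Thread.WaitGroup = .{};
    for (0..file_count) |_| {
        wg.start(); // register one pending job before spawning it
        pool.spawn(oneJob, .{&wg}) catch wg.finish(); // undo the count if spawn fails
    }
    wg.wait(); // blocks until every job has called finish()
}

fn oneJob(wg: *std.Thread.WaitGroup) void {
    defer wg.finish(); // signal completion even on early returns
    // per-file parsing would go here
}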
// TODO: Add a bufferedWriter for performance and to prevent writing to the real writer if an error happens
fn parseEntitiesOneFile(
writer: anytype,
path: []const u8,
dir: std.fs.Dir,
zid_schema: []zid.DType,
filter: ?Filter,
additional_data: *AdditionalData,
data_types: []const DataType,
total_entity_found: *U64,
finished_count: *U64,
) void {
// Signal completion even on early error returns; without this the caller's
// wait loop would spin forever. Release pairs with the acquire load there.
defer _ = finished_count.fetchAdd(1, .release);
var data_buffer: [BUFFER_SIZE]u8 = undefined;
var fa = std.heap.FixedBufferAllocator.init(&data_buffer);
defer fa.reset();
const allocator = fa.allocator();
var iter = zid.DataIterator.init(allocator, path, dir, zid_schema) catch return;
defer iter.deinit();
blk: while (iter.next() catch return) |row| {
if (filter != null) if (!filter.?.evaluate(row)) continue;
writer.writeByte('{') catch return;
for (additional_data.member_to_find.items) |member| {
// Write the member name and a colon separator
writer.print("{s}: ", .{member.name}) catch return;
switch (row[member.index]) {
.Int => |v| writer.print("{d}", .{v}) catch return,
.Float => |v| writer.print("{d}", .{v}) catch return,
.Str => |v| writer.print("\"{s}\"", .{v}) catch return,
.UUID => |v| writer.print("\"{s}\"", .{UUID.format_bytes(v)}) catch return,
.Bool => |v| writer.print("{any}", .{v}) catch return,
.Unix => |v| {
const datetime = DateTime.initUnix(v);
writer.writeByte('"') catch return;
switch (data_types[member.index - 1]) {
.date => datetime.format("YYYY/MM/DD", writer) catch return,
.time => datetime.format("HH:mm:ss.SSSS", writer) catch return,
.datetime => datetime.format("YYYY/MM/DD-HH:mm:ss.SSSS", writer) catch return,
else => unreachable,
}
writer.writeByte('"') catch return;
},
.IntArray, .FloatArray, .StrArray, .UUIDArray, .BoolArray => writeArray(&row[member.index], writer, null) catch return,
.UnixArray => writeArray(&row[member.index], writer, data_types[member.index - 1]) catch return, // same offset as the .Unix case above (assuming data_types skips the id column)
}
writer.writeAll(", ") catch return;
}
writer.writeAll("}, ") catch return;
const count = total_entity_found.fetchAdd(1, .monotonic) + 1; // reuse the returned value instead of a second racy load
if (additional_data.entity_count_to_find != 0 and count >= additional_data.entity_count_to_find) break :blk; // the limit is only approximate across files
}
}
fn writeArray(data: *zid.Data, writer: anytype, datatype: ?DataType) FileEngineError!void {
writer.writeByte('[') catch return FileEngineError.WriteError;
var iter = zid.ArrayIterator.init(data) catch return FileEngineError.ZipponDataError;
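One thing the spawned version leaves open: every job writes to the same writer with no synchronization, so entities from different files can interleave mid-object. A minimal sketch of serializing that, using a hypothetical LockedWriter helper (not in this commit):

const std = @import("std");

// Hypothetical helper: wrap any writer in a mutex so concurrent jobs
// cannot interleave their output.
fn LockedWriter(comptime W: type) type {
    return struct {
        inner: W,
        mutex: std.Thread.Mutex = .{},

        pub fn writeAll(self: *@This(), bytes: []const u8) !void {
            self.mutex.lock();
            defer self.mutex.unlock();
            try self.inner.writeAll(bytes);
        }
    };
}

Combined with the bufferedWriter TODO above, each job could format a complete "{...}, " chunk into a local buffer and flush it with one locked writeAll, keeping each object atomic in the output.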


@@ -44,6 +44,7 @@ pub const FileEngineError = error{
MemberNotFound,
ZipponDataError,
AllocEncodError,
ThreadError,
};
pub const ZipponError = ZiQlParserError || FileEngineError || SchemaParserError;