From 72b001b72ca9e32e513872bed5d9a3e61c69d1d3 Mon Sep 17 00:00:00 2001
From: MrBounty
Date: Sat, 2 Nov 2024 23:47:41 +0100
Subject: [PATCH] I think I just added multithreading for GRAB :o

This looks like it works. Maybe I need to start using a single Pool for
everything, perhaps keeping it directly inside the FileEngine.
---
 src/config.zig        |   1 +
 src/fileEngine.zig    | 137 +++++++++++++++++++++++++++++-------------
 src/stuffs/errors.zig |   1 +
 3 files changed, 96 insertions(+), 43 deletions(-)

diff --git a/src/config.zig b/src/config.zig
index 2fc862c..2475aed 100644
--- a/src/config.zig
+++ b/src/config.zig
@@ -1,6 +1,7 @@
 pub const BUFFER_SIZE = 1024 * 64 * 64; // Line limit when parsing file and other buffers
 pub const MAX_FILE_SIZE = 5e+6; // 5Mb
 pub const CSV_DELIMITER = ';'; // TODO: Delete
+pub const CPU_CORE = 6; // Testing
 
 pub const TEST_DATA_DIR = "data"; // Maybe put that directly in the build
 
diff --git a/src/fileEngine.zig b/src/fileEngine.zig
index 5f0aac1..4240119 100644
--- a/src/fileEngine.zig
+++ b/src/fileEngine.zig
@@ -4,6 +4,8 @@ const dtype = @import("dtype");
 const s2t = dtype.s2t;
 const zid = @import("ZipponData");
 const time = std.time;
+const U64 = std.atomic.Value(u64);
+const Pool = std.Thread.Pool;
 
 const Allocator = std.mem.Allocator;
 const UUID = dtype.UUID;
@@ -31,6 +33,7 @@ const BUFFER_SIZE = config.BUFFER_SIZE;
 const MAX_FILE_SIZE = config.MAX_FILE_SIZE;
 const CSV_DELIMITER = config.CSV_DELIMITER;
 const RESET_LOG_AT_RESTART = config.RESET_LOG_AT_RESTART;
+const CPU_CORE = config.CPU_CORE;
 
 const log = std.log.scoped(.fileEngine);
 
@@ -303,7 +306,6 @@ pub const FileEngine = struct {
     ) FileEngineError!void {
         const sstruct = try self.structName2SchemaStruct(struct_name);
         const max_file_index = try self.maxFileIndex(sstruct.name);
-        var total_currently_found: usize = 0;
 
         var path_buff = std.fmt.allocPrint(
             self.allocator,
@@ -318,58 +320,107 @@
             additional_data.populateWithEverything(self.allocator, sstruct.members) catch return FileEngineError.MemoryError;
         }
 
-        var data_buffer: [BUFFER_SIZE]u8 = undefined;
-        var fa = std.heap.FixedBufferAllocator.init(&data_buffer);
-        defer fa.reset();
-        const data_allocator = fa.allocator();
+        // Multi-threading stuff
+        var total_entity_found: U64 = U64.init(0);
+        var finished_count: U64 = U64.init(0);
+        var single_threaded_arena = std.heap.ArenaAllocator.init(self.allocator);
+        defer single_threaded_arena.deinit();
+
+        var thread_safe_arena: std.heap.ThreadSafeAllocator = .{
+            .child_allocator = single_threaded_arena.allocator(),
+        };
+        const arena = thread_safe_arena.allocator();
+
+        // INFO: Pool can't return an error so far :/ That is annoying. Maybe I can do something similar myself that handles errors
+        var thread_pool: Pool = undefined;
+        thread_pool.init(Pool.Options{
+            .allocator = arena, // thread-safe allocator wrapping the `std.heap.ArenaAllocator` above
+            .n_jobs = CPU_CORE, // optional, by default the number of CPUs available
+        }) catch return FileEngineError.ThreadError;
+        defer thread_pool.deinit();
 
         writer.writeAll("[") catch return FileEngineError.WriteError;
 
         for (0..(max_file_index + 1)) |file_index| { // TODO: Multi thread that
             self.allocator.free(path_buff);
             path_buff = std.fmt.allocPrint(self.allocator, "{d}.zid", .{file_index}) catch return FileEngineError.MemoryError;
-            fa.reset();
-            var iter = zid.DataIterator.init(data_allocator, path_buff, dir, sstruct.zid_schema) catch return FileEngineError.ZipponDataError;
-            defer iter.deinit();
-
-            blk: while (iter.next() catch return FileEngineError.ZipponDataError) |row| {
-                if (filter != null) if (!filter.?.evaluate(row)) continue;
-
-                writer.writeByte('{') catch return FileEngineError.WriteError;
-                for (additional_data.member_to_find.items) |member| {
-                    // write the member name and = sign
-                    writer.print("{s}: ", .{member.name}) catch return FileEngineError.WriteError;
-
-                    switch (row[member.index]) {
-                        .Int => |v| writer.print("{d}", .{v}) catch return FileEngineError.WriteError,
-                        .Float => |v| writer.print("{d}", .{v}) catch return FileEngineError.WriteError,
-                        .Str => |v| writer.print("\"{s}\"", .{v}) catch return FileEngineError.WriteError,
-                        .UUID => |v| writer.print("\"{s}\"", .{UUID.format_bytes(v)}) catch return FileEngineError.WriteError,
-                        .Bool => |v| writer.print("{any}", .{v}) catch return FileEngineError.WriteError,
-                        .Unix => |v| {
-                            const datetime = DateTime.initUnix(v);
-                            writer.writeByte('"') catch return FileEngineError.WriteError;
-                            switch (try self.memberName2DataType(struct_name, member.name)) {
-                                .date => datetime.format("YYYY/MM/DD", writer) catch return FileEngineError.WriteError,
-                                .time => datetime.format("HH:mm:ss.SSSS", writer) catch return FileEngineError.WriteError,
-                                .datetime => datetime.format("YYYY/MM/DD-HH:mm:ss.SSSS", writer) catch return FileEngineError.WriteError,
-                                else => unreachable,
-                            }
-                            writer.writeByte('"') catch return FileEngineError.WriteError;
-                        },
-                        .IntArray, .FloatArray, .StrArray, .UUIDArray, .BoolArray => try writeArray(&row[member.index], writer, null),
-                        .UnixArray => try writeArray(&row[member.index], writer, try self.memberName2DataType(struct_name, member.name)),
-                    }
-                    writer.writeAll(", ") catch return FileEngineError.WriteError;
-                }
-                writer.writeAll("}, ") catch return FileEngineError.WriteError;
-                total_currently_found += 1;
-                if (additional_data.entity_count_to_find != 0 and total_currently_found >= additional_data.entity_count_to_find) break :blk;
-            }
+            thread_pool.spawn(parseEntitiesOneFile, .{
+                writer,
+                path_buff,
+                dir,
+                sstruct.zid_schema,
+                filter,
+                additional_data,
+                try self.structName2DataType(struct_name),
+                &total_entity_found,
+                &finished_count,
+            }) catch return FileEngineError.ThreadError;
         }
+
+        while (finished_count.load(.acquire) < max_file_index + 1) {
+            std.time.sleep(10_000_000); // Check every 10ms
+        }
+
         writer.writeAll("]") catch return FileEngineError.WriteError;
     }
 
+    // TODO: Add a bufferedWriter for performance and to avoid writing to the real writer if an error happened
+    fn parseEntitiesOneFile(
+        writer: anytype,
+        path: []const u8,
+        dir: std.fs.Dir,
+        zid_schema: []zid.DType,
+        filter: ?Filter,
+        additional_data: *AdditionalData,
+        data_types: []const DataType,
+        total_entity_found: *U64,
+        finished_count: *U64,
+    ) void {
+        var data_buffer: [BUFFER_SIZE]u8 = undefined;
+        var fa = std.heap.FixedBufferAllocator.init(&data_buffer);
+        defer fa.reset();
+        const allocator = fa.allocator();
+
+        var iter = zid.DataIterator.init(allocator, path, dir, zid_schema) catch return;
+        defer iter.deinit();
+
+        blk: while (iter.next() catch return) |row| {
+            if (filter != null) if (!filter.?.evaluate(row)) continue;
+
+            writer.writeByte('{') catch return;
+            for (additional_data.member_to_find.items) |member| {
+                // write the member name and = sign
+                writer.print("{s}: ", .{member.name}) catch return;
+
+                switch (row[member.index]) {
+                    .Int => |v| writer.print("{d}", .{v}) catch return,
+                    .Float => |v| writer.print("{d}", .{v}) catch return,
+                    .Str => |v| writer.print("\"{s}\"", .{v}) catch return,
+                    .UUID => |v| writer.print("\"{s}\"", .{UUID.format_bytes(v)}) catch return,
+                    .Bool => |v| writer.print("{any}", .{v}) catch return,
+                    .Unix => |v| {
+                        const datetime = DateTime.initUnix(v);
+                        writer.writeByte('"') catch return;
+                        switch (data_types[member.index - 1]) {
+                            .date => datetime.format("YYYY/MM/DD", writer) catch return,
+                            .time => datetime.format("HH:mm:ss.SSSS", writer) catch return,
+                            .datetime => datetime.format("YYYY/MM/DD-HH:mm:ss.SSSS", writer) catch return,
+                            else => unreachable,
+                        }
+                        writer.writeByte('"') catch return;
+                    },
+                    .IntArray, .FloatArray, .StrArray, .UUIDArray, .BoolArray => writeArray(&row[member.index], writer, null) catch return,
+                    .UnixArray => writeArray(&row[member.index], writer, data_types[member.index]) catch return,
+                }
+                writer.writeAll(", ") catch return;
+            }
+            writer.writeAll("}, ") catch return;
+            _ = total_entity_found.fetchAdd(1, .monotonic);
+            if (additional_data.entity_count_to_find != 0 and total_entity_found.load(.monotonic) >= additional_data.entity_count_to_find) break :blk;
+        }
+        _ = finished_count.fetchAdd(1, .release);
+    }
+
     fn writeArray(data: *zid.Data, writer: anytype, datatype: ?DataType) FileEngineError!void {
         writer.writeByte('[') catch return FileEngineError.WriteError;
         var iter = zid.ArrayIterator.init(data) catch return FileEngineError.ZipponDataError;
diff --git a/src/stuffs/errors.zig b/src/stuffs/errors.zig
index 31cda9e..e339d4c 100644
--- a/src/stuffs/errors.zig
+++ b/src/stuffs/errors.zig
@@ -44,6 +44,7 @@ pub const FileEngineError = error{
     MemberNotFound,
     ZipponDataError,
     AllocEncodError,
+    ThreadError,
 };
 
 pub const ZipponError = ZiQlParserError || FileEngineError || SchemaParserError;
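
Note on the coordination pattern above: `std.Thread.Pool.spawn` cannot propagate worker errors, completion is detected by polling an atomic counter in a sleep loop, and every worker prints straight into the shared `writer`, so output from different files can interleave. A common alternative is to give each worker its own output buffer plus a `std.Thread.WaitGroup`, then merge the buffers and surface any captured error after the join. Below is a minimal, self-contained sketch of that pattern, assuming the Zig 0.13-era `std.Thread.Pool` / `std.Thread.WaitGroup` API; `Task`, `runTask`, and the fake per-file payload are illustrative names only, not ZipponDB code.

const std = @import("std");

// Illustrative only: one Task per data file, owning its output and its error.
const Task = struct {
    out: std.ArrayList(u8), // per-worker output, merged by the main thread afterwards
    err: ?anyerror = null, // Pool.spawn can't return errors, so capture them here
};

// Worker body: writes a fake per-file payload into its own private buffer.
fn runTask(task: *Task, file_index: usize, wg: *std.Thread.WaitGroup) void {
    defer wg.finish();
    task.out.writer().print("{{\"file\": {d}}}, ", .{file_index}) catch |e| {
        task.err = e;
    };
}

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){}; // thread-safe by default
    defer _ = gpa.deinit();
    const allocator = gpa.allocator();

    const n_files = 4; // stand-in for max_file_index + 1

    var pool: std.Thread.Pool = undefined;
    try pool.init(.{ .allocator = allocator, .n_jobs = 6 });
    defer pool.deinit();

    const tasks = try allocator.alloc(Task, n_files);
    defer allocator.free(tasks);
    for (tasks) |*t| t.* = .{ .out = std.ArrayList(u8).init(allocator) };
    defer {
        for (tasks) |*t| t.out.deinit();
    }

    var wg: std.Thread.WaitGroup = .{};
    for (tasks, 0..) |*task, file_index| {
        wg.start();
        try pool.spawn(runTask, .{ task, file_index, &wg });
    }
    pool.waitAndWork(&wg); // replaces the sleep/poll loop on an atomic counter

    // Merge in file order; no interleaving is possible since each buffer is private.
    const stdout = std.io.getStdOut().writer();
    try stdout.writeAll("[");
    for (tasks) |*task| {
        if (task.err) |e| return e; // propagate the first worker failure
        try stdout.writeAll(task.out.items);
    }
    try stdout.writeAll("]\n");
}

Because each worker owns its buffer, the JSON fragments cannot interleave; `waitAndWork` blocks until every `wg.finish()` has run (and lets the calling thread help drain the queue), so the 10ms sleep loop disappears; and a failed worker is reported through `task.err` instead of being silently dropped by `catch return`.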