diff --git a/benchmark.zig b/benchmark.zig
index 18af7a8..96ebad5 100644
--- a/benchmark.zig
+++ b/benchmark.zig
@@ -10,43 +10,81 @@
 const times = [_][]const u8{ "12:04", "20:45:11", "03:11:13", "03:00:01.0152" };
 const datetimes = [_][]const u8{ "2000/01/01-12:04", "1954/04/02-20:45:11", "1998/01/21-03:11:13", "1977/12/31-03:00:01.0153" };
 const scores = [_]i32{ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
 
-pub const std_options = .{
+pub const std_options = std.Options{
+    .log_level = .info,
     .logFn = myLog,
 };
 
+var date_buffer: [64]u8 = undefined;
+var date_fa = std.heap.FixedBufferAllocator.init(&date_buffer);
+const date_allocator = date_fa.allocator();
+
 pub fn myLog(
     comptime message_level: std.log.Level,
-    comptime scope: @Type(.EnumLiteral),
+    comptime scope: @Type(.enum_literal),
     comptime format: []const u8,
     args: anytype,
 ) void {
-    if (true) return;
-    _ = message_level;
-    _ = scope;
-    _ = format;
-    _ = args;
+    const level_txt = comptime message_level.asText();
+    const prefix = if (scope == .default) " - " else "(" ++ @tagName(scope) ++ ") - ";
+
+    const potential_file: ?std.fs.File = std.fs.cwd().openFile("benchmarkDB/LOG/log", .{ .mode = .write_only }) catch null;
+
+    if (potential_file) |file| {
+        defer file.close(); // Close on every path, including the early `catch return`s below.
+        date_fa.reset();
+        const now = @import("dtype").DateTime.now();
+        var date_format_buffer = std.ArrayList(u8).init(date_allocator);
+        defer date_format_buffer.deinit();
+        now.format("YYYY/MM/DD-HH:mm:ss.SSSS", date_format_buffer.writer()) catch return;
+
+        file.seekFromEnd(0) catch return;
+        const writer = file.writer();
+
+        writer.print("{s}{s}Time: {s} - ", .{ level_txt, prefix, date_format_buffer.items }) catch return;
+        writer.print(format, args) catch return;
+        writer.writeByte('\n') catch return;
+    }
+}
+
+pub fn main() !void {
+    var gpa = std.heap.GeneralPurposeAllocator(.{ .safety = true }){};
+    defer {
+        const deinit_status = gpa.deinit();
+        switch (deinit_status) {
+            .leak => @panic("Oupsy"),
+            .ok => {},
+        }
+    }
+
+    try benchmark(gpa.allocator());
+}
+
+test "benchmark" {
+    const allocator = std.testing.allocator;
+    try benchmark(allocator);
+}
 
 // Maybe I can make it a test to use the testing alloc
-pub fn main() !void {
-    const allocator = std.heap.page_allocator;
-    const to_test = [_]usize{ 500, 50_000, 1_000_000 };
+pub fn benchmark(allocator: std.mem.Allocator) !void {
+    const to_test = [_]usize{ 5, 50, 500, 5_000, 50_000, 500_000, 5_000_000 };
     var line_buffer: [1024 * 1024]u8 = undefined;
     for (to_test) |users_count| {
         var db_engine = DBEngine.init(allocator, "benchmarkDB", "schema/benchmark");
         defer db_engine.deinit();
 
+        // Empty db
         {
             const null_term_query_str = try std.fmt.bufPrintZ(&line_buffer, "DELETE User {{}}", .{});
             db_engine.runQuery(null_term_query_str);
         }
+
         // Populate with random dummy value
-        // Need some speed up, spended times to find that it is the parsonConditionValue that take time, the last switch to be exact, that parse str to value
         {
             std.debug.print("\n=====================================\n\n", .{});
             std.debug.print("Populating with {d} users.\n", .{users_count});
-            var prng = std.rand.DefaultPrng.init(0);
+            var prng = std.Random.DefaultPrng.init(0);
             const rng = prng.random();
             const populate_start_time = std.time.nanoTimestamp();
@@ -62,7 +100,7 @@ pub fn main() !void {
                 },
             );
 
-            for (users_count - 1) |_| {
+            for (0..users_count - 1) |_| {
                 try writer.print(
                     "('{s}', '{s}', none)",
                     .{
@@ -82,7 +120,7 @@
 
             std.debug.print("Populate duration: {d:.6} seconds\n\n", .{populate_duration});
 
-            var buffer = std.ArrayList(u8).init(std.heap.page_allocator);
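+            // Use the caller's allocator so std.testing.allocator can flag leaks when benchmark() runs as a test.
+            var buffer = std.ArrayList(u8).init(allocator);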
             defer buffer.deinit();
             try db_engine.file_engine.writeDbMetrics(&buffer);
             std.debug.print("{s}\n", .{buffer.items});
@@ -99,10 +137,11 @@ pub fn main() !void {
         // }
         //}
 
-        // Define your benchmark queries
+        // Run query
         {
             const queries = [_][]const u8{
                 "GRAB User {}",
+                "GRAB User {name='asd'}",
                 "GRAB User [1] {}",
                 "GRAB User [name] {}",
                 "GRAB User {name = 'Charlie'}",
@@ -122,8 +161,7 @@
                 std.debug.print("Query: \t\t{s}\nDuration: \t{d:.6} ms\n\n", .{ query, duration });
             }
-
-            std.debug.print("=====================================\n\n", .{});
         }
     }
+    std.debug.print("=====================================\n\n", .{});
 }
diff --git a/build.zig b/build.zig
index 4a1e75c..b9c7bae 100644
--- a/build.zig
+++ b/build.zig
@@ -103,6 +103,22 @@ pub fn build(b: *std.Build) void {
         test_step.dependOn(&run_tests6.step);
     }
 
+    // Benchmark parsing (ZipponData)
+    // -----------------------------------------------
+    {
+        const tests1 = b.addTest(.{
+            .root_source_file = b.path("lib/zid.zig"),
+            .target = target,
+            .optimize = optimize,
+            .name = "File parsing",
+            .test_runner = b.path("test_runner.zig"),
+        });
+        const run_tests1 = b.addRunArtifact(tests1);
+
+        const test_step = b.step("benchmark-parsing", "Run the benchmark of ZipponData (Single thread).");
+        test_step.dependOn(&run_tests1.step);
+    }
+
     // Benchmark
     // -----------------------------------------------
     {
@@ -152,7 +168,7 @@ pub fn build(b: *std.Build) void {
             .name = exe_name,
             .root_source_file = b.path("src/main.zig"),
             .target = tar,
-            .optimize = .ReleaseSafe,
+            .optimize = .ReleaseFast,
         });
 
         // Add the same imports as your main executable
diff --git a/lib/config.zig b/lib/config.zig
index 186aca3..8968e79 100644
--- a/lib/config.zig
+++ b/lib/config.zig
@@ -1,6 +1,6 @@
 pub const BUFFER_SIZE = 1024 * 10; // Used a bit everywhere. The size for the schema for example. 10kB
 pub const MAX_FILE_SIZE = 1024 * 1024; // 1MB
-pub const CPU_CORE = 16;
+pub const CPU_CORE = 0;
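+// Assumption: 0 means "detect the core count at runtime"; set a positive number to pin the thread count.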
 
 // Debug
 pub const PRINT_STATE = false;
diff --git a/lib/config_benchmark.zig b/lib/config_benchmark.zig
index 764bbda..0c1270c 100644
--- a/lib/config_benchmark.zig
+++ b/lib/config_benchmark.zig
@@ -1,6 +1,6 @@
 pub const BUFFER_SIZE = 1024 * 1024; // Used a bit everywhere. The size for the schema for example. 10kB
 pub const MAX_FILE_SIZE = 1024 * 1024; // 1MB
-pub const CPU_CORE = 16;
+pub const CPU_CORE = 1;
 
 // Debug
 pub const PRINT_STATE = false;
diff --git a/lib/zid.zig b/lib/zid.zig
index 7c7bc5c..4e9b308 100644
--- a/lib/zid.zig
+++ b/lib/zid.zig
@@ -745,7 +745,7 @@ test "Write and Read" {
     try std.fs.cwd().deleteDir("tmp");
 }
 
-test "Benchmark Write and Read" {
+test "Benchmark Write and Read All" {
     const schema = &[_]DType{
         .Int,
         .Float,
@@ -767,6 +767,20 @@ test "Benchmark Write and Read" {
     try benchmark(schema, data);
 }
 
+test "Benchmark Write and Read Simple User" {
+    const schema = &[_]DType{
+        .Int,
+        .Str,
+    };
+
+    const data = &[_]Data{
+        Data.initInt(1),
+        Data.initStr("Bob"),
+    };
+
+    try benchmark(schema, data);
+}
+
 fn benchmark(schema: []const DType, data: []const Data) !void {
     const allocator = std.testing.allocator;
     const sizes = [_]usize{ 1, 10, 100, 1_000, 10_000, 100_000, 1_000_000 };
diff --git a/src/cli/core.zig b/src/cli/core.zig
index 4a408e8..83fd8b7 100644
--- a/src/cli/core.zig
+++ b/src/cli/core.zig
@@ -27,15 +27,15 @@ file_engine: FileEngine = undefined,
 schema_engine: SchemaEngine = undefined,
 thread_engine: ThreadEngine = undefined,
 
-pub fn init(parent_allocator: Allocator, potential_main_path: ?[]const u8, potential_schema_path: ?[]const u8) Self {
+pub fn init(allocator: Allocator, potential_main_path: ?[]const u8, potential_schema_path: ?[]const u8) Self {
     var self = Self{};
 
-    const arena = parent_allocator.create(std.heap.ArenaAllocator) catch {
+    const arena = allocator.create(std.heap.ArenaAllocator) catch {
         log.err("Error when init Engine DB allocator", .{});
         self.state = .MissingAllocator;
         return self;
     };
-    arena.* = std.heap.ArenaAllocator.init(parent_allocator);
+    arena.* = std.heap.ArenaAllocator.init(allocator);
     self.arena = arena;
     self.allocator = arena.allocator();
@@ -152,7 +152,7 @@ pub fn getEnvVariable(allocator: Allocator, variable: []const u8) ?[]const u8 {
 pub fn runQuery(self: *Self, null_term_query_str: [:0]const u8) void {
     var parser = ziqlParser.init(&self.file_engine, &self.schema_engine);
 
-    parser.parse(null_term_query_str) catch |err| log.err("Error parsing: {any}", .{err});
+    parser.parse(self.allocator, null_term_query_str) catch |err| log.err("Error parsing: {any}", .{err});
 }
 
 pub fn deinit(self: *Self) void {
diff --git a/src/file/entityWriter.zig b/src/file/entityWriter.zig
index ccd7811..bf070fa 100644
--- a/src/file/entityWriter.zig
+++ b/src/file/entityWriter.zig
@@ -8,6 +8,8 @@ const DataType = dtype.DataType;
 const DateTime = dtype.DateTime;
 const UUID = dtype.UUID;
 
+// TODO: Move this from FileEngine
+
 const ZipponError = @import("error").ZipponError;
 
 pub fn writeEntityTable(
diff --git a/src/file/read.zig b/src/file/read.zig
index db60fb5..239d590 100644
--- a/src/file/read.zig
+++ b/src/file/read.zig
@@ -240,7 +240,6 @@ pub fn parseEntities(
     const allocator = arena.allocator();
 
     var buff = std.ArrayList(u8).init(entry_allocator);
-    defer buff.deinit();
     const writer = buff.writer();
 
     const sstruct = try self.schema_engine.structName2SchemaStruct(struct_name);
@@ -268,30 +267,35 @@ pub fn parseEntities(
     // But at the end, only the number of use CPU/Thread will use list simultanously
     // So I could pass list from a thread to another technicly
     var thread_writer_list = allocator.alloc(std.ArrayList(u8), to_parse.len) catch return ZipponError.MemoryError;
+    const data_types = try self.schema_engine.structName2DataType(struct_name);
 
     // Start parsing all file in multiple thread
+    var wg: std.Thread.WaitGroup = .{};
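+    // spawnWg registers each task on the WaitGroup, so wg.wait() below replaces the old 0.1 ms isComplete() polling loop.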
     for (to_parse, 0..) |file_index, i| {
         thread_writer_list[file_index] = std.ArrayList(u8).init(allocator);
-        self.thread_pool.spawn(parseEntitiesOneFile, .{
-            thread_writer_list[i].writer(),
-            file_index,
-            dir,
-            sstruct.zid_schema,
-            filter,
-            additional_data.*,
-            try self.schema_engine.structName2DataType(struct_name),
-            &sync_context,
-        }) catch return ZipponError.ThreadError;
+        self.thread_pool.spawnWg(
+            &wg,
+            parseEntitiesOneFile,
+            .{
+                thread_writer_list[i].writer(),
+                file_index,
+                dir,
+                sstruct.zid_schema,
+                filter,
+                additional_data.*,
+                data_types,
+                &sync_context,
+            },
+        );
     }
-
-    // Wait for all thread to either finish or return an error
-    while (!sync_context.isComplete()) std.time.sleep(100_000); // Check every 0.1ms
+    wg.wait();
 
     // Append all writer to each other
     writer.writeByte('[') catch return ZipponError.WriteError;
     for (thread_writer_list) |list| writer.writeAll(list.items) catch return ZipponError.WriteError;
     writer.writeByte(']') catch return ZipponError.WriteError;
+    for (thread_writer_list) |list| list.deinit();
 
     // Now I need to do the relation stuff, meaning parsing new files to get the relationship value
     // Without relationship to return, this function is basically finish here
@@ -321,21 +325,12 @@ fn parseEntitiesOneFile(
     defer fa.reset();
     const allocator = fa.allocator();
 
-    const path = std.fmt.bufPrint(&path_buffer, "{d}.zid", .{file_index}) catch |err| {
-        sync_context.logError("Error creating file path", err);
-        return;
-    };
+    const path = std.fmt.bufPrint(&path_buffer, "{d}.zid", .{file_index}) catch return;
+    var iter = zid.DataIterator.init(allocator, path, dir, zid_schema) catch return;
 
-    var iter = zid.DataIterator.init(allocator, path, dir, zid_schema) catch |err| {
-        sync_context.logError("Error initializing DataIterator", err);
-        return;
-    };
-
-    while (iter.next() catch |err| {
-        sync_context.logError("Error in iter next", err);
-        return;
-    }) |row| {
-        if (sync_context.checkStructLimit()) break;
+    while (iter.next() catch return) |row| {
+        fa.reset();
+        if (sync_context.checkStructLimit()) return;
         if (filter) |f| if (!f.evaluate(row)) continue;
 
         EntityWriter.writeEntityJSON(
@@ -343,14 +338,10 @@
             row,
             additional_data,
            data_types,
-        ) catch |err| {
-            sync_context.logError("Error writing entity", err);
-            return;
-        };
-        if (sync_context.incrementAndCheckStructLimit()) break;
-    }
+        ) catch return;
 
-    _ = sync_context.completeThread();
+        if (sync_context.incrementAndCheckStructLimit()) return;
+    }
 }
 
 // Receive a map of UUID -> empty JsonString
@@ -410,22 +401,25 @@ pub fn parseEntitiesRelationMap(
     ) catch return ZipponError.MemoryError;
 
     // Start parsing all file in multiple thread
+    var wg: std.Thread.WaitGroup = .{};
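+    // Same WaitGroup pattern as parseEntities; spawnWg returns void (it falls back to running the task serially if the pool cannot allocate), so the ThreadError path goes away.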
     for (to_parse, 0..) |file_index, i| {
         thread_map_list[i] = relation_map.map.cloneWithAllocator(allocator) catch return ZipponError.MemoryError;
 
-        self.thread_pool.spawn(parseEntitiesRelationMapOneFile, .{
-            &thread_map_list[i],
-            file_index,
-            dir,
-            sstruct.zid_schema,
-            relation_map.additional_data,
-            try self.schema_engine.structName2DataType(struct_name),
-            &sync_context,
-        }) catch return ZipponError.ThreadError;
+        self.thread_pool.spawnWg(
+            &wg,
+            parseEntitiesRelationMapOneFile,
+            .{
+                &thread_map_list[i],
+                file_index,
+                dir,
+                sstruct.zid_schema,
+                relation_map.additional_data,
+                try self.schema_engine.structName2DataType(struct_name),
+                &sync_context,
+            },
+        );
     }
-
-    // Wait for all thread to either finish or return an error
-    while (!sync_context.isComplete()) std.time.sleep(100_000); // Check every 0.1ms
+    wg.wait();
 
     // Now here I should have a list of copy of the map with all UUID a bit everywhere
@@ -470,21 +464,11 @@ fn parseEntitiesRelationMapOneFile(
     var string_list = std.ArrayList(u8).init(allocator);
     const writer = string_list.writer();
 
-    const path = std.fmt.bufPrint(&path_buffer, "{d}.zid", .{file_index}) catch |err| {
-        sync_context.logError("Error creating file path", err);
-        return;
-    };
+    const path = std.fmt.bufPrint(&path_buffer, "{d}.zid", .{file_index}) catch return;
+    var iter = zid.DataIterator.init(allocator, path, dir, zid_schema) catch return;
 
-    var iter = zid.DataIterator.init(allocator, path, dir, zid_schema) catch |err| {
-        sync_context.logError("Error initializing DataIterator", err);
-        return;
-    };
-
-    while (iter.next() catch |err| {
-        sync_context.logError("Error in iter next", err);
-        return;
-    }) |row| {
-        if (sync_context.checkStructLimit()) break;
+    while (iter.next() catch return) |row| {
+        if (sync_context.checkStructLimit()) return;
         if (!map.contains(row[0].UUID)) continue;
         defer string_list.clearRetainingCapacity();
 
@@ -493,23 +477,12 @@
             row,
             additional_data,
             data_types,
-        ) catch |err| {
-            sync_context.logError("Error writing entity", err);
-            return;
-        };
+        ) catch return;
+
         map.put(row[0].UUID, JsonString{
-            .slice = parent_alloc.dupe(u8, string_list.items) catch |err| {
-                sync_context.logError("Error duping data", err);
-                return;
-            },
+            .slice = parent_alloc.dupe(u8, string_list.items) catch return,
             .init = true,
-        }) catch |err| {
-            sync_context.logError("Error writing entity", err);
-            return;
-        };
-
-        if (sync_context.incrementAndCheckStructLimit()) break;
+        }) catch return;
+        if (sync_context.incrementAndCheckStructLimit()) return;
     }
-
-    _ = sync_context.completeThread();
 }
diff --git a/src/main.zig b/src/main.zig
index 98ded3f..db66509 100644
--- a/src/main.zig
+++ b/src/main.zig
@@ -11,13 +11,14 @@ var date_buffer: [64]u8 = undefined;
 var date_fa = std.heap.FixedBufferAllocator.init(&date_buffer);
 const date_allocator = date_fa.allocator();
 
-pub const std_options = .{
+pub const std_options = std.Options{
+    .log_level = .info,
     .logFn = myLog,
 };
 
 pub fn myLog(
     comptime message_level: std.log.Level,
-    comptime scope: @Type(.EnumLiteral),
+    comptime scope: @Type(.enum_literal),
     comptime format: []const u8,
     args: anytype,
 ) void {
@@ -48,10 +49,17 @@ pub fn setLogPath(path: []const u8) void {
 }
 
 pub fn main() !void {
-    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
-    defer arena.deinit();
+    var gpa = std.heap.GeneralPurposeAllocator(.{ .safety = true }){};
+    const allocator = gpa.allocator();
+    defer {
+        const deinit_status = gpa.deinit();
+        switch (deinit_status) {
+            .leak => std.debug.print("Leak...\n", .{}),
+            .ok => {},
+        }
+    }
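+    // Defers run in reverse order: cli.deinit() below executes before gpa.deinit() performs its leak check.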
std.debug.print("Leak...\n", .{}), + .ok => {}, + } + } - var cli = Cli.init(arena.allocator(), null, null); + var cli = Cli.init(allocator, null, null); defer cli.deinit(); try cli.start(); diff --git a/src/thread/context.zig b/src/thread/context.zig index ac09487..b43b204 100644 --- a/src/thread/context.zig +++ b/src/thread/context.zig @@ -2,6 +2,8 @@ const std = @import("std"); const log = std.log.scoped(.thread); const U64 = std.atomic.Value(u64); +// Remove the use waitgroup instead + pub const Self = @This(); processed_struct: U64 = U64.init(0), @@ -27,13 +29,13 @@ pub fn completeThread(self: *Self) void { pub fn incrementAndCheckStructLimit(self: *Self) bool { if (self.max_struct == 0) return false; - const new_count = self.processed_struct.fetchAdd(1, .monotonic); + const new_count = self.processed_struct.fetchAdd(1, .acquire); return (new_count + 1) >= self.max_struct; } pub fn checkStructLimit(self: *Self) bool { if (self.max_struct == 0) return false; - const count = self.processed_struct.load(.monotonic); + const count = self.processed_struct.load(.acquire); return (count) >= self.max_struct; } diff --git a/src/ziql/parser.zig b/src/ziql/parser.zig index 2b78ab9..ef22a47 100644 --- a/src/ziql/parser.zig +++ b/src/ziql/parser.zig @@ -80,8 +80,8 @@ pub fn init(file_engine: *FileEngine, schema_engine: *SchemaEngine) Self { }; } -pub fn parse(self: *Self, buffer: [:0]const u8) ZipponError!void { - var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); +pub fn parse(self: *Self, parent_allocator: Allocator, buffer: [:0]const u8) ZipponError!void { + var arena = std.heap.ArenaAllocator.init(parent_allocator); defer arena.deinit(); const allocator = arena.allocator();