Error found!!

Got an error that I couldn't find for a while. It happened when parsing a lot of
files.

Turns out it was the thread pool or something, because if I run on 1 core,
it's fine.

Separate thing in this commit: I just want to freeze a state in which the
benchmark works.
This commit is contained in:
Adrien Bouvais 2025-01-22 11:34:00 +01:00
parent ebf91bb61c
commit 7c34431702
11 changed files with 164 additions and 111 deletions

View File

@ -10,43 +10,81 @@ const times = [_][]const u8{ "12:04", "20:45:11", "03:11:13", "03:00:01.0152" };
const datetimes = [_][]const u8{ "2000/01/01-12:04", "1954/04/02-20:45:11", "1998/01/21-03:11:13", "1977/12/31-03:00:01.0153" };
const scores = [_]i32{ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
pub const std_options = .{
pub const std_options = std.Options{
.log_level = .info,
.logFn = myLog,
};
var date_buffer: [64]u8 = undefined;
var date_fa = std.heap.FixedBufferAllocator.init(&date_buffer);
const date_allocator = date_fa.allocator();
pub fn myLog(
comptime message_level: std.log.Level,
comptime scope: @Type(.EnumLiteral),
comptime scope: @Type(.enum_literal),
comptime format: []const u8,
args: anytype,
) void {
if (true) return;
_ = message_level;
_ = scope;
_ = format;
_ = args;
const level_txt = comptime message_level.asText();
const prefix = if (scope == .default) " - " else "(" ++ @tagName(scope) ++ ") - ";
const potential_file: ?std.fs.File = std.fs.cwd().openFile("benchmarkDB/LOG/log", .{ .mode = .write_only }) catch null;
if (potential_file) |file| {
date_fa.reset();
const now = @import("dtype").DateTime.now();
var date_format_buffer = std.ArrayList(u8).init(date_allocator);
defer date_format_buffer.deinit();
now.format("YYYY/MM/DD-HH:mm:ss.SSSS", date_format_buffer.writer()) catch return;
file.seekFromEnd(0) catch return;
const writer = file.writer();
writer.print("{s}{s}Time: {s} - ", .{ level_txt, prefix, date_format_buffer.items }) catch return;
writer.print(format, args) catch return;
writer.writeByte('\n') catch return;
file.close();
}
}
/// Entry point: run the benchmark under a safety-checked allocator and
/// panic if any allocation leaked by the time we exit.
pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{ .safety = true }){};
    defer if (gpa.deinit() == .leak) @panic("Oupsy");
    try benchmark(gpa.allocator());
}
// Run the full benchmark with std.testing.allocator so leaks fail the test.
test "benchmark" {
    try benchmark(std.testing.allocator);
}
// Maybe I can make it a test to use the testing alloc
pub fn main() !void {
const allocator = std.heap.page_allocator;
const to_test = [_]usize{ 500, 50_000, 1_000_000 };
pub fn benchmark(allocator: std.mem.Allocator) !void {
const to_test = [_]usize{ 5, 50, 500, 5_000, 50_000, 500_000, 5_000_000 };
var line_buffer: [1024 * 1024]u8 = undefined;
for (to_test) |users_count| {
var db_engine = DBEngine.init(allocator, "benchmarkDB", "schema/benchmark");
defer db_engine.deinit();
// Empty db
{
const null_term_query_str = try std.fmt.bufPrintZ(&line_buffer, "DELETE User {{}}", .{});
db_engine.runQuery(null_term_query_str);
}
// Populate with random dummy value
// Needs some speedup. Spent time finding that it is parseConditionValue that takes the time — the last switch to be exact, which parses str to value
{
std.debug.print("\n=====================================\n\n", .{});
std.debug.print("Populating with {d} users.\n", .{users_count});
var prng = std.rand.DefaultPrng.init(0);
var prng = std.Random.DefaultPrng.init(0);
const rng = prng.random();
const populate_start_time = std.time.nanoTimestamp();
@ -62,7 +100,7 @@ pub fn main() !void {
},
);
for (users_count - 1) |_| {
for (0..users_count - 1) |_| {
try writer.print(
"('{s}', '{s}', none)",
.{
@ -82,7 +120,7 @@ pub fn main() !void {
std.debug.print("Populate duration: {d:.6} seconds\n\n", .{populate_duration});
var buffer = std.ArrayList(u8).init(std.heap.page_allocator);
var buffer = std.ArrayList(u8).init(allocator);
defer buffer.deinit();
try db_engine.file_engine.writeDbMetrics(&buffer);
std.debug.print("{s}\n", .{buffer.items});
@ -99,10 +137,11 @@ pub fn main() !void {
// }
//}
// Define your benchmark queries
// Run query
{
const queries = [_][]const u8{
"GRAB User {}",
"GRAB User {name='asd'}",
"GRAB User [1] {}",
"GRAB User [name] {}",
"GRAB User {name = 'Charlie'}",
@ -122,8 +161,7 @@ pub fn main() !void {
std.debug.print("Query: \t\t{s}\nDuration: \t{d:.6} ms\n\n", .{ query, duration });
}
std.debug.print("=====================================\n\n", .{});
}
}
std.debug.print("=====================================\n\n", .{});
}

View File

@ -103,6 +103,22 @@ pub fn build(b: *std.Build) void {
test_step.dependOn(&run_tests6.step);
}
// Test
// -----------------------------------------------
{
    // Single-threaded file-parsing benchmark, exposed as `zig build benchmark-parsing`.
    const parsing_tests = b.addTest(.{
        .root_source_file = b.path("lib/zid.zig"),
        .target = target,
        .optimize = optimize,
        .name = "File parsing",
        .test_runner = b.path("test_runner.zig"),
    });
    const run_parsing_tests = b.addRunArtifact(parsing_tests);
    const step = b.step("benchmark-parsing", "Run the benchmark of ZipponData (Single thread).");
    step.dependOn(&run_parsing_tests.step);
}
// Benchmark
// -----------------------------------------------
{
@ -152,7 +168,7 @@ pub fn build(b: *std.Build) void {
.name = exe_name,
.root_source_file = b.path("src/main.zig"),
.target = tar,
.optimize = .ReleaseSafe,
.optimize = .ReleaseFast,
});
// Add the same imports as your main executable

View File

@ -1,6 +1,6 @@
pub const BUFFER_SIZE = 1024 * 10; // Used a bit everywhere. The size for the schema for example. 10kB
pub const MAX_FILE_SIZE = 1024 * 1024; // 1MB
pub const CPU_CORE = 16;
pub const CPU_CORE = 0;
// Debug
pub const PRINT_STATE = false;

View File

@ -1,6 +1,6 @@
pub const BUFFER_SIZE = 1024 * 1024; // Used a bit everywhere. The size for the schema for example. 1MB
pub const MAX_FILE_SIZE = 1024 * 1024; // 1MB
pub const CPU_CORE = 16;
pub const CPU_CORE = 1;
// Debug
pub const PRINT_STATE = false;

View File

@ -745,7 +745,7 @@ test "Write and Read" {
try std.fs.cwd().deleteDir("tmp");
}
test "Benchmark Write and Read" {
test "Benchmark Write and Read All" {
const schema = &[_]DType{
.Int,
.Float,
@ -767,6 +767,20 @@ test "Benchmark Write and Read" {
try benchmark(schema, data);
}
// Benchmark the smallest realistic row shape: a two-column "user"
// made of one int id and one short string name.
test "Benchmark Write and Read Simple User" {
    const schema = [_]DType{ .Int, .Str };
    const data = [_]Data{ Data.initInt(1), Data.initStr("Bob") };
    try benchmark(&schema, &data);
}
fn benchmark(schema: []const DType, data: []const Data) !void {
const allocator = std.testing.allocator;
const sizes = [_]usize{ 1, 10, 100, 1_000, 10_000, 100_000, 1_000_000 };

View File

@ -27,15 +27,15 @@ file_engine: FileEngine = undefined,
schema_engine: SchemaEngine = undefined,
thread_engine: ThreadEngine = undefined,
pub fn init(parent_allocator: Allocator, potential_main_path: ?[]const u8, potential_schema_path: ?[]const u8) Self {
pub fn init(allocator: Allocator, potential_main_path: ?[]const u8, potential_schema_path: ?[]const u8) Self {
var self = Self{};
const arena = parent_allocator.create(std.heap.ArenaAllocator) catch {
const arena = allocator.create(std.heap.ArenaAllocator) catch {
log.err("Error when init Engine DB allocator", .{});
self.state = .MissingAllocator;
return self;
};
arena.* = std.heap.ArenaAllocator.init(parent_allocator);
arena.* = std.heap.ArenaAllocator.init(allocator);
self.arena = arena;
self.allocator = arena.allocator();
@ -152,7 +152,7 @@ pub fn getEnvVariable(allocator: Allocator, variable: []const u8) ?[]const u8 {
pub fn runQuery(self: *Self, null_term_query_str: [:0]const u8) void {
var parser = ziqlParser.init(&self.file_engine, &self.schema_engine);
parser.parse(null_term_query_str) catch |err| log.err("Error parsing: {any}", .{err});
parser.parse(self.allocator, null_term_query_str) catch |err| log.err("Error parsing: {any}", .{err});
}
pub fn deinit(self: *Self) void {

View File

@ -8,6 +8,8 @@ const DataType = dtype.DataType;
const DateTime = dtype.DateTime;
const UUID = dtype.UUID;
// TODO: Move this from FileEngine
const ZipponError = @import("error").ZipponError;
pub fn writeEntityTable(

View File

@ -240,7 +240,6 @@ pub fn parseEntities(
const allocator = arena.allocator();
var buff = std.ArrayList(u8).init(entry_allocator);
defer buff.deinit();
const writer = buff.writer();
const sstruct = try self.schema_engine.structName2SchemaStruct(struct_name);
@ -268,30 +267,35 @@ pub fn parseEntities(
// But at the end, only as many lists as used CPUs/threads are in use simultaneously
// So I could technically pass a list from one thread to another
var thread_writer_list = allocator.alloc(std.ArrayList(u8), to_parse.len) catch return ZipponError.MemoryError;
const data_types = try self.schema_engine.structName2DataType(struct_name);
// Start parsing all file in multiple thread
var wg: std.Thread.WaitGroup = .{};
for (to_parse, 0..) |file_index, i| {
thread_writer_list[file_index] = std.ArrayList(u8).init(allocator);
self.thread_pool.spawn(parseEntitiesOneFile, .{
thread_writer_list[i].writer(),
file_index,
dir,
sstruct.zid_schema,
filter,
additional_data.*,
try self.schema_engine.structName2DataType(struct_name),
&sync_context,
}) catch return ZipponError.ThreadError;
self.thread_pool.spawnWg(
&wg,
parseEntitiesOneFile,
.{
thread_writer_list[i].writer(),
file_index,
dir,
sstruct.zid_schema,
filter,
additional_data.*,
data_types,
&sync_context,
},
);
}
// Wait for all thread to either finish or return an error
while (!sync_context.isComplete()) std.time.sleep(100_000); // Check every 0.1ms
wg.wait();
// Append all writer to each other
writer.writeByte('[') catch return ZipponError.WriteError;
for (thread_writer_list) |list| writer.writeAll(list.items) catch return ZipponError.WriteError;
writer.writeByte(']') catch return ZipponError.WriteError;
for (thread_writer_list) |list| list.deinit();
// Now I need to do the relation stuff, meaning parsing new files to get the relationship value
// Without relationship to return, this function is basically finish here
@ -321,21 +325,12 @@ fn parseEntitiesOneFile(
defer fa.reset();
const allocator = fa.allocator();
const path = std.fmt.bufPrint(&path_buffer, "{d}.zid", .{file_index}) catch |err| {
sync_context.logError("Error creating file path", err);
return;
};
const path = std.fmt.bufPrint(&path_buffer, "{d}.zid", .{file_index}) catch return;
var iter = zid.DataIterator.init(allocator, path, dir, zid_schema) catch return;
var iter = zid.DataIterator.init(allocator, path, dir, zid_schema) catch |err| {
sync_context.logError("Error initializing DataIterator", err);
return;
};
while (iter.next() catch |err| {
sync_context.logError("Error in iter next", err);
return;
}) |row| {
if (sync_context.checkStructLimit()) break;
while (iter.next() catch return) |row| {
fa.reset();
if (sync_context.checkStructLimit()) return;
if (filter) |f| if (!f.evaluate(row)) continue;
EntityWriter.writeEntityJSON(
@ -343,14 +338,10 @@ fn parseEntitiesOneFile(
row,
additional_data,
data_types,
) catch |err| {
sync_context.logError("Error writing entity", err);
return;
};
if (sync_context.incrementAndCheckStructLimit()) break;
}
) catch return;
_ = sync_context.completeThread();
if (sync_context.incrementAndCheckStructLimit()) return;
}
}
// Receive a map of UUID -> empty JsonString
@ -410,22 +401,25 @@ pub fn parseEntitiesRelationMap(
) catch return ZipponError.MemoryError;
// Start parsing all file in multiple thread
var wg: std.Thread.WaitGroup = .{};
for (to_parse, 0..) |file_index, i| {
thread_map_list[i] = relation_map.map.cloneWithAllocator(allocator) catch return ZipponError.MemoryError;
self.thread_pool.spawn(parseEntitiesRelationMapOneFile, .{
&thread_map_list[i],
file_index,
dir,
sstruct.zid_schema,
relation_map.additional_data,
try self.schema_engine.structName2DataType(struct_name),
&sync_context,
}) catch return ZipponError.ThreadError;
self.thread_pool.spawnWg(
&wg,
parseEntitiesRelationMapOneFile,
.{
&thread_map_list[i],
file_index,
dir,
sstruct.zid_schema,
relation_map.additional_data,
try self.schema_engine.structName2DataType(struct_name),
&sync_context,
},
);
}
// Wait for all thread to either finish or return an error
while (!sync_context.isComplete()) std.time.sleep(100_000); // Check every 0.1ms
wg.wait();
// Now here I should have a list of copy of the map with all UUID a bit everywhere
@ -470,21 +464,11 @@ fn parseEntitiesRelationMapOneFile(
var string_list = std.ArrayList(u8).init(allocator);
const writer = string_list.writer();
const path = std.fmt.bufPrint(&path_buffer, "{d}.zid", .{file_index}) catch |err| {
sync_context.logError("Error creating file path", err);
return;
};
const path = std.fmt.bufPrint(&path_buffer, "{d}.zid", .{file_index}) catch return;
var iter = zid.DataIterator.init(allocator, path, dir, zid_schema) catch return;
var iter = zid.DataIterator.init(allocator, path, dir, zid_schema) catch |err| {
sync_context.logError("Error initializing DataIterator", err);
return;
};
while (iter.next() catch |err| {
sync_context.logError("Error in iter next", err);
return;
}) |row| {
if (sync_context.checkStructLimit()) break;
while (iter.next() catch return) |row| {
if (sync_context.checkStructLimit()) return;
if (!map.contains(row[0].UUID)) continue;
defer string_list.clearRetainingCapacity();
@ -493,23 +477,12 @@ fn parseEntitiesRelationMapOneFile(
row,
additional_data,
data_types,
) catch |err| {
sync_context.logError("Error writing entity", err);
return;
};
) catch return;
map.put(row[0].UUID, JsonString{
.slice = parent_alloc.dupe(u8, string_list.items) catch |err| {
sync_context.logError("Error duping data", err);
return;
},
.slice = parent_alloc.dupe(u8, string_list.items) catch return,
.init = true,
}) catch |err| {
sync_context.logError("Error writing entity", err);
return;
};
if (sync_context.incrementAndCheckStructLimit()) break;
}) catch return;
if (sync_context.incrementAndCheckStructLimit()) return;
}
_ = sync_context.completeThread();
}

View File

@ -11,13 +11,14 @@ var date_buffer: [64]u8 = undefined;
var date_fa = std.heap.FixedBufferAllocator.init(&date_buffer);
const date_allocator = date_fa.allocator();
pub const std_options = .{
pub const std_options = std.Options{
.log_level = .info,
.logFn = myLog,
};
pub fn myLog(
comptime message_level: std.log.Level,
comptime scope: @Type(.EnumLiteral),
comptime scope: @Type(.enum_literal),
comptime format: []const u8,
args: anytype,
) void {
@ -48,10 +49,17 @@ pub fn setLogPath(path: []const u8) void {
}
pub fn main() !void {
var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
defer arena.deinit();
var gpa = std.heap.GeneralPurposeAllocator(.{ .safety = true }){};
const allocator = gpa.allocator();
defer {
const deinit_status = gpa.deinit();
switch (deinit_status) {
.leak => std.debug.print("Leak...\n", .{}),
.ok => {},
}
}
var cli = Cli.init(arena.allocator(), null, null);
var cli = Cli.init(allocator, null, null);
defer cli.deinit();
try cli.start();

View File

@ -2,6 +2,8 @@ const std = @import("std");
const log = std.log.scoped(.thread);
const U64 = std.atomic.Value(u64);
// Remove this; use a WaitGroup instead
pub const Self = @This();
processed_struct: U64 = U64.init(0),
@ -27,13 +29,13 @@ pub fn completeThread(self: *Self) void {
pub fn incrementAndCheckStructLimit(self: *Self) bool {
if (self.max_struct == 0) return false;
const new_count = self.processed_struct.fetchAdd(1, .monotonic);
const new_count = self.processed_struct.fetchAdd(1, .acquire);
return (new_count + 1) >= self.max_struct;
}
pub fn checkStructLimit(self: *Self) bool {
if (self.max_struct == 0) return false;
const count = self.processed_struct.load(.monotonic);
const count = self.processed_struct.load(.acquire);
return (count) >= self.max_struct;
}

View File

@ -80,8 +80,8 @@ pub fn init(file_engine: *FileEngine, schema_engine: *SchemaEngine) Self {
};
}
pub fn parse(self: *Self, buffer: [:0]const u8) ZipponError!void {
var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
pub fn parse(self: *Self, parent_allocator: Allocator, buffer: [:0]const u8) ZipponError!void {
var arena = std.heap.ArenaAllocator.init(parent_allocator);
defer arena.deinit();
const allocator = arena.allocator();