SOLVED THREAD BUG

Needed a thread-safe allocator. Other parts will need the same update; so far I've only done parseEntities and deleteEntities.
Adrien Bouvais 2025-01-22 22:34:46 +01:00
parent f05696a924
commit 98f0c69e61
9 changed files with 75 additions and 112 deletions
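
A minimal sketch (not from this commit) of the pattern the message refers to: std.heap.ThreadSafeAllocator wraps a child allocator behind a mutex so every worker thread can allocate through the same instance. The GeneralPurposeAllocator here is just a stand-in for the engine's self.allocator:

    const std = @import("std");

    pub fn main() !void {
        var gpa = std.heap.GeneralPurposeAllocator(.{}){};
        defer _ = gpa.deinit();

        // Wrap any single-threaded allocator; allocations are serialized with a mutex.
        var safe_allocator = std.heap.ThreadSafeAllocator{ .child_allocator = gpa.allocator() };
        const allocator = safe_allocator.allocator(); // safe to share across threads

        const buf = try allocator.alloc(u8, 64);
        defer allocator.free(buf);
    }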

View File

@@ -67,7 +67,7 @@ test "benchmark" {
// Maybe I can make it a test to use the testing alloc
pub fn benchmark(allocator: std.mem.Allocator) !void {
-const to_test = [_]usize{ 5, 50, 500, 5_000, 50_000, 500_000, 5_000_000 };
+const to_test = [_]usize{ 5, 50, 500, 5_000, 50_000, 500_000, 5_000_000, 10_000_000 };
var line_buffer: [1024 * 1024]u8 = undefined;
for (to_test) |users_count| {
var db_engine = DBEngine.init(allocator, "benchmarkDB", "schema/benchmark");

View File

@@ -2,7 +2,7 @@ const std = @import("std");
pub fn build(b: *std.Build) void {
const target = b.standardTargetOptions(.{});
-const optimize = b.standardOptimizeOption(.{ .preferred_optimize_mode = .ReleaseFast });
+const optimize = b.standardOptimizeOption(.{ .preferred_optimize_mode = .ReleaseSmall });
// Run
// -----------------------------------------------
@@ -168,7 +168,7 @@ pub fn build(b: *std.Build) void {
.name = exe_name,
.root_source_file = b.path("src/main.zig"),
.target = tar,
-.optimize = .ReleaseFast,
+.optimize = optimize,
});
// Add the same imports as your main executable

View File

@@ -1,6 +1,6 @@
pub const BUFFER_SIZE = 1024 * 1024; // Used a bit everywhere, e.g. as the schema buffer size. 1MB
pub const MAX_FILE_SIZE = 1024 * 1024; // 1MB
-pub const CPU_CORE = 1;
+pub const CPU_CORE = 16;
// Debug
pub const PRINT_STATE = false;

View File

@@ -39,6 +39,10 @@ pub fn get(self: UUIDIndexMap, uuid: UUID) ?usize {
return self.map.get(uuid);
}
+pub fn reset(self: *UUIDIndexMap) void {
+_ = self.arena.reset(.free_all);
+}
test "Create empty UUIDIndexMap" {
const allocator = std.testing.allocator;

View File

@@ -31,6 +31,18 @@ pub fn addMember(self: *AdditionalData, name: []const u8, index: usize) ZipponEr
self.childrens.append(AdditionalDataMember.init(self.allocator, name, index)) catch return ZipponError.MemoryError;
}
+pub fn clone(self: AdditionalData, allocator: Allocator) ZipponError!AdditionalData {
+var new_additional_data = AdditionalData.init(allocator);
+new_additional_data.limit = self.limit;
+for (self.childrens.items) |child| {
+new_additional_data.childrens.append(child.clone(allocator) catch return ZipponError.MemoryError) catch return ZipponError.MemoryError;
+}
+return new_additional_data;
+}
// This is name in: [name]
// There is an additional data because it can be [friend [1; name]]
pub const AdditionalDataMember = struct {
@@ -41,4 +53,12 @@ pub const AdditionalDataMember = struct {
pub fn init(allocator: Allocator, name: []const u8, index: usize) AdditionalDataMember {
return AdditionalDataMember{ .name = name, .index = index, .additional_data = AdditionalData.init(allocator) };
}
+pub fn clone(self: AdditionalDataMember, allocator: Allocator) ZipponError!AdditionalDataMember {
+return AdditionalDataMember{
+.name = allocator.dupe(u8, self.name) catch return ZipponError.MemoryError,
+.index = self.index,
+.additional_data = self.additional_data.clone(allocator) catch return ZipponError.MemoryError,
+};
+}
};

View File

@@ -169,27 +169,15 @@ fn populateVoidUUIDMapOneFile(
defer fa.reset();
const allocator = fa.allocator();
-const path = std.fmt.bufPrint(&path_buffer, "{d}.zid", .{file_index}) catch |err| {
-sync_context.logError("Error creating file path", err);
-return;
-};
+const path = std.fmt.bufPrint(&path_buffer, "{d}.zid", .{file_index}) catch return;
-var iter = zid.DataIterator.init(allocator, path, dir, sstruct.zid_schema) catch |err| {
-sync_context.logError("Error initializing DataIterator", err);
-return;
-};
+var iter = zid.DataIterator.init(allocator, path, dir, sstruct.zid_schema) catch return;
defer iter.deinit();
-while (iter.next() catch |err| {
-sync_context.logError("Error in iter next", err);
-return;
-}) |row| {
+while (iter.next() catch return) |row| {
if (sync_context.checkStructLimit()) break;
if (filter == null or filter.?.evaluate(row)) {
-list.*.append(UUID{ .bytes = row[0].UUID }) catch |err| {
-sync_context.logError("Error initializing DataIterator", err);
-return;
-};
+list.*.append(UUID{ .bytes = row[0].UUID }) catch return;
if (sync_context.incrementAndCheckStructLimit()) break;
}
@@ -207,7 +195,8 @@ pub fn parseEntities(
) ZipponError![]const u8 {
var arena = std.heap.ArenaAllocator.init(self.allocator);
defer arena.deinit();
-const allocator = arena.allocator();
+var safe_allocator = std.heap.ThreadSafeAllocator{ .child_allocator = self.allocator };
+const allocator = safe_allocator.allocator();
var buff = std.ArrayList(u8).init(entry_allocator);
const writer = buff.writer();
@@ -231,7 +220,6 @@
// Do an array of writer for each thread
var thread_writer_list = allocator.alloc(std.ArrayList(u8), to_parse.len) catch return ZipponError.MemoryError;
-const data_types = try self.schema_engine.structName2DataType(struct_name);
// Start parsing all file in multiple thread
var wg: std.Thread.WaitGroup = .{};
@@ -247,7 +235,7 @@
sstruct.zid_schema,
filter,
additional_data.*,
-data_types,
+sstruct.types,
&sync_context,
},
);
@@ -289,16 +277,17 @@ fn parseEntitiesOneFile(
defer fa.reset();
const allocator = fa.allocator();
+var buffered_writer = std.io.bufferedWriter(writer);
const path = std.fmt.bufPrint(&path_buffer, "{d}.zid", .{file_index}) catch return;
var iter = zid.DataIterator.init(allocator, path, dir, zid_schema) catch return;
while (iter.next() catch return) |row| {
fa.reset();
if (sync_context.checkStructLimit()) return;
if (filter) |f| if (!f.evaluate(row)) continue;
EntityWriter.writeEntityJSON(
-writer,
+buffered_writer.writer(),
row,
additional_data,
data_types,
@@ -306,6 +295,7 @@
if (sync_context.incrementAndCheckStructLimit()) return;
}
+buffered_writer.flush() catch return;
}
// Receive a map of UUID -> empty JsonString

View File

@@ -164,6 +164,7 @@ pub fn allFileIndex(self: Self, allocator: Allocator, struct_name: []const u8) Z
var iter = dir.iterate();
while (iter.next() catch return ZipponError.DirIterError) |entry| {
if (entry.kind != .file) continue;
+if (std.mem.endsWith(u8, entry.name, ".new")) continue; // TODO: Delete the file, shouldn't be here
const index = std.fmt.parseInt(usize, entry.name[0..(entry.name.len - 4)], 10) catch return ZipponError.InvalidFileIndex;
array.append(index) catch return ZipponError.MemoryError;
}

View File

@@ -153,10 +153,7 @@ fn updateEntitiesOneFile(
defer fa.reset();
const allocator = fa.allocator();
-var new_data_buff = allocator.alloc(zid.Data, index_switch.len) catch |err| {
-sync_context.logError("Cant init new data buff", err);
-return;
-};
+var new_data_buff = allocator.alloc(zid.Data, index_switch.len) catch return;
// First I fill the one that are updated by a const
for (index_switch, 0..) |is, i| switch (is) {
@@ -232,7 +229,8 @@ pub fn deleteEntities(
) ZipponError!void {
var arena = std.heap.ArenaAllocator.init(self.allocator);
defer arena.deinit();
-const allocator = arena.allocator();
+var safe_allocator = std.heap.ThreadSafeAllocator{ .child_allocator = self.allocator };
+const allocator = safe_allocator.allocator();
const sstruct = try self.schema_engine.structName2SchemaStruct(struct_name);
@@ -244,24 +242,24 @@
// Create a thread-safe writer for each file
var thread_writer_list = allocator.alloc(std.ArrayList(u8), to_parse.len) catch return ZipponError.MemoryError;
-for (thread_writer_list) |*list| {
-list.* = std.ArrayList(u8).init(allocator);
-}
// Spawn threads for each file
var wg: std.Thread.WaitGroup = .{};
-for (to_parse, 0..) |file_index, i| self.thread_pool.spawnWg(
-&wg,
-deleteEntitiesOneFile,
-.{
-sstruct,
-filter,
-thread_writer_list[i].writer(),
-file_index,
-dir,
-&sync_context,
-},
-);
+for (to_parse, 0..) |file_index, i| {
+thread_writer_list[i] = std.ArrayList(u8).init(allocator);
+self.thread_pool.spawnWg(
+&wg,
+deleteEntitiesOneFile,
+.{
+thread_writer_list[i].writer(),
+file_index,
+dir,
+sstruct.zid_schema,
+filter,
+&sync_context,
+},
+);
+}
wg.wait();
// Combine results
@@ -274,16 +272,16 @@
// FIXME: Stop doing that and just remove UUID from the map itself instead of reparsing everything at the end
// It's just that I can't do it in deleteEntitiesOneFile itself
sstruct.uuid_file_index.map.clearRetainingCapacity();
-_ = sstruct.uuid_file_index.arena.reset(.free_all);
+_ = sstruct.uuid_file_index.reset();
try self.populateFileIndexUUIDMap(sstruct, sstruct.uuid_file_index);
}
fn deleteEntitiesOneFile(
-sstruct: SchemaStruct,
-filter: ?Filter,
writer: anytype,
file_index: u64,
dir: std.fs.Dir,
+zid_schema: []zid.DType,
+filter: ?Filter,
sync_context: *ThreadSyncContext,
) void {
var data_buffer: [config.BUFFER_SIZE]u8 = undefined;
@@ -291,77 +289,36 @@ fn deleteEntitiesOneFile(
defer fa.reset();
const allocator = fa.allocator();
-const path = std.fmt.allocPrint(allocator, "{d}.zid", .{file_index}) catch |err| {
-sync_context.logError("Error creating file path", err);
-return;
-};
+const path = std.fmt.allocPrint(allocator, "{d}.zid", .{file_index}) catch return;
-var iter = zid.DataIterator.init(allocator, path, dir, sstruct.zid_schema) catch |err| {
-sync_context.logError("Error initializing DataIterator", err);
-return;
-};
+var iter = zid.DataIterator.init(allocator, path, dir, zid_schema) catch return;
defer iter.deinit();
-const new_path = std.fmt.allocPrint(allocator, "{d}.zid.new", .{file_index}) catch |err| {
-sync_context.logError("Error creating file path", err);
-return;
-};
+const new_path = std.fmt.allocPrint(allocator, "{d}.zid.new", .{file_index}) catch return;
+zid.createFile(new_path, dir) catch return;
-zid.createFile(new_path, dir) catch |err| {
-sync_context.logError("Error creating new file", err);
-return;
-};
-var new_writer = zid.DataWriter.init(new_path, dir) catch |err| {
-sync_context.logError("Error initializing DataWriter", err);
-return;
-};
+var new_writer = zid.DataWriter.init(new_path, dir) catch return;
errdefer new_writer.deinit();
var finish_writing = false;
-while (iter.next() catch |err| {
-sync_context.logError("Error during iter", err);
-return;
-}) |row| {
+while (iter.next() catch return) |row| {
if (!finish_writing and (filter == null or filter.?.evaluate(row))) {
-writer.print("{{\"{s}\"}},", .{UUID.format_bytes(row[0].UUID)}) catch |err| {
-sync_context.logError("Error writting", err);
-return;
-};
+writer.print("{{\"{s}\"}},", .{UUID.format_bytes(row[0].UUID)}) catch return;
finish_writing = sync_context.incrementAndCheckStructLimit();
} else {
-new_writer.write(row) catch |err| {
-sync_context.logError("Error writing unchanged data", err);
-return;
-};
+new_writer.write(row) catch return;
}
}
-new_writer.flush() catch |err| {
-sync_context.logError("Error flushing new writer", err);
-return;
-};
+new_writer.flush() catch return;
-dir.deleteFile(path) catch |err| {
-sync_context.logError("Error deleting old file", err);
-return;
-};
+dir.deleteFile(path) catch return;
-const file_stat = new_writer.fileStat() catch |err| {
-sync_context.logError("Error getting new file stat", err);
-return;
-};
+const file_stat = new_writer.fileStat() catch return;
new_writer.deinit();
-if (file_index != 0 and file_stat.size == 0) dir.deleteFile(new_path) catch |err| {
-sync_context.logError("Error deleting empty new file", err);
-return;
+if (file_index != 0 and file_stat.size == 0) {
+dir.deleteFile(new_path) catch return;
} else {
-dir.rename(new_path, path) catch |err| {
-sync_context.logError("Error renaming new file", err);
-return;
-};
+dir.rename(new_path, path) catch return;
}
-sync_context.completeThread();
}
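
Aside, not part of the diff: the fan-out shape used above, in isolation. Each worker gets its own ArrayList(u8) writer so only the allocator is shared, and the pool tracks completion through a WaitGroup. The worker function, list count, and printed message are hypothetical stand-ins for deleteEntitiesOneFile and its arguments:

    const std = @import("std");

    // Hypothetical worker; in the commit this role is played by deleteEntitiesOneFile.
    fn worker(list: *std.ArrayList(u8), file_index: usize) void {
        list.writer().print("done {d},", .{file_index}) catch return;
    }

    pub fn main() !void {
        var gpa = std.heap.GeneralPurposeAllocator(.{}){};
        defer _ = gpa.deinit();
        var tsa = std.heap.ThreadSafeAllocator{ .child_allocator = gpa.allocator() };
        const allocator = tsa.allocator();

        var pool: std.Thread.Pool = undefined;
        try pool.init(.{ .allocator = allocator });
        defer pool.deinit();

        // One result buffer per worker, all allocated through the shared thread-safe allocator.
        const lists = try allocator.alloc(std.ArrayList(u8), 4);
        defer allocator.free(lists);

        var wg: std.Thread.WaitGroup = .{};
        for (lists, 0..) |*list, i| {
            list.* = std.ArrayList(u8).init(allocator);
            pool.spawnWg(&wg, worker, .{ list, i });
        }
        wg.wait();

        // Combine results, as deleteEntities does after wg.wait().
        for (lists) |*list| {
            std.debug.print("{s}\n", .{list.items});
            list.deinit();
        }
    }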

View File

@@ -17,10 +17,6 @@ pub fn init(max_struct: u64) Self {
};
}
-pub fn completeThread(self: *Self) void {
-_ = self.completed_file.fetchAdd(1, .release);
-}
pub fn incrementAndCheckStructLimit(self: *Self) bool {
if (self.max_struct == 0) return false;
const new_count = self.processed_struct.fetchAdd(1, .acquire);
@@ -32,8 +28,3 @@ pub fn checkStructLimit(self: *Self) bool {
const count = self.processed_struct.load(.acquire);
return (count) >= self.max_struct;
}
-pub fn logError(self: *Self, message: []const u8, err: anyerror) void {
-log.err("{s}: {any}", .{ message, err });
-_ = self.error_file.fetchAdd(1, .acquire);
-}
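
For reference, the limit check that survives in ThreadSyncContext boils down to an atomic counter. A minimal sketch, assuming the semantics visible in the hunk above (max_struct == 0 means no limit, fetchAdd returns the previous value; the return line of incrementAndCheckStructLimit is cut off in the diff, so the comparison here is an assumption):

    const std = @import("std");

    const Counter = struct {
        processed_struct: std.atomic.Value(u64) = std.atomic.Value(u64).init(0),
        max_struct: u64,

        // Returns true once this thread's increment reaches the limit.
        fn incrementAndCheckStructLimit(self: *Counter) bool {
            if (self.max_struct == 0) return false; // 0 means "no limit"
            const new_count = self.processed_struct.fetchAdd(1, .acquire);
            return new_count + 1 >= self.max_struct; // assumed comparison
        }
    };

    test "counter stops at limit" {
        var c = Counter{ .max_struct = 2 };
        try std.testing.expect(!c.incrementAndCheckStructLimit());
        try std.testing.expect(c.incrementAndCheckStructLimit());
    }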