Moved thread stuff into a directory

Adrien Bouvais 2025-01-12 00:11:03 +01:00
parent d775ff1d0a
commit c0e8b07025
9 changed files with 236 additions and 244 deletions

src/dataStructure/UUIDFileIndex.zig
View File

@@ -28,14 +28,7 @@ pub fn deinit(self: *UUIDIndexMap) void {
 }

 pub fn put(self: *UUIDIndexMap, uuid: UUID, file_index: usize) !void {
-    const allocator = self.arena.allocator();
-    const new_uuid = try allocator.create(UUID);
-    new_uuid.* = uuid;
-    const new_file_index = try allocator.create(usize);
-    new_file_index.* = file_index;
-    try self.map.*.put(new_uuid.*, new_file_index.*);
+    try self.map.*.put(uuid, file_index);
 }

 pub fn contains(self: UUIDIndexMap, uuid: UUID) bool {
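
This simplification works because Zig's std.AutoHashMap copies keys and values into its own storage on put; heap-allocating a UUID and a usize on the arena only to dereference them again added allocations without changing ownership. A minimal standalone sketch of the same behavior (the 16-byte UUID stand-in below is hypothetical, not ZipponDB's dtype.UUID):

    const std = @import("std");

    // Hypothetical stand-in for dtype.UUID: a plain 16-byte value type.
    const UUID = struct { bytes: [16]u8 };

    test "put copies value-type keys into the map" {
        var map = std.AutoHashMap(UUID, usize).init(std.testing.allocator);
        defer map.deinit();

        var uuid = UUID{ .bytes = [_]u8{0} ** 16 };
        try map.put(uuid, 42); // the key is copied into the map's storage

        uuid.bytes[0] = 1; // mutating the local copy leaves the stored key intact
        try std.testing.expectEqual(@as(?usize, 42), map.get(.{ .bytes = [_]u8{0} ** 16 }));
    }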

src/entityWriter.zig
View File

@@ -10,52 +10,51 @@ const UUID = dtype.UUID;
 const ZipponError = @import("errors.zig").ZipponError;

-pub const EntityWriter = struct {
 pub fn writeEntityTable(
     writer: anytype,
     row: []zid.Data,
     additional_data: AdditionalData,
     data_types: []const DataType,
 ) !void {
     try writer.writeAll("| ");
     for (additional_data.childrens.items) |member| {
         try writeValue(writer, row[member.index], data_types[member.index]);
         try writer.writeAll(" \t| ");
     }
     try writer.writeByte('\n');
 }

 pub fn writeHeaderCsv(
     writer: anytype,
     members: [][]const u8,
     delimiter: u8,
 ) !void {
     for (members, 0..) |member, i| {
         try writer.writeAll(member);
         if (i < members.len - 1) try writer.writeByte(delimiter);
     }
     try writer.writeByte('\n');
 }

 pub fn writeEntityCsv( // FIXME: if a string value contains a '\n' this will break; values need to be quoted (e.g. with """)
     writer: anytype,
     row: []zid.Data,
     data_types: []const DataType,
     delimiter: u8,
 ) !void {
     for (0..row.len) |i| {
         try writeValue(writer, row[i], data_types[i]);
         if (i < row.len - 1) try writer.writeByte(delimiter);
     }
     try writer.writeByte('\n');
 }

 pub fn writeEntityJSON(
     writer: anytype,
     row: []zid.Data,
     additional_data: AdditionalData,
     data_types: []const DataType,
 ) !void {
     try writer.writeByte('{');
     for (additional_data.childrens.items) |member| {
         try writer.print("{s}: ", .{member.name});
@@ -63,9 +62,9 @@ pub const EntityWriter = struct {
         try writer.writeAll(", ");
     }
     try writer.writeAll("}, ");
 }

 fn writeValue(writer: anytype, value: zid.Data, data_type: DataType) !void {
     switch (value) {
         .Float => |v| try writer.print("{d}", .{v}),
         .Int => |v| try writer.print("{d}", .{v}),
@@ -96,9 +95,9 @@ pub const EntityWriter = struct {
         },
         .IntArray, .FloatArray, .StrArray, .UUIDArray, .BoolArray, .UnixArray => try writeArray(writer, value, data_type),
     }
 }

 fn writeArray(writer: anytype, data: zid.Data, data_type: DataType) ZipponError!void {
     writer.writeByte('[') catch return ZipponError.WriteError;
     var iter = zid.ArrayIterator.init(data) catch return ZipponError.ZipponDataError;
     switch (data) {
@@ -121,11 +120,11 @@ pub const EntityWriter = struct {
         else => unreachable,
     }
     writer.writeByte(']') catch return ZipponError.WriteError;
 }

 /// Takes a string in the JSON format and looks for {|<[16]u8>|}, then looks into the map and checks whether it contains this UUID.
 /// If it finds it, it will replace the {|<[16]u8>|} with the value.
 pub fn updateWithRelation(writer: anytype, input: []const u8, map: std.AutoHashMap([16]u8, JsonString)) ZipponError!void {
     var uuid_bytes: [16]u8 = undefined;
     var start: usize = 0;
     while (std.mem.indexOf(u8, input[start..], "{|<")) |pos| {
@@ -151,9 +150,9 @@ pub const EntityWriter = struct {
     // Write any remaining text
     writer.writeAll(input[start..]) catch return ZipponError.WriteError;
 }

 fn updateArray(writer: anytype, input: []const u8, map: std.AutoHashMap([16]u8, JsonString), origin: usize) ZipponError!usize {
     var uuid_bytes: [16]u8 = undefined;
     var start = origin;
     while (input.len > start + 23 and std.mem.eql(u8, input[start .. start + 3], "{|<") and std.mem.eql(u8, input[start + 19 .. start + 23], ">|},")) : (start += 23) {
@@ -165,5 +164,4 @@ pub const EntityWriter = struct {
         }
     }
     return start;
 }
-};
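
Dropping the EntityWriter struct wrapper relies on the fact that a Zig file is itself a struct: top-level public declarations become members of the type returned by @import, so call sites keep the same shape. A sketch of the two import styles (illustrative, not part of this diff):

    // Before: the file exported a named wrapper struct.
    const EntityWriter = @import("entityWriter.zig").EntityWriter;

    // After: the file itself is the namespace; its top-level pub fns are the decls.
    const EntityWriter = @import("entityWriter.zig");

    // Call sites are unchanged either way:
    // try EntityWriter.writeEntityTable(writer, row, additional_data, data_types);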

src/fileEngine.zig
View File

@@ -4,10 +4,10 @@ const zid = @import("ZipponData");
 const U64 = std.atomic.Value(u64);
 const Pool = std.Thread.Pool;
 const Allocator = std.mem.Allocator;
-const SchemaEngine = @import("schemaEngine.zig").SchemaEngine;
+const SchemaEngine = @import("schemaEngine.zig");
 const SchemaStruct = @import("schemaEngine.zig").SchemaStruct;
-const ThreadSyncContext = @import("threadEngine.zig").ThreadSyncContext;
-const EntityWriter = @import("entityWriter.zig").EntityWriter;
+const ThreadSyncContext = @import("thread/context.zig");
+const EntityWriter = @import("entityWriter.zig");
 const dtype = @import("dtype");
 const s2t = dtype.s2t;
@@ -15,9 +15,9 @@ const UUID = dtype.UUID;
 const DateTime = dtype.DateTime;
 const DataType = dtype.DataType;
-const AdditionalData = @import("dataStructure/additionalData.zig").AdditionalData;
+const AdditionalData = @import("dataStructure/additionalData.zig");
 const Filter = @import("dataStructure/filter.zig").Filter;
-const RelationMap = @import("dataStructure/relationMap.zig").RelationMap;
+const RelationMap = @import("dataStructure/relationMap.zig");
 const JsonString = @import("dataStructure/relationMap.zig").JsonString;
 const ConditionValue = @import("dataStructure/filter.zig").ConditionValue;

src/main.zig
View File

@@ -4,16 +4,16 @@ const send = utils.send;
 const Allocator = std.mem.Allocator;
 const Pool = std.Thread.Pool;
-const FileEngine = @import("fileEngine.zig").FileEngine;
-const SchemaEngine = @import("schemaEngine.zig").SchemaEngine;
-const ThreadEngine = @import("threadEngine.zig").ThreadEngine;
+const FileEngine = @import("fileEngine.zig");
+const SchemaEngine = @import("schemaEngine.zig");
+const ThreadEngine = @import("thread/engine.zig");
 const cliTokenizer = @import("tokenizers/cli.zig").Tokenizer;
 const cliToken = @import("tokenizers/cli.zig").Token;
 const ziqlTokenizer = @import("tokenizers/ziql.zig").Tokenizer;
 const ziqlToken = @import("tokenizers/ziql.zig").Token;
-const ziqlParser = @import("ziqlParser.zig").Parser;
+const ziqlParser = @import("ziqlParser.zig");
 const ZipponError = @import("errors.zig").ZipponError;
@@ -86,7 +86,7 @@ pub const DBEngine = struct {
 pub fn init(potential_main_path: ?[]const u8, potential_schema_path: ?[]const u8) DBEngine {
     var self = DBEngine{};
-    self.thread_engine = ThreadEngine.init();
+    self.thread_engine = ThreadEngine.init() catch @panic("TODO");

     const potential_main_path_or_environment_variable = potential_main_path orelse utils.getEnvVariable("ZIPPONDB_PATH");
     if (potential_main_path_or_environment_variable) |main_path| {
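
ThreadEngine.init now returns ZipponError!Self, so the call site must unwrap it; the catch @panic("TODO") is an explicit placeholder. A hedged alternative sketch, assuming DBEngine.init were itself changed to return an error union (not what this commit does):

    pub fn init(potential_main_path: ?[]const u8, potential_schema_path: ?[]const u8) ZipponError!DBEngine {
        var self = DBEngine{};
        self.thread_engine = try ThreadEngine.init(); // propagate instead of panicking
        // ... rest of the existing setup unchanged ...
        return self;
    }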

src/schemaEngine.zig
View File

@@ -1,18 +1,18 @@
 const std = @import("std");
 const zid = @import("ZipponData");
 const Allocator = std.mem.Allocator;
-const Parser = @import("schemaParser.zig").Parser;
+const Parser = @import("schemaParser.zig");
 const Tokenizer = @import("tokenizers/schema.zig").Tokenizer;
 const ZipponError = @import("errors.zig").ZipponError;
 const dtype = @import("dtype");
 const DataType = dtype.DataType;
-const AdditionalData = @import("dataStructure/additionalData.zig").AdditionalData;
-const RelationMap = @import("dataStructure/relationMap.zig").RelationMap;
+const AdditionalData = @import("dataStructure/additionalData.zig");
+const RelationMap = @import("dataStructure/relationMap.zig");
 const JsonString = @import("dataStructure/relationMap.zig").JsonString;
 const ConditionValue = @import("dataStructure/filter.zig").ConditionValue;
 const UUID = dtype.UUID;
-const UUIDFileIndex = @import("dataStructure/UUIDFileIndex.zig").UUIDIndexMap;
-const FileEngine = @import("fileEngine.zig").FileEngine;
+const UUIDFileIndex = @import("dataStructure/UUIDFileIndex.zig");
+const FileEngine = @import("fileEngine.zig");

 // TODO: Create a schemaEngine directory and add this as core and the parser with it

src/thread/context.zig Normal file (43 lines added)
View File

@@ -0,0 +1,43 @@
const std = @import("std");
const log = std.log.scoped(.thread);
const U64 = std.atomic.Value(u64);

pub const Self = @This();

processed_struct: U64 = U64.init(0),
error_file: U64 = U64.init(0),
completed_file: U64 = U64.init(0),
max_struct: u64,
max_file: u64,

pub fn init(max_struct: u64, max_file: u64) Self {
    return Self{
        .max_struct = max_struct,
        .max_file = max_file,
    };
}

pub fn isComplete(self: *Self) bool {
    return (self.completed_file.load(.acquire) + self.error_file.load(.acquire)) >= self.max_file;
}

pub fn completeThread(self: *Self) void {
    _ = self.completed_file.fetchAdd(1, .release);
}

pub fn incrementAndCheckStructLimit(self: *Self) bool {
    if (self.max_struct == 0) return false;
    const new_count = self.processed_struct.fetchAdd(1, .monotonic);
    return (new_count + 1) >= self.max_struct;
}

pub fn checkStructLimit(self: *Self) bool {
    if (self.max_struct == 0) return false;
    const count = self.processed_struct.load(.monotonic);
    return count >= self.max_struct;
}

pub fn logError(self: *Self, message: []const u8, err: anyerror) void {
    log.err("{s}: {any}", .{ message, err });
    _ = self.error_file.fetchAdd(1, .acquire);
}
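
A minimal usage sketch for this context; the worker loop and pool wiring below are illustrative, not part of this commit (parseOneFile and the entity count are hypothetical):

    const std = @import("std");
    const ThreadSyncContext = @import("thread/context.zig");

    // Hypothetical per-file worker; stands in for ZipponDB's real file parsing.
    fn parseOneFile(sync: *ThreadSyncContext) void {
        defer sync.completeThread(); // always counted, even on early return
        for (0..100) |_| { // pretend this file holds 100 entities
            if (sync.incrementAndCheckStructLimit()) return; // limit reached, stop early
        }
    }

    pub fn main() !void {
        var sync = ThreadSyncContext.init(50, 4); // at most 50 entities across 4 files
        var pool: std.Thread.Pool = undefined;
        try pool.init(.{ .allocator = std.heap.page_allocator, .n_jobs = 4 });
        defer pool.deinit();

        for (0..4) |_| try pool.spawn(parseOneFile, .{&sync});
        while (!sync.isComplete()) std.time.sleep(10 * std.time.ns_per_ms);
    }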

src/thread/engine.zig Normal file (38 lines added)
View File

@@ -0,0 +1,38 @@
const std = @import("std");
const Pool = std.Thread.Pool;
const Allocator = std.mem.Allocator;
const CPU_CORE = @import("config").CPU_CORE;
const log = std.log.scoped(.thread);
const ZipponError = @import("../errors.zig").ZipponError;

pub const Self = @This();

var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
const allocator = arena.allocator();

thread_arena: *std.heap.ThreadSafeAllocator,
thread_pool: *Pool,

pub fn init() ZipponError!Self {
    const thread_arena = allocator.create(std.heap.ThreadSafeAllocator) catch return ZipponError.MemoryError;
    thread_arena.* = std.heap.ThreadSafeAllocator{
        .child_allocator = allocator,
    };

    const thread_pool = allocator.create(Pool) catch return ZipponError.MemoryError;
    thread_pool.init(Pool.Options{
        .allocator = thread_arena.allocator(),
        .n_jobs = CPU_CORE,
    }) catch return ZipponError.ThreadError;

    return Self{
        .thread_pool = thread_pool,
        .thread_arena = thread_arena,
    };
}

pub fn deinit(self: *Self) void {
    self.thread_pool.deinit();
    arena.deinit();
}
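
Usage is symmetric; a sketch under the assumption that the import is resolved relative to src/ (parseFileJob is hypothetical):

    const ThreadEngine = @import("thread/engine.zig");

    pub fn main() !void {
        var engine = try ThreadEngine.init();
        defer engine.deinit(); // stops the pool, then frees every arena allocation in bulk

        // Jobs spawned on engine.thread_pool allocate through the thread-safe arena,
        // so individual tasks need no per-allocation cleanup:
        // try engine.thread_pool.spawn(parseFileJob, .{});
    }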

src/threadEngine.zig
View File

@@ -1,81 +0,0 @@
// TODO: Put the ThreadSynx stuff and create a ThreadEngine with the arena, pool, and some methods
const std = @import("std");
const U64 = std.atomic.Value(u64);
const Pool = std.Thread.Pool;
const Allocator = std.mem.Allocator;
const ZipponError = @import("errors.zig").ZipponError;
const CPU_CORE = @import("config").CPU_CORE;
const OUT_BUFFER_SIZE = @import("config").OUT_BUFFER_SIZE;
const log = std.log.scoped(.thread);
const allocator = std.heap.page_allocator;
var thread_arena: std.heap.ThreadSafeAllocator = undefined;
var thread_pool: Pool = undefined;
pub const ThreadSyncContext = struct {
processed_struct: std.atomic.Value(u64) = std.atomic.Value(u64).init(0),
error_file: std.atomic.Value(u64) = std.atomic.Value(u64).init(0),
completed_file: std.atomic.Value(u64) = std.atomic.Value(u64).init(0),
max_struct: u64,
max_file: u64,
pub fn init(max_struct: u64, max_file: u64) ThreadSyncContext {
return ThreadSyncContext{
.max_struct = max_struct,
.max_file = max_file,
};
}
pub fn isComplete(self: *ThreadSyncContext) bool {
return (self.completed_file.load(.acquire) + self.error_file.load(.acquire)) >= self.max_file;
}
pub fn completeThread(self: *ThreadSyncContext) void {
_ = self.completed_file.fetchAdd(1, .release);
}
pub fn incrementAndCheckStructLimit(self: *ThreadSyncContext) bool {
if (self.max_struct == 0) return false;
const new_count = self.processed_struct.fetchAdd(1, .monotonic);
return (new_count + 1) >= self.max_struct;
}
pub fn checkStructLimit(self: *ThreadSyncContext) bool {
if (self.max_struct == 0) return false;
const count = self.processed_struct.load(.monotonic);
return (count) >= self.max_struct;
}
pub fn logError(self: *ThreadSyncContext, message: []const u8, err: anyerror) void {
log.err("{s}: {any}", .{ message, err });
_ = self.error_file.fetchAdd(1, .acquire);
}
};
pub const ThreadEngine = @This();
thread_arena: *std.heap.ThreadSafeAllocator,
thread_pool: *Pool,
pub fn init() ThreadEngine {
thread_arena = std.heap.ThreadSafeAllocator{
.child_allocator = allocator,
};
thread_pool.init(std.Thread.Pool.Options{
.allocator = thread_arena.allocator(),
.n_jobs = CPU_CORE,
}) catch @panic("=(");
return ThreadEngine{
.thread_pool = &thread_pool,
.thread_arena = &thread_arena,
};
}
pub fn deinit(_: ThreadEngine) void {
thread_pool.deinit();
}

src/ziqlParser.zig
View File

@@ -66,6 +66,7 @@ const State = enum {
 };

 pub const Parser = @This();

 toker: *Tokenizer,
 file_engine: *FileEngine,
 schema_engine: *SchemaEngine,