Delete Entities now use ZipponData
Also stopped parsing then deleting; entities are now parsed and deleted in a single pass, for performance.
This commit is contained in:
parent
bead52df5a
commit
a20a60e566
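The heart of the change is a single rewrite pass per data file: iterate the rows of a .zid file, and for each row either report it as deleted or copy it into a .zid.new file that then replaces the original. Below is a minimal sketch of that pattern, assuming the zid API as it appears in this diff (DataIterator, DataWriter, createFile); the module import name and the `keep` predicate standing in for the filter are hypothetical:

const std = @import("std");
const zid = @import("ZipponData"); // assumed import name for the zid module

/// Sketch: rewrite the file `name` inside `dir`, keeping only rows for which
/// `keep` returns true. Dropped rows are simply never copied to the new file,
/// so parsing and deleting happen in the same pass.
fn deletePass(
    allocator: std.mem.Allocator,
    dir: std.fs.Dir,
    name: []const u8,
    schema: anytype, // stands in for sstruct.zid_schema, whose type is not shown here
    keep: *const fn (row: []const zid.Data) bool, // hypothetical predicate
) !void {
    var iter = try zid.DataIterator.init(allocator, name, dir, schema);
    defer iter.deinit();

    const new_name = "0.zid.new"; // the diff formats this as "{d}.zid.new" per file index
    try zid.createFile(new_name, dir);
    var new_writer = try zid.DataWriter.init(new_name, dir);
    defer new_writer.deinit();

    while (try iter.next()) |row| {
        if (keep(row)) try new_writer.write(row); // survivors are copied over
        // rows that are not kept are never written: that is the delete
    }
    try new_writer.flush();

    // Swap the rewritten file into place, as the diff does.
    try dir.deleteFile(name);
    try dir.rename(new_name, name);
}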
@@ -265,7 +265,6 @@ pub const FileEngine = struct {
     }
-
     /// Take a condition and an array of UUID and fill the array with all UUID that match the condition
     /// TODO: Use the new filter and DataIterator
     pub fn getUUIDListUsingFilter(self: *FileEngine, struct_name: []const u8, filter: Filter, uuid_list: *std.ArrayList(UUID)) FileEngineError!void {
         const sstruct = try self.structName2SchemaStruct(struct_name);
         const max_file_index = try self.maxFileIndex(sstruct.name);
@@ -292,14 +291,9 @@ pub const FileEngine = struct {
         }
     }

-    fn isIn(array: []usize, value: usize) bool {
-        for (array) |v| if (v == value) return true;
-        return false;
-    }
-
    /// Take a filter, parse all files and, if one struct is validated by the filter, write it in a JSON format to the writer
    /// filter can be null. This will return all of them
-    pub fn parseToSendUsingFilter(
+    pub fn parseEntities(
        self: *FileEngine,
        struct_name: []const u8,
        filter: ?Filter,
@@ -605,138 +599,60 @@ pub const FileEngine = struct {
        }
    }

-    /// Take a list of UUID and a struct name and delete the rows with the same UUID
-    /// TODO: Use B+Tree
-    pub fn deleteEntities(self: *FileEngine, struct_name: []const u8, uuids: []UUID) FileEngineError!usize {
-        const max_file_index = self.maxFileIndex(struct_name) catch @panic("Cant get max index file when updating");
-        var current_file_index: usize = 0;
+    /// Will delete all entities based on the filter. Will also write a JSON formatted list of all deleted UUID into the buffer
+    pub fn deleteEntities(
+        self: *FileEngine,
+        struct_name: []const u8,
+        filter: ?Filter,
+        buffer: *std.ArrayList(u8),
+        additional_data: *AdditionalData,
+    ) FileEngineError!void {
+        const sstruct = try self.structName2SchemaStruct(struct_name);
+        const max_file_index = try self.maxFileIndex(sstruct.name);
+        var total_currently_found: usize = 0;

        var path_buff = std.fmt.allocPrint(
            self.allocator,
-            "{s}/DATA/{s}/{d}.csv",
-            .{ self.path_to_ZipponDB_dir, struct_name, current_file_index },
+            "{s}/DATA/{s}",
+            .{ self.path_to_ZipponDB_dir, sstruct.name },
        ) catch return FileEngineError.MemoryError;
        defer self.allocator.free(path_buff);
+        const dir = std.fs.cwd().openDir(path_buff, .{}) catch return FileEngineError.CantOpenDir;

-        var path_buff2 = std.fmt.allocPrint(
-            self.allocator,
-            "{s}/DATA/{s}/{d}.csv",
-            .{ self.path_to_ZipponDB_dir, struct_name, current_file_index },
-        ) catch return FileEngineError.MemoryError;
-        defer self.allocator.free(path_buff2);
+        var writer = buffer.writer();
+        writer.writeAll("[") catch return FileEngineError.WriteError;
+        for (0..(max_file_index + 1)) |file_index| { // TODO: Multi thread that
+            self.allocator.free(path_buff);
+            path_buff = std.fmt.allocPrint(self.allocator, "{d}.zid", .{file_index}) catch return FileEngineError.MemoryError;

-        var old_file = std.fs.cwd().openFile(path_buff, .{}) catch return FileEngineError.CantOpenFile;
+            var iter = zid.DataIterator.init(self.allocator, path_buff, dir, sstruct.zid_schema) catch return FileEngineError.ZipponDataError;
+            defer iter.deinit();

-        self.allocator.free(path_buff);
-        path_buff = std.fmt.allocPrint(
-            self.allocator,
-            "{s}/DATA/{s}/{d}.csv.new",
-            .{ self.path_to_ZipponDB_dir, struct_name, current_file_index },
-        ) catch return FileEngineError.MemoryError;
+            const new_path_buff = std.fmt.allocPrint(self.allocator, "{d}.zid.new", .{file_index}) catch return FileEngineError.MemoryError;
+            defer self.allocator.free(new_path_buff);

-        var new_file = std.fs.cwd().createFile(path_buff, .{}) catch return FileEngineError.CantOpenFile;
-        defer new_file.close();
+            zid.createFile(new_path_buff, dir) catch return FileEngineError.ZipponDataError;
+            var new_writer = zid.DataWriter.init(new_path_buff, dir) catch return FileEngineError.ZipponDataError;
+            defer new_writer.deinit();

-        var output: [BUFFER_SIZE]u8 = undefined; // Maybe need to increase that as it limits the size of a line in a file
-        var output_fbs = std.io.fixedBufferStream(&output);
-        const writer = output_fbs.writer();
+            blk: while (iter.next() catch return FileEngineError.ZipponDataError) |row| {
+                if (filter != null) if (!filter.?.evaluate(row)) continue;

-        var buffered = std.io.bufferedReader(old_file.reader());
-        var reader = buffered.reader();
-        var founded = false;
-        var deleted_count: usize = 0;
-
-        while (true) {
-            output_fbs.reset();
-            reader.streamUntilDelimiter(writer, CSV_DELIMITER, null) catch |err| switch (err) {
-                error.EndOfStream => {
-                    // When end of file, check if all files were parsed; if not, update the reader to the next file
-                    // TODO: Be able to give an array of file index from the B+Tree to only parse them
-                    output_fbs.reset(); // clear buffer before exit
-
-                    // Start by deleting and renaming the new file
-                    self.allocator.free(path_buff);
-                    path_buff = std.fmt.allocPrint(
-                        self.allocator,
-                        "{s}/DATA/{s}/{d}.csv",
-                        .{ self.path_to_ZipponDB_dir, struct_name, current_file_index },
-                    ) catch return FileEngineError.MemoryError;
-
-                    self.allocator.free(path_buff2);
-                    path_buff2 = std.fmt.allocPrint(
-                        self.allocator,
-                        "{s}/DATA/{s}/{d}.csv.new",
-                        .{ self.path_to_ZipponDB_dir, struct_name, current_file_index },
-                    ) catch return FileEngineError.MemoryError;
-
-                    old_file.close();
-                    std.fs.cwd().deleteFile(path_buff) catch return FileEngineError.DeleteFileError;
-                    std.fs.cwd().rename(path_buff2, path_buff) catch return FileEngineError.RenameFileError;
-
-                    if (current_file_index == max_file_index) break;
-
-                    current_file_index += 1;
-
-                    self.allocator.free(path_buff);
-                    path_buff = std.fmt.allocPrint(
-                        self.allocator,
-                        "{s}/DATA/{s}/{d}.csv",
-                        .{ self.path_to_ZipponDB_dir, struct_name, current_file_index },
-                    ) catch return FileEngineError.MemoryError;
-
-                    self.allocator.free(path_buff2);
-                    path_buff2 = std.fmt.allocPrint(
-                        self.allocator,
-                        "{s}/DATA/{s}/{d}.csv.new",
-                        .{ self.path_to_ZipponDB_dir, struct_name, current_file_index },
-                    ) catch return FileEngineError.MemoryError;
-
-                    old_file = std.fs.cwd().openFile(path_buff, .{}) catch return FileEngineError.CantOpenFile;
-
-                    new_file = std.fs.cwd().createFile(path_buff2, .{}) catch return FileEngineError.CantOpenFile;
-
-                    buffered = std.io.bufferedReader(old_file.reader());
-                    reader = buffered.reader();
-                    continue;
-                }, // file read till the end
-                else => {
-                    log.err("Error while reading file: {any}", .{err});
-                    break;
-                },
-            };
-
-            const new_writer = new_file.writer();
-
-            // This is the uuid of the current row
-            const uuid = UUID.parse(output_fbs.getWritten()[0..36]) catch return FileEngineError.InvalidUUID;
-            founded = false;
-
-            // Optimize this
-            for (uuids) |elem| {
-                if (elem.compare(uuid)) {
-                    founded = true;
-                    deleted_count += 1;
-                    break;
-                }
+                new_writer.write(row) catch return FileEngineError.WriteError;
+                writer.writeByte('{') catch return FileEngineError.WriteError;
+                writer.print("\"{s}\"", .{UUID.format_bytes(row[0].UUID)}) catch return FileEngineError.WriteError;
+                writer.writeAll("}, ") catch return FileEngineError.WriteError;
+                total_currently_found += 1;
+                if (additional_data.entity_count_to_find != 0 and total_currently_found >= additional_data.entity_count_to_find) break :blk;
            }

-            if (!founded) {
-                // stream until the delimiter
-                new_writer.writeAll(output_fbs.getWritten()) catch return FileEngineError.WriteError;
-
-                output_fbs.reset();
-                new_writer.writeByte(CSV_DELIMITER) catch return FileEngineError.WriteError;
-                reader.streamUntilDelimiter(writer, '\n', null) catch return FileEngineError.WriteError;
-                new_writer.writeAll(output_fbs.getWritten()) catch return FileEngineError.WriteError;
-                new_writer.writeByte('\n') catch return FileEngineError.WriteError;
-            } else {
-                reader.streamUntilDelimiter(writer, '\n', null) catch return FileEngineError.WriteError;
-            }
+            new_writer.flush() catch return FileEngineError.ZipponDataError;
+            dir.deleteFile(path_buff) catch return FileEngineError.DeleteFileError;
+            dir.rename(new_path_buff, path_buff) catch return FileEngineError.RenameFileError;
        }

-        return deleted_count;
+        writer.writeAll("]") catch return FileEngineError.WriteError;
    }

    // --------------------ZipponData utils--------------------

    // Function that takes a map from the parseNewData and returns an ordered array of Data
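A note for callers of the new deleteEntities: as the writer calls above show, the buffer ends up holding a JSON-style list with one {"<uuid>"} object per deleted row. A sketch of the caller's view, with a made-up UUID value and assuming the surrounding variables from the parser code below:

    var buff = std.ArrayList(u8).init(allocator);
    defer buff.deinit();
    try file_engine.deleteEntities("User", filter, &buff, &additional_data);
    // buff.items now looks like: [{"b2cabf4d-f866-4c88-b9f9-6891b3779e23"}, ]
    // One {"<uuid>"} per deleted row. Note that the loop writes "}, " after
    // every UUID and nothing trims the last separator before the closing "]".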
@@ -211,7 +211,7 @@ pub const Parser = struct {
            var buff = std.ArrayList(u8).init(self.allocator);
            defer buff.deinit();

-            try self.file_engine.parseToSendUsingFilter(struct_name, filter, &buff, &additional_data);
+            try self.file_engine.parseEntities(struct_name, filter, &buff, &additional_data);
            send("{s}", .{buff.items});
            state = .end;
        },
@@ -219,7 +219,7 @@ pub const Parser = struct {
            var buff = std.ArrayList(u8).init(self.allocator);
            defer buff.deinit();

-            try self.file_engine.parseToSendUsingFilter(struct_name, null, &buff, &additional_data);
+            try self.file_engine.parseEntities(struct_name, null, &buff, &additional_data);
            send("{s}", .{buff.items});
            state = .end;
        },
@@ -303,19 +303,19 @@ pub const Parser = struct {
            var filter = try self.parseFilter(struct_name, false);
            defer filter.deinit();

-            var uuids = std.ArrayList(UUID).init(self.allocator);
-            defer uuids.deinit();
+            var buff = std.ArrayList(u8).init(self.allocator);
+            defer buff.deinit();

-            _ = try self.file_engine.deleteEntities(struct_name, uuids.items);
-            try self.sendUUIDs(uuids.items);
+            try self.file_engine.deleteEntities(struct_name, filter, &buff, &additional_data);
+            send("{s}", .{buff.items});
            state = .end;
        },
        .eof => {
-            var uuids = std.ArrayList(UUID).init(self.allocator);
-            defer uuids.deinit();
-            try self.file_engine.getAllUUIDList(struct_name, &uuids);
-            _ = try self.file_engine.deleteEntities(struct_name, uuids.items);
-            try self.sendUUIDs(uuids.items);
+            var buff = std.ArrayList(u8).init(self.allocator);
+            defer buff.deinit();
+
+            try self.file_engine.deleteEntities(struct_name, null, &buff, &additional_data);
+            send("{s}", .{buff.items});
            state = .end;
        },
        else => return printError(
@@ -1056,6 +1056,10 @@ test "Specific query" {
    try testParsing("GRAB User [1]");
}

+test "DELETE" {
+    try testParsing("DELETE User {name='Bob'}");
+}
+
test "Synthax error" {
    try expectParsingError("GRAB {}", ZiQlParserError.StructNotFound);
    try expectParsingError("GRAB User {qwe = 'qwe'}", ZiQlParserError.MemberNotFound);
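The new DELETE test covers the filtered path only. The parser's .eof branch above (delete with no filter) is not exercised here; a possible companion test, assuming a bare "DELETE User" with no filter block reaches that branch; a sketch, not part of this commit:

test "DELETE all" {
    // Hypothetical: with no filter after the struct name, the parser's .eof
    // branch above should call deleteEntities with a null filter.
    try testParsing("DELETE User");
}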