From d9950298444c3a3c9d2e5ec7efbf45e722bbed02 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Igor=20Anic=CC=81?= Date: Wed, 21 Feb 2024 20:01:45 +0100 Subject: [PATCH 1/5] add BufferedTee BufferedTee provides a reader interface to the consumer. Data read by the consumer is also written to the output. The output is held lookahead_size bytes behind the consumer, allowing the consumer to put back some bytes to be read again. On flush, all consumed bytes are flushed to the output. input -> tee -> consumer | output input - underlying unbuffered reader output - writer, receives data read by consumer consumer - uses provided reader interface If lookahead_size is zero, the output always has the same bytes as the consumer. --- lib/std/io.zig | 4 + lib/std/io/buffered_tee.zig | 390 ++++++++++++++++++++++++++++++++++++ 2 files changed, 394 insertions(+) create mode 100644 lib/std/io/buffered_tee.zig diff --git a/lib/std/io.zig b/lib/std/io.zig index 3eb3c4e391..27c6337626 100644 --- a/lib/std/io.zig +++ b/lib/std/io.zig @@ -411,6 +411,9 @@ pub const BufferedAtomicFile = @import("io/buffered_atomic_file.zig").BufferedAt pub const StreamSource = @import("io/stream_source.zig").StreamSource; +pub const BufferedTee = @import("io/buffered_tee.zig").BufferedTee; +pub const bufferedTee = @import("io/buffered_tee.zig").bufferedTee; + pub const tty = @import("io/tty.zig"); /// A Writer that doesn't write to anything. @@ -692,4 +695,5 @@ test { _ = @import("io/seekable_stream.zig"); _ = @import("io/stream_source.zig"); _ = @import("io/test.zig"); + _ = @import("io/buffered_tee.zig"); } diff --git a/lib/std/io/buffered_tee.zig b/lib/std/io/buffered_tee.zig new file mode 100644 index 0000000000..88eaeb2852 --- /dev/null +++ b/lib/std/io/buffered_tee.zig @@ -0,0 +1,390 @@ +//! BufferedTee provides reader interface to the consumer. Data read by consumer +//! is also written to the output. Output is hold lookahead_size bytes behind +//! consumer. Allowing consumer to put back some bytes to be read again. On flush +//! 
all consumed bytes are flushed to the output. +//! +//! input -> tee -> consumer +//! | +//! output +//! +//! input - underlying unbuffered reader +//! output - writer, receives data read by consumer +//! consumer - uses provided reader interface +//! +//! If lookahead_size is zero output always has same bytes as consumer. +//! + +const std = @import("std"); +const io = std.io; +const assert = std.debug.assert; +const testing = std.testing; + +pub fn BufferedTee( + comptime buffer_size: usize, // internal buffer size in bytes + comptime lookahead_size: usize, // lookahead, number of bytes to hold output behind consumer + comptime InputReaderType: type, + comptime OutputWriterType: type, +) type { + comptime assert(buffer_size > lookahead_size); + + return struct { + input: InputReaderType, + output: OutputWriterType, + + buf: [buffer_size]u8 = undefined, // internal buffer + tail: usize = 0, // buffer is filled up to this position with bytes from input + rp: usize = 0, // reader pointer; consumer has read up to this position + wp: usize = 0, // writer pointer; data is sent to the output up to this position + + pub const Error = InputReaderType.Error || OutputWriterType.Error; + pub const Reader = io.Reader(*Self, Error, read); + + const Self = @This(); + + pub fn read(self: *Self, dest: []u8) Error!usize { + var dest_index: usize = 0; + + while (dest_index < dest.len) { + const written = @min(dest.len - dest_index, self.tail - self.rp); + if (written == 0) { + try self.preserveLookahead(); + // fill upper part of the buf + const n = try self.input.read(self.buf[self.tail..]); + if (n == 0) { + // reading from the unbuffered stream returned nothing + // so we have nothing left to read. 
+ return dest_index; + } + self.tail += n; + } else { + @memcpy(dest[dest_index..][0..written], self.buf[self.rp..][0..written]); + self.rp += written; + dest_index += written; + try self.flush_(lookahead_size); + } + } + return dest.len; + } + + /// Move lookahead_size bytes to the buffer start. + fn preserveLookahead(self: *Self) !void { + assert(self.tail == self.rp); + if (lookahead_size == 0) { + // Flush is called on each read so wp must follow rp when lookahead_size == 0. + assert(self.wp == self.rp); + // Nothing to preserve; rewind pointer to the buffer start + self.rp = 0; + self.wp = 0; + self.tail = 0; + return; + } + if (self.tail <= lookahead_size) { + // There is still place in the buffer, append to buffer from tail position. + return; + } + try self.flush_(lookahead_size); + const head = self.tail - lookahead_size; + // Preserve head..tail at the start of the buffer. + std.mem.copyForwards(u8, self.buf[0..lookahead_size], self.buf[head..self.tail]); + self.wp -= head; + assert(self.wp <= lookahead_size); + self.rp = lookahead_size; + self.tail = lookahead_size; + } + + /// Flush to the output all but lookahead size bytes. + fn flush_(self: *Self, lookahead: usize) !void { + if (self.rp <= self.wp + lookahead) return; + const new_wp = self.rp - lookahead; + try self.output.writeAll(self.buf[self.wp..new_wp]); + self.wp = new_wp; + } + + /// Flush to the output all consumed bytes. + pub fn flush(self: *Self) !void { + try self.flush_(0); + } + + /// Put back some bytes to be consumed again. Useful when we overshoot + reading and want to return those overshot bytes. Can return maximum + of lookahead_size number of bytes. 
+ pub fn putBack(self: *Self, n: usize) void { + assert(n <= lookahead_size and n <= self.rp); + self.rp -= n; + } + + pub fn reader(self: *Self) Reader { + return .{ .context = self }; + } + }; +} + +pub fn bufferedTee( + comptime buffer_size: usize, + comptime lookahead_size: usize, + input: anytype, + output: anytype, +) BufferedTee( + buffer_size, + lookahead_size, + @TypeOf(input), + @TypeOf(output), +) { + return BufferedTee( + buffer_size, + lookahead_size, + @TypeOf(input), + @TypeOf(output), + ){ + .input = input, + .output = output, + }; +} + +// Running test from std.io.BufferedReader on BufferedTee +// It should act as BufferedReader for consumer. + +fn BufferedReader(comptime buffer_size: usize, comptime ReaderType: type) type { + return BufferedTee(buffer_size, 0, ReaderType, @TypeOf(io.null_writer)); +} + +fn bufferedReader(reader: anytype) BufferedReader(4096, @TypeOf(reader)) { + return .{ + .input = reader, + .output = io.null_writer, + }; +} + +test "io.BufferedTee io.BufferedReader OneByte" { + const OneByteReadReader = struct { + str: []const u8, + curr: usize, + + const Error = error{NoError}; + const Self = @This(); + const Reader = io.Reader(*Self, Error, read); + + fn init(str: []const u8) Self { + return Self{ + .str = str, + .curr = 0, + }; + } + + fn read(self: *Self, dest: []u8) Error!usize { + if (self.str.len <= self.curr or dest.len == 0) + return 0; + + dest[0] = self.str[self.curr]; + self.curr += 1; + return 1; + } + + fn reader(self: *Self) Reader { + return .{ .context = self }; + } + }; + + const str = "This is a test"; + var one_byte_stream = OneByteReadReader.init(str); + var buf_reader = bufferedReader(one_byte_stream.reader()); + const stream = buf_reader.reader(); + + const res = try stream.readAllAlloc(testing.allocator, str.len + 1); + defer testing.allocator.free(res); + try testing.expectEqualSlices(u8, str, res); +} + +test "io.BufferedTee io.BufferedReader Block" { + const BlockReader = struct { + block: []const u8, 
+ reads_allowed: usize, + curr_read: usize, + + const Error = error{NoError}; + const Self = @This(); + const Reader = io.Reader(*Self, Error, read); + + fn init(block: []const u8, reads_allowed: usize) Self { + return Self{ + .block = block, + .reads_allowed = reads_allowed, + .curr_read = 0, + }; + } + + fn read(self: *Self, dest: []u8) Error!usize { + if (self.curr_read >= self.reads_allowed) return 0; + @memcpy(dest[0..self.block.len], self.block); + + self.curr_read += 1; + return self.block.len; + } + + fn reader(self: *Self) Reader { + return .{ .context = self }; + } + }; + + const block = "0123"; + + // len out == block + { + var test_buf_reader: BufferedReader(4, BlockReader) = .{ + .input = BlockReader.init(block, 2), + .output = io.null_writer, + }; + var out_buf: [4]u8 = undefined; + _ = try test_buf_reader.read(&out_buf); + try testing.expectEqualSlices(u8, &out_buf, block); + _ = try test_buf_reader.read(&out_buf); + try testing.expectEqualSlices(u8, &out_buf, block); + try testing.expectEqual(try test_buf_reader.read(&out_buf), 0); + } + + // len out < block + { + var test_buf_reader: BufferedReader(4, BlockReader) = .{ + .input = BlockReader.init(block, 2), + .output = io.null_writer, + }; + var out_buf: [3]u8 = undefined; + _ = try test_buf_reader.read(&out_buf); + try testing.expectEqualSlices(u8, &out_buf, "012"); + _ = try test_buf_reader.read(&out_buf); + try testing.expectEqualSlices(u8, &out_buf, "301"); + const n = try test_buf_reader.read(&out_buf); + try testing.expectEqualSlices(u8, out_buf[0..n], "23"); + try testing.expectEqual(try test_buf_reader.read(&out_buf), 0); + } + + // len out > block + { + var test_buf_reader: BufferedReader(4, BlockReader) = .{ + .input = BlockReader.init(block, 2), + .output = io.null_writer, + }; + var out_buf: [5]u8 = undefined; + _ = try test_buf_reader.read(&out_buf); + try testing.expectEqualSlices(u8, &out_buf, "01230"); + const n = try test_buf_reader.read(&out_buf); + try 
testing.expectEqualSlices(u8, out_buf[0..n], "123"); + try testing.expectEqual(try test_buf_reader.read(&out_buf), 0); + } + + // len out == 0 + { + var test_buf_reader: BufferedReader(4, BlockReader) = .{ + .input = BlockReader.init(block, 2), + .output = io.null_writer, + }; + var out_buf: [0]u8 = undefined; + _ = try test_buf_reader.read(&out_buf); + try testing.expectEqualSlices(u8, &out_buf, ""); + } + + // len bufreader buf > block + { + var test_buf_reader: BufferedReader(5, BlockReader) = .{ + .input = BlockReader.init(block, 2), + .output = io.null_writer, + }; + var out_buf: [4]u8 = undefined; + _ = try test_buf_reader.read(&out_buf); + try testing.expectEqualSlices(u8, &out_buf, block); + _ = try test_buf_reader.read(&out_buf); + try testing.expectEqualSlices(u8, &out_buf, block); + try testing.expectEqual(try test_buf_reader.read(&out_buf), 0); + } +} + +test "io.BufferedTee with zero lookahead" { + // output is has same bytes as reader + const data = [_]u8{ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 } ** 12; + var in = io.fixedBufferStream(&data); + var out = std.ArrayList(u8).init(testing.allocator); + defer out.deinit(); + + var lbr = bufferedTee(8, 0, in.reader(), out.writer()); + + var buf: [16]u8 = undefined; + + var read_len: usize = 0; + for (0..buf.len) |i| { + const n = try lbr.read(buf[0..i]); + try testing.expectEqual(i, n); + read_len += i; + try testing.expectEqual(read_len, out.items.len); + } +} + +test "io.BufferedTee with lookahead" { + // output is lookahead bytes behind reader + inline for (1..8) |lookahead| { + const data = [_]u8{ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 } ** 12; + var in = io.fixedBufferStream(&data); + var out = std.ArrayList(u8).init(testing.allocator); + defer out.deinit(); + + var lbr = bufferedTee(8, lookahead, in.reader(), out.writer()); + var buf: [16]u8 = undefined; + + var read_len: usize = 0; + for (1..buf.len) |i| { + const n = try lbr.read(buf[0..i]); + try testing.expectEqual(i, n); + read_len += i; + const out_len = if 
(read_len < lookahead) 0 else read_len - lookahead; + try testing.expectEqual(out_len, out.items.len); + // std.debug.print("{d} {d} {d}\n", .{ lookahead, read_len, out_len }); + } + try testing.expectEqual(read_len, out.items.len + lookahead); + try lbr.flush(); + try testing.expectEqual(read_len, out.items.len); + } +} + +test "io.BufferedTee internal state" { + const data = [_]u8{ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 } ** 10; + var in = io.fixedBufferStream(&data); + var out = std.ArrayList(u8).init(testing.allocator); + defer out.deinit(); + + var lbr = bufferedTee(8, 4, in.reader(), out.writer()); + + var buf: [16]u8 = undefined; + var n = try lbr.read(buf[0..3]); + try testing.expectEqual(3, n); + try testing.expectEqualSlices(u8, data[0..3], buf[0..n]); + try testing.expectEqual(8, lbr.tail); + try testing.expectEqual(3, lbr.rp); + try testing.expectEqual(0, out.items.len); + + n = try lbr.read(buf[0..6]); + try testing.expectEqual(6, n); + try testing.expectEqualSlices(u8, data[3..9], buf[0..n]); + try testing.expectEqual(8, lbr.tail); + try testing.expectEqual(5, lbr.rp); + try testing.expectEqualSlices(u8, data[4..12], &lbr.buf); + try testing.expectEqual(5, out.items.len); + + n = try lbr.read(buf[0..9]); + try testing.expectEqual(9, n); + try testing.expectEqualSlices(u8, data[9..18], buf[0..n]); + try testing.expectEqual(8, lbr.tail); + try testing.expectEqual(6, lbr.rp); + try testing.expectEqualSlices(u8, data[12..20], &lbr.buf); + try testing.expectEqual(14, out.items.len); + + try lbr.flush(); + try testing.expectEqual(18, out.items.len); + + lbr.putBack(4); + n = try lbr.read(buf[0..4]); + try testing.expectEqual(4, n); + try testing.expectEqualSlices(u8, data[14..18], buf[0..n]); + + try testing.expectEqual(18, out.items.len); + try lbr.flush(); + try testing.expectEqual(18, out.items.len); +} From ce1a590fc9f57cde58c973d27461209ea2c34d37 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Igor=20Anic=CC=81?= Date: Wed, 21 Feb 2024 20:26:29 +0100 Subject: [PATCH 
2/5] cleanup tests --- lib/std/io/buffered_tee.zig | 50 ++++++++++++++++++------------------- 1 file changed, 24 insertions(+), 26 deletions(-) diff --git a/lib/std/io/buffered_tee.zig b/lib/std/io/buffered_tee.zig index 88eaeb2852..cea2a707f8 100644 --- a/lib/std/io/buffered_tee.zig +++ b/lib/std/io/buffered_tee.zig @@ -299,19 +299,18 @@ test "io.BufferedTee io.BufferedReader Block" { } test "io.BufferedTee with zero lookahead" { - // output is has same bytes as reader + // output has same bytes as consumer const data = [_]u8{ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 } ** 12; var in = io.fixedBufferStream(&data); var out = std.ArrayList(u8).init(testing.allocator); defer out.deinit(); - var lbr = bufferedTee(8, 0, in.reader(), out.writer()); + var bt = bufferedTee(8, 0, in.reader(), out.writer()); var buf: [16]u8 = undefined; - var read_len: usize = 0; for (0..buf.len) |i| { - const n = try lbr.read(buf[0..i]); + const n = try bt.read(buf[0..i]); try testing.expectEqual(i, n); read_len += i; try testing.expectEqual(read_len, out.items.len); @@ -319,72 +318,71 @@ test "io.BufferedTee with zero lookahead" { } test "io.BufferedTee with lookahead" { - // output is lookahead bytes behind reader + // output is lookahead bytes behind consumer inline for (1..8) |lookahead| { const data = [_]u8{ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 } ** 12; var in = io.fixedBufferStream(&data); var out = std.ArrayList(u8).init(testing.allocator); defer out.deinit(); - var lbr = bufferedTee(8, lookahead, in.reader(), out.writer()); + var bt = bufferedTee(8, lookahead, in.reader(), out.writer()); var buf: [16]u8 = undefined; var read_len: usize = 0; for (1..buf.len) |i| { - const n = try lbr.read(buf[0..i]); + const n = try bt.read(buf[0..i]); try testing.expectEqual(i, n); read_len += i; const out_len = if (read_len < lookahead) 0 else read_len - lookahead; try testing.expectEqual(out_len, out.items.len); - // std.debug.print("{d} {d} {d}\n", .{ lookahead, read_len, out_len }); } try 
testing.expectEqual(read_len, out.items.len + lookahead); - try lbr.flush(); + try bt.flush(); try testing.expectEqual(read_len, out.items.len); } } test "io.BufferedTee internal state" { - const data = [_]u8{ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 } ** 10; + const data = [_]u8{ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 } ** 2; var in = io.fixedBufferStream(&data); var out = std.ArrayList(u8).init(testing.allocator); defer out.deinit(); - var lbr = bufferedTee(8, 4, in.reader(), out.writer()); + var bt = bufferedTee(8, 4, in.reader(), out.writer()); var buf: [16]u8 = undefined; - var n = try lbr.read(buf[0..3]); + var n = try bt.read(buf[0..3]); try testing.expectEqual(3, n); try testing.expectEqualSlices(u8, data[0..3], buf[0..n]); - try testing.expectEqual(8, lbr.tail); - try testing.expectEqual(3, lbr.rp); + try testing.expectEqual(8, bt.tail); + try testing.expectEqual(3, bt.rp); try testing.expectEqual(0, out.items.len); - n = try lbr.read(buf[0..6]); + n = try bt.read(buf[0..6]); try testing.expectEqual(6, n); try testing.expectEqualSlices(u8, data[3..9], buf[0..n]); - try testing.expectEqual(8, lbr.tail); - try testing.expectEqual(5, lbr.rp); - try testing.expectEqualSlices(u8, data[4..12], &lbr.buf); + try testing.expectEqual(8, bt.tail); + try testing.expectEqual(5, bt.rp); + try testing.expectEqualSlices(u8, data[4..12], &bt.buf); try testing.expectEqual(5, out.items.len); - n = try lbr.read(buf[0..9]); + n = try bt.read(buf[0..9]); try testing.expectEqual(9, n); try testing.expectEqualSlices(u8, data[9..18], buf[0..n]); - try testing.expectEqual(8, lbr.tail); - try testing.expectEqual(6, lbr.rp); - try testing.expectEqualSlices(u8, data[12..20], &lbr.buf); + try testing.expectEqual(8, bt.tail); + try testing.expectEqual(6, bt.rp); + try testing.expectEqualSlices(u8, data[12..20], &bt.buf); try testing.expectEqual(14, out.items.len); - try lbr.flush(); + try bt.flush(); try testing.expectEqual(18, out.items.len); - lbr.putBack(4); - n = try lbr.read(buf[0..4]); + bt.putBack(4); 
+ n = try bt.read(buf[0..4]); try testing.expectEqual(4, n); try testing.expectEqualSlices(u8, data[14..18], buf[0..n]); try testing.expectEqual(18, out.items.len); - try lbr.flush(); + try bt.flush(); try testing.expectEqual(18, out.items.len); } From eb67fab2d9e8540b9052e4b0d3cd0149da9d9962 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Igor=20Anic=CC=81?= Date: Thu, 22 Feb 2024 12:29:21 +0100 Subject: [PATCH 3/5] refactor according to Ian's review https://github.com/ziglang/zig/pull/19032#pullrequestreview-1894702793 --- lib/std/io/buffered_tee.zig | 41 +++++++++++++++---------------------- 1 file changed, 16 insertions(+), 25 deletions(-) diff --git a/lib/std/io/buffered_tee.zig b/lib/std/io/buffered_tee.zig index cea2a707f8..304b360c41 100644 --- a/lib/std/io/buffered_tee.zig +++ b/lib/std/io/buffered_tee.zig @@ -1,24 +1,23 @@ -//! BufferedTee provides reader interface to the consumer. Data read by consumer -//! is also written to the output. Output is hold lookahead_size bytes behind -//! consumer. Allowing consumer to put back some bytes to be read again. On flush -//! all consumed bytes are flushed to the output. -//! -//! input -> tee -> consumer -//! | -//! output -//! -//! input - underlying unbuffered reader -//! output - writer, receives data read by consumer -//! consumer - uses provided reader interface -//! -//! If lookahead_size is zero output always has same bytes as consumer. -//! - const std = @import("std"); const io = std.io; const assert = std.debug.assert; const testing = std.testing; +/// BufferedTee provides reader interface to the consumer. Data read by consumer +/// is also written to the output. Output is held lookahead_size bytes behind +/// the consumer, allowing it to put back some bytes to be read again. On flush +/// all consumed bytes are flushed to the output. 
+/// +/// input -> tee -> consumer +/// | +/// output +/// +/// input - underlying unbuffered reader +/// output - writer, receives data read by consumer +/// consumer - uses provided reader interface +/// +/// If lookahead_size is zero output always has same bytes as consumer. +/// pub fn BufferedTee( comptime buffer_size: usize, // internal buffer size in bytes comptime lookahead_size: usize, // lookahead, number of bytes to hold output behind consumer @@ -130,15 +129,7 @@ pub fn bufferedTee( @TypeOf(input), @TypeOf(output), ) { - return BufferedTee( - buffer_size, - lookahead_size, - @TypeOf(input), - @TypeOf(output), - ){ - .input = input, - .output = output, - }; + return .{ .input = input, .output = output }; } // Running test from std.io.BufferedReader on BufferedTee From d00faa2407cdeaa058da62f2d95f64f9e7ed6a09 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Igor=20Anic=CC=81?= Date: Thu, 22 Feb 2024 15:18:03 +0100 Subject: [PATCH 4/5] use BufferedTee in Fetch/git.zig --- src/Package/Fetch/git.zig | 172 ++++++-------------------------------- 1 file changed, 26 insertions(+), 146 deletions(-) diff --git a/src/Package/Fetch/git.zig b/src/Package/Fetch/git.zig index db50ddfab7..dd6f63d177 100644 --- a/src/Package/Fetch/git.zig +++ b/src/Package/Fetch/git.zig @@ -1091,87 +1091,7 @@ pub fn indexPack(allocator: Allocator, pack: std.fs.File, index_writer: anytype) try index_writer.writeAll(&index_checksum); } -/// A reader that stores read data in a growable internal buffer. The read -/// position can be rewound to allow previously read data to be read again. -fn AccumulatingReader(comptime ReaderType: type) type { - return struct { - child_reader: ReaderType, - buffer: std.ArrayListUnmanaged(u8) = .{}, - /// The position in `buffer` from which reads should start, returning - /// buffered data. If this is `buffer.items.len`, data will be read from - /// `child_reader` instead. 
- read_start: usize = 0, - allocator: Allocator, - - const Self = @This(); - - fn deinit(self: *Self) void { - self.buffer.deinit(self.allocator); - self.* = undefined; - } - - const ReadError = ReaderType.Error || Allocator.Error; - const Reader = std.io.Reader(*Self, ReadError, read); - - fn read(self: *Self, buf: []u8) ReadError!usize { - if (self.read_start < self.buffer.items.len) { - // Previously buffered data is available and should be used - // before reading more data from the underlying reader. - const available = self.buffer.items.len - self.read_start; - const count = @min(available, buf.len); - @memcpy(buf[0..count], self.buffer.items[self.read_start..][0..count]); - self.read_start += count; - return count; - } - - try self.buffer.ensureUnusedCapacity(self.allocator, buf.len); - const read_buffer = self.buffer.unusedCapacitySlice(); - const count = try self.child_reader.read(read_buffer[0..buf.len]); - @memcpy(buf[0..count], read_buffer[0..count]); - self.buffer.items.len += count; - self.read_start += count; - return count; - } - - fn reader(self: *Self) Reader { - return .{ .context = self }; - } - - /// Returns a slice of the buffered data that has already been read, - /// except the last `count_before_end` bytes. - fn readDataExcept(self: Self, count_before_end: usize) []const u8 { - assert(count_before_end <= self.read_start); - return self.buffer.items[0 .. self.read_start - count_before_end]; - } - - /// Discards the first `count` bytes of buffered data. - fn discard(self: *Self, count: usize) void { - assert(count <= self.buffer.items.len); - const retain = self.buffer.items.len - count; - mem.copyForwards( - u8, - self.buffer.items[0..retain], - self.buffer.items[count..][0..retain], - ); - self.buffer.items.len = retain; - self.read_start -= @min(self.read_start, count); - } - - /// Rewinds the read position to the beginning of buffered data. 
- fn rewind(self: *Self) void { - self.read_start = 0; - } - }; -} - -fn accumulatingReader( - allocator: Allocator, - reader: anytype, -) AccumulatingReader(@TypeOf(reader)) { - return .{ .child_reader = reader, .allocator = allocator }; -} - -/// Performs the first pass over the packfile data for index construction. +// Performs the first pass over the packfile data for index construction. /// This will index all non-delta objects, queue delta objects for further /// processing, and return the pack checksum (which is part of the index /// format). @@ -1181,102 +1101,62 @@ fn indexPackFirstPass( index_entries: *std.AutoHashMapUnmanaged(Oid, IndexEntry), pending_deltas: *std.ArrayListUnmanaged(IndexEntry), ) ![Sha1.digest_length]u8 { - var pack_buffered_reader = std.io.bufferedReader(pack.reader()); - var pack_accumulating_reader = accumulatingReader(allocator, pack_buffered_reader.reader()); - defer pack_accumulating_reader.deinit(); - var pack_position: usize = 0; - var pack_hash = Sha1.init(.{}); - const pack_reader = pack_accumulating_reader.reader(); + var pack_counting_writer = std.io.countingWriter(std.io.null_writer); + var pack_hashed_writer = std.compress.hashedWriter(pack_counting_writer.writer(), Sha1.init(.{})); + var entry_crc32_writer = std.compress.hashedWriter(pack_hashed_writer.writer(), std.hash.Crc32.init()); + var pack_buffered_reader = std.io.bufferedTee(4096, 8, pack.reader(), entry_crc32_writer.writer()); + const pack_reader = pack_buffered_reader.reader(); const pack_header = try PackHeader.read(pack_reader); - const pack_header_bytes = pack_accumulating_reader.readDataExcept(0); - pack_position += pack_header_bytes.len; - pack_hash.update(pack_header_bytes); - pack_accumulating_reader.discard(pack_header_bytes.len); + try pack_buffered_reader.flush(); var current_entry: u32 = 0; while (current_entry < pack_header.total_objects) : (current_entry += 1) { - const entry_offset = pack_position; - var entry_crc32 = std.hash.Crc32.init(); - + 
const entry_offset = pack_counting_writer.bytes_written; + entry_crc32_writer.hasher = std.hash.Crc32.init(); // reset hasher const entry_header = try EntryHeader.read(pack_reader); - const entry_header_bytes = pack_accumulating_reader.readDataExcept(0); - pack_position += entry_header_bytes.len; - pack_hash.update(entry_header_bytes); - entry_crc32.update(entry_header_bytes); - pack_accumulating_reader.discard(entry_header_bytes.len); switch (entry_header) { - .commit, .tree, .blob, .tag => |object| { + inline .commit, .tree, .blob, .tag => |object, tag| { var entry_decompress_stream = std.compress.zlib.decompressor(pack_reader); - var entry_data_size: usize = 0; + var entry_counting_reader = std.io.countingReader(entry_decompress_stream.reader()); var entry_hashed_writer = hashedWriter(std.io.null_writer, Sha1.init(.{})); const entry_writer = entry_hashed_writer.writer(); - // The object header is not included in the pack data but is // part of the object's ID - try entry_writer.print("{s} {}\x00", .{ @tagName(entry_header), object.uncompressed_length }); - - while (try entry_decompress_stream.next()) |decompressed_data| { - entry_data_size += decompressed_data.len; - try entry_writer.writeAll(decompressed_data); - - const compressed_bytes = pack_accumulating_reader.readDataExcept(entry_decompress_stream.unreadBytes()); - pack_position += compressed_bytes.len; - pack_hash.update(compressed_bytes); - entry_crc32.update(compressed_bytes); - pack_accumulating_reader.discard(compressed_bytes.len); - } - const footer_bytes = pack_accumulating_reader.readDataExcept(entry_decompress_stream.unreadBytes()); - pack_position += footer_bytes.len; - pack_hash.update(footer_bytes); - entry_crc32.update(footer_bytes); - pack_accumulating_reader.discard(footer_bytes.len); - pack_accumulating_reader.rewind(); - - if (entry_data_size != object.uncompressed_length) { + try entry_writer.print("{s} {}\x00", .{ @tagName(tag), object.uncompressed_length }); + var fifo = 
std.fifo.LinearFifo(u8, .{ .Static = 4096 }).init(); + try fifo.pump(entry_counting_reader.reader(), entry_writer); + if (entry_counting_reader.bytes_read != object.uncompressed_length) { return error.InvalidObject; } - const oid = entry_hashed_writer.hasher.finalResult(); + pack_buffered_reader.putBack(entry_decompress_stream.unreadBytes()); + try pack_buffered_reader.flush(); try index_entries.put(allocator, oid, .{ .offset = entry_offset, - .crc32 = entry_crc32.final(), + .crc32 = entry_crc32_writer.hasher.final(), }); }, inline .ofs_delta, .ref_delta => |delta| { var entry_decompress_stream = std.compress.zlib.decompressor(pack_reader); - var entry_data_size: usize = 0; - - while (try entry_decompress_stream.next()) |decompressed_data| { - entry_data_size += decompressed_data.len; - - const compressed_bytes = pack_accumulating_reader.readDataExcept(entry_decompress_stream.unreadBytes()); - pack_position += compressed_bytes.len; - pack_hash.update(compressed_bytes); - entry_crc32.update(compressed_bytes); - pack_accumulating_reader.discard(compressed_bytes.len); - } - const footer_bytes = pack_accumulating_reader.readDataExcept(entry_decompress_stream.unreadBytes()); - pack_position += footer_bytes.len; - pack_hash.update(footer_bytes); - entry_crc32.update(footer_bytes); - pack_accumulating_reader.discard(footer_bytes.len); - pack_accumulating_reader.rewind(); - - if (entry_data_size != delta.uncompressed_length) { + var entry_counting_reader = std.io.countingReader(entry_decompress_stream.reader()); + var fifo = std.fifo.LinearFifo(u8, .{ .Static = 4096 }).init(); + try fifo.pump(entry_counting_reader.reader(), std.io.null_writer); + if (entry_counting_reader.bytes_read != delta.uncompressed_length) { return error.InvalidObject; } - + pack_buffered_reader.putBack(entry_decompress_stream.unreadBytes()); + try pack_buffered_reader.flush(); try pending_deltas.append(allocator, .{ .offset = entry_offset, - .crc32 = entry_crc32.final(), + .crc32 = 
entry_crc32_writer.hasher.final(), }); }, } } - const pack_checksum = pack_hash.finalResult(); + const pack_checksum = pack_hashed_writer.hasher.finalResult(); const recorded_checksum = try pack_reader.readBytesNoEof(Sha1.digest_length); if (!mem.eql(u8, &pack_checksum, &recorded_checksum)) { return error.CorruptedPack; From a5326c5ef81885821d39f843bbe83c6a3aaff170 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Igor=20Anic=CC=81?= Date: Thu, 22 Feb 2024 17:20:41 +0100 Subject: [PATCH 5/5] return few previous fixes Review: https://github.com/ziglang/zig/pull/19032#pullrequestreview-1896251841 --- src/Package/Fetch/git.zig | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/Package/Fetch/git.zig b/src/Package/Fetch/git.zig index dd6f63d177..ee8f1ba543 100644 --- a/src/Package/Fetch/git.zig +++ b/src/Package/Fetch/git.zig @@ -1091,7 +1091,7 @@ pub fn indexPack(allocator: Allocator, pack: std.fs.File, index_writer: anytype) try index_writer.writeAll(&index_checksum); } -// Performs the first pass over the packfile data for index construction. +/// Performs the first pass over the packfile data for index construction. /// This will index all non-delta objects, queue delta objects for further /// processing, and return the pack checksum (which is part of the index /// format). 
@@ -1117,14 +1117,14 @@ fn indexPackFirstPass( const entry_header = try EntryHeader.read(pack_reader); switch (entry_header) { - inline .commit, .tree, .blob, .tag => |object, tag| { + .commit, .tree, .blob, .tag => |object| { var entry_decompress_stream = std.compress.zlib.decompressor(pack_reader); var entry_counting_reader = std.io.countingReader(entry_decompress_stream.reader()); var entry_hashed_writer = hashedWriter(std.io.null_writer, Sha1.init(.{})); const entry_writer = entry_hashed_writer.writer(); // The object header is not included in the pack data but is // part of the object's ID - try entry_writer.print("{s} {}\x00", .{ @tagName(tag), object.uncompressed_length }); + try entry_writer.print("{s} {}\x00", .{ @tagName(entry_header), object.uncompressed_length }); var fifo = std.fifo.LinearFifo(u8, .{ .Static = 4096 }).init(); try fifo.pump(entry_counting_reader.reader(), entry_writer); if (entry_counting_reader.bytes_read != object.uncompressed_length) {