mirror of
https://github.com/ziglang/zig.git
synced 2025-12-06 06:13:07 +00:00
Compare commits
18 Commits
d3e20e71be
...
21f9f378f1
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
21f9f378f1 | ||
|
|
2ea55d7153 | ||
|
|
d828115dab | ||
|
|
7096e66ca9 | ||
|
|
eb038ffbc1 | ||
|
|
b052afd24b | ||
|
|
cf744aa182 | ||
|
|
13b537d77c | ||
|
|
aae85a4130 | ||
|
|
b4ec78906c | ||
|
|
69f9395b38 | ||
|
|
ff883dd6ce | ||
|
|
8eaebf5939 | ||
|
|
d54fbc0123 | ||
|
|
bb3b5d09cc | ||
|
|
a892e09435 | ||
|
|
af7dec94c6 | ||
|
|
153521279f |
@ -52,7 +52,7 @@ jobs:
|
||||
run: sh ci/loongarch64-linux-release.sh
|
||||
timeout-minutes: 180
|
||||
riscv64-linux-debug:
|
||||
# if: github.event_name != 'pull_request'
|
||||
if: github.event_name != 'pull_request'
|
||||
runs-on: [self-hosted, riscv64-linux]
|
||||
steps:
|
||||
- name: Checkout
|
||||
@ -63,7 +63,7 @@ jobs:
|
||||
run: sh ci/riscv64-linux-debug.sh
|
||||
timeout-minutes: 540
|
||||
riscv64-linux-release:
|
||||
# if: github.event_name != 'pull_request'
|
||||
if: github.event_name != 'pull_request'
|
||||
runs-on: [self-hosted, riscv64-linux]
|
||||
steps:
|
||||
- name: Checkout
|
||||
|
||||
@ -421,8 +421,7 @@
|
||||
{#code|unattached_doc-comment.zig#}
|
||||
|
||||
<p>
|
||||
Doc comments can be interleaved with normal comments. Currently, when producing
|
||||
the package documentation, normal comments are merged with doc comments.
|
||||
Doc comments can be interleaved with normal comments, which are ignored.
|
||||
</p>
|
||||
{#header_close#}
|
||||
{#header_open|Top-Level Doc Comments#}
|
||||
|
||||
@ -4,6 +4,11 @@ const std = @import("std");
|
||||
const os = std.os;
|
||||
const assert = std.debug.assert;
|
||||
|
||||
// Custom error set definition:
|
||||
const ExampleErrorSet = error{
|
||||
ExampleErrorVariant,
|
||||
};
|
||||
|
||||
pub fn main() void {
|
||||
// integers
|
||||
const one_plus_one: i32 = 1 + 1;
|
||||
@ -36,7 +41,7 @@ pub fn main() void {
|
||||
});
|
||||
|
||||
// error union
|
||||
var number_or_error: anyerror!i32 = error.ArgNotFound;
|
||||
var number_or_error: ExampleErrorSet!i32 = ExampleErrorSet.ExampleErrorVariant;
|
||||
|
||||
print("\nerror union 1\ntype: {}\nvalue: {!}\n", .{
|
||||
@TypeOf(number_or_error),
|
||||
|
||||
@ -580,6 +580,9 @@ pub const VTable = struct {
|
||||
/// If it returns `null` it means `result` has been already populated and
|
||||
/// `await` will be a no-op.
|
||||
///
|
||||
/// When this function returns non-null, the implementation guarantees that
|
||||
/// a unit of concurrency has been assigned to the returned task.
|
||||
///
|
||||
/// Thread-safe.
|
||||
async: *const fn (
|
||||
/// Corresponds to `Io.userdata`.
|
||||
@ -1024,6 +1027,10 @@ pub const Group = struct {
|
||||
///
|
||||
/// `function` *may* be called immediately, before `async` returns.
|
||||
///
|
||||
/// When this function returns, it is guaranteed that `function` has
|
||||
/// already been called and completed, or it has successfully been assigned
|
||||
/// a unit of concurrency.
|
||||
///
|
||||
/// After this is called, `wait` or `cancel` must be called before the
|
||||
/// group is deinitialized.
|
||||
///
|
||||
@ -1094,6 +1101,10 @@ pub fn Select(comptime U: type) type {
|
||||
///
|
||||
/// `function` *may* be called immediately, before `async` returns.
|
||||
///
|
||||
/// When this function returns, it is guaranteed that `function` has
|
||||
/// already been called and completed, or it has successfully been
|
||||
/// assigned a unit of concurrency.
|
||||
///
|
||||
/// After this is called, `wait` or `cancel` must be called before the
|
||||
/// select is deinitialized.
|
||||
///
|
||||
@ -1524,8 +1535,11 @@ pub fn Queue(Elem: type) type {
|
||||
/// not guaranteed to be available until `await` is called.
|
||||
///
|
||||
/// `function` *may* be called immediately, before `async` returns. This has
|
||||
/// weaker guarantees than `concurrent`, making more portable and
|
||||
/// reusable.
|
||||
/// weaker guarantees than `concurrent`, making more portable and reusable.
|
||||
///
|
||||
/// When this function returns, it is guaranteed that `function` has already
|
||||
/// been called and completed, or it has successfully been assigned a unit of
|
||||
/// concurrency.
|
||||
///
|
||||
/// See also:
|
||||
/// * `Group`
|
||||
|
||||
@ -200,11 +200,17 @@ pub fn defaultDiscard(r: *Reader, limit: Limit) Error!usize {
|
||||
r.seek = 0;
|
||||
r.end = 0;
|
||||
var d: Writer.Discarding = .init(r.buffer);
|
||||
const n = r.stream(&d.writer, limit) catch |err| switch (err) {
|
||||
var n = r.stream(&d.writer, limit) catch |err| switch (err) {
|
||||
error.WriteFailed => unreachable,
|
||||
error.ReadFailed => return error.ReadFailed,
|
||||
error.EndOfStream => return error.EndOfStream,
|
||||
};
|
||||
// If `stream` wrote to `r.buffer` without going through the writer,
|
||||
// we need to discard as much of the buffered data as possible.
|
||||
const remaining = @intFromEnum(limit) - n;
|
||||
const buffered_n_to_discard = @min(remaining, r.end - r.seek);
|
||||
n += buffered_n_to_discard;
|
||||
r.seek += buffered_n_to_discard;
|
||||
assert(n <= @intFromEnum(limit));
|
||||
return n;
|
||||
}
|
||||
@ -1720,6 +1726,18 @@ fn failingDiscard(r: *Reader, limit: Limit) Error!usize {
|
||||
return error.ReadFailed;
|
||||
}
|
||||
|
||||
test "discardAll that has to call discard multiple times on an indirect reader" {
|
||||
var fr: Reader = .fixed("ABCDEFGHIJKLMNOPQRSTUVWXYZ");
|
||||
var indirect_buffer: [3]u8 = undefined;
|
||||
var tri: std.testing.ReaderIndirect = .init(&fr, &indirect_buffer);
|
||||
const r = &tri.interface;
|
||||
|
||||
try r.discardAll(10);
|
||||
var remaining_buf: [16]u8 = undefined;
|
||||
try r.readSliceAll(&remaining_buf);
|
||||
try std.testing.expectEqualStrings(fr.buffer[10..], remaining_buf[0..]);
|
||||
}
|
||||
|
||||
test "readAlloc when the backing reader provides one byte at a time" {
|
||||
const str = "This is a test";
|
||||
var tiny_buffer: [1]u8 = undefined;
|
||||
|
||||
@ -13,6 +13,7 @@ const net = std.Io.net;
|
||||
const HostName = std.Io.net.HostName;
|
||||
const IpAddress = std.Io.net.IpAddress;
|
||||
const Allocator = std.mem.Allocator;
|
||||
const Alignment = std.mem.Alignment;
|
||||
const assert = std.debug.assert;
|
||||
const posix = std.posix;
|
||||
|
||||
@ -22,10 +23,30 @@ mutex: std.Thread.Mutex = .{},
|
||||
cond: std.Thread.Condition = .{},
|
||||
run_queue: std.SinglyLinkedList = .{},
|
||||
join_requested: bool = false,
|
||||
threads: std.ArrayList(std.Thread),
|
||||
stack_size: usize,
|
||||
cpu_count: std.Thread.CpuCountError!usize,
|
||||
concurrent_count: usize,
|
||||
/// All threads are spawned detached; this is how we wait until they all exit.
|
||||
wait_group: std.Thread.WaitGroup = .{},
|
||||
/// Maximum thread pool size (excluding main thread) when dispatching async
|
||||
/// tasks. Until this limit, calls to `Io.async` when all threads are busy will
|
||||
/// cause a new thread to be spawned and permanently added to the pool. After
|
||||
/// this limit, calls to `Io.async` when all threads are busy run the task
|
||||
/// immediately.
|
||||
///
|
||||
/// Defaults to a number equal to logical CPU cores.
|
||||
async_limit: Io.Limit,
|
||||
/// Maximum thread pool size (excluding main thread) for dispatching concurrent
|
||||
/// tasks. Until this limit, calls to `Io.concurrent` will increase the thread
|
||||
/// pool size.
|
||||
///
|
||||
/// concurrent tasks. After this number, calls to `Io.concurrent` return
|
||||
/// `error.ConcurrencyUnavailable`.
|
||||
concurrent_limit: Io.Limit = .unlimited,
|
||||
/// Error from calling `std.Thread.getCpuCount` in `init`.
|
||||
cpu_count_error: ?std.Thread.CpuCountError,
|
||||
/// Number of threads that are unavailable to take tasks. To calculate
|
||||
/// available count, subtract this from either `async_limit` or
|
||||
/// `concurrent_limit`.
|
||||
busy_count: usize = 0,
|
||||
|
||||
wsa: if (is_windows) Wsa else struct {} = .{},
|
||||
|
||||
@ -70,8 +91,6 @@ const Closure = struct {
|
||||
start: Start,
|
||||
node: std.SinglyLinkedList.Node = .{},
|
||||
cancel_tid: CancelId,
|
||||
/// Whether this task bumps minimum number of threads in the pool.
|
||||
is_concurrent: bool,
|
||||
|
||||
const Start = *const fn (*Closure) void;
|
||||
|
||||
@ -90,8 +109,6 @@ const Closure = struct {
|
||||
}
|
||||
};
|
||||
|
||||
pub const InitError = std.Thread.CpuCountError || Allocator.Error;
|
||||
|
||||
/// Related:
|
||||
/// * `init_single_threaded`
|
||||
pub fn init(
|
||||
@ -103,21 +120,20 @@ pub fn init(
|
||||
/// here.
|
||||
gpa: Allocator,
|
||||
) Threaded {
|
||||
if (builtin.single_threaded) return .init_single_threaded;
|
||||
|
||||
const cpu_count = std.Thread.getCpuCount();
|
||||
|
||||
var t: Threaded = .{
|
||||
.allocator = gpa,
|
||||
.threads = .empty,
|
||||
.stack_size = std.Thread.SpawnConfig.default_stack_size,
|
||||
.cpu_count = std.Thread.getCpuCount(),
|
||||
.concurrent_count = 0,
|
||||
.async_limit = if (cpu_count) |n| .limited(n - 1) else |_| .nothing,
|
||||
.cpu_count_error = if (cpu_count) |_| null else |e| e,
|
||||
.old_sig_io = undefined,
|
||||
.old_sig_pipe = undefined,
|
||||
.have_signal_handler = false,
|
||||
};
|
||||
|
||||
if (t.cpu_count) |n| {
|
||||
t.threads.ensureTotalCapacityPrecise(gpa, n - 1) catch {};
|
||||
} else |_| {}
|
||||
|
||||
if (posix.Sigaction != void) {
|
||||
// This causes sending `posix.SIG.IO` to thread to interrupt blocking
|
||||
// syscalls, returning `posix.E.INTR`.
|
||||
@ -142,19 +158,17 @@ pub fn init(
|
||||
/// * `deinit` is safe, but unnecessary to call.
|
||||
pub const init_single_threaded: Threaded = .{
|
||||
.allocator = .failing,
|
||||
.threads = .empty,
|
||||
.stack_size = std.Thread.SpawnConfig.default_stack_size,
|
||||
.cpu_count = 1,
|
||||
.concurrent_count = 0,
|
||||
.async_limit = .nothing,
|
||||
.cpu_count_error = null,
|
||||
.concurrent_limit = .nothing,
|
||||
.old_sig_io = undefined,
|
||||
.old_sig_pipe = undefined,
|
||||
.have_signal_handler = false,
|
||||
};
|
||||
|
||||
pub fn deinit(t: *Threaded) void {
|
||||
const gpa = t.allocator;
|
||||
t.join();
|
||||
t.threads.deinit(gpa);
|
||||
if (is_windows and t.wsa.status == .initialized) {
|
||||
if (ws2_32.WSACleanup() != 0) recoverableOsBugDetected();
|
||||
}
|
||||
@ -173,10 +187,12 @@ fn join(t: *Threaded) void {
|
||||
t.join_requested = true;
|
||||
}
|
||||
t.cond.broadcast();
|
||||
for (t.threads.items) |thread| thread.join();
|
||||
t.wait_group.wait();
|
||||
}
|
||||
|
||||
fn worker(t: *Threaded) void {
|
||||
defer t.wait_group.finish();
|
||||
|
||||
t.mutex.lock();
|
||||
defer t.mutex.unlock();
|
||||
|
||||
@ -184,12 +200,9 @@ fn worker(t: *Threaded) void {
|
||||
while (t.run_queue.popFirst()) |closure_node| {
|
||||
t.mutex.unlock();
|
||||
const closure: *Closure = @fieldParentPtr("node", closure_node);
|
||||
const is_concurrent = closure.is_concurrent;
|
||||
closure.start(closure);
|
||||
t.mutex.lock();
|
||||
if (is_concurrent) {
|
||||
t.concurrent_count -= 1;
|
||||
}
|
||||
t.busy_count -= 1;
|
||||
}
|
||||
if (t.join_requested) break;
|
||||
t.cond.wait(&t.mutex);
|
||||
@ -387,7 +400,7 @@ const AsyncClosure = struct {
|
||||
func: *const fn (context: *anyopaque, result: *anyopaque) void,
|
||||
reset_event: ResetEvent,
|
||||
select_condition: ?*ResetEvent,
|
||||
context_alignment: std.mem.Alignment,
|
||||
context_alignment: Alignment,
|
||||
result_offset: usize,
|
||||
alloc_len: usize,
|
||||
|
||||
@ -432,11 +445,10 @@ const AsyncClosure = struct {
|
||||
|
||||
fn init(
|
||||
gpa: Allocator,
|
||||
mode: enum { async, concurrent },
|
||||
result_len: usize,
|
||||
result_alignment: std.mem.Alignment,
|
||||
result_alignment: Alignment,
|
||||
context: []const u8,
|
||||
context_alignment: std.mem.Alignment,
|
||||
context_alignment: Alignment,
|
||||
func: *const fn (context: *const anyopaque, result: *anyopaque) void,
|
||||
) Allocator.Error!*AsyncClosure {
|
||||
const max_context_misalignment = context_alignment.toByteUnits() -| @alignOf(AsyncClosure);
|
||||
@ -454,10 +466,6 @@ const AsyncClosure = struct {
|
||||
.closure = .{
|
||||
.cancel_tid = .none,
|
||||
.start = start,
|
||||
.is_concurrent = switch (mode) {
|
||||
.async => false,
|
||||
.concurrent => true,
|
||||
},
|
||||
},
|
||||
.func = func,
|
||||
.context_alignment = context_alignment,
|
||||
@ -470,10 +478,15 @@ const AsyncClosure = struct {
|
||||
return ac;
|
||||
}
|
||||
|
||||
fn waitAndDeinit(ac: *AsyncClosure, gpa: Allocator, result: []u8) void {
|
||||
fn waitAndDeinit(ac: *AsyncClosure, t: *Threaded, result: []u8) void {
|
||||
ac.reset_event.wait(t) catch |err| switch (err) {
|
||||
error.Canceled => {
|
||||
ac.closure.requestCancel();
|
||||
ac.reset_event.waitUncancelable();
|
||||
},
|
||||
};
|
||||
@memcpy(result, ac.resultPointer()[0..result.len]);
|
||||
ac.deinit(gpa);
|
||||
ac.deinit(t.allocator);
|
||||
}
|
||||
|
||||
fn deinit(ac: *AsyncClosure, gpa: Allocator) void {
|
||||
@ -485,60 +498,50 @@ const AsyncClosure = struct {
|
||||
fn async(
|
||||
userdata: ?*anyopaque,
|
||||
result: []u8,
|
||||
result_alignment: std.mem.Alignment,
|
||||
result_alignment: Alignment,
|
||||
context: []const u8,
|
||||
context_alignment: std.mem.Alignment,
|
||||
context_alignment: Alignment,
|
||||
start: *const fn (context: *const anyopaque, result: *anyopaque) void,
|
||||
) ?*Io.AnyFuture {
|
||||
if (builtin.single_threaded) {
|
||||
const t: *Threaded = @ptrCast(@alignCast(userdata));
|
||||
if (builtin.single_threaded or t.async_limit == .nothing) {
|
||||
start(context.ptr, result.ptr);
|
||||
return null;
|
||||
}
|
||||
|
||||
const t: *Threaded = @ptrCast(@alignCast(userdata));
|
||||
const cpu_count = t.cpu_count catch {
|
||||
return concurrent(userdata, result.len, result_alignment, context, context_alignment, start) catch {
|
||||
start(context.ptr, result.ptr);
|
||||
return null;
|
||||
};
|
||||
};
|
||||
|
||||
const gpa = t.allocator;
|
||||
const ac = AsyncClosure.init(gpa, .async, result.len, result_alignment, context, context_alignment, start) catch {
|
||||
const ac = AsyncClosure.init(gpa, result.len, result_alignment, context, context_alignment, start) catch {
|
||||
start(context.ptr, result.ptr);
|
||||
return null;
|
||||
};
|
||||
|
||||
t.mutex.lock();
|
||||
|
||||
const thread_capacity = cpu_count - 1 + t.concurrent_count;
|
||||
const busy_count = t.busy_count;
|
||||
|
||||
t.threads.ensureTotalCapacityPrecise(gpa, thread_capacity) catch {
|
||||
if (busy_count >= @intFromEnum(t.async_limit)) {
|
||||
t.mutex.unlock();
|
||||
ac.deinit(gpa);
|
||||
start(context.ptr, result.ptr);
|
||||
return null;
|
||||
}
|
||||
|
||||
t.busy_count = busy_count + 1;
|
||||
|
||||
const pool_size = t.wait_group.value();
|
||||
if (pool_size - busy_count == 0) {
|
||||
t.wait_group.start();
|
||||
const thread = std.Thread.spawn(.{ .stack_size = t.stack_size }, worker, .{t}) catch {
|
||||
t.wait_group.finish();
|
||||
t.busy_count = busy_count;
|
||||
t.mutex.unlock();
|
||||
ac.deinit(gpa);
|
||||
start(context.ptr, result.ptr);
|
||||
return null;
|
||||
};
|
||||
thread.detach();
|
||||
}
|
||||
|
||||
t.run_queue.prepend(&ac.closure.node);
|
||||
|
||||
if (t.threads.items.len < thread_capacity) {
|
||||
const thread = std.Thread.spawn(.{ .stack_size = t.stack_size }, worker, .{t}) catch {
|
||||
if (t.threads.items.len == 0) {
|
||||
assert(t.run_queue.popFirst() == &ac.closure.node);
|
||||
t.mutex.unlock();
|
||||
ac.deinit(gpa);
|
||||
start(context.ptr, result.ptr);
|
||||
return null;
|
||||
}
|
||||
// Rely on other workers to do it.
|
||||
t.mutex.unlock();
|
||||
t.cond.signal();
|
||||
return @ptrCast(ac);
|
||||
};
|
||||
t.threads.appendAssumeCapacity(thread);
|
||||
}
|
||||
|
||||
t.mutex.unlock();
|
||||
t.cond.signal();
|
||||
return @ptrCast(ac);
|
||||
@ -547,45 +550,42 @@ fn async(
|
||||
fn concurrent(
|
||||
userdata: ?*anyopaque,
|
||||
result_len: usize,
|
||||
result_alignment: std.mem.Alignment,
|
||||
result_alignment: Alignment,
|
||||
context: []const u8,
|
||||
context_alignment: std.mem.Alignment,
|
||||
context_alignment: Alignment,
|
||||
start: *const fn (context: *const anyopaque, result: *anyopaque) void,
|
||||
) Io.ConcurrentError!*Io.AnyFuture {
|
||||
if (builtin.single_threaded) return error.ConcurrencyUnavailable;
|
||||
|
||||
const t: *Threaded = @ptrCast(@alignCast(userdata));
|
||||
const cpu_count = t.cpu_count catch 1;
|
||||
|
||||
const gpa = t.allocator;
|
||||
const ac = AsyncClosure.init(gpa, .concurrent, result_len, result_alignment, context, context_alignment, start) catch {
|
||||
const ac = AsyncClosure.init(gpa, result_len, result_alignment, context, context_alignment, start) catch
|
||||
return error.ConcurrencyUnavailable;
|
||||
};
|
||||
errdefer ac.deinit(gpa);
|
||||
|
||||
t.mutex.lock();
|
||||
defer t.mutex.unlock();
|
||||
|
||||
t.concurrent_count += 1;
|
||||
const thread_capacity = cpu_count - 1 + t.concurrent_count;
|
||||
const busy_count = t.busy_count;
|
||||
|
||||
t.threads.ensureTotalCapacity(gpa, thread_capacity) catch {
|
||||
t.mutex.unlock();
|
||||
ac.deinit(gpa);
|
||||
if (busy_count >= @intFromEnum(t.concurrent_limit))
|
||||
return error.ConcurrencyUnavailable;
|
||||
};
|
||||
|
||||
t.run_queue.prepend(&ac.closure.node);
|
||||
t.busy_count = busy_count + 1;
|
||||
errdefer t.busy_count = busy_count;
|
||||
|
||||
if (t.threads.items.len < thread_capacity) {
|
||||
const thread = std.Thread.spawn(.{ .stack_size = t.stack_size }, worker, .{t}) catch {
|
||||
assert(t.run_queue.popFirst() == &ac.closure.node);
|
||||
t.mutex.unlock();
|
||||
ac.deinit(gpa);
|
||||
const pool_size = t.wait_group.value();
|
||||
if (pool_size - busy_count == 0) {
|
||||
t.wait_group.start();
|
||||
errdefer t.wait_group.finish();
|
||||
|
||||
const thread = std.Thread.spawn(.{ .stack_size = t.stack_size }, worker, .{t}) catch
|
||||
return error.ConcurrencyUnavailable;
|
||||
};
|
||||
t.threads.appendAssumeCapacity(thread);
|
||||
thread.detach();
|
||||
}
|
||||
|
||||
t.mutex.unlock();
|
||||
t.run_queue.prepend(&ac.closure.node);
|
||||
t.cond.signal();
|
||||
return @ptrCast(ac);
|
||||
}
|
||||
@ -597,7 +597,7 @@ const GroupClosure = struct {
|
||||
/// Points to sibling `GroupClosure`. Used for walking the group to cancel all.
|
||||
node: std.SinglyLinkedList.Node,
|
||||
func: *const fn (*Io.Group, context: *anyopaque) void,
|
||||
context_alignment: std.mem.Alignment,
|
||||
context_alignment: Alignment,
|
||||
alloc_len: usize,
|
||||
|
||||
fn start(closure: *Closure) void {
|
||||
@ -638,7 +638,7 @@ const GroupClosure = struct {
|
||||
t: *Threaded,
|
||||
group: *Io.Group,
|
||||
context: []const u8,
|
||||
context_alignment: std.mem.Alignment,
|
||||
context_alignment: Alignment,
|
||||
func: *const fn (*Io.Group, context: *const anyopaque) void,
|
||||
) Allocator.Error!*GroupClosure {
|
||||
const max_context_misalignment = context_alignment.toByteUnits() -| @alignOf(GroupClosure);
|
||||
@ -652,7 +652,6 @@ const GroupClosure = struct {
|
||||
.closure = .{
|
||||
.cancel_tid = .none,
|
||||
.start = start,
|
||||
.is_concurrent = false,
|
||||
},
|
||||
.t = t,
|
||||
.group = group,
|
||||
@ -678,45 +677,48 @@ fn groupAsync(
|
||||
userdata: ?*anyopaque,
|
||||
group: *Io.Group,
|
||||
context: []const u8,
|
||||
context_alignment: std.mem.Alignment,
|
||||
context_alignment: Alignment,
|
||||
start: *const fn (*Io.Group, context: *const anyopaque) void,
|
||||
) void {
|
||||
if (builtin.single_threaded) return start(group, context.ptr);
|
||||
|
||||
const t: *Threaded = @ptrCast(@alignCast(userdata));
|
||||
const cpu_count = t.cpu_count catch 1;
|
||||
if (builtin.single_threaded or t.async_limit == .nothing)
|
||||
return start(group, context.ptr);
|
||||
|
||||
const gpa = t.allocator;
|
||||
const gc = GroupClosure.init(gpa, t, group, context, context_alignment, start) catch {
|
||||
const gc = GroupClosure.init(gpa, t, group, context, context_alignment, start) catch
|
||||
return start(group, context.ptr);
|
||||
};
|
||||
|
||||
t.mutex.lock();
|
||||
|
||||
const busy_count = t.busy_count;
|
||||
|
||||
if (busy_count >= @intFromEnum(t.async_limit)) {
|
||||
t.mutex.unlock();
|
||||
gc.deinit(gpa);
|
||||
return start(group, context.ptr);
|
||||
}
|
||||
|
||||
t.busy_count = busy_count + 1;
|
||||
|
||||
const pool_size = t.wait_group.value();
|
||||
if (pool_size - busy_count == 0) {
|
||||
t.wait_group.start();
|
||||
const thread = std.Thread.spawn(.{ .stack_size = t.stack_size }, worker, .{t}) catch {
|
||||
t.wait_group.finish();
|
||||
t.busy_count = busy_count;
|
||||
t.mutex.unlock();
|
||||
gc.deinit(gpa);
|
||||
return start(group, context.ptr);
|
||||
};
|
||||
thread.detach();
|
||||
}
|
||||
|
||||
// Append to the group linked list inside the mutex to make `Io.Group.async` thread-safe.
|
||||
gc.node = .{ .next = @ptrCast(@alignCast(group.token)) };
|
||||
group.token = &gc.node;
|
||||
|
||||
const thread_capacity = cpu_count - 1 + t.concurrent_count;
|
||||
|
||||
t.threads.ensureTotalCapacityPrecise(gpa, thread_capacity) catch {
|
||||
t.mutex.unlock();
|
||||
gc.deinit(gpa);
|
||||
return start(group, context.ptr);
|
||||
};
|
||||
|
||||
t.run_queue.prepend(&gc.closure.node);
|
||||
|
||||
if (t.threads.items.len < thread_capacity) {
|
||||
const thread = std.Thread.spawn(.{ .stack_size = t.stack_size }, worker, .{t}) catch {
|
||||
assert(t.run_queue.popFirst() == &gc.closure.node);
|
||||
t.mutex.unlock();
|
||||
gc.deinit(gpa);
|
||||
return start(group, context.ptr);
|
||||
};
|
||||
t.threads.appendAssumeCapacity(thread);
|
||||
}
|
||||
|
||||
// This needs to be done before unlocking the mutex to avoid a race with
|
||||
// the associated task finishing.
|
||||
const group_state: *std.atomic.Value(usize) = @ptrCast(&group.state);
|
||||
@ -794,25 +796,25 @@ fn await(
|
||||
userdata: ?*anyopaque,
|
||||
any_future: *Io.AnyFuture,
|
||||
result: []u8,
|
||||
result_alignment: std.mem.Alignment,
|
||||
result_alignment: Alignment,
|
||||
) void {
|
||||
_ = result_alignment;
|
||||
const t: *Threaded = @ptrCast(@alignCast(userdata));
|
||||
const closure: *AsyncClosure = @ptrCast(@alignCast(any_future));
|
||||
closure.waitAndDeinit(t.allocator, result);
|
||||
closure.waitAndDeinit(t, result);
|
||||
}
|
||||
|
||||
fn cancel(
|
||||
userdata: ?*anyopaque,
|
||||
any_future: *Io.AnyFuture,
|
||||
result: []u8,
|
||||
result_alignment: std.mem.Alignment,
|
||||
result_alignment: Alignment,
|
||||
) void {
|
||||
_ = result_alignment;
|
||||
const t: *Threaded = @ptrCast(@alignCast(userdata));
|
||||
const ac: *AsyncClosure = @ptrCast(@alignCast(any_future));
|
||||
ac.closure.requestCancel();
|
||||
ac.waitAndDeinit(t.allocator, result);
|
||||
ac.waitAndDeinit(t, result);
|
||||
}
|
||||
|
||||
fn cancelRequested(userdata: ?*anyopaque) bool {
|
||||
|
||||
@ -10,7 +10,7 @@ test "concurrent vs main prevents deadlock via oversubscription" {
|
||||
defer threaded.deinit();
|
||||
const io = threaded.io();
|
||||
|
||||
threaded.cpu_count = 1;
|
||||
threaded.async_limit = .nothing;
|
||||
|
||||
var queue: Io.Queue(u8) = .init(&.{});
|
||||
|
||||
@ -38,7 +38,7 @@ test "concurrent vs concurrent prevents deadlock via oversubscription" {
|
||||
defer threaded.deinit();
|
||||
const io = threaded.io();
|
||||
|
||||
threaded.cpu_count = 1;
|
||||
threaded.async_limit = .nothing;
|
||||
|
||||
var queue: Io.Queue(u8) = .init(&.{});
|
||||
|
||||
|
||||
@ -1,13 +1,14 @@
|
||||
//! This struct represents a kernel thread, and acts as a namespace for concurrency
|
||||
//! primitives that operate on kernel threads. For concurrency primitives that support
|
||||
//! both evented I/O and async I/O, see the respective names in the top level std namespace.
|
||||
//! This struct represents a kernel thread, and acts as a namespace for
|
||||
//! concurrency primitives that operate on kernel threads. For concurrency
|
||||
//! primitives that interact with the I/O interface, see `std.Io`.
|
||||
|
||||
const std = @import("std.zig");
|
||||
const builtin = @import("builtin");
|
||||
const math = std.math;
|
||||
const assert = std.debug.assert;
|
||||
const target = builtin.target;
|
||||
const native_os = builtin.os.tag;
|
||||
|
||||
const std = @import("std.zig");
|
||||
const math = std.math;
|
||||
const assert = std.debug.assert;
|
||||
const posix = std.posix;
|
||||
const windows = std.os.windows;
|
||||
const testing = std.testing;
|
||||
|
||||
@ -60,6 +60,10 @@ pub fn isDone(wg: *WaitGroup) bool {
|
||||
return (state / one_pending) == 0;
|
||||
}
|
||||
|
||||
pub fn value(wg: *WaitGroup) usize {
|
||||
return wg.state.load(.monotonic) / one_pending;
|
||||
}
|
||||
|
||||
// Spawns a new thread for the task. This is appropriate when the callee
|
||||
// delegates all work.
|
||||
pub fn spawnManager(
|
||||
|
||||
@ -30,7 +30,6 @@ const hashes = [_]Crypto{
|
||||
Crypto{ .ty = crypto.hash.sha3.Shake256, .name = "shake-256" },
|
||||
Crypto{ .ty = crypto.hash.sha3.TurboShake128(null), .name = "turboshake-128" },
|
||||
Crypto{ .ty = crypto.hash.sha3.TurboShake256(null), .name = "turboshake-256" },
|
||||
Crypto{ .ty = crypto.hash.sha3.KT128, .name = "kt128" },
|
||||
Crypto{ .ty = crypto.hash.blake2.Blake2s256, .name = "blake2s" },
|
||||
Crypto{ .ty = crypto.hash.blake2.Blake2b512, .name = "blake2b" },
|
||||
Crypto{ .ty = crypto.hash.Blake3, .name = "blake3" },
|
||||
@ -38,7 +37,6 @@ const hashes = [_]Crypto{
|
||||
|
||||
const parallel_hashes = [_]Crypto{
|
||||
Crypto{ .ty = crypto.hash.Blake3, .name = "blake3-parallel" },
|
||||
Crypto{ .ty = crypto.hash.sha3.KT128, .name = "kt128-parallel" },
|
||||
};
|
||||
|
||||
const block_size: usize = 8 * 8192;
|
||||
|
||||
@ -12,8 +12,8 @@ const Vec16 = @Vector(16, u32);
|
||||
const chunk_length = 1024;
|
||||
const max_depth = 54;
|
||||
|
||||
const simd_degree = std.simd.suggestVectorLength(u32) orelse 1;
|
||||
const max_simd_degree = simd_degree;
|
||||
pub const simd_degree = std.simd.suggestVectorLength(u32) orelse 1;
|
||||
pub const max_simd_degree = simd_degree;
|
||||
const max_simd_degree_or_2 = if (max_simd_degree > 2) max_simd_degree else 2;
|
||||
|
||||
/// Threshold for switching to parallel processing.
|
||||
@ -502,7 +502,9 @@ fn hashManySimd(
|
||||
var out_ptr = out.ptr;
|
||||
var cnt = counter;
|
||||
|
||||
if (simd_degree >= 16) {
|
||||
const simd_deg = comptime simd_degree;
|
||||
|
||||
if (comptime simd_deg >= 16) {
|
||||
while (remaining >= 16) {
|
||||
const sixteen_inputs = [16][*]const u8{
|
||||
inp[0], inp[1], inp[2], inp[3],
|
||||
@ -523,7 +525,7 @@ fn hashManySimd(
|
||||
}
|
||||
}
|
||||
|
||||
if (simd_degree >= 8) {
|
||||
if (comptime simd_deg >= 8) {
|
||||
while (remaining >= 8) {
|
||||
const eight_inputs = [8][*]const u8{
|
||||
inp[0], inp[1], inp[2], inp[3],
|
||||
@ -542,7 +544,7 @@ fn hashManySimd(
|
||||
}
|
||||
}
|
||||
|
||||
if (simd_degree >= 4) {
|
||||
if (comptime simd_deg >= 4) {
|
||||
while (remaining >= 4) {
|
||||
const four_inputs = [4][*]const u8{
|
||||
inp[0],
|
||||
@ -569,7 +571,7 @@ fn hashManySimd(
|
||||
}
|
||||
|
||||
fn hashMany(inputs: [][*]const u8, num_inputs: usize, blocks: usize, key: [8]u32, counter: u64, increment_counter: bool, flags: Flags, flags_start: Flags, flags_end: Flags, out: []u8) void {
|
||||
if (max_simd_degree >= 4) {
|
||||
if (comptime max_simd_degree >= 4) {
|
||||
hashManySimd(inputs, num_inputs, blocks, key, counter, increment_counter, flags, flags_start, flags_end, out);
|
||||
} else {
|
||||
hashManyPortable(inputs, num_inputs, blocks, key, counter, increment_counter, flags, flags_start, flags_end, out);
|
||||
@ -907,7 +909,7 @@ pub const Blake3 = struct {
|
||||
pub const digest_length = 32;
|
||||
pub const key_length = 32;
|
||||
|
||||
pub const Options = struct { key: ?[key_length]u8 = null };
|
||||
pub const Options = struct { key: ?[digest_length]u8 = null };
|
||||
pub const KdfOptions = struct {};
|
||||
|
||||
key: [8]u32,
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@ -4,8 +4,6 @@ const assert = std.debug.assert;
|
||||
const math = std.math;
|
||||
const mem = std.mem;
|
||||
|
||||
const kangarootwelve = @import("kangarootwelve.zig");
|
||||
|
||||
const KeccakState = std.crypto.core.keccak.State;
|
||||
|
||||
pub const Sha3_224 = Keccak(1600, 224, 0x06, 24);
|
||||
@ -28,9 +26,6 @@ pub const KMac256 = KMac(256);
|
||||
pub const TupleHash128 = TupleHash(128);
|
||||
pub const TupleHash256 = TupleHash(256);
|
||||
|
||||
pub const KT128 = kangarootwelve.KT128;
|
||||
pub const KT256 = kangarootwelve.KT256;
|
||||
|
||||
/// TurboSHAKE128 is a XOF (a secure hash function with a variable output length), with a 128 bit security level.
|
||||
/// It is based on the same permutation as SHA3 and SHAKE128, but which much higher performance.
|
||||
/// The delimiter is 0x1f by default, but can be changed for context-separation.
|
||||
@ -486,10 +481,6 @@ pub const NistLengthEncoding = enum {
|
||||
|
||||
const htest = @import("test.zig");
|
||||
|
||||
test {
|
||||
_ = kangarootwelve;
|
||||
}
|
||||
|
||||
test "sha3-224 single" {
|
||||
try htest.assertEqualHash(Sha3_224, "6b4e03423667dbb73b6e15454f0eb1abd4597f9a1b078e3f5b5a6bc7", "");
|
||||
try htest.assertEqualHash(Sha3_224, "e642824c3f8cf24ad09234ee7d3c766fc9a3a5168d0c94ad73b46fdf", "abc");
|
||||
|
||||
@ -1,63 +0,0 @@
|
||||
#target=x86_64-linux-selfhosted
|
||||
#target=x86_64-windows-selfhosted
|
||||
#target=x86_64-linux-cbe
|
||||
#target=x86_64-windows-cbe
|
||||
//#target=wasm32-wasi-selfhosted
|
||||
#update=initial version
|
||||
#file=main.zig
|
||||
const std = @import("std");
|
||||
pub fn main() !void {
|
||||
try std.fs.File.stdout().writeAll(foo);
|
||||
}
|
||||
const foo = "good morning\n";
|
||||
#expect_stdout="good morning\n"
|
||||
|
||||
#update=add new declaration
|
||||
#file=main.zig
|
||||
const std = @import("std");
|
||||
pub fn main() !void {
|
||||
try std.fs.File.stdout().writeAll(foo);
|
||||
}
|
||||
const foo = "good morning\n";
|
||||
const bar = "good evening\n";
|
||||
#expect_stdout="good morning\n"
|
||||
|
||||
#update=reference new declaration
|
||||
#file=main.zig
|
||||
const std = @import("std");
|
||||
pub fn main() !void {
|
||||
try std.fs.File.stdout().writeAll(bar);
|
||||
}
|
||||
const foo = "good morning\n";
|
||||
const bar = "good evening\n";
|
||||
#expect_stdout="good evening\n"
|
||||
|
||||
#update=reference missing declaration
|
||||
#file=main.zig
|
||||
const std = @import("std");
|
||||
pub fn main() !void {
|
||||
try std.fs.File.stdout().writeAll(qux);
|
||||
}
|
||||
const foo = "good morning\n";
|
||||
const bar = "good evening\n";
|
||||
#expect_error=main.zig:3:39: error: use of undeclared identifier 'qux'
|
||||
|
||||
#update=add missing declaration
|
||||
#file=main.zig
|
||||
const std = @import("std");
|
||||
pub fn main() !void {
|
||||
try std.fs.File.stdout().writeAll(qux);
|
||||
}
|
||||
const foo = "good morning\n";
|
||||
const bar = "good evening\n";
|
||||
const qux = "good night\n";
|
||||
#expect_stdout="good night\n"
|
||||
|
||||
#update=remove unused declarations
|
||||
#file=main.zig
|
||||
const std = @import("std");
|
||||
pub fn main() !void {
|
||||
try std.fs.File.stdout().writeAll(qux);
|
||||
}
|
||||
const qux = "good night\n";
|
||||
#expect_stdout="good night\n"
|
||||
Loading…
x
Reference in New Issue
Block a user