Compare commits


18 Commits

Author SHA1 Message Date
Ryan Liptak
21f9f378f1 Reader.defaultDiscard: Fix for use with an indirect reader
If a `Reader` implementation implements `stream` by ignoring the `Writer`, writing directly to its internal buffer, and returning 0, then `defaultDiscard` would not update `seek` and would also return 0. This is incorrect and can cause `discardShort` to violate the contract of `VTable.discard` by calling into `vtable.discard` with a non-empty buffer.

This commit fixes the problem by advancing seek up to the limit after the stream call. This logic could likely be somewhat simplified in the future depending on how #25170 is resolved.
2025-11-21 22:34:55 -08:00
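
The accounting change described in this commit can be modeled in isolation. The sketch below is a hypothetical, simplified helper (not the actual `defaultDiscard` code): given how many bytes `stream` reported and how many bytes ended up sitting in the internal buffer, it computes the total that should be reported as discarded, capped by the limit.

```zig
const std = @import("std");

/// Hypothetical model of the fixed accounting: `streamed` is what `stream`
/// returned, `buffered` is how many bytes it left in the internal buffer,
/// and `limit` is the caller's discard limit.
fn discardedTotal(streamed: usize, buffered: usize, limit: usize) usize {
    // An indirect `stream` may fill the buffer and report 0; fold the
    // buffered bytes into the count, capped by the remaining limit.
    const remaining = limit - streamed;
    return streamed + @min(remaining, buffered);
}

test "buffered bytes count toward the discard total" {
    // stream reported 0 but left 3 bytes buffered; limit 10 -> 3 discarded.
    try std.testing.expectEqual(@as(usize, 3), discardedTotal(0, 3, 10));
    // limit nearly consumed: only 2 of the 5 buffered bytes may be counted.
    try std.testing.expectEqual(@as(usize, 8), discardedTotal(6, 5, 8));
}
```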
Andrew Kelley
2ea55d7153
Merge pull request #25998 from ziglang/std.Io.Threaded-async-guarantee
std.Io: guarantee when async() returns, task is already completed or has been successfully assigned a unit of concurrency
2025-11-21 20:56:29 -08:00
Andrew Kelley
d828115dab
Merge pull request #25903 from aznashwan/minor-docs-fixes
docs: minor fixes to main language reference page.
2025-11-21 19:56:15 -08:00
Andrew Kelley
7096e66ca9 std.Thread: update doc comments 2025-11-21 19:54:41 -08:00
Andrew Kelley
eb038ffbc1 std.Io.Threaded: forward cancellation requests to awaited tasks 2025-11-21 19:54:41 -08:00
Andrew Kelley
b052afd24b std.Io.Threaded: import std.mem.Alignment 2025-11-21 19:54:41 -08:00
Andrew Kelley
cf744aa182 std.Io.Threaded: slightly different semantics
While still preserving the guarantee about async() being assigned a unit
of concurrency (or immediately running the task), this change:
* retains the error from calling getCpuCount()
* spawns all threads in detached mode, using WaitGroup to join them
* treats all workers the same regardless of whether they are processing
  concurrent or async tasks. One thread pool does all the work, while
  respecting async and concurrent limits.
2025-11-21 19:54:41 -08:00
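
The detached-spawn-plus-WaitGroup pattern from the second bullet can be sketched as a standalone example (hypothetical illustration, not the actual std.Io.Threaded code):

```zig
const std = @import("std");

fn workerFn(wg: *std.Thread.WaitGroup, counter: *std.atomic.Value(usize)) void {
    defer wg.finish();
    _ = counter.fetchAdd(1, .monotonic);
}

test "join detached threads via WaitGroup" {
    var wg: std.Thread.WaitGroup = .{};
    var counter = std.atomic.Value(usize).init(0);
    for (0..4) |_| {
        wg.start();
        const t = try std.Thread.spawn(.{}, workerFn, .{ &wg, &counter });
        // No thread handle is kept; the WaitGroup is the only join mechanism.
        t.detach();
    }
    wg.wait();
    try std.testing.expectEqual(@as(usize, 4), counter.load(.monotonic));
}
```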
Andrew Kelley
13b537d77c std.Io.Threaded: remove dead code 2025-11-21 19:54:41 -08:00
Andrew Kelley
aae85a4130 std.Io.Threaded: allow calling init in single-threaded mode 2025-11-21 19:54:41 -08:00
Andrew Kelley
b4ec78906c std.Io: update async documentation to reflect the guarantee 2025-11-21 19:54:41 -08:00
Loris Cro
69f9395b38 fix logic bug in groupAsync 2025-11-21 19:54:41 -08:00
Loris Cro
ff883dd6ce fix single-threaded builds 2025-11-21 19:54:41 -08:00
Loris Cro
8eaebf5939 Io.Threaded PoC reimplementation
This is a reimplementation of Io.Threaded that fixes the issues
highlighted in the recent Zulip discussion. It's poorly tested, but it
does successfully run to completion the litmus test example that I
offered in the discussion.

This implementation has the following key design decisions:

- `t.cpu_count` is used as the threadpool size.
- `t.concurrency_limit` is used as the maximum number of
  "burst, one-shot" threads that can be spawned by `io.concurrent` past
  `t.cpu_count`.
- `t.available_thread_count` is the number of threads in the pool that
  are not currently busy with work (the bookkeeping happens in the worker
  function).
- `t.one_shot_thread_count` is the number of active threads that were
  spawned by `io.concurrent` past `t.cpu_count`.

In this implementation:

- `io.async` first tries to decrement `t.available_thread_count`. If
  there are no threads available, it tries to spawn a new one if possible,
  otherwise it runs the task immediately.
- `io.concurrent` first tries to use a thread in the pool, same as
  `io.async`, but on failure (no available threads and pool size limit
  reached) it tries to spawn a new one-shot thread. One-shot threads
  run a different main function that just executes one task, decrements
  the number of active one-shot threads, and then exits.

A relevant future improvement is to have one-shot threads stay on for a
few seconds (and potentially pick up a new task) to amortize spawning
costs.
2025-11-21 19:54:41 -08:00
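
The two dispatch paths described above can be modeled as a small state machine. This is a hypothetical sketch whose field names mirror the commit message (`cpu_count`, `concurrency_limit`, etc.), not the final std.Io.Threaded API:

```zig
const std = @import("std");

/// Hypothetical model of the PoC's dispatch decisions.
const PoolModel = struct {
    cpu_count: usize,
    concurrency_limit: usize,
    available_thread_count: usize,
    one_shot_thread_count: usize,

    const Decision = enum {
        use_pool_thread,
        spawn_pool_thread,
        spawn_one_shot,
        run_inline,
        unavailable,
    };

    /// `io.async`: take an idle pool thread if one exists, else grow the
    /// pool up to `cpu_count`, else run the task on the calling thread.
    fn dispatchAsync(p: *PoolModel, pool_size: usize) Decision {
        if (p.available_thread_count > 0) {
            p.available_thread_count -= 1;
            return .use_pool_thread;
        }
        if (pool_size < p.cpu_count) return .spawn_pool_thread;
        return .run_inline;
    }

    /// `io.concurrent`: same as async, but instead of running inline it
    /// spawns a burst one-shot thread, bounded by `concurrency_limit`.
    fn dispatchConcurrent(p: *PoolModel, pool_size: usize) Decision {
        const d = p.dispatchAsync(pool_size);
        if (d != .run_inline) return d;
        if (p.one_shot_thread_count < p.concurrency_limit) {
            p.one_shot_thread_count += 1;
            return .spawn_one_shot;
        }
        return .unavailable;
    }
};

test "concurrent falls back to one-shot threads past the pool size" {
    var p: PoolModel = .{
        .cpu_count = 1,
        .concurrency_limit = 1,
        .available_thread_count = 0,
        .one_shot_thread_count = 0,
    };
    // Pool is full and busy: async must run inline.
    try std.testing.expectEqual(PoolModel.Decision.run_inline, p.dispatchAsync(1));
    // concurrent gets one burst thread, then hits the limit.
    try std.testing.expectEqual(PoolModel.Decision.spawn_one_shot, p.dispatchConcurrent(1));
    try std.testing.expectEqual(PoolModel.Decision.unavailable, p.dispatchConcurrent(1));
}
```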
Andrew Kelley
d54fbc0123 disable flaky test/incremental/add_decl
tracked by https://github.com/ziglang/zig/issues/26003
2025-11-21 19:53:33 -08:00
Andrew Kelley
bb3b5d09cc Revert std.crypto kangarootwelve addition
I would like a chance to review this before it lands, please. Feel free
to submit the work again without changes and I will make review
comments.

In the meantime, these reverts avoid intermittent CI failures and keep
bad patterns out of the standard library, where other users might copy
them.

Revert "std.crypto: improve KT documentation, use key_length for B3 key length (#25807)"

This reverts commit 4b593a6c24797484e68a668818736b0f6a8d81a2.

Revert "crypto - threaded K12: separate context computation from thread spawning (#25793)"

This reverts commit ee4df4ad3edad160fb737a1935cd86bc2f9cfbbe.

Revert "crypto.kt128: when using incremental hashing, use SIMD when possible (#25783)"

This reverts commit bf9082518c32ce7d53d011777bf8d8056472cbf9.

Revert "Add std.crypto.hash.sha3.{KT128,KT256} - RFC 9861. (#25593)"

This reverts commit 95c76b1b4aa7302966281c6b9b7f6cadea3cf7a6.
2025-11-21 19:43:01 -08:00
Alex Rønne Petersen
a892e09435
Revert "ci: allow riscv64-linux on Forgejo Actions to run on PRs for now"
This reverts commit 2cdafe91065c5d477563361a8fe1f637898ca285.
2025-11-22 03:08:49 +01:00
Nashwan Azhari
af7dec94c6
docs: use custom error set in values.zig sample.
Signed-off-by: Nashwan Azhari <aznashwan@icloud.com>
2025-11-22 02:56:40 +02:00
Nashwan Azhari
153521279f
docs: remove normal-doc comment interleaving bug note.
Signed-off-by: Nashwan Azhari <aznashwan@icloud.com>
2025-11-22 02:51:16 +02:00
14 changed files with 188 additions and 2184 deletions

@@ -52,7 +52,7 @@ jobs:
     run: sh ci/loongarch64-linux-release.sh
     timeout-minutes: 180
   riscv64-linux-debug:
-    # if: github.event_name != 'pull_request'
+    if: github.event_name != 'pull_request'
     runs-on: [self-hosted, riscv64-linux]
     steps:
       - name: Checkout
@@ -63,7 +63,7 @@ jobs:
     run: sh ci/riscv64-linux-debug.sh
     timeout-minutes: 540
   riscv64-linux-release:
-    # if: github.event_name != 'pull_request'
+    if: github.event_name != 'pull_request'
     runs-on: [self-hosted, riscv64-linux]
     steps:
       - name: Checkout

@@ -421,8 +421,7 @@
       {#code|unattached_doc-comment.zig#}
       <p>
-      Doc comments can be interleaved with normal comments. Currently, when producing
-      the package documentation, normal comments are merged with doc comments.
+      Doc comments can be interleaved with normal comments, which are ignored.
       </p>
       {#header_close#}
       {#header_open|Top-Level Doc Comments#}

@@ -4,6 +4,11 @@ const std = @import("std");
 const os = std.os;
 const assert = std.debug.assert;
 
+// Custom error set definition:
+const ExampleErrorSet = error{
+    ExampleErrorVariant,
+};
+
 pub fn main() void {
     // integers
     const one_plus_one: i32 = 1 + 1;
@@ -36,7 +41,7 @@ pub fn main() void {
     });
 
     // error union
-    var number_or_error: anyerror!i32 = error.ArgNotFound;
+    var number_or_error: ExampleErrorSet!i32 = ExampleErrorSet.ExampleErrorVariant;
     print("\nerror union 1\ntype: {}\nvalue: {!}\n", .{
         @TypeOf(number_or_error),

@@ -580,6 +580,9 @@ pub const VTable = struct {
     /// If it returns `null` it means `result` has been already populated and
     /// `await` will be a no-op.
     ///
+    /// When this function returns non-null, the implementation guarantees that
+    /// a unit of concurrency has been assigned to the returned task.
+    ///
     /// Thread-safe.
     async: *const fn (
         /// Corresponds to `Io.userdata`.
@@ -1024,6 +1027,10 @@ pub const Group = struct {
     ///
     /// `function` *may* be called immediately, before `async` returns.
     ///
+    /// When this function returns, it is guaranteed that `function` has
+    /// already been called and completed, or it has successfully been assigned
+    /// a unit of concurrency.
+    ///
     /// After this is called, `wait` or `cancel` must be called before the
     /// group is deinitialized.
     ///
@@ -1094,6 +1101,10 @@ pub fn Select(comptime U: type) type {
     ///
     /// `function` *may* be called immediately, before `async` returns.
     ///
+    /// When this function returns, it is guaranteed that `function` has
+    /// already been called and completed, or it has successfully been
+    /// assigned a unit of concurrency.
+    ///
     /// After this is called, `wait` or `cancel` must be called before the
     /// select is deinitialized.
     ///
@@ -1524,8 +1535,11 @@ pub fn Queue(Elem: type) type {
 /// not guaranteed to be available until `await` is called.
 ///
 /// `function` *may* be called immediately, before `async` returns. This has
-/// weaker guarantees than `concurrent`, making more portable and
-/// reusable.
+/// weaker guarantees than `concurrent`, making more portable and reusable.
+///
+/// When this function returns, it is guaranteed that `function` has already
+/// been called and completed, or it has successfully been assigned a unit of
+/// concurrency.
 ///
 /// See also:
 /// * `Group`

@@ -200,11 +200,17 @@ pub fn defaultDiscard(r: *Reader, limit: Limit) Error!usize {
     r.seek = 0;
     r.end = 0;
     var d: Writer.Discarding = .init(r.buffer);
-    const n = r.stream(&d.writer, limit) catch |err| switch (err) {
+    var n = r.stream(&d.writer, limit) catch |err| switch (err) {
         error.WriteFailed => unreachable,
         error.ReadFailed => return error.ReadFailed,
         error.EndOfStream => return error.EndOfStream,
     };
+    // If `stream` wrote to `r.buffer` without going through the writer,
+    // we need to discard as much of the buffered data as possible.
+    const remaining = @intFromEnum(limit) - n;
+    const buffered_n_to_discard = @min(remaining, r.end - r.seek);
+    n += buffered_n_to_discard;
+    r.seek += buffered_n_to_discard;
     assert(n <= @intFromEnum(limit));
     return n;
 }
@@ -1720,6 +1726,18 @@ fn failingDiscard(r: *Reader, limit: Limit) Error!usize {
     return error.ReadFailed;
 }
 
+test "discardAll that has to call discard multiple times on an indirect reader" {
+    var fr: Reader = .fixed("ABCDEFGHIJKLMNOPQRSTUVWXYZ");
+    var indirect_buffer: [3]u8 = undefined;
+    var tri: std.testing.ReaderIndirect = .init(&fr, &indirect_buffer);
+    const r = &tri.interface;
+    try r.discardAll(10);
+    var remaining_buf: [16]u8 = undefined;
+    try r.readSliceAll(&remaining_buf);
+    try std.testing.expectEqualStrings(fr.buffer[10..], remaining_buf[0..]);
+}
+
 test "readAlloc when the backing reader provides one byte at a time" {
     const str = "This is a test";
     var tiny_buffer: [1]u8 = undefined;

@@ -13,6 +13,7 @@ const net = std.Io.net;
 const HostName = std.Io.net.HostName;
 const IpAddress = std.Io.net.IpAddress;
 const Allocator = std.mem.Allocator;
+const Alignment = std.mem.Alignment;
 const assert = std.debug.assert;
 const posix = std.posix;
@@ -22,10 +23,30 @@ mutex: std.Thread.Mutex = .{},
 cond: std.Thread.Condition = .{},
 run_queue: std.SinglyLinkedList = .{},
 join_requested: bool = false,
-threads: std.ArrayList(std.Thread),
 stack_size: usize,
-cpu_count: std.Thread.CpuCountError!usize,
-concurrent_count: usize,
+/// All threads are spawned detached; this is how we wait until they all exit.
+wait_group: std.Thread.WaitGroup = .{},
+/// Maximum thread pool size (excluding main thread) when dispatching async
+/// tasks. Until this limit, calls to `Io.async` when all threads are busy will
+/// cause a new thread to be spawned and permanently added to the pool. After
+/// this limit, calls to `Io.async` when all threads are busy run the task
+/// immediately.
+///
+/// Defaults to a number equal to logical CPU cores.
+async_limit: Io.Limit,
+/// Maximum thread pool size (excluding main thread) for dispatching concurrent
+/// tasks. Until this limit, calls to `Io.concurrent` will increase the thread
+/// pool size.
+///
+/// concurrent tasks. After this number, calls to `Io.concurrent` return
+/// `error.ConcurrencyUnavailable`.
+concurrent_limit: Io.Limit = .unlimited,
+/// Error from calling `std.Thread.getCpuCount` in `init`.
+cpu_count_error: ?std.Thread.CpuCountError,
+/// Number of threads that are unavailable to take tasks. To calculate
+/// available count, subtract this from either `async_limit` or
+/// `concurrent_limit`.
+busy_count: usize = 0,
 
 wsa: if (is_windows) Wsa else struct {} = .{},
@@ -70,8 +91,6 @@ const Closure = struct {
     start: Start,
     node: std.SinglyLinkedList.Node = .{},
     cancel_tid: CancelId,
-    /// Whether this task bumps minimum number of threads in the pool.
-    is_concurrent: bool,
 
     const Start = *const fn (*Closure) void;
@@ -90,8 +109,6 @@ const Closure = struct {
     }
 };
 
-pub const InitError = std.Thread.CpuCountError || Allocator.Error;
-
 /// Related:
 /// * `init_single_threaded`
 pub fn init(
@@ -103,21 +120,20 @@ pub fn init(
     /// here.
     gpa: Allocator,
 ) Threaded {
+    if (builtin.single_threaded) return .init_single_threaded;
+    const cpu_count = std.Thread.getCpuCount();
     var t: Threaded = .{
         .allocator = gpa,
-        .threads = .empty,
         .stack_size = std.Thread.SpawnConfig.default_stack_size,
-        .cpu_count = std.Thread.getCpuCount(),
-        .concurrent_count = 0,
+        .async_limit = if (cpu_count) |n| .limited(n - 1) else |_| .nothing,
+        .cpu_count_error = if (cpu_count) |_| null else |e| e,
         .old_sig_io = undefined,
         .old_sig_pipe = undefined,
         .have_signal_handler = false,
     };
-    if (t.cpu_count) |n| {
-        t.threads.ensureTotalCapacityPrecise(gpa, n - 1) catch {};
-    } else |_| {}
     if (posix.Sigaction != void) {
         // This causes sending `posix.SIG.IO` to thread to interrupt blocking
         // syscalls, returning `posix.E.INTR`.
@@ -142,19 +158,17 @@ pub fn init(
 /// * `deinit` is safe, but unnecessary to call.
 pub const init_single_threaded: Threaded = .{
     .allocator = .failing,
-    .threads = .empty,
     .stack_size = std.Thread.SpawnConfig.default_stack_size,
-    .cpu_count = 1,
-    .concurrent_count = 0,
+    .async_limit = .nothing,
+    .cpu_count_error = null,
+    .concurrent_limit = .nothing,
     .old_sig_io = undefined,
     .old_sig_pipe = undefined,
     .have_signal_handler = false,
 };
 
 pub fn deinit(t: *Threaded) void {
-    const gpa = t.allocator;
     t.join();
-    t.threads.deinit(gpa);
     if (is_windows and t.wsa.status == .initialized) {
         if (ws2_32.WSACleanup() != 0) recoverableOsBugDetected();
     }
@@ -173,10 +187,12 @@ fn join(t: *Threaded) void {
         t.join_requested = true;
     }
     t.cond.broadcast();
-    for (t.threads.items) |thread| thread.join();
+    t.wait_group.wait();
 }
 
 fn worker(t: *Threaded) void {
+    defer t.wait_group.finish();
+
     t.mutex.lock();
     defer t.mutex.unlock();
@@ -184,12 +200,9 @@ fn worker(t: *Threaded) void {
         while (t.run_queue.popFirst()) |closure_node| {
             t.mutex.unlock();
             const closure: *Closure = @fieldParentPtr("node", closure_node);
-            const is_concurrent = closure.is_concurrent;
             closure.start(closure);
             t.mutex.lock();
-            if (is_concurrent) {
-                t.concurrent_count -= 1;
-            }
+            t.busy_count -= 1;
         }
         if (t.join_requested) break;
         t.cond.wait(&t.mutex);
@@ -387,7 +400,7 @@ const AsyncClosure = struct {
     func: *const fn (context: *anyopaque, result: *anyopaque) void,
     reset_event: ResetEvent,
     select_condition: ?*ResetEvent,
-    context_alignment: std.mem.Alignment,
+    context_alignment: Alignment,
     result_offset: usize,
     alloc_len: usize,
@@ -432,11 +445,10 @@ const AsyncClosure = struct {
     fn init(
         gpa: Allocator,
-        mode: enum { async, concurrent },
         result_len: usize,
-        result_alignment: std.mem.Alignment,
+        result_alignment: Alignment,
         context: []const u8,
-        context_alignment: std.mem.Alignment,
+        context_alignment: Alignment,
         func: *const fn (context: *const anyopaque, result: *anyopaque) void,
     ) Allocator.Error!*AsyncClosure {
         const max_context_misalignment = context_alignment.toByteUnits() -| @alignOf(AsyncClosure);
@@ -454,10 +466,6 @@ const AsyncClosure = struct {
             .closure = .{
                 .cancel_tid = .none,
                 .start = start,
-                .is_concurrent = switch (mode) {
-                    .async => false,
-                    .concurrent => true,
-                },
             },
             .func = func,
             .context_alignment = context_alignment,
@@ -470,10 +478,15 @@ const AsyncClosure = struct {
         return ac;
     }
 
-    fn waitAndDeinit(ac: *AsyncClosure, gpa: Allocator, result: []u8) void {
-        ac.reset_event.waitUncancelable();
+    fn waitAndDeinit(ac: *AsyncClosure, t: *Threaded, result: []u8) void {
+        ac.reset_event.wait(t) catch |err| switch (err) {
+            error.Canceled => {
+                ac.closure.requestCancel();
+                ac.reset_event.waitUncancelable();
+            },
+        };
         @memcpy(result, ac.resultPointer()[0..result.len]);
-        ac.deinit(gpa);
+        ac.deinit(t.allocator);
     }
 
     fn deinit(ac: *AsyncClosure, gpa: Allocator) void {
@@ -485,60 +498,50 @@ fn async(
     userdata: ?*anyopaque,
     result: []u8,
-    result_alignment: std.mem.Alignment,
+    result_alignment: Alignment,
     context: []const u8,
-    context_alignment: std.mem.Alignment,
+    context_alignment: Alignment,
     start: *const fn (context: *const anyopaque, result: *anyopaque) void,
 ) ?*Io.AnyFuture {
-    if (builtin.single_threaded) {
+    const t: *Threaded = @ptrCast(@alignCast(userdata));
+    if (builtin.single_threaded or t.async_limit == .nothing) {
         start(context.ptr, result.ptr);
         return null;
     }
-    const t: *Threaded = @ptrCast(@alignCast(userdata));
-    const cpu_count = t.cpu_count catch {
-        return concurrent(userdata, result.len, result_alignment, context, context_alignment, start) catch {
-            start(context.ptr, result.ptr);
-            return null;
-        };
-    };
     const gpa = t.allocator;
-    const ac = AsyncClosure.init(gpa, .async, result.len, result_alignment, context, context_alignment, start) catch {
+    const ac = AsyncClosure.init(gpa, result.len, result_alignment, context, context_alignment, start) catch {
         start(context.ptr, result.ptr);
         return null;
     };
 
     t.mutex.lock();
 
-    const thread_capacity = cpu_count - 1 + t.concurrent_count;
-    t.threads.ensureTotalCapacityPrecise(gpa, thread_capacity) catch {
+    const busy_count = t.busy_count;
+    if (busy_count >= @intFromEnum(t.async_limit)) {
         t.mutex.unlock();
         ac.deinit(gpa);
         start(context.ptr, result.ptr);
         return null;
-    };
-
-    t.run_queue.prepend(&ac.closure.node);
-
-    if (t.threads.items.len < thread_capacity) {
-        const thread = std.Thread.spawn(.{ .stack_size = t.stack_size }, worker, .{t}) catch {
-            if (t.threads.items.len == 0) {
-                assert(t.run_queue.popFirst() == &ac.closure.node);
-                t.mutex.unlock();
-                ac.deinit(gpa);
-                start(context.ptr, result.ptr);
-                return null;
-            }
-            // Rely on other workers to do it.
-            t.mutex.unlock();
-            t.cond.signal();
-            return @ptrCast(ac);
-        };
-        t.threads.appendAssumeCapacity(thread);
     }
+
+    t.busy_count = busy_count + 1;
+    const pool_size = t.wait_group.value();
+    if (pool_size - busy_count == 0) {
+        t.wait_group.start();
+        const thread = std.Thread.spawn(.{ .stack_size = t.stack_size }, worker, .{t}) catch {
+            t.wait_group.finish();
+            t.busy_count = busy_count;
+            t.mutex.unlock();
+            ac.deinit(gpa);
+            start(context.ptr, result.ptr);
+            return null;
+        };
+        thread.detach();
+    }
+
+    t.run_queue.prepend(&ac.closure.node);
 
     t.mutex.unlock();
     t.cond.signal();
     return @ptrCast(ac);
@@ -547,45 +550,42 @@ fn async(
 fn concurrent(
     userdata: ?*anyopaque,
     result_len: usize,
-    result_alignment: std.mem.Alignment,
+    result_alignment: Alignment,
     context: []const u8,
-    context_alignment: std.mem.Alignment,
+    context_alignment: Alignment,
     start: *const fn (context: *const anyopaque, result: *anyopaque) void,
 ) Io.ConcurrentError!*Io.AnyFuture {
     if (builtin.single_threaded) return error.ConcurrencyUnavailable;
 
     const t: *Threaded = @ptrCast(@alignCast(userdata));
-    const cpu_count = t.cpu_count catch 1;
     const gpa = t.allocator;
-    const ac = AsyncClosure.init(gpa, .concurrent, result_len, result_alignment, context, context_alignment, start) catch {
-        return error.ConcurrencyUnavailable;
-    };
+    const ac = AsyncClosure.init(gpa, result_len, result_alignment, context, context_alignment, start) catch
+        return error.ConcurrencyUnavailable;
+    errdefer ac.deinit(gpa);
 
     t.mutex.lock();
+    defer t.mutex.unlock();
 
-    t.concurrent_count += 1;
-    const thread_capacity = cpu_count - 1 + t.concurrent_count;
-    t.threads.ensureTotalCapacity(gpa, thread_capacity) catch {
-        t.mutex.unlock();
-        ac.deinit(gpa);
+    const busy_count = t.busy_count;
+    if (busy_count >= @intFromEnum(t.concurrent_limit))
         return error.ConcurrencyUnavailable;
-    };
 
-    t.run_queue.prepend(&ac.closure.node);
+    t.busy_count = busy_count + 1;
+    errdefer t.busy_count = busy_count;
 
-    if (t.threads.items.len < thread_capacity) {
-        const thread = std.Thread.spawn(.{ .stack_size = t.stack_size }, worker, .{t}) catch {
-            assert(t.run_queue.popFirst() == &ac.closure.node);
-            t.mutex.unlock();
-            ac.deinit(gpa);
+    const pool_size = t.wait_group.value();
+    if (pool_size - busy_count == 0) {
+        t.wait_group.start();
+        errdefer t.wait_group.finish();
+
+        const thread = std.Thread.spawn(.{ .stack_size = t.stack_size }, worker, .{t}) catch
             return error.ConcurrencyUnavailable;
-        };
-        t.threads.appendAssumeCapacity(thread);
+        thread.detach();
     }
 
-    t.mutex.unlock();
+    t.run_queue.prepend(&ac.closure.node);
     t.cond.signal();
     return @ptrCast(ac);
 }
@@ -597,7 +597,7 @@ const GroupClosure = struct {
     /// Points to sibling `GroupClosure`. Used for walking the group to cancel all.
     node: std.SinglyLinkedList.Node,
     func: *const fn (*Io.Group, context: *anyopaque) void,
-    context_alignment: std.mem.Alignment,
+    context_alignment: Alignment,
     alloc_len: usize,
 
     fn start(closure: *Closure) void {
@@ -638,7 +638,7 @@ const GroupClosure = struct {
         t: *Threaded,
         group: *Io.Group,
         context: []const u8,
-        context_alignment: std.mem.Alignment,
+        context_alignment: Alignment,
         func: *const fn (*Io.Group, context: *const anyopaque) void,
     ) Allocator.Error!*GroupClosure {
         const max_context_misalignment = context_alignment.toByteUnits() -| @alignOf(GroupClosure);
@@ -652,7 +652,6 @@ const GroupClosure = struct {
             .closure = .{
                 .cancel_tid = .none,
                 .start = start,
-                .is_concurrent = false,
             },
             .t = t,
             .group = group,
@@ -678,45 +677,48 @@ fn groupAsync(
     userdata: ?*anyopaque,
     group: *Io.Group,
     context: []const u8,
-    context_alignment: std.mem.Alignment,
+    context_alignment: Alignment,
     start: *const fn (*Io.Group, context: *const anyopaque) void,
 ) void {
-    if (builtin.single_threaded) return start(group, context.ptr);
-
     const t: *Threaded = @ptrCast(@alignCast(userdata));
-    const cpu_count = t.cpu_count catch 1;
+    if (builtin.single_threaded or t.async_limit == .nothing)
+        return start(group, context.ptr);
+
     const gpa = t.allocator;
-    const gc = GroupClosure.init(gpa, t, group, context, context_alignment, start) catch {
+    const gc = GroupClosure.init(gpa, t, group, context, context_alignment, start) catch
         return start(group, context.ptr);
-    };
 
     t.mutex.lock();
 
+    const busy_count = t.busy_count;
+    if (busy_count >= @intFromEnum(t.async_limit)) {
+        t.mutex.unlock();
+        gc.deinit(gpa);
+        return start(group, context.ptr);
+    }
+
+    t.busy_count = busy_count + 1;
+    const pool_size = t.wait_group.value();
+    if (pool_size - busy_count == 0) {
+        t.wait_group.start();
+        const thread = std.Thread.spawn(.{ .stack_size = t.stack_size }, worker, .{t}) catch {
+            t.wait_group.finish();
+            t.busy_count = busy_count;
+            t.mutex.unlock();
+            gc.deinit(gpa);
+            return start(group, context.ptr);
        };
+        thread.detach();
+    }
+
     // Append to the group linked list inside the mutex to make `Io.Group.async` thread-safe.
     gc.node = .{ .next = @ptrCast(@alignCast(group.token)) };
     group.token = &gc.node;
 
-    const thread_capacity = cpu_count - 1 + t.concurrent_count;
-    t.threads.ensureTotalCapacityPrecise(gpa, thread_capacity) catch {
-        t.mutex.unlock();
-        gc.deinit(gpa);
-        return start(group, context.ptr);
-    };
-
     t.run_queue.prepend(&gc.closure.node);
 
-    if (t.threads.items.len < thread_capacity) {
-        const thread = std.Thread.spawn(.{ .stack_size = t.stack_size }, worker, .{t}) catch {
-            assert(t.run_queue.popFirst() == &gc.closure.node);
-            t.mutex.unlock();
-            gc.deinit(gpa);
-            return start(group, context.ptr);
-        };
-        t.threads.appendAssumeCapacity(thread);
-    }
-
     // This needs to be done before unlocking the mutex to avoid a race with
     // the associated task finishing.
     const group_state: *std.atomic.Value(usize) = @ptrCast(&group.state);
@@ -794,25 +796,25 @@ fn await(
     userdata: ?*anyopaque,
     any_future: *Io.AnyFuture,
     result: []u8,
-    result_alignment: std.mem.Alignment,
+    result_alignment: Alignment,
 ) void {
     _ = result_alignment;
     const t: *Threaded = @ptrCast(@alignCast(userdata));
     const closure: *AsyncClosure = @ptrCast(@alignCast(any_future));
-    closure.waitAndDeinit(t.allocator, result);
+    closure.waitAndDeinit(t, result);
 }
 
 fn cancel(
     userdata: ?*anyopaque,
     any_future: *Io.AnyFuture,
     result: []u8,
-    result_alignment: std.mem.Alignment,
+    result_alignment: Alignment,
 ) void {
     _ = result_alignment;
     const t: *Threaded = @ptrCast(@alignCast(userdata));
     const ac: *AsyncClosure = @ptrCast(@alignCast(any_future));
     ac.closure.requestCancel();
-    ac.waitAndDeinit(t.allocator, result);
+    ac.waitAndDeinit(t, result);
 }
 
 fn cancelRequested(userdata: ?*anyopaque) bool {

@@ -10,7 +10,7 @@ test "concurrent vs main prevents deadlock via oversubscription" {
     defer threaded.deinit();
     const io = threaded.io();
 
-    threaded.cpu_count = 1;
+    threaded.async_limit = .nothing;
 
     var queue: Io.Queue(u8) = .init(&.{});
@@ -38,7 +38,7 @@ test "concurrent vs concurrent prevents deadlock via oversubscription" {
     defer threaded.deinit();
     const io = threaded.io();
 
-    threaded.cpu_count = 1;
+    threaded.async_limit = .nothing;
 
     var queue: Io.Queue(u8) = .init(&.{});

@@ -1,13 +1,14 @@
-//! This struct represents a kernel thread, and acts as a namespace for concurrency
-//! primitives that operate on kernel threads. For concurrency primitives that support
-//! both evented I/O and async I/O, see the respective names in the top level std namespace.
+//! This struct represents a kernel thread, and acts as a namespace for
+//! concurrency primitives that operate on kernel threads. For concurrency
+//! primitives that interact with the I/O interface, see `std.Io`.
 
+const std = @import("std.zig");
 const builtin = @import("builtin");
-const math = std.math;
-const assert = std.debug.assert;
 const target = builtin.target;
 const native_os = builtin.os.tag;
-const std = @import("std.zig");
+const math = std.math;
+const assert = std.debug.assert;
 const posix = std.posix;
 const windows = std.os.windows;
 const testing = std.testing;

View File

@@ -60,6 +60,10 @@ pub fn isDone(wg: *WaitGroup) bool {
     return (state / one_pending) == 0;
 }

+pub fn value(wg: *WaitGroup) usize {
+    return wg.state.load(.monotonic) / one_pending;
+}
+
 // Spawns a new thread for the task. This is appropriate when the callee
 // delegates all work.
 pub fn spawnManager(

View File

@@ -30,7 +30,6 @@ const hashes = [_]Crypto{
     Crypto{ .ty = crypto.hash.sha3.Shake256, .name = "shake-256" },
     Crypto{ .ty = crypto.hash.sha3.TurboShake128(null), .name = "turboshake-128" },
     Crypto{ .ty = crypto.hash.sha3.TurboShake256(null), .name = "turboshake-256" },
-    Crypto{ .ty = crypto.hash.sha3.KT128, .name = "kt128" },
     Crypto{ .ty = crypto.hash.blake2.Blake2s256, .name = "blake2s" },
     Crypto{ .ty = crypto.hash.blake2.Blake2b512, .name = "blake2b" },
     Crypto{ .ty = crypto.hash.Blake3, .name = "blake3" },
@@ -38,7 +37,6 @@ const hashes = [_]Crypto{
 const parallel_hashes = [_]Crypto{
     Crypto{ .ty = crypto.hash.Blake3, .name = "blake3-parallel" },
-    Crypto{ .ty = crypto.hash.sha3.KT128, .name = "kt128-parallel" },
 };

 const block_size: usize = 8 * 8192;

View File

@@ -12,8 +12,8 @@ const Vec16 = @Vector(16, u32);
 const chunk_length = 1024;
 const max_depth = 54;
-const simd_degree = std.simd.suggestVectorLength(u32) orelse 1;
-const max_simd_degree = simd_degree;
+pub const simd_degree = std.simd.suggestVectorLength(u32) orelse 1;
+pub const max_simd_degree = simd_degree;
 const max_simd_degree_or_2 = if (max_simd_degree > 2) max_simd_degree else 2;

 /// Threshold for switching to parallel processing.
@@ -502,7 +502,9 @@ fn hashManySimd(
     var out_ptr = out.ptr;
     var cnt = counter;

-    if (simd_degree >= 16) {
+    const simd_deg = comptime simd_degree;
+
+    if (comptime simd_deg >= 16) {
         while (remaining >= 16) {
             const sixteen_inputs = [16][*]const u8{
                 inp[0], inp[1], inp[2], inp[3],
@@ -523,7 +525,7 @@ fn hashManySimd(
         }
     }

-    if (simd_degree >= 8) {
+    if (comptime simd_deg >= 8) {
         while (remaining >= 8) {
             const eight_inputs = [8][*]const u8{
                 inp[0], inp[1], inp[2], inp[3],
@@ -542,7 +544,7 @@ fn hashManySimd(
         }
     }

-    if (simd_degree >= 4) {
+    if (comptime simd_deg >= 4) {
         while (remaining >= 4) {
             const four_inputs = [4][*]const u8{
                 inp[0],
@@ -569,7 +571,7 @@ fn hashManySimd(
 }

 fn hashMany(inputs: [][*]const u8, num_inputs: usize, blocks: usize, key: [8]u32, counter: u64, increment_counter: bool, flags: Flags, flags_start: Flags, flags_end: Flags, out: []u8) void {
-    if (max_simd_degree >= 4) {
+    if (comptime max_simd_degree >= 4) {
         hashManySimd(inputs, num_inputs, blocks, key, counter, increment_counter, flags, flags_start, flags_end, out);
     } else {
         hashManyPortable(inputs, num_inputs, blocks, key, counter, increment_counter, flags, flags_start, flags_end, out);
@@ -907,7 +909,7 @@ pub const Blake3 = struct {
     pub const digest_length = 32;
     pub const key_length = 32;
-    pub const Options = struct { key: ?[key_length]u8 = null };
+    pub const Options = struct { key: ?[digest_length]u8 = null };
     pub const KdfOptions = struct {};

     key: [8]u32,

File diff suppressed because it is too large

View File

@@ -4,8 +4,6 @@ const assert = std.debug.assert;
 const math = std.math;
 const mem = std.mem;

-const kangarootwelve = @import("kangarootwelve.zig");
-
 const KeccakState = std.crypto.core.keccak.State;
@@ -28,9 +26,6 @@ pub const KMac256 = KMac(256);
 pub const TupleHash128 = TupleHash(128);
 pub const TupleHash256 = TupleHash(256);

-pub const KT128 = kangarootwelve.KT128;
-pub const KT256 = kangarootwelve.KT256;
-
 /// TurboSHAKE128 is a XOF (a secure hash function with a variable output length), with a 128 bit security level.
 /// It is based on the same permutation as SHA3 and SHAKE128, but with much higher performance.
 /// The delimiter is 0x1f by default, but can be changed for context-separation.
@@ -486,10 +481,6 @@ pub const NistLengthEncoding = enum {
 const htest = @import("test.zig");

-test {
-    _ = kangarootwelve;
-}
-
 test "sha3-224 single" {
     try htest.assertEqualHash(Sha3_224, "6b4e03423667dbb73b6e15454f0eb1abd4597f9a1b078e3f5b5a6bc7", "");
     try htest.assertEqualHash(Sha3_224, "e642824c3f8cf24ad09234ee7d3c766fc9a3a5168d0c94ad73b46fdf", "abc");

View File

@@ -1,63 +0,0 @@
#target=x86_64-linux-selfhosted
#target=x86_64-windows-selfhosted
#target=x86_64-linux-cbe
#target=x86_64-windows-cbe
//#target=wasm32-wasi-selfhosted
#update=initial version
#file=main.zig
const std = @import("std");
pub fn main() !void {
try std.fs.File.stdout().writeAll(foo);
}
const foo = "good morning\n";
#expect_stdout="good morning\n"
#update=add new declaration
#file=main.zig
const std = @import("std");
pub fn main() !void {
try std.fs.File.stdout().writeAll(foo);
}
const foo = "good morning\n";
const bar = "good evening\n";
#expect_stdout="good morning\n"
#update=reference new declaration
#file=main.zig
const std = @import("std");
pub fn main() !void {
try std.fs.File.stdout().writeAll(bar);
}
const foo = "good morning\n";
const bar = "good evening\n";
#expect_stdout="good evening\n"
#update=reference missing declaration
#file=main.zig
const std = @import("std");
pub fn main() !void {
try std.fs.File.stdout().writeAll(qux);
}
const foo = "good morning\n";
const bar = "good evening\n";
#expect_error=main.zig:3:39: error: use of undeclared identifier 'qux'
#update=add missing declaration
#file=main.zig
const std = @import("std");
pub fn main() !void {
try std.fs.File.stdout().writeAll(qux);
}
const foo = "good morning\n";
const bar = "good evening\n";
const qux = "good night\n";
#expect_stdout="good night\n"
#update=remove unused declarations
#file=main.zig
const std = @import("std");
pub fn main() !void {
try std.fs.File.stdout().writeAll(qux);
}
const qux = "good night\n";
#expect_stdout="good night\n"