closer to std lib event stuff working

This commit is contained in:
Andrew Kelley 2019-08-08 16:41:38 -04:00
parent 93840f8610
commit da56959a9a
No known key found for this signature in database
GPG Key ID: 7C5F548F728501A9
3 changed files with 642 additions and 654 deletions

View File

@ -89,12 +89,7 @@ pub fn Channel(comptime T: type) type {
/// puts a data item in the channel. The promise completes when the value has been added to the
/// buffer, or in the case of a zero size buffer, when the item has been retrieved by a getter.
pub async fn put(self: *SelfChannel, data: T) void {
// TODO fix this workaround
suspend {
resume @handle();
}
var my_tick_node = Loop.NextTickNode.init(@handle());
var my_tick_node = Loop.NextTickNode.init(@frame());
var queue_node = std.atomic.Queue(PutNode).Node.init(PutNode{
.tick_node = &my_tick_node,
.data = data,
@ -122,15 +117,10 @@ pub fn Channel(comptime T: type) type {
/// await this function to get an item from the channel. If the buffer is empty, the promise will
/// complete when the next item is put in the channel.
pub async fn get(self: *SelfChannel) T {
// TODO fix this workaround
suspend {
resume @handle();
}
// TODO integrate this function with named return values
// so we can get rid of this extra result copy
var result: T = undefined;
var my_tick_node = Loop.NextTickNode.init(@handle());
var my_tick_node = Loop.NextTickNode.init(@frame());
var queue_node = std.atomic.Queue(GetNode).Node.init(GetNode{
.tick_node = &my_tick_node,
.data = GetNode.Data{
@ -173,15 +163,10 @@ pub fn Channel(comptime T: type) type {
/// Await is necessary for locking purposes. The function will be resumed after checking the channel
/// for data and will not wait for data to be available.
pub async fn getOrNull(self: *SelfChannel) ?T {
// TODO fix this workaround
suspend {
resume @handle();
}
// TODO integrate this function with named return values
// so we can get rid of this extra result copy
var result: ?T = null;
var my_tick_node = Loop.NextTickNode.init(@handle());
var my_tick_node = Loop.NextTickNode.init(@frame());
var or_null_node = std.atomic.Queue(*std.atomic.Queue(GetNode).Node).Node.init(undefined);
var queue_node = std.atomic.Queue(GetNode).Node.init(GetNode{
.tick_node = &my_tick_node,
@ -334,41 +319,36 @@ test "std.event.Channel" {
const channel = try Channel(i32).create(&loop, 0);
defer channel.destroy();
const handle = try async<allocator> testChannelGetter(&loop, channel);
defer cancel handle;
const putter = try async<allocator> testChannelPutter(channel);
defer cancel putter;
const handle = async testChannelGetter(&loop, channel);
const putter = async testChannelPutter(channel);
loop.run();
}
async fn testChannelGetter(loop: *Loop, channel: *Channel(i32)) void {
errdefer @panic("test failed");
const value1_promise = try async channel.get();
const value1_promise = async channel.get();
const value1 = await value1_promise;
testing.expect(value1 == 1234);
const value2_promise = try async channel.get();
const value2_promise = async channel.get();
const value2 = await value2_promise;
testing.expect(value2 == 4567);
const value3_promise = try async channel.getOrNull();
const value3_promise = async channel.getOrNull();
const value3 = await value3_promise;
testing.expect(value3 == null);
const last_put = try async testPut(channel, 4444);
const value4 = await try async channel.getOrNull();
const last_put = async testPut(channel, 4444);
const value4 = channel.getOrNull();
testing.expect(value4.? == 4444);
await last_put;
}
async fn testChannelPutter(channel: *Channel(i32)) void {
await (async channel.put(1234) catch @panic("out of memory"));
await (async channel.put(4567) catch @panic("out of memory"));
channel.put(1234);
channel.put(4567);
}
async fn testPut(channel: *Channel(i32), value: i32) void {
await (async channel.put(value) catch @panic("out of memory"));
channel.put(value);
}

File diff suppressed because it is too large Load Diff

View File

@ -98,9 +98,21 @@ pub const Loop = struct {
};
pub const instance: ?*Loop = if (@hasDecl(root, "event_loop")) root.event_loop else default_instance;
/// TODO copy elision / named return values so that the threads referencing *Loop
/// have the correct pointer value.
/// https://github.com/ziglang/zig/issues/2761 and https://github.com/ziglang/zig/issues/2765
pub fn init(self: *Loop, allocator: *mem.Allocator) !void {
if (builtin.single_threaded) {
return self.initSingleThreaded(allocator);
} else {
return self.initMultiThreaded(allocator);
}
}
/// After initialization, call run().
/// TODO copy elision / named return values so that the threads referencing *Loop
/// have the correct pointer value.
/// https://github.com/ziglang/zig/issues/2761 and https://github.com/ziglang/zig/issues/2765
pub fn initSingleThreaded(self: *Loop, allocator: *mem.Allocator) !void {
return self.initInternal(allocator, 1);
}
@ -110,6 +122,7 @@ pub const Loop = struct {
/// After initialization, call run().
/// TODO copy elision / named return values so that the threads referencing *Loop
/// have the correct pointer value.
/// https://github.com/ziglang/zig/issues/2761 and https://github.com/ziglang/zig/issues/2765
pub fn initMultiThreaded(self: *Loop, allocator: *mem.Allocator) !void {
if (builtin.single_threaded) @compileError("initMultiThreaded unavailable when building in single-threaded mode");
const core_count = try Thread.cpuCount();
@ -161,18 +174,18 @@ pub const Loop = struct {
fn initOsData(self: *Loop, extra_thread_count: usize) InitOsDataError!void {
switch (builtin.os) {
.linux => {
// TODO self.os_data.fs_queue = std.atomic.Queue(fs.Request).init();
// TODO self.os_data.fs_queue_item = 0;
// TODO // we need another thread for the file system because Linux does not have an async
// TODO // file system I/O API.
// TODO self.os_data.fs_end_request = fs.RequestNode{
// TODO .prev = undefined,
// TODO .next = undefined,
// TODO .data = fs.Request{
// TODO .msg = fs.Request.Msg.End,
// TODO .finish = fs.Request.Finish.NoAction,
// TODO },
// TODO };
self.os_data.fs_queue = std.atomic.Queue(fs.Request).init();
self.os_data.fs_queue_item = 0;
// we need another thread for the file system because Linux does not have an async
// file system I/O API.
self.os_data.fs_end_request = fs.RequestNode{
.prev = undefined,
.next = undefined,
.data = fs.Request{
.msg = fs.Request.Msg.End,
.finish = fs.Request.Finish.NoAction,
},
};
errdefer {
while (self.available_eventfd_resume_nodes.pop()) |node| os.close(node.data.eventfd);
@ -210,10 +223,10 @@ pub const Loop = struct {
&self.os_data.final_eventfd_event,
);
// TODO self.os_data.fs_thread = try Thread.spawn(self, posixFsRun);
self.os_data.fs_thread = try Thread.spawn(self, posixFsRun);
errdefer {
// TODO self.posixFsRequest(&self.os_data.fs_end_request);
// TODO self.os_data.fs_thread.wait();
self.posixFsRequest(&self.os_data.fs_end_request);
self.os_data.fs_thread.wait();
}
if (builtin.single_threaded) {
@ -315,10 +328,10 @@ pub const Loop = struct {
.udata = undefined,
};
// TODO self.os_data.fs_thread = try Thread.spawn(self, posixFsRun);
self.os_data.fs_thread = try Thread.spawn(self, posixFsRun);
errdefer {
// TODO self.posixFsRequest(&self.os_data.fs_end_request);
// TODO self.os_data.fs_thread.wait();
self.posixFsRequest(&self.os_data.fs_end_request);
self.os_data.fs_thread.wait();
}
if (builtin.single_threaded) {
@ -441,7 +454,6 @@ pub const Loop = struct {
pub async fn linuxWaitFd(self: *Loop, fd: i32, flags: u32) !void {
defer self.linuxRemoveFd(fd);
suspend {
// TODO explicitly put this memory in the coroutine frame #1194
var resume_node = ResumeNode.Basic{
.base = ResumeNode{
.id = ResumeNode.Id.Basic,
@ -454,10 +466,6 @@ pub const Loop = struct {
}
pub async fn bsdWaitKev(self: *Loop, ident: usize, filter: i16, fflags: u32) !os.Kevent {
// TODO #1194
suspend {
resume @handle();
}
var resume_node = ResumeNode.Basic{
.base = ResumeNode{
.id = ResumeNode.Id.Basic,
@ -578,7 +586,7 @@ pub const Loop = struct {
.macosx,
.freebsd,
.netbsd,
=> {}, // TODO self.os_data.fs_thread.wait(),
=> self.os_data.fs_thread.wait(),
else => {},
}
@ -631,7 +639,7 @@ pub const Loop = struct {
// cause all the threads to stop
switch (builtin.os) {
.linux => {
// TODO self.posixFsRequest(&self.os_data.fs_end_request);
self.posixFsRequest(&self.os_data.fs_end_request);
// writing 8 bytes to an eventfd cannot fail
os.write(self.os_data.final_eventfd, wakeup_bytes) catch unreachable;
return;
@ -862,10 +870,10 @@ pub const Loop = struct {
epollfd: i32,
final_eventfd: i32,
final_eventfd_event: os.linux.epoll_event,
// TODO fs_thread: *Thread,
// TODO fs_queue_item: i32,
// TODO fs_queue: std.atomic.Queue(fs.Request),
// TODO fs_end_request: fs.RequestNode,
fs_thread: *Thread,
fs_queue_item: i32,
fs_queue: std.atomic.Queue(fs.Request),
fs_end_request: fs.RequestNode,
};
};