Эх сурвалжийг харах

new threadpool for zig

TODO: use generics for arguments
Douglas A 1 жил өмнө
parent
commit
87d30ac38d

+ 2 - 0
threadpool/.gitignore

@@ -0,0 +1,2 @@
+zig-out/
+zig-cache/

+ 70 - 0
threadpool/build.zig

@@ -0,0 +1,70 @@
+const std = @import("std");
+
+// Although this function looks imperative, note that its job is to
+// declaratively construct a build graph that will be executed by an external
+// runner.
+pub fn build(b: *std.Build) void {
+    // Standard target options allows the person running `zig build` to choose
+    // what target to build for. Here we do not override the defaults, which
+    // means any target is allowed, and the default is native. Other options
+    // for restricting supported target set are available.
+    const target = b.standardTargetOptions(.{});
+
+    // Standard optimization options allow the person running `zig build` to select
+    // between Debug, ReleaseSafe, ReleaseFast, and ReleaseSmall. Here we do not
+    // set a preferred release mode, allowing the user to decide how to optimize.
+    const optimize = b.standardOptimizeOption(.{});
+
+    const exe = b.addExecutable(.{
+        .name = "threadpool",
+        // In this case the main source file is merely a path, however, in more
+        // complicated build scripts, this could be a generated file.
+        .root_source_file = .{ .path = "src/main.zig" },
+        .target = target,
+        .optimize = optimize,
+    });
+
+    // This declares intent for the executable to be installed into the
+    // standard location when the user invokes the "install" step (the default
+    // step when running `zig build`).
+    b.installArtifact(exe);
+
+    // This *creates* a Run step in the build graph, to be executed when another
+    // step is evaluated that depends on it. The next line below will establish
+    // such a dependency.
+    const run_cmd = b.addRunArtifact(exe);
+
+    // By making the run step depend on the install step, it will be run from the
+    // installation directory rather than directly from within the cache directory.
+    // This is not necessary, however, if the application depends on other installed
+    // files, this ensures they will be present and in the expected location.
+    run_cmd.step.dependOn(b.getInstallStep());
+
+    // This allows the user to pass arguments to the application in the build
+    // command itself, like this: `zig build run -- arg1 arg2 etc`
+    if (b.args) |args| {
+        run_cmd.addArgs(args);
+    }
+
+    // This creates a build step. It will be visible in the `zig build --help` menu,
+    // and can be selected like this: `zig build run`
+    // This will evaluate the `run` step rather than the default, which is "install".
+    const run_step = b.step("run", "Run the app");
+    run_step.dependOn(&run_cmd.step);
+
+    // Creates a step for unit testing. This only builds the test executable
+    // but does not run it.
+    const unit_tests = b.addTest(.{
+        .root_source_file = .{ .path = "src/main.zig" },
+        .target = target,
+        .optimize = optimize,
+    });
+
+    const run_unit_tests = b.addRunArtifact(unit_tests);
+
+    // Similar to creating the run step earlier, this exposes a `test` step to
+    // the `zig build --help` menu, providing a way for the user to request
+    // running the unit tests.
+    const test_step = b.step("test", "Run unit tests");
+    test_step.dependOn(&run_unit_tests.step);
+}

+ 110 - 0
threadpool/src/main.zig

@@ -0,0 +1,110 @@
+const std = @import("std");
+
+const WorkParams = struct {
+    const Self = @This();
+    func: *const fn (usize) void,
+    args: usize,
+};
+
+fn ThreadPool() type {
+    const Work = struct {
+        const Self = @This();
+        next: ?*Self,
+        value: ?WorkParams,
+    };
+
+    return struct {
+        const Self = @This();
+        alloc: std.mem.Allocator,
+        mutex: *std.Thread.Mutex,
+        head: ?*Work,
+        tail: ?*Work,
+        stop: bool,
+
+        fn init(alloc: std.mem.Allocator, mutex: *std.Thread.Mutex) Self {
+            return .{ .alloc = alloc, .mutex = mutex, .head = null, .tail = null, .stop = false };
+        }
+
+        fn start(self: *Self) void {
+            for (0..100) |_| {
+                _ = std.Thread.spawn(.{}, run, .{self}) catch {
+                    std.debug.print("could not start thread\n", .{});
+                };
+            }
+        }
+
+        fn wait(self: *Self) void {
+            while (!self.stop) {
+                std.time.sleep(300);
+            }
+        }
+
+        fn add(self: *Self, work: WorkParams) !void {
+            self.mutex.lock();
+            self.mutex.unlock();
+
+            var node = try self.alloc.create(Work);
+            node.value = work;
+            node.next = null;
+
+            if (self.head == null) {
+                self.head = node;
+                self.tail = node;
+            }
+
+            if (self.tail) |t_node| {
+                var next = t_node;
+                next.next = node;
+            }
+            self.tail = node;
+        }
+
+        fn get(self: *Self) ?WorkParams {
+            self.mutex.lock();
+            defer self.mutex.unlock();
+
+            if (self.head) |t_head| {
+                self.head = t_head.next;
+                var value = t_head.value;
+                self.alloc.destroy(t_head);
+                return value;
+            }
+            return null;
+        }
+
+        fn run(me: *Self) !void {
+            var work = me.get();
+            while (!me.stop) {
+                while (work) |w| {
+                    w.func(w.args);
+                    work = me.get();
+                }
+            }
+        }
+    };
+}
+
+pub fn do_work(val: usize) void {
+    var rnd = std.rand.DefaultPrng.init(val);
+    var sleep_time = @rem(rnd.random().int(u64), 25000);
+
+    std.debug.print("Hello, the value is: {d}... sleeping {}\n", .{ val, sleep_time });
+
+    std.time.sleep(sleep_time * 1000000);
+    std.debug.print("Awakening, the value is: {d}\n", .{val});
+}
+
+pub fn main() !void {
+    var gpa = std.heap.ArenaAllocator.init(std.heap.page_allocator);
+    defer gpa.deinit();
+    var allocator = gpa.allocator();
+    var mutex = std.Thread.Mutex{};
+    var t = ThreadPool().init(allocator, &mutex);
+
+    for (0..1000000) |idx| {
+        try t.add(WorkParams{ .args = idx, .func = do_work });
+    }
+
+    t.start();
+    t.wait();
+}