瀏覽代碼

feat: new `async` socket unblock

Douglas A 9 月之前
父節點
當前提交
13f3b232dd
共有 6 個文件被更改,包括 138 次插入6 次删除
  1. 1 0
      .gitignore
  2. 15 0
      unblock/build.zig
  3. 35 0
      unblock/src/client.zig
  4. 81 0
      unblock/src/main.zig
  5. 2 2
      zig-list/build.zig
  6. 4 4
      zig-list/src/main.zig

+ 1 - 0
.gitignore

@@ -0,0 +1 @@
+**/zig-cache/

+ 15 - 0
unblock/build.zig

@@ -0,0 +1,15 @@
+const std = @import("std");
+
+pub fn build(b: *std.Build) void {
+    const target = b.standardTargetOptions(.{});
+    const optimize = b.standardOptimizeOption(.{});
+
+    const exe = b.addExecutable(.{
+        .name = "unblock",
+        .root_source_file = b.path("src/main.zig"),
+        .target = target,
+        .optimize = optimize,
+    });
+
+    b.installArtifact(exe);
+}

+ 35 - 0
unblock/src/client.zig

@@ -0,0 +1,35 @@
+const std = @import("std");
+
+pub const Client = struct { sock: std.posix.socket_t, address: std.posix.sockaddr.in };
+
+pub const Clients = struct {
+    lock: std.Thread.Mutex,
+    clients: std.ArrayList(Client),
+    alloc: std.mem.Allocator,
+
+    pub fn init(alloc: std.mem.Allocator) Clients {
+        return .{ .lock = std.Thread.Mutex{}, .clients = std.ArrayList(Client).init(alloc), .alloc = alloc };
+    }
+
+    pub fn deinit(self: *Clients) void {
+        self.clients.deinit();
+    }
+
+    pub fn add(self: *Clients, client: Client) std.mem.Allocator.Error!void {
+        self.lock.lock();
+        defer self.lock.unlock();
+        return self.clients.append(client);
+    }
+
+    pub fn remove(self: *Clients, client: Client) void {
+        self.lock.lock();
+        defer self.lock.unlock();
+
+        for (self.clients.items, 0..) |c, idx| {
+            if (c.address == client.address) {
+                self.clients.swapRemove(idx);
+                break;
+            }
+        }
+    }
+};

+ 81 - 0
unblock/src/main.zig

@@ -0,0 +1,81 @@
+const std = @import("std");
+const client = @import("client.zig");
+
+fn handle_connections(clients: *client.Clients) !void {
+    while (true) {
+        for (clients.clients.items) |c| {
+            clients.lock.lock();
+
+            var buf = [_]u8{0} ** 1024;
+            const sock = c.sock;
+            const r = std.posix.recv(sock, &buf, 0) catch |err| switch (err) {
+                error.WouldBlock => {
+                    clients.lock.unlock();
+                    continue;
+                },
+                else => {
+                    std.debug.print("ERROR : {any}", .{err});
+                    clients.lock.unlock();
+
+                    continue;
+                },
+            };
+
+            if (r > 0) {
+                std.debug.print("Got {} bytes\n", .{r});
+            }
+
+            clients.lock.unlock();
+        }
+
+        std.time.sleep(1_000_000 * 1000);
+    }
+}
+
+pub fn main() !void {
+    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
+    defer _ = gpa.deinit();
+
+    const allocator = gpa.allocator();
+
+    const sock = try std.posix.socket(std.posix.AF.INET, std.posix.SOCK.STREAM, 0);
+    const parsed_address = try std.net.Address.parseIp4("127.0.0.1", 8080);
+    var sock_flags = try std.posix.fcntl(sock, std.c.F.GETFD, 0);
+    if (sock_flags < 0) {
+        std.debug.print("got incorrect flags");
+        return;
+    }
+
+    const enable = [_]u8{ 1, 1, 1, 1, 1, 1, 1, 1 };
+
+    try std.posix.setsockopt(sock, std.posix.SOL.SOCKET, std.posix.SO.REUSEPORT | std.posix.SO.REUSEADDR, &enable);
+
+    sock_flags = sock_flags | std.c.SOCK.NONBLOCK;
+    const res = try std.posix.fcntl(sock, std.c.F.SETFD, sock_flags);
+    if (res < 0) {
+        std.debug.panic("could not fcntl\n");
+    }
+
+    // Only allow ipv4. select parsed_address.any to allow ipv4 and ipv6 or in6 for only ipv6
+    try std.posix.bind(sock, @ptrCast(&parsed_address.in), parsed_address.getOsSockLen());
+    try std.posix.listen(sock, 5);
+
+    var addr: std.c.sockaddr.in = .{ .addr = 0, .port = 0 };
+    var addr_size: std.posix.socklen_t = @sizeOf(std.c.sockaddr.in);
+
+    var clients = client.Clients.init(allocator);
+    defer clients.deinit();
+
+    const t = try std.Thread.spawn(.{}, handle_connections, .{&clients});
+
+    while (true) {
+        // TODO: loop over this to accept more than one client
+        const sock2 = try std.posix.accept(sock, @ptrCast(&addr), &addr_size, std.posix.SOCK.NONBLOCK);
+
+        try clients.add(.{ .sock = sock2, .address = addr });
+
+        std.debug.print("Got client {any}\n", .{addr});
+    }
+
+    t.join();
+}

+ 2 - 2
zig-list/build.zig

@@ -19,7 +19,7 @@ pub fn build(b: *std.Build) void {
         .name = "zig-list",
         // In this case the main source file is merely a path, however, in more
         // complicated build scripts, this could be a generated file.
-        .root_source_file = .{ .path = "src/main.zig" },
+        .root_source_file = b.path("src/main.zig"), //.{ b. .path = "src/main.zig" },
         .target = target,
         .optimize = optimize,
     });
@@ -55,7 +55,7 @@ pub fn build(b: *std.Build) void {
     // Creates a step for unit testing. This only builds the test executable
     // but does not run it.
     const unit_tests = b.addTest(.{
-        .root_source_file = .{ .path = "src/main.zig" },
+        .root_source_file = b.path("src/main.zig"), //.{ .path = "src/main.zig" },
         .target = target,
         .optimize = optimize,
     });

+ 4 - 4
zig-list/src/main.zig

@@ -41,7 +41,7 @@ fn Queue(comptime T: type) type {
         fn dequeue(self: *Self) ?T {
             if (self.head) |t_head| {
                 self.head = t_head.next;
-                var value = t_head.value;
+                const value = t_head.value;
                 self.alloc.destroy(t_head);
                 return value;
             }
@@ -78,7 +78,7 @@ pub fn main() !void {
     q.print();
     std.log.info("-----", .{});
 
-    var x = q.dequeue();
+    const x = q.dequeue();
     _ = x;
     q.print();
 }
@@ -118,12 +118,12 @@ test "struct* queue" {
 }
 
 test "empty queue" {
-    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
+    const arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
     defer arena.deinit();
 
     const allocator = arena.allocator();
 
     var q = IntQueue.new(allocator);
-    var x = q.dequeue();
+    const x = q.dequeue();
     try std.testing.expect(x == null);
 }