Skip to content

Commit d49a7b8

Browse files
committed
use a more compatible approach for non-blocking sockets.
(instead of a linux-only flag)
1 parent 8a37811 commit d49a7b8

File tree

1 file changed

+39
-6
lines changed

1 file changed

+39
-6
lines changed

example.zig

+39-6
Original file line numberDiff line numberDiff line change
@@ -5,25 +5,58 @@ const zanzara = @import("src/zanzara.zig");
55
const DefaultClient = zanzara.mqtt4.DefaultClient;
66
const Subscribe = zanzara.mqtt4.packet.Subscribe;
77

8+
const TcpConnectToAddressError = std.posix.SocketError || std.posix.ConnectError;
9+
10+
fn tcpConnectToAddressNonBlock(address: net.Address) TcpConnectToAddressError!net.Stream {
11+
const sock_flags = std.posix.SOCK.STREAM | std.posix.SOCK.NONBLOCK;
12+
const sockfd = try std.posix.socket(address.any.family, sock_flags, std.posix.IPPROTO.TCP);
13+
errdefer net.Stream.close(.{ .handle = sockfd });
14+
15+
std.posix.connect(sockfd, &address.any, address.getOsSockLen()) catch |err| {
16+
switch (err) {
17+
error.WouldBlock => std.time.sleep(1 * std.time.ns_per_s), // Todo: handle this better
18+
else => return err,
19+
}
20+
};
21+
22+
return net.Stream{ .handle = sockfd };
23+
}
24+
25+
fn tcpConnectToHostNonBlock(allocator: std.mem.Allocator, name: []const u8, port: u16) net.TcpConnectToHostError!net.Stream {
26+
const list = try net.getAddressList(allocator, name, port);
27+
defer list.deinit();
28+
29+
if (list.addrs.len == 0) return error.UnknownHostName;
30+
31+
for (list.addrs) |addr| {
32+
return tcpConnectToAddressNonBlock(addr) catch |err| switch (err) {
33+
error.ConnectionRefused => {
34+
continue;
35+
},
36+
else => return err,
37+
};
38+
}
39+
return std.posix.ConnectError.ConnectionRefused;
40+
}
41+
842
pub fn main() !void {
943
var gpa = std.heap.GeneralPurposeAllocator(.{}){};
1044
const allocator = gpa.allocator();
1145

12-
const stream = try net.tcpConnectToHost(allocator, "mqtt.eclipseprojects.io", 1883);
46+
const stream = try tcpConnectToHostNonBlock(allocator, "mqtt.eclipseprojects.io", 1883);
1347
const socket = stream.handle;
1448
const writer = stream.writer();
1549

16-
var mqtt_buf: [2048]u8 = undefined;
50+
var mqtt_buf: [32 * 2048]u8 = undefined;
51+
var client = try DefaultClient.init(mqtt_buf[0 .. 1024 * 32], mqtt_buf[1024 * 32 ..]);
1752

18-
var client = try DefaultClient.init(mqtt_buf[0..1024], mqtt_buf[1024..]);
1953
// See ConnectOpts for additional options
2054
try client.connect(.{ .client_id = "zanzara" });
2155

22-
var read_buf: [2048]u8 = undefined;
56+
var read_buf: [32 * 2048]u8 = undefined;
2357

2458
while (true) {
25-
// We use os.MSG.DONTWAIT so the socket returns WouldBlock if no data is present
26-
const bytes = std.posix.recv(socket, &read_buf, std.posix.MSG.DONTWAIT) catch |err|
59+
const bytes = std.posix.recv(socket, &read_buf, 0) catch |err|
2760
if (err == error.WouldBlock) 0 else return err;
2861
var rest = read_buf[0..bytes];
2962
while (true) {

0 commit comments

Comments
 (0)