-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathexample.zig
62 lines (55 loc) · 2.66 KB
/
example.zig
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
const std = @import("std");
const net = std.net;
const os = std.os;
const zanzara = @import("src/zanzara.zig");
const DefaultClient = zanzara.mqtt4.DefaultClient;
const Subscribe = zanzara.mqtt4.packet.Subscribe;
pub fn main() !void {
var gpa = std.heap.GeneralPurposeAllocator(.{}){};
const allocator = gpa.allocator();
const stream = try net.tcpConnectToHost(allocator, "mqtt.eclipseprojects.io", 1883);
const socket = stream.handle;
const writer = stream.writer();
var mqtt_buf: [2048]u8 = undefined;
var client = try DefaultClient.init(mqtt_buf[0..1024], mqtt_buf[1024..]);
// See ConnectOpts for additional options
try client.connect(.{ .client_id = "zanzara" });
var read_buf: [2048]u8 = undefined;
while (true) {
// We use os.MSG.DONTWAIT so the socket returns WouldBlock if no data is present
const bytes = os.recv(socket, &read_buf, os.MSG.DONTWAIT) catch |err|
if (err == error.WouldBlock) 0 else return err;
var rest = read_buf[0..bytes];
while (true) {
// The driving force of the client is the client.feed() function
// This must be called periodically, either passing some data coming from the network
// or with an empty slice (if no incoming data is present) to allow the client to handle
// its periodic tasks, like pings etc.
const event = client.feed(rest);
switch (event.data) {
.incoming_packet => |p| {
switch (p) {
.connack => {
std.debug.print("Connected, sending subscriptions\n", .{});
// Subscribe to the topic we're publishing on
const topics = [_]Subscribe.Topic{
.{ .topic_filter = "zig/zanzara_in", .qos = .qos2 },
};
_ = try client.subscribe(&topics);
_ = try client.publish("zig/zanzara_out", "Howdy!", .{});
},
.publish => |pb| {
std.debug.print("Received publish on topic {s} with payload {s}\n", .{ pb.topic, pb.payload });
},
else => std.debug.print("Received packet: {}\n", .{p}),
}
},
.outgoing_buf => |b| try writer.writeAll(b), // Write pending stuff to the socket
.err => |e| std.debug.print("Error event: {}\n", .{e}),
.none => {},
}
rest = rest[event.consumed..];
if (rest.len == 0) break;
}
}
}