Skip to content

Commit

Permalink
Implement a lossy queue for outgoing packets
Browse files Browse the repository at this point in the history
  • Loading branch information
Mossop committed Dec 17, 2024
1 parent ccd471c commit cf5cbf2
Show file tree
Hide file tree
Showing 7 changed files with 158 additions and 134 deletions.
7 changes: 6 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,9 @@ runs forever attempting to maintain a connection to the MQTT broker. It includes
automatically subscribing to some topics when connected to the broker and registering a last will
message to be published if the connection to the broker it lost.

Has some basic support for the different MQTT QoS levels.
The different MQTT QoS levels are supported to a certain extent. In most cases a QoS of 0 is used by
default which means your message may never make it to the broker. This is particularly true in the
case where the network is disconnected or the broker is unreachable in which case you will get no
error or other warning after publishing a message. If you need that you can set a higher QoS level.
This crate will not automatically re-send messages that fail to be delivered however you will get an
error if the broker does not acknowledge messages within a certain time (currently 2 seconds).
5 changes: 0 additions & 5 deletions src/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,6 @@ impl<const N: usize> Buffer<N> {
pub fn available(&self) -> usize {
N - self.cursor
}

/// Resets the buffer discarding any previously written bytes.
pub(crate) fn reset(&mut self) {
self.cursor = 0;
}
}

impl<const N: usize> Deref for Buffer<N> {
Expand Down
8 changes: 3 additions & 5 deletions src/homeassistant/light.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,9 @@ use core::{ops::Deref, str};
use serde::{ser::SerializeStruct, Deserialize, Serialize, Serializer};

use crate::{
fmt::Debug2Format,
homeassistant::{binary_sensor::BinarySensorState, ser::List, Component},
Error,
Payload,
Publishable,
Topic,
Error, Payload, Publishable, Topic,
};

#[derive(Serialize)]
Expand Down Expand Up @@ -129,7 +127,7 @@ impl<'a> LightState<'a> {
let parsed: LedPayload<'a> = match payload.deserialize_json() {
Ok(p) => p,
Err(e) => {
warn!("Failed to deserialize packet: {:?}", e);
warn!("Failed to deserialize packet: {:?}", Debug2Format(&e));
if let Ok(s) = str::from_utf8(payload) {
trace!("{}", s);
}
Expand Down
179 changes: 64 additions & 115 deletions src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,45 +9,21 @@ use embassy_net::{
};
use embassy_sync::{
blocking_mutex::raw::CriticalSectionRawMutex,
mutex::Mutex,
pubsub::{PubSubChannel, Subscriber, WaitResult},
signal::Signal,
};
use embassy_time::Timer;
use embedded_io_async::Write;
use mqttrs::{
decode_slice,
Connect,
ConnectReturnCode,
LastWill,
Packet,
Pid,
Protocol,
Publish,
QoS,
QosPid,
decode_slice, Connect, ConnectReturnCode, LastWill, Packet, Pid, Protocol, Publish, QoS, QosPid,
};

use crate::{
device_id,
fmt::Debug2Format,
Buffer,
ControlMessage,
Error,
MqttMessage,
Payload,
Publishable,
Topic,
TopicString,
CONFIRMATION_TIMEOUT,
DATA_CHANNEL,
DEFAULT_BACKOFF,
device_id, fmt::Debug2Format, queue::LossyQueue, ControlMessage, Error, MqttMessage, Payload,
Publishable, Topic, TopicString, CONFIRMATION_TIMEOUT, DATA_CHANNEL, DEFAULT_BACKOFF,
RESET_BACKOFF,
};

static WRITE_BUFFER: Mutex<CriticalSectionRawMutex, Buffer<4096>> = Mutex::new(Buffer::new());
static WRITE_PENDING: Signal<CriticalSectionRawMutex, ()> = Signal::new();
static WRITE_COMPLETE: Signal<CriticalSectionRawMutex, ()> = Signal::new();
static SEND_QUEUE: LossyQueue<CriticalSectionRawMutex, Payload, 10> = LossyQueue::new();

pub(crate) static CONTROL_CHANNEL: PubSubChannel<CriticalSectionRawMutex, ControlMessage, 2, 5, 0> =
PubSubChannel::new();
Expand Down Expand Up @@ -92,27 +68,19 @@ mod atomic16 {
}
}

pub(crate) async fn send_packet(packet: Packet<'_>) -> Result<(), Error> {
loop {
trace!("Waiting for data to be written");
WRITE_COMPLETE.wait().await;

{
let mut buffer = WRITE_BUFFER.lock().await;
trace!("Encoding packet");
pub(crate) fn send_packet(packet: Packet<'_>) -> Result<(), Error> {
let mut buffer = Payload::new();
trace!("Encoding packet");

match buffer.encode_packet(&packet) {
Ok(()) => {
trace!("Signaling data ready");
WRITE_PENDING.signal(());
return Ok(());
}
Err(mqttrs::Error::WriteZero) => {}
Err(_) => {
error!("Failed to send packet");
return Err(Error::PacketError);
}
}
match buffer.encode_packet(&packet) {
Ok(()) => {
trace!("Sending packet");
SEND_QUEUE.push(buffer);
Ok(())
}
Err(_) => {
error!("Failed to send packet");
Err(Error::PacketError)
}
}
}
Expand Down Expand Up @@ -174,7 +142,7 @@ pub(crate) async fn publish(
payload,
});

send_packet(packet).await?;
send_packet(packet)?;

if let Some(expected_pid) = pid {
wait_for_publish(subscriber, expected_pid).await
Expand Down Expand Up @@ -325,10 +293,10 @@ where
match publish.qospid {
mqttrs::QosPid::AtMostOnce => {}
mqttrs::QosPid::AtLeastOnce(pid) => {
send_packet(Packet::Puback(pid)).await?;
send_packet(Packet::Puback(pid))?;
}
mqttrs::QosPid::ExactlyOnce(pid) => {
send_packet(Packet::Pubrec(pid)).await?;
send_packet(Packet::Pubrec(pid))?;
}
}
}
Expand All @@ -337,9 +305,9 @@ where
}
Packet::Pubrec(pid) => {
controller.publish_immediate(ControlMessage::Published(pid));
send_packet(Packet::Pubrel(pid)).await?;
send_packet(Packet::Pubrel(pid))?;
}
Packet::Pubrel(pid) => send_packet(Packet::Pubrel(pid)).await?,
Packet::Pubrel(pid) => send_packet(Packet::Pubrel(pid))?,
Packet::Pubcomp(_) => {}

Packet::Suback(suback) => {
Expand Down Expand Up @@ -378,76 +346,57 @@ where
}

async fn write_loop(&self, mut writer: TcpWriter<'_>) {
// Clear out any old data.
{
let mut buffer = WRITE_BUFFER.lock().await;
buffer.reset();
WRITE_PENDING.reset();

let mut last_will_topic = TopicString::new();
let mut last_will_payload = Payload::new();

let last_will = self.last_will.as_ref().and_then(|p| {
if p.write_topic(&mut last_will_topic).is_ok()
&& p.write_payload(&mut last_will_payload).is_ok()
{
Some(LastWill {
topic: &last_will_topic,
message: &last_will_payload,
qos: p.qos(),
retain: p.retain(),
})
} else {
None
}
});

// Send our connection request.
if buffer
.encode_packet(&Packet::Connect(Connect {
protocol: Protocol::MQTT311,
keep_alive: 60,
client_id: device_id(),
clean_session: true,
last_will,
username: self.username,
password: self.password.map(|s| s.as_bytes()),
}))
.is_err()
{
error!("Failed to encode connection packet");
return;
}
let mut buffer = Payload::new();

if let Err(e) = writer.write_all(&buffer).await {
error!("Failed to send connection packet: {:?}", e);
return;
}
let mut last_will_topic = TopicString::new();
let mut last_will_payload = Payload::new();

buffer.reset();
let last_will = self.last_will.as_ref().and_then(|p| {
if p.write_topic(&mut last_will_topic).is_ok()
&& p.write_payload(&mut last_will_payload).is_ok()
{
Some(LastWill {
topic: &last_will_topic,
message: &last_will_payload,
qos: p.qos(),
retain: p.retain(),
})
} else {
None
}
});

// Send our connection request.
if buffer
.encode_packet(&Packet::Connect(Connect {
protocol: Protocol::MQTT311,
keep_alive: 60,
client_id: device_id(),
clean_session: true,
last_will,
username: self.username,
password: self.password.map(|s| s.as_bytes()),
}))
.is_err()
{
error!("Failed to encode connection packet");
return;
}

WRITE_COMPLETE.signal(());
if let Err(e) = writer.write_all(&buffer).await {
error!("Failed to send connection packet: {:?}", e);
return;
}

loop {
trace!("Writer waiting for data");
WRITE_PENDING.wait().await;
let buffer = SEND_QUEUE.pop().await;

{
let mut buffer = WRITE_BUFFER.lock().await;
WRITE_PENDING.reset();
trace!("Writer locked data");

if let Err(e) = writer.write_all(&buffer).await {
error!("Failed to send data: {:?}", e);
return;
}

buffer.reset();
trace!("Writer sending data");
if let Err(e) = writer.write_all(&buffer).await {
error!("Failed to send data: {:?}", e);
return;
}

trace!("Writer signaling completion");
WRITE_COMPLETE.signal(());
}
}

Expand Down Expand Up @@ -505,7 +454,7 @@ where
loop {
Timer::after_secs(45).await;

let _ = send_packet(Packet::Pingreq).await;
let _ = send_packet(Packet::Pingreq);
}
};

Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ mod buffer;
pub mod homeassistant;
mod io;
mod publish;
mod queue;
mod topic;

// This really needs to match that used by mqttrs.
Expand Down
80 changes: 80 additions & 0 deletions src/queue.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
use core::{
cell::RefCell,
future::Future,
pin::Pin,
task::{Context, Poll, Waker},
};

use embassy_sync::blocking_mutex::{raw::RawMutex, Mutex};
use heapless::Deque;

struct LossyQueueData<T, const N: usize> {
receiver_waker: Option<Waker>,
queue: Deque<T, N>,
}

pub(crate) struct ReceiveFuture<'a, M: RawMutex, T, const N: usize> {
pipe: &'a LossyQueue<M, T, N>,
}

impl<M: RawMutex, T, const N: usize> Future for ReceiveFuture<'_, M, T, N> {
type Output = T;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.pipe.inner.lock(|cell| {
let mut inner = cell.borrow_mut();

if let Some(waker) = inner.receiver_waker.take() {
waker.wake();
}

if let Some(item) = inner.queue.pop_front() {
Poll::Ready(item)
} else {
inner.receiver_waker = Some(cx.waker().clone());
Poll::Pending
}
})
}
}

/// A FIFO queue holding a fixed number of items. Older items are dropped if the
/// queue is full when a new item is pushed.
pub(crate) struct LossyQueue<M: RawMutex, T, const N: usize> {
inner: Mutex<M, RefCell<LossyQueueData<T, N>>>,
}

impl<M: RawMutex, T, const N: usize> LossyQueue<M, T, N> {
pub(crate) const fn new() -> Self {
Self {
inner: Mutex::new(RefCell::new(LossyQueueData {
receiver_waker: None,
queue: Deque::new(),
})),
}
}

/// A future that waits for a new item to be available.
pub(crate) fn pop(&self) -> ReceiveFuture<'_, M, T, N> {
ReceiveFuture { pipe: self }
}

/// Pushes an item into the queue. If the queue is already full the oldest
/// item is dropped to make space.
pub(crate) fn push(&self, data: T) {
self.inner.lock(|cell| {
let mut inner = cell.borrow_mut();

if inner.queue.is_full() {
inner.queue.pop_front();
}

// As we pop above the queue cannot be full now.
let _ = inner.queue.push_back(data);

if let Some(waker) = inner.receiver_waker.take() {
waker.wake();
}
})
}
}
Loading

0 comments on commit cf5cbf2

Please sign in to comment.