Skip to content

Commit

Permalink
refactor(io): handle multiple packets in a read (#4)
Browse files Browse the repository at this point in the history
  • Loading branch information
relufi authored Dec 17, 2024
1 parent 72c7ca1 commit f0a614e
Showing 1 changed file with 104 additions and 101 deletions.
205 changes: 104 additions & 101 deletions src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,124 +249,127 @@ where
return Err(Error::IOError);
}
}

let packet_length = match packet_size(&buffer[0..cursor]) {
Some(0) => {
error!("Invalid MQTT packet");
return Err(Error::PacketError);
}
Some(len) => len,
None => {
// None is returned when there is not yet enough data to decode a packet.
continue;
}
};

let packet = match decode_slice(&buffer[0..packet_length]) {
Ok(Some(p)) => p,
Ok(None) => {
error!("Packet length calculation failed.");
return Err(Error::PacketError);
}
Err(_) => {
error!("Invalid MQTT packet");
return Err(Error::PacketError);
}
};

trace!(
"Received packet from broker: {:?}",
Debug2Format(&packet.get_type())
);

match packet {
Packet::Connack(connack) => match connack.code {
ConnectReturnCode::Accepted => {
#[cfg(feature = "homeassistant")]
self.ha_after_connected().await;

for topic in &self.subscriptions {
let _ = topic.subscribe(false).await;
let mut start_pos = 0;
loop {
let packet_length = match packet_size(&buffer[start_pos..cursor]) {
Some(0) => {
error!("Invalid MQTT packet");
return Err(Error::PacketError);
}
Some(len) => len,
None => {
// None is returned when there is not yet enough data to decode a packet.
if start_pos != 0 {
buffer.copy_within(start_pos..cursor, 0);
cursor = cursor - start_pos;
}

DATA_CHANNEL.send(MqttMessage::Connected).await;
break;
}
_ => {
error!("Connection request to broker was not accepted");
return Err(Error::IOError);
};
let packet = match decode_slice(&buffer[start_pos..(start_pos + packet_length)]) {
Ok(Some(p)) => p,
Ok(None) => {
error!("Packet length calculation failed.");
return Err(Error::PacketError);
}
},
Packet::Pingresp => {}

Packet::Publish(publish) => {
match (
Topic::from_str(publish.topic_name),
Payload::from(publish.payload),
) {
(Ok(topic), Ok(payload)) => {
if !self.ha_handle_update(&topic, &payload).await {
DATA_CHANNEL
.send(MqttMessage::Publish(topic, payload))
.await;
Err(_) => {
error!("Invalid MQTT packet");
return Err(Error::PacketError);
}
};

trace!(
"Received packet from broker: {:?}",
Debug2Format(&packet.get_type())
);

match packet {
Packet::Connack(connack) => match connack.code {
ConnectReturnCode::Accepted => {
#[cfg(feature = "homeassistant")]
self.ha_after_connected().await;

for topic in &self.subscriptions {
let _ = topic.subscribe(false).await;
}

DATA_CHANNEL.send(MqttMessage::Connected).await;
}
_ => {
error!("Unable to process publish data as it was too large");
error!("Connection request to broker was not accepted");
return Err(Error::IOError);
}
},
Packet::Pingresp => {}

Packet::Publish(publish) => {
match (
Topic::from_str(publish.topic_name),
Payload::from(publish.payload),
) {
(Ok(topic), Ok(payload)) => {
if !self.ha_handle_update(&topic, &payload).await {
DATA_CHANNEL
.send(MqttMessage::Publish(topic, payload))
.await;
}
}
_ => {
error!("Unable to process publish data as it was too large");
}
}
}

match publish.qospid {
mqttrs::QosPid::AtMostOnce => {}
mqttrs::QosPid::AtLeastOnce(pid) => {
send_packet(Packet::Puback(pid)).await?;
match publish.qospid {
mqttrs::QosPid::AtMostOnce => {}
mqttrs::QosPid::AtLeastOnce(pid) => {
send_packet(Packet::Puback(pid)).await?;
}
mqttrs::QosPid::ExactlyOnce(pid) => {
send_packet(Packet::Pubrec(pid)).await?;
}
}
mqttrs::QosPid::ExactlyOnce(pid) => {
send_packet(Packet::Pubrec(pid)).await?;
}
Packet::Puback(pid) => {
controller.publish_immediate(ControlMessage::Published(pid));
}
Packet::Pubrec(pid) => {
controller.publish_immediate(ControlMessage::Published(pid));
send_packet(Packet::Pubrel(pid)).await?;
}
Packet::Pubrel(pid) => send_packet(Packet::Pubrel(pid)).await?,
Packet::Pubcomp(_) => {}

Packet::Suback(suback) => {
if let Some(return_code) = suback.return_codes.first() {
controller.publish_immediate(ControlMessage::Subscribed(
suback.pid,
*return_code,
));
} else {
warn!("Unexpected suback with no return codes");
}
}
}
Packet::Puback(pid) => {
controller.publish_immediate(ControlMessage::Published(pid));
}
Packet::Pubrec(pid) => {
controller.publish_immediate(ControlMessage::Published(pid));
send_packet(Packet::Pubrel(pid)).await?;
}
Packet::Pubrel(pid) => send_packet(Packet::Pubrel(pid)).await?,
Packet::Pubcomp(_) => {}

Packet::Suback(suback) => {
if let Some(return_code) = suback.return_codes.first() {
controller.publish_immediate(ControlMessage::Subscribed(
suback.pid,
*return_code,
));
} else {
warn!("Unexpected suback with no return codes");
Packet::Unsuback(pid) => {
controller.publish_immediate(ControlMessage::Unsubscribed(pid));
}
}
Packet::Unsuback(pid) => {
controller.publish_immediate(ControlMessage::Unsubscribed(pid));
}

Packet::Connect(_)
| Packet::Subscribe(_)
| Packet::Pingreq
| Packet::Unsubscribe(_)
| Packet::Disconnect => {
debug!(
Packet::Connect(_)
| Packet::Subscribe(_)
| Packet::Pingreq
| Packet::Unsubscribe(_)
| Packet::Disconnect => {
debug!(
"Unexpected packet from broker: {:?}",
Debug2Format(&packet.get_type())
);
}
}
start_pos = start_pos + packet_length;
// Adjust the buffer to reclaim any unused data
if start_pos == cursor {
cursor = 0;
break;
}
}

// Adjust the buffer to reclaim any unused data
if packet_length == cursor {
cursor = 0;
} else {
buffer.copy_within(packet_length..cursor, 0);
cursor -= packet_length;
}
}
}
Expand Down

0 comments on commit f0a614e

Please sign in to comment.