From f0a614ea56d526002241b090f91f0d5b724d12f2 Mon Sep 17 00:00:00 2001 From: relufi <30567427+relufi@users.noreply.github.com> Date: Wed, 18 Dec 2024 03:08:09 +0800 Subject: [PATCH] refactor(io): handle multiple packets in a read (#4) --- src/io.rs | 205 +++++++++++++++++++++++++++--------------------------- 1 file changed, 104 insertions(+), 101 deletions(-) diff --git a/src/io.rs b/src/io.rs index 380a633..791e2c5 100644 --- a/src/io.rs +++ b/src/io.rs @@ -249,124 +249,127 @@ where return Err(Error::IOError); } } - - let packet_length = match packet_size(&buffer[0..cursor]) { - Some(0) => { - error!("Invalid MQTT packet"); - return Err(Error::PacketError); - } - Some(len) => len, - None => { - // None is returned when there is not yet enough data to decode a packet. - continue; - } - }; - - let packet = match decode_slice(&buffer[0..packet_length]) { - Ok(Some(p)) => p, - Ok(None) => { - error!("Packet length calculation failed."); - return Err(Error::PacketError); - } - Err(_) => { - error!("Invalid MQTT packet"); - return Err(Error::PacketError); - } - }; - - trace!( - "Received packet from broker: {:?}", - Debug2Format(&packet.get_type()) - ); - - match packet { - Packet::Connack(connack) => match connack.code { - ConnectReturnCode::Accepted => { - #[cfg(feature = "homeassistant")] - self.ha_after_connected().await; - - for topic in &self.subscriptions { - let _ = topic.subscribe(false).await; + let mut start_pos = 0; + loop { + let packet_length = match packet_size(&buffer[start_pos..cursor]) { + Some(0) => { + error!("Invalid MQTT packet"); + return Err(Error::PacketError); + } + Some(len) => len, + None => { + // None is returned when there is not yet enough data to decode a packet. + if start_pos != 0 { + buffer.copy_within(start_pos..cursor, 0); + cursor = cursor - start_pos; } - - DATA_CHANNEL.send(MqttMessage::Connected).await; + break; } - _ => { - error!("Connection request to broker was not accepted"); - return Err(Error::IOError); + }; + let packet = match decode_slice(&buffer[start_pos..(start_pos + packet_length)]) { + Ok(Some(p)) => p, + Ok(None) => { + error!("Packet length calculation failed."); + return Err(Error::PacketError); } - }, - Packet::Pingresp => {} - - Packet::Publish(publish) => { - match ( - Topic::from_str(publish.topic_name), - Payload::from(publish.payload), - ) { - (Ok(topic), Ok(payload)) => { - if !self.ha_handle_update(&topic, &payload).await { - DATA_CHANNEL - .send(MqttMessage::Publish(topic, payload)) - .await; + Err(_) => { + error!("Invalid MQTT packet"); + return Err(Error::PacketError); + } + }; + + trace!( + "Received packet from broker: {:?}", + Debug2Format(&packet.get_type()) + ); + + match packet { + Packet::Connack(connack) => match connack.code { + ConnectReturnCode::Accepted => { + #[cfg(feature = "homeassistant")] + self.ha_after_connected().await; + + for topic in &self.subscriptions { + let _ = topic.subscribe(false).await; } + + DATA_CHANNEL.send(MqttMessage::Connected).await; } _ => { - error!("Unable to process publish data as it was too large"); + error!("Connection request to broker was not accepted"); + return Err(Error::IOError); + } + }, + Packet::Pingresp => {} + + Packet::Publish(publish) => { + match ( + Topic::from_str(publish.topic_name), + Payload::from(publish.payload), + ) { + (Ok(topic), Ok(payload)) => { + if !self.ha_handle_update(&topic, &payload).await { + DATA_CHANNEL + .send(MqttMessage::Publish(topic, payload)) + .await; + } + } + _ => { + error!("Unable to process publish data as it was too large"); + } } - } - match publish.qospid { - mqttrs::QosPid::AtMostOnce => {} - mqttrs::QosPid::AtLeastOnce(pid) => { - send_packet(Packet::Puback(pid)).await?; + match publish.qospid { + mqttrs::QosPid::AtMostOnce => {} + mqttrs::QosPid::AtLeastOnce(pid) => { + send_packet(Packet::Puback(pid)).await?; + } + mqttrs::QosPid::ExactlyOnce(pid) => { + send_packet(Packet::Pubrec(pid)).await?; + } } - mqttrs::QosPid::ExactlyOnce(pid) => { - send_packet(Packet::Pubrec(pid)).await?; + } + Packet::Puback(pid) => { + controller.publish_immediate(ControlMessage::Published(pid)); + } + Packet::Pubrec(pid) => { + controller.publish_immediate(ControlMessage::Published(pid)); + send_packet(Packet::Pubrel(pid)).await?; + } + Packet::Pubrel(pid) => send_packet(Packet::Pubrel(pid)).await?, + Packet::Pubcomp(_) => {} + + Packet::Suback(suback) => { + if let Some(return_code) = suback.return_codes.first() { + controller.publish_immediate(ControlMessage::Subscribed( + suback.pid, + *return_code, + )); + } else { + warn!("Unexpected suback with no return codes"); } } - } - Packet::Puback(pid) => { - controller.publish_immediate(ControlMessage::Published(pid)); - } - Packet::Pubrec(pid) => { - controller.publish_immediate(ControlMessage::Published(pid)); - send_packet(Packet::Pubrel(pid)).await?; - } - Packet::Pubrel(pid) => send_packet(Packet::Pubrel(pid)).await?, - Packet::Pubcomp(_) => {} - - Packet::Suback(suback) => { - if let Some(return_code) = suback.return_codes.first() { - controller.publish_immediate(ControlMessage::Subscribed( - suback.pid, - *return_code, - )); - } else { - warn!("Unexpected suback with no return codes"); + Packet::Unsuback(pid) => { + controller.publish_immediate(ControlMessage::Unsubscribed(pid)); } - } - Packet::Unsuback(pid) => { - controller.publish_immediate(ControlMessage::Unsubscribed(pid)); - } - Packet::Connect(_) - | Packet::Subscribe(_) - | Packet::Pingreq - | Packet::Unsubscribe(_) - | Packet::Disconnect => { - debug!( + Packet::Connect(_) + | Packet::Subscribe(_) + | Packet::Pingreq + | Packet::Unsubscribe(_) + | Packet::Disconnect => { + debug!( "Unexpected packet from broker: {:?}", Debug2Format(&packet.get_type()) ); + } + } + start_pos = start_pos + packet_length; + // Adjust the buffer to reclaim any unused data + if start_pos == cursor { + cursor = 0; + break; } - } - - // Adjust the buffer to reclaim any unused data - if packet_length == cursor { - cursor = 0; - } else { - buffer.copy_within(packet_length..cursor, 0); - cursor -= packet_length; } } }