Skip to content

Commit

Permalink
#101 Added 'try_publish()' to client and topic.
Browse files Browse the repository at this point in the history
  • Loading branch information
fpagliughi committed Dec 30, 2020
1 parent d14bb27 commit 71ccf89
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 41 deletions.
17 changes: 14 additions & 3 deletions examples/async_publish_time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ fn main() {
// fail with a persistence error.
let create_opts = mqtt::CreateOptionsBuilder::new()
.server_uri(host)
.client_id("rust_async_persist_pub")
.client_id("rust_async_pub_time")
.persistence("persist")
.finalize();

Expand All @@ -81,6 +81,10 @@ fn main() {
process::exit(1);
}

// Note that with MQTT v5, this would be a good place to use a topic
// object with an alias. It might help reduce the size of the messages
// if the topic string is long.

let topic = "data/time";

// Create messages and publish them
Expand All @@ -99,8 +103,15 @@ fn main() {
let tf = 0.01 * (t as f64);

let msg = mqtt::Message::new(topic, format!("{:.3}", tf), 1);
if let Err(e) = cli.publish(msg).wait() {
println!("Error sending message: {:?}", e);

// We don't need to use `try_publish()` here since we just wait on
// the token, but this shows how we could use it.
match cli.try_publish(msg) {
Err(err) =>
eprintln!("Error creating/queuing the message: {}", err),
Ok(tok) => if let Err(err) = tok.wait() {
eprintln!("Error sending message: {}", err);
},
}
}
}
32 changes: 23 additions & 9 deletions src/async_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,9 @@ use crate::{
SubscribeManyToken,
},
client_persistence::UserPersistence,
errors::{self, Result},
string_collection::StringCollection,
reason_code::ReasonCode,
errors::{self, Result, Error},
};

/////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -644,13 +644,13 @@ impl AsyncClient {
}
}

/// Publishes a message to an MQTT broker
/// Attempts to publish a message to the MQTT broker, but returns an
/// error immediately if there's a problem creating or queuing the
/// message.
///
/// # Arguments
///
/// * `msg` The message to publish.
///
pub fn publish(&self, msg: Message) -> DeliveryToken {
/// Returns a Publish Error on failure so that the original message
/// can be recovered and sent again.
pub fn try_publish(&self, msg: Message) -> Result<DeliveryToken> {
debug!("Publish: {:?}", msg);

let ver = self.mqtt_version();
Expand All @@ -667,14 +667,28 @@ impl AsyncClient {
if rc != 0 {
let _ = unsafe { Token::from_raw(rsp_opts.copts.context) };
let msg: Message = tok.into();
DeliveryToken::from_error(msg, rc)
Err(Error::Publish(rc, msg))
}
else {
tok.set_msgid(rsp_opts.copts.token as i16);
tok
Ok(tok)
}
}

/// Publishes a message to the MQTT broker.
///
/// Returns a Delivery Token to track the progress of the operation.
///
pub fn publish(&self, msg: Message) -> DeliveryToken {
match self.try_publish(msg) {
Ok(tok) => tok,
Err(Error::Publish(rc, msg)) =>
DeliveryToken::from_error(msg, rc),
_ => panic!("Unknown publish error"),
}
}


/// Subscribes to a single topic.
///
/// # Arguments
Expand Down
88 changes: 59 additions & 29 deletions src/topic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use crate::{
subscribe_options::SubscribeOptions,
message::{Message, MessageBuilder},
properties::{PropertyCode, Properties},
errors::Result,
};

/////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -95,6 +96,27 @@ impl<'a> Topic<'a>
}
}

/// Create a message for the topic using the supplied payload
fn create_message<V>(&self, payload: V) -> Message
where V: Into<Vec<u8>>
{
// OPTIMIZE: This could be more efficient.
if self.alias == 0 {
Message::new(&self.topic, payload, self.qos)
}
else {
let props = properties!{ PropertyCode::TopicAlias => self.alias };
MessageBuilder::new()
.topic("")
.payload(payload)
.qos(self.qos)
.retained(self.retained)
.properties(props)
.finalize()
}
}


/// Subscribe to the topic.
pub fn subscribe(&self) -> Token {
self.cli.subscribe(self.topic.clone(), self.qos)
Expand All @@ -112,7 +134,7 @@ impl<'a> Topic<'a>
///
/// If a topic alias was previously sent, this will use the integer alias
/// property instead of sending the topic string.
/// Topis aliases are only applicable for MQTT v5 connections.
/// Topic aliases are only applicable for MQTT v5 connections.
///
/// # Arguments
///
Expand All @@ -121,23 +143,30 @@ impl<'a> Topic<'a>
pub fn publish<V>(&self, payload: V) -> DeliveryToken
where V: Into<Vec<u8>>
{
// OPTIMIZE: This could be more efficient.
let msg = if self.alias == 0 {
Message::new(&self.topic, payload, self.qos)
}
else {
let props = properties!{ PropertyCode::TopicAlias => self.alias };
MessageBuilder::new()
.topic("")
.payload(payload)
.qos(self.qos)
.retained(self.retained)
.properties(props)
.finalize()
};
let msg = self.create_message(payload);
self.cli.publish(msg)
}

/// Attempts to publish a message on the topic, but returns an error
/// immediately if there's a problem creating or queuing the message for
/// transmission.
///
/// If a topic alias was previously sent, this will use the integer alias
/// property instead of sending the topic string.
/// Topic aliases are only applicable for MQTT v5 connections.
///
/// # Arguments
///
/// `payload` The payload of the message
///
/// Returns a Publish Error containing the complete message on failure.
pub fn try_publish<V>(&self, payload: V) -> Result<DeliveryToken>
where V: Into<Vec<u8>>
{
let msg = self.create_message(payload);
self.cli.try_publish(msg)
}

/// Publish a message with a topic alias.
///
/// This publishes the message with a topic alias property to set the
Expand Down Expand Up @@ -167,20 +196,21 @@ impl<'a> Topic<'a>
where V: Into<Vec<u8>>
{
self.alias = alias;
let msg = if alias == 0 {
Message::new(&self.topic, payload, self.qos)
}
else {
let props = properties!{ PropertyCode::TopicAlias => alias };
MessageBuilder::new()
.topic(self.topic.clone())
.payload(payload)
.qos(self.qos)
.retained(self.retained)
.properties(props)
.finalize()
};
self.cli.publish(msg)
self.publish(payload)
}

/// Attempts to publish a message on the topic using and setting a new topic
/// alias, but returns an error immediately if there's a problem creating or
/// queuing the message for transmission.
///
/// See `publish_with_alias()` for more information.
///
/// Returns a Publish Error containing the complete message on failure.
pub fn try_publish_with_alias<V>(&mut self, alias: u16, payload: V) -> Result<DeliveryToken>
where V: Into<Vec<u8>>
{
self.alias = alias;
self.try_publish(payload)
}
}

0 comments on commit 71ccf89

Please sign in to comment.