Skip to content

Commit

Permalink
Merge pull request #98 from osobiehl/feature/jb/request-builder
Browse files Browse the repository at this point in the history
[RFC] rework requests and observe
  • Loading branch information
Covertness authored Mar 27, 2024
2 parents 1b9bd83 + d933919 commit b37045a
Show file tree
Hide file tree
Showing 6 changed files with 324 additions and 140 deletions.
10 changes: 6 additions & 4 deletions examples/echo_with_dtls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
extern crate coap;
use coap::client::CoAPClient;
use coap::dtls::DtlsConfig;
use coap::request::RequestBuilder;
use coap::Server;
use coap_lite::{CoapRequest, RequestType as Method};
use std::future::Future;
Expand Down Expand Up @@ -72,10 +73,11 @@ async fn main() {
.await
.expect("could not create client");
let domain = format!("127.0.0.1:{}", server_port);
let resp = client
.request_path("/hello", Method::Get, None, None, Some(domain.to_string()))
.await
.unwrap();

let request = RequestBuilder::new("/hello", Method::Get)
.domain(domain.to_string())
.build();
let resp = client.send(request).await.unwrap();
println!(
"receive on client: {}",
std::str::from_utf8(&resp.message.payload).unwrap()
Expand Down
188 changes: 72 additions & 116 deletions src/client.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#[cfg(feature = "dtls")]
use crate::dtls::{DtlsConfig, DtlsConnection};
use crate::request::RequestBuilder;
use alloc::string::String;
use alloc::vec::Vec;
use coap_lite::{
Expand Down Expand Up @@ -110,7 +111,7 @@ impl<T: Transport> ClientTransport<T> {
return Ok(p);
}
let p = self.receive_internal_no_cache().await?;
let (packet, addr) = &p;
let (packet, _) = &p;
self.intercept_packet_for_acks(packet).await;
return Ok(p);
}
Expand Down Expand Up @@ -343,9 +344,12 @@ impl<T: Transport + 'static> CoAPClient<T> {
) -> IoResult<CoapResponse> {
let (domain, port, path, queries) = Self::parse_coap_url(url)?;
let mut client = UdpCoAPClient::new_udp((domain.as_str(), port)).await?;
client
.request_path(&path, method, data, queries, Some(domain))
.await
let request = RequestBuilder::new(&path, method)
.queries(queries)
.domain(domain)
.data(data)
.build();
client.send(request).await
}

/// Execute a single request (GET, POST, PUT, DELETE) with a coap url and a specfic timeout
Expand All @@ -358,62 +362,21 @@ impl<T: Transport + 'static> CoAPClient<T> {
) -> IoResult<CoapResponse> {
let (domain, port, path, queries) = Self::parse_coap_url(url)?;
let mut client = UdpCoAPClient::new_udp((domain.as_str(), port)).await?;
client
.request_path_with_timeout(&path, method, data, queries, Some(domain), timeout)
.await
}
client.set_receive_timeout(timeout);
let request = RequestBuilder::new(&path, method)
.queries(queries)
.domain(domain)
.data(data)
.build();

/// Execute a request (GET, POST, PUT, DELETE) using the given transport
pub async fn request_path(
&mut self,
path: &str,
method: Method,
data: Option<Vec<u8>>,
queries: Option<Vec<u8>>,
domain: Option<String>,
) -> IoResult<CoapResponse> {
self.request_path_with_timeout(path, method, data, queries, domain, self.transport.timeout)
.await
}

/// Execute a request (GET, POST, PUT, DELETE) with a specfic timeout using the given transport. This method will
/// try to use block1 requests with a block size of 1024 by default.
pub async fn request_path_with_timeout(
&mut self,
path: &str,
method: Method,
data: Option<Vec<u8>>,
queries: Option<Vec<u8>>,
domain: Option<String>,
timeout: Duration,
) -> IoResult<CoapResponse> {
let mut request = CoapRequest::new();
request.set_method(method);
request.set_path(path);
if let Some(q) = queries {
request.message.add_option(CoapOption::UriQuery, q);
}
if let Some(d) = domain {
request
.message
.add_option(CoapOption::UriHost, d.as_str().as_bytes().to_vec());
}
request.message.header.message_id = Self::gen_message_id(&mut self.message_id);
match data {
Some(data) => request.message.payload = data,
None => (),
}
self.set_receive_timeout(timeout);
self.perform_request(request).await
client.send(request).await
}

/// Send a Request via the given transport, and receive a response.
/// users are responsible for filling meaningful fields in the request
/// this method supports blockwise requests
pub async fn perform_request(
&mut self,
mut request: CoapRequest<SocketAddr>,
) -> IoResult<CoapResponse> {
/// use RequestBuilder to build requests to send
pub async fn send(&mut self, mut request: CoapRequest<SocketAddr>) -> IoResult<CoapResponse> {
self.send_request(&mut request).await?;
self.receive(&mut request).await
}
Expand Down Expand Up @@ -443,31 +406,21 @@ impl<T: Transport + 'static> CoAPClient<T> {
where
T: 'static + Send + Sync,
{
// TODO: support observe multi resources at the same time
let mut message_id = Self::gen_message_id(&mut self.message_id);

let request_factory = move || {
let mut register_packet = CoapRequest::new();
register_packet.message.header.message_id = Self::gen_message_id(&mut message_id);
register_packet.set_path(&resource_path);
register_packet
};
let register_packet = RequestBuilder::new(resource_path, Method::Get).build();

self.set_receive_timeout(timeout);
self.observe_with_factory(request_factory, handler).await
self.observe_with(register_packet, handler).await
}

/// observe a resource with a given transport using your own function to
/// generate requests. Use this method if you need to set some specific options in your
/// observe a resource with a given transport using your own request
/// Use this method if you need to set some specific options in your
/// requests. This method will add observe flags and a message id as a fallback
pub async fn observe_with_factory<
Fac: FnOnce() -> CoapRequest<SocketAddr>,
H: FnMut(Packet) + Send + 'static,
>(
pub async fn observe_with<H: FnMut(Packet) + Send + 'static>(
mut self,
factory: Fac,
request: CoapRequest<SocketAddr>,
mut handler: H,
) -> IoResult<oneshot::Sender<ObserveMessage>> {
let mut register_packet = factory();
let mut register_packet = request;
if 0 == register_packet.message.header.message_id {
register_packet.message.header.message_id = Self::gen_message_id(&mut self.message_id);
}
Expand Down Expand Up @@ -842,7 +795,16 @@ mod test {
let domain = "coap.me";
let mut client = UdpCoAPClient::new_udp((domain, 5683)).await.unwrap();
let resp = client
.request_path("/hello", Method::Get, None, None, Some(domain.to_string()))
.send(
RequestBuilder::request_path(
"/hello",
Method::Get,
None,
None,
Some(domain.to_string()),
)
.build(),
)
.await
.unwrap();
assert_eq!(resp.message.payload, b"world".to_vec());
Expand All @@ -864,12 +826,15 @@ mod test {
let domain = "coap.me";
let mut client = UdpCoAPClient::new_udp((domain, 5683)).await.unwrap();
let resp = client
.request_path(
"/validate",
Method::Post,
Some(b"world".to_vec()),
None,
Some(domain.to_string()),
.send(
RequestBuilder::request_path(
"/validate",
Method::Post,
Some(b"world".to_vec()),
None,
Some(domain.to_string()),
)
.build(),
)
.await
.unwrap();
Expand All @@ -893,12 +858,11 @@ mod test {
let domain = "coap.me";
let mut client = UdpCoAPClient::new_udp((domain, 5683)).await.unwrap();
let resp = client
.request_path(
"/create1",
Method::Put,
Some(b"world".to_vec()),
None,
Some(domain.to_string()),
.send(
RequestBuilder::new("/create1", Method::Put)
.data(Some(b"world".to_vec()))
.domain(domain.to_string())
.build(),
)
.await
.unwrap();
Expand All @@ -922,12 +886,10 @@ mod test {
let domain = "coap.me";
let mut client = UdpCoAPClient::new_udp((domain, 5683)).await.unwrap();
let resp = client
.request_path(
"/validate",
Method::Delete,
None,
None,
Some(domain.to_string()),
.send(
RequestBuilder::new("/validate", Method::Delete)
.domain(domain.to_string())
.build(),
)
.await
.unwrap();
Expand Down Expand Up @@ -979,12 +941,11 @@ mod test {
let domain = "coap.me";
let mut client = UdpCoAPClient::new_udp((domain, 5683)).await.unwrap();
let resp = client
.request_path(
"/large-create",
Method::Put,
Some(large_payload.clone()),
None,
Some(domain.to_string()),
.send(
RequestBuilder::new("/large-create", Method::Put)
.domain(domain.to_string())
.data(Some(large_payload.clone()))
.build(),
)
.await;
let err = resp.unwrap_err();
Expand All @@ -993,12 +954,11 @@ mod test {
client.set_block1_size(10_000_000);

let resp = client
.request_path(
"/large-create",
Method::Post,
Some(large_payload.clone()),
None,
Some(domain.to_string()),
.send(
RequestBuilder::new("/large-create", Method::Post)
.data(Some(large_payload.clone()))
.domain(domain.to_string())
.build(),
)
.await
.unwrap();
Expand Down Expand Up @@ -1083,22 +1043,18 @@ mod test {
ClientTransport::<FaultyUdp>::DEFAULT_NUM_RETRIES as u32 + 1,
)
.await;
let error = client
.request_path("/Rust", Method::Get, None, None, Some(server_addr.clone()))
.await
.unwrap_err();
let request_gen = || {
RequestBuilder::new("/Rust", Method::Get)
.domain(server_addr.clone())
.build()
};
let error = client.send(request_gen()).await.unwrap_err();
assert_eq!(error.kind(), ErrorKind::TimedOut);
//this request will work, we do this to reset the state of the faulty udp
client
.request_path("/Rust", Method::Get, None, None, Some(server_addr.clone()))
.await
.unwrap();
client.send(request_gen()).await.unwrap();

client.set_transport_retries(ClientTransport::<UdpTransport>::DEFAULT_NUM_RETRIES + 2);
let resp = client
.request_path("/Rust", Method::Get, None, None, Some(server_addr))
.await
.unwrap();
let resp = client.send(request_gen()).await.unwrap();

assert_eq!(resp.message.payload, b"Rust".to_vec());
}
Expand All @@ -1120,7 +1076,7 @@ mod test {
request.message.header.message_id = 123;
request.message.header.set_type(MessageType::NonConfirmable);

let req = client.perform_request(request).await;
let req = client.send(request).await;
assert!(req.is_err());
}
}
Loading

0 comments on commit b37045a

Please sign in to comment.