Skip to content

Commit

Permalink
fix double ack
Browse files Browse the repository at this point in the history
  • Loading branch information
Biehl Ulate committed Mar 8, 2024
1 parent c8a782d commit ee76d22
Showing 1 changed file with 21 additions and 8 deletions.
29 changes: 21 additions & 8 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,29 +79,42 @@ impl<T: Transport> ClientTransport<T> {

pub async fn try_receive_packet(&mut self) -> IoResult<(Packet, SocketAddr)> {
let result = self.try_receive_internal().await?;
if result.0.header.get_type() == MessageType::Confirmable {
self.send_ack_for_confirmable_packet(&result.0).await;
}
return Ok(result);
}
async fn send_ack_for_confirmable_packet(&mut self, packet: &Packet) {
async fn intercept_packet_for_acks(&mut self, packet: &Packet) {
match (packet.header.get_type(), packet.header.code) {
(MessageType::Confirmable, MessageClass::Response(_)) => {
self.send_ack_for_confirmable_response(&packet).await
}
_ => {}
}
}
async fn send_ack_for_confirmable_response(&mut self, packet: &Packet) {
let mut ack = Packet::new();
ack.header.set_type(MessageType::Acknowledgement);
ack.header.message_id = packet.header.message_id;
ack.header.code = MessageClass::Empty;
let _ = self.inner.send(&ack.to_bytes().unwrap()).await;
}

async fn try_receive_internal(&mut self) -> IoResult<(Packet, SocketAddr)> {
async fn receive_internal_no_cache(&mut self) -> IoResult<(Packet, SocketAddr)> {
let mut buf = [0; 1500];
if let Some(p) = self.cache.take() {
return Ok(p);
}
let (nread, src) = timeout(self.timeout, self.inner.recv(&mut buf)).await??;
let packet = Packet::from_bytes(&buf[..nread])
.map_err(|_| Error::new(ErrorKind::InvalidInput, "packet error"))?;
return Ok((packet, src));
}

async fn try_receive_internal(&mut self) -> IoResult<(Packet, SocketAddr)> {
if let Some(p) = self.cache.take() {
return Ok(p);
}
let p = self.receive_internal_no_cache().await?;
let (packet, addr) = &p;
self.intercept_packet_for_acks(packet).await;
return Ok(p);
}

/// tries to send a confirmable message with retries and timeouts
async fn try_send_confirmable_message(&mut self, msg: &[u8]) -> IoResult<()> {
for _ in 0..self.retries {
Expand Down

0 comments on commit ee76d22

Please sign in to comment.