Skip to content

Commit

Permalink
Wait for esp now send callback to be called after sending (esp-rs#232)
Browse files Browse the repository at this point in the history
* Make check_error result propagatable with ?

* Wait for esp_now_send callback after sending esp-rs#229

* Updated examples
  • Loading branch information
M4tsuri authored and bjoernQ committed May 23, 2024
1 parent e89dd06 commit 6bf2b78
Showing 1 changed file with 118 additions and 37 deletions.
155 changes: 118 additions & 37 deletions esp-wifi/src/esp_now/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
use core::{cell::RefCell, fmt::Debug};

use critical_section::Mutex;
use atomic_polyfill::{Ordering, AtomicBool};
use esp_hal_common::peripheral::{Peripheral, PeripheralRef};

use crate::compat::queue::SimpleQueue;
Expand All @@ -24,12 +25,19 @@ pub const BROADCAST_ADDRESS: [u8; 6] = [0xffu8, 0xffu8, 0xffu8, 0xffu8, 0xffu8,

static RECEIVE_QUEUE: Mutex<RefCell<SimpleQueue<ReceivedData, 10>>> =
Mutex::new(RefCell::new(SimpleQueue::new()));
/// This atomic behaves like a guard, so we need strict memory ordering when
/// operating it.
///
/// This flag indicates whether the send callback has been called after a sending.
static ESP_NOW_SEND_CB_INVOKED: AtomicBool = AtomicBool::new(false);
/// Status of esp now send, true for success, false for failure
static ESP_NOW_SEND_STATUS: AtomicBool = AtomicBool::new(true);

macro_rules! check_error {
($block:block) => {
let res = unsafe { $block };
if res != 0 {
return Err(EspNowError::Error(Error::from_code(res as u32)));
match unsafe { $block } {
0 => Ok(()),
res @ _ => Err(EspNowError::Error(Error::from_code(res as u32)))
}
};
}
Expand Down Expand Up @@ -282,10 +290,11 @@ impl<'d> EspNow<'d> {
}

let mut esp_now = EspNow { _device: device };
check_error!({ esp_wifi_set_mode(wifi_mode_t_WIFI_MODE_STA) });
check_error!({ esp_wifi_start() });
check_error!({ esp_now_init() });
check_error!({ esp_now_register_recv_cb(Some(rcv_cb)) });
check_error!({ esp_wifi_set_mode(wifi_mode_t_WIFI_MODE_STA) })?;
check_error!({ esp_wifi_start() })?;
check_error!({ esp_now_init() })?;
check_error!({ esp_now_register_recv_cb(Some(rcv_cb)) })?;
check_error!({ esp_now_register_send_cb(Some(send_cb)) })?;

esp_now.add_peer(PeerInfo {
peer_address: BROADCAST_ADDRESS,
Expand All @@ -300,14 +309,13 @@ impl<'d> EspNow<'d> {
/// Set primary WiFi channel
/// Should only be used when using ESP-NOW without AP or STA
pub fn set_channel(&mut self, channel: u8) -> Result<(), EspNowError> {
check_error!({ esp_wifi_set_channel(channel, 0) });
Ok(())
check_error!({ esp_wifi_set_channel(channel, 0) })
}

/// Get the version of ESPNOW
pub fn get_version(&self) -> Result<u32, EspNowError> {
let mut version = 0u32;
check_error!({ esp_now_get_version(&mut version as *mut u32) });
check_error!({ esp_now_get_version(&mut version as *mut u32) })?;
Ok(version)
}

Expand All @@ -321,15 +329,12 @@ impl<'d> EspNow<'d> {
encrypt: peer.encrypt,
priv_: core::ptr::null_mut(),
};
check_error!({ esp_now_add_peer(&raw_peer as *const _) });

Ok(())
check_error!({ esp_now_add_peer(&raw_peer as *const _) })
}

/// Remove the given peer
pub fn remove_peer(&mut self, peer_address: &[u8; 6]) -> Result<(), EspNowError> {
check_error!({ esp_now_del_peer(peer_address.as_ptr()) });
Ok(())
check_error!({ esp_now_del_peer(peer_address.as_ptr()) })
}

/// Modify a peer information
Expand All @@ -342,9 +347,7 @@ impl<'d> EspNow<'d> {
encrypt: peer.encrypt,
priv_: core::ptr::null_mut(),
};
check_error!({ esp_now_mod_peer(&raw_peer as *const _) });

Ok(())
check_error!({ esp_now_mod_peer(&raw_peer as *const _) })
}

/// Get peer by MAC address
Expand All @@ -357,7 +360,7 @@ impl<'d> EspNow<'d> {
encrypt: false,
priv_: core::ptr::null_mut(),
};
check_error!({ esp_now_get_peer(peer_address.as_ptr(), &mut raw_peer as *mut _) });
check_error!({ esp_now_get_peer(peer_address.as_ptr(), &mut raw_peer as *mut _) })?;

Ok(PeerInfo {
peer_address: raw_peer.peer_addr,
Expand Down Expand Up @@ -388,7 +391,7 @@ impl<'d> EspNow<'d> {
encrypt: false,
priv_: core::ptr::null_mut(),
};
check_error!({ esp_now_fetch_peer(from_head, &mut raw_peer as *mut _) });
check_error!({ esp_now_fetch_peer(from_head, &mut raw_peer as *mut _) })?;

Ok(PeerInfo {
peer_address: raw_peer.peer_addr,
Expand Down Expand Up @@ -417,7 +420,7 @@ impl<'d> EspNow<'d> {
total_num: 0,
encrypt_num: 0,
};
check_error!({ esp_now_get_peer_num(&mut peer_num as *mut _) });
check_error!({ esp_now_get_peer_num(&mut peer_num as *mut _) })?;

Ok(PeerCount {
total_count: peer_num.total_num,
Expand All @@ -427,36 +430,37 @@ impl<'d> EspNow<'d> {

/// Set the primary master key
pub fn set_pmk(&mut self, pmk: &[u8; 16]) -> Result<(), EspNowError> {
check_error!({ esp_now_set_pmk(pmk.as_ptr()) });

Ok(())
check_error!({ esp_now_set_pmk(pmk.as_ptr()) })
}

/// Set wake window for esp_now to wake up in interval unit
///
/// Window is milliseconds the chip keep waked each interval, from 0 to 65535.
pub fn set_wake_window(&mut self, wake_window: u16) -> Result<(), EspNowError> {
check_error!({ esp_now_set_wake_window(wake_window) });

Ok(())
check_error!({ esp_now_set_wake_window(wake_window) })
}

/// Config ESPNOW rate
pub fn set_rate(&mut self, rate: WifiPhyRate) -> Result<(), EspNowError> {
check_error!({ esp_wifi_config_espnow_rate(wifi_interface_t_WIFI_IF_STA, rate as u32,) });

Ok(())
check_error!({ esp_wifi_config_espnow_rate(wifi_interface_t_WIFI_IF_STA, rate as u32,) })
}

/// Send data to peer
///
/// The peer needs to be added to the peer list first
pub fn send(&mut self, dst_addr: &[u8; 6], data: &[u8]) -> Result<(), EspNowError> {
/// The peer needs to be added to the peer list first.
///
/// This method returns a `SendWaiter` on success. ESP-NOW protocol provides guaranteed
/// delivery on MAC layer. If you need this guatantee, call `wait` method of the returned
/// `SendWaiter` and make sure it returns `SendStatus::Success`.
/// However, this method will block current task for milliseconds.
/// So you can just drop the waiter if you want high frequency sending.
pub fn send(&mut self, dst_addr: &[u8; 6], data: &[u8]) -> Result<SendWaiter, EspNowError> {
let mut addr = [0u8; 6];
addr.copy_from_slice(dst_addr);
check_error!({ esp_now_send(addr.as_ptr(), data.as_ptr(), data.len() as u32) });

Ok(())
ESP_NOW_SEND_CB_INVOKED.store(false, Ordering::Release);
check_error!({ esp_now_send(addr.as_ptr(), data.as_ptr(), data.len() as u32) })?;
Ok(SendWaiter(()))
}

/// Receive data
Expand All @@ -477,6 +481,51 @@ impl Drop for EspNow<'_> {
}
}

/// This is essentially [esp_now_send_status_t](https://docs.espressif.com/projects/esp-idf/en/latest/esp32/api-reference/network/esp_now.html#_CPPv421esp_now_send_status_t)
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum SendStatus {
Success,
Failed
}

/// This struct is returned by a sync esp now send. Invoking `wait` method of this
/// struct will block current task until the callback function of esp now send is called
/// and return the status of previous sending.
pub struct SendWaiter(());

impl SendWaiter {
/// Wait for the previous sending to complete, i.e. the send callback is invoked with
/// status of the sending.
///
/// Note: if you firstly dropped waiter of a sending and then wait for a following sending,
/// you probably get unreliable status because we cannot determine which sending the waited status
/// belongs to.
pub fn wait(self) -> SendStatus {
while !ESP_NOW_SEND_CB_INVOKED.load(Ordering::Acquire) {}

if ESP_NOW_SEND_STATUS.load(Ordering::Relaxed) {
SendStatus::Success
} else {
SendStatus::Failed
}
}
}

unsafe extern "C" fn send_cb(
_mac_addr: *const u8,
status: esp_now_send_status_t
) {
critical_section::with(|_| {
let is_success = status == esp_now_send_status_t_ESP_NOW_SEND_SUCCESS;
ESP_NOW_SEND_STATUS.store(is_success, Ordering::Relaxed);

ESP_NOW_SEND_CB_INVOKED.store(true, Ordering::Release);

#[cfg(feature = "async")]
asynch::ESP_NOW_TX_WAKER.wake();
})
}

unsafe extern "C" fn rcv_cb(
esp_now_info: *const esp_now_recv_info_t,
data: *const u8,
Expand Down Expand Up @@ -572,22 +621,54 @@ unsafe extern "C" fn rcv_cb(
.unwrap();

#[cfg(feature = "async")]
asynch::ESP_NOW_WAKER.wake();
asynch::ESP_NOW_RX_WAKER.wake();
});
}

#[cfg(feature = "async")]
pub use asynch::SendFuture;

#[cfg(feature = "async")]
mod asynch {
use super::*;
use core::task::{Context, Poll};
use embassy_sync::waitqueue::AtomicWaker;

pub(super) static ESP_NOW_WAKER: AtomicWaker = AtomicWaker::new();
pub(super) static ESP_NOW_TX_WAKER: AtomicWaker = AtomicWaker::new();
pub(super) static ESP_NOW_RX_WAKER: AtomicWaker = AtomicWaker::new();

impl<'d> EspNow<'d> {
pub async fn receive_async(&mut self) -> ReceivedData {
ReceiveFuture.await
}

pub fn send_async(&mut self, dst_addr: &[u8; 6], data: &[u8]) -> Result<SendFuture, EspNowError> {
let mut addr = [0u8; 6];
addr.copy_from_slice(dst_addr);
ESP_NOW_SEND_CB_INVOKED.store(false, Ordering::Release);
check_error!({ esp_now_send(addr.as_ptr(), data.as_ptr(), data.len() as u32) })?;
Ok(SendFuture(()))
}
}

pub struct SendFuture(());

impl core::future::Future for SendFuture {
type Output = SendStatus;

fn poll(self: core::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
ESP_NOW_TX_WAKER.register(cx.waker());

if ESP_NOW_SEND_CB_INVOKED.load(Ordering::Acquire) {
Poll::Ready(if ESP_NOW_SEND_STATUS.load(Ordering::Relaxed) {
SendStatus::Success
} else {
SendStatus::Failed
})
} else {
Poll::Pending
}
}
}

struct ReceiveFuture;
Expand All @@ -596,7 +677,7 @@ mod asynch {
type Output = ReceivedData;

fn poll(self: core::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
ESP_NOW_WAKER.register(cx.waker());
ESP_NOW_RX_WAKER.register(cx.waker());

if let Some(data) = critical_section::with(|cs| {
let mut queue = RECEIVE_QUEUE.borrow_ref_mut(cs);
Expand Down

0 comments on commit 6bf2b78

Please sign in to comment.