Skip to content

Commit

Permalink
feat: reconstructing channel implementation (#45)
Browse files Browse the repository at this point in the history
* feat: 重构为 channel 实现

* feat: add mpsc channel

* feat: remove unused code

* minor: update version to v0.2.4

* minor: update chanel bound to 0
  • Loading branch information
hanshuaikang authored Jan 25, 2025
1 parent 640ee06 commit 1acfefa
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 69 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "nping"
version = "0.2.3"
version = "0.2.4"
edition = "2021"

[dependencies]
Expand All @@ -10,4 +10,3 @@ ratatui = "0.29.0"
tokio = { version = "1.42.0", features = ["full"] }
pinger="2.0.0"
anyhow="1.0.89"

72 changes: 57 additions & 15 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ use std::collections::{HashSet, VecDeque};
use std::sync::{Arc, Mutex};
use tokio::task;
use crate::ip_data::IpData;
use std::sync::mpsc;
use std::thread;
use crate::network::send_ping;

#[derive(Parser, Debug)]
#[command(
version = "v0.2.3",
version = "v0.2.4",
author = "hanshuaikang<https://github.com/hanshuaikang>",
about = "🏎 Nping mean NB Ping, A Ping Tool in Rust with Real-Time Data and Visualizations"
)]
Expand Down Expand Up @@ -90,21 +92,28 @@ async fn run_app(
let terminal = ui::init_terminal().unwrap();
let terminal_guard = Arc::new(Mutex::new(terminal::TerminalGuard::new(terminal)));

let mut addrs = Vec::new();

// ip channel
let (ping_update_tx, ping_update_rx) = mpsc::sync_channel::<IpData>(0);

let ping_update_tx = Arc::new(ping_update_tx);


let mut ips = Vec::new();
// if multiple is set, get multiple IP addresses for each target
if targets.len() == 1 && multiple > 0 {
// get multiple IP addresses for the target
addrs = network::get_multiple_host_ipaddr(&targets[0], force_ipv6, multiple as usize)?;
ips = network::get_multiple_host_ipaddr(&targets[0], force_ipv6, multiple as usize)?;
} else {
// get IP address for each target
for target in &targets {
let ip = network::get_host_ipaddr(target, force_ipv6)?;
addrs.push(ip);
ips.push(ip);
}
}

// Define statistics variables
let ip_data = Arc::new(Mutex::new(addrs.iter().enumerate().map(|(i, _)| IpData {
let ip_data = Arc::new(Mutex::new(ips.iter().enumerate().map(|(i, _)| IpData {
ip: String::new(),
addr: if targets.len() == 1 { targets[0].clone() } else { targets[i].clone() },
rtts: VecDeque::new(),
Expand All @@ -122,25 +131,58 @@ async fn run_app(

let interval = if interval == 0 { 500 } else { interval * 1000 };
let mut tasks = Vec::new();
for (i, addr) in addrs.iter().enumerate() {
let addr = addr.clone();

{
let ip_data = ip_data.clone();
let errs = errs.clone();
let terminal_guard = terminal_guard.clone();
let view_type = view_type.clone();

{
let mut guard = terminal_guard.lock().unwrap();
let ip_data = ip_data.lock().unwrap();
// first draw ui
ui::draw_interface(
&mut guard.terminal.as_mut().unwrap(),
&view_type,
&ip_data,
&mut errs.lock().unwrap(),
).ok();
}

thread::spawn(move || {
while let Ok(updated_data) = ping_update_rx.recv() {
let mut ip_data = ip_data.lock().unwrap();
if let Some(pos) = ip_data.iter().position(|d| d.addr == updated_data.addr && d.ip == updated_data.ip) {
ip_data[pos] = updated_data;
}
let mut guard = terminal_guard.lock().unwrap();
ui::draw_interface(
&mut guard.terminal.as_mut().unwrap(),
&view_type,
&ip_data,
&mut errs.lock().unwrap(),
).ok();
}
});
}
for (i, ip) in ips.iter().enumerate() {
let ip = ip.clone();
let running = running.clone();
let errs = errs.clone();
let task = task::spawn({
let ip_data = ip_data.clone();
let errs = errs.clone();
let terminal_guard = terminal_guard.clone(); // Clone terminal_guard here
let view_type = view_type.clone();
let ping_update_tx = ping_update_tx.clone();
let ip_data = ip_data.clone();
let mut data = ip_data.lock().unwrap();
// update the ip
data[i].ip = ip.clone();
let addr = data[i].addr.clone();
async move {
send_ping(addr, i, errs.clone(), count, interval, ip_data.clone(), move || {
let mut terminal_guard = terminal_guard.lock().unwrap();
ui::draw_interface(&mut terminal_guard.terminal.as_mut().unwrap(), &view_type, &ip_data.lock().unwrap(), &errs.lock().unwrap()).unwrap();
}, running.clone()).await.unwrap();
send_ping(addr, ip, errs.clone(), count, interval, running.clone(), ping_update_tx).await.unwrap();
}
});
tasks.push(task);
tasks.push(task)
}

for task in tasks {
Expand Down
106 changes: 55 additions & 51 deletions src/network.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
use std::collections::VecDeque;
use std::error::Error;
use std::net::{IpAddr, ToSocketAddrs};
use std::sync::{Arc, Mutex};
use std::sync::mpsc::SyncSender;
use std::time::Duration;
use anyhow::{anyhow, Context};

use pinger::{ping, PingOptions, PingResult};
use crate::ip_data::IpData;

Expand All @@ -21,7 +24,7 @@ pub(crate) fn resolve_host_ips(host: &str, force_ipv6: bool) -> Result<Vec<IpAdd
}

// filter ipv4 or ipv6
let filtered_ips:Vec<IpAddr> = if force_ipv6 {
let filtered_ips: Vec<IpAddr> = if force_ipv6 {
ipaddr.into_iter()
.filter(|ip| matches!(ip, IpAddr::V6(_)))
.collect()
Expand Down Expand Up @@ -54,43 +57,49 @@ pub(crate) fn get_multiple_host_ipaddr(host: &str, force_ipv6: bool, multiple: u

pub struct PingTask {
addr: String,
ip: String,
count: usize,
interval: u64,
index: usize,
ip_data: Arc<Mutex<Vec<IpData>>>,
running: Arc<Mutex<bool>>,
errs: Arc<Mutex<Vec<String>>>,
}

impl PingTask {
pub fn new(
addr: String,
ip: String,
count: usize,
interval: u64,
index: usize,
ip_data: Arc<Mutex<Vec<IpData>>>,
running: Arc<Mutex<bool>>,
errs: Arc<Mutex<Vec<String>>>,
) -> Self {
Self {
addr,
ip,
count,
interval,
index,
ip_data,
running,
errs,
}
}

pub async fn run<F>(&self, mut draw_ui: F) -> Result<(), Box<dyn Error>>
where
F: FnMut() + Send + 'static,
pub async fn run(&self, ping_update_tx: Arc<SyncSender<IpData>>) -> Result<(), Box<dyn Error>>
{
let mut ip_data = IpData {
addr: self.addr.clone(),
ip: self.ip.clone(),
rtts: VecDeque::new(),
last_attr: 0.0,
min_rtt: 0.0,
max_rtt: 0.0,
timeout: 0,
received: 0,
pop_count: 0,
};
// interval defined 0.5s/every ping
let interval = Duration::from_millis(self.interval);
let options = PingOptions::new(
self.addr.clone(),
self.ip.clone(),
interval,
None,
);
Expand All @@ -111,96 +120,91 @@ impl PingTask {
let rtt = duration.as_secs_f64() * 1000.0;
let rtt_display: f64 = format!("{:.2}", rtt).parse().unwrap();
update_stats(
self.ip_data.clone(),
self.index,
self.addr.parse().unwrap(),
&mut ip_data,
self.ip.parse().unwrap(),
rtt_display,
);
}
PingResult::Timeout(_) => {
update_timeout_stats(self.ip_data.clone(), self.index, self.addr.parse().unwrap());
update_timeout_stats(&mut ip_data, self.ip.parse().unwrap());
}
PingResult::PingExited(status, err) => {
if status.code() != Option::from(0) {
let err = format!("host({}) ping err, reason: ping excited, status: {} err: {}", self.addr, err, status);
let err = format!("host({}) ping err, reason: ping excited, status: {} err: {}", self.ip, err, status);
set_error(self.errs.clone(), err);
}
}
PingResult::Unknown(msg) => {
let err = format!("host({}) ping err, reason:unknown, err: {}", self.addr, msg);
let err = format!("host({}) ping err, reason:unknown, err: {}", self.ip, msg);
set_error(self.errs.clone(), err);
}
}
}
Err(err) => {
let err = format!("host({}) ping err, reason: unknown, err: {}", self.addr, err);
let err = format!("host({}) ping err, reason: unknown, err: {}", self.ip, err);
set_error(self.errs.clone(), err);
}
}
draw_ui();


// send ping data to update
ping_update_tx.send(ip_data.clone())?;
}

Ok(())
}
}

// send ping to the target address
pub async fn send_ping<F>(
pub async fn send_ping(
addr: String,
i: usize,
ip: String,
errs: Arc<Mutex<Vec<String>>>,
count: usize,
interval: i32,
ip_data: Arc<Mutex<Vec<IpData>>>,
mut draw_ui: F,
running: Arc<Mutex<bool>>,
ping_update_tx: Arc<SyncSender<IpData>>,
) -> Result<(), Box<dyn Error>>
where
F: FnMut() + Send + 'static,
{
// draw ui first
draw_ui();
let task = PingTask::new(
addr.to_string(),
ip,
count,
interval as u64,
i,
ip_data,
running,
errs,
);
Ok(task.run(draw_ui).await?)
Ok(task.run(ping_update_tx).await?)
}

// update statistics
fn update_stats(ip_data: Arc<Mutex<Vec<IpData>>>, i: usize, addr: IpAddr, rtt: f64) {
let mut data = ip_data.lock().unwrap();
data[i].ip = addr.to_string();
data[i].received += 1;
data[i].last_attr = rtt;
data[i].rtts.push_back(rtt);
if data[i].min_rtt == 0.0 || rtt < data[i].min_rtt {
data[i].min_rtt = rtt;
fn update_stats(ip_data: &mut IpData, ip: IpAddr, rtt: f64) {
ip_data.ip = ip.to_string();
ip_data.received += 1;
ip_data.last_attr = rtt;
ip_data.rtts.push_back(rtt);
if ip_data.min_rtt == 0.0 || rtt < ip_data.min_rtt {
ip_data.min_rtt = rtt;
}
if rtt > data[i].max_rtt {
data[i].max_rtt = rtt;
if rtt > ip_data.max_rtt {
ip_data.max_rtt = rtt;
}
if data[i].rtts.len() > 10 {
data[i].rtts.pop_front();
data[i].pop_count += 1;
if ip_data.rtts.len() > 10 {
ip_data.rtts.pop_front();
ip_data.pop_count += 1;
}
}

// update timeout statistics
fn update_timeout_stats(ip_data: Arc<Mutex<Vec<IpData>>>, i: usize, addr: IpAddr) {
let mut data = ip_data.lock().unwrap();
data[i].rtts.push_back(-1.0);
data[i].last_attr = -1.0;
data[i].ip = addr.to_string();
data[i].timeout += 1;
if data[i].rtts.len() > 10 {
data[i].rtts.pop_front();
data[i].pop_count += 1;
fn update_timeout_stats(ip_data: &mut IpData, ip: IpAddr) {
ip_data.rtts.push_back(-1.0);
ip_data.last_attr = -1.0;
ip_data.ip = ip.to_string();
ip_data.timeout += 1;
if ip_data.rtts.len() > 10 {
ip_data.rtts.pop_front();
ip_data.pop_count += 1;
}
}

Expand Down
1 change: 1 addition & 0 deletions src/ui.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use std::io::{self, Stdout};
use std::error::Error;
use ratatui::prelude::Modifier;


/// init terminal
pub fn init_terminal() -> Result<Terminal<CrosstermBackend<Stdout>>, Box<dyn Error>> {
let backend = CrosstermBackend::new(io::stdout());
Expand Down

0 comments on commit 1acfefa

Please sign in to comment.