Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support for sprayandwait routing strategy #29

Merged
merged 7 commits into from
Jul 29, 2022
36 changes: 36 additions & 0 deletions core/dtn7/src/bin/dtnd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,18 @@ async fn main() -> Result<(), std::io::Error> {
dtn7::routing::routing_algorithms().join(", ")
).as_str())
.takes_value(true),
)
.arg(
Arg::new("routing_options")
.short('R')
.long("routing-options")
.value_name("ROUTING-OPTIONS")
.help(format!(
"Set routing options: \n{}",
dtn7::routing::routing_options().join("\n")
).as_str())
.takes_value(true)
.multiple_occurrences(true),
).arg(
Arg::new("db")
.short('D')
Expand Down Expand Up @@ -352,6 +364,30 @@ Tag 255 takes 5 arguments and is interpreted as address. Usage: -S 255:'Samplest
cfg.routing = r.into();
}
}
if let Some(r_opts) = matches.values_of("routing_options") {
for r_opt in r_opts {
let parts: Vec<&str> = r_opt.split('=').collect();
if parts.len() != 2 {
panic!("Invalid routing option: {}", r_opt);
}
let key = parts[0];
let value = parts[1];
let key_parts: Vec<&str> = key.split('.').collect();
if key_parts.len() != 2 {
panic!("Invalid routing option: {}", r_opt);
}
let r_algo = key_parts[0];
let r_opt = key_parts[1];
if !cfg.routing_settings.contains_key(r_algo) {
cfg.routing_settings.insert(r_algo.into(), HashMap::new());
}
cfg.routing_settings
.get_mut(r_algo)
.unwrap()
.insert(r_opt.to_string(), value.to_string());
}
//cfg.routing_options = r_opts.map(|s| s.to_string()).collect();
}

if let Some(db) = matches.value_of("db") {
if dtn7::core::store::bundle_stores().contains(&db) {
Expand Down
28 changes: 22 additions & 6 deletions core/dtn7/src/dtnconfig.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use log::{debug, error};
use rand::distributions::Alphanumeric;
use rand::{thread_rng, Rng};
use serde::Serialize;
use std::collections::HashMap;
use std::collections::{BTreeMap, HashMap};
use std::net::SocketAddr;
use std::path::PathBuf;
use std::str::FromStr;
Expand All @@ -26,13 +26,14 @@ pub struct DtnConfig {
pub webport: u16,
pub announcement_interval: Duration,
pub disable_neighbour_discovery: bool,
pub discovery_destinations: HashMap<String, u32>,
pub discovery_destinations: BTreeMap<String, u32>,
pub janitor_interval: Duration,
pub endpoints: Vec<String>,
pub clas: Vec<(CLAsAvailable, HashMap<String, String>)>,
pub cla_global_settings: HashMap<CLAsAvailable, HashMap<String, String>>,
pub services: HashMap<u8, String>,
pub services: BTreeMap<u8, String>,
pub routing: String,
pub routing_settings: BTreeMap<String, HashMap<String, String>>,
pub peer_timeout: Duration,
pub statics: Vec<DtnPeer>,
pub workdir: PathBuf,
Expand Down Expand Up @@ -92,8 +93,21 @@ impl From<PathBuf> for DtnConfig {
debug!("nodeid: {:?}", dtncfg.host_eid);
dtncfg.nodeid = dtncfg.host_eid.to_string();

dtncfg.routing = s.get_str("routing").unwrap_or(dtncfg.routing);
dtncfg.routing = s.get_str("routing.strategy").unwrap_or(dtncfg.routing);
debug!("routing: {:?}", dtncfg.routing);
if let Ok(routing_settings) = s.get_table("routing.settings") {
for (k, v) in routing_settings.iter() {
let tab = v.clone().into_table().unwrap();
let mut routing_settings = HashMap::new();
for (k, v) in tab {
routing_settings.insert(k, v.into_str().unwrap());
}
dtncfg
.routing_settings
.insert(k.to_string(), routing_settings);
}
}
debug!("routing options: {:?}", dtncfg.routing_settings);

dtncfg.workdir = if let Ok(wd) = s.get_str("workdir") {
PathBuf::from(wd)
Expand Down Expand Up @@ -244,14 +258,15 @@ impl DtnConfig {
host_eid: local_node_id,
announcement_interval: "2s".parse::<humantime::Duration>().unwrap().into(),
disable_neighbour_discovery: false,
discovery_destinations: HashMap::new(),
discovery_destinations: BTreeMap::new(),
webport: 3000,
janitor_interval: "10s".parse::<humantime::Duration>().unwrap().into(),
endpoints: Vec::new(),
clas: Vec::new(),
cla_global_settings: HashMap::new(),
services: HashMap::new(),
services: BTreeMap::new(),
routing: "epidemic".into(),
routing_settings: BTreeMap::new(),
peer_timeout: "20s".parse::<humantime::Duration>().unwrap().into(),
statics: Vec::new(),
workdir: std::env::current_dir().unwrap(),
Expand Down Expand Up @@ -281,6 +296,7 @@ impl DtnConfig {
self.cla_global_settings = cfg.cla_global_settings;
self.services = cfg.services;
self.routing = cfg.routing;
self.routing_settings = cfg.routing_settings;
self.peer_timeout = cfg.peer_timeout;
self.statics = cfg.statics;
self.workdir = cfg.workdir;
Expand Down
3 changes: 3 additions & 0 deletions core/dtn7/src/dtnd/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ pub async fn start_dtnd(cfg: DtnConfig) -> anyhow::Result<()> {

info!("RoutingAgent: {}", routing);

let routing_options = (*CONFIG.lock()).routing_settings.clone();
info!("RoutingOptions: {:?}", routing_options);

let clas = (*CONFIG.lock()).clas.clone();
for (cla, local_settings) in &clas {
info!("Adding CLA: {:?}", cla);
Expand Down
4 changes: 2 additions & 2 deletions core/dtn7/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@ use crate::core::store::BundleStoresEnum;
use anyhow::{bail, Context, Result};
use lazy_static::*;
use parking_lot::Mutex;
use std::collections::HashMap;
use std::collections::{BTreeMap, HashMap};
use tokio::sync::mpsc::Sender;
use tokio::sync::oneshot;

lazy_static! {
pub static ref CONFIG: Mutex<DtnConfig> = Mutex::new(DtnConfig::new());
pub static ref DTNCORE: Mutex<DtnCore> = Mutex::new(DtnCore::new());
pub static ref PEERS: Mutex<HashMap<String, DtnPeer>> = Mutex::new(HashMap::new());
pub static ref PEERS: Mutex<BTreeMap<String, DtnPeer>> = Mutex::new(BTreeMap::new());
pub static ref STATS: Mutex<DtnStatistics> = Mutex::new(DtnStatistics::new());
pub static ref SENDERTASK: Mutex<Option<Sender<Bundle>>> = Mutex::new(None);
pub static ref STORE: Mutex<BundleStoresEnum> = Mutex::new(InMemoryBundleStore::new().into());
Expand Down
4 changes: 2 additions & 2 deletions core/dtn7/src/routing/epidemic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ impl EpidemicRoutingAgentCore {
}
}

async fn sender_for_bundle(mut rx: mpsc::Receiver<RoutingCmd>) {
async fn handle_routing_cmd(mut rx: mpsc::Receiver<RoutingCmd>) {
let mut core: EpidemicRoutingAgentCore = EpidemicRoutingAgentCore::new();

while let Some(cmd) = rx.recv().await {
Expand Down Expand Up @@ -136,7 +136,7 @@ impl EpidemicRoutingAgent {
pub fn new() -> EpidemicRoutingAgent {
let (tx, rx) = mpsc::channel(100);
tokio::spawn(async move {
sender_for_bundle(rx).await;
handle_routing_cmd(rx).await;
});

EpidemicRoutingAgent { tx }
Expand Down
4 changes: 2 additions & 2 deletions core/dtn7/src/routing/erouting/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{peers_get_for_node, BundlePack, DtnPeer, PeerAddress, RoutingNotifcation};
use bp7::{Bundle, EndpointID};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::collections::{BTreeMap, HashMap};

pub mod processing;
pub mod ws_client;
Expand Down Expand Up @@ -141,7 +141,7 @@ pub struct DroppedPeer {

#[derive(Serialize, Deserialize, Clone)]
pub struct PeerState {
pub peers: HashMap<String, DtnPeer>,
pub peers: BTreeMap<String, DtnPeer>,
}

#[derive(Serialize, Deserialize, Clone)]
Expand Down
10 changes: 9 additions & 1 deletion core/dtn7/src/routing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ pub mod erouting;
pub mod external;
pub mod flooding;
pub mod sink;
pub mod sprayandwait;

use crate::cla::ClaSenderTask;
use crate::core::bundlepack::BundlePack;
Expand All @@ -15,6 +16,7 @@ use epidemic::EpidemicRoutingAgent;
use external::ExternalRoutingAgent;
use flooding::FloodingRoutingAgent;
use sink::SinkRoutingAgent;
use sprayandwait::SprayAndWaitRoutingAgent;
use std::fmt::Debug;
use std::fmt::Display;
use tokio::sync::{mpsc, oneshot};
Expand All @@ -34,6 +36,7 @@ pub enum RoutingAgentsEnum {
FloodingRoutingAgent,
SinkRoutingAgent,
ExternalRoutingAgent,
SprayAndWaitRoutingAgent,
}

pub enum RoutingCmd {
Expand All @@ -57,7 +60,11 @@ pub trait RoutingAgent: Debug + Display {
}

pub fn routing_algorithms() -> Vec<&'static str> {
vec!["epidemic", "flooding", "sink", "external"]
vec!["epidemic", "flooding", "sink", "external", "sprayandwait"]
}

pub fn routing_options() -> Vec<&'static str> {
vec!["sprayandwait.num_copies=<int>"]
}

pub fn new(routingagent: &str) -> RoutingAgentsEnum {
Expand All @@ -66,6 +73,7 @@ pub fn new(routingagent: &str) -> RoutingAgentsEnum {
"epidemic" => EpidemicRoutingAgent::new().into(),
"sink" => SinkRoutingAgent::new().into(),
"external" => ExternalRoutingAgent::new().into(),
"sprayandwait" => SprayAndWaitRoutingAgent::new().into(),
_ => panic!("Unknown routing agent {}", routingagent),
}
}
Loading