Skip to content

Commit 7a3bfc3

Browse files
authored
feat(routing)!: support for sprayandwait routing strategy (#29)
* refactor: changed PEERS to a BTreeMap from HashMap * refactor(routing): epidemic `sender_for_bundle` renamed to the more fitting `handle_routing_cmd` * feat(routing): added initial implementation of classic spray and wait routing * feat!: added routing options and changed config entry for routing to `routing.strategy`. now `sprayandwait.num_copies` can be set at startup * fix: changed shell test scripts to work also on old bash versions plus provide a nice wrapper for `run_all_tests.sh` to hide the verbose output * test: run_all_tests.sh can now display all output again, even if no error occured by setting the env var VERBOSE=1 * doc: commented sprayandwait routing agent code
1 parent 0560789 commit 7a3bfc3

File tree

14 files changed

+462
-33
lines changed

14 files changed

+462
-33
lines changed

core/dtn7/src/bin/dtnd.rs

+36
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,18 @@ async fn main() -> Result<(), std::io::Error> {
156156
dtn7::routing::routing_algorithms().join(", ")
157157
).as_str())
158158
.takes_value(true),
159+
)
160+
.arg(
161+
Arg::new("routing_options")
162+
.short('R')
163+
.long("routing-options")
164+
.value_name("ROUTING-OPTIONS")
165+
.help(format!(
166+
"Set routing options: \n{}",
167+
dtn7::routing::routing_options().join("\n")
168+
).as_str())
169+
.takes_value(true)
170+
.multiple_occurrences(true),
159171
).arg(
160172
Arg::new("db")
161173
.short('D')
@@ -352,6 +364,30 @@ Tag 255 takes 5 arguments and is interpreted as address. Usage: -S 255:'Samplest
352364
cfg.routing = r.into();
353365
}
354366
}
367+
if let Some(r_opts) = matches.values_of("routing_options") {
368+
for r_opt in r_opts {
369+
let parts: Vec<&str> = r_opt.split('=').collect();
370+
if parts.len() != 2 {
371+
panic!("Invalid routing option: {}", r_opt);
372+
}
373+
let key = parts[0];
374+
let value = parts[1];
375+
let key_parts: Vec<&str> = key.split('.').collect();
376+
if key_parts.len() != 2 {
377+
panic!("Invalid routing option: {}", r_opt);
378+
}
379+
let r_algo = key_parts[0];
380+
let r_opt = key_parts[1];
381+
if !cfg.routing_settings.contains_key(r_algo) {
382+
cfg.routing_settings.insert(r_algo.into(), HashMap::new());
383+
}
384+
cfg.routing_settings
385+
.get_mut(r_algo)
386+
.unwrap()
387+
.insert(r_opt.to_string(), value.to_string());
388+
}
389+
//cfg.routing_options = r_opts.map(|s| s.to_string()).collect();
390+
}
355391

356392
if let Some(db) = matches.value_of("db") {
357393
if dtn7::core::store::bundle_stores().contains(&db) {

core/dtn7/src/dtnconfig.rs

+22-6
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use log::{debug, error};
77
use rand::distributions::Alphanumeric;
88
use rand::{thread_rng, Rng};
99
use serde::Serialize;
10-
use std::collections::HashMap;
10+
use std::collections::{BTreeMap, HashMap};
1111
use std::net::SocketAddr;
1212
use std::path::PathBuf;
1313
use std::str::FromStr;
@@ -26,13 +26,14 @@ pub struct DtnConfig {
2626
pub webport: u16,
2727
pub announcement_interval: Duration,
2828
pub disable_neighbour_discovery: bool,
29-
pub discovery_destinations: HashMap<String, u32>,
29+
pub discovery_destinations: BTreeMap<String, u32>,
3030
pub janitor_interval: Duration,
3131
pub endpoints: Vec<String>,
3232
pub clas: Vec<(CLAsAvailable, HashMap<String, String>)>,
3333
pub cla_global_settings: HashMap<CLAsAvailable, HashMap<String, String>>,
34-
pub services: HashMap<u8, String>,
34+
pub services: BTreeMap<u8, String>,
3535
pub routing: String,
36+
pub routing_settings: BTreeMap<String, HashMap<String, String>>,
3637
pub peer_timeout: Duration,
3738
pub statics: Vec<DtnPeer>,
3839
pub workdir: PathBuf,
@@ -92,8 +93,21 @@ impl From<PathBuf> for DtnConfig {
9293
debug!("nodeid: {:?}", dtncfg.host_eid);
9394
dtncfg.nodeid = dtncfg.host_eid.to_string();
9495

95-
dtncfg.routing = s.get_str("routing").unwrap_or(dtncfg.routing);
96+
dtncfg.routing = s.get_str("routing.strategy").unwrap_or(dtncfg.routing);
9697
debug!("routing: {:?}", dtncfg.routing);
98+
if let Ok(routing_settings) = s.get_table("routing.settings") {
99+
for (k, v) in routing_settings.iter() {
100+
let tab = v.clone().into_table().unwrap();
101+
let mut routing_settings = HashMap::new();
102+
for (k, v) in tab {
103+
routing_settings.insert(k, v.into_str().unwrap());
104+
}
105+
dtncfg
106+
.routing_settings
107+
.insert(k.to_string(), routing_settings);
108+
}
109+
}
110+
debug!("routing options: {:?}", dtncfg.routing_settings);
97111

98112
dtncfg.workdir = if let Ok(wd) = s.get_str("workdir") {
99113
PathBuf::from(wd)
@@ -244,14 +258,15 @@ impl DtnConfig {
244258
host_eid: local_node_id,
245259
announcement_interval: "2s".parse::<humantime::Duration>().unwrap().into(),
246260
disable_neighbour_discovery: false,
247-
discovery_destinations: HashMap::new(),
261+
discovery_destinations: BTreeMap::new(),
248262
webport: 3000,
249263
janitor_interval: "10s".parse::<humantime::Duration>().unwrap().into(),
250264
endpoints: Vec::new(),
251265
clas: Vec::new(),
252266
cla_global_settings: HashMap::new(),
253-
services: HashMap::new(),
267+
services: BTreeMap::new(),
254268
routing: "epidemic".into(),
269+
routing_settings: BTreeMap::new(),
255270
peer_timeout: "20s".parse::<humantime::Duration>().unwrap().into(),
256271
statics: Vec::new(),
257272
workdir: std::env::current_dir().unwrap(),
@@ -281,6 +296,7 @@ impl DtnConfig {
281296
self.cla_global_settings = cfg.cla_global_settings;
282297
self.services = cfg.services;
283298
self.routing = cfg.routing;
299+
self.routing_settings = cfg.routing_settings;
284300
self.peer_timeout = cfg.peer_timeout;
285301
self.statics = cfg.statics;
286302
self.workdir = cfg.workdir;

core/dtn7/src/dtnd/daemon.rs

+3
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,9 @@ pub async fn start_dtnd(cfg: DtnConfig) -> anyhow::Result<()> {
105105

106106
info!("RoutingAgent: {}", routing);
107107

108+
let routing_options = (*CONFIG.lock()).routing_settings.clone();
109+
info!("RoutingOptions: {:?}", routing_options);
110+
108111
let clas = (*CONFIG.lock()).clas.clone();
109112
for (cla, local_settings) in &clas {
110113
info!("Adding CLA: {:?}", cla);

core/dtn7/src/lib.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,14 @@ use crate::core::store::BundleStoresEnum;
2525
use anyhow::{bail, Context, Result};
2626
use lazy_static::*;
2727
use parking_lot::Mutex;
28-
use std::collections::HashMap;
28+
use std::collections::{BTreeMap, HashMap};
2929
use tokio::sync::mpsc::Sender;
3030
use tokio::sync::oneshot;
3131

3232
lazy_static! {
3333
pub static ref CONFIG: Mutex<DtnConfig> = Mutex::new(DtnConfig::new());
3434
pub static ref DTNCORE: Mutex<DtnCore> = Mutex::new(DtnCore::new());
35-
pub static ref PEERS: Mutex<HashMap<String, DtnPeer>> = Mutex::new(HashMap::new());
35+
pub static ref PEERS: Mutex<BTreeMap<String, DtnPeer>> = Mutex::new(BTreeMap::new());
3636
pub static ref STATS: Mutex<DtnStatistics> = Mutex::new(DtnStatistics::new());
3737
pub static ref SENDERTASK: Mutex<Option<Sender<Bundle>>> = Mutex::new(None);
3838
pub static ref STORE: Mutex<BundleStoresEnum> = Mutex::new(InMemoryBundleStore::new().into());

core/dtn7/src/routing/epidemic.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ impl EpidemicRoutingAgentCore {
7575
}
7676
}
7777

78-
async fn sender_for_bundle(mut rx: mpsc::Receiver<RoutingCmd>) {
78+
async fn handle_routing_cmd(mut rx: mpsc::Receiver<RoutingCmd>) {
7979
let mut core: EpidemicRoutingAgentCore = EpidemicRoutingAgentCore::new();
8080

8181
while let Some(cmd) = rx.recv().await {
@@ -136,7 +136,7 @@ impl EpidemicRoutingAgent {
136136
pub fn new() -> EpidemicRoutingAgent {
137137
let (tx, rx) = mpsc::channel(100);
138138
tokio::spawn(async move {
139-
sender_for_bundle(rx).await;
139+
handle_routing_cmd(rx).await;
140140
});
141141

142142
EpidemicRoutingAgent { tx }

core/dtn7/src/routing/erouting/mod.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::{peers_get_for_node, BundlePack, DtnPeer, PeerAddress, RoutingNotifcation};
22
use bp7::{Bundle, EndpointID};
33
use serde::{Deserialize, Serialize};
4-
use std::collections::HashMap;
4+
use std::collections::{BTreeMap, HashMap};
55

66
pub mod processing;
77
pub mod ws_client;
@@ -141,7 +141,7 @@ pub struct DroppedPeer {
141141

142142
#[derive(Serialize, Deserialize, Clone)]
143143
pub struct PeerState {
144-
pub peers: HashMap<String, DtnPeer>,
144+
pub peers: BTreeMap<String, DtnPeer>,
145145
}
146146

147147
#[derive(Serialize, Deserialize, Clone)]

core/dtn7/src/routing/mod.rs

+9-1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ pub mod erouting;
33
pub mod external;
44
pub mod flooding;
55
pub mod sink;
6+
pub mod sprayandwait;
67

78
use crate::cla::ClaSenderTask;
89
use crate::core::bundlepack::BundlePack;
@@ -15,6 +16,7 @@ use epidemic::EpidemicRoutingAgent;
1516
use external::ExternalRoutingAgent;
1617
use flooding::FloodingRoutingAgent;
1718
use sink::SinkRoutingAgent;
19+
use sprayandwait::SprayAndWaitRoutingAgent;
1820
use std::fmt::Debug;
1921
use std::fmt::Display;
2022
use tokio::sync::{mpsc, oneshot};
@@ -34,6 +36,7 @@ pub enum RoutingAgentsEnum {
3436
FloodingRoutingAgent,
3537
SinkRoutingAgent,
3638
ExternalRoutingAgent,
39+
SprayAndWaitRoutingAgent,
3740
}
3841

3942
pub enum RoutingCmd {
@@ -57,7 +60,11 @@ pub trait RoutingAgent: Debug + Display {
5760
}
5861

5962
pub fn routing_algorithms() -> Vec<&'static str> {
60-
vec!["epidemic", "flooding", "sink", "external"]
63+
vec!["epidemic", "flooding", "sink", "external", "sprayandwait"]
64+
}
65+
66+
pub fn routing_options() -> Vec<&'static str> {
67+
vec!["sprayandwait.num_copies=<int>"]
6168
}
6269

6370
pub fn new(routingagent: &str) -> RoutingAgentsEnum {
@@ -66,6 +73,7 @@ pub fn new(routingagent: &str) -> RoutingAgentsEnum {
6673
"epidemic" => EpidemicRoutingAgent::new().into(),
6774
"sink" => SinkRoutingAgent::new().into(),
6875
"external" => ExternalRoutingAgent::new().into(),
76+
"sprayandwait" => SprayAndWaitRoutingAgent::new().into(),
6977
_ => panic!("Unknown routing agent {}", routingagent),
7078
}
7179
}

0 commit comments

Comments
 (0)