Skip to content

Commit caa3702

Browse files
committed
Add rust async functions
1 parent a39e244 commit caa3702

File tree

6 files changed

+178
-29
lines changed

6 files changed

+178
-29
lines changed

c/opendht.cpp

+14
Original file line numberDiff line numberDiff line change
@@ -298,6 +298,20 @@ void dht_runner_bootstrap(dht_runner* r, const char* host, const char* service)
298298
runner->bootstrap(host);
299299
}
300300

301+
void dht_runner_bootstrap2(dht_runner* r, struct sockaddr *addrs[], socklen_t addrs_len[], dht_bootstrap_cb done_cb) {
302+
auto runner = reinterpret_cast<dht::DhtRunner*>(r);
303+
304+
std::vector<dht::SockAddr> sa;
305+
306+
size_t i = 0;
307+
while(addrs != nullptr && addrs[i] != nullptr) {
308+
sa.push_back(dht::SockAddr(addrs[i], addrs_len[i]));
309+
i++;
310+
}
311+
312+
runner->bootstrap(sa, done_cb);
313+
}
314+
301315
void dht_runner_get(dht_runner* r, const dht_infohash* h, dht_get_cb cb, dht_done_cb done_cb, void* cb_user_data) {
302316
auto runner = reinterpret_cast<dht::DhtRunner*>(r);
303317
auto hash = reinterpret_cast<const dht::InfoHash*>(h);

c/opendht_c.h

+2
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ OPENDHT_C_PUBLIC const char* dht_value_get_user_type(const dht_value* data);
9494
typedef bool (*dht_get_cb)(const dht_value* value, void* user_data);
9595
typedef bool (*dht_value_cb)(const dht_value* value, bool expired, void* user_data);
9696
typedef void (*dht_done_cb)(bool ok, void* user_data);
97+
typedef void (*dht_bootstrap_cb)(bool ok);
9798
typedef void (*dht_shutdown_cb)(void* user_data);
9899

99100
struct OPENDHT_C_PUBLIC dht_op_token;
@@ -139,6 +140,7 @@ OPENDHT_C_PUBLIC void dht_runner_run(dht_runner* runner, in_port_t port);
139140
OPENDHT_C_PUBLIC void dht_runner_run_config(dht_runner* runner, in_port_t port, const dht_runner_config* config);
140141
OPENDHT_C_PUBLIC void dht_runner_ping(dht_runner* runner, struct sockaddr* addr, socklen_t addr_len);
141142
OPENDHT_C_PUBLIC void dht_runner_bootstrap(dht_runner* runner, const char* host, const char* service);
143+
OPENDHT_C_PUBLIC void dht_runner_bootstrap2(dht_runner* r, struct sockaddr *addrs[], socklen_t addrs_len[], dht_bootstrap_cb done_cb);
142144
OPENDHT_C_PUBLIC void dht_runner_get(dht_runner* runner, const dht_infohash* hash, dht_get_cb cb, dht_done_cb done_cb, void* cb_user_data);
143145
OPENDHT_C_PUBLIC dht_op_token* dht_runner_listen(dht_runner* runner, const dht_infohash* hash, dht_value_cb cb, dht_shutdown_cb done_cb, void* cb_user_data);
144146
OPENDHT_C_PUBLIC void dht_runner_cancel_listen(dht_runner* runner, const dht_infohash* hash, dht_op_token* token);

rust/Cargo.toml

+2-1
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,5 @@ edition = "2018"
88

99
[dependencies]
1010
libc="0.2.0"
11-
os_socketaddr="0.1.0"
11+
os_socketaddr="0.1.0"
12+
futures="0.3.4"

rust/examples/dhtnode.rs

+19-26
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,11 @@
1717
*/
1818

1919
extern crate opendht;
20-
use std::{ thread, time };
20+
use std::{thread, time};
2121

2222
use opendht::{ InfoHash, DhtRunner, DhtRunnerConfig, Value };
2323
// use opendht::crypto::*;
24+
use futures::prelude::*;
2425

2526
fn main() {
2627
println!("{}", InfoHash::random());
@@ -39,35 +40,27 @@ fn main() {
3940
//let pk = PrivateKey::import("example.key", "");
4041
//config.set_identity(cert, pk);
4142
dht.run_config(1412, config);
42-
dht.bootstrap("bootstrap.jami.net", 4222);
43-
println!("Current node id: {}", dht.node_id());
43+
futures::executor::block_on(async {
44+
dht.bootstrap("bootstrap.jami.net", 4222);
4445

45-
let /* mut */ data = 42;
46-
let mut get_cb = |v: Box<Value>| {
47-
//data += 1;
48-
println!("GET: VALUE CB - data: {} - v: {}", data, v);
49-
true
50-
};
51-
let mut done_cb = |ok: bool| {
52-
println!("GET: DONE CB - data: {} - ok: {}", data, ok);
53-
};
46+
println!("Current node id: {}", dht.node_id());
5447

55-
dht.get(&InfoHash::get("alice"), &mut get_cb, &mut done_cb);
48+
let mut stream = dht.get_async(&InfoHash::get("bob"));
5649

57-
let mut put_done_cb = |ok: bool| {
58-
println!("PUT: DONE CB - data: {} - ok: {}", data, ok);
59-
};
60-
dht.put(&InfoHash::get("bob"), Value::new("hi!"), &mut put_done_cb, false);
50+
while let Ok(Some(value)) = stream.try_next().await {
51+
println!("GOT: VALUE - value: {}", value);
52+
}
6153

54+
dht.put_async(&InfoHash::get("bob"), Value::new("hi!"), false).await;
55+
56+
println!("Start listening /foo (sleep 10s)");
57+
let mut stream = dht.listen_async(&InfoHash::get("foo"));
58+
let one_min = time::Duration::from_secs(10);
59+
thread::sleep(one_min);
60+
while let Some((v, expired)) = stream.next().await {
61+
println!("LISTEN: DONE CB - v: {} - expired: {}", v, expired);
62+
}
63+
});
6264

63-
println!("Start listening /foo");
64-
let mut value_cb = |v, expired| {
65-
println!("LISTEN: DONE CB - data: {} - v: {} - expired: {}", data, v, expired);
66-
true
67-
};
68-
let token = dht.listen(&InfoHash::get("foo"), &mut value_cb);
69-
let one_min = time::Duration::from_secs(10);
70-
thread::sleep(one_min);
71-
dht.cancel_listen(&InfoHash::get("foo"), token);
7265
println!("Public ips: {:#?}", dht.public_addresses());
7366
}

rust/src/dhtrunner.rs

+138-2
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ use std::ptr;
2525
pub use crate::ffi::*;
2626
use std::net::SocketAddr;
2727
use os_socketaddr::OsSocketAddr;
28+
use futures::prelude::*;
29+
use futures::channel::mpsc;
2830

2931
impl DhtRunnerConfig {
3032

@@ -114,13 +116,37 @@ extern fn get_handler_cb(v: *mut Value, ptr: *mut c_void) -> bool {
114116
}
115117
}
116118

119+
extern fn get_async_handler_cb(v: *mut Value, ptr: *mut c_void) -> bool {
120+
if ptr.is_null() {
121+
return true;
122+
}
123+
let f = unsafe {
124+
let tx = ptr as *mut mpsc::UnboundedSender<Option<std::io::Result<Box<Value>>>>;
125+
(*tx).send(Some(Ok((*v).boxed())))
126+
};
127+
futures::executor::block_on(f).is_ok()
128+
}
129+
117130
extern fn done_handler_cb(ok: bool, ptr: *mut c_void) {
118131
unsafe {
119132
let handler = Box::from_raw(ptr as *mut GetHandler);
120133
(*handler.done_cb)(ok)
121134
}
122135
}
123136

137+
extern fn done_async_handler_cb(ok: bool, ptr: *mut c_void) {
138+
if ptr.is_null() {
139+
return;
140+
}
141+
142+
let mut tx = unsafe {
143+
let ptr = ptr as *mut mpsc::UnboundedSender<Option<std::io::Result<Box<Value>>>>;
144+
Box::from_raw(ptr)
145+
};
146+
let item = if ok { None } else { Some(Err(std::io::Error::new(std::io::ErrorKind::Other, "get failed"))) };
147+
let _ = futures::executor::block_on((*tx).send(item));
148+
}
149+
124150
struct PutHandler<'a>
125151
{
126152
done_cb: &'a mut(dyn FnMut(bool))
@@ -165,6 +191,14 @@ extern fn listen_handler_done(ptr: *mut c_void) {
165191
}
166192
}
167193

194+
struct VerboseDrop<'a, T>(T, &'a str);
195+
196+
impl<'a, T> VerboseDrop<'a, T> {
197+
fn drop(&mut self) {
198+
println!("{}", self.1);
199+
}
200+
}
201+
168202
impl DhtRunner {
169203
pub fn new() -> Box<DhtRunner> {
170204
unsafe {
@@ -187,11 +221,30 @@ impl DhtRunner {
187221
pub fn bootstrap(&mut self, host: &str, service: u16) {
188222
unsafe {
189223
dht_runner_bootstrap(&mut *self,
190-
CString::new(host).unwrap().as_ptr(),
191-
CString::new(service.to_string()).unwrap().as_ptr())
224+
CString::new(host).unwrap().as_ptr(),
225+
CString::new(service.to_string()).unwrap().as_ptr())
192226
}
193227
}
194228

229+
pub async fn bootstrap_async<A: Iterator<Item=SocketAddr>>(&mut self, addrs: A) -> std::io::Result<bool> {
230+
let socks: Vec<OsSocketAddr> = addrs.map(|a| a.into()).collect();
231+
let sizes: Vec<libc::socklen_t> = socks.iter().map(|s| s.len()).collect();
232+
233+
let (tx, rx) = futures::channel::oneshot::channel();
234+
235+
let done_cb = Box::new(move |success| tx.send(success));
236+
let handler = Box::into_raw(done_cb) as *mut c_void;
237+
238+
unsafe {
239+
dht_runner_bootstrap2(&mut *self, socks.as_ptr() as *const *const _,
240+
sizes.as_ptr() as *const *const _, handler);
241+
}
242+
243+
let success = rx.await.expect("bootstrap_async() sender was dropped unexpectedly");
244+
245+
Ok(success)
246+
}
247+
195248
pub fn node_id(&self) -> InfoHash {
196249
unsafe {
197250
dht_runner_get_node_id(&*self)
@@ -217,6 +270,19 @@ impl DhtRunner {
217270
}
218271
}
219272

273+
pub fn get_async(&mut self, h: &InfoHash)
274+
-> impl TryStream<Ok=Box<Value>, Error=std::io::Error> + Unpin {
275+
let (tx, rx) = mpsc::unbounded();
276+
let tx = Box::new(tx);
277+
let tx = Box::into_raw(tx) as *mut c_void;
278+
279+
unsafe {
280+
dht_runner_get(&mut *self, h, get_async_handler_cb, done_async_handler_cb, tx)
281+
}
282+
rx.take_while(|item: &Option<_>| futures::future::ready(item.is_some()))
283+
.filter_map(|item| futures::future::ready(item))
284+
}
285+
220286
pub fn put<'a>(&mut self, h: &InfoHash, v: Box<Value>,
221287
done_cb: &'a mut(dyn FnMut(bool)), permanent: bool) {
222288
let handler = Box::new(PutHandler {
@@ -228,6 +294,21 @@ impl DhtRunner {
228294
}
229295
}
230296

297+
pub async fn put_async(&mut self, h: &InfoHash, v: Box<Value>, permanent: bool) -> bool {
298+
let (tx, rx) = futures::channel::oneshot::channel();
299+
let mut tx = Some(tx);
300+
301+
let mut done_cb = move |success| {
302+
if let Some(tx) = tx.take() {
303+
tx.send(success).expect("put_async() receiver was dropped unexpectedly");
304+
}
305+
};
306+
307+
self.put(h, v, &mut done_cb, permanent);
308+
309+
rx.await.expect("put_async() sender was dropped unexpectedly")
310+
}
311+
231312
pub fn put_signed<'a>(&mut self, h: &InfoHash, v: Box<Value>,
232313
done_cb: &'a mut(dyn FnMut(bool)), permanent: bool) {
233314
let handler = Box::new(PutHandler {
@@ -239,6 +320,21 @@ impl DhtRunner {
239320
}
240321
}
241322

323+
pub async fn put_signed_async(&mut self, h: &InfoHash, v: Box<Value>, permanent: bool) -> bool {
324+
let (tx, rx) = futures::channel::oneshot::channel();
325+
let mut tx = Some(tx);
326+
327+
let mut done_cb = move |success| {
328+
if let Some(tx) = tx.take() {
329+
tx.send(success).expect("put_signed_async() receiver was dropped unexpectedly");
330+
}
331+
};
332+
333+
self.put_signed(h, v, &mut done_cb, permanent);
334+
335+
rx.await.expect("put_signed_async() sender was dropped unexpectedly")
336+
}
337+
242338
pub fn put_encrypted<'a>(&mut self, h: &InfoHash, to: &InfoHash, v: Box<Value>,
243339
done_cb: &'a mut(dyn FnMut(bool)), permanent: bool) {
244340
let handler = Box::new(PutHandler {
@@ -250,6 +346,22 @@ impl DhtRunner {
250346
}
251347
}
252348

349+
pub async fn put_encrypted_async(&mut self, h: &InfoHash, to: &InfoHash, v: Box<Value>,
350+
permanent: bool) -> bool {
351+
let (tx, rx) = futures::channel::oneshot::channel();
352+
let mut tx = Some(tx);
353+
354+
let mut done_cb = move |success| {
355+
if let Some(tx) = tx.take() {
356+
tx.send(success).expect("put_encrypted_async() receiver was dropped unexpectedly");
357+
}
358+
};
359+
360+
self.put_encrypted(h, to, v, &mut done_cb, permanent);
361+
362+
rx.await.expect("put_encrypted_async() sender was dropped unexpectedly")
363+
}
364+
253365
pub fn cancel_put<'a>(&mut self, h: &InfoHash, vid: u64) {
254366
unsafe {
255367
dht_runner_cancel_put(&mut *self, h, vid)
@@ -267,6 +379,20 @@ impl DhtRunner {
267379
}
268380
}
269381

382+
pub fn listen_async(&mut self, h: &InfoHash)
383+
-> impl Stream<Item=(Box<Value>, bool)> + Unpin
384+
{
385+
let (mut tx, rx) = mpsc::unbounded();
386+
387+
let mut value_cb = move |v, expired| {
388+
futures::executor::block_on(tx.send((v, expired))).is_ok()
389+
};
390+
391+
let _token = self.listen(h, &mut value_cb);
392+
393+
return Box::pin(rx);
394+
}
395+
270396
pub fn cancel_listen(&mut self, h: &InfoHash, token: Box<OpToken>) {
271397
unsafe {
272398
dht_runner_cancel_listen(&mut *self, h, &*token)
@@ -282,6 +408,16 @@ impl DhtRunner {
282408
}
283409
}
284410

411+
pub async fn shutdown_async<'a>(&'a mut self) -> bool {
412+
let (tx, rx) = futures::channel::oneshot::channel();
413+
let tx = Box::new(tx);
414+
let ptr = Box::into_raw(tx) as *mut c_void;
415+
416+
self.shutdown(done_async_handler_cb, ptr);
417+
418+
rx.await.expect("shutdown_async() sender was dropped unexpectedly")
419+
}
420+
285421
pub fn public_addresses(&self) -> Vec<SocketAddr> {
286422
let mut result = Vec::new();
287423
unsafe {

rust/src/ffi.rs

+3
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,9 @@ extern {
189189
pub fn dht_runner_run(dht: *mut DhtRunner, port: in_port_t);
190190
pub fn dht_runner_run_config(dht: *mut DhtRunner, port: in_port_t, config: *const DhtRunnerConfig);
191191
pub fn dht_runner_bootstrap(dht: *mut DhtRunner, host: *const c_char, service: *const c_char);
192+
pub fn dht_runner_bootstrap2(dht: *mut DhtRunner, addrs: *const *const OsSocketAddr,
193+
addr_lens: *const *const libc::socklen_t,
194+
done_cb: *mut c_void);
192195
pub fn dht_runner_get(dht: *mut DhtRunner, h: *const InfoHash,
193196
get_cb: extern fn(*mut Value, *mut c_void) -> bool,
194197
done_cb: extern fn(bool, *mut c_void),

0 commit comments

Comments
 (0)