Skip to content

Commit ec95976

Browse files
committed
Add rust async functions
1 parent a39e244 commit ec95976

File tree

6 files changed

+244
-3
lines changed

6 files changed

+244
-3
lines changed

c/opendht.cpp

+18
Original file line numberDiff line numberDiff line change
@@ -298,6 +298,24 @@ void dht_runner_bootstrap(dht_runner* r, const char* host, const char* service)
298298
runner->bootstrap(host);
299299
}
300300

301+
void dht_runner_bootstrap2(dht_runner* r, struct sockaddr *addrs[], socklen_t addrs_len[], dht_done_cb done_cb, void* cb_user_data) {
302+
auto runner = reinterpret_cast<dht::DhtRunner*>(r);
303+
304+
std::vector<dht::SockAddr> sa;
305+
306+
size_t i = 0;
307+
while(addrs != nullptr && addrs[i] != nullptr) {
308+
sa.push_back(dht::SockAddr(addrs[i], addrs_len[i]));
309+
i++;
310+
}
311+
312+
auto cb = [done_cb, cb_user_data](bool success) {
313+
done_cb(success, cb_user_data);
314+
};
315+
316+
runner->bootstrap(sa, cb);
317+
}
318+
301319
void dht_runner_get(dht_runner* r, const dht_infohash* h, dht_get_cb cb, dht_done_cb done_cb, void* cb_user_data) {
302320
auto runner = reinterpret_cast<dht::DhtRunner*>(r);
303321
auto hash = reinterpret_cast<const dht::InfoHash*>(h);

c/opendht_c.h

+2
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ OPENDHT_C_PUBLIC const char* dht_value_get_user_type(const dht_value* data);
9494
typedef bool (*dht_get_cb)(const dht_value* value, void* user_data);
9595
typedef bool (*dht_value_cb)(const dht_value* value, bool expired, void* user_data);
9696
typedef void (*dht_done_cb)(bool ok, void* user_data);
97+
typedef void (*dht_bootstrap_cb)(bool ok);
9798
typedef void (*dht_shutdown_cb)(void* user_data);
9899

99100
struct OPENDHT_C_PUBLIC dht_op_token;
@@ -139,6 +140,7 @@ OPENDHT_C_PUBLIC void dht_runner_run(dht_runner* runner, in_port_t port);
139140
OPENDHT_C_PUBLIC void dht_runner_run_config(dht_runner* runner, in_port_t port, const dht_runner_config* config);
140141
OPENDHT_C_PUBLIC void dht_runner_ping(dht_runner* runner, struct sockaddr* addr, socklen_t addr_len);
141142
OPENDHT_C_PUBLIC void dht_runner_bootstrap(dht_runner* runner, const char* host, const char* service);
143+
OPENDHT_C_PUBLIC void dht_runner_bootstrap2(dht_runner* r, struct sockaddr *addrs[], socklen_t addrs_len[], dht_done_cb done_cb, void* cb_user_data);
142144
OPENDHT_C_PUBLIC void dht_runner_get(dht_runner* runner, const dht_infohash* hash, dht_get_cb cb, dht_done_cb done_cb, void* cb_user_data);
143145
OPENDHT_C_PUBLIC dht_op_token* dht_runner_listen(dht_runner* runner, const dht_infohash* hash, dht_value_cb cb, dht_shutdown_cb done_cb, void* cb_user_data);
144146
OPENDHT_C_PUBLIC void dht_runner_cancel_listen(dht_runner* runner, const dht_infohash* hash, dht_op_token* token);

rust/Cargo.toml

+2-1
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,5 @@ edition = "2018"
88

99
[dependencies]
1010
libc="0.2.0"
11-
os_socketaddr="0.1.0"
11+
os_socketaddr="0.1.0"
12+
futures="0.3.4"

rust/examples/dhtnode_async.rs

+69
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Copyright (C) 2014-2020 Savoir-faire Linux Inc.
3+
* Author: Sébastien Blin <[email protected]>
4+
*
5+
* This program is free software; you can redistribute it and/or modify
6+
* it under the terms of the GNU General Public License as published by
7+
* the Free Software Foundation; either version 3 of the License, or
8+
* (at your option) any later version.
9+
*
10+
* This program is distributed in the hope that it will be useful,
11+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13+
* GNU General Public License for more details.
14+
*
15+
* You should have received a copy of the GNU General Public License
16+
* along with this program. If not, see <https://www.gnu.org/licenses/>.
17+
*/
18+
19+
extern crate opendht;
20+
use std::{thread, time};
21+
22+
use opendht::{ InfoHash, DhtRunner, DhtRunnerConfig, Value };
23+
// use opendht::crypto::*;
24+
use futures::prelude::*;
25+
26+
fn main() {
27+
println!("{}", InfoHash::random());
28+
println!("{}", InfoHash::new());
29+
println!("{}", InfoHash::new().is_zero());
30+
println!("{}", InfoHash::get("alice"));
31+
println!("{}", InfoHash::get("alice").is_zero());
32+
33+
34+
let mut dht = DhtRunner::new();
35+
let /*mut*/ config = DhtRunnerConfig::new();
36+
//// If you want to inject a certificate, uncomment the following lines and previous mut.
37+
//// Note: you can generate a certificate with
38+
//// openssl req -x509 -newkey rsa:4096 -sha256 -days 3650 -nodes -keyout example.key -out example.crt -subj /CN=example.com
39+
//let cert = DhtCertificate::import("example.crt").ok().expect("Invalid cert file");
40+
//let pk = PrivateKey::import("example.key", "");
41+
//config.set_identity(cert, pk);
42+
dht.run_config(1412, config);
43+
use std::net::ToSocketAddrs;
44+
let addrs = "bootstrap.jami.net:4222".to_socket_addrs().unwrap();
45+
46+
futures::executor::block_on(async {
47+
let r = dht.bootstrap_async(addrs).await;
48+
49+
println!("Current node id: {}", dht.node_id());
50+
51+
let mut stream = dht.get_async(&InfoHash::get("bob"));
52+
53+
while let Ok(Some(value)) = stream.try_next().await {
54+
println!("GOT: VALUE - value: {}", value);
55+
}
56+
57+
dht.put_async(&InfoHash::get("bob"), Value::new("hi!"), false).await;
58+
59+
println!("Start listening /foo (sleep 10s)");
60+
let mut stream = dht.listen_async(&InfoHash::get("foo"));
61+
let one_min = time::Duration::from_secs(10);
62+
thread::sleep(one_min);
63+
while let Some((v, expired)) = stream.next().await {
64+
println!("LISTEN: DONE CB - v: {} - expired: {}", v, expired);
65+
}
66+
});
67+
68+
println!("Public ips: {:#?}", dht.public_addresses());
69+
}

rust/src/dhtrunner.rs

+150-2
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ use std::ptr;
2525
pub use crate::ffi::*;
2626
use std::net::SocketAddr;
2727
use os_socketaddr::OsSocketAddr;
28+
use futures::prelude::*;
29+
use futures::channel::mpsc;
2830

2931
impl DhtRunnerConfig {
3032

@@ -114,13 +116,49 @@ extern fn get_handler_cb(v: *mut Value, ptr: *mut c_void) -> bool {
114116
}
115117
}
116118

119+
extern fn get_async_handler_cb(v: *mut Value, ptr: *mut c_void) -> bool {
120+
if ptr.is_null() {
121+
return true;
122+
}
123+
let f = unsafe {
124+
let tx = ptr as *mut mpsc::UnboundedSender<Option<std::io::Result<Box<Value>>>>;
125+
(*tx).send(Some(Ok((*v).boxed())))
126+
};
127+
futures::executor::block_on(f).is_ok()
128+
}
129+
117130
extern fn done_handler_cb(ok: bool, ptr: *mut c_void) {
118131
unsafe {
119132
let handler = Box::from_raw(ptr as *mut GetHandler);
120133
(*handler.done_cb)(ok)
121134
}
122135
}
123136

137+
extern fn done_async_handler_cb(ok: bool, ptr: *mut c_void) {
138+
if ptr.is_null() {
139+
return;
140+
}
141+
142+
let mut tx = unsafe {
143+
let ptr = ptr as *mut mpsc::UnboundedSender<Option<std::io::Result<Box<Value>>>>;
144+
Box::from_raw(ptr)
145+
};
146+
let item = if ok { None } else { Some(Err(std::io::Error::new(std::io::ErrorKind::Other, "get failed"))) };
147+
let _ = futures::executor::block_on((*tx).send(item));
148+
}
149+
150+
extern fn bootstrap_done_async_handler_cb(ok: bool, ptr: *mut c_void) {
151+
if ptr.is_null() {
152+
return;
153+
}
154+
155+
let tx = unsafe {
156+
let ptr = ptr as *mut futures::channel::oneshot::Sender<bool>;
157+
Box::from_raw(ptr)
158+
};
159+
let _ = (*tx).send(ok);
160+
}
161+
124162
struct PutHandler<'a>
125163
{
126164
done_cb: &'a mut(dyn FnMut(bool))
@@ -165,6 +203,14 @@ extern fn listen_handler_done(ptr: *mut c_void) {
165203
}
166204
}
167205

206+
struct VerboseDrop<'a, T>(T, &'a str);
207+
208+
impl<'a, T> VerboseDrop<'a, T> {
209+
fn drop(&mut self) {
210+
println!("{}", self.1);
211+
}
212+
}
213+
168214
impl DhtRunner {
169215
pub fn new() -> Box<DhtRunner> {
170216
unsafe {
@@ -187,11 +233,30 @@ impl DhtRunner {
187233
pub fn bootstrap(&mut self, host: &str, service: u16) {
188234
unsafe {
189235
dht_runner_bootstrap(&mut *self,
190-
CString::new(host).unwrap().as_ptr(),
191-
CString::new(service.to_string()).unwrap().as_ptr())
236+
CString::new(host).unwrap().as_ptr(),
237+
CString::new(service.to_string()).unwrap().as_ptr())
192238
}
193239
}
194240

241+
pub async fn bootstrap_async<A: Iterator<Item=SocketAddr>>(&mut self, addrs: A) -> std::io::Result<bool> {
242+
let socks: Vec<OsSocketAddr> = addrs.map(|a| a.into()).collect();
243+
let sizes: Vec<libc::socklen_t> = socks.iter().map(|s| s.len()).collect();
244+
245+
let (tx, rx) = futures::channel::oneshot::channel();
246+
247+
let tx = Box::new(tx);
248+
let tx = Box::into_raw(tx) as *mut c_void;
249+
250+
unsafe {
251+
dht_runner_bootstrap2(&mut *self, socks.as_ptr() as *const *const _,
252+
sizes.as_ptr() as *const *const _, bootstrap_done_async_handler_cb as *mut c_void, tx);
253+
}
254+
255+
let success = rx.await.expect("bootstrap_async() sender was dropped unexpectedly");
256+
257+
Ok(success)
258+
}
259+
195260
pub fn node_id(&self) -> InfoHash {
196261
unsafe {
197262
dht_runner_get_node_id(&*self)
@@ -217,6 +282,19 @@ impl DhtRunner {
217282
}
218283
}
219284

285+
pub fn get_async(&mut self, h: &InfoHash)
286+
-> impl TryStream<Ok=Box<Value>, Error=std::io::Error> + Unpin {
287+
let (tx, rx) = mpsc::unbounded();
288+
let tx = Box::new(tx);
289+
let tx = Box::into_raw(tx) as *mut c_void;
290+
291+
unsafe {
292+
dht_runner_get(&mut *self, h, get_async_handler_cb, done_async_handler_cb, tx)
293+
}
294+
rx.take_while(|item: &Option<_>| futures::future::ready(item.is_some()))
295+
.filter_map(|item| futures::future::ready(item))
296+
}
297+
220298
pub fn put<'a>(&mut self, h: &InfoHash, v: Box<Value>,
221299
done_cb: &'a mut(dyn FnMut(bool)), permanent: bool) {
222300
let handler = Box::new(PutHandler {
@@ -228,6 +306,21 @@ impl DhtRunner {
228306
}
229307
}
230308

309+
pub async fn put_async(&mut self, h: &InfoHash, v: Box<Value>, permanent: bool) -> bool {
310+
let (tx, rx) = futures::channel::oneshot::channel();
311+
let mut tx = Some(tx);
312+
313+
let mut done_cb = move |success| {
314+
if let Some(tx) = tx.take() {
315+
tx.send(success).expect("put_async() receiver was dropped unexpectedly");
316+
}
317+
};
318+
319+
self.put(h, v, &mut done_cb, permanent);
320+
321+
rx.await.expect("put_async() sender was dropped unexpectedly")
322+
}
323+
231324
pub fn put_signed<'a>(&mut self, h: &InfoHash, v: Box<Value>,
232325
done_cb: &'a mut(dyn FnMut(bool)), permanent: bool) {
233326
let handler = Box::new(PutHandler {
@@ -239,6 +332,21 @@ impl DhtRunner {
239332
}
240333
}
241334

335+
pub async fn put_signed_async(&mut self, h: &InfoHash, v: Box<Value>, permanent: bool) -> bool {
336+
let (tx, rx) = futures::channel::oneshot::channel();
337+
let mut tx = Some(tx);
338+
339+
let mut done_cb = move |success| {
340+
if let Some(tx) = tx.take() {
341+
tx.send(success).expect("put_signed_async() receiver was dropped unexpectedly");
342+
}
343+
};
344+
345+
self.put_signed(h, v, &mut done_cb, permanent);
346+
347+
rx.await.expect("put_signed_async() sender was dropped unexpectedly")
348+
}
349+
242350
pub fn put_encrypted<'a>(&mut self, h: &InfoHash, to: &InfoHash, v: Box<Value>,
243351
done_cb: &'a mut(dyn FnMut(bool)), permanent: bool) {
244352
let handler = Box::new(PutHandler {
@@ -250,6 +358,22 @@ impl DhtRunner {
250358
}
251359
}
252360

361+
pub async fn put_encrypted_async(&mut self, h: &InfoHash, to: &InfoHash, v: Box<Value>,
362+
permanent: bool) -> bool {
363+
let (tx, rx) = futures::channel::oneshot::channel();
364+
let mut tx = Some(tx);
365+
366+
let mut done_cb = move |success| {
367+
if let Some(tx) = tx.take() {
368+
tx.send(success).expect("put_encrypted_async() receiver was dropped unexpectedly");
369+
}
370+
};
371+
372+
self.put_encrypted(h, to, v, &mut done_cb, permanent);
373+
374+
rx.await.expect("put_encrypted_async() sender was dropped unexpectedly")
375+
}
376+
253377
pub fn cancel_put<'a>(&mut self, h: &InfoHash, vid: u64) {
254378
unsafe {
255379
dht_runner_cancel_put(&mut *self, h, vid)
@@ -267,6 +391,20 @@ impl DhtRunner {
267391
}
268392
}
269393

394+
pub fn listen_async(&mut self, h: &InfoHash)
395+
-> impl Stream<Item=(Box<Value>, bool)> + Unpin
396+
{
397+
let (mut tx, rx) = mpsc::unbounded();
398+
399+
let mut value_cb = move |v, expired| {
400+
futures::executor::block_on(tx.send((v, expired))).is_ok()
401+
};
402+
403+
let _token = self.listen(h, &mut value_cb);
404+
405+
return Box::pin(rx);
406+
}
407+
270408
pub fn cancel_listen(&mut self, h: &InfoHash, token: Box<OpToken>) {
271409
unsafe {
272410
dht_runner_cancel_listen(&mut *self, h, &*token)
@@ -282,6 +420,16 @@ impl DhtRunner {
282420
}
283421
}
284422

423+
pub async fn shutdown_async<'a>(&'a mut self) -> bool {
424+
let (tx, rx) = futures::channel::oneshot::channel();
425+
let tx = Box::new(tx);
426+
let ptr = Box::into_raw(tx) as *mut c_void;
427+
428+
self.shutdown(done_async_handler_cb, ptr);
429+
430+
rx.await.expect("shutdown_async() sender was dropped unexpectedly")
431+
}
432+
285433
pub fn public_addresses(&self) -> Vec<SocketAddr> {
286434
let mut result = Vec::new();
287435
unsafe {

rust/src/ffi.rs

+3
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,9 @@ extern {
189189
pub fn dht_runner_run(dht: *mut DhtRunner, port: in_port_t);
190190
pub fn dht_runner_run_config(dht: *mut DhtRunner, port: in_port_t, config: *const DhtRunnerConfig);
191191
pub fn dht_runner_bootstrap(dht: *mut DhtRunner, host: *const c_char, service: *const c_char);
192+
pub fn dht_runner_bootstrap2(dht: *mut DhtRunner, addrs: *const *const OsSocketAddr,
193+
addr_lens: *const *const libc::socklen_t,
194+
done_cb: *mut c_void, cb_user_data: *mut c_void);
192195
pub fn dht_runner_get(dht: *mut DhtRunner, h: *const InfoHash,
193196
get_cb: extern fn(*mut Value, *mut c_void) -> bool,
194197
done_cb: extern fn(bool, *mut c_void),

0 commit comments

Comments
 (0)