diff --git a/Cargo.lock b/Cargo.lock index d9452c07a..63e4eca94 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1177,6 +1177,7 @@ dependencies = [ "blake2-rfc 0.2.18 (registry+https://github.com/rust-lang/crates.io-index)", "byteorder 1.3.4 (registry+https://github.com/rust-lang/crates.io-index)", "chrono 0.4.10 (registry+https://github.com/rust-lang/crates.io-index)", + "crossbeam-utils 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)", "data-encoding 2.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "ed25519-dalek 1.0.0-pre.3 (registry+https://github.com/rust-lang/crates.io-index)", "failure 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/api/src/types.rs b/api/src/types.rs index 8245547e4..49e2622e7 100644 --- a/api/src/types.rs +++ b/api/src/types.rs @@ -128,6 +128,12 @@ impl EncryptedBody { let nonce = from_hex(self.nonce.clone()).context(ErrorKind::APIEncryption( "EncryptedBody Dec: Invalid Nonce".to_string(), ))?; + if nonce.len() < 12 { + return Err(ErrorKind::APIEncryption( + "EncryptedBody Dec: Invalid Nonce length".to_string(), + ) + .into()); + } let mut n = [0u8; 12]; n.copy_from_slice(&nonce[0..12]); let unbound_key = aead::UnboundKey::new(&aead::AES_256_GCM, &dec_key.0).unwrap(); diff --git a/controller/src/controller.rs b/controller/src/controller.rs index 98fea4504..a15a06def 100644 --- a/controller/src/controller.rs +++ b/controller/src/controller.rs @@ -24,10 +24,9 @@ use crate::libwallet::{ use crate::util::secp::key::SecretKey; use crate::util::{from_hex, static_secp_instance, to_base64, Mutex}; use failure::ResultExt; -use futures::future::{err, ok}; -use futures::{Future, Stream}; use grin_wallet_api::JsonId; use grin_wallet_util::OnionV3Address; +use hyper::body; use hyper::header::HeaderValue; use hyper::{Body, Request, Response, StatusCode}; use serde::{Deserialize, Serialize}; @@ -302,8 +301,6 @@ where .map_err(|e| ErrorKind::GenericError(format!("API thread panicked :{:?}", e)).into()) } -type WalletResponseFuture = Box, Error = Error> + Send>; - /// V2 API Handler/Wrapper for owner functions pub struct OwnerAPIHandlerV2 where @@ -317,7 +314,7 @@ where impl OwnerAPIHandlerV2 where - L: WalletLCProvider<'static, C, K>, + L: WalletLCProvider<'static, C, K> + 'static, C: NodeClient + 'static, K: Keychain + 'static, { @@ -328,30 +325,25 @@ where OwnerAPIHandlerV2 { wallet } } - fn call_api( - &self, - req: Request, - api: Owner, - ) -> Box + Send> { - Box::new(parse_body(req).and_then(move |val: serde_json::Value| { - let owner_api = &api as &dyn OwnerRpc; - match owner_api.handle_request(val) { - MaybeReply::Reply(r) => ok(r), - MaybeReply::DontReply => { - // Since it's http, we need to return something. We return [] because jsonrpc - // clients will parse it as an empty batch response. - ok(serde_json::json!([])) - } + async fn call_api(req: Request, api: Owner) -> Result { + let val: serde_json::Value = parse_body(req).await?; + match OwnerRpc::handle_request(&api, val) { + MaybeReply::Reply(r) => Ok(r), + MaybeReply::DontReply => { + // Since it's http, we need to return something. We return [] because jsonrpc + // clients will parse it as an empty batch response. + Ok(serde_json::json!([])) } - })) + } } - fn handle_post_request(&self, req: Request) -> WalletResponseFuture { - let api = Owner::new(self.wallet.clone(), None); - Box::new( - self.call_api(req, api) - .and_then(|resp| ok(json_response_pretty(&resp))), - ) + async fn handle_post_request( + req: Request, + wallet: Arc + 'static>>>, + ) -> Result, Error> { + let api = Owner::new(wallet, None); + let res = Self::call_api(req, api).await?; + Ok(json_response_pretty(&res)) } } @@ -362,18 +354,20 @@ where K: Keychain + 'static, { fn post(&self, req: Request) -> ResponseFuture { - Box::new( - self.handle_post_request(req) - .and_then(|r| ok(r)) - .or_else(|e| { + let wallet = self.wallet.clone(); + Box::pin(async move { + match Self::handle_post_request(req, wallet).await { + Ok(r) => Ok(r), + Err(e) => { error!("Request Error: {:?}", e); - ok(create_error_response(e)) - }), - ) + Ok(create_error_response(e)) + } + } + }) } fn options(&self, _req: Request) -> ResponseFuture { - Box::new(ok(create_ok_response("{}"))) + Box::pin(async { Ok(create_ok_response("{}")) }) } } @@ -634,81 +628,81 @@ where } } - fn call_api( - &self, + async fn call_api( req: Request, + key: Arc>>, + mask: Arc>>, + running_foreign: bool, api: Arc>, - ) -> Box + Send> { - let key = self.shared_key.clone(); - let mask = self.keychain_mask.clone(); - let running_foreign = self.running_foreign; - Box::new(parse_body(req).and_then(move |val: serde_json::Value| { - let mut val = val; - let owner_api_s = &*api as &dyn OwnerRpcS; - let mut is_init_secure_api = OwnerV3Helpers::is_init_secure_api(&val); - let mut was_encrypted = false; - let mut encrypted_req_id = JsonId::StrId(String::from("")); - if !is_init_secure_api { - if let Err(v) = OwnerV3Helpers::check_encryption_started(key.clone()) { - return ok(v); - } - let res = OwnerV3Helpers::decrypt_request(key.clone(), &val); - match res { - Err(e) => return ok(e), - Ok(v) => { - encrypted_req_id = v.0.clone(); - val = v.1; - } + ) -> Result { + let mut val: serde_json::Value = parse_body(req).await?; + let mut is_init_secure_api = OwnerV3Helpers::is_init_secure_api(&val); + let mut was_encrypted = false; + let mut encrypted_req_id = JsonId::StrId(String::from("")); + if !is_init_secure_api { + if let Err(v) = OwnerV3Helpers::check_encryption_started(key.clone()) { + return Ok(v); + } + let res = OwnerV3Helpers::decrypt_request(key.clone(), &val); + match res { + Err(e) => return Ok(e), + Ok(v) => { + encrypted_req_id = v.0.clone(); + val = v.1; } - was_encrypted = true; } - // check again, in case it was an encrypted call to init_secure_api - is_init_secure_api = OwnerV3Helpers::is_init_secure_api(&val); - // also need to intercept open/close wallet requests - let is_open_wallet = OwnerV3Helpers::is_open_wallet(&val); - match owner_api_s.handle_request(val) { - MaybeReply::Reply(mut r) => { - let (_was_error, unencrypted_intercept) = - OwnerV3Helpers::check_error_response(&r.clone()); - if is_open_wallet && running_foreign { - OwnerV3Helpers::update_mask(mask, &r.clone()); - } - if was_encrypted { - let res = OwnerV3Helpers::encrypt_response( - key.clone(), - &encrypted_req_id, - &unencrypted_intercept, - ); - r = match res { - Ok(v) => v, - Err(v) => return ok(v), - } - } - // intercept init_secure_api response (after encryption, - // in case it was an encrypted call to 'init_api_secure') - if is_init_secure_api { - OwnerV3Helpers::update_owner_api_shared_key( - key.clone(), - &unencrypted_intercept, - api.shared_key.lock().clone(), - ); + was_encrypted = true; + } + // check again, in case it was an encrypted call to init_secure_api + is_init_secure_api = OwnerV3Helpers::is_init_secure_api(&val); + // also need to intercept open/close wallet requests + let is_open_wallet = OwnerV3Helpers::is_open_wallet(&val); + match OwnerRpcS::handle_request(&*api, val) { + MaybeReply::Reply(mut r) => { + let (_was_error, unencrypted_intercept) = + OwnerV3Helpers::check_error_response(&r.clone()); + if is_open_wallet && running_foreign { + OwnerV3Helpers::update_mask(mask, &r.clone()); + } + if was_encrypted { + let res = OwnerV3Helpers::encrypt_response( + key.clone(), + &encrypted_req_id, + &unencrypted_intercept, + ); + r = match res { + Ok(v) => v, + Err(v) => return Ok(v), } - ok(r) } - MaybeReply::DontReply => { - // Since it's http, we need to return something. We return [] because jsonrpc - // clients will parse it as an empty batch response. - ok(serde_json::json!([])) + // intercept init_secure_api response (after encryption, + // in case it was an encrypted call to 'init_api_secure') + if is_init_secure_api { + OwnerV3Helpers::update_owner_api_shared_key( + key.clone(), + &unencrypted_intercept, + api.shared_key.lock().clone(), + ); } + Ok(r) + } + MaybeReply::DontReply => { + // Since it's http, we need to return something. We return [] because jsonrpc + // clients will parse it as an empty batch response. + Ok(serde_json::json!([])) } - })) + } } - fn handle_post_request(&self, req: Request) -> WalletResponseFuture { - Box::new( - self.call_api(req, self.owner_api.clone()) - .and_then(|resp| ok(json_response_pretty(&resp))), - ) + async fn handle_post_request( + req: Request, + key: Arc>>, + mask: Arc>>, + running_foreign: bool, + api: Arc>, + ) -> Result, Error> { + let res = Self::call_api(req, key, mask, running_foreign, api).await?; + Ok(json_response_pretty(&res)) } } @@ -719,18 +713,24 @@ where K: Keychain + 'static, { fn post(&self, req: Request) -> ResponseFuture { - Box::new( - self.handle_post_request(req) - .and_then(|r| ok(r)) - .or_else(|e| { + let key = self.shared_key.clone(); + let mask = self.keychain_mask.clone(); + let running_foreign = self.running_foreign; + let api = self.owner_api.clone(); + + Box::pin(async move { + match Self::handle_post_request(req, key, mask, running_foreign, api).await { + Ok(r) => Ok(r), + Err(e) => { error!("Request Error: {:?}", e); - ok(create_error_response(e)) - }), - ) + Ok(create_error_response(e)) + } + } + }) } fn options(&self, _req: Request) -> ResponseFuture { - Box::new(ok(create_ok_response("{}"))) + Box::pin(async { Ok(create_ok_response("{}")) }) } } /// V2 API Handler/Wrapper for foreign functions @@ -763,31 +763,29 @@ where } } - fn call_api( - &self, + async fn call_api( req: Request, api: Foreign<'static, L, C, K>, - ) -> Box + Send> { - Box::new(parse_body(req).and_then(move |val: serde_json::Value| { - let foreign_api = &api as &dyn ForeignRpc; - match foreign_api.handle_request(val) { - MaybeReply::Reply(r) => ok({ r }), - MaybeReply::DontReply => { - // Since it's http, we need to return something. We return [] because jsonrpc - // clients will parse it as an empty batch response. - ok(serde_json::json!([])) - } + ) -> Result { + let val: serde_json::Value = parse_body(req).await?; + match ForeignRpc::handle_request(&api, val) { + MaybeReply::Reply(r) => Ok(r), + MaybeReply::DontReply => { + // Since it's http, we need to return something. We return [] because jsonrpc + // clients will parse it as an empty batch response. + Ok(serde_json::json!([])) } - })) + } } - fn handle_post_request(&self, req: Request) -> WalletResponseFuture { - let mask = self.keychain_mask.lock(); - let api = Foreign::new(self.wallet.clone(), mask.clone(), Some(check_middleware)); - Box::new( - self.call_api(req, api) - .and_then(|resp| ok(json_response_pretty(&resp))), - ) + async fn handle_post_request( + req: Request, + mask: Option, + wallet: Arc + 'static>>>, + ) -> Result, Error> { + let api = Foreign::new(wallet, mask, Some(check_middleware)); + let res = Self::call_api(req, api).await?; + Ok(json_response_pretty(&res)) } } @@ -798,18 +796,22 @@ where K: Keychain + 'static, { fn post(&self, req: Request) -> ResponseFuture { - Box::new( - self.handle_post_request(req) - .and_then(|r| ok(r)) - .or_else(|e| { + let mask = self.keychain_mask.lock().clone(); + let wallet = self.wallet.clone(); + + Box::pin(async move { + match Self::handle_post_request(req, mask, wallet).await { + Ok(v) => Ok(v), + Err(e) => { error!("Request Error: {:?}", e); - ok(create_error_response(e)) - }), - ) + Ok(create_error_response(e)) + } + } + }) } fn options(&self, _req: Request) -> ResponseFuture { - Box::new(ok(create_ok_response("{}"))) + Box::pin(async { Ok(create_ok_response("{}")) }) } } @@ -866,9 +868,7 @@ fn create_ok_response(json: &str) -> Response { /// Whenever the status code is `StatusCode::OK` the text parameter should be /// valid JSON as the content type header will be set to `application/json' fn response>(status: StatusCode, text: T) -> Response { - let mut builder = &mut Response::builder(); - - builder = builder + let mut builder = Response::builder() .status(status) .header("access-control-allow-origin", "*") .header( @@ -883,19 +883,14 @@ fn response>(status: StatusCode, text: T) -> Response { builder.body(text.into()).unwrap() } -fn parse_body(req: Request) -> Box + Send> +async fn parse_body(req: Request) -> Result where for<'de> T: Deserialize<'de> + Send + 'static, { - Box::new( - req.into_body() - .concat2() - .map_err(|_| ErrorKind::GenericError("Failed to read request".to_owned()).into()) - .and_then(|body| match serde_json::from_reader(&body.to_vec()[..]) { - Ok(obj) => ok(obj), - Err(e) => { - err(ErrorKind::GenericError(format!("Invalid request body: {}", e)).into()) - } - }), - ) + let body = body::to_bytes(req.into_body()) + .await + .map_err(|_| ErrorKind::GenericError("Failed to read request".to_string()))?; + + serde_json::from_reader(&body[..]) + .map_err(|e| ErrorKind::GenericError(format!("Invalid request body: {}", e)).into()) } diff --git a/impls/Cargo.toml b/impls/Cargo.toml index 2e1e55c76..9e0c11093 100644 --- a/impls/Cargo.toml +++ b/impls/Cargo.toml @@ -24,6 +24,7 @@ ring = "0.16" tokio = { version = "0.2", features = ["full"] } uuid = { version = "0.7", features = ["serde", "v4"] } chrono = { version = "0.4.4", features = ["serde"] } +crossbeam-utils = "0.7" #http client (copied from grin) http = "0.1.5" diff --git a/impls/src/client_utils/client.rs b/impls/src/client_utils/client.rs index f71f8bb57..31a57e822 100644 --- a/impls/src/client_utils/client.rs +++ b/impls/src/client_utils/client.rs @@ -15,6 +15,7 @@ //! High level JSON/HTTP client API use crate::util::to_base64; +use crossbeam_utils::thread::scope; use failure::{Backtrace, Context, Fail, ResultExt}; use hyper::body; use hyper::header::{ACCEPT, AUTHORIZATION, CONTENT_TYPE, USER_AGENT}; @@ -39,7 +40,7 @@ pub enum ErrorKind { #[fail(display = "Internal error: {}", _0)] Internal(String), #[fail(display = "Bad arguments: {}", _0)] - Argument(String), + _Argument(String), #[fail(display = "Not found.")] _NotFound, #[fail(display = "Request error: {}", _0)] @@ -323,11 +324,17 @@ impl Client { pub fn send_request(&self, req: Request) -> Result { let task = self.send_request_async(req); - let mut rt = Builder::new() - .basic_scheduler() - .enable_all() - .build() - .context(ErrorKind::Internal("can't create Tokio runtime".to_owned()))?; - rt.block_on(task) + scope(|s| { + let handle = s.spawn(|_| { + let mut rt = Builder::new() + .basic_scheduler() + .enable_all() + .build() + .context(ErrorKind::Internal("can't create Tokio runtime".to_owned()))?; + rt.block_on(task) + }); + handle.join().unwrap() + }) + .unwrap() } } diff --git a/impls/src/node_clients/http.rs b/impls/src/node_clients/http.rs index 6f8478627..52c1cc302 100644 --- a/impls/src/node_clients/http.rs +++ b/impls/src/node_clients/http.rs @@ -17,6 +17,7 @@ use crate::api::{self, LocatedTxKernel, OutputListing, OutputPrintable}; use crate::core::core::{Transaction, TxKernel}; use crate::libwallet::{NodeClient, NodeVersionInfo}; +use crossbeam_utils::thread::scope; use futures::stream::FuturesUnordered; use futures::TryStreamExt; use std::collections::HashMap; @@ -250,12 +251,20 @@ impl NodeClient for HTTPNodeClient { task.try_collect().await }; - let mut rt = Builder::new() - .threaded_scheduler() - .enable_all() - .build() - .unwrap(); - let res: Result, _> = rt.block_on(task); + let res = scope(|s| { + let handle = s.spawn(|_| { + let mut rt = Builder::new() + .threaded_scheduler() + .enable_all() + .build() + .unwrap(); + let res: Result, _> = rt.block_on(task); + res + }); + handle.join().unwrap() + }) + .unwrap(); + let results: Vec = match res { Ok(resps) => { let mut results = vec![];