Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support TLS in ApiServer #1565

Merged
merged 2 commits into from
Sep 21, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 55 additions & 30 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ serde_json = "1"
slog = { version = "~2.3", features = ["max_level_trace", "release_max_level_trace"] }
tokio = "0.1.7"
tokio-core = "0.1.17"
tokio-tls = "0.2"
native-tls = "0.2"
http = "0.1.5"
hyper-tls = "0.3"
futures = "0.1.21"
url = "1.7.0"

Expand Down
4 changes: 3 additions & 1 deletion api/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use http::uri::{InvalidUri, Uri};
use hyper::header::{ACCEPT, USER_AGENT};
use hyper::rt::{Future, Stream};
use hyper::{Body, Client, Request};
use hyper_tls;
use serde::{Deserialize, Serialize};
use serde_json;

Expand Down Expand Up @@ -165,7 +166,8 @@ where
}

fn send_request_async(req: Request<Body>) -> Box<Future<Item = String, Error = Error> + Send> {
let client = Client::new();
let https = hyper_tls::HttpsConnector::new(1).unwrap();
let client = Client::builder().build::<_, Body>(https);
Box::new(
client
.request(req)
Expand Down
104 changes: 12 additions & 92 deletions api/src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,13 @@
// limitations under the License.

use std::collections::HashMap;
use std::fmt::Debug;
use std::net::SocketAddr;
use std::sync::{Arc, RwLock, Weak};
use std::thread;

use failure::ResultExt;
use futures::future::{err, ok};
use futures::{Future, Stream};
use hyper::{Body, Request, Response, StatusCode};
use serde::{Deserialize, Serialize};
use serde_json;
use futures::future::ok;
use futures::Future;
use hyper::{Body, Request, StatusCode};

use chain;
use core::core::hash::{Hash, Hashed};
Expand All @@ -40,6 +36,7 @@ use url::form_urlencoded;
use util;
use util::secp::pedersen::Commitment;
use util::LOGGER;
use web::*;

// All handlers use `Weak` references instead of `Arc` to avoid cycles that
// can never be destroyed. These 2 functions are simple helpers to reduce the
Expand Down Expand Up @@ -788,60 +785,6 @@ impl Handler for PoolPushHandler {
}
}

// Utility to serialize a struct into JSON and produce a sensible Response
// out of it.
fn json_response<T>(s: &T) -> ResponseFuture
where
T: Serialize,
{
match serde_json::to_string(s) {
Ok(json) => response(StatusCode::OK, json),
Err(_) => response(StatusCode::INTERNAL_SERVER_ERROR, ""),
}
}

fn result_to_response<T>(res: Result<T, Error>) -> ResponseFuture
where
T: Serialize,
{
match res {
Ok(s) => json_response_pretty(&s),
Err(e) => match e.kind() {
ErrorKind::Argument(msg) => response(StatusCode::BAD_REQUEST, msg.clone()),
ErrorKind::RequestError(msg) => response(StatusCode::BAD_REQUEST, msg.clone()),
ErrorKind::NotFound => response(StatusCode::NOT_FOUND, ""),
ErrorKind::Internal(msg) => response(StatusCode::INTERNAL_SERVER_ERROR, msg.clone()),
ErrorKind::ResponseError(msg) => {
response(StatusCode::INTERNAL_SERVER_ERROR, msg.clone())
}
},
}
}

// pretty-printed version of above
fn json_response_pretty<T>(s: &T) -> ResponseFuture
where
T: Serialize,
{
match serde_json::to_string_pretty(s) {
Ok(json) => response(StatusCode::OK, json),
Err(e) => response(
StatusCode::INTERNAL_SERVER_ERROR,
format!("can't create json response: {}", e),
),
}
}

fn response<T: Into<Body> + Debug>(status: StatusCode, text: T) -> ResponseFuture {
Box::new(ok(just_response(status, text)))
}

fn just_response<T: Into<Body> + Debug>(status: StatusCode, text: T) -> Response<Body> {
let mut resp = Response::new(text.into());
*resp.status_mut() = status;
resp
}

/// Start all server HTTP handlers. Register all of them with Router
/// and runs the corresponding HTTP server.
///
Expand All @@ -855,37 +798,14 @@ pub fn start_rest_apis(
chain: Weak<chain::Chain>,
tx_pool: Weak<RwLock<pool::TransactionPool>>,
peers: Weak<p2p::Peers>,
) {
let _ = thread::Builder::new()
.name("apis".to_string())
.spawn(move || {
let mut apis = ApiServer::new();

let router = build_router(chain, tx_pool, peers).expect("unable to build API router");

info!(LOGGER, "Starting HTTP API server at {}.", addr);
let socket_addr: SocketAddr = addr.parse().expect("unable to parse socket address");
apis.start(socket_addr, router).unwrap_or_else(|e| {
error!(LOGGER, "Failed to start API HTTP server: {}.", e);
});
});
}

fn parse_body<T>(req: Request<Body>) -> Box<Future<Item = T, Error = Error> + Send>
where
for<'de> T: Deserialize<'de> + Send + 'static,
{
Box::new(
req.into_body()
.concat2()
.map_err(|e| ErrorKind::RequestError(format!("Failed to read request: {}", e)).into())
.and_then(|body| match serde_json::from_reader(&body.to_vec()[..]) {
Ok(obj) => ok(obj),
Err(e) => {
err(ErrorKind::RequestError(format!("Invalid request body: {}", e)).into())
}
}),
)
) -> bool {
let mut apis = ApiServer::new();

let router = build_router(chain, tx_pool, peers).expect("unable to build API router");

info!(LOGGER, "Starting HTTP API server at {}.", addr);
let socket_addr: SocketAddr = addr.parse().expect("unable to parse socket address");
apis.start(socket_addr, router)
}

pub fn build_router(
Expand Down
5 changes: 5 additions & 0 deletions api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ extern crate failure;
#[macro_use]
extern crate failure_derive;
extern crate hyper;
extern crate hyper_tls;
#[macro_use]
extern crate lazy_static;
extern crate regex;
Expand All @@ -35,16 +36,20 @@ extern crate serde_json;
extern crate slog;
extern crate futures;
extern crate http;
extern crate native_tls;
extern crate tokio;
extern crate tokio_core;
extern crate tokio_tls;

pub mod client;
mod handlers;
mod rest;
mod router;
mod types;
mod web;

pub use handlers::start_rest_apis;
pub use rest::*;
pub use router::*;
pub use types::*;
pub use web::*;
126 changes: 104 additions & 22 deletions api/src/rest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,19 @@
//! To use it, just have your service(s) implement the ApiEndpoint trait and
//! register them on a ApiServer.

use failure::{Backtrace, Context, Fail};
use futures::sync::oneshot;
use futures::Stream;
use hyper::rt::Future;
use hyper::server::conn::Http;
use hyper::{rt, Body, Request, Server};
use native_tls::{Identity, TlsAcceptor};
use router::{Handler, HandlerObj, ResponseFuture, Router};
use std::fmt::{self, Display};
use std::net::SocketAddr;

use failure::{Backtrace, Context, Fail};
use std::{io, thread};
use tokio::net::TcpListener;
use tokio_tls;
use util::LOGGER;

/// Errors that can be returned by an ApiEndpoint implementation.
Expand Down Expand Up @@ -83,37 +89,113 @@ impl From<Context<ErrorKind>> for Error {
}
}

/// TLS config
pub struct TLSConfig {
pub pkcs_bytes: Vec<u8>,
pub pass: String,
}

/// HTTP server allowing the registration of ApiEndpoint implementations.
pub struct ApiServer {}
pub struct ApiServer {
shutdown_sender: Option<oneshot::Sender<()>>,
}

impl ApiServer {
/// Creates a new ApiServer that will serve ApiEndpoint implementations
/// under the root URL.
pub fn new() -> ApiServer {
ApiServer {}
ApiServer {
shutdown_sender: None,
}
}

/// Starts the ApiServer at the provided address.
pub fn start(&mut self, addr: SocketAddr, router: Router) -> Result<(), String> {
let server = Server::bind(&addr)
.serve(router)
.map_err(|e| eprintln!("server error: {}", e));

//let mut rt = Runtime::new().unwrap();
//if rt.block_on(server).is_err() {
// return Err("tokio block_on error".to_owned());
//}
rt::run(server);
Ok(())
pub fn start(&mut self, addr: SocketAddr, router: Router) -> bool {
if self.shutdown_sender.is_some() {
error!(LOGGER, "Can't start HTTP API server, it's running already");
return false;
}
let (tx, _rx) = oneshot::channel::<()>();
let _ = thread::Builder::new()
.name("apis".to_string())
.spawn(move || {
let server = Server::bind(&addr)
.serve(router)
// TODO graceful shutdown is unstable, investigate
//.with_graceful_shutdown(rx)
.map_err(|e| eprintln!("HTTP API server error: {}", e));

rt::run(server);
});

info!(LOGGER, "HTTP API server has been started");
self.shutdown_sender = Some(tx);
true
}

/// Starts the TLS ApiServer at the provided address.
/// TODO support stop operation
pub fn start_tls(&mut self, addr: SocketAddr, router: Router, conf: TLSConfig) -> bool {
if self.shutdown_sender.is_some() {
error!(LOGGER, "Can't start HTTP API server, it's running already");
return false;
}
let _ = thread::Builder::new()
.name("apis".to_string())
.spawn(move || {
let cert = Identity::from_pkcs12(conf.pkcs_bytes.as_slice(), &conf.pass).unwrap();
let tls_cx = TlsAcceptor::builder(cert).build().unwrap();
let tls_cx = tokio_tls::TlsAcceptor::from(tls_cx);
let srv = TcpListener::bind(&addr).expect("Error binding local port");
// Use lower lever hyper API to be able to intercept client connection
let server = Http::new()
.serve_incoming(
srv.incoming().and_then(move |socket| {
tls_cx
.accept(socket)
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))
}),
router,
)
.then(|res| match res {
Ok(conn) => Ok(Some(conn)),
Err(e) => {
eprintln!("Error: {}", e);
Ok(None)
}
})
.for_each(|conn_opt| {
if let Some(conn) = conn_opt {
rt::spawn(
conn.and_then(|c| c.map_err(|e| panic!("Hyper error {}", e)))
.map_err(|e| eprintln!("Connection error {}", e)),
);
}
Ok(())
});

rt::run(server);
});

info!(LOGGER, "HTTPS API server has been started");
true
}

/// Stops the API server
pub fn stop(&mut self) {
// TODO implement proper stop, the following method doesn't
// work for current_thread runtime.
// if let Some(rt) = self.rt.take() {
// rt.shutdown_now().wait().unwrap();
// }
/// Stops the API server, it panics in case of error
pub fn stop(&mut self) -> bool {
if self.shutdown_sender.is_some() {
// TODO re-enable stop after investigation
//let tx = mem::replace(&mut self.shutdown_sender, None).unwrap();
//tx.send(()).expect("Failed to stop API server");
info!(LOGGER, "API server has been stoped");
true
} else {
error!(
LOGGER,
"Can't stop API server, it's not running or doesn't spport stop operation"
);
false
}
}
}

Expand Down
Loading