Skip to content
This repository was archived by the owner on Jul 5, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
246 changes: 36 additions & 210 deletions prover/src/bin/prover_rpcd.rs
Original file line number Diff line number Diff line change
@@ -1,223 +1,24 @@
use env_logger::Env;
use hyper::body::Buf;
use hyper::body::HttpBody;
use hyper::header::HeaderValue;
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Method, Request, Response, Server, StatusCode};
use std::env::var;

use prover::server::serve;
use prover::shared_state::SharedState;
use prover::structs::*;

/// sets default headers for CORS requests
fn set_headers(headers: &mut hyper::HeaderMap, extended: bool) {
headers.insert("content-type", HeaderValue::from_static("application/json"));
headers.insert("access-control-allow-origin", HeaderValue::from_static("*"));

if extended {
headers.insert(
"access-control-allow-methods",
HeaderValue::from_static("post, get, options"),
);
headers.insert(
"access-control-allow-headers",
HeaderValue::from_static("origin, content-type, accept, x-requested-with"),
);
headers.insert("access-control-max-age", HeaderValue::from_static("300"));
}
}

async fn handle_request(
shared_state: SharedState,
req: Request<Body>,
) -> Result<Response<Body>, hyper::Error> {
{
// limits the request size
const MAX_BODY_SIZE: u64 = 1 << 20;
let response_content_length = match req.body().size_hint().upper() {
Some(v) => v,
None => MAX_BODY_SIZE + 1,
};

if response_content_length > MAX_BODY_SIZE {
let mut resp = Response::new(Body::from("request too large"));
*resp.status_mut() = StatusCode::BAD_REQUEST;
return Ok(resp);
}
}

match (req.method(), req.uri().path()) {
(&Method::GET, "/health") => {
// nothing to report yet - healthy by default
let mut resp = Response::default();
set_headers(resp.headers_mut(), false);
Ok(resp)
}

// returns http 200 if busy else 204.
// can be used programmatically for e.g. shutting down the instance if no workis being
// done.
(&Method::GET, "/status") => {
let rw = shared_state.rw.lock().await;
let is_busy = rw.pending_tasks > 0 || rw.tasks.iter().any(|e| e.result.is_none());
drop(rw);

let mut resp = Response::default();
*resp.status_mut() = match is_busy {
false => StatusCode::NO_CONTENT,
true => StatusCode::OK,
};
set_headers(resp.headers_mut(), false);
Ok(resp)
}

// json-rpc
(&Method::POST, "/") => {
let body_bytes = hyper::body::aggregate(req.into_body())
.await
.unwrap()
.reader();
let json_req: Result<JsonRpcRequest<Vec<serde_json::Value>>, serde_json::Error> =
serde_json::from_reader(body_bytes);

if let Err(err) = json_req {
let payload = serde_json::to_vec(&JsonRpcResponseError {
jsonrpc: "2.0".to_string(),
id: 0.into(),
error: JsonRpcError {
// parser error
code: -32700,
message: err.to_string(),
},
})
.unwrap();
let mut resp = Response::new(Body::from(payload));
set_headers(resp.headers_mut(), false);
return Ok(resp);
}

let json_req = json_req.unwrap();
let result: Result<serde_json::Value, String> =
handle_method(json_req.method.as_str(), &json_req.params, &shared_state).await;
let payload = match result {
Err(err) => {
serde_json::to_vec(&JsonRpcResponseError {
jsonrpc: "2.0".to_string(),
id: json_req.id,
error: JsonRpcError {
// internal server error
code: -32000,
message: err,
},
})
}
Ok(val) => serde_json::to_vec(&JsonRpcResponse {
jsonrpc: "2.0".to_string(),
id: json_req.id,
result: val,
}),
};
let mut resp = Response::new(Body::from(payload.unwrap()));
set_headers(resp.headers_mut(), false);
Ok(resp)
}

// serve CORS headers
(&Method::OPTIONS, "/") => {
let mut resp = Response::default();
set_headers(resp.headers_mut(), true);
Ok(resp)
}

// everything else
_ => {
let mut not_found = Response::default();
*not_found.status_mut() = StatusCode::NOT_FOUND;
Ok(not_found)
}
}
}

async fn handle_method(
method: &str,
params: &[serde_json::Value],
shared_state: &SharedState,
) -> Result<serde_json::Value, String> {
match method {
// enqueues a task for computating proof for any given block
"proof" => {
let options = params.get(0).ok_or("expected struct ProofRequestOptions")?;
let options: ProofRequestOptions =
serde_json::from_value(options.to_owned()).map_err(|e| e.to_string())?;

shared_state
.get_or_enqueue(&options)
.await
.map(|result| serde_json::to_value(result?).map_err(|e| e.to_string()))
.unwrap_or_else(|| Ok(serde_json::Value::Null))
}
// TODO/TBD: add method to only return the witnesses for a block.
// block table, tx table, etc...
//
// TODO: Add the abilitity to abort the current task.

// the following methods can be used to programmatically
// prune the `tasks` from the list.
"flushAll" => {
shared_state.rw.lock().await.tasks.clear();
Ok(serde_json::Value::Bool(true))
}
"flushPending" => {
shared_state
.rw
.lock()
.await
.tasks
.retain(|e| e.result.is_some());
Ok(serde_json::Value::Bool(true))
}
"flushCompleted" => {
shared_state
.rw
.lock()
.await
.tasks
.retain(|e| e.result.is_none());
Ok(serde_json::Value::Bool(true))
}
_ => Err("this method is not available".to_string()),
}
}

/// This command starts a http/json-rpc server and serves proof oriented
/// methods. Required environment variables:
/// - BIND - the interface address + port combination to accept connections on
/// `[::]:1234`
/// - PARAMS_PATH - a path to a file generated with the gen_params tool
/// - BIND
/// - the interface address + port combination to accept connections on
/// `[::]:1234`
/// - `PROVERD_LOOKUP`
/// - environment variable in the form of HOSTNAME:PORT
#[tokio::main]
async fn main() {
env_logger::Builder::from_env(Env::default().default_filter_or("info")).init();

let addr = var("BIND")
.expect("BIND env var")
.parse::<std::net::SocketAddr>()
.expect("valid socket address");
let shared_state = SharedState::new();

let shared_state = SharedState::default();
{
// start the http server
let ctx = shared_state.clone();
let h1 = tokio::spawn(async move {
let service = make_service_fn(move |_| {
let ctx = ctx.clone();
let service = service_fn(move |req| handle_request(ctx.clone(), req));

async move { Ok::<_, hyper::Error>(service) }
});
let server = Server::bind(&addr).serve(service);
log::info!("Listening on http://{}", addr);
server.await.expect("server should be serving");
});
let h1 = serve(&shared_state, &var("BIND").expect("BIND env var"));

// starts the duty cycle loop
let ctx = shared_state.clone();
Expand All @@ -227,14 +28,39 @@ async fn main() {
.build()
.unwrap();
let h2 = rt.spawn(async move {
let ctx = ctx.clone();
loop {
ctx.duty_cycle().await;
let ctx = ctx.clone();
// enclose this call to catch panics which may
// occur due to network services
let _ = tokio::spawn(async move {
log::debug!("task: duty_cycle");
ctx.duty_cycle().await;
})
.await;
tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
}
});

// this task loop makes sure to merge task results periodically
// even if this instance is busy with proving
let ctx = shared_state.clone();
let h3 = tokio::spawn(async move {
loop {
let ctx = ctx.clone();
// enclose this call to catch panics which may
// occur due to network services
let _ = tokio::spawn(async move {
log::debug!("task: merge_tasks_from_peers");
let _ = ctx.merge_tasks_from_peers().await;
})
.await;
tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
}
});

// wait for all tasks
let _ = tokio::join!(h1, h2);
if tokio::try_join!(h1, h2, h3).is_err() {
panic!("unexpected task error");
}
}
}
89 changes: 89 additions & 0 deletions prover/src/json_rpc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/// Common utilities for json-rpc
use hyper::body::Buf;
use hyper::client::HttpConnector;
use hyper::Body;
use hyper::Request;
use hyper::Uri;

use serde::de::DeserializeOwned;
use serde::Deserialize;
use serde::Serialize;

#[derive(Debug, Serialize, Deserialize)]
pub struct JsonRpcError {
pub code: i32,
pub message: String,
}

#[derive(Debug, Serialize)]
pub struct JsonRpcResponseError {
pub jsonrpc: String,
pub id: serde_json::Value,
pub error: JsonRpcError,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct JsonRpcResponse<T> {
pub jsonrpc: String,
pub id: serde_json::Value,
pub result: Option<T>,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct JsonRpcRequest<T: Serialize> {
pub jsonrpc: String,
pub id: serde_json::Value,
pub method: String,
pub params: T,
}

/// Invokes a `json-rpc` request with a timeout of `timeout` ms for the network
/// and deserialize part.
pub async fn jsonrpc_request_client<T: Serialize + Send + Sync, R: DeserializeOwned>(
timeout: u64,
client: &hyper::Client<HttpConnector>,
uri: &Uri,
method: &str,
params: T,
) -> Result<R, String> {
#[derive(Debug, Deserialize)]
struct JsonRpcResponseInternal<T> {
result: Option<T>,
error: Option<JsonRpcError>,
}

let node_req = Request::post(uri);
let req_obj = JsonRpcRequest {
jsonrpc: "2.0".to_string(),
id: 0.into(),
method: method.to_string(),
params,
};

let node_req = node_req
.header(hyper::header::CONTENT_TYPE, "application/json")
.body(Body::from(serde_json::to_vec(&req_obj).unwrap()))
.unwrap();

log::debug!("jsonrpc_request_client: {} {}", uri, method);

let json = tokio::time::timeout(std::time::Duration::from_millis(timeout), async {
let resp = client.request(node_req).await.unwrap();
let body = hyper::body::aggregate(resp).await.unwrap();
let json: JsonRpcResponseInternal<R> = serde_json::from_reader(body.reader()).unwrap();

json
})
.await
.map_err(|e| e.to_string())?;

if json.error.is_some() {
return Err(json.error.unwrap().message);
}

if json.result.is_none() {
return Err("no result in response".to_string());
}

Ok(json.result.unwrap())
}
2 changes: 2 additions & 0 deletions prover/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
pub mod compute_proof;
pub mod json_rpc;
pub mod server;
pub mod shared_state;
pub mod structs;
Loading