From 0d36acf01b04093527bc1e75e61c27190820230d Mon Sep 17 00:00:00 2001 From: e-max Date: Sat, 9 Feb 2019 20:49:58 +0100 Subject: [PATCH] introduce an async version of stratum server. (#2468) It seems that current approach stops workring after amount of parallel connection exceeds ~500. This PR introduces an 'async' feature which enable asyncronius, tokio based implementation of stratum server. * fix bug with passing current_difficulty in Handler I passed in Handler current_difficulty as an u64, so it was copied and all later changes were ignored. In this commit I pass referece on RwLock with current_difficulty. * fix crash in build_block * improve error handling * print error message on unknown error * replace original server by tokio based one * fixes after review * reduce sleep time in mail loop to 5ms --- Cargo.lock | 1 + servers/Cargo.toml | 1 + servers/src/grin/server.rs | 4 +- servers/src/mining/stratumserver.rs | 1090 ++++++++++++++------------- 4 files changed, 582 insertions(+), 514 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c4df8657ab..4cdbc2bd05 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -894,6 +894,7 @@ dependencies = [ "serde 1.0.81 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 1.0.81 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 1.0.33 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] diff --git a/servers/Cargo.toml b/servers/Cargo.toml index 476b3c670c..7b613a3ab6 100644 --- a/servers/Cargo.toml +++ b/servers/Cargo.toml @@ -24,6 +24,7 @@ serde_json = "1" chrono = "0.4.4" bufstream = "~0.1" jsonrpc-core = "~8.0" +tokio = "0.1.11" grin_api = { path = "../api", version = "1.0.1" } grin_chain = { path = "../chain", version = "1.0.1" } diff --git a/servers/src/grin/server.rs b/servers/src/grin/server.rs index ea2532a77e..bcd404b90b 100644 --- a/servers/src/grin/server.rs +++ b/servers/src/grin/server.rs @@ -302,12 +302,12 @@ impl Server { self.chain.clone(), self.tx_pool.clone(), self.verifier_cache.clone(), + self.state_info.stratum_stats.clone(), ); - let stratum_stats = self.state_info.stratum_stats.clone(); let _ = thread::Builder::new() .name("stratum_server".to_string()) .spawn(move || { - stratum_server.run_loop(stratum_stats, edge_bits as u32, proof_size, sync_state); + stratum_server.run_loop(edge_bits as u32, proof_size, sync_state); }); } diff --git a/servers/src/mining/stratumserver.rs b/servers/src/mining/stratumserver.rs index 439265770b..929c27edf2 100644 --- a/servers/src/mining/stratumserver.rs +++ b/servers/src/mining/stratumserver.rs @@ -13,15 +13,21 @@ // limitations under the License. //! Mining Stratum Server -use crate::util::{Mutex, RwLock}; -use bufstream::BufStream; + +use futures::future::Future; +use futures::stream::Stream; +use tokio::io::AsyncRead; +use tokio::io::{lines, write_all}; +use tokio::net::TcpListener; + +use crate::util::RwLock; use chrono::prelude::Utc; use serde; use serde_json; use serde_json::Value; -use std::error::Error; -use std::io::{BufRead, ErrorKind, Write}; -use std::net::{TcpListener, TcpStream}; +use std::collections::HashMap; +use std::io::BufReader; +use std::net::SocketAddr; use std::sync::Arc; use std::time::{Duration, SystemTime}; use std::{cmp, thread}; @@ -37,6 +43,10 @@ use crate::mining::mine_block; use crate::pool; use crate::util; +use futures::sync::mpsc; + +type Tx = mpsc::UnboundedSender; + // ---------------------------------------- // http://www.jsonrpc.org/specification // RPC Methods @@ -64,6 +74,67 @@ struct RpcError { message: String, } +impl RpcError { + pub fn internal_error() -> Self { + RpcError { + code: 32603, + message: "Internal error".to_owned(), + } + } + pub fn node_is_syncing() -> Self { + RpcError { + code: -32000, + message: "Node is syncing - Please wait".to_owned(), + } + } + pub fn method_not_found() -> Self { + RpcError { + code: -32601, + message: "Method not found".to_owned(), + } + } + pub fn too_late() -> Self { + RpcError { + code: -32503, + message: "Solution submitted too late".to_string(), + } + } + pub fn cannot_validate() -> Self { + RpcError { + code: -32502, + message: "Failed to validate solution".to_string(), + } + } + pub fn too_low_difficulty() -> Self { + RpcError { + code: -32501, + message: "Share rejected due to low difficulty".to_string(), + } + } + pub fn invalid_request() -> Self { + RpcError { + code: -32600, + message: "Invalid Request".to_string(), + } + } +} + +impl From for Value { + fn from(e: RpcError) -> Self { + serde_json::to_value(e).unwrap() + } +} + +impl From for RpcError +where + T: std::error::Error, +{ + fn from(e: T) -> Self { + error!("Received unhandled error: {}", e); + RpcError::internal_error() + } +} + #[derive(Serialize, Deserialize, Debug)] struct LoginParams { login: String, @@ -98,329 +169,141 @@ pub struct WorkerStatus { stale: u64, } -// ---------------------------------------- -// Worker Factory Thread Function - -// Run in a thread. Adds new connections to the workers list -fn accept_workers( - id: String, - address: String, - workers: &mut Arc>>, - stratum_stats: &mut Arc>, -) { - let listener = TcpListener::bind(address).expect("Failed to bind to listen address"); - let mut worker_id: u32 = 0; - for stream in listener.incoming() { - match stream { - Ok(stream) => { - warn!( - "(Server ID: {}) New connection: {}", - id, - stream.peer_addr().unwrap() - ); - stream - .set_nonblocking(true) - .expect("set_nonblocking call failed"); - let worker = Worker::new(worker_id.to_string(), BufStream::new(stream)); - workers.lock().push(worker); - // stats for this worker (worker stat objects are added and updated but never - // removed) - let mut worker_stats = WorkerStats::default(); - worker_stats.is_connected = true; - worker_stats.id = worker_id.to_string(); - worker_stats.pow_difficulty = 1; // XXX TODO - let mut stratum_stats = stratum_stats.write(); - stratum_stats.worker_stats.push(worker_stats); - worker_id = worker_id + 1; - } - Err(e) => { - warn!("(Server ID: {}) Error accepting connection: {:?}", id, e); - } - } - } - // close the socket server - drop(listener); -} - -// ---------------------------------------- -// Worker Object - a connected stratum client - a miner, pool, proxy, etc... - -pub struct Worker { +struct Handler { id: String, - agent: String, - login: Option, - stream: BufStream, - error: bool, - authenticated: bool, -} - -impl Worker { - /// Creates a new Stratum Worker. - pub fn new(id: String, stream: BufStream) -> Worker { - Worker { - id: id, - agent: String::from(""), - login: None, - stream: stream, - error: false, - authenticated: false, - } - } - - // Get Message from the worker - fn read_message(&mut self, line: &mut String) -> Option { - // Read and return a single message or None - match self.stream.read_line(line) { - Ok(n) => { - return Some(n); - } - Err(ref e) if e.kind() == ErrorKind::WouldBlock => { - // Not an error, just no messages ready - return None; - } - Err(e) => { - warn!( - "(Server ID: {}) Error in connection with stratum client: {}", - self.id, e - ); - self.error = true; - return None; - } - } - } - - // Send Message to the worker - fn write_message(&mut self, mut message: String) { - // Write and Flush the message - if !message.ends_with("\n") { - message += "\n"; - } - match self.stream.write(message.as_bytes()) { - Ok(_) => match self.stream.flush() { - Ok(_) => {} - Err(e) => { - warn!( - "(Server ID: {}) Error in connection with stratum client: {}", - self.id, e - ); - self.error = true; - } - }, - Err(e) => { - warn!( - "(Server ID: {}) Error in connection with stratum client: {}", - self.id, e - ); - self.error = true; - return; - } - } - } -} // impl Worker - -// ---------------------------------------- -// Grin Stratum Server - -pub struct StratumServer { - id: String, - config: StratumServerConfig, - chain: Arc, - tx_pool: Arc>, - verifier_cache: Arc>, - current_block_versions: Vec, - current_difficulty: u64, - minimum_share_difficulty: u64, - current_key_id: Option, - workers: Arc>>, + workers: Arc, + current_block_versions: Arc>>, sync_state: Arc, + minimum_share_difficulty: Arc>, + current_key_id: Arc>>, + current_difficulty: Arc>, + chain: Arc, } -impl StratumServer { - /// Creates a new Stratum Server. +impl Handler { pub fn new( - config: StratumServerConfig, + id: String, + workers: Arc, + current_block_versions: Arc>>, + sync_state: Arc, + minimum_share_difficulty: Arc>, + current_key_id: Arc>>, + current_difficulty: Arc>, chain: Arc, - tx_pool: Arc>, - verifier_cache: Arc>, - ) -> StratumServer { - StratumServer { - id: String::from("0"), - minimum_share_difficulty: config.minimum_share_difficulty, - config, + ) -> Self { + Handler { + id, + workers, + current_block_versions, + sync_state, + minimum_share_difficulty, + current_key_id, + current_difficulty, chain, - tx_pool, - verifier_cache, - current_block_versions: Vec::new(), - current_difficulty: ::max_value(), - current_key_id: None, - workers: Arc::new(Mutex::new(Vec::new())), - sync_state: Arc::new(SyncState::new()), } } + pub fn from_stratum(stratum: &StratumServer) -> Self { + Handler::new( + stratum.id.clone(), + stratum.workers.clone(), + stratum.current_block_versions.clone(), + stratum.sync_state.clone(), + stratum.minimum_share_difficulty.clone(), + stratum.current_key_id.clone(), + stratum.current_difficulty.clone(), + stratum.chain.clone(), + ) + } + fn handle_rpc_requests(&self, request: RpcRequest, worker_id: usize) -> String { + self.workers.last_seen(worker_id); + + // Call the handler function for requested method + let response = match request.method.as_str() { + "login" => self.handle_login(request.params, worker_id), + "submit" => { + let res = self.handle_submit(request.params, worker_id); + // this key_id has been used now, reset + if let Ok((_, true)) = res { + let mut current_key_id = self.current_key_id.write(); + *current_key_id = None; + } + res.map(|(v, _)| v) + } + "keepalive" => self.handle_keepalive(), + "getjobtemplate" => { + if self.sync_state.is_syncing() { + Err(RpcError::node_is_syncing()) + } else { + self.handle_getjobtemplate() + } + } + "status" => self.handle_status(worker_id), + _ => { + // Called undefined method + Err(RpcError::method_not_found()) + } + }; - // Build and return a JobTemplate for mining the current block - fn build_block_template(&self) -> JobTemplate { - let bh = self.current_block_versions.last().unwrap().header.clone(); - // Serialize the block header into pre and post nonce strings - let mut header_buf = vec![]; - { - let mut writer = ser::BinWriter::new(&mut header_buf); - bh.write_pre_pow(&mut writer).unwrap(); - bh.pow.write_pre_pow(bh.version, &mut writer).unwrap(); - } - let pre_pow = util::to_hex(header_buf); - let job_template = JobTemplate { - height: bh.height, - job_id: (self.current_block_versions.len() - 1) as u64, - difficulty: self.minimum_share_difficulty, - pre_pow, + // Package the reply as RpcResponse json + let resp = match response { + Err(rpc_error) => RpcResponse { + id: request.id, + jsonrpc: String::from("2.0"), + method: request.method, + result: None, + error: Some(rpc_error.into()), + }, + Ok(response) => RpcResponse { + id: request.id, + jsonrpc: String::from("2.0"), + method: request.method, + result: Some(response), + error: None, + }, }; - return job_template; + serde_json::to_string(&resp).unwrap() + } + fn handle_login(&self, params: Option, worker_id: usize) -> Result { + let params: LoginParams = parse_params(params)?; + let mut workers = self.workers.workers_list.write(); + let worker = workers + .get_mut(&worker_id) + .ok_or(RpcError::internal_error())?; + worker.login = Some(params.login); + // XXX TODO Future - Validate password? + worker.agent = params.agent; + worker.authenticated = true; + return Ok("ok".into()); } - // Handle an RPC request message from the worker(s) - fn handle_rpc_requests(&mut self, stratum_stats: &mut Arc>) { - let mut workers_l = self.workers.lock(); - let mut the_message = String::with_capacity(4096); - for num in 0..workers_l.len() { - match workers_l[num].read_message(&mut the_message) { - Some(_) => { - // Decompose the request from the JSONRpc wrapper - let request: RpcRequest = match serde_json::from_str(&the_message) { - Ok(request) => request, - Err(e) => { - // not a valid JSON RpcRequest - disconnect the worker - warn!( - "(Server ID: {}) Failed to parse JSONRpc: {} - {:?}", - self.id, - e.description(), - the_message.as_bytes(), - ); - workers_l[num].error = true; - the_message.clear(); - continue; - } - }; - - the_message.clear(); - - let mut stratum_stats = stratum_stats.write(); - let worker_stats_id = match stratum_stats - .worker_stats - .iter() - .position(|r| r.id == workers_l[num].id) - { - Some(id) => id, - None => continue, - }; - stratum_stats.worker_stats[worker_stats_id].last_seen = SystemTime::now(); - - // Call the handler function for requested method - let response = match request.method.as_str() { - "login" => { - if self.current_block_versions.is_empty() { - continue; - } - stratum_stats.worker_stats[worker_stats_id].initial_block_height = - self.current_block_versions.last().unwrap().header.height; - self.handle_login(request.params, &mut workers_l[num]) - } - "submit" => { - let res = self.handle_submit( - request.params, - &mut workers_l[num], - &mut stratum_stats.worker_stats[worker_stats_id], - ); - // this key_id has been used now, reset - if let Ok((_, true)) = res { - self.current_key_id = None; - } - res.map(|(v, _)| v) - } - "keepalive" => self.handle_keepalive(), - "getjobtemplate" => { - if self.sync_state.is_syncing() { - let e = RpcError { - code: -32000, - message: "Node is syncing - Please wait".to_string(), - }; - Err(serde_json::to_value(e).unwrap()) - } else { - self.handle_getjobtemplate() - } - } - "status" => { - self.handle_status(&mut stratum_stats.worker_stats[worker_stats_id]) - } - _ => { - // Called undefined method - let e = RpcError { - code: -32601, - message: "Method not found".to_string(), - }; - Err(serde_json::to_value(e).unwrap()) - } - }; - - let id = request.id.clone(); - // Package the reply as RpcResponse json - let resp = match response { - Err(response) => RpcResponse { - id: id, - jsonrpc: String::from("2.0"), - method: request.method, - result: None, - error: Some(response), - }, - Ok(response) => RpcResponse { - id: id, - jsonrpc: String::from("2.0"), - method: request.method, - result: Some(response), - error: None, - }, - }; - if let Ok(rpc_response) = serde_json::to_string(&resp) { - // Send the reply - workers_l[num].write_message(rpc_response); - } else { - warn!("handle_rpc_requests: failed responding to {:?}", request.id); - }; - } - None => {} // No message for us from this worker - } - } + // Handle KEEPALIVE message + fn handle_keepalive(&self) -> Result { + return Ok("ok".into()); } - // Handle STATUS message - fn handle_status(&self, worker_stats: &mut WorkerStats) -> Result { + fn handle_status(&self, worker_id: usize) -> Result { // Return worker status in json for use by a dashboard or healthcheck. let status = WorkerStatus { - id: worker_stats.id.clone(), - height: self.current_block_versions.last().unwrap().header.height, - difficulty: worker_stats.pow_difficulty, - accepted: worker_stats.num_accepted, - rejected: worker_stats.num_rejected, - stale: worker_stats.num_stale, + id: self.workers.stratum_stats.read().worker_stats[worker_id] + .id + .clone(), + height: self + .current_block_versions + .read() + .last() + .unwrap() + .header + .height, + difficulty: self.workers.stratum_stats.read().worker_stats[worker_id].pow_difficulty, + accepted: self.workers.stratum_stats.read().worker_stats[worker_id].num_accepted, + rejected: self.workers.stratum_stats.read().worker_stats[worker_id].num_rejected, + stale: self.workers.stratum_stats.read().worker_stats[worker_id].num_stale, }; - if worker_stats.initial_block_height == 0 { - worker_stats.initial_block_height = status.height; - } - debug!("(Server ID: {}) Status of worker: {} - Share Accepted: {}, Rejected: {}, Stale: {}. Blocks Found: {}/{}", - self.id, - worker_stats.id, - worker_stats.num_accepted, - worker_stats.num_rejected, - worker_stats.num_stale, - worker_stats.num_blocks_found, - status.height - worker_stats.initial_block_height, - ); let response = serde_json::to_value(&status).unwrap(); return Ok(response); } - // Handle GETJOBTEMPLATE message - fn handle_getjobtemplate(&self) -> Result { + fn handle_getjobtemplate(&self) -> Result { // Build a JobTemplate from a BlockHeader and return JSON let job_template = self.build_block_template(); let response = serde_json::to_value(&job_template).unwrap(); @@ -431,21 +314,31 @@ impl StratumServer { return Ok(response); } - // Handle KEEPALIVE message - fn handle_keepalive(&self) -> Result { - return Ok(serde_json::to_value("ok".to_string()).unwrap()); - } - - // Handle LOGIN message - fn handle_login(&self, params: Option, worker: &mut Worker) -> Result { - let params: LoginParams = parse_params(params)?; - worker.login = Some(params.login); - // XXX TODO Future - Validate password? - worker.agent = params.agent; - worker.authenticated = true; - return Ok(serde_json::to_value("ok".to_string()).unwrap()); + // Build and return a JobTemplate for mining the current block + fn build_block_template(&self) -> JobTemplate { + let bh = self + .current_block_versions + .read() + .last() + .unwrap() + .header + .clone(); + // Serialize the block header into pre and post nonce strings + let mut header_buf = vec![]; + { + let mut writer = ser::BinWriter::new(&mut header_buf); + bh.write_pre_pow(&mut writer).unwrap(); + bh.pow.write_pre_pow(bh.version, &mut writer).unwrap(); + } + let pre_pow = util::to_hex(header_buf); + let job_template = JobTemplate { + height: bh.height, + job_id: (self.current_block_versions.read().len() - 1) as u64, + difficulty: *self.minimum_share_difficulty.read(), + pre_pow, + }; + return job_template; } - // Handle SUBMIT message // params contains a solved block header // We accept and log valid shares of all difficulty above configured minimum @@ -454,27 +347,30 @@ impl StratumServer { fn handle_submit( &self, params: Option, - worker: &mut Worker, - worker_stats: &mut WorkerStats, - ) -> Result<(Value, bool), Value> { + worker_id: usize, + ) -> Result<(Value, bool), RpcError> { // Validate parameters let params: SubmitParams = parse_params(params)?; + let current_block_versions = self.current_block_versions.read(); // Find the correct version of the block to match this header - let b: Option<&Block> = self.current_block_versions.get(params.job_id as usize); - if params.height != self.current_block_versions.last().unwrap().header.height || b.is_none() + let b: Option<&Block> = current_block_versions.get(params.job_id as usize); + if params.height + != self + .current_block_versions + .read() + .last() + .unwrap() + .header + .height || b.is_none() { // Return error status error!( - "(Server ID: {}) Share at height {}, edge_bits {}, nonce {}, job_id {} submitted too late", - self.id, params.height, params.edge_bits, params.nonce, params.job_id, - ); - worker_stats.num_stale += 1; - let e = RpcError { - code: -32503, - message: "Solution submitted too late".to_string(), - }; - return Err(serde_json::to_value(e).unwrap()); + "(Server ID: {}) Share at height {}, edge_bits {}, nonce {}, job_id {} submitted too late", + self.id, params.height, params.edge_bits, params.nonce, params.job_id, + ); + self.workers.update_stats(worker_id, |ws| ws.num_stale += 1); + return Err(RpcError::too_late()); } let share_difficulty: u64; @@ -489,109 +385,115 @@ impl StratumServer { if !b.header.pow.is_primary() && !b.header.pow.is_secondary() { // Return error status error!( - "(Server ID: {}) Failed to validate solution at height {}, hash {}, edge_bits {}, nonce {}, job_id {}: cuckoo size too small", - self.id, params.height, b.hash(), params.edge_bits, params.nonce, params.job_id, - ); - worker_stats.num_rejected += 1; - let e = RpcError { - code: -32502, - message: "Failed to validate solution".to_string(), - }; - return Err(serde_json::to_value(e).unwrap()); + "(Server ID: {}) Failed to validate solution at height {}, hash {}, edge_bits {}, nonce {}, job_id {}: cuckoo size too small", + self.id, params.height, b.hash(), params.edge_bits, params.nonce, params.job_id, + ); + self.workers + .update_stats(worker_id, |worker_stats| worker_stats.num_rejected += 1); + return Err(RpcError::cannot_validate()); } // Get share difficulty share_difficulty = b.header.pow.to_difficulty(b.header.height).to_num(); // If the difficulty is too low its an error - if share_difficulty < self.minimum_share_difficulty { + if share_difficulty < *self.minimum_share_difficulty.read() { // Return error status error!( - "(Server ID: {}) Share at height {}, hash {}, edge_bits {}, nonce {}, job_id {} rejected due to low difficulty: {}/{}", - self.id, params.height, b.hash(), params.edge_bits, params.nonce, params.job_id, share_difficulty, self.minimum_share_difficulty, - ); - worker_stats.num_rejected += 1; - let e = RpcError { - code: -32501, - message: "Share rejected due to low difficulty".to_string(), - }; - return Err(serde_json::to_value(e).unwrap()); + "(Server ID: {}) Share at height {}, hash {}, edge_bits {}, nonce {}, job_id {} rejected due to low difficulty: {}/{}", + self.id, params.height, b.hash(), params.edge_bits, params.nonce, params.job_id, share_difficulty, *self.minimum_share_difficulty.read(), + ); + self.workers + .update_stats(worker_id, |worker_stats| worker_stats.num_rejected += 1); + return Err(RpcError::too_low_difficulty()); } + // If the difficulty is high enough, submit it (which also validates it) - if share_difficulty >= self.current_difficulty { + if share_difficulty >= *self.current_difficulty.read() { // This is a full solution, submit it to the network let res = self.chain.process_block(b.clone(), chain::Options::MINE); if let Err(e) = res { // Return error status error!( - "(Server ID: {}) Failed to validate solution at height {}, hash {}, edge_bits {}, nonce {}, job_id {}, {}: {}", - self.id, - params.height, - b.hash(), - params.edge_bits, - params.nonce, - params.job_id, - e, - e.backtrace().unwrap(), - ); - worker_stats.num_rejected += 1; - let e = RpcError { - code: -32502, - message: "Failed to validate solution".to_string(), - }; - return Err(serde_json::to_value(e).unwrap()); + "(Server ID: {}) Failed to validate solution at height {}, hash {}, edge_bits {}, nonce {}, job_id {}, {}: {}", + self.id, + params.height, + b.hash(), + params.edge_bits, + params.nonce, + params.job_id, + e, + e.backtrace().unwrap(), + ); + self.workers + .update_stats(worker_id, |worker_stats| worker_stats.num_rejected += 1); + return Err(RpcError::cannot_validate()); } share_is_block = true; - worker_stats.num_blocks_found += 1; + self.workers + .update_stats(worker_id, |worker_stats| worker_stats.num_blocks_found += 1); // Log message to make it obvious we found a block warn!( - "(Server ID: {}) Solution Found for block {}, hash {} - Yay!!! Worker ID: {}, blocks found: {}, shares: {}", - self.id, params.height, - b.hash(), - worker_stats.id, - worker_stats.num_blocks_found, - worker_stats.num_accepted, - ); + "(Server ID: {}) Solution Found for block {}, hash {} - Yay!!! Worker ID: {}, blocks found: {}, shares: {}", + self.id, params.height, + b.hash(), + self.workers.stratum_stats.read().worker_stats[worker_id].id, + self.workers.stratum_stats.read().worker_stats[worker_id].num_blocks_found, + self.workers.stratum_stats.read().worker_stats[worker_id].num_accepted, + ); } else { // Do some validation but dont submit let res = pow::verify_size(&b.header); if !res.is_ok() { // Return error status error!( - "(Server ID: {}) Failed to validate share at height {}, hash {}, edge_bits {}, nonce {}, job_id {}. {:?}", - self.id, - params.height, - b.hash(), - params.edge_bits, - b.header.pow.nonce, - params.job_id, - res, - ); - worker_stats.num_rejected += 1; - let e = RpcError { - code: -32502, - message: "Failed to validate solution".to_string(), - }; - return Err(serde_json::to_value(e).unwrap()); + "(Server ID: {}) Failed to validate share at height {}, hash {}, edge_bits {}, nonce {}, job_id {}. {:?}", + self.id, + params.height, + b.hash(), + params.edge_bits, + b.header.pow.nonce, + params.job_id, + res, + ); + self.workers + .update_stats(worker_id, |worker_stats| worker_stats.num_rejected += 1); + return Err(RpcError::cannot_validate()); } } // Log this as a valid share - let submitted_by = match worker.login.clone() { - None => worker.id.to_string(), + let submitted_by = match self + .workers + .workers_list + .read() + .get(&worker_id) + .unwrap() + .login + .clone() + { + None => self + .workers + .workers_list + .read() + .get(&worker_id) + .unwrap() + .id + .to_string(), Some(login) => login.clone(), }; info!( - "(Server ID: {}) Got share at height {}, hash {}, edge_bits {}, nonce {}, job_id {}, difficulty {}/{}, submitted by {}", - self.id, - b.header.height, - b.hash(), - b.header.pow.proof.edge_bits, - b.header.pow.nonce, - params.job_id, - share_difficulty, - self.current_difficulty, - submitted_by, - ); - worker_stats.num_accepted += 1; + "(Server ID: {}) Got share at height {}, hash {}, edge_bits {}, nonce {}, job_id {}, difficulty {}/{}, submitted by {}", + self.id, + b.header.height, + b.hash(), + b.header.pow.proof.edge_bits, + b.header.pow.nonce, + params.job_id, + share_difficulty, + *self.current_difficulty.read(), + submitted_by, + ); + self.workers + .update_stats(worker_id, |worker_stats| worker_stats.num_accepted += 1); let submit_response; if share_is_block { submit_response = format!("blockfound - {}", b.hash().to_hex()); @@ -603,40 +505,215 @@ impl StratumServer { share_is_block, )); } // handle submit a solution +} - // Purge dead/sick workers - remove all workers marked in error state - fn clean_workers(&mut self, stratum_stats: &mut Arc>) -> usize { - let mut start = 0; - let mut workers_l = self.workers.lock(); - loop { - for num in start..workers_l.len() { - if workers_l[num].error == true { - warn!( - "(Server ID: {}) Dropping worker: {}", - self.id, workers_l[num].id - ); - // Update worker stats - let mut stratum_stats = stratum_stats.write(); - let worker_stats_id = stratum_stats - .worker_stats - .iter() - .position(|r| r.id == workers_l[num].id) - .unwrap(); - stratum_stats.worker_stats[worker_stats_id].is_connected = false; - // Remove the dead worker - workers_l.remove(num); - break; - } - start = num + 1; - } - if start >= workers_l.len() { - let mut stratum_stats = stratum_stats.write(); - stratum_stats.num_workers = workers_l.len(); - return stratum_stats.num_workers; - } +// ---------------------------------------- +// Worker Factory Thread Function +fn accept_connections(listen_addr: SocketAddr, handler: Handler) { + info!("Start tokio stratum server"); + let listener = TcpListener::bind(&listen_addr).expect(&format!( + "Stratum: Failed to bind to listen address {}", + listen_addr + )); + let handler = Arc::new(handler); + let server = listener + .incoming() + .for_each(move |socket| { + // Spawn a task to process the connection + let (tx, rx) = mpsc::unbounded(); + + let worker_id = handler.workers.add_worker(tx); + info!("Worker {} connected", worker_id); + + let (reader, writer) = socket.split(); + let reader = BufReader::new(reader); + let h = handler.clone(); + let workers = h.workers.clone(); + let input = lines(reader) + .for_each(move |line| { + let request = serde_json::from_str(&line)?; + let resp = h.handle_rpc_requests(request, worker_id); + workers.send_to(worker_id, resp); + Ok(()) + }) + .map_err(|e| error!("error {}", e)); + + let output = rx.fold(writer, |writer, s| { + let s2 = s + "\n"; + write_all(writer, s2.into_bytes()) + .map(|(writer, _)| writer) + .map_err(|e| error!("cannot send {}", e)) + }); + + let workers = handler.workers.clone(); + let both = output.map(|_| ()).select(input); + tokio::spawn(both.then(move |_| { + workers.remove_worker(worker_id); + info!("Worker {} disconnected", worker_id); + Ok(()) + })); + + Ok(()) + }) + .map_err(|err| { + error!("accept error = {:?}", err); + }); + tokio::run(server.map(|_| ()).map_err(|_| ())); +} + +// ---------------------------------------- +// Worker Object - a connected stratum client - a miner, pool, proxy, etc... + +pub struct Worker { + id: usize, + agent: String, + login: Option, + authenticated: bool, + tx: Tx, +} + +impl Worker { + /// Creates a new Stratum Worker. + pub fn new(id: usize, tx: Tx) -> Worker { + Worker { + id: id, + agent: String::from(""), + login: None, + authenticated: false, + tx: tx, + } + } +} // impl Worker + +struct WorkersList { + workers_list: Arc>>, + stratum_stats: Arc>, +} + +impl WorkersList { + pub fn new(stratum_stats: Arc>) -> Self { + WorkersList { + workers_list: Arc::new(RwLock::new(HashMap::new())), + stratum_stats: stratum_stats, + } + } + + pub fn add_worker(&self, tx: Tx) -> usize { + let mut stratum_stats = self.stratum_stats.write(); + let worker_id = stratum_stats.worker_stats.len(); + let worker = Worker::new(worker_id, tx); + let mut workers_list = self.workers_list.write(); + workers_list.insert(worker_id, worker); + + let mut worker_stats = WorkerStats::default(); + worker_stats.is_connected = true; + worker_stats.id = worker_id.to_string(); + worker_stats.pow_difficulty = 1; // XXX TODO + stratum_stats.worker_stats.push(worker_stats); + stratum_stats.num_workers = workers_list.len(); + worker_id + } + pub fn remove_worker(&self, worker_id: usize) { + self.update_stats(worker_id, |ws| ws.is_connected = false); + self.workers_list + .write() + .remove(&worker_id) + .expect("Stratum: no such addr in map"); + self.stratum_stats.write().num_workers = self.workers_list.read().len(); + } + + pub fn last_seen(&self, worker_id: usize) { + //self.stratum_stats.write().worker_stats[worker_id].last_seen = SystemTime::now(); + self.update_stats(worker_id, |ws| ws.last_seen = SystemTime::now()); + } + + pub fn update_stats(&self, worker_id: usize, f: impl FnOnce(&mut WorkerStats) -> ()) { + let mut stratum_stats = self.stratum_stats.write(); + f(&mut stratum_stats.worker_stats[worker_id]); + } + + pub fn send_to(&self, worker_id: usize, msg: String) { + self.workers_list + .read() + .get(&worker_id) + .unwrap() + .tx + .unbounded_send(msg); + } + pub fn count(&self) -> usize { + self.workers_list.read().len() + } +} + +// ---------------------------------------- +// Grin Stratum Server + +pub struct StratumServer { + id: String, + config: StratumServerConfig, + chain: Arc, + tx_pool: Arc>, + verifier_cache: Arc>, + current_block_versions: Arc>>, + current_difficulty: Arc>, + minimum_share_difficulty: Arc>, + current_key_id: Arc>>, + workers: Arc, + sync_state: Arc, + stratum_stats: Arc>, +} + +impl StratumServer { + /// Creates a new Stratum Server. + pub fn new( + config: StratumServerConfig, + chain: Arc, + tx_pool: Arc>, + verifier_cache: Arc>, + stratum_stats: Arc>, + ) -> StratumServer { + StratumServer { + id: String::from("0"), + minimum_share_difficulty: Arc::new(RwLock::new(config.minimum_share_difficulty)), + config, + chain, + tx_pool, + verifier_cache, + current_block_versions: Arc::new(RwLock::new(Vec::new())), + current_difficulty: Arc::new(RwLock::new(::max_value())), + current_key_id: Arc::new(RwLock::new(None)), + workers: Arc::new(WorkersList::new(stratum_stats.clone())), + sync_state: Arc::new(SyncState::new()), + stratum_stats: stratum_stats, } } + // Build and return a JobTemplate for mining the current block + fn build_block_template(&self) -> JobTemplate { + let bh = self + .current_block_versions + .read() + .last() + .unwrap() + .header + .clone(); + // Serialize the block header into pre and post nonce strings + let mut header_buf = vec![]; + { + let mut writer = ser::BinWriter::new(&mut header_buf); + bh.write_pre_pow(&mut writer).unwrap(); + bh.pow.write_pre_pow(bh.version, &mut writer).unwrap(); + } + let pre_pow = util::to_hex(header_buf); + let job_template = JobTemplate { + height: bh.height, + job_id: (self.current_block_versions.read().len() - 1) as u64, + difficulty: *self.minimum_share_difficulty.read(), + pre_pow, + }; + return job_template; + } + // Broadcast a jobtemplate RpcRequest to all connected workers - no response // expected fn broadcast_job(&mut self) { @@ -656,12 +733,8 @@ impl StratumServer { "(Server ID: {}) sending block {} with id {} to stratum clients", self.id, job_template.height, job_template.job_id, ); - // Push the new block to all connected clients - // NOTE: We do not give a unique nonce (should we?) so miners need - // to choose one for themselves - let mut workers_l = self.workers.lock(); - for num in 0..workers_l.len() { - workers_l[num].write_message(job_request_json.clone()); + for worker in self.workers.workers_list.read().values() { + worker.tx.unbounded_send(job_request_json.clone()); } } @@ -670,13 +743,7 @@ impl StratumServer { /// existing chain anytime required and sending that to the connected /// stratum miner, proxy, or pool, and accepts full solutions to /// be submitted. - pub fn run_loop( - &mut self, - stratum_stats: Arc>, - edge_bits: u32, - proof_size: usize, - sync_state: Arc, - ) { + pub fn run_loop(&mut self, edge_bits: u32, proof_size: usize, sync_state: Arc) { info!( "(Server ID: {}) Starting stratum server with edge_bits = {}, proof_size = {}", self.id, edge_bits, proof_size @@ -691,24 +758,29 @@ impl StratumServer { // iteration, we keep the returned derivation to provide it back when // nothing has changed. We only want to create a key_id for each new block, // and reuse it when we rebuild the current block to add new tx. - let mut num_workers: usize; let mut head = self.chain.head().unwrap(); let mut current_hash = head.prev_block_h; let mut latest_hash; - let listen_addr = self.config.stratum_server_addr.clone().unwrap(); - self.current_block_versions.push(Block::default()); + let listen_addr = self + .config + .stratum_server_addr + .clone() + .unwrap() + .parse() + .expect("Stratum: Incorrect address "); + { + self.current_block_versions.write().push(Block::default()); + } + + let handler = Handler::from_stratum(&self); - // Start a thread to accept new worker connections - let mut workers_th = self.workers.clone(); - let id_th = self.id.clone(); - let mut stats_th = stratum_stats.clone(); let _listener_th = thread::spawn(move || { - accept_workers(id_th, listen_addr, &mut workers_th, &mut stats_th); + accept_connections(listen_addr, handler); }); // We have started { - let mut stratum_stats = stratum_stats.write(); + let mut stratum_stats = self.stratum_stats.write(); stratum_stats.is_running = true; stratum_stats.edge_bits = edge_bits as u16; } @@ -720,19 +792,11 @@ impl StratumServer { // Initial Loop. Waiting node complete syncing while self.sync_state.is_syncing() { - self.clean_workers(&mut stratum_stats.clone()); - - // Handle any messages from the workers - self.handle_rpc_requests(&mut stratum_stats.clone()); - thread::sleep(Duration::from_millis(50)); } // Main Loop loop { - // Remove workers with failed connections - num_workers = self.clean_workers(&mut stratum_stats.clone()); - // get the latest chain state head = self.chain.head().unwrap(); latest_hash = head.last_block_h; @@ -742,69 +806,71 @@ impl StratumServer { // or We are rebuilding the current one to include new transactions // and there is at least one worker connected if (current_hash != latest_hash || Utc::now().timestamp() >= deadline) - && num_workers > 0 + && self.workers.count() > 0 { - let mut wallet_listener_url: Option = None; - if !self.config.burn_reward { - wallet_listener_url = Some(self.config.wallet_listener_url.clone()); - } - // If this is a new block, clear the current_block version history - if current_hash != latest_hash { - self.current_block_versions.clear(); - } - // Build the new block (version) - let (new_block, block_fees) = mine_block::get_block( - &self.chain, - &self.tx_pool, - self.verifier_cache.clone(), - self.current_key_id.clone(), - wallet_listener_url, - ); - self.current_difficulty = - (new_block.header.total_difficulty() - head.total_difficulty).to_num(); - self.current_key_id = block_fees.key_id(); - current_hash = latest_hash; - // set the minimum acceptable share difficulty for this block - self.minimum_share_difficulty = cmp::min( - self.config.minimum_share_difficulty, - self.current_difficulty, - ); - // set a new deadline for rebuilding with fresh transactions - deadline = Utc::now().timestamp() + attempt_time_per_block as i64; - { - let mut stratum_stats = stratum_stats.write(); + let mut current_block_versions = self.current_block_versions.write(); + let mut wallet_listener_url: Option = None; + if !self.config.burn_reward { + wallet_listener_url = Some(self.config.wallet_listener_url.clone()); + } + // If this is a new block, clear the current_block version history + if current_hash != latest_hash { + current_block_versions.clear(); + } + // Build the new block (version) + let (new_block, block_fees) = mine_block::get_block( + &self.chain, + &self.tx_pool, + self.verifier_cache.clone(), + self.current_key_id.read().clone(), + wallet_listener_url, + ); + { + let mut current_difficulty = self.current_difficulty.write(); + *current_difficulty = + (new_block.header.total_difficulty() - head.total_difficulty).to_num(); + } + { + let mut current_key_id = self.current_key_id.write(); + *current_key_id = block_fees.key_id(); + } + current_hash = latest_hash; + { + // set the minimum acceptable share difficulty for this block + let mut minimum_share_difficulty = self.minimum_share_difficulty.write(); + *minimum_share_difficulty = cmp::min( + self.config.minimum_share_difficulty, + *self.current_difficulty.read(), + ); + } + // set a new deadline for rebuilding with fresh transactions + deadline = Utc::now().timestamp() + attempt_time_per_block as i64; + + let mut stratum_stats = self.stratum_stats.write(); stratum_stats.block_height = new_block.header.height; - stratum_stats.network_difficulty = self.current_difficulty; + stratum_stats.network_difficulty = *self.current_difficulty.read(); + + // Add this new block version to our current block map + current_block_versions.push(new_block); } - // Add this new block version to our current block map - self.current_block_versions.push(new_block); // Send this job to all connected workers self.broadcast_job(); } - // Handle any messages from the workers - self.handle_rpc_requests(&mut stratum_stats.clone()); - // sleep before restarting loop - thread::sleep(Duration::from_micros(1)); + thread::sleep(Duration::from_millis(5)); } // Main Loop } // fn run_loop() } // StratumServer // Utility function to parse a JSON RPC parameter object, returning a proper // error if things go wrong. -fn parse_params(params: Option) -> Result +fn parse_params(params: Option) -> Result where for<'de> T: serde::Deserialize<'de>, { params .and_then(|v| serde_json::from_value(v).ok()) - .ok_or_else(|| { - let e = RpcError { - code: -32600, - message: "Invalid Request".to_string(), - }; - serde_json::to_value(e).unwrap() - }) + .ok_or(RpcError::invalid_request()) }