From 37605210a3f4684bc35a50dfaefe906eccff95e9 Mon Sep 17 00:00:00 2001 From: Max Lavrenov Date: Wed, 9 Jan 2019 11:45:47 +0100 Subject: [PATCH 01/11] introduce an async version of stratum server. It seems that current approach stops workring after amount of parallel connection exceeds ~500. This PR introduces an 'async' feature which enable asyncronius, tokio based implementation of stratum server. --- Cargo.lock | 1 + Cargo.toml | 3 + servers/Cargo.toml | 6 + servers/src/grin/server.rs | 4 +- servers/src/mining.rs | 11 +- .../stratumserver/stratumserver_async.rs | 848 ++++++++++++++++++ .../stratumserver_std.rs} | 25 +- 7 files changed, 881 insertions(+), 17 deletions(-) create mode 100644 servers/src/mining/stratumserver/stratumserver_async.rs rename servers/src/mining/{stratumserver.rs => stratumserver/stratumserver_std.rs} (97%) diff --git a/Cargo.lock b/Cargo.lock index c4df8657ab..4cdbc2bd05 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -894,6 +894,7 @@ dependencies = [ "serde 1.0.81 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 1.0.81 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 1.0.33 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 40b80c2afa..5fea267748 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -54,3 +54,6 @@ tar = "0.4" [dev-dependencies] grin_chain = { path = "./chain", version = "1.0.1" } grin_store = { path = "./store", version = "1.0.1" } + +[features] +async = ['grin_servers/async'] diff --git a/servers/Cargo.toml b/servers/Cargo.toml index 476b3c670c..008b7bd0df 100644 --- a/servers/Cargo.toml +++ b/servers/Cargo.toml @@ -24,6 +24,7 @@ serde_json = "1" chrono = "0.4.4" bufstream = "~0.1" jsonrpc-core = "~8.0" +tokio = { version = "0.1.11", optional = true} grin_api = { path = "../api", version = "1.0.1" } grin_chain = { path = "../chain", version = "1.0.1" } @@ -37,3 +38,8 @@ grin_wallet = { path = "../wallet", version = "1.0.1" } [dev-dependencies] blake2-rfc = "0.2" + +[features] +async = ['tokio'] + + diff --git a/servers/src/grin/server.rs b/servers/src/grin/server.rs index ea2532a77e..bcd404b90b 100644 --- a/servers/src/grin/server.rs +++ b/servers/src/grin/server.rs @@ -302,12 +302,12 @@ impl Server { self.chain.clone(), self.tx_pool.clone(), self.verifier_cache.clone(), + self.state_info.stratum_stats.clone(), ); - let stratum_stats = self.state_info.stratum_stats.clone(); let _ = thread::Builder::new() .name("stratum_server".to_string()) .spawn(move || { - stratum_server.run_loop(stratum_stats, edge_bits as u32, proof_size, sync_state); + stratum_server.run_loop(edge_bits as u32, proof_size, sync_state); }); } diff --git a/servers/src/mining.rs b/servers/src/mining.rs index e07e201760..c60f68ddbf 100644 --- a/servers/src/mining.rs +++ b/servers/src/mining.rs @@ -15,5 +15,14 @@ //! Mining + Mining server mod mine_block; -pub mod stratumserver; +pub mod stratumserver { + #[cfg(not(feature = "async"))] + mod stratumserver_std; + #[cfg(not(feature = "async"))] + pub use self::stratumserver_std::*; + #[cfg(feature = "async")] + mod stratumserver_async; + #[cfg(feature = "async")] + pub use self::stratumserver_async::*; +} pub mod test_miner; diff --git a/servers/src/mining/stratumserver/stratumserver_async.rs b/servers/src/mining/stratumserver/stratumserver_async.rs new file mode 100644 index 0000000000..af727ea5bf --- /dev/null +++ b/servers/src/mining/stratumserver/stratumserver_async.rs @@ -0,0 +1,848 @@ +// Copyright 2018 The Grin Developers +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Mining Stratum Server + +use futures::future::Future; +use futures::stream::Stream; +use tokio::io::AsyncRead; +use tokio::io::{lines, write_all}; +use tokio::net::TcpListener; + +use crate::util::RwLock; +use chrono::prelude::Utc; +use serde; +use serde_json; +use serde_json::Value; +use std::io::BufReader; +//use std::net::{TcpListener, TcpStream}; +use std::collections::HashMap; +use std::net::SocketAddr; +use std::sync::Arc; +use std::time::{Duration, SystemTime}; +use std::{cmp, thread}; + +use crate::chain; +use crate::common::stats::{StratumStats, WorkerStats}; +use crate::common::types::{StratumServerConfig, SyncState}; +use crate::core::core::verifier_cache::VerifierCache; +use crate::core::core::Block; +use crate::core::{pow, ser}; +use crate::keychain; +use crate::mining::mine_block; +use crate::pool; +use crate::util; + +use futures::sync::mpsc; + +type Tx = mpsc::UnboundedSender; + +// ---------------------------------------- +// http://www.jsonrpc.org/specification +// RPC Methods + +#[derive(Serialize, Deserialize, Debug)] +struct RpcRequest { + id: String, + jsonrpc: String, + method: String, + params: Option, +} + +#[derive(Serialize, Deserialize, Debug)] +struct RpcResponse { + id: String, + jsonrpc: String, + method: String, + result: Option, + error: Option, +} + +#[derive(Serialize, Deserialize, Debug)] +struct RpcError { + code: i32, + message: String, +} + +#[derive(Serialize, Deserialize, Debug)] +struct LoginParams { + login: String, + pass: String, + agent: String, +} + +#[derive(Serialize, Deserialize, Debug)] +struct SubmitParams { + height: u64, + job_id: u64, + nonce: u64, + edge_bits: u32, + pow: Vec, +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct JobTemplate { + height: u64, + job_id: u64, + difficulty: u64, + pre_pow: String, +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct WorkerStatus { + id: String, + height: u64, + difficulty: u64, + accepted: u64, + rejected: u64, + stale: u64, +} + +struct Handler { + id: String, + workers: Arc, + current_block_versions: Arc>>, + sync_state: Arc, + minimum_share_difficulty: u64, + current_key_id: Arc>>, + current_difficulty: u64, + chain: Arc, +} + +impl Handler { + pub fn new( + id: String, + workers: Arc, + current_block_versions: Arc>>, + sync_state: Arc, + minimum_share_difficulty: u64, + current_key_id: Arc>>, + current_difficulty: u64, + chain: Arc, + ) -> Self { + Handler { + id: id, + workers: workers, + current_block_versions: current_block_versions, + sync_state: sync_state, + minimum_share_difficulty: minimum_share_difficulty, + current_key_id: current_key_id, + current_difficulty: current_difficulty, + chain: chain, + } + } + pub fn from_stratum(stratum: &StratumServer) -> Self { + Handler::new( + stratum.id.clone(), + stratum.workers.clone(), + stratum.current_block_versions.clone(), + stratum.sync_state.clone(), + stratum.minimum_share_difficulty, + stratum.current_key_id.clone(), + stratum.current_difficulty, + stratum.chain.clone(), + ) + } + fn handle_rpc_requests(&self, request: RpcRequest, worker_id: usize) -> String { + self.workers.last_seen(worker_id); + + // Call the handler function for requested method + let response = match request.method.as_str() { + "login" => self.handle_login(request.params, worker_id), + "submit" => { + let res = self.handle_submit(request.params, worker_id); + // this key_id has been used now, reset + if let Ok((_, true)) = res { + let mut current_key_id = self.current_key_id.write(); + *current_key_id = None; + } + res.map(|(v, _)| v) + } + "keepalive" => self.handle_keepalive(), + "getjobtemplate" => { + if self.sync_state.is_syncing() { + let e = RpcError { + code: -32000, + message: "Node is syncing - Please wait".to_string(), + }; + Err(serde_json::to_value(e).unwrap()) + } else { + self.handle_getjobtemplate() + } + } + "status" => self.handle_status(worker_id), + _ => { + // Called undefined method + let e = RpcError { + code: -32601, + message: "Method not found".to_string(), + }; + Err(serde_json::to_value(e).unwrap()) + } + }; + + // Package the reply as RpcResponse json + let rpc_response: String; + match response { + Err(response) => { + let resp = RpcResponse { + id: request.id, + jsonrpc: String::from("2.0"), + method: request.method, + result: None, + error: Some(response), + }; + rpc_response = serde_json::to_string(&resp).unwrap(); + } + Ok(response) => { + let resp = RpcResponse { + id: request.id, + jsonrpc: String::from("2.0"), + method: request.method, + result: Some(response), + error: None, + }; + rpc_response = serde_json::to_string(&resp).unwrap(); + } + } + rpc_response + } + fn handle_login(&self, params: Option, worker_id: usize) -> Result { + let params: LoginParams = parse_params(params)?; + let mut workers = self.workers.workers_list.write(); + let worker = workers.get_mut(&worker_id).unwrap(); + worker.login = Some(params.login); + // XXX TODO Future - Validate password? + worker.agent = params.agent; + worker.authenticated = true; + return Ok(serde_json::to_value("ok".to_string()).unwrap()); + } + + // Handle KEEPALIVE message + fn handle_keepalive(&self) -> Result { + return Ok(serde_json::to_value("ok".to_string()).unwrap()); + } + + fn handle_status(&self, worker_id: usize) -> Result { + // Return worker status in json for use by a dashboard or healthcheck. + let status = WorkerStatus { + id: self.workers.stratum_stats.read().worker_stats[worker_id] + .id + .clone(), + height: self + .current_block_versions + .read() + .last() + .unwrap() + .header + .height, + difficulty: self.workers.stratum_stats.read().worker_stats[worker_id].pow_difficulty, + accepted: self.workers.stratum_stats.read().worker_stats[worker_id].num_accepted, + rejected: self.workers.stratum_stats.read().worker_stats[worker_id].num_rejected, + stale: self.workers.stratum_stats.read().worker_stats[worker_id].num_stale, + }; + let response = serde_json::to_value(&status).unwrap(); + return Ok(response); + } + // Handle GETJOBTEMPLATE message + fn handle_getjobtemplate(&self) -> Result { + // Build a JobTemplate from a BlockHeader and return JSON + let job_template = self.build_block_template(); + let response = serde_json::to_value(&job_template).unwrap(); + debug!( + "(Server ID: {}) sending block {} with id {} to single worker", + self.id, job_template.height, job_template.job_id, + ); + return Ok(response); + } + + // Build and return a JobTemplate for mining the current block + fn build_block_template(&self) -> JobTemplate { + let bh = self + .current_block_versions + .read() + .last() + .unwrap() + .header + .clone(); + // Serialize the block header into pre and post nonce strings + let mut header_buf = vec![]; + { + let mut writer = ser::BinWriter::new(&mut header_buf); + bh.write_pre_pow(&mut writer).unwrap(); + bh.pow.write_pre_pow(bh.version, &mut writer).unwrap(); + } + let pre_pow = util::to_hex(header_buf); + let job_template = JobTemplate { + height: bh.height, + job_id: (self.current_block_versions.read().len() - 1) as u64, + difficulty: self.minimum_share_difficulty, + pre_pow, + }; + return job_template; + } + // Handle SUBMIT message + // params contains a solved block header + // We accept and log valid shares of all difficulty above configured minimum + // Accepted shares that are full solutions will also be submitted to the + // network + fn handle_submit( + &self, + params: Option, + worker_id: usize, + ) -> Result<(Value, bool), Value> { + // Validate parameters + let params: SubmitParams = parse_params(params)?; + + let current_block_versions = self.current_block_versions.read(); + // Find the correct version of the block to match this header + let b: Option<&Block> = current_block_versions.get(params.job_id as usize); + if params.height + != self + .current_block_versions + .read() + .last() + .unwrap() + .header + .height || b.is_none() + { + // Return error status + error!( + "(Server ID: {}) Share at height {}, edge_bits {}, nonce {}, job_id {} submitted too late", + self.id, params.height, params.edge_bits, params.nonce, params.job_id, + ); + self.workers.update_stats(worker_id, |ws| ws.num_stale += 1); + let e = RpcError { + code: -32503, + message: "Solution submitted too late".to_string(), + }; + return Err(serde_json::to_value(e).unwrap()); + } + + let share_difficulty: u64; + let mut share_is_block = false; + + let mut b: Block = b.unwrap().clone(); + // Reconstruct the blocks header with this nonce and pow added + b.header.pow.proof.edge_bits = params.edge_bits as u8; + b.header.pow.nonce = params.nonce; + b.header.pow.proof.nonces = params.pow; + + if !b.header.pow.is_primary() && !b.header.pow.is_secondary() { + // Return error status + error!( + "(Server ID: {}) Failed to validate solution at height {}, hash {}, edge_bits {}, nonce {}, job_id {}: cuckoo size too small", + self.id, params.height, b.hash(), params.edge_bits, params.nonce, params.job_id, + ); + self.workers + .update_stats(worker_id, |worker_stats| worker_stats.num_rejected += 1); + let e = RpcError { + code: -32502, + message: "Failed to validate solution".to_string(), + }; + return Err(serde_json::to_value(e).unwrap()); + } + + // Get share difficulty + share_difficulty = b.header.pow.to_difficulty(b.header.height).to_num(); + // If the difficulty is too low its an error + if share_difficulty < self.minimum_share_difficulty { + // Return error status + error!( + "(Server ID: {}) Share at height {}, hash {}, edge_bits {}, nonce {}, job_id {} rejected due to low difficulty: {}/{}", + self.id, params.height, b.hash(), params.edge_bits, params.nonce, params.job_id, share_difficulty, self.minimum_share_difficulty, + ); + self.workers + .update_stats(worker_id, |worker_stats| worker_stats.num_rejected += 1); + let e = RpcError { + code: -32501, + message: "Share rejected due to low difficulty".to_string(), + }; + return Err(serde_json::to_value(e).unwrap()); + } + // If the difficulty is high enough, submit it (which also validates it) + if share_difficulty >= self.current_difficulty { + // This is a full solution, submit it to the network + let res = self.chain.process_block(b.clone(), chain::Options::MINE); + if let Err(e) = res { + // Return error status + error!( + "(Server ID: {}) Failed to validate solution at height {}, hash {}, edge_bits {}, nonce {}, job_id {}, {}: {}", + self.id, + params.height, + b.hash(), + params.edge_bits, + params.nonce, + params.job_id, + e, + e.backtrace().unwrap(), + ); + self.workers + .update_stats(worker_id, |worker_stats| worker_stats.num_rejected += 1); + let e = RpcError { + code: -32502, + message: "Failed to validate solution".to_string(), + }; + return Err(serde_json::to_value(e).unwrap()); + } + share_is_block = true; + self.workers + .update_stats(worker_id, |worker_stats| worker_stats.num_blocks_found += 1); + // Log message to make it obvious we found a block + warn!( + "(Server ID: {}) Solution Found for block {}, hash {} - Yay!!! Worker ID: {}, blocks found: {}, shares: {}", + self.id, params.height, + b.hash(), + self.workers.stratum_stats.read().worker_stats[worker_id].id, + self.workers.stratum_stats.read().worker_stats[worker_id].num_blocks_found, + self.workers.stratum_stats.read().worker_stats[worker_id].num_accepted, + ); + } else { + // Do some validation but dont submit + let res = pow::verify_size(&b.header); + if !res.is_ok() { + // Return error status + error!( + "(Server ID: {}) Failed to validate share at height {}, hash {}, edge_bits {}, nonce {}, job_id {}. {:?}", + self.id, + params.height, + b.hash(), + params.edge_bits, + b.header.pow.nonce, + params.job_id, + res, + ); + self.workers + .update_stats(worker_id, |worker_stats| worker_stats.num_rejected += 1); + let e = RpcError { + code: -32502, + message: "Failed to validate solution".to_string(), + }; + return Err(serde_json::to_value(e).unwrap()); + } + } + // Log this as a valid share + let submitted_by = match self + .workers + .workers_list + .read() + .get(&worker_id) + .unwrap() + .login + .clone() + { + None => self + .workers + .workers_list + .read() + .get(&worker_id) + .unwrap() + .id + .to_string(), + Some(login) => login.clone(), + }; + info!( + "(Server ID: {}) Got share at height {}, hash {}, edge_bits {}, nonce {}, job_id {}, difficulty {}/{}, submitted by {}", + self.id, + b.header.height, + b.hash(), + b.header.pow.proof.edge_bits, + b.header.pow.nonce, + params.job_id, + share_difficulty, + self.current_difficulty, + submitted_by, + ); + self.workers + .update_stats(worker_id, |worker_stats| worker_stats.num_accepted += 1); + let submit_response; + if share_is_block { + submit_response = format!("blockfound - {}", b.hash().to_hex()); + } else { + submit_response = "ok".to_string(); + } + return Ok(( + serde_json::to_value(submit_response).unwrap(), + share_is_block, + )); + } // handle submit a solution +} + +// ---------------------------------------- +// Worker Factory Thread Function +fn accept_connections(listen_addr: SocketAddr, handler: Handler) { + let listener = TcpListener::bind(&listen_addr).expect("Failed to bind to listen address"); + let handler = Arc::new(handler); + let server = listener + .incoming() + .for_each(move |socket| { + // Spawn a task to process the connection + let (tx, rx) = mpsc::unbounded(); + + let worker_id = handler.workers.add_worker(tx); + info!("Worker {} connected", worker_id); + + let (reader, writer) = socket.split(); + let reader = BufReader::new(reader); + let h = handler.clone(); + let workers = h.workers.clone(); + let input = lines(reader) + .for_each(move |line| { + let request = serde_json::from_str(&line)?; + let resp = h.handle_rpc_requests(request, worker_id); + workers.send_to(worker_id, resp); + Ok(()) + }) + .map_err(|e| error!("error {}", e)); + + let output = rx.fold(writer, |writer, s| { + let s2 = s + "\n"; + write_all(writer, s2.into_bytes()) + .map(|(writer, _)| writer) + .map_err(|e| error!("cannot send {}", e)) + }); + + let workers = handler.workers.clone(); + let both = output.map(|_| ()).select(input); + tokio::spawn(both.then(move |_| { + workers.remove_worker(worker_id); + info!("Worker {} disconnected", worker_id); + Ok(()) + })); + + Ok(()) + }) + .map_err(|err| { + error!("accept error = {:?}", err); + }); + tokio::run(server.map(|_| ()).map_err(|_| ())); +} + +// ---------------------------------------- +// Worker Object - a connected stratum client - a miner, pool, proxy, etc... + +pub struct Worker { + id: usize, + agent: String, + login: Option, + authenticated: bool, + tx: Tx, +} + +impl Worker { + /// Creates a new Stratum Worker. + pub fn new(id: usize, tx: Tx) -> Worker { + Worker { + id: id, + agent: String::from(""), + login: None, + authenticated: false, + tx: tx, + } + } +} // impl Worker + +struct WorkersList { + workers_list: Arc>>, + stratum_stats: Arc>, +} + +impl WorkersList { + pub fn new(stratum_stats: Arc>) -> Self { + WorkersList { + workers_list: Arc::new(RwLock::new(HashMap::new())), + stratum_stats: stratum_stats, + } + } + + pub fn add_worker(&self, tx: Tx) -> usize { + let mut stratum_stats = self.stratum_stats.write(); + let worker_id = stratum_stats.worker_stats.len(); + let worker = Worker::new(worker_id, tx); + let mut workers_list = self.workers_list.write(); + workers_list.insert(worker_id, worker); + + let mut worker_stats = WorkerStats::default(); + worker_stats.is_connected = true; + worker_stats.id = worker_id.to_string(); + worker_stats.pow_difficulty = 1; // XXX TODO + stratum_stats.worker_stats.push(worker_stats); + stratum_stats.num_workers = workers_list.len(); + worker_id + } + pub fn remove_worker(&self, worker_id: usize) { + self.update_stats(worker_id, |ws| ws.is_connected = false); + self.workers_list + .write() + .remove(&worker_id) + .expect("no such addr in map"); + self.stratum_stats.write().num_workers = self.workers_list.read().len(); + } + + //pub fn get_stats(&self, worker_id: usize) -> WorkerStats { + //let stats = self.stratum_stats.read().worker_stats; + //stats[worker_id] + //} + pub fn last_seen(&self, worker_id: usize) { + //self.stratum_stats.write().worker_stats[worker_id].last_seen = SystemTime::now(); + self.update_stats(worker_id, |ws| ws.last_seen = SystemTime::now()); + } + + pub fn update_stats(&self, worker_id: usize, f: impl FnOnce(&mut WorkerStats) -> ()) { + let mut stratum_stats = self.stratum_stats.write(); + f(&mut stratum_stats.worker_stats[worker_id]); + } + + pub fn send_to(&self, worker_id: usize, msg: String) { + self.workers_list + .read() + .get(&worker_id) + .unwrap() + .tx + .unbounded_send(msg); + } + pub fn count(&self) -> usize { + self.workers_list.read().len() + } +} + +// ---------------------------------------- +// Grin Stratum Server + +pub struct StratumServer { + id: String, + config: StratumServerConfig, + chain: Arc, + tx_pool: Arc>, + verifier_cache: Arc>, + current_block_versions: Arc>>, + current_difficulty: u64, + minimum_share_difficulty: u64, + current_key_id: Arc>>, + workers: Arc, + sync_state: Arc, + stratum_stats: Arc>, +} + +impl StratumServer { + /// Creates a new Stratum Server. + pub fn new( + config: StratumServerConfig, + chain: Arc, + tx_pool: Arc>, + verifier_cache: Arc>, + stratum_stats: Arc>, + ) -> StratumServer { + StratumServer { + id: String::from("0"), + minimum_share_difficulty: config.minimum_share_difficulty, + config, + chain, + tx_pool, + verifier_cache, + current_block_versions: Arc::new(RwLock::new(Vec::new())), + current_difficulty: ::max_value(), + current_key_id: Arc::new(RwLock::new(None)), + workers: Arc::new(WorkersList::new(stratum_stats.clone())), + sync_state: Arc::new(SyncState::new()), + stratum_stats: stratum_stats, + } + } + + // Build and return a JobTemplate for mining the current block + fn build_block_template(&self) -> JobTemplate { + let bh = self + .current_block_versions + .read() + .last() + .unwrap() + .header + .clone(); + // Serialize the block header into pre and post nonce strings + let mut header_buf = vec![]; + { + let mut writer = ser::BinWriter::new(&mut header_buf); + bh.write_pre_pow(&mut writer).unwrap(); + bh.pow.write_pre_pow(bh.version, &mut writer).unwrap(); + } + let pre_pow = util::to_hex(header_buf); + let job_template = JobTemplate { + height: bh.height, + job_id: (self.current_block_versions.read().len() - 1) as u64, + difficulty: self.minimum_share_difficulty, + pre_pow, + }; + return job_template; + } + + // Broadcast a jobtemplate RpcRequest to all connected workers - no response + // expected + fn broadcast_job(&mut self) { + // Package new block into RpcRequest + let job_template = self.build_block_template(); + let job_template_json = serde_json::to_string(&job_template).unwrap(); + // Issue #1159 - use a serde_json Value type to avoid extra quoting + let job_template_value: Value = serde_json::from_str(&job_template_json).unwrap(); + let job_request = RpcRequest { + id: String::from("Stratum"), + jsonrpc: String::from("2.0"), + method: String::from("job"), + params: Some(job_template_value), + }; + let job_request_json = serde_json::to_string(&job_request).unwrap(); + debug!( + "(Server ID: {}) sending block {} with id {} to stratum clients", + self.id, job_template.height, job_template.job_id, + ); + for worker in self.workers.workers_list.read().values() { + worker.tx.unbounded_send(job_request_json.clone()); + } + } + + /// "main()" - Starts the stratum-server. Creates a thread to Listens for + /// a connection, then enters a loop, building a new block on top of the + /// existing chain anytime required and sending that to the connected + /// stratum miner, proxy, or pool, and accepts full solutions to + /// be submitted. + pub fn run_loop(&mut self, edge_bits: u32, proof_size: usize, sync_state: Arc) { + info!( + "(Server ID: {}) Starting stratum server with edge_bits = {}, proof_size = {}", + self.id, edge_bits, proof_size + ); + + self.sync_state = sync_state; + + // "globals" for this function + let attempt_time_per_block = self.config.attempt_time_per_block; + let mut deadline: i64 = 0; + // to prevent the wallet from generating a new HD key derivation for each + // iteration, we keep the returned derivation to provide it back when + // nothing has changed. We only want to create a key_id for each new block, + // and reuse it when we rebuild the current block to add new tx. + let mut head = self.chain.head().unwrap(); + let mut current_hash = head.prev_block_h; + let mut latest_hash; + let listen_addr = self + .config + .stratum_server_addr + .clone() + .unwrap() + .parse() + .expect("Incorrect address"); + { + self.current_block_versions.write().push(Block::default()); + } + + let handler = Handler::from_stratum(&self); + + let _listener_th = thread::spawn(move || { + accept_connections(listen_addr, handler); + }); + + // We have started + { + let mut stratum_stats = self.stratum_stats.write(); + stratum_stats.is_running = true; + stratum_stats.edge_bits = edge_bits as u16; + } + + warn!( + "Stratum server started on {}", + self.config.stratum_server_addr.clone().unwrap() + ); + + // Initial Loop. Waiting node complete syncing + while self.sync_state.is_syncing() { + thread::sleep(Duration::from_millis(50)); + } + + // Main Loop + loop { + // get the latest chain state + head = self.chain.head().unwrap(); + latest_hash = head.last_block_h; + + // Build a new block if: + // There is a new block on the chain + // or We are rebuilding the current one to include new transactions + // and there is at least one worker connected + if (current_hash != latest_hash || Utc::now().timestamp() >= deadline) + && self.workers.count() > 0 + { + let mut wallet_listener_url: Option = None; + if !self.config.burn_reward { + wallet_listener_url = Some(self.config.wallet_listener_url.clone()); + } + // If this is a new block, clear the current_block version history + if current_hash != latest_hash { + self.current_block_versions.write().clear(); + } + // Build the new block (version) + let (new_block, block_fees) = mine_block::get_block( + &self.chain, + &self.tx_pool, + self.verifier_cache.clone(), + self.current_key_id.read().clone(), + wallet_listener_url, + ); + self.current_difficulty = + (new_block.header.total_difficulty() - head.total_difficulty).to_num(); + { + let mut current_key_id = self.current_key_id.write(); + *current_key_id = block_fees.key_id(); + } + current_hash = latest_hash; + // set the minimum acceptable share difficulty for this block + self.minimum_share_difficulty = cmp::min( + self.config.minimum_share_difficulty, + self.current_difficulty, + ); + // set a new deadline for rebuilding with fresh transactions + deadline = Utc::now().timestamp() + attempt_time_per_block as i64; + + { + let mut stratum_stats = self.stratum_stats.write(); + stratum_stats.block_height = new_block.header.height; + stratum_stats.network_difficulty = self.current_difficulty; + } + // Add this new block version to our current block map + { + self.current_block_versions.write().push(new_block); + } + // Send this job to all connected workers + self.broadcast_job(); + } + + // sleep before restarting loop + thread::sleep(Duration::from_millis(50)); + } // Main Loop + } // fn run_loop() +} // StratumServer + +// Utility function to parse a JSON RPC parameter object, returning a proper +// error if things go wrong. +fn parse_params(params: Option) -> Result +where + for<'de> T: serde::Deserialize<'de>, +{ + params + .and_then(|v| serde_json::from_value(v).ok()) + .ok_or_else(|| { + let e = RpcError { + code: -32600, + message: "Invalid Request".to_string(), + }; + serde_json::to_value(e).unwrap() + }) +} diff --git a/servers/src/mining/stratumserver.rs b/servers/src/mining/stratumserver/stratumserver_std.rs similarity index 97% rename from servers/src/mining/stratumserver.rs rename to servers/src/mining/stratumserver/stratumserver_std.rs index 439265770b..46a571fafe 100644 --- a/servers/src/mining/stratumserver.rs +++ b/servers/src/mining/stratumserver/stratumserver_std.rs @@ -233,6 +233,7 @@ pub struct StratumServer { current_key_id: Option, workers: Arc>>, sync_state: Arc, + stratum_stats: Arc>, } impl StratumServer { @@ -242,6 +243,7 @@ impl StratumServer { chain: Arc, tx_pool: Arc>, verifier_cache: Arc>, + stratum_stats: Arc>, ) -> StratumServer { StratumServer { id: String::from("0"), @@ -255,6 +257,7 @@ impl StratumServer { current_key_id: None, workers: Arc::new(Mutex::new(Vec::new())), sync_state: Arc::new(SyncState::new()), + stratum_stats: stratum_stats, } } @@ -670,13 +673,7 @@ impl StratumServer { /// existing chain anytime required and sending that to the connected /// stratum miner, proxy, or pool, and accepts full solutions to /// be submitted. - pub fn run_loop( - &mut self, - stratum_stats: Arc>, - edge_bits: u32, - proof_size: usize, - sync_state: Arc, - ) { + pub fn run_loop(&mut self, edge_bits: u32, proof_size: usize, sync_state: Arc) { info!( "(Server ID: {}) Starting stratum server with edge_bits = {}, proof_size = {}", self.id, edge_bits, proof_size @@ -701,14 +698,14 @@ impl StratumServer { // Start a thread to accept new worker connections let mut workers_th = self.workers.clone(); let id_th = self.id.clone(); - let mut stats_th = stratum_stats.clone(); + let mut stats_th = self.stratum_stats.clone(); let _listener_th = thread::spawn(move || { accept_workers(id_th, listen_addr, &mut workers_th, &mut stats_th); }); // We have started { - let mut stratum_stats = stratum_stats.write(); + let mut stratum_stats = self.stratum_stats.write(); stratum_stats.is_running = true; stratum_stats.edge_bits = edge_bits as u16; } @@ -720,10 +717,10 @@ impl StratumServer { // Initial Loop. Waiting node complete syncing while self.sync_state.is_syncing() { - self.clean_workers(&mut stratum_stats.clone()); + self.clean_workers(&mut self.stratum_stats.clone()); // Handle any messages from the workers - self.handle_rpc_requests(&mut stratum_stats.clone()); + self.handle_rpc_requests(&mut self.stratum_stats.clone()); thread::sleep(Duration::from_millis(50)); } @@ -731,7 +728,7 @@ impl StratumServer { // Main Loop loop { // Remove workers with failed connections - num_workers = self.clean_workers(&mut stratum_stats.clone()); + num_workers = self.clean_workers(&mut self.stratum_stats.clone()); // get the latest chain state head = self.chain.head().unwrap(); @@ -773,7 +770,7 @@ impl StratumServer { deadline = Utc::now().timestamp() + attempt_time_per_block as i64; { - let mut stratum_stats = stratum_stats.write(); + let mut stratum_stats = self.stratum_stats.write(); stratum_stats.block_height = new_block.header.height; stratum_stats.network_difficulty = self.current_difficulty; } @@ -784,7 +781,7 @@ impl StratumServer { } // Handle any messages from the workers - self.handle_rpc_requests(&mut stratum_stats.clone()); + self.handle_rpc_requests(&mut self.stratum_stats.clone()); // sleep before restarting loop thread::sleep(Duration::from_micros(1)); From 19a419ca4358852695aa8e5af21934fce925bf79 Mon Sep 17 00:00:00 2001 From: Max Lavrenov Date: Thu, 24 Jan 2019 19:20:22 +0100 Subject: [PATCH 02/11] fix bug with passing current_difficulty in Handler I passed in Handler current_difficulty as an u64, so it was copied and all later changes were ignored. In this commit I pass referece on RwLock with current_difficulty. --- .../stratumserver/stratumserver_async.rs | 55 +++++++++++-------- 1 file changed, 31 insertions(+), 24 deletions(-) diff --git a/servers/src/mining/stratumserver/stratumserver_async.rs b/servers/src/mining/stratumserver/stratumserver_async.rs index af727ea5bf..61988977d9 100644 --- a/servers/src/mining/stratumserver/stratumserver_async.rs +++ b/servers/src/mining/stratumserver/stratumserver_async.rs @@ -114,9 +114,9 @@ struct Handler { workers: Arc, current_block_versions: Arc>>, sync_state: Arc, - minimum_share_difficulty: u64, + minimum_share_difficulty: Arc>, current_key_id: Arc>>, - current_difficulty: u64, + current_difficulty: Arc>, chain: Arc, } @@ -126,9 +126,9 @@ impl Handler { workers: Arc, current_block_versions: Arc>>, sync_state: Arc, - minimum_share_difficulty: u64, + minimum_share_difficulty: Arc>, current_key_id: Arc>>, - current_difficulty: u64, + current_difficulty: Arc>, chain: Arc, ) -> Self { Handler { @@ -148,9 +148,9 @@ impl Handler { stratum.workers.clone(), stratum.current_block_versions.clone(), stratum.sync_state.clone(), - stratum.minimum_share_difficulty, + stratum.minimum_share_difficulty.clone(), stratum.current_key_id.clone(), - stratum.current_difficulty, + stratum.current_difficulty.clone(), stratum.chain.clone(), ) } @@ -287,7 +287,7 @@ impl Handler { let job_template = JobTemplate { height: bh.height, job_id: (self.current_block_versions.read().len() - 1) as u64, - difficulty: self.minimum_share_difficulty, + difficulty: *self.minimum_share_difficulty.read(), pre_pow, }; return job_template; @@ -357,11 +357,11 @@ impl Handler { // Get share difficulty share_difficulty = b.header.pow.to_difficulty(b.header.height).to_num(); // If the difficulty is too low its an error - if share_difficulty < self.minimum_share_difficulty { + if share_difficulty < *self.minimum_share_difficulty.read() { // Return error status error!( "(Server ID: {}) Share at height {}, hash {}, edge_bits {}, nonce {}, job_id {} rejected due to low difficulty: {}/{}", - self.id, params.height, b.hash(), params.edge_bits, params.nonce, params.job_id, share_difficulty, self.minimum_share_difficulty, + self.id, params.height, b.hash(), params.edge_bits, params.nonce, params.job_id, share_difficulty, *self.minimum_share_difficulty.read(), ); self.workers .update_stats(worker_id, |worker_stats| worker_stats.num_rejected += 1); @@ -371,8 +371,9 @@ impl Handler { }; return Err(serde_json::to_value(e).unwrap()); } + // If the difficulty is high enough, submit it (which also validates it) - if share_difficulty >= self.current_difficulty { + if share_difficulty >= *self.current_difficulty.read() { // This is a full solution, submit it to the network let res = self.chain.process_block(b.clone(), chain::Options::MINE); if let Err(e) = res { @@ -461,7 +462,7 @@ impl Handler { b.header.pow.nonce, params.job_id, share_difficulty, - self.current_difficulty, + *self.current_difficulty.read(), submitted_by, ); self.workers @@ -627,8 +628,8 @@ pub struct StratumServer { tx_pool: Arc>, verifier_cache: Arc>, current_block_versions: Arc>>, - current_difficulty: u64, - minimum_share_difficulty: u64, + current_difficulty: Arc>, + minimum_share_difficulty: Arc>, current_key_id: Arc>>, workers: Arc, sync_state: Arc, @@ -646,13 +647,13 @@ impl StratumServer { ) -> StratumServer { StratumServer { id: String::from("0"), - minimum_share_difficulty: config.minimum_share_difficulty, + minimum_share_difficulty: Arc::new(RwLock::new(config.minimum_share_difficulty)), config, chain, tx_pool, verifier_cache, current_block_versions: Arc::new(RwLock::new(Vec::new())), - current_difficulty: ::max_value(), + current_difficulty: Arc::new(RwLock::new(::max_value())), current_key_id: Arc::new(RwLock::new(None)), workers: Arc::new(WorkersList::new(stratum_stats.clone())), sync_state: Arc::new(SyncState::new()), @@ -680,7 +681,7 @@ impl StratumServer { let job_template = JobTemplate { height: bh.height, job_id: (self.current_block_versions.read().len() - 1) as u64, - difficulty: self.minimum_share_difficulty, + difficulty: *self.minimum_share_difficulty.read(), pre_pow, }; return job_template; @@ -796,25 +797,31 @@ impl StratumServer { self.current_key_id.read().clone(), wallet_listener_url, ); - self.current_difficulty = - (new_block.header.total_difficulty() - head.total_difficulty).to_num(); + { + let mut current_difficulty = self.current_difficulty.write(); + *current_difficulty = + (new_block.header.total_difficulty() - head.total_difficulty).to_num(); + } { let mut current_key_id = self.current_key_id.write(); *current_key_id = block_fees.key_id(); } current_hash = latest_hash; - // set the minimum acceptable share difficulty for this block - self.minimum_share_difficulty = cmp::min( - self.config.minimum_share_difficulty, - self.current_difficulty, - ); + { + // set the minimum acceptable share difficulty for this block + let mut minimum_share_difficulty = self.minimum_share_difficulty.write(); + *minimum_share_difficulty = cmp::min( + self.config.minimum_share_difficulty, + *self.current_difficulty.read(), + ); + } // set a new deadline for rebuilding with fresh transactions deadline = Utc::now().timestamp() + attempt_time_per_block as i64; { let mut stratum_stats = self.stratum_stats.write(); stratum_stats.block_height = new_block.header.height; - stratum_stats.network_difficulty = self.current_difficulty; + stratum_stats.network_difficulty = *self.current_difficulty.read(); } // Add this new block version to our current block map { From b9b8604f13490b9f2f457949298c9d55124393f5 Mon Sep 17 00:00:00 2001 From: Max Lavrenov Date: Sat, 26 Jan 2019 12:27:47 +0100 Subject: [PATCH 03/11] remove comments --- servers/src/mining/stratumserver/stratumserver_async.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/servers/src/mining/stratumserver/stratumserver_async.rs b/servers/src/mining/stratumserver/stratumserver_async.rs index 61988977d9..7a5da61e55 100644 --- a/servers/src/mining/stratumserver/stratumserver_async.rs +++ b/servers/src/mining/stratumserver/stratumserver_async.rs @@ -591,10 +591,6 @@ impl WorkersList { self.stratum_stats.write().num_workers = self.workers_list.read().len(); } - //pub fn get_stats(&self, worker_id: usize) -> WorkerStats { - //let stats = self.stratum_stats.read().worker_stats; - //stats[worker_id] - //} pub fn last_seen(&self, worker_id: usize) { //self.stratum_stats.write().worker_stats[worker_id].last_seen = SystemTime::now(); self.update_stats(worker_id, |ws| ws.last_seen = SystemTime::now()); From f03b1dd5275e36e9fa7d51b40a7f6e9f8cca36a2 Mon Sep 17 00:00:00 2001 From: Max Lavrenov Date: Sat, 26 Jan 2019 20:59:51 +0100 Subject: [PATCH 04/11] fix crash in build_block --- .../stratumserver/stratumserver_async.rs | 78 +++++++++---------- 1 file changed, 39 insertions(+), 39 deletions(-) diff --git a/servers/src/mining/stratumserver/stratumserver_async.rs b/servers/src/mining/stratumserver/stratumserver_async.rs index 7a5da61e55..4177634edd 100644 --- a/servers/src/mining/stratumserver/stratumserver_async.rs +++ b/servers/src/mining/stratumserver/stratumserver_async.rs @@ -777,51 +777,51 @@ impl StratumServer { if (current_hash != latest_hash || Utc::now().timestamp() >= deadline) && self.workers.count() > 0 { - let mut wallet_listener_url: Option = None; - if !self.config.burn_reward { - wallet_listener_url = Some(self.config.wallet_listener_url.clone()); - } - // If this is a new block, clear the current_block version history - if current_hash != latest_hash { - self.current_block_versions.write().clear(); - } - // Build the new block (version) - let (new_block, block_fees) = mine_block::get_block( - &self.chain, - &self.tx_pool, - self.verifier_cache.clone(), - self.current_key_id.read().clone(), - wallet_listener_url, - ); { - let mut current_difficulty = self.current_difficulty.write(); - *current_difficulty = - (new_block.header.total_difficulty() - head.total_difficulty).to_num(); - } - { - let mut current_key_id = self.current_key_id.write(); - *current_key_id = block_fees.key_id(); - } - current_hash = latest_hash; - { - // set the minimum acceptable share difficulty for this block - let mut minimum_share_difficulty = self.minimum_share_difficulty.write(); - *minimum_share_difficulty = cmp::min( - self.config.minimum_share_difficulty, - *self.current_difficulty.read(), + let mut current_block_versions = self.current_block_versions.write(); + let mut wallet_listener_url: Option = None; + if !self.config.burn_reward { + wallet_listener_url = Some(self.config.wallet_listener_url.clone()); + } + // If this is a new block, clear the current_block version history + if current_hash != latest_hash { + current_block_versions.clear(); + } + // Build the new block (version) + let (new_block, block_fees) = mine_block::get_block( + &self.chain, + &self.tx_pool, + self.verifier_cache.clone(), + self.current_key_id.read().clone(), + wallet_listener_url, ); - } - // set a new deadline for rebuilding with fresh transactions - deadline = Utc::now().timestamp() + attempt_time_per_block as i64; + { + let mut current_difficulty = self.current_difficulty.write(); + *current_difficulty = + (new_block.header.total_difficulty() - head.total_difficulty).to_num(); + } + { + let mut current_key_id = self.current_key_id.write(); + *current_key_id = block_fees.key_id(); + } + current_hash = latest_hash; + { + // set the minimum acceptable share difficulty for this block + let mut minimum_share_difficulty = self.minimum_share_difficulty.write(); + *minimum_share_difficulty = cmp::min( + self.config.minimum_share_difficulty, + *self.current_difficulty.read(), + ); + } + // set a new deadline for rebuilding with fresh transactions + deadline = Utc::now().timestamp() + attempt_time_per_block as i64; - { let mut stratum_stats = self.stratum_stats.write(); stratum_stats.block_height = new_block.header.height; stratum_stats.network_difficulty = *self.current_difficulty.read(); - } - // Add this new block version to our current block map - { - self.current_block_versions.write().push(new_block); + + // Add this new block version to our current block map + current_block_versions.push(new_block); } // Send this job to all connected workers self.broadcast_job(); From 43ad574514a41c59f0c124e0040201aa4549463c Mon Sep 17 00:00:00 2001 From: Max Lavrenov Date: Sun, 27 Jan 2019 23:02:33 +0100 Subject: [PATCH 05/11] improve error handling --- .../stratumserver/stratumserver_async.rs | 138 +++++++++++------- 1 file changed, 85 insertions(+), 53 deletions(-) diff --git a/servers/src/mining/stratumserver/stratumserver_async.rs b/servers/src/mining/stratumserver/stratumserver_async.rs index 4177634edd..65bb8bac6c 100644 --- a/servers/src/mining/stratumserver/stratumserver_async.rs +++ b/servers/src/mining/stratumserver/stratumserver_async.rs @@ -75,6 +75,70 @@ struct RpcError { message: String, } +impl RpcError { + pub fn internal_error() -> Self { + RpcError { + code: 32603, + message: "Internal error".to_owned(), + } + } + pub fn node_is_syncing() -> Self { + RpcError { + code: -32000, + message: "Node is syncing - Please wait".to_owned(), + } + } + pub fn method_not_found() -> Self { + RpcError { + code: -32601, + message: "Method not found".to_owned(), + } + } + pub fn too_late() -> Self { + RpcError { + code: -32503, + message: "Solution submitted too late".to_string(), + } + } + pub fn cannot_validate() -> Self { + RpcError { + code: -32502, + message: "Failed to validate solution".to_string(), + } + } + pub fn too_low_difficulty() -> Self { + RpcError { + code: -32501, + message: "Share rejected due to low difficulty".to_string(), + } + } + pub fn invalid_request() -> Self { + RpcError { + code: -32600, + message: "Invalid Request".to_string(), + } + } +} + +impl From for Value { + fn from(e: RpcError) -> Self { + serde_json::to_value(e).unwrap() + } +} + +impl From for RpcError +where + T: std::error::Error, +{ + fn from(e: T) -> Self { + let err = RpcError { + code: 32603, + message: "Internal error".to_owned(), + }; + err + } +} + #[derive(Serialize, Deserialize, Debug)] struct LoginParams { login: String, @@ -172,11 +236,7 @@ impl Handler { "keepalive" => self.handle_keepalive(), "getjobtemplate" => { if self.sync_state.is_syncing() { - let e = RpcError { - code: -32000, - message: "Node is syncing - Please wait".to_string(), - }; - Err(serde_json::to_value(e).unwrap()) + Err(RpcError::node_is_syncing()) } else { self.handle_getjobtemplate() } @@ -184,24 +244,20 @@ impl Handler { "status" => self.handle_status(worker_id), _ => { // Called undefined method - let e = RpcError { - code: -32601, - message: "Method not found".to_string(), - }; - Err(serde_json::to_value(e).unwrap()) + Err(RpcError::method_not_found()) } }; // Package the reply as RpcResponse json let rpc_response: String; match response { - Err(response) => { + Err(rpc_error) => { let resp = RpcResponse { id: request.id, jsonrpc: String::from("2.0"), method: request.method, result: None, - error: Some(response), + error: Some(rpc_error.into()), }; rpc_response = serde_json::to_string(&resp).unwrap(); } @@ -218,23 +274,25 @@ impl Handler { } rpc_response } - fn handle_login(&self, params: Option, worker_id: usize) -> Result { + fn handle_login(&self, params: Option, worker_id: usize) -> Result { let params: LoginParams = parse_params(params)?; let mut workers = self.workers.workers_list.write(); - let worker = workers.get_mut(&worker_id).unwrap(); + let worker = workers + .get_mut(&worker_id) + .ok_or(RpcError::internal_error())?; worker.login = Some(params.login); // XXX TODO Future - Validate password? worker.agent = params.agent; worker.authenticated = true; - return Ok(serde_json::to_value("ok".to_string()).unwrap()); + return Ok("ok".into()); } // Handle KEEPALIVE message - fn handle_keepalive(&self) -> Result { - return Ok(serde_json::to_value("ok".to_string()).unwrap()); + fn handle_keepalive(&self) -> Result { + return Ok("ok".into()); } - fn handle_status(&self, worker_id: usize) -> Result { + fn handle_status(&self, worker_id: usize) -> Result { // Return worker status in json for use by a dashboard or healthcheck. let status = WorkerStatus { id: self.workers.stratum_stats.read().worker_stats[worker_id] @@ -256,7 +314,7 @@ impl Handler { return Ok(response); } // Handle GETJOBTEMPLATE message - fn handle_getjobtemplate(&self) -> Result { + fn handle_getjobtemplate(&self) -> Result { // Build a JobTemplate from a BlockHeader and return JSON let job_template = self.build_block_template(); let response = serde_json::to_value(&job_template).unwrap(); @@ -301,7 +359,7 @@ impl Handler { &self, params: Option, worker_id: usize, - ) -> Result<(Value, bool), Value> { + ) -> Result<(Value, bool), RpcError> { // Validate parameters let params: SubmitParams = parse_params(params)?; @@ -323,11 +381,7 @@ impl Handler { self.id, params.height, params.edge_bits, params.nonce, params.job_id, ); self.workers.update_stats(worker_id, |ws| ws.num_stale += 1); - let e = RpcError { - code: -32503, - message: "Solution submitted too late".to_string(), - }; - return Err(serde_json::to_value(e).unwrap()); + return Err(RpcError::too_late()); } let share_difficulty: u64; @@ -347,11 +401,7 @@ impl Handler { ); self.workers .update_stats(worker_id, |worker_stats| worker_stats.num_rejected += 1); - let e = RpcError { - code: -32502, - message: "Failed to validate solution".to_string(), - }; - return Err(serde_json::to_value(e).unwrap()); + return Err(RpcError::cannot_validate()); } // Get share difficulty @@ -365,11 +415,7 @@ impl Handler { ); self.workers .update_stats(worker_id, |worker_stats| worker_stats.num_rejected += 1); - let e = RpcError { - code: -32501, - message: "Share rejected due to low difficulty".to_string(), - }; - return Err(serde_json::to_value(e).unwrap()); + return Err(RpcError::too_low_difficulty()); } // If the difficulty is high enough, submit it (which also validates it) @@ -391,11 +437,7 @@ impl Handler { ); self.workers .update_stats(worker_id, |worker_stats| worker_stats.num_rejected += 1); - let e = RpcError { - code: -32502, - message: "Failed to validate solution".to_string(), - }; - return Err(serde_json::to_value(e).unwrap()); + return Err(RpcError::cannot_validate()); } share_is_block = true; self.workers @@ -426,11 +468,7 @@ impl Handler { ); self.workers .update_stats(worker_id, |worker_stats| worker_stats.num_rejected += 1); - let e = RpcError { - code: -32502, - message: "Failed to validate solution".to_string(), - }; - return Err(serde_json::to_value(e).unwrap()); + return Err(RpcError::cannot_validate()); } } // Log this as a valid share @@ -835,17 +873,11 @@ impl StratumServer { // Utility function to parse a JSON RPC parameter object, returning a proper // error if things go wrong. -fn parse_params(params: Option) -> Result +fn parse_params(params: Option) -> Result where for<'de> T: serde::Deserialize<'de>, { params .and_then(|v| serde_json::from_value(v).ok()) - .ok_or_else(|| { - let e = RpcError { - code: -32600, - message: "Invalid Request".to_string(), - }; - serde_json::to_value(e).unwrap() - }) + .ok_or(RpcError::invalid_request()) } From c2a49c7a9ce65a44aa956905957f8946d525f55b Mon Sep 17 00:00:00 2001 From: Max Lavrenov Date: Sun, 27 Jan 2019 23:17:17 +0100 Subject: [PATCH 06/11] small fix --- servers/src/mining/stratumserver/stratumserver_async.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/servers/src/mining/stratumserver/stratumserver_async.rs b/servers/src/mining/stratumserver/stratumserver_async.rs index 65bb8bac6c..3359e16bbf 100644 --- a/servers/src/mining/stratumserver/stratumserver_async.rs +++ b/servers/src/mining/stratumserver/stratumserver_async.rs @@ -249,7 +249,6 @@ impl Handler { }; // Package the reply as RpcResponse json - let rpc_response: String; match response { Err(rpc_error) => { let resp = RpcResponse { @@ -259,7 +258,7 @@ impl Handler { result: None, error: Some(rpc_error.into()), }; - rpc_response = serde_json::to_string(&resp).unwrap(); + serde_json::to_string(&resp).unwrap() } Ok(response) => { let resp = RpcResponse { @@ -269,10 +268,9 @@ impl Handler { result: Some(response), error: None, }; - rpc_response = serde_json::to_string(&resp).unwrap(); + serde_json::to_string(&resp).unwrap() } } - rpc_response } fn handle_login(&self, params: Option, worker_id: usize) -> Result { let params: LoginParams = parse_params(params)?; From c59716937e8e2b1278a1423cd717429d7865d650 Mon Sep 17 00:00:00 2001 From: Max Lavrenov Date: Mon, 28 Jan 2019 17:40:42 +0100 Subject: [PATCH 07/11] print error message on unknown error --- servers/src/mining/stratumserver/stratumserver_async.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/servers/src/mining/stratumserver/stratumserver_async.rs b/servers/src/mining/stratumserver/stratumserver_async.rs index 3359e16bbf..a9433a852f 100644 --- a/servers/src/mining/stratumserver/stratumserver_async.rs +++ b/servers/src/mining/stratumserver/stratumserver_async.rs @@ -131,6 +131,7 @@ where T: std::error::Error, { fn from(e: T) -> Self { + error!("Received unhandled error: {}", e); let err = RpcError { code: 32603, message: "Internal error".to_owned(), From 805bce992e7aa1cf70efd124867d4d40fa950f6e Mon Sep 17 00:00:00 2001 From: Max Lavrenov Date: Mon, 28 Jan 2019 18:01:58 +0100 Subject: [PATCH 08/11] replace original server by tokio based one --- Cargo.toml | 3 - servers/Cargo.toml | 7 +- servers/src/mining.rs | 11 +- ...tratumserver_async.rs => stratumserver.rs} | 0 .../mining/stratumserver/stratumserver_std.rs | 807 ------------------ 5 files changed, 2 insertions(+), 826 deletions(-) rename servers/src/mining/{stratumserver/stratumserver_async.rs => stratumserver.rs} (100%) delete mode 100644 servers/src/mining/stratumserver/stratumserver_std.rs diff --git a/Cargo.toml b/Cargo.toml index 5fea267748..40b80c2afa 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -54,6 +54,3 @@ tar = "0.4" [dev-dependencies] grin_chain = { path = "./chain", version = "1.0.1" } grin_store = { path = "./store", version = "1.0.1" } - -[features] -async = ['grin_servers/async'] diff --git a/servers/Cargo.toml b/servers/Cargo.toml index 008b7bd0df..7b613a3ab6 100644 --- a/servers/Cargo.toml +++ b/servers/Cargo.toml @@ -24,7 +24,7 @@ serde_json = "1" chrono = "0.4.4" bufstream = "~0.1" jsonrpc-core = "~8.0" -tokio = { version = "0.1.11", optional = true} +tokio = "0.1.11" grin_api = { path = "../api", version = "1.0.1" } grin_chain = { path = "../chain", version = "1.0.1" } @@ -38,8 +38,3 @@ grin_wallet = { path = "../wallet", version = "1.0.1" } [dev-dependencies] blake2-rfc = "0.2" - -[features] -async = ['tokio'] - - diff --git a/servers/src/mining.rs b/servers/src/mining.rs index c60f68ddbf..e07e201760 100644 --- a/servers/src/mining.rs +++ b/servers/src/mining.rs @@ -15,14 +15,5 @@ //! Mining + Mining server mod mine_block; -pub mod stratumserver { - #[cfg(not(feature = "async"))] - mod stratumserver_std; - #[cfg(not(feature = "async"))] - pub use self::stratumserver_std::*; - #[cfg(feature = "async")] - mod stratumserver_async; - #[cfg(feature = "async")] - pub use self::stratumserver_async::*; -} +pub mod stratumserver; pub mod test_miner; diff --git a/servers/src/mining/stratumserver/stratumserver_async.rs b/servers/src/mining/stratumserver.rs similarity index 100% rename from servers/src/mining/stratumserver/stratumserver_async.rs rename to servers/src/mining/stratumserver.rs diff --git a/servers/src/mining/stratumserver/stratumserver_std.rs b/servers/src/mining/stratumserver/stratumserver_std.rs deleted file mode 100644 index 46a571fafe..0000000000 --- a/servers/src/mining/stratumserver/stratumserver_std.rs +++ /dev/null @@ -1,807 +0,0 @@ -// Copyright 2018 The Grin Developers -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//! Mining Stratum Server -use crate::util::{Mutex, RwLock}; -use bufstream::BufStream; -use chrono::prelude::Utc; -use serde; -use serde_json; -use serde_json::Value; -use std::error::Error; -use std::io::{BufRead, ErrorKind, Write}; -use std::net::{TcpListener, TcpStream}; -use std::sync::Arc; -use std::time::{Duration, SystemTime}; -use std::{cmp, thread}; - -use crate::chain; -use crate::common::stats::{StratumStats, WorkerStats}; -use crate::common::types::{StratumServerConfig, SyncState}; -use crate::core::core::verifier_cache::VerifierCache; -use crate::core::core::Block; -use crate::core::{pow, ser}; -use crate::keychain; -use crate::mining::mine_block; -use crate::pool; -use crate::util; - -// ---------------------------------------- -// http://www.jsonrpc.org/specification -// RPC Methods - -#[derive(Serialize, Deserialize, Debug)] -struct RpcRequest { - id: String, - jsonrpc: String, - method: String, - params: Option, -} - -#[derive(Serialize, Deserialize, Debug)] -struct RpcResponse { - id: String, - jsonrpc: String, - method: String, - result: Option, - error: Option, -} - -#[derive(Serialize, Deserialize, Debug)] -struct RpcError { - code: i32, - message: String, -} - -#[derive(Serialize, Deserialize, Debug)] -struct LoginParams { - login: String, - pass: String, - agent: String, -} - -#[derive(Serialize, Deserialize, Debug)] -struct SubmitParams { - height: u64, - job_id: u64, - nonce: u64, - edge_bits: u32, - pow: Vec, -} - -#[derive(Serialize, Deserialize, Debug)] -pub struct JobTemplate { - height: u64, - job_id: u64, - difficulty: u64, - pre_pow: String, -} - -#[derive(Serialize, Deserialize, Debug)] -pub struct WorkerStatus { - id: String, - height: u64, - difficulty: u64, - accepted: u64, - rejected: u64, - stale: u64, -} - -// ---------------------------------------- -// Worker Factory Thread Function - -// Run in a thread. Adds new connections to the workers list -fn accept_workers( - id: String, - address: String, - workers: &mut Arc>>, - stratum_stats: &mut Arc>, -) { - let listener = TcpListener::bind(address).expect("Failed to bind to listen address"); - let mut worker_id: u32 = 0; - for stream in listener.incoming() { - match stream { - Ok(stream) => { - warn!( - "(Server ID: {}) New connection: {}", - id, - stream.peer_addr().unwrap() - ); - stream - .set_nonblocking(true) - .expect("set_nonblocking call failed"); - let worker = Worker::new(worker_id.to_string(), BufStream::new(stream)); - workers.lock().push(worker); - // stats for this worker (worker stat objects are added and updated but never - // removed) - let mut worker_stats = WorkerStats::default(); - worker_stats.is_connected = true; - worker_stats.id = worker_id.to_string(); - worker_stats.pow_difficulty = 1; // XXX TODO - let mut stratum_stats = stratum_stats.write(); - stratum_stats.worker_stats.push(worker_stats); - worker_id = worker_id + 1; - } - Err(e) => { - warn!("(Server ID: {}) Error accepting connection: {:?}", id, e); - } - } - } - // close the socket server - drop(listener); -} - -// ---------------------------------------- -// Worker Object - a connected stratum client - a miner, pool, proxy, etc... - -pub struct Worker { - id: String, - agent: String, - login: Option, - stream: BufStream, - error: bool, - authenticated: bool, -} - -impl Worker { - /// Creates a new Stratum Worker. - pub fn new(id: String, stream: BufStream) -> Worker { - Worker { - id: id, - agent: String::from(""), - login: None, - stream: stream, - error: false, - authenticated: false, - } - } - - // Get Message from the worker - fn read_message(&mut self, line: &mut String) -> Option { - // Read and return a single message or None - match self.stream.read_line(line) { - Ok(n) => { - return Some(n); - } - Err(ref e) if e.kind() == ErrorKind::WouldBlock => { - // Not an error, just no messages ready - return None; - } - Err(e) => { - warn!( - "(Server ID: {}) Error in connection with stratum client: {}", - self.id, e - ); - self.error = true; - return None; - } - } - } - - // Send Message to the worker - fn write_message(&mut self, mut message: String) { - // Write and Flush the message - if !message.ends_with("\n") { - message += "\n"; - } - match self.stream.write(message.as_bytes()) { - Ok(_) => match self.stream.flush() { - Ok(_) => {} - Err(e) => { - warn!( - "(Server ID: {}) Error in connection with stratum client: {}", - self.id, e - ); - self.error = true; - } - }, - Err(e) => { - warn!( - "(Server ID: {}) Error in connection with stratum client: {}", - self.id, e - ); - self.error = true; - return; - } - } - } -} // impl Worker - -// ---------------------------------------- -// Grin Stratum Server - -pub struct StratumServer { - id: String, - config: StratumServerConfig, - chain: Arc, - tx_pool: Arc>, - verifier_cache: Arc>, - current_block_versions: Vec, - current_difficulty: u64, - minimum_share_difficulty: u64, - current_key_id: Option, - workers: Arc>>, - sync_state: Arc, - stratum_stats: Arc>, -} - -impl StratumServer { - /// Creates a new Stratum Server. - pub fn new( - config: StratumServerConfig, - chain: Arc, - tx_pool: Arc>, - verifier_cache: Arc>, - stratum_stats: Arc>, - ) -> StratumServer { - StratumServer { - id: String::from("0"), - minimum_share_difficulty: config.minimum_share_difficulty, - config, - chain, - tx_pool, - verifier_cache, - current_block_versions: Vec::new(), - current_difficulty: ::max_value(), - current_key_id: None, - workers: Arc::new(Mutex::new(Vec::new())), - sync_state: Arc::new(SyncState::new()), - stratum_stats: stratum_stats, - } - } - - // Build and return a JobTemplate for mining the current block - fn build_block_template(&self) -> JobTemplate { - let bh = self.current_block_versions.last().unwrap().header.clone(); - // Serialize the block header into pre and post nonce strings - let mut header_buf = vec![]; - { - let mut writer = ser::BinWriter::new(&mut header_buf); - bh.write_pre_pow(&mut writer).unwrap(); - bh.pow.write_pre_pow(bh.version, &mut writer).unwrap(); - } - let pre_pow = util::to_hex(header_buf); - let job_template = JobTemplate { - height: bh.height, - job_id: (self.current_block_versions.len() - 1) as u64, - difficulty: self.minimum_share_difficulty, - pre_pow, - }; - return job_template; - } - - // Handle an RPC request message from the worker(s) - fn handle_rpc_requests(&mut self, stratum_stats: &mut Arc>) { - let mut workers_l = self.workers.lock(); - let mut the_message = String::with_capacity(4096); - for num in 0..workers_l.len() { - match workers_l[num].read_message(&mut the_message) { - Some(_) => { - // Decompose the request from the JSONRpc wrapper - let request: RpcRequest = match serde_json::from_str(&the_message) { - Ok(request) => request, - Err(e) => { - // not a valid JSON RpcRequest - disconnect the worker - warn!( - "(Server ID: {}) Failed to parse JSONRpc: {} - {:?}", - self.id, - e.description(), - the_message.as_bytes(), - ); - workers_l[num].error = true; - the_message.clear(); - continue; - } - }; - - the_message.clear(); - - let mut stratum_stats = stratum_stats.write(); - let worker_stats_id = match stratum_stats - .worker_stats - .iter() - .position(|r| r.id == workers_l[num].id) - { - Some(id) => id, - None => continue, - }; - stratum_stats.worker_stats[worker_stats_id].last_seen = SystemTime::now(); - - // Call the handler function for requested method - let response = match request.method.as_str() { - "login" => { - if self.current_block_versions.is_empty() { - continue; - } - stratum_stats.worker_stats[worker_stats_id].initial_block_height = - self.current_block_versions.last().unwrap().header.height; - self.handle_login(request.params, &mut workers_l[num]) - } - "submit" => { - let res = self.handle_submit( - request.params, - &mut workers_l[num], - &mut stratum_stats.worker_stats[worker_stats_id], - ); - // this key_id has been used now, reset - if let Ok((_, true)) = res { - self.current_key_id = None; - } - res.map(|(v, _)| v) - } - "keepalive" => self.handle_keepalive(), - "getjobtemplate" => { - if self.sync_state.is_syncing() { - let e = RpcError { - code: -32000, - message: "Node is syncing - Please wait".to_string(), - }; - Err(serde_json::to_value(e).unwrap()) - } else { - self.handle_getjobtemplate() - } - } - "status" => { - self.handle_status(&mut stratum_stats.worker_stats[worker_stats_id]) - } - _ => { - // Called undefined method - let e = RpcError { - code: -32601, - message: "Method not found".to_string(), - }; - Err(serde_json::to_value(e).unwrap()) - } - }; - - let id = request.id.clone(); - // Package the reply as RpcResponse json - let resp = match response { - Err(response) => RpcResponse { - id: id, - jsonrpc: String::from("2.0"), - method: request.method, - result: None, - error: Some(response), - }, - Ok(response) => RpcResponse { - id: id, - jsonrpc: String::from("2.0"), - method: request.method, - result: Some(response), - error: None, - }, - }; - if let Ok(rpc_response) = serde_json::to_string(&resp) { - // Send the reply - workers_l[num].write_message(rpc_response); - } else { - warn!("handle_rpc_requests: failed responding to {:?}", request.id); - }; - } - None => {} // No message for us from this worker - } - } - } - - // Handle STATUS message - fn handle_status(&self, worker_stats: &mut WorkerStats) -> Result { - // Return worker status in json for use by a dashboard or healthcheck. - let status = WorkerStatus { - id: worker_stats.id.clone(), - height: self.current_block_versions.last().unwrap().header.height, - difficulty: worker_stats.pow_difficulty, - accepted: worker_stats.num_accepted, - rejected: worker_stats.num_rejected, - stale: worker_stats.num_stale, - }; - if worker_stats.initial_block_height == 0 { - worker_stats.initial_block_height = status.height; - } - debug!("(Server ID: {}) Status of worker: {} - Share Accepted: {}, Rejected: {}, Stale: {}. Blocks Found: {}/{}", - self.id, - worker_stats.id, - worker_stats.num_accepted, - worker_stats.num_rejected, - worker_stats.num_stale, - worker_stats.num_blocks_found, - status.height - worker_stats.initial_block_height, - ); - let response = serde_json::to_value(&status).unwrap(); - return Ok(response); - } - - // Handle GETJOBTEMPLATE message - fn handle_getjobtemplate(&self) -> Result { - // Build a JobTemplate from a BlockHeader and return JSON - let job_template = self.build_block_template(); - let response = serde_json::to_value(&job_template).unwrap(); - debug!( - "(Server ID: {}) sending block {} with id {} to single worker", - self.id, job_template.height, job_template.job_id, - ); - return Ok(response); - } - - // Handle KEEPALIVE message - fn handle_keepalive(&self) -> Result { - return Ok(serde_json::to_value("ok".to_string()).unwrap()); - } - - // Handle LOGIN message - fn handle_login(&self, params: Option, worker: &mut Worker) -> Result { - let params: LoginParams = parse_params(params)?; - worker.login = Some(params.login); - // XXX TODO Future - Validate password? - worker.agent = params.agent; - worker.authenticated = true; - return Ok(serde_json::to_value("ok".to_string()).unwrap()); - } - - // Handle SUBMIT message - // params contains a solved block header - // We accept and log valid shares of all difficulty above configured minimum - // Accepted shares that are full solutions will also be submitted to the - // network - fn handle_submit( - &self, - params: Option, - worker: &mut Worker, - worker_stats: &mut WorkerStats, - ) -> Result<(Value, bool), Value> { - // Validate parameters - let params: SubmitParams = parse_params(params)?; - - // Find the correct version of the block to match this header - let b: Option<&Block> = self.current_block_versions.get(params.job_id as usize); - if params.height != self.current_block_versions.last().unwrap().header.height || b.is_none() - { - // Return error status - error!( - "(Server ID: {}) Share at height {}, edge_bits {}, nonce {}, job_id {} submitted too late", - self.id, params.height, params.edge_bits, params.nonce, params.job_id, - ); - worker_stats.num_stale += 1; - let e = RpcError { - code: -32503, - message: "Solution submitted too late".to_string(), - }; - return Err(serde_json::to_value(e).unwrap()); - } - - let share_difficulty: u64; - let mut share_is_block = false; - - let mut b: Block = b.unwrap().clone(); - // Reconstruct the blocks header with this nonce and pow added - b.header.pow.proof.edge_bits = params.edge_bits as u8; - b.header.pow.nonce = params.nonce; - b.header.pow.proof.nonces = params.pow; - - if !b.header.pow.is_primary() && !b.header.pow.is_secondary() { - // Return error status - error!( - "(Server ID: {}) Failed to validate solution at height {}, hash {}, edge_bits {}, nonce {}, job_id {}: cuckoo size too small", - self.id, params.height, b.hash(), params.edge_bits, params.nonce, params.job_id, - ); - worker_stats.num_rejected += 1; - let e = RpcError { - code: -32502, - message: "Failed to validate solution".to_string(), - }; - return Err(serde_json::to_value(e).unwrap()); - } - - // Get share difficulty - share_difficulty = b.header.pow.to_difficulty(b.header.height).to_num(); - // If the difficulty is too low its an error - if share_difficulty < self.minimum_share_difficulty { - // Return error status - error!( - "(Server ID: {}) Share at height {}, hash {}, edge_bits {}, nonce {}, job_id {} rejected due to low difficulty: {}/{}", - self.id, params.height, b.hash(), params.edge_bits, params.nonce, params.job_id, share_difficulty, self.minimum_share_difficulty, - ); - worker_stats.num_rejected += 1; - let e = RpcError { - code: -32501, - message: "Share rejected due to low difficulty".to_string(), - }; - return Err(serde_json::to_value(e).unwrap()); - } - // If the difficulty is high enough, submit it (which also validates it) - if share_difficulty >= self.current_difficulty { - // This is a full solution, submit it to the network - let res = self.chain.process_block(b.clone(), chain::Options::MINE); - if let Err(e) = res { - // Return error status - error!( - "(Server ID: {}) Failed to validate solution at height {}, hash {}, edge_bits {}, nonce {}, job_id {}, {}: {}", - self.id, - params.height, - b.hash(), - params.edge_bits, - params.nonce, - params.job_id, - e, - e.backtrace().unwrap(), - ); - worker_stats.num_rejected += 1; - let e = RpcError { - code: -32502, - message: "Failed to validate solution".to_string(), - }; - return Err(serde_json::to_value(e).unwrap()); - } - share_is_block = true; - worker_stats.num_blocks_found += 1; - // Log message to make it obvious we found a block - warn!( - "(Server ID: {}) Solution Found for block {}, hash {} - Yay!!! Worker ID: {}, blocks found: {}, shares: {}", - self.id, params.height, - b.hash(), - worker_stats.id, - worker_stats.num_blocks_found, - worker_stats.num_accepted, - ); - } else { - // Do some validation but dont submit - let res = pow::verify_size(&b.header); - if !res.is_ok() { - // Return error status - error!( - "(Server ID: {}) Failed to validate share at height {}, hash {}, edge_bits {}, nonce {}, job_id {}. {:?}", - self.id, - params.height, - b.hash(), - params.edge_bits, - b.header.pow.nonce, - params.job_id, - res, - ); - worker_stats.num_rejected += 1; - let e = RpcError { - code: -32502, - message: "Failed to validate solution".to_string(), - }; - return Err(serde_json::to_value(e).unwrap()); - } - } - // Log this as a valid share - let submitted_by = match worker.login.clone() { - None => worker.id.to_string(), - Some(login) => login.clone(), - }; - info!( - "(Server ID: {}) Got share at height {}, hash {}, edge_bits {}, nonce {}, job_id {}, difficulty {}/{}, submitted by {}", - self.id, - b.header.height, - b.hash(), - b.header.pow.proof.edge_bits, - b.header.pow.nonce, - params.job_id, - share_difficulty, - self.current_difficulty, - submitted_by, - ); - worker_stats.num_accepted += 1; - let submit_response; - if share_is_block { - submit_response = format!("blockfound - {}", b.hash().to_hex()); - } else { - submit_response = "ok".to_string(); - } - return Ok(( - serde_json::to_value(submit_response).unwrap(), - share_is_block, - )); - } // handle submit a solution - - // Purge dead/sick workers - remove all workers marked in error state - fn clean_workers(&mut self, stratum_stats: &mut Arc>) -> usize { - let mut start = 0; - let mut workers_l = self.workers.lock(); - loop { - for num in start..workers_l.len() { - if workers_l[num].error == true { - warn!( - "(Server ID: {}) Dropping worker: {}", - self.id, workers_l[num].id - ); - // Update worker stats - let mut stratum_stats = stratum_stats.write(); - let worker_stats_id = stratum_stats - .worker_stats - .iter() - .position(|r| r.id == workers_l[num].id) - .unwrap(); - stratum_stats.worker_stats[worker_stats_id].is_connected = false; - // Remove the dead worker - workers_l.remove(num); - break; - } - start = num + 1; - } - if start >= workers_l.len() { - let mut stratum_stats = stratum_stats.write(); - stratum_stats.num_workers = workers_l.len(); - return stratum_stats.num_workers; - } - } - } - - // Broadcast a jobtemplate RpcRequest to all connected workers - no response - // expected - fn broadcast_job(&mut self) { - // Package new block into RpcRequest - let job_template = self.build_block_template(); - let job_template_json = serde_json::to_string(&job_template).unwrap(); - // Issue #1159 - use a serde_json Value type to avoid extra quoting - let job_template_value: Value = serde_json::from_str(&job_template_json).unwrap(); - let job_request = RpcRequest { - id: String::from("Stratum"), - jsonrpc: String::from("2.0"), - method: String::from("job"), - params: Some(job_template_value), - }; - let job_request_json = serde_json::to_string(&job_request).unwrap(); - debug!( - "(Server ID: {}) sending block {} with id {} to stratum clients", - self.id, job_template.height, job_template.job_id, - ); - // Push the new block to all connected clients - // NOTE: We do not give a unique nonce (should we?) so miners need - // to choose one for themselves - let mut workers_l = self.workers.lock(); - for num in 0..workers_l.len() { - workers_l[num].write_message(job_request_json.clone()); - } - } - - /// "main()" - Starts the stratum-server. Creates a thread to Listens for - /// a connection, then enters a loop, building a new block on top of the - /// existing chain anytime required and sending that to the connected - /// stratum miner, proxy, or pool, and accepts full solutions to - /// be submitted. - pub fn run_loop(&mut self, edge_bits: u32, proof_size: usize, sync_state: Arc) { - info!( - "(Server ID: {}) Starting stratum server with edge_bits = {}, proof_size = {}", - self.id, edge_bits, proof_size - ); - - self.sync_state = sync_state; - - // "globals" for this function - let attempt_time_per_block = self.config.attempt_time_per_block; - let mut deadline: i64 = 0; - // to prevent the wallet from generating a new HD key derivation for each - // iteration, we keep the returned derivation to provide it back when - // nothing has changed. We only want to create a key_id for each new block, - // and reuse it when we rebuild the current block to add new tx. - let mut num_workers: usize; - let mut head = self.chain.head().unwrap(); - let mut current_hash = head.prev_block_h; - let mut latest_hash; - let listen_addr = self.config.stratum_server_addr.clone().unwrap(); - self.current_block_versions.push(Block::default()); - - // Start a thread to accept new worker connections - let mut workers_th = self.workers.clone(); - let id_th = self.id.clone(); - let mut stats_th = self.stratum_stats.clone(); - let _listener_th = thread::spawn(move || { - accept_workers(id_th, listen_addr, &mut workers_th, &mut stats_th); - }); - - // We have started - { - let mut stratum_stats = self.stratum_stats.write(); - stratum_stats.is_running = true; - stratum_stats.edge_bits = edge_bits as u16; - } - - warn!( - "Stratum server started on {}", - self.config.stratum_server_addr.clone().unwrap() - ); - - // Initial Loop. Waiting node complete syncing - while self.sync_state.is_syncing() { - self.clean_workers(&mut self.stratum_stats.clone()); - - // Handle any messages from the workers - self.handle_rpc_requests(&mut self.stratum_stats.clone()); - - thread::sleep(Duration::from_millis(50)); - } - - // Main Loop - loop { - // Remove workers with failed connections - num_workers = self.clean_workers(&mut self.stratum_stats.clone()); - - // get the latest chain state - head = self.chain.head().unwrap(); - latest_hash = head.last_block_h; - - // Build a new block if: - // There is a new block on the chain - // or We are rebuilding the current one to include new transactions - // and there is at least one worker connected - if (current_hash != latest_hash || Utc::now().timestamp() >= deadline) - && num_workers > 0 - { - let mut wallet_listener_url: Option = None; - if !self.config.burn_reward { - wallet_listener_url = Some(self.config.wallet_listener_url.clone()); - } - // If this is a new block, clear the current_block version history - if current_hash != latest_hash { - self.current_block_versions.clear(); - } - // Build the new block (version) - let (new_block, block_fees) = mine_block::get_block( - &self.chain, - &self.tx_pool, - self.verifier_cache.clone(), - self.current_key_id.clone(), - wallet_listener_url, - ); - self.current_difficulty = - (new_block.header.total_difficulty() - head.total_difficulty).to_num(); - self.current_key_id = block_fees.key_id(); - current_hash = latest_hash; - // set the minimum acceptable share difficulty for this block - self.minimum_share_difficulty = cmp::min( - self.config.minimum_share_difficulty, - self.current_difficulty, - ); - // set a new deadline for rebuilding with fresh transactions - deadline = Utc::now().timestamp() + attempt_time_per_block as i64; - - { - let mut stratum_stats = self.stratum_stats.write(); - stratum_stats.block_height = new_block.header.height; - stratum_stats.network_difficulty = self.current_difficulty; - } - // Add this new block version to our current block map - self.current_block_versions.push(new_block); - // Send this job to all connected workers - self.broadcast_job(); - } - - // Handle any messages from the workers - self.handle_rpc_requests(&mut self.stratum_stats.clone()); - - // sleep before restarting loop - thread::sleep(Duration::from_micros(1)); - } // Main Loop - } // fn run_loop() -} // StratumServer - -// Utility function to parse a JSON RPC parameter object, returning a proper -// error if things go wrong. -fn parse_params(params: Option) -> Result -where - for<'de> T: serde::Deserialize<'de>, -{ - params - .and_then(|v| serde_json::from_value(v).ok()) - .ok_or_else(|| { - let e = RpcError { - code: -32600, - message: "Invalid Request".to_string(), - }; - serde_json::to_value(e).unwrap() - }) -} From c8bf32ec3e8778f866ed05f2936d5bf6dc7c860b Mon Sep 17 00:00:00 2001 From: Max Lavrenov Date: Mon, 28 Jan 2019 23:37:49 +0100 Subject: [PATCH 09/11] fixes after revew --- servers/src/mining/stratumserver.rs | 73 +++++++++++++---------------- 1 file changed, 33 insertions(+), 40 deletions(-) diff --git a/servers/src/mining/stratumserver.rs b/servers/src/mining/stratumserver.rs index a9433a852f..77962db1c4 100644 --- a/servers/src/mining/stratumserver.rs +++ b/servers/src/mining/stratumserver.rs @@ -25,9 +25,8 @@ use chrono::prelude::Utc; use serde; use serde_json; use serde_json::Value; -use std::io::BufReader; -//use std::net::{TcpListener, TcpStream}; use std::collections::HashMap; +use std::io::BufReader; use std::net::SocketAddr; use std::sync::Arc; use std::time::{Duration, SystemTime}; @@ -132,11 +131,7 @@ where { fn from(e: T) -> Self { error!("Received unhandled error: {}", e); - let err = RpcError { - code: 32603, - message: "Internal error".to_owned(), - }; - err + RpcError::internal_error() } } @@ -197,14 +192,14 @@ impl Handler { chain: Arc, ) -> Self { Handler { - id: id, - workers: workers, - current_block_versions: current_block_versions, - sync_state: sync_state, - minimum_share_difficulty: minimum_share_difficulty, - current_key_id: current_key_id, - current_difficulty: current_difficulty, - chain: chain, + id, + workers, + current_block_versions, + sync_state, + minimum_share_difficulty, + current_key_id, + current_difficulty, + chain, } } pub fn from_stratum(stratum: &StratumServer) -> Self { @@ -250,28 +245,23 @@ impl Handler { }; // Package the reply as RpcResponse json - match response { - Err(rpc_error) => { - let resp = RpcResponse { - id: request.id, - jsonrpc: String::from("2.0"), - method: request.method, - result: None, - error: Some(rpc_error.into()), - }; - serde_json::to_string(&resp).unwrap() - } - Ok(response) => { - let resp = RpcResponse { - id: request.id, - jsonrpc: String::from("2.0"), - method: request.method, - result: Some(response), - error: None, - }; - serde_json::to_string(&resp).unwrap() - } - } + let resp = match response { + Err(rpc_error) => RpcResponse { + id: request.id, + jsonrpc: String::from("2.0"), + method: request.method, + result: None, + error: Some(rpc_error.into()), + }, + Ok(response) => RpcResponse { + id: request.id, + jsonrpc: String::from("2.0"), + method: request.method, + result: Some(response), + error: None, + }, + }; + serde_json::to_string(&resp).unwrap() } fn handle_login(&self, params: Option, worker_id: usize) -> Result { let params: LoginParams = parse_params(params)?; @@ -520,7 +510,10 @@ impl Handler { // ---------------------------------------- // Worker Factory Thread Function fn accept_connections(listen_addr: SocketAddr, handler: Handler) { - let listener = TcpListener::bind(&listen_addr).expect("Failed to bind to listen address"); + let listener = TcpListener::bind(&listen_addr).expect(&format!( + "Stratum: Failed to bind to listen address {}", + listen_addr + )); let handler = Arc::new(handler); let server = listener .incoming() @@ -624,7 +617,7 @@ impl WorkersList { self.workers_list .write() .remove(&worker_id) - .expect("no such addr in map"); + .expect("Stratum: no such addr in map"); self.stratum_stats.write().num_workers = self.workers_list.read().len(); } @@ -773,7 +766,7 @@ impl StratumServer { .clone() .unwrap() .parse() - .expect("Incorrect address"); + .expect("Stratum: Incorrect address "); { self.current_block_versions.write().push(Block::default()); } From f267da9b1ab98b197814f3a72574f5de9846e61b Mon Sep 17 00:00:00 2001 From: Max Lavrenov Date: Tue, 29 Jan 2019 14:49:00 +0100 Subject: [PATCH 10/11] reduce sleep time in mail loop to 5ms --- servers/src/mining/stratumserver.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/servers/src/mining/stratumserver.rs b/servers/src/mining/stratumserver.rs index 77962db1c4..799bc37dbd 100644 --- a/servers/src/mining/stratumserver.rs +++ b/servers/src/mining/stratumserver.rs @@ -858,7 +858,7 @@ impl StratumServer { } // sleep before restarting loop - thread::sleep(Duration::from_millis(50)); + thread::sleep(Duration::from_millis(5)); } // Main Loop } // fn run_loop() } // StratumServer From 9bdd6e7861eeb942145ce98bf9baeb075abba7a8 Mon Sep 17 00:00:00 2001 From: Max Lavrenov Date: Mon, 4 Feb 2019 18:45:48 +0100 Subject: [PATCH 11/11] add debug info --- servers/src/mining/stratumserver.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/servers/src/mining/stratumserver.rs b/servers/src/mining/stratumserver.rs index 799bc37dbd..929c27edf2 100644 --- a/servers/src/mining/stratumserver.rs +++ b/servers/src/mining/stratumserver.rs @@ -510,6 +510,7 @@ impl Handler { // ---------------------------------------- // Worker Factory Thread Function fn accept_connections(listen_addr: SocketAddr, handler: Handler) { + info!("Start tokio stratum server"); let listener = TcpListener::bind(&listen_addr).expect(&format!( "Stratum: Failed to bind to listen address {}", listen_addr