diff --git a/servers/src/mining/stratumserver.rs b/servers/src/mining/stratumserver.rs index 929c27edf2..01d5195101 100644 --- a/servers/src/mining/stratumserver.rs +++ b/servers/src/mining/stratumserver.rs @@ -169,48 +169,59 @@ pub struct WorkerStatus { stale: u64, } +struct State { + current_block_versions: Vec, + // to prevent the wallet from generating a new HD key derivation for each + // iteration, we keep the returned derivation to provide it back when + // nothing has changed. We only want to create a key_id for each new block, + // and reuse it when we rebuild the current block to add new tx. + current_key_id: Option, + current_difficulty: u64, + minimum_share_difficulty: u64, +} + +impl State { + pub fn new(minimum_share_difficulty: u64) -> Self { + let blocks = vec![Block::default()]; + State { + current_block_versions: blocks, + current_key_id: None, + current_difficulty: ::max_value(), + minimum_share_difficulty: minimum_share_difficulty, + } + } +} + struct Handler { id: String, workers: Arc, - current_block_versions: Arc>>, sync_state: Arc, - minimum_share_difficulty: Arc>, - current_key_id: Arc>>, - current_difficulty: Arc>, chain: Arc, + current_state: Arc>, } impl Handler { pub fn new( id: String, - workers: Arc, - current_block_versions: Arc>>, + stratum_stats: Arc>, sync_state: Arc, - minimum_share_difficulty: Arc>, - current_key_id: Arc>>, - current_difficulty: Arc>, + minimum_share_difficulty: u64, chain: Arc, ) -> Self { Handler { - id, - workers, - current_block_versions, - sync_state, - minimum_share_difficulty, - current_key_id, - current_difficulty, - chain, + id: id, + workers: Arc::new(WorkersList::new(stratum_stats.clone())), + sync_state: sync_state, + chain: chain, + current_state: Arc::new(RwLock::new(State::new(minimum_share_difficulty))), } } pub fn from_stratum(stratum: &StratumServer) -> Self { Handler::new( stratum.id.clone(), - stratum.workers.clone(), - stratum.current_block_versions.clone(), + stratum.stratum_stats.clone(), stratum.sync_state.clone(), - stratum.minimum_share_difficulty.clone(), - stratum.current_key_id.clone(), - stratum.current_difficulty.clone(), + stratum.config.minimum_share_difficulty, stratum.chain.clone(), ) } @@ -224,8 +235,7 @@ impl Handler { let res = self.handle_submit(request.params, worker_id); // this key_id has been used now, reset if let Ok((_, true)) = res { - let mut current_key_id = self.current_key_id.write(); - *current_key_id = None; + self.current_state.write().current_key_id = None; } res.map(|(v, _)| v) } @@ -265,14 +275,7 @@ impl Handler { } fn handle_login(&self, params: Option, worker_id: usize) -> Result { let params: LoginParams = parse_params(params)?; - let mut workers = self.workers.workers_list.write(); - let worker = workers - .get_mut(&worker_id) - .ok_or(RpcError::internal_error())?; - worker.login = Some(params.login); - // XXX TODO Future - Validate password? - worker.agent = params.agent; - worker.authenticated = true; + self.workers.login(worker_id, params.login, params.agent)?; return Ok("ok".into()); } @@ -283,21 +286,21 @@ impl Handler { fn handle_status(&self, worker_id: usize) -> Result { // Return worker status in json for use by a dashboard or healthcheck. + let stats = self.workers.get_stats(worker_id)?; let status = WorkerStatus { - id: self.workers.stratum_stats.read().worker_stats[worker_id] - .id - .clone(), + id: stats.id.clone(), height: self - .current_block_versions + .current_state .read() + .current_block_versions .last() .unwrap() .header .height, - difficulty: self.workers.stratum_stats.read().worker_stats[worker_id].pow_difficulty, - accepted: self.workers.stratum_stats.read().worker_stats[worker_id].num_accepted, - rejected: self.workers.stratum_stats.read().worker_stats[worker_id].num_rejected, - stale: self.workers.stratum_stats.read().worker_stats[worker_id].num_stale, + difficulty: stats.pow_difficulty, + accepted: stats.num_accepted, + rejected: stats.num_rejected, + stale: stats.num_stale, }; let response = serde_json::to_value(&status).unwrap(); return Ok(response); @@ -317,8 +320,9 @@ impl Handler { // Build and return a JobTemplate for mining the current block fn build_block_template(&self) -> JobTemplate { let bh = self - .current_block_versions + .current_state .read() + .current_block_versions .last() .unwrap() .header @@ -333,8 +337,8 @@ impl Handler { let pre_pow = util::to_hex(header_buf); let job_template = JobTemplate { height: bh.height, - job_id: (self.current_block_versions.read().len() - 1) as u64, - difficulty: *self.minimum_share_difficulty.read(), + job_id: (self.current_state.read().current_block_versions.len() - 1) as u64, + difficulty: self.current_state.read().minimum_share_difficulty, pre_pow, }; return job_template; @@ -352,17 +356,11 @@ impl Handler { // Validate parameters let params: SubmitParams = parse_params(params)?; - let current_block_versions = self.current_block_versions.read(); + let state = self.current_state.read(); // Find the correct version of the block to match this header - let b: Option<&Block> = current_block_versions.get(params.job_id as usize); - if params.height - != self - .current_block_versions - .read() - .last() - .unwrap() - .header - .height || b.is_none() + let b: Option<&Block> = state.current_block_versions.get(params.job_id as usize); + if params.height != state.current_block_versions.last().unwrap().header.height + || b.is_none() { // Return error status error!( @@ -396,11 +394,11 @@ impl Handler { // Get share difficulty share_difficulty = b.header.pow.to_difficulty(b.header.height).to_num(); // If the difficulty is too low its an error - if share_difficulty < *self.minimum_share_difficulty.read() { + if share_difficulty < state.minimum_share_difficulty { // Return error status error!( "(Server ID: {}) Share at height {}, hash {}, edge_bits {}, nonce {}, job_id {} rejected due to low difficulty: {}/{}", - self.id, params.height, b.hash(), params.edge_bits, params.nonce, params.job_id, share_difficulty, *self.minimum_share_difficulty.read(), + self.id, params.height, b.hash(), params.edge_bits, params.nonce, params.job_id, share_difficulty, state.minimum_share_difficulty, ); self.workers .update_stats(worker_id, |worker_stats| worker_stats.num_rejected += 1); @@ -408,7 +406,7 @@ impl Handler { } // If the difficulty is high enough, submit it (which also validates it) - if share_difficulty >= *self.current_difficulty.read() { + if share_difficulty >= state.current_difficulty { // This is a full solution, submit it to the network let res = self.chain.process_block(b.clone(), chain::Options::MINE); if let Err(e) = res { @@ -432,13 +430,14 @@ impl Handler { self.workers .update_stats(worker_id, |worker_stats| worker_stats.num_blocks_found += 1); // Log message to make it obvious we found a block + let stats = self.workers.get_stats(worker_id)?; warn!( "(Server ID: {}) Solution Found for block {}, hash {} - Yay!!! Worker ID: {}, blocks found: {}, shares: {}", self.id, params.height, b.hash(), - self.workers.stratum_stats.read().worker_stats[worker_id].id, - self.workers.stratum_stats.read().worker_stats[worker_id].num_blocks_found, - self.workers.stratum_stats.read().worker_stats[worker_id].num_accepted, + stats.id, + stats.num_blocks_found, + stats.num_accepted, ); } else { // Do some validation but dont submit @@ -461,25 +460,12 @@ impl Handler { } } // Log this as a valid share - let submitted_by = match self - .workers - .workers_list - .read() - .get(&worker_id) - .unwrap() - .login - .clone() - { - None => self - .workers - .workers_list - .read() - .get(&worker_id) - .unwrap() - .id - .to_string(), + let worker = self.workers.get_worker(worker_id)?; + let submitted_by = match worker.login { + None => worker.id.to_string(), Some(login) => login.clone(), }; + info!( "(Server ID: {}) Got share at height {}, hash {}, edge_bits {}, nonce {}, job_id {}, difficulty {}/{}, submitted by {}", self.id, @@ -489,7 +475,7 @@ impl Handler { b.header.pow.nonce, params.job_id, share_difficulty, - *self.current_difficulty.read(), + state.current_difficulty, submitted_by, ); self.workers @@ -505,17 +491,109 @@ impl Handler { share_is_block, )); } // handle submit a solution + + fn broadcast_job(&self) { + debug!("broadcast job"); + // Package new block into RpcRequest + let job_template = self.build_block_template(); + let job_template_json = serde_json::to_string(&job_template).unwrap(); + // Issue #1159 - use a serde_json Value type to avoid extra quoting + let job_template_value: Value = serde_json::from_str(&job_template_json).unwrap(); + let job_request = RpcRequest { + id: String::from("Stratum"), + jsonrpc: String::from("2.0"), + method: String::from("job"), + params: Some(job_template_value), + }; + let job_request_json = serde_json::to_string(&job_request).unwrap(); + debug!( + "(Server ID: {}) sending block {} with id {} to stratum clients", + self.id, job_template.height, job_template.job_id, + ); + self.workers.broadcast(job_request_json.clone()); + } + + pub fn run( + &self, + config: &StratumServerConfig, + tx_pool: &Arc>, + verifier_cache: Arc>, + ) { + debug!("Run main loop"); + let mut deadline: i64 = 0; + let mut head = self.chain.head().unwrap(); + let mut current_hash = head.prev_block_h; + loop { + // get the latest chain state + head = self.chain.head().unwrap(); + let latest_hash = head.last_block_h; + + // Build a new block if: + // There is a new block on the chain + // or We are rebuilding the current one to include new transactions + // and there is at least one worker connected + if (current_hash != latest_hash || Utc::now().timestamp() >= deadline) + && self.workers.count() > 0 + { + { + debug!("resend updated block"); + let mut state = self.current_state.write(); + let mut wallet_listener_url: Option = None; + if !config.burn_reward { + wallet_listener_url = Some(config.wallet_listener_url.clone()); + } + // If this is a new block, clear the current_block version history + let clear_blocks = current_hash != latest_hash; + + // Build the new block (version) + let (new_block, block_fees) = mine_block::get_block( + &self.chain, + tx_pool, + verifier_cache.clone(), + state.current_key_id.clone(), + wallet_listener_url, + ); + + state.current_difficulty = + (new_block.header.total_difficulty() - head.total_difficulty).to_num(); + + state.current_key_id = block_fees.key_id(); + + current_hash = latest_hash; + // set the minimum acceptable share difficulty for this block + state.minimum_share_difficulty = + cmp::min(config.minimum_share_difficulty, state.current_difficulty); + + // set a new deadline for rebuilding with fresh transactions + deadline = Utc::now().timestamp() + config.attempt_time_per_block as i64; + + self.workers.update_block_height(new_block.header.height); + self.workers + .update_network_difficulty(state.current_difficulty); + + if clear_blocks { + state.current_block_versions.clear(); + } + state.current_block_versions.push(new_block); + // Send this job to all connected workers + } + self.broadcast_job(); + } + + // sleep before restarting loop + thread::sleep(Duration::from_millis(5)); + } // Main Loop + } } // ---------------------------------------- // Worker Factory Thread Function -fn accept_connections(listen_addr: SocketAddr, handler: Handler) { +fn accept_connections(listen_addr: SocketAddr, handler: Arc) { info!("Start tokio stratum server"); let listener = TcpListener::bind(&listen_addr).expect(&format!( "Stratum: Failed to bind to listen address {}", listen_addr )); - let handler = Arc::new(handler); let server = listener .incoming() .for_each(move |socket| { @@ -564,6 +642,7 @@ fn accept_connections(listen_addr: SocketAddr, handler: Handler) { // ---------------------------------------- // Worker Object - a connected stratum client - a miner, pool, proxy, etc... +#[derive(Clone)] pub struct Worker { id: usize, agent: String, @@ -622,6 +701,36 @@ impl WorkersList { self.stratum_stats.write().num_workers = self.workers_list.read().len(); } + pub fn login(&self, worker_id: usize, login: String, agent: String) -> Result<(), RpcError> { + let mut wl = self.workers_list.write(); + let mut worker = wl.get_mut(&worker_id).ok_or(RpcError::internal_error())?; + worker.login = Some(login); + // XXX TODO Future - Validate password? + worker.agent = agent; + worker.authenticated = true; + Ok(()) + } + + pub fn get_worker(&self, worker_id: usize) -> Result { + self.workers_list + .read() + .get(&worker_id) + .ok_or_else(|| { + error!("Worker {} not found", worker_id); + RpcError::internal_error() + }) + .map(|w| w.clone()) + } + + pub fn get_stats(&self, worker_id: usize) -> Result { + self.stratum_stats + .read() + .worker_stats + .get(worker_id) + .ok_or(RpcError::internal_error()) + .map(|ws| ws.clone()) + } + pub fn last_seen(&self, worker_id: usize) { //self.stratum_stats.write().worker_stats[worker_id].last_seen = SystemTime::now(); self.update_stats(worker_id, |ws| ws.last_seen = SystemTime::now()); @@ -640,9 +749,26 @@ impl WorkersList { .tx .unbounded_send(msg); } + + pub fn broadcast(&self, msg: String) { + for worker in self.workers_list.read().values() { + worker.tx.unbounded_send(msg.clone()); + } + } + pub fn count(&self) -> usize { self.workers_list.read().len() } + + pub fn update_block_height(&self, height: u64) { + let mut stratum_stats = self.stratum_stats.write(); + stratum_stats.block_height = height; + } + + pub fn update_network_difficulty(&self, difficulty: u64) { + let mut stratum_stats = self.stratum_stats.write(); + stratum_stats.network_difficulty = difficulty; + } } // ---------------------------------------- @@ -654,11 +780,6 @@ pub struct StratumServer { chain: Arc, tx_pool: Arc>, verifier_cache: Arc>, - current_block_versions: Arc>>, - current_difficulty: Arc>, - minimum_share_difficulty: Arc>, - current_key_id: Arc>>, - workers: Arc, sync_state: Arc, stratum_stats: Arc>, } @@ -674,70 +795,15 @@ impl StratumServer { ) -> StratumServer { StratumServer { id: String::from("0"), - minimum_share_difficulty: Arc::new(RwLock::new(config.minimum_share_difficulty)), config, chain, tx_pool, verifier_cache, - current_block_versions: Arc::new(RwLock::new(Vec::new())), - current_difficulty: Arc::new(RwLock::new(::max_value())), - current_key_id: Arc::new(RwLock::new(None)), - workers: Arc::new(WorkersList::new(stratum_stats.clone())), sync_state: Arc::new(SyncState::new()), stratum_stats: stratum_stats, } } - // Build and return a JobTemplate for mining the current block - fn build_block_template(&self) -> JobTemplate { - let bh = self - .current_block_versions - .read() - .last() - .unwrap() - .header - .clone(); - // Serialize the block header into pre and post nonce strings - let mut header_buf = vec![]; - { - let mut writer = ser::BinWriter::new(&mut header_buf); - bh.write_pre_pow(&mut writer).unwrap(); - bh.pow.write_pre_pow(bh.version, &mut writer).unwrap(); - } - let pre_pow = util::to_hex(header_buf); - let job_template = JobTemplate { - height: bh.height, - job_id: (self.current_block_versions.read().len() - 1) as u64, - difficulty: *self.minimum_share_difficulty.read(), - pre_pow, - }; - return job_template; - } - - // Broadcast a jobtemplate RpcRequest to all connected workers - no response - // expected - fn broadcast_job(&mut self) { - // Package new block into RpcRequest - let job_template = self.build_block_template(); - let job_template_json = serde_json::to_string(&job_template).unwrap(); - // Issue #1159 - use a serde_json Value type to avoid extra quoting - let job_template_value: Value = serde_json::from_str(&job_template_json).unwrap(); - let job_request = RpcRequest { - id: String::from("Stratum"), - jsonrpc: String::from("2.0"), - method: String::from("job"), - params: Some(job_template_value), - }; - let job_request_json = serde_json::to_string(&job_request).unwrap(); - debug!( - "(Server ID: {}) sending block {} with id {} to stratum clients", - self.id, job_template.height, job_template.job_id, - ); - for worker in self.workers.workers_list.read().values() { - worker.tx.unbounded_send(job_request_json.clone()); - } - } - /// "main()" - Starts the stratum-server. Creates a thread to Listens for /// a connection, then enters a loop, building a new block on top of the /// existing chain anytime required and sending that to the connected @@ -751,16 +817,6 @@ impl StratumServer { self.sync_state = sync_state; - // "globals" for this function - let attempt_time_per_block = self.config.attempt_time_per_block; - let mut deadline: i64 = 0; - // to prevent the wallet from generating a new HD key derivation for each - // iteration, we keep the returned derivation to provide it back when - // nothing has changed. We only want to create a key_id for each new block, - // and reuse it when we rebuild the current block to add new tx. - let mut head = self.chain.head().unwrap(); - let mut current_hash = head.prev_block_h; - let mut latest_hash; let listen_addr = self .config .stratum_server_addr @@ -768,14 +824,12 @@ impl StratumServer { .unwrap() .parse() .expect("Stratum: Incorrect address "); - { - self.current_block_versions.write().push(Block::default()); - } - let handler = Handler::from_stratum(&self); + let handler = Arc::new(Handler::from_stratum(&self)); + let h = handler.clone(); let _listener_th = thread::spawn(move || { - accept_connections(listen_addr, handler); + accept_connections(listen_addr, h); }); // We have started @@ -795,72 +849,7 @@ impl StratumServer { thread::sleep(Duration::from_millis(50)); } - // Main Loop - loop { - // get the latest chain state - head = self.chain.head().unwrap(); - latest_hash = head.last_block_h; - - // Build a new block if: - // There is a new block on the chain - // or We are rebuilding the current one to include new transactions - // and there is at least one worker connected - if (current_hash != latest_hash || Utc::now().timestamp() >= deadline) - && self.workers.count() > 0 - { - { - let mut current_block_versions = self.current_block_versions.write(); - let mut wallet_listener_url: Option = None; - if !self.config.burn_reward { - wallet_listener_url = Some(self.config.wallet_listener_url.clone()); - } - // If this is a new block, clear the current_block version history - if current_hash != latest_hash { - current_block_versions.clear(); - } - // Build the new block (version) - let (new_block, block_fees) = mine_block::get_block( - &self.chain, - &self.tx_pool, - self.verifier_cache.clone(), - self.current_key_id.read().clone(), - wallet_listener_url, - ); - { - let mut current_difficulty = self.current_difficulty.write(); - *current_difficulty = - (new_block.header.total_difficulty() - head.total_difficulty).to_num(); - } - { - let mut current_key_id = self.current_key_id.write(); - *current_key_id = block_fees.key_id(); - } - current_hash = latest_hash; - { - // set the minimum acceptable share difficulty for this block - let mut minimum_share_difficulty = self.minimum_share_difficulty.write(); - *minimum_share_difficulty = cmp::min( - self.config.minimum_share_difficulty, - *self.current_difficulty.read(), - ); - } - // set a new deadline for rebuilding with fresh transactions - deadline = Utc::now().timestamp() + attempt_time_per_block as i64; - - let mut stratum_stats = self.stratum_stats.write(); - stratum_stats.block_height = new_block.header.height; - stratum_stats.network_difficulty = *self.current_difficulty.read(); - - // Add this new block version to our current block map - current_block_versions.push(new_block); - } - // Send this job to all connected workers - self.broadcast_job(); - } - - // sleep before restarting loop - thread::sleep(Duration::from_millis(5)); - } // Main Loop + handler.run(&self.config, &self.tx_pool, self.verifier_cache.clone()); } // fn run_loop() } // StratumServer