From cd4637ef5277ea43bdc6b29a6558c40c64987475 Mon Sep 17 00:00:00 2001 From: David Palm Date: Thu, 10 Oct 2019 13:57:49 +0200 Subject: [PATCH 1/3] Cleanup stratum a bit Salvage some code from https://github.com/paritytech/parity-ethereum/pull/10884 + some cleanup and typos. --- ethcore/src/miner/stratum.rs | 9 ++------ miner/stratum/src/lib.rs | 41 ++++++++++++++++++------------------ miner/stratum/src/traits.rs | 2 +- 3 files changed, 24 insertions(+), 28 deletions(-) diff --git a/ethcore/src/miner/stratum.rs b/ethcore/src/miner/stratum.rs index 49e8c9483de..16edf2d7cd4 100644 --- a/ethcore/src/miner/stratum.rs +++ b/ethcore/src/miner/stratum.rs @@ -225,8 +225,6 @@ impl NotifyWork for Stratum { self.service.push_work_all( self.dispatcher.payload(pow_hash, difficulty, number) - ).unwrap_or_else( - |e| warn!(target: "stratum", "Error while pushing work: {:?}", e) ); } } @@ -239,16 +237,13 @@ impl Stratum { let dispatcher = Arc::new(StratumJobDispatcher::new(miner, client)); - let stratum_svc = StratumService::start( + let service = StratumService::start( &SocketAddr::new(options.listen_addr.parse::()?, options.port), dispatcher.clone(), options.secret.clone(), )?; - Ok(Stratum { - dispatcher: dispatcher, - service: stratum_svc, - }) + Ok(Stratum { dispatcher, service }) } /// Start STRATUM job dispatcher and register it in the miner diff --git a/miner/stratum/src/lib.rs b/miner/stratum/src/lib.rs index cb2eca13e72..cc30e201748 100644 --- a/miner/stratum/src/lib.rs +++ b/miner/stratum/src/lib.rs @@ -76,7 +76,7 @@ impl Stratum { let implementation = Arc::new(StratumImpl { subscribers: RwLock::default(), - job_que: RwLock::default(), + job_queue: RwLock::default(), dispatcher, workers: Arc::new(RwLock::default()), secret, @@ -106,7 +106,7 @@ impl Stratum { } impl PushWorkHandler for Stratum { - fn push_work_all(&self, payload: String) -> Result<(), Error> { + fn push_work_all(&self, payload: String) { self.implementation.push_work_all(payload, &self.tcp_dispatcher) } @@ -126,14 +126,14 @@ struct StratumImpl { /// Subscribed clients subscribers: RwLock>, /// List of workers supposed to receive job update - job_que: RwLock>, + job_queue: RwLock>, /// Payload manager dispatcher: Arc, /// Authorized workers (socket - worker_id) workers: Arc>>, /// Secret if any secret: Option, - /// Dispatch notify couinter + /// Dispatch notify counter notify_counter: RwLock, } @@ -143,7 +143,7 @@ impl StratumImpl { use std::str::FromStr; self.subscribers.write().push(meta.addr().clone()); - self.job_que.write().insert(meta.addr().clone()); + self.job_queue.write().insert(meta.addr().clone()); trace!(target: "stratum", "Subscription request from {:?}", meta.addr()); Ok(match self.dispatcher.initial() { @@ -204,31 +204,33 @@ impl StratumImpl { /// Helper method fn update_peers(&self, tcp_dispatcher: &Dispatcher) { if let Some(job) = self.dispatcher.job() { - if let Err(e) = self.push_work_all(job, tcp_dispatcher) { - warn!("Failed to update some of the peers: {:?}", e); - } + self.push_work_all(job, tcp_dispatcher) } } - fn push_work_all(&self, payload: String, tcp_dispatcher: &Dispatcher) -> Result<(), Error> { + fn push_work_all(&self, payload: String, tcp_dispatcher: &Dispatcher) { let hup_peers = { let workers = self.workers.read(); let next_request_id = { let mut counter = self.notify_counter.write(); - if *counter == ::std::u32::MAX { *counter = NOTIFY_COUNTER_INITIAL; } - else { *counter = *counter + 1 } + if *counter == ::std::u32::MAX { + *counter = NOTIFY_COUNTER_INITIAL; + } else { + *counter = *counter + 1 + } *counter }; - let mut hup_peers = HashSet::with_capacity(0); // most of the cases won't be needed, hence avoid allocation + // most of the cases won't be needed, hence avoid allocation + let mut hup_peers = HashSet::with_capacity(0); let workers_msg = format!("{{ \"id\": {}, \"method\": \"mining.notify\", \"params\": {} }}", next_request_id, payload); trace!(target: "stratum", "pushing work for {} workers (payload: '{}')", workers.len(), &workers_msg); - for (ref addr, _) in workers.iter() { + for (addr, _) in workers.iter() { trace!(target: "stratum", "pusing work to {}", addr); match tcp_dispatcher.push_message(addr, workers_msg.clone()) { Err(PushMessageError::NoSuchPeer) => { - trace!(target: "stratum", "Worker no longer connected: {}", &addr); - hup_peers.insert(*addr.clone()); + trace!(target: "stratum", "Worker no longer connected: {}", addr); + hup_peers.insert(addr.clone()); }, Err(e) => { warn!(target: "stratum", "Unexpected transport error: {:?}", e); @@ -241,10 +243,10 @@ impl StratumImpl { if !hup_peers.is_empty() { let mut workers = self.workers.write(); - for hup_peer in hup_peers { workers.remove(&hup_peer); } + for hup_peer in hup_peers { + workers.remove(&hup_peer); + } } - - Ok(()) } fn push_work(&self, payloads: Vec, tcp_dispatcher: &Dispatcher) -> Result<(), Error> { @@ -475,8 +477,7 @@ mod tests { .map_err(|err: timeout::Error<()>| panic!("Timeout: {:?}", err)) .and_then(move |stream| { trace!(target: "stratum", "Pusing work to peers"); - stratum.push_work_all(r#"{ "00040008", "100500" }"#.to_owned()) - .expect("Pushing work should produce no errors"); + stratum.push_work_all(r#"{ "00040008", "100500" }"#.to_owned()); Timeout::new(future::ok(stream), ::std::time::Duration::from_millis(100)) }) .map_err(|err: timeout::Error<()>| panic!("Timeout: {:?}", err)) diff --git a/miner/stratum/src/traits.rs b/miner/stratum/src/traits.rs index 36b95a0169e..49840fce2ed 100644 --- a/miner/stratum/src/traits.rs +++ b/miner/stratum/src/traits.rs @@ -55,7 +55,7 @@ pub trait JobDispatcher: Send + Sync { /// Interface that can handle requests to push job for workers pub trait PushWorkHandler: Send + Sync { /// push the same work package for all workers (`payload`: json of pow-specific set of work specification) - fn push_work_all(&self, payload: String) -> Result<(), Error>; + fn push_work_all(&self, payload: String); /// push the work packages worker-wise (`payload`: json of pow-specific set of work specification) fn push_work(&self, payloads: Vec) -> Result<(), Error>; From e2ecb569df6f64b181ae71c98fbc266724827014 Mon Sep 17 00:00:00 2001 From: David Palm Date: Thu, 10 Oct 2019 21:30:04 +0200 Subject: [PATCH 2/3] HashSet::new does not allocate before first insert --- miner/stratum/src/lib.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/miner/stratum/src/lib.rs b/miner/stratum/src/lib.rs index cc30e201748..405b2ec5f93 100644 --- a/miner/stratum/src/lib.rs +++ b/miner/stratum/src/lib.rs @@ -221,8 +221,7 @@ impl StratumImpl { *counter }; - // most of the cases won't be needed, hence avoid allocation - let mut hup_peers = HashSet::with_capacity(0); + let mut hup_peers = HashSet::new(); let workers_msg = format!("{{ \"id\": {}, \"method\": \"mining.notify\", \"params\": {} }}", next_request_id, payload); trace!(target: "stratum", "pushing work for {} workers (payload: '{}')", workers.len(), &workers_msg); for (addr, _) in workers.iter() { From 40832628617929134ed38ae0dce51f898ab5ca59 Mon Sep 17 00:00:00 2001 From: David Palm Date: Fri, 11 Oct 2019 10:53:15 +0200 Subject: [PATCH 3/3] Remove unused method push_work() --- miner/stratum/src/lib.rs | 47 ++++++++----------------------------- miner/stratum/src/traits.rs | 3 --- 2 files changed, 10 insertions(+), 40 deletions(-) diff --git a/miner/stratum/src/lib.rs b/miner/stratum/src/lib.rs index 405b2ec5f93..75948cfeec0 100644 --- a/miner/stratum/src/lib.rs +++ b/miner/stratum/src/lib.rs @@ -109,10 +109,6 @@ impl PushWorkHandler for Stratum { fn push_work_all(&self, payload: String) { self.implementation.push_work_all(payload, &self.tcp_dispatcher) } - - fn push_work(&self, payloads: Vec) -> Result<(), Error> { - self.implementation.push_work(payloads, &self.tcp_dispatcher) - } } impl Drop for Stratum { @@ -160,7 +156,7 @@ impl StratumImpl { /// rpc method `mining.authorize` fn authorize(&self, params: Params, meta: SocketMetadata) -> RpcResult { - params.parse::<(String, String)>().map(|(worker_id, secret)|{ + params.parse::<(String, String)>().map(|(worker_id, secret)| { if let Some(valid_secret) = self.secret { let hash = keccak(secret); if hash != valid_secret { @@ -184,15 +180,15 @@ impl StratumImpl { _ => None }) .collect::>()) { - Ok(()) => { - self.update_peers(&meta.tcp_dispatcher.expect("tcp_dispatcher is always initialized; qed")); - to_value(true) - }, - Err(submit_err) => { - warn!("Error while submitting share: {:?}", submit_err); - to_value(false) - } + Ok(()) => { + self.update_peers(&meta.tcp_dispatcher.expect("tcp_dispatcher is always initialized; qed")); + to_value(true) + }, + Err(submit_err) => { + warn!("Error while submitting share: {:?}", submit_err); + to_value(false) } + } }, _ => { trace!(target: "stratum", "Invalid submit work format {:?}", params); @@ -234,7 +230,7 @@ impl StratumImpl { Err(e) => { warn!(target: "stratum", "Unexpected transport error: {:?}", e); }, - Ok(_) => { }, + Ok(_) => {}, } } hup_peers @@ -247,29 +243,6 @@ impl StratumImpl { } } } - - fn push_work(&self, payloads: Vec, tcp_dispatcher: &Dispatcher) -> Result<(), Error> { - if !payloads.len() > 0 { - return Err(Error::NoWork); - } - let workers = self.workers.read(); - let addrs = workers.keys().collect::>(); - if !workers.len() > 0 { - return Err(Error::NoWorkers); - } - let mut que = payloads; - let mut addr_index = 0; - while que.len() > 0 { - let next_worker = addrs[addr_index]; - let mut next_payload = que.drain(0..1); - tcp_dispatcher.push_message( - next_worker, - next_payload.nth(0).expect("drained successfully of 0..1, so 0-th element should exist") - )?; - addr_index = addr_index + 1; - } - Ok(()) - } } #[derive(Clone)] diff --git a/miner/stratum/src/traits.rs b/miner/stratum/src/traits.rs index 49840fce2ed..d71af1feedc 100644 --- a/miner/stratum/src/traits.rs +++ b/miner/stratum/src/traits.rs @@ -56,9 +56,6 @@ pub trait JobDispatcher: Send + Sync { pub trait PushWorkHandler: Send + Sync { /// push the same work package for all workers (`payload`: json of pow-specific set of work specification) fn push_work_all(&self, payload: String); - - /// push the work packages worker-wise (`payload`: json of pow-specific set of work specification) - fn push_work(&self, payloads: Vec) -> Result<(), Error>; } pub struct ServiceConfiguration {