Skip to content
This repository was archived by the owner on Nov 6, 2020. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion ethcore/light/src/on_demand/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ use common_types::receipt::Receipt;
use common_types::transaction::SignedTransaction;
use ethcore::engines::{Engine, StateDependentProof};
use ethcore::executive_state::{ProvedExecution, self};
use account_state;
use ethereum_types::{H256, U256, Address};
use ethtrie::{TrieError, TrieDB};
use hash::{KECCAK_NULL_RLP, KECCAK_EMPTY, KECCAK_EMPTY_LIST_RLP, keccak};
Expand Down
80 changes: 21 additions & 59 deletions ethcore/src/miner/stratum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@
//! Client-side stratum job dispatcher and mining notifier handler

use std::sync::{Arc, Weak};
use std::net::{SocketAddr, AddrParseError};
use std::fmt;
use std::net::SocketAddr;

use client::{Client, ImportSealedBlock};
use ethereum_types::{H64, H256, U256};
Expand All @@ -27,9 +26,7 @@ use ethash::{self, SeedHashCompute};
use ethcore_miner::work_notify::NotifyWork;
#[cfg(feature = "work-notify")]
use ethcore_stratum::PushWorkHandler;
use ethcore_stratum::{
JobDispatcher, Stratum as StratumService, Error as StratumServiceError,
};
use ethcore_stratum::{JobDispatcher, Stratum as StratumService};
use miner::{Miner, MinerService};
use parking_lot::Mutex;
use rlp::encode;
Expand Down Expand Up @@ -62,57 +59,43 @@ struct SubmitPayload {
}

impl SubmitPayload {
fn from_args(payload: Vec<String>) -> Result<Self, PayloadError> {
fn from_args(payload: Vec<String>) -> Result<Self, String> {
if payload.len() != 3 {
return Err(PayloadError::ArgumentsAmountUnexpected(payload.len()));
return Err(format!("Stratum received unexpected amount of arguments {}", payload.len()));
}

let nonce = match clean_0x(&payload[0]).parse::<H64>() {
Ok(nonce) => nonce,
Err(e) => {
warn!(target: "stratum", "submit_work ({}): invalid nonce ({:?})", &payload[0], e);
return Err(PayloadError::InvalidNonce(payload[0].clone()))
return Err(format!("Stratum received invalid nonce: {}", payload[0]));
}
};

let pow_hash = match clean_0x(&payload[1]).parse::<H256>() {
Ok(pow_hash) => pow_hash,
Err(e) => {
warn!(target: "stratum", "submit_work ({}): invalid hash ({:?})", &payload[1], e);
return Err(PayloadError::InvalidPowHash(payload[1].clone()));
return Err(format!("Stratum received invalid pow hash: {}", payload[1]));
}
};

let mix_hash = match clean_0x(&payload[2]).parse::<H256>() {
Ok(mix_hash) => mix_hash,
Err(e) => {
warn!(target: "stratum", "submit_work ({}): invalid mix-hash ({:?})", &payload[2], e);
return Err(PayloadError::InvalidMixHash(payload[2].clone()));
return Err(format!("Stratum received invalid mix hash: {}", payload[2]));
}
};

Ok(SubmitPayload {
nonce: nonce,
pow_hash: pow_hash,
mix_hash: mix_hash,
nonce,
pow_hash,
mix_hash,
})
}
}

#[derive(Debug)]
enum PayloadError {
ArgumentsAmountUnexpected(usize),
InvalidNonce(String),
InvalidPowHash(String),
InvalidMixHash(String),
}

impl fmt::Display for PayloadError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
fmt::Debug::fmt(&self, f)
}
}

/// Job dispatcher for stratum service
pub struct StratumJobDispatcher {
seed_compute: Mutex<SeedHashCompute>,
Expand All @@ -132,10 +115,8 @@ impl JobDispatcher for StratumJobDispatcher {
}))
}

fn submit(&self, payload: Vec<String>) -> Result<(), StratumServiceError> {
let payload = SubmitPayload::from_args(payload).map_err(|e|
StratumServiceError::Dispatch(e.to_string())
)?;
fn submit(&self, payload: Vec<String>) -> Result<(), String> {
let payload = SubmitPayload::from_args(payload)?;

trace!(
target: "stratum",
Expand All @@ -154,7 +135,7 @@ impl JobDispatcher for StratumJobDispatcher {
Ok(_) => Ok(()),
Err(e) => {
warn!(target: "stratum", "submit_seal error: {:?}", e);
Err(StratumServiceError::Dispatch(e.to_string()))
Err(format!("Stratum dispatch failed with error: {}", e))
}
}
})
Expand Down Expand Up @@ -187,7 +168,7 @@ impl StratumJobDispatcher {
self.client.upgrade().and_then(|client| self.miner.upgrade().and_then(|miner| (f)(client, miner)))
}

fn with_core_result<F>(&self, f: F) -> Result<(), StratumServiceError> where F: Fn(Arc<Client>, Arc<Miner>) -> Result<(), StratumServiceError> {
fn with_core_result<F>(&self, f: F) -> Result<(), String> where F: Fn(Arc<Client>, Arc<Miner>) -> Result<(), String> {
match (self.client.upgrade(), self.miner.upgrade()) {
(Some(client), Some(miner)) => f(client, miner),
_ => Ok(()),
Expand All @@ -201,46 +182,27 @@ pub struct Stratum {
service: Arc<StratumService>,
}

#[derive(Debug)]
/// Stratum error
pub enum Error {
/// IPC sockets error
Service(StratumServiceError),
/// Invalid network address
Address(AddrParseError),
}

impl From<StratumServiceError> for Error {
fn from(service_err: StratumServiceError) -> Error { Error::Service(service_err) }
}

impl From<AddrParseError> for Error {
fn from(err: AddrParseError) -> Error { Error::Address(err) }
}

#[cfg(feature = "work-notify")]
impl NotifyWork for Stratum {
fn notify(&self, pow_hash: H256, difficulty: U256, number: u64) {
trace!(target: "stratum", "Notify work");

self.service.push_work_all(
self.dispatcher.payload(pow_hash, difficulty, number)
).unwrap_or_else(
|e| warn!(target: "stratum", "Error while pushing work: {:?}", e)
);
self.service.push_work_all(self.dispatcher.payload(pow_hash, difficulty, number))
}
}

impl Stratum {

/// New stratum job dispatcher, given the miner, client and dedicated stratum service
pub fn start(options: &Options, miner: Weak<Miner>, client: Weak<Client>) -> Result<Stratum, Error> {
pub fn start(options: &Options, miner: Weak<Miner>, client: Weak<Client>) -> Result<Stratum, String> {
use std::net::IpAddr;

let dispatcher = Arc::new(StratumJobDispatcher::new(miner, client));

let listen_addr = options.listen_addr.parse::<IpAddr>()
.map_err(|e| format!("Stratum cannot parse listen address: {:?}", e))?;

let stratum_svc = StratumService::start(
&SocketAddr::new(options.listen_addr.parse::<IpAddr>()?, options.port),
&SocketAddr::new(listen_addr, options.port),
dispatcher.clone(),
options.secret.clone(),
)?;
Expand All @@ -253,7 +215,7 @@ impl Stratum {

/// Start STRATUM job dispatcher and register it in the miner
#[cfg(feature = "work-notify")]
pub fn register(cfg: &Options, miner: Arc<Miner>, client: Weak<Client>) -> Result<(), Error> {
pub fn register(cfg: &Options, miner: Arc<Miner>, client: Weak<Client>) -> Result<(), String> {
let stratum = Stratum::start(cfg, Arc::downgrade(&miner.clone()), client)?;
miner.add_work_listener(Box::new(stratum) as Box<NotifyWork>);
Ok(())
Expand Down
33 changes: 14 additions & 19 deletions miner/stratum/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ extern crate parking_lot;
mod traits;

pub use traits::{
JobDispatcher, PushWorkHandler, Error, ServiceConfiguration,
JobDispatcher, PushWorkHandler, ServiceConfiguration,
};

use jsonrpc_tcp_server::{
Expand Down Expand Up @@ -72,7 +72,7 @@ impl Stratum {
addr: &SocketAddr,
dispatcher: Arc<JobDispatcher>,
secret: Option<H256>,
) -> Result<Arc<Stratum>, Error> {
) -> Result<Arc<Stratum>, String> {

let implementation = Arc::new(StratumImpl {
subscribers: RwLock::default(),
Expand All @@ -93,7 +93,7 @@ impl Stratum {
let server_builder = JsonRpcServerBuilder::new(handler);
let tcp_dispatcher = server_builder.dispatcher();
let server_builder = server_builder.session_meta_extractor(PeerMetaExtractor::new(tcp_dispatcher.clone()));
let server = server_builder.start(addr)?;
let server = server_builder.start(addr).map_err(|e| format!("Stratum server cannot be started: {:?}", e))?;

let stratum = Arc::new(Stratum {
rpc_server: Some(server),
Expand All @@ -106,11 +106,11 @@ impl Stratum {
}

impl PushWorkHandler for Stratum {
fn push_work_all(&self, payload: String) -> Result<(), Error> {
fn push_work_all(&self, payload: String) {
self.implementation.push_work_all(payload, &self.tcp_dispatcher)
}

fn push_work(&self, payloads: Vec<String>) -> Result<(), Error> {
fn push_work(&self, payloads: Vec<String>) -> Result<(), String> {
self.implementation.push_work(payloads, &self.tcp_dispatcher)
}
}
Expand Down Expand Up @@ -204,13 +204,11 @@ impl StratumImpl {
/// Helper method
fn update_peers(&self, tcp_dispatcher: &Dispatcher) {
if let Some(job) = self.dispatcher.job() {
if let Err(e) = self.push_work_all(job, tcp_dispatcher) {
warn!("Failed to update some of the peers: {:?}", e);
}
self.push_work_all(job, tcp_dispatcher)
}
}

fn push_work_all(&self, payload: String, tcp_dispatcher: &Dispatcher) -> Result<(), Error> {
fn push_work_all(&self, payload: String, tcp_dispatcher: &Dispatcher) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wow, so this method has actually always returned an Ok(()), 👍

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed this in #11161

let hup_peers = {
let workers = self.workers.read();
let next_request_id = {
Expand Down Expand Up @@ -243,18 +241,16 @@ impl StratumImpl {
let mut workers = self.workers.write();
for hup_peer in hup_peers { workers.remove(&hup_peer); }
}

Ok(())
}

fn push_work(&self, payloads: Vec<String>, tcp_dispatcher: &Dispatcher) -> Result<(), Error> {
fn push_work(&self, payloads: Vec<String>, tcp_dispatcher: &Dispatcher) -> Result<(), String> {
if !payloads.len() > 0 {
return Err(Error::NoWork);
return Err("Stratum has no work".into());
}
let workers = self.workers.read();
let addrs = workers.keys().collect::<Vec<&SocketAddr>>();
if !workers.len() > 0 {
return Err(Error::NoWorkers);
return Err("Stratum has no workers".into());
}
let mut que = payloads;
let mut addr_index = 0;
Expand All @@ -264,7 +260,7 @@ impl StratumImpl {
tcp_dispatcher.push_message(
next_worker,
next_payload.nth(0).expect("drained successfully of 0..1, so 0-th element should exist")
)?;
).map_err(|e| format!("Stratum cannot push more work: {:?}", e))?;
addr_index = addr_index + 1;
}
Ok(())
Expand Down Expand Up @@ -330,7 +326,7 @@ mod tests {
pub struct VoidManager;

impl JobDispatcher for VoidManager {
fn submit(&self, _payload: Vec<String>) -> Result<(), Error> {
fn submit(&self, _payload: Vec<String>) -> Result<(), String> {
Ok(())
}
}
Expand Down Expand Up @@ -398,7 +394,7 @@ mod tests {
Some(self.initial_payload.clone())
}

fn submit(&self, _payload: Vec<String>) -> Result<(), Error> {
fn submit(&self, _payload: Vec<String>) -> Result<(), String> {
Ok(())
}
}
Expand Down Expand Up @@ -475,8 +471,7 @@ mod tests {
.map_err(|err: timeout::Error<()>| panic!("Timeout: {:?}", err))
.and_then(move |stream| {
trace!(target: "stratum", "Pusing work to peers");
stratum.push_work_all(r#"{ "00040008", "100500" }"#.to_owned())
.expect("Pushing work should produce no errors");
stratum.push_work_all(r#"{ "00040008", "100500" }"#.to_owned());
Timeout::new(future::ok(stream), ::std::time::Duration::from_millis(100))
})
.map_err(|err: timeout::Error<()>| panic!("Timeout: {:?}", err))
Expand Down
30 changes: 3 additions & 27 deletions miner/stratum/src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,31 +14,7 @@
// You should have received a copy of the GNU General Public License
// along with Parity Ethereum. If not, see <http://www.gnu.org/licenses/>.

use std;
use std::error::Error as StdError;
use ethereum_types::H256;
use jsonrpc_tcp_server::PushMessageError;

#[derive(Debug, Clone)]
pub enum Error {
NoWork,
NoWorkers,
Io(String),
Tcp(String),
Dispatch(String),
}

impl From<std::io::Error> for Error {
fn from(err: std::io::Error) -> Self {
Error::Io(err.description().to_owned())
}
}

impl From<PushMessageError> for Error {
fn from(err: PushMessageError) -> Self {
Error::Tcp(format!("Push message error: {:?}", err))
}
}

/// Interface that can provide pow/blockchain-specific responses for the clients
pub trait JobDispatcher: Send + Sync {
Expand All @@ -49,16 +25,16 @@ pub trait JobDispatcher: Send + Sync {
// json for job update given worker_id (payload manager should split job!)
fn job(&self) -> Option<String> { None }
// miner job result
fn submit(&self, payload: Vec<String>) -> Result<(), Error>;
fn submit(&self, payload: Vec<String>) -> Result<(), String>;
}

/// Interface that can handle requests to push job for workers
pub trait PushWorkHandler: Send + Sync {
/// push the same work package for all workers (`payload`: json of pow-specific set of work specification)
fn push_work_all(&self, payload: String) -> Result<(), Error>;
fn push_work_all(&self, payload: String);

/// push the work packages worker-wise (`payload`: json of pow-specific set of work specification)
fn push_work(&self, payloads: Vec<String>) -> Result<(), Error>;
fn push_work(&self, payloads: Vec<String>) -> Result<(), String>;
}

pub struct ServiceConfiguration {
Expand Down