Skip to content

Commit

Permalink
Fix handling of IO WouldBlock error in Stratum server
Browse files Browse the repository at this point in the history
If WouldBlock error happens Stratum server drops part of a message to
read or write. This PR inroduces a worker's buffer to store partially
read message which will be completed next time. For write the existing
util function is used.
Fixes mimblewimble#2524
  • Loading branch information
hashmap committed Feb 4, 2019
1 parent c41ea4b commit 878d8de
Showing 1 changed file with 16 additions and 11 deletions.
27 changes: 16 additions & 11 deletions servers/src/mining/stratumserver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ pub struct Worker {
stream: BufStream<TcpStream>,
error: bool,
authenticated: bool,
buffer: String,
}

impl Worker {
Expand All @@ -164,15 +165,18 @@ impl Worker {
stream: stream,
error: false,
authenticated: false,
buffer: String::with_capacity(4096),
}
}

// Get Message from the worker
fn read_message(&mut self, line: &mut String) -> Option<usize> {
fn read_message(&mut self) -> Option<String> {
// Read and return a single message or None
match self.stream.read_line(line) {
Ok(n) => {
return Some(n);
match self.stream.read_line(&mut self.buffer) {
Ok(_) => {
let res = self.buffer.clone();
self.buffer.clear();
return Some(res);
}
Err(ref e) if e.kind() == ErrorKind::WouldBlock => {
// Not an error, just no messages ready
Expand All @@ -183,6 +187,7 @@ impl Worker {
"(Server ID: {}) Error in connection with stratum client: {}",
self.id, e
);
self.buffer.clear();
self.error = true;
return None;
}
Expand All @@ -195,7 +200,11 @@ impl Worker {
if !message.ends_with("\n") {
message += "\n";
}
match self.stream.write(message.as_bytes()) {
match util::read_write::write_all(
&mut self.stream,
message.as_bytes(),
Duration::from_secs(1),
) {
Ok(_) => match self.stream.flush() {
Ok(_) => {}
Err(e) => {
Expand Down Expand Up @@ -281,10 +290,9 @@ impl StratumServer {
// Handle an RPC request message from the worker(s)
fn handle_rpc_requests(&mut self, stratum_stats: &mut Arc<RwLock<StratumStats>>) {
let mut workers_l = self.workers.lock();
let mut the_message = String::with_capacity(4096);
for num in 0..workers_l.len() {
match workers_l[num].read_message(&mut the_message) {
Some(_) => {
match workers_l[num].read_message() {
Some(the_message) => {
// Decompose the request from the JSONRpc wrapper
let request: RpcRequest = match serde_json::from_str(&the_message) {
Ok(request) => request,
Expand All @@ -297,13 +305,10 @@ impl StratumServer {
the_message.as_bytes(),
);
workers_l[num].error = true;
the_message.clear();
continue;
}
};

the_message.clear();

let mut stratum_stats = stratum_stats.write();
let worker_stats_id = match stratum_stats
.worker_stats
Expand Down

0 comments on commit 878d8de

Please sign in to comment.