Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use blocking IO in P2P to avoid short sleeps #2778

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 2 additions & 7 deletions core/src/ser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

use crate::core::hash::{DefaultHashable, Hash, Hashed};
use crate::keychain::{BlindingFactor, Identifier, IDENTIFIER_SIZE};
use crate::util::read_write::read_exact;
use crate::util::secp::constants::{
AGG_SIGNATURE_SIZE, MAX_PROOF_SIZE, PEDERSEN_COMMITMENT_SIZE, SECRET_KEY_SIZE,
};
Expand All @@ -31,7 +30,6 @@ use byteorder::{BigEndian, ByteOrder, ReadBytesExt};
use std::fmt::Debug;
use std::io::{self, Read, Write};
use std::marker;
use std::time::Duration;
use std::{cmp, error, fmt};

/// Possible errors deriving from serializing or deserializing.
Expand Down Expand Up @@ -366,17 +364,14 @@ impl<'a> Reader for BinReader<'a> {
pub struct StreamingReader<'a> {
total_bytes_read: u64,
stream: &'a mut dyn Read,
timeout: Duration,
}

impl<'a> StreamingReader<'a> {
/// Create a new streaming reader with the provided underlying stream.
/// Also takes a duration to be used for each individual read_exact call.
pub fn new(stream: &'a mut dyn Read, timeout: Duration) -> StreamingReader<'a> {
pub fn new(stream: &'a mut dyn Read) -> StreamingReader<'a> {
StreamingReader {
total_bytes_read: 0,
stream,
timeout,
}
}

Expand Down Expand Up @@ -427,7 +422,7 @@ impl<'a> Reader for StreamingReader<'a> {
/// Read a fixed number of bytes.
fn read_fixed_bytes(&mut self, len: usize) -> Result<Vec<u8>, Error> {
let mut buf = vec![0u8; len];
read_exact(&mut self.stream, &mut buf, self.timeout, true)?;
self.stream.read_exact(&mut buf)?;
self.total_bytes_read += len as u64;
Ok(buf)
}
Expand Down
185 changes: 105 additions & 80 deletions p2p/src/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,18 @@
//! forces us to go through some additional gymnastic to loop over the async
//! stream and make sure we get the right number of bytes out.

use std::fs::File;
use std::io::{self, Read, Write};
use std::net::{Shutdown, TcpStream};
use std::sync::{mpsc, Arc};
use std::{cmp, thread, time};

use crate::core::ser;
use crate::core::ser::FixedLength;
use crate::msg::{read_body, read_header, read_item, write_to_buf, MsgHeader, Type};
use crate::types::Error;
use crate::util::read_write::{read_exact, write_all};
use crate::util::{RateCounter, RwLock};
use std::fs::File;
use std::io::{self, Read, Write};
use std::net::{Shutdown, TcpStream};
use std::sync::atomic::{AtomicBool, AtomicU8, Ordering};
use std::sync::{mpsc, Arc};
use std::time::Duration;
use std::{cmp, thread};

/// A trait to be implemented in order to receive messages from the
/// connection. Allows providing an optional response.
Expand All @@ -40,7 +40,7 @@ pub trait MessageHandler: Send + 'static {
&self,
msg: Message<'a>,
writer: &'a mut dyn Write,
received_bytes: Arc<RwLock<RateCounter>>,
tracker: Arc<Tracker>,
) -> Result<Option<Response<'a>>, Error>;
}

Expand All @@ -50,7 +50,11 @@ macro_rules! try_break {
($chan:ident, $inner:expr) => {
match $inner {
Ok(v) => Some(v),
Err(Error::Connection(ref e)) if e.kind() == io::ErrorKind::WouldBlock => None,
Err(Error::Connection(ref e))
if e.kind() == io::ErrorKind::WouldBlock || e.kind() == io::ErrorKind::TimedOut =>
{
None
}
Err(e) => {
let _ = $chan.send(e);
break;
Expand Down Expand Up @@ -87,12 +91,7 @@ impl<'a> Message<'a> {
while written < len {
let read_len = cmp::min(8000, len - written);
let mut buf = vec![0u8; read_len];
read_exact(
&mut self.stream,
&mut buf[..],
time::Duration::from_secs(10),
true,
)?;
self.stream.read_exact(&mut buf[..])?;
writer.write_all(&mut buf)?;
written += read_len;
}
Expand Down Expand Up @@ -123,26 +122,21 @@ impl<'a> Response<'a> {
})
}

fn write(mut self, sent_bytes: Arc<RwLock<RateCounter>>) -> Result<(), Error> {
fn write(mut self, tracker: Arc<Tracker>) -> Result<(), Error> {
let mut msg = ser::ser_vec(&MsgHeader::new(self.resp_type, self.body.len() as u64))?;
msg.append(&mut self.body);
write_all(&mut self.stream, &msg[..], time::Duration::from_secs(10))?;
// Increase sent bytes counter
{
let mut sent_bytes = sent_bytes.write();
sent_bytes.inc(msg.len() as u64);
}
self.stream.write_all(&msg[..])?;
tracker.sent_bytes_inc(msg.len() as u64);
if let Some(mut file) = self.attachment {
let mut buf = [0u8; 8000];
loop {
match file.read(&mut buf[..]) {
Ok(0) => break,
Ok(n) => {
write_all(&mut self.stream, &buf[..n], time::Duration::from_secs(10))?;
self.stream.write_all(&buf[..n])?;
// Increase sent bytes "quietly" without incrementing the counter.
// (In a loop here for the single attachment).
let mut sent_bytes = sent_bytes.write();
sent_bytes.inc_quiet(n as u64);
tracker.sent_bytes_inc_quiet(n as u64);
}
Err(e) => return Err(From::from(e)),
}
Expand All @@ -165,10 +159,10 @@ pub struct Tracker {
pub received_bytes: Arc<RwLock<RateCounter>>,
/// Channel to allow sending data through the connection
pub send_channel: mpsc::SyncSender<Vec<u8>>,
/// Channel to close the connection
pub close_channel: mpsc::Sender<()>,
/// Channel to check for errors on the connection
pub error_channel: mpsc::Receiver<Error>,
//pub error_channel: mpsc::Receiver<Error>,
closed: AtomicBool,
pub number_threads: AtomicU8,
}

impl Tracker {
Expand All @@ -179,24 +173,48 @@ impl Tracker {
let buf = write_to_buf(body, msg_type)?;
let buf_len = buf.len();
self.send_channel.try_send(buf)?;
self.sent_bytes_inc(buf_len as u64);
Ok(())
}

// Increase sent bytes counter
pub fn is_closed(&self) -> bool {
self.closed.load(Ordering::Relaxed)
}

pub fn close(&self) {
self.closed.store(true, Ordering::Relaxed)
}

pub fn received_bytes_inc(&self, bytes: u64) {
let mut received_bytes = self.received_bytes.write();
received_bytes.inc(bytes);
}

pub fn received_bytes_inc_quiet(&self, bytes: u64) {
let mut received_bytes = self.received_bytes.write();
received_bytes.inc_quiet(bytes);
}

pub fn sent_bytes_inc(&self, bytes: u64) {
let mut sent_bytes = self.sent_bytes.write();
sent_bytes.inc(buf_len as u64);
sent_bytes.inc(bytes);
}

Ok(())
pub fn sent_bytes_inc_quiet(&self, bytes: u64) {
let mut sent_bytes = self.sent_bytes.write();
sent_bytes.inc_quiet(bytes);
}
}
const IO_TIMEOUT: Duration = Duration::from_millis(1000);

/// Start listening on the provided connection and wraps it. Does not hang
/// the current thread, instead just returns a future and the Connection
/// itself.
pub fn listen<H>(stream: TcpStream, handler: H) -> Tracker
pub fn listen<H>(stream: TcpStream, handler: H) -> (Arc<Tracker>, mpsc::Receiver<Error>)
where
H: MessageHandler,
{
let (send_tx, send_rx) = mpsc::sync_channel(SEND_CHANNEL_CAP);
let (close_tx, close_rx) = mpsc::channel();
let (error_tx, error_rx) = mpsc::channel();

// Counter of number of bytes received
Expand All @@ -205,50 +223,49 @@ where
let sent_bytes = Arc::new(RwLock::new(RateCounter::new()));

stream
.set_nonblocking(true)
.expect("Non-blocking IO not available.");
poll(
stream,
handler,
send_rx,
error_tx,
close_rx,
received_bytes.clone(),
sent_bytes.clone(),
);

Tracker {
.set_read_timeout(Some(IO_TIMEOUT))
.expect("can't set read timeout");
stream
.set_write_timeout(Some(IO_TIMEOUT))
.expect("can't set write timeout");
//stream
// .set_nonblocking(true)
// .expect("Non-blocking IO not available.");
//

let tracker = Arc::new(Tracker {
sent_bytes: sent_bytes.clone(),
received_bytes: received_bytes.clone(),
send_channel: send_tx,
close_channel: close_tx,
error_channel: error_rx,
}
closed: AtomicBool::new(false),
number_threads: AtomicU8::new(0),
});
poll(stream, handler, send_rx, error_tx, tracker.clone());
(tracker, error_rx)
}

fn poll<H>(
conn: TcpStream,
handler: H,
send_rx: mpsc::Receiver<Vec<u8>>,
error_tx: mpsc::Sender<Error>,
close_rx: mpsc::Receiver<()>,
received_bytes: Arc<RwLock<RateCounter>>,
sent_bytes: Arc<RwLock<RateCounter>>,
tracker: Arc<Tracker>,
) where
H: MessageHandler,
{
// Split out tcp stream out into separate reader/writer halves.
let mut reader = conn.try_clone().expect("clone conn for reader failed");
let mut responder = conn.try_clone().expect("clone conn for reader failed");
let mut writer = conn.try_clone().expect("clone conn for writer failed");

let tracker_read = tracker.clone();
let error_read_tx = error_tx.clone();
let _ = thread::Builder::new()
.name("peer".to_string())
.name("peer_read".to_string())
.spawn(move || {
let sleep_time = time::Duration::from_millis(5);
let mut retry_send = Err(());
tracker_read.number_threads.fetch_add(1, Ordering::Relaxed);
loop {
// check the read end
if let Some(h) = try_break!(error_tx, read_header(&mut reader, None)) {
if let Some(h) = try_break!(error_read_tx, read_header(&mut reader, None)) {
let msg = Message::from_header(h, &mut reader);

trace!(
Expand All @@ -258,21 +275,38 @@ fn poll<H>(
);

// Increase received bytes counter
let received = received_bytes.clone();
{
let mut received_bytes = received_bytes.write();
received_bytes.inc(MsgHeader::LEN as u64 + msg.header.msg_len);
}
tracker_read.received_bytes_inc(MsgHeader::LEN as u64 + msg.header.msg_len);

if let Some(Some(resp)) =
try_break!(error_tx, handler.consume(msg, &mut writer, received))
{
try_break!(error_tx, resp.write(sent_bytes.clone()));
if let Some(Some(resp)) = try_break!(
error_read_tx,
handler.consume(msg, &mut responder, tracker_read.clone())
) {
try_break!(error_read_tx, resp.write(tracker_read.clone()));
}
}

// check the write end, use or_else so try_recv is lazily eval'd
let maybe_data = retry_send.or_else(|_| send_rx.try_recv());
if tracker_read.is_closed() {
debug!(
"Connection close with {} initiated by us",
conn.peer_addr()
.map(|a| a.to_string())
.unwrap_or("?".to_owned())
);
break;
}
}
tracker_read.number_threads.fetch_sub(1, Ordering::Relaxed);
let _ = conn.shutdown(Shutdown::Both);
});

let _ = thread::Builder::new()
.name("peer_write".to_string())
.spawn(move || {
tracker.number_threads.fetch_add(1, Ordering::Relaxed);
let mut retry_send = Err(());
// check the write end, use or_else so try_recv is lazily eval'd
loop {
let maybe_data = retry_send.or_else(|_| send_rx.recv_timeout(IO_TIMEOUT));
retry_send = Err(());
if let Ok(data) = maybe_data {
let written =
Expand All @@ -281,20 +315,11 @@ fn poll<H>(
retry_send = Ok(data);
}
}

// check the close channel
if let Ok(_) = close_rx.try_recv() {
debug!(
"Connection close with {} initiated by us",
conn.peer_addr()
.map(|a| a.to_string())
.unwrap_or("?".to_owned())
);
if tracker.is_closed() {
debug!("Connection close with initiated by us, closing writer end",);
break;
}

thread::sleep(sleep_time);
}
let _ = conn.shutdown(Shutdown::Both);
tracker.number_threads.fetch_sub(1, Ordering::Relaxed);
});
}
Loading