diff --git a/finality-aleph/src/network/clique/incoming.rs b/finality-aleph/src/network/clique/incoming.rs index cab609af98..b7fcaa0c0a 100644 --- a/finality-aleph/src/network/clique/incoming.rs +++ b/finality-aleph/src/network/clique/incoming.rs @@ -1,6 +1,6 @@ use std::fmt::{Display, Error as FmtError, Formatter}; -use futures::channel::mpsc; +use futures::channel::{mpsc, oneshot}; use log::{debug, info}; use crate::network::clique::{ @@ -40,6 +40,7 @@ async fn manage_incoming( stream: S, result_for_parent: mpsc::UnboundedSender>, data_for_user: mpsc::UnboundedSender, + authorization_requests_sender: mpsc::UnboundedSender<(SK::PublicKey, oneshot::Sender)>, ) -> Result<(), IncomingError> { debug!( target: LOG_TARGET, @@ -48,7 +49,13 @@ async fn manage_incoming( let (stream, protocol) = protocol(stream).await?; debug!(target: LOG_TARGET, "Negotiated protocol, running."); Ok(protocol - .manage_incoming(stream, secret_key, result_for_parent, data_for_user) + .manage_incoming( + stream, + secret_key, + result_for_parent, + data_for_user, + authorization_requests_sender, + ) .await?) } @@ -62,9 +69,18 @@ pub async fn incoming( stream: S, result_for_parent: mpsc::UnboundedSender>, data_for_user: mpsc::UnboundedSender, + authorization_requests_sender: mpsc::UnboundedSender<(SK::PublicKey, oneshot::Sender)>, ) { let addr = stream.peer_address_info(); - if let Err(e) = manage_incoming(secret_key, stream, result_for_parent, data_for_user).await { + if let Err(e) = manage_incoming( + secret_key, + stream, + result_for_parent, + data_for_user, + authorization_requests_sender, + ) + .await + { info!( target: LOG_TARGET, "Incoming connection from {} failed: {}.", addr, e diff --git a/finality-aleph/src/network/clique/manager/mod.rs b/finality-aleph/src/network/clique/manager/mod.rs index cb15c3db81..17f5172a8c 100644 --- a/finality-aleph/src/network/clique/manager/mod.rs +++ b/finality-aleph/src/network/clique/manager/mod.rs @@ -232,6 +232,10 @@ impl Manager { pub fn status_report(&self) -> impl Display { ManagerStatus::new(self) } + + pub fn is_authorized(&self, public_key: &PK) -> bool { + self.wanted.interested(public_key) + } } #[cfg(test)] diff --git a/finality-aleph/src/network/clique/mock.rs b/finality-aleph/src/network/clique/mock.rs index 13ade25354..1c95d37439 100644 --- a/finality-aleph/src/network/clique/mock.rs +++ b/finality-aleph/src/network/clique/mock.rs @@ -554,4 +554,5 @@ pub struct MockPrelims { pub data_from_outgoing: Option>, pub result_from_incoming: UnboundedReceiver>, pub result_from_outgoing: UnboundedReceiver>, + pub authorization_requests: mpsc::UnboundedReceiver<(MockPublicKey, oneshot::Sender)>, } diff --git a/finality-aleph/src/network/clique/protocols/mod.rs b/finality-aleph/src/network/clique/protocols/mod.rs index eb056caca8..7a35124606 100644 --- a/finality-aleph/src/network/clique/protocols/mod.rs +++ b/finality-aleph/src/network/clique/protocols/mod.rs @@ -1,6 +1,6 @@ use std::fmt::{Display, Error as FmtError, Formatter}; -use futures::channel::mpsc; +use futures::channel::{mpsc, oneshot}; use crate::network::clique::{ io::{ReceiveError, SendError}, @@ -57,6 +57,8 @@ pub enum ProtocolError { NoParentConnection, /// Data channel closed. NoUserConnection, + /// Authorization error. + NotAuthorized, } impl Display for ProtocolError { @@ -69,6 +71,7 @@ impl Display for ProtocolError { CardiacArrest => write!(f, "heartbeat stopped"), NoParentConnection => write!(f, "cannot send result to service"), NoUserConnection => write!(f, "cannot send data to user"), + NotAuthorized => write!(f, "peer not authorized"), } } } @@ -103,13 +106,35 @@ impl Protocol { &self, stream: S, secret_key: SK, - result_for_service: mpsc::UnboundedSender>, + result_for_parent: mpsc::UnboundedSender>, data_for_user: mpsc::UnboundedSender, + authorization_requests_sender: mpsc::UnboundedSender<( + SK::PublicKey, + oneshot::Sender, + )>, ) -> Result<(), ProtocolError> { use Protocol::*; match self { - V0 => v0::incoming(stream, secret_key, result_for_service, data_for_user).await, - V1 => v1::incoming(stream, secret_key, result_for_service, data_for_user).await, + V0 => { + v0::incoming( + stream, + secret_key, + authorization_requests_sender, + result_for_parent, + data_for_user, + ) + .await + } + V1 => { + v1::incoming( + stream, + secret_key, + authorization_requests_sender, + result_for_parent, + data_for_user, + ) + .await + } } } diff --git a/finality-aleph/src/network/clique/protocols/v0/mod.rs b/finality-aleph/src/network/clique/protocols/v0/mod.rs index 5d93a14f14..e2c996efe4 100644 --- a/finality-aleph/src/network/clique/protocols/v0/mod.rs +++ b/finality-aleph/src/network/clique/protocols/v0/mod.rs @@ -1,4 +1,7 @@ -use futures::{channel::mpsc, StreamExt}; +use futures::{ + channel::{mpsc, oneshot}, + StreamExt, +}; use log::{debug, info, trace}; use tokio::io::{AsyncRead, AsyncWrite}; @@ -88,6 +91,7 @@ async fn receiving( pub async fn incoming( stream: S, secret_key: SK, + authorization_requests_sender: mpsc::UnboundedSender<(SK::PublicKey, oneshot::Sender)>, result_for_parent: mpsc::UnboundedSender>, data_for_user: mpsc::UnboundedSender, ) -> Result<(), ProtocolError> { @@ -98,6 +102,10 @@ pub async fn incoming( "Incoming handshake with {} finished successfully.", public_key ); + if !check_authorization::(authorization_requests_sender, public_key.clone()).await? { + return Err(ProtocolError::NotAuthorized); + } + let (tx_exit, mut exit) = mpsc::unbounded(); result_for_parent .unbounded_send(( @@ -123,14 +131,32 @@ pub async fn incoming( } } +pub async fn check_authorization( + authorization_requests_sender: mpsc::UnboundedSender<(SK::PublicKey, oneshot::Sender)>, + public_key: SK::PublicKey, +) -> Result> { + let (sender, receiver) = oneshot::channel(); + authorization_requests_sender + .unbounded_send((public_key.clone(), sender)) + .map_err(|_| ProtocolError::NoParentConnection)?; + receiver + .await + .map_err(|_| ProtocolError::NoParentConnection) +} + #[cfg(test)] -mod tests { - use futures::{channel::mpsc, pin_mut, FutureExt, StreamExt}; +pub mod tests { + use futures::{ + channel::{mpsc, oneshot}, + pin_mut, Future, FutureExt, StreamExt, + }; - use super::{incoming, outgoing, ProtocolError}; use crate::network::clique::{ mock::{key, MockPrelims, MockSplittable}, - protocols::ConnectionType, + protocols::{ + v0::{incoming, outgoing}, + ConnectionType, ProtocolError, + }, Data, }; @@ -142,9 +168,11 @@ mod tests { let (incoming_result_for_service, result_from_incoming) = mpsc::unbounded(); let (outgoing_result_for_service, result_from_outgoing) = mpsc::unbounded(); let (data_for_user, data_from_incoming) = mpsc::unbounded::(); + let (authorization_requests_sender, authorization_requests) = mpsc::unbounded(); let incoming_handle = Box::pin(incoming( stream_incoming, pen_incoming.clone(), + authorization_requests_sender, incoming_result_for_service, data_for_user, )); @@ -165,9 +193,43 @@ mod tests { data_from_outgoing: None, result_from_incoming, result_from_outgoing, + authorization_requests, } } + fn handle_authorization( + mut authorization_requests: mpsc::UnboundedReceiver<(PK, oneshot::Sender)>, + handler: impl FnOnce(PK) -> bool + Send + 'static, + ) -> impl Future> { + tokio::spawn(async move { + let (public_key, response_sender) = authorization_requests + .next() + .await + .expect("We should recieve at least one authorization request."); + let authorization_result = handler(public_key); + response_sender + .send(authorization_result) + .expect("We should be able to send back an authorization response."); + Result::<(), ()>::Ok(()) + }) + .map(|result| match result { + Ok(ok) => ok, + Err(_) => Err(()), + }) + } + + fn all_pass_authorization_handler( + authorization_requests: mpsc::UnboundedReceiver<(PK, oneshot::Sender)>, + ) -> impl Future> { + handle_authorization(authorization_requests, |_| true) + } + + fn no_go_authorization_handler( + authorization_requests: mpsc::UnboundedReceiver<(PK, oneshot::Sender)>, + ) -> impl Future> { + handle_authorization(authorization_requests, |_| false) + } + #[tokio::test] async fn send_data() { let MockPrelims { @@ -176,8 +238,10 @@ mod tests { mut data_from_incoming, result_from_incoming: _result_from_incoming, mut result_from_outgoing, + authorization_requests, .. } = prepare::>(); + let _authorization_handle = all_pass_authorization_handler(authorization_requests); let incoming_handle = incoming_handle.fuse(); let outgoing_handle = outgoing_handle.fuse(); pin_mut!(incoming_handle); @@ -223,8 +287,10 @@ mod tests { data_from_incoming: _data_from_incoming, mut result_from_incoming, result_from_outgoing: _result_from_outgoing, + authorization_requests, .. } = prepare::>(); + let _authorization_handle = all_pass_authorization_handler(authorization_requests); let incoming_handle = incoming_handle.fuse(); let outgoing_handle = outgoing_handle.fuse(); pin_mut!(incoming_handle); @@ -252,8 +318,10 @@ mod tests { data_from_incoming: _data_from_incoming, result_from_incoming, result_from_outgoing: _result_from_outgoing, + authorization_requests, .. } = prepare::>(); + let _authorization_handle = all_pass_authorization_handler(authorization_requests); std::mem::drop(result_from_incoming); let incoming_handle = incoming_handle.fuse(); let outgoing_handle = outgoing_handle.fuse(); @@ -277,8 +345,10 @@ mod tests { data_from_incoming, result_from_incoming: _result_from_incoming, mut result_from_outgoing, + authorization_requests, .. } = prepare::>(); + let _authorization_handle = all_pass_authorization_handler(authorization_requests); std::mem::drop(data_from_incoming); let incoming_handle = incoming_handle.fuse(); let outgoing_handle = outgoing_handle.fuse(); @@ -315,8 +385,10 @@ mod tests { data_from_incoming: _data_from_incoming, result_from_incoming: _result_from_incoming, result_from_outgoing: _result_from_outgoing, + authorization_requests, .. } = prepare::>(); + let _authorization_handle = all_pass_authorization_handler(authorization_requests); std::mem::drop(outgoing_handle); match incoming_handle.await { Err(ProtocolError::HandshakeError(_)) => (), @@ -333,8 +405,10 @@ mod tests { data_from_incoming: _data_from_incoming, mut result_from_incoming, result_from_outgoing: _result_from_outgoing, + authorization_requests, .. } = prepare::>(); + let _authorization_handle = all_pass_authorization_handler(authorization_requests); let incoming_handle = incoming_handle.fuse(); pin_mut!(incoming_handle); let (_, _exit, connection_type) = tokio::select! { @@ -359,8 +433,10 @@ mod tests { data_from_incoming: _data_from_incoming, result_from_incoming: _result_from_incoming, result_from_outgoing: _result_from_outgoing, + authorization_requests, .. } = prepare::>(); + let _authorization_handle = all_pass_authorization_handler(authorization_requests); std::mem::drop(incoming_handle); match outgoing_handle.await { Err(ProtocolError::HandshakeError(_)) => (), @@ -377,8 +453,10 @@ mod tests { data_from_incoming: _data_from_incoming, mut result_from_incoming, result_from_outgoing: _result_from_outgoing, + authorization_requests, .. } = prepare::>(); + let _authorization_handle = all_pass_authorization_handler(authorization_requests); let outgoing_handle = outgoing_handle.fuse(); pin_mut!(outgoing_handle); let (_, _exit, connection_type) = tokio::select! { @@ -405,8 +483,10 @@ mod tests { data_from_incoming: _data_from_incoming, mut result_from_incoming, result_from_outgoing: _result_from_outgoing, + authorization_requests, .. } = prepare::>(); + let _authorization_handle = all_pass_authorization_handler(authorization_requests); let outgoing_handle = outgoing_handle.fuse(); pin_mut!(outgoing_handle); let (_, _exit, connection_type) = tokio::select! { @@ -421,4 +501,33 @@ mod tests { Ok(_) => panic!("successfully finished when connection dead"), }; } + + #[tokio::test] + async fn do_not_call_sender_and_receiver_until_authorized() { + let MockPrelims { + incoming_handle, + outgoing_handle, + mut data_from_incoming, + mut result_from_incoming, + authorization_requests, + .. + } = prepare::>(); + + let authorization_handle = no_go_authorization_handler(authorization_requests); + + // since we are returning `NotAuthorized` all except `outgoing_handle` should finish hapilly + let (incoming_result, outgoing_result, authorization_result) = + tokio::join!(incoming_handle, outgoing_handle, authorization_handle); + + assert!(incoming_result.is_err()); + assert!(outgoing_result.is_err()); + // this also verifies if it was called at all + assert!(authorization_result.is_ok()); + + let data_from_incoming = data_from_incoming.try_next(); + assert!(data_from_incoming.ok().flatten().is_none()); + + let result_from_incoming = result_from_incoming.try_next(); + assert!(result_from_incoming.ok().flatten().is_none()); + } } diff --git a/finality-aleph/src/network/clique/protocols/v1/mod.rs b/finality-aleph/src/network/clique/protocols/v1/mod.rs index 701baeb244..713c5b05e4 100644 --- a/finality-aleph/src/network/clique/protocols/v1/mod.rs +++ b/finality-aleph/src/network/clique/protocols/v1/mod.rs @@ -1,5 +1,8 @@ use codec::{Decode, Encode}; -use futures::{channel::mpsc, StreamExt}; +use futures::{ + channel::{mpsc, oneshot}, + StreamExt, +}; use log::{debug, info, trace}; use tokio::{ io::{AsyncRead, AsyncWrite}, @@ -10,6 +13,7 @@ use crate::network::clique::{ io::{receive_data, send_data}, protocols::{ handshake::{v0_handshake_incoming, v0_handshake_outgoing}, + v0::check_authorization, ConnectionType, ProtocolError, ResultForService, }, Data, PublicKey, SecretKey, Splittable, LOG_TARGET, @@ -106,6 +110,7 @@ pub async fn outgoing( ConnectionType::New, )) .map_err(|_| ProtocolError::NoParentConnection)?; + debug!( target: LOG_TARGET, "Starting worker for communicating with {}.", public_key @@ -119,6 +124,7 @@ pub async fn outgoing( pub async fn incoming( stream: S, secret_key: SK, + authorization_requests_sender: mpsc::UnboundedSender<(SK::PublicKey, oneshot::Sender)>, result_for_parent: mpsc::UnboundedSender>, data_for_user: mpsc::UnboundedSender, ) -> Result<(), ProtocolError> { @@ -128,6 +134,11 @@ pub async fn incoming( target: LOG_TARGET, "Incoming handshake with {} finished successfully.", public_key ); + + if !check_authorization::(authorization_requests_sender, public_key.clone()).await? { + return Err(ProtocolError::NotAuthorized); + } + let (data_for_network, data_from_user) = mpsc::unbounded(); result_for_parent .unbounded_send(( @@ -145,12 +156,17 @@ pub async fn incoming( #[cfg(test)] mod tests { - use futures::{channel::mpsc, pin_mut, FutureExt, StreamExt}; + use futures::{ + channel::{mpsc, oneshot}, + pin_mut, Future, FutureExt, StreamExt, + }; - use super::{incoming, outgoing, ProtocolError}; use crate::network::clique::{ mock::{key, MockPrelims, MockSplittable}, - protocols::ConnectionType, + protocols::{ + v1::{incoming, outgoing}, + ConnectionType, ProtocolError, + }, Data, }; @@ -163,9 +179,11 @@ mod tests { let (outgoing_result_for_service, result_from_outgoing) = mpsc::unbounded(); let (incoming_data_for_user, data_from_incoming) = mpsc::unbounded::(); let (outgoing_data_for_user, data_from_outgoing) = mpsc::unbounded::(); + let (authorization_requests_sender, authorization_requests) = mpsc::unbounded(); let incoming_handle = Box::pin(incoming( stream_incoming, pen_incoming.clone(), + authorization_requests_sender, incoming_result_for_service, incoming_data_for_user, )); @@ -187,9 +205,43 @@ mod tests { data_from_outgoing: Some(data_from_outgoing), result_from_incoming, result_from_outgoing, + authorization_requests, } } + fn handle_authorization( + mut authorization_requests: mpsc::UnboundedReceiver<(PK, oneshot::Sender)>, + handler: impl FnOnce(PK) -> bool + Send + 'static, + ) -> impl Future> { + tokio::spawn(async move { + let (public_key, response_sender) = authorization_requests + .next() + .await + .expect("We should recieve at least one authorization request."); + let authorization_result = handler(public_key); + response_sender + .send(authorization_result) + .expect("We should be able to send back an authorization response."); + Result::<(), ()>::Ok(()) + }) + .map(|result| match result { + Ok(ok) => ok, + Err(_) => Err(()), + }) + } + + fn all_pass_authorization_handler( + authorization_requests: mpsc::UnboundedReceiver<(PK, oneshot::Sender)>, + ) -> impl Future> { + handle_authorization(authorization_requests, |_| true) + } + + fn no_go_authorization_handler( + authorization_requests: mpsc::UnboundedReceiver<(PK, oneshot::Sender)>, + ) -> impl Future> { + handle_authorization(authorization_requests, |_| false) + } + #[tokio::test] async fn send_data() { let MockPrelims { @@ -199,6 +251,7 @@ mod tests { data_from_outgoing, mut result_from_incoming, mut result_from_outgoing, + authorization_requests, .. } = prepare::>(); let mut data_from_outgoing = data_from_outgoing.expect("No data from outgoing!"); @@ -206,6 +259,7 @@ mod tests { let outgoing_handle = outgoing_handle.fuse(); pin_mut!(incoming_handle); pin_mut!(outgoing_handle); + let _authorization_handle = all_pass_authorization_handler(authorization_requests); let _data_for_outgoing = tokio::select! { _ = &mut incoming_handle => panic!("incoming process unexpectedly finished"), _ = &mut outgoing_handle => panic!("outgoing process unexpectedly finished"), @@ -278,12 +332,14 @@ mod tests { data_from_outgoing: _data_from_outgoing, mut result_from_incoming, result_from_outgoing: _result_from_outgoing, + authorization_requests, .. } = prepare::>(); let incoming_handle = incoming_handle.fuse(); let outgoing_handle = outgoing_handle.fuse(); pin_mut!(incoming_handle); pin_mut!(outgoing_handle); + let _authorization_handle = all_pass_authorization_handler(authorization_requests); tokio::select! { _ = &mut incoming_handle => panic!("incoming process unexpectedly finished"), _ = &mut outgoing_handle => panic!("outgoing process unexpectedly finished"), @@ -308,6 +364,7 @@ mod tests { data_from_outgoing: _data_from_outgoing, result_from_incoming, result_from_outgoing: _result_from_outgoing, + authorization_requests, .. } = prepare::>(); std::mem::drop(result_from_incoming); @@ -315,6 +372,7 @@ mod tests { let outgoing_handle = outgoing_handle.fuse(); pin_mut!(incoming_handle); pin_mut!(outgoing_handle); + let _authorization_handle = all_pass_authorization_handler(authorization_requests); tokio::select! { e = &mut incoming_handle => match e { Err(ProtocolError::NoParentConnection) => (), @@ -334,6 +392,7 @@ mod tests { data_from_outgoing: _data_from_outgoing, result_from_incoming: _result_from_incoming, mut result_from_outgoing, + authorization_requests, .. } = prepare::>(); std::mem::drop(data_from_incoming); @@ -341,6 +400,7 @@ mod tests { let outgoing_handle = outgoing_handle.fuse(); pin_mut!(incoming_handle); pin_mut!(outgoing_handle); + let _authorization_handle = all_pass_authorization_handler(authorization_requests); let _data_for_outgoing = tokio::select! { _ = &mut incoming_handle => panic!("incoming process unexpectedly finished"), _ = &mut outgoing_handle => panic!("outgoing process unexpectedly finished"), @@ -373,8 +433,10 @@ mod tests { data_from_outgoing: _data_from_outgoing, result_from_incoming: _result_from_incoming, result_from_outgoing: _result_from_outgoing, + authorization_requests, .. } = prepare::>(); + let _authorization_handle = all_pass_authorization_handler(authorization_requests); std::mem::drop(outgoing_handle); match incoming_handle.await { Err(ProtocolError::HandshakeError(_)) => (), @@ -392,8 +454,10 @@ mod tests { data_from_outgoing: _data_from_outgoing, mut result_from_incoming, result_from_outgoing: _result_from_outgoing, + authorization_requests, .. } = prepare::>(); + let _authorization_handle = all_pass_authorization_handler(authorization_requests); let incoming_handle = incoming_handle.fuse(); pin_mut!(incoming_handle); let (_, _exit, connection_type) = tokio::select! { @@ -419,8 +483,10 @@ mod tests { data_from_outgoing: _data_from_outgoing, result_from_incoming: _result_from_incoming, result_from_outgoing: _result_from_outgoing, + authorization_requests, .. } = prepare::>(); + let _authorization_handle = all_pass_authorization_handler(authorization_requests); std::mem::drop(incoming_handle); match outgoing_handle.await { Err(ProtocolError::HandshakeError(_)) => (), @@ -428,4 +494,33 @@ mod tests { Ok(_) => panic!("successfully finished when connection dead"), }; } + + #[tokio::test] + async fn do_not_call_sender_and_receiver_until_authorized() { + let MockPrelims { + incoming_handle, + outgoing_handle, + mut data_from_incoming, + mut result_from_incoming, + authorization_requests, + .. + } = prepare::>(); + + let authorization_handle = no_go_authorization_handler(authorization_requests); + + // since we are returning `NotAuthorized` all except `outgoing_handle` should finish hapilly + let (incoming_result, outgoing_result, authorization_result) = + tokio::join!(incoming_handle, outgoing_handle, authorization_handle); + + assert!(incoming_result.is_err()); + assert!(outgoing_result.is_err()); + // this also verifies if it was called at all + assert!(authorization_result.is_ok()); + + let data_from_incoming = data_from_incoming.try_next(); + assert!(data_from_incoming.ok().flatten().is_none()); + + let result_from_incoming = result_from_incoming.try_next(); + assert!(result_from_incoming.ok().flatten().is_none()); + } } diff --git a/finality-aleph/src/network/clique/service.rs b/finality-aleph/src/network/clique/service.rs index 470ca0898c..7905cc0b07 100644 --- a/finality-aleph/src/network/clique/service.rs +++ b/finality-aleph/src/network/clique/service.rs @@ -126,7 +126,7 @@ where } fn spawn_new_outgoing( - &self, + &mut self, public_key: SK::PublicKey, address: A, result_for_parent: mpsc::UnboundedSender>, @@ -152,12 +152,23 @@ where &self, stream: NL::Connection, result_for_parent: mpsc::UnboundedSender>, + authorization_requests_sender: mpsc::UnboundedSender<( + SK::PublicKey, + oneshot::Sender, + )>, ) { let secret_key = self.secret_key.clone(); let next_to_interface = self.next_to_interface.clone(); self.spawn_handle .spawn("aleph/clique_network_incoming", None, async move { - incoming(secret_key, stream, result_for_parent, next_to_interface).await; + incoming( + secret_key, + stream, + result_for_parent, + next_to_interface, + authorization_requests_sender, + ) + .await; }); } @@ -233,12 +244,13 @@ where pub async fn run(mut self, mut exit: oneshot::Receiver<()>) { let mut status_ticker = time::interval(STATUS_REPORT_INTERVAL); let (result_for_parent, mut worker_results) = mpsc::unbounded(); + let (authorization_requests_sender, mut authorization_requests) = mpsc::unbounded(); use ServiceCommand::*; loop { tokio::select! { // got new incoming connection from the listener - spawn an incoming worker maybe_stream = self.listener.accept() => match maybe_stream { - Ok(stream) => self.spawn_new_incoming(stream, result_for_parent.clone()), + Ok(stream) => self.spawn_new_incoming(stream, result_for_parent.clone(), authorization_requests_sender.clone()), Err(e) => warn!(target: LOG_TARGET, "Listener failed to accept connection: {}", e), }, // got a new command from the interface @@ -276,6 +288,12 @@ where } }, }, + Some((public_key, response_channel)) = authorization_requests.next() => { + let authorization_result = self.manager.is_authorized(&public_key); + if response_channel.send(authorization_result).is_err() { + warn!(target: LOG_TARGET, "Other side of the Authorization Service is already closed."); + } + }, // received information from a spawned worker managing a connection // check if we still want to be connected to the peer, and if so, spawn a new worker or actually add proper connection Some((public_key, maybe_data_for_network, connection_type)) = worker_results.next() => { diff --git a/scripts/synthetic-network/run_script_for_synthetic-network.sh b/scripts/synthetic-network/run_script_for_synthetic-network.sh index 419df5ad4b..7a8e88b487 100755 --- a/scripts/synthetic-network/run_script_for_synthetic-network.sh +++ b/scripts/synthetic-network/run_script_for_synthetic-network.sh @@ -50,7 +50,7 @@ if [[ "$UPDATE" = true ]]; then git submodule update fi -cd synthetic-network/frontend +cd scripts/synthetic-network/vendor/synthetic-network/frontend log "running .js script" node $SCRIPT_PATH ${@:1}