Skip to content

Commit

Permalink
ping() upon fill()ing
Browse files Browse the repository at this point in the history
  • Loading branch information
matteomonti committed Nov 21, 2022
1 parent 2dd3a9c commit ae7c27d
Showing 1 changed file with 20 additions and 17 deletions.
37 changes: 20 additions & 17 deletions src/net/plex/plex_connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ impl PlexConnector {
.map(|remote| {
let connector = self.connector.clone();
let connections_per_remote = self.settings.connections_per_remote;
let multiplex_settings = self.settings.multiplex_settings.clone();

fuse.spawn(async move {
let fuse = Fuse::new();
Expand All @@ -91,48 +92,50 @@ impl PlexConnector {

for _ in 0..connections_per_remote {
let connector = connector.clone();
let multiplex_settings = multiplex_settings.clone();

let connect_handle =
fuse.spawn(async move { connector.connect(remote).await });
let connect_handle = fuse.spawn(async move {
connector.connect(remote).await.map(|connection| {
let multiplex =
Multiplex::new(Role::Connector, connection, multiplex_settings);

let (multiplex, _) = multiplex.split();
multiplex.ping();

multiplex
})
});

connect_handles.push(connect_handle);
time::sleep(interval).await;
}

let mut connections = Vec::new();
let mut new_multiplexes = Vec::new();

for connect_handle in connect_handles {
if let Ok(connection) = connect_handle.await.unwrap().unwrap() {
connections.push(connection)
if let Ok(multiplex) = connect_handle.await.unwrap().unwrap() {
new_multiplexes.push(multiplex)
}
}

(remote, connections)
(remote, new_multiplexes)
})
})
.collect::<Vec<_>>();

for remote_handle in remote_handles {
let (remote, connections) = remote_handle.await.unwrap().unwrap();
let (remote, new_multiplexes) = remote_handle.await.unwrap().unwrap();

let multiplexes = self.pool.lock().get_multiplexes(remote);
let mut multiplexes = multiplexes.lock().await;

let missing = self.settings.connections_per_remote - multiplexes.len();

multiplexes.extend(
connections
new_multiplexes
.into_iter()
.map(|connection| {
let multiplex = Multiplex::new(
Role::Connector,
connection,
self.settings.multiplex_settings.clone(),
);

let (multiplex, _) = multiplex.split();
.map(|multiplex| {
let id = self.cursor.fetch_add(1, Ordering::Relaxed);

(id, multiplex)
})
.take(missing),
Expand Down

0 comments on commit ae7c27d

Please sign in to comment.