Skip to content

Commit

Permalink
Merge pull request #128 from Jonathas-Conceicao/topic/fix_memory_leak
Browse files Browse the repository at this point in the history
actix-rt: Spawn future to cleanup pending JoinHandles
  • Loading branch information
JohnTitor authored Apr 29, 2020
2 parents 700997f + 6906f25 commit 1b4a117
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 7 deletions.
1 change: 1 addition & 0 deletions actix-rt/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,5 @@ actix-threadpool = "0.3"
futures-channel = { version = "0.3.4", default-features = false }
futures-util = { version = "0.3.4", default-features = false, features = ["alloc"] }
copyless = "0.1.4"
smallvec = "1"
tokio = { version = "0.2.6", default-features = false, features = ["rt-core", "rt-util", "io-driver", "tcp", "uds", "udp", "time", "signal", "stream"] }
51 changes: 44 additions & 7 deletions actix-rt/src/arbiter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@ use crate::system::System;

use copyless::BoxHelper;

use smallvec::SmallVec;
pub use tokio::task::JoinHandle;

thread_local!(
static ADDR: RefCell<Option<Arbiter>> = RefCell::new(None);
static RUNNING: Cell<bool> = Cell::new(false);
static Q: RefCell<Vec<Pin<Box<dyn Future<Output = ()>>>>> = RefCell::new(Vec::new());
static PENDING: RefCell<Vec<JoinHandle<()>>> = RefCell::new(Vec::new());
static PENDING: RefCell<SmallVec<[JoinHandle<()>; 8]>> = RefCell::new(SmallVec::new());
static STORAGE: RefCell<HashMap<TypeId, Box<dyn Any>>> = RefCell::new(HashMap::new());
);

Expand Down Expand Up @@ -181,9 +182,15 @@ impl Arbiter {
RUNNING.with(move |cell| {
if cell.get() {
// Spawn the future on running executor
PENDING.with(move |cell| {
cell.borrow_mut().push(tokio::task::spawn_local(future));
})
let len = PENDING.with(move |cell| {
let mut p = cell.borrow_mut();
p.push(tokio::task::spawn_local(future));
p.len()
});
if len > 7 {
// Before reaching the inline size
tokio::task::spawn_local(CleanupPending);
}
} else {
// Box the future and push it to the queue, this results in double boxing
// because the executor boxes the future again, but works for now
Expand Down Expand Up @@ -311,12 +318,36 @@ impl Arbiter {
/// have completed.
pub fn local_join() -> impl Future<Output = ()> {
PENDING.with(move |cell| {
let current = cell.replace(Vec::new());
let current = cell.replace(SmallVec::new());
future::join_all(current).map(|_| ())
})
}
}

/// Future used for cleaning-up already finished `JoinHandle`s
/// from the `PENDING` list so the vector doesn't grow indefinitely
struct CleanupPending;

impl Future for CleanupPending {
type Output = ();

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
PENDING.with(move |cell| {
let mut pending = cell.borrow_mut();
let mut i = 0;
while i != pending.len() {
if let Poll::Ready(_) = Pin::new(&mut pending[i]).poll(cx) {
pending.remove(i);
} else {
i += 1;
}
}
});

Poll::Ready(())
}
}

struct ArbiterController {
stop: Option<Sender<i32>>,
rx: UnboundedReceiver<ArbiterCommand>,
Expand Down Expand Up @@ -350,9 +381,15 @@ impl Future for ArbiterController {
return Poll::Ready(());
}
ArbiterCommand::Execute(fut) => {
PENDING.with(move |cell| {
cell.borrow_mut().push(tokio::task::spawn_local(fut));
let len = PENDING.with(move |cell| {
let mut p = cell.borrow_mut();
p.push(tokio::task::spawn_local(fut));
p.len()
});
if len > 7 {
// Before reaching the inline size
tokio::task::spawn_local(CleanupPending);
}
}
ArbiterCommand::ExecuteFn(f) => {
f.call_box();
Expand Down

0 comments on commit 1b4a117

Please sign in to comment.