fooling around with a removal of the sleeper list
toddaaro committed Jul 6, 2014
1 parent 4c0cab7 commit 23a8057
Showing 2 changed files with 145 additions and 50 deletions.
36 changes: 24 additions & 12 deletions src/libgreen/lib.rs
@@ -230,7 +230,7 @@ use std::sync::deque;
use std::task::{TaskBuilder, Spawner};

use sched::{Shutdown, Scheduler, SchedHandle, TaskFromFriend, PinnedTask, NewNeighbor};
use sleeper_list::SleeperList;
//use sleeper_list::SleeperList;
use stack::StackPool;
use task::GreenTask;

@@ -242,7 +242,7 @@ pub mod basic;
pub mod context;
pub mod coroutine;
pub mod sched;
pub mod sleeper_list;
//pub mod sleeper_list;
pub mod stack;
pub mod task;

@@ -368,7 +368,7 @@ pub struct SchedPool {
next_friend: uint,
stack_pool: StackPool,
deque_pool: deque::BufferPool<Box<task::GreenTask>>,
sleepers: SleeperList,
// sleepers: SleeperList,
factory: fn() -> Box<rtio::EventLoop + Send>,
task_state: TaskState,
tasks_done: Receiver<()>,
@@ -404,7 +404,7 @@ impl SchedPool {
handles: vec![],
stealers: vec![],
id: unsafe { POOL_ID.fetch_add(1, SeqCst) },
sleepers: SleeperList::new(),
// sleepers: SleeperList::new(),
stack_pool: StackPool::new(),
deque_pool: deque::BufferPool::new(),
next_friend: 0,
@@ -428,15 +428,27 @@ impl SchedPool {
// Now that we've got all our work queues, create one scheduler per
// queue, spawn the scheduler into a thread, and be sure to keep a
// handle to the scheduler and the thread to keep them alive.
let mut scheds = vec![];
for worker in workers.move_iter() {
rtdebug!("inserting a regular scheduler");
let sched = box Scheduler::new(pool.id,
(pool.factory)(),
worker,
pool.stealers.clone(),
pool.task_state.clone());
scheds.push(sched);

}
// .to_owned_vec();

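// Link the schedulers into a doubly-linked ring: each scheduler
// gets handles to the neighbors on its left and right, with the
// ends wrapping around.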
for i in range(0u, scheds.len()) {
let left = if i > 0 { i - 1 } else { scheds.len() - 1 };
let right = if (i + 1) < scheds.len() { i + 1 } else { 0 };
scheds.get_mut(i).left_sched = Some(scheds.get_mut(left).make_handle());
scheds.get_mut(i).right_sched = Some(scheds.get_mut(right).make_handle());
}

let mut sched = box Scheduler::new(pool.id,
(pool.factory)(),
worker,
pool.stealers.clone(),
pool.sleepers.clone(),
pool.task_state.clone());
for mut sched in scheds.move_iter() {
rtdebug!("inserting a regular scheduler");
pool.handles.push(sched.make_handle());
pool.threads.push(Thread::start(proc() { sched.bootstrap(); }));
}
@@ -497,7 +509,7 @@ impl SchedPool {
(self.factory)(),
worker,
self.stealers.clone(),
self.sleepers.clone(),
// self.sleepers.clone(),
self.task_state.clone());
let ret = sched.make_handle();
self.handles.push(sched.make_handle());
159 changes: 121 additions & 38 deletions src/libgreen/sched.rs
@@ -17,12 +17,15 @@ use std::rt::task::Task;
use std::sync::deque;
use std::raw;

use std::sync::atomics::{AtomicBool, SeqCst};
use std::sync::Arc;

use std::rand::{XorShiftRng, Rng, Rand};

use TaskState;
use context::Context;
use coroutine::Coroutine;
use sleeper_list::SleeperList;
//use sleeper_list::SleeperList;
use stack::StackPool;
use task::{TypeSched, GreenTask, HomeSched, AnySched};
use msgq = message_queue;
@@ -58,14 +61,18 @@ pub struct Scheduler {
message_producer: msgq::Producer<SchedMessage>,
/// A shared list of sleeping schedulers. We'll use this to wake
/// up schedulers when pushing work onto the work queue.
sleeper_list: SleeperList,
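/// Handles to this scheduler's neighbors in the wakeup ring that
/// replaces the sleeper list. Work notifications and shutdown
/// propagate through these handles.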
pub left_sched: Option<SchedHandle>,
pub right_sched: Option<SchedHandle>,

// sleeper_list: SleeperList,
/// Indicates that we have previously pushed a handle onto the
/// SleeperList but have not yet received the Wake message.
/// Being `true` does not necessarily mean that the scheduler is
/// not active since there are multiple event sources that may
/// wake the scheduler. It just prevents the scheduler from pushing
/// multiple handles onto the sleeper list.
sleepy: bool,
// sleepy: bool,
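// Now shared behind an Arc so that SchedHandles on other threads
// can read the flag before deciding to send a Wake.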
sleepy: Arc<AtomicBool>,
/// A flag to indicate we've received the shutdown message and should
/// no longer try to go to sleep, but exit instead.
no_sleep: bool,
@@ -126,20 +133,21 @@ impl Scheduler {
event_loop: Box<EventLoop + Send>,
work_queue: deque::Worker<Box<GreenTask>>,
work_queues: Vec<deque::Stealer<Box<GreenTask>>>,
sleeper_list: SleeperList,
// sleeper_list: SleeperList,
state: TaskState)
-> Scheduler {

Scheduler::new_special(pool_id, event_loop, work_queue, work_queues,
sleeper_list, true, None, state)
// sleeper_list, true, None, state)
true, None, state)

}

pub fn new_special(pool_id: uint,
event_loop: Box<EventLoop + Send>,
work_queue: deque::Worker<Box<GreenTask>>,
work_queues: Vec<deque::Stealer<Box<GreenTask>>>,
sleeper_list: SleeperList,
// sleeper_list: SleeperList,
run_anything: bool,
friend: Option<SchedHandle>,
state: TaskState)
@@ -148,10 +156,13 @@ impl Scheduler {
let (consumer, producer) = msgq::queue();
let mut sched = Scheduler {
pool_id: pool_id,
sleeper_list: sleeper_list,
// sleeper_list: sleeper_list,
left_sched: None,
right_sched: None,
message_queue: consumer,
message_producer: producer,
sleepy: false,
// sleepy: false,
sleepy: Arc::new(AtomicBool::new(false)),
no_sleep: false,
event_loop: event_loop,
work_queue: work_queue,
@@ -326,11 +337,11 @@ impl Scheduler {
// If we got here then there was no work to do.
// Generate a SchedHandle and push it to the sleeper list so
// somebody can wake us up later.
if !sched.sleepy && !sched.no_sleep {
if !sched.sleepy.load(SeqCst) && !sched.no_sleep {
rtdebug!("scheduler has no work to do, going to sleep");
sched.sleepy = true;
let handle = sched.make_handle();
sched.sleeper_list.push(handle);
sched.sleepy.store(true, SeqCst);
// let handle = sched.make_handle();
// sched.sleeper_list.push(handle);
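// Raising the shared flag is enough: a neighbor that produces
// work will check it via wakeup_if_sleepy.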
// Since we are sleeping, deactivate the idle callback.
sched.idle_callback.get_mut_ref().pause();
} else {
@@ -399,29 +410,77 @@ impl Scheduler {
(sched, task, true)
}
Some(Wake) => {
self.sleepy = false;
self.sleepy.store(false, SeqCst);
(self, stask, true)
}
Some(Shutdown) => {
rtdebug!("shutting down");
if self.sleepy {
// There may be an outstanding handle on the
// sleeper list. Pop them all to make sure that's
// not the case.
loop {
match self.sleeper_list.pop() {
Some(handle) => {
let mut handle = handle;
handle.send(Wake);
}
None => break
}
Some(LeftNeighborUpdate(new_left_sched)) => {
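// A shutting-down neighbor has spliced the ring and handed
// us a handle to our new left-hand neighbor.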

// if the incoming handle is to ourself, drop it for None.
let incoming_sched = new_left_sched.map_or(None, |handle| {
if handle.sched_id == self.sched_id() || self.no_sleep {
None
} else {
Some(handle)
}
});

// if we are already shutting down, forward along the ring


self.left_sched = incoming_sched;
let new_handle = self.make_handle();
match self.left_sched {
Some(ref mut handle) => {
handle.send(RightNeighborUpdate(Some(new_handle)));
}
None => { ( /* pass */ ) }
}
(self, stask, true)
}
Some(RightNeighborUpdate(new_right_sched)) => {

// if the incoming handle is to ourself, drop it for None.
let incoming_sched = new_right_sched.map_or(None, |handle| {
if handle.sched_id == self.sched_id() || self.no_sleep {
None
} else {
Some(handle)
}
});

self.right_sched = incoming_sched;
(self, stask, true)
}
Some(Shutdown) => {
rtdebug!("shutting down");

// No more sleeping. After there are no outstanding
// event loop references we will shut down.
self.no_sleep = true;
self.sleepy = false;
self.sleepy.store(false, SeqCst);

// Notify the two neighbors that this scheduler is
// shutting down. Additionally, notify them that they
// are now neighbors with each other. Shutdown will
// propagate around the ring, with one scheduler
// eventually being notified that it is now neighbors
// with itself. At that point it can delete the
// handles and terminate.

// Always send neighbor updates to the right first,
// and then the right neighbor will update the left
// neighbor.
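// For example, in a ring A <-> B <-> C, a shutdown of B sends
// LeftNeighborUpdate(Some(handle-to-A)) to C; C adopts A as its
// left neighbor and replies with RightNeighborUpdate carrying a
// handle to itself, which A installs as its new right neighbor,
// closing the ring as A <-> C.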
let left_sched_handle = self.left_sched.take();
let right_sched_handle = self.right_sched.take();

match right_sched_handle {
Some(handle) => {
let mut handle = handle;
handle.send(LeftNeighborUpdate(left_sched_handle));
}
None => { ( /* pass */ ) }
}

(self, stask, true)
}
Some(NewNeighbor(neighbor)) => {
@@ -591,14 +650,29 @@ impl Scheduler {
// We've made work available. Notify a
// sleeping scheduler.

match self.sleeper_list.casual_pop() {
Some(handle) => {
let mut handle = handle;
handle.send(Wake)
// match self.sleeper_list.casual_pop() {
// Some(handle) => {
// let mut handle = handle;
// handle.send(Wake)
// }
// None => { (/* pass */) }
// };

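// Nudge both ring neighbors instead; wakeup_if_sleepy sends a
// Wake only if the target has flagged itself sleepy.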
match self.left_sched {
Some(ref mut handle) => {
handle.wakeup_if_sleepy();
}
None => { (/* pass */) }
};
}
}

match self.right_sched {
Some(ref mut handle) => {
handle.wakeup_if_sleepy();
}
None => { (/* pass */) }
}

}

// * Core Context Switching Functions

@@ -884,7 +958,8 @@ impl Scheduler {
return SchedHandle {
remote: remote,
queue: self.message_producer.clone(),
sched_id: self.sched_id()
sched_id: self.sched_id(),
sleepy: self.sleepy.clone()
}
}
}
@@ -901,19 +976,27 @@ pub enum SchedMessage {
PinnedTask(Box<GreenTask>),
TaskFromFriend(Box<GreenTask>),
RunOnce(Box<GreenTask>),
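// Ring maintenance: a shutting-down scheduler hands a neighbor a
// handle to the scheduler on its opposite side (or None).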
LeftNeighborUpdate(Option<SchedHandle>),
RightNeighborUpdate(Option<SchedHandle>),
}

pub struct SchedHandle {
remote: Box<RemoteCallback + Send>,
queue: msgq::Producer<SchedMessage>,
pub sched_id: uint
pub sched_id: uint,
sleepy: Arc<AtomicBool>
}

impl SchedHandle {
pub fn send(&mut self, msg: SchedMessage) {
self.queue.push(msg);
self.remote.fire();
}
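/// Send a Wake only if the target scheduler has flagged itself
/// sleepy. The check-then-send can race with the scheduler waking
/// on its own, but a redundant Wake just clears the flag again.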
pub fn wakeup_if_sleepy(&mut self) {
if self.sleepy.load(SeqCst) {
self.send(Wake);
}
}
}

struct SchedRunner;
@@ -1133,7 +1216,7 @@ mod test {

// An advanced test that checks all four possible states that a
// (task,sched) can be in regarding homes.

/*
#[test]
fn test_schedule_home_states() {
use sleeper_list::SleeperList;
@@ -1262,7 +1345,7 @@ mod test {
special_thread.join();
}).join();
}

*/
//#[test]
//fn test_stress_schedule_task_states() {
// if util::limit_thread_creation_due_to_osx_and_valgrind() { return; }
