Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

std: Implement SingleThreaded spawn mode for newsched #8221

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions src/libstd/rt/sched.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,14 @@ use cell::Cell;
pub struct Scheduler {
/// A queue of available work. Under a work-stealing policy there
/// is one per Scheduler.
priv work_queue: WorkQueue<~Task>,
work_queue: WorkQueue<~Task>,
/// The queue of incoming messages from other schedulers.
/// These are enqueued by SchedHandles after which a remote callback
/// is triggered to handle the message.
priv message_queue: MessageQueue<SchedMessage>,
/// A shared list of sleeping schedulers. We'll use this to wake
/// up schedulers when pushing work onto the work queue.
priv sleeper_list: SleeperList,
sleeper_list: SleeperList,
/// Indicates that we have previously pushed a handle onto the
/// SleeperList but have not yet received the Wake message.
/// Being `true` does not necessarily mean that the scheduler is
Expand Down Expand Up @@ -158,6 +158,9 @@ impl Scheduler {
// scheduler. Grab it out of TLS - performing the scheduler
// action will have given it away.
let sched = Local::take::<Scheduler>();

rtdebug!("starting scheduler %u", sched.sched_id());

sched.run();

// Now that we are done with the scheduler, clean up the
Expand All @@ -166,6 +169,9 @@ impl Scheduler {
// task.run() on the scheduler task we never get through all
// the cleanup code it runs.
let mut stask = Local::take::<Task>();

rtdebug!("stopping scheduler %u", stask.sched.get_ref().sched_id());

stask.destroyed = true;
}

Expand Down Expand Up @@ -484,7 +490,7 @@ impl Scheduler {
return None;
} else if !homed && !this.run_anything {
// the task isn't homed, but it can't be run here
this.enqueue_task(task);
this.send_to_friend(task);
return Some(this);
} else {
// task isn't home, so don't run it here, send it home
Expand Down
23 changes: 18 additions & 5 deletions src/libstd/task/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -971,16 +971,29 @@ fn test_try_fail() {
}
}

#[cfg(test)]
fn get_sched_id() -> int {
if context() == OldTaskContext {
unsafe {
rt::rust_get_sched_id() as int
}
} else {
do Local::borrow::<::rt::sched::Scheduler, int> |sched| {
sched.sched_id() as int
}
}
}

#[test]
fn test_spawn_sched() {
let (po, ch) = stream::<()>();
let ch = SharedChan::new(ch);

fn f(i: int, ch: SharedChan<()>) {
let parent_sched_id = unsafe { rt::rust_get_sched_id() };
let parent_sched_id = get_sched_id();

do spawn_sched(SingleThreaded) {
let child_sched_id = unsafe { rt::rust_get_sched_id() };
let child_sched_id = get_sched_id();
assert!(parent_sched_id != child_sched_id);

if (i == 0) {
Expand All @@ -1000,15 +1013,15 @@ fn test_spawn_sched_childs_on_default_sched() {
let (po, ch) = stream();

// Assuming tests run on the default scheduler
let default_id = unsafe { rt::rust_get_sched_id() };
let default_id = get_sched_id();

let ch = Cell::new(ch);
do spawn_sched(SingleThreaded) {
let parent_sched_id = unsafe { rt::rust_get_sched_id() };
let parent_sched_id = get_sched_id();
let ch = Cell::new(ch.take());
do spawn {
let ch = ch.take();
let child_sched_id = unsafe { rt::rust_get_sched_id() };
let child_sched_id = get_sched_id();
assert!(parent_sched_id != child_sched_id);
assert_eq!(child_sched_id, default_id);
ch.send(());
Expand Down
88 changes: 80 additions & 8 deletions src/libstd/task/spawn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,13 @@ use cast::transmute;
use cast;
use cell::Cell;
use container::MutableMap;
use comm::{Chan, GenericChan};
use comm::{Chan, GenericChan, oneshot};
use hashmap::{HashSet, HashSetConsumeIterator};
use local_data;
use task::local_data_priv::{local_get, local_set, OldHandle};
use task::rt::rust_task;
use task::rt;
use task::{Failure};
use task::{Failure, SingleThreaded};
use task::{Success, TaskOpts, TaskResult};
use task::unkillable;
use to_bytes::IterBytes;
Expand All @@ -93,9 +93,11 @@ use util;
use unstable::sync::Exclusive;
use rt::{OldTaskContext, TaskContext, SchedulerContext, GlobalContext, context};
use rt::local::Local;
use rt::task::Task;
use rt::task::{Task, Sched};
use rt::kill::KillHandle;
use rt::sched::Scheduler;
use rt::uv::uvio::UvEventLoop;
use rt::thread::Thread;

#[cfg(test)] use task::default_task_opts;
#[cfg(test)] use comm;
Expand Down Expand Up @@ -694,11 +696,81 @@ fn spawn_raw_newsched(mut opts: TaskOpts, f: ~fn()) {
}
};

let mut task = if opts.watched {
Task::build_child(child_wrapper)
} else {
// An unwatched task is a new root in the exit-code propagation tree
Task::build_root(child_wrapper)
let mut task = unsafe {
if opts.sched.mode != SingleThreaded {
if opts.watched {
Task::build_child(child_wrapper)
} else {
Task::build_root(child_wrapper)
}
} else {
// Creating a 1:1 task:thread ...
let sched = Local::unsafe_borrow::<Scheduler>();
let sched_handle = (*sched).make_handle();

// Create a new scheduler to hold the new task
let new_loop = ~UvEventLoop::new();
let mut new_sched = ~Scheduler::new_special(new_loop,
(*sched).work_queue.clone(),
(*sched).sleeper_list.clone(),
false,
Some(sched_handle));
let mut new_sched_handle = new_sched.make_handle();

// Allow the scheduler to exit when the pinned task exits
new_sched_handle.send(Shutdown);

// Pin the new task to the new scheduler
let new_task = if opts.watched {
Task::build_homed_child(child_wrapper, Sched(new_sched_handle))
} else {
Task::build_homed_root(child_wrapper, Sched(new_sched_handle))
};

// Create a task that will later be used to join with the new scheduler
// thread when it is ready to terminate
let (thread_port, thread_chan) = oneshot();
let thread_port_cell = Cell::new(thread_port);
let join_task = do Task::build_child() {
rtdebug!("running join task");
let thread_port = thread_port_cell.take();
let thread: Thread = thread_port.recv();
thread.join();
};

// Put the scheduler into another thread
let new_sched_cell = Cell::new(new_sched);
let orig_sched_handle_cell = Cell::new((*sched).make_handle());
let join_task_cell = Cell::new(join_task);

let thread = do Thread::start {
let mut new_sched = new_sched_cell.take();
let mut orig_sched_handle = orig_sched_handle_cell.take();
let join_task = join_task_cell.take();

let bootstrap_task = ~do Task::new_root(&mut new_sched.stack_pool) || {
rtdebug!("boostraping a 1:1 scheduler");
};
new_sched.bootstrap(bootstrap_task);

rtdebug!("enqueing join_task");
// Now tell the original scheduler to join with this thread
// by scheduling a thread-joining task on the original scheduler
orig_sched_handle.send(TaskFromFriend(join_task));

// NB: We can't simply send a message from here to another task
// because this code isn't running in a task and message passing doesn't
// work outside of tasks. Hence we're sending a scheduler message
// to execute a new task directly to a scheduler.
};

// Give the thread handle to the join task
thread_chan.send(thread);

// When this task is enqueued on the current scheduler it will then get
// forwarded to the scheduler to which it is pinned
new_task
}
};

if opts.notify_chan.is_some() {
Expand Down