Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Web workers #1993

Merged
merged 27 commits into from
Apr 1, 2019
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
6481082
Web workers pass 1
afinch7 Mar 23, 2019
368bbfd
Make linter happy
afinch7 Mar 23, 2019
d28a44d
fmt
afinch7 Mar 23, 2019
84e56d8
Refactored the way worker polling is scheduled and errors are handled.
afinch7 Mar 25, 2019
df12a00
Some cleanup and handle worker close on host end.
afinch7 Mar 26, 2019
99f6db1
Merge branch 'master' into web_workers
afinch7 Mar 26, 2019
a1b048c
Share the worker future as a Shared
afinch7 Mar 26, 2019
4f52d6b
Merge branch 'web_workers' of https://github.com/afinch7/deno into we…
afinch7 Mar 26, 2019
8d30513
Fixed some merge issues and added a worker test
afinch7 Mar 26, 2019
ec2149b
compile_sync returns errors now
afinch7 Mar 27, 2019
2103d32
Refactored compile_sync again
afinch7 Mar 27, 2019
1a6be92
fixed bugs and moved compiler error exit back to compile_sync
afinch7 Mar 27, 2019
4eb80ab
lots of comments for compile_sync
afinch7 Mar 28, 2019
1e2c184
refactored compile_sync again to use a future that is less blocking.
afinch7 Mar 28, 2019
cfd3289
Merge branch 'master' into web_workers
afinch7 Mar 28, 2019
e245db1
enable debug to find this problem
afinch7 Mar 28, 2019
58de310
remove old comments
afinch7 Mar 28, 2019
e6a3bab
maybe same tokio runtime for all compiler tasks?
afinch7 Mar 28, 2019
fa72bc9
remove debug from tests
afinch7 Mar 28, 2019
32d7ad4
worker ts lib
afinch7 Mar 30, 2019
a9c8271
requested change: workers test assert
afinch7 Mar 31, 2019
5b101a8
renamed WebWorkerBehavior to UserWorkerBehaivor and moved to workers.rs
afinch7 Mar 31, 2019
8388fa2
Merge remote-tracking branch 'upstream/master' into web_workers
afinch7 Mar 31, 2019
53fcd75
fixed remaining merge issues
afinch7 Mar 31, 2019
6e173b8
remove thread spawn from worker tests
afinch7 Mar 31, 2019
5815927
removed worker specific snapshot/bundle
afinch7 Apr 1, 2019
b0881d4
forgot to remove the startup data function
afinch7 Apr 1, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions cli/compiler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::resources::ResourceId;
use crate::startup_data;
use crate::workers;
use crate::workers::WorkerBehavior;
use crate::workers::WorkerInit;
use deno_core::deno_buf;
use deno_core::deno_mod;
use deno_core::Behavior;
Expand Down Expand Up @@ -70,6 +71,7 @@ impl WorkerBehavior for CompilerBehavior {
self.state.flags.clone(),
self.state.argv.clone(),
Some(worker_channels),
None,
));
}
}
Expand Down Expand Up @@ -111,8 +113,9 @@ fn lazy_start(parent_state: Arc<IsolateState>) -> Resource {
parent_state.flags.clone(),
parent_state.argv.clone(),
None,
None,
))),
"compilerMain()".to_string(),
WorkerInit::Script("compilerMain()".to_string()),
);
resource.rid
});
Expand All @@ -138,10 +141,10 @@ pub fn compile_sync(

let compiler = lazy_start(parent_state);

let send_future = resources::worker_post_message(compiler.rid, req_msg);
let send_future = resources::post_message_to_worker(compiler.rid, req_msg);
send_future.wait().unwrap();

let recv_future = resources::worker_recv_message(compiler.rid);
let recv_future = resources::get_message_from_worker(compiler.rid);
let result = recv_future.wait().unwrap();
assert!(result.is_some());
let res_msg = result.unwrap();
Expand Down
4 changes: 2 additions & 2 deletions cli/isolate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ mod tests {
let argv = vec![String::from("./deno"), filename.clone()];
let (flags, rest_argv, _) = flags::set_flags(argv).unwrap();

let state = Arc::new(IsolateState::new(flags, rest_argv, None));
let state = Arc::new(IsolateState::new(flags, rest_argv, None, None));
let state_ = state.clone();
tokio_util::run(lazy(move || {
let cli = CliBehavior::new(None, state.clone());
Expand All @@ -239,7 +239,7 @@ mod tests {
let argv = vec![String::from("./deno"), filename.clone()];
let (flags, rest_argv, _) = flags::set_flags(argv).unwrap();

let state = Arc::new(IsolateState::new(flags, rest_argv, None));
let state = Arc::new(IsolateState::new(flags, rest_argv, None, None));
let state_ = state.clone();
tokio_util::run(lazy(move || {
let cli = CliBehavior::new(None, state.clone());
Expand Down
45 changes: 35 additions & 10 deletions cli/isolate_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,21 @@ pub struct IsolateState {
pub modules: Mutex<Modules>,
pub worker_channels: Option<Mutex<WorkerChannels>>,
pub global_timer: Mutex<GlobalTimer>,
pub main_module_specifier: Option<String>,
}

// TODO(afinch7) abstract out IsolateState to a trait, since
// some isolates don't require or shouldn't be given some
// state data
ry marked this conversation as resolved.
Show resolved Hide resolved
impl IsolateState {
/// main_module should be a fully qualified url
/// argv will be used to determine main module
/// if not sepcified
pub fn new(
flags: flags::DenoFlags,
argv_rest: Vec<String>,
worker_channels: Option<WorkerChannels>,
main_module_specifier: Option<String>,
) -> Self {
let custom_root = env::var("DENO_DIR").map(|s| s.into()).ok();

Expand All @@ -63,20 +71,37 @@ impl IsolateState {
modules: Mutex::new(Modules::new()),
worker_channels: worker_channels.map(Mutex::new),
global_timer: Mutex::new(GlobalTimer::new()),
main_module_specifier,
}
}

/// Attempt to resolve explicit main module specifier from creation
/// if present or extract it from argv_rest if not
pub fn main_module(&self) -> Option<String> {
if self.argv.len() <= 1 {
None
} else {
let specifier = self.argv[1].clone();
let referrer = ".";
match self.dir.resolve_module_url(&specifier, referrer) {
Ok(url) => Some(url.to_string()),
Err(e) => {
debug!("Potentially swallowed error {}", e);
match &self.main_module_specifier {
Some(specifier) => {
let referrer = ".";
match self.dir.resolve_module_url(&specifier, referrer) {
Ok(url) => Some(url.to_string()),
Err(e) => {
debug!("Potentially swallowed error {}", e);
None
}
}
}
None => {
if self.argv.len() <= 1 {
None
} else {
let specifier = self.argv[1].clone();
let referrer = ".";
match self.dir.resolve_module_url(&specifier, referrer) {
Ok(url) => Some(url.to_string()),
Err(e) => {
debug!("Potentially swallowed error {}", e);
None
}
}
}
}
}
Expand Down Expand Up @@ -112,7 +137,7 @@ impl IsolateState {
let argv = vec![String::from("./deno"), String::from("hello.js")];
// For debugging: argv.push_back(String::from("-D"));
let (flags, rest_argv, _) = flags::set_flags(argv).unwrap();
IsolateState::new(flags, rest_argv, None)
IsolateState::new(flags, rest_argv, None, None)
}

pub fn metrics_op_dispatched(
Expand Down
3 changes: 2 additions & 1 deletion cli/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ mod startup_data;
mod tokio_util;
mod tokio_write;
pub mod version;
pub mod web_worker_behavior;
pub mod workers;

use crate::cli_behavior::CliBehavior;
Expand Down Expand Up @@ -108,7 +109,7 @@ fn main() {
let should_prefetch = flags.prefetch || flags.info;
let should_display_info = flags.info;

let state = Arc::new(IsolateState::new(flags, rest_argv, None));
let state = Arc::new(IsolateState::new(flags, rest_argv, None, None));
let state_ = state.clone();
let startup_data = startup_data::deno_isolate_init();
let cli = CliBehavior::new(Some(startup_data), state_);
Expand Down
31 changes: 31 additions & 0 deletions cli/msg.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ union Any {
StatRes,
Symlink,
Truncate,
CreateWorker,
CreateWorkerRes,
HostGetMessage,
HostGetMessageRes,
HostPostMessage,
afinch7 marked this conversation as resolved.
Show resolved Hide resolved
WorkerGetMessage,
WorkerGetMessageRes,
WorkerPostMessage,
Expand Down Expand Up @@ -174,6 +179,31 @@ table FormatErrorRes {
error: string;
}

// Create worker as host
table CreateWorker {
specifier: string;
}

table CreateWorkerRes {
rid: uint32;
}

// Get message from guest worker as host
table HostGetMessage {
rid: uint32;
}

table HostGetMessageRes {
data: [ubyte];
}

// Post message to guest worker as host
table HostPostMessage {
rid: uint32;
// data passed thru the zero-copy data parameter.
}

// Get message from host as guest worker
table WorkerGetMessage {
unused: int8;
}
Expand All @@ -182,6 +212,7 @@ table WorkerGetMessageRes {
data: [ubyte];
}

// Post message to host as guest worker
table WorkerPostMessage {
// data passed thru the zero-copy data parameter.
}
Expand Down
120 changes: 118 additions & 2 deletions cli/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ use crate::resources::Resource;
use crate::tokio_util;
use crate::tokio_write;
use crate::version;
use crate::web_worker_behavior;
use crate::workers;
use deno_core::deno_buf;
use deno_core::Buf;
use deno_core::JSError;
Expand Down Expand Up @@ -141,13 +143,24 @@ pub fn dispatch_all(
(base.sync(), boxed_op)
}

/// Superset of op_selector_worker for compiler isolates
pub fn op_selector_compiler(inner_type: msg::Any) -> Option<OpCreator> {
match inner_type {
msg::Any::FetchModuleMetaData => Some(op_fetch_module_meta_data),
_ => op_selector_worker(inner_type),
}
}

/// Superset of op_selector_std for worker isolates
pub fn op_selector_worker(inner_type: msg::Any) -> Option<OpCreator> {
match inner_type {
msg::Any::WorkerGetMessage => Some(op_worker_get_message),
msg::Any::WorkerPostMessage => Some(op_worker_post_message),
_ => op_selector_std(inner_type),
}
}

/// Standard ops set for most isolates
pub fn op_selector_std(inner_type: msg::Any) -> Option<OpCreator> {
match inner_type {
msg::Any::Accept => Some(op_accept),
Expand Down Expand Up @@ -190,8 +203,9 @@ pub fn op_selector_std(inner_type: msg::Any) -> Option<OpCreator> {
msg::Any::Stat => Some(op_stat),
msg::Any::Symlink => Some(op_symlink),
msg::Any::Truncate => Some(op_truncate),
msg::Any::WorkerGetMessage => Some(op_worker_get_message),
msg::Any::WorkerPostMessage => Some(op_worker_post_message),
msg::Any::CreateWorker => Some(op_create_worker),
msg::Any::HostGetMessage => Some(op_host_get_message),
msg::Any::HostPostMessage => Some(op_host_post_message),
msg::Any::Write => Some(op_write),
msg::Any::WriteFile => Some(op_write_file),
_ => None,
Expand Down Expand Up @@ -1806,6 +1820,7 @@ impl Future for GetMessageFuture {
}
}

/// Get message from host as guest worker
fn op_worker_get_message(
sc: &IsolateStateContainer,
base: &msg::Base<'_>,
Expand Down Expand Up @@ -1840,6 +1855,7 @@ fn op_worker_get_message(
Box::new(op)
}

/// Post message to host as guest worker
fn op_worker_post_message(
sc: &IsolateStateContainer,
base: &msg::Base<'_>,
Expand Down Expand Up @@ -1872,3 +1888,103 @@ fn op_worker_post_message(
});
Box::new(op)
}

/// Create worker as the host
fn op_create_worker(
sc: &IsolateStateContainer,
base: &msg::Base<'_>,
data: deno_buf,
) -> Box<OpWithError> {
assert_eq!(data.len(), 0);
let cmd_id = base.cmd_id();
let inner = base.inner_as_create_worker().unwrap();
let specifier = inner.specifier();

Box::new(futures::future::result(move || -> OpResult {
let parent_state = sc.state().clone();
let worker_state = Arc::new(IsolateState::new(
parent_state.flags.clone(),
parent_state.argv.clone(),
None,
specifier.map(|v| v.to_string()),
));
let behavior = web_worker_behavior::WebWorkerBehavior::new(worker_state);
let resource = workers::spawn(behavior, workers::WorkerInit::MainModule());
let builder = &mut FlatBufferBuilder::new();
let msg_inner = msg::CreateWorkerRes::create(
builder,
&msg::CreateWorkerResArgs { rid: resource.rid },
);
Ok(serialize_response(
cmd_id,
builder,
msg::BaseArgs {
inner: Some(msg_inner.as_union_value()),
inner_type: msg::Any::CreateWorkerRes,
..Default::default()
},
))
}()))
}

/// Get message from guest worker as host
fn op_host_get_message(
_sc: &IsolateStateContainer,
base: &msg::Base<'_>,
data: deno_buf,
) -> Box<OpWithError> {
assert_eq!(data.len(), 0);
let cmd_id = base.cmd_id();
let inner = base.inner_as_host_get_message().unwrap();
let rid = inner.rid();

let op = resources::get_message_from_worker(rid);
let op = op.map_err(move |_| -> DenoError { unimplemented!() });
let op = op.and_then(move |maybe_buf| -> DenoResult<Buf> {
let builder = &mut FlatBufferBuilder::new();

let data = maybe_buf.as_ref().map(|buf| builder.create_vector(buf));
let msg_inner = msg::HostGetMessageRes::create(
builder,
&msg::HostGetMessageResArgs { data },
);
Ok(serialize_response(
cmd_id,
builder,
msg::BaseArgs {
inner: Some(msg_inner.as_union_value()),
inner_type: msg::Any::HostGetMessageRes,
..Default::default()
},
))
});
Box::new(op)
}

/// Post message to guest worker as host
fn op_host_post_message(
_sc: &IsolateStateContainer,
base: &msg::Base<'_>,
data: deno_buf,
) -> Box<OpWithError> {
let cmd_id = base.cmd_id();
let inner = base.inner_as_host_post_message().unwrap();
let rid = inner.rid();

let d = Vec::from(data.as_ref()).into_boxed_slice();

let op = resources::post_message_to_worker(rid, d);
let op = op.map_err(|e| errors::new(ErrorKind::Other, e.to_string()));
let op = op.and_then(move |_| -> DenoResult<Buf> {
let builder = &mut FlatBufferBuilder::new();

Ok(serialize_response(
cmd_id,
builder,
msg::BaseArgs {
..Default::default()
},
))
});
Box::new(op)
}
5 changes: 3 additions & 2 deletions cli/resources.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,8 @@ pub fn add_worker(wc: WorkerChannels) -> Resource {
Resource { rid }
}

pub fn worker_post_message(
/// Post message to worker as a host or privilged overlord
pub fn post_message_to_worker(
rid: ResourceId,
buf: Buf,
) -> futures::sink::Send<futures::sync::mpsc::Sender<Buf>> {
Expand Down Expand Up @@ -341,7 +342,7 @@ impl Future for WorkerReceiver {
}
}

pub fn worker_recv_message(rid: ResourceId) -> WorkerReceiver {
pub fn get_message_from_worker(rid: ResourceId) -> WorkerReceiver {
WorkerReceiver { rid }
}

Expand Down
Loading