Skip to content

Commit

Permalink
Minimal Worker support (#1476)
Browse files Browse the repository at this point in the history
This adds the ability to spawn additional Isolates from Rust and send
and receive messages from them. This is preliminary work to support
running the typescript compiler in a separate isolate and thus support
native ES modules. Ref #975.
  • Loading branch information
ry authored Jan 8, 2019
1 parent 9ff6bca commit 6f79ad7
Show file tree
Hide file tree
Showing 10 changed files with 470 additions and 55 deletions.
1 change: 1 addition & 0 deletions BUILD.gn
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ ts_sources = [
"js/url.ts",
"js/url_search_params.ts",
"js/util.ts",
"js/workers.ts",
"js/write_file.ts",
"tsconfig.json",

Expand Down
6 changes: 6 additions & 0 deletions js/globals.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import * as textEncoding from "./text_encoding";
import * as timers from "./timers";
import * as url from "./url";
import * as urlSearchParams from "./url_search_params";
import * as workers from "./workers";

// These imports are not exposed and therefore are fine to just import the
// symbols required.
Expand Down Expand Up @@ -86,3 +87,8 @@ window.TextEncoder = textEncoding.TextEncoder;
export type TextEncoder = textEncoding.TextEncoder;
window.TextDecoder = textEncoding.TextDecoder;
export type TextDecoder = textEncoding.TextDecoder;

window.workerMain = workers.workerMain;
// TODO These shouldn't be available in main isolate.
window.postMessage = workers.postMessage;
window.close = workers.workerClose;
75 changes: 75 additions & 0 deletions js/workers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
import * as dispatch from "./dispatch";
import { libdeno } from "./libdeno";
import * as msg from "gen/msg_generated";
import * as flatbuffers from "./flatbuffers";
import { assert, log } from "./util";
import { globalEval } from "./global_eval";

export async function postMessage(data: Uint8Array): Promise<void> {
const builder = flatbuffers.createBuilder();
msg.WorkerPostMessage.startWorkerPostMessage(builder);
const inner = msg.WorkerPostMessage.endWorkerPostMessage(builder);
const baseRes = await dispatch.sendAsync(
builder,
msg.Any.WorkerPostMessage,
inner,
data
);
assert(baseRes != null);
}

export async function getMessage(): Promise<null | Uint8Array> {
log("getMessage");
const builder = flatbuffers.createBuilder();
msg.WorkerGetMessage.startWorkerGetMessage(builder);
const inner = msg.WorkerGetMessage.endWorkerGetMessage(builder);
const baseRes = await dispatch.sendAsync(
builder,
msg.Any.WorkerGetMessage,
inner
);
assert(baseRes != null);
assert(
msg.Any.WorkerGetMessageRes === baseRes!.innerType(),
`base.innerType() unexpectedly is ${baseRes!.innerType()}`
);
const res = new msg.WorkerGetMessageRes();
assert(baseRes!.inner(res) != null);

const dataArray = res.dataArray();
if (dataArray == null) {
return null;
} else {
return new Uint8Array(dataArray!);
}
}

let isClosing = false;

export function workerClose(): void {
isClosing = true;
}

export async function workerMain() {
log("workerMain");
libdeno.recv(dispatch.handleAsyncMsgFromRust);

// TODO avoid using globalEval to get Window. But circular imports if getting
// it from globals.ts
const window = globalEval("this");

while (!isClosing) {
const data = await getMessage();
if (data == null) {
log("workerMain got null message. quitting.");
break;
}
if (window["onmessage"]) {
const event = { data };
window.onmessage(event);
} else {
break;
}
}
}
2 changes: 1 addition & 1 deletion src/flags.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ macro_rules! svec {
}

#[cfg_attr(feature = "cargo-clippy", allow(stutter))]
#[derive(Debug, PartialEq, Default)]
#[derive(Clone, Debug, PartialEq, Default)]
pub struct DenoFlags {
pub help: bool,
pub log_debug: bool,
Expand Down
38 changes: 26 additions & 12 deletions src/isolate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use js_errors::JSError;
use libdeno;
use permissions::DenoPermissions;

use futures::sync::mpsc as async_mpsc;
use futures::Future;
use libc::c_char;
use libc::c_void;
Expand All @@ -23,6 +24,7 @@ use std::ffi::CString;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
use std::time::Duration;
use std::time::Instant;
use tokio;
Expand Down Expand Up @@ -53,6 +55,10 @@ pub struct Isolate {
pub state: Arc<IsolateState>,
}

pub type WorkerSender = async_mpsc::Sender<Buf>;
pub type WorkerReceiver = async_mpsc::Receiver<Buf>;
pub type WorkerChannels = (WorkerSender, WorkerReceiver);

// Isolate cannot be passed between threads but IsolateState can.
// IsolateState satisfies Send and Sync.
// So any state that needs to be accessed outside the main V8 thread should be
Expand All @@ -64,20 +70,35 @@ pub struct IsolateState {
pub permissions: DenoPermissions,
pub flags: flags::DenoFlags,
pub metrics: Metrics,
pub worker_channels: Option<Mutex<WorkerChannels>>,
}

impl IsolateState {
pub fn new(flags: flags::DenoFlags, argv_rest: Vec<String>) -> Self {
pub fn new(
flags: flags::DenoFlags,
argv_rest: Vec<String>,
worker_channels: Option<WorkerChannels>,
) -> Self {
let custom_root = env::var("DENO_DIR").map(|s| s.into()).ok();

Self {
dir: deno_dir::DenoDir::new(flags.reload, custom_root).unwrap(),
argv: argv_rest,
permissions: DenoPermissions::new(&flags),
flags,
metrics: Metrics::default(),
worker_channels: worker_channels.map(|wc| Mutex::new(wc)),
}
}

#[cfg(test)]
pub fn mock() -> Arc<IsolateState> {
let argv = vec![String::from("./deno"), String::from("hello.js")];
// For debugging: argv.push_back(String::from("-D"));
let (flags, rest_argv, _) = flags::set_flags(argv).unwrap();
Arc::new(IsolateState::new(flags, rest_argv, None))
}

#[inline]
pub fn check_write(&self, filename: &str) -> DenoResult<()> {
self.permissions.check_write(filename)
Expand Down Expand Up @@ -451,10 +472,7 @@ mod tests {

#[test]
fn test_dispatch_sync() {
let argv = vec![String::from("./deno"), String::from("hello.js")];
let (flags, rest_argv, _) = flags::set_flags(argv).unwrap();

let state = Arc::new(IsolateState::new(flags, rest_argv));
let state = IsolateState::mock();
let snapshot = libdeno::deno_buf::empty();
let isolate = Isolate::new(snapshot, state, dispatch_sync);
tokio_util::init(|| {
Expand Down Expand Up @@ -493,9 +511,7 @@ mod tests {

#[test]
fn test_metrics_sync() {
let argv = vec![String::from("./deno"), String::from("hello.js")];
let (flags, rest_argv, _) = flags::set_flags(argv).unwrap();
let state = Arc::new(IsolateState::new(flags, rest_argv));
let state = IsolateState::mock();
let snapshot = libdeno::deno_buf::empty();
let isolate = Isolate::new(snapshot, state, metrics_dispatch_sync);
tokio_util::init(|| {
Expand Down Expand Up @@ -529,9 +545,7 @@ mod tests {

#[test]
fn test_metrics_async() {
let argv = vec![String::from("./deno"), String::from("hello.js")];
let (flags, rest_argv, _) = flags::set_flags(argv).unwrap();
let state = Arc::new(IsolateState::new(flags, rest_argv));
let state = IsolateState::mock();
let snapshot = libdeno::deno_buf::empty();
let isolate = Isolate::new(snapshot, state, metrics_dispatch_async);
tokio_util::init(|| {
Expand Down Expand Up @@ -619,7 +633,7 @@ mod tests {
let argv = vec![String::from("./deno"), String::from(filename)];
let (flags, rest_argv, _) = flags::set_flags(argv).unwrap();

let state = Arc::new(IsolateState::new(flags, rest_argv));
let state = Arc::new(IsolateState::new(flags, rest_argv, None));
let snapshot = libdeno::deno_buf::empty();
let isolate = Isolate::new(snapshot, state, dispatch_sync);
tokio_util::init(|| {
Expand Down
3 changes: 2 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ pub mod snapshot;
mod tokio_util;
mod tokio_write;
pub mod version;
mod workers;

#[cfg(unix)]
mod eager_unix;
Expand Down Expand Up @@ -96,7 +97,7 @@ fn main() {
log::LevelFilter::Warn
});

let state = Arc::new(isolate::IsolateState::new(flags, rest_argv));
let state = Arc::new(isolate::IsolateState::new(flags, rest_argv, None));
let snapshot = snapshot::deno_snapshot();
let isolate = isolate::Isolate::new(snapshot, state, ops::dispatch);
tokio_util::init(|| {
Expand Down
15 changes: 15 additions & 0 deletions src/msg.fbs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
union Any {
Start,
StartRes,
WorkerGetMessage,
WorkerGetMessageRes,
WorkerPostMessage,
CodeFetch,
CodeFetchRes,
CodeCache,
Expand Down Expand Up @@ -149,6 +152,18 @@ table StartRes {
v8_version: string;
}

table WorkerGetMessage {
unused: int8;
}

table WorkerGetMessageRes {
data: [ubyte];
}

table WorkerPostMessage {
// data passed thru the zero-copy data parameter.
}

table CodeFetch {
specifier: string;
referrer: string;
Expand Down
Loading

0 comments on commit 6f79ad7

Please sign in to comment.