Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Minimal Worker support #1476

Merged
merged 2 commits into from
Jan 8, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions BUILD.gn
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ ts_sources = [
"js/url.ts",
"js/url_search_params.ts",
"js/util.ts",
"js/workers.ts",
"js/write_file.ts",
"tsconfig.json",

Expand Down
6 changes: 6 additions & 0 deletions js/globals.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import * as textEncoding from "./text_encoding";
import * as timers from "./timers";
import * as url from "./url";
import * as urlSearchParams from "./url_search_params";
import * as workers from "./workers";

// These imports are not exposed and therefore are fine to just import the
// symbols required.
Expand Down Expand Up @@ -86,3 +87,8 @@ window.TextEncoder = textEncoding.TextEncoder;
export type TextEncoder = textEncoding.TextEncoder;
window.TextDecoder = textEncoding.TextDecoder;
export type TextDecoder = textEncoding.TextDecoder;

window.workerMain = workers.workerMain;
// TODO These shouldn't be available in main isolate.
ry marked this conversation as resolved.
Show resolved Hide resolved
window.postMessage = workers.postMessage;
window.close = workers.workerClose;
75 changes: 75 additions & 0 deletions js/workers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
import * as dispatch from "./dispatch";
import { libdeno } from "./libdeno";
import * as msg from "gen/msg_generated";
import * as flatbuffers from "./flatbuffers";
import { assert, log } from "./util";
import { globalEval } from "./global_eval";

export async function postMessage(data: Uint8Array): Promise<void> {
const builder = flatbuffers.createBuilder();
msg.WorkerPostMessage.startWorkerPostMessage(builder);
const inner = msg.WorkerPostMessage.endWorkerPostMessage(builder);
const baseRes = await dispatch.sendAsync(
builder,
msg.Any.WorkerPostMessage,
inner,
data
);
assert(baseRes != null);
}

export async function getMessage(): Promise<null | Uint8Array> {
log("getMessage");
const builder = flatbuffers.createBuilder();
msg.WorkerGetMessage.startWorkerGetMessage(builder);
const inner = msg.WorkerGetMessage.endWorkerGetMessage(builder);
const baseRes = await dispatch.sendAsync(
builder,
msg.Any.WorkerGetMessage,
inner
);
assert(baseRes != null);
assert(
msg.Any.WorkerGetMessageRes === baseRes!.innerType(),
ry marked this conversation as resolved.
Show resolved Hide resolved
`base.innerType() unexpectedly is ${baseRes!.innerType()}`
);
const res = new msg.WorkerGetMessageRes();
assert(baseRes!.inner(res) != null);

const dataArray = res.dataArray();
if (dataArray == null) {
return null;
} else {
return new Uint8Array(dataArray!);
}
}

let isClosing = false;

export function workerClose(): void {
isClosing = true;
}

export async function workerMain() {
log("workerMain");
libdeno.recv(dispatch.handleAsyncMsgFromRust);

// TODO avoid using globalEval to get Window. But circular imports if getting
// it from globals.ts
const window = globalEval("this");

while (!isClosing) {
const data = await getMessage();
if (data == null) {
log("workerMain got null message. quitting.");
break;
}
if (window["onmessage"]) {
const event = { data };
window.onmessage(event);
} else {
break;
}
}
}
2 changes: 1 addition & 1 deletion src/flags.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ macro_rules! svec {
}

#[cfg_attr(feature = "cargo-clippy", allow(stutter))]
#[derive(Debug, PartialEq, Default)]
#[derive(Clone, Debug, PartialEq, Default)]
pub struct DenoFlags {
pub help: bool,
pub log_debug: bool,
Expand Down
38 changes: 26 additions & 12 deletions src/isolate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use js_errors::JSError;
use libdeno;
use permissions::DenoPermissions;

use futures::sync::mpsc as async_mpsc;
use futures::Future;
use libc::c_char;
use libc::c_void;
Expand All @@ -23,6 +24,7 @@ use std::ffi::CString;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
use std::time::Duration;
use std::time::Instant;
use tokio;
Expand Down Expand Up @@ -53,6 +55,10 @@ pub struct Isolate {
pub state: Arc<IsolateState>,
}

pub type WorkerSender = async_mpsc::Sender<Buf>;
pub type WorkerReceiver = async_mpsc::Receiver<Buf>;
pub type WorkerChannels = (WorkerSender, WorkerReceiver);

// Isolate cannot be passed between threads but IsolateState can.
// IsolateState satisfies Send and Sync.
// So any state that needs to be accessed outside the main V8 thread should be
Expand All @@ -64,20 +70,35 @@ pub struct IsolateState {
pub permissions: DenoPermissions,
pub flags: flags::DenoFlags,
pub metrics: Metrics,
pub worker_channels: Option<Mutex<WorkerChannels>>,
}

impl IsolateState {
pub fn new(flags: flags::DenoFlags, argv_rest: Vec<String>) -> Self {
pub fn new(
flags: flags::DenoFlags,
argv_rest: Vec<String>,
worker_channels: Option<WorkerChannels>,
) -> Self {
let custom_root = env::var("DENO_DIR").map(|s| s.into()).ok();

Self {
dir: deno_dir::DenoDir::new(flags.reload, custom_root).unwrap(),
argv: argv_rest,
permissions: DenoPermissions::new(&flags),
flags,
metrics: Metrics::default(),
worker_channels: worker_channels.map(|wc| Mutex::new(wc)),
}
}

#[cfg(test)]
pub fn mock() -> Arc<IsolateState> {
let argv = vec![String::from("./deno"), String::from("hello.js")];
// For debugging: argv.push_back(String::from("-D"));
let (flags, rest_argv, _) = flags::set_flags(argv).unwrap();
Arc::new(IsolateState::new(flags, rest_argv, None))
}

#[inline]
pub fn check_write(&self, filename: &str) -> DenoResult<()> {
self.permissions.check_write(filename)
Expand Down Expand Up @@ -449,10 +470,7 @@ mod tests {

#[test]
fn test_dispatch_sync() {
let argv = vec![String::from("./deno"), String::from("hello.js")];
let (flags, rest_argv, _) = flags::set_flags(argv).unwrap();

let state = Arc::new(IsolateState::new(flags, rest_argv));
let state = IsolateState::mock();
let snapshot = libdeno::deno_buf::empty();
let isolate = Isolate::new(snapshot, state, dispatch_sync);
tokio_util::init(|| {
Expand Down Expand Up @@ -491,9 +509,7 @@ mod tests {

#[test]
fn test_metrics_sync() {
let argv = vec![String::from("./deno"), String::from("hello.js")];
let (flags, rest_argv, _) = flags::set_flags(argv).unwrap();
let state = Arc::new(IsolateState::new(flags, rest_argv));
let state = IsolateState::mock();
let snapshot = libdeno::deno_buf::empty();
let isolate = Isolate::new(snapshot, state, metrics_dispatch_sync);
tokio_util::init(|| {
Expand Down Expand Up @@ -527,9 +543,7 @@ mod tests {

#[test]
fn test_metrics_async() {
let argv = vec![String::from("./deno"), String::from("hello.js")];
let (flags, rest_argv, _) = flags::set_flags(argv).unwrap();
let state = Arc::new(IsolateState::new(flags, rest_argv));
let state = IsolateState::mock();
let snapshot = libdeno::deno_buf::empty();
let isolate = Isolate::new(snapshot, state, metrics_dispatch_async);
tokio_util::init(|| {
Expand Down Expand Up @@ -617,7 +631,7 @@ mod tests {
let argv = vec![String::from("./deno"), String::from(filename)];
let (flags, rest_argv, _) = flags::set_flags(argv).unwrap();

let state = Arc::new(IsolateState::new(flags, rest_argv));
let state = Arc::new(IsolateState::new(flags, rest_argv, None));
let snapshot = libdeno::deno_buf::empty();
let isolate = Isolate::new(snapshot, state, dispatch_sync);
tokio_util::init(|| {
Expand Down
3 changes: 2 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ pub mod snapshot;
mod tokio_util;
mod tokio_write;
pub mod version;
mod workers;

#[cfg(unix)]
mod eager_unix;
Expand Down Expand Up @@ -96,7 +97,7 @@ fn main() {
log::LevelFilter::Warn
});

let state = Arc::new(isolate::IsolateState::new(flags, rest_argv));
let state = Arc::new(isolate::IsolateState::new(flags, rest_argv, None));
let snapshot = snapshot::deno_snapshot();
let isolate = isolate::Isolate::new(snapshot, state, ops::dispatch);
tokio_util::init(|| {
Expand Down
15 changes: 15 additions & 0 deletions src/msg.fbs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
union Any {
Start,
StartRes,
WorkerGetMessage,
WorkerGetMessageRes,
WorkerPostMessage,
CodeFetch,
CodeFetchRes,
CodeCache,
Expand Down Expand Up @@ -149,6 +152,18 @@ table StartRes {
v8_version: string;
}

table WorkerGetMessage {
unused: int8;
}

table WorkerGetMessageRes {
data: [ubyte];
}

table WorkerPostMessage {
// data passed thru the zero-copy data parameter.
}

table CodeFetch {
specifier: string;
referrer: string;
Expand Down
Loading