Skip to content

Commit

Permalink
Organize dispatch a bit (#2796)
Browse files Browse the repository at this point in the history
Just some clean up reorganization around flatbuffer/minimal dispatch
code. This is prep for adding a JSON dispatcher.
  • Loading branch information
ry authored Aug 22, 2019
1 parent b764d1b commit bdc97b3
Show file tree
Hide file tree
Showing 65 changed files with 662 additions and 753 deletions.
1 change: 1 addition & 0 deletions cli/BUILD.gn
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ ts_sources = [
"../js/diagnostics.ts",
"../js/dir.ts",
"../js/dispatch.ts",
"../js/dispatch_flatbuffers.ts",
"../js/dispatch_minimal.ts",
"../js/dom_types.ts",
"../js/dom_util.ts",
Expand Down
8 changes: 3 additions & 5 deletions cli/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ pub mod deno_dir;
pub mod deno_error;
pub mod diagnostics;
mod disk_cache;
mod dispatch_minimal;
mod file_fetcher;
pub mod flags;
pub mod fmt_errors;
Expand Down Expand Up @@ -114,10 +113,9 @@ fn create_worker_and_state(
}
});
// TODO(kevinkassimo): maybe make include_deno_namespace also configurable?
let state =
ThreadSafeState::new(flags, argv, ops::op_selector_std, progress, true)
.map_err(print_err_and_exit)
.unwrap();
let state = ThreadSafeState::new(flags, argv, progress, true)
.map_err(print_err_and_exit)
.unwrap();
let worker = Worker::new(
"main".to_string(),
startup_data::deno_isolate_init(),
Expand Down
6 changes: 2 additions & 4 deletions cli/ops/compiler.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
use super::dispatch_flatbuffers::serialize_response;
use super::utils::*;
use crate::deno_error;
use crate::msg;
use crate::ops::empty_buf;
use crate::ops::ok_buf;
use crate::ops::serialize_response;
use crate::ops::CliOpResult;
use crate::state::ThreadSafeState;
use crate::tokio_util;
use deno::*;
Expand Down
216 changes: 216 additions & 0 deletions cli/ops/dispatch_flatbuffers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
use super::utils::CliOpResult;
use crate::deno_error::GetErrorKind;
use crate::msg;
use crate::state::ThreadSafeState;
use deno::*;
use flatbuffers::FlatBufferBuilder;
use hyper::rt::Future;

use super::compiler::{op_cache, op_fetch_source_file};
use super::errors::{op_apply_source_map, op_format_error};
use super::fetch::op_fetch;
use super::files::{op_close, op_open, op_read, op_seek, op_write};
use super::fs::{
op_chdir, op_chmod, op_chown, op_copy_file, op_cwd, op_link,
op_make_temp_dir, op_mkdir, op_read_dir, op_read_link, op_remove, op_rename,
op_stat, op_symlink, op_truncate, op_utime,
};
use super::metrics::op_metrics;
use super::net::{op_accept, op_dial, op_listen, op_shutdown};
use super::os::{
op_env, op_exec_path, op_exit, op_home_dir, op_is_tty, op_set_env, op_start,
};
use super::performance::op_now;
use super::permissions::{op_permissions, op_revoke_permission};
use super::process::{op_kill, op_run, op_run_status};
use super::random::op_get_random_values;
use super::repl::{op_repl_readline, op_repl_start};
use super::resources::op_resources;
use super::timers::{op_global_timer, op_global_timer_stop};
use super::workers::{
op_create_worker, op_host_get_message, op_host_get_worker_closed,
op_host_post_message, op_worker_get_message, op_worker_post_message,
};

type CliDispatchFn = fn(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
) -> CliOpResult;

/// Processes raw messages from JavaScript.
/// This functions invoked every time Deno.core.dispatch() is called.
/// control corresponds to the first argument of Deno.core.dispatch().
/// data corresponds to the second argument of Deno.core.dispatch().
pub fn dispatch(
state: &ThreadSafeState,
control: &[u8],
zero_copy: Option<PinnedBuf>,
) -> CoreOp {
let base = msg::get_root_as_base(&control);
let inner_type = base.inner_type();
let is_sync = base.sync();
let cmd_id = base.cmd_id();

debug!(
"msg_from_js {} sync {}",
msg::enum_name_any(inner_type),
is_sync
);

let op_func: CliDispatchFn = match op_selector_std(inner_type) {
Some(v) => v,
None => panic!("Unhandled message {}", msg::enum_name_any(inner_type)),
};

let op_result = op_func(state, &base, zero_copy);

let state = state.clone();

match op_result {
Ok(Op::Sync(buf)) => {
state.metrics_op_completed(buf.len());
Op::Sync(buf)
}
Ok(Op::Async(fut)) => {
let result_fut = Box::new(
fut
.or_else(move |err: ErrBox| -> Result<Buf, ()> {
debug!("op err {}", err);
// No matter whether we got an Err or Ok, we want a serialized message to
// send back. So transform the DenoError into a Buf.
let builder = &mut FlatBufferBuilder::new();
let errmsg_offset = builder.create_string(&format!("{}", err));
Ok(serialize_response(
cmd_id,
builder,
msg::BaseArgs {
error: Some(errmsg_offset),
error_kind: err.kind(),
..Default::default()
},
))
})
.and_then(move |buf: Buf| -> Result<Buf, ()> {
// Handle empty responses. For sync responses we just want
// to send null. For async we want to send a small message
// with the cmd_id.
let buf = if buf.len() > 0 {
buf
} else {
let builder = &mut FlatBufferBuilder::new();
serialize_response(
cmd_id,
builder,
msg::BaseArgs {
..Default::default()
},
)
};
state.metrics_op_completed(buf.len());
Ok(buf)
})
.map_err(|err| panic!("unexpected error {:?}", err)),
);
Op::Async(result_fut)
}
Err(err) => {
debug!("op err {}", err);
// No matter whether we got an Err or Ok, we want a serialized message to
// send back. So transform the DenoError into a Buf.
let builder = &mut FlatBufferBuilder::new();
let errmsg_offset = builder.create_string(&format!("{}", err));
let response_buf = serialize_response(
cmd_id,
builder,
msg::BaseArgs {
error: Some(errmsg_offset),
error_kind: err.kind(),
..Default::default()
},
);
state.metrics_op_completed(response_buf.len());
Op::Sync(response_buf)
}
}
}

pub fn serialize_response(
cmd_id: u32,
builder: &mut FlatBufferBuilder<'_>,
mut args: msg::BaseArgs<'_>,
) -> Buf {
args.cmd_id = cmd_id;
let base = msg::Base::create(builder, &args);
msg::finish_base_buffer(builder, base);
let data = builder.finished_data();
// println!("serialize_response {:x?}", data);
data.into()
}

/// Standard ops set for most isolates
pub fn op_selector_std(inner_type: msg::Any) -> Option<CliDispatchFn> {
match inner_type {
msg::Any::Accept => Some(op_accept),
msg::Any::ApplySourceMap => Some(op_apply_source_map),
msg::Any::Cache => Some(op_cache),
msg::Any::Chdir => Some(op_chdir),
msg::Any::Chmod => Some(op_chmod),
msg::Any::Chown => Some(op_chown),
msg::Any::Close => Some(op_close),
msg::Any::CopyFile => Some(op_copy_file),
msg::Any::CreateWorker => Some(op_create_worker),
msg::Any::Cwd => Some(op_cwd),
msg::Any::Dial => Some(op_dial),
msg::Any::Environ => Some(op_env),
msg::Any::ExecPath => Some(op_exec_path),
msg::Any::Exit => Some(op_exit),
msg::Any::Fetch => Some(op_fetch),
msg::Any::FetchSourceFile => Some(op_fetch_source_file),
msg::Any::FormatError => Some(op_format_error),
msg::Any::GetRandomValues => Some(op_get_random_values),
msg::Any::GlobalTimer => Some(op_global_timer),
msg::Any::GlobalTimerStop => Some(op_global_timer_stop),
msg::Any::HostGetMessage => Some(op_host_get_message),
msg::Any::HostGetWorkerClosed => Some(op_host_get_worker_closed),
msg::Any::HostPostMessage => Some(op_host_post_message),
msg::Any::IsTTY => Some(op_is_tty),
msg::Any::Kill => Some(op_kill),
msg::Any::Link => Some(op_link),
msg::Any::Listen => Some(op_listen),
msg::Any::MakeTempDir => Some(op_make_temp_dir),
msg::Any::Metrics => Some(op_metrics),
msg::Any::Mkdir => Some(op_mkdir),
msg::Any::Now => Some(op_now),
msg::Any::Open => Some(op_open),
msg::Any::PermissionRevoke => Some(op_revoke_permission),
msg::Any::Permissions => Some(op_permissions),
msg::Any::Read => Some(op_read),
msg::Any::ReadDir => Some(op_read_dir),
msg::Any::Readlink => Some(op_read_link),
msg::Any::Remove => Some(op_remove),
msg::Any::Rename => Some(op_rename),
msg::Any::ReplReadline => Some(op_repl_readline),
msg::Any::ReplStart => Some(op_repl_start),
msg::Any::Resources => Some(op_resources),
msg::Any::Run => Some(op_run),
msg::Any::RunStatus => Some(op_run_status),
msg::Any::Seek => Some(op_seek),
msg::Any::SetEnv => Some(op_set_env),
msg::Any::Shutdown => Some(op_shutdown),
msg::Any::Start => Some(op_start),
msg::Any::Stat => Some(op_stat),
msg::Any::Symlink => Some(op_symlink),
msg::Any::Truncate => Some(op_truncate),
msg::Any::HomeDir => Some(op_home_dir),
msg::Any::Utime => Some(op_utime),
msg::Any::Write => Some(op_write),

// TODO(ry) split these out so that only the appropriate Workers can access
// them.
msg::Any::WorkerGetMessage => Some(op_worker_get_message),
msg::Any::WorkerPostMessage => Some(op_worker_post_message),

_ => None,
}
}
76 changes: 14 additions & 62 deletions cli/dispatch_minimal.rs → cli/ops/dispatch_minimal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@
use crate::state::ThreadSafeState;
use deno::Buf;
use deno::CoreOp;
use deno::ErrBox;
use deno::Op;
use deno::OpId;
use deno::PinnedBuf;
use futures::Future;

const OP_READ: OpId = 1;
const OP_WRITE: OpId = 2;
pub type MinimalOp = dyn Future<Item = i32, Error = ErrBox> + Send;
pub type Dispatcher = fn(i32, Option<PinnedBuf>) -> Box<MinimalOp>;

#[derive(Copy, Clone, Debug, PartialEq)]
// This corresponds to RecordMinimal on the TS side.
Expand Down Expand Up @@ -72,21 +72,24 @@ fn test_parse_min_record() {
assert_eq!(parse_min_record(&buf), None);
}

pub fn dispatch_minimal(
pub fn dispatch(
d: Dispatcher,
state: &ThreadSafeState,
op_id: OpId,
mut record: Record,
control: &[u8],
zero_copy: Option<PinnedBuf>,
) -> CoreOp {
let mut record = parse_min_record(control).unwrap();
let is_sync = record.promise_id == 0;
let min_op = match op_id {
OP_READ => ops::read(record.arg, zero_copy),
OP_WRITE => ops::write(record.arg, zero_copy),
_ => unimplemented!(),
};

// TODO(ry) Currently there aren't any sync minimal ops. This is just a sanity
// check. Remove later.
assert!(!is_sync);

let state = state.clone();

let rid = record.arg;
let min_op = d(rid, zero_copy);

let fut = Box::new(min_op.then(move |result| -> Result<Buf, ()> {
match result {
Ok(r) => {
Expand All @@ -109,54 +112,3 @@ pub fn dispatch_minimal(
Op::Async(fut)
}
}

mod ops {
use crate::deno_error;
use crate::resources;
use crate::tokio_write;
use deno::ErrBox;
use deno::PinnedBuf;
use futures::Future;

type MinimalOp = dyn Future<Item = i32, Error = ErrBox> + Send;

pub fn read(rid: i32, zero_copy: Option<PinnedBuf>) -> Box<MinimalOp> {
debug!("read rid={}", rid);
let zero_copy = match zero_copy {
None => {
return Box::new(
futures::future::err(deno_error::no_buffer_specified()),
)
}
Some(buf) => buf,
};
match resources::lookup(rid as u32) {
None => Box::new(futures::future::err(deno_error::bad_resource())),
Some(resource) => Box::new(
tokio::io::read(resource, zero_copy)
.map_err(ErrBox::from)
.and_then(move |(_resource, _buf, nread)| Ok(nread as i32)),
),
}
}

pub fn write(rid: i32, zero_copy: Option<PinnedBuf>) -> Box<MinimalOp> {
debug!("write rid={}", rid);
let zero_copy = match zero_copy {
None => {
return Box::new(
futures::future::err(deno_error::no_buffer_specified()),
)
}
Some(buf) => buf,
};
match resources::lookup(rid as u32) {
None => Box::new(futures::future::err(deno_error::bad_resource())),
Some(resource) => Box::new(
tokio_write::write(resource, zero_copy)
.map_err(ErrBox::from)
.and_then(move |(_resource, _buf, nwritten)| Ok(nwritten as i32)),
),
}
}
}
5 changes: 2 additions & 3 deletions cli/ops/errors.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
use super::dispatch_flatbuffers::serialize_response;
use super::utils::*;
use crate::deno_error;
use crate::fmt_errors::JSError;
use crate::msg;
use crate::ops::ok_buf;
use crate::ops::serialize_response;
use crate::ops::CliOpResult;
use crate::source_maps::get_orig_position;
use crate::source_maps::CachedMaps;
use crate::state::ThreadSafeState;
Expand Down
4 changes: 2 additions & 2 deletions cli/ops/fetch.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
use super::dispatch_flatbuffers::serialize_response;
use super::utils::CliOpResult;
use crate::http_util;
use crate::msg;
use crate::msg_util;
use crate::ops::serialize_response;
use crate::ops::CliOpResult;
use crate::resources;
use crate::state::ThreadSafeState;
use deno::*;
Expand Down
Loading

0 comments on commit bdc97b3

Please sign in to comment.