Skip to content

Commit

Permalink
implement in cli
Browse files Browse the repository at this point in the history
  • Loading branch information
afinch7 committed Aug 28, 2019
1 parent 9512b2f commit 25de668
Show file tree
Hide file tree
Showing 28 changed files with 2,806 additions and 1,562 deletions.
1 change: 1 addition & 0 deletions cli/BUILD.gn
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ ts_sources = [
"../js/mkdir.ts",
"../js/mock_builtin.js",
"../js/net.ts",
"../js/ops.ts",
"../js/os.ts",
"../js/performance.ts",
"../js/permissions.ts",
Expand Down
133 changes: 90 additions & 43 deletions cli/ops/compiler.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,21 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
use super::dispatch_json::{Deserialize, JsonOp, Value};
use super::dispatch_json::{wrap_json_op, Deserialize, JsonOp};
use crate::state::ThreadSafeState;
use crate::tokio_util;
use deno::*;

// Cache

/// Dispatcher for the "cache" compiler op: persists compiler output for a
/// module through the TS compiler's on-disk cache.
pub struct OpCache {
  // Shared handle to CLI runtime state; this op uses its `ts_compiler`
  // to write compiler output (see the `OpDispatcher` impl below).
  state: ThreadSafeState,
}

impl OpCache {
pub fn new(state: ThreadSafeState) -> Self {
Self { state }
}
}

#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct CacheArgs {
Expand All @@ -12,23 +24,44 @@ struct CacheArgs {
extension: String,
}

pub fn op_cache(
state: &ThreadSafeState,
args: Value,
_zero_copy: Option<PinnedBuf>,
) -> Result<JsonOp, ErrBox> {
let args: CacheArgs = serde_json::from_value(args)?;
impl OpDispatcher for OpCache {
  // Services the "cache" op: decodes `CacheArgs` from the JSON control
  // buffer and stores the supplied compiler output via the TS compiler.
  // Always completes synchronously with an empty JSON object.
  fn dispatch(&self, control: &[u8], buf: Option<PinnedBuf>) -> CoreOp {
    wrap_json_op(
      move |raw_args, _zero_copy| {
        let cache_args: CacheArgs = serde_json::from_value(raw_args)?;

        // A malformed module id is treated as an internal invariant
        // violation (panics) rather than a recoverable op error.
        let specifier = ModuleSpecifier::resolve_url(&cache_args.module_id)
          .expect("Should be valid module specifier");

        self.state.ts_compiler.cache_compiler_output(
          &specifier,
          &cache_args.extension,
          &cache_args.contents,
        )?;

        // Empty JSON payload acts as the success acknowledgement.
        Ok(JsonOp::Sync(json!({})))
      },
      &self.state,
      control,
      buf,
    )
  }
}

// Registration name under which this dispatcher is routed.
impl Named for OpCache {
  const NAME: &'static str = "cache";
}

let module_specifier = ModuleSpecifier::resolve_url(&args.module_id)
.expect("Should be valid module specifier");
// Fetch Source File

state.ts_compiler.cache_compiler_output(
&module_specifier,
&args.extension,
&args.contents,
)?;
/// Dispatcher for the "fetchSourceFile" compiler op: resolves a module
/// specifier and fetches its source file.
pub struct OpFetchSourceFile {
  // Shared handle to CLI runtime state; this op uses its resolver and
  // `file_fetcher` (see the `OpDispatcher` impl below).
  state: ThreadSafeState,
}

Ok(JsonOp::Sync(json!({})))
impl OpFetchSourceFile {
pub fn new(state: ThreadSafeState) -> Self {
Self { state }
}
}

#[derive(Deserialize)]
Expand All @@ -37,32 +70,46 @@ struct FetchSourceFileArgs {
referrer: String,
}

pub fn op_fetch_source_file(
state: &ThreadSafeState,
args: Value,
_zero_copy: Option<PinnedBuf>,
) -> Result<JsonOp, ErrBox> {
let args: FetchSourceFileArgs = serde_json::from_value(args)?;

// TODO(ry) Maybe a security hole. Only the compiler worker should have access
// to this. Need a test to demonstrate the hole.
let is_dyn_import = false;

let resolved_specifier =
state.resolve(&args.specifier, &args.referrer, false, is_dyn_import)?;

let fut = state
.file_fetcher
.fetch_source_file_async(&resolved_specifier);

// WARNING: Here we use tokio_util::block_on() which starts a new Tokio
// runtime for executing the future. This is so we don't inadvertently run
// out of threads in the main runtime.
let out = tokio_util::block_on(fut)?;
Ok(JsonOp::Sync(json!({
"moduleName": out.url.to_string(),
"filename": out.filename.to_str().unwrap(),
"mediaType": out.media_type as i32,
"sourceCode": String::from_utf8(out.source_code).unwrap(),
})))
impl OpDispatcher for OpFetchSourceFile {
  // Services the "fetchSourceFile" op: resolves the requested specifier
  // against its referrer, fetches the source file (blocking on a dedicated
  // runtime), and replies synchronously with the file's metadata and code.
  fn dispatch(&self, control: &[u8], buf: Option<PinnedBuf>) -> CoreOp {
    wrap_json_op(
      move |raw_args, _zero_copy| {
        let fetch_args: FetchSourceFileArgs =
          serde_json::from_value(raw_args)?;

        // TODO(ry) Maybe a security hole. Only the compiler worker should
        // have access to this. Need a test to demonstrate the hole.
        let is_dyn_import = false;

        let resolved = self.state.resolve(
          &fetch_args.specifier,
          &fetch_args.referrer,
          false,
          is_dyn_import,
        )?;

        let fetch_future = self
          .state
          .file_fetcher
          .fetch_source_file_async(&resolved);

        // WARNING: tokio_util::block_on() spins up a fresh Tokio runtime to
        // drive this future, so we don't inadvertently run out of threads
        // in the main runtime.
        let source_file = tokio_util::block_on(fetch_future)?;

        Ok(JsonOp::Sync(json!({
          "moduleName": source_file.url.to_string(),
          "filename": source_file.filename.to_str().unwrap(),
          "mediaType": source_file.media_type as i32,
          "sourceCode": String::from_utf8(source_file.source_code).unwrap(),
        })))
      },
      &self.state,
      control,
      buf,
    )
  }
}

// Registration name under which this dispatcher is routed.
impl Named for OpFetchSourceFile {
  const NAME: &'static str = "fetchSourceFile";
}
32 changes: 20 additions & 12 deletions cli/ops/dispatch_json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,6 @@ fn json_err(err: ErrBox) -> Value {
})
}

pub type Dispatcher = fn(
state: &ThreadSafeState,
args: Value,
zero_copy: Option<PinnedBuf>,
) -> Result<JsonOp, ErrBox>;

fn serialize_result(
promise_id: Option<u64>,
result: Result<Value, ErrBox>,
Expand All @@ -50,33 +44,47 @@ struct AsyncArgs {
promise_id: Option<u64>,
}

pub fn dispatch(
d: Dispatcher,
pub fn wrap_json_op<D>(
d: D,
state: &ThreadSafeState,
control: &[u8],
zero_copy: Option<PinnedBuf>,
) -> CoreOp {
) -> CoreOp
where
D: FnOnce(Value, Option<PinnedBuf>) -> Result<JsonOp, ErrBox>,
{
let bytes_sent_control = control.len();
let bytes_sent_zero_copy = zero_copy.as_ref().map(|b| b.len()).unwrap_or(0);

let async_args: AsyncArgs = serde_json::from_slice(control).unwrap();
let promise_id = async_args.promise_id;
let is_sync = promise_id.is_none();

let result = serde_json::from_slice(control)
.map_err(ErrBox::from)
.and_then(move |args| d(state, args, zero_copy));
.and_then(move |args| d(args, zero_copy));

let state = state.clone();
state.metrics_op_dispatched(bytes_sent_control, bytes_sent_zero_copy);
match result {
Ok(JsonOp::Sync(sync_value)) => {
assert!(promise_id.is_none());
CoreOp::Sync(serialize_result(promise_id, Ok(sync_value)))
let buf = serialize_result(promise_id, Ok(sync_value));
state.metrics_op_completed(buf.len());
CoreOp::Sync(buf)
}
Ok(JsonOp::Async(fut)) => {
assert!(promise_id.is_some());
let fut2 = Box::new(fut.then(move |result| -> Result<Buf, ()> {
Ok(serialize_result(promise_id, result))
let buf = serialize_result(promise_id, result);
state.metrics_op_completed(buf.len());
Ok(buf)
}));
CoreOp::Async(fut2)
}
Err(sync_err) => {
let buf = serialize_result(promise_id, Err(sync_err));
state.metrics_op_completed(buf.len());
if is_sync {
CoreOp::Sync(buf)
} else {
Expand Down
22 changes: 16 additions & 6 deletions cli/ops/dispatch_minimal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use deno::PinnedBuf;
use futures::Future;

pub type MinimalOp = dyn Future<Item = i32, Error = ErrBox> + Send;
pub type Dispatcher = fn(i32, Option<PinnedBuf>) -> Box<MinimalOp>;

#[derive(Copy, Clone, Debug, PartialEq)]
// This corresponds to RecordMinimal on the TS side.
Expand Down Expand Up @@ -72,17 +71,26 @@ fn test_parse_min_record() {
assert_eq!(parse_min_record(&buf), None);
}

pub fn dispatch(
d: Dispatcher,
_state: &ThreadSafeState,
pub fn wrap_minimal_op<D>(
d: D,
state: &ThreadSafeState,
control: &[u8],
zero_copy: Option<PinnedBuf>,
) -> CoreOp {
) -> CoreOp
where
D: FnOnce(i32, Option<PinnedBuf>) -> Box<MinimalOp>,
{
let bytes_sent_control = control.len();
let bytes_sent_zero_copy = zero_copy.as_ref().map(|b| b.len()).unwrap_or(0);

let mut record = parse_min_record(control).unwrap();
let is_sync = record.promise_id == 0;
let rid = record.arg;
let min_op = d(rid, zero_copy);

let state = state.clone();
state.metrics_op_dispatched(bytes_sent_control, bytes_sent_zero_copy);

let fut = Box::new(min_op.then(move |result| -> Result<Buf, ()> {
match result {
Ok(r) => {
Expand All @@ -95,7 +103,9 @@ pub fn dispatch(
record.result = -1;
}
}
Ok(record.into())
let buf: Buf = record.into();
state.metrics_op_completed(buf.len());
Ok(buf)
}));

if is_sync {
Expand Down
Loading

0 comments on commit 25de668

Please sign in to comment.