From 51a7c58cfda58c229333f4dd5d44306cc4ec10af Mon Sep 17 00:00:00 2001 From: afinch7 Date: Thu, 29 Aug 2019 14:04:45 -0400 Subject: [PATCH] simplify cli implementation a bit --- cli/ops/compiler.rs | 2 - cli/ops/dispatch_json.rs | 16 +-- cli/ops/dispatch_minimal.rs | 12 +- cli/ops/errors.rs | 2 - cli/ops/fetch.rs | 1 - cli/ops/files.rs | 23 +--- cli/ops/fs.rs | 16 --- cli/ops/io.rs | 23 +--- cli/ops/metrics.rs | 9 +- cli/ops/mod.rs | 244 ++++++++++++++++++++++++++---------- cli/ops/net.rs | 24 +--- cli/ops/os.rs | 27 +--- cli/ops/performance.rs | 1 - cli/ops/permissions.rs | 2 - cli/ops/process.rs | 3 - cli/ops/random.rs | 1 - cli/ops/repl.rs | 12 +- cli/ops/resources.rs | 12 +- cli/ops/timers.rs | 2 - cli/ops/workers.rs | 26 +--- cli/state.rs | 136 ++++++++++++++++---- 21 files changed, 309 insertions(+), 285 deletions(-) diff --git a/cli/ops/compiler.rs b/cli/ops/compiler.rs index 78095db2ae7ed5..52e64b80a86291 100644 --- a/cli/ops/compiler.rs +++ b/cli/ops/compiler.rs @@ -41,7 +41,6 @@ impl OpDispatcher for OpCache { Ok(JsonOp::Sync(json!({}))) }, - &self.state, control, buf, ) @@ -103,7 +102,6 @@ impl OpDispatcher for OpFetchSourceFile { "sourceCode": String::from_utf8(out.source_code).unwrap(), }))) }, - &self.state, control, buf, ) diff --git a/cli/ops/dispatch_json.rs b/cli/ops/dispatch_json.rs index 323bbe1b73b1d5..49284cfabc8dd0 100644 --- a/cli/ops/dispatch_json.rs +++ b/cli/ops/dispatch_json.rs @@ -1,5 +1,4 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. -use crate::state::ThreadSafeState; use crate::tokio_util; use deno::*; use futures::Future; @@ -46,16 +45,12 @@ struct AsyncArgs { pub fn wrap_json_op( d: D, - state: &ThreadSafeState, control: &[u8], zero_copy: Option, ) -> CoreOp where D: FnOnce(Value, Option) -> Result, { - let bytes_sent_control = control.len(); - let bytes_sent_zero_copy = zero_copy.as_ref().map(|b| b.len()).unwrap_or(0); - let async_args: AsyncArgs = serde_json::from_slice(control).unwrap(); let promise_id = async_args.promise_id; let is_sync = promise_id.is_none(); @@ -64,27 +59,20 @@ where .map_err(ErrBox::from) .and_then(move |args| d(args, zero_copy)); - let state = state.clone(); - state.metrics_op_dispatched(bytes_sent_control, bytes_sent_zero_copy); match result { Ok(JsonOp::Sync(sync_value)) => { assert!(promise_id.is_none()); - let buf = serialize_result(promise_id, Ok(sync_value)); - state.metrics_op_completed(buf.len()); - CoreOp::Sync(buf) + CoreOp::Sync(serialize_result(promise_id, Ok(sync_value))) } Ok(JsonOp::Async(fut)) => { assert!(promise_id.is_some()); let fut2 = Box::new(fut.then(move |result| -> Result { - let buf = serialize_result(promise_id, result); - state.metrics_op_completed(buf.len()); - Ok(buf) + Ok(serialize_result(promise_id, result)) })); CoreOp::Async(fut2) } Err(sync_err) => { let buf = serialize_result(promise_id, Err(sync_err)); - state.metrics_op_completed(buf.len()); if is_sync { CoreOp::Sync(buf) } else { diff --git a/cli/ops/dispatch_minimal.rs b/cli/ops/dispatch_minimal.rs index 4f5e02a24c454a..aa241fc11986c1 100644 --- a/cli/ops/dispatch_minimal.rs +++ b/cli/ops/dispatch_minimal.rs @@ -4,7 +4,6 @@ //! alternative to flatbuffers using a very simple list of int32s to lay out //! messages. The first i32 is used to determine if a message a flatbuffer //! message or a "minimal" message. -use crate::state::ThreadSafeState; use deno::Buf; use deno::CoreOp; use deno::ErrBox; @@ -73,24 +72,17 @@ fn test_parse_min_record() { pub fn wrap_minimal_op( d: D, - state: &ThreadSafeState, control: &[u8], zero_copy: Option, ) -> CoreOp where D: FnOnce(i32, Option) -> Box, { - let bytes_sent_control = control.len(); - let bytes_sent_zero_copy = zero_copy.as_ref().map(|b| b.len()).unwrap_or(0); - let mut record = parse_min_record(control).unwrap(); let is_sync = record.promise_id == 0; let rid = record.arg; let min_op = d(rid, zero_copy); - let state = state.clone(); - state.metrics_op_dispatched(bytes_sent_control, bytes_sent_zero_copy); - let fut = Box::new(min_op.then(move |result| -> Result { match result { Ok(r) => { @@ -103,9 +95,7 @@ where record.result = -1; } } - let buf: Buf = record.into(); - state.metrics_op_completed(buf.len()); - Ok(buf) + Ok(record.into()) })); if is_sync { diff --git a/cli/ops/errors.rs b/cli/ops/errors.rs index 5c32bfe210dcef..625f573be7fc4e 100644 --- a/cli/ops/errors.rs +++ b/cli/ops/errors.rs @@ -35,7 +35,6 @@ impl OpDispatcher for OpFormatError { "error": error.to_string(), }))) }, - &self.state, control, buf, ) @@ -86,7 +85,6 @@ impl OpDispatcher for OpApplySourceMap { "column": orig_column as u32, }))) }, - &self.state, control, buf, ) diff --git a/cli/ops/fetch.rs b/cli/ops/fetch.rs index 50f246bb105314..e8759af0111188 100644 --- a/cli/ops/fetch.rs +++ b/cli/ops/fetch.rs @@ -91,7 +91,6 @@ impl OpDispatcher for OpFetch { Ok(JsonOp::Async(Box::new(future))) }, - &self.state, control, buf, ) diff --git a/cli/ops/files.rs b/cli/ops/files.rs index 973e4548e1839b..c0585a0278c2d2 100644 --- a/cli/ops/files.rs +++ b/cli/ops/files.rs @@ -102,7 +102,6 @@ impl OpDispatcher for OpOpen { Ok(JsonOp::Async(Box::new(op))) } }, - &self.state, control, buf, ) @@ -115,15 +114,7 @@ impl Named for OpOpen { // Close -pub struct OpClose { - state: ThreadSafeState, -} - -impl OpClose { - pub fn new(state: ThreadSafeState) -> Self { - Self { state } - } -} +pub struct OpClose; #[derive(Deserialize)] struct CloseArgs { @@ -144,7 +135,6 @@ impl OpDispatcher for OpClose { } } }, - &self.state, control, buf, ) @@ -157,15 +147,7 @@ impl Named for OpClose { // Seek -pub struct OpSeek { - state: ThreadSafeState, -} - -impl OpSeek { - pub fn new(state: ThreadSafeState) -> Self { - Self { state } - } -} +pub struct OpSeek; #[derive(Deserialize)] #[serde(rename_all = "camelCase")] @@ -196,7 +178,6 @@ impl OpDispatcher for OpSeek { } } }, - &self.state, control, buf, ) diff --git a/cli/ops/fs.rs b/cli/ops/fs.rs index bb3f3a56f8cd2f..a3becbe412f1c8 100644 --- a/cli/ops/fs.rs +++ b/cli/ops/fs.rs @@ -41,7 +41,6 @@ impl OpDispatcher for OpChdir { std::env::set_current_dir(&args.directory)?; Ok(JsonOp::Sync(json!({}))) }, - &self.state, control, buf, ) @@ -89,7 +88,6 @@ impl OpDispatcher for OpMkdir { Ok(json!({})) }) }, - &self.state, control, buf, ) @@ -143,7 +141,6 @@ impl OpDispatcher for OpChmod { Ok(json!({})) }) }, - &self.state, control, buf, ) @@ -192,7 +189,6 @@ impl OpDispatcher for OpChown { } }) }, - &self.state, control, buf, ) @@ -247,7 +243,6 @@ impl OpDispatcher for OpRemove { Ok(json!({})) }) }, - &self.state, control, buf, ) @@ -307,7 +302,6 @@ impl OpDispatcher for OpCopyFile { Ok(json!({})) }) }, - &self.state, control, buf, ) @@ -391,7 +385,6 @@ impl OpDispatcher for OpStat { })) }) }, - &self.state, control, buf, ) @@ -457,7 +450,6 @@ impl OpDispatcher for OpReadDir { Ok(json!({ "entries": entries })) }) }, - &self.state, control, buf, ) @@ -510,7 +502,6 @@ impl OpDispatcher for OpRename { Ok(json!({})) }) }, - &self.state, control, buf, ) @@ -562,7 +553,6 @@ impl OpDispatcher for OpLink { Ok(json!({})) }) }, - &self.state, control, buf, ) @@ -620,7 +610,6 @@ impl OpDispatcher for OpSymlink { Ok(json!({})) }) }, - &self.state, control, buf, ) @@ -669,7 +658,6 @@ impl OpDispatcher for OpReadLink { Ok(json!(path_str)) }) }, - &self.state, control, buf, ) @@ -720,7 +708,6 @@ impl OpDispatcher for OpTruncate { Ok(json!({})) }) }, - &self.state, control, buf, ) @@ -781,7 +768,6 @@ impl OpDispatcher for OpMakeTempDir { Ok(json!(path_str)) }) }, - &self.state, control, buf, ) @@ -826,7 +812,6 @@ impl OpDispatcher for OpUtime { Ok(json!({})) }) }, - &self.state, control, buf, ) @@ -857,7 +842,6 @@ impl OpDispatcher for OpCwd { let path_str = path.into_os_string().into_string().unwrap(); Ok(JsonOp::Sync(json!(path_str))) }, - &self.state, control, buf, ) diff --git a/cli/ops/io.rs b/cli/ops/io.rs index 61760b54332a98..c73cdd9711238a 100644 --- a/cli/ops/io.rs +++ b/cli/ops/io.rs @@ -1,7 +1,6 @@ use super::dispatch_minimal::wrap_minimal_op; use crate::deno_error; use crate::resources; -use crate::state::ThreadSafeState; use crate::tokio_write; use deno::CoreOp; use deno::ErrBox; @@ -12,15 +11,7 @@ use futures::Future; // Read -pub struct OpRead { - state: ThreadSafeState, -} - -impl OpRead { - pub fn new(state: ThreadSafeState) -> Self { - Self { state } - } -} +pub struct OpRead; impl OpDispatcher for OpRead { fn dispatch(&self, control: &[u8], buf: Option) -> CoreOp { @@ -44,7 +35,6 @@ impl OpDispatcher for OpRead { ), } }, - &self.state, control, buf, ) @@ -57,15 +47,7 @@ impl Named for OpRead { // Write -pub struct OpWrite { - state: ThreadSafeState, -} - -impl OpWrite { - pub fn new(state: ThreadSafeState) -> Self { - Self { state } - } -} +pub struct OpWrite; impl OpDispatcher for OpWrite { fn dispatch(&self, control: &[u8], buf: Option) -> CoreOp { @@ -89,7 +71,6 @@ impl OpDispatcher for OpWrite { ), } }, - &self.state, control, buf, ) diff --git a/cli/ops/metrics.rs b/cli/ops/metrics.rs index a0891669cafa86..b246c2a59ff11d 100644 --- a/cli/ops/metrics.rs +++ b/cli/ops/metrics.rs @@ -1,17 +1,17 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. use super::dispatch_json::{wrap_json_op, JsonOp}; -use crate::state::ThreadSafeState; +use crate::state::TSMetricsState; use deno::*; use std::sync::atomic::Ordering; // Metrics pub struct OpMetrics { - state: ThreadSafeState, + state: TSMetricsState, } impl OpMetrics { - pub fn new(state: ThreadSafeState) -> Self { + pub fn new(state: TSMetricsState) -> Self { Self { state } } } @@ -20,7 +20,7 @@ impl OpDispatcher for OpMetrics { fn dispatch(&self, control: &[u8], buf: Option) -> CoreOp { wrap_json_op( move |_args, _zero_copy| { - let m = &self.state.metrics; + let m = &self.state; Ok(JsonOp::Sync(json!({ "opsDispatched": m.ops_dispatched.load(Ordering::SeqCst) as u64, @@ -30,7 +30,6 @@ impl OpDispatcher for OpMetrics { "bytesReceived": m.bytes_received.load(Ordering::SeqCst) as u64 }))) }, - &self.state, control, buf, ) diff --git a/cli/ops/mod.rs b/cli/ops/mod.rs index 0c1785ab31bb55..ce659cbbb1a15f 100644 --- a/cli/ops/mod.rs +++ b/cli/ops/mod.rs @@ -27,119 +27,237 @@ const OP_NAMESPACE: &str = "builtins"; pub fn setup_dispatcher_registry(state: ThreadSafeState) -> Arc { let registry = Arc::new(OpDisReg::new()); + let metrics_state = state.metrics.clone(); // Compiler - registry.register_op(OP_NAMESPACE, compiler::OpCache::new(state.clone())); registry.register_op( OP_NAMESPACE, - compiler::OpFetchSourceFile::new(state.clone()), + metrics_state.wrap_op(compiler::OpCache::new(state.clone())), + ); + registry.register_op( + OP_NAMESPACE, + metrics_state.wrap_op(compiler::OpFetchSourceFile::new(state.clone())), ); // Errors - registry.register_op(OP_NAMESPACE, errors::OpFormatError::new(state.clone())); - registry - .register_op(OP_NAMESPACE, errors::OpApplySourceMap::new(state.clone())); + registry.register_op( + OP_NAMESPACE, + metrics_state.wrap_op(errors::OpFormatError::new(state.clone())), + ); + registry.register_op( + OP_NAMESPACE, + metrics_state.wrap_op(errors::OpApplySourceMap::new(state.clone())), + ); // Fetch - registry.register_op(OP_NAMESPACE, fetch::OpFetch::new(state.clone())); + registry.register_op( + OP_NAMESPACE, + metrics_state.wrap_op(fetch::OpFetch::new(state.clone())), + ); // Files - registry.register_op(OP_NAMESPACE, files::OpOpen::new(state.clone())); - registry.register_op(OP_NAMESPACE, files::OpClose::new(state.clone())); - registry.register_op(OP_NAMESPACE, files::OpSeek::new(state.clone())); + registry.register_op( + OP_NAMESPACE, + metrics_state.wrap_op(files::OpOpen::new(state.clone())), + ); + registry.register_op(OP_NAMESPACE, metrics_state.wrap_op(files::OpClose)); + registry.register_op(OP_NAMESPACE, metrics_state.wrap_op(files::OpSeek)); // Fs - registry.register_op(OP_NAMESPACE, fs::OpChdir::new(state.clone())); - registry.register_op(OP_NAMESPACE, fs::OpMkdir::new(state.clone())); - registry.register_op(OP_NAMESPACE, fs::OpChmod::new(state.clone())); - registry.register_op(OP_NAMESPACE, fs::OpChown::new(state.clone())); - registry.register_op(OP_NAMESPACE, fs::OpRemove::new(state.clone())); - registry.register_op(OP_NAMESPACE, fs::OpCopyFile::new(state.clone())); - registry.register_op(OP_NAMESPACE, fs::OpStat::new(state.clone())); - registry.register_op(OP_NAMESPACE, fs::OpReadDir::new(state.clone())); - registry.register_op(OP_NAMESPACE, fs::OpRename::new(state.clone())); - registry.register_op(OP_NAMESPACE, fs::OpLink::new(state.clone())); - registry.register_op(OP_NAMESPACE, fs::OpSymlink::new(state.clone())); - registry.register_op(OP_NAMESPACE, fs::OpReadLink::new(state.clone())); - registry.register_op(OP_NAMESPACE, fs::OpTruncate::new(state.clone())); - registry.register_op(OP_NAMESPACE, fs::OpMakeTempDir::new(state.clone())); - registry.register_op(OP_NAMESPACE, fs::OpUtime::new(state.clone())); - registry.register_op(OP_NAMESPACE, fs::OpCwd::new(state.clone())); + registry.register_op( + OP_NAMESPACE, + metrics_state.wrap_op(fs::OpChdir::new(state.clone())), + ); + registry.register_op( + OP_NAMESPACE, + metrics_state.wrap_op(fs::OpMkdir::new(state.clone())), + ); + registry.register_op( + OP_NAMESPACE, + metrics_state.wrap_op(fs::OpChmod::new(state.clone())), + ); + registry.register_op( + OP_NAMESPACE, + metrics_state.wrap_op(fs::OpChown::new(state.clone())), + ); + registry.register_op( + OP_NAMESPACE, + metrics_state.wrap_op(fs::OpRemove::new(state.clone())), + ); + registry.register_op( + OP_NAMESPACE, + metrics_state.wrap_op(fs::OpCopyFile::new(state.clone())), + ); + registry.register_op( + OP_NAMESPACE, + metrics_state.wrap_op(fs::OpStat::new(state.clone())), + ); + registry.register_op( + OP_NAMESPACE, + metrics_state.wrap_op(fs::OpReadDir::new(state.clone())), + ); + registry.register_op( + OP_NAMESPACE, + metrics_state.wrap_op(fs::OpRename::new(state.clone())), + ); + registry.register_op( + OP_NAMESPACE, + metrics_state.wrap_op(fs::OpLink::new(state.clone())), + ); + registry.register_op( + OP_NAMESPACE, + metrics_state.wrap_op(fs::OpSymlink::new(state.clone())), + ); + registry.register_op( + OP_NAMESPACE, + metrics_state.wrap_op(fs::OpReadLink::new(state.clone())), + ); + registry.register_op( + OP_NAMESPACE, + metrics_state.wrap_op(fs::OpTruncate::new(state.clone())), + ); + registry.register_op( + OP_NAMESPACE, + metrics_state.wrap_op(fs::OpMakeTempDir::new(state.clone())), + ); + registry.register_op( + OP_NAMESPACE, + metrics_state.wrap_op(fs::OpUtime::new(state.clone())), + ); + registry.register_op( + OP_NAMESPACE, + metrics_state.wrap_op(fs::OpCwd::new(state.clone())), + ); // Io - registry.register_op(OP_NAMESPACE, io::OpRead::new(state.clone())); - registry.register_op(OP_NAMESPACE, io::OpWrite::new(state.clone())); + registry.register_op(OP_NAMESPACE, metrics_state.wrap_op(io::OpRead)); + registry.register_op(OP_NAMESPACE, metrics_state.wrap_op(io::OpWrite)); // Metrics - registry.register_op(OP_NAMESPACE, metrics::OpMetrics::new(state.clone())); + registry.register_op( + OP_NAMESPACE, + metrics_state.wrap_op(metrics::OpMetrics::new(metrics_state.clone())), + ); // Net - registry.register_op(OP_NAMESPACE, net::OpAccept::new(state.clone())); - registry.register_op(OP_NAMESPACE, net::OpDial::new(state.clone())); - registry.register_op(OP_NAMESPACE, net::OpShutdown::new(state.clone())); - registry.register_op(OP_NAMESPACE, net::OpListen::new(state.clone())); + registry.register_op(OP_NAMESPACE, metrics_state.wrap_op(net::OpAccept)); + registry.register_op( + OP_NAMESPACE, + metrics_state.wrap_op(net::OpDial::new(state.clone())), + ); + registry.register_op(OP_NAMESPACE, metrics_state.wrap_op(net::OpShutdown)); + registry.register_op( + OP_NAMESPACE, + metrics_state.wrap_op(net::OpListen::new(state.clone())), + ); // Os - registry.register_op(OP_NAMESPACE, os::OpStart::new(state.clone())); - registry.register_op(OP_NAMESPACE, os::OpHomeDir::new(state.clone())); - registry.register_op(OP_NAMESPACE, os::OpExecPath::new(state.clone())); - registry.register_op(OP_NAMESPACE, os::OpSetEnv::new(state.clone())); - registry.register_op(OP_NAMESPACE, os::OpEnv::new(state.clone())); - registry.register_op(OP_NAMESPACE, os::OpExit::new(state.clone())); - registry.register_op(OP_NAMESPACE, os::OpIsTty::new(state.clone())); + registry.register_op( + OP_NAMESPACE, + metrics_state.wrap_op(os::OpStart::new(state.clone())), + ); + registry.register_op( + OP_NAMESPACE, + metrics_state.wrap_op(os::OpHomeDir::new(state.clone())), + ); + registry.register_op( + OP_NAMESPACE, + metrics_state.wrap_op(os::OpExecPath::new(state.clone())), + ); + registry.register_op( + OP_NAMESPACE, + metrics_state.wrap_op(os::OpSetEnv::new(state.clone())), + ); + registry.register_op( + OP_NAMESPACE, + metrics_state.wrap_op(os::OpEnv::new(state.clone())), + ); + registry.register_op(OP_NAMESPACE, metrics_state.wrap_op(os::OpExit)); + registry.register_op(OP_NAMESPACE, metrics_state.wrap_op(os::OpIsTty)); // Performance - registry.register_op(OP_NAMESPACE, performance::OpNow::new(state.clone())); + registry.register_op( + OP_NAMESPACE, + metrics_state.wrap_op(performance::OpNow::new(state.clone())), + ); // Permissions - registry - .register_op(OP_NAMESPACE, permissions::OpPermissions::new(state.clone())); registry.register_op( OP_NAMESPACE, - permissions::OpRevokePermission::new(state.clone()), + metrics_state.wrap_op(permissions::OpPermissions::new(state.clone())), + ); + registry.register_op( + OP_NAMESPACE, + metrics_state.wrap_op(permissions::OpRevokePermission::new(state.clone())), ); // Process - registry.register_op(OP_NAMESPACE, process::OpRun::new(state.clone())); - registry.register_op(OP_NAMESPACE, process::OpRunStatus::new(state.clone())); - registry.register_op(OP_NAMESPACE, process::OpKill::new(state.clone())); + registry.register_op( + OP_NAMESPACE, + metrics_state.wrap_op(process::OpRun::new(state.clone())), + ); + registry.register_op( + OP_NAMESPACE, + metrics_state.wrap_op(process::OpRunStatus::new(state.clone())), + ); + registry.register_op( + OP_NAMESPACE, + metrics_state.wrap_op(process::OpKill::new(state.clone())), + ); // Random - registry - .register_op(OP_NAMESPACE, random::OpGetRandomValues::new(state.clone())); + registry.register_op( + OP_NAMESPACE, + metrics_state.wrap_op(random::OpGetRandomValues::new(state.clone())), + ); // Repl - registry.register_op(OP_NAMESPACE, repl::OpReplStart::new(state.clone())); - registry.register_op(OP_NAMESPACE, repl::OpReplReadline::new(state.clone())); + registry.register_op( + OP_NAMESPACE, + metrics_state.wrap_op(repl::OpReplStart::new(state.clone())), + ); + registry + .register_op(OP_NAMESPACE, metrics_state.wrap_op(repl::OpReplReadline)); // Resources registry - .register_op(OP_NAMESPACE, resources::OpResources::new(state.clone())); + .register_op(OP_NAMESPACE, metrics_state.wrap_op(resources::OpResources)); // Timers - registry - .register_op(OP_NAMESPACE, timers::OpGlobalTimerStop::new(state.clone())); - registry.register_op(OP_NAMESPACE, timers::OpGlobalTimer::new(state.clone())); + registry.register_op( + OP_NAMESPACE, + metrics_state.wrap_op(timers::OpGlobalTimerStop::new(state.clone())), + ); + registry.register_op( + OP_NAMESPACE, + metrics_state.wrap_op(timers::OpGlobalTimer::new(state.clone())), + ); // Workers registry.register_op( OP_NAMESPACE, - workers::OpWorkerGetMessage::new(state.clone()), + metrics_state.wrap_op(workers::OpWorkerGetMessage::new(state.clone())), ); registry.register_op( OP_NAMESPACE, - workers::OpWorkerPostMessage::new(state.clone()), + metrics_state.wrap_op(workers::OpWorkerPostMessage::new(state.clone())), ); - registry - .register_op(OP_NAMESPACE, workers::OpCreateWorker::new(state.clone())); registry.register_op( OP_NAMESPACE, - workers::OpHostGetWorkerClosed::new(state.clone()), + metrics_state.wrap_op(workers::OpCreateWorker::new(state.clone())), + ); + registry.register_op( + OP_NAMESPACE, + metrics_state.wrap_op(workers::OpHostGetWorkerClosed::new(state.clone())), + ); + registry.register_op( + OP_NAMESPACE, + metrics_state.wrap_op(workers::OpHostGetMessage), + ); + registry.register_op( + OP_NAMESPACE, + metrics_state.wrap_op(workers::OpHostPostMessage), ); - registry - .register_op(OP_NAMESPACE, workers::OpHostGetMessage::new(state.clone())); - registry - .register_op(OP_NAMESPACE, workers::OpHostPostMessage::new(state.clone())); registry } diff --git a/cli/ops/net.rs b/cli/ops/net.rs index 15833a64876a57..41762f14d3f32b 100644 --- a/cli/ops/net.rs +++ b/cli/ops/net.rs @@ -17,15 +17,7 @@ use tokio::net::TcpStream; // Accept -pub struct OpAccept { - state: ThreadSafeState, -} - -impl OpAccept { - pub fn new(state: ThreadSafeState) -> Self { - Self { state } - } -} +pub struct OpAccept; #[derive(Deserialize)] struct AcceptArgs { @@ -64,7 +56,6 @@ impl OpDispatcher for OpAccept { } } }, - &self.state, control, buf, ) @@ -125,7 +116,6 @@ impl OpDispatcher for OpDial { Ok(JsonOp::Async(Box::new(op))) }, - &self.state, control, buf, ) @@ -138,15 +128,7 @@ impl Named for OpDial { // Shutdown -pub struct OpShutdown { - state: ThreadSafeState, -} - -impl OpShutdown { - pub fn new(state: ThreadSafeState) -> Self { - Self { state } - } -} +pub struct OpShutdown; #[derive(Deserialize)] struct ShutdownArgs { @@ -177,7 +159,6 @@ impl OpDispatcher for OpShutdown { } } }, - &self.state, control, buf, ) @@ -228,7 +209,6 @@ impl OpDispatcher for OpListen { "localAddr": local_addr.to_string() }))) }, - &self.state, control, buf, ) diff --git a/cli/ops/os.rs b/cli/ops/os.rs index 31f1b8a8b62916..c0cd811cd1a46a 100644 --- a/cli/ops/os.rs +++ b/cli/ops/os.rs @@ -44,7 +44,6 @@ impl OpDispatcher for OpStart { "xevalDelim": state.flags.xeval_delim.clone(), }))) }, - &self.state, control, buf, ) @@ -79,7 +78,6 @@ impl OpDispatcher for OpHomeDir { .unwrap_or_default(); Ok(JsonOp::Sync(json!(path))) }, - &self.state, control, buf, ) @@ -114,7 +112,6 @@ impl OpDispatcher for OpExecPath { let path = exe_url.to_file_path().unwrap(); Ok(JsonOp::Sync(json!(path))) }, - &self.state, control, buf, ) @@ -152,7 +149,6 @@ impl OpDispatcher for OpSetEnv { env::set_var(args.key, args.value); Ok(JsonOp::Sync(json!({}))) }, - &self.state, control, buf, ) @@ -183,7 +179,6 @@ impl OpDispatcher for OpEnv { let v = env::vars().collect::>(); Ok(JsonOp::Sync(json!(v))) }, - &self.state, control, buf, ) @@ -196,15 +191,7 @@ impl Named for OpEnv { // Exit -pub struct OpExit { - state: ThreadSafeState, -} - -impl OpExit { - pub fn new(state: ThreadSafeState) -> Self { - Self { state } - } -} +pub struct OpExit; #[derive(Deserialize)] struct ExitArgs { @@ -218,7 +205,6 @@ impl OpDispatcher for OpExit { let args: ExitArgs = serde_json::from_value(args)?; std::process::exit(args.code) }, - &self.state, control, buf, ) @@ -231,15 +217,7 @@ impl Named for OpExit { // Is Tty -pub struct OpIsTty { - state: ThreadSafeState, -} - -impl OpIsTty { - pub fn new(state: ThreadSafeState) -> Self { - Self { state } - } -} +pub struct OpIsTty; impl OpDispatcher for OpIsTty { fn dispatch(&self, control: &[u8], buf: Option) -> CoreOp { @@ -251,7 +229,6 @@ impl OpDispatcher for OpIsTty { "stderr": atty::is(atty::Stream::Stderr), }))) }, - &self.state, control, buf, ) diff --git a/cli/ops/performance.rs b/cli/ops/performance.rs index 2c96ce66bf94d3..3b7378d969f614 100644 --- a/cli/ops/performance.rs +++ b/cli/ops/performance.rs @@ -39,7 +39,6 @@ impl OpDispatcher for OpNow { "subsecNanos": subsec_nanos, }))) }, - &self.state, control, buf, ) diff --git a/cli/ops/permissions.rs b/cli/ops/permissions.rs index 9e9ab0e14d50ed..fcc38d4cc9dc69 100644 --- a/cli/ops/permissions.rs +++ b/cli/ops/permissions.rs @@ -29,7 +29,6 @@ impl OpDispatcher for OpPermissions { "hrtime": state.permissions.allows_hrtime(), }))) }, - &self.state, control, buf, ) @@ -76,7 +75,6 @@ impl OpDispatcher for OpRevokePermission { Ok(JsonOp::Sync(json!({}))) }, - &self.state, control, buf, ) diff --git a/cli/ops/process.rs b/cli/ops/process.rs index b55784fbd3f957..bd63bd1a7b89b6 100644 --- a/cli/ops/process.rs +++ b/cli/ops/process.rs @@ -107,7 +107,6 @@ impl OpDispatcher for OpRun { "stderrRid": resources.stderr_rid, }))) }, - &self.state, control, buf, ) @@ -169,7 +168,6 @@ impl OpDispatcher for OpRunStatus { Ok(JsonOp::Async(Box::new(future))) }, - &self.state, control, buf, ) @@ -208,7 +206,6 @@ impl OpDispatcher for OpKill { kill(args.pid, args.signo)?; Ok(JsonOp::Sync(json!({}))) }, - &self.state, control, buf, ) diff --git a/cli/ops/random.rs b/cli/ops/random.rs index 398311fc9a638c..6c64d659e0729e 100644 --- a/cli/ops/random.rs +++ b/cli/ops/random.rs @@ -33,7 +33,6 @@ impl OpDispatcher for OpGetRandomValues { Ok(JsonOp::Sync(json!({}))) }, - &self.state, control, buf, ) diff --git a/cli/ops/repl.rs b/cli/ops/repl.rs index 992aa2cdf36421..fb1842f61d8267 100644 --- a/cli/ops/repl.rs +++ b/cli/ops/repl.rs @@ -37,7 +37,6 @@ impl OpDispatcher for OpReplStart { Ok(JsonOp::Sync(json!(resource.rid))) }, - &self.state, control, buf, ) @@ -50,15 +49,7 @@ impl Named for OpReplStart { // Repl Readline -pub struct OpReplReadline { - state: ThreadSafeState, -} - -impl OpReplReadline { - pub fn new(state: ThreadSafeState) -> Self { - Self { state } - } -} +pub struct OpReplReadline; #[derive(Deserialize)] struct ReplReadlineArgs { @@ -81,7 +72,6 @@ impl OpDispatcher for OpReplReadline { Ok(json!(line)) }) }, - &self.state, control, buf, ) diff --git a/cli/ops/resources.rs b/cli/ops/resources.rs index 2bc7d367ce5420..d825cac65b5998 100644 --- a/cli/ops/resources.rs +++ b/cli/ops/resources.rs @@ -1,20 +1,11 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. use super::dispatch_json::{wrap_json_op, JsonOp}; use crate::resources::table_entries; -use crate::state::ThreadSafeState; use deno::*; // Resources -pub struct OpResources { - state: ThreadSafeState, -} - -impl OpResources { - pub fn new(state: ThreadSafeState) -> Self { - Self { state } - } -} +pub struct OpResources; impl OpDispatcher for OpResources { fn dispatch(&self, control: &[u8], buf: Option) -> CoreOp { @@ -23,7 +14,6 @@ impl OpDispatcher for OpResources { let serialized_resources = table_entries(); Ok(JsonOp::Sync(json!(serialized_resources))) }, - &self.state, control, buf, ) diff --git a/cli/ops/timers.rs b/cli/ops/timers.rs index a86724ff7529f5..e4b452f643952e 100644 --- a/cli/ops/timers.rs +++ b/cli/ops/timers.rs @@ -27,7 +27,6 @@ impl OpDispatcher for OpGlobalTimerStop { t.cancel(); Ok(JsonOp::Sync(json!({}))) }, - &self.state, control, buf, ) @@ -70,7 +69,6 @@ impl OpDispatcher for OpGlobalTimer { Ok(JsonOp::Async(Box::new(f))) }, - &self.state, control, buf, ) diff --git a/cli/ops/workers.rs b/cli/ops/workers.rs index 7d4082cfa1435d..61fede40714aef 100644 --- a/cli/ops/workers.rs +++ b/cli/ops/workers.rs @@ -64,7 +64,6 @@ impl OpDispatcher for OpWorkerGetMessage { Ok(JsonOp::Async(Box::new(op))) }, - &self.state, control, buf, ) @@ -104,7 +103,6 @@ impl OpDispatcher for OpWorkerPostMessage { Ok(JsonOp::Sync(json!({}))) }, - &self.state, control, buf, ) @@ -200,7 +198,6 @@ impl OpDispatcher for OpCreateWorker { let result = op.wait()?; Ok(JsonOp::Sync(result)) }, - &self.state, control, buf, ) @@ -251,7 +248,6 @@ impl OpDispatcher for OpHostGetWorkerClosed { Ok(JsonOp::Async(Box::new(op))) }, - &self.state, control, buf, ) @@ -265,15 +261,7 @@ impl Named for OpHostGetWorkerClosed { // Host Get Message /// Get message from guest worker as host -pub struct OpHostGetMessage { - state: ThreadSafeState, -} - -impl OpHostGetMessage { - pub fn new(state: ThreadSafeState) -> Self { - Self { state } - } -} +pub struct OpHostGetMessage; #[derive(Deserialize)] struct HostGetMessageArgs { @@ -297,7 +285,6 @@ impl OpDispatcher for OpHostGetMessage { Ok(JsonOp::Async(Box::new(op))) }, - &self.state, control, buf, ) @@ -311,15 +298,7 @@ impl Named for OpHostGetMessage { // Host Post Message /// Post message to guest worker as host -pub struct OpHostPostMessage { - state: ThreadSafeState, -} - -impl OpHostPostMessage { - pub fn new(state: ThreadSafeState) -> Self { - Self { state } - } -} +pub struct OpHostPostMessage; #[derive(Deserialize)] struct HostPostMessageArgs { @@ -342,7 +321,6 @@ impl OpDispatcher for OpHostPostMessage { Ok(JsonOp::Sync(json!({}))) }, - &self.state, control, buf, ) diff --git a/cli/state.rs b/cli/state.rs index 09afd758b9cf0f..92e1979b28f429 100644 --- a/cli/state.rs +++ b/cli/state.rs @@ -16,9 +16,14 @@ use crate::resources; use crate::resources::ResourceId; use crate::worker::Worker; use deno::Buf; +use deno::CoreOp; use deno::ErrBox; use deno::Loader; use deno::ModuleSpecifier; +use deno::Named; +use deno::Op; +use deno::OpDispatcher; +use deno::PinnedBuf; use futures::future::Shared; use futures::Future; use rand::rngs::StdRng; @@ -40,7 +45,7 @@ pub type WorkerChannels = (WorkerSender, WorkerReceiver); pub type UserWorkerTable = HashMap>; #[derive(Default)] -pub struct Metrics { +pub struct MetricsState { pub ops_dispatched: AtomicUsize, pub ops_completed: AtomicUsize, pub bytes_sent_control: AtomicUsize, @@ -50,6 +55,107 @@ pub struct Metrics { pub compiler_starts: AtomicUsize, } +/// Thread safe metrics state +#[derive(Default)] +pub struct TSMetricsState(Arc); + +impl Clone for TSMetricsState { + fn clone(&self) -> Self { + TSMetricsState(self.0.clone()) + } +} + +impl Deref for TSMetricsState { + type Target = Arc; + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl TSMetricsState { + pub fn wrap_op(&self, d: D) -> MetricsOpWrapper + where + D: OpDispatcher + Named, + { + MetricsOpWrapper::new(d, self.clone()) + } + + pub fn metrics_op_dispatched( + &self, + bytes_sent_control: usize, + bytes_sent_data: usize, + ) { + self.ops_dispatched.fetch_add(1, Ordering::SeqCst); + self + .bytes_sent_control + .fetch_add(bytes_sent_control, Ordering::SeqCst); + self + .bytes_sent_data + .fetch_add(bytes_sent_data, Ordering::SeqCst); + } + + pub fn metrics_op_completed(&self, bytes_received: usize) { + self.ops_completed.fetch_add(1, Ordering::SeqCst); + self + .bytes_received + .fetch_add(bytes_received, Ordering::SeqCst); + } +} + +pub struct MetricsOpWrapper { + inner: D, + metrics_state: TSMetricsState, +} + +impl MetricsOpWrapper +where + D: OpDispatcher + Named, +{ + pub fn new(d: D, metrics_state: TSMetricsState) -> Self { + Self { + inner: d, + metrics_state, + } + } +} + +impl OpDispatcher for MetricsOpWrapper +where + D: OpDispatcher + Named, +{ + fn dispatch(&self, control: &[u8], zero_copy: Option) -> CoreOp { + let bytes_sent_control = control.len(); + let bytes_sent_zero_copy = zero_copy.as_ref().map(|b| b.len()).unwrap_or(0); + + let op = self.inner.dispatch(control, zero_copy); + + self + .metrics_state + .metrics_op_dispatched(bytes_sent_control, bytes_sent_zero_copy); + let state = self.metrics_state.clone(); + match op { + Op::Sync(buf) => { + state.metrics_op_completed(buf.len()); + Op::Sync(buf) + } + Op::Async(fut) => { + let fut_final = Box::new(fut.and_then(move |buf| { + state.metrics_op_completed(buf.len()); + Ok(buf) + })); + Op::Async(fut_final) + } + } + } +} + +impl Named for MetricsOpWrapper +where + D: OpDispatcher + Named, +{ + const NAME: &'static str = D::NAME; +} + /// Isolate cannot be passed between threads but ThreadSafeState can. /// ThreadSafeState satisfies Send and Sync. So any state that needs to be /// accessed outside the main V8 thread should be inside ThreadSafeState. @@ -66,7 +172,7 @@ pub struct State { /// When flags contains a `.import_map_path` option, the content of the /// import map file will be resolved and set. pub import_map: Option, - pub metrics: Metrics, + pub metrics: TSMetricsState, pub worker_channels: Mutex, pub global_timer: Mutex, pub workers: Mutex, @@ -201,7 +307,7 @@ impl ThreadSafeState { permissions: DenoPermissions::from_flags(&flags), flags, import_map, - metrics: Metrics::default(), + metrics: TSMetricsState::default(), worker_channels: Mutex::new(internal_channels), global_timer: Mutex::new(GlobalTimer::new()), workers: Mutex::new(UserWorkerTable::new()), @@ -320,30 +426,6 @@ impl ThreadSafeState { ) .unwrap() } - - pub fn metrics_op_dispatched( - &self, - bytes_sent_control: usize, - bytes_sent_data: usize, - ) { - self.metrics.ops_dispatched.fetch_add(1, Ordering::SeqCst); - self - .metrics - .bytes_sent_control - .fetch_add(bytes_sent_control, Ordering::SeqCst); - self - .metrics - .bytes_sent_data - .fetch_add(bytes_sent_data, Ordering::SeqCst); - } - - pub fn metrics_op_completed(&self, bytes_received: usize) { - self.metrics.ops_completed.fetch_add(1, Ordering::SeqCst); - self - .metrics - .bytes_received - .fetch_add(bytes_received, Ordering::SeqCst); - } } #[test]