feat: support waiting for background tasks on the `per_worker` policy (#451)

* chore: update deno manifest

* chore: update `.gitignore`

* feat: support waiting for background tasks on the `per_worker` policy

* chore: add integration tests

* chore: add an example

* chore: add `global.d.ts`
nyannyacha authored Nov 28, 2024
1 parent 388d2ea commit 3cddc61
Showing 17 changed files with 410 additions and 113 deletions.
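In short, the `per_worker` supervisor can now defer termination until background tasks settle: per-runtime `PromiseMetrics` are threaded into the supervisor's `Arguments`, and every early-drop path additionally checks `have_all_promises_been_resolved()`. A minimal usage sketch follows; `EdgeRuntime.waitUntil` and its signature are assumptions inferred from the `global.d.ts` bullet above and the comments in the `mark-background-task-2` test case below, not shown verbatim in this diff:

```ts
// Sketch only: `EdgeRuntime.waitUntil` is assumed to register a promise that
// the per_worker supervisor waits on before early-dropping the worker.
declare const EdgeRuntime: {
  waitUntil<T>(promise: Promise<T>): Promise<T>;
};

async function flushMetrics(): Promise<void> {
  // Background work that outlives the response below.
  await new Promise((resolve) => setTimeout(resolve, 5_000));
  console.log("background task finished");
}

export default {
  fetch(): Response {
    // Respond immediately; the worker should stay alive until the task settles.
    EdgeRuntime.waitUntil(flushMetrics());
    return new Response("ok");
  },
};
```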
3 changes: 2 additions & 1 deletion .gitignore
@@ -12,4 +12,5 @@ scripts/debug.sh

node_modules/
.DS_Store
eszip.bin
eszip.bin
deno.lock
20 changes: 10 additions & 10 deletions crates/base/src/deno_runtime.rs
@@ -69,7 +69,7 @@ use sb_core::external_memory::CustomAllocator;
use sb_core::net::sb_core_net;
use sb_core::permissions::{sb_core_permissions, Permissions};
use sb_core::runtime::sb_core_runtime;
use sb_core::{sb_core_main_js, MemCheckWaker};
use sb_core::{sb_core_main_js, MemCheckWaker, PromiseMetrics};
use sb_env::sb_env as sb_env_op;
use sb_fs::deno_compile_fs::DenoCompileFileSystem;
use sb_graph::emitter::EmitterFactory;
@@ -254,6 +254,7 @@ pub struct DenoRuntime<RuntimeContext = ()> {

main_module_id: ModuleId,
maybe_inspector: Option<Inspector>,
promise_metrics: PromiseMetrics,

mem_check: Arc<MemCheck>,
waker: Arc<AtomicWaker>,
@@ -322,6 +323,7 @@
// TODO(Nyannyacha): Make sure `service_path` is an absolute path first.

let drop_token = CancellationToken::default();
let promise_metrics = PromiseMetrics::default();

let base_dir_path = std::env::current_dir().map(|p| p.join(&service_path))?;
let Ok(mut main_module_url) = Url::from_directory_path(&base_dir_path) else {
@@ -709,19 +711,12 @@

{
let main_context = js_runtime.main_context();

let op_state = js_runtime.op_state();
let mut op_state = op_state.borrow_mut();

op_state.put(dispatch_fns);
op_state.put(promise_metrics.clone());
op_state.put(GlobalMainContext(main_context));
}

let version: Option<&str> = option_env!("GIT_V_TAG");

{
let op_state_rc = js_runtime.op_state();
let mut op_state = op_state_rc.borrow_mut();

// NOTE(Andreespirela): We do this because "NODE_DEBUG" is trying to be read during
// initialization, But we need the gotham state to be up-to-date.
@@ -739,7 +734,7 @@
// 2: isEventsWorker
conf.is_events_worker(),
// 3: edgeRuntimeVersion
version.unwrap_or("0.1.0"),
option_env!("GIT_V_TAG").unwrap_or("0.1.0"),
// 4: denoVersion
MAYBE_DENO_VERSION
.get()
@@ -884,6 +879,7 @@

main_module_id,
maybe_inspector,
promise_metrics,

mem_check,
waker: Arc::default(),
@@ -1210,6 +1206,10 @@
self.maybe_inspector.clone()
}

pub fn promise_metrics(&self) -> PromiseMetrics {
self.promise_metrics.clone()
}

pub fn mem_check_state(&self) -> Arc<RwLock<MemCheckState>> {
self.mem_check.state.clone()
}
2 changes: 2 additions & 0 deletions crates/base/src/rt_worker/supervisor/mod.rs
@@ -8,6 +8,7 @@ use deno_core::v8;
use enum_as_inner::EnumAsInner;
use futures_util::task::AtomicWaker;
use log::{error, warn};
use sb_core::PromiseMetrics;
use sb_workers::context::{Timing, UserWorkerMsgs, UserWorkerRuntimeOpts};
use tokio::sync::{
mpsc::{self, UnboundedReceiver},
@@ -129,6 +130,7 @@ pub struct Arguments {
pub cpu_usage_metrics_rx: Option<mpsc::UnboundedReceiver<CPUUsageMetrics>>,
pub cpu_timer_param: CPUTimerParam,
pub supervisor_policy: SupervisorPolicy,
pub promise_metrics: PromiseMetrics,
pub timing: Option<Timing>,
pub memory_limit_rx: mpsc::UnboundedReceiver<()>,
pub pool_msg_tx: Option<mpsc::UnboundedSender<UserWorkerMsgs>>,
77 changes: 54 additions & 23 deletions crates/base/src/rt_worker/supervisor/strategy_per_worker.rs
@@ -18,6 +18,7 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) {
let Arguments {
key,
runtime_opts,
promise_metrics,
timing,
mut memory_limit_rx,
cpu_timer,
@@ -55,11 +56,13 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) {
let is_wall_clock_limit_disabled = wall_clock_limit_ms == 0;
let mut is_worker_entered = false;
let mut is_wall_clock_beforeunload_armed = false;
let mut is_cpu_time_soft_limit_reached = false;
let mut is_termination_requested = false;
let mut have_all_reqs_been_acknowledged = false;

let mut cpu_usage_metrics_rx = cpu_usage_metrics_rx.unwrap();
let mut cpu_usage_ms = 0i64;

let mut cpu_time_soft_limit_reached = false;
let mut wall_clock_alerts = 0;
let mut req_ack_count = 0usize;

@@ -113,20 +116,23 @@
tokio::pin!(wall_clock_duration_alert);
tokio::pin!(wall_clock_beforeunload_alert);

loop {
let result = 'scope: loop {
tokio::select! {
_ = supervise.cancelled() => {
return (ShutdownReason::TerminationRequested, cpu_usage_ms);
break 'scope (ShutdownReason::TerminationRequested, cpu_usage_ms);
}

_ = async {
match termination.as_ref() {
Some(token) => token.inbound.cancelled().await,
None => pending().await,
}
} => {
terminate_fn();
return (ShutdownReason::TerminationRequested, cpu_usage_ms);
}, if !is_termination_requested => {
is_termination_requested = true;
if promise_metrics.have_all_promises_been_resolved() {
terminate_fn();
break 'scope (ShutdownReason::TerminationRequested, cpu_usage_ms);
}
}

Some(metrics) = cpu_usage_metrics_rx.recv() => {
@@ -160,17 +166,28 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) {
if cpu_usage_ms >= hard_limit_ms as i64 {
terminate_fn();
error!("CPU time hard limit reached: isolate: {:?}", key);
return (ShutdownReason::CPUTime, cpu_usage_ms);
} else if cpu_usage_ms >= soft_limit_ms as i64 && !cpu_time_soft_limit_reached {
break 'scope (ShutdownReason::CPUTime, cpu_usage_ms);
} else if cpu_usage_ms >= soft_limit_ms as i64 && !is_cpu_time_soft_limit_reached {
early_retire_fn();
error!("CPU time soft limit reached: isolate: {:?}", key);
cpu_time_soft_limit_reached = true;

if req_ack_count == demand.load(Ordering::Acquire) {
is_cpu_time_soft_limit_reached = true;
have_all_reqs_been_acknowledged = req_ack_count == demand.load(Ordering::Acquire);

if have_all_reqs_been_acknowledged
&& promise_metrics.have_all_promises_been_resolved()
{
terminate_fn();
error!("early termination due to the last request being completed: isolate: {:?}", key);
return (ShutdownReason::EarlyDrop, cpu_usage_ms);
break 'scope (ShutdownReason::EarlyDrop, cpu_usage_ms);
}

} else if is_cpu_time_soft_limit_reached
&& have_all_reqs_been_acknowledged
&& promise_metrics.have_all_promises_been_resolved()
{
terminate_fn();
break 'scope (ShutdownReason::EarlyDrop, cpu_usage_ms);
}
}
}
@@ -179,42 +196,50 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) {

Some(_) = wait_cpu_alarm(cpu_alarms_rx.as_mut()) => {
if is_worker_entered {
if !cpu_time_soft_limit_reached {
if !is_cpu_time_soft_limit_reached {
early_retire_fn();
error!("CPU time soft limit reached: isolate: {:?}", key);
cpu_time_soft_limit_reached = true;

if req_ack_count == demand.load(Ordering::Acquire) {
is_cpu_time_soft_limit_reached = true;
have_all_reqs_been_acknowledged = req_ack_count == demand.load(Ordering::Acquire);

if have_all_reqs_been_acknowledged
&& promise_metrics.have_all_promises_been_resolved()
{
terminate_fn();
error!("early termination due to the last request being completed: isolate: {:?}", key);
return (ShutdownReason::EarlyDrop, cpu_usage_ms);
break 'scope (ShutdownReason::EarlyDrop, cpu_usage_ms);
}
} else {
terminate_fn();
error!("CPU time hard limit reached: isolate: {:?}", key);
return (ShutdownReason::CPUTime, cpu_usage_ms);
break 'scope (ShutdownReason::CPUTime, cpu_usage_ms);
}
}
}

Some(_) = req_end_rx.recv() => {
req_ack_count += 1;
have_all_reqs_been_acknowledged = req_ack_count == demand.load(Ordering::Acquire);

if !cpu_time_soft_limit_reached {
if !is_cpu_time_soft_limit_reached {
if let Some(tx) = pool_msg_tx.clone() {
if tx.send(UserWorkerMsgs::Idle(key)).is_err() {
error!("failed to send idle msg to pool: {:?}", key);
}
}
}

if !cpu_time_soft_limit_reached || req_ack_count != demand.load(Ordering::Acquire) {
if !is_cpu_time_soft_limit_reached
|| !have_all_reqs_been_acknowledged
|| !promise_metrics.have_all_promises_been_resolved()
{
continue;
}

terminate_fn();
error!("early termination due to the last request being completed: isolate: {:?}", key);
return (ShutdownReason::EarlyDrop, cpu_usage_ms);
break 'scope (ShutdownReason::EarlyDrop, cpu_usage_ms);
}

_ = wall_clock_duration_alert.tick(), if !is_wall_clock_limit_disabled => {
@@ -229,10 +254,8 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) {
let is_in_flight_req_exists = req_ack_count != demand.load(Ordering::Acquire);

terminate_fn();

error!("wall clock duration reached: isolate: {:?} (in_flight_req_exists = {})", key, is_in_flight_req_exists);

return (ShutdownReason::WallClockTime, cpu_usage_ms);
break 'scope (ShutdownReason::WallClockTime, cpu_usage_ms);
}
}

Expand All @@ -252,8 +275,16 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) {
Some(_) = memory_limit_rx.recv() => {
terminate_fn();
error!("memory limit reached for the worker: isolate: {:?}", key);
return (ShutdownReason::Memory, cpu_usage_ms);
break 'scope (ShutdownReason::Memory, cpu_usage_ms);
}
}
};

match result {
(ShutdownReason::EarlyDrop, cpu_usage_ms) if is_termination_requested => {
(ShutdownReason::TerminationRequested, cpu_usage_ms)
}

result => result,
}
}
12 changes: 6 additions & 6 deletions crates/base/src/rt_worker/worker.rs
@@ -242,12 +242,12 @@ impl Worker {
},
));

-if !thread_safe_handle.request_interrupt(
-	supervisor::v8_handle_termination,
-	data_ptr_mut as *mut std::ffi::c_void,
-) {
-	drop(unsafe { Box::from_raw(data_ptr_mut) });
-}
+if !thread_safe_handle.request_interrupt(
+	supervisor::v8_handle_termination,
+	data_ptr_mut as *mut std::ffi::c_void,
+) {
+	drop(unsafe { Box::from_raw(data_ptr_mut) });
+}

while !is_terminated.is_raised() {
waker.wake();
2 changes: 2 additions & 0 deletions crates/base/src/rt_worker/worker_ctx.rs
@@ -352,6 +352,7 @@ pub fn create_supervisor(
let _rt_guard = base_rt::SUPERVISOR_RT.enter();
let maybe_cpu_timer_inner = maybe_cpu_timer.clone();
let supervise_cancel_token_inner = supervise_cancel_token.clone();
let promise_metrics = worker_runtime.promise_metrics();

tokio::spawn(async move {
let (isolate_memory_usage_tx, isolate_memory_usage_rx) =
@@ -364,6 +365,7 @@
cpu_usage_metrics_rx,
cpu_timer_param,
supervisor_policy,
promise_metrics,
timing,
memory_limit_rx,
pool_msg_tx,
25 changes: 20 additions & 5 deletions crates/base/test_cases/main/index.ts
@@ -1,6 +1,20 @@
console.log('main function started');

Deno.serve(async (req: Request) => {
function parseIntFromHeadersOrDefault(req: Request, key: string, val: number) {
const headerValue = req.headers.get(key);
if (!headerValue) {
return val;
}

const parsedValue = parseInt(headerValue);
if (isNaN(parsedValue)) {
return val;
}

return parsedValue;
}

Deno.serve((req: Request) => {
console.log(req.url);
const url = new URL(req.url);
const { pathname } = url;
@@ -19,10 +33,11 @@ Deno.serve(async (req: Request) => {
console.error(`serving the request with ${servicePath}`);

const createWorker = async () => {
const memoryLimitMb = 150;
const workerTimeoutMs = 10 * 60 * 1000;
const cpuTimeSoftLimitMs = 10 * 60 * 1000;
const cpuTimeHardLimitMs = 10 * 60 * 1000;
const memoryLimitMb = parseIntFromHeadersOrDefault(req, "x-memory-limit-mb", 150);
const workerTimeoutMs = parseIntFromHeadersOrDefault(req, "x-worker-timeout-ms", 10 * 60 * 1000);
const cpuTimeSoftLimitMs = parseIntFromHeadersOrDefault(req, "x-cpu-time-soft-limit-ms", 10 * 60 * 1000);
const cpuTimeHardLimitMs = parseIntFromHeadersOrDefault(req, "x-cpu-time-hard-limit-ms", 10 * 60 * 1000);
console.log(cpuTimeSoftLimitMs);
const noModuleCache = false;
const importMapPath = null;
const envVarsObj = Deno.env.toObject();
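The test entrypoint now reads worker limits from request headers instead of hard-coded constants, so an integration test can tune limits one request at a time. A hypothetical test request against this entrypoint (the base URL and port are assumptions; the header names are the ones parsed above):

```ts
// Hypothetical request: lower the CPU soft limit for just this worker.
// Headers that are absent fall back to the defaults in createWorker above.
const res = await fetch("http://localhost:8999/mark-background-task-2", {
  headers: { "x-cpu-time-soft-limit-ms": "500" },
});
console.log(res.status, await res.text());
```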
43 changes: 43 additions & 0 deletions crates/base/test_cases/mark-background-task-2/index.ts
@@ -0,0 +1,43 @@
function sleep(ms: number): Promise<string> {
return new Promise(res => {
setTimeout(() => {
res("meow");
}, ms)
});
}

function mySlowFunction(baseNumber: number) {
const now = Date.now();
let result = 0;
for (let i = Math.pow(baseNumber, 7); i >= 0; i--) {
result += Math.atan(i) * Math.tan(i);
}
const duration = Date.now() - now;
return { result: result, duration: duration };
}

class MyBackgroundTaskEvent extends Event {
readonly taskPromise: Promise<string>

constructor(taskPromise: Promise<string>) {
super('myBackgroundTask')
this.taskPromise = taskPromise
}
}

globalThis.addEventListener('myBackgroundTask', async (event) => {
const str = await (event as MyBackgroundTaskEvent).taskPromise
console.log(str);
});


export default {
fetch() {
// consumes lots of cpu time
mySlowFunction(10);
// however, this time we did not notify the runtime that it should wait for this promise.
// therefore, the above console.log(str) will not be output and the worker will terminate.
dispatchEvent(new MyBackgroundTaskEvent(sleep(5000)));
return new Response();
}
}
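For contrast with the test above, here is a sketch of the positive case its comments describe, where the runtime is told to wait. The sibling `mark-background-task` fixture is among the changed files not shown on this page, so this is an assumption built on `EdgeRuntime.waitUntil` from the new `global.d.ts`:

```ts
// Sketch of the positive counterpart, under the assumption that
// `EdgeRuntime.waitUntil` marks a promise as a background task.
declare const EdgeRuntime: {
  waitUntil<T>(promise: Promise<T>): Promise<T>;
};

function sleep(ms: number): Promise<string> {
  return new Promise((res) => setTimeout(() => res("meow"), ms));
}

export default {
  fetch(): Response {
    // Because the promise is registered with the runtime, the per_worker
    // supervisor waits for it, and "meow" is logged before termination.
    EdgeRuntime.waitUntil(sleep(5000).then((str) => console.log(str)));
    return new Response();
  },
};
```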