fix: beforeunload event didn't trigger on the early drop state #455

Merged: 3 commits, Nov 29, 2024
Changes from 2 commits
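In short: before this change, when the supervisor decided a worker could be dropped early (every acknowledged request finished and every promise resolved), it called `terminate_fn()` and broke out of its loop immediately, so the worker's `beforeunload` listener never ran. With this PR the supervisor instead requests a V8 interrupt (`v8_handle_early_drop_beforeunload`) that dispatches `beforeunload` with the new `WillTerminateReason::EarlyDrop`, and only settles on a shutdown reason once a `CancellationToken` confirms the dispatch happened. The sketch below models just that control flow with tokio and tokio-util and no V8 at all; `all_done_rx` and `dispatch_beforeunload` are invented stand-ins for the real supervisor state, not names from the PR.

```rust
use tokio::sync::oneshot;
use tokio_util::sync::CancellationToken;

#[derive(Debug)]
enum ShutdownReason {
    EarlyDrop,
}

#[tokio::main]
async fn main() {
    let early_drop_token = CancellationToken::new();
    let early_drop_fut = early_drop_token.cancelled();
    tokio::pin!(early_drop_fut);

    // Simulates the moment all in-flight requests are acknowledged and all
    // promises are resolved.
    let (all_done_tx, mut all_done_rx) = oneshot::channel::<()>();
    tokio::spawn(async move {
        let _ = all_done_tx.send(());
    });

    // Taken at most once, like dispatch_early_drop_beforeunload_fn in the PR.
    let mut dispatch_beforeunload = Some({
        let token = early_drop_token.clone();
        move || {
            // The real closure requests a V8 interrupt that dispatches the
            // `beforeunload` event; on success the callback cancels the token.
            token.cancel();
        }
    });

    let reason = loop {
        tokio::select! {
            // The old behaviour terminated right here; the new behaviour only
            // dispatches the event and keeps supervising.
            _ = &mut all_done_rx, if dispatch_beforeunload.is_some() => {
                if let Some(dispatch) = dispatch_beforeunload.take() {
                    dispatch();
                }
            }
            // The shutdown reason is decided only after the dispatch completed.
            _ = &mut early_drop_fut => {
                break ShutdownReason::EarlyDrop;
            }
        }
    };

    println!("supervisor finished: {reason:?}");
}
```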
1 change: 1 addition & 0 deletions crates/base/src/deno_runtime.rs
@@ -269,6 +269,7 @@ pub enum WillTerminateReason {
CPU,
Memory,
WallClock,
EarlyDrop,
}

pub struct DenoRuntime<RuntimeContext = DefaultRuntimeContext> {
89 changes: 54 additions & 35 deletions crates/base/src/rt_worker/supervisor/mod.rs
@@ -26,40 +26,6 @@ use crate::{

use super::{worker_ctx::TerminationToken, worker_pool::SupervisorPolicy};

#[repr(C)]
pub struct V8HandleTerminationData {
pub should_terminate: bool,
pub isolate_memory_usage_tx: Option<oneshot::Sender<IsolateMemoryStats>>,
}

pub extern "C" fn v8_handle_termination(isolate: &mut v8::Isolate, data: *mut std::ffi::c_void) {
let mut boxed_data: Box<V8HandleTerminationData>;

unsafe {
boxed_data = Box::from_raw(data as *mut V8HandleTerminationData);
}

// log memory usage
let mut heap_stats = v8::HeapStatistics::default();

isolate.get_heap_statistics(&mut heap_stats);

let usage = IsolateMemoryStats {
used_heap_size: heap_stats.used_heap_size(),
external_memory: heap_stats.external_memory(),
};

if let Some(usage_tx) = boxed_data.isolate_memory_usage_tx.take() {
if usage_tx.send(usage).is_err() {
error!("failed to send isolate memory usage - receiver may have been dropped");
}
}

if boxed_data.should_terminate {
isolate.terminate_execution();
}
}

#[repr(C)]
pub struct IsolateMemoryStats {
pub used_heap_size: usize,
@@ -172,20 +138,73 @@ async fn create_wall_clock_beforeunload_alert(wall_clock_limit_ms: u64, pct: Opt
}
}

#[repr(C)]
pub struct V8HandleTerminationData {
pub should_terminate: bool,
pub isolate_memory_usage_tx: Option<oneshot::Sender<IsolateMemoryStats>>,
}

pub extern "C" fn v8_handle_termination(isolate: &mut v8::Isolate, data: *mut std::ffi::c_void) {
let mut data = unsafe { Box::from_raw(data as *mut V8HandleTerminationData) };

// log memory usage
let mut heap_stats = v8::HeapStatistics::default();

isolate.get_heap_statistics(&mut heap_stats);

let usage = IsolateMemoryStats {
used_heap_size: heap_stats.used_heap_size(),
external_memory: heap_stats.external_memory(),
};

if let Some(usage_tx) = data.isolate_memory_usage_tx.take() {
if usage_tx.send(usage).is_err() {
error!("failed to send isolate memory usage - receiver may have been dropped");
}
}

if data.should_terminate {
isolate.terminate_execution();
}
}

extern "C" fn v8_handle_wall_clock_beforeunload(
isolate: &mut v8::Isolate,
_data: *mut std::ffi::c_void,
) {
if let Err(err) = MaybeDenoRuntime::<()>::Isolate(isolate)
.dispatch_beforeunload_event(WillTerminateReason::WallClock)
{
warn!(
error!(
"found an error while dispatching the beforeunload event: {}",
err
);
}
}

#[repr(C)]
pub struct V8HandleEarlyRetireData {
token: CancellationToken,
}

extern "C" fn v8_handle_early_drop_beforeunload(
isolate: &mut v8::Isolate,
data: *mut std::ffi::c_void,
) {
let data = unsafe { Box::from_raw(data as *mut V8HandleEarlyRetireData) };

if let Err(err) = MaybeDenoRuntime::<()>::Isolate(isolate)
.dispatch_beforeunload_event(WillTerminateReason::EarlyDrop)
{
error!(
"found an error while dispatching the beforeunload event: {}",
err
);
} else {
data.token.cancel();
}
}

#[instrument(level = "debug", skip_all)]
extern "C" fn v8_handle_early_retire(isolate: &mut v8::Isolate, _data: *mut std::ffi::c_void) {
isolate.low_memory_notification();
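One pattern worth calling out, used by both `v8_handle_termination` above and the new early-drop path: data for a V8 interrupt callback is handed over as a raw pointer via `Box::into_raw`, the callback reclaims ownership with `Box::from_raw`, and if `request_interrupt` returns `false` the caller reclaims and drops the box itself so the allocation never leaks (see `dispatch_early_drop_beforeunload_fn` in the next file). The standalone sketch below shows only that handoff; there is no V8 here, `CallbackData`, `handle_interrupt`, and `request_interrupt_like` are invented names, and the real callback additionally receives `&mut v8::Isolate`.

```rust
use std::ffi::c_void;

struct CallbackData {
    message: String,
}

// Mirrors the interrupt-callback shape: ownership of `data` moves back into a Box.
extern "C" fn handle_interrupt(data: *mut c_void) {
    // SAFETY: the pointer was produced by Box::into_raw below and is consumed
    // exactly once on this path.
    let data = unsafe { Box::from_raw(data as *mut CallbackData) };
    println!("callback ran with: {}", data.message);
    // `data` is dropped here, freeing the allocation.
}

// Stand-in for IsolateHandle::request_interrupt: returns false when the
// "isolate" is already gone and the callback will never run.
fn request_interrupt_like(cb: extern "C" fn(*mut c_void), data: *mut c_void, alive: bool) -> bool {
    if alive {
        cb(data);
    }
    alive
}

fn main() {
    for alive in [true, false] {
        let data_ptr = Box::into_raw(Box::new(CallbackData {
            message: format!("isolate alive = {alive}"),
        }));

        if !request_interrupt_like(handle_interrupt, data_ptr as *mut c_void, alive) {
            // The callback never fired, so reclaim the Box here to avoid a leak.
            drop(unsafe { Box::from_raw(data_ptr) });
        }
    }
}
```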
125 changes: 90 additions & 35 deletions crates/base/src/rt_worker/supervisor/strategy_per_worker.rs
@@ -4,12 +4,14 @@ use std::{future::pending, sync::atomic::Ordering, time::Duration};
use std::thread::ThreadId;

use event_worker::events::ShutdownReason;
use log::error;
use log::{error, info};
use sb_workers::context::{Timing, TimingStatus, UserWorkerMsgs};
use tokio_util::sync::CancellationToken;

use crate::rt_worker::supervisor::{
create_wall_clock_beforeunload_alert, v8_handle_early_retire,
v8_handle_wall_clock_beforeunload, wait_cpu_alarm, CPUUsage, Tokens,
create_wall_clock_beforeunload_alert, v8_handle_early_drop_beforeunload,
v8_handle_early_retire, v8_handle_wall_clock_beforeunload, wait_cpu_alarm, CPUUsage, Tokens,
V8HandleEarlyRetireData,
};

use super::{v8_handle_termination, Arguments, CPUUsageMetrics, V8HandleTerminationData};
@@ -57,12 +59,13 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) {
let mut is_worker_entered = false;
let mut is_wall_clock_beforeunload_armed = false;
let mut is_cpu_time_soft_limit_reached = false;
let mut is_termination_requested = false;
let mut is_waiting_for_termination = false;
let mut have_all_reqs_been_acknowledged = false;

let mut cpu_usage_metrics_rx = cpu_usage_metrics_rx.unwrap();
let mut cpu_usage_ms = 0i64;

let mut complete_reason = None::<ShutdownReason>;
let mut wall_clock_alerts = 0;
let mut req_ack_count = 0usize;

Expand Down Expand Up @@ -97,6 +100,25 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) {
guard.raise();
};

let early_drop_token = CancellationToken::new();
let early_drop_fut = early_drop_token.cancelled();

let mut dispatch_early_drop_beforeunload_fn = Some({
let token = early_drop_token.clone();
|| {
let data_ptr_mut = Box::into_raw(Box::new(V8HandleEarlyRetireData { token }));

if !thread_safe_handle.request_interrupt(
v8_handle_early_drop_beforeunload,
data_ptr_mut as *mut std::ffi::c_void,
) {
drop(unsafe { Box::from_raw(data_ptr_mut) });
} else {
waker.wake();
}
}
});

let terminate_fn = {
let thread_safe_handle = thread_safe_handle.clone();
move || {
@@ -115,23 +137,27 @@

tokio::pin!(wall_clock_duration_alert);
tokio::pin!(wall_clock_beforeunload_alert);
tokio::pin!(early_drop_fut);

let result = 'scope: loop {
loop {
tokio::select! {
_ = supervise.cancelled() => {
break 'scope (ShutdownReason::TerminationRequested, cpu_usage_ms);
complete_reason = Some(ShutdownReason::TerminationRequested);
}

_ = async {
match termination.as_ref() {
Some(token) => token.inbound.cancelled().await,
None => pending().await,
}
}, if !is_termination_requested => {
is_termination_requested = true;
}, if !is_waiting_for_termination => {
is_waiting_for_termination = true;
if promise_metrics.have_all_promises_been_resolved() {
terminate_fn();
break 'scope (ShutdownReason::TerminationRequested, cpu_usage_ms);
if let Some(func) = dispatch_early_drop_beforeunload_fn.take() {
func();
}
// terminate_fn();
// complete_reason = Some(ShutdownReason::TerminationRequested);
}
}

@@ -164,9 +190,9 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) {

if !cpu_timer_param.is_disabled() {
if cpu_usage_ms >= hard_limit_ms as i64 {
terminate_fn();
// terminate_fn();
error!("CPU time hard limit reached: isolate: {:?}", key);
break 'scope (ShutdownReason::CPUTime, cpu_usage_ms);
complete_reason = Some(ShutdownReason::CPUTime);
} else if cpu_usage_ms >= soft_limit_ms as i64 && !is_cpu_time_soft_limit_reached {
early_retire_fn();
error!("CPU time soft limit reached: isolate: {:?}", key);
@@ -177,17 +203,23 @@
if have_all_reqs_been_acknowledged
&& promise_metrics.have_all_promises_been_resolved()
{
terminate_fn();
error!("early termination due to the last request being completed: isolate: {:?}", key);
break 'scope (ShutdownReason::EarlyDrop, cpu_usage_ms);
// error!("early termination due to the last request being completed: isolate: {:?}", key);
if let Some(func) = dispatch_early_drop_beforeunload_fn.take() {
func();
}
// terminate_fn();
// complete_reason = Some(ShutdownReason::EarlyDrop);
}

} else if is_cpu_time_soft_limit_reached
&& have_all_reqs_been_acknowledged
&& promise_metrics.have_all_promises_been_resolved()
{
terminate_fn();
break 'scope (ShutdownReason::EarlyDrop, cpu_usage_ms);
if let Some(func) = dispatch_early_drop_beforeunload_fn.take() {
func();
}
// terminate_fn();
// complete_reason = Some(ShutdownReason::EarlyDrop);
}
}
}
@@ -206,14 +238,17 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) {
if have_all_reqs_been_acknowledged
&& promise_metrics.have_all_promises_been_resolved()
{
terminate_fn();
error!("early termination due to the last request being completed: isolate: {:?}", key);
break 'scope (ShutdownReason::EarlyDrop, cpu_usage_ms);
if let Some(func) = dispatch_early_drop_beforeunload_fn.take() {
func();
}
// terminate_fn();
// error!("early termination due to the last request being completed: isolate: {:?}", key);
// complete_reason = Some(ShutdownReason::EarlyDrop);
}
} else {
terminate_fn();
// terminate_fn();
error!("CPU time hard limit reached: isolate: {:?}", key);
break 'scope (ShutdownReason::CPUTime, cpu_usage_ms);
complete_reason = Some(ShutdownReason::CPUTime);
}
}
}
@@ -237,9 +272,12 @@
continue;
}

terminate_fn();
error!("early termination due to the last request being completed: isolate: {:?}", key);
break 'scope (ShutdownReason::EarlyDrop, cpu_usage_ms);
if let Some(func) = dispatch_early_drop_beforeunload_fn.take() {
func();
}
// terminate_fn();
// error!("early termination due to the last request being completed: isolate: {:?}", key);
// complete_reason = ShutdownReason::EarlyDrop;
}

_ = wall_clock_duration_alert.tick(), if !is_wall_clock_limit_disabled => {
@@ -253,9 +291,9 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) {
} else {
let is_in_flight_req_exists = req_ack_count != demand.load(Ordering::Acquire);

terminate_fn();
// terminate_fn();
error!("wall clock duration reached: isolate: {:?} (in_flight_req_exists = {})", key, is_in_flight_req_exists);
break 'scope (ShutdownReason::WallClockTime, cpu_usage_ms);
complete_reason = Some(ShutdownReason::WallClockTime);
}
}

@@ -273,18 +311,35 @@
}

Some(_) = memory_limit_rx.recv() => {
terminate_fn();
// terminate_fn();
error!("memory limit reached for the worker: isolate: {:?}", key);
break 'scope (ShutdownReason::Memory, cpu_usage_ms);
complete_reason = Some(ShutdownReason::Memory);
}
}
};

match result {
(ShutdownReason::EarlyDrop, cpu_usage_ms) if is_termination_requested => {
(ShutdownReason::TerminationRequested, cpu_usage_ms)
_ = &mut early_drop_fut => {
info!("early termination has been triggered: isolate: {:?}", key);
complete_reason = Some(ShutdownReason::EarlyDrop);
}
}

result => result,
match complete_reason.take() {
Some(ShutdownReason::EarlyDrop) => {
terminate_fn();
return (
if is_waiting_for_termination {
ShutdownReason::TerminationRequested
} else {
ShutdownReason::EarlyDrop
},
cpu_usage_ms,
);
}

Some(result) => {
terminate_fn();
return (result, cpu_usage_ms);
}
None => continue,
}
}
}
1 change: 0 additions & 1 deletion crates/base/test_cases/main/index.ts
@@ -37,7 +37,6 @@ Deno.serve((req: Request) => {
const workerTimeoutMs = parseIntFromHeadersOrDefault(req, "x-worker-timeout-ms", 10 * 60 * 1000);
const cpuTimeSoftLimitMs = parseIntFromHeadersOrDefault(req, "x-cpu-time-soft-limit-ms", 10 * 60 * 1000);
const cpuTimeHardLimitMs = parseIntFromHeadersOrDefault(req, "x-cpu-time-hard-limit-ms", 10 * 60 * 1000);
console.log(cpuTimeSoftLimitMs);
const noModuleCache = false;
const importMapPath = null;
const envVarsObj = Deno.env.toObject();