diff --git a/packages/bun-types/overrides.d.ts b/packages/bun-types/overrides.d.ts index d906534cc48..ee47728f4fb 100644 --- a/packages/bun-types/overrides.d.ts +++ b/packages/bun-types/overrides.d.ts @@ -94,6 +94,28 @@ declare global { _exiting: boolean; noDeprecation?: boolean | undefined; + /** + * Emitted when the operating system signals that available memory is + * running low. Use this to release caches or reap idle resources instead + * of polling. + * + * On macOS `level` distinguishes `"warning"` from `"critical"` based on + * the kernel's memorystatus thresholds. On Linux (PSI) and Windows the + * event is always emitted with `"critical"`. On Linux, the underlying + * PSI trigger requires `CAP_SYS_RESOURCE` on kernels before 6.6; when + * unavailable the event is simply never emitted. + * + * This listener does not keep the event loop alive. + */ + on(event: "memoryPressure", listener: (level: "warning" | "critical") => void): this; + once(event: "memoryPressure", listener: (level: "warning" | "critical") => void): this; + off(event: "memoryPressure", listener: (level: "warning" | "critical") => void): this; + addListener(event: "memoryPressure", listener: (level: "warning" | "critical") => void): this; + removeListener(event: "memoryPressure", listener: (level: "warning" | "critical") => void): this; + prependListener(event: "memoryPressure", listener: (level: "warning" | "critical") => void): this; + prependOnceListener(event: "memoryPressure", listener: (level: "warning" | "critical") => void): this; + emit(event: "memoryPressure", level: "warning" | "critical"): boolean; + binding(m: "constants"): { os: typeof import("node:os").constants; fs: typeof import("node:fs").constants; diff --git a/src/event_loop/ConcurrentTask.rs b/src/event_loop/ConcurrentTask.rs index 46ee11ebc2f..b4e1c3e635d 100644 --- a/src/event_loop/ConcurrentTask.rs +++ b/src/event_loop/ConcurrentTask.rs @@ -117,6 +117,7 @@ pub mod task_tag { Open, PollPendingModulesTask, PosixSignalTask, + MemoryPressureTask, ProcessWaiterThreadTask, Read, Readdir, diff --git a/src/io/posix_event_loop.rs b/src/io/posix_event_loop.rs index 942da6b2d8d..8f36b301a9a 100644 --- a/src/io/posix_event_loop.rs +++ b/src/io/posix_event_loop.rs @@ -221,6 +221,7 @@ pub enum PollTag { TerminalPoll, ParentDeathWatchdog, LifecycleScriptSubprocessOutputReader, + MemoryPressure, } /// Compatibility module — call sites in `bun_runtime`/`bun_install` still spell @@ -244,6 +245,7 @@ pub mod poll_tag { pub const PARENT_DEATH_WATCHDOG: PollTag = PollTag::ParentDeathWatchdog; pub const LIFECYCLE_SCRIPT_SUBPROCESS_OUTPUT_READER: PollTag = PollTag::LifecycleScriptSubprocessOutputReader; + pub const MEMORY_PRESSURE: PollTag = PollTag::MemoryPressure; } #[derive(Copy, Clone)] @@ -330,6 +332,7 @@ impl FilePoll { flags.remove(Flags::Writable); flags.remove(Flags::Process); flags.remove(Flags::Machport); + flags.remove(Flags::MemoryPressure); flags.remove(Flags::Eof); flags.remove(Flags::Hup); @@ -361,6 +364,14 @@ impl FilePoll { #[cfg(all(target_os = "macos", debug_assertions))] debug_assert!(self.generation_number == kqueue_event.ext[0] as usize); + // EVFILT_MEMORYSTATUS reports the pressure level in `fflags`, not `data`; + // thread it through `size_or_offset` so the dispatch arm can read it. + #[cfg(target_os = "macos")] + if kqueue_event.filter == bun_sys::darwin::EVFILT::MEMORYSTATUS { + self.on_update(kqueue_event.fflags as i64); + return; + } + self.on_update(kqueue_event.data as i64); } @@ -442,6 +453,7 @@ impl FilePoll { || self.flags.contains(Flags::PollReadable) || self.flags.contains(Flags::PollProcess) || self.flags.contains(Flags::PollMachport) + || self.flags.contains(Flags::PollMemoryPressure) } pub fn on_update(&mut self, size_or_offset: i64) { @@ -699,6 +711,8 @@ impl FilePoll { let mut flags: u32 = match flag { Flags::Process | Flags::Readable => EPOLL::IN | EPOLL::HUP | one_shot_flag, Flags::Writable => EPOLL::OUT | EPOLL::HUP | EPOLL::ERR | one_shot_flag, + // PSI trigger fds signal via POLLPRI only. + Flags::MemoryPressure => EPOLL::PRI | EPOLL::ERR | one_shot_flag, _ => unreachable!(), }; // epoll keys on fd alone; if the other direction is already @@ -784,6 +798,17 @@ impl FilePoll { flags: EV::ADD | one_shot_flag, ext: [self.generation_number as u64, 0], }, + // System-wide memory pressure. ident is always 0; EV_CLEAR so each + // transition delivers once (matches libdispatch's registration). + Flags::MemoryPressure => kevent64_s { + ident: 0, + filter: EVFILT::MEMORYSTATUS, + data: 0, + fflags: NOTE::MEMORYSTATUS_PRESSURE_WARN | NOTE::MEMORYSTATUS_PRESSURE_CRITICAL, + udata: Pollable::init(self).ptr() as u64, + flags: EV::ADD | EV::CLEAR | one_shot_flag, + ext: [self.generation_number as u64, 0], + }, _ => unreachable!(), }; @@ -863,7 +888,7 @@ impl FilePoll { NOTE::EXIT, udata, ), - Flags::Machport => { + Flags::Machport | Flags::MemoryPressure => { return sys::Result::Err(sys::Error::from_code( sys::E::EOPNOTSUPP, sys::Tag::kevent, @@ -914,6 +939,7 @@ impl FilePoll { } Flags::Writable => Flags::PollWritable, Flags::Machport => Flags::PollMachport, + Flags::MemoryPressure => Flags::PollMemoryPressure, _ => unreachable!(), }); self.flags.remove(Flags::NeedsRearm); @@ -972,7 +998,8 @@ impl FilePoll { if !(self.flags.contains(Flags::PollReadable) || self.flags.contains(Flags::PollWritable) || self.flags.contains(Flags::PollProcess) - || self.flags.contains(Flags::PollMachport)) + || self.flags.contains(Flags::PollMachport) + || self.flags.contains(Flags::PollMemoryPressure)) { // no-op return sys::Result::Ok(()); @@ -995,6 +1022,9 @@ impl FilePoll { if self.flags.contains(Flags::PollMachport) { break 'brk Flags::Machport; } + if self.flags.contains(Flags::PollMemoryPressure) { + break 'brk Flags::MemoryPressure; + } return sys::Result::Ok(()); }; @@ -1008,6 +1038,7 @@ impl FilePoll { self.flags.remove(Flags::PollReadable); self.flags.remove(Flags::PollWritable); self.flags.remove(Flags::PollMachport); + self.flags.remove(Flags::PollMemoryPressure); return sys::Result::Ok(()); } @@ -1078,6 +1109,15 @@ impl FilePoll { flags: EV::DELETE, ext: [0, 0], }, + Flags::MemoryPressure => kevent64_s { + ident: 0, + filter: EVFILT::MEMORYSTATUS, + data: 0, + fflags: 0, + udata: Pollable::init(self).ptr() as u64, + flags: EV::DELETE, + ext: [0, 0], + }, _ => unreachable!(), }; @@ -1154,7 +1194,7 @@ impl FilePoll { Flags::Readable => make_kevent(ident, EVFILT::READ, EV::DELETE, 0, udata), Flags::Writable => make_kevent(ident, EVFILT::WRITE, EV::DELETE, 0, udata), Flags::Process => make_kevent(ident, EVFILT::PROC, EV::DELETE, NOTE::EXIT, udata), - Flags::Machport => { + Flags::Machport | Flags::MemoryPressure => { return sys::Result::Err(sys::Error::from_code( sys::E::EOPNOTSUPP, sys::Tag::kevent, @@ -1196,6 +1236,7 @@ impl FilePoll { self.flags.remove(Flags::PollWritable); self.flags.remove(Flags::PollProcess); self.flags.remove(Flags::PollMachport); + self.flags.remove(Flags::PollMemoryPressure); sys::Result::Ok(()) } @@ -1230,6 +1271,8 @@ pub enum Flags { PollProcess, /// Poll for machport events PollMachport, + /// Poll for memory-pressure events (Darwin `EVFILT_MEMORYSTATUS`, Linux PSI `EPOLLPRI`) + PollMemoryPressure, // What did the event loop tell us? Readable, @@ -1238,6 +1281,7 @@ pub enum Flags { Eof, Hup, Machport, + MemoryPressure, // What is the type of file descriptor? Fifo, @@ -1273,6 +1317,7 @@ impl Flags { Flags::Writable => Flags::PollWritable, Flags::Process => Flags::PollProcess, Flags::Machport => Flags::PollMachport, + Flags::MemoryPressure => Flags::PollMemoryPressure, other => other, } } @@ -1301,6 +1346,10 @@ impl Flags { if kqueue_event.filter == EVFILT::MACHPORT { flags.insert(Flags::Machport); } + #[cfg(target_os = "macos")] + if kqueue_event.filter == EVFILT::MEMORYSTATUS { + flags.insert(Flags::MemoryPressure); + } } flags } @@ -1315,6 +1364,9 @@ impl Flags { if epoll.events & EPOLL::OUT != 0 { flags.insert(Flags::Writable); } + if epoll.events & EPOLL::PRI != 0 { + flags.insert(Flags::MemoryPressure); + } if epoll.events & EPOLL::ERR != 0 { flags.insert(Flags::Eof); } diff --git a/src/js/internal-for-testing.ts b/src/js/internal-for-testing.ts index 2f9a0fa67b6..f75ee408b8b 100644 --- a/src/js/internal-for-testing.ts +++ b/src/js/internal-for-testing.ts @@ -290,6 +290,18 @@ export const lowercaseHeaderNameSIMD: (name: string) => string = $newCppFunction 1, ); +export const emitMemoryPressure: (level: "warning" | "critical") => void = $newCppFunction( + "InternalForTesting.cpp", + "jsFunction_emitMemoryPressure", + 1, +); + +export const isMemoryPressureWatcherInstalled: () => boolean = $newCppFunction( + "InternalForTesting.cpp", + "jsFunction_isMemoryPressureWatcherInstalled", + 0, +); + export const getEventLoopStats: () => { activeTasks: number; concurrentRef: number; numPolls: number } = $newZigFunction("event_loop.zig", "getActiveTasks", 0); diff --git a/src/jsc/bindings/BunProcess.cpp b/src/jsc/bindings/BunProcess.cpp index 90188ca6017..fec52f40a2e 100644 --- a/src/jsc/bindings/BunProcess.cpp +++ b/src/jsc/bindings/BunProcess.cpp @@ -1382,9 +1382,24 @@ __attribute__((noinline)) static void forwardSignal(int signalNumber) Bun__onPosixSignal(signalNumber); } +extern "C" void Bun__MemoryPressure__install(JSC::JSGlobalObject* global); +extern "C" void Bun__MemoryPressure__uninstall(JSC::JSGlobalObject* global); + static void onDidChangeListeners(EventEmitter& eventEmitter, const Identifier& eventName, bool isAdded) { if (Bun__isMainThreadVM()) { + if (eventName == "memoryPressure") { + auto* global = eventEmitter.scriptExecutionContext()->jsGlobalObject(); + if (isAdded) { + if (eventEmitter.listenerCount(eventName) == 1) { + Bun__MemoryPressure__install(global); + } + } else if (eventEmitter.listenerCount(eventName) == 0) { + Bun__MemoryPressure__uninstall(global); + } + return; + } + // IPC handlers if (eventName == "message" || eventName == "disconnect") { auto* global = uncheckedDowncast(eventEmitter.scriptExecutionContext()->jsGlobalObject()); @@ -4314,6 +4329,19 @@ extern "C" void Process__emitDisconnectEvent(Zig::GlobalObject* global) } } +extern "C" void Process__emitMemoryPressureEvent(Zig::GlobalObject* global, int level) +{ + auto* process = global->processObject(); + auto& vm = JSC::getVM(global); + auto ident = Identifier::fromString(vm, "memoryPressure"_s); + if (process->wrapped().hasEventListeners(ident)) { + JSC::MarkedArgumentBuffer args; + // Level values match NOTE_MEMORYSTATUS_PRESSURE_WARN (2) / _CRITICAL (4). + args.append(jsString(vm, level == 2 ? String("warning"_s) : String("critical"_s))); + process->wrapped().emit(ident, args); + } +} + extern "C" void Process__emitErrorEvent(Zig::GlobalObject* global, EncodedJSValue value) { auto* process = global->processObject(); diff --git a/src/jsc/bindings/InternalForTesting.cpp b/src/jsc/bindings/InternalForTesting.cpp index 141195bc252..69121a0e5b2 100644 --- a/src/jsc/bindings/InternalForTesting.cpp +++ b/src/jsc/bindings/InternalForTesting.cpp @@ -101,4 +101,27 @@ JSC_DEFINE_HOST_FUNCTION(jsFunction_BunString_toThreadSafeRefCountDelta, (JSC::J return JSValue::encode(jsNumber(static_cast(after) - static_cast(before))); } +extern "C" void Bun__MemoryPressure__emit(JSC::JSGlobalObject* global, int level); +extern "C" bool Bun__MemoryPressure__isInstalled(JSC::JSGlobalObject* global); + +// Synthetically fire process.on("memoryPressure") so tests can exercise the +// emit path without depending on real OS memory pressure. +JSC_DEFINE_HOST_FUNCTION(jsFunction_emitMemoryPressure, (JSC::JSGlobalObject * globalObject, JSC::CallFrame* callFrame)) +{ + auto& vm = globalObject->vm(); + auto scope = DECLARE_THROW_SCOPE(vm); + auto str = callFrame->argument(0).toWTFString(globalObject); + RETURN_IF_EXCEPTION(scope, {}); + int level = str == "warning"_s ? 2 : 4; + Bun__MemoryPressure__emit(defaultGlobalObject(globalObject), level); + return encodedJSUndefined(); +} + +// Whether the per-VM memory-pressure watcher is currently installed, so tests +// can observe that process.on/off actually arm/disarm the OS backend. +JSC_DEFINE_HOST_FUNCTION(jsFunction_isMemoryPressureWatcherInstalled, (JSC::JSGlobalObject * globalObject, JSC::CallFrame* callFrame)) +{ + return JSValue::encode(jsBoolean(Bun__MemoryPressure__isInstalled(defaultGlobalObject(globalObject)))); +} + } diff --git a/src/jsc/bindings/InternalForTesting.h b/src/jsc/bindings/InternalForTesting.h index 7d7ea01e417..e16a2a98b6c 100644 --- a/src/jsc/bindings/InternalForTesting.h +++ b/src/jsc/bindings/InternalForTesting.h @@ -11,5 +11,7 @@ JSC_DECLARE_HOST_FUNCTION(jsFunction_lsanDoLeakCheck); JSC_DECLARE_HOST_FUNCTION(jsFunction_isASANEnabled); JSC_DECLARE_HOST_FUNCTION(jsFunction_BunString_toThreadSafeRefCountDelta); JSC_DECLARE_HOST_FUNCTION(jsFunction_lowercaseHeaderNameSIMD); +JSC_DECLARE_HOST_FUNCTION(jsFunction_emitMemoryPressure); +JSC_DECLARE_HOST_FUNCTION(jsFunction_isMemoryPressureWatcherInstalled); } diff --git a/src/jsc/rare_data.rs b/src/jsc/rare_data.rs index e61f53ad37e..ded17e225e6 100644 --- a/src/jsc/rare_data.rs +++ b/src/jsc/rare_data.rs @@ -274,6 +274,10 @@ pub struct RareData { /// lazy-init in `bun_runtime::node::node_fs_stat_watcher`. pub node_fs_stat_watcher_scheduler: Option>, + /// `bun_runtime::node::memory_pressure::MemoryPressureWatcher` — erased + /// `Box`; lazy-init on the first `process.on("memoryPressure", ...)` listener. + pub memory_pressure_watcher: Option>, + /// Watch-mode restart needs to RST every listen socket so the new process /// can rebind without `EADDRINUSE`. Written on the JS thread; drained on /// the watcher thread — hence the mutex (PORTING.md §Concurrency: lock @@ -336,6 +340,7 @@ impl Default for RareData { default_client_ssl_ctx: None, mime_types: None, node_fs_stat_watcher_scheduler: None, + memory_pressure_watcher: None, listening_sockets_for_watch_mode: Mutex::new(Vec::new()), temp_pipe_read_buffer: None, s3_default_client: Strong::empty(), @@ -640,6 +645,12 @@ impl RareData { &mut self.node_fs_stat_watcher_scheduler } + /// Raw slot — lazy-init body lives in `bun_runtime::node::memory_pressure`. + #[inline] + pub fn memory_pressure_watcher_slot(&mut self) -> &mut Option> { + &mut self.memory_pressure_watcher + } + /// Raw slot — lazy-init body lives in `bun_http_jsc::WebSocketDeflate`. #[inline] pub fn websocket_deflate_slot(&mut self) -> &mut Option { diff --git a/src/runtime/dispatch.rs b/src/runtime/dispatch.rs index b93fe2b60cf..9ddfc8c0454 100644 --- a/src/runtime/dispatch.rs +++ b/src/runtime/dispatch.rs @@ -424,6 +424,10 @@ pub fn run_task( global, ); } + task_tag::MemoryPressureTask => { + // `ptr` is the packed level (NOTE_MEMORYSTATUS_PRESSURE_* bits), not a pointer. + crate::node::memory_pressure::emit(global, task.ptr as usize as i32); + } task_tag::NativePromiseContextDeferredDerefTask => { // `ptr` packs an int, not a pointer. NativePromiseContextDeferredDerefTask::run_from_js_thread(task.ptr as usize); @@ -579,7 +583,7 @@ fn run_task_cold(task: Task) { /// Compile-time guard that the arm count above tracks /// `bun_event_loop::task_tag::COUNT`. Bump when adding a variant. const _: () = assert!( - task_tag::COUNT == 96, + task_tag::COUNT == 97, "dispatch::run_task arm count out of sync with bun_event_loop::task_tag", ); @@ -679,6 +683,10 @@ pub unsafe fn __bun_run_file_poll(poll: *mut FilePoll, size_or_offset: i64) { // SAFETY: `proc` carries the +1 ref taken at queue time; this drops it. unsafe { Process::on_wait_pid_from_event_loop_task(proc) }; } + poll_tag::MEMORY_PRESSURE => { + // SAFETY: `poll` is live per `__bun_run_file_poll`'s contract. + crate::node::memory_pressure::on_poll(unsafe { &mut *poll }, size_or_offset); + } poll_tag::PARENT_DEATH_WATCHDOG => { let wd = owner_as!(bun_io::parent_death_watchdog::ParentDeathWatchdog); // Mac-only — debug-assert elsewhere (Linux uses prctl(PR_SET_PDEATHSIG)). diff --git a/src/runtime/node.rs b/src/runtime/node.rs index 2d79aefdd0b..3f2c9278df8 100644 --- a/src/runtime/node.rs +++ b/src/runtime/node.rs @@ -101,6 +101,8 @@ pub mod win_watcher; // Force-references `Bun__UVSignalHandle__init` / `Bun__UVSignalHandle__close` // for C++ (`src/jsc/bindings/BunProcess.cpp`). Must be `mod`-declared or the // `#[no_mangle]` exports are never compiled into the binary. +#[path = "node/memory_pressure.rs"] +pub mod memory_pressure; #[path = "node/node_fs_binding.rs"] pub mod node_fs_binding; #[path = "node/node_fs_stat_watcher.rs"] diff --git a/src/runtime/node/memory_pressure.rs b/src/runtime/node/memory_pressure.rs new file mode 100644 index 00000000000..eaf6337c988 --- /dev/null +++ b/src/runtime/node/memory_pressure.rs @@ -0,0 +1,418 @@ +//! `process.on("memoryPressure", level => ...)` — OS-level low-memory +//! notifications without polling. +//! +//! Backends: +//! - macOS: `EVFILT_MEMORYSTATUS` on the main event loop's kqueue (the same +//! filter libdispatch's `DISPATCH_SOURCE_TYPE_MEMORYPRESSURE` uses). The +//! kernel delivers `NOTE_MEMORYSTATUS_PRESSURE_WARN` / `_CRITICAL` in +//! `fflags` when `kern.memorystatus_level` crosses the warn/critical +//! thresholds. +//! - Linux: PSI trigger on `/proc/pressure/memory` (or the cgroup v2 +//! `memory.pressure` file for the container's own cgroup). PSI triggers +//! signal via `POLLPRI`. Requires `CAP_SYS_RESOURCE` before kernel 6.6, +//! and PSI enabled (`CONFIG_PSI=y`). If neither path can be opened for +//! writing, the watcher silently does nothing. +//! - Windows: a dedicated thread blocks on +//! `CreateMemoryResourceNotification(LowMemoryResourceNotification)` and +//! posts back to the JS event loop when it signals, with a 30 s holdoff +//! before re-waiting (the handle is level-triggered). +//! +//! All three backends post a `MemoryPressureTask` to the event loop rather +//! than calling into JS from the detector, so a listener removing itself +//! during `emit()` never races with the poll/thread that produced the event. +//! +//! Armed lazily on the first listener and disarmed on the last removal via +//! `onDidChangeListeners` in `BunProcess.cpp`, matching how signal handlers +//! are wired. The watcher does not keep the event loop alive. + +use bun_event_loop::ConcurrentTask::{Task, task_tag}; +use bun_jsc::JSGlobalObject; +#[cfg(not(windows))] +use bun_jsc::virtual_machine::VirtualMachine; +#[cfg(not(windows))] +use core::ptr::NonNull; + +/// Pressure level passed to JS. Values are the `NOTE_MEMORYSTATUS_PRESSURE_*` +/// bits on macOS so the kqueue dispatch can pass `fflags` through unchanged. +pub mod level { + pub const WARNING: i32 = 0x00000002; + pub const CRITICAL: i32 = 0x00000004; +} + +unsafe extern "C" { + fn Process__emitMemoryPressureEvent(global: *mut JSGlobalObject, level: i32); +} + +/// `run_task` target for `task_tag::MemoryPressureTask`. `lvl` is the packed +/// task payload (macOS kevent `fflags`, or `level::CRITICAL` elsewhere). +pub fn emit(global: &JSGlobalObject, lvl: i32) { + // macOS can deliver WARN|CRITICAL together under EV_CLEAR; pick the more severe. + let lvl = if lvl & level::CRITICAL != 0 || lvl & level::WARNING == 0 { + level::CRITICAL + } else { + level::WARNING + }; + // SAFETY: FFI; `global` is the live per-thread global. + unsafe { Process__emitMemoryPressureEvent(core::ptr::from_ref(global).cast_mut(), lvl) }; +} + +pub(crate) fn pressure_task(lvl: i32) -> Task { + Task::new(task_tag::MemoryPressureTask, lvl as usize as *mut ()) +} + +#[cfg(not(windows))] +fn slot(vm: &mut VirtualMachine) -> &mut Option> { + vm.rare_data().memory_pressure_watcher_slot() +} + +// ──────────────────────────────────────────────────────────────────────────── +// POSIX backend (macOS EVFILT_MEMORYSTATUS, Linux PSI) via FilePoll +// ──────────────────────────────────────────────────────────────────────────── + +#[cfg(not(windows))] +mod posix { + use core::ptr::NonNull; + + use bun_io::posix_event_loop::FilePoll; + #[cfg(any(target_os = "linux", target_os = "android", target_os = "macos"))] + use bun_io::posix_event_loop::{Flags, Owner, poll_tag}; + use bun_jsc::JSGlobalObject; + use bun_jsc::virtual_machine::VirtualMachine; + #[cfg(any(target_os = "linux", target_os = "android", target_os = "macos"))] + use bun_sys::Fd; + + use super::slot; + + /// Stored type-erased in `RareData.memory_pressure_watcher`. `poll` is + /// `None` when the OS backend is unavailable so `isInstalled` still + /// reflects listener presence. + struct MemoryPressureWatcher { + poll: Option>, + } + + fn take_watcher(vm: &mut VirtualMachine) -> Option> { + let raw = slot(vm).take()?; + // SAFETY: slot is populated only by `install` with a `Box`. + Some(unsafe { bun_core::heap::take(raw.as_ptr().cast::()) }) + } + + fn deinit_poll(poll: &mut FilePoll) { + #[cfg(any(target_os = "linux", target_os = "android"))] + let fd = poll.fd; + poll.deinit(); + #[cfg(any(target_os = "linux", target_os = "android"))] + { + let _ = bun_sys::close(fd); + } + } + + /// Build `/sys/fs/cgroup//memory.pressure` from the cgroup v2 + /// entry in `/proc/self/cgroup`. + #[cfg(any(target_os = "linux", target_os = "android"))] + fn own_cgroup_pressure_path(buf: &mut [u8]) -> Option<&bun_core::ZStr> { + use bun_sys::O; + let fd = bun_sys::open(bun_core::zstr!("/proc/self/cgroup"), O::RDONLY, 0).ok()?; + let mut read = [0u8; 256]; + let n = bun_sys::read(fd, &mut read).unwrap_or(0); + let _ = bun_sys::close(fd); + for line in read[..n].split(|&b| b == b'\n') { + let Some(rest) = line.strip_prefix(b"0::") else { + continue; + }; + let rest = core::str::from_utf8(rest.strip_prefix(b"/").unwrap_or(rest)).ok()?; + return bun_core::fmt::buf_print_z( + buf, + format_args!( + "/sys/fs/cgroup/{}{}memory.pressure", + rest, + if rest.is_empty() { "" } else { "/" } + ), + ) + .ok(); + } + None + } + + /// Open a PSI memory file and write a trigger. Tries the system-wide + /// `/proc/pressure/memory` first, then the current cgroup's file. + #[cfg(any(target_os = "linux", target_os = "android"))] + fn open_psi_fd() -> Option { + use bun_sys::O; + + /// 150 ms of "some"-stall in any 2 s window. 2 s is the minimum + /// window for unprivileged PSI triggers (kernel 6.6+). + const TRIGGER: &[u8] = b"some 150000 2000000"; + + let mut cgroup_buf = [0u8; 320]; + let paths = [ + Some(bun_core::zstr!("/proc/pressure/memory")), + own_cgroup_pressure_path(&mut cgroup_buf), + ]; + for path in paths.into_iter().flatten() { + let Ok(fd) = bun_sys::open(path, O::RDWR | O::NONBLOCK | O::CLOEXEC, 0) else { + continue; + }; + if bun_sys::write(fd, TRIGGER).is_ok() { + return Some(fd); + } + let _ = bun_sys::close(fd); + } + None + } + + fn register_os_watch(global: &JSGlobalObject) -> Option> { + #[cfg(any(target_os = "linux", target_os = "android"))] + let fd = open_psi_fd()?; + #[cfg(target_os = "macos")] + let fd = Fd::from_native(0); + #[cfg(not(any(target_os = "linux", target_os = "android", target_os = "macos")))] + { + let _ = global; + return None; + } + #[cfg(any(target_os = "linux", target_os = "android", target_os = "macos"))] + { + let ctx = global.bun_vm().loop_ctx(); + let poll = FilePoll::init( + ctx, + fd, + Default::default(), + Owner::new( + poll_tag::MEMORY_PRESSURE, + NonNull::<()>::dangling().as_ptr(), + ), + ); + // SAFETY: `poll` is the fresh hive slot; `platform_event_loop` is the live uws loop. + let result = unsafe { + (*poll).register(ctx.platform_event_loop(), Flags::MemoryPressure, false) + }; + if result.is_err() { + // SAFETY: fresh hive slot never handed out. + deinit_poll(unsafe { &mut *poll }); + return None; + } + NonNull::new(poll) + } + } + + pub(super) fn install(global: &JSGlobalObject) { + let vm = global.bun_vm().as_mut(); + if slot(vm).is_some() { + return; + } + let watcher = Box::new(MemoryPressureWatcher { + poll: register_os_watch(global), + }); + *slot(global.bun_vm().as_mut()) = NonNull::new(bun_core::heap::into_raw(watcher).cast()); + } + + pub(super) fn uninstall(global: &JSGlobalObject) { + let Some(watcher) = take_watcher(global.bun_vm().as_mut()) else { + return; + }; + if let Some(mut poll) = watcher.poll { + // SAFETY: `on_poll` enqueues a task instead of running user JS, + // so this is never reached from inside the dispatch and no other + // `&mut FilePoll` is live. + deinit_poll(unsafe { poll.as_mut() }); + } + } + + /// `__bun_run_file_poll` dispatch target. `fflags` is the kqueue `fflags` + /// on macOS (carrying the pressure level) and 0 on Linux. + pub fn on_poll(poll: &mut FilePoll, fflags: i64) { + let vm = VirtualMachine::get_mut(); + + // `EPOLLERR`/`EPOLLHUP` on a PSI fd means the trigger is dead (e.g. + // the cgroup was removed). kernfs reports that permanently, so tear + // the watch down instead of emitting to avoid a level-triggered spin. + #[cfg(any(target_os = "linux", target_os = "android"))] + if poll.flags.contains(Flags::Eof) || poll.flags.contains(Flags::Hup) { + drop(take_watcher(vm)); + deinit_poll(poll); + return; + } + + #[cfg(not(any(target_os = "linux", target_os = "android")))] + let _ = poll; + #[cfg(target_os = "macos")] + let lvl = fflags as i32; + #[cfg(not(target_os = "macos"))] + let lvl = { + let _ = fflags; + super::level::CRITICAL + }; + vm.enqueue_task(super::pressure_task(lvl)); + } +} + +// ──────────────────────────────────────────────────────────────────────────── +// Windows backend: CreateMemoryResourceNotification on a dedicated thread +// ──────────────────────────────────────────────────────────────────────────── + +#[cfg(windows)] +mod windows { + use core::ffi::c_void; + use core::ptr::{self, NonNull}; + + use bun_event_loop::ConcurrentTask::ConcurrentTask; + use bun_jsc::JSGlobalObject; + use bun_jsc::virtual_machine::VirtualMachine; + + type HANDLE = *mut c_void; + type BOOL = i32; + type DWORD = u32; + const WAIT_OBJECT_0: DWORD = 0; + const LOW_MEMORY_RESOURCE_NOTIFICATION: i32 = 0; + /// The notification handle stays signalled while memory is low; after + /// posting once we wait on `shutdown` alone for this long before + /// re-checking, so sustained pressure fires at most every 30 s. + const HOLDOFF_MS: DWORD = 30_000; + + unsafe extern "system" { + fn CreateMemoryResourceNotification(kind: i32) -> HANDLE; + fn CreateEventW( + attrs: *mut c_void, + manual_reset: BOOL, + initial: BOOL, + name: *const u16, + ) -> HANDLE; + fn SetEvent(h: HANDLE) -> BOOL; + fn WaitForSingleObject(h: HANDLE, ms: DWORD) -> DWORD; + fn WaitForMultipleObjects(n: DWORD, h: *const HANDLE, wait_all: BOOL, ms: DWORD) -> DWORD; + fn CloseHandle(h: HANDLE) -> BOOL; + } + + /// Owns a kernel HANDLE; closes on drop. + struct OwnedHandle(HANDLE); + impl Drop for OwnedHandle { + fn drop(&mut self) { + // SAFETY: constructed only from a handle returned by the kernel. + unsafe { CloseHandle(self.0) }; + } + } + + struct MemoryPressureWatcher { + /// Held only so it is closed on drop after the thread joins. + _notify: OwnedHandle, + shutdown: OwnedHandle, + thread: Option>, + } + + fn slot(vm: &mut VirtualMachine) -> &mut Option> { + vm.rare_data().memory_pressure_watcher_slot() + } + + fn thread_main(vm_addr: usize, notify: usize, shutdown: usize) { + bun_core::output::Source::configure_named_thread(bun_core::zstr!("MemoryPressure")); + let handles: [HANDLE; 2] = [shutdown as HANDLE, notify as HANDLE]; + loop { + // SAFETY: `uninstall` joins before closing the handles. + let rc = unsafe { WaitForMultipleObjects(2, handles.as_ptr(), 0, u32::MAX) }; + if rc != WAIT_OBJECT_0 + 1 { + break; + } + let task = ConcurrentTask::create(super::pressure_task(super::level::CRITICAL)); + // SAFETY: main-thread VM captured at install; process-lifetime. + unsafe { &*(vm_addr as *const VirtualMachine) } + .event_loop_shared() + .enqueue_task_concurrent(task); + // SAFETY: `shutdown` is valid for the thread's lifetime. + if unsafe { WaitForSingleObject(handles[0], HOLDOFF_MS) } == WAIT_OBJECT_0 { + break; + } + } + } + + pub(super) fn install(global: &JSGlobalObject) { + let vm = global.bun_vm().as_mut(); + if slot(vm).is_some() { + return; + } + + // SAFETY: FFI; returns NULL on failure. + let notify = unsafe { CreateMemoryResourceNotification(LOW_MEMORY_RESOURCE_NOTIFICATION) }; + if notify.is_null() { + return; + } + let notify = OwnedHandle(notify); + // SAFETY: FFI; manual-reset, initially non-signalled, unnamed. + let shutdown = unsafe { CreateEventW(ptr::null_mut(), 1, 0, ptr::null()) }; + if shutdown.is_null() { + return; + } + let shutdown = OwnedHandle(shutdown); + + let (vm_addr, n, s) = ( + core::ptr::from_ref(global.bun_vm()) as usize, + notify.0 as usize, + shutdown.0 as usize, + ); + let Ok(thread) = std::thread::Builder::new() + .name("MemoryPressure".into()) + .stack_size(64 * 1024) + .spawn(move || thread_main(vm_addr, n, s)) + else { + return; + }; + + let watcher = Box::new(MemoryPressureWatcher { + _notify: notify, + shutdown, + thread: Some(thread), + }); + *slot(global.bun_vm().as_mut()) = NonNull::new(bun_core::heap::into_raw(watcher).cast()); + } + + pub(super) fn uninstall(global: &JSGlobalObject) { + let Some(raw) = slot(global.bun_vm().as_mut()).take() else { + return; + }; + // SAFETY: slot is populated only by `install` with a `Box`. + let mut watcher = + unsafe { bun_core::heap::take(raw.as_ptr().cast::()) }; + // SAFETY: FFI; `shutdown` is a valid event owned by `watcher`. + unsafe { SetEvent(watcher.shutdown.0) }; + if let Some(thread) = watcher.thread.take() { + let _ = thread.join(); + } + } +} + +// ──────────────────────────────────────────────────────────────────────────── +// C-ABI exports for BunProcess.cpp / InternalForTesting.cpp +// ──────────────────────────────────────────────────────────────────────────── + +#[unsafe(no_mangle)] +pub extern "C" fn Bun__MemoryPressure__install(global: &JSGlobalObject) { + #[cfg(not(windows))] + posix::install(global); + #[cfg(windows)] + windows::install(global); +} + +#[unsafe(no_mangle)] +pub extern "C" fn Bun__MemoryPressure__uninstall(global: &JSGlobalObject) { + #[cfg(not(windows))] + posix::uninstall(global); + #[cfg(windows)] + windows::uninstall(global); +} + +#[unsafe(no_mangle)] +pub extern "C" fn Bun__MemoryPressure__emit(global: &JSGlobalObject, lvl: i32) { + emit(global, lvl); +} + +#[unsafe(no_mangle)] +pub extern "C" fn Bun__MemoryPressure__isInstalled(global: &JSGlobalObject) -> bool { + global + .bun_vm() + .as_mut() + .rare_data() + .memory_pressure_watcher_slot() + .is_some() +} + +#[cfg(not(windows))] +pub use posix::on_poll; diff --git a/src/sys/lib.rs b/src/sys/lib.rs index e78f24a5cd7..64d2f26b483 100644 --- a/src/sys/lib.rs +++ b/src/sys/lib.rs @@ -5384,6 +5384,7 @@ pub mod linux { pub mod EPOLL { pub const IN: u32 = libc::EPOLLIN as u32; pub const OUT: u32 = libc::EPOLLOUT as u32; + pub const PRI: u32 = libc::EPOLLPRI as u32; pub const ERR: u32 = libc::EPOLLERR as u32; pub const HUP: u32 = libc::EPOLLHUP as u32; pub const RDHUP: u32 = libc::EPOLLRDHUP as u32; @@ -5665,6 +5666,9 @@ pub mod darwin { pub const TIMER: i16 = libc::EVFILT_TIMER; pub const USER: i16 = libc::EVFILT_USER; pub const MACHPORT: i16 = libc::EVFILT_MACHPORT; + /// xnu-private filter used by libdispatch's `DISPATCH_SOURCE_TYPE_MEMORYPRESSURE`. + /// Not in `` (only ``), so hard-code the value. + pub const MEMORYSTATUS: i16 = -14; } /// kqueue event flags (Darwin). pub mod EV { @@ -5694,6 +5698,11 @@ pub mod darwin { pub const LINK: u32 = libc::NOTE_LINK; pub const RENAME: u32 = libc::NOTE_RENAME; pub const REVOKE: u32 = libc::NOTE_REVOKE; + /// `EVFILT_MEMORYSTATUS` fflags (xnu ``). Values are + /// ABI-stable; libdispatch depends on them for `DISPATCH_MEMORYPRESSURE_*`. + pub const MEMORYSTATUS_PRESSURE_NORMAL: u32 = 0x00000001; + pub const MEMORYSTATUS_PRESSURE_WARN: u32 = 0x00000002; + pub const MEMORYSTATUS_PRESSURE_CRITICAL: u32 = 0x00000004; } /// Re-export of the platform errno enum so `bun_threading::Futex` can /// match `c::E::INTR` etc. against `__ulock_*` return codes. diff --git a/test/js/node/process/process-memory-pressure.test.ts b/test/js/node/process/process-memory-pressure.test.ts new file mode 100644 index 00000000000..80d0223d7de --- /dev/null +++ b/test/js/node/process/process-memory-pressure.test.ts @@ -0,0 +1,110 @@ +import { describe, expect, test } from "bun:test"; +import { bunEnv, bunExe } from "harness"; + +// process.on("memoryPressure") is a Bun extension. These tests drive the +// emit path synthetically via bun:internal-for-testing since real OS memory +// pressure cannot be induced reliably (and PSI trigger creation requires +// CAP_SYS_RESOURCE on Linux kernels before 6.6, which CI containers lack). + +async function run(src: string) { + await using proc = Bun.spawn({ + cmd: [bunExe(), "-e", src], + env: bunEnv, + stderr: "pipe", + stdout: "pipe", + }); + const [stdout, stderr, exitCode] = await Promise.all([proc.stdout.text(), proc.stderr.text(), proc.exited]); + return { stdout, stderr, exitCode }; +} + +describe.concurrent("process.on('memoryPressure')", () => { + test("listener receives level argument", async () => { + const { stdout, stderr, exitCode } = await run(/* js */ ` + const { emitMemoryPressure } = require("bun:internal-for-testing"); + const seen = []; + process.on("memoryPressure", level => seen.push(level)); + emitMemoryPressure("warning"); + emitMemoryPressure("critical"); + process.stdout.write(JSON.stringify(seen)); + `); + expect({ stdout, stderr: stderr.trim() }).toEqual({ + stdout: JSON.stringify(["warning", "critical"]), + stderr: "", + }); + expect(exitCode).toBe(0); + }); + + test("arms on first listener and disarms on last removal", async () => { + const { stdout, stderr, exitCode } = await run(/* js */ ` + const { emitMemoryPressure, isMemoryPressureWatcherInstalled } = require("bun:internal-for-testing"); + const seen = []; + const installed = []; + const a = level => seen.push("a:" + level); + const b = level => seen.push("b:" + level); + installed.push(isMemoryPressureWatcherInstalled()); // false: no listeners yet + process.on("memoryPressure", a); + installed.push(isMemoryPressureWatcherInstalled()); // true: first listener armed it + process.on("memoryPressure", b); + installed.push(isMemoryPressureWatcherInstalled()); // true: still armed + emitMemoryPressure("warning"); + process.off("memoryPressure", a); + installed.push(isMemoryPressureWatcherInstalled()); // true: one listener left + emitMemoryPressure("critical"); + process.off("memoryPressure", b); + installed.push(isMemoryPressureWatcherInstalled()); // false: last listener removed + // No listeners registered; emit should be a no-op. + emitMemoryPressure("critical"); + // Re-arm and emit again to prove the watcher can be reinstalled. + process.on("memoryPressure", a); + installed.push(isMemoryPressureWatcherInstalled()); // true: re-armed + emitMemoryPressure("warning"); + process.off("memoryPressure", a); + installed.push(isMemoryPressureWatcherInstalled()); // false: disarmed again + process.stdout.write(JSON.stringify({ seen, installed })); + `); + expect({ stdout, stderr: stderr.trim() }).toEqual({ + stdout: JSON.stringify({ + seen: ["a:warning", "b:warning", "b:critical", "a:warning"], + installed: [false, true, true, true, false, true, false], + }), + stderr: "", + }); + expect(exitCode).toBe(0); + }); + + test("process.once works", async () => { + const { stdout, exitCode } = await run(/* js */ ` + const { emitMemoryPressure } = require("bun:internal-for-testing"); + const seen = []; + process.once("memoryPressure", level => seen.push(level)); + emitMemoryPressure("critical"); + emitMemoryPressure("critical"); + process.stdout.write(JSON.stringify(seen)); + `); + expect(stdout).toBe(JSON.stringify(["critical"])); + expect(exitCode).toBe(0); + }); + + test("listener does not keep the event loop alive", async () => { + const { stdout, exitCode } = await run(/* js */ ` + process.on("memoryPressure", () => {}); + process.stdout.write("done"); + `); + expect(stdout).toBe("done"); + expect(exitCode).toBe(0); + }); + + test("removing on exit does not crash", async () => { + const { stdout, exitCode } = await run(/* js */ ` + const h = () => {}; + process.on("memoryPressure", h); + process.on("exit", () => { + process.off("memoryPressure", h); + process.stdout.write("exit"); + }); + process.stdout.write("done "); + `); + expect(stdout).toBe("done exit"); + expect(exitCode).toBe(0); + }); +});