From 93d80e4e98fe1e2818e3e808d35ae811dfdca329 Mon Sep 17 00:00:00 2001 From: robobun <117481402+robobun@users.noreply.github.com> Date: Mon, 22 Jun 2026 15:26:16 +0000 Subject: [PATCH 01/15] Add process.on('memoryPressure') event Exposes OS-level low-memory notifications to JS so callers can release caches or reap idle resources without polling. macOS: EVFILT_MEMORYSTATUS on the main kqueue (same filter libdispatch's DISPATCH_SOURCE_TYPE_MEMORYPRESSURE uses). Level is 'warning' or 'critical' based on the NOTE_MEMORYSTATUS_PRESSURE_* fflags. Linux: PSI trigger on /proc/pressure/memory (falls back to the cgroup v2 memory.pressure file), polled via EPOLLPRI on the main loop. Gracefully no-ops when PSI is unavailable or the trigger write is rejected (no CAP_SYS_RESOURCE on kernels before 6.6). Level is always 'critical'. Windows: CreateMemoryResourceNotification(LowMemoryResourceNotification) waited on the NT thread pool via RegisterWaitForSingleObject; posts back to JS through a uv_async_t. Level is always 'critical'. The watcher is per-VM, stored in RareData, armed on the first listener and disarmed on the last removal via the same onDidChangeListeners hook signals use. It does not keep the event loop alive. FilePoll gains a MemoryPressure registration kind (EVFILT_MEMORYSTATUS on Darwin, EPOLLPRI on Linux) alongside the existing Readable/Writable/ Process/Machport kinds. --- packages/bun-types/overrides.d.ts | 20 + src/io/posix_event_loop.rs | 58 ++- src/js/internal-for-testing.ts | 6 + src/jsc/bindings/BunProcess.cpp | 28 ++ src/jsc/bindings/InternalForTesting.cpp | 15 + src/jsc/bindings/InternalForTesting.h | 1 + src/jsc/rare_data.rs | 11 + src/runtime/dispatch.rs | 3 + src/runtime/node.rs | 2 + src/runtime/node/memory_pressure.rs | 435 ++++++++++++++++++ src/sys/lib.rs | 9 + .../process/process-memory-pressure.test.ts | 99 ++++ 12 files changed, 684 insertions(+), 3 deletions(-) create mode 100644 src/runtime/node/memory_pressure.rs create mode 100644 test/js/node/process/process-memory-pressure.test.ts diff --git a/packages/bun-types/overrides.d.ts b/packages/bun-types/overrides.d.ts index d906534cc48..ac1dd181941 100644 --- a/packages/bun-types/overrides.d.ts +++ b/packages/bun-types/overrides.d.ts @@ -94,6 +94,26 @@ declare global { _exiting: boolean; noDeprecation?: boolean | undefined; + /** + * Emitted when the operating system signals that available memory is + * running low. Use this to release caches or reap idle resources instead + * of polling. + * + * On macOS `level` distinguishes `"warning"` from `"critical"` based on + * the kernel's memorystatus thresholds. On Linux (PSI) and Windows the + * event is always emitted with `"critical"`. On Linux, the underlying + * PSI trigger requires `CAP_SYS_RESOURCE` on kernels before 6.6; when + * unavailable the event is simply never emitted. + * + * This listener does not keep the event loop alive. + */ + on(event: "memoryPressure", listener: (level: "warning" | "critical") => void): this; + once(event: "memoryPressure", listener: (level: "warning" | "critical") => void): this; + off(event: "memoryPressure", listener: (level: "warning" | "critical") => void): this; + addListener(event: "memoryPressure", listener: (level: "warning" | "critical") => void): this; + removeListener(event: "memoryPressure", listener: (level: "warning" | "critical") => void): this; + emit(event: "memoryPressure", level: "warning" | "critical"): boolean; + binding(m: "constants"): { os: typeof import("node:os").constants; fs: typeof import("node:fs").constants; diff --git a/src/io/posix_event_loop.rs b/src/io/posix_event_loop.rs index 942da6b2d8d..8f36b301a9a 100644 --- a/src/io/posix_event_loop.rs +++ b/src/io/posix_event_loop.rs @@ -221,6 +221,7 @@ pub enum PollTag { TerminalPoll, ParentDeathWatchdog, LifecycleScriptSubprocessOutputReader, + MemoryPressure, } /// Compatibility module — call sites in `bun_runtime`/`bun_install` still spell @@ -244,6 +245,7 @@ pub mod poll_tag { pub const PARENT_DEATH_WATCHDOG: PollTag = PollTag::ParentDeathWatchdog; pub const LIFECYCLE_SCRIPT_SUBPROCESS_OUTPUT_READER: PollTag = PollTag::LifecycleScriptSubprocessOutputReader; + pub const MEMORY_PRESSURE: PollTag = PollTag::MemoryPressure; } #[derive(Copy, Clone)] @@ -330,6 +332,7 @@ impl FilePoll { flags.remove(Flags::Writable); flags.remove(Flags::Process); flags.remove(Flags::Machport); + flags.remove(Flags::MemoryPressure); flags.remove(Flags::Eof); flags.remove(Flags::Hup); @@ -361,6 +364,14 @@ impl FilePoll { #[cfg(all(target_os = "macos", debug_assertions))] debug_assert!(self.generation_number == kqueue_event.ext[0] as usize); + // EVFILT_MEMORYSTATUS reports the pressure level in `fflags`, not `data`; + // thread it through `size_or_offset` so the dispatch arm can read it. + #[cfg(target_os = "macos")] + if kqueue_event.filter == bun_sys::darwin::EVFILT::MEMORYSTATUS { + self.on_update(kqueue_event.fflags as i64); + return; + } + self.on_update(kqueue_event.data as i64); } @@ -442,6 +453,7 @@ impl FilePoll { || self.flags.contains(Flags::PollReadable) || self.flags.contains(Flags::PollProcess) || self.flags.contains(Flags::PollMachport) + || self.flags.contains(Flags::PollMemoryPressure) } pub fn on_update(&mut self, size_or_offset: i64) { @@ -699,6 +711,8 @@ impl FilePoll { let mut flags: u32 = match flag { Flags::Process | Flags::Readable => EPOLL::IN | EPOLL::HUP | one_shot_flag, Flags::Writable => EPOLL::OUT | EPOLL::HUP | EPOLL::ERR | one_shot_flag, + // PSI trigger fds signal via POLLPRI only. + Flags::MemoryPressure => EPOLL::PRI | EPOLL::ERR | one_shot_flag, _ => unreachable!(), }; // epoll keys on fd alone; if the other direction is already @@ -784,6 +798,17 @@ impl FilePoll { flags: EV::ADD | one_shot_flag, ext: [self.generation_number as u64, 0], }, + // System-wide memory pressure. ident is always 0; EV_CLEAR so each + // transition delivers once (matches libdispatch's registration). + Flags::MemoryPressure => kevent64_s { + ident: 0, + filter: EVFILT::MEMORYSTATUS, + data: 0, + fflags: NOTE::MEMORYSTATUS_PRESSURE_WARN | NOTE::MEMORYSTATUS_PRESSURE_CRITICAL, + udata: Pollable::init(self).ptr() as u64, + flags: EV::ADD | EV::CLEAR | one_shot_flag, + ext: [self.generation_number as u64, 0], + }, _ => unreachable!(), }; @@ -863,7 +888,7 @@ impl FilePoll { NOTE::EXIT, udata, ), - Flags::Machport => { + Flags::Machport | Flags::MemoryPressure => { return sys::Result::Err(sys::Error::from_code( sys::E::EOPNOTSUPP, sys::Tag::kevent, @@ -914,6 +939,7 @@ impl FilePoll { } Flags::Writable => Flags::PollWritable, Flags::Machport => Flags::PollMachport, + Flags::MemoryPressure => Flags::PollMemoryPressure, _ => unreachable!(), }); self.flags.remove(Flags::NeedsRearm); @@ -972,7 +998,8 @@ impl FilePoll { if !(self.flags.contains(Flags::PollReadable) || self.flags.contains(Flags::PollWritable) || self.flags.contains(Flags::PollProcess) - || self.flags.contains(Flags::PollMachport)) + || self.flags.contains(Flags::PollMachport) + || self.flags.contains(Flags::PollMemoryPressure)) { // no-op return sys::Result::Ok(()); @@ -995,6 +1022,9 @@ impl FilePoll { if self.flags.contains(Flags::PollMachport) { break 'brk Flags::Machport; } + if self.flags.contains(Flags::PollMemoryPressure) { + break 'brk Flags::MemoryPressure; + } return sys::Result::Ok(()); }; @@ -1008,6 +1038,7 @@ impl FilePoll { self.flags.remove(Flags::PollReadable); self.flags.remove(Flags::PollWritable); self.flags.remove(Flags::PollMachport); + self.flags.remove(Flags::PollMemoryPressure); return sys::Result::Ok(()); } @@ -1078,6 +1109,15 @@ impl FilePoll { flags: EV::DELETE, ext: [0, 0], }, + Flags::MemoryPressure => kevent64_s { + ident: 0, + filter: EVFILT::MEMORYSTATUS, + data: 0, + fflags: 0, + udata: Pollable::init(self).ptr() as u64, + flags: EV::DELETE, + ext: [0, 0], + }, _ => unreachable!(), }; @@ -1154,7 +1194,7 @@ impl FilePoll { Flags::Readable => make_kevent(ident, EVFILT::READ, EV::DELETE, 0, udata), Flags::Writable => make_kevent(ident, EVFILT::WRITE, EV::DELETE, 0, udata), Flags::Process => make_kevent(ident, EVFILT::PROC, EV::DELETE, NOTE::EXIT, udata), - Flags::Machport => { + Flags::Machport | Flags::MemoryPressure => { return sys::Result::Err(sys::Error::from_code( sys::E::EOPNOTSUPP, sys::Tag::kevent, @@ -1196,6 +1236,7 @@ impl FilePoll { self.flags.remove(Flags::PollWritable); self.flags.remove(Flags::PollProcess); self.flags.remove(Flags::PollMachport); + self.flags.remove(Flags::PollMemoryPressure); sys::Result::Ok(()) } @@ -1230,6 +1271,8 @@ pub enum Flags { PollProcess, /// Poll for machport events PollMachport, + /// Poll for memory-pressure events (Darwin `EVFILT_MEMORYSTATUS`, Linux PSI `EPOLLPRI`) + PollMemoryPressure, // What did the event loop tell us? Readable, @@ -1238,6 +1281,7 @@ pub enum Flags { Eof, Hup, Machport, + MemoryPressure, // What is the type of file descriptor? Fifo, @@ -1273,6 +1317,7 @@ impl Flags { Flags::Writable => Flags::PollWritable, Flags::Process => Flags::PollProcess, Flags::Machport => Flags::PollMachport, + Flags::MemoryPressure => Flags::PollMemoryPressure, other => other, } } @@ -1301,6 +1346,10 @@ impl Flags { if kqueue_event.filter == EVFILT::MACHPORT { flags.insert(Flags::Machport); } + #[cfg(target_os = "macos")] + if kqueue_event.filter == EVFILT::MEMORYSTATUS { + flags.insert(Flags::MemoryPressure); + } } flags } @@ -1315,6 +1364,9 @@ impl Flags { if epoll.events & EPOLL::OUT != 0 { flags.insert(Flags::Writable); } + if epoll.events & EPOLL::PRI != 0 { + flags.insert(Flags::MemoryPressure); + } if epoll.events & EPOLL::ERR != 0 { flags.insert(Flags::Eof); } diff --git a/src/js/internal-for-testing.ts b/src/js/internal-for-testing.ts index 2f9a0fa67b6..3d080a6aef2 100644 --- a/src/js/internal-for-testing.ts +++ b/src/js/internal-for-testing.ts @@ -290,6 +290,12 @@ export const lowercaseHeaderNameSIMD: (name: string) => string = $newCppFunction 1, ); +export const emitMemoryPressure: (level: "warning" | "critical") => void = $newCppFunction( + "InternalForTesting.cpp", + "jsFunction_emitMemoryPressure", + 1, +); + export const getEventLoopStats: () => { activeTasks: number; concurrentRef: number; numPolls: number } = $newZigFunction("event_loop.zig", "getActiveTasks", 0); diff --git a/src/jsc/bindings/BunProcess.cpp b/src/jsc/bindings/BunProcess.cpp index 90188ca6017..fec52f40a2e 100644 --- a/src/jsc/bindings/BunProcess.cpp +++ b/src/jsc/bindings/BunProcess.cpp @@ -1382,9 +1382,24 @@ __attribute__((noinline)) static void forwardSignal(int signalNumber) Bun__onPosixSignal(signalNumber); } +extern "C" void Bun__MemoryPressure__install(JSC::JSGlobalObject* global); +extern "C" void Bun__MemoryPressure__uninstall(JSC::JSGlobalObject* global); + static void onDidChangeListeners(EventEmitter& eventEmitter, const Identifier& eventName, bool isAdded) { if (Bun__isMainThreadVM()) { + if (eventName == "memoryPressure") { + auto* global = eventEmitter.scriptExecutionContext()->jsGlobalObject(); + if (isAdded) { + if (eventEmitter.listenerCount(eventName) == 1) { + Bun__MemoryPressure__install(global); + } + } else if (eventEmitter.listenerCount(eventName) == 0) { + Bun__MemoryPressure__uninstall(global); + } + return; + } + // IPC handlers if (eventName == "message" || eventName == "disconnect") { auto* global = uncheckedDowncast(eventEmitter.scriptExecutionContext()->jsGlobalObject()); @@ -4314,6 +4329,19 @@ extern "C" void Process__emitDisconnectEvent(Zig::GlobalObject* global) } } +extern "C" void Process__emitMemoryPressureEvent(Zig::GlobalObject* global, int level) +{ + auto* process = global->processObject(); + auto& vm = JSC::getVM(global); + auto ident = Identifier::fromString(vm, "memoryPressure"_s); + if (process->wrapped().hasEventListeners(ident)) { + JSC::MarkedArgumentBuffer args; + // Level values match NOTE_MEMORYSTATUS_PRESSURE_WARN (2) / _CRITICAL (4). + args.append(jsString(vm, level == 2 ? String("warning"_s) : String("critical"_s))); + process->wrapped().emit(ident, args); + } +} + extern "C" void Process__emitErrorEvent(Zig::GlobalObject* global, EncodedJSValue value) { auto* process = global->processObject(); diff --git a/src/jsc/bindings/InternalForTesting.cpp b/src/jsc/bindings/InternalForTesting.cpp index 141195bc252..31901bdc5cf 100644 --- a/src/jsc/bindings/InternalForTesting.cpp +++ b/src/jsc/bindings/InternalForTesting.cpp @@ -101,4 +101,19 @@ JSC_DEFINE_HOST_FUNCTION(jsFunction_BunString_toThreadSafeRefCountDelta, (JSC::J return JSValue::encode(jsNumber(static_cast(after) - static_cast(before))); } +extern "C" void Bun__MemoryPressure__emit(JSC::JSGlobalObject* global, int level); + +// Synthetically fire process.on("memoryPressure") so tests can exercise the +// emit path without depending on real OS memory pressure. +JSC_DEFINE_HOST_FUNCTION(jsFunction_emitMemoryPressure, (JSC::JSGlobalObject * globalObject, JSC::CallFrame* callFrame)) +{ + auto& vm = globalObject->vm(); + auto scope = DECLARE_THROW_SCOPE(vm); + auto str = callFrame->argument(0).toWTFString(globalObject); + RETURN_IF_EXCEPTION(scope, {}); + int level = str == "warning"_s ? 2 : 4; + Bun__MemoryPressure__emit(defaultGlobalObject(globalObject), level); + return encodedJSUndefined(); +} + } diff --git a/src/jsc/bindings/InternalForTesting.h b/src/jsc/bindings/InternalForTesting.h index 7d7ea01e417..31c568c7cf4 100644 --- a/src/jsc/bindings/InternalForTesting.h +++ b/src/jsc/bindings/InternalForTesting.h @@ -11,5 +11,6 @@ JSC_DECLARE_HOST_FUNCTION(jsFunction_lsanDoLeakCheck); JSC_DECLARE_HOST_FUNCTION(jsFunction_isASANEnabled); JSC_DECLARE_HOST_FUNCTION(jsFunction_BunString_toThreadSafeRefCountDelta); JSC_DECLARE_HOST_FUNCTION(jsFunction_lowercaseHeaderNameSIMD); +JSC_DECLARE_HOST_FUNCTION(jsFunction_emitMemoryPressure); } diff --git a/src/jsc/rare_data.rs b/src/jsc/rare_data.rs index e61f53ad37e..ded17e225e6 100644 --- a/src/jsc/rare_data.rs +++ b/src/jsc/rare_data.rs @@ -274,6 +274,10 @@ pub struct RareData { /// lazy-init in `bun_runtime::node::node_fs_stat_watcher`. pub node_fs_stat_watcher_scheduler: Option>, + /// `bun_runtime::node::memory_pressure::MemoryPressureWatcher` — erased + /// `Box`; lazy-init on the first `process.on("memoryPressure", ...)` listener. + pub memory_pressure_watcher: Option>, + /// Watch-mode restart needs to RST every listen socket so the new process /// can rebind without `EADDRINUSE`. Written on the JS thread; drained on /// the watcher thread — hence the mutex (PORTING.md §Concurrency: lock @@ -336,6 +340,7 @@ impl Default for RareData { default_client_ssl_ctx: None, mime_types: None, node_fs_stat_watcher_scheduler: None, + memory_pressure_watcher: None, listening_sockets_for_watch_mode: Mutex::new(Vec::new()), temp_pipe_read_buffer: None, s3_default_client: Strong::empty(), @@ -640,6 +645,12 @@ impl RareData { &mut self.node_fs_stat_watcher_scheduler } + /// Raw slot — lazy-init body lives in `bun_runtime::node::memory_pressure`. + #[inline] + pub fn memory_pressure_watcher_slot(&mut self) -> &mut Option> { + &mut self.memory_pressure_watcher + } + /// Raw slot — lazy-init body lives in `bun_http_jsc::WebSocketDeflate`. #[inline] pub fn websocket_deflate_slot(&mut self) -> &mut Option { diff --git a/src/runtime/dispatch.rs b/src/runtime/dispatch.rs index b93fe2b60cf..09c8375997c 100644 --- a/src/runtime/dispatch.rs +++ b/src/runtime/dispatch.rs @@ -679,6 +679,9 @@ pub unsafe fn __bun_run_file_poll(poll: *mut FilePoll, size_or_offset: i64) { // SAFETY: `proc` carries the +1 ref taken at queue time; this drops it. unsafe { Process::on_wait_pid_from_event_loop_task(proc) }; } + poll_tag::MEMORY_PRESSURE => { + crate::node::memory_pressure::on_poll(owner.ptr.cast(), size_or_offset); + } poll_tag::PARENT_DEATH_WATCHDOG => { let wd = owner_as!(bun_io::parent_death_watchdog::ParentDeathWatchdog); // Mac-only — debug-assert elsewhere (Linux uses prctl(PR_SET_PDEATHSIG)). diff --git a/src/runtime/node.rs b/src/runtime/node.rs index 2d79aefdd0b..3a2a41113f5 100644 --- a/src/runtime/node.rs +++ b/src/runtime/node.rs @@ -110,6 +110,8 @@ pub mod node_fs_watcher; #[cfg(windows)] #[path = "node/uv_signal_handle_windows.rs"] pub mod uv_signal_handle_windows; +#[path = "node/memory_pressure.rs"] +pub mod memory_pressure; // Type defs + non-JSC FFI bodies are live; every `#[bun_jsc::host_fn]` / // `#[bun_jsc::JsClass]` item is wrapped in ` mod _impl` inside diff --git a/src/runtime/node/memory_pressure.rs b/src/runtime/node/memory_pressure.rs new file mode 100644 index 00000000000..9f1827d18e1 --- /dev/null +++ b/src/runtime/node/memory_pressure.rs @@ -0,0 +1,435 @@ +//! `process.on("memoryPressure", level => ...)` — OS-level low-memory +//! notifications without polling. +//! +//! Backends: +//! - macOS: `EVFILT_MEMORYSTATUS` on the main event loop's kqueue (the same +//! filter libdispatch's `DISPATCH_SOURCE_TYPE_MEMORYPRESSURE` uses). The +//! kernel delivers `NOTE_MEMORYSTATUS_PRESSURE_WARN` / `_CRITICAL` in +//! `fflags` when `kern.memorystatus_level` crosses the warn/critical +//! thresholds. +//! - Linux: PSI trigger on `/proc/pressure/memory` (or the cgroup v2 +//! `memory.pressure` file for the container's own cgroup). PSI triggers +//! signal via `POLLPRI`. Requires `CAP_SYS_RESOURCE` before kernel 6.6, +//! and PSI enabled (`CONFIG_PSI=y`). If neither path can be opened for +//! writing, the watcher silently does nothing. +//! - Windows: `CreateMemoryResourceNotification(LowMemoryResourceNotification)` +//! waited on a thread-pool thread via `RegisterWaitForSingleObject`; the +//! callback posts back to the JS thread through a `uv_async_t`. +//! +//! Armed lazily on the first listener and disarmed on the last removal via +//! `onDidChangeListeners` in `BunProcess.cpp`, matching how signal handlers +//! are wired. The watcher does not keep the event loop alive. + +use bun_jsc::JSGlobalObject; + +/// Pressure level passed to JS. Values are the `NOTE_MEMORYSTATUS_PRESSURE_*` +/// bits on macOS so the kqueue dispatch can pass `fflags` through unchanged. +pub mod level { + pub const WARNING: i32 = 0x00000002; + pub const CRITICAL: i32 = 0x00000004; +} + +unsafe extern "C" { + /// Defined in `src/jsc/bindings/BunProcess.cpp`. Builds the level string + /// and emits `"memoryPressure"` on the process object. + fn Process__emitMemoryPressureEvent(global: *mut JSGlobalObject, level: i32); +} + +/// Emit the `"memoryPressure"` event on the given global's process object. +/// Called from the `FilePoll` dispatch arm (already on the JS thread). +pub fn emit(global: &JSGlobalObject, lvl: i32) { + // Anything other than WARN is reported as critical. On Linux the PSI + // trigger doesn't carry a level, and on Windows there is only + // `LowMemoryResourceNotification`; both map to critical. + let lvl = if lvl & level::WARNING != 0 { + level::WARNING + } else { + level::CRITICAL + }; + // SAFETY: `global` is the live per-thread global; the C++ side handles + // the "no listeners" case via `hasEventListeners`. + unsafe { Process__emitMemoryPressureEvent(global as *const _ as *mut _, lvl) }; +} + +// ──────────────────────────────────────────────────────────────────────────── +// POSIX backend (macOS EVFILT_MEMORYSTATUS, Linux PSI) via FilePoll +// ──────────────────────────────────────────────────────────────────────────── + +#[cfg(not(windows))] +mod posix { + use core::ptr::{self, NonNull}; + + use bun_io::posix_event_loop::{EventLoopCtx, FilePoll, Flags, Owner, poll_tag}; + use bun_jsc::JSGlobalObject; + use bun_jsc::virtual_machine::VirtualMachine; + use bun_sys::Fd; + + /// Per-VM watcher. Stored type-erased in `RareData.memory_pressure_watcher`. + pub(super) struct MemoryPressureWatcher { + /// Back-pointer so the poll dispatch can reach JS without going through + /// the per-thread VM singleton (workers each have their own global). + global: *mut JSGlobalObject, + /// Always set once `install` returns. The poll owns the PSI fd on Linux + /// (closed in `uninstall`); on macOS the fd slot is the kevent ident (0). + poll: *mut FilePoll, + /// Whether `poll` was successfully registered with kqueue/epoll. On + /// Linux this is false when PSI is unavailable or requires privileges + /// we don't have; the emit path is still functional for tests. + registered: bool, + } + + fn slot(vm: &mut VirtualMachine) -> &mut Option> { + vm.rare_data().memory_pressure_watcher_slot() + } + + /// Open a PSI memory file and write a trigger. Tries the system-wide + /// `/proc/pressure/memory` first, then the current cgroup's + /// `memory.pressure` (relevant inside containers that can't write the + /// global file). Returns the fd on success. + #[cfg(any(target_os = "linux", target_os = "android"))] + fn open_psi_fd() -> Option { + use bun_sys::O; + + /// 150 ms of "some"-stall in any 2 s window. 2 s is the minimum window + /// for unprivileged PSI triggers (kernel 6.6+). + const TRIGGER: &[u8] = b"some 150000 2000000"; + + let paths: [&bun_core::ZStr; 2] = [ + bun_core::zstr!("/proc/pressure/memory"), + bun_core::zstr!("/sys/fs/cgroup/memory.pressure"), + ]; + for path in paths { + let fd = match bun_sys::open(path, O::RDWR | O::NONBLOCK | O::CLOEXEC, 0) { + Ok(fd) => fd, + Err(_) => continue, + }; + match bun_sys::write(fd, TRIGGER) { + Ok(_) => return Some(fd), + Err(_) => { + let _ = bun_sys::close(fd); + continue; + } + } + } + None + } + + pub(super) fn install(global: &JSGlobalObject) { + let vm = global.bun_vm_ptr(); + // SAFETY: `bun_vm_ptr()` asserts same-thread; VM outlives this call. + let vm_ref = unsafe { &mut *vm }; + if slot(vm_ref).is_some() { + return; + } + + // SAFETY: VM singleton is live for the JS thread. + let ctx: EventLoopCtx = unsafe { VirtualMachine::event_loop_ctx(vm) }; + + let watcher = bun_core::heap::into_raw(Box::new(MemoryPressureWatcher { + global: global as *const _ as *mut _, + poll: ptr::null_mut(), + registered: false, + })); + + #[cfg(any(target_os = "linux", target_os = "android"))] + let fd = open_psi_fd(); + #[cfg(target_os = "macos")] + let fd = Some(Fd::from_native(0)); + #[cfg(not(any(target_os = "linux", target_os = "android", target_os = "macos")))] + let fd: Option = None; + + if let Some(fd) = fd { + let poll = FilePoll::init( + ctx, + fd, + Default::default(), + Owner::new(poll_tag::MEMORY_PRESSURE, watcher.cast()), + ); + // SAFETY: `poll` was just allocated by `FilePoll::init` (sole borrow); + // `platform_event_loop` returns the live uws loop. + let registered = match unsafe { &mut *poll }.register( + unsafe { ctx.platform_event_loop() }, + Flags::MemoryPressure, + false, + ) { + bun_sys::Result::Ok(()) => true, + Err(_) => { + #[cfg(any(target_os = "linux", target_os = "android"))] + { + let _ = bun_sys::close(fd); + } + false + } + }; + // SAFETY: `watcher` was just heap-allocated above; sole owner. + unsafe { + (*watcher).poll = poll; + (*watcher).registered = registered; + } + } + + // SAFETY: VM singleton is live; re-derive to avoid holding a `&mut` across the register. + *slot(unsafe { &mut *vm }) = NonNull::new(watcher.cast()); + } + + pub(super) fn uninstall(global: &JSGlobalObject) { + let vm = global.bun_vm_ptr(); + // SAFETY: same-thread VM access (asserted by `bun_vm_ptr`). + let Some(raw) = core::mem::take(slot(unsafe { &mut *vm })) else { + return; + }; + // SAFETY: slot was populated by `install` with a `Box`. + let watcher = unsafe { bun_core::heap::take(raw.as_ptr().cast::()) }; + + if !watcher.poll.is_null() { + #[cfg(any(target_os = "linux", target_os = "android"))] + let psi_fd = if watcher.registered { + // SAFETY: `poll` is live until `deinit` below. + Some(unsafe { (*watcher.poll).fd }) + } else { + None + }; + + // `deinit` unregisters (kqueue EV_DELETE / epoll CTL_DEL) and returns + // the slot to the hive; fd ownership is ours. + // SAFETY: `poll` is a live hive slot until this call returns. + unsafe { (*watcher.poll).deinit() }; + + #[cfg(any(target_os = "linux", target_os = "android"))] + if let Some(fd) = psi_fd { + let _ = bun_sys::close(fd); + } + } + } + + /// `__bun_run_file_poll` dispatch target. `fflags` is the kqueue `fflags` + /// on macOS (carrying the pressure level) and 0 on Linux. + pub fn on_poll(owner_ptr: *mut core::ffi::c_void, fflags: i64) { + // SAFETY: `owner_ptr` was set via `Owner::new(MEMORY_PRESSURE, watcher)` in `install`. + let watcher = unsafe { &*owner_ptr.cast::() }; + // SAFETY: `global` is the live per-thread global captured at install time. + let global = unsafe { &*watcher.global }; + #[cfg(target_os = "macos")] + let lvl = fflags as i32; + #[cfg(not(target_os = "macos"))] + let lvl = { + let _ = fflags; + super::level::CRITICAL + }; + super::emit(global, lvl); + } +} + +// ──────────────────────────────────────────────────────────────────────────── +// Windows backend: CreateMemoryResourceNotification + RegisterWaitForSingleObject +// ──────────────────────────────────────────────────────────────────────────── + +#[cfg(windows)] +mod windows { + use core::ffi::c_void; + use core::ptr::{self, NonNull}; + use core::sync::atomic::{AtomicI32, Ordering}; + + use bun_jsc::JSGlobalObject; + use bun_jsc::virtual_machine::VirtualMachine; + use bun_sys::windows::libuv; + + type HANDLE = *mut c_void; + type BOOL = i32; + type ULONG = u32; + const INVALID_HANDLE_VALUE: HANDLE = usize::MAX as HANDLE; + /// `LowMemoryResourceNotification` enum value. + const LOW_MEMORY_RESOURCE_NOTIFICATION: i32 = 0; + /// `WT_EXECUTEDEFAULT` — run the callback on a normal thread-pool thread, + /// re-arm after each fire. + const WT_EXECUTEDEFAULT: ULONG = 0; + + unsafe extern "system" { + fn CreateMemoryResourceNotification(kind: i32) -> HANDLE; + fn RegisterWaitForSingleObject( + out_wait: *mut HANDLE, + handle: HANDLE, + callback: unsafe extern "system" fn(ctx: *mut c_void, timed_out: u8), + ctx: *mut c_void, + millis: ULONG, + flags: ULONG, + ) -> BOOL; + fn UnregisterWaitEx(wait: HANDLE, completion_event: HANDLE) -> BOOL; + fn CloseHandle(h: HANDLE) -> BOOL; + } + + #[repr(C)] + struct MemoryPressureWatcher { + /// `uv_async_t` must come first so `async_.data` → `*mut Self` works in + /// the close callback. + async_: libuv::uv_async_t, + global: *mut JSGlobalObject, + /// Signalled by the kernel when available memory is low. + notify: HANDLE, + /// Thread-pool wait registration. + wait: HANDLE, + /// Set from the thread-pool callback; read on the JS thread. + pending_level: AtomicI32, + } + + fn slot(vm: &mut VirtualMachine) -> &mut Option> { + vm.rare_data().memory_pressure_watcher_slot() + } + + /// Runs on a Windows thread-pool thread. May only touch `pending_level` + /// and `uv_async_send` (which is documented thread-safe). + unsafe extern "system" fn wait_callback(ctx: *mut c_void, _timed_out: u8) { + // SAFETY: `ctx` is the watcher pointer passed at registration; the + // wait is unregistered with `INVALID_HANDLE_VALUE` before we free it, + // so it outlives every callback. + let watcher = unsafe { &*ctx.cast::() }; + watcher + .pending_level + .store(super::level::CRITICAL, Ordering::SeqCst); + // SAFETY: `async_` was uv_async_init'd on the JS loop; uv_async_send is thread-safe. + let _ = unsafe { libuv::uv_async_send(ptr::from_ref(&watcher.async_).cast_mut()) }; + } + + /// Runs on the JS thread. + unsafe extern "C" fn on_async(handle: *mut libuv::uv_async_t) { + // SAFETY: `data` was set to the watcher in `install`. + let watcher = unsafe { &*(*handle).data.cast::() }; + let lvl = watcher.pending_level.swap(0, Ordering::SeqCst); + if lvl != 0 { + // SAFETY: `global` is the live per-thread global captured at install. + super::emit(unsafe { &*watcher.global }, lvl); + } + } + + extern "C" fn free_on_close(handle: *mut libuv::uv_handle_t) { + // SAFETY: `handle` is the leading `uv_async_t` field of a heap-allocated + // `MemoryPressureWatcher`; uv_close guarantees no further use. + drop(unsafe { bun_core::heap::take(handle.cast::()) }); + } + + pub(super) fn install(global: &JSGlobalObject) { + let vm = global.bun_vm_ptr(); + // SAFETY: same-thread VM access. + let vm_ref = unsafe { &mut *vm }; + if slot(vm_ref).is_some() { + return; + } + + // SAFETY: the allocation is fully written by uv_async_init + field stores below. + let watcher: *mut MemoryPressureWatcher = + bun_core::heap::into_raw(Box::::new_uninit()).cast(); + + // SAFETY: `watcher.async_` is a valid `uv_async_t`-sized slot; `uv_loop` + // is the VM's live libuv loop. + let rc = unsafe { + libuv::uv_async_init( + global.bun_vm().uv_loop(), + ptr::addr_of_mut!((*watcher).async_), + Some(on_async), + ) + }; + if rc != 0 { + // SAFETY: never handed out; just free the raw allocation. + drop(unsafe { bun_core::heap::take(watcher) }); + return; + } + // SAFETY: `async_` is an initialized, active handle. + unsafe { libuv::uv_unref(ptr::addr_of_mut!((*watcher).async_).cast()) }; + + // SAFETY: `watcher` is a freshly allocated, uv-initialised struct. + unsafe { + (*watcher).async_.data = watcher.cast(); + (*watcher).global = global as *const _ as *mut _; + (*watcher).notify = ptr::null_mut(); + (*watcher).wait = ptr::null_mut(); + ptr::addr_of_mut!((*watcher).pending_level).write(AtomicI32::new(0)); + } + + // SAFETY: Win32 call; returns NULL on failure. + let notify = unsafe { CreateMemoryResourceNotification(LOW_MEMORY_RESOURCE_NOTIFICATION) }; + if !notify.is_null() { + let mut wait: HANDLE = ptr::null_mut(); + // SAFETY: `notify` is a valid notification handle; `watcher` + // outlives the wait (guaranteed by the blocking `UnregisterWaitEx` + // in `uninstall`). + let ok = unsafe { + RegisterWaitForSingleObject( + &mut wait, + notify, + wait_callback, + watcher.cast(), + u32::MAX, + WT_EXECUTEDEFAULT, + ) + }; + if ok != 0 { + // SAFETY: sole owner of `watcher`. + unsafe { + (*watcher).notify = notify; + (*watcher).wait = wait; + } + } else { + // SAFETY: `notify` owned here; never registered. + unsafe { CloseHandle(notify) }; + } + } + + // SAFETY: same-thread VM access. + *slot(unsafe { &mut *vm }) = NonNull::new(watcher.cast()); + } + + pub(super) fn uninstall(global: &JSGlobalObject) { + let vm = global.bun_vm_ptr(); + // SAFETY: same-thread VM access. + let Some(raw) = core::mem::take(slot(unsafe { &mut *vm })) else { + return; + }; + let watcher = raw.as_ptr().cast::(); + + // SAFETY: `watcher` is the live allocation from `install`. + unsafe { + if !(*watcher).wait.is_null() { + // `INVALID_HANDLE_VALUE` blocks until any in-flight callback returns, + // so `watcher` is guaranteed unreferenced by the thread pool after this. + UnregisterWaitEx((*watcher).wait, INVALID_HANDLE_VALUE); + } + if !(*watcher).notify.is_null() { + CloseHandle((*watcher).notify); + } + libuv::uv_close( + ptr::addr_of_mut!((*watcher).async_).cast(), + Some(free_on_close), + ); + } + } +} + +// ──────────────────────────────────────────────────────────────────────────── +// C-ABI exports for BunProcess.cpp +// ──────────────────────────────────────────────────────────────────────────── + +#[unsafe(no_mangle)] +pub extern "C" fn Bun__MemoryPressure__install(global: &JSGlobalObject) { + #[cfg(not(windows))] + posix::install(global); + #[cfg(windows)] + windows::install(global); +} + +#[unsafe(no_mangle)] +pub extern "C" fn Bun__MemoryPressure__uninstall(global: &JSGlobalObject) { + #[cfg(not(windows))] + posix::uninstall(global); + #[cfg(windows)] + windows::uninstall(global); +} + +/// Synthetic emit for `bun:internal-for-testing`. Bypasses the OS backend and +/// drives the same C++ emit path a real notification would. +#[unsafe(no_mangle)] +pub extern "C" fn Bun__MemoryPressure__emit(global: &JSGlobalObject, lvl: i32) { + emit(global, lvl); +} + +#[cfg(not(windows))] +pub use posix::on_poll; diff --git a/src/sys/lib.rs b/src/sys/lib.rs index e78f24a5cd7..64d2f26b483 100644 --- a/src/sys/lib.rs +++ b/src/sys/lib.rs @@ -5384,6 +5384,7 @@ pub mod linux { pub mod EPOLL { pub const IN: u32 = libc::EPOLLIN as u32; pub const OUT: u32 = libc::EPOLLOUT as u32; + pub const PRI: u32 = libc::EPOLLPRI as u32; pub const ERR: u32 = libc::EPOLLERR as u32; pub const HUP: u32 = libc::EPOLLHUP as u32; pub const RDHUP: u32 = libc::EPOLLRDHUP as u32; @@ -5665,6 +5666,9 @@ pub mod darwin { pub const TIMER: i16 = libc::EVFILT_TIMER; pub const USER: i16 = libc::EVFILT_USER; pub const MACHPORT: i16 = libc::EVFILT_MACHPORT; + /// xnu-private filter used by libdispatch's `DISPATCH_SOURCE_TYPE_MEMORYPRESSURE`. + /// Not in `` (only ``), so hard-code the value. + pub const MEMORYSTATUS: i16 = -14; } /// kqueue event flags (Darwin). pub mod EV { @@ -5694,6 +5698,11 @@ pub mod darwin { pub const LINK: u32 = libc::NOTE_LINK; pub const RENAME: u32 = libc::NOTE_RENAME; pub const REVOKE: u32 = libc::NOTE_REVOKE; + /// `EVFILT_MEMORYSTATUS` fflags (xnu ``). Values are + /// ABI-stable; libdispatch depends on them for `DISPATCH_MEMORYPRESSURE_*`. + pub const MEMORYSTATUS_PRESSURE_NORMAL: u32 = 0x00000001; + pub const MEMORYSTATUS_PRESSURE_WARN: u32 = 0x00000002; + pub const MEMORYSTATUS_PRESSURE_CRITICAL: u32 = 0x00000004; } /// Re-export of the platform errno enum so `bun_threading::Futex` can /// match `c::E::INTR` etc. against `__ulock_*` return codes. diff --git a/test/js/node/process/process-memory-pressure.test.ts b/test/js/node/process/process-memory-pressure.test.ts new file mode 100644 index 00000000000..4ec3b39addf --- /dev/null +++ b/test/js/node/process/process-memory-pressure.test.ts @@ -0,0 +1,99 @@ +import { describe, expect, test } from "bun:test"; +import { bunEnv, bunExe } from "harness"; + +// process.on("memoryPressure") is a Bun extension. These tests drive the +// emit path synthetically via bun:internal-for-testing since real OS memory +// pressure cannot be induced reliably (and PSI trigger creation requires +// CAP_SYS_RESOURCE on Linux kernels before 6.6, which CI containers lack). + +async function run(src: string) { + await using proc = Bun.spawn({ + cmd: [bunExe(), "-e", src], + env: bunEnv, + stderr: "pipe", + stdout: "pipe", + }); + const [stdout, stderr, exitCode] = await Promise.all([proc.stdout.text(), proc.stderr.text(), proc.exited]); + return { stdout, stderr, exitCode }; +} + +describe.concurrent("process.on('memoryPressure')", () => { + test("listener receives level argument", async () => { + const { stdout, stderr, exitCode } = await run(/* js */ ` + const { emitMemoryPressure } = require("bun:internal-for-testing"); + const seen = []; + process.on("memoryPressure", level => seen.push(level)); + emitMemoryPressure("warning"); + emitMemoryPressure("critical"); + process.stdout.write(JSON.stringify(seen)); + `); + expect({ stdout, stderr: stderr.trim() }).toEqual({ + stdout: JSON.stringify(["warning", "critical"]), + stderr: "", + }); + expect(exitCode).toBe(0); + }); + + test("disarms when the last listener is removed", async () => { + const { stdout, stderr, exitCode } = await run(/* js */ ` + const { emitMemoryPressure } = require("bun:internal-for-testing"); + const seen = []; + const a = level => seen.push("a:" + level); + const b = level => seen.push("b:" + level); + process.on("memoryPressure", a); + process.on("memoryPressure", b); + emitMemoryPressure("warning"); + process.off("memoryPressure", a); + emitMemoryPressure("critical"); + process.off("memoryPressure", b); + // No listeners registered; emit should be a no-op. + emitMemoryPressure("critical"); + // Re-arm and emit again to prove the watcher can be reinstalled. + process.on("memoryPressure", a); + emitMemoryPressure("warning"); + process.off("memoryPressure", a); + process.stdout.write(JSON.stringify(seen)); + `); + expect({ stdout, stderr: stderr.trim() }).toEqual({ + stdout: JSON.stringify(["a:warning", "b:warning", "b:critical", "a:warning"]), + stderr: "", + }); + expect(exitCode).toBe(0); + }); + + test("process.once works", async () => { + const { stdout, exitCode } = await run(/* js */ ` + const { emitMemoryPressure } = require("bun:internal-for-testing"); + const seen = []; + process.once("memoryPressure", level => seen.push(level)); + emitMemoryPressure("critical"); + emitMemoryPressure("critical"); + process.stdout.write(JSON.stringify(seen)); + `); + expect(stdout).toBe(JSON.stringify(["critical"])); + expect(exitCode).toBe(0); + }); + + test("listener does not keep the event loop alive", async () => { + const { stdout, exitCode } = await run(/* js */ ` + process.on("memoryPressure", () => {}); + process.stdout.write("done"); + `); + expect(stdout).toBe("done"); + expect(exitCode).toBe(0); + }); + + test("removing on exit does not crash", async () => { + const { stdout, exitCode } = await run(/* js */ ` + const h = () => {}; + process.on("memoryPressure", h); + process.on("exit", () => { + process.off("memoryPressure", h); + process.stdout.write("exit"); + }); + process.stdout.write("done "); + `); + expect(stdout).toBe("done exit"); + expect(exitCode).toBe(0); + }); +}); From 21ad6d5ef8022093b0aebe92d5f8bc0ea7d0d04a Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Mon, 22 Jun 2026 15:29:03 +0000 Subject: [PATCH 02/15] [autofix.ci] apply automated fixes --- src/runtime/node.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/runtime/node.rs b/src/runtime/node.rs index 3a2a41113f5..3f2c9278df8 100644 --- a/src/runtime/node.rs +++ b/src/runtime/node.rs @@ -101,6 +101,8 @@ pub mod win_watcher; // Force-references `Bun__UVSignalHandle__init` / `Bun__UVSignalHandle__close` // for C++ (`src/jsc/bindings/BunProcess.cpp`). Must be `mod`-declared or the // `#[no_mangle]` exports are never compiled into the binary. +#[path = "node/memory_pressure.rs"] +pub mod memory_pressure; #[path = "node/node_fs_binding.rs"] pub mod node_fs_binding; #[path = "node/node_fs_stat_watcher.rs"] @@ -110,8 +112,6 @@ pub mod node_fs_watcher; #[cfg(windows)] #[path = "node/uv_signal_handle_windows.rs"] pub mod uv_signal_handle_windows; -#[path = "node/memory_pressure.rs"] -pub mod memory_pressure; // Type defs + non-JSC FFI bodies are live; every `#[bun_jsc::host_fn]` / // `#[bun_jsc::JsClass]` item is wrapped in ` mod _impl` inside From c0d47c174f9dedf8cc7c1eaf877007c544c3b83d Mon Sep 17 00:00:00 2001 From: robobun <117481402+robobun@users.noreply.github.com> Date: Mon, 22 Jun 2026 15:44:07 +0000 Subject: [PATCH 03/15] Fix clippy: use ptr::from_ref().cast_mut() instead of as-casts --- src/runtime/node/memory_pressure.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/runtime/node/memory_pressure.rs b/src/runtime/node/memory_pressure.rs index 9f1827d18e1..7e1a022a566 100644 --- a/src/runtime/node/memory_pressure.rs +++ b/src/runtime/node/memory_pressure.rs @@ -48,7 +48,7 @@ pub fn emit(global: &JSGlobalObject, lvl: i32) { }; // SAFETY: `global` is the live per-thread global; the C++ side handles // the "no listeners" case via `hasEventListeners`. - unsafe { Process__emitMemoryPressureEvent(global as *const _ as *mut _, lvl) }; + unsafe { Process__emitMemoryPressureEvent(core::ptr::from_ref(global).cast_mut(), lvl) }; } // ──────────────────────────────────────────────────────────────────────────── @@ -126,7 +126,7 @@ mod posix { let ctx: EventLoopCtx = unsafe { VirtualMachine::event_loop_ctx(vm) }; let watcher = bun_core::heap::into_raw(Box::new(MemoryPressureWatcher { - global: global as *const _ as *mut _, + global: ptr::from_ref(global).cast_mut(), poll: ptr::null_mut(), registered: false, })); @@ -339,7 +339,7 @@ mod windows { // SAFETY: `watcher` is a freshly allocated, uv-initialised struct. unsafe { (*watcher).async_.data = watcher.cast(); - (*watcher).global = global as *const _ as *mut _; + (*watcher).global = ptr::from_ref(global).cast_mut(); (*watcher).notify = ptr::null_mut(); (*watcher).wait = ptr::null_mut(); ptr::addr_of_mut!((*watcher).pending_level).write(AtomicI32::new(0)); From a90554e01312c734d32dd6346d5bc070e928cd65 Mon Sep 17 00:00:00 2001 From: robobun <117481402+robobun@users.noreply.github.com> Date: Mon, 22 Jun 2026 16:12:07 +0000 Subject: [PATCH 04/15] Address review: Windows WT_EXECUTEONLYONCE + holdoff, cgroup path, install-state hook Windows: the low-memory notification handle is level-triggered, so a recurring thread-pool wait would spin while the condition persists. Use WT_EXECUTEONLYONCE and a 30s uv_timer holdoff before re-arming. Also keeps the MaybeUninit pointer typed for the uv_async_init failure path and chains uv_close through both handles before freeing. Linux: build the cgroup fallback path from the 0:: line in /proc/self/cgroup instead of hard-coding /sys/fs/cgroup/memory.pressure, so nested (non-namespaced) cgroups resolve to the correct file. Tests: add isMemoryPressureWatcherInstalled() to bun:internal-for-testing so the arm/disarm test observes the native install/uninstall path directly rather than relying on listener-count filtering in the emit helper. --- src/js/internal-for-testing.ts | 6 + src/jsc/bindings/InternalForTesting.cpp | 8 + src/jsc/bindings/InternalForTesting.h | 1 + src/runtime/node/memory_pressure.rs | 222 +++++++++++++----- .../process/process-memory-pressure.test.ts | 19 +- 5 files changed, 194 insertions(+), 62 deletions(-) diff --git a/src/js/internal-for-testing.ts b/src/js/internal-for-testing.ts index 3d080a6aef2..f75ee408b8b 100644 --- a/src/js/internal-for-testing.ts +++ b/src/js/internal-for-testing.ts @@ -296,6 +296,12 @@ export const emitMemoryPressure: (level: "warning" | "critical") => void = $newC 1, ); +export const isMemoryPressureWatcherInstalled: () => boolean = $newCppFunction( + "InternalForTesting.cpp", + "jsFunction_isMemoryPressureWatcherInstalled", + 0, +); + export const getEventLoopStats: () => { activeTasks: number; concurrentRef: number; numPolls: number } = $newZigFunction("event_loop.zig", "getActiveTasks", 0); diff --git a/src/jsc/bindings/InternalForTesting.cpp b/src/jsc/bindings/InternalForTesting.cpp index 31901bdc5cf..69121a0e5b2 100644 --- a/src/jsc/bindings/InternalForTesting.cpp +++ b/src/jsc/bindings/InternalForTesting.cpp @@ -102,6 +102,7 @@ JSC_DEFINE_HOST_FUNCTION(jsFunction_BunString_toThreadSafeRefCountDelta, (JSC::J } extern "C" void Bun__MemoryPressure__emit(JSC::JSGlobalObject* global, int level); +extern "C" bool Bun__MemoryPressure__isInstalled(JSC::JSGlobalObject* global); // Synthetically fire process.on("memoryPressure") so tests can exercise the // emit path without depending on real OS memory pressure. @@ -116,4 +117,11 @@ JSC_DEFINE_HOST_FUNCTION(jsFunction_emitMemoryPressure, (JSC::JSGlobalObject * g return encodedJSUndefined(); } +// Whether the per-VM memory-pressure watcher is currently installed, so tests +// can observe that process.on/off actually arm/disarm the OS backend. +JSC_DEFINE_HOST_FUNCTION(jsFunction_isMemoryPressureWatcherInstalled, (JSC::JSGlobalObject * globalObject, JSC::CallFrame* callFrame)) +{ + return JSValue::encode(jsBoolean(Bun__MemoryPressure__isInstalled(defaultGlobalObject(globalObject)))); +} + } diff --git a/src/jsc/bindings/InternalForTesting.h b/src/jsc/bindings/InternalForTesting.h index 31c568c7cf4..e16a2a98b6c 100644 --- a/src/jsc/bindings/InternalForTesting.h +++ b/src/jsc/bindings/InternalForTesting.h @@ -12,5 +12,6 @@ JSC_DECLARE_HOST_FUNCTION(jsFunction_isASANEnabled); JSC_DECLARE_HOST_FUNCTION(jsFunction_BunString_toThreadSafeRefCountDelta); JSC_DECLARE_HOST_FUNCTION(jsFunction_lowercaseHeaderNameSIMD); JSC_DECLARE_HOST_FUNCTION(jsFunction_emitMemoryPressure); +JSC_DECLARE_HOST_FUNCTION(jsFunction_isMemoryPressureWatcherInstalled); } diff --git a/src/runtime/node/memory_pressure.rs b/src/runtime/node/memory_pressure.rs index 7e1a022a566..da186987670 100644 --- a/src/runtime/node/memory_pressure.rs +++ b/src/runtime/node/memory_pressure.rs @@ -82,6 +82,37 @@ mod posix { vm.rare_data().memory_pressure_watcher_slot() } + /// Build `/sys/fs/cgroup//memory.pressure` from the cgroup v2 + /// entry in `/proc/self/cgroup`. Inside a cgroup namespace the entry is + /// `0::/`, which yields `/sys/fs/cgroup/memory.pressure` (the container's + /// delegated root). Outside a namespace the path may be a systemd slice. + #[cfg(any(target_os = "linux", target_os = "android"))] + fn own_cgroup_pressure_path(buf: &mut [u8]) -> Option<&bun_core::ZStr> { + use bun_sys::O; + let fd = bun_sys::open(bun_core::zstr!("/proc/self/cgroup"), O::RDONLY, 0).ok()?; + let mut read = [0u8; 256]; + let n = bun_sys::read(fd, &mut read).unwrap_or(0); + let _ = bun_sys::close(fd); + // cgroup v2 line: "0::\n". v1 lines are "N:controllers:"; + // we only want the unified hierarchy. + for line in read[..n].split(|&b| b == b'\n') { + let Some(rest) = line.strip_prefix(b"0::") else { + continue; + }; + let rest = rest.strip_prefix(b"/").unwrap_or(rest); + return bun_core::fmt::buf_print_z( + buf, + format_args!( + "/sys/fs/cgroup/{}{}memory.pressure", + rest.escape_ascii(), + if rest.is_empty() { "" } else { "/" } + ), + ) + .ok(); + } + None + } + /// Open a PSI memory file and write a trigger. Tries the system-wide /// `/proc/pressure/memory` first, then the current cgroup's /// `memory.pressure` (relevant inside containers that can't write the @@ -94,11 +125,12 @@ mod posix { /// for unprivileged PSI triggers (kernel 6.6+). const TRIGGER: &[u8] = b"some 150000 2000000"; - let paths: [&bun_core::ZStr; 2] = [ - bun_core::zstr!("/proc/pressure/memory"), - bun_core::zstr!("/sys/fs/cgroup/memory.pressure"), + let mut cgroup_buf = [0u8; 320]; + let paths: [Option<&bun_core::ZStr>; 2] = [ + Some(bun_core::zstr!("/proc/pressure/memory")), + own_cgroup_pressure_path(&mut cgroup_buf), ]; - for path in paths { + for path in paths.into_iter().flatten() { let fd = match bun_sys::open(path, O::RDWR | O::NONBLOCK | O::CLOEXEC, 0) { Ok(fd) => fd, Err(_) => continue, @@ -227,6 +259,7 @@ mod posix { #[cfg(windows)] mod windows { use core::ffi::c_void; + use core::mem::MaybeUninit; use core::ptr::{self, NonNull}; use core::sync::atomic::{AtomicI32, Ordering}; @@ -240,9 +273,13 @@ mod windows { const INVALID_HANDLE_VALUE: HANDLE = usize::MAX as HANDLE; /// `LowMemoryResourceNotification` enum value. const LOW_MEMORY_RESOURCE_NOTIFICATION: i32 = 0; - /// `WT_EXECUTEDEFAULT` — run the callback on a normal thread-pool thread, - /// re-arm after each fire. - const WT_EXECUTEDEFAULT: ULONG = 0; + /// The notification handle is level-triggered: it stays signalled for as + /// long as memory is low. A recurring wait would spin the thread pool, so + /// each registration fires once and the JS thread re-arms after a holdoff. + const WT_EXECUTEONLYONCE: ULONG = 0x00000008; + /// Minimum gap between re-arming the wait, to avoid firing on every loop + /// tick while the low-memory condition persists. + const HOLDOFF_MS: u64 = 30_000; unsafe extern "system" { fn CreateMemoryResourceNotification(kind: i32) -> HANDLE; @@ -260,13 +297,16 @@ mod windows { #[repr(C)] struct MemoryPressureWatcher { - /// `uv_async_t` must come first so `async_.data` → `*mut Self` works in - /// the close callback. + /// Must come first so the `uv_close` callback can recover `*mut Self`. async_: libuv::uv_async_t, + /// Holdoff before re-arming the wait after a fire. + holdoff: libuv::uv_timer_t, global: *mut JSGlobalObject, /// Signalled by the kernel when available memory is low. notify: HANDLE, - /// Thread-pool wait registration. + /// Thread-pool wait registration. Only ever written on the JS thread; + /// read on the thread-pool thread only while the wait it names is + /// armed. Null between fires while the holdoff timer is pending. wait: HANDLE, /// Set from the thread-pool callback; read on the JS thread. pending_level: AtomicI32, @@ -276,6 +316,27 @@ mod windows { vm.rare_data().memory_pressure_watcher_slot() } + /// Arm a one-shot thread-pool wait on `notify`. JS thread only. + /// + /// SAFETY: `watcher` is live and `(*watcher).notify` is a valid handle. + unsafe fn arm_wait(watcher: *mut MemoryPressureWatcher) { + let mut wait: HANDLE = ptr::null_mut(); + // SAFETY: `watcher` outlives the wait (uninstall blocks on + // `UnregisterWaitEx(.., INVALID_HANDLE_VALUE)` before freeing). + let ok = unsafe { + RegisterWaitForSingleObject( + &mut wait, + (*watcher).notify, + wait_callback, + watcher.cast(), + u32::MAX, + WT_EXECUTEONLYONCE, + ) + }; + // SAFETY: sole writer of `wait` on the JS thread. + unsafe { (*watcher).wait = if ok != 0 { wait } else { ptr::null_mut() } }; + } + /// Runs on a Windows thread-pool thread. May only touch `pending_level` /// and `uv_async_send` (which is documented thread-safe). unsafe extern "system" fn wait_callback(ctx: *mut c_void, _timed_out: u8) { @@ -293,18 +354,63 @@ mod windows { /// Runs on the JS thread. unsafe extern "C" fn on_async(handle: *mut libuv::uv_async_t) { // SAFETY: `data` was set to the watcher in `install`. - let watcher = unsafe { &*(*handle).data.cast::() }; - let lvl = watcher.pending_level.swap(0, Ordering::SeqCst); - if lvl != 0 { - // SAFETY: `global` is the live per-thread global captured at install. - super::emit(unsafe { &*watcher.global }, lvl); + let watcher = unsafe { (*handle).data.cast::() }; + // SAFETY: `watcher` is live; JS thread. + let lvl = unsafe { (*watcher).pending_level.swap(0, Ordering::SeqCst) }; + if lvl == 0 { + return; + } + // SAFETY: `global` is the live per-thread global captured at install. + super::emit(unsafe { &*(*watcher).global }, lvl); + + // The one-shot wait has fired; release its handle and start the + // holdoff before re-arming. The `WT_EXECUTEONLYONCE` callback has + // already returned (it posted the async that woke us), so a + // blocking unregister here does not deadlock. + // SAFETY: JS thread; `watcher` is live. + unsafe { + let wait = core::mem::replace(&mut (*watcher).wait, ptr::null_mut()); + if !wait.is_null() { + UnregisterWaitEx(wait, INVALID_HANDLE_VALUE); + } + if !(*watcher).notify.is_null() { + let _ = libuv::uv_timer_start( + ptr::addr_of_mut!((*watcher).holdoff), + Some(on_holdoff), + HOLDOFF_MS, + 0, + ); + } } } - extern "C" fn free_on_close(handle: *mut libuv::uv_handle_t) { + /// Runs on the JS thread when the holdoff timer expires. + unsafe extern "C" fn on_holdoff(handle: *mut libuv::uv_timer_t) { + // SAFETY: `data` was set to the watcher in `install`. + let watcher = unsafe { (*handle).data.cast::() }; + // SAFETY: JS thread; `watcher` is live; `notify` is valid (checked before the + // timer was started). + unsafe { arm_wait(watcher) }; + } + + extern "C" fn on_async_closed(handle: *mut libuv::uv_handle_t) { // SAFETY: `handle` is the leading `uv_async_t` field of a heap-allocated - // `MemoryPressureWatcher`; uv_close guarantees no further use. - drop(unsafe { bun_core::heap::take(handle.cast::()) }); + // `MemoryPressureWatcher`; the timer is the only other uv handle and is + // closed next, with the final free in its close callback. + let watcher = handle.cast::(); + // SAFETY: `watcher` is live until `on_holdoff_closed` frees it. + unsafe { + libuv::uv_close( + ptr::addr_of_mut!((*watcher).holdoff).cast(), + Some(on_holdoff_closed), + ) + }; + } + + extern "C" fn on_holdoff_closed(handle: *mut libuv::uv_handle_t) { + // SAFETY: `data` is the watcher pointer; both uv handles are now + // fully closed, so the allocation is unreferenced. + unsafe { bun_core::heap::destroy((*handle).data.cast::()) }; } pub(super) fn install(global: &JSGlobalObject) { @@ -315,30 +421,29 @@ mod windows { return; } - // SAFETY: the allocation is fully written by uv_async_init + field stores below. - let watcher: *mut MemoryPressureWatcher = - bun_core::heap::into_raw(Box::::new_uninit()).cast(); + let uv_loop = global.bun_vm().uv_loop(); + let uninit: *mut MaybeUninit = + bun_core::heap::into_raw(Box::::new_uninit()); + let watcher: *mut MemoryPressureWatcher = uninit.cast(); // SAFETY: `watcher.async_` is a valid `uv_async_t`-sized slot; `uv_loop` // is the VM's live libuv loop. let rc = unsafe { - libuv::uv_async_init( - global.bun_vm().uv_loop(), - ptr::addr_of_mut!((*watcher).async_), - Some(on_async), - ) + libuv::uv_async_init(uv_loop, ptr::addr_of_mut!((*watcher).async_), Some(on_async)) }; if rc != 0 { - // SAFETY: never handed out; just free the raw allocation. - drop(unsafe { bun_core::heap::take(watcher) }); + // SAFETY: allocation is still `MaybeUninit`; never handed out. + drop(unsafe { bun_core::heap::take(uninit) }); return; } - // SAFETY: `async_` is an initialized, active handle. - unsafe { libuv::uv_unref(ptr::addr_of_mut!((*watcher).async_).cast()) }; - - // SAFETY: `watcher` is a freshly allocated, uv-initialised struct. + // SAFETY: `holdoff` is a valid `uv_timer_t`-sized slot; uv_timer_init + // cannot fail on a live loop. Remaining fields are plain POD. unsafe { + let _ = libuv::uv_timer_init(uv_loop, ptr::addr_of_mut!((*watcher).holdoff)); + libuv::uv_unref(ptr::addr_of_mut!((*watcher).async_).cast()); + libuv::uv_unref(ptr::addr_of_mut!((*watcher).holdoff).cast()); (*watcher).async_.data = watcher.cast(); + (*watcher).holdoff.data = watcher.cast(); (*watcher).global = ptr::from_ref(global).cast_mut(); (*watcher).notify = ptr::null_mut(); (*watcher).wait = ptr::null_mut(); @@ -348,29 +453,10 @@ mod windows { // SAFETY: Win32 call; returns NULL on failure. let notify = unsafe { CreateMemoryResourceNotification(LOW_MEMORY_RESOURCE_NOTIFICATION) }; if !notify.is_null() { - let mut wait: HANDLE = ptr::null_mut(); - // SAFETY: `notify` is a valid notification handle; `watcher` - // outlives the wait (guaranteed by the blocking `UnregisterWaitEx` - // in `uninstall`). - let ok = unsafe { - RegisterWaitForSingleObject( - &mut wait, - notify, - wait_callback, - watcher.cast(), - u32::MAX, - WT_EXECUTEDEFAULT, - ) - }; - if ok != 0 { - // SAFETY: sole owner of `watcher`. - unsafe { - (*watcher).notify = notify; - (*watcher).wait = wait; - } - } else { - // SAFETY: `notify` owned here; never registered. - unsafe { CloseHandle(notify) }; + // SAFETY: sole owner of `watcher`; `notify` is valid. + unsafe { + (*watcher).notify = notify; + arm_wait(watcher); } } @@ -388,17 +474,24 @@ mod windows { // SAFETY: `watcher` is the live allocation from `install`. unsafe { + let _ = libuv::uv_timer_stop(ptr::addr_of_mut!((*watcher).holdoff)); if !(*watcher).wait.is_null() { - // `INVALID_HANDLE_VALUE` blocks until any in-flight callback returns, - // so `watcher` is guaranteed unreferenced by the thread pool after this. + // `INVALID_HANDLE_VALUE` blocks until any in-flight callback + // returns, so `watcher` is guaranteed unreferenced by the + // thread pool afterwards. The handle came from a successful + // `RegisterWaitForSingleObject`, so this cannot fail with + // `ERROR_INVALID_HANDLE`. UnregisterWaitEx((*watcher).wait, INVALID_HANDLE_VALUE); + (*watcher).wait = ptr::null_mut(); } if !(*watcher).notify.is_null() { CloseHandle((*watcher).notify); + (*watcher).notify = ptr::null_mut(); } + // Close async → its callback closes holdoff → that callback frees. libuv::uv_close( ptr::addr_of_mut!((*watcher).async_).cast(), - Some(free_on_close), + Some(on_async_closed), ); } } @@ -431,5 +524,18 @@ pub extern "C" fn Bun__MemoryPressure__emit(global: &JSGlobalObject, lvl: i32) { emit(global, lvl); } +/// For `bun:internal-for-testing`: whether the per-VM watcher is currently +/// installed (i.e. the `RareData` slot is populated). Lets tests observe +/// arm/disarm without depending on a real OS notification. +#[unsafe(no_mangle)] +pub extern "C" fn Bun__MemoryPressure__isInstalled(global: &JSGlobalObject) -> bool { + let vm = global.bun_vm_ptr(); + // SAFETY: same-thread VM access (asserted by `bun_vm_ptr`). + unsafe { &mut *vm } + .rare_data() + .memory_pressure_watcher_slot() + .is_some() +} + #[cfg(not(windows))] pub use posix::on_poll; diff --git a/test/js/node/process/process-memory-pressure.test.ts b/test/js/node/process/process-memory-pressure.test.ts index 4ec3b39addf..80d0223d7de 100644 --- a/test/js/node/process/process-memory-pressure.test.ts +++ b/test/js/node/process/process-memory-pressure.test.ts @@ -34,28 +34,39 @@ describe.concurrent("process.on('memoryPressure')", () => { expect(exitCode).toBe(0); }); - test("disarms when the last listener is removed", async () => { + test("arms on first listener and disarms on last removal", async () => { const { stdout, stderr, exitCode } = await run(/* js */ ` - const { emitMemoryPressure } = require("bun:internal-for-testing"); + const { emitMemoryPressure, isMemoryPressureWatcherInstalled } = require("bun:internal-for-testing"); const seen = []; + const installed = []; const a = level => seen.push("a:" + level); const b = level => seen.push("b:" + level); + installed.push(isMemoryPressureWatcherInstalled()); // false: no listeners yet process.on("memoryPressure", a); + installed.push(isMemoryPressureWatcherInstalled()); // true: first listener armed it process.on("memoryPressure", b); + installed.push(isMemoryPressureWatcherInstalled()); // true: still armed emitMemoryPressure("warning"); process.off("memoryPressure", a); + installed.push(isMemoryPressureWatcherInstalled()); // true: one listener left emitMemoryPressure("critical"); process.off("memoryPressure", b); + installed.push(isMemoryPressureWatcherInstalled()); // false: last listener removed // No listeners registered; emit should be a no-op. emitMemoryPressure("critical"); // Re-arm and emit again to prove the watcher can be reinstalled. process.on("memoryPressure", a); + installed.push(isMemoryPressureWatcherInstalled()); // true: re-armed emitMemoryPressure("warning"); process.off("memoryPressure", a); - process.stdout.write(JSON.stringify(seen)); + installed.push(isMemoryPressureWatcherInstalled()); // false: disarmed again + process.stdout.write(JSON.stringify({ seen, installed })); `); expect({ stdout, stderr: stderr.trim() }).toEqual({ - stdout: JSON.stringify(["a:warning", "b:warning", "b:critical", "a:warning"]), + stdout: JSON.stringify({ + seen: ["a:warning", "b:warning", "b:critical", "a:warning"], + installed: [false, true, true, true, false, true, false], + }), stderr: "", }); expect(exitCode).toBe(0); From 92fdb8d3c1308e05059244f9219972ff2fc8fc52 Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Mon, 22 Jun 2026 16:14:22 +0000 Subject: [PATCH 05/15] [autofix.ci] apply automated fixes --- src/runtime/node/memory_pressure.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/runtime/node/memory_pressure.rs b/src/runtime/node/memory_pressure.rs index da186987670..f6bb09ab18d 100644 --- a/src/runtime/node/memory_pressure.rs +++ b/src/runtime/node/memory_pressure.rs @@ -429,7 +429,11 @@ mod windows { // SAFETY: `watcher.async_` is a valid `uv_async_t`-sized slot; `uv_loop` // is the VM's live libuv loop. let rc = unsafe { - libuv::uv_async_init(uv_loop, ptr::addr_of_mut!((*watcher).async_), Some(on_async)) + libuv::uv_async_init( + uv_loop, + ptr::addr_of_mut!((*watcher).async_), + Some(on_async), + ) }; if rc != 0 { // SAFETY: allocation is still `MaybeUninit`; never handed out. From 8ef2cb07e73564c3de6b4ce4b8138d57e78fe15f Mon Sep 17 00:00:00 2001 From: robobun <117481402+robobun@users.noreply.github.com> Date: Tue, 23 Jun 2026 02:47:35 +0000 Subject: [PATCH 06/15] Windows: replace uv_async/uv_timer/threadpool wait with a dedicated thread Instead of RegisterWaitForSingleObject + uv_async_t + uv_timer_t, spawn a single dedicated thread that blocks on WaitForMultipleObjects over the low-memory notification handle and a shutdown event. When the notification fires, the thread posts a MemoryPressureTask (which carries only the packed level, no pointer) via the lock-free concurrent task queue, then waits 30s on the shutdown event alone before re-checking (the notification handle is level-triggered and stays signalled while memory is low). Uninstall signals the shutdown event and joins the thread before closing handles. Because the task carries no pointer into the watcher, any task enqueued before the join that runs afterwards is harmless. This removes all libuv handle management and the NT thread-pool wait, and cuts the amount of unsafe roughly in half. --- src/event_loop/ConcurrentTask.rs | 1 + src/runtime/dispatch.rs | 6 +- src/runtime/node/memory_pressure.rs | 303 ++++++++++------------------ 3 files changed, 115 insertions(+), 195 deletions(-) diff --git a/src/event_loop/ConcurrentTask.rs b/src/event_loop/ConcurrentTask.rs index 46ee11ebc2f..b4e1c3e635d 100644 --- a/src/event_loop/ConcurrentTask.rs +++ b/src/event_loop/ConcurrentTask.rs @@ -117,6 +117,7 @@ pub mod task_tag { Open, PollPendingModulesTask, PosixSignalTask, + MemoryPressureTask, ProcessWaiterThreadTask, Read, Readdir, diff --git a/src/runtime/dispatch.rs b/src/runtime/dispatch.rs index 09c8375997c..d26b535dd37 100644 --- a/src/runtime/dispatch.rs +++ b/src/runtime/dispatch.rs @@ -424,6 +424,10 @@ pub fn run_task( global, ); } + task_tag::MemoryPressureTask => { + // `ptr` is the packed level (NOTE_MEMORYSTATUS_PRESSURE_* bits), not a pointer. + crate::node::memory_pressure::emit(global, task.ptr as usize as i32); + } task_tag::NativePromiseContextDeferredDerefTask => { // `ptr` packs an int, not a pointer. NativePromiseContextDeferredDerefTask::run_from_js_thread(task.ptr as usize); @@ -579,7 +583,7 @@ fn run_task_cold(task: Task) { /// Compile-time guard that the arm count above tracks /// `bun_event_loop::task_tag::COUNT`. Bump when adding a variant. const _: () = assert!( - task_tag::COUNT == 96, + task_tag::COUNT == 97, "dispatch::run_task arm count out of sync with bun_event_loop::task_tag", ); diff --git a/src/runtime/node/memory_pressure.rs b/src/runtime/node/memory_pressure.rs index f6bb09ab18d..d645a516aba 100644 --- a/src/runtime/node/memory_pressure.rs +++ b/src/runtime/node/memory_pressure.rs @@ -12,9 +12,10 @@ //! signal via `POLLPRI`. Requires `CAP_SYS_RESOURCE` before kernel 6.6, //! and PSI enabled (`CONFIG_PSI=y`). If neither path can be opened for //! writing, the watcher silently does nothing. -//! - Windows: `CreateMemoryResourceNotification(LowMemoryResourceNotification)` -//! waited on a thread-pool thread via `RegisterWaitForSingleObject`; the -//! callback posts back to the JS thread through a `uv_async_t`. +//! - Windows: a dedicated thread blocks on +//! `CreateMemoryResourceNotification(LowMemoryResourceNotification)` and +//! posts a `ConcurrentTask` back to the JS event loop when it signals, +//! with a 30 s holdoff before re-waiting (the handle is level-triggered). //! //! Armed lazily on the first listener and disarmed on the last removal via //! `onDidChangeListeners` in `BunProcess.cpp`, matching how signal handlers @@ -253,217 +254,143 @@ mod posix { } // ──────────────────────────────────────────────────────────────────────────── -// Windows backend: CreateMemoryResourceNotification + RegisterWaitForSingleObject +// Windows backend: CreateMemoryResourceNotification on a dedicated thread // ──────────────────────────────────────────────────────────────────────────── #[cfg(windows)] mod windows { use core::ffi::c_void; - use core::mem::MaybeUninit; use core::ptr::{self, NonNull}; - use core::sync::atomic::{AtomicI32, Ordering}; + use bun_event_loop::ConcurrentTask::{ConcurrentTask, Task, task_tag}; use bun_jsc::JSGlobalObject; use bun_jsc::virtual_machine::VirtualMachine; - use bun_sys::windows::libuv; type HANDLE = *mut c_void; type BOOL = i32; - type ULONG = u32; - const INVALID_HANDLE_VALUE: HANDLE = usize::MAX as HANDLE; + type DWORD = u32; + const WAIT_OBJECT_0: DWORD = 0; /// `LowMemoryResourceNotification` enum value. const LOW_MEMORY_RESOURCE_NOTIFICATION: i32 = 0; - /// The notification handle is level-triggered: it stays signalled for as - /// long as memory is low. A recurring wait would spin the thread pool, so - /// each registration fires once and the JS thread re-arms after a holdoff. - const WT_EXECUTEONLYONCE: ULONG = 0x00000008; - /// Minimum gap between re-arming the wait, to avoid firing on every loop - /// tick while the low-memory condition persists. - const HOLDOFF_MS: u64 = 30_000; + /// The low-memory notification handle is level-triggered: it stays + /// signalled while the condition holds. After posting one event we wait + /// on `shutdown` alone for this long before re-checking `notify`, so a + /// sustained low-memory state fires at most once every 30 s. + const HOLDOFF_MS: DWORD = 30_000; unsafe extern "system" { fn CreateMemoryResourceNotification(kind: i32) -> HANDLE; - fn RegisterWaitForSingleObject( - out_wait: *mut HANDLE, - handle: HANDLE, - callback: unsafe extern "system" fn(ctx: *mut c_void, timed_out: u8), - ctx: *mut c_void, - millis: ULONG, - flags: ULONG, - ) -> BOOL; - fn UnregisterWaitEx(wait: HANDLE, completion_event: HANDLE) -> BOOL; + fn CreateEventW(attrs: *mut c_void, manual_reset: BOOL, initial: BOOL, name: *const u16) + -> HANDLE; + fn SetEvent(h: HANDLE) -> BOOL; + fn WaitForSingleObject(h: HANDLE, ms: DWORD) -> DWORD; + fn WaitForMultipleObjects(n: DWORD, h: *const HANDLE, wait_all: BOOL, ms: DWORD) -> DWORD; fn CloseHandle(h: HANDLE) -> BOOL; } - #[repr(C)] + /// Per-VM watcher. Stored type-erased in `RareData.memory_pressure_watcher`. struct MemoryPressureWatcher { - /// Must come first so the `uv_close` callback can recover `*mut Self`. - async_: libuv::uv_async_t, - /// Holdoff before re-arming the wait after a fire. - holdoff: libuv::uv_timer_t, - global: *mut JSGlobalObject, - /// Signalled by the kernel when available memory is low. + /// Signalled by the kernel while available memory is low. notify: HANDLE, - /// Thread-pool wait registration. Only ever written on the JS thread; - /// read on the thread-pool thread only while the wait it names is - /// armed. Null between fires while the holdoff timer is pending. - wait: HANDLE, - /// Set from the thread-pool callback; read on the JS thread. - pending_level: AtomicI32, - } - - fn slot(vm: &mut VirtualMachine) -> &mut Option> { - vm.rare_data().memory_pressure_watcher_slot() - } - - /// Arm a one-shot thread-pool wait on `notify`. JS thread only. - /// - /// SAFETY: `watcher` is live and `(*watcher).notify` is a valid handle. - unsafe fn arm_wait(watcher: *mut MemoryPressureWatcher) { - let mut wait: HANDLE = ptr::null_mut(); - // SAFETY: `watcher` outlives the wait (uninstall blocks on - // `UnregisterWaitEx(.., INVALID_HANDLE_VALUE)` before freeing). - let ok = unsafe { - RegisterWaitForSingleObject( - &mut wait, - (*watcher).notify, - wait_callback, - watcher.cast(), - u32::MAX, - WT_EXECUTEONLYONCE, - ) - }; - // SAFETY: sole writer of `wait` on the JS thread. - unsafe { (*watcher).wait = if ok != 0 { wait } else { ptr::null_mut() } }; - } - - /// Runs on a Windows thread-pool thread. May only touch `pending_level` - /// and `uv_async_send` (which is documented thread-safe). - unsafe extern "system" fn wait_callback(ctx: *mut c_void, _timed_out: u8) { - // SAFETY: `ctx` is the watcher pointer passed at registration; the - // wait is unregistered with `INVALID_HANDLE_VALUE` before we free it, - // so it outlives every callback. - let watcher = unsafe { &*ctx.cast::() }; - watcher - .pending_level - .store(super::level::CRITICAL, Ordering::SeqCst); - // SAFETY: `async_` was uv_async_init'd on the JS loop; uv_async_send is thread-safe. - let _ = unsafe { libuv::uv_async_send(ptr::from_ref(&watcher.async_).cast_mut()) }; + /// Manual-reset event signalled by `uninstall` to wake the thread. + shutdown: HANDLE, + thread: Option>, } - /// Runs on the JS thread. - unsafe extern "C" fn on_async(handle: *mut libuv::uv_async_t) { - // SAFETY: `data` was set to the watcher in `install`. - let watcher = unsafe { (*handle).data.cast::() }; - // SAFETY: `watcher` is live; JS thread. - let lvl = unsafe { (*watcher).pending_level.swap(0, Ordering::SeqCst) }; - if lvl == 0 { - return; - } - // SAFETY: `global` is the live per-thread global captured at install. - super::emit(unsafe { &*(*watcher).global }, lvl); - - // The one-shot wait has fired; release its handle and start the - // holdoff before re-arming. The `WT_EXECUTEONLYONCE` callback has - // already returned (it posted the async that woke us), so a - // blocking unregister here does not deadlock. - // SAFETY: JS thread; `watcher` is live. - unsafe { - let wait = core::mem::replace(&mut (*watcher).wait, ptr::null_mut()); - if !wait.is_null() { - UnregisterWaitEx(wait, INVALID_HANDLE_VALUE); - } - if !(*watcher).notify.is_null() { - let _ = libuv::uv_timer_start( - ptr::addr_of_mut!((*watcher).holdoff), - Some(on_holdoff), - HOLDOFF_MS, - 0, - ); + impl Drop for MemoryPressureWatcher { + fn drop(&mut self) { + // SAFETY: handles were created by the kernel and are owned here; + // the thread has been joined so nothing references them. + unsafe { + CloseHandle(self.shutdown); + CloseHandle(self.notify); } } } - /// Runs on the JS thread when the holdoff timer expires. - unsafe extern "C" fn on_holdoff(handle: *mut libuv::uv_timer_t) { - // SAFETY: `data` was set to the watcher in `install`. - let watcher = unsafe { (*handle).data.cast::() }; - // SAFETY: JS thread; `watcher` is live; `notify` is valid (checked before the - // timer was started). - unsafe { arm_wait(watcher) }; - } - - extern "C" fn on_async_closed(handle: *mut libuv::uv_handle_t) { - // SAFETY: `handle` is the leading `uv_async_t` field of a heap-allocated - // `MemoryPressureWatcher`; the timer is the only other uv handle and is - // closed next, with the final free in its close callback. - let watcher = handle.cast::(); - // SAFETY: `watcher` is live until `on_holdoff_closed` frees it. - unsafe { - libuv::uv_close( - ptr::addr_of_mut!((*watcher).holdoff).cast(), - Some(on_holdoff_closed), - ) - }; + fn slot(vm: &mut VirtualMachine) -> &mut Option> { + vm.rare_data().memory_pressure_watcher_slot() } - extern "C" fn on_holdoff_closed(handle: *mut libuv::uv_handle_t) { - // SAFETY: `data` is the watcher pointer; both uv handles are now - // fully closed, so the allocation is unreferenced. - unsafe { bun_core::heap::destroy((*handle).data.cast::()) }; + /// Blocks on `[shutdown, notify]` and posts a `MemoryPressureTask` to the + /// JS event loop when `notify` fires. Handles are passed as usize since + /// `HANDLE` (`*mut c_void`) is not `Send`. + fn thread_main(vm_addr: usize, notify: usize, shutdown: usize) { + bun_core::output::Source::configure_named_thread(bun_core::zstr!("MemoryPressure")); + let handles: [HANDLE; 2] = [shutdown as HANDLE, notify as HANDLE]; + loop { + // SAFETY: both handles are valid for the thread's lifetime + // (`uninstall` joins before closing them). + let rc = unsafe { WaitForMultipleObjects(2, handles.as_ptr(), 0, u32::MAX) }; + match rc { + WAIT_OBJECT_0 => break, + r if r == WAIT_OBJECT_0 + 1 => { + let task = ConcurrentTask::create(Task::new( + task_tag::MemoryPressureTask, + super::level::CRITICAL as usize as *mut (), + )); + // SAFETY: `vm_addr` is the main-thread VM captured at + // install; it lives for the process. + // `enqueue_task_concurrent` is the documented thread-safe + // entry point (lock-free MPSC push + loop wakeup). + unsafe { &*(vm_addr as *const VirtualMachine) } + .event_loop_shared() + .enqueue_task_concurrent(task); + // Holdoff on `shutdown` only: `notify` stays signalled + // while memory is low, so waiting on it again would spin. + // SAFETY: `shutdown` is valid for the thread's lifetime. + if unsafe { WaitForSingleObject(handles[0], HOLDOFF_MS) } == WAIT_OBJECT_0 { + break; + } + } + _ => break, + } + } } pub(super) fn install(global: &JSGlobalObject) { let vm = global.bun_vm_ptr(); - // SAFETY: same-thread VM access. - let vm_ref = unsafe { &mut *vm }; - if slot(vm_ref).is_some() { + // SAFETY: same-thread VM access (asserted by `bun_vm_ptr`). + if slot(unsafe { &mut *vm }).is_some() { return; } - let uv_loop = global.bun_vm().uv_loop(); - let uninit: *mut MaybeUninit = - bun_core::heap::into_raw(Box::::new_uninit()); - let watcher: *mut MemoryPressureWatcher = uninit.cast(); - - // SAFETY: `watcher.async_` is a valid `uv_async_t`-sized slot; `uv_loop` - // is the VM's live libuv loop. - let rc = unsafe { - libuv::uv_async_init( - uv_loop, - ptr::addr_of_mut!((*watcher).async_), - Some(on_async), - ) - }; - if rc != 0 { - // SAFETY: allocation is still `MaybeUninit`; never handed out. - drop(unsafe { bun_core::heap::take(uninit) }); + // SAFETY: Win32 calls; both return NULL on failure. + let notify = unsafe { CreateMemoryResourceNotification(LOW_MEMORY_RESOURCE_NOTIFICATION) }; + if notify.is_null() { return; } - // SAFETY: `holdoff` is a valid `uv_timer_t`-sized slot; uv_timer_init - // cannot fail on a live loop. Remaining fields are plain POD. - unsafe { - let _ = libuv::uv_timer_init(uv_loop, ptr::addr_of_mut!((*watcher).holdoff)); - libuv::uv_unref(ptr::addr_of_mut!((*watcher).async_).cast()); - libuv::uv_unref(ptr::addr_of_mut!((*watcher).holdoff).cast()); - (*watcher).async_.data = watcher.cast(); - (*watcher).holdoff.data = watcher.cast(); - (*watcher).global = ptr::from_ref(global).cast_mut(); - (*watcher).notify = ptr::null_mut(); - (*watcher).wait = ptr::null_mut(); - ptr::addr_of_mut!((*watcher).pending_level).write(AtomicI32::new(0)); + // SAFETY: manual-reset, initially non-signalled, unnamed. + let shutdown = unsafe { CreateEventW(ptr::null_mut(), 1, 0, ptr::null()) }; + if shutdown.is_null() { + // SAFETY: `notify` is owned here. + unsafe { CloseHandle(notify) }; + return; } - // SAFETY: Win32 call; returns NULL on failure. - let notify = unsafe { CreateMemoryResourceNotification(LOW_MEMORY_RESOURCE_NOTIFICATION) }; - if !notify.is_null() { - // SAFETY: sole owner of `watcher`; `notify` is valid. - unsafe { - (*watcher).notify = notify; - arm_wait(watcher); + let (vm_addr, notify_addr, shutdown_addr) = + (vm as usize, notify as usize, shutdown as usize); + let thread = match std::thread::Builder::new() + .name("MemoryPressure".into()) + .stack_size(64 * 1024) + .spawn(move || thread_main(vm_addr, notify_addr, shutdown_addr)) + { + Ok(t) => t, + Err(_) => { + // SAFETY: both handles are owned here and were never shared. + unsafe { + CloseHandle(shutdown); + CloseHandle(notify); + } + return; } - } + }; + let watcher = bun_core::heap::into_raw(Box::new(MemoryPressureWatcher { + notify, + shutdown, + thread: Some(thread), + })); // SAFETY: same-thread VM access. *slot(unsafe { &mut *vm }) = NonNull::new(watcher.cast()); } @@ -474,30 +401,18 @@ mod windows { let Some(raw) = core::mem::take(slot(unsafe { &mut *vm })) else { return; }; - let watcher = raw.as_ptr().cast::(); - - // SAFETY: `watcher` is the live allocation from `install`. - unsafe { - let _ = libuv::uv_timer_stop(ptr::addr_of_mut!((*watcher).holdoff)); - if !(*watcher).wait.is_null() { - // `INVALID_HANDLE_VALUE` blocks until any in-flight callback - // returns, so `watcher` is guaranteed unreferenced by the - // thread pool afterwards. The handle came from a successful - // `RegisterWaitForSingleObject`, so this cannot fail with - // `ERROR_INVALID_HANDLE`. - UnregisterWaitEx((*watcher).wait, INVALID_HANDLE_VALUE); - (*watcher).wait = ptr::null_mut(); - } - if !(*watcher).notify.is_null() { - CloseHandle((*watcher).notify); - (*watcher).notify = ptr::null_mut(); - } - // Close async → its callback closes holdoff → that callback frees. - libuv::uv_close( - ptr::addr_of_mut!((*watcher).async_).cast(), - Some(on_async_closed), - ); + // SAFETY: slot was populated by `install` with a `Box`. + let mut watcher = + unsafe { bun_core::heap::take(raw.as_ptr().cast::()) }; + // SAFETY: `shutdown` is a valid manual-reset event owned by `watcher`. + unsafe { SetEvent(watcher.shutdown) }; + if let Some(thread) = watcher.thread.take() { + let _ = thread.join(); } + // Any `MemoryPressureTask` the thread enqueued before `join` carries + // only the packed level (no pointer into `watcher`), so dropping + // `watcher` here is safe regardless of queue state. `Drop` closes the + // handles. } } From 50e527631fe76569eeaaaa7129b6a43919b97ffb Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Tue, 23 Jun 2026 02:49:50 +0000 Subject: [PATCH 07/15] [autofix.ci] apply automated fixes --- src/runtime/node/memory_pressure.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/runtime/node/memory_pressure.rs b/src/runtime/node/memory_pressure.rs index d645a516aba..b3f29ae7ae3 100644 --- a/src/runtime/node/memory_pressure.rs +++ b/src/runtime/node/memory_pressure.rs @@ -280,8 +280,12 @@ mod windows { unsafe extern "system" { fn CreateMemoryResourceNotification(kind: i32) -> HANDLE; - fn CreateEventW(attrs: *mut c_void, manual_reset: BOOL, initial: BOOL, name: *const u16) - -> HANDLE; + fn CreateEventW( + attrs: *mut c_void, + manual_reset: BOOL, + initial: BOOL, + name: *const u16, + ) -> HANDLE; fn SetEvent(h: HANDLE) -> BOOL; fn WaitForSingleObject(h: HANDLE, ms: DWORD) -> DWORD; fn WaitForMultipleObjects(n: DWORD, h: *const HANDLE, wait_all: BOOL, ms: DWORD) -> DWORD; From c71d34aaf00a264f41e22190bdf8b9e72d60a056 Mon Sep 17 00:00:00 2001 From: robobun <117481402+robobun@users.noreply.github.com> Date: Tue, 23 Jun 2026 02:54:03 +0000 Subject: [PATCH 08/15] ci: retrigger (rustup download timeout on x64-asan lane) From 6718cf09ac22c55778d4d25e749cc479f1e1525e Mon Sep 17 00:00:00 2001 From: robobun <117481402+robobun@users.noreply.github.com> Date: Tue, 23 Jun 2026 03:29:01 +0000 Subject: [PATCH 09/15] Linux: tear down PSI watch on EPOLLERR; fix cgroup path escaping When the cgroup whose memory.pressure we opened is removed, kernfs reports EPOLLERR|EPOLLPRI permanently on the fd. With a level-triggered registration and no error handling the event loop would spin emitting spurious 'critical' events on every tick. on_poll now checks the FilePoll's Eof/Hup flags and unregisters + closes the fd instead of emitting when the trigger is dead. Also: splice the cgroup path via str::from_utf8 instead of escape_ascii(), which is a Debug-style escaper that doubles backslashes and would corrupt systemd-escaped unit names like machine-qemu\x2d1\x2dvm.scope. And: add prependListener/prependOnceListener type overloads for 'memoryPressure' to match the @types/node convention. --- packages/bun-types/overrides.d.ts | 2 ++ src/runtime/dispatch.rs | 6 +++- src/runtime/node/memory_pressure.rs | 43 +++++++++++++++++++++++++---- 3 files changed, 44 insertions(+), 7 deletions(-) diff --git a/packages/bun-types/overrides.d.ts b/packages/bun-types/overrides.d.ts index ac1dd181941..ee47728f4fb 100644 --- a/packages/bun-types/overrides.d.ts +++ b/packages/bun-types/overrides.d.ts @@ -112,6 +112,8 @@ declare global { off(event: "memoryPressure", listener: (level: "warning" | "critical") => void): this; addListener(event: "memoryPressure", listener: (level: "warning" | "critical") => void): this; removeListener(event: "memoryPressure", listener: (level: "warning" | "critical") => void): this; + prependListener(event: "memoryPressure", listener: (level: "warning" | "critical") => void): this; + prependOnceListener(event: "memoryPressure", listener: (level: "warning" | "critical") => void): this; emit(event: "memoryPressure", level: "warning" | "critical"): boolean; binding(m: "constants"): { diff --git a/src/runtime/dispatch.rs b/src/runtime/dispatch.rs index d26b535dd37..c8dc4922d66 100644 --- a/src/runtime/dispatch.rs +++ b/src/runtime/dispatch.rs @@ -684,7 +684,11 @@ pub unsafe fn __bun_run_file_poll(poll: *mut FilePoll, size_or_offset: i64) { unsafe { Process::on_wait_pid_from_event_loop_task(proc) }; } poll_tag::MEMORY_PRESSURE => { - crate::node::memory_pressure::on_poll(owner.ptr.cast(), size_or_offset); + // SAFETY: `poll` is live per `__bun_run_file_poll`'s contract; + // `owner.ptr` was set at `FilePoll::init` with this tag. + unsafe { + crate::node::memory_pressure::on_poll(poll, owner.ptr.cast(), size_or_offset) + }; } poll_tag::PARENT_DEATH_WATCHDOG => { let wd = owner_as!(bun_io::parent_death_watchdog::ParentDeathWatchdog); diff --git a/src/runtime/node/memory_pressure.rs b/src/runtime/node/memory_pressure.rs index b3f29ae7ae3..7ff13eaf9eb 100644 --- a/src/runtime/node/memory_pressure.rs +++ b/src/runtime/node/memory_pressure.rs @@ -101,11 +101,16 @@ mod posix { continue; }; let rest = rest.strip_prefix(b"/").unwrap_or(rest); + // cgroup v2 names are restricted to non-NUL, non-`/` bytes and in + // practice systemd-escaped ASCII, so this always succeeds; go + // through `str` so the bytes are spliced verbatim (systemd unit + // names can contain literal `\` which `escape_ascii` would mangle). + let rest = core::str::from_utf8(rest).ok()?; return bun_core::fmt::buf_print_z( buf, format_args!( "/sys/fs/cgroup/{}{}memory.pressure", - rest.escape_ascii(), + rest, if rest.is_empty() { "" } else { "/" } ), ) @@ -237,11 +242,37 @@ mod posix { /// `__bun_run_file_poll` dispatch target. `fflags` is the kqueue `fflags` /// on macOS (carrying the pressure level) and 0 on Linux. - pub fn on_poll(owner_ptr: *mut core::ffi::c_void, fflags: i64) { - // SAFETY: `owner_ptr` was set via `Owner::new(MEMORY_PRESSURE, watcher)` in `install`. - let watcher = unsafe { &*owner_ptr.cast::() }; - // SAFETY: `global` is the live per-thread global captured at install time. - let global = unsafe { &*watcher.global }; + /// + /// # Safety + /// `poll` is the live `FilePoll` this dispatch is running for and + /// `owner_ptr` is the `MemoryPressureWatcher` set via `Owner::new` in + /// `install`; both outlive the call (guaranteed by `__bun_run_file_poll`). + pub unsafe fn on_poll(poll: *mut FilePoll, owner_ptr: *mut core::ffi::c_void, fflags: i64) { + let watcher = owner_ptr.cast::(); + + // `EPOLLERR`/`EPOLLHUP` on a PSI fd means the trigger is dead (e.g. + // the cgroup whose `memory.pressure` we opened was removed). kernfs + // reports that condition permanently, so a level-triggered + // registration would spin the loop. Tear the watch down instead of + // emitting; `uninstall` sees `poll == null` and skips the second + // deinit. No equivalent on macOS: `EVFILT_MEMORYSTATUS` is system-wide. + #[cfg(any(target_os = "linux", target_os = "android"))] + // SAFETY: caller contract above. + unsafe { + if (*poll).flags.contains(Flags::Eof) || (*poll).flags.contains(Flags::Hup) { + let fd = (*poll).fd; + (*poll).deinit(); + let _ = bun_sys::close(fd); + (*watcher).poll = ptr::null_mut(); + (*watcher).registered = false; + return; + } + } + #[cfg(not(any(target_os = "linux", target_os = "android")))] + let _ = poll; + + // SAFETY: caller contract above; `global` was captured at install time. + let global = unsafe { &*(*watcher).global }; #[cfg(target_os = "macos")] let lvl = fflags as i32; #[cfg(not(target_os = "macos"))] From 280c9a67b285aa9c0585d2fb43728edbb2ca174b Mon Sep 17 00:00:00 2001 From: robobun <117481402+robobun@users.noreply.github.com> Date: Tue, 23 Jun 2026 03:37:27 +0000 Subject: [PATCH 10/15] macOS: prefer critical when both WARN and CRITICAL bits are set EVFILT_MEMORYSTATUS under EV_CLEAR accumulates transition bits between kevent() drains, so a warn->critical escalation while the loop is busy can deliver fflags=0x6 in one event. Pick the more severe level. --- src/runtime/node/memory_pressure.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/runtime/node/memory_pressure.rs b/src/runtime/node/memory_pressure.rs index 7ff13eaf9eb..9c9e7493651 100644 --- a/src/runtime/node/memory_pressure.rs +++ b/src/runtime/node/memory_pressure.rs @@ -39,13 +39,13 @@ unsafe extern "C" { /// Emit the `"memoryPressure"` event on the given global's process object. /// Called from the `FilePoll` dispatch arm (already on the JS thread). pub fn emit(global: &JSGlobalObject, lvl: i32) { - // Anything other than WARN is reported as critical. On Linux the PSI - // trigger doesn't carry a level, and on Windows there is only - // `LowMemoryResourceNotification`; both map to critical. - let lvl = if lvl & level::WARNING != 0 { - level::WARNING - } else { + // `EVFILT_MEMORYSTATUS` accumulates transition bits under `EV_CLEAR`, so + // both WARN and CRITICAL can arrive in one kevent; pick the more severe. + // Linux PSI and Windows carry no level and default to critical here. + let lvl = if lvl & level::CRITICAL != 0 || lvl & level::WARNING == 0 { level::CRITICAL + } else { + level::WARNING }; // SAFETY: `global` is the live per-thread global; the C++ side handles // the "no listeners" case via `hasEventListeners`. From 12ce3b9d56414eb867a62cd1992fc9fd877ed2a4 Mon Sep 17 00:00:00 2001 From: robobun <117481402+robobun@users.noreply.github.com> Date: Tue, 23 Jun 2026 04:11:23 +0000 Subject: [PATCH 11/15] posix: defer teardown when uninstall runs from inside the dispatch A process.once('memoryPressure', ...) listener (or any listener that removes itself) reaches Bun__MemoryPressure__uninstall synchronously from inside on_poll -> emit(). Calling (*watcher.poll).deinit() and freeing the watcher Box there aliases the &mut FilePoll the dispatch chain is holding and frees memory the handler is running on. Add a dispatching flag that on_poll brackets around emit(). When uninstall sees it set, it clears the RareData slot and nulls watcher.poll to signal the deferral, but leaves the poll and the Box for on_poll's tail to tear down via the dispatch-provenance poll pointer. A re-subscribe inside the listener gets a fresh watcher in the now-empty slot and is unaffected. Also factor poll teardown into deinit_poll() so the three call sites (uninstall normal path, EPOLLERR, deferred tail) share one body. --- src/runtime/node/memory_pressure.rs | 88 +++++++++++++++++++++-------- 1 file changed, 63 insertions(+), 25 deletions(-) diff --git a/src/runtime/node/memory_pressure.rs b/src/runtime/node/memory_pressure.rs index 9c9e7493651..ced5ab70b3e 100644 --- a/src/runtime/node/memory_pressure.rs +++ b/src/runtime/node/memory_pressure.rs @@ -77,6 +77,13 @@ mod posix { /// Linux this is false when PSI is unavailable or requires privileges /// we don't have; the emit path is still functional for tests. registered: bool, + /// Set while `on_poll` is on the stack around the JS emit. A + /// `process.once` listener (or any listener that removes itself) + /// can reach `uninstall` synchronously from inside `emit()`; when + /// this is set, `uninstall` defers `poll.deinit()` and the box free + /// to `on_poll`'s tail so it never aliases the dispatching + /// `&mut FilePoll` or frees the watcher under the handler. + dispatching: bool, } fn slot(vm: &mut VirtualMachine) -> &mut Option> { @@ -167,6 +174,7 @@ mod posix { global: ptr::from_ref(global).cast_mut(), poll: ptr::null_mut(), registered: false, + dispatching: false, })); #[cfg(any(target_os = "linux", target_os = "android"))] @@ -210,33 +218,52 @@ mod posix { *slot(unsafe { &mut *vm }) = NonNull::new(watcher.cast()); } + /// Unregister `poll`, close the PSI fd on Linux, and return the hive + /// slot. Uses the dispatch-chain `poll` pointer (same provenance as the + /// live `&mut FilePoll` in `on_update`), never `watcher.poll`. + /// + /// # Safety + /// `poll` must be the live hive slot owned by this watcher. + unsafe fn deinit_poll(poll: *mut FilePoll) { + #[cfg(any(target_os = "linux", target_os = "android"))] + // SAFETY: caller contract; fd is read before the slot is returned. + let fd = unsafe { (*poll).fd }; + // SAFETY: caller contract. + unsafe { (*poll).deinit() }; + #[cfg(any(target_os = "linux", target_os = "android"))] + { + let _ = bun_sys::close(fd); + } + } + pub(super) fn uninstall(global: &JSGlobalObject) { let vm = global.bun_vm_ptr(); // SAFETY: same-thread VM access (asserted by `bun_vm_ptr`). let Some(raw) = core::mem::take(slot(unsafe { &mut *vm })) else { return; }; - // SAFETY: slot was populated by `install` with a `Box`. - let watcher = unsafe { bun_core::heap::take(raw.as_ptr().cast::()) }; + let watcher = raw.as_ptr().cast::(); + + // Called re-entrantly from inside `on_poll` → `emit()` (e.g. a + // `.once()` listener removed itself). `on_poll` holds the live + // `FilePoll` via its function argument; touching it here through + // `watcher.poll` would alias that `&mut`. Signal the deferral by + // nulling `poll` and let `on_poll`'s tail do the teardown and free + // the box. The slot is already cleared, so a re-subscribe inside the + // listener gets a fresh watcher. + // SAFETY: `watcher` is the live `Box` from `install`; sole writer on the JS thread. + if unsafe { (*watcher).dispatching } { + // SAFETY: same allocation; JS-thread-only write. + unsafe { (*watcher).poll = ptr::null_mut() }; + return; + } + // SAFETY: slot was populated by `install` with a `Box`; + // not dispatching, so this is the sole owner. + let watcher = unsafe { bun_core::heap::take(watcher) }; if !watcher.poll.is_null() { - #[cfg(any(target_os = "linux", target_os = "android"))] - let psi_fd = if watcher.registered { - // SAFETY: `poll` is live until `deinit` below. - Some(unsafe { (*watcher.poll).fd }) - } else { - None - }; - - // `deinit` unregisters (kqueue EV_DELETE / epoll CTL_DEL) and returns - // the slot to the hive; fd ownership is ours. - // SAFETY: `poll` is a live hive slot until this call returns. - unsafe { (*watcher.poll).deinit() }; - - #[cfg(any(target_os = "linux", target_os = "android"))] - if let Some(fd) = psi_fd { - let _ = bun_sys::close(fd); - } + // SAFETY: not dispatching, so no other `&mut FilePoll` is live. + unsafe { deinit_poll(watcher.poll) }; } } @@ -246,7 +273,8 @@ mod posix { /// # Safety /// `poll` is the live `FilePoll` this dispatch is running for and /// `owner_ptr` is the `MemoryPressureWatcher` set via `Owner::new` in - /// `install`; both outlive the call (guaranteed by `__bun_run_file_poll`). + /// `install`; both are live on entry. `emit()` may run user JS that + /// reaches `uninstall`, which defers teardown to this function's tail. pub unsafe fn on_poll(poll: *mut FilePoll, owner_ptr: *mut core::ffi::c_void, fflags: i64) { let watcher = owner_ptr.cast::(); @@ -260,16 +288,12 @@ mod posix { // SAFETY: caller contract above. unsafe { if (*poll).flags.contains(Flags::Eof) || (*poll).flags.contains(Flags::Hup) { - let fd = (*poll).fd; - (*poll).deinit(); - let _ = bun_sys::close(fd); + deinit_poll(poll); (*watcher).poll = ptr::null_mut(); (*watcher).registered = false; return; } } - #[cfg(not(any(target_os = "linux", target_os = "android")))] - let _ = poll; // SAFETY: caller contract above; `global` was captured at install time. let global = unsafe { &*(*watcher).global }; @@ -280,7 +304,21 @@ mod posix { let _ = fflags; super::level::CRITICAL }; + + // SAFETY: `watcher` is live; JS-thread-only write. + unsafe { (*watcher).dispatching = true }; super::emit(global, lvl); + // `watcher` is guaranteed still live here: `uninstall` saw + // `dispatching` and returned without freeing. `poll == null` means + // it ran and deferred teardown to us. + // SAFETY: see above. + unsafe { + (*watcher).dispatching = false; + if (*watcher).poll.is_null() { + deinit_poll(poll); + bun_core::heap::destroy(watcher); + } + } } } From e1e129f442fcbc64dc77932ffc51249c473f6e8f Mon Sep 17 00:00:00 2001 From: robobun <117481402+robobun@users.noreply.github.com> Date: Tue, 23 Jun 2026 04:59:59 +0000 Subject: [PATCH 12/15] posix: fix double-close on register failure and macOS knote stranding on re-subscribe When register() fails, tear the poll down immediately via deinit_poll() and leave watcher.poll null, so uninstall's later !poll.is_null() check skips it. Previously the Err arm closed the PSI fd but still stored the poll (whose .fd still held the closed number) in watcher.poll, and uninstall would close that number again after it had been reassigned. The now-dead registered field is removed. In the deferred-teardown tail: if the listener re-subscribed inside emit(), the RareData slot holds a fresh watcher. On macOS the (0, EVFILT_MEMORYSTATUS) knote is unique per kqueue and the new install's EV_ADD modified it in place, so an EV_DELETE from the old poll would silently strand the new watch. Clear PollMemoryPressure on the old poll first so unregister skips the kevent and only returns the hive slot. Linux is unaffected (each install opens its own PSI fd) but takes the same codepath for uniformity. --- src/runtime/node/memory_pressure.rs | 50 +++++++++++++++-------------- 1 file changed, 26 insertions(+), 24 deletions(-) diff --git a/src/runtime/node/memory_pressure.rs b/src/runtime/node/memory_pressure.rs index ced5ab70b3e..2d303fb8643 100644 --- a/src/runtime/node/memory_pressure.rs +++ b/src/runtime/node/memory_pressure.rs @@ -70,13 +70,10 @@ mod posix { /// Back-pointer so the poll dispatch can reach JS without going through /// the per-thread VM singleton (workers each have their own global). global: *mut JSGlobalObject, - /// Always set once `install` returns. The poll owns the PSI fd on Linux - /// (closed in `uninstall`); on macOS the fd slot is the kevent ident (0). + /// Non-null only when successfully registered; null if registration + /// failed or was torn down. The poll owns the PSI fd on Linux (closed + /// in `deinit_poll`); on macOS the fd slot is the kevent ident (0). poll: *mut FilePoll, - /// Whether `poll` was successfully registered with kqueue/epoll. On - /// Linux this is false when PSI is unavailable or requires privileges - /// we don't have; the emit path is still functional for tests. - registered: bool, /// Set while `on_poll` is on the stack around the JS emit. A /// `process.once` listener (or any listener that removes itself) /// can reach `uninstall` synchronously from inside `emit()`; when @@ -173,7 +170,6 @@ mod posix { let watcher = bun_core::heap::into_raw(Box::new(MemoryPressureWatcher { global: ptr::from_ref(global).cast_mut(), poll: ptr::null_mut(), - registered: false, dispatching: false, })); @@ -193,24 +189,19 @@ mod posix { ); // SAFETY: `poll` was just allocated by `FilePoll::init` (sole borrow); // `platform_event_loop` returns the live uws loop. - let registered = match unsafe { &mut *poll }.register( - unsafe { ctx.platform_event_loop() }, - Flags::MemoryPressure, - false, - ) { - bun_sys::Result::Ok(()) => true, + match unsafe { (*poll).register(ctx.platform_event_loop(), Flags::MemoryPressure, false) } + { + bun_sys::Result::Ok(()) => { + // SAFETY: `watcher` was just heap-allocated above; sole owner. + unsafe { (*watcher).poll = poll }; + } Err(_) => { - #[cfg(any(target_os = "linux", target_os = "android"))] - { - let _ = bun_sys::close(fd); - } - false + // Return the hive slot and (on Linux) close the PSI fd + // here; `watcher.poll` stays null so `uninstall` will + // not close the fd a second time after it is reassigned. + // SAFETY: `poll` is the fresh hive slot; sole owner. + unsafe { deinit_poll(poll) }; } - }; - // SAFETY: `watcher` was just heap-allocated above; sole owner. - unsafe { - (*watcher).poll = poll; - (*watcher).registered = registered; } } @@ -290,7 +281,6 @@ mod posix { if (*poll).flags.contains(Flags::Eof) || (*poll).flags.contains(Flags::Hup) { deinit_poll(poll); (*watcher).poll = ptr::null_mut(); - (*watcher).registered = false; return; } } @@ -315,6 +305,18 @@ mod posix { unsafe { (*watcher).dispatching = false; if (*watcher).poll.is_null() { + // If the listener re-subscribed inside `emit()`, the slot now + // holds a fresh watcher whose registration reused our kernel + // key: on macOS the `(ident=0, EVFILT_MEMORYSTATUS)` knote is + // unique per kqueue and `EV_ADD` modifies in place, so an + // `EV_DELETE` from this old poll would strand the new watch. + // Drop the poll-side registration flag so `unregister` becomes + // a no-syscall cleanup. On Linux each install opens its own + // PSI fd, so this only elides a redundant `CTL_DEL` (closing + // the fd below auto-removes it from epoll anyway). + if slot(&mut *global.bun_vm_ptr()).is_some() { + (*poll).flags.remove(Flags::PollMemoryPressure); + } deinit_poll(poll); bun_core::heap::destroy(watcher); } From 3fb49d5f87be077372c261a0898858e42c8a34ae Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Tue, 23 Jun 2026 05:02:08 +0000 Subject: [PATCH 13/15] [autofix.ci] apply automated fixes --- src/runtime/node/memory_pressure.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/runtime/node/memory_pressure.rs b/src/runtime/node/memory_pressure.rs index 2d303fb8643..2656e2a91ea 100644 --- a/src/runtime/node/memory_pressure.rs +++ b/src/runtime/node/memory_pressure.rs @@ -189,8 +189,9 @@ mod posix { ); // SAFETY: `poll` was just allocated by `FilePoll::init` (sole borrow); // `platform_event_loop` returns the live uws loop. - match unsafe { (*poll).register(ctx.platform_event_loop(), Flags::MemoryPressure, false) } - { + match unsafe { + (*poll).register(ctx.platform_event_loop(), Flags::MemoryPressure, false) + } { bun_sys::Result::Ok(()) => { // SAFETY: `watcher` was just heap-allocated above; sole owner. unsafe { (*watcher).poll = poll }; From a197aef76dc9056154a56b7862e859cf8f7e22e3 Mon Sep 17 00:00:00 2001 From: robobun <117481402+robobun@users.noreply.github.com> Date: Tue, 23 Jun 2026 07:30:17 +0000 Subject: [PATCH 14/15] Rewrite with task enqueue instead of direct emit; drop most unsafe on_poll now enqueues a MemoryPressureTask (packed level, same shape as PosixSignalTask) rather than calling emit() inside the FilePoll dispatch. That removes the re-entrancy with uninstall entirely, and with it the dispatching flag, the deferred-teardown tail, and the knote-stranding check. uninstall can always deinit the poll directly. The watcher struct is reduced to a single Option> (None when the OS backend is unavailable, so isInstalled still reflects listener presence). VM access goes through the safe bun_vm().as_mut() / loop_ctx() / VirtualMachine::get_mut() accessors instead of raw-pointer derefs. Windows: wrap the kernel handles in an RAII OwnedHandle so early returns close them automatically, and use the safe VM accessors for the slot. Remaining unsafe is the irreducible set: Win32 FFI, the C++ emit FFI, heap::take on the erased RareData slot, the cross-thread VM deref in the watcher thread, and the FilePoll raw-pointer API (which has no safe wrapper in bun_io). --- src/runtime/dispatch.rs | 7 +- src/runtime/node/memory_pressure.rs | 443 ++++++++++------------------ 2 files changed, 163 insertions(+), 287 deletions(-) diff --git a/src/runtime/dispatch.rs b/src/runtime/dispatch.rs index c8dc4922d66..9ddfc8c0454 100644 --- a/src/runtime/dispatch.rs +++ b/src/runtime/dispatch.rs @@ -684,11 +684,8 @@ pub unsafe fn __bun_run_file_poll(poll: *mut FilePoll, size_or_offset: i64) { unsafe { Process::on_wait_pid_from_event_loop_task(proc) }; } poll_tag::MEMORY_PRESSURE => { - // SAFETY: `poll` is live per `__bun_run_file_poll`'s contract; - // `owner.ptr` was set at `FilePoll::init` with this tag. - unsafe { - crate::node::memory_pressure::on_poll(poll, owner.ptr.cast(), size_or_offset) - }; + // SAFETY: `poll` is live per `__bun_run_file_poll`'s contract. + crate::node::memory_pressure::on_poll(unsafe { &mut *poll }, size_or_offset); } poll_tag::PARENT_DEATH_WATCHDOG => { let wd = owner_as!(bun_io::parent_death_watchdog::ParentDeathWatchdog); diff --git a/src/runtime/node/memory_pressure.rs b/src/runtime/node/memory_pressure.rs index 2656e2a91ea..ba2d54645cf 100644 --- a/src/runtime/node/memory_pressure.rs +++ b/src/runtime/node/memory_pressure.rs @@ -14,14 +14,23 @@ //! writing, the watcher silently does nothing. //! - Windows: a dedicated thread blocks on //! `CreateMemoryResourceNotification(LowMemoryResourceNotification)` and -//! posts a `ConcurrentTask` back to the JS event loop when it signals, -//! with a 30 s holdoff before re-waiting (the handle is level-triggered). +//! posts back to the JS event loop when it signals, with a 30 s holdoff +//! before re-waiting (the handle is level-triggered). +//! +//! All three backends post a `MemoryPressureTask` to the event loop rather +//! than calling into JS from the detector, so a listener removing itself +//! during `emit()` never races with the poll/thread that produced the event. //! //! Armed lazily on the first listener and disarmed on the last removal via //! `onDidChangeListeners` in `BunProcess.cpp`, matching how signal handlers //! are wired. The watcher does not keep the event loop alive. +use bun_event_loop::ConcurrentTask::{Task, task_tag}; use bun_jsc::JSGlobalObject; +#[cfg(not(windows))] +use bun_jsc::virtual_machine::VirtualMachine; +#[cfg(not(windows))] +use core::ptr::NonNull; /// Pressure level passed to JS. Values are the `NOTE_MEMORYSTATUS_PRESSURE_*` /// bits on macOS so the kqueue dispatch can pass `fflags` through unchanged. @@ -31,66 +40,74 @@ pub mod level { } unsafe extern "C" { - /// Defined in `src/jsc/bindings/BunProcess.cpp`. Builds the level string - /// and emits `"memoryPressure"` on the process object. fn Process__emitMemoryPressureEvent(global: *mut JSGlobalObject, level: i32); } -/// Emit the `"memoryPressure"` event on the given global's process object. -/// Called from the `FilePoll` dispatch arm (already on the JS thread). +/// `run_task` target for `task_tag::MemoryPressureTask`. `lvl` is the packed +/// task payload (macOS kevent `fflags`, or `level::CRITICAL` elsewhere). pub fn emit(global: &JSGlobalObject, lvl: i32) { - // `EVFILT_MEMORYSTATUS` accumulates transition bits under `EV_CLEAR`, so - // both WARN and CRITICAL can arrive in one kevent; pick the more severe. - // Linux PSI and Windows carry no level and default to critical here. + // macOS can deliver WARN|CRITICAL together under EV_CLEAR; pick the more severe. let lvl = if lvl & level::CRITICAL != 0 || lvl & level::WARNING == 0 { level::CRITICAL } else { level::WARNING }; - // SAFETY: `global` is the live per-thread global; the C++ side handles - // the "no listeners" case via `hasEventListeners`. + // SAFETY: FFI; `global` is the live per-thread global. unsafe { Process__emitMemoryPressureEvent(core::ptr::from_ref(global).cast_mut(), lvl) }; } +pub(crate) fn pressure_task(lvl: i32) -> Task { + Task::new(task_tag::MemoryPressureTask, lvl as usize as *mut ()) +} + +#[cfg(not(windows))] +fn slot(vm: &mut VirtualMachine) -> &mut Option> { + vm.rare_data().memory_pressure_watcher_slot() +} + // ──────────────────────────────────────────────────────────────────────────── // POSIX backend (macOS EVFILT_MEMORYSTATUS, Linux PSI) via FilePoll // ──────────────────────────────────────────────────────────────────────────── #[cfg(not(windows))] mod posix { - use core::ptr::{self, NonNull}; + use core::ptr::NonNull; - use bun_io::posix_event_loop::{EventLoopCtx, FilePoll, Flags, Owner, poll_tag}; + use bun_io::posix_event_loop::FilePoll; + #[cfg(any(target_os = "linux", target_os = "android", target_os = "macos"))] + use bun_io::posix_event_loop::{Flags, Owner, poll_tag}; use bun_jsc::JSGlobalObject; use bun_jsc::virtual_machine::VirtualMachine; + #[cfg(any(target_os = "linux", target_os = "android", target_os = "macos"))] use bun_sys::Fd; - /// Per-VM watcher. Stored type-erased in `RareData.memory_pressure_watcher`. - pub(super) struct MemoryPressureWatcher { - /// Back-pointer so the poll dispatch can reach JS without going through - /// the per-thread VM singleton (workers each have their own global). - global: *mut JSGlobalObject, - /// Non-null only when successfully registered; null if registration - /// failed or was torn down. The poll owns the PSI fd on Linux (closed - /// in `deinit_poll`); on macOS the fd slot is the kevent ident (0). - poll: *mut FilePoll, - /// Set while `on_poll` is on the stack around the JS emit. A - /// `process.once` listener (or any listener that removes itself) - /// can reach `uninstall` synchronously from inside `emit()`; when - /// this is set, `uninstall` defers `poll.deinit()` and the box free - /// to `on_poll`'s tail so it never aliases the dispatching - /// `&mut FilePoll` or frees the watcher under the handler. - dispatching: bool, + use super::slot; + + /// Stored type-erased in `RareData.memory_pressure_watcher`. `poll` is + /// `None` when the OS backend is unavailable so `isInstalled` still + /// reflects listener presence. + struct MemoryPressureWatcher { + poll: Option>, } - fn slot(vm: &mut VirtualMachine) -> &mut Option> { - vm.rare_data().memory_pressure_watcher_slot() + fn take_watcher(vm: &mut VirtualMachine) -> Option> { + let raw = slot(vm).take()?; + // SAFETY: slot is populated only by `install` with a `Box`. + Some(unsafe { bun_core::heap::take(raw.as_ptr().cast::()) }) + } + + fn deinit_poll(poll: &mut FilePoll) { + #[cfg(any(target_os = "linux", target_os = "android"))] + let fd = poll.fd; + poll.deinit(); + #[cfg(any(target_os = "linux", target_os = "android"))] + { + let _ = bun_sys::close(fd); + } } /// Build `/sys/fs/cgroup//memory.pressure` from the cgroup v2 - /// entry in `/proc/self/cgroup`. Inside a cgroup namespace the entry is - /// `0::/`, which yields `/sys/fs/cgroup/memory.pressure` (the container's - /// delegated root). Outside a namespace the path may be a systemd slice. + /// entry in `/proc/self/cgroup`. #[cfg(any(target_os = "linux", target_os = "android"))] fn own_cgroup_pressure_path(buf: &mut [u8]) -> Option<&bun_core::ZStr> { use bun_sys::O; @@ -98,18 +115,11 @@ mod posix { let mut read = [0u8; 256]; let n = bun_sys::read(fd, &mut read).unwrap_or(0); let _ = bun_sys::close(fd); - // cgroup v2 line: "0::\n". v1 lines are "N:controllers:"; - // we only want the unified hierarchy. for line in read[..n].split(|&b| b == b'\n') { let Some(rest) = line.strip_prefix(b"0::") else { continue; }; - let rest = rest.strip_prefix(b"/").unwrap_or(rest); - // cgroup v2 names are restricted to non-NUL, non-`/` bytes and in - // practice systemd-escaped ASCII, so this always succeeds; go - // through `str` so the bytes are spliced verbatim (systemd unit - // names can contain literal `\` which `escape_ascii` would mangle). - let rest = core::str::from_utf8(rest).ok()?; + let rest = core::str::from_utf8(rest.strip_prefix(b"/").unwrap_or(rest)).ok()?; return bun_core::fmt::buf_print_z( buf, format_args!( @@ -124,170 +134,104 @@ mod posix { } /// Open a PSI memory file and write a trigger. Tries the system-wide - /// `/proc/pressure/memory` first, then the current cgroup's - /// `memory.pressure` (relevant inside containers that can't write the - /// global file). Returns the fd on success. + /// `/proc/pressure/memory` first, then the current cgroup's file. #[cfg(any(target_os = "linux", target_os = "android"))] fn open_psi_fd() -> Option { use bun_sys::O; - /// 150 ms of "some"-stall in any 2 s window. 2 s is the minimum window - /// for unprivileged PSI triggers (kernel 6.6+). + /// 150 ms of "some"-stall in any 2 s window. 2 s is the minimum + /// window for unprivileged PSI triggers (kernel 6.6+). const TRIGGER: &[u8] = b"some 150000 2000000"; let mut cgroup_buf = [0u8; 320]; - let paths: [Option<&bun_core::ZStr>; 2] = [ + let paths = [ Some(bun_core::zstr!("/proc/pressure/memory")), own_cgroup_pressure_path(&mut cgroup_buf), ]; for path in paths.into_iter().flatten() { - let fd = match bun_sys::open(path, O::RDWR | O::NONBLOCK | O::CLOEXEC, 0) { - Ok(fd) => fd, - Err(_) => continue, + let Ok(fd) = bun_sys::open(path, O::RDWR | O::NONBLOCK | O::CLOEXEC, 0) else { + continue; }; - match bun_sys::write(fd, TRIGGER) { - Ok(_) => return Some(fd), - Err(_) => { - let _ = bun_sys::close(fd); - continue; - } + if bun_sys::write(fd, TRIGGER).is_ok() { + return Some(fd); } + let _ = bun_sys::close(fd); } None } - pub(super) fn install(global: &JSGlobalObject) { - let vm = global.bun_vm_ptr(); - // SAFETY: `bun_vm_ptr()` asserts same-thread; VM outlives this call. - let vm_ref = unsafe { &mut *vm }; - if slot(vm_ref).is_some() { - return; - } - - // SAFETY: VM singleton is live for the JS thread. - let ctx: EventLoopCtx = unsafe { VirtualMachine::event_loop_ctx(vm) }; - - let watcher = bun_core::heap::into_raw(Box::new(MemoryPressureWatcher { - global: ptr::from_ref(global).cast_mut(), - poll: ptr::null_mut(), - dispatching: false, - })); - + fn register_os_watch(global: &JSGlobalObject) -> Option> { #[cfg(any(target_os = "linux", target_os = "android"))] - let fd = open_psi_fd(); + let fd = open_psi_fd()?; #[cfg(target_os = "macos")] - let fd = Some(Fd::from_native(0)); + let fd = Fd::from_native(0); #[cfg(not(any(target_os = "linux", target_os = "android", target_os = "macos")))] - let fd: Option = None; - - if let Some(fd) = fd { + { + let _ = global; + return None; + } + #[cfg(any(target_os = "linux", target_os = "android", target_os = "macos"))] + { + let ctx = global.bun_vm().loop_ctx(); let poll = FilePoll::init( ctx, fd, Default::default(), - Owner::new(poll_tag::MEMORY_PRESSURE, watcher.cast()), + Owner::new(poll_tag::MEMORY_PRESSURE, NonNull::<()>::dangling().as_ptr()), ); - // SAFETY: `poll` was just allocated by `FilePoll::init` (sole borrow); - // `platform_event_loop` returns the live uws loop. - match unsafe { + // SAFETY: `poll` is the fresh hive slot; `platform_event_loop` is the live uws loop. + let result = unsafe { (*poll).register(ctx.platform_event_loop(), Flags::MemoryPressure, false) - } { - bun_sys::Result::Ok(()) => { - // SAFETY: `watcher` was just heap-allocated above; sole owner. - unsafe { (*watcher).poll = poll }; - } - Err(_) => { - // Return the hive slot and (on Linux) close the PSI fd - // here; `watcher.poll` stays null so `uninstall` will - // not close the fd a second time after it is reassigned. - // SAFETY: `poll` is the fresh hive slot; sole owner. - unsafe { deinit_poll(poll) }; - } + }; + if result.is_err() { + // SAFETY: fresh hive slot never handed out. + deinit_poll(unsafe { &mut *poll }); + return None; } + NonNull::new(poll) } - - // SAFETY: VM singleton is live; re-derive to avoid holding a `&mut` across the register. - *slot(unsafe { &mut *vm }) = NonNull::new(watcher.cast()); } - /// Unregister `poll`, close the PSI fd on Linux, and return the hive - /// slot. Uses the dispatch-chain `poll` pointer (same provenance as the - /// live `&mut FilePoll` in `on_update`), never `watcher.poll`. - /// - /// # Safety - /// `poll` must be the live hive slot owned by this watcher. - unsafe fn deinit_poll(poll: *mut FilePoll) { - #[cfg(any(target_os = "linux", target_os = "android"))] - // SAFETY: caller contract; fd is read before the slot is returned. - let fd = unsafe { (*poll).fd }; - // SAFETY: caller contract. - unsafe { (*poll).deinit() }; - #[cfg(any(target_os = "linux", target_os = "android"))] - { - let _ = bun_sys::close(fd); + pub(super) fn install(global: &JSGlobalObject) { + let vm = global.bun_vm().as_mut(); + if slot(vm).is_some() { + return; } + let watcher = Box::new(MemoryPressureWatcher { + poll: register_os_watch(global), + }); + *slot(global.bun_vm().as_mut()) = NonNull::new(bun_core::heap::into_raw(watcher).cast()); } pub(super) fn uninstall(global: &JSGlobalObject) { - let vm = global.bun_vm_ptr(); - // SAFETY: same-thread VM access (asserted by `bun_vm_ptr`). - let Some(raw) = core::mem::take(slot(unsafe { &mut *vm })) else { + let Some(watcher) = take_watcher(global.bun_vm().as_mut()) else { return; }; - let watcher = raw.as_ptr().cast::(); - - // Called re-entrantly from inside `on_poll` → `emit()` (e.g. a - // `.once()` listener removed itself). `on_poll` holds the live - // `FilePoll` via its function argument; touching it here through - // `watcher.poll` would alias that `&mut`. Signal the deferral by - // nulling `poll` and let `on_poll`'s tail do the teardown and free - // the box. The slot is already cleared, so a re-subscribe inside the - // listener gets a fresh watcher. - // SAFETY: `watcher` is the live `Box` from `install`; sole writer on the JS thread. - if unsafe { (*watcher).dispatching } { - // SAFETY: same allocation; JS-thread-only write. - unsafe { (*watcher).poll = ptr::null_mut() }; - return; - } - - // SAFETY: slot was populated by `install` with a `Box`; - // not dispatching, so this is the sole owner. - let watcher = unsafe { bun_core::heap::take(watcher) }; - if !watcher.poll.is_null() { - // SAFETY: not dispatching, so no other `&mut FilePoll` is live. - unsafe { deinit_poll(watcher.poll) }; + if let Some(mut poll) = watcher.poll { + // SAFETY: `on_poll` enqueues a task instead of running user JS, + // so this is never reached from inside the dispatch and no other + // `&mut FilePoll` is live. + deinit_poll(unsafe { poll.as_mut() }); } } /// `__bun_run_file_poll` dispatch target. `fflags` is the kqueue `fflags` /// on macOS (carrying the pressure level) and 0 on Linux. - /// - /// # Safety - /// `poll` is the live `FilePoll` this dispatch is running for and - /// `owner_ptr` is the `MemoryPressureWatcher` set via `Owner::new` in - /// `install`; both are live on entry. `emit()` may run user JS that - /// reaches `uninstall`, which defers teardown to this function's tail. - pub unsafe fn on_poll(poll: *mut FilePoll, owner_ptr: *mut core::ffi::c_void, fflags: i64) { - let watcher = owner_ptr.cast::(); + pub fn on_poll(poll: &mut FilePoll, fflags: i64) { + let vm = VirtualMachine::get_mut(); // `EPOLLERR`/`EPOLLHUP` on a PSI fd means the trigger is dead (e.g. - // the cgroup whose `memory.pressure` we opened was removed). kernfs - // reports that condition permanently, so a level-triggered - // registration would spin the loop. Tear the watch down instead of - // emitting; `uninstall` sees `poll == null` and skips the second - // deinit. No equivalent on macOS: `EVFILT_MEMORYSTATUS` is system-wide. + // the cgroup was removed). kernfs reports that permanently, so tear + // the watch down instead of emitting to avoid a level-triggered spin. #[cfg(any(target_os = "linux", target_os = "android"))] - // SAFETY: caller contract above. - unsafe { - if (*poll).flags.contains(Flags::Eof) || (*poll).flags.contains(Flags::Hup) { - deinit_poll(poll); - (*watcher).poll = ptr::null_mut(); - return; - } + if poll.flags.contains(Flags::Eof) || poll.flags.contains(Flags::Hup) { + drop(take_watcher(vm)); + deinit_poll(poll); + return; } - // SAFETY: caller contract above; `global` was captured at install time. - let global = unsafe { &*(*watcher).global }; + #[cfg(not(any(target_os = "linux", target_os = "android")))] + let _ = poll; #[cfg(target_os = "macos")] let lvl = fflags as i32; #[cfg(not(target_os = "macos"))] @@ -295,33 +239,7 @@ mod posix { let _ = fflags; super::level::CRITICAL }; - - // SAFETY: `watcher` is live; JS-thread-only write. - unsafe { (*watcher).dispatching = true }; - super::emit(global, lvl); - // `watcher` is guaranteed still live here: `uninstall` saw - // `dispatching` and returned without freeing. `poll == null` means - // it ran and deferred teardown to us. - // SAFETY: see above. - unsafe { - (*watcher).dispatching = false; - if (*watcher).poll.is_null() { - // If the listener re-subscribed inside `emit()`, the slot now - // holds a fresh watcher whose registration reused our kernel - // key: on macOS the `(ident=0, EVFILT_MEMORYSTATUS)` knote is - // unique per kqueue and `EV_ADD` modifies in place, so an - // `EV_DELETE` from this old poll would strand the new watch. - // Drop the poll-side registration flag so `unregister` becomes - // a no-syscall cleanup. On Linux each install opens its own - // PSI fd, so this only elides a redundant `CTL_DEL` (closing - // the fd below auto-removes it from epoll anyway). - if slot(&mut *global.bun_vm_ptr()).is_some() { - (*poll).flags.remove(Flags::PollMemoryPressure); - } - deinit_poll(poll); - bun_core::heap::destroy(watcher); - } - } + vm.enqueue_task(super::pressure_task(lvl)); } } @@ -334,7 +252,7 @@ mod windows { use core::ffi::c_void; use core::ptr::{self, NonNull}; - use bun_event_loop::ConcurrentTask::{ConcurrentTask, Task, task_tag}; + use bun_event_loop::ConcurrentTask::ConcurrentTask; use bun_jsc::JSGlobalObject; use bun_jsc::virtual_machine::VirtualMachine; @@ -342,12 +260,10 @@ mod windows { type BOOL = i32; type DWORD = u32; const WAIT_OBJECT_0: DWORD = 0; - /// `LowMemoryResourceNotification` enum value. const LOW_MEMORY_RESOURCE_NOTIFICATION: i32 = 0; - /// The low-memory notification handle is level-triggered: it stays - /// signalled while the condition holds. After posting one event we wait - /// on `shutdown` alone for this long before re-checking `notify`, so a - /// sustained low-memory state fires at most once every 30 s. + /// The notification handle stays signalled while memory is low; after + /// posting once we wait on `shutdown` alone for this long before + /// re-checking, so sustained pressure fires at most every 30 s. const HOLDOFF_MS: DWORD = 30_000; unsafe extern "system" { @@ -364,136 +280,104 @@ mod windows { fn CloseHandle(h: HANDLE) -> BOOL; } - /// Per-VM watcher. Stored type-erased in `RareData.memory_pressure_watcher`. - struct MemoryPressureWatcher { - /// Signalled by the kernel while available memory is low. - notify: HANDLE, - /// Manual-reset event signalled by `uninstall` to wake the thread. - shutdown: HANDLE, - thread: Option>, - } - - impl Drop for MemoryPressureWatcher { + /// Owns a kernel HANDLE; closes on drop. + struct OwnedHandle(HANDLE); + impl Drop for OwnedHandle { fn drop(&mut self) { - // SAFETY: handles were created by the kernel and are owned here; - // the thread has been joined so nothing references them. - unsafe { - CloseHandle(self.shutdown); - CloseHandle(self.notify); - } + // SAFETY: constructed only from a handle returned by the kernel. + unsafe { CloseHandle(self.0) }; } } + struct MemoryPressureWatcher { + /// Held only so it is closed on drop after the thread joins. + _notify: OwnedHandle, + shutdown: OwnedHandle, + thread: Option>, + } + fn slot(vm: &mut VirtualMachine) -> &mut Option> { vm.rare_data().memory_pressure_watcher_slot() } - /// Blocks on `[shutdown, notify]` and posts a `MemoryPressureTask` to the - /// JS event loop when `notify` fires. Handles are passed as usize since - /// `HANDLE` (`*mut c_void`) is not `Send`. fn thread_main(vm_addr: usize, notify: usize, shutdown: usize) { bun_core::output::Source::configure_named_thread(bun_core::zstr!("MemoryPressure")); let handles: [HANDLE; 2] = [shutdown as HANDLE, notify as HANDLE]; loop { - // SAFETY: both handles are valid for the thread's lifetime - // (`uninstall` joins before closing them). + // SAFETY: `uninstall` joins before closing the handles. let rc = unsafe { WaitForMultipleObjects(2, handles.as_ptr(), 0, u32::MAX) }; - match rc { - WAIT_OBJECT_0 => break, - r if r == WAIT_OBJECT_0 + 1 => { - let task = ConcurrentTask::create(Task::new( - task_tag::MemoryPressureTask, - super::level::CRITICAL as usize as *mut (), - )); - // SAFETY: `vm_addr` is the main-thread VM captured at - // install; it lives for the process. - // `enqueue_task_concurrent` is the documented thread-safe - // entry point (lock-free MPSC push + loop wakeup). - unsafe { &*(vm_addr as *const VirtualMachine) } - .event_loop_shared() - .enqueue_task_concurrent(task); - // Holdoff on `shutdown` only: `notify` stays signalled - // while memory is low, so waiting on it again would spin. - // SAFETY: `shutdown` is valid for the thread's lifetime. - if unsafe { WaitForSingleObject(handles[0], HOLDOFF_MS) } == WAIT_OBJECT_0 { - break; - } - } - _ => break, + if rc != WAIT_OBJECT_0 + 1 { + break; + } + let task = ConcurrentTask::create(super::pressure_task(super::level::CRITICAL)); + // SAFETY: main-thread VM captured at install; process-lifetime. + unsafe { &*(vm_addr as *const VirtualMachine) } + .event_loop_shared() + .enqueue_task_concurrent(task); + // SAFETY: `shutdown` is valid for the thread's lifetime. + if unsafe { WaitForSingleObject(handles[0], HOLDOFF_MS) } == WAIT_OBJECT_0 { + break; } } } pub(super) fn install(global: &JSGlobalObject) { - let vm = global.bun_vm_ptr(); - // SAFETY: same-thread VM access (asserted by `bun_vm_ptr`). - if slot(unsafe { &mut *vm }).is_some() { + let vm = global.bun_vm().as_mut(); + if slot(vm).is_some() { return; } - // SAFETY: Win32 calls; both return NULL on failure. + // SAFETY: FFI; returns NULL on failure. let notify = unsafe { CreateMemoryResourceNotification(LOW_MEMORY_RESOURCE_NOTIFICATION) }; if notify.is_null() { return; } - // SAFETY: manual-reset, initially non-signalled, unnamed. + let notify = OwnedHandle(notify); + // SAFETY: FFI; manual-reset, initially non-signalled, unnamed. let shutdown = unsafe { CreateEventW(ptr::null_mut(), 1, 0, ptr::null()) }; if shutdown.is_null() { - // SAFETY: `notify` is owned here. - unsafe { CloseHandle(notify) }; return; } - - let (vm_addr, notify_addr, shutdown_addr) = - (vm as usize, notify as usize, shutdown as usize); - let thread = match std::thread::Builder::new() + let shutdown = OwnedHandle(shutdown); + + let (vm_addr, n, s) = ( + core::ptr::from_ref(global.bun_vm()) as usize, + notify.0 as usize, + shutdown.0 as usize, + ); + let Ok(thread) = std::thread::Builder::new() .name("MemoryPressure".into()) .stack_size(64 * 1024) - .spawn(move || thread_main(vm_addr, notify_addr, shutdown_addr)) - { - Ok(t) => t, - Err(_) => { - // SAFETY: both handles are owned here and were never shared. - unsafe { - CloseHandle(shutdown); - CloseHandle(notify); - } - return; - } + .spawn(move || thread_main(vm_addr, n, s)) + else { + return; }; - let watcher = bun_core::heap::into_raw(Box::new(MemoryPressureWatcher { - notify, + let watcher = Box::new(MemoryPressureWatcher { + _notify: notify, shutdown, thread: Some(thread), - })); - // SAFETY: same-thread VM access. - *slot(unsafe { &mut *vm }) = NonNull::new(watcher.cast()); + }); + *slot(global.bun_vm().as_mut()) = NonNull::new(bun_core::heap::into_raw(watcher).cast()); } pub(super) fn uninstall(global: &JSGlobalObject) { - let vm = global.bun_vm_ptr(); - // SAFETY: same-thread VM access. - let Some(raw) = core::mem::take(slot(unsafe { &mut *vm })) else { + let Some(raw) = slot(global.bun_vm().as_mut()).take() else { return; }; - // SAFETY: slot was populated by `install` with a `Box`. + // SAFETY: slot is populated only by `install` with a `Box`. let mut watcher = unsafe { bun_core::heap::take(raw.as_ptr().cast::()) }; - // SAFETY: `shutdown` is a valid manual-reset event owned by `watcher`. - unsafe { SetEvent(watcher.shutdown) }; + // SAFETY: FFI; `shutdown` is a valid event owned by `watcher`. + unsafe { SetEvent(watcher.shutdown.0) }; if let Some(thread) = watcher.thread.take() { let _ = thread.join(); } - // Any `MemoryPressureTask` the thread enqueued before `join` carries - // only the packed level (no pointer into `watcher`), so dropping - // `watcher` here is safe regardless of queue state. `Drop` closes the - // handles. } } // ──────────────────────────────────────────────────────────────────────────── -// C-ABI exports for BunProcess.cpp +// C-ABI exports for BunProcess.cpp / InternalForTesting.cpp // ──────────────────────────────────────────────────────────────────────────── #[unsafe(no_mangle)] @@ -512,21 +396,16 @@ pub extern "C" fn Bun__MemoryPressure__uninstall(global: &JSGlobalObject) { windows::uninstall(global); } -/// Synthetic emit for `bun:internal-for-testing`. Bypasses the OS backend and -/// drives the same C++ emit path a real notification would. #[unsafe(no_mangle)] pub extern "C" fn Bun__MemoryPressure__emit(global: &JSGlobalObject, lvl: i32) { emit(global, lvl); } -/// For `bun:internal-for-testing`: whether the per-VM watcher is currently -/// installed (i.e. the `RareData` slot is populated). Lets tests observe -/// arm/disarm without depending on a real OS notification. #[unsafe(no_mangle)] pub extern "C" fn Bun__MemoryPressure__isInstalled(global: &JSGlobalObject) -> bool { - let vm = global.bun_vm_ptr(); - // SAFETY: same-thread VM access (asserted by `bun_vm_ptr`). - unsafe { &mut *vm } + global + .bun_vm() + .as_mut() .rare_data() .memory_pressure_watcher_slot() .is_some() From e538b509e1de052bd14e463209cf7f90b9feb153 Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Tue, 23 Jun 2026 07:32:20 +0000 Subject: [PATCH 15/15] [autofix.ci] apply automated fixes --- src/runtime/node/memory_pressure.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/runtime/node/memory_pressure.rs b/src/runtime/node/memory_pressure.rs index ba2d54645cf..eaf6337c988 100644 --- a/src/runtime/node/memory_pressure.rs +++ b/src/runtime/node/memory_pressure.rs @@ -177,7 +177,10 @@ mod posix { ctx, fd, Default::default(), - Owner::new(poll_tag::MEMORY_PRESSURE, NonNull::<()>::dangling().as_ptr()), + Owner::new( + poll_tag::MEMORY_PRESSURE, + NonNull::<()>::dangling().as_ptr(), + ), ); // SAFETY: `poll` is the fresh hive slot; `platform_event_loop` is the live uws loop. let result = unsafe {