Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions packages/bun-types/overrides.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,26 @@ declare global {
_exiting: boolean;
noDeprecation?: boolean | undefined;

/**
* Emitted when the operating system signals that available memory is
* running low. Use this to release caches or reap idle resources instead
* of polling.
*
* On macOS `level` distinguishes `"warning"` from `"critical"` based on
* the kernel's memorystatus thresholds. On Linux (PSI) and Windows the
* event is always emitted with `"critical"`. On Linux, the underlying
* PSI trigger requires `CAP_SYS_RESOURCE` on kernels before 6.6; when
* unavailable the event is simply never emitted.
*
* This listener does not keep the event loop alive.
*/
on(event: "memoryPressure", listener: (level: "warning" | "critical") => void): this;
once(event: "memoryPressure", listener: (level: "warning" | "critical") => void): this;
off(event: "memoryPressure", listener: (level: "warning" | "critical") => void): this;
addListener(event: "memoryPressure", listener: (level: "warning" | "critical") => void): this;
removeListener(event: "memoryPressure", listener: (level: "warning" | "critical") => void): this;
Comment thread
robobun marked this conversation as resolved.
emit(event: "memoryPressure", level: "warning" | "critical"): boolean;

binding(m: "constants"): {
os: typeof import("node:os").constants;
fs: typeof import("node:fs").constants;
Expand Down
1 change: 1 addition & 0 deletions src/event_loop/ConcurrentTask.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ pub mod task_tag {
Open,
PollPendingModulesTask,
PosixSignalTask,
MemoryPressureTask,
ProcessWaiterThreadTask,
Read,
Readdir,
Expand Down
58 changes: 55 additions & 3 deletions src/io/posix_event_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ pub enum PollTag {
TerminalPoll,
ParentDeathWatchdog,
LifecycleScriptSubprocessOutputReader,
MemoryPressure,
}

/// Compatibility module — call sites in `bun_runtime`/`bun_install` still spell
Expand All @@ -244,6 +245,7 @@ pub mod poll_tag {
pub const PARENT_DEATH_WATCHDOG: PollTag = PollTag::ParentDeathWatchdog;
pub const LIFECYCLE_SCRIPT_SUBPROCESS_OUTPUT_READER: PollTag =
PollTag::LifecycleScriptSubprocessOutputReader;
pub const MEMORY_PRESSURE: PollTag = PollTag::MemoryPressure;
}

#[derive(Copy, Clone)]
Expand Down Expand Up @@ -330,6 +332,7 @@ impl FilePoll {
flags.remove(Flags::Writable);
flags.remove(Flags::Process);
flags.remove(Flags::Machport);
flags.remove(Flags::MemoryPressure);
flags.remove(Flags::Eof);
flags.remove(Flags::Hup);

Expand Down Expand Up @@ -361,6 +364,14 @@ impl FilePoll {
#[cfg(all(target_os = "macos", debug_assertions))]
debug_assert!(self.generation_number == kqueue_event.ext[0] as usize);

// EVFILT_MEMORYSTATUS reports the pressure level in `fflags`, not `data`;
// thread it through `size_or_offset` so the dispatch arm can read it.
#[cfg(target_os = "macos")]
if kqueue_event.filter == bun_sys::darwin::EVFILT::MEMORYSTATUS {
self.on_update(kqueue_event.fflags as i64);
return;
}

self.on_update(kqueue_event.data as i64);
}

Expand Down Expand Up @@ -442,6 +453,7 @@ impl FilePoll {
|| self.flags.contains(Flags::PollReadable)
|| self.flags.contains(Flags::PollProcess)
|| self.flags.contains(Flags::PollMachport)
|| self.flags.contains(Flags::PollMemoryPressure)
}

pub fn on_update(&mut self, size_or_offset: i64) {
Expand Down Expand Up @@ -699,6 +711,8 @@ impl FilePoll {
let mut flags: u32 = match flag {
Flags::Process | Flags::Readable => EPOLL::IN | EPOLL::HUP | one_shot_flag,
Flags::Writable => EPOLL::OUT | EPOLL::HUP | EPOLL::ERR | one_shot_flag,
// PSI trigger fds signal via POLLPRI only.
Flags::MemoryPressure => EPOLL::PRI | EPOLL::ERR | one_shot_flag,
_ => unreachable!(),
};
// epoll keys on fd alone; if the other direction is already
Expand Down Expand Up @@ -784,6 +798,17 @@ impl FilePoll {
flags: EV::ADD | one_shot_flag,
ext: [self.generation_number as u64, 0],
},
// System-wide memory pressure. ident is always 0; EV_CLEAR so each
// transition delivers once (matches libdispatch's registration).
Flags::MemoryPressure => kevent64_s {
ident: 0,
filter: EVFILT::MEMORYSTATUS,
data: 0,
fflags: NOTE::MEMORYSTATUS_PRESSURE_WARN | NOTE::MEMORYSTATUS_PRESSURE_CRITICAL,
udata: Pollable::init(self).ptr() as u64,
flags: EV::ADD | EV::CLEAR | one_shot_flag,
ext: [self.generation_number as u64, 0],
},
_ => unreachable!(),
};

Expand Down Expand Up @@ -863,7 +888,7 @@ impl FilePoll {
NOTE::EXIT,
udata,
),
Flags::Machport => {
Flags::Machport | Flags::MemoryPressure => {
return sys::Result::Err(sys::Error::from_code(
sys::E::EOPNOTSUPP,
sys::Tag::kevent,
Expand Down Expand Up @@ -914,6 +939,7 @@ impl FilePoll {
}
Flags::Writable => Flags::PollWritable,
Flags::Machport => Flags::PollMachport,
Flags::MemoryPressure => Flags::PollMemoryPressure,
_ => unreachable!(),
});
self.flags.remove(Flags::NeedsRearm);
Expand Down Expand Up @@ -972,7 +998,8 @@ impl FilePoll {
if !(self.flags.contains(Flags::PollReadable)
|| self.flags.contains(Flags::PollWritable)
|| self.flags.contains(Flags::PollProcess)
|| self.flags.contains(Flags::PollMachport))
|| self.flags.contains(Flags::PollMachport)
|| self.flags.contains(Flags::PollMemoryPressure))
{
// no-op
return sys::Result::Ok(());
Expand All @@ -995,6 +1022,9 @@ impl FilePoll {
if self.flags.contains(Flags::PollMachport) {
break 'brk Flags::Machport;
}
if self.flags.contains(Flags::PollMemoryPressure) {
break 'brk Flags::MemoryPressure;
}
return sys::Result::Ok(());
};

Expand All @@ -1008,6 +1038,7 @@ impl FilePoll {
self.flags.remove(Flags::PollReadable);
self.flags.remove(Flags::PollWritable);
self.flags.remove(Flags::PollMachport);
self.flags.remove(Flags::PollMemoryPressure);
return sys::Result::Ok(());
}

Expand Down Expand Up @@ -1078,6 +1109,15 @@ impl FilePoll {
flags: EV::DELETE,
ext: [0, 0],
},
Flags::MemoryPressure => kevent64_s {
ident: 0,
filter: EVFILT::MEMORYSTATUS,
data: 0,
fflags: 0,
udata: Pollable::init(self).ptr() as u64,
flags: EV::DELETE,
ext: [0, 0],
},
_ => unreachable!(),
};

Expand Down Expand Up @@ -1154,7 +1194,7 @@ impl FilePoll {
Flags::Readable => make_kevent(ident, EVFILT::READ, EV::DELETE, 0, udata),
Flags::Writable => make_kevent(ident, EVFILT::WRITE, EV::DELETE, 0, udata),
Flags::Process => make_kevent(ident, EVFILT::PROC, EV::DELETE, NOTE::EXIT, udata),
Flags::Machport => {
Flags::Machport | Flags::MemoryPressure => {
return sys::Result::Err(sys::Error::from_code(
sys::E::EOPNOTSUPP,
sys::Tag::kevent,
Expand Down Expand Up @@ -1196,6 +1236,7 @@ impl FilePoll {
self.flags.remove(Flags::PollWritable);
self.flags.remove(Flags::PollProcess);
self.flags.remove(Flags::PollMachport);
self.flags.remove(Flags::PollMemoryPressure);

sys::Result::Ok(())
}
Expand Down Expand Up @@ -1230,6 +1271,8 @@ pub enum Flags {
PollProcess,
/// Poll for machport events
PollMachport,
/// Poll for memory-pressure events (Darwin `EVFILT_MEMORYSTATUS`, Linux PSI `EPOLLPRI`)
PollMemoryPressure,

// What did the event loop tell us?
Readable,
Expand All @@ -1238,6 +1281,7 @@ pub enum Flags {
Eof,
Hup,
Machport,
MemoryPressure,

// What is the type of file descriptor?
Fifo,
Expand Down Expand Up @@ -1273,6 +1317,7 @@ impl Flags {
Flags::Writable => Flags::PollWritable,
Flags::Process => Flags::PollProcess,
Flags::Machport => Flags::PollMachport,
Flags::MemoryPressure => Flags::PollMemoryPressure,
other => other,
}
}
Expand Down Expand Up @@ -1301,6 +1346,10 @@ impl Flags {
if kqueue_event.filter == EVFILT::MACHPORT {
flags.insert(Flags::Machport);
}
#[cfg(target_os = "macos")]
if kqueue_event.filter == EVFILT::MEMORYSTATUS {
flags.insert(Flags::MemoryPressure);
}
}
flags
}
Expand All @@ -1315,6 +1364,9 @@ impl Flags {
if epoll.events & EPOLL::OUT != 0 {
flags.insert(Flags::Writable);
}
if epoll.events & EPOLL::PRI != 0 {
flags.insert(Flags::MemoryPressure);
}
if epoll.events & EPOLL::ERR != 0 {
flags.insert(Flags::Eof);
}
Expand Down
12 changes: 12 additions & 0 deletions src/js/internal-for-testing.ts
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,18 @@ export const lowercaseHeaderNameSIMD: (name: string) => string = $newCppFunction
1,
);

export const emitMemoryPressure: (level: "warning" | "critical") => void = $newCppFunction(
"InternalForTesting.cpp",
"jsFunction_emitMemoryPressure",
1,
);

export const isMemoryPressureWatcherInstalled: () => boolean = $newCppFunction(
"InternalForTesting.cpp",
"jsFunction_isMemoryPressureWatcherInstalled",
0,
);

export const getEventLoopStats: () => { activeTasks: number; concurrentRef: number; numPolls: number } =
$newZigFunction("event_loop.zig", "getActiveTasks", 0);

Expand Down
28 changes: 28 additions & 0 deletions src/jsc/bindings/BunProcess.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1382,9 +1382,24 @@ __attribute__((noinline)) static void forwardSignal(int signalNumber)
Bun__onPosixSignal(signalNumber);
}

extern "C" void Bun__MemoryPressure__install(JSC::JSGlobalObject* global);
extern "C" void Bun__MemoryPressure__uninstall(JSC::JSGlobalObject* global);

static void onDidChangeListeners(EventEmitter& eventEmitter, const Identifier& eventName, bool isAdded)
{
if (Bun__isMainThreadVM()) {
if (eventName == "memoryPressure") {
auto* global = eventEmitter.scriptExecutionContext()->jsGlobalObject();
if (isAdded) {
if (eventEmitter.listenerCount(eventName) == 1) {
Bun__MemoryPressure__install(global);
}
} else if (eventEmitter.listenerCount(eventName) == 0) {
Bun__MemoryPressure__uninstall(global);
}
return;
}

// IPC handlers
if (eventName == "message" || eventName == "disconnect") {
auto* global = uncheckedDowncast<GlobalObject>(eventEmitter.scriptExecutionContext()->jsGlobalObject());
Expand Down Expand Up @@ -4314,6 +4329,19 @@ extern "C" void Process__emitDisconnectEvent(Zig::GlobalObject* global)
}
}

extern "C" void Process__emitMemoryPressureEvent(Zig::GlobalObject* global, int level)
{
auto* process = global->processObject();
auto& vm = JSC::getVM(global);
auto ident = Identifier::fromString(vm, "memoryPressure"_s);
if (process->wrapped().hasEventListeners(ident)) {
JSC::MarkedArgumentBuffer args;
// Level values match NOTE_MEMORYSTATUS_PRESSURE_WARN (2) / _CRITICAL (4).
args.append(jsString(vm, level == 2 ? String("warning"_s) : String("critical"_s)));
process->wrapped().emit(ident, args);
}
}

Comment thread
coderabbitai[bot] marked this conversation as resolved.
extern "C" void Process__emitErrorEvent(Zig::GlobalObject* global, EncodedJSValue value)
{
auto* process = global->processObject();
Expand Down
23 changes: 23 additions & 0 deletions src/jsc/bindings/InternalForTesting.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,4 +101,27 @@ JSC_DEFINE_HOST_FUNCTION(jsFunction_BunString_toThreadSafeRefCountDelta, (JSC::J
return JSValue::encode(jsNumber(static_cast<int32_t>(after) - static_cast<int32_t>(before)));
}

extern "C" void Bun__MemoryPressure__emit(JSC::JSGlobalObject* global, int level);
extern "C" bool Bun__MemoryPressure__isInstalled(JSC::JSGlobalObject* global);

// Synthetically fire process.on("memoryPressure") so tests can exercise the
// emit path without depending on real OS memory pressure.
JSC_DEFINE_HOST_FUNCTION(jsFunction_emitMemoryPressure, (JSC::JSGlobalObject * globalObject, JSC::CallFrame* callFrame))
{
auto& vm = globalObject->vm();
auto scope = DECLARE_THROW_SCOPE(vm);
auto str = callFrame->argument(0).toWTFString(globalObject);
RETURN_IF_EXCEPTION(scope, {});
int level = str == "warning"_s ? 2 : 4;
Bun__MemoryPressure__emit(defaultGlobalObject(globalObject), level);
return encodedJSUndefined();
}

// Whether the per-VM memory-pressure watcher is currently installed, so tests
// can observe that process.on/off actually arm/disarm the OS backend.
JSC_DEFINE_HOST_FUNCTION(jsFunction_isMemoryPressureWatcherInstalled, (JSC::JSGlobalObject * globalObject, JSC::CallFrame* callFrame))
{
return JSValue::encode(jsBoolean(Bun__MemoryPressure__isInstalled(defaultGlobalObject(globalObject))));
}

}
2 changes: 2 additions & 0 deletions src/jsc/bindings/InternalForTesting.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,7 @@ JSC_DECLARE_HOST_FUNCTION(jsFunction_lsanDoLeakCheck);
JSC_DECLARE_HOST_FUNCTION(jsFunction_isASANEnabled);
JSC_DECLARE_HOST_FUNCTION(jsFunction_BunString_toThreadSafeRefCountDelta);
JSC_DECLARE_HOST_FUNCTION(jsFunction_lowercaseHeaderNameSIMD);
JSC_DECLARE_HOST_FUNCTION(jsFunction_emitMemoryPressure);
JSC_DECLARE_HOST_FUNCTION(jsFunction_isMemoryPressureWatcherInstalled);

}
11 changes: 11 additions & 0 deletions src/jsc/rare_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,10 @@ pub struct RareData {
/// lazy-init in `bun_runtime::node::node_fs_stat_watcher`.
pub node_fs_stat_watcher_scheduler: Option<NonNull<c_void>>,

/// `bun_runtime::node::memory_pressure::MemoryPressureWatcher` — erased
/// `Box`; lazy-init on the first `process.on("memoryPressure", ...)` listener.
pub memory_pressure_watcher: Option<NonNull<c_void>>,

/// Watch-mode restart needs to RST every listen socket so the new process
/// can rebind without `EADDRINUSE`. Written on the JS thread; drained on
/// the watcher thread — hence the mutex (PORTING.md §Concurrency: lock
Expand Down Expand Up @@ -336,6 +340,7 @@ impl Default for RareData {
default_client_ssl_ctx: None,
mime_types: None,
node_fs_stat_watcher_scheduler: None,
memory_pressure_watcher: None,
listening_sockets_for_watch_mode: Mutex::new(Vec::new()),
temp_pipe_read_buffer: None,
s3_default_client: Strong::empty(),
Expand Down Expand Up @@ -640,6 +645,12 @@ impl RareData {
&mut self.node_fs_stat_watcher_scheduler
}

/// Raw slot — lazy-init body lives in `bun_runtime::node::memory_pressure`.
#[inline]
pub fn memory_pressure_watcher_slot(&mut self) -> &mut Option<NonNull<c_void>> {
&mut self.memory_pressure_watcher
}

/// Raw slot — lazy-init body lives in `bun_http_jsc::WebSocketDeflate`.
#[inline]
pub fn websocket_deflate_slot(&mut self) -> &mut Option<ErasedBox> {
Expand Down
9 changes: 8 additions & 1 deletion src/runtime/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,10 @@ pub fn run_task(
global,
);
}
task_tag::MemoryPressureTask => {
// `ptr` is the packed level (NOTE_MEMORYSTATUS_PRESSURE_* bits), not a pointer.
crate::node::memory_pressure::emit(global, task.ptr as usize as i32);
}
task_tag::NativePromiseContextDeferredDerefTask => {
// `ptr` packs an int, not a pointer.
NativePromiseContextDeferredDerefTask::run_from_js_thread(task.ptr as usize);
Expand Down Expand Up @@ -579,7 +583,7 @@ fn run_task_cold(task: Task) {
/// Compile-time guard that the arm count above tracks
/// `bun_event_loop::task_tag::COUNT`. Bump when adding a variant.
const _: () = assert!(
task_tag::COUNT == 96,
task_tag::COUNT == 97,
"dispatch::run_task arm count out of sync with bun_event_loop::task_tag",
);

Expand Down Expand Up @@ -679,6 +683,9 @@ pub unsafe fn __bun_run_file_poll(poll: *mut FilePoll, size_or_offset: i64) {
// SAFETY: `proc` carries the +1 ref taken at queue time; this drops it.
unsafe { Process::on_wait_pid_from_event_loop_task(proc) };
}
poll_tag::MEMORY_PRESSURE => {
crate::node::memory_pressure::on_poll(owner.ptr.cast(), size_or_offset);
}
poll_tag::PARENT_DEATH_WATCHDOG => {
let wd = owner_as!(bun_io::parent_death_watchdog::ParentDeathWatchdog);
// Mac-only — debug-assert elsewhere (Linux uses prctl(PR_SET_PDEATHSIG)).
Expand Down
Loading
Loading