Skip to content
Open
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
e20f9f9
node:worker_threads: implement postMessageToThread [1bx5ty]
robobun Apr 28, 2026
c93e868
test: bump timeouts on slow worker_threads fixture tests
robobun Apr 28, 2026
731b331
[autofix.ci] apply automated fixes
autofix-ci[bot] Apr 28, 2026
ac00c9e
[autofix.ci] apply automated fixes (attempt 2/3)
autofix-ci[bot] Apr 28, 2026
76848e4
test: use on/removeListener instead of once for workerMessage
robobun Apr 28, 2026
f5c6c08
MessagePort: restore explicit-ref path for jsRef()
robobun Apr 28, 2026
37c6af0
Merge branch 'main' into farm/6140e626/worker-postMessageToThread
dylan-conway Apr 28, 2026
9f5770e
MessagePort: don't ref on onmessageerror; document lazy mainThreadPor…
robobun Apr 29, 2026
d284e83
test: bump timeouts on execArgv/environmentData subprocess tests
robobun Apr 29, 2026
2869ce3
doc: clarify postMessageToThread pending-promise wording
robobun Apr 29, 2026
4e55aef
MessagePort: release event-loop ref in close()
robobun Apr 29, 2026
2896414
doc: note grandchild-orphan registry behaviour (matches Node)
robobun Apr 29, 2026
a56a2fe
test: set 30s default timeout for worker_threads.test.ts
robobun Apr 29, 2026
a916a10
MessagePort: unref when onmessage is set to a non-function
robobun Apr 29, 2026
394f3c3
doc: update WorkerOptions.h comment for 3-element serialized array
robobun Apr 29, 2026
54ed0ac
build(linux): only strip .eh_frame* when LTO is on
robobun Apr 29, 2026
abc2aca
worker_threads: cache threadId so destroyMainThreadPort gets the real id
robobun Apr 29, 2026
2a7ebc4
Worker: set OnlineFlag before posting 'open' to the parent
robobun Apr 29, 2026
b5fabb6
Merge remote-tracking branch 'origin/main' into farm/6140e626/worker-…
robobun Apr 29, 2026
e7b3f37
MessagePort: release event-loop ref in disentangle()
robobun Apr 29, 2026
421858e
Merge remote-tracking branch 'origin/main' into farm/6140e626/worker-…
robobun Apr 29, 2026
869a683
Merge remote-tracking branch 'origin/main' into farm/6140e626/worker-…
robobun Apr 30, 2026
a9cdbd3
Merge remote-tracking branch 'origin/main' into farm/6140e626/worker-…
robobun May 1, 2026
7da988b
MessagePort: keep JS wrapper alive while a drain is in progress
robobun May 1, 2026
b185dbe
MessagePortPipe: mirror peer's Closed bit so hasPendingActivity is a …
robobun May 1, 2026
f89b558
Merge remote-tracking branch 'origin/main' into farm/6140e626/worker-…
robobun May 1, 2026
37b4f72
Merge remote-tracking branch 'origin/main' into farm/6140e626/worker-…
robobun May 4, 2026
27d4a88
MessagePort: release event-loop ref in destructor
robobun May 4, 2026
5f762a4
MessagePort: re-enable m_hasRef on first message listener (Node parity)
robobun May 4, 2026
5921a26
Merge remote-tracking branch 'origin/main' into farm/6140e626/worker-…
robobun May 4, 2026
000ac8f
test: cover port.unref(); port.on() re-ref ordering (Node newListener…
robobun May 4, 2026
ca91e0e
MessagePort: mirror Node's removeListener → unref() on last message l…
robobun May 4, 2026
fe34005
Merge remote-tracking branch 'origin/main' into farm/6140e626/worker-…
robobun May 5, 2026
0256397
Merge remote-tracking branch 'origin/main' into farm/6140e626/worker-…
robobun May 15, 2026
ba2b25b
[autofix.ci] apply automated fixes
autofix-ci[bot] May 15, 2026
679c0cc
ErrorCode: append ERR_WORKER_MESSAGING_* at end to keep ordinals alig…
robobun May 15, 2026
b041a00
ci: retrigger (macOS 14 x64 node-http-backpressure-max timeout, share…
robobun May 15, 2026
51bf80f
codegen(bake): JSON.stringify OVERLAY_CSS define value
robobun May 15, 2026
6a28d90
Merge remote-tracking branch 'origin/main' into farm/6140e626/worker-…
robobun May 21, 2026
5162107
Merge remote-tracking branch 'origin/main' into farm/6140e626/worker-…
robobun Jun 10, 2026
dba9973
Merge branch 'main' into farm/6140e626/worker-postMessageToThread
robobun Jun 10, 2026
0936e25
Merge remote-tracking branch 'origin/main' into farm/6140e626/worker-…
robobun Jun 10, 2026
6e0f5e4
Merge remote-tracking branch 'origin/main' into farm/6140e626/worker-…
robobun Jun 16, 2026
a8c13bf
Merge remote-tracking branch 'origin/main' into farm/6140e626/worker-…
robobun Jun 17, 2026
7a5bc2d
worker: drain pending concurrent tasks on teardown
robobun Jun 17, 2026
82d5ef2
Merge remote-tracking branch 'origin/main' into farm/6140e626/worker-…
robobun Jun 18, 2026
acc8654
Merge remote-tracking branch 'origin/main' into farm/6140e626/worker-…
robobun Jun 19, 2026
06a74c0
test: sync message-port-pipe comment with PeerClosed rename
robobun Jun 19, 2026
f990b80
Merge remote-tracking branch 'origin/main' into farm/6140e626/worker-…
robobun Jun 23, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/bun.js/bindings/ErrorCode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2390,6 +2390,14 @@ JSC_DEFINE_HOST_FUNCTION(Bun::jsFunctionMakeErrorWithCode, (JSC::JSGlobalObject
return JSC::JSValue::encode(createError(globalObject, ErrorCode::ERR_IPC_CHANNEL_CLOSED, "Channel closed."_s));
case ErrorCode::ERR_SOCKET_BAD_TYPE:
return JSC::JSValue::encode(createError(globalObject, ErrorCode::ERR_SOCKET_BAD_TYPE, "Bad socket type specified. Valid types are: udp4, udp6"_s));
case ErrorCode::ERR_WORKER_MESSAGING_ERRORED:
return JSC::JSValue::encode(createError(globalObject, ErrorCode::ERR_WORKER_MESSAGING_ERRORED, "The destination thread threw an error while processing the message"_s));
case ErrorCode::ERR_WORKER_MESSAGING_FAILED:
return JSC::JSValue::encode(createError(globalObject, ErrorCode::ERR_WORKER_MESSAGING_FAILED, "Cannot find the destination thread or listener"_s));
case ErrorCode::ERR_WORKER_MESSAGING_SAME_THREAD:
return JSC::JSValue::encode(createError(globalObject, ErrorCode::ERR_WORKER_MESSAGING_SAME_THREAD, "Cannot sent a message to the same thread"_s));
case ErrorCode::ERR_WORKER_MESSAGING_TIMEOUT:
return JSC::JSValue::encode(createError(globalObject, ErrorCode::ERR_WORKER_MESSAGING_TIMEOUT, "Sending a message to another thread timed out"_s));
case ErrorCode::ERR_ZLIB_INITIALIZATION_FAILED:
return JSC::JSValue::encode(createError(globalObject, ErrorCode::ERR_ZLIB_INITIALIZATION_FAILED, "Initialization failed"_s));
case ErrorCode::ERR_IPC_ONE_PIPE:
Expand Down
4 changes: 4 additions & 0 deletions src/bun.js/bindings/ErrorCode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,10 @@ const errors: ErrorCodeMapping = [
["ERR_WASI_NOT_STARTED", Error],
["ERR_WEBASSEMBLY_RESPONSE", TypeError],
["ERR_WORKER_INIT_FAILED", Error],
["ERR_WORKER_MESSAGING_ERRORED", Error],
["ERR_WORKER_MESSAGING_FAILED", Error],
["ERR_WORKER_MESSAGING_SAME_THREAD", Error],
["ERR_WORKER_MESSAGING_TIMEOUT", Error],
["ERR_WORKER_NOT_RUNNING", Error],
["ERR_ZLIB_INITIALIZATION_FAILED", Error],
["MODULE_NOT_FOUND", Error],
Expand Down
10 changes: 9 additions & 1 deletion src/bun.js/bindings/ScriptExecutionContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,13 +134,21 @@ ScriptExecutionContext::~ScriptExecutionContext()
Locker locker { allScriptExecutionContextsMapLock };
ASSERT_WITH_MESSAGE(!allScriptExecutionContextsMap().contains(m_identifier), "A ScriptExecutionContext subclass instance implementing postTask should have already removed itself from the map");
}
m_inScriptExecutionContextDestructor = true;
#endif // ASSERT_ENABLED

// Draining these handlers may drop the last reference to a MessagePort whose JS wrapper is
// already gone (e.g. when a worker is terminated while a port still has a pending
// processMessageWithMessagePortsSoon callback). Do this before setting
// m_inScriptExecutionContextDestructor so the observer can unregister itself without
// tripping the assertion below; takeAny() makes the subsequent loop safe regardless.
auto postMessageCompletionHandlers = WTF::move(m_processMessageWithMessagePortsSoonHandlers);
for (auto& completionHandler : postMessageCompletionHandlers)
completionHandler();

#if ASSERT_ENABLED
m_inScriptExecutionContextDestructor = true;
#endif // ASSERT_ENABLED

while (auto* destructionObserver = m_destructionObservers.takeAny())
destructionObserver->contextDestroyed();

Expand Down
3 changes: 1 addition & 2 deletions src/bun.js/bindings/webcore/EventEmitter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,7 @@ bool EventEmitter::emitForBindings(const Identifier& eventType, const MarkedArgu
if (!scriptExecutionContext())
return false;

emit(eventType, arguments);
return true;
return emit(eventType, arguments);
}

bool EventEmitter::emit(const Identifier& eventType, const MarkedArgumentBuffer& arguments)
Expand Down
2 changes: 1 addition & 1 deletion src/bun.js/bindings/webcore/JSMessagePort.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@
vm.writeBarrier(&thisObject, value);
ensureStillAliveHere(value);

thisObject.wrapped().jsRef(&lexicalGlobalObject);
// Unlike onmessage, Node.js does not ref the port when only onmessageerror is set.

Check notice on line 234 in src/bun.js/bindings/webcore/JSMessagePort.cpp

View check run for this annotation

Claude / Claude Code Review

onmessage = null still refs the event loop (Node divergence)

Pre-existing nit: `setJSMessagePort_onmessageSetter` (line 205) still calls `jsRef()` unconditionally, so `port.onmessage = fn; port.onmessage = null; port.hasRef()` stays `true` in Bun, whereas Node's `onmessage` setter branches on `typeof value === 'function'` and calls `unref()` otherwise (→ `false`). Not a regression — the old `jsRef()` behaved the same — but since this PR rewrote `jsRef`/`jsUnref`/`updateEventLoopRef`, added `m_wantsExplicitRef` (whose header comment names "the onmessage se

return true;
}
Expand Down
15 changes: 13 additions & 2 deletions src/bun.js/bindings/webcore/JSWorker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,10 @@ template<> JSC::EncodedJSValue JSC_HOST_CALL_ATTRIBUTES JSWorkerDOMConstructor::

WorkerOptions options {};
JSValue nodeWorkerObject {};
if (callFrame->argumentCount() == 3) {
JSValue mainThreadPort {};
if (callFrame->argumentCount() >= 3) {
nodeWorkerObject = callFrame->argument(2);
mainThreadPort = callFrame->argument(3);
options.kind = WorkerOptions::Kind::Node;
}
JSValue workerData = jsUndefined();
Expand Down Expand Up @@ -307,13 +309,22 @@ template<> JSC::EncodedJSValue JSC_HOST_CALL_ATTRIBUTES JSWorkerDOMConstructor::
}
}

// The internal port linking this worker to the main thread for worker_threads.postMessageToThread.
// Only present when constructed via node:worker_threads.
if (mainThreadPort && mainThreadPort.isObject()) {
transferList.append({ vm, mainThreadPort.getObject() });
} else {
mainThreadPort = jsUndefined();
}

Vector<RefPtr<MessagePort>> ports;
auto* valueToTransfer = constructEmptyArray(globalObject, nullptr, 2);
auto* valueToTransfer = constructEmptyArray(globalObject, nullptr, 3);
RETURN_IF_EXCEPTION(throwScope, {});
valueToTransfer->putDirectIndex(globalObject, 0, workerData);
auto* environmentData = globalObject->nodeWorkerEnvironmentData();
// If node:worker_threads has not been imported, environment data will not be set up yet.
valueToTransfer->putDirectIndex(globalObject, 1, environmentData ? environmentData : jsUndefined());
valueToTransfer->putDirectIndex(globalObject, 2, mainThreadPort);

ExceptionOr<Ref<SerializedScriptValue>> serialized = SerializedScriptValue::create(*lexicalGlobalObject, valueToTransfer, WTF::move(transferList), ports, SerializationForStorage::No, SerializationContext::WorkerPostMessage);
if (serialized.hasException()) {
Expand Down
59 changes: 29 additions & 30 deletions src/bun.js/bindings/webcore/MessagePort.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@
#include <wtf/Lock.h>
#include <wtf/Scope.h>

extern "C" void Bun__eventLoop__incrementRefConcurrently(void* bunVM, int delta);

namespace WebCore {

WTF_MAKE_TZONE_ALLOCATED_IMPL(MessagePort);
Expand Down Expand Up @@ -241,6 +239,7 @@ void MessagePort::close()
MessagePortChannelProvider::singleton().messagePortClosed(m_identifier);

removeAllEventListeners();
updateEventLoopRef();
}

void MessagePort::dispatchMessages()
Expand Down Expand Up @@ -383,36 +382,40 @@ void MessagePort::contextDestroyed()
// ActiveDOMObject::contextDestroyed();
}

void MessagePort::updateEventLoopRef()
{
bool shouldRef = m_hasRef && (m_messageEventCount > 0 || m_wantsExplicitRef) && !m_isDetached;
if (shouldRef == m_isRefingEventLoop)
return;
auto* context = scriptExecutionContext();
if (!context) {
m_isRefingEventLoop = false;
return;
}
m_isRefingEventLoop = shouldRef;
if (shouldRef)
context->refEventLoop();
else
context->unrefEventLoop();
}

void MessagePort::onDidChangeListenerImpl(EventTarget& self, const AtomString& eventType, OnDidChangeListenerKind kind)
{
if (eventType == eventNames().messageEvent) {
auto& port = static_cast<MessagePort&>(self);
switch (kind) {
case Add:
if (port.m_messageEventCount == 0) {
auto* context = port.scriptExecutionContext();
if (context)
context->refEventLoop();
}
port.m_messageEventCount++;
break;
case Remove:
port.m_messageEventCount--;
if (port.m_messageEventCount == 0) {
auto* context = port.scriptExecutionContext();
if (context)
context->unrefEventLoop();
}
if (port.m_messageEventCount > 0)
port.m_messageEventCount--;
break;
case Clear:
if (port.m_messageEventCount > 0) {
auto* context = port.scriptExecutionContext();
if (context)
context->unrefEventLoop();
}
port.m_messageEventCount = 0;
break;
}
port.updateEventLoopRef();
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
};

Expand Down Expand Up @@ -453,22 +456,18 @@ WebCoreOpaqueRoot root(MessagePort* port)
return WebCoreOpaqueRoot { port };
}

void MessagePort::jsRef(JSGlobalObject* lexicalGlobalObject)
void MessagePort::jsRef(JSGlobalObject*)
{
if (!m_hasRef) {
m_hasRef = true;
ref();
Bun__eventLoop__incrementRefConcurrently(WebCore::clientData(lexicalGlobalObject->vm())->bunVM, 1);
}
m_hasRef = true;
m_wantsExplicitRef = true;
updateEventLoopRef();
}

void MessagePort::jsUnref(JSGlobalObject* lexicalGlobalObject)
void MessagePort::jsUnref(JSGlobalObject*)
{
if (m_hasRef) {
m_hasRef = false;
deref();
Bun__eventLoop__incrementRefConcurrently(WebCore::clientData(lexicalGlobalObject->vm())->bunVM, -1);
}
m_hasRef = false;
m_wantsExplicitRef = false;
updateEventLoopRef();
}

} // namespace WebCore
13 changes: 11 additions & 2 deletions src/bun.js/bindings/webcore/MessagePort.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ class MessagePort final : /* public ActiveDOMObject, */ public ContextDestructio

void jsRef(JSGlobalObject*);
void jsUnref(JSGlobalObject*);
bool jsHasRef() { return m_hasRef; }
bool jsHasRef() { return m_isRefingEventLoop; }

private:
explicit MessagePort(ScriptExecutionContext&, const MessagePortIdentifier& local, const MessagePortIdentifier& remote);
Expand All @@ -139,9 +139,18 @@ class MessagePort final : /* public ActiveDOMObject, */ public ContextDestructio

mutable std::atomic<unsigned> m_refCount { 1 };

bool m_hasRef { false };
// Whether this port should keep the event loop alive when it is active.
// Toggled by ref()/unref() from JS; unref() wins over everything else.
bool m_hasRef { true };
// Whether jsRef() has been called (from .ref() or the onmessage setter). This is one of the
// ways a port becomes "active" for event-loop-ref purposes; the other is m_messageEventCount,
// which is only tracked for ports whose onDidChangeListener is wired (transferred ports).
bool m_wantsExplicitRef { false };
// Whether this port is currently holding a ref on the event loop.
bool m_isRefingEventLoop { false };

uint32_t m_messageEventCount { 0 };
void updateEventLoopRef();
static void onDidChangeListenerImpl(EventTarget& self, const AtomString& eventType, OnDidChangeListenerKind kind);
};

Expand Down
11 changes: 8 additions & 3 deletions src/bun.js/bindings/webcore/Worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -584,6 +584,7 @@ JSValue createNodeWorkerThreadsBinding(Zig::GlobalObject* globalObject)
auto scope = DECLARE_THROW_SCOPE(globalObject->vm());
JSValue workerData = jsNull();
JSValue threadId = jsNumber(0);
JSValue mainThreadPort = jsUndefined();
JSMap* environmentData = nullptr;

if (auto* worker = WebWorker__getParentWorker(globalObject->bunVM())) {
Expand All @@ -592,17 +593,20 @@ JSValue createNodeWorkerThreadsBinding(Zig::GlobalObject* globalObject)
RefPtr<WebCore::SerializedScriptValue> serialized = WTF::move(options.workerDataAndEnvironmentData);
JSValue deserialized = serialized->deserialize(*globalObject, globalObject, WTF::move(ports));
RETURN_IF_EXCEPTION(scope, {});
// Should always be set to an Array of length 2 in the constructor in JSWorker.cpp
// Should always be set to an Array of length 3 in the constructor in JSWorker.cpp
auto* pair = uncheckedDowncast<JSArray>(deserialized);
ASSERT(pair->length() == 2);
ASSERT(pair->length() == 3);
ASSERT(pair->canGetIndexQuickly(0u));
ASSERT(pair->canGetIndexQuickly(1u));
ASSERT(pair->canGetIndexQuickly(2u));
workerData = pair->getIndexQuickly(0);
RETURN_IF_EXCEPTION(scope, {});
auto environmentDataValue = pair->getIndexQuickly(1);
// it might not be a Map if the parent had not set up environmentData yet
environmentData = environmentDataValue ? dynamicDowncast<JSMap>(environmentDataValue) : nullptr;
RETURN_IF_EXCEPTION(scope, {});
mainThreadPort = pair->getIndexQuickly(2);
RETURN_IF_EXCEPTION(scope, {});

// Main thread starts at 1
threadId = jsNumber(worker->clientIdentifier() - 1);
Expand All @@ -614,12 +618,13 @@ JSValue createNodeWorkerThreadsBinding(Zig::GlobalObject* globalObject)
ASSERT(environmentData);
globalObject->setNodeWorkerEnvironmentData(environmentData);

JSObject* array = constructEmptyArray(globalObject, nullptr, 4);
JSObject* array = constructEmptyArray(globalObject, nullptr, 5);
RETURN_IF_EXCEPTION(scope, {});
array->putDirectIndex(globalObject, 0, workerData);
array->putDirectIndex(globalObject, 1, threadId);
array->putDirectIndex(globalObject, 2, JSFunction::create(vm, globalObject, 1, "receiveMessageOnPort"_s, jsReceiveMessageOnPort, ImplementationVisibility::Public, NoIntrinsic));
array->putDirectIndex(globalObject, 3, environmentData);
array->putDirectIndex(globalObject, 4, mainThreadPort);
return array;
}

Expand Down
4 changes: 4 additions & 0 deletions src/js/builtins.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -710,6 +710,10 @@ declare function $ERR_IPC_DISCONNECTED(): Error;
declare function $ERR_SERVER_NOT_RUNNING(): Error;
declare function $ERR_IPC_CHANNEL_CLOSED(): Error;
declare function $ERR_SOCKET_BAD_TYPE(): Error;
declare function $ERR_WORKER_MESSAGING_ERRORED(): Error;
declare function $ERR_WORKER_MESSAGING_FAILED(): Error;
declare function $ERR_WORKER_MESSAGING_SAME_THREAD(): Error;
declare function $ERR_WORKER_MESSAGING_TIMEOUT(): Error;
declare function $ERR_ZLIB_INITIALIZATION_FAILED(): Error;
declare function $ERR_IPC_ONE_PIPE(): Error;
declare function $ERR_SOCKET_ALREADY_BOUND(): Error;
Expand Down
Loading
Loading