From bc5c1e40167996a27ee02ece3cef276743bfe489 Mon Sep 17 00:00:00 2001
From: Claude
Date: Sat, 3 Jan 2026 21:30:11 +0000
Subject: [PATCH 1/9] Fix crash in MessageChannel with high-volume messages between workers

## Problem

MessageChannel would crash with a segmentation fault when workers exchanged many messages rapidly (e.g., using setInterval). The issue manifested in two ways:

1. Segmentation fault at address 0x30 during high-volume message passing
2. Assertion failure: ASSERTION FAILED: m_openChannels.get(channel.port1()) == &channel

## Root Cause

Both MessagePortChannel and MessagePortChannelRegistry had critical thread safety issues. These classes were being accessed from multiple worker threads simultaneously without any synchronization:

**MessagePortChannel** - No locks protecting:

- m_pendingMessages[2]: Vector operations (append, swap, removeAt)
- m_processes[2]: Process identifier tracking
- m_entangledToProcessProtectors[2]: Reference counting
- m_messageBatchesInFlight: Message counter

**MessagePortChannelRegistry** - Global singleton with no locks protecting:

- m_openChannels: HashMap accessed by all workers for channel lookup

When Worker1 and Worker2 both called postMessageToRemote() or takeAllMessagesForPort() concurrently, race conditions on Vector and HashMap operations led to memory corruption and crashes.

## Solution

### MessagePortChannel Thread Safety

- Added Lock m_lock member variable
- Protected all shared state with WTF_GUARDED_BY_LOCK(m_lock) annotations
- Added Locker guards in 7 methods:
  - processForPort()
  - entanglePortWithProcess()
  - disentanglePort()
  - closePort()
  - postMessageToRemote()
  - takeAllMessagesForPort()
  - tryTakeMessageForPort()

### MessagePortChannelRegistry Thread Safety

- Added Lock m_lock member variable
- Protected m_openChannels HashMap with WTF_GUARDED_BY_LOCK(m_lock)
- Added Locker guards in 9 methods:
  - messagePortChannelCreated()
  - messagePortChannelDestroyed()
  - didEntangleLocalToRemote()
  - didDisentangleMessagePort()
  - didCloseMessagePort()
  - didPostMessageToRemote()
  - takeAllMessagesForPort()
  - tryTakeMessageForPort()
  - existingChannelContainingPort()

### Deadlock Prevention

To avoid deadlocks (since both Registry and Channel have locks), the pattern used is:

1. Grab a RefPtr to the channel while holding the registry lock
2. Release the registry lock
3. Call the channel's methods (which acquire the channel's lock)

This ensures we never hold two locks simultaneously while maintaining thread safety.

## Files Modified - src/bun.js/bindings/webcore/MessagePortChannel.h - src/bun.js/bindings/webcore/MessagePortChannel.cpp - src/bun.js/bindings/webcore/MessagePortChannelRegistry.h - src/bun.js/bindings/webcore/MessagePortChannelRegistry.cpp Fixes: https://github.com/oven-sh/bun/issues/25805 Fixes: https://github.com/oven-sh/bun/issues/16186 --- .../bindings/webcore/MessagePortChannel.cpp | 52 ++++++++++++------- .../bindings/webcore/MessagePortChannel.h | 20 +++---- .../webcore/MessagePortChannelRegistry.cpp | 46 +++++++++++++--- .../webcore/MessagePortChannelRegistry.h | 4 +- 4 files changed, 88 insertions(+), 34 deletions(-) diff --git a/src/bun.js/bindings/webcore/MessagePortChannel.cpp b/src/bun.js/bindings/webcore/MessagePortChannel.cpp index ba984626f45..b149d47b675 100644 --- a/src/bun.js/bindings/webcore/MessagePortChannel.cpp +++ b/src/bun.js/bindings/webcore/MessagePortChannel.cpp @@ -61,6 +61,7 @@ std::optional MessagePortChannel::processForPort(const Messag { ASSERT(port == m_ports[0] || port == m_ports[1]); size_t i = port == m_ports[0] ? 0 : 1; + Locker locker { m_lock }; return m_processes[i]; } @@ -76,6 +77,7 @@ void MessagePortChannel::entanglePortWithProcess(const MessagePortIdentifier& po // LOG(MessagePorts, "MessagePortChannel %s (%p) entangling port %s (that port has %zu messages available)", logString().utf8().data(), this, port.logString().utf8().data(), m_pendingMessages[i].size()); + Locker locker { m_lock }; ASSERT(!m_processes[i] || *m_processes[i] == process); m_processes[i] = process; m_entangledToProcessProtectors[i] = this; @@ -89,13 +91,17 @@ void MessagePortChannel::disentanglePort(const MessagePortIdentifier& port) ASSERT(port == m_ports[0] || port == m_ports[1]); size_t i = port == m_ports[0] ? 
0 : 1; - ASSERT(m_processes[i] || m_isClosed[i]); - m_processes[i] = std::nullopt; - m_pendingMessagePortTransfers[i].add(this); + RefPtr protectedThis; + { + Locker locker { m_lock }; + ASSERT(m_processes[i] || m_isClosed[i]); + m_processes[i] = std::nullopt; + m_pendingMessagePortTransfers[i].add(this); - // This set of steps is to guarantee that the lock is unlocked before the - // last ref to this object is released. - auto protectedThis = WTF::move(m_entangledToProcessProtectors[i]); + // This set of steps is to guarantee that the lock is unlocked before the + // last ref to this object is released. + protectedThis = WTF::move(m_entangledToProcessProtectors[i]); + } } void MessagePortChannel::closePort(const MessagePortIdentifier& port) @@ -103,13 +109,14 @@ void MessagePortChannel::closePort(const MessagePortIdentifier& port) ASSERT(port == m_ports[0] || port == m_ports[1]); size_t i = port == m_ports[0] ? 0 : 1; - m_processes[i] = std::nullopt; - m_isClosed[i] = true; - // This set of steps is to guarantee that the lock is unlocked before the // last ref to this object is released. Ref protectedThis { *this }; + Locker locker { m_lock }; + m_processes[i] = std::nullopt; + m_isClosed[i] = true; + m_pendingMessages[i].clear(); m_pendingMessagePortTransfers[i].clear(); m_pendingMessageProtectors[i] = nullptr; @@ -121,6 +128,7 @@ bool MessagePortChannel::postMessageToRemote(MessageWithMessagePorts&& message, ASSERT(remoteTarget == m_ports[0] || remoteTarget == m_ports[1]); size_t i = remoteTarget == m_ports[0] ? 
0 : 1; + Locker locker { m_lock }; m_pendingMessages[i].append(WTF::move(message)); // LOG(MessagePorts, "MessagePortChannel %s (%p) now has %zu messages pending on port %s", logString().utf8().data(), this, m_pendingMessages[i].size(), remoteTarget.logString().utf8().data()); @@ -140,22 +148,29 @@ void MessagePortChannel::takeAllMessagesForPort(const MessagePortIdentifier& por ASSERT(port == m_ports[0] || port == m_ports[1]); size_t i = port == m_ports[0] ? 0 : 1; - if (m_pendingMessages[i].isEmpty()) { - callback({}, [] {}); - return; - } + Vector result; + RefPtr protectedThis; - ASSERT(m_pendingMessageProtectors[i]); + { + Locker locker { m_lock }; - Vector result; - result.swap(m_pendingMessages[i]); + if (m_pendingMessages[i].isEmpty()) { + callback({}, [] {}); + return; + } + + ASSERT(m_pendingMessageProtectors[i]); - ++m_messageBatchesInFlight; + result.swap(m_pendingMessages[i]); + ++m_messageBatchesInFlight; + protectedThis = WTF::move(m_pendingMessageProtectors[i]); + } // LOG(MessagePorts, "There are %zu messages to take for port %s. Taking them now, messages in flight is now %" PRIu64, result.size(), port.logString().utf8().data(), m_messageBatchesInFlight); - callback(WTF::move(result), [this, port, protectedThis = WTF::move(m_pendingMessageProtectors[i])] { + callback(WTF::move(result), [this, port, protectedThis = WTF::move(protectedThis)] { UNUSED_PARAM(port); + Locker locker { m_lock }; --m_messageBatchesInFlight; // LOG(MessagePorts, "Message port channel %s was notified that a batch of %zu message port messages targeted for port %s just completed dispatch, in flight is now %" PRIu64, logString().utf8().data(), size, port.logString().utf8().data(), m_messageBatchesInFlight); }); @@ -166,6 +181,7 @@ std::optional MessagePortChannel::tryTakeMessageForPort ASSERT(port == m_ports[0] || port == m_ports[1]); size_t i = port == m_ports[0] ? 
0 : 1; + Locker locker { m_lock }; if (m_pendingMessages[i].isEmpty()) return std::nullopt; diff --git a/src/bun.js/bindings/webcore/MessagePortChannel.h b/src/bun.js/bindings/webcore/MessagePortChannel.h index 1f3e408b61e..be7513dbfcc 100644 --- a/src/bun.js/bindings/webcore/MessagePortChannel.h +++ b/src/bun.js/bindings/webcore/MessagePortChannel.h @@ -33,6 +33,7 @@ #include #include #include +#include namespace WebCore { @@ -59,7 +60,7 @@ class MessagePortChannel : public RefCountedAndCanMakeWeakPtr m_processes[2]; - RefPtr m_entangledToProcessProtectors[2]; - Vector m_pendingMessages[2]; - UncheckedKeyHashSet> m_pendingMessagePortTransfers[2]; - RefPtr m_pendingMessageProtectors[2]; - uint64_t m_messageBatchesInFlight { 0 }; - + bool m_isClosed[2] WTF_GUARDED_BY_LOCK(m_lock) { false, false }; + std::optional m_processes[2] WTF_GUARDED_BY_LOCK(m_lock); + RefPtr m_entangledToProcessProtectors[2] WTF_GUARDED_BY_LOCK(m_lock); + Vector m_pendingMessages[2] WTF_GUARDED_BY_LOCK(m_lock); + UncheckedKeyHashSet> m_pendingMessagePortTransfers[2] WTF_GUARDED_BY_LOCK(m_lock); + RefPtr m_pendingMessageProtectors[2] WTF_GUARDED_BY_LOCK(m_lock); + uint64_t m_messageBatchesInFlight WTF_GUARDED_BY_LOCK(m_lock) { 0 }; + + Lock m_lock; MessagePortChannelRegistry& m_registry; }; diff --git a/src/bun.js/bindings/webcore/MessagePortChannelRegistry.cpp b/src/bun.js/bindings/webcore/MessagePortChannelRegistry.cpp index 66ed5997cf5..ddbf74e9048 100644 --- a/src/bun.js/bindings/webcore/MessagePortChannelRegistry.cpp +++ b/src/bun.js/bindings/webcore/MessagePortChannelRegistry.cpp @@ -58,6 +58,7 @@ void MessagePortChannelRegistry::messagePortChannelCreated(MessagePortChannel& c { // ASSERT(isMainThread()); + Locker locker { m_lock }; auto result = m_openChannels.add(channel.port1(), channel); ASSERT_UNUSED(result, result.isNewEntry); @@ -69,6 +70,7 @@ void MessagePortChannelRegistry::messagePortChannelDestroyed(MessagePortChannel& { // ASSERT(isMainThread()); + Locker locker { m_lock }; 
ASSERT(m_openChannels.get(channel.port1()) == &channel); ASSERT(m_openChannels.get(channel.port2()) == &channel); @@ -83,7 +85,12 @@ void MessagePortChannelRegistry::didEntangleLocalToRemote(const MessagePortIdent // ASSERT(isMainThread()); // The channel might be gone if the remote side was closed. - RefPtr channel = m_openChannels.get(local); + RefPtr channel; + { + Locker locker { m_lock }; + channel = m_openChannels.get(local); + } + if (!channel) return; @@ -97,7 +104,13 @@ void MessagePortChannelRegistry::didDisentangleMessagePort(const MessagePortIden // ASSERT(isMainThread()); // The channel might be gone if the remote side was closed. - if (RefPtr channel = m_openChannels.get(port)) + RefPtr channel; + { + Locker locker { m_lock }; + channel = m_openChannels.get(port); + } + + if (channel) channel->disentanglePort(port); } @@ -107,7 +120,12 @@ void MessagePortChannelRegistry::didCloseMessagePort(const MessagePortIdentifier // LOG(MessagePorts, "Registry: MessagePort %s closed in registry", port.logString().utf8().data()); - RefPtr channel = m_openChannels.get(port); + RefPtr channel; + { + Locker locker { m_lock }; + channel = m_openChannels.get(port); + } + if (!channel) return; @@ -129,7 +147,12 @@ bool MessagePortChannelRegistry::didPostMessageToRemote(MessageWithMessagePorts& // LOG(MessagePorts, "Registry: Posting message to MessagePort %s in registry", remoteTarget.logString().utf8().data()); // The channel might be gone if the remote side was closed. - RefPtr channel = m_openChannels.get(remoteTarget); + RefPtr channel; + { + Locker locker { m_lock }; + channel = m_openChannels.get(remoteTarget); + } + if (!channel) { // LOG(MessagePorts, "Registry: Could not find MessagePortChannel for port %s; It was probably closed. 
Message will be dropped.", remoteTarget.logString().utf8().data()); return false; @@ -143,7 +166,12 @@ void MessagePortChannelRegistry::takeAllMessagesForPort(const MessagePortIdentif // ASSERT(isMainThread()); // The channel might be gone if the remote side was closed. - RefPtr channel = m_openChannels.get(port); + RefPtr channel; + { + Locker locker { m_lock }; + channel = m_openChannels.get(port); + } + if (!channel) { callback({}, [] {}); return; @@ -159,7 +187,12 @@ std::optional MessagePortChannelRegistry::tryTakeMessag // LOG(MessagePorts, "Registry: Trying to take a message for MessagePort %s", port.logString().utf8().data()); // The channel might be gone if the remote side was closed. - auto* channel = m_openChannels.get(port); + RefPtr channel; + { + Locker locker { m_lock }; + channel = m_openChannels.get(port); + } + if (!channel) return std::nullopt; @@ -170,6 +203,7 @@ MessagePortChannel* MessagePortChannelRegistry::existingChannelContainingPort(co { // ASSERT(isMainThread()); + Locker locker { m_lock }; return m_openChannels.get(port); } diff --git a/src/bun.js/bindings/webcore/MessagePortChannelRegistry.h b/src/bun.js/bindings/webcore/MessagePortChannelRegistry.h index 0eed464aeec..7d75070b2b4 100644 --- a/src/bun.js/bindings/webcore/MessagePortChannelRegistry.h +++ b/src/bun.js/bindings/webcore/MessagePortChannelRegistry.h @@ -31,6 +31,7 @@ #include "ProcessIdentifier.h" #include #include +#include namespace WebCore { @@ -57,7 +58,8 @@ class MessagePortChannelRegistry final : public CanMakeWeakPtr> m_openChannels; + UncheckedKeyHashMap> m_openChannels WTF_GUARDED_BY_LOCK(m_lock); + Lock m_lock; }; } // namespace WebCore From a674cd5a222b39e0a08e29a2af5a96bc00945f55 Mon Sep 17 00:00:00 2001 From: Claude Date: Sat, 3 Jan 2026 23:24:04 +0000 Subject: [PATCH 2/9] Fix race condition in messagePortChannelDestroyed The channel might have already been removed from m_openChannels by the time the destructor is called (e.g., if didCloseMessagePort was called 
first). Changed the ASSERT to a conditional check to handle this case gracefully. Also fixed messagePortChannelCreated to clean up stale WeakRef entries before adding new channels. When a channel is destroyed, the WeakRef becomes null but the map entry may still exist, so we need to remove these before adding new entries with the same identifiers. --- .../webcore/MessagePortChannelRegistry.cpp | 25 ++++++++++++++++--- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/src/bun.js/bindings/webcore/MessagePortChannelRegistry.cpp b/src/bun.js/bindings/webcore/MessagePortChannelRegistry.cpp index ddbf74e9048..533c16f66e6 100644 --- a/src/bun.js/bindings/webcore/MessagePortChannelRegistry.cpp +++ b/src/bun.js/bindings/webcore/MessagePortChannelRegistry.cpp @@ -59,6 +59,19 @@ void MessagePortChannelRegistry::messagePortChannelCreated(MessagePortChannel& c // ASSERT(isMainThread()); Locker locker { m_lock }; + + // Remove any existing entries with null WeakRefs before adding new channel + // This can happen if a channel was destroyed but the map entry wasn't cleaned up + auto existingPort1 = m_openChannels.get(channel.port1()); + if (existingPort1 && !existingPort1.get()) { + m_openChannels.remove(channel.port1()); + } + + auto existingPort2 = m_openChannels.get(channel.port2()); + if (existingPort2 && !existingPort2.get()) { + m_openChannels.remove(channel.port2()); + } + auto result = m_openChannels.add(channel.port1(), channel); ASSERT_UNUSED(result, result.isNewEntry); @@ -71,11 +84,15 @@ void MessagePortChannelRegistry::messagePortChannelDestroyed(MessagePortChannel& // ASSERT(isMainThread()); Locker locker { m_lock }; - ASSERT(m_openChannels.get(channel.port1()) == &channel); - ASSERT(m_openChannels.get(channel.port2()) == &channel); - m_openChannels.remove(channel.port1()); - m_openChannels.remove(channel.port2()); + // The channel might have already been removed if it was closed before destruction + if (m_openChannels.get(channel.port1()) == &channel) { 
+ m_openChannels.remove(channel.port1()); + } + + if (m_openChannels.get(channel.port2()) == &channel) { + m_openChannels.remove(channel.port2()); + } // LOG(MessagePorts, "Registry: After removing channel %s there are %u channels left in the registry:", channel.logString().utf8().data(), m_openChannels.size()); } From 548e9d4a5e22337be0bdfa1f815db683f1ab1ff3 Mon Sep 17 00:00:00 2001 From: Tobias Mose Date: Sun, 4 Jan 2026 00:34:59 +0100 Subject: [PATCH 3/9] Revert "Fix race condition in messagePortChannelDestroyed" This reverts commit a674cd5a222b39e0a08e29a2af5a96bc00945f55. --- .../webcore/MessagePortChannelRegistry.cpp | 25 +++---------------- 1 file changed, 4 insertions(+), 21 deletions(-) diff --git a/src/bun.js/bindings/webcore/MessagePortChannelRegistry.cpp b/src/bun.js/bindings/webcore/MessagePortChannelRegistry.cpp index 533c16f66e6..ddbf74e9048 100644 --- a/src/bun.js/bindings/webcore/MessagePortChannelRegistry.cpp +++ b/src/bun.js/bindings/webcore/MessagePortChannelRegistry.cpp @@ -59,19 +59,6 @@ void MessagePortChannelRegistry::messagePortChannelCreated(MessagePortChannel& c // ASSERT(isMainThread()); Locker locker { m_lock }; - - // Remove any existing entries with null WeakRefs before adding new channel - // This can happen if a channel was destroyed but the map entry wasn't cleaned up - auto existingPort1 = m_openChannels.get(channel.port1()); - if (existingPort1 && !existingPort1.get()) { - m_openChannels.remove(channel.port1()); - } - - auto existingPort2 = m_openChannels.get(channel.port2()); - if (existingPort2 && !existingPort2.get()) { - m_openChannels.remove(channel.port2()); - } - auto result = m_openChannels.add(channel.port1(), channel); ASSERT_UNUSED(result, result.isNewEntry); @@ -84,15 +71,11 @@ void MessagePortChannelRegistry::messagePortChannelDestroyed(MessagePortChannel& // ASSERT(isMainThread()); Locker locker { m_lock }; + ASSERT(m_openChannels.get(channel.port1()) == &channel); + ASSERT(m_openChannels.get(channel.port2()) == 
&channel); - // The channel might have already been removed if it was closed before destruction - if (m_openChannels.get(channel.port1()) == &channel) { - m_openChannels.remove(channel.port1()); - } - - if (m_openChannels.get(channel.port2()) == &channel) { - m_openChannels.remove(channel.port2()); - } + m_openChannels.remove(channel.port1()); + m_openChannels.remove(channel.port2()); // LOG(MessagePorts, "Registry: After removing channel %s there are %u channels left in the registry:", channel.logString().utf8().data(), m_openChannels.size()); } From 343af4ddacacb09e2a51ce4fcca74b82b97d818c Mon Sep 17 00:00:00 2001 From: Tobias Mose Date: Sun, 4 Jan 2026 01:16:33 +0100 Subject: [PATCH 4/9] Fix assertion failure in MessagePortChannel with high-volume worker messaging - Handle race condition in messagePortChannelDestroyed where channel may already be removed when both ports close from different threads - Clean up stale WeakRef entries in messagePortChannelCreated - Implement missing hasAnyMessagesPendingOrInFlight() and beingTransferredCount() - Make m_lock mutable for const method access --- .../bindings/webcore/MessagePortChannel.cpp | 12 ++++++++ .../bindings/webcore/MessagePortChannel.h | 2 +- .../webcore/MessagePortChannelRegistry.cpp | 29 +++++++++++++++---- 3 files changed, 37 insertions(+), 6 deletions(-) diff --git a/src/bun.js/bindings/webcore/MessagePortChannel.cpp b/src/bun.js/bindings/webcore/MessagePortChannel.cpp index b149d47b675..7a257ba6863 100644 --- a/src/bun.js/bindings/webcore/MessagePortChannel.cpp +++ b/src/bun.js/bindings/webcore/MessagePortChannel.cpp @@ -190,4 +190,16 @@ std::optional MessagePortChannel::tryTakeMessageForPort return WTF::move(message); } +bool MessagePortChannel::hasAnyMessagesPendingOrInFlight() const +{ + Locker locker { m_lock }; + return !m_pendingMessages[0].isEmpty() || !m_pendingMessages[1].isEmpty() || m_messageBatchesInFlight > 0; +} + +uint64_t MessagePortChannel::beingTransferredCount() const +{ + Locker locker 
{ m_lock }; + return m_pendingMessagePortTransfers[0].size() + m_pendingMessagePortTransfers[1].size(); +} + } // namespace WebCore diff --git a/src/bun.js/bindings/webcore/MessagePortChannel.h b/src/bun.js/bindings/webcore/MessagePortChannel.h index be7513dbfcc..1389df06376 100644 --- a/src/bun.js/bindings/webcore/MessagePortChannel.h +++ b/src/bun.js/bindings/webcore/MessagePortChannel.h @@ -81,7 +81,7 @@ class MessagePortChannel : public RefCountedAndCanMakeWeakPtr m_pendingMessageProtectors[2] WTF_GUARDED_BY_LOCK(m_lock); uint64_t m_messageBatchesInFlight WTF_GUARDED_BY_LOCK(m_lock) { 0 }; - Lock m_lock; + mutable Lock m_lock; MessagePortChannelRegistry& m_registry; }; diff --git a/src/bun.js/bindings/webcore/MessagePortChannelRegistry.cpp b/src/bun.js/bindings/webcore/MessagePortChannelRegistry.cpp index ddbf74e9048..42638270bd4 100644 --- a/src/bun.js/bindings/webcore/MessagePortChannelRegistry.cpp +++ b/src/bun.js/bindings/webcore/MessagePortChannelRegistry.cpp @@ -59,6 +59,17 @@ void MessagePortChannelRegistry::messagePortChannelCreated(MessagePortChannel& c // ASSERT(isMainThread()); Locker locker { m_lock }; + + // When a channel is destroyed, its WeakRef becomes null but the map entry may still exist. + // Clean up any stale entries before adding new channels with the same port identifiers. 
+ auto existingChannel1 = m_openChannels.get(channel.port1()); + if (!existingChannel1) + m_openChannels.remove(channel.port1()); + + auto existingChannel2 = m_openChannels.get(channel.port2()); + if (!existingChannel2) + m_openChannels.remove(channel.port2()); + auto result = m_openChannels.add(channel.port1(), channel); ASSERT_UNUSED(result, result.isNewEntry); @@ -71,11 +82,19 @@ void MessagePortChannelRegistry::messagePortChannelDestroyed(MessagePortChannel& // ASSERT(isMainThread()); Locker locker { m_lock }; - ASSERT(m_openChannels.get(channel.port1()) == &channel); - ASSERT(m_openChannels.get(channel.port2()) == &channel); - - m_openChannels.remove(channel.port1()); - m_openChannels.remove(channel.port2()); + + // The channel might have already been removed from m_openChannels if both ports + // were closed in quick succession from different threads. With WeakRef, the entries + // may still exist but point to null, or may have been removed entirely. + // We defensively remove the entries without asserting they match. + auto* existingChannel1 = m_openChannels.get(channel.port1()); + auto* existingChannel2 = m_openChannels.get(channel.port2()); + + // Only remove if the entry points to this channel (or is null/stale) + if (!existingChannel1 || existingChannel1 == &channel) + m_openChannels.remove(channel.port1()); + if (!existingChannel2 || existingChannel2 == &channel) + m_openChannels.remove(channel.port2()); // LOG(MessagePorts, "Registry: After removing channel %s there are %u channels left in the registry:", channel.logString().utf8().data(), m_openChannels.size()); } From f196d6f0a0e3beeb582ce5f14df1a80cd2cc2c39 Mon Sep 17 00:00:00 2001 From: Tobias Mose Date: Sun, 4 Jan 2026 02:03:04 +0100 Subject: [PATCH 5/9] Fix potential deadlock in takeAllMessagesForPort by invoking callback outside lock Move the empty-case callback invocation outside the Locker scope to prevent deadlocks if the callback acquires the same or other locks. 
--- .../bindings/webcore/MessagePortChannel.cpp | 20 ++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/src/bun.js/bindings/webcore/MessagePortChannel.cpp b/src/bun.js/bindings/webcore/MessagePortChannel.cpp index 7a257ba6863..fbc8296d1f3 100644 --- a/src/bun.js/bindings/webcore/MessagePortChannel.cpp +++ b/src/bun.js/bindings/webcore/MessagePortChannel.cpp @@ -150,20 +150,26 @@ void MessagePortChannel::takeAllMessagesForPort(const MessagePortIdentifier& por Vector result; RefPtr protectedThis; + bool isEmpty = false; { Locker locker { m_lock }; if (m_pendingMessages[i].isEmpty()) { - callback({}, [] {}); - return; - } + isEmpty = true; + } else { + ASSERT(m_pendingMessageProtectors[i]); - ASSERT(m_pendingMessageProtectors[i]); + result.swap(m_pendingMessages[i]); + ++m_messageBatchesInFlight; + protectedThis = WTF::move(m_pendingMessageProtectors[i]); + } + } - result.swap(m_pendingMessages[i]); - ++m_messageBatchesInFlight; - protectedThis = WTF::move(m_pendingMessageProtectors[i]); + // Invoke callback outside the lock to avoid potential deadlocks + if (isEmpty) { + callback({}, [] {}); + return; } // LOG(MessagePorts, "There are %zu messages to take for port %s. 
Taking them now, messages in flight is now %" PRIu64, result.size(), port.logString().utf8().data(), m_messageBatchesInFlight); From 3d38c36ff776a97c38713f3dff189bf6c66abf4b Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Tue, 6 Jan 2026 15:45:30 +0000 Subject: [PATCH 6/9] [autofix.ci] apply automated fixes --- .../bindings/webcore/MessagePortChannelRegistry.cpp | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/bun.js/bindings/webcore/MessagePortChannelRegistry.cpp b/src/bun.js/bindings/webcore/MessagePortChannelRegistry.cpp index 42638270bd4..f510a64ccbf 100644 --- a/src/bun.js/bindings/webcore/MessagePortChannelRegistry.cpp +++ b/src/bun.js/bindings/webcore/MessagePortChannelRegistry.cpp @@ -59,17 +59,17 @@ void MessagePortChannelRegistry::messagePortChannelCreated(MessagePortChannel& c // ASSERT(isMainThread()); Locker locker { m_lock }; - + // When a channel is destroyed, its WeakRef becomes null but the map entry may still exist. // Clean up any stale entries before adding new channels with the same port identifiers. auto existingChannel1 = m_openChannels.get(channel.port1()); if (!existingChannel1) m_openChannels.remove(channel.port1()); - + auto existingChannel2 = m_openChannels.get(channel.port2()); if (!existingChannel2) m_openChannels.remove(channel.port2()); - + auto result = m_openChannels.add(channel.port1(), channel); ASSERT_UNUSED(result, result.isNewEntry); @@ -82,14 +82,14 @@ void MessagePortChannelRegistry::messagePortChannelDestroyed(MessagePortChannel& // ASSERT(isMainThread()); Locker locker { m_lock }; - + // The channel might have already been removed from m_openChannels if both ports // were closed in quick succession from different threads. With WeakRef, the entries // may still exist but point to null, or may have been removed entirely. // We defensively remove the entries without asserting they match. 
auto* existingChannel1 = m_openChannels.get(channel.port1()); auto* existingChannel2 = m_openChannels.get(channel.port2()); - + // Only remove if the entry points to this channel (or is null/stale) if (!existingChannel1 || existingChannel1 == &channel) m_openChannels.remove(channel.port1()); From 5ff590e63dfc3a33523dba8addf17f8809781379 Mon Sep 17 00:00:00 2001 From: Tobias Mose Date: Wed, 7 Jan 2026 11:10:28 +0100 Subject: [PATCH 7/9] test: add fixture and tests for MessageChannel thread safety between workers This commit introduces a new fixture script and corresponding tests to validate the thread safety of MessageChannel communication between workers. The fixture simulates rapid message passing to reproduce a crash scenario from issue #25805, ensuring that the system remains stable under stress. The tests confirm that no segmentation faults occur during execution, validating the effectiveness of the implemented fixes. --- .../message-channel-thread-safety-fixture.js | 58 +++++++++++++++++++ .../message-channel-thread-safety.test.ts | 34 +++++++++++ 2 files changed, 92 insertions(+) create mode 100755 test/js/web/workers/message-channel-thread-safety-fixture.js create mode 100755 test/js/web/workers/message-channel-thread-safety.test.ts diff --git a/test/js/web/workers/message-channel-thread-safety-fixture.js b/test/js/web/workers/message-channel-thread-safety-fixture.js new file mode 100755 index 00000000000..69eb138a032 --- /dev/null +++ b/test/js/web/workers/message-channel-thread-safety-fixture.js @@ -0,0 +1,58 @@ +/** + * Fixture script to test MessageChannel thread safety between workers. + * + * This script reproduces the crash scenario from issue #25805 where + * rapid message passing between workers via MessageChannel caused + * segmentation faults due to race conditions. + * + * Uses a tight loop to reliably trigger the race condition. 
+ * + * @see https://github.com/oven-sh/bun/issues/25805 + */ +const { Worker, isMainThread, parentPort } = require("worker_threads"); + +if (isMainThread) { + const worker1 = new Worker(__filename); + const worker2 = new Worker(__filename); + + const { port1, port2 } = new MessageChannel(); + worker1.postMessage(port1, [port1]); + worker2.postMessage(port2, [port2]); + + // Spam messages in a tight loop - this reliably triggers the crash + // for (let i = 0; i < 100000; i++) { + // worker1.postMessage("PING"); + // worker2.postMessage("PING"); + // } + + const startTime = Date.now(); + const duration = 500; // Run for 2 seconds + // Spam messages in a tight loop - this reliably triggers the crash + while (Date.now() - startTime < duration) { + worker1.postMessage("PING"); + worker2.postMessage("PING"); + } + + // If we get here without crashing, the fix works + console.log("SUCCESS"); + worker1.terminate(); + worker2.terminate(); + process.exit(0); +} else { + let port = null; + + parentPort.on("message", msg => { + if (msg instanceof MessagePort) { + msg.addEventListener("message", () => { + // Just receive the message + }); + msg.start(); + port = msg; + return; + } + + if (msg === "PING" && port) { + port.postMessage("PONG"); + } + }); +} diff --git a/test/js/web/workers/message-channel-thread-safety.test.ts b/test/js/web/workers/message-channel-thread-safety.test.ts new file mode 100755 index 00000000000..369543eda4b --- /dev/null +++ b/test/js/web/workers/message-channel-thread-safety.test.ts @@ -0,0 +1,34 @@ +/** + * Tests for MessageChannel thread safety between workers. 
+ * + * @see https://github.com/oven-sh/bun/pull/25806 + * @see https://github.com/oven-sh/bun/issues/25805 + */ +import { expect, test } from "bun:test"; +import { bunEnv, bunExe } from "harness"; +import path from "path"; + +test("MessageChannel between workers does not crash with rapid messages", async () => { + const fixturePath = path.join(import.meta.dir, "message-channel-thread-safety-fixture.js"); + + const proc = Bun.spawn({ + cmd: [bunExe(), fixturePath], + env: bunEnv, + stdout: "pipe", + stderr: "pipe", + }); + + const exitCode = await proc.exited; + const stdout = await new Response(proc.stdout).text(); + const stderr = await new Response(proc.stderr).text(); + + if (exitCode !== 0) { + console.log("stdout:", stdout); + console.log("stderr:", stderr); + } + + expect(stderr).not.toContain("Segmentation fault"); + expect(stderr).not.toContain("Bun has crashed"); + expect(exitCode).toBe(0); + expect(stdout).toContain("SUCCESS"); +}, 60000); From 8879a86ed22fa361c43e7e359cbdd0bc80544eab Mon Sep 17 00:00:00 2001 From: Tobias Mose Date: Wed, 7 Jan 2026 11:59:27 +0100 Subject: [PATCH 8/9] Clean up obsolete and unnecessary comments --- .../web/workers/message-channel-thread-safety-fixture.js | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/test/js/web/workers/message-channel-thread-safety-fixture.js b/test/js/web/workers/message-channel-thread-safety-fixture.js index 69eb138a032..d9a56d16428 100755 --- a/test/js/web/workers/message-channel-thread-safety-fixture.js +++ b/test/js/web/workers/message-channel-thread-safety-fixture.js @@ -19,14 +19,8 @@ if (isMainThread) { worker1.postMessage(port1, [port1]); worker2.postMessage(port2, [port2]); - // Spam messages in a tight loop - this reliably triggers the crash - // for (let i = 0; i < 100000; i++) { - // worker1.postMessage("PING"); - // worker2.postMessage("PING"); - // } - const startTime = Date.now(); - const duration = 500; // Run for 2 seconds + const duration = 500; // Spam 
messages in a tight loop - this reliably triggers the crash while (Date.now() - startTime < duration) { worker1.postMessage("PING"); From 0849a598c2c0623f3f29c7afe1069fa2e52dea0e Mon Sep 17 00:00:00 2001 From: Tobias Mose Date: Fri, 9 Jan 2026 12:41:58 +0100 Subject: [PATCH 9/9] Refactor MessagePortChannel to use ThreadSafeRefCounted and update related registry methods This commit modifies the MessagePortChannel class to inherit from ThreadSafeRefCounted and updates the MessagePortChannelRegistry to utilize ThreadSafeWeakPtr for managing open channels. Additionally, it removes the obsolete relaxAdoptionRequirement call and adjusts related comments and method signatures to reflect the new reference management strategy. --- .../bindings/webcore/MessagePortChannel.cpp | 2 -- .../bindings/webcore/MessagePortChannel.h | 5 +-- .../webcore/MessagePortChannelRegistry.cpp | 32 +++++++++---------- .../webcore/MessagePortChannelRegistry.h | 5 +-- 4 files changed, 22 insertions(+), 22 deletions(-) diff --git a/src/bun.js/bindings/webcore/MessagePortChannel.cpp b/src/bun.js/bindings/webcore/MessagePortChannel.cpp index fbc8296d1f3..03f30ba5a77 100644 --- a/src/bun.js/bindings/webcore/MessagePortChannel.cpp +++ b/src/bun.js/bindings/webcore/MessagePortChannel.cpp @@ -42,8 +42,6 @@ MessagePortChannel::MessagePortChannel(MessagePortChannelRegistry& registry, con : m_ports { port1, port2 } , m_registry(registry) { - relaxAdoptionRequirement(); - m_processes[0] = port1.processIdentifier; m_entangledToProcessProtectors[0] = this; m_processes[1] = port2.processIdentifier; diff --git a/src/bun.js/bindings/webcore/MessagePortChannel.h b/src/bun.js/bindings/webcore/MessagePortChannel.h index 1389df06376..5074d1a40e7 100644 --- a/src/bun.js/bindings/webcore/MessagePortChannel.h +++ b/src/bun.js/bindings/webcore/MessagePortChannel.h @@ -32,14 +32,15 @@ #include #include #include -#include +#include +#include #include namespace WebCore { class MessagePortChannelRegistry; -class 
MessagePortChannel : public RefCountedAndCanMakeWeakPtr { +class MessagePortChannel : public ThreadSafeRefCountedAndCanMakeThreadSafeWeakPtr { public: static Ref create(MessagePortChannelRegistry&, const MessagePortIdentifier& port1, const MessagePortIdentifier& port2); diff --git a/src/bun.js/bindings/webcore/MessagePortChannelRegistry.cpp b/src/bun.js/bindings/webcore/MessagePortChannelRegistry.cpp index f510a64ccbf..31150909f24 100644 --- a/src/bun.js/bindings/webcore/MessagePortChannelRegistry.cpp +++ b/src/bun.js/bindings/webcore/MessagePortChannelRegistry.cpp @@ -60,13 +60,13 @@ void MessagePortChannelRegistry::messagePortChannelCreated(MessagePortChannel& c Locker locker { m_lock }; - // When a channel is destroyed, its WeakRef becomes null but the map entry may still exist. + // When a channel is destroyed, its ThreadSafeWeakPtr becomes null but the map entry may still exist. // Clean up any stale entries before adding new channels with the same port identifiers. - auto existingChannel1 = m_openChannels.get(channel.port1()); + RefPtr existingChannel1 = m_openChannels.get(channel.port1()).get(); if (!existingChannel1) m_openChannels.remove(channel.port1()); - auto existingChannel2 = m_openChannels.get(channel.port2()); + RefPtr existingChannel2 = m_openChannels.get(channel.port2()).get(); if (!existingChannel2) m_openChannels.remove(channel.port2()); @@ -84,16 +84,16 @@ void MessagePortChannelRegistry::messagePortChannelDestroyed(MessagePortChannel& Locker locker { m_lock }; // The channel might have already been removed from m_openChannels if both ports - // were closed in quick succession from different threads. With WeakRef, the entries + // were closed in quick succession from different threads. With ThreadSafeWeakPtr, the entries // may still exist but point to null, or may have been removed entirely. // We defensively remove the entries without asserting they match. 
- auto* existingChannel1 = m_openChannels.get(channel.port1()); - auto* existingChannel2 = m_openChannels.get(channel.port2()); + RefPtr existingChannel1 = m_openChannels.get(channel.port1()).get(); + RefPtr existingChannel2 = m_openChannels.get(channel.port2()).get(); // Only remove if the entry points to this channel (or is null/stale) - if (!existingChannel1 || existingChannel1 == &channel) + if (!existingChannel1 || existingChannel1.get() == &channel) m_openChannels.remove(channel.port1()); - if (!existingChannel2 || existingChannel2 == &channel) + if (!existingChannel2 || existingChannel2.get() == &channel) m_openChannels.remove(channel.port2()); // LOG(MessagePorts, "Registry: After removing channel %s there are %u channels left in the registry:", channel.logString().utf8().data(), m_openChannels.size()); @@ -107,7 +107,7 @@ void MessagePortChannelRegistry::didEntangleLocalToRemote(const MessagePortIdent RefPtr channel; { Locker locker { m_lock }; - channel = m_openChannels.get(local); + channel = m_openChannels.get(local).get(); } if (!channel) @@ -126,7 +126,7 @@ void MessagePortChannelRegistry::didDisentangleMessagePort(const MessagePortIden RefPtr channel; { Locker locker { m_lock }; - channel = m_openChannels.get(port); + channel = m_openChannels.get(port).get(); } if (channel) @@ -142,7 +142,7 @@ void MessagePortChannelRegistry::didCloseMessagePort(const MessagePortIdentifier RefPtr channel; { Locker locker { m_lock }; - channel = m_openChannels.get(port); + channel = m_openChannels.get(port).get(); } if (!channel) @@ -169,7 +169,7 @@ bool MessagePortChannelRegistry::didPostMessageToRemote(MessageWithMessagePorts& RefPtr channel; { Locker locker { m_lock }; - channel = m_openChannels.get(remoteTarget); + channel = m_openChannels.get(remoteTarget).get(); } if (!channel) { @@ -188,7 +188,7 @@ void MessagePortChannelRegistry::takeAllMessagesForPort(const MessagePortIdentif RefPtr channel; { Locker locker { m_lock }; - channel = m_openChannels.get(port); + 
channel = m_openChannels.get(port).get(); } if (!channel) { @@ -209,7 +209,7 @@ std::optional MessagePortChannelRegistry::tryTakeMessag RefPtr channel; { Locker locker { m_lock }; - channel = m_openChannels.get(port); + channel = m_openChannels.get(port).get(); } if (!channel) @@ -218,12 +218,12 @@ std::optional MessagePortChannelRegistry::tryTakeMessag return channel->tryTakeMessageForPort(port); } -MessagePortChannel* MessagePortChannelRegistry::existingChannelContainingPort(const MessagePortIdentifier& port) +RefPtr MessagePortChannelRegistry::existingChannelContainingPort(const MessagePortIdentifier& port) { // ASSERT(isMainThread()); Locker locker { m_lock }; - return m_openChannels.get(port); + return m_openChannels.get(port).get(); } } // namespace WebCore diff --git a/src/bun.js/bindings/webcore/MessagePortChannelRegistry.h b/src/bun.js/bindings/webcore/MessagePortChannelRegistry.h index 7d75070b2b4..dad9345dbdc 100644 --- a/src/bun.js/bindings/webcore/MessagePortChannelRegistry.h +++ b/src/bun.js/bindings/webcore/MessagePortChannelRegistry.h @@ -32,6 +32,7 @@ #include #include #include +#include namespace WebCore { @@ -52,13 +53,13 @@ class MessagePortChannelRegistry final : public CanMakeWeakPtr&&, CompletionHandler&&)>&&); WEBCORE_EXPORT std::optional tryTakeMessageForPort(const MessagePortIdentifier&); - WEBCORE_EXPORT MessagePortChannel* existingChannelContainingPort(const MessagePortIdentifier&); + WEBCORE_EXPORT RefPtr existingChannelContainingPort(const MessagePortIdentifier&); WEBCORE_EXPORT void messagePortChannelCreated(MessagePortChannel&); WEBCORE_EXPORT void messagePortChannelDestroyed(MessagePortChannel&); private: - UncheckedKeyHashMap> m_openChannels WTF_GUARDED_BY_LOCK(m_lock); + UncheckedKeyHashMap> m_openChannels WTF_GUARDED_BY_LOCK(m_lock); Lock m_lock; };