Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions src/bun.js/bindings/webcore/BroadcastChannel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ static UncheckedKeyHashMap<BroadcastChannelIdentifier, ThreadSafeWeakPtr<Broadca
}

static Lock channelToContextIdentifierLock;
static UncheckedKeyHashMap<BroadcastChannelIdentifier, ScriptExecutionContextIdentifier>& channelToContextIdentifier()
static UncheckedKeyHashMap<BroadcastChannelIdentifier, ScriptExecutionContextIdentifier>& channelToContextIdentifier() WTF_REQUIRES_LOCK(channelToContextIdentifierLock)
{
static NeverDestroyed<UncheckedKeyHashMap<BroadcastChannelIdentifier, ScriptExecutionContextIdentifier>> map;
return map;
Expand Down Expand Up @@ -134,6 +134,7 @@ void BroadcastChannel::MainThreadBridge::registerChannel(ScriptExecutionContext&

ScriptExecutionContext::ensureOnMainThread([protectedThis = WTF::move(protectedThis), contextId = context.identifier()](auto& context) mutable {
context.broadcastChannelRegistry().registerChannel(protectedThis->m_name, protectedThis->identifier());
Locker locker { channelToContextIdentifierLock };
channelToContextIdentifier().add(protectedThis->identifier(), contextId);
});
}
Expand All @@ -144,6 +145,7 @@ void BroadcastChannel::MainThreadBridge::unregisterChannel()

ScriptExecutionContext::ensureOnMainThread([protectedThis = WTF::move(protectedThis)](auto& context) {
context.broadcastChannelRegistry().unregisterChannel(protectedThis->m_name, protectedThis->identifier());
Locker locker { channelToContextIdentifierLock };
channelToContextIdentifier().remove(protectedThis->identifier());
});
}
Expand Down Expand Up @@ -233,7 +235,11 @@ void BroadcastChannel::dispatchMessageTo(BroadcastChannelIdentifier channelIdent
{
ASSERT(isMainThread());

auto contextIdentifier = channelToContextIdentifier().get(channelIdentifier);
ScriptExecutionContextIdentifier contextIdentifier;
{
Locker locker { channelToContextIdentifierLock };
contextIdentifier = channelToContextIdentifier().get(channelIdentifier);
}
if (!contextIdentifier)
return;

Expand All @@ -256,7 +262,7 @@ void BroadcastChannel::dispatchMessage(Ref<SerializedScriptValue>&& message)
if (m_isClosed)
return;

ScriptExecutionContext::postTaskTo(contextIdForBroadcastChannelId(m_mainThreadBridge->identifier()), [this, message = WTF::move(message)](ScriptExecutionContext& context) mutable {
ScriptExecutionContext::postTaskTo(contextIdForBroadcastChannelId(m_mainThreadBridge->identifier()), [this, protectedThis = Ref { *this }, message = WTF::move(message)](ScriptExecutionContext& context) mutable {
if (m_isClosed)
return;

Expand Down
164 changes: 135 additions & 29 deletions test/js/web/broadcastchannel/broadcast-channel-worker-gc.test.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,29 @@
import { expect, test } from "bun:test";
import { bunEnv, bunExe } from "harness";
import { bunEnv, bunExe, isASAN, isDebug } from "harness";

// Debug/ASAN builds are much slower at spawning workers.
const timeout = isDebug || isASAN ? 60_000 : 10_000;

// ASAN builds unconditionally print a warning about JSC signal handlers on
// startup. Strip it so we can still assert the subprocess produced no other
// stderr output.
function filterStderr(stderr: string): string {
return stderr
.split(/\r?\n/)
.filter(line => line && !line.startsWith("WARNING: ASAN interferes"))
.join("\n");
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}

// Regression test for use-after-free in BroadcastChannel global map.
// Previously, the global map stored raw BroadcastChannel* pointers. If a Worker
// created a BroadcastChannel and then terminated, a message dispatched from the
// main thread could race with the Worker's destructor and dereference a dangling
// pointer. Now the map stores ThreadSafeWeakPtr<BroadcastChannel>, so the lookup
// returns null if the channel was destroyed.
test("BroadcastChannel: no UAF when posting to channel after worker terminates", async () => {
const script = /* js */ `
test(
"BroadcastChannel: no UAF when posting to channel after worker terminates",
async () => {
const script = /* js */ `
const workerCode = \`
const bc = new BroadcastChannel("worker-gc-test");
bc.onmessage = (e) => {
Expand Down Expand Up @@ -69,22 +84,111 @@ test("BroadcastChannel: no UAF when posting to channel after worker terminates",
console.log("OK");
`;

await using proc = Bun.spawn({
cmd: [bunExe(), "-e", script],
env: bunEnv,
stderr: "pipe",
stdout: "pipe",
});
await using proc = Bun.spawn({
cmd: [bunExe(), "-e", script],
env: bunEnv,
stderr: "pipe",
stdout: "pipe",
});

const [stdout, stderr, exitCode] = await Promise.all([proc.stdout.text(), proc.stderr.text(), proc.exited]);

expect(filterStderr(stderr)).toBe("");
expect(stdout.trim()).toBe("OK");
expect(exitCode).toBe(0);
},
timeout,
);

// Regression test for two additional races in BroadcastChannel:
// (A) channelToContextIdentifier() HashMap was only locked on the worker-thread
// reader, not on the main-thread writers (registerChannel/unregisterChannel/
// dispatchMessageTo). A rehash on main concurrent with a worker-side get()
// walks a freed bucket array → ASAN heap-use-after-free in WTF::HashTable.
// (B) dispatchMessage() posted a task capturing raw `this` without a protecting
// Ref. If the worker terminated and GC ran between posting and running the
// task, the task dereferenced a freed BroadcastChannel.
//
// This test maximises contention: workers post messages (triggering the
// worker-thread map read in dispatchMessage) while the main thread is churning
// channel registrations (triggering HashMap rehashes) and terminating workers
// mid-dispatch (leaving queued tasks with dangling `this`).
test(
"BroadcastChannel: concurrent register/dispatch/terminate does not race channelToContextIdentifier",
async () => {
const script = /* js */ `
const workerCode = \`
const bc = new BroadcastChannel("race-test");
bc.onmessage = () => {};
// Post from the worker so dispatchMessage() runs on OTHER worker threads,
// reaching the worker-side channelToContextIdentifier().get() path.
for (let i = 0; i < 20; i++) bc.postMessage(i);
postMessage("ready");
\`;
const blobUrl = URL.createObjectURL(new Blob([workerCode], { type: "application/javascript" }));

const mainChannel = new BroadcastChannel("race-test");
mainChannel.onmessage = () => {};

for (let round = 0; round < 4; round++) {
const workers = [];
const readyPromises = [];

// Spawning N workers → N registerChannel() → N .add() calls on main.
// Each worker also posts messages that fan out to all other workers,
// each fan-out hop reads the map on a worker thread.
for (let i = 0; i < 4; i++) {
const worker = new Worker(blobUrl);
const { promise, resolve } = Promise.withResolvers();
worker.onmessage = () => resolve();
workers.push(worker);
readyPromises.push(promise);
}

// While workers are registering & cross-posting, also create and
// immediately drop extra channels on main to force HashMap rehashes.
const extraChannels = [];
for (let i = 0; i < 16; i++) {
extraChannels.push(new BroadcastChannel("race-test"));
}
for (const c of extraChannels) c.close();

const [stdout, stderr, exitCode] = await Promise.all([proc.stdout.text(), proc.stderr.text(), proc.exited]);
await Promise.all(readyPromises);
Comment thread
robobun marked this conversation as resolved.

expect(stderr).toBe("");
expect(stdout.trim()).toBe("OK");
expect(exitCode).toBe(0);
});
// Terminate while dispatches are still in flight → queued postTaskTo
// lambdas may outlive their BroadcastChannel.
for (const worker of workers) {
mainChannel.postMessage("x");
worker.terminate();
}

test("BroadcastChannel: repeated worker create/terminate stress", async () => {
const script = /* js */ `
Bun.gc(true);
}

mainChannel.close();
console.log("OK");
`;

await using proc = Bun.spawn({
cmd: [bunExe(), "-e", script],
env: bunEnv,
stderr: "pipe",
stdout: "pipe",
});

const [stdout, stderr, exitCode] = await Promise.all([proc.stdout.text(), proc.stderr.text(), proc.exited]);

expect(filterStderr(stderr)).toBe("");
expect(stdout.trim()).toBe("OK");
expect(exitCode).toBe(0);
},
timeout,
);

test(
"BroadcastChannel: repeated worker create/terminate stress",
async () => {
const script = /* js */ `
const workerCode = \`
const bc = new BroadcastChannel("stress-test");
bc.onmessage = () => {};
Expand Down Expand Up @@ -124,16 +228,18 @@ test("BroadcastChannel: repeated worker create/terminate stress", async () => {
console.log("OK");
`;

await using proc = Bun.spawn({
cmd: [bunExe(), "-e", script],
env: bunEnv,
stderr: "pipe",
stdout: "pipe",
});

const [stdout, stderr, exitCode] = await Promise.all([proc.stdout.text(), proc.stderr.text(), proc.exited]);

expect(stderr).toBe("");
expect(stdout.trim()).toBe("OK");
expect(exitCode).toBe(0);
});
await using proc = Bun.spawn({
cmd: [bunExe(), "-e", script],
env: bunEnv,
stderr: "pipe",
stdout: "pipe",
});

const [stdout, stderr, exitCode] = await Promise.all([proc.stdout.text(), proc.stderr.text(), proc.exited]);

expect(filterStderr(stderr)).toBe("");
expect(stdout.trim()).toBe("OK");
expect(exitCode).toBe(0);
},
timeout,
);
Loading