Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 19 additions & 6 deletions src/jsc/bindings/JSFFIFunction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,24 +37,35 @@
#include "DOMJITIDLTypeFilter.h"
#include "DOMJITHelpers.h"

class FFICallbackFunctionWrapper {
// Refcounted so the threadsafe trampoline can extend the wrapper's lifetime
// from a foreign thread with a single atomic increment instead of copying the
// JSC::Strong members (a HandleSet mutation that is only safe on the JS
// thread). ThreadSafeRefCountedBase is non-copyable, which also statically
// prevents reintroducing the by-value capture.
class FFICallbackFunctionWrapper : public ThreadSafeRefCounted<FFICallbackFunctionWrapper> {

WTF_DEPRECATED_MAKE_FAST_ALLOCATED(FFICallbackFunctionWrapper);

public:
JSC::Strong<JSC::JSFunction> m_function;
JSC::Strong<Zig::GlobalObject> globalObject;
// Cached on the JS thread at construction time so the foreign-thread
// trampoline never has to dereference a Strong to find the context.
WebCore::ScriptExecutionContextIdentifier m_contextId;
~FFICallbackFunctionWrapper() = default;

FFICallbackFunctionWrapper(JSC::JSFunction* function, Zig::GlobalObject* globalObject)
: m_function(globalObject->vm(), function)
, globalObject(globalObject->vm(), globalObject)
, m_contextId(globalObject->scriptExecutionContext()->identifier())
{
}
};
extern "C" void FFICallbackFunctionWrapper_destroy(FFICallbackFunctionWrapper* wrapper)
{
delete wrapper;
// Releases the create-time reference owned by the Rust side. Pending
// event-loop tasks may still hold their own refs.
wrapper->deref();
}

extern "C" FFICallbackFunctionWrapper* Bun__createFFICallbackFunction(
Expand All @@ -66,7 +77,8 @@

auto* callbackFunction = uncheckedDowncast<JSC::JSFunction>(JSC::JSValue::decode(callbackFn));

// refcount 1; released by FFICallbackFunctionWrapper_destroy
auto* wrapper = new FFICallbackFunctionWrapper(callbackFunction, globalObject);

Check failure on line 81 in src/jsc/bindings/JSFFIFunction.cpp

View check run for this annotation

Claude / Claude Code Review

ThreadSafeRefCounted wrapper missing adoptRef — debug-build assertion

`FFICallbackFunctionWrapper` now inherits `ThreadSafeRefCounted` but is constructed with a raw `new` and never passed through `adoptRef()` (or `relaxAdoptionRequirement()`), so on `ASSERT_ENABLED` builds the first `ref()` from `Ref { wrapper }` in `FFI_Callback_threadsafe_call` — or the `wrapper->deref()` in `FFICallbackFunctionWrapper_destroy`, which now runs for every `JSCallback.close()` — hits `ASSERT(!m_adoptionIsRequired)`. Fix: `return &adoptRef(*new FFICallbackFunctionWrapper(callbackFun
Comment thread
Jarred-Sumner marked this conversation as resolved.
Outdated

return wrapper;
}
Expand Down Expand Up @@ -206,17 +218,18 @@
extern "C" void
FFI_Callback_threadsafe_call(FFICallbackFunctionWrapper& wrapper, size_t argCount, JSC::EncodedJSValue* args)
{

auto* globalObject = wrapper.globalObject.get();
// This runs on a foreign (non-JS) thread. Do not touch the wrapper's
// JSC::Strong members here; only take a thread-safe ref so the wrapper
// stays alive until the task runs on the JS thread.
WTF::Vector<JSC::EncodedJSValue, 8> argsVec;
for (size_t i = 0; i < argCount; ++i)
argsVec.append(args[i]);

WebCore::ScriptExecutionContext::postTaskTo(globalObject->scriptExecutionContext()->identifier(), [argsVec = WTF::move(argsVec), wrapper](WebCore::ScriptExecutionContext& ctx) mutable {
WebCore::ScriptExecutionContext::postTaskTo(wrapper.m_contextId, [argsVec = WTF::move(argsVec), protectedWrapper = Ref { wrapper }](WebCore::ScriptExecutionContext& ctx) mutable {
auto* globalObject = uncheckedDowncast<Zig::GlobalObject>(ctx.jsGlobalObject());
auto& vm = JSC::getVM(globalObject);
JSC::MarkedArgumentBuffer arguments;
auto* function = wrapper.m_function.get();
auto* function = protectedWrapper->m_function.get();
for (size_t i = 0; i < argsVec.size(); ++i)
arguments.appendWithCrashOnOverflow(JSC::JSValue::decode(argsVec[i]));
WTF::NakedPtr<JSC::Exception> exception;
Expand Down
163 changes: 162 additions & 1 deletion test/js/bun/ffi/cc.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { cc, CString, ptr, type FFIFunction, type Library } from "bun:ffi";
import { cc, CString, JSCallback, ptr, type FFIFunction, type Library } from "bun:ffi";
import { afterAll, beforeAll, describe, expect, it } from "bun:test";
import { promises as fs } from "fs";
import { bunEnv, bunExe, isArm64, isASAN, isWindows, tempDirWithFiles } from "harness";
Expand Down Expand Up @@ -226,3 +226,164 @@ function makeValidCase<Fns extends Record<string, FFIFunction>>(
// @ts-ignore
return library;
}

// =============================================================================

// The fixture needs pthread_create/pthread_join resolved against the host
// process, which TinyCC's in-memory output only supports on POSIX.
// TinyCC's setjmp/longjmp error handling conflicts with ASan.
describe.skipIf(isWindows || isASAN)("threadsafe JSCallback invoked from a foreign thread", () => {
// TinyCC only ships its own builtin headers, so we cannot #include
// <pthread.h>. pthread_t is `unsigned long` on glibc and a pointer on
// macOS/musl; both fit in 8 bytes.
const source = /* c */ `
typedef unsigned long bun_test_pthread_t;
extern int pthread_create(bun_test_pthread_t*, const void*, void* (*)(void*), void*);
extern int pthread_join(bun_test_pthread_t, void**);

typedef void (*bun_test_callback)(int);

static bun_test_pthread_t bun_test_thread;
static bun_test_callback bun_test_cb;
static int bun_test_count;

static void* bun_test_thread_main(void* arg) {
for (int i = 0; i < bun_test_count; i++) {
bun_test_cb(i);
}
return 0;
}

int start(void* cb, int n) {
bun_test_cb = (bun_test_callback)cb;
bun_test_count = n;
return pthread_create(&bun_test_thread, 0, bun_test_thread_main, 0);
}

int join_thread(void) {
return pthread_join(bun_test_thread, 0);
}

int enqueue_n(void* cb, int n) {
bun_test_cb = (bun_test_callback)cb;
bun_test_count = n;
if (pthread_create(&bun_test_thread, 0, bun_test_thread_main, 0) != 0) {
return 1;
}
return pthread_join(bun_test_thread, 0);
}
`;
let dir: string;
let library: Library<{
start: { args: ["ptr", "int"]; returns: "int" };
join_thread: { args: []; returns: "int" };
enqueue_n: { args: ["ptr", "int"]; returns: "int" };
}>;

beforeAll(() => {
dir = tempDirWithFiles("bun-ffi-cc-threadsafe", {
"threadsafe-callback.c": source,
// Test B fixture: enqueue invocations from a foreign thread, close the
// callback while they are still queued, then wait for all of them to be
// delivered anyway.
"close-while-enqueued.js": /* js */ `
import { cc, JSCallback } from "bun:ffi";
import source from "./threadsafe-callback.c" with { type: "file" };

const N = 50;
const { symbols } = cc({
source,
symbols: {
enqueue_n: { args: ["ptr", "int"], returns: "int" },
},
});

let count = 0;
const cb = new JSCallback(
() => {
count++;
},
{ args: ["int"], threadsafe: true },
);

// enqueue_n joins the worker thread before returning, so all N tasks
// are sitting in the event-loop queue and none have run yet.
if (symbols.enqueue_n(cb.ptr, N) !== 0) {
throw new Error("enqueue_n failed");
}
cb.close();

while (count < N) {
await new Promise(r => setImmediate(r));
}
console.log("ok");
`,
});
library = cc({
source: path.join(dir, "threadsafe-callback.c"),
symbols: {
start: { args: ["ptr", "int"], returns: "int" },
join_thread: { args: [], returns: "int" },
enqueue_n: { args: ["ptr", "int"], returns: "int" },
},
});
});

afterAll(async () => {
library?.close();
await fs.rm(dir, { recursive: true, force: true });
});

it("delivers all callbacks invoked from a foreign thread while the JS thread churns GC handles", async () => {
const N = 200;
const received = new Set<number>();
const { promise, resolve } = Promise.withResolvers<void>();

const cb = new JSCallback(
(value: number) => {
received.add(value);
if (received.size === N) {
resolve();
}
},
{ args: ["int"], threadsafe: true },
);

expect(library.symbols.start(cb.ptr, N)).toBe(0);

// Churn JS-thread GC handle allocation while the foreign thread is
// invoking the callback. Each iteration allocates and frees Strong
// handles from the same HandleSet the foreign thread used to race with.
// The setImmediate yield is required: the foreign thread's invocations
// arrive as concurrent event-loop tasks and are only drained on
// event-loop ticks.
let done = false;
promise.then(() => {
done = true;
});
while (!done) {
const tmp = new JSCallback(() => {}, { returns: "void" });
tmp.close();
await new Promise(r => setImmediate(r));
}

expect(library.symbols.join_thread()).toBe(0);
expect([...received].sort((a, b) => a - b)).toEqual(Array.from({ length: N }, (_, i) => i));
cb.close();
});

it("close() with foreign-thread invocations still enqueued delivers the pending invocations", async () => {
await using proc = Bun.spawn({
cmd: [bunExe(), "close-while-enqueued.js"],
env: bunEnv,
cwd: dir,
stderr: "pipe",
});

const [stdout, stderr, exitCode] = await Promise.all([proc.stdout.text(), proc.stderr.text(), proc.exited]);

expect(stderr).toBe("");
expect(stdout).toBe("ok\n");
expect(exitCode).toBe(0);
Comment thread
Jarred-Sumner marked this conversation as resolved.
});
});
Loading