Skip to content
22 changes: 22 additions & 0 deletions packages/sdk/client/rpc/bare-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import { getAllPlugins } from "@/server/plugins";
import {
initializeWorkerCore,
shutdownBareDirectWorker,
cleanupForTerminate,
} from "@/server/worker-core";
import { assertLifecycleAllowed } from "@/server/bare/runtime-lifecycle";

Expand Down Expand Up @@ -227,6 +228,27 @@ function createMockRPCRequest() {
}
}

// Handle special pre-terminate cleanup signal. In direct mode the
// bare runtime is the host JS context, so we run cleanup but never
// exit the process here.
if (
typeof requestData === "object" &&
"type" in requestData &&
requestData.type === "__shutdown__"
) {
try {
await cleanupForTerminate();
return Buffer.from(JSON.stringify({ success: true }));
} catch (error) {
return Buffer.from(
JSON.stringify({
success: false,
error: error instanceof Error ? error.message : String(error),
}),
);
}
}

const response = await send(requestData as Request);
return Buffer.from(JSON.stringify(response));
},
Expand Down
102 changes: 98 additions & 4 deletions packages/sdk/client/rpc/expo-rpc-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ let rpcPromise: Promise<RPC> | null = null;
let workletInstance: Worklet | null = null;
let workletInitialized = false;
let cachedRuntimeContext: RuntimeContext | undefined;
// Set while close() is in flight. Concurrent callers share the same
// promise instead of double-sending __shutdown__ or re-entering the
// terminate block on already-null state.
let closingPromise: Promise<void> | null = null;

logger.debug("EXPO RPC Client bundle");

Expand Down Expand Up @@ -121,10 +125,100 @@ export async function getRPC() {
}
}

export function close() {
logger.info("🧹 Closing RPC client (Expo)");
rpcInstance = null;
rpcPromise = null;
const SHUTDOWN_RPC_TIMEOUT_MS = 10_000;

/**
* Pre-terminate cleanup roundtrip. Sends an internal `__shutdown__` message
* to the worker so it can clear addon plugin registries (calls each addon's
* `releaseLogger` β†’ frees env-bound js_ref_t state) and unload all model
* instances BEFORE we kill the worklet.
*
* Without this, the worklet's V8 isolate dies while addon static state still
* holds js_ref_t handles into it; the next worklet's first `setLogger` call
* trips a V8 GlobalHandle assertion (brk 0 / SIGTRAP) and the iOS app dies.
*
* Best-effort: never throws. Falls through to terminate even on timeout.
*/
async function sendShutdownMessage(rpc: RPC): Promise<void> {
let timer: ReturnType<typeof setTimeout> | undefined;
try {
await Promise.race([
(async () => {
const req = rpc.request(1);
req.send(JSON.stringify({ type: "__shutdown__" }), "utf8");
const response = await req.reply("utf8");
const parsed = JSON.parse(String(response)) as {
success: boolean;
error?: string;
};
if (!parsed.success) {
throw new Error(parsed.error ?? "Worker reported cleanup failure");
}
})(),
new Promise<never>((_, reject) => {
timer = setTimeout(
() =>
reject(
new Error(
`Worker did not ack __shutdown__ within ${SHUTDOWN_RPC_TIMEOUT_MS}ms`,
),
),
SHUTDOWN_RPC_TIMEOUT_MS,
);
}),
]);
} catch (error) {
// Best-effort: log but don't block termination if cleanup fails.
logger.warn(
`⚠️ Pre-terminate worker cleanup failed: ${
error instanceof Error ? error.message : String(error)
}`,
);
} finally {
if (timer) clearTimeout(timer);
}
}

export async function close(): Promise<void> {
// Concurrent callers (or a getRPC retry that overlaps with a manual
// close) share the in-flight close promise instead of each sending
// their own __shutdown__ and racing on the terminate block.
if (closingPromise) return closingPromise;

closingPromise = (async () => {
logger.info("🧹 Closing RPC client (Expo)");

// Ask the worker to release env-bound state (addon loggers, model
// instances) BEFORE we kill its V8 isolate. Mobile-specific need; on
// desktop the spawned worker process gets SIGTERM'd and the kernel
// reclaims everything regardless.
if (rpcInstance) {
logger.info("🧹 Requesting worker pre-terminate cleanup");
await sendShutdownMessage(rpcInstance);
}

rpcInstance = null;
rpcPromise = null;
if (workletInstance) {
logger.info("πŸ»πŸ”« Terminating bare worklet");
try {
workletInstance.terminate();
} catch (error) {
logger.debug("Failed to terminate worklet", { error });
}
workletInstance = null;
workletInitialized = false;
}
})();

try {
await closingPromise;
} finally {
// Reset so a subsequent close() (e.g. after a fresh getRPC spawned
// a new worklet) can run again. Body guards on rpcInstance / workletInstance
// make a redundant call a no-op if state is already cleared.
closingPromise = null;
}
}

export async function createDuplexSession(payload: string, commandId: number) {
Expand Down
10 changes: 10 additions & 0 deletions packages/sdk/server/rpc/handle-request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import {
executeDuplexHandler,
handleInitConfig,
isInitConfigMessage,
handleShutdown,
isShutdownMessage,
} from "./handler-utils";
import { createServerProfiler, type ServerProfiler } from "./profiling";
import { assertLifecycleAllowed } from "@/server/bare/runtime-lifecycle";
Expand Down Expand Up @@ -57,6 +59,14 @@ export async function handleRequest(req: RPC.IncomingRequest): Promise<void> {
return;
}

// Handle internal pre-terminate cleanup signal (bypasses schema). Lets
// the client tear addons down while the JS env is still alive so static
// js_ref_t state doesn't survive into the next worklet's isolate.
if (isShutdownMessage(jsonData)) {
await handleShutdown(req);
return;
}

const { data: cleanData, profilingMeta } = extractProfilingMeta(jsonData);

if (cleanData && typeof cleanData === "object") {
Expand Down
36 changes: 36 additions & 0 deletions packages/sdk/server/rpc/handler-utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -298,3 +298,39 @@ export function handleInitConfig(
);
}
}

// Internal pre-terminate cleanup signal. The SDK client sends this before
// tearing down the bare runtime (e.g. Worklet.terminate() on mobile) so
// addons can release env-bound state while their JS environment is still
// alive. Reply success/failure, never throws to the dispatcher.
type ShutdownMessage = {
type: "__shutdown__";
};

export function isShutdownMessage(data: unknown): data is ShutdownMessage {
return (
typeof data === "object" &&
data !== null &&
"type" in data &&
(data as { type?: unknown }).type === "__shutdown__"
);
}

export async function handleShutdown(req: RPC.IncomingRequest): Promise<void> {
try {
// Lazy import to avoid the import cycle:
// handler-utils -> worker-core -> create-server -> handle-request
// -> handler-utils. By the time this runs, all modules are loaded.
const { cleanupForTerminate } = await import("@/server/worker-core");
await cleanupForTerminate();
req.reply(JSON.stringify({ success: true }), "utf-8");
} catch (error) {
req.reply(
JSON.stringify({
success: false,
error: error instanceof Error ? error.message : String(error),
}),
"utf-8",
);
}
}
73 changes: 65 additions & 8 deletions packages/sdk/server/worker-core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,14 @@ import {

let coreInitialized = false;
let rpcInitialized = false;
// Set true when the cleanup body has run at least once. Lets both
// cleanupForTerminate (pre-terminate path) and shutdownBareDirectWorker
// (signal/exit path) call runCleanup() without doing duplicate work.
let cleanupRan = false;
// Set true when shutdownBareDirectWorker is in flight. Distinct from
// cleanupRan: cleanupForTerminate must NOT set this, otherwise a later
// SIGTERM/SIGINT/uncaught-exception would early-return at the guard
// in shutdownBareDirectWorker and skip releaseWorkerLock + process.exit.
let isShuttingDown = false;

const logger = getServerLogger();
Expand Down Expand Up @@ -96,6 +104,61 @@ export type BareDirectShutdownReason =
| "unhandled-rejection"
| "ipc-disconnect";

/**
* Run the cleanup body shared by terminal and graceful-shutdown paths.
* Clears plugin registries (which calls each addon's `releaseLogger` β†’
* frees env-bound js_ref_t state), unloads all loaded models (which calls
* each addon's `destroyInstance`), and closes infra (swarm, rag, downloads,
* registry client). Does NOT touch the worker lock or call `process.exit`.
*
* Idempotent: subsequent calls are no-ops via the `cleanupRan` flag. The
* underlying clearPlugins / unloadAllModels / closers are also idempotent
* on empty registries, but the flag avoids the redundant log noise and
* allocator churn.
*/
async function runCleanup(): Promise<void> {
if (cleanupRan) return;
cleanupRan = true;
clearRegistries();
await Promise.allSettled([
destroySwarm(),
closeAllRagInstances(),
cleanupDownloads(),
unloadAllModels(),
closeRegistryClient(),
]);
}

/**
* Pre-terminate cleanup, callable while the worker is still alive.
*
* On platforms where the worker lives in the same OS process as the JS host
* (i.e. mobile via react-native-bare-kit Worklet), `process.exit()` would
* kill the entire app. This path runs the same registry/model cleanup as
* `shutdownBareDirectWorker` but skips the lock release + exit, leaving the
* caller (typically the SDK client about to call `worklet.terminate()`)
* responsible for tearing the worker down.
*
* Critical for clean termination: addons hold static state with js_ref_t
* handles into the current V8 isolate; without this cleanup, those refs
* survive into the next worklet's isolate and crash on first access.
*/
export async function cleanupForTerminate(): Promise<void> {
// Intentionally does NOT set isShuttingDown β€” that flag is reserved for
// shutdownBareDirectWorker so a later SIGTERM/SIGINT still gets to run
// the lock release + process.exit path. runCleanup is idempotent on its
// own, so a follow-up shutdownBareDirectWorker call won't redo the body.
if (cleanupRan) return;

logger.info("🧹 Pre-terminate cleanup starting...");
try {
await runCleanup();
logger.info("βœ… Pre-terminate cleanup completed");
} catch (error) {
logger.error("❌ Error during pre-terminate cleanup:", error);
}
}

export async function shutdownBareDirectWorker(
reason: BareDirectShutdownReason,
): Promise<void> {
Expand All @@ -112,14 +175,8 @@ export async function shutdownBareDirectWorker(
logger.info(messages[reason]);

try {
clearRegistries();
await Promise.allSettled([
destroySwarm(),
closeAllRagInstances(),
cleanupDownloads(),
unloadAllModels(),
closeRegistryClient(),
]);
// Idempotent: if cleanupForTerminate already ran, this is a no-op.
await runCleanup();
logger.info("βœ… Cleanup completed successfully");
} catch (error) {
logger.error("❌ Error during shutdown cleanup:", error);
Expand Down
2 changes: 1 addition & 1 deletion packages/sdk/tests-qvac/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
},
"dependencies": {
"@qvac/sdk": "file:..",
"@tetherto/qvac-test-suite": "^0.6.1",
"@tetherto/qvac-test-suite": "^0.6.2",
"mqtt": "^5.14.1",
"react-native": "0.81.5",
"react-native-bare-kit": "0.12.3"
Expand Down
18 changes: 17 additions & 1 deletion packages/sdk/tests-qvac/tests/mobile/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,13 @@ import { MobileDiffusionExecutor } from "./executors/diffusion-executor.js";
import { LifecycleExecutor } from "../shared/executors/lifecycle-executor.js";
import { ConfigExecutor } from "../shared/executors/config-executor.js";

const resources = new ResourceManager();
const resources = new ResourceManager({
// Mobile (iOS) needs a tick after each unloadModel for the kernel to
// actually release pages β€” without it, the next test's load arrives
// while the previous model's RSS is still resident and crashes the
// GGML allocator. Empirically 200ms is enough; desktop doesn't need it.
unloadSettleMs: 100,
});

resources.define("llm", {
constant: LLAMA_3_2_1B_INST_Q4_0,
Expand Down Expand Up @@ -324,6 +330,16 @@ export const executor = createExecutor({
], "HTTP test disabled on mobile (OOM)"),
new SkipExecutor(/^finetune-/, "Finetune tests disabled on mobile"),
new SkipExecutor(/^tools-(?!simple-function$|no-function-match$)/, "Tools test disabled on mobile"),
// suspend() hangs the test runner on mobile (the lifecycle coordinator
// pauses MQTT/network ops and never resumes within the test timeout).
// Only resume-idempotent is safe -- it does not call suspend().
skipTests([
"lifecycle-suspend-resume-basic",
"lifecycle-suspend-idempotent",
"lifecycle-suspend-resume-inference",
"lifecycle-rapid-toggle",
"lifecycle-suspend-during-inference",
], "suspend() hangs the runner on mobile"),
...(Platform.OS === "ios" ? [
skipTests([
"ocr-sign-image",
Expand Down
14 changes: 12 additions & 2 deletions packages/sdk/tests-qvac/tests/shared/resource-lifecycle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,19 @@ export async function modelSetup(resources: ResourceManager, context: unknown) {
resources.incrementTestCount();

const dep = ctx.dependency as string | undefined;
if (!dep || dep === "none") return;
// dependency:"none" means the test declares it needs no preloaded model.
// Treat this as "evict everything currently held" β€” otherwise residue
// from the previous test (e.g. a 2GB translation model) stays resident
// while the next test allocates fresh memory on top of it, blowing the
// device memory budget on mobile (afriquegemma β†’ sharded-model-load was
// the empirical case this manifested as).
const deps =
!dep || dep === "none"
? []
: dep.includes("+")
? dep.split("+")
: [dep];

const deps = dep.includes("+") ? dep.split("+") : [dep];
await resources.evictExcept(deps);

for (const d of deps) {
Expand Down
Loading
Loading