diff --git a/packages/sdk/client/rpc/bare-client.ts b/packages/sdk/client/rpc/bare-client.ts index 50c412bf70..24fab08159 100644 --- a/packages/sdk/client/rpc/bare-client.ts +++ b/packages/sdk/client/rpc/bare-client.ts @@ -25,6 +25,7 @@ import { getAllPlugins } from "@/server/plugins"; import { initializeWorkerCore, shutdownBareDirectWorker, + cleanupForTerminate, } from "@/server/worker-core"; import { assertLifecycleAllowed } from "@/server/bare/runtime-lifecycle"; @@ -227,6 +228,27 @@ function createMockRPCRequest() { } } + // Handle special pre-terminate cleanup signal. In direct mode the + // bare runtime is the host JS context, so we run cleanup but never + // exit the process here. + if ( + typeof requestData === "object" && + "type" in requestData && + requestData.type === "__shutdown__" + ) { + try { + await cleanupForTerminate(); + return Buffer.from(JSON.stringify({ success: true })); + } catch (error) { + return Buffer.from( + JSON.stringify({ + success: false, + error: error instanceof Error ? error.message : String(error), + }), + ); + } + } + const response = await send(requestData as Request); return Buffer.from(JSON.stringify(response)); }, diff --git a/packages/sdk/client/rpc/expo-rpc-client.ts b/packages/sdk/client/rpc/expo-rpc-client.ts index f268fccfa3..3545d5b3ef 100644 --- a/packages/sdk/client/rpc/expo-rpc-client.ts +++ b/packages/sdk/client/rpc/expo-rpc-client.ts @@ -15,6 +15,10 @@ let rpcPromise: Promise | null = null; let workletInstance: Worklet | null = null; let workletInitialized = false; let cachedRuntimeContext: RuntimeContext | undefined; +// Set while close() is in flight. Concurrent callers share the same +// promise instead of double-sending __shutdown__ or re-entering the +// terminate block on already-null state. +let closingPromise: Promise | null = null; logger.debug("EXPO RPC Client bundle"); @@ -121,10 +125,114 @@ export async function getRPC() { } } -export function close() { - logger.info("๐Ÿงน Closing RPC client (Expo)"); - rpcInstance = null; - rpcPromise = null; +const SHUTDOWN_RPC_TIMEOUT_MS = 10_000; + +/** + * Pre-terminate cleanup roundtrip. Sends an internal `__shutdown__` message + * to the worker so it can clear addon plugin registries (calls each addon's + * `releaseLogger` โ†’ frees env-bound js_ref_t state) and unload all model + * instances BEFORE we kill the worklet. + * + * Without this, the worklet's V8 isolate dies while addon static state still + * holds js_ref_t handles into it; the next worklet's first `setLogger` call + * trips a V8 GlobalHandle assertion (brk 0 / SIGTRAP) and the iOS app dies. + * + * Best-effort: never throws. Falls through to terminate even on timeout. + */ +async function sendShutdownMessage(rpc: RPC): Promise { + let timer: ReturnType | undefined; + try { + await Promise.race([ + (async () => { + const req = rpc.request(1); + req.send(JSON.stringify({ type: "__shutdown__" }), "utf8"); + const response = await req.reply("utf8"); + const parsed = JSON.parse(String(response)) as { + success: boolean; + error?: string; + }; + if (!parsed.success) { + throw new Error(parsed.error ?? "Worker reported cleanup failure"); + } + })(), + new Promise((_, reject) => { + timer = setTimeout( + () => + reject( + new Error( + `Worker did not ack __shutdown__ within ${SHUTDOWN_RPC_TIMEOUT_MS}ms`, + ), + ), + SHUTDOWN_RPC_TIMEOUT_MS, + ); + }), + ]); + } catch (error) { + // Best-effort: log but don't block termination if cleanup fails. + logger.warn( + `โš ๏ธ Pre-terminate worker cleanup failed: ${ + error instanceof Error ? error.message : String(error) + }`, + ); + } finally { + if (timer) clearTimeout(timer); + } +} + +export async function close(): Promise { + // Concurrent callers (or a getRPC retry that overlaps with a manual + // close) share the in-flight close promise instead of each sending + // their own __shutdown__ and racing on the terminate block. + if (closingPromise) return closingPromise; + + closingPromise = (async () => { + logger.info("๐Ÿงน Closing RPC client (Expo)"); + + // terminate() crashes on Android (addon dlclose leaves pthread_key_t + // destructors dangling); iOS dyld no-ops dlclose so it's safe there. + // Non-iOS: drop refs only -- sending __shutdown__ without a follow-up + // terminate would clear the worker plugin registry. + let platform: string | undefined; + try { + platform = (await getRuntimeContext()).platform; + } catch (err) { + logger.debug("Failed to resolve runtime context for close()", { err }); + } + + if (platform !== "ios") { + rpcInstance = null; + rpcPromise = null; + return; + } + + // iOS: existing pre-terminate cleanup + terminate. + if (rpcInstance) { + logger.info("๐Ÿงน Requesting worker pre-terminate cleanup"); + await sendShutdownMessage(rpcInstance); + } + + rpcInstance = null; + rpcPromise = null; + if (workletInstance) { + logger.info("๐Ÿป๐Ÿ”ซ Terminating bare worklet"); + try { + workletInstance.terminate(); + } catch (error) { + logger.debug("Failed to terminate worklet", { error }); + } + workletInstance = null; + workletInitialized = false; + } + })(); + + try { + await closingPromise; + } finally { + // Reset so a subsequent close() (e.g. after a fresh getRPC spawned + // a new worklet) can run again. Body guards on rpcInstance / workletInstance + // make a redundant call a no-op if state is already cleared. + closingPromise = null; + } } export async function createDuplexSession(payload: string, commandId: number) { diff --git a/packages/sdk/server/rpc/handle-request.ts b/packages/sdk/server/rpc/handle-request.ts index c32f7ddf03..8b8d072960 100644 --- a/packages/sdk/server/rpc/handle-request.ts +++ b/packages/sdk/server/rpc/handle-request.ts @@ -24,6 +24,8 @@ import { executeDuplexHandler, handleInitConfig, isInitConfigMessage, + handleShutdown, + isShutdownMessage, } from "./handler-utils"; import { createServerProfiler, type ServerProfiler } from "./profiling"; import { assertLifecycleAllowed } from "@/server/bare/runtime-lifecycle"; @@ -57,6 +59,14 @@ export async function handleRequest(req: RPC.IncomingRequest): Promise { return; } + // Handle internal pre-terminate cleanup signal (bypasses schema). Lets + // the client tear addons down while the JS env is still alive so static + // js_ref_t state doesn't survive into the next worklet's isolate. + if (isShutdownMessage(jsonData)) { + await handleShutdown(req); + return; + } + const { data: cleanData, profilingMeta } = extractProfilingMeta(jsonData); if (cleanData && typeof cleanData === "object") { diff --git a/packages/sdk/server/rpc/handler-utils.ts b/packages/sdk/server/rpc/handler-utils.ts index 7ea6064641..455e3c654a 100644 --- a/packages/sdk/server/rpc/handler-utils.ts +++ b/packages/sdk/server/rpc/handler-utils.ts @@ -298,3 +298,39 @@ export function handleInitConfig( ); } } + +// Internal pre-terminate cleanup signal. The SDK client sends this before +// tearing down the bare runtime (e.g. Worklet.terminate() on mobile) so +// addons can release env-bound state while their JS environment is still +// alive. Reply success/failure, never throws to the dispatcher. +type ShutdownMessage = { + type: "__shutdown__"; +}; + +export function isShutdownMessage(data: unknown): data is ShutdownMessage { + return ( + typeof data === "object" && + data !== null && + "type" in data && + (data as { type?: unknown }).type === "__shutdown__" + ); +} + +export async function handleShutdown(req: RPC.IncomingRequest): Promise { + try { + // Lazy import to avoid the import cycle: + // handler-utils -> worker-core -> create-server -> handle-request + // -> handler-utils. By the time this runs, all modules are loaded. + const { cleanupForTerminate } = await import("@/server/worker-core"); + await cleanupForTerminate(); + req.reply(JSON.stringify({ success: true }), "utf-8"); + } catch (error) { + req.reply( + JSON.stringify({ + success: false, + error: error instanceof Error ? error.message : String(error), + }), + "utf-8", + ); + } +} diff --git a/packages/sdk/server/worker-core.ts b/packages/sdk/server/worker-core.ts index 508e0ab052..d755c70b6c 100644 --- a/packages/sdk/server/worker-core.ts +++ b/packages/sdk/server/worker-core.ts @@ -22,6 +22,14 @@ import { let coreInitialized = false; let rpcInitialized = false; +// Set true when the cleanup body has run at least once. Lets both +// cleanupForTerminate (pre-terminate path) and shutdownBareDirectWorker +// (signal/exit path) call runCleanup() without doing duplicate work. +let cleanupRan = false; +// Set true when shutdownBareDirectWorker is in flight. Distinct from +// cleanupRan: cleanupForTerminate must NOT set this, otherwise a later +// SIGTERM/SIGINT/uncaught-exception would early-return at the guard +// in shutdownBareDirectWorker and skip releaseWorkerLock + process.exit. let isShuttingDown = false; const logger = getServerLogger(); @@ -96,6 +104,61 @@ export type BareDirectShutdownReason = | "unhandled-rejection" | "ipc-disconnect"; +/** + * Run the cleanup body shared by terminal and graceful-shutdown paths. + * Clears plugin registries (which calls each addon's `releaseLogger` โ†’ + * frees env-bound js_ref_t state), unloads all loaded models (which calls + * each addon's `destroyInstance`), and closes infra (swarm, rag, downloads, + * registry client). Does NOT touch the worker lock or call `process.exit`. + * + * Idempotent: subsequent calls are no-ops via the `cleanupRan` flag. The + * underlying clearPlugins / unloadAllModels / closers are also idempotent + * on empty registries, but the flag avoids the redundant log noise and + * allocator churn. + */ +async function runCleanup(): Promise { + if (cleanupRan) return; + cleanupRan = true; + clearRegistries(); + await Promise.allSettled([ + destroySwarm(), + closeAllRagInstances(), + cleanupDownloads(), + unloadAllModels(), + closeRegistryClient(), + ]); +} + +/** + * Pre-terminate cleanup, callable while the worker is still alive. + * + * On platforms where the worker lives in the same OS process as the JS host + * (i.e. mobile via react-native-bare-kit Worklet), `process.exit()` would + * kill the entire app. This path runs the same registry/model cleanup as + * `shutdownBareDirectWorker` but skips the lock release + exit, leaving the + * caller (typically the SDK client about to call `worklet.terminate()`) + * responsible for tearing the worker down. + * + * Critical for clean termination: addons hold static state with js_ref_t + * handles into the current V8 isolate; without this cleanup, those refs + * survive into the next worklet's isolate and crash on first access. + */ +export async function cleanupForTerminate(): Promise { + // Intentionally does NOT set isShuttingDown โ€” that flag is reserved for + // shutdownBareDirectWorker so a later SIGTERM/SIGINT still gets to run + // the lock release + process.exit path. runCleanup is idempotent on its + // own, so a follow-up shutdownBareDirectWorker call won't redo the body. + if (cleanupRan) return; + + logger.info("๐Ÿงน Pre-terminate cleanup starting..."); + try { + await runCleanup(); + logger.info("โœ… Pre-terminate cleanup completed"); + } catch (error) { + logger.error("โŒ Error during pre-terminate cleanup:", error); + } +} + export async function shutdownBareDirectWorker( reason: BareDirectShutdownReason, ): Promise { @@ -112,14 +175,8 @@ export async function shutdownBareDirectWorker( logger.info(messages[reason]); try { - clearRegistries(); - await Promise.allSettled([ - destroySwarm(), - closeAllRagInstances(), - cleanupDownloads(), - unloadAllModels(), - closeRegistryClient(), - ]); + // Idempotent: if cleanupForTerminate already ran, this is a no-op. + await runCleanup(); logger.info("โœ… Cleanup completed successfully"); } catch (error) { logger.error("โŒ Error during shutdown cleanup:", error); diff --git a/packages/sdk/tests-qvac/package.json b/packages/sdk/tests-qvac/package.json index 884cfb75d2..65d53c2d45 100644 --- a/packages/sdk/tests-qvac/package.json +++ b/packages/sdk/tests-qvac/package.json @@ -15,7 +15,7 @@ }, "dependencies": { "@qvac/sdk": "file:..", - "@tetherto/qvac-test-suite": "^0.6.1", + "@tetherto/qvac-test-suite": "^0.6.2", "mqtt": "^5.14.1", "react-native": "0.81.5", "react-native-bare-kit": "0.12.3" diff --git a/packages/sdk/tests-qvac/tests/mobile/consumer.ts b/packages/sdk/tests-qvac/tests/mobile/consumer.ts index 460d29c4ad..08cc045949 100644 --- a/packages/sdk/tests-qvac/tests/mobile/consumer.ts +++ b/packages/sdk/tests-qvac/tests/mobile/consumer.ts @@ -67,7 +67,13 @@ import { MobileDiffusionExecutor } from "./executors/diffusion-executor.js"; import { LifecycleExecutor } from "../shared/executors/lifecycle-executor.js"; import { ConfigExecutor } from "../shared/executors/config-executor.js"; -const resources = new ResourceManager(); +const resources = new ResourceManager({ + // Mobile (iOS) needs a tick after each unloadModel for the kernel to + // actually release pages โ€” without it, the next test's load arrives + // while the previous model's RSS is still resident and crashes the + // GGML allocator. Empirically 200ms is enough; desktop doesn't need it. + unloadSettleMs: 100, +}); resources.define("llm", { constant: LLAMA_3_2_1B_INST_Q4_0, @@ -330,6 +336,24 @@ export const executor = createExecutor({ ], "HTTP test disabled on mobile (OOM)"), new SkipExecutor(/^finetune-/, "Finetune tests disabled on mobile"), new SkipExecutor(/^tools-(?!simple-function$|no-function-match$)/, "Tools test disabled on mobile"), + // suspend() hangs the test runner on mobile (the lifecycle coordinator + // pauses MQTT/network ops and never resumes within the test timeout). + // Only resume-idempotent is safe -- it does not call suspend(). + skipTests([ + "lifecycle-suspend-resume-basic", + "lifecycle-suspend-idempotent", + "lifecycle-suspend-resume-inference", + "lifecycle-rapid-toggle", + "lifecycle-suspend-during-inference", + ], "suspend() hangs the runner on mobile"), + // diffusion-streaming-progress reliably times out on mobile and the + // leftover stream blocks the diffusion model from being evicted, + // hanging the next test that needs to free it (typically + // wrong-model-transcribe-on-llm via ResourceManager.evictExcept). + skipTests( + ["diffusion-streaming-progress"], + "diffusion stream times out on mobile and blocks subsequent eviction", + ), ...(Platform.OS === "ios" ? [ skipTests([ "ocr-sign-image", diff --git a/packages/sdk/tests-qvac/tests/shared/resource-lifecycle.ts b/packages/sdk/tests-qvac/tests/shared/resource-lifecycle.ts index 5c28dfb5e3..1ba153cd0d 100644 --- a/packages/sdk/tests-qvac/tests/shared/resource-lifecycle.ts +++ b/packages/sdk/tests-qvac/tests/shared/resource-lifecycle.ts @@ -7,9 +7,19 @@ export async function modelSetup(resources: ResourceManager, context: unknown) { resources.incrementTestCount(); const dep = ctx.dependency as string | undefined; - if (!dep || dep === "none") return; + // dependency:"none" means the test declares it needs no preloaded model. + // Treat this as "evict everything currently held" โ€” otherwise residue + // from the previous test (e.g. a 2GB translation model) stays resident + // while the next test allocates fresh memory on top of it, blowing the + // device memory budget on mobile (afriquegemma โ†’ sharded-model-load was + // the empirical case this manifested as). + const deps = + !dep || dep === "none" + ? [] + : dep.includes("+") + ? dep.split("+") + : [dep]; - const deps = dep.includes("+") ? dep.split("+") : [dep]; await resources.evictExcept(deps); for (const d of deps) { diff --git a/packages/sdk/tests-qvac/tests/shared/resource-manager.ts b/packages/sdk/tests-qvac/tests/shared/resource-manager.ts index 71dad421c9..a0382f28a4 100644 --- a/packages/sdk/tests-qvac/tests/shared/resource-manager.ts +++ b/packages/sdk/tests-qvac/tests/shared/resource-manager.ts @@ -15,11 +15,36 @@ interface TrackedModel { lastUsedAtTest: number; } +export interface ResourceManagerOptions { + /** + * Milliseconds to sleep after a successful unloadModel() call inside + * `evict()`. Lets the OS catch up on lazy page reclamation before the + * next load starts allocating on top. + * + * Mobile (iOS) needs this โ€” kernel doesn't release pages instantly when + * a Bare worklet's V8 isolate destroys its handles, and the next test's + * load can crash with EXC_CRASH/SIGABRT inside the GGML allocator if it + * arrives at the still-resident-residue moment. + * + * Desktop doesn't need it โ€” `unloadModel` over the IPC socket completes + * with the worker process already having freed the memory, and the + * kernel reclaims fast. + * + * Default 0 (off). + */ + unloadSettleMs?: number; +} + export class ResourceManager { private definitions = new Map(); private models = new Map(); private testCount = 0; private downloaded = false; + private readonly unloadSettleMs: number; + + constructor(options: ResourceManagerOptions = {}) { + this.unloadSettleMs = options.unloadSettleMs ?? 0; + } define(dep: string, definition: ModelDefinition) { this.definitions.set(dep, definition); @@ -183,10 +208,19 @@ export class ResourceManager { } try { await unloadModel({ modelId: entry.modelId }); + // Optionally yield so the OS can reclaim pages before the next + // load starts allocating. See `unloadSettleMs` docs above. Only + // wait when the unload actually succeeded; on failure there's + // nothing to settle. + if (this.unloadSettleMs > 0) { + await new Promise((resolve) => + setTimeout(resolve, this.unloadSettleMs), + ); + } } catch (error) { console.warn(`Error unloading model ${dep}: ${error}`); } - + this.models.delete(dep); } }