diff --git a/packages/interceptors-opentelemetry/src/workflow/context-manager.ts b/packages/interceptors-opentelemetry/src/workflow/context-manager.ts index 33961400d..894f1057c 100644 --- a/packages/interceptors-opentelemetry/src/workflow/context-manager.ts +++ b/packages/interceptors-opentelemetry/src/workflow/context-manager.ts @@ -1,12 +1,10 @@ +import { type AsyncLocalStorage } from 'async_hooks'; import * as otel from '@opentelemetry/api'; -import { ensureWorkflowModuleLoaded, getWorkflowModuleIfAvailable } from './workflow-module-loader'; - -const AsyncLocalStorage = getWorkflowModuleIfAvailable()?.AsyncLocalStorage; +import { ensureWorkflowModuleLoaded } from './workflow-module-loader'; export class ContextManager implements otel.ContextManager { - // If `@temporalio/workflow` is not available, ignore for now. - // When ContextManager is constructed module resolution error will be thrown. - protected storage = AsyncLocalStorage ? new AsyncLocalStorage() : undefined; + // The workflow sandbox provides AsyncLocalStorage through globalThis. + protected storage: AsyncLocalStorage = new (globalThis as any).AsyncLocalStorage(); public constructor() { ensureWorkflowModuleLoaded(); diff --git a/packages/test/src/test-interceptors.ts b/packages/test/src/test-interceptors.ts index d79415ce7..e399bca37 100644 --- a/packages/test/src/test-interceptors.ts +++ b/packages/test/src/test-interceptors.ts @@ -16,10 +16,12 @@ import { cleanOptionalStackTrace, compareStackTrace, RUN_INTEGRATION_TESTS, Work import { defaultOptions } from './mock-native-worker'; import { checkDisposeRan, + conditionWithTimeoutAfterDisposal, continueAsNewToDifferentWorkflow, initAndResetFlag, interceptorExample, internalsInterceptorExample, + successString, unblockOrCancel, } from './workflows'; import { getSecretQuery, unblockWithSecretSignal } from './workflows/interceptor-example'; @@ -312,4 +314,41 @@ if (RUN_INTEGRATION_TESTS) { t.true(disposeFlagSetNow); }); }); + + // Test to trigger GH #1866 + // When `reuseV8Context: true`, dispose() calls disableStorage() which disables the + // AsyncLocalStorage instance that stores cancellation scope. + // This causes CancellationScope.current() to return rootScope instead of the correct + // inner scope for workflows that continue afterward. + // + // The bug manifests in condition() with timeout: the finally block calls + // CancellationScope.current().cancel() to clean up. + // When storage is disabled, this incorrectly cancels the rootScope, failing the workflow with "Workflow cancelled". + test.serial('workflow disposal does not break CancellationScope in other workflows in reusable vm', async (t) => { + const taskQueue = 'test-reusable-vm-disposal-cancellation-scope'; + const worker = await Worker.create({ + ...defaultOptions, + taskQueue, + }); + + const client = new WorkflowClient(); + const result = await worker.runUntil(async () => { + // Fill the cache with workflow that complete immediately + await client.execute(successString, { taskQueue, workflowId: uuid4() }); + + // Start the condition workflow + const conditionHandle = await client.start(conditionWithTimeoutAfterDisposal, { + taskQueue, + workflowId: uuid4(), + }); + + // Run another workflow to trigger an evictions and disposal() while + // conditionWithTimeoutAfterDisposal is cached and waiting + await client.execute(successString, { taskQueue, workflowId: uuid4() }); + + // If dispose incorrectly disables the cancellation scope storage, then it will fail with CancelledFailure: "Workflow cancelled" + return await conditionHandle.result(); + }); + t.is(result, 'done'); + }); } diff --git a/packages/test/src/test-workflow-async-local-storage.ts b/packages/test/src/test-workflow-async-local-storage.ts new file mode 100644 index 000000000..05c297f49 --- /dev/null +++ b/packages/test/src/test-workflow-async-local-storage.ts @@ -0,0 +1,61 @@ +import type { AsyncLocalStorage } from 'async_hooks'; +import * as workflow from '@temporalio/workflow'; +import { helpers, makeTestFunction } from './helpers-integration'; +import { unblockSignal } from './workflows/testenv-test-workflows'; + +const test = makeTestFunction({ + workflowsPath: __filename, + workflowInterceptorModules: [require.resolve('./workflows/otel-interceptors')], +}); + +export async function asyncLocalStorageWorkflow(explicitlyDisable: boolean): Promise { + const myAls: AsyncLocalStorage = new (globalThis as any).AsyncLocalStorage('My Workflow ALS'); + try { + await myAls.run({}, async () => { + let signalReceived = false; + workflow.setHandler(unblockSignal, () => { + signalReceived = true; + }); + await workflow.condition(() => signalReceived); + }); + } finally { + if (explicitlyDisable) { + myAls.disable(); + } + } +} + +test("AsyncLocalStorage in workflow context doesn't throw when disabled", async (t) => { + const { createWorker, startWorkflow } = helpers(t); + + const worker = await createWorker({ + // We disable the workflow cache to ensure that some workflow executions will get + // evicted from cache, forcing early disposal of the corresponding workflow vms. + maxCachedWorkflows: 0, + maxConcurrentWorkflowTaskExecutions: 1, + + sinks: { + exporters: { + export: { + fn: () => void 0, + }, + }, + }, + }); + + await worker.runUntil(async () => { + const wfs = await Promise.all([ + startWorkflow(asyncLocalStorageWorkflow, { args: [true] }), + startWorkflow(asyncLocalStorageWorkflow, { args: [false] }), + startWorkflow(asyncLocalStorageWorkflow, { args: [true] }), + startWorkflow(asyncLocalStorageWorkflow, { args: [false] }), + ]); + + await Promise.all([wfs[0].signal(unblockSignal), wfs[1].signal(unblockSignal)]); + await Promise.all([wfs[0].result(), wfs[1].result()]); + }); + + // We're only asserting that no error is thrown. There's unfortunately no way + // to programmatically confirm that ALS instances were properly disposed. + t.pass(); +}); diff --git a/packages/test/src/test-workflows.ts b/packages/test/src/test-workflows.ts index 7330e0bf9..3fb29d5a3 100644 --- a/packages/test/src/test-workflows.ts +++ b/packages/test/src/test-workflows.ts @@ -97,6 +97,10 @@ test.beforeEach(async (t) => { }; }); +test.afterEach(async (t) => { + await t.context.workflow?.dispose(); +}); + async function createWorkflow( workflowType: string, runId: string, diff --git a/packages/test/src/workflows/index.ts b/packages/test/src/workflows/index.ts index dc0776f09..f1bbdf28e 100644 --- a/packages/test/src/workflows/index.ts +++ b/packages/test/src/workflows/index.ts @@ -59,6 +59,7 @@ export * from './promise-then-promise'; export * from './race'; export * from './random'; export * from './reject-promise'; +export * from './reusable-vm-disposal-bug'; export * from './run-activity-in-different-task-queue'; export * from './scope-cancelled-while-waiting-on-external-workflow-cancellation'; export * from './set-timeout-after-microtasks'; diff --git a/packages/test/src/workflows/reusable-vm-disposal-bug.ts b/packages/test/src/workflows/reusable-vm-disposal-bug.ts new file mode 100644 index 000000000..171d05ef9 --- /dev/null +++ b/packages/test/src/workflows/reusable-vm-disposal-bug.ts @@ -0,0 +1,20 @@ +import { condition } from '@temporalio/workflow'; + +/** + * Workflow for reproducing the GH #1866 + * + * When the bug is present: + * 1. First condition times out + * 2. finally block calls CancellationScope.current().cancel() + * 3. With disabled storage, current() returns rootScope + * 4. rootScope.cancel() is called, failing the workflow with "Workflow cancelled" + * + * Actual failure happens on the second `condition` where when creating the new + * cancellation scope, we see that the parent/root scope is already canceled. + */ +export async function conditionWithTimeoutAfterDisposal(): Promise { + const alwaysFalse = false; + await condition(() => alwaysFalse, '500ms'); + await condition(() => alwaysFalse, '500ms'); + return 'done'; +} diff --git a/packages/worker/src/workflow/reusable-vm.ts b/packages/worker/src/workflow/reusable-vm.ts index e719c919c..6f468aace 100644 --- a/packages/worker/src/workflow/reusable-vm.ts +++ b/packages/worker/src/workflow/reusable-vm.ts @@ -27,7 +27,7 @@ export class ReusableVMWorkflowCreator implements WorkflowCreator { * * Use the {@link context} getter instead */ - private _context?: vm.Context; + private _context?: vm.Context & typeof globalThis; private pristineObj?: object; constructor( @@ -42,12 +42,12 @@ export class ReusableVMWorkflowCreator implements WorkflowCreator { ReusableVMWorkflowCreator.unhandledRejectionHandlerHasBeenSet = true; } - this._context = vm.createContext({}, { microtaskMode: 'afterEvaluate' }); + this._context = vm.createContext({}, { microtaskMode: 'afterEvaluate' }) as vm.Context & typeof globalThis; vm.runInContext( `{ const __TEMPORAL_CALL_INTO_SCOPE = () => { - const [holder, fn, args] = globalThis.__TEMPORAL_ARGS__; - delete globalThis.__TEMPORAL_ARGS__; + const [holder, fn, args] = globalThis.__temporal_args; + delete globalThis.__temporal_args; if (globalThis.__TEMPORAL_BAG_HOLDER__ !== holder) { if (globalThis.__TEMPORAL_BAG_HOLDER__ !== undefined) { @@ -81,13 +81,11 @@ export class ReusableVMWorkflowCreator implements WorkflowCreator { { timeout: isolateExecutionTimeoutMs, displayErrors: true } ); - this.injectGlobals(this._context); - const sharedModules = new Map(); const __webpack_module_cache__ = new Proxy( {}, { - get: (_, p) => { + get: (_, p: string) => { // Try the shared modules first const sharedModule = sharedModules.get(p); if (sharedModule) { @@ -96,7 +94,7 @@ export class ReusableVMWorkflowCreator implements WorkflowCreator { const moduleCache = this.context.__TEMPORAL_ACTIVATOR__?.moduleCache; return moduleCache?.get(p); }, - set: (_, p, val) => { + set: (_, p: string, val) => { const moduleCache = this.context.__TEMPORAL_ACTIVATOR__?.moduleCache; if (moduleCache != null) { moduleCache.set(p, val); @@ -115,6 +113,8 @@ export class ReusableVMWorkflowCreator implements WorkflowCreator { configurable: false, }); + this.injectGlobals(this._context); + script.runInContext(this.context); // The V8 context is really composed of two distinct objects: the 'this._context' object on the outside, and another @@ -127,7 +127,7 @@ export class ReusableVMWorkflowCreator implements WorkflowCreator { ...Object.getOwnPropertyNames(this.pristineObj), ...Object.getOwnPropertySymbols(this.pristineObj), ]) { - if (k !== 'globalThis') { + if (k !== 'globalThis' && k !== '__temporal_globalSandboxDestructors') { const v: PropertyDescriptor = (this.pristineObj as any)[k]; v.value = deepFreeze(v.value); } @@ -136,7 +136,7 @@ export class ReusableVMWorkflowCreator implements WorkflowCreator { for (const v of sharedModules.values()) deepFreeze(v); } - protected get context(): vm.Context { + protected get context(): vm.Context & typeof globalThis { const { _context } = this; if (_context == null) { throw new IllegalStateError('Tried to use v8 context after Workflow creator was destroyed'); @@ -159,15 +159,15 @@ export class ReusableVMWorkflowCreator implements WorkflowCreator { async createWorkflow(options: WorkflowCreateOptions): Promise { const context = this.context; const holder: BagHolder = { bag: this.pristineObj! }; - const { isolateExecutionTimeoutMs } = this; + const workflowModule: WorkflowModule = new Proxy( {}, { get(_: any, fn: string) { return (...args: any[]) => { // By the time we get out of this call, all microtasks will have been executed - context.__TEMPORAL_ARGS__ = [holder, fn, args]; + context.__temporal_args = [holder, fn, args]; return callIntoVmScript.runInContext(context, { timeout: isolateExecutionTimeoutMs, displayErrors: true, @@ -175,7 +175,7 @@ export class ReusableVMWorkflowCreator implements WorkflowCreator { }; }, } - ) as any; + ); workflowModule.initRuntime({ ...options, @@ -183,7 +183,7 @@ export class ReusableVMWorkflowCreator implements WorkflowCreator { getTimeOfDay: native.getTimeOfDay, registeredActivityNames: this.registeredActivityNames, }); - const activator = context['__TEMPORAL_ACTIVATOR__']; + const activator = context.__TEMPORAL_ACTIVATOR__!; const newVM = new ReusableVMWorkflow(options.info.runId, context, activator, workflowModule); ReusableVMWorkflowCreator.workflowByRunId.set(options.info.runId, newVM); return newVM; @@ -210,8 +210,12 @@ export class ReusableVMWorkflowCreator implements WorkflowCreator { * Cleanup the pre-compiled script */ public async destroy(): Promise { - globalHandlers.removeWorkflowBundle(this.workflowBundle); - delete this._context; + try { + vm.runInContext(`__TEMPORAL__.api.destroy()`, this.context); + } finally { + globalHandlers.removeWorkflowBundle(this.workflowBundle); + delete this._context; + } } } diff --git a/packages/worker/src/workflow/vm-shared.ts b/packages/worker/src/workflow/vm-shared.ts index d46835738..eb5690dcf 100644 --- a/packages/worker/src/workflow/vm-shared.ts +++ b/packages/worker/src/workflow/vm-shared.ts @@ -1,6 +1,6 @@ import v8 from 'node:v8'; import vm from 'node:vm'; -import { AsyncLocalStorage } from 'node:async_hooks'; +import { AsyncLocalStorage as AsyncLocalStorageOriginal } from 'node:async_hooks'; import assert from 'node:assert'; import { URL, URLSearchParams } from 'node:url'; import { TextDecoder, TextEncoder } from 'node:util'; @@ -18,6 +18,9 @@ import { convertDeploymentVersion } from '../utils'; import { Workflow } from './interface'; import { WorkflowBundleWithSourceMapAndFilename } from './workflow-worker-thread/input'; +// We need this import for the ambient global extensions +import '@temporalio/workflow/lib/global-attributes'; // eslint-disable-line import/no-unassigned-import + // Best effort to catch unhandled rejections from workflow code. // We crash the thread if we cannot find the culprit. export function setUnhandledRejectionHandler(getWorkflowByRunId: (runId: string) => BaseVMWorkflow | undefined): void { @@ -89,8 +92,9 @@ function formatCallsiteName(callsite: NodeJS.CallSite): string | null { * Inject global objects as well as console.[log|...] into a vm context. */ export function injectGlobals(context: vm.Context): void { + const sandboxGlobalThis = context as typeof globalThis; + const globals = { - AsyncLocalStorage, URL, URLSearchParams, assert, @@ -99,25 +103,61 @@ export function injectGlobals(context: vm.Context): void { AbortController, }; for (const [k, v] of Object.entries(globals)) { - Object.defineProperty(context, k, { value: v, writable: false, enumerable: true, configurable: false }); + Object.defineProperty(sandboxGlobalThis, k, { value: v, writable: false, enumerable: true, configurable: false }); } const consoleMethods = ['log', 'warn', 'error', 'info', 'debug'] as const; type ConsoleMethod = (typeof consoleMethods)[number]; function makeConsoleFn(level: ConsoleMethod) { return function (...args: unknown[]) { - const { info } = context.__TEMPORAL_ACTIVATOR__; - if (info.isReplaying) return; - console[level](`[${info.workflowType}(${info.workflowId})]`, ...args); + if (sandboxGlobalThis.__TEMPORAL_ACTIVATOR__ === undefined) { + // This should not happen in a normal execution environment, but this is + // often handy while debugging the SDK, and costs nothing to keep around. + console[level](`[not in workflow context]`, ...args); + } else { + const { info } = sandboxGlobalThis.__TEMPORAL_ACTIVATOR__!; + if (info.unsafe.isReplaying) return; + console[level](`[${info.workflowType}(${info.workflowId})]`, ...args); + } }; } const consoleObject = Object.fromEntries(consoleMethods.map((level) => [level, makeConsoleFn(level)])); - Object.defineProperty(context, 'console', { + Object.defineProperty(sandboxGlobalThis, 'console', { value: consoleObject, writable: true, enumerable: false, configurable: true, }); + + class AsyncLocalStorage extends AsyncLocalStorageOriginal { + constructor(private name: string = 'anonymous') { + super(); + + const activator = sandboxGlobalThis.__TEMPORAL_ACTIVATOR__; + if (activator) { + activator.workflowSandboxDestructors.push(this.disable.bind(this)); + } else { + if (sandboxGlobalThis.__temporal_globalSandboxDestructors === undefined) + Object.defineProperty(sandboxGlobalThis, '__temporal_globalSandboxDestructors', { + value: [], + writable: false, + enumerable: false, + configurable: false, + }); + sandboxGlobalThis.__temporal_globalSandboxDestructors!.push(this.disable.bind(this)); + } + } + + disable(): void { + super.disable(); + } + } + Object.defineProperty(sandboxGlobalThis, 'AsyncLocalStorage', { + value: AsyncLocalStorage, + writable: false, + enumerable: true, + configurable: false, + }); } /** diff --git a/packages/worker/src/workflow/vm.ts b/packages/worker/src/workflow/vm.ts index 0478a8d0c..0966192c2 100644 --- a/packages/worker/src/workflow/vm.ts +++ b/packages/worker/src/workflow/vm.ts @@ -38,22 +38,24 @@ export class VMWorkflowCreator implements WorkflowCreator { * Create a workflow with given options */ async createWorkflow(options: WorkflowCreateOptions): Promise { - const context = this.getContext(); + const context = this.context; const { isolateExecutionTimeoutMs } = this; + const workflowModule: WorkflowModule = new Proxy( {}, { get(_: any, fn: string) { return (...args: any[]) => { - context.__TEMPORAL__.args = args; - return vm.runInContext(`__TEMPORAL__.api.${fn}(...__TEMPORAL__.args)`, context, { + // By the time we get out of this call, all microtasks will have been executed + context.__temporal_args = args; + return vm.runInContext(`__TEMPORAL__.api.${fn}(...__temporal_args)`, context, { timeout: isolateExecutionTimeoutMs, displayErrors: true, }); }; }, } - ) as any; + ); workflowModule.initRuntime({ ...options, @@ -61,18 +63,17 @@ export class VMWorkflowCreator implements WorkflowCreator { getTimeOfDay: native.getTimeOfDay, registeredActivityNames: this.registeredActivityNames, }); - const activator = context.__TEMPORAL_ACTIVATOR__ as any; - + const activator = context.__TEMPORAL_ACTIVATOR__!; const newVM = new VMWorkflow(options.info.runId, context, activator, workflowModule); VMWorkflowCreator.workflowByRunId.set(options.info.runId, newVM); return newVM; } - protected getContext(): vm.Context { + protected get context(): vm.Context & typeof globalThis { if (this.script === undefined) { throw new IllegalStateError('Isolate context provider was destroyed'); } - const context = vm.createContext({}, { microtaskMode: 'afterEvaluate' }); + const context = vm.createContext({}, { microtaskMode: 'afterEvaluate' }) as vm.Context & typeof globalThis; this.injectGlobals(context); this.script.runInContext(context); return context; @@ -126,6 +127,10 @@ export class VMWorkflowCreator implements WorkflowCreator { export class VMWorkflow extends BaseVMWorkflow { public async dispose(): Promise { this.workflowModule.dispose(); + + // This sandbox VM won't be reused, so we can destroy it now. + this.workflowModule.destroy(); + VMWorkflowCreator.workflowByRunId.delete(this.runId); delete this.context; } diff --git a/packages/workflow/src/cancellation-scope.ts b/packages/workflow/src/cancellation-scope.ts index 92e6d3a15..cc33781ba 100644 --- a/packages/workflow/src/cancellation-scope.ts +++ b/packages/workflow/src/cancellation-scope.ts @@ -246,14 +246,7 @@ export class CancellationScope { } } -const storage = new AsyncLocalStorage(); - -/** - * Avoid exposing the storage directly so it doesn't get frozen - */ -export function disableStorage(): void { - storage.disable(); -} +const storage: ALS = new AsyncLocalStorage(); export class RootCancellationScope extends CancellationScope { constructor() { diff --git a/packages/workflow/src/global-attributes.ts b/packages/workflow/src/global-attributes.ts index 8dc5384af..b141fbc80 100644 --- a/packages/workflow/src/global-attributes.ts +++ b/packages/workflow/src/global-attributes.ts @@ -1,28 +1,46 @@ -import { IllegalStateError } from '@temporalio/common'; -import { type Activator } from './internals'; +import { IllegalStateError, type Workflow } from '@temporalio/common'; +import type { Activator } from './internals'; +import type { WorkflowInterceptorsFactory } from './interceptors'; -export function maybeGetActivatorUntyped(): unknown { - return (globalThis as any).__TEMPORAL_ACTIVATOR__; +declare global { + // The __TEMPORAL__ object will be assigned by Webpack to the exports of the entrypoint module + // file dynamically generated by the bundler (see workflow/bundler.ts#genEntrypoint()). + // + // FIXME: Rename to lowercase syntax before 1.15.0. I preserved the uppercase name for backward + // compatibility with bundles generated by 1.14.0. We generally do not guarantee compatibility of + // bundles with different versions of the SDK, but in this specific case, there's really no + // user-side value in renaming this, so making a breaking change for that would not be justified. + // + // eslint-disable-next-line no-var + var __TEMPORAL__: { + api: typeof import('./worker-interface.ts'); + importWorkflows: () => Record; + importInterceptors: () => [{ interceptors: WorkflowInterceptorsFactory }]; + }; + + // Destructors to be called when the shared sandbox is destroyed. + // eslint-disable-next-line no-var + var __temporal_globalSandboxDestructors: (() => void)[] | undefined; + + // FIXME: Rename to lowercase syntax before 1.15.0. + // eslint-disable-next-line no-var + var __TEMPORAL_ACTIVATOR__: Activator | undefined; } -export function setActivatorUntyped(activator: unknown): void { - (globalThis as any).__TEMPORAL_ACTIVATOR__ = activator; +export function setActivator(activator: Activator | undefined): void { + globalThis.__TEMPORAL_ACTIVATOR__ = activator; } export function maybeGetActivator(): Activator | undefined { - return maybeGetActivatorUntyped() as Activator | undefined; + return globalThis.__TEMPORAL_ACTIVATOR__; } -export function assertInWorkflowContext(message: string): Activator { +export function assertInWorkflowContext(uninitializedErrorMessage: string): Activator { const activator = maybeGetActivator(); - if (activator == null) throw new IllegalStateError(message); + if (activator == null) throw new IllegalStateError(uninitializedErrorMessage); return activator; } -export function getActivator(): Activator { - const activator = maybeGetActivator(); - if (activator === undefined) { - throw new IllegalStateError('Workflow uninitialized'); - } - return activator; -} +// This is really just an alias for `assertInWorkflowContext` with a default error message, +// because that name better conveys the intent in some very common use cases. +export const getActivator = assertInWorkflowContext.bind(null, 'Workflow uninitialized'); diff --git a/packages/workflow/src/internals.ts b/packages/workflow/src/internals.ts index 6e2ec2a3f..ae6783fe7 100644 --- a/packages/workflow/src/internals.ts +++ b/packages/workflow/src/internals.ts @@ -455,6 +455,8 @@ export class Activator implements ActivationHandler { public versioningBehavior?: VersioningBehavior; public workflowDefinitionOptionsGetter?: () => WorkflowDefinitionOptions; + public readonly workflowSandboxDestructors: (() => void)[] = []; + constructor({ info, now, diff --git a/packages/workflow/src/stack-helpers.ts b/packages/workflow/src/stack-helpers.ts index 7f2c283e7..effba0002 100644 --- a/packages/workflow/src/stack-helpers.ts +++ b/packages/workflow/src/stack-helpers.ts @@ -1,11 +1,10 @@ -import { maybeGetActivatorUntyped } from './global-attributes'; -import type { PromiseStackStore } from './internals'; +import { maybeGetActivator } from './global-attributes'; /** * Helper function to remove a promise from being tracked for stack trace query purposes */ export function untrackPromise(promise: Promise): void { - const store = (maybeGetActivatorUntyped() as any)?.promiseStackStore as PromiseStackStore | undefined; + const store = maybeGetActivator()?.promiseStackStore; if (!store) return; store.childToParent.delete(promise); store.promiseToStack.delete(promise); diff --git a/packages/workflow/src/update-scope.ts b/packages/workflow/src/update-scope.ts index de6a75b05..2f254e59e 100644 --- a/packages/workflow/src/update-scope.ts +++ b/packages/workflow/src/update-scope.ts @@ -57,11 +57,4 @@ export class UpdateScope { } } -const storage = new AsyncLocalStorage(); - -/** - * Disable the async local storage for updates. - */ -export function disableUpdateStorage(): void { - storage.disable(); -} +const storage: ALS = new AsyncLocalStorage(); diff --git a/packages/workflow/src/worker-interface.ts b/packages/workflow/src/worker-interface.ts index 14ec355f8..bce55d3f0 100644 --- a/packages/workflow/src/worker-interface.ts +++ b/packages/workflow/src/worker-interface.ts @@ -6,17 +6,14 @@ import { encodeVersioningBehavior, IllegalStateError, WorkflowFunctionWithOptions } from '@temporalio/common'; import { composeInterceptors } from '@temporalio/common/lib/interceptors'; import { coresdk } from '@temporalio/proto'; -import { disableStorage } from './cancellation-scope'; -import { disableUpdateStorage } from './update-scope'; -import { WorkflowInterceptorsFactory } from './interceptors'; -import { WorkflowCreateOptionsInternal } from './interfaces'; +import type { WorkflowInterceptorsFactory } from './interceptors'; +import type { WorkflowCreateOptionsInternal } from './interfaces'; import { Activator } from './internals'; -import { setActivatorUntyped, getActivator } from './global-attributes'; +import { setActivator, getActivator, maybeGetActivator } from './global-attributes'; // Export the type for use on the "worker" side export { PromiseStackStore } from './internals'; -const global = globalThis as any; const OriginalDate = globalThis.Date; /** @@ -35,7 +32,7 @@ export function initRuntime(options: WorkflowCreateOptionsInternal): void { // There's one activator per workflow instance, set it globally on the context. // We do this before importing any user code so user code can statically reference @temporalio/workflow functions // as well as Date and Math.random. - setActivatorUntyped(activator); + setActivator(activator); activator.rethrowSynchronously = true; try { @@ -54,7 +51,7 @@ export function initRuntime(options: WorkflowCreateOptionsInternal): void { activator.failureConverter = customFailureConverter; } - const { importWorkflows, importInterceptors } = global.__TEMPORAL__; + const { importWorkflows, importInterceptors } = globalThis.__TEMPORAL__; if (importWorkflows === undefined || importInterceptors === undefined) { throw new IllegalStateError('Workflow bundle did not register import hooks'); } @@ -97,6 +94,15 @@ export function initRuntime(options: WorkflowCreateOptionsInternal): void { activator.workflowDefinitionOptionsGetter = activator.workflow.workflowDefinitionOptions; } } + } catch (e) { + try { + // Core won't send an eviction job after an early error, so we are responsible for triggering + // disposal of the workflow execution context. Otherwise, there might be resource leaks. + dispose(); + } catch (_innerError) { + // Ignore error disposing of activator, it is more important to rethrow the original error + } + throw e; } finally { activator.rethrowSynchronously = false; } @@ -252,17 +258,58 @@ export function tryUnblockConditions(): number { } } +// Handle disposal of the workflow execution context (not to be confused with destroying +// the sandbox vm, which may possibly be reused if `reuseV8Context` is true). export function dispose(): void { + let error: unknown | undefined = undefined; + const activator = getActivator(); activator.rethrowSynchronously = true; try { - const dispose = composeInterceptors(activator.interceptors.internals, 'dispose', async () => { - disableStorage(); - disableUpdateStorage(); - }); - dispose({}); + try { + const dispose = composeInterceptors(activator.interceptors.internals, 'dispose', async () => {}); + dispose({}); + } catch (e) { + error = e; + } + + // Destructors are run outside of interceptors, because interceptors themselves often rely + // on objects that will be destroyed (notably AsyncLocalStorage instances), and because we + // want to make sure that resources get cleaned up even if an interceptor throws an error. + // Only the first error (if any) is rethrown. + for (const destructor of activator.workflowSandboxDestructors.splice(0)) { + try { + destructor(); + } catch (e) { + if (error == null) { + error = e; + } + } + } + + if (error != null) throw error; } finally { activator.rethrowSynchronously = false; + + // The activator is no longer valid past this point. + setActivator(undefined); + } +} + +// Destroy the sandbox VM (not to be confused with disposing of the workflow execution context). +export function destroy(): void { + const activator = maybeGetActivator(); + if (activator) throw new IllegalStateError('Workflow execution context should have been disposed first'); + + let error: unknown | undefined = undefined; + for (const destructor of globalThis.__temporal_globalSandboxDestructors?.splice(0) ?? []) { + try { + destructor(); + } catch (e) { + if (error == null) { + error = e; + } + } } }