Skip to content
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
import { type AsyncLocalStorage } from 'async_hooks';
import * as otel from '@opentelemetry/api';
import { ensureWorkflowModuleLoaded, getWorkflowModuleIfAvailable } from './workflow-module-loader';

const AsyncLocalStorage = getWorkflowModuleIfAvailable()?.AsyncLocalStorage;
import { ensureWorkflowModuleLoaded } from './workflow-module-loader';

export class ContextManager implements otel.ContextManager {
// If `@temporalio/workflow` is not available, ignore for now.
// When ContextManager is constructed module resolution error will be thrown.
protected storage = AsyncLocalStorage ? new AsyncLocalStorage<otel.Context>() : undefined;
// The workflow sandbox provides AsyncLocalStorage through globalThis.
protected storage: AsyncLocalStorage<otel.Context> = new (globalThis as any).AsyncLocalStorage();

public constructor() {
ensureWorkflowModuleLoaded();
Expand Down
39 changes: 39 additions & 0 deletions packages/test/src/test-interceptors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ import { cleanOptionalStackTrace, compareStackTrace, RUN_INTEGRATION_TESTS, Work
import { defaultOptions } from './mock-native-worker';
import {
checkDisposeRan,
conditionWithTimeoutAfterDisposal,
continueAsNewToDifferentWorkflow,
initAndResetFlag,
interceptorExample,
internalsInterceptorExample,
successString,
unblockOrCancel,
} from './workflows';
import { getSecretQuery, unblockWithSecretSignal } from './workflows/interceptor-example';
Expand Down Expand Up @@ -312,4 +314,41 @@ if (RUN_INTEGRATION_TESTS) {
t.true(disposeFlagSetNow);
});
});

// Test to trigger GH #1866
// When `reuseV8Context: true`, dispose() calls disableStorage() which disables the
// AsyncLocalStorage instance that stores cancellation scope.
// This causes CancellationScope.current() to return rootScope instead of the correct
// inner scope for workflows that continue afterward.
//
// The bug manifests in condition() with timeout: the finally block calls
// CancellationScope.current().cancel() to clean up.
// When storage is disabled, this incorrectly cancels the rootScope, failing the workflow with "Workflow cancelled".
test.serial('workflow disposal does not break CancellationScope in other workflows in reusable vm', async (t) => {
const taskQueue = 'test-reusable-vm-disposal-cancellation-scope';
const worker = await Worker.create({
...defaultOptions,
taskQueue,
});

const client = new WorkflowClient();
const result = await worker.runUntil(async () => {
// Fill the cache with workflow that complete immediately
await client.execute(successString, { taskQueue, workflowId: uuid4() });

// Start the condition workflow
const conditionHandle = await client.start(conditionWithTimeoutAfterDisposal, {
taskQueue,
workflowId: uuid4(),
});

// Run another workflow to trigger an evictions and disposal() while
// conditionWithTimeoutAfterDisposal is cached and waiting
await client.execute(successString, { taskQueue, workflowId: uuid4() });

// If dispose incorrectly disables the cancellation scope storage, then it will fail with CancelledFailure: "Workflow cancelled"
return await conditionHandle.result();
});
t.is(result, 'done');
});
}
61 changes: 61 additions & 0 deletions packages/test/src/test-workflow-async-local-storage.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import type { AsyncLocalStorage } from 'async_hooks';
import * as workflow from '@temporalio/workflow';
import { helpers, makeTestFunction } from './helpers-integration';
import { unblockSignal } from './workflows/testenv-test-workflows';

const test = makeTestFunction({
workflowsPath: __filename,
workflowInterceptorModules: [require.resolve('./workflows/otel-interceptors')],
});

export async function asyncLocalStorageWorkflow(explicitlyDisable: boolean): Promise<void> {
const myAls: AsyncLocalStorage<unknown> = new (globalThis as any).AsyncLocalStorage('My Workflow ALS');
try {
await myAls.run({}, async () => {
let signalReceived = false;
workflow.setHandler(unblockSignal, () => {
signalReceived = true;
});
await workflow.condition(() => signalReceived);
});
} finally {
if (explicitlyDisable) {
myAls.disable();
}
}
}

test("AsyncLocalStorage in workflow context doesn't throw when disabled", async (t) => {
const { createWorker, startWorkflow } = helpers(t);

const worker = await createWorker({
// We disable the workflow cache to ensure that some workflow executions will get
// evicted from cache, forcing early disposal of the corresponding workflow vms.
maxCachedWorkflows: 0,
maxConcurrentWorkflowTaskExecutions: 1,

sinks: {
exporters: {
export: {
fn: () => void 0,
},
},
},
});

await worker.runUntil(async () => {
const wfs = await Promise.all([
startWorkflow(asyncLocalStorageWorkflow, { args: [true] }),
startWorkflow(asyncLocalStorageWorkflow, { args: [false] }),
startWorkflow(asyncLocalStorageWorkflow, { args: [true] }),
startWorkflow(asyncLocalStorageWorkflow, { args: [false] }),
]);

await Promise.all([wfs[0].signal(unblockSignal), wfs[1].signal(unblockSignal)]);
await Promise.all([wfs[0].result(), wfs[1].result()]);
});

// We're only asserting that no error is thrown. There's unfortunately no way
// to programmatically confirm that ALS instances were properly disposed.
t.pass();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are there any additional assertions that would be valuable here aside from "it doesn't throw"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I played around with the possibility of observing the destruction of ALS by escaping the sandbox to create a FinalizationRegistry, which worked. The problem, though, is that I couldn't get that test to fail with the previous code, so that's not proving anything.

With that in mind, and given the complexity and fragility of that test, I decided not to commit it.

And I really can't think of any other side effect to look for.

});
4 changes: 4 additions & 0 deletions packages/test/src/test-workflows.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ test.beforeEach(async (t) => {
};
});

test.afterEach(async (t) => {
await t.context.workflow?.dispose();
});

async function createWorkflow(
workflowType: string,
runId: string,
Expand Down
1 change: 1 addition & 0 deletions packages/test/src/workflows/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ export * from './promise-then-promise';
export * from './race';
export * from './random';
export * from './reject-promise';
export * from './reusable-vm-disposal-bug';
export * from './run-activity-in-different-task-queue';
export * from './scope-cancelled-while-waiting-on-external-workflow-cancellation';
export * from './set-timeout-after-microtasks';
Expand Down
20 changes: 20 additions & 0 deletions packages/test/src/workflows/reusable-vm-disposal-bug.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import { condition } from '@temporalio/workflow';

/**
* Workflow for reproducing the GH #1866
*
* When the bug is present:
* 1. First condition times out
* 2. finally block calls CancellationScope.current().cancel()
* 3. With disabled storage, current() returns rootScope
* 4. rootScope.cancel() is called, failing the workflow with "Workflow cancelled"
*
* Actual failure happens on the second `condition` where when creating the new
* cancellation scope, we see that the parent/root scope is already canceled.
*/
export async function conditionWithTimeoutAfterDisposal(): Promise<string> {
const alwaysFalse = false;
await condition(() => alwaysFalse, '500ms');
await condition(() => alwaysFalse, '500ms');
return 'done';
}
36 changes: 20 additions & 16 deletions packages/worker/src/workflow/reusable-vm.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ export class ReusableVMWorkflowCreator implements WorkflowCreator {
*
* Use the {@link context} getter instead
*/
private _context?: vm.Context;
private _context?: vm.Context & typeof globalThis;
private pristineObj?: object;

constructor(
Expand All @@ -42,12 +42,12 @@ export class ReusableVMWorkflowCreator implements WorkflowCreator {
ReusableVMWorkflowCreator.unhandledRejectionHandlerHasBeenSet = true;
}

this._context = vm.createContext({}, { microtaskMode: 'afterEvaluate' });
this._context = vm.createContext({}, { microtaskMode: 'afterEvaluate' }) as vm.Context & typeof globalThis;
vm.runInContext(
`{
const __TEMPORAL_CALL_INTO_SCOPE = () => {
const [holder, fn, args] = globalThis.__TEMPORAL_ARGS__;
delete globalThis.__TEMPORAL_ARGS__;
const [holder, fn, args] = globalThis.__temporal_args;
delete globalThis.__temporal_args;

if (globalThis.__TEMPORAL_BAG_HOLDER__ !== holder) {
if (globalThis.__TEMPORAL_BAG_HOLDER__ !== undefined) {
Expand Down Expand Up @@ -81,13 +81,11 @@ export class ReusableVMWorkflowCreator implements WorkflowCreator {
{ timeout: isolateExecutionTimeoutMs, displayErrors: true }
);

this.injectGlobals(this._context);

const sharedModules = new Map<string | symbol, any>();
const __webpack_module_cache__ = new Proxy(
{},
{
get: (_, p) => {
get: (_, p: string) => {
// Try the shared modules first
const sharedModule = sharedModules.get(p);
if (sharedModule) {
Expand All @@ -96,7 +94,7 @@ export class ReusableVMWorkflowCreator implements WorkflowCreator {
const moduleCache = this.context.__TEMPORAL_ACTIVATOR__?.moduleCache;
return moduleCache?.get(p);
},
set: (_, p, val) => {
set: (_, p: string, val) => {
const moduleCache = this.context.__TEMPORAL_ACTIVATOR__?.moduleCache;
if (moduleCache != null) {
moduleCache.set(p, val);
Expand All @@ -115,6 +113,8 @@ export class ReusableVMWorkflowCreator implements WorkflowCreator {
configurable: false,
});

this.injectGlobals(this._context);

script.runInContext(this.context);

// The V8 context is really composed of two distinct objects: the 'this._context' object on the outside, and another
Expand All @@ -127,7 +127,7 @@ export class ReusableVMWorkflowCreator implements WorkflowCreator {
...Object.getOwnPropertyNames(this.pristineObj),
...Object.getOwnPropertySymbols(this.pristineObj),
]) {
if (k !== 'globalThis') {
if (k !== 'globalThis' && k !== '__temporal_globalSandboxDestructors') {
const v: PropertyDescriptor = (this.pristineObj as any)[k];
v.value = deepFreeze(v.value);
}
Expand All @@ -136,7 +136,7 @@ export class ReusableVMWorkflowCreator implements WorkflowCreator {
for (const v of sharedModules.values()) deepFreeze(v);
}

protected get context(): vm.Context {
protected get context(): vm.Context & typeof globalThis {
const { _context } = this;
if (_context == null) {
throw new IllegalStateError('Tried to use v8 context after Workflow creator was destroyed');
Expand All @@ -159,31 +159,31 @@ export class ReusableVMWorkflowCreator implements WorkflowCreator {
async createWorkflow(options: WorkflowCreateOptions): Promise<Workflow> {
const context = this.context;
const holder: BagHolder = { bag: this.pristineObj! };

const { isolateExecutionTimeoutMs } = this;

const workflowModule: WorkflowModule = new Proxy(
{},
{
get(_: any, fn: string) {
return (...args: any[]) => {
// By the time we get out of this call, all microtasks will have been executed
context.__TEMPORAL_ARGS__ = [holder, fn, args];
context.__temporal_args = [holder, fn, args];
return callIntoVmScript.runInContext(context, {
timeout: isolateExecutionTimeoutMs,
displayErrors: true,
});
};
},
}
) as any;
);

workflowModule.initRuntime({
...options,
sourceMap: this.workflowBundle.sourceMap,
getTimeOfDay: native.getTimeOfDay,
registeredActivityNames: this.registeredActivityNames,
});
const activator = context['__TEMPORAL_ACTIVATOR__'];
const activator = context.__TEMPORAL_ACTIVATOR__!;
const newVM = new ReusableVMWorkflow(options.info.runId, context, activator, workflowModule);
ReusableVMWorkflowCreator.workflowByRunId.set(options.info.runId, newVM);
return newVM;
Expand All @@ -210,8 +210,12 @@ export class ReusableVMWorkflowCreator implements WorkflowCreator {
* Cleanup the pre-compiled script
*/
public async destroy(): Promise<void> {
globalHandlers.removeWorkflowBundle(this.workflowBundle);
delete this._context;
try {
vm.runInContext(`__TEMPORAL__.api.destroy()`, this.context);
} finally {
globalHandlers.removeWorkflowBundle(this.workflowBundle);
delete this._context;
}
}
}

Expand Down
54 changes: 47 additions & 7 deletions packages/worker/src/workflow/vm-shared.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import v8 from 'node:v8';
import vm from 'node:vm';
import { AsyncLocalStorage } from 'node:async_hooks';
import { AsyncLocalStorage as AsyncLocalStorageOriginal } from 'node:async_hooks';
import assert from 'node:assert';
import { URL, URLSearchParams } from 'node:url';
import { TextDecoder, TextEncoder } from 'node:util';
Expand All @@ -18,6 +18,9 @@ import { convertDeploymentVersion } from '../utils';
import { Workflow } from './interface';
import { WorkflowBundleWithSourceMapAndFilename } from './workflow-worker-thread/input';

// We need this import for the ambient global extensions
import '@temporalio/workflow/lib/global-attributes'; // eslint-disable-line import/no-unassigned-import

// Best effort to catch unhandled rejections from workflow code.
// We crash the thread if we cannot find the culprit.
export function setUnhandledRejectionHandler(getWorkflowByRunId: (runId: string) => BaseVMWorkflow | undefined): void {
Expand Down Expand Up @@ -89,8 +92,9 @@ function formatCallsiteName(callsite: NodeJS.CallSite): string | null {
* Inject global objects as well as console.[log|...] into a vm context.
*/
export function injectGlobals(context: vm.Context): void {
const sandboxGlobalThis = context as typeof globalThis;

const globals = {
AsyncLocalStorage,
URL,
URLSearchParams,
assert,
Expand All @@ -99,25 +103,61 @@ export function injectGlobals(context: vm.Context): void {
AbortController,
};
for (const [k, v] of Object.entries(globals)) {
Object.defineProperty(context, k, { value: v, writable: false, enumerable: true, configurable: false });
Object.defineProperty(sandboxGlobalThis, k, { value: v, writable: false, enumerable: true, configurable: false });
}

const consoleMethods = ['log', 'warn', 'error', 'info', 'debug'] as const;
type ConsoleMethod = (typeof consoleMethods)[number];
function makeConsoleFn(level: ConsoleMethod) {
return function (...args: unknown[]) {
const { info } = context.__TEMPORAL_ACTIVATOR__;
if (info.isReplaying) return;
console[level](`[${info.workflowType}(${info.workflowId})]`, ...args);
if (sandboxGlobalThis.__TEMPORAL_ACTIVATOR__ === undefined) {
// This should not happen in a normal execution environment, but this is
// often handy while debugging the SDK, and costs nothing to keep around.
console[level](`[not in workflow context]`, ...args);
} else {
const { info } = sandboxGlobalThis.__TEMPORAL_ACTIVATOR__!;
if (info.unsafe.isReplaying) return;
console[level](`[${info.workflowType}(${info.workflowId})]`, ...args);
}
};
}
const consoleObject = Object.fromEntries(consoleMethods.map((level) => [level, makeConsoleFn(level)]));
Object.defineProperty(context, 'console', {
Object.defineProperty(sandboxGlobalThis, 'console', {
value: consoleObject,
writable: true,
enumerable: false,
configurable: true,
});

class AsyncLocalStorage extends AsyncLocalStorageOriginal<any> {
constructor(private name: string = 'anonymous') {
super();

const activator = sandboxGlobalThis.__TEMPORAL_ACTIVATOR__;
if (activator) {
activator.workflowSandboxDestructors.push(this.disable.bind(this));
} else {
if (sandboxGlobalThis.__temporal_globalSandboxDestructors === undefined)
Object.defineProperty(sandboxGlobalThis, '__temporal_globalSandboxDestructors', {
value: [],
writable: false,
enumerable: false,
configurable: false,
});
sandboxGlobalThis.__temporal_globalSandboxDestructors!.push(this.disable.bind(this));
}
}

disable(): void {
super.disable();
}
}
Object.defineProperty(sandboxGlobalThis, 'AsyncLocalStorage', {
value: AsyncLocalStorage,
writable: false,
enumerable: true,
configurable: false,
});
}

/**
Expand Down
Loading
Loading