Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions packages/test/src/test-integration-workflows.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1081,6 +1081,30 @@ if (RUN_TIME_SKIPPING_TESTS) {
});
}

export async function rootWorkflow(): Promise<string> {
let result = '';
if (!workflow.workflowInfo().rootWorkflow) {
result += 'empty';
} else {
result += workflow.workflowInfo().rootWorkflow!.workflowId;
}
if (!workflow.workflowInfo().parent) {
result += ' ';
result += await workflow.executeChild(rootWorkflow, { args: [] });
}
return result;
}

test('Workflow can return root workflow', async (t) => {
const { createWorker, startWorkflow } = helpers(t);
const worker = await createWorker();
await worker.runUntil(async () => {
const handle = await startWorkflow(rootWorkflow, { workflowId: 'test-root-workflow-length', args: [] });
const result = await handle.result();
t.deepEqual(result, 'empty test-root-workflow-length');
});
});

export async function upsertAndReadMemo(memo: Record<string, unknown>): Promise<Record<string, unknown> | undefined> {
workflow.upsertMemo(memo);
return workflow.workflowInfo().memo;
Expand Down
21 changes: 19 additions & 2 deletions packages/worker/src/utils.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { coresdk } from '@temporalio/proto';
import { IllegalStateError, ParentWorkflowInfo } from '@temporalio/workflow';
import type { coresdk, temporal } from '@temporalio/proto';
import { IllegalStateError, ParentWorkflowInfo, RootWorkflowInfo } from '@temporalio/workflow';

export const MiB = 1024 ** 2;

Expand Down Expand Up @@ -28,3 +28,20 @@ export function convertToParentWorkflowType(
namespace: parent.namespace,
};
}

export function convertToRootWorkflowType(
root: temporal.api.common.v1.IWorkflowExecution | null | undefined
): RootWorkflowInfo | undefined {
if (!root) {
return undefined;
}

if (!root.workflowId || !root.runId) {
throw new IllegalStateError('Root IWorkflowExecution is missing a field that should be defined');
}

return {
workflowId: root.workflowId,
runId: root.runId,
};
}
4 changes: 3 additions & 1 deletion packages/worker/src/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ import {
} from './replay';
import { History, Runtime } from './runtime';
import { CloseableGroupedObservable, closeableGroupBy, mapWithState, mergeMapWithState } from './rxutils';
import { byteArrayToBuffer, convertToParentWorkflowType } from './utils';
import { byteArrayToBuffer, convertToParentWorkflowType, convertToRootWorkflowType } from './utils';
import {
CompiledWorkerOptions,
CompiledWorkerOptionsWithBuildId,
Expand Down Expand Up @@ -1270,6 +1270,7 @@ export class Worker {
workflowExecutionExpirationTime,
cronScheduleToScheduleInterval,
priority,
rootWorkflow,
} = initWorkflowJob;

// Note that we can't do payload convertion here, as there's no guarantee that converted payloads would be safe to
Expand Down Expand Up @@ -1307,6 +1308,7 @@ export class Worker {
isReplaying: activation.isReplaying,
},
priority: decodePriority(priority),
rootWorkflow: convertToRootWorkflowType(rootWorkflow),
};
const logAttributes = workflowLogAttributes(workflowInfo);
this.logger.trace('Creating workflow', logAttributes);
Expand Down
1 change: 1 addition & 0 deletions packages/workflow/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ export {
ContinueAsNew,
ContinueAsNewOptions,
EnhancedStackTrace,
RootWorkflowInfo,
StackTraceFileLocation,
StackTraceFileSlice,
ParentClosePolicy,
Expand Down
21 changes: 21 additions & 0 deletions packages/workflow/src/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,22 @@ export interface WorkflowInfo {
* Priority of this workflow
*/
readonly priority?: Priority;

/**
* The root workflow execution, defined as follows:
* 1. A workflow without a parent workflow is its own root workflow.
* 2. A workflow with a parent workflow has the same root workflow as
* its parent.
*
* When there is no parent workflow, i.e., the workflow is its own root workflow,
* this field is `undefined`.
*
* Note that Continue-as-New (or reset) propagates the workflow parentage relationship,
* and therefore, whether the new workflow has the same root workflow as the original one
* depends on whether it had a parent.
*
*/
readonly rootWorkflow?: RootWorkflowInfo;
}

/**
Expand Down Expand Up @@ -229,6 +245,11 @@ export interface ParentWorkflowInfo {
namespace: string;
}

export interface RootWorkflowInfo {
workflowId: string;
runId: string;
}

/**
* Not an actual error, used by the Workflow runtime to abort execution when {@link continueAsNew} is called
*/
Expand Down
Loading