Skip to content

Commit

Permalink
feat(stepfunctions): waitForTaskToken for Lambda, SQS, SNS (#2686)
Browse files Browse the repository at this point in the history
This PR allows one to work with Task states that implement the callback service integration pattern.

Introduces a new class for integrating with Lambda in the new invocation style, since there are a number
of subtle differences with the old invocation style.

The supported task types are:

* `RunLambdaTask` (AWS Lambda)
* `SendToQueue` (AWS SQS)
* `PublishToTopic` (AWS SNS)

Closes #2658, closes #2735.
  • Loading branch information
albegali authored and rix0rrr committed Jun 18, 2019
1 parent 65014ab commit d017a14
Show file tree
Hide file tree
Showing 9 changed files with 591 additions and 12 deletions.
1 change: 1 addition & 0 deletions packages/@aws-cdk/aws-stepfunctions-tasks/lib/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
export * from './invoke-function';
export * from './run-lambda-task';
export * from './invoke-activity';
export * from './run-ecs-task-base'; // Remove this once we can
export * from './run-ecs-task-base-types';
Expand Down
20 changes: 16 additions & 4 deletions packages/@aws-cdk/aws-stepfunctions-tasks/lib/invoke-function.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,25 @@ import iam = require('@aws-cdk/aws-iam');
import lambda = require('@aws-cdk/aws-lambda');
import sfn = require('@aws-cdk/aws-stepfunctions');

/**
* Properties for InvokeFunction
*/
export interface InvokeFunctionProps {
/**
* The JSON that you want to provide to your Lambda function as input.
*
* @default - The JSON data indicated by the task's InputPath is used as payload
*/
readonly payload?: { [key: string]: any };
}

/**
* A StepFunctions Task to invoke a Lambda function.
*
* A Function can be used directly as a Resource, but this class mirrors
* integration with other AWS services via a specific class instance.
* OUTPUT: the output of this task is the return value of the Lambda Function.
*/
export class InvokeFunction implements sfn.IStepFunctionsTask {
constructor(private readonly lambdaFunction: lambda.IFunction) {
constructor(private readonly lambdaFunction: lambda.IFunction, private readonly props: InvokeFunctionProps = {}) {
}

public bind(_task: sfn.Task): sfn.StepFunctionsTaskConfig {
Expand All @@ -22,6 +33,7 @@ export class InvokeFunction implements sfn.IStepFunctionsTask {
metricPrefixSingular: 'LambdaFunction',
metricPrefixPlural: 'LambdaFunctions',
metricDimensions: { LambdaFunctionArn: this.lambdaFunction.functionArn },
parameters: this.props.payload
};
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,13 @@ export interface PublishToTopicProps {
* Message subject
*/
readonly subject?: string;

/**
* Whether to pause the workflow until a task token is returned
*
* @default false
*/
readonly waitForTaskToken?: boolean;
}

/**
Expand All @@ -36,12 +43,20 @@ export interface PublishToTopicProps {
* integration with other AWS services via a specific class instance.
*/
export class PublishToTopic implements sfn.IStepFunctionsTask {

private readonly waitForTaskToken: boolean;

constructor(private readonly topic: sns.ITopic, private readonly props: PublishToTopicProps) {
this.waitForTaskToken = props.waitForTaskToken === true;

if (this.waitForTaskToken && !sfn.FieldUtils.containsTaskToken(props.message.value)) {
throw new Error('Task Token is missing in message (pass Context.taskToken somewhere in message)');
}
}

public bind(_task: sfn.Task): sfn.StepFunctionsTaskConfig {
return {
resourceArn: 'arn:aws:states:::sns:publish',
resourceArn: 'arn:aws:states:::sns:publish' + (this.waitForTaskToken ? '.waitForTaskToken' : ''),
policyStatements: [new iam.PolicyStatement({
actions: ['sns:Publish'],
resources: [this.topic.topicArn]
Expand Down
101 changes: 101 additions & 0 deletions packages/@aws-cdk/aws-stepfunctions-tasks/lib/run-lambda-task.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
import iam = require('@aws-cdk/aws-iam');
import lambda = require('@aws-cdk/aws-lambda');
import sfn = require('@aws-cdk/aws-stepfunctions');
import { FieldUtils } from '../../aws-stepfunctions/lib/fields';

/**
* Properties for RunLambdaTask
*/
export interface RunLambdaTaskProps {
/**
* The JSON that you want to provide to your Lambda function as input.
*/
readonly payload?: { [key: string]: any };

/**
* Whether to pause the workflow until a task token is returned
*
* If this is set to true, the Context.taskToken value must be included
* somewhere in the payload and the Lambda must call
* `SendTaskSuccess/SendTaskFailure` using that token.
*
* @default false
*/
readonly waitForTaskToken?: boolean;

/**
* Invocation type of the Lambda function
*
* @default RequestResponse
*/
readonly invocationType?: InvocationType;

/**
* Client context to pass to the function
*
* @default - No context
*/
readonly clientContext?: string;
}

/**
* Invoke a Lambda function as a Task
*
* OUTPUT: the output of this task is either the return value of Lambda's
* Invoke call, or whatever the Lambda Function posted back using
* `SendTaskSuccess/SendTaskFailure` in `waitForTaskToken` mode.
*
* @see https://docs.aws.amazon.com/step-functions/latest/dg/connect-lambda.html
*/
export class RunLambdaTask implements sfn.IStepFunctionsTask {
private readonly waitForTaskToken: boolean;

constructor(private readonly lambdaFunction: lambda.IFunction, private readonly props: RunLambdaTaskProps = {}) {
this.waitForTaskToken = !!props.waitForTaskToken;

if (this.waitForTaskToken && !FieldUtils.containsTaskToken(props.payload)) {
throw new Error('Task Token is missing in payload (pass Context.taskToken somewhere in payload)');
}
}

public bind(_task: sfn.Task): sfn.StepFunctionsTaskConfig {
const resourceArn = 'arn:aws:states:::lambda:invoke' + (this.waitForTaskToken ? '.waitForTaskToken' : '');

return {
resourceArn,
policyStatements: [new iam.PolicyStatement({
resources: [this.lambdaFunction.functionArn],
actions: ["lambda:InvokeFunction"],
})],
metricPrefixSingular: 'LambdaFunction',
metricPrefixPlural: 'LambdaFunctions',
metricDimensions: { LambdaFunctionArn: this.lambdaFunction.functionArn },
parameters: {
FunctionName: this.lambdaFunction.functionName,
Payload: this.props.payload,
InvocationType: this.props.invocationType,
ClientContext: this.props.clientContext,
}
};
}
}

/**
* Invocation type of a Lambda
*/
export enum InvocationType {
/**
* Invoke synchronously
*
* The API response includes the function response and additional data.
*/
RequestResponse = 'RequestResponse',

/**
* Invoke asynchronously
*
* Send events that fail multiple times to the function's dead-letter queue (if it's configured).
* The API response only includes a status code.
*/
Event = 'Event',
}
17 changes: 16 additions & 1 deletion packages/@aws-cdk/aws-stepfunctions-tasks/lib/send-to-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@ export interface SendToQueueProps {
* @default No group ID
*/
readonly messageGroupId?: string;

/**
* Whether to pause the workflow until a task token is returned
*
* @default false
*/
readonly waitForTaskToken?: boolean;
}

/**
Expand All @@ -45,12 +52,20 @@ export interface SendToQueueProps {
* integration with other AWS services via a specific class instance.
*/
export class SendToQueue implements sfn.IStepFunctionsTask {

private readonly waitForTaskToken: boolean;

constructor(private readonly queue: sqs.IQueue, private readonly props: SendToQueueProps) {
this.waitForTaskToken = props.waitForTaskToken === true;

if (this.waitForTaskToken && !sfn.FieldUtils.containsTaskToken(props.messageBody.value)) {
throw new Error('Task Token is missing in messageBody (pass Context.taskToken somewhere in messageBody)');
}
}

public bind(_task: sfn.Task): sfn.StepFunctionsTaskConfig {
return {
resourceArn: 'arn:aws:states:::sqs:sendMessage',
resourceArn: 'arn:aws:states:::sqs:sendMessage' + (this.waitForTaskToken ? '.waitForTaskToken' : ''),
policyStatements: [new iam.PolicyStatement({
actions: ['sqs:SendMessage'],
resources: [this.queue.queueArn]
Expand Down
Loading

0 comments on commit d017a14

Please sign in to comment.