diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/index.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/index.ts index 41f2533ba0149..966a034532788 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/index.ts +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/index.ts @@ -1,4 +1,5 @@ export * from './invoke-function'; +export * from './run-lambda-task'; export * from './invoke-activity'; export * from './run-ecs-task-base'; // Remove this once we can export * from './run-ecs-task-base-types'; diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/invoke-function.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/invoke-function.ts index 67d42fdf79537..4a07618c39e2e 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/invoke-function.ts +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/invoke-function.ts @@ -2,14 +2,25 @@ import iam = require('@aws-cdk/aws-iam'); import lambda = require('@aws-cdk/aws-lambda'); import sfn = require('@aws-cdk/aws-stepfunctions'); +/** + * Properties for InvokeFunction + */ +export interface InvokeFunctionProps { + /** + * The JSON that you want to provide to your Lambda function as input. + * + * @default - The JSON data indicated by the task's InputPath is used as payload + */ + readonly payload?: { [key: string]: any }; +} + /** * A StepFunctions Task to invoke a Lambda function. * - * A Function can be used directly as a Resource, but this class mirrors - * integration with other AWS services via a specific class instance. + * OUTPUT: the output of this task is the return value of the Lambda Function. */ export class InvokeFunction implements sfn.IStepFunctionsTask { - constructor(private readonly lambdaFunction: lambda.IFunction) { + constructor(private readonly lambdaFunction: lambda.IFunction, private readonly props: InvokeFunctionProps = {}) { } public bind(_task: sfn.Task): sfn.StepFunctionsTaskConfig { @@ -22,6 +33,7 @@ export class InvokeFunction implements sfn.IStepFunctionsTask { metricPrefixSingular: 'LambdaFunction', metricPrefixPlural: 'LambdaFunctions', metricDimensions: { LambdaFunctionArn: this.lambdaFunction.functionArn }, + parameters: this.props.payload }; } -} +} \ No newline at end of file diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/publish-to-topic.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/publish-to-topic.ts index 87f2a254af44a..3c222772e0a5b 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/publish-to-topic.ts +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/publish-to-topic.ts @@ -27,6 +27,13 @@ export interface PublishToTopicProps { * Message subject */ readonly subject?: string; + + /** + * Whether to pause the workflow until a task token is returned + * + * @default false + */ + readonly waitForTaskToken?: boolean; } /** @@ -36,12 +43,20 @@ export interface PublishToTopicProps { * integration with other AWS services via a specific class instance. */ export class PublishToTopic implements sfn.IStepFunctionsTask { + + private readonly waitForTaskToken: boolean; + constructor(private readonly topic: sns.ITopic, private readonly props: PublishToTopicProps) { + this.waitForTaskToken = props.waitForTaskToken === true; + + if (this.waitForTaskToken && !sfn.FieldUtils.containsTaskToken(props.message.value)) { + throw new Error('Task Token is missing in message (pass Context.taskToken somewhere in message)'); + } } public bind(_task: sfn.Task): sfn.StepFunctionsTaskConfig { return { - resourceArn: 'arn:aws:states:::sns:publish', + resourceArn: 'arn:aws:states:::sns:publish' + (this.waitForTaskToken ? '.waitForTaskToken' : ''), policyStatements: [new iam.PolicyStatement({ actions: ['sns:Publish'], resources: [this.topic.topicArn] diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/run-lambda-task.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/run-lambda-task.ts new file mode 100644 index 0000000000000..c6071227854d2 --- /dev/null +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/run-lambda-task.ts @@ -0,0 +1,101 @@ +import iam = require('@aws-cdk/aws-iam'); +import lambda = require('@aws-cdk/aws-lambda'); +import sfn = require('@aws-cdk/aws-stepfunctions'); +import { FieldUtils } from '../../aws-stepfunctions/lib/fields'; + +/** + * Properties for RunLambdaTask + */ +export interface RunLambdaTaskProps { + /** + * The JSON that you want to provide to your Lambda function as input. + */ + readonly payload?: { [key: string]: any }; + + /** + * Whether to pause the workflow until a task token is returned + * + * If this is set to true, the Context.taskToken value must be included + * somewhere in the payload and the Lambda must call + * `SendTaskSuccess/SendTaskFailure` using that token. + * + * @default false + */ + readonly waitForTaskToken?: boolean; + + /** + * Invocation type of the Lambda function + * + * @default RequestResponse + */ + readonly invocationType?: InvocationType; + + /** + * Client context to pass to the function + * + * @default - No context + */ + readonly clientContext?: string; +} + +/** + * Invoke a Lambda function as a Task + * + * OUTPUT: the output of this task is either the return value of Lambda's + * Invoke call, or whatever the Lambda Function posted back using + * `SendTaskSuccess/SendTaskFailure` in `waitForTaskToken` mode. + * + * @see https://docs.aws.amazon.com/step-functions/latest/dg/connect-lambda.html + */ +export class RunLambdaTask implements sfn.IStepFunctionsTask { + private readonly waitForTaskToken: boolean; + + constructor(private readonly lambdaFunction: lambda.IFunction, private readonly props: RunLambdaTaskProps = {}) { + this.waitForTaskToken = !!props.waitForTaskToken; + + if (this.waitForTaskToken && !FieldUtils.containsTaskToken(props.payload)) { + throw new Error('Task Token is missing in payload (pass Context.taskToken somewhere in payload)'); + } + } + + public bind(_task: sfn.Task): sfn.StepFunctionsTaskConfig { + const resourceArn = 'arn:aws:states:::lambda:invoke' + (this.waitForTaskToken ? '.waitForTaskToken' : ''); + + return { + resourceArn, + policyStatements: [new iam.PolicyStatement({ + resources: [this.lambdaFunction.functionArn], + actions: ["lambda:InvokeFunction"], + })], + metricPrefixSingular: 'LambdaFunction', + metricPrefixPlural: 'LambdaFunctions', + metricDimensions: { LambdaFunctionArn: this.lambdaFunction.functionArn }, + parameters: { + FunctionName: this.lambdaFunction.functionName, + Payload: this.props.payload, + InvocationType: this.props.invocationType, + ClientContext: this.props.clientContext, + } + }; + } +} + +/** + * Invocation type of a Lambda + */ +export enum InvocationType { + /** + * Invoke synchronously + * + * The API response includes the function response and additional data. + */ + RequestResponse = 'RequestResponse', + + /** + * Invoke asynchronously + * + * Send events that fail multiple times to the function's dead-letter queue (if it's configured). + * The API response only includes a status code. + */ + Event = 'Event', +} \ No newline at end of file diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/send-to-queue.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/send-to-queue.ts index 4a764b4ba0b74..3a202c5962e0d 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/send-to-queue.ts +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/send-to-queue.ts @@ -36,6 +36,13 @@ export interface SendToQueueProps { * @default No group ID */ readonly messageGroupId?: string; + + /** + * Whether to pause the workflow until a task token is returned + * + * @default false + */ + readonly waitForTaskToken?: boolean; } /** @@ -45,12 +52,20 @@ export interface SendToQueueProps { * integration with other AWS services via a specific class instance. */ export class SendToQueue implements sfn.IStepFunctionsTask { + + private readonly waitForTaskToken: boolean; + constructor(private readonly queue: sqs.IQueue, private readonly props: SendToQueueProps) { + this.waitForTaskToken = props.waitForTaskToken === true; + + if (this.waitForTaskToken && !sfn.FieldUtils.containsTaskToken(props.messageBody.value)) { + throw new Error('Task Token is missing in messageBody (pass Context.taskToken somewhere in messageBody)'); + } } public bind(_task: sfn.Task): sfn.StepFunctionsTaskConfig { return { - resourceArn: 'arn:aws:states:::sqs:sendMessage', + resourceArn: 'arn:aws:states:::sqs:sendMessage' + (this.waitForTaskToken ? '.waitForTaskToken' : ''), policyStatements: [new iam.PolicyStatement({ actions: ['sqs:SendMessage'], resources: [this.queue.queueArn] diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.invoke-function.expected.json b/packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.invoke-function.expected.json new file mode 100644 index 0000000000000..18f710ef22d77 --- /dev/null +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.invoke-function.expected.json @@ -0,0 +1,314 @@ +{ + "Resources": { + "HandlerServiceRoleFCDC14AE": { + "Type": "AWS::IAM::Role", + "Properties": { + "AssumeRolePolicyDocument": { + "Statement": [ + { + "Action": "sts:AssumeRole", + "Effect": "Allow", + "Principal": { + "Service": { + "Fn::Join": [ + "", + [ + "lambda.", + { + "Ref": "AWS::URLSuffix" + } + ] + ] + } + } + } + ], + "Version": "2012-10-17" + }, + "ManagedPolicyArns": [ + { + "Fn::Join": [ + "", + [ + "arn:", + { + "Ref": "AWS::Partition" + }, + ":iam::aws:policy/service-role/AWSLambdaBasicExecutionRole" + ] + ] + } + ] + } + }, + "Handler886CB40B": { + "Type": "AWS::Lambda::Function", + "Properties": { + "Code": { + "S3Bucket": { + "Ref": "HandlerCodeS3Bucket8DD11ED9" + }, + "S3Key": { + "Fn::Join": [ + "", + [ + { + "Fn::Select": [ + 0, + { + "Fn::Split": [ + "||", + { + "Ref": "HandlerCodeS3VersionKey0BB5191E" + } + ] + } + ] + }, + { + "Fn::Select": [ + 1, + { + "Fn::Split": [ + "||", + { + "Ref": "HandlerCodeS3VersionKey0BB5191E" + } + ] + } + ] + } + ] + ] + } + }, + "Handler": "index.main", + "Role": { + "Fn::GetAtt": [ + "HandlerServiceRoleFCDC14AE", + "Arn" + ] + }, + "Runtime": "python3.6" + }, + "DependsOn": [ + "HandlerServiceRoleFCDC14AE" + ] + }, + "CallbackHandlerServiceRole3689695E": { + "Type": "AWS::IAM::Role", + "Properties": { + "AssumeRolePolicyDocument": { + "Statement": [ + { + "Action": "sts:AssumeRole", + "Effect": "Allow", + "Principal": { + "Service": { + "Fn::Join": [ + "", + [ + "lambda.", + { + "Ref": "AWS::URLSuffix" + } + ] + ] + } + } + } + ], + "Version": "2012-10-17" + }, + "ManagedPolicyArns": [ + { + "Fn::Join": [ + "", + [ + "arn:", + { + "Ref": "AWS::Partition" + }, + ":iam::aws:policy/service-role/AWSLambdaBasicExecutionRole" + ] + ] + } + ] + } + }, + "CallbackHandler4434C38D": { + "Type": "AWS::Lambda::Function", + "Properties": { + "Code": { + "S3Bucket": { + "Ref": "CallbackHandlerCodeS3Bucket806D7490" + }, + "S3Key": { + "Fn::Join": [ + "", + [ + { + "Fn::Select": [ + 0, + { + "Fn::Split": [ + "||", + { + "Ref": "CallbackHandlerCodeS3VersionKeyDD40A461" + } + ] + } + ] + }, + { + "Fn::Select": [ + 1, + { + "Fn::Split": [ + "||", + { + "Ref": "CallbackHandlerCodeS3VersionKeyDD40A461" + } + ] + } + ] + } + ] + ] + } + }, + "Handler": "index.main", + "Role": { + "Fn::GetAtt": [ + "CallbackHandlerServiceRole3689695E", + "Arn" + ] + }, + "Runtime": "python3.6" + }, + "DependsOn": [ + "CallbackHandlerServiceRole3689695E" + ] + }, + "StateMachineRoleB840431D": { + "Type": "AWS::IAM::Role", + "Properties": { + "AssumeRolePolicyDocument": { + "Statement": [ + { + "Action": "sts:AssumeRole", + "Effect": "Allow", + "Principal": { + "Service": { + "Fn::Join": [ + "", + [ + "states.", + { + "Ref": "AWS::Region" + }, + ".amazonaws.com" + ] + ] + } + } + } + ], + "Version": "2012-10-17" + } + } + }, + "StateMachineRoleDefaultPolicyDF1E6607": { + "Type": "AWS::IAM::Policy", + "Properties": { + "PolicyDocument": { + "Statement": [ + { + "Action": "lambda:InvokeFunction", + "Effect": "Allow", + "Resource": { + "Fn::GetAtt": [ + "Handler886CB40B", + "Arn" + ] + } + }, + { + "Action": "lambda:InvokeFunction", + "Effect": "Allow", + "Resource": { + "Fn::GetAtt": [ + "CallbackHandler4434C38D", + "Arn" + ] + } + } + ], + "Version": "2012-10-17" + }, + "PolicyName": "StateMachineRoleDefaultPolicyDF1E6607", + "Roles": [ + { + "Ref": "StateMachineRoleB840431D" + } + ] + } + }, + "StateMachine2E01A3A5": { + "Type": "AWS::StepFunctions::StateMachine", + "Properties": { + "DefinitionString": { + "Fn::Join": [ + "", + [ + "{\"StartAt\":\"Invoke Handler\",\"States\":{\"Invoke Handler\":{\"Next\":\"Invoke Handler with task token\",\"Type\":\"Task\",\"Resource\":\"", + { + "Fn::GetAtt": [ + "Handler886CB40B", + "Arn" + ] + }, + "\"},\"Invoke Handler with task token\":{\"Next\":\"Job Complete?\",\"InputPath\":\"$.guid\",\"Parameters\":{\"FunctionName\":\"", + { + "Ref": "CallbackHandler4434C38D" + }, + "\",\"Payload\":{\"token\":\"$$.Task.Token\"}},\"Type\":\"Task\",\"Resource\":\"arn:aws:states:::lambda:invoke.waitForTaskToken\",\"ResultPath\":\"$.status\"},\"Job Complete?\":{\"Type\":\"Choice\",\"Choices\":[{\"Variable\":\"$.status\",\"StringEquals\":\"FAILED\",\"Next\":\"Job Failed\"},{\"Variable\":\"$.status\",\"StringEquals\":\"SUCCEEDED\",\"Next\":\"Final step\"}]},\"Job Failed\":{\"Type\":\"Fail\",\"Error\":\"DescribeJob returned FAILED\",\"Cause\":\"AWS Batch Job Failed\"},\"Final step\":{\"Type\":\"Pass\",\"End\":true}},\"TimeoutSeconds\":30}" + ] + ] + }, + "RoleArn": { + "Fn::GetAtt": [ + "StateMachineRoleB840431D", + "Arn" + ] + } + } + } + }, + "Parameters": { + "HandlerCodeS3Bucket8DD11ED9": { + "Type": "String", + "Description": "S3 bucket for asset \"aws-stepfunctions-integ/Handler/Code\"" + }, + "HandlerCodeS3VersionKey0BB5191E": { + "Type": "String", + "Description": "S3 key for asset version \"aws-stepfunctions-integ/Handler/Code\"" + }, + "HandlerCodeArtifactHashD7814EF8": { + "Type": "String", + "Description": "Artifact hash for asset \"aws-stepfunctions-integ/Handler/Code\"" + }, + "CallbackHandlerCodeS3Bucket806D7490": { + "Type": "String", + "Description": "S3 bucket for asset \"aws-stepfunctions-integ/CallbackHandler/Code\"" + }, + "CallbackHandlerCodeS3VersionKeyDD40A461": { + "Type": "String", + "Description": "S3 key for asset version \"aws-stepfunctions-integ/CallbackHandler/Code\"" + }, + "CallbackHandlerCodeArtifactHash2D279BFF": { + "Type": "String", + "Description": "Artifact hash for asset \"aws-stepfunctions-integ/CallbackHandler/Code\"" + } + } +} \ No newline at end of file diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.invoke-function.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.invoke-function.ts new file mode 100644 index 0000000000000..2f0e1d2a797ec --- /dev/null +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.invoke-function.ts @@ -0,0 +1,57 @@ +import sfn = require('@aws-cdk/aws-stepfunctions'); +import cdk = require('@aws-cdk/cdk'); +import path = require('path'); +import { Code, Function, Runtime } from '../../aws-lambda/lib'; +import tasks = require('../lib'); + +const app = new cdk.App(); +const stack = new cdk.Stack(app, 'aws-stepfunctions-integ'); + +const handler = new Function(stack, 'Handler', { + code: Code.asset(path.join(__dirname, 'my-lambda-handler')), + handler: 'index.main', + runtime: Runtime.Python36 +}); + +const submitJob = new sfn.Task(stack, 'Invoke Handler', { + task: new tasks.InvokeFunction(handler), +}); + +const callbackHandler = new Function(stack, 'CallbackHandler', { + code: Code.asset(path.join(__dirname, 'my-lambda-handler')), + handler: 'index.main', + runtime: Runtime.Python36 +}); + +const taskTokenHandler = new sfn.Task(stack, 'Invoke Handler with task token', { + task: new tasks.RunLambdaTask(callbackHandler, { + waitForTaskToken: true, + payload: { + token: sfn.Context.taskToken + } + }), + inputPath: '$.guid', + resultPath: '$.status', +}); + +const isComplete = new sfn.Choice(stack, 'Job Complete?'); +const jobFailed = new sfn.Fail(stack, 'Job Failed', { + cause: 'AWS Batch Job Failed', + error: 'DescribeJob returned FAILED', +}); +const finalStatus = new sfn.Pass(stack, 'Final step'); + +const chain = sfn.Chain + .start(submitJob) + .next(taskTokenHandler) + .next(isComplete + .when(sfn.Condition.stringEquals('$.status', 'FAILED'), jobFailed) + .when(sfn.Condition.stringEquals('$.status', 'SUCCEEDED'), finalStatus) + ); + +new sfn.StateMachine(stack, 'StateMachine', { + definition: chain, + timeoutSec: 30 +}); + +app.run(); \ No newline at end of file diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/test/invoke-function.test.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/test/invoke-function.test.ts index e0fab6553ac17..611f30d627815 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/test/invoke-function.test.ts +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/test/invoke-function.test.ts @@ -4,16 +4,19 @@ import sfn = require('@aws-cdk/aws-stepfunctions'); import { Stack } from '@aws-cdk/cdk'; import tasks = require('../lib'); -test('Lambda function can be used in a Task', () => { - // GIVEN - const stack = new Stack(); - - // WHEN - const fn = new lambda.Function(stack, 'Fn', { +let stack: Stack; +let fn: lambda.Function; +beforeEach(() => { + stack = new Stack(); + fn = new lambda.Function(stack, 'Fn', { code: lambda.Code.inline('hello'), handler: 'index.hello', runtime: lambda.Runtime.Python27, }); +}); + +test('Lambda function can be used in a Task', () => { + // WHEN const task = new sfn.Task(stack, 'Task', { task: new tasks.InvokeFunction(fn) }); new sfn.StateMachine(stack, 'SM', { definition: task @@ -29,4 +32,61 @@ test('Lambda function can be used in a Task', () => { ]] }, }); +}); + +test('Lambda function payload ends up in Parameters', () => { + new sfn.StateMachine(stack, 'SM', { + definition: new sfn.Task(stack, 'Task', { + task: new tasks.InvokeFunction(fn, { + payload: { + foo: 'bar' + } + }) + }) + }); + + expect(stack).toHaveResource('AWS::StepFunctions::StateMachine', { + DefinitionString: { + "Fn::Join": ["", [ + "{\"StartAt\":\"Task\",\"States\":{\"Task\":{\"End\":true,\"Parameters\":{\"foo\":\"bar\"},\"Type\":\"Task\",\"Resource\":\"", + { "Fn::GetAtt": ["Fn9270CBC0", "Arn"] }, + "\"}}}" + ]] + }, + }); +}); + +test('Lambda function can be used in a Task with Task Token', () => { + const task = new sfn.Task(stack, 'Task', { + task: new tasks.RunLambdaTask(fn, { + waitForTaskToken: true, + payload: { + token: sfn.Context.taskToken + } + }) + }); + new sfn.StateMachine(stack, 'SM', { + definition: task + }); + + // THEN + expect(stack).toHaveResource('AWS::StepFunctions::StateMachine', { + DefinitionString: { + "Fn::Join": ["", [ + "{\"StartAt\":\"Task\",\"States\":{\"Task\":{\"End\":true,\"Parameters\":{\"FunctionName\":\"", + { Ref: "Fn9270CBC0" }, + "\",\"Payload\":{\"token\":\"$$.Task.Token\"}},\"Type\":\"Task\",\"Resource\":\"arn:aws:states:::lambda:invoke.waitForTaskToken\"}}}" + ]] + }, + }); +}); + +test('Task throws if waitForTaskToken is supplied but task token is not included', () => { + expect(() => { + new sfn.Task(stack, 'Task', { + task: new tasks.RunLambdaTask(fn, { + waitForTaskToken: true + }) + }); + }).toThrow(/Task Token is missing in payload/i); }); \ No newline at end of file diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/test/my-lambda-handler/index.py b/packages/@aws-cdk/aws-stepfunctions-tasks/test/my-lambda-handler/index.py new file mode 100644 index 0000000000000..179dcbbb27423 --- /dev/null +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/test/my-lambda-handler/index.py @@ -0,0 +1,4 @@ +def main(event, context): + return { + 'message': 'Hello, world!' + } \ No newline at end of file