Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(aws-stepfunctions-tasks): allow specifying waitForTaskToken suffix in resourceArn #2686

Merged
merged 9 commits into from
Jun 18, 2019
49 changes: 25 additions & 24 deletions packages/@aws-cdk/aws-stepfunctions-tasks/lib/invoke-function.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,13 @@ export interface InvokeFunctionProps {
/**
* Whether to pause the workflow until a task token is returned
*
* @default false
*/
readonly waitForTaskToken?: boolean;

/**
* Whether to invoke lambda via integrated service ARN "arn:aws:states:::lambda:invoke"
* or via Function ARN.
* If this is set to true, the Context.taskToken value must be included
* somewhere in the payload and the Lambda must call SendTaskSuccess
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SendTaskSuccess, SendTaskFailure or SendTaskHeartbeat

* using that token.
*
* @default false
*/
readonly invokeAsIntegratedService?: boolean;
readonly waitForTaskToken?: boolean;
}

/**
Expand All @@ -35,27 +31,30 @@ export interface InvokeFunctionProps {
* integration with other AWS services via a specific class instance.
*/
export class InvokeFunction implements sfn.IStepFunctionsTask {

private readonly waitForTaskToken: boolean;
private readonly invokeAsIntegratedService: boolean;

constructor(private readonly lambdaFunction: lambda.IFunction, private readonly props: InvokeFunctionProps = {}) {
this.waitForTaskToken = props.waitForTaskToken === true;

// Invoke function as integrated service if flag is in props, or if waitForTaskToken property is true
this.invokeAsIntegratedService = props.invokeAsIntegratedService === true || this.waitForTaskToken;
this.waitForTaskToken = !!props.waitForTaskToken;

if (this.waitForTaskToken && !FieldUtils.containsTaskToken(props.payload)) {
throw new Error('Task Token is missing in payload');
throw new Error('Task Token is missing in payload (pass Context.taskToken somewhere in payload)');
}
}

public bind(_task: sfn.Task): sfn.StepFunctionsTaskConfig {
const resourceArn = this.invokeAsIntegratedService
? 'arn:aws:states:::lambda:invoke' + this.waitForTaskToken ? '.waitForTaskToken' : ''
const resourceArn = this.waitForTaskToken
? 'arn:aws:states:::lambda:invoke.waitForTaskToken'
: this.lambdaFunction.functionArn;

const includeParameters = this.invokeAsIntegratedService || this.props.payload;
let parameters: any;
if (this.waitForTaskToken) {
parameters = {
FunctionName: this.lambdaFunction.functionName,
Payload: nonEmptyObject(this.props.payload),
};
} else {
parameters = this.props.payload;
}

return {
resourceArn,
Expand All @@ -66,12 +65,14 @@ export class InvokeFunction implements sfn.IStepFunctionsTask {
metricPrefixSingular: 'LambdaFunction',
metricPrefixPlural: 'LambdaFunctions',
metricDimensions: { LambdaFunctionArn: this.lambdaFunction.functionArn },
...includeParameters && {
parameters: {
...this.invokeAsIntegratedService && { FunctionName: this.lambdaFunction.functionName },
...this.props.payload && { Payload: this.props.payload },
}
}
parameters: nonEmptyObject(parameters)
};
}
}

function nonEmptyObject(x: any): any {
if (typeof x === 'object' && x !== null && Object.entries(x).length === 0) {
return undefined;
}
return x;
}
Original file line number Diff line number Diff line change
Expand Up @@ -268,14 +268,11 @@
"Arn"
]
},
"\"},\"Invoke Handler with task token\":{\"Next\":\"Job Complete?\",\"InputPath\":\"$.guid\",\"Type\":\"Task\",\"Resource\":\"",
"\"},\"Invoke Handler with task token\":{\"Next\":\"Job Complete?\",\"InputPath\":\"$.guid\",\"Parameters\":{\"FunctionName\":\"",
{
"Fn::GetAtt": [
"CallbackHandler4434C38D",
"Arn"
]
"Ref": "CallbackHandler4434C38D"
},
"\",\"ResultPath\":\"$.status\"},\"Job Complete?\":{\"Type\":\"Choice\",\"Choices\":[{\"Variable\":\"$.status\",\"StringEquals\":\"FAILED\",\"Next\":\"Job Failed\"},{\"Variable\":\"$.status\",\"StringEquals\":\"SUCCEEDED\",\"Next\":\"Final step\"}]},\"Job Failed\":{\"Type\":\"Fail\",\"Error\":\"DescribeJob returned FAILED\",\"Cause\":\"AWS Batch Job Failed\"},\"Final step\":{\"Type\":\"Pass\",\"End\":true}},\"TimeoutSeconds\":30}"
"\",\"Payload\":{\"token\":\"$$.Task.Token\"}},\"Type\":\"Task\",\"Resource\":\"arn:aws:states:::lambda:invoke.waitForTaskToken\",\"ResultPath\":\"$.status\"},\"Job Complete?\":{\"Type\":\"Choice\",\"Choices\":[{\"Variable\":\"$.status\",\"StringEquals\":\"FAILED\",\"Next\":\"Job Failed\"},{\"Variable\":\"$.status\",\"StringEquals\":\"SUCCEEDED\",\"Next\":\"Final step\"}]},\"Job Failed\":{\"Type\":\"Fail\",\"Error\":\"DescribeJob returned FAILED\",\"Cause\":\"AWS Batch Job Failed\"},\"Final step\":{\"Type\":\"Pass\",\"End\":true}},\"TimeoutSeconds\":30}"
]
]
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ const taskTokenHandler = new sfn.Task(stack, 'Invoke Handler with task token', {
task: new tasks.InvokeFunction(callbackHandler, {
waitForTaskToken: true,
payload: {
"token.$": "$$.Task.Token"
token: sfn.Context.taskToken
}
}),
inputPath: '$.guid',
Expand Down
109 changes: 33 additions & 76 deletions packages/@aws-cdk/aws-stepfunctions-tasks/test/invoke-function.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,19 @@ import sfn = require('@aws-cdk/aws-stepfunctions');
import { Stack } from '@aws-cdk/cdk';
import tasks = require('../lib');

test('Lambda function can be used in a Task', () => {
// GIVEN
const stack = new Stack();

// WHEN
const fn = new lambda.Function(stack, 'Fn', {
let stack: Stack;
let fn: lambda.Function;
beforeEach(() => {
stack = new Stack();
fn = new lambda.Function(stack, 'Fn', {
code: lambda.Code.inline('hello'),
handler: 'index.hello',
runtime: lambda.Runtime.Python27,
});
});

test('Lambda function can be used in a Task', () => {
// WHEN
const task = new sfn.Task(stack, 'Task', { task: new tasks.InvokeFunction(fn) });
new sfn.StateMachine(stack, 'SM', {
definition: task
Expand All @@ -31,90 +34,34 @@ test('Lambda function can be used in a Task', () => {
});
});

test('Lambda function can be used in a Task with Task Token', () => {
// GIVEN
const stack = new Stack();

// WHEN
const fn = new lambda.Function(stack, 'Fn', {
code: lambda.Code.inline('hello'),
handler: 'index.hello',
runtime: lambda.Runtime.Python27,
});
const task = new sfn.Task(stack, 'Task', {
task: new tasks.InvokeFunction(fn, {
waitForTaskToken: true,
payload: {
"token.$": "$$.Task.Token"
}
})
});
test('Lambda function payload ends up in Parameters', () => {
new sfn.StateMachine(stack, 'SM', {
definition: task
definition: new sfn.Task(stack, 'Task', {
task: new tasks.InvokeFunction(fn, {
payload: {
foo: 'bar'
}
})
})
});

// THEN
expect(stack).toHaveResource('AWS::StepFunctions::StateMachine', {
DefinitionString: {
"Fn::Join": ["", [
"{\"StartAt\":\"Task\",\"States\":{\"Task\":{\"End\":true,\"Type\":\"Task\",\"Resource\":\"",
"{\"StartAt\":\"Task\",\"States\":{\"Task\":{\"End\":true,\"Parameters\":{\"foo\":\"bar\"},\"Type\":\"Task\",\"Resource\":\"",
{ "Fn::GetAtt": ["Fn9270CBC0", "Arn"] },
"\"}}}"
]]
},
});
});

test('Lambda function can be used in a Task with integrated service ARN', () => {
// GIVEN
const stack = new Stack();

// WHEN
const fn = new lambda.Function(stack, 'Fn', {
code: lambda.Code.inline('hello'),
handler: 'index.hello',
runtime: lambda.Runtime.Python27,
});
const task = new sfn.Task(stack, 'Task', {
task: new tasks.InvokeFunction(fn, { invokeAsIntegratedService: true })
});
new sfn.StateMachine(stack, 'SM', {
definition: task
});

// THEN
expect(stack).toHaveResource('AWS::StepFunctions::StateMachine', {
DefinitionString: {
"Fn::Join": [
"",
[
"{\"StartAt\":\"Task\",\"States\":{\"Task\":{\"End\":true,\"Parameters\":{\"FunctionName\":\"",
{
Ref: "Fn9270CBC0"
},
"\"},\"Type\":\"Task\",\"Resource\":\".waitForTaskToken\"}}}"
]
]
},
});
});

test('Lambda function can be used in a Task with integrated service ARN and with Task Token', () => {
// GIVEN
const stack = new Stack();

// WHEN
const fn = new lambda.Function(stack, 'Fn', {
code: lambda.Code.inline('hello'),
handler: 'index.hello',
runtime: lambda.Runtime.Python27,
});
test('Lambda function can be used in a Task with Task Token', () => {
const task = new sfn.Task(stack, 'Task', {
task: new tasks.InvokeFunction(fn, {
invokeAsIntegratedService: true,
waitForTaskToken: true,
payload: {
"token.$": "$$.Task.Token"
token: sfn.Context.taskToken
}
})
});
Expand All @@ -126,10 +73,20 @@ test('Lambda function can be used in a Task with integrated service ARN and with
expect(stack).toHaveResource('AWS::StepFunctions::StateMachine', {
DefinitionString: {
"Fn::Join": ["", [
"{\"StartAt\":\"Task\",\"States\":{\"Task\":{\"End\":true,\"Type\":\"Task\",\"Resource\":\"",
{ "Fn::GetAtt": ["Fn9270CBC0", "Arn"] },
"\"}}}"
"{\"StartAt\":\"Task\",\"States\":{\"Task\":{\"End\":true,\"Parameters\":{\"FunctionName\":\"",
{ Ref: "Fn9270CBC0" },
"\",\"Payload\":{\"token\":\"$$.Task.Token\"}},\"Type\":\"Task\",\"Resource\":\"arn:aws:states:::lambda:invoke.waitForTaskToken\"}}}"
]]
},
});
});

test('Task throws if waitForTaskToken is supplied but task token is not included', () => {
expect(() => {
new sfn.Task(stack, 'Task', {
task: new tasks.InvokeFunction(fn, {
waitForTaskToken: true
})
});
}).toThrow(/Task Token is missing in payload/i);
});
2 changes: 1 addition & 1 deletion packages/decdk/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -155,4 +155,4 @@
"engines": {
"node": ">= 8.10.0"
}
}
}