Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion packages/@aws-cdk/aws-lambda-event-sources/lib/dynamodb.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ export interface DynamoEventSourceProps extends StreamEventSourceProps {
* Use an Amazon DynamoDB stream as an event source for AWS Lambda.
*/
export class DynamoEventSource extends StreamEventSource {
private _eventSourceMappingId: string | undefined = undefined;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use ? to declare optional types. Same in other places.

Suggested change
private _eventSourceMappingId: string | undefined = undefined;
private _eventSourceMappingId?: string;


constructor(private readonly table: dynamodb.Table, props: DynamoEventSourceProps) {
super(props);

Expand All @@ -22,10 +24,18 @@ export class DynamoEventSource extends StreamEventSource {
throw new Error(`DynamoDB Streams must be enabled on the table ${this.table.node.path}`);
}

target.addEventSourceMapping(`DynamoDBEventSource:${this.table.node.uniqueId}`,
const eventSourceMapping = target.addEventSourceMapping(`DynamoDBEventSource:${this.table.node.uniqueId}`,
this.enrichMappingOptions({eventSourceArn: this.table.tableStreamArn})
);
this._eventSourceMappingId = eventSourceMapping.eventSourceMappingId;

this.table.grantStreamRead(target);
}

/**
* The Ref of the EventSourceMapping
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update documentation per the other comment in the review

*/
public get eventSourceMappingId(): string | undefined {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably define this as a property on IEventSource so that all future event sources also implement.

Copy link
Contributor Author

@eliasdraexler eliasdraexler Jan 28, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about it, but there are also classes like SnsEventSource & S3EventSource & ApiEventSource that implement the IEventSource interface and do not have an eventSourceMappingId.

Should I still add it to the interface?

return this._eventSourceMappingId;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest that we throw an Error here instead of returning undefined, with the message 'DynamoEventSource is not yet bound to an event source mapping'.

}
12 changes: 11 additions & 1 deletion packages/@aws-cdk/aws-lambda-event-sources/lib/kinesis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ export interface KinesisEventSourceProps extends StreamEventSourceProps {
* Use an Amazon Kinesis stream as an event source for AWS Lambda.
*/
export class KinesisEventSource extends StreamEventSource {
private _eventSourceMappingId: string | undefined = undefined;

constructor(readonly stream: kinesis.IStream, props: KinesisEventSourceProps) {
super(props);

Expand All @@ -18,10 +20,18 @@ export class KinesisEventSource extends StreamEventSource {
}

public bind(target: lambda.IFunction) {
target.addEventSourceMapping(`KinesisEventSource:${this.stream.node.uniqueId}`,
const eventSourceMapping = target.addEventSourceMapping(`KinesisEventSource:${this.stream.node.uniqueId}`,
this.enrichMappingOptions({eventSourceArn: this.stream.streamArn})
);
this._eventSourceMappingId = eventSourceMapping.eventSourceMappingId;

this.stream.grantRead(target);
}

/**
* The Ref of the EventSourceMapping
*/
public get eventSourceMappingId(): string | undefined {
return this._eventSourceMappingId;
}
}
12 changes: 11 additions & 1 deletion packages/@aws-cdk/aws-lambda-event-sources/lib/sqs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,28 @@ export interface SqsEventSourceProps {
* Use an Amazon SQS queue as an event source for AWS Lambda.
*/
export class SqsEventSource implements lambda.IEventSource {
private _eventSourceMappingId: string | undefined = undefined;

constructor(readonly queue: sqs.IQueue, private readonly props: SqsEventSourceProps = { }) {
if (this.props.batchSize !== undefined && (this.props.batchSize < 1 || this.props.batchSize > 10)) {
throw new Error(`Maximum batch size must be between 1 and 10 inclusive (given ${this.props.batchSize})`);
}
}

public bind(target: lambda.IFunction) {
target.addEventSourceMapping(`SqsEventSource:${this.queue.node.uniqueId}`, {
const eventSourceMapping = target.addEventSourceMapping(`SqsEventSource:${this.queue.node.uniqueId}`, {
batchSize: this.props.batchSize,
eventSourceArn: this.queue.queueArn,
});
this._eventSourceMappingId = eventSourceMapping.eventSourceMappingId;

this.queue.grantConsumeMessages(target);
}

/**
* The Ref of the EventSourceMapping
*/
public get eventSourceMappingId(): string | undefined {
return this._eventSourceMappingId;
}
}
42 changes: 42 additions & 0 deletions packages/@aws-cdk/aws-lambda-event-sources/test/test.dynamo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -231,4 +231,46 @@ export = {
test.done();
},

'contains eventSourceMappingId after lambda binding'(test: Test) {
// GIVEN
const stack = new cdk.Stack();
const fn = new TestFunction(stack, 'Fn');
const table = new dynamodb.Table(stack, 'T', {
partitionKey: {
name: 'id',
type: dynamodb.AttributeType.STRING
},
stream: dynamodb.StreamViewType.NEW_IMAGE
});
const eventSource = new sources.DynamoEventSource(table, {
startingPosition: lambda.StartingPosition.TRIM_HORIZON
});

// WHEN
fn.addEventSource(eventSource);

// THEN
test.notEqual(eventSource.eventSourceMappingId, undefined);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use test.ok().

I would even suggest that you test for equality to the event source id that gets returned here - it'll be a cloudformation intrinsic of the form 'Fn::GetAtt: { ... }'.

In case, it starts with 'Token...', use stack.resolve(eventSource.eventSourceMappingId) to resolve the token and get the CF intrinsic.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed it to test.ok but could you maybe elaborate on how I should check equality of the eventSourceId?

stack.resolve(eventSource.eventSourceMappingId) returns {Ref: '...'} where shall I get the id that should be equal to this Ref value?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You cannot. The eventual evenSourceMappingId is generated by cloudformation and our unit tests don't interact with it. You can only check for direct equality with the Ref string.

test.done();
},

'eventSourceMappingId is undefined before binding to lambda'(test: Test) {
// GIVEN
const stack = new cdk.Stack();
const table = new dynamodb.Table(stack, 'T', {
partitionKey: {
name: 'id',
type: dynamodb.AttributeType.STRING
},
stream: dynamodb.StreamViewType.NEW_IMAGE
});
const eventSource = new sources.DynamoEventSource(table, {
startingPosition: lambda.StartingPosition.TRIM_HORIZON
});

// WHEN/THEN
test.equal(eventSource.eventSourceMappingId, undefined);
test.done();
},

};
30 changes: 30 additions & 0 deletions packages/@aws-cdk/aws-lambda-event-sources/test/test.kinesis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -153,4 +153,34 @@ export = {

test.done();
},

'contains eventSourceMappingId after lambda binding'(test: Test) {
// GIVEN
const stack = new cdk.Stack();
const fn = new TestFunction(stack, 'Fn');
const stream = new kinesis.Stream(stack, 'S');
const eventSource = new sources.KinesisEventSource(stream, {
startingPosition: lambda.StartingPosition.TRIM_HORIZON
});

// WHEN
fn.addEventSource(eventSource);

// THEN
test.notEqual(eventSource.eventSourceMappingId, undefined);
test.done();
},

'eventSourceMappingId is undefined before binding to lambda'(test: Test) {
// GIVEN
const stack = new cdk.Stack();
const stream = new kinesis.Stream(stack, 'S');
const eventSource = new sources.KinesisEventSource(stream, {
startingPosition: lambda.StartingPosition.TRIM_HORIZON
});

// WHEN/THEN
test.equal(eventSource.eventSourceMappingId, undefined);
test.done();
},
};
26 changes: 26 additions & 0 deletions packages/@aws-cdk/aws-lambda-event-sources/test/test.sqs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,4 +112,30 @@ export = {

test.done();
},

'contains eventSourceMappingId after lambda binding'(test: Test) {
// GIVEN
const stack = new cdk.Stack();
const fn = new TestFunction(stack, 'Fn');
const q = new sqs.Queue(stack, 'Q');
const eventSource = new sources.SqsEventSource(q);

// WHEN
fn.addEventSource(eventSource);

// THEN
test.notEqual(eventSource.eventSourceMappingId, undefined);
test.done();
},

'eventSourceMappingId is undefined before binding to lambda'(test: Test) {
// GIVEN
const stack = new cdk.Stack();
const q = new sqs.Queue(stack, 'Q');
const eventSource = new sources.SqsEventSource(q);

// WHEN/THEN
test.equal(eventSource.eventSourceMappingId, undefined);
test.done();
},
};
9 changes: 8 additions & 1 deletion packages/@aws-cdk/aws-lambda/lib/event-source-mapping.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,21 +67,28 @@ export interface EventSourceMappingProps extends EventSourceMappingOptions {
* modify the Lambda's execution role so it can consume messages from the queue.
*/
export class EventSourceMapping extends cdk.Resource {
/**
* The Ref of the EventSourceMapping
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* The Ref of the EventSourceMapping
* The identifier for this EventSourceMapping

* @attribute
*/
public readonly eventSourceMappingId: string;

constructor(scope: cdk.Construct, id: string, props: EventSourceMappingProps) {
super(scope, id);

if (props.maxBatchingWindow && props.maxBatchingWindow.toSeconds() > 300) {
throw new Error(`maxBatchingWindow cannot be over 300 seconds, got ${props.maxBatchingWindow.toSeconds()}`);
}

new CfnEventSourceMapping(this, 'Resource', {
const cfnEventSourceMapping = new CfnEventSourceMapping(this, 'Resource', {
batchSize: props.batchSize,
enabled: props.enabled,
eventSourceArn: props.eventSourceArn,
functionName: props.target.functionName,
startingPosition: props.startingPosition,
maximumBatchingWindowInSeconds: props.maxBatchingWindow && props.maxBatchingWindow.toSeconds(),
});
this.eventSourceMappingId = cfnEventSourceMapping.ref;
}
}

Expand Down