Skip to content

Commit 79711fe

Browse files
authored
Merge branch 'master' into huijbers/json-real-list
2 parents 403513c + 73209e1 commit 79711fe

File tree

12 files changed

+835
-3
lines changed

12 files changed

+835
-3
lines changed

packages/@aws-cdk/aws-lambda-event-sources/README.md

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,67 @@ myFunction.addEventSource(new KinesisEventSource(stream, {
207207
}));
208208
```
209209

210+
## Kafka
211+
212+
You can write Lambda functions to process data either from [Amazon MSK](https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html) or a [self managed Kafka](https://docs.aws.amazon.com/lambda/latest/dg/kafka-smaa.html) cluster.
213+
214+
The following code sets up Amazon MSK as an event source for a lambda function. Credentials will need to be configured to access the
215+
MSK cluster, as described in [Username/Password authentication](https://docs.aws.amazon.com/msk/latest/developerguide/msk-password.html).
216+
217+
```ts
218+
import * as lambda from '@aws-cdk/aws-lambda';
219+
import * as msk from '@aws-cdk/aws-lambda';
220+
import { Secret } from '@aws-cdk/aws-secretmanager';
221+
import { ManagedKafkaEventSource } from '@aws-cdk/aws-lambda-event-sources';
222+
223+
// Your MSK cluster
224+
const cluster = msk.Cluster.fromClusterArn(this, 'Cluster',
225+
'arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4');
226+
227+
// The Kafka topic you want to subscribe to
228+
const topic = 'some-cool-topic'
229+
230+
// The secret that allows access to your MSK cluster
231+
// You still have to make sure that it is associated with your cluster as described in the documentation
232+
const secret = new Secret(this, 'Secret', { secretName: 'AmazonMSK_KafkaSecret' });
233+
234+
myFunction.addEventSource(new ManagedKafkaEventSource({
235+
cluster: cluster,
236+
topic: topic,
237+
secret: secret,
238+
batchSize: 100, // default
239+
startingPosition: lambda.StartingPosition.TRIM_HORIZON
240+
}));
241+
```
242+
243+
The following code sets up a self managed Kafka cluster as an event source. Username and password based authentication
244+
will need to be set up as described in [Managing access and permissions](https://docs.aws.amazon.com/lambda/latest/dg/smaa-permissions.html#smaa-permissions-add-secret).
245+
246+
```ts
247+
import * as lambda from '@aws-cdk/aws-lambda';
248+
import { Secret } from '@aws-cdk/aws-secretmanager';
249+
import { SelfManagedKafkaEventSource } from '@aws-cdk/aws-lambda-event-sources';
250+
251+
// The list of Kafka brokers
252+
const bootstrapServers = ['kafka-broker:9092']
253+
254+
// The Kafka topic you want to subscribe to
255+
const topic = 'some-cool-topic'
256+
257+
// The secret that allows access to your self hosted Kafka cluster
258+
const secret = new Secret(this, 'Secret', { ... });
259+
260+
myFunction.addEventSource(new SelfManagedKafkaEventSource({
261+
bootstrapServers: bootstrapServers,
262+
topic: topic,
263+
secret: secret,
264+
batchSize: 100, // default
265+
startingPosition: lambda.StartingPosition.TRIM_HORIZON
266+
}));
267+
```
268+
269+
If your self managed Kafka cluster is only reachable via VPC also configure `vpc` `vpcSubnets` and `securityGroup`.
270+
210271
## Roadmap
211272

212273
Eventually, this module will support all the event sources described under

packages/@aws-cdk/aws-lambda-event-sources/lib/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
export * from './api';
22
export * from './dynamodb';
3+
export * from './kafka';
34
export * from './kinesis';
45
export * from './s3';
56
export * from './sns';
Lines changed: 194 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
import * as crypto from 'crypto';
2+
import { ISecurityGroup, IVpc, SubnetSelection } from '@aws-cdk/aws-ec2';
3+
import * as iam from '@aws-cdk/aws-iam';
4+
import * as lambda from '@aws-cdk/aws-lambda';
5+
import * as msk from '@aws-cdk/aws-msk';
6+
import * as secretsmanager from '@aws-cdk/aws-secretsmanager';
7+
import { Stack } from '@aws-cdk/core';
8+
import { StreamEventSource, StreamEventSourceProps } from './stream';
9+
10+
// keep this import separate from other imports to reduce chance for merge conflicts with v2-main
11+
// eslint-disable-next-line no-duplicate-imports, import/order
12+
import { Construct } from '@aws-cdk/core';
13+
14+
/**
15+
* Properties for a Kafka event source
16+
*/
17+
export interface KafkaEventSourceProps extends StreamEventSourceProps {
18+
/**
19+
* the Kafka topic to subscribe to
20+
*/
21+
readonly topic: string,
22+
/**
23+
* the secret with the Kafka credentials, see https://docs.aws.amazon.com/msk/latest/developerguide/msk-password.html for details
24+
*/
25+
readonly secret: secretsmanager.ISecret
26+
}
27+
28+
/**
29+
* Properties for a MSK event source
30+
*/
31+
export interface ManagedKafkaEventSourceProps extends KafkaEventSourceProps {
32+
/**
33+
* an MSK cluster construct
34+
*/
35+
readonly cluster: msk.ICluster
36+
}
37+
38+
/**
39+
* The authentication method to use with SelfManagedKafkaEventSource
40+
*/
41+
export enum AuthenticationMethod {
42+
/**
43+
* SASL_SCRAM_512_AUTH authentication method for your Kafka cluster
44+
*/
45+
SASL_SCRAM_512_AUTH = 'SASL_SCRAM_512_AUTH',
46+
/**
47+
* SASL_SCRAM_256_AUTH authentication method for your Kafka cluster
48+
*/
49+
SASL_SCRAM_256_AUTH = 'SASL_SCRAM_256_AUTH',
50+
}
51+
52+
/**
53+
* Properties for a self managed Kafka cluster event source.
54+
* If your Kafka cluster is only reachable via VPC make sure to configure it.
55+
*/
56+
export interface SelfManagedKafkaEventSourceProps extends KafkaEventSourceProps {
57+
/**
58+
* The list of host and port pairs that are the addresses of the Kafka brokers in a "bootstrap" Kafka cluster that
59+
* a Kafka client connects to initially to bootstrap itself. They are in the format `abc.xyz.com:xxxx`.
60+
*/
61+
readonly bootstrapServers: string[]
62+
63+
/**
64+
* If your Kafka brokers are only reachable via VPC provide the VPC here
65+
*
66+
* @default none
67+
*/
68+
readonly vpc?: IVpc;
69+
70+
/**
71+
* If your Kafka brokers are only reachable via VPC, provide the subnets selection here
72+
*
73+
* @default - none, required if setting vpc
74+
*/
75+
readonly vpcSubnets?: SubnetSelection,
76+
77+
/**
78+
* If your Kafka brokers are only reachable via VPC, provide the security group here
79+
*
80+
* @default - none, required if setting vpc
81+
*/
82+
readonly securityGroup?: ISecurityGroup
83+
84+
/**
85+
* The authentication method for your Kafka cluster
86+
*
87+
* @default AuthenticationMethod.SASL_SCRAM_512_AUTH
88+
*/
89+
readonly authenticationMethod?: AuthenticationMethod
90+
}
91+
92+
/**
93+
* Use a MSK cluster as a streaming source for AWS Lambda
94+
*/
95+
export class ManagedKafkaEventSource extends StreamEventSource {
96+
// This is to work around JSII inheritance problems
97+
private innerProps: ManagedKafkaEventSourceProps;
98+
99+
constructor(props: ManagedKafkaEventSourceProps) {
100+
super(props);
101+
this.innerProps = props;
102+
}
103+
104+
public bind(target: lambda.IFunction) {
105+
target.addEventSourceMapping(
106+
`KafkaEventSource:${this.innerProps.cluster.clusterArn}${this.innerProps.topic}`,
107+
this.enrichMappingOptions({
108+
eventSourceArn: this.innerProps.cluster.clusterArn,
109+
startingPosition: this.innerProps.startingPosition,
110+
// From https://docs.aws.amazon.com/msk/latest/developerguide/msk-password.html#msk-password-limitations, "Amazon MSK only supports SCRAM-SHA-512 authentication."
111+
sourceAccessConfigurations: [{ type: lambda.SourceAccessConfigurationType.SASL_SCRAM_512_AUTH, uri: this.innerProps.secret.secretArn }],
112+
kafkaTopic: this.innerProps.topic,
113+
}),
114+
);
115+
116+
this.innerProps.secret.grantRead(target);
117+
118+
target.addToRolePolicy(new iam.PolicyStatement(
119+
{
120+
actions: ['kafka:DescribeCluster', 'kafka:GetBootstrapBrokers', 'kafka:ListScramSecrets'],
121+
resources: [this.innerProps.cluster.clusterArn],
122+
},
123+
));
124+
125+
target.role?.addManagedPolicy(iam.ManagedPolicy.fromAwsManagedPolicyName('service-role/AWSLambdaMSKExecutionRole'));
126+
}
127+
}
128+
129+
/**
130+
* Use a self hosted Kafka installation as a streaming source for AWS Lambda.
131+
*/
132+
export class SelfManagedKafkaEventSource extends StreamEventSource {
133+
// This is to work around JSII inheritance problems
134+
private innerProps: SelfManagedKafkaEventSourceProps;
135+
136+
constructor(props: SelfManagedKafkaEventSourceProps) {
137+
super(props);
138+
if (props.vpc) {
139+
if (!props.securityGroup) {
140+
throw new Error('securityGroup must be set when providing vpc');
141+
}
142+
if (!props.vpcSubnets) {
143+
throw new Error('vpcSubnets must be set when providing vpc');
144+
}
145+
}
146+
this.innerProps = props;
147+
}
148+
149+
public bind(target: lambda.IFunction) {
150+
if (!Construct.isConstruct(target)) { throw new Error('Function is not a construct. Unexpected error.'); }
151+
target.addEventSourceMapping(
152+
this.mappingId(target),
153+
this.enrichMappingOptions({
154+
kafkaBootstrapServers: this.innerProps.bootstrapServers,
155+
kafkaTopic: this.innerProps.topic,
156+
startingPosition: this.innerProps.startingPosition,
157+
sourceAccessConfigurations: this.sourceAccessConfigurations(),
158+
}),
159+
);
160+
this.innerProps.secret.grantRead(target);
161+
}
162+
163+
private mappingId(target: lambda.IFunction) {
164+
let hash = crypto.createHash('md5');
165+
hash.update(JSON.stringify(Stack.of(target).resolve(this.innerProps.bootstrapServers)));
166+
const idHash = hash.digest('hex');
167+
return `KafkaEventSource:${idHash}:${this.innerProps.topic}`;
168+
}
169+
170+
private sourceAccessConfigurations() {
171+
let authType;
172+
switch (this.innerProps.authenticationMethod) {
173+
case AuthenticationMethod.SASL_SCRAM_256_AUTH:
174+
authType = lambda.SourceAccessConfigurationType.SASL_SCRAM_256_AUTH;
175+
break;
176+
case AuthenticationMethod.SASL_SCRAM_512_AUTH:
177+
default:
178+
authType = lambda.SourceAccessConfigurationType.SASL_SCRAM_512_AUTH;
179+
break;
180+
}
181+
let sourceAccessConfigurations = [{ type: authType, uri: this.innerProps.secret.secretArn }];
182+
if (this.innerProps.vpcSubnets !== undefined && this.innerProps.securityGroup !== undefined) {
183+
sourceAccessConfigurations.push({
184+
type: lambda.SourceAccessConfigurationType.VPC_SECURITY_GROUP,
185+
uri: this.innerProps.securityGroup.securityGroupId,
186+
},
187+
);
188+
this.innerProps.vpc?.selectSubnets(this.innerProps.vpcSubnets).subnetIds.forEach((id) => {
189+
sourceAccessConfigurations.push({ type: lambda.SourceAccessConfigurationType.VPC_SUBNET, uri: id });
190+
});
191+
}
192+
return sourceAccessConfigurations;
193+
}
194+
}

packages/@aws-cdk/aws-lambda-event-sources/lib/stream.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import { Duration } from '@aws-cdk/core';
33

44
/**
55
* The set of properties for event sources that follow the streaming model,
6-
* such as, Dynamo and Kinesis.
6+
* such as, Dynamo, Kinesis and Kafka.
77
*/
88
export interface StreamEventSourceProps {
99
/**

packages/@aws-cdk/aws-lambda-event-sources/package.json

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,12 +72,15 @@
7272
"dependencies": {
7373
"@aws-cdk/aws-apigateway": "0.0.0",
7474
"@aws-cdk/aws-dynamodb": "0.0.0",
75+
"@aws-cdk/aws-ec2": "0.0.0",
7576
"@aws-cdk/aws-events": "0.0.0",
7677
"@aws-cdk/aws-iam": "0.0.0",
7778
"@aws-cdk/aws-kinesis": "0.0.0",
7879
"@aws-cdk/aws-lambda": "0.0.0",
80+
"@aws-cdk/aws-msk": "0.0.0",
7981
"@aws-cdk/aws-s3": "0.0.0",
8082
"@aws-cdk/aws-s3-notifications": "0.0.0",
83+
"@aws-cdk/aws-secretsmanager": "0.0.0",
8184
"@aws-cdk/aws-sns": "0.0.0",
8285
"@aws-cdk/aws-sns-subscriptions": "0.0.0",
8386
"@aws-cdk/aws-sqs": "0.0.0",
@@ -88,12 +91,15 @@
8891
"peerDependencies": {
8992
"@aws-cdk/aws-apigateway": "0.0.0",
9093
"@aws-cdk/aws-dynamodb": "0.0.0",
94+
"@aws-cdk/aws-ec2": "0.0.0",
9195
"@aws-cdk/aws-events": "0.0.0",
9296
"@aws-cdk/aws-iam": "0.0.0",
9397
"@aws-cdk/aws-kinesis": "0.0.0",
9498
"@aws-cdk/aws-lambda": "0.0.0",
99+
"@aws-cdk/aws-msk": "0.0.0",
95100
"@aws-cdk/aws-s3": "0.0.0",
96101
"@aws-cdk/aws-s3-notifications": "0.0.0",
102+
"@aws-cdk/aws-secretsmanager": "0.0.0",
97103
"@aws-cdk/aws-sns": "0.0.0",
98104
"@aws-cdk/aws-sns-subscriptions": "0.0.0",
99105
"@aws-cdk/aws-sqs": "0.0.0",

0 commit comments

Comments
 (0)