Skip to content

Commit d0c99d8

Browse files
authored
feat(pipes-targets): add Kinesis (#30656)
Add Kinesis data stream as a Pipes target. It's nontrivial to get the data from the Kinesis data stream, but here are screenshots showing data made it through during the integration test. <img width="656" alt="Screenshot 2024-06-24 at 7 24 10 PM" src="https://github.com/aws/aws-cdk/assets/3310356/bc6e12a2-8fea-42a7-baaa-e8b5b5ea652f"> <img width="649" alt="Screenshot 2024-06-24 at 7 26 35 PM" src="https://github.com/aws/aws-cdk/assets/3310356/5224b0d9-a356-47e6-ab48-3551ff3b5078">
1 parent 8fc6ec9 commit d0c99d8

16 files changed

+33157
-0
lines changed

packages/@aws-cdk/aws-pipes-targets-alpha/README.md

+36
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ The following targets are supported:
3333
2. `targets.SfnStateMachine`: [Invoke a State Machine from an event source](#aws-step-functions-state-machine)
3434
3. `targets.LambdaFunction`: [Send event source to a Lambda Function](#aws-lambda-function)
3535
4. `targets.ApiDestinationTarget`: [Send event source to an EventBridge API Destination](#amazon-eventbridge-api-destination)
36+
5. `targets.KinesisTarget`: [Send event source to a Kinesis data stream](#amazon-kinesis-data-stream)
3637

3738
### Amazon SQS
3839

@@ -205,3 +206,38 @@ const pipe = new pipes.Pipe(this, 'Pipe', {
205206
target: apiTarget,
206207
});
207208
```
209+
210+
### Amazon Kinesis Data Stream
211+
212+
A data stream can be used as a target for a pipe. The data stream will receive the (enriched/filtered) source payload.
213+
214+
```ts
215+
declare const sourceQueue: sqs.Queue;
216+
declare const targetStream: kinesis.Stream;
217+
218+
const streamTarget = new targets.KinesisTarget(targetStream, {
219+
partitionKey: 'pk',
220+
});
221+
222+
const pipe = new pipes.Pipe(this, 'Pipe', {
223+
source: new SqsSource(sourceQueue),
224+
target: streamTarget,
225+
});
226+
```
227+
228+
The input to the target data stream can be transformed:
229+
230+
```ts
231+
declare const sourceQueue: sqs.Queue;
232+
declare const targetStream: kinesis.Stream;
233+
234+
const streamTarget = new targets.KinesisTarget(targetStream, {
235+
partitionKey: 'pk',
236+
inputTransformation: pipes.InputTransformation.fromObject({ body: "👀" }),
237+
});
238+
239+
const pipe = new pipes.Pipe(this, 'Pipe', {
240+
source: new SqsSource(sourceQueue),
241+
target: streamTarget,
242+
});
243+
```
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
export * from './api-destination';
2+
export * from './kinesis';
23
export * from './lambda';
34
export * from './sqs';
45
export * from './stepfunctions';
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
import { IInputTransformation, IPipe, ITarget, TargetConfig } from '@aws-cdk/aws-pipes-alpha';
2+
import { Token } from 'aws-cdk-lib';
3+
import { IRole } from 'aws-cdk-lib/aws-iam';
4+
import { IStream } from 'aws-cdk-lib/aws-kinesis';
5+
6+
/**
7+
* Kinesis target properties.
8+
*/
9+
export interface KinesisTargetParameters {
10+
/**
11+
* The input transformation to apply to the message before sending it to the target.
12+
*
13+
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipetargetparameters.html#cfn-pipes-pipe-pipetargetparameters-inputtemplate
14+
* @default - none
15+
*/
16+
readonly inputTransformation?: IInputTransformation;
17+
18+
/**
19+
* Determines which shard in the stream the data record is assigned to.
20+
*
21+
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipetargetkinesisstreamparameters.html#cfn-pipes-pipe-pipetargetkinesisstreamparameters-partitionkey
22+
*/
23+
readonly partitionKey: string;
24+
}
25+
26+
/**
27+
* An EventBridge Pipes target that sends messages to a Kinesis stream.
28+
*/
29+
export class KinesisTarget implements ITarget {
30+
private stream: IStream;
31+
private streamParameters: KinesisTargetParameters;
32+
public readonly targetArn: string;
33+
34+
constructor(stream: IStream, parameters: KinesisTargetParameters) {
35+
this.stream = stream;
36+
this.targetArn = stream.streamArn;
37+
this.streamParameters = parameters;
38+
39+
validatePartitionKey(parameters.partitionKey);
40+
}
41+
42+
grantPush(grantee: IRole): void {
43+
this.stream.grantWrite(grantee);
44+
}
45+
46+
bind(pipe: IPipe): TargetConfig {
47+
return {
48+
targetParameters: {
49+
inputTemplate: this.streamParameters.inputTransformation?.bind(pipe).inputTemplate,
50+
kinesisStreamParameters: this.streamParameters,
51+
},
52+
};
53+
}
54+
}
55+
56+
function validatePartitionKey(pk: string) {
57+
if (!Token.isUnresolved(pk) && pk.length > 256) {
58+
throw new Error(`Partition key must be less than or equal to 256 characters, received ${pk.length}`);
59+
}
60+
}

packages/@aws-cdk/aws-pipes-targets-alpha/rosetta/default.ts-fixture

+1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// Fixture with packages imported, but nothing else
22
import * as cdk from 'aws-cdk-lib';
33
import * as events from 'aws-cdk-lib/aws-events';
4+
import * as kinesis from 'aws-cdk-lib/aws-kinesis';
45
import * as sqs from 'aws-cdk-lib/aws-sqs';
56
import * as sfn from 'aws-cdk-lib/aws-stepfunctions';
67
import * as lambda from 'aws-cdk-lib/aws-lambda';
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
// Jest Snapshot v1, https://goo.gl/fbAQLP
2+
3+
exports[`Kinesis should grant pipe role push access 1`] = `
4+
{
5+
"MyPipeRoleCBC8E9AB": {
6+
"Properties": {
7+
"AssumeRolePolicyDocument": {
8+
"Statement": [
9+
{
10+
"Action": "sts:AssumeRole",
11+
"Effect": "Allow",
12+
"Principal": {
13+
"Service": "pipes.amazonaws.com",
14+
},
15+
},
16+
],
17+
"Version": "2012-10-17",
18+
},
19+
},
20+
"Type": "AWS::IAM::Role",
21+
},
22+
}
23+
`;
24+
25+
exports[`Kinesis should grant pipe role push access 2`] = `
26+
{
27+
"MyPipeRoleDefaultPolicy31387C20": {
28+
"Properties": {
29+
"PolicyDocument": {
30+
"Statement": [
31+
{
32+
"Action": [
33+
"kinesis:ListShards",
34+
"kinesis:PutRecord",
35+
"kinesis:PutRecords",
36+
],
37+
"Effect": "Allow",
38+
"Resource": {
39+
"Fn::GetAtt": [
40+
"MyStream5C050E93",
41+
"Arn",
42+
],
43+
},
44+
},
45+
],
46+
"Version": "2012-10-17",
47+
},
48+
"PolicyName": "MyPipeRoleDefaultPolicy31387C20",
49+
"Roles": [
50+
{
51+
"Ref": "MyPipeRoleCBC8E9AB",
52+
},
53+
],
54+
},
55+
"Type": "AWS::IAM::Policy",
56+
},
57+
}
58+
`;

0 commit comments

Comments
 (0)