Skip to content
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions packages/@aws-cdk/aws-msk-alpha/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,34 @@ const cluster = new msk.Cluster(this, 'cluster', {
});
```

## MSK Express Brokers

You can create an MSK cluster with Express Brokers by setting the `express` property to `true`. Express Brokers are a low-cost option for development, testing, and workloads that don't require the high availability guarantees of standard MSK cluster.
For more information, see [Amazon MSK Express Brokers](https://docs.aws.amazon.com/msk/latest/developerguide/msk-broker-types-express.html).

**Note:** When using Express Brokers, the following constraints apply:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: It would also be good to include information about the supported broker sizes

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


- You must specify the `instanceType`
- The VPC must have at least 3 subnets (across 3 AZs)
- `ebsStorageInfo` is not supported
- `storageMode` is not supported
- `logging` is not supported

```ts
declare const vpc: ec2.Vpc;

const expressCluster = new msk.Cluster(this, 'ExpressCluster', {
clusterName: 'MyExpressCluster',
kafkaVersion: msk.KafkaVersion.V3_8_X,
vpc,
express: true,
instanceType: ec2.InstanceType.of(
ec2.InstanceClass.M7G,
ec2.InstanceSize.XLARGE,
),
});
```

## MSK Serverless

You can also use MSK Serverless by using `ServerlessCluster` class.
Expand Down
40 changes: 34 additions & 6 deletions packages/@aws-cdk/aws-msk-alpha/lib/cluster.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,15 @@ export interface ClusterProps {
*/
readonly instanceType?: ec2.InstanceType;

/**
* Whether to use an Express Broker.
* When set to true, the cluster will be created with Express Brokers.
* When this is set to true, instanceType must also be specified.
*
* @default false
*/
readonly express?: boolean;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we switch from using boolean to enum instead readonly brokerType?: BrokerType; keeping it more future proof ?

enum BrokerType {
   STANDARD = 'STANDARD', 
   EXPRESS = 'EXPRESS' 
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks.
Added Enum.


/**
* The AWS security groups to associate with the elastic network interfaces in order to specify who can
* connect to and communicate with the Amazon MSK cluster.
Expand Down Expand Up @@ -502,8 +511,26 @@ export class Cluster extends ClusterBase {
throw new core.ValidationError('EBS volume size should be in the range 1-16384', this);
}

if (props.express) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MSK ExpressBroker is supported only on Apache Kafka 3.6 and 3.8; Can we add that validation please.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a validation.

if (!props.instanceType) {
throw new core.ValidationError('`instanceType` must also be specified when `express` is true.', this);
}
if (props.ebsStorageInfo) {
throw new core.ValidationError('`ebsStorageInfo` is not supported when `express` is true.', this);
}
if (props.storageMode) {
throw new core.ValidationError('`storageMode` is not supported when `express` is true.', this);
}
if (props.logging) {
throw new core.ValidationError('`logging` is not supported when `express` is true.', this);
}
if (subnetSelection.subnets.length < 3) {
throw new core.ValidationError(`Express cluster requires at least 3 subnets, got ${subnetSelection.subnets.length}`, this);
}
}

const instanceType = props.instanceType
? this.mskInstanceType(props.instanceType)
? this.mskInstanceType(props.instanceType, props.express)
: this.mskInstanceType(
ec2.InstanceType.of(ec2.InstanceClass.M5, ec2.InstanceSize.LARGE),
);
Expand Down Expand Up @@ -598,7 +625,7 @@ export class Cluster extends ClusterBase {
},
}));
}
const loggingInfo = {
const loggingInfo = props.express ? undefined : {
brokerLogs: {
cloudWatchLogs: {
enabled:
Expand Down Expand Up @@ -678,7 +705,7 @@ export class Cluster extends ClusterBase {
securityGroups: this.connections.securityGroups.map(
(group) => group.securityGroupId,
),
storageInfo: {
storageInfo: props.express ? undefined : {
ebsStorageInfo: {
volumeSize: volumeSize,
},
Expand All @@ -691,7 +718,7 @@ export class Cluster extends ClusterBase {
configurationInfo: props.configurationInfo,
enhancedMonitoring: props.monitoring?.clusterMonitoringLevel,
openMonitoring: openMonitoring,
storageMode: props.storageMode,
storageMode: props.express ? undefined : props.storageMode,
loggingInfo: loggingInfo,
clientAuthentication,
});
Expand All @@ -706,8 +733,9 @@ export class Cluster extends ClusterBase {
});
}

private mskInstanceType(instanceType: ec2.InstanceType): string {
return `kafka.${instanceType.toString()}`;
private mskInstanceType(instanceType: ec2.InstanceType, express?:boolean): string {
const prefix = express ? 'express.' : 'kafka.';
return `${prefix}${instanceType.toString()}`;
}

/**
Expand Down
113 changes: 113 additions & 0 deletions packages/@aws-cdk/aws-msk-alpha/test/cluster.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -980,4 +980,117 @@ describe('MSK Cluster', () => {
});
});
});

describe('with express broker ', () => {
let expressStack: core.Stack;
let expressVpc: ec2.Vpc;

beforeEach(() => {
const app = new core.App();
expressStack = new core.Stack(app, 'ExpressTestStack', {
env: {
region: 'us-east-1',
account: '123456789012',
},
});
expressVpc = new ec2.Vpc(expressStack, 'ExpressVpc', {
maxAzs: 3,
});
});

test('create a cluster with express broker', () => {
new msk.Cluster(expressStack, 'ExpressCluster', {
clusterName: 'express-cluster',
kafkaVersion: msk.KafkaVersion.V3_8_X,
vpc: expressVpc,
instanceType: ec2.InstanceType.of(
ec2.InstanceClass.M7G,
ec2.InstanceSize.XLARGE,
),
express: true,
});

Template.fromStack(expressStack).hasResourceProperties('AWS::MSK::Cluster', {
BrokerNodeGroupInfo: { InstanceType: 'express.m7g.xlarge' },
});
});

test('fails when instanceType is not specified', () => {
expect(() => {
new msk.Cluster(expressStack, 'ExpressClusterNoInstanceType', {
clusterName: 'express-cluster',
kafkaVersion: msk.KafkaVersion.V3_8_X,
vpc: expressVpc,
express: true,
});
}).toThrow(
'`instanceType` must also be specified when `express` is true.',
);
});

test('fails when ebsStorageInfo is specified', () => {
expect(() => {
new msk.Cluster(expressStack, 'ExpressClusterWithStorage', {
clusterName: 'express-cluster',
kafkaVersion: msk.KafkaVersion.V3_8_X,
vpc: expressVpc,
instanceType: ec2.InstanceType.of(
ec2.InstanceClass.M7G,
ec2.InstanceSize.XLARGE,
),
express: true,
ebsStorageInfo: { volumeSize: 100 },
});
}).toThrow('`ebsStorageInfo` is not supported when `express` is true.');
});

test('fails when express is true and storageMode is specified', () => {
expect(() => {
new msk.Cluster(expressStack, 'ExpressClusterWithStorageMode', {
clusterName: 'express-cluster',
kafkaVersion: msk.KafkaVersion.V3_8_X,
vpc: expressVpc,
instanceType: ec2.InstanceType.of(
ec2.InstanceClass.M7G,
ec2.InstanceSize.XLARGE,
),
express: true,
storageMode: msk.StorageMode.LOCAL,
});
}).toThrow('`storageMode` is not supported when `express` is true.');
});

test('fails when express is true and logging is specified', () => {
expect(() => {
new msk.Cluster(expressStack, 'ExpressClusterWithLogging', {
clusterName: 'express-cluster',
kafkaVersion: msk.KafkaVersion.V3_8_X,
vpc: expressVpc,
instanceType: ec2.InstanceType.of(
ec2.InstanceClass.M7G,
ec2.InstanceSize.XLARGE,
),
express: true,
logging: {
cloudwatchLogGroup: new logs.LogGroup(expressStack, 'LogGroup'),
},
});
}).toThrow('`logging` is not supported when `express` is true.');
});

test('fails when express is true and less than 3 subnets are provided', () => {
expect(() => {
new msk.Cluster(stack, 'ExpressClusterWithInsufficientSubnets', {
clusterName: 'express-cluster',
kafkaVersion: msk.KafkaVersion.V3_8_X,
vpc,
instanceType: ec2.InstanceType.of(
ec2.InstanceClass.M7G,
ec2.InstanceSize.XLARGE,
),
express: true,
});
}).toThrow('Express cluster requires at least 3 subnets, got 2');
});
});
});

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Large diffs are not rendered by default.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading