Skip to content

Commit 0a69e5f

Browse files
authored
feat(msk): support Express brokers (#34741)
### Issue # (if applicable) Closes #32923. ### Reason for this change To support express brokers. ### Description of changes Add `express` property and validations about express limitations. ### Describe any new or updated permissions being added N/A ### Description of how you validated changes Add unit tests and an integ test. ### Checklist - [x] My code adheres to the [CONTRIBUTING GUIDE](https://github.com/aws/aws-cdk/blob/main/CONTRIBUTING.md) and [DESIGN GUIDELINES](https://github.com/aws/aws-cdk/blob/main/docs/DESIGN_GUIDELINES.md) ---- *By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license*
1 parent 35cdef3 commit 0a69e5f

File tree

13 files changed

+2327
-6
lines changed

13 files changed

+2327
-6
lines changed

packages/@aws-cdk/aws-msk-alpha/README.md

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,36 @@ const cluster = new msk.Cluster(this, 'cluster', {
232232
});
233233
```
234234

235+
## MSK Express Brokers
236+
237+
You can create an MSK cluster with Express Brokers by setting the `brokerType` property to `BrokerType.EXPRESS`. Express Brokers are a low-cost option for development, testing, and workloads that don't require the high availability guarantees of standard MSK cluster.
238+
For more information, see [Amazon MSK Express Brokers](https://docs.aws.amazon.com/msk/latest/developerguide/msk-broker-types-express.html).
239+
240+
**Note:** When using Express Brokers, the following constraints apply:
241+
242+
- Apache Kafka version must be 3.6.x or 3.8.x
243+
- You must specify the `instanceType`
244+
- The VPC must have at least 3 subnets (across 3 AZs)
245+
- `ebsStorageInfo` is not supported
246+
- `storageMode` is not supported
247+
- `logging` is not supported
248+
- Supported broker sizes: `m7g.xlarge`, `m7g.2xlarge`, `m7g.4xlarge`, `m7g.8xlarge`, `m7g.12xlarge`, `m7g.16xlarge`
249+
250+
```ts
251+
declare const vpc: ec2.Vpc;
252+
253+
const expressCluster = new msk.Cluster(this, 'ExpressCluster', {
254+
clusterName: 'MyExpressCluster',
255+
kafkaVersion: msk.KafkaVersion.V3_8_X,
256+
vpc,
257+
brokerType: msk.BrokerType.EXPRESS,
258+
instanceType: ec2.InstanceType.of(
259+
ec2.InstanceClass.M7G,
260+
ec2.InstanceSize.XLARGE,
261+
),
262+
});
263+
```
264+
235265
## MSK Serverless
236266

237267
You can also use MSK Serverless by using `ServerlessCluster` class.

packages/@aws-cdk/aws-msk-alpha/lib/cluster.ts

Lines changed: 59 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,15 @@ export interface ClusterProps {
9898
*/
9999
readonly instanceType?: ec2.InstanceType;
100100

101+
/**
102+
* The broker type for the cluster.
103+
* When set to EXPRESS, the cluster will be created with Express Brokers.
104+
* When this is set to EXPRESS, instanceType must also be specified.
105+
*
106+
* @default BrokerType.STANDARD
107+
*/
108+
readonly brokerType?: BrokerType;
109+
101110
/**
102111
* The AWS security groups to associate with the elastic network interfaces in order to specify who can
103112
* connect to and communicate with the Amazon MSK cluster.
@@ -199,6 +208,21 @@ export enum StorageMode {
199208
TIERED = 'TIERED',
200209
}
201210

211+
/**
212+
* The broker type for the cluster.
213+
*/
214+
export enum BrokerType {
215+
/**
216+
* Standard brokers provide high-availability guarantees.
217+
*/
218+
STANDARD = 'STANDARD',
219+
220+
/**
221+
* Express brokers are a low-cost option for development, testing, and workloads that don't require the high availability guarantees of standard MSK cluster.
222+
*/
223+
EXPRESS = 'EXPRESS',
224+
}
225+
202226
/**
203227
* The Amazon MSK configuration to use for the cluster.
204228
* Note: There is currently no Cloudformation Resource to create a Configuration
@@ -502,8 +526,36 @@ export class Cluster extends ClusterBase {
502526
throw new core.ValidationError('EBS volume size should be in the range 1-16384', this);
503527
}
504528

529+
const isExpress = props.brokerType === BrokerType.EXPRESS;
530+
531+
if (isExpress) {
532+
// Validate Kafka version compatibility
533+
const supportedVersions = ['3.6', '3.8'];
534+
const kafkaVersionString = props.kafkaVersion.version;
535+
const isCompatibleVersion = supportedVersions.some(version => kafkaVersionString.includes(version));
536+
if (!isCompatibleVersion) {
537+
throw new core.ValidationError(`Express brokers are only supported with Apache Kafka 3.6.x and 3.8.x, got ${kafkaVersionString}`, this);
538+
}
539+
540+
if (!props.instanceType) {
541+
throw new core.ValidationError('`instanceType` must also be specified when `brokerType` is `BrokerType.EXPRESS`.', this);
542+
}
543+
if (props.ebsStorageInfo) {
544+
throw new core.ValidationError('`ebsStorageInfo` is not supported when `brokerType` is `BrokerType.EXPRESS`.', this);
545+
}
546+
if (props.storageMode) {
547+
throw new core.ValidationError('`storageMode` is not supported when `brokerType` is `BrokerType.EXPRESS`.', this);
548+
}
549+
if (props.logging) {
550+
throw new core.ValidationError('`logging` is not supported when `brokerType` is `BrokerType.EXPRESS`.', this);
551+
}
552+
if (subnetSelection.subnets.length < 3) {
553+
throw new core.ValidationError(`Express cluster requires at least 3 subnets, got ${subnetSelection.subnets.length}`, this);
554+
}
555+
}
556+
505557
const instanceType = props.instanceType
506-
? this.mskInstanceType(props.instanceType)
558+
? this.mskInstanceType(props.instanceType, isExpress)
507559
: this.mskInstanceType(
508560
ec2.InstanceType.of(ec2.InstanceClass.M5, ec2.InstanceSize.LARGE),
509561
);
@@ -598,7 +650,7 @@ export class Cluster extends ClusterBase {
598650
},
599651
}));
600652
}
601-
const loggingInfo = {
653+
const loggingInfo = isExpress ? undefined : {
602654
brokerLogs: {
603655
cloudWatchLogs: {
604656
enabled:
@@ -678,7 +730,7 @@ export class Cluster extends ClusterBase {
678730
securityGroups: this.connections.securityGroups.map(
679731
(group) => group.securityGroupId,
680732
),
681-
storageInfo: {
733+
storageInfo: isExpress ? undefined : {
682734
ebsStorageInfo: {
683735
volumeSize: volumeSize,
684736
},
@@ -691,7 +743,7 @@ export class Cluster extends ClusterBase {
691743
configurationInfo: props.configurationInfo,
692744
enhancedMonitoring: props.monitoring?.clusterMonitoringLevel,
693745
openMonitoring: openMonitoring,
694-
storageMode: props.storageMode,
746+
storageMode: isExpress ? undefined : props.storageMode,
695747
loggingInfo: loggingInfo,
696748
clientAuthentication,
697749
});
@@ -706,8 +758,9 @@ export class Cluster extends ClusterBase {
706758
});
707759
}
708760

709-
private mskInstanceType(instanceType: ec2.InstanceType): string {
710-
return `kafka.${instanceType.toString()}`;
761+
private mskInstanceType(instanceType: ec2.InstanceType, express?:boolean): string {
762+
const prefix = express ? 'express.' : 'kafka.';
763+
return `${prefix}${instanceType.toString()}`;
711764
}
712765

713766
/**

packages/@aws-cdk/aws-msk-alpha/test/cluster.test.ts

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -981,4 +981,132 @@ describe('MSK Cluster', () => {
981981
});
982982
});
983983
});
984+
985+
describe('with express broker ', () => {
986+
let expressStack: core.Stack;
987+
let expressVpc: ec2.Vpc;
988+
989+
beforeEach(() => {
990+
const app = new core.App();
991+
expressStack = new core.Stack(app, 'ExpressTestStack', {
992+
env: {
993+
region: 'us-east-1',
994+
account: '123456789012',
995+
},
996+
});
997+
expressVpc = new ec2.Vpc(expressStack, 'ExpressVpc', {
998+
maxAzs: 3,
999+
});
1000+
});
1001+
1002+
test('create a cluster with express broker', () => {
1003+
new msk.Cluster(expressStack, 'ExpressCluster', {
1004+
clusterName: 'express-cluster',
1005+
kafkaVersion: msk.KafkaVersion.V3_8_X,
1006+
vpc: expressVpc,
1007+
instanceType: ec2.InstanceType.of(
1008+
ec2.InstanceClass.M7G,
1009+
ec2.InstanceSize.XLARGE,
1010+
),
1011+
brokerType: msk.BrokerType.EXPRESS,
1012+
});
1013+
1014+
Template.fromStack(expressStack).hasResourceProperties('AWS::MSK::Cluster', {
1015+
BrokerNodeGroupInfo: { InstanceType: 'express.m7g.xlarge' },
1016+
});
1017+
});
1018+
1019+
test('fails when instanceType is not specified', () => {
1020+
expect(() => {
1021+
new msk.Cluster(expressStack, 'ExpressClusterNoInstanceType', {
1022+
clusterName: 'express-cluster',
1023+
kafkaVersion: msk.KafkaVersion.V3_8_X,
1024+
vpc: expressVpc,
1025+
brokerType: msk.BrokerType.EXPRESS,
1026+
});
1027+
}).toThrow(
1028+
'`instanceType` must also be specified when `brokerType` is `BrokerType.EXPRESS`.',
1029+
);
1030+
});
1031+
1032+
test('fails when ebsStorageInfo is specified', () => {
1033+
expect(() => {
1034+
new msk.Cluster(expressStack, 'ExpressClusterWithStorage', {
1035+
clusterName: 'express-cluster',
1036+
kafkaVersion: msk.KafkaVersion.V3_8_X,
1037+
vpc: expressVpc,
1038+
instanceType: ec2.InstanceType.of(
1039+
ec2.InstanceClass.M7G,
1040+
ec2.InstanceSize.XLARGE,
1041+
),
1042+
brokerType: msk.BrokerType.EXPRESS,
1043+
ebsStorageInfo: { volumeSize: 100 },
1044+
});
1045+
}).toThrow('`ebsStorageInfo` is not supported when `brokerType` is `BrokerType.EXPRESS`.');
1046+
});
1047+
1048+
test('fails when brokerType is EXPRESS and storageMode is specified', () => {
1049+
expect(() => {
1050+
new msk.Cluster(expressStack, 'ExpressClusterWithStorageMode', {
1051+
clusterName: 'express-cluster',
1052+
kafkaVersion: msk.KafkaVersion.V3_8_X,
1053+
vpc: expressVpc,
1054+
instanceType: ec2.InstanceType.of(
1055+
ec2.InstanceClass.M7G,
1056+
ec2.InstanceSize.XLARGE,
1057+
),
1058+
brokerType: msk.BrokerType.EXPRESS,
1059+
storageMode: msk.StorageMode.LOCAL,
1060+
});
1061+
}).toThrow('`storageMode` is not supported when `brokerType` is `BrokerType.EXPRESS`.');
1062+
});
1063+
1064+
test('fails when brokerType is EXPRESS and logging is specified', () => {
1065+
expect(() => {
1066+
new msk.Cluster(expressStack, 'ExpressClusterWithLogging', {
1067+
clusterName: 'express-cluster',
1068+
kafkaVersion: msk.KafkaVersion.V3_8_X,
1069+
vpc: expressVpc,
1070+
instanceType: ec2.InstanceType.of(
1071+
ec2.InstanceClass.M7G,
1072+
ec2.InstanceSize.XLARGE,
1073+
),
1074+
brokerType: msk.BrokerType.EXPRESS,
1075+
logging: {
1076+
cloudwatchLogGroup: new logs.LogGroup(expressStack, 'LogGroup'),
1077+
},
1078+
});
1079+
}).toThrow('`logging` is not supported when `brokerType` is `BrokerType.EXPRESS`.');
1080+
});
1081+
1082+
test('fails when brokerType is EXPRESS and less than 3 subnets are provided', () => {
1083+
expect(() => {
1084+
new msk.Cluster(stack, 'ExpressClusterWithInsufficientSubnets', {
1085+
clusterName: 'express-cluster',
1086+
kafkaVersion: msk.KafkaVersion.V3_8_X,
1087+
vpc,
1088+
instanceType: ec2.InstanceType.of(
1089+
ec2.InstanceClass.M7G,
1090+
ec2.InstanceSize.XLARGE,
1091+
),
1092+
brokerType: msk.BrokerType.EXPRESS,
1093+
});
1094+
}).toThrow('Express cluster requires at least 3 subnets, got 2');
1095+
});
1096+
1097+
test('fails when brokerType is EXPRESS and incompatible Kafka version is used', () => {
1098+
expect(() => {
1099+
new msk.Cluster(expressStack, 'ExpressClusterWithIncompatibleVersion', {
1100+
clusterName: 'express-cluster',
1101+
kafkaVersion: msk.KafkaVersion.V2_6_1,
1102+
vpc: expressVpc,
1103+
instanceType: ec2.InstanceType.of(
1104+
ec2.InstanceClass.M7G,
1105+
ec2.InstanceSize.XLARGE,
1106+
),
1107+
brokerType: msk.BrokerType.EXPRESS,
1108+
});
1109+
}).toThrow('Express brokers are only supported with Apache Kafka 3.6.x and 3.8.x, got 2.6.1');
1110+
});
1111+
});
9841112
});

packages/@aws-cdk/aws-msk-alpha/test/integ.cluster-express.js.snapshot/MskExpressClusterDefaultTestDeployAssert83DE4F95.assets.json

Lines changed: 20 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/@aws-cdk/aws-msk-alpha/test/integ.cluster-express.js.snapshot/MskExpressClusterDefaultTestDeployAssert83DE4F95.template.json

Lines changed: 36 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/@aws-cdk/aws-msk-alpha/test/integ.cluster-express.js.snapshot/asset.4e81d1590504dce642b87042b0999db745220956ae69e7441a66475972723162/index.js

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/@aws-cdk/aws-msk-alpha/test/integ.cluster-express.js.snapshot/aws-cdk-msk-express-integ.assets.json

Lines changed: 36 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)