Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions packages/@aws-cdk/aws-msk-alpha/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,36 @@ const cluster = new msk.Cluster(this, 'cluster', {
});
```

## MSK Express Brokers

You can create an MSK cluster with Express Brokers by setting the `brokerType` property to `BrokerType.EXPRESS`. Express Brokers are a low-cost option for development, testing, and workloads that don't require the high availability guarantees of standard MSK cluster.
For more information, see [Amazon MSK Express Brokers](https://docs.aws.amazon.com/msk/latest/developerguide/msk-broker-types-express.html).

**Note:** When using Express Brokers, the following constraints apply:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: It would also be good to include information about the supported broker sizes

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


- Apache Kafka version must be 3.6.x or 3.8.x
- You must specify the `instanceType`
- The VPC must have at least 3 subnets (across 3 AZs)
- `ebsStorageInfo` is not supported
- `storageMode` is not supported
- `logging` is not supported
- Supported broker sizes: `m7g.xlarge`, `m7g.2xlarge`, `m7g.4xlarge`, `m7g.8xlarge`, `m7g.12xlarge`, `m7g.16xlarge`

```ts
declare const vpc: ec2.Vpc;

const expressCluster = new msk.Cluster(this, 'ExpressCluster', {
clusterName: 'MyExpressCluster',
kafkaVersion: msk.KafkaVersion.V3_8_X,
vpc,
brokerType: msk.BrokerType.EXPRESS,
instanceType: ec2.InstanceType.of(
ec2.InstanceClass.M7G,
ec2.InstanceSize.XLARGE,
),
});
```

## MSK Serverless

You can also use MSK Serverless by using `ServerlessCluster` class.
Expand Down
65 changes: 59 additions & 6 deletions packages/@aws-cdk/aws-msk-alpha/lib/cluster.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,15 @@ export interface ClusterProps {
*/
readonly instanceType?: ec2.InstanceType;

/**
* The broker type for the cluster.
* When set to EXPRESS, the cluster will be created with Express Brokers.
* When this is set to EXPRESS, instanceType must also be specified.
*
* @default BrokerType.STANDARD
*/
readonly brokerType?: BrokerType;

/**
* The AWS security groups to associate with the elastic network interfaces in order to specify who can
* connect to and communicate with the Amazon MSK cluster.
Expand Down Expand Up @@ -199,6 +208,21 @@ export enum StorageMode {
TIERED = 'TIERED',
}

/**
* The broker type for the cluster.
*/
export enum BrokerType {
/**
* Standard brokers provide high-availability guarantees.
*/
STANDARD = 'STANDARD',

/**
* Express brokers are a low-cost option for development, testing, and workloads that don't require the high availability guarantees of standard MSK cluster.
*/
EXPRESS = 'EXPRESS',
}

/**
* The Amazon MSK configuration to use for the cluster.
* Note: There is currently no Cloudformation Resource to create a Configuration
Expand Down Expand Up @@ -502,8 +526,36 @@ export class Cluster extends ClusterBase {
throw new core.ValidationError('EBS volume size should be in the range 1-16384', this);
}

const isExpress = props.brokerType === BrokerType.EXPRESS;

if (isExpress) {
// Validate Kafka version compatibility
const supportedVersions = ['3.6', '3.8'];
const kafkaVersionString = props.kafkaVersion.version;
const isCompatibleVersion = supportedVersions.some(version => kafkaVersionString.includes(version));
if (!isCompatibleVersion) {
throw new core.ValidationError(`Express brokers are only supported with Apache Kafka 3.6.x and 3.8.x, got ${kafkaVersionString}`, this);
}

if (!props.instanceType) {
throw new core.ValidationError('`instanceType` must also be specified when `brokerType` is `BrokerType.EXPRESS`.', this);
}
if (props.ebsStorageInfo) {
throw new core.ValidationError('`ebsStorageInfo` is not supported when `brokerType` is `BrokerType.EXPRESS`.', this);
}
if (props.storageMode) {
throw new core.ValidationError('`storageMode` is not supported when `brokerType` is `BrokerType.EXPRESS`.', this);
}
if (props.logging) {
throw new core.ValidationError('`logging` is not supported when `brokerType` is `BrokerType.EXPRESS`.', this);
}
if (subnetSelection.subnets.length < 3) {
throw new core.ValidationError(`Express cluster requires at least 3 subnets, got ${subnetSelection.subnets.length}`, this);
}
}

const instanceType = props.instanceType
? this.mskInstanceType(props.instanceType)
? this.mskInstanceType(props.instanceType, isExpress)
: this.mskInstanceType(
ec2.InstanceType.of(ec2.InstanceClass.M5, ec2.InstanceSize.LARGE),
);
Expand Down Expand Up @@ -598,7 +650,7 @@ export class Cluster extends ClusterBase {
},
}));
}
const loggingInfo = {
const loggingInfo = isExpress ? undefined : {
brokerLogs: {
cloudWatchLogs: {
enabled:
Expand Down Expand Up @@ -678,7 +730,7 @@ export class Cluster extends ClusterBase {
securityGroups: this.connections.securityGroups.map(
(group) => group.securityGroupId,
),
storageInfo: {
storageInfo: isExpress ? undefined : {
ebsStorageInfo: {
volumeSize: volumeSize,
},
Expand All @@ -691,7 +743,7 @@ export class Cluster extends ClusterBase {
configurationInfo: props.configurationInfo,
enhancedMonitoring: props.monitoring?.clusterMonitoringLevel,
openMonitoring: openMonitoring,
storageMode: props.storageMode,
storageMode: isExpress ? undefined : props.storageMode,
loggingInfo: loggingInfo,
clientAuthentication,
});
Expand All @@ -706,8 +758,9 @@ export class Cluster extends ClusterBase {
});
}

private mskInstanceType(instanceType: ec2.InstanceType): string {
return `kafka.${instanceType.toString()}`;
private mskInstanceType(instanceType: ec2.InstanceType, express?:boolean): string {
const prefix = express ? 'express.' : 'kafka.';
return `${prefix}${instanceType.toString()}`;
}

/**
Expand Down
128 changes: 128 additions & 0 deletions packages/@aws-cdk/aws-msk-alpha/test/cluster.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -981,4 +981,132 @@ describe('MSK Cluster', () => {
});
});
});

describe('with express broker ', () => {
let expressStack: core.Stack;
let expressVpc: ec2.Vpc;

beforeEach(() => {
const app = new core.App();
expressStack = new core.Stack(app, 'ExpressTestStack', {
env: {
region: 'us-east-1',
account: '123456789012',
},
});
expressVpc = new ec2.Vpc(expressStack, 'ExpressVpc', {
maxAzs: 3,
});
});

test('create a cluster with express broker', () => {
new msk.Cluster(expressStack, 'ExpressCluster', {
clusterName: 'express-cluster',
kafkaVersion: msk.KafkaVersion.V3_8_X,
vpc: expressVpc,
instanceType: ec2.InstanceType.of(
ec2.InstanceClass.M7G,
ec2.InstanceSize.XLARGE,
),
brokerType: msk.BrokerType.EXPRESS,
});

Template.fromStack(expressStack).hasResourceProperties('AWS::MSK::Cluster', {
BrokerNodeGroupInfo: { InstanceType: 'express.m7g.xlarge' },
});
});

test('fails when instanceType is not specified', () => {
expect(() => {
new msk.Cluster(expressStack, 'ExpressClusterNoInstanceType', {
clusterName: 'express-cluster',
kafkaVersion: msk.KafkaVersion.V3_8_X,
vpc: expressVpc,
brokerType: msk.BrokerType.EXPRESS,
});
}).toThrow(
'`instanceType` must also be specified when `brokerType` is `BrokerType.EXPRESS`.',
);
});

test('fails when ebsStorageInfo is specified', () => {
expect(() => {
new msk.Cluster(expressStack, 'ExpressClusterWithStorage', {
clusterName: 'express-cluster',
kafkaVersion: msk.KafkaVersion.V3_8_X,
vpc: expressVpc,
instanceType: ec2.InstanceType.of(
ec2.InstanceClass.M7G,
ec2.InstanceSize.XLARGE,
),
brokerType: msk.BrokerType.EXPRESS,
ebsStorageInfo: { volumeSize: 100 },
});
}).toThrow('`ebsStorageInfo` is not supported when `brokerType` is `BrokerType.EXPRESS`.');
});

test('fails when brokerType is EXPRESS and storageMode is specified', () => {
expect(() => {
new msk.Cluster(expressStack, 'ExpressClusterWithStorageMode', {
clusterName: 'express-cluster',
kafkaVersion: msk.KafkaVersion.V3_8_X,
vpc: expressVpc,
instanceType: ec2.InstanceType.of(
ec2.InstanceClass.M7G,
ec2.InstanceSize.XLARGE,
),
brokerType: msk.BrokerType.EXPRESS,
storageMode: msk.StorageMode.LOCAL,
});
}).toThrow('`storageMode` is not supported when `brokerType` is `BrokerType.EXPRESS`.');
});

test('fails when brokerType is EXPRESS and logging is specified', () => {
expect(() => {
new msk.Cluster(expressStack, 'ExpressClusterWithLogging', {
clusterName: 'express-cluster',
kafkaVersion: msk.KafkaVersion.V3_8_X,
vpc: expressVpc,
instanceType: ec2.InstanceType.of(
ec2.InstanceClass.M7G,
ec2.InstanceSize.XLARGE,
),
brokerType: msk.BrokerType.EXPRESS,
logging: {
cloudwatchLogGroup: new logs.LogGroup(expressStack, 'LogGroup'),
},
});
}).toThrow('`logging` is not supported when `brokerType` is `BrokerType.EXPRESS`.');
});

test('fails when brokerType is EXPRESS and less than 3 subnets are provided', () => {
expect(() => {
new msk.Cluster(stack, 'ExpressClusterWithInsufficientSubnets', {
clusterName: 'express-cluster',
kafkaVersion: msk.KafkaVersion.V3_8_X,
vpc,
instanceType: ec2.InstanceType.of(
ec2.InstanceClass.M7G,
ec2.InstanceSize.XLARGE,
),
brokerType: msk.BrokerType.EXPRESS,
});
}).toThrow('Express cluster requires at least 3 subnets, got 2');
});

test('fails when brokerType is EXPRESS and incompatible Kafka version is used', () => {
expect(() => {
new msk.Cluster(expressStack, 'ExpressClusterWithIncompatibleVersion', {
clusterName: 'express-cluster',
kafkaVersion: msk.KafkaVersion.V2_6_1,
vpc: expressVpc,
instanceType: ec2.InstanceType.of(
ec2.InstanceClass.M7G,
ec2.InstanceSize.XLARGE,
),
brokerType: msk.BrokerType.EXPRESS,
});
}).toThrow('Express brokers are only supported with Apache Kafka 3.6.x and 3.8.x, got 2.6.1');
});
});
});

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Large diffs are not rendered by default.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading