Skip to content

Commit

Permalink
Add initial FlinkApplication construct
Browse files Browse the repository at this point in the history
  • Loading branch information
Mitch Lloyd committed Jan 25, 2021
1 parent 12ef0f2 commit c99efb3
Show file tree
Hide file tree
Showing 13 changed files with 463 additions and 266 deletions.
2 changes: 1 addition & 1 deletion packages/@aws-cdk/aws-kinesisanalyticsv2/LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@
same "printed page" as the copyright notice for easier
identification within third-party archives.

Copyright 2018-2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
Copyright 2018-2021 Amazon.com, Inc. or its affiliates. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down
2 changes: 1 addition & 1 deletion packages/@aws-cdk/aws-kinesisanalyticsv2/NOTICE
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
AWS Cloud Development Kit (AWS CDK)
Copyright 2018-2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
Copyright 2018-2021 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2 changes: 2 additions & 0 deletions packages/@aws-cdk/aws-kinesisanalyticsv2/README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# Kinesis Analytics V2

The `@aws-cdk/aws-kinesisanalyticsv2` package provides constructs for
creating Kinesis Data Analytics applications.

Expand Down
99 changes: 99 additions & 0 deletions packages/@aws-cdk/aws-kinesisanalyticsv2/lib/application-code.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
import * as s3 from '@aws-cdk/aws-s3';
import * as s3_assets from '@aws-cdk/aws-s3-assets';
import { Construct } from '@aws-cdk/core';

export abstract class ApplicationCode {
public static fromBucket(bucket: s3.IBucket, fileKey: string, objectVersion?: string): BucketApplicationCode {
return new BucketApplicationCode({
bucket,
fileKey,
objectVersion,
});
}

public static fromAsset(path: string, options?: s3_assets.AssetOptions): AssetApplicationCode {
return new AssetApplicationCode(path, options);
}

public abstract bind(scope: Construct): CodeConfiguration;
}

interface BucketApplicationCodeProps {
bucket: s3.IBucket;
fileKey: string;
objectVersion?: string;
}

class BucketApplicationCode extends ApplicationCode {
public readonly bucket: s3.IBucket;
public readonly fileKey: string;
public readonly objectVersion?: string;

constructor(props: BucketApplicationCodeProps) {
super();
this.bucket = props.bucket;
this.fileKey = props.fileKey;
this.objectVersion = props.objectVersion;
}

public bind(_scope: Construct): CodeConfiguration {
return {
codeContent: {
s3ContentLocation: {
bucketArn: this.bucket.bucketArn,
fileKey: this.fileKey,
objectVersion: this.objectVersion,
},
},
codeContentType: 'ZIPFILE',
};
}
}

class AssetApplicationCode extends ApplicationCode {
private readonly path: string;
private readonly options?: s3_assets.AssetOptions;
private _asset?: s3_assets.Asset;

constructor(path: string, options?: s3_assets.AssetOptions) {
super();
this.path = path;
this.options = options;
}

public bind(scope: Construct): CodeConfiguration {
this._asset = new s3_assets.Asset(scope, 'Code', {
path: this.path,
...this.options,
});

if (!this._asset.isZipArchive) {
throw new Error(`Asset must be a .zip file or a directory (${this.path})`);
}

return {
codeContent: {
s3ContentLocation: {
bucketArn: this._asset.bucket.bucketArn,
fileKey: this._asset.s3ObjectKey,
},
},
codeContentType: 'ZIPFILE',
};
}

get asset(): s3_assets.Asset | undefined {
return this._asset;
}
}

interface CodeConfiguration {
codeContent: {
s3ContentLocation: {
bucketArn: string;
fileKey: string;
objectVersion?: string;
},
},
codeContentType: 'ZIPFILE',
}
108 changes: 60 additions & 48 deletions packages/@aws-cdk/aws-kinesisanalyticsv2/lib/flink-application.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import * as iam from '@aws-cdk/aws-iam';
import * as core from '@aws-cdk/core';
import { Construct } from 'constructs';
import { ApplicationCode } from './application-code';
import { CfnApplication } from './kinesisanalyticsv2.generated';
import { flinkApplicationArnComponents } from './private/example-resource-common';

export interface IFlinkApplication extends core.IResource, iam.IGrantable {
Expand All @@ -9,14 +11,14 @@ export interface IFlinkApplication extends core.IResource, iam.IGrantable {
*
* @attribute
*/
readonly flinkApplicationArn: string;
readonly applicationArn: string;

/**
* The name of the Flink application.
*
* @attribute
*/
readonly flinkApplicationName: string;
readonly applicationName: string;

/**
* The application IAM role.
Expand All @@ -26,25 +28,32 @@ export interface IFlinkApplication extends core.IResource, iam.IGrantable {
/**
* Convenience method for adding a policy statement to the application role.
*/
addToRolePolicy(policyStatement: iam.PolicyStatement): boolean;
addToPrincipalPolicy(policyStatement: iam.PolicyStatement): boolean;
}

export class FlinkRuntime {
public static FLINK_1_6 = new FlinkRuntime('FLINK-1_6');
public static FLINK_1_8 = new FlinkRuntime('FLINK-1_8');
public static FLINK_1_11 = new FlinkRuntime('FLINK-1_11');
public constructor(public readonly value: string) {}
}

/**
* Implements the functionality of the Flink applications that is shared
* between CDK created and imported IFlinkApplications.
* Implements the functionality shared between CDK created and imported
* IFlinkApplications.
*/
abstract class FlinkApplicationBase extends core.Resource implements IFlinkApplication {
public abstract readonly flinkApplicationArn: string;
public abstract readonly flinkApplicationName: string;
public abstract readonly applicationArn: string;
public abstract readonly applicationName: string;
public abstract readonly role?: iam.IRole;

// Implement iam.IGrantable interface
public abstract readonly grantPrincipal: iam.IPrincipal;

/** Implement the convenience {@link IFlinkApplication.addToRolePolicy} method. */
public addToRolePolicy(policyStatement: iam.PolicyStatement): boolean {
/** Implement the convenience {@link IFlinkApplication.addToPrincipalPolicy} method. */
public addToPrincipalPolicy(policyStatement: iam.PolicyStatement): boolean {
if (this.role) {
this.role.addToPolicy(policyStatement);
this.role.addToPrincipalPolicy(policyStatement);
return true;
}

Expand All @@ -54,16 +63,29 @@ abstract class FlinkApplicationBase extends core.Resource implements IFlinkAppli

/**
* Props for creating a FlinkApplication construct.
* @resource AWS::KinesisAnalyticsV2::Application
*/
export interface FlinkApplicationProps {
/**
* A name for your FlinkApplication that is unique to an AWS account.
*
* @default - CloudFormation-generated name
*/
readonly applicationName?: string;

/**
* A role to use to grant permissions to your application. Omitting this
* property and using the default role is recommended.
* The Flink version to use for this application.
*/
readonly runtime: FlinkRuntime;

/**
* The Flink code asset to run.
*/
readonly code: ApplicationCode;

/**
* A role to use to grant permissions to your application. Prefer omitting
* this property and using the default role.
*
* @default - a new Role will be created
*/
Expand All @@ -77,31 +99,35 @@ export interface FlinkApplicationProps {
readonly removalPolicy?: core.RemovalPolicy;
}

interface Attributes {
applicationName: string;
applicationArn: string;
}

/**
* The L2 construct for Flink Kinesis Data Applications.
*
* @resource AWS::KinesisAnalyticsV2::Application
* @experimental
*/
export class FlinkApplication extends core.Resource {
export class FlinkApplication extends FlinkApplicationBase {
/**
* Import an existing Flink application, defined outside of the CDK code by name.
* Import an existing Flink application, defined outside of the CDK code.
*/
public static fromFlinkApplicationName(scope: Construct, id: string, flinkApplicationName: string): IFlinkApplication {
public static fromAttributes(scope: Construct, id: string, attributes: Attributes): IFlinkApplication {
class Import extends FlinkApplicationBase {
// Imported flink applications have no associated role or grantPrincipal
public readonly role = undefined;
public readonly grantPrincipal = new iam.UnknownPrincipal({ resource: this });

public readonly flinkApplicationName = flinkApplicationName;
public readonly flinkApplicationArn = core.Stack.of(scope)
.formatArn(flinkApplicationArnComponents(flinkApplicationName));
public readonly applicationName = attributes.applicationName;
public readonly applicationArn = attributes.applicationArn;
}

return new Import(scope, id);
}

public readonly flinkApplicationArn: string;
public readonly flinkApplicationName: string;
public readonly applicationArn: string;
public readonly applicationName: string;

// Role must be optional for JSII compatibility
public readonly role?: iam.IRole;
Expand Down Expand Up @@ -132,27 +158,24 @@ export class FlinkApplication extends core.Resource {
// but since this is just an example,
// we'll use CloudFormation wait conditions.

// Remember to always, always, pass 'this' as the first argument
// when creating any constructs inside your L2s!
// This guarantees that they get scoped correctly,
// and the CDK will make sure their locally-unique identifiers
// are globally unique, which makes your L2 compose.
const waitConditionHandle = new core.CfnWaitConditionHandle(this, 'WaitConditionHandle');

// The 'main' L1 you create should always have the logical ID 'Resource'.
// This is important, so that the ConstructNode.defaultChild method works correctly.
// The local variable representing the L1 is often called 'resource' as well.
const resource = new core.CfnWaitCondition(this, 'Resource', {
count: 0,
handle: waitConditionHandle.ref,
timeout: '10',
this.role = props.role ?? new iam.Role(this, 'Role', {
assumedBy: new iam.ServicePrincipal('kinesisanalytics.amazonaws.com'),
});
this.grantPrincipal = this.role;

const resource = new CfnApplication(this, 'Resource', {
runtimeEnvironment: props.runtime.value,
serviceExecutionRole: this.role.roleArn,
applicationConfiguration: {
applicationCodeConfiguration: props.code.bind(this),
},
});

// The resource's physical name and ARN are set using
// some protected methods from the Resource superclass
// that correctly resolve when your L2 is used in another resource
// that is in a different AWS region or account than this one.
this.flinkApplicationName = this.getResourceNameAttribute(
this.applicationName = this.getResourceNameAttribute(
// A lot of the CloudFormation resources return their physical name
// when the Ref function is used on them.
// If your resource is like that, simply pass 'resource.ref' here.
Expand All @@ -164,7 +187,7 @@ export class FlinkApplication extends core.Resource {
// you can use Fn::Select and Fn::Split to take out the part after the '/' from the ARN:
core.Fn.select(1, core.Fn.split('/', resource.ref)),
);
this.flinkApplicationArn = this.getResourceArnAttribute(
this.applicationArn = this.getResourceArnAttribute(
// A lot of the L1 classes have an 'attrArn' property -
// if yours does, use it here.
// However, if it doesn't,
Expand All @@ -175,18 +198,7 @@ export class FlinkApplication extends core.Resource {
// always use the protected physicalName property for this second argument
flinkApplicationArnComponents(this.physicalName));

// if a role wasn't passed, create one
const role = props.role || new iam.Role(this, 'Role', {
// of course, fill your correct service principal here
assumedBy: new iam.ServicePrincipal('cloudformation.amazonaws.com'),
});
this.role = role;
// we need this to correctly implement the iam.IGrantable interface
this.grantPrincipal = role;

// this is how you apply the removal policy
resource.applyRemovalPolicy(props.removalPolicy, {
// this is the default to apply if props.removalPolicy is undefined
default: core.RemovalPolicy.DESTROY,
});
}
Expand Down
2 changes: 2 additions & 0 deletions packages/@aws-cdk/aws-kinesisanalyticsv2/lib/index.ts
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
export * from './application-code';
export * from './flink-application';

13 changes: 9 additions & 4 deletions packages/@aws-cdk/aws-kinesisanalyticsv2/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
},
"python": {
"distName": "aws-cdk.aws-kinesisanalyticsv2",
"module": "aws_cdk.aws-kinesisanalyticsv2",
"module": "aws_cdk.aws_kinesisanalyticsv2",
"classifiers": [
"Framework :: AWS CDK",
"Framework :: AWS CDK :: 1"
Expand All @@ -47,10 +47,11 @@
"pkglint": "pkglint -f",
"package": "cdk-package",
"awslint": "cdk-awslint",
"build+test": "npm run build && npm test",
"build+test+package": "npm run build+test && npm run package",
"build+test": "yarn build && yarn test",
"build+test+package": "yarn build+test && yarn package",
"compat": "cdk-compat",
"rosetta:extract": "yarn --silent jsii-rosetta extract"
"rosetta:extract": "yarn --silent jsii-rosetta extract",
"cfn2ts": "cfn2ts"
},
"keywords": [
"aws",
Expand All @@ -75,12 +76,16 @@
"dependencies": {
"@aws-cdk/core": "0.0.0",
"@aws-cdk/aws-iam": "0.0.0",
"@aws-cdk/aws-s3": "0.0.0",
"@aws-cdk/aws-s3-assets": "0.0.0",
"constructs": "^3.2.0"
},
"homepage": "https://github.com/aws/aws-cdk",
"peerDependencies": {
"@aws-cdk/core": "0.0.0",
"@aws-cdk/aws-iam": "0.0.0",
"@aws-cdk/aws-s3": "0.0.0",
"@aws-cdk/aws-s3-assets": "0.0.0",
"constructs": "^3.2.0"
},
"engines": {
Expand Down
Empty file.
Loading

0 comments on commit c99efb3

Please sign in to comment.