// construct-calculator.ts
// Forked from radixeng/guidance-for-carbon-data-lake-on-aws
import { Duration, Stack, StackProps, RemovalPolicy } from 'aws-cdk-lib'
import { aws_dynamodb as dynamodb } from 'aws-cdk-lib'
import { aws_lambda as lambda } from 'aws-cdk-lib'
import { aws_s3 as s3 } from 'aws-cdk-lib'
import { custom_resources as cr } from 'aws-cdk-lib'
import emission_factors from './emissions_factor_model_sample.json'
import * as path from 'path'
import { Construct } from 'constructs'
// Number of emission factors written per DynamoDB BatchWriteItem call when the
// reference table is seeded; BatchWriteItem accepts at most 25 items per batch.
const DDB_BATCH_WRITE_ITEM_CHUNK_SIZE = 25
/**
 * Properties accepted by the {@link Calculator} construct.
 */
export interface CalculatorProps extends StackProps {
/** Bucket the calculator Lambda is granted read access to; its name is exposed to the Lambda as TRANSFORMED_BUCKET_NAME. */
transformedBucket: s3.Bucket
/** Bucket the calculator Lambda is granted write access to; its name is exposed to the Lambda as ENRICHED_BUCKET_NAME. */
enrichedBucket: s3.Bucket
}
/**
 * Construct that provisions the emissions calculator:
 *
 * - a DynamoDB reference table of emission factors (keyed by `category` / `activity`),
 * - a DynamoDB table for calculator output (keyed by `activity_event_id`),
 * - the Python Lambda function that performs the calculation, and
 * - one custom resource per batch that seeds the reference table from the
 *   bundled emission factor JSON file when the stack is created.
 */
export class Calculator extends Construct {
  /** DynamoDB table the calculator Lambda is granted write access to. */
  public readonly calculatorOutputTable: dynamodb.Table
  /** Lambda function running `calculatorLambda.lambda_handler`. */
  public readonly calculatorLambda: lambda.Function

  /**
   * @param scope parent construct
   * @param id construct id
   * @param props buckets the calculator Lambda reads from and writes to
   * @throws Error when the emission factor file contains duplicated entries
   */
  constructor(scope: Construct, id: string, props: CalculatorProps) {
    super(scope, id)

    const { transformedBucket, enrichedBucket } = props

    // Reference data: one item per (category, activity) emission factor
    const referenceTable = new dynamodb.Table(this, 'cdlEmissionsFactorReferenceTable', {
      partitionKey: { name: 'category', type: dynamodb.AttributeType.STRING },
      sortKey: { name: 'activity', type: dynamodb.AttributeType.STRING },
      removalPolicy: RemovalPolicy.DESTROY,
      billingMode: dynamodb.BillingMode.PAY_PER_REQUEST,
      pointInTimeRecovery: true,
    })

    // Results written by the calculator Lambda
    const outputTable = new dynamodb.Table(this, 'cdlCalculatorOutputTable', {
      partitionKey: { name: 'activity_event_id', type: dynamodb.AttributeType.STRING },
      removalPolicy: RemovalPolicy.DESTROY,
      billingMode: dynamodb.BillingMode.PAY_PER_REQUEST,
      pointInTimeRecovery: true,
    })
    this.calculatorOutputTable = outputTable

    const handler = new lambda.Function(this, 'cdlCalculatorHandler', {
      runtime: lambda.Runtime.PYTHON_3_9,
      code: lambda.Code.fromAsset(path.join(__dirname, './lambda')),
      handler: 'calculatorLambda.lambda_handler',
      timeout: Duration.minutes(5),
      environment: {
        EMISSIONS_FACTOR_TABLE_NAME: referenceTable.tableName,
        CALCULATOR_OUTPUT_TABLE_NAME: outputTable.tableName,
        TRANSFORMED_BUCKET_NAME: transformedBucket.bucketName,
        ENRICHED_BUCKET_NAME: enrichedBucket.bucketName,
      },
    })
    this.calculatorLambda = handler

    // The Lambda reads factors and transformed input, and writes results and enriched output
    referenceTable.grantReadData(handler)
    outputTable.grantWriteData(handler)
    transformedBucket.grantRead(handler)
    enrichedBucket.grantWrite(handler)

    // Fail synthesis early if the JSON file holds duplicated factors
    checkDuplicatedEmissionFactors()

    // Seed the reference table from the JSON file. BatchWriteItem is limited to
    // 25 items per batch, so the factors are written in chunks, one custom
    // resource per chunk (the construct id carries the chunk's start offset).
    // See https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_BatchWriteItem.html
    // NOTE(review): only onCreate is defined, so presumably later edits to the
    // JSON file are not pushed to an already-deployed table; confirm if that matters.
    const seedPhysicalId = referenceTable.tableName + '_initialization'
    let offset = 0
    while (offset < emission_factors.length) {
      const batch = emission_factors.slice(offset, offset + DDB_BATCH_WRITE_ITEM_CHUNK_SIZE)
      new cr.AwsCustomResource(this, `initcdlEmissionsFactorReferenceTable${offset}`, {
        onCreate: {
          service: 'DynamoDB',
          action: 'batchWriteItem',
          parameters: {
            RequestItems: {
              [referenceTable.tableName]: this.generateBatch(batch),
            },
          },
          physicalResourceId: cr.PhysicalResourceId.of(seedPhysicalId),
        },
        policy: cr.AwsCustomResourcePolicy.fromSdkCalls({ resources: [referenceTable.tableArn] }),
      })
      offset += DDB_BATCH_WRITE_ITEM_CHUNK_SIZE
    }
  }

  /**
   * Wraps each emission factor of a chunk in a BatchWriteItem `PutRequest`.
   *
   * @param chunk emission factors to write in a single batch
   * @returns one put request per factor, in the same order as `chunk`
   */
  private generateBatch(chunk: IGhgEmissionFactor[]): { PutRequest: { Item: IDdbEmissionFactor } }[] {
    return chunk.map(factor => ({ PutRequest: { Item: this.generateItem(factor) } }))
  }

  /**
   * Converts one emission factor from the JSON file into its DynamoDB
   * attribute-value representation.
   *
   * @param emission_factor factor as read from the JSON file
   * @returns the item with every field wrapped in its DynamoDB type descriptor
   */
  private generateItem(emission_factor: IGhgEmissionFactor): IDdbEmissionFactor {
    const { ghg } = emission_factor.emissions_factor_standards
    const { coefficients } = ghg
    const asString = (value: string): { S: string } => ({ S: value })

    return {
      activity: asString(emission_factor.activity),
      category: asString(emission_factor.category),
      scope: { N: emission_factor.scope },
      emissions_factor_standards: {
        M: {
          ghg: {
            M: {
              coefficients: {
                M: {
                  co2_factor: asString(coefficients.co2_factor),
                  ch4_factor: asString(coefficients.ch4_factor),
                  n2o_factor: asString(coefficients.n2o_factor),
                  // The JSON keys use hyphens; the stored attribute names use underscores
                  AR4_kgco2e: asString(coefficients['AR4-kgco2e']),
                  AR5_kgco2e: asString(coefficients['AR5-kgco2e']),
                  units: asString(coefficients.units),
                },
              },
              last_updated: asString(ghg.last_updated),
              source: asString(ghg.source),
              source_origin: asString(ghg.source_origin),
            },
          },
        },
      },
    }
  }
}
/**
 * An emission factor in DynamoDB attribute-value form, as produced by
 * `Calculator.generateItem` and sent in BatchWriteItem put requests.
 * Each leaf is wrapped in a type descriptor: `S` (string), `N` (number,
 * carried as a string) or `M` (map).
 */
interface IDdbEmissionFactor {
// Partition key of the emissions factor reference table
category: { S: string }
// Sort key of the emissions factor reference table
activity: { S: string }
scope: { N: string }
emissions_factor_standards: {
M: {
ghg: {
M: {
coefficients: {
M: {
co2_factor: { S: string }
ch4_factor: { S: string }
n2o_factor: { S: string }
// Populated from the source fields 'AR4-kgco2e' / 'AR5-kgco2e'
AR4_kgco2e: { S: string }
AR5_kgco2e: { S: string }
units: { S: string }
}
}
last_updated: { S: string }
source: { S: string }
source_origin: { S: string }
}
}
}
}
}
/**
 * Shape of one emission factor entry as read from the bundled emission factor
 * JSON file, before conversion to DynamoDB attribute-value form. All fields,
 * including the coefficients and `scope`, are carried as strings.
 */
interface IGhgEmissionFactor {
category: string
activity: string
scope: string
emissions_factor_standards: {
ghg: {
coefficients: {
co2_factor: string
ch4_factor: string
n2o_factor: string
// Hyphenated keys; stored as AR4_kgco2e / AR5_kgco2e in the DynamoDB item
'AR4-kgco2e': string
'AR5-kgco2e': string
units: string
}
last_updated: string
source: string
source_origin: string
}
}
}
/**
 * Verifies that no two emission factors share the same (category, activity)
 * pair, which is the composite key of the emissions factor reference table.
 *
 * Pairs are compared on an unambiguous composite key, so entries such as
 * ("a_b", "c") and ("a", "b_c") are correctly treated as distinct.
 *
 * @param factors emission factors to check; defaults to the factors loaded
 *   from the bundled JSON file
 * @throws Error listing every repeated entry as `category_activity` (one
 *   mention per occurrence after the first)
 */
function checkDuplicatedEmissionFactors(
  factors: ReadonlyArray<{ category: string; activity: string }> = emission_factors,
): void {
  const seen = new Set<string>()
  const duplicates: string[] = []
  for (const { category, activity } of factors) {
    // JSON-encoding the pair keeps the two parts separable whatever characters they contain
    const key = JSON.stringify([category, activity])
    if (seen.has(key)) {
      duplicates.push(category + '_' + activity)
    } else {
      seen.add(key)
    }
  }
  if (duplicates.length > 0) {
    throw new Error('duplicates found in Emission Factors: ' + duplicates.join(','))
  }
}