forked from aliyun/ros-templates
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcustom-resource.yml
102 lines (95 loc) · 2.94 KB
/
custom-resource.yml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
ROSTemplateFormatVersion: '2015-09-01'
Description:
zh-cn: 创建函数计算服务与函数,实现自定义资源处理逻辑,用于计算X+Y并在特定请求类型下返回结果。
en: Create Function Compute service and functions to implement custom resource processing
logic, designed for computing X+Y and returning the result under specific request
types.
Parameters:
ServiceName:
Type: String
Default: mytest
FunctionName:
Type: String
Default: mytest
Timeout:
Type: Number
Default: 60
X:
Type: Number
Default: 1
Y:
Type: Number
Default: 2
Resources:
Service:
Type: ALIYUN::FC::Service
Properties:
ServiceName:
Ref: ServiceName
Function:
Type: ALIYUN::FC::Function
Properties:
ServiceName:
Fn::GetAtt:
- Service
- ServiceName
FunctionName:
Ref: FunctionName
Handler: index.handler
Runtime: python3
Code:
SourceCode: |
import time
import json
import urllib.request
import logging
def handler(event, context):
logger = logging.getLogger()
logger.setLevel(logging.INFO)
event = json.loads(event)
logger.info('receive request: %s', event)
res_props = event['ResourceProperties']
result = {
'RequestId': event['RequestId'],
'LogicalResourceId': event['LogicalResourceId'],
'StackId': event['StackId'],
'Status': 'SUCCESS',
'PhysicalResourceId': 'MyCustomResourceId',
'Data': {}
}
if event['RequestType'] != 'Delete':
result['Data']['z'] = res_props['X'] + res_props['Y']
headers = {
'Content-type': 'application/json',
'Accept': 'application/json',
'Date': time.strftime('%a, %d %b %Y %X GMT', time.gmtime()),
'User-Agent': 'MyCustomUserAgent'
}
req = urllib.request.Request(event['ResponseURL'], data=json.dumps(result).encode('utf-8'), headers=headers)
resp = urllib.request.urlopen(req)
resp_content = resp.read().decode('utf-8')
logger.info('response: %s', resp_content)
if resp.getcode() != 200:
logger.error('Failed to send response, status code: %s', resp.getcode())
else:
logger.info('Response sent successfully')
SimpleTest:
Type: Custom::Add
Properties:
ServiceToken:
Fn::GetAtt:
- Function
- ARN
Parameters:
X:
Ref: X
Y:
Ref: Y
Timeout:
Ref: Timeout
Outputs:
SimpleTestOutputs:
Value:
Fn::GetAtt:
- SimpleTest
- Outputs