#!/usr/bin/env python3
import codecs
import os
import subprocess

import boto3  # AWS SDK
import requests
import yaml

from adjust import Adjust

DESC = "EC2 Server adjustment (via S3) driver for Opsani Optune"
VERSION = "0.0.5"
HAS_CANCEL = False
CONFIG_KEY = 'ec2s3'
VALID_CONFIG_FILE_SETTINGS = ('encoder', 's3_bucket', 's3_path', 'describe_port', 'describe_endpoint', 'describe_encoding')

config_path = os.environ.get('OPTUNE_CONFIG', './config.yaml')
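
# A hedged sketch of the config file this driver expects at OPTUNE_CONFIG
# (the component name, bucket, and path values are illustrative, and the
# encoder sub-structure depends on the encoder plugin in use):
#
#   ec2s3:
#     web:                                # one entry per component
#       encoder:                          # required; passed to encoders.base
#         name: dotnet                    # assumed encoder name, for illustration
#       s3_bucket: my-example-bucket      # required, string
#       s3_path: scripts/adjust.ps1       # required, string; illustrative path
#       describe_port: 8080               # optional; defaults to 8080
#       describe_endpoint: describe.json  # optional; defaults to describe.json
#       describe_encoding: utf-8          # optional; must be known to codecs.lookup()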

class Ec2S3Driver(Adjust):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        if not (self.args.info or self.args.version):
            self.config = self.load_config()
            self.check_config(self.config)
            # Credentials are retrieved from env vars, ~/.aws, or the EC2 instance metadata API
            self.s3 = boto3.client('s3')
    @staticmethod
    def load_config():
        try:
            with open(config_path) as f:
                config = yaml.safe_load(f)
        except yaml.YAMLError as e:
            raise Exception('Could not parse config file located at "{}". '
                            'Please check its contents.'.format(config_path)) from e
        return config
    @staticmethod
    def check_config(config):
        # Look for credentials in ~/.aws, the ECS task-role env var, or the EC2
        # instance metadata service; otherwise require explicit access-key env vars.
        # A connect timeout is passed to curl so the metadata probe cannot hang off-EC2.
        if not os.path.isdir('/root/.aws') \
                and not os.path.isdir('{}/.aws'.format(os.path.expanduser('~'))) \
                and not os.environ.get('AWS_CONTAINER_CREDENTIALS_RELATIVE_URI') \
                and not subprocess.run(
                    ['curl', '-s', '--connect-timeout', '2',
                     'http://169.254.169.254/latest/meta-data/iam/security-credentials/'],
                    stdout=subprocess.PIPE).stdout:
            assert os.environ.get('AWS_ACCESS_KEY_ID', '') != '' and os.environ.get('AWS_SECRET_ACCESS_KEY', '') != '', \
                'No Amazon S3 access key was provided. Please refer to README.md.'
            assert os.environ.get('AWS_DEFAULT_REGION', '') != '', \
                'AWS access key specified with env vars, but no AWS_DEFAULT_REGION was set. Please refer to README.md.'
        c = config
        assert c and c.get(CONFIG_KEY, None), \
            'EC2 S3 Server Configuration was not provided under key "{}" in file at "{}". Please refer to README.md.' \
            .format(CONFIG_KEY, config_path)
        top_level = c[CONFIG_KEY]
        # Check the type before the length so a non-dict value fails with a clear message
        assert isinstance(top_level, dict), \
            'EC2 S3 Server Configuration was malformed under key "{}" in "{}". Expected dict, found {}' \
            .format(CONFIG_KEY, config_path, top_level.__class__.__name__)
        assert len(top_level) > 0, \
            'No component configurations were provided under key "{}" in "{}". Please refer to README.md.' \
            .format(CONFIG_KEY, config_path)
        for component_key, component_val in top_level.items():
            error_preamble = '{} Component {} in "{}":'.format(CONFIG_KEY, component_key, config_path)
            assert isinstance(component_val, dict), '{} Invalid config data. Expected dict, found {}' \
                .format(error_preamble, component_val.__class__.__name__)
            unsupported_keys = component_val.keys() - VALID_CONFIG_FILE_SETTINGS
            assert not unsupported_keys, '{} config was malformed; found unsupported keys: {}' \
                .format(error_preamble, unsupported_keys)
            assert component_val.get('encoder'), '{} Section "encoder" was not provided. Please refer to README.md.' \
                .format(error_preamble)
            s3b = component_val.get('s3_bucket')
            assert s3b and isinstance(s3b, str), '{} Amazon S3 bucket name must be provided and must be a string. Found: {}' \
                .format(error_preamble, s3b)
            s3p = component_val.get('s3_path')
            assert s3p and isinstance(s3p, str), '{} Amazon S3 path must be provided and must be a string. Found: {}' \
                .format(error_preamble, s3p)
            dprt = component_val.get('describe_port')
            if dprt:
                # Coerce to int up front so a non-numeric string fails the assertion
                # below instead of raising an unrelated ValueError
                try:
                    port_num = int(dprt)
                except (TypeError, ValueError):
                    port_num = -1
                assert isinstance(dprt, (str, int)) and 0 < port_num <= 65535, \
                    '{} Describe port must be a valid port number when provided. Found {}' \
                    .format(error_preamble, dprt)
            de = component_val.get('describe_endpoint')
            if de:
                assert isinstance(de, str), '{} Describe endpoint must be a string containing the path to describe when provided. Found {}' \
                    .format(error_preamble, de)
            denc = component_val.get('describe_encoding')
            if denc:
                try:
                    codecs.lookup(denc)
                except Exception as e:
                    raise Exception('{} Specified encoding "{}" was invalid'.format(error_preamble, denc)) from e
    def query(self):
        retVal = {'application': {'components': {}}}
        for component_key, component_conf in self.config[CONFIG_KEY].items():
            try:
                response = self.s3.get_object(Bucket=component_conf['s3_bucket'], Key=component_conf['s3_path'])
                describe_string = response['Body'].read().decode('utf-8')
            except self.s3.exceptions.NoSuchKey as e:
                # A missing object is reported but not fatal; describe proceeds with an empty value
                self.print_json_error(e.__class__.__name__, "failure", str(e))
                describe_string = ''
            retVal['application']['components'][component_key] = {
                'settings': self.encode_describe_if_needed(None, component_conf, describe_string)
            }
        return retVal
    def encode_describe_if_needed(self, name, data, value):
        """
        Overrides the adjust.py base method.
        Takes:
        * a setting name (unused, since this driver always requires an encoder)
        * setting data (as defined in the config)
        * a value (as returned by the underlying infrastructure)
        The driver config must specify an encoder; the returned "value" (in
        <setting_data>) is decoded from the value param by that encoder.
        Returns a dict in the format { <setting_name>: { <setting_data> } },
        suitable for returning as a description or for validating an adjustment,
        depending on how the encoder interprets the value param
        (e.g. the dotnet encoder will not return defaults if value is valid JSON).
        """
        import encoders.base as enc
        return enc.describe(data["encoder"], value)
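
    # A hedged sketch of what encode_describe_if_needed returns (the setting
    # name is illustrative; real keys depend on the configured encoder):
    #   { 'GCServer': { 'value': 1 } }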
    def adjust(self, data=None):
        assert data, 'Received no settings to adjust.'
        import encoders.base as enc
        prog_weight = (1 / len(self.config[CONFIG_KEY])) * 100
        prog_half_weight = .5 * prog_weight
        self.progress = 0
        for component_key in self.config[CONFIG_KEY]:
            settings = data['application']['components'].get(component_key, {}).get('settings')
            if not settings:
                self.progress += prog_weight
                continue  # no settings for this component
            self.progress_message = "adjusting {} component s3 file. Encoding settings".format(component_key)
            self.print_progress()
            script = enc.encode(self.config[CONFIG_KEY][component_key]['encoder'], settings)[0]
            self.progress += prog_half_weight
            self.progress_message = "encoded settings, uploading script to bucket"
            self.print_progress()
            self.s3.put_object(
                Body=script.encode(),
                Bucket=self.config[CONFIG_KEY][component_key]['s3_bucket'],
                Key=self.config[CONFIG_KEY][component_key]['s3_path']
            )
            self.progress += prog_half_weight
        self.progress = 100
        self.progress_message = "adjust complete"
        self.print_progress()
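
    # A hedged sketch of the payload adjust() expects (the component and
    # setting names are illustrative); query() returns the same overall shape:
    #   { 'application': { 'components': {
    #       'web': { 'settings': { 'GCServer': { 'value': 1 } } }
    #   } } }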
    # This method is returned by the static get_validator() method.
    # Requires a list of dicts:
    #   [ { "PublicIpAddress": ip,
    #       "InstanceId": id },
    #     ... ]
    # Unlike the other validators, this returns a list of failing instances to
    # facilitate their termination (see the usage sketch under get_validator below)
    def get_failed_adjust_insts(self, component_key, described_instances, settings_to_verify):
        endpoint_frmtstr = 'http://{{}}:{}/{}'.format(
            self.config[CONFIG_KEY][component_key].get('describe_port', 8080),
            self.config[CONFIG_KEY][component_key].get('describe_endpoint', 'describe.json')
        )
        failed_instances = []
        for i in described_instances:
            desc_endpoint = endpoint_frmtstr.format(i["PublicIpAddress"])
            try:
                describe_resp = requests.get(url=desc_endpoint, timeout=2)
            except (requests.exceptions.ConnectionError, requests.exceptions.Timeout):
                # Timeouts and refused connections are expected on fresh instances
                # whose describe endpoint is not up yet; treat them as failures
                failed_instances.append(i["InstanceId"])
                continue
            if describe_resp.status_code != 200:
                failed_instances.append(i["InstanceId"])
                continue
            raw_data = describe_resp.content
            encoding = self.config[CONFIG_KEY][component_key].get('describe_encoding', 'utf-8')
            describe_data = raw_data.decode(encoding)
            # The failures we're concerned with here are limited to deployment issues,
            # such as 500 status codes or the JSON never becoming available.
            # Misaligned settings indicate some other type of issue, so they raise an error instead.
            desc_settings = self.encode_describe_if_needed(None, self.config[CONFIG_KEY][component_key], describe_data)
            for key in desc_settings:
                if settings_to_verify.get(key) and desc_settings[key]['value'] != settings_to_verify[key]['value']:
                    raise Exception('{} validation failed for component {} setting {}: describe endpoint value {} did not match input value {}'.format(
                        CONFIG_KEY,
                        component_key,
                        key,
                        desc_settings[key]['value'],
                        settings_to_verify[key]['value']
                    ))
        return failed_instances
    # Returns a callback to be used by external drivers.
    # ec2s3 does not manage server lifecycle, so it implements this callback
    # to allow pluggability with external lifecycle hooks.
    # E.g. servo-ec2win supports validation of ASG instances after readiness
    # checks to ensure the settings were applied.
    @staticmethod
    def get_validator():
        # These args are irrelevant during validation
        driver = Ec2S3Driver(cli_desc=DESC, supports_cancel=False, version=VERSION)
        return driver.get_failed_adjust_insts
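
    # A hedged usage sketch for an external lifecycle driver such as
    # servo-ec2win (the IP, instance id, component, and setting are illustrative):
    #   validate = Ec2S3Driver.get_validator()
    #   failed = validate(
    #       'web',
    #       [{'PublicIpAddress': '203.0.113.10', 'InstanceId': 'i-0123456789abcdef0'}],
    #       {'GCServer': {'value': 1}}
    #   )
    #   # `failed` lists the InstanceIds whose describe endpoint did not respond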

if __name__ == '__main__':
    driver = Ec2S3Driver(cli_desc=DESC, supports_cancel=HAS_CANCEL, version=VERSION)
    driver.run()