-
Notifications
You must be signed in to change notification settings - Fork 6
/
tools.py
159 lines (137 loc) · 6.26 KB
/
tools.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
from configparser import ConfigParser
import subprocess as sp
from sys import exit
import errno
import logging
from os import getcwd, environ, mkdir
import time
import datetime
import re
# Constants
CONFIGFILE = 'config.ini'
URL = 'url'
DURL = 'download_url'
IDIR = 'install_dir'
SCRIPTSDIR = 'scripts'
PIPELINE_BASE_BRANCH = 'master'
logging.basicConfig(level=logging.DEBUG)
l = logging.getLogger(__name__)
class GECKO_VM:
"""Interact with configuration variables."""
HAS_CHANGES = False
config = ConfigParser()
RUNNER_WORKSPACE = getcwd() + "/"
def __init__(self):
self.config.read(CONFIGFILE)
if len(self.config.sections()) == 0:
l.critical('Missing files, check {}'.format(CONFIGFILE))
exit(errno.ENOENT)
def version(self, section, new_version=None):
if new_version:
self.config[section]['version'] = new_version
else:
return self.config[section]['version']
def base_dir(self):
return self.config['BASE'][IDIR]
def install_dir(self, section):
return self.config['BASE'][IDIR] + self.config[section][IDIR]
def gems(self):
gems = []
for section in self.config.sections():
if self.config.has_option(section, 'type'):
gems.append(section)
return gems
def cleanup(self, section, subdir=''):
sp.check_output(['rm', '-rf', self.install_dir(section) + subdir])
l.info('Have removed {}'.format(self.install_dir(section) + subdir))
def scripts(self, gem):
return '{}/{}'.format(gem, SCRIPTSDIR)
def model_file_location(self, gem):
return self.install_dir(gem) + self.config[gem]['model_filename']
def mat_model(self, gem):
return self.config[gem]['mat_model']
def pr_target(self):
return self.config['BASE']['pull_request_target']
def git_clone(self, section, branch='master'):
cmd = sp.check_output(['git', 'clone', self.config[section][URL], '--depth', '1', '--branch', branch, self.install_dir(section)])
l.info(cmd.decode('utf-8'))
def download(self, gem):
if self.config[gem][DURL]:
mkdir(self.install_dir(gem))
cmd = sp.check_output(['curl', self.config[gem][DURL] , '-O'], cwd=self.install_dir(gem))
l.info(cmd.decode('utf-8'))
time_seconds = time.time()
timestamp = datetime.datetime.fromtimestamp(time_seconds).strftime('%Y-%m-%d-%H-%M')
return timestamp
else:
self.git_clone(gem)
return self.git_tag(gem)
def git_tag(self, thing):
cmd = sp.Popen(['git', 'describe', '--tags'], cwd=self.install_dir(thing), stdout=sp.PIPE)
tool_version, _ = cmd.communicate()
return tool_version.decode('utf-8').strip()
def git_checkout(self, gem):
sp.check_output(['git', 'checkout', '-B', self.__branch_name(gem)])
def git_add_and_pr(self, gem, matlab_output):
sp.check_output(['git', 'add', gem])
sp.check_output(['git', 'add', 'config.ini'])
try:
# If nothing was addded (no changes) the commit will exit with an error so we can delete the branch
cmd = sp.check_output(['git', 'commit', '-m', 'chore: update {} based on {}'.format(gem, self.version(gem))])
l.info(cmd.decode('utf-8'))
l.critical('Will push and create PR')
# Create PR and also push
pr_filename = "/tmp/githubpr"
with open(pr_filename, "w") as f:
f.write("update {} based on {}\n\n".format(gem, self.version(gem)))
f.write("```matlab\n")
f.write(matlab_output)
f.write("\n```\n")
my_env = environ.copy()
cmd = sp.check_output(['hub', 'pull-request', '-F', pr_filename, '-b', self.pr_target(), '-p'], env=my_env)
l.info(cmd.decode('utf-8'))
except sp.CalledProcessError:
l.critical('While upgrading {} to {} no changes were detected'.format(gem, self.version(gem)))
finally:
l.info('Checking out {}'.format(self.PIPELINE_BASE_BRANCH))
cmd = sp.check_output(['git', 'checkout', '-f', self.PIPELINE_BASE_BRANCH])
l.info(cmd.decode('utf-8'))
def check_dependencies(self):
# Check Matlab version
cmd = sp.check_output(['/usr/local/bin/matlab', '-nodisplay -nosplash -nodesktop -batch', '"disp(version); quit;"'])
m_version = re.findall(r'([R]\d{4}\w+)', cmd.decode('utf-8'))[0]
if m_version != self.version('MATLAB'):
l.warning('MATLAB changed from {} to {}'.format(self.version('MATLAB'), m_version))
self.version('MATLAB', m_version)
else:
l.info('MATLAB is still {}'.format(m_version))
# Check libSBML, Gurobi version; these version files have been manually created
for tool in ['libSBML', 'Gurobi']:
with open(self.install_dir(tool) + 'VERSION.txt') as f:
tool_version = f.readline().strip()
if self.version(tool) != tool_version:
self.HAS_CHANGES = True
l.warning('{} changed from {} to {}'.format(tool, self.version(tool), tool_version))
self.version(tool, tool_version)
else:
l.info('{} is still {}'.format(tool, tool_version))
# Check COBRA, RAVEN, GECKO versions
self.cleanup('GECKO')
self.git_clone('GECKO')
for tool in ['COBRA', 'RAVEN', 'GECKO']:
tool_version = self.git_tag(tool)
if tool_version != self.version(tool):
l.warning('{} changed from {} to {}'.format(tool, self.version(tool), tool_version))
self.HAS_CHANGES = True
self.version(tool, tool_version)
else:
l.info('{} is still {}'.format(tool, tool_version))
# Cleanup dummy GECKO install
self.cleanup('GECKO')
def save_config(self):
with open(CONFIGFILE, 'w') as configfile:
self.config.write(configfile)
def __branch_name(self, gem):
time_seconds = time.time()
timestamp = datetime.datetime.fromtimestamp(time_seconds).strftime('%Y-%m-%d-%H-%M-%S')
return 'update/{}/{}/{}'.format(gem, self.version(gem), timestamp)