rabbit-openqa.py — executable file, 392 lines (332 loc), 14.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
#!/usr/bin/python3
import argparse
import logging
import json
import osc
import re
from osc.core import http_GET, http_POST, makeurl
from osclib.conf import Config
from osclib.stagingapi import StagingAPI
from lxml import etree as ET
from openqa_client.client import OpenQA_Client
from packaging import version
from urllib.error import HTTPError
from urllib.parse import quote_plus
import requests
from osclib.PubSubConsumer import PubSubConsumer
class Project(object):
    """An OBS project with a staging workflow whose published media are
    tested in openQA.

    Tracks the published ISOs/images of each staging sub-project and posts
    openQA job results back to OBS as status_reports checks. The mapping
    between published filenames and openQA names comes from the project's
    OSRT:OpenQAMapping attribute (a sed-style 's/<regexp>/<replacement>/'
    string).
    """

    def __init__(self, name):
        self.name = name
        # NOTE(review): relies on the module-level `apiurl` global being set
        # by the __main__ section before any Project is constructed.
        Config(apiurl, name)
        self.api = StagingAPI(apiurl, name)
        self.staging_projects = dict()
        self.listener = None
        self.logger = logging.getLogger(__name__)
        # sed-style mapping expression used by map_iso()
        self.replace_string = self.api.attribute_value_load('OpenQAMapping')

    def init(self):
        """Fetch the initial state of all non-adi staging projects.

        Returns the set of staging project names that are now tracked.
        """
        projects = set()
        for project in self.api.get_staging_projects():
            if self.api.is_adi_project(project):
                continue
            self.staging_projects[project] = self.initial_staging_state(project)
            projects.add(project)
        return projects

    def staging_letter(self, name):
        """Return the staging suffix, e.g. 'A' for 'X:Staging:A'."""
        return name.split(':')[-1]

    def map_iso(self, staging_project, iso):
        """Map a published filename to the name openQA knows it under.

        Applies the sed-style OpenQAMapping expression; '$LETTER' in the
        replacement expands to the staging letter. Returns None when the
        expression is not a valid regexp or did not change the name.
        Raises when the mapping string does not start with 's/'.
        """
        parts = self.replace_string.split('/')
        if parts[0] != 's':
            raise Exception(f"{self.name}'s iso_replace_string does not start with s/")
        pattern = parts[1]
        replacement = parts[2].replace('$LETTER', self.staging_letter(staging_project))
        try:
            stagingiso = re.compile(pattern).sub(replacement, iso)
        except re.error:
            self.logger.error(f"_MAP_ISO {self.replace_string} does not create valid regexps in {self.name}")
            return None
        if stagingiso == iso:
            self.logger.info(f"{self.replace_string} did not map {iso} properly, ignoring")
            return None
        return stagingiso

    def gather_isos(self, name, repository):
        """Collect the mapped names of all published media of one staging
        project (ISOs plus qcow2/raw.xz/spdx.json images)."""
        iso_set = set()
        # Fetch /published/prj/repo/iso/*.iso
        url = self.api.makeurl(['published', name, repository, 'iso'])
        root = ET.parse(self.api.retried_GET(url)).getroot()
        for entry in root.findall('entry'):
            filename = entry.get('name')
            if filename.endswith('.iso'):
                iso_set.add(self.map_iso(name, filename))
        # Fetch /published/prj/repo/*.{qcow2,raw.xz,spdx.json}
        url = self.api.makeurl(['published', name, repository])
        root = ET.parse(self.api.retried_GET(url)).getroot()
        for entry in root.findall('entry'):
            filename = entry.get('name')
            if filename.endswith(('.qcow2', '.raw.xz', '.spdx.json')):
                iso_set.add(self.map_iso(name, filename))
        # map_iso() returns None for unmappable names; filter those out
        return [iso for iso in iso_set if iso]

    def gather_buildid(self, name, repository):
        """Return the buildid of the published repository, or None."""
        url = self.api.makeurl(['published', name, repository], {'view': 'status'})
        f = self.api.retried_GET(url)
        # renamed from `id` to avoid shadowing the builtin
        buildid = ET.parse(f).getroot().find('buildid')
        if buildid is not None:
            return buildid.text

    def initial_staging_state(self, name):
        """Return the tracked state ('isos', 'id') of one staging project."""
        return {'isos': self.gather_isos(name, 'images'),
                'id': self.gather_buildid(name, 'images')}

    def fetch_openqa_jobs(self, staging, iso, openqa_infos):
        """Collect info about all openQA jobs for `iso` into `openqa_infos`,
        keyed by openQA job id."""
        openqa = self.listener.jobs_for_iso(iso)
        # collect job infos to pick names
        for job in openqa:
            print(staging, iso, job['id'], job['state'], job['result'],
                  job['settings']['FLAVOR'], job['settings']['TEST'], job['settings']['MACHINE'])
            openqa_infos[job['id']] = {
                'url': self.listener.test_url(job),
                'state': self.map_openqa_result(job),
                'build': job['settings']['BUILD'],
                'name': f"{job['settings']['FLAVOR']}-{job['settings']['TEST']}@{job['settings']['MACHINE']}",
            }

    # These two are called as Project.compare_*(...) below; mark them as
    # static methods so they also work when invoked on an instance.
    @staticmethod
    def compare_simple_builds(build1, build2):
        """Simple build number comparison; returns -1/0/1 like cmp()."""
        ver1 = version.parse(build1)
        ver2 = version.parse(build2)
        if ver1 < ver2:
            return -1
        if ver1 > ver2:
            return 1
        return 0

    @staticmethod
    def compare_composite_builds(build1, build2):
        """Compare BUILD numbers consisting of multiple _-separated components.

        Raises when the two formats differ or the component-wise comparison
        is not consistently ordered.
        """
        components1 = build1.split('_')
        components2 = build2.split('_')
        if len(components1) != len(components2):
            raise Exception(f'Failed to compare {build1} and {build2}: Different format')
        component_cmps = [Project.compare_simple_builds(c1, c2)
                          for c1, c2 in zip(components1, components2)]
        less = -1 in component_cmps
        greater = 1 in component_cmps
        if less and greater:
            raise Exception(f'Failed to compare {build1} and {build2}: Not ordered')
        if less:
            return -1
        if greater:
            return 1
        return 0

    def update_staging_status(self, staging):
        """Post check reports for all openQA jobs of `staging` to OBS."""
        openqa_infos = dict()
        for iso in self.staging_projects[staging]['isos']:
            self.fetch_openqa_jobs(staging, iso, openqa_infos)
        buildid = self.staging_projects[staging].get('id')
        if not buildid:
            self.logger.info("I don't know the build id of " + staging)
            return
        # all openQA jobs are created at the same URL
        url = self.api.makeurl(['status_reports', 'published', staging, 'images', 'reports', buildid])
        # make sure the names are unique
        obsolete_jobs = []
        taken_names = dict()
        for job_id in openqa_infos:
            name = openqa_infos[job_id]['name']
            if name in taken_names:
                # There are multiple jobs with that specific FLAVOR-TEST@MACHINE.
                # In SLE Micro, jobs currently use BUILD=(dvdbuild)_(image_build),
                # so if the dvd is rebuilt, new image jobs are triggered for the
                # same binary. The openQA ?latest=1 filter doesn't look at that,
                # so we have to figure out which of those is the most recent one.
                build1 = openqa_infos[taken_names[name]]['build']
                build2 = openqa_infos[job_id]['build']
                if '_' in build1 and '_' in build2 and build1 != build2:
                    # Use the more recent build
                    buildcmp = Project.compare_composite_builds(build1, build2)
                    self.logger.info(f'Multiple builds for {name}, {build1} and {build2}. Comparison: {buildcmp}')
                    if buildcmp < 0:  # Drop the previous one
                        obsolete_jobs.append(taken_names[name])
                        taken_names[name] = job_id
                        continue
                    elif buildcmp > 0:  # Drop this one
                        obsolete_jobs.append(job_id)
                        continue
                raise Exception(f'Names of job #{job_id} and #{taken_names[name]} collide: {name}')
            taken_names[name] = job_id
        for job_id in obsolete_jobs:
            del openqa_infos[job_id]
        for info in openqa_infos.values():
            xml = self.openqa_check_xml(info['url'], info['state'], 'openqa:' + info['name'])
            try:
                if self.listener.dryrun:
                    print(f"Would POST to {url}: {xml}")
                else:
                    http_POST(url, data=xml)
            except HTTPError:
                self.logger.error('failed to post status to ' + url)

    def update_staging_buildid(self, project, repository, buildid):
        """Refresh tracked state after a repository publish and push status."""
        self.staging_projects[project]['id'] = buildid
        self.staging_projects[project]['isos'] = self.gather_isos(project, repository)
        self.update_staging_status(project)

    def check_published_repo(self, project, repository, buildid):
        """Handle an OBS repo-published event; only 'images' repos of
        tracked staging projects are of interest."""
        if repository != 'images':
            return
        if project in self.staging_projects:
            self.update_staging_buildid(project, repository, buildid)

    def matching_project(self, iso):
        """Return the staging project that publishes `iso`, or None."""
        for project in self.staging_projects:
            if iso in self.staging_projects[project]['isos']:
                return project

    def map_openqa_result(self, job):
        """Map an openQA job result to an OBS check state."""
        if job['result'] in ['passed', 'softfailed']:
            return 'success'
        if job['result'] == 'none':
            return 'pending'
        return 'failure'

    def openqa_job_change(self, iso):
        """Handle an openQA job event by refreshing the matching staging."""
        staging = self.matching_project(iso)
        if not staging:
            return
        # we fetch all openqa jobs so we can avoid long job names
        self.update_staging_status(staging)

    def openqa_check_xml(self, url, state, name):
        """Serialize one check report as XML for the status_reports API."""
        check = ET.Element('check')
        ET.SubElement(check, 'url').text = url
        ET.SubElement(check, 'state').text = state
        ET.SubElement(check, 'name').text = name
        return ET.tostring(check)
class Listener(PubSubConsumer):
    """AMQP consumer that relays OBS repo-published and openQA job events
    to the registered Project instances."""

    def __init__(self, amqp_prefix, openqa_url, dryrun):
        super(Listener, self).__init__(amqp_prefix, logging.getLogger(__name__))
        self.projects = []
        self.amqp_prefix = amqp_prefix
        self.openqa_url = openqa_url
        self.dryrun = dryrun
        self.openqa = OpenQA_Client(server=openqa_url)
        self.projects_to_check = set()

    def routing_keys(self):
        """Routing keys to subscribe to: repo publishes + openQA job events."""
        suffixes = ['.obs.repo.published', '.openqa.job.done',
                    '.openqa.job.create', '.openqa.job.restart']
        return [self.amqp_prefix + suffix for suffix in suffixes]

    def add(self, project):
        """Register a Project and hand it a back-reference to the listener."""
        project.listener = self
        self.projects.append(project)

    def start_consuming(self):
        """(Re-)fetch the initial state of all projects, then consume."""
        # now we are (re-)connected to the bus and need to fetch the
        # initial state
        self.projects_to_check = set()
        for project in self.projects:
            try:
                self.logger.info('Fetching ISOs of %s', project.name)
                for sproj in project.init():
                    self.projects_to_check.add((project, sproj))
            except HTTPError as e:
                if e.code == 404:
                    # No staging workflow? Have to protect against "rogue" projects
                    self.logger.error('Failed to load staging projects')
                    continue
                raise
        self.logger.info('Finished fetching initial ISOs, listening')
        super(Listener, self).start_consuming()

    def interval(self):
        """Poll quickly while initial work is still queued."""
        if self.projects_to_check:
            return 5
        return super(Listener, self).interval()

    def check_some_projects(self):
        """Process up to 5 queued (project, staging) pairs per invocation."""
        limit = 5
        for _ in range(limit):
            if not self.projects_to_check:
                return
            project, staging = self.projects_to_check.pop()
            project.update_staging_status(staging)

    def still_alive(self):
        self.check_some_projects()
        super(Listener, self).still_alive()

    def is_production_job(self, job):
        """True for regular jobs; PR verification runs carry a '/' in BUILD
        (missing BUILD or group, or 'Development' groups, also disqualify)."""
        if '/' in job['settings'].get('BUILD', '/') or \
           'group' not in job or 'Development' in job['group']:
            return False
        return True

    def jobs_for_iso(self, iso):
        """Query openQA for current production jobs of `iso`."""
        # Try ISO= matching first
        values = {
            'iso': iso,
            'scope': 'current',
            'latest': '1',
        }
        jobs = self.openqa.openqa_request('GET', 'jobs', values)['jobs']
        # If no matches, try HDD_1=
        if len(jobs) == 0:
            del values['iso']
            values['hdd_1'] = iso
            jobs = self.openqa.openqa_request('GET', 'jobs', values)['jobs']
        # Ignore PR verification runs (and jobs without 'BUILD')
        return [job for job in jobs if self.is_production_job(job)]

    def get_step_url(self, testurl, modulename):
        """Deep-link to the first failed step of a failed test module."""
        failurl = testurl + f'/modules/{quote_plus(modulename)!s}/fails'
        fails = requests.get(failurl).json()
        failed_step = fails.get('first_failed_step', 1)
        return f"{testurl!s}#step/{modulename!s}/{failed_step:d}"

    def test_url(self, job):
        """URL of a job; points at the first failed module if it failed."""
        url = self.openqa_url + ("/tests/%d" % job['id'])
        if job['result'] == 'failed':
            for module in job['modules']:
                if module['result'] == 'failed':
                    return self.get_step_url(url, module['name'])
        return url

    def on_published_repo(self, payload):
        for p in self.projects:
            p.check_published_repo(str(payload['project']), str(payload['repo']), str(payload['buildid']))

    def on_openqa_job(self, iso):
        self.logger.debug('openqa_job_change %s', iso)
        for p in self.projects:
            p.openqa_job_change(iso)

    def on_message(self, unused_channel, method, properties, body):
        self.acknowledge_message(method.delivery_tag)
        # Fix: use self.amqp_prefix instead of relying on the module-level
        # global of the same name being set by __main__.
        if method.routing_key == f'{self.amqp_prefix}.obs.repo.published':
            self.on_published_repo(json.loads(body))
        # Fix: literal substring test instead of re.search(r'.openqa.'),
        # whose unescaped dots matched any character.
        elif '.openqa.' in method.routing_key:
            data = json.loads(body)
            # Fix: default to '' so messages without BUILD don't raise
            # TypeError ('/' in None).
            if '/' in data.get('BUILD', ''):
                return  # Ignore PR verification runs
            if data.get('ISO'):
                self.on_openqa_job(data.get('ISO'))
            elif data.get('HDD_1'):
                self.on_openqa_job(data.get('HDD_1'))
        else:
            self.logger.warning(f"unknown rabbitmq message {method.routing_key}")
if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description='Bot to sync openQA status to OBS')
    parser.add_argument("--apiurl", '-A', type=str, help='API URL of OBS')
    parser.add_argument('-d', '--debug', action='store_true', default=False,
                        help='enable debug information')
    parser.add_argument('--dry', action='store_true', default=False,
                        help='do not perform changes')
    args = parser.parse_args()

    osc.conf.get_config(override_apiurl=args.apiurl)
    osc.conf.config['debug'] = args.debug

    # apiurl and amqp_prefix deliberately stay module-level globals:
    # Project.__init__ reads the `apiurl` global directly.
    apiurl = osc.conf.config['apiurl']
    if apiurl.endswith('suse.de'):
        amqp_prefix = 'suse'
        openqa_url = 'https://openqa.suse.de'
    else:
        amqp_prefix = 'opensuse'
        openqa_url = 'https://openqa.opensuse.org'

    logging.basicConfig(level=logging.INFO)
    listener = Listener(amqp_prefix, openqa_url, dryrun=args.dry)

    # Register every project that carries the OSRT:OpenQAMapping attribute.
    url = makeurl(apiurl, ['search', 'project', 'id'],
                  {'match': 'attribute/@name="OSRT:OpenQAMapping"'})
    response = http_GET(url)
    search_root = ET.parse(response).getroot()
    for project_node in search_root.findall('project'):
        listener.add(Project(project_node.get('name')))

    try:
        listener.run(runtime=10800)
    except KeyboardInterrupt:
        listener.stop()