-
Notifications
You must be signed in to change notification settings - Fork 95
/
rabbit-repoid.py
executable file
·168 lines (142 loc) · 6.03 KB
/
rabbit-repoid.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
#!/usr/bin/python3
import argparse
import glob
import json
import logging
import random
import shlex
import subprocess
from urllib.error import HTTPError

from lxml import etree as ET

import osc
from osc.core import http_GET, makeurl
from osclib.core import target_archs
from osclib.PubSubConsumer import PubSubConsumer
class Listener(PubSubConsumer):
    """AMQP listener that tracks OBS repository build state and commits the
    per-architecture build ids of finished repositories to a git checkout
    (one ``<project>_-_<repository>.yaml`` file per repository)."""

    def __init__(self, apiurl, amqp_prefix, namespaces):
        """
        :param apiurl: OBS API URL used for build-status queries
        :param amqp_prefix: routing-key prefix (e.g. 'suse' or 'opensuse')
        :param namespaces: project-name prefixes this listener cares about
        """
        super().__init__(amqp_prefix, logging.getLogger(__name__))
        self.apiurl = apiurl
        self.amqp_prefix = amqp_prefix
        self.namespaces = namespaces
        # repos to check on startup (seeded from existing YAML state files)
        self.repositories_to_check = []
        # repos seen dirty/scheduling; re-polled periodically until settled
        self.repositories_to_monitor = set()

    def interval(self):
        """Tighten the heartbeat to 5s while there is outstanding work."""
        if self.repositories_to_check or self.repositories_to_monitor:
            return 5
        return super().interval()

    def still_alive(self):
        """Heartbeat hook: also drain a slice of the pending repo queue."""
        super().still_alive()
        self.check_some_repos()

    def routing_keys(self):
        """Subscribe only to repository build-finished events."""
        return [self.amqp_prefix + '.obs.repo.build_finished']

    def check_arch(self, project, repository, architecture):
        """Return the buildid if project/repository/architecture finished
        building, else None; in-flux repos are remembered for re-polling."""
        url = makeurl(self.apiurl, [
            'build', project, repository, architecture], {'view': 'status'})
        root = ET.parse(http_GET(url)).getroot()
        if root.get('code') == 'finished':
            buildid = root.find('buildid')
            if buildid is not None:
                return buildid.text
        # lazy %-args (the original mixed an f-string with %-formatting)
        self.logger.info('%s/%s/%s: code=%s dirty=%s', project, repository,
                         architecture, root.get('code'),
                         root.get('dirty', 'false'))
        if root.get('code') == 'scheduling' or root.get('dirty', 'false') == 'true':
            # still building - revisit later from still_alive()
            self.repositories_to_monitor.add(f'{project}/{repository}')

    def check_all_archs(self, project, repository):
        """Return {arch: buildid} once every architecture of the repository
        has finished building, else None."""
        ids = {}
        try:
            archs = target_archs(self.apiurl, project, repository)
        except HTTPError:
            # repository does not (or no longer) exist - nothing to record
            return None
        for arch in archs:
            repoid = self.check_arch(project, repository, arch)
            if not repoid:
                self.logger.info(f'{project}/{repository}/{arch} not yet done')
                return None
            ids[arch] = repoid
        self.logger.info(f'All of {project}/{repository} finished')
        return ids

    def is_part_of_namespaces(self, project):
        """Return True if the project falls under a watched namespace."""
        return any(project.startswith(namespace)
                   for namespace in self.namespaces)

    def start_consuming(self):
        """(Re-)connected to the bus: rebuild the initial work queue from
        the YAML state files already present in the checkout."""
        for namespace in self.namespaces:
            for state in glob.glob(f'{namespace}*.yaml'):
                state = state.replace('.yaml', '')
                # file names are encoded as <project>_-_<repository>.yaml
                project, repository = state.split('_-_')
                self.repositories_to_check.append([project, repository])
        self.check_some_repos()
        super().start_consuming()

    def check_some_repos(self):
        """Process at most 15 repositories per call so heartbeats and bus
        messages are not starved by a long backlog."""
        count = 0
        limit = 15
        while self.repositories_to_check:
            project, repository = self.repositories_to_check.pop()
            self.logger.debug(f"Check repo {project}/{repository}")
            self.update_repo(project, repository)
            count += 1
            if count >= limit:
                return
        # shuffle to avoid starvation of the repos freshly added
        repos = list(self.repositories_to_monitor)
        random.shuffle(repos)
        for entry in repos:
            self.repositories_to_monitor.discard(entry)
            project, repository = entry.split('/')
            self.logger.debug(f"Recheck repo {project}/{repository}")
            self.update_repo(project, repository)
            count += 1
            if count >= limit:
                return

    def push_git(self, message):
        """Commit and push any local changes with the given commit message.

        The message is derived from bus payloads, so shell-quote it before
        interpolation; the command itself needs the shell for && / || / >.
        """
        cmd = 'git add . && git diff --exit-code --quiet HEAD || ( git commit -m {} && git push ) > /dev/null'
        subprocess.run(cmd.format(shlex.quote(message)), shell=True, check=True)

    def update_repo(self, project, repository):
        """Write the repository's YAML state file (one ``arch: buildid``
        line per architecture, sorted) and push it - but only once all
        architectures have finished."""
        ids = self.check_all_archs(project, repository)
        if not ids:
            return
        pathname = project + '_-_' + repository + '.yaml'
        with open(pathname, 'w') as f:
            for arch in sorted(ids.keys()):
                f.write(f'{arch}: {ids[arch]}\n')
        self.push_git(f'Repository update: {project}/{repository}')

    def on_message(self, unused_channel, method, properties, body):
        """Dispatch one bus message; non-JSON bodies are silently ignored."""
        self.logger.debug("on_message")
        self.acknowledge_message(method.delivery_tag)
        try:
            body = json.loads(body)
        except ValueError:
            return
        if method.routing_key.endswith('.obs.repo.build_finished'):
            if not self.is_part_of_namespaces(body['project']):
                return
            self.restart_timer()
            self.logger.info(f"Repo finished event: {body['project']}/{body['repo']}/{body['arch']}")
            self.update_repo(body['project'], body['repo'])
        else:
            self.logger.warning(
                f'unknown rabbitmq message {method.routing_key}')
if __name__ == '__main__':
    # Command-line interface: which OBS instance to talk to, verbosity,
    # and the list of project namespaces to watch.
    parser = argparse.ArgumentParser(
        description='Monitor to commit repo status to git (for gocd trigger)')
    parser.add_argument('--apiurl', '-A', type=str, help='API URL of OBS')
    parser.add_argument('-d', '--debug', action='store_true', default=False,
                        help='enable debug information')
    parser.add_argument('namespaces', nargs='*', help='namespaces to wait for')
    args = parser.parse_args()

    # Let osc resolve the effective API URL (config file vs. override).
    osc.conf.get_config(override_apiurl=args.apiurl)
    osc.conf.config['debug'] = args.debug
    apiurl = osc.conf.config['apiurl']

    # Internal instances live under suse.de; everything else is public.
    amqp_prefix = 'suse' if apiurl.endswith('suse.de') else 'opensuse'

    logging.basicConfig(level=logging.DEBUG if args.debug else logging.INFO)

    listener = Listener(apiurl, amqp_prefix, args.namespaces)
    try:
        listener.run(10800)
    except KeyboardInterrupt:
        listener.stop()