diff --git a/TODO b/TODO index b83b058..c72b9ea 100644 --- a/TODO +++ b/TODO @@ -1,8 +1,5 @@ * stats improvements - (todo) include something about currently-expiring quadtree. - (todo) include something about the state of the initialization - process. (todo) output lengths of important and missing queues. * queuemaster commands diff --git a/munin.py b/munin.py index 0087c23..5b6913e 100755 --- a/munin.py +++ b/munin.py @@ -82,4 +82,10 @@ def queue_sort(a, b): print 'multigraph toposm_queues' for q, v in message['queue'].items(): field = 'q{0}'.format(q) - print '{0}.value {1}'.format(field, v) + try: + if 'init' in message and message['init'] <= int(q): + print '{0}.value U'.format(field) + else: + print '{0}.value {1}'.format(field, v) + except ValueError: + print '{0}.value {1}'.format(field, v) diff --git a/queue_stats.py b/queue_stats.py index 64b46d3..898add5 100755 --- a/queue_stats.py +++ b/queue_stats.py @@ -10,7 +10,11 @@ from toposm import * def print_stats(s): - print 'expire queue: %s' % s['expire'] + print 'expire queue: %s' % s['expire']['input'] + if s['expire']['status']: + print 'currently expiring at zoom %s, %s tiles' % (s['expire']['status'][0], s['expire']['status'][1]) + if 'init' in s: + print 'currently initializing at zoom %s' % s['init'] print '' for renderer, status in s['render'].items(): print '%s: %s' % (renderer, status) diff --git a/queuemaster.py b/queuemaster.py index dd30862..76345c6 100755 --- a/queuemaster.py +++ b/queuemaster.py @@ -214,10 +214,14 @@ def __init__(self, maxz, queue): threading.Thread.__init__(self) self.maxz = maxz self.queue = queue + self.current_zoom = None + self.lock = threading.Lock() def run(self): log_message('Initializing queue.') for z in xrange(2, self.maxz + 1): + with self.lock: + self.current_zoom = z for root, dirs, files in os.walk(os.path.join(BASE_TILE_DIR, REFERENCE_TILESET, str(z))): dirty_tiles = [] for file in files: @@ -229,8 +233,14 @@ def run(self): dirty_tiles.sort() for t, z, x, y in dirty_tiles: self.queue.queue_tile(z, x, y, 'zoom', 'init') + with self.lock: + self.current_zoom = -1 log_message('Queue initialized.') + def get_status(self): + with self.lock: + return self.current_zoom + class TileExpirer(threading.Thread): def __init__(self, maxz, queue): @@ -239,6 +249,9 @@ def __init__(self, maxz, queue): self.queue = queue self.keep_running = True self.input_queue = collections.deque() + self.lock = threading.Lock() + self.current_expire = None + self.current_expire_zoom = None def run(self): while self.keep_running: @@ -256,7 +269,11 @@ def run(self): time.sleep(EXPIRE_SLEEP_INTERVAL) def process_expire(self, expire): + with self.lock: + self.current_expire = expire for z in xrange(self.maxz, 2 - 1, -1): + with self.lock: + self.current_expire_zoom = z for (x, y) in expire.expiredAt(z): tile_path = getTilePath(REFERENCE_TILESET, z, x, y) if path.isfile(tile_path): @@ -266,6 +283,9 @@ def process_expire(self, expire): if path.isfile(tile_path) and 'user.toposm_dirty' not in xattr.listxattr(tile_path): xattr.setxattr(tile_path, 'user.toposm_dirty', 'yes') self.queue.queue_tile(z, x, y, 'zoom', 'expire') + with self.lock: + self.current_expire = None + self.current_expire_zoom = None def add_expired(self, tile): z, x, y = [ int(i) for i in tile.split('/') ] @@ -273,13 +293,21 @@ def add_expired(self, tile): def get_input_length(self): return len(self.input_queue) - + + def get_expire_status(self): + with self.lock: + if self.current_expire: + return (self.current_expire_zoom, self.current_expire.countExpiredAt(self.current_expire_zoom)) + else: + return None + class Queuemaster: def __init__(self, maxz): self.maxz = maxz self.queue = Queue(self.maxz) + self.initializer = None self.expirer = TileExpirer(self.maxz, self.queue) self.expirer.start() self.renderers = {} @@ -313,8 +341,8 @@ def on_expire_declare(self,frame): def on_expire_bind(self,frame): self.channel.basic_consume(self.on_expire, queue='expire_toposm', exclusive=True, no_ack=True) - queue_filler = QueueFiller(self.maxz, self.queue) - queue_filler.start() + self.initializer = QueueFiller(self.maxz, self.queue) + self.initializer.start() time.sleep(QUEUE_FILL_DELAY) self.channel.queue_declare(self.on_command_declare, exclusive=True) @@ -371,9 +399,13 @@ def on_command(self, chan, method, props, body): chan.basic_ack(delivery_tag=method.delivery_tag) def get_stats(self): - return json.dumps({'queue': self.queue.get_stats(), - 'expire': self.expirer.get_input_length(), - 'render': {r.name: r.status for r in self.renderers.values()}}) + result = {'queue': self.queue.get_stats(), + 'expire': {'input': self.expirer.get_input_length(), + 'status': self.expirer.get_expire_status()}, + 'render': {r.name: r.status for r in self.renderers.values()}} + if self.initializer and self.initializer.is_alive(): + result['init'] = self.initializer.get_status() + return json.dumps(result) def add_renderer(self, message, queue): self.renderers[queue] = Renderer(message, self.queue, queue, self.channel) diff --git a/tileexpire.py b/tileexpire.py index 66d03b2..837bada 100644 --- a/tileexpire.py +++ b/tileexpire.py @@ -94,6 +94,15 @@ def markFull(self): self.full = True self.children = [None, None, None, None] + def countExpiredAt(self, targetz): + if targetz < self.z: + raise Exception('Cannot count expired tiles at zoom {0}, which is lower than my zoom {1}'.format(targetz, self.z)) + if targetz == self.z: + return 1 + if self.full: + return 4 ** (targetz - self.z) + return sum([c.countExpiredAt(targetz) for c in self.children if c]) + def expiredAt(self, targetz): """Yield (as a generator) all the expired tiles at the given zoom.""" return self._expiredAt(targetz, 0)