Skip to content

Commit

Permalink
Return info about expire and init threads with stats.
Browse files Browse the repository at this point in the history
  • Loading branch information
asciipip committed Jun 8, 2014
1 parent 846d5b1 commit f64a47b
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 11 deletions.
3 changes: 0 additions & 3 deletions TODO
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@

* stats improvements
(todo) include something about currently-expiring quadtree.
(todo) include something about the state of the initialization
process.
(todo) output lengths of important and missing queues.

* queuemaster commands
Expand Down
8 changes: 7 additions & 1 deletion munin.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,4 +82,10 @@ def queue_sort(a, b):
print 'multigraph toposm_queues'
for q, v in message['queue'].items():
field = 'q{0}'.format(q)
print '{0}.value {1}'.format(field, v)
try:
if 'init' in message and message['init'] <= int(q):
print '{0}.value U'.format(field)
else:
print '{0}.value {1}'.format(field, v)
except ValueError:
print '{0}.value {1}'.format(field, v)
6 changes: 5 additions & 1 deletion queue_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@
from toposm import *

def print_stats(s):
print 'expire queue: %s' % s['expire']
print 'expire queue: %s' % s['expire']['input']
if s['expire']['status']:
print 'currently expiring at zoom %s, %s tiles' % (s['expire']['status'][0], s['expire']['status'][1])
if 'init' in s:
print 'currently initializing at zoom %s' % s['init']
print ''
for renderer, status in s['render'].items():
print '%s: %s' % (renderer, status)
Expand Down
44 changes: 38 additions & 6 deletions queuemaster.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,10 +214,14 @@ def __init__(self, maxz, queue):
threading.Thread.__init__(self)
self.maxz = maxz
self.queue = queue
self.current_zoom = None
self.lock = threading.Lock()

def run(self):
log_message('Initializing queue.')
for z in xrange(2, self.maxz + 1):
with self.lock:
self.current_zoom = z
for root, dirs, files in os.walk(os.path.join(BASE_TILE_DIR, REFERENCE_TILESET, str(z))):
dirty_tiles = []
for file in files:
Expand All @@ -229,8 +233,14 @@ def run(self):
dirty_tiles.sort()
for t, z, x, y in dirty_tiles:
self.queue.queue_tile(z, x, y, 'zoom', 'init')
with self.lock:
self.current_zoom = -1
log_message('Queue initialized.')

def get_status(self):
with self.lock:
return self.current_zoom


class TileExpirer(threading.Thread):
def __init__(self, maxz, queue):
Expand All @@ -239,6 +249,9 @@ def __init__(self, maxz, queue):
self.queue = queue
self.keep_running = True
self.input_queue = collections.deque()
self.lock = threading.Lock()
self.current_expire = None
self.current_expire_zoom = None

def run(self):
while self.keep_running:
Expand All @@ -256,7 +269,11 @@ def run(self):
time.sleep(EXPIRE_SLEEP_INTERVAL)

def process_expire(self, expire):
with self.lock:
self.current_expire = expire
for z in xrange(self.maxz, 2 - 1, -1):
with self.lock:
self.current_expire_zoom = z
for (x, y) in expire.expiredAt(z):
tile_path = getTilePath(REFERENCE_TILESET, z, x, y)
if path.isfile(tile_path):
Expand All @@ -266,20 +283,31 @@ def process_expire(self, expire):
if path.isfile(tile_path) and 'user.toposm_dirty' not in xattr.listxattr(tile_path):
xattr.setxattr(tile_path, 'user.toposm_dirty', 'yes')
self.queue.queue_tile(z, x, y, 'zoom', 'expire')
with self.lock:
self.current_expire = None
self.current_expire_zoom = None

def add_expired(self, tile):
z, x, y = [ int(i) for i in tile.split('/') ]
self.input_queue.append((z, x, y))

def get_input_length(self):
return len(self.input_queue)


def get_expire_status(self):
with self.lock:
if self.current_expire:
return (self.current_expire_zoom, self.current_expire.countExpiredAt(self.current_expire_zoom))
else:
return None


class Queuemaster:

def __init__(self, maxz):
self.maxz = maxz
self.queue = Queue(self.maxz)
self.initializer = None
self.expirer = TileExpirer(self.maxz, self.queue)
self.expirer.start()
self.renderers = {}
Expand Down Expand Up @@ -313,8 +341,8 @@ def on_expire_declare(self,frame):
def on_expire_bind(self,frame):
self.channel.basic_consume(self.on_expire, queue='expire_toposm',
exclusive=True, no_ack=True)
queue_filler = QueueFiller(self.maxz, self.queue)
queue_filler.start()
self.initializer = QueueFiller(self.maxz, self.queue)
self.initializer.start()
time.sleep(QUEUE_FILL_DELAY)
self.channel.queue_declare(self.on_command_declare, exclusive=True)

Expand Down Expand Up @@ -371,9 +399,13 @@ def on_command(self, chan, method, props, body):
chan.basic_ack(delivery_tag=method.delivery_tag)

def get_stats(self):
return json.dumps({'queue': self.queue.get_stats(),
'expire': self.expirer.get_input_length(),
'render': {r.name: r.status for r in self.renderers.values()}})
result = {'queue': self.queue.get_stats(),
'expire': {'input': self.expirer.get_input_length(),
'status': self.expirer.get_expire_status()},
'render': {r.name: r.status for r in self.renderers.values()}}
if self.initializer and self.initializer.is_alive():
result['init'] = self.initializer.get_status()
return json.dumps(result)

def add_renderer(self, message, queue):
self.renderers[queue] = Renderer(message, self.queue, queue, self.channel)
Expand Down
9 changes: 9 additions & 0 deletions tileexpire.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,15 @@ def markFull(self):
self.full = True
self.children = [None, None, None, None]

def countExpiredAt(self, targetz):
if targetz < self.z:
raise Exception('Cannot count expired tiles at zoom {0}, which is lower than my zoom {1}'.format(targetz, self.z))
if targetz == self.z:
return 1
if self.full:
return 4 ** (targetz - self.z)
return sum([c.countExpiredAt(targetz) for c in self.children if c])

def expiredAt(self, targetz):
"""Yield (as a generator) all the expired tiles at the given zoom."""
return self._expiredAt(targetz, 0)
Expand Down

0 comments on commit f64a47b

Please sign in to comment.