-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathtest_executor.py
74 lines (51 loc) · 1.45 KB
/
test_executor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
import json
import logging
import time
from mlboardclient.api import client
# Module-level logger named after this module.
LOG = logging.getLogger(__name__)
# Client bound to a hard-coded API endpoint on localhost.
m = client.Client('http://localhost:8082/api/v2')
# Fetch the '11-tfexample' app and its 'model' task; `task` is the template
# that generate_tasks() copies.
app = m.apps.get('11-tfexample')
task = app.task('model')
# Overwrite the 'args' entry of the template's 'worker' resource.
# NOTE(review): presumably every copy made later inherits these args -
# depends on how task.copy() is implemented; confirm.
task.resource('worker')['args'] = {'common': 'yes'}
def generate_tasks(num):
    """Return a list of ``num`` copies of the module-level template ``task``.

    Each copy has ``args['arg']`` of its 'worker' resource set to the
    copy's zero-based index in the returned list.
    """
    copies = []
    for index in range(num):
        clone = task.copy()
        worker_args = clone.resource('worker')['args']
        worker_args['arg'] = index
        copies.append(clone)
    return copies
# Script flow: submit a first batch of five task copies to an executor,
# print the worker logs of each task as it completes, wait for the executor,
# then submit a second batch of three tasks and wait again.
tasks = generate_tasks(5)
# NOTE(review): the argument 3 is presumably the executor's concurrency
# limit - confirm against the client library.
executor = m.tasks.new_executor(3)

# Spawn tasks
for t in tasks:
    executor.put(t)

more_tasks = generate_tasks(3)

# Check completed and checkout logs: poll until every task of the first
# batch has been reported and removed from `tasks`.
while tasks:
    # Iterate over a snapshot so completed tasks can be removed from `tasks`
    # without disturbing the iteration; every task that is already complete
    # is handled in the same pass.
    for t in list(tasks):
        if not t.completed:
            continue

        LOG.info('Task %s:%s logs:', t.name, t.build)
        logs = t.logs()
        # Keep only the log entries whose key contains 'worker'.
        worker_logs = [log for k, log in logs.items() if 'worker' in k]
        print(json.dumps(worker_logs, indent=2))
        tasks.remove(t)

    # Pause between polls only while something is still outstanding.
    if tasks:
        time.sleep(1)

executor.wait()

# Spawn more tasks
LOG.info('Spawn more tasks')
for tsk in more_tasks:
    executor.put(tsk)
    tasks.append(tsk)

# The second batch is only waited for; its logs are not printed.
executor.wait()