Skip to content

Commit 5a46698

Browse files
committed
Whole bunch of related changes to Problem.
1 parent f9162b0 commit 5a46698

File tree

2 files changed

+118
-91
lines changed

2 files changed

+118
-91
lines changed

DAG Job Accounting.ipynb

+36-8
Large diffs are not rendered by default.

zephyr/Problem.py

+82-83
Original file line numberDiff line numberDiff line change
@@ -28,28 +28,65 @@ def setupSystem(scu):
2828
if 'cacheDir' in baseSystemConfig:
2929
subSystemConfig['cacheDir'] = os.path.join(baseSystemConfig['cacheDir'], 'cache', '%d-%d'%tag)
3030

31-
localLocator = Kernel.SeisLocator25D(subSystemConfig['geom'])
3231
localSystem[tag] = Kernel.SeisFDFDKernel(subSystemConfig, locator=localLocator)
3332

3433
return tag
3534

35+
# def blockOnTag(fn):
36+
# from IPython.parallel.error import UnmetDependency
37+
# def checkForSystem(*args, **kwargs):
38+
# if not args[0] in localSystem:
39+
# raise UnmetDependency
40+
41+
# return fn(*args, **kwargs)
42+
43+
# return checkForSystem
44+
45+
46+
@interactive
def setupCommon():
    """Build the locator object that is shared by the subsystems.

    Reads the global ``baseSystemConfig`` and stores a
    ``Kernel.SeisLocator25D`` built from its ``'geom'`` entry in the global
    ``localLocator``. ``setupSystem`` passes that global as the ``locator``
    argument of every ``Kernel.SeisFDFDKernel`` it constructs.
    """
    global baseSystemConfig, localLocator

    # Previously this read ``subSystemConfig`` (a name that only exists
    # inside setupSystem) and bound ``localLocator`` as a function local, so
    # the result was never visible to setupSystem.
    localLocator = Kernel.SeisLocator25D(baseSystemConfig['geom'])
51+
52+
@interactive
def clearFromTag(tag):
    """Call ``clear()`` on the subsystem stored under ``tag``.

    Looks the subsystem up in the global ``localSystem`` mapping and returns
    whatever its ``clear()`` method returns. A missing tag raises KeyError.
    """
    system = localSystem[tag]
    return system.clear()
55+
3656
@interactive
def forwardFromTagAccumulate(tag, isrc):
    """Run one forward solve on subsystem ``tag`` and hand it to the tracker.

    ``tag`` is indexed as ``tag[0]``; its first element, paired with
    ``isrc``, forms the key under which the result is reported to the global
    ``resultTracker``.

    Raises ``UnmetDependency`` when ``tag`` is not present in the global
    ``localSystem`` mapping. The check is done inline here; the decorator
    form (``blockOnTag``) is currently commented out in this module.
    """
    from IPython.parallel.error import UnmetDependency

    if tag not in localSystem:
        raise UnmetDependency

    key = (tag[0], isrc)
    field = localSystem[tag].forward(isrc, True)
    resultTracker(key, field)
4365

4466
@interactive
def forwardFromTagAccumulateAll(tag, isrcs):
    """Run ``forwardFromTagAccumulate`` for every source index in ``isrcs``.

    Raises ``UnmetDependency`` up front when ``tag`` is not present in the
    global ``localSystem`` mapping, before any source is processed. The
    sources are then handled one at a time, in the order given.
    """
    from IPython.parallel.error import UnmetDependency

    if tag not in localSystem:
        raise UnmetDependency

    for sourceIndex in isrcs:
        forwardFromTagAccumulate(tag, sourceIndex)
5276

77+
# @interactive
78+
# @blockOnTag
79+
# def backpropFromTagAccumulate(tag, isrc):
80+
81+
82+
83+
# @interactive
84+
# @blockOnTag
85+
# def backpropFromTagAccumulateAll(tag, isrcs):
86+
87+
# for isrc in isrcs:
88+
# backpropFromTagAccumulate(tag, isrc)
89+
5390
@interactive
5491
def hasSystem(tag):
5592
global localSystem
@@ -112,20 +149,15 @@ def getChunks(problems, chunks=1):
112149
nproblems = len(problems)
113150
return (problems[i*nproblems // chunks: (i+1)*nproblems // chunks] for i in range(chunks))
114151

115-
def cdSame(profile=None):
152+
def cdSame(rc):
116153
import os
117-
from IPython.parallel import Client
118154

119-
if profile:
120-
rc = Client(profile=profile)
121-
else:
122-
rc = Client()
123155
dview = rc[:]
124156

125-
126157
home = os.getenv('HOME')
127158
cwd = os.getcwd()
128159

160+
@interactive
129161
def cdrel(relpath):
130162
import os
131163
home = os.getenv('HOME')
@@ -168,6 +200,7 @@ def __init__(self, systemConfig, **kwargs):
168200

169201
Problem.BaseProblem.__init__(self, mesh, **kwargs)
170202

203+
171204
splitkeys = ['freqs', 'nky']
172205

173206
subConfigSettings = {}
@@ -182,11 +215,13 @@ def __init__(self, systemConfig, **kwargs):
182215
pupdate = {'profile': self.systemConfig['profile']}
183216
else:
184217
pupdate = {}
185-
if not cdSame(**pupdate):
186-
print('Could not change all workers to the same directory as the client!')
187218

188219
pclient = Client(**pupdate)
189220

221+
222+
if not cdSame(pclient):
223+
print('Could not change all workers to the same directory as the client!')
224+
190225
self.par = {
191226
'pclient': pclient,
192227
'dview': pclient[:],
@@ -211,6 +246,7 @@ def __init__(self, systemConfig, **kwargs):
211246

212247
self._rebuildSystem()
213248

249+
214250
def _getHandles(self, systemConfig, subConfigSettings):
215251

216252
pclient = self.par['pclient']
@@ -221,12 +257,11 @@ def _getHandles(self, systemConfig, subConfigSettings):
221257
nsp = len(subConfigs)
222258

223259
# Set up dictionary for subproblem objects and push base configuration for the system
224-
#setupCache(systemConfig)
225260
dview['localSystem'] = {}
226261
dview['baseSystemConfig'] = systemConfig
227262
dview['resultTracker'] = commonReducer()
228-
#localSystem = Reference('localSystem')
229-
#resultTracker = Reference('resultTracker')
263+
dview.execute("localLocator = Kernel.SeisLocator25D(baseSystemConfig['geom'])")
264+
230265

231266
# Create a function to get a subproblem forward modelling function
232267
dview['forwardFromTag'] = lambda tag, isrc, dOnly=True: localSystem[tag].forward(isrc, dOnly)
@@ -238,6 +273,7 @@ def _getHandles(self, systemConfig, subConfigSettings):
238273

239274
dview['forwardFromTagAccumulate'] = forwardFromTagAccumulate
240275
dview['forwardFromTagAccumulateAll'] = forwardFromTagAccumulateAll
276+
dview['clearFromTag'] = clearFromTag
241277

242278
dview.wait()
243279

@@ -285,25 +321,17 @@ def _gen25DSubConfigs(self, freqs, nky, cmin):
285321

286322
# Fields
287323
def forwardAccumulate(self, isrcs=None):
    """Schedule accumulated forward modelling through ``systemSolve``.

    Passes a ``Reference`` to the remote ``forwardFromTagAccumulateAll``
    function, together with ``isrcs``, to ``self.systemSolve`` and returns
    that call's result unchanged.
    """
    forwardRef = Reference('forwardFromTagAccumulateAll')
    return self.systemSolve(forwardRef, isrcs)
327+
328+
def systemSolve(self, fnRef, isrcs=None, clearRef=Reference('clearFromTag')):
288329

289330
dview = self.par['dview']
290331
lview = self.par['lview']
291332

292333
chunksPerWorker = self.systemConfig.get('chunksPerWorker', 1)
293334

294-
# Create a function to save forward modelling results to the tracker
295-
dview.execute("setupFromTag = lambda tag: None")
296-
#dview['setupFromTag'] = lambda tag: None
297-
#setupFromTag = Reference('setupFromTag')
298-
299-
#forwardFromTagAccumulate = Reference('forwardFromTagAccumulate')
300-
301-
#forwardFromTagAccumulateAll = Reference('forwardFromTagAccumulateAll')
302-
303-
dview.execute("clearFromTag = lambda tag: localSystem[tag].clear()")
304-
#dview['clearFromTag'] = lambda tag: localSystem[tag].clear()
305-
#clearFromTag = Reference('clearFromTag')
306-
307335
G = networkx.DiGraph()
308336

309337
mainNode = 'Beginning'
@@ -330,17 +358,11 @@ def forwardAccumulate(self, isrcs=None):
330358
for ltags in systemsOnWorkers:
331359
tags = tags.union(set(ltags))
332360

333-
startJobs = {wid: [] for wid in xrange(len(ids))}
334-
systemJobs = {}
335-
endJobs = {wid: [] for wid in xrange(len(ids))}
336-
endNodes = {wid: [] for wid in xrange(len(ids))}
361+
endNodes = {}
337362
tailNodes = []
338363

339364
for tag in tags:
340365

341-
startJobsLocal = []
342-
endJobsLocal = []
343-
344366
tagNode = 'Head: %d, %d'%tag
345367
G.add_edge(mainNode, tagNode)
346368

@@ -350,83 +372,60 @@ def forwardAccumulate(self, isrcs=None):
350372
systems = systemsOnWorkers[i]
351373
rank = ids[i]
352374

353-
try:
354-
jobdeps = {'after': endJobs[i][-1]}
355-
except IndexError:
356-
jobdeps = {}
357-
358375
if tag in systems:
359376
relIDs.append(i)
360-
with lview.temp_flags(block=False, **jobdeps):
361-
job = lview.apply(depend(hasSystemRank, tag, rank)(Reference('setupFromTag')), tag)
362-
startJobsLocal.append(job)
363-
startJobs[i].append(job)
364-
label = 'Setup: %d, %d, %d'%(tag[0],tag[1],i)
365-
G.add_node(label, job=job)
366-
G.add_edge(tagNode, label)
367-
if 'after' in jobdeps:
368-
G.add_edge(endNodes[i][-1], label)
369-
370-
tagNode = 'Init: %d, %d'%tag
371-
for i in relIDs:
372-
label = 'Setup: %d, %d, %d'%(tag[0],tag[1],i)
373-
G.add_edge(label, tagNode)
374377

375-
systemJobs[tag] = []
378+
systemJobs = []
379+
endNodes[tag] = []
376380
systemNodes = []
377381

378-
with lview.temp_flags(block=False, after=startJobsLocal):
382+
with lview.temp_flags(block=False):
379383
iworks = 0
380384
for work in getChunks(isrcslist, int(round(chunksPerWorker*len(relIDs)))):
381385
if work:
382-
job = lview.apply(Reference('forwardFromTagAccumulateAll'), tag, work)
383-
systemJobs[tag].append(job)
386+
job = lview.apply(fnRef, tag, work)
387+
systemJobs.append(job)
384388
label = 'Compute: %d, %d, %d'%(tag[0], tag[1], iworks)
385389
systemNodes.append(label)
386390
G.add_node(label, job=job)
387391
G.add_edge(tagNode, label)
388392
iworks += 1
389393

390-
tagNode = 'Wrap: %d, %d'%tag
391-
for label in systemNodes:
392-
G.add_edge(label, tagNode)
394+
if self.systemConfig.get('ensembleClear', False): # True for ensemble ending, False for individual ending
395+
tagNode = 'Wrap: %d, %d'%tag
396+
for label in systemNodes:
397+
G.add_edge(label, tagNode)
393398

394-
relIDs = []
395-
for i in xrange(len(ids)):
396-
397-
systems = systemsOnWorkers[i]
398-
rank = ids[i]
399+
for i in relIDs:
399400

400-
if tag in systems:
401-
relIDs.append(i)
402-
with lview.temp_flags(block=False, after=systemJobs[tag]):
403-
job = lview.apply(depend(hasSystemRank, tag, rank)(Reference('clearFromTag')), tag)
404-
endJobsLocal.append(job)
405-
endJobs[i].append(job)
406-
label = 'Wrap: %d, %d, %d'%(tag[0],tag[1],i)
401+
rank = ids[i]
402+
403+
with lview.temp_flags(block=False, after=systemJobs):
404+
job = lview.apply(depend(hasSystemRank, tag, rank)(clearRef), tag)
405+
label = 'Wrap: %d, %d, %d'%(tag[0],tag[1], i)
407406
G.add_node(label, job=job)
408-
endNodes[i].append(label)
407+
endNodes[tag].append(label)
409408
G.add_edge(tagNode, label)
409+
else:
410+
411+
for i, sjob in enumerate(systemJobs):
412+
with lview.temp_flags(block=False, follow=sjob):
413+
job = lview.apply(clearRef, tag)
414+
label = 'Wrap: %d, %d, %d'%(tag[0],tag[1],i)
415+
G.add_node(label, job=job)
416+
endNodes[tag].append(label)
417+
G.add_edge(systemNodes[i], label)
410418

411419
tagNode = 'Tail: %d, %d'%tag
412-
for i in relIDs:
413-
label = 'Wrap: %d, %d, %d'%(tag[0],tag[1],i)
420+
for label in endNodes[tag]:
414421
G.add_edge(label, tagNode)
415422
tailNodes.append(tagNode)
416423

417424
endNode = 'End'
418425
for node in tailNodes:
419426
G.add_edge(node, endNode)
420427

421-
jobs = {
422-
'startJobs': startJobs,
423-
'systemJobs': systemJobs,
424-
'endJobs': endJobs,
425-
}
426-
427-
# finaljob dependent on endJobs
428-
429-
return jobs, G
428+
return G
430429

431430
def _rebuildSystem(self, c = None):
432431
if c is not None:

0 commit comments

Comments
 (0)