Skip to content

Commit 9f4526d

Browse files
Introduce a dynamic process pool for the local test driver
The new process pool allows adding jobs after testing has started. It will also allow restructuring how the job queue is built (in a follow-up CL), so that testing can start instantly while the queue is still being built. This change also attempts to clean up the keyboard-interrupt logic. Idea: Only catch a keyboard interrupt once per process, at the outermost level. Use proper "finally" clauses to clean up everywhere a keyboard interrupt might occur. Never turn named exceptions into none-exceptions using an anonymous "raise". TEST=python -m unittest pool_unittest [email protected] Review URL: https://codereview.chromium.org/275093002 git-svn-id: https://v8.googlecode.com/svn/branches/bleeding_edge@21310 ce2b1a6d-e550-0410-aec6-3dcde31c8c00
1 parent 635a485 commit 9f4526d

File tree

5 files changed

+271
-134
lines changed

5 files changed

+271
-134
lines changed

tools/run-tests.py

+32-37
Original file line numberDiff line numberDiff line change
@@ -463,44 +463,39 @@ def Execute(arch, mode, args, options, suites, workspace):
463463
return 0
464464

465465
# Run the tests, either locally or distributed on the network.
466-
try:
467-
start_time = time.time()
468-
progress_indicator = progress.PROGRESS_INDICATORS[options.progress]()
469-
if options.junitout:
470-
progress_indicator = progress.JUnitTestProgressIndicator(
471-
progress_indicator, options.junitout, options.junittestsuite)
472-
473-
run_networked = not options.no_network
474-
if not run_networked:
475-
print("Network distribution disabled, running tests locally.")
476-
elif utils.GuessOS() != "linux":
477-
print("Network distribution is only supported on Linux, sorry!")
466+
start_time = time.time()
467+
progress_indicator = progress.PROGRESS_INDICATORS[options.progress]()
468+
if options.junitout:
469+
progress_indicator = progress.JUnitTestProgressIndicator(
470+
progress_indicator, options.junitout, options.junittestsuite)
471+
472+
run_networked = not options.no_network
473+
if not run_networked:
474+
print("Network distribution disabled, running tests locally.")
475+
elif utils.GuessOS() != "linux":
476+
print("Network distribution is only supported on Linux, sorry!")
477+
run_networked = False
478+
peers = []
479+
if run_networked:
480+
peers = network_execution.GetPeers()
481+
if not peers:
482+
print("No connection to distribution server; running tests locally.")
478483
run_networked = False
479-
peers = []
480-
if run_networked:
481-
peers = network_execution.GetPeers()
482-
if not peers:
483-
print("No connection to distribution server; running tests locally.")
484-
run_networked = False
485-
elif len(peers) == 1:
486-
print("No other peers on the network; running tests locally.")
487-
run_networked = False
488-
elif num_tests <= 100:
489-
print("Less than 100 tests, running them locally.")
490-
run_networked = False
491-
492-
if run_networked:
493-
runner = network_execution.NetworkedRunner(suites, progress_indicator,
494-
ctx, peers, workspace)
495-
else:
496-
runner = execution.Runner(suites, progress_indicator, ctx)
497-
498-
exit_code = runner.Run(options.j)
499-
if runner.terminate:
500-
return exit_code
501-
overall_duration = time.time() - start_time
502-
except KeyboardInterrupt:
503-
raise
484+
elif len(peers) == 1:
485+
print("No other peers on the network; running tests locally.")
486+
run_networked = False
487+
elif num_tests <= 100:
488+
print("Less than 100 tests, running them locally.")
489+
run_networked = False
490+
491+
if run_networked:
492+
runner = network_execution.NetworkedRunner(suites, progress_indicator,
493+
ctx, peers, workspace)
494+
else:
495+
runner = execution.Runner(suites, progress_indicator, ctx)
496+
497+
exit_code = runner.Run(options.j)
498+
overall_duration = time.time() - start_time
504499

505500
if options.time:
506501
verbose.PrintTestDurations(suites, overall_duration)

tools/testrunner/local/commands.py

+42-47
Original file line numberDiff line numberDiff line change
@@ -64,49 +64,46 @@ def Win32SetErrorMode(mode):
6464

6565

6666
def RunProcess(verbose, timeout, args, **rest):
67-
try:
68-
if verbose: print "#", " ".join(args)
69-
popen_args = args
70-
prev_error_mode = SEM_INVALID_VALUE
71-
if utils.IsWindows():
72-
popen_args = subprocess.list2cmdline(args)
73-
# Try to change the error mode to avoid dialogs on fatal errors. Don't
74-
# touch any existing error mode flags by merging the existing error mode.
75-
# See http://blogs.msdn.com/oldnewthing/archive/2004/07/27/198410.aspx.
76-
error_mode = SEM_NOGPFAULTERRORBOX
77-
prev_error_mode = Win32SetErrorMode(error_mode)
78-
Win32SetErrorMode(error_mode | prev_error_mode)
79-
process = subprocess.Popen(
80-
shell=utils.IsWindows(),
81-
args=popen_args,
82-
**rest
83-
)
84-
if (utils.IsWindows() and prev_error_mode != SEM_INVALID_VALUE):
85-
Win32SetErrorMode(prev_error_mode)
86-
# Compute the end time - if the process crosses this limit we
87-
# consider it timed out.
88-
if timeout is None: end_time = None
89-
else: end_time = time.time() + timeout
90-
timed_out = False
91-
# Repeatedly check the exit code from the process in a
92-
# loop and keep track of whether or not it times out.
93-
exit_code = None
94-
sleep_time = INITIAL_SLEEP_TIME
95-
while exit_code is None:
96-
if (not end_time is None) and (time.time() >= end_time):
97-
# Kill the process and wait for it to exit.
98-
KillProcessWithID(process.pid)
99-
exit_code = process.wait()
100-
timed_out = True
101-
else:
102-
exit_code = process.poll()
103-
time.sleep(sleep_time)
104-
sleep_time = sleep_time * SLEEP_TIME_FACTOR
105-
if sleep_time > MAX_SLEEP_TIME:
106-
sleep_time = MAX_SLEEP_TIME
107-
return (exit_code, timed_out)
108-
except KeyboardInterrupt:
109-
raise
67+
if verbose: print "#", " ".join(args)
68+
popen_args = args
69+
prev_error_mode = SEM_INVALID_VALUE
70+
if utils.IsWindows():
71+
popen_args = subprocess.list2cmdline(args)
72+
# Try to change the error mode to avoid dialogs on fatal errors. Don't
73+
# touch any existing error mode flags by merging the existing error mode.
74+
# See http://blogs.msdn.com/oldnewthing/archive/2004/07/27/198410.aspx.
75+
error_mode = SEM_NOGPFAULTERRORBOX
76+
prev_error_mode = Win32SetErrorMode(error_mode)
77+
Win32SetErrorMode(error_mode | prev_error_mode)
78+
process = subprocess.Popen(
79+
shell=utils.IsWindows(),
80+
args=popen_args,
81+
**rest
82+
)
83+
if (utils.IsWindows() and prev_error_mode != SEM_INVALID_VALUE):
84+
Win32SetErrorMode(prev_error_mode)
85+
# Compute the end time - if the process crosses this limit we
86+
# consider it timed out.
87+
if timeout is None: end_time = None
88+
else: end_time = time.time() + timeout
89+
timed_out = False
90+
# Repeatedly check the exit code from the process in a
91+
# loop and keep track of whether or not it times out.
92+
exit_code = None
93+
sleep_time = INITIAL_SLEEP_TIME
94+
while exit_code is None:
95+
if (not end_time is None) and (time.time() >= end_time):
96+
# Kill the process and wait for it to exit.
97+
KillProcessWithID(process.pid)
98+
exit_code = process.wait()
99+
timed_out = True
100+
else:
101+
exit_code = process.poll()
102+
time.sleep(sleep_time)
103+
sleep_time = sleep_time * SLEEP_TIME_FACTOR
104+
if sleep_time > MAX_SLEEP_TIME:
105+
sleep_time = MAX_SLEEP_TIME
106+
return (exit_code, timed_out)
110107

111108

112109
def PrintError(string):
@@ -142,11 +139,9 @@ def Execute(args, verbose=False, timeout=None):
142139
stdout=fd_out,
143140
stderr=fd_err
144141
)
145-
except KeyboardInterrupt:
146-
raise
147-
except:
148-
raise
149142
finally:
143+
# TODO(machenbach): A keyboard interrupt before the assignment to
144+
# fd_out|err can lead to reference errors here.
150145
os.close(fd_out)
151146
os.close(fd_err)
152147
out = file(outname).read()

tools/testrunner/local/execution.py

+20-50
Original file line numberDiff line numberDiff line change
@@ -26,19 +26,14 @@
2626
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
2727

2828

29-
import multiprocessing
3029
import os
31-
import threading
3230
import time
3331

32+
from pool import Pool
3433
from . import commands
3534
from . import utils
3635

3736

38-
BREAK_NOW = -1
39-
EXCEPTION = -2
40-
41-
4237
class Job(object):
4338
def __init__(self, command, dep_command, test_id, timeout, verbose):
4439
self.command = command
@@ -49,24 +44,17 @@ def __init__(self, command, dep_command, test_id, timeout, verbose):
4944

5045

5146
def RunTest(job):
52-
try:
53-
start_time = time.time()
54-
if job.dep_command is not None:
55-
dep_output = commands.Execute(job.dep_command, job.verbose, job.timeout)
56-
# TODO(jkummerow): We approximate the test suite specific function
57-
# IsFailureOutput() by just checking the exit code here. Currently
58-
# only cctests define dependencies, for which this simplification is
59-
# correct.
60-
if dep_output.exit_code != 0:
61-
return (job.id, dep_output, time.time() - start_time)
62-
output = commands.Execute(job.command, job.verbose, job.timeout)
63-
return (job.id, output, time.time() - start_time)
64-
except KeyboardInterrupt:
65-
return (-1, BREAK_NOW, 0)
66-
except Exception, e:
67-
print(">>> EXCEPTION: %s" % e)
68-
return (-1, EXCEPTION, 0)
69-
47+
start_time = time.time()
48+
if job.dep_command is not None:
49+
dep_output = commands.Execute(job.dep_command, job.verbose, job.timeout)
50+
# TODO(jkummerow): We approximate the test suite specific function
51+
# IsFailureOutput() by just checking the exit code here. Currently
52+
# only cctests define dependencies, for which this simplification is
53+
# correct.
54+
if dep_output.exit_code != 0:
55+
return (job.id, dep_output, time.time() - start_time)
56+
output = commands.Execute(job.command, job.verbose, job.timeout)
57+
return (job.id, output, time.time() - start_time)
7058

7159
class Runner(object):
7260

@@ -83,8 +71,6 @@ def _CommonInit(self, num_tests, progress_indicator, context):
8371
self.remaining = num_tests
8472
self.failed = []
8573
self.crashed = 0
86-
self.terminate = False
87-
self.lock = threading.Lock()
8874

8975
def Run(self, jobs):
9076
self.indicator.Starting()
@@ -95,8 +81,11 @@ def Run(self, jobs):
9581
return 0
9682

9783
def _RunInternal(self, jobs):
98-
pool = multiprocessing.Pool(processes=jobs)
84+
pool = Pool(jobs)
9985
test_map = {}
86+
# TODO(machenbach): Instead of filling the queue completely before
87+
# pool.imap_unordered, make this a generator that already starts testing
88+
# while the queue is filled.
10089
queue = []
10190
queued_exception = None
10291
for test in self.tests:
@@ -119,22 +108,11 @@ def _RunInternal(self, jobs):
119108
else:
120109
dep_command = None
121110
job = Job(command, dep_command, test.id, timeout, self.context.verbose)
122-
queue.append(job)
111+
queue.append([job])
123112
try:
124-
kChunkSize = 1
125-
it = pool.imap_unordered(RunTest, queue, kChunkSize)
113+
it = pool.imap_unordered(RunTest, queue)
126114
for result in it:
127-
test_id = result[0]
128-
if test_id < 0:
129-
if result[1] == BREAK_NOW:
130-
self.terminate = True
131-
else:
132-
continue
133-
if self.terminate:
134-
pool.terminate()
135-
pool.join()
136-
raise BreakNowException("User pressed Ctrl+C or IO went wrong")
137-
test = test_map[test_id]
115+
test = test_map[result[0]]
138116
self.indicator.AboutToRun(test)
139117
test.output = result[1]
140118
test.duration = result[2]
@@ -147,18 +125,10 @@ def _RunInternal(self, jobs):
147125
self.succeeded += 1
148126
self.remaining -= 1
149127
self.indicator.HasRun(test, has_unexpected_output)
150-
except KeyboardInterrupt:
151-
pool.terminate()
152-
pool.join()
153-
raise
154-
except Exception, e:
155-
print("Exception: %s" % e)
128+
finally:
156129
pool.terminate()
157-
pool.join()
158-
raise
159130
if queued_exception:
160131
raise queued_exception
161-
return
162132

163133

164134
def GetCommand(self, test):

0 commit comments

Comments
 (0)