diff --git a/perfkitbenchmarker/events.py b/perfkitbenchmarker/events.py
new file mode 100644
index 0000000000..8f38a6c614
--- /dev/null
+++ b/perfkitbenchmarker/events.py
@@ -0,0 +1,59 @@
+# Copyright 2015 Google Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Defines observable events in PerfKitBenchmarker.
+
+All events are passed keyword arguments, and possibly a sender. See event
+definitions below.
+
+Event handlers are run synchronously in an unspecified order; any exceptions
+raised will be propagated.
+"""
+
+from blinker import Namespace
+
+_events = Namespace()
+
+
+initialization_complete = _events.signal('system-ready', doc="""
+Signal sent once after the system is initialized (command-line flags
+parsed, temporary directory initialized, run_uri set).
+
+Sender: None
+Payload: parsed_flags, the parsed FLAGS object.""")
+
+
+RUN_PHASE = 'run'
+
+before_phase = _events.signal('before-phase', doc="""
+Signal sent immediately before a phase runs.
+
+Sender: the phase. Currently only RUN_PHASE.
+Payload: benchmark_spec.""")
+
+after_phase = _events.signal('after-phase', doc="""
+Signal sent immediately after a phase runs, regardless of whether it was
+successful.
+
+Sender: the phase. Currently only RUN_PHASE.
+Payload: benchmark_spec.""")
+
+sample_created = _events.signal('sample-created', doc="""
+Called with sample object and benchmark spec.
+
+Signal sent immediately after a sample is created by a publisher.
+The sample's metadata is mutable, and may be updated by the subscriber.
+
+Sender: None
+Payload: benchmark_spec (BenchmarkSpec), sample (dict).""")
diff --git a/perfkitbenchmarker/pkb.py b/perfkitbenchmarker/pkb.py
index 9cab8ab4ed..a5e218b93f 100644
--- a/perfkitbenchmarker/pkb.py
+++ b/perfkitbenchmarker/pkb.py
@@ -62,6 +62,7 @@
 from perfkitbenchmarker import benchmark_sets
 from perfkitbenchmarker import benchmark_spec
 from perfkitbenchmarker import disk
+from perfkitbenchmarker import events
 from perfkitbenchmarker import flags
 from perfkitbenchmarker import log_util
 from perfkitbenchmarker import static_virtual_machine
@@ -77,9 +78,6 @@
 LOG_FILE_NAME = 'pkb.log'
 REQUIRED_INFO = ['scratch_disk', 'num_machines']
 REQUIRED_EXECUTABLES = frozenset(['ssh', 'ssh-keygen', 'scp', 'openssl'])
-# List of functions taking a benchmark_spec. Will be called before benchmark.Run
-# with two parameters: the benchmark and benchmark_spec.
-BEFORE_RUN_HOOKS = []
 FLAGS = flags.FLAGS
 
 flags.DEFINE_list('ssh_options', [], 'Additional options to pass to ssh.')
@@ -214,12 +212,12 @@ def DoRunPhase(benchmark, name, spec, collector, timer):
       benchmark module's Run function.
""" logging.info('Running benchmark %s', name) - for before_run_hook in BEFORE_RUN_HOOKS: - before_run_hook(benchmark=benchmark, benchmark_spec=spec) - - with vm_util.RunDStatIfConfigured(spec.vms, suffix='-{0}-dstat'.format(name)): + events.before_phase.send(events.RUN_PHASE, benchmark_spec=spec) + try: with timer.Measure('Benchmark Run'): samples = benchmark.Run(spec) + finally: + events.after_phase.send(events.RUN_PHASE, benchmark_spec=spec) collector.AddSamples(samples, name, spec) @@ -384,6 +382,7 @@ def RunBenchmarks(publish=True): vm_util.SSHKeyGen() collector = SampleCollector() + events.initialization_complete.send(parsed_flags=FLAGS) if FLAGS.static_vm_file: with open(FLAGS.static_vm_file) as fp: diff --git a/perfkitbenchmarker/publisher.py b/perfkitbenchmarker/publisher.py index 8df31593b6..c289fda685 100644 --- a/perfkitbenchmarker/publisher.py +++ b/perfkitbenchmarker/publisher.py @@ -25,6 +25,7 @@ import uuid from perfkitbenchmarker import disk +from perfkitbenchmarker import events from perfkitbenchmarker import flags from perfkitbenchmarker import version from perfkitbenchmarker import vm_util @@ -551,6 +552,8 @@ def AddSamples(self, samples, benchmark, benchmark_spec): sample['timestamp'] = time.time() sample['run_uri'] = self.run_uri sample['sample_uri'] = str(uuid.uuid4()) + events.sample_created.send(benchmark_spec=benchmark_spec, + sample=sample) self.samples.append(sample) def PublishSamples(self): diff --git a/perfkitbenchmarker/vm_util.py b/perfkitbenchmarker/vm_util.py index 8e559268a8..7ab1b04870 100644 --- a/perfkitbenchmarker/vm_util.py +++ b/perfkitbenchmarker/vm_util.py @@ -15,6 +15,7 @@ """Set of utility functions for working with virtual machines.""" import contextlib +import functools import logging import os import posixpath @@ -26,11 +27,13 @@ import threading import time import traceback +import uuid import jinja2 from perfkitbenchmarker import data from perfkitbenchmarker import errors +from perfkitbenchmarker import events from perfkitbenchmarker import flags from perfkitbenchmarker import log_util from perfkitbenchmarker import regex_util @@ -70,6 +73,11 @@ flags.DEFINE_integer('dstat_interval', None, 'dstat sample collection frequency, in seconds. Only ' 'applicable when --dstat is specified.') +flags.DEFINE_string('dstat_output', None, + 'Output directory for dstat output. ' + 'Only applicable when --dstat is specified. ' + 'Default: run temporary directory.') + class IpAddressSubset(object): @@ -592,28 +600,31 @@ def ExecutableOnPath(executable_name): return True -@contextlib.contextmanager -def RunDStatIfConfigured(vms, suffix='-dstat'): - """Installs and runs dstat on 'vms' if FLAGS.dstat is set. +class _DStatCollector(object): + """dstat collector. - On exiting the context manager, the results are copied to the run temp dir. + Installs and runs dstat on a collection of VMs. + """ - Args: - vms: List of virtual machines. - suffix: str. Suffix to add to each dstat result file. + def __init__(self, interval=None, output_directory=None): + """Runs dstat on 'vms'. - Yields: - None - """ - if not FLAGS.dstat: - yield - return + Start dstat collection via `Start`. Stop via `Stop`. - lock = threading.Lock() - pids = {} - file_names = {} + Args: + interval: Optional int. Interval in seconds in which to collect samples. 
+    """
+    self.interval = interval
+    self.output_directory = output_directory or GetTempDir()
+    self._lock = threading.Lock()
+    self._pids = {}
+    self._file_names = {}
 
-  def StartDStat(vm):
+    if not os.path.isdir(self.output_directory):
+      raise IOError('dstat output directory does not exist: {0}'.format(
+          self.output_directory))
+
+  def _StartOnVm(self, vm, suffix='-dstat'):
     vm.Install('dstat')
     num_cpus = vm.num_cpus
@@ -633,25 +644,56 @@ def StartDStat(vm):
                max_cpu=num_cpus - 1,
                block_devices=','.join(block_devices),
                output=dstat_file,
-               dstat_interval=FLAGS.dstat_interval or '')
+               dstat_interval=self.interval or '')
     stdout, _ = vm.RemoteCommand(cmd)
-    with lock:
-      pids[vm.name] = stdout.strip()
-      file_names[vm.name] = dstat_file
+    with self._lock:
+      self._pids[vm.name] = stdout.strip()
+      self._file_names[vm.name] = dstat_file
 
-  def StopDStat(vm):
-    if vm.name not in pids:
+  def _StopOnVm(self, vm):
+    """Stop dstat on 'vm' and copy the results to the output directory."""
+    if vm.name not in self._pids:
       logging.warn('No dstat PID for %s', vm.name)
       return
-    cmd = 'kill {0} || true'.format(pids[vm.name])
+    else:
+      with self._lock:
+        pid = self._pids.pop(vm.name)
+        file_name = self._file_names.pop(vm.name)
+    cmd = 'kill {0} || true'.format(pid)
     vm.RemoteCommand(cmd)
     try:
-      vm.PullFile(GetTempDir(), file_names[vm.name])
+      vm.PullFile(self.output_directory, file_name)
     except:
-      logging.exception('Failed fetching dstat result.')
+      logging.exception('Failed fetching dstat result from %s.', vm.name)
 
-  RunThreaded(StartDStat, vms)
-  try:
-    yield
-  finally:
-    RunThreaded(StopDStat, vms)
+  def Start(self, sender, benchmark_spec):
+    """Install and start dstat on all VMs in 'benchmark_spec'."""
+    suffix = '-{0}-{1}-dstat'.format(benchmark_spec.benchmark_name,
+                                     str(uuid.uuid4())[:8])
+    start_on_vm = functools.partial(self._StartOnVm, suffix=suffix)
+    RunThreaded(start_on_vm, benchmark_spec.vms)
+
+  def Stop(self, sender, benchmark_spec):
+    """Stop dstat on all VMs in 'benchmark_spec', fetch results."""
+    RunThreaded(self._StopOnVm, benchmark_spec.vms)
+
+
+@events.initialization_complete.connect
+def _RegisterDStatCollector(sender, parsed_flags):
+  """Registers the dstat collector if FLAGS.dstat is set."""
+  if not parsed_flags.dstat:
+    return
+
+  output_directory = (parsed_flags.dstat_output
+                      if parsed_flags['dstat_output'].present
+                      else GetTempDir())
+
+  logging.debug('Registering dstat collector with interval %s, output to %s.',
+                parsed_flags.dstat_interval, output_directory)
+
+  if not os.path.isdir(output_directory):
+    os.makedirs(output_directory)
+  collector = _DStatCollector(interval=parsed_flags.dstat_interval,
+                              output_directory=output_directory)
+  events.before_phase.connect(collector.Start, events.RUN_PHASE, weak=False)
+  events.after_phase.connect(collector.Stop, events.RUN_PHASE, weak=False)
diff --git a/requirements.txt b/requirements.txt
index 2219e693ed..882694f4ae 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -15,3 +15,4 @@ python-gflags==2.0
 jinja2>=2.7
 setuptools
 colorlog[windows]==2.6.0
+blinker>=1.3
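
Example usage (not part of the patch above): a minimal sketch of how another module could subscribe to the new signals, following the same connect pattern the dstat collector uses. The listener names and the 'owner' metadata value are made up for illustration, and the sketch assumes each sample dict carries a mutable 'metadata' dict, as the sample_created docstring describes.

import logging

from perfkitbenchmarker import events


@events.initialization_complete.connect
def _RegisterExampleListeners(sender, parsed_flags):
  """Hooks up the example listeners once flags are parsed (hypothetical)."""
  del sender, parsed_flags  # Unused; included to show the expected signature.
  events.sample_created.connect(_TagSample, weak=False)
  events.before_phase.connect(_LogPhaseStart, events.RUN_PHASE, weak=False)


def _TagSample(sender, benchmark_spec, sample):
  # The sample dict is mutable here; assumes it carries a 'metadata' dict.
  sample['metadata'].update(owner='example-team')


def _LogPhaseStart(sender, benchmark_spec):
  # 'sender' is the phase about to run (currently only events.RUN_PHASE).
  logging.info('Phase %s starting for benchmark %s.', sender,
               benchmark_spec.benchmark_name)

Because handlers run synchronously and any exception they raise propagates, listeners like these should stay cheap and avoid raising on non-fatal problems.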