Add event support #315

Merged: 6 commits, Jun 22, 2015
Changes from all commits
59 changes: 59 additions & 0 deletions perfkitbenchmarker/events.py
@@ -0,0 +1,59 @@
# Copyright 2015 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Defines observable events in PerfKitBenchmarker.

All events are passed keyword arguments, and possibly a sender. See event
definitions below.

Event handlers are run synchronously in an unspecified order; any exceptions
raised will be propagated.
"""

from blinker import Namespace

_events = Namespace()


initialization_complete = _events.signal('system-ready', doc="""
Signal sent once after the system is initialized (command-line flags
parsed, temporary directory initialized, run_uri set).

Sender: None
Payload: parsed_flags, the parsed FLAGS object.""")


RUN_PHASE = 'run'

before_phase = _events.signal('before-phase', doc="""
Signal sent immediately before a phase runs.

Sender: the phase. Currently only RUN_PHASE.
Payload: benchmark_spec.""")

after_phase = _events.signal('after-phase', doc="""
Signal sent immediately after a phase runs, regardless of whether it was
successful.

Sender: the phase. Currently only RUN_PHASE.
Payload: benchmark_spec.""")

sample_created = _events.signal('sample-created', doc="""
Called with sample object and benchmark spec.

Signal sent immediately after a sample is created by a publisher.
The sample's metadata is mutable, and may be updated by the subscriber.

Sender: None
Payload: benchmark_spec (BenchmarkSpec), sample (dict).""")

Review comment (Contributor):
Clarify if the sample is mutable or not? Is it ok to add metadata and expect it'll be stored?

Reply (Contributor Author):
It is mutable; now noted.
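The signals above are ordinary blinker signals, so extensions subscribe with connect. Below is a minimal sketch of a sample_created subscriber, assuming the sample dict carries a 'metadata' mapping as the review thread implies; the handler name and metadata key are illustrative, not part of this PR.

from perfkitbenchmarker import events


@events.sample_created.connect
def _TagSample(sender, benchmark_spec, sample):
  # blinker calls receivers as receiver(sender, **kwargs); this signal
  # sends benchmark_spec and sample as keyword arguments (sender is None).
  # The sample is mutable here, so metadata added by the subscriber is
  # stored when the sample is published.
  sample.setdefault('metadata', {})
  sample['metadata']['origin_benchmark'] = benchmark_spec.benchmark_name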
13 changes: 6 additions & 7 deletions perfkitbenchmarker/pkb.py
@@ -62,6 +62,7 @@
from perfkitbenchmarker import benchmark_sets
from perfkitbenchmarker import benchmark_spec
from perfkitbenchmarker import disk
+from perfkitbenchmarker import events
from perfkitbenchmarker import flags
from perfkitbenchmarker import log_util
from perfkitbenchmarker import static_virtual_machine
@@ -77,9 +78,6 @@
LOG_FILE_NAME = 'pkb.log'
REQUIRED_INFO = ['scratch_disk', 'num_machines']
REQUIRED_EXECUTABLES = frozenset(['ssh', 'ssh-keygen', 'scp', 'openssl'])
-# List of functions taking a benchmark_spec. Will be called before benchmark.Run
-# with two parameters: the benchmark and benchmark_spec.
-BEFORE_RUN_HOOKS = []
FLAGS = flags.FLAGS

flags.DEFINE_list('ssh_options', [], 'Additional options to pass to ssh.')
@@ -214,12 +212,12 @@ def DoRunPhase(benchmark, name, spec, collector, timer):
    benchmark module's Run function.
  """
  logging.info('Running benchmark %s', name)
-  for before_run_hook in BEFORE_RUN_HOOKS:
-    before_run_hook(benchmark=benchmark, benchmark_spec=spec)
-
-  with vm_util.RunDStatIfConfigured(spec.vms, suffix='-{0}-dstat'.format(name)):
+  events.before_phase.send(events.RUN_PHASE, benchmark_spec=spec)
+  try:
    with timer.Measure('Benchmark Run'):
      samples = benchmark.Run(spec)
+  finally:
+    events.after_phase.send(events.RUN_PHASE, benchmark_spec=spec)
  collector.AddSamples(samples, name, spec)


@@ -384,6 +382,7 @@ def RunBenchmarks(publish=True):
  vm_util.SSHKeyGen()
  collector = SampleCollector()
+  events.initialization_complete.send(parsed_flags=FLAGS)

  if FLAGS.static_vm_file:
    with open(FLAGS.static_vm_file) as fp:
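Because DoRunPhase passes events.RUN_PHASE as the blinker sender, a handler can subscribe to the run phase alone. A small sketch under that assumption; the handler itself is illustrative, not part of this PR.

import logging

from perfkitbenchmarker import events


def _LogRunStart(sender, benchmark_spec):
  # 'sender' is the phase name. Passing events.RUN_PHASE to connect()
  # filters out sends from any other phase that may be added later.
  logging.info('Starting phase %s with %d VM(s).',
               sender, len(benchmark_spec.vms))


events.before_phase.connect(_LogRunStart, events.RUN_PHASE)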
3 changes: 3 additions & 0 deletions perfkitbenchmarker/publisher.py
@@ -25,6 +25,7 @@
import uuid

from perfkitbenchmarker import disk
+from perfkitbenchmarker import events
from perfkitbenchmarker import flags
from perfkitbenchmarker import version
from perfkitbenchmarker import vm_util
@@ -551,6 +552,8 @@ def AddSamples(self, samples, benchmark, benchmark_spec):
      sample['timestamp'] = time.time()
      sample['run_uri'] = self.run_uri
      sample['sample_uri'] = str(uuid.uuid4())
+      events.sample_created.send(benchmark_spec=benchmark_spec,
+                                 sample=sample)
      self.samples.append(sample)

  def PublishSamples(self):
104 changes: 73 additions & 31 deletions perfkitbenchmarker/vm_util.py
@@ -15,6 +15,7 @@
"""Set of utility functions for working with virtual machines."""

import contextlib
+import functools
import logging
import os
import posixpath
@@ -26,11 +27,13 @@
import threading
import time
import traceback
+import uuid

import jinja2

from perfkitbenchmarker import data
from perfkitbenchmarker import errors
+from perfkitbenchmarker import events
from perfkitbenchmarker import flags
from perfkitbenchmarker import log_util
from perfkitbenchmarker import regex_util
@@ -70,6 +73,11 @@
flags.DEFINE_integer('dstat_interval', None,
                     'dstat sample collection frequency, in seconds. Only '
                     'applicable when --dstat is specified.')
+flags.DEFINE_string('dstat_output', None,
+                    'Output directory for dstat output. '
+                    'Only applicable when --dstat is specified. '
+                    'Default: run temporary directory.')


class IpAddressSubset(object):
@@ -592,28 +600,31 @@ def ExecutableOnPath(executable_name):
  return True


-@contextlib.contextmanager
-def RunDStatIfConfigured(vms, suffix='-dstat'):
-  """Installs and runs dstat on 'vms' if FLAGS.dstat is set.
+class _DStatCollector(object):
+  """dstat collector.

-  On exiting the context manager, the results are copied to the run temp dir.
+  Installs and runs dstat on a collection of VMs.
+  """

-  Args:
-    vms: List of virtual machines.
-    suffix: str. Suffix to add to each dstat result file.
+  def __init__(self, interval=None, output_directory=None):
+    """Runs dstat on 'vms'.

-  Yields:
-    None
-  """
-  if not FLAGS.dstat:
-    yield
-    return
+    Start dstat collection via `Start`. Stop via `Stop`.

-  lock = threading.Lock()
-  pids = {}
-  file_names = {}
+    Args:
+      interval: Optional int. Interval in seconds in which to collect samples.
+    """
+    self.interval = interval
+    self.output_directory = output_directory or GetTempDir()
+    self._lock = threading.Lock()
+    self._pids = {}
+    self._file_names = {}

-  def StartDStat(vm):
+    if not os.path.isdir(self.output_directory):
+      raise IOError('dstat output directory does not exist: {0}'.format(
+          self.output_directory))
+
+  def _StartOnVm(self, vm, suffix='-dstat'):
    vm.Install('dstat')

num_cpus = vm.num_cpus
Expand All @@ -633,25 +644,56 @@ def StartDStat(vm):
max_cpu=num_cpus - 1,
block_devices=','.join(block_devices),
output=dstat_file,
dstat_interval=FLAGS.dstat_interval or '')
dstat_interval=self.interval or '')
stdout, _ = vm.RemoteCommand(cmd)
with lock:
pids[vm.name] = stdout.strip()
file_names[vm.name] = dstat_file
with self._lock:
self._pids[vm.name] = stdout.strip()
self._file_names[vm.name] = dstat_file

def StopDStat(vm):
if vm.name not in pids:
def _StopOnVm(self, vm):
"""Stop dstat on 'vm', copy the results to the run temporary directory."""
if vm.name not in self._pids:
logging.warn('No dstat PID for %s', vm.name)
return
cmd = 'kill {0} || true'.format(pids[vm.name])
else:
with self._lock:
pid = self._pids.pop(vm.name)
file_name = self._file_names.pop(vm.name)
cmd = 'kill {0} || true'.format(pid)
vm.RemoteCommand(cmd)
try:
vm.PullFile(GetTempDir(), file_names[vm.name])
vm.PullFile(self.output_directory, file_name)
except:
logging.exception('Failed fetching dstat result.')
logging.exception('Failed fetching dstat result from %s.', vm.name)

RunThreaded(StartDStat, vms)
try:
yield
finally:
RunThreaded(StopDStat, vms)
def Start(self, sender, benchmark_spec):
"""Install and start dstat on all VMs in 'benchmark_spec'."""
suffix = '-{0}-{1}-dstat'.format(benchmark_spec.benchmark_name,
str(uuid.uuid4())[:8])
start_on_vm = functools.partial(self._StartOnVm, suffix=suffix)
RunThreaded(start_on_vm, benchmark_spec.vms)

def Stop(self, sender, benchmark_spec):
"""Stop dstat on all VMs in 'benchmark_spec', fetch results."""
RunThreaded(self._StopOnVm, benchmark_spec.vms)


@events.initialization_complete.connect
def _RegisterDStatCollector(sender, parsed_flags):
"""Registers the dstat collector if FLAGS.dstat is set."""
if not parsed_flags.dstat:
return

output_directory = (parsed_flags.dstat_output
if parsed_flags['dstat_output'].present
else GetTempDir())

logging.debug('Registering dstat collector with interval %s, output to %s.',
parsed_flags.dstat_interval, output_directory)

if not os.path.isdir(output_directory):
os.makedirs(output_directory)
collector = _DStatCollector(interval=parsed_flags.dstat_interval,
output_directory=output_directory)
events.before_phase.connect(collector.Start, events.RUN_PHASE, weak=False)
events.after_phase.connect(collector.Stop, events.RUN_PHASE, weak=False)
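Note the weak=False on the last two connect calls: blinker holds weak references to receivers by default, and collector.Start and collector.Stop are bound methods of a local object, so weak subscriptions would be garbage collected as soon as _RegisterDStatCollector returns. A strong reference keeps the collector alive for the whole run.

With the new flag in place, a dstat-enabled run might be invoked as follows; the benchmark choice and output path are illustrative examples, not part of this PR.

./pkb.py --benchmarks=iperf --dstat --dstat_interval=5 --dstat_output=/tmp/pkb-dstat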
1 change: 1 addition & 0 deletions requirements.txt
@@ -15,3 +15,4 @@ python-gflags==2.0
jinja2>=2.7
setuptools
colorlog[windows]==2.6.0
+blinker>=1.3