Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions include/nighthawk/client/benchmark_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class BenchmarkClient {
*
* @param measure_latencies true iff latencies should be measured.
*/
virtual void setShouldMeasureLatencies(bool measure_latencies) PURE;
virtual void setMeasureLatencies(bool measure_latencies) PURE;

/**
* Gets the statistics, keyed by id.
Expand Down Expand Up @@ -59,7 +59,7 @@ class BenchmarkClient {
*
* @return bool indicating if latency measurement is enabled.
*/
virtual bool shouldMeasureLatencies() const PURE;
virtual bool measureLatencies() const PURE;
};

using BenchmarkClientPtr = std::unique_ptr<BenchmarkClient>;
Expand Down
6 changes: 3 additions & 3 deletions include/nighthawk/client/client_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
#include "envoy/stats/store.h"

#include "nighthawk/client/benchmark_client.h"
#include "nighthawk/common/phase.h"
#include "nighthawk/common/sequencer.h"
#include "nighthawk/common/statistic.h"
#include "nighthawk/common/worker.h"

Expand All @@ -31,9 +31,9 @@ class ClientWorker : virtual public Worker {
virtual const std::map<std::string, uint64_t>& threadLocalCounterValues() PURE;

/**
* @return const Phase& associated to this worker.
* @return const Sequencer&
*/
virtual const Phase& phase() const PURE;
virtual const Sequencer& sequencer() const PURE;
};

using ClientWorkerPtr = std::unique_ptr<ClientWorker>;
Expand Down
11 changes: 5 additions & 6 deletions include/nighthawk/client/factories.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,9 @@ class SequencerFactory {
public:
virtual ~SequencerFactory() = default;
virtual SequencerPtr create(Envoy::TimeSource& time_source, Envoy::Event::Dispatcher& dispatcher,
BenchmarkClient& benchmark_client,
TerminationPredicatePtr&& termination_predicate,
Envoy::Stats::Scope& scope,
const Envoy::MonotonicTime scheduled_starting_time) const PURE;
Envoy::MonotonicTime start_time, BenchmarkClient& benchmark_client,
TerminationPredicate& termination_predicate,
Envoy::Stats::Scope& scope) const PURE;
};

class StoreFactory {
Expand Down Expand Up @@ -75,8 +74,8 @@ class RequestSourceFactory {
class TerminationPredicateFactory {
public:
virtual ~TerminationPredicateFactory() = default;
virtual TerminationPredicatePtr create(Envoy::TimeSource& time_source,
Envoy::Stats::Scope& scope) const PURE;
virtual TerminationPredicatePtr create(Envoy::TimeSource& time_source, Envoy::Stats::Scope& scope,
Envoy::MonotonicTime time) const PURE;
};

} // namespace Nighthawk
1 change: 0 additions & 1 deletion include/nighthawk/common/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ envoy_basic_cc_library(
hdrs = [
"exception.h",
"operation_callback.h",
"phase.h",
"platform_util.h",
"rate_limiter.h",
"request_source.h",
Expand Down
51 changes: 0 additions & 51 deletions include/nighthawk/common/phase.h

This file was deleted.

8 changes: 0 additions & 8 deletions include/nighthawk/common/termination_predicate.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,6 @@ class TerminationPredicate {
*/
virtual TerminationPredicate& link(TerminationPredicatePtr&& child) PURE;

/**
* Appends a predicate to the last element of the chain.
*
* @param child the child predicate to link. nullptr is not allowed.
* @return the dereferenced input child predicate. For convenience, so calls can be chained.
*/
virtual TerminationPredicate& appendToChain(TerminationPredicatePtr&& child) PURE;

/**
* Recursively evaluates chain of linked predicates, this instance last.
* If any linked element returns anything other then PROCEED that status will
Expand Down
2 changes: 1 addition & 1 deletion source/client/benchmark_client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ bool BenchmarkClientHttpImpl::tryStartRequest(CompletionCallback caller_completi
std::string x_request_id = generator_.uuid();
auto stream_decoder = new StreamDecoder(
dispatcher_, api_.timeSource(), *this, std::move(caller_completion_callback),
*connect_statistic_, *response_statistic_, request->header(), shouldMeasureLatencies(),
*connect_statistic_, *response_statistic_, request->header(), measureLatencies(),
content_length, x_request_id, http_tracer_);
requests_initiated_++;
pool_ptr->newStream(*stream_decoder, *stream_decoder);
Expand Down
4 changes: 2 additions & 2 deletions source/client/benchmark_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,8 @@ class BenchmarkClientHttpImpl : public BenchmarkClient,
// BenchmarkClient
void terminate() override;
StatisticPtrMap statistics() const override;
bool shouldMeasureLatencies() const override { return measure_latencies_; }
void setShouldMeasureLatencies(bool measure_latencies) override {
bool measureLatencies() const override { return measure_latencies_; }
void setMeasureLatencies(bool measure_latencies) override {
measure_latencies_ = measure_latencies;
}
bool tryStartRequest(CompletionCallback caller_completion_callback) override;
Expand Down
32 changes: 12 additions & 20 deletions source/client/client_worker_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,12 @@

#include "external/envoy/source/common/stats/symbol_table_impl.h"

#include "common/phase_impl.h"
#include "common/termination_predicate_impl.h"
#include "common/request_source_impl.h"
#include "common/utility.h"

namespace Nighthawk {
namespace Client {

using namespace std::chrono_literals;

ClientWorkerImpl::ClientWorkerImpl(Envoy::Api::Api& api, Envoy::ThreadLocal::Instance& tls,
Envoy::Upstream::ClusterManagerPtr& cluster_manager,
const BenchmarkClientFactory& benchmark_client_factory,
Expand All @@ -20,23 +17,20 @@ ClientWorkerImpl::ClientWorkerImpl(Envoy::Api::Api& api, Envoy::ThreadLocal::Ins
Envoy::Stats::Store& store, const int worker_number,
const Envoy::MonotonicTime starting_time,
Envoy::Tracing::HttpTracerPtr& http_tracer)
: WorkerImpl(api, tls, store), termination_predicate_factory_(termination_predicate_factory),
sequencer_factory_(sequencer_factory), worker_scope_(store_.createScope("cluster.")),
: WorkerImpl(api, tls, store), worker_scope_(store_.createScope("cluster.")),
worker_number_scope_(worker_scope_->createScope(fmt::format("{}.", worker_number))),
worker_number_(worker_number), http_tracer_(http_tracer),
worker_number_(worker_number), starting_time_(starting_time), http_tracer_(http_tracer),
request_generator_(
request_generator_factory.create(cluster_manager, *dispatcher_, *worker_number_scope_,
fmt::format("{}.requestsource", worker_number))),
benchmark_client_(benchmark_client_factory.create(
api, *dispatcher_, *worker_number_scope_, cluster_manager, http_tracer_,
fmt::format("{}", worker_number), *request_generator_)),
phase_(std::make_unique<PhaseImpl>(
"main",
sequencer_factory_.create(
time_source_, *dispatcher_, *benchmark_client_,
termination_predicate_factory_.create(time_source_, *worker_number_scope_),
*worker_number_scope_, starting_time),
true)) {}
termination_predicate_(
termination_predicate_factory.create(time_source_, *worker_number_scope_, starting_time)),
sequencer_(sequencer_factory.create(time_source_, *dispatcher_, starting_time,
*benchmark_client_, *termination_predicate_,
*worker_number_scope_)) {}

void ClientWorkerImpl::simpleWarmup() {
ENVOY_LOG(debug, "> worker {}: warmup start.", worker_number_);
Expand All @@ -49,12 +43,11 @@ void ClientWorkerImpl::simpleWarmup() {
}

void ClientWorkerImpl::work() {
benchmark_client_->setShouldMeasureLatencies(false);
request_generator_->initOnThread();
simpleWarmup();
benchmark_client_->setShouldMeasureLatencies(phase_->shouldMeasureLatencies());
phase_->run();

benchmark_client_->setMeasureLatencies(true);
sequencer_->start();
sequencer_->waitForCompletion();
// Save a final snapshot of the worker-specific counter accumulations before
// we exit the thread.
for (const auto& stat : store_.counters()) {
Expand All @@ -79,8 +72,7 @@ void ClientWorkerImpl::shutdownThread() { benchmark_client_->terminate(); }
StatisticPtrMap ClientWorkerImpl::statistics() const {
StatisticPtrMap statistics;
StatisticPtrMap s1 = benchmark_client_->statistics();
Sequencer& sequencer = phase_->sequencer();
StatisticPtrMap s2 = sequencer.statistics();
StatisticPtrMap s2 = sequencer_->statistics();
statistics.insert(s1.begin(), s1.end());
statistics.insert(s2.begin(), s2.end());
return statistics;
Expand Down
13 changes: 4 additions & 9 deletions source/client/client_worker_impl.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
#pragma once

#include <vector>

#include "envoy/api/api.h"
#include "envoy/event/dispatcher.h"
#include "envoy/stats/store.h"
Expand All @@ -11,7 +9,6 @@
#include "nighthawk/client/benchmark_client.h"
#include "nighthawk/client/client_worker.h"
#include "nighthawk/client/factories.h"
#include "nighthawk/common/phase.h"
#include "nighthawk/common/request_source.h"
#include "nighthawk/common/sequencer.h"
#include "nighthawk/common/termination_predicate.h"
Expand All @@ -37,25 +34,23 @@ class ClientWorkerImpl : public WorkerImpl, virtual public ClientWorker {
const std::map<std::string, uint64_t>& threadLocalCounterValues() override {
return threadLocalCounterValues_;
}

const Phase& phase() const override { return *phase_; }

const Sequencer& sequencer() const override { return *sequencer_; }
void shutdownThread() override;

protected:
void work() override;

private:
void simpleWarmup();
const TerminationPredicateFactory& termination_predicate_factory_;
const SequencerFactory& sequencer_factory_;
Envoy::Stats::ScopePtr worker_scope_;
Envoy::Stats::ScopePtr worker_number_scope_;
const int worker_number_;
const Envoy::MonotonicTime starting_time_;
Envoy::Tracing::HttpTracerPtr& http_tracer_;
RequestSourcePtr request_generator_;
BenchmarkClientPtr benchmark_client_;
PhasePtr phase_;
TerminationPredicatePtr termination_predicate_;
const SequencerPtr sequencer_;
Envoy::LocalInfo::LocalInfoPtr local_info_;
std::map<std::string, uint64_t> threadLocalCounterValues_;
};
Expand Down
28 changes: 15 additions & 13 deletions source/client/factories_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,15 @@ BenchmarkClientPtr BenchmarkClientFactoryImpl::create(
SequencerFactoryImpl::SequencerFactoryImpl(const Options& options)
: OptionBasedFactoryImpl(options) {}

SequencerPtr SequencerFactoryImpl::create(
Envoy::TimeSource& time_source, Envoy::Event::Dispatcher& dispatcher,
BenchmarkClient& benchmark_client, TerminationPredicatePtr&& termination_predicate,
Envoy::Stats::Scope& scope, const Envoy::MonotonicTime scheduled_starting_time) const {
SequencerPtr SequencerFactoryImpl::create(Envoy::TimeSource& time_source,
Envoy::Event::Dispatcher& dispatcher,
Envoy::MonotonicTime start_time,
BenchmarkClient& benchmark_client,
TerminationPredicate& termination_predicate,
Envoy::Stats::Scope& scope) const {
StatisticFactoryImpl statistic_factory(options_);
Frequency frequency(options_.requestsPerSecond());
RateLimiterPtr rate_limiter = std::make_unique<ScheduledStartingRateLimiter>(
std::make_unique<LinearRateLimiter>(time_source, frequency), scheduled_starting_time);
RateLimiterPtr rate_limiter =
std::make_unique<LinearRateLimiter>(time_source, Frequency(options_.requestsPerSecond()));
const uint64_t burst_size = options_.burstSize();

if (burst_size) {
Expand All @@ -70,9 +71,9 @@ SequencerPtr SequencerFactoryImpl::create(
return benchmark_client.tryStartRequest(std::move(f));
};
return std::make_unique<SequencerImpl>(
platform_util_, dispatcher, time_source, std::move(rate_limiter), sequencer_target,
statistic_factory.create(), statistic_factory.create(), options_.sequencerIdleStrategy(),
std::move(termination_predicate), scope);
platform_util_, dispatcher, time_source, start_time, std::move(rate_limiter),
sequencer_target, statistic_factory.create(), statistic_factory.create(),
options_.sequencerIdleStrategy(), termination_predicate, scope);
}

StoreFactoryImpl::StoreFactoryImpl(const Options& options) : OptionBasedFactoryImpl(options) {}
Expand Down Expand Up @@ -171,10 +172,11 @@ RequestSourceFactoryImpl::create(const Envoy::Upstream::ClusterManagerPtr& clust
TerminationPredicateFactoryImpl::TerminationPredicateFactoryImpl(const Options& options)
: OptionBasedFactoryImpl(options) {}

TerminationPredicatePtr TerminationPredicateFactoryImpl::create(Envoy::TimeSource& time_source,
Envoy::Stats::Scope& scope) const {
TerminationPredicatePtr
TerminationPredicateFactoryImpl::create(Envoy::TimeSource& time_source, Envoy::Stats::Scope& scope,
const Envoy::MonotonicTime start) const {
TerminationPredicatePtr duration_predicate =
std::make_unique<DurationTerminationPredicateImpl>(time_source, options_.duration());
std::make_unique<DurationTerminationPredicateImpl>(time_source, start, options_.duration());
TerminationPredicate* current_predicate = duration_predicate.get();
current_predicate = linkConfiguredPredicates(*current_predicate, options_.failurePredicates(),
TerminationPredicate::Status::FAIL, scope);
Expand Down
10 changes: 5 additions & 5 deletions source/client/factories_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ class SequencerFactoryImpl : public OptionBasedFactoryImpl, public SequencerFact
public:
SequencerFactoryImpl(const Options& options);
SequencerPtr create(Envoy::TimeSource& time_source, Envoy::Event::Dispatcher& dispatcher,
BenchmarkClient& benchmark_client,
TerminationPredicatePtr&& termination_predicate, Envoy::Stats::Scope& scope,
const Envoy::MonotonicTime scheduled_starting_time) const override;
Envoy::MonotonicTime start_time, BenchmarkClient& benchmark_client,
TerminationPredicate& termination_predicate,
Envoy::Stats::Scope& scope) const override;
};

class StoreFactoryImpl : public OptionBasedFactoryImpl, public StoreFactory {
Expand Down Expand Up @@ -78,8 +78,8 @@ class TerminationPredicateFactoryImpl : public OptionBasedFactoryImpl,
public TerminationPredicateFactory {
public:
TerminationPredicateFactoryImpl(const Options& options);
TerminationPredicatePtr create(Envoy::TimeSource& time_source,
Envoy::Stats::Scope& scope) const override;
TerminationPredicatePtr create(Envoy::TimeSource& time_source, Envoy::Stats::Scope& scope,
const Envoy::MonotonicTime start) const override;
TerminationPredicate* linkConfiguredPredicates(
TerminationPredicate& last_predicate, const TerminationPredicateMap& predicates,
const TerminationPredicate::Status termination_status, Envoy::Stats::Scope& scope) const;
Expand Down
2 changes: 1 addition & 1 deletion source/client/process_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@ bool ProcessImpl::run(OutputCollector& collector) {
int i = 0;
std::chrono::nanoseconds total_execution_duration = 0ns;
for (auto& worker : workers_) {
auto sequencer_execution_duration = worker->phase().sequencer().executionDuration();
auto sequencer_execution_duration = worker->sequencer().executionDuration();
// We don't write per-worker results if we only have a single worker, because the global results
// will be precisely the same.
if (workers_.size() > 1) {
Expand Down
2 changes: 0 additions & 2 deletions source/common/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ envoy_cc_library(
envoy_cc_library(
name = "nighthawk_common_lib",
srcs = [
"phase_impl.cc",
"rate_limiter_impl.cc",
"sequencer_impl.cc",
"statistic_impl.cc",
Expand All @@ -76,7 +75,6 @@ envoy_cc_library(
],
hdrs = [
"frequency.h",
"phase_impl.h",
"platform_util_impl.h",
"rate_limiter_impl.h",
"sequencer_impl.h",
Expand Down
18 changes: 0 additions & 18 deletions source/common/phase_impl.cc

This file was deleted.

Loading