diff --git a/include/nighthawk/client/benchmark_client.h b/include/nighthawk/client/benchmark_client.h index 89bd6f119..cb0b555f6 100644 --- a/include/nighthawk/client/benchmark_client.h +++ b/include/nighthawk/client/benchmark_client.h @@ -30,7 +30,7 @@ class BenchmarkClient { * * @param measure_latencies true iff latencies should be measured. */ - virtual void setMeasureLatencies(bool measure_latencies) PURE; + virtual void setShouldMeasureLatencies(bool measure_latencies) PURE; /** * Gets the statistics, keyed by id. @@ -59,7 +59,7 @@ class BenchmarkClient { * * @return bool indicating if latency measurement is enabled. */ - virtual bool measureLatencies() const PURE; + virtual bool shouldMeasureLatencies() const PURE; }; using BenchmarkClientPtr = std::unique_ptr; diff --git a/include/nighthawk/client/client_worker.h b/include/nighthawk/client/client_worker.h index 34531e0aa..b772d90c4 100644 --- a/include/nighthawk/client/client_worker.h +++ b/include/nighthawk/client/client_worker.h @@ -6,7 +6,7 @@ #include "envoy/stats/store.h" #include "nighthawk/client/benchmark_client.h" -#include "nighthawk/common/sequencer.h" +#include "nighthawk/common/phase.h" #include "nighthawk/common/statistic.h" #include "nighthawk/common/worker.h" @@ -31,9 +31,9 @@ class ClientWorker : virtual public Worker { virtual const std::map& thread_local_counter_values() PURE; /** - * @return const Sequencer& + * @return const Phase& associated to this worker. 
*/ - virtual const Sequencer& sequencer() const PURE; + virtual const Phase& phase() const PURE; }; using ClientWorkerPtr = std::unique_ptr; diff --git a/include/nighthawk/client/factories.h b/include/nighthawk/client/factories.h index ef737ddd6..bca29c657 100644 --- a/include/nighthawk/client/factories.h +++ b/include/nighthawk/client/factories.h @@ -37,9 +37,10 @@ class SequencerFactory { public: virtual ~SequencerFactory() = default; virtual SequencerPtr create(Envoy::TimeSource& time_source, Envoy::Event::Dispatcher& dispatcher, - Envoy::MonotonicTime start_time, BenchmarkClient& benchmark_client, - TerminationPredicate& termination_predicate, - Envoy::Stats::Scope& scope) const PURE; + BenchmarkClient& benchmark_client, + TerminationPredicatePtr&& termination_predicate, + Envoy::Stats::Scope& scope, + const Envoy::MonotonicTime scheduled_starting_time) const PURE; }; class StoreFactory { @@ -74,8 +75,8 @@ class RequestSourceFactory { class TerminationPredicateFactory { public: virtual ~TerminationPredicateFactory() = default; - virtual TerminationPredicatePtr create(Envoy::TimeSource& time_source, Envoy::Stats::Scope& scope, - Envoy::MonotonicTime time) const PURE; + virtual TerminationPredicatePtr create(Envoy::TimeSource& time_source, + Envoy::Stats::Scope& scope) const PURE; }; } // namespace Nighthawk diff --git a/include/nighthawk/common/BUILD b/include/nighthawk/common/BUILD index fc1269511..f549e8efc 100644 --- a/include/nighthawk/common/BUILD +++ b/include/nighthawk/common/BUILD @@ -13,6 +13,7 @@ envoy_basic_cc_library( hdrs = [ "exception.h", "operation_callback.h", + "phase.h", "platform_util.h", "rate_limiter.h", "request_source.h", diff --git a/include/nighthawk/common/phase.h b/include/nighthawk/common/phase.h new file mode 100644 index 000000000..bcbed7cf6 --- /dev/null +++ b/include/nighthawk/common/phase.h @@ -0,0 +1,51 @@ + +#pragma once + +#include + +#include "envoy/common/pure.h" + +#include "nighthawk/common/sequencer.h" + +namespace 
Nighthawk { + +/** + * Phase represents a distinct phase of a benchmark execution, like warmup and cooldown. + * A phase is associated to a sequencer, which in turn can be associated to separate termination + * and failure predicates as well as its own rate limiter policy. The end of a phase also represents + * a natural boundary for reporting a snapshot of stats and latencies associated to the phase. + * High level, a worker statically configures a vector of phases, and will transfer the hot + * connection pool when transitioning between them. At this time, nothing is stopping us from + * dynamically injecting phases later, be it via grpc calls and/or live CLI input. + */ +class Phase { +public: + virtual ~Phase() = default; + + /** + * @return absl::string_view Contains the id of the phase. Should be unique but that is not + * enforced at this time so take care. + */ + virtual absl::string_view id() const PURE; + + /** + * @return Sequencer& Sequencer associated to this phase. + */ + virtual Sequencer& sequencer() const PURE; + + /** + * @return bool Indicates if latencies should be tracked for this phase. + */ + virtual bool shouldMeasureLatencies() const PURE; + + /** + * Runs the sequencer associated to this phase and blocks until completion, which means this phase + * has ended as well. + * Execution failure can be observed via the sequencer.failed_terminations counter. + */ + virtual void run() const PURE; +}; + +using PhasePtr = std::unique_ptr; + +} // namespace Nighthawk \ No newline at end of file diff --git a/include/nighthawk/common/termination_predicate.h b/include/nighthawk/common/termination_predicate.h index 688c9c735..732b2687a 100644 --- a/include/nighthawk/common/termination_predicate.h +++ b/include/nighthawk/common/termination_predicate.h @@ -32,6 +32,14 @@ class TerminationPredicate { */ virtual TerminationPredicate& link(TerminationPredicatePtr&& child) PURE; + /** + * Appends a predicate to the last element of the chain. 
+ * + * @param child the child predicate to link. nullptr is not allowed. + * @return the dereferenced input child predicate. For convenience, so calls can be chained. + */ + virtual TerminationPredicate& appendToChain(TerminationPredicatePtr&& child) PURE; + /** * Recursively evaluates chain of linked predicates, this instance last. * If any linked element returns anything other then PROCEED that status will diff --git a/source/client/benchmark_client_impl.cc b/source/client/benchmark_client_impl.cc index cf9ece7b9..72c7959e9 100644 --- a/source/client/benchmark_client_impl.cc +++ b/source/client/benchmark_client_impl.cc @@ -157,7 +157,7 @@ bool BenchmarkClientHttpImpl::tryStartRequest(CompletionCallback caller_completi std::string x_request_id = generator_.uuid(); auto stream_decoder = new StreamDecoder( dispatcher_, api_.timeSource(), *this, std::move(caller_completion_callback), - *connect_statistic_, *response_statistic_, request->header(), measureLatencies(), + *connect_statistic_, *response_statistic_, request->header(), shouldMeasureLatencies(), content_length, x_request_id, http_tracer_); requests_initiated_++; pool_ptr->newStream(*stream_decoder, *stream_decoder); diff --git a/source/client/benchmark_client_impl.h b/source/client/benchmark_client_impl.h index 9a00d01d9..66782f97d 100644 --- a/source/client/benchmark_client_impl.h +++ b/source/client/benchmark_client_impl.h @@ -135,8 +135,8 @@ class BenchmarkClientHttpImpl : public BenchmarkClient, // BenchmarkClient void terminate() override; StatisticPtrMap statistics() const override; - bool measureLatencies() const override { return measure_latencies_; } - void setMeasureLatencies(bool measure_latencies) override { + bool shouldMeasureLatencies() const override { return measure_latencies_; } + void setShouldMeasureLatencies(bool measure_latencies) override { measure_latencies_ = measure_latencies; } bool tryStartRequest(CompletionCallback caller_completion_callback) override; diff --git 
a/source/client/client_worker_impl.cc b/source/client/client_worker_impl.cc index c67964fb6..984538a67 100644 --- a/source/client/client_worker_impl.cc +++ b/source/client/client_worker_impl.cc @@ -2,12 +2,15 @@ #include "external/envoy/source/common/stats/symbol_table_impl.h" -#include "common/request_source_impl.h" +#include "common/phase_impl.h" +#include "common/termination_predicate_impl.h" #include "common/utility.h" namespace Nighthawk { namespace Client { +using namespace std::chrono_literals; + ClientWorkerImpl::ClientWorkerImpl(Envoy::Api::Api& api, Envoy::ThreadLocal::Instance& tls, Envoy::Upstream::ClusterManagerPtr& cluster_manager, const BenchmarkClientFactory& benchmark_client_factory, @@ -17,20 +20,23 @@ ClientWorkerImpl::ClientWorkerImpl(Envoy::Api::Api& api, Envoy::ThreadLocal::Ins Envoy::Stats::Store& store, const int worker_number, const Envoy::MonotonicTime starting_time, Envoy::Tracing::HttpTracerPtr& http_tracer) - : WorkerImpl(api, tls, store), worker_scope_(store_.createScope("cluster.")), + : WorkerImpl(api, tls, store), termination_predicate_factory_(termination_predicate_factory), + sequencer_factory_(sequencer_factory), worker_scope_(store_.createScope("cluster.")), worker_number_scope_(worker_scope_->createScope(fmt::format("{}.", worker_number))), - worker_number_(worker_number), starting_time_(starting_time), http_tracer_(http_tracer), + worker_number_(worker_number), http_tracer_(http_tracer), request_generator_( request_generator_factory.create(cluster_manager, *dispatcher_, *worker_number_scope_, fmt::format("{}.requestsource", worker_number))), benchmark_client_(benchmark_client_factory.create( api, *dispatcher_, *worker_number_scope_, cluster_manager, http_tracer_, fmt::format("{}", worker_number), *request_generator_)), - termination_predicate_( - termination_predicate_factory.create(time_source_, *worker_number_scope_, starting_time)), - sequencer_(sequencer_factory.create(time_source_, *dispatcher_, starting_time, - 
*benchmark_client_, *termination_predicate_, - *worker_number_scope_)) {} + phase_(std::make_unique( + "main", + sequencer_factory_.create( + time_source_, *dispatcher_, *benchmark_client_, + termination_predicate_factory_.create(time_source_, *worker_number_scope_), + *worker_number_scope_, starting_time), + true)) {} void ClientWorkerImpl::simpleWarmup() { ENVOY_LOG(debug, "> worker {}: warmup start.", worker_number_); @@ -43,11 +49,12 @@ void ClientWorkerImpl::simpleWarmup() { } void ClientWorkerImpl::work() { + benchmark_client_->setShouldMeasureLatencies(false); request_generator_->initOnThread(); simpleWarmup(); - benchmark_client_->setMeasureLatencies(true); - sequencer_->start(); - sequencer_->waitForCompletion(); + benchmark_client_->setShouldMeasureLatencies(phase_->shouldMeasureLatencies()); + phase_->run(); + // Save a final snapshot of the worker-specific counter accumulations before // we exit the thread. for (const auto& stat : store_.counters()) { @@ -72,7 +79,8 @@ void ClientWorkerImpl::shutdownThread() { benchmark_client_->terminate(); } StatisticPtrMap ClientWorkerImpl::statistics() const { StatisticPtrMap statistics; StatisticPtrMap s1 = benchmark_client_->statistics(); - StatisticPtrMap s2 = sequencer_->statistics(); + Sequencer& sequencer = phase_->sequencer(); + StatisticPtrMap s2 = sequencer.statistics(); statistics.insert(s1.begin(), s1.end()); statistics.insert(s2.begin(), s2.end()); return statistics; diff --git a/source/client/client_worker_impl.h b/source/client/client_worker_impl.h index 53d9e0ab7..3149c7e2f 100644 --- a/source/client/client_worker_impl.h +++ b/source/client/client_worker_impl.h @@ -1,5 +1,7 @@ #pragma once +#include + #include "envoy/api/api.h" #include "envoy/event/dispatcher.h" #include "envoy/stats/store.h" @@ -9,6 +11,7 @@ #include "nighthawk/client/benchmark_client.h" #include "nighthawk/client/client_worker.h" #include "nighthawk/client/factories.h" +#include "nighthawk/common/phase.h" #include 
"nighthawk/common/request_source.h" #include "nighthawk/common/sequencer.h" #include "nighthawk/common/termination_predicate.h" @@ -34,7 +37,9 @@ class ClientWorkerImpl : public WorkerImpl, virtual public ClientWorker { const std::map& thread_local_counter_values() override { return thread_local_counter_values_; } - const Sequencer& sequencer() const override { return *sequencer_; } + + const Phase& phase() const override { return *phase_; } + void shutdownThread() override; protected: @@ -42,15 +47,15 @@ class ClientWorkerImpl : public WorkerImpl, virtual public ClientWorker { private: void simpleWarmup(); + const TerminationPredicateFactory& termination_predicate_factory_; + const SequencerFactory& sequencer_factory_; Envoy::Stats::ScopePtr worker_scope_; Envoy::Stats::ScopePtr worker_number_scope_; const int worker_number_; - const Envoy::MonotonicTime starting_time_; Envoy::Tracing::HttpTracerPtr& http_tracer_; RequestSourcePtr request_generator_; BenchmarkClientPtr benchmark_client_; - TerminationPredicatePtr termination_predicate_; - const SequencerPtr sequencer_; + PhasePtr phase_; Envoy::LocalInfo::LocalInfoPtr local_info_; std::map thread_local_counter_values_; }; diff --git a/source/client/factories_impl.cc b/source/client/factories_impl.cc index 28bbfdac3..483cb600a 100644 --- a/source/client/factories_impl.cc +++ b/source/client/factories_impl.cc @@ -45,15 +45,14 @@ BenchmarkClientPtr BenchmarkClientFactoryImpl::create( SequencerFactoryImpl::SequencerFactoryImpl(const Options& options) : OptionBasedFactoryImpl(options) {} -SequencerPtr SequencerFactoryImpl::create(Envoy::TimeSource& time_source, - Envoy::Event::Dispatcher& dispatcher, - Envoy::MonotonicTime start_time, - BenchmarkClient& benchmark_client, - TerminationPredicate& termination_predicate, - Envoy::Stats::Scope& scope) const { +SequencerPtr SequencerFactoryImpl::create( + Envoy::TimeSource& time_source, Envoy::Event::Dispatcher& dispatcher, + BenchmarkClient& benchmark_client, 
TerminationPredicatePtr&& termination_predicate, + Envoy::Stats::Scope& scope, const Envoy::MonotonicTime scheduled_starting_time) const { StatisticFactoryImpl statistic_factory(options_); - RateLimiterPtr rate_limiter = - std::make_unique(time_source, Frequency(options_.requestsPerSecond())); + Frequency frequency(options_.requestsPerSecond()); + RateLimiterPtr rate_limiter = std::make_unique( + std::make_unique(time_source, frequency), scheduled_starting_time); const uint64_t burst_size = options_.burstSize(); if (burst_size) { @@ -71,9 +70,9 @@ SequencerPtr SequencerFactoryImpl::create(Envoy::TimeSource& time_source, return benchmark_client.tryStartRequest(std::move(f)); }; return std::make_unique( - platform_util_, dispatcher, time_source, start_time, std::move(rate_limiter), - sequencer_target, statistic_factory.create(), statistic_factory.create(), - options_.sequencerIdleStrategy(), termination_predicate, scope); + platform_util_, dispatcher, time_source, std::move(rate_limiter), sequencer_target, + statistic_factory.create(), statistic_factory.create(), options_.sequencerIdleStrategy(), + std::move(termination_predicate), scope); } StoreFactoryImpl::StoreFactoryImpl(const Options& options) : OptionBasedFactoryImpl(options) {} @@ -172,11 +171,10 @@ RequestSourceFactoryImpl::create(const Envoy::Upstream::ClusterManagerPtr& clust TerminationPredicateFactoryImpl::TerminationPredicateFactoryImpl(const Options& options) : OptionBasedFactoryImpl(options) {} -TerminationPredicatePtr -TerminationPredicateFactoryImpl::create(Envoy::TimeSource& time_source, Envoy::Stats::Scope& scope, - const Envoy::MonotonicTime start) const { +TerminationPredicatePtr TerminationPredicateFactoryImpl::create(Envoy::TimeSource& time_source, + Envoy::Stats::Scope& scope) const { TerminationPredicatePtr duration_predicate = - std::make_unique(time_source, start, options_.duration()); + std::make_unique(time_source, options_.duration()); TerminationPredicate* current_predicate = 
duration_predicate.get(); current_predicate = linkConfiguredPredicates(*current_predicate, options_.failurePredicates(), TerminationPredicate::Status::FAIL, scope); diff --git a/source/client/factories_impl.h b/source/client/factories_impl.h index 4749dbd7b..b98aedfd4 100644 --- a/source/client/factories_impl.h +++ b/source/client/factories_impl.h @@ -39,9 +39,9 @@ class SequencerFactoryImpl : public OptionBasedFactoryImpl, public SequencerFact public: SequencerFactoryImpl(const Options& options); SequencerPtr create(Envoy::TimeSource& time_source, Envoy::Event::Dispatcher& dispatcher, - Envoy::MonotonicTime start_time, BenchmarkClient& benchmark_client, - TerminationPredicate& termination_predicate, - Envoy::Stats::Scope& scope) const override; + BenchmarkClient& benchmark_client, + TerminationPredicatePtr&& termination_predicate, Envoy::Stats::Scope& scope, + const Envoy::MonotonicTime scheduled_starting_time) const override; }; class StoreFactoryImpl : public OptionBasedFactoryImpl, public StoreFactory { @@ -78,8 +78,8 @@ class TerminationPredicateFactoryImpl : public OptionBasedFactoryImpl, public TerminationPredicateFactory { public: TerminationPredicateFactoryImpl(const Options& options); - TerminationPredicatePtr create(Envoy::TimeSource& time_source, Envoy::Stats::Scope& scope, - const Envoy::MonotonicTime start) const override; + TerminationPredicatePtr create(Envoy::TimeSource& time_source, + Envoy::Stats::Scope& scope) const override; TerminationPredicate* linkConfiguredPredicates( TerminationPredicate& last_predicate, const TerminationPredicateMap& predicates, const TerminationPredicate::Status termination_status, Envoy::Stats::Scope& scope) const; diff --git a/source/client/process_impl.cc b/source/client/process_impl.cc index 538ac0b33..83ead4c12 100644 --- a/source/client/process_impl.cc +++ b/source/client/process_impl.cc @@ -454,7 +454,7 @@ bool ProcessImpl::run(OutputCollector& collector) { int i = 0; std::chrono::nanoseconds 
total_execution_duration = 0ns; for (auto& worker : workers_) { - auto sequencer_execution_duration = worker->sequencer().executionDuration(); + auto sequencer_execution_duration = worker->phase().sequencer().executionDuration(); // We don't write per-worker results if we only have a single worker, because the global results // will be precisely the same. if (workers_.size() > 1) { diff --git a/source/common/BUILD b/source/common/BUILD index 019e14ed3..528bee369 100644 --- a/source/common/BUILD +++ b/source/common/BUILD @@ -64,6 +64,7 @@ envoy_cc_library( envoy_cc_library( name = "nighthawk_common_lib", srcs = [ + "phase_impl.cc", "rate_limiter_impl.cc", "sequencer_impl.cc", "statistic_impl.cc", @@ -75,6 +76,7 @@ envoy_cc_library( ], hdrs = [ "frequency.h", + "phase_impl.h", "platform_util_impl.h", "rate_limiter_impl.h", "sequencer_impl.h", diff --git a/source/common/phase_impl.cc b/source/common/phase_impl.cc new file mode 100644 index 000000000..058d155f7 --- /dev/null +++ b/source/common/phase_impl.cc @@ -0,0 +1,18 @@ +#include "common/phase_impl.h" + +namespace Nighthawk { + +absl::string_view PhaseImpl::id() const { return id_; } + +Sequencer& PhaseImpl::sequencer() const { return *sequencer_; } + +bool PhaseImpl::shouldMeasureLatencies() const { return should_measure_latencies_; } + +void PhaseImpl::run() const { + ENVOY_LOG(trace, "starting '{}' phase", id()); + sequencer().start(); + sequencer().waitForCompletion(); + ENVOY_LOG(trace, "finished '{}' phase", id()); +} + +} // namespace Nighthawk \ No newline at end of file diff --git a/source/common/phase_impl.h b/source/common/phase_impl.h new file mode 100644 index 000000000..1b40164c3 --- /dev/null +++ b/source/common/phase_impl.h @@ -0,0 +1,38 @@ + +#pragma once + +#include "envoy/common/time.h" + +#include "nighthawk/common/phase.h" + +#include "external/envoy/source/common/common/logger.h" + +#include "absl/types/optional.h" + +namespace Nighthawk { + +class PhaseImpl : public Phase, public 
Envoy::Logger::Loggable { +public: + /** + * @param id Unique identifier of the phase (uniqueness not enforced). + * @param sequencer Sequencer that will be used to execute this phase. + * @param should_measure_latencies Indicates if latencies should be tracked for requests issued + * during execution of this phase. + * Note: no time source or start time is taken here; scheduling of the phase + * start is delegated to the sequencer's rate limiter. + */ + PhaseImpl(absl::string_view id, SequencerPtr&& sequencer, bool should_measure_latencies) + : id_(std::string(id)), sequencer_(std::move(sequencer)), + should_measure_latencies_(should_measure_latencies) {} + absl::string_view id() const override; + Sequencer& sequencer() const override; + void run() const override; + bool shouldMeasureLatencies() const override; + +private: + const std::string id_; + const SequencerPtr sequencer_; + const bool should_measure_latencies_; +}; + +} // namespace Nighthawk \ No newline at end of file diff --git a/source/common/sequencer_impl.cc b/source/common/sequencer_impl.cc index 24ce6e451..02c864651 100644 --- a/source/common/sequencer_impl.cc +++ b/source/common/sequencer_impl.cc @@ -11,19 +11,20 @@ namespace Nighthawk { SequencerImpl::SequencerImpl( const PlatformUtil& platform_util, Envoy::Event::Dispatcher& dispatcher, - Envoy::TimeSource& time_source, Envoy::MonotonicTime start_time, RateLimiterPtr&& rate_limiter, - SequencerTarget target, StatisticPtr&& latency_statistic, StatisticPtr&& blocked_statistic, + Envoy::TimeSource& time_source, RateLimiterPtr&& rate_limiter, SequencerTarget target, + StatisticPtr&& latency_statistic, StatisticPtr&& blocked_statistic, nighthawk::client::SequencerIdleStrategy::SequencerIdleStrategyOptions idle_strategy, - TerminationPredicate& termination_predicate, Envoy::Stats::Scope& scope) + TerminationPredicatePtr&& termination_predicate, Envoy::Stats::Scope& scope) : target_(std::move(target)), 
platform_util_(platform_util), dispatcher_(dispatcher), time_source_(time_source), rate_limiter_(std::move(rate_limiter)), latency_statistic_(std::move(latency_statistic)), - blocked_statistic_(std::move(blocked_statistic)), start_time_(start_time), - idle_strategy_(idle_strategy), termination_predicate_(termination_predicate), + blocked_statistic_(std::move(blocked_statistic)), idle_strategy_(idle_strategy), + termination_predicate_(std::move(termination_predicate)), last_termination_status_(TerminationPredicate::Status::PROCEED), scope_(scope.createScope("sequencer.")), sequencer_stats_({ALL_SEQUENCER_STATS(POOL_COUNTER(*scope_))}) { ASSERT(target_ != nullptr, "No SequencerTarget"); + ASSERT(termination_predicate_ != nullptr, "null termination predicate"); periodic_timer_ = dispatcher_.createTimer([this]() { run(true); }); spin_timer_ = dispatcher_.createTimer([this]() { run(false); }); latency_statistic_->setId("sequencer.callback"); @@ -33,9 +34,7 @@ SequencerImpl::SequencerImpl( void SequencerImpl::start() { ASSERT(!running_); running_ = true; - if (start_time_ < time_source_.monotonicTime()) { - ENVOY_LOG(error, "Sequencer start called too late"); - } + start_time_ = time_source_.monotonicTime(); // Initiate the periodic timer loop. scheduleRun(); // Immediately run. @@ -83,48 +82,43 @@ void SequencerImpl::updateStartBlockingTimeIfNeeded() { void SequencerImpl::run(bool from_periodic_timer) { ASSERT(running_); const auto now = last_event_time_ = time_source_.monotonicTime(); - const auto running_duration = now - start_time_; - - // The running_duration we compute will be negative until it is time to start. - if (running_duration >= 0ns) { - last_termination_status_ = last_termination_status_ == TerminationPredicate::Status::PROCEED - ? termination_predicate_.evaluateChain() - : last_termination_status_; - // If we should stop according to termination conditions. 
- if (last_termination_status_ != TerminationPredicate::Status::PROCEED) { - stop(last_termination_status_ == TerminationPredicate::Status::FAIL); - return; - } + last_termination_status_ = last_termination_status_ == TerminationPredicate::Status::PROCEED + ? termination_predicate_->evaluateChain() + : last_termination_status_; + // If we should stop according to termination conditions. + if (last_termination_status_ != TerminationPredicate::Status::PROCEED) { + stop(last_termination_status_ == TerminationPredicate::Status::FAIL); + return; + } - while (rate_limiter_->tryAcquireOne()) { - // The rate limiter says it's OK to proceed and call the target. Let's see if the target is OK - // with that as well. - const bool target_could_start = target_([this, now](bool, bool) { - const auto dur = time_source_.monotonicTime() - now; - latency_statistic_->addValue(dur.count()); - targets_completed_++; - // Callbacks may fire after stop() is called. When the worker teardown runs the dispatcher, - // in-flight work might wrap up and fire this callback. By then we wouldn't want to - // re-enable any timers here. - if (this->running_) { - // Immediately schedule us to check again, as chances are we can get on with the next - // task. - spin_timer_->enableHRTimer(0ms); - } - }); - - if (target_could_start) { - unblockAndUpdateStatisticIfNeeded(now); - targets_initiated_++; - } else { - // This should only happen when we are running in closed-loop mode.The target wasn't able to - // proceed. Update the rate limiter. - updateStartBlockingTimeIfNeeded(); - rate_limiter_->releaseOne(); - // Retry later. When all target_ calls have completed we are going to spin until target_ - // stops returning false. Otherwise the periodic timer will wake us up to re-check. - break; + while (rate_limiter_->tryAcquireOne()) { + // The rate limiter says it's OK to proceed and call the target. Let's see if the target is OK + // with that as well. 
+ const bool target_could_start = target_([this, now](bool, bool) { + const auto dur = time_source_.monotonicTime() - now; + latency_statistic_->addValue(dur.count()); + targets_completed_++; + // Callbacks may fire after stop() is called. When the worker teardown runs the dispatcher, + // in-flight work might wrap up and fire this callback. By then we wouldn't want to + // re-enable any timers here. + if (this->running_) { + // Immediately schedule us to check again, as chances are we can get on with the next + // task. + spin_timer_->enableHRTimer(0ms); } + }); + + if (target_could_start) { + unblockAndUpdateStatisticIfNeeded(now); + targets_initiated_++; + } else { + // This should only happen when we are running in closed-loop mode.The target wasn't able to + // proceed. Update the rate limiter. + updateStartBlockingTimeIfNeeded(); + rate_limiter_->releaseOne(); + // Retry later. When all target_ calls have completed we are going to spin until target_ + // stops returning false. Otherwise the periodic timer will wake us up to re-check. + break; } } diff --git a/source/common/sequencer_impl.h b/source/common/sequencer_impl.h index a7bae23eb..b69f09ac8 100644 --- a/source/common/sequencer_impl.h +++ b/source/common/sequencer_impl.h @@ -49,11 +49,10 @@ class SequencerImpl : public Sequencer, public Envoy::Logger::Loggable duration_ ? TerminationPredicate::Status::TERMINATE - : TerminationPredicate::Status::PROCEED; + const auto now = time_source_.monotonicTime(); + if (!start_.has_value()) { + start_ = now; + } + return now - start_.value() > duration_ ? 
TerminationPredicate::Status::TERMINATE + : TerminationPredicate::Status::PROCEED; } TerminationPredicate::Status StatsCounterAbsoluteThresholdTerminationPredicateImpl::evaluate() { diff --git a/source/common/termination_predicate_impl.h b/source/common/termination_predicate_impl.h index 1b88fe68e..b8314e41d 100644 --- a/source/common/termination_predicate_impl.h +++ b/source/common/termination_predicate_impl.h @@ -5,32 +5,44 @@ #include "nighthawk/common/termination_predicate.h" +#include "absl/types/optional.h" + namespace Nighthawk { class TerminationPredicateBaseImpl : public TerminationPredicate { public: TerminationPredicate& link(TerminationPredicatePtr&& child) final { RELEASE_ASSERT(linked_child_ == nullptr, "Linked child already set"); - RELEASE_ASSERT(child != nullptr, "child == nullptr"); linked_child_ = std::move(child); return *linked_child_; } + TerminationPredicate& appendToChain(TerminationPredicatePtr&& child) final { + if (linked_child_ != nullptr) { + return linked_child_->appendToChain(std::move(child)); + } else { + return link(std::move(child)); + } + } TerminationPredicate::Status evaluateChain() final; private: TerminationPredicatePtr linked_child_; }; +/** + * Predicate which indicates termination iff the passed in duration has expired. + * time tracking starts at the first call to evaluate(). 
+ */ class DurationTerminationPredicateImpl : public TerminationPredicateBaseImpl { public: - DurationTerminationPredicateImpl(Envoy::TimeSource& time_source, const Envoy::MonotonicTime start, + DurationTerminationPredicateImpl(Envoy::TimeSource& time_source, std::chrono::microseconds duration) - : time_source_(time_source), start_(start), duration_(duration) {} + : time_source_(time_source), start_(absl::nullopt), duration_(duration) {} TerminationPredicate::Status evaluate() override; private: Envoy::TimeSource& time_source_; - const Envoy::MonotonicTime start_; + absl::optional start_; std::chrono::microseconds duration_; }; diff --git a/test/benchmark_http_client_test.cc b/test/benchmark_http_client_test.cc index f7f928e73..64e0de786 100644 --- a/test/benchmark_http_client_test.cc +++ b/test/benchmark_http_client_test.cc @@ -174,11 +174,11 @@ TEST_F(BenchmarkClientHttpTest, WeirdStatus) { TEST_F(BenchmarkClientHttpTest, EnableLatencyMeasurement) { setupBenchmarkClient(); - EXPECT_EQ(false, client_->measureLatencies()); + EXPECT_EQ(false, client_->shouldMeasureLatencies()); testBasicFunctionality(10, 1, 10); EXPECT_EQ(0, client_->statistics()["benchmark_http_client.queue_to_connect"]->count()); EXPECT_EQ(0, client_->statistics()["benchmark_http_client.request_to_response"]->count()); - client_->setMeasureLatencies(true); + client_->setShouldMeasureLatencies(true); testBasicFunctionality(10, 1, 10); EXPECT_EQ(10, client_->statistics()["benchmark_http_client.queue_to_connect"]->count()); EXPECT_EQ(10, client_->statistics()["benchmark_http_client.request_to_response"]->count()); diff --git a/test/client_worker_test.cc b/test/client_worker_test.cc index 1eb89b1d0..834d9966e 100644 --- a/test/client_worker_test.cc +++ b/test/client_worker_test.cc @@ -20,7 +20,7 @@ #include "gtest/gtest.h" using namespace testing; - +using namespace std::chrono_literals; namespace Nighthawk { namespace Client { @@ -34,7 +34,6 @@ class ClientWorkerTest : public Test { benchmark_client_ 
= new MockBenchmarkClient(); sequencer_ = new MockSequencer(); request_generator_ = new MockRequestSource(); - termination_predicate_ = new MockTerminationPredicate(); EXPECT_CALL(benchmark_client_factory_, create(_, _, _, _, _, _, _)) .Times(1) @@ -47,10 +46,10 @@ class ClientWorkerTest : public Test { EXPECT_CALL(request_generator_factory_, create(_, _, _, _)) .Times(1) .WillOnce(Return(ByMove(std::unique_ptr(request_generator_)))); - EXPECT_CALL(*request_generator_, initOnThread()).Times(1); - EXPECT_CALL(termination_predicate_factory_, create(_, _, _)) - .WillOnce(Return(ByMove(std::unique_ptr(termination_predicate_)))); + + EXPECT_CALL(termination_predicate_factory_, create(_, _)) + .WillOnce(Return(ByMove(createMockTerminationPredicate()))); } StatisticPtrMap createStatisticPtrMap() const { @@ -65,6 +64,17 @@ class ClientWorkerTest : public Test { return false; } + TerminationPredicatePtr createMockTerminationPredicate() { + auto predicate = std::make_unique>(); + ON_CALL(*predicate, appendToChain(_)).WillByDefault(ReturnRef(*predicate)); + EXPECT_CALL(*predicate, evaluateChain()) + .Times(AtLeast(0)) + .WillOnce(Return(TerminationPredicate::Status::PROCEED)) + .WillOnce(Return(TerminationPredicate::Status::TERMINATE)); + + return predicate; + } + StreamingStatistic statistic_; Envoy::Api::ApiPtr api_; std::thread::id thread_id_; @@ -87,7 +97,6 @@ class ClientWorkerTest : public Test { NiceMock validation_visitor_; Envoy::Upstream::ClusterManagerPtr cluster_manager_ptr_; Envoy::Tracing::HttpTracerPtr http_tracer_; - MockTerminationPredicate* termination_predicate_; }; TEST_F(ClientWorkerTest, BasicTest) { @@ -95,30 +104,21 @@ TEST_F(ClientWorkerTest, BasicTest) { { InSequence dummy; - - EXPECT_CALL(*sequencer_, start).Times(1); - EXPECT_CALL(*sequencer_, waitForCompletion).Times(1); - } - - { - InSequence dummy; - - // warmup + EXPECT_CALL(*benchmark_client_, setShouldMeasureLatencies(false)).Times(1); EXPECT_CALL(*benchmark_client_, tryStartRequest(_)) 
.Times(1) .WillRepeatedly(Invoke(this, &ClientWorkerTest::CheckThreadChanged)); - - // latency measurement will be initiated - EXPECT_CALL(*benchmark_client_, setMeasureLatencies(true)).Times(1); + EXPECT_CALL(*benchmark_client_, setShouldMeasureLatencies(true)).Times(1); + EXPECT_CALL(*sequencer_, start).Times(1); + EXPECT_CALL(*sequencer_, waitForCompletion).Times(1); EXPECT_CALL(*benchmark_client_, terminate()).Times(1); } - int worker_number = 12345; auto worker = std::make_unique( *api_, tls_, cluster_manager_ptr_, benchmark_client_factory_, termination_predicate_factory_, sequencer_factory_, request_generator_factory_, store_, worker_number, - time_system_.monotonicTime(), http_tracer_); + time_system_.monotonicTime() + 10ms, http_tracer_); worker->start(); worker->waitForCompletion(); diff --git a/test/factories_test.cc b/test/factories_test.cc index 7216507b4..efe9f823e 100644 --- a/test/factories_test.cc +++ b/test/factories_test.cc @@ -86,9 +86,9 @@ class SequencerFactoryTest EXPECT_CALL(dispatcher_, createTimer_(_)).Times(2); EXPECT_CALL(options_, jitterUniform()).Times(1).WillOnce(Return(1ns)); Envoy::Event::SimulatedTimeSystem time_system; - MockTerminationPredicate termination_predicate; - auto sequencer = factory.create(api_->timeSource(), dispatcher_, time_system.monotonicTime(), - benchmark_client, termination_predicate, stats_store_); + auto sequencer = factory.create(api_->timeSource(), dispatcher_, benchmark_client, + std::make_unique(), stats_store_, + time_system.monotonicTime() + 10ms); EXPECT_NE(nullptr, sequencer.get()); } }; diff --git a/test/integration/test_integration_basics.py b/test/integration/test_integration_basics.py index 90d1567be..9debb8d03 100644 --- a/test/integration/test_integration_basics.py +++ b/test/integration/test_integration_basics.py @@ -325,12 +325,12 @@ def test_https_h2_transport_socket_configuration(https_test_server_fixture): def test_https_prefetching(https_test_server_fixture): """ - Test we prefetch 
connections. We test for 1 second at 1 rps, which should + Test we prefetch connections. We test for 1 second at 2 rps, which should result in 1 connection max without prefetching. However, we specify 50 connections and the prefetching flag, so we ought to see 50 http1 connections created. """ parsed_json, _ = https_test_server_fixture.runNighthawkClient([ - "--duration 1", "--rps 1", "--prefetch-connections", "--connections 50", + "--duration 1", "--rps 2", "--prefetch-connections", "--connections 50", https_test_server_fixture.getTestServerRootUri() ]) counters = https_test_server_fixture.getNighthawkCounterMapFromJson(parsed_json) diff --git a/test/mocks.h b/test/mocks.h index 0af3b00eb..a55643279 100644 --- a/test/mocks.h +++ b/test/mocks.h @@ -127,10 +127,10 @@ class MockSequencerFactory : public Client::SequencerFactory { MockSequencerFactory(); MOCK_CONST_METHOD6(create, SequencerPtr(Envoy::TimeSource& time_source, Envoy::Event::Dispatcher& dispatcher, - Envoy::MonotonicTime start_time, Client::BenchmarkClient& benchmark_client, - TerminationPredicate& termination_predicate, - Envoy::Stats::Scope& scope)); + TerminationPredicatePtr&& termination_predicate, + Envoy::Stats::Scope& scope, + const Envoy::MonotonicTime scheduled_starting_time)); }; class MockStoreFactory : public Client::StoreFactory { @@ -158,9 +158,8 @@ class MockRequestSourceFactory : public RequestSourceFactory { class MockTerminationPredicateFactory : public TerminationPredicateFactory { public: MockTerminationPredicateFactory(); - MOCK_CONST_METHOD3(create, TerminationPredicatePtr(Envoy::TimeSource& time_source, - Envoy::Stats::Scope& scope, - const Envoy::MonotonicTime start)); + MOCK_CONST_METHOD2(create, TerminationPredicatePtr(Envoy::TimeSource& time_source, + Envoy::Stats::Scope& scope)); }; class FakeSequencerTarget { @@ -181,11 +180,11 @@ class MockBenchmarkClient : public Client::BenchmarkClient { MockBenchmarkClient(); MOCK_METHOD0(terminate, void()); - 
MOCK_METHOD1(setMeasureLatencies, void(bool)); + MOCK_METHOD1(setShouldMeasureLatencies, void(bool)); MOCK_CONST_METHOD0(statistics, StatisticPtrMap()); MOCK_METHOD1(tryStartRequest, bool(Client::CompletionCallback)); MOCK_CONST_METHOD0(scope, Envoy::Stats::Scope&()); - MOCK_CONST_METHOD0(measureLatencies, bool()); + MOCK_CONST_METHOD0(shouldMeasureLatencies, bool()); MOCK_CONST_METHOD0(requestHeaders, const Envoy::Http::HeaderMap&()); }; @@ -200,6 +199,7 @@ class MockTerminationPredicate : public TerminationPredicate { public: MockTerminationPredicate(); MOCK_METHOD1(link, TerminationPredicate&(TerminationPredicatePtr&&)); + MOCK_METHOD1(appendToChain, TerminationPredicate&(TerminationPredicatePtr&&)); MOCK_METHOD0(evaluateChain, TerminationPredicate::Status()); MOCK_METHOD0(evaluate, TerminationPredicate::Status()); }; diff --git a/test/sequencer_test.cc b/test/sequencer_test.cc index 5675da5b9..08252e29c 100644 --- a/test/sequencer_test.cc +++ b/test/sequencer_test.cc @@ -89,12 +89,16 @@ class SequencerTestWithTimerEmulation : public SequencerTest { const Envoy::ScopeTrackedObject*) { timer2_set_ = true; })); EXPECT_CALL(*dispatcher_, exit()).WillOnce(Invoke([&]() { stopped_ = true; })); simulation_start_ = time_system_.monotonicTime(); - EXPECT_CALL(termination_predicate, evaluateChain()).WillRepeatedly(Invoke([this]() { - return (time_system_.monotonicTime() - simulation_start_) <= - (test_number_of_intervals_ * interval_) - ? TerminationPredicate::Status::PROCEED - : TerminationPredicate::Status::TERMINATE; - })); + auto* unsafe_mock_termination_predicate = new MockTerminationPredicate(); + termination_predicate_ = + std::unique_ptr(unsafe_mock_termination_predicate); + EXPECT_CALL(*unsafe_mock_termination_predicate, evaluateChain()) + .WillRepeatedly(Invoke([this]() { + return (time_system_.monotonicTime() - simulation_start_) <= + (test_number_of_intervals_ * interval_) + ? 
TerminationPredicate::Status::PROCEED + : TerminationPredicate::Status::TERMINATE; + })); } void expectDispatcherRun() { @@ -126,7 +130,7 @@ class SequencerTestWithTimerEmulation : public SequencerTest { } MockSequencerTarget* target() { return &target_; } - MockTerminationPredicate termination_predicate; + TerminationPredicatePtr termination_predicate_; private: NiceMock* timer1_; // not owned @@ -144,11 +148,10 @@ class SequencerTestWithTimerEmulation : public SequencerTest { TEST_F(SequencerTestWithTimerEmulation, RateLimiterInteraction) { SequencerTarget callback = std::bind(&MockSequencerTarget::callback, target(), std::placeholders::_1); - SequencerImpl sequencer(platform_util_, *dispatcher_, time_system_, time_system_.monotonicTime(), - std::move(rate_limiter_), callback, - std::make_unique(), + SequencerImpl sequencer(platform_util_, *dispatcher_, time_system_, std::move(rate_limiter_), + callback, std::make_unique(), std::make_unique(), SequencerIdleStrategy::SLEEP, - termination_predicate, store_); + std::move(termination_predicate_), store_); // Have the mock rate limiter gate two calls, and block everything else. EXPECT_CALL(rate_limiter_unsafe_ref_, tryAcquireOne()) .Times(AtLeast(3)) @@ -162,29 +165,14 @@ TEST_F(SequencerTestWithTimerEmulation, RateLimiterInteraction) { sequencer.waitForCompletion(); } -TEST_F(SequencerTestWithTimerEmulation, StartingLate) { - SequencerTarget callback = - std::bind(&MockSequencerTarget::callback, target(), std::placeholders::_1); - SequencerImpl sequencer(platform_util_, *dispatcher_, time_system_, time_system_.monotonicTime(), - std::move(rate_limiter_), callback, - std::make_unique(), - std::make_unique(), SequencerIdleStrategy::SLEEP, - termination_predicate, store_); - - time_system_.setMonotonicTime(time_system_.monotonicTime() + 100s); - sequencer.start(); - sequencer.waitForCompletion(); -} - // Saturated rate limiter interaction test. 
TEST_F(SequencerTestWithTimerEmulation, RateLimiterSaturatedTargetInteraction) { SequencerTarget callback = std::bind(&MockSequencerTarget::callback, target(), std::placeholders::_1); - SequencerImpl sequencer(platform_util_, *dispatcher_, time_system_, time_system_.monotonicTime(), - std::move(rate_limiter_), callback, - std::make_unique(), + SequencerImpl sequencer(platform_util_, *dispatcher_, time_system_, std::move(rate_limiter_), + callback, std::make_unique(), std::make_unique(), SequencerIdleStrategy::SLEEP, - termination_predicate, store_); + std::move(termination_predicate_), store_); EXPECT_CALL(rate_limiter_unsafe_ref_, tryAcquireOne()) .Times(AtLeast(3)) @@ -222,10 +210,10 @@ class SequencerIntegrationTest : public SequencerTestWithTimerEmulation { std::unique_ptr rate_limiter_; void testRegularFlow(SequencerIdleStrategy::SequencerIdleStrategyOptions idle_strategy) { - SequencerImpl sequencer( - platform_util_, *dispatcher_, time_system_, time_system_.monotonicTime(), - std::move(rate_limiter_), sequencer_target_, std::make_unique(), - std::make_unique(), idle_strategy, termination_predicate, store_); + SequencerImpl sequencer(platform_util_, *dispatcher_, time_system_, std::move(rate_limiter_), + sequencer_target_, std::make_unique(), + std::make_unique(), idle_strategy, + std::move(termination_predicate_), store_); EXPECT_EQ(0, callback_test_count_); EXPECT_EQ(0, sequencer.latencyStatistic().count()); sequencer.start(); @@ -260,11 +248,10 @@ TEST_F(SequencerIntegrationTest, IdleStrategySleep) { TEST_F(SequencerIntegrationTest, AlwaysSaturatedTargetTest) { SequencerTarget callback = std::bind(&SequencerIntegrationTest::saturated_test, this, std::placeholders::_1); - SequencerImpl sequencer(platform_util_, *dispatcher_, time_system_, time_system_.monotonicTime(), - std::move(rate_limiter_), callback, - std::make_unique(), + SequencerImpl sequencer(platform_util_, *dispatcher_, time_system_, std::move(rate_limiter_), + callback, std::make_unique(), 
std::make_unique(), SequencerIdleStrategy::SLEEP, - termination_predicate, store_); + std::move(termination_predicate_), store_); EXPECT_CALL(platform_util_, sleep(_)).Times(AtLeast(1)); sequencer.start(); sequencer.waitForCompletion(); @@ -279,11 +266,10 @@ TEST_F(SequencerIntegrationTest, AlwaysSaturatedTargetTest) { TEST_F(SequencerIntegrationTest, CallbacksDoNotInfluenceTestDuration) { SequencerTarget callback = std::bind(&SequencerIntegrationTest::timeout_test, this, std::placeholders::_1); - SequencerImpl sequencer(platform_util_, *dispatcher_, time_system_, time_system_.monotonicTime(), - std::move(rate_limiter_), callback, - std::make_unique(), + SequencerImpl sequencer(platform_util_, *dispatcher_, time_system_, std::move(rate_limiter_), + callback, std::make_unique(), std::make_unique(), SequencerIdleStrategy::SLEEP, - termination_predicate, store_); + std::move(termination_predicate_), store_); EXPECT_CALL(platform_util_, sleep(_)).Times(AtLeast(1)); auto pre_timeout = time_system_.monotonicTime(); sequencer.start(); diff --git a/test/termination_predicate_test.cc b/test/termination_predicate_test.cc index 1544f0bac..cc4d2fb88 100644 --- a/test/termination_predicate_test.cc +++ b/test/termination_predicate_test.cc @@ -25,7 +25,7 @@ class TerminationPredicateTest : public Test { TEST_F(TerminationPredicateTest, DurationTerminationPredicateImplTest) { const auto duration = 100us; - DurationTerminationPredicateImpl pred(time_system, time_system.monotonicTime(), duration); + DurationTerminationPredicateImpl pred(time_system, duration); EXPECT_EQ(pred.evaluate(), TerminationPredicate::Status::PROCEED); // move to the edge. 
time_system.sleep(duration); @@ -64,4 +64,18 @@ TEST_F(TerminationPredicateTest, LinkedPredicates) { EXPECT_EQ(fail_pred.evaluateChain(), TerminationPredicate::Status::TERMINATE); } +TEST_F(TerminationPredicateTest, AppendToChain) { + auto& foo_counter = stats_store_.counter("foo"); + foo_counter.inc(); + StatsCounterAbsoluteThresholdTerminationPredicateImpl predicate( + foo_counter, 1, TerminationPredicate::Status::TERMINATE); + // The counter doesn't exceed the predicate threshold, so we shouldn't see TERMINATE + EXPECT_EQ(predicate.evaluateChain(), TerminationPredicate::Status::PROCEED); + auto child_predicate = std::make_unique( + foo_counter, 0, TerminationPredicate::Status::FAIL); + EXPECT_EQ(child_predicate.get(), &(predicate.appendToChain(std::move(child_predicate)))); + // This ought to evaluate to FAIL as the counter threshold is exceeded. + EXPECT_EQ(predicate.evaluateChain(), TerminationPredicate::Status::FAIL); +} + } // namespace Nighthawk