diff --git a/api/client/options.proto b/api/client/options.proto index 13d7f1819..d1108e908 100644 --- a/api/client/options.proto +++ b/api/client/options.proto @@ -3,6 +3,7 @@ syntax = "proto3"; package nighthawk.client; import "google/protobuf/duration.proto"; +import "google/protobuf/timestamp.proto"; import "google/protobuf/wrappers.proto"; import "envoy/config/core/v3/base.proto"; import "envoy/config/metrics/v3/stats.proto"; @@ -219,4 +220,7 @@ message CommandLineOptions { // "emit_previous_request_delta_in_response_header" to record elapsed time between request // arrivals. google.protobuf.StringValue latency_response_header_name = 36; + // Provide an execution starting date and time. Optional, any value specified must be in the + // future. + google.protobuf.Timestamp scheduled_start = 105; } diff --git a/api/client/output.proto b/api/client/output.proto index 8e74f34f4..cc3e339db 100644 --- a/api/client/output.proto +++ b/api/client/output.proto @@ -49,6 +49,7 @@ message Result { repeated Statistic statistics = 2; repeated Counter counters = 3; google.protobuf.Duration execution_duration = 4; + google.protobuf.Timestamp execution_start = 5; } message Output { diff --git a/include/nighthawk/client/options.h b/include/nighthawk/client/options.h index c87b03c5f..5e6dc8292 100644 --- a/include/nighthawk/client/options.h +++ b/include/nighthawk/client/options.h @@ -6,6 +6,7 @@ #include #include "envoy/common/pure.h" +#include "envoy/common/time.h" #include "envoy/config/cluster/v3/cluster.pb.h" #include "envoy/config/core/v3/base.pb.h" #include "envoy/config/metrics/v3/stats.pb.h" @@ -74,7 +75,7 @@ class Options { virtual std::vector statsSinks() const PURE; virtual uint32_t statsFlushInterval() const PURE; virtual std::string responseHeaderWithLatencyInput() const PURE; - + virtual absl::optional scheduled_start() const PURE; /** * Converts an Options instance to an equivalent CommandLineOptions instance in terms of option * values. diff --git a/include/nighthawk/client/output_collector.h b/include/nighthawk/client/output_collector.h index 1ff274821..bd457681e 100644 --- a/include/nighthawk/client/output_collector.h +++ b/include/nighthawk/client/output_collector.h @@ -3,9 +3,12 @@ #include #include "envoy/common/pure.h" +#include "envoy/common/time.h" #include "nighthawk/common/statistic.h" +#include "absl/types/optional.h" + namespace Nighthawk { namespace Client { @@ -23,10 +26,12 @@ class OutputCollector { * @param statistics Reference to a vector of statistics to add to the output. * @param counters Reference to a map of counter values, keyed by name, to add to the output. * @param execution_duration Execution duration associated to the to-be-added result. + * @param first_acquisition_time Timing of the first rate limiter acquisition. */ virtual void addResult(absl::string_view name, const std::vector& statistics, const std::map& counters, - const std::chrono::nanoseconds execution_duration) PURE; + const std::chrono::nanoseconds execution_duration, + const absl::optional& first_acquisition_time) PURE; /** * Directly sets the output value. * diff --git a/include/nighthawk/common/factories.h b/include/nighthawk/common/factories.h index 20d5eb6a6..c93097b79 100644 --- a/include/nighthawk/common/factories.h +++ b/include/nighthawk/common/factories.h @@ -24,7 +24,7 @@ class SequencerFactory { const SequencerTarget& sequencer_target, TerminationPredicatePtr&& termination_predicate, Envoy::Stats::Scope& scope, - const Envoy::SystemTime scheduled_starting_time) const PURE; + const Envoy::MonotonicTime scheduled_starting_time) const PURE; }; class StatisticFactory { @@ -46,7 +46,7 @@ class TerminationPredicateFactory { virtual ~TerminationPredicateFactory() = default; virtual TerminationPredicatePtr create(Envoy::TimeSource& time_source, Envoy::Stats::Scope& scope, - const Envoy::SystemTime scheduled_starting_time) const PURE; + const Envoy::MonotonicTime scheduled_starting_time) const PURE; }; /** diff --git a/include/nighthawk/common/rate_limiter.h b/include/nighthawk/common/rate_limiter.h index c5f5f69e8..0c2eadad8 100644 --- a/include/nighthawk/common/rate_limiter.h +++ b/include/nighthawk/common/rate_limiter.h @@ -33,6 +33,12 @@ class RateLimiter { * @return Envoy::TimeSource& time_source used to track time. */ virtual Envoy::TimeSource& timeSource() PURE; + + /** + * @return absl::optional Time of the first acquisition, if any. + */ + virtual absl::optional firstAcquisitionTime() const PURE; + /** * @return std::chrono::nanoseconds elapsed since the first call to tryAcquireOne(). Used by some * rate limiter implementations to compute acquisition rate. diff --git a/include/nighthawk/common/sequencer.h b/include/nighthawk/common/sequencer.h index afc548518..56e177032 100644 --- a/include/nighthawk/common/sequencer.h +++ b/include/nighthawk/common/sequencer.h @@ -7,6 +7,7 @@ #include "envoy/common/pure.h" #include "nighthawk/common/operation_callback.h" +#include "nighthawk/common/rate_limiter.h" #include "nighthawk/common/statistic.h" namespace Nighthawk { @@ -35,6 +36,11 @@ class Sequencer { */ virtual std::chrono::nanoseconds executionDuration() const PURE; + /** + * @return RateLimiter& reference to the rate limiter associated to this sequencer. + */ + virtual const RateLimiter& rate_limiter() const PURE; + /** * @return double an up-to-date completions per second rate. */ diff --git a/source/client/client_worker_impl.cc b/source/client/client_worker_impl.cc index a6c23542c..a5d456763 100644 --- a/source/client/client_worker_impl.cc +++ b/source/client/client_worker_impl.cc @@ -19,7 +19,7 @@ ClientWorkerImpl::ClientWorkerImpl(Envoy::Api::Api& api, Envoy::ThreadLocal::Ins const SequencerFactory& sequencer_factory, const RequestSourceFactory& request_generator_factory, Envoy::Stats::Store& store, const int worker_number, - const Envoy::SystemTime starting_time, + const Envoy::MonotonicTime starting_time, Envoy::Tracing::HttpTracerSharedPtr& http_tracer, const HardCodedWarmupStyle hardcoded_warmup_style) : WorkerImpl(api, tls, store), diff --git a/source/client/client_worker_impl.h b/source/client/client_worker_impl.h index 0decef819..41b2660bc 100644 --- a/source/client/client_worker_impl.h +++ b/source/client/client_worker_impl.h @@ -33,7 +33,7 @@ class ClientWorkerImpl : public WorkerImpl, virtual public ClientWorker { const SequencerFactory& sequencer_factory, const RequestSourceFactory& request_generator_factory, Envoy::Stats::Store& store, const int worker_number, - const Envoy::SystemTime starting_time, + const Envoy::MonotonicTime starting_time, Envoy::Tracing::HttpTracerSharedPtr& http_tracer, const HardCodedWarmupStyle hardcoded_warmup_style); StatisticPtrMap statistics() const override; diff --git a/source/client/factories_impl.cc b/source/client/factories_impl.cc index 01ae6c50d..2031658c4 100644 --- a/source/client/factories_impl.cc +++ b/source/client/factories_impl.cc @@ -65,12 +65,10 @@ BenchmarkClientPtr BenchmarkClientFactoryImpl::create( SequencerFactoryImpl::SequencerFactoryImpl(const Options& options) : OptionBasedFactoryImpl(options) {} -SequencerPtr SequencerFactoryImpl::create(Envoy::TimeSource& time_source, - Envoy::Event::Dispatcher& dispatcher, - const SequencerTarget& sequencer_target, - TerminationPredicatePtr&& termination_predicate, - Envoy::Stats::Scope& scope, - const Envoy::SystemTime scheduled_starting_time) const { +SequencerPtr SequencerFactoryImpl::create( + Envoy::TimeSource& time_source, Envoy::Event::Dispatcher& dispatcher, + const SequencerTarget& sequencer_target, TerminationPredicatePtr&& termination_predicate, + Envoy::Stats::Scope& scope, const Envoy::MonotonicTime scheduled_starting_time) const { StatisticFactoryImpl statistic_factory(options_); Frequency frequency(options_.requestsPerSecond()); RateLimiterPtr rate_limiter = std::make_unique( @@ -211,7 +209,7 @@ TerminationPredicateFactoryImpl::TerminationPredicateFactoryImpl(const Options& TerminationPredicatePtr TerminationPredicateFactoryImpl::create(Envoy::TimeSource& time_source, Envoy::Stats::Scope& scope, - const Envoy::SystemTime scheduled_starting_time) const { + const Envoy::MonotonicTime scheduled_starting_time) const { // We'll always link a predicate which checks for requests to cancel. TerminationPredicatePtr root_predicate = std::make_unique( diff --git a/source/client/factories_impl.h b/source/client/factories_impl.h index 2c8abfd6d..2932a72bf 100644 --- a/source/client/factories_impl.h +++ b/source/client/factories_impl.h @@ -44,7 +44,7 @@ class SequencerFactoryImpl : public OptionBasedFactoryImpl, public SequencerFact SequencerPtr create(Envoy::TimeSource& time_source, Envoy::Event::Dispatcher& dispatcher, const SequencerTarget& sequencer_target, TerminationPredicatePtr&& termination_predicate, Envoy::Stats::Scope& scope, - const Envoy::SystemTime scheduled_starting_time) const override; + const Envoy::MonotonicTime scheduled_starting_time) const override; }; class StatisticFactoryImpl : public OptionBasedFactoryImpl, public StatisticFactory { @@ -93,7 +93,7 @@ class TerminationPredicateFactoryImpl : public OptionBasedFactoryImpl, public: TerminationPredicateFactoryImpl(const Options& options); TerminationPredicatePtr create(Envoy::TimeSource& time_source, Envoy::Stats::Scope& scope, - const Envoy::SystemTime scheduled_starting_time) const override; + const Envoy::MonotonicTime scheduled_starting_time) const override; TerminationPredicate* linkConfiguredPredicates( TerminationPredicate& last_predicate, const TerminationPredicateMap& predicates, const TerminationPredicate::Status termination_status, Envoy::Stats::Scope& scope) const; diff --git a/source/client/options_impl.cc b/source/client/options_impl.cc index 29af954e7..b4ff7ae3e 100644 --- a/source/client/options_impl.cc +++ b/source/client/options_impl.cc @@ -652,7 +652,12 @@ OptionsImpl::OptionsImpl(const nighthawk::client::CommandLineOptions& options) { std::copy(options.labels().begin(), options.labels().end(), std::back_inserter(labels_)); latency_response_header_name_ = PROTOBUF_GET_WRAPPED_OR_DEFAULT( options, latency_response_header_name, latency_response_header_name_); - + if (options.has_scheduled_start()) { + const auto elapsed_since_epoch = std::chrono::nanoseconds(options.scheduled_start().nanos()) + + std::chrono::seconds(options.scheduled_start().seconds()); + scheduled_start_ = + Envoy::SystemTime(std::chrono::time_point(elapsed_since_epoch)); + } validate(); } @@ -829,6 +834,11 @@ CommandLineOptionsPtr OptionsImpl::toCommandLineOptionsInternal() const { command_line_options->mutable_stats_flush_interval()->set_value(stats_flush_interval_); command_line_options->mutable_latency_response_header_name()->set_value( latency_response_header_name_); + if (scheduled_start_.has_value()) { + *(command_line_options->mutable_scheduled_start()) = + Envoy::ProtobufUtil::TimeUtil::NanosecondsToTimestamp( + scheduled_start_.value().time_since_epoch().count()); + } return command_line_options; } diff --git a/source/client/options_impl.h b/source/client/options_impl.h index c43c211a3..7132aba01 100644 --- a/source/client/options_impl.h +++ b/source/client/options_impl.h @@ -93,6 +93,7 @@ class OptionsImpl : public Options, public Envoy::Logger::Loggable scheduled_start() const override { return scheduled_start_; } private: void parsePredicates(const TCLAP::MultiArg& arg, @@ -149,6 +150,7 @@ class OptionsImpl : public Options, public Envoy::Logger::Loggable stats_sinks_; uint32_t stats_flush_interval_{5}; std::string latency_response_header_name_; + absl::optional scheduled_start_; }; } // namespace Client diff --git a/source/client/output_collector_impl.cc b/source/client/output_collector_impl.cc index 2303c164a..b6749f0c9 100644 --- a/source/client/output_collector_impl.cc +++ b/source/client/output_collector_impl.cc @@ -23,12 +23,19 @@ OutputCollectorImpl::OutputCollectorImpl(Envoy::TimeSource& time_source, const O nighthawk::client::Output OutputCollectorImpl::toProto() const { return output_; } -void OutputCollectorImpl::addResult(absl::string_view name, - const std::vector& statistics, - const std::map& counters, - const std::chrono::nanoseconds execution_duration) { +void OutputCollectorImpl::addResult( + absl::string_view name, const std::vector& statistics, + const std::map& counters, + const std::chrono::nanoseconds execution_duration, + const absl::optional& first_acquisition_time) { auto result = output_.add_results(); result->set_name(name.data(), name.size()); + if (first_acquisition_time.has_value()) { + *(result->mutable_execution_start()) = Envoy::Protobuf::util::TimeUtil::NanosecondsToTimestamp( + std::chrono::duration_cast( + first_acquisition_time.value().time_since_epoch()) + .count()); + } for (auto& statistic : statistics) { // TODO(#292): Looking at if the statistic id ends with "_size" to determine how it should be // serialized is kind of hacky. Maybe we should have a lookup table of sorts, to determine how diff --git a/source/client/output_collector_impl.h b/source/client/output_collector_impl.h index 40109f936..ccc9d5bae 100644 --- a/source/client/output_collector_impl.h +++ b/source/client/output_collector_impl.h @@ -20,7 +20,8 @@ class OutputCollectorImpl : public OutputCollector { void addResult(absl::string_view name, const std::vector& statistics, const std::map& counters, - const std::chrono::nanoseconds execution_duration) override; + const std::chrono::nanoseconds execution_duration, + const absl::optional& first_acquisition_time) override; void setOutput(const nighthawk::client::Output& output) override { output_ = output; } nighthawk::client::Output toProto() const override; diff --git a/source/client/process_impl.cc b/source/client/process_impl.cc index 08cbaefe5..cbf3b695d 100644 --- a/source/client/process_impl.cc +++ b/source/client/process_impl.cc @@ -29,6 +29,7 @@ #include "external/envoy/source/server/server.h" #include "absl/strings/str_replace.h" +#include "absl/types/optional.h" // TODO(oschaaf): See if we can leverage a static module registration like Envoy does to avoid the // ifdefs in this file. @@ -164,32 +165,37 @@ bool ProcessImpl::requestExecutionCancellation() { return true; } -void ProcessImpl::createWorkers(const uint32_t concurrency) { - // TODO(oschaaf): Expose kMinimalDelay in configuration. - const std::chrono::milliseconds kMinimalWorkerDelay = 500ms + (concurrency * 50ms); - ASSERT(workers_.empty()); +Envoy::MonotonicTime +ProcessImpl::computeFirstWorkerStart(Envoy::Event::TimeSystem& time_system, + const absl::optional& scheduled_start, + const uint32_t concurrency) { + const std::chrono::nanoseconds first_worker_delay = + scheduled_start.has_value() ? scheduled_start.value() - time_system.systemTime() + : 500ms + (concurrency * 50ms); + const Envoy::MonotonicTime monotonic_now = time_system.monotonicTime(); + const Envoy::MonotonicTime first_worker_start = monotonic_now + first_worker_delay; + return first_worker_start; +} - // We try to offset the start of each thread so that workers will execute tasks evenly spaced in - // time. Let's assume we have two workers w0/w1, which should maintain a combined global pace of - // 1000Hz. w0 and w1 both run at 500Hz, but ideally their execution is evenly spaced in time, - // and not overlapping. Workers start offsets can be computed like - // "worker_number*(1/global_frequency))", which would yield T0+[0ms, 1ms]. This helps reduce - // batching/queueing effects, both initially, but also by calibrating the linear rate limiter we - // currently have to a precise starting time, which helps later on. - // TODO(oschaaf): Arguably, this ought to be the job of a rate limiter with awareness of the - // global status quo, which we do not have right now. This has been noted in the - // track-for-future issue. - const auto first_worker_start = time_system_.systemTime() + kMinimalWorkerDelay; - const double inter_worker_delay_usec = - (1. / options_.requestsPerSecond()) * 1000000 / concurrency; +std::chrono::nanoseconds ProcessImpl::computeInterWorkerDelay(const uint32_t concurrency, + const uint32_t rps) { + const double inter_worker_delay_usec = (1. / rps) * 1000000 / concurrency; + return std::chrono::duration_cast(inter_worker_delay_usec * 1us); +} + +void ProcessImpl::createWorkers(const uint32_t concurrency, + const absl::optional& scheduled_start) { + ASSERT(workers_.empty()); + const Envoy::MonotonicTime first_worker_start = + computeFirstWorkerStart(time_system_, scheduled_start, concurrency); + const std::chrono::nanoseconds inter_worker_delay = + computeInterWorkerDelay(concurrency, options_.requestsPerSecond()); int worker_number = 0; while (workers_.size() < concurrency) { - const auto worker_delay = std::chrono::duration_cast( - ((inter_worker_delay_usec * worker_number) * 1us)); workers_.push_back(std::make_unique( *api_, tls_, cluster_manager_, benchmark_client_factory_, termination_predicate_factory_, sequencer_factory_, request_generator_factory_, store_root_, worker_number, - first_worker_start + worker_delay, http_tracer_, + first_worker_start + (inter_worker_delay * worker_number), http_tracer_, options_.simpleWarmup() ? ClientWorkerImpl::HardCodedWarmupStyle::ON : ClientWorkerImpl::HardCodedWarmupStyle::OFF)); worker_number++; @@ -445,7 +451,13 @@ void ProcessImpl::setupStatsSinks(const envoy::config::bootstrap::v3::Bootstrap& } bool ProcessImpl::runInternal(OutputCollector& collector, const std::vector& uris, - const UriPtr& request_source_uri, const UriPtr& tracing_uri) { + const UriPtr& request_source_uri, const UriPtr& tracing_uri, + const absl::optional& scheduled_start) { + const Envoy::SystemTime now = time_system_.systemTime(); + if (scheduled_start.value_or(now) < now) { + ENVOY_LOG(error, "Scheduled execution date already transpired."); + return false; + } { auto guard = std::make_unique(workers_lock_); if (cancelled_) { @@ -461,7 +473,7 @@ bool ProcessImpl::runInternal(OutputCollector& collector, const std::vector( @@ -522,15 +534,26 @@ bool ProcessImpl::runInternal(OutputCollector& collector, const std::vector first_acquisition_time = absl::nullopt; + for (auto& worker : workers_) { auto sequencer_execution_duration = worker->phase().sequencer().executionDuration(); + absl::optional worker_first_acquisition_time = + worker->phase().sequencer().rate_limiter().firstAcquisitionTime(); + if (worker_first_acquisition_time.has_value()) { + first_acquisition_time = + first_acquisition_time.has_value() + ? std::min(first_acquisition_time.value(), worker_first_acquisition_time.value()) + : worker_first_acquisition_time.value(); + } // We don't write per-worker results if we only have a single worker, because the global // results will be precisely the same. if (workers_.size() > 1) { StatisticFactoryImpl statistic_factory(options_); collector.addResult(fmt::format("worker_{}", i), vectorizeStatisticPtrMap(worker->statistics()), - worker->threadLocalCounterValues(), sequencer_execution_duration); + worker->threadLocalCounterValues(), sequencer_execution_duration, + worker_first_acquisition_time); } total_execution_duration += sequencer_execution_duration; i++; @@ -545,7 +568,7 @@ bool ProcessImpl::runInternal(OutputCollector& collector, const std::vector 0; }); StatisticFactoryImpl statistic_factory(options_); collector.addResult("global", mergeWorkerStatistics(workers_), counters, - total_execution_duration / workers_.size()); + total_execution_duration / workers_.size(), first_acquisition_time); return counters.find("sequencer.failed_terminations") == counters.end(); } @@ -585,7 +608,8 @@ bool ProcessImpl::run(OutputCollector& collector) { } try { - return runInternal(collector, uris, request_source_uri, tracing_uri); + return runInternal(collector, uris, request_source_uri, tracing_uri, + options_.scheduled_start()); } catch (Envoy::EnvoyException& ex) { ENVOY_LOG(error, "Fatal exception: {}", ex.what()); throw; diff --git a/source/client/process_impl.h b/source/client/process_impl.h index f09bb05cd..d14b86e6a 100644 --- a/source/client/process_impl.h +++ b/source/client/process_impl.h @@ -109,7 +109,7 @@ class ProcessImpl : public Process, public Envoy::Logger::Loggable& schedule); std::vector vectorizeStatisticPtrMap(const StatisticPtrMap& statistics) const; std::vector mergeWorkerStatistics(const std::vector& workers) const; @@ -124,7 +124,40 @@ class ProcessImpl : public Process, public Envoy::Logger::Loggable>& stats_sinks); bool runInternal(OutputCollector& collector, const std::vector& uris, - const UriPtr& request_source_uri, const UriPtr& tracing_uri); + const UriPtr& request_source_uri, const UriPtr& tracing_uri, + const absl::optional& schedule); + + /** + * Compute the offset at which execution should start. We adhere to the scheduled start passed in + * as an argument when specified, otherwise we need a delay that will be sufficient for all the + * workers to get up and running. + * + * @param time_system Time system used to obtain the current time. + * @param scheduled_start Optional scheduled start. + * @param concurrency The number of workers that will be used during execution. + * @return Envoy::MonotonicTime Time at which execution should start. + */ + static Envoy::MonotonicTime + computeFirstWorkerStart(Envoy::Event::TimeSystem& time_system, + const absl::optional& scheduled_start, + const uint32_t concurrency); + + /** + * We offset the start of each thread so that workers will execute tasks evenly spaced in + * time. Let's assume we have two workers w0/w1, which should maintain a combined global pace of + * 1000Hz. w0 and w1 both run at 500Hz, but ideally their execution is evenly spaced in time, + * and not overlapping. Workers start offsets can be computed like + * "worker_number*(1/global_frequency))", which would yield T0+[0ms, 1ms]. This helps reduce + * batching/queueing effects, both initially, but also by calibrating the linear rate limiter we + * currently have to a precise starting time, which helps later on. + * + * @param concurrency The number of workers that will be used during execution. + * @param rps Anticipated requests per second during execution. + * @return std::chrono::nanoseconds The delay that should be used as an offset between each + * independent worker execution start. + */ + static std::chrono::nanoseconds computeInterWorkerDelay(const uint32_t concurrency, + const uint32_t rps); std::shared_ptr process_wide_; Envoy::PlatformImpl platform_impl_; diff --git a/source/common/rate_limiter_impl.cc b/source/common/rate_limiter_impl.cc index 7d4aad4fc..15f1fd71b 100644 --- a/source/common/rate_limiter_impl.cc +++ b/source/common/rate_limiter_impl.cc @@ -53,16 +53,16 @@ void BurstingRateLimiter::releaseOne() { } ScheduledStartingRateLimiter::ScheduledStartingRateLimiter( - RateLimiterPtr&& rate_limiter, const Envoy::SystemTime scheduled_starting_time) + RateLimiterPtr&& rate_limiter, const Envoy::MonotonicTime scheduled_starting_time) : ForwardingRateLimiterImpl(std::move(rate_limiter)), scheduled_starting_time_(scheduled_starting_time) { - if (timeSource().systemTime() >= scheduled_starting_time_) { + if (timeSource().monotonicTime() >= scheduled_starting_time_) { ENVOY_LOG(error, "Scheduled starting time exceeded. This may cause unintended bursty traffic."); } } bool ScheduledStartingRateLimiter::tryAcquireOne() { - if (timeSource().systemTime() < scheduled_starting_time_) { + if (timeSource().monotonicTime() < scheduled_starting_time_) { aquisition_attempted_ = true; return false; } @@ -76,7 +76,7 @@ bool ScheduledStartingRateLimiter::tryAcquireOne() { } void ScheduledStartingRateLimiter::releaseOne() { - if (timeSource().systemTime() < scheduled_starting_time_) { + if (timeSource().monotonicTime() < scheduled_starting_time_) { throw NighthawkException("Unexpected call to releaseOne()"); } return rate_limiter_->releaseOne(); diff --git a/source/common/rate_limiter_impl.h b/source/common/rate_limiter_impl.h index 70a42e0ae..1d22e5a43 100644 --- a/source/common/rate_limiter_impl.h +++ b/source/common/rate_limiter_impl.h @@ -30,14 +30,20 @@ class RateLimiterBaseImpl : public RateLimiter { // TODO(oschaaf): consider adding an explicit start() call to the interface. const auto now = time_source_.monotonicTime(); if (start_time_ == absl::nullopt) { + first_acquisition_time_ = time_source_.systemTime(); start_time_ = now; } return now - start_time_.value(); } + absl::optional firstAcquisitionTime() const override { + return first_acquisition_time_; + } + private: Envoy::TimeSource& time_source_; absl::optional start_time_; + absl::optional first_acquisition_time_; }; /** @@ -86,6 +92,9 @@ class ForwardingRateLimiterImpl : public RateLimiter { : rate_limiter_(std::move(rate_limiter)) {} Envoy::TimeSource& timeSource() override { return rate_limiter_->timeSource(); } std::chrono::nanoseconds elapsed() override { return rate_limiter_->elapsed(); } + absl::optional firstAcquisitionTime() const override { + return rate_limiter_->firstAcquisitionTime(); + } protected: const RateLimiterPtr rate_limiter_; @@ -125,12 +134,12 @@ class ScheduledStartingRateLimiter : public ForwardingRateLimiterImpl, * @param scheduled_starting_time The starting time */ ScheduledStartingRateLimiter(RateLimiterPtr&& rate_limiter, - const Envoy::SystemTime scheduled_starting_time); + const Envoy::MonotonicTime scheduled_starting_time); bool tryAcquireOne() override; void releaseOne() override; private: - const Envoy::SystemTime scheduled_starting_time_; + const Envoy::MonotonicTime scheduled_starting_time_; bool aquisition_attempted_{false}; }; diff --git a/source/common/sequencer_impl.h b/source/common/sequencer_impl.h index ff226b6d3..40f245d98 100644 --- a/source/common/sequencer_impl.h +++ b/source/common/sequencer_impl.h @@ -62,6 +62,8 @@ class SequencerImpl : public Sequencer, public Envoy::Logger::Loggableelapsed(); } + const RateLimiter& rate_limiter() const override { return *rate_limiter_; } + double completionsPerSecond() const override { const double usec = std::chrono::duration_cast(executionDuration()).count(); diff --git a/source/common/termination_predicate_impl.cc b/source/common/termination_predicate_impl.cc index d468562f8..d32f2006b 100644 --- a/source/common/termination_predicate_impl.cc +++ b/source/common/termination_predicate_impl.cc @@ -16,8 +16,8 @@ TerminationPredicate::Status TerminationPredicateBaseImpl::evaluateChain() { } TerminationPredicate::Status DurationTerminationPredicateImpl::evaluate() { - return time_source_.systemTime() - start_ > duration_ ? TerminationPredicate::Status::TERMINATE - : TerminationPredicate::Status::PROCEED; + return time_source_.monotonicTime() - start_ > duration_ ? TerminationPredicate::Status::TERMINATE + : TerminationPredicate::Status::PROCEED; } TerminationPredicate::Status StatsCounterAbsoluteThresholdTerminationPredicateImpl::evaluate() { diff --git a/source/common/termination_predicate_impl.h b/source/common/termination_predicate_impl.h index 9a23a8f02..c1c761345 100644 --- a/source/common/termination_predicate_impl.h +++ b/source/common/termination_predicate_impl.h @@ -35,13 +35,13 @@ class DurationTerminationPredicateImpl : public TerminationPredicateBaseImpl { public: DurationTerminationPredicateImpl(Envoy::TimeSource& time_source, std::chrono::microseconds duration, - const Envoy::SystemTime start) + const Envoy::MonotonicTime start) : time_source_(time_source), start_(start), duration_(duration) {} TerminationPredicate::Status evaluate() override; private: Envoy::TimeSource& time_source_; - const Envoy::SystemTime start_; + const Envoy::MonotonicTime start_; std::chrono::microseconds duration_; }; diff --git a/test/BUILD b/test/BUILD index 326d971ee..a56d270ef 100644 --- a/test/BUILD +++ b/test/BUILD @@ -154,6 +154,7 @@ envoy_cc_test( "//test/test_common:environment_lib", "@envoy//test/test_common:network_utility_lib", "@envoy//test/test_common:registry_lib", + "@envoy//test/test_common:simulated_time_system_lib", ], ) diff --git a/test/client_worker_test.cc b/test/client_worker_test.cc index 5f00cc723..8ffdf6680 100644 --- a/test/client_worker_test.cc +++ b/test/client_worker_test.cc @@ -118,7 +118,7 @@ TEST_F(ClientWorkerTest, BasicTest) { auto worker = std::make_unique( *api_, tls_, cluster_manager_ptr_, benchmark_client_factory_, termination_predicate_factory_, sequencer_factory_, request_generator_factory_, store_, worker_number, - time_system_.systemTime(), http_tracer_, ClientWorkerImpl::HardCodedWarmupStyle::ON); + time_system_.monotonicTime(), http_tracer_, ClientWorkerImpl::HardCodedWarmupStyle::ON); worker->start(); worker->waitForCompletion(); diff --git a/test/factories_test.cc b/test/factories_test.cc index 21cf49c9e..946ed1d45 100644 --- a/test/factories_test.cc +++ b/test/factories_test.cc @@ -198,7 +198,7 @@ class SequencerFactoryTest }; auto sequencer = factory.create(api_->timeSource(), dispatcher_, dummy_sequencer_target, std::make_unique(), stats_store_, - time_system.systemTime() + 10ms); + time_system.monotonicTime() + 10ms); EXPECT_NE(nullptr, sequencer.get()); } }; diff --git a/test/mocks/client/mock_options.h b/test/mocks/client/mock_options.h index be3c6635a..04fc35ec0 100644 --- a/test/mocks/client/mock_options.h +++ b/test/mocks/client/mock_options.h @@ -57,6 +57,7 @@ class MockOptions : public Options { MOCK_CONST_METHOD0(statsSinks, std::vector()); MOCK_CONST_METHOD0(statsFlushInterval, uint32_t()); MOCK_CONST_METHOD0(responseHeaderWithLatencyInput, std::string()); + MOCK_CONST_METHOD0(scheduled_start, absl::optional()); }; } // namespace Client diff --git a/test/mocks/common/mock_rate_limiter.h b/test/mocks/common/mock_rate_limiter.h index a07062b47..a36bc154f 100644 --- a/test/mocks/common/mock_rate_limiter.h +++ b/test/mocks/common/mock_rate_limiter.h @@ -14,6 +14,7 @@ class MockRateLimiter : public RateLimiter { MOCK_METHOD0(releaseOne, void()); MOCK_METHOD0(timeSource, Envoy::TimeSource&()); MOCK_METHOD0(elapsed, std::chrono::nanoseconds()); + MOCK_CONST_METHOD0(firstAcquisitionTime, absl::optional()); }; class MockDiscreteNumericDistributionSampler : public DiscreteNumericDistributionSampler { diff --git a/test/mocks/common/mock_sequencer.h b/test/mocks/common/mock_sequencer.h index 52014362b..7d22434b7 100644 --- a/test/mocks/common/mock_sequencer.h +++ b/test/mocks/common/mock_sequencer.h @@ -1,5 +1,6 @@ #pragma once +#include "nighthawk/common/rate_limiter.h" #include "nighthawk/common/sequencer.h" #include "gmock/gmock.h" @@ -16,6 +17,7 @@ class MockSequencer : public Sequencer { MOCK_CONST_METHOD0(executionDuration, std::chrono::nanoseconds()); MOCK_CONST_METHOD0(statistics, StatisticPtrMap()); MOCK_METHOD0(cancel, void()); + MOCK_CONST_METHOD0(rate_limiter, RateLimiter&()); }; } // namespace Nighthawk \ No newline at end of file diff --git a/test/mocks/common/mock_sequencer_factory.h b/test/mocks/common/mock_sequencer_factory.h index 96983e24f..63c972f26 100644 --- a/test/mocks/common/mock_sequencer_factory.h +++ b/test/mocks/common/mock_sequencer_factory.h @@ -14,7 +14,7 @@ class MockSequencerFactory : public SequencerFactory { const SequencerTarget& sequencer_target, TerminationPredicatePtr&& termination_predicate, Envoy::Stats::Scope& scope, - const Envoy::SystemTime scheduled_starting_time)); + const Envoy::MonotonicTime scheduled_starting_time)); }; } // namespace Nighthawk \ No newline at end of file diff --git a/test/mocks/common/mock_termination_predicate_factory.h b/test/mocks/common/mock_termination_predicate_factory.h index 23aed4bf2..e37e8f128 100644 --- a/test/mocks/common/mock_termination_predicate_factory.h +++ b/test/mocks/common/mock_termination_predicate_factory.h @@ -12,7 +12,7 @@ class MockTerminationPredicateFactory : public TerminationPredicateFactory { MOCK_CONST_METHOD3(create, TerminationPredicatePtr(Envoy::TimeSource& time_source, Envoy::Stats::Scope& scope, - const Envoy::SystemTime scheduled_starting_time)); + const Envoy::MonotonicTime scheduled_starting_time)); }; } // namespace Nighthawk \ No newline at end of file diff --git a/test/output_formatter_test.cc b/test/output_formatter_test.cc index 8e23185b7..9b334fcc7 100644 --- a/test/output_formatter_test.cc +++ b/test/output_formatter_test.cc @@ -85,9 +85,9 @@ class OutputCollectorTest : public Test { void setupCollector() { collector_ = std::make_unique(time_system_, options_); - collector_->addResult("worker_0", statistics_, counters_, 1s); - collector_->addResult("worker_1", statistics_, counters_, 1s); - collector_->addResult("global", statistics_, counters_, 1s); + collector_->addResult("worker_0", statistics_, counters_, 1s, time_system_.systemTime()); + collector_->addResult("worker_1", statistics_, counters_, 1s, absl::nullopt); + collector_->addResult("global", statistics_, counters_, 1s, time_system_.systemTime()); } nighthawk::client::CommandLineOptions command_line_options_; diff --git a/test/process_test.cc b/test/process_test.cc index e0bae9a71..7d1191790 100644 --- a/test/process_test.cc +++ b/test/process_test.cc @@ -1,3 +1,4 @@ +#include #include #include @@ -6,6 +7,7 @@ #include "external/envoy/test/test_common/environment.h" #include "external/envoy/test/test_common/network_utility.h" #include "external/envoy/test/test_common/registry.h" +#include "external/envoy/test/test_common/simulated_time_system.h" #include "external/envoy/test/test_common/utility.h" #include "common/uri_impl.h" @@ -178,6 +180,85 @@ TEST_P(ProcessTest, NoFlushWhenCancelExecutionBeforeLoadTestBegin) { EXPECT_EQ(numFlushes, 0); } +/** + * Fixture for executing the Nighthawk process with simulated time. + */ +class ProcessTestWithSimTime : public Envoy::Event::TestUsingSimulatedTime, + public TestWithParam { +public: + ProcessTestWithSimTime() + : options_(TestUtility::createOptionsImpl( + fmt::format("foo --duration 1 -v error --failure-predicate foo:0 --rps 10 https://{}/", + Envoy::Network::Test::getLoopbackAddressUrlString(GetParam())))){}; + +protected: + void run(std::function verify_callback) { + auto run_thread = std::thread([this, &verify_callback] { + ProcessPtr process = std::make_unique(*options_, simTime()); + OutputCollectorImpl collector(simTime(), *options_); + const bool result = process->run(collector); + process->shutdown(); + verify_callback(result, collector.toProto()); + }); + + // We introduce real-world sleeps to give the executing ProcessImpl + // an opportunity to observe passage of simulated time. We increase simulated + // time in three steps, to give it an opportunity to start at the wrong time + // in case there is an error in the scheduling logic it implements. + // Note that these sleeps may seem excessively long, but sanitizer runs may need that. + sleep(1); + // Move time to 1 second before the scheduled execution time. + simTime().setSystemTime(options_->scheduled_start().value() - 1s); + sleep(1); + // Move time right up to the scheduled execution time. + simTime().setSystemTime(options_->scheduled_start().value()); + sleep(1); + // Move time past the scheduled execution time and execution duration. + simTime().setSystemTime(options_->scheduled_start().value() + 2s); + // Wait for execution to wrap up. + run_thread.join(); + } + + void setScheduleOnOptions(std::chrono::nanoseconds ns_since_epoch) { + CommandLineOptionsPtr command_line = options_->toCommandLineOptions(); + *(command_line->mutable_scheduled_start()) = + Envoy::Protobuf::util::TimeUtil::NanosecondsToTimestamp(ns_since_epoch.count()); + options_ = std::make_unique(*command_line); + } + + OptionsPtr options_; +}; + +INSTANTIATE_TEST_SUITE_P(IpVersions, ProcessTestWithSimTime, + ValuesIn(Envoy::TestEnvironment::getIpVersionsForTest()), + Envoy::TestUtility::ipTestParamsToString); + +// Verify that scheduling execution ahead of time works, and that the execution start timestamp +// associated to the worker result correctly reflects the scheduled time. This should be spot on +// because we use simulated time. +TEST_P(ProcessTestWithSimTime, ScheduleAheadWorks) { + for (const auto& relative_schedule : std::vector{30s, 1h}) { + setScheduleOnOptions( + std::chrono::nanoseconds(simTime().systemTime().time_since_epoch() + relative_schedule)); + run([this](bool success, const nighthawk::client::Output& output) { + EXPECT_TRUE(success); + ASSERT_EQ(output.results_size(), 1); + EXPECT_EQ(Envoy::ProtobufUtil::TimeUtil::TimestampToNanoseconds( + output.results()[0].execution_start()), + options_->scheduled_start().value().time_since_epoch().count()); + }); + } +} + +// Verify that scheduling an execution in the past yields an error. +TEST_P(ProcessTestWithSimTime, ScheduleInThePastFails) { + setScheduleOnOptions(std::chrono::nanoseconds(simTime().systemTime().time_since_epoch() - 1s)); + run([](bool success, const nighthawk::client::Output& output) { + EXPECT_FALSE(success); + EXPECT_EQ(output.results_size(), 0); + }); +} + } // namespace } // namespace Client } // namespace Nighthawk diff --git a/test/rate_limiter_test.cc b/test/rate_limiter_test.cc index 296f88373..935cdada6 100644 --- a/test/rate_limiter_test.cc +++ b/test/rate_limiter_test.cc @@ -76,7 +76,8 @@ TEST_F(RateLimiterTest, ScheduledStartingRateLimiterTest) { // scheduled delay. This should be business as usual from a functional perspective, but internally // this rate limiter specializes on this case to log a warning message, and we want to cover that. for (const bool starting_late : std::vector{false, true}) { - const Envoy::SystemTime scheduled_starting_time = time_system.systemTime() + schedule_delay; + const Envoy::MonotonicTime scheduled_starting_time = + time_system.monotonicTime() + schedule_delay; std::unique_ptr mock_rate_limiter = std::make_unique(); MockRateLimiter& unsafe_mock_rate_limiter = *mock_rate_limiter; InSequence s; @@ -95,7 +96,7 @@ TEST_F(RateLimiterTest, ScheduledStartingRateLimiterTest) { } // We should expect zero releases until it is time to start. - while (time_system.systemTime() < scheduled_starting_time) { + while (time_system.monotonicTime() < scheduled_starting_time) { EXPECT_FALSE(rate_limiter->tryAcquireOne()); time_system.advanceTimeWait(1ms); } @@ -108,8 +109,8 @@ TEST_F(RateLimiterTest, ScheduledStartingRateLimiterTest) { TEST_F(RateLimiterTest, ScheduledStartingRateLimiterTestBadArgs) { Envoy::Event::SimulatedTimeSystem time_system; // Verify we enforce future-only scheduling. - for (const auto& timing : - std::vector{time_system.systemTime(), time_system.systemTime() - 10ms}) { + for (const auto& timing : std::vector{time_system.monotonicTime(), + time_system.monotonicTime() - 10ms}) { std::unique_ptr mock_rate_limiter = std::make_unique(); MockRateLimiter& unsafe_mock_rate_limiter = *mock_rate_limiter; EXPECT_CALL(unsafe_mock_rate_limiter, timeSource) diff --git a/test/termination_predicate_test.cc b/test/termination_predicate_test.cc index 387883152..4ab0bab53 100644 --- a/test/termination_predicate_test.cc +++ b/test/termination_predicate_test.cc @@ -25,7 +25,7 @@ class TerminationPredicateTest : public Test { TEST_F(TerminationPredicateTest, DurationTerminationPredicateImplTest) { const auto duration = 100us; - DurationTerminationPredicateImpl pred(time_system, duration, time_system.systemTime()); + DurationTerminationPredicateImpl pred(time_system, duration, time_system.monotonicTime()); EXPECT_EQ(pred.evaluate(), TerminationPredicate::Status::PROCEED); // move to the edge. time_system.advanceTimeWait(duration); diff --git a/test/test_data/output_formatter.json.gold b/test/test_data/output_formatter.json.gold index 1048bc9fd..d1f142906 100644 --- a/test/test_data/output_formatter.json.gold +++ b/test/test_data/output_formatter.json.gold @@ -190,7 +190,8 @@ "value": "1" } ], - "execution_duration": "1s" + "execution_duration": "1s", + "execution_start": "2009-02-13T23:31:31.567Z" }, { "name": "worker_1", @@ -558,7 +559,8 @@ "value": "1" } ], - "execution_duration": "1s" + "execution_duration": "1s", + "execution_start": "2009-02-13T23:31:31.567Z" } ], "version": { diff --git a/test/test_data/output_formatter.yaml.gold b/test/test_data/output_formatter.yaml.gold index bc1b9b750..6a4255c66 100644 --- a/test/test_data/output_formatter.yaml.gold +++ b/test/test_data/output_formatter.yaml.gold @@ -126,6 +126,7 @@ results: - name: foo value: 1 execution_duration: 1s + execution_start: 2009-02-13T23:31:31.567Z - name: worker_1 statistics: - count: 3 @@ -358,6 +359,7 @@ results: - name: foo value: 1 execution_duration: 1s + execution_start: 2009-02-13T23:31:31.567Z version: version: major_number: @version_major@