Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions include/nighthawk/common/factories.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class SequencerFactory {
const SequencerTarget& sequencer_target,
TerminationPredicatePtr&& termination_predicate,
Envoy::Stats::Scope& scope,
const Envoy::MonotonicTime scheduled_starting_time) const PURE;
const Envoy::SystemTime scheduled_starting_time) const PURE;
};

class StatisticFactory {
Expand All @@ -46,7 +46,7 @@ class TerminationPredicateFactory {
virtual ~TerminationPredicateFactory() = default;
virtual TerminationPredicatePtr
create(Envoy::TimeSource& time_source, Envoy::Stats::Scope& scope,
const Envoy::MonotonicTime scheduled_starting_time) const PURE;
const Envoy::SystemTime scheduled_starting_time) const PURE;
};

/**
Expand Down
2 changes: 1 addition & 1 deletion source/client/client_worker_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ ClientWorkerImpl::ClientWorkerImpl(Envoy::Api::Api& api, Envoy::ThreadLocal::Ins
const SequencerFactory& sequencer_factory,
const RequestSourceFactory& request_generator_factory,
Envoy::Stats::Store& store, const int worker_number,
const Envoy::MonotonicTime starting_time,
const Envoy::SystemTime starting_time,
Envoy::Tracing::HttpTracerSharedPtr& http_tracer,
const HardCodedWarmupStyle hardcoded_warmup_style)
: WorkerImpl(api, tls, store),
Expand Down
2 changes: 1 addition & 1 deletion source/client/client_worker_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class ClientWorkerImpl : public WorkerImpl, virtual public ClientWorker {
const SequencerFactory& sequencer_factory,
const RequestSourceFactory& request_generator_factory,
Envoy::Stats::Store& store, const int worker_number,
const Envoy::MonotonicTime starting_time,
const Envoy::SystemTime starting_time,
Envoy::Tracing::HttpTracerSharedPtr& http_tracer,
const HardCodedWarmupStyle hardcoded_warmup_style);
StatisticPtrMap statistics() const override;
Expand Down
14 changes: 8 additions & 6 deletions source/client/factories_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,12 @@ BenchmarkClientPtr BenchmarkClientFactoryImpl::create(
SequencerFactoryImpl::SequencerFactoryImpl(const Options& options)
: OptionBasedFactoryImpl(options) {}

SequencerPtr SequencerFactoryImpl::create(
Envoy::TimeSource& time_source, Envoy::Event::Dispatcher& dispatcher,
const SequencerTarget& sequencer_target, TerminationPredicatePtr&& termination_predicate,
Envoy::Stats::Scope& scope, const Envoy::MonotonicTime scheduled_starting_time) const {
SequencerPtr SequencerFactoryImpl::create(Envoy::TimeSource& time_source,
Envoy::Event::Dispatcher& dispatcher,
const SequencerTarget& sequencer_target,
TerminationPredicatePtr&& termination_predicate,
Envoy::Stats::Scope& scope,
const Envoy::SystemTime scheduled_starting_time) const {
StatisticFactoryImpl statistic_factory(options_);
Frequency frequency(options_.requestsPerSecond());
RateLimiterPtr rate_limiter = std::make_unique<ScheduledStartingRateLimiter>(
Expand All @@ -87,7 +89,7 @@ SequencerPtr SequencerFactoryImpl::create(
return std::make_unique<SequencerImpl>(
platform_util_, dispatcher, time_source, std::move(rate_limiter), sequencer_target,
statistic_factory.create(), statistic_factory.create(), options_.sequencerIdleStrategy(),
std::move(termination_predicate), scope, scheduled_starting_time);
std::move(termination_predicate), scope);
}

StatisticFactoryImpl::StatisticFactoryImpl(const Options& options)
Expand Down Expand Up @@ -184,7 +186,7 @@ TerminationPredicateFactoryImpl::TerminationPredicateFactoryImpl(const Options&

TerminationPredicatePtr
TerminationPredicateFactoryImpl::create(Envoy::TimeSource& time_source, Envoy::Stats::Scope& scope,
const Envoy::MonotonicTime scheduled_starting_time) const {
const Envoy::SystemTime scheduled_starting_time) const {
// We'll always link a predicate which checks for requests to cancel.
TerminationPredicatePtr root_predicate =
std::make_unique<StatsCounterAbsoluteThresholdTerminationPredicateImpl>(
Expand Down
4 changes: 2 additions & 2 deletions source/client/factories_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class SequencerFactoryImpl : public OptionBasedFactoryImpl, public SequencerFact
SequencerPtr create(Envoy::TimeSource& time_source, Envoy::Event::Dispatcher& dispatcher,
const SequencerTarget& sequencer_target,
TerminationPredicatePtr&& termination_predicate, Envoy::Stats::Scope& scope,
const Envoy::MonotonicTime scheduled_starting_time) const override;
const Envoy::SystemTime scheduled_starting_time) const override;
};

class StatisticFactoryImpl : public OptionBasedFactoryImpl, public StatisticFactory {
Expand Down Expand Up @@ -73,7 +73,7 @@ class TerminationPredicateFactoryImpl : public OptionBasedFactoryImpl,
public:
TerminationPredicateFactoryImpl(const Options& options);
TerminationPredicatePtr create(Envoy::TimeSource& time_source, Envoy::Stats::Scope& scope,
const Envoy::MonotonicTime scheduled_starting_time) const override;
const Envoy::SystemTime scheduled_starting_time) const override;
TerminationPredicate* linkConfiguredPredicates(
TerminationPredicate& last_predicate, const TerminationPredicateMap& predicates,
const TerminationPredicate::Status termination_status, Envoy::Stats::Scope& scope) const;
Expand Down
2 changes: 1 addition & 1 deletion source/client/process_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ void ProcessImpl::createWorkers(const uint32_t concurrency) {
// TODO(oschaaf): Arguably, this ought to be the job of a rate limiter with awareness of the
// global status quo, which we do not have right now. This has been noted in the
// track-for-future issue.
const auto first_worker_start = time_system_.monotonicTime() + kMinimalWorkerDelay;
const auto first_worker_start = time_system_.systemTime() + kMinimalWorkerDelay;
const double inter_worker_delay_usec =
(1. / options_.requestsPerSecond()) * 1000000 / concurrency;
int worker_number = 0;
Expand Down
4 changes: 2 additions & 2 deletions source/common/cached_time_source_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ class CachedTimeSourceImpl : public Envoy::TimeSource {
CachedTimeSourceImpl(Envoy::Event::Dispatcher& dispatcher) : dispatcher_(dispatcher) {}

/**
* Calling this will trigger an assert.
* @return Envoy::SystemTime current system time.
*/
Envoy::SystemTime systemTime() override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; };
Envoy::SystemTime systemTime() override { return dispatcher_.timeSource().systemTime(); };

/**
* @return Envoy::MonotonicTime cached monotonic time.
Expand Down
8 changes: 4 additions & 4 deletions source/common/rate_limiter_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,16 +53,16 @@ void BurstingRateLimiter::releaseOne() {
}

ScheduledStartingRateLimiter::ScheduledStartingRateLimiter(
RateLimiterPtr&& rate_limiter, const Envoy::MonotonicTime scheduled_starting_time)
RateLimiterPtr&& rate_limiter, const Envoy::SystemTime scheduled_starting_time)
: ForwardingRateLimiterImpl(std::move(rate_limiter)),
scheduled_starting_time_(scheduled_starting_time) {
if (timeSource().monotonicTime() >= scheduled_starting_time_) {
if (timeSource().systemTime() >= scheduled_starting_time_) {
ENVOY_LOG(error, "Scheduled starting time exceeded. This may cause unintended bursty traffic.");
}
}

bool ScheduledStartingRateLimiter::tryAcquireOne() {
if (timeSource().monotonicTime() < scheduled_starting_time_) {
if (timeSource().systemTime() < scheduled_starting_time_) {
aquisition_attempted_ = true;
return false;
}
Expand All @@ -76,7 +76,7 @@ bool ScheduledStartingRateLimiter::tryAcquireOne() {
}

void ScheduledStartingRateLimiter::releaseOne() {
if (timeSource().monotonicTime() < scheduled_starting_time_) {
if (timeSource().systemTime() < scheduled_starting_time_) {
throw NighthawkException("Unexpected call to releaseOne()");
}
return rate_limiter_->releaseOne();
Expand Down
4 changes: 2 additions & 2 deletions source/common/rate_limiter_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,12 +124,12 @@ class ScheduledStartingRateLimiter : public ForwardingRateLimiterImpl,
* @param scheduled_starting_time The starting time
*/
ScheduledStartingRateLimiter(RateLimiterPtr&& rate_limiter,
const Envoy::MonotonicTime scheduled_starting_time);
const Envoy::SystemTime scheduled_starting_time);
bool tryAcquireOne() override;
void releaseOne() override;

private:
const Envoy::MonotonicTime scheduled_starting_time_;
const Envoy::SystemTime scheduled_starting_time_;
bool aquisition_attempted_{false};
};

Expand Down
12 changes: 5 additions & 7 deletions source/common/sequencer_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,12 @@ SequencerImpl::SequencerImpl(
Envoy::TimeSource& time_source, RateLimiterPtr&& rate_limiter, SequencerTarget target,
StatisticPtr&& latency_statistic, StatisticPtr&& blocked_statistic,
nighthawk::client::SequencerIdleStrategy::SequencerIdleStrategyOptions idle_strategy,
TerminationPredicatePtr&& termination_predicate, Envoy::Stats::Scope& scope,
const Envoy::MonotonicTime scheduled_starting_time)
TerminationPredicatePtr&& termination_predicate, Envoy::Stats::Scope& scope)
: target_(std::move(target)), platform_util_(platform_util), dispatcher_(dispatcher),
time_source_(time_source), rate_limiter_(std::move(rate_limiter)),
latency_statistic_(std::move(latency_statistic)),
blocked_statistic_(std::move(blocked_statistic)), start_time_(scheduled_starting_time),
idle_strategy_(idle_strategy), termination_predicate_(std::move(termination_predicate)),
blocked_statistic_(std::move(blocked_statistic)), idle_strategy_(idle_strategy),
termination_predicate_(std::move(termination_predicate)),
last_termination_status_(TerminationPredicate::Status::PROCEED),
scope_(scope.createScope("sequencer.")),
sequencer_stats_({ALL_SEQUENCER_STATS(POOL_COUNTER(*scope_))}) {
Expand Down Expand Up @@ -57,8 +56,7 @@ void SequencerImpl::stop(bool failed) {
spin_timer_.reset();
dispatcher_.exit();
unblockAndUpdateStatisticIfNeeded(time_source_.monotonicTime());
const auto ran_for =
std::chrono::duration_cast<std::chrono::milliseconds>(last_event_time_ - start_time_);
const auto ran_for = std::chrono::duration_cast<std::chrono::milliseconds>(executionDuration());
ENVOY_LOG(info,
"Stopping after {} ms. Initiated: {} / Completed: {}. "
"(Completion rate was {} per second.)",
Expand Down Expand Up @@ -93,7 +91,7 @@ void SequencerImpl::run(bool from_periodic_timer) {
// More importantly, it may help avoid a class of bugs that could be more serious, depending on
// functionality (TOC/TOU).
dispatcher_.updateApproximateMonotonicTime();
const auto now = last_event_time_ = time_source_.monotonicTime();
const auto now = time_source_.monotonicTime();

last_termination_status_ = last_termination_status_ == TerminationPredicate::Status::PROCEED
? termination_predicate_->evaluateChain()
Expand Down
12 changes: 3 additions & 9 deletions source/common/sequencer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@ class SequencerImpl : public Sequencer, public Envoy::Logger::Loggable<Envoy::Lo
Envoy::TimeSource& time_source, RateLimiterPtr&& rate_limiter, SequencerTarget target,
StatisticPtr&& latency_statistic, StatisticPtr&& blocked_statistic,
nighthawk::client::SequencerIdleStrategy::SequencerIdleStrategyOptions idle_strategy,
TerminationPredicatePtr&& termination_predicate, Envoy::Stats::Scope& scope,
const Envoy::MonotonicTime scheduled_starting_time);
TerminationPredicatePtr&& termination_predicate, Envoy::Stats::Scope& scope);

/**
* Starts the Sequencer. Should be followed up with a call to waitForCompletion().
Expand All @@ -61,14 +60,11 @@ class SequencerImpl : public Sequencer, public Envoy::Logger::Loggable<Envoy::Lo
*/
void waitForCompletion() override;

std::chrono::nanoseconds executionDuration() const override {
return last_event_time_ - start_time_;
}
std::chrono::nanoseconds executionDuration() const override { return rate_limiter_->elapsed(); }

double completionsPerSecond() const override {
const double usec =
std::chrono::duration_cast<std::chrono::microseconds>(last_event_time_ - start_time_)
.count();
std::chrono::duration_cast<std::chrono::microseconds>(executionDuration()).count();

return usec == 0 ? 0 : ((targets_completed_ / usec) * 1000000);
}
Expand Down Expand Up @@ -120,8 +116,6 @@ class SequencerImpl : public Sequencer, public Envoy::Logger::Loggable<Envoy::Lo
StatisticPtr blocked_statistic_;
Envoy::Event::TimerPtr periodic_timer_;
Envoy::Event::TimerPtr spin_timer_;
const Envoy::MonotonicTime start_time_;
Envoy::MonotonicTime last_event_time_;
uint64_t targets_initiated_{0};
uint64_t targets_completed_{0};
bool running_{};
Expand Down
4 changes: 2 additions & 2 deletions source/common/termination_predicate_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ TerminationPredicate::Status TerminationPredicateBaseImpl::evaluateChain() {
}

TerminationPredicate::Status DurationTerminationPredicateImpl::evaluate() {
return time_source_.monotonicTime() - start_ > duration_ ? TerminationPredicate::Status::TERMINATE
: TerminationPredicate::Status::PROCEED;
return time_source_.systemTime() - start_ > duration_ ? TerminationPredicate::Status::TERMINATE
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change looks good. in fact, I think it's possible this is more accurate now. Is there any chance this represents a change in behavior that we should document in the description?

Copy link
Copy Markdown
Member Author

@oschaaf oschaaf Sep 29, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, SystemTime doesn't guarantee to always move forward across calls to get snapshots of it, like MonotonicTime does, and may it be adjusted while we are polling it. But there's only a very small window in which this can affect operation here: the duration that the main thread requests workers to wait before starting execution. That delay is computed here:

const std::chrono::milliseconds kMinimalWorkerDelay = 500ms + (concurrency * 50ms);

Reasoning through clock updates that get applied right between our scheduling and starting of operations:

  1. with small updates, load generation may start a little earlier or later, no problem. Any durations that get measured for latency or execution are based on monotonic time and will not be affected.
  2. when the clock jumps forward a lot, worst case workers won't have sufficient time to get ready to start because the clock moved back in time significantly, but they will observe that and complain in the logs about it. (Execution results may be noisy because of workers having missed their schedules to start).
  3. when the clock jumps backwards a lot, workers will wait longer before starting execution. This isn't a problem, unless it's a huge leap backwards in time, in which case the wait might take a long time as well.

Also, suspend/sleep might work a little differently, I suspect that MonotonicTime may not track time spend suspended/sleeping. 2. from above more or less applies here as well.

All in all, I think chances are pretty small of anyone running into trouble because of this?

: TerminationPredicate::Status::PROCEED;
}

TerminationPredicate::Status StatsCounterAbsoluteThresholdTerminationPredicateImpl::evaluate() {
Expand Down
4 changes: 2 additions & 2 deletions source/common/termination_predicate_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@ class DurationTerminationPredicateImpl : public TerminationPredicateBaseImpl {
public:
DurationTerminationPredicateImpl(Envoy::TimeSource& time_source,
std::chrono::microseconds duration,
const Envoy::MonotonicTime start)
const Envoy::SystemTime start)
: time_source_(time_source), start_(start), duration_(duration) {}
TerminationPredicate::Status evaluate() override;

private:
Envoy::TimeSource& time_source_;
const Envoy::MonotonicTime start_;
const Envoy::SystemTime start_;
std::chrono::microseconds duration_;
};

Expand Down
2 changes: 1 addition & 1 deletion test/client_worker_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ TEST_F(ClientWorkerTest, BasicTest) {
auto worker = std::make_unique<ClientWorkerImpl>(
*api_, tls_, cluster_manager_ptr_, benchmark_client_factory_, termination_predicate_factory_,
sequencer_factory_, request_generator_factory_, store_, worker_number,
time_system_.monotonicTime(), http_tracer_, ClientWorkerImpl::HardCodedWarmupStyle::ON);
time_system_.systemTime(), http_tracer_, ClientWorkerImpl::HardCodedWarmupStyle::ON);

worker->start();
worker->waitForCompletion();
Expand Down
2 changes: 1 addition & 1 deletion test/factories_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ class SequencerFactoryTest
};
auto sequencer = factory.create(api_->timeSource(), dispatcher_, dummy_sequencer_target,
std::make_unique<MockTerminationPredicate>(), stats_store_,
time_system.monotonicTime() + 10ms);
time_system.systemTime() + 10ms);
EXPECT_NE(nullptr, sequencer.get());
}
};
Expand Down
2 changes: 1 addition & 1 deletion test/mocks/common/mock_sequencer_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class MockSequencerFactory : public SequencerFactory {
const SequencerTarget& sequencer_target,
TerminationPredicatePtr&& termination_predicate,
Envoy::Stats::Scope& scope,
const Envoy::MonotonicTime scheduled_starting_time));
const Envoy::SystemTime scheduled_starting_time));
};

} // namespace Nighthawk
2 changes: 1 addition & 1 deletion test/mocks/common/mock_termination_predicate_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ class MockTerminationPredicateFactory : public TerminationPredicateFactory {
MOCK_CONST_METHOD3(create,
TerminationPredicatePtr(Envoy::TimeSource& time_source,
Envoy::Stats::Scope& scope,
const Envoy::MonotonicTime scheduled_starting_time));
const Envoy::SystemTime scheduled_starting_time));
};

} // namespace Nighthawk
9 changes: 4 additions & 5 deletions test/rate_limiter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,7 @@ TEST_F(RateLimiterTest, ScheduledStartingRateLimiterTest) {
// scheduled delay. This should be business as usual from a functional perspective, but internally
// this rate limiter specializes on this case to log a warning message, and we want to cover that.
for (const bool starting_late : std::vector<bool>{false, true}) {
const Envoy::MonotonicTime scheduled_starting_time =
time_system.monotonicTime() + schedule_delay;
const Envoy::SystemTime scheduled_starting_time = time_system.systemTime() + schedule_delay;
std::unique_ptr<MockRateLimiter> mock_rate_limiter = std::make_unique<MockRateLimiter>();
MockRateLimiter& unsafe_mock_rate_limiter = *mock_rate_limiter;
InSequence s;
Expand All @@ -95,7 +94,7 @@ TEST_F(RateLimiterTest, ScheduledStartingRateLimiterTest) {
}

// We should expect zero releases until it is time to start.
while (time_system.monotonicTime() < scheduled_starting_time) {
while (time_system.systemTime() < scheduled_starting_time) {
EXPECT_FALSE(rate_limiter->tryAcquireOne());
time_system.advanceTimeWait(1ms);
}
Expand All @@ -108,8 +107,8 @@ TEST_F(RateLimiterTest, ScheduledStartingRateLimiterTest) {
TEST_F(RateLimiterTest, ScheduledStartingRateLimiterTestBadArgs) {
Envoy::Event::SimulatedTimeSystem time_system;
// Verify we enforce future-only scheduling.
for (const auto timing : std::vector<Envoy::MonotonicTime>{time_system.monotonicTime(),
time_system.monotonicTime() - 10ms}) {
for (const auto timing :
std::vector<Envoy::SystemTime>{time_system.systemTime(), time_system.systemTime() - 10ms}) {
std::unique_ptr<MockRateLimiter> mock_rate_limiter = std::make_unique<MockRateLimiter>();
MockRateLimiter& unsafe_mock_rate_limiter = *mock_rate_limiter;
EXPECT_CALL(unsafe_mock_rate_limiter, timeSource)
Expand Down
Loading