Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions api/client/options.proto
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ syntax = "proto3";
package nighthawk.client;

import "google/protobuf/duration.proto";
import "google/protobuf/timestamp.proto";
import "google/protobuf/wrappers.proto";
import "envoy/config/core/v3/base.proto";
import "envoy/config/metrics/v3/stats.proto";
Expand Down Expand Up @@ -219,4 +220,7 @@ message CommandLineOptions {
// "emit_previous_request_delta_in_response_header" to record elapsed time between request
// arrivals.
google.protobuf.StringValue latency_response_header_name = 36;
// Provide an execution starting date and time. Optional, any value specified must be in the
// future.
google.protobuf.Timestamp schedule = 105;
Comment thread
oschaaf marked this conversation as resolved.
Outdated
}
1 change: 1 addition & 0 deletions api/client/output.proto
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ message Result {
repeated Statistic statistics = 2;
repeated Counter counters = 3;
google.protobuf.Duration execution_duration = 4;
google.protobuf.Timestamp execution_start = 5;
}

message Output {
Expand Down
3 changes: 2 additions & 1 deletion include/nighthawk/client/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <string>

#include "envoy/common/pure.h"
#include "envoy/common/time.h"
#include "envoy/config/cluster/v3/cluster.pb.h"
#include "envoy/config/core/v3/base.pb.h"
#include "envoy/config/metrics/v3/stats.pb.h"
Expand Down Expand Up @@ -74,7 +75,7 @@ class Options {
virtual std::vector<envoy::config::metrics::v3::StatsSink> statsSinks() const PURE;
virtual uint32_t statsFlushInterval() const PURE;
virtual std::string responseHeaderWithLatencyInput() const PURE;

virtual absl::optional<Envoy::SystemTime> schedule() const PURE;
/**
* Converts an Options instance to an equivalent CommandLineOptions instance in terms of option
* values.
Expand Down
6 changes: 5 additions & 1 deletion include/nighthawk/client/output_collector.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@
#include <memory>

#include "envoy/common/pure.h"
#include "envoy/common/time.h"

#include "nighthawk/common/statistic.h"

#include "absl/types/optional.h"

namespace Nighthawk {
namespace Client {

Expand All @@ -26,7 +29,8 @@ class OutputCollector {
*/
virtual void addResult(absl::string_view name, const std::vector<StatisticPtr>& statistics,
const std::map<std::string, uint64_t>& counters,
const std::chrono::nanoseconds execution_duration) PURE;
const std::chrono::nanoseconds execution_duration,
const absl::optional<Envoy::SystemTime>& first_acquisition_time) PURE;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add comment for first_acquisition_time?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will likely be addressed when the comment is added, but I'm not sure I understand what "acquisition" means in this context.

/**
* Directly sets the output value.
*
Expand Down
4 changes: 2 additions & 2 deletions include/nighthawk/common/factories.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class SequencerFactory {
const SequencerTarget& sequencer_target,
TerminationPredicatePtr&& termination_predicate,
Envoy::Stats::Scope& scope,
const Envoy::SystemTime scheduled_starting_time) const PURE;
const Envoy::MonotonicTime scheduled_starting_time) const PURE;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not actionable, just making sure I understand: is this where we're moving back to MonotonicTime to prevent #569?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that is right.

};

class StatisticFactory {
Expand All @@ -46,7 +46,7 @@ class TerminationPredicateFactory {
virtual ~TerminationPredicateFactory() = default;
virtual TerminationPredicatePtr
create(Envoy::TimeSource& time_source, Envoy::Stats::Scope& scope,
const Envoy::SystemTime scheduled_starting_time) const PURE;
const Envoy::MonotonicTime scheduled_starting_time) const PURE;
};

/**
Expand Down
6 changes: 6 additions & 0 deletions include/nighthawk/common/rate_limiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ class RateLimiter {
* @return Envoy::TimeSource& time_source used to track time.
*/
virtual Envoy::TimeSource& timeSource() PURE;

/**
* @return absl::optional<Envoy::SystemTime> System time of the first acquisition, if any. This
* value is read per worker when results are collected and is reported as the execution start
* time in the output.
*/
virtual absl::optional<Envoy::SystemTime> firstAcquisitionTime() const PURE;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I might have just missed it in this pull request (if so, please point me to the right file), but while I'm seeing code that writes this field or passes it around, I'm not finding where it's actually used functionally. What is its intended purpose?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


/**
* @return std::chrono::nanoseconds elapsed since the first call to tryAcquireOne(). Used by some
* rate limiter implementations to compute acquisition rate.
Expand Down
6 changes: 6 additions & 0 deletions include/nighthawk/common/sequencer.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "envoy/common/pure.h"

#include "nighthawk/common/operation_callback.h"
#include "nighthawk/common/rate_limiter.h"
#include "nighthawk/common/statistic.h"

namespace Nighthawk {
Expand Down Expand Up @@ -35,6 +36,11 @@ class Sequencer {
*/
virtual std::chrono::nanoseconds executionDuration() const PURE;

/**
* @return const RateLimiter& read-only reference to the rate limiter associated with this
* sequencer.
*/
virtual const RateLimiter& rate_limiter() const PURE;

/**
* @return double an up-to-date completions per second rate.
*/
Expand Down
2 changes: 1 addition & 1 deletion source/client/client_worker_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ ClientWorkerImpl::ClientWorkerImpl(Envoy::Api::Api& api, Envoy::ThreadLocal::Ins
const SequencerFactory& sequencer_factory,
const RequestSourceFactory& request_generator_factory,
Envoy::Stats::Store& store, const int worker_number,
const Envoy::SystemTime starting_time,
const Envoy::MonotonicTime starting_time,
Envoy::Tracing::HttpTracerSharedPtr& http_tracer,
const HardCodedWarmupStyle hardcoded_warmup_style)
: WorkerImpl(api, tls, store),
Expand Down
2 changes: 1 addition & 1 deletion source/client/client_worker_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class ClientWorkerImpl : public WorkerImpl, virtual public ClientWorker {
const SequencerFactory& sequencer_factory,
const RequestSourceFactory& request_generator_factory,
Envoy::Stats::Store& store, const int worker_number,
const Envoy::SystemTime starting_time,
const Envoy::MonotonicTime starting_time,
Envoy::Tracing::HttpTracerSharedPtr& http_tracer,
const HardCodedWarmupStyle hardcoded_warmup_style);
StatisticPtrMap statistics() const override;
Expand Down
12 changes: 5 additions & 7 deletions source/client/factories_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,10 @@ BenchmarkClientPtr BenchmarkClientFactoryImpl::create(
SequencerFactoryImpl::SequencerFactoryImpl(const Options& options)
: OptionBasedFactoryImpl(options) {}

SequencerPtr SequencerFactoryImpl::create(Envoy::TimeSource& time_source,
Envoy::Event::Dispatcher& dispatcher,
const SequencerTarget& sequencer_target,
TerminationPredicatePtr&& termination_predicate,
Envoy::Stats::Scope& scope,
const Envoy::SystemTime scheduled_starting_time) const {
SequencerPtr SequencerFactoryImpl::create(
Envoy::TimeSource& time_source, Envoy::Event::Dispatcher& dispatcher,
const SequencerTarget& sequencer_target, TerminationPredicatePtr&& termination_predicate,
Envoy::Stats::Scope& scope, const Envoy::MonotonicTime scheduled_starting_time) const {
StatisticFactoryImpl statistic_factory(options_);
Frequency frequency(options_.requestsPerSecond());
RateLimiterPtr rate_limiter = std::make_unique<ScheduledStartingRateLimiter>(
Expand Down Expand Up @@ -211,7 +209,7 @@ TerminationPredicateFactoryImpl::TerminationPredicateFactoryImpl(const Options&

TerminationPredicatePtr
TerminationPredicateFactoryImpl::create(Envoy::TimeSource& time_source, Envoy::Stats::Scope& scope,
const Envoy::SystemTime scheduled_starting_time) const {
const Envoy::MonotonicTime scheduled_starting_time) const {
// We'll always link a predicate which checks for requests to cancel.
TerminationPredicatePtr root_predicate =
std::make_unique<StatsCounterAbsoluteThresholdTerminationPredicateImpl>(
Expand Down
4 changes: 2 additions & 2 deletions source/client/factories_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class SequencerFactoryImpl : public OptionBasedFactoryImpl, public SequencerFact
SequencerPtr create(Envoy::TimeSource& time_source, Envoy::Event::Dispatcher& dispatcher,
const SequencerTarget& sequencer_target,
TerminationPredicatePtr&& termination_predicate, Envoy::Stats::Scope& scope,
const Envoy::SystemTime scheduled_starting_time) const override;
const Envoy::MonotonicTime scheduled_starting_time) const override;
};

class StatisticFactoryImpl : public OptionBasedFactoryImpl, public StatisticFactory {
Expand Down Expand Up @@ -93,7 +93,7 @@ class TerminationPredicateFactoryImpl : public OptionBasedFactoryImpl,
public:
TerminationPredicateFactoryImpl(const Options& options);
TerminationPredicatePtr create(Envoy::TimeSource& time_source, Envoy::Stats::Scope& scope,
const Envoy::SystemTime scheduled_starting_time) const override;
const Envoy::MonotonicTime scheduled_starting_time) const override;
TerminationPredicate* linkConfiguredPredicates(
TerminationPredicate& last_predicate, const TerminationPredicateMap& predicates,
const TerminationPredicate::Status termination_status, Envoy::Stats::Scope& scope) const;
Expand Down
12 changes: 11 additions & 1 deletion source/client/options_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -651,7 +651,12 @@ OptionsImpl::OptionsImpl(const nighthawk::client::CommandLineOptions& options) {
std::copy(options.labels().begin(), options.labels().end(), std::back_inserter(labels_));
latency_response_header_name_ = PROTOBUF_GET_WRAPPED_OR_DEFAULT(
options, latency_response_header_name, latency_response_header_name_);

if (options.has_schedule()) {
const auto elapsed_since_epoch = std::chrono::nanoseconds(options.schedule().nanos()) +
std::chrono::seconds(options.schedule().seconds());
schedule_ =
Envoy::SystemTime(std::chrono::time_point<std::chrono::system_clock>(elapsed_since_epoch));
}
validate();
}

Expand Down Expand Up @@ -828,6 +833,11 @@ CommandLineOptionsPtr OptionsImpl::toCommandLineOptionsInternal() const {
command_line_options->mutable_stats_flush_interval()->set_value(stats_flush_interval_);
command_line_options->mutable_latency_response_header_name()->set_value(
latency_response_header_name_);
if (schedule_.has_value()) {
*(command_line_options->mutable_schedule()) =
Envoy::ProtobufUtil::TimeUtil::NanosecondsToTimestamp(
schedule_.value().time_since_epoch().count());
}
return command_line_options;
}

Expand Down
2 changes: 2 additions & 0 deletions source/client/options_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ class OptionsImpl : public Options, public Envoy::Logger::Loggable<Envoy::Logger
std::string responseHeaderWithLatencyInput() const override {
return latency_response_header_name_;
};
absl::optional<Envoy::SystemTime> schedule() const override { return schedule_; }

private:
void parsePredicates(const TCLAP::MultiArg<std::string>& arg,
Expand Down Expand Up @@ -149,6 +150,7 @@ class OptionsImpl : public Options, public Envoy::Logger::Loggable<Envoy::Logger
std::vector<envoy::config::metrics::v3::StatsSink> stats_sinks_;
uint32_t stats_flush_interval_{5};
std::string latency_response_header_name_;
absl::optional<Envoy::SystemTime> schedule_;
};

} // namespace Client
Expand Down
15 changes: 11 additions & 4 deletions source/client/output_collector_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,19 @@ OutputCollectorImpl::OutputCollectorImpl(Envoy::TimeSource& time_source, const O

nighthawk::client::Output OutputCollectorImpl::toProto() const { return output_; }

void OutputCollectorImpl::addResult(absl::string_view name,
const std::vector<StatisticPtr>& statistics,
const std::map<std::string, uint64_t>& counters,
const std::chrono::nanoseconds execution_duration) {
void OutputCollectorImpl::addResult(
absl::string_view name, const std::vector<StatisticPtr>& statistics,
const std::map<std::string, uint64_t>& counters,
const std::chrono::nanoseconds execution_duration,
const absl::optional<Envoy::SystemTime>& first_acquisition_time) {
auto result = output_.add_results();
result->set_name(name.data(), name.size());
if (first_acquisition_time.has_value()) {
*(result->mutable_execution_start()) = Envoy::Protobuf::util::TimeUtil::NanosecondsToTimestamp(
std::chrono::duration_cast<std::chrono::nanoseconds>(
first_acquisition_time.value().time_since_epoch())
.count());
}
for (auto& statistic : statistics) {
// TODO(#292): Looking at if the statistic id ends with "_size" to determine how it should be
// serialized is kind of hacky. Maybe we should have a lookup table of sorts, to determine how
Expand Down
3 changes: 2 additions & 1 deletion source/client/output_collector_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ class OutputCollectorImpl : public OutputCollector {

void addResult(absl::string_view name, const std::vector<StatisticPtr>& statistics,
const std::map<std::string, uint64_t>& counters,
const std::chrono::nanoseconds execution_duration) override;
const std::chrono::nanoseconds execution_duration,
const absl::optional<Envoy::SystemTime>& first_acquisition_time) override;
void setOutput(const nighthawk::client::Output& output) override { output_ = output; }

nighthawk::client::Output toProto() const override;
Expand Down
36 changes: 29 additions & 7 deletions source/client/process_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "external/envoy/source/server/server.h"

#include "absl/strings/str_replace.h"
#include "absl/types/optional.h"

// TODO(oschaaf): See if we can leverage a static module registration like Envoy does to avoid the
// ifdefs in this file.
Expand Down Expand Up @@ -164,7 +165,8 @@ bool ProcessImpl::requestExecutionCancellation() {
return true;
}

void ProcessImpl::createWorkers(const uint32_t concurrency) {
void ProcessImpl::createWorkers(const uint32_t concurrency,
const absl::optional<Envoy::SystemTime>& schedule) {
// TODO(oschaaf): Expose kMinimalDelay in configuration.
const std::chrono::milliseconds kMinimalWorkerDelay = 500ms + (concurrency * 50ms);
ASSERT(workers_.empty());
Expand All @@ -179,7 +181,10 @@ void ProcessImpl::createWorkers(const uint32_t concurrency) {
// TODO(oschaaf): Arguably, this ought to be the job of a rate limiter with awareness of the
Comment thread
oschaaf marked this conversation as resolved.
Outdated
// global status quo, which we do not have right now. This has been noted in the
// track-for-future issue.
const auto first_worker_start = time_system_.systemTime() + kMinimalWorkerDelay;
const Envoy::MonotonicTime monotonic_now = time_system_.monotonicTime();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Three comments here:

  1. This has gotten complex enough (especially with the long comments above explaining what's going on) that I think it might warrant its own privately scoped function, e.g. CalculateWorkerStartTime_ or something similar.

const std::chrono::nanoseconds offset =

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  2. I might be misunderstanding, but this ternary seems off to me. kMinimalWorkerDelay has the worker offsets baked into it, correct? So, if we are using a scheduled start time and we aren't using the minimal worker delay at all, then we aren't including any offsets between the workers.

So now the batching behavior of Nighthawk differs depending on whether or not we are using a schedule, which doesn't seem right.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This computes the start of the first worker; we will still be injecting an offset between each worker below. I refactored this for clarity, so hopefully it's easier to read now.

schedule.has_value() ? schedule.value() - time_system_.systemTime() : kMinimalWorkerDelay;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  3. We are heavily explaining the worker delay behavior in the comment above. Can we add a one-line comment for how the schedule behavior fits in? For instance, if I am wrong about point 2 above, we should say why the scheduled workers don't need an offset delay.

const Envoy::MonotonicTime first_worker_start = monotonic_now + offset;
const double inter_worker_delay_usec =
(1. / options_.requestsPerSecond()) * 1000000 / concurrency;
int worker_number = 0;
Expand Down Expand Up @@ -445,7 +450,13 @@ void ProcessImpl::setupStatsSinks(const envoy::config::bootstrap::v3::Bootstrap&
}

bool ProcessImpl::runInternal(OutputCollector& collector, const std::vector<UriPtr>& uris,
const UriPtr& request_source_uri, const UriPtr& tracing_uri) {
const UriPtr& request_source_uri, const UriPtr& tracing_uri,
const absl::optional<Envoy::SystemTime>& schedule) {
const Envoy::SystemTime now = time_system_.systemTime();
if (schedule.value_or(now) < now) {
ENVOY_LOG(error, "Scheduled execution date already transpired.");
return false;
}
{
auto guard = std::make_unique<Envoy::Thread::LockGuard>(workers_lock_);
if (cancelled_) {
Expand All @@ -461,7 +472,7 @@ bool ProcessImpl::runInternal(OutputCollector& collector, const std::vector<UriP
store_root_.setTagProducer(Envoy::Config::Utility::createTagProducer(bootstrap));
}

createWorkers(number_of_workers);
createWorkers(number_of_workers, schedule);
tls_.registerThread(*dispatcher_, true);
store_root_.initializeThreading(*dispatcher_, tls_);
runtime_singleton_ = std::make_unique<Envoy::Runtime::ScopedLoaderSingleton>(
Expand Down Expand Up @@ -522,15 +533,26 @@ bool ProcessImpl::runInternal(OutputCollector& collector, const std::vector<UriP

int i = 0;
std::chrono::nanoseconds total_execution_duration = 0ns;
absl::optional<Envoy::SystemTime> first_acquisition_time = absl::nullopt;

for (auto& worker : workers_) {
auto sequencer_execution_duration = worker->phase().sequencer().executionDuration();
absl::optional<Envoy::SystemTime> worker_first_acquisition_time =
worker->phase().sequencer().rate_limiter().firstAcquisitionTime();
if (worker_first_acquisition_time.has_value()) {
first_acquisition_time =
first_acquisition_time.has_value()
? std::min(first_acquisition_time.value(), worker_first_acquisition_time.value())
: worker_first_acquisition_time.value();
}
// We don't write per-worker results if we only have a single worker, because the global
// results will be precisely the same.
if (workers_.size() > 1) {
StatisticFactoryImpl statistic_factory(options_);
collector.addResult(fmt::format("worker_{}", i),
vectorizeStatisticPtrMap(worker->statistics()),
worker->threadLocalCounterValues(), sequencer_execution_duration);
worker->threadLocalCounterValues(), sequencer_execution_duration,
worker_first_acquisition_time);
}
total_execution_duration += sequencer_execution_duration;
i++;
Expand All @@ -545,7 +567,7 @@ bool ProcessImpl::runInternal(OutputCollector& collector, const std::vector<UriP
store_root_, [](absl::string_view, uint64_t value) { return value > 0; });
StatisticFactoryImpl statistic_factory(options_);
collector.addResult("global", mergeWorkerStatistics(workers_), counters,
total_execution_duration / workers_.size());
total_execution_duration / workers_.size(), first_acquisition_time);
return counters.find("sequencer.failed_terminations") == counters.end();
}

Expand Down Expand Up @@ -585,7 +607,7 @@ bool ProcessImpl::run(OutputCollector& collector) {
}

try {
return runInternal(collector, uris, request_source_uri, tracing_uri);
return runInternal(collector, uris, request_source_uri, tracing_uri, options_.schedule());
} catch (Envoy::EnvoyException& ex) {
ENVOY_LOG(error, "Fatal exception: {}", ex.what());
throw;
Expand Down
5 changes: 3 additions & 2 deletions source/client/process_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ class ProcessImpl : public Process, public Envoy::Logger::Loggable<Envoy::Logger
*
* @param concurrency the amount of workers that should be created.
*/
void createWorkers(const uint32_t concurrency);
void createWorkers(const uint32_t concurrency, const absl::optional<Envoy::SystemTime>& schedule);
std::vector<StatisticPtr> vectorizeStatisticPtrMap(const StatisticPtrMap& statistics) const;
std::vector<StatisticPtr>
mergeWorkerStatistics(const std::vector<ClientWorkerPtr>& workers) const;
Expand All @@ -124,7 +124,8 @@ class ProcessImpl : public Process, public Envoy::Logger::Loggable<Envoy::Logger
void setupStatsSinks(const envoy::config::bootstrap::v3::Bootstrap& bootstrap,
std::list<std::unique_ptr<Envoy::Stats::Sink>>& stats_sinks);
bool runInternal(OutputCollector& collector, const std::vector<UriPtr>& uris,
const UriPtr& request_source_uri, const UriPtr& tracing_uri);
const UriPtr& request_source_uri, const UriPtr& tracing_uri,
const absl::optional<Envoy::SystemTime>& schedule);

std::shared_ptr<Envoy::ProcessWide> process_wide_;
Envoy::PlatformImpl platform_impl_;
Expand Down
Loading