Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions bazel/repositories.bzl
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
load("@bazel_tools//tools/build_defs/repo:http.bzl", "http_archive")

ENVOY_COMMIT = "e85a7f408c7baee8e1ed4af39a647c98ee5f2215" # Aug 10, 2021
ENVOY_SHA = "1745bf00a464327a0993b5229cae5e30423854a77b9b1d0c4ee96306e9aedc64"
ENVOY_COMMIT = "95038feabf260c3937465951d5da603d31ea3bd4" # Aug 12, 2021
ENVOY_SHA = "4a584b02c24ac24362eff2550977616f86a194e61106e15f723e1cf961ca145d"

HDR_HISTOGRAM_C_VERSION = "0.11.2" # October 12th, 2020
HDR_HISTOGRAM_C_SHA = "637f28b5f64de2e268131e4e34e6eef0b91cf5ff99167db447d9b2825eae6bad"
Expand Down
10 changes: 8 additions & 2 deletions source/client/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,13 @@ bool Main::run() {
stub = std::make_unique<nighthawk::client::NighthawkService::Stub>(channel);
process = std::make_unique<RemoteProcessImpl>(*options_, *stub);
} else {
process = std::make_unique<ProcessImpl>(*options_, time_system);
absl::StatusOr<ProcessPtr> process_or_status =
ProcessImpl::CreateProcessImpl(*options_, time_system);
if (!process_or_status.ok()) {
ENVOY_LOG(error, "Unable to create ProcessImpl: {}", process_or_status.status().ToString());
return false;
}
process = std::move(*process_or_status);
}
OutputFormatterFactoryImpl output_formatter_factory;
OutputCollectorImpl output_collector(time_system, *options_);
Expand Down Expand Up @@ -100,4 +106,4 @@ bool Main::run() {
}

} // namespace Client
} // namespace Nighthawk
} // namespace Nighthawk
51 changes: 45 additions & 6 deletions source/client/process_bootstrap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
#include "external/envoy_api/envoy/extensions/upstreams/http/v3/http_protocol_options.pb.h"

#include "source/client/sni_utility.h"
#include "source/common/uri_impl.h"
#include "source/common/utility.h"

namespace Nighthawk {
namespace {
Expand Down Expand Up @@ -175,15 +177,52 @@ Cluster createNighthawkClusterForWorker(const Client::Options& options,
return cluster;
}

// Extracts URIs of the targets and the request source (if specified) from the
// Nighthawk options.
// Resolves all the extracted URIs.
absl::Status extractAndResolveUrisFromOptions(Envoy::Event::Dispatcher& dispatcher,
const Client::Options& options,
std::vector<UriPtr>* uris,
UriPtr* request_source_uri) {
try {
if (options.uri().has_value()) {
uris->push_back(std::make_unique<UriImpl>(options.uri().value()));
} else {
for (const nighthawk::client::MultiTarget::Endpoint& endpoint :
options.multiTargetEndpoints()) {
uris->push_back(std::make_unique<UriImpl>(fmt::format(
"{}://{}:{}{}", options.multiTargetUseHttps() ? "https" : "http",
endpoint.address().value(), endpoint.port().value(), options.multiTargetPath())));
}
}
for (const UriPtr& uri : *uris) {
uri->resolve(dispatcher, Utility::translateFamilyOptionString(options.addressFamily()));
}
if (options.requestSource() != "") {
*request_source_uri = std::make_unique<UriImpl>(options.requestSource());
(*request_source_uri)
->resolve(dispatcher, Utility::translateFamilyOptionString(options.addressFamily()));
}
} catch (const UriException& ex) {
return absl::InvalidArgumentError(
fmt::format("URI exception (for example, malformed URI syntax, bad MultiTarget path, "
"unresolvable host DNS): {}",
ex.what()));
}
return absl::OkStatus();
}

} // namespace

absl::StatusOr<Bootstrap> createBootstrapConfiguration(const Client::Options& options,
const std::vector<UriPtr>& uris,
const UriPtr& request_source_uri,
absl::StatusOr<Bootstrap> createBootstrapConfiguration(Envoy::Event::Dispatcher& dispatcher,
const Client::Options& options,
int number_of_workers) {
if (uris.empty()) {
return absl::InvalidArgumentError(
"illegal configuration with zero endpoints, at least one uri must be specified");
std::vector<UriPtr> uris;
UriPtr request_source_uri;
absl::Status uri_status =
extractAndResolveUrisFromOptions(dispatcher, options, &uris, &request_source_uri);
if (!uri_status.ok()) {
return uri_status;
}

Bootstrap bootstrap;
Expand Down
14 changes: 5 additions & 9 deletions source/client/process_bootstrap.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "nighthawk/common/uri.h"

#include "external/envoy/source/common/common/statusor.h"
#include "external/envoy/source/common/event/dispatcher_impl.h"
#include "external/envoy_api/envoy/config/bootstrap/v3/bootstrap.pb.h"

namespace Nighthawk {
Expand All @@ -14,21 +15,16 @@ namespace Nighthawk {
* The created bootstrap configuration can be used to upstream requests to the
* specified uris.
*
* @param dispatcher is used when resolving hostnames to IP addresses in the
bootstrap.
* @param options are the options this Nighthawk execution was triggered with.
* @param uris are the endpoints to which the requests will be upstreamed. At
* least one uri must be specified. It is assumed that all the uris have
* the same scheme (e.g. https). All the uri objects must already be
* resolved.
* @param request_source_uri is the address of the request source service to
* use, can be NULL if request source isn't used. If not NULL, the uri
* object must already be resolved.
* @param number_of_workers indicates how many Nighthawk workers will be
* upstreaming requests. A separate cluster is generated for each worker.
*
* @return the created bootstrap configuration.
*/
absl::StatusOr<envoy::config::bootstrap::v3::Bootstrap>
createBootstrapConfiguration(const Client::Options& options, const std::vector<UriPtr>& uris,
const UriPtr& request_source_uri, int number_of_workers);
createBootstrapConfiguration(Envoy::Event::Dispatcher& dispatcher, const Client::Options& options,
int number_of_workers);

} // namespace Nighthawk
158 changes: 83 additions & 75 deletions source/client/process_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,44 @@ using namespace std::chrono_literals;

namespace Nighthawk {
namespace Client {
namespace {

using ::envoy::config::bootstrap::v3::Bootstrap;

// Helps in generating a bootstrap for the process.
// This is a class only to allow the use of the ENVOY_LOG macros.
class BootstrapFactory : public Envoy::Logger::Loggable<Envoy::Logger::Id::main> {
public:
// Determines the concurrency Nighthawk should use based on configuration
// (options) and the available machine resources.
static uint32_t determineConcurrency(const Options& options) {
uint32_t cpu_cores_with_affinity = Envoy::OptionsImplPlatform::getCpuCount();
bool autoscale = options.concurrency() == "auto";
// TODO(oschaaf): Maybe, in the case where the concurrency flag is left out, but
// affinity is set / we don't have affinity with all cores, we should default to autoscale.
// (e.g. we are called via taskset).
uint32_t concurrency = autoscale ? cpu_cores_with_affinity : std::stoi(options.concurrency());

if (autoscale) {
ENVOY_LOG(info, "Detected {} (v)CPUs with affinity..", cpu_cores_with_affinity);
}
std::string duration_as_string =
options.noDuration() ? "No time limit"
: fmt::format("Time limit: {} seconds", options.duration().count());
ENVOY_LOG(info, "Starting {} threads / event loops. {}.", concurrency, duration_as_string);
ENVOY_LOG(info, "Global targets: {} connections and {} calls per second.",
options.connections() * concurrency, options.requestsPerSecond() * concurrency);

if (concurrency > 1) {
ENVOY_LOG(info, " (Per-worker targets: {} connections and {} calls per second)",
options.connections(), options.requestsPerSecond());
}

return concurrency;
}
};

} // namespace

// We customize ProdClusterManagerFactory for the sole purpose of returning our specialized
// http1 pool to the benchmark client, which allows us to offer connection prefetching.
Expand Down Expand Up @@ -123,17 +161,17 @@ class ClusterManagerFactory : public Envoy::Upstream::ProdClusterManagerFactory

ProcessImpl::ProcessImpl(const Options& options, Envoy::Event::TimeSystem& time_system,
const std::shared_ptr<Envoy::ProcessWide>& process_wide)
: process_wide_(process_wide == nullptr ? std::make_shared<Envoy::ProcessWide>()
: options_(options), number_of_workers_(BootstrapFactory::determineConcurrency(options_)),
process_wide_(process_wide == nullptr ? std::make_shared<Envoy::ProcessWide>()
: process_wide),
time_system_(time_system), stats_allocator_(symbol_table_), store_root_(stats_allocator_),
quic_stat_names_(store_root_.symbolTable()),
api_(std::make_unique<Envoy::Api::Impl>(platform_impl_.threadFactory(), store_root_,
time_system_, platform_impl_.fileSystem(),
generator_)),
time_system_, platform_impl_.fileSystem(), generator_,
bootstrap_)),
dispatcher_(api_->allocateDispatcher("main_thread")), benchmark_client_factory_(options),
termination_predicate_factory_(options), sequencer_factory_(options),
request_generator_factory_(options, *api_), options_(options),
init_manager_("nh_init_manager"),
request_generator_factory_(options, *api_), init_manager_("nh_init_manager"),
local_info_(new Envoy::LocalInfo::LocalInfoImpl(
store_root_.symbolTable(), node_, node_context_params_,
Envoy::Network::Utility::getLocalAddress(Envoy::Network::Address::IpVersion::v4),
Expand All @@ -153,6 +191,35 @@ ProcessImpl::ProcessImpl(const Options& options, Envoy::Event::TimeSystem& time_
configureComponentLogLevels(spdlog::level::from_str(lower));
}

absl::StatusOr<ProcessPtr>
ProcessImpl::CreateProcessImpl(const Options& options, Envoy::Event::TimeSystem& time_system,
const std::shared_ptr<Envoy::ProcessWide>& process_wide) {
std::unique_ptr<ProcessImpl> process(new ProcessImpl(options, time_system, process_wide));

absl::StatusOr<Bootstrap> bootstrap = createBootstrapConfiguration(
*process->dispatcher_, process->options_, process->number_of_workers_);
if (!bootstrap.ok()) {
ENVOY_LOG(error, "Failed to create bootstrap configuration: {}", bootstrap.status().message());
process->shutdown();
return bootstrap.status();
}

// Ideally we would create the bootstrap first and then pass it to the
// constructor of Envoy::Api::Api. That cannot be done because of a circular
// dependency:
// 1) The constructor of Envoy::Api::Api requires an instance of Bootstrap.
// 2) The bootstrap generator requires an Envoy::Event::Dispatcher to resolve
// URIs to IPs required in the Bootstrap.
// 3) The constructor of Envoy::Event::Dispatcher requires Envoy::Api::Api.
//
// Replacing the bootstrap_ after the Envoy::Api::Api has been created is
// assumed to be safe, because we still do it while constructing the
// ProcessImpl, i.e. before we start running the process.
process->bootstrap_ = *bootstrap;

return process;
}

ProcessImpl::~ProcessImpl() {
RELEASE_ASSERT(shutdown_, "shutdown not called before destruction.");
}
Expand Down Expand Up @@ -241,32 +308,6 @@ void ProcessImpl::configureComponentLogLevels(spdlog::level::level_enum level) {
logger_to_change->setLevel(level);
}

uint32_t ProcessImpl::determineConcurrency() const {
uint32_t cpu_cores_with_affinity = Envoy::OptionsImplPlatform::getCpuCount();
bool autoscale = options_.concurrency() == "auto";
// TODO(oschaaf): Maybe, in the case where the concurrency flag is left out, but
// affinity is set / we don't have affinity with all cores, we should default to autoscale.
// (e.g. we are called via taskset).
uint32_t concurrency = autoscale ? cpu_cores_with_affinity : std::stoi(options_.concurrency());

if (autoscale) {
ENVOY_LOG(info, "Detected {} (v)CPUs with affinity..", cpu_cores_with_affinity);
}
std::string duration_as_string =
options_.noDuration() ? "No time limit"
: fmt::format("Time limit: {} seconds", options_.duration().count());
ENVOY_LOG(info, "Starting {} threads / event loops. {}.", concurrency, duration_as_string);
ENVOY_LOG(info, "Global targets: {} connections and {} calls per second.",
options_.connections() * concurrency, options_.requestsPerSecond() * concurrency);

if (concurrency > 1) {
ENVOY_LOG(info, " (Per-worker targets: {} connections and {} calls per second)",
options_.connections(), options_.requestsPerSecond());
}

return concurrency;
}

std::vector<StatisticPtr>
ProcessImpl::vectorizeStatisticPtrMap(const StatisticPtrMap& statistics) const {
std::vector<StatisticPtr> v;
Expand Down Expand Up @@ -386,8 +427,7 @@ void ProcessImpl::setupStatsSinks(const envoy::config::bootstrap::v3::Bootstrap&
}
}

bool ProcessImpl::runInternal(OutputCollector& collector, const std::vector<UriPtr>& uris,
const UriPtr& request_source_uri, const UriPtr& tracing_uri,
bool ProcessImpl::runInternal(OutputCollector& collector, const UriPtr& tracing_uri,
const absl::optional<Envoy::SystemTime>& scheduled_start) {
const Envoy::SystemTime now = time_system_.systemTime();
if (scheduled_start.value_or(now) < now) {
Expand All @@ -399,24 +439,15 @@ bool ProcessImpl::runInternal(OutputCollector& collector, const std::vector<UriP
if (cancelled_) {
return true;
}
int number_of_workers = determineConcurrency();
shutdown_ = false;

absl::StatusOr<envoy::config::bootstrap::v3::Bootstrap> bootstrap =
createBootstrapConfiguration(options_, uris, request_source_uri, number_of_workers);
if (!bootstrap.ok()) {
ENVOY_LOG(error, "Failed to create bootstrap configuration: {}",
bootstrap.status().message());
return false;
}

// Needs to happen as early as possible (before createWorkers()) in the instantiation to preempt
// the objects that require stats.
if (!options_.statsSinks().empty()) {
store_root_.setTagProducer(Envoy::Config::Utility::createTagProducer(*bootstrap));
store_root_.setTagProducer(Envoy::Config::Utility::createTagProducer(bootstrap_));
}

createWorkers(number_of_workers, scheduled_start);
createWorkers(number_of_workers_, scheduled_start);
tls_.registerThread(*dispatcher_, true);
store_root_.initializeThreading(*dispatcher_, tls_);
runtime_singleton_ = std::make_unique<Envoy::Runtime::ScopedLoaderSingleton>(
Expand Down Expand Up @@ -446,21 +477,21 @@ bool ProcessImpl::runInternal(OutputCollector& collector, const std::vector<UriP
: Http1PoolImpl::ConnectionReuseStrategy::MRU);
cluster_manager_factory_->setPrefetchConnections(options_.prefetchConnections());
if (tracing_uri != nullptr) {
setupTracingImplementation(*bootstrap, *tracing_uri);
addTracingCluster(*bootstrap, *tracing_uri);
setupTracingImplementation(bootstrap_, *tracing_uri);
addTracingCluster(bootstrap_, *tracing_uri);
}
ENVOY_LOG(debug, "Computed configuration: {}", bootstrap->DebugString());
cluster_manager_ = cluster_manager_factory_->clusterManagerFromProto(*bootstrap);
maybeCreateTracingDriver(bootstrap->tracing());
ENVOY_LOG(debug, "Computed configuration: {}", bootstrap_.DebugString());
cluster_manager_ = cluster_manager_factory_->clusterManagerFromProto(bootstrap_);
maybeCreateTracingDriver(bootstrap_.tracing());
cluster_manager_->setInitializedCb(
[this]() -> void { init_manager_.initialize(init_watcher_); });

Envoy::Runtime::LoaderSingleton::get().initialize(*cluster_manager_);

std::list<std::unique_ptr<Envoy::Stats::Sink>> stats_sinks;
setupStatsSinks(*bootstrap, stats_sinks);
setupStatsSinks(bootstrap_, stats_sinks);
std::chrono::milliseconds stats_flush_interval = std::chrono::milliseconds(
Envoy::DurationUtil::durationToMilliseconds(bootstrap->stats_flush_interval()));
Envoy::DurationUtil::durationToMilliseconds(bootstrap_.stats_flush_interval()));

if (!options_.statsSinks().empty()) {
// There should be only a single live flush worker instance at any time.
Expand Down Expand Up @@ -541,31 +572,9 @@ bool ProcessImpl::runInternal(OutputCollector& collector, const std::vector<UriP
}

bool ProcessImpl::run(OutputCollector& collector) {
std::vector<UriPtr> uris;
UriPtr request_source_uri;
UriPtr tracing_uri;

try {
// TODO(oschaaf): See if we can rid of resolving here.
// We now only do it to validate.
if (options_.uri().has_value()) {
uris.push_back(std::make_unique<UriImpl>(options_.uri().value()));
} else {
for (const nighthawk::client::MultiTarget::Endpoint& endpoint :
options_.multiTargetEndpoints()) {
uris.push_back(std::make_unique<UriImpl>(fmt::format(
"{}://{}:{}{}", options_.multiTargetUseHttps() ? "https" : "http",
endpoint.address().value(), endpoint.port().value(), options_.multiTargetPath())));
}
}
for (const UriPtr& uri : uris) {
uri->resolve(*dispatcher_, Utility::translateFamilyOptionString(options_.addressFamily()));
}
if (options_.requestSource() != "") {
request_source_uri = std::make_unique<UriImpl>(options_.requestSource());
request_source_uri->resolve(*dispatcher_,
Utility::translateFamilyOptionString(options_.addressFamily()));
}
if (options_.trace() != "") {
tracing_uri = std::make_unique<UriImpl>(options_.trace());
tracing_uri->resolve(*dispatcher_,
Expand All @@ -580,8 +589,7 @@ bool ProcessImpl::run(OutputCollector& collector) {
}

try {
return runInternal(collector, uris, request_source_uri, tracing_uri,
options_.scheduled_start());
return runInternal(collector, tracing_uri, options_.scheduled_start());
} catch (Envoy::EnvoyException& ex) {
ENVOY_LOG(error, "Fatal exception: {}", ex.what());
throw;
Expand Down
Loading