Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 52 additions & 41 deletions source/client/process_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -377,41 +377,8 @@ void ProcessImpl::addRequestSourceCluster(
socket_address->set_port_value(uri.port());
}

bool ProcessImpl::run(OutputCollector& collector) {
std::vector<UriPtr> uris;
UriPtr request_source_uri;
UriPtr tracing_uri;

try {
// TODO(oschaaf): See if we can rid of resolving here.
// We now only do it to validate.
if (options_.uri().has_value()) {
uris.push_back(std::make_unique<UriImpl>(options_.uri().value()));
} else {
for (const nighthawk::client::MultiTarget::Endpoint& endpoint :
options_.multiTargetEndpoints()) {
uris.push_back(std::make_unique<UriImpl>(fmt::format(
"{}://{}:{}{}", options_.multiTargetUseHttps() ? "https" : "http",
endpoint.address().value(), endpoint.port().value(), options_.multiTargetPath())));
}
}
for (const UriPtr& uri : uris) {
uri->resolve(*dispatcher_, Utility::translateFamilyOptionString(options_.addressFamily()));
}
if (options_.requestSource() != "") {
request_source_uri = std::make_unique<UriImpl>(options_.requestSource());
request_source_uri->resolve(*dispatcher_,
Utility::translateFamilyOptionString(options_.addressFamily()));
}
if (options_.trace() != "") {
tracing_uri = std::make_unique<UriImpl>(options_.trace());
tracing_uri->resolve(*dispatcher_,
Utility::translateFamilyOptionString(options_.addressFamily()));
}
} catch (const UriException&) {
return false;
}

bool ProcessImpl::runInternal(OutputCollector& collector, const std::vector<UriPtr>& uris,
const UriPtr& request_source_uri, const UriPtr& tracing_uri) {
int number_of_workers = determineConcurrency();
shutdown_ = false;
const std::vector<ClientWorkerPtr>& workers = createWorkers(number_of_workers);
Expand Down Expand Up @@ -461,8 +428,8 @@ bool ProcessImpl::run(OutputCollector& collector) {
std::chrono::nanoseconds total_execution_duration = 0ns;
for (auto& worker : workers_) {
auto sequencer_execution_duration = worker->phase().sequencer().executionDuration();
// We don't write per-worker results if we only have a single worker, because the global results
// will be precisely the same.
// We don't write per-worker results if we only have a single worker, because the global
// results will be precisely the same.
if (workers_.size() > 1) {
StatisticFactoryImpl statistic_factory(options_);
collector.addResult(fmt::format("worker_{}", i),
Expand All @@ -473,10 +440,11 @@ bool ProcessImpl::run(OutputCollector& collector) {
i++;
}

// Note that above we use use counter values snapshotted by the workers right after its execution
// completes. Here we query the live counters to get to the global numbers. To make sure the
// global aggregated numbers line up, we must take care not to shut down the benchmark client
// before we do this, as that will increment certain counters like connections closed, etc.
// Note that above we use use counter values snapshotted by the workers right after its
// execution completes. Here we query the live counters to get to the global numbers. To make
// sure the global aggregated numbers line up, we must take care not to shut down the benchmark
// client before we do this, as that will increment certain counters like connections closed,
// etc.
const auto& counters = Utility().mapCountersFromStore(
store_root_, [](absl::string_view, uint64_t value) { return value > 0; });
StatisticFactoryImpl statistic_factory(options_);
Expand All @@ -485,6 +453,49 @@ bool ProcessImpl::run(OutputCollector& collector) {
return counters.find("sequencer.failed_terminations") == counters.end();
}

bool ProcessImpl::run(OutputCollector& collector) {
std::vector<UriPtr> uris;
UriPtr request_source_uri;
UriPtr tracing_uri;

try {
// TODO(oschaaf): See if we can rid of resolving here.
// We now only do it to validate.
if (options_.uri().has_value()) {
uris.push_back(std::make_unique<UriImpl>(options_.uri().value()));
} else {
for (const nighthawk::client::MultiTarget::Endpoint& endpoint :
options_.multiTargetEndpoints()) {
uris.push_back(std::make_unique<UriImpl>(fmt::format(
"{}://{}:{}{}", options_.multiTargetUseHttps() ? "https" : "http",
endpoint.address().value(), endpoint.port().value(), options_.multiTargetPath())));
}
}
for (const UriPtr& uri : uris) {
uri->resolve(*dispatcher_, Utility::translateFamilyOptionString(options_.addressFamily()));
}
if (options_.requestSource() != "") {
request_source_uri = std::make_unique<UriImpl>(options_.requestSource());
request_source_uri->resolve(*dispatcher_,
Utility::translateFamilyOptionString(options_.addressFamily()));
}
if (options_.trace() != "") {
tracing_uri = std::make_unique<UriImpl>(options_.trace());
tracing_uri->resolve(*dispatcher_,
Utility::translateFamilyOptionString(options_.addressFamily()));
}
} catch (const UriException&) {
return false;
}

try {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was already a fairly long function to begin with. How hard would it be to refactor the content of the new try { } into a separate private method? That way we could separate the exception handling (the outer run method) from the business logic (the new private method) and make this a bit easier to follow.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in 9c89e74

return runInternal(collector, uris, request_source_uri, tracing_uri);
} catch (Envoy::EnvoyException& ex) {
ENVOY_LOG(error, "Fatal exception: {}", ex.what());
throw;
}
}

void ProcessImpl::setupForHRTimers() {
// We override the local environment to indicate to libevent that we favor precision over
// efficiency. Note that it is also possible to do this at setup time via libevent's api's.
Expand Down
3 changes: 3 additions & 0 deletions source/client/process_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ class ProcessImpl : public Process, public Envoy::Logger::Loggable<Envoy::Logger
std::vector<StatisticPtr>
mergeWorkerStatistics(const std::vector<ClientWorkerPtr>& workers) const;
void setupForHRTimers();
bool runInternal(OutputCollector& collector, const std::vector<UriPtr>& uris,
const UriPtr& request_source_uri, const UriPtr& tracing_uri);

Envoy::ProcessWide process_wide_;
Envoy::PlatformImpl platform_impl_;
Envoy::Event::TimeSystem& time_system_;
Expand Down
1 change: 1 addition & 0 deletions test/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ envoy_cc_test(
"@envoy//test/mocks/local_info:local_info_mocks",
"@envoy//test/mocks/protobuf:protobuf_mocks",
"@envoy//test/mocks/thread_local:thread_local_mocks",
"@envoy//test/test_common:simulated_time_system_lib",
],
)

Expand Down
14 changes: 4 additions & 10 deletions test/client_worker_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "external/envoy/test/mocks/local_info/mocks.h"
#include "external/envoy/test/mocks/protobuf/mocks.h"
#include "external/envoy/test/mocks/thread_local/mocks.h"
#include "external/envoy/test/test_common/simulated_time_system.h"

#include "common/statistic_impl.h"

Expand Down Expand Up @@ -74,11 +75,6 @@ class ClientWorkerTest : public Test {
TerminationPredicatePtr createMockTerminationPredicate() {
auto predicate = std::make_unique<NiceMock<MockTerminationPredicate>>();
ON_CALL(*predicate, appendToChain(_)).WillByDefault(ReturnRef(*predicate));
EXPECT_CALL(*predicate, evaluateChain())
.Times(AtLeast(0))
.WillOnce(Return(TerminationPredicate::Status::PROCEED))
.WillOnce(Return(TerminationPredicate::Status::TERMINATE));

return predicate;
}

Expand All @@ -91,7 +87,7 @@ class ClientWorkerTest : public Test {
MockRequestSourceFactory request_generator_factory_;
Envoy::Stats::IsolatedStoreImpl store_;
NiceMock<Envoy::ThreadLocal::MockInstance> tls_;
Envoy::Event::TestRealTimeSystem time_system_;
Envoy::Event::SimulatedTimeSystem time_system_;
MockBenchmarkClient* benchmark_client_;
MockSequencer* sequencer_;
MockRequestSource* request_generator_;
Expand All @@ -112,8 +108,7 @@ TEST_F(ClientWorkerTest, BasicTest) {
InSequence dummy;
EXPECT_CALL(*benchmark_client_, setShouldMeasureLatencies(false)).Times(1);
EXPECT_CALL(*benchmark_client_, tryStartRequest(_))
.Times(1)
.WillRepeatedly(Invoke(this, &ClientWorkerTest::CheckThreadChanged));
.WillOnce(Invoke(this, &ClientWorkerTest::CheckThreadChanged));
EXPECT_CALL(*benchmark_client_, setShouldMeasureLatencies(true)).Times(1);
EXPECT_CALL(*sequencer_, start).Times(1);
EXPECT_CALL(*sequencer_, waitForCompletion).Times(1);
Expand All @@ -124,8 +119,7 @@ TEST_F(ClientWorkerTest, BasicTest) {
auto worker = std::make_unique<ClientWorkerImpl>(
*api_, tls_, cluster_manager_ptr_, benchmark_client_factory_, termination_predicate_factory_,
sequencer_factory_, request_generator_factory_, store_, worker_number,
time_system_.monotonicTime() + 10ms, http_tracer_,
ClientWorkerImpl::HardCodedWarmupStyle::ON);
time_system_.monotonicTime(), http_tracer_, ClientWorkerImpl::HardCodedWarmupStyle::ON);

worker->start();
worker->waitForCompletion();
Expand Down
2 changes: 1 addition & 1 deletion test/integration/integration_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
"-x",
path,
"-n",
"2" if isSanitizerRun() else "20" # Number of tests to run in parallel
"4" if isSanitizerRun() else "20" # Number of tests to run in parallel
],
plugins=["xdist"])
exit(r)
9 changes: 3 additions & 6 deletions test/integration/test_integration_basics.py
Original file line number Diff line number Diff line change
Expand Up @@ -554,19 +554,16 @@ def test_https_h1_sni(sni_test_server_fixture):

# Verify failure when we set no host (will get plain http)
parsed_json, _ = sni_test_server_fixture.runNighthawkClient(
[sni_test_server_fixture.getTestServerRootUri(), "--rps", "100", "--duration", "100"],
[sni_test_server_fixture.getTestServerRootUri(), "--rps", "20", "--duration", "100"],
expect_failure=True)

# Verify success when we use plain http and don't request the sni host
parsed_json, _ = sni_test_server_fixture.runNighthawkClient(
[sni_test_server_fixture.getTestServerRootUri(), "--rps", "100", "--duration", "100"],
expect_failure=True)

parsed_json, _ = sni_test_server_fixture.runNighthawkClient([
sni_test_server_fixture.getTestServerRootUri().replace("https://", "http://"), "--rps", "100",
"--duration", "100", "--termination-predicate", "benchmark.http_2xx:2"
"--duration", "20", "--termination-predicate", "benchmark.http_2xx:2"
],
expect_failure=False)

counters = sni_test_server_fixture.getNighthawkCounterMapFromJson(parsed_json)
assertCounterGreaterEqual(counters, "benchmark.http_2xx", 1)
assertCounterGreaterEqual(counters, "upstream_cx_http1_total", 1)
Expand Down