diff --git a/source/client/process_impl.cc b/source/client/process_impl.cc index e25119a9a..816d063f3 100644 --- a/source/client/process_impl.cc +++ b/source/client/process_impl.cc @@ -377,41 +377,8 @@ void ProcessImpl::addRequestSourceCluster( socket_address->set_port_value(uri.port()); } -bool ProcessImpl::run(OutputCollector& collector) { - std::vector uris; - UriPtr request_source_uri; - UriPtr tracing_uri; - - try { - // TODO(oschaaf): See if we can rid of resolving here. - // We now only do it to validate. - if (options_.uri().has_value()) { - uris.push_back(std::make_unique(options_.uri().value())); - } else { - for (const nighthawk::client::MultiTarget::Endpoint& endpoint : - options_.multiTargetEndpoints()) { - uris.push_back(std::make_unique(fmt::format( - "{}://{}:{}{}", options_.multiTargetUseHttps() ? "https" : "http", - endpoint.address().value(), endpoint.port().value(), options_.multiTargetPath()))); - } - } - for (const UriPtr& uri : uris) { - uri->resolve(*dispatcher_, Utility::translateFamilyOptionString(options_.addressFamily())); - } - if (options_.requestSource() != "") { - request_source_uri = std::make_unique(options_.requestSource()); - request_source_uri->resolve(*dispatcher_, - Utility::translateFamilyOptionString(options_.addressFamily())); - } - if (options_.trace() != "") { - tracing_uri = std::make_unique(options_.trace()); - tracing_uri->resolve(*dispatcher_, - Utility::translateFamilyOptionString(options_.addressFamily())); - } - } catch (const UriException&) { - return false; - } - +bool ProcessImpl::runInternal(OutputCollector& collector, const std::vector& uris, + const UriPtr& request_source_uri, const UriPtr& tracing_uri) { int number_of_workers = determineConcurrency(); shutdown_ = false; const std::vector& workers = createWorkers(number_of_workers); @@ -461,8 +428,8 @@ bool ProcessImpl::run(OutputCollector& collector) { std::chrono::nanoseconds total_execution_duration = 0ns; for (auto& worker : workers_) { auto 
sequencer_execution_duration = worker->phase().sequencer().executionDuration(); - // We don't write per-worker results if we only have a single worker, because the global results - // will be precisely the same. + // We don't write per-worker results if we only have a single worker, because the global + // results will be precisely the same. if (workers_.size() > 1) { StatisticFactoryImpl statistic_factory(options_); collector.addResult(fmt::format("worker_{}", i), @@ -473,10 +440,11 @@ bool ProcessImpl::run(OutputCollector& collector) { i++; } - // Note that above we use use counter values snapshotted by the workers right after its execution - // completes. Here we query the live counters to get to the global numbers. To make sure the - // global aggregated numbers line up, we must take care not to shut down the benchmark client - // before we do this, as that will increment certain counters like connections closed, etc. + // Note that above we use counter values snapshotted by the workers right after its + // execution completes. Here we query the live counters to get to the global numbers. To make + // sure the global aggregated numbers line up, we must take care not to shut down the benchmark + // client before we do this, as that will increment certain counters like connections closed, + // etc. const auto& counters = Utility().mapCountersFromStore( store_root_, [](absl::string_view, uint64_t value) { return value > 0; }); StatisticFactoryImpl statistic_factory(options_); @@ -485,6 +453,49 @@ bool ProcessImpl::run(OutputCollector& collector) { return counters.find("sequencer.failed_terminations") == counters.end(); } +bool ProcessImpl::run(OutputCollector& collector) { + std::vector<UriPtr> uris; + UriPtr request_source_uri; + UriPtr tracing_uri; + + try { + // TODO(oschaaf): See if we can get rid of resolving here. + // We now only do it to validate. 
+ if (options_.uri().has_value()) { + uris.push_back(std::make_unique<UriImpl>(options_.uri().value())); + } else { + for (const nighthawk::client::MultiTarget::Endpoint& endpoint : + options_.multiTargetEndpoints()) { + uris.push_back(std::make_unique<UriImpl>(fmt::format( + "{}://{}:{}{}", options_.multiTargetUseHttps() ? "https" : "http", + endpoint.address().value(), endpoint.port().value(), options_.multiTargetPath()))); + } + } + for (const UriPtr& uri : uris) { + uri->resolve(*dispatcher_, Utility::translateFamilyOptionString(options_.addressFamily())); + } + if (options_.requestSource() != "") { + request_source_uri = std::make_unique<UriImpl>(options_.requestSource()); + request_source_uri->resolve(*dispatcher_, + Utility::translateFamilyOptionString(options_.addressFamily())); + } + if (options_.trace() != "") { + tracing_uri = std::make_unique<UriImpl>(options_.trace()); + tracing_uri->resolve(*dispatcher_, + Utility::translateFamilyOptionString(options_.addressFamily())); + } + } catch (const UriException&) { + return false; + } + + try { + return runInternal(collector, uris, request_source_uri, tracing_uri); + } catch (Envoy::EnvoyException& ex) { + ENVOY_LOG(error, "Fatal exception: {}", ex.what()); + throw; + } +} + void ProcessImpl::setupForHRTimers() { // We override the local environment to indicate to libevent that we favor precision over // efficiency. Note that it is also possible to do this at setup time via libevent's api's. 
diff --git a/source/client/process_impl.h b/source/client/process_impl.h index c5226ef99..967ee7c59 100644 --- a/source/client/process_impl.h +++ b/source/client/process_impl.h @@ -93,6 +93,9 @@ class ProcessImpl : public Process, public Envoy::Logger::Loggable mergeWorkerStatistics(const std::vector& workers) const; void setupForHRTimers(); + bool runInternal(OutputCollector& collector, const std::vector& uris, + const UriPtr& request_source_uri, const UriPtr& tracing_uri); + Envoy::ProcessWide process_wide_; Envoy::PlatformImpl platform_impl_; Envoy::Event::TimeSystem& time_system_; diff --git a/test/BUILD b/test/BUILD index 2d3294744..2c9db7a40 100644 --- a/test/BUILD +++ b/test/BUILD @@ -59,6 +59,7 @@ envoy_cc_test( "@envoy//test/mocks/local_info:local_info_mocks", "@envoy//test/mocks/protobuf:protobuf_mocks", "@envoy//test/mocks/thread_local:thread_local_mocks", + "@envoy//test/test_common:simulated_time_system_lib", ], ) diff --git a/test/client_worker_test.cc b/test/client_worker_test.cc index 21240f234..5da4167bb 100644 --- a/test/client_worker_test.cc +++ b/test/client_worker_test.cc @@ -9,6 +9,7 @@ #include "external/envoy/test/mocks/local_info/mocks.h" #include "external/envoy/test/mocks/protobuf/mocks.h" #include "external/envoy/test/mocks/thread_local/mocks.h" +#include "external/envoy/test/test_common/simulated_time_system.h" #include "common/statistic_impl.h" @@ -74,11 +75,6 @@ class ClientWorkerTest : public Test { TerminationPredicatePtr createMockTerminationPredicate() { auto predicate = std::make_unique>(); ON_CALL(*predicate, appendToChain(_)).WillByDefault(ReturnRef(*predicate)); - EXPECT_CALL(*predicate, evaluateChain()) - .Times(AtLeast(0)) - .WillOnce(Return(TerminationPredicate::Status::PROCEED)) - .WillOnce(Return(TerminationPredicate::Status::TERMINATE)); - return predicate; } @@ -91,7 +87,7 @@ class ClientWorkerTest : public Test { MockRequestSourceFactory request_generator_factory_; Envoy::Stats::IsolatedStoreImpl store_; NiceMock tls_; 
- Envoy::Event::TestRealTimeSystem time_system_; + Envoy::Event::SimulatedTimeSystem time_system_; MockBenchmarkClient* benchmark_client_; MockSequencer* sequencer_; MockRequestSource* request_generator_; @@ -112,8 +108,7 @@ TEST_F(ClientWorkerTest, BasicTest) { InSequence dummy; EXPECT_CALL(*benchmark_client_, setShouldMeasureLatencies(false)).Times(1); EXPECT_CALL(*benchmark_client_, tryStartRequest(_)) - .Times(1) - .WillRepeatedly(Invoke(this, &ClientWorkerTest::CheckThreadChanged)); + .WillOnce(Invoke(this, &ClientWorkerTest::CheckThreadChanged)); EXPECT_CALL(*benchmark_client_, setShouldMeasureLatencies(true)).Times(1); EXPECT_CALL(*sequencer_, start).Times(1); EXPECT_CALL(*sequencer_, waitForCompletion).Times(1); @@ -124,8 +119,7 @@ TEST_F(ClientWorkerTest, BasicTest) { auto worker = std::make_unique( *api_, tls_, cluster_manager_ptr_, benchmark_client_factory_, termination_predicate_factory_, sequencer_factory_, request_generator_factory_, store_, worker_number, - time_system_.monotonicTime() + 10ms, http_tracer_, - ClientWorkerImpl::HardCodedWarmupStyle::ON); + time_system_.monotonicTime(), http_tracer_, ClientWorkerImpl::HardCodedWarmupStyle::ON); worker->start(); worker->waitForCompletion(); diff --git a/test/integration/integration_test.py b/test/integration/integration_test.py index 2309521a7..951d56b32 100644 --- a/test/integration/integration_test.py +++ b/test/integration/integration_test.py @@ -24,7 +24,7 @@ "-x", path, "-n", - "2" if isSanitizerRun() else "20" # Number of tests to run in parallel + "4" if isSanitizerRun() else "20" # Number of tests to run in parallel ], plugins=["xdist"]) exit(r) diff --git a/test/integration/test_integration_basics.py b/test/integration/test_integration_basics.py index 8a0153714..95619bfbf 100644 --- a/test/integration/test_integration_basics.py +++ b/test/integration/test_integration_basics.py @@ -554,19 +554,16 @@ def test_https_h1_sni(sni_test_server_fixture): # Verify failure when we set no host (will get 
plain http) parsed_json, _ = sni_test_server_fixture.runNighthawkClient( - [sni_test_server_fixture.getTestServerRootUri(), "--rps", "100", "--duration", "100"], + [sni_test_server_fixture.getTestServerRootUri(), "--rps", "20", "--duration", "100"], expect_failure=True) # Verify success when we use plain http and don't request the sni host - parsed_json, _ = sni_test_server_fixture.runNighthawkClient( - [sni_test_server_fixture.getTestServerRootUri(), "--rps", "100", "--duration", "100"], - expect_failure=True) - parsed_json, _ = sni_test_server_fixture.runNighthawkClient([ sni_test_server_fixture.getTestServerRootUri().replace("https://", "http://"), "--rps", "100", - "--duration", "100", "--termination-predicate", "benchmark.http_2xx:2" + "--duration", "20", "--termination-predicate", "benchmark.http_2xx:2" ], expect_failure=False) + counters = sni_test_server_fixture.getNighthawkCounterMapFromJson(parsed_json) assertCounterGreaterEqual(counters, "benchmark.http_2xx", 1) assertCounterGreaterEqual(counters, "upstream_cx_http1_total", 1)