Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,11 @@ Calculating the minRTT
^^^^^^^^^^^^^^^^^^^^^^

The minRTT is periodically measured by only allowing a very low outstanding request count to an
upstream cluster and measuring the latency under these ideal conditions. The length of this minRTT
calculation window is variable depending on the number of requests the filter is configured to
aggregate to represent the expected latency of an upstream.
upstream cluster and measuring the latency under these ideal conditions. The calculation is also
triggered in scenarios where the concurrency limit is determined to be the minimum possible value
for 5 consecutive sampling windows. The length of this minRTT calculation window is variable
depending on the number of requests the filter is configured to aggregate to represent the expected
latency of an upstream.

A configurable *jitter* value is used to randomly delay the start of the minRTT calculation window
by some amount of time. This is not necessary and can be disabled; however, it is recommended to
Expand Down
2 changes: 2 additions & 0 deletions docs/root/intro/version_history.rst
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ Version history
* access log: introduce :ref:`connection-level access loggers<envoy_api_field_Listener.access_log>`.
* adaptive concurrency: fixed bug that allowed concurrency limits to drop below the configured
minimum.
* adaptive concurrency: minRTT is now triggered when the minimum concurrency is maintained for 5
consecutive sampling intervals
* admin: added support for displaying ip address subject alternate names in :ref:`certs<operations_admin_interface_certs>` end point.
* admin: added :http:post:`/reopen_logs` endpoint to control log rotation.
* aws_lambda: added :ref:`AWS Lambda filter <config_http_filters_aws_lambda>` that converts HTTP requests to Lambda
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ GradientController::GradientController(GradientControllerConfig config,
const std::string& stats_prefix, Stats::Scope& scope,
Runtime::RandomGenerator& random)
: config_(std::move(config)), dispatcher_(dispatcher), scope_(scope),
stats_(generateStats(scope_, stats_prefix)), random_(random), num_rq_outstanding_(0),
concurrency_limit_(config_.minConcurrency()),
stats_(generateStats(scope_, stats_prefix)), random_(random), deferred_limit_value_(0),
num_rq_outstanding_(0), concurrency_limit_(config_.minConcurrency()),
latency_sample_hist_(hist_fast_alloc(), hist_free) {
min_rtt_calc_timer_ = dispatcher_.createTimer([this]() -> void { enterMinRTTSamplingWindow(); });

Expand Down Expand Up @@ -81,14 +81,22 @@ GradientControllerStats GradientController::generateStats(Stats::Scope& scope,
}

void GradientController::enterMinRTTSamplingWindow() {
// There a potential race condition where setting the minimum concurrency multiple times in a row
// resets the minRTT sampling timer and triggers the calculation immediately. This could occur
// after the minRTT sampling window has already been entered, so we can simply return here knowing
// the desired action is already being performed.
if (inMinRTTSamplingWindow()) {
return;
}

absl::MutexLock ml(&sample_mutation_mtx_);

stats_.min_rtt_calculation_active_.set(1);

// Set the minRTT flag to indicate we're gathering samples to update the value. This will
// prevent the sample window from resetting until enough requests are gathered to complete the
// recalculation.
deferred_limit_value_.store(concurrencyLimit());
deferred_limit_value_.store(GradientController::concurrencyLimit());
updateConcurrencyLimit(config_.minConcurrency());

// Throw away any latency samples from before the recalculation window as it may not represent
Expand Down Expand Up @@ -209,6 +217,32 @@ void GradientController::cancelLatencySample() {
--num_rq_outstanding_;
}

void GradientController::updateConcurrencyLimit(const uint32_t new_limit) {
const auto old_limit = concurrency_limit_.load();
concurrency_limit_.store(new_limit);
stats_.concurrency_limit_.set(concurrency_limit_.load());

if (!inMinRTTSamplingWindow() && old_limit == config_.minConcurrency() &&
new_limit == config_.minConcurrency()) {
++consecutive_min_concurrency_set_;
} else {
consecutive_min_concurrency_set_ = 0;
}

// If the concurrency limit is being set to the minimum value for the 5th consecutive sample
// window while not in the middle of a minRTT measurement, this might be indicative of an
// inaccurate minRTT measurement. Since the limit is already where it needs to be for a minRTT
// measurement, we should measure it again.
//
// There is a possibility that the minRTT measurement begins before we are able to
// cancel/re-enable the timer below and triggers overlapping minRTT windows. To protect against
// this, there is an explicit check when entering the minRTT measurement that ensures there is
// only a single minRTT measurement active at a time.
if (consecutive_min_concurrency_set_ >= 5) {
min_rtt_calc_timer_->enableTimer(std::chrono::milliseconds(0));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This won't end up spin looping because if this happens while not already in a window, this will start a process that waits for requests/responses to happen, right? Can you maybe put a clarifying comment about why this won't spin loop? This is also why you need the various inMinRTTSamplingWindow checks, right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think any issues around spin looping are prevented first thing in enterMinRTTSamplingWindow. Added a comment to clarify this, but let me know if I addressed the right concern.

}
}

} // namespace Controller
} // namespace AdaptiveConcurrency
} // namespace HttpFilters
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,10 +228,8 @@ class GradientController : public ConcurrencyController {
void enterMinRTTSamplingWindow();
bool inMinRTTSamplingWindow() const { return deferred_limit_value_.load() > 0; }
void resetSampleWindow() ABSL_EXCLUSIVE_LOCKS_REQUIRED(sample_mutation_mtx_);
void updateConcurrencyLimit(const uint32_t new_limit) {
concurrency_limit_.store(new_limit);
stats_.concurrency_limit_.set(concurrency_limit_.load());
}
void updateConcurrencyLimit(const uint32_t new_limit)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(sample_mutation_mtx_);
std::chrono::milliseconds applyJitter(std::chrono::milliseconds interval,
double jitter_pct) const;

Expand Down Expand Up @@ -271,6 +269,11 @@ class GradientController : public ConcurrencyController {
std::unique_ptr<histogram_t, decltype(&hist_free)>
latency_sample_hist_ ABSL_GUARDED_BY(sample_mutation_mtx_);

// Tracks the number of consecutive times that the concurrency limit is set to the minimum. This
// is used to determine whether the controller should trigger an additional minRTT measurement
// after remaining at the minimum limit for too long.
uint32_t consecutive_min_concurrency_set_ ABSL_GUARDED_BY(sample_mutation_mtx_);

Event::TimerPtr min_rtt_calc_timer_;
Event::TimerPtr sample_reset_timer_;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -624,6 +624,65 @@ TEST_F(GradientControllerTest, TimerAccuracyTestNoJitter) {
}
}

// Test that consecutively setting the concurrency limit to the minimum triggers a minRTT
// recalculation.
TEST_F(GradientControllerTest, ConsecutiveMinConcurrencyReset) {
const std::string yaml = R"EOF(
sample_aggregate_percentile:
value: 50
concurrency_limit_params:
max_concurrency_limit:
concurrency_update_interval: 0.1s
min_rtt_calc_params:
jitter:
value: 0.0
interval: 3600s
request_count: 5
buffer:
value: 0
min_concurrency: 7
)EOF";

auto controller = makeController(yaml);
EXPECT_EQ(controller->concurrencyLimit(), 7);

// Force a minRTT of 5ms.
advancePastMinRTTStage(controller, yaml, std::chrono::milliseconds(5));
EXPECT_EQ(
5, stats_.gauge("test_prefix.min_rtt_msecs", Stats::Gauge::ImportMode::NeverImport).value());

// Ensure that the concurrency window increases on its own due to the headroom calculation with
// the max gradient.
time_system_.sleep(std::chrono::milliseconds(101));
dispatcher_->run(Event::Dispatcher::RunType::Block);
EXPECT_GE(controller->concurrencyLimit(), 7);
EXPECT_LE(controller->concurrencyLimit() / 7.0, 2.0);

// Make it seem as if the recorded latencies are consistently higher than the measured minRTT to
// induce a minRTT recalculation after 5 iterations.
const auto elevated_latency = std::chrono::milliseconds(10);
for (int recalcs = 0; recalcs < 5; ++recalcs) {
for (int i = 1; i <= 5; ++i) {
tryForward(controller, true);
controller->recordLatencySample(elevated_latency);
}
time_system_.sleep(std::chrono::milliseconds(101));
dispatcher_->run(Event::Dispatcher::RunType::Block);
}

// Verify that the concurrency limit starts growing with newly measured minRTT.
for (int recalcs = 0; recalcs < 10; ++recalcs) {
const auto last_concurrency = controller->concurrencyLimit();
for (int i = 1; i <= 5; ++i) {
tryForward(controller, true);
controller->recordLatencySample(elevated_latency);
}
time_system_.sleep(std::chrono::milliseconds(101));
dispatcher_->run(Event::Dispatcher::RunType::Block);
EXPECT_GE(controller->concurrencyLimit(), last_concurrency);
}
}

} // namespace
} // namespace Controller
} // namespace AdaptiveConcurrency
Expand Down