Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix race conditions in GenericRateLimiter #10374

Closed
wants to merge 11 commits into from
Closed
1 change: 1 addition & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

### Bug Fixes
* Fix a bug where `GenericRateLimiter` could revert the bandwidth set dynamically using `SetBytesPerSecond()` when a user configures a structure enclosing it, e.g., using `GetOptionsFromString()` to configure an `Options` that references an existing `RateLimiter` object.
* Fix race conditions in `GenericRateLimiter`.

## 7.5.0 (07/15/2022)
### New Features
Expand Down
16 changes: 16 additions & 0 deletions options/options_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4958,6 +4958,22 @@ TEST_F(ConfigOptionsTest, MergeOperatorFromString) {
ASSERT_EQ(*delimiter, "&&");
}

TEST_F(ConfigOptionsTest, ConfiguringOptionsDoesNotRevertRateLimiterBandwidth) {
  // Regression coverage: configuring an options structure that already has a
  // `rate_limiter` must not silently undo a bandwidth that was set on that
  // limiter dynamically.
  constexpr int kInitialBytesPerSec = 1 << 20;
  constexpr int kUpdatedBytesPerSec = 2 << 20;

  Options source_options;
  source_options.rate_limiter.reset(
      NewGenericRateLimiter(kInitialBytesPerSec /* rate_bytes_per_sec */));
  Options configured_options(source_options);

  // Change the rate after construction and confirm the new value is visible.
  source_options.rate_limiter->SetBytesPerSecond(kUpdatedBytesPerSec);
  ASSERT_EQ(kUpdatedBytesPerSec,
            source_options.rate_limiter->GetBytesPerSecond());

  // Configuring from an empty options string must leave the updated rate
  // intact.
  ASSERT_OK(GetOptionsFromString(source_options, "", &configured_options));
  ASSERT_EQ(kUpdatedBytesPerSec,
            source_options.rate_limiter->GetBytesPerSecond());
}

INSTANTIATE_TEST_CASE_P(OptionsSanityCheckTest, OptionsSanityCheckTest,
::testing::Bool());
#endif // !ROCKSDB_LITE
Expand Down
48 changes: 27 additions & 21 deletions util/rate_limiter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,20 +54,20 @@ GenericRateLimiter::GenericRateLimiter(
rate_bytes_per_sec_(auto_tuned ? rate_bytes_per_sec / 2
: rate_bytes_per_sec),
refill_bytes_per_period_(
CalculateRefillBytesPerPeriod(rate_bytes_per_sec_)),
CalculateRefillBytesPerPeriodLocked(rate_bytes_per_sec_)),
clock_(clock),
stop_(false),
exit_cv_(&request_mutex_),
requests_to_wait_(0),
available_bytes_(0),
next_refill_us_(NowMicrosMonotonic()),
next_refill_us_(NowMicrosMonotonicLocked()),
fairness_(fairness > 100 ? 100 : fairness),
rnd_((uint32_t)time(nullptr)),
wait_until_refill_pending_(false),
auto_tuned_(auto_tuned),
num_drains_(0),
max_bytes_per_sec_(rate_bytes_per_sec),
tuned_time_(NowMicrosMonotonic()) {
tuned_time_(NowMicrosMonotonicLocked()) {
for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) {
total_requests_[i] = 0;
total_bytes_through_[i] = 0;
Expand Down Expand Up @@ -97,10 +97,15 @@ GenericRateLimiter::~GenericRateLimiter() {

// This API allows users to dynamically change the rate limiter's bytes per
// second. It constructs a `MutexLock` on `request_mutex_` and then delegates
// the actual update to `SetBytesPerSecondLocked()`.
void GenericRateLimiter::SetBytesPerSecond(int64_t bytes_per_second) {
MutexLock g(&request_mutex_);
SetBytesPerSecondLocked(bytes_per_second);
}

void GenericRateLimiter::SetBytesPerSecondLocked(int64_t bytes_per_second) {
assert(bytes_per_second > 0);
rate_bytes_per_sec_ = bytes_per_second;
rate_bytes_per_sec_.store(bytes_per_second, std::memory_order_relaxed);
refill_bytes_per_period_.store(
CalculateRefillBytesPerPeriod(bytes_per_second),
CalculateRefillBytesPerPeriodLocked(bytes_per_second),
std::memory_order_relaxed);
}

Expand All @@ -115,10 +120,10 @@ void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri,

if (auto_tuned_) {
static const int kRefillsPerTune = 100;
std::chrono::microseconds now(NowMicrosMonotonic());
std::chrono::microseconds now(NowMicrosMonotonicLocked());
if (now - tuned_time_ >=
kRefillsPerTune * std::chrono::microseconds(refill_period_us_)) {
Status s = Tune();
Status s = TuneLocked();
s.PermitUncheckedError(); //**TODO: What to do on error?
}
}
Expand Down Expand Up @@ -152,7 +157,7 @@ void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri,
// (1) Waiting for the next refill time.
// (2) Refilling the bytes and granting requests.
do {
int64_t time_until_refill_us = next_refill_us_ - NowMicrosMonotonic();
int64_t time_until_refill_us = next_refill_us_ - NowMicrosMonotonicLocked();
if (time_until_refill_us > 0) {
if (wait_until_refill_pending_) {
// Somebody is performing (1). Trust we'll be woken up when our request
Expand All @@ -173,7 +178,7 @@ void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri,
} else {
// Whichever thread reaches here first performs duty (2) as described
// above.
RefillBytesAndGrantRequests();
RefillBytesAndGrantRequestsLocked();
if (r.granted) {
// If there are any remaining requests, make sure that at least
// one candidate is awake for future duties by signaling a front request
Expand Down Expand Up @@ -215,20 +220,20 @@ void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri,
}

std::vector<Env::IOPriority>
GenericRateLimiter::GeneratePriorityIterationOrder() {
GenericRateLimiter::GeneratePriorityIterationOrderLocked() {
std::vector<Env::IOPriority> pri_iteration_order(Env::IO_TOTAL /* 4 */);
// We make Env::IO_USER a superior priority by always iterating its queue
// first
pri_iteration_order[0] = Env::IO_USER;

bool high_pri_iterated_after_mid_low_pri = rnd_.OneIn(fairness_);
TEST_SYNC_POINT_CALLBACK(
"GenericRateLimiter::GeneratePriorityIterationOrder::"
"GenericRateLimiter::GeneratePriorityIterationOrderLocked::"
"PostRandomOneInFairnessForHighPri",
&high_pri_iterated_after_mid_low_pri);
bool mid_pri_itereated_after_low_pri = rnd_.OneIn(fairness_);
TEST_SYNC_POINT_CALLBACK(
"GenericRateLimiter::GeneratePriorityIterationOrder::"
"GenericRateLimiter::GeneratePriorityIterationOrderLocked::"
"PostRandomOneInFairnessForMidPri",
&mid_pri_itereated_after_low_pri);

Expand All @@ -247,15 +252,16 @@ GenericRateLimiter::GeneratePriorityIterationOrder() {
}

TEST_SYNC_POINT_CALLBACK(
"GenericRateLimiter::GeneratePriorityIterationOrder::"
"GenericRateLimiter::GeneratePriorityIterationOrderLocked::"
"PreReturnPriIterationOrder",
&pri_iteration_order);
return pri_iteration_order;
}

void GenericRateLimiter::RefillBytesAndGrantRequests() {
TEST_SYNC_POINT("GenericRateLimiter::RefillBytesAndGrantRequests");
next_refill_us_ = NowMicrosMonotonic() + refill_period_us_;
void GenericRateLimiter::RefillBytesAndGrantRequestsLocked() {
TEST_SYNC_POINT_CALLBACK(
"GenericRateLimiter::RefillBytesAndGrantRequestsLocked", &request_mutex_);
next_refill_us_ = NowMicrosMonotonicLocked() + refill_period_us_;
// Carry over the left over quota from the last period
auto refill_bytes_per_period =
refill_bytes_per_period_.load(std::memory_order_relaxed);
Expand All @@ -264,7 +270,7 @@ void GenericRateLimiter::RefillBytesAndGrantRequests() {
}

std::vector<Env::IOPriority> pri_iteration_order =
GeneratePriorityIterationOrder();
GeneratePriorityIterationOrderLocked();

for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) {
assert(!pri_iteration_order.empty());
Expand Down Expand Up @@ -293,7 +299,7 @@ void GenericRateLimiter::RefillBytesAndGrantRequests() {
}
}

int64_t GenericRateLimiter::CalculateRefillBytesPerPeriod(
int64_t GenericRateLimiter::CalculateRefillBytesPerPeriodLocked(
int64_t rate_bytes_per_sec) {
if (std::numeric_limits<int64_t>::max() / rate_bytes_per_sec <
refill_period_us_) {
Expand All @@ -305,7 +311,7 @@ int64_t GenericRateLimiter::CalculateRefillBytesPerPeriod(
}
}

Status GenericRateLimiter::Tune() {
Status GenericRateLimiter::TuneLocked() {
const int kLowWatermarkPct = 50;
const int kHighWatermarkPct = 90;
const int kAdjustFactorPct = 5;
Expand All @@ -314,7 +320,7 @@ Status GenericRateLimiter::Tune() {
const int kAllowedRangeFactor = 20;

std::chrono::microseconds prev_tuned_time = tuned_time_;
tuned_time_ = std::chrono::microseconds(NowMicrosMonotonic());
tuned_time_ = std::chrono::microseconds(NowMicrosMonotonicLocked());

int64_t elapsed_intervals = (tuned_time_ - prev_tuned_time +
std::chrono::microseconds(refill_period_us_) -
Expand Down Expand Up @@ -349,7 +355,7 @@ Status GenericRateLimiter::Tune() {
new_bytes_per_sec = prev_bytes_per_sec;
}
if (new_bytes_per_sec != prev_bytes_per_sec) {
SetBytesPerSecond(new_bytes_per_sec);
SetBytesPerSecondLocked(new_bytes_per_sec);
}
num_drains_ = 0;
return Status::OK();
Expand Down
22 changes: 12 additions & 10 deletions util/rate_limiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,30 +92,32 @@ class GenericRateLimiter : public RateLimiter {
}

virtual int64_t GetBytesPerSecond() const override {
return rate_bytes_per_sec_;
return rate_bytes_per_sec_.load(std::memory_order_relaxed);
}

virtual void TEST_SetClock(std::shared_ptr<SystemClock> clock) {
MutexLock g(&request_mutex_);
clock_ = std::move(clock);
next_refill_us_ = NowMicrosMonotonic();
next_refill_us_ = NowMicrosMonotonicLocked();
}

private:
void RefillBytesAndGrantRequests();
std::vector<Env::IOPriority> GeneratePriorityIterationOrder();
int64_t CalculateRefillBytesPerPeriod(int64_t rate_bytes_per_sec);
Status Tune();

uint64_t NowMicrosMonotonic() { return clock_->NowNanos() / std::milli::den; }
void RefillBytesAndGrantRequestsLocked();
std::vector<Env::IOPriority> GeneratePriorityIterationOrderLocked();
int64_t CalculateRefillBytesPerPeriodLocked(int64_t rate_bytes_per_sec);
Status TuneLocked();
void SetBytesPerSecondLocked(int64_t bytes_per_second);

// Returns the clock's `NowNanos()` reading divided by `std::milli::den`
// (1000), i.e. a nanosecond timestamp scaled down to microseconds.
// NOTE(review): the `Locked` suffix suggests callers are expected to hold
// `request_mutex_` when calling this -- confirm.
uint64_t NowMicrosMonotonicLocked() {
return clock_->NowNanos() / std::milli::den;
}

// This mutex guards all internal state
mutable port::Mutex request_mutex_;

const int64_t refill_period_us_;

int64_t rate_bytes_per_sec_;
// This variable can be changed dynamically.
std::atomic<int64_t> rate_bytes_per_sec_;
std::atomic<int64_t> refill_bytes_per_period_;
std::shared_ptr<SystemClock> clock_;

Expand Down
17 changes: 10 additions & 7 deletions util/rate_limiter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ TEST_F(RateLimiterTest, GeneratePriorityIterationOrder) {
bool mid_pri_itereated_after_low_pri_set = false;
bool pri_iteration_order_verified = false;
SyncPoint::GetInstance()->SetCallBack(
"GenericRateLimiter::GeneratePriorityIterationOrder::"
"GenericRateLimiter::GeneratePriorityIterationOrderLocked::"
"PostRandomOneInFairnessForHighPri",
[&](void* arg) {
bool* high_pri_iterated_after_mid_low_pri = (bool*)arg;
Expand All @@ -212,7 +212,7 @@ TEST_F(RateLimiterTest, GeneratePriorityIterationOrder) {
});

SyncPoint::GetInstance()->SetCallBack(
"GenericRateLimiter::GeneratePriorityIterationOrder::"
"GenericRateLimiter::GeneratePriorityIterationOrderLocked::"
"PostRandomOneInFairnessForMidPri",
[&](void* arg) {
bool* mid_pri_itereated_after_low_pri = (bool*)arg;
Expand All @@ -222,7 +222,7 @@ TEST_F(RateLimiterTest, GeneratePriorityIterationOrder) {
});

SyncPoint::GetInstance()->SetCallBack(
"GenericRateLimiter::GeneratePriorityIterationOrder::"
"GenericRateLimiter::GeneratePriorityIterationOrderLocked::"
"PreReturnPriIterationOrder",
[&](void* arg) {
std::vector<Env::IOPriority>* pri_iteration_order =
Expand All @@ -249,13 +249,13 @@ TEST_F(RateLimiterTest, GeneratePriorityIterationOrder) {
ASSERT_EQ(pri_iteration_order_verified, true);
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearCallBack(
"GenericRateLimiter::GeneratePriorityIterationOrder::"
"GenericRateLimiter::GeneratePriorityIterationOrderLocked::"
"PreReturnPriIterationOrder");
SyncPoint::GetInstance()->ClearCallBack(
"GenericRateLimiter::GeneratePriorityIterationOrder::"
"GenericRateLimiter::GeneratePriorityIterationOrderLocked::"
"PostRandomOneInFairnessForMidPri");
SyncPoint::GetInstance()->ClearCallBack(
"GenericRateLimiter::GeneratePriorityIterationOrder::"
"GenericRateLimiter::GeneratePriorityIterationOrderLocked::"
"PostRandomOneInFairnessForHighPri");
}
}
Expand Down Expand Up @@ -387,11 +387,14 @@ TEST_F(RateLimiterTest, LimitChangeTest) {
std::make_shared<GenericRateLimiter>(
target, refill_period, 10, RateLimiter::Mode::kWritesOnly,
SystemClock::Default(), false /* auto_tuned */);
// After "GenericRateLimiter::Request:1" the mutex is held until the bytes
// are refilled. This test could be improved to change the limit when the
// lock is released in `TimedWait()`.
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
{{"GenericRateLimiter::Request",
"RateLimiterTest::LimitChangeTest:changeLimitStart"},
{"RateLimiterTest::LimitChangeTest:changeLimitEnd",
"GenericRateLimiter::RefillBytesAndGrantRequests"}});
"GenericRateLimiter::Request:1"}});
Arg arg(target, Env::IO_HIGH, limiter);
// The idea is to start a request first, then, before it refills,
// update limit to a different value (2X/0.5X). No starvation should
Expand Down