Skip to content

Commit

Permalink
[#3331] Recover after write stop
Browse files Browse the repository at this point in the history
Summary:
The number of SST files at the local peer is updated after something is appended to the local Raft log.
But there is a possible scenario in which nothing is appended to the log after the number of SST files was updated,
for instance because writes are rejected due to a large number of SST files.

So we get into a deadlock state, where we reject writes because the number of SST files is too big.
But the number of SST files is not updated, because nothing is written.

This diff resolves the issue by adding an alternative channel for updating the number of SST files.

Test Plan: ybd --gtest_filter QLStressTest.WriteStop

Reviewers: timur, bogdan

Reviewed By: timur, bogdan

Subscribers: ybase

Differential Revision: https://phabricator.dev.yugabyte.com/D7804
  • Loading branch information
spolitov committed Jan 26, 2020
1 parent 7372863 commit 38ef51a
Show file tree
Hide file tree
Showing 16 changed files with 320 additions and 234 deletions.
51 changes: 41 additions & 10 deletions src/yb/client/ql-stress-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,8 @@ class QLStressTest : public QLDmlTestBase {
const std::chrono::nanoseconds& sleep_duration = std::chrono::nanoseconds(),
bool allow_failures = false);

void TestWriteRejection();

TableHandle table_;

int checkpoint_index_ = 0;
Expand Down Expand Up @@ -721,12 +723,13 @@ TEST_F_EX(QLStressTest, SlowUpdateConsensus, QLStressTestSingleTablet) {
ASSERT_LE(max_peak_consumption, 150_MB);
}

template <int kSoftLimit, int kHardLimit>
class QLStressTestDelayWrite : public QLStressTestSingleTablet {
public:
void SetUp() override {
FLAGS_db_write_buffer_size = 1_KB;
FLAGS_sst_files_soft_limit = 4;
FLAGS_sst_files_hard_limit = 10;
FLAGS_sst_files_soft_limit = kSoftLimit;
FLAGS_sst_files_hard_limit = kHardLimit;
FLAGS_rocksdb_level0_file_num_compaction_trigger = 6;
FLAGS_rocksdb_universal_compaction_min_merge_width = 2;
FLAGS_rocksdb_universal_compaction_size_ratio = 1000;
Expand Down Expand Up @@ -765,7 +768,7 @@ void QLStressTest::AddWriter(
});
}

TEST_F_EX(QLStressTest, DelayWrite, QLStressTestDelayWrite) {
void QLStressTest::TestWriteRejection() {
constexpr int kWriters = 10;
constexpr int kKeyBase = 10000;

Expand Down Expand Up @@ -793,16 +796,31 @@ TEST_F_EX(QLStressTest, DelayWrite, QLStressTestDelayWrite) {
}
});

int last_keys_written = 0;
int first_keys_written_after_rejections_started_to_appear = -1;
auto last_keys_written_update_time = CoarseMonoClock::now();
uint64_t last_rejections = 0;
bool has_writes_after_rejections = false;
for (;;) {
std::this_thread::sleep_for(1s);
int keys_written = 0;
for (int i = 0; i != kWriters; ++i) {
keys_written += keys[i].load() - kKeyBase * i;
}
LOG(INFO) << "Total keys written: " << keys_written;
if (keys_written < RegularBuildVsSanitizers(1000, 100)) {
if (keys_written == last_keys_written) {
ASSERT_LE(CoarseMonoClock::now() - last_keys_written_update_time, 20s);
continue;
}
if (last_rejections != 0) {
if (first_keys_written_after_rejections_started_to_appear < 0) {
first_keys_written_after_rejections_started_to_appear = keys_written;
} else if (keys_written > first_keys_written_after_rejections_started_to_appear) {
has_writes_after_rejections = true;
}
}
last_keys_written = keys_written;
last_keys_written_update_time = CoarseMonoClock::now();

uint64_t total_rejections = 0;
for (int i = 0; i < cluster_->num_tablet_servers(); ++i) {
Expand All @@ -814,15 +832,15 @@ TEST_F_EX(QLStressTest, DelayWrite, QLStressTestDelayWrite) {
rejections += counter->value();
}
total_rejections += rejections;

LOG(INFO) << "Rejections: " << rejections;
}
LOG(INFO) << "Total rejections: " << total_rejections;
last_rejections = total_rejections;

if (!IsSanitizer() && total_rejections < 10) {
continue;
if (keys_written >= RegularBuildVsSanitizers(1000, 100) &&
(IsSanitizer() || total_rejections >= 10) &&
has_writes_after_rejections) {
break;
}

break;
}

ASSERT_OK(WaitFor([cluster = cluster_.get()] {
Expand All @@ -843,6 +861,19 @@ TEST_F_EX(QLStressTest, DelayWrite, QLStressTestDelayWrite) {
}, 30s, "Waiting tablets to sync up"));
}

// Fixture with a soft limit of 4 and a hard limit of 10 on the number of SST files.
using QLStressTestDelayWrite_4_10 = QLStressTestDelayWrite<4, 10>;

// Runs the shared write rejection scenario with distinct soft and hard limits.
TEST_F_EX(QLStressTest, DelayWrite, QLStressTestDelayWrite_4_10) {
  TestWriteRejection();
}

// The soft limit equals the hard limit (both are 6), to test a write stop and the recovery after it.
using QLStressTestDelayWrite_6_6 = QLStressTestDelayWrite<6, 6>;

// Runs the shared write rejection scenario with identical soft and hard limits.
TEST_F_EX(QLStressTest, WriteStop, QLStressTestDelayWrite_6_6) {
  TestWriteRejection();
}

class QLStressTestLongRemoteBootstrap : public QLStressTestSingleTablet {
public:
void SetUp() override {
Expand Down
17 changes: 13 additions & 4 deletions src/yb/consensus/consensus-test-util.h
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,14 @@ class LocalTestPeerProxy : public TestPeerProxy {
tserver::TabletServerErrorPB* error = response->mutable_error();
error->set_code(tserver::TabletServerErrorPB::UNKNOWN_ERROR);
StatusToPB(status, error->mutable_status());
ClearStatus(response);
}

// Overload for vote responses: intentionally does nothing.
void ClearStatus(VoteResponsePB* response) {
}

// Overload for consensus responses: clears the status field of the response.
void ClearStatus(ConsensusResponsePB* response) {
response->clear_status();
}

template<class Request, class Response>
Expand Down Expand Up @@ -917,10 +925,11 @@ class TestRaftConsensusQueueIface : public PeerMessageQueueObserver {
majority_replicated_op_id_ = data.op_id;
committed_index->CopyFrom(data.op_id);
}
virtual void NotifyTermChange(int64_t term) override {}
virtual void NotifyFailedFollower(const std::string& uuid,
int64_t term,
const std::string& reason) override {}
// No-op: term changes are ignored by this observer.
void NotifyTermChange(int64_t term) override {}
// No-op: failed follower notifications are ignored by this observer.
void NotifyFailedFollower(const std::string& uuid,
int64_t term,
const std::string& reason) override {}
// No-op: changes of the majority replicated number of SST files are ignored by this observer.
void MajorityReplicatedNumSSTFilesChanged(uint64_t) override {}

private:
mutable simple_spinlock lock_;
Expand Down
5 changes: 5 additions & 0 deletions src/yb/consensus/consensus_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,13 @@ class ConsensusContext {
// time value for a read/write operation in case of RF==1 mode.
virtual void ChangeConfigReplicated(const RaftConfigPB& config) = 0;

// See DB::GetCurrentVersionNumSSTFiles
virtual uint64_t NumSSTFiles() = 0;

// Registers a listener that is invoked when the number of SST files changes.
// The listener can be set only once and then reset.
virtual void ListenNumSSTFilesChanged(std::function<void()> listener) = 0;

virtual ~ConsensusContext() = default;
};

Expand Down
7 changes: 5 additions & 2 deletions src/yb/consensus/consensus_peers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,10 @@ void Peer::ProcessResponse() {
return;
}

// Response should contain exactly one of error or status.
LOG_IF(DFATAL, response_.has_error() == response_.has_status())
<< "Invalid response: " << response_.ShortDebugString();

// Pass through errors we can respond to, like not found, since in that case
// we will need to remotely bootstrap. TODO: Handle DELETED response once implemented.
if ((response_.has_error() &&
Expand All @@ -372,8 +376,7 @@ void Peer::ProcessResponse() {
}

failed_attempts_ = 0;
bool more_pending = false;
queue_->ResponseFromPeer(peer_pb_.permanent_uuid(), response_, &more_pending);
bool more_pending = queue_->ResponseFromPeer(peer_pb_.permanent_uuid(), response_);

if (more_pending) {
processing_lock.unlock();
Expand Down
Loading

0 comments on commit 38ef51a

Please sign in to comment.