Blob DB: Fix race condition between flush and write
Summary:
A race condition can occur when:
* A user thread writes a value and, while holding blob_db_impl.write_mutex_, hits the write stop condition because there are too many unflushed memtables.
* A flush is triggered; its flush begin listener tries to acquire blob_db_impl.write_mutex_.

The writer stalls while holding the mutex, and the flush that would clear the stall cannot proceed because its listener blocks on the same mutex. Fix it by releasing write_mutex_ before the base DB write, so the listener can acquire the mutex while the write is stalled. A distilled sketch of the problem follows, and a sketch of the narrowed lock scope appears after the diff.
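To make the interaction concrete, here is a minimal sketch of the pre-fix pattern. All names (write_mutex, BaseDbWriteThatMayStall, OnFlushBegin, and so on) are hypothetical stand-ins for the corresponding BlobDBImpl pieces, not RocksDB APIs.

// Hypothetical stand-ins only; not RocksDB code.
#include <mutex>

std::mutex write_mutex;            // models BlobDBImpl::write_mutex_

void BaseDbWriteThatMayStall() {}  // models db_->Write() hitting the write stop
void SyncBlobFiles() {}            // models the flush begin listener's work

// Pre-fix write path: the base DB write runs while write_mutex is held.
void BlobDbWriteBeforeFix() {
  std::lock_guard<std::mutex> lock(write_mutex);
  // ... build blob index entries ...
  BaseDbWriteThatMayStall();  // can block until a flush finishes
}

// Flush begin listener: needs the same mutex before the flush can proceed.
void OnFlushBegin() {
  std::lock_guard<std::mutex> lock(write_mutex);  // blocks behind the writer
  SyncBlobFiles();
}

int main() {
  // If BlobDbWriteBeforeFix() stalls inside the lock, OnFlushBegin() cannot
  // acquire write_mutex, the flush never starts, and the stall never clears.
  BlobDbWriteBeforeFix();
  OnFlushBegin();
  return 0;
}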
Closes #3149

Differential Revision: D6279805

Pulled By: yiwu-arbug

fbshipit-source-id: 0e3c58afb78795ebe3360a2c69e05651e3908c40
Yi Wu committed Nov 20, 2017
1 parent 3e29986 commit e58d377
1 changed file: utilities/blob_db/blob_db_impl.cc (23 additions, 8 deletions)
@@ -745,21 +745,29 @@ class BlobDBImpl::BlobInserter : public WriteBatch::Handler {
};

Status BlobDBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
MutexLock l(&write_mutex_);

uint32_t default_cf_id =
reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily())->GetID();
// TODO(yiwu): In case there are multiple writers the latest sequence would
// not be the actually sequence we are writting. Need to get the sequence
// from write batch after DB write instead.
SequenceNumber current_seq = GetLatestSequenceNumber() + 1;
Status s;
BlobInserter blob_inserter(options, this, default_cf_id, current_seq);
Status s = updates->Iterate(&blob_inserter);
{
// Release write_mutex_ before DB write to avoid race condition with
// flush begin listener, which also require write_mutex_ to sync
// blob files.
MutexLock l(&write_mutex_);
s = updates->Iterate(&blob_inserter);
}
if (!s.ok()) {
return s;
}
s = db_->Write(options, blob_inserter.batch());
if (!s.ok()) {
return s;
}
assert(blob_inserter.sequence() == GetLatestSequenceNumber() + 1);

// add deleted key to list of keys that have been deleted for book-keeping
class DeleteBookkeeper : public WriteBatch::Handler {
@@ -849,10 +857,19 @@ Status BlobDBImpl::PutWithTTL(const WriteOptions& options,
Status BlobDBImpl::PutUntil(const WriteOptions& options, const Slice& key,
const Slice& value, uint64_t expiration) {
TEST_SYNC_POINT("BlobDBImpl::PutUntil:Start");
MutexLock l(&write_mutex_);
SequenceNumber sequence = GetLatestSequenceNumber() + 1;
Status s;
WriteBatch batch;
Status s = PutBlobValue(options, key, value, expiration, sequence, &batch);
{
// Release write_mutex_ before DB write to avoid race condition with
// flush begin listener, which also require write_mutex_ to sync
// blob files.
MutexLock l(&write_mutex_);
// TODO(yiwu): In case there are multiple writers the latest sequence would
// not be the actually sequence we are writting. Need to get the sequence
// from write batch after DB write instead.
SequenceNumber sequence = GetLatestSequenceNumber() + 1;
s = PutBlobValue(options, key, value, expiration, sequence, &batch);
}
if (s.ok()) {
s = db_->Write(options, &batch);
}
@@ -1198,8 +1215,6 @@ Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry,
return Status::Corruption("Corruption. Blob CRC mismatch");
}

// TODO(yiwu): Should use compression flag in the blob file instead of
// current compression option.
if (bfile->compression() != kNoCompression) {
BlockContents contents;
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily());
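Distilled from the diff above, the fix narrows the critical section so write_mutex_ guards only the in-memory blob bookkeeping and is released before the possibly stalling base DB write. The sketch below reuses the hypothetical stand-in names from the summary; it is illustrative, not RocksDB code.

// Hypothetical stand-ins only; not RocksDB code.
#include <mutex>

std::mutex write_mutex;            // models BlobDBImpl::write_mutex_

void BuildBlobBatch() {}           // models BlobInserter / PutBlobValue work
void BaseDbWriteThatMayStall() {}  // models db_->Write()

// Post-fix write path, mirroring the scoped MutexLock block in the diff.
void BlobDbWriteAfterFix() {
  {
    std::lock_guard<std::mutex> lock(write_mutex);
    BuildBlobBatch();              // only the blob bookkeeping is protected
  }                                // write_mutex is released here
  BaseDbWriteThatMayStall();       // the flush begin listener can now acquire it
}

int main() {
  BlobDbWriteAfterFix();
  return 0;
}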
