Skip to content

Commit

Permalink
cherrypick : Ensure writes to WAL tail during FlushWAL(true /* sync *…
Browse files Browse the repository at this point in the history
…/) will be synced (tikv#8)

Co-authored-by: Andrew Kryczka <[email protected]>
  • Loading branch information
2 people authored and GitHub Enterprise committed Mar 26, 2024
1 parent 9fc157a commit 6bf86a8
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 12 deletions.
3 changes: 3 additions & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Rocksdb Change Log
## Unreleased
### Bug Fixes
* Fixed bug where `FlushWAL(true /* sync */)` (used by `GetLiveFilesStorageInfo()`, which is used by checkpoint and backup) could cause parallel writes at the tail of a WAL file to never be synced.

### Bug Fixes
* Fix a race condition in WAL size tracking which is caused by an unsafe iterator access after container is changed.
* Fix unprotected concurrent accesses to `WritableFileWriter::filesize_` by `DB::SyncWAL()` and `DB::Put()` in two write queue mode.
Expand Down
26 changes: 14 additions & 12 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1471,27 +1471,29 @@ void DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir,
for (auto it = logs_.begin(); it != logs_.end() && it->number <= up_to;) {
auto& wal = *it;
assert(wal.IsSyncing());
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Synced log %" PRIu64 " from logs_\n", wal.number);
if (logs_.size() > 1) {

if (wal.number < logs_.back().number) {
// Inactive WAL
if (immutable_db_options_.track_and_verify_wals_in_manifest &&
wal.GetPreSyncSize() > 0) {
synced_wals->AddWal(wal.number, WalMetadata(wal.GetPreSyncSize()));
}
auto writer = wal.ReleaseWriter();
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"deleting log %" PRIu64
" from logs_. Last Seq number of the WAL is %" PRIu64 "\n",
wal.number, writer->GetLastSequence());
logs_to_free_.push_back(writer);
it = logs_.erase(it);
if (wal.GetPreSyncSize() == wal.writer->file()->GetFlushedSize()) {
// Fully synced
logs_to_free_.push_back(wal.ReleaseWriter());
it = logs_.erase(it);
} else {
assert(wal.GetPreSyncSize() < wal.writer->file()->GetFlushedSize());
wal.FinishSync();
++it;
}
} else {
assert(wal.number == logs_.back().number);
// Active WAL
wal.FinishSync();
++it;
}
}
assert(logs_.empty() || logs_[0].number > up_to ||
(logs_.size() == 1 && !logs_[0].IsSyncing()));
log_sync_cv_.SignalAll();
}

Expand Down
45 changes: 45 additions & 0 deletions db/db_write_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,51 @@ TEST_P(DBWriteTest, UnflushedPutRaceWithTrackedWalSync) {
Close();
}

TEST_P(DBWriteTest, InactiveWalFullySyncedBeforeUntracked) {
// Repro bug where a WAL is appended and switched after
// `FlushWAL(true /* sync */)`'s sync finishes and before it untracks fully
// synced inactive logs. Previously such a WAL would be wrongly untracked
// so the final append would never be synced.
Options options = GetOptions();
std::unique_ptr<FaultInjectionTestEnv> fault_env(
new FaultInjectionTestEnv(env_));
options.env = fault_env.get();
Reopen(options);

ASSERT_OK(Put("key1", "val1"));

SyncPoint::GetInstance()->SetCallBack(
"DBImpl::SyncWAL:BeforeMarkLogsSynced:1", [this](void* /* arg */) {
ASSERT_OK(Put("key2", "val2"));
ASSERT_OK(dbfull()->TEST_SwitchMemtable());
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

ASSERT_OK(db_->FlushWAL(true /* sync */));

ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();

ASSERT_OK(Put("key3", "val3"));

ASSERT_OK(db_->FlushWAL(true /* sync */));

Close();

// Simulate full loss of unsynced data. This should drop nothing since we did
// `FlushWAL(true /* sync */)` before `Close()`.
fault_env->DropUnsyncedFileData();

Reopen(options);

ASSERT_EQ("val1", Get("key1"));
ASSERT_EQ("val2", Get("key2"));
ASSERT_EQ("val3", Get("key3"));

// Need to close before `fault_env` goes out of scope.
Close();
}

TEST_P(DBWriteTest, IOErrorOnWALWriteTriggersReadOnlyMode) {
std::unique_ptr<FaultInjectionTestEnv> mock_env(
new FaultInjectionTestEnv(env_));
Expand Down

0 comments on commit 6bf86a8

Please sign in to comment.