Skip to content

Commit

Permalink
- On restart, make log servers read disk queue entries from the known
Browse files Browse the repository at this point in the history
committed version onwards when the known committed version
is behind LogData::persistentDataDurableVersion (and version vector
unicast is enabled). This ensures that "LogData::unknownCommittedVersions" is populated,
on log server restart, with all versions that are needed by unicast recovery.
  • Loading branch information
sbodagala committed Nov 13, 2024
1 parent 37485d2 commit 0b7fbae
Showing 1 changed file with 52 additions and 1 deletion.
53 changes: 52 additions & 1 deletion fdbserver/TLogServer.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,8 @@ static const KeyRangeRef persistTxsTagsKeys = KeyRangeRef("TxsTags/"_sr, "TxsTag
// Prefix-based key ranges in the persistent store: everything under "TagMsg/", "TagMsgRef/" and "TagPop/".
// The LogData cleanup code clears the log's slice of these ranges by prepending the range's begin key
// to the log's id key (see the persistTagPoppedKeys clear).
static const KeyRange persistTagMessagesKeys = prefixRange("TagMsg/"_sr);
static const KeyRange persistTagMessageRefsKeys = prefixRange("TagMsgRef/"_sr);
static const KeyRange persistTagPoppedKeys = prefixRange("TagPop/"_sr);
// Single key holding the disk queue location (IDiskQueue::location) of the last versionLocation entry at or
// below LogData::knownCommittedVersion. Written by updatePersistentData only when
// ENABLE_VERSION_VECTOR_TLOG_UNICAST is set, and read back by restorePersistentState, which may use it as
// the minimum recovery location.
// NOTE(review): this key is written and read as-is (no log id component), but the LogData cleanup code clears
// `logIdKey.withPrefix(persistUnicastRecoveryLocationKey)` -- presumably a different, per-log key. Confirm
// that the clear actually targets the key that gets written.
static const KeyRef persistUnicastRecoveryLocationKey = KeyRef("UnicastRecoveryLocation"_sr);
// Single key holding the logId (UID) of the LogData whose state updatePersistentData last persisted while
// ENABLE_VERSION_VECTOR_TLOG_UNICAST was set. restorePersistentState uses it to find that LogData in id_data.
// NOTE(review): same write/read vs. clear key mismatch as persistUnicastRecoveryLocationKey -- confirm.
static const KeyRef persistSpillTargetLogDataIdKey = KeyRef("SpillTargetLogDataId"_sr);

// Single key for the persisted encryption-at-rest mode -- presumably what restorePersistentState reads into
// fEncryptionAtRestMode; the read itself is outside this view, TODO confirm.
static const KeyRef persistEncryptionAtRestModeKey = "encryptionAtRestMode"_sr;

Expand Down Expand Up @@ -736,6 +738,10 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
tLogData->persistentData->clear(KeyRangeRef(msgRefKey, strinc(msgRefKey)));
Key poppedKey = logIdKey.withPrefix(persistTagPoppedKeys.begin);
tLogData->persistentData->clear(KeyRangeRef(poppedKey, strinc(poppedKey)));
if (SERVER_KNOBS->ENABLE_VERSION_VECTOR_TLOG_UNICAST) {
tLogData->persistentData->clear(singleKeyRange(logIdKey.withPrefix(persistUnicastRecoveryLocationKey)));
tLogData->persistentData->clear(singleKeyRange(logIdKey.withPrefix(persistSpillTargetLogDataIdKey)));
}
}

for (auto it = peekTracker.begin(); it != peekTracker.end(); ++it) {
Expand Down Expand Up @@ -1113,6 +1119,17 @@ ACTOR Future<Void> updatePersistentData(TLogData* self, Reference<LogData> logDa
KeyValueRef(persistRecoveryLocationKey, BinaryWriter::toValue(locationIter->value.first, Unversioned())));
}

if (SERVER_KNOBS->ENABLE_VERSION_VECTOR_TLOG_UNICAST) {
auto kcvLocationIter = logData->versionLocation.lastLessOrEqual(logData->knownCommittedVersion);
if (kcvLocationIter != logData->versionLocation.end()) {
self->persistentData->set(KeyValueRef(persistUnicastRecoveryLocationKey,
BinaryWriter::toValue(kcvLocationIter->value.first, Unversioned())));
}

self->persistentData->set(
KeyValueRef(persistSpillTargetLogDataIdKey, BinaryWriter::toValue(logData->logId, Unversioned())));
}

self->persistentData->set(
KeyValueRef(BinaryWriter::toValue(logData->logId, Unversioned()).withPrefix(persistCurrentVersionKeys.begin),
BinaryWriter::toValue(newPersistentDataVersion, Unversioned())));
Expand Down Expand Up @@ -2446,7 +2463,9 @@ ACTOR Future<Void> tLogCommit(TLogData* self,
// Notifies the commitQueue actor to commit persistentQueue, and also unblocks tLogPeekMessages actors
logData->version.set(req.version);
if (SERVER_KNOBS->ENABLE_VERSION_VECTOR_TLOG_UNICAST) {
ASSERT(req.tLogCount == req.tLogLocIds.size());
logData->unknownCommittedVersions.emplace_front(req.version, req.seqPrevVersion, req.tLogLocIds);

while (!logData->unknownCommittedVersions.empty() &&
logData->unknownCommittedVersions.back().version <= req.knownCommittedVersion) {
logData->unknownCommittedVersions.pop_back();
Expand Down Expand Up @@ -3065,6 +3084,7 @@ ACTOR Future<Void> pullAsyncData(TLogData* self,
qe.knownCommittedVersion = logData->knownCommittedVersion;
qe.messages = StringRef();
qe.id = logData->logId;
qe.prevVersion = 0;
self->persistentQueue->push(qe, logData);

self->diskQueueCommitBytes += qe.expectedSize();
Expand Down Expand Up @@ -3220,10 +3240,13 @@ ACTOR Future<Void> restorePersistentState(TLogData* self,
state Future<RangeResult> fRecoverCounts = storage->readRange(persistRecoveryCountKeys);
state Future<RangeResult> fProtocolVersions = storage->readRange(persistProtocolVersionKeys);
state Future<RangeResult> fTLogSpillTypes = storage->readRange(persistTLogSpillTypeKeys);
state Future<Optional<Value>> fUnicastRecoveryLocation = storage->readValue(persistUnicastRecoveryLocationKey);
state Future<Optional<Value>> fSpillTargetLogDataId = storage->readValue(persistSpillTargetLogDataIdKey);

// FIXME: metadata in queue?

wait(waitForAll(std::vector{ fFormat, fRecoveryLocation, fEncryptionAtRestMode }));
wait(waitForAll(std::vector{
fFormat, fRecoveryLocation, fEncryptionAtRestMode, fUnicastRecoveryLocation, fSpillTargetLogDataId }));
wait(waitForAll(std::vector{ fVers,
fKnownCommitted,
fLocality,
Expand Down Expand Up @@ -3392,6 +3415,22 @@ ACTOR Future<Void> restorePersistentState(TLogData* self,
}
}

state Optional<UID> spillTargetLogDataId;
if (SERVER_KNOBS->ENABLE_VERSION_VECTOR_TLOG_UNICAST && fUnicastRecoveryLocation.get().present() &&
fSpillTargetLogDataId.get().present()) {
spillTargetLogDataId = BinaryReader::fromStringRef<UID>(fSpillTargetLogDataId.get().get(), Unversioned());
auto iter = self->id_data.find(spillTargetLogDataId.get());
if (iter != self->id_data.end()) {
Reference<LogData> spillTargetLogData = iter->second;
if (spillTargetLogData->knownCommittedVersion < spillTargetLogData->persistentDataDurableVersion) {
minimumRecoveryLocation = BinaryReader::fromStringRef<IDiskQueue::location>(
fUnicastRecoveryLocation.get().get(), Unversioned());
}
} else {
spillTargetLogDataId.reset();
}
}

std::sort(logsByVersion.begin(), logsByVersion.end());
for (const auto& pair : logsByVersion) {
// TLogs that have been fully spilled won't have queue entries read in the loop below.
Expand Down Expand Up @@ -3432,6 +3471,17 @@ ACTOR Future<Void> restorePersistentState(TLogData* self,
// logData->version.get());

if (logData) {
if (SERVER_KNOBS->ENABLE_VERSION_VECTOR_TLOG_UNICAST && spillTargetLogDataId.present() &&
qe.id == spillTargetLogDataId.get() && qe.version < logData->persistentDataDurableVersion) {
logData->unknownCommittedVersions.emplace_front(qe.version, qe.prevVersion, qe.tLogLocIds);

while (!logData->unknownCommittedVersions.empty() &&
logData->unknownCommittedVersions.back().version <= logData->knownCommittedVersion) {
logData->unknownCommittedVersions.pop_back();
}
continue;
}

if (!self->spillOrder.size() || self->spillOrder.back() != qe.id) {
self->spillOrder.push_back(qe.id);
}
Expand Down Expand Up @@ -3724,6 +3774,7 @@ ACTOR Future<Void> tLogStart(TLogData* self, InitializeTLogRequest req, Locality
qe.knownCommittedVersion = logData->knownCommittedVersion;
qe.messages = StringRef();
qe.id = logData->logId;
qe.prevVersion = 0;
self->persistentQueue->push(qe, logData);

self->diskQueueCommitBytes += qe.expectedSize();
Expand Down

0 comments on commit 0b7fbae

Please sign in to comment.