Skip to content

Commit

Permalink
Fix bug of writing totalBytes as int64 but read it as int32
Browse files Browse the repository at this point in the history
Versions can now be restored, but the test still fails because the cycle test
fails even before the restore happens; it is not yet clear how.
  • Loading branch information
Hao committed Nov 21, 2024
1 parent 34a46b8 commit 65340fa
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 37 deletions.
45 changes: 17 additions & 28 deletions fdbclient/BackupAgentBase.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -324,9 +324,10 @@ ACTOR static Future<Void> decodeBackupLogValue(Arena* arena,
}

// hfu5: this is the format for Param2
state uint32_t totalBytes = 0;
memcpy(&totalBytes, value.begin() + offset, sizeof(uint32_t));
offset += sizeof(uint32_t);
// totalBytes is written as a 64-bit value in generateOldFormatMutations, so read it as uint64_t here
state uint64_t totalBytes = 0;
memcpy(&totalBytes, value.begin() + offset, sizeof(uint64_t));
offset += sizeof(uint64_t);
state uint32_t consumed = 0;

if (totalBytes + offset > value.size())
Expand All @@ -337,21 +338,24 @@ ACTOR static Future<Void> decodeBackupLogValue(Arena* arena,
state KeyRangeRef tenantMapRange = TenantMetadata::tenantMap().subspace;

while (consumed < totalBytes) {
// fmt::print(stderr, "DecodeProcess11111, offset={}\n", offset);
uint32_t type = 0;
// hfu5: format should be type|kLen|vLen|Key|Value
memcpy(&type, value.begin() + offset, sizeof(uint32_t));
offset += sizeof(uint32_t);

state uint32_t len1 = 0;
memcpy(&len1, value.begin() + offset, sizeof(uint32_t));
offset += sizeof(uint32_t);
state uint32_t len2 = 0;
memcpy(&len2, value.begin() + offset, sizeof(uint32_t));
offset += sizeof(uint32_t);

fmt::print(stderr, "DecodeProcess, offset={}, len1={}, len2={}, size={}, type={}, valid={}\n",
offset, len1, len2, value.size(), type, isValidMutationType(type));
// fmt::print(stderr, "DecodeProcess, offset={}, len1={}, len2={}, size={}, type={}, valid={}\n",
// offset, len1, len2, value.size(), type, isValidMutationType(type));
ASSERT(offset + len1 + len2 <= value.size() && isValidMutationType(type));

// mutationref is constructed here
state MutationRef logValue;
state Arena tempArena;
logValue.type = type;
Expand Down Expand Up @@ -789,24 +793,17 @@ ACTOR Future<int> kvMutationLogToTransactions(Database cx,
state int totalBytes = 0;
// Two layers of loops: the outer loop is for each file range,
// the inner loop is for each transaction (version).
fmt::print(stderr, "BackupAgentBase-kvMutationLogToTransactions-beforeLoop\n");
TraceEvent("BackupAgentBaseKvMutationLogToTransactionsBeforeLoop")
.log();
// fmt::print(stderr, "BackupAgentBase-kvMutationLogToTransactions-beforeLoop\n");
loop {
state CommitTransactionRequest req;
state Version newBeginVersion = invalidVersion;
state int mutationSize = 0;
state bool tenantMapChanging = false;
loop {
try {
fmt::print(stderr, "BackupAgentBase-RCGroup-Before\n");
TraceEvent("BackupAgentBaseRCGroupBefore")
.log();
// fmt::print(stderr, "BackupAgentBase-RCGroup-Before\n");
state RCGroup group = waitNext(results.getFuture());
fmt::print(stderr, "BackupAgentBase-RCGroup-After group={}\n", group.groupKey);
TraceEvent("BackupAgentBaseRCGroupAfter")
.detail("Version", group.groupKey)
.log();
// fmt::print(stderr, "BackupAgentBase-RCGroup-After group={}\n", group.groupKey);
state CommitTransactionRequest curReq;
lock->release(group.items.expectedSize());
state int curBatchMutationSize = 0;
Expand Down Expand Up @@ -870,11 +867,7 @@ ACTOR Future<int> kvMutationLogToTransactions(Database cx,
mutationSize += curBatchMutationSize;
newBeginVersion = group.groupKey + 1;

fmt::print(stderr, "BackupAgentBase-kvMutationLogToTransactions: newBeginVersion={}, groupKey={}\n", newBeginVersion, group.groupKey);
TraceEvent("BackupAgentBaseKvMutationLogToTransactions")
.detail("NewBeginVersion", newBeginVersion)
.detail("GroupKey", group.groupKey)
.log();
// fmt::print(stderr, "BackupAgentBase-kvMutationLogToTransactions: newBeginVersion={}, groupKey={}\n", newBeginVersion, group.groupKey);

// At this point if the tenant map changed we would have already sent any normalKey mutations
// accumulated thus far, so all that's left to do is to send all the mutations in the offending
Expand All @@ -884,7 +877,7 @@ ACTOR Future<int> kvMutationLogToTransactions(Database cx,
break;
}
} catch (Error& e) {
fmt::print(stderr, "BackupAgentBaseError error={}", e.code());
fmt::print(stderr, "BackupAgentBaseError error={}\n", e.code());
if (e.code() == error_code_end_of_stream) {
if (endVersion.present() && endVersion.get() > lastVersion && endVersion.get() > newBeginVersion) {
newBeginVersion = endVersion.get();
Expand Down Expand Up @@ -984,11 +977,7 @@ ACTOR Future<Void> applyMutations(Database cx,
state int maxBytes = CLIENT_KNOBS->APPLY_MIN_LOCK_BYTES;

keyVersion->insert(metadataVersionKey, 0);
fmt::print(stderr, "BackupAgentBaseApplyMutationBegin: begin={}, end={}\n", beginVersion, *endVersion);
TraceEvent("BackupAgentBaseApplyMutationsBegin")
.detail("BeginVersion", beginVersion)
.detail("EndVersion", *endVersion)
.log();
// fmt::print(stderr, "BackupAgentBaseApplyMutationBegin: begin={}, end={}\n", beginVersion, *endVersion);

try {
loop {
Expand All @@ -1009,7 +998,7 @@ ACTOR Future<Void> applyMutations(Database cx,
// ranges each represent a partition of version, e.g. [100, 200], [201, 300], [301, 400]
// (64, 200) -> [(64, 128), (128, 192), (192, 200)] assuming block size is 64
state Standalone<VectorRef<KeyRangeRef>> ranges = getApplyRanges(beginVersion, newEndVersion, uid);
fmt::print(stderr, "BackupAgentBaseApplyMutationRangeSize={}\n", ranges.size());
// fmt::print(stderr, "BackupAgentBaseApplyMutationRangeSize={}\n", ranges.size());
// ranges have format: applyLogKeys.begin/uid/hash(uint8)/version(64 bits)/part
state size_t idx;
state std::vector<PromiseStream<RCGroup>> results;
Expand All @@ -1019,7 +1008,7 @@ ACTOR Future<Void> applyMutations(Database cx,
// each RCGroup is for a single version, each results[i] is for a single range
// one range might have multiple versions
for (int i = 0; i < ranges.size(); ++i) {
fmt::print(stderr, "BackupAgentBaseApplyMutationRangeRecord begin={}, end={}\n", ranges[i].begin, ranges[i].end);
// fmt::print(stderr, "BackupAgentBaseApplyMutationRangeRecord begin={}, end={}\n", ranges[i].begin, ranges[i].end);
results.push_back(PromiseStream<RCGroup>());
locks.push_back(makeReference<FlowLock>(
std::max(CLIENT_KNOBS->APPLY_MAX_LOCK_BYTES / ranges.size(), CLIENT_KNOBS->APPLY_MIN_LOCK_BYTES)));
Expand Down
49 changes: 41 additions & 8 deletions fdbclient/FileBackupAgent.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ class RestoreConfig : public KeyBackedTaskConfig {

Version beginVersion = BinaryReader::fromStringRef<Version>(beginVal.get().get(), Unversioned());
Version endVersion = BinaryReader::fromStringRef<Version>(endVal.get().get(), Unversioned());
// fmt::print(stderr, "GetLag internal: begin={}, end={}\n", beginVersion, endVersion);
fmt::print(stderr, "GetLag internal: begin={}, end={}\n", beginVersion, endVersion);

return endVersion - beginVersion;
}
Expand Down Expand Up @@ -4809,12 +4809,39 @@ REGISTER_TASKFUNC(RestoreLogDataTaskFunc);
// similar to addBackupMutations(
// MutationList::push_back_deep
// Serializes a single MutationRef with an Unversioned BinaryWriter and returns the written bytes.
// The intended layout, per the decoder excerpt quoted in the block comment below, is
// type|kLen|vLen|Key|Value, where type, kLen and vLen are each read back as a uint32_t.
// NOTE(review): this span appears to contain both the pre-change writes (`bw << m.type` through
// `bw << m.param2`) and the post-change writes (explicit uint32_t fields followed by
// serializeBytes) of a diff; presumably only the latter are meant to remain -- confirm against
// the actual file before relying on the byte layout.
Standalone<StringRef> transformMutationToOldFormat(MutationRef m) {
// The encoding written here has to match what the decoder expects; the decoding code is
// quoted below for reference:
/*
// hfu5: format should be type|kLen|vLen|Key|Value
memcpy(&type, value.begin() + offset, sizeof(uint32_t));
offset += sizeof(uint32_t);
state uint32_t len1 = 0;
memcpy(&len1, value.begin() + offset, sizeof(uint32_t));
offset += sizeof(uint32_t);
state uint32_t len2 = 0;
memcpy(&len2, value.begin() + offset, sizeof(uint32_t));
offset += sizeof(uint32_t);
// mutationref is constructed here
state MutationRef logValue;
state Arena tempArena;
logValue.type = type;
logValue.param1 = value.substr(offset, len1);
offset += len1;
logValue.param2 = value.substr(offset, len2);
offset += len2;
*/
BinaryWriter bw(Unversioned());
bw << m.type;
bw << m.param1.size();
bw << m.param1;
bw << m.param2.size();
bw << m.param2;
// Copy the header fields into explicit uint32_t locals so each one is written with the
// 4-byte width that the decoder quoted above reads.
uint32_t len1, len2, type;
type = m.type;
len1 = m.param1.size();
len2 = m.param2.size();
bw << type;
bw << len1;
bw << len2;
bw.serializeBytes(m.param1); // operator<< on a StringRef writes its size first, so serializeBytes is used instead (presumably writes only the raw bytes -- confirm)
bw.serializeBytes(m.param2);
// Next step: check whether the binary writer adds any additional bytes.
// fmt::print(stderr, "generate old format transaction, type={}, len1={}, len2={}, total={}\n", type, len1, len2, bw.toValue().size());
return bw.toValue();
}

Expand Down Expand Up @@ -4853,10 +4880,16 @@ Standalone<VectorRef<KeyValueRef>> generateOldFormatMutations(
for (auto& mutationsForSub : mutationsBySub) {
// concatenate them to param2Str
for (auto& m : mutationsForSub.second) {
// refer to transformMutationToOldFormat
// binary writer adds additional 8 bytes at the beginning for version, need to remove it
// because it is concatenated here and we will use memcpy to process this long string
// instead of binary reader
// fmt::print(stderr, "Combine param2, currentSize={}, eachSize={}\n", param2Writer.toValue().size(), m.size());
param2Writer.serializeBytes(m);
}
}
Key param2Concat = param2Writer.toValue();
// fmt::print(stderr, "param2Concat size={}\n", param2Concat.size());

// deal with param1
int32_t hashBase = commitVersion / CLIENT_KNOBS->LOG_RANGE_BLOCK_SIZE;
Expand Down Expand Up @@ -5298,9 +5331,9 @@ struct RestoreDispatchPartitionedTaskFunc : RestoreTaskFuncBase {
tr, taskBucket, task, endVersion, nextEndVersion, TaskCompletionKey::noSignal(), allPartsDone));

wait(waitForAll(addTaskFutures));
fmt::print(stderr, "before wait finish begin={}, end={}, nextEnd={} \n", beginVersion, endVersion, nextEndVersion);
// fmt::print(stderr, "before wait finish begin={}, end={}, nextEnd={} \n", beginVersion, endVersion, nextEndVersion);
wait(taskBucket->finish(tr, task));
fmt::print(stderr, "Add parent task begin={}, end={}, nextEnd={}, should happen only after children are done \n", beginVersion, endVersion, nextEndVersion);
// fmt::print(stderr, "Add parent task begin={}, end={}, nextEnd={}, should happen only after children are done \n", beginVersion, endVersion, nextEndVersion);


TraceEvent("RestorePartitionDispatch")
Expand Down
2 changes: 1 addition & 1 deletion fdbclient/TaskBucket.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1142,7 +1142,7 @@ class TaskFutureImpl {
for (auto& v : task->params) {
tr->set(callbackSpace.pack(v.key), v.value);
}
fmt::print(stderr, "TaskFuture::onSet callback added, isSet={}\n", is_set);
// fmt::print(stderr, "TaskFuture::onSet callback added, isSet={}\n", is_set);
}

return Void();
Expand Down

0 comments on commit 65340fa

Please sign in to comment.