Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BulkDump Framework #11780

Open
wants to merge 18 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions fdbclient/BulkDumping.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* BulkDumping.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2024 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "fdbclient/BulkDumping.h"

BulkDumpState newBulkDumpTaskLocalSST(const KeyRange& range, const std::string& folder) {
return BulkDumpState(range, BulkDumpFileType::SST, BulkDumpTransportMethod::CP, BulkDumpExportMethod::File, folder);
}
122 changes: 122 additions & 0 deletions fdbclient/ManagementAPI.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2973,6 +2973,128 @@ ACTOR Future<Void> acknowledgeBulkLoadTask(Database cx, KeyRange range, UID task
return Void();
}

ACTOR Future<int> setBulkDumpMode(Database cx, int mode) {
kakaiu marked this conversation as resolved.
Show resolved Hide resolved
state Transaction tr(cx);
state BinaryWriter wr(Unversioned());
wr << mode;
loop {
try {
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
state int oldMode = 0;
Optional<Value> oldModeValue = wait(tr.get(bulkDumpModeKey));
if (oldModeValue.present()) {
BinaryReader rd(oldModeValue.get(), Unversioned());
rd >> oldMode;
}
if (oldMode != mode) {
BinaryWriter wrMyOwner(Unversioned());
wrMyOwner << dataDistributionModeLock;
tr.set(moveKeysLockOwnerKey, wrMyOwner.toValue());
BinaryWriter wrLastWrite(Unversioned());
wrLastWrite << deterministicRandom()->randomUniqueID(); // triger DD restarts
tr.set(moveKeysLockWriteKey, wrLastWrite.toValue());
tr.set(bulkDumpModeKey, wr.toValue());
wait(tr.commit());
TraceEvent("DDBulkDumpModeKeyChanged").detail("NewMode", mode).detail("OldMode", oldMode);
}
return oldMode;
} catch (Error& e) {
wait(tr.onError(e));
}
}
}

ACTOR Future<Void> submitBulkDumpTask(Database cx, BulkDumpState bulkDumpTask) {
state Transaction tr(cx);
loop {
try {
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
// TODO(BulkDump): reject the request if there is an ongoing bulk dump job
if (bulkDumpTask.getPhase() != BulkDumpPhase::Submitted) {
TraceEvent(g_network->isSimulated() ? SevError : SevWarnAlways, "SubmitBulkDumpTaskError")
.setMaxEventLength(-1)
.setMaxFieldLength(-1)
.detail("Reason", "WrongPhase")
.detail("Task", bulkDumpTask.toString());
throw bulkdump_task_failed();
}
if (!normalKeys.contains(bulkDumpTask.getRange())) {
TraceEvent(g_network->isSimulated() ? SevError : SevWarnAlways, "SubmitBulkLoadTaskError")
.setMaxEventLength(-1)
.setMaxFieldLength(-1)
.detail("Reason", "RangeOutOfScope")
.detail("Task", bulkDumpTask.toString());
throw bulkdump_task_failed();
}
wait(krmSetRange(&tr, bulkDumpPrefix, bulkDumpTask.getRange(), bulkDumpStateValue(bulkDumpTask)));
wait(tr.commit());
break;
} catch (Error& e) {
wait(tr.onError(e));
}
}
return Void();
}

// TODO(BulkDump): add a method of cancelling the existing bulkdump task

ACTOR Future<std::vector<BulkDumpState>> getBulkDumpTasksWithinRange(Database cx,
KeyRange rangeToRead,
Optional<size_t> limit,
Optional<BulkDumpPhase> phase) {
state Transaction tr(cx);
state Key readBegin = rangeToRead.begin;
state Key readEnd = rangeToRead.end;
state RangeResult rangeResult;
state std::vector<BulkDumpState> res;
while (readBegin < readEnd) {
state int retryCount = 0;
loop {
try {
rangeResult.clear();
tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
wait(store(rangeResult,
krmGetRanges(&tr,
bulkDumpPrefix,
KeyRangeRef(readBegin, readEnd),
CLIENT_KNOBS->KRM_GET_RANGE_LIMIT,
CLIENT_KNOBS->KRM_GET_RANGE_LIMIT_BYTES)));
break;
} catch (Error& e) {
if (retryCount > 30) {
throw timed_out();
}
wait(tr.onError(e));
retryCount++;
}
}
for (int i = 0; i < rangeResult.size() - 1; ++i) {
if (rangeResult[i].value.empty()) {
continue;
}
BulkDumpState bulkDumpState = decodeBulkDumpState(rangeResult[i].value);
KeyRange range = Standalone(KeyRangeRef(rangeResult[i].key, rangeResult[i + 1].key));
if (range != bulkDumpState.getRange()) {
ASSERT(bulkDumpState.getRange().contains(range));
continue;
}
if (!phase.present() || phase.get() == bulkDumpState.getPhase()) {
res.push_back(bulkDumpState);
}
if (limit.present() && res.size() >= limit.get()) {
return res;
}
}
readBegin = rangeResult.back().key;
}

return res;
}

// Persist a new owner if input uniqueId is not existing; Update description if input uniqueId exists
ACTOR Future<Void> registerRangeLockOwner(Database cx, std::string uniqueId, std::string description) {
if (uniqueId.empty() || description.empty()) {
Expand Down
6 changes: 6 additions & 0 deletions fdbclient/ServerKnobs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,12 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( DD_BULKLOAD_PARALLELISM, 10 ); if( randomize && BUGGIFY ) DD_BULKLOAD_PARALLELISM = deterministicRandom()->randomInt(1, 10);
init( DD_BULKLOAD_SCHEDULE_MIN_INTERVAL_SEC, 2.0 ); if( randomize && BUGGIFY ) DD_BULKLOAD_SCHEDULE_MIN_INTERVAL_SEC = deterministicRandom()->random01() * 10 + 1;

// BulkDumping
init( DD_BULKDUMP_TASK_METADATA_READ_SIZE, 100 ); if( randomize && BUGGIFY ) DD_BULKDUMP_TASK_METADATA_READ_SIZE = deterministicRandom()->randomInt(2, 100);
init( DD_BULKDUMP_SCHEDULE_MIN_INTERVAL_SEC, 2.0 ); if( randomize && BUGGIFY ) DD_BULKDUMP_SCHEDULE_MIN_INTERVAL_SEC = deterministicRandom()->random01() * 10 + 1;
init( SS_SERVE_BULKDUMP_PARALLELISM, 1 ); // TODO(BulkDump): Do not set to 1 after SS can resolve the file folder conflict
init( DD_BULKDUMP_PARALLELISM, 50 ); if ( randomize && BUGGIFY ) DD_BULKDUMP_PARALLELISM = deterministicRandom()->randomInt(1, 5);

// TeamRemover
init( TR_LOW_SPACE_PIVOT_DELAY_SEC, 0 ); if (isSimulated) TR_LOW_SPACE_PIVOT_DELAY_SEC = deterministicRandom()->randomInt(0, 3);
init( TR_FLAG_DISABLE_MACHINE_TEAM_REMOVER, false ); if( randomize && BUGGIFY ) TR_FLAG_DISABLE_MACHINE_TEAM_REMOVER = deterministicRandom()->random01() < 0.1 ? true : false; // false by default. disable the consistency check when it's true
Expand Down
16 changes: 16 additions & 0 deletions fdbclient/SystemData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1210,6 +1210,22 @@ BulkLoadState decodeBulkLoadState(const ValueRef& value) {
return bulkLoadState;
}

// Bulk dumping keys
const KeyRef bulkDumpModeKey = "\xff/bulkDumpMode"_sr;
const KeyRangeRef bulkDumpKeys = KeyRangeRef("\xff/bulkDump/"_sr, "\xff/bulkDump0"_sr);
const KeyRef bulkDumpPrefix = bulkDumpKeys.begin;

const Value bulkDumpStateValue(const BulkDumpState& bulkDumpState) {
return ObjectWriter::toValue(bulkDumpState, IncludeVersion());
}

BulkDumpState decodeBulkDumpState(const ValueRef& value) {
BulkDumpState bulkDumpState;
ObjectReader reader(value.begin(), IncludeVersion());
reader.deserialize(bulkDumpState);
return bulkDumpState;
}

// Range Lock
const std::string rangeLockNameForBulkLoad = "BulkLoad";

Expand Down
Loading