BulkDump Framework #11780

Open · wants to merge 18 commits into main
96 changes: 96 additions & 0 deletions documentation/sphinx/source/bulkdump.rst
@@ -0,0 +1,96 @@
##############################
BulkDump (Dev)
##############################

| Author: Zhe Wang
| Reviewer: Michael Stack, Jingyu Zhou
| Audience: FDB developers, SREs and expert users.


Overview
========
In a FoundationDB (FDB) key-value cluster, every key-value pair is replicated across multiple storage servers.
The BulkDump tool is developed to dump all key-value pairs within an input range to files.
Note that when the input range is large, the range is split into smaller ranges.
The data of each subrange is dumped to a file at a version. All data within a file is at the same version; however, different files' versions can be different.

Input and output
----------------
When a user wants to start a bulkdump job, the user provides the range to dump and the path root under which the data is dumped.
The range can be any subrange within the user key space (i.e. " " ~ "\\xff").
Dumping the data of the system key space and the special key space (i.e. "\\xff" ~ "\\xff\\xff\\xff") is not allowed.
The path root can be either a blobstore URL (TBD) or a path on a file system.
Given the input range, if the range is large, it is split into smaller ranges.
Each subrange is dumped at a version to a folder. In particular, the folder is organized as follows:

1. <rootLocal>/<relativeFolder>/<dumpVersion>-manifest.txt (must have)
(Review comment from a Contributor: the rendered output looks like //-manifest.txt (must have), so the < and > may need to be escaped.)

2. <rootLocal>/<relativeFolder>/<dumpVersion>-data.sst (omitted if the subrange is empty)
3. <rootLocal>/<relativeFolder>/<dumpVersion>-sample.sst (omitted if data size is too small to have a sample)

The <relativeFolder> is defined as <JobId>/<TaskId>/<BatchId>.
At any time, an FDB cluster can have at most one bulkdump job.
A bulkdump job is partitioned into tasks by range, according to the shard boundary.
When dumping the range of a task, the data is collected in batches. All key-value pairs of a batch are collected at the same version.
Here, <JobId> is the unique ID of a job, <TaskId> is the unique ID of a task, and <BatchId> is the unique ID of a batch.
All data files of the tasks of the same job are located in the same job folder, which is named by the JobId.

Each <relativeFolder> corresponds to exactly one subrange with exactly one manifest file.
The manifest file includes all necessary information for loading the data from the folder to a FDB cluster.
The manifest file content includes the following information:

1. File paths (including the path root)
2. Key range of the dumped data in the folder
3. Version at which the data of the range was collected
4. Checksum of the data
5. Size of the data in bytes
6. Byte sampling setting (when a cluster loads the folder, if the setting mismatches, the loading cluster does the byte sampling by itself; otherwise, the loading cluster directly uses the sample file in the folder).

In the job folder, there is a global manifest file listing all ranges and their corresponding manifest files.
When loading the data into a cluster, users can use this global manifest to rebuild the data.
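
For illustration, the layout of one job folder might look like the following (the IDs and the name of the global manifest file are hypothetical placeholders)::

    <rootLocal>/
        <JobId>/                                  job folder
            <global manifest file>                lists all ranges and their manifest files
            <TaskId>/
                <BatchId>/
                    <dumpVersion>-manifest.txt
                    <dumpVersion>-data.sst        omitted if the subrange is empty
                    <dumpVersion>-sample.sst      omitted if the data is too small to sample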

How to use?
-----------
Currently, low-level transactional APIs are provided to submit a job or clear a job.
These operations are achieved by issuing transactions that update the bulkdump metadata.
Submitting a job is achieved by writing the job metadata to the bulkdump metadata range of the job.
When submitting a job, the API checks whether there is any ongoing bulkdump job. If yes, it rejects the job; otherwise, it accepts the job.
Clearing a job is achieved by erasing the entire user range space of the bulkdump metadata range. When clearing a job, all metadata is cleared and any ongoing task is stopped (with some latency).

Currently, the ManagementAPI provides the following interfaces for these operations:

1. Submit a job: submitBulkDumpJob(BulkDumpState job); // For generating the input job metadata, see point 4.
2. Clear a job: clearBulkDumpJob();
3. Enable the feature: setBulkDumpMode(int mode); // Set mode = 1 to enable; set mode = 0 to disable.
4. BulkDump job metadata is generated by newBulkDumpTaskLocalSST(KeyRange range, std::string remoteRoot); // More APIs to generate the metadata will be added as the functionality expands.
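
For illustration only, a minimal sketch of driving these interfaces from Flow actor code is shown below. The runBulkDump actor, the exact set of headers, and the error handling are simplified assumptions for this sketch rather than part of the API.

.. code-block:: cpp

    #include "fdbclient/BulkDumping.h"
    #include "fdbclient/ManagementAPI.actor.h"
    #include "flow/actorcompiler.h" // must be the last include in a .actor.cpp file

    // Sketch: enable the bulkdump feature, build the job metadata, and submit the job.
    ACTOR Future<Void> runBulkDump(Database cx, KeyRange range, std::string remoteRoot) {
        // Enable bulkdump (mode = 1); setBulkDumpMode returns the previous mode.
        wait(success(setBulkDumpMode(cx, 1)));
        // Generate the job metadata for dumping `range` to SST files under `remoteRoot`.
        state BulkDumpState job = newBulkDumpTaskLocalSST(range, remoteRoot);
        // Submit the job; submission is skipped or rejected if another bulkdump job is ongoing.
        wait(submitBulkDumpJob(cx, job));
        return Void();
    }

After submission, DD picks up the job from the bulkdump metadata and drives the per-range tasks as described in the Mechanisms section below.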

Mechanisms
==========

Workflow
--------
- Users input a range via a transaction and this range is persisted to the bulkdump metadata (with the "\\xff/bulkDump/" prefix).
- Bulkdump metadata is range-based (see the conceptual sketch after this list).
- DD observes the range to dump by reading the metadata.
- DD partitions the range into smaller ranges according to the shard boundary.
- DD randomly chooses one storage server (SS) that owns the range as the agent to do the dump. DD holds an outstanding promise with this SS. The task assigned to an SS is stateless.
- DD sends the range dump request to the storage server. DD spawns a dedicated actor waiting on the call, so if any failure happens on the SS side, DD will know about it.
- DD sends range dump requests within the max parallelism specified by the knob DD_BULKDUMP_PARALLELISM.
- The SS receives the request and reads the data from local storage. If the range has been moved away or split, the SS replies with a failure to DD, and DD will retry the remaining range later. If the range is there, the SS reads the data and uploads it to external storage. This PR only implements dumping the data to local disk; a follow-up PR will add dumping to S3.
- When the SS completes, it marks the range as completed in the metadata.
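
For intuition, the range-based metadata maps boundary keys under the "\\xff/bulkDump/" prefix to serialized BulkDumpState values, where each value applies up to the next boundary. Conceptually (a sketch of the key-range map, not the exact on-disk encoding)::

    \xff/bulkDump/ + ""    ->  (empty)                                      no job covers "" ~ "a"
    \xff/bulkDump/ + "a"   ->  BulkDumpState{ JobId, range "a" ~ "m", ... }
    \xff/bulkDump/ + "m"   ->  (empty)                                      no job covers "m" ~ "\xff"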

Invariant
---------
- At any time, an FDB cluster accepts at most one bulkdump job.
- DD partitions the range into subranges according to the shard boundary. For a subrange, the data is guaranteed to be put into the same folder, i.e., the folder of the task ID.
- Each data filename is the version at which the data was read by the SS.
- Each subrange always has one manifest file indicating the metadata information of the data, such as Range, Checksum (to be implemented later in a separate PR), and FilePath.
- In the SS, we dump the files first and then write the metadata in the system key space. If either phase fails, DD will re-do the range. Each time the SS writes the folder (locally or in a BlobStore), it erases the folder first.
- An SS handles at most one dump task at a time (the parallelism is protected by the knob SS_SERVE_BULKDUMP_PARALLELISM; in the current implementation this knob is set to 1, but we keep the flexibility of setting the bulkdump parallelism of an SS here).
- A subrange does not necessarily have a byteSample file or a data file; this depends on the data size. An SS may be assigned a range that turns out to be empty.
- When a user issues a bulkdump job, the client checks whether there is an ongoing bulkdump job. If yes, the request is rejected.

Failure handling
----------------
- SS failure: DD will receive broken_promise. DD gives up working on the range at this time. DD will re-issue the request in the future until the range completes.
- DD failure: It is possible that the same SS receives two requests to work on the same range. The SS uses a FlowLock to guarantee that it handles one request at a time, so there is no conflict.
- S3 outage: Results in task failure. The failed task will be retried by DD.
3 changes: 3 additions & 0 deletions documentation/sphinx/source/technical-overview.rst
@@ -36,6 +36,8 @@ These documents explain the engineering design of FoundationDB, with detailed in

* :doc:`consistency-check-urgent` describes how to complete a consistency scan of the entire database in a fast way.

* :doc:`bulkdump` describes how to dump a snapshot of data to a blobstore or a local file system.

.. toctree::
:maxdepth: 1
:titlesonly:
@@ -57,3 +59,4 @@ These documents explain the engineering design of FoundationDB, with detailed in
read-write-path
ha-write-path
consistency-check-urgent
bulkdump
26 changes: 26 additions & 0 deletions fdbclient/BulkDumping.cpp
@@ -0,0 +1,26 @@
/*
* BulkDumping.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2024 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "fdbclient/BulkDumping.h"

BulkDumpState newBulkDumpTaskLocalSST(const KeyRange& range, const std::string& remoteRoot) {
return BulkDumpState(
range, BulkDumpFileType::SST, BulkDumpTransportMethod::CP, BulkDumpExportMethod::File, remoteRoot);
}
178 changes: 178 additions & 0 deletions fdbclient/ManagementAPI.actor.cpp
@@ -24,6 +24,7 @@

#include "fdbclient/GenericManagementAPI.actor.h"
#include "fdbclient/RangeLock.h"
#include "flow/Error.h"
#include "fmt/format.h"
#include "fdbclient/Knobs.h"
#include "flow/Arena.h"
@@ -2973,6 +2974,183 @@ ACTOR Future<Void> acknowledgeBulkLoadTask(Database cx, KeyRange range, UID task
return Void();
}

ACTOR Future<int> setBulkDumpMode(Database cx, int mode) {
state Transaction tr(cx);
state BinaryWriter wr(Unversioned());
wr << mode;
loop {
try {
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
state int oldMode = 0;
Optional<Value> oldModeValue = wait(tr.get(bulkDumpModeKey));
if (oldModeValue.present()) {
BinaryReader rd(oldModeValue.get(), Unversioned());
rd >> oldMode;
}
if (oldMode != mode) {
BinaryWriter wrMyOwner(Unversioned());
wrMyOwner << dataDistributionModeLock;
tr.set(moveKeysLockOwnerKey, wrMyOwner.toValue());
BinaryWriter wrLastWrite(Unversioned());
wrLastWrite << deterministicRandom()->randomUniqueID(); // trigger DD restart
tr.set(moveKeysLockWriteKey, wrLastWrite.toValue());
tr.set(bulkDumpModeKey, wr.toValue());
wait(tr.commit());
TraceEvent("DDBulkDumpModeKeyChanged").detail("NewMode", mode).detail("OldMode", oldMode);
}
return oldMode;
} catch (Error& e) {
wait(tr.onError(e));
}
}
}

// Return the job Id if any bulk dump job exists globally.
// There is at most one bulk dump job at any time on the entire key space.
// A job over a range can spawn multiple tasks according to the shard boundary.
// Those tasks share the same job Id (i.e., they belong to the same job).
ACTOR Future<Optional<UID>> existAnyBulkDumpTask(Transaction* tr) {
state RangeResult rangeResult;
wait(store(rangeResult,
krmGetRanges(tr,
bulkDumpPrefix,
normalKeys,
CLIENT_KNOBS->KRM_GET_RANGE_LIMIT,
CLIENT_KNOBS->KRM_GET_RANGE_LIMIT_BYTES)));
// krmGetRanges splits the result into batches.
// Checking the first batch is enough since we only need to know whether any task exists
for (int i = 0; i < rangeResult.size() - 1; ++i) {
if (rangeResult[i].value.empty()) {
continue;
}
BulkDumpState bulkDumpState = decodeBulkDumpState(rangeResult[i].value);
return bulkDumpState.getJobId();
}
return Optional<UID>();
}

ACTOR Future<Void> submitBulkDumpJob(Database cx, BulkDumpState bulkDumpTask) {
state Transaction tr(cx);
loop {
try {
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
Optional<UID> existJobId = wait(existAnyBulkDumpTask(&tr));
if (existJobId.present() && existJobId.get() != bulkDumpTask.getJobId()) {
TraceEvent(SevWarn, "SubmitBulkDumpTaskError")
.setMaxEventLength(-1)
.setMaxFieldLength(-1)
.detail("Reason", "ExistingDifferentJob")
.detail("ExistingJobId", existJobId.get())
.detail("NewJobId", bulkDumpTask.getJobId())
.detail("Task", bulkDumpTask.toString());
}
if (existJobId.present() && existJobId.get() == bulkDumpTask.getJobId()) {
TraceEvent(SevWarn, "SubmitBulkDumpTaskError")
.setMaxEventLength(-1)
.setMaxFieldLength(-1)
.detail("Reason", "ExistingSameJob")
.detail("ExistingJobId", existJobId.get())
.detail("NewJobId", bulkDumpTask.getJobId())
.detail("Task", bulkDumpTask.toString());
return Void(); // give up if a job with same id has been submitted
}
if (bulkDumpTask.getPhase() != BulkDumpPhase::Submitted) {
TraceEvent(g_network->isSimulated() ? SevError : SevWarnAlways, "SubmitBulkDumpTaskError")
.setMaxEventLength(-1)
.setMaxFieldLength(-1)
.detail("Reason", "WrongPhase")
.detail("Task", bulkDumpTask.toString());
throw bulkdump_task_failed();
}
if (!normalKeys.contains(bulkDumpTask.getRange())) {
TraceEvent(g_network->isSimulated() ? SevError : SevWarnAlways, "SubmitBulkLoadTaskError")
.setMaxEventLength(-1)
.setMaxFieldLength(-1)
.detail("Reason", "RangeOutOfScope")
.detail("Task", bulkDumpTask.toString());
throw bulkdump_task_failed();
}
wait(krmSetRange(&tr, bulkDumpPrefix, bulkDumpTask.getRange(), bulkDumpStateValue(bulkDumpTask)));
wait(tr.commit());
break;
} catch (Error& e) {
wait(tr.onError(e));
}
}
return Void();
}

ACTOR Future<Void> clearBulkDumpJob(Database cx) {
state Transaction tr(cx);
loop {
try {
tr.clear(allKeys.withPrefix(bulkDumpPrefix));
wait(tr.commit());
break;
} catch (Error& e) {
wait(tr.onError(e));
}
}
return Void();
}

ACTOR Future<std::vector<BulkDumpState>> getBulkDumpTasksWithinRange(Database cx,
KeyRange rangeToRead,
Optional<size_t> limit,
Optional<BulkDumpPhase> phase) {
state Transaction tr(cx);
state Key readBegin = rangeToRead.begin;
state Key readEnd = rangeToRead.end;
state RangeResult rangeResult;
state std::vector<BulkDumpState> res;
while (readBegin < readEnd) {
state int retryCount = 0;
loop {
try {
rangeResult.clear();
tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
wait(store(rangeResult,
krmGetRanges(&tr,
bulkDumpPrefix,
KeyRangeRef(readBegin, readEnd),
CLIENT_KNOBS->KRM_GET_RANGE_LIMIT,
CLIENT_KNOBS->KRM_GET_RANGE_LIMIT_BYTES)));
break;
} catch (Error& e) {
if (retryCount > 30) {
throw timed_out();
}
wait(tr.onError(e));
retryCount++;
}
}
for (int i = 0; i < rangeResult.size() - 1; ++i) {
if (rangeResult[i].value.empty()) {
continue;
}
BulkDumpState bulkDumpState = decodeBulkDumpState(rangeResult[i].value);
KeyRange range = Standalone(KeyRangeRef(rangeResult[i].key, rangeResult[i + 1].key));
if (range != bulkDumpState.getRange()) {
ASSERT(bulkDumpState.getRange().contains(range));
continue;
}
if (!phase.present() || phase.get() == bulkDumpState.getPhase()) {
res.push_back(bulkDumpState);
}
if (limit.present() && res.size() >= limit.get()) {
return res;
}
}
readBegin = rangeResult.back().key;
}

return res;
}

// Persist a new owner if the input uniqueId does not exist; update the description if the input uniqueId exists
ACTOR Future<Void> registerRangeLockOwner(Database cx, std::string uniqueId, std::string description) {
if (uniqueId.empty() || description.empty()) {
6 changes: 6 additions & 0 deletions fdbclient/ServerKnobs.cpp
@@ -381,6 +381,12 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( DD_BULKLOAD_PARALLELISM, 10 ); if( randomize && BUGGIFY ) DD_BULKLOAD_PARALLELISM = deterministicRandom()->randomInt(1, 10);
init( DD_BULKLOAD_SCHEDULE_MIN_INTERVAL_SEC, 2.0 ); if( randomize && BUGGIFY ) DD_BULKLOAD_SCHEDULE_MIN_INTERVAL_SEC = deterministicRandom()->random01() * 10 + 1;

// BulkDumping
init( DD_BULKDUMP_TASK_METADATA_READ_SIZE, 100 ); if( randomize && BUGGIFY ) DD_BULKDUMP_TASK_METADATA_READ_SIZE = deterministicRandom()->randomInt(2, 100);
init( DD_BULKDUMP_SCHEDULE_MIN_INTERVAL_SEC, 2.0 ); if( randomize && BUGGIFY ) DD_BULKDUMP_SCHEDULE_MIN_INTERVAL_SEC = deterministicRandom()->random01() * 10 + 1;
init( SS_SERVE_BULKDUMP_PARALLELISM, 1 ); // TODO(BulkDump): Do not set to 1 after SS can resolve the file folder conflict
init( DD_BULKDUMP_PARALLELISM, 50 ); if ( randomize && BUGGIFY ) DD_BULKDUMP_PARALLELISM = deterministicRandom()->randomInt(1, 5);

// TeamRemover
init( TR_LOW_SPACE_PIVOT_DELAY_SEC, 0 ); if (isSimulated) TR_LOW_SPACE_PIVOT_DELAY_SEC = deterministicRandom()->randomInt(0, 3);
init( TR_FLAG_DISABLE_MACHINE_TEAM_REMOVER, false ); if( randomize && BUGGIFY ) TR_FLAG_DISABLE_MACHINE_TEAM_REMOVER = deterministicRandom()->random01() < 0.1 ? true : false; // false by default. disable the consistency check when it's true
16 changes: 16 additions & 0 deletions fdbclient/SystemData.cpp
@@ -1210,6 +1210,22 @@ BulkLoadState decodeBulkLoadState(const ValueRef& value) {
return bulkLoadState;
}

// Bulk dumping keys
const KeyRef bulkDumpModeKey = "\xff/bulkDumpMode"_sr;
const KeyRangeRef bulkDumpKeys = KeyRangeRef("\xff/bulkDump/"_sr, "\xff/bulkDump0"_sr);
const KeyRef bulkDumpPrefix = bulkDumpKeys.begin;

const Value bulkDumpStateValue(const BulkDumpState& bulkDumpState) {
return ObjectWriter::toValue(bulkDumpState, IncludeVersion());
}

BulkDumpState decodeBulkDumpState(const ValueRef& value) {
BulkDumpState bulkDumpState;
ObjectReader reader(value.begin(), IncludeVersion());
reader.deserialize(bulkDumpState);
return bulkDumpState;
}

// Range Lock
const std::string rangeLockNameForBulkLoad = "BulkLoad";
